diff --git a/api/core/ops/langfuse_trace/langfuse_trace.py b/api/core/ops/langfuse_trace/langfuse_trace.py
index b9ba068b19..19d0c5145b 100644
--- a/api/core/ops/langfuse_trace/langfuse_trace.py
+++ b/api/core/ops/langfuse_trace/langfuse_trace.py
@@ -29,7 +29,7 @@ from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
 )
 from core.ops.utils import filter_none_values
 from extensions.ext_database import db
-from models.model import EndUser
+from models.model import EndUser, Message
 from models.workflow import WorkflowNodeExecution
 
 logger = logging.getLogger(__name__)
@@ -213,9 +213,32 @@ class LangFuseDataTrace(BaseTraceInstance):
 
             if process_data and process_data.get("model_mode") == "chat":
                 total_token = metadata.get("total_tokens", 0)
+
+                # get the message data via workflow_run_id
+                message_data = (
+                    db.session.query(
+                        Message.answer_tokens,  # output tokens
+                        Message.message_tokens,  # input tokens
+                    )
+                    .filter(Message.workflow_run_id == trace_info.workflow_run_id)
+                    .first()
+                )
+
+                if message_data:
+                    # chatflow data
+                    input_tokens = message_data.message_tokens
+                    output_tokens = message_data.answer_tokens
+                else:
+                    # workflow data
+                    input_tokens = json.loads(node_execution.outputs).get("usage", {}).get("prompt_tokens", 0)
+                    output_tokens = json.loads(node_execution.outputs).get("usage", {}).get("completion_tokens", 0)
+
                 # add generation
                 generation_usage = GenerationUsage(
                     total=total_token,
+                    input=input_tokens,
+                    output=output_tokens,
+                    unit=UnitEnum.TOKENS,
                 )
 
                 node_generation_data = LangfuseGeneration(
diff --git a/api/tasks/ops_trace_task.py b/api/tasks/ops_trace_task.py
index bb3b9e17ea..ed57f663e9 100644
--- a/api/tasks/ops_trace_task.py
+++ b/api/tasks/ops_trace_task.py
@@ -49,6 +49,6 @@ def process_trace_tasks(file_info):
     except Exception:
         failed_key = f"{OPS_TRACE_FAILED_KEY}_{app_id}"
         redis_client.incr(failed_key)
-        logging.info(f"Processing trace tasks failed, app_id: {app_id}")
+        logging.exception(f"Processing trace tasks failed, app_id: {app_id}")
     finally:
         storage.delete(file_path)
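
Note (editor's sketch, not part of the patch): the first hunk resolves input/output token counts from two sources: the Message row when the run belongs to a chatflow, otherwise the LLM node's JSON `outputs` payload for a plain workflow run. A minimal standalone sketch of that fallback logic follows, assuming a query-result row exposing `message_tokens`/`answer_tokens` attributes as in the diff; the helper name is hypothetical:

    import json

    def resolve_token_counts(message_data, node_outputs: str) -> tuple[int, int]:
        """Return (input_tokens, output_tokens) for a Langfuse generation."""
        if message_data:
            # chatflow run: token counts are stored on the Message row
            return message_data.message_tokens, message_data.answer_tokens
        # plain workflow run: fall back to the LLM node's usage payload
        usage = json.loads(node_outputs or "{}").get("usage", {})
        return usage.get("prompt_tokens", 0), usage.get("completion_tokens", 0)

On the second hunk: switching from logging.info to logging.exception also records the active exception's traceback in the log entry, which the plain info call dropped.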