mirror of
https://git.mirrors.martin98.com/https://github.com/langgenius/dify.git
synced 2025-08-14 06:05:51 +08:00
fix: sending app trace data to other app trace provider (#6931)
This commit is contained in:
parent
70283f5b9f
commit
bcd7c8e921
@ -153,27 +153,12 @@ class OpsTraceManager:
|
|||||||
def get_ops_trace_instance(
|
def get_ops_trace_instance(
|
||||||
cls,
|
cls,
|
||||||
app_id: Optional[Union[UUID, str]] = None,
|
app_id: Optional[Union[UUID, str]] = None,
|
||||||
message_id: Optional[str] = None,
|
|
||||||
conversation_id: Optional[str] = None
|
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Get ops trace through model config
|
Get ops trace through model config
|
||||||
:param app_id: app_id
|
:param app_id: app_id
|
||||||
:param message_id: message_id
|
|
||||||
:param conversation_id: conversation_id
|
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
if conversation_id is not None:
|
|
||||||
conversation_data: Conversation = db.session.query(Conversation).filter(
|
|
||||||
Conversation.id == conversation_id
|
|
||||||
).first()
|
|
||||||
if conversation_data:
|
|
||||||
app_id = conversation_data.app_id
|
|
||||||
|
|
||||||
if message_id is not None:
|
|
||||||
record: Message = db.session.query(Message).filter(Message.id == message_id).first()
|
|
||||||
app_id = record.app_id
|
|
||||||
|
|
||||||
if isinstance(app_id, UUID):
|
if isinstance(app_id, UUID):
|
||||||
app_id = str(app_id)
|
app_id = str(app_id)
|
||||||
|
|
||||||
@ -297,6 +282,8 @@ class TraceTask:
|
|||||||
self.kwargs = kwargs
|
self.kwargs = kwargs
|
||||||
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
|
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
|
||||||
|
|
||||||
|
self.app_id = None
|
||||||
|
|
||||||
def execute(self):
|
def execute(self):
|
||||||
return self.preprocess()
|
return self.preprocess()
|
||||||
|
|
||||||
@ -667,13 +654,11 @@ trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))
|
|||||||
|
|
||||||
|
|
||||||
class TraceQueueManager:
|
class TraceQueueManager:
|
||||||
def __init__(self, app_id=None, conversation_id=None, message_id=None):
|
def __init__(self, app_id=None):
|
||||||
global trace_manager_timer
|
global trace_manager_timer
|
||||||
|
|
||||||
self.app_id = app_id
|
self.app_id = app_id
|
||||||
self.conversation_id = conversation_id
|
self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
|
||||||
self.message_id = message_id
|
|
||||||
self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id, conversation_id, message_id)
|
|
||||||
self.flask_app = current_app._get_current_object()
|
self.flask_app = current_app._get_current_object()
|
||||||
if trace_manager_timer is None:
|
if trace_manager_timer is None:
|
||||||
self.start_timer()
|
self.start_timer()
|
||||||
@ -683,6 +668,7 @@ class TraceQueueManager:
|
|||||||
global trace_manager_queue
|
global trace_manager_queue
|
||||||
try:
|
try:
|
||||||
if self.trace_instance:
|
if self.trace_instance:
|
||||||
|
trace_task.app_id = self.app_id
|
||||||
trace_manager_queue.put(trace_task)
|
trace_manager_queue.put(trace_task)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.debug(f"Error adding trace task: {e}")
|
logging.debug(f"Error adding trace task: {e}")
|
||||||
@ -721,9 +707,7 @@ class TraceQueueManager:
|
|||||||
for task in tasks:
|
for task in tasks:
|
||||||
trace_info = task.execute()
|
trace_info = task.execute()
|
||||||
task_data = {
|
task_data = {
|
||||||
"app_id": self.app_id,
|
"app_id": task.app_id,
|
||||||
"conversation_id": self.conversation_id,
|
|
||||||
"message_id": self.message_id,
|
|
||||||
"trace_info_type": type(trace_info).__name__,
|
"trace_info_type": type(trace_info).__name__,
|
||||||
"trace_info": trace_info.model_dump() if trace_info else {},
|
"trace_info": trace_info.model_dump() if trace_info else {},
|
||||||
}
|
}
|
||||||
|
@ -22,10 +22,8 @@ def process_trace_tasks(tasks_data):
|
|||||||
|
|
||||||
trace_info = tasks_data.get('trace_info')
|
trace_info = tasks_data.get('trace_info')
|
||||||
app_id = tasks_data.get('app_id')
|
app_id = tasks_data.get('app_id')
|
||||||
conversation_id = tasks_data.get('conversation_id')
|
|
||||||
message_id = tasks_data.get('message_id')
|
|
||||||
trace_info_type = tasks_data.get('trace_info_type')
|
trace_info_type = tasks_data.get('trace_info_type')
|
||||||
trace_instance = OpsTraceManager.get_ops_trace_instance(app_id, conversation_id, message_id)
|
trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
|
||||||
|
|
||||||
if trace_info.get('message_data'):
|
if trace_info.get('message_data'):
|
||||||
trace_info['message_data'] = Message.from_dict(data=trace_info['message_data'])
|
trace_info['message_data'] = Message.from_dict(data=trace_info['message_data'])
|
||||||
|
Loading…
x
Reference in New Issue
Block a user