diff --git a/api/core/ops/base_trace_instance.py b/api/core/ops/base_trace_instance.py index f7b882fc71..8593198bc2 100644 --- a/api/core/ops/base_trace_instance.py +++ b/api/core/ops/base_trace_instance.py @@ -1,7 +1,11 @@ from abc import ABC, abstractmethod +from sqlalchemy.orm import Session + from core.ops.entities.config_entity import BaseTracingConfig from core.ops.entities.trace_entity import BaseTraceInfo +from extensions.ext_database import db +from models import Account, App, TenantAccountJoin class BaseTraceInstance(ABC): @@ -24,3 +28,38 @@ class BaseTraceInstance(ABC): Subclasses must implement specific tracing logic for activities. """ ... + + def get_service_account_with_tenant(self, app_id: str) -> Account: + """ + Get service account for an app and set up its tenant. + + Args: + app_id: The ID of the app + + Returns: + Account: The service account with tenant set up + + Raises: + ValueError: If app, creator account or tenant cannot be found + """ + with Session(db.engine, expire_on_commit=False) as session: + # Get the app to find its creator + app = session.query(App).filter(App.id == app_id).first() + if not app: + raise ValueError(f"App with id {app_id} not found") + + if not app.created_by: + raise ValueError(f"App with id {app_id} has no creator (created_by is None)") + + service_account = session.query(Account).filter(Account.id == app.created_by).first() + if not service_account: + raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}") + + current_tenant = ( + session.query(TenantAccountJoin).filter_by(account_id=service_account.id, current=True).first() + ) + if not current_tenant: + raise ValueError(f"Current tenant not found for account {service_account.id}") + service_account.set_tenant_id(current_tenant.tenant_id) + + return service_account diff --git a/api/core/ops/langfuse_trace/langfuse_trace.py b/api/core/ops/langfuse_trace/langfuse_trace.py index fa1c6b4557..0ea74e9ef0 100644 --- a/api/core/ops/langfuse_trace/langfuse_trace.py +++ b/api/core/ops/langfuse_trace/langfuse_trace.py @@ -4,7 +4,7 @@ from datetime import datetime, timedelta from typing import Optional from langfuse import Langfuse # type: ignore -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.orm import sessionmaker from core.ops.base_trace_instance import BaseTraceInstance from core.ops.entities.config_entity import LangfuseConfig @@ -31,8 +31,7 @@ from core.ops.utils import filter_none_values from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.workflow.nodes.enums import NodeType from extensions.ext_database import db -from models import Account, App, EndUser, WorkflowNodeExecutionTriggeredFrom -from models.account import TenantAccountJoin +from models import EndUser, WorkflowNodeExecutionTriggeredFrom logger = logging.getLogger(__name__) @@ -115,28 +114,11 @@ class LangFuseDataTrace(BaseTraceInstance): # through workflow_run_id get all_nodes_execution using repository session_factory = sessionmaker(bind=db.engine) # Find the app's creator account - with Session(db.engine, expire_on_commit=False) as session: - # Get the app to find its creator - app_id = trace_info.metadata.get("app_id") - if not app_id: - raise ValueError("No app_id found in trace_info metadata") + app_id = trace_info.metadata.get("app_id") + if not app_id: + raise ValueError("No app_id found in trace_info metadata") - app = session.query(App).filter(App.id == app_id).first() - if not app: - raise ValueError(f"App with id {app_id} not found") - - if not app.created_by: - raise ValueError(f"App with id {app_id} has no creator (created_by is None)") - - service_account = session.query(Account).filter(Account.id == app.created_by).first() - if not service_account: - raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}") - current_tenant = ( - session.query(TenantAccountJoin).filter_by(account_id=service_account.id, current=True).first() - ) - if not current_tenant: - raise ValueError(f"Current tenant not found for account {service_account.id}") - service_account.set_tenant_id(current_tenant.tenant_id) + service_account = self.get_service_account_with_tenant(app_id) workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( session_factory=session_factory, diff --git a/api/core/ops/langsmith_trace/langsmith_trace.py b/api/core/ops/langsmith_trace/langsmith_trace.py index e4183ea1e2..8a392940db 100644 --- a/api/core/ops/langsmith_trace/langsmith_trace.py +++ b/api/core/ops/langsmith_trace/langsmith_trace.py @@ -6,7 +6,7 @@ from typing import Optional, cast from langsmith import Client from langsmith.schemas import RunBase -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.orm import sessionmaker from core.ops.base_trace_instance import BaseTraceInstance from core.ops.entities.config_entity import LangSmithConfig @@ -31,7 +31,7 @@ from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.nodes.enums import NodeType from extensions.ext_database import db -from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom +from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom logger = logging.getLogger(__name__) @@ -139,22 +139,11 @@ class LangSmithDataTrace(BaseTraceInstance): # through workflow_run_id get all_nodes_execution using repository session_factory = sessionmaker(bind=db.engine) # Find the app's creator account - with Session(db.engine, expire_on_commit=False) as session: - # Get the app to find its creator - app_id = trace_info.metadata.get("app_id") - if not app_id: - raise ValueError("No app_id found in trace_info metadata") + app_id = trace_info.metadata.get("app_id") + if not app_id: + raise ValueError("No app_id found in trace_info metadata") - app = session.query(App).filter(App.id == app_id).first() - if not app: - raise ValueError(f"App with id {app_id} not found") - - if not app.created_by: - raise ValueError(f"App with id {app_id} has no creator (created_by is None)") - - service_account = session.query(Account).filter(Account.id == app.created_by).first() - if not service_account: - raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}") + service_account = self.get_service_account_with_tenant(app_id) workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( session_factory=session_factory, diff --git a/api/core/ops/opik_trace/opik_trace.py b/api/core/ops/opik_trace/opik_trace.py index f7a4464267..f4d2760ba5 100644 --- a/api/core/ops/opik_trace/opik_trace.py +++ b/api/core/ops/opik_trace/opik_trace.py @@ -6,7 +6,7 @@ from typing import Optional, cast from opik import Opik, Trace from opik.id_helpers import uuid4_to_uuid7 -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.orm import sessionmaker from core.ops.base_trace_instance import BaseTraceInstance from core.ops.entities.config_entity import OpikConfig @@ -25,7 +25,7 @@ from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.nodes.enums import NodeType from extensions.ext_database import db -from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom +from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom logger = logging.getLogger(__name__) @@ -154,22 +154,11 @@ class OpikDataTrace(BaseTraceInstance): # through workflow_run_id get all_nodes_execution using repository session_factory = sessionmaker(bind=db.engine) # Find the app's creator account - with Session(db.engine, expire_on_commit=False) as session: - # Get the app to find its creator - app_id = trace_info.metadata.get("app_id") - if not app_id: - raise ValueError("No app_id found in trace_info metadata") + app_id = trace_info.metadata.get("app_id") + if not app_id: + raise ValueError("No app_id found in trace_info metadata") - app = session.query(App).filter(App.id == app_id).first() - if not app: - raise ValueError(f"App with id {app_id} not found") - - if not app.created_by: - raise ValueError(f"App with id {app_id} has no creator (created_by is None)") - - service_account = session.query(Account).filter(Account.id == app.created_by).first() - if not service_account: - raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}") + service_account = self.get_service_account_with_tenant(app_id) workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( session_factory=session_factory, diff --git a/api/core/ops/weave_trace/weave_trace.py b/api/core/ops/weave_trace/weave_trace.py index b12380be47..cfc8a505bb 100644 --- a/api/core/ops/weave_trace/weave_trace.py +++ b/api/core/ops/weave_trace/weave_trace.py @@ -6,7 +6,7 @@ from typing import Any, Optional, cast import wandb import weave -from sqlalchemy.orm import Session, sessionmaker +from sqlalchemy.orm import sessionmaker from core.ops.base_trace_instance import BaseTraceInstance from core.ops.entities.config_entity import WeaveConfig @@ -26,7 +26,7 @@ from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey from core.workflow.nodes.enums import NodeType from extensions.ext_database import db -from models import Account, App, EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom +from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom logger = logging.getLogger(__name__) @@ -133,22 +133,11 @@ class WeaveDataTrace(BaseTraceInstance): # through workflow_run_id get all_nodes_execution using repository session_factory = sessionmaker(bind=db.engine) # Find the app's creator account - with Session(db.engine, expire_on_commit=False) as session: - # Get the app to find its creator - app_id = trace_info.metadata.get("app_id") - if not app_id: - raise ValueError("No app_id found in trace_info metadata") + app_id = trace_info.metadata.get("app_id") + if not app_id: + raise ValueError("No app_id found in trace_info metadata") - app = session.query(App).filter(App.id == app_id).first() - if not app: - raise ValueError(f"App with id {app_id} not found") - - if not app.created_by: - raise ValueError(f"App with id {app_id} has no creator (created_by is None)") - - service_account = session.query(Account).filter(Account.id == app.created_by).first() - if not service_account: - raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}") + service_account = self.get_service_account_with_tenant(app_id) workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( session_factory=session_factory,