From 6c167038af2b0c20bc85c9fd54bfdb31480bc267 Mon Sep 17 00:00:00 2001 From: AichiB7A Date: Tue, 15 Apr 2025 11:35:34 +0800 Subject: [PATCH] [Observability] Instrument with celery (#18029) --- api/extensions/ext_otel.py | 58 ++++++++++++++++++++++++++++---------- api/poetry.lock | 22 ++++++++++++++- api/pyproject.toml | 1 + 3 files changed, 65 insertions(+), 16 deletions(-) diff --git a/api/extensions/ext_otel.py b/api/extensions/ext_otel.py index f2da3e0275..59ec0d0686 100644 --- a/api/extensions/ext_otel.py +++ b/api/extensions/ext_otel.py @@ -1,16 +1,20 @@ import atexit +import logging import os import platform import socket +import sys from typing import Union +from celery.signals import worker_init # type: ignore from flask_login import user_loaded_from_request, user_logged_in # type: ignore from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry.instrumentation.flask import FlaskInstrumentor from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor -from opentelemetry.metrics import set_meter_provider +from opentelemetry.metrics import get_meter_provider, set_meter_provider from opentelemetry.propagate import set_global_textmap from opentelemetry.propagators.b3 import B3Format from opentelemetry.propagators.composite import CompositePropagator @@ -24,7 +28,7 @@ from opentelemetry.sdk.trace.export import ( ) from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio from opentelemetry.semconv.resource import ResourceAttributes -from opentelemetry.trace import Span, get_current_span, set_tracer_provider +from opentelemetry.trace import Span, get_current_span, get_tracer_provider, set_tracer_provider from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from opentelemetry.trace.status import StatusCode @@ -96,22 +100,37 @@ def init_app(app: DifyApp): export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT, ) set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader])) - - def response_hook(span: Span, status: str, response_headers: list): - if span and span.is_recording(): - if status.startswith("2"): - span.set_status(StatusCode.OK) - else: - span.set_status(StatusCode.ERROR, status) - - instrumentor = FlaskInstrumentor() - instrumentor.instrument_app(app, response_hook=response_hook) - with app.app_context(): - engines = list(app.extensions["sqlalchemy"].engines.values()) - SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines) + if not is_celery_worker(): + init_flask_instrumentor(app) + CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument() + init_sqlalchemy_instrumentor(app) atexit.register(shutdown_tracer) +def is_celery_worker(): + return "celery" in sys.argv[0].lower() + + +def init_flask_instrumentor(app: DifyApp): + def response_hook(span: Span, status: str, response_headers: list): + if span and span.is_recording(): + if status.startswith("2"): + span.set_status(StatusCode.OK) + else: + span.set_status(StatusCode.ERROR, status) + + instrumentor = FlaskInstrumentor() + if dify_config.DEBUG: + logging.info("Initializing Flask instrumentor") + instrumentor.instrument_app(app, response_hook=response_hook) + + +def init_sqlalchemy_instrumentor(app: DifyApp): + with app.app_context(): + engines = list(app.extensions["sqlalchemy"].engines.values()) + SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines) + + def setup_context_propagation(): # Configure propagators set_global_textmap( @@ -124,6 +143,15 @@ def setup_context_propagation(): ) +@worker_init.connect(weak=False) +def init_celery_worker(*args, **kwargs): + tracer_provider = get_tracer_provider() + metric_provider = get_meter_provider() + if dify_config.DEBUG: + logging.info("Initializing OpenTelemetry for Celery worker") + CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument() + + def shutdown_tracer(): provider = trace.get_tracer_provider() if hasattr(provider, "force_flush"): diff --git a/api/poetry.lock b/api/poetry.lock index 3ee71c5c58..2901553682 100644 --- a/api/poetry.lock +++ b/api/poetry.lock @@ -5537,6 +5537,26 @@ opentelemetry-util-http = "0.48b0" [package.extras] instruments = ["asgiref (>=3.0,<4.0)"] +[[package]] +name = "opentelemetry-instrumentation-celery" +version = "0.48b0" +description = "OpenTelemetry Celery Instrumentation" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "opentelemetry_instrumentation_celery-0.48b0-py3-none-any.whl", hash = "sha256:c1904e38cc58fb2a33cd657d6e296285c5ffb0dca3f164762f94b905e5abc88e"}, + {file = "opentelemetry_instrumentation_celery-0.48b0.tar.gz", hash = "sha256:1d33aa6c4a1e6c5d17a64215245208a96e56c9d07611685dbae09a557704af26"}, +] + +[package.dependencies] +opentelemetry-api = ">=1.12,<2.0" +opentelemetry-instrumentation = "0.48b0" +opentelemetry-semantic-conventions = "0.48b0" + +[package.extras] +instruments = ["celery (>=4.0,<6.0)"] + [[package]] name = "opentelemetry-instrumentation-fastapi" version = "0.48b0" @@ -10560,4 +10580,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = ">=3.11,<3.13" -content-hash = "f433068e3819e71110da806dc5f0e80db64d439499c56126ac67d31c1ac30391" +content-hash = "23f5322fb6a6397f1cabb206d6806284f95a277ae1f1269df727f58a49ce4384" diff --git a/api/pyproject.toml b/api/pyproject.toml index 2bacf3b1dc..a06f1747d4 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -56,6 +56,7 @@ opentelemetry-exporter-otlp-proto-common = "1.27.0" opentelemetry-exporter-otlp-proto-grpc = "1.27.0" opentelemetry-exporter-otlp-proto-http = "1.27.0" opentelemetry-instrumentation = "0.48b0" +opentelemetry-instrumentation-celery = "0.48b0" opentelemetry-instrumentation-flask = "0.48b0" opentelemetry-instrumentation-sqlalchemy = "0.48b0" opentelemetry-propagator-b3 = "1.27.0"