From 8299614e602925208ae98cef6ed3c1ec56d257df Mon Sep 17 00:00:00 2001 From: AichiB7A Date: Mon, 28 Apr 2025 10:31:13 +0800 Subject: [PATCH] [Observability][Bugfix] Fix expected an instance of Token, got None error in OpenTelemetry (#18934) --- api/app_factory.py | 2 - api/extensions/ext_otel.py | 327 ++++++++++++++++--------------- api/extensions/ext_otel_patch.py | 63 ------ 3 files changed, 166 insertions(+), 226 deletions(-) delete mode 100644 api/extensions/ext_otel_patch.py diff --git a/api/app_factory.py b/api/app_factory.py index 9648d770ab..586f2ded9e 100644 --- a/api/app_factory.py +++ b/api/app_factory.py @@ -52,7 +52,6 @@ def initialize_extensions(app: DifyApp): ext_mail, ext_migrate, ext_otel, - ext_otel_patch, ext_proxy_fix, ext_redis, ext_repositories, @@ -85,7 +84,6 @@ def initialize_extensions(app: DifyApp): ext_proxy_fix, ext_blueprints, ext_commands, - ext_otel_patch, # Apply patch before initializing OpenTelemetry ext_otel, ] for ext in extensions: diff --git a/api/extensions/ext_otel.py b/api/extensions/ext_otel.py index 29fa46902e..be47fdc6d6 100644 --- a/api/extensions/ext_otel.py +++ b/api/extensions/ext_otel.py @@ -8,192 +8,197 @@ from typing import Union from celery.signals import worker_init # type: ignore from flask_login import user_loaded_from_request, user_logged_in # type: ignore -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.instrumentation.celery import CeleryInstrumentor -from opentelemetry.instrumentation.flask import FlaskInstrumentor -from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor -from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider -from opentelemetry.propagate import set_global_textmap -from opentelemetry.propagators.b3 import B3Format -from opentelemetry.propagators.composite import CompositePropagator -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import ( - BatchSpanProcessor, - ConsoleSpanExporter, -) -from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio -from opentelemetry.semconv.resource import ResourceAttributes -from opentelemetry.trace import Span, get_current_span, get_tracer_provider, set_tracer_provider -from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from opentelemetry.trace.status import StatusCode from configs import dify_config from dify_app import DifyApp -class ExceptionLoggingHandler(logging.Handler): - """Custom logging handler that creates spans for logging.exception() calls""" - - def emit(self, record): - try: - if record.exc_info: - tracer = get_tracer_provider().get_tracer("dify.exception.logging") - with tracer.start_as_current_span( - "log.exception", - attributes={ - "log.level": record.levelname, - "log.message": record.getMessage(), - "log.logger": record.name, - "log.file.path": record.pathname, - "log.file.line": record.lineno, - }, - ) as span: - span.set_status(StatusCode.ERROR) - span.record_exception(record.exc_info[1]) - span.set_attribute("exception.type", record.exc_info[0].__name__) - span.set_attribute("exception.message", str(record.exc_info[1])) - except Exception: - pass - - @user_logged_in.connect @user_loaded_from_request.connect def on_user_loaded(_sender, user): - if user: - current_span = get_current_span() - if current_span: - current_span.set_attribute("service.tenant.id", user.current_tenant_id) - current_span.set_attribute("service.user.id", user.id) + if dify_config.ENABLE_OTEL: + from opentelemetry.trace import get_current_span + + if user: + current_span = get_current_span() + if current_span: + current_span.set_attribute("service.tenant.id", user.current_tenant_id) + current_span.set_attribute("service.user.id", user.id) def init_app(app: DifyApp): - if dify_config.ENABLE_OTEL: - setup_context_propagation() - # Initialize OpenTelemetry - # Follow Semantic Convertions 1.32.0 to define resource attributes - resource = Resource( - attributes={ - ResourceAttributes.SERVICE_NAME: dify_config.APPLICATION_NAME, - ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.CURRENT_VERSION}-{dify_config.COMMIT_SHA}", - ResourceAttributes.PROCESS_PID: os.getpid(), - ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}", - ResourceAttributes.HOST_NAME: socket.gethostname(), - ResourceAttributes.HOST_ARCH: platform.machine(), - "custom.deployment.git_commit": dify_config.COMMIT_SHA, - ResourceAttributes.HOST_ID: platform.node(), - ResourceAttributes.OS_TYPE: platform.system().lower(), - ResourceAttributes.OS_DESCRIPTION: platform.platform(), - ResourceAttributes.OS_VERSION: platform.version(), - } - ) - sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE) - provider = TracerProvider(resource=resource, sampler=sampler) - set_tracer_provider(provider) - exporter: Union[OTLPSpanExporter, ConsoleSpanExporter] - metric_exporter: Union[OTLPMetricExporter, ConsoleMetricExporter] - if dify_config.OTEL_EXPORTER_TYPE == "otlp": - exporter = OTLPSpanExporter( - endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/traces", - headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"}, - ) - metric_exporter = OTLPMetricExporter( - endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/metrics", - headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"}, - ) - else: - # Fallback to console exporter - exporter = ConsoleSpanExporter() - metric_exporter = ConsoleMetricExporter() + def is_celery_worker(): + return "celery" in sys.argv[0].lower() - provider.add_span_processor( - BatchSpanProcessor( - exporter, - max_queue_size=dify_config.OTEL_MAX_QUEUE_SIZE, - schedule_delay_millis=dify_config.OTEL_BATCH_EXPORT_SCHEDULE_DELAY, - max_export_batch_size=dify_config.OTEL_MAX_EXPORT_BATCH_SIZE, - export_timeout_millis=dify_config.OTEL_BATCH_EXPORT_TIMEOUT, + def instrument_exception_logging(): + exception_handler = ExceptionLoggingHandler() + logging.getLogger().addHandler(exception_handler) + + def init_flask_instrumentor(app: DifyApp): + meter = get_meter("http_metrics", version=dify_config.CURRENT_VERSION) + _http_response_counter = meter.create_counter( + "http.server.response.count", description="Total number of HTTP responses by status code", unit="{response}" + ) + + def response_hook(span: Span, status: str, response_headers: list): + if span and span.is_recording(): + if status.startswith("2"): + span.set_status(StatusCode.OK) + else: + span.set_status(StatusCode.ERROR, status) + + status = status.split(" ")[0] + status_code = int(status) + status_class = f"{status_code // 100}xx" + _http_response_counter.add(1, {"status_code": status_code, "status_class": status_class}) + + instrumentor = FlaskInstrumentor() + if dify_config.DEBUG: + logging.info("Initializing Flask instrumentor") + instrumentor.instrument_app(app, response_hook=response_hook) + + def init_sqlalchemy_instrumentor(app: DifyApp): + with app.app_context(): + engines = list(app.extensions["sqlalchemy"].engines.values()) + SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines) + + def setup_context_propagation(): + # Configure propagators + set_global_textmap( + CompositePropagator( + [ + TraceContextTextMapPropagator(), # W3C trace context + B3Format(), # B3 propagation (used by many systems) + ] ) ) - reader = PeriodicExportingMetricReader( - metric_exporter, - export_interval_millis=dify_config.OTEL_METRIC_EXPORT_INTERVAL, - export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT, - ) - set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader])) - if not is_celery_worker(): - init_flask_instrumentor(app) - CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument() - instrument_exception_logging() - init_sqlalchemy_instrumentor(app) - atexit.register(shutdown_tracer) + def shutdown_tracer(): + provider = trace.get_tracer_provider() + if hasattr(provider, "force_flush"): + provider.force_flush() -def is_celery_worker(): - return "celery" in sys.argv[0].lower() + class ExceptionLoggingHandler(logging.Handler): + """Custom logging handler that creates spans for logging.exception() calls""" + def emit(self, record): + try: + if record.exc_info: + tracer = get_tracer_provider().get_tracer("dify.exception.logging") + with tracer.start_as_current_span( + "log.exception", + attributes={ + "log.level": record.levelname, + "log.message": record.getMessage(), + "log.logger": record.name, + "log.file.path": record.pathname, + "log.file.line": record.lineno, + }, + ) as span: + span.set_status(StatusCode.ERROR) + span.record_exception(record.exc_info[1]) + span.set_attribute("exception.type", record.exc_info[0].__name__) + span.set_attribute("exception.message", str(record.exc_info[1])) + except Exception: + pass -def instrument_exception_logging(): - exception_handler = ExceptionLoggingHandler() - logging.getLogger().addHandler(exception_handler) - - -def init_flask_instrumentor(app: DifyApp): - meter = get_meter("http_metrics", version=dify_config.CURRENT_VERSION) - _http_response_counter = meter.create_counter( - "http.server.response.count", description="Total number of HTTP responses by status code", unit="{response}" + from opentelemetry import trace + from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + from opentelemetry.instrumentation.celery import CeleryInstrumentor + from opentelemetry.instrumentation.flask import FlaskInstrumentor + from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor + from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider + from opentelemetry.propagate import set_global_textmap + from opentelemetry.propagators.b3 import B3Format + from opentelemetry.propagators.composite import CompositePropagator + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + ConsoleSpanExporter, ) + from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio + from opentelemetry.semconv.resource import ResourceAttributes + from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider + from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + from opentelemetry.trace.status import StatusCode - def response_hook(span: Span, status: str, response_headers: list): - if span and span.is_recording(): - if status.startswith("2"): - span.set_status(StatusCode.OK) - else: - span.set_status(StatusCode.ERROR, status) + setup_context_propagation() + # Initialize OpenTelemetry + # Follow Semantic Convertions 1.32.0 to define resource attributes + resource = Resource( + attributes={ + ResourceAttributes.SERVICE_NAME: dify_config.APPLICATION_NAME, + ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.CURRENT_VERSION}-{dify_config.COMMIT_SHA}", + ResourceAttributes.PROCESS_PID: os.getpid(), + ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}", + ResourceAttributes.HOST_NAME: socket.gethostname(), + ResourceAttributes.HOST_ARCH: platform.machine(), + "custom.deployment.git_commit": dify_config.COMMIT_SHA, + ResourceAttributes.HOST_ID: platform.node(), + ResourceAttributes.OS_TYPE: platform.system().lower(), + ResourceAttributes.OS_DESCRIPTION: platform.platform(), + ResourceAttributes.OS_VERSION: platform.version(), + } + ) + sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE) + provider = TracerProvider(resource=resource, sampler=sampler) + set_tracer_provider(provider) + exporter: Union[OTLPSpanExporter, ConsoleSpanExporter] + metric_exporter: Union[OTLPMetricExporter, ConsoleMetricExporter] + if dify_config.OTEL_EXPORTER_TYPE == "otlp": + exporter = OTLPSpanExporter( + endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/traces", + headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"}, + ) + metric_exporter = OTLPMetricExporter( + endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/metrics", + headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"}, + ) + else: + # Fallback to console exporter + exporter = ConsoleSpanExporter() + metric_exporter = ConsoleMetricExporter() - status = status.split(" ")[0] - status_code = int(status) - status_class = f"{status_code // 100}xx" - _http_response_counter.add(1, {"status_code": status_code, "status_class": status_class}) - - instrumentor = FlaskInstrumentor() - if dify_config.DEBUG: - logging.info("Initializing Flask instrumentor") - instrumentor.instrument_app(app, response_hook=response_hook) - - -def init_sqlalchemy_instrumentor(app: DifyApp): - with app.app_context(): - engines = list(app.extensions["sqlalchemy"].engines.values()) - SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines) - - -def setup_context_propagation(): - # Configure propagators - set_global_textmap( - CompositePropagator( - [ - TraceContextTextMapPropagator(), # W3C trace context - B3Format(), # B3 propagation (used by many systems) - ] + provider.add_span_processor( + BatchSpanProcessor( + exporter, + max_queue_size=dify_config.OTEL_MAX_QUEUE_SIZE, + schedule_delay_millis=dify_config.OTEL_BATCH_EXPORT_SCHEDULE_DELAY, + max_export_batch_size=dify_config.OTEL_MAX_EXPORT_BATCH_SIZE, + export_timeout_millis=dify_config.OTEL_BATCH_EXPORT_TIMEOUT, ) ) + reader = PeriodicExportingMetricReader( + metric_exporter, + export_interval_millis=dify_config.OTEL_METRIC_EXPORT_INTERVAL, + export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT, + ) + set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader])) + if not is_celery_worker(): + init_flask_instrumentor(app) + CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument() + instrument_exception_logging() + init_sqlalchemy_instrumentor(app) + atexit.register(shutdown_tracer) + + +def is_enabled(): + return dify_config.ENABLE_OTEL @worker_init.connect(weak=False) def init_celery_worker(*args, **kwargs): - tracer_provider = get_tracer_provider() - metric_provider = get_meter_provider() - if dify_config.DEBUG: - logging.info("Initializing OpenTelemetry for Celery worker") - CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument() + if dify_config.ENABLE_OTEL: + from opentelemetry.instrumentation.celery import CeleryInstrumentor + from opentelemetry.metrics import get_meter_provider + from opentelemetry.trace import get_tracer_provider - -def shutdown_tracer(): - provider = trace.get_tracer_provider() - if hasattr(provider, "force_flush"): - provider.force_flush() + tracer_provider = get_tracer_provider() + metric_provider = get_meter_provider() + if dify_config.DEBUG: + logging.info("Initializing OpenTelemetry for Celery worker") + CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument() diff --git a/api/extensions/ext_otel_patch.py b/api/extensions/ext_otel_patch.py deleted file mode 100644 index 58309fe4d1..0000000000 --- a/api/extensions/ext_otel_patch.py +++ /dev/null @@ -1,63 +0,0 @@ -""" -Patch for OpenTelemetry context detach method to handle None tokens gracefully. - -This patch addresses the issue where OpenTelemetry's context.detach() method raises a TypeError -when called with a None token. The error occurs in the contextvars_context.py file where it tries -to call reset() on a None token. - -Related GitHub issue: https://github.com/langgenius/dify/issues/18496 - -Error being fixed: -``` -Traceback (most recent call last): - File "opentelemetry/context/__init__.py", line 154, in detach - _RUNTIME_CONTEXT.detach(token) - File "opentelemetry/context/contextvars_context.py", line 50, in detach - self._current_context.reset(token) # type: ignore - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -TypeError: expected an instance of Token, got None -``` - -Instead of modifying the third-party package directly, this patch monkey-patches the -context.detach method to gracefully handle None tokens. -""" - -import logging -from functools import wraps - -from opentelemetry import context - -logger = logging.getLogger(__name__) - -# Store the original detach method -original_detach = context.detach - - -# Create a patched version that handles None tokens -@wraps(original_detach) -def patched_detach(token): - """ - A patched version of context.detach that handles None tokens gracefully. - """ - if token is None: - logger.debug("Attempted to detach a None token, skipping") - return - - return original_detach(token) - - -def is_enabled(): - """ - Check if the extension is enabled. - Always enable this patch to prevent errors even when OpenTelemetry is disabled. - """ - return True - - -def init_app(app): - """ - Initialize the OpenTelemetry context patch. - """ - # Replace the original detach method with our patched version - context.detach = patched_detach - logger.info("OpenTelemetry context.detach patched to handle None tokens")