From bfd6524844b15333b902985aa75bea9b6fbce9eb Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Thu, 12 Feb 2026 14:54:46 -0500 Subject: [PATCH 1/2] fix(celery): Get `driver_type` from broker's URL Under the hood within Kombu (the messaging library used by Celery), the `driver_type` is ultimately derived from the broker's URL that's provided in the Celery configuration by the user via either the `broker` or `broker_url` arguments. We can accomplish the same thing in our context, removing the need to open a connection to the broker to determine the `driver_type` to set in the span. Fixes PY-2071 Fixes https://github.com/getsentry/sentry-python/issues/5428 --- sentry_sdk/integrations/celery/__init__.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 1b1eb4f380..ec1cce6983 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -1,6 +1,7 @@ import sys from collections.abc import Mapping from functools import wraps +from urllib.parse import urlparse import sentry_sdk from sentry_sdk import isolation_scope @@ -392,11 +393,9 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": ) with capture_internal_exceptions(): - with task.app.connection() as conn: - span.set_data( - SPANDATA.MESSAGING_SYSTEM, - conn.transport.driver_type, - ) + task_broker_url = task.app.conf.broker_url + driver_type = urlparse(task_broker_url).scheme + span.set_data(SPANDATA.MESSAGING_SYSTEM, driver_type) return f(*args, **kwargs) except Exception: From cfc0a1d36beee11ad236ef034c8824d3de32edba Mon Sep 17 00:00:00 2001 From: Erica Pisani Date: Fri, 13 Feb 2026 09:37:16 -0500 Subject: [PATCH 2/2] Fix issue that cursor bot found with SSL variants. Uses a subset of mapping that Kombu uses, along with the default `driver_type` value that it uses when Kombu hasn't explicitly set one for a given Transport class (like SLMQ). --- sentry_sdk/integrations/celery/__init__.py | 10 ++++++-- sentry_sdk/integrations/celery/utils.py | 28 ++++++++++++++++++++++ tests/integrations/celery/test_celery.py | 15 +++++++++--- 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index ec1cce6983..8fc450f6ea 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -13,7 +13,10 @@ _patch_redbeat_apply_async, _setup_celery_beat_signals, ) -from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch +from sentry_sdk.integrations.celery.utils import ( + _now_seconds_since_epoch, + _get_driver_type_from_url_scheme, +) from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, TransactionSource from sentry_sdk.tracing_utils import Baggage @@ -394,7 +397,10 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": with capture_internal_exceptions(): task_broker_url = task.app.conf.broker_url - driver_type = urlparse(task_broker_url).scheme + + scheme = urlparse(task_broker_url).scheme + driver_type = _get_driver_type_from_url_scheme(scheme) + span.set_data(SPANDATA.MESSAGING_SYSTEM, driver_type) return f(*args, **kwargs) diff --git a/sentry_sdk/integrations/celery/utils.py b/sentry_sdk/integrations/celery/utils.py index f9378558c1..69ceb3613e 100644 --- a/sentry_sdk/integrations/celery/utils.py +++ b/sentry_sdk/integrations/celery/utils.py @@ -31,6 +31,34 @@ def _get_humanized_interval(seconds: float) -> "Tuple[int, MonitorConfigSchedule return (int(seconds), "second") +def _get_driver_type_from_url_scheme(scheme) -> str: + # Source: https://github.com/celery/kombu/blob/5cbdaf97131d8a3de005f1355b567b0d60224829/kombu/transport/__init__.py#L21-L48 + url_scheme_to_driver_type_mapping = { + "amqp": "amqp", + "amqps": "amqp", + "confluentkafka": "kafka", + "kafka": "kafka", + "memory": "memory", + "redis": "redis", + "rediss": "redis", + "SQS": "sqs", + "sqs": "sqs", + "mongodb": "mongodb", + "zookeeper": "zookeeper", + "filesystem": "filesystem", + "qpid": "qpid", + "sentinel": "redis", + "consul": "consul", + "etcd": "etcd", + "pyro": "pyro", + "gcpubsub": "gcpubsub", + } + + # "N/A" is set as the fallback value in Kombu when a `driver_type` + # is not set on the Kombu Transport class itself. + return url_scheme_to_driver_type_mapping.get(scheme, "N/A") + + class NoOpMgr: def __enter__(self) -> None: return None diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index b8fc2bb3e8..cf5f71aef5 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -689,8 +689,17 @@ def task(): ... assert span["data"]["messaging.message.retry.count"] == 3 -@pytest.mark.parametrize("system", ("redis", "amqp")) -def test_messaging_system(system, init_celery, capture_events): +@pytest.mark.parametrize( + "system,expected_driver_type", + [ + ("redis", "redis"), + ("rediss", "redis"), + ("amqp", "amqp"), + ("amqps", "amqp"), + ("slmq", "N/A"), + ], +) +def test_messaging_system(system, expected_driver_type, init_celery, capture_events): celery = init_celery(enable_tracing=True) events = capture_events() @@ -704,7 +713,7 @@ def task(): ... (event,) = events (span,) = event["spans"] - assert span["data"]["messaging.system"] == system + assert span["data"]["messaging.system"] == expected_driver_type @pytest.mark.parametrize("system", ("amqp", "redis"))