diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 1b1eb4f380..8fc450f6ea 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -1,6 +1,7 @@ import sys from collections.abc import Mapping from functools import wraps +from urllib.parse import urlparse import sentry_sdk from sentry_sdk import isolation_scope @@ -12,7 +13,10 @@ _patch_redbeat_apply_async, _setup_celery_beat_signals, ) -from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch +from sentry_sdk.integrations.celery.utils import ( + _now_seconds_since_epoch, + _get_driver_type_from_url_scheme, +) from sentry_sdk.integrations.logging import ignore_logger from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, TransactionSource from sentry_sdk.tracing_utils import Baggage @@ -392,11 +396,12 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any": ) with capture_internal_exceptions(): - with task.app.connection() as conn: - span.set_data( - SPANDATA.MESSAGING_SYSTEM, - conn.transport.driver_type, - ) + task_broker_url = task.app.conf.broker_url + + scheme = urlparse(task_broker_url).scheme + driver_type = _get_driver_type_from_url_scheme(scheme) + + span.set_data(SPANDATA.MESSAGING_SYSTEM, driver_type) return f(*args, **kwargs) except Exception: diff --git a/sentry_sdk/integrations/celery/utils.py b/sentry_sdk/integrations/celery/utils.py index f9378558c1..69ceb3613e 100644 --- a/sentry_sdk/integrations/celery/utils.py +++ b/sentry_sdk/integrations/celery/utils.py @@ -31,6 +31,34 @@ def _get_humanized_interval(seconds: float) -> "Tuple[int, MonitorConfigSchedule return (int(seconds), "second") +def _get_driver_type_from_url_scheme(scheme) -> str: + # Source: https://github.com/celery/kombu/blob/5cbdaf97131d8a3de005f1355b567b0d60224829/kombu/transport/__init__.py#L21-L48 + url_scheme_to_driver_type_mapping = { + "amqp": "amqp", + "amqps": "amqp", + "confluentkafka": "kafka", + "kafka": "kafka", + "memory": "memory", + "redis": "redis", + "rediss": "redis", + "SQS": "sqs", + "sqs": "sqs", + "mongodb": "mongodb", + "zookeeper": "zookeeper", + "filesystem": "filesystem", + "qpid": "qpid", + "sentinel": "redis", + "consul": "consul", + "etcd": "etcd", + "pyro": "pyro", + "gcpubsub": "gcpubsub", + } + + # "N/A" is set as the fallback value in Kombu when a `driver_type` + # is not set on the Kombu Transport class itself. + return url_scheme_to_driver_type_mapping.get(scheme, "N/A") + + class NoOpMgr: def __enter__(self) -> None: return None diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index b8fc2bb3e8..cf5f71aef5 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -689,8 +689,17 @@ def task(): ... assert span["data"]["messaging.message.retry.count"] == 3 -@pytest.mark.parametrize("system", ("redis", "amqp")) -def test_messaging_system(system, init_celery, capture_events): +@pytest.mark.parametrize( + "system,expected_driver_type", + [ + ("redis", "redis"), + ("rediss", "redis"), + ("amqp", "amqp"), + ("amqps", "amqp"), + ("slmq", "N/A"), + ], +) +def test_messaging_system(system, expected_driver_type, init_celery, capture_events): celery = init_celery(enable_tracing=True) events = capture_events() @@ -704,7 +713,7 @@ def task(): ... (event,) = events (span,) = event["spans"] - assert span["data"]["messaging.system"] == system + assert span["data"]["messaging.system"] == expected_driver_type @pytest.mark.parametrize("system", ("amqp", "redis"))