Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys
from collections.abc import Mapping
from functools import wraps
from urllib.parse import urlparse

import sentry_sdk
from sentry_sdk import isolation_scope
Expand All @@ -12,7 +13,10 @@
_patch_redbeat_apply_async,
_setup_celery_beat_signals,
)
from sentry_sdk.integrations.celery.utils import _now_seconds_since_epoch
from sentry_sdk.integrations.celery.utils import (
_now_seconds_since_epoch,
_get_driver_type_from_url_scheme,
)
from sentry_sdk.integrations.logging import ignore_logger
from sentry_sdk.tracing import BAGGAGE_HEADER_NAME, TransactionSource
from sentry_sdk.tracing_utils import Baggage
Expand Down Expand Up @@ -392,11 +396,12 @@ def _inner(*args: "Any", **kwargs: "Any") -> "Any":
)

with capture_internal_exceptions():
with task.app.connection() as conn:
span.set_data(
SPANDATA.MESSAGING_SYSTEM,
conn.transport.driver_type,
)
task_broker_url = task.app.conf.broker_url

scheme = urlparse(task_broker_url).scheme
driver_type = _get_driver_type_from_url_scheme(scheme)

span.set_data(SPANDATA.MESSAGING_SYSTEM, driver_type)

return f(*args, **kwargs)
except Exception:
Expand Down
28 changes: 28 additions & 0 deletions sentry_sdk/integrations/celery/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ def _get_humanized_interval(seconds: float) -> "Tuple[int, MonitorConfigSchedule
return (int(seconds), "second")


def _get_driver_type_from_url_scheme(scheme) -> str:
# Source: https://github.com/celery/kombu/blob/5cbdaf97131d8a3de005f1355b567b0d60224829/kombu/transport/__init__.py#L21-L48
url_scheme_to_driver_type_mapping = {
"amqp": "amqp",
"amqps": "amqp",
"confluentkafka": "kafka",
"kafka": "kafka",
"memory": "memory",
"redis": "redis",
"rediss": "redis",
"SQS": "sqs",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uppercase SQS mapping key is unreachable dead code

Low Severity

The "SQS" (uppercase) key in url_scheme_to_driver_type_mapping can never be matched. Python's urlparse normalizes schemes to lowercase per RFC 3986, so urlparse("SQS://...").scheme returns "sqs". The lowercase "sqs" key already handles this correctly, making the uppercase entry dead code that may mislead future maintainers.

Fix in Cursor Fix in Web

"sqs": "sqs",
"mongodb": "mongodb",
"zookeeper": "zookeeper",
"filesystem": "filesystem",
"qpid": "qpid",
"sentinel": "redis",
"consul": "consul",
"etcd": "etcd",
"pyro": "pyro",
"gcpubsub": "gcpubsub",
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mapping missing several valid Kombu transport URL schemes

Medium Severity

The url_scheme_to_driver_type_mapping is incomplete compared to the Kombu TRANSPORT_ALIASES it references in the source comment. It's missing at least pyamqp (→ "amqp"), librabbitmq (→ "amqp"), azureservicebus (→ "azureservicebus"), and azurestoragequeues (→ "azurestoragequeues"). Users with these broker URL schemes will get "N/A" instead of the correct driver_type, which is a regression from the old conn.transport.driver_type approach.

Fix in Cursor Fix in Web


# "N/A" is set as the fallback value in Kombu when a `driver_type`
# is not set on the Kombu Transport class itself.
return url_scheme_to_driver_type_mapping.get(scheme, "N/A")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

URL schemes with + modifier not handled correctly

Medium Severity

Kombu supports URL schemes with + modifiers like redis+socket:///path/to/socket for Redis over Unix sockets. Python's urlparse returns the full scheme (e.g., "redis+socket"), which doesn't match any key in url_scheme_to_driver_type_mapping, causing _get_driver_type_from_url_scheme to incorrectly return "N/A" instead of "redis". The old code using conn.transport.driver_type correctly resolved these to the base transport's driver_type. The scheme needs to be split on + and the base part used for lookup.

Additional Locations (1)

Fix in Cursor Fix in Web



class NoOpMgr:
def __enter__(self) -> None:
return None
Expand Down
15 changes: 12 additions & 3 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,8 +689,17 @@ def task(): ...
assert span["data"]["messaging.message.retry.count"] == 3


@pytest.mark.parametrize("system", ("redis", "amqp"))
def test_messaging_system(system, init_celery, capture_events):
@pytest.mark.parametrize(
"system,expected_driver_type",
[
("redis", "redis"),
("rediss", "redis"),
("amqp", "amqp"),
("amqps", "amqp"),
("slmq", "N/A"),
],
)
def test_messaging_system(system, expected_driver_type, init_celery, capture_events):
celery = init_celery(enable_tracing=True)
events = capture_events()

Expand All @@ -704,7 +713,7 @@ def task(): ...

(event,) = events
(span,) = event["spans"]
assert span["data"]["messaging.system"] == system
assert span["data"]["messaging.system"] == expected_driver_type


@pytest.mark.parametrize("system", ("amqp", "redis"))
Expand Down
Loading