Skip to content

Commit

Permalink
Make sure each task that is started by Celery Beat has its own trace. (
Browse files Browse the repository at this point in the history
…#2249)

When tasks are started by Celery Beat they should not inherit the trace from the starting code (which is Celery Beat) but get their own trace.
  • Loading branch information
antonpirker committed Jul 13, 2023
1 parent 093003f commit d586149
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 27 deletions.
46 changes: 25 additions & 21 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,30 +462,34 @@ def sentry_apply_entry(*args, **kwargs):
if match_regex_list(monitor_name, integration.exclude_beat_tasks):
return original_apply_entry(*args, **kwargs)

monitor_config = _get_monitor_config(celery_schedule, app)

is_supported_schedule = bool(monitor_config)
if is_supported_schedule:
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)
with hub.configure_scope() as scope:
# When tasks are started from Celery Beat, make sure each task has its own trace.
scope.set_new_propagation_context()

monitor_config = _get_monitor_config(celery_schedule, app)

is_supported_schedule = bool(monitor_config)
if is_supported_schedule:
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)

check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})
check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})

# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers
# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers

return original_apply_entry(*args, **kwargs)
return original_apply_entry(*args, **kwargs)

Scheduler.apply_entry = sentry_apply_entry

Expand Down
21 changes: 15 additions & 6 deletions sentry_sdk/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,23 @@ def _create_new_propagation_context(self):
"dynamic_sampling_context": None,
}

def set_new_propagation_context(self):
# type: () -> None
"""
Creates a new propagation context and sets it as `_propagation_context`. Overwriting existing one.
"""
self._propagation_context = self._create_new_propagation_context()
logger.debug(
"[Tracing] Create new propagation context: %s",
self._propagation_context,
)

def generate_propagation_context(self, incoming_data=None):
# type: (Optional[Dict[str, str]]) -> None
"""
Populates `_propagation_context`. Either from `incoming_data` or with a new propagation context.
Makes sure `_propagation_context` is set.
If there is `incoming_data` overwrite existing `_propagation_context`.
if there is no `incoming_data` create new `_propagation_context`, but do NOT overwrite if already existing.
"""
if incoming_data:
context = self._extract_propagation_context(incoming_data)
Expand All @@ -212,11 +225,7 @@ def generate_propagation_context(self, incoming_data=None):
)

if self._propagation_context is None:
self._propagation_context = self._create_new_propagation_context()
logger.debug(
"[Tracing] Create new propagation context: %s",
self._propagation_context,
)
self.set_new_propagation_context()

def get_dynamic_sampling_context(self):
# type: () -> Optional[Dict[str, str]]
Expand Down

0 comments on commit d586149

Please sign in to comment.