Skip to content

Commit

Permalink
code additions
Browse files Browse the repository at this point in the history
  • Loading branch information
twin-drill committed Aug 27, 2024
1 parent 7af99d0 commit d80b5ee
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 7 deletions.
100 changes: 96 additions & 4 deletions temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,22 @@
import concurrent.futures
import logging
import os
import sys
from datetime import timezone
from typing import Callable, Dict, List, MutableMapping, Optional, Sequence, Set, Type
from threading import get_ident
from types import TracebackType
from typing import (
Callable,
Dict,
List,
Literal,
MutableMapping,
Optional,
Sequence,
Set,
Tuple,
Type,
)

import temporalio.activity
import temporalio.api.common.v1
Expand Down Expand Up @@ -250,9 +264,10 @@ async def _handle_activation(
activate_task, self._deadlock_timeout_seconds
)
except asyncio.TimeoutError:
raise RuntimeError(
f"[TMPRL1101] Potential deadlock detected, workflow didn't yield within {self._deadlock_timeout_seconds} second(s)"
)
raise _DeadlockError.from_deadlocked_workflow(
workflow, self._deadlock_timeout_seconds
) from None

except Exception as err:
# We cannot fail a cache eviction, we must just log and not complete
# the activation (failed or otherwise). This should only happen in
Expand All @@ -268,6 +283,9 @@ async def _handle_activation(
self._could_not_evict_count += 1
return

if isinstance(err, _DeadlockError):
err.swap_traceback()

logger.exception(
"Failed handling activation on workflow with run ID %s", act.run_id
)
Expand Down Expand Up @@ -421,3 +439,77 @@ def nondeterminism_as_workflow_fail_for_types(self) -> Set[str]:
for typ in v.failure_exception_types
)
)


class _DeadlockError(Exception):
"""Exception class for deadlocks. Contains functionality to swap the default traceback for another."""

def __init__(self, message: str, replacement_tb: Optional[TracebackType] = None):
"""Create a new DeadlockError, with message `msg` and optionally a traceback `tb` to be swapped in later.
Args:
message: Message to be presented through exception.
replacement_tb: Optional TracebackType to be swapped later.
"""
super().__init__(message)
self._new_tb = replacement_tb

def swap_traceback(self) -> None:
"""Swap the current traceback for the replacement passed during construction. Used to work around Python adding the current frame to the stack trace.
Returns:
None
"""
if self._new_tb:
self.__traceback__ = self._new_tb
self._new_tb = None

@classmethod
def from_deadlocked_workflow(
cls, workflow: WorkflowInstance, timeout: Optional[int]
):
msg = f"[TMPRL1101] Potential deadlock detected: workflow didn't yield within {timeout} second(s)."
tid = workflow.get_thread_id()
if not tid:
return cls(msg)

try:
tb = cls._gen_tb_helper(tid)
if tb:
return cls(msg, tb)
return cls(f"{msg} (no frames available)")
except Exception as err:
return cls(f"{msg} (failed getting frames: {err})")

@staticmethod
def _gen_tb_helper(
tid: int,
) -> Optional[TracebackType]:
"""Take a thread id and construct a stack trace.
Returns:
<Optional[TracebackType]> the traceback that was constructed, None if the thread could not be found.
"""
frame = sys._current_frames().get(tid)
if not frame:
return None

# not using traceback.extract_stack() because it obfuscates the frame objects (specifically f_lasti)
thread_frames = [frame]
while frame.f_back:
frame = frame.f_back
thread_frames.append(frame)

thread_frames.reverse()

size = 0
tb = None
for frm in thread_frames:
tb = TracebackType(tb, frm, frm.f_lasti, frm.f_lineno)
size += sys.getsizeof(tb)

while size > 200000 and tb:
size -= sys.getsizeof(tb)
tb = tb.tb_next

return tb
18 changes: 18 additions & 0 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging
import random
import sys
import threading
import traceback
import warnings
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -158,6 +159,16 @@ def activate(
"""
raise NotImplementedError

def get_thread_id(self) -> Optional[int]:
"""Return the thread identifier that this workflow is running on.
Not an abstractmethod because it is not mandatory to implement. Used primarily for getting the frames of a deadlocked thread.
Returns:
Thread ID if the workflow is running, None if not.
"""
return None


class UnsandboxedWorkflowRunner(WorkflowRunner):
"""Workflow runner that does not do any sandboxing."""
Expand Down Expand Up @@ -300,6 +311,12 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
# We only create the metric meter lazily
self._metric_meter: Optional[_ReplaySafeMetricMeter] = None

# For tracking the thread this workflow is running on (primarily for deadlock situations)
self._current_thread_id: Optional[int] = None

def get_thread_id(self) -> Optional[int]:
return self._current_thread_id

#### Activation functions ####
# These are in alphabetical order and besides "activate", all other calls
# are "_apply_" + the job field name.
Expand All @@ -320,6 +337,7 @@ def activate(
self._time_ns = act.timestamp.ToNanoseconds()
self._is_replaying = act.is_replaying

self._current_thread_id = threading.get_ident()
activation_err: Optional[Exception] = None
try:
# Split into job sets with patches, then signals + updates, then
Expand Down
10 changes: 9 additions & 1 deletion temporalio/worker/workflow_sandbox/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

from __future__ import annotations

import threading
from datetime import datetime, timedelta, timezone
from typing import Any, Sequence, Type
from typing import Any, Optional, Sequence, Type

import temporalio.bridge.proto.workflow_activation
import temporalio.bridge.proto.workflow_completion
Expand Down Expand Up @@ -112,6 +113,8 @@ def __init__(
self.runner_class = runner_class
self.importer = Importer(restrictions, RestrictionContext())

self._current_thread_id: Optional[int] = None

# Create the instance
self.globals_and_locals = {
"__file__": "workflow_sandbox.py",
Expand Down Expand Up @@ -169,8 +172,13 @@ def _run_code(self, code: str, **extra_globals: Any) -> None:
self.globals_and_locals[k] = v
try:
temporalio.workflow.unsafe._set_in_sandbox(True)
self._current_thread_id = threading.get_ident()
exec(code, self.globals_and_locals, self.globals_and_locals)
finally:
temporalio.workflow.unsafe._set_in_sandbox(False)
self._current_thread_id = None
for k, v in extra_globals.items():
self.globals_and_locals.pop(k, None)

def get_thread_id(self) -> Optional[int]:
return self._current_thread_id
36 changes: 34 additions & 2 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2108,6 +2108,38 @@ async def status() -> str:


async def test_workflow_enhanced_stack_trace(client: Client):
"""Expected format of __enhanced_stack_trace:
EnhancedStackTrace : {
sdk (StackTraceSDKInfo) : {
name: string,
version: string
},
sources (map<string, StackTraceFileSlice>) : {
filename: (StackTraceFileSlice) {
line_offset: int,
content: string
},
...
},
stacks (StackTrace[]) : [
(StackTraceFileLocation) {
file_path: string,
line: int,
column: int,
function_name: string,
internal_code: bool
},
...
]
}
More details available in API repository: temporal/api/sdk/v1/enhanced_stack_trace.proto
"""

async with new_worker(
client, StackTraceWorkflow, LongSleepWorkflow, activities=[wait_cancel]
) as worker:
Expand Down Expand Up @@ -2570,7 +2602,7 @@ async def last_history_task_failure() -> str:

try:
await assert_eq_eventually(
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)",
"[TMPRL1101] Potential deadlock detected: workflow didn't yield within 1 second(s).",
last_history_task_failure,
timeout=timedelta(seconds=5),
interval=timedelta(seconds=1),
Expand Down Expand Up @@ -2627,7 +2659,7 @@ async def last_history_task_failure() -> str:
return "<no failure>"

await assert_eq_eventually(
"[TMPRL1101] Potential deadlock detected, workflow didn't yield within 1 second(s)",
"[TMPRL1101] Potential deadlock detected: workflow didn't yield within 1 second(s).",
last_history_task_failure,
timeout=timedelta(seconds=5),
interval=timedelta(seconds=1),
Expand Down

0 comments on commit d80b5ee

Please sign in to comment.