Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throws to actors for handling expected exceptions #303

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions dramatiq/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import inspect
import re
import time

Expand All @@ -39,9 +40,22 @@ class Actor:
priority(int): The actor's priority.
options(dict): Arbitrary options that are passed to the broker
and middleware.
throws(BaseException subclass or tuple of such):
One or more expected exceptions.
"""

def __init__(self, fn, *, broker, actor_name, queue_name, priority, options):
def __init__(self, fn, *, broker, actor_name, queue_name, priority, options, throws):
if throws is not None:
# Validate throws can be used in an except statement
def is_exception(exc):
return inspect.isclass(exc) and issubclass(exc, BaseException)
if (
not (
is_exception(throws) or
(isinstance(throws, tuple) and all([is_exception(exc) for exc in throws]))
)
):
raise TypeError("'throws' must be a subclass of BaseException or a tuple of such.")
self.logger = get_logger(fn.__module__, actor_name)
self.fn = fn
self.broker = broker
Expand All @@ -50,6 +64,7 @@ def __init__(self, fn, *, broker, actor_name, queue_name, priority, options):
self.priority = priority
self.options = options
self.broker.declare_actor(self)
self.throws = throws

def message(self, *args, **kwargs):
"""Build a message. This method is useful if you want to
Expand Down Expand Up @@ -162,7 +177,8 @@ def __str__(self):
return "Actor(%(actor_name)s)" % vars(self)


def actor(fn=None, *, actor_class=Actor, actor_name=None, queue_name="default", priority=0, broker=None, **options):
def actor(fn=None, *, actor_class=Actor, actor_name=None, queue_name="default", priority=0, broker=None, throws=None,
**options):
"""Declare an actor.

Examples:
Expand Down Expand Up @@ -199,6 +215,8 @@ def actor(fn=None, *, actor_class=Actor, actor_name=None, queue_name="default",
priority than the other then it will be processed first.
Lower numbers represent higher priorities.
broker(Broker): The broker to use with this actor.
throws(BaseException subclass or tuple of such):
One or more expected exceptions.
**options(dict): Arbitrary options that vary with the set of
middleware that you use. See ``get_broker().actor_options``.

Expand Down Expand Up @@ -226,6 +244,7 @@ def decorator(fn):
return actor_class(
fn, actor_name=actor_name, queue_name=queue_name,
priority=priority, broker=broker, options=options,
throws=throws,
)

if fn is None:
Expand Down
4 changes: 4 additions & 0 deletions dramatiq/middleware/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
retries = message.options.setdefault("retries", 0)
max_retries = message.options.get("max_retries") or actor.options.get("max_retries", self.max_retries)
retry_when = actor.options.get("retry_when", self.retry_when)
if actor.throws and isinstance(exception, actor.throws):
self.logger.info("Aborting message %r.", message.message_id)
message.fail()
return
if retry_when is not None and not retry_when(retries, exception) or \
retry_when is None and max_retries is not None and retries >= max_retries:
self.logger.warning("Retries exceeded for message %r.", message.message_id)
Expand Down
2 changes: 2 additions & 0 deletions dramatiq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ def process_message(self, message):

if isinstance(e, RateLimitExceeded):
self.logger.debug("Rate limit exceeded in message %s: %s.", message, e)
elif actor.throws and isinstance(e, actor.throws):
self.logger.info("Failed to process message %s with expected exception %s.", message, type(e).__name__)
elif not isinstance(e, Retry):
self.logger.error("Failed to process message %s with unhandled exception.", message, exc_info=True)

Expand Down
87 changes: 87 additions & 0 deletions tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,3 +626,90 @@ def accessor(x):
# When I try to access the current message from a non-worker thread
# Then I should get back None
assert CurrentMessage.get_current_message() is None


def test_actor_throws_can_be_assigned_with_class(stub_broker, stub_worker):
# Given I have defined an exception
class MyException(Exception):
pass

# And I define an actor with that exception passed to throws
@dramatiq.actor(throws=MyException)
def add(x, y):
return x + y

# I expect throws to be equal to MyException
assert add.throws == MyException


def test_actor_throws_can_be_assigned_with_tuple(stub_broker, stub_worker):
# Given I have defined an exception
class MyException(Exception):
pass

# And I have defined a second exception
class MyException2(Exception):
pass

# And I define an actor with that exception passed to throws
@dramatiq.actor(throws=(MyException, MyException2))
def add(x, y):
return x + y

# I expect throws to be equal to (MyException, MyException2)
assert add.throws == (MyException, MyException2)


def test_actor_throws_validates_throws(stub_broker, stub_worker):
# If I define an actor with invalid throws argument
# I expect tha to fail
with pytest.raises(TypeError):
@dramatiq.actor(throws=("bla",))
def add(x, y):
return x + y


def test_actor_with_throws_logs_info_and_does_not_retry(stub_broker, stub_worker):
# Given that I have a database
retries = []

# And an exception that will be expected to thrown
class MyException(Exception):
pass

# And an actor that raises MyException
@dramatiq.actor(throws=MyException)
def do_work():
if sum(retries) == 0:
raise MyException("Expected Failure")
else:
retries.append(1)

# And that I've mocked the logging classes
with patch("logging.Logger.error") as error_mock, \
patch("logging.Logger.warning") as warning_mock, patch("logging.Logger.info") as info_mock:
# When I send that actor a message
do_work.send()

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_worker.join()

# Then no error should be logged
error_messages = [args[0] for _, args, _ in error_mock.mock_calls]
assert error_messages == []

# And no warnings should be logged
warnings = [args[0] for _, args, _ in warning_mock.mock_calls]
assert warnings == []

# And info mock should contain two messages
info_messages = [args[0] for _, args, _ in info_mock.mock_calls]
expected_messages = [
"Failed to process message %s with expected exception %s.",
"Aborting message %r."
]
assert info_messages == expected_messages

# And no retries should be made
assert sum(retries) == 0