From 0a0b6eafed26eb4ec7cdfc5e402df05009b32f88 Mon Sep 17 00:00:00 2001 From: Takis Panagopoulos Date: Tue, 7 Apr 2020 01:02:06 +0300 Subject: [PATCH 1/2] Add throws to actors for handlind expected exceptions --- dramatiq/actor.py | 22 ++++++++- dramatiq/middleware/retries.py | 4 ++ dramatiq/worker.py | 2 + tests/test_actors.py | 87 ++++++++++++++++++++++++++++++++++ 4 files changed, 113 insertions(+), 2 deletions(-) diff --git a/dramatiq/actor.py b/dramatiq/actor.py index 88077116..be6b1963 100644 --- a/dramatiq/actor.py +++ b/dramatiq/actor.py @@ -39,9 +39,22 @@ class Actor: priority(int): The actor's priority. options(dict): Arbitrary options that are passed to the broker and middleware. + throws(BaseException subclass or tuple of such): + One or more expected exceptions. """ - def __init__(self, fn, *, broker, actor_name, queue_name, priority, options): + def __init__(self, fn, *, broker, actor_name, queue_name, priority, options, throws): + if throws: + # Validate throws can be used in an except statement + try: + try: + raise Exception() + except throws: + pass + except Exception: + pass + except TypeError: + raise TypeError("'throws' must be a subclass of BaseException or a tuple of such.") self.logger = get_logger(fn.__module__, actor_name) self.fn = fn self.broker = broker @@ -50,6 +63,7 @@ def __init__(self, fn, *, broker, actor_name, queue_name, priority, options): self.priority = priority self.options = options self.broker.declare_actor(self) + self.throws = throws def message(self, *args, **kwargs): """Build a message. This method is useful if you want to @@ -162,7 +176,8 @@ def __str__(self): return "Actor(%(actor_name)s)" % vars(self) -def actor(fn=None, *, actor_class=Actor, actor_name=None, queue_name="default", priority=0, broker=None, **options): +def actor(fn=None, *, actor_class=Actor, actor_name=None, queue_name="default", priority=0, broker=None, throws=None, + **options): """Declare an actor. Examples: @@ -199,6 +214,8 @@ def actor(fn=None, *, actor_class=Actor, actor_name=None, queue_name="default", priority than the other then it will be processed first. Lower numbers represent higher priorities. broker(Broker): The broker to use with this actor. + throws(BaseException subclass or tuple of such): + One or more expected exceptions. **options(dict): Arbitrary options that vary with the set of middleware that you use. See ``get_broker().actor_options``. @@ -226,6 +243,7 @@ def decorator(fn): return actor_class( fn, actor_name=actor_name, queue_name=queue_name, priority=priority, broker=broker, options=options, + throws=throws, ) if fn is None: diff --git a/dramatiq/middleware/retries.py b/dramatiq/middleware/retries.py index 9bdd1216..55f66a17 100644 --- a/dramatiq/middleware/retries.py +++ b/dramatiq/middleware/retries.py @@ -74,6 +74,10 @@ def after_process_message(self, broker, message, *, result=None, exception=None) retries = message.options.setdefault("retries", 0) max_retries = message.options.get("max_retries") or actor.options.get("max_retries", self.max_retries) retry_when = actor.options.get("retry_when", self.retry_when) + if actor.throws and isinstance(exception, actor.throws): + self.logger.info("Aborting message %r.", message.message_id) + message.fail() + return if retry_when is not None and not retry_when(retries, exception) or \ retry_when is None and max_retries is not None and retries >= max_retries: self.logger.warning("Retries exceeded for message %r.", message.message_id) diff --git a/dramatiq/worker.py b/dramatiq/worker.py index 15dc97d4..821f7b22 100644 --- a/dramatiq/worker.py +++ b/dramatiq/worker.py @@ -484,6 +484,8 @@ def process_message(self, message): if isinstance(e, RateLimitExceeded): self.logger.debug("Rate limit exceeded in message %s: %s.", message, e) + elif actor.throws and isinstance(e, actor.throws): + self.logger.info("Failed to process message %s with expected exception %s.", message, type(e).__name__) elif not isinstance(e, Retry): self.logger.error("Failed to process message %s with unhandled exception.", message, exc_info=True) diff --git a/tests/test_actors.py b/tests/test_actors.py index 627bbc61..07041dc1 100644 --- a/tests/test_actors.py +++ b/tests/test_actors.py @@ -626,3 +626,90 @@ def accessor(x): # When I try to access the current message from a non-worker thread # Then I should get back None assert CurrentMessage.get_current_message() is None + + +def test_actor_throws_can_be_assigned_with_class(stub_broker, stub_worker): + # Given I have defined an exception + class MyException(Exception): + pass + + # And I define an actor with that exception passed to throws + @dramatiq.actor(throws=MyException) + def add(x, y): + return x + y + + # I expect throws to be equal to MyException + assert add.throws == MyException + + +def test_actor_throws_can_be_assigned_with_tuple(stub_broker, stub_worker): + # Given I have defined an exception + class MyException(Exception): + pass + + # And I have defined a second exception + class MyException2(Exception): + pass + + # And I define an actor with that exception passed to throws + @dramatiq.actor(throws=(MyException, MyException2)) + def add(x, y): + return x + y + + # I expect throws to be equal to (MyException, MyException2) + assert add.throws == (MyException, MyException2) + + +def test_actor_throws_validates_throws(stub_broker, stub_worker): + # If I define an actor with invalid throws argument + # I expect tha to fail + with pytest.raises(TypeError): + @dramatiq.actor(throws=("bla",)) + def add(x, y): + return x + y + + +def test_actor_with_throws_logs_info_and_does_not_retry(stub_broker, stub_worker): + # Given that I have a database + retries = [] + + # And an exception that will be expected to thrown + class MyException(Exception): + pass + + # And an actor that raises MyException + @dramatiq.actor(throws=MyException) + def do_work(): + if sum(retries) == 0: + raise MyException("Expected Failure") + else: + retries.append(1) + + # And that I've mocked the logging classes + with patch("logging.Logger.error") as error_mock, \ + patch("logging.Logger.warning") as warning_mock, patch("logging.Logger.info") as info_mock: + # When I send that actor a message + do_work.send() + + # And join on the queue + stub_broker.join(do_work.queue_name) + stub_worker.join() + + # Then no error should be logged + error_messages = [args[0] for _, args, _ in error_mock.mock_calls] + assert error_messages == [] + + # And no warnings should be logged + warnings = [args[0] for _, args, _ in warning_mock.mock_calls] + assert warnings == [] + + # And info mock should contain two messages + info_messages = [args[0] for _, args, _ in info_mock.mock_calls] + expected_messages = [ + "Failed to process message %s with expected exception %s.", + "Aborting message %r." + ] + assert info_messages == expected_messages + + # And no retries should be made + assert sum(retries) == 0 From 5e5172cbdd074ce43265667f56772bfd038ca3b3 Mon Sep 17 00:00:00 2001 From: Takis Panagopoulos Date: Fri, 10 Apr 2020 23:18:22 +0300 Subject: [PATCH 2/2] Add a more straightforward validation of throws arg --- dramatiq/actor.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/dramatiq/actor.py b/dramatiq/actor.py index be6b1963..acb1713a 100644 --- a/dramatiq/actor.py +++ b/dramatiq/actor.py @@ -15,6 +15,7 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program. If not, see . +import inspect import re import time @@ -44,16 +45,16 @@ class Actor: """ def __init__(self, fn, *, broker, actor_name, queue_name, priority, options, throws): - if throws: + if throws is not None: # Validate throws can be used in an except statement - try: - try: - raise Exception() - except throws: - pass - except Exception: - pass - except TypeError: + def is_exception(exc): + return inspect.isclass(exc) and issubclass(exc, BaseException) + if ( + not ( + is_exception(throws) or + (isinstance(throws, tuple) and all([is_exception(exc) for exc in throws])) + ) + ): raise TypeError("'throws' must be a subclass of BaseException or a tuple of such.") self.logger = get_logger(fn.__module__, actor_name) self.fn = fn