diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 4f8e9420..2cfaf4ff 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -20,6 +20,8 @@ Added (`#191`_, `@bersace`_) * Support for eagerly raising actor exceptions in the joining thread with the |StubBroker|. (`#195`_, `#203`_) +* Support for accessing the current message from an actor via + |CurrentMessage|. (`#208`_) .. _#179: https://github.com/Bogdanp/dramatiq/issues/179 .. _#183: https://github.com/Bogdanp/dramatiq/issues/183 @@ -27,6 +29,7 @@ Added .. _#191: https://github.com/Bogdanp/dramatiq/pull/191 .. _#195: https://github.com/Bogdanp/dramatiq/issues/195 .. _#203: https://github.com/Bogdanp/dramatiq/pull/203 +.. _#208: https://github.com/Bogdanp/dramatiq/issues/208 .. _@bersace: https://github.com/bersace .. _@davidt99: https://github.com/davidt99 .. _@xelhark: https://github.com/xelhark diff --git a/docs/source/global.rst b/docs/source/global.rst index b2d54e5e..10c2bdd5 100644 --- a/docs/source/global.rst +++ b/docs/source/global.rst @@ -4,6 +4,7 @@ .. |Brokers| replace:: :class:`Brokers` .. |Broker| replace:: :class:`Broker` .. |Callbacks| replace:: :class:`Callbacks` +.. |CurrentMessage| replace:: :class:`CurrentMessage` .. |DramatiqError| replace:: :class:`DramatiqError` .. |Encoders| replace:: :class:`Encoders` .. |GenericActors| replace:: :class:`class-based actors` diff --git a/docs/source/reference.rst b/docs/source/reference.rst index 263ce287..d339477b 100644 --- a/docs/source/reference.rst +++ b/docs/source/reference.rst @@ -77,6 +77,7 @@ The following middleware are all enabled by default. :member-order: bysource .. autoclass:: dramatiq.middleware.AgeLimit .. autoclass:: dramatiq.middleware.Callbacks +.. autoclass:: dramatiq.middleware.CurrentMessage .. autoclass:: dramatiq.middleware.Pipelines .. autoclass:: dramatiq.middleware.Prometheus .. autoclass:: dramatiq.middleware.Retries diff --git a/dramatiq/middleware/__init__.py b/dramatiq/middleware/__init__.py index bb18c8c2..36891394 100644 --- a/dramatiq/middleware/__init__.py +++ b/dramatiq/middleware/__init__.py @@ -19,6 +19,7 @@ from .age_limit import AgeLimit from .callbacks import Callbacks +from .current_message import CurrentMessage from .middleware import Middleware, MiddlewareError, SkipMessage from .pipelines import Pipelines from .retries import Retries @@ -40,7 +41,7 @@ "Interrupt", "raise_thread_exception", # Middlewares - "AgeLimit", "Callbacks", "Pipelines", "Retries", + "AgeLimit", "Callbacks", "CurrentMessage", "Pipelines", "Retries", "Shutdown", "ShutdownNotifications", "TimeLimit", "TimeLimitExceeded", ] diff --git a/dramatiq/middleware/current_message.py b/dramatiq/middleware/current_message.py new file mode 100644 index 00000000..05c94362 --- /dev/null +++ b/dramatiq/middleware/current_message.py @@ -0,0 +1,49 @@ +# This file is a part of Dramatiq. +# +# Copyright (C) 2019 CLEARTYPE SRL +# +# Dramatiq is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or (at +# your option) any later version. +# +# Dramatiq is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +# License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +from threading import local + +from .middleware import Middleware + + +class CurrentMessage(Middleware): + """Middleware that exposes the current message via a thread-local + variable. + + Example: + >>> import dramatiq + >>> from dramatiq.middleware import CurrentMessage + + >>> @dramatiq.actor + ... def example(x): + ... print(CurrentMessage.get_current_message()) + ... + >>> example.send(1) + + """ + + STATE = local() + + @classmethod + def get_current_message(cls): + return getattr(cls.STATE, "message", None) + + def before_process_message(self, broker, message): + setattr(self.STATE, "message", message) + + def after_process_message(self, broker, message, *, result=None, exception=None): + delattr(self.STATE, "message") diff --git a/tests/test_actors.py b/tests/test_actors.py index cb66bd40..cacc1207 100644 --- a/tests/test_actors.py +++ b/tests/test_actors.py @@ -6,7 +6,7 @@ import dramatiq from dramatiq import Message, Middleware from dramatiq.errors import RateLimitExceeded -from dramatiq.middleware import SkipMessage +from dramatiq.middleware import CurrentMessage, SkipMessage from .common import skip_on_pypy, worker @@ -554,3 +554,30 @@ def raise_rate_limit_exceeded(): # Then warning mock should be called with a special message warning_messages = [args[0] for _, args, _ in warning_mock.mock_calls] assert "Rate limit exceeded in message %s: %s." in warning_messages + + +def test_currrent_message_middleware_exposes_the_current_message(stub_broker, stub_worker): + # Given that I have a CurrentMessage middleware + stub_broker.add_middleware(CurrentMessage()) + + # And an actor that accesses the current message + sent_messages = [] + received_messages = [] + + @dramatiq.actor + def accessor(x): + received_messages.append(CurrentMessage.get_current_message()) + + # When I send it a couple messages + sent_messages.append(accessor.send(1)) + sent_messages.append(accessor.send(2)) + + # And wait for it to finish its work + stub_broker.join(accessor.queue_name) + + # Then the sent messages and the received messages should be the same + assert sent_messages == received_messages + + # When I try to access the current message from a non-worker thread + # Then I should get back None + assert CurrentMessage.get_current_message() is None