Skip to content

Commit

Permalink
feat(middleware): add CurrentMessage middleware
Browse files Browse the repository at this point in the history
Closes #208
  • Loading branch information
Bogdanp committed Apr 15, 2019
1 parent d7da3c7 commit 48e59ec
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 2 deletions.
3 changes: 3 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ Added
(`#191`_, `@bersace`_)
* Support for eagerly raising actor exceptions in the joining thread
with the |StubBroker|. (`#195`_, `#203`_)
* Support for accessing the current message from an actor via
|CurrentMessage|. (`#208`_)

.. _#179: https://github.com/Bogdanp/dramatiq/issues/179
.. _#183: https://github.com/Bogdanp/dramatiq/issues/183
.. _#184: https://github.com/Bogdanp/dramatiq/issues/184
.. _#191: https://github.com/Bogdanp/dramatiq/pull/191
.. _#195: https://github.com/Bogdanp/dramatiq/issues/195
.. _#203: https://github.com/Bogdanp/dramatiq/pull/203
.. _#208: https://github.com/Bogdanp/dramatiq/issues/208
.. _@bersace: https://github.com/bersace
.. _@davidt99: https://github.com/davidt99
.. _@xelhark: https://github.com/xelhark
Expand Down
1 change: 1 addition & 0 deletions docs/source/global.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
.. |Brokers| replace:: :class:`Brokers<dramatiq.Broker>`
.. |Broker| replace:: :class:`Broker<dramatiq.Broker>`
.. |Callbacks| replace:: :class:`Callbacks<dramatiq.middleware.Callbacks>`
.. |CurrentMessage| replace:: :class:`CurrentMessage<dramatiq.middleware.CurrentMessage>`
.. |DramatiqError| replace:: :class:`DramatiqError<dramatiq.DramatiqError>`
.. |Encoders| replace:: :class:`Encoders<dramatiq.Encoder>`
.. |GenericActors| replace:: :class:`class-based actors<dramatiq.GenericActor>`
Expand Down
1 change: 1 addition & 0 deletions docs/source/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ The following middleware are all enabled by default.
:member-order: bysource
.. autoclass:: dramatiq.middleware.AgeLimit
.. autoclass:: dramatiq.middleware.Callbacks
.. autoclass:: dramatiq.middleware.CurrentMessage
.. autoclass:: dramatiq.middleware.Pipelines
.. autoclass:: dramatiq.middleware.Prometheus
.. autoclass:: dramatiq.middleware.Retries
Expand Down
3 changes: 2 additions & 1 deletion dramatiq/middleware/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from .age_limit import AgeLimit
from .callbacks import Callbacks
from .current_message import CurrentMessage
from .middleware import Middleware, MiddlewareError, SkipMessage
from .pipelines import Pipelines
from .retries import Retries
Expand All @@ -40,7 +41,7 @@
"Interrupt", "raise_thread_exception",

# Middlewares
"AgeLimit", "Callbacks", "Pipelines", "Retries",
"AgeLimit", "Callbacks", "CurrentMessage", "Pipelines", "Retries",
"Shutdown", "ShutdownNotifications", "TimeLimit", "TimeLimitExceeded",
]

Expand Down
49 changes: 49 additions & 0 deletions dramatiq/middleware/current_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# This file is a part of Dramatiq.
#
# Copyright (C) 2019 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from threading import local

from .middleware import Middleware


class CurrentMessage(Middleware):
"""Middleware that exposes the current message via a thread-local
variable.
Example:
>>> import dramatiq
>>> from dramatiq.middleware import CurrentMessage
>>> @dramatiq.actor
... def example(x):
... print(CurrentMessage.get_current_message())
...
>>> example.send(1)
"""

STATE = local()

@classmethod
def get_current_message(cls):
return getattr(cls.STATE, "message", None)

def before_process_message(self, broker, message):
setattr(self.STATE, "message", message)

def after_process_message(self, broker, message, *, result=None, exception=None):
delattr(self.STATE, "message")
29 changes: 28 additions & 1 deletion tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import dramatiq
from dramatiq import Message, Middleware
from dramatiq.errors import RateLimitExceeded
from dramatiq.middleware import SkipMessage
from dramatiq.middleware import CurrentMessage, SkipMessage

from .common import skip_on_pypy, worker

Expand Down Expand Up @@ -554,3 +554,30 @@ def raise_rate_limit_exceeded():
# Then warning mock should be called with a special message
warning_messages = [args[0] for _, args, _ in warning_mock.mock_calls]
assert "Rate limit exceeded in message %s: %s." in warning_messages


def test_currrent_message_middleware_exposes_the_current_message(stub_broker, stub_worker):
# Given that I have a CurrentMessage middleware
stub_broker.add_middleware(CurrentMessage())

# And an actor that accesses the current message
sent_messages = []
received_messages = []

@dramatiq.actor
def accessor(x):
received_messages.append(CurrentMessage.get_current_message())

# When I send it a couple messages
sent_messages.append(accessor.send(1))
sent_messages.append(accessor.send(2))

# And wait for it to finish its work
stub_broker.join(accessor.queue_name)

# Then the sent messages and the received messages should be the same
assert sent_messages == received_messages

# When I try to access the current message from a non-worker thread
# Then I should get back None
assert CurrentMessage.get_current_message() is None

0 comments on commit 48e59ec

Please sign in to comment.