Skip to content

Commit

Permalink
Track requeue_timestamp for retried messages
Browse files Browse the repository at this point in the history
* Updated Retries middleware

It now records requeue_timestamp in message options in before_enqueue function if message failed execution

* Improved test

It now records and asserts all requeue timestamps

* Refactorings

Moved logic for recording requeue_timestamp from before_enqueue to after_process_message
  • Loading branch information
kuba-lilz authored Jun 12, 2024
1 parent 052fea0 commit 7f8caac
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 0 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ of those changes to CLEARTYPE SRL.
| [@pahrohfit](https://github.com/pahrohfit/) | Robert Dailey |
| [@nhairs](https://github.com/nhairs) | Nicholas Hairs |
| [@5tefan](https://github.com/5tefan/) | Stefan Codrescu |
| [@kuba-lilz](https://github.com/kuba-lilz/) | Jakub Kolodziejczyk |
4 changes: 4 additions & 0 deletions dramatiq/middleware/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
import traceback

from ..common import compute_backoff
Expand Down Expand Up @@ -96,6 +97,9 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
message.options["retries"] += 1
message.options["traceback"] = traceback.format_exc(limit=30)

# Record in message options time at which it is requeued
message.options["requeue_timestamp"] = int(time.time() * 1000)

max_retries = message.options.get("max_retries") or actor.options.get("max_retries", self.max_retries)
retry_when = actor.options.get("retry_when", self.retry_when)
if retry_when is not None and not retry_when(retries, exception) or \
Expand Down
35 changes: 35 additions & 0 deletions tests/middleware/test_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,38 @@ def do_work():

# And the message should not be retried
assert sum(attempts) == 1


def test_message_contains_requeue_time_after_retry(stub_broker, stub_worker):

# Given that I have a database
requeue_timestamps = []

stub_broker.add_middleware(dramatiq.middleware.CurrentMessage())
max_retries = 2

# And an actor that raises an exception and should be retried
@dramatiq.actor(max_retries=max_retries, min_backoff=100, max_backoff=100)
def do_work():

current_message = dramatiq.middleware.CurrentMessage.get_current_message()

if "requeue_timestamp" in current_message.options:
requeue_timestamps.append(current_message.options["requeue_timestamp"])

raise RuntimeError()

message = do_work.send()

# When I join on the queue and run the actor
stub_broker.join(do_work.queue_name)
stub_worker.join()

# Then I expect correct number of requeue timestamps recorded
assert len(requeue_timestamps) == max_retries

# And that requeue timestamps are in increasing order
assert all(requeue_timestamps[i] < requeue_timestamps[i + 1] for i in range(len(requeue_timestamps) - 1))

# And that all requeue timestamps are larger than message timestamp
assert all(requeue_time > message.message_timestamp for requeue_time in requeue_timestamps)

0 comments on commit 7f8caac

Please sign in to comment.