Skip to content

Commit

Permalink
feat: add exceptions to change the default retry behavior (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson committed Dec 15, 2023
1 parent 19df5cf commit 9175c65
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 23 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,35 @@ DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=1
^CINFO:sqsx.queue:Starting graceful shutdown process
INFO:sqsx.queue:Stopping consuming tasks, queue_url=http://localhost:9324/000000000000/tests
```

### Working with exceptions

The default behavior is to retry the message when an exception is raised, you can change this behavior using the exceptions sqsx.exceptions.Retry and sqsx.exceptions.NoRetry.

If you want to change the backoff policy, use the sqsx.exceptions.Retry like this:

```python
from sqsx.exceptions import Retry

# to use with sqsx.Queue and change the default backoff policy
def task_handler(context: dict, a: int, b: int, c: int):
raise Retry(min_backoff_seconds=100, max_backoff_seconds=200)

# to use with sqsx.RawQueue and change the default backoff policy
def message_handler(queue_url: str, sqs_message: dict):
raise Retry(min_backoff_seconds=100, max_backoff_seconds=200)
```

If you want to remove the task or message from the queue use the sqsx.exceptions.Retry like this:

```python
from sqsx.exceptions import NoRetry

# to use with sqsx.Queue and remove the task
def task_handler(context: dict, a: int, b: int, c: int):
raise NoRetry()

# to use with sqsx.RawQueue and remove the message
def message_handler(queue_url: str, sqs_message: dict):
raise NoRetry()
```
14 changes: 7 additions & 7 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sqsx"
version = "0.2.0"
version = "0.3.0"
description = "A simple task processor for Amazon SQS"
authors = ["Allisson Azevedo"]
readme = "README.md"
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

setup(
name="sqsx",
version="0.2.0",
version="0.3.0",
description="A simple task processor for Amazon SQS",
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down
16 changes: 16 additions & 0 deletions sqsx/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
class Retry(Exception):
"""
This exception must be used when we need a custom backoff config
"""

def __init__(self, min_backoff_seconds: int, max_backoff_seconds: int):
self.min_backoff_seconds = min_backoff_seconds
self.max_backoff_seconds = max_backoff_seconds


class NoRetry(Exception):
"""
This exception must be used when we need that the message will be removed from the queue
"""

pass
43 changes: 37 additions & 6 deletions sqsx/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import signal
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, Callable, Dict
from typing import Any, Callable, Dict, Optional

from pydantic import BaseModel, Field, PrivateAttr

from sqsx.exceptions import NoRetry, Retry
from sqsx.helper import backoff_calculator_seconds, base64_to_dict, dict_to_base64

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -57,12 +58,17 @@ def _message_ack(self, sqs_message: dict) -> None:
receipt_handle = sqs_message["ReceiptHandle"]
self.sqs_client.delete_message(QueueUrl=self.url, ReceiptHandle=receipt_handle)

def _message_nack(self, sqs_message: dict) -> None:
def _message_nack(
self,
sqs_message: dict,
min_backoff_seconds: Optional[int] = None,
max_backoff_seconds: Optional[int] = None,
) -> None:
min_backoff_seconds = min_backoff_seconds if min_backoff_seconds else self.min_backoff_seconds
max_backoff_seconds = max_backoff_seconds if max_backoff_seconds else self.max_backoff_seconds
receipt_handle = sqs_message["ReceiptHandle"]
receive_count = int(sqs_message["Attributes"]["ApproximateReceiveCount"]) - 1
timeout = backoff_calculator_seconds(
receive_count, self.min_backoff_seconds, self.max_backoff_seconds
)
timeout = backoff_calculator_seconds(receive_count, min_backoff_seconds, max_backoff_seconds)
self.sqs_client.change_message_visibility(
QueueUrl=self.url, ReceiptHandle=receipt_handle, VisibilityTimeout=timeout
)
Expand Down Expand Up @@ -114,6 +120,20 @@ def _consume_message(self, sqs_message: dict) -> None:

try:
task_handler_function(context, **kwargs)
except Retry as exc:
logger.info(
f"Received an sqsx.Retry, setting a custom backoff policy, message_id={message_id}, task_name={task_name}"
)
return self._message_nack(
sqs_message,
min_backoff_seconds=exc.min_backoff_seconds,
max_backoff_seconds=exc.max_backoff_seconds,
)
except NoRetry:
logger.info(
f"Received an sqsx.NoRetry, removing the task, message_id={message_id}, task_name={task_name}"
)
return self._message_ack(sqs_message)
except Exception:
logger.exception(f"Error while processing, message_id={message_id}, task_name={task_name}")
return self._message_nack(sqs_message)
Expand All @@ -137,10 +157,21 @@ def add_message(self, message_body: str, message_attributes: dict = {}) -> dict:
)

def _consume_message(self, sqs_message: dict) -> None:
message_id = sqs_message["MessageId"]

try:
self.message_handler_function(self.url, sqs_message)
except Retry as exc:
logger.info(f"Received an sqsx.Retry, setting a custom backoff policy, message_id={message_id}")
return self._message_nack(
sqs_message,
min_backoff_seconds=exc.min_backoff_seconds,
max_backoff_seconds=exc.max_backoff_seconds,
)
except NoRetry:
logger.info(f"Received an sqsx.NoRetry, removing the message, message_id={message_id}")
return self._message_ack(sqs_message)
except Exception:
message_id = sqs_message["MessageId"]
logger.exception(f"Error while processing, message_id={message_id}")
return self._message_nack(sqs_message)

Expand Down
6 changes: 4 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ def raw_queue_url():


@pytest.fixture
def queue(sqs_client, queue_url):
def queue(sqs_client, queue_url, caplog):
caplog.set_level("INFO")
sqs_client.create_queue(QueueName=queue_url.split("/")[-1])
yield Queue(url=queue_url, sqs_client=sqs_client)
sqs_client.delete_queue(QueueUrl=queue_url)


@pytest.fixture
def raw_queue(sqs_client, raw_queue_url):
def raw_queue(sqs_client, raw_queue_url, caplog):
def task_handler_function(queue_url, sqs_message):
print(f"queue_url={queue_url}, sqs_message={sqs_message}")

caplog.set_level("INFO")
sqs_client.create_queue(QueueName=raw_queue_url.split("/")[-1])
yield RawQueue(url=raw_queue_url, message_handler_function=task_handler_function, sqs_client=sqs_client)
sqs_client.delete_queue(QueueUrl=raw_queue_url)
Expand Down
73 changes: 67 additions & 6 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import pytest

from sqsx.exceptions import NoRetry, Retry
from sqsx.queue import queue_url_regex


Expand All @@ -18,10 +19,26 @@ def exception_handler(context, a, b, c):
raise Exception("BOOM!")


def retry_exception_handler(context, a, b, c):
raise Retry(min_backoff_seconds=100, max_backoff_seconds=200)


def no_retry_exception_handler(context, a, b, c):
raise NoRetry()


def raw_exception_handler(queue_url, sqs_message):
raise Exception("BOOM!")


def raw_retry_exception_handler(queue_url, sqs_message):
raise Retry(min_backoff_seconds=100, max_backoff_seconds=200)


def raw_no_retry_exception_handler(queue_url, sqs_message):
raise NoRetry()


def trigger_signal():
pid = os.getpid()
time.sleep(0.2)
Expand Down Expand Up @@ -163,9 +180,31 @@ def test_queue_consume_messages_with_task_handler_exception(queue, caplog):

queue.consume_messages(run_forever=False)

assert caplog.record_tuples[0][0] == "sqsx.queue"
assert caplog.record_tuples[0][1] == 40
assert "Error while processing" in caplog.record_tuples[0][2]
assert caplog.record_tuples[1][0] == "sqsx.queue"
assert caplog.record_tuples[1][1] == 40
assert "Error while processing" in caplog.record_tuples[1][2]


def test_queue_consume_messages_with_task_handler_retry_exception(queue, caplog):
queue.add_task_handler("my_task", retry_exception_handler)
queue.add_task("my_task", a=1, b=2, c=3)

queue.consume_messages(run_forever=False)

assert caplog.record_tuples[1][0] == "sqsx.queue"
assert caplog.record_tuples[1][1] == 20
assert "Received an sqsx.Retry, setting a custom backoff policy" in caplog.record_tuples[1][2]


def test_queue_consume_messages_with_task_handler_no_retry_exception(queue, caplog):
queue.add_task_handler("my_task", no_retry_exception_handler)
queue.add_task("my_task", a=1, b=2, c=3)

queue.consume_messages(run_forever=False)

assert caplog.record_tuples[1][0] == "sqsx.queue"
assert caplog.record_tuples[1][1] == 20
assert "Received an sqsx.NoRetry, removing the task" in caplog.record_tuples[1][2]


def test_queue_exit_gracefully(queue):
Expand Down Expand Up @@ -214,9 +253,31 @@ def test_raw_queue_consume_messages_with_message_handler_exception(raw_queue, ca
raw_queue.add_message(message_body="Message Body")
raw_queue.consume_messages(run_forever=False)

assert caplog.record_tuples[0][0] == "sqsx.queue"
assert caplog.record_tuples[0][1] == 40
assert "Error while processing" in caplog.record_tuples[0][2]
assert caplog.record_tuples[1][0] == "sqsx.queue"
assert caplog.record_tuples[1][1] == 40
assert "Error while processing" in caplog.record_tuples[1][2]


def test_raw_queue_consume_messages_with_message_handler_retry_exception(raw_queue, caplog):
raw_queue.message_handler_function = raw_retry_exception_handler

raw_queue.add_message(message_body="Message Body")
raw_queue.consume_messages(run_forever=False)

assert caplog.record_tuples[1][0] == "sqsx.queue"
assert caplog.record_tuples[1][1] == 20
assert "Received an sqsx.Retry, setting a custom backoff policy" in caplog.record_tuples[1][2]


def test_raw_queue_consume_messages_with_message_handler_no_retry_exception(raw_queue, caplog):
raw_queue.message_handler_function = raw_no_retry_exception_handler

raw_queue.add_message(message_body="Message Body")
raw_queue.consume_messages(run_forever=False)

assert caplog.record_tuples[1][0] == "sqsx.queue"
assert caplog.record_tuples[1][1] == 20
assert "Received an sqsx.NoRetry, removing the message" in caplog.record_tuples[1][2]


def test_raw_queue_exit_gracefully(raw_queue):
Expand Down

0 comments on commit 9175c65

Please sign in to comment.