Skip to content

Commit

Permalink
feat: add the sqsx.RawQueue (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
allisson committed Dec 14, 2023
1 parent 69f68a5 commit 7d9db2a
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 152 deletions.
80 changes: 73 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

A simple task processor for Amazon SQS.

## quickstart
## Quickstart

For this demonstration we will use elasticmq locally using docker:

Expand All @@ -20,7 +20,11 @@ Install the package:
pip install sqsx
```

Now let's create a script that will create some tasks and we will consume them:
### Working with sqsx.Queue

We use sqsx.Queue when we need to work with scheduling and consuming tasks.

Now let's create a script that will create a new task and we will consume them:

```python
# file script.py
Expand Down Expand Up @@ -56,23 +60,85 @@ queue = Queue(url=queue_url, sqs_client=sqs_client)
# add a new task
queue.add_task("my_task", a=1, b=2, c=3)

# create the task handler, the first argument must be the context
def task_handler(context, a, b, c):
# create the task handler, which must be a simple function like this
def task_handler(context: dict, a: int, b: int, c: int):
print(f"context={context}, a={a}, b={b}, c={c}")

# add a new task handler
queue.add_task_handler("my_task", task_handler)

# start the consumption of tasks, to stop press ctrl+c to exit gracefully
queue.consume_tasks()
# start the consumption of messages, to stop press ctrl+c to exit gracefully
queue.consume_messages()
```

Running the script:

```bash
python script.py
INFO:sqsx.queue:Starting consuming tasks, queue_url=http://localhost:9324/000000000000/tests
context={'queue_url': 'http://localhost:9324/000000000000/tests', 'task_name': 'my_task', 'sqs_message': {'MessageId': '0c126462-0184-485b-b66f-77ed0b6f3780', 'ReceiptHandle': '0c126462-0184-485b-b66f-77ed0b6f3780#2959620c-7ead-4e12-80a7-672530c43f26', 'MD5OfBody': '8087eb7436895841c5d646156a8a469f', 'Body': 'eyJrd2FyZ3MiOiB7ImEiOiAxLCAiYiI6IDIsICJjIjogM319', 'Attributes': {'SentTimestamp': '1702527171600', 'ApproximateReceiveCount': '1', 'ApproximateFirstReceiveTimestamp': '1702527171603', 'SenderId': '127.0.0.1'}, 'MD5OfMessageAttributes': '5346f2cd7c539a880febaf9112a86921', 'MessageAttributes': {'TaskName': {'StringValue': 'my_task', 'DataType': 'String'}}}}, a=1, b=2, c=3
context={'queue_url': 'http://localhost:9324/000000000000/tests', 'task_name': 'my_task', 'sqs_message': {'MessageId': '42513c2d-ac93-4701-bb63-83b45e6fe2ca', 'ReceiptHandle': '42513c2d-ac93-4701-bb63-83b45e6fe2ca#6eb5443b-a2eb-454e-8619-86f6d2e67561', 'MD5OfBody': '8087eb7436895841c5d646156a8a469f', 'Body': 'eyJrd2FyZ3MiOiB7ImEiOiAxLCAiYiI6IDIsICJjIjogM319', 'Attributes': {'SentTimestamp': '1702573178736', 'ApproximateReceiveCount': '1', 'ApproximateFirstReceiveTimestamp': '1702573178740', 'SenderId': '127.0.0.1'}, 'MD5OfMessageAttributes': '5346f2cd7c539a880febaf9112a86921', 'MessageAttributes': {'TaskName': {'StringValue': 'my_task', 'DataType': 'String'}}}}, a=1, b=2, c=3
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
^CINFO:sqsx.queue:Starting graceful shutdown process
INFO:sqsx.queue:Stopping consuming tasks, queue_url=http://localhost:9324/000000000000/tests
```

### Working with sqsx.RawQueue

We use sqsx.RawQueue when we need to work with one handler consuming all the queue messages.

Now let's create a script that will create a new message and we will consume them:

```python
# file raw_script.py
import logging

import boto3

from sqsx import RawQueue

# configure the logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('botocore').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.CRITICAL)

# create the sqs_client
queue_url = "http://localhost:9324/000000000000/tests"
queue_name = "tests"
sqs_client = boto3.client(
"sqs",
endpoint_url="http://localhost:9324",
region_name="elasticmq",
aws_secret_access_key="x",
aws_access_key_id="x",
use_ssl=False,
)

# create the new sqs queue
sqs_client.create_queue(QueueName=queue_name)

# create the message handler, which must be a simple function like this
def message_handler(queue_url: str, sqs_message: dict):
print(f"queue_url={queue_url}, sqs_message={sqs_message}")

# create the sqsx.Queue
queue = RawQueue(url=queue_url, message_handler_function=message_handler, sqs_client=sqs_client)

# add a new message
queue.add_message(
message_body="My Message",
message_attributes={"Attr1": {"DataType": "String", "StringValue": "Attr1"}},
)

# start the consumption of messages, to stop press ctrl+c to exit gracefully
queue.consume_messages()
```

Running the script:

```bash
INFO:sqsx.queue:Starting consuming tasks, queue_url=http://localhost:9324/000000000000/tests
queue_url=http://localhost:9324/000000000000/tests, sqs_message={'MessageId': 'fb2ed6cf-9346-4ded-8cfe-4fc297f95928', 'ReceiptHandle': 'fb2ed6cf-9346-4ded-8cfe-4fc297f95928#bd9f27a6-0a73-4d27-9c1e-0947f21d3c02', 'MD5OfBody': '069840f6917e85a02167febb964f0041', 'Body': 'My Message', 'Attributes': {'SentTimestamp': '1702573585302', 'ApproximateReceiveCount': '1', 'ApproximateFirstReceiveTimestamp': '1702573585306', 'SenderId': '127.0.0.1'}, 'MD5OfMessageAttributes': '90f34a800b9d242c1b32320e4a3ed630', 'MessageAttributes': {'Attr1': {'StringValue': 'Attr1', 'DataType': 'String'}}}
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
DEBUG:sqsx.queue:Waiting some seconds because no message was received, seconds=10, queue_url=http://localhost:9324/000000000000/tests
^CINFO:sqsx.queue:Starting graceful shutdown process
Expand Down
Loading

0 comments on commit 7d9db2a

Please sign in to comment.