Skip to content

Commit

Permalink
feat: initial code (#1)
Browse files Browse the repository at this point in the history
* feat: initial code

* feat: add setup.py
  • Loading branch information
allisson committed Dec 14, 2023
1 parent 45c789f commit 43929c1
Show file tree
Hide file tree
Showing 17 changed files with 1,882 additions and 1 deletion.
23 changes: 23 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: Execute tests/release

on:
push:
branches:
- main

permissions:
contents: write
pull-requests: write

jobs:
tests:
uses: ./.github/workflows/tests.yml
release-please:
needs: build
runs-on: ubuntu-latest
steps:
- uses: GoogleCloudPlatform/release-please-action@v4
with:
token: ${{ secrets.GITHUB_TOKEN }}
release-type: simple
package-name: sqsx
35 changes: 35 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: Execute lint and tests

on:
workflow_call:
push:

jobs:
tests:
runs-on: ubuntu-latest
services:
elasticmq:
image: softwaremill/elasticmq-native
ports:
- 9324:9324
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
steps:
- name: Install system dependencies
run: sudo apt update && sudo apt install --no-install-recommends -y make git
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install poetry
poetry config virtualenvs.create false
poetry install
- name: pre-commit lint
run: make lint
- name: pytest
run: make test
36 changes: 36 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: check-ast
- id: fix-byte-order-marker
- id: check-docstring-first
- id: check-json
- id: check-merge-conflict
- id: check-symlinks
- id: check-toml
- id: check-vcs-permalinks
- id: check-xml
- id: check-yaml
- id: debug-statements
- id: destroyed-symlinks
- id: end-of-file-fixer
- id: trailing-whitespace

- repo: https://github.com/pycqa/isort
rev: 5.13.1
hooks:
- id: isort
args: ["--overwrite-in-place"]

- repo: https://github.com/psf/black
rev: 23.12.0
hooks:
- id: black
args: ["--line-length=110"]

- repo: https://github.com/pycqa/flake8
rev: 6.1.0
hooks:
- id: flake8
args: ["--max-line-length=110", "--ignore=E203,E501,W503"]
Empty file added CHANGELOG.md
Empty file.
31 changes: 31 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
.PHONY: test
test:
poetry run pytest -v

.PHONY: lint
lint:
poetry run pre-commit run --all-files

.PHONY: start-queue
start-queue:
docker run --name pysqsx-elasticmq -p 9324:9324 -d softwaremill/elasticmq-native

.PHONY: stop-queue
stop-queue:
docker kill $$(docker ps -aqf name=pysqsx-elasticmq)
docker container rm $$(docker ps -aqf name=pysqsx-elasticmq)

.PHONY: clean
clean:
@rm -rf dist/
@rm -rf build/
@rm -rf *.egg-info

.PHONY: dist
dist: clean
poetry run python setup.py sdist
poetry run python setup.py bdist_wheel

.PHONY: release
release: dist
poetry run twine upload dist/*
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
# pysqsx
A simple queue processor for Amazon SQS
[![Tests](https://github.com/allisson/pysqsx/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/allisson/pysqsx/actions/workflows/tests.yml)
![PyPI - Version](https://img.shields.io/pypi/v/sqsx)
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/sqsx)
![GitHub License](https://img.shields.io/github/license/allisson/pysqsx)

A simple task processor for Amazon SQS.
1,278 changes: 1,278 additions & 0 deletions poetry.lock

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[tool.poetry]
name = "sqsx"
version = "0.1.0"
description = "A simple task processor for Amazon SQS"
authors = ["Allisson Azevedo"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.8"
boto3 = "^1.33.13"
pydantic = "^2.5.2"

[tool.poetry.group.test.dependencies]
pytest = "^7.4.3"
pytest-cov = "^4.1.0"
pre-commit = "3.5.0"

[tool.poetry.group.release.dependencies]
wheel = "^0.42.0"
twine = "^4.0.2"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.pytest.ini_options]
minversion = "7.4"
addopts = "-vvv --cov=sqsx --cov-report=term-missing"

[tool.isort]
profile = "black"
line_length = 110
force_alphabetical_sort_within_sections = true
sections = ["FUTURE", "STDLIB", "THIRDPARTY", "FIRSTPARTY", "LOCALFOLDER"]
default_section = "THIRDPARTY"
31 changes: 31 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import pathlib

from setuptools import find_packages, setup

here = pathlib.Path(__file__).parent.resolve()
long_description = (here / "README.md").read_text(encoding="utf-8")

setup(
name="sqsx",
version="0.1.0",
description="A simple task processor for Amazon SQS",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/allisson/pysqsx",
author="Allisson Azevedo",
author_email="allisson@gmail.com",
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
],
keywords="aws, sqs",
packages=find_packages(),
install_requires=["boto3>=1.33.13", "pydantic>=2.5.2"],
tests_require=["pytest>=7.4.3", "pytest-cov>=4.1.0", "pre-commit==3.5.0"],
)
1 change: 1 addition & 0 deletions sqsx/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from sqsx.queue import Queue # noqa
15 changes: 15 additions & 0 deletions sqsx/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import base64
import json


def dict_to_base64(data: dict) -> str:
return base64.urlsafe_b64encode(json.dumps(data).encode()).decode()


def base64_to_dict(data: str) -> dict:
return json.loads(base64.urlsafe_b64decode(data).decode())


def backoff_calculator_seconds(retries: int, minimum: int, maximum: int) -> int:
maximum = min(maximum, 43200)
return min(minimum * 2**retries, maximum)
118 changes: 118 additions & 0 deletions sqsx/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import logging
import signal
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, Callable, Dict

from pydantic import BaseModel, Field, PrivateAttr

from sqsx.helper import backoff_calculator_seconds, base64_to_dict, dict_to_base64

logger = logging.getLogger(__name__)


class Queue(BaseModel):
url: str = Field(pattern=r"(http|https)[:][\/]{2}[a-zA-Z0-9-_:.]+[\/][0-9]{12}[\/]{1}[a-zA-Z0-9-_]{0,80}")
sqs_client: Any
min_backoff_seconds: int = Field(default=30)
max_backoff_seconds: int = Field(default=900)
_handlers: Dict[str, Callable] = PrivateAttr(default={})
_should_consume_tasks_stop: bool = PrivateAttr(default=False)

def add_task(self, task_name: str, **task_kwargs) -> dict:
return self.sqs_client.send_message(
QueueUrl=self.url,
MessageAttributes={"TaskName": {"DataType": "String", "StringValue": task_name}},
MessageBody=dict_to_base64({"kwargs": task_kwargs}),
)

def add_task_handler(self, task_name: str, fn: Callable) -> None:
self._handlers.update({task_name: fn})

def consume_tasks(
self, max_tasks: int = 1, max_threads: int = 1, wait_seconds: int = 10, run_forever: bool = True
) -> None:
logger.info(f"Starting consuming tasks, queue_url={self.url}")
signal.signal(signal.SIGINT, self._exit_gracefully)
signal.signal(signal.SIGTERM, self._exit_gracefully)

while True:
if self._should_consume_tasks_stop:
logger.info(f"Stopping consuming tasks, queue_url={self.url}")
break

response = self.sqs_client.receive_message(
QueueUrl=self.url,
AttributeNames=["All"],
MaxNumberOfMessages=max_tasks,
MessageAttributeNames=["All"],
)

sqs_messages = response.get("Messages", [])
if not sqs_messages:
logger.debug(
f"Waiting some seconds because no message was received, seconds={wait_seconds}, queue_url={self.url}"
)
time.sleep(wait_seconds)
continue

with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
for sqs_message in sqs_messages:
futures.append(executor.submit(self._consume_task, sqs_message))
wait(futures)

if not run_forever:
break

def _exit_gracefully(self, signal_num, current_stack_frame) -> None:
logger.info("Starting graceful shutdown process")
self._should_consume_tasks_stop = True

def _consume_task(self, sqs_message: dict) -> None:
message_id = sqs_message["MessageId"]
task_name_attribute = sqs_message["MessageAttributes"].get("TaskName")
if task_name_attribute is None:
logger.warning(f"Message without TaskName attribute, message_id={message_id}")
return self._message_nack(sqs_message)

task_name = task_name_attribute["StringValue"]
fn = self._handlers.get(task_name)
if fn is None:
logger.warning(f"Task handler not found, message_id={message_id}, task_name={task_name}")
return self._message_nack(sqs_message)

try:
message_data = base64_to_dict(sqs_message["Body"])
except Exception:
logger.exception(f"Invalid message body, message_id={message_id}, task_name={task_name}")
return self._message_nack(sqs_message)

kwargs = message_data["kwargs"]
context = {
"queue_url": self.url,
"task_name": task_name,
"sqs_message": sqs_message,
}

try:
fn(context, **kwargs)
except Exception:
logger.exception(f"Error while processing, message_id={message_id}, task_name={task_name}")
return self._message_nack(sqs_message)

self._message_ack(sqs_message)

def _message_ack(self, sqs_message: dict) -> None:
receipt_handle = sqs_message["ReceiptHandle"]
self.sqs_client.delete_message(QueueUrl=self.url, ReceiptHandle=receipt_handle)

def _message_nack(self, sqs_message: dict) -> None:
receipt_handle = sqs_message["ReceiptHandle"]
receive_count = int(sqs_message["Attributes"]["ApproximateReceiveCount"]) - 1
timeout = backoff_calculator_seconds(
receive_count, self.min_backoff_seconds, self.max_backoff_seconds
)
self.sqs_client.change_message_visibility(
QueueUrl=self.url, ReceiptHandle=receipt_handle, VisibilityTimeout=timeout
)
Empty file added tests/__init__.py
Empty file.
46 changes: 46 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import boto3
import pytest

from sqsx.queue import Queue


@pytest.fixture
def sqs_client():
return boto3.client(
"sqs",
endpoint_url="http://localhost:9324",
region_name="elasticmq",
aws_secret_access_key="x",
aws_access_key_id="x",
use_ssl=False,
)


@pytest.fixture
def queue_url():
return "http://localhost:9324/000000000000/tests"


@pytest.fixture
def queue(sqs_client, queue_url):
sqs_client.create_queue(QueueName=queue_url.split("/")[-1])
yield Queue(url=queue_url, sqs_client=sqs_client)
sqs_client.delete_queue(QueueUrl=queue_url)


@pytest.fixture
def sqs_message():
return {
"MessageId": "33425f12-50e6-4f93-ac26-7ae7a069cf88",
"ReceiptHandle": "33425f12-50e6-4f93-ac26-7ae7a069cf88#d128816c-aea8-406b-bbdd-1edbacb5573f",
"MD5OfBody": "8087eb7436895841c5d646156a8a469f",
"Body": "eyJrd2FyZ3MiOiB7ImEiOiAxLCAiYiI6IDIsICJjIjogM319",
"Attributes": {
"SentTimestamp": "1702512255653",
"ApproximateReceiveCount": "1",
"ApproximateFirstReceiveTimestamp": "1702512255660",
"SenderId": "127.0.0.1",
},
"MD5OfMessageAttributes": "5346f2cd7c539a880febaf9112a86921",
"MessageAttributes": {"TaskName": {"StringValue": "my_task", "DataType": "String"}},
}
Loading

0 comments on commit 43929c1

Please sign in to comment.