Skip to content

Activity worker for performing activity tasks from AWS StepFunctions

License

Notifications You must be signed in to change notification settings

AmberEngine/stepfunctions_activity_worker

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

10 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

StepFunctions Activity Worker

A worker that listens to a StepFunctions activity and executes a provided function using the inputs from the activity task.

The StepFunctions Activity Worker encapsulates all the parts of communicating with the StepFunctions API so you don't have to worry about task heartbeats or maintaining task tokens and success/failure scenarios; all you have to worry about is executing the task.

Installation

Install from PyPI:

pip install stepfunctions_activity_worker

Usage

from stepfunctions_activity_worker import ActivityWorker


def my_task(**task_input):
    """Perform the task based on this task's input."""
    # Perform your task here! 
    return {"result": "done!"}


if __name__ == "__main__":
    activity_arn = "PLACE YOUR ACTIVITY ARN HERE"
    worker = ActivityWorker(activity_arn, my_task)
    worker.listen()

Warning

The ActivityWorker class, if not provided with a client argument on instantiation, will create a properly configured client from your default session.

However, if you are providing an already instantiated client to the ActivityWorker class, make sure it is proply configured to make StepFunctions API calls!

The GetActivityTask API call blocks for 60 seconds which matches the botocore.config.Config default read_timeout. This means that if the API response for GetActivityTask is not punctual (which it often isn't) it will cause unnecessary retry-requests & eventually bubble up an HTTP exception.

import boto3
import botocore
from stepfunctions_activity_worker import ActivityWorker

def my_task(**task_input):
    """Perform the task based on this task's input."""
    # Perform your task here! 
    return {"result": "done!"}
    
config = botocore.config.Config(
  read_timeout=70,
  # Insert other custom configuration here
)
stepfunctions = boto3.client('stepfunctions', config=config)

activity_worker = ActivityWorker(activity_arn, my_task, client=stepfunctions)