-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_workflow.py
44 lines (32 loc) · 1.14 KB
/
run_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
from aiohttp import web
from temporalio.client import Client
from activities import Purchase
from workflows import OneClickBuyWorkflow, PurchaseStatus
async def init_app():
app = web.Application()
app.client = await Client.connect("localhost:7233")
app.router.add_post("/purchase", handle_purchase)
return app
async def handle_purchase(request) -> web.Response:
data = await request.json()
print(f"Received purchase request: {data}")
client = request.app.client
handle = await client.start_workflow(
OneClickBuyWorkflow.run,
Purchase(
item_id=data.get("item_id", "unknown"),
user_id=data.get("user_id", "unknown"),
),
id=f"{data.get('user_id', 'unknown')}-purchase1",
task_queue="my-task-queue",
)
await handle.cancel()
status = await handle.query(OneClickBuyWorkflow.current_status)
assert status == PurchaseStatus.CANCELLED
return web.json_response(
{"status": "success", "workflow_status": status.name}, status=200
)
if __name__ == "__main__":
app = asyncio.run(init_app())
web.run_app(app, port=5001)