-
Notifications
You must be signed in to change notification settings - Fork 0
/
shopping_cart.py
101 lines (88 loc) · 3.55 KB
/
shopping_cart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import asyncio
from datetime import timedelta
from typing import List
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from activities import Product, products, send_email
@workflow.defn
class ShoppingCartWorkflow:
def __init__(self) -> None:
self._add_to_cart: asyncio.Queue[str] = asyncio.Queue()
self._remove_from_cart: asyncio.Queue[str] = asyncio.Queue()
self._exit = False
self._cart: List[Product] = []
@workflow.run
async def run(self) -> float:
cart: List[Product] = []
while True:
await workflow.wait_condition(
lambda: not self._add_to_cart.empty()
or not self._remove_from_cart.empty()
or self._exit
)
while not self._add_to_cart.empty():
product_id = int(self._add_to_cart.get_nowait())
if product_id in products:
product = products[product_id]
# add 1 to quantity if product already in cart
product_in_cart = next(
(p for p in self._cart if p.name == product.name), None
)
if product_in_cart:
product_in_cart.quantity += 1
else:
new_product = Product(
name=product.name,
description=product.description,
price=product.price,
quantity=1,
)
self._cart.append(new_product)
print(
f"Added {new_product.name} to cart, costs {new_product.price}."
)
cart.append(new_product)
else:
print(f"Product with ID {product_id} not found.")
while not self._remove_from_cart.empty():
product_id = int(self._remove_from_cart.get_nowait())
if product_id in products:
product = products[product_id]
# add 1 to quantity if product already in cart
product_in_cart = next(
(p for p in self._cart if p.name == product.name), None
)
if product_in_cart:
product_in_cart.quantity -= 1
if product_in_cart.quantity == 0:
self._cart.remove(product_in_cart)
print(
f"Removed {product_in_cart.name} from cart, costs {product_in_cart.price}."
)
cart.remove(product_in_cart)
else:
print(f"Product with ID {product_id} not found.")
if self._exit:
return cart
return cart
@workflow.signal
async def add_to_cart(self, item: str) -> None:
await self._add_to_cart.put(item)
@workflow.signal
async def remove_from_cart(self, item: str) -> None:
await self._remove_from_cart.put(item)
@workflow.signal
def exit(self) -> None:
self._exit = True
@workflow.query
def cart_details(self):
return self._cart
@workflow.defn
class ScheduleWorkflow:
@workflow.run
async def run(self, cart: Product) -> None:
await workflow.execute_activity(
send_email,
cart,
start_to_close_timeout=timedelta(seconds=15),
)