diff --git a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md index 0ee6fa8091d..a5fab6c7421 100644 --- a/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md +++ b/daprdocs/content/en/getting-started/quickstarts/workflow-quickstart.md @@ -28,7 +28,7 @@ In this guide, you'll: Currently, you can experience the Dapr Workflow using the .NET SDK. -{{< tabs ".NET" >}} +{{< tabs ".NET" "Python" >}} {{% codetab %}} @@ -254,8 +254,234 @@ The `Activities` directory holds the four workflow activities used by the workfl - `ProcessPaymentActivity.cs` - `UpdateInventoryActivity.cs` +{{% /codetab %}} + + +{{% codetab %}} + +### Step 1: Pre-requisites + +For this example, you will need: + +- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started). +- [Python 3.7+ installed](https://www.python.org/downloads/). + +- [Docker Desktop](https://www.docker.com/products/docker-desktop) + + +### Step 2: Set up the environment + +Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quickstarts/tree/master/workflows). + +```bash +git clone https://github.com/dapr/quickstarts.git +``` +In a new terminal window, navigate to the `order-processor` directory: +```bash +cd workflows/python/sdk/order-processor +``` + +Install the Dapr Python SDK package: + +```bash +pip3 install -r requirements.txt +``` + +### Step 3: Run the order processor app + +In the terminal, start the order processor app alongside a Dapr sidecar: + +```bash +dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py +``` + +> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`. + +This starts the `order-processor` app with unique workflow ID and runs the workflow activities. + +Expected output: + +```bash +== APP == Starting order workflow, purchasing 10 of cars +== APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items... +== APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at $165000 ! +== APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 10 cars +== APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase +== APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 10 cars +== APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval +== APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved! +== APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at 165000 USD +== APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully +== APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars +== APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock +== APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed! +== APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED +== APP == Workflow completed! Result: Completed +== APP == Purchase of item is Completed +``` + +### (Optional) Step 4: View in Zipkin + +If you have Zipkin configured for Dapr locally on your machine, you can view the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`). + + + +### What happened? + +When you ran `dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py`: + +1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled. +1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received. +1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock. +1. Your workflow starts and notifies you of its status. +1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful. +1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed. +1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed. +1. The workflow terminates as completed. + +#### `order-processor/app.py` + +In the application's program file: +- The unique workflow order ID is generated +- The workflow is scheduled +- The workflow status is retrieved +- The workflow and the workflow activities it invokes are registered + +```python +class WorkflowConsoleApp: + def main(self): + # Register workflow and activities + workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT) + workflowRuntime.register_workflow(order_processing_workflow) + workflowRuntime.register_activity(notify_activity) + workflowRuntime.register_activity(requst_approval_activity) + workflowRuntime.register_activity(verify_inventory_activity) + workflowRuntime.register_activity(process_payment_activity) + workflowRuntime.register_activity(update_inventory_activity) + workflowRuntime.start() + + print("==========Begin the purchase of item:==========", flush=True) + item_name = default_item_name + order_quantity = 10 + + total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost + order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost) + + # Start Workflow + print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True) + start_resp = daprClient.start_workflow(workflow_component=workflow_component, + workflow_name=workflow_name, + input=order) + _id = start_resp.instance_id + + def prompt_for_approval(daprClient: DaprClient): + daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component, + event_name="manager_approval", event_data={'approval': True}) + + approval_seeked = False + start_time = datetime.now() + while True: + time_delta = datetime.now() - start_time + state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) + if not state: + print("Workflow not found!") # not expected + elif state.runtime_status == "Completed" or\ + state.runtime_status == "Failed" or\ + state.runtime_status == "Terminated": + print(f'Workflow completed! Result: {state.runtime_status}', flush=True) + break + if time_delta.total_seconds() >= 10: + state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component) + if total_cost > 50000 and ( + state.runtime_status != "Completed" or + state.runtime_status != "Failed" or + state.runtime_status != "Terminated" + ) and not approval_seeked: + approval_seeked = True + threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start() + + print("Purchase of item is ", state.runtime_status, flush=True) + + def restock_inventory(self, daprClient: DaprClient, baseInventory): + for key, item in baseInventory.items(): + print(f'item: {item}') + item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\ + "per_item_cost": {item.per_item_cost}}}' + daprClient.save_state("statestore-actors", key, item_str) + +if __name__ == '__main__': + app = WorkflowConsoleApp() + app.main() +``` + +#### `order-processor/workflow.py` + +In `workflow.py`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities). + +```python + def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload): + """Defines the order processing workflow. + When the order is received, the inventory is checked to see if there is enough inventory to + fulfill the order. If there is enough inventory, the payment is processed and the inventory is + updated. If there is not enough inventory, the order is rejected. + If the total order is greater than $50,000, the order is sent to a manager for approval. + """ + order_id = ctx.instance_id + order_payload=json.loads(order_payload_str) + yield ctx.call_activity(notify_activity, + input=Notification(message=('Received order ' +order_id+ ' for ' + +f'{order_payload["quantity"]}' +' ' +f'{order_payload["item_name"]}' + +' at $'+f'{order_payload["total_cost"]}' +' !'))) + result = yield ctx.call_activity(verify_inventory_activity, + input=InventoryRequest(request_id=order_id, + item_name=order_payload["item_name"], + quantity=order_payload["quantity"])) + if not result.success: + yield ctx.call_activity(notify_activity, + input=Notification(message='Insufficient inventory for ' + +f'{order_payload["item_name"]}'+'!')) + return OrderResult(processed=False) + + if order_payload["total_cost"] > 50000: + yield ctx.call_activity(requst_approval_activity, input=order_payload) + approval_task = ctx.wait_for_external_event("manager_approval") + timeout_event = ctx.create_timer(timedelta(seconds=200)) + winner = yield when_any([approval_task, timeout_event]) + if winner == timeout_event: + yield ctx.call_activity(notify_activity, + input=Notification(message='Payment for order '+order_id + +' has been cancelled due to timeout!')) + return OrderResult(processed=False) + approval_result = yield approval_task + if approval_result["approval"]: + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Payment for order {order_id} has been approved!')) + else: + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Payment for order {order_id} has been rejected!')) + return OrderResult(processed=False) + + yield ctx.call_activity(process_payment_activity, input=PaymentRequest( + request_id=order_id, item_being_purchased=order_payload["item_name"], + amount=order_payload["total_cost"], quantity=order_payload["quantity"])) + + try: + yield ctx.call_activity(update_inventory_activity, + input=PaymentRequest(request_id=order_id, + item_being_purchased=order_payload["item_name"], + amount=order_payload["total_cost"], + quantity=order_payload["quantity"])) + except Exception: + yield ctx.call_activity(notify_activity, + input=Notification(message=f'Order {order_id} Failed!')) + return OrderResult(processed=False) + + yield ctx.call_activity(notify_activity, input=Notification( + message=f'Order {order_id} has completed!')) + return OrderResult(processed=True) +``` {{% /codetab %}} diff --git a/daprdocs/static/images/workflow-trace-spans-zipkin-python.png b/daprdocs/static/images/workflow-trace-spans-zipkin-python.png new file mode 100644 index 00000000000..87fb35af8d4 Binary files /dev/null and b/daprdocs/static/images/workflow-trace-spans-zipkin-python.png differ