Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start of Python QS doc #3495

Merged
merged 14 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ In this guide, you'll:

Currently, you can experience the Dapr Workflow using the .NET SDK.

{{< tabs ".NET" >}}
{{< tabs ".NET" "Python" >}}

<!-- .NET -->
{{% codetab %}}
Expand Down Expand Up @@ -254,8 +254,234 @@ The `Activities` directory holds the four workflow activities used by the workfl
- `ProcessPaymentActivity.cs`
- `UpdateInventoryActivity.cs`

{{% /codetab %}}

<!-- Python -->
{{% codetab %}}

### Step 1: Pre-requisites

For this example, you will need:

- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started).
- [Python 3.7+ installed](https://www.python.org/downloads/).
<!-- IGNORE_LINKS -->
- [Docker Desktop](https://www.docker.com/products/docker-desktop)
<!-- END_IGNORE -->

### Step 2: Set up the environment

Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quickstarts/tree/master/workflows).

```bash
git clone https://github.com/dapr/quickstarts.git
```

In a new terminal window, navigate to the `order-processor` directory:

```bash
cd workflows/python/sdk/order-processor
```

greenie-msft marked this conversation as resolved.
Show resolved Hide resolved
Install the Dapr Python SDK package:

```bash
pip3 install -r requirements.txt
```

### Step 3: Run the order processor app

In the terminal, start the order processor app alongside a Dapr sidecar:

```bash
dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py
```

> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`.

This starts the `order-processor` app with unique workflow ID and runs the workflow activities.
greenie-msft marked this conversation as resolved.
Show resolved Hide resolved

Expected output:

```bash
== APP == Starting order workflow, purchasing 10 of cars
== APP == 2023-06-06 09:35:52.945 durabletask-worker INFO: Successfully connected to 127.0.0.1:65406. Waiting for work items...
hhunter-ms marked this conversation as resolved.
Show resolved Hide resolved
== APP == INFO:NotifyActivity:Received order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at $165000 !
== APP == INFO:VerifyInventoryActivity:Verifying inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da of 10 cars
== APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase
== APP == INFO:RequestApprovalActivity:Requesting approval for payment of 165000 USD for 10 cars
== APP == 2023-06-06 09:36:05.969 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da Event raised: manager_approval
== APP == INFO:NotifyActivity:Payment for order f4e1926e-3721-478d-be8a-f5bebd1995da has been approved!
== APP == INFO:ProcessPaymentActivity:Processing payment: f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars at 165000 USD
== APP == INFO:ProcessPaymentActivity:Payment for request ID f4e1926e-3721-478d-be8a-f5bebd1995da processed successfully
== APP == INFO:UpdateInventoryActivity:Checking inventory for order f4e1926e-3721-478d-be8a-f5bebd1995da for 10 cars
== APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock
== APP == INFO:NotifyActivity:Order f4e1926e-3721-478d-be8a-f5bebd1995da has completed!
== APP == 2023-06-06 09:36:06.106 durabletask-worker INFO: f4e1926e-3721-478d-be8a-f5bebd1995da: Orchestration completed with status: COMPLETED
== APP == Workflow completed! Result: Completed
== APP == Purchase of item is Completed
```

### (Optional) Step 4: View in Zipkin

If you have Zipkin configured for Dapr locally on your machine, you can view the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`).

<img src="/images/workflow-trace-spans-zipkin-python.png" width=900 style="padding-bottom:15px;">

### What happened?

When you ran `dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py`:

1. A unique order ID for the workflow is generated (in the above example, `f4e1926e-3721-478d-be8a-f5bebd1995da`) and the workflow is scheduled.
1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received.
1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock.
1. Your workflow starts and notifies you of its status.
1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `f4e1926e-3721-478d-be8a-f5bebd1995da` and confirms if successful.
1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed.
1. The `NotifyActivity` workflow activity sends a notification saying that order `f4e1926e-3721-478d-be8a-f5bebd1995da` has completed.
1. The workflow terminates as completed.

#### `order-processor/app.py`

In the application's program file:
- The unique workflow order ID is generated
- The workflow is scheduled
- The workflow status is retrieved
- The workflow and the workflow activities it invokes are registered

```python
class WorkflowConsoleApp:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Step 2, we asked to git clone quickstarts package.
Do we need to provide app.py and workflow.py here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, can you clarify what you're suggesting we change?Sorry, can you clarify what you're suggesting we change?

def main(self):
# Register workflow and activities
workflowRuntime = WorkflowRuntime(settings.DAPR_RUNTIME_HOST, settings.DAPR_GRPC_PORT)
workflowRuntime.register_workflow(order_processing_workflow)
workflowRuntime.register_activity(notify_activity)
workflowRuntime.register_activity(requst_approval_activity)
workflowRuntime.register_activity(verify_inventory_activity)
workflowRuntime.register_activity(process_payment_activity)
workflowRuntime.register_activity(update_inventory_activity)
workflowRuntime.start()

print("==========Begin the purchase of item:==========", flush=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we have missed few lines here, like daprClient instantiation etc.

ref: https://github.com/dapr/quickstarts/blob/master/workflows/python/sdk/order-processor/app.py#L34C9-L40

item_name = default_item_name
order_quantity = 10

total_cost = int(order_quantity) * baseInventory[item_name].per_item_cost
order = OrderPayload(item_name=item_name, quantity=int(order_quantity), total_cost=total_cost)

# Start Workflow
print(f'Starting order workflow, purchasing {order_quantity} of {item_name}', flush=True)
start_resp = daprClient.start_workflow(workflow_component=workflow_component,
workflow_name=workflow_name,
input=order)
_id = start_resp.instance_id

def prompt_for_approval(daprClient: DaprClient):
daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component,
event_name="manager_approval", event_data={'approval': True})

approval_seeked = False
start_time = datetime.now()
while True:
time_delta = datetime.now() - start_time
state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
if not state:
print("Workflow not found!") # not expected
elif state.runtime_status == "Completed" or\
state.runtime_status == "Failed" or\
state.runtime_status == "Terminated":
print(f'Workflow completed! Result: {state.runtime_status}', flush=True)
break
if time_delta.total_seconds() >= 10:
state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
if total_cost > 50000 and (
state.runtime_status != "Completed" or
state.runtime_status != "Failed" or
state.runtime_status != "Terminated"
) and not approval_seeked:
approval_seeked = True
threading.Thread(target=prompt_for_approval(daprClient), daemon=True).start()

print("Purchase of item is ", state.runtime_status, flush=True)

def restock_inventory(self, daprClient: DaprClient, baseInventory):
for key, item in baseInventory.items():
print(f'item: {item}')
item_str = f'{{"name": "{item.item_name}", "quantity": {item.quantity},\
"per_item_cost": {item.per_item_cost}}}'
daprClient.save_state("statestore-actors", key, item_str)

if __name__ == '__main__':
app = WorkflowConsoleApp()
app.main()
```

#### `order-processor/workflow.py`

In `workflow.py`, the workflow is defined as a class with all of its associated tasks (determined by workflow activities).

```python
def order_processing_workflow(ctx: DaprWorkflowContext, order_payload_str: OrderPayload):
"""Defines the order processing workflow.
When the order is received, the inventory is checked to see if there is enough inventory to
fulfill the order. If there is enough inventory, the payment is processed and the inventory is
updated. If there is not enough inventory, the order is rejected.
If the total order is greater than $50,000, the order is sent to a manager for approval.
"""
order_id = ctx.instance_id
order_payload=json.loads(order_payload_str)
yield ctx.call_activity(notify_activity,
input=Notification(message=('Received order ' +order_id+ ' for '
+f'{order_payload["quantity"]}' +' ' +f'{order_payload["item_name"]}'
+' at $'+f'{order_payload["total_cost"]}' +' !')))
result = yield ctx.call_activity(verify_inventory_activity,
input=InventoryRequest(request_id=order_id,
item_name=order_payload["item_name"],
quantity=order_payload["quantity"]))
if not result.success:
yield ctx.call_activity(notify_activity,
input=Notification(message='Insufficient inventory for '
+f'{order_payload["item_name"]}'+'!'))
return OrderResult(processed=False)

if order_payload["total_cost"] > 50000:
yield ctx.call_activity(requst_approval_activity, input=order_payload)
approval_task = ctx.wait_for_external_event("manager_approval")
timeout_event = ctx.create_timer(timedelta(seconds=200))
winner = yield when_any([approval_task, timeout_event])
if winner == timeout_event:
yield ctx.call_activity(notify_activity,
input=Notification(message='Payment for order '+order_id
+' has been cancelled due to timeout!'))
return OrderResult(processed=False)
approval_result = yield approval_task
if approval_result["approval"]:
yield ctx.call_activity(notify_activity, input=Notification(
message=f'Payment for order {order_id} has been approved!'))
else:
yield ctx.call_activity(notify_activity, input=Notification(
message=f'Payment for order {order_id} has been rejected!'))
return OrderResult(processed=False)

yield ctx.call_activity(process_payment_activity, input=PaymentRequest(
request_id=order_id, item_being_purchased=order_payload["item_name"],
amount=order_payload["total_cost"], quantity=order_payload["quantity"]))

try:
yield ctx.call_activity(update_inventory_activity,
input=PaymentRequest(request_id=order_id,
item_being_purchased=order_payload["item_name"],
amount=order_payload["total_cost"],
quantity=order_payload["quantity"]))
except Exception:
yield ctx.call_activity(notify_activity,
input=Notification(message=f'Order {order_id} Failed!'))
return OrderResult(processed=False)

yield ctx.call_activity(notify_activity, input=Notification(
message=f'Order {order_id} has completed!'))
return OrderResult(processed=True)
```
{{% /codetab %}}


Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.