Skip to content

Commit

Permalink
Initial Bulk Subscribe functionality (#1009)
Browse files Browse the repository at this point in the history
Signed-off-by: Yash Nisar <yashnisar@microsoft.com>
  • Loading branch information
yash-nisar authored Feb 8, 2023
1 parent c863582 commit 0f1e1bf
Show file tree
Hide file tree
Showing 21 changed files with 852 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
namespace ControllerSample.Controllers
{
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Dapr;
using Dapr.AspNetCore;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -86,6 +88,51 @@ public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromS
return state.Value;
}

/// <summary>
/// Method for depositing multiple times to the account as specified in transaction.
/// </summary>
/// <param name="bulkMessage">List of entries of type BulkMessageModel received from dapr.</param>
/// <param name="daprClient">State client to interact with Dapr runtime.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)]
[BulkSubscribe("multideposit", 500, 2000)]
[HttpPost("multideposit")]
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody] BulkSubscribeMessage<BulkMessageModel<Transaction>>
bulkMessage, [FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter bulk deposit");

List<BulkSubscribeAppResponseEntry> entries = new List<BulkSubscribeAppResponseEntry>();

foreach (var entry in bulkMessage.Entries)
{
try
{
var transaction = entry.Event.Data;

var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
transaction.Id, transaction.Amount);

if (transaction.Amount < 0m)
{
return BadRequest(new { statusCode = 400, message = "bad request" });
}

state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
logger.LogError(e.Message);
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
}

/// <summary>
/// Method for viewing the error message when the deposit/withdrawal amounts
/// are negative.
Expand Down Expand Up @@ -190,6 +237,7 @@ public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawT

/// <summary>
/// Method for returning a BadRequest result which will cause Dapr sidecar to throw an RpcException
/// </summary>
[HttpPost("throwException")]
public async Task<ActionResult<Account>> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient)
{
Expand Down
86 changes: 85 additions & 1 deletion examples/AspNetCore/ControllerSample/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ This sample shows using Dapr with ASP.NET Core controllers. This application is
It exposes the following endpoints over HTTP:
- GET `/{account}`: Get the balance for the account specified by `id`
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
- POST `/multideposit`: Accepts a JSON payload to deposit money multiple times to a bulk subscribed topic
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account

The application also registers for pub/sub with the `deposit` and `withdraw` topics.
The application also registers for pub/sub with the `deposit`, `multideposit` and `withdraw` topics.

## Prerequisitess

Expand Down Expand Up @@ -57,7 +58,76 @@ Output:
```
---
**Deposit Money multiple times to a bulk subscribed topic**
On Linux, MacOS:
```
curl -X POST http://127.0.0.1:5000/multideposit \
-H 'Content-Type: application/json' \
-d '{
"entries":[
{
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
"event":{
"data":{
"amount":10,
"id":"17"
},
"datacontenttype":"application/json",
"id":"DaprClient",
"pubsubname":"pubsub",
"source":"Dapr",
"specversion":"1.0",
"topic":"multideposit",
"type":"com.dapr.event.sent"
},
"metadata":null,
"contentType":"application/cloudevents+json"
},
{
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
"event":{
"data":{
"amount":20,
"id":"17"
},
"datacontenttype":"application/json",
"id":"DaprClient",
"pubsubname":"pubsub",
"source":"Dapr",
"specversion":"1.0",
"topic":"multideposit",
"type":"com.dapr.event.sent"
},
"metadata":null,
"contentType":"application/cloudevents+json"
}
],
"id":"fa68c580-1b96-40d3-aa2c-04bab05e954e",
"metadata":{
"pubsubName":"pubsub"
},
"pubsubname":"pubsub",
"topic":"multideposit",
"type":"com.dapr.event.sent.bulk"
}'
```
Output:
```
{
"statuses":[
{
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
"status":"SUCCESS"
},
{
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
"status":"SUCCESS"
}
]
}
```
---
**Withdraw Money**
On Linux, MacOS:
```sh
Expand Down Expand Up @@ -213,6 +283,20 @@ public async Task<ActionResult<Account>> Deposit(...)
`[Topic(...)]` associates a pub/sub named `pubsub` (this is the default configured by the Dapr CLI) pub/sub topic `deposit` with this endpoint.
---
```C#
[Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)]
[BulkSubscribe("multideposit")]
[HttpPost("multideposit")]
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody] BulkSubscribeMessage<BulkMessageModel<Transaction>>
bulkMessage, [FromServices] DaprClient daprClient)
```
`[BulkSubscribe(...)]` associates a topic with the name mentioned in the attribute with the ability to be bulk subscribed to. It can take additional parameters like `MaxMessagesCount` and `MaxAwaitDurationMs`.
If those parameters are not supplied, the defaults of 100 and 1000ms are set.
However, you need to use `BulkSubscribeMessage<BulkMessageModel<T>>` in the input and that you need to return the `BulkSubscribeAppResponse` as well.
---
```C#
Expand Down
90 changes: 86 additions & 4 deletions examples/AspNetCore/RoutingSample/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
This sample shows using Dapr with ASP.NET Core routing. This application is a simple and not-so-secure banking application. The application uses the Dapr state-store for its data storage.

It exposes the following endpoints over HTTP:
- GET `/{id}`: Get the balance for the account specified by `id`
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account
- GET `/{id}`: Get the balance for the account specified by `id`
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
- POST `/multideposit`: Accepts a JSON payload to deposit money multiple times to a bulk subscribed topic
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account

The application also registers for pub/sub with the `deposit` and `withdraw` topics.
The application also registers for pub/sub with the `deposit`, `multideposit`, and `withdraw` topics.

## Prerequisites

Expand Down Expand Up @@ -56,6 +57,76 @@ Output:
```
---
**Deposit Money multiple times to a bulk subscribed topic**
On Linux, MacOS:
```
curl -X POST http://127.0.0.1:5000/multideposit \
-H 'Content-Type: application/json' \
-d '{
"entries":[
{
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
"event":{
"data":{
"amount":10,
"id":"17"
},
"datacontenttype":"application/json",
"id":"DaprClient",
"pubsubname":"pubsub",
"source":"Dapr",
"specversion":"1.0",
"topic":"multideposit",
"type":"com.dapr.event.sent"
},
"metadata":null,
"contentType":"application/cloudevents+json"
},
{
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
"event":{
"data":{
"amount":20,
"id":"17"
},
"datacontenttype":"application/json",
"id":"DaprClient",
"pubsubname":"pubsub",
"source":"Dapr",
"specversion":"1.0",
"topic":"multideposit",
"type":"com.dapr.event.sent"
},
"metadata":null,
"contentType":"application/cloudevents+json"
}
],
"id":"fa68c580-1b96-40d3-aa2c-04bab05e954e",
"metadata":{
"pubsubName":"pubsub"
},
"pubsubname":"pubsub",
"topic":"multideposit",
"type":"com.dapr.event.sent.bulk"
}'
```
Output:
```
{
"statuses":[
{
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
"status":"SUCCESS"
},
{
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
"status":"SUCCESS"
}
]
}
```
---
**Withdraw Money**
On Linux, MacOS:
Expand Down Expand Up @@ -194,6 +265,7 @@ app.UseEndpoints(endpoints =>
endpoints.MapGet("{id}", Balance);
endpoints.MapPost("deposit", Deposit).WithTopic(PubsubName, "deposit");
endpoints.MapPost("multideposit", MultiDeposit).WithTopic(multiDepositTopicOptions).WithBulkSubscribe(bulkSubscribeTopicOptions);
endpoints.MapPost("withdraw", Withdraw).WithTopic(PubsubName, "withdraw");
});
```
Expand All @@ -213,9 +285,19 @@ var withdrawTopicOptions = new TopicOptions();
withdrawTopicOptions.PubsubName = PubsubName;
withdrawTopicOptions.Name = "withdraw";
withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
var multiDepositTopicOptions = new TopicOptions
{ PubsubName = PubsubName, Name = "multideposit" };
var bulkSubscribeTopicOptions = new BulkSubscribeTopicOptions
{
TopicName = "multideposit", MaxMessagesCount = 250, MaxAwaitDurationMs = 1000
};
```
`WithTopic(...)` now takes the `TopicOptions(..)` instance that defines configurations for the subscribe endpoint.
`WithBulkSubscribe(...)` now takes the `BulkSubscribeTopicOptions(..)` instance that defines configurations for the bulk subscribe endpoint.
---
```C#
Expand Down
2 changes: 1 addition & 1 deletion examples/AspNetCore/RoutingSample/RoutingSample.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net6</TargetFramework>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit 0f1e1bf

Please sign in to comment.