Skip to content

Commit

Permalink
Add dead letter topic support
Browse files Browse the repository at this point in the history
Closes dapr#897

Signed-off-by: Yash Nisar <yashnisar@microsoft.com>
  • Loading branch information
yash-nisar committed Aug 22, 2022
1 parent 62c1d72 commit 400347c
Show file tree
Hide file tree
Showing 17 changed files with 364 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,38 +64,65 @@ public ActionResult<Account> Get([FromState(StoreName)] StateEntry<Account> acco
/// <param name="daprClient">State client to interact with Dapr runtime.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "deposit")]
[Topic("pubsub", "deposit", "amountDeadLetterTopic", false)]
[HttpPost("deposit")]
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)
{
logger.LogDebug("Enter deposit");
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogDebug("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount);

if (transaction.Amount < 0m)
{
return BadRequest(new { statusCode = 400, message = "bad request" });
}

state.Value.Balance += transaction.Amount;
logger.LogDebug("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
return state.Value;
}

/// <summary>
/// Method for viewing the error message when the deposit/withdrawal amounts
/// are negative.
/// </summary>
/// <param name="transaction">Transaction info.</param>
[Topic("pubsub", "amountDeadLetterTopic")]
[HttpPost("deadLetterTopicRoute")]
public ActionResult<Account> ViewErrorMessage(Transaction transaction)
{
logger.LogDebug("The amount cannot be negative: {0}", transaction.Amount);
return Ok();
}

/// <summary>
/// Method for withdrawing from account as specified in transaction.
/// </summary>
/// <param name="transaction">Transaction info.</param>
/// <param name="daprClient">State client to interact with Dapr runtime.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "withdraw")]
[Topic("pubsub", "withdraw", "amountDeadLetterTopic", false)]
[HttpPost("withdraw")]
public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [FromServices] DaprClient daprClient)
{
logger.LogDebug("Enter withdraw");
logger.LogDebug("Enter withdraw method...");
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
logger.LogDebug("Id is {0}, the amount to be withdrawn is {1}", transaction.Id, transaction.Amount);

if (state.Value == null)
{
return this.NotFound();
}
if (transaction.Amount < 0m)
{
return BadRequest(new { statusCode = 400, message = "bad request" });
}

state.Value.Balance -= transaction.Amount;
logger.LogDebug("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
return state.Value;
}
Expand Down Expand Up @@ -134,7 +161,7 @@ public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction, [
[HttpPost("throwException")]
public async Task<ActionResult<Account>> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient)
{
Console.WriteLine("Enter ThrowException");
logger.LogDebug("Enter ThrowException");
var task = Task.Delay(10);
await task;
return BadRequest(new { statusCode = 400, message = "bad request" });
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
25 changes: 25 additions & 0 deletions examples/AspNetCore/ControllerSample/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,32 @@ On Windows:
dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d "{\"id\": \"17\", \"amount\": 15 }"
```
---
**Dead Letter Topic example (pubsub)**
Publish an event using the Dapr cli with an incorrect input, i.e. negative amount:
Deposit:
On Linux, MacOS:
```sh
dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d '{"id": "17", "amount": -15 }'
```
On Windows:
```sh
dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d "{\"id\": \"17\", \"amount\": -15 }"
```
Withdraw:
On Linux, MacOS:
```sh
dapr publish --pubsub pubsub --publish-app-id controller -t withdraw -d '{"id": "17", "amount": -15 }'
```
On Windows:
```sh
dapr publish --pubsub pubsub --publish-app-id controller -t withdraw -d "{\"id\": \"17\", \"amount\": -15 }"
```
First a message is sent from a publisher on a `deposit` or `withdraw` topic. Dapr receives the message on behalf of a subscriber application, however the `deposit` or `withdraw` topic message fails to be delivered to the `/deposit` or `/withdraw` endpoint on the application, even after retries. As a result of the failure to deliver, the message is forwarded to the `amountDeadLetterTopic` topic which delivers this to the `/deadLetterTopicRoute` endpoint.
---
## Code Samples
*All of the interesting code in this sample is in Startup.cs and Controllers/SampleController.cs*
Expand Down
3 changes: 1 addition & 2 deletions examples/AspNetCore/ControllerSample/Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public class Transaction

/// <summary>
/// Gets or sets amount for the transaction.
/// </summary>
[Range(0, double.MaxValue)]
/// </summary
public decimal Amount { get; set; }
}
}
36 changes: 36 additions & 0 deletions examples/AspNetCore/RoutingSample/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,31 @@ On Windows:
dapr publish --pubsub pubsub --publish-app-id routing -t deposit -d "{\"id\": \"17\", \"amount\": 15 }"
```
---
**Dead Letter Topic example (pubsub)**
Publish an event using the Dapr cli with an incorrect input, i.e. negative amount:
Deposit:
On Linux, MacOS:
```sh
dapr publish --pubsub pubsub --publish-app-id routing -t deposit -d '{"id": "17", "amount": -15 }'
```
On Windows:
```sh
dapr publish --pubsub pubsub --publish-app-id routing -t deposit -d "{\"id\": \"17\", \"amount\": -15 }"
```
Withdraw:
On Linux, MacOS:
```sh
dapr publish --pubsub pubsub --publish-app-id routing -t withdraw -d '{"id": "17", "amount": -15 }'
```
On Windows:
```sh
dapr publish --pubsub pubsub --publish-app-id routing -t withdraw -d "{\"id\": \"17\", \"amount\": -15 }"
```
First a message is sent from a publisher on a `deposit` or `withdraw` topic. Dapr receives the message on behalf of a subscriber application, however the `deposit` or `withdraw` topic message fails to be delivered to the `/deposit` or `/withdraw` endpoint on the application, even after retries. As a result of the failure to deliver, the message is forwarded to the `amountDeadLetterTopic` topic which delivers this to the `/deadLetterTopicRoute` endpoint.
---
## Code Samples
*All of the interesting code in this sample is in Startup.cs*
Expand Down Expand Up @@ -179,6 +203,18 @@ app.UseEndpoints(endpoints =>
`MapGet(...)` and `MapPost(...)` are provided by ASP.NET Core routing - these are used to setup endpoints to handle HTTP requests.
`WithTopic(...)` associates an endpoint with a pub/sub topic.
```C#
var depositTopicOptions = new TopicOptions();
depositTopicOptions.PubsubName = PubsubName;
depositTopicOptions.Name = "deposit";
depositTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
var withdrawTopicOptions = new TopicOptions();
withdrawTopicOptions.PubsubName = PubsubName;
withdrawTopicOptions.Name = "withdraw";
withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
```
`WithTopic(...)` now takes the `TopicOptions(..)` instance that defines configurations for the subscribe endpoint.
---
Expand Down
66 changes: 47 additions & 19 deletions examples/AspNetCore/RoutingSample/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,13 +16,15 @@ namespace RoutingSample
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Dapr;
using Dapr.Client;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

/// <summary>
/// Startup class.
Expand Down Expand Up @@ -74,7 +76,8 @@ public void ConfigureServices(IServiceCollection services)
/// <param name="app">Application builder.</param>
/// <param name="env">Webhost environment.</param>
/// <param name="serializerOptions">Options for JSON serialization.</param>
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSerializerOptions serializerOptions)
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSerializerOptions serializerOptions,
ILogger<Startup> logger)
{
if (env.IsDevelopment())
{
Expand All @@ -89,40 +92,52 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSeri
{
endpoints.MapSubscribeHandler();
var depositTopicOptions = new TopicOptions();
depositTopicOptions.PubsubName = PubsubName;
depositTopicOptions.Name = "deposit";
depositTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
var withdrawTopicOptions = new TopicOptions();
withdrawTopicOptions.PubsubName = PubsubName;
withdrawTopicOptions.Name = "withdraw";
withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
endpoints.MapGet("{id}", Balance);
endpoints.MapPost("deposit", Deposit).WithTopic(PubsubName, "deposit");
endpoints.MapPost("withdraw", Withdraw).WithTopic(PubsubName, "withdraw");
endpoints.MapPost("deposit", Deposit).WithTopic(depositTopicOptions);
endpoints.MapPost("deadLetterTopicRoute", ViewErrorMessage).WithTopic(PubsubName, "amountDeadLetterTopic");
endpoints.MapPost("withdraw", Withdraw).WithTopic(withdrawTopicOptions);
});

async Task Balance(HttpContext context)
{
Console.WriteLine("Enter Balance");
logger.LogDebug("Enter Balance");
var client = context.RequestServices.GetRequiredService<DaprClient>();

var id = (string)context.Request.RouteValues["id"];
Console.WriteLine("id is {0}", id);
logger.LogDebug("id is {0}", id);
var account = await client.GetStateAsync<Account>(StoreName, id);
if (account == null)
{
Console.WriteLine("Account not found");
logger.LogDebug("Account not found");
context.Response.StatusCode = 404;
return;
}

Console.WriteLine("Account balance is {0}", account.Balance);
logger.LogDebug("Account balance is {0}", account.Balance);

context.Response.ContentType = "application/json";
await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions);
}

async Task Deposit(HttpContext context)
{
Console.WriteLine("Enter Deposit");

var client = context.RequestServices.GetRequiredService<DaprClient>();
logger.LogDebug("Enter Deposit");

var client = context.RequestServices.GetRequiredService<DaprClient>();
var transaction = await JsonSerializer.DeserializeAsync<Transaction>(context.Request.Body, serializerOptions);
Console.WriteLine("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);

logger.LogDebug("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);

var account = await client.GetStateAsync<Account>(StoreName, transaction.Id);
if (account == null)
{
Expand All @@ -131,43 +146,56 @@ async Task Deposit(HttpContext context)

if (transaction.Amount < 0m)
{
Console.WriteLine("Invalid amount");
logger.LogDebug("Invalid amount");
context.Response.StatusCode = 400;
return;
}

account.Balance += transaction.Amount;
await client.SaveStateAsync(StoreName, transaction.Id, account);
Console.WriteLine("Balance is {0}", account.Balance);
logger.LogDebug("Balance is {0}", account.Balance);

context.Response.ContentType = "application/json";
await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions);
}

async Task ViewErrorMessage(HttpContext context)
{
var client = context.RequestServices.GetRequiredService<DaprClient>();
var transaction = await JsonSerializer.DeserializeAsync<Transaction>(context.Request.Body, serializerOptions);

logger.LogDebug("The amount cannot be negative: {0}", transaction.Amount);

return;
}

async Task Withdraw(HttpContext context)
{
Console.WriteLine("Enter Withdraw");
logger.LogDebug("Enter Withdraw");

var client = context.RequestServices.GetRequiredService<DaprClient>();
var transaction = await JsonSerializer.DeserializeAsync<Transaction>(context.Request.Body, serializerOptions);
Console.WriteLine("Id is {0}", transaction.Id);

logger.LogDebug("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);

var account = await client.GetStateAsync<Account>(StoreName, transaction.Id);
if (account == null)
{
Console.WriteLine("Account not found");
logger.LogDebug("Account not found");
context.Response.StatusCode = 404;
return;
}

if (transaction.Amount < 0m)
{
Console.WriteLine("Invalid amount");
logger.LogDebug("Invalid amount");
context.Response.StatusCode = 400;
return;
}

account.Balance -= transaction.Amount;
await client.SaveStateAsync(StoreName, transaction.Id, account);
Console.WriteLine("Balance is {0}", account.Balance);
logger.LogDebug("Balance is {0}", account.Balance);

context.Response.ContentType = "application/json";
await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions);
Expand Down
41 changes: 41 additions & 0 deletions src/Dapr.AspNetCore/DaprEndpointConventionBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ namespace Microsoft.AspNetCore.Builder
{
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Xml.Linq;
using Dapr;
using Grpc.Core;

/// <summary>
/// Contains extension methods for <see cref="IEndpointConventionBuilder" />.
Expand Down Expand Up @@ -103,5 +106,43 @@ public static T WithTopic<T>(this T builder, string pubsubName, string name, boo
}
return builder;
}

/// <summary>
/// Adds <see cref="ITopicMetadata" /> metadata to the provided <see cref="IEndpointConventionBuilder" />.
/// </summary>
/// <param name="builder">The <see cref="IEndpointConventionBuilder" />.</param>\
/// <param name="topicOptions">The object of TopicOptions class that provides all topic attributes.</param>
/// <typeparam name="T">The <see cref="IEndpointConventionBuilder" /> type.</typeparam>
/// <returns>The <see cref="IEndpointConventionBuilder" /> builder object.</returns>
public static T WithTopic<T>(this T builder, TopicOptions topicOptions)
where T : IEndpointConventionBuilder
{
if (builder is null)
{
throw new ArgumentNullException(nameof(builder));
}

ArgumentVerifier.ThrowIfNullOrEmpty(topicOptions.PubsubName, nameof(topicOptions.PubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(topicOptions.Name, nameof(topicOptions.Name));

var topicObject = new TopicAttribute(topicOptions.PubsubName, topicOptions.Name, topicOptions.DeadLetterTopic, topicOptions.EnableRawPayload);

topicObject.Match = topicOptions.Match;
topicObject.Priority = topicOptions.Priority;
topicObject.OwnedMetadatas = topicOptions.OwnedMetadatas;
topicObject.MetadataSeparator = topicObject.MetadataSeparator;

if (topicOptions.Metadata is not null)
{
foreach (var md in topicOptions.Metadata)
{
builder.WithMetadata(new TopicMetadataAttribute(md.Key, md.Value));
}
}

builder.WithMetadata(topicObject);

return builder;
}
}
}
Loading

0 comments on commit 400347c

Please sign in to comment.