From 400347c26fa269aed112cd54885dccea6d8894bc Mon Sep 17 00:00:00 2001 From: Yash Nisar Date: Sun, 14 Aug 2022 20:23:01 -0500 Subject: [PATCH] Add dead letter topic support Closes https://github.com/dapr/dotnet-sdk/issues/897 Signed-off-by: Yash Nisar --- .../Controllers/SampleController.cs | 35 ++++++++-- .../ControllerSample/CustomTopicAttribute.cs | 2 +- .../AspNetCore/ControllerSample/README.md | 25 +++++++ .../ControllerSample/Transaction.cs | 3 +- examples/AspNetCore/RoutingSample/README.md | 36 ++++++++++ examples/AspNetCore/RoutingSample/Startup.cs | 66 ++++++++++++------ ...DaprEndpointConventionBuilderExtensions.cs | 41 ++++++++++++ .../DaprEndpointRouteBuilderExtensions.cs | 8 ++- .../IDeadLetterTopicMetadata.cs | 27 ++++++++ src/Dapr.AspNetCore/ITopicMetadata.cs | 2 +- src/Dapr.AspNetCore/Subscription.cs | 5 ++ src/Dapr.AspNetCore/TopicAttribute.cs | 41 +++++++++--- src/Dapr.AspNetCore/TopicOptions.cs | 67 +++++++++++++++++++ .../dapr/proto/dapr/v1/appcallback.proto | 3 + .../CustomTopicAttribute.cs | 2 +- .../DaprController.cs | 6 ++ .../SubscribeEndpointTest.cs | 55 ++++++++------- 17 files changed, 364 insertions(+), 60 deletions(-) create mode 100644 src/Dapr.AspNetCore/IDeadLetterTopicMetadata.cs create mode 100644 src/Dapr.AspNetCore/TopicOptions.cs diff --git a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs index a80717787..7a5f8a75f 100644 --- a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs +++ b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs @@ -64,18 +64,39 @@ public ActionResult Get([FromState(StoreName)] StateEntry acco /// State client to interact with Dapr runtime. /// A representing the result of the asynchronous operation. /// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI. - [Topic("pubsub", "deposit")] + [Topic("pubsub", "deposit", "amountDeadLetterTopic", false)] [HttpPost("deposit")] public async Task> Deposit(Transaction transaction, [FromServices] DaprClient daprClient) { logger.LogDebug("Enter deposit"); var state = await daprClient.GetStateEntryAsync(StoreName, transaction.Id); state.Value ??= new Account() { Id = transaction.Id, }; + logger.LogDebug("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount); + + if (transaction.Amount < 0m) + { + return BadRequest(new { statusCode = 400, message = "bad request" }); + } + state.Value.Balance += transaction.Amount; + logger.LogDebug("Balance is {0}", state.Value.Balance); await state.SaveAsync(); return state.Value; } + /// + /// Method for viewing the error message when the deposit/withdrawal amounts + /// are negative. + /// + /// Transaction info. + [Topic("pubsub", "amountDeadLetterTopic")] + [HttpPost("deadLetterTopicRoute")] + public ActionResult ViewErrorMessage(Transaction transaction) + { + logger.LogDebug("The amount cannot be negative: {0}", transaction.Amount); + return Ok(); + } + /// /// Method for withdrawing from account as specified in transaction. /// @@ -83,19 +104,25 @@ public async Task> Deposit(Transaction transaction, [FromS /// State client to interact with Dapr runtime. /// A representing the result of the asynchronous operation. /// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI. - [Topic("pubsub", "withdraw")] + [Topic("pubsub", "withdraw", "amountDeadLetterTopic", false)] [HttpPost("withdraw")] public async Task> Withdraw(Transaction transaction, [FromServices] DaprClient daprClient) { - logger.LogDebug("Enter withdraw"); + logger.LogDebug("Enter withdraw method..."); var state = await daprClient.GetStateEntryAsync(StoreName, transaction.Id); + logger.LogDebug("Id is {0}, the amount to be withdrawn is {1}", transaction.Id, transaction.Amount); if (state.Value == null) { return this.NotFound(); } + if (transaction.Amount < 0m) + { + return BadRequest(new { statusCode = 400, message = "bad request" }); + } state.Value.Balance -= transaction.Amount; + logger.LogDebug("Balance is {0}", state.Value.Balance); await state.SaveAsync(); return state.Value; } @@ -134,7 +161,7 @@ public async Task> WithdrawV2(TransactionV2 transaction, [ [HttpPost("throwException")] public async Task> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient) { - Console.WriteLine("Enter ThrowException"); + logger.LogDebug("Enter ThrowException"); var task = Task.Delay(10); await task; return BadRequest(new { statusCode = 400, message = "bad request" }); diff --git a/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs b/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs index 2017315ed..96eb918fb 100644 --- a/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs +++ b/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/examples/AspNetCore/ControllerSample/README.md b/examples/AspNetCore/ControllerSample/README.md index 0a727b706..bb6c448a7 100644 --- a/examples/AspNetCore/ControllerSample/README.md +++ b/examples/AspNetCore/ControllerSample/README.md @@ -133,7 +133,32 @@ On Windows: dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d "{\"id\": \"17\", \"amount\": 15 }" ``` --- +**Dead Letter Topic example (pubsub)** +Publish an event using the Dapr cli with an incorrect input, i.e. negative amount: +Deposit: +On Linux, MacOS: +```sh +dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d '{"id": "17", "amount": -15 }' +``` +On Windows: + ```sh + dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d "{\"id\": \"17\", \"amount\": -15 }" +``` + +Withdraw: + On Linux, MacOS: +```sh +dapr publish --pubsub pubsub --publish-app-id controller -t withdraw -d '{"id": "17", "amount": -15 }' +``` +On Windows: + ```sh + dapr publish --pubsub pubsub --publish-app-id controller -t withdraw -d "{\"id\": \"17\", \"amount\": -15 }" + ``` + +First a message is sent from a publisher on a `deposit` or `withdraw` topic. Dapr receives the message on behalf of a subscriber application, however the `deposit` or `withdraw` topic message fails to be delivered to the `/deposit` or `/withdraw` endpoint on the application, even after retries. As a result of the failure to deliver, the message is forwarded to the `amountDeadLetterTopic` topic which delivers this to the `/deadLetterTopicRoute` endpoint. + + --- ## Code Samples *All of the interesting code in this sample is in Startup.cs and Controllers/SampleController.cs* diff --git a/examples/AspNetCore/ControllerSample/Transaction.cs b/examples/AspNetCore/ControllerSample/Transaction.cs index cf553a9c9..2d6f5a5aa 100644 --- a/examples/AspNetCore/ControllerSample/Transaction.cs +++ b/examples/AspNetCore/ControllerSample/Transaction.cs @@ -28,8 +28,7 @@ public class Transaction /// /// Gets or sets amount for the transaction. - /// - [Range(0, double.MaxValue)] + /// `MapGet(...)` and `MapPost(...)` are provided by ASP.NET Core routing - these are used to setup endpoints to handle HTTP requests. `WithTopic(...)` associates an endpoint with a pub/sub topic. +```C# +var depositTopicOptions = new TopicOptions(); +depositTopicOptions.PubsubName = PubsubName; +depositTopicOptions.Name = "deposit"; +depositTopicOptions.DeadLetterTopic = "amountDeadLetterTopic"; + +var withdrawTopicOptions = new TopicOptions(); +withdrawTopicOptions.PubsubName = PubsubName; +withdrawTopicOptions.Name = "withdraw"; +withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic"; +``` +`WithTopic(...)` now takes the `TopicOptions(..)` instance that defines configurations for the subscribe endpoint. --- diff --git a/examples/AspNetCore/RoutingSample/Startup.cs b/examples/AspNetCore/RoutingSample/Startup.cs index 5ac1541a5..bb8aad831 100644 --- a/examples/AspNetCore/RoutingSample/Startup.cs +++ b/examples/AspNetCore/RoutingSample/Startup.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ namespace RoutingSample using System; using System.Text.Json; using System.Threading.Tasks; + using Dapr; using Dapr.Client; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; @@ -23,6 +24,7 @@ namespace RoutingSample using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; + using Microsoft.Extensions.Logging; /// /// Startup class. @@ -74,7 +76,8 @@ public void ConfigureServices(IServiceCollection services) /// Application builder. /// Webhost environment. /// Options for JSON serialization. - public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSerializerOptions serializerOptions) + public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSerializerOptions serializerOptions, + ILogger logger) { if (env.IsDevelopment()) { @@ -89,27 +92,38 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSeri { endpoints.MapSubscribeHandler(); + var depositTopicOptions = new TopicOptions(); + depositTopicOptions.PubsubName = PubsubName; + depositTopicOptions.Name = "deposit"; + depositTopicOptions.DeadLetterTopic = "amountDeadLetterTopic"; + + var withdrawTopicOptions = new TopicOptions(); + withdrawTopicOptions.PubsubName = PubsubName; + withdrawTopicOptions.Name = "withdraw"; + withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic"; + endpoints.MapGet("{id}", Balance); - endpoints.MapPost("deposit", Deposit).WithTopic(PubsubName, "deposit"); - endpoints.MapPost("withdraw", Withdraw).WithTopic(PubsubName, "withdraw"); + endpoints.MapPost("deposit", Deposit).WithTopic(depositTopicOptions); + endpoints.MapPost("deadLetterTopicRoute", ViewErrorMessage).WithTopic(PubsubName, "amountDeadLetterTopic"); + endpoints.MapPost("withdraw", Withdraw).WithTopic(withdrawTopicOptions); }); async Task Balance(HttpContext context) { - Console.WriteLine("Enter Balance"); + logger.LogDebug("Enter Balance"); var client = context.RequestServices.GetRequiredService(); var id = (string)context.Request.RouteValues["id"]; - Console.WriteLine("id is {0}", id); + logger.LogDebug("id is {0}", id); var account = await client.GetStateAsync(StoreName, id); if (account == null) { - Console.WriteLine("Account not found"); + logger.LogDebug("Account not found"); context.Response.StatusCode = 404; return; } - Console.WriteLine("Account balance is {0}", account.Balance); + logger.LogDebug("Account balance is {0}", account.Balance); context.Response.ContentType = "application/json"; await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions); @@ -117,12 +131,13 @@ async Task Balance(HttpContext context) async Task Deposit(HttpContext context) { - Console.WriteLine("Enter Deposit"); - - var client = context.RequestServices.GetRequiredService(); + logger.LogDebug("Enter Deposit"); + var client = context.RequestServices.GetRequiredService(); var transaction = await JsonSerializer.DeserializeAsync(context.Request.Body, serializerOptions); - Console.WriteLine("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount); + + logger.LogDebug("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount); + var account = await client.GetStateAsync(StoreName, transaction.Id); if (account == null) { @@ -131,43 +146,56 @@ async Task Deposit(HttpContext context) if (transaction.Amount < 0m) { - Console.WriteLine("Invalid amount"); + logger.LogDebug("Invalid amount"); context.Response.StatusCode = 400; return; } account.Balance += transaction.Amount; await client.SaveStateAsync(StoreName, transaction.Id, account); - Console.WriteLine("Balance is {0}", account.Balance); + logger.LogDebug("Balance is {0}", account.Balance); context.Response.ContentType = "application/json"; await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions); } + async Task ViewErrorMessage(HttpContext context) + { + var client = context.RequestServices.GetRequiredService(); + var transaction = await JsonSerializer.DeserializeAsync(context.Request.Body, serializerOptions); + + logger.LogDebug("The amount cannot be negative: {0}", transaction.Amount); + + return; + } + async Task Withdraw(HttpContext context) { - Console.WriteLine("Enter Withdraw"); + logger.LogDebug("Enter Withdraw"); + var client = context.RequestServices.GetRequiredService(); var transaction = await JsonSerializer.DeserializeAsync(context.Request.Body, serializerOptions); - Console.WriteLine("Id is {0}", transaction.Id); + + logger.LogDebug("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount); + var account = await client.GetStateAsync(StoreName, transaction.Id); if (account == null) { - Console.WriteLine("Account not found"); + logger.LogDebug("Account not found"); context.Response.StatusCode = 404; return; } if (transaction.Amount < 0m) { - Console.WriteLine("Invalid amount"); + logger.LogDebug("Invalid amount"); context.Response.StatusCode = 400; return; } account.Balance -= transaction.Amount; await client.SaveStateAsync(StoreName, transaction.Id, account); - Console.WriteLine("Balance is {0}", account.Balance); + logger.LogDebug("Balance is {0}", account.Balance); context.Response.ContentType = "application/json"; await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions); diff --git a/src/Dapr.AspNetCore/DaprEndpointConventionBuilderExtensions.cs b/src/Dapr.AspNetCore/DaprEndpointConventionBuilderExtensions.cs index b29b81500..af8440e38 100644 --- a/src/Dapr.AspNetCore/DaprEndpointConventionBuilderExtensions.cs +++ b/src/Dapr.AspNetCore/DaprEndpointConventionBuilderExtensions.cs @@ -15,7 +15,10 @@ namespace Microsoft.AspNetCore.Builder { using System; using System.Collections.Generic; + using System.Reflection; + using System.Xml.Linq; using Dapr; + using Grpc.Core; /// /// Contains extension methods for . @@ -103,5 +106,43 @@ public static T WithTopic(this T builder, string pubsubName, string name, boo } return builder; } + + /// + /// Adds metadata to the provided . + /// + /// The .\ + /// The object of TopicOptions class that provides all topic attributes. + /// The type. + /// The builder object. + public static T WithTopic(this T builder, TopicOptions topicOptions) + where T : IEndpointConventionBuilder + { + if (builder is null) + { + throw new ArgumentNullException(nameof(builder)); + } + + ArgumentVerifier.ThrowIfNullOrEmpty(topicOptions.PubsubName, nameof(topicOptions.PubsubName)); + ArgumentVerifier.ThrowIfNullOrEmpty(topicOptions.Name, nameof(topicOptions.Name)); + + var topicObject = new TopicAttribute(topicOptions.PubsubName, topicOptions.Name, topicOptions.DeadLetterTopic, topicOptions.EnableRawPayload); + + topicObject.Match = topicOptions.Match; + topicObject.Priority = topicOptions.Priority; + topicObject.OwnedMetadatas = topicOptions.OwnedMetadatas; + topicObject.MetadataSeparator = topicObject.MetadataSeparator; + + if (topicOptions.Metadata is not null) + { + foreach (var md in topicOptions.Metadata) + { + builder.WithMetadata(new TopicMetadataAttribute(md.Key, md.Value)); + } + } + + builder.WithMetadata(topicObject); + + return builder; + } } } diff --git a/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs b/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs index 558e415ca..2ad7ac749 100644 --- a/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs +++ b/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs @@ -72,12 +72,13 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute var topicMetadata = e.Metadata.GetOrderedMetadata(); var originalTopicMetadata = e.Metadata.GetOrderedMetadata(); - var subs = new List<(string PubsubName, string Name, bool? EnableRawPayload, string Match, int Priority, Dictionary OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>(); + var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, string Match, int Priority, Dictionary OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>(); for (int i = 0; i < topicMetadata.Count(); i++) { subs.Add((topicMetadata[i].PubsubName, topicMetadata[i].Name, + (topicMetadata[i] as IDeadLetterTopicMetadata)?.DeadLetterTopic, (topicMetadata[i] as IRawTopicMetadata)?.EnableRawPayload, topicMetadata[i].Match, topicMetadata[i].Priority, @@ -133,6 +134,11 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute Metadata = metadata.Count > 0 ? metadata : null, }; + if (first.DeadLetterTopic != null) + { + subscription.DeadLetterTopic = first.DeadLetterTopic; + } + // Use the V2 routing rules structure if (rules.Count > 0) { diff --git a/src/Dapr.AspNetCore/IDeadLetterTopicMetadata.cs b/src/Dapr.AspNetCore/IDeadLetterTopicMetadata.cs new file mode 100644 index 000000000..97707b980 --- /dev/null +++ b/src/Dapr.AspNetCore/IDeadLetterTopicMetadata.cs @@ -0,0 +1,27 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr +{ + /// + /// IDeadLetterTopicMetadata that describes the metadata of a dead letter topic. + /// + public interface IDeadLetterTopicMetadata + { + /// + /// Gets the dead letter topic name + /// + public string DeadLetterTopic { get; } + } +} + diff --git a/src/Dapr.AspNetCore/ITopicMetadata.cs b/src/Dapr.AspNetCore/ITopicMetadata.cs index 4f297470e..eb3732139 100644 --- a/src/Dapr.AspNetCore/ITopicMetadata.cs +++ b/src/Dapr.AspNetCore/ITopicMetadata.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/src/Dapr.AspNetCore/Subscription.cs b/src/Dapr.AspNetCore/Subscription.cs index 54e41956c..1c3177b9f 100644 --- a/src/Dapr.AspNetCore/Subscription.cs +++ b/src/Dapr.AspNetCore/Subscription.cs @@ -44,6 +44,11 @@ internal class Subscription /// Gets or sets the metadata. /// public Metadata Metadata { get; set; } + + /// + /// Gets or sets the deadletter topic. + /// + public string DeadLetterTopic { get; set; } } /// diff --git a/src/Dapr.AspNetCore/TopicAttribute.cs b/src/Dapr.AspNetCore/TopicAttribute.cs index 770bfe18b..daa5d850f 100644 --- a/src/Dapr.AspNetCore/TopicAttribute.cs +++ b/src/Dapr.AspNetCore/TopicAttribute.cs @@ -19,7 +19,7 @@ namespace Dapr /// TopicAttribute describes an endpoint as a subscriber to a topic. /// [AttributeUsage(AttributeTargets.All, AllowMultiple = true)] - public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata, IOwnedOriginalTopicMetadata + public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata, IOwnedOriginalTopicMetadata, IDeadLetterTopicMetadata { /// /// Initializes a new instance of the class. @@ -105,25 +105,50 @@ public TopicAttribute(string pubsubName, string name, bool enableRawPayload, str this.MetadataSeparator = metadataSeparator; } + /// + /// Initializes a new instance of the class. + /// + /// The name of the pubsub component to use. + /// The topic name. + /// The dead letter topic name. + /// The enable/disable raw pay load flag. + /// The topic owned metadata ids. + /// Separator to use for metadata. + public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null) + { + ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); + ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name)); + + this.Name = name; + this.PubsubName = pubsubName; + this.DeadLetterTopic = deadLetterTopic; + this.EnableRawPayload = enableRawPayload; + this.OwnedMetadatas = ownedMetadatas; + this.MetadataSeparator = metadataSeparator; + } + + /// + public string Name { get; set; } + /// - public string Name { get; } + public string PubsubName { get; set; } /// - public string PubsubName { get; } + public bool? EnableRawPayload { get; set; } /// - public bool? EnableRawPayload { get; } + public new string Match { get; set; } /// - public new string Match { get; } + public int Priority { get; set; } /// - public int Priority { get; } + public string[] OwnedMetadatas { get; set; } /// - public string[] OwnedMetadatas { get; } + public string MetadataSeparator { get; set; } /// - public string MetadataSeparator { get; } + public string DeadLetterTopic { get; set; } } } diff --git a/src/Dapr.AspNetCore/TopicOptions.cs b/src/Dapr.AspNetCore/TopicOptions.cs new file mode 100644 index 000000000..2ca2eea49 --- /dev/null +++ b/src/Dapr.AspNetCore/TopicOptions.cs @@ -0,0 +1,67 @@ +// ------------------------------------------------------------------------ +// Copyright 2022 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr +{ + using System.Collections.Generic; + /// + /// This class defines configurations for the subscribe endpoint. + /// + public class TopicOptions + { + /// + /// Gets or Sets the topic name. + /// + public string Name { get; set; } + + /// + /// Gets or Sets the name of the pubsub component to use. + /// + public string PubsubName { get; set; } + + /// + /// Gets or Sets a value which indicates whether to enable or disable processing raw messages. + /// + public bool EnableRawPayload { get; set; } + + /// + /// Gets or Sets the CEL expression to use to match events for this handler. + /// + public string Match { get; set; } + + /// + /// Gets or Sets the priority in which this rule should be evaluated (lower to higher). + /// + public int Priority { get; set; } + + /// + /// Gets or Sets the owned by topic. + /// + public string[] OwnedMetadatas { get; set; } + + /// + /// Get or Sets the separator to use for metadata. + /// + public string MetadataSeparator { get; set; } + + /// + /// Gets or Sets the dead letter topic. + /// + public string DeadLetterTopic { get; set; } + + /// + /// Gets or Sets the original topic metadata. + /// + public IDictionary Metadata; + } +} diff --git a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/appcallback.proto b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/appcallback.proto index 02436dae8..bfbb4d79d 100644 --- a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/appcallback.proto +++ b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/appcallback.proto @@ -160,6 +160,9 @@ message TopicSubscription { // The optional routing rules to match against. In the gRPC interface, OnTopicEvent // is still invoked but the matching path is sent in the TopicEventRequest. TopicRoutes routes = 5; + + // The optional dead letter queue for this topic to send events to. + string dead_letter_topic = 6; } message TopicRoutes { diff --git a/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs b/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs index e0d5df8e2..6f65f060f 100644 --- a/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs +++ b/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs @@ -1,4 +1,4 @@ -// ------------------------------------------------------------------------ +// ------------------------------------------------------------------------ // Copyright 2021 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs b/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs index 52e786879..733ee0b73 100644 --- a/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs +++ b/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs @@ -67,6 +67,12 @@ public void TopicEImportant() public void MultipleTopics() { } + + [Topic("pubsub", "G", "deadLetterTopicName", false)] + [HttpPost("/G")] + public void TopicG() + { + } [Topic("pubsub", "metadata", new string[1] { "id1" })] [Topic("pubsub", "metadata.1", true)] diff --git a/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs b/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs index 048018a64..62a66898d 100644 --- a/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs +++ b/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs @@ -39,14 +39,22 @@ public async Task SubscribeEndpoint_ReportsTopics() var json = await JsonSerializer.DeserializeAsync(stream); json.ValueKind.Should().Be(JsonValueKind.Array); - json.GetArrayLength().Should().Be(16); - var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload, string match, string metadata)>(); + json.GetArrayLength().Should().Be(17); + + var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload, string match, string metadata, string DeadLetterTopic)>(); + foreach (var element in json.EnumerateArray()) { var pubsubName = element.GetProperty("pubsubName").GetString(); var topic = element.GetProperty("topic").GetString(); var rawPayload = string.Empty; + var deadLetterTopic = string.Empty; Dictionary originalMetadata = new Dictionary(); + + if (element.TryGetProperty("deadLetterTopic", out JsonElement DeadLetterTopic)) + { + deadLetterTopic = DeadLetterTopic.GetString(); + } if (element.TryGetProperty("metadata", out JsonElement metadata)) { if (metadata.TryGetProperty("rawPayload", out JsonElement rawPayloadJson)) @@ -70,7 +78,7 @@ public async Task SubscribeEndpoint_ReportsTopics() if (element.TryGetProperty("route", out JsonElement route)) { - subscriptions.Add((pubsubName, topic, route.GetString(), rawPayload, string.Empty, originalMetadataString)); + subscriptions.Add((pubsubName, topic, route.GetString(), rawPayload, string.Empty, originalMetadataString, deadLetterTopic)); } else if (element.TryGetProperty("routes", out JsonElement routes)) { @@ -80,34 +88,35 @@ public async Task SubscribeEndpoint_ReportsTopics() { var match = rule.GetProperty("match").GetString(); var path = rule.GetProperty("path").GetString(); - subscriptions.Add((pubsubName, topic, path, rawPayload, match, originalMetadataString)); + subscriptions.Add((pubsubName, topic, path, rawPayload, match, originalMetadataString, deadLetterTopic)); } } if (routes.TryGetProperty("default", out JsonElement defaultProperty)) { - subscriptions.Add((pubsubName, topic, defaultProperty.GetString(), rawPayload, string.Empty, originalMetadataString)); + subscriptions.Add((pubsubName, topic, defaultProperty.GetString(), rawPayload, string.Empty, originalMetadataString, deadLetterTopic)); } } } - subscriptions.Should().Contain(("testpubsub", "A", "topic-a", string.Empty, string.Empty, string.Empty)); - subscriptions.Should().Contain(("testpubsub", "A.1", "topic-a", string.Empty, string.Empty, string.Empty)); - subscriptions.Should().Contain(("pubsub", "B", "B", string.Empty, string.Empty, string.Empty)); - subscriptions.Should().Contain(("custom-pubsub", "custom-C", "C", string.Empty, string.Empty, string.Empty)); - subscriptions.Should().Contain(("pubsub", "register-user", "register-user", string.Empty, string.Empty, string.Empty)); - subscriptions.Should().Contain(("pubsub", "register-user-plaintext", "register-user-plaintext", string.Empty, string.Empty, string.Empty)); - subscriptions.Should().Contain(("pubsub", "D", "D", "true", string.Empty, string.Empty)); - subscriptions.Should().Contain(("pubsub", "E", "E", string.Empty, string.Empty, string.Empty)); - subscriptions.Should().Contain(("pubsub", "E", "E-Critical", string.Empty, "event.type == \"critical\"", string.Empty)); - subscriptions.Should().Contain(("pubsub", "E", "E-Important", string.Empty, "event.type == \"important\"", string.Empty)); - subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty, string.Empty)); - subscriptions.Should().Contain(("pubsub", "F.1", "multiTopicAttr", "true", string.Empty, string.Empty)); - subscriptions.Should().Contain(("pubsub", "splitTopicBuilder", "splitTopics", string.Empty, string.Empty, string.Empty)); - subscriptions.Should().Contain(("pubsub", "splitTopicAttr", "splitTopics", "true", string.Empty, string.Empty)); - subscriptions.Should().Contain(("pubsub", "metadata", "multiMetadataTopicAttr", string.Empty, string.Empty, "n1=v1;n2=v2,v3")); - subscriptions.Should().Contain(("pubsub", "metadata.1", "multiMetadataTopicAttr", "true", string.Empty, "n1=v1")); - subscriptions.Should().Contain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1")); - subscriptions.Should().Contain(("pubsub", "metadataseparator", "topicmetadataseparatorattr", string.Empty, string.Empty, "n1=v1|v2")); + subscriptions.Should().Contain(("testpubsub", "A", "topic-a", string.Empty, string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("testpubsub", "A.1", "topic-a", string.Empty, string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "B", "B", string.Empty, string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("custom-pubsub", "custom-C", "C", string.Empty, string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "register-user", "register-user", string.Empty, string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "register-user-plaintext", "register-user-plaintext", string.Empty, string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "D", "D", "true", string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "E", "E", string.Empty, string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "E", "E-Critical", string.Empty, "event.type == \"critical\"", string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "E", "E-Important", string.Empty, "event.type == \"important\"", string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "F.1", "multiTopicAttr", "true", string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "G", "G", string.Empty, string.Empty, string.Empty, "deadLetterTopicName")); + subscriptions.Should().Contain(("pubsub", "splitTopicBuilder", "splitTopics", string.Empty, string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "splitTopicAttr", "splitTopics", "true", string.Empty, string.Empty, string.Empty)); + subscriptions.Should().Contain(("pubsub", "metadata", "multiMetadataTopicAttr", string.Empty, string.Empty, "n1=v1;n2=v2,v3", string.Empty)); + subscriptions.Should().Contain(("pubsub", "metadata.1", "multiMetadataTopicAttr", "true", string.Empty, "n1=v1", string.Empty)); + subscriptions.Should().Contain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1", string.Empty)); + // Test priority route sorting var eTopic = subscriptions.FindAll(e => e.Topic == "E"); eTopic.Count.Should().Be(3);