From 61dfa40a18f78df1ce64a490e832884094394a69 Mon Sep 17 00:00:00 2001 From: Yash Nisar Date: Thu, 29 Sep 2022 10:42:21 -0500 Subject: [PATCH 1/2] Add support for Bulk State, i.e. SaveBulkStateAsync Closes https://github.com/dapr/dotnet-sdk/issues/785 Signed-off-by: Yash Nisar --- .gitignore | 3 ++ src/Dapr.Client/DaprClient.cs | 9 ++++ src/Dapr.Client/DaprClientGrpc.cs | 59 ++++++++++++++++++++++++ src/Dapr.Client/SaveStateItem.cs | 65 +++++++++++++++++++++++++++ test/Dapr.Client.Test/StateApiTest.cs | 43 ++++++++++++++++++ 5 files changed, 179 insertions(+) create mode 100644 src/Dapr.Client/SaveStateItem.cs diff --git a/.gitignore b/.gitignore index 2803b32a6..9e2330966 100644 --- a/.gitignore +++ b/.gitignore @@ -91,6 +91,9 @@ bld/ # VS Code .vscode/ +# Jetbrains +.idea/ + # coverlet code coverage results coverage.json .fake diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index 49b0c416d..31d397545 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -679,6 +679,15 @@ public abstract Task InvokeMethodGrpcAsync( /// A that can be used to cancel the operation. /// A that will return the list of values when the operation has completed. public abstract Task> GetBulkStateAsync(string storeName, IReadOnlyList keys, int? parallelism, IReadOnlyDictionary metadata = default, CancellationToken cancellationToken = default); + + /// + /// Saves a list of to the Dapr state store. + /// + /// The name of state store. + /// The list of items to save. + /// A that can be used to cancel the operation. + /// A that will complete when the operation has completed. + public abstract Task SaveBulkStateAsync(string storeName, IReadOnlyList> items, CancellationToken cancellationToken = default); /// /// Deletes a list of from the Dapr state store. diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index d5c5538a2..e292f0726 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -607,6 +607,65 @@ public override async Task GetStateAsync( throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex); } } + + /// + public override async Task SaveBulkStateAsync(string storeName, IReadOnlyList> items, CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName)); + + if (items.Count == 0) + { + throw new ArgumentException("items do not contain any elements"); + } + + var envelope = new Autogenerated.SaveStateRequest() + { + StoreName = storeName, + }; + + foreach (var item in items) + { + var stateItem = new Autogenerated.StateItem() + { + Key = item.Key, + }; + + if (item.ETag != null) + { + stateItem.Etag = new Autogenerated.Etag() { Value = item.ETag }; + } + + if (item.Metadata != null) + { + foreach (var kvp in item.Metadata) + { + stateItem.Metadata.Add(kvp.Key, kvp.Value); + } + } + + if (item.StateOptions != null) + { + stateItem.Options = ToAutoGeneratedStateOptions(item.StateOptions); + } + + if (item.Value != null) + { + stateItem.Value = TypeConverters.ToJsonByteString(item.Value, this.jsonSerializerOptions); + } + + envelope.States.Add(stateItem); + } + + try + { + await this.Client.SaveStateAsync(envelope, cancellationToken: cancellationToken); + } + catch (RpcException ex) + { + throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + + } /// public override async Task DeleteBulkStateAsync(string storeName, IReadOnlyList items, CancellationToken cancellationToken = default) diff --git a/src/Dapr.Client/SaveStateItem.cs b/src/Dapr.Client/SaveStateItem.cs new file mode 100644 index 000000000..818bee206 --- /dev/null +++ b/src/Dapr.Client/SaveStateItem.cs @@ -0,0 +1,65 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Collections.Generic; + +namespace Dapr.Client +{ + /// + /// Represents a state object used for bulk delete state operation + /// + public readonly struct SaveStateItem + { + /// + /// Initializes a new instance of the class. + /// + /// The state key. + /// The state value. + /// The ETag. + /// The stateOptions. + /// The metadata. + public SaveStateItem(string key, TValue value, string etag, StateOptions stateOptions = default, IReadOnlyDictionary metadata = default) + { + this.Key = key; + this.Value = value; + this.ETag = etag; + this.StateOptions = stateOptions; + this.Metadata = metadata; + } + + /// + /// Gets the state key. + /// + public string Key { get; } + + /// + /// Gets the state value. + /// + public TValue Value { get; } + + /// + /// Get the ETag. + /// + public string ETag { get; } + + /// + /// Gets the StateOptions. + /// + public StateOptions StateOptions { get; } + + /// + /// Gets the Metadata. + /// + public IReadOnlyDictionary Metadata { get; } + } +} diff --git a/test/Dapr.Client.Test/StateApiTest.cs b/test/Dapr.Client.Test/StateApiTest.cs index f3dbd6a0a..90c06e6b1 100644 --- a/test/Dapr.Client.Test/StateApiTest.cs +++ b/test/Dapr.Client.Test/StateApiTest.cs @@ -316,6 +316,49 @@ public async Task SaveStateAsync_CanSaveState() stateFromRequest.Size.Should().Be(widget.Size); stateFromRequest.Color.Should().Be(widget.Color); } + + [Fact] + public async Task SaveBulkStateAsync_ValidateRequest() + { + await using var client = TestClient.CreateForDaprClient(); + + var stateItem1 = new SaveStateItem("testKey1", "testValue1", "testEtag1", + new StateOptions { Concurrency = ConcurrencyMode.LastWrite }, + new Dictionary {{ "partitionKey1", "mypartition1" } }); + + var stateItem2 = new SaveStateItem("testKey2", "testValue2", "testEtag2", + new StateOptions { Concurrency = ConcurrencyMode.LastWrite }, + new Dictionary {{ "partitionKey2", "mypartition2" } }); + + var stateItem3 = new SaveStateItem("testKey3", "testValue3", null, + new StateOptions { Concurrency = ConcurrencyMode.LastWrite }, + new Dictionary {{ "partitionKey3", "mypartition3" } }); + + var request = await client.CaptureGrpcRequestAsync(async daprClient => + { + await daprClient.SaveBulkStateAsync("testStore", + new List>() { stateItem1, stateItem2, stateItem3}); + }); + + request.Dismiss(); + + // Create Response & Validate + var envelope = await request.GetRequestEnvelopeAsync(); + envelope.StoreName.Should().Be("testStore"); + envelope.States.Count.Should().Be(3); + + envelope.States[0].Key.Should().Be("testKey1"); + envelope.States[0].Value.Should().Equal(ByteString.CopyFromUtf8(JsonSerializer.Serialize("testValue1"))); + envelope.States[0].Metadata.Should().ContainKey("partitionKey1"); + + envelope.States[1].Key.Should().Be("testKey2"); + envelope.States[1].Value.Should().Equal(ByteString.CopyFromUtf8(JsonSerializer.Serialize("testValue2"))); + envelope.States[1].Metadata.Should().ContainKey("partitionKey2"); + + envelope.States[2].Key.Should().Be("testKey3"); + envelope.States[2].Value.Should().Equal(ByteString.CopyFromUtf8(JsonSerializer.Serialize("testValue3"))); + envelope.States[2].Metadata.Should().ContainKey("partitionKey3"); + } [Fact] public async Task GetStateAsync_WithCancelledToken() From b8223bebe902f81c5791ed06edf2287f2bff82ea Mon Sep 17 00:00:00 2001 From: Yash Nisar Date: Tue, 25 Oct 2022 00:55:28 -0500 Subject: [PATCH 2/2] Update examples to include support for Bulk State Closes https://github.com/dapr/dotnet-sdk/issues/963 Signed-off-by: Yash Nisar --- .../StateManagement/BulkStateExample.cs | 54 +++++++++++++++++++ examples/Client/StateManagement/Program.cs | 1 + examples/Client/StateManagement/README.md | 6 ++- 3 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 examples/Client/StateManagement/BulkStateExample.cs diff --git a/examples/Client/StateManagement/BulkStateExample.cs b/examples/Client/StateManagement/BulkStateExample.cs new file mode 100644 index 000000000..250c69cef --- /dev/null +++ b/examples/Client/StateManagement/BulkStateExample.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Dapr.Client; + +namespace Samples.Client +{ + public class BulkStateExample : Example + { + private static readonly string firstKey = "testKey1"; + private static readonly string secondKey = "testKey2"; + private static readonly string firstEtag = "123"; + private static readonly string secondEtag = "456"; + private static readonly string storeName = "statestore"; + + public override string DisplayName => "Using the State Store"; + + public override async Task RunAsync(CancellationToken cancellationToken) + { + using var client = new DaprClientBuilder().Build(); + + var state1 = new Widget() { Size = "small", Color = "yellow", }; + var state2 = new Widget() { Size = "big", Color = "green", }; + + var stateItem1 = new SaveStateItem(firstKey, state1, firstEtag); + var stateItem2 = new SaveStateItem(secondKey, state2, secondEtag); + + await client.SaveBulkStateAsync(storeName, new List>() { stateItem1, stateItem2}); + + Console.WriteLine("Saved 2 States!"); + + await Task.Delay(2000); + + IReadOnlyList states = await client.GetBulkStateAsync(storeName, + new List(){firstKey, secondKey}, null); + + Console.WriteLine($"Got {states.Count} States: "); + + var deleteBulkStateItem1 = new BulkDeleteStateItem(states[0].Key, states[0].ETag); + var deleteBulkStateItem2 = new BulkDeleteStateItem(states[1].Key, states[1].ETag); + + await client.DeleteBulkStateAsync(storeName, new List() { deleteBulkStateItem1, deleteBulkStateItem2 }); + + Console.WriteLine("Deleted States!"); + } + + private class Widget + { + public string? Size { get; set; } + public string? Color { get; set; } + } + } +} diff --git a/examples/Client/StateManagement/Program.cs b/examples/Client/StateManagement/Program.cs index eb717a0bd..24e37d004 100644 --- a/examples/Client/StateManagement/Program.cs +++ b/examples/Client/StateManagement/Program.cs @@ -24,6 +24,7 @@ class Program new StateStoreExample(), new StateStoreTransactionsExample(), new StateStoreETagsExample(), + new BulkStateExample() }; static async Task Main(string[] args) diff --git a/examples/Client/StateManagement/README.md b/examples/Client/StateManagement/README.md index f60e5a20c..141f16760 100644 --- a/examples/Client/StateManagement/README.md +++ b/examples/Client/StateManagement/README.md @@ -33,10 +33,12 @@ dapr run --app-id DaprClient -- dotnet run 0 See [StateStoreExample.cs](./StateStoreExample.cs) for an example of using `DaprClient` for basic state store operations like get, set, and delete. -# State transactions +## State transactions See: [StateStoreTransactionsExample.cs](./StateStoreTransactionsExample.cs) for an example of using `DaprClient` for transactional state store operations that affect multiple keys. ## ETags - See [StateStoreETagsExample.cs](./StateStoreETagsExample.cs) for an example of using `DaprClient` for optimistic concurrency control with the state store. + +## Bulk State +See [BulkStateExample.cs](./BulkStateExample.cs) for an example of using `DaprClient` for Bulk State, i.e. sending multiple key value pairs to the state store.