Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Bulk State, i.e. SaveBulkStateAsync(...) method #962

Merged
merged 2 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ bld/
# VS Code
.vscode/

# Jetbrains
.idea/

# coverlet code coverage results
coverage.json
.fake
Expand Down
54 changes: 54 additions & 0 deletions examples/Client/StateManagement/BulkStateExample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Client;

namespace Samples.Client
{
public class BulkStateExample : Example
{
private static readonly string firstKey = "testKey1";
private static readonly string secondKey = "testKey2";
private static readonly string firstEtag = "123";
private static readonly string secondEtag = "456";
private static readonly string storeName = "statestore";

public override string DisplayName => "Using the State Store";

public override async Task RunAsync(CancellationToken cancellationToken)
{
using var client = new DaprClientBuilder().Build();

var state1 = new Widget() { Size = "small", Color = "yellow", };
var state2 = new Widget() { Size = "big", Color = "green", };

var stateItem1 = new SaveStateItem<Widget>(firstKey, state1, firstEtag);
var stateItem2 = new SaveStateItem<Widget>(secondKey, state2, secondEtag);

await client.SaveBulkStateAsync(storeName, new List<SaveStateItem<Widget>>() { stateItem1, stateItem2});

Console.WriteLine("Saved 2 States!");

await Task.Delay(2000);

IReadOnlyList<BulkStateItem> states = await client.GetBulkStateAsync(storeName,
new List<string>(){firstKey, secondKey}, null);

yash-nisar marked this conversation as resolved.
Show resolved Hide resolved
Console.WriteLine($"Got {states.Count} States: ");

var deleteBulkStateItem1 = new BulkDeleteStateItem(states[0].Key, states[0].ETag);
var deleteBulkStateItem2 = new BulkDeleteStateItem(states[1].Key, states[1].ETag);

await client.DeleteBulkStateAsync(storeName, new List<BulkDeleteStateItem>() { deleteBulkStateItem1, deleteBulkStateItem2 });

Console.WriteLine("Deleted States!");
}

private class Widget
{
public string? Size { get; set; }
public string? Color { get; set; }
}
}
}
1 change: 1 addition & 0 deletions examples/Client/StateManagement/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Program
new StateStoreExample(),
new StateStoreTransactionsExample(),
new StateStoreETagsExample(),
new BulkStateExample()
yash-nisar marked this conversation as resolved.
Show resolved Hide resolved
};

static async Task<int> Main(string[] args)
Expand Down
6 changes: 4 additions & 2 deletions examples/Client/StateManagement/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ dapr run --app-id DaprClient -- dotnet run 0

See [StateStoreExample.cs](./StateStoreExample.cs) for an example of using `DaprClient` for basic state store operations like get, set, and delete.

# State transactions
## State transactions

See: [StateStoreTransactionsExample.cs](./StateStoreTransactionsExample.cs) for an example of using `DaprClient` for transactional state store operations that affect multiple keys.

## ETags

See [StateStoreETagsExample.cs](./StateStoreETagsExample.cs) for an example of using `DaprClient` for optimistic concurrency control with the state store.

## Bulk State
See [BulkStateExample.cs](./BulkStateExample.cs) for an example of using `DaprClient` for Bulk State, i.e. sending multiple key value pairs to the state store.
9 changes: 9 additions & 0 deletions src/Dapr.Client/DaprClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,15 @@ public abstract Task<TResponse> InvokeMethodGrpcAsync<TRequest, TResponse>(
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task{IReadOnlyList}" /> that will return the list of values when the operation has completed.</returns>
public abstract Task<IReadOnlyList<BulkStateItem>> GetBulkStateAsync(string storeName, IReadOnlyList<string> keys, int? parallelism, IReadOnlyDictionary<string, string> metadata = default, CancellationToken cancellationToken = default);

/// <summary>
/// Saves a list of <paramref name="items" /> to the Dapr state store.
/// </summary>
/// <param name="storeName">The name of state store.</param>
/// <param name="items">The list of items to save.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the operation has completed.</returns>
public abstract Task SaveBulkStateAsync<TValue>(string storeName, IReadOnlyList<SaveStateItem<TValue>> items, CancellationToken cancellationToken = default);

/// <summary>
/// Deletes a list of <paramref name="items" /> from the Dapr state store.
Expand Down
59 changes: 59 additions & 0 deletions src/Dapr.Client/DaprClientGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,65 @@ public override async Task<TValue> GetStateAsync<TValue>(
throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex);
}
}

/// <inheritdoc />
public override async Task SaveBulkStateAsync<TValue>(string storeName, IReadOnlyList<SaveStateItem<TValue>> items, CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));

if (items.Count == 0)
{
throw new ArgumentException("items do not contain any elements");
}

var envelope = new Autogenerated.SaveStateRequest()
{
StoreName = storeName,
};

foreach (var item in items)
{
var stateItem = new Autogenerated.StateItem()
{
Key = item.Key,
};

if (item.ETag != null)
{
stateItem.Etag = new Autogenerated.Etag() { Value = item.ETag };
}

if (item.Metadata != null)
{
foreach (var kvp in item.Metadata)
{
stateItem.Metadata.Add(kvp.Key, kvp.Value);
}
}

if (item.StateOptions != null)
{
stateItem.Options = ToAutoGeneratedStateOptions(item.StateOptions);
}

if (item.Value != null)
{
stateItem.Value = TypeConverters.ToJsonByteString(item.Value, this.jsonSerializerOptions);
}

envelope.States.Add(stateItem);
}

try
{
await this.Client.SaveStateAsync(envelope, cancellationToken: cancellationToken);
}
catch (RpcException ex)
{
throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}

}

/// <inheritdoc />
public override async Task DeleteBulkStateAsync(string storeName, IReadOnlyList<BulkDeleteStateItem> items, CancellationToken cancellationToken = default)
Expand Down
65 changes: 65 additions & 0 deletions src/Dapr.Client/SaveStateItem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System.Collections.Generic;

namespace Dapr.Client
{
/// <summary>
/// Represents a state object used for bulk delete state operation
/// </summary>
public readonly struct SaveStateItem<TValue>
{
/// <summary>
/// Initializes a new instance of the <see cref="SaveStateItem{TValue}"/> class.
/// </summary>
/// <param name="key">The state key.</param>
/// <param name="value">The state value.</param>
/// <param name="etag">The ETag.</param>
/// <param name="stateOptions">The stateOptions.</param>
/// <param name="metadata">The metadata.</param>
public SaveStateItem(string key, TValue value, string etag, StateOptions stateOptions = default, IReadOnlyDictionary<string, string> metadata = default)
{
this.Key = key;
this.Value = value;
this.ETag = etag;
this.StateOptions = stateOptions;
this.Metadata = metadata;
}

/// <summary>
/// Gets the state key.
/// </summary>
public string Key { get; }

/// <summary>
/// Gets the state value.
/// </summary>
public TValue Value { get; }

/// <summary>
/// Get the ETag.
/// </summary>
public string ETag { get; }

/// <summary>
/// Gets the StateOptions.
/// </summary>
public StateOptions StateOptions { get; }

/// <summary>
/// Gets the Metadata.
/// </summary>
public IReadOnlyDictionary<string, string> Metadata { get; }
}
}
43 changes: 43 additions & 0 deletions test/Dapr.Client.Test/StateApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,49 @@ public async Task SaveStateAsync_CanSaveState()
stateFromRequest.Size.Should().Be(widget.Size);
stateFromRequest.Color.Should().Be(widget.Color);
}

[Fact]
public async Task SaveBulkStateAsync_ValidateRequest()
{
await using var client = TestClient.CreateForDaprClient();

var stateItem1 = new SaveStateItem<object>("testKey1", "testValue1", "testEtag1",
new StateOptions { Concurrency = ConcurrencyMode.LastWrite },
new Dictionary<string, string> {{ "partitionKey1", "mypartition1" } });

var stateItem2 = new SaveStateItem<object>("testKey2", "testValue2", "testEtag2",
new StateOptions { Concurrency = ConcurrencyMode.LastWrite },
new Dictionary<string, string> {{ "partitionKey2", "mypartition2" } });

var stateItem3 = new SaveStateItem<object>("testKey3", "testValue3", null,
new StateOptions { Concurrency = ConcurrencyMode.LastWrite },
new Dictionary<string, string> {{ "partitionKey3", "mypartition3" } });

var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
await daprClient.SaveBulkStateAsync("testStore",
new List<SaveStateItem<object>>() { stateItem1, stateItem2, stateItem3});
});

request.Dismiss();

// Create Response & Validate
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.SaveStateRequest>();
envelope.StoreName.Should().Be("testStore");
envelope.States.Count.Should().Be(3);

envelope.States[0].Key.Should().Be("testKey1");
envelope.States[0].Value.Should().Equal(ByteString.CopyFromUtf8(JsonSerializer.Serialize("testValue1")));
envelope.States[0].Metadata.Should().ContainKey("partitionKey1");

envelope.States[1].Key.Should().Be("testKey2");
envelope.States[1].Value.Should().Equal(ByteString.CopyFromUtf8(JsonSerializer.Serialize("testValue2")));
envelope.States[1].Metadata.Should().ContainKey("partitionKey2");

envelope.States[2].Key.Should().Be("testKey3");
envelope.States[2].Value.Should().Equal(ByteString.CopyFromUtf8(JsonSerializer.Serialize("testValue3")));
envelope.States[2].Metadata.Should().ContainKey("partitionKey3");
}

[Fact]
public async Task GetStateAsync_WithCancelledToken()
Expand Down