From a3a491b640568121092ae2f534d501c9b1b49b6f Mon Sep 17 00:00:00 2001 From: Hayley Campbell Date: Fri, 14 May 2021 11:37:52 +0200 Subject: [PATCH] Add filtered persistent subscriptions --- .../protos/persistentsubscriptions.proto | 4 + .../protos/shared.proto | 17 +++ .../protos/streams.proto | 17 +-- ...orePersistentSubscriptionsClient.Create.cs | 127 +++++++++++++++--- .../EventStoreClient.cs | 14 +- .../EventTypeFilter.cs | 0 .../IEventFilter.cs | 0 .../PrefixFilterExpression.cs | 0 .../RegularFilterExpression.cs | 0 .../StreamFilter.cs | 0 10 files changed, 141 insertions(+), 38 deletions(-) rename src/{EventStore.Client.Streams => EventStore.Client}/EventTypeFilter.cs (100%) rename src/{EventStore.Client.Streams => EventStore.Client}/IEventFilter.cs (100%) rename src/{EventStore.Client.Streams => EventStore.Client}/PrefixFilterExpression.cs (100%) rename src/{EventStore.Client.Streams => EventStore.Client}/RegularFilterExpression.cs (100%) rename src/{EventStore.Client.Streams => EventStore.Client}/StreamFilter.cs (100%) diff --git a/src/EventStore.Client.Common/protos/persistentsubscriptions.proto b/src/EventStore.Client.Common/protos/persistentsubscriptions.proto index 2f303bb7d..d1ef0d275 100644 --- a/src/EventStore.Client.Common/protos/persistentsubscriptions.proto +++ b/src/EventStore.Client.Common/protos/persistentsubscriptions.proto @@ -117,6 +117,10 @@ message CreateReq { event_store.client.Empty start = 2; event_store.client.Empty end = 3; } + oneof filter_option { + event_store.client.FilterOptions filter = 4; + event_store.client.Empty no_filter = 5; + } } message Position { diff --git a/src/EventStore.Client.Common/protos/shared.proto b/src/EventStore.Client.Common/protos/shared.proto index 6b18dd5ba..5cc8f08e0 100644 --- a/src/EventStore.Client.Common/protos/shared.proto +++ b/src/EventStore.Client.Common/protos/shared.proto @@ -20,3 +20,20 @@ message StreamIdentifier { reserved 1 to 2; bytes streamName = 3; } + +message FilterOptions { + oneof filter { + Expression stream_name = 1; + Expression event_type = 2; + } + oneof window { + uint32 max = 3; + event_store.client.Empty count = 4; + } + uint32 checkpointIntervalMultiplier = 5; + + message Expression { + string regex = 1; + repeated string prefix = 2; + } +} diff --git a/src/EventStore.Client.Common/protos/streams.proto b/src/EventStore.Client.Common/protos/streams.proto index 8c3bbbb7c..e760c38a6 100644 --- a/src/EventStore.Client.Common/protos/streams.proto +++ b/src/EventStore.Client.Common/protos/streams.proto @@ -26,7 +26,7 @@ message ReadReq { SubscriptionOptions subscription = 6; } oneof filter_option { - FilterOptions filter = 7; + event_store.client.FilterOptions filter = 7; event_store.client.Empty no_filter = 8; } UUIDOption uuid_option = 9; @@ -56,22 +56,7 @@ message ReadReq { uint64 commit_position = 1; uint64 prepare_position = 2; } - message FilterOptions { - oneof filter { - Expression stream_name = 1; - Expression event_type = 2; - } - oneof window { - uint32 max = 3; - event_store.client.Empty count = 4; - } - uint32 checkpointIntervalMultiplier = 5; - message Expression { - string regex = 1; - repeated string prefix = 2; - } - } message UUIDOption { oneof content { event_store.client.Empty structured = 1; diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs index b82011730..a26c71be6 100644 --- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs +++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs @@ -35,25 +35,84 @@ private static CreateReq.Types.StreamOptions StreamOptionsForCreateProto(string }; } - private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position position) { + private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position position, IEventFilter? filter) { + var allFilter = GetFilterOptions(filter); + CreateReq.Types.AllOptions allOptions; if (position == Position.Start) { - return new CreateReq.Types.AllOptions { - Start = new Empty() + allOptions = new CreateReq.Types.AllOptions { + Start = new Empty(), }; } - - if (position == Position.End) { - return new CreateReq.Types.AllOptions { + else if (position == Position.End) { + allOptions = new CreateReq.Types.AllOptions { End = new Empty() }; + } else { + allOptions = new CreateReq.Types.AllOptions { + Position = new CreateReq.Types.Position { + CommitPosition = position.CommitPosition, + PreparePosition = position.PreparePosition + } + }; } - return new CreateReq.Types.AllOptions { - Position = new CreateReq.Types.Position { - CommitPosition = position.CommitPosition, - PreparePosition = position.PreparePosition - } + if (allFilter is null) { + allOptions.NoFilter = new Empty(); + } else { + allOptions.Filter = allFilter; + } + + return allOptions; + } + + private static FilterOptions? GetFilterOptions(IEventFilter? filter) { + if (filter == null) { + return null; + } + + var options = filter switch { + StreamFilter _ => new FilterOptions { + StreamName = (filter.Prefixes, filter.Regex) switch { + (PrefixFilterExpression[] _, RegularFilterExpression _) + when (filter.Prefixes?.Length ?? 0) == 0 && + filter.Regex != RegularFilterExpression.None => + new FilterOptions.Types.Expression + {Regex = filter.Regex}, + (PrefixFilterExpression[] _, RegularFilterExpression _) + when (filter.Prefixes?.Length ?? 0) != 0 && + filter.Regex == RegularFilterExpression.None => + new FilterOptions.Types.Expression { + Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())} + }, + _ => throw new InvalidOperationException() + } + }, + EventTypeFilter _ => new FilterOptions { + EventType = (filter.Prefixes, filter.Regex) switch { + (PrefixFilterExpression[] _, RegularFilterExpression _) + when (filter.Prefixes?.Length ?? 0) == 0 && + filter.Regex != RegularFilterExpression.None => + new FilterOptions.Types.Expression + {Regex = filter.Regex}, + (PrefixFilterExpression[] _, RegularFilterExpression _) + when (filter.Prefixes?.Length ?? 0) != 0 && + filter.Regex == RegularFilterExpression.None => + new FilterOptions.Types.Expression { + Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())} + }, + _ => throw new InvalidOperationException() + } + }, + _ => throw new InvalidOperationException() }; + + if (filter.MaxSearchWindow.HasValue) { + options.Max = filter.MaxSearchWindow.Value; + } else { + options.Count = new Empty(); + } + + return options; } @@ -68,6 +127,18 @@ private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position posi /// /// public async Task CreateAsync(string streamName, string groupName, + PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default) => + await CreateInternalAsync( + streamName: streamName, + groupName: groupName, + eventFilter: null, + settings: settings, + userCredentials: userCredentials, + cancellationToken: cancellationToken) + .ConfigureAwait(false); + + private async Task CreateInternalAsync(string streamName, string groupName, IEventFilter? eventFilter, PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) { if (streamName == null) { @@ -90,13 +161,17 @@ public async Task CreateAsync(string streamName, string groupName, throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}"); } + if (eventFilter != null && streamName != SystemStreams.AllStream) { + throw new ArgumentException($"Filters are only supported when subscribing to {SystemStreams.AllStream}"); + } + await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient( await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(new CreateReq { Options = new CreateReq.Types.Options { Stream = streamName != SystemStreams.AllStream ? StreamOptionsForCreateProto(streamName, (StreamPosition)(settings.StartFrom ?? StreamPosition.End)) : null, All = streamName == SystemStreams.AllStream ? - AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End)) : null, + AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End), eventFilter) : null, #pragma warning disable 612 StreamIdentifier = streamName != SystemStreams.AllStream ? streamName : string.Empty, /*for backwards compatibility*/ #pragma warning restore 612 @@ -123,22 +198,44 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(ne } /// - /// Creates a persistent subscription to $all. + /// Creates a filtered persistent subscription to $all. /// /// + /// /// /// /// /// - public async Task CreateToAllAsync(string groupName, + public async Task CreateToAllAsync(string groupName, IEventFilter eventFilter, PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null, CancellationToken cancellationToken = default) => - await CreateAsync( + await CreateInternalAsync( streamName: SystemStreams.AllStream, groupName: groupName, + eventFilter: eventFilter, settings: settings, userCredentials: userCredentials, cancellationToken: cancellationToken) .ConfigureAwait(false); + + /// + /// Creates a persistent subscription to $all. + /// + /// + /// + /// + /// + /// + public async Task CreateToAllAsync(string groupName, + PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null, + CancellationToken cancellationToken = default) => + await CreateInternalAsync( + streamName: SystemStreams.AllStream, + groupName: groupName, + eventFilter: null, + settings: settings, + userCredentials: userCredentials, + cancellationToken: cancellationToken) + .ConfigureAwait(false); } } diff --git a/src/EventStore.Client.Streams/EventStoreClient.cs b/src/EventStore.Client.Streams/EventStoreClient.cs index 11eb23c97..ef4427b7a 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.cs @@ -62,7 +62,7 @@ public EventStoreClient(EventStoreClientSettings? settings = null) : base(settin _log = Settings.LoggerFactory?.CreateLogger() ?? new NullLogger(); } - private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions( + private static FilterOptions? GetFilterOptions( SubscriptionFilterOptions? filterOptions) { if (filterOptions == null) { return null; @@ -71,33 +71,33 @@ public EventStoreClient(EventStoreClientSettings? settings = null) : base(settin var filter = filterOptions.Filter; var options = filter switch { - StreamFilter _ => new ReadReq.Types.Options.Types.FilterOptions { + StreamFilter _ => new FilterOptions { StreamName = (filter.Prefixes, filter.Regex) switch { (PrefixFilterExpression[] _, RegularFilterExpression _) when (filter.Prefixes?.Length ?? 0) == 0 && filter.Regex != RegularFilterExpression.None => - new ReadReq.Types.Options.Types.FilterOptions.Types.Expression + new FilterOptions.Types.Expression {Regex = filter.Regex}, (PrefixFilterExpression[] _, RegularFilterExpression _) when (filter.Prefixes?.Length ?? 0) != 0 && filter.Regex == RegularFilterExpression.None => - new ReadReq.Types.Options.Types.FilterOptions.Types.Expression { + new FilterOptions.Types.Expression { Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())} }, _ => throw new InvalidOperationException() } }, - EventTypeFilter _ => new ReadReq.Types.Options.Types.FilterOptions { + EventTypeFilter _ => new FilterOptions { EventType = (filter.Prefixes, filter.Regex) switch { (PrefixFilterExpression[] _, RegularFilterExpression _) when (filter.Prefixes?.Length ?? 0) == 0 && filter.Regex != RegularFilterExpression.None => - new ReadReq.Types.Options.Types.FilterOptions.Types.Expression + new FilterOptions.Types.Expression {Regex = filter.Regex}, (PrefixFilterExpression[] _, RegularFilterExpression _) when (filter.Prefixes?.Length ?? 0) != 0 && filter.Regex == RegularFilterExpression.None => - new ReadReq.Types.Options.Types.FilterOptions.Types.Expression { + new FilterOptions.Types.Expression { Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())} }, _ => throw new InvalidOperationException() diff --git a/src/EventStore.Client.Streams/EventTypeFilter.cs b/src/EventStore.Client/EventTypeFilter.cs similarity index 100% rename from src/EventStore.Client.Streams/EventTypeFilter.cs rename to src/EventStore.Client/EventTypeFilter.cs diff --git a/src/EventStore.Client.Streams/IEventFilter.cs b/src/EventStore.Client/IEventFilter.cs similarity index 100% rename from src/EventStore.Client.Streams/IEventFilter.cs rename to src/EventStore.Client/IEventFilter.cs diff --git a/src/EventStore.Client.Streams/PrefixFilterExpression.cs b/src/EventStore.Client/PrefixFilterExpression.cs similarity index 100% rename from src/EventStore.Client.Streams/PrefixFilterExpression.cs rename to src/EventStore.Client/PrefixFilterExpression.cs diff --git a/src/EventStore.Client.Streams/RegularFilterExpression.cs b/src/EventStore.Client/RegularFilterExpression.cs similarity index 100% rename from src/EventStore.Client.Streams/RegularFilterExpression.cs rename to src/EventStore.Client/RegularFilterExpression.cs diff --git a/src/EventStore.Client.Streams/StreamFilter.cs b/src/EventStore.Client/StreamFilter.cs similarity index 100% rename from src/EventStore.Client.Streams/StreamFilter.cs rename to src/EventStore.Client/StreamFilter.cs