diff --git a/src/EventStore.Client.Common/protos/persistentsubscriptions.proto b/src/EventStore.Client.Common/protos/persistentsubscriptions.proto
index 2f303bb7d..d1ef0d275 100644
--- a/src/EventStore.Client.Common/protos/persistentsubscriptions.proto
+++ b/src/EventStore.Client.Common/protos/persistentsubscriptions.proto
@@ -117,6 +117,10 @@ message CreateReq {
event_store.client.Empty start = 2;
event_store.client.Empty end = 3;
}
+ oneof filter_option {
+ event_store.client.FilterOptions filter = 4;
+ event_store.client.Empty no_filter = 5;
+ }
}
message Position {
diff --git a/src/EventStore.Client.Common/protos/shared.proto b/src/EventStore.Client.Common/protos/shared.proto
index 6b18dd5ba..5cc8f08e0 100644
--- a/src/EventStore.Client.Common/protos/shared.proto
+++ b/src/EventStore.Client.Common/protos/shared.proto
@@ -20,3 +20,20 @@ message StreamIdentifier {
reserved 1 to 2;
bytes streamName = 3;
}
+
+message FilterOptions {
+ oneof filter {
+ Expression stream_name = 1;
+ Expression event_type = 2;
+ }
+ oneof window {
+ uint32 max = 3;
+ event_store.client.Empty count = 4;
+ }
+ uint32 checkpointIntervalMultiplier = 5;
+
+ message Expression {
+ string regex = 1;
+ repeated string prefix = 2;
+ }
+}
diff --git a/src/EventStore.Client.Common/protos/streams.proto b/src/EventStore.Client.Common/protos/streams.proto
index 8c3bbbb7c..e760c38a6 100644
--- a/src/EventStore.Client.Common/protos/streams.proto
+++ b/src/EventStore.Client.Common/protos/streams.proto
@@ -26,7 +26,7 @@ message ReadReq {
SubscriptionOptions subscription = 6;
}
oneof filter_option {
- FilterOptions filter = 7;
+ event_store.client.FilterOptions filter = 7;
event_store.client.Empty no_filter = 8;
}
UUIDOption uuid_option = 9;
@@ -56,22 +56,7 @@ message ReadReq {
uint64 commit_position = 1;
uint64 prepare_position = 2;
}
- message FilterOptions {
- oneof filter {
- Expression stream_name = 1;
- Expression event_type = 2;
- }
- oneof window {
- uint32 max = 3;
- event_store.client.Empty count = 4;
- }
- uint32 checkpointIntervalMultiplier = 5;
- message Expression {
- string regex = 1;
- repeated string prefix = 2;
- }
- }
message UUIDOption {
oneof content {
event_store.client.Empty structured = 1;
diff --git a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs
index b82011730..a26c71be6 100644
--- a/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs
+++ b/src/EventStore.Client.PersistentSubscriptions/EventStorePersistentSubscriptionsClient.Create.cs
@@ -35,25 +35,84 @@ private static CreateReq.Types.StreamOptions StreamOptionsForCreateProto(string
};
}
- private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position position) {
+ private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position position, IEventFilter? filter) {
+ var allFilter = GetFilterOptions(filter);
+ CreateReq.Types.AllOptions allOptions;
if (position == Position.Start) {
- return new CreateReq.Types.AllOptions {
- Start = new Empty()
+ allOptions = new CreateReq.Types.AllOptions {
+ Start = new Empty(),
};
}
-
- if (position == Position.End) {
- return new CreateReq.Types.AllOptions {
+ else if (position == Position.End) {
+ allOptions = new CreateReq.Types.AllOptions {
End = new Empty()
};
+ } else {
+ allOptions = new CreateReq.Types.AllOptions {
+ Position = new CreateReq.Types.Position {
+ CommitPosition = position.CommitPosition,
+ PreparePosition = position.PreparePosition
+ }
+ };
}
- return new CreateReq.Types.AllOptions {
- Position = new CreateReq.Types.Position {
- CommitPosition = position.CommitPosition,
- PreparePosition = position.PreparePosition
- }
+ if (allFilter is null) {
+ allOptions.NoFilter = new Empty();
+ } else {
+ allOptions.Filter = allFilter;
+ }
+
+ return allOptions;
+ }
+
+ private static FilterOptions? GetFilterOptions(IEventFilter? filter) {
+ if (filter == null) {
+ return null;
+ }
+
+ var options = filter switch {
+ StreamFilter _ => new FilterOptions {
+ StreamName = (filter.Prefixes, filter.Regex) switch {
+ (PrefixFilterExpression[] _, RegularFilterExpression _)
+ when (filter.Prefixes?.Length ?? 0) == 0 &&
+ filter.Regex != RegularFilterExpression.None =>
+ new FilterOptions.Types.Expression
+ {Regex = filter.Regex},
+ (PrefixFilterExpression[] _, RegularFilterExpression _)
+ when (filter.Prefixes?.Length ?? 0) != 0 &&
+ filter.Regex == RegularFilterExpression.None =>
+ new FilterOptions.Types.Expression {
+ Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
+ },
+ _ => throw new InvalidOperationException()
+ }
+ },
+ EventTypeFilter _ => new FilterOptions {
+ EventType = (filter.Prefixes, filter.Regex) switch {
+ (PrefixFilterExpression[] _, RegularFilterExpression _)
+ when (filter.Prefixes?.Length ?? 0) == 0 &&
+ filter.Regex != RegularFilterExpression.None =>
+ new FilterOptions.Types.Expression
+ {Regex = filter.Regex},
+ (PrefixFilterExpression[] _, RegularFilterExpression _)
+ when (filter.Prefixes?.Length ?? 0) != 0 &&
+ filter.Regex == RegularFilterExpression.None =>
+ new FilterOptions.Types.Expression {
+ Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
+ },
+ _ => throw new InvalidOperationException()
+ }
+ },
+ _ => throw new InvalidOperationException()
};
+
+ if (filter.MaxSearchWindow.HasValue) {
+ options.Max = filter.MaxSearchWindow.Value;
+ } else {
+ options.Count = new Empty();
+ }
+
+ return options;
}
@@ -68,6 +127,18 @@ private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position posi
///
///
public async Task CreateAsync(string streamName, string groupName,
+ PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
+ CancellationToken cancellationToken = default) =>
+ await CreateInternalAsync(
+ streamName: streamName,
+ groupName: groupName,
+ eventFilter: null,
+ settings: settings,
+ userCredentials: userCredentials,
+ cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
+
+ private async Task CreateInternalAsync(string streamName, string groupName, IEventFilter? eventFilter,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
if (streamName == null) {
@@ -90,13 +161,17 @@ public async Task CreateAsync(string streamName, string groupName,
throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}");
}
+ if (eventFilter != null && streamName != SystemStreams.AllStream) {
+ throw new ArgumentException($"Filters are only supported when subscribing to {SystemStreams.AllStream}");
+ }
+
await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(new CreateReq {
Options = new CreateReq.Types.Options {
Stream = streamName != SystemStreams.AllStream ?
StreamOptionsForCreateProto(streamName, (StreamPosition)(settings.StartFrom ?? StreamPosition.End)) : null,
All = streamName == SystemStreams.AllStream ?
- AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End)) : null,
+ AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End), eventFilter) : null,
#pragma warning disable 612
StreamIdentifier = streamName != SystemStreams.AllStream ? streamName : string.Empty, /*for backwards compatibility*/
#pragma warning restore 612
@@ -123,22 +198,44 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(ne
}
///
- /// Creates a persistent subscription to $all.
+ /// Creates a filtered persistent subscription to $all.
///
///
+ ///
///
///
///
///
- public async Task CreateToAllAsync(string groupName,
+ public async Task CreateToAllAsync(string groupName, IEventFilter eventFilter,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
- await CreateAsync(
+ await CreateInternalAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
+ eventFilter: eventFilter,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
+
+ ///
+ /// Creates a persistent subscription to $all.
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ public async Task CreateToAllAsync(string groupName,
+ PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
+ CancellationToken cancellationToken = default) =>
+ await CreateInternalAsync(
+ streamName: SystemStreams.AllStream,
+ groupName: groupName,
+ eventFilter: null,
+ settings: settings,
+ userCredentials: userCredentials,
+ cancellationToken: cancellationToken)
+ .ConfigureAwait(false);
}
}
diff --git a/src/EventStore.Client.Streams/EventStoreClient.cs b/src/EventStore.Client.Streams/EventStoreClient.cs
index 11eb23c97..ef4427b7a 100644
--- a/src/EventStore.Client.Streams/EventStoreClient.cs
+++ b/src/EventStore.Client.Streams/EventStoreClient.cs
@@ -62,7 +62,7 @@ public EventStoreClient(EventStoreClientSettings? settings = null) : base(settin
_log = Settings.LoggerFactory?.CreateLogger() ?? new NullLogger();
}
- private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(
+ private static FilterOptions? GetFilterOptions(
SubscriptionFilterOptions? filterOptions) {
if (filterOptions == null) {
return null;
@@ -71,33 +71,33 @@ public EventStoreClient(EventStoreClientSettings? settings = null) : base(settin
var filter = filterOptions.Filter;
var options = filter switch {
- StreamFilter _ => new ReadReq.Types.Options.Types.FilterOptions {
+ StreamFilter _ => new FilterOptions {
StreamName = (filter.Prefixes, filter.Regex) switch {
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) == 0 &&
filter.Regex != RegularFilterExpression.None =>
- new ReadReq.Types.Options.Types.FilterOptions.Types.Expression
+ new FilterOptions.Types.Expression
{Regex = filter.Regex},
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) != 0 &&
filter.Regex == RegularFilterExpression.None =>
- new ReadReq.Types.Options.Types.FilterOptions.Types.Expression {
+ new FilterOptions.Types.Expression {
Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
},
_ => throw new InvalidOperationException()
}
},
- EventTypeFilter _ => new ReadReq.Types.Options.Types.FilterOptions {
+ EventTypeFilter _ => new FilterOptions {
EventType = (filter.Prefixes, filter.Regex) switch {
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) == 0 &&
filter.Regex != RegularFilterExpression.None =>
- new ReadReq.Types.Options.Types.FilterOptions.Types.Expression
+ new FilterOptions.Types.Expression
{Regex = filter.Regex},
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) != 0 &&
filter.Regex == RegularFilterExpression.None =>
- new ReadReq.Types.Options.Types.FilterOptions.Types.Expression {
+ new FilterOptions.Types.Expression {
Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
},
_ => throw new InvalidOperationException()
diff --git a/src/EventStore.Client.Streams/EventTypeFilter.cs b/src/EventStore.Client/EventTypeFilter.cs
similarity index 100%
rename from src/EventStore.Client.Streams/EventTypeFilter.cs
rename to src/EventStore.Client/EventTypeFilter.cs
diff --git a/src/EventStore.Client.Streams/IEventFilter.cs b/src/EventStore.Client/IEventFilter.cs
similarity index 100%
rename from src/EventStore.Client.Streams/IEventFilter.cs
rename to src/EventStore.Client/IEventFilter.cs
diff --git a/src/EventStore.Client.Streams/PrefixFilterExpression.cs b/src/EventStore.Client/PrefixFilterExpression.cs
similarity index 100%
rename from src/EventStore.Client.Streams/PrefixFilterExpression.cs
rename to src/EventStore.Client/PrefixFilterExpression.cs
diff --git a/src/EventStore.Client.Streams/RegularFilterExpression.cs b/src/EventStore.Client/RegularFilterExpression.cs
similarity index 100%
rename from src/EventStore.Client.Streams/RegularFilterExpression.cs
rename to src/EventStore.Client/RegularFilterExpression.cs
diff --git a/src/EventStore.Client.Streams/StreamFilter.cs b/src/EventStore.Client/StreamFilter.cs
similarity index 100%
rename from src/EventStore.Client.Streams/StreamFilter.cs
rename to src/EventStore.Client/StreamFilter.cs