Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filtered persistent subscriptions to $all #122

Merged
merged 1 commit into from
Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ message CreateReq {
event_store.client.Empty start = 2;
event_store.client.Empty end = 3;
}
oneof filter_option {
event_store.client.FilterOptions filter = 4;
event_store.client.Empty no_filter = 5;
}
}

message Position {
Expand Down
17 changes: 17 additions & 0 deletions src/EventStore.Client.Common/protos/shared.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,20 @@ message StreamIdentifier {
reserved 1 to 2;
bytes streamName = 3;
}

message FilterOptions {
oneof filter {
Expression stream_name = 1;
Expression event_type = 2;
}
oneof window {
uint32 max = 3;
event_store.client.Empty count = 4;
}
uint32 checkpointIntervalMultiplier = 5;

message Expression {
string regex = 1;
repeated string prefix = 2;
}
}
17 changes: 1 addition & 16 deletions src/EventStore.Client.Common/protos/streams.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ message ReadReq {
SubscriptionOptions subscription = 6;
}
oneof filter_option {
FilterOptions filter = 7;
event_store.client.FilterOptions filter = 7;
event_store.client.Empty no_filter = 8;
}
UUIDOption uuid_option = 9;
Expand Down Expand Up @@ -56,22 +56,7 @@ message ReadReq {
uint64 commit_position = 1;
uint64 prepare_position = 2;
}
message FilterOptions {
oneof filter {
Expression stream_name = 1;
Expression event_type = 2;
}
oneof window {
uint32 max = 3;
event_store.client.Empty count = 4;
}
uint32 checkpointIntervalMultiplier = 5;

message Expression {
string regex = 1;
repeated string prefix = 2;
}
}
message UUIDOption {
oneof content {
event_store.client.Empty structured = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,84 @@ private static CreateReq.Types.StreamOptions StreamOptionsForCreateProto(string
};
}

private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position position) {
private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position position, IEventFilter? filter) {
var allFilter = GetFilterOptions(filter);
CreateReq.Types.AllOptions allOptions;
if (position == Position.Start) {
return new CreateReq.Types.AllOptions {
Start = new Empty()
allOptions = new CreateReq.Types.AllOptions {
Start = new Empty(),
};
}

if (position == Position.End) {
return new CreateReq.Types.AllOptions {
else if (position == Position.End) {
allOptions = new CreateReq.Types.AllOptions {
End = new Empty()
};
} else {
allOptions = new CreateReq.Types.AllOptions {
Position = new CreateReq.Types.Position {
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
}
};
}

return new CreateReq.Types.AllOptions {
Position = new CreateReq.Types.Position {
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
}
if (allFilter is null) {
allOptions.NoFilter = new Empty();
} else {
allOptions.Filter = allFilter;
}

return allOptions;
}

private static FilterOptions? GetFilterOptions(IEventFilter? filter) {
if (filter == null) {
return null;
}

var options = filter switch {
StreamFilter _ => new FilterOptions {
StreamName = (filter.Prefixes, filter.Regex) switch {
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) == 0 &&
filter.Regex != RegularFilterExpression.None =>
new FilterOptions.Types.Expression
{Regex = filter.Regex},
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) != 0 &&
filter.Regex == RegularFilterExpression.None =>
new FilterOptions.Types.Expression {
Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
},
_ => throw new InvalidOperationException()
}
},
EventTypeFilter _ => new FilterOptions {
EventType = (filter.Prefixes, filter.Regex) switch {
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) == 0 &&
filter.Regex != RegularFilterExpression.None =>
new FilterOptions.Types.Expression
{Regex = filter.Regex},
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) != 0 &&
filter.Regex == RegularFilterExpression.None =>
new FilterOptions.Types.Expression {
Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
},
_ => throw new InvalidOperationException()
}
},
_ => throw new InvalidOperationException()
};

if (filter.MaxSearchWindow.HasValue) {
options.Max = filter.MaxSearchWindow.Value;
} else {
options.Count = new Empty();
}

return options;
}


Expand All @@ -68,6 +127,18 @@ private static CreateReq.Types.AllOptions AllOptionsForCreateProto(Position posi
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public async Task CreateAsync(string streamName, string groupName,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(
streamName: streamName,
groupName: groupName,
eventFilter: null,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

private async Task CreateInternalAsync(string streamName, string groupName, IEventFilter? eventFilter,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) {
if (streamName == null) {
Expand All @@ -90,13 +161,17 @@ public async Task CreateAsync(string streamName, string groupName,
throw new ArgumentException($"{nameof(settings.StartFrom)} must be of type '{nameof(Position)}' when subscribing to {SystemStreams.AllStream}");
}

if (eventFilter != null && streamName != SystemStreams.AllStream) {
throw new ArgumentException($"Filters are only supported when subscribing to {SystemStreams.AllStream}");
}

await new PersistentSubscriptions.PersistentSubscriptions.PersistentSubscriptionsClient(
await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(new CreateReq {
Options = new CreateReq.Types.Options {
Stream = streamName != SystemStreams.AllStream ?
StreamOptionsForCreateProto(streamName, (StreamPosition)(settings.StartFrom ?? StreamPosition.End)) : null,
All = streamName == SystemStreams.AllStream ?
AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End)) : null,
AllOptionsForCreateProto((Position)(settings.StartFrom ?? Position.End), eventFilter) : null,
#pragma warning disable 612
StreamIdentifier = streamName != SystemStreams.AllStream ? streamName : string.Empty, /*for backwards compatibility*/
#pragma warning restore 612
Expand All @@ -123,22 +198,44 @@ await SelectCallInvoker(cancellationToken).ConfigureAwait(false)).CreateAsync(ne
}

/// <summary>
/// Creates a persistent subscription to $all.
/// Creates a filtered persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="eventFilter"></param>
/// <param name="settings"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CreateToAllAsync(string groupName,
public async Task CreateToAllAsync(string groupName, IEventFilter eventFilter,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateAsync(
await CreateInternalAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
eventFilter: eventFilter,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

/// <summary>
/// Creates a persistent subscription to $all.
/// </summary>
/// <param name="groupName"></param>
/// <param name="settings"></param>
/// <param name="userCredentials"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task CreateToAllAsync(string groupName,
PersistentSubscriptionSettings settings, UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default) =>
await CreateInternalAsync(
streamName: SystemStreams.AllStream,
groupName: groupName,
eventFilter: null,
settings: settings,
userCredentials: userCredentials,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
}
14 changes: 7 additions & 7 deletions src/EventStore.Client.Streams/EventStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public EventStoreClient(EventStoreClientSettings? settings = null) : base(settin
_log = Settings.LoggerFactory?.CreateLogger<EventStoreClient>() ?? new NullLogger<EventStoreClient>();
}

private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(
private static FilterOptions? GetFilterOptions(
SubscriptionFilterOptions? filterOptions) {
if (filterOptions == null) {
return null;
Expand All @@ -71,33 +71,33 @@ public EventStoreClient(EventStoreClientSettings? settings = null) : base(settin
var filter = filterOptions.Filter;

var options = filter switch {
StreamFilter _ => new ReadReq.Types.Options.Types.FilterOptions {
StreamFilter _ => new FilterOptions {
StreamName = (filter.Prefixes, filter.Regex) switch {
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) == 0 &&
filter.Regex != RegularFilterExpression.None =>
new ReadReq.Types.Options.Types.FilterOptions.Types.Expression
new FilterOptions.Types.Expression
{Regex = filter.Regex},
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) != 0 &&
filter.Regex == RegularFilterExpression.None =>
new ReadReq.Types.Options.Types.FilterOptions.Types.Expression {
new FilterOptions.Types.Expression {
Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
},
_ => throw new InvalidOperationException()
}
},
EventTypeFilter _ => new ReadReq.Types.Options.Types.FilterOptions {
EventTypeFilter _ => new FilterOptions {
EventType = (filter.Prefixes, filter.Regex) switch {
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) == 0 &&
filter.Regex != RegularFilterExpression.None =>
new ReadReq.Types.Options.Types.FilterOptions.Types.Expression
new FilterOptions.Types.Expression
{Regex = filter.Regex},
(PrefixFilterExpression[] _, RegularFilterExpression _)
when (filter.Prefixes?.Length ?? 0) != 0 &&
filter.Regex == RegularFilterExpression.None =>
new ReadReq.Types.Options.Types.FilterOptions.Types.Expression {
new FilterOptions.Types.Expression {
Prefix = {Array.ConvertAll(filter.Prefixes!, e => e.ToString())}
},
_ => throw new InvalidOperationException()
Expand Down