Skip to content

Commit

Permalink
Fix tracing injection when event is non-JSON (#317)
Browse files Browse the repository at this point in the history
  • Loading branch information
josephcummings committed Aug 19, 2024
1 parent 6a12e6a commit 113f650
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 159 deletions.
176 changes: 94 additions & 82 deletions src/EventStore.Client.Streams/EventStoreClient.Append.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ public async Task<IWriteResult> AppendToStreamAsync(
_log.LogDebug("Append to stream - {streamName}@{expectedRevision}.", streamName, expectedRevision);

var task = userCredentials is null && await BatchAppender.IsUsable().ConfigureAwait(false)
? BatchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken)
: AppendToStreamInternal(
await GetChannelInfo(cancellationToken).ConfigureAwait(false),
new AppendReq {
Options = new() {
StreamIdentifier = streamName,
Revision = expectedRevision
}
},
eventData,
options,
deadline,
userCredentials,
cancellationToken
);
? BatchAppender.Append(streamName, expectedRevision, eventData, deadline, cancellationToken)
: AppendToStreamInternal(
await GetChannelInfo(cancellationToken).ConfigureAwait(false),
new AppendReq {
Options = new() {
StreamIdentifier = streamName,
Revision = expectedRevision
}
},
eventData,
options,
deadline,
userCredentials,
cancellationToken
);

return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(options);
}
Expand Down Expand Up @@ -104,7 +104,7 @@ await GetChannelInfo(cancellationToken).ConfigureAwait(false),
return (await task.ConfigureAwait(false)).OptionallyThrowWrongExpectedVersionException(operationOptions);
}

ValueTask<IWriteResult> AppendToStreamInternal(
ValueTask<IWriteResult> AppendToStreamInternal(
ChannelInfo channelInfo,
AppendReq header,
IEnumerable<EventData> eventData,
Expand All @@ -113,28 +113,32 @@ ValueTask<IWriteResult> AppendToStreamInternal(
UserCredentials? userCredentials,
CancellationToken cancellationToken
) {
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(Settings)
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username);
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, header.Options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(channelInfo)
.WithClientSettingsServerTags(Settings)
.WithOptionalTag(TelemetryTags.Database.User, userCredentials?.Username ?? Settings.DefaultCredentials?.Username);

return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags);
return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(Operation, TracingConstants.Operations.Append, tags);

async ValueTask<IWriteResult> Operation() {
using var call = new StreamsClient(channelInfo.CallInvoker)
.Append(EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));
.Append(EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken));

await call.RequestStream
.WriteAsync(header)
.ConfigureAwait(false);
.WriteAsync(header)
.ConfigureAwait(false);

foreach (var e in eventData) {
var appendReq = new AppendReq {
ProposedMessage = new() {
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(e.Metadata.InjectTracingContext(Activity.Current)),
Id = e.EventId.ToDto(),
Data = ByteString.CopyFrom(e.Data.Span),
CustomMetadata = ByteString.CopyFrom(
e.ContentType == Constants.Metadata.ContentTypes.ApplicationJson
? e.Metadata.InjectTracingContext(Activity.Current)
: e.Metadata.Span
),
Metadata = {
{ Constants.Metadata.Type, e.Type },
{ Constants.Metadata.ContentType, e.ContentType }
Expand All @@ -159,7 +163,7 @@ await call.RequestStream
}
}

IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
var currentRevision = response.Success.CurrentRevisionOptionCase == AppendResp.Types.Success.CurrentRevisionOptionOneofCase.NoStream
? StreamRevision.None
: new StreamRevision(response.Success.CurrentRevision);
Expand All @@ -178,13 +182,13 @@ IWriteResult HandleSuccessAppend(AppendResp response, AppendReq header) {
return new SuccessResult(currentRevision, position);
}

IWriteResult HandleWrongExpectedRevision(
IWriteResult HandleWrongExpectedRevision(
AppendResp response, AppendReq header, EventStoreClientOperationOptions operationOptions
) {
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision
? new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
: StreamRevision.None;
var actualStreamRevision = response.WrongExpectedVersion.CurrentRevisionOptionCase == CurrentRevisionOptionOneofCase.CurrentRevision
? new StreamRevision(response.WrongExpectedVersion.CurrentRevision)
: StreamRevision.None;

_log.LogDebug(
"Append to stream failed with Wrong Expected Version - {streamName}/{expectedRevision}/{currentRevision}",
header.Options.StreamIdentifier,
Expand All @@ -201,12 +205,12 @@ IWriteResult HandleWrongExpectedRevision(
);
}

var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
ExpectedRevisionOptionOneofCase.ExpectedAny => StreamState.Any,
ExpectedRevisionOptionOneofCase.ExpectedNoStream => StreamState.NoStream,
ExpectedRevisionOptionOneofCase.ExpectedStreamExists => StreamState.StreamExists,
_ => StreamState.Any
};
var expectedStreamState = response.WrongExpectedVersion.ExpectedRevisionOptionCase switch {
ExpectedRevisionOptionOneofCase.ExpectedAny => StreamState.Any,
ExpectedRevisionOptionOneofCase.ExpectedNoStream => StreamState.NoStream,
ExpectedRevisionOptionOneofCase.ExpectedStreamExists => StreamState.StreamExists,
_ => StreamState.Any
};

throw new WrongExpectedVersionException(
header.Options.StreamIdentifier!,
Expand All @@ -226,16 +230,16 @@ IWriteResult HandleWrongExpectedRevision(
);
}

class StreamAppender : IDisposable {
readonly EventStoreClientSettings _settings;
readonly CancellationToken _cancellationToken;
readonly Action<Exception> _onException;
readonly Channel<BatchAppendReq> _channel;
readonly ConcurrentDictionary<Uuid, TaskCompletionSource<IWriteResult>> _pendingRequests;
readonly TaskCompletionSource<bool> _isUsable;
class StreamAppender : IDisposable {
readonly EventStoreClientSettings _settings;
readonly CancellationToken _cancellationToken;
readonly Action<Exception> _onException;
readonly Channel<BatchAppendReq> _channel;
readonly ConcurrentDictionary<Uuid, TaskCompletionSource<IWriteResult>> _pendingRequests;
readonly TaskCompletionSource<bool> _isUsable;

ChannelInfo? _channelInfo;
AsyncDuplexStreamingCall<BatchAppendReq, BatchAppendResp>? _call;
ChannelInfo? _channelInfo;
AsyncDuplexStreamingCall<BatchAppendReq, BatchAppendResp>? _call;

public StreamAppender(
EventStoreClientSettings settings,
Expand All @@ -255,8 +259,8 @@ Action<Exception> onException

public ValueTask<IWriteResult> Append(
string streamName, StreamRevision expectedStreamPosition,
IEnumerable<EventData> events, TimeSpan? timeoutAfter,
CancellationToken cancellationToken = default
IEnumerable<EventData> events, TimeSpan? timeoutAfter,
CancellationToken cancellationToken = default
) =>
AppendInternal(
BatchAppendReq.Types.Options.Create(streamName, expectedStreamPosition, timeoutAfter),
Expand All @@ -266,8 +270,8 @@ public ValueTask<IWriteResult> Append(

public ValueTask<IWriteResult> Append(
string streamName, StreamState expectedStreamState,
IEnumerable<EventData> events, TimeSpan? timeoutAfter,
CancellationToken cancellationToken = default
IEnumerable<EventData> events, TimeSpan? timeoutAfter,
CancellationToken cancellationToken = default
) =>
AppendInternal(
BatchAppendReq.Types.Options.Create(streamName, expectedStreamState, timeoutAfter),
Expand All @@ -277,21 +281,21 @@ public ValueTask<IWriteResult> Append(

public Task<bool> IsUsable() => _isUsable.Task;

ValueTask<IWriteResult> AppendInternal(
ValueTask<IWriteResult> AppendInternal(
BatchAppendReq.Types.Options options,
IEnumerable<EventData> events,
CancellationToken cancellationToken
IEnumerable<EventData> events,
CancellationToken cancellationToken
) {
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(_channelInfo)
.WithClientSettingsServerTags(_settings)
.WithOptionalTag(TelemetryTags.Database.User, _settings.DefaultCredentials?.Username);
var tags = new ActivityTagsCollection()
.WithRequiredTag(TelemetryTags.EventStore.Stream, options.StreamIdentifier.StreamName.ToStringUtf8())
.WithGrpcChannelServerTags(_channelInfo)
.WithClientSettingsServerTags(_settings)
.WithOptionalTag(TelemetryTags.Database.User, _settings.DefaultCredentials?.Username);

return EventStoreClientDiagnostics.ActivitySource.TraceClientOperation(
Operation,
TracingConstants.Operations.Append,
tags
tags
);

async ValueTask<IWriteResult> Operation() {
Expand All @@ -300,9 +304,10 @@ async ValueTask<IWriteResult> Operation() {
var complete = _pendingRequests.GetOrAdd(correlationId, new TaskCompletionSource<IWriteResult>());

try {
foreach (var appendRequest in GetRequests(events, options, correlationId))
await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
} catch (ChannelClosedException ex) {
foreach (var appendRequest in GetRequests(events, options, correlationId))
await _channel.Writer.WriteAsync(appendRequest, cancellationToken).ConfigureAwait(false);
}
catch (ChannelClosedException ex) {
// channel is closed, our tcs won't necessarily get completed, don't wait for it.
throw ex.InnerException ?? ex;
}
Expand All @@ -311,7 +316,7 @@ async ValueTask<IWriteResult> Operation() {
}
}

async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
try {
_channelInfo = await channelInfoTask.ConfigureAwait(false);
if (!_channelInfo.ServerCapabilities.SupportsBatchAppend) {
Expand All @@ -332,7 +337,8 @@ async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
_ = Task.Run(Receive, _cancellationToken);

_isUsable.TrySetResult(true);
} catch (Exception ex) {
}
catch (Exception ex) {
_isUsable.TrySetException(ex);
_onException(ex);
}
Expand All @@ -342,8 +348,8 @@ async Task Duplex(ValueTask<ChannelInfo> channelInfoTask) {
async Task Send() {
if (_call is null) return;

await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);
await foreach (var appendRequest in _channel.Reader.ReadAllAsync(_cancellationToken).ConfigureAwait(false))
await _call.RequestStream.WriteAsync(appendRequest).ConfigureAwait(false);

await _call.RequestStream.CompleteAsync().ConfigureAwait(false);
}
Expand All @@ -359,11 +365,13 @@ async Task Receive() {

try {
writeResult.TrySetResult(response.ToWriteResult());
} catch (Exception ex) {
}
catch (Exception ex) {
writeResult.TrySetException(ex);
}
}
} catch (Exception ex) {
}
catch (Exception ex) {
// signal that no tcs added to _pendingRequests after this point will necessarily complete
_channel.Writer.TryComplete(ex);

Expand All @@ -376,17 +384,21 @@ async Task Receive() {
}
}

IEnumerable<BatchAppendReq> GetRequests(IEnumerable<EventData> events, BatchAppendReq.Types.Options options, Uuid correlationId) {
var batchSize = 0;
var first = true;
var correlationIdDto = correlationId.ToDto();
var proposedMessages = new List<BatchAppendReq.Types.ProposedMessage>();
IEnumerable<BatchAppendReq> GetRequests(IEnumerable<EventData> events, BatchAppendReq.Types.Options options, Uuid correlationId) {
var batchSize = 0;
var first = true;
var correlationIdDto = correlationId.ToDto();
var proposedMessages = new List<BatchAppendReq.Types.ProposedMessage>();

foreach (var eventData in events) {
var proposedMessage = new BatchAppendReq.Types.ProposedMessage {
Data = ByteString.CopyFrom(eventData.Data.Span),
CustomMetadata = ByteString.CopyFrom(eventData.Metadata.InjectTracingContext(Activity.Current)),
Id = eventData.EventId.ToDto(),
Data = ByteString.CopyFrom(eventData.Data.Span),
CustomMetadata = ByteString.CopyFrom(
eventData.ContentType == Constants.Metadata.ContentTypes.ApplicationJson
? eventData.Metadata.InjectTracingContext(Activity.Current)
: eventData.Metadata.Span
),
Id = eventData.EventId.ToDto(),
Metadata = {
{ Constants.Metadata.Type, eventData.Type },
{ Constants.Metadata.ContentType, eventData.ContentType }
Expand All @@ -396,7 +408,7 @@ IEnumerable<BatchAppendReq> GetRequests(IEnumerable<EventData> events, BatchAppe
proposedMessages.Add(proposedMessage);

if ((batchSize += proposedMessage.CalculateSize()) < _settings.OperationOptions.BatchAppendSize)
continue;
continue;

yield return new BatchAppendReq {
ProposedMessages = { proposedMessages },
Expand All @@ -423,4 +435,4 @@ public void Dispose() {
}
}
}
}
}
Loading

0 comments on commit 113f650

Please sign in to comment.