Skip to content

Commit

Permalink
Fixed JetStream ACK custom serialization (#255)
Browse files Browse the repository at this point in the history
* Fixed JetStream ACK custom serialization

When a custom serializer used JetStream msg ACKs are still being serialized
using the custom serializer which is resulting in +ACK being sent to the
server with a distorted format. Now we just used the NATS default serializer
on message ACKs to format the +ACK/NAK/etc. payload correctly.

ConsumerFetchTest.cs was also passing for the wrong reasons, which is also
fixed. Plus minor format issue fixed.

* Fixed consume ack test

* Test flap tuning
  • Loading branch information
mtmk committed Nov 28, 2023
1 parent 0860566 commit a387d6e
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 53 deletions.
8 changes: 7 additions & 1 deletion src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ private async ValueTask SendAckAsync(ReadOnlySequence<byte> payload, AckOpts opt

if ((opts.DoubleAck ?? _context.Opts.AckOpts.DoubleAck) == true)
{
await Connection.RequestAsync<ReadOnlySequence<byte>, object?>(ReplyTo, payload, cancellationToken: cancellationToken);
await Connection.RequestAsync<ReadOnlySequence<byte>, object?>(
subject: ReplyTo,
data: payload,
requestSerializer: NatsRawSerializer<ReadOnlySequence<byte>>.Default,
replySerializer: NatsRawSerializer<object?>.Default,
cancellationToken: cancellationToken);
}
else
{
Expand All @@ -149,6 +154,7 @@ await _msg.ReplyAsync(
{
WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,
},
serializer: NatsRawSerializer<ReadOnlySequence<byte>>.Default,
cancellationToken: cancellationToken);
}
}
Expand Down
108 changes: 75 additions & 33 deletions tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,79 +230,99 @@ await Retry.Until(
[Fact]
public async Task Consume_dispose_test()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await using var server = NatsServer.StartJS();

await using var nats = server.CreateClientConnection();

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var js = new NatsJSContext(nats);
var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var consumerOpts = new NatsJSConsumeOpts
{
MaxMsgs = 10,
MaxMsgs = 100,
IdleHeartbeat = TimeSpan.FromSeconds(5),
Expires = TimeSpan.FromSeconds(10),
};

for (var i = 0; i < 100; i++)
for (var i = 0; i < 10; i++)
{
var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer<TestData>.Default, cancellationToken: cts.Token);
ack.EnsureSuccess();
}

var cc = await consumer.ConsumeInternalAsync<TestData>(serializer: TestDataJsonSerializer<TestData>.Default, consumerOpts, cancellationToken: cts.Token);

var signal = new WaitSignal();
var signal1 = new WaitSignal();
var signal2 = new WaitSignal();
var reader = Task.Run(async () =>
{
var count = 0;
await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
signal.Pulse();
// Introduce delay to make sure not all messages will be acked.
await Task.Delay(1_000, cts.Token);
signal1.Pulse();
await signal2;
_output.WriteLine($">>>>>>>> {count++}");

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 267 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

// dispose will end the loop

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (release/v2.9.23)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (release/v2.9.23)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 268 in tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

}
});

await signal;
await signal1;

// Dispose waits for all the pending messages to be delivered to the loop
// since the channel reader carries on reading the messages in its internal queue.
await cc.DisposeAsync();

await reader;
// At this point we should only have ACKed one message
await Retry.Until(
"ack pending 9",
async () =>
{
var c = await js.GetConsumerAsync("s1", "c1", cts.Token);
return c.Info.NumAckPending == 9;
},
timeout: TimeSpan.FromSeconds(20));
await consumer.RefreshAsync(cts.Token);
Assert.Equal(9, consumer.Info.NumAckPending);

var infos = new List<INatsJSConsumer>();
await foreach (var natsJSConsumer in stream.ListConsumersAsync(cts.Token))
{
infos.Add(natsJSConsumer);
}
signal2.Pulse();

Assert.Single(infos);
await reader;

Assert.True(infos[0].Info.NumAckPending > 0);
await Retry.Until(
"ack pending 0",
async () =>
{
var c = await js.GetConsumerAsync("s1", "c1", cts.Token);
return c.Info.NumAckPending == 0;
},
timeout: TimeSpan.FromSeconds(20));
await consumer.RefreshAsync(cts.Token);
Assert.Equal(0, consumer.Info.NumAckPending);
}

[Fact]
public async Task Consume_stop_test()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await using var server = NatsServer.StartJS();

await using var nats = server.CreateClientConnection();

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var js = new NatsJSContext(nats);
var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var consumerOpts = new NatsJSConsumeOpts
{
MaxMsgs = 10,
MaxMsgs = 100,
IdleHeartbeat = TimeSpan.FromSeconds(2),
Expires = TimeSpan.FromSeconds(4),
};

for (var i = 0; i < 100; i++)
for (var i = 0; i < 10; i++)
{
var ack = await js.PublishAsync("s1.foo", new TestData { Test = i }, serializer: TestDataJsonSerializer<TestData>.Default, cancellationToken: cts.Token);
ack.EnsureSuccess();
Expand All @@ -311,29 +331,51 @@ public async Task Consume_stop_test()
var consumeStop = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);
var cc = await consumer.ConsumeInternalAsync<TestData>(serializer: TestDataJsonSerializer<TestData>.Default, consumerOpts, cancellationToken: consumeStop.Token);

var signal = new WaitSignal();
var signal1 = new WaitSignal();
var signal2 = new WaitSignal();
var reader = Task.Run(async () =>
{
await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
signal.Pulse();
signal1.Pulse();
await signal2;
// dispose will end the loop
}
});

await signal;
await signal1;

// After cancelled consume waits for all the pending messages to be delivered to the loop
// since the channel reader carries on reading the messages in its internal queue.
consumeStop.Cancel();

await reader;
// At this point we should only have ACKed one message
await Retry.Until(
"ack pending 9",
async () =>
{
var c = await js.GetConsumerAsync("s1", "c1", cts.Token);
return c.Info.NumAckPending == 9;
},
timeout: TimeSpan.FromSeconds(20));
await consumer.RefreshAsync(cts.Token);
Assert.Equal(9, consumer.Info.NumAckPending);

var infos = new List<INatsJSConsumer>();
await foreach (var natsJSConsumer in stream.ListConsumersAsync(cts.Token))
{
infos.Add(natsJSConsumer);
}
signal2.Pulse();

Assert.Single(infos);
await reader;

Assert.True(infos[0].Info.NumAckPending == 0);
await Retry.Until(
"ack pending 0",
async () =>
{
var c = await js.GetConsumerAsync("s1", "c1", cts.Token);
return c.Info.NumAckPending == 0;
},
timeout: TimeSpan.FromSeconds(20));
await consumer.RefreshAsync(cts.Token);
Assert.Equal(0, consumer.Info.NumAckPending);
}
}
49 changes: 33 additions & 16 deletions tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ public async Task FetchNoWait_test()
[Fact]
public async Task Fetch_dispose_test()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await using var server = NatsServer.StartJS();

await using var nats = server.CreateClientConnection();

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var js = new NatsJSContext(nats);
var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
Expand All @@ -94,32 +94,49 @@ public async Task Fetch_dispose_test()

var fc = await consumer.FetchInternalAsync<TestData>(serializer: TestDataJsonSerializer<TestData>.Default, fetchOpts, cancellationToken: cts.Token);

var signal = new WaitSignal();
var signal1 = new WaitSignal();
var signal2 = new WaitSignal();
var reader = Task.Run(async () =>
{
await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
signal.Pulse();
// Introduce delay to make sure not all messages will be acked.
await Task.Delay(1_000, cts.Token);
signal1.Pulse();
await signal2;
}
});

await signal;
await signal1;

// Dispose waits for all the pending messages to be delivered to the loop
// since the channel reader carries on reading the messages in its internal queue.
await fc.DisposeAsync();

await reader;
// At this point we should only have ACKed one message
await Retry.Until(
"ack pending 9",
async () =>
{
var c = await js.GetConsumerAsync("s1", "c1", cts.Token);
return c.Info.NumAckPending == 9;
},
timeout: TimeSpan.FromSeconds(20));
await consumer.RefreshAsync(cts.Token);
Assert.Equal(9, consumer.Info.NumAckPending);

var infos = new List<INatsJSConsumer>();
await foreach (var natsJSConsumer in stream.ListConsumersAsync(cts.Token))
{
infos.Add(natsJSConsumer);
}
signal2.Pulse();

Assert.Single(infos);
await reader;

Assert.True(infos[0].Info.NumAckPending > 0);
await Retry.Until(
"ack pending 0",
async () =>
{
var c = await js.GetConsumerAsync("s1", "c1", cts.Token);
return c.Info.NumAckPending == 0;
},
timeout: TimeSpan.FromSeconds(20));
await consumer.RefreshAsync(cts.Token);
Assert.Equal(0, consumer.Info.NumAckPending);
}
}
77 changes: 77 additions & 0 deletions tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System.Buffers;
using NATS.Client.Core.Tests;

namespace NATS.Client.JetStream.Tests;

public class CustomSerializerTest
{
[Fact]
public async Task When_consuming_ack_should_be_serialized_normally_if_custom_serializer_used()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection(new NatsOpts
{
SerializerRegistry = new Level42SerializerRegistry(),
});
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);

await js.PublishAsync("s1.1", new byte[] { 0 }, cancellationToken: cts.Token);
await js.PublishAsync("s1.2", new byte[] { 0 }, cancellationToken: cts.Token);

var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

// single ack
{
var next = await consumer.NextAsync<byte[]>(cancellationToken: cts.Token);
if (next is { } msg)
{
Assert.Equal(new byte[] { 42 }, msg.Data);
await msg.AckAsync(cancellationToken: cts.Token);
}
else
{
Assert.Fail("No message received");
}
}

// double ack
{
var next = await consumer.NextAsync<byte[]>(cancellationToken: cts.Token);
if (next is { } msg)
{
Assert.Equal(new byte[] { 42 }, msg.Data);
await msg.AckAsync(opts: new AckOpts(DoubleAck: true), cts.Token);
}
else
{
Assert.Fail("No message received");
}
}

await consumer.RefreshAsync(cts.Token);

Assert.Equal(0, consumer.Info.NumAckPending);
}

private class Level42Serializer<T> : INatsSerializer<T>
{
public void Serialize(IBufferWriter<byte> bufferWriter, T value)
{
bufferWriter.Write(new byte[] { 42 });
bufferWriter.Advance(1);
}

public T Deserialize(in ReadOnlySequence<byte> buffer) => (T)(object)new byte[] { 42 };
}

private class Level42SerializerRegistry : INatsSerializerRegistry
{
public INatsSerialize<T> GetSerializer<T>() => new Level42Serializer<T>();

public INatsDeserialize<T> GetDeserializer<T>() => new Level42Serializer<T>();
}
}
5 changes: 2 additions & 3 deletions tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public async Task Fetch_should_not_block_socket()

// fetch loop
{
var consumer = (NatsJSConsumer) await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var fetchOpts = new NatsJSFetchOpts
{
Expand All @@ -44,7 +44,7 @@ public async Task Fetch_should_not_block_socket()

// consume loop
{
var consumer = (NatsJSConsumer) await js.CreateConsumerAsync("s1", "c2", cancellationToken: cts.Token);
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c2", cancellationToken: cts.Token);

var opts = new NatsJSConsumeOpts
{
Expand All @@ -62,6 +62,5 @@ public async Task Fetch_should_not_block_socket()

Assert.Equal(100, count);
}

}
}

0 comments on commit a387d6e

Please sign in to comment.