Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed JetStream ACK custom serialization #255

Merged
merged 3 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,12 @@ private async ValueTask SendAckAsync(ReadOnlySequence<byte> payload, AckOpts opt

if ((opts.DoubleAck ?? _context.Opts.AckOpts.DoubleAck) == true)
{
await Connection.RequestAsync<ReadOnlySequence<byte>, object?>(ReplyTo, payload, cancellationToken: cancellationToken);
await Connection.RequestAsync<ReadOnlySequence<byte>, object?>(
subject: ReplyTo,
data: payload,
requestSerializer: NatsRawSerializer<ReadOnlySequence<byte>>.Default,
replySerializer: NatsRawSerializer<object?>.Default,
cancellationToken: cancellationToken);
}
else
{
Expand All @@ -149,6 +154,7 @@ await _msg.ReplyAsync(
{
WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,
},
serializer: NatsRawSerializer<ReadOnlySequence<byte>>.Default,
cancellationToken: cancellationToken);
}
}
Expand Down
29 changes: 15 additions & 14 deletions tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,32 +94,33 @@ public async Task Fetch_dispose_test()

var fc = await consumer.FetchInternalAsync<TestData>(serializer: TestDataJsonSerializer<TestData>.Default, fetchOpts, cancellationToken: cts.Token);

var signal = new WaitSignal();
var signal1 = new WaitSignal();
var signal2 = new WaitSignal();
var reader = Task.Run(async () =>
{
await foreach (var msg in fc.Msgs.ReadAllAsync(cts.Token))
{
await msg.AckAsync(cancellationToken: cts.Token);
signal.Pulse();

// Introduce delay to make sure not all messages will be acked.
await Task.Delay(1_000, cts.Token);
signal1.Pulse();
await signal2;
}
});

await signal;
await signal1;

// Dispose waits for all the pending messages to be delivered to the loop
// since the channel reader carries on reading the messages in its internal queue.
await fc.DisposeAsync();

await reader;
// At this point we should only have ACKed one message
await consumer.RefreshAsync(cts.Token);
Assert.Equal(9, consumer.Info.NumAckPending);

var infos = new List<INatsJSConsumer>();
await foreach (var natsJSConsumer in stream.ListConsumersAsync(cts.Token))
{
infos.Add(natsJSConsumer);
}
signal2.Pulse();

Assert.Single(infos);
await reader;

Assert.True(infos[0].Info.NumAckPending > 0);
await consumer.RefreshAsync(cts.Token);
Assert.Equal(0, consumer.Info.NumAckPending);
}
}
77 changes: 77 additions & 0 deletions tests/NATS.Client.JetStream.Tests/CustomSerializerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System.Buffers;
using NATS.Client.Core.Tests;

namespace NATS.Client.JetStream.Tests;

public class CustomSerializerTest
{
[Fact]
public async Task When_consuming_ack_should_be_serialized_normally_if_custom_serializer_used()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection(new NatsOpts
{
SerializerRegistry = new Level42SerializerRegistry(),
});
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);

await js.PublishAsync("s1.1", new byte[] { 0 }, cancellationToken: cts.Token);
await js.PublishAsync("s1.2", new byte[] { 0 }, cancellationToken: cts.Token);

var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

// single ack
{
var next = await consumer.NextAsync<byte[]>(cancellationToken: cts.Token);
if (next is { } msg)
{
Assert.Equal(new byte[] { 42 }, msg.Data);
await msg.AckAsync(cancellationToken: cts.Token);
}
else
{
Assert.Fail("No message received");
}
}

// double ack
{
var next = await consumer.NextAsync<byte[]>(cancellationToken: cts.Token);
if (next is { } msg)
{
Assert.Equal(new byte[] { 42 }, msg.Data);
await msg.AckAsync(opts: new AckOpts(DoubleAck: true), cts.Token);
}
else
{
Assert.Fail("No message received");
}
}

await consumer.RefreshAsync(cts.Token);

Assert.Equal(0, consumer.Info.NumAckPending);
}

private class Level42Serializer<T> : INatsSerializer<T>
{
public void Serialize(IBufferWriter<byte> bufferWriter, T value)
{
bufferWriter.Write(new byte[] { 42 });
bufferWriter.Advance(1);
}

public T Deserialize(in ReadOnlySequence<byte> buffer) => (T)(object)new byte[] { 42 };
}

private class Level42SerializerRegistry : INatsSerializerRegistry
{
public INatsSerialize<T> GetSerializer<T>() => new Level42Serializer<T>();

public INatsDeserialize<T> GetDeserializer<T>() => new Level42Serializer<T>();
}
}
5 changes: 2 additions & 3 deletions tests/NATS.Client.JetStream.Tests/DoubleAckTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public async Task Fetch_should_not_block_socket()

// fetch loop
{
var consumer = (NatsJSConsumer) await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var fetchOpts = new NatsJSFetchOpts
{
Expand All @@ -44,7 +44,7 @@ public async Task Fetch_should_not_block_socket()

// consume loop
{
var consumer = (NatsJSConsumer) await js.CreateConsumerAsync("s1", "c2", cancellationToken: cts.Token);
var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c2", cancellationToken: cts.Token);

var opts = new NatsJSConsumeOpts
{
Expand All @@ -62,6 +62,5 @@ public async Task Fetch_should_not_block_socket()

Assert.Equal(100, count);
}

}
}
Loading