Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require IChannel for AsyncDefaultBasicConsumer #1667

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public static async Task Publish_Hello_World(IConnection connection, uint messag
using (IChannel channel = await connection.CreateChannelAsync())
{
QueueDeclareOk queue = await channel.QueueDeclareAsync();
var consumer = new CountingConsumer(messageCount);
var consumer = new CountingConsumer(channel, messageCount);
await channel.BasicConsumeAsync(queue.QueueName, true, consumer);

for (int i = 0; i < messageCount; i++)
Expand All @@ -35,7 +35,7 @@ internal sealed class CountingConsumer : AsyncDefaultBasicConsumer

public Task CompletedTask => _tcs.Task;

public CountingConsumer(uint messageCount)
public CountingConsumer(IChannel channel, uint messageCount) : base(channel)
{
_remainingCount = (int)messageCount;
_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down
4 changes: 0 additions & 4 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,11 @@ RabbitMQ.Client.AmqpTimestamp.AmqpTimestamp() -> void
RabbitMQ.Client.AmqpTimestamp.AmqpTimestamp(long unixTime) -> void
RabbitMQ.Client.AmqpTimestamp.Equals(RabbitMQ.Client.AmqpTimestamp other) -> bool
RabbitMQ.Client.AsyncDefaultBasicConsumer
RabbitMQ.Client.AsyncDefaultBasicConsumer.AsyncDefaultBasicConsumer() -> void
RabbitMQ.Client.AsyncDefaultBasicConsumer.AsyncDefaultBasicConsumer(RabbitMQ.Client.IChannel channel) -> void
RabbitMQ.Client.AsyncDefaultBasicConsumer.Channel.get -> RabbitMQ.Client.IChannel
RabbitMQ.Client.AsyncDefaultBasicConsumer.Channel.set -> void
RabbitMQ.Client.AsyncDefaultBasicConsumer.ConsumerTags.get -> string[]
RabbitMQ.Client.AsyncDefaultBasicConsumer.IsRunning.get -> bool
RabbitMQ.Client.AsyncDefaultBasicConsumer.IsRunning.set -> void
RabbitMQ.Client.AsyncDefaultBasicConsumer.ShutdownReason.get -> RabbitMQ.Client.ShutdownEventArgs
RabbitMQ.Client.AsyncDefaultBasicConsumer.ShutdownReason.set -> void
RabbitMQ.Client.BasicGetResult
RabbitMQ.Client.BasicGetResult.BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey, uint messageCount, RabbitMQ.Client.IReadOnlyBasicProperties basicProperties, System.ReadOnlyMemory<byte> body) -> void
RabbitMQ.Client.BasicProperties
Expand Down
13 changes: 3 additions & 10 deletions projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ public class AsyncDefaultBasicConsumer : IAsyncBasicConsumer
{
private readonly HashSet<string> _consumerTags = new HashSet<string>();

/// <summary>
/// Creates a new instance of an <see cref="AsyncDefaultBasicConsumer"/>.
/// </summary>
public AsyncDefaultBasicConsumer()
{
}

/// <summary>
/// Constructor which sets the Channel property to the given value.
/// </summary>
Expand All @@ -40,19 +33,19 @@ public string[] ConsumerTags
/// <summary>
/// Returns true while the consumer is registered and expecting deliveries from the broker.
/// </summary>
public bool IsRunning { get; protected set; }
public bool IsRunning { get; private set; }

/// <summary>
/// If our <see cref="IChannel"/> shuts down, this property will contain a description of the reason for the
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
/// </summary>
public ShutdownEventArgs? ShutdownReason { get; protected set; }
public ShutdownEventArgs? ShutdownReason { get; private set; }

/// <summary>
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
public IChannel? Channel { get; set; }
public IChannel Channel { get; private set; }

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
Expand Down
13 changes: 8 additions & 5 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public TestAsyncConsumer(ITestOutputHelper output)
public async Task TestBasicRoundtripConcurrent()
{
AddCallbackExceptionHandlers();
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);
_channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output);

QueueDeclareOk q = await _channel.QueueDeclareAsync();

Expand Down Expand Up @@ -147,7 +147,7 @@ public async Task TestBasicRoundtripConcurrent()
public async Task TestBasicRoundtripConcurrentManyMessages()
{
AddCallbackExceptionHandlers();
_channel.DefaultConsumer = new DefaultAsyncConsumer("_channel,", _output);
_channel.DefaultConsumer = new DefaultAsyncConsumer(_channel, "_channel,", _output);

const int publish_total = 4096;
const int length = 512;
Expand Down Expand Up @@ -205,7 +205,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
using (IChannel publishChannel = await publishConn.CreateChannelAsync())
{
AddCallbackExceptionHandlers(publishConn, publishChannel);
publishChannel.DefaultConsumer = new DefaultAsyncConsumer("publishChannel,", _output);
publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel,
"publishChannel,", _output);
publishChannel.ChannelShutdown += (o, ea) =>
{
HandleChannelShutdown(publishChannel, ea, (args) =>
Expand Down Expand Up @@ -247,7 +248,8 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
using (IChannel consumeChannel = await consumeConn.CreateChannelAsync())
{
AddCallbackExceptionHandlers(consumeConn, consumeChannel);
consumeChannel.DefaultConsumer = new DefaultAsyncConsumer("consumeChannel,", _output);
consumeChannel.DefaultConsumer = new DefaultAsyncConsumer(consumeChannel,
"consumeChannel,", _output);
consumeChannel.ChannelShutdown += (o, ea) =>
{
HandleChannelShutdown(consumeChannel, ea, (args) =>
Expand Down Expand Up @@ -722,7 +724,8 @@ private class DefaultAsyncConsumer : AsyncDefaultBasicConsumer
private readonly string _logPrefix;
private readonly ITestOutputHelper _output;

public DefaultAsyncConsumer(string logPrefix, ITestOutputHelper output)
public DefaultAsyncConsumer(IChannel channel, string logPrefix, ITestOutputHelper output)
: base(channel)
{
_logPrefix = logPrefix;
_output = output;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public async Task TestChannelShutdownDoesNotShutDownDispatcher()

private class ShutdownLatchConsumer : AsyncDefaultBasicConsumer
{
public ShutdownLatchConsumer()
public ShutdownLatchConsumer(IChannel channel) : base(channel)
{
Latch = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
DuplicateLatch = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -211,7 +211,7 @@ public override Task HandleChannelShutdownAsync(object channel, ShutdownEventArg
public async Task TestChannelShutdownHandler()
{
string q = await _channel.QueueDeclareAsync();
var consumer = new ShutdownLatchConsumer();
var consumer = new ShutdownLatchConsumer(_channel);

await _channel.BasicConsumeAsync(q, true, consumer);
await _channel.CloseAsync();
Expand Down