Skip to content

Commit

Permalink
Add QueuePurgeAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Oct 24, 2023
1 parent 076bddd commit fccd710
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 6 deletions.
14 changes: 8 additions & 6 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -558,14 +558,16 @@ ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString rou
/// <returns>Returns the number of messages purged during deletion.</returns>
void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty);

/// <summary>
/// Purge a queue of messages.
/// </summary>
/// <remarks>
/// Returns the number of messages purged.
/// </remarks>
/// <summary>Asynchronously purge a queue of messages.</summary>
/// <param name="queue">The queue.</param>
/// <returns>Returns the number of messages purged.</returns>
uint QueuePurge(string queue);

/// <summary>Asynchronously purge a queue of messages.</summary>
/// <param name="queue">The queue.</param>
/// <returns>Returns the number of messages purged.</returns>
ValueTask<uint> QueuePurgeAsync(string queue);

/// <summary>
/// Unbind a queue from an exchange.
/// </summary>
Expand Down
27 changes: 27 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,31 @@ public override void HandleCommand(in IncomingCommand cmd)
}
}
}

internal class QueuePurgeAsyncRpcContinuation : AsyncRpcContinuation<uint>
{
public QueuePurgeAsyncRpcContinuation(TimeSpan continuationTimeout) : base(continuationTimeout)
{
}

public override void HandleCommand(in IncomingCommand cmd)
{
try
{
if (cmd.CommandId == ProtocolCommandId.QueuePurgeOk)
{
var method = new Client.Framing.Impl.QueuePurgeOk(cmd.MethodBytes.Span);
_tcs.TrySetResult(method._messageCount);
}
else
{
_tcs.SetException(new InvalidOperationException($"Received unexpected command of type {cmd.CommandId}!"));
}
}
finally
{
cmd.ReturnMethodBuffer();
}
}
}
}
3 changes: 3 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,9 @@ public void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty)
public uint QueuePurge(string queue)
=> InnerChannel.QueuePurge(queue);

public ValueTask<uint> QueuePurgeAsync(string queue)
=> InnerChannel.QueuePurgeAsync(queue);

public void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
{
ThrowIfDisposed();
Expand Down
19 changes: 19 additions & 0 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,25 @@ public uint QueuePurge(string queue)
return _Private_QueuePurge(queue, false);
}

public async ValueTask<uint> QueuePurgeAsync(string queue)
{
await _rpcSemaphore.WaitAsync().ConfigureAwait(false);
try
{
var k = new QueuePurgeAsyncRpcContinuation(ContinuationTimeout);
Enqueue(k);

var method = new QueuePurge(queue, false);
await ModelSendAsync(method).ConfigureAwait(false);

return await k;
}
finally
{
_rpcSemaphore.Release();
}
}

public abstract void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);

public abstract void TxCommit();
Expand Down
1 change: 1 addition & 0 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ namespace RabbitMQ.Client
System.Threading.Tasks.ValueTask<uint> QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty);
void QueueDeleteNoWait(string queue, bool ifUnused, bool ifEmpty);
uint QueuePurge(string queue);
System.Threading.Tasks.ValueTask<uint> QueuePurgeAsync(string queue);
void QueueUnbind(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments);
void TxCommit();
void TxRollback();
Expand Down
30 changes: 30 additions & 0 deletions projects/Unit/TestBasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -292,5 +292,35 @@ public void TestPropertiesRountrip_Headers()
Assert.Equal("World", response);
}
}

[Fact]
public async Task TestQueuePurgeAsync()
{
const int messageCount = 1024;
var publishSyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var cf = new ConnectionFactory { DispatchConsumersAsync = true };
using IConnection connection = cf.CreateConnection();
using IChannel channel = connection.CreateChannel();

await channel.ConfirmSelectAsync();

QueueDeclareOk q = await channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
string queueName = q.QueueName;

var publishTask = Task.Run(async () =>
{
for (int i = 0; i < messageCount; i++)
{
byte[] body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
await channel.BasicPublishAsync(string.Empty, queueName, body);
}
publishSyncSource.SetResult(true);
});

await channel.WaitForConfirmsOrDieAsync();
Assert.True(await publishSyncSource.Task);

Assert.Equal((uint)messageCount, await channel.QueuePurgeAsync(queueName));
}
}
}

0 comments on commit fccd710

Please sign in to comment.