Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix object disposed exception during channel Recovery #1648

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -96,6 +98,10 @@ Global
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -110,6 +116,7 @@ Global
{F25725D7-2978-45F4-B90F-25D6F8B71C9E} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}
Expand Down
28 changes: 22 additions & 6 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,19 @@ public IAsyncBasicConsumer? DefaultConsumer

public string? CurrentQueue => InnerChannel.CurrentQueue;

internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (false == recordedEntitiesSemaphoreHeld)
{
throw new InvalidOperationException("recordedEntitiesSemaphore must be held");
}

ThrowIfDisposed();
if (_disposed)
{
return false;
}

_connection = conn;

RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken)
Expand Down Expand Up @@ -189,15 +193,27 @@ await newChannel.TxSelectAsync(cancellationToken)
* chance that an invalid Channel will be used to handle a basic.deliver frame,
* with the resulting basic.ack never getting sent out.
*/
_innerChannel = newChannel;

if (recoverConsumers)
if (_disposed)
{
await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaphoreHeld)
await newChannel.AbortAsync()
.ConfigureAwait(false);
return false;
}
else
{
_innerChannel = newChannel;

if (recoverConsumers)
{
await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaphoreHeld)
.ConfigureAwait(false);
}

_innerChannel.RunRecoveryEventHandlers(this);

_innerChannel.RunRecoveryEventHandlers(this);
return true;
}
}

public async Task CloseAsync(ushort replyCode, string replyText, bool abort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -583,12 +584,44 @@ private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitie
throw new InvalidOperationException("recordedEntitiesSemaphore must be held");
}

foreach (AutorecoveringChannel channel in _channels)
var channelsToRecover = new List<AutorecoveringChannel>();
await _channelsSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
channelsToRecover.AddRange(_channels);
}
finally
{
_channelsSemaphore.Release();
}

var notRecoveredChannels = new List<AutorecoveringChannel>();
foreach (AutorecoveringChannel channel in channelsToRecover)
{
await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled,
bool recovered = await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled,
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld,
cancellationToken: cancellationToken)
.ConfigureAwait(false);

if (false == recovered)
{
notRecoveredChannels.Add(channel);
}
}

await _channelsSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
foreach (AutorecoveringChannel channel in notRecoveredChannels)
{
_channels.Remove(channel);
}
}
finally
{
_channelsSemaphore.Release();
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions projects/Test/Applications/GH-1647/GH-1647.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>GH_1647</RootNamespace>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="../../../RabbitMQ.Client\RabbitMQ.Client.csproj" />
</ItemGroup>

</Project>
29 changes: 29 additions & 0 deletions projects/Test/Applications/GH-1647/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
using System.Text;
using RabbitMQ.Client;

ConnectionFactory connectionFactory = new()
{
AutomaticRecoveryEnabled = true,
UserName = "guest",
Password = "guest"
};

var props = new BasicProperties();
byte[] msg = Encoding.UTF8.GetBytes("test");
using var connection = await connectionFactory.CreateConnectionAsync();
for (int i = 0; i < 300; i++)
{
try
{
using var channel = await connection.CreateChannelAsync(); // New channel for each message
await Task.Delay(1000);
await channel.BasicPublishAsync(string.Empty, string.Empty, props, msg);
Console.WriteLine($"Sent message {i}");
}
catch (Exception ex)
{
Console.WriteLine($"Failed to send message {i}: {ex.Message}");
await Task.Delay(1000);
}
}