Skip to content

Commit

Permalink
implement channel
Browse files Browse the repository at this point in the history
  • Loading branch information
bollhals committed Dec 5, 2020
1 parent 34aac15 commit 1ea8c30
Show file tree
Hide file tree
Showing 116 changed files with 3,922 additions and 6,221 deletions.
5 changes: 4 additions & 1 deletion projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,15 @@
</AssemblyAttribute>
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
<PackageReference Include="System.Memory" Version="4.5.4" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.FxCopAnalyzers" Version="3.3.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="MinVer" Version="2.3.0" PrivateAssets="All" />
<PackageReference Include="System.Memory" Version="4.5.4" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
</ItemGroup>

Expand Down
19 changes: 19 additions & 0 deletions projects/RabbitMQ.Client/client/FrameworkExtension/Interlocked.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Runtime.CompilerServices;

namespace RabbitMQ.Client
{
#if NETCOREAPP3_1 || NETSTANDARD
internal static class Interlocked
{
public static ulong CompareExchange(ref ulong location1, ulong value, ulong comparand)
{
return (ulong)System.Threading.Interlocked.CompareExchange(ref Unsafe.As<ulong, long>(ref location1), (long)value, (long)comparand);
}

public static ulong Increment(ref ulong location1)
{
return (ulong)System.Threading.Interlocked.Add(ref Unsafe.As<ulong, long>(ref location1), 1L);
}
}
#endif
}
26 changes: 13 additions & 13 deletions projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using RabbitMQ.Client.client.impl.Channel;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
Expand All @@ -12,29 +12,29 @@ public class AsyncDefaultBasicConsumer : IBasicConsumer, IAsyncBasicConsumer
private readonly HashSet<string> _consumerTags = new HashSet<string>();

/// <summary>
/// Creates a new instance of an <see cref="DefaultBasicConsumer"/>.
/// Creates a new instance of an <see cref="AsyncDefaultBasicConsumer"/>.
/// </summary>
public AsyncDefaultBasicConsumer()
{
ShutdownReason = null;
Model = null;
Channel = null;
IsRunning = false;
}

/// <summary>
/// Constructor which sets the Model property to the given value.
/// Constructor which sets the <see cref="Channel"/> property to the given value.
/// </summary>
/// <param name="model">Common AMQP model.</param>
public AsyncDefaultBasicConsumer(IModel model)
/// <param name="channel">The channel.</param>
public AsyncDefaultBasicConsumer(IChannel channel)
{
ShutdownReason = null;
IsRunning = false;
Model = model;
Channel = channel;
}

/// <summary>
/// Retrieve the consumer tags this consumer is registered as; to be used when discussing this consumer
/// with the server, for instance with <see cref="IModel.BasicCancel"/>.
/// with the server, for instance with <see cref="IChannel.CancelConsumerAsync"/>.
/// </summary>
public string[] ConsumerTags
{
Expand All @@ -50,7 +50,7 @@ public string[] ConsumerTags
public bool IsRunning { get; protected set; }

/// <summary>
/// If our <see cref="IModel"/> shuts down, this property will contain a description of the reason for the
/// If our <see cref="IChannel"/> shuts down, this property will contain a description of the reason for the
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
/// </summary>
public ShutdownEventArgs ShutdownReason { get; protected set; }
Expand All @@ -61,10 +61,10 @@ public string[] ConsumerTags
public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled;

/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
public IModel Model { get; set; }
public IChannel Channel { get; set; }

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
Expand Down Expand Up @@ -101,7 +101,7 @@ public virtual Task HandleBasicConsumeOk(string consumerTag)
/// Called each time a message is delivered for this consumer.
/// </summary>
/// <remarks>
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IModel.BasicAck"/>
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IChannel.AckMessageAsync"/>
/// if consuming in automatic acknowledgement mode.
/// Subclasses must copy or fully use delivery body before returning.
/// Accessing the body at a later point is unsafe as its memory can
Expand All @@ -120,7 +120,7 @@ public virtual Task HandleBasicDeliver(string consumerTag,
}

/// <summary>
/// Called when the model (channel) this consumer was registered on terminates.
/// Called when the channel this consumer was registered on terminates.
/// </summary>
/// <param name="model">A channel this consumer was registered on.</param>
/// <param name="reason">Shutdown context.</param>
Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ namespace RabbitMQ.Client
/// //
/// IConnection conn = factory.CreateConnection();
/// //
/// IModel ch = conn.CreateModel();
/// IChannel ch = await conn.CreateChannelAsync().ConfigureAwait(false);
/// //
/// // ... use ch's IModel methods ...
/// // ... use ch's IChannel methods ...
/// //
/// ch.Close(Constants.ReplySuccess, "Closing the channel");
/// await ch.CloseAsync().ConfigureAwait(false);
/// conn.Close(Constants.ReplySuccess, "Closing the connection");
/// </code></example>
/// <para>
Expand Down Expand Up @@ -492,7 +492,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
else
{
var protocol = new RabbitMQ.Client.Framing.Protocol();
conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(CreateFrameHandler), clientProvidedName);
conn = protocol.CreateConnection(this, endpointResolver.SelectOne(CreateFrameHandler), clientProvidedName);
}
}
catch (Exception e)
Expand Down
24 changes: 12 additions & 12 deletions projects/RabbitMQ.Client/client/api/DefaultBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
using System;
using System.Collections.Generic;
using System.Linq;

using RabbitMQ.Client.client.impl.Channel;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
Expand All @@ -56,24 +56,24 @@ public class DefaultBasicConsumer : IBasicConsumer
public DefaultBasicConsumer()
{
ShutdownReason = null;
Model = null;
Channel = null;
IsRunning = false;
}

/// <summary>
/// Constructor which sets the Model property to the given value.
/// Constructor which sets the Channel property to the given value.
/// </summary>
/// <param name="model">Common AMQP model.</param>
public DefaultBasicConsumer(IModel model)
/// <param name="channel">The channel.</param>
public DefaultBasicConsumer(IChannel channel)
{
ShutdownReason = null;
IsRunning = false;
Model = model;
Channel = channel;
}

/// <summary>
/// Retrieve the consumer tags this consumer is registered as; to be used to identify
/// this consumer, for example, when cancelling it with <see cref="IModel.BasicCancel"/>.
/// this consumer, for example, when cancelling it with <see cref="IChannel.CancelConsumerAsync"/>.
/// This value is an array because a single consumer instance can be reused to consume on
/// multiple channels.
/// </summary>
Expand All @@ -91,7 +91,7 @@ public string[] ConsumerTags
public bool IsRunning { get; protected set; }

/// <summary>
/// If our <see cref="IModel"/> shuts down, this property will contain a description of the reason for the
/// If our <see cref="IChannel"/> shuts down, this property will contain a description of the reason for the
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
/// </summary>
public ShutdownEventArgs ShutdownReason { get; protected set; }
Expand All @@ -102,10 +102,10 @@ public string[] ConsumerTags
public event EventHandler<ConsumerEventArgs> ConsumerCancelled;

/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
public IModel Model { get; set; }
public IChannel Channel { get; set; }

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
Expand Down Expand Up @@ -141,7 +141,7 @@ public virtual void HandleBasicConsumeOk(string consumerTag)
/// Called each time a message is delivered for this consumer.
/// </summary>
/// <remarks>
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IModel.BasicAck"/>
/// This is a no-op implementation. It will not acknowledge deliveries via <see cref="IChannel.AckMessageAsync"/>
/// if consuming in automatic acknowledgement mode.
/// Subclasses must copy or fully use delivery body before returning.
/// Accessing the body at a later point is unsafe as its memory can
Expand All @@ -159,7 +159,7 @@ public virtual void HandleBasicDeliver(string consumerTag,
}

/// <summary>
/// Called when the model (channel) this consumer was registered on terminates.
/// Called when the channel this consumer was registered on terminates.
/// </summary>
/// <param name="model">A channel this consumer was registered on.</param>
/// <param name="reason">Shutdown context.</param>
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/ExchangeType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace RabbitMQ.Client
/// </summary>
/// <remarks>
/// Use the static members of this class as values for the
/// "exchangeType" arguments for IModel methods such as
/// "exchangeType" arguments for IChannel methods such as
/// ExchangeDeclare. The broker may be extended with additional
/// exchange types that do not appear in this class.
/// </remarks>
Expand Down
14 changes: 7 additions & 7 deletions projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
using System;
using System.Threading.Tasks;

using RabbitMQ.Client.client.impl.Channel;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
{
public interface IAsyncBasicConsumer
{
/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
IModel Model { get; }
IChannel Channel { get; }

/// <summary>
/// Signalled when the consumer gets cancelled.
Expand Down Expand Up @@ -43,7 +43,7 @@ public interface IAsyncBasicConsumer
/// </summary>
/// <remarks>
/// Does nothing with the passed in information.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IChannel.AckMessageAsync"/>.
/// The implementation of this method in this class does NOT acknowledge such messages.
/// </remarks>
Task HandleBasicDeliver(string consumerTag,
Expand All @@ -55,10 +55,10 @@ Task HandleBasicDeliver(string consumerTag,
ReadOnlyMemory<byte> body);

/// <summary>
/// Called when the model shuts down.
/// Called when the channel shuts down.
/// </summary>
/// <param name="model"> Common AMQP model.</param>
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
/// <param name="model">The channel.</param>
/// <param name="reason"> Information about the reason why a particular channel, session, or connection was destroyed.</param>
Task HandleModelShutdown(object model, ShutdownEventArgs reason);
}
}
17 changes: 7 additions & 10 deletions projects/RabbitMQ.Client/client/api/IBasicConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//---------------------------------------------------------------------------

using System;

using RabbitMQ.Client.client.impl.Channel;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
Expand All @@ -39,9 +39,6 @@ namespace RabbitMQ.Client
///receive messages from a queue by subscription.</summary>
/// <remarks>
/// <para>
/// See IModel.BasicConsume, IModel.BasicCancel.
/// </para>
/// <para>
/// Note that the "Handle*" methods run in the connection's
/// thread! Consider using <see cref="EventingBasicConsumer"/>, which uses a
/// SharedQueue instance to safely pass received messages across
Expand All @@ -51,10 +48,10 @@ namespace RabbitMQ.Client
public interface IBasicConsumer
{
/// <summary>
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
IModel Model { get; }
IChannel Channel { get; }

/// <summary>
/// Signalled when the consumer gets cancelled.
Expand Down Expand Up @@ -86,7 +83,7 @@ public interface IBasicConsumer
/// </summary>
/// <remarks>
/// Does nothing with the passed in information.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IChannel.AckMessageAsync"/>.
/// The implementation of this method in this class does NOT acknowledge such messages.
/// </remarks>
void HandleBasicDeliver(string consumerTag,
Expand All @@ -98,10 +95,10 @@ void HandleBasicDeliver(string consumerTag,
ReadOnlyMemory<byte> body);

/// <summary>
/// Called when the model shuts down.
/// Called when the channel shuts down.
/// </summary>
/// <param name="model"> Common AMQP model.</param>
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
/// <param name="model">The channel.</param>
/// <param name="reason"> Information about the reason why a particular channel, session, or connection was destroyed.</param>
void HandleModelShutdown(object model, ShutdownEventArgs reason);
}
}
6 changes: 0 additions & 6 deletions projects/RabbitMQ.Client/client/api/IBasicProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ namespace RabbitMQ.Client
/// 0-8, 0-8qpid, 0-9 and 0-9-1 of AMQP.</summary>
/// <remarks>
/// <para>
/// The specification code generator provides
/// protocol-version-specific implementations of this interface. To
/// obtain an implementation of this interface in a
/// protocol-version-neutral way, use <see cref="IModel.CreateBasicProperties"/>.
/// </para>
/// <para>
/// Each property is readable, writable and clearable: a cleared
/// property will not be transmitted over the wire. Properties on a
/// fresh instance are clear by default.
Expand Down
14 changes: 7 additions & 7 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
using System.Collections.Generic;
using System.IO;
using System.Threading;

using System.Threading.Tasks;
using RabbitMQ.Client.client.impl.Channel;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
Expand Down Expand Up @@ -218,7 +218,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// Abort this connection and all its channels.
/// </summary>
/// <remarks>
/// Note that all active channels, sessions, and models will be closed if this method is called.
/// Note that all active channels, sessions, and consumers will be closed if this method is called.
/// In comparison to normal <see cref="Close()"/> method, <see cref="Abort()"/> will not throw
/// <see cref="IOException"/> during closing connection.
///This method waits infinitely for the in-progress close operation to complete.
Expand Down Expand Up @@ -275,7 +275,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// Close this connection and all its channels.
/// </summary>
/// <remarks>
/// Note that all active channels, sessions, and models will be
/// Note that all active channels, sessions, and consumers will be
/// closed if this method is called. It will wait for the in-progress
/// close operation to complete. This method will not return to the caller
/// until the shutdown is complete. If the connection is already closed
Expand Down Expand Up @@ -304,7 +304,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// and wait with a timeout for all the in-progress close operations to complete.
/// </summary>
/// <remarks>
/// Note that all active channels, sessions, and models will be
/// Note that all active channels, sessions, and consumers will be
/// closed if this method is called. It will wait for the in-progress
/// close operation to complete with a timeout. If the connection is
/// already closed (or closing), then this method will do nothing.
Expand Down Expand Up @@ -336,9 +336,9 @@ public interface IConnection : INetworkConnection, IDisposable
void Close(ushort reasonCode, string reasonText, TimeSpan timeout);

/// <summary>
/// Create and return a fresh channel, session, and model.
/// Create and return a fresh channel, session.
/// </summary>
IModel CreateModel();
ValueTask<IChannel> CreateChannelAsync();

/// <summary>
/// Handle incoming Connection.Blocked methods.
Expand Down
Loading

0 comments on commit 1ea8c30

Please sign in to comment.