Skip to content

Commit

Permalink
Merge pull request #1247 from bollhals/avoidRefInBasicPublish
Browse files Browse the repository at this point in the history
change ref to in and use Unsafe.AsRef
  • Loading branch information
lukebakken authored Sep 22, 2022
2 parents 868dfc3 + b597225 commit 99a2b69
Show file tree
Hide file tree
Showing 15 changed files with 91 additions and 114 deletions.
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ string BasicConsume(
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
void BasicPublish<TProperties>(string exchange, string routingKey, ref TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
void BasicPublish<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
/// <summary>
/// Publishes a message.
Expand All @@ -198,7 +198,7 @@ void BasicPublish<TProperties>(string exchange, string routingKey, ref TProperti
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, ref TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
#nullable disable

Expand Down
12 changes: 6 additions & 6 deletions projects/RabbitMQ.Client/client/api/IModelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,17 @@ public static string BasicConsume(this IModel model, string queue,
/// <remarks>
/// The publication occurs with mandatory=false and immediate=false.
/// </remarks>
public static void BasicPublish<T>(this IModel model, PublicationAddress addr, ref T basicProperties, ReadOnlyMemory<byte> body)
public static void BasicPublish<T>(this IModel model, PublicationAddress addr, in T basicProperties, ReadOnlyMemory<byte> body)
where T : IReadOnlyBasicProperties, IAmqpHeader
{
model.BasicPublish(addr.ExchangeName, addr.RoutingKey, ref basicProperties, body);
model.BasicPublish(addr.ExchangeName, addr.RoutingKey, in basicProperties, body);
}

public static void BasicPublish(this IModel model, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> model.BasicPublish(exchange, routingKey, ref EmptyBasicProperty.Empty, body, mandatory);
=> model.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);

public static void BasicPublish(this IModel model, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> model.BasicPublish(exchange, routingKey, ref EmptyBasicProperty.Empty, body, mandatory);
=> model.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
#nullable disable

/// <summary>
Expand All @@ -113,7 +113,7 @@ public static void ExchangeBind(this IModel model, string destination, string so
}

/// <summary>
/// (Extension method) Like exchange bind but sets nowait to true.
/// (Extension method) Like exchange bind but sets nowait to true.
/// </summary>
public static void ExchangeBindNoWait(this IModel model, string destination, string source, string routingKey, IDictionary<string, object> arguments = null)
{
Expand All @@ -130,7 +130,7 @@ public static void ExchangeDeclare(this IModel model, string exchange, string ty
}

/// <summary>
/// (Extension method) Like ExchangeDeclare but sets nowait to true.
/// (Extension method) Like ExchangeDeclare but sets nowait to true.
/// </summary>
public static void ExchangeDeclareNoWait(this IModel model, string exchange, string type, bool durable = false, bool autoDelete = false,
IDictionary<string, object> arguments = null)
Expand Down
121 changes: 49 additions & 72 deletions projects/RabbitMQ.Client/client/framing/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,255 +44,232 @@ public Model(ConnectionConfig config, ISession session) : base(config, session)

public override void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat)
{
var cmd = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
ModelSend(ref cmd);
ModelSend(new ConnectionTuneOk(channelMax, frameMax, heartbeat));
}

public override void _Private_BasicCancel(string consumerTag, bool nowait)
{
var cmd = new BasicCancel(consumerTag, nowait);
ModelSend(ref cmd);
ModelSend(new BasicCancel(consumerTag, nowait));
}

public override void _Private_BasicConsume(string queue, string consumerTag, bool noLocal, bool autoAck, bool exclusive, bool nowait, IDictionary<string, object> arguments)
{
var cmd = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, nowait, arguments);
ModelSend(ref cmd);
ModelSend(new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, nowait, arguments));
}

public override void _Private_BasicGet(string queue, bool autoAck)
{
var cmd = new BasicGet(queue, autoAck);
ModelSend(ref cmd);
ModelSend(new BasicGet(queue, autoAck));
}

public override void _Private_BasicRecover(bool requeue)
{
var cmd = new BasicRecover(requeue);
ModelSend(ref cmd);
ModelSend(new BasicRecover(requeue));
}

public override void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId)
{
var cmd = new ChannelClose(replyCode, replyText, classId, methodId);
ModelSend(ref cmd);
ModelSend(new ChannelClose(replyCode, replyText, classId, methodId));
}

public override void _Private_ChannelCloseOk()
{
var cmd = new ChannelCloseOk();
ModelSend(ref cmd);
ModelSend(new ChannelCloseOk());
}

public override void _Private_ChannelFlowOk(bool active)
{
var cmd = new ChannelFlowOk(active);
ModelSend(ref cmd);
ModelSend(new ChannelFlowOk(active));
}

public override void _Private_ChannelOpen()
{
var cmd = new ChannelOpen();
ModelRpc(ref cmd, ProtocolCommandId.ChannelOpenOk);
ModelRpc(new ChannelOpen(), ProtocolCommandId.ChannelOpenOk);
}

public override void _Private_ConfirmSelect(bool nowait)
{
var method = new ConfirmSelect(nowait);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.ConfirmSelectOk);
ModelRpc(method, ProtocolCommandId.ConfirmSelectOk);
}
}

public override void _Private_ConnectionCloseOk()
{
var cmd = new ConnectionCloseOk();
ModelSend(ref cmd);
ModelSend(new ConnectionCloseOk());
}

public override void _Private_ConnectionOpen(string virtualHost)
{
var cmd = new ConnectionOpen(virtualHost);
ModelSend(ref cmd);
ModelSend(new ConnectionOpen(virtualHost));
}

public override void _Private_ConnectionSecureOk(byte[] response)
{
var cmd = new ConnectionSecureOk(response);
ModelSend(ref cmd);
ModelSend(new ConnectionSecureOk(response));
}

public override void _Private_ConnectionStartOk(IDictionary<string, object> clientProperties, string mechanism, byte[] response, string locale)
{
var cmd = new ConnectionStartOk(clientProperties, mechanism, response, locale);
ModelSend(ref cmd);
ModelSend(new ConnectionStartOk(clientProperties, mechanism, response, locale));
}

public override void _Private_UpdateSecret(byte[] newSecret, string reason)
{
var cmd = new ConnectionUpdateSecret(newSecret, reason);
ModelRpc(ref cmd, ProtocolCommandId.ConnectionUpdateSecretOk);
ModelRpc(new ConnectionUpdateSecret(newSecret, reason), ProtocolCommandId.ConnectionUpdateSecretOk);
}

public override void _Private_ExchangeBind(string destination, string source, string routingKey, bool nowait, IDictionary<string, object> arguments)
{
ExchangeBind method = new ExchangeBind(destination, source, routingKey, nowait, arguments);
var method = new ExchangeBind(destination, source, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.ExchangeBindOk);
ModelRpc(method, ProtocolCommandId.ExchangeBindOk);
}
}

public override void _Private_ExchangeDeclare(string exchange, string type, bool passive, bool durable, bool autoDelete, bool @internal, bool nowait, IDictionary<string, object> arguments)
{
ExchangeDeclare method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, @internal, nowait, arguments);
var method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, @internal, nowait, arguments);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.ExchangeDeclareOk);
ModelRpc(method, ProtocolCommandId.ExchangeDeclareOk);
}
}

public override void _Private_ExchangeDelete(string exchange, bool ifUnused, bool nowait)
{
ExchangeDelete method = new ExchangeDelete(exchange, ifUnused, nowait);
var method = new ExchangeDelete(exchange, ifUnused, nowait);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.ExchangeDeleteOk);
ModelRpc(method, ProtocolCommandId.ExchangeDeleteOk);
}
}

public override void _Private_ExchangeUnbind(string destination, string source, string routingKey, bool nowait, IDictionary<string, object> arguments)
{
ExchangeUnbind method = new ExchangeUnbind(destination, source, routingKey, nowait, arguments);
var method = new ExchangeUnbind(destination, source, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.ExchangeUnbindOk);
ModelRpc(method, ProtocolCommandId.ExchangeUnbindOk);
}
}

public override void _Private_QueueBind(string queue, string exchange, string routingKey, bool nowait, IDictionary<string, object> arguments)
{
QueueBind method = new QueueBind(queue, exchange, routingKey, nowait, arguments);
var method = new QueueBind(queue, exchange, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.QueueBindOk);
ModelRpc(method, ProtocolCommandId.QueueBindOk);
}
}

public override void _Private_QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, IDictionary<string, object> arguments)
{
QueueDeclare method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments);
var method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelSend(ref method);
ModelSend(method);
}
}

public override uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEmpty, bool nowait)
{
QueueDelete method = new QueueDelete(queue, ifUnused, ifEmpty, nowait);
var method = new QueueDelete(queue, ifUnused, ifEmpty, nowait);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
return 0xFFFFFFFF;
}

return ModelRpc(ref method, ProtocolCommandId.QueueDeleteOk, memory => new QueueDeleteOk(memory.Span)._messageCount);
return ModelRpc(method, ProtocolCommandId.QueueDeleteOk, memory => new QueueDeleteOk(memory.Span)._messageCount);
}

public override uint _Private_QueuePurge(string queue, bool nowait)
{
QueuePurge method = new QueuePurge(queue, nowait);
var method = new QueuePurge(queue, nowait);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
return 0xFFFFFFFF;
}

return ModelRpc(ref method, ProtocolCommandId.QueuePurgeOk, memory => new QueuePurgeOk(memory.Span)._messageCount);
return ModelRpc(method, ProtocolCommandId.QueuePurgeOk, memory => new QueuePurgeOk(memory.Span)._messageCount);
}

public override void BasicAck(ulong deliveryTag, bool multiple)
{
var cmd = new BasicAck(deliveryTag, multiple);
ModelSend(ref cmd);
ModelSend(new BasicAck(deliveryTag, multiple));
}

public override void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
{
var cmd = new BasicNack(deliveryTag, multiple, requeue);
ModelSend(ref cmd);
ModelSend(new BasicNack(deliveryTag, multiple, requeue));
}

public override void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
{
var cmd = new BasicQos(prefetchSize, prefetchCount, global);
ModelRpc(ref cmd, ProtocolCommandId.BasicQosOk);
ModelRpc(new BasicQos(prefetchSize, prefetchCount, global), ProtocolCommandId.BasicQosOk);
}

public override void BasicRecoverAsync(bool requeue)
{
var cmd = new BasicRecoverAsync(requeue);
ModelSend(ref cmd);
ModelSend(new BasicRecoverAsync(requeue));
}

public override void BasicReject(ulong deliveryTag, bool requeue)
{
var cmd = new BasicReject(deliveryTag, requeue);
ModelSend(ref cmd);
ModelSend(new BasicReject(deliveryTag, requeue));
}

public override void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
{
var cmd = new QueueUnbind(queue, exchange, routingKey, arguments);
ModelRpc(ref cmd, ProtocolCommandId.QueueUnbindOk);
ModelRpc(new QueueUnbind(queue, exchange, routingKey, arguments), ProtocolCommandId.QueueUnbindOk);
}

public override void TxCommit()
{
var cmd = new TxCommit();
ModelRpc(ref cmd, ProtocolCommandId.TxCommitOk);
ModelRpc(new TxCommit(), ProtocolCommandId.TxCommitOk);
}

public override void TxRollback()
{
var cmd = new TxRollback();
ModelRpc(ref cmd, ProtocolCommandId.TxRollbackOk);
ModelRpc(new TxRollback(), ProtocolCommandId.TxRollbackOk);
}

public override void TxSelect()
{
var cmd = new TxSelect();
ModelRpc(ref cmd, ProtocolCommandId.TxSelectOk);
ModelRpc(new TxSelect(), ProtocolCommandId.TxSelectOk);
}

protected override bool DispatchAsynchronous(in IncomingCommand cmd)
Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ public BasicGetResult BasicGet(string queue, bool autoAck)
public void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
=> InnerChannel.BasicNack(deliveryTag, multiple, requeue);

public void BasicPublish<TProperties>(string exchange, string routingKey, ref TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
public void BasicPublish<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublish(exchange, routingKey, ref basicProperties, body, mandatory);
=> InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory);

public void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, ref TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
public void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublish(exchange, routingKey, ref basicProperties, body, mandatory);
=> InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory);

public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
{
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
try
{
var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
_session0.Transmit(in cmd);
if (hpe.CanShutdownCleanly)
{
ClosingLoop();
Expand Down
3 changes: 1 addition & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,7 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
// Try to send connection.close wait for CloseOk in the MainLoop
if (!_closed)
{
var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
_session0.Transmit(new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0));
}
}
catch (AlreadyClosedException)
Expand Down
Loading

0 comments on commit 99a2b69

Please sign in to comment.