Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change ref to in and use Unsafe.AsRef #1247

Merged
merged 1 commit into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ string BasicConsume(
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
void BasicPublish<TProperties>(string exchange, string routingKey, ref TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
void BasicPublish<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
/// <summary>
/// Publishes a message.
Expand All @@ -198,7 +198,7 @@ void BasicPublish<TProperties>(string exchange, string routingKey, ref TProperti
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, ref TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
#nullable disable

Expand Down
12 changes: 6 additions & 6 deletions projects/RabbitMQ.Client/client/api/IModelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,17 @@ public static string BasicConsume(this IModel model, string queue,
/// <remarks>
/// The publication occurs with mandatory=false and immediate=false.
/// </remarks>
public static void BasicPublish<T>(this IModel model, PublicationAddress addr, ref T basicProperties, ReadOnlyMemory<byte> body)
public static void BasicPublish<T>(this IModel model, PublicationAddress addr, in T basicProperties, ReadOnlyMemory<byte> body)
where T : IReadOnlyBasicProperties, IAmqpHeader
{
model.BasicPublish(addr.ExchangeName, addr.RoutingKey, ref basicProperties, body);
model.BasicPublish(addr.ExchangeName, addr.RoutingKey, in basicProperties, body);
}

public static void BasicPublish(this IModel model, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> model.BasicPublish(exchange, routingKey, ref EmptyBasicProperty.Empty, body, mandatory);
=> model.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);

public static void BasicPublish(this IModel model, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> model.BasicPublish(exchange, routingKey, ref EmptyBasicProperty.Empty, body, mandatory);
=> model.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
#nullable disable

/// <summary>
Expand All @@ -113,7 +113,7 @@ public static void ExchangeBind(this IModel model, string destination, string so
}

/// <summary>
/// (Extension method) Like exchange bind but sets nowait to true.
/// (Extension method) Like exchange bind but sets nowait to true.
/// </summary>
public static void ExchangeBindNoWait(this IModel model, string destination, string source, string routingKey, IDictionary<string, object> arguments = null)
{
Expand All @@ -130,7 +130,7 @@ public static void ExchangeDeclare(this IModel model, string exchange, string ty
}

/// <summary>
/// (Extension method) Like ExchangeDeclare but sets nowait to true.
/// (Extension method) Like ExchangeDeclare but sets nowait to true.
/// </summary>
public static void ExchangeDeclareNoWait(this IModel model, string exchange, string type, bool durable = false, bool autoDelete = false,
IDictionary<string, object> arguments = null)
Expand Down
121 changes: 49 additions & 72 deletions projects/RabbitMQ.Client/client/framing/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,255 +44,232 @@ public Model(ConnectionConfig config, ISession session) : base(config, session)

public override void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat)
{
var cmd = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
ModelSend(ref cmd);
ModelSend(new ConnectionTuneOk(channelMax, frameMax, heartbeat));
}

public override void _Private_BasicCancel(string consumerTag, bool nowait)
{
var cmd = new BasicCancel(consumerTag, nowait);
ModelSend(ref cmd);
ModelSend(new BasicCancel(consumerTag, nowait));
}

public override void _Private_BasicConsume(string queue, string consumerTag, bool noLocal, bool autoAck, bool exclusive, bool nowait, IDictionary<string, object> arguments)
{
var cmd = new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, nowait, arguments);
ModelSend(ref cmd);
ModelSend(new BasicConsume(queue, consumerTag, noLocal, autoAck, exclusive, nowait, arguments));
}

public override void _Private_BasicGet(string queue, bool autoAck)
{
var cmd = new BasicGet(queue, autoAck);
ModelSend(ref cmd);
ModelSend(new BasicGet(queue, autoAck));
}

public override void _Private_BasicRecover(bool requeue)
{
var cmd = new BasicRecover(requeue);
ModelSend(ref cmd);
ModelSend(new BasicRecover(requeue));
}

public override void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId)
{
var cmd = new ChannelClose(replyCode, replyText, classId, methodId);
ModelSend(ref cmd);
ModelSend(new ChannelClose(replyCode, replyText, classId, methodId));
}

public override void _Private_ChannelCloseOk()
{
var cmd = new ChannelCloseOk();
ModelSend(ref cmd);
ModelSend(new ChannelCloseOk());
}

public override void _Private_ChannelFlowOk(bool active)
{
var cmd = new ChannelFlowOk(active);
ModelSend(ref cmd);
ModelSend(new ChannelFlowOk(active));
}

public override void _Private_ChannelOpen()
{
var cmd = new ChannelOpen();
ModelRpc(ref cmd, ProtocolCommandId.ChannelOpenOk);
ModelRpc(new ChannelOpen(), ProtocolCommandId.ChannelOpenOk);
}

public override void _Private_ConfirmSelect(bool nowait)
{
var method = new ConfirmSelect(nowait);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.ConfirmSelectOk);
ModelRpc(method, ProtocolCommandId.ConfirmSelectOk);
}
}

public override void _Private_ConnectionCloseOk()
{
var cmd = new ConnectionCloseOk();
ModelSend(ref cmd);
ModelSend(new ConnectionCloseOk());
}

public override void _Private_ConnectionOpen(string virtualHost)
{
var cmd = new ConnectionOpen(virtualHost);
ModelSend(ref cmd);
ModelSend(new ConnectionOpen(virtualHost));
}

public override void _Private_ConnectionSecureOk(byte[] response)
{
var cmd = new ConnectionSecureOk(response);
ModelSend(ref cmd);
ModelSend(new ConnectionSecureOk(response));
}

public override void _Private_ConnectionStartOk(IDictionary<string, object> clientProperties, string mechanism, byte[] response, string locale)
{
var cmd = new ConnectionStartOk(clientProperties, mechanism, response, locale);
ModelSend(ref cmd);
ModelSend(new ConnectionStartOk(clientProperties, mechanism, response, locale));
}

public override void _Private_UpdateSecret(byte[] newSecret, string reason)
{
var cmd = new ConnectionUpdateSecret(newSecret, reason);
ModelRpc(ref cmd, ProtocolCommandId.ConnectionUpdateSecretOk);
ModelRpc(new ConnectionUpdateSecret(newSecret, reason), ProtocolCommandId.ConnectionUpdateSecretOk);
}

public override void _Private_ExchangeBind(string destination, string source, string routingKey, bool nowait, IDictionary<string, object> arguments)
{
ExchangeBind method = new ExchangeBind(destination, source, routingKey, nowait, arguments);
var method = new ExchangeBind(destination, source, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.ExchangeBindOk);
ModelRpc(method, ProtocolCommandId.ExchangeBindOk);
}
}

public override void _Private_ExchangeDeclare(string exchange, string type, bool passive, bool durable, bool autoDelete, bool @internal, bool nowait, IDictionary<string, object> arguments)
{
ExchangeDeclare method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, @internal, nowait, arguments);
var method = new ExchangeDeclare(exchange, type, passive, durable, autoDelete, @internal, nowait, arguments);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.ExchangeDeclareOk);
ModelRpc(method, ProtocolCommandId.ExchangeDeclareOk);
}
}

public override void _Private_ExchangeDelete(string exchange, bool ifUnused, bool nowait)
{
ExchangeDelete method = new ExchangeDelete(exchange, ifUnused, nowait);
var method = new ExchangeDelete(exchange, ifUnused, nowait);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.ExchangeDeleteOk);
ModelRpc(method, ProtocolCommandId.ExchangeDeleteOk);
}
}

public override void _Private_ExchangeUnbind(string destination, string source, string routingKey, bool nowait, IDictionary<string, object> arguments)
{
ExchangeUnbind method = new ExchangeUnbind(destination, source, routingKey, nowait, arguments);
var method = new ExchangeUnbind(destination, source, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.ExchangeUnbindOk);
ModelRpc(method, ProtocolCommandId.ExchangeUnbindOk);
}
}

public override void _Private_QueueBind(string queue, string exchange, string routingKey, bool nowait, IDictionary<string, object> arguments)
{
QueueBind method = new QueueBind(queue, exchange, routingKey, nowait, arguments);
var method = new QueueBind(queue, exchange, routingKey, nowait, arguments);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelRpc(ref method, ProtocolCommandId.QueueBindOk);
ModelRpc(method, ProtocolCommandId.QueueBindOk);
}
}

public override void _Private_QueueDeclare(string queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, IDictionary<string, object> arguments)
{
QueueDeclare method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments);
var method = new QueueDeclare(queue, passive, durable, exclusive, autoDelete, nowait, arguments);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
}
else
{
ModelSend(ref method);
ModelSend(method);
}
}

public override uint _Private_QueueDelete(string queue, bool ifUnused, bool ifEmpty, bool nowait)
{
QueueDelete method = new QueueDelete(queue, ifUnused, ifEmpty, nowait);
var method = new QueueDelete(queue, ifUnused, ifEmpty, nowait);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
return 0xFFFFFFFF;
}

return ModelRpc(ref method, ProtocolCommandId.QueueDeleteOk, memory => new QueueDeleteOk(memory.Span)._messageCount);
return ModelRpc(method, ProtocolCommandId.QueueDeleteOk, memory => new QueueDeleteOk(memory.Span)._messageCount);
}

public override uint _Private_QueuePurge(string queue, bool nowait)
{
QueuePurge method = new QueuePurge(queue, nowait);
var method = new QueuePurge(queue, nowait);
if (nowait)
{
ModelSend(ref method);
ModelSend(method);
return 0xFFFFFFFF;
}

return ModelRpc(ref method, ProtocolCommandId.QueuePurgeOk, memory => new QueuePurgeOk(memory.Span)._messageCount);
return ModelRpc(method, ProtocolCommandId.QueuePurgeOk, memory => new QueuePurgeOk(memory.Span)._messageCount);
}

public override void BasicAck(ulong deliveryTag, bool multiple)
{
var cmd = new BasicAck(deliveryTag, multiple);
ModelSend(ref cmd);
ModelSend(new BasicAck(deliveryTag, multiple));
}

public override void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
{
var cmd = new BasicNack(deliveryTag, multiple, requeue);
ModelSend(ref cmd);
ModelSend(new BasicNack(deliveryTag, multiple, requeue));
}

public override void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
{
var cmd = new BasicQos(prefetchSize, prefetchCount, global);
ModelRpc(ref cmd, ProtocolCommandId.BasicQosOk);
ModelRpc(new BasicQos(prefetchSize, prefetchCount, global), ProtocolCommandId.BasicQosOk);
}

public override void BasicRecoverAsync(bool requeue)
{
var cmd = new BasicRecoverAsync(requeue);
ModelSend(ref cmd);
ModelSend(new BasicRecoverAsync(requeue));
}

public override void BasicReject(ulong deliveryTag, bool requeue)
{
var cmd = new BasicReject(deliveryTag, requeue);
ModelSend(ref cmd);
ModelSend(new BasicReject(deliveryTag, requeue));
}

public override void QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments)
{
var cmd = new QueueUnbind(queue, exchange, routingKey, arguments);
ModelRpc(ref cmd, ProtocolCommandId.QueueUnbindOk);
ModelRpc(new QueueUnbind(queue, exchange, routingKey, arguments), ProtocolCommandId.QueueUnbindOk);
}

public override void TxCommit()
{
var cmd = new TxCommit();
ModelRpc(ref cmd, ProtocolCommandId.TxCommitOk);
ModelRpc(new TxCommit(), ProtocolCommandId.TxCommitOk);
}

public override void TxRollback()
{
var cmd = new TxRollback();
ModelRpc(ref cmd, ProtocolCommandId.TxRollbackOk);
ModelRpc(new TxRollback(), ProtocolCommandId.TxRollbackOk);
}

public override void TxSelect()
{
var cmd = new TxSelect();
ModelRpc(ref cmd, ProtocolCommandId.TxSelectOk);
ModelRpc(new TxSelect(), ProtocolCommandId.TxSelectOk);
}

protected override bool DispatchAsynchronous(in IncomingCommand cmd)
Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ public BasicGetResult BasicGet(string queue, bool autoAck)
public void BasicNack(ulong deliveryTag, bool multiple, bool requeue)
=> InnerChannel.BasicNack(deliveryTag, multiple, requeue);

public void BasicPublish<TProperties>(string exchange, string routingKey, ref TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
public void BasicPublish<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublish(exchange, routingKey, ref basicProperties, body, mandatory);
=> InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory);

public void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, ref TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
public void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublish(exchange, routingKey, ref basicProperties, body, mandatory);
=> InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory);

public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
{
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
try
{
var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
_session0.Transmit(in cmd);
if (hpe.CanShutdownCleanly)
{
ClosingLoop();
Expand Down
3 changes: 1 addition & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,7 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
// Try to send connection.close wait for CloseOk in the MainLoop
if (!_closed)
{
var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
_session0.Transmit(new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0));
}
}
catch (AlreadyClosedException)
Expand Down
Loading