Skip to content

Commit

Permalink
Merge pull request #804 from rabbitmq/rabbitmq-dotnet-client-802
Browse files Browse the repository at this point in the history
Fix issue #802
  • Loading branch information
michaelklishin authored Apr 7, 2020
2 parents 337f96c + aeb1dc9 commit 9404246
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
Expand Down Expand Up @@ -46,7 +47,9 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body)
{
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body));
IMemoryOwner<byte> bodyCopy = MemoryPool<byte>.Shared.Rent(body.Length);
body.CopyTo(bodyCopy.Memory);
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, bodyCopy, body.Length));
}

public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)
Expand Down
14 changes: 11 additions & 3 deletions projects/RabbitMQ.Client/client/impl/BasicDeliver.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Threading.Tasks;

Expand All @@ -14,7 +15,8 @@ sealed class BasicDeliver : Work
readonly string _exchange;
readonly string _routingKey;
readonly IBasicProperties _basicProperties;
readonly ReadOnlyMemory<byte> _body;
readonly IMemoryOwner<byte> _body;
readonly int _bodyLength;

public BasicDeliver(IBasicConsumer consumer,
string consumerTag,
Expand All @@ -23,7 +25,8 @@ public BasicDeliver(IBasicConsumer consumer,
string exchange,
string routingKey,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body) : base(consumer)
IMemoryOwner<byte> body,
int bodyLength) : base(consumer)
{
_consumerTag = consumerTag;
_deliveryTag = deliveryTag;
Expand All @@ -32,6 +35,7 @@ public BasicDeliver(IBasicConsumer consumer,
_routingKey = routingKey;
_basicProperties = basicProperties;
_body = body;
_bodyLength = bodyLength;
}

protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
Expand All @@ -44,7 +48,7 @@ await consumer.HandleBasicDeliver(_consumerTag,
_exchange,
_routingKey,
_basicProperties,
_body).ConfigureAwait(false);
_body.Memory.Slice(0, _bodyLength)).ConfigureAwait(false);
}
catch (Exception e)
{
Expand All @@ -55,6 +59,10 @@ await consumer.HandleBasicDeliver(_consumerTag,
};
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
finally
{
_body.Dispose();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
Expand Down Expand Up @@ -63,6 +64,8 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body)
{
IMemoryOwner<byte> memoryCopy = MemoryPool<byte>.Shared.Rent(body.Length);
body.CopyTo(memoryCopy.Memory);
UnlessShuttingDown(() =>
{
try
Expand All @@ -73,7 +76,7 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
exchange,
routingKey,
basicProperties,
body);
memoryCopy.Memory.Slice(0, body.Length));
}
catch (Exception e)
{
Expand All @@ -84,6 +87,10 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
};
_model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
}
finally
{
memoryCopy.Dispose();
}
});
}

Expand Down
8 changes: 2 additions & 6 deletions projects/Unit/TestBasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ public void TestBasicRoundtripArray()
};
string tag = m.BasicConsume(q.QueueName, true, consumer);


m.BasicPublish("", q.QueueName, bp, sendBody);
bool waitResFalse = are.WaitOne(2000);
m.BasicCancel(tag);


Assert.IsTrue(waitResFalse);
Assert.AreEqual(sendBody, consumeBody);
CollectionAssert.AreEqual(sendBody, consumeBody);
}
}

Expand All @@ -62,14 +60,12 @@ public void TestBasicRoundtripReadOnlyMemory()
};
string tag = m.BasicConsume(q.QueueName, true, consumer);


m.BasicPublish("", q.QueueName, bp, new ReadOnlyMemory<byte>(sendBody));
bool waitResFalse = are.WaitOne(2000);
m.BasicCancel(tag);


Assert.IsTrue(waitResFalse);
Assert.AreEqual(sendBody, consumeBody);
CollectionAssert.AreEqual(sendBody, consumeBody);
}
}
}
Expand Down
120 changes: 94 additions & 26 deletions projects/Unit/TestFloodPublishing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,52 +38,120 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;

using NUnit.Framework;
//using System.Timers;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Unit
{
[TestFixture]
public class TestFloodPublishing : IntegrationFixture
public class TestFloodPublishing
{
[SetUp]
public override void Init()
private readonly byte[] _body = new byte[2048];
private readonly TimeSpan _tenSeconds = TimeSpan.FromSeconds(10);

[Test]
public void TestUnthrottledFloodPublishing()
{
var connFactory = new ConnectionFactory()
{
RequestedHeartbeat = TimeSpan.FromSeconds(60),
AutomaticRecoveryEnabled = false
};
Conn = connFactory.CreateConnection();
Model = Conn.CreateModel();
}

[Test, Category("LongRunning")]
public void TestUnthrottledFloodPublishing()
{
Conn.ConnectionShutdown += (_, args) =>
using (var conn = connFactory.CreateConnection())
{
if (args.Initiator != ShutdownInitiator.Application)
using (var model = conn.CreateModel())
{
Assert.Fail("Unexpected connection shutdown!");
conn.ConnectionShutdown += (_, args) =>
{
if (args.Initiator != ShutdownInitiator.Application)
{
Assert.Fail("Unexpected connection shutdown!");
}
};

bool shouldStop = false;
DateTime now = DateTime.Now;
DateTime stopTime = DateTime.Now.Add(_tenSeconds);
for (int i = 0; i < 65535*64; i++)
{
if (i % 65536 == 0)
{
now = DateTime.Now;
shouldStop = DateTime.Now > stopTime;
if (shouldStop)
{
break;
}
}
model.BasicPublish("", "", null, _body);
}
Assert.IsTrue(conn.IsOpen);
}
}
}

[Test]
public void TestMultithreadFloodPublishing()
{
string testName = TestContext.CurrentContext.Test.FullName;
string message = string.Format("Hello from test {0}", testName);
byte[] sendBody = Encoding.UTF8.GetBytes(message);
int publishCount = 4096;
int receivedCount = 0;
AutoResetEvent autoResetEvent = new AutoResetEvent(false);

var cf = new ConnectionFactory()
{
RequestedHeartbeat = TimeSpan.FromSeconds(60),
AutomaticRecoveryEnabled = false
};

bool elapsed = false;
var t = new System.Threading.Timer((_obj) => { elapsed = true; }, null, 1000 * 185, -1);
/*
t.Elapsed += (_sender, _args) => { elapsed = true; };
t.AutoReset = false;
t.Start();
*/
while (!elapsed)
using (IConnection c = cf.CreateConnection())
{
Model.BasicPublish("", "", null, new byte[2048]);
string queueName = null;
using (IModel m = c.CreateModel())
{
QueueDeclareOk q = m.QueueDeclare();
queueName = q.QueueName;
}

Task pub = Task.Run(() =>
{
using (IModel m = c.CreateModel())
{
IBasicProperties bp = m.CreateBasicProperties();
for (int i = 0; i < publishCount; i++)
{
m.BasicPublish(string.Empty, queueName, bp, sendBody);
}
}
});

using (IModel consumerModel = c.CreateModel())
{
var consumer = new EventingBasicConsumer(consumerModel);
consumer.Received += (o, a) =>
{
string receivedMessage = Encoding.UTF8.GetString(a.Body.ToArray());
Assert.AreEqual(message, receivedMessage);
Interlocked.Increment(ref receivedCount);
if (receivedCount == publishCount)
{
autoResetEvent.Set();
}
};
consumerModel.BasicConsume(queueName, true, consumer);
Assert.IsTrue(pub.Wait(_tenSeconds));
Assert.IsTrue(autoResetEvent.WaitOne(_tenSeconds));
}

Assert.AreEqual(publishCount, receivedCount);
}
Assert.IsTrue(Conn.IsOpen);
t.Dispose();
}
}
}
2 changes: 1 addition & 1 deletion projects/Unit/TestRecoverAfterCancel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void TestRecoverAfterCancel_()
Channel.BasicConsume(Queue, false, Consumer2);
BasicDeliverEventArgs Event2 = EventQueue2.Dequeue();

Assert.AreEqual(Event.Body, Event2.Body);
CollectionAssert.AreEqual(Event.Body.ToArray(), Event2.Body.ToArray());
Assert.IsFalse(Event.Redelivered);
Assert.IsTrue(Event2.Redelivered);
}
Expand Down

0 comments on commit 9404246

Please sign in to comment.