From c0510aa90eb96f163b9f9337b6f500b9662d4001 Mon Sep 17 00:00:00 2001 From: petrov-e Date: Fri, 3 Apr 2020 16:18:03 +0300 Subject: [PATCH 01/12] add test --- projects/Unit/TestFloodPublishing.cs | 82 ++++++++++++++++++++++++---- 1 file changed, 71 insertions(+), 11 deletions(-) diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index 56417b0bc2..0b890ba039 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -38,31 +38,30 @@ // Copyright (c) 2007-2020 VMware, Inc. All rights reserved. //--------------------------------------------------------------------------- -using System; - using NUnit.Framework; -//using System.Timers; +using RabbitMQ.Client.Events; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace RabbitMQ.Client.Unit { [TestFixture] public class TestFloodPublishing : IntegrationFixture { - [SetUp] - public override void Init() + [Test, Category("LongRunning")] + public void TestUnthrottledFloodPublishing() { var connFactory = new ConnectionFactory() { RequestedHeartbeat = TimeSpan.FromSeconds(60), AutomaticRecoveryEnabled = false }; - Conn = connFactory.CreateConnection(); - Model = Conn.CreateModel(); - } + using var Conn = connFactory.CreateConnection(); + using var Model = Conn.CreateModel(); - [Test, Category("LongRunning")] - public void TestUnthrottledFloodPublishing() - { Conn.ConnectionShutdown += (_, args) => { if (args.Initiator != ShutdownInitiator.Application) @@ -85,5 +84,66 @@ public void TestUnthrottledFloodPublishing() Assert.IsTrue(Conn.IsOpen); t.Dispose(); } + + [Test] + public async Task TestMultithreadFloodPublishing() + { + string message = "test message"; + int threadCount = 4; + int publishCount = 100; + var receivedCount = 0; + byte[] sendBody = Encoding.UTF8.GetBytes(message); + + var cf = new ConnectionFactory(); + using (IConnection c = cf.CreateConnection()) + using (IModel m = c.CreateModel()) + { + QueueDeclareOk q = m.QueueDeclare(); + IBasicProperties bp = m.CreateBasicProperties(); + + var consumer = new EventingBasicConsumer(m); + var tcs = new TaskCompletionSource(); + consumer.Received += (o, a) => + { + Assert.AreEqual(message, Encoding.UTF8.GetString(a.Body.ToArray())); + + var result = Interlocked.Increment(ref receivedCount); + if (result == threadCount * publishCount) + { + tcs.SetResult(true); + } + }; + + string tag = m.BasicConsume(q.QueueName, true, consumer); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + + + using (var timeoutRegistration = cts.Token.Register(() => tcs.SetCanceled())) + { + var tasks = new List(); + for (int i = 0; i < threadCount; i++) + { + tasks.Add(Task.Run(() => StartFlood(m, q.QueueName, bp, sendBody, publishCount))); + } + await Task.WhenAll(tasks); + await tcs.Task; + } + m.BasicCancel(tag); + + + + Assert.AreEqual(threadCount * publishCount, receivedCount); + } + + + void StartFlood(IModel model, string queue, IBasicProperties properties, byte[] body, int count) + { + for (int i = 0; i < count; i++) + { + model.BasicPublish(string.Empty, queue, properties, body); + } + } + } } } From c105a17ea71e8e80aff3d900791a60794131c887 Mon Sep 17 00:00:00 2001 From: petrov-e Date: Fri, 3 Apr 2020 20:03:36 +0300 Subject: [PATCH 02/12] make test single thread --- projects/Unit/TestFloodPublishing.cs | 34 ++++++++++++++++------------ 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index 0b890ba039..3ce68df14e 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -41,7 +41,7 @@ using NUnit.Framework; using RabbitMQ.Client.Events; using System; -using System.Collections.Generic; +using System.Diagnostics; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -89,7 +89,7 @@ public void TestUnthrottledFloodPublishing() public async Task TestMultithreadFloodPublishing() { string message = "test message"; - int threadCount = 4; + int threadCount = 1; int publishCount = 100; var receivedCount = 0; byte[] sendBody = Encoding.UTF8.GetBytes(message); @@ -105,7 +105,13 @@ public async Task TestMultithreadFloodPublishing() var tcs = new TaskCompletionSource(); consumer.Received += (o, a) => { - Assert.AreEqual(message, Encoding.UTF8.GetString(a.Body.ToArray())); + Console.WriteLine("Receiving"); + var receivedMessage = Encoding.UTF8.GetString(a.Body.ToArray()); + if (!receivedMessage.Equals(message)) + { + Debugger.Break(); + } + Assert.AreEqual(message, receivedMessage); var result = Interlocked.Increment(ref receivedCount); if (result == threadCount * publishCount) @@ -115,24 +121,22 @@ public async Task TestMultithreadFloodPublishing() }; string tag = m.BasicConsume(q.QueueName, true, consumer); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); - - + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); using (var timeoutRegistration = cts.Token.Register(() => tcs.SetCanceled())) { - var tasks = new List(); - for (int i = 0; i < threadCount; i++) - { - tasks.Add(Task.Run(() => StartFlood(m, q.QueueName, bp, sendBody, publishCount))); - } - await Task.WhenAll(tasks); + StartFlood(m, q.QueueName, bp, sendBody, publishCount); + + //var tasks = new List(); + //for (int i = 0; i < threadCount; i++) + //{ + // tasks.Add(Task.Run(() => StartFlood(m, q.QueueName, bp, sendBody, publishCount))); + //} + //await Task.WhenAll(tasks); await tcs.Task; } m.BasicCancel(tag); - - - + await tcs.Task; Assert.AreEqual(threadCount * publishCount, receivedCount); } From c17c33a258168568b14e119217420d520c8d9358 Mon Sep 17 00:00:00 2001 From: petrov-e Date: Fri, 3 Apr 2020 20:04:48 +0300 Subject: [PATCH 03/12] clean --- projects/Unit/TestFloodPublishing.cs | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index 3ce68df14e..32efb41914 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -105,12 +105,7 @@ public async Task TestMultithreadFloodPublishing() var tcs = new TaskCompletionSource(); consumer.Received += (o, a) => { - Console.WriteLine("Receiving"); var receivedMessage = Encoding.UTF8.GetString(a.Body.ToArray()); - if (!receivedMessage.Equals(message)) - { - Debugger.Break(); - } Assert.AreEqual(message, receivedMessage); var result = Interlocked.Increment(ref receivedCount); @@ -125,29 +120,17 @@ public async Task TestMultithreadFloodPublishing() using (var timeoutRegistration = cts.Token.Register(() => tcs.SetCanceled())) { - StartFlood(m, q.QueueName, bp, sendBody, publishCount); + for (int i = 0; i < publishCount; i++) + { + m.BasicPublish(string.Empty, q.QueueName, bp, sendBody); + } - //var tasks = new List(); - //for (int i = 0; i < threadCount; i++) - //{ - // tasks.Add(Task.Run(() => StartFlood(m, q.QueueName, bp, sendBody, publishCount))); - //} - //await Task.WhenAll(tasks); await tcs.Task; } m.BasicCancel(tag); await tcs.Task; Assert.AreEqual(threadCount * publishCount, receivedCount); } - - - void StartFlood(IModel model, string queue, IBasicProperties properties, byte[] body, int count) - { - for (int i = 0; i < count; i++) - { - model.BasicPublish(string.Empty, queue, properties, body); - } - } } } } From 366fb9bd156f13fae7b520c3b2b27c8982b4b8c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stef=C3=A1n=20J=C3=B6kull=20Sigur=C3=B0arson?= Date: Fri, 3 Apr 2020 17:58:00 +0000 Subject: [PATCH 04/12] Fixing stuff. --- .../client/impl/AsyncConsumerDispatcher.cs | 5 ++++- .../RabbitMQ.Client/client/impl/BasicDeliver.cs | 14 +++++++++++--- .../client/impl/ConcurrentConsumerDispatcher.cs | 9 ++++++++- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs index dd17c34bbb..8938b14b2d 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.Threading.Tasks; namespace RabbitMQ.Client.Impl @@ -46,7 +47,9 @@ public void HandleBasicDeliver(IBasicConsumer consumer, IBasicProperties basicProperties, ReadOnlyMemory body) { - ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body)); + IMemoryOwner bodyCopy = MemoryPool.Shared.Rent(body.Length); + body.CopyTo(bodyCopy.Memory); + ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, bodyCopy, body.Length)); } public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag) diff --git a/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs b/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs index f50856b874..360ce1bb95 100644 --- a/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs +++ b/projects/RabbitMQ.Client/client/impl/BasicDeliver.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.Collections.Generic; using System.Threading.Tasks; @@ -14,7 +15,8 @@ sealed class BasicDeliver : Work readonly string _exchange; readonly string _routingKey; readonly IBasicProperties _basicProperties; - readonly ReadOnlyMemory _body; + readonly IMemoryOwner _body; + readonly int _bodyLength; public BasicDeliver(IBasicConsumer consumer, string consumerTag, @@ -23,7 +25,8 @@ public BasicDeliver(IBasicConsumer consumer, string exchange, string routingKey, IBasicProperties basicProperties, - ReadOnlyMemory body) : base(consumer) + IMemoryOwner body, + int bodyLength) : base(consumer) { _consumerTag = consumerTag; _deliveryTag = deliveryTag; @@ -32,6 +35,7 @@ public BasicDeliver(IBasicConsumer consumer, _routingKey = routingKey; _basicProperties = basicProperties; _body = body; + _bodyLength = bodyLength; } protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer) @@ -44,7 +48,7 @@ await consumer.HandleBasicDeliver(_consumerTag, _exchange, _routingKey, _basicProperties, - _body).ConfigureAwait(false); + _body.Memory.Slice(0, _bodyLength)).ConfigureAwait(false); } catch (Exception e) { @@ -55,6 +59,10 @@ await consumer.HandleBasicDeliver(_consumerTag, }; model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); } + finally + { + _body.Dispose(); + } } } } diff --git a/projects/RabbitMQ.Client/client/impl/ConcurrentConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConcurrentConsumerDispatcher.cs index 3664316102..feff45e8d2 100644 --- a/projects/RabbitMQ.Client/client/impl/ConcurrentConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConcurrentConsumerDispatcher.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.Collections.Generic; using System.Threading.Tasks; using RabbitMQ.Client.Events; @@ -63,6 +64,8 @@ public void HandleBasicDeliver(IBasicConsumer consumer, IBasicProperties basicProperties, ReadOnlyMemory body) { + IMemoryOwner memoryCopy = MemoryPool.Shared.Rent(body.Length); + body.CopyTo(memoryCopy.Memory); UnlessShuttingDown(() => { try @@ -73,7 +76,7 @@ public void HandleBasicDeliver(IBasicConsumer consumer, exchange, routingKey, basicProperties, - body); + memoryCopy.Memory.Slice(0, body.Length)); } catch (Exception e) { @@ -84,6 +87,10 @@ public void HandleBasicDeliver(IBasicConsumer consumer, }; _model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details)); } + finally + { + memoryCopy.Dispose(); + } }); } From 76ae8dddf47623c1c5f3c48262b44db171b30e84 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Sat, 4 Apr 2020 17:42:23 -0700 Subject: [PATCH 05/12] Test fixup --- projects/Unit/TestFloodPublishing.cs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index 32efb41914..48f94f39c7 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -71,18 +71,14 @@ public void TestUnthrottledFloodPublishing() }; bool elapsed = false; - var t = new System.Threading.Timer((_obj) => { elapsed = true; }, null, 1000 * 185, -1); - /* - t.Elapsed += (_sender, _args) => { elapsed = true; }; - t.AutoReset = false; - t.Start(); -*/ - while (!elapsed) + using (Timer t = new Timer((_obj) => { elapsed = true; }, null, 1000 * 185, -1)) { - Model.BasicPublish("", "", null, new byte[2048]); + while (!elapsed) + { + Model.BasicPublish("", "", null, new byte[2048]); + } + Assert.IsTrue(Conn.IsOpen); } - Assert.IsTrue(Conn.IsOpen); - t.Dispose(); } [Test] @@ -100,7 +96,7 @@ public async Task TestMultithreadFloodPublishing() { QueueDeclareOk q = m.QueueDeclare(); IBasicProperties bp = m.CreateBasicProperties(); - + var consumer = new EventingBasicConsumer(m); var tcs = new TaskCompletionSource(); consumer.Received += (o, a) => From 157d71c0424e316495f4a8cda8a208d81f3d1a71 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Sun, 5 Apr 2020 08:40:13 -0700 Subject: [PATCH 06/12] Move flaky test to LongRunning category --- projects/Unit/TestFloodPublishing.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index 48f94f39c7..2e387d5022 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -81,7 +81,8 @@ public void TestUnthrottledFloodPublishing() } } - [Test] + // TODO rabbitmq/rabbitmq-dotnet-client#802 FIX THIS + [Test, Category("LongRunning")] public async Task TestMultithreadFloodPublishing() { string message = "test message"; From 0e501a5f9adb3d336ae20cd947c9ff169784973a Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 6 Apr 2020 16:19:58 +0000 Subject: [PATCH 07/12] Test refactor --- projects/Unit/TestFloodPublishing.cs | 87 +++++++++++++++------------- 1 file changed, 46 insertions(+), 41 deletions(-) diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index 2e387d5022..8008a7385f 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -59,74 +59,79 @@ public void TestUnthrottledFloodPublishing() RequestedHeartbeat = TimeSpan.FromSeconds(60), AutomaticRecoveryEnabled = false }; - using var Conn = connFactory.CreateConnection(); - using var Model = Conn.CreateModel(); - Conn.ConnectionShutdown += (_, args) => + using(var conn = connFactory.CreateConnection()) { - if (args.Initiator != ShutdownInitiator.Application) + using(var model = conn.CreateModel()) { - Assert.Fail("Unexpected connection shutdown!"); - } - }; + conn.ConnectionShutdown += (_, args) => + { + if (args.Initiator != ShutdownInitiator.Application) + { + Assert.Fail("Unexpected connection shutdown!"); + } + }; - bool elapsed = false; - using (Timer t = new Timer((_obj) => { elapsed = true; }, null, 1000 * 185, -1)) - { - while (!elapsed) - { - Model.BasicPublish("", "", null, new byte[2048]); + bool elapsed = false; + using (Timer t = new Timer((_obj) => { elapsed = true; }, null, 1000 * 185, -1)) + { + while (!elapsed) + { + model.BasicPublish("", "", null, new byte[2048]); + } + Assert.IsTrue(conn.IsOpen); + } } - Assert.IsTrue(Conn.IsOpen); } } - // TODO rabbitmq/rabbitmq-dotnet-client#802 FIX THIS + // rabbitmq/rabbitmq-dotnet-client#802 FIX THIS TODO [Test, Category("LongRunning")] public async Task TestMultithreadFloodPublishing() { string message = "test message"; int threadCount = 1; int publishCount = 100; - var receivedCount = 0; + int receivedCount = 0; byte[] sendBody = Encoding.UTF8.GetBytes(message); var cf = new ConnectionFactory(); using (IConnection c = cf.CreateConnection()) - using (IModel m = c.CreateModel()) { - QueueDeclareOk q = m.QueueDeclare(); - IBasicProperties bp = m.CreateBasicProperties(); - - var consumer = new EventingBasicConsumer(m); - var tcs = new TaskCompletionSource(); - consumer.Received += (o, a) => + using (IModel m = c.CreateModel()) { - var receivedMessage = Encoding.UTF8.GetString(a.Body.ToArray()); - Assert.AreEqual(message, receivedMessage); + QueueDeclareOk q = m.QueueDeclare(); + IBasicProperties bp = m.CreateBasicProperties(); - var result = Interlocked.Increment(ref receivedCount); - if (result == threadCount * publishCount) + var consumer = new EventingBasicConsumer(m); + var tcs = new TaskCompletionSource(); + consumer.Received += (o, a) => { - tcs.SetResult(true); - } - }; + var receivedMessage = Encoding.UTF8.GetString(a.Body.ToArray()); + Assert.AreEqual(message, receivedMessage); - string tag = m.BasicConsume(q.QueueName, true, consumer); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var result = Interlocked.Increment(ref receivedCount); + if (result == threadCount * publishCount) + { + tcs.SetResult(true); + } + }; - using (var timeoutRegistration = cts.Token.Register(() => tcs.SetCanceled())) - { - for (int i = 0; i < publishCount; i++) + string tag = m.BasicConsume(q.QueueName, true, consumer); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + using (var timeoutRegistration = cts.Token.Register(() => tcs.SetCanceled())) { - m.BasicPublish(string.Empty, q.QueueName, bp, sendBody); + for (int i = 0; i < publishCount; i++) + { + m.BasicPublish(string.Empty, q.QueueName, bp, sendBody); + } + bool allMessagesReceived = await tcs.Task; + Assert.IsTrue(allMessagesReceived); } - - await tcs.Task; + m.BasicCancel(tag); + Assert.AreEqual(threadCount * publishCount, receivedCount); } - m.BasicCancel(tag); - await tcs.Task; - Assert.AreEqual(threadCount * publishCount, receivedCount); } } } From 9af8bf81adb3924c36254ce3d26dbfc6c8c44010 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 6 Apr 2020 17:04:45 +0000 Subject: [PATCH 08/12] Shorten test durations --- projects/Unit/TestFloodPublishing.cs | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index 8008a7385f..f0dcfea77f 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -49,9 +49,12 @@ namespace RabbitMQ.Client.Unit { [TestFixture] - public class TestFloodPublishing : IntegrationFixture + public class TestFloodPublishing { - [Test, Category("LongRunning")] + private readonly byte[] _body = new byte[2048]; + private readonly TimeSpan _tenSeconds = TimeSpan.FromSeconds(10); + + [Test] public void TestUnthrottledFloodPublishing() { var connFactory = new ConnectionFactory() @@ -60,9 +63,9 @@ public void TestUnthrottledFloodPublishing() AutomaticRecoveryEnabled = false }; - using(var conn = connFactory.CreateConnection()) + using (var conn = connFactory.CreateConnection()) { - using(var model = conn.CreateModel()) + using (var model = conn.CreateModel()) { conn.ConnectionShutdown += (_, args) => { @@ -73,11 +76,11 @@ public void TestUnthrottledFloodPublishing() }; bool elapsed = false; - using (Timer t = new Timer((_obj) => { elapsed = true; }, null, 1000 * 185, -1)) + using (Timer t = new Timer((_obj) => { elapsed = true; }, null, (int)_tenSeconds.TotalMilliseconds, -1)) { while (!elapsed) { - model.BasicPublish("", "", null, new byte[2048]); + model.BasicPublish("", "", null, _body); } Assert.IsTrue(conn.IsOpen); } @@ -85,8 +88,7 @@ public void TestUnthrottledFloodPublishing() } } - // rabbitmq/rabbitmq-dotnet-client#802 FIX THIS TODO - [Test, Category("LongRunning")] + [Test] public async Task TestMultithreadFloodPublishing() { string message = "test message"; @@ -95,7 +97,12 @@ public async Task TestMultithreadFloodPublishing() int receivedCount = 0; byte[] sendBody = Encoding.UTF8.GetBytes(message); - var cf = new ConnectionFactory(); + var cf = new ConnectionFactory() + { + RequestedHeartbeat = TimeSpan.FromSeconds(60), + AutomaticRecoveryEnabled = false + }; + using (IConnection c = cf.CreateConnection()) { using (IModel m = c.CreateModel()) @@ -118,7 +125,7 @@ public async Task TestMultithreadFloodPublishing() }; string tag = m.BasicConsume(q.QueueName, true, consumer); - var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var cts = new CancellationTokenSource(_tenSeconds); using (var timeoutRegistration = cts.Token.Register(() => tcs.SetCanceled())) { From 40f5a25e4d8cd272cb082dfc4246e2da072f84ef Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 6 Apr 2020 15:24:01 -0700 Subject: [PATCH 09/12] Add test constraint and debugging --- projects/Unit/TestFloodPublishing.cs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index f0dcfea77f..521622ec64 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -41,7 +41,6 @@ using NUnit.Framework; using RabbitMQ.Client.Events; using System; -using System.Diagnostics; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -75,15 +74,25 @@ public void TestUnthrottledFloodPublishing() } }; - bool elapsed = false; - using (Timer t = new Timer((_obj) => { elapsed = true; }, null, (int)_tenSeconds.TotalMilliseconds, -1)) + bool shouldStop = false; + DateTime now = DateTime.Now; + DateTime stopTime = DateTime.Now.Add(_tenSeconds); + for (int i = 0; i < 65535*64; i++) { - while (!elapsed) + if (i % 65536 == 0) { - model.BasicPublish("", "", null, _body); + now = DateTime.Now; + shouldStop = DateTime.Now > stopTime; + TestContext.Out.WriteLine("@@@@@@@@ NUNIT Checking now {0} stopTime {1} shouldStop {2}", now, stopTime, shouldStop); + Console.Error.WriteLine("@@@@@@@@ STDERR Checking now {0} stopTime {1} shouldStop {2}", now, stopTime, shouldStop); + if (shouldStop) + { + break; + } } - Assert.IsTrue(conn.IsOpen); + model.BasicPublish("", "", null, _body); } + Assert.IsTrue(conn.IsOpen); } } } From 6989f9f646a4489479bcd932cb5f20e1634aad08 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 6 Apr 2020 16:19:54 -0700 Subject: [PATCH 10/12] Comment out test to see which one is freezing --- projects/Unit/TestFloodPublishing.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index 521622ec64..3f6603a8d4 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -53,6 +53,7 @@ public class TestFloodPublishing private readonly byte[] _body = new byte[2048]; private readonly TimeSpan _tenSeconds = TimeSpan.FromSeconds(10); + /* [Test] public void TestUnthrottledFloodPublishing() { @@ -96,6 +97,7 @@ public void TestUnthrottledFloodPublishing() } } } + */ [Test] public async Task TestMultithreadFloodPublishing() From b200208903c6cf111c631e4815617d4faad86627 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 6 Apr 2020 17:31:12 -0700 Subject: [PATCH 11/12] Make test non-async but still threaded --- projects/Unit/TestFloodPublishing.cs | 60 +++++++++++++--------------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index 3f6603a8d4..ca2e639ebd 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -53,7 +53,6 @@ public class TestFloodPublishing private readonly byte[] _body = new byte[2048]; private readonly TimeSpan _tenSeconds = TimeSpan.FromSeconds(10); - /* [Test] public void TestUnthrottledFloodPublishing() { @@ -84,8 +83,6 @@ public void TestUnthrottledFloodPublishing() { now = DateTime.Now; shouldStop = DateTime.Now > stopTime; - TestContext.Out.WriteLine("@@@@@@@@ NUNIT Checking now {0} stopTime {1} shouldStop {2}", now, stopTime, shouldStop); - Console.Error.WriteLine("@@@@@@@@ STDERR Checking now {0} stopTime {1} shouldStop {2}", now, stopTime, shouldStop); if (shouldStop) { break; @@ -97,16 +94,15 @@ public void TestUnthrottledFloodPublishing() } } } - */ [Test] - public async Task TestMultithreadFloodPublishing() + public void TestMultithreadFloodPublishing() { - string message = "test message"; - int threadCount = 1; - int publishCount = 100; - int receivedCount = 0; + string testName = TestContext.CurrentContext.Test.FullName; + string message = string.Format("Hello from test {0}", testName); byte[] sendBody = Encoding.UTF8.GetBytes(message); + int publishCount = 4096; + int receivedCount = 0; var cf = new ConnectionFactory() { @@ -116,39 +112,37 @@ public async Task TestMultithreadFloodPublishing() using (IConnection c = cf.CreateConnection()) { + string queueName = null; using (IModel m = c.CreateModel()) { QueueDeclareOk q = m.QueueDeclare(); - IBasicProperties bp = m.CreateBasicProperties(); - - var consumer = new EventingBasicConsumer(m); - var tcs = new TaskCompletionSource(); - consumer.Received += (o, a) => - { - var receivedMessage = Encoding.UTF8.GetString(a.Body.ToArray()); - Assert.AreEqual(message, receivedMessage); - - var result = Interlocked.Increment(ref receivedCount); - if (result == threadCount * publishCount) - { - tcs.SetResult(true); - } - }; - - string tag = m.BasicConsume(q.QueueName, true, consumer); - var cts = new CancellationTokenSource(_tenSeconds); + queueName = q.QueueName; + } - using (var timeoutRegistration = cts.Token.Register(() => tcs.SetCanceled())) + Task pub = Task.Run(() => + { + using (IModel m = c.CreateModel()) { + IBasicProperties bp = m.CreateBasicProperties(); for (int i = 0; i < publishCount; i++) { - m.BasicPublish(string.Empty, q.QueueName, bp, sendBody); + m.BasicPublish(string.Empty, queueName, bp, sendBody); } - bool allMessagesReceived = await tcs.Task; - Assert.IsTrue(allMessagesReceived); } - m.BasicCancel(tag); - Assert.AreEqual(threadCount * publishCount, receivedCount); + }); + + using (IModel consumerModel = c.CreateModel()) + { + var consumer = new EventingBasicConsumer(consumerModel); + consumer.Received += (o, a) => + { + string receivedMessage = Encoding.UTF8.GetString(a.Body.ToArray()); + Assert.AreEqual(message, receivedMessage); + Interlocked.Increment(ref receivedCount); + }; + consumerModel.BasicConsume(queueName, true, consumer); + Assert.IsTrue(pub.Wait(_tenSeconds)); + Assert.AreEqual(publishCount, receivedCount); } } } From aeb1dc99cff7542e3c0f1c8e20b3a9cea3be671f Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 6 Apr 2020 17:49:04 -0700 Subject: [PATCH 12/12] Ensure we wait for all messages to be consumed --- projects/Unit/TestBasicPublish.cs | 8 ++------ projects/Unit/TestFloodPublishing.cs | 9 ++++++++- projects/Unit/TestRecoverAfterCancel.cs | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/projects/Unit/TestBasicPublish.cs b/projects/Unit/TestBasicPublish.cs index 9418b10a00..183838ddcc 100644 --- a/projects/Unit/TestBasicPublish.cs +++ b/projects/Unit/TestBasicPublish.cs @@ -30,14 +30,12 @@ public void TestBasicRoundtripArray() }; string tag = m.BasicConsume(q.QueueName, true, consumer); - m.BasicPublish("", q.QueueName, bp, sendBody); bool waitResFalse = are.WaitOne(2000); m.BasicCancel(tag); - Assert.IsTrue(waitResFalse); - Assert.AreEqual(sendBody, consumeBody); + CollectionAssert.AreEqual(sendBody, consumeBody); } } @@ -62,14 +60,12 @@ public void TestBasicRoundtripReadOnlyMemory() }; string tag = m.BasicConsume(q.QueueName, true, consumer); - m.BasicPublish("", q.QueueName, bp, new ReadOnlyMemory(sendBody)); bool waitResFalse = are.WaitOne(2000); m.BasicCancel(tag); - Assert.IsTrue(waitResFalse); - Assert.AreEqual(sendBody, consumeBody); + CollectionAssert.AreEqual(sendBody, consumeBody); } } } diff --git a/projects/Unit/TestFloodPublishing.cs b/projects/Unit/TestFloodPublishing.cs index ca2e639ebd..5f4b18ead2 100644 --- a/projects/Unit/TestFloodPublishing.cs +++ b/projects/Unit/TestFloodPublishing.cs @@ -103,6 +103,7 @@ public void TestMultithreadFloodPublishing() byte[] sendBody = Encoding.UTF8.GetBytes(message); int publishCount = 4096; int receivedCount = 0; + AutoResetEvent autoResetEvent = new AutoResetEvent(false); var cf = new ConnectionFactory() { @@ -139,11 +140,17 @@ public void TestMultithreadFloodPublishing() string receivedMessage = Encoding.UTF8.GetString(a.Body.ToArray()); Assert.AreEqual(message, receivedMessage); Interlocked.Increment(ref receivedCount); + if (receivedCount == publishCount) + { + autoResetEvent.Set(); + } }; consumerModel.BasicConsume(queueName, true, consumer); Assert.IsTrue(pub.Wait(_tenSeconds)); - Assert.AreEqual(publishCount, receivedCount); + Assert.IsTrue(autoResetEvent.WaitOne(_tenSeconds)); } + + Assert.AreEqual(publishCount, receivedCount); } } } diff --git a/projects/Unit/TestRecoverAfterCancel.cs b/projects/Unit/TestRecoverAfterCancel.cs index 462cf7cd36..09b5699d84 100644 --- a/projects/Unit/TestRecoverAfterCancel.cs +++ b/projects/Unit/TestRecoverAfterCancel.cs @@ -96,7 +96,7 @@ public void TestRecoverAfterCancel_() Channel.BasicConsume(Queue, false, Consumer2); BasicDeliverEventArgs Event2 = EventQueue2.Dequeue(); - Assert.AreEqual(Event.Body, Event2.Body); + CollectionAssert.AreEqual(Event.Body.ToArray(), Event2.Body.ToArray()); Assert.IsFalse(Event.Redelivered); Assert.IsTrue(Event2.Redelivered); }