From 291bbf00b511b5a309355fc8edf16b76ff677922 Mon Sep 17 00:00:00 2001 From: kjnilsson Date: Fri, 6 Sep 2019 13:09:44 +0100 Subject: [PATCH 1/2] Remove messaging patterns --- .../client/messagepatterns/ISubscription.cs | 71 --- .../client/messagepatterns/SimpleRpcClient.cs | 465 --------------- .../client/messagepatterns/SimpleRpcServer.cs | 442 -------------- .../client/messagepatterns/Subscription.cs | 560 ------------------ .../unit/TestMessagePatternsSubscription.cs | 207 ------- .../client/Unit/src/unit/TestSubscription.cs | 80 --- 6 files changed, 1825 deletions(-) delete mode 100644 projects/client/RabbitMQ.Client/src/client/messagepatterns/ISubscription.cs delete mode 100644 projects/client/RabbitMQ.Client/src/client/messagepatterns/SimpleRpcClient.cs delete mode 100644 projects/client/RabbitMQ.Client/src/client/messagepatterns/SimpleRpcServer.cs delete mode 100755 projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs delete mode 100644 projects/client/Unit/src/unit/TestMessagePatternsSubscription.cs delete mode 100755 projects/client/Unit/src/unit/TestSubscription.cs diff --git a/projects/client/RabbitMQ.Client/src/client/messagepatterns/ISubscription.cs b/projects/client/RabbitMQ.Client/src/client/messagepatterns/ISubscription.cs deleted file mode 100644 index 31d3ce029e..0000000000 --- a/projects/client/RabbitMQ.Client/src/client/messagepatterns/ISubscription.cs +++ /dev/null @@ -1,71 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 1.1. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2016 Pivotal Software, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v1.1: -// -//--------------------------------------------------------------------------- -// The contents of this file are subject to the Mozilla Public License -// Version 1.1 (the "License"); you may not use this file except in -// compliance with the License. You may obtain a copy of the License -// at https://www.mozilla.org/MPL/ -// -// Software distributed under the License is distributed on an "AS IS" -// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -// the License for the specific language governing rights and -// limitations under the License. -// -// The Original Code is RabbitMQ. -// -// The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using RabbitMQ.Client.Events; -using System; -using System.Collections; - -namespace RabbitMQ.Client.MessagePatterns -{ - ///Manages a subscription to a queue. - /// - /// - /// This interface is provided to make creation of test doubles - /// for easier. - /// - /// - public interface ISubscription : IEnumerable, IEnumerator, IDisposable - { - void Ack(); - void Ack(BasicDeliverEventArgs evt); - void Close(); - IBasicConsumer Consumer { get; } - string ConsumerTag { get; } - BasicDeliverEventArgs LatestEvent { get; } - IModel Model { get; } - void Nack(BasicDeliverEventArgs evt, bool multiple, bool requeue); - void Nack(bool multiple, bool requeue); - void Nack(bool requeue); - BasicDeliverEventArgs Next(); - bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result); - bool AutoAck { get; } - string QueueName { get; } - } -} diff --git a/projects/client/RabbitMQ.Client/src/client/messagepatterns/SimpleRpcClient.cs b/projects/client/RabbitMQ.Client/src/client/messagepatterns/SimpleRpcClient.cs deleted file mode 100644 index c6487a0879..0000000000 --- a/projects/client/RabbitMQ.Client/src/client/messagepatterns/SimpleRpcClient.cs +++ /dev/null @@ -1,465 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 1.1. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2016 Pivotal Software, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v1.1: -// -//--------------------------------------------------------------------------- -// The contents of this file are subject to the Mozilla Public License -// Version 1.1 (the "License"); you may not use this file except in -// compliance with the License. You may obtain a copy of the License -// at https://www.mozilla.org/MPL/ -// -// Software distributed under the License is distributed on an "AS IS" -// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -// the License for the specific language governing rights and -// limitations under the License. -// -// The Original Code is RabbitMQ. -// -// The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Net; -using System.Threading; -using RabbitMQ.Client.Content; -using RabbitMQ.Client.Events; - -namespace RabbitMQ.Client.MessagePatterns -{ - ///Implements a simple RPC client. - /// - /// - /// This class sends requests that can be processed by remote - /// SimpleRpcServer instances. - /// - /// - /// The basic pattern for accessing a remote service is to - /// determine the exchange name and routing key needed for - /// submissions of service requests, and to construct a - /// SimpleRpcClient instance using that address. Once constructed, - /// the various Call() and Cast() overloads can be used to send - /// requests and receive the corresponding replies. - /// - /// - /// string queueName = "ServiceRequestQueue"; // See also Subscription ctors - /// using (IConnection conn = new ConnectionFactory() - /// .CreateConnection(serverAddress)) { - /// using (IModel ch = conn.CreateModel()) { - /// SimpleRpcClient client = - /// new SimpleRpcClient(ch, queueName); - /// client.TimeoutMilliseconds = 5000; // optional - /// - /// /// ... make use of the various Call() overloads - /// } - /// } - /// - /// - /// Instances of this class declare a queue, so it is the user's - /// responsibility to ensure that the exchange concerned exists - /// (using IModel.ExchangeDeclare) before invoking Call() or - /// Cast(). - /// - /// - /// This class implements only a few basic RPC message formats - - /// to extend it with support for more formats, either subclass, - /// or transcode the messages before transmission using the - /// built-in byte[] format. - /// - /// - /// - public class SimpleRpcClient : IDisposable - { - ///Construct an instance with no configured - ///Address. The Address property must be set before Call() or - ///Cast() are called. - public SimpleRpcClient(IModel model) - : this(model, (PublicationAddress)null) - { - } - - ///Construct an instance that will deliver to the - ///default exchange (""), with routing key equal to the passed - ///in queueName, thereby delivering directly to a named queue - ///on the AMQP server. - public SimpleRpcClient(IModel model, string queueName) - : this(model, new PublicationAddress(ExchangeType.Direct, "", queueName)) - { - } - - ///Construct an instance that will deliver to the - ///named and typed exchange, with the given routing - ///key. - public SimpleRpcClient(IModel model, string exchange, - string exchangeType, string routingKey) - : this(model, new PublicationAddress(exchangeType, exchange, routingKey)) - { - } - - ///Construct an instance that will deliver to the - ///given address. - public SimpleRpcClient(IModel model, PublicationAddress address) - { - Model = model; - Address = address; - Subscription = null; - TimeoutMilliseconds = Timeout.Infinite; - } - - ///This event is fired whenever Call() detects the - ///disconnection of the underlying Subscription while waiting - ///for a reply from the service. - /// - /// See also OnDisconnected(). Note that the sending of a - /// request may result in OperationInterruptedException before - /// the request is even sent. - /// - public event EventHandler Disconnected; - - ///This event is fired whenever Call() decides that a - ///timeout has occurred while waiting for a reply from the - ///service. - /// - /// See also OnTimedOut(). - /// - public event EventHandler TimedOut; - - ///Retrieve or modify the address that will be used - ///for the next Call() or Cast(). - /// - /// This address represents the service, i.e. the destination - /// service requests should be published to. It can be changed - /// at any time before a Call() or Cast() request is sent - - /// the value at the time of the call is used by Call() and - /// Cast(). - /// - public PublicationAddress Address { get; set; } - - ///Retrieve the IModel this instance uses to communicate. - public IModel Model { get; protected set; } - - ///Retrieve the Subscription that is used to receive - ///RPC replies corresponding to Call() RPC requests. May be - ///null. - /// - /// - /// Upon construction, this property will be null. It is - /// initialised by the protected virtual method - /// EnsureSubscription upon the first call to Call(). Calls to - /// Cast() do not initialise the subscription, since no - /// replies are expected or possible when using Cast(). - /// - /// - public Subscription Subscription { get; protected set; } - - ///Retrieve or modify the timeout (in milliseconds) - ///that will be used for the next Call(). - /// - /// - /// This property defaults to - /// System.Threading.Timeout.Infinite (i.e. -1). If it is set - /// to any other value, Call() will only wait for the - /// specified amount of time before returning indicating a - /// timeout. - /// - /// - /// See also TimedOut event and OnTimedOut(). - /// - /// - public int TimeoutMilliseconds { get; set; } - - ///Sends a "jms/stream-message"-encoded RPC request, - ///and expects an RPC reply in the same format. - /// - /// - /// The arguments passed in must be of types that are - /// representable as JMS StreamMessage values, and so must the - /// results returned from the service in its reply message. - /// - /// - /// Calls OnTimedOut() and OnDisconnected() when a timeout or - /// disconnection, respectively, is detected when waiting for - /// our reply. - /// - /// - /// Returns null if the request timed out or if we were - /// disconnected before a reply arrived. - /// - /// - /// The reply message, if any, is acknowledged to the AMQP - /// server via Subscription.Ack(). - /// - /// - /// - /// - public virtual object[] Call(params object[] args) - { - IStreamMessageBuilder builder = new StreamMessageBuilder(Model); - builder.WriteObjects(args); - IBasicProperties replyProperties; - byte[] replyBody = Call((IBasicProperties)builder.GetContentHeader(), - builder.GetContentBody(), - out replyProperties); - if (replyProperties == null) - { - return null; - } - if (replyProperties.ContentType != StreamMessageBuilder.MimeType) - { - throw new ProtocolViolationException - (string.Format("Expected reply of MIME type {0}; got {1}", - StreamMessageBuilder.MimeType, - replyProperties.ContentType)); - } - IStreamMessageReader reader = new StreamMessageReader(replyProperties, replyBody); - return reader.ReadObjects(); - } - - ///Sends a simple byte[] message, without any custom - ///headers or properties. - /// - /// - /// Delegates directly to Call(IBasicProperties, byte[]), and - /// discards the properties of the received reply, returning - /// only the body of the reply. - /// - /// - /// Calls OnTimedOut() and OnDisconnected() when a timeout or - /// disconnection, respectively, is detected when waiting for - /// our reply. - /// - /// - /// Returns null if the request timed out or if we were - /// disconnected before a reply arrived. - /// - /// - /// The reply message, if any, is acknowledged to the AMQP - /// server via Subscription.Ack(). - /// - /// - public virtual byte[] Call(byte[] body) - { - var reply = Call(null, body); - return reply == null ? null : reply.Body; - } - - ///Sends a byte[] message and IBasicProperties - ///header, returning both the body and headers of the received - ///reply. - /// - /// - /// Sets the "replyProperties" outbound parameter to the - /// properties of the received reply, and returns the byte[] - /// body of the reply. - /// - /// - /// Calls OnTimedOut() and OnDisconnected() when a timeout or - /// disconnection, respectively, is detected when waiting for - /// our reply. - /// - /// - /// Both sets "replyProperties" to null and returns null when - /// either the request timed out or we were disconnected - /// before a reply arrived. - /// - /// - /// The reply message, if any, is acknowledged to the AMQP - /// server via Subscription.Ack(). - /// - /// - public virtual byte[] Call(IBasicProperties requestProperties, - byte[] body, - out IBasicProperties replyProperties) - { - var reply = Call(requestProperties, body); - if (reply == null) - { - replyProperties = null; - return null; - } - else - { - replyProperties = reply.BasicProperties; - return reply.Body; - } - } - - ///Sends a byte[]/IBasicProperties RPC request, - ///returning full information about the delivered reply as a - ///BasicDeliverEventArgs. - /// - /// - /// This is the most general/lowest-level Call()-style method - /// on SimpleRpcClient. It sets CorrelationId and ReplyTo on - /// the request message's headers before transmitting the - /// request to the service via the AMQP server. If the reply's - /// CorrelationId does not match the request's CorrelationId, - /// ProtocolViolationException will be thrown. - /// - /// - /// Calls OnTimedOut() and OnDisconnected() when a timeout or - /// disconnection, respectively, is detected when waiting for - /// our reply. - /// - /// - /// Returns null if the request timed out or if we were - /// disconnected before a reply arrived. - /// - /// - /// The reply message, if any, is acknowledged to the AMQP - /// server via Subscription.Ack(). - /// - /// - /// - public virtual BasicDeliverEventArgs Call(IBasicProperties requestProperties, byte[] body) - { - EnsureSubscription(); - - if (requestProperties == null) - { - requestProperties = Model.CreateBasicProperties(); - } - requestProperties.CorrelationId = Guid.NewGuid().ToString(); - requestProperties.ReplyTo = Subscription.QueueName; - - Cast(requestProperties, body); - return RetrieveReply(requestProperties.CorrelationId); - } - - ///Sends an asynchronous/one-way message to the - ///service. - public virtual void Cast(IBasicProperties requestProperties, - byte[] body) - { - Model.BasicPublish(Address, - requestProperties, - body); - } - - ///Close the reply subscription associated with this instance, if any. - /// - /// Simply delegates to calling Subscription.Close(). Clears - /// the Subscription property, so that subsequent Call()s, if - /// any, will re-initialize it to a fresh Subscription - /// instance. - /// - public void Close() - { - if (Subscription != null) - { - Subscription.Close(); - Subscription = null; - } - } - - ///Signals that the Subscription we use for receiving - ///our RPC replies was disconnected while we were - ///waiting. - /// - /// Fires the Disconnected event. - /// - public virtual void OnDisconnected() - { - if (Disconnected != null) - { - Disconnected(this, null); - } - } - - ///Signals that the configured timeout fired while - ///waiting for an RPC reply. - /// - /// Fires the TimedOut event. - /// - public virtual void OnTimedOut() - { - if (TimedOut != null) - { - TimedOut(this, null); - } - } - - ///Implement the IDisposable interface, permitting - ///SimpleRpcClient instances to be used in using - ///statements. - void IDisposable.Dispose() - { - Dispose(true); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - { - // dispose managed resources - Close(); - } - - // dispose unmanaged resources - } - - ///Should initialise m_subscription to be non-null - ///and usable for fetching RPC replies from the service - ///through the AMQP server. - protected virtual void EnsureSubscription() - { - if (Subscription == null) - { - string queueName = Model.QueueDeclare(); - Subscription = new Subscription(Model, queueName); - } - } - - ///Retrieves the reply for the request with the given - ///correlation ID from our internal Subscription. - /// - /// Currently requires replies to arrive in the same order as - /// the requests were sent out. Subclasses may override this - /// to provide more sophisticated behaviour. - /// - protected virtual BasicDeliverEventArgs RetrieveReply(string correlationId) - { - if (!Subscription.Next(TimeoutMilliseconds, out var reply)) - { - OnTimedOut(); - return null; - } - - if (reply == null) - { - OnDisconnected(); - return null; - } - - if (reply.BasicProperties.CorrelationId != correlationId) - { - throw new ProtocolViolationException - (string.Format("Wrong CorrelationId in reply; expected {0}, got {1}", - correlationId, - reply.BasicProperties.CorrelationId)); - } - - Subscription.Ack(reply); - return reply; - } - } -} diff --git a/projects/client/RabbitMQ.Client/src/client/messagepatterns/SimpleRpcServer.cs b/projects/client/RabbitMQ.Client/src/client/messagepatterns/SimpleRpcServer.cs deleted file mode 100644 index 0117d42781..0000000000 --- a/projects/client/RabbitMQ.Client/src/client/messagepatterns/SimpleRpcServer.cs +++ /dev/null @@ -1,442 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 1.1. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2016 Pivotal Software, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v1.1: -// -//--------------------------------------------------------------------------- -// The contents of this file are subject to the Mozilla Public License -// Version 1.1 (the "License"); you may not use this file except in -// compliance with the License. You may obtain a copy of the License -// at https://www.mozilla.org/MPL/ -// -// Software distributed under the License is distributed on an "AS IS" -// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -// the License for the specific language governing rights and -// limitations under the License. -// -// The Original Code is RabbitMQ. -// -// The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using RabbitMQ.Client.Content; -using RabbitMQ.Client.Events; - -namespace RabbitMQ.Client.MessagePatterns -{ - ///Implements a simple RPC service, responding to - ///requests received via a Subscription. - /// - /// - /// This class interprets requests such as those sent by instances - /// of SimpleRpcClient. - /// - /// - /// The basic pattern for implementing a service is to subclass - /// SimpleRpcServer, overriding HandleCall and HandleCast as - /// appropriate, and then to create a Subscription object for - /// receiving requests from clients, and start an instance of the - /// SimpleRpcServer subclass with the Subscription. - /// - /// - /// string queueName = "ServiceRequestQueue"; // See also Subscription ctors - /// using (IConnection conn = new ConnectionFactory() - /// .CreateConnection(serverAddress)) { - /// using (IModel ch = conn.CreateModel()) { - /// Subscription sub = new Subscription(ch, queueName); - /// new MySimpleRpcServerSubclass(sub).MainLoop(); - /// } - /// } - /// - /// - /// Note that this class itself does not declare any resources - /// (exchanges, queues or bindings). The Subscription we use for - /// receiving RPC requests should have already declared all the - /// resources we need. See the Subscription constructors and the - /// Subscription.Bind method. - /// - /// - /// If you are implementing a service that responds to - /// "jms/stream-message"-formatted requests (as implemented by - /// RabbitMQ.Client.Content.IStreamMessageReader), override - /// HandleStreamMessageCall. Otherwise, override HandleSimpleCall - /// or HandleCall as appropriate. Asynchronous, one-way requests - /// are dealt with by HandleCast etc. - /// - /// - /// Every time a request is successfully received and processed - /// within the server's MainLoop, the request message is Ack()ed - /// using Subscription.Ack before the next request is - /// retrieved. This causes the Subscription object to take care of - /// acknowledging receipt and processing of the request message. - /// - /// - /// If transactional service is enabled, via SetTransactional(), - /// then after every successful ProcessRequest, IModel.TxCommit is - /// called. Making use of transactional service has effects on all - /// parts of the application that share an IModel instance, - /// completely changing the style of interaction with the AMQP - /// server. For this reason, it is initially disabled, and must be - /// explicitly enabled with a call to SetTransactional(). Please - /// see the documentation for SetTransactional() for details. - /// - /// - /// To stop a running RPC server, call Close(). This will in turn - /// Close() the Subscription, which will cause MainLoop() to - /// return to its caller. - /// - /// - /// Unless overridden, ProcessRequest examines properties in the - /// request content header, and uses them to dispatch to one of - /// the Handle[...]() methods. See the documentation for - /// ProcessRequest and each Handle[...] method for details. - /// - /// - /// - public class SimpleRpcServer : IDisposable - { - protected Subscription m_subscription; - - ///Create, but do not start, an instance that will - ///receive requests via the given Subscription. - /// - /// - /// The instance is initially in non-transactional mode. See - /// SetTransactional(). - /// - /// - /// Call MainLoop() to start the request-processing loop. - /// - /// - public SimpleRpcServer(Subscription subscription) - { - m_subscription = subscription; - Transactional = false; - } - - ///Returns true if we are in "transactional" mode, or - ///false if we are not. - public bool Transactional { get; private set; } - - ///Shut down the server, causing MainLoop() to return - ///to its caller. - /// - /// Acts by calling Close() on the server's Subscription object. - /// - public void Close() - { - m_subscription.Close(); - } - - ///Called by ProcessRequest(), this is the most - ///general method that handles RPC-style requests. - /// - /// - /// This method should map requestProperties and body to - /// replyProperties and the returned byte array. - /// - /// - /// The default implementation checks - /// requestProperties.ContentType, and if it is - /// "jms/stream-message" (i.e. the current value of - /// StreamMessageBuilder.MimeType), parses it using - /// StreamMessageReader and delegates to - /// HandleStreamMessageCall before encoding and returning the - /// reply. If the ContentType is any other value, the request - /// is passed to HandleSimpleCall instead. - /// - /// - /// The isRedelivered flag is true when the server knows for - /// sure that it has tried to send this request previously - /// (although not necessarily to this application). It is not - /// a reliable indicator of previous receipt, however - the - /// only claim it makes is that a delivery attempt was made, - /// not that the attempt succeeded. Be careful if you choose - /// to use the isRedelivered flag. - /// - /// - public virtual byte[] HandleCall(bool isRedelivered, - IBasicProperties requestProperties, - byte[] body, - out IBasicProperties replyProperties) - { - if (requestProperties.ContentType == StreamMessageBuilder.MimeType) - { - IStreamMessageReader r = new StreamMessageReader(requestProperties, body); - IStreamMessageBuilder w = new StreamMessageBuilder(m_subscription.Model); - HandleStreamMessageCall(w, - isRedelivered, - requestProperties, - r.ReadObjects()); - replyProperties = (IBasicProperties)w.GetContentHeader(); - return w.GetContentBody(); - } - else - { - return HandleSimpleCall(isRedelivered, - requestProperties, - body, - out replyProperties); - } - } - - ///Called by ProcessRequest(), this is the most - ///general method that handles asynchronous, one-way - ///requests. - /// - /// - /// The default implementation checks - /// requestProperties.ContentType, and if it is - /// "jms/stream-message" (i.e. the current value of - /// StreamMessageBuilder.MimeType), parses it using - /// StreamMessageReader and delegates to - /// HandleStreamMessageCall, passing in null as the - /// replyWriter parameter to indicate that no reply is desired - /// or possible. If the ContentType is any other value, the - /// request is passed to HandleSimpleCast instead. - /// - /// - /// The isRedelivered flag is true when the server knows for - /// sure that it has tried to send this request previously - /// (although not necessarily to this application). It is not - /// a reliable indicator of previous receipt, however - the - /// only claim it makes is that a delivery attempt was made, - /// not that the attempt succeeded. Be careful if you choose - /// to use the isRedelivered flag. - /// - /// - public virtual void HandleCast(bool isRedelivered, - IBasicProperties requestProperties, - byte[] body) - { - if (requestProperties.ContentType == StreamMessageBuilder.MimeType) - { - IStreamMessageReader r = new StreamMessageReader(requestProperties, body); - HandleStreamMessageCall(null, - isRedelivered, - requestProperties, - r.ReadObjects()); - } - else - { - HandleSimpleCast(isRedelivered, requestProperties, body); - } - } - - ///Called by the default HandleCall() implementation - ///as a fallback. - /// - /// If the MIME ContentType of the request did not match any - /// of the types specially recognised - /// (e.g. "jms/stream-message"), this method is called instead - /// with the raw bytes of the request. It should fill in - /// replyProperties (or set it to null) and return a byte - /// array to send back to the remote caller as a reply - /// message. - /// - public virtual byte[] HandleSimpleCall(bool isRedelivered, - IBasicProperties requestProperties, - byte[] body, - out IBasicProperties replyProperties) - { - // Override to do something with the request. - replyProperties = null; - return null; - } - - ///Called by the default HandleCast() implementation - ///as a fallback. - /// - /// If the MIME ContentType of the request did not match any - /// of the types specially recognised - /// (e.g. "jms/stream-message"), this method is called instead - /// with the raw bytes of the request. - /// - public virtual void HandleSimpleCast(bool isRedelivered, - IBasicProperties requestProperties, - byte[] body) - { - // Override to do something with the request. - } - - ///Called by HandleCall and HandleCast when a - ///"jms/stream-message" request is received. - /// - /// - /// The args array contains the values decoded by HandleCall - /// or HandleCast. - /// - /// - /// The replyWriter parameter will be null if we were called - /// from HandleCast, in which case a reply is not expected or - /// possible, or non-null if we were called from - /// HandleCall. Use the methods of replyWriter in this case to - /// assemble your reply, which will be sent back to the remote - /// caller. - /// - /// - /// This default implementation does nothing, which - /// effectively sends back an empty reply to any and all - /// remote callers. - /// - /// - public virtual void HandleStreamMessageCall(IStreamMessageBuilder replyWriter, - bool isRedelivered, - IBasicProperties requestProperties, - object[] args) - { - // Override to do something with the request. - } - - ///Enters the main loop of the RPC service. - /// - /// - /// Retrieves requests repeatedly from the service's - /// subscription. Each request is passed to - /// ProcessRequest. Once ProcessRequest returns, the request - /// is acknowledged via Subscription.Ack(). If transactional - /// mode is enabled, TxCommit is then called. Finally, the - /// loop begins again. - /// - /// - /// Runs until the subscription ends, which happens either as - /// a result of disconnection, or of a call to Close(). - /// - /// - public void MainLoop() - { - foreach (BasicDeliverEventArgs evt in m_subscription) - { - ProcessRequest(evt); - m_subscription.Ack(); - if (Transactional) - { - m_subscription.Model.TxCommit(); - } - } - } - - ///Process a single request received from our - ///subscription. - /// - /// - /// If the request's properties contain a non-null, non-empty - /// CorrelationId string (see IBasicProperties), it is assumed - /// to be a two-way call, requiring a response. The ReplyTo - /// header property is used as the reply address (via - /// PublicationAddress.Parse, unless that fails, in which case it - /// is treated as a simple queue name), and the request is - /// passed to HandleCall(). - /// - /// - /// If the CorrelationId is absent or empty, the request is - /// treated as one-way asynchronous event, and is passed to - /// HandleCast(). - /// - /// - /// Usually, overriding HandleCall(), HandleCast(), or one of - /// their delegates is sufficient to implement a service, but - /// in some cases overriding ProcessRequest() is - /// required. Overriding ProcessRequest() gives the - /// opportunity to implement schemes for detecting interaction - /// patterns other than simple request/response or one-way - /// communication. - /// - /// - public virtual void ProcessRequest(BasicDeliverEventArgs evt) - { - IBasicProperties properties = evt.BasicProperties; - if (properties.ReplyTo != null && properties.ReplyTo != "") - { - // It's a request. - - PublicationAddress replyAddress = PublicationAddress.Parse(properties.ReplyTo); - if (replyAddress == null) - { - replyAddress = new PublicationAddress(ExchangeType.Direct, - "", - properties.ReplyTo); - } - - IBasicProperties replyProperties; - byte[] reply = HandleCall(evt.Redelivered, - properties, - evt.Body, - out replyProperties); - if (replyProperties == null) - { - replyProperties = m_subscription.Model.CreateBasicProperties(); - } - - replyProperties.CorrelationId = properties.CorrelationId; - m_subscription.Model.BasicPublish(replyAddress, - replyProperties, - reply); - } - else - { - // It's an asynchronous message. - HandleCast(evt.Redelivered, properties, evt.Body); - } - } - - ///Enables transactional mode. - /// - /// - /// Once enabled, transactional mode is not only enabled for - /// all users of the underlying IModel instance, but cannot be - /// disabled without shutting down the entire IModel (which - /// involves shutting down all the services depending on it, - /// and should not be undertaken lightly). - /// - /// - /// This method calls IModel.TxSelect, every time it is - /// called. (TxSelect is idempotent, so this is harmless.) - /// - /// - public void SetTransactional() - { - m_subscription.Model.TxSelect(); - Transactional = true; - } - - ///Implement the IDisposable interface, permitting - ///SimpleRpcServer instances to be used in using - ///statements. - void IDisposable.Dispose() - { - Dispose(true); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - { - // dispose managed resources - Close(); - } - - // dispose unmanaged resources - } - } -} diff --git a/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs b/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs deleted file mode 100755 index 55926440f8..0000000000 --- a/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs +++ /dev/null @@ -1,560 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 1.1. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2016 Pivotal Software, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v1.1: -// -//--------------------------------------------------------------------------- -// The contents of this file are subject to the Mozilla Public License -// Version 1.1 (the "License"); you may not use this file except in -// compliance with the License. You may obtain a copy of the License -// at https://www.mozilla.org/MPL/ -// -// Software distributed under the License is distributed on an "AS IS" -// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -// the License for the specific language governing rights and -// limitations under the License. -// -// The Original Code is RabbitMQ. -// -// The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Collections; -using System.Collections.Concurrent; -using System.IO; - -using System.Threading; - -using RabbitMQ.Client.Events; -using RabbitMQ.Client.Exceptions; - -namespace RabbitMQ.Client.MessagePatterns -{ - ///Manages a subscription to a queue. - /// - /// - /// This convenience class abstracts away from much of the detail - /// involved in receiving messages from a queue. - /// - /// - /// Once created, the Subscription consumes from a queue (using a - /// EventingBasicConsumer). Received deliveries can be retrieved - /// by calling Next(), or by using the Subscription as an - /// IEnumerator in, for example, a foreach loop. - /// - /// - /// Note that if the "autoAck" option is enabled (which it is by - /// default), then received deliveries are automatically acked - /// within the server before they are even transmitted across the - /// network to us. Calling Ack() on received events will always do - /// the right thing: if "autoAck" is enabled, nothing is done on an - /// Ack() call, and if "autoAck" is disabled, IModel.BasicAck() is - /// called with the correct parameters. - /// - /// - public class Subscription : ISubscription - { - protected readonly object m_eventLock = new object(); - protected volatile EventingBasicConsumer m_consumer; - private BlockingCollection m_queue = - new BlockingCollection(new ConcurrentQueue()); - - private CancellationTokenSource m_queueCts = new CancellationTokenSource(); - -#if NETFX_CORE || NET4 - private ConcurrentQueue> m_waiting = - new ConcurrentQueue>(); -#endif - ///Creates a new Subscription in "autoAck" mode, - ///consuming from a named queue. - public Subscription(IModel model, string queueName) - : this(model, queueName, true) - { - } - - ///Creates a new Subscription, with full control over - ///both "autoAck" mode and the name of the queue. - public Subscription(IModel model, string queueName, bool autoAck) - { - Model = model; - QueueName = queueName; - AutoAck = autoAck; - m_consumer = new EventingBasicConsumer(Model); -#if NETFX_CORE || NET4 - m_consumer.Received += (sender, args) => QueueAdd(args); -#else - m_consumer.Received += (sender, args) => m_queue.Add(args); -#endif - ConsumerTag = Model.BasicConsume(QueueName, AutoAck, m_consumer); - m_consumer.ConsumerCancelled += HandleConsumerCancelled; - LatestEvent = null; - } - - ///Creates a new Subscription, with full control over - ///both "autoAck" mode, the name of the queue, and the consumer tag. - public Subscription(IModel model, string queueName, bool autoAck, string consumerTag) - { - Model = model; - QueueName = queueName; - AutoAck = autoAck; - m_consumer = new EventingBasicConsumer(Model); - m_consumer.ConsumerCancelled += HandleConsumerCancelled; - m_consumer.Received += (sender, args) => m_queue.Add(args); - ConsumerTag = Model.BasicConsume(QueueName, AutoAck, consumerTag, m_consumer); - LatestEvent = null; - } - - ///Retrieve the IBasicConsumer that is receiving the - ///messages from the server for us. Normally, you will not - ///need to access this property - use Next() and friends - ///instead. - public IBasicConsumer Consumer - { - get { return m_consumer; } - } - - ///Retrieve the consumer-tag that this subscription - ///is using. Will usually be a server-generated - ///name. - public string ConsumerTag { get; protected set; } - - ///Returns the most recent value returned by Next(), - ///or null when either no values have been retrieved yet, the - ///end of the subscription has been reached, or the most - ///recent value has already been Ack()ed. See also the - ///documentation for Ack(). - public BasicDeliverEventArgs LatestEvent { get; protected set; } - - ///Retrieve the IModel our subscription is carried by. - public IModel Model { get; protected set; } - - ///Returns true if we are in "autoAck" mode, where - ///calls to Ack() will be no-ops, and where the server acks - ///messages before they are delivered to us. Returns false if - ///we are in a mode where calls to Ack() are required, and - ///where such calls will actually send an acknowledgement - ///message across the network to the server. - public bool AutoAck { get; protected set; } - - ///Retrieve the queue name we have subscribed to. - public string QueueName { get; protected set; } - - ///Implementation of the IEnumerator interface, for - ///permitting Subscription to be used in foreach - ///loops. - /// - /// - /// As per the IEnumerator interface definition, throws - /// InvalidOperationException if LatestEvent is null. - /// - /// - /// Does not acknowledge any deliveries at all. Ack() must be - /// called explicitly on received deliveries. - /// - /// - object IEnumerator.Current - { - get - { - if (LatestEvent == null) - { - throw new InvalidOperationException(); - } - return LatestEvent; - } - } - - ///If LatestEvent is non-null, passes it to - ///Ack(BasicDeliverEventArgs). Causes LatestEvent to become - ///null. - public void Ack() - { - Ack(LatestEvent); - } - - ///If we are not in "autoAck" mode, calls - ///IModel.BasicAck with the delivery-tag from ; - ///otherwise, sends nothing to the server. if is the same as LatestEvent - ///by pointer comparison, sets LatestEvent to null. - /// - /// - ///Passing an event that did not originate with this Subscription's - /// channel, will lead to unpredictable behaviour - /// - public void Ack(BasicDeliverEventArgs evt) - { - if (evt == null) - { - return; - } - - if (!AutoAck && Model.IsOpen) - { - Model.BasicAck(evt.DeliveryTag, false); - } - - if (evt == LatestEvent) - { - MutateLatestEvent(null); - } - } - - ///Closes this Subscription, cancelling the consumer - ///record in the server. - public void Close() - { - try - { - bool shouldCancelConsumer = false; - if (m_consumer != null) - { - shouldCancelConsumer = m_consumer.IsRunning; - m_consumer = null; - } - - if (shouldCancelConsumer) - { - if (Model.IsOpen) - { - Model.BasicCancel(ConsumerTag); - } - - ConsumerTag = null; - } - - m_queueCts.Cancel(true); - if(m_queue != null) - { - m_queue.Dispose(); - m_queue = null; - } -#if NETFX_CORE || NET4 - var exn = new EndOfStreamException("Subscription closed"); - foreach (var tsc in m_waiting) - { - tsc.TrySetException(exn); - } -#endif - } - catch (OperationInterruptedException) - { - // We don't mind, here. - } - } - - ///If LatestEvent is non-null, passes it to - ///Nack(BasicDeliverEventArgs, false, requeue). Causes LatestEvent to become - ///null. - public void Nack(bool requeue) - { - Nack(LatestEvent, false, requeue); - } - - ///If LatestEvent is non-null, passes it to - ///Nack(BasicDeliverEventArgs, multiple, requeue). Causes LatestEvent to become - ///null. - public void Nack(bool multiple, bool requeue) - { - Nack(LatestEvent, multiple, requeue); - } - - ///If we are not in "autoAck" mode, calls - ///IModel.BasicNack with the delivery-tag from ; - ///otherwise, sends nothing to the server. if is the same as LatestEvent - ///by pointer comparison, sets LatestEvent to null. - /// - /// - ///Passing an event that did not originate with this Subscription's - /// channel, will lead to unpredictable behaviour - /// - public void Nack(BasicDeliverEventArgs evt, bool multiple, bool requeue) - { - if (evt == null) - { - return; - } - - if (!AutoAck && Model.IsOpen) - { - Model.BasicNack(evt.DeliveryTag, multiple, requeue); - } - - if (evt == LatestEvent) - { - MutateLatestEvent(null); - } - } - - ///Retrieves the next incoming delivery in our - ///subscription queue. - /// - /// - /// Returns null when the end of the stream is reached and on - /// every subsequent call. End-of-stream can arise through the - /// action of the Subscription.Close() method, or through the - /// closure of the IModel or its underlying IConnection. - /// - /// - /// Updates LatestEvent to the value returned. - /// - /// - /// Does not acknowledge any deliveries at all (but in "autoAck" - /// mode, the server will have auto-acknowledged each event - /// before it is even sent across the wire to us). - /// - /// - public BasicDeliverEventArgs Next() - { - // Alias the pointer as otherwise it may change out - // from under us by the operation of Close() from - // another thread. - EventingBasicConsumer consumer = m_consumer; - try - { - if (consumer == null || Model.IsClosed) - { - MutateLatestEvent(null); - } - else - { - BasicDeliverEventArgs bdea = m_queue.Take(m_queueCts.Token); - MutateLatestEvent(bdea); - } - } - catch (EndOfStreamException) - { - MutateLatestEvent(null); - } - return LatestEvent; - } - -#if NETFX_CORE || NET4 - public Task NextAsync() - { - try - { - // Alias the pointer as otherwise it may change out - // from under us by the operation of Close() from - // another thread. - var queue = m_queue; - if (queue == null || Model.IsClosed) - { - // Closed! - MutateLatestEvent(null); - } - else - { - BasicDeliverEventArgs evt = null; - if(queue.TryTake(out evt)) - { - MutateLatestEvent(evt); - } - else - { - var tcs = new TaskCompletionSource(); - m_waiting.Enqueue(tcs); - return tcs.Task; - } - } - } - catch (AggregateException ex) - { - // since tasks wrap exceptions as AggregateException, - // reach in and check if the EndOfStream exception is what happened - if (ex.InnerException is EndOfStreamException) - { - MutateLatestEvent(null); - } - } - catch (EndOfStreamException) - { - MutateLatestEvent(null); - } - - return Task.FromResult(LatestEvent); - } -#endif - - ///Retrieves the next incoming delivery in our - ///subscription queue, or times out after a specified number - ///of milliseconds. - /// - /// - /// Returns false only if the timeout expires before either a - /// delivery appears or the end-of-stream is reached. If false - /// is returned, the out parameter "result" is set to null, - /// but LatestEvent is not updated. - /// - /// - /// Returns true to indicate a delivery or the end-of-stream. - /// - /// - /// If a delivery is already waiting in the queue, or one - /// arrives before the timeout expires, it is removed from the - /// queue and placed in the "result" out parameter. If the - /// end-of-stream is detected before the timeout expires, - /// "result" is set to null. - /// - /// - /// Whenever this method returns true, it updates LatestEvent - /// to the value placed in "result" before returning. - /// - /// - /// End-of-stream can arise through the action of the - /// Subscription.Close() method, or through the closure of the - /// IModel or its underlying IConnection. - /// - /// - /// This method does not acknowledge any deliveries at all - /// (but in "autoAck" mode, the server will have - /// auto-acknowledged each event before it is even sent across - /// the wire to us). - /// - /// - /// A timeout of -1 (i.e. System.Threading.Timeout.Infinite) - /// will be interpreted as a command to wait for an - /// indefinitely long period of time for an item or the end of - /// the stream to become available. Usage of such a timeout is - /// equivalent to calling Next() with no arguments (modulo - /// predictable method signature differences). - /// - /// - public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result) - { - try - { - // Alias the pointer as otherwise it may change out - // from under us by the operation of Close() from - // another thread. - var consumer = m_consumer; - if (consumer == null || Model.IsClosed) - { - MutateLatestEvent(null); - result = null; - return false; - } - else - { - BasicDeliverEventArgs qValue; - if (!m_queue.TryTake(out qValue, millisecondsTimeout)) - { - result = null; - return false; - } - MutateLatestEvent(qValue); - } - } - catch (EndOfStreamException) - { - MutateLatestEvent(null); - } - result = LatestEvent; - return true; - } - - ///Implementation of the IDisposable interface, - ///permitting Subscription to be used in using - ///statements. Simply calls Close(). - void IDisposable.Dispose() - { - Dispose(true); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - { - // dispose managed resources - Close(); - } - - // dispose unmanaged resources - } - - ///Implementation of the IEnumerable interface, for - ///permitting Subscription to be used in foreach - ///loops. - IEnumerator IEnumerable.GetEnumerator() - { - return this; - } - - ///Implementation of the IEnumerator interface, for - ///permitting Subscription to be used in foreach - ///loops. - /// - /// - /// Does not acknowledge any deliveries at all. Ack() must be - /// called explicitly on received deliveries. - /// - /// - bool IEnumerator.MoveNext() - { - return Next() != null; - } - - ///Dummy implementation of the IEnumerator interface, - ///for permitting Subscription to be used in foreach loops; - ///Reset()ting a Subscription doesn't make sense, so this - ///method always throws InvalidOperationException. - void IEnumerator.Reset() - { - // It really doesn't make sense to try to reset a subscription. - throw new InvalidOperationException("Subscription.Reset() does not make sense"); - } - - protected void MutateLatestEvent(BasicDeliverEventArgs value) - { - lock (m_eventLock) - { - LatestEvent = value; - } - } - - private void HandleConsumerCancelled(object sender, ConsumerEventArgs e) - { - lock (m_eventLock) - { - m_consumer = null; - MutateLatestEvent(null); - } - } - -#if NETFX_CORE || NET4 - private void QueueAdd(BasicDeliverEventArgs args) - { - //NB: as long as there are async awaiters sync callers will never be served - //this is not ideal but consistent with how SharedQueue behaves - TaskCompletionSource tsc; - if(m_waiting.TryDequeue(out tsc) && tsc.TrySetResult(args)) - { - return; - } - else - { - m_queue.Add(args); - } - } -#endif - } -} diff --git a/projects/client/Unit/src/unit/TestMessagePatternsSubscription.cs b/projects/client/Unit/src/unit/TestMessagePatternsSubscription.cs deleted file mode 100644 index 86b79dc7ae..0000000000 --- a/projects/client/Unit/src/unit/TestMessagePatternsSubscription.cs +++ /dev/null @@ -1,207 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 1.1. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2016 Pivotal Software, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v1.1: -// -//--------------------------------------------------------------------------- -// The contents of this file are subject to the Mozilla Public License -// Version 1.1 (the "License"); you may not use this file except in -// compliance with the License. You may obtain a copy of the License -// at https://www.mozilla.org/MPL/ -// -// Software distributed under the License is distributed on an "AS IS" -// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -// the License for the specific language governing rights and -// limitations under the License. -// -// The Original Code is RabbitMQ. -// -// The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using NUnit.Framework; -using RabbitMQ.Client.Events; -using RabbitMQ.Client.Exceptions; -using RabbitMQ.Client.MessagePatterns; -using System; -using System.Collections.Generic; -using System.Threading; - -namespace RabbitMQ.Client.Unit -{ - [TestFixture] - public class TestMessagePatternsSubscription : IntegrationFixture - { - protected void TestConcurrentIterationWithDrainer(Action action) - { - IDictionary args = new Dictionary - { - {Headers.XMessageTTL, 5000} - }; - string queueDeclare = Model.QueueDeclare("", false, true, false, args); - var subscription = new Subscription(Model, queueDeclare, false); - - PreparedQueue(queueDeclare); - - var threads = new List(); - for (int i = 0; i < 50; i++) - { - var drainer = new SubscriptionDrainer(subscription, action); - var thread = new Thread(drainer.Drain); - threads.Add(thread); - thread.Start(); - } - - threads.ForEach(x => x.Join(20 * 1000)); - } - - protected void TestSequentialIterationWithDrainer(Action action) - { - IDictionary args = new Dictionary - { - {Headers.XMessageTTL, 5000} - }; - string queueDeclare = Model.QueueDeclare("", false, true, false, args); - var subscription = new Subscription(Model, queueDeclare, false); - - PreparedQueue(queueDeclare); - - var drainer = new SubscriptionDrainer(subscription, action); - drainer.Drain(); - } - - private void TestSubscriptionAction(Action action) - { - Model.BasicQos(0, 1, false); - string queueDeclare = Model.QueueDeclare(); - var subscription = new Subscription(Model, queueDeclare, false); - - Model.BasicPublish("", queueDeclare, null, encoding.GetBytes("a message")); - BasicDeliverEventArgs res = subscription.Next(); - Assert.IsNotNull(res); - action(subscription); - QueueDeclareOk ok = Model.QueueDeclarePassive(queueDeclare); - Assert.AreEqual(0, ok.MessageCount); - } - - protected class SubscriptionDrainer - { - protected Subscription m_subscription; - - public SubscriptionDrainer(Subscription subscription, Action action) - { - m_subscription = subscription; - PostProcess = action; - } - - private Action PostProcess { get; set; } - - public void Drain() - { -#pragma warning disable 0168 - try - { - for (int i = 0; i < 100; i++) - { - BasicDeliverEventArgs ea = m_subscription.Next(); - if (ea != null) - { - Assert.That(ea, Is.TypeOf(typeof(BasicDeliverEventArgs))); - PostProcess(m_subscription); - } - else - { - break; - } - } - } - catch (AlreadyClosedException ace) - { - // expected - } - finally - { - m_subscription.Close(); - } -#pragma warning restore - } - } - - private void PreparedQueue(string q) - { - // this should be greater than the number of threads - // multiplied by 100 (deliveries per Subscription), alternatively - // drainers can use Subscription.Next with a timeout. - for (int i = 0; i < 20000; i++) - { - Model.BasicPublish("", q, null, encoding.GetBytes("a message")); - } - } - - [Test] - public void TestChannelClosureIsObservableOnSubscription() - { - string q = Model.QueueDeclare(); - var sub = new Subscription(Model, q, true); - - BasicDeliverEventArgs r1; - Assert.IsFalse(sub.Next(100, out r1)); - - Model.BasicPublish("", q, null, encoding.GetBytes("a message")); - Model.BasicPublish("", q, null, encoding.GetBytes("a message")); - - BasicDeliverEventArgs r2; - Assert.IsTrue(sub.Next(1000, out r2)); - Assert.IsNotNull(sub.Next()); - - Model.Close(); - Assert.IsNull(sub.Next()); - - BasicDeliverEventArgs r3; - Assert.IsFalse(sub.Next(100, out r3)); - } - - [Test] - public void TestConcurrentIterationAndAck() - { - TestConcurrentIterationWithDrainer(s => s.Ack()); - } - - [Test] - public void TestConcurrentIterationAndNack() - { - TestConcurrentIterationWithDrainer(s => s.Nack(false, false)); - } - - [Test] - public void TestSubscriptionAck() - { - TestSubscriptionAction(s => s.Ack()); - } - - [Test] - public void TestSubscriptionNack() - { - TestSubscriptionAction(s => s.Nack(false, false)); - } - } -} diff --git a/projects/client/Unit/src/unit/TestSubscription.cs b/projects/client/Unit/src/unit/TestSubscription.cs deleted file mode 100755 index c3422a6c55..0000000000 --- a/projects/client/Unit/src/unit/TestSubscription.cs +++ /dev/null @@ -1,80 +0,0 @@ -// This source code is dual-licensed under the Apache License, version -// 2.0, and the Mozilla Public License, version 1.1. -// -// The APL v2.0: -// -//--------------------------------------------------------------------------- -// Copyright (c) 2007-2016 Pivotal Software, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//--------------------------------------------------------------------------- -// -// The MPL v1.1: -// -//--------------------------------------------------------------------------- -// The contents of this file are subject to the Mozilla Public License -// Version 1.1 (the "License"); you may not use this file except in -// compliance with the License. You may obtain a copy of the License -// at https://www.mozilla.org/MPL/ -// -// Software distributed under the License is distributed on an "AS IS" -// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -// the License for the specific language governing rights and -// limitations under the License. -// -// The Original Code is RabbitMQ. -// -// The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. -//--------------------------------------------------------------------------- - -using System; -using System.Threading; -using NUnit.Framework; - -using RabbitMQ.Client.MessagePatterns; - -namespace RabbitMQ.Client.Unit -{ - [TestFixture] - class TestSubscription : IntegrationFixture - { - [SetUp] - public override void Init() - { - var connFactory = new ConnectionFactory() - { - AutomaticRecoveryEnabled = false - }; - Conn = connFactory.CreateConnection(); - Model = Conn.CreateModel(); - } - - [Test, MaxTime(16000)] - public void TestConsumerCancellationNotification() - { - var q = Guid.NewGuid().ToString(); - this.Model.QueueDeclare(queue: q, durable: false, exclusive: false, autoDelete: false, arguments: null); - var sub = new Subscription(this.Model, q); - var latch = new ManualResetEvent(false); - sub.Consumer.ConsumerCancelled += (_sender, _args) => - { - sub.Close(); - Conn.Close(); - latch.Set(); - }; - this.Model.QueueDelete(q); - Wait(latch, TimeSpan.FromSeconds(8)); - } - } -} From 4fd1235f86c8661a11907d04c4cf88b35e9c6eff Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 6 Sep 2019 18:30:35 +0300 Subject: [PATCH 2/2] Ignore .ionide --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index ddddf6b8b1..e85a5ffcd9 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ TestResults.xml TestResult.xml /packages /.fake +/.ionide .paket/* .paket/paket.exe /paket-files/*