diff --git a/src/core/Akka/IO/SelectionHandler.cs b/src/core/Akka/IO/SelectionHandler.cs index 07ea19ebcb2..b3753d3f39e 100644 --- a/src/core/Akka/IO/SelectionHandler.cs +++ b/src/core/Akka/IO/SelectionHandler.cs @@ -196,7 +196,6 @@ public ChannelRegistryImpl(ILoggingAdapter log) { _log = log; _executionContext = new SingleThreadExecutionContext(); - Execute(Select); } private void Execute(Action action) @@ -206,40 +205,39 @@ private void Execute(Action action) private void Select() { - if (_read.Count > 0 || _write.Count > 0) + if (_read.Count == 0 && _write.Count == 0) return; // Stop select loop when no more interested sockets. It will be started again once a socket is registered + + var readable = _read.Keys.ToList(); + var writeable = _write.Keys.ToList(); + try { - var readable = _read.Keys.ToList(); - var writeable = _write.Keys.ToList(); - try + Socket.Select(readable, writeable, null, 1); + foreach (var socket in readable) { - Socket.Select(readable, writeable, null, 1); - foreach (var socket in readable) - { - var channel = _read[socket]; - if (channel.IsOpen()) - channel.Connection.Tell(ChannelReadable.Instance); - else - channel.Connection.Tell(ChannelAcceptable.Instance); - _read.Remove(socket); - } - foreach (var socket in writeable) - { - var channel = _write[socket]; - if (channel.IsOpen()) - channel.Connection.Tell(ChannelWritable.Instance); - else - channel.Connection.Tell(ChannelConnectable.Instance); - _write.Remove(socket); - } + var channel = _read[socket]; + if (channel.IsOpen()) + channel.Connection.Tell(ChannelReadable.Instance); + else + channel.Connection.Tell(ChannelAcceptable.Instance); + _read.Remove(socket); } - catch (SocketException ex) + foreach (var socket in writeable) { - if (ex.SocketErrorCode == SocketError.NotSocket) - { - // One of the sockets has been closed - readable.Where(x => !x.Connected).ForEach(x =>_read.Remove(x)); - writeable.Where(x => !x.Connected).ForEach(x => _write.Remove(x)); - } + var channel = _write[socket]; + if (channel.IsOpen()) + channel.Connection.Tell(ChannelWritable.Instance); + else + channel.Connection.Tell(ChannelConnectable.Instance); + _write.Remove(socket); + } + } + catch (SocketException ex) + { + if (ex.SocketErrorCode == SocketError.NotSocket) + { + // One of the sockets has been closed + readable.Where(x => !x.Connected).ForEach(x =>_read.Remove(x)); + writeable.Where(x => !x.Connected).ForEach(x => _write.Remove(x)); } } Execute(Select); @@ -262,14 +260,24 @@ private void EnableInterest(SocketChannel channel, SocketAsyncOperation op) { switch (op) { - case SocketAsyncOperation.Accept: - case SocketAsyncOperation.Receive: - Execute(() => _read.Add(channel.Socket, channel)); - break; - case SocketAsyncOperation.Connect: - case SocketAsyncOperation.Send: - Execute(() => _write.Add(channel.Socket, channel)); - break; + case SocketAsyncOperation.Accept: + case SocketAsyncOperation.Receive: + Execute(() => + { + _read.Add(channel.Socket, channel); + if (_read.Count == 1 && _write.Count == 0) // Start the select loop on initial enable interest + Select(); // The select loop will stop itself if no more interested sockets + }); + break; + case SocketAsyncOperation.Connect: + case SocketAsyncOperation.Send: + Execute(() => + { + _write.Add(channel.Socket, channel); // Start the select loop on initial enable interest + if (_read.Count == 0 && _write.Count == 1) // The select loop will stop itself if no more interested sockets + Select(); + }); + break; } } private void DisableInterest(SocketChannel channel, SocketAsyncOperation op)