Skip to content

Commit

Permalink
akka-io: Stop select loop on idle
Browse files Browse the repository at this point in the history
  • Loading branch information
fergusn committed Aug 12, 2015
1 parent ccf6680 commit e545780
Showing 1 changed file with 47 additions and 39 deletions.
86 changes: 47 additions & 39 deletions src/core/Akka/IO/SelectionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ public ChannelRegistryImpl(ILoggingAdapter log)
{
_log = log;
_executionContext = new SingleThreadExecutionContext();
Execute(Select);
}

private void Execute(Action action)
Expand All @@ -206,40 +205,39 @@ private void Execute(Action action)

private void Select()
{
if (_read.Count > 0 || _write.Count > 0)
if (_read.Count == 0 && _write.Count == 0) return; // Stop select loop when no more interested sockets. It will be started again once a socket is registered

var readable = _read.Keys.ToList();
var writeable = _write.Keys.ToList();
try
{
var readable = _read.Keys.ToList();
var writeable = _write.Keys.ToList();
try
Socket.Select(readable, writeable, null, 1);
foreach (var socket in readable)
{
Socket.Select(readable, writeable, null, 1);
foreach (var socket in readable)
{
var channel = _read[socket];
if (channel.IsOpen())
channel.Connection.Tell(ChannelReadable.Instance);
else
channel.Connection.Tell(ChannelAcceptable.Instance);
_read.Remove(socket);
}
foreach (var socket in writeable)
{
var channel = _write[socket];
if (channel.IsOpen())
channel.Connection.Tell(ChannelWritable.Instance);
else
channel.Connection.Tell(ChannelConnectable.Instance);
_write.Remove(socket);
}
var channel = _read[socket];
if (channel.IsOpen())
channel.Connection.Tell(ChannelReadable.Instance);
else
channel.Connection.Tell(ChannelAcceptable.Instance);
_read.Remove(socket);
}
catch (SocketException ex)
foreach (var socket in writeable)
{
if (ex.SocketErrorCode == SocketError.NotSocket)
{
// One of the sockets has been closed
readable.Where(x => !x.Connected).ForEach(x =>_read.Remove(x));
writeable.Where(x => !x.Connected).ForEach(x => _write.Remove(x));
}
var channel = _write[socket];
if (channel.IsOpen())
channel.Connection.Tell(ChannelWritable.Instance);
else
channel.Connection.Tell(ChannelConnectable.Instance);
_write.Remove(socket);
}
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.NotSocket)
{
// One of the sockets has been closed
readable.Where(x => !x.Connected).ForEach(x =>_read.Remove(x));
writeable.Where(x => !x.Connected).ForEach(x => _write.Remove(x));
}
}
Execute(Select);
Expand All @@ -262,14 +260,24 @@ private void EnableInterest(SocketChannel channel, SocketAsyncOperation op)
{
switch (op)
{
case SocketAsyncOperation.Accept:
case SocketAsyncOperation.Receive:
Execute(() => _read.Add(channel.Socket, channel));
break;
case SocketAsyncOperation.Connect:
case SocketAsyncOperation.Send:
Execute(() => _write.Add(channel.Socket, channel));
break;
case SocketAsyncOperation.Accept:
case SocketAsyncOperation.Receive:
Execute(() =>
{
_read.Add(channel.Socket, channel);
if (_read.Count == 1 && _write.Count == 0) // Start the select loop on initial enable interest
Select(); // The select loop will stop itself if no more interested sockets
});
break;
case SocketAsyncOperation.Connect:
case SocketAsyncOperation.Send:
Execute(() =>
{
_write.Add(channel.Socket, channel); // Start the select loop on initial enable interest
if (_read.Count == 0 && _write.Count == 1) // The select loop will stop itself if no more interested sockets
Select();
});
break;
}
}
private void DisableInterest(SocketChannel channel, SocketAsyncOperation op)
Expand Down

0 comments on commit e545780

Please sign in to comment.