Skip to content

Commit

Permalink
Akka IO Transport: framing support
Browse files Browse the repository at this point in the history
  • Loading branch information
fergusn committed Oct 12, 2015
1 parent 8571423 commit 60b5d2a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ public static Address ToAddress(this EndPoint endpoint, ActorSystem system)
return new Address(AkkaIOTransport.Protocal, system.Name, dns.Host, dns.Port);
var ip = endpoint as IPEndPoint;
if (ip != null)
return new Address(AkkaIOTransport.Protocal, system.Name, "127.0.0.1", ip.Port);
return new Address(AkkaIOTransport.Protocal, system.Name, ip.Address.MapToIPv4().ToString(), ip.Port);
throw new ArgumentException("endpoint");
}

public static EndPoint ToEndpoint(this Address address)
{
return new DnsEndPoint(address.Host, address.Port.GetValueOrDefault(9099), AddressFamily.InterNetwork);
if (address == null || address.Host == null || !address.Port.HasValue)
throw new ArgumentException("Invalid address", "address");
return new DnsEndPoint(address.Host, address.Port.Value, AddressFamily.InterNetwork);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ class Settings
public Settings(Config config)
{
Port = config.GetInt("port");
Hostname = config.GetString("hostname");
}

public int Port { get; set; }
public int Port { get; private set; }
public string Hostname { get; private set; }
}

private readonly IActorRef _manager;
Expand All @@ -37,14 +39,17 @@ public AkkaIOTransport(ActorSystem system, Config config)
}

public override string SchemeIdentifier { get { return Protocal; } }

public override long MaximumPayloadBytes { get { return 128000; } }

public override bool IsResponsibleFor(Address remote)
{
return true;
}

public override Task<Tuple<Address, TaskCompletionSource<IAssociationEventListener>>> Listen()
{
return _manager.Ask<Tuple<Address, TaskCompletionSource<IAssociationEventListener>>>(new Listen(_settings.Port));
return _manager.Ask<Tuple<Address, TaskCompletionSource<IAssociationEventListener>>>(new Listen(_settings.Hostname, _settings.Port));
}
public override Task<AssociationHandle> Associate(Address remoteAddress)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.IO;
Expand Down Expand Up @@ -53,7 +54,7 @@ protected override void OnReceive(object message)
if (message is IHandleEventListener)
{
var el = message as IHandleEventListener;
Context.Become(Receiving(el));
Context.Become(WaitingForPrefix(el, IO.ByteString.Empty));
Stash.UnstashAll();
}
else
Expand All @@ -62,27 +63,57 @@ protected override void OnReceive(object message)
}
}

private UntypedReceive Receiving(IHandleEventListener el)
private UntypedReceive WaitingForPrefix(IHandleEventListener el, IO.ByteString buffer)
{
if (buffer.Count >= 4)
{
var length = buffer.Iterator().GetInt();
return WaitingForBody(el, buffer.Drop(4), length);
}
return message =>
{
if (message is Tcp.Received)
{
var received = message as Tcp.Received;
el.Notify(new InboundPayload(ByteString.CopyFrom(received.Data.ToArray())));
Become(WaitingForPrefix(el, buffer.Concat(received.Data)));
}
if (message is ByteString)
{
var bs = message as ByteString;
_connection.Tell(Tcp.Write.Create(IO.ByteString.Create(bs.ToByteArray())));
}
else
else HandleWrite(message);
};
}

private UntypedReceive WaitingForBody(IHandleEventListener el, IO.ByteString buffer, int length)
{
if (buffer.Count >= length)
{
var parts = buffer.SplitAt(length);
el.Notify(new InboundPayload(ByteString.CopyFrom(parts.Item1.ToArray())));
return WaitingForPrefix(el, parts.Item2);
}
return message =>
{
if (message is Tcp.Received)
{
Unhandled(message);
var received = message as Tcp.Received;
Become(WaitingForBody(el, buffer.Concat(received.Data), length));
}
else HandleWrite(message);
};
}

private void HandleWrite(object message)
{
if (message is ByteString)
{
var bs = message as ByteString;
var buffer = ByteString.Unsafe.GetBuffer(bs);
var builder = new ByteStringBuilder();
builder.PutInt(buffer.Length, ByteOrder.BigEndian);
builder.PutBytes(buffer);
_connection.Tell(Tcp.Write.Create(builder.Result()));
}
else Unhandled(message);
}

public IStash Stash { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ public Associate(Address remoteAddress)
}
class Listen
{
public Listen(int port)
public Listen(string hostname, int port)
{
Hostname = hostname;
Port = port;
}

public string Hostname { get; private set; }
public int Port { get; private set; }
}

Expand All @@ -42,22 +45,22 @@ protected override void OnReceive(object message)
{
var associate = message as Associate;
Context.System.Tcp().Tell(new Tcp.Connect(associate.RemoteAddress.ToEndpoint()));
BecomeStacked(WaitingForConnected(Sender));
BecomeStacked(WaitingForConnected(Sender, associate));
}
else if (message is Listen)
{
var listen = message as Listen;
var handler = Context.ActorOf(Props.Create(() => new TransportListener()));
Context.System.Tcp().Tell(new Tcp.Bind(handler, new IPEndPoint(IPAddress.Loopback, listen.Port)));
BecomeStacked(WaitingForBound(Sender, handler));
Context.System.Tcp().Tell(new Tcp.Bind(handler, new IPEndPoint(IPAddress.Any, listen.Port)));
BecomeStacked(WaitingForBound(Sender, handler, listen));
}
else
{
Unhandled(message);
}
}

private Receive WaitingForBound(IActorRef replyTo, IActorRef handler)
private Receive WaitingForBound(IActorRef replyTo, IActorRef handler, Listen listen)
{
return message =>
{
Expand All @@ -66,7 +69,7 @@ private Receive WaitingForBound(IActorRef replyTo, IActorRef handler)
var bound = message as Tcp.Bound;
var promise = new TaskCompletionSource<IAssociationEventListener>();
promise.Task.PipeTo(handler);
replyTo.Tell(Tuple.Create(bound.LocalAddress.ToAddress(Context.System), promise));
replyTo.Tell(Tuple.Create(new Address(AkkaIOTransport.Protocal, Context.System.Name, listen.Hostname, ((IPEndPoint) bound.LocalAddress).Port), promise));
UnbecomeStacked();
Stash.Unstash();
return true;
Expand All @@ -75,7 +78,7 @@ private Receive WaitingForBound(IActorRef replyTo, IActorRef handler)
};
}

private Receive WaitingForConnected(IActorRef replyTo)
private Receive WaitingForConnected(IActorRef replyTo, Associate associate)
{
return message =>
{
Expand All @@ -84,7 +87,7 @@ private Receive WaitingForConnected(IActorRef replyTo)
var connected = message as Tcp.Connected;
var handler = Context.ActorOf(Props.Create(() => new ConnectionAssociationActor(Sender)));
Sender.Tell(new Tcp.Register(handler));
replyTo.Tell(new ConnectionAssociationHandle(handler, connected.LocalAddress.ToAddress(Context.System), connected.RemoteAddress.ToAddress(Context.System)));
replyTo.Tell(new ConnectionAssociationHandle(handler, connected.LocalAddress.ToAddress(Context.System), associate.RemoteAddress));
UnbecomeStacked();
Stash.Unstash();
return true;
Expand Down

0 comments on commit 60b5d2a

Please sign in to comment.