Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Udp.Received give incorrect ByteString when client send several packets at once #3210

Closed
blackclavus opened this issue Dec 13, 2017 · 2 comments

Comments

@blackclavus
Copy link

Akka version: 1.3.2
Platform: .net core 2.0

When client sending several packets at once (within a loop) like this:

a
bb
ccc
ddd
eeee
fffff
gggggg
hhhhhhh
iiiiiiii
jjjjjjjjj

Server will receive incorrect bytes like this:

i
jj
iii
jjjj

...

The length of the bytes remain correct but the content is replaced by other bytes.

Here code sample to reproduce result above:

Server:

using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.IO;
using System;
using System.Net;

class Program
{
    static void Main(string[] args)
    {
        var config = @"akka 
        {
            stdout-loglevel = INFO
            loglevel = DEBUG
            log-config-on-start = off
            io 
            {
                udp 
                {
                    trace-logging = on
                }
            }
            actor 
            {
                serializers 
                {
                    hyperion = ""Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion""
                }
                serialization-bindings 
                {
                    ""System.Object"" = hyperion
                }
            }
        }";

        using (var system = ActorSystem.Create("test", ConfigurationFactory.ParseString(config)))
        {
            system.ActorOf(Props.Create(() => new UdpServer(new IPEndPoint(IPAddress.Any, 32500))));

            Console.Read();
        }
    }

    public class UdpServer : ReceiveActor
    {
        private readonly ILoggingAdapter _log = Context.GetLogger();
        private readonly IPEndPoint _serverAddr;

        private IActorRef _server;

        public UdpServer(IPEndPoint serverAddr)
        {
            _serverAddr = serverAddr;

            Receive<Udp.Bound>(b => _server = Sender);
            Receive<Udp.Received>(rec => _log.Debug($"{rec.Sender} {rec.Data.ToString()}"));
        }

        protected override void PreStart()
        {
            Context.System.Udp().Tell(new Udp.Bind(Self, _serverAddr));
        }
    }
}

Client:

using Akka.Actor;
using Akka.Configuration;
using Akka.Event;
using Akka.IO;
using System;
using System.Net;
using System.Text;

class Program
{
    static void Main(string[] args)
    {
        var config = @"akka 
        {
            stdout-loglevel = INFO
            loglevel = DEBUG
            log-config-on-start = off
            io 
            {
                udp 
                {
                    trace-logging = on
                }
            }
            actor 
            {
                serializers 
                {
                    hyperion = ""Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion""
                }
                serialization-bindings 
                {
                    ""System.Object"" = hyperion
                }
            }
        }";

        using (var system = ActorSystem.Create("test", ConfigurationFactory.ParseString(config)))
        {
            var serverAddr = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 32500);
            var worker = system.ActorOf(Props.Create(() => new Worker(serverAddr)));

            while (true)
            {
                Console.ReadLine();
                worker.Tell(Send.Instance);
            }
        }
    }

    class Worker : ReceiveActor
    {
        private readonly IPEndPoint _serverAddr;
        private IActorRef _simpleSender;

        public Worker(IPEndPoint serverAddr)
        {
            Receive<Udp.SimpleSenderReady>(r =>
            {
                _simpleSender = Sender;
                Self.Tell(Send.Instance);
            });

            Receive<Send>(s =>
            {
                var start = 'a';
                for (int i = 0; i < 10; i++)
                {
                    var str = new string((char)(start + i), i + 1);
                    var data = Encoding.Default.GetBytes(str);
                    Context.GetLogger().Debug(str);

                    _simpleSender.Tell(Udp.Send.Create(ByteString.CopyFrom(data), _serverAddr));
                }
            });

            _serverAddr = serverAddr;
        }

        protected override void PreStart()
        {
            Context.System.Udp().Tell(Udp.SimpleSender.Instance);
        }
    }

    class Send
    {
        private Send() { }

        public static readonly Send Instance = new Send();
    }
}
@Horusiath
Copy link
Contributor

@blackclavus I haven't run your sample yet to investigate the issue, but are you sure, that this problem is not related to fact that UDP doesn't guarantee ordered delivery of datagrams? (in UDP receiver may get packets in different order than the one, in which sender has send them)

@blackclavus
Copy link
Author

I'm aware that UDP doesn't guarantee ordered delivery of datagrams, but I'm afraid that is not the case. The problem is: the content of ByteString seems to be replaced by newer data, but the length is still maintained. I also tested with another code with simple UDP Listener (without using Akka.IO.Udp) and it give correct ByteString.

Here is the code:

public class UdpListenerActor : ActorBase
    {
        private readonly UdpClient _udp;

        public UdpListenerActor(IPEndPoint serverAddr)
        {
            _udp = new UdpClient(serverAddr);
        }

        protected override void PreStart()
        {
            Self.Tell(Listen.Instance);
        }

        protected override bool Receive(object message)
        {
            if (message is Listen)
            {
                _udp.ReceiveAsync().PipeTo(Self, Self);
                return true;
            }

            if (message is Udp.Send send)
            {
                var datagram = send.Payload.ToArray();
                _udp.Send(datagram, datagram.Length, (IPEndPoint)send.Target);
                return true;
            }

            if (message is UdpReceiveResult result)
            {
                var receive = new UdpReceived(ByteString.FromBytes(result.Buffer), 
                      result.RemoteEndPoint);
                Context.System.EventStream.Publish(receive);
                Self.Tell(Listen.Instance);
                return true;
            }

            return false;
        }

        private class Listen
        {
            private Listen() { }

            public static readonly Listen Instance = new Listen();
        }
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants