diff --git a/src/Nethermind/Nethermind.Network.Discovery/NettyDiscoveryHandler.cs b/src/Nethermind/Nethermind.Network.Discovery/NettyDiscoveryHandler.cs index cd1c1e343dd..bbbb678244c 100644 --- a/src/Nethermind/Nethermind.Network.Discovery/NettyDiscoveryHandler.cs +++ b/src/Nethermind/Nethermind.Network.Discovery/NettyDiscoveryHandler.cs @@ -80,12 +80,12 @@ public override void ChannelReadComplete(IChannelHandlerContext context) public async void SendMsg(DiscoveryMsg discoveryMsg) { - byte[] msgBytes; + IByteBuffer msgBuffer = Unpooled.Buffer(64); try { if (_logger.IsTrace) _logger.Trace($"Sending message: {discoveryMsg}"); - msgBytes = Serialize(discoveryMsg); + Serialize(discoveryMsg, msgBuffer); } catch (Exception e) { @@ -93,13 +93,12 @@ public async void SendMsg(DiscoveryMsg discoveryMsg) return; } - if (msgBytes.Length > 1280) + if (msgBuffer.ReadableBytes > 1280) { if (_logger.IsWarn) _logger.Warn($"Attempting to send message larger than 1280 bytes. This is out of spec and may not work for all client. Msg: ${discoveryMsg}"); } - IByteBuffer copiedBuffer = Unpooled.CopiedBuffer(msgBytes); - IAddressedEnvelope packet = new DatagramPacket(copiedBuffer, discoveryMsg.FarAddress); + IAddressedEnvelope packet = new DatagramPacket(msgBuffer, discoveryMsg.FarAddress); await _channel.WriteAndFlushAsync(packet).ContinueWith(t => { @@ -109,7 +108,7 @@ await _channel.WriteAndFlushAsync(packet).ContinueWith(t => } }); - Interlocked.Add(ref Metrics.DiscoveryBytesSent, msgBytes.Length); + Interlocked.Add(ref Metrics.DiscoveryBytesSent, msgBuffer.ReadableBytes); } protected override void ChannelRead0(IChannelHandlerContext ctx, DatagramPacket packet) { @@ -193,6 +192,33 @@ private byte[] Serialize(DiscoveryMsg msg) }; } + private void Serialize(DiscoveryMsg msg, IByteBuffer msgBuffer) + { + switch (msg.MsgType) + { + case MsgType.Ping: + _msgSerializationService.ZeroSerialize((PingMsg)msg, msgBuffer); + break; + case MsgType.Pong: + _msgSerializationService.ZeroSerialize((PongMsg)msg, msgBuffer); + break; + case MsgType.FindNode : + _msgSerializationService.ZeroSerialize((FindNodeMsg)msg, msgBuffer); + break; + case MsgType.Neighbors : + _msgSerializationService.ZeroSerialize((NeighborsMsg)msg, msgBuffer); + break; + case MsgType.EnrRequest : + _msgSerializationService.ZeroSerialize((EnrRequestMsg)msg, msgBuffer); + break; + case MsgType.EnrResponse : + _msgSerializationService.ZeroSerialize((EnrResponseMsg)msg, msgBuffer); + break; + default: + throw new Exception($"Unsupported messageType: {msg.MsgType}"); + } + } + private bool ValidateMsg(DiscoveryMsg msg, MsgType type, EndPoint address, IChannelHandlerContext ctx, DatagramPacket packet) { long timeToExpire = msg.ExpirationTime - _timestamper.UnixTime.SecondsLong; diff --git a/src/Nethermind/Nethermind.Network/IMessageSerializationService.cs b/src/Nethermind/Nethermind.Network/IMessageSerializationService.cs index 1c82d83f606..a5d1094d29e 100644 --- a/src/Nethermind/Nethermind.Network/IMessageSerializationService.cs +++ b/src/Nethermind/Nethermind.Network/IMessageSerializationService.cs @@ -22,6 +22,7 @@ namespace Nethermind.Network public interface IMessageSerializationService { IByteBuffer ZeroSerialize(T message) where T : MessageBase; + void ZeroSerialize(T message, IByteBuffer msgBuffer) where T : MessageBase; T Deserialize(byte[] bytes) where T : MessageBase; T Deserialize(IByteBuffer buffer) where T : MessageBase; void Register(Assembly assembly); diff --git a/src/Nethermind/Nethermind.Network/MessageSerializationService.cs b/src/Nethermind/Nethermind.Network/MessageSerializationService.cs index cfae3f216bd..cd87c3cb9bf 100644 --- a/src/Nethermind/Nethermind.Network/MessageSerializationService.cs +++ b/src/Nethermind/Nethermind.Network/MessageSerializationService.cs @@ -123,6 +123,31 @@ zeroMessageSerializer is IZeroInnerMessageSerializer zeroInnerMessageSerializ throw new InvalidOperationException($"No {nameof(IZeroMessageSerializer)} registered for {typeof(T).Name}."); } + public void ZeroSerialize(T message, IByteBuffer msgBuffer) where T : MessageBase + { + void WriteAdaptivePacketType(in IByteBuffer buffer) + { + switch (message) + { + case HelloMessage: + break; + case DisconnectMessage: + break; + case P2PMessage p2PMessage: + buffer.WriteByte(p2PMessage.AdaptivePacketType); + break; + } + } + + if (TryGetZeroSerializer(out IZeroMessageSerializer zeroMessageSerializer)) + { + WriteAdaptivePacketType(msgBuffer); + zeroMessageSerializer.Serialize(msgBuffer, message); + return; + } + throw new InvalidOperationException($"No {nameof(IZeroMessageSerializer)} registered for {typeof(T).Name}."); + } + private bool TryGetZeroSerializer(out IZeroMessageSerializer serializer) where T : MessageBase { RuntimeTypeHandle typeHandle = typeof(T).TypeHandle;