Skip to content

Commit

Permalink
Fix/buffer reallocation (#4306)
Browse files Browse the repository at this point in the history
* serialization

* reuse buffer
  • Loading branch information
tanishqjasoria authored Jul 25, 2022
1 parent c4ce05e commit b52682a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,25 @@ public override void ChannelReadComplete(IChannelHandlerContext context)

public async void SendMsg(DiscoveryMsg discoveryMsg)
{
byte[] msgBytes;
IByteBuffer msgBuffer = Unpooled.Buffer(64);

try
{
if (_logger.IsTrace) _logger.Trace($"Sending message: {discoveryMsg}");
msgBytes = Serialize(discoveryMsg);
Serialize(discoveryMsg, msgBuffer);
}
catch (Exception e)
{
_logger.Error($"Error during serialization of the message: {discoveryMsg}", e);
return;
}

if (msgBytes.Length > 1280)
if (msgBuffer.ReadableBytes > 1280)
{
if (_logger.IsWarn) _logger.Warn($"Attempting to send message larger than 1280 bytes. This is out of spec and may not work for all client. Msg: ${discoveryMsg}");
}

IByteBuffer copiedBuffer = Unpooled.CopiedBuffer(msgBytes);
IAddressedEnvelope<IByteBuffer> packet = new DatagramPacket(copiedBuffer, discoveryMsg.FarAddress);
IAddressedEnvelope<IByteBuffer> packet = new DatagramPacket(msgBuffer, discoveryMsg.FarAddress);

await _channel.WriteAndFlushAsync(packet).ContinueWith(t =>
{
Expand All @@ -109,7 +108,7 @@ await _channel.WriteAndFlushAsync(packet).ContinueWith(t =>
}
});

Interlocked.Add(ref Metrics.DiscoveryBytesSent, msgBytes.Length);
Interlocked.Add(ref Metrics.DiscoveryBytesSent, msgBuffer.ReadableBytes);
}
protected override void ChannelRead0(IChannelHandlerContext ctx, DatagramPacket packet)
{
Expand Down Expand Up @@ -193,6 +192,33 @@ private byte[] Serialize(DiscoveryMsg msg)
};
}

private void Serialize(DiscoveryMsg msg, IByteBuffer msgBuffer)
{
switch (msg.MsgType)
{
case MsgType.Ping:
_msgSerializationService.ZeroSerialize((PingMsg)msg, msgBuffer);
break;
case MsgType.Pong:
_msgSerializationService.ZeroSerialize((PongMsg)msg, msgBuffer);
break;
case MsgType.FindNode :
_msgSerializationService.ZeroSerialize((FindNodeMsg)msg, msgBuffer);
break;
case MsgType.Neighbors :
_msgSerializationService.ZeroSerialize((NeighborsMsg)msg, msgBuffer);
break;
case MsgType.EnrRequest :
_msgSerializationService.ZeroSerialize((EnrRequestMsg)msg, msgBuffer);
break;
case MsgType.EnrResponse :
_msgSerializationService.ZeroSerialize((EnrResponseMsg)msg, msgBuffer);
break;
default:
throw new Exception($"Unsupported messageType: {msg.MsgType}");
}
}

private bool ValidateMsg(DiscoveryMsg msg, MsgType type, EndPoint address, IChannelHandlerContext ctx, DatagramPacket packet)
{
long timeToExpire = msg.ExpirationTime - _timestamper.UnixTime.SecondsLong;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace Nethermind.Network
public interface IMessageSerializationService
{
IByteBuffer ZeroSerialize<T>(T message) where T : MessageBase;
void ZeroSerialize<T>(T message, IByteBuffer msgBuffer) where T : MessageBase;
T Deserialize<T>(byte[] bytes) where T : MessageBase;
T Deserialize<T>(IByteBuffer buffer) where T : MessageBase;
void Register(Assembly assembly);
Expand Down
25 changes: 25 additions & 0 deletions src/Nethermind/Nethermind.Network/MessageSerializationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,31 @@ zeroMessageSerializer is IZeroInnerMessageSerializer<T> zeroInnerMessageSerializ
throw new InvalidOperationException($"No {nameof(IZeroMessageSerializer<T>)} registered for {typeof(T).Name}.");
}

public void ZeroSerialize<T>(T message, IByteBuffer msgBuffer) where T : MessageBase
{
void WriteAdaptivePacketType(in IByteBuffer buffer)
{
switch (message)
{
case HelloMessage:
break;
case DisconnectMessage:
break;
case P2PMessage p2PMessage:
buffer.WriteByte(p2PMessage.AdaptivePacketType);
break;
}
}

if (TryGetZeroSerializer(out IZeroMessageSerializer<T> zeroMessageSerializer))
{
WriteAdaptivePacketType(msgBuffer);
zeroMessageSerializer.Serialize(msgBuffer, message);
return;
}
throw new InvalidOperationException($"No {nameof(IZeroMessageSerializer<T>)} registered for {typeof(T).Name}.");
}

private bool TryGetZeroSerializer<T>(out IZeroMessageSerializer<T> serializer) where T : MessageBase
{
RuntimeTypeHandle typeHandle = typeof(T).TypeHandle;
Expand Down

0 comments on commit b52682a

Please sign in to comment.