Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/buffer reallocation #4306

Merged
merged 2 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,26 +80,25 @@ public override void ChannelReadComplete(IChannelHandlerContext context)

public async void SendMsg(DiscoveryMsg discoveryMsg)
{
byte[] msgBytes;
IByteBuffer msgBuffer = Unpooled.Buffer(64);

try
{
if (_logger.IsTrace) _logger.Trace($"Sending message: {discoveryMsg}");
msgBytes = Serialize(discoveryMsg);
Serialize(discoveryMsg, msgBuffer);
}
catch (Exception e)
{
_logger.Error($"Error during serialization of the message: {discoveryMsg}", e);
return;
}

if (msgBytes.Length > 1280)
if (msgBuffer.ReadableBytes > 1280)
{
if (_logger.IsWarn) _logger.Warn($"Attempting to send message larger than 1280 bytes. This is out of spec and may not work for all client. Msg: ${discoveryMsg}");
}

IByteBuffer copiedBuffer = Unpooled.CopiedBuffer(msgBytes);
IAddressedEnvelope<IByteBuffer> packet = new DatagramPacket(copiedBuffer, discoveryMsg.FarAddress);
IAddressedEnvelope<IByteBuffer> packet = new DatagramPacket(msgBuffer, discoveryMsg.FarAddress);

await _channel.WriteAndFlushAsync(packet).ContinueWith(t =>
{
Expand All @@ -109,7 +108,7 @@ await _channel.WriteAndFlushAsync(packet).ContinueWith(t =>
}
});

Interlocked.Add(ref Metrics.DiscoveryBytesSent, msgBytes.Length);
Interlocked.Add(ref Metrics.DiscoveryBytesSent, msgBuffer.ReadableBytes);
}
protected override void ChannelRead0(IChannelHandlerContext ctx, DatagramPacket packet)
{
Expand Down Expand Up @@ -193,6 +192,33 @@ private byte[] Serialize(DiscoveryMsg msg)
};
}

private void Serialize(DiscoveryMsg msg, IByteBuffer msgBuffer)
{
switch (msg.MsgType)
{
case MsgType.Ping:
_msgSerializationService.ZeroSerialize((PingMsg)msg, msgBuffer);
break;
case MsgType.Pong:
_msgSerializationService.ZeroSerialize((PongMsg)msg, msgBuffer);
break;
case MsgType.FindNode :
_msgSerializationService.ZeroSerialize((FindNodeMsg)msg, msgBuffer);
break;
case MsgType.Neighbors :
_msgSerializationService.ZeroSerialize((NeighborsMsg)msg, msgBuffer);
break;
case MsgType.EnrRequest :
_msgSerializationService.ZeroSerialize((EnrRequestMsg)msg, msgBuffer);
break;
case MsgType.EnrResponse :
_msgSerializationService.ZeroSerialize((EnrResponseMsg)msg, msgBuffer);
break;
default:
throw new Exception($"Unsupported messageType: {msg.MsgType}");
}
}

private bool ValidateMsg(DiscoveryMsg msg, MsgType type, EndPoint address, IChannelHandlerContext ctx, DatagramPacket packet)
{
long timeToExpire = msg.ExpirationTime - _timestamper.UnixTime.SecondsLong;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace Nethermind.Network
public interface IMessageSerializationService
{
IByteBuffer ZeroSerialize<T>(T message) where T : MessageBase;
void ZeroSerialize<T>(T message, IByteBuffer msgBuffer) where T : MessageBase;
T Deserialize<T>(byte[] bytes) where T : MessageBase;
T Deserialize<T>(IByteBuffer buffer) where T : MessageBase;
void Register(Assembly assembly);
Expand Down
25 changes: 25 additions & 0 deletions src/Nethermind/Nethermind.Network/MessageSerializationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,31 @@ zeroMessageSerializer is IZeroInnerMessageSerializer<T> zeroInnerMessageSerializ
throw new InvalidOperationException($"No {nameof(IZeroMessageSerializer<T>)} registered for {typeof(T).Name}.");
}

public void ZeroSerialize<T>(T message, IByteBuffer msgBuffer) where T : MessageBase
{
void WriteAdaptivePacketType(in IByteBuffer buffer)
{
switch (message)
{
case HelloMessage:
break;
case DisconnectMessage:
break;
case P2PMessage p2PMessage:
buffer.WriteByte(p2PMessage.AdaptivePacketType);
break;
}
}

if (TryGetZeroSerializer(out IZeroMessageSerializer<T> zeroMessageSerializer))
{
WriteAdaptivePacketType(msgBuffer);
zeroMessageSerializer.Serialize(msgBuffer, message);
return;
}
throw new InvalidOperationException($"No {nameof(IZeroMessageSerializer<T>)} registered for {typeof(T).Name}.");
}

private bool TryGetZeroSerializer<T>(out IZeroMessageSerializer<T> serializer) where T : MessageBase
{
RuntimeTypeHandle typeHandle = typeof(T).TypeHandle;
Expand Down