Serialize Json direct to Http PipeWriter #6369

Merged (2 commits) on Dec 13, 2023
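
In summary, the change drops the Stream plus AllowSynchronousIO path and serializes JSON-RPC responses straight into an IBufferWriter<byte>: Kestrel's ctx.Response.BodyWriter, or a counting PipeWriter over a MemoryStream when responses are buffered. A minimal sketch of the core pattern the diff builds on (illustrative names, not the exact Nethermind code):

```csharp
using System.Buffers;
using System.Text.Json;

static class PipeWriterJson
{
    // Serialize a value as UTF-8 JSON directly into any IBufferWriter<byte>
    // (e.g. HttpResponse.BodyWriter) with no intermediate Stream or byte[] copy.
    public static void Serialize<T>(IBufferWriter<byte> output, T value, JsonSerializerOptions? options = null)
    {
        using var jsonWriter = new Utf8JsonWriter(output, new JsonWriterOptions { SkipValidation = true });
        JsonSerializer.Serialize(jsonWriter, value, options);
        // Utf8JsonWriter flushes its remaining buffered bytes into 'output' on Dispose.
    }
}
```

Because the serializer targets the response pipe directly, the AllowSynchronousIO workaround and the per-request MemoryStream become unnecessary for unbuffered responses, which is what the Startup.cs changes below remove.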
57 changes: 25 additions & 32 deletions src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs
@@ -37,9 +37,9 @@ namespace Nethermind.Runner.JsonRpc
{
public class Startup
{
private static readonly byte _jsonOpeningBracket = Convert.ToByte('[');
private static readonly byte _jsonComma = Convert.ToByte(',');
private static readonly byte _jsonClosingBracket = Convert.ToByte(']');
private static ReadOnlySpan<byte> _jsonOpeningBracket => new byte[] { (byte)'[' };
private static ReadOnlySpan<byte> _jsonComma => new byte[] { (byte)',' };
private static ReadOnlySpan<byte> _jsonClosingBracket => new byte[] { (byte)']' };

public void ConfigureServices(IServiceCollection services)
{
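
The single-byte fields become static ReadOnlySpan<byte> properties: the compiler places the constant bytes in the assembly's static data, so each property access returns a span over that data without allocating an array, and the span form matches the IBufferWriter<byte>.Write(ReadOnlySpan<byte>) calls used later in the handler. A small illustration of the pattern (not additional PR code):

```csharp
using System;
using System.Buffers;

static class JsonDelimiters
{
    // The byte[] initializer of a static ReadOnlySpan<byte> property is compiled
    // into static data; reading the property allocates nothing.
    public static ReadOnlySpan<byte> Comma => new byte[] { (byte)',' };
}

// Consumed through the span-based IBufferWriter<byte> extension, as in the handler below:
//     resultWriter.Write(JsonDelimiters.Comma);
```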
@@ -54,7 +54,6 @@ public void ConfigureServices(IServiceCollection services)

services.Configure<KestrelServerOptions>(options =>
{
options.AllowSynchronousIO = true;
options.Limits.MaxRequestBodySize = jsonRpcConfig.MaxRequestBodySize;
options.ConfigureHttpsDefaults(co => co.SslProtocols |= SslProtocols.Tls13);
});
@@ -77,10 +76,10 @@ public void ConfigureServices(IServiceCollection services)

public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpcProcessor jsonRpcProcessor, IJsonRpcService jsonRpcService, IJsonRpcLocalStats jsonRpcLocalStats, IJsonSerializer jsonSerializer)
{
ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream resultStream)
void SerializeTimeoutException(IJsonRpcService service, IBufferWriter<byte> resultStream)
{
JsonRpcErrorResponse? error = service.GetErrorResponse(ErrorCodes.Timeout, "Request was canceled due to enabled timeout.");
return jsonSerializer.SerializeAsync(resultStream, error);
jsonSerializer.Serialize(resultStream, error);
}

if (env.IsDevelopment())
@@ -156,7 +155,7 @@ ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream result
JsonRpcErrorResponse? response = jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Authentication error");
ctx.Response.ContentType = "application/json";
ctx.Response.StatusCode = StatusCodes.Status403Forbidden;
await jsonSerializer.SerializeAsync(ctx.Response.Body, response);
jsonSerializer.Serialize(ctx.Response.BodyWriter, response);
await ctx.Response.CompleteAsync();
return;
}
@@ -166,20 +165,20 @@ ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream result
try
{
JsonRpcContext jsonRpcContext = JsonRpcContext.Http(jsonRpcUrl);
long totalResponseSize = 0;
await foreach (JsonRpcResult result in jsonRpcProcessor.ProcessAsync(request, jsonRpcContext))
{
Stream resultStream = jsonRpcConfig.BufferResponses ? new MemoryStream() : ctx.Response.Body;
Stream stream = jsonRpcConfig.BufferResponses ? new MemoryStream() : null;
ICountingBufferWriter resultWriter = stream is not null ? new CountingStreamPipeWriter(stream) : new CountingPipeWriter(ctx.Response.BodyWriter);

long responseSize = 0;
try
{
ctx.Response.ContentType = "application/json";
ctx.Response.StatusCode = GetStatusCode(result);

if (result.IsCollection)
{
resultStream.WriteByte(_jsonOpeningBracket);
responseSize += 1;
resultWriter.Write(_jsonOpeningBracket);
bool first = true;
JsonRpcBatchResultAsyncEnumerator enumerator = result.BatchedResponses.GetAsyncEnumerator(CancellationToken.None);
try
@@ -191,18 +190,17 @@ ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream result
{
if (!first)
{
resultStream.WriteByte(_jsonComma);
responseSize += 1;
resultWriter.Write(_jsonComma);
}

first = false;
responseSize += await jsonSerializer.SerializeAsync(resultStream, entry.Response);
jsonSerializer.Serialize(resultWriter, entry.Response);
_ = jsonRpcLocalStats.ReportCall(entry.Report);

// We reached the limit and don't want to respond to more requests in the batch
if (!jsonRpcContext.IsAuthenticated && responseSize > jsonRpcConfig.MaxBatchResponseBodySize)
if (!jsonRpcContext.IsAuthenticated && totalResponseSize > jsonRpcConfig.MaxBatchResponseBodySize)
{
if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {responseSize}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}");
if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {totalResponseSize}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}");
enumerator.IsStopped = true;
}
}
@@ -213,48 +211,43 @@ ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream result
await enumerator.DisposeAsync();
}

resultStream.WriteByte(_jsonClosingBracket);
responseSize += 1;
resultWriter.Write(_jsonClosingBracket);
}
else
{
using (result.Response)
{
await jsonSerializer.SerializeAsync(resultStream, result.Response);
jsonSerializer.Serialize(resultWriter, result.Response);
}
}

if (jsonRpcConfig.BufferResponses)
if (stream is not null)
{
ctx.Response.ContentLength = responseSize = resultStream.Length;
resultStream.Seek(0, SeekOrigin.Begin);
await resultStream.CopyToAsync(ctx.Response.Body);
ctx.Response.ContentLength = resultWriter.WrittenCount;
stream.Seek(0, SeekOrigin.Begin);
await stream.CopyToAsync(ctx.Response.Body);
}
}
catch (Exception e) when (e.InnerException is OperationCanceledException)
{
responseSize = await SerializeTimeoutException(jsonRpcService, resultStream);
SerializeTimeoutException(jsonRpcService, resultWriter);
}
catch (OperationCanceledException)
{
responseSize = await SerializeTimeoutException(jsonRpcService, resultStream);
SerializeTimeoutException(jsonRpcService, resultWriter);
}
finally
{
await ctx.Response.CompleteAsync();

if (jsonRpcConfig.BufferResponses)
{
await resultStream.DisposeAsync();
}
}

long handlingTimeMicroseconds = stopwatch.ElapsedMicroseconds();
_ = jsonRpcLocalStats.ReportCall(result.IsCollection
? new RpcReport("# collection serialization #", handlingTimeMicroseconds, true)
: result.Report.Value, handlingTimeMicroseconds, responseSize);
: result.Report.Value, handlingTimeMicroseconds, resultWriter.WrittenCount);

Interlocked.Add(ref Metrics.JsonRpcBytesSentHttp, responseSize);
totalResponseSize += resultWriter.WrittenCount;
Interlocked.Add(ref Metrics.JsonRpcBytesSentHttp, resultWriter.WrittenCount);

// There should be only one response because we don't expect multiple JSON tokens in the request
break;
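
Net effect of the Startup.cs changes: every response is written through an ICountingBufferWriter (a CountingStreamPipeWriter over a MemoryStream when JsonRpc.BufferResponses is enabled, otherwise a CountingPipeWriter over ctx.Response.BodyWriter), and its WrittenCount replaces the hand-maintained responseSize for Content-Length, per-call stats and the JsonRpcBytesSentHttp metric. A condensed sketch of that flow, simplified from the diff above (timeout handling, stats reporting and the batch size limit are omitted; identifiers such as the _json* delimiters are the ones introduced in the diff, the method shape is illustrative):

```csharp
static async Task WriteJsonRpcResultAsync(
    HttpContext ctx, JsonRpcResult result, IJsonSerializer jsonSerializer, bool bufferResponses)
{
    MemoryStream? buffer = bufferResponses ? new MemoryStream() : null;
    ICountingBufferWriter resultWriter = buffer is not null
        ? new CountingStreamPipeWriter(buffer)              // buffered: measure first, replay later
        : new CountingPipeWriter(ctx.Response.BodyWriter);  // unbuffered: straight to the response pipe

    if (result.IsCollection)
    {
        resultWriter.Write(_jsonOpeningBracket);
        bool first = true;
        await foreach (var entry in result.BatchedResponses)
        {
            if (!first) resultWriter.Write(_jsonComma);
            first = false;
            jsonSerializer.Serialize(resultWriter, entry.Response);
        }
        resultWriter.Write(_jsonClosingBracket);
    }
    else
    {
        jsonSerializer.Serialize(resultWriter, result.Response);
    }

    if (buffer is not null)
    {
        // Buffered mode knows the final size up front, so it can set Content-Length.
        ctx.Response.ContentLength = resultWriter.WrittenCount;
        buffer.Seek(0, SeekOrigin.Begin);
        await buffer.CopyToAsync(ctx.Response.Body);
    }

    await ctx.Response.CompleteAsync();
}
```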
src/Nethermind/Nethermind.Serialization.Json/EthereumJsonSerializer.cs
@@ -11,8 +11,6 @@
using System.Threading;
using System.Threading.Tasks;

using Nethermind.Core.Collections;

namespace Nethermind.Serialization.Json
{
public class EthereumJsonSerializer : IJsonSerializer
@@ -121,21 +119,26 @@ public long Serialize<T>(Stream stream, T value, bool indented = false, bool lea
JsonSerializer.Serialize(writer, value, indented ? JsonOptionsIndented : _jsonOptions);
countingWriter.Complete();

long outputCount = countingWriter.OutputCount;
long outputCount = countingWriter.WrittenCount;
return outputCount;
}

public async ValueTask<long> SerializeAsync<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true)
{
var countingWriter = GetPipeWriter(stream, leaveOpen);
using var writer = new Utf8JsonWriter(countingWriter, new JsonWriterOptions() { SkipValidation = true, Indented = indented });
JsonSerializer.Serialize(writer, value, indented ? JsonOptionsIndented : _jsonOptions);
await countingWriter.CompleteAsync();
var writer = GetPipeWriter(stream, leaveOpen);
Serialize(writer, value, indented);
await writer.CompleteAsync();

long outputCount = countingWriter.OutputCount;
long outputCount = writer.WrittenCount;
return outputCount;
}

public void Serialize<T>(IBufferWriter<byte> writer, T value, bool indented = false)
{
using var jsonWriter = new Utf8JsonWriter(writer, new JsonWriterOptions() { SkipValidation = true, Indented = indented });
JsonSerializer.Serialize(jsonWriter, value, indented ? JsonOptionsIndented : _jsonOptions);
}

public static void SerializeToStream<T>(Stream stream, T value, bool indented = false)
{
JsonSerializer.Serialize(stream, value, indented ? JsonOptionsIndented : JsonOptions);
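
With the new IBufferWriter<byte> overload in place, SerializeAsync is reduced to a wrapper: it builds a counting PipeWriter over the target Stream, calls the synchronous overload, completes the writer and returns WrittenCount. A hedged usage sketch (assumes EthereumJsonSerializer's parameterless constructor and a hypothetical payload; in the Runner the writer would be ctx.Response.BodyWriter):

```csharp
using System.IO;
using System.IO.Pipelines;
using Nethermind.Serialization.Json;

var payload = new { jsonrpc = "2.0", id = 1, result = "0x0" };  // hypothetical payload
IJsonSerializer serializer = new EthereumJsonSerializer();

// New synchronous path: write straight into any IBufferWriter<byte>.
var pipe = new Pipe();
serializer.Serialize(pipe.Writer, payload);
await pipe.Writer.CompleteAsync();

// Stream callers are unchanged: SerializeAsync now wraps the stream in a
// CountingStreamPipeWriter, delegates to the overload above, and returns the byte count.
using var ms = new MemoryStream();
long bytesWritten = await serializer.SerializeAsync(ms, payload);
```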
src/Nethermind/Nethermind.Serialization.Json/IJsonSerializer.cs
@@ -1,7 +1,9 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

namespace Nethermind.Serialization.Json
@@ -13,5 +15,6 @@ public interface IJsonSerializer
string Serialize<T>(T value, bool indented = false);
long Serialize<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true);
ValueTask<long> SerializeAsync<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true);
void Serialize<T>(IBufferWriter<byte> writer, T value, bool indented = false);
}
}
42 changes: 35 additions & 7 deletions src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs
@@ -16,7 +35,7 @@ namespace Nethermind.Serialization.Json;

#nullable enable

internal sealed class CountingStreamPipeWriter : PipeWriter
public interface ICountingBufferWriter : IBufferWriter<byte>
{
public long WrittenCount { get; }
}

public sealed class CountingPipeWriter : ICountingBufferWriter
{
private readonly PipeWriter _writer;
public long WrittenCount { get; private set; }

public CountingPipeWriter(PipeWriter writer)
{
ArgumentNullException.ThrowIfNull(writer);

_writer = writer;
}

public void Advance(int count)
{
_writer.Advance(count);
WrittenCount += count;
}

public Memory<byte> GetMemory(int sizeHint = 0) => _writer.GetMemory(sizeHint);

public Span<byte> GetSpan(int sizeHint = 0) => _writer.GetSpan(sizeHint);
}

public sealed class CountingStreamPipeWriter : PipeWriter, ICountingBufferWriter
{
internal const int InitialSegmentPoolSize = 4; // 16K
internal const int MaxSegmentPoolSize = 256; // 1MB
@@ -50,7 +78,7 @@ private CancellationTokenSource InternalTokenSource
}
}

public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions options)
public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions? options = null)
{
if (writingStream is null)
{
@@ -62,18 +90,18 @@ public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions op
}

InnerStream = writingStream;
_minimumBufferSize = options.MinimumBufferSize;
_pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
_minimumBufferSize = options?.MinimumBufferSize ?? 4096;
_pool = options?.Pool == MemoryPool<byte>.Shared ? null : options?.Pool;
_maxPooledBufferSize = _pool?.MaxBufferSize ?? -1;
_bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
_leaveOpen = options.LeaveOpen;
_leaveOpen = options?.LeaveOpen ?? true;
}

/// <summary>
/// Gets the inner stream that is being written to.
/// </summary>
public Stream InnerStream { get; }
public long OutputCount { get; set; }
public long WrittenCount { get; set; }

/// <inheritdoc />
public override void Advance(int bytes)
@@ -86,7 +114,7 @@ public override void Advance(int bytes)
_tailBytesBuffered += bytes;
_bytesBuffered += bytes;
_tailMemory = _tailMemory.Slice(bytes);
OutputCount += bytes;
WrittenCount += bytes;

if (_bytesBuffered > _minimumBufferSize)
{
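
Finally, the CountingStreamPipeWriter constructor makes StreamPipeWriterOptions optional, so the Startup code can wrap its buffering MemoryStream directly; when options is null the writer falls back to a 4096-byte minimum buffer, the shared pool and leaveOpen = true, as shown in the diff above. A small usage sketch under those assumptions:

```csharp
using System.Buffers;
using System.IO;
using Nethermind.Serialization.Json;

using var buffer = new MemoryStream();

// No StreamPipeWriterOptions needed any more; the defaults from the diff above apply.
var writer = new CountingStreamPipeWriter(buffer);

writer.Write("{}"u8);               // IBufferWriter<byte> extension, two bytes
await writer.FlushAsync();          // pushes the buffered segment into the MemoryStream
long written = writer.WrittenCount; // 2; the value Startup uses for Content-Length and metrics
```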