Skip to content

Commit

Permalink
Serialize Json direct to Http PipeWriter (#6369)
Browse files Browse the repository at this point in the history
* Serialize Json direct to Http PipeWriter

* Whitespace
  • Loading branch information
benaadams authored Dec 13, 2023
1 parent 7a768f7 commit dd88db6
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 47 deletions.
57 changes: 25 additions & 32 deletions src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ namespace Nethermind.Runner.JsonRpc
{
public class Startup
{
private static readonly byte _jsonOpeningBracket = Convert.ToByte('[');
private static readonly byte _jsonComma = Convert.ToByte(',');
private static readonly byte _jsonClosingBracket = Convert.ToByte(']');
private static ReadOnlySpan<byte> _jsonOpeningBracket => new byte[] { (byte)'[' };
private static ReadOnlySpan<byte> _jsonComma => new byte[] { (byte)',' };
private static ReadOnlySpan<byte> _jsonClosingBracket => new byte[] { (byte)']' };

public void ConfigureServices(IServiceCollection services)
{
Expand All @@ -54,7 +54,6 @@ public void ConfigureServices(IServiceCollection services)

services.Configure<KestrelServerOptions>(options =>
{
options.AllowSynchronousIO = true;
options.Limits.MaxRequestBodySize = jsonRpcConfig.MaxRequestBodySize;
options.ConfigureHttpsDefaults(co => co.SslProtocols |= SslProtocols.Tls13);
});
Expand All @@ -77,10 +76,10 @@ public void ConfigureServices(IServiceCollection services)

public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpcProcessor jsonRpcProcessor, IJsonRpcService jsonRpcService, IJsonRpcLocalStats jsonRpcLocalStats, IJsonSerializer jsonSerializer)
{
ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream resultStream)
void SerializeTimeoutException(IJsonRpcService service, IBufferWriter<byte> resultStream)
{
JsonRpcErrorResponse? error = service.GetErrorResponse(ErrorCodes.Timeout, "Request was canceled due to enabled timeout.");
return jsonSerializer.SerializeAsync(resultStream, error);
jsonSerializer.Serialize(resultStream, error);
}

if (env.IsDevelopment())
Expand Down Expand Up @@ -156,7 +155,7 @@ ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream result
JsonRpcErrorResponse? response = jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Authentication error");
ctx.Response.ContentType = "application/json";
ctx.Response.StatusCode = StatusCodes.Status403Forbidden;
await jsonSerializer.SerializeAsync(ctx.Response.Body, response);
jsonSerializer.Serialize(ctx.Response.BodyWriter, response);
await ctx.Response.CompleteAsync();
return;
}
Expand All @@ -166,20 +165,20 @@ ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream result
try
{
JsonRpcContext jsonRpcContext = JsonRpcContext.Http(jsonRpcUrl);
long totalResponseSize = 0;
await foreach (JsonRpcResult result in jsonRpcProcessor.ProcessAsync(request, jsonRpcContext))
{
Stream resultStream = jsonRpcConfig.BufferResponses ? new MemoryStream() : ctx.Response.Body;
Stream stream = jsonRpcConfig.BufferResponses ? new MemoryStream() : null;
ICountingBufferWriter resultWriter = stream is not null ? new CountingStreamPipeWriter(stream) : new CountingPipeWriter(ctx.Response.BodyWriter);
long responseSize = 0;
try
{
ctx.Response.ContentType = "application/json";
ctx.Response.StatusCode = GetStatusCode(result);
if (result.IsCollection)
{
resultStream.WriteByte(_jsonOpeningBracket);
responseSize += 1;
resultWriter.Write(_jsonOpeningBracket);
bool first = true;
JsonRpcBatchResultAsyncEnumerator enumerator = result.BatchedResponses.GetAsyncEnumerator(CancellationToken.None);
try
Expand All @@ -191,18 +190,17 @@ ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream result
{
if (!first)
{
resultStream.WriteByte(_jsonComma);
responseSize += 1;
resultWriter.Write(_jsonComma);
}
first = false;
responseSize += await jsonSerializer.SerializeAsync(resultStream, entry.Response);
jsonSerializer.Serialize(resultWriter, entry.Response);
_ = jsonRpcLocalStats.ReportCall(entry.Report);
// We reached the limit and don't want to responded to more request in the batch
if (!jsonRpcContext.IsAuthenticated && responseSize > jsonRpcConfig.MaxBatchResponseBodySize)
if (!jsonRpcContext.IsAuthenticated && totalResponseSize > jsonRpcConfig.MaxBatchResponseBodySize)
{
if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {responseSize}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}");
if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {totalResponseSize}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}");
enumerator.IsStopped = true;
}
}
Expand All @@ -213,48 +211,43 @@ ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream result
await enumerator.DisposeAsync();
}
resultStream.WriteByte(_jsonClosingBracket);
responseSize += 1;
resultWriter.Write(_jsonClosingBracket);
}
else
{
using (result.Response)
{
await jsonSerializer.SerializeAsync(resultStream, result.Response);
jsonSerializer.Serialize(resultWriter, result.Response);
}
}
if (jsonRpcConfig.BufferResponses)
if (stream is not null)
{
ctx.Response.ContentLength = responseSize = resultStream.Length;
resultStream.Seek(0, SeekOrigin.Begin);
await resultStream.CopyToAsync(ctx.Response.Body);
ctx.Response.ContentLength = resultWriter.WrittenCount;
stream.Seek(0, SeekOrigin.Begin);
await stream.CopyToAsync(ctx.Response.Body);
}
}
catch (Exception e) when (e.InnerException is OperationCanceledException)
{
responseSize = await SerializeTimeoutException(jsonRpcService, resultStream);
SerializeTimeoutException(jsonRpcService, resultWriter);
}
catch (OperationCanceledException)
{
responseSize = await SerializeTimeoutException(jsonRpcService, resultStream);
SerializeTimeoutException(jsonRpcService, resultWriter);
}
finally
{
await ctx.Response.CompleteAsync();
if (jsonRpcConfig.BufferResponses)
{
await resultStream.DisposeAsync();
}
}
long handlingTimeMicroseconds = stopwatch.ElapsedMicroseconds();
_ = jsonRpcLocalStats.ReportCall(result.IsCollection
? new RpcReport("# collection serialization #", handlingTimeMicroseconds, true)
: result.Report.Value, handlingTimeMicroseconds, responseSize);
: result.Report.Value, handlingTimeMicroseconds, resultWriter.WrittenCount);
Interlocked.Add(ref Metrics.JsonRpcBytesSentHttp, responseSize);
totalResponseSize += resultWriter.WrittenCount;
Interlocked.Add(ref Metrics.JsonRpcBytesSentHttp, resultWriter.WrittenCount);
// There should be only one response because we don't expect multiple JSON tokens in the request
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
using System.Threading;
using System.Threading.Tasks;

using Nethermind.Core.Collections;

namespace Nethermind.Serialization.Json
{
public class EthereumJsonSerializer : IJsonSerializer
Expand Down Expand Up @@ -121,21 +119,26 @@ public long Serialize<T>(Stream stream, T value, bool indented = false, bool lea
JsonSerializer.Serialize(writer, value, indented ? JsonOptionsIndented : _jsonOptions);
countingWriter.Complete();

long outputCount = countingWriter.OutputCount;
long outputCount = countingWriter.WrittenCount;
return outputCount;
}

public async ValueTask<long> SerializeAsync<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true)
{
var countingWriter = GetPipeWriter(stream, leaveOpen);
using var writer = new Utf8JsonWriter(countingWriter, new JsonWriterOptions() { SkipValidation = true, Indented = indented });
JsonSerializer.Serialize(writer, value, indented ? JsonOptionsIndented : _jsonOptions);
await countingWriter.CompleteAsync();
var writer = GetPipeWriter(stream, leaveOpen);
Serialize(writer, value, indented);
await writer.CompleteAsync();

long outputCount = countingWriter.OutputCount;
long outputCount = writer.WrittenCount;
return outputCount;
}

public void Serialize<T>(IBufferWriter<byte> writer, T value, bool indented = false)
{
using var jsonWriter = new Utf8JsonWriter(writer, new JsonWriterOptions() { SkipValidation = true, Indented = indented });
JsonSerializer.Serialize(jsonWriter, value, indented ? JsonOptionsIndented : _jsonOptions);
}

public static void SerializeToStream<T>(Stream stream, T value, bool indented = false)
{
JsonSerializer.Serialize(stream, value, indented ? JsonOptionsIndented : JsonOptions);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

namespace Nethermind.Serialization.Json
Expand All @@ -13,5 +15,6 @@ public interface IJsonSerializer
string Serialize<T>(T value, bool indented = false);
long Serialize<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true);
ValueTask<long> SerializeAsync<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true);
void Serialize<T>(IBufferWriter<byte> writer, T value, bool indented = false);
}
}
42 changes: 35 additions & 7 deletions src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,35 @@ namespace Nethermind.Serialization.Json;

#nullable enable

internal sealed class CountingStreamPipeWriter : PipeWriter
public interface ICountingBufferWriter : IBufferWriter<byte>
{
public long WrittenCount { get; }
}

public sealed class CountingPipeWriter : ICountingBufferWriter
{
private readonly PipeWriter _writer;
public long WrittenCount { get; private set; }

public CountingPipeWriter(PipeWriter writer)
{
ArgumentNullException.ThrowIfNull(writer);

_writer = writer;
}

public void Advance(int count)
{
_writer.Advance(count);
WrittenCount += count;
}

public Memory<byte> GetMemory(int sizeHint = 0) => _writer.GetMemory(sizeHint);

public Span<byte> GetSpan(int sizeHint = 0) => _writer.GetSpan(sizeHint);
}

public sealed class CountingStreamPipeWriter : PipeWriter, ICountingBufferWriter
{
internal const int InitialSegmentPoolSize = 4; // 16K
internal const int MaxSegmentPoolSize = 256; // 1MB
Expand Down Expand Up @@ -50,7 +78,7 @@ private CancellationTokenSource InternalTokenSource
}
}

public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions options)
public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions? options = null)
{
if (writingStream is null)
{
Expand All @@ -62,18 +90,18 @@ public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions op
}

InnerStream = writingStream;
_minimumBufferSize = options.MinimumBufferSize;
_pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
_minimumBufferSize = options?.MinimumBufferSize ?? 4096;
_pool = options?.Pool == MemoryPool<byte>.Shared ? null : options?.Pool;
_maxPooledBufferSize = _pool?.MaxBufferSize ?? -1;
_bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
_leaveOpen = options.LeaveOpen;
_leaveOpen = options?.LeaveOpen ?? true;
}

/// <summary>
/// Gets the inner stream that is being written to.
/// </summary>
public Stream InnerStream { get; }
public long OutputCount { get; set; }
public long WrittenCount { get; set; }

/// <inheritdoc />
public override void Advance(int bytes)
Expand All @@ -86,7 +114,7 @@ public override void Advance(int bytes)
_tailBytesBuffered += bytes;
_bytesBuffered += bytes;
_tailMemory = _tailMemory.Slice(bytes);
OutputCount += bytes;
WrittenCount += bytes;

if (_bytesBuffered > _minimumBufferSize)
{
Expand Down

0 comments on commit dd88db6

Please sign in to comment.