Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Serialize Json direct to Http PipeWriter" (#6369) #6613

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ namespace Nethermind.Runner.JsonRpc
{
public class Startup
{
private static ReadOnlySpan<byte> _jsonOpeningBracket => new byte[] { (byte)'[' };
private static ReadOnlySpan<byte> _jsonComma => new byte[] { (byte)',' };
private static ReadOnlySpan<byte> _jsonClosingBracket => new byte[] { (byte)']' };
private static readonly byte _jsonOpeningBracket = Convert.ToByte('[');
private static readonly byte _jsonComma = Convert.ToByte(',');
private static readonly byte _jsonClosingBracket = Convert.ToByte(']');

public void ConfigureServices(IServiceCollection services)
{
Expand All @@ -55,6 +55,7 @@ public void ConfigureServices(IServiceCollection services)

services.Configure<KestrelServerOptions>(options =>
{
options.AllowSynchronousIO = true;
options.Limits.MaxRequestBodySize = jsonRpcConfig.MaxRequestBodySize;
options.ConfigureHttpsDefaults(co => co.SslProtocols |= SslProtocols.Tls13);
});
Expand All @@ -77,10 +78,10 @@ public void ConfigureServices(IServiceCollection services)

public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpcProcessor jsonRpcProcessor, IJsonRpcService jsonRpcService, IJsonRpcLocalStats jsonRpcLocalStats, IJsonSerializer jsonSerializer)
{
void SerializeTimeoutException(IJsonRpcService service, IBufferWriter<byte> resultStream)
ValueTask<long> SerializeTimeoutException(IJsonRpcService service, Stream resultStream)
{
JsonRpcErrorResponse? error = service.GetErrorResponse(ErrorCodes.Timeout, "Request was canceled due to enabled timeout.");
jsonSerializer.Serialize(resultStream, error);
return jsonSerializer.SerializeAsync(resultStream, error);
}

if (env.IsDevelopment())
Expand Down Expand Up @@ -156,7 +157,7 @@ void SerializeTimeoutException(IJsonRpcService service, IBufferWriter<byte> resu
JsonRpcErrorResponse? response = jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Authentication error");
ctx.Response.ContentType = "application/json";
ctx.Response.StatusCode = StatusCodes.Status403Forbidden;
jsonSerializer.Serialize(ctx.Response.BodyWriter, response);
await jsonSerializer.SerializeAsync(ctx.Response.Body, response);
await ctx.Response.CompleteAsync();
return;
}
Expand All @@ -169,15 +170,18 @@ void SerializeTimeoutException(IJsonRpcService service, IBufferWriter<byte> resu
await foreach (JsonRpcResult result in jsonRpcProcessor.ProcessAsync(request, jsonRpcContext))
{
using Stream stream = jsonRpcConfig.BufferResponses ? RecyclableStream.GetStream("http") : null;
ICountingBufferWriter resultWriter = stream is not null ? new CountingStreamPipeWriter(stream) : new CountingPipeWriter(ctx.Response.BodyWriter);
Stream resultStream = jsonRpcConfig.BufferResponses ? stream : ctx.Response.Body;

long responseSize = 0;
try
{
ctx.Response.ContentType = "application/json";
ctx.Response.StatusCode = GetStatusCode(result);

if (result.IsCollection)
{
resultWriter.Write(_jsonOpeningBracket);
resultStream.WriteByte(_jsonOpeningBracket);
responseSize += 1;
bool first = true;
JsonRpcBatchResultAsyncEnumerator enumerator = result.BatchedResponses.GetAsyncEnumerator(CancellationToken.None);
try
Expand All @@ -189,17 +193,18 @@ void SerializeTimeoutException(IJsonRpcService service, IBufferWriter<byte> resu
{
if (!first)
{
resultWriter.Write(_jsonComma);
resultStream.WriteByte(_jsonComma);
responseSize += 1;
}

first = false;
jsonSerializer.Serialize(resultWriter, entry.Response);
responseSize += await jsonSerializer.SerializeAsync(resultStream, entry.Response);
_ = jsonRpcLocalStats.ReportCall(entry.Report);

// We reached the limit and don't want to responded to more request in the batch
if (!jsonRpcContext.IsAuthenticated && resultWriter.WrittenCount > jsonRpcConfig.MaxBatchResponseBodySize)
if (!jsonRpcContext.IsAuthenticated && responseSize > jsonRpcConfig.MaxBatchResponseBodySize)
{
if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {resultWriter.WrittenCount}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}");
if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {responseSize}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}");
enumerator.IsStopped = true;
}
}
Expand All @@ -210,42 +215,48 @@ void SerializeTimeoutException(IJsonRpcService service, IBufferWriter<byte> resu
await enumerator.DisposeAsync();
}

resultWriter.Write(_jsonClosingBracket);
resultStream.WriteByte(_jsonClosingBracket);
responseSize += 1;
}
else
{
using (result.Response)
{
jsonSerializer.Serialize(resultWriter, result.Response);
await jsonSerializer.SerializeAsync(resultStream, result.Response);
}
}

if (stream is not null)
if (jsonRpcConfig.BufferResponses)
{
ctx.Response.ContentLength = resultWriter.WrittenCount;
stream.Seek(0, SeekOrigin.Begin);
await stream.CopyToAsync(ctx.Response.Body);
ctx.Response.ContentLength = responseSize = resultStream.Length;
resultStream.Seek(0, SeekOrigin.Begin);
await resultStream.CopyToAsync(ctx.Response.Body);
}
}
catch (Exception e) when (e.InnerException is OperationCanceledException)
{
SerializeTimeoutException(jsonRpcService, resultWriter);
responseSize = await SerializeTimeoutException(jsonRpcService, resultStream);
}
catch (OperationCanceledException)
{
SerializeTimeoutException(jsonRpcService, resultWriter);
responseSize = await SerializeTimeoutException(jsonRpcService, resultStream);
}
finally
{
await ctx.Response.CompleteAsync();

if (jsonRpcConfig.BufferResponses)
{
await resultStream.DisposeAsync();
}
}

long handlingTimeMicroseconds = stopwatch.ElapsedMicroseconds();
_ = jsonRpcLocalStats.ReportCall(result.IsCollection
? new RpcReport("# collection serialization #", handlingTimeMicroseconds, true)
: result.Report.Value, handlingTimeMicroseconds, resultWriter.WrittenCount);
: result.Report.Value, handlingTimeMicroseconds, responseSize);

Interlocked.Add(ref Metrics.JsonRpcBytesSentHttp, resultWriter.WrittenCount);
Interlocked.Add(ref Metrics.JsonRpcBytesSentHttp, responseSize);

// There should be only one response because we don't expect multiple JSON tokens in the request
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
using System.Threading;
using System.Threading.Tasks;

using Nethermind.Core.Collections;

namespace Nethermind.Serialization.Json
{
public class EthereumJsonSerializer : IJsonSerializer
Expand Down Expand Up @@ -119,26 +121,21 @@ public long Serialize<T>(Stream stream, T value, bool indented = false, bool lea
JsonSerializer.Serialize(writer, value, indented ? JsonOptionsIndented : _jsonOptions);
countingWriter.Complete();

long outputCount = countingWriter.WrittenCount;
long outputCount = countingWriter.OutputCount;
return outputCount;
}

public async ValueTask<long> SerializeAsync<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true)
{
var writer = GetPipeWriter(stream, leaveOpen);
Serialize(writer, value, indented);
await writer.CompleteAsync();
var countingWriter = GetPipeWriter(stream, leaveOpen);
using var writer = new Utf8JsonWriter(countingWriter, new JsonWriterOptions() { SkipValidation = true, Indented = indented });
JsonSerializer.Serialize(writer, value, indented ? JsonOptionsIndented : _jsonOptions);
await countingWriter.CompleteAsync();

long outputCount = writer.WrittenCount;
long outputCount = countingWriter.OutputCount;
return outputCount;
}

public void Serialize<T>(IBufferWriter<byte> writer, T value, bool indented = false)
{
using var jsonWriter = new Utf8JsonWriter(writer, new JsonWriterOptions() { SkipValidation = true, Indented = indented });
JsonSerializer.Serialize(jsonWriter, value, indented ? JsonOptionsIndented : _jsonOptions);
}

public static void SerializeToStream<T>(Stream stream, T value, bool indented = false)
{
JsonSerializer.Serialize(stream, value, indented ? JsonOptionsIndented : JsonOptions);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

namespace Nethermind.Serialization.Json
Expand All @@ -15,6 +13,5 @@ public interface IJsonSerializer
string Serialize<T>(T value, bool indented = false);
long Serialize<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true);
ValueTask<long> SerializeAsync<T>(Stream stream, T value, bool indented = false, bool leaveOpen = true);
void Serialize<T>(IBufferWriter<byte> writer, T value, bool indented = false);
}
}
42 changes: 7 additions & 35 deletions src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,7 @@ namespace Nethermind.Serialization.Json;

#nullable enable

public interface ICountingBufferWriter : IBufferWriter<byte>
{
public long WrittenCount { get; }
}

public sealed class CountingPipeWriter : ICountingBufferWriter
{
private readonly PipeWriter _writer;
public long WrittenCount { get; private set; }

public CountingPipeWriter(PipeWriter writer)
{
ArgumentNullException.ThrowIfNull(writer);

_writer = writer;
}

public void Advance(int count)
{
_writer.Advance(count);
WrittenCount += count;
}

public Memory<byte> GetMemory(int sizeHint = 0) => _writer.GetMemory(sizeHint);

public Span<byte> GetSpan(int sizeHint = 0) => _writer.GetSpan(sizeHint);
}

public sealed class CountingStreamPipeWriter : PipeWriter, ICountingBufferWriter
internal sealed class CountingStreamPipeWriter : PipeWriter
{
internal const int InitialSegmentPoolSize = 4; // 16K
internal const int MaxSegmentPoolSize = 256; // 1MB
Expand Down Expand Up @@ -78,7 +50,7 @@ private CancellationTokenSource InternalTokenSource
}
}

public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions? options = null)
public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions options)
{
if (writingStream is null)
{
Expand All @@ -90,18 +62,18 @@ public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions? o
}

InnerStream = writingStream;
_minimumBufferSize = options?.MinimumBufferSize ?? 4096;
_pool = options?.Pool == MemoryPool<byte>.Shared ? null : options?.Pool;
_minimumBufferSize = options.MinimumBufferSize;
_pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
_maxPooledBufferSize = _pool?.MaxBufferSize ?? -1;
_bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
_leaveOpen = options?.LeaveOpen ?? true;
_leaveOpen = options.LeaveOpen;
}

/// <summary>
/// Gets the inner stream that is being written to.
/// </summary>
public Stream InnerStream { get; }
public long WrittenCount { get; set; }
public long OutputCount { get; set; }

/// <inheritdoc />
public override void Advance(int bytes)
Expand All @@ -114,7 +86,7 @@ public override void Advance(int bytes)
_tailBytesBuffered += bytes;
_bytesBuffered += bytes;
_tailMemory = _tailMemory.Slice(bytes);
WrittenCount += bytes;
OutputCount += bytes;

if (_bytesBuffered > _minimumBufferSize)
{
Expand Down