diff --git a/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs b/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs index 7937a45a8e3..aa2bd67939f 100644 --- a/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs +++ b/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs @@ -38,9 +38,9 @@ namespace Nethermind.Runner.JsonRpc { public class Startup { - private static ReadOnlySpan _jsonOpeningBracket => new byte[] { (byte)'[' }; - private static ReadOnlySpan _jsonComma => new byte[] { (byte)',' }; - private static ReadOnlySpan _jsonClosingBracket => new byte[] { (byte)']' }; + private static readonly byte _jsonOpeningBracket = Convert.ToByte('['); + private static readonly byte _jsonComma = Convert.ToByte(','); + private static readonly byte _jsonClosingBracket = Convert.ToByte(']'); public void ConfigureServices(IServiceCollection services) { @@ -55,6 +55,7 @@ public void ConfigureServices(IServiceCollection services) services.Configure(options => { + options.AllowSynchronousIO = true; options.Limits.MaxRequestBodySize = jsonRpcConfig.MaxRequestBodySize; options.ConfigureHttpsDefaults(co => co.SslProtocols |= SslProtocols.Tls13); }); @@ -77,10 +78,10 @@ public void ConfigureServices(IServiceCollection services) public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpcProcessor jsonRpcProcessor, IJsonRpcService jsonRpcService, IJsonRpcLocalStats jsonRpcLocalStats, IJsonSerializer jsonSerializer) { - void SerializeTimeoutException(IJsonRpcService service, IBufferWriter resultStream) + ValueTask SerializeTimeoutException(IJsonRpcService service, Stream resultStream) { JsonRpcErrorResponse? error = service.GetErrorResponse(ErrorCodes.Timeout, "Request was canceled due to enabled timeout."); - jsonSerializer.Serialize(resultStream, error); + return jsonSerializer.SerializeAsync(resultStream, error); } if (env.IsDevelopment()) @@ -156,7 +157,7 @@ void SerializeTimeoutException(IJsonRpcService service, IBufferWriter resu JsonRpcErrorResponse? response = jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Authentication error"); ctx.Response.ContentType = "application/json"; ctx.Response.StatusCode = StatusCodes.Status403Forbidden; - jsonSerializer.Serialize(ctx.Response.BodyWriter, response); + await jsonSerializer.SerializeAsync(ctx.Response.Body, response); await ctx.Response.CompleteAsync(); return; } @@ -169,7 +170,9 @@ void SerializeTimeoutException(IJsonRpcService service, IBufferWriter resu await foreach (JsonRpcResult result in jsonRpcProcessor.ProcessAsync(request, jsonRpcContext)) { using Stream stream = jsonRpcConfig.BufferResponses ? RecyclableStream.GetStream("http") : null; - ICountingBufferWriter resultWriter = stream is not null ? new CountingStreamPipeWriter(stream) : new CountingPipeWriter(ctx.Response.BodyWriter); + Stream resultStream = jsonRpcConfig.BufferResponses ? stream : ctx.Response.Body; + + long responseSize = 0; try { ctx.Response.ContentType = "application/json"; @@ -177,7 +180,8 @@ void SerializeTimeoutException(IJsonRpcService service, IBufferWriter resu if (result.IsCollection) { - resultWriter.Write(_jsonOpeningBracket); + resultStream.WriteByte(_jsonOpeningBracket); + responseSize += 1; bool first = true; JsonRpcBatchResultAsyncEnumerator enumerator = result.BatchedResponses.GetAsyncEnumerator(CancellationToken.None); try @@ -189,17 +193,18 @@ void SerializeTimeoutException(IJsonRpcService service, IBufferWriter resu { if (!first) { - resultWriter.Write(_jsonComma); + resultStream.WriteByte(_jsonComma); + responseSize += 1; } first = false; - jsonSerializer.Serialize(resultWriter, entry.Response); + responseSize += await jsonSerializer.SerializeAsync(resultStream, entry.Response); _ = jsonRpcLocalStats.ReportCall(entry.Report); // We reached the limit and don't want to responded to more request in the batch - if (!jsonRpcContext.IsAuthenticated && resultWriter.WrittenCount > jsonRpcConfig.MaxBatchResponseBodySize) + if (!jsonRpcContext.IsAuthenticated && responseSize > jsonRpcConfig.MaxBatchResponseBodySize) { - if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {resultWriter.WrittenCount}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}"); + if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {responseSize}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}"); enumerator.IsStopped = true; } } @@ -210,42 +215,48 @@ void SerializeTimeoutException(IJsonRpcService service, IBufferWriter resu await enumerator.DisposeAsync(); } - resultWriter.Write(_jsonClosingBracket); + resultStream.WriteByte(_jsonClosingBracket); + responseSize += 1; } else { using (result.Response) { - jsonSerializer.Serialize(resultWriter, result.Response); + await jsonSerializer.SerializeAsync(resultStream, result.Response); } } - if (stream is not null) + if (jsonRpcConfig.BufferResponses) { - ctx.Response.ContentLength = resultWriter.WrittenCount; - stream.Seek(0, SeekOrigin.Begin); - await stream.CopyToAsync(ctx.Response.Body); + ctx.Response.ContentLength = responseSize = resultStream.Length; + resultStream.Seek(0, SeekOrigin.Begin); + await resultStream.CopyToAsync(ctx.Response.Body); } } catch (Exception e) when (e.InnerException is OperationCanceledException) { - SerializeTimeoutException(jsonRpcService, resultWriter); + responseSize = await SerializeTimeoutException(jsonRpcService, resultStream); } catch (OperationCanceledException) { - SerializeTimeoutException(jsonRpcService, resultWriter); + responseSize = await SerializeTimeoutException(jsonRpcService, resultStream); } finally { await ctx.Response.CompleteAsync(); + + if (jsonRpcConfig.BufferResponses) + { + await resultStream.DisposeAsync(); + } } long handlingTimeMicroseconds = stopwatch.ElapsedMicroseconds(); _ = jsonRpcLocalStats.ReportCall(result.IsCollection ? new RpcReport("# collection serialization #", handlingTimeMicroseconds, true) - : result.Report.Value, handlingTimeMicroseconds, resultWriter.WrittenCount); + : result.Report.Value, handlingTimeMicroseconds, responseSize); - Interlocked.Add(ref Metrics.JsonRpcBytesSentHttp, resultWriter.WrittenCount); + Interlocked.Add(ref Metrics.JsonRpcBytesSentHttp, responseSize); // There should be only one response because we don't expect multiple JSON tokens in the request break; diff --git a/src/Nethermind/Nethermind.Serialization.Json/EthereumJsonSerializer.cs b/src/Nethermind/Nethermind.Serialization.Json/EthereumJsonSerializer.cs index c4af68f59c5..86189982c5c 100644 --- a/src/Nethermind/Nethermind.Serialization.Json/EthereumJsonSerializer.cs +++ b/src/Nethermind/Nethermind.Serialization.Json/EthereumJsonSerializer.cs @@ -11,6 +11,8 @@ using System.Threading; using System.Threading.Tasks; +using Nethermind.Core.Collections; + namespace Nethermind.Serialization.Json { public class EthereumJsonSerializer : IJsonSerializer @@ -119,26 +121,21 @@ public long Serialize(Stream stream, T value, bool indented = false, bool lea JsonSerializer.Serialize(writer, value, indented ? JsonOptionsIndented : _jsonOptions); countingWriter.Complete(); - long outputCount = countingWriter.WrittenCount; + long outputCount = countingWriter.OutputCount; return outputCount; } public async ValueTask SerializeAsync(Stream stream, T value, bool indented = false, bool leaveOpen = true) { - var writer = GetPipeWriter(stream, leaveOpen); - Serialize(writer, value, indented); - await writer.CompleteAsync(); + var countingWriter = GetPipeWriter(stream, leaveOpen); + using var writer = new Utf8JsonWriter(countingWriter, new JsonWriterOptions() { SkipValidation = true, Indented = indented }); + JsonSerializer.Serialize(writer, value, indented ? JsonOptionsIndented : _jsonOptions); + await countingWriter.CompleteAsync(); - long outputCount = writer.WrittenCount; + long outputCount = countingWriter.OutputCount; return outputCount; } - public void Serialize(IBufferWriter writer, T value, bool indented = false) - { - using var jsonWriter = new Utf8JsonWriter(writer, new JsonWriterOptions() { SkipValidation = true, Indented = indented }); - JsonSerializer.Serialize(jsonWriter, value, indented ? JsonOptionsIndented : _jsonOptions); - } - public static void SerializeToStream(Stream stream, T value, bool indented = false) { JsonSerializer.Serialize(stream, value, indented ? JsonOptionsIndented : JsonOptions); diff --git a/src/Nethermind/Nethermind.Serialization.Json/IJsonSerializer.cs b/src/Nethermind/Nethermind.Serialization.Json/IJsonSerializer.cs index 6011c5e141b..16a5ee3168a 100644 --- a/src/Nethermind/Nethermind.Serialization.Json/IJsonSerializer.cs +++ b/src/Nethermind/Nethermind.Serialization.Json/IJsonSerializer.cs @@ -1,9 +1,7 @@ // SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited // SPDX-License-Identifier: LGPL-3.0-only -using System.Buffers; using System.IO; -using System.IO.Pipelines; using System.Threading.Tasks; namespace Nethermind.Serialization.Json @@ -15,6 +13,5 @@ public interface IJsonSerializer string Serialize(T value, bool indented = false); long Serialize(Stream stream, T value, bool indented = false, bool leaveOpen = true); ValueTask SerializeAsync(Stream stream, T value, bool indented = false, bool leaveOpen = true); - void Serialize(IBufferWriter writer, T value, bool indented = false); } } diff --git a/src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs b/src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs index da1ecd2a3fb..84963f4f4b4 100644 --- a/src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs +++ b/src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs @@ -16,35 +16,7 @@ namespace Nethermind.Serialization.Json; #nullable enable -public interface ICountingBufferWriter : IBufferWriter -{ - public long WrittenCount { get; } -} - -public sealed class CountingPipeWriter : ICountingBufferWriter -{ - private readonly PipeWriter _writer; - public long WrittenCount { get; private set; } - - public CountingPipeWriter(PipeWriter writer) - { - ArgumentNullException.ThrowIfNull(writer); - - _writer = writer; - } - - public void Advance(int count) - { - _writer.Advance(count); - WrittenCount += count; - } - - public Memory GetMemory(int sizeHint = 0) => _writer.GetMemory(sizeHint); - - public Span GetSpan(int sizeHint = 0) => _writer.GetSpan(sizeHint); -} - -public sealed class CountingStreamPipeWriter : PipeWriter, ICountingBufferWriter +internal sealed class CountingStreamPipeWriter : PipeWriter { internal const int InitialSegmentPoolSize = 4; // 16K internal const int MaxSegmentPoolSize = 256; // 1MB @@ -78,7 +50,7 @@ private CancellationTokenSource InternalTokenSource } } - public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions? options = null) + public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions options) { if (writingStream is null) { @@ -90,18 +62,18 @@ public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions? o } InnerStream = writingStream; - _minimumBufferSize = options?.MinimumBufferSize ?? 4096; - _pool = options?.Pool == MemoryPool.Shared ? null : options?.Pool; + _minimumBufferSize = options.MinimumBufferSize; + _pool = options.Pool == MemoryPool.Shared ? null : options.Pool; _maxPooledBufferSize = _pool?.MaxBufferSize ?? -1; _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize); - _leaveOpen = options?.LeaveOpen ?? true; + _leaveOpen = options.LeaveOpen; } /// /// Gets the inner stream that is being written to. /// public Stream InnerStream { get; } - public long WrittenCount { get; set; } + public long OutputCount { get; set; } /// public override void Advance(int bytes) @@ -114,7 +86,7 @@ public override void Advance(int bytes) _tailBytesBuffered += bytes; _bytesBuffered += bytes; _tailMemory = _tailMemory.Slice(bytes); - WrittenCount += bytes; + OutputCount += bytes; if (_bytesBuffered > _minimumBufferSize) {