diff --git a/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs b/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs index d08c96b0253..4d8beea6e7f 100644 --- a/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs +++ b/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs @@ -37,9 +37,9 @@ namespace Nethermind.Runner.JsonRpc { public class Startup { - private static readonly byte _jsonOpeningBracket = Convert.ToByte('['); - private static readonly byte _jsonComma = Convert.ToByte(','); - private static readonly byte _jsonClosingBracket = Convert.ToByte(']'); + private static ReadOnlySpan _jsonOpeningBracket => new byte[] { (byte)'[' }; + private static ReadOnlySpan _jsonComma => new byte[] { (byte)',' }; + private static ReadOnlySpan _jsonClosingBracket => new byte[] { (byte)']' }; public void ConfigureServices(IServiceCollection services) { @@ -54,7 +54,6 @@ public void ConfigureServices(IServiceCollection services) services.Configure(options => { - options.AllowSynchronousIO = true; options.Limits.MaxRequestBodySize = jsonRpcConfig.MaxRequestBodySize; options.ConfigureHttpsDefaults(co => co.SslProtocols |= SslProtocols.Tls13); }); @@ -77,10 +76,10 @@ public void ConfigureServices(IServiceCollection services) public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IJsonRpcProcessor jsonRpcProcessor, IJsonRpcService jsonRpcService, IJsonRpcLocalStats jsonRpcLocalStats, IJsonSerializer jsonSerializer) { - ValueTask SerializeTimeoutException(IJsonRpcService service, Stream resultStream) + void SerializeTimeoutException(IJsonRpcService service, IBufferWriter resultStream) { JsonRpcErrorResponse? error = service.GetErrorResponse(ErrorCodes.Timeout, "Request was canceled due to enabled timeout."); - return jsonSerializer.SerializeAsync(resultStream, error); + jsonSerializer.Serialize(resultStream, error); } if (env.IsDevelopment()) @@ -156,7 +155,7 @@ ValueTask SerializeTimeoutException(IJsonRpcService service, Stream result JsonRpcErrorResponse? response = jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Authentication error"); ctx.Response.ContentType = "application/json"; ctx.Response.StatusCode = StatusCodes.Status403Forbidden; - await jsonSerializer.SerializeAsync(ctx.Response.Body, response); + jsonSerializer.Serialize(ctx.Response.BodyWriter, response); await ctx.Response.CompleteAsync(); return; } @@ -166,11 +165,12 @@ ValueTask SerializeTimeoutException(IJsonRpcService service, Stream result try { JsonRpcContext jsonRpcContext = JsonRpcContext.Http(jsonRpcUrl); + long totalResponseSize = 0; await foreach (JsonRpcResult result in jsonRpcProcessor.ProcessAsync(request, jsonRpcContext)) { - Stream resultStream = jsonRpcConfig.BufferResponses ? new MemoryStream() : ctx.Response.Body; + Stream stream = jsonRpcConfig.BufferResponses ? new MemoryStream() : null; + ICountingBufferWriter resultWriter = stream is not null ? new CountingStreamPipeWriter(stream) : new CountingPipeWriter(ctx.Response.BodyWriter); - long responseSize = 0; try { ctx.Response.ContentType = "application/json"; @@ -178,8 +178,7 @@ ValueTask SerializeTimeoutException(IJsonRpcService service, Stream result if (result.IsCollection) { - resultStream.WriteByte(_jsonOpeningBracket); - responseSize += 1; + resultWriter.Write(_jsonOpeningBracket); bool first = true; JsonRpcBatchResultAsyncEnumerator enumerator = result.BatchedResponses.GetAsyncEnumerator(CancellationToken.None); try @@ -191,18 +190,17 @@ ValueTask SerializeTimeoutException(IJsonRpcService service, Stream result { if (!first) { - resultStream.WriteByte(_jsonComma); - responseSize += 1; + resultWriter.Write(_jsonComma); } first = false; - responseSize += await jsonSerializer.SerializeAsync(resultStream, entry.Response); + jsonSerializer.Serialize(resultWriter, entry.Response); _ = jsonRpcLocalStats.ReportCall(entry.Report); // We reached the limit and don't want to responded to more request in the batch - if (!jsonRpcContext.IsAuthenticated && responseSize > jsonRpcConfig.MaxBatchResponseBodySize) + if (!jsonRpcContext.IsAuthenticated && totalResponseSize > jsonRpcConfig.MaxBatchResponseBodySize) { - if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {responseSize}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}"); + if (logger.IsWarn) logger.Warn($"The max batch response body size exceeded. The current response size {totalResponseSize}, and the config setting is JsonRpc.{nameof(jsonRpcConfig.MaxBatchResponseBodySize)} = {jsonRpcConfig.MaxBatchResponseBodySize}"); enumerator.IsStopped = true; } } @@ -213,48 +211,43 @@ ValueTask SerializeTimeoutException(IJsonRpcService service, Stream result await enumerator.DisposeAsync(); } - resultStream.WriteByte(_jsonClosingBracket); - responseSize += 1; + resultWriter.Write(_jsonClosingBracket); } else { using (result.Response) { - await jsonSerializer.SerializeAsync(resultStream, result.Response); + jsonSerializer.Serialize(resultWriter, result.Response); } } - if (jsonRpcConfig.BufferResponses) + if (stream is not null) { - ctx.Response.ContentLength = responseSize = resultStream.Length; - resultStream.Seek(0, SeekOrigin.Begin); - await resultStream.CopyToAsync(ctx.Response.Body); + ctx.Response.ContentLength = resultWriter.WrittenCount; + stream.Seek(0, SeekOrigin.Begin); + await stream.CopyToAsync(ctx.Response.Body); } } catch (Exception e) when (e.InnerException is OperationCanceledException) { - responseSize = await SerializeTimeoutException(jsonRpcService, resultStream); + SerializeTimeoutException(jsonRpcService, resultWriter); } catch (OperationCanceledException) { - responseSize = await SerializeTimeoutException(jsonRpcService, resultStream); + SerializeTimeoutException(jsonRpcService, resultWriter); } finally { await ctx.Response.CompleteAsync(); - - if (jsonRpcConfig.BufferResponses) - { - await resultStream.DisposeAsync(); - } } long handlingTimeMicroseconds = stopwatch.ElapsedMicroseconds(); _ = jsonRpcLocalStats.ReportCall(result.IsCollection ? new RpcReport("# collection serialization #", handlingTimeMicroseconds, true) - : result.Report.Value, handlingTimeMicroseconds, responseSize); + : result.Report.Value, handlingTimeMicroseconds, resultWriter.WrittenCount); - Interlocked.Add(ref Metrics.JsonRpcBytesSentHttp, responseSize); + totalResponseSize += resultWriter.WrittenCount; + Interlocked.Add(ref Metrics.JsonRpcBytesSentHttp, resultWriter.WrittenCount); // There should be only one response because we don't expect multiple JSON tokens in the request break; diff --git a/src/Nethermind/Nethermind.Serialization.Json/EthereumJsonSerializer.cs b/src/Nethermind/Nethermind.Serialization.Json/EthereumJsonSerializer.cs index 86189982c5c..c4af68f59c5 100644 --- a/src/Nethermind/Nethermind.Serialization.Json/EthereumJsonSerializer.cs +++ b/src/Nethermind/Nethermind.Serialization.Json/EthereumJsonSerializer.cs @@ -11,8 +11,6 @@ using System.Threading; using System.Threading.Tasks; -using Nethermind.Core.Collections; - namespace Nethermind.Serialization.Json { public class EthereumJsonSerializer : IJsonSerializer @@ -121,21 +119,26 @@ public long Serialize(Stream stream, T value, bool indented = false, bool lea JsonSerializer.Serialize(writer, value, indented ? JsonOptionsIndented : _jsonOptions); countingWriter.Complete(); - long outputCount = countingWriter.OutputCount; + long outputCount = countingWriter.WrittenCount; return outputCount; } public async ValueTask SerializeAsync(Stream stream, T value, bool indented = false, bool leaveOpen = true) { - var countingWriter = GetPipeWriter(stream, leaveOpen); - using var writer = new Utf8JsonWriter(countingWriter, new JsonWriterOptions() { SkipValidation = true, Indented = indented }); - JsonSerializer.Serialize(writer, value, indented ? JsonOptionsIndented : _jsonOptions); - await countingWriter.CompleteAsync(); + var writer = GetPipeWriter(stream, leaveOpen); + Serialize(writer, value, indented); + await writer.CompleteAsync(); - long outputCount = countingWriter.OutputCount; + long outputCount = writer.WrittenCount; return outputCount; } + public void Serialize(IBufferWriter writer, T value, bool indented = false) + { + using var jsonWriter = new Utf8JsonWriter(writer, new JsonWriterOptions() { SkipValidation = true, Indented = indented }); + JsonSerializer.Serialize(jsonWriter, value, indented ? JsonOptionsIndented : _jsonOptions); + } + public static void SerializeToStream(Stream stream, T value, bool indented = false) { JsonSerializer.Serialize(stream, value, indented ? JsonOptionsIndented : JsonOptions); diff --git a/src/Nethermind/Nethermind.Serialization.Json/IJsonSerializer.cs b/src/Nethermind/Nethermind.Serialization.Json/IJsonSerializer.cs index 16a5ee3168a..6011c5e141b 100644 --- a/src/Nethermind/Nethermind.Serialization.Json/IJsonSerializer.cs +++ b/src/Nethermind/Nethermind.Serialization.Json/IJsonSerializer.cs @@ -1,7 +1,9 @@ // SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited // SPDX-License-Identifier: LGPL-3.0-only +using System.Buffers; using System.IO; +using System.IO.Pipelines; using System.Threading.Tasks; namespace Nethermind.Serialization.Json @@ -13,5 +15,6 @@ public interface IJsonSerializer string Serialize(T value, bool indented = false); long Serialize(Stream stream, T value, bool indented = false, bool leaveOpen = true); ValueTask SerializeAsync(Stream stream, T value, bool indented = false, bool leaveOpen = true); + void Serialize(IBufferWriter writer, T value, bool indented = false); } } diff --git a/src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs b/src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs index f791f5d071b..18119af4f99 100644 --- a/src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs +++ b/src/Nethermind/Nethermind.Serialization.Json/StreamPipeWriter.cs @@ -16,7 +16,35 @@ namespace Nethermind.Serialization.Json; #nullable enable -internal sealed class CountingStreamPipeWriter : PipeWriter +public interface ICountingBufferWriter : IBufferWriter +{ + public long WrittenCount { get; } +} + +public sealed class CountingPipeWriter : ICountingBufferWriter +{ + private readonly PipeWriter _writer; + public long WrittenCount { get; private set; } + + public CountingPipeWriter(PipeWriter writer) + { + ArgumentNullException.ThrowIfNull(writer); + + _writer = writer; + } + + public void Advance(int count) + { + _writer.Advance(count); + WrittenCount += count; + } + + public Memory GetMemory(int sizeHint = 0) => _writer.GetMemory(sizeHint); + + public Span GetSpan(int sizeHint = 0) => _writer.GetSpan(sizeHint); +} + +public sealed class CountingStreamPipeWriter : PipeWriter, ICountingBufferWriter { internal const int InitialSegmentPoolSize = 4; // 16K internal const int MaxSegmentPoolSize = 256; // 1MB @@ -50,7 +78,7 @@ private CancellationTokenSource InternalTokenSource } } - public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions options) + public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions? options = null) { if (writingStream is null) { @@ -62,18 +90,18 @@ public CountingStreamPipeWriter(Stream writingStream, StreamPipeWriterOptions op } InnerStream = writingStream; - _minimumBufferSize = options.MinimumBufferSize; - _pool = options.Pool == MemoryPool.Shared ? null : options.Pool; + _minimumBufferSize = options?.MinimumBufferSize ?? 4096; + _pool = options?.Pool == MemoryPool.Shared ? null : options?.Pool; _maxPooledBufferSize = _pool?.MaxBufferSize ?? -1; _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize); - _leaveOpen = options.LeaveOpen; + _leaveOpen = options?.LeaveOpen ?? true; } /// /// Gets the inner stream that is being written to. /// public Stream InnerStream { get; } - public long OutputCount { get; set; } + public long WrittenCount { get; set; } /// public override void Advance(int bytes) @@ -86,7 +114,7 @@ public override void Advance(int bytes) _tailBytesBuffered += bytes; _bytesBuffered += bytes; _tailMemory = _tailMemory.Slice(bytes); - OutputCount += bytes; + WrittenCount += bytes; if (_bytesBuffered > _minimumBufferSize) {