Skip to content

Commit

Permalink
Ensure ReadBufferState resets any BOM offsets every time the buffer i…
Browse files Browse the repository at this point in the history
…s advanced. (#78221)

* Ensure the async reader state resets the BOM offset in every AdvanceBuffer() call.

* Add BOM insertion to async serialization stress testing

* Update src/libraries/System.Text.Json/tests/System.Text.Json.Tests/Serialization/JsonSerializerWrapper.Reflection.cs
  • Loading branch information
eiriktsarpalis committed Nov 11, 2022
1 parent 0eb7d71 commit 2b1f57e
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public void AdvanceBuffer(int bytesConsumed)
// Copy the unprocessed data to the new buffer while shifting the processed bytes.
Buffer.BlockCopy(oldBuffer, _offset + bytesConsumed, newBuffer, 0, _count);
_buffer = newBuffer;
_offset = 0;
_maxCount = _count;

// Clear and return the old buffer
Expand All @@ -133,9 +132,10 @@ public void AdvanceBuffer(int bytesConsumed)
{
// Shift the processed bytes to the beginning of buffer to make more room.
Buffer.BlockCopy(_buffer, _offset + bytesConsumed, _buffer, 0, _count);
_offset = 0;
}
}

_offset = 0;
}

private void ProcessReadBytes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,31 @@ public static void InvalidJsonShouldFailAtAnyPosition_Sequence(
Assert.Equal(expectedFailure.Column, ex.BytePositionInLine);
}

[Fact]
public static async Task BomHandlingRegressionTest()
{
byte[] utf8Bom = Encoding.UTF8.GetPreamble();
byte[] json = """{ "Value" : "Hello" }"""u8.ToArray();

using var stream = new MemoryStream();
stream.Write(utf8Bom, 0, utf8Bom.Length);
stream.Write(json, 0, json.Length);
stream.Position = 0;

var options = new JsonSerializerOptions
{
DefaultBufferSize = 32
};

Test result = await JsonSerializer.DeserializeAsync<Test>(stream, options);
Assert.Equal("Hello", result.Value);
}

private class Test
{
public string Value { get; set; }
}

private class Chunk : ReadOnlySequenceSegment<byte>
{
public Chunk(string json, int firstSegmentLength)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text.Json.Nodes;
Expand All @@ -15,9 +16,9 @@ public abstract partial class JsonSerializerWrapper
public static JsonSerializerWrapper SpanSerializer { get; } = new SpanSerializerWrapper();
public static JsonSerializerWrapper StringSerializer { get; } = new StringSerializerWrapper();
public static StreamingJsonSerializerWrapper AsyncStreamSerializer { get; } = new AsyncStreamSerializerWrapper();
public static StreamingJsonSerializerWrapper AsyncStreamSerializerWithSmallBuffer { get; } = new AsyncStreamSerializerWrapper(forceSmallBufferInOptions: true);
public static StreamingJsonSerializerWrapper AsyncStreamSerializerWithSmallBuffer { get; } = new AsyncStreamSerializerWrapper(forceSmallBufferInOptions: true, forceBomInsertions: true);
public static StreamingJsonSerializerWrapper SyncStreamSerializer { get; } = new SyncStreamSerializerWrapper();
public static StreamingJsonSerializerWrapper SyncStreamSerializerWithSmallBuffer { get; } = new SyncStreamSerializerWrapper(forceSmallBufferInOptions: true);
public static StreamingJsonSerializerWrapper SyncStreamSerializerWithSmallBuffer { get; } = new SyncStreamSerializerWrapper(forceSmallBufferInOptions: true, forceBomInsertions: true);
public static JsonSerializerWrapper ReaderWriterSerializer { get; } = new ReaderWriterSerializerWrapper();
public static JsonSerializerWrapper DocumentSerializer { get; } = new DocumentSerializerWrapper();
public static JsonSerializerWrapper ElementSerializer { get; } = new ElementSerializerWrapper();
Expand Down Expand Up @@ -120,17 +121,22 @@ public override Task<object> DeserializeWrapper(string json, Type type, JsonSeri
private class AsyncStreamSerializerWrapper : StreamingJsonSerializerWrapper
{
private readonly bool _forceSmallBufferInOptions;
private readonly bool _forceBomInsertions;

public override bool IsAsyncSerializer => true;

public AsyncStreamSerializerWrapper(bool forceSmallBufferInOptions = false)
public AsyncStreamSerializerWrapper(bool forceSmallBufferInOptions = false, bool forceBomInsertions = false)
{
_forceSmallBufferInOptions = forceSmallBufferInOptions;
_forceBomInsertions = forceBomInsertions;
}

private JsonSerializerOptions? ResolveOptionsInstance(JsonSerializerOptions? options)
=> _forceSmallBufferInOptions ? JsonSerializerOptionsSmallBufferMapper.ResolveOptionsInstanceWithSmallBuffer(options) : options;

private Stream ResolveReadStream(Stream stream)
=> stream is not null && _forceBomInsertions ? new Utf8BomInsertingStream(stream) : stream;

public override Task SerializeWrapper<T>(Stream utf8Json, T value, JsonSerializerOptions options = null)
{
return JsonSerializer.SerializeAsync<T>(utf8Json, value, ResolveOptionsInstance(options));
Expand All @@ -153,38 +159,43 @@ public override Task SerializeWrapper(Stream stream, object value, Type inputTyp

public override async Task<T> DeserializeWrapper<T>(Stream utf8Json, JsonSerializerOptions options = null)
{
return await JsonSerializer.DeserializeAsync<T>(utf8Json, ResolveOptionsInstance(options));
return await JsonSerializer.DeserializeAsync<T>(ResolveReadStream(utf8Json), ResolveOptionsInstance(options));
}

public override async Task<object> DeserializeWrapper(Stream utf8Json, Type returnType, JsonSerializerOptions options = null)
{
return await JsonSerializer.DeserializeAsync(utf8Json, returnType, ResolveOptionsInstance(options));
return await JsonSerializer.DeserializeAsync(ResolveReadStream(utf8Json), returnType, ResolveOptionsInstance(options));
}

public override async Task<T> DeserializeWrapper<T>(Stream utf8Json, JsonTypeInfo<T> jsonTypeInfo)
{
return await JsonSerializer.DeserializeAsync<T>(utf8Json, jsonTypeInfo);
return await JsonSerializer.DeserializeAsync<T>(ResolveReadStream(utf8Json), jsonTypeInfo);
}

public override async Task<object> DeserializeWrapper(Stream utf8Json, Type returnType, JsonSerializerContext context)
{
return await JsonSerializer.DeserializeAsync(utf8Json, returnType, context);
return await JsonSerializer.DeserializeAsync(ResolveReadStream(utf8Json), returnType, context);
}
}

private class SyncStreamSerializerWrapper : StreamingJsonSerializerWrapper
{
private readonly bool _forceSmallBufferInOptions;
private readonly bool _forceBomInsertions;

public override bool IsAsyncSerializer => false;

public SyncStreamSerializerWrapper(bool forceSmallBufferInOptions = false)
public SyncStreamSerializerWrapper(bool forceSmallBufferInOptions = false, bool forceBomInsertions = false)
{
_forceSmallBufferInOptions = forceSmallBufferInOptions;
_forceBomInsertions = forceBomInsertions;
}

private JsonSerializerOptions? ResolveOptionsInstance(JsonSerializerOptions? options)
=> _forceSmallBufferInOptions ? JsonSerializerOptionsSmallBufferMapper.ResolveOptionsInstanceWithSmallBuffer(options) : options;

public override bool IsAsyncSerializer => false;
private Stream ResolveReadStream(Stream stream)
=> stream is not null && _forceBomInsertions ? new Utf8BomInsertingStream(stream) : stream;

public override Task SerializeWrapper<T>(Stream utf8Json, T value, JsonSerializerOptions options = null)
{
Expand Down Expand Up @@ -212,25 +223,25 @@ public override Task SerializeWrapper(Stream stream, object value, Type inputTyp

public override Task<T> DeserializeWrapper<T>(Stream utf8Json, JsonSerializerOptions options = null)
{
T result = JsonSerializer.Deserialize<T>(utf8Json, ResolveOptionsInstance(options));
T result = JsonSerializer.Deserialize<T>(ResolveReadStream(utf8Json), ResolveOptionsInstance(options));
return Task.FromResult(result);
}

public override Task<object> DeserializeWrapper(Stream utf8Json, Type returnType, JsonSerializerOptions options = null)
{
object result = JsonSerializer.Deserialize(utf8Json, returnType, ResolveOptionsInstance(options));
object result = JsonSerializer.Deserialize(ResolveReadStream(utf8Json), returnType, ResolveOptionsInstance(options));
return Task.FromResult(result);
}

public override Task<T> DeserializeWrapper<T>(Stream utf8Json, JsonTypeInfo<T> jsonTypeInfo)
{
T result = JsonSerializer.Deserialize<T>(utf8Json, jsonTypeInfo);
T result = JsonSerializer.Deserialize<T>(ResolveReadStream(utf8Json), jsonTypeInfo);
return Task.FromResult(result);
}

public override Task<object> DeserializeWrapper(Stream utf8Json, Type returnType, JsonSerializerContext context)
{
object result = JsonSerializer.Deserialize(utf8Json, returnType, context);
object result = JsonSerializer.Deserialize(ResolveReadStream(utf8Json), returnType, context);
return Task.FromResult(result);
}
}
Expand Down Expand Up @@ -653,5 +664,88 @@ public static JsonSerializerOptions ResolveOptionsInstanceWithSmallBuffer(JsonSe
return smallBufferCopy;
}
}

private sealed class Utf8BomInsertingStream : Stream
{
private const int Utf8BomLength = 3;
private readonly static byte[] s_utf8Bom = Encoding.UTF8.GetPreamble();

private readonly Stream _source;
private byte[]? _prefixBytes;
private int _prefixBytesOffset = 0;
private int _prefixBytesCount = 0;

public Utf8BomInsertingStream(Stream source)
{
Debug.Assert(source.CanRead);
_source = source;
}

public override bool CanRead => _source.CanRead;
public override bool CanSeek => false;
public override bool CanWrite => false;

public override int Read(byte[] buffer, int offset, int count)
{
if (_prefixBytes is null)
{
// This is the first read operation; read the first 3 bytes
// from the source to determine if it already includes a BOM.
// Only insert a BOM if it's missing from the source stream.

_prefixBytes = new byte[2 * Utf8BomLength];
int bytesRead = ReadExactlyFromSource(_prefixBytes, Utf8BomLength, Utf8BomLength);

if (_prefixBytes.AsSpan(Utf8BomLength).SequenceEqual(s_utf8Bom))
{
_prefixBytesOffset = Utf8BomLength;
_prefixBytesCount = Utf8BomLength;
}
else
{
s_utf8Bom.CopyTo(_prefixBytes, 0);
_prefixBytesOffset = 0;
_prefixBytesCount = Utf8BomLength + bytesRead;
}
}

int prefixBytesToWrite = Math.Min(_prefixBytesCount, count);
if (prefixBytesToWrite > 0)
{
_prefixBytes.AsSpan(_prefixBytesOffset, prefixBytesToWrite).CopyTo(buffer.AsSpan(offset, count));
_prefixBytesOffset += prefixBytesToWrite;
_prefixBytesCount -= prefixBytesToWrite;
offset += prefixBytesToWrite;
count -= prefixBytesToWrite;
}

return prefixBytesToWrite + _source.Read(buffer, offset, count);
}

private int ReadExactlyFromSource(byte[] buffer, int offset, int count)
{
int totalRead = 0;

while (totalRead < count)
{
int read = _source.Read(buffer, offset + totalRead, count - totalRead);
if (read == 0)
{
break;
}

totalRead += read;
}

return totalRead;
}

public override long Length => throw new NotSupportedException();
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
public override void Flush() => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
}
}

0 comments on commit 2b1f57e

Please sign in to comment.