Skip to content

Commit

Permalink
Remove forced serialization of async-over-sync in Stream base methods
Browse files Browse the repository at this point in the history
The base implementation of Stream.BeginRead/Write queue a work items that invoke the abstract Read/Write methods.  When Stream.BeginRead/Write were introduced long ago, for reasons I’m not privy to, someone decided it would be a good idea to add protection to these methods, such that if you try to call either BeginRead or BeginWrite while a previous BeginRead or BeginWrite operation was still in flight, the synchronous call to BeginXx would synchronously block.  Yuck.  Back in .NET Framework 4.5 when we added Stream.Read/WriteAsync, we had to add the base implementations as wrappers for the BeginRead/Write methods, since Read/WriteAsync should pick up the overrides of those methods if they existed.  The idea of propagating that synchronous blocking behavior to Read/WriteAsync was unstomachable, but for compatibility we made it so that Read/WriteAsync would still serialize, just asynchronously (later we added a fast path optimization that would skip BeginRead/Write entirely if they weren’t overridden by the derived type).  That serialization, however, even though it was asynchronous, was also misguided.  In addition to adding overhead, both in terms of needing a semaphore and somewhere to store it and in terms of using that semaphore for every operation, it prevents the concurrent use of read and write.  In general, streams aren’t meant to be used concurrently at all, but some streams are duplex and support up to a single reader and single writer at a time.  This serialization ends up blocking such duplex streams from being used (if they don’t override Read/WriteAsync), but worse, it ends up hiding misuse of streams that shouldn’t be used concurrently by masking the misuse and turning it into behavior that might appear to work but is unlikely to actually be the desired behavior.

This PR deletes that serialization and then cleans up all the cruft that was built up around it.  This is a breaking change, as it’s possible code could have been relying on this (undocumented) protection; the fix for such an app is to stop doing that erroneous concurrent access, which could include applying its own serialization if necessary.

BufferedStream was explicitly using the same serialization mechanism; I left that intact.  BufferedFileStreamStrategy was also using it, as FileStream kinda sorta somewhat tries to enable concurrent (not parallel) usage when useAsync == true on Windows.
  • Loading branch information
stephentoub committed Jun 4, 2021
1 parent aea3ac5 commit bca4acd
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 300 deletions.
51 changes: 18 additions & 33 deletions src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,15 @@ public async Task WriteAsyncInternalBufferOverflow()

public static IEnumerable<object[]> MemberData_FileStreamAsyncWriting()
{
foreach (bool useAsync in new[] { true, false })
foreach (bool preSize in new[] { true, false })
{
if (useAsync && !OperatingSystem.IsWindows())
foreach (bool cancelable in new[] { true, false })
{
// We don't have a special async I/O implementation in FileStream on Unix.
continue;
}

foreach (bool preSize in new[] { true, false })
{
foreach (bool cancelable in new[] { true, false })
{
yield return new object[] { useAsync, preSize, false, cancelable, 0x1000, 0x100, 100 };
yield return new object[] { useAsync, preSize, false, cancelable, 0x1, 0x1, 1000 };
yield return new object[] { useAsync, preSize, true, cancelable, 0x2, 0x100, 100 };
yield return new object[] { useAsync, preSize, false, cancelable, 0x4000, 0x10, 100 };
yield return new object[] { useAsync, preSize, true, cancelable, 0x1000, 99999, 10 };
}
yield return new object[] { preSize, false, cancelable, 0x1000, 0x100, 100 };
yield return new object[] { preSize, false, cancelable, 0x1, 0x1, 1000 };
yield return new object[] { preSize, true, cancelable, 0x2, 0x100, 100 };
yield return new object[] { preSize, false, cancelable, 0x4000, 0x10, 100 };
yield return new object[] { preSize, true, cancelable, 0x1000, 99999, 10 };
}
}
}
Expand All @@ -183,7 +174,6 @@ public Task ManyConcurrentWriteAsyncs()
{
// For inner loop, just test one case
return ManyConcurrentWriteAsyncs_OuterLoop(
useAsync: OperatingSystem.IsWindows(),
presize: false,
exposeHandle: false,
cancelable: true,
Expand All @@ -196,15 +186,15 @@ public Task ManyConcurrentWriteAsyncs()
[MemberData(nameof(MemberData_FileStreamAsyncWriting))]
[OuterLoop] // many combinations: we test just one in inner loop and the rest outer
public async Task ManyConcurrentWriteAsyncs_OuterLoop(
bool useAsync, bool presize, bool exposeHandle, bool cancelable, int bufferSize, int writeSize, int numWrites)
bool presize, bool exposeHandle, bool cancelable, int bufferSize, int writeSize, int numWrites)
{
long totalLength = writeSize * numWrites;
var expectedData = new byte[totalLength];
new Random(42).NextBytes(expectedData);
CancellationToken cancellationToken = cancelable ? new CancellationTokenSource().Token : CancellationToken.None;

string path = GetTestFilePath();
using (FileStream fs = new FileStream(path, FileMode.Create, FileAccess.ReadWrite, FileShare.None, bufferSize, useAsync))
using (FileStream fs = new FileStream(path, FileMode.Create, FileAccess.ReadWrite, FileShare.None, bufferSize, useAsync: true))
{
if (presize)
{
Expand All @@ -220,17 +210,15 @@ public async Task ManyConcurrentWriteAsyncs_OuterLoop(
{
writes[i] = WriteAsync(fs, expectedData, i * writeSize, writeSize, cancellationToken);
Assert.Null(writes[i].Exception);
if (useAsync)

// To ensure that the buffer of a FileStream opened for async IO is flushed
// by FlushAsync in asynchronous way, we aquire a lock for every buffered WriteAsync.
// The side effect of this is that the Position of FileStream is not updated until
// the lock is released by a previous operation.
// So now all WriteAsync calls should be awaited before starting another async file operation.
if (PlatformDetection.IsNet5CompatFileStreamEnabled)
{
// To ensure that the buffer of a FileStream opened for async IO is flushed
// by FlushAsync in asynchronous way, we aquire a lock for every buffered WriteAsync.
// The side effect of this is that the Position of FileStream is not updated until
// the lock is released by a previous operation.
// So now all WriteAsync calls should be awaited before starting another async file operation.
if (PlatformDetection.IsNet5CompatFileStreamEnabled)
{
Assert.Equal((i + 1) * writeSize, fs.Position);
}
Assert.Equal((i + 1) * writeSize, fs.Position);
}
}

Expand All @@ -239,10 +227,7 @@ public async Task ManyConcurrentWriteAsyncs_OuterLoop(

byte[] actualData = File.ReadAllBytes(path);
Assert.Equal(expectedData.Length, actualData.Length);
if (useAsync)
{
Assert.Equal<byte>(expectedData, actualData);
}
AssertExtensions.SequenceEqual(expectedData, actualData);
}

[Theory]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -58,6 +59,7 @@ public sealed class BufferedStream : Stream
// (perf optimization for successive reads of the same size)
// Removing a private default constructor is a breaking change for the DataDebugSerializer.
// Because this ctor was here previously we need to keep it around.
private SemaphoreSlim? _asyncActiveSemaphore; // To serialize async operations.

public BufferedStream(Stream stream)
: this(stream, DefaultBufferSize)
Expand Down Expand Up @@ -136,6 +138,16 @@ private void EnsureBufferAllocated()
_buffer = new byte[_bufferSize];
}

[MemberNotNull(nameof(_asyncActiveSemaphore))]
private SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() =>
// Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's
// WaitHandle, we don't need to worry about Disposing it in the case of a race condition.
#pragma warning disable CS8774 // We lack a NullIffNull annotation for Volatile.Read
Volatile.Read(ref _asyncActiveSemaphore) ??
#pragma warning restore CS8774
Interlocked.CompareExchange(ref _asyncActiveSemaphore, new SemaphoreSlim(1, 1), null) ??
_asyncActiveSemaphore;

public Stream UnderlyingStream
{
get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ public override int Read(byte[] buffer, int offset, int count)
vt.AsTask().GetAwaiter().GetResult();
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);

public override int EndRead(IAsyncResult asyncResult) =>
TaskToApm.End<int>(asyncResult);

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();

Expand Down Expand Up @@ -207,6 +213,12 @@ private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, Cancel
public override void Write(byte[] buffer, int offset, int count)
=> WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), CancellationToken.None).AsTask().GetAwaiter().GetResult();

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);

public override void EndWrite(IAsyncResult asyncResult) =>
TaskToApm.End(asyncResult);

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ internal sealed class BufferedFileStreamStrategy : FileStreamStrategy
{
private readonly FileStreamStrategy _strategy;
private readonly int _bufferSize;
private SemaphoreSlim? _asyncActiveSemaphore;

private byte[]? _buffer;
private int _writePos;
Expand Down Expand Up @@ -46,6 +47,16 @@ internal BufferedFileStreamStrategy(FileStreamStrategy strategy, int bufferSize)
}
}

[MemberNotNull(nameof(_asyncActiveSemaphore))]
private SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() =>
// Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's
// WaitHandle, we don't need to worry about Disposing it in the case of a race condition.
#pragma warning disable CS8774 // We lack a NullIffNull annotation for Volatile.Read
Volatile.Read(ref _asyncActiveSemaphore) ??
#pragma warning restore CS8774
Interlocked.CompareExchange(ref _asyncActiveSemaphore, new SemaphoreSlim(1, 1), null) ??
_asyncActiveSemaphore;

public override bool CanRead => _strategy.CanRead;

public override bool CanWrite => _strategy.CanWrite;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
// Read is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginRead, since we already know this is FileStream rather
// than something derived from it and what our BeginRead implementation is going to do.
return (Task<int>)base.BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
return BeginReadInternal(buffer, offset, count, null, null);
}

return ReadAsyncTask(buffer, offset, count, cancellationToken);
Expand All @@ -178,7 +178,7 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
// internal helper that bypasses delegating to BeginRead, since we already know this is FileStream
// rather than something derived from it and what our BeginRead implementation is going to do.
return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
new ValueTask<int>((Task<int>)base.BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
new ValueTask<int>(BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null)) :
base.ReadAsync(buffer, cancellationToken);
}

Expand Down Expand Up @@ -245,7 +245,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
// Write is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginWrite, since we already know this is FileStream rather
// than something derived from it and what our BeginWrite implementation is going to do.
return (Task)base.BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
return BeginWriteInternal(buffer, offset, count, null, null);
}

return WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
Expand All @@ -260,7 +260,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
// internal helper that bypasses delegating to BeginWrite, since we already know this is FileStream
// rather than something derived from it and what our BeginWrite implementation is going to do.
return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
new ValueTask((Task)base.BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
new ValueTask(BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null)) :
base.WriteAsync(buffer, cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
// Read is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginRead, since we already know this is FileStream rather
// than something derived from it and what our BeginRead implementation is going to do.
return (Task<int>)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
return BeginReadInternal(buffer, offset, count, null, null);
}

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
Expand All @@ -56,7 +56,7 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
// internal helper that bypasses delegating to BeginRead, since we already know this is FileStream
// rather than something derived from it and what our BeginRead implementation is going to do.
return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
new ValueTask<int>((Task<int>)BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
new ValueTask<int>(BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null)) :
base.ReadAsync(buffer, cancellationToken);
}

Expand All @@ -79,7 +79,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
// Write is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginWrite, since we already know this is FileStream rather
// than something derived from it and what our BeginWrite implementation is going to do.
return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
return BeginWriteInternal(buffer, offset, count, null, null);
}

public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
Expand All @@ -89,7 +89,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
// internal helper that bypasses delegating to BeginWrite, since we already know this is FileStream
// rather than something derived from it and what our BeginWrite implementation is going to do.
return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
new ValueTask((Task)BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
new ValueTask(BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null)) :
base.WriteAsync(buffer, cancellationToken);
}

Expand Down
Loading

0 comments on commit bca4acd

Please sign in to comment.