Skip to content

Commit

Permalink
FileStream rewrite: Caching the ValueTaskSource in AsyncWindowsFileSt…
Browse files Browse the repository at this point in the history
…reamStrategy (#51363)

* Caching the ValueTaskSource in AsyncWindowsFileStreamStrategy

* Add debug assert to WriteByteSlow for _writePos == _bufferSize

* Fix assertion failure

* Apply suggestions from code review

* Apply suggestions from code review

* some pedantic polishing

* fix the allocation bug: actually return the instance to the "pool"

* don't Dipose the handle if it has default value

* reset _result and _source in the Configure method

* delay the moment when the ValueTaskSource becomes ready to be reused to the moment after _source.SetException|SetResult is called

* apply Stephen suggestions

* implement Stephen idea

* use Reset in explicit way

* remove outdated comment

* address code review feedback

Co-authored-by: carlossanlop <carlossanlop@users.noreply.github.com>
Co-authored-by: David Cantú <dacantu@microsoft.com>
Co-authored-by: Adam Sitnik <adam.sitnik@gmail.com>
  • Loading branch information
4 people authored Apr 16, 2021
1 parent f26bc6a commit 7878130
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,60 +15,56 @@ internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStream
/// <summary>
/// Type that helps reduce allocations for FileStream.ReadAsync and FileStream.WriteAsync.
/// </summary>
private unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
private sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
{
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;

internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
private readonly AsyncWindowsFileStreamStrategy _strategy;

private ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
private MemoryHandle _handle;
internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
private NativeOverlapped* _overlapped;
private CancellationTokenRegistration _cancellationRegistration;
private long _result; // Using long since this needs to be used in Interlocked APIs
#if DEBUG
private bool _cancellationHasBeenRegistered;
#endif

public static ValueTaskSource Create(
AsyncWindowsFileStreamStrategy strategy,
PreAllocatedOverlapped? preallocatedOverlapped,
ReadOnlyMemory<byte> memory)
internal ValueTaskSource(AsyncWindowsFileStreamStrategy strategy)
{
// If the memory passed in is the strategy's internal buffer, we can use the base AwaitableProvider,
// which has a PreAllocatedOverlapped with the memory already pinned. Otherwise, we use the derived
// MemoryAwaitableProvider, which Retains the memory, which will result in less pinning in the case
// where the underlying memory is backed by pre-pinned buffers.
return preallocatedOverlapped != null &&
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> buffer) &&
preallocatedOverlapped.IsUserObject(buffer.Array) ?
new ValueTaskSource(strategy, preallocatedOverlapped, buffer.Array) :
new MemoryValueTaskSource(strategy, memory);
_strategy = strategy;
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null);

_source.RunContinuationsAsynchronously = true;
}

protected ValueTaskSource(
AsyncWindowsFileStreamStrategy strategy,
PreAllocatedOverlapped? preallocatedOverlapped,
byte[]? bytes)
internal NativeOverlapped* Configure(ReadOnlyMemory<byte> memory)
{
_strategy = strategy;
_result = TaskSourceCodes.NoResult;

_source = default;
_source.RunContinuationsAsynchronously = true;

_overlapped = bytes != null &&
_strategy.CompareExchangeCurrentOverlappedOwner(this, null) == null ?
_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(preallocatedOverlapped!) : // allocated when buffer was created, and buffer is non-null
_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(s_ioCallback, this, bytes);
_handle = memory.Pin();
_overlapped = _strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);

Debug.Assert(_overlapped != null, "AllocateNativeOverlapped returned null");
return _overlapped;
}

internal NativeOverlapped* Overlapped => _overlapped;
public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token);
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _source.OnCompleted(continuation, state, token, flags);
void IValueTaskSource.GetResult(short token) => _source.GetResult(token);
int IValueTaskSource<int>.GetResult(short token) => _source.GetResult(token);
void IValueTaskSource.GetResult(short token) => GetResultAndRelease(token);
int IValueTaskSource<int>.GetResult(short token) => GetResultAndRelease(token);

private int GetResultAndRelease(short token)
{
try
{
return _source.GetResult(token);
}
finally
{
// The instance is ready to be reused
_strategy.TryToReuse(this);
}
}

internal short Version => _source.Version;

internal void RegisterForCancellation(CancellationToken cancellationToken)
Expand Down Expand Up @@ -106,8 +102,10 @@ internal void RegisterForCancellation(CancellationToken cancellationToken)
}
}

internal virtual void ReleaseNativeResource()
internal void ReleaseNativeResource()
{
_handle.Dispose();

// Ensure that cancellation has been completed and cleaned up.
_cancellationRegistration.Dispose();

Expand All @@ -119,27 +117,11 @@ internal virtual void ReleaseNativeResource()
_strategy._fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(_overlapped);
_overlapped = null;
}

// Ensure we're no longer set as the current AwaitableProvider (we may not have been to begin with).
// Only one operation at a time is eligible to use the preallocated overlapped
_strategy.CompareExchangeCurrentOverlappedOwner(null, this);
}

private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
// Extract the AwaitableProvider from the overlapped. The state in the overlapped
// will either be a AsyncWindowsFileStreamStrategy (in the case where the preallocated overlapped was used),
// in which case the operation being completed is its _currentOverlappedOwner, or it'll
// be directly the AwaitableProvider that's completing (in the case where the preallocated
// overlapped was already in use by another operation).
object? state = ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
Debug.Assert(state is AsyncWindowsFileStreamStrategy or ValueTaskSource);
ValueTaskSource valueTaskSource = state switch
{
AsyncWindowsFileStreamStrategy strategy => strategy._currentOverlappedOwner!, // must be owned
_ => (ValueTaskSource)state
};
Debug.Assert(valueTaskSource != null);
ValueTaskSource valueTaskSource = (ValueTaskSource)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped)!;
Debug.Assert(valueTaskSource._overlapped == pOverlapped, "Overlaps don't match");

// Handle reading from & writing to closed pipes. While I'm not sure
Expand Down Expand Up @@ -224,28 +206,5 @@ private void Cancel(CancellationToken token)
}
}
}

/// <summary>
/// Extends <see cref="ValueTaskSource"/> with to support disposing of a
/// <see cref="MemoryHandle"/> when the operation has completed. This should only be used
/// when memory doesn't wrap a byte[].
/// </summary>
private sealed class MemoryValueTaskSource : ValueTaskSource
{
private MemoryHandle _handle; // mutable struct; do not make this readonly

// this type handles the pinning, so bytes are null
internal unsafe MemoryValueTaskSource(AsyncWindowsFileStreamStrategy strategy, ReadOnlyMemory<byte> memory)
: base(strategy, null, null) // this type handles the pinning, so null is passed for bytes to the base
{
_handle = memory.Pin();
}

internal override void ReleaseNativeResource()
{
_handle.Dispose();
base.ReleaseNativeResource();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ namespace System.IO.Strategies
{
internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy
{
private PreAllocatedOverlapped? _preallocatedOverlapped; // optimization for async ops to avoid per-op allocations
private ValueTaskSource? _currentOverlappedOwner; // async op currently using the preallocated overlapped
private ValueTaskSource? _reusableValueTaskSource; // reusable ValueTaskSource that is currently NOT being used

internal AsyncWindowsFileStreamStrategy(SafeFileHandle handle, FileAccess access, FileShare share)
: base(handle, access, share)
Expand All @@ -32,7 +31,7 @@ public override ValueTask DisposeAsync()
ValueTask result = base.DisposeAsync();
Debug.Assert(result.IsCompleted, "the method must be sync, as it performs no flushing");

_preallocatedOverlapped?.Dispose();
Interlocked.Exchange(ref _reusableValueTaskSource, null)?._preallocatedOverlapped.Dispose();

return result;
}
Expand All @@ -43,7 +42,7 @@ protected override void Dispose(bool disposing)
// before _preallocatedOverlapped is disposed
base.Dispose(disposing);

_preallocatedOverlapped?.Dispose();
Interlocked.Exchange(ref _reusableValueTaskSource, null)?._preallocatedOverlapped.Dispose();
}

protected override void OnInitFromHandle(SafeFileHandle handle)
Expand Down Expand Up @@ -103,17 +102,16 @@ protected override void OnInit()
}
}

// called by BufferedFileStreamStrategy
internal override void OnBufferAllocated(byte[] buffer)
private void TryToReuse(ValueTaskSource source)
{
Debug.Assert(_preallocatedOverlapped == null);
source._source.Reset();

_preallocatedOverlapped = new PreAllocatedOverlapped(ValueTaskSource.s_ioCallback, this, buffer);
if (Interlocked.CompareExchange(ref _reusableValueTaskSource, source, null) is not null)
{
source._preallocatedOverlapped.Dispose();
}
}

private ValueTaskSource? CompareExchangeCurrentOverlappedOwner(ValueTaskSource? newSource, ValueTaskSource? existingSource)
=> Interlocked.CompareExchange(ref _currentOverlappedOwner, newSource, existingSource);

public override int Read(byte[] buffer, int offset, int count)
{
ValueTask<int> vt = ReadAsyncInternal(new Memory<byte>(buffer, offset, count), CancellationToken.None);
Expand All @@ -137,9 +135,14 @@ private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, Cancel

Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed");

// Create and store async stream class library specific data in the async result
ValueTaskSource valueTaskSource = ValueTaskSource.Create(this, _preallocatedOverlapped, destination);
NativeOverlapped* intOverlapped = valueTaskSource.Overlapped;
// valueTaskSource is not null when:
// - First time calling ReadAsync in buffered mode
// - Second+ time calling ReadAsync, both buffered or unbuffered
// - On buffered flush, when source memory is also the internal buffer
// valueTaskSource is null when:
// - First time calling ReadAsync in unbuffered mode
ValueTaskSource valueTaskSource = Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this);
NativeOverlapped* intOverlapped = valueTaskSource.Configure(destination);

// Calculate position in the file we should be at after the read is done
long positionBefore = _filePosition;
Expand Down Expand Up @@ -195,6 +198,7 @@ private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, Cancel
// Failure to do so looks like we are freeing a pending overlapped later.
intOverlapped->InternalLow = IntPtr.Zero;
valueTaskSource.ReleaseNativeResource();
TryToReuse(valueTaskSource);
return new ValueTask<int>(0);
}
else if (errorCode != Interop.Errors.ERROR_IO_PENDING)
Expand All @@ -205,6 +209,7 @@ private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, Cancel
}

valueTaskSource.ReleaseNativeResource();
TryToReuse(valueTaskSource);

if (errorCode == Interop.Errors.ERROR_HANDLE_EOF)
{
Expand Down Expand Up @@ -253,9 +258,14 @@ private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, Cancell

Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed");

// Create and store async stream class library specific data in the async result
ValueTaskSource valueTaskSource = ValueTaskSource.Create(this, _preallocatedOverlapped, source);
NativeOverlapped* intOverlapped = valueTaskSource.Overlapped;
// valueTaskSource is not null when:
// - First time calling WriteAsync in buffered mode
// - Second+ time calling WriteAsync, both buffered or unbuffered
// - On buffered flush, when source memory is also the internal buffer
// valueTaskSource is null when:
// - First time calling WriteAsync in unbuffered mode
ValueTaskSource valueTaskSource = Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this);
NativeOverlapped* intOverlapped = valueTaskSource.Configure(source);

long positionBefore = _filePosition;
if (CanSeek)
Expand Down Expand Up @@ -293,6 +303,7 @@ private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, Cancell
// Not an error, but EOF. AsyncFSCallback will NOT be called.
// Completing TCS and return cached task allowing the GC to collect TCS.
valueTaskSource.ReleaseNativeResource();
TryToReuse(valueTaskSource);
return ValueTask.CompletedTask;
}
else if (errorCode != Interop.Errors.ERROR_IO_PENDING)
Expand All @@ -303,6 +314,7 @@ private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, Cancell
}

valueTaskSource.ReleaseNativeResource();
TryToReuse(valueTaskSource);

if (errorCode == Interop.Errors.ERROR_HANDLE_EOF)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Win32.SafeHandles;
Expand Down Expand Up @@ -462,7 +463,7 @@ private async ValueTask<int> ReadAsyncSlowPath(Task semaphoreLockTask, Memory<by
// If there was anything in the write buffer, clear it.
if (_writePos > 0)
{
await _strategy.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
_writePos = 0;
}

Expand All @@ -474,7 +475,7 @@ private async ValueTask<int> ReadAsyncSlowPath(Task semaphoreLockTask, Memory<by

// Ok. We can fill the buffer:
EnsureBufferAllocated();
_readLen = await _strategy.ReadAsync(new Memory<byte>(_buffer, 0, _bufferSize), cancellationToken).ConfigureAwait(false);
_readLen = await _strategy.ReadAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _bufferSize), cancellationToken).ConfigureAwait(false);

bytesFromBuffer = Math.Min(_readLen, buffer.Length);
_buffer.AsSpan(0, bytesFromBuffer).CopyTo(buffer.Span);
Expand Down Expand Up @@ -604,6 +605,7 @@ private void WriteByteSlow(byte value)
}
else
{
Debug.Assert(_writePos <= _bufferSize);
FlushWrite();
}

Expand Down Expand Up @@ -731,7 +733,7 @@ private async ValueTask WriteAsyncSlowPath(Task semaphoreLockTask, ReadOnlyMemor
}
}

await _strategy.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
_writePos = 0;
}

Expand Down Expand Up @@ -832,7 +834,7 @@ private async Task FlushAsyncInternal(CancellationToken cancellationToken)
{
if (_writePos > 0)
{
await _strategy.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
_writePos = 0;
Debug.Assert(_writePos == 0 && _readPos == 0 && _readLen == 0);
return;
Expand Down Expand Up @@ -886,13 +888,13 @@ private async Task CopyToAsyncCore(Stream destination, int bufferSize, Cancellat
{
// If there's any read data in the buffer, write it all to the destination stream.
Debug.Assert(_writePos == 0, "Write buffer must be empty if there's data in the read buffer");
await destination.WriteAsync(new ReadOnlyMemory<byte>(_buffer, _readPos, readBytes), cancellationToken).ConfigureAwait(false);
await destination.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, _readPos, readBytes), cancellationToken).ConfigureAwait(false);
_readPos = _readLen = 0;
}
else if (_writePos > 0)
{
// If there's write data in the buffer, flush it back to the underlying stream, as does ReadAsync.
await _strategy.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
_writePos = 0;
}

Expand Down Expand Up @@ -1067,7 +1069,8 @@ private void EnsureBufferAllocated()

void AllocateBuffer() // logic kept in a separate method to get EnsureBufferAllocated() inlined
{
_strategy.OnBufferAllocated(_buffer = new byte[_bufferSize]);
_buffer = GC.AllocateUninitializedArray<byte>(_bufferSize,
pinned: true); // this allows us to avoid pinning when the buffer is used for the syscalls
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,5 @@ internal abstract class FileStreamStrategy : Stream
internal abstract void Flush(bool flushToDisk);

internal abstract void DisposeInternal(bool disposing);

internal virtual void OnBufferAllocated(byte[] buffer) { }
}
}

0 comments on commit 7878130

Please sign in to comment.