Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FileStream rewrite: Caching the ValueTaskSource in AsyncWindowsFileStreamStrategy #51363

Merged
merged 15 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStream
/// <summary>
/// Type that helps reduce allocations for FileStream.ReadAsync and FileStream.WriteAsync.
/// </summary>
private unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
private sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
{
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;

private MemoryHandle _handle;
private readonly AsyncWindowsFileStreamStrategy _strategy;

internal PreAllocatedOverlapped? _preallocatedOverlapped;
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
private ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
private NativeOverlapped* _overlapped;
private CancellationTokenRegistration _cancellationRegistration;
Expand All @@ -29,37 +29,36 @@ private unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
private bool _cancellationHasBeenRegistered;
#endif

public static ValueTaskSource Create(
AsyncWindowsFileStreamStrategy strategy,
PreAllocatedOverlapped? preallocatedOverlapped,
ReadOnlyMemory<byte> memory)
{
// If the memory passed in is the strategy's internal buffer, we can use the base AwaitableProvider,
// which has a PreAllocatedOverlapped with the memory already pinned. Otherwise, we use the derived
// MemoryAwaitableProvider, which Retains the memory, which will result in less pinning in the case
// where the underlying memory is backed by pre-pinned buffers.
return preallocatedOverlapped != null &&
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> buffer) &&
preallocatedOverlapped.IsUserObject(buffer.Array) ?
new ValueTaskSource(strategy, preallocatedOverlapped, buffer.Array) :
new MemoryValueTaskSource(strategy, memory);
}

protected ValueTaskSource(
AsyncWindowsFileStreamStrategy strategy,
PreAllocatedOverlapped? preallocatedOverlapped,
byte[]? bytes)
internal ValueTaskSource(AsyncWindowsFileStreamStrategy strategy, byte[]? internalBuffer)
{
_strategy = strategy;
_result = TaskSourceCodes.NoResult;

_source = default;
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
_source.RunContinuationsAsynchronously = true;
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved

_overlapped = bytes != null &&
_strategy.CompareExchangeCurrentOverlappedOwner(this, null) == null ?
_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(preallocatedOverlapped!) : // allocated when buffer was created, and buffer is non-null
_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(s_ioCallback, this, bytes);
if (internalBuffer != null)
{
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, internalBuffer);
}
}

internal void Configure(ReadOnlyMemory<byte> memory)
{
// The array of preallocatedOverlapped is the same as the memory array when flushing in buffered mode,
// because the ReadOnlyMemory object passed to WriteAsync/ReadAsync is the internal buffer that we need
// to write to disk at the end
if (_preallocatedOverlapped != null &&
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> arrSegment) &&
_preallocatedOverlapped.IsUserObject(arrSegment.Array))
{
_overlapped = _strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);
}
else
{
_handle = memory.Pin();
_overlapped = _strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(s_ioCallback, this, null);
}
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved

Debug.Assert(_overlapped != null, "AllocateNativeOverlapped returned null");
}
Expand Down Expand Up @@ -108,6 +107,8 @@ internal void RegisterForCancellation(CancellationToken cancellationToken)

internal virtual void ReleaseNativeResource()
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
{
_handle.Dispose();

// Ensure that cancellation has been completed and cleaned up.
_cancellationRegistration.Dispose();

Expand All @@ -120,25 +121,19 @@ internal virtual void ReleaseNativeResource()
_overlapped = null;
}

// Ensure we're no longer set as the current AwaitableProvider (we may not have been to begin with).
// Ensure we're no longer set as the current ValueTaskSource (we may not have been to begin with).
// Only one operation at a time is eligible to use the preallocated overlapped
_strategy.CompareExchangeCurrentOverlappedOwner(null, this);
}

private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
// Extract the AwaitableProvider from the overlapped. The state in the overlapped
// Extract the ValueTaskSource from the overlapped. The state in the overlapped
// will either be a AsyncWindowsFileStreamStrategy (in the case where the preallocated overlapped was used),
// in which case the operation being completed is its _currentOverlappedOwner, or it'll
// be directly the AwaitableProvider that's completing (in the case where the preallocated
// be directly the ValueTaskSource that's completing (in the case where the preallocated
// overlapped was already in use by another operation).
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
object? state = ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
Debug.Assert(state is AsyncWindowsFileStreamStrategy or ValueTaskSource);
ValueTaskSource valueTaskSource = state switch
{
AsyncWindowsFileStreamStrategy strategy => strategy._currentOverlappedOwner!, // must be owned
_ => (ValueTaskSource)state
};
ValueTaskSource valueTaskSource = (ValueTaskSource)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
Debug.Assert(valueTaskSource != null);
Debug.Assert(valueTaskSource._overlapped == pOverlapped, "Overlaps don't match");

Expand Down Expand Up @@ -224,28 +219,5 @@ private void Cancel(CancellationToken token)
}
}
}

/// <summary>
/// Extends <see cref="ValueTaskSource"/> with to support disposing of a
/// <see cref="MemoryHandle"/> when the operation has completed. This should only be used
/// when memory doesn't wrap a byte[].
/// </summary>
private sealed class MemoryValueTaskSource : ValueTaskSource
{
private MemoryHandle _handle; // mutable struct; do not make this readonly

// this type handles the pinning, so bytes are null
internal unsafe MemoryValueTaskSource(AsyncWindowsFileStreamStrategy strategy, ReadOnlyMemory<byte> memory)
: base(strategy, null, null) // this type handles the pinning, so null is passed for bytes to the base
{
_handle = memory.Pin();
}

internal override void ReleaseNativeResource()
{
_handle.Dispose();
base.ReleaseNativeResource();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace System.IO.Strategies
{
internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy
{
private PreAllocatedOverlapped? _preallocatedOverlapped; // optimization for async ops to avoid per-op allocations
private ValueTaskSource? _currentOverlappedOwner; // async op currently using the preallocated overlapped
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved

internal AsyncWindowsFileStreamStrategy(SafeFileHandle handle, FileAccess access, FileShare share)
Expand All @@ -32,7 +31,7 @@ public override ValueTask DisposeAsync()
ValueTask result = base.DisposeAsync();
Debug.Assert(result.IsCompleted, "the method must be sync, as it performs no flushing");

_preallocatedOverlapped?.Dispose();
_currentOverlappedOwner?._preallocatedOverlapped?.Dispose();

return result;
}
Expand All @@ -43,7 +42,7 @@ protected override void Dispose(bool disposing)
// before _preallocatedOverlapped is disposed
base.Dispose(disposing);

_preallocatedOverlapped?.Dispose();
_currentOverlappedOwner?._preallocatedOverlapped?.Dispose();
}

protected override void OnInitFromHandle(SafeFileHandle handle)
Expand Down Expand Up @@ -106,9 +105,8 @@ protected override void OnInit()
// called by BufferedFileStreamStrategy
internal override void OnBufferAllocated(byte[] buffer)
{
Debug.Assert(_preallocatedOverlapped == null);

_preallocatedOverlapped = new PreAllocatedOverlapped(ValueTaskSource.s_ioCallback, this, buffer);
Debug.Assert(_currentOverlappedOwner == null);
_currentOverlappedOwner = new ValueTaskSource(this, buffer);
}
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved

private ValueTaskSource? CompareExchangeCurrentOverlappedOwner(ValueTaskSource? newSource, ValueTaskSource? existingSource)
Expand Down Expand Up @@ -137,8 +135,18 @@ private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, Cancel

Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed");

// Create and store async stream class library specific data in the async result
ValueTaskSource valueTaskSource = ValueTaskSource.Create(this, _preallocatedOverlapped, destination);
// valueTaskSource is not null when:
// - First time calling ReadAsync in buffered mode
// - Second+ time calling ReadAsync, both buffered or unbuffered
// - On buffered flush, when source memory is also the internal buffer
ValueTaskSource? valueTaskSource = Interlocked.Exchange(ref _currentOverlappedOwner, null);
// valueTaskSource is null when:
// - First time calling ReadAsync in unbuffered mode
if (valueTaskSource == null)
{
valueTaskSource = new ValueTaskSource(this, null);
}
valueTaskSource.Configure(destination);
NativeOverlapped* intOverlapped = valueTaskSource.Overlapped;

// Calculate position in the file we should be at after the read is done
Expand Down Expand Up @@ -253,8 +261,18 @@ private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, Cancell

Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed");

// Create and store async stream class library specific data in the async result
ValueTaskSource valueTaskSource = ValueTaskSource.Create(this, _preallocatedOverlapped, source);
// valueTaskSource is not null when:
// - First time calling WriteAsync in buffered mode
// - Second+ time calling WriteAsync, both buffered or unbuffered
// - On buffered flush, when source memory is also the internal buffer
ValueTaskSource? valueTaskSource = Interlocked.Exchange(ref _currentOverlappedOwner, null);
// valueTaskSource is null when:
// - First time calling WriteAsync in unbuffered mode
if (valueTaskSource == null)
{
valueTaskSource = new ValueTaskSource(this, null);
}
valueTaskSource.Configure(source);
NativeOverlapped* intOverlapped = valueTaskSource.Overlapped;

long positionBefore = _filePosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ private void WriteByteSlow(byte value)
}
else
{
Debug.Assert(_writePos <= _bufferSize);
FlushWrite();
}

Expand Down