FileStream rewrite: Caching the ValueTaskSource in AsyncWindowsFileStreamStrategy #51363

Merged 15 commits on Apr 16, 2021
Changes from 14 commits
@@ -15,12 +15,12 @@ internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStream
/// <summary>
/// Type that helps reduce allocations for FileStream.ReadAsync and FileStream.WriteAsync.
/// </summary>
private unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
private sealed unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
{
internal static readonly IOCompletionCallback s_ioCallback = IOCallback;

internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
private readonly AsyncWindowsFileStreamStrategy _strategy;

private MemoryHandle _handle;
private ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
private NativeOverlapped* _overlapped;
private CancellationTokenRegistration _cancellationRegistration;
@@ -29,42 +29,26 @@ private unsafe class ValueTaskSource : IValueTaskSource<int>, IValueTaskSource
private bool _cancellationHasBeenRegistered;
#endif

public static ValueTaskSource Create(
AsyncWindowsFileStreamStrategy strategy,
PreAllocatedOverlapped? preallocatedOverlapped,
ReadOnlyMemory<byte> memory)
{
// If the memory passed in is the strategy's internal buffer, we can use the base AwaitableProvider,
// which has a PreAllocatedOverlapped with the memory already pinned. Otherwise, we use the derived
// MemoryAwaitableProvider, which Retains the memory, which will result in less pinning in the case
// where the underlying memory is backed by pre-pinned buffers.
return preallocatedOverlapped != null &&
MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> buffer) &&
preallocatedOverlapped.IsUserObject(buffer.Array) ?
new ValueTaskSource(strategy, preallocatedOverlapped, buffer.Array) :
new MemoryValueTaskSource(strategy, memory);
}

protected ValueTaskSource(
AsyncWindowsFileStreamStrategy strategy,
PreAllocatedOverlapped? preallocatedOverlapped,
byte[]? bytes)
internal ValueTaskSource(AsyncWindowsFileStreamStrategy strategy)
{
_strategy = strategy;
_result = TaskSourceCodes.NoResult;
_preallocatedOverlapped = new PreAllocatedOverlapped(s_ioCallback, this, null);

_source = default;
_source.RunContinuationsAsynchronously = true;
}

_overlapped = bytes != null &&
_strategy.CompareExchangeCurrentOverlappedOwner(this, null) == null ?
_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(preallocatedOverlapped!) : // allocated when buffer was created, and buffer is non-null
_strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(s_ioCallback, this, bytes);
internal NativeOverlapped* Configure(ReadOnlyMemory<byte> memory)
{
_result = TaskSourceCodes.NoResult;
_source.Reset();

_handle = memory.Pin();
_overlapped = _strategy._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);

Debug.Assert(_overlapped != null, "AllocateNativeOverlapped returned null");
return _overlapped;
}

internal NativeOverlapped* Overlapped => _overlapped;
public ValueTaskSourceStatus GetStatus(short token) => _source.GetStatus(token);
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) => _source.OnCompleted(continuation, state, token, flags);
void IValueTaskSource.GetResult(short token) => _source.GetResult(token);
@@ -106,8 +90,10 @@ internal void RegisterForCancellation(CancellationToken cancellationToken)
}
}

internal virtual void ReleaseNativeResource()
internal void ReleaseNativeResource()
{
_handle.Dispose();

// Ensure that cancellation has been completed and cleaned up.
_cancellationRegistration.Dispose();

@@ -119,27 +105,11 @@ internal virtual void ReleaseNativeResource()
_strategy._fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(_overlapped);
_overlapped = null;
}

// Ensure we're no longer set as the current AwaitableProvider (we may not have been to begin with).
// Only one operation at a time is eligible to use the preallocated overlapped
_strategy.CompareExchangeCurrentOverlappedOwner(null, this);
}

private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
{
// Extract the AwaitableProvider from the overlapped. The state in the overlapped
// will either be a AsyncWindowsFileStreamStrategy (in the case where the preallocated overlapped was used),
// in which case the operation being completed is its _currentOverlappedOwner, or it'll
// be directly the AwaitableProvider that's completing (in the case where the preallocated
// overlapped was already in use by another operation).
object? state = ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
Debug.Assert(state is AsyncWindowsFileStreamStrategy or ValueTaskSource);
ValueTaskSource valueTaskSource = state switch
{
AsyncWindowsFileStreamStrategy strategy => strategy._currentOverlappedOwner!, // must be owned
_ => (ValueTaskSource)state
};
Debug.Assert(valueTaskSource != null);
ValueTaskSource valueTaskSource = (ValueTaskSource)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped)!;
Debug.Assert(valueTaskSource._overlapped == pOverlapped, "Overlaps don't match");

// Handle reading from & writing to closed pipes. While I'm not sure
@@ -200,6 +170,9 @@ private void CompleteCallback(ulong packedResult)
Debug.Assert(result == TaskSourceCodes.ResultSuccess, "Unknown result");
_source.SetResult((int)(packedResult & uint.MaxValue));
}

// The instance is ready to be reused
_strategy.TryToReuse(this);
}

private void Cancel(CancellationToken token)
@@ -224,28 +197,5 @@ private void Cancel(CancellationToken token)
}
}
}

/// <summary>
/// Extends <see cref="ValueTaskSource"/> with to support disposing of a
/// <see cref="MemoryHandle"/> when the operation has completed. This should only be used
/// when memory doesn't wrap a byte[].
/// </summary>
private sealed class MemoryValueTaskSource : ValueTaskSource
{
private MemoryHandle _handle; // mutable struct; do not make this readonly

// this type handles the pinning, so bytes are null
internal unsafe MemoryValueTaskSource(AsyncWindowsFileStreamStrategy strategy, ReadOnlyMemory<byte> memory)
: base(strategy, null, null) // this type handles the pinning, so null is passed for bytes to the base
{
_handle = memory.Pin();
}

internal override void ReleaseNativeResource()
{
_handle.Dispose();
base.ReleaseNativeResource();
}
}
}
}
@@ -10,8 +10,7 @@ namespace System.IO.Strategies
{
internal sealed partial class AsyncWindowsFileStreamStrategy : WindowsFileStreamStrategy
{
private PreAllocatedOverlapped? _preallocatedOverlapped; // optimization for async ops to avoid per-op allocations
private ValueTaskSource? _currentOverlappedOwner; // async op currently using the preallocated overlapped
private ValueTaskSource? _reusableValueTaskSource; // reusable ValueTaskSource that is currently NOT being used

internal AsyncWindowsFileStreamStrategy(SafeFileHandle handle, FileAccess access, FileShare share)
: base(handle, access, share)
@@ -32,7 +31,7 @@ public override ValueTask DisposeAsync()
ValueTask result = base.DisposeAsync();
Debug.Assert(result.IsCompleted, "the method must be sync, as it performs no flushing");

_preallocatedOverlapped?.Dispose();
Interlocked.Exchange(ref _reusableValueTaskSource, null)?._preallocatedOverlapped.Dispose();

return result;
}
@@ -43,7 +42,7 @@ protected override void Dispose(bool disposing)
// before _preallocatedOverlapped is disposed
base.Dispose(disposing);

_preallocatedOverlapped?.Dispose();
Interlocked.Exchange(ref _reusableValueTaskSource, null)?._preallocatedOverlapped.Dispose();
}

protected override void OnInitFromHandle(SafeFileHandle handle)
@@ -103,17 +102,14 @@ protected override void OnInit()
}
}

// called by BufferedFileStreamStrategy
internal override void OnBufferAllocated(byte[] buffer)
private void TryToReuse(ValueTaskSource source)
{
Debug.Assert(_preallocatedOverlapped == null);

_preallocatedOverlapped = new PreAllocatedOverlapped(ValueTaskSource.s_ioCallback, this, buffer);
if (Interlocked.CompareExchange(ref _reusableValueTaskSource, source, null) is not null)
{
source._preallocatedOverlapped.Dispose();
}
}

private ValueTaskSource? CompareExchangeCurrentOverlappedOwner(ValueTaskSource? newSource, ValueTaskSource? existingSource)
=> Interlocked.CompareExchange(ref _currentOverlappedOwner, newSource, existingSource);

public override int Read(byte[] buffer, int offset, int count)
{
ValueTask<int> vt = ReadAsyncInternal(new Memory<byte>(buffer, offset, count), CancellationToken.None);
@@ -137,9 +133,14 @@ private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, Cancel

Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed");

// Create and store async stream class library specific data in the async result
ValueTaskSource valueTaskSource = ValueTaskSource.Create(this, _preallocatedOverlapped, destination);
NativeOverlapped* intOverlapped = valueTaskSource.Overlapped;
// valueTaskSource is not null when:
// - First time calling ReadAsync in buffered mode
// - Second+ time calling ReadAsync, both buffered or unbuffered
// - On buffered flush, when source memory is also the internal buffer
// valueTaskSource is null when:
// - First time calling ReadAsync in unbuffered mode
ValueTaskSource valueTaskSource = Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this);
NativeOverlapped* intOverlapped = valueTaskSource.Configure(destination);

// Calculate position in the file we should be at after the read is done
long positionBefore = _filePosition;
@@ -195,6 +196,7 @@ private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, Cancel
// Failure to do so looks like we are freeing a pending overlapped later.
intOverlapped->InternalLow = IntPtr.Zero;
valueTaskSource.ReleaseNativeResource();
TryToReuse(valueTaskSource);
return new ValueTask<int>(0);
}
else if (errorCode != Interop.Errors.ERROR_IO_PENDING)
@@ -205,6 +207,7 @@ private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, Cancel
}

valueTaskSource.ReleaseNativeResource();
TryToReuse(valueTaskSource);

if (errorCode == Interop.Errors.ERROR_HANDLE_EOF)
{
@@ -253,9 +256,14 @@ private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, Cancell

Debug.Assert(!_fileHandle.IsClosed, "!_handle.IsClosed");

// Create and store async stream class library specific data in the async result
ValueTaskSource valueTaskSource = ValueTaskSource.Create(this, _preallocatedOverlapped, source);
NativeOverlapped* intOverlapped = valueTaskSource.Overlapped;
// valueTaskSource is not null when:
// - First time calling WriteAsync in buffered mode
// - Second+ time calling WriteAsync, both buffered or unbuffered
// - On buffered flush, when source memory is also the internal buffer
// valueTaskSource is null when:
// - First time calling WriteAsync in unbuffered mode
ValueTaskSource valueTaskSource = Interlocked.Exchange(ref _reusableValueTaskSource, null) ?? new ValueTaskSource(this);
NativeOverlapped* intOverlapped = valueTaskSource.Configure(source);

long positionBefore = _filePosition;
if (CanSeek)
@@ -293,6 +301,7 @@ private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, Cancell
// Not an error, but EOF. AsyncFSCallback will NOT be called.
// Completing TCS and return cached task allowing the GC to collect TCS.
valueTaskSource.ReleaseNativeResource();
TryToReuse(valueTaskSource);
return ValueTask.CompletedTask;
}
else if (errorCode != Interop.Errors.ERROR_IO_PENDING)
Expand All @@ -303,6 +312,7 @@ private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, Cancell
}

valueTaskSource.ReleaseNativeResource();
TryToReuse(valueTaskSource);

if (errorCode == Interop.Errors.ERROR_HANDLE_EOF)
{
@@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Win32.SafeHandles;
@@ -462,7 +463,7 @@ private async ValueTask<int> ReadAsyncSlowPath(Task semaphoreLockTask, Memory<by
// If there was anything in the write buffer, clear it.
if (_writePos > 0)
{
await _strategy.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
_writePos = 0;
}

@@ -474,7 +475,7 @@

// Ok. We can fill the buffer:
EnsureBufferAllocated();
_readLen = await _strategy.ReadAsync(new Memory<byte>(_buffer, 0, _bufferSize), cancellationToken).ConfigureAwait(false);
_readLen = await _strategy.ReadAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _bufferSize), cancellationToken).ConfigureAwait(false);

bytesFromBuffer = Math.Min(_readLen, buffer.Length);
_buffer.AsSpan(0, bytesFromBuffer).CopyTo(buffer.Span);
@@ -604,6 +605,7 @@ private void WriteByteSlow(byte value)
}
else
{
Debug.Assert(_writePos <= _bufferSize);
FlushWrite();
}

@@ -731,7 +733,7 @@ private async ValueTask WriteAsyncSlowPath(Task semaphoreLockTask, ReadOnlyMemor
}
}

await _strategy.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
_writePos = 0;
}

@@ -832,7 +834,7 @@ private async Task FlushAsyncInternal(CancellationToken cancellationToken)
{
if (_writePos > 0)
{
await _strategy.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
_writePos = 0;
Debug.Assert(_writePos == 0 && _readPos == 0 && _readLen == 0);
return;
@@ -886,13 +888,13 @@ private async Task CopyToAsyncCore(Stream destination, int bufferSize, Cancellat
{
// If there's any read data in the buffer, write it all to the destination stream.
Debug.Assert(_writePos == 0, "Write buffer must be empty if there's data in the read buffer");
await destination.WriteAsync(new ReadOnlyMemory<byte>(_buffer, _readPos, readBytes), cancellationToken).ConfigureAwait(false);
await destination.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, _readPos, readBytes), cancellationToken).ConfigureAwait(false);
_readPos = _readLen = 0;
}
else if (_writePos > 0)
{
// If there's write data in the buffer, flush it back to the underlying stream, as does ReadAsync.
await _strategy.WriteAsync(new ReadOnlyMemory<byte>(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
await _strategy.WriteAsync(MemoryMarshal.CreateFromPinnedArray(_buffer, 0, _writePos), cancellationToken).ConfigureAwait(false);
_writePos = 0;
}

@@ -1067,7 +1069,8 @@ private void EnsureBufferAllocated()

void AllocateBuffer() // logic kept in a separate method to get EnsureBufferAllocated() inlined
{
_strategy.OnBufferAllocated(_buffer = new byte[_bufferSize]);
_buffer = GC.AllocateArray<byte>(_bufferSize,
pinned: true); // this allows us to avoid pinning when the buffer is used for the syscalls
Member:
That's correct, but pinned: true also allocates the array in Gen2 as a side effect, so this may actually hurt real-world scenarios in the end.

Member:
We can allocate it and use a GCHandle if that ends up being better. Previously it was pinned as part of a PreallocatedOverlapped.

(It's still not at all obvious when this newfangled POH should be used.)

Member:
We can also investigate not pinning at all, in which case the code this interacts with will just create a gchandle for each operation.

And/or look at using a pool array, but we'd want to ensure enough synchronization was in place to minimize erroneous usage causing us to return an array still in use. We do that in a few other streams.

Member:
Here is a simple test:

using System;
using System.IO;

for (int i = 0; i < 100_000; i++)
{
    using (var f = new FileStream("test", FileMode.Create))
    {
        f.WriteByte(42);
    }
}
Console.WriteLine($"Allocated: {GC.GetTotalAllocatedBytes()} Gen2 GCs: {GC.CollectionCount(2)}");
  • .NET 5: Allocated: 442474096 Gen2 GCs: 0
  • This PR: Allocated: 448051624 Gen2 GCs: 103

It will be interesting to see whether these excessive Gen2 GCs hit performance gates of services trying .NET 6 previews.

> It's still not at all obvious when this newfangled POH should be used.

Agree. It is very hard to use.

> we'd want to ensure enough synchronization was in place to minimize erroneous usage causing us to return an array still in use

If you can cover all these cases, it may be better to use an unmanaged buffer. It is pinned too, and it does not cause excessive Gen2 GCs.
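
(For illustration, a minimal sketch of the unmanaged-buffer idea; the UnmanagedScratchBuffer helper below is hypothetical and not code from this PR. Native memory lives outside the GC heaps, so it is effectively pinned without contributing to the Gen2 budget, at the cost of manual lifetime management.)

using System;
using System.Runtime.InteropServices;

// Hypothetical helper illustrating the unmanaged-buffer idea; not part of this PR.
internal sealed unsafe class UnmanagedScratchBuffer : IDisposable
{
    private IntPtr _ptr;

    public int Length { get; }

    public UnmanagedScratchBuffer(int length)
    {
        Length = length;
        _ptr = Marshal.AllocHGlobal(length); // native allocation: never moved by the GC
    }

    // Sync code paths can work over a Span<byte> directly.
    public Span<byte> Span => new Span<byte>((void*)_ptr, Length);

    // Async paths would additionally need a MemoryManager<byte> wrapper to expose Memory<byte>.

    public void Dispose()
    {
        if (_ptr != IntPtr.Zero)
        {
            Marshal.FreeHGlobal(_ptr);
            _ptr = IntPtr.Zero;
        }
    }
}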

Member:
So allocating on the POH contributes to the Gen2 budget. This is why we disable the buffer by using size 1; that still works, right?

Member:
Nothing in this file is used at all if buffer size is 1.
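
(For context, a minimal caller-side example of that opt-out; it assumes bufferSize: 1 keeps its existing meaning of "no internal buffering" and is not code from this PR.)

using System.IO;

// bufferSize: 1 disables FileStream's managed buffer, so BufferedFileStreamStrategy
// (and the pinned buffer allocated above) is never used for this stream.
using var fs = new FileStream("test", FileMode.Create, FileAccess.Write, FileShare.None,
                              bufferSize: 1, useAsync: true);
fs.WriteByte(42);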

Member:
I'm looking forward to taking another stab at optimizing static files in .NET 6

Member:
> If you can cover all these cases, it may be better to use an unmanaged buffer. It is pinned too, and it does not cause excessive Gen2 GCs.

I'm going to start with a GCHandle and a normally allocated array. I believe in that case I can mostly restrict synchronization to the async code paths (plus disposal). If we use a native buffer, we'll need to protect the sync code paths as well. We can start with this and then see if it makes sense to use a pooled or native buffer as well.
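
(A rough sketch of that direction, illustrative only and not the PR's code: a regular byte[] stays on the normal heap and is pinned with a GCHandle only while a native operation needs a stable address.)

using System;
using System.Runtime.InteropServices;

byte[] buffer = new byte[4096]; // ordinary heap allocation, not on the pinned object heap

// Pin only around the native call that needs a stable address.
GCHandle handle = GCHandle.Alloc(buffer, GCHandleType.Pinned);
try
{
    IntPtr address = handle.AddrOfPinnedObject(); // address to hand to the overlapped I/O call
    // ... issue the ReadFile/WriteFile call with 'address' here ...
}
finally
{
    handle.Free(); // unpin as soon as the operation completes
}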

Member:
Well, actually, I'm going to start by not pinning here at all (it'll then pin/unpin in the rest of the implementation per operation). If there's no measurable impact, we can stick with that for now.
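
(In that per-operation model, pinning lasts only for the lifetime of a single call, which is roughly what Configure/ReleaseNativeResource in this PR already do via MemoryHandle. A simplified sketch, not the PR's code:)

using System;
using System.Buffers;

internal static class PerOperationPinningSketch
{
    // Pin a ReadOnlyMemory<byte> only for the duration of one operation.
    internal static unsafe void IssueNativeWrite(ReadOnlyMemory<byte> memory)
    {
        using (MemoryHandle pinned = memory.Pin()) // pin created per call
        {
            byte* address = (byte*)pinned.Pointer; // stable address for the overlapped syscall
            // ... hand 'address' and memory.Length to WriteFile/NativeOverlapped here ...
        } // Dispose() unpins; nothing stays rooted between operations
    }
}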

}
}

@@ -26,7 +26,5 @@ internal abstract class FileStreamStrategy : Stream
internal abstract void Flush(bool flushToDisk);

internal abstract void DisposeInternal(bool disposing);

internal virtual void OnBufferAllocated(byte[] buffer) { }
}
}