Skip to content

Commit

Permalink
Simplify the allocate and commit logic (dotnet/corefx#33811)
Browse files Browse the repository at this point in the history
- Only clear the writing head when the list is empty.
- Commit just updates the reading tail (renamed the commit head).



Commit migrated from dotnet/corefx@07d1bb7
  • Loading branch information
davidfowl authored Dec 5, 2018
1 parent 656e5d4 commit 1710c91
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<Compile Include="System\IO\Pipelines\PipeCompletionCallbacks.cs" />
<Compile Include="System\IO\Pipelines\PipeOptions.cs" />
<Compile Include="System\IO\Pipelines\PipeReader.cs" />
<Compile Include="System\IO\Pipelines\PipeReaderState.cs" />
<Compile Include="System\IO\Pipelines\PipeOperationState.cs" />
<Compile Include="System\IO\Pipelines\PipeScheduler.cs" />
<Compile Include="System\IO\Pipelines\PipeWriter.cs" />
<Compile Include="System\IO\Pipelines\ReadResult.cs" />
Expand Down
139 changes: 56 additions & 83 deletions src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ public sealed partial class Pipe
private int _readHeadIndex;

// The commit head which is the extent of the bytes available to the IPipelineReader to consume
private BufferSegment _commitHead;
private int _commitHeadIndex;
private BufferSegment _readTail;
private int _readTailIndex;

// The write head which is the extent of the IPipelineWriter's written bytes
private BufferSegment _writingHead;

private PipeReaderState _readingState;
private PipeOperationState _operationState;

private bool _disposed;

Expand All @@ -97,7 +97,7 @@ public Pipe(PipeOptions options)

_bufferSegmentPool = new BufferSegment[SegmentPoolSize];

_readingState = default;
_operationState = default;
_readerCompletion = default;
_writerCompletion = default;

Expand All @@ -120,7 +120,8 @@ private void ResetState()
_writerCompletion.Reset();
_readerAwaitable = new PipeAwaitable(completed: false, _useSynchronizationContext);
_writerAwaitable = new PipeAwaitable(completed: true, _useSynchronizationContext);
_commitHeadIndex = 0;
_readTailIndex = 0;
_readHeadIndex = 0;
_currentWriteLength = 0;
_length = 0;
}
Expand Down Expand Up @@ -175,60 +176,28 @@ internal Span<byte> GetSpan(int sizeHint)

private void AllocateWriteHeadUnsynchronized(int sizeHint)
{
BufferSegment segment = null;
if (_writingHead != null)
_operationState.BeginWrite();
if (_writingHead == null)
{
segment = _writingHead;

int bytesLeftInBuffer = segment.WritableBytes;

// If inadequate bytes left or if the segment is readonly
if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
{
BufferSegment nextSegment = CreateSegmentUnsynchronized();
nextSegment.SetMemory(_pool.Rent(GetSegmentSize(sizeHint)));

segment.SetNext(nextSegment);
// We need to allocate memory to write since nobody has written before
BufferSegment newSegment = CreateSegmentUnsynchronized();
newSegment.SetMemory(_pool.Rent(GetSegmentSize(sizeHint)));

_writingHead = nextSegment;
}
// Set all the pointers
_writingHead = _readHead = _readTail = newSegment;
}
else
{
if (_commitHead != null)
{
// Try to return the tail so the calling code can append to it
int remaining = _commitHead.WritableBytes;

if (sizeHint <= remaining && remaining > 0)
{
// Free tail space of the right amount, use that
segment = _commitHead;

// Set write head to assigned segment
_writingHead = segment;
return;
}
}

// No free tail space, allocate a new segment
segment = CreateSegmentUnsynchronized();
segment.SetMemory(_pool.Rent(GetSegmentSize(sizeHint)));
int bytesLeftInBuffer = _writingHead.WritableBytes;

if (_commitHead == null)
{
// No previous writes have occurred
_commitHead = segment;
}
else if (segment != _commitHead && _commitHead.Next == null)
if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
{
// Append the segment to the commit head if writes have been committed
// and it isn't the same segment (unused tail space)
_commitHead.SetNext(segment);
}
BufferSegment newSegment = CreateSegmentUnsynchronized();
newSegment.SetMemory(_pool.Rent(GetSegmentSize(sizeHint)));

// Set write head to assigned segment
_writingHead = segment;
_writingHead.SetNext(newSegment);
_writingHead = newSegment;
}
}
}

Expand Down Expand Up @@ -263,25 +232,18 @@ private void ReturnSegmentUnsynchronized(BufferSegment segment)

internal bool CommitUnsynchronized()
{
if (_writingHead == null)
_operationState.EndWrite();

if (_currentWriteLength == 0)
{
// Nothing written to commit
return true;
}

if (_readHead == null)
{
// Update the head to point to the head of the buffer.
// This happens if we called alloc(0) then write
_readHead = _commitHead;
_readHeadIndex = 0;
}

// Always move the commit head to the write head
var bytesWritten = _currentWriteLength;
_commitHead = _writingHead;
_commitHeadIndex = _writingHead.End;
_length += bytesWritten;
// Always move the read tail to the write head
_readTail = _writingHead;
_readTailIndex = _writingHead.End;
_length += _currentWriteLength;

// Do not reset if reader is complete
if (_pauseWriterThreshold > 0 &&
Expand All @@ -291,11 +253,9 @@ internal bool CommitUnsynchronized()
_writerAwaitable.Reset();
}

// Clear the writing state
_writingHead = null;
_currentWriteLength = 0;

return bytesWritten == 0;
return false;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -420,7 +380,7 @@ internal void AdvanceReader(in SequencePosition consumed, in SequencePosition ex
AdvanceReader((BufferSegment)consumed.GetObject(), consumed.GetInteger(), (BufferSegment)examined.GetObject(), examined.GetInteger());
}

internal void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
{
BufferSegment returnStart = null;
BufferSegment returnEnd = null;
Expand All @@ -430,9 +390,9 @@ internal void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, Bu
lock (_sync)
{
var examinedEverything = false;
if (examinedSegment == _commitHead)
if (examinedSegment == _readTail)
{
examinedEverything = _commitHead != null ? examinedIndex == _commitHeadIndex : examinedIndex == 0;
examinedEverything = examinedIndex == _readTailIndex;
}

if (consumedSegment != null)
Expand Down Expand Up @@ -460,17 +420,28 @@ internal void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, Bu
// Check if we consumed entire last segment
// if we are going to return commit head we need to check that there is no writing operation that
// might be using tailspace
if (consumedIndex == returnEnd.Length && _writingHead != returnEnd)
if (consumedIndex == returnEnd.Length && !_operationState.IsWritingActive)
{
BufferSegment nextBlock = returnEnd.NextSegment;
if (_commitHead == returnEnd)
if (_readTail == returnEnd)
{
_commitHead = nextBlock;
_commitHeadIndex = 0;
_readTail = nextBlock;
_readTailIndex = 0;
}

_readHead = nextBlock;
_readHeadIndex = 0;

// Reset the writing head to null if it's the return block
// then null it out as we're about to reset that memory
if (_writingHead == returnEnd)
{
// If we're about to null out the _writingHead then assert the list is empty
Debug.Assert(_readHead == null);
Debug.Assert(_readTail == null);
_writingHead = null;
}

returnEnd = nextBlock;
}
else
Expand Down Expand Up @@ -499,7 +470,7 @@ internal void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, Bu
returnStart = returnStart.NextSegment;
}

_readingState.End();
_operationState.EndRead();
}

TrySchedule(_writerScheduler, completionData);
Expand All @@ -514,9 +485,9 @@ internal void CompleteReader(Exception exception)
lock (_sync)
{
// If we're reading, treat clean up that state before continuting
if (_readingState.IsActive)
if (_operationState.IsReadingActive)
{
_readingState.End();
_operationState.EndRead();
}

// REVIEW: We should consider cleaning up all of the allocated memory
Expand Down Expand Up @@ -645,6 +616,8 @@ internal bool TryRead(out ReadResult result)
{
ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
}

_operationState.BeginReadTentative();
result = default;
return false;
}
Expand Down Expand Up @@ -727,7 +700,7 @@ private void CompletePipe()
// Return all segments
// if _readHead is null we need to try return _commitHead
// because there might be a block allocated for writing
BufferSegment segment = _readHead ?? _commitHead;
BufferSegment segment = _readHead ?? _readTail;
while (segment != null)
{
BufferSegment returnSegment = segment;
Expand All @@ -738,7 +711,7 @@ private void CompletePipe()

_writingHead = null;
_readHead = null;
_commitHead = null;
_readTail = null;
}
}

Expand Down Expand Up @@ -808,7 +781,7 @@ private void GetReadResult(out ReadResult result)
if (head != null)
{
// Reading commit head shared with writer
var readOnlySequence = new ReadOnlySequence<byte>(head, _readHeadIndex, _commitHead, _commitHeadIndex);
var readOnlySequence = new ReadOnlySequence<byte>(head, _readHeadIndex, _readTail, _readTailIndex);
result = new ReadResult(readOnlySequence, isCanceled, isCompleted);
}
else
Expand All @@ -818,11 +791,11 @@ private void GetReadResult(out ReadResult result)

if (isCanceled)
{
_readingState.BeginTentative();
_operationState.BeginReadTentative();
}
else
{
_readingState.Begin();
_operationState.BeginRead();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Diagnostics;
using System.Runtime.CompilerServices;

namespace System.IO.Pipelines
{
[DebuggerDisplay("State: {_state}")]
internal struct PipeOperationState
{
private State _state;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginRead()
{
if ((_state & State.Reading) == State.Reading)
{
ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
}

_state |= State.Reading;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginReadTentative()
{
if ((_state & State.Reading) == State.Reading)
{
ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
}

_state |= State.ReadingTentative;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndRead()
{
if ((_state & State.Reading) != State.Reading &&
(_state & State.ReadingTentative) != State.ReadingTentative)
{
ThrowHelper.ThrowInvalidOperationException_NoReadToComplete();
}

_state &= ~(State.Reading | State.ReadingTentative);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginWrite()
{
_state |= State.Writing;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndWrite()
{
_state &= ~State.Writing;
}

public bool IsWritingActive => (_state & State.Writing) == State.Writing;

public bool IsReadingActive => (_state & State.Reading) == State.Reading;

[Flags]
internal enum State : byte
{
Reading = 1,
ReadingTentative = 2,
Writing = 4
}
}
}
Loading

0 comments on commit 1710c91

Please sign in to comment.