Skip to content

Commit

Permalink
Merge pull request #64494 from CyrusNajmabadi/simpleConnection
Browse files Browse the repository at this point in the history
Simpler bidirectional streaming invocation in OOP.
  • Loading branch information
CyrusNajmabadi committed Oct 7, 2022
2 parents ef1adec + 4ef903f commit dc830b2
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 128 deletions.
73 changes: 1 addition & 72 deletions src/Workspaces/Remote/Core/BrokeredServiceConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,78 +318,7 @@ public override async ValueTask<bool> TryInvokeAsync(Solution solution1, Solutio
}
}

// streaming

/// <param name="service">The service instance.</param>
/// <param name="invocation">A callback to asynchronously write data. The callback is required to complete the
/// <see cref="PipeWriter"/> except in cases where the callback throws an exception.</param>
/// <param name="reader">A callback to asynchronously read data. The callback is allowed, but not required, to
/// complete the <see cref="PipeReader"/>.</param>
/// <param name="cancellationToken">A cancellation token the operation will observe.</param>
internal static async ValueTask<TResult> InvokeStreamingServiceAsync<TResult>(
TService service,
Func<TService, PipeWriter, CancellationToken, ValueTask> invocation,
Func<PipeReader, CancellationToken, ValueTask<TResult>> reader,
CancellationToken cancellationToken)
{
// We can cancel at entry, but once the pipe operations are scheduled we rely on both operations running to
// avoid deadlocks (the exception handler in 'writerTask' ensures progress is made in 'readerTask').
cancellationToken.ThrowIfCancellationRequested();
var mustNotCancelToken = CancellationToken.None;

// After this point, the full cancellation sequence is as follows:
// 1. 'cancellationToken' indicates cancellation is requested
// 2. 'invocation' and 'readerTask' have cancellation requested
// 3. 'invocation' stops writing to 'pipe.Writer'
// 4. 'pipe.Writer' is completed
// 5. 'readerTask' continues reading until EndOfStreamException (workaround for https://github.com/AArnott/Nerdbank.Streams/issues/361)
// 6. 'pipe.Reader' is completed
// 7. OperationCanceledException is thrown back to the caller

var pipe = new Pipe();

// Create new tasks that both start executing, rather than invoking the delegates directly
// to make sure both invocation and reader start executing and transfering data.

var writerTask = Task.Run(async () =>
{
try
{
await invocation(service, pipe.Writer, cancellationToken).ConfigureAwait(false);
}
catch (Exception e)
{
// Ensure that the writer is complete if an exception is thrown
// before the writer is passed to the RPC proxy. Once it's passed to the proxy
// the proxy should complete it as soon as the remote side completes it.
await pipe.Writer.CompleteAsync(e).ConfigureAwait(false);
throw;
}
}, mustNotCancelToken);

var readerTask = Task.Run(
async () =>
{
Exception? exception = null;
try
{
return await reader(pipe.Reader, cancellationToken).ConfigureAwait(false);
}
catch (Exception e) when ((exception = e) == null)
{
throw ExceptionUtilities.Unreachable();
}
finally
{
await pipe.Reader.CompleteAsync(exception).ConfigureAwait(false);
}
}, mustNotCancelToken);

await Task.WhenAll(writerTask, readerTask).ConfigureAwait(false);

return readerTask.Result;
}
// Exceptions

private bool ReportUnexpectedException(Exception exception, CancellationToken cancellationToken)
{
Expand Down
98 changes: 94 additions & 4 deletions src/Workspaces/Remote/Core/RemoteCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.ErrorReporting;
using Microsoft.ServiceHub.Framework;
using Microsoft.VisualStudio.Threading;
using Roslyn.Utilities;
using StreamJsonRpc;

Expand Down Expand Up @@ -87,23 +88,112 @@ public async ValueTask<TResult> InvokeAsync<TResult>(Func<T, CancellationToken,
}

/// <summary>
/// Invokes API on the callback object hosted in the original process (usually devenv) associated with the currently executing brokered service hosted in ServiceHub process.
/// The API streams results back to the caller.
/// Invokes API on the callback object hosted in the original process (usually devenv) associated with the
/// currently executing brokered service hosted in ServiceHub process. The API streams results back to the
/// caller.
/// </summary>
/// <inheritdoc cref="BrokeredServiceConnection{TService}.InvokeStreamingServiceAsync"/>
/// <param name="invocation">A callback to asynchronously write data. The callback should always <see
/// cref="PipeWriter.Complete"/> the <see cref="PipeWriter"/>. If it does not then reading will hang</param>
/// <param name="reader">A callback to asynchronously read data. The callback should not complete the <see
/// cref="PipeReader"/>, but no harm will happen if it does.</param>
/// <param name="cancellationToken">A cancellation token the operation will observe.</param>
public async ValueTask<TResult> InvokeAsync<TResult>(
Func<T, PipeWriter, CancellationToken, ValueTask> invocation,
Func<PipeReader, CancellationToken, ValueTask<TResult>> reader,
CancellationToken cancellationToken)
{
try
{
return await BrokeredServiceConnection<T>.InvokeStreamingServiceAsync(_callback, invocation, reader, cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
var pipe = new Pipe();

// Kick off the work to do the writing to the pipe asynchronously. It will start hot and will be able
// to do work as the reading side attempts to pull in the data it is writing.

var writeTask = WriteAsync(_callback, pipe.Writer);
var readTask = ReadAsync(pipe.Reader);

// Note: waiting on the write-task is not strictly necessary. The read-task cannot complete unless it
// the write-task completes (or it faults for some reason). However, it's nice and clean to just not
// use fire-and-forget here and avoids us having to consider things like async-tracking-tokens for
// testing purposes.
await Task.WhenAll(writeTask, readTask).ConfigureAwait(false);
return await readTask.ConfigureAwait(false);
}
catch (Exception exception) when (ReportUnexpectedException(exception, cancellationToken))
{
throw new OperationCanceledIgnoringCallerTokenException(exception);
}

async Task WriteAsync(T service, PipeWriter pipeWriter)
{
Exception? exception = null;
try
{
// Intentionally yield this thread so that the caller can proceed concurrently and start reading.
// This is not strictly necessary (as we know the writer will always call FlushAsync()), but it is nice
// as it allows both to proceed concurrently on the initial writing/reading.
await Task.Yield();

await invocation(service, pipeWriter, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex) when ((exception = ex) == null)
{
throw ExceptionUtilities.Unreachable();
}
finally
{
// Absolutely do not Complete/CompleteAsync the writer here *unless* an exception occurred. The
// writer is passed to StreamJsonRPC which takes ownership of it. The *inside* of that rpc is
// responsible for Completing the writer *it* is passed, which will signal the completion of the
// writer we have here.
//
// We *do* need to complete this writer in the event if an exception as that may have happened
// *prior* to even issuing the rpc. If we don't complete the writer we will hang. If the exception
// happened within the RPC the writer may already be completed, but it's fine for us to complete it
// a second time.
//
// The reason is *not* fine for us to complete the writer in a non-exception event is that it legal
// (and is the case in practice) that the code in StreamJsonRPC may still be using it (see
// https://github.com/AArnott/Nerdbank.Streams/blob/dafeb5846702bc29e261c9ddf60f42feae01654c/src/Nerdbank.Streams/PipeExtensions.cs#L428)
// where the writer may be advanced in an independent Task even once the rpc message has returned to
// the caller (us).
//
// NOTE: it is intentinonal that the try/catch pattern here does NOT match the one in ReadAsync. There
// are very different semantics around each. The writer code passes ownership to StreamJsonRPC, while
// the reader code does not. As such, the reader code is responsible for completing the reader in all
// cases, whereas the writer code only completes when faulting.

// DO NOT REMOVE THIS NULL CHECK WITHOUT DEEP AND CAREFUL REVIEW.
if (exception != null)
await pipeWriter.CompleteAsync(exception).ConfigureAwait(false);
}
}

async Task<TResult> ReadAsync(PipeReader pipeReader)
{
// NOTE: it is intentional that the try/catch pattern here does NOT match the one in WriteAsync. There
// are very different semantics around each. The writer code passes ownership to StreamJsonRPC, while
// the reader code does not. As such, the reader code is responsible for completing the reader in all
// cases, whereas the writer code only completes when faulting.

Exception? exception = null;
try
{
return await reader(pipeReader, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex) when ((exception = ex) == null)
{
throw ExceptionUtilities.Unreachable();
}
finally
{
// ensure we always complete the reader so the pipe can clean up all its resources. in the case of
// an exception, attempt to complete the reader with that as well as that will tear down the writer
// allowing it to stop writing and allowing the pipe to be cleaned up.
await pipeReader.CompleteAsync(exception).ConfigureAwait(false);
}
}
}

// Remote calls can only throw 4 types of exceptions that correspond to
Expand Down
5 changes: 4 additions & 1 deletion src/Workspaces/Remote/Core/RemoteHostAssetSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ public static async ValueTask WriteDataAsync(
{
WriteAsset(writer, serializer, context, checksum, asset, cancellationToken);

// Flush after each item as a reasonably chunk of data to want to send over the pipe.
// We flush after each item as that forms a reasonably sized chunk of data to want to then send over the
// pipe for the reader on the other side to read. This allows the item-writing to remain entirely
// synchronous without any blocking on async flushing, while also ensuring that we're not buffering the
// entire stream of data into the pipe before it gets sent to the other side.
await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
}

Expand Down
114 changes: 63 additions & 51 deletions src/Workspaces/Remote/Core/SolutionAssetProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ public async ValueTask GetAssetsAsync(PipeWriter pipeWriter, Checksum solutionCh
// The responsibility is on us (as per the requirements of RemoteCallback.InvokeAsync) to Complete the
// pipewriter. This will signal to streamjsonrpc that the writer passed into it is complete, which will
// allow the calling side know to stop reading results.
Exception? exception = null;
try
{
await GetAssetsWorkerAsync(pipeWriter, solutionChecksum, checksums, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
catch (Exception ex) when ((exception = ex) == null)
{
await pipeWriter.CompleteAsync(ex).ConfigureAwait(false);
throw;
throw ExceptionUtilities.Unreachable();
}
finally
{
await pipeWriter.CompleteAsync().ConfigureAwait(false);
await pipeWriter.CompleteAsync(exception).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -91,57 +91,39 @@ await RemoteHostAssetSerialization.WriteDataAsync(
/// <em>every</em> write. That's undesirable as that will then block a thread pool thread on the actual
/// asynchronous flush call to the underlying PipeWriter
/// </summary>
/// <remarks>
/// Note: this stream does not have to <see cref="PipeWriter.Complete"/> the underlying <see cref="_writer"/> it
/// is holding onto (including within <see cref="Flush"/>, <see cref="FlushAsync"/>, or <see cref="Dispose"/>).
/// Responsibility for that is solely in the hands of <see cref="GetAssetsAsync"/>.
/// </remarks>
private class PipeWriterStream : Stream, IDisposableObservable
{
private readonly PipeWriter _writer;

internal PipeWriterStream(PipeWriter writer)
{
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
}
public bool IsDisposed { get; private set; }

public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => !this.IsDisposed;

#region read/seek api (not supported)

public override long Length => throw this.ThrowDisposedOr(new NotSupportedException());
public override long Position
internal PipeWriterStream(PipeWriter writer)
{
get => throw this.ThrowDisposedOr(new NotSupportedException());
set => this.ThrowDisposedOr(new NotSupportedException());
_writer = writer;
}

public override int Read(byte[] buffer, int offset, int count)
=> throw this.ThrowDisposedOr(new NotSupportedException());

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw this.ThrowDisposedOr(new NotSupportedException());

#if !NETSTANDARD

public override int Read(Span<byte> buffer)
=> throw this.ThrowDisposedOr(new NotSupportedException());

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
=> throw this.ThrowDisposedOr(new NotSupportedException());

#endif

public override int ReadByte()
=> throw this.ThrowDisposedOr(new NotSupportedException());

public override long Seek(long offset, SeekOrigin origin)
=> throw this.ThrowDisposedOr(new NotSupportedException());

public override void SetLength(long value)
=> this.ThrowDisposedOr(new NotSupportedException());

#endregion
protected override void Dispose(bool disposing)
{
this.IsDisposed = true;
base.Dispose(disposing);

public bool IsDisposed { get; private set; }
// DO NOT CALL .Complete on the PipeWriter here (see remarks on type).
}

public override bool CanWrite => !this.IsDisposed;
private Exception ThrowDisposedOr(Exception ex)
{
Verify.NotDisposed(this);
throw ex;
}

/// <summary>
/// Intentionally a no op. We know that we and <see cref="RemoteHostAssetSerialization.WriteDataAsync"/>
Expand All @@ -151,10 +133,16 @@ public override void SetLength(long value)
public override void Flush()
{
Verify.NotDisposed(this);

// DO NOT CALL .Complete on the PipeWriter here (see remarks on type).
}

public override async Task FlushAsync(CancellationToken cancellationToken)
=> await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
{
await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);

// DO NOT CALL .Complete on the PipeWriter here (see remarks on type).
}

public override void Write(byte[] buffer, int offset, int count)
{
Expand Down Expand Up @@ -200,17 +188,41 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo

#endif

protected override void Dispose(bool disposing)
{
this.IsDisposed = true;
base.Dispose(disposing);
}
#region read/seek api (not supported)

private Exception ThrowDisposedOr(Exception ex)
public override long Length => throw this.ThrowDisposedOr(new NotSupportedException());
public override long Position
{
Verify.NotDisposed(this);
throw ex;
get => throw this.ThrowDisposedOr(new NotSupportedException());
set => this.ThrowDisposedOr(new NotSupportedException());
}

public override int Read(byte[] buffer, int offset, int count)
=> throw this.ThrowDisposedOr(new NotSupportedException());

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> throw this.ThrowDisposedOr(new NotSupportedException());

#if !NETSTANDARD

public override int Read(Span<byte> buffer)
=> throw this.ThrowDisposedOr(new NotSupportedException());

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
=> throw this.ThrowDisposedOr(new NotSupportedException());

#endif

public override int ReadByte()
=> throw this.ThrowDisposedOr(new NotSupportedException());

public override long Seek(long offset, SeekOrigin origin)
=> throw this.ThrowDisposedOr(new NotSupportedException());

public override void SetLength(long value)
=> this.ThrowDisposedOr(new NotSupportedException());

#endregion
}
}
}

0 comments on commit dc830b2

Please sign in to comment.