Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keep MsQuicConnection alive when streams are pending #52800

Merged
merged 9 commits into from
Jun 10, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ internal static class MsQuicStatusCodes
internal static uint InternalError => OperatingSystem.IsWindows() ? Windows.InternalError : Posix.InternalError;
internal static uint InvalidState => OperatingSystem.IsWindows() ? Windows.InvalidState : Posix.InvalidState;
internal static uint HandshakeFailure => OperatingSystem.IsWindows() ? Windows.HandshakeFailure : Posix.HandshakeFailure;
internal static uint UserCanceled => OperatingSystem.IsWindows() ? Windows.UserCanceled : Posix.UserCanceled;

// TODO return better error messages here.
public static string GetError(uint status) => OperatingSystem.IsWindows() ? Windows.GetError(status) : Posix.GetError(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ internal sealed class MsQuicConnection : QuicConnectionProvider
private readonly SafeMsQuicConfigurationHandle? _configuration;

private readonly State _state = new State();
private GCHandle _stateHandle;
private bool _disposed;
private int _disposed;

private IPEndPoint? _localEndPoint;
private readonly EndPoint _remoteEndPoint;
Expand All @@ -43,6 +42,7 @@ internal sealed class MsQuicConnection : QuicConnectionProvider
internal sealed class State
{
public SafeMsQuicConnectionHandle Handle = null!; // set inside of MsQuicConnection ctor.
public GCHandle StateGCHandle;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand why was the StateGCHandle moved inside State?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the connection is gone and state is kept alive because of the streams, we still want to lock state in place because we handed pointer to MsQuic. When we release all the streams, the original location is not accessible as we don't have link back to connection (and it may be gone) I did the move (sort of) in previous round using IntPtr. @CarnaViire and @jkotas suggested to preserve the type so I end up with this.


// These exists to prevent GC of the MsQuicConnection in the middle of an async op (Connect or Shutdown).
public MsQuicConnection? Connection;
Expand All @@ -53,6 +53,8 @@ internal sealed class State

public bool Connected;
public long AbortErrorCode = -1;
public int StreamCount;
wfurt marked this conversation as resolved.
Show resolved Hide resolved
private bool _closing;

// Queue for accepted streams.
// Backlog limit is managed by MsQuic so it can be unbounded here.
Expand All @@ -61,30 +63,79 @@ internal sealed class State
SingleReader = true,
SingleWriter = true,
});

public void RemoveStream(MsQuicStream stream)
{
bool releaseHandles;
lock (this)
{
StreamCount--;
wfurt marked this conversation as resolved.
Show resolved Hide resolved
releaseHandles = _closing && StreamCount == 0;
}

if (releaseHandles)
{
Handle?.Dispose();
if (StateGCHandle.IsAllocated) StateGCHandle.Free();
wfurt marked this conversation as resolved.
Show resolved Hide resolved
}
}

public bool TryQueueNewStream(SafeMsQuicStreamHandle streamHandle, QUIC_STREAM_OPEN_FLAGS flags)
{
var stream = new MsQuicStream(this, streamHandle, flags);
if (AcceptQueue.Writer.TryWrite(stream))
{
return true;
}
else
{
stream.Dispose();
return false;
}
}

public bool TryAddStream(MsQuicStream stream)
{
lock (this)
{
if (_closing)
{
return false;
}

StreamCount++;
return true;
}
}

// This is called under lock from connection dispose
public void SetClosing()
{
_closing = true;
wfurt marked this conversation as resolved.
Show resolved Hide resolved
}
}

// constructor for inbound connections
public MsQuicConnection(IPEndPoint localEndPoint, IPEndPoint remoteEndPoint, SafeMsQuicConnectionHandle handle)
{
_state.Handle = handle;
_state.StateGCHandle = GCHandle.Alloc(_state);
_state.Connected = true;
_localEndPoint = localEndPoint;
_remoteEndPoint = remoteEndPoint;
_remoteCertificateRequired = false;
_isServer = true;

_stateHandle = GCHandle.Alloc(_state);

try
{
MsQuicApi.Api.SetCallbackHandlerDelegate(
_state.Handle,
s_connectionDelegate,
GCHandle.ToIntPtr(_stateHandle));
GCHandle.ToIntPtr(_state.StateGCHandle));
}
catch
{
_stateHandle.Free();
_state.StateGCHandle.Free();
throw;
}

Expand All @@ -107,7 +158,7 @@ public MsQuicConnection(QuicClientConnectionOptions options)
_remoteCertificateValidationCallback = options.ClientAuthenticationOptions.RemoteCertificateValidationCallback;
}

_stateHandle = GCHandle.Alloc(_state);
_state.StateGCHandle = GCHandle.Alloc(_state);
try
{
// this handle is ref counted by MsQuic, so safe to dispose here.
Expand All @@ -116,14 +167,14 @@ public MsQuicConnection(QuicClientConnectionOptions options)
uint status = MsQuicApi.Api.ConnectionOpenDelegate(
MsQuicApi.Api.Registration,
s_connectionDelegate,
GCHandle.ToIntPtr(_stateHandle),
GCHandle.ToIntPtr(_state.StateGCHandle),
out _state.Handle);

QuicExceptionHelpers.ThrowIfFailed(status, "Could not open the connection.");
}
catch
{
_stateHandle.Free();
_state.StateGCHandle.Free();
throw;
}

Expand Down Expand Up @@ -198,9 +249,13 @@ private static uint HandleEventShutdownComplete(State state, ref ConnectionEvent
private static uint HandleEventNewStream(State state, ref ConnectionEvent connectionEvent)
{
var streamHandle = new SafeMsQuicStreamHandle(connectionEvent.Data.PeerStreamStarted.Stream);
var stream = new MsQuicStream(state, streamHandle, connectionEvent.Data.PeerStreamStarted.Flags);
if (!state.TryQueueNewStream(streamHandle, connectionEvent.Data.PeerStreamStarted.Flags))
{
// This will call StreamCloseDelegate and free the stream.
// We will return Success to the MsQuic to prevent double free.
streamHandle.Dispose();
}

state.AcceptQueue.Writer.TryWrite(stream);
return MsQuicStatusCodes.Success;
}

Expand Down Expand Up @@ -488,16 +543,48 @@ public override void Dispose()
Dispose(false);
}

private async Task FlushAcceptQueue()
{
try {
// Writer may or may not be completed.
_state.AcceptQueue.Writer.Complete();
} catch { };
wfurt marked this conversation as resolved.
Show resolved Hide resolved

await foreach (MsQuicStream item in _state.AcceptQueue.Reader.ReadAllAsync())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await foreach (MsQuicStream item in _state.AcceptQueue.Reader.ReadAllAsync())
await foreach (MsQuicStream item in _state.AcceptQueue.Reader.ReadAllAsync().ConfigureAwait(false))

{
item.Dispose();
}
}

private void Dispose(bool disposing)
{
if (_disposed)
int disposed = Interlocked.Exchange(ref _disposed, 1);
if (disposed == 1)
{
return;
}
wfurt marked this conversation as resolved.
Show resolved Hide resolved

_state?.Handle?.Dispose();
if (_stateHandle.IsAllocated) _stateHandle.Free();
_disposed = true;
bool releaseHandles = false;
lock (_state)
{
_state.Connection = null;
if (_state.StreamCount == 0)
{
releaseHandles = true;
}
else
{
// We have pending streams so we need to defer cleanup until last one is gone.
_state.SetClosing();
}
}

FlushAcceptQueue().GetAwaiter().GetResult();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connections are IAsyncDisposable; we should make dispose do proper await there.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you suggest. We still need to flush for synchronous Dispose(). Can we do what ever needs to be done as follow-up? I would like to get this this in to get verifications on the crashes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine.

if (releaseHandles)
{
_state!.Handle?.Dispose();
if (_state.StateGCHandle.IsAllocated) _state.StateGCHandle.Free();
}
}

// TODO: this appears abortive and will cause prior successfully shutdown and closed streams to drop data.
Expand All @@ -511,7 +598,7 @@ internal override ValueTask CloseAsync(long errorCode, CancellationToken cancell

private void ThrowIfDisposed()
{
if (_disposed)
if (_disposed == 1)
wfurt marked this conversation as resolved.
Show resolved Hide resolved
{
throw new ObjectDisposedException(nameof(MsQuicStream));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ internal MsQuicStream(MsQuicConnection.State connectionState, SafeMsQuicStreamHa
throw;
}

if (!connectionState.TryAddStream(this))
{
_stateHandle.Free();
throw new ObjectDisposedException(nameof(QuicConnection));
}

if (NetEventSource.Log.IsEnabled())
{
NetEventSource.Info(
Expand Down Expand Up @@ -134,6 +140,13 @@ internal MsQuicStream(MsQuicConnection.State connectionState, QUIC_STREAM_OPEN_F
throw;
}

if (!connectionState.TryAddStream(this))
{
_state.Handle?.Dispose();
_stateHandle.Free();
throw new ObjectDisposedException(nameof(QuicConnection));
}

if (NetEventSource.Log.IsEnabled())
{
NetEventSource.Info(
Expand Down Expand Up @@ -320,7 +333,6 @@ internal override async ValueTask<int> ReadAsync(Memory<byte> destination, Cance
{
shouldComplete = true;
}

state.ReadState = ReadState.Aborted;
}

Expand Down Expand Up @@ -555,6 +567,8 @@ private void Dispose(bool disposing)
Marshal.FreeHGlobal(_state.SendQuicBuffers);
if (_stateHandle.IsAllocated) _stateHandle.Free();
CleanupSendState(_state);
Debug.Assert(_state.ConnectionState != null);
_state.ConnectionState?.RemoveStream(this);

if (NetEventSource.Log.IsEnabled())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ await RunClientServer(
await (new[] { t1, t2 }).WhenAllOrAnyFailed(millisecondsTimeout: 1000000);
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/52048")]
[Fact]
public async Task ManagedAVE_MinimalFailingTest()
{
Expand All @@ -369,6 +368,32 @@ async Task GetStreamIdWithoutStartWorks()
// TODO: stream that is opened by client but left unaccepted by server may cause AccessViolationException in its Finalizer
}

await GetStreamIdWithoutStartWorks().WaitAsync(TimeSpan.FromSeconds(15));

GC.Collect();
}

[Fact]
public async Task DisposingConnection_OK()
{
async Task GetStreamIdWithoutStartWorks()
{
using QuicListener listener = CreateQuicListener();
using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint);

ValueTask clientTask = clientConnection.ConnectAsync();
using QuicConnection serverConnection = await listener.AcceptConnectionAsync();
await clientTask;

using QuicStream clientStream = clientConnection.OpenBidirectionalStream();
Assert.Equal(0, clientStream.StreamId);

// Dispose all connections before the streams;
clientConnection.Dispose();
serverConnection.Dispose();
listener.Dispose();
}

await GetStreamIdWithoutStartWorks();

GC.Collect();
Expand Down