Skip to content

Commit

Permalink
Fix Async Cancel (#956)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wraith2 committed Sep 17, 2021
1 parent f25e96d commit 576fadb
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1470,19 +1470,13 @@ public int EndExecuteNonQueryAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
if (!_internalEndExecuteInitiated && _stateObj != null)
{
lock (_stateObj)
{
return EndExecuteNonQueryInternal(asyncResult);
}
}
else
{
return EndExecuteNonQueryInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}
return EndExecuteNonQueryInternal(asyncResult);
}
}

Expand Down Expand Up @@ -1904,19 +1898,15 @@ private XmlReader EndExecuteXmlReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteXmlReaderInternal(asyncResult);
}
}
else

if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteXmlReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteXmlReaderInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2102,18 +2092,15 @@ internal SqlDataReader EndExecuteReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteReaderInternal(asyncResult);
}
}
else

if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot happen after
// we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteReaderInternal(asyncResult);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,18 @@ public TimeoutState(int value)
// 2) post first packet write, but before session return - a call to cancel will send an
// attention to the server
// 3) post session close - no attention is allowed
private bool _cancelled;
private const int _waitForCancellationLockPollTimeout = 100;
private WeakReference _cancellationOwner = new WeakReference(null);

private static class CancelState
{
public const int Unset = 0;
public const int Closed = 1;
public const int Cancelled = 2;
}

private int _cancelState;

// Cache the transaction for which this command was executed so upon completion we can
// decrement the appropriate result count.
internal SqlInternalTransaction _executedUnderTransaction;
Expand Down Expand Up @@ -623,68 +631,50 @@ internal void Activate(object owner)
Debug.Assert(result == 1, "invalid deactivate count");
}

internal bool SetCancelStateClosed()
{
return Interlocked.CompareExchange(ref _cancelState, CancelState.Closed, CancelState.Unset) == CancelState.Unset && _cancelState == CancelState.Closed;
}

// This method is only called by the command or datareader as a result of a user initiated
// cancel request.
internal void Cancel(object caller)
{
Debug.Assert(caller != null, "Null caller for Cancel!");
Debug.Assert(caller is SqlCommand || caller is SqlDataReader, "Calling API with invalid caller type: " + caller.GetType());

bool hasLock = false;
try
// only change state if it is Unset, so don't check the return value
Interlocked.CompareExchange(ref _cancelState, CancelState.Cancelled, CancelState.Unset);

if ((_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken)
&& (_cancellationOwner.Target == caller) && HasPendingData && !_attentionSent)
{
// Keep looping until we either grabbed the lock (and therefore sent attention) or the connection closes\breaks
while ((!hasLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the connection closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
{
Monitor.TryEnter(this, _waitForCancellationLockPollTimeout, ref hasLock);
if (hasLock)
{ // Lock for the time being - since we need to synchronize the attention send.
// This lock is also protecting against concurrent close and async continuations

// Ensure that, once we have the lock, that we are still the owner
if ((!_cancelled) && (_cancellationOwner.Target == caller))
try
{
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: _waitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_cancelled = true;

if (HasPendingData && !_attentionSent)
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
{
if (_parser.Connection.ThreadHasParserLockForClose)
{
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the connection closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
{
try
{
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: _waitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
{
if (_parser.Connection.ThreadHasParserLockForClose)
{
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
}
}
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
}
}
}
finally
{
if (hasLock)
{
Monitor.Exit(this);
}
}
}

// CancelRequest - use to cancel while writing a request to the server
Expand Down Expand Up @@ -771,7 +761,7 @@ private void ResetCancelAndProcessAttention()
lock (this)
{
// Reset cancel state.
_cancelled = false;
_cancelState = CancelState.Unset;
_cancellationOwner.Target = null;

if (_attentionSent)
Expand Down Expand Up @@ -993,10 +983,10 @@ internal Task ExecuteFlush()
{
lock (this)
{
if (_cancelled && 1 == _outputPacketNumber)
if (_cancelState != CancelState.Unset && 1 == _outputPacketNumber)
{
ResetBuffer();
_cancelled = false;
_cancelState = CancelState.Unset;
throw SQL.OperationCancelled();
}
else
Expand Down Expand Up @@ -3354,7 +3344,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)
byte packetNumber = _outputPacketNumber;

// Set Status byte based whether this is end of message or not
bool willCancel = (_cancelled) && (_parser._asyncWrite);
bool willCancel = (_cancelState != CancelState.Unset) && (_parser._asyncWrite);
if (willCancel)
{
status = TdsEnums.ST_EOM | TdsEnums.ST_IGNORE;
Expand Down Expand Up @@ -3402,7 +3392,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)

private void CancelWritePacket()
{
Debug.Assert(_cancelled, "Should not call CancelWritePacket if _cancelled is not set");
Debug.Assert(_cancelState != CancelState.Unset, "Should not call CancelWritePacket if _cancelled is not set");

_parser.Connection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we are holding the lock
try
Expand Down Expand Up @@ -3988,7 +3978,7 @@ internal void AssertStateIsClean()
Debug.Assert(_delayedWriteAsyncCallbackException == null, "StateObj has an unobserved exceptions from an async write");
// Attention\Cancellation\Timeouts
Debug.Assert(!HasReceivedAttention && !_attentionSent && !_attentionSending, $"StateObj is still dealing with attention: Sent: {_attentionSent}, Received: {HasReceivedAttention}, Sending: {_attentionSending}");
Debug.Assert(!_cancelled, "StateObj still has cancellation set");
Debug.Assert(_cancelState == CancelState.Unset, "StateObj still has cancellation set");
Debug.Assert(_timeoutState == TimeoutState.Stopped, "StateObj still has internal timeout set");
// Errors and Warnings
Debug.Assert(!_hasErrorOrWarning, "StateObj still has stored errors or warnings");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1783,19 +1783,13 @@ private int EndExecuteNonQueryAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
if (!_internalEndExecuteInitiated && _stateObj != null)
{
lock (_stateObj)
{
return EndExecuteNonQueryInternal(asyncResult);
}
}
else
{
return EndExecuteNonQueryInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}
return EndExecuteNonQueryInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2303,19 +2297,14 @@ private XmlReader EndExecuteXmlReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteXmlReaderInternal(asyncResult);
}
}
else
if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteXmlReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteXmlReaderInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2562,19 +2551,15 @@ private SqlDataReader EndExecuteReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteReaderInternal(asyncResult);
}
}
else

if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot happen after
// we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteReaderInternal(asyncResult);
}
}

Expand Down
Loading

0 comments on commit 576fadb

Please sign in to comment.