Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Release 1.1] Enable context switch by default in NetFx 4.8 to fix MARS TDS Header errors #959

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4239,15 +4239,6 @@ internal void ResetSnapshotState()
_stateObj._cleanupMetaData = _snapshotCleanupMetaData;
_stateObj._cleanupAltMetaDataSetArray = _snapshotCleanupAltMetaDataSetArray;

// Make sure to go through the appropriate increment/decrement methods if changing the OpenResult flag
if (!_stateObj.HasOpenResult && _state.HasFlag(SnapshottedStateFlags.OpenResult))
{
_stateObj.IncrementAndObtainOpenResultCount(_stateObj._executedUnderTransaction);
}
else if (_stateObj.HasOpenResult && !_state.HasFlag(SnapshottedStateFlags.OpenResult))
{
_stateObj.DecrementOpenResultCount();
}
_stateObj._snapshottedState = _state;

// Reset partially read state (these only need to be maintained if doing async without snapshot)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ static partial void PopulateDefaultValuesPartial(string platformIdentifier, stri
case ".NETCore":
case ".NETFramework":
{
if (version <= 40702)
{
LocalAppContext.DefineSwitchDefault(LocalAppContextSwitches.MakeReadAsyncBlockingString, true);
}
LocalAppContext.DefineSwitchDefault(LocalAppContextSwitches.MakeReadAsyncBlockingString, true);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4337,18 +4337,7 @@ internal void ResetSnapshotState()
_stateObj._nullBitmapInfo = _snapshotNullBitmapInfo;
_stateObj._cleanupMetaData = _snapshotCleanupMetaData;
_stateObj._cleanupAltMetaDataSetArray = _snapshotCleanupAltMetaDataSetArray;

// Make sure to go through the appropriate increment/decrement methods if changing HasOpenResult
if (!_stateObj._hasOpenResult && _snapshotHasOpenResult)
{
_stateObj.IncrementAndObtainOpenResultCount(_stateObj._executedUnderTransaction);
}
else if (_stateObj._hasOpenResult && !_snapshotHasOpenResult)
{
_stateObj.DecrementOpenResultCount();
}
//else _stateObj._hasOpenResult is already == _snapshotHasOpenResult

_stateObj._hasOpenResult = _snapshotHasOpenResult;
_stateObj._receivedColMetaData = _snapshotReceivedColumnMetadata;
_stateObj._attentionReceived = _snapshotAttentionReceived;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,7 @@ public AsyncCancelledConnectionsTest(ITestOutputHelper output)
[ConditionalFact(typeof(DataTestUtility), nameof(DataTestUtility.AreConnStringsSetup))]
public void CancelAsyncConnections()
{
SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder(DataTestUtility.TCPConnectionString);
builder.MultipleActiveResultSets = false;
RunCancelAsyncConnections(builder, false);
RunCancelAsyncConnections(builder, true);
builder.MultipleActiveResultSets = true;
RunCancelAsyncConnections(builder, false);
RunCancelAsyncConnections(builder, true);
}

private void RunCancelAsyncConnections(SqlConnectionStringBuilder connectionStringBuilder, bool makeAsyncBlocking)
{
SqlConnection.ClearAllPools();
AppContext.SetSwitch("Switch.Microsoft.Data.SqlClient.MakeReadAsyncBlocking", makeAsyncBlocking);

string connectionString = DataTestUtility.TCPConnectionString;
_watch = Stopwatch.StartNew();
_random = new Random(4); // chosen via fair dice role.
ParallelLoopResult results = new ParallelLoopResult();
Expand All @@ -50,7 +37,7 @@ private void RunCancelAsyncConnections(SqlConnectionStringBuilder connectionStri
results = Parallel.For(
fromInclusive: 0,
toExclusive: NumberOfTasks,
(int i) => DoManyAsync(connectionStringBuilder).GetAwaiter().GetResult());
(int i) => DoManyAsync(connectionString).GetAwaiter().GetResult());
}
}
catch (Exception ex)
Expand Down Expand Up @@ -90,26 +77,18 @@ private void DisplaySummary()
}

// This is the the main body that our Tasks run
private async Task DoManyAsync(SqlConnectionStringBuilder connectionStringBuilder)
private async Task DoManyAsync(string connectionString)
{
Interlocked.Increment(ref _start);
Interlocked.Increment(ref _inFlight);

using (SqlConnection marsConnection = new SqlConnection(connectionStringBuilder.ToString()))
{
if (connectionStringBuilder.MultipleActiveResultSets)
{
await marsConnection.OpenAsync();
}

// First poison
await DoOneAsync(marsConnection, connectionStringBuilder.ToString(), poison: true);
// First poison
await DoOneAsync(connectionString, poison: true);

for (int i = 0; i < NumberOfNonPoisoned && _continue; i++)
{
// now run some without poisoning
await DoOneAsync(marsConnection, connectionStringBuilder.ToString());
}
for (int i = 0; i < NumberOfNonPoisoned && _continue; i++)
{
// now run some without poisoning
await DoOneAsync(connectionString);
}

Interlocked.Decrement(ref _inFlight);
Expand All @@ -120,30 +99,95 @@ private async Task DoManyAsync(SqlConnectionStringBuilder connectionStringBuilde
// if we are poisoning we will
// 1 - Interject some sleeps in the sql statement so that it will run long enough that we can cancel it
// 2 - Setup a time bomb task that will cancel the command a random amount of time later
private async Task DoOneAsync(SqlConnection marsConnection, string connectionString, bool poison = false)
private async Task DoOneAsync(string connectionString, bool poison = false)
{
try
{
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 4; i++)
using (var connection = new SqlConnection(connectionString))
{
builder.AppendLine("SELECT name FROM sys.tables");
if (poison && i < 3)
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 4; i++)
{
builder.AppendLine("WAITFOR DELAY '00:00:01'");
builder.AppendLine("SELECT name FROM sys.tables");
if (poison && i < 3)
{
builder.AppendLine("WAITFOR DELAY '00:00:01'");
}
}
}

using (var connection = new SqlConnection(connectionString))
{
if (marsConnection != null && marsConnection.State == System.Data.ConnectionState.Open)
int rowsRead = 0;
int resultRead = 0;

try
{
await RunCommand(marsConnection, builder.ToString(), poison);
await connection.OpenAsync();
using (var command = connection.CreateCommand())
{
Task timeBombTask = default;
try
{
// Setup our time bomb
if (poison)
{
timeBombTask = TimeBombAsync(command);
}

command.CommandText = builder.ToString();

// Attempt to read all of the data
using (var reader = await command.ExecuteReaderAsync())
{
try
{
do
{
resultRead++;
while (await reader.ReadAsync() && _continue)
{
rowsRead++;
}
}
while (await reader.NextResultAsync() && _continue);
}
catch when (poison)
{
// This looks a little strange, we failed to read above so this should fail too
// But consider the case where this code is elsewhere (in the Dispose method of a class holding this logic)
try
{
while (await reader.NextResultAsync())
{
}
}
catch
{
Interlocked.Increment(ref _poisonCleanUpExceptions);
}

throw;
}
}
}
finally
{
// Make sure to clean up our time bomb
// It is unlikely, but the timebomb may get delayed in the Task Queue
// And we don't want it running after we dispose the command
if (timeBombTask != default)
{
await timeBombTask;
}
}
}
}
else
finally
{
await connection.OpenAsync();
await RunCommand(connection, builder.ToString(), poison);
Interlocked.Add(ref _rowsRead, rowsRead);
Interlocked.Add(ref _resultRead, resultRead);
if (poison)
{
Interlocked.Increment(ref _poisonedEnded);
}
}
}
}
Expand Down Expand Up @@ -179,83 +223,6 @@ private async Task DoOneAsync(SqlConnection marsConnection, string connectionStr
}
}

private async Task RunCommand(SqlConnection connection, string commandText, bool poison)
{
int rowsRead = 0;
int resultRead = 0;

try
{
using (var command = connection.CreateCommand())
{
Task timeBombTask = default;
try
{
// Setup our time bomb
if (poison)
{
timeBombTask = TimeBombAsync(command);
}

command.CommandText = commandText;

// Attempt to read all of the data
using (var reader = await command.ExecuteReaderAsync())
{
try
{
do
{
resultRead++;
while (await reader.ReadAsync() && _continue)
{
rowsRead++;
}
}
while (await reader.NextResultAsync() && _continue);
}
catch when (poison)
{
// This looks a little strange, we failed to read above so this should fail too
// But consider the case where this code is elsewhere (in the Dispose method of a class holding this logic)
try
{
while (await reader.NextResultAsync())
{
}
}
catch
{
Interlocked.Increment(ref _poisonCleanUpExceptions);
}

throw;
}
}
}
finally
{
// Make sure to clean up our time bomb
// It is unlikely, but the timebomb may get delayed in the Task Queue
// And we don't want it running after we dispose the command
if (timeBombTask != default)
{
await timeBombTask;
}
}
}
}
finally
{
Interlocked.Add(ref _rowsRead, rowsRead);
Interlocked.Add(ref _resultRead, resultRead);
if (poison)
{
Interlocked.Increment(ref _poisonedEnded);
}
}
}

private async Task TimeBombAsync(SqlCommand command)
{
await SleepAsync(100, 3000);
Expand Down