Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Rework ExecuteReaderAsync to minimize allocations #528

Merged
merged 5 commits into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@
<Reference Include="System.Memory" />
</ItemGroup>
<ItemGroup>
<Compile Include="Microsoft\Data\SqlClient\AAsyncCallContext.cs" />
<Compile Include="Resources\SR.Designer.cs">
<DesignTime>True</DesignTime>
<AutoGen>True</AutoGen>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
Expand All @@ -15,6 +16,7 @@ namespace Microsoft.Data.ProviderBase
{
internal abstract partial class DbConnectionFactory
{
private static readonly Action<Task<DbConnectionInternal>, object> s_tryGetConnectionCompletedContinuation = TryGetConnectionCompletedContinuation;

internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions, DbConnectionInternal oldConnection, out DbConnectionInternal connection)
{
Expand Down Expand Up @@ -82,25 +84,7 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour

// now that we have an antecedent task, schedule our work when it is completed.
// If it is a new slot or a completed task, this continuation will start right away.
newTask = s_pendingOpenNonPooled[idx].ContinueWith((_) =>
{
Transaction originalTransaction = ADP.GetCurrentTransaction();
try
{
ADP.SetCurrentTransaction(retry.Task.AsyncState as Transaction);
var newConnection = CreateNonPooledConnection(owningConnection, poolGroup, userOptions);
if ((oldConnection != null) && (oldConnection.State == ConnectionState.Open))
{
oldConnection.PrepareForReplaceConnection();
oldConnection.Dispose();
}
return newConnection;
}
finally
{
ADP.SetCurrentTransaction(originalTransaction);
}
}, cancellationTokenSource.Token, TaskContinuationOptions.LongRunning, TaskScheduler.Default);
newTask = CreateReplaceConnectionContinuation(s_pendingOpenNonPooled[idx], owningConnection, retry, userOptions, oldConnection, poolGroup, cancellationTokenSource);

// Place this new task in the slot so any future work will be queued behind it
s_pendingOpenNonPooled[idx] = newTask;
Expand All @@ -114,29 +98,11 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour
}

// once the task is done, propagate the final results to the original caller
newTask.ContinueWith((task) =>
{
cancellationTokenSource.Dispose();
if (task.IsCanceled)
{
retry.TrySetException(ADP.ExceptionWithStackTrace(ADP.NonPooledOpenTimeout()));
}
else if (task.IsFaulted)
{
retry.TrySetException(task.Exception.InnerException);
}
else
{
if (!retry.TrySetResult(task.Result))
{
// The outer TaskCompletionSource was already completed
// Which means that we don't know if someone has messed with the outer connection in the middle of creation
// So the best thing to do now is to destroy the newly created connection
task.Result.DoomThisConnection();
task.Result.Dispose();
}
}
}, TaskScheduler.Default);
newTask.ContinueWith(
continuationAction: s_tryGetConnectionCompletedContinuation,
state: Tuple.Create(cancellationTokenSource, retry),
scheduler: TaskScheduler.Default
);

return false;
}
Expand Down Expand Up @@ -188,5 +154,62 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour

return true;
}

private Task<DbConnectionInternal> CreateReplaceConnectionContinuation(Task<DbConnectionInternal> task, DbConnection owningConnection, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions, DbConnectionInternal oldConnection, DbConnectionPoolGroup poolGroup, CancellationTokenSource cancellationTokenSource)
{
return task.ContinueWith(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small comment @Wraith2: ContinueWith generally performs much worse than a simple async method with await (not to mention that's it's more brittle wrt exception handling etc.). I can post some benchmarking if you want.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though this may be totally justified if TaskContinuationOptions.LongRunning is important here - just making the general remark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Language supported async await would be easier to understand as well.
Whoever wrote the original code had some reason to use imperative constructs instead of language support but I've no idea what it was.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Language supported async await would be easier to understand as well.

Absolutely.

Whoever wrote the original code had some reason to use imperative constructs instead of language support but I've no idea what it was.

I suspect a lot of the code was simply written before async/await was introduced (in .NET 4.5, relatively "late"). I'd definitely consider replacing the callback approach as you work through the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine with me but it's down to the MS team to make that sort of decision. I don't want to make working on this library any harder than it already is so I try to replicate the existing style wherever it's sensible to do so in my changes.

There are several library wide things that would be nice to do.

  1. Merge the netfx and netcore codebases into a single solution with documented reasons for any differences
  2. Change how async methods are implemented with snapshots to reduce performance issues.
  3. Change from imperative async to language async.

We've got 1 in progress but the others really need 1 to be done and judged stable so we don't have to duplicate a lot of work. 1 will aoso bring perf improvements done in netcore to netfx.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I vote for prioritizing 1 - it is currently a quite confusing codebase, which makes community contributions harder than the should be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

working on it bit by bit. #625 The easy part is the identical files. The complex part is the files which have opposing or complex changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing, thing kind of cleanup can be done separately of course. I'd suggest that if ContinueWith continuations are being rewritten for unrelated reasons, it may be a good opportunity to just do that with async/await, but of course that's up to the team and you.

I also totally agree that merging the netfx/netcore codebases is a high-priority task.

(_) =>
{
Transaction originalTransaction = ADP.GetCurrentTransaction();
try
{
ADP.SetCurrentTransaction(retry.Task.AsyncState as Transaction);
var newConnection = CreateNonPooledConnection(owningConnection, poolGroup, userOptions);
if ((oldConnection != null) && (oldConnection.State == ConnectionState.Open))
{
oldConnection.PrepareForReplaceConnection();
oldConnection.Dispose();
}
return newConnection;
}
finally
{
ADP.SetCurrentTransaction(originalTransaction);
}
},
cancellationTokenSource.Token,
TaskContinuationOptions.LongRunning,
TaskScheduler.Default
);
}

private static void TryGetConnectionCompletedContinuation(Task<DbConnectionInternal> task, object state)
DavoudEshtehari marked this conversation as resolved.
Show resolved Hide resolved
{
Tuple<CancellationTokenSource, TaskCompletionSource<DbConnectionInternal>> parameters = (Tuple<CancellationTokenSource, TaskCompletionSource<DbConnectionInternal>>)state;
CancellationTokenSource source = parameters.Item1;
source.Dispose();

TaskCompletionSource<DbConnectionInternal> retryCompletionSource = parameters.Item2;

if (task.IsCanceled)
{
retryCompletionSource.TrySetException(ADP.ExceptionWithStackTrace(ADP.NonPooledOpenTimeout()));
}
else if (task.IsFaulted)
{
retryCompletionSource.TrySetException(task.Exception.InnerException);
}
else
{
if (!retryCompletionSource.TrySetResult(task.Result))
{
// The outer TaskCompletionSource was already completed
// Which means that we don't know if someone has messed with the outer connection in the middle of creation
// So the best thing to do now is to destroy the newly created connection
task.Result.DoomThisConnection();
task.Result.Dispose();
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Threading.Tasks;

namespace Microsoft.Data.SqlClient
{
// this class is a base class for creating derived objects that will store state for async operations
// avoiding the use of closures and allowing caching/reuse of the instances for frequently used async
// calls
//
// DO derive from this and seal your class
// DO add additional fields or properties needed for the async operation and then override Clear to zero them
// DO override AfterClear and use the owner parameter to return the object to a cache location if you have one, this is the purpose of the method
// CONSIDER creating your own Set method that calls the base Set rather than providing a parameterized ctor, it is friendlier to caching
// DO NOT use this class' state after Dispose has been called. It will not throw ObjectDisposedException but it will be a cleared object

internal abstract class AAsyncCallContext<TOwner, TTask> : IDisposable
where TOwner : class
{
protected TOwner _owner;
protected TaskCompletionSource<TTask> _source;
protected IDisposable _disposable;

protected AAsyncCallContext()
{
}

protected AAsyncCallContext(TOwner owner, TaskCompletionSource<TTask> source, IDisposable disposable = null)
{
Set(owner, source, disposable);
}

protected void Set(TOwner owner, TaskCompletionSource<TTask> source, IDisposable disposable = null)
{
_owner = owner ?? throw new ArgumentNullException(nameof(owner));
_source = source ?? throw new ArgumentNullException(nameof(source));
_disposable = disposable;
}

protected void ClearCore()
{
_source = null;
_owner = default;
IDisposable copyDisposable = _disposable;
_disposable = null;
copyDisposable?.Dispose();
}

/// <summary>
/// override this method to cleanup instance data before ClearCore is called which will blank the base data
/// </summary>
protected virtual void Clear()
{
}

/// <summary>
/// override this method to do work after the instance has been totally blanked, intended for cache return etc
/// </summary>
protected virtual void AfterCleared(TOwner owner)
{
}

public void Dispose()
{
TOwner owner = _owner;
try
{
Clear();
}
finally
{
ClearCore();
}
AfterCleared(owner);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2265,7 +2265,7 @@ private Task ReadWriteColumnValueAsync(int col)
return writeTask;
}

private void RegisterForConnectionCloseNotification<T>(ref Task<T> outerTask)
private Task<T> RegisterForConnectionCloseNotification<T>(Task<T> outerTask)
{
SqlConnection connection = _connection;
if (connection == null)
Expand All @@ -2274,7 +2274,7 @@ private void RegisterForConnectionCloseNotification<T>(ref Task<T> outerTask)
throw ADP.ClosedConnectionError();
}

connection.RegisterForConnectionCloseNotification<T>(ref outerTask, this, SqlReferenceCollection.BulkCopyTag);
return connection.RegisterForConnectionCloseNotification(outerTask, this, SqlReferenceCollection.BulkCopyTag);
}

// Runs a loop to copy all columns of a single row.
Expand Down Expand Up @@ -3057,7 +3057,7 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken)
source = new TaskCompletionSource<object>(); // Creating the completion source/Task that we pass to application
resultTask = source.Task;

RegisterForConnectionCloseNotification(ref resultTask);
resultTask = RegisterForConnectionCloseNotification(resultTask);
}

if (_destinationTableName == null)
Expand Down
Loading