Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Optimize SqlClient SNIPacket async paths (#34184)
Browse files Browse the repository at this point in the history
* Optimize SNIPacket async paths

* Feedback

* NET Core vs NET Std

* csproj

* Feedback
  • Loading branch information
benaadams authored and saurabh500 committed Dec 21, 2018
1 parent 8fd4d1d commit 6870c4e
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 65 deletions.
4 changes: 4 additions & 0 deletions src/System.Data.SqlClient/src/System.Data.SqlClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@
<Compile Include="System\Data\ProviderBase\DbConnectionPool.NetCoreApp.cs" />
<Compile Include="System\Data\SqlClient\SqlConnectionString.NetCoreApp.cs" />
<Compile Include="System\Data\SqlClient\SqlConnectionStringBuilder.NetCoreApp.cs" />
<Compile Include="System\Data\SqlClient\SNI\SNIPacket.NetCoreApp.cs" />
</ItemGroup>
<ItemGroup Condition="'$(IsPartialFacadeAssembly)' != 'true' AND '$(OSGroup)' != 'AnyOS' AND '$(TargetsNetCoreApp)' != 'true'">
<Compile Include="System\Data\SqlClient\SNI\SNIPacket.NetStandard.cs" />
</ItemGroup>
<!-- Manage the SNI toggle for Windows netstandard and UWP -->
<ItemGroup Condition="('$(TargetGroup)' == 'netstandard' OR '$(TargetsNetCoreApp)' == 'true') AND '$(TargetsWindows)' == 'true'">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.Data.SqlClient.SNI
{
internal partial class SNIPacket
{
/// <summary>
/// Read data from a stream asynchronously
/// </summary>
/// <param name="stream">Stream to read from</param>
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<int> valueTask)
{
bool error = false;
try
{
packet._length = await valueTask.ConfigureAwait(false);
if (packet._length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}
catch (Exception ex)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, ex);
error = true;
}

if (error)
{
packet.Release();
}

cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, 0, _capacity), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
_length = vt.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

// Completed
return;
}
}

// Not complete or error call the async local function to complete
_ = ReadFromStreamAsync(this, callback, vt);
}

/// <summary>
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProviders providers, bool disposeAfter, ValueTask valueTask)
{
uint status = TdsEnums.SNI_SUCCESS;
try
{
await valueTask.ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(providers, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}

cb(packet, status);

if (disposeAfter)
{
packet.Dispose();
}
}

ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, 0, _length), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
// Read the result to register as complete for the ValueTask
vt.GetAwaiter().GetResult();

callback(this, TdsEnums.SNI_SUCCESS);

if (disposeAfterWriteAsync)
{
Dispose();
}

// Completed
return;
}

// Not complete or error call the async local function to complete
_ = WriteToStreamAsync(this, callback, provider, disposeAfterWriteAsync, vt);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.Data.SqlClient.SNI
{
internal partial class SNIPacket
{
/// <summary>
/// Read data from a stream asynchronously
/// </summary>
/// <param name="stream">Stream to read from</param>
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int> task)
{
bool error = false;
try
{
packet._length = await task.ConfigureAwait(false);
if (packet._length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}
catch (Exception ex)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, ex);
error = true;
}

if (error)
{
packet.Release();
}

cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

Task<int> t = stream.ReadAsync(_data, 0, _capacity, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
_length = t.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

// Completed
return;
}
}

// Not complete or error call the async local function to complete
_ = ReadFromStreamAsync(this, callback, t);
}

/// <summary>
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProviders providers, bool disposeAfter, Task task)
{
uint status = TdsEnums.SNI_SUCCESS;
try
{
await task.ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(providers, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}

cb(packet, status);

if (disposeAfter)
{
packet.Dispose();
}
}

Task t = stream.WriteAsync(_data, 0, _length, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
// Read the result to register as complete for the Task
t.GetAwaiter().GetResult();

callback(this, TdsEnums.SNI_SUCCESS);

if (disposeAfterWriteAsync)
{
Dispose();
}

// Completed
return;
}

// Not complete or error call the async local function to complete
_ = WriteToStreamAsync(this, callback, provider, disposeAfterWriteAsync, t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace System.Data.SqlClient.SNI
/// <summary>
/// SNI Packet
/// </summary>
internal class SNIPacket : IDisposable, IEquatable<SNIPacket>
internal partial class SNIPacket : IDisposable, IEquatable<SNIPacket>
{
private byte[] _data;
private int _length;
Expand Down Expand Up @@ -240,46 +240,6 @@ public void Reset()
_completionCallback = null;
}

/// <summary>
/// Read data from a stream asynchronously
/// </summary>
/// <param name="stream">Stream to read from</param>
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
bool error = false;

stream.ReadAsync(_data, 0, _capacity, CancellationToken.None).ContinueWith(t =>
{
Exception e = t.Exception?.InnerException;
if (e != null)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, e);
error = true;
}
else
{
_length = t.Result;
if (_length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}
if (error)
{
Release();
}
callback(this, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
},
CancellationToken.None,
TaskContinuationOptions.DenyChildAttach,
TaskScheduler.Default);
}

/// <summary>
/// Read data from a stream synchronously
/// </summary>
Expand All @@ -298,30 +258,6 @@ public void WriteToStream(Stream stream)
stream.Write(_data, 0, _length);
}

/// <summary>
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public async void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
uint status = TdsEnums.SNI_SUCCESS;
try
{
await stream.WriteAsync(_data, 0, _length, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(provider, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}
callback(this, status);

if (disposeAfterWriteAsync)
{
Dispose();
}
}

/// <summary>
/// Get hash code
/// </summary>
Expand Down

0 comments on commit 6870c4e

Please sign in to comment.