Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Optimize SqlClient SNIPacket async paths #34184

Merged
merged 5 commits into from
Dec 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/System.Data.SqlClient/src/System.Data.SqlClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@
<Compile Include="System\Data\ProviderBase\DbConnectionPool.NetCoreApp.cs" />
<Compile Include="System\Data\SqlClient\SqlConnectionString.NetCoreApp.cs" />
<Compile Include="System\Data\SqlClient\SqlConnectionStringBuilder.NetCoreApp.cs" />
<Compile Include="System\Data\SqlClient\SNI\SNIPacket.NetCoreApp.cs" />
</ItemGroup>
<ItemGroup Condition="'$(IsPartialFacadeAssembly)' != 'true' AND '$(OSGroup)' != 'AnyOS' AND '$(TargetsNetCoreApp)' != 'true'">
<Compile Include="System\Data\SqlClient\SNI\SNIPacket.NetStandard.cs" />
</ItemGroup>
<!-- Manage the SNI toggle for Windows netstandard and UWP -->
<ItemGroup Condition="('$(TargetGroup)' == 'netstandard' OR '$(TargetsNetCoreApp)' == 'true') AND '$(TargetsWindows)' == 'true'">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.Data.SqlClient.SNI
{
internal partial class SNIPacket
{
/// <summary>
/// Read data from a stream asynchronously
/// </summary>
/// <param name="stream">Stream to read from</param>
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<int> valueTask)
{
bool error = false;
try
{
packet._length = await valueTask.ConfigureAwait(false);
if (packet._length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}
catch (Exception ex)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, ex);
error = true;
}

if (error)
{
packet.Release();
}

cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, 0, _capacity), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
_length = vt.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

// Completed
return;
}
}

// Not complete or error call the async local function to complete
_ = ReadFromStreamAsync(this, callback, vt);
}

/// <summary>
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProviders providers, bool disposeAfter, ValueTask valueTask)
{
uint status = TdsEnums.SNI_SUCCESS;
try
{
await valueTask.ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(providers, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}

cb(packet, status);

if (disposeAfter)
{
packet.Dispose();
}
}

ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, 0, _length), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
// Read the result to register as complete for the ValueTask
vt.GetAwaiter().GetResult();

callback(this, TdsEnums.SNI_SUCCESS);

if (disposeAfterWriteAsync)
{
Dispose();
}

// Completed
return;
}

// Not complete or error call the async local function to complete
_ = WriteToStreamAsync(this, callback, provider, disposeAfterWriteAsync, vt);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.Data.SqlClient.SNI
{
internal partial class SNIPacket
{
/// <summary>
/// Read data from a stream asynchronously
/// </summary>
/// <param name="stream">Stream to read from</param>
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int> task)
{
bool error = false;
try
{
packet._length = await task.ConfigureAwait(false);
if (packet._length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}
catch (Exception ex)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, ex);
error = true;
}

if (error)
{
packet.Release();
}

cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

Task<int> t = stream.ReadAsync(_data, 0, _capacity, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
_length = t.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

// Completed
return;
}
}

// Not complete or error call the async local function to complete
_ = ReadFromStreamAsync(this, callback, t);
}

/// <summary>
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProviders providers, bool disposeAfter, Task task)
{
uint status = TdsEnums.SNI_SUCCESS;
try
{
await task.ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(providers, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}

cb(packet, status);

if (disposeAfter)
{
packet.Dispose();
}
}

Task t = stream.WriteAsync(_data, 0, _length, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
// Read the result to register as complete for the Task
t.GetAwaiter().GetResult();

callback(this, TdsEnums.SNI_SUCCESS);

if (disposeAfterWriteAsync)
{
Dispose();
}

// Completed
return;
}

// Not complete or error call the async local function to complete
_ = WriteToStreamAsync(this, callback, provider, disposeAfterWriteAsync, t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace System.Data.SqlClient.SNI
/// <summary>
/// SNI Packet
/// </summary>
internal class SNIPacket : IDisposable, IEquatable<SNIPacket>
internal partial class SNIPacket : IDisposable, IEquatable<SNIPacket>
{
private byte[] _data;
private int _length;
Expand Down Expand Up @@ -240,46 +240,6 @@ public void Reset()
_completionCallback = null;
}

/// <summary>
/// Read data from a stream asynchronously
/// </summary>
/// <param name="stream">Stream to read from</param>
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
bool error = false;

stream.ReadAsync(_data, 0, _capacity, CancellationToken.None).ContinueWith(t =>
{
Exception e = t.Exception?.InnerException;
if (e != null)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, e);
error = true;
}
else
{
_length = t.Result;

if (_length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}

if (error)
{
Release();
}

callback(this, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
},
CancellationToken.None,
TaskContinuationOptions.DenyChildAttach,
TaskScheduler.Default);
}

/// <summary>
/// Read data from a stream synchronously
/// </summary>
Expand All @@ -298,30 +258,6 @@ public void WriteToStream(Stream stream)
stream.Write(_data, 0, _length);
}

/// <summary>
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public async void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
uint status = TdsEnums.SNI_SUCCESS;
try
{
await stream.WriteAsync(_data, 0, _length, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(provider, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}
callback(this, status);

if (disposeAfterWriteAsync)
{
Dispose();
}
}

/// <summary>
/// Get hash code
/// </summary>
Expand Down