Skip to content

Commit

Permalink
Merged PR 1070: Porting PR 34184
Browse files Browse the repository at this point in the history
Porting PR 34184 from dotnet/corefx#34184
Optimizes allocations in the async read and write paths for SqlClient's managed SNI implementation
  • Loading branch information
Saurabh Singh (SQL) committed Jan 2, 2019
1 parent 6e58906 commit 9bd2dec
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
<Compile Include="System\Data\SqlClient\ColumnEncryptionKeyInfo.cs" />
<Compile Include="System\Data\SqlClient\EnclaveDelegate.cs" />
<Compile Include="System\Data\SqlClient\SqlQueryMetadataCache.cs" />
<Compile Include="System\Data\SqlClient\SNI\SNIPacket.NetCoreApp.cs" />
</ItemGroup>
<ItemGroup Condition="'$(IsPartialFacadeAssembly)' != 'true' AND '$(OSGroup)' != 'AnyOS' AND '$(TargetGroup)' == 'netstandard'">
<Compile Include="System\Data\SqlClient\SNI\SNIPacket.NetStandard.cs" />
</ItemGroup>
<ItemGroup Condition="'$(IsPartialFacadeAssembly)' != 'true' AND '$(OSGroup)' != 'AnyOS'">
<Compile Include="$(CommonPath)\System\SR.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Data.SqlClient.SNI
{
internal partial class SNIPacket
{
/// <summary>
/// Read data from a stream asynchronously
/// </summary>
/// <param name="stream">Stream to read from</param>
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<int> valueTask)
{
bool error = false;
try
{
packet._length = await valueTask.ConfigureAwait(false);
if (packet._length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}
catch (Exception ex)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, ex);
error = true;
}

if (error)
{
packet.Release();
}

cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, 0, _capacity), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
_length = vt.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

// Completed
return;
}
}

// Not complete or error call the async local function to complete
_ = ReadFromStreamAsync(this, callback, vt);
}

/// <summary>
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProviders providers, bool disposeAfter, ValueTask valueTask)
{
uint status = TdsEnums.SNI_SUCCESS;
try
{
await valueTask.ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(providers, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}

cb(packet, status);

if (disposeAfter)
{
packet.Dispose();
}
}

ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, 0, _length), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
// Read the result to register as complete for the ValueTask
vt.GetAwaiter().GetResult();

callback(this, TdsEnums.SNI_SUCCESS);

if (disposeAfterWriteAsync)
{
Dispose();
}

// Completed
return;
}

// Not complete or error call the async local function to complete
_ = WriteToStreamAsync(this, callback, provider, disposeAfterWriteAsync, vt);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Data.SqlClient.SNI
{
internal partial class SNIPacket
{
/// <summary>
/// Read data from a stream asynchronously
/// </summary>
/// <param name="stream">Stream to read from</param>
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int> task)
{
bool error = false;
try
{
packet._length = await task.ConfigureAwait(false);
if (packet._length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}
catch (Exception ex)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, ex);
error = true;
}

if (error)
{
packet.Release();
}

cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

Task<int> t = stream.ReadAsync(_data, 0, _capacity, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
_length = t.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

// Completed
return;
}
}

// Not complete or error call the async local function to complete
_ = ReadFromStreamAsync(this, callback, t);
}

/// <summary>
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
// Treat local function as a static and pass all params otherwise as async will allocate
async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProviders providers, bool disposeAfter, Task task)
{
uint status = TdsEnums.SNI_SUCCESS;
try
{
await task.ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(providers, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}

cb(packet, status);

if (disposeAfter)
{
packet.Dispose();
}
}

Task t = stream.WriteAsync(_data, 0, _length, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
// Read the result to register as complete for the Task
t.GetAwaiter().GetResult();

callback(this, TdsEnums.SNI_SUCCESS);

if (disposeAfterWriteAsync)
{
Dispose();
}

// Completed
return;
}

// Not complete or error call the async local function to complete
_ = WriteToStreamAsync(this, callback, provider, disposeAfterWriteAsync, t);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Microsoft.Data.SqlClient.SNI
/// <summary>
/// SNI Packet
/// </summary>
internal class SNIPacket : IDisposable, IEquatable<SNIPacket>
internal partial class SNIPacket : IDisposable, IEquatable<SNIPacket>
{
private byte[] _data;
private int _length;
Expand Down Expand Up @@ -241,46 +241,6 @@ public void Reset()
_completionCallback = null;
}

/// <summary>
/// Read data from a stream asynchronously
/// </summary>
/// <param name="stream">Stream to read from</param>
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
bool error = false;

stream.ReadAsync(_data, 0, _capacity, CancellationToken.None).ContinueWith(t =>
{
Exception e = t.Exception?.InnerException;
if (e != null)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, e);
error = true;
}
else
{
_length = t.Result;
if (_length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}
if (error)
{
Release();
}
callback(this, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
},
CancellationToken.None,
TaskContinuationOptions.DenyChildAttach,
TaskScheduler.Default);
}

/// <summary>
/// Read data from a stream synchronously
/// </summary>
Expand All @@ -299,30 +259,6 @@ public void WriteToStream(Stream stream)
stream.Write(_data, 0, _length);
}

/// <summary>
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public async void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
uint status = TdsEnums.SNI_SUCCESS;
try
{
await stream.WriteAsync(_data, 0, _length, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(provider, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}
callback(this, status);

if (disposeAfterWriteAsync)
{
Dispose();
}
}

/// <summary>
/// Get hash code
/// </summary>
Expand Down

0 comments on commit 9bd2dec

Please sign in to comment.