Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Optimize SqlClient SNIPacket async paths #34184

Merged
merged 5 commits into from
Dec 21, 2018
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 70 additions & 31 deletions src/System.Data.SqlClient/src/System/Data/SqlClient/SNI/SNIPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,37 +247,50 @@ public void Reset()
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
bool error = false;

stream.ReadAsync(_data, 0, _capacity, CancellationToken.None).ContinueWith(t =>
// Treat local function as a static and pass all params otherwise as async will allocate
async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<int> valueTask)
{
Exception e = t.Exception?.InnerException;
if (e != null)
bool error = false;
try
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, e);
error = true;
}
else
{
_length = t.Result;

if (_length == 0)
packet._length = await valueTask.ConfigureAwait(false);
if (packet._length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}
catch (Exception ex)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, ex);
error = true;
}

if (error)
{
Release();
packet.Release();
}

callback(this, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
},
CancellationToken.None,
TaskContinuationOptions.DenyChildAttach,
TaskScheduler.Default);
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, 0, _capacity), CancellationToken.None);
benaadams marked this conversation as resolved.
Show resolved Hide resolved

if (vt.IsCompletedSuccessfully)
{
_length = vt.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

// Completed
return;
}
}

// Not complete or error call the async local function to complete
_ = ReadFromStreamAsync(this, callback, vt);
}

/// <summary>
Expand All @@ -302,24 +315,50 @@ public void WriteToStream(Stream stream)
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public async void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
public void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
uint status = TdsEnums.SNI_SUCCESS;
try
// Treat local function as a static and pass all params otherwise as async will allocate
async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProviders providers, bool disposeAfter, ValueTask valueTask)
{
await stream.WriteAsync(_data, 0, _length, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(provider, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
uint status = TdsEnums.SNI_SUCCESS;
try
{
await valueTask.ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(providers, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}

cb(packet, status);

if (disposeAfter)
{
packet.Dispose();
}
}
callback(this, status);

if (disposeAfterWriteAsync)
ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, 0, _length), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
benaadams marked this conversation as resolved.
Show resolved Hide resolved
Dispose();
// Read the result to register as complete for the ValueTask
vt.GetAwaiter().GetResult();

callback(this, TdsEnums.SNI_SUCCESS);

if (disposeAfterWriteAsync)
{
Dispose();
}

// Completed
return;
}

// Not complete or error call the async local function to complete
_ = WriteToStreamAsync(this, callback, provider, disposeAfterWriteAsync, vt);
}

/// <summary>
Expand Down