Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Optimize SNIPacket async paths
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Dec 17, 2018
1 parent 24ecf91 commit 1922660
Showing 1 changed file with 69 additions and 31 deletions.
100 changes: 69 additions & 31 deletions src/System.Data.SqlClient/src/System/Data/SqlClient/SNI/SNIPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal class SNIPacket : IDisposable, IEquatable<SNIPacket>

private ArrayPool<byte> _arrayPool = ArrayPool<byte>.Shared;
private bool _isBufferFromArrayPool = false;
private ValueTask _asyncOperation;

public SNIPacket() { }

Expand Down Expand Up @@ -238,6 +239,7 @@ public void Reset()
_offset = 0;
_description = null;
_completionCallback = null;
_asyncOperation = default;
}

/// <summary>
Expand All @@ -247,37 +249,50 @@ public void Reset()
/// <param name="callback">Completion callback</param>
public void ReadFromStreamAsync(Stream stream, SNIAsyncCallback callback)
{
bool error = false;

stream.ReadAsync(_data, 0, _capacity, CancellationToken.None).ContinueWith(t =>
// Treat local function as a static and pass all params otherwise as async will allocate
async ValueTask ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<int> valueTask)
{
Exception e = t.Exception?.InnerException;
if (e != null)
bool error = false;
try
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, e);
error = true;
}
else
{
_length = t.Result;
if (_length == 0)
packet._length = await valueTask.ConfigureAwait(false);
if (packet._length == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
}
}
catch (Exception ex)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, SNICommon.InternalExceptionError, ex);
error = true;
}

if (error)
{
Release();
packet.Release();
}

cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, 0, _capacity), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
_length = vt.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

// Completed
return;
}
}

callback(this, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
},
CancellationToken.None,
TaskContinuationOptions.DenyChildAttach,
TaskScheduler.Default);
// Not complete or error call the async local function to complete
_asyncOperation = ReadFromStreamAsync(this, callback, vt);
}

/// <summary>
Expand All @@ -302,24 +317,47 @@ public void WriteToStream(Stream stream)
/// Write data to a stream asynchronously
/// </summary>
/// <param name="stream">Stream to write to</param>
public async void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
public void WriteToStreamAsync(Stream stream, SNIAsyncCallback callback, SNIProviders provider, bool disposeAfterWriteAsync = false)
{
uint status = TdsEnums.SNI_SUCCESS;
try
// Treat local function as a static and pass all params otherwise as async will allocate
async ValueTask WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProviders providers, bool disposeAfter, ValueTask valueTask)
{
await stream.WriteAsync(_data, 0, _length, CancellationToken.None).ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(provider, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
uint status = TdsEnums.SNI_SUCCESS;
try
{
await valueTask.ConfigureAwait(false);
}
catch (Exception e)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(providers, SNICommon.InternalExceptionError, e);
status = TdsEnums.SNI_ERROR;
}

cb(packet, status);

if (disposeAfter)
{
packet.Dispose();
}
}
callback(this, status);

if (disposeAfterWriteAsync)
ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, 0, _length), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
Dispose();
callback(this, TdsEnums.SNI_SUCCESS);

if (disposeAfterWriteAsync)
{
Dispose();
}

// Completed
return;
}

// Not complete or error call the async local function to complete
_asyncOperation = WriteToStreamAsync(this, callback, provider, disposeAfterWriteAsync, vt);
}

/// <summary>
Expand Down

0 comments on commit 1922660

Please sign in to comment.