Skip to content

Commit

Permalink
Added TCP client sample to HubSample (#1805)
Browse files Browse the repository at this point in the history
- Ripped of Kestrel's SocketConnection to make a TcpConnection
IConnection implementation.
- Fixed issue with SignalR assuming there will always be a non-null user
on the ConnectionContext.
  • Loading branch information
davidfowl authored Apr 2, 2018
1 parent ef30e2e commit e9d5815
Show file tree
Hide file tree
Showing 10 changed files with 505 additions and 9 deletions.
4 changes: 4 additions & 0 deletions samples/ClientSample/ClientSample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
<OutputType>Exe</OutputType>
</PropertyGroup>

<ItemGroup>
<Compile Include="..\..\src\Common\DuplexPipe.cs" Link="DuplexPipe.cs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.AspNetCore.SignalR.Client\Microsoft.AspNetCore.SignalR.Client.csproj" />
</ItemGroup>
Expand Down
12 changes: 6 additions & 6 deletions samples/ClientSample/HubSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR.Client;
Expand All @@ -27,12 +28,11 @@ internal static void Register(CommandLineApplication app)

public static async Task<int> ExecuteAsync(string baseUrl)
{
baseUrl = string.IsNullOrEmpty(baseUrl) ? "http://localhost:5000/default" : baseUrl;

Console.WriteLine("Connecting to {0}", baseUrl);
var endpoint = new IPEndPoint(IPAddress.Loopback, 9001);
Console.WriteLine("Connecting to {0}", endpoint);
var connection = new HubConnectionBuilder()
.WithUrl(baseUrl)
.WithConsoleLogger(LogLevel.Trace)
.WithEndPoint(endpoint)
.WithConsoleLogger(LogLevel.Information)
.Build();

try
Expand All @@ -44,7 +44,7 @@ public static async Task<int> ExecuteAsync(string baseUrl)

await ConnectAsync(connection);

Console.WriteLine("Connected to {0}", baseUrl);
Console.WriteLine("Connected to {0}", endpoint);

var sendCts = new CancellationTokenSource();

Expand Down
24 changes: 24 additions & 0 deletions samples/ClientSample/Tcp/BufferExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Text;

namespace System
{
public static class BufferExtensions
{
public static ArraySegment<byte> GetArray(this Memory<byte> memory)
{
return ((ReadOnlyMemory<byte>)memory).GetArray();
}

public static ArraySegment<byte> GetArray(this ReadOnlyMemory<byte> memory)
{
if (!MemoryMarshal.TryGetArray(memory, out var result))
{
throw new InvalidOperationException("Buffer backed by array was expected");
}
return result;
}
}
}
71 changes: 71 additions & 0 deletions samples/ClientSample/Tcp/SocketAwaitable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ClientSample
{
public class SocketAwaitable : ICriticalNotifyCompletion
{
private static readonly Action _callbackCompleted = () => { };

private readonly PipeScheduler _ioScheduler;

private Action _callback;
private int _bytesTransferred;
private SocketError _error;

public SocketAwaitable(PipeScheduler ioScheduler)
{
_ioScheduler = ioScheduler;
}

public SocketAwaitable GetAwaiter() => this;
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);

public int GetResult()
{
Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));

_callback = null;

if (_error != SocketError.Success)
{
throw new SocketException((int)_error);
}

return _bytesTransferred;
}

public void OnCompleted(Action continuation)
{
if (ReferenceEquals(_callback, _callbackCompleted) ||
ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
{
Task.Run(continuation);
}
}

public void UnsafeOnCompleted(Action continuation)
{
OnCompleted(continuation);
}

public void Complete(int bytesTransferred, SocketError socketError)
{
_error = socketError;
_bytesTransferred = bytesTransferred;
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);

if (continuation != null)
{
_ioScheduler.Schedule(state => ((Action)state)(), continuation);
}
}
}
}
40 changes: 40 additions & 0 deletions samples/ClientSample/Tcp/SocketReceiver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Text;

namespace ClientSample
{
public class SocketReceiver
{
private readonly Socket _socket;
private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs();
private readonly SocketAwaitable _awaitable;

public SocketReceiver(Socket socket, PipeScheduler scheduler)
{
_socket = socket;
_awaitable = new SocketAwaitable(scheduler);
_eventArgs.UserToken = _awaitable;
_eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError);
}

public SocketAwaitable ReceiveAsync(Memory<byte> buffer)
{
#if NETCOREAPP2_1
_eventArgs.SetBuffer(buffer);
#else
var segment = buffer.GetArray();

_eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count);
#endif
if (!_socket.ReceiveAsync(_eventArgs))
{
_awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError);
}

return _awaitable;
}
}
}
100 changes: 100 additions & 0 deletions samples/ClientSample/Tcp/SocketSender.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;

namespace ClientSample
{
public class SocketSender
{
private readonly Socket _socket;
private readonly SocketAsyncEventArgs _eventArgs = new SocketAsyncEventArgs();
private readonly SocketAwaitable _awaitable;

private List<ArraySegment<byte>> _bufferList;

public SocketSender(Socket socket, PipeScheduler scheduler)
{
_socket = socket;
_awaitable = new SocketAwaitable(scheduler);
_eventArgs.UserToken = _awaitable;
_eventArgs.Completed += (_, e) => ((SocketAwaitable)e.UserToken).Complete(e.BytesTransferred, e.SocketError);
}

public SocketAwaitable SendAsync(ReadOnlySequence<byte> buffers)
{
if (buffers.IsSingleSegment)
{
return SendAsync(buffers.First);
}

#if NETCOREAPP2_1
if (!_eventArgs.MemoryBuffer.Equals(Memory<byte>.Empty))
#else
if (_eventArgs.Buffer != null)
#endif
{
_eventArgs.SetBuffer(null, 0, 0);
}

_eventArgs.BufferList = GetBufferList(buffers);

if (!_socket.SendAsync(_eventArgs))
{
_awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError);
}

return _awaitable;
}

private SocketAwaitable SendAsync(ReadOnlyMemory<byte> memory)
{
// The BufferList getter is much less expensive then the setter.
if (_eventArgs.BufferList != null)
{
_eventArgs.BufferList = null;
}

#if NETCOREAPP2_1
_eventArgs.SetBuffer(MemoryMarshal.AsMemory(memory));
#else
var segment = memory.GetArray();

_eventArgs.SetBuffer(segment.Array, segment.Offset, segment.Count);
#endif
if (!_socket.SendAsync(_eventArgs))
{
_awaitable.Complete(_eventArgs.BytesTransferred, _eventArgs.SocketError);
}

return _awaitable;
}

private List<ArraySegment<byte>> GetBufferList(ReadOnlySequence<byte> buffer)
{
Debug.Assert(!buffer.IsEmpty);
Debug.Assert(!buffer.IsSingleSegment);

if (_bufferList == null)
{
_bufferList = new List<ArraySegment<byte>>();
}
else
{
// Buffers are pooled, so it's OK to root them until the next multi-buffer write.
_bufferList.Clear();
}

foreach (var b in buffer)
{
_bufferList.Add(b.GetArray());
}

return _bufferList;
}
}
}
Loading

0 comments on commit e9d5815

Please sign in to comment.