Skip to content

Commit

Permalink
updates stream sketch
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyalatt committed Apr 15, 2019
1 parent 175a44a commit e829fbc
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 8 deletions.
33 changes: 33 additions & 0 deletions Telega.Example/Program.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using LanguageExt;
using Newtonsoft.Json;
using Telega.Rpc.Dto.Functions.Users;
using Telega.Rpc.Dto.Types;
using static LanguageExt.Prelude;

Expand Down Expand Up @@ -137,6 +139,35 @@ await tg.Messages.SendPhoto(
);
}

static async Task PrintUserInfo(TelegramClient tg)
{
var myInfo = await tg.Call(new GetFullUser(new InputUser.SelfTag()));
Console.WriteLine(myInfo);
}

static async Task ListenUpdates(TelegramClient tg)
{
tg.Updates.Stream.Subscribe(
onNext: updatesType =>
{
var messageText = updatesType.Match(
updateShortMessageTag: x => Some("updateShortMessageTag: " + x.Message),
updateShortChatMessageTag: x => Some("updateShortChatMessageTag: " + x.Message),
updateShortTag: update => update.Update.Match(
newMessageTag: msg => msg.Message.AsTag().Map(x => "newMessageTag: " + x.Message),
editMessageTag: msg => msg.Message.AsTag().Map(x => "editMessageTag: " + x.Message),
editChannelMessageTag: msg => msg.Message.AsTag().Map(x => "editChannelMessageTag: " + x.Message),
_: () => None
),
_: () => None
);
messageText.Iter(Console.WriteLine);
},
onError: Console.WriteLine
);
await Task.Delay(Timeout.Infinite);
}

static async Task Main()
{
// it is disabled by default
Expand All @@ -147,9 +178,11 @@ static async Task Main()
{
await EnsureAuthorized(tg, cfg);

// await PrintUserInfo(tg);
// await DownloadFirstChannelPictureExample(tg);
// await PrintFirstChannelTop100MessagesExample(tg);
await SendOnePavelDurovPictureToMeExample(tg);
await ListenUpdates(tg);
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion Telega/CallMiddleware/TgCustomizedTransport.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using System;
using System.Threading.Tasks;
using LanguageExt;
using Telega.Rpc;
using Telega.Rpc.Dto;

namespace Telega.CallMiddleware
{
sealed class TgCustomizedTransport
sealed class TgCustomizedTransport : IDisposable
{
public readonly TgTransport Transport;
public readonly TgCallMiddlewareChain CallMiddlewareChain;
Expand All @@ -16,6 +17,8 @@ public TgCustomizedTransport(Some<TgTransport> transport, Some<TgCallMiddlewareC
CallMiddlewareChain = callMiddlewareChain;
}

public void Dispose() => Transport.Dispose();

public async Task<T> Call<T>(ITgFunc<T> func)
{
TgCallHandler<T> handler = f => Transport.Call(f);
Expand Down
19 changes: 18 additions & 1 deletion Telega/Connect/TgBellhop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
using System.Threading.Tasks;
using LanguageExt;
using Telega.CallMiddleware;
using Telega.Rpc;
using Telega.Rpc.Dto;
using Telega.Rpc.Dto.Types;
using Telega.Utils;

namespace Telega.Connect
Expand All @@ -11,6 +13,7 @@ sealed class TgBellhop
{
public readonly TgConnectionPool ConnectionPool;
public readonly Var<TgConnection> CurrentConnection;
public readonly CustomObservable<UpdatesType> Updates = new CustomObservable<UpdatesType>();

public IVarGetter<Session> SessionVar =>
CurrentConnection.Bind(x => x.Session);
Expand All @@ -19,18 +22,29 @@ sealed class TgBellhop
public void SetSession(Func<Session, Session> func) =>
CurrentConnection.Get().Session.SetWith(func);

void MirrorUpdates(TgConnection conn)
{
conn.Transport.Transport.Updates.Subscribe(
onNext: Updates.OnNext,
onError: Updates.OnError,
onCompleted: Updates.OnCompleted
);
}

async Task<TgConnection> ChangeConn(Func<TgConnection, Task<TgConnection>> f)
{
var oldConn = CurrentConnection.Get();
var newConn = await f(oldConn);
CurrentConnection.Set(newConn);
MirrorUpdates(newConn);
return newConn;
}

public TgBellhop(Some<TgConnectionPool> connectionPool, Some<TgConnection> currentConnection)
{
ConnectionPool = connectionPool;
CurrentConnection = currentConnection.Value.AsVar();
MirrorUpdates(currentConnection);
}

public TgBellhop Fork() =>
Expand All @@ -57,8 +71,11 @@ async Task<T> CallWithReConnect<T>(ITgFunc<T> func)
var conn = CurrentConnection.Get();
return await conn.Transport.Call(func);
}
catch (TgBrokenConnectionException)
catch (TgTransportException)
{
var oldConn = CurrentConnection.Get();
oldConn.Dispose();

var conn = await ChangeConn(x => ConnectionPool.ReConnect(x.Config.ThisDc));
return await conn.Transport.Call(func);
}
Expand Down
5 changes: 4 additions & 1 deletion Telega/Connect/TgConnection.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
using System;
using LanguageExt;
using Telega.CallMiddleware;
using Telega.Rpc.Dto.Types;
using Telega.Utils;

namespace Telega.Connect
{
sealed class TgConnection
sealed class TgConnection : IDisposable
{
public readonly Var<Session> Session;
public readonly TgCustomizedTransport Transport;
Expand All @@ -17,5 +18,7 @@ public TgConnection(Some<Var<Session>> session, Some<TgCustomizedTransport> tran
Transport = transport;
Config = config;
}

public void Dispose() => Transport.Dispose();
}
}
2 changes: 1 addition & 1 deletion Telega/Connect/TgConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public async Task<TgConnection> ReConnect(int dcId)
public void Dispose()
{
_isDisposed = true;
_conns.Values.Iter(x => x.Transport.Transport.Dispose());
_conns.Values.Iter(x => x.Dispose());
}

public TgConnectionPool(
Expand Down
21 changes: 21 additions & 0 deletions Telega/Rpc/CustomObservable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

namespace Telega.Rpc
{
sealed class CustomObservable<T> : IObservable<T>
{
readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

public void OnCompleted() => _observers.ForEach(x => x.OnCompleted());
public void OnError(Exception error) => _observers.ForEach(x => x.OnError(error));
public void OnNext(T value) => _observers.ForEach(x => x.OnNext(value));

public IDisposable Subscribe(IObserver<T> observer)
{
_observers.Add(observer);
return Disposable.Create(() => _observers.Remove(observer));
}
}
}
2 changes: 1 addition & 1 deletion Telega/Rpc/TgSystemMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public static Func<Message, Unit> Handle(TgSystemMessageHandlerContext ctx) => m
UpdatesType.TryDeserialize(typeNumber, br).Match(updates =>
{
ctx.Ack.Add(msgId);
// TgTrace.Trace("Updates " + updatesOpt.ToString());
ctx.Updates.Add(updates);
},
() =>
{
Expand Down
2 changes: 2 additions & 0 deletions Telega/Rpc/TgSystemMessageHandlerContext.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using LanguageExt;
using Telega.Rpc.Dto.Types;

namespace Telega.Rpc
{
Expand All @@ -8,6 +9,7 @@ sealed class TgSystemMessageHandlerContext
{
public readonly List<long> Ack = new List<long>();
public readonly List<RpcResult> RpcResults = new List<RpcResult>();
public readonly List<UpdatesType> Updates = new List<UpdatesType>();
public Option<long> NewSalt;
}
}
32 changes: 30 additions & 2 deletions Telega/Rpc/TgTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using LanguageExt;
using Telega.Connect;
using Telega.Internal;
using Telega.Rpc.Dto;
using Telega.Rpc.Dto.Types;
Expand All @@ -22,7 +23,10 @@ class TgTransport : IDisposable
readonly Task _receiveLoopTask;
readonly ConcurrentDictionary<long, TaskCompletionSource<RpcResult>> _rpcFlow =
new ConcurrentDictionary<long, TaskCompletionSource<RpcResult>>();
async Task ReceiveLoop()

public readonly CustomObservable<UpdatesType> Updates = new CustomObservable<UpdatesType>();

async Task ReceiveLoopImpl()
{
while (true)
{
Expand All @@ -42,6 +46,21 @@ Option<TaskCompletionSource<RpcResult>> CaptureFlow(long id) =>
flow => flow.SetResult(res),
() => TgTrace.Trace($"TgTransport: Unexpected RPC result, the message id is {res.Id}")
));

ctx.Updates.Iter(Updates.OnNext);
}
}

async Task ReceiveLoop()
{
try
{
await ReceiveLoopImpl();
}
catch (TgTransportException e)
{
// Updates.OnError(e);
throw;
}
}

Expand Down Expand Up @@ -148,7 +167,16 @@ async Task CheckReceiveLoop()
var tcs = new TaskCompletionSource<RpcResult>();
_rpcFlow[msgId] = tcs;
await _transport.Send(container);
try
{
await _transport.Send(container);
}
catch (TgTransportException e)
{
// Updates.OnError(e);
throw;
}
return tcs.Task;
});

Expand Down
5 changes: 4 additions & 1 deletion Telega/Telega.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
<Description>Telegram client</Description>
<Authors>ilyalatt</Authors>

<VersionPrefix>0.3.1</VersionPrefix>
<VersionPrefix>0.3.2</VersionPrefix>
<VersionSuffix>preview001</VersionSuffix>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BigMathNetStandard" Version="1.0.0" />
<PackageReference Include="LanguageExt.Core" Version="3.1.14" />
<PackageReference Include="System.Reactive" Version="4.1.5" />
<PackageReference Include="System.Reactive.Observable.Aliases" Version="4.1.5" />
</ItemGroup>

</Project>
2 changes: 2 additions & 0 deletions Telega/TelegramClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public sealed class TelegramClient : IDisposable
public readonly TelegramClientContacts Contacts;
public readonly TelegramClientMessages Messages;
public readonly TelegramClientUpload Upload;
public readonly TelegramClientUpdates Updates;

static readonly IPEndPoint DefaultEndpoint = new IPEndPoint(IPAddress.Parse("149.154.167.50"), 443);

Expand All @@ -32,6 +33,7 @@ ISessionStore sessionStore
Contacts = new TelegramClientContacts(_bellhop);
Messages = new TelegramClientMessages(_bellhop);
Upload = new TelegramClientUpload(_bellhop);
Updates = new TelegramClientUpdates(_bellhop);
}

public void Dispose()
Expand Down
19 changes: 19 additions & 0 deletions Telega/TelegramClientUpdates.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using LanguageExt;
using Telega.Connect;
using Telega.Rpc.Dto.Types;

namespace Telega
{
public sealed class TelegramClientUpdates
{
readonly TgBellhop _tg;
public readonly IObservable<UpdatesType> Stream;

internal TelegramClientUpdates(Some<TgBellhop> tg)
{
_tg = tg;
Stream = _tg.Updates;
}
}
}

0 comments on commit e829fbc

Please sign in to comment.