From e829fbc46b05080ec732fec98bc4da6e744c3d2b Mon Sep 17 00:00:00 2001 From: ilya latushko Date: Tue, 16 Apr 2019 00:19:23 +0300 Subject: [PATCH] updates stream sketch --- Telega.Example/Program.cs | 33 +++++++++++++++++++ .../CallMiddleware/TgCustomizedTransport.cs | 5 ++- Telega/Connect/TgBellhop.cs | 19 ++++++++++- Telega/Connect/TgConnection.cs | 5 ++- Telega/Connect/TgConnectionPool.cs | 2 +- Telega/Rpc/CustomObservable.cs | 21 ++++++++++++ Telega/Rpc/TgSystemMessageHandler.cs | 2 +- Telega/Rpc/TgSystemMessageHandlerContext.cs | 2 ++ Telega/Rpc/TgTransport.cs | 32 ++++++++++++++++-- Telega/Telega.csproj | 5 ++- Telega/TelegramClient.cs | 2 ++ Telega/TelegramClientUpdates.cs | 19 +++++++++++ 12 files changed, 139 insertions(+), 8 deletions(-) create mode 100644 Telega/Rpc/CustomObservable.cs create mode 100644 Telega/TelegramClientUpdates.cs diff --git a/Telega.Example/Program.cs b/Telega.Example/Program.cs index fc8ef84..56b0f51 100644 --- a/Telega.Example/Program.cs +++ b/Telega.Example/Program.cs @@ -1,9 +1,11 @@ using System; using System.IO; using System.Net; +using System.Threading; using System.Threading.Tasks; using LanguageExt; using Newtonsoft.Json; +using Telega.Rpc.Dto.Functions.Users; using Telega.Rpc.Dto.Types; using static LanguageExt.Prelude; @@ -137,6 +139,35 @@ await tg.Messages.SendPhoto( ); } + static async Task PrintUserInfo(TelegramClient tg) + { + var myInfo = await tg.Call(new GetFullUser(new InputUser.SelfTag())); + Console.WriteLine(myInfo); + } + + static async Task ListenUpdates(TelegramClient tg) + { + tg.Updates.Stream.Subscribe( + onNext: updatesType => + { + var messageText = updatesType.Match( + updateShortMessageTag: x => Some("updateShortMessageTag: " + x.Message), + updateShortChatMessageTag: x => Some("updateShortChatMessageTag: " + x.Message), + updateShortTag: update => update.Update.Match( + newMessageTag: msg => msg.Message.AsTag().Map(x => "newMessageTag: " + x.Message), + editMessageTag: msg => msg.Message.AsTag().Map(x => "editMessageTag: " + x.Message), + editChannelMessageTag: msg => msg.Message.AsTag().Map(x => "editChannelMessageTag: " + x.Message), + _: () => None + ), + _: () => None + ); + messageText.Iter(Console.WriteLine); + }, + onError: Console.WriteLine + ); + await Task.Delay(Timeout.Infinite); + } + static async Task Main() { // it is disabled by default @@ -147,9 +178,11 @@ static async Task Main() { await EnsureAuthorized(tg, cfg); + // await PrintUserInfo(tg); // await DownloadFirstChannelPictureExample(tg); // await PrintFirstChannelTop100MessagesExample(tg); await SendOnePavelDurovPictureToMeExample(tg); + await ListenUpdates(tg); } } } diff --git a/Telega/CallMiddleware/TgCustomizedTransport.cs b/Telega/CallMiddleware/TgCustomizedTransport.cs index 38225eb..0775315 100644 --- a/Telega/CallMiddleware/TgCustomizedTransport.cs +++ b/Telega/CallMiddleware/TgCustomizedTransport.cs @@ -1,3 +1,4 @@ +using System; using System.Threading.Tasks; using LanguageExt; using Telega.Rpc; @@ -5,7 +6,7 @@ namespace Telega.CallMiddleware { - sealed class TgCustomizedTransport + sealed class TgCustomizedTransport : IDisposable { public readonly TgTransport Transport; public readonly TgCallMiddlewareChain CallMiddlewareChain; @@ -16,6 +17,8 @@ public TgCustomizedTransport(Some transport, Some Transport.Dispose(); + public async Task Call(ITgFunc func) { TgCallHandler handler = f => Transport.Call(f); diff --git a/Telega/Connect/TgBellhop.cs b/Telega/Connect/TgBellhop.cs index 5623577..30d2c12 100644 --- a/Telega/Connect/TgBellhop.cs +++ b/Telega/Connect/TgBellhop.cs @@ -2,7 +2,9 @@ using System.Threading.Tasks; using LanguageExt; using Telega.CallMiddleware; +using Telega.Rpc; using Telega.Rpc.Dto; +using Telega.Rpc.Dto.Types; using Telega.Utils; namespace Telega.Connect @@ -11,6 +13,7 @@ sealed class TgBellhop { public readonly TgConnectionPool ConnectionPool; public readonly Var CurrentConnection; + public readonly CustomObservable Updates = new CustomObservable(); public IVarGetter SessionVar => CurrentConnection.Bind(x => x.Session); @@ -19,11 +22,21 @@ sealed class TgBellhop public void SetSession(Func func) => CurrentConnection.Get().Session.SetWith(func); + void MirrorUpdates(TgConnection conn) + { + conn.Transport.Transport.Updates.Subscribe( + onNext: Updates.OnNext, + onError: Updates.OnError, + onCompleted: Updates.OnCompleted + ); + } + async Task ChangeConn(Func> f) { var oldConn = CurrentConnection.Get(); var newConn = await f(oldConn); CurrentConnection.Set(newConn); + MirrorUpdates(newConn); return newConn; } @@ -31,6 +44,7 @@ public TgBellhop(Some connectionPool, Some curre { ConnectionPool = connectionPool; CurrentConnection = currentConnection.Value.AsVar(); + MirrorUpdates(currentConnection); } public TgBellhop Fork() => @@ -57,8 +71,11 @@ async Task CallWithReConnect(ITgFunc func) var conn = CurrentConnection.Get(); return await conn.Transport.Call(func); } - catch (TgBrokenConnectionException) + catch (TgTransportException) { + var oldConn = CurrentConnection.Get(); + oldConn.Dispose(); + var conn = await ChangeConn(x => ConnectionPool.ReConnect(x.Config.ThisDc)); return await conn.Transport.Call(func); } diff --git a/Telega/Connect/TgConnection.cs b/Telega/Connect/TgConnection.cs index 00c7722..2f6178a 100644 --- a/Telega/Connect/TgConnection.cs +++ b/Telega/Connect/TgConnection.cs @@ -1,3 +1,4 @@ +using System; using LanguageExt; using Telega.CallMiddleware; using Telega.Rpc.Dto.Types; @@ -5,7 +6,7 @@ namespace Telega.Connect { - sealed class TgConnection + sealed class TgConnection : IDisposable { public readonly Var Session; public readonly TgCustomizedTransport Transport; @@ -17,5 +18,7 @@ public TgConnection(Some> session, Some tran Transport = transport; Config = config; } + + public void Dispose() => Transport.Dispose(); } } diff --git a/Telega/Connect/TgConnectionPool.cs b/Telega/Connect/TgConnectionPool.cs index 54e00da..eda8848 100644 --- a/Telega/Connect/TgConnectionPool.cs +++ b/Telega/Connect/TgConnectionPool.cs @@ -83,7 +83,7 @@ public async Task ReConnect(int dcId) public void Dispose() { _isDisposed = true; - _conns.Values.Iter(x => x.Transport.Transport.Dispose()); + _conns.Values.Iter(x => x.Dispose()); } public TgConnectionPool( diff --git a/Telega/Rpc/CustomObservable.cs b/Telega/Rpc/CustomObservable.cs new file mode 100644 index 0000000..7adf5eb --- /dev/null +++ b/Telega/Rpc/CustomObservable.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Reactive.Disposables; + +namespace Telega.Rpc +{ + sealed class CustomObservable : IObservable + { + readonly List> _observers = new List>(); + + public void OnCompleted() => _observers.ForEach(x => x.OnCompleted()); + public void OnError(Exception error) => _observers.ForEach(x => x.OnError(error)); + public void OnNext(T value) => _observers.ForEach(x => x.OnNext(value)); + + public IDisposable Subscribe(IObserver observer) + { + _observers.Add(observer); + return Disposable.Create(() => _observers.Remove(observer)); + } + } +} diff --git a/Telega/Rpc/TgSystemMessageHandler.cs b/Telega/Rpc/TgSystemMessageHandler.cs index e6baf9a..746559b 100644 --- a/Telega/Rpc/TgSystemMessageHandler.cs +++ b/Telega/Rpc/TgSystemMessageHandler.cs @@ -196,7 +196,7 @@ public static Func Handle(TgSystemMessageHandlerContext ctx) => m UpdatesType.TryDeserialize(typeNumber, br).Match(updates => { ctx.Ack.Add(msgId); - // TgTrace.Trace("Updates " + updatesOpt.ToString()); + ctx.Updates.Add(updates); }, () => { diff --git a/Telega/Rpc/TgSystemMessageHandlerContext.cs b/Telega/Rpc/TgSystemMessageHandlerContext.cs index f8fd8d0..7974e7f 100644 --- a/Telega/Rpc/TgSystemMessageHandlerContext.cs +++ b/Telega/Rpc/TgSystemMessageHandlerContext.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using LanguageExt; +using Telega.Rpc.Dto.Types; namespace Telega.Rpc { @@ -8,6 +9,7 @@ sealed class TgSystemMessageHandlerContext { public readonly List Ack = new List(); public readonly List RpcResults = new List(); + public readonly List Updates = new List(); public Option NewSalt; } } diff --git a/Telega/Rpc/TgTransport.cs b/Telega/Rpc/TgTransport.cs index eee91b6..8371933 100644 --- a/Telega/Rpc/TgTransport.cs +++ b/Telega/Rpc/TgTransport.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Threading.Tasks; using LanguageExt; +using Telega.Connect; using Telega.Internal; using Telega.Rpc.Dto; using Telega.Rpc.Dto.Types; @@ -22,7 +23,10 @@ class TgTransport : IDisposable readonly Task _receiveLoopTask; readonly ConcurrentDictionary> _rpcFlow = new ConcurrentDictionary>(); - async Task ReceiveLoop() + + public readonly CustomObservable Updates = new CustomObservable(); + + async Task ReceiveLoopImpl() { while (true) { @@ -42,6 +46,21 @@ Option> CaptureFlow(long id) => flow => flow.SetResult(res), () => TgTrace.Trace($"TgTransport: Unexpected RPC result, the message id is {res.Id}") )); + + ctx.Updates.Iter(Updates.OnNext); + } + } + + async Task ReceiveLoop() + { + try + { + await ReceiveLoopImpl(); + } + catch (TgTransportException e) + { + // Updates.OnError(e); + throw; } } @@ -148,7 +167,16 @@ async Task CheckReceiveLoop() var tcs = new TaskCompletionSource(); _rpcFlow[msgId] = tcs; - await _transport.Send(container); + try + { + await _transport.Send(container); + } + catch (TgTransportException e) + { + // Updates.OnError(e); + throw; + } + return tcs.Task; }); diff --git a/Telega/Telega.csproj b/Telega/Telega.csproj index a54179c..092f492 100644 --- a/Telega/Telega.csproj +++ b/Telega/Telega.csproj @@ -14,12 +14,15 @@ Telegram client ilyalatt - 0.3.1 + 0.3.2 + preview001 + + diff --git a/Telega/TelegramClient.cs b/Telega/TelegramClient.cs index 45e1bd4..22d8ddc 100644 --- a/Telega/TelegramClient.cs +++ b/Telega/TelegramClient.cs @@ -18,6 +18,7 @@ public sealed class TelegramClient : IDisposable public readonly TelegramClientContacts Contacts; public readonly TelegramClientMessages Messages; public readonly TelegramClientUpload Upload; + public readonly TelegramClientUpdates Updates; static readonly IPEndPoint DefaultEndpoint = new IPEndPoint(IPAddress.Parse("149.154.167.50"), 443); @@ -32,6 +33,7 @@ ISessionStore sessionStore Contacts = new TelegramClientContacts(_bellhop); Messages = new TelegramClientMessages(_bellhop); Upload = new TelegramClientUpload(_bellhop); + Updates = new TelegramClientUpdates(_bellhop); } public void Dispose() diff --git a/Telega/TelegramClientUpdates.cs b/Telega/TelegramClientUpdates.cs new file mode 100644 index 0000000..4d60948 --- /dev/null +++ b/Telega/TelegramClientUpdates.cs @@ -0,0 +1,19 @@ +using System; +using LanguageExt; +using Telega.Connect; +using Telega.Rpc.Dto.Types; + +namespace Telega +{ + public sealed class TelegramClientUpdates + { + readonly TgBellhop _tg; + public readonly IObservable Stream; + + internal TelegramClientUpdates(Some tg) + { + _tg = tg; + Stream = _tg.Updates; + } + } +}