From 9f9e942dea5e83081f1419317636775505d9c5bb Mon Sep 17 00:00:00 2001 From: nigeldaniels Date: Mon, 24 Jun 2024 15:17:31 -0700 Subject: [PATCH] WIP --- switchio/connection.py | 60 +++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/switchio/connection.py b/switchio/connection.py index 37dd6d8..7fac4ed 100644 --- a/switchio/connection.py +++ b/switchio/connection.py @@ -4,20 +4,19 @@ # # Copyright (c) 2017 Tyler Goodlet """ -Asyncio ESL connection abstactions +Asyncio ESL connection abstractions """ import asyncio from functools import partial from concurrent import futures from threading import get_ident +import traceback from . import utils from .protocol import InboundProtocol - class ConnectionError(utils.ESLError): "Failed to connect to ESL" - async def await_in_order(awaitables, loop, timeout=None): awaitables = map(partial(asyncio.ensure_future, loop=loop), awaitables) for awaitable in awaitables: @@ -30,7 +29,6 @@ async def await_in_order(awaitables, loop, timeout=None): return res - def run_in_order_threadsafe(awaitables, loop, timeout=0.5, block=True): """"Given a sequence of awaitables, schedule each threadsafe in order optionally blocking until completion. @@ -54,30 +52,32 @@ def run_in_order_threadsafe(awaitables, loop, timeout=0.5, block=True): return future - async def connect_and_auth(host, port, password, prot, loop, log, timeout=0.5): """Try to create a connection and authenticate to the - target FS ESL. - """ + target FS ESL.""" msg = ("Failed to connect to server at '{}:{}'\n" "Please check that FreeSWITCH is running and " "accepting ESL connections.".format(host, port)) try: + log.debug("Attempting to create connection to {}:{}".format(host, port)) await asyncio.wait_for( loop.create_connection(lambda: prot, host, port), timeout=timeout) + log.debug("Connection to {}:{} created".format(host, port)) except ( ConnectionRefusedError, asyncio.TimeoutError, OSError, futures.TimeoutError, ) as err: - raise ConnectionError(msg.format(host, port)) + log.error(f"Connection attempt failed: {traceback.format_exc()}") + raise ConnectionError(msg.format(host, port)) from err - # TODO: consider using the asyncio_timeout lib here try: + log.debug("Attempting to authenticate to {}:{}".format(host, port)) await asyncio.wait_for(prot.authenticate(), timeout) - except asyncio.TimeoutError: - raise ConnectionRefusedError(msg.format(host, port)) - + log.debug("Authentication to {}:{} succeeded".format(host, port)) + except asyncio.TimeoutError as err: + log.error(f"Authentication attempt failed: {traceback.format_exc()}") + raise ConnectionRefusedError(msg.format(host, port)) from err async def async_reconnect(host, port, password, prot, loop, log): log.info("Attempting to reconnect to {}:{}".format(host, port)) @@ -107,7 +107,6 @@ async def async_reconnect(host, port, password, prot, loop, log): log.info("Successfully reconnected to '{}:{}'" .format(host, port)) - class Connection(object): """An ESL connection implemented using an ``asyncio`` TCP protocol. @@ -140,6 +139,19 @@ def __init__(self, host, port='8021', password='ClueCon', loop=None, self.loop = loop self.autorecon = autorecon self.protocol = None + self.initialize_protocol() + + def initialize_protocol(self): + self.protocol = InboundProtocol( + self.host, self.password, self.loop, autorecon=self.autorecon, + on_disconnect=self.reconnect) + + def reconnect(self, prot): + """Schedule a reconnection task.""" + self.log.debug("Scheduling a reconnection task") + asyncio.ensure_future(async_reconnect( + self.host, self.port, self.password, prot, + self.loop, self.log), loop=self.loop) def __enter__(self, **kwargs): self.connect(**kwargs) @@ -166,8 +178,7 @@ def connect( if not self.connected() or not block: def reconnect(prot): - """Schedule a reconnection task. - """ + """Schedule a reconnection task.""" self.log.debug("Scheduling a reconnection task") asyncio.ensure_future(async_reconnect( host, port, password, prot, @@ -210,22 +221,14 @@ def disconnect(self, block=True, loop=None): ).result() async def recv_event(self): - """Retreive the latest queued event. - """ + """Retrieve the latest queued event.""" queue = self.protocol.event_queue event = await queue.get() queue.task_done() return event - def execute(self, uuid, app, arg='', params='', loops=1): - """Execute a dialplan ``app`` with argument ``arg``. - """ - return self.protocol.sendmsg(uuid, 'execute', app, arg, params, - loops=loops) - def api(self, cmd, errcheck=True, block=False, timeout=0.5): - '''Invoke api command (with error checking by default). - ''' + '''Invoke api command (with error checking by default).''' if not self.connected(): raise ConnectionError("Call ``connect()`` first") self.log.debug("api cmd '{}'".format(cmd)) @@ -247,8 +250,7 @@ def api(self, cmd, errcheck=True, block=False, timeout=0.5): return future.result(0.005) def cmd(self, cmd): - '''Return the string-body output from invoking a command. - ''' + '''Return the string-body output from invoking a command.''' event = self.api(cmd, block=True) _, body = self._handle_socket_data(event) return body @@ -303,10 +305,8 @@ def _handle_socket_data(event): raise utils.APIError(body) return True, body - def get_connection(host, port=8021, password='ClueCon', loop=None): - """ESL connection factory. - """ + """ESL connection factory.""" loop = loop or asyncio.get_event_loop() loop._tid = get_ident() return Connection(host, port=port, password=password, loop=loop)