Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReplayMarkerStorage with PgSQL not working with a saved replay id #19

Open
welcomemat-services opened this issue Feb 7, 2022 · 3 comments

Comments

@welcomemat-services
Copy link

welcomemat-services commented Feb 7, 2022

The code below works when the PostgreSQL table is empty, but when it loads an existing replay ID it throws the exception shown below.

Thanks for any help provided here.

Code:

class MyReplayMarkerStorage(ReplayMarkerStorage):
    """Replay marker storage backed by the ``platform_event_setting`` table.

    Markers are keyed by the last ``/``-separated segment of the
    subscription, e.g. ``Notification__e`` for ``/event/Notification__e``.

    NOTE(review): the psycopg2 calls below are synchronous and block the
    event loop while they run; acceptable for a single-stream script, but
    consider an async driver or ``run_in_executor`` if that matters.
    """

    def __init__(self, connection, cursor):
        """
        :param connection: Open psycopg2 connection; committed after writes
        :param cursor: Cursor on *connection*. Rows are read by column name,
            so this must be a dict-style cursor such as ``RealDictCursor``.
        """
        super().__init__()
        self.connection = connection
        self.cursor = cursor

    @staticmethod
    def _channel_name(subscription: str) -> str:
        """Return the text after the last ``/`` in *subscription* (or all of it)."""
        return subscription.rpartition('/')[2]

    async def set_replay_marker(self, subscription: str, replay_marker: ReplayMarker):
        """Upsert *replay_marker* for the given *subscription* and commit.

        Values are sent as query parameters rather than interpolated into
        the SQL text, so quotes in any value cannot break or inject into
        the statement.
        """
        event = self._channel_name(subscription)
        self.cursor.execute(
            """
            INSERT INTO platform_event_setting (channel, replay_id, date_str)
            VALUES (%s, %s, %s)
            ON CONFLICT ON CONSTRAINT platform_event_setting_pkey
            DO UPDATE SET (replay_id, date_str) = (EXCLUDED.replay_id, EXCLUDED.date_str)
            """,
            (event, replay_marker.replay_id, replay_marker.date),
        )

        print(f"Upserting replay id {replay_marker.replay_id} for {subscription}")
        self.connection.commit()

    async def get_replay_marker(self, subscription: str):
        """Return the stored replay marker for *subscription*, or ``None``."""
        event = self._channel_name(subscription)
        self.cursor.execute(
            """
            SELECT replay_id, date_str
            FROM platform_event_setting
            WHERE channel = %s
            LIMIT 1
            """,
            (event,),
        )

        row = self.cursor.fetchone()

        if row is None:
            print(f"Platform Event Setting does not exist: {subscription}")
            return None

        print(f"{subscription} - {row['replay_id']} - {row['date_str']}")
        # A non-integer replayId makes the subscribe request fail with
        # "Failed to create an internal subscription!". A textual column comes
        # back from psycopg2 as ``str``, so coerce it before handing it over.
        return ReplayMarker(date=row['date_str'], replay_id=int(row['replay_id']))
# MyReplayMarkerStorage

async def process_events():
    """Stream ``/event/Notification__e`` messages, storing replay markers in PostgreSQL.

    Any exception is printed rather than propagated. The database cursor
    and connection are closed on exit if they were opened.
    """
    # Pre-bind both names: if ``psycopg2.connect`` or ``connection.cursor``
    # raises, the ``finally`` clause must not trip over an unbound local
    # (which would hide the original error behind an UnboundLocalError).
    connection = None
    cursor = None
    try:
        connection = psycopg2.connect(os.getenv('DATABASE_URL'), sslmode='require')
        # Dict rows: MyReplayMarkerStorage reads columns by name.
        cursor = connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

        myReplay = MyReplayMarkerStorage(connection, cursor)

        async with SalesforceStreamingClient(
            consumer_key=os.getenv("SF_APP_KEY"),
            consumer_secret=os.getenv("SF_APP_SECRET"),
            username=os.getenv('SF_USERNAME'),
            password=os.getenv('SF_PASSWORD'),
            replay=myReplay,
            replay_fallback=ReplayOption.ALL_EVENTS,
            replay_storage_policy=ReplayMarkerStoragePolicy.MANUAL) as client:

            await client.subscribe("/event/Notification__e")

            async for message in client:
                async with client.replay_storage(message):
                    print(message['data']['payload'])

    # psycopg2.Error derives from Exception, so one clause covers both.
    except Exception as e:
        print(f"Exception - {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()

# process_events

Error log:

DEBUG:asyncio:Using proactor: IocpProactor
Connected to DB
DEBUG:aiosfstream.client:Client created with replay storage: <main.MyReplayMarkerStorage object at 0x0000024E0287CA30>, replay fallback: <ReplayOption.ALL_EVENTS: -2>
DEBUG:aiosfstream.client:Authenticating using PasswordAuthenticator(consumer_key='xxxxx',consumer_secret='yyyyy', username='test.test.com', password='abcde').
INFO:aiosfstream.client:Successful authentication. Instance URL: 'https://test.my.salesforce.com'.
INFO:aiocometd.client:Opening client with connection types ['websocket', 'long-polling'] ...
INFO:aiocometd.client:Connection types supported by the server: ['long-polling']
DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'kvl2mi3sd9hs6ei1m3vnsilh4nb9', 'advice': {'interval': 0, 'timeout': 110000, 'reconnect': 'retry'}, 'channel': '/meta/connect', 'id': '1', 'successful': True}
INFO:aiocometd.client:Client opened with connection_type 'long-polling'
/event/Notification__e - 7785481 - 2022-02-10T23:23:49.572Z
INFO:aiocometd.client:Closing client...
ERROR:asyncio:Exception in callback TransportBase._connect_done(<Task cancell...\utils.py:22>>)
handle: <Handle TransportBase._connect_done(<Task cancell...\utils.py:22>>)>
Traceback (most recent call last):
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\utils.py", line 27, in wrapper
return await coro_func(*args, **kwargs)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\transports\base.py", line 524, in _connect
result = await self._send_payload_with_auth(payload)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\transports\base.py", line 323, in _send_payload_with_auth
await self._auth.authenticate()
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\auth.py", line 96, in authenticate
status_code, response_data = await self._authenticate()
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\auth.py", line 173, in _authenticate
response = await session.post(self._token_url, data=data)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\client.py", line 535, in _request
conn = await self._connector.connect(
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\connector.py", line 542, in connect
proto = await self._create_connection(req, traces, timeout)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\connector.py", line 907, in _create_connection
_, proto = await self._create_direct_connection(req, traces, timeout)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\connector.py", line 1154, in _create_direct_connection
hosts = await asyncio.shield(host_resolved)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

self._raise_server_error(response)

File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\client.py", line 385, in _raise_server_error
raise ServerError(message, response)
aiocometd.exceptions.ServerError: ('Subscribe request failed.', {'clientId': 'kvl2mi3sd9hs6ei1m3vnsilh4nb9', 'channel': '/meta/subscribe', 'id': '2', 'subscription': '/event/Notification__e', 'error': 'Failed to create an internal subscription!', 'successful': False})

The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "c:\code\Python\scheduled_processes\processor.py", line 70, in process_events
await client.subscribe("/event/Notification__e")
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\exceptions.py", line 143, in async_wrapper
return await func(*args, **kwargs)
File "C:\Users\test\AppData\Local\Programs\Python\Python39\lib\contextlib.py", line 137, in exit
self.gen.throw(typ, value, traceback)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\exceptions.py", line 123, in translate_errors_context
raise error_cls(*cometd_error.args) from cometd_error
aiosfstream.exceptions.ServerError: ('Subscribe request failed.', {'clientId': 'kvl2mi3sd9hs6ei1m3vnsilh4nb9', 'channel': '/meta/subscribe', 'id': '2', 'subscription': '/event/Notification__e', 'error': 'Failed to create an internal subscription!', 'successful': False})

@welcomemat-services welcomemat-services changed the title Help needed with my custom replay marker storage ReplayMarkerStorage with PgSQL not working with a saved replay id Feb 11, 2022
@efh365
Copy link

efh365 commented Mar 10, 2022

This is the code I am using, and it works for me:

from typing import Optional

from aiosfstream import ReplayMarker, ReplayMarkerStorage
from asgiref.sync import async_to_sync, sync_to_async
from django.db import connection, transaction
from psycopg2 import sql

class LocalReplayMarkerStorage(ReplayMarkerStorage):
    """Replay marker storage implementation for storing replay markers in
    a PostgreSQL table, using Django's default database connection.

    One row per subscription holds the date and replay ID of the most
    recently stored marker. The database calls are synchronous, so each
    public coroutine delegates to a ``sync_to_async``-wrapped helper.
    """

    #: SQL table creation statement template
    # NOTE(review): ``INT`` is a 32-bit column; confirm the replay IDs of the
    # subscribed channels fit, otherwise consider ``BIGINT`` for new tables.
    TABLE_CREATION_TEMPLATE = """
        CREATE TABLE IF NOT EXISTS {table_name} (
            subscription VARCHAR(255) NOT NULL PRIMARY KEY,
            date VARCHAR(32) NOT NULL,
            replay_id INT NOT NULL
        )
    """

    #: SQL statement template for setting the value of a replay marker
    SET_REPLAY_MARKER_TEMPLATE = """
        INSERT INTO {table_name} (subscription, date, replay_id)
        VALUES (%s, %s, %s)
        ON CONFLICT (subscription) DO UPDATE 
        SET date = excluded.date, replay_id = excluded.replay_id
    """

    #: SQL statement template for getting the value of a replay marker
    GET_REPLAY_MARKER_TEMPLATE = """
        SELECT date, replay_id
        FROM {table_name}
        WHERE subscription=%s
    """

    def __init__(self, table_name: str = "replay") -> None:
        """
        :param table_name: Name of the table for storing replay markers
        """
        super().__init__()
        self.table_name = table_name

    def render_sql(self, template: str) -> sql.Composed:
        """Create an SQL statement from the given *template* by inserting the
        correct table name, safely quoted as an identifier

        :param template: SQL statement template with a ``{table_name}`` field
        :return: The composed SQL statement (not yet a plain string)
        """
        query = sql.SQL(template)
        return query.format(table_name=sql.Identifier(self.table_name))

    def _execute(self, cursor, template: str, params=None) -> None:
        """Render *template* and execute it on the Django *cursor*.

        ``cursor.cursor`` is the underlying DB-API (psycopg2) cursor; its
        connection is needed to turn the composed statement into a string.
        The local name ``statement`` avoids shadowing the ``sql`` module.

        :param cursor: Django database cursor
        :param template: SQL statement template (see :meth:`render_sql`)
        :param params: Query parameters, or ``None`` for a parameterless statement
        """
        statement = self.render_sql(template).as_string(cursor.cursor.connection)
        if params is None:
            cursor.execute(statement)
        else:
            cursor.execute(statement, params)

    async def ensure_table_exists(self) -> None:
        """Create the table for storing the replay markers if it doesn't
        already exist
        """
        await self._ensure_table_exists()

    @sync_to_async
    def _ensure_table_exists(self) -> None:
        with transaction.atomic(), connection.cursor() as cursor:
            self._execute(cursor, self.TABLE_CREATION_TEMPLATE)

    async def set_replay_marker(
        self, subscription: str, replay_marker: ReplayMarker
    ) -> None:
        """Store the *replay_marker* for the given *subscription*

        :param subscription: Name of the subscribed channel
        :param replay_marker: A replay marker
        """
        await self._set_replay_marker(subscription, replay_marker)

    @sync_to_async
    def _set_replay_marker(
        self, subscription: str, replay_marker: ReplayMarker
    ) -> None:
        with transaction.atomic(), connection.cursor() as cursor:
            self._execute(
                cursor,
                self.SET_REPLAY_MARKER_TEMPLATE,
                [subscription, replay_marker.date, replay_marker.replay_id],
            )

    async def get_replay_marker(self, subscription: str) -> Optional[ReplayMarker]:
        """Retrieve a stored replay marker for the given *subscription*

        :param subscription: Name of the subscribed channel
        :return: A replay marker or ``None`` if there is nothing stored for \
        the given *subscription*
        """
        return await self._get_replay_marker(subscription)

    @sync_to_async
    def _get_replay_marker(self, subscription: str) -> Optional[ReplayMarker]:
        with transaction.atomic(), connection.cursor() as cursor:
            self._execute(cursor, self.GET_REPLAY_MARKER_TEMPLATE, [subscription])
            result = cursor.fetchone()

        # NOTE(review): ``logger`` is not defined in this snippet; presumably
        # a module-level logger elsewhere in the file.
        logger.info("RESULT: %s", result)
        # e.g. ('2022-03-09T15:29:37.188Z', 20090)
        if result is None:
            return None
        date, replay_id = result
        # The replay ID must reach Salesforce as an int; a string makes the
        # subscribe request fail. ``int()`` also guards against a table whose
        # replay_id column was created with a textual type.
        marker = ReplayMarker(date=date, replay_id=int(replay_id))
        logger.info("MARKER.REPLAY: %s", marker.replay_id)
        return marker
            
 class Command(BaseCommand):
    def handle(self, *args, **options):
        self.stream_events()

    @async_to_sync
    async def stream_events(self):
        replay_storage = LocalReplayMarkerStorage()
        # make sure the required table exists
        await replay_storage.ensure_table_exists()

        sandbox = False if settings.DJANGO_MODE == "Production" else True

        async with SalesforceStreamingClient(
            consumer_key=settings.SALESFORCE_CONSUMER_KEY,
            consumer_secret=settings.SALESFORCE_CONSUMER_SECRET,
            username=settings.SALESFORCE_USERNAME,
            password=settings.SALESFORCE_PASSWORD,
            sandbox=sandbox,
            replay=replay_storage,  # type: ignore
        ) as client:

            await client.subscribe(ACCOUNT_TOPIC)

            async for message in client:
                topic = message["channel"]
                data = message["data"]
                logger.info(f"{topic} - {data}")
                processor = MessageProcessor(topic, data)
                try:
                    await processor.process_message()
                except Exception as e:
                    logger.exception(f"Unable to process message due to: {e}")

@trankos
Copy link

trankos commented Aug 26, 2022

Same issue. The problem was the replay_id data type: it must be an int!

@jdelStrother
Copy link

@trankos thanks, that saved me some hair-pulling

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants