diff --git a/src/exits/tasks.py b/src/exits/tasks.py index ba6f212e..b4f1656b 100644 --- a/src/exits/tasks.py +++ b/src/exits/tasks.py @@ -2,11 +2,8 @@ import logging import random import time -from urllib.parse import urljoin -import aiohttp from eth_typing import BlockNumber, BLSPubkey -from sw_utils.decorators import retry_aiohttp_errors from web3 import Web3 from web3.types import HexStr @@ -17,16 +14,17 @@ from src.common.typings import Oracles from src.common.utils import get_current_timestamp, wait_block_finalization from src.config.settings import ( - DEFAULT_RETRY_TIME, ORACLE_SIGNATURE_UPDATE_SYNC_DELAY, ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT, - OUTDATED_SIGNATURES_URL_PATH, settings, ) from src.exits.consensus import get_validator_public_keys from src.exits.execution import submit_exit_signatures from src.exits.typings import SignatureRotationRequest -from src.exits.utils import send_signature_rotation_requests +from src.exits.utils import ( + get_oracle_outdated_signatures_response, + send_signature_rotation_requests, +) from src.validators.signing.local import get_exit_signature_shards from src.validators.signing.remote import ( RemoteSignerConfiguration, @@ -37,15 +35,52 @@ logger = logging.getLogger(__name__) -async def fetch_outdated_indexes(oracle_endpoint) -> list[int]: - response = await _get_oracle_outdated_signatures_response(oracle_endpoint) +async def update_exit_signatures_periodically( + keystores: Keystores, + remote_signer_config: RemoteSignerConfiguration | None, +): + # Oracle may have lag if operator was stopped + # during `update_exit_signatures_periodically` process. + # Wait all oracles sync. + oracles = await get_oracles() + await _wait_oracles_signature_update(oracles) + + while True: + timer_start = time.time() + + try: + oracles = await get_oracles() + + oracle_replicas = random.choice(oracles.endpoints) # nosec + oracle_endpoint = random.choice(oracle_replicas) # nosec + outdated_indexes = await _fetch_outdated_indexes(oracle_endpoint) + + if outdated_indexes: + await _update_exit_signatures( + keystores=keystores, + remote_signer_config=remote_signer_config, + oracles=oracles, + outdated_indexes=outdated_indexes, + ) + + # Wait all oracles sync. + await _wait_oracles_signature_update(oracles) + except Exception as e: + logger.exception(e) + + elapsed = time.time() - timer_start + await asyncio.sleep(float(settings.network_config.SECONDS_PER_BLOCK) - elapsed) + + +async def _fetch_outdated_indexes(oracle_endpoint) -> list[int]: + response = await get_oracle_outdated_signatures_response(oracle_endpoint) outdated_indexes = [val['index'] for val in response['validators']] metrics.outdated_signatures.set(len(outdated_indexes)) return outdated_indexes -async def wait_oracles_signature_update(oracles: Oracles) -> None: +async def _wait_oracles_signature_update(oracles: Oracles) -> None: last_event = await keeper_contract.get_exit_signatures_updated_event(vault=settings.vault) if not last_event: return @@ -54,20 +89,26 @@ async def wait_oracles_signature_update(oracles: Oracles) -> None: logger.info('Waiting for block %d finalization...', update_block) await wait_block_finalization(update_block) - oracle_tasks = ( - wait_oracle_signature_update( - exit_signature_update_block=update_block, - oracle_endpoint=endpoint, - max_time=ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT, + oracle_tasks = { + asyncio.create_task( + _wait_oracle_signature_update( + exit_signature_update_block=update_block, + oracle_endpoint=endpoint, + max_time=ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT, + ) ) for replicas in oracles.endpoints for endpoint in replicas - ) - await asyncio.gather(*oracle_tasks) - logger.info('All the oracles have fetched exit signatures update') + } + while oracle_tasks: + done, oracle_tasks = await asyncio.wait(oracle_tasks, return_when=asyncio.FIRST_COMPLETED) + if done: + for pending_task in oracle_tasks: + pending_task.cancel() + logger.info('Oracles have fetched exit signatures update') -async def wait_oracle_signature_update( +async def _wait_oracle_signature_update( exit_signature_update_block: BlockNumber, oracle_endpoint: str, max_time: int | float = 0 ) -> None: """ @@ -93,7 +134,7 @@ async def wait_oracle_signature_update( ) -async def update_exit_signatures( +async def _update_exit_signatures( keystores: Keystores, remote_signer_config: RemoteSignerConfiguration | None, oracles: Oracles, @@ -104,7 +145,6 @@ async def update_exit_signatures( outdated_indexes = outdated_indexes[:exit_rotation_batch_limit] logger.info('Starting exit signature rotation for %d validators', len(outdated_indexes)) - # pylint: disable=duplicate-code validators = await get_validator_public_keys(outdated_indexes) deadline = None @@ -112,7 +152,7 @@ async def update_exit_signatures( current_timestamp = get_current_timestamp() if not deadline or deadline <= current_timestamp: deadline = current_timestamp + oracles.signature_validity_period - oracles_request = await get_oracles_request( + oracles_request = await _get_oracles_request( oracles=oracles, keystores=keystores, remote_signer_config=remote_signer_config, @@ -134,37 +174,15 @@ async def update_exit_signatures( return tx_hash -@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME) -async def _get_oracle_outdated_signatures_response(oracle_endpoint: str) -> dict: - """ - :param oracle_endpoint: - :return: Example response - ``` - { - "exit_signature_block_number": 100, - "validators": [{"index": 1}, ...] - } - ``` - """ - path = OUTDATED_SIGNATURES_URL_PATH.format(vault=settings.vault) - url = urljoin(oracle_endpoint, path) - - async with aiohttp.ClientSession() as session: - async with session.get(url=url) as response: - response.raise_for_status() - data = await response.json() - return data - - async def _fetch_exit_signature_block(oracle_endpoint: str) -> BlockNumber | None: - data = await _get_oracle_outdated_signatures_response(oracle_endpoint) + data = await get_oracle_outdated_signatures_response(oracle_endpoint) block_number = data['exit_signature_block_number'] if block_number is None: return None return BlockNumber(block_number) -async def get_oracles_request( +async def _get_oracles_request( oracles: Oracles, keystores: Keystores, remote_signer_config: RemoteSignerConfiguration | None, @@ -209,40 +227,3 @@ async def get_oracles_request( request.exit_signature_shards.append(shards.exit_signatures) return request - - -async def update_exit_signatures_periodically( - keystores: Keystores, - remote_signer_config: RemoteSignerConfiguration | None, -): - # Oracle may have lag if operator was stopped - # during `update_exit_signatures_periodically` process. - # Wait all oracles sync. - oracles = await get_oracles() - await wait_oracles_signature_update(oracles) - - while True: - timer_start = time.time() - - try: - oracles = await get_oracles() - - oracle_replicas = random.choice(oracles.endpoints) # nosec - oracle_endpoint = random.choice(oracle_replicas) # nosec - outdated_indexes = await fetch_outdated_indexes(oracle_endpoint) - - if outdated_indexes: - await update_exit_signatures( - keystores=keystores, - remote_signer_config=remote_signer_config, - oracles=oracles, - outdated_indexes=outdated_indexes, - ) - - # Wait all oracles sync. - await wait_oracles_signature_update(oracles) - except Exception as e: - logger.exception(e) - - elapsed = time.time() - timer_start - await asyncio.sleep(float(settings.network_config.SECONDS_PER_BLOCK) - elapsed) diff --git a/src/exits/tests/test_tasks.py b/src/exits/tests/test_tasks.py index d508f24d..92160893 100644 --- a/src/exits/tests/test_tasks.py +++ b/src/exits/tests/test_tasks.py @@ -11,7 +11,7 @@ from src.common.typings import Oracles from src.common.utils import get_current_timestamp from src.config.settings import settings -from src.exits.tasks import get_oracles_request, wait_oracle_signature_update +from src.exits.tasks import _get_oracles_request, _wait_oracle_signature_update from src.validators.signing.remote import RemoteSignerConfiguration from src.validators.typings import ExitSignatureShards, Keystores @@ -27,7 +27,7 @@ async def test_normal(self): 'src.exits.tasks._fetch_exit_signature_block', side_effect=[None, 1, 2, 3] ) as fetch_mock, ): - await wait_oracle_signature_update(update_block, 'http://oracle', max_time=5) + await _wait_oracle_signature_update(update_block, 'http://oracle', max_time=5) assert fetch_mock.call_count == 4 @@ -41,7 +41,7 @@ async def test_timeout(self): ) as fetch_mock, pytest.raises(asyncio.TimeoutError), ): - await wait_oracle_signature_update(update_block, 'http://oracle', max_time=5) + await _wait_oracle_signature_update(update_block, 'http://oracle', max_time=5) assert fetch_mock.call_count == 2 @@ -66,7 +66,7 @@ async def test_local_keystores( ), ), ): - request = await get_oracles_request( + request = await _get_oracles_request( oracles=oracles, keystores=Keystores({test_validator_pubkey: test_validator_privkey}), remote_signer_config=None, @@ -108,7 +108,7 @@ async def test_remote_signer( randint(0, int(1e6)): pubkey for pubkey in remote_signer_config.pubkeys_to_shares.keys() } - request = await get_oracles_request( + request = await _get_oracles_request( oracles=oracles, keystores=Keystores(dict()), remote_signer_config=remote_signer_config, diff --git a/src/exits/utils.py b/src/exits/utils.py index a54ddcc5..0f03bf2a 100644 --- a/src/exits/utils.py +++ b/src/exits/utils.py @@ -12,7 +12,12 @@ from src.common.typings import OracleApproval, Oracles, OraclesApproval from src.common.utils import process_oracles_approvals -from src.config.settings import DEFAULT_RETRY_TIME, UPDATE_SIGNATURES_URL_PATH +from src.config.settings import ( + DEFAULT_RETRY_TIME, + OUTDATED_SIGNATURES_URL_PATH, + UPDATE_SIGNATURES_URL_PATH, + settings, +) from src.exits.typings import SignatureRotationRequest logger = logging.getLogger(__name__) @@ -81,3 +86,25 @@ async def send_signature_rotation_request( signature=Web3.to_bytes(hexstr=data['signature']), deadline=data['deadline'], ) + + +@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME) +async def get_oracle_outdated_signatures_response(oracle_endpoint: str) -> dict: + """ + :param oracle_endpoint: + :return: Example response + ``` + { + "exit_signature_block_number": 100, + "validators": [{"index": 1}, ...] + } + ``` + """ + path = OUTDATED_SIGNATURES_URL_PATH.format(vault=settings.vault) + url = urljoin(oracle_endpoint, path) + + async with aiohttp.ClientSession() as session: + async with session.get(url=url) as response: + response.raise_for_status() + data = await response.json() + return data