Skip to content

Commit

Permalink
Wait only one oracle during exit rotation
Browse files Browse the repository at this point in the history
Signed-off-by: cyc60 <avsysoev60@gmail.com>
  • Loading branch information
cyc60 committed Sep 28, 2023
1 parent 56ed2ad commit 4f5e5e6
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 88 deletions.
145 changes: 63 additions & 82 deletions src/exits/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
import logging
import random
import time
from urllib.parse import urljoin

import aiohttp
from eth_typing import BlockNumber, BLSPubkey
from sw_utils.decorators import retry_aiohttp_errors
from web3 import Web3
from web3.types import HexStr

Expand All @@ -17,16 +14,17 @@
from src.common.typings import Oracles
from src.common.utils import get_current_timestamp, wait_block_finalization
from src.config.settings import (
DEFAULT_RETRY_TIME,
ORACLE_SIGNATURE_UPDATE_SYNC_DELAY,
ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT,
OUTDATED_SIGNATURES_URL_PATH,
settings,
)
from src.exits.consensus import get_validator_public_keys
from src.exits.execution import submit_exit_signatures
from src.exits.typings import SignatureRotationRequest
from src.exits.utils import send_signature_rotation_requests
from src.exits.utils import (
get_oracle_outdated_signatures_response,
send_signature_rotation_requests,
)
from src.validators.signing.local import get_exit_signature_shards
from src.validators.signing.remote import (
RemoteSignerConfiguration,
Expand All @@ -37,15 +35,52 @@
logger = logging.getLogger(__name__)


async def fetch_outdated_indexes(oracle_endpoint) -> list[int]:
response = await _get_oracle_outdated_signatures_response(oracle_endpoint)
async def update_exit_signatures_periodically(
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
):
# Oracle may have lag if operator was stopped
# during `update_exit_signatures_periodically` process.
# Wait all oracles sync.
oracles = await get_oracles()
await _wait_oracles_signature_update(oracles)

while True:
timer_start = time.time()

try:
oracles = await get_oracles()

oracle_replicas = random.choice(oracles.endpoints) # nosec
oracle_endpoint = random.choice(oracle_replicas) # nosec
outdated_indexes = await _fetch_outdated_indexes(oracle_endpoint)

if outdated_indexes:
await _update_exit_signatures(
keystores=keystores,
remote_signer_config=remote_signer_config,
oracles=oracles,
outdated_indexes=outdated_indexes,
)

# Wait all oracles sync.
await _wait_oracles_signature_update(oracles)
except Exception as e:
logger.exception(e)

elapsed = time.time() - timer_start
await asyncio.sleep(float(settings.network_config.SECONDS_PER_BLOCK) - elapsed)


async def _fetch_outdated_indexes(oracle_endpoint) -> list[int]:
response = await get_oracle_outdated_signatures_response(oracle_endpoint)
outdated_indexes = [val['index'] for val in response['validators']]

metrics.outdated_signatures.set(len(outdated_indexes))
return outdated_indexes


async def wait_oracles_signature_update(oracles: Oracles) -> None:
async def _wait_oracles_signature_update(oracles: Oracles) -> None:
last_event = await keeper_contract.get_exit_signatures_updated_event(vault=settings.vault)
if not last_event:
return
Expand All @@ -54,20 +89,26 @@ async def wait_oracles_signature_update(oracles: Oracles) -> None:
logger.info('Waiting for block %d finalization...', update_block)
await wait_block_finalization(update_block)

oracle_tasks = (
wait_oracle_signature_update(
exit_signature_update_block=update_block,
oracle_endpoint=endpoint,
max_time=ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT,
oracle_tasks = {
asyncio.create_task(
_wait_oracle_signature_update(
exit_signature_update_block=update_block,
oracle_endpoint=endpoint,
max_time=ORACLES_SIGNATURE_UPDATE_SYNC_TIMEOUT,
)
)
for replicas in oracles.endpoints
for endpoint in replicas
)
await asyncio.gather(*oracle_tasks)
logger.info('All the oracles have fetched exit signatures update')
}
while oracle_tasks:
done, oracle_tasks = await asyncio.wait(oracle_tasks, return_when=asyncio.FIRST_COMPLETED)
if done:
for pending_task in oracle_tasks:
pending_task.cancel()
logger.info('Oracles have fetched exit signatures update')


async def wait_oracle_signature_update(
async def _wait_oracle_signature_update(
exit_signature_update_block: BlockNumber, oracle_endpoint: str, max_time: int | float = 0
) -> None:
"""
Expand All @@ -93,7 +134,7 @@ async def wait_oracle_signature_update(
)


async def update_exit_signatures(
async def _update_exit_signatures(
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
oracles: Oracles,
Expand All @@ -104,15 +145,14 @@ async def update_exit_signatures(
outdated_indexes = outdated_indexes[:exit_rotation_batch_limit]

logger.info('Starting exit signature rotation for %d validators', len(outdated_indexes))

# pylint: disable=duplicate-code
validators = await get_validator_public_keys(outdated_indexes)
deadline = None
while True:
current_timestamp = get_current_timestamp()
if not deadline or deadline <= current_timestamp:
deadline = current_timestamp + oracles.signature_validity_period
oracles_request = await get_oracles_request(
oracles_request = await _get_oracles_request(
oracles=oracles,
keystores=keystores,
remote_signer_config=remote_signer_config,
Expand All @@ -134,37 +174,15 @@ async def update_exit_signatures(
return tx_hash


@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME)
async def _get_oracle_outdated_signatures_response(oracle_endpoint: str) -> dict:
"""
:param oracle_endpoint:
:return: Example response
```
{
"exit_signature_block_number": 100,
"validators": [{"index": 1}, ...]
}
```
"""
path = OUTDATED_SIGNATURES_URL_PATH.format(vault=settings.vault)
url = urljoin(oracle_endpoint, path)

async with aiohttp.ClientSession() as session:
async with session.get(url=url) as response:
response.raise_for_status()
data = await response.json()
return data


async def _fetch_exit_signature_block(oracle_endpoint: str) -> BlockNumber | None:
data = await _get_oracle_outdated_signatures_response(oracle_endpoint)
data = await get_oracle_outdated_signatures_response(oracle_endpoint)
block_number = data['exit_signature_block_number']
if block_number is None:
return None
return BlockNumber(block_number)


async def get_oracles_request(
async def _get_oracles_request(
oracles: Oracles,
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
Expand Down Expand Up @@ -209,40 +227,3 @@ async def get_oracles_request(
request.exit_signature_shards.append(shards.exit_signatures)

return request


async def update_exit_signatures_periodically(
keystores: Keystores,
remote_signer_config: RemoteSignerConfiguration | None,
):
# Oracle may have lag if operator was stopped
# during `update_exit_signatures_periodically` process.
# Wait all oracles sync.
oracles = await get_oracles()
await wait_oracles_signature_update(oracles)

while True:
timer_start = time.time()

try:
oracles = await get_oracles()

oracle_replicas = random.choice(oracles.endpoints) # nosec
oracle_endpoint = random.choice(oracle_replicas) # nosec
outdated_indexes = await fetch_outdated_indexes(oracle_endpoint)

if outdated_indexes:
await update_exit_signatures(
keystores=keystores,
remote_signer_config=remote_signer_config,
oracles=oracles,
outdated_indexes=outdated_indexes,
)

# Wait all oracles sync.
await wait_oracles_signature_update(oracles)
except Exception as e:
logger.exception(e)

elapsed = time.time() - timer_start
await asyncio.sleep(float(settings.network_config.SECONDS_PER_BLOCK) - elapsed)
10 changes: 5 additions & 5 deletions src/exits/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from src.common.typings import Oracles
from src.common.utils import get_current_timestamp
from src.config.settings import settings
from src.exits.tasks import get_oracles_request, wait_oracle_signature_update
from src.exits.tasks import _get_oracles_request, _wait_oracle_signature_update
from src.validators.signing.remote import RemoteSignerConfiguration
from src.validators.typings import ExitSignatureShards, Keystores

Expand All @@ -27,7 +27,7 @@ async def test_normal(self):
'src.exits.tasks._fetch_exit_signature_block', side_effect=[None, 1, 2, 3]
) as fetch_mock,
):
await wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)
await _wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)

assert fetch_mock.call_count == 4

Expand All @@ -41,7 +41,7 @@ async def test_timeout(self):
) as fetch_mock,
pytest.raises(asyncio.TimeoutError),
):
await wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)
await _wait_oracle_signature_update(update_block, 'http://oracle', max_time=5)

assert fetch_mock.call_count == 2

Expand All @@ -66,7 +66,7 @@ async def test_local_keystores(
),
),
):
request = await get_oracles_request(
request = await _get_oracles_request(
oracles=oracles,
keystores=Keystores({test_validator_pubkey: test_validator_privkey}),
remote_signer_config=None,
Expand Down Expand Up @@ -108,7 +108,7 @@ async def test_remote_signer(
randint(0, int(1e6)): pubkey
for pubkey in remote_signer_config.pubkeys_to_shares.keys()
}
request = await get_oracles_request(
request = await _get_oracles_request(
oracles=oracles,
keystores=Keystores(dict()),
remote_signer_config=remote_signer_config,
Expand Down
29 changes: 28 additions & 1 deletion src/exits/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@

from src.common.typings import OracleApproval, Oracles, OraclesApproval
from src.common.utils import process_oracles_approvals
from src.config.settings import DEFAULT_RETRY_TIME, UPDATE_SIGNATURES_URL_PATH
from src.config.settings import (
DEFAULT_RETRY_TIME,
OUTDATED_SIGNATURES_URL_PATH,
UPDATE_SIGNATURES_URL_PATH,
settings,
)
from src.exits.typings import SignatureRotationRequest

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -81,3 +86,25 @@ async def send_signature_rotation_request(
signature=Web3.to_bytes(hexstr=data['signature']),
deadline=data['deadline'],
)


@retry_aiohttp_errors(delay=DEFAULT_RETRY_TIME)
async def get_oracle_outdated_signatures_response(oracle_endpoint: str) -> dict:
"""
:param oracle_endpoint:
:return: Example response
```
{
"exit_signature_block_number": 100,
"validators": [{"index": 1}, ...]
}
```
"""
path = OUTDATED_SIGNATURES_URL_PATH.format(vault=settings.vault)
url = urljoin(oracle_endpoint, path)

async with aiohttp.ClientSession() as session:
async with session.get(url=url) as response:
response.raise_for_status()
data = await response.json()
return data

0 comments on commit 4f5e5e6

Please sign in to comment.