diff --git a/eth2/beacon/tools/builder/aggregator.py b/eth2/beacon/tools/builder/aggregator.py index ab2101cbde..ee47150409 100644 --- a/eth2/beacon/tools/builder/aggregator.py +++ b/eth2/beacon/tools/builder/aggregator.py @@ -88,7 +88,7 @@ def validate_aggregator_proof( state: BeaconState, aggregate_and_proof: AggregateAndProof, config: CommitteeConfig ) -> None: slot = aggregate_and_proof.aggregate.data.slot - pubkey = state.validators[aggregate_and_proof.index] + pubkey = state.validators[aggregate_and_proof.index].pubkey domain = get_domain( state, SignatureDomain.DOMAIN_BEACON_ATTESTER, @@ -96,7 +96,13 @@ def validate_aggregator_proof( message_epoch=compute_epoch_at_slot(slot, config.SLOTS_PER_EPOCH), ) message_hash = get_hash_tree_root(slot, sedes=uint64) - if not bls.verify(message_hash, pubkey, aggregate_and_proof.proof, domain): + + if not bls.verify( + message_hash=message_hash, + pubkey=pubkey, + signature=aggregate_and_proof.selection_proof, + domain=domain, + ): raise ValidationError( "Incorrect selection proof:" f" aggregate_and_proof={aggregate_and_proof}" diff --git a/trinity/components/eth2/beacon/component.py b/trinity/components/eth2/beacon/component.py index ff02bc11f7..abde084ebe 100644 --- a/trinity/components/eth2/beacon/component.py +++ b/trinity/components/eth2/beacon/component.py @@ -150,6 +150,8 @@ def do_start(self) -> None: event_bus=self.event_bus, token=libp2p_node.cancel_token, get_ready_attestations_fn=receive_server.get_ready_attestations, + get_aggregatable_attestations_fn=receive_server.get_aggregatable_attestations, + import_attestation_fn=receive_server.import_attestation, ) slot_ticker = SlotTicker( diff --git a/trinity/components/eth2/beacon/validator.py b/trinity/components/eth2/beacon/validator.py index 420849e9db..3bdba8a716 100644 --- a/trinity/components/eth2/beacon/validator.py +++ b/trinity/components/eth2/beacon/validator.py @@ -89,6 +89,8 @@ GetReadyAttestationsFn = Callable[[Slot, Optional[CommitteeIndex]], Sequence[Attestation]] +GetAggregatableAttestationsFn = Callable[[Slot, Optional[CommitteeIndex]], Sequence[Attestation]] +ImportAttestationFn = Callable[[Attestation], None] class Validator(BaseService): @@ -108,6 +110,8 @@ def __init__( validator_privkeys: Dict[ValidatorIndex, int], event_bus: EndpointAPI, get_ready_attestations_fn: GetReadyAttestationsFn, + get_aggregatable_attestations_fn: GetAggregatableAttestationsFn, + import_attestation_fn: ImportAttestationFn, token: CancelToken = None) -> None: super().__init__(token) self.chain = chain @@ -129,6 +133,8 @@ def __init__( CommitteeAssignment((), CommitteeIndex(-1), Slot(-1)), ) self.get_ready_attestations: GetReadyAttestationsFn = get_ready_attestations_fn + self.get_aggregatable_attestations: GetAggregatableAttestationsFn = get_aggregatable_attestations_fn # noqa: E501 + self.import_attestation: ImportAttestationFn = import_attestation_fn async def _run(self) -> None: self.logger.info( @@ -250,7 +256,7 @@ async def handle_second_tick(self, slot: Slot) -> None: await self.attest(slot) async def handle_third_tick(self, slot: Slot) -> None: - await self._create_and_broadcast_aggregate_and_proof(slot) + await self.create_and_broadcast_aggregate_and_proof(slot) # # Proposing block @@ -335,10 +341,17 @@ def _get_attesting_validator_and_committee_index( self, assignments: Dict[ValidatorIndex, CommitteeAssignment], slot: Slot, - epoch: Epoch + epoch: Epoch, + is_aggregating: bool = False ) -> Iterable[Tuple[ValidatorIndex, CommitteeIndex]]: + # TODO(hwwhww) refactor it for validator_index, assignment in assignments.items(): - if self._is_attesting(validator_index, assignment, slot, epoch): + aggregator_condition = is_aggregating and assignment.slot == slot + attester_condition = ( + not is_aggregating and + self._is_attesting(validator_index, assignment, slot, epoch) + ) + if aggregator_condition or attester_condition: yield (validator_index, assignment.committee_index) async def attest(self, slot: Slot) -> Tuple[Attestation, ...]: @@ -408,9 +421,11 @@ async def attest(self, slot: Slot) -> Tuple[Attestation, ...]: for validator_index in attesting_validators_indices: self.latest_attested_epoch[validator_index] = epoch - self.logger.debug("broadcasting attestation %s", attestation) # await self.p2p_node.broadcast_attestation(attestation) subnet_id = committee_index % ATTESTATION_SUBNET_COUNT + + # Import attestation to pool and then broadcast it + self.import_attestation(attestation) await self.p2p_node.broadcast_attestation_to_subnet(attestation, subnet_id) attestations = attestations + (attestation,) @@ -421,7 +436,9 @@ async def attest(self, slot: Slot) -> Tuple[Attestation, ...]: # # Aggregating attestation # - async def _create_and_broadcast_aggregate_and_proof(self, slot: Slot) -> None: + + async def create_and_broadcast_aggregate_and_proof(self, slot: Slot) -> None: + self.logger.error("In _create_and_broadcast_aggregate_and_proof...") # Check the aggregators selection # aggregate_and_proofs: Tuple[AggregateAndProof] = () state_machine = self.chain.get_state_machine() @@ -439,6 +456,7 @@ async def _create_and_broadcast_aggregate_and_proof(self, slot: Slot) -> None: validator_assignments, slot, epoch, + is_aggregating=True, ) if len(attesting_validators) == 0: return @@ -459,30 +477,48 @@ async def _create_and_broadcast_aggregate_and_proof(self, slot: Slot) -> None: attesting_data[0]: self.validator_privkeys[attesting_data[0]] for attesting_data in group } + selected_proofs: Dict[ValidatorIndex, BLSSignature] = {} for validator_index, privkey in attesting_validator_privkeys.items(): signature = slot_signature( state, slot, privkey, CommitteeConfig(config), ) - if is_aggregator(state, slot, committee_index, signature, CommitteeConfig(config)): + is_aggregator_result = is_aggregator( + state, + slot, + committee_index, + signature, + CommitteeConfig(config), + ) + + if is_aggregator_result: + self.logger.debug( + f"validator ({validator_index}) is aggregator of" + f" committee_index={committee_index} at slot={slot}" + ) selected_proofs[validator_index] = signature for validator_index, selected_proof in selected_proofs.items(): - aggregates = self._get_aggregates(slot, committee_index) + aggregates = self._get_aggregates(slot, committee_index, config) for aggregate in aggregates: aggregate_and_proof = AggregateAndProof( index=validator_index, aggregate=aggregate, selection_proof=selected_proof, ) - self.logger.debug2(f"broadcasting aggregate_and_proof={aggregate_and_proof}") + + # Import attestation to pool and then broadcast it + self.import_attestation(aggregate_and_proof.aggregate) await self.p2p_node.broadcast_beacon_aggregate_and_proof(aggregate_and_proof) @to_tuple - def _get_aggregates(self, slot: Slot, committee_index: CommitteeIndex) -> Iterable[Attestation]: - attestations = self.get_ready_attestations(slot, committee_index) + def _get_aggregates( + self, slot: Slot, committee_index: CommitteeIndex, config: CommitteeConfig + ) -> Iterable[Attestation]: + # TODO: The aggregator should aggregate the late attestations? + aggregatable_attestations = self.get_aggregatable_attestations(slot, committee_index) attestation_groups = groupby( - attestations, + aggregatable_attestations, key=lambda attestation: attestation.data, ) for _, group in attestation_groups: diff --git a/trinity/protocol/bcc_libp2p/servers.py b/trinity/protocol/bcc_libp2p/servers.py index f8e00054db..ff6bf2234b 100644 --- a/trinity/protocol/bcc_libp2p/servers.py +++ b/trinity/protocol/bcc_libp2p/servers.py @@ -196,6 +196,9 @@ async def _run(self) -> None: self.ready.set() await self.cancellation() + # + # Daemon tasks + # async def _handle_message( self, topic: str, @@ -214,7 +217,7 @@ async def _handle_message( async def _handle_beacon_attestation_loop(self) -> None: await self._handle_message( PUBSUB_TOPIC_BEACON_ATTESTATION, - self._handle_beacon_attestations + self._handle_beacon_attestation ) async def _handle_committee_beacon_attestation_loop(self, subnet_id: SubnetId) -> None: @@ -222,7 +225,7 @@ async def _handle_committee_beacon_attestation_loop(self, subnet_id: SubnetId) - topic = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=str(subnet_id)) await self._handle_message( topic, - self._handle_beacon_attestations, + self._handle_committee_beacon_attestation, ) async def _handle_aggregate_and_proof_loop(self) -> None: @@ -274,34 +277,25 @@ async def _process_orphan_blocks_loop(self) -> None: else: self._process_received_block(block) - async def _handle_committee_beacon_attestations(self, msg: rpc_pb2.Message) -> None: - # TODO - while True: - await self.sleep(0.5) + # + # Message handlers + # + async def _handle_committee_beacon_attestation(self, msg: rpc_pb2.Message) -> None: + await self._handle_beacon_attestation(msg) async def _handle_beacon_aggregate_and_proof(self, msg: rpc_pb2.Message) -> None: aggregate_and_proof = ssz.decode(msg.data, sedes=AggregateAndProof) self.logger.debug("Received aggregate_and_proof=%s", aggregate_and_proof) - # Check if aggregate_and_proof has been seen already. - if not self._is_attestation_new(aggregate_and_proof.aggregate): - return - - # Add new attestation to attestation pool. - self.attestation_pool.add(aggregate_and_proof.aggregate) - self.logger.debug2(f"Adding aggregate from aggregate_and_proof={aggregate_and_proof}") + self._add_attestation(aggregate_and_proof.aggregate) - async def _handle_beacon_attestations(self, msg: rpc_pb2.Message) -> None: + async def _handle_beacon_attestation(self, msg: rpc_pb2.Message) -> None: attestation = ssz.decode(msg.data, sedes=Attestation) self.logger.debug("Received attestation=%s", attestation) - # Check if attestation has been seen already. - if not self._is_attestation_new(attestation): - return - # Add new attestation to attestation pool. - self.attestation_pool.add(attestation) + self._add_attestation(attestation) async def _handle_beacon_block(self, msg: rpc_pb2.Message) -> None: block = ssz.decode(msg.data, BeaconBlock) @@ -315,6 +309,14 @@ def _is_attestation_new(self, attestation: Attestation) -> bool: return False return not self.chain.attestation_exists(attestation.hash_tree_root) + def _add_attestation(self, attestation: Attestation) -> None: + # Check if attestation has been seen already. + if not self._is_attestation_new(attestation): + return + + # Add new attestation to attestation pool. + self.attestation_pool.add(attestation) + def _process_received_block(self, block: BaseBeaconBlock) -> None: # If the block is an orphan, put it to the orphan pool self.logger.debug( @@ -396,6 +398,9 @@ def _is_block_root_seen(self, block_root: SigningRoot) -> bool: def _is_block_seen(self, block: BaseBeaconBlock) -> bool: return self._is_block_root_seen(block_root=block.signing_root) + # + # Exposed APIs for Validator + # @to_tuple def get_ready_attestations( self, @@ -419,3 +424,25 @@ def get_ready_attestations( continue else: yield attestation + + @to_tuple + def get_aggregatable_attestations( + self, + slot: Slot, + committee_index: Optional[CommitteeIndex] = None + ) -> Iterable[Attestation]: + for attestation in self.attestation_pool.get_all(): + try: + # Filter by committee_index + if committee_index is not None and committee_index != attestation.data.index: + continue + if slot != attestation.data.slot: + continue + + except ValidationError: + continue + else: + yield attestation + + def import_attestation(self, attestation: Attestation) -> None: + self.attestation_pool.add(attestation) diff --git a/trinity/protocol/bcc_libp2p/topic_validators.py b/trinity/protocol/bcc_libp2p/topic_validators.py index beab14aaa1..a5f5e054ac 100644 --- a/trinity/protocol/bcc_libp2p/topic_validators.py +++ b/trinity/protocol/bcc_libp2p/topic_validators.py @@ -108,10 +108,23 @@ def beacon_attestation_validator(msg_forwarder: ID, msg: rpc_pb2.Message) -> boo # Fast forward to state in future slot in order to pass # attestation.data.slot validity check - future_state = state_machine.state_transition.apply_state_transition( - state, - future_slot=Slot(attestation.data.slot + config.MIN_ATTESTATION_INCLUSION_DELAY), + future_slot = max( + Slot(attestation.data.slot + config.MIN_ATTESTATION_INCLUSION_DELAY), + state.slot ) + try: + future_state = state_machine.state_transition.apply_state_transition( + state, + future_slot=future_slot, + ) + except ValidationError as error: + logger.error( + bold_red("Failed to fast forward to state at slot=%d, error=%s"), + future_slot, + str(error), + ) + return False + try: validate_attestation( future_state, @@ -156,10 +169,13 @@ def validate_aggregate_and_proof( state, aggregate_and_proof.aggregate.data, aggregate_and_proof.aggregate.aggregation_bits, - config + config, ) if aggregate_and_proof.index not in attesting_indices: - raise ValidationError("The aggregator index is not within the aggregate's committee") + raise ValidationError( + f"The aggregator index ({aggregate_and_proof.index}) is not within" + f" the aggregate's committee {attesting_indices}" + ) if not is_aggregator( state, @@ -194,7 +210,7 @@ def beacon_aggregate_and_proof_validator(msg_forwarder: ID, msg: rpc_pb2.Message config = state_machine.config state = chain.get_head_state() - attestation = aggregate_and_proof.aggregate_and_proof + attestation = aggregate_and_proof.aggregate # Check that beacon blocks attested to by the attestation are validated try: @@ -211,10 +227,23 @@ def beacon_aggregate_and_proof_validator(msg_forwarder: ID, msg: rpc_pb2.Message # Fast forward to state in future slot in order to pass # attestation.data.slot validity check - future_state = state_machine.state_transition.apply_state_transition( - state, - future_slot=Slot(attestation.data.slot + config.MIN_ATTESTATION_INCLUSION_DELAY), + future_slot = max( + Slot(attestation.data.slot + config.MIN_ATTESTATION_INCLUSION_DELAY), + state.slot ) + try: + future_state = state_machine.state_transition.apply_state_transition( + state, + future_slot=future_slot, + ) + except ValidationError as error: + logger.error( + bold_red("Failed to fast forward to state at slot=%d, error=%s"), + future_slot, + str(error), + ) + return False + try: validate_aggregate_and_proof( future_state, @@ -229,7 +258,5 @@ def beacon_aggregate_and_proof_validator(msg_forwarder: ID, msg: rpc_pb2.Message ) return False - logger.debug2(f"aggregate_and_proof={aggregate_and_proof} passed") - return True return beacon_aggregate_and_proof_validator