Commit adbc6a1: Make aggregation work
hwwhww committed Nov 21, 2019 · 1 parent c68b5be
Showing 5 changed files with 141 additions and 43 deletions.
10 changes: 8 additions & 2 deletions eth2/beacon/tools/builder/aggregator.py
@@ -88,15 +88,21 @@ def validate_aggregator_proof(
state: BeaconState, aggregate_and_proof: AggregateAndProof, config: CommitteeConfig
) -> None:
slot = aggregate_and_proof.aggregate.data.slot
pubkey = state.validators[aggregate_and_proof.index]
pubkey = state.validators[aggregate_and_proof.index].pubkey
domain = get_domain(
state,
SignatureDomain.DOMAIN_BEACON_ATTESTER,
config.SLOTS_PER_EPOCH,
message_epoch=compute_epoch_at_slot(slot, config.SLOTS_PER_EPOCH),
)
message_hash = get_hash_tree_root(slot, sedes=uint64)
if not bls.verify(message_hash, pubkey, aggregate_and_proof.proof, domain):

if not bls.verify(
message_hash=message_hash,
pubkey=pubkey,
signature=aggregate_and_proof.selection_proof,
domain=domain,
):
raise ValidationError(
"Incorrect selection proof:"
f" aggregate_and_proof={aggregate_and_proof}"
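For context, the selection proof verified here is nothing more than a BLS signature over the slot's hash tree root under the attester domain. Below is a minimal sketch of the signer side, assuming a bls.sign(message_hash, privkey, domain) helper symmetric to the bls.verify call in this hunk; in this repository the actual producer is slot_signature, which the validator calls further down in validator.py.

def make_selection_proof(state, slot, privkey, config):
    # Sketch only: mirrors validate_aggregator_proof above.
    domain = get_domain(
        state,
        SignatureDomain.DOMAIN_BEACON_ATTESTER,
        config.SLOTS_PER_EPOCH,
        message_epoch=compute_epoch_at_slot(slot, config.SLOTS_PER_EPOCH),
    )
    message_hash = get_hash_tree_root(slot, sedes=uint64)
    # Assumption: bls.sign consumes the same (message_hash, privkey, domain)
    # triple that bls.verify checks in validate_aggregator_proof.
    return bls.sign(message_hash, privkey, domain)
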
2 changes: 2 additions & 0 deletions trinity/components/eth2/beacon/component.py
@@ -150,6 +150,8 @@ def do_start(self) -> None:
event_bus=self.event_bus,
token=libp2p_node.cancel_token,
get_ready_attestations_fn=receive_server.get_ready_attestations,
get_aggregatable_attestations_fn=receive_server.get_aggregatable_attestations,
import_attestation_fn=receive_server.import_attestation,
)

slot_ticker = SlotTicker(
58 changes: 47 additions & 11 deletions trinity/components/eth2/beacon/validator.py
@@ -89,6 +89,8 @@


GetReadyAttestationsFn = Callable[[Slot, Optional[CommitteeIndex]], Sequence[Attestation]]
GetAggregatableAttestationsFn = Callable[[Slot, Optional[CommitteeIndex]], Sequence[Attestation]]
ImportAttestationFn = Callable[[Attestation], None]


class Validator(BaseService):
@@ -108,6 +110,8 @@ def __init__(
validator_privkeys: Dict[ValidatorIndex, int],
event_bus: EndpointAPI,
get_ready_attestations_fn: GetReadyAttestationsFn,
get_aggregatable_attestations_fn: GetAggregatableAttestationsFn,
import_attestation_fn: ImportAttestationFn,
token: CancelToken = None) -> None:
super().__init__(token)
self.chain = chain
@@ -129,6 +133,8 @@ def __init__(
CommitteeAssignment((), CommitteeIndex(-1), Slot(-1)),
)
self.get_ready_attestations: GetReadyAttestationsFn = get_ready_attestations_fn
self.get_aggregatable_attestations: GetAggregatableAttestationsFn = get_aggregatable_attestations_fn # noqa: E501
self.import_attestation: ImportAttestationFn = import_attestation_fn

async def _run(self) -> None:
self.logger.info(
@@ -250,7 +256,7 @@ async def handle_second_tick(self, slot: Slot) -> None:
await self.attest(slot)

async def handle_third_tick(self, slot: Slot) -> None:
await self._create_and_broadcast_aggregate_and_proof(slot)
await self.create_and_broadcast_aggregate_and_proof(slot)

#
# Proposing block
@@ -335,10 +341,17 @@ def _get_attesting_validator_and_committee_index(
self,
assignments: Dict[ValidatorIndex, CommitteeAssignment],
slot: Slot,
epoch: Epoch
epoch: Epoch,
is_aggregating: bool = False
) -> Iterable[Tuple[ValidatorIndex, CommitteeIndex]]:
# TODO(hwwhww) refactor it
for validator_index, assignment in assignments.items():
if self._is_attesting(validator_index, assignment, slot, epoch):
aggregator_condition = is_aggregating and assignment.slot == slot
attester_condition = (
not is_aggregating and
self._is_attesting(validator_index, assignment, slot, epoch)
)
if aggregator_condition or attester_condition:
yield (validator_index, assignment.committee_index)

async def attest(self, slot: Slot) -> Tuple[Attestation, ...]:
@@ -408,9 +421,11 @@ async def attest(self, slot: Slot) -> Tuple[Attestation, ...]:
for validator_index in attesting_validators_indices:
self.latest_attested_epoch[validator_index] = epoch

self.logger.debug("broadcasting attestation %s", attestation)
# await self.p2p_node.broadcast_attestation(attestation)
subnet_id = committee_index % ATTESTATION_SUBNET_COUNT

# Import attestation to pool and then broadcast it
self.import_attestation(attestation)
await self.p2p_node.broadcast_attestation_to_subnet(attestation, subnet_id)

attestations = attestations + (attestation,)
@@ -421,7 +436,9 @@ async def attest(self, slot: Slot) -> Tuple[Attestation, ...]:
#
# Aggregating attestation
#
async def _create_and_broadcast_aggregate_and_proof(self, slot: Slot) -> None:

async def create_and_broadcast_aggregate_and_proof(self, slot: Slot) -> None:
self.logger.debug("In create_and_broadcast_aggregate_and_proof...")
# Check the aggregator selection
# aggregate_and_proofs: Tuple[AggregateAndProof] = ()
state_machine = self.chain.get_state_machine()
@@ -439,6 +456,7 @@ async def _create_and_broadcast_aggregate_and_proof(self, slot: Slot) -> None:
validator_assignments,
slot,
epoch,
is_aggregating=True,
)
if len(attesting_validators) == 0:
return
@@ -459,30 +477,48 @@ async def _create_and_broadcast_aggregate_and_proof(self, slot: Slot) -> None:
attesting_data[0]: self.validator_privkeys[attesting_data[0]]
for attesting_data in group
}

selected_proofs: Dict[ValidatorIndex, BLSSignature] = {}
for validator_index, privkey in attesting_validator_privkeys.items():
signature = slot_signature(
state, slot, privkey, CommitteeConfig(config),
)
if is_aggregator(state, slot, committee_index, signature, CommitteeConfig(config)):
is_aggregator_result = is_aggregator(
state,
slot,
committee_index,
signature,
CommitteeConfig(config),
)

if is_aggregator_result:
self.logger.debug(
f"validator ({validator_index}) is aggregator of"
f" committee_index={committee_index} at slot={slot}"
)
selected_proofs[validator_index] = signature

for validator_index, selected_proof in selected_proofs.items():
aggregates = self._get_aggregates(slot, committee_index)
aggregates = self._get_aggregates(slot, committee_index, config)
for aggregate in aggregates:
aggregate_and_proof = AggregateAndProof(
index=validator_index,
aggregate=aggregate,
selection_proof=selected_proof,
)
self.logger.debug2(f"broadcasting aggregate_and_proof={aggregate_and_proof}")

# Import attestation to pool and then broadcast it
self.import_attestation(aggregate_and_proof.aggregate)
await self.p2p_node.broadcast_beacon_aggregate_and_proof(aggregate_and_proof)

@to_tuple
def _get_aggregates(self, slot: Slot, committee_index: CommitteeIndex) -> Iterable[Attestation]:
attestations = self.get_ready_attestations(slot, committee_index)
def _get_aggregates(
self, slot: Slot, committee_index: CommitteeIndex, config: CommitteeConfig
) -> Iterable[Attestation]:
# TODO: Should the aggregator also aggregate late attestations?
aggregatable_attestations = self.get_aggregatable_attestations(slot, committee_index)
attestation_groups = groupby(
attestations,
aggregatable_attestations,
key=lambda attestation: attestation.data,
)
for _, group in attestation_groups:
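The is_aggregator call above implements the honest-validator selection rule from the spec this code targets: a validator elects itself aggregator for its committee when the first eight bytes of the hash of its slot signature are divisible by len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE. A rough sketch of that rule, assuming spec-style helpers (get_beacon_committee and a hash_eth2 digest) rather than this repository's exact signatures:

TARGET_AGGREGATORS_PER_COMMITTEE = 16  # spec constant

def is_aggregator_sketch(state, slot, committee_index, slot_signature, config):
    # Roughly one aggregator per 16 committee members self-selects,
    # deterministically, from the hash of its own slot signature.
    committee = get_beacon_committee(state, slot, committee_index, config)  # assumed helper
    modulo = max(1, len(committee) // TARGET_AGGREGATORS_PER_COMMITTEE)
    return int.from_bytes(hash_eth2(slot_signature)[:8], "little") % modulo == 0
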
65 changes: 46 additions & 19 deletions trinity/protocol/bcc_libp2p/servers.py
@@ -196,6 +196,9 @@ async def _run(self) -> None:
self.ready.set()
await self.cancellation()

#
# Daemon tasks
#
async def _handle_message(
self,
topic: str,
@@ -214,15 +217,15 @@ async def _handle_message(
async def _handle_beacon_attestation_loop(self) -> None:
await self._handle_message(
PUBSUB_TOPIC_BEACON_ATTESTATION,
self._handle_beacon_attestations
self._handle_beacon_attestation
)

async def _handle_committee_beacon_attestation_loop(self, subnet_id: SubnetId) -> None:
await self.sleep(0.5)
topic = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=str(subnet_id))
await self._handle_message(
topic,
self._handle_beacon_attestations,
self._handle_committee_beacon_attestation,
)

async def _handle_aggregate_and_proof_loop(self) -> None:
@@ -274,34 +277,25 @@ async def _process_orphan_blocks_loop(self) -> None:
else:
self._process_received_block(block)

async def _handle_committee_beacon_attestations(self, msg: rpc_pb2.Message) -> None:
# TODO
while True:
await self.sleep(0.5)
#
# Message handlers
#
async def _handle_committee_beacon_attestation(self, msg: rpc_pb2.Message) -> None:
await self._handle_beacon_attestation(msg)

async def _handle_beacon_aggregate_and_proof(self, msg: rpc_pb2.Message) -> None:
aggregate_and_proof = ssz.decode(msg.data, sedes=AggregateAndProof)

self.logger.debug("Received aggregate_and_proof=%s", aggregate_and_proof)

# Check if aggregate_and_proof has been seen already.
if not self._is_attestation_new(aggregate_and_proof.aggregate):
return

# Add new attestation to attestation pool.
self.attestation_pool.add(aggregate_and_proof.aggregate)
self.logger.debug2(f"Adding aggregate from aggregate_and_proof={aggregate_and_proof}")
self._add_attestation(aggregate_and_proof.aggregate)

async def _handle_beacon_attestations(self, msg: rpc_pb2.Message) -> None:
async def _handle_beacon_attestation(self, msg: rpc_pb2.Message) -> None:
attestation = ssz.decode(msg.data, sedes=Attestation)

self.logger.debug("Received attestation=%s", attestation)

# Check if attestation has been seen already.
if not self._is_attestation_new(attestation):
return
# Add new attestation to attestation pool.
self.attestation_pool.add(attestation)
self._add_attestation(attestation)

async def _handle_beacon_block(self, msg: rpc_pb2.Message) -> None:
block = ssz.decode(msg.data, BeaconBlock)
@@ -315,6 +309,14 @@ def _is_attestation_new(self, attestation: Attestation) -> bool:
return False
return not self.chain.attestation_exists(attestation.hash_tree_root)

def _add_attestation(self, attestation: Attestation) -> None:
# Check if attestation has been seen already.
if not self._is_attestation_new(attestation):
return

# Add new attestation to attestation pool.
self.attestation_pool.add(attestation)

def _process_received_block(self, block: BaseBeaconBlock) -> None:
# If the block is an orphan, put it to the orphan pool
self.logger.debug(
@@ -396,6 +398,9 @@ def _is_block_root_seen(self, block_root: SigningRoot) -> bool:
def _is_block_seen(self, block: BaseBeaconBlock) -> bool:
return self._is_block_root_seen(block_root=block.signing_root)

#
# Exposed APIs for Validator
#
@to_tuple
def get_ready_attestations(
self,
@@ -419,3 +424,25 @@ def get_ready_attestations(
continue
else:
yield attestation

@to_tuple
def get_aggregatable_attestations(
self,
slot: Slot,
committee_index: Optional[CommitteeIndex] = None
) -> Iterable[Attestation]:
for attestation in self.attestation_pool.get_all():
try:
# Filter by committee_index
if committee_index is not None and committee_index != attestation.data.index:
continue
if slot != attestation.data.slot:
continue

except ValidationError:
continue
else:
yield attestation

def import_attestation(self, attestation: Attestation) -> None:
self.attestation_pool.add(attestation)
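These last two methods are the attestation pool's surface for the Validator service wired up in component.py above: get_aggregatable_attestations feeds _get_aggregates, and import_attestation puts a locally produced attestation into the pool before it is gossiped (it adds unconditionally, unlike _add_attestation, which checks _is_attestation_new first). A small usage sketch, with the surrounding objects assumed to be in scope:

async def publish_and_collect(receive_server, p2p_node, attestation, subnet_id, slot, committee_index):
    # Local attestation path (mirrors Validator.attest): pool first, then gossip
    # to the committee subnet.
    receive_server.import_attestation(attestation)
    await p2p_node.broadcast_attestation_to_subnet(attestation, subnet_id)

    # Aggregation path (mirrors Validator._get_aggregates): everything for this
    # slot/committee that could still be folded into an aggregate.
    return receive_server.get_aggregatable_attestations(slot, committee_index)
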
49 changes: 38 additions & 11 deletions trinity/protocol/bcc_libp2p/topic_validators.py
@@ -108,10 +108,23 @@ def beacon_attestation_validator(msg_forwarder: ID, msg: rpc_pb2.Message) -> bool:

# Fast forward to state in future slot in order to pass
# attestation.data.slot validity check
future_state = state_machine.state_transition.apply_state_transition(
state,
future_slot=Slot(attestation.data.slot + config.MIN_ATTESTATION_INCLUSION_DELAY),
future_slot = max(
Slot(attestation.data.slot + config.MIN_ATTESTATION_INCLUSION_DELAY),
state.slot
)
try:
future_state = state_machine.state_transition.apply_state_transition(
state,
future_slot=future_slot,
)
except ValidationError as error:
logger.error(
bold_red("Failed to fast forward to state at slot=%d, error=%s"),
future_slot,
str(error),
)
return False

try:
validate_attestation(
future_state,
@@ -156,10 +169,13 @@ def validate_aggregate_and_proof(
state,
aggregate_and_proof.aggregate.data,
aggregate_and_proof.aggregate.aggregation_bits,
config
config,
)
if aggregate_and_proof.index not in attesting_indices:
raise ValidationError("The aggregator index is not within the aggregate's committee")
raise ValidationError(
f"The aggregator index ({aggregate_and_proof.index}) is not within"
f" the aggregate's committee {attesting_indices}"
)

if not is_aggregator(
state,
@@ -194,7 +210,7 @@ def beacon_aggregate_and_proof_validator(msg_forwarder: ID, msg: rpc_pb2.Message
config = state_machine.config
state = chain.get_head_state()

attestation = aggregate_and_proof.aggregate_and_proof
attestation = aggregate_and_proof.aggregate

# Check that beacon blocks attested to by the attestation are validated
try:
@@ -211,10 +227,23 @@ def beacon_aggregate_and_proof_validator(msg_forwarder: ID, msg: rpc_pb2.Message

# Fast forward to state in future slot in order to pass
# attestation.data.slot validity check
future_state = state_machine.state_transition.apply_state_transition(
state,
future_slot=Slot(attestation.data.slot + config.MIN_ATTESTATION_INCLUSION_DELAY),
future_slot = max(
Slot(attestation.data.slot + config.MIN_ATTESTATION_INCLUSION_DELAY),
state.slot
)
try:
future_state = state_machine.state_transition.apply_state_transition(
state,
future_slot=future_slot,
)
except ValidationError as error:
logger.error(
bold_red("Failed to fast forward to state at slot=%d, error=%s"),
future_slot,
str(error),
)
return False

try:
validate_aggregate_and_proof(
future_state,
Expand All @@ -229,7 +258,5 @@ def beacon_aggregate_and_proof_validator(msg_forwarder: ID, msg: rpc_pb2.Message
)
return False

logger.debug2(f"aggregate_and_proof={aggregate_and_proof} passed")

return True
return beacon_aggregate_and_proof_validator
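One detail both topic validators above share: the max(...) guard exists because the head state may already be past attestation.data.slot + MIN_ATTESTATION_INCLUSION_DELAY, and apply_state_transition cannot be asked to move a state backwards. A worked example with illustrative numbers (mainnet's MIN_ATTESTATION_INCLUSION_DELAY is 1):

MIN_ATTESTATION_INCLUSION_DELAY = 1  # mainnet config value

attestation_slot = 100
state_slot = 105  # head state is already ahead of the attestation

naive_target = attestation_slot + MIN_ATTESTATION_INCLUSION_DELAY  # 101, behind the state
future_slot = max(naive_target, state_slot)  # 105: never rewind the state
assert future_slot == 105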
