Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Add validate_aggregate_and_proof
Browse files Browse the repository at this point in the history
  • Loading branch information
hwwhww committed Nov 21, 2019
1 parent 68c1e92 commit c68b5be
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 82 deletions.
26 changes: 25 additions & 1 deletion eth2/beacon/tools/builder/aggregator.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from typing import Sequence

from eth_typing import BLSSignature
from eth_utils import ValidationError, encode_hex
from ssz import get_hash_tree_root, uint64

from eth2._utils.bls import bls
from eth2._utils.hash import hash_eth2
from eth2.beacon.committee_helpers import get_beacon_committee
from eth2.beacon.helpers import compute_epoch_at_slot, get_domain
from eth2.beacon.signature_domain import SignatureDomain
from eth2.beacon.types.aggregate_and_proof import AggregateAndProof
from eth2.beacon.types.attestations import Attestation
from eth2.beacon.types.states import BeaconState
from eth2.beacon.typing import CommitteeIndex, Slot
Expand Down Expand Up @@ -65,7 +67,7 @@ def get_aggregate_from_valid_committee_attestations(
"""
Return the aggregate attestation.
The given attestations have the same `data: AttestationData` and are valid.
The given attestations SHOULD have the same `data: AttestationData` and are valid.
"""
signatures = [attestation.signature for attestation in attestations]
aggregate_signature = bls.aggregate_signatures(signatures)
Expand All @@ -80,3 +82,25 @@ def get_aggregate_from_valid_committee_attestations(
aggregation_bits=aggregation_bits,
signature=aggregate_signature,
)


def validate_aggregator_proof(
state: BeaconState, aggregate_and_proof: AggregateAndProof, config: CommitteeConfig
) -> None:
slot = aggregate_and_proof.aggregate.data.slot
pubkey = state.validators[aggregate_and_proof.index]
domain = get_domain(
state,
SignatureDomain.DOMAIN_BEACON_ATTESTER,
config.SLOTS_PER_EPOCH,
message_epoch=compute_epoch_at_slot(slot, config.SLOTS_PER_EPOCH),
)
message_hash = get_hash_tree_root(slot, sedes=uint64)
if not bls.verify(message_hash, pubkey, aggregate_and_proof.proof, domain):
raise ValidationError(
"Incorrect selection proof:"
f" aggregate_and_proof={aggregate_and_proof}"
f" pubkey={encode_hex(pubkey)}"
f" domain={domain}"
f" message_hash={message_hash}"
)
150 changes: 78 additions & 72 deletions trinity/components/eth2/beacon/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
SerenityBeaconBlock,
)
from eth2.beacon.tools.builder.aggregator import (
get_aggregate_attestation,
get_aggregate_from_valid_committee_attestations,
is_aggregator,
slot_signature,
)
Expand Down Expand Up @@ -250,78 +250,11 @@ async def handle_second_tick(self, slot: Slot) -> None:
await self.attest(slot)

async def handle_third_tick(self, slot: Slot) -> None:
# TODO: Add aggregator strategy
pass

async def _create_and_broadcast_aggregate_and_proof(self, slot: Slot) -> None:
# Check the aggregators selection
# aggregate_and_proofs: Tuple[AggregateAndProof] = ()
state_machine = self.chain.get_state_machine()
state = self.chain.get_head_state()
config = state_machine.config
epoch = compute_epoch_at_slot(slot, self.slots_per_epoch)
validator_assignments = {
validator_index: self._get_this_epoch_assignment(
validator_index,
epoch,
)
for validator_index in self.validator_privkeys
}
attesting_validators = self._get_attesting_validator_and_committee_index(
validator_assignments,
slot,
epoch,
)
if len(attesting_validators) == 0:
return

# Sort the attesting validators by committee index
sorted_attesting_validators = sorted(
attesting_validators,
key=itemgetter(1),
)
# Group the attesting validators by committee index
attesting_validators_groups = groupby(
sorted_attesting_validators,
key=itemgetter(1),
)
for committee_index, group in attesting_validators_groups:
# Get the validator_index -> privkey map of the attesting validators
attesting_validator_privkeys = {
attesting_data[0]: self.validator_privkeys[attesting_data[0]]
for attesting_data in group
}
selected_proofs: Dict[ValidatorIndex, BLSSignature] = {}
for validator_index, privkey in attesting_validator_privkeys.items():
signature = slot_signature(
state, slot, privkey, CommitteeConfig(config),
)
if is_aggregator(state, slot, committee_index, signature, CommitteeConfig(config)):
selected_proofs[validator_index] = signature

for validator_index, selected_proof in selected_proofs.items():
aggregates = self._get_aggregates(slot, committee_index)
for aggregate in aggregates:
aggregate_and_proof = AggregateAndProof(
index=validator_index,
aggregate=aggregate,
selection_proof=selected_proof,
)
# aggregate_and_proofs += aggregate_and_proof
await self.p2p_node.broadcast_beacon_aggregate_and_proof(aggregate_and_proof)


@to_tuple
def _get_aggregates(self, slot: Slot, committee_index: CommitteeIndex) -> Iterable[Attestation]:
attestations = self.get_ready_attestations(slot, committee_index)
attestation_groups = groupby(
attestations,
key=lambda attestation: attestation.data,
)
for _, group in attestation_groups:
yield get_aggregate_attestation(tuple(group))

await self._create_and_broadcast_aggregate_and_proof(slot)

#
# Proposing block
#
async def propose_block(self,
proposer_index: ValidatorIndex,
slot: Slot,
Expand Down Expand Up @@ -386,6 +319,9 @@ def skip_block(self,
self.chain.chaindb.persist_state(post_state)
return post_state

#
# Attesting attestation
#
def _is_attesting(self,
validator_index: ValidatorIndex,
assignment: CommitteeAssignment,
Expand Down Expand Up @@ -481,3 +417,73 @@ async def attest(self, slot: Slot) -> Tuple[Attestation, ...]:
# TODO: Aggregate attestations

return attestations

#
# Aggregating attestation
#
async def _create_and_broadcast_aggregate_and_proof(self, slot: Slot) -> None:
# Check the aggregators selection
# aggregate_and_proofs: Tuple[AggregateAndProof] = ()
state_machine = self.chain.get_state_machine()
state = self.chain.get_head_state()
config = state_machine.config
epoch = compute_epoch_at_slot(slot, self.slots_per_epoch)
validator_assignments = {
validator_index: self._get_this_epoch_assignment(
validator_index,
epoch,
)
for validator_index in self.validator_privkeys
}
attesting_validators = self._get_attesting_validator_and_committee_index(
validator_assignments,
slot,
epoch,
)
if len(attesting_validators) == 0:
return

# Sort the attesting validators by committee index
sorted_attesting_validators = sorted(
attesting_validators,
key=itemgetter(1),
)
# Group the attesting validators by committee index
attesting_validators_groups = groupby(
sorted_attesting_validators,
key=itemgetter(1),
)
for committee_index, group in attesting_validators_groups:
# Get the validator_index -> privkey map of the attesting validators
attesting_validator_privkeys = {
attesting_data[0]: self.validator_privkeys[attesting_data[0]]
for attesting_data in group
}
selected_proofs: Dict[ValidatorIndex, BLSSignature] = {}
for validator_index, privkey in attesting_validator_privkeys.items():
signature = slot_signature(
state, slot, privkey, CommitteeConfig(config),
)
if is_aggregator(state, slot, committee_index, signature, CommitteeConfig(config)):
selected_proofs[validator_index] = signature

for validator_index, selected_proof in selected_proofs.items():
aggregates = self._get_aggregates(slot, committee_index)
for aggregate in aggregates:
aggregate_and_proof = AggregateAndProof(
index=validator_index,
aggregate=aggregate,
selection_proof=selected_proof,
)
self.logger.debug2(f"broadcasting aggregate_and_proof={aggregate_and_proof}")
await self.p2p_node.broadcast_beacon_aggregate_and_proof(aggregate_and_proof)

@to_tuple
def _get_aggregates(self, slot: Slot, committee_index: CommitteeIndex) -> Iterable[Attestation]:
attestations = self.get_ready_attestations(slot, committee_index)
attestation_groups = groupby(
attestations,
key=lambda attestation: attestation.data,
)
for _, group in attestation_groups:
yield get_aggregate_from_valid_committee_attestations(tuple(group))
17 changes: 15 additions & 2 deletions trinity/protocol/bcc_libp2p/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
BeaconBlocksByRootRequest,
)
from .topic_validators import (
get_beacon_aggregate_and_proof_validator,
get_beacon_attestation_validator,
get_beacon_block_validator,
)
Expand Down Expand Up @@ -332,6 +333,7 @@ async def start(self) -> None:
# Global channel
await self.pubsub.subscribe(PUBSUB_TOPIC_BEACON_BLOCK)
await self.pubsub.subscribe(PUBSUB_TOPIC_BEACON_ATTESTATION)
await self.pubsub.subscribe(PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF)
# Attestation subnets
for subnet_id in self.subnets:
topic = PUBSUB_TOPIC_COMMITTEE_BEACON_ATTESTATION.substitute(subnet_id=str(subnet_id))
Expand Down Expand Up @@ -361,6 +363,12 @@ def _setup_topic_validators(self) -> None:
False,
)

self.pubsub.set_topic_validator(
PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF,
get_beacon_aggregate_and_proof_validator(self.chain),
False,
)

async def dial_peer(self, ip: str, port: int, peer_id: ID) -> None:
"""
Dial the peer ``peer_id`` through the IPv4 protocol
Expand Down Expand Up @@ -452,8 +460,13 @@ async def broadcast_attestation_to_subnet(
ssz.encode(attestation)
)

async def broadcast_beacon_aggregate_and_proof(self, aggregate_and_proof: AggregateAndProof) -> None:
await self._broadcast_data(PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF, ssz.encode(aggregate_and_proof))
async def broadcast_beacon_aggregate_and_proof(
self, aggregate_and_proof: AggregateAndProof
) -> None:
await self._broadcast_data(
PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF,
ssz.encode(aggregate_and_proof),
)

async def _broadcast_data(self, topic: str, data: bytes) -> None:
await self.pubsub.publish(topic, data)
Expand Down
16 changes: 12 additions & 4 deletions trinity/protocol/bcc_libp2p/servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
BaseBeaconChain,
)
from eth2.beacon.operations.pool import OperationPool
from eth2.beacon.types.aggregate_and_proof import AggregateAndProof
from eth2.beacon.types.attestations import (
Attestation,
)
Expand Down Expand Up @@ -225,7 +226,6 @@ async def _handle_committee_beacon_attestation_loop(self, subnet_id: SubnetId) -
)

async def _handle_aggregate_and_proof_loop(self) -> None:
await self.sleep(0.5)
await self._handle_message(
PUBSUB_TOPIC_BEACON_AGGREGATE_AND_PROOF,
self._handle_beacon_aggregate_and_proof,
Expand Down Expand Up @@ -280,9 +280,17 @@ async def _handle_committee_beacon_attestations(self, msg: rpc_pb2.Message) -> N
await self.sleep(0.5)

async def _handle_beacon_aggregate_and_proof(self, msg: rpc_pb2.Message) -> None:
# TODO
while True:
await self.sleep(0.5)
aggregate_and_proof = ssz.decode(msg.data, sedes=AggregateAndProof)

self.logger.debug("Received aggregate_and_proof=%s", aggregate_and_proof)

# Check if aggregate_and_proof has been seen already.
if not self._is_attestation_new(aggregate_and_proof.aggregate):
return

# Add new attestation to attestation pool.
self.attestation_pool.add(aggregate_and_proof.aggregate)
self.logger.debug2(f"Adding aggregate from aggregate_and_proof={aggregate_and_proof}")

async def _handle_beacon_attestations(self, msg: rpc_pb2.Message) -> None:
attestation = ssz.decode(msg.data, sedes=Attestation)
Expand Down
Loading

0 comments on commit c68b5be

Please sign in to comment.