diff --git a/dot/parachain/dispute/participation.go b/dot/parachain/dispute/participation.go new file mode 100644 index 0000000000..407ce20fc1 --- /dev/null +++ b/dot/parachain/dispute/participation.go @@ -0,0 +1,262 @@ +package dispute + +import ( + "fmt" + "github.com/ChainSafe/gossamer/internal/log" + "github.com/ChainSafe/gossamer/lib/common" + "github.com/ChainSafe/gossamer/lib/parachain" + "github.com/ChainSafe/gossamer/pkg/scale" + "sync" + "sync/atomic" +) + +// CandidateComparator comparator for ordering of disputes for candidate. +type CandidateComparator struct { + relayParentBlockNumber *uint32 + candidateHash common.Hash +} + +// NewCandidateComparator creates a new CandidateComparator. +func NewCandidateComparator(relayParentBlockNumber *uint32, receipt parachain.CandidateReceipt) (CandidateComparator, error) { + encodedReceipt, err := scale.Marshal(receipt) + if err != nil { + return CandidateComparator{}, fmt.Errorf("encode candidate receipt: %w", err) + } + + candidateHash, err := common.Blake2bHash(encodedReceipt) + if err != nil { + return CandidateComparator{}, fmt.Errorf("hash candidate receipt: %w", err) + } + + return CandidateComparator{ + relayParentBlockNumber: relayParentBlockNumber, + candidateHash: candidateHash, + }, nil +} + +// ParticipationRequest a dispute participation request +type ParticipationRequest struct { + candidateHash common.Hash + candidateReceipt parachain.CandidateReceipt + session parachain.SessionIndex + //TODO: requestTimer for metrics +} + +// ParticipationStatement is a statement as result of the validation process. +type ParticipationStatement struct { + Session parachain.SessionIndex + CandidateHash common.Hash + CandidateReceipt parachain.CandidateReceipt + Outcome ParticipationOutcome +} + +// ValidOutcome is the outcome when the candidate is valid. +type ValidOutcome struct{} + +// Index returns the index of the type. +func (ValidOutcome) Index() uint { + return 0 +} + +// InvalidOutcome is the outcome when the candidate is invalid. +type InvalidOutcome struct{} + +// Index returns the index of the type. +func (InvalidOutcome) Index() uint { + return 1 +} + +// UnAvailableOutcome is the outcome when the candidate is unavailable. +type UnAvailableOutcome struct{} + +// Index returns the index of the type. +func (UnAvailableOutcome) Index() uint { + return 2 +} + +// ErrorOutcome is the outcome when the candidate has an error. +type ErrorOutcome struct{} + +// Index returns the index of the type. +func (ErrorOutcome) Index() uint { + return 3 +} + +// ParticipationOutcome is the outcome of the validation process. +type ParticipationOutcome scale.VaryingDataType + +// Set will set a VaryingDataTypeValue using the underlying VaryingDataType +func (po *ParticipationOutcome) Set(val scale.VaryingDataTypeValue) (err error) { + vdt := scale.VaryingDataType(*po) + err = vdt.Set(val) + if err != nil { + return fmt.Errorf("setting value to varying data type: %w", err) + } + *po = ParticipationOutcome(vdt) + return nil +} + +// Value returns the value from the underlying VaryingDataType +func (po *ParticipationOutcome) Value() (scale.VaryingDataTypeValue, error) { + vdt := scale.VaryingDataType(*po) + return vdt.Value() +} + +// Validity returns true if the outcome is valid. +func (po *ParticipationOutcome) Validity() (bool, error) { + val, err := po.Value() + if err != nil { + return false, fmt.Errorf("getting value from varying data type: %w", err) + } + + _, ok := val.(ValidOutcome) + if !ok { + return true, nil + } + + return false, nil +} + +// Participation keeps track of the disputes we need to participate in. +type Participation interface { + // Queue a dispute for the node to participate in + Queue(request ParticipationRequest, priority ParticipationPriority) error + + // Clear clears a participation request. This is called when we have the dispute result. + Clear(candidateHash common.Hash) error + + // TODO: implement this once we have the message bus + // ProcessActiveLeavesUpdate processes an active leaves update + // ProcessActiveLeavesUpdate(update parachain.ActiveLeavesUpdate) error + + // BumpPriority bumps the priority for the given receipts + BumpPriority(receipts []parachain.CandidateReceipt) error +} + +type block struct { + Number uint32 + Hash common.Hash +} + +type participation struct { + runningParticipation sync.Map + workers atomic.Int32 + + queue Queue + //TODO: worker sender channel + recentBlock *block + + //TODO: metrics +} + +const MaxParallelParticipation = 3 + +func (p *participation) Queue(request ParticipationRequest, priority ParticipationPriority) error { + if _, ok := p.runningParticipation.Load(request.candidateHash); ok { + return nil + } + + // if we already have a recent block, participate right away + if p.recentBlock != nil && p.numberOfWorkers() < MaxParallelParticipation { + if err := p.forkParticipation(&request, p.recentBlock.Hash); err != nil { + return fmt.Errorf("fork participation: %w", err) + } + + return nil + } + + // TODO: it looks like the relayParentBlockNumber needs to be fetched from the overseer which is not available at the moment. + comparator, err := NewCandidateComparator(nil, request.candidateReceipt) + if err != nil { + return fmt.Errorf("create candidate comparator: %w", err) + } + + if err := p.queue.Queue(comparator, &request, priority); err != nil { + return fmt.Errorf("queue participation request: %w", err) + } + + return nil +} + +func (p *participation) Clear(candidateHash common.Hash) error { + p.runningParticipation.Delete(candidateHash) + p.workers.Add(-1) + + if p.recentBlock == nil { + panic("we never ever reset recentBlock to nil and we already received a result, so it must have been set before. qed") + } + + if err := p.dequeueUntilCapacity(p.recentBlock.Hash); err != nil { + return fmt.Errorf("dequeue until capacity: %w", err) + } + + return nil +} + +func (p *participation) BumpPriority(receipts []parachain.CandidateReceipt) error { + for _, receipt := range receipts { + // TODO: it looks like the relayParentBlockNumber needs to be fetched from the overseer which is not available at the moment. + comparator, err := NewCandidateComparator(nil, receipt) + if err != nil { + log.Errorf("failed to create candidate comparator: %s", err) + continue + } + + if err := p.queue.PrioritiseIfPresent(comparator); err != nil { + log.Errorf("failed to prioritise candidate comparator: %s", err) + } + } + + return nil +} + +func (p *participation) numberOfWorkers() int { + return int(p.workers.Load()) +} + +func (p *participation) dequeueUntilCapacity(recentHead common.Hash) error { + for { + if p.numberOfWorkers() >= MaxParallelParticipation { + break + } + + if request := p.queue.Dequeue(); request != nil { + if err := p.forkParticipation(request.request, recentHead); err != nil { + log.Errorf("failed to fork participation: %s", err) + } + } else { + break + } + } + + return nil +} + +func (p *participation) forkParticipation(request *ParticipationRequest, recentHead common.Hash) error { + _, ok := p.runningParticipation.LoadOrStore(request.candidateHash, nil) + if ok { + return nil + } + + p.workers.Add(1) + go func() { + if err := p.participate(); err != nil { + log.Errorf("failed to participate in dispute: %s", err) + } + }() + + return nil +} + +// TODO: implement this once we have the network protocols +func (p *participation) participate() error { + //TODO implement me + panic("implement me") +} + +func NewParticipation() Participation { + return &participation{ + runningParticipation: sync.Map{}, + queue: NewQueue(), + } +} diff --git a/dot/parachain/dispute/queues.go b/dot/parachain/dispute/queues.go index fdab029fbf..fd86470893 100644 --- a/dot/parachain/dispute/queues.go +++ b/dot/parachain/dispute/queues.go @@ -2,8 +2,6 @@ package dispute import ( "bytes" - "github.com/ChainSafe/gossamer/lib/common" - "github.com/ChainSafe/gossamer/lib/parachain" "sync" "github.com/google/btree" @@ -22,20 +20,6 @@ import ( // However, I will not be implementing it right away. It will be picked up as a single task for the // entire dispute module https://github.com/ChainSafe/gossamer/issues/3313. -// CandidateComparator comparator for ordering of disputes for candidate. -type CandidateComparator struct { - relayParentBlockNumber *uint32 - candidateHash common.Hash -} - -// ParticipationRequest a dispute participation request -type ParticipationRequest struct { - candidateHash common.Hash - candidateReceipt parachain.CandidateReceipt - session parachain.SessionIndex - //TODO: requestTimer for metrics -} - // participationItem implements btree.Item type participationItem struct { comparator CandidateComparator