Skip to content

Commit

Permalink
add participation
Browse files Browse the repository at this point in the history
  • Loading branch information
kanishkatn committed Jun 23, 2023
1 parent 38176e1 commit 9dc5a20
Show file tree
Hide file tree
Showing 2 changed files with 262 additions and 16 deletions.
262 changes: 262 additions & 0 deletions dot/parachain/dispute/participation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package dispute

import (
"fmt"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/parachain"
"github.com/ChainSafe/gossamer/pkg/scale"
"sync"
"sync/atomic"
)

// CandidateComparator comparator for ordering of disputes for candidate.
type CandidateComparator struct {
relayParentBlockNumber *uint32
candidateHash common.Hash
}

// NewCandidateComparator creates a new CandidateComparator.
func NewCandidateComparator(relayParentBlockNumber *uint32, receipt parachain.CandidateReceipt) (CandidateComparator, error) {
encodedReceipt, err := scale.Marshal(receipt)
if err != nil {
return CandidateComparator{}, fmt.Errorf("encode candidate receipt: %w", err)
}

candidateHash, err := common.Blake2bHash(encodedReceipt)
if err != nil {
return CandidateComparator{}, fmt.Errorf("hash candidate receipt: %w", err)
}

return CandidateComparator{
relayParentBlockNumber: relayParentBlockNumber,
candidateHash: candidateHash,
}, nil
}

// ParticipationRequest a dispute participation request
type ParticipationRequest struct {
candidateHash common.Hash
candidateReceipt parachain.CandidateReceipt
session parachain.SessionIndex
//TODO: requestTimer for metrics
}

// ParticipationStatement is a statement as result of the validation process.
type ParticipationStatement struct {
Session parachain.SessionIndex
CandidateHash common.Hash
CandidateReceipt parachain.CandidateReceipt
Outcome ParticipationOutcome
}

// ValidOutcome is the outcome when the candidate is valid.
type ValidOutcome struct{}

// Index returns the index of the type.
func (ValidOutcome) Index() uint {
return 0
}

// InvalidOutcome is the outcome when the candidate is invalid.
type InvalidOutcome struct{}

// Index returns the index of the type.
func (InvalidOutcome) Index() uint {
return 1
}

// UnAvailableOutcome is the outcome when the candidate is unavailable.
type UnAvailableOutcome struct{}

// Index returns the index of the type.
func (UnAvailableOutcome) Index() uint {
return 2
}

// ErrorOutcome is the outcome when the candidate has an error.
type ErrorOutcome struct{}

// Index returns the index of the type.
func (ErrorOutcome) Index() uint {
return 3
}

// ParticipationOutcome is the outcome of the validation process.
type ParticipationOutcome scale.VaryingDataType

// Set will set a VaryingDataTypeValue using the underlying VaryingDataType
func (po *ParticipationOutcome) Set(val scale.VaryingDataTypeValue) (err error) {
vdt := scale.VaryingDataType(*po)
err = vdt.Set(val)
if err != nil {
return fmt.Errorf("setting value to varying data type: %w", err)
}
*po = ParticipationOutcome(vdt)
return nil
}

// Value returns the value from the underlying VaryingDataType
func (po *ParticipationOutcome) Value() (scale.VaryingDataTypeValue, error) {
vdt := scale.VaryingDataType(*po)
return vdt.Value()
}

// Validity returns true if the outcome is valid.
func (po *ParticipationOutcome) Validity() (bool, error) {
val, err := po.Value()
if err != nil {
return false, fmt.Errorf("getting value from varying data type: %w", err)
}

_, ok := val.(ValidOutcome)
if !ok {
return true, nil
}

return false, nil
}

// Participation keeps track of the disputes we need to participate in.
type Participation interface {
// Queue a dispute for the node to participate in
Queue(request ParticipationRequest, priority ParticipationPriority) error

// Clear clears a participation request. This is called when we have the dispute result.
Clear(candidateHash common.Hash) error

// TODO: implement this once we have the message bus
// ProcessActiveLeavesUpdate processes an active leaves update
// ProcessActiveLeavesUpdate(update parachain.ActiveLeavesUpdate) error

// BumpPriority bumps the priority for the given receipts
BumpPriority(receipts []parachain.CandidateReceipt) error
}

type block struct {
Number uint32
Hash common.Hash
}

type participation struct {
runningParticipation sync.Map
workers atomic.Int32

queue Queue
//TODO: worker sender channel
recentBlock *block

//TODO: metrics
}

const MaxParallelParticipation = 3

func (p *participation) Queue(request ParticipationRequest, priority ParticipationPriority) error {
if _, ok := p.runningParticipation.Load(request.candidateHash); ok {
return nil
}

// if we already have a recent block, participate right away
if p.recentBlock != nil && p.numberOfWorkers() < MaxParallelParticipation {
if err := p.forkParticipation(&request, p.recentBlock.Hash); err != nil {
return fmt.Errorf("fork participation: %w", err)
}

return nil
}

// TODO: it looks like the relayParentBlockNumber needs to be fetched from the overseer which is not available at the moment.
comparator, err := NewCandidateComparator(nil, request.candidateReceipt)
if err != nil {
return fmt.Errorf("create candidate comparator: %w", err)
}

if err := p.queue.Queue(comparator, &request, priority); err != nil {
return fmt.Errorf("queue participation request: %w", err)
}

return nil
}

func (p *participation) Clear(candidateHash common.Hash) error {
p.runningParticipation.Delete(candidateHash)
p.workers.Add(-1)

if p.recentBlock == nil {
panic("we never ever reset recentBlock to nil and we already received a result, so it must have been set before. qed")
}

if err := p.dequeueUntilCapacity(p.recentBlock.Hash); err != nil {
return fmt.Errorf("dequeue until capacity: %w", err)
}

return nil
}

func (p *participation) BumpPriority(receipts []parachain.CandidateReceipt) error {
for _, receipt := range receipts {
// TODO: it looks like the relayParentBlockNumber needs to be fetched from the overseer which is not available at the moment.
comparator, err := NewCandidateComparator(nil, receipt)
if err != nil {
log.Errorf("failed to create candidate comparator: %s", err)
continue
}

if err := p.queue.PrioritiseIfPresent(comparator); err != nil {
log.Errorf("failed to prioritise candidate comparator: %s", err)
}
}

return nil
}

func (p *participation) numberOfWorkers() int {
return int(p.workers.Load())
}

func (p *participation) dequeueUntilCapacity(recentHead common.Hash) error {
for {
if p.numberOfWorkers() >= MaxParallelParticipation {
break
}

if request := p.queue.Dequeue(); request != nil {
if err := p.forkParticipation(request.request, recentHead); err != nil {
log.Errorf("failed to fork participation: %s", err)
}
} else {
break
}
}

return nil
}

func (p *participation) forkParticipation(request *ParticipationRequest, recentHead common.Hash) error {
_, ok := p.runningParticipation.LoadOrStore(request.candidateHash, nil)
if ok {
return nil
}

p.workers.Add(1)
go func() {
if err := p.participate(); err != nil {
log.Errorf("failed to participate in dispute: %s", err)
}
}()

return nil
}

// TODO: implement this once we have the network protocols
func (p *participation) participate() error {
//TODO implement me
panic("implement me")
}

func NewParticipation() Participation {
return &participation{
runningParticipation: sync.Map{},
queue: NewQueue(),
}
}
16 changes: 0 additions & 16 deletions dot/parachain/dispute/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package dispute

import (
"bytes"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/parachain"
"sync"

"github.com/google/btree"
Expand All @@ -22,20 +20,6 @@ import (
// However, I will not be implementing it right away. It will be picked up as a single task for the
// entire dispute module https://github.com/ChainSafe/gossamer/issues/3313.

// CandidateComparator comparator for ordering of disputes for candidate.
type CandidateComparator struct {
relayParentBlockNumber *uint32
candidateHash common.Hash
}

// ParticipationRequest a dispute participation request
type ParticipationRequest struct {
candidateHash common.Hash
candidateReceipt parachain.CandidateReceipt
session parachain.SessionIndex
//TODO: requestTimer for metrics
}

// participationItem implements btree.Item
type participationItem struct {
comparator CandidateComparator
Expand Down

0 comments on commit 9dc5a20

Please sign in to comment.