Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Stream Duties in Validator Client #5814

Closed
wants to merge 55 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
3813a16
begin stream duties impl on validator client side
rauljordan May 11, 2020
4f67c0f
client-side impl of stream duties
rauljordan May 11, 2020
bc42e86
modify runtime
rauljordan May 11, 2020
400474a
able to build validator client
rauljordan May 11, 2020
508b73c
prevent nil pointer on current epoch assignments
rauljordan May 11, 2020
0c9932e
include the next epoch assignments in the response type
rauljordan May 11, 2020
a88b234
including next epoch assignments
rauljordan May 11, 2020
1565d5c
all tests passing
rauljordan May 11, 2020
2e279a0
all validator client tests passing for stream duties
rauljordan May 11, 2020
1f85b7a
beacon chain tests passing
rauljordan May 11, 2020
9e33516
Merge branch 'master' into stream-impl
rauljordan May 11, 2020
6ae4d2b
fix format
rauljordan May 11, 2020
855b787
Merge branch 'stream-impl' of github.com:prysmaticlabs/prysm into str…
rauljordan May 11, 2020
3b11678
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
26296e8
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
aee306f
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
5699429
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
3077b24
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
b090e72
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
2c25e6c
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
ec519d1
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
f30a9dd
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
f8903d8
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
62918fd
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
d53a612
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 12, 2020
9101a99
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 13, 2020
9185c32
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 13, 2020
54dd211
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 13, 2020
aaa0d9a
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 13, 2020
c8c1d26
add in check
rauljordan May 13, 2020
5503d0d
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 13, 2020
2132287
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 13, 2020
d614fb3
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
b12d928
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
0417349
add duties map
rauljordan May 14, 2020
e6b66fd
Merge branch 'master' into stream-impl
rauljordan May 14, 2020
911a813
break up into smaller methods, use map for duties instead
rauljordan May 14, 2020
8d3c030
Merge branch 'stream-impl' of github.com:prysmaticlabs/prysm into str…
rauljordan May 14, 2020
a05d311
get duties before chainstarts
rauljordan May 14, 2020
c475d4c
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
bbf0318
add back log
rauljordan May 14, 2020
815b347
Merge branch 'stream-impl' of github.com:prysmaticlabs/prysm into str…
rauljordan May 14, 2020
6961dd1
rem log
rauljordan May 14, 2020
31f952a
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
42b79e4
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
36554ea
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
dc387d7
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
0f6daa5
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
e6a1fec
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
0bd8d39
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
26a0a01
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
bc151fe
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 14, 2020
7cb87c8
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 15, 2020
5c48d63
resolve conf
rauljordan May 15, 2020
ffbf47a
Merge refs/heads/master into stream-impl
prylabs-bulldozer[bot] May 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon-chain/rpc/validator/assignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (vs *Server) StreamDuties(req *ethpb.DutiesRequest, stream ethpb.BeaconNode
// the number epochs since the genesis time, otherwise 0 by default.
genesisTime := vs.GenesisTimeFetcher.GenesisTime()
var currentEpoch uint64
if genesisTime.Before(roughtime.Now()) {
if roughtime.Now().After(genesisTime) {
currentEpoch = slotutil.EpochsSinceGenesis(vs.GenesisTimeFetcher.GenesisTime())
}
res, err := vs.duties(stream.Context(), req, currentEpoch)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/kevinms/leakybucket-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/sirupsen/logrus"
)

var _ = shared.Service(&Service{})
Expand Down
2 changes: 2 additions & 0 deletions validator/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"validator.go",
"validator_aggregate.go",
"validator_attest.go",
"validator_duties.go",
"validator_log.go",
"validator_metrics.go",
"validator_propose.go",
Expand Down Expand Up @@ -63,6 +64,7 @@ go_test(
"service_test.go",
"validator_aggregate_test.go",
"validator_attest_test.go",
"validator_duties_test.go",
"validator_propose_test.go",
"validator_test.go",
],
Expand Down
12 changes: 5 additions & 7 deletions validator/client/fake_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type fakeValidator struct {
WaitForSyncedCalled bool
NextSlotCalled bool
CanonicalHeadSlotCalled bool
UpdateDutiesCalled bool
StreamDutiesCalled bool
UpdateProtectionsCalled bool
RoleAtCalled bool
AttestToBlockHeadCalled bool
Expand All @@ -26,10 +26,9 @@ type fakeValidator struct {
ProposeBlockArg1 uint64
AttestToBlockHeadArg1 uint64
RoleAtArg1 uint64
UpdateDutiesArg1 uint64
NextSlotRet <-chan uint64
PublicKey string
UpdateDutiesRet error
StreamDutiesRet error
RolesAtRet []validatorRole
}

Expand Down Expand Up @@ -72,10 +71,9 @@ func (fv *fakeValidator) NextSlot() <-chan uint64 {
return fv.NextSlotRet
}

func (fv *fakeValidator) UpdateDuties(_ context.Context, slot uint64) error {
fv.UpdateDutiesCalled = true
fv.UpdateDutiesArg1 = slot
return fv.UpdateDutiesRet
func (fv *fakeValidator) StreamDuties(_ context.Context) error {
fv.StreamDutiesCalled = true
return fv.StreamDutiesRet
}

func (fv *fakeValidator) UpdateProtections(_ context.Context, slot uint64) error {
Expand Down
30 changes: 14 additions & 16 deletions validator/client/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Validator interface {
NextSlot() <-chan uint64
SlotDeadline(slot uint64) time.Time
LogValidatorGainsAndLosses(ctx context.Context, slot uint64) error
UpdateDuties(ctx context.Context, slot uint64) error
StreamDuties(ctx context.Context) error
UpdateProtections(ctx context.Context, slot uint64) error
RolesAt(ctx context.Context, slot uint64) (map[[48]byte][]validatorRole, error) // validator pubKey -> roles
SubmitAttestation(ctx context.Context, slot uint64, pubKey [48]byte)
Expand All @@ -43,8 +43,8 @@ type Validator interface {
// Order of operations:
// 1 - Initialize validator data
// 2 - Wait for validator activation
// 3 - Wait for the next slot start
// 4 - Update assignments
// 3 - Listen to a server-side stream of validator duties
// 4 - Wait for the next slot start
// 5 - Determine role at current slot
// 6 - Perform assigned role, if any
func run(ctx context.Context, v Validator) {
Expand All @@ -64,13 +64,20 @@ func run(ctx context.Context, v Validator) {
if err := v.WaitForActivation(ctx); err != nil {
log.Fatalf("Could not wait for validator activation: %v", err)
}
log.Info("Got activation")
headSlot, err := v.CanonicalHeadSlot(ctx)
if err != nil {
log.Fatalf("Could not get current canonical head slot: %v", err)
}
if err := v.UpdateDuties(ctx, headSlot); err != nil {
handleAssignmentError(err, headSlot)
}
log.Info("Got canonical head slot")
// We listen to a server-side stream of validator duties in the
// background of the validator client.
go func() {
log.Info("Requesting duties stream")
if err := v.StreamDuties(ctx); err != nil {
handleAssignmentError(err, headSlot)
}
}()
for {
ctx, span := trace.StartSpan(ctx, "validator.processSlot")

Expand All @@ -81,23 +88,14 @@ func run(ctx context.Context, v Validator) {
case slot := <-v.NextSlot():
span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))
deadline := v.SlotDeadline(slot)
slotCtx, cancel := context.WithDeadline(ctx, deadline)
slotCtx, _ := context.WithDeadline(ctx, deadline)
// Report this validator client's rewards and penalties throughout its lifecycle.
log := log.WithField("slot", slot)
log.WithField("deadline", deadline).Debug("Set deadline for proposals and attestations")
if err := v.LogValidatorGainsAndLosses(slotCtx, slot); err != nil {
log.WithError(err).Error("Could not report validator's rewards/penalties")
}

// Keep trying to update assignments if they are nil or if we are past an
// epoch transition in the beacon node's state.
if err := v.UpdateDuties(ctx, slot); err != nil {
handleAssignmentError(err, slot)
cancel()
span.End()
continue
}

if featureconfig.Get().ProtectAttester {
if err := v.UpdateProtections(ctx, slot); err != nil {
log.WithError(err).Error("Could not update validator protection")
Expand Down
46 changes: 0 additions & 46 deletions validator/client/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package client

import (
"context"
"errors"
"testing"
"time"

"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)

func cancelledContext() context.Context {
Expand Down Expand Up @@ -54,49 +51,6 @@ func TestCancelledContext_WaitsForActivation(t *testing.T) {
}
}

func TestUpdateDuties_NextSlot(t *testing.T) {
v := &fakeValidator{}
ctx, cancel := context.WithCancel(context.Background())

slot := uint64(55)
ticker := make(chan uint64)
v.NextSlotRet = ticker
go func() {
ticker <- slot

cancel()
}()

run(ctx, v)

if !v.UpdateDutiesCalled {
t.Fatalf("Expected UpdateAssignments(%d) to be called", slot)
}
if v.UpdateDutiesArg1 != slot {
t.Errorf("UpdateAssignments was called with wrong argument. Want=%d, got=%d", slot, v.UpdateDutiesArg1)
}
}

func TestUpdateDuties_HandlesError(t *testing.T) {
hook := logTest.NewGlobal()
v := &fakeValidator{}
ctx, cancel := context.WithCancel(context.Background())

slot := uint64(55)
ticker := make(chan uint64)
v.NextSlotRet = ticker
go func() {
ticker <- slot

cancel()
}()
v.UpdateDutiesRet = errors.New("bad")

run(ctx, v)

testutil.AssertLogsContain(t, hook, "Failed to update assignments")
}

func TestRoleAt_NextSlot(t *testing.T) {
v := &fakeValidator{}
ctx, cancel := context.WithCancel(context.Background())
Expand Down
5 changes: 2 additions & 3 deletions validator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (v *ValidatorService) Start() {
}
v.validator = &validator{
db: valDB,
dutiesByEpoch: make(map[uint64][]*ethpb.DutiesResponse_Duty, 2), // 2 epochs worth of duties.
validatorClient: ethpb.NewBeaconNodeValidatorClient(v.conn),
beaconClient: ethpb.NewBeaconChainClient(v.conn),
node: ethpb.NewNodeClient(v.conn),
Expand All @@ -200,9 +201,7 @@ func (v *ValidatorService) Stop() error {
return nil
}

// Status ...
//
// WIP - not done.
// Status of the validator service's health.
func (v *ValidatorService) Status() error {
if v.conn == nil {
return errors.New("no connection to beacon RPC")
Expand Down
Loading