Skip to content

Commit

Permalink
tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
rauljordan committed May 1, 2020
1 parent 7a40233 commit 9029d63
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 17 deletions.
2 changes: 0 additions & 2 deletions beacon-chain/rpc/validator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ go_library(
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//shared/slotutil:go_default_library",
"//shared/traceutil:go_default_library",
"//shared/trieutil:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
Expand Down Expand Up @@ -96,7 +95,6 @@ go_test(
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/slotutil/testing:go_default_library",
"//shared/testutil:go_default_library",
"//shared/trieutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
Expand Down
35 changes: 20 additions & 15 deletions beacon-chain/rpc/validator/assignments.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import (
"context"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/slotutil"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// GetDuties returns the duties assigned to a list of validators specified
Expand Down Expand Up @@ -47,19 +46,8 @@ func (vs *Server) StreamDuties(req *ethpb.DutiesRequest, stream ethpb.BeaconNode
stateSub := vs.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()

secondsPerEpoch := params.BeaconConfig().SecondsPerSlot * params.BeaconConfig().SlotsPerEpoch
epochTicker := slotutil.GetSlotTicker(vs.GenesisTimeFetcher.GenesisTime(), secondsPerEpoch)
for {
select {
// Ticks every epoch to submit assignments to connected validator clients.
case <-epochTicker.C():
res, err := vs.duties(stream.Context(), req)
if err != nil {
return status.Errorf(codes.Internal, "Could not compute validator duties: %v", err)
}
if err := stream.Send(res); err != nil {
return status.Errorf(codes.Internal, "Could not send response over stream: %v", err)
}
case ev := <-stateChannel:
// If a reorg occurred, we recompute duties for the connected validator clients
// and send another response over the server stream right away.
Expand All @@ -83,6 +71,23 @@ func (vs *Server) StreamDuties(req *ethpb.DutiesRequest, stream ethpb.BeaconNode
return status.Errorf(codes.Internal, "Could not send response over stream: %v", err)
}
}
if ev.Type == statefeed.BlockProcessed {
data, ok := ev.Data.(*statefeed.BlockProcessedData)
if !ok {
return status.Errorf(codes.Internal, "Received incorrect data type over reorg feed: %v", data)
}
// If this is not a new epoch, we do not update duties.
if data != nil && data.Slot%params.BeaconConfig().SlotsPerEpoch != 0 {
continue
}
res, err := vs.duties(stream.Context(), req)
if err != nil {
return status.Errorf(codes.Internal, "Could not compute validator duties: %v", err)
}
if err := stream.Send(res); err != nil {
return status.Errorf(codes.Internal, "Could not send response over stream: %v", err)
}
}
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Stream context canceled")
case <-vs.Ctx.Done():
Expand Down

0 comments on commit 9029d63

Please sign in to comment.