diff --git a/beacon-chain/rpc/validator/server.go b/beacon-chain/rpc/validator/server.go index 23eefa144a9f..82c186e91023 100644 --- a/beacon-chain/rpc/validator/server.go +++ b/beacon-chain/rpc/validator/server.go @@ -171,7 +171,7 @@ func (vs *Server) WaitForSynced(req *ptypes.Empty, stream ethpb.BeaconNodeValida select { case event := <-stateChannel: if event.Type == statefeed.Synced { - data := event.Data.(*statefeed.ChainStartedData) + data := event.Data.(*statefeed.SyncedData) log.WithField("starttime", data.StartTime).Debug("Received sync completed event") log.Info("Sending genesis time notification to connected validator clients") res := ðpb.SyncedResponse{ diff --git a/beacon-chain/rpc/validator/server_test.go b/beacon-chain/rpc/validator/server_test.go index 9e429da7f4a2..ce4e2aa04244 100644 --- a/beacon-chain/rpc/validator/server_test.go +++ b/beacon-chain/rpc/validator/server_test.go @@ -21,6 +21,7 @@ import ( internal "github.com/prysmaticlabs/prysm/beacon-chain/rpc/testing" mockRPC "github.com/prysmaticlabs/prysm/beacon-chain/rpc/testing" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" + mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing" pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/event" @@ -275,6 +276,7 @@ func TestWaitForSynced_AlreadyStarted(t *testing.T) { BeaconDB: db, StateNotifier: chainService.StateNotifier(), HeadFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, } ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -325,8 +327,8 @@ func TestWaitForSynced_NotStartedThenLogFired(t *testing.T) { // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). for sent := 0; sent == 0; { sent = Server.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.ChainStarted, - Data: &statefeed.ChainStartedData{ + Type: statefeed.Synced, + Data: &statefeed.SyncedData{ StartTime: time.Unix(0, 0), }, }) diff --git a/shared/mock/BUILD.bazel b/shared/mock/BUILD.bazel index 9ef03639c0c9..731bcb6f8b00 100644 --- a/shared/mock/BUILD.bazel +++ b/shared/mock/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "@com_github_gogo_protobuf//types:go_default_library", "@com_github_golang_mock//gomock:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", - "@io_bazel_rules_go//proto/wkt:empty_go_proto", "@org_golang_google_grpc//:go_default_library", "@org_golang_google_grpc//metadata:go_default_library", ], diff --git a/shared/mock/beacon_chain_service_mock.go b/shared/mock/beacon_chain_service_mock.go index 4c285db8c805..1f99d60c7acf 100644 --- a/shared/mock/beacon_chain_service_mock.go +++ b/shared/mock/beacon_chain_service_mock.go @@ -8,7 +8,7 @@ import ( context "context" reflect "reflect" - empty "github.com/gogo/protobuf/types" + ptypes "github.com/gogo/protobuf/types" gomock "github.com/golang/mock/gomock" v1alpha1 "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" grpc "google.golang.org/grpc" @@ -59,7 +59,7 @@ func (mr *MockBeaconChainClientMockRecorder) AttestationPool(arg0, arg1 interfac } // GetBeaconConfig mocks base method -func (m *MockBeaconChainClient) GetBeaconConfig(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (*v1alpha1.BeaconConfig, error) { +func (m *MockBeaconChainClient) GetBeaconConfig(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (*v1alpha1.BeaconConfig, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -79,7 +79,7 @@ func (mr *MockBeaconChainClientMockRecorder) GetBeaconConfig(arg0, arg1 interfac } // GetChainHead mocks base method -func (m *MockBeaconChainClient) GetChainHead(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (*v1alpha1.ChainHead, error) { +func (m *MockBeaconChainClient) GetChainHead(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (*v1alpha1.ChainHead, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -179,7 +179,7 @@ func (mr *MockBeaconChainClientMockRecorder) GetValidatorPerformance(arg0, arg1 } // GetValidatorQueue mocks base method -func (m *MockBeaconChainClient) GetValidatorQueue(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (*v1alpha1.ValidatorQueue, error) { +func (m *MockBeaconChainClient) GetValidatorQueue(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (*v1alpha1.ValidatorQueue, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -339,7 +339,7 @@ func (mr *MockBeaconChainClientMockRecorder) ListValidators(arg0, arg1 interface } // StreamAttestations mocks base method -func (m *MockBeaconChainClient) StreamAttestations(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (v1alpha1.BeaconChain_StreamAttestationsClient, error) { +func (m *MockBeaconChainClient) StreamAttestations(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (v1alpha1.BeaconChain_StreamAttestationsClient, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -359,7 +359,7 @@ func (mr *MockBeaconChainClientMockRecorder) StreamAttestations(arg0, arg1 inter } // StreamBlocks mocks base method -func (m *MockBeaconChainClient) StreamBlocks(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (v1alpha1.BeaconChain_StreamBlocksClient, error) { +func (m *MockBeaconChainClient) StreamBlocks(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (v1alpha1.BeaconChain_StreamBlocksClient, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -379,7 +379,7 @@ func (mr *MockBeaconChainClientMockRecorder) StreamBlocks(arg0, arg1 interface{} } // StreamChainHead mocks base method -func (m *MockBeaconChainClient) StreamChainHead(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (v1alpha1.BeaconChain_StreamChainHeadClient, error) { +func (m *MockBeaconChainClient) StreamChainHead(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (v1alpha1.BeaconChain_StreamChainHeadClient, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -399,7 +399,7 @@ func (mr *MockBeaconChainClientMockRecorder) StreamChainHead(arg0, arg1 interfac } // StreamIndexedAttestations mocks base method -func (m *MockBeaconChainClient) StreamIndexedAttestations(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (v1alpha1.BeaconChain_StreamIndexedAttestationsClient, error) { +func (m *MockBeaconChainClient) StreamIndexedAttestations(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (v1alpha1.BeaconChain_StreamIndexedAttestationsClient, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { diff --git a/shared/mock/node_service_mock.go b/shared/mock/node_service_mock.go index 23fa752728f4..15917a98a1ac 100644 --- a/shared/mock/node_service_mock.go +++ b/shared/mock/node_service_mock.go @@ -6,12 +6,12 @@ package mock import ( context "context" + reflect "reflect" + ptypes "github.com/gogo/protobuf/types" gomock "github.com/golang/mock/gomock" - empty "github.com/golang/protobuf/ptypes/empty" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" grpc "google.golang.org/grpc" - reflect "reflect" ) // MockNodeClient is a mock of NodeClient interface. @@ -58,7 +58,7 @@ func (mr *MockNodeClientMockRecorder) GetGenesis(arg0, arg1 interface{}, arg2 .. } // GetSyncStatus mocks base method. -func (m *MockNodeClient) GetSyncStatus(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (*eth.SyncStatus, error) { +func (m *MockNodeClient) GetSyncStatus(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (*eth.SyncStatus, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -78,7 +78,7 @@ func (mr *MockNodeClientMockRecorder) GetSyncStatus(arg0, arg1 interface{}, arg2 } // GetVersion mocks base method. -func (m *MockNodeClient) GetVersion(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (*eth.Version, error) { +func (m *MockNodeClient) GetVersion(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (*eth.Version, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -98,7 +98,7 @@ func (mr *MockNodeClientMockRecorder) GetVersion(arg0, arg1 interface{}, arg2 .. } // ListImplementedServices mocks base method. -func (m *MockNodeClient) ListImplementedServices(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (*eth.ImplementedServices, error) { +func (m *MockNodeClient) ListImplementedServices(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (*eth.ImplementedServices, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { @@ -118,7 +118,7 @@ func (mr *MockNodeClientMockRecorder) ListImplementedServices(arg0, arg1 interfa } // ListPeers mocks base method. -func (m *MockNodeClient) ListPeers(arg0 context.Context, arg1 *empty.Empty, arg2 ...grpc.CallOption) (*eth.Peers, error) { +func (m *MockNodeClient) ListPeers(arg0 context.Context, arg1 *ptypes.Empty, arg2 ...grpc.CallOption) (*eth.Peers, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { diff --git a/validator/client/validator.go b/validator/client/validator.go index 3d28cd5269f5..bdf2b87c24af 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -101,12 +101,12 @@ func (v *validator) WaitForSynced(ctx context.Context) error { return nil } v.genesisTime = syncedRes.GenesisTime + // Once we know the beacon chain is ready log is received, we update the genesis time of the validator client + // and begin a slot ticker used to track the current slot the beacon node is in. + v.ticker = slotutil.GetSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot) + log.WithField("genesisTime", time.Unix(int64(v.genesisTime), 0)).Info("Beacon chain started") break } - // Once we know the beacon chain is ready log is received, we update the genesis time of the validator client - // and begin a slot ticker used to track the current slot the beacon node is in. - v.ticker = slotutil.GetSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot) - log.WithField("genesisTime", time.Unix(int64(v.genesisTime), 0)).Info("Beacon chain started") return nil } @@ -151,7 +151,6 @@ func (v *validator) WaitForActivation(ctx context.Context) error { for _, pubKey := range validatorActivatedRecords { log.WithField("pubKey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:]))).Info("Validator activated") } - v.ticker = slotutil.GetSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot) return nil } diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 8b9fb291e270..600c9644ba91 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -157,7 +157,7 @@ func TestWaitForSynced_ReceiveErrorFromStream(t *testing.T) { errors.New("fails"), ) err := v.WaitForSynced(context.Background()) - want := "could not receive ChainStart from stream" + want := "could not receive Synced from stream" if !strings.Contains(err.Error(), want) { t.Errorf("Expected %v, received %v", want, err) } @@ -377,18 +377,29 @@ func TestWaitSync_ContextCanceled(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() n := internal.NewMockNodeClient(ctrl) + valClient := internal.NewMockBeaconNodeValidatorClient(ctrl) v := validator{ - node: n, + node: n, + validatorClient: valClient, } ctx, cancel := context.WithCancel(context.Background()) cancel() - n.EXPECT().GetSyncStatus( + genesis := uint64(time.Unix(0, 0).Unix()) + clientStream := internal.NewMockBeaconNodeValidator_WaitForSyncedClient(ctrl) + clientStream.EXPECT().Recv().Return( + ðpb.SyncedResponse{ + Synced: true, + GenesisTime: genesis, + }, + nil, + ) + valClient.EXPECT().WaitForSynced( gomock.Any(), gomock.Any(), - ).Return(ðpb.SyncStatus{Syncing: true}, nil) + ).Return(clientStream, nil) err := v.WaitForSynced(ctx) want := cancelledCtx @@ -401,40 +412,26 @@ func TestWaitSync_NotSyncing(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() n := internal.NewMockNodeClient(ctrl) + valClient := internal.NewMockBeaconNodeValidatorClient(ctrl) v := validator{ - node: n, + node: n, + validatorClient: valClient, } - n.EXPECT().GetSyncStatus( - gomock.Any(), - gomock.Any(), - ).Return(ðpb.SyncStatus{Syncing: false}, nil) - - err := v.WaitForSynced(context.Background()) - if err != nil { - t.Fatal(err) - } -} - -func TestWaitSync_Syncing(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - n := internal.NewMockNodeClient(ctrl) - - v := validator{ - node: n, - } - - n.EXPECT().GetSyncStatus( - gomock.Any(), - gomock.Any(), - ).Return(ðpb.SyncStatus{Syncing: true}, nil) - - n.EXPECT().GetSyncStatus( + genesis := uint64(time.Unix(1, 0).Unix()) + clientStream := internal.NewMockBeaconNodeValidator_WaitForSyncedClient(ctrl) + clientStream.EXPECT().Recv().Return( + ðpb.SyncedResponse{ + Synced: true, + GenesisTime: genesis, + }, + nil, + ) + valClient.EXPECT().WaitForSynced( gomock.Any(), gomock.Any(), - ).Return(ðpb.SyncStatus{Syncing: false}, nil) + ).Return(clientStream, nil) err := v.WaitForSynced(context.Background()) if err != nil {