diff --git a/slasher/beaconclient/BUILD.bazel b/slasher/beaconclient/BUILD.bazel index 499ea9adfd3d..6759712b4d2d 100644 --- a/slasher/beaconclient/BUILD.bazel +++ b/slasher/beaconclient/BUILD.bazel @@ -30,7 +30,9 @@ go_library( "@io_opencensus_go//plugin/ocgrpc:go_default_library", "@io_opencensus_go//trace:go_default_library", "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//codes:go_default_library", "@org_golang_google_grpc//credentials:go_default_library", + "@org_golang_google_grpc//status:go_default_library", ], ) diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index 4b7220fb8bfd..6cbd0c5fafdf 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -2,6 +2,7 @@ package beaconclient import ( "context" + "errors" "io" "time" @@ -9,8 +10,14 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/sirupsen/logrus" "go.opencensus.io/trace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +// reconnectPeriod is the frequency that we try to restart our +// streams when the beacon chain is node does not respond. +var reconnectPeriod = 5 * time.Second + // receiveBlocks starts a gRPC client stream listener to obtain // blocks from the beacon node. Upon receiving a block, the service // broadcasts it to a feed for other services in slasher to subscribe to. @@ -34,8 +41,23 @@ func (bs *Service) receiveBlocks(ctx context.Context) { return } if err != nil { - log.WithError(err).Error("Could not receive block from beacon node") - break + if e, ok := status.FromError(err); ok { + switch e.Code() { + case codes.Canceled: + stream, err = bs.restartBlockStream(ctx) + if err != nil { + log.WithError(err).Error("Could not restart stream") + return + } + break + default: + log.WithError(err).Errorf("Could not receive block from beacon node. rpc status: %v", e.Code()) + return + } + } else { + log.WithError(err).Error("Could not receive blocks from beacon node") + return + } } if res == nil { continue @@ -71,8 +93,23 @@ func (bs *Service) receiveAttestations(ctx context.Context) { return } if err != nil { - log.WithError(err).Error("Could not receive attestations from beacon node") - continue + if e, ok := status.FromError(err); ok { + switch e.Code() { + case codes.Canceled: + stream, err = bs.restartIndexedAttestationStream(ctx) + if err != nil { + log.WithError(err).Error("Could not restart stream") + return + } + break + default: + log.WithError(err).Errorf("Could not receive attestations from beacon node. rpc status: %v", e.Code()) + return + } + } else { + log.WithError(err).Error("Could not receive attestations from beacon node") + return + } } if res == nil { continue @@ -120,3 +157,43 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) { } } } + +func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.BeaconChain_StreamIndexedAttestationsClient, error) { + ticker := time.NewTicker(reconnectPeriod) + for { + select { + case <-ticker.C: + log.Info("Context closed, attempting to restart attestation stream") + stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{}) + if err != nil { + continue + } + log.Info("Attestation stream restarted...") + return stream, nil + case <-ctx.Done(): + log.Debug("Context closed, exiting reconnect routine") + return nil, errors.New("context closed, no longer attempting to restart stream") + } + } + +} + +func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_StreamBlocksClient, error) { + ticker := time.NewTicker(reconnectPeriod) + for { + select { + case <-ticker.C: + log.Info("Context closed, attempting to restart block stream") + stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{}) + if err != nil { + continue + } + log.Info("Block stream restarted...") + return stream, nil + case <-ctx.Done(): + log.Debug("Context closed, exiting reconnect routine") + return nil, errors.New("context closed, no longer attempting to restart stream") + } + } + +}