Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect slasher streams on beacon node shutdown #5376

Merged
merged 10 commits into from
Apr 10, 2020
Merged
2 changes: 2 additions & 0 deletions slasher/beaconclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ go_library(
"@io_opencensus_go//plugin/ocgrpc:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//credentials:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
],
)

Expand Down
85 changes: 81 additions & 4 deletions slasher/beaconclient/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ package beaconclient

import (
"context"
"errors"
"io"
"time"

ptypes "github.com/gogo/protobuf/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// reconnectPeriod is the interval at which we try to restart our
// streams when the beacon node does not respond.
var reconnectPeriod = 5 * time.Second

// receiveBlocks starts a gRPC client stream listener to obtain
// blocks from the beacon node. Upon receiving a block, the service
// broadcasts it to a feed for other services in slasher to subscribe to.
Expand All @@ -34,8 +41,23 @@ func (bs *Service) receiveBlocks(ctx context.Context) {
return
}
if err != nil {
log.WithError(err).Error("Could not receive block from beacon node")
break
if e, ok := status.FromError(err); ok {
switch e.Code() {
case codes.Canceled:
stream, err = bs.restartBlockStream(ctx)
if err != nil {
log.WithError(err).Error("Could not restart stream")
return
}
break
default:
log.WithError(err).Errorf("Could not receive block from beacon node. rpc status: %v", e.Code())
return
}
} else {
log.WithError(err).Error("Could not receive blocks from beacon node")
return
}
}
if res == nil {
continue
Expand Down Expand Up @@ -71,8 +93,23 @@ func (bs *Service) receiveAttestations(ctx context.Context) {
return
}
if err != nil {
log.WithError(err).Error("Could not receive attestations from beacon node")
continue
if e, ok := status.FromError(err); ok {
switch e.Code() {
case codes.Canceled:
stream, err = bs.restartIndexedAttestationStream(ctx)
if err != nil {
log.WithError(err).Error("Could not restart stream")
return
}
break
default:
log.WithError(err).Errorf("Could not receive attestations from beacon node. rpc status: %v", e.Code())
return
}
} else {
log.WithError(err).Error("Could not receive attestations from beacon node")
return
}
}
if res == nil {
continue
Expand Down Expand Up @@ -120,3 +157,43 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {
}
}
}

// restartIndexedAttestationStream repeatedly tries to re-open the indexed
// attestation stream on the beacon client, making one attempt every
// reconnectPeriod until an attempt succeeds or ctx is done.
//
// It returns the newly opened stream on success. If ctx is done before a
// stream could be opened, it returns a nil stream and a non-nil error.
func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.BeaconChain_StreamIndexedAttestationsClient, error) {
	ticker := time.NewTicker(reconnectPeriod)
	// Stop the ticker on every return path so its resources are released.
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			log.Info("Context closed, attempting to restart attestation stream")
			stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
			if err != nil {
				// Keep retrying, but leave a trace of why this attempt failed.
				log.WithError(err).Debug("Could not restart attestation stream, retrying")
				continue
			}
			log.Info("Attestation stream restarted...")
			return stream, nil
		case <-ctx.Done():
			log.Debug("Context closed, exiting reconnect routine")
			return nil, errors.New("context closed, no longer attempting to restart stream")
		}
	}
}

// restartBlockStream repeatedly tries to re-open the block stream on the
// beacon client, making one attempt every reconnectPeriod until an attempt
// succeeds or ctx is done.
//
// It returns the newly opened stream on success. If ctx is done before a
// stream could be opened, it returns a nil stream and a non-nil error.
func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_StreamBlocksClient, error) {
	ticker := time.NewTicker(reconnectPeriod)
	// Stop the ticker on every return path so its resources are released.
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			log.Info("Context closed, attempting to restart block stream")
			stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{})
			if err != nil {
				// Keep retrying, but leave a trace of why this attempt failed.
				log.WithError(err).Debug("Could not restart block stream, retrying")
				continue
			}
			log.Info("Block stream restarted...")
			return stream, nil
		case <-ctx.Done():
			log.Debug("Context closed, exiting reconnect routine")
			return nil, errors.New("context closed, no longer attempting to restart stream")
		}
	}
}