Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Topic Validator Deregistering Bug #5683

Merged
merged 2 commits into from
Apr 29, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (r *Service) subscribeWithBase(base proto.Message, topic string, validator
msg, err := sub.Next(r.ctx)
if err != nil {
// This should only happen when the context is cancelled or subscription is cancelled.
log.WithError(err).Error("Subscription next failed")
log.WithError(err).Warn("Subscription next failed")
return
}

Expand Down Expand Up @@ -241,7 +241,7 @@ func (r *Service) subscribeDynamicWithSubnets(
// Update desired topic indices for aggregator
wantedSubs := r.aggregatorCommitteeIndices(currentSlot)
// Resize as appropriate.
r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat)
r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)

for _, idx := range wantedSubs {
if _, exists := subscriptions[idx]; !exists {
Expand Down Expand Up @@ -311,7 +311,7 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i

// revalidate that our currently connected subnets are valid.
func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subscription,
wantedSubs []uint64, topicFormat string) {
wantedSubs []uint64, topicFormat string, digest [4]byte) {
for k, v := range subscriptions {
var wanted bool
for _, idx := range wantedSubs {
Expand All @@ -322,7 +322,8 @@ func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc
}
if !wanted && v != nil {
v.Cancel()
if err := r.p2p.PubSub().UnregisterTopicValidator(fmt.Sprintf(topicFormat, k)); err != nil {
fullTopic := fmt.Sprintf(topicFormat, digest, k) + r.p2p.Encoding().ProtocolSuffix()
if err := r.p2p.PubSub().UnregisterTopicValidator(fullTopic); err != nil {
log.WithError(err).Error("Failed to unregister topic validator")
}
delete(subscriptions, k)
Expand Down