Skip to content

Commit

Permalink
fix: ignore bad peers when fetching the latest value
Browse files Browse the repository at this point in the history
They may not support the protocol. Or they may just not give us what we're
looking for. We might as well keep asking till someone answers.
  • Loading branch information
Stebalien committed Feb 5, 2020
1 parent eebeec4 commit 6a6dbc4
Showing 1 changed file with 13 additions and 15 deletions.
28 changes: 13 additions & 15 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo
case <-ctx.Done():
return
default:
log.Errorf("PubsubPeerJoin: error interacting with new peer", err)
log.Errorf("PubsubPeerJoin: error interacting with new peer: %s", err)
}
}
}
Expand Down Expand Up @@ -524,29 +524,27 @@ func (p *PubsubValueStore) handleNewMsgs(ctx context.Context, sub *pubsub.Subscr
}

func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pubsub.TopicEventHandler, key string) ([]byte, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

var pid peer.ID

for {
for ctx.Err() == nil {
peerEvt, err := peerEvtHandler.NextPeerEvent(ctx)
if err != nil {
if err != context.Canceled {
log.Warningf("PubsubNewPeer: subscription error in %s: %s", key, err.Error())
}
return nil, err
}
if peerEvt.Type == pubsub.PeerJoin {
pid = peerEvt.Peer
break

if peerEvt.Type != pubsub.PeerJoin {
continue
}
}

return p.fetch.Fetch(ctx, pid, key)
pid := peerEvt.Peer
value, err := p.fetch.Fetch(ctx, pid, key)
if err == nil {
return value, nil
}
log.Debugf("failed to fetch latest pubsub value for key '%s' from peer '%s': %s", key, pid, err)
}
return nil, ctx.Err()
}

func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
Expand Down

0 comments on commit 6a6dbc4

Please sign in to comment.