From 6a6dbc433d398d686cce00775ff4349f75e29bd1 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 4 Feb 2020 16:35:30 -0800 Subject: [PATCH] fix: ignore bad peers when fetching the latest value They may not support the protocol. Or they may just not give us what we're looking for. We might as well keep asking till someone answers. --- pubsub.go | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/pubsub.go b/pubsub.go index a9ebbf0..ebccfe6 100644 --- a/pubsub.go +++ b/pubsub.go @@ -478,7 +478,7 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo case <-ctx.Done(): return default: - log.Errorf("PubsubPeerJoin: error interacting with new peer", err) + log.Errorf("PubsubPeerJoin: error interacting with new peer: %s", err) } } } @@ -524,15 +524,7 @@ func (p *PubsubValueStore) handleNewMsgs(ctx context.Context, sub *pubsub.Subscr } func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pubsub.TopicEventHandler, key string) ([]byte, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - var pid peer.ID - - for { + for ctx.Err() == nil { peerEvt, err := peerEvtHandler.NextPeerEvent(ctx) if err != nil { if err != context.Canceled { @@ -540,13 +532,19 @@ func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pu } return nil, err } - if peerEvt.Type == pubsub.PeerJoin { - pid = peerEvt.Peer - break + + if peerEvt.Type != pubsub.PeerJoin { + continue } - } - return p.fetch.Fetch(ctx, pid, key) + pid := peerEvt.Peer + value, err := p.fetch.Fetch(ctx, pid, key) + if err == nil { + return value, nil + } + log.Debugf("failed to fetch latest pubsub value for key '%s' from peer '%s': %s", key, pid, err) + } + return nil, ctx.Err() } func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {