diff --git a/pubsub.go b/pubsub.go index a9ebbf0..ebccfe6 100644 --- a/pubsub.go +++ b/pubsub.go @@ -478,7 +478,7 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo case <-ctx.Done(): return default: - log.Errorf("PubsubPeerJoin: error interacting with new peer", err) + log.Errorf("PubsubPeerJoin: error interacting with new peer: %s", err) } } } @@ -524,15 +524,7 @@ func (p *PubsubValueStore) handleNewMsgs(ctx context.Context, sub *pubsub.Subscr } func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pubsub.TopicEventHandler, key string) ([]byte, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - var pid peer.ID - - for { + for ctx.Err() == nil { peerEvt, err := peerEvtHandler.NextPeerEvent(ctx) if err != nil { if err != context.Canceled { @@ -540,13 +532,19 @@ func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pu } return nil, err } - if peerEvt.Type == pubsub.PeerJoin { - pid = peerEvt.Peer - break + + if peerEvt.Type != pubsub.PeerJoin { + continue } - } - return p.fetch.Fetch(ctx, pid, key) + pid := peerEvt.Peer + value, err := p.fetch.Fetch(ctx, pid, key) + if err == nil { + return value, nil + } + log.Debugf("failed to fetch latest pubsub value for key '%s' from peer '%s': %s", key, pid, err) + } + return nil, ctx.Err() } func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {