diff --git a/pubsub.go b/pubsub.go index a9ebbf0..933382e 100644 --- a/pubsub.go +++ b/pubsub.go @@ -253,35 +253,6 @@ func (p *PubsubValueStore) rebroadcast(ctx context.Context) { keys := make([]string, 0, len(p.topics)) topics := make([]*topicInfo, 0, len(p.topics)) for k, ti := range p.topics { - keys = append(keys, k) - topics = append(topics, ti) - } - p.mx.Unlock() - if len(topics) > 0 { - for i, k := range keys { - val, err := p.getLocal(k) - if err == nil { - topic := topics[i].topic - select { - case <-p.psPublishChannel(ctx, topic, val): - case <-ctx.Done(): - return - } - } - } - } - case <-ctx.Done(): - return - } - } -} - -func (p *PubsubValueStore) psPublishChannel(ctx context.Context, topic *pubsub.Topic, value []byte) chan error { - done := make(chan error, 1) - go func() { - done <- topic.Publish(ctx, value) - }() - return done } // putLocal tries to put the key-value pair into the local datastore @@ -478,7 +449,7 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo case <-ctx.Done(): return default: - log.Errorf("PubsubPeerJoin: error interacting with new peer", err) + log.Errorf("PubsubPeerJoin: error interacting with new peer: %s", err) } } } @@ -524,15 +495,7 @@ func (p *PubsubValueStore) handleNewMsgs(ctx context.Context, sub *pubsub.Subscr } func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pubsub.TopicEventHandler, key string) ([]byte, error) { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - var pid peer.ID - - for { + for ctx.Err() == nil { peerEvt, err := peerEvtHandler.NextPeerEvent(ctx) if err != nil { if err != context.Canceled { @@ -540,13 +503,18 @@ func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pu } return nil, err } - if peerEvt.Type == pubsub.PeerJoin { - pid = peerEvt.Peer - break + + if peerEvt.Type != pubsub.PeerJoin { + continue } - } - return p.fetch.Fetch(ctx, pid, key) + value, err := p.fetch.Fetch(ctx, peerEvt.Peer, key) + if err == nil { + return value, nil + } + log.Debugf("failed to fetch latest pubsub value for key '%s' from peer '%s': %s", key, peerEvt.Peer, err) + } + return nil, ctx.Err() } func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {