Skip to content

Commit

Permalink
fix: ignore bad peers when fetching the latest value
Browse files Browse the repository at this point in the history
They may not support the protocol. Or they may just not give us what we're
looking for. We might as well keep asking till someone answers.
  • Loading branch information
Stebalien committed Feb 5, 2020
1 parent eebeec4 commit 1a38051
Showing 1 changed file with 12 additions and 44 deletions.
56 changes: 12 additions & 44 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,35 +253,6 @@ func (p *PubsubValueStore) rebroadcast(ctx context.Context) {
keys := make([]string, 0, len(p.topics))
topics := make([]*topicInfo, 0, len(p.topics))
for k, ti := range p.topics {
keys = append(keys, k)
topics = append(topics, ti)
}
p.mx.Unlock()
if len(topics) > 0 {
for i, k := range keys {
val, err := p.getLocal(k)
if err == nil {
topic := topics[i].topic
select {
case <-p.psPublishChannel(ctx, topic, val):
case <-ctx.Done():
return
}
}
}
}
case <-ctx.Done():
return
}
}
}

func (p *PubsubValueStore) psPublishChannel(ctx context.Context, topic *pubsub.Topic, value []byte) chan error {
done := make(chan error, 1)
go func() {
done <- topic.Publish(ctx, value)
}()
return done
}

// putLocal tries to put the key-value pair into the local datastore
Expand Down Expand Up @@ -478,7 +449,7 @@ func (p *PubsubValueStore) handleSubscription(ctx context.Context, ti *topicInfo
case <-ctx.Done():
return
default:
log.Errorf("PubsubPeerJoin: error interacting with new peer", err)
log.Errorf("PubsubPeerJoin: error interacting with new peer: %s", err)
}
}
}
Expand Down Expand Up @@ -524,29 +495,26 @@ func (p *PubsubValueStore) handleNewMsgs(ctx context.Context, sub *pubsub.Subscr
}

func (p *PubsubValueStore) handleNewPeer(ctx context.Context, peerEvtHandler *pubsub.TopicEventHandler, key string) ([]byte, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

var pid peer.ID

for {
for ctx.Err() == nil {
peerEvt, err := peerEvtHandler.NextPeerEvent(ctx)
if err != nil {
if err != context.Canceled {
log.Warningf("PubsubNewPeer: subscription error in %s: %s", key, err.Error())
}
return nil, err
}
if peerEvt.Type == pubsub.PeerJoin {
pid = peerEvt.Peer
break

if peerEvt.Type != pubsub.PeerJoin {
continue
}
}

return p.fetch.Fetch(ctx, pid, key)
value, err := p.fetch.Fetch(ctx, peerEvt.Peer, key)
if err == nil {
return value, nil
}
log.Debugf("failed to fetch latest pubsub value for key '%s' from peer '%s': %s", key, peerEvt.Peer, err)
}
return nil, ctx.Err()
}

func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
Expand Down

0 comments on commit 1a38051

Please sign in to comment.