Skip to content

Commit

Permalink
contexts: make sure to abort when a context is canceled
Browse files Browse the repository at this point in the history
Also, buffer single-use channels we may walk away from. This was showing
up (rarely) in a go-ipfs test.


This commit was moved from ipfs/go-bitswap@0cbfff7
  • Loading branch information
Stebalien committed Jan 22, 2019
1 parent 13ba88a commit 8674ca8
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 20 deletions.
15 changes: 12 additions & 3 deletions bitswap/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,18 @@ func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {

// ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
resp := make(chan []peer.ID)
pm.peerMessages <- &getPeersMessage{resp}
return <-resp
resp := make(chan []peer.ID, 1)
select {
case pm.peerMessages <- &getPeersMessage{resp}:
case <-pm.ctx.Done():
return nil
}
select {
case peers := <-resp:
return peers
case <-pm.ctx.Done():
return nil
}
}

// Connected is called to add a new peer to the pool, and send it an initial set
Expand Down
9 changes: 7 additions & 2 deletions bitswap/sessionpeermanager/sessionpeermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (spm *SessionPeerManager) RecordPeerRequests(p []peer.ID, ks []cid.Cid) {
func (spm *SessionPeerManager) GetOptimizedPeers() []peer.ID {
// right now this just returns all peers, but soon we might return peers
// ordered by optimization, or only a subset
resp := make(chan []peer.ID)
resp := make(chan []peer.ID, 1)
select {
case spm.peerMessages <- &peerReqMessage{resp}:
case <-spm.ctx.Done():
Expand All @@ -108,11 +108,16 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
// - share peers between sessions based on interest set
for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
go func(p peer.ID) {
// TODO: Also use context from spm.
err := spm.network.ConnectTo(ctx, p)
if err != nil {
log.Debugf("failed to connect to provider %s: %s", p, err)
}
spm.peerMessages <- &peerFoundMessage{p}
select {
case spm.peerMessages <- &peerFoundMessage{p}:
case <-ctx.Done():
case <-spm.ctx.Done():
}
}(p)
}
}(c)
Expand Down
2 changes: 1 addition & 1 deletion bitswap/sessionrequestsplitter/sessionrequestsplitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func New(ctx context.Context) *SessionRequestSplitter {
// SplitRequest splits a request for the given cids one or more times among the
// given peers.
func (srs *SessionRequestSplitter) SplitRequest(peers []peer.ID, ks []cid.Cid) []*PartialRequest {
resp := make(chan []*PartialRequest)
resp := make(chan []*PartialRequest, 1)

select {
case srs.messages <- &splitRequestMessage{peers, ks, resp}:
Expand Down
60 changes: 48 additions & 12 deletions bitswap/wantmanager/wantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,30 +83,66 @@ func (wm *WantManager) CancelWants(ctx context.Context, ks []cid.Cid, peers []pe

// IsWanted returns whether a CID is currently wanted.
func (wm *WantManager) IsWanted(c cid.Cid) bool {
resp := make(chan bool)
wm.wantMessages <- &isWantedMessage{c, resp}
return <-resp
resp := make(chan bool, 1)
select {
case wm.wantMessages <- &isWantedMessage{c, resp}:
case <-wm.ctx.Done():
return false
}
select {
case wanted := <-resp:
return wanted
case <-wm.ctx.Done():
return false
}
}

// CurrentWants returns the list of current wants.
func (wm *WantManager) CurrentWants() []*wantlist.Entry {
resp := make(chan []*wantlist.Entry)
wm.wantMessages <- &currentWantsMessage{resp}
return <-resp
resp := make(chan []*wantlist.Entry, 1)
select {
case wm.wantMessages <- &currentWantsMessage{resp}:
case <-wm.ctx.Done():
return nil
}
select {
case wantlist := <-resp:
return wantlist
case <-wm.ctx.Done():
return nil
}
}

// CurrentBroadcastWants returns the current list of wants that are broadcasts.
func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry {
resp := make(chan []*wantlist.Entry)
wm.wantMessages <- &currentBroadcastWantsMessage{resp}
return <-resp
resp := make(chan []*wantlist.Entry, 1)
select {
case wm.wantMessages <- &currentBroadcastWantsMessage{resp}:
case <-wm.ctx.Done():
return nil
}
select {
case wl := <-resp:
return wl
case <-wm.ctx.Done():
return nil
}
}

// WantCount returns the total count of wants.
func (wm *WantManager) WantCount() int {
resp := make(chan int)
wm.wantMessages <- &wantCountMessage{resp}
return <-resp
resp := make(chan int, 1)
select {
case wm.wantMessages <- &wantCountMessage{resp}:
case <-wm.ctx.Done():
return 0
}
select {
case count := <-resp:
return count
case <-wm.ctx.Done():
return 0
}
}

// Startup starts processing for the WantManager.
Expand Down
8 changes: 6 additions & 2 deletions bitswap/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,15 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
// TODO: come up with a better strategy for determining when to search
// for new providers for blocks.
i := rand.Intn(len(entries))
bs.findKeys <- &blockRequest{
select {
case bs.findKeys <- &blockRequest{
Cid: entries[i].Cid,
Ctx: ctx,
}:
case <-ctx.Done():
return
}
case <-parent.Done():
case <-ctx.Done():
return
}
}
Expand Down

0 comments on commit 8674ca8

Please sign in to comment.