Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#56 from ipfs/feat/connect-provider…
Browse files Browse the repository at this point in the history
…s-in-sessions

fix(sessions): explicitly connect found peers

This commit was moved from ipfs/go-bitswap@fa9aec8
  • Loading branch information
hannahhoward authored Jan 11, 2019
2 parents b272024 + f5a3826 commit 13ba88a
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 3 deletions.
41 changes: 41 additions & 0 deletions bitswap/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,47 @@ func TestSessionSplitFetch(t *testing.T) {
}
}

func TestFetchNotConnected(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

bssession.SetProviderSearchDelay(10 * time.Millisecond)
vnet := getVirtualNetwork()
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()

other := sesgen.Next()

blks := bgen.Blocks(10)
for _, block := range blks {
if err := other.Exchange.HasBlock(block); err != nil {
t.Fatal(err)
}
}

var cids []cid.Cid
for _, blk := range blks {
cids = append(cids, blk.Cid())
}

thisNode := sesgen.Next()
ses := thisNode.Exchange.NewSession(ctx).(*bssession.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

ch, err := ses.GetBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}

var got []blocks.Block
for b := range ch {
got = append(got, b)
}
if err := assertBlockLists(got, blks); err != nil {
t.Fatal(err)
}
}
func TestInterestCacheOverflow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
7 changes: 6 additions & 1 deletion bitswap/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,12 @@ func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
}
}

const provSearchDelay = time.Second * 10
var provSearchDelay = time.Second

// SetProviderSearchDelay overwrites the global provider search delay
func SetProviderSearchDelay(newProvSearchDelay time.Duration) {
provSearchDelay = newProvSearchDelay
}

// Session run loop -- everything function below here should not be called
// of this loop
Expand Down
13 changes: 12 additions & 1 deletion bitswap/sessionpeermanager/sessionpeermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ import (
"fmt"
"math/rand"

logging "github.com/ipfs/go-log"

cid "github.com/ipfs/go-cid"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("bitswap")

const (
maxOptimizedPeers = 32
reservePeers = 2
Expand All @@ -18,6 +22,7 @@ const (
// PeerNetwork is an interface for finding providers and managing connections
type PeerNetwork interface {
ConnectionManager() ifconnmgr.ConnManager
ConnectTo(context.Context, peer.ID) error
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.ID
}

Expand Down Expand Up @@ -102,7 +107,13 @@ func (spm *SessionPeerManager) FindMorePeers(ctx context.Context, c cid.Cid) {
// - ensure two 'findprovs' calls for the same block don't run concurrently
// - share peers between sessions based on interest set
for p := range spm.network.FindProvidersAsync(ctx, k, 10) {
spm.peerMessages <- &peerFoundMessage{p}
go func(p peer.ID) {
err := spm.network.ConnectTo(ctx, p)
if err != nil {
log.Debugf("failed to connect to provider %s: %s", p, err)
}
spm.peerMessages <- &peerFoundMessage{p}
}(p)
}
}(c)
}
Expand Down
6 changes: 5 additions & 1 deletion bitswap/sessionpeermanager/sessionpeermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package sessionpeermanager

import (
"context"
"sync"
"math/rand"
"sync"
"testing"
"time"

Expand All @@ -24,6 +24,10 @@ func (fpn *fakePeerNetwork) ConnectionManager() ifconnmgr.ConnManager {
return fpn.connManager
}

func (fpn *fakePeerNetwork) ConnectTo(context.Context, peer.ID) error {
return nil
}

func (fpn *fakePeerNetwork) FindProvidersAsync(ctx context.Context, c cid.Cid, num int) <-chan peer.ID {
peerCh := make(chan peer.ID)
go func() {
Expand Down

0 comments on commit 13ba88a

Please sign in to comment.