Skip to content

Commit

Permalink
Merge pull request #107 from libp2p/feat/better-dialsync
Browse files Browse the repository at this point in the history
Improve swarm dial sync code
  • Loading branch information
whyrusleeping authored Sep 16, 2016
2 parents d3f70d1 + 126e150 commit 4ae3510
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 127 deletions.
92 changes: 92 additions & 0 deletions p2p/net/swarm/dial_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package swarm

import (
"context"
"sync"

peer "github.com/ipfs/go-libp2p-peer"
)

type DialFunc func(context.Context, peer.ID) (*Conn, error)

func NewDialSync(dfn DialFunc) *DialSync {
return &DialSync{
dials: make(map[peer.ID]*activeDial),
dialFunc: dfn,
}
}

type DialSync struct {
dials map[peer.ID]*activeDial
dialsLk sync.Mutex
dialFunc DialFunc
}

type activeDial struct {
id peer.ID
refCnt int
refCntLk sync.Mutex
cancel func()

err error
conn *Conn
waitch chan struct{}

ds *DialSync
}

func (dr *activeDial) wait(ctx context.Context) (*Conn, error) {
defer dr.decref()
select {
case <-dr.waitch:
return dr.conn, dr.err
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (ad *activeDial) incref() {
ad.refCntLk.Lock()
defer ad.refCntLk.Unlock()
ad.refCnt++
}

func (ad *activeDial) decref() {
ad.refCntLk.Lock()
defer ad.refCntLk.Unlock()
ad.refCnt--
if ad.refCnt <= 0 {
ad.cancel()
ad.ds.dialsLk.Lock()
delete(ad.ds.dials, ad.id)
ad.ds.dialsLk.Unlock()
}
}

func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) {
ds.dialsLk.Lock()

actd, ok := ds.dials[p]
if !ok {
ctx, cancel := context.WithCancel(context.Background())
actd = &activeDial{
id: p,
cancel: cancel,
waitch: make(chan struct{}),
ds: ds,
}
ds.dials[p] = actd

go func(ctx context.Context, p peer.ID, ad *activeDial) {
ad.conn, ad.err = ds.dialFunc(ctx, p)
close(ad.waitch)
ad.cancel()
ad.waitch = nil // to ensure nobody tries reusing this
}(ctx, p, actd)
}

actd.incref()
ds.dialsLk.Unlock()

return actd.wait(ctx)
}
203 changes: 203 additions & 0 deletions p2p/net/swarm/dial_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package swarm

import (
"context"
"fmt"
"sync"
"testing"
"time"

peer "github.com/ipfs/go-libp2p-peer"
)

func getMockDialFunc() (DialFunc, func(), context.Context, <-chan struct{}) {
dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care
dialctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
f := func(ctx context.Context, p peer.ID) (*Conn, error) {
dfcalls <- struct{}{}
defer cancel()
select {
case <-ch:
return new(Conn), nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

o := new(sync.Once)

return f, func() { o.Do(func() { close(ch) }) }, dialctx, dfcalls
}

func TestBasicDialSync(t *testing.T) {
df, done, _, callsch := getMockDialFunc()

dsync := NewDialSync(df)

p := peer.ID("testpeer")

ctx := context.Background()

finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
t.Error(err)
}
finished <- struct{}{}
}()

go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
t.Error(err)
}
finished <- struct{}{}
}()

// short sleep just to make sure we've moved around in the scheduler
time.Sleep(time.Millisecond * 20)
done()

<-finished
<-finished

if len(callsch) > 1 {
t.Fatal("should only have called dial func once!")
}
}

func TestDialSyncCancel(t *testing.T) {
df, done, _, dcall := getMockDialFunc()

dsync := NewDialSync(df)

p := peer.ID("testpeer")

ctx1, cancel1 := context.WithCancel(context.Background())

finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()

// make sure the above makes it through the wait code first
select {
case <-dcall:
case <-time.After(time.Second):
t.Fatal("timed out waiting for dial to start")
}

// Add a second dialwait in so two actors are waiting on the same dial
go func() {
_, err := dsync.DialLock(context.Background(), p)
if err != nil {
t.Error(err)
}
finished <- struct{}{}
}()

time.Sleep(time.Millisecond * 20)

// cancel the first dialwait, it should not affect the second at all
cancel1()
select {
case <-finished:
case <-time.After(time.Second):
t.Fatal("timed out waiting for wait to exit")
}

// short sleep just to make sure we've moved around in the scheduler
time.Sleep(time.Millisecond * 20)
done()

<-finished
}

func TestDialSyncAllCancel(t *testing.T) {
df, done, dctx, _ := getMockDialFunc()

dsync := NewDialSync(df)

p := peer.ID("testpeer")

ctx1, cancel1 := context.WithCancel(context.Background())

finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()

// Add a second dialwait in so two actors are waiting on the same dial
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()

cancel1()
for i := 0; i < 2; i++ {
select {
case <-finished:
case <-time.After(time.Second):
t.Fatal("timed out waiting for wait to exit")
}
}

// the dial should have exited now
select {
case <-dctx.Done():
case <-time.After(time.Second):
t.Fatal("timed out waiting for dial to return")
}

// should be able to successfully dial that peer again
done()
_, err := dsync.DialLock(context.Background(), p)
if err != nil {
t.Fatal(err)
}
}

func TestFailFirst(t *testing.T) {
var count int
f := func(ctx context.Context, p peer.ID) (*Conn, error) {
if count > 0 {
return new(Conn), nil
}
count++
return nil, fmt.Errorf("gophers ate the modem")
}

ds := NewDialSync(f)

p := peer.ID("testing")

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

_, err := ds.DialLock(ctx, p)
if err == nil {
t.Fatal("expected gophers to have eaten the modem")
}

c, err := ds.DialLock(ctx, p)
if err != nil {
t.Fatal(err)
}

if c == nil {
t.Fatal("should have gotten a 'real' conn back")
}
}
1 change: 0 additions & 1 deletion p2p/net/swarm/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,6 @@ func TestDialBackoff(t *testing.T) {
if !s1.backf.Backoff(s3p) {
t.Error("s3 should be on backoff")
}

}
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/limiter.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package swarm

import (
"context"
"sync"

peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
context "golang.org/x/net/context"

conn "github.com/libp2p/go-libp2p/p2p/net/conn"
addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/limiter_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package swarm

import (
"context"
"fmt"
"math/rand"
"strconv"
Expand All @@ -10,7 +11,6 @@ import (
peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
context "golang.org/x/net/context"

conn "github.com/libp2p/go-libp2p/p2p/net/conn"
)
Expand Down
5 changes: 3 additions & 2 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package swarm

import (
"context"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -32,7 +33,6 @@ import (
yamux "github.com/whyrusleeping/go-smux-yamux"
mafilter "github.com/whyrusleeping/multiaddr-filter"
ws "github.com/whyrusleeping/ws-transport"
context "golang.org/x/net/context"
)

var log = logging.Logger("swarm2")
Expand Down Expand Up @@ -76,7 +76,7 @@ type Swarm struct {
peers pstore.Peerstore
connh ConnHandler

dsync dialsync
dsync *DialSync
backf dialbackoff
dialT time.Duration // mainly for tests

Expand Down Expand Up @@ -134,6 +134,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
dialer: conn.NewDialer(local, peers.PrivKey(local), wrap),
}

s.dsync = NewDialSync(s.doDial)
s.limiter = newDialLimiter(s.dialAddr)

// configure Swarm
Expand Down
Loading

0 comments on commit 4ae3510

Please sign in to comment.