diff --git a/go.sum b/go.sum index 1c7660ed3b..62ea252a03 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= @@ -427,6 +428,7 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -532,4 +534,5 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 1058f622f5..766b83b8a1 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -85,8 +85,7 @@ type IDService struct { addrMu sync.Mutex // our own observed addresses. - // TODO: instead of expiring, remove these when we disconnect - observedAddrs *ObservedAddrSet + observedAddrs *ObservedAddrManager subscription event.Subscription emitters struct { @@ -117,7 +116,7 @@ func NewIDService(h host.Host, opts ...Option) *IDService { ctx: hostCtx, ctxCancel: cancel, conns: make(map[network.Conn]chan struct{}), - observedAddrs: NewObservedAddrSet(hostCtx), + observedAddrs: NewObservedAddrManager(hostCtx, h), } // handle local protocol handler updates, and push deltas to peers. @@ -584,31 +583,7 @@ func (ids *IDService) consumeObservedAddress(observed []byte, c network.Conn) { return } - // we should only use ObservedAddr when our connection's LocalAddr is one - // of our ListenAddrs. If we Dial out using an ephemeral addr, knowing that - // address's external mapping is not very useful because the port will not be - // the same as the listen addr. - ifaceaddrs, err := ids.Host.Network().InterfaceListenAddresses() - if err != nil { - log.Infof("failed to get interface listen addrs", err) - return - } - - log.Debugf("identify identifying observed multiaddr: %s %s", c.LocalMultiaddr(), ifaceaddrs) - if !addrInAddrs(c.LocalMultiaddr(), ifaceaddrs) && !addrInAddrs(c.LocalMultiaddr(), ids.Host.Network().ListenAddresses()) { - // not in our list - return - } - - if !HasConsistentTransport(maddr, ids.Host.Addrs()) { - log.Debugf("ignoring observed multiaddr that doesn't match the transports of any addresses we're announcing", c.RemoteMultiaddr()) - return - } - - // ok! we have the observed version of one of our ListenAddresses! - log.Debugf("added own observed listen addr: %s --> %s", c.LocalMultiaddr(), maddr) - ids.observedAddrs.Add(maddr, c.LocalMultiaddr(), c.RemoteMultiaddr(), - c.Stat().Direction) + ids.observedAddrs.Record(c, maddr) } func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool { diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index d9e3318dd7..3822ba8daa 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -5,10 +5,12 @@ import ( "sync" "time" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peerstore" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" ) // ActivationThresh sets how many times an address must be seen as "activated" @@ -24,9 +26,9 @@ var ActivationThresh = 4 // it is older than OwnObservedAddressTTL * ActivationThresh (40 minutes). var GCInterval = 10 * time.Minute -// observedAddrSetWorkerChannelSize defines how many addresses can be enqueued -// for adding to an ObservedAddrSet. -var observedAddrSetWorkerChannelSize = 16 +// observedAddrManagerWorkerChannelSize defines how many addresses can be enqueued +// for adding to an ObservedAddrManager. +var observedAddrManagerWorkerChannelSize = 16 type observation struct { seenTime time.Time @@ -52,40 +54,52 @@ func (oa *ObservedAddr) activated(ttl time.Duration) bool { } type newObservation struct { - observed, local, observer ma.Multiaddr - direction network.Direction + conn network.Conn + observed ma.Multiaddr } -// ObservedAddrSet keeps track of a set of ObservedAddrs -// the zero-value is ready to be used. -type ObservedAddrSet struct { - sync.RWMutex // guards whole datastruct. +// ObservedAddrManager keeps track of a ObservedAddrs. +type ObservedAddrManager struct { + host host.Host + // latest observation from active connections + // we'll "re-observe" these when we gc + activeConnsMu sync.Mutex + // active connection -> most recent observation + activeConns map[network.Conn]ma.Multiaddr + + mu sync.RWMutex // local(internal) address -> list of observed(external) addresses - addrs map[string][]*ObservedAddr - ttl time.Duration + addrs map[string][]*ObservedAddr + ttl time.Duration + refreshTimer *time.Timer // this is the worker channel wch chan newObservation } -// NewObservedAddrSet returns a new set using peerstore.OwnObservedAddressTTL -// as the TTL. -func NewObservedAddrSet(ctx context.Context) *ObservedAddrSet { - oas := &ObservedAddrSet{ - addrs: make(map[string][]*ObservedAddr), - ttl: peerstore.OwnObservedAddrTTL, - wch: make(chan newObservation, observedAddrSetWorkerChannelSize), +// NewObservedAddrManager returns a new address manager using +// peerstore.OwnObservedAddressTTL as the TTL. +func NewObservedAddrManager(ctx context.Context, host host.Host) *ObservedAddrManager { + oas := &ObservedAddrManager{ + addrs: make(map[string][]*ObservedAddr), + ttl: peerstore.OwnObservedAddrTTL, + wch: make(chan newObservation, observedAddrManagerWorkerChannelSize), + host: host, + activeConns: make(map[network.Conn]ma.Multiaddr), + // refresh every ttl/2 so we don't forget observations from connected peers + refreshTimer: time.NewTimer(peerstore.OwnObservedAddrTTL / 2), } + oas.host.Network().Notify((*obsAddrNotifiee)(oas)) go oas.worker(ctx) return oas } // AddrsFor return all activated observed addresses associated with the given // (resolved) listen address. -func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) { - oas.RLock() - defer oas.RUnlock() +func (oas *ObservedAddrManager) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) { + oas.mu.RLock() + defer oas.mu.RUnlock() if len(oas.addrs) == 0 { return nil @@ -108,9 +122,9 @@ func (oas *ObservedAddrSet) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) { } // Addrs return all activated observed addresses -func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) { - oas.RLock() - defer oas.RUnlock() +func (oas *ObservedAddrManager) Addrs() (addrs []ma.Multiaddr) { + oas.mu.RLock() + defer oas.mu.RUnlock() if len(oas.addrs) == 0 { return nil @@ -127,43 +141,79 @@ func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) { return addrs } -// Add attemps to queue a new observed address to be added to the set. -func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr, - direction network.Direction) { +// Record records an address observation, if valid. +func (oas *ObservedAddrManager) Record(conn network.Conn, observed ma.Multiaddr) { select { - case oas.wch <- newObservation{observed: observed, local: local, observer: observer, direction: direction}: + case oas.wch <- newObservation{ + conn: conn, + observed: observed, + }: default: - log.Debugf("dropping address observation of %s; buffer full", observed) + log.Debugw("dropping address observation due to full buffer", + "from", conn.RemoteMultiaddr(), + "observed", observed, + ) } } -func (oas *ObservedAddrSet) worker(ctx context.Context) { +func (oas *ObservedAddrManager) teardown() { + oas.host.Network().StopNotify((*obsAddrNotifiee)(oas)) + + oas.mu.Lock() + oas.refreshTimer.Stop() + oas.mu.Unlock() +} + +func (oas *ObservedAddrManager) worker(ctx context.Context) { + defer oas.teardown() + ticker := time.NewTicker(GCInterval) defer ticker.Stop() + hostClosing := oas.host.Network().Process().Closing() for { select { case obs := <-oas.wch: - oas.doAdd(obs.observed, obs.local, obs.observer, obs.direction) - + oas.maybeRecordObservation(obs.conn, obs.observed) case <-ticker.C: oas.gc() - + case <-oas.refreshTimer.C: + oas.refresh() + case <-hostClosing: + return case <-ctx.Done(): return } } } -func (oas *ObservedAddrSet) gc() { - oas.Lock() - defer oas.Unlock() +func (oas *ObservedAddrManager) refresh() { + oas.activeConnsMu.Lock() + recycledObservations := make([]newObservation, 0, len(oas.activeConns)) + for conn, observed := range oas.activeConns { + recycledObservations = append(recycledObservations, newObservation{ + conn: conn, + observed: observed, + }) + } + oas.activeConnsMu.Unlock() + + oas.mu.Lock() + defer oas.mu.Unlock() + for _, obs := range recycledObservations { + oas.recordObservationUnlocked(obs.conn, obs.observed) + } + // refresh every ttl/2 so we don't forget observations from connected peers + oas.refreshTimer.Reset(oas.ttl / 2) +} + +func (oas *ObservedAddrManager) gc() { + oas.mu.Lock() + defer oas.mu.Unlock() now := time.Now() for local, observedAddrs := range oas.addrs { - // TODO we can do this without allocating by compacting the array in place - filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs)) - + filteredAddrs := observedAddrs[:0] for _, a := range observedAddrs { // clean up SeenBy set for k, ob := range a.SeenBy { @@ -185,20 +235,92 @@ func (oas *ObservedAddrSet) gc() { } } -func (oas *ObservedAddrSet) doAdd(observed, local, observer ma.Multiaddr, - direction network.Direction) { +func (oas *ObservedAddrManager) addConn(conn network.Conn, observed ma.Multiaddr) { + oas.activeConnsMu.Lock() + defer oas.activeConnsMu.Unlock() + + // We need to make sure we haven't received a disconnect event for this + // connection yet. The only way to do that right now is to make sure the + // swarm still has the connection. + // + // Doing this under a lock that we _also_ take in a disconnect event + // handler ensures everything happens in the right order. + for _, c := range oas.host.Network().ConnsToPeer(conn.RemotePeer()) { + if c == conn { + oas.activeConns[conn] = observed + return + } + } +} + +func (oas *ObservedAddrManager) removeConn(conn network.Conn) { + // DO NOT remove this lock. + // This ensures we don't call addConn at the same time: + // 1. see that we have a connection and pause inside addConn right before recording it. + // 2. process a disconnect event. + // 3. record the connection (leaking it). + + oas.activeConnsMu.Lock() + delete(oas.activeConns, conn) + oas.activeConnsMu.Unlock() +} + +func (oas *ObservedAddrManager) maybeRecordObservation(conn network.Conn, observed ma.Multiaddr) { + // First, determine if this observation is even worth keeping... + + // Ignore observations from loopback nodes. We already know our loopback + // addresses. + if manet.IsIPLoopback(observed) { + return + } + + // we should only use ObservedAddr when our connection's LocalAddr is one + // of our ListenAddrs. If we Dial out using an ephemeral addr, knowing that + // address's external mapping is not very useful because the port will not be + // the same as the listen addr. + ifaceaddrs, err := oas.host.Network().InterfaceListenAddresses() + if err != nil { + log.Infof("failed to get interface listen addrs", err) + return + } + + local := conn.LocalMultiaddr() + if !addrInAddrs(local, ifaceaddrs) && !addrInAddrs(local, oas.host.Network().ListenAddresses()) { + // not in our list + return + } + + // We should reject the connection if the observation doesn't match the + // transports of one of our advertised addresses. + if !HasConsistentTransport(observed, oas.host.Addrs()) { + log.Debugw( + "observed multiaddr doesn't match the transports of any announced addresses", + "from", conn.RemoteMultiaddr(), + "observed", observed, + ) + return + } + + // Ok, the observation is good, record it. + log.Debugw("added own observed listen addr", "observed", observed) + + defer oas.addConn(conn, observed) + + oas.mu.Lock() + defer oas.mu.Unlock() + oas.recordObservationUnlocked(conn, observed) +} + +func (oas *ObservedAddrManager) recordObservationUnlocked(conn network.Conn, observed ma.Multiaddr) { now := time.Now() - observerString := observerGroup(observer) - localString := string(local.Bytes()) + observerString := observerGroup(conn.RemoteMultiaddr()) + localString := string(conn.LocalMultiaddr().Bytes()) ob := observation{ seenTime: now, - connDirection: direction, + connDirection: conn.Stat().Direction, } - oas.Lock() - defer oas.Unlock() - observedAddrs := oas.addrs[localString] // check if observed address seen yet, if so, update it for i, previousObserved := range observedAddrs { @@ -234,16 +356,29 @@ func observerGroup(m ma.Multiaddr) string { return string(first.Bytes()) } -// SetTTL sets the TTL of an observed address-set. -func (oas *ObservedAddrSet) SetTTL(ttl time.Duration) { - oas.Lock() - defer oas.Unlock() +// SetTTL sets the TTL of an observed address manager. +func (oas *ObservedAddrManager) SetTTL(ttl time.Duration) { + oas.mu.Lock() + defer oas.mu.Unlock() oas.ttl = ttl + // refresh every ttl/2 so we don't forget observations from connected peers + oas.refreshTimer.Reset(ttl / 2) } -// TTL gets the TTL of an observed address-set. -func (oas *ObservedAddrSet) TTL() time.Duration { - oas.RLock() - defer oas.RUnlock() +// TTL gets the TTL of an observed address manager. +func (oas *ObservedAddrManager) TTL() time.Duration { + oas.mu.RLock() + defer oas.mu.RUnlock() return oas.ttl } + +type obsAddrNotifiee ObservedAddrManager + +func (on *obsAddrNotifiee) Listen(n network.Network, a ma.Multiaddr) {} +func (on *obsAddrNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {} +func (on *obsAddrNotifiee) Connected(n network.Network, v network.Conn) {} +func (on *obsAddrNotifiee) Disconnected(n network.Network, v network.Conn) { + (*ObservedAddrManager)(on).removeConn(v) +} +func (on *obsAddrNotifiee) OpenedStream(n network.Network, s network.Stream) {} +func (on *obsAddrNotifiee) ClosedStream(n network.Network, s network.Stream) {} diff --git a/p2p/protocol/identify/obsaddr_test.go b/p2p/protocol/identify/obsaddr_test.go index 3a42a4c000..9c0d2183d4 100644 --- a/p2p/protocol/identify/obsaddr_test.go +++ b/p2p/protocol/identify/obsaddr_test.go @@ -1,4 +1,4 @@ -package identify +package identify_test import ( "context" @@ -7,10 +7,75 @@ import ( "time" detectrace "github.com/ipfs/go-detect-race" - net "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + p2putil "github.com/libp2p/go-libp2p-netutil" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" ma "github.com/multiformats/go-multiaddr" + + identify "github.com/libp2p/go-libp2p/p2p/protocol/identify" ) +type harness struct { + t *testing.T + + mocknet mocknet.Mocknet + host host.Host + + oas *identify.ObservedAddrManager +} + +func (h *harness) add(observer ma.Multiaddr) peer.ID { + // create a new fake peer. + sk, err := p2putil.RandTestBogusPrivateKey() + if err != nil { + h.t.Fatal(err) + } + h2, err := h.mocknet.AddPeer(sk, observer) + if err != nil { + h.t.Fatal(err) + } + _, err = h.mocknet.LinkPeers(h.host.ID(), h2.ID()) + if err != nil { + h.t.Fatal(err) + } + return h2.ID() +} + +func (h *harness) conn(observer peer.ID) network.Conn { + c, err := h.mocknet.ConnectPeers(h.host.ID(), observer) + if err != nil { + h.t.Fatal(err) + } + return c +} + +func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) { + c := h.conn(observer) + h.oas.Record(c, observed) + time.Sleep(1 * time.Millisecond) // let the worker run +} + +func newHarness(ctx context.Context, t *testing.T) harness { + mn := mocknet.New(ctx) + sk, err := p2putil.RandTestBogusPrivateKey() + if err != nil { + t.Fatal(err) + } + + h, err := mn.AddPeer(sk, ma.StringCast("/ip4/127.0.0.1/tcp/10086")) + if err != nil { + t.Fatal(err) + } + + return harness{ + oas: identify.NewObservedAddrManager(ctx, h), + mocknet: mn, + host: h, + } +} + // TestObsAddrSet func TestObsAddrSet(t *testing.T) { m := func(s string) ma.Multiaddr { @@ -55,120 +120,125 @@ func TestObsAddrSet(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - oas := NewObservedAddrSet(ctx) - if !addrsMarch(oas.Addrs(), nil) { + harness := newHarness(ctx, t) + + if !addrsMarch(harness.oas.Addrs(), nil) { t.Error("addrs should be empty") } - add := func(oas *ObservedAddrSet, observed, observer ma.Multiaddr) { - dummyLocal := m("/ip4/127.0.0.1/tcp/10086") - dummyDirection := net.DirOutbound + pa4 := harness.add(a4) + pa5 := harness.add(a5) - oas.Add(observed, dummyLocal, observer, dummyDirection) - time.Sleep(1 * time.Millisecond) // let the worker run - } + pb1 := harness.add(b1) + pb2 := harness.add(b2) + pb3 := harness.add(b3) + pb4 := harness.add(b4) + pb5 := harness.add(b5) - add(oas, a1, a4) - add(oas, a2, a4) - add(oas, a3, a4) + harness.observe(a1, pa4) + harness.observe(a2, pa4) + harness.observe(a3, pa4) // these are all different so we should not yet get them. - if !addrsMarch(oas.Addrs(), nil) { + if !addrsMarch(harness.oas.Addrs(), nil) { t.Error("addrs should _still_ be empty (once)") } // same observer, so should not yet get them. - add(oas, a1, a4) - add(oas, a2, a4) - add(oas, a3, a4) - if !addrsMarch(oas.Addrs(), nil) { + harness.observe(a1, pa4) + harness.observe(a2, pa4) + harness.observe(a3, pa4) + if !addrsMarch(harness.oas.Addrs(), nil) { t.Error("addrs should _still_ be empty (same obs)") } // different observer, but same observer group. - add(oas, a1, a5) - add(oas, a2, a5) - add(oas, a3, a5) - if !addrsMarch(oas.Addrs(), nil) { + harness.observe(a1, pa5) + harness.observe(a2, pa5) + harness.observe(a3, pa5) + if !addrsMarch(harness.oas.Addrs(), nil) { t.Error("addrs should _still_ be empty (same obs group)") } - add(oas, a1, b1) - add(oas, a1, b2) - add(oas, a1, b3) - if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1}) { + harness.observe(a1, pb1) + harness.observe(a1, pb2) + harness.observe(a1, pb3) + if !addrsMarch(harness.oas.Addrs(), []ma.Multiaddr{a1}) { t.Error("addrs should only have a1") } - add(oas, a2, a5) - add(oas, a1, a5) - add(oas, a1, a5) - add(oas, a2, b1) - add(oas, a1, b1) - add(oas, a1, b1) - add(oas, a2, b2) - add(oas, a1, b2) - add(oas, a1, b2) - add(oas, a2, b4) - add(oas, a2, b5) - if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1, a2}) { + harness.observe(a2, pa5) + harness.observe(a1, pa5) + harness.observe(a1, pa5) + harness.observe(a2, pb1) + harness.observe(a1, pb1) + harness.observe(a1, pb1) + harness.observe(a2, pb2) + harness.observe(a1, pb2) + harness.observe(a1, pb2) + harness.observe(a2, pb4) + harness.observe(a2, pb5) + if !addrsMarch(harness.oas.Addrs(), []ma.Multiaddr{a1, a2}) { t.Error("addrs should only have a1, a2") } - // change the timeout constant so we can time it out. - oas.SetTTL(time.Millisecond * 200) + // force a refresh. + harness.oas.SetTTL(time.Millisecond * 200) <-time.After(time.Millisecond * 210) - if !addrsMarch(oas.Addrs(), nil) { - t.Error("addrs should have timed out") + if !addrsMarch(harness.oas.Addrs(), []ma.Multiaddr{a1, a2}) { + t.Error("addrs should only have a1, a2") } -} -func TestAddAddrsProfile(b *testing.T) { - if detectrace.WithRace() { - b.Skip("test too slow when the race detector is running") - } - m := func(s string) ma.Multiaddr { - m, err := ma.NewMultiaddr(s) - if err != nil { - b.Fatal(err) + // disconnect from all but b5. + for _, p := range harness.host.Network().Peers() { + if p == pb5 { + continue } - return m + harness.host.Network().ClosePeer(p) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - oas := NewObservedAddrSet(ctx) + // wait for all other addresses to time out. + <-time.After(time.Millisecond * 210) + + // Should still have a2 + if !addrsMarch(harness.oas.Addrs(), []ma.Multiaddr{a2}) { + t.Error("should only have a2, have: ", harness.oas.Addrs()) + } + + harness.host.Network().ClosePeer(pb5) - add := func(oas *ObservedAddrSet, observed, observer ma.Multiaddr) { - dummyLocal := m("/ip4/127.0.0.1/tcp/10086") - dummyDirection := net.DirOutbound + // wait for all addresses to timeout + <-time.After(time.Millisecond * 400) - oas.Add(observed, dummyLocal, observer, dummyDirection) - time.Sleep(1 * time.Millisecond) // let the worker run + // Should still have a2 + if !addrsMarch(harness.oas.Addrs(), nil) { + t.Error("addrs should have timed out") } +} - a1 := m("/ip4/1.2.3.4/tcp/1231") - a2 := m("/ip4/1.2.3.4/tcp/1232") - a3 := m("/ip4/1.2.3.4/tcp/1233") - a4 := m("/ip4/1.2.3.4/tcp/1234") - a5 := m("/ip4/1.2.3.4/tcp/1235") +func TestAddAddrsProfile(t *testing.T) { + if detectrace.WithRace() { + t.Skip("test too slow when the race detector is running") + } - b1 := m("/ip4/1.2.3.6/tcp/1236") - b2 := m("/ip4/1.2.3.7/tcp/1237") - b3 := m("/ip4/1.2.3.8/tcp/1237") - b4 := m("/ip4/1.2.3.9/tcp/1237") - b5 := m("/ip4/1.2.3.10/tcp/1237") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + harness := newHarness(ctx, t) - _ = []ma.Multiaddr{a1, a2, a3, a4, a5, b1, b2, b3, b4, b5} + addr := ma.StringCast("/ip4/1.2.3.4/tcp/1231") + p := harness.add(ma.StringCast("/ip4/1.2.3.6/tcp/1236")) + c := harness.conn(p) var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < 10000; j++ { - add(oas, a1, b1) + harness.oas.Record(c, addr) + time.Sleep(1 * time.Millisecond) } }() }