Skip to content

Commit

Permalink
Merge pull request #1463 from libp2p/merge-upgrader
Browse files Browse the repository at this point in the history
move go-libp2p-transport-upgrader here
  • Loading branch information
marten-seemann authored Apr 27, 2022
2 parents 4b7059c + 9dc18ed commit 9f11ccf
Show file tree
Hide file tree
Showing 15 changed files with 1,163 additions and 11 deletions.
3 changes: 1 addition & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import (
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"

tptu "github.com/libp2p/go-libp2p-transport-upgrader"

logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log/v2 v2.5.1
github.com/jbenet/go-temp-err-catcher v0.1.0
github.com/klauspost/compress v1.15.1
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-eventbus v0.2.1
Expand All @@ -22,10 +23,10 @@ require (
github.com/libp2p/go-libp2p-nat v0.1.0
github.com/libp2p/go-libp2p-noise v0.4.0
github.com/libp2p/go-libp2p-peerstore v0.6.0
github.com/libp2p/go-libp2p-pnet v0.2.0
github.com/libp2p/go-libp2p-resource-manager v0.2.1
github.com/libp2p/go-libp2p-testing v0.9.2
github.com/libp2p/go-libp2p-tls v0.4.1
github.com/libp2p/go-libp2p-transport-upgrader v0.7.1
github.com/libp2p/go-mplex v0.7.0
github.com/libp2p/go-msgio v0.2.0
github.com/libp2p/go-netroute v0.2.0
Expand Down Expand Up @@ -74,16 +75,15 @@ require (
github.com/google/uuid v1.3.0 // indirect
github.com/huin/goupnp v1.0.3 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/koron/go-ssdp v0.0.2 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.0.3 // indirect
github.com/libp2p/go-libp2p-blankhost v0.3.0 // indirect
github.com/libp2p/go-libp2p-pnet v0.2.0 // indirect
github.com/libp2p/go-libp2p-quic-transport v0.17.0 // indirect
github.com/libp2p/go-libp2p-swarm v0.10.2 // indirect
github.com/libp2p/go-libp2p-transport-upgrader v0.7.1 // indirect
github.com/libp2p/go-libp2p-yamux v0.9.1 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-openssl v0.0.7 // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,6 @@ github.com/libp2p/go-libp2p-core v0.14.0/go.mod h1:tLasfcVdTXnixsLB0QYaT1syJOhsb
github.com/libp2p/go-libp2p-core v0.15.1 h1:0RY+Mi/ARK9DgG1g9xVQLb8dDaaU8tCePMtGALEfBnM=
github.com/libp2p/go-libp2p-core v0.15.1/go.mod h1:agSaboYM4hzB1cWekgVReqV5M4g5M+2eNNejV+1EEhs=
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
github.com/libp2p/go-libp2p-mplex v0.5.0 h1:vt3k4E4HSND9XH4Z8rUpacPJFSAgLOv6HDvG8W9Ks9E=
github.com/libp2p/go-libp2p-mplex v0.5.0/go.mod h1:eLImPJLkj3iG5t5lq68w3Vm5NAQ5BcKwrrb2VmOYb3M=
github.com/libp2p/go-libp2p-nat v0.1.0 h1:vigUi2MEN+fwghe5ijpScxtbbDz+L/6y8XwlzYOJgSY=
github.com/libp2p/go-libp2p-nat v0.1.0/go.mod h1:DQzAG+QbDYjN1/C3B6vXucLtz3u9rEonLVPtZVzQqks=
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"

Expand All @@ -19,7 +20,6 @@ import (

"github.com/libp2p/go-libp2p-peerstore/pstoremem"
tnet "github.com/libp2p/go-libp2p-testing/net"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
msmux "github.com/libp2p/go-stream-muxer-multistream"
ma "github.com/multiformats/go-multiaddr"

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"

Expand All @@ -22,7 +23,6 @@ import (

"github.com/libp2p/go-libp2p-peerstore/pstoremem"
tnet "github.com/libp2p/go-libp2p-testing/net"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
msmux "github.com/libp2p/go-stream-muxer-multistream"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
Expand Down
52 changes: 52 additions & 0 deletions p2p/net/upgrader/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package upgrader

import (
"fmt"

"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/transport"
)

type transportConn struct {
mux.MuxedConn
network.ConnMultiaddrs
network.ConnSecurity
transport transport.Transport
scope network.ConnManagementScope
stat network.ConnStats
}

var _ transport.CapableConn = &transportConn{}

func (t *transportConn) Transport() transport.Transport {
return t.transport
}

func (t *transportConn) String() string {
ts := ""
if s, ok := t.transport.(fmt.Stringer); ok {
ts = "[" + s.String() + "]"
}
return fmt.Sprintf(
"<stream.Conn%s %s (%s) <-> %s (%s)>",
ts,
t.LocalMultiaddr(),
t.LocalPeer(),
t.RemoteMultiaddr(),
t.RemotePeer(),
)
}

func (t *transportConn) Stat() network.ConnStats {
return t.stat
}

func (t *transportConn) Scope() network.ConnScope {
return t.scope
}

func (t *transportConn) Close() error {
defer t.scope.Done()
return t.MuxedConn.Close()
}
60 changes: 60 additions & 0 deletions p2p/net/upgrader/gater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package upgrader_test

import (
"sync"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/control"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
)

type testGater struct {
sync.Mutex

blockAccept, blockSecured bool
}

var _ connmgr.ConnectionGater = (*testGater)(nil)

func (t *testGater) BlockAccept(block bool) {
t.Lock()
defer t.Unlock()

t.blockAccept = block
}

func (t *testGater) BlockSecured(block bool) {
t.Lock()
defer t.Unlock()

t.blockSecured = block
}

func (t *testGater) InterceptPeerDial(p peer.ID) (allow bool) {
panic("not implemented")
}

func (t *testGater) InterceptAddrDial(id peer.ID, multiaddr ma.Multiaddr) (allow bool) {
panic("not implemented")
}

func (t *testGater) InterceptAccept(multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()

return !t.blockAccept
}

func (t *testGater) InterceptSecured(direction network.Direction, id peer.ID, multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()

return !t.blockSecured
}

func (t *testGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
panic("not implemented")
}
178 changes: 178 additions & 0 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package upgrader

import (
"context"
"fmt"
"sync"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/transport"

logging "github.com/ipfs/go-log/v2"
tec "github.com/jbenet/go-temp-err-catcher"
manet "github.com/multiformats/go-multiaddr/net"
)

var log = logging.Logger("upgrader")

type listener struct {
manet.Listener

transport transport.Transport
upgrader *upgrader
rcmgr network.ResourceManager

incoming chan transport.CapableConn
err error

// Used for backpressure
threshold *threshold

// Canceling this context isn't sufficient to tear down the listener.
// Call close.
ctx context.Context
cancel func()
}

// Close closes the listener.
func (l *listener) Close() error {
// Do this first to try to get any relevent errors.
err := l.Listener.Close()

l.cancel()
// Drain and wait.
for c := range l.incoming {
c.Close()
}
return err
}

// handles inbound connections.
//
// This function does a few interesting things that should be noted:
//
// 1. It logs and discards temporary/transient errors (errors with a Temporary()
// function that returns true).
// 2. It stops accepting new connections once AcceptQueueLength connections have
// been fully negotiated but not accepted. This gives us a basic backpressure
// mechanism while still allowing us to negotiate connections in parallel.
func (l *listener) handleIncoming() {
var wg sync.WaitGroup
defer func() {
// make sure we're closed
l.Listener.Close()
if l.err == nil {
l.err = fmt.Errorf("listener closed")
}

wg.Wait()
close(l.incoming)
}()

var catcher tec.TempErrCatcher
for l.ctx.Err() == nil {
maconn, err := l.Listener.Accept()
if err != nil {
// Note: function may pause the accept loop.
if catcher.IsTemporary(err) {
log.Infof("temporary accept error: %s", err)
continue
}
l.err = err
return
}
catcher.Reset()

// gate the connection if applicable
if l.upgrader.connGater != nil && !l.upgrader.connGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if err := maconn.Close(); err != nil {
log.Warnf("failed to close incoming connection rejected by gater: %s", err)
}
continue
}

connScope, err := l.rcmgr.OpenConnection(network.DirInbound, true)
if err != nil {
log.Debugw("resource manager blocked accept of new connection", "error", err)
if err := maconn.Close(); err != nil {
log.Warnf("failed to incoming connection rejected by resource manager: %s", err)
}
continue
}

// The go routine below calls Release when the context is
// canceled so there's no need to wait on it here.
l.threshold.Wait()

log.Debugf("listener %s got connection: %s <---> %s",
l,
maconn.LocalMultiaddr(),
maconn.RemoteMultiaddr())

wg.Add(1)
go func() {
defer wg.Done()

ctx, cancel := context.WithTimeout(l.ctx, l.upgrader.acceptTimeout)
defer cancel()

conn, err := l.upgrader.Upgrade(ctx, l.transport, maconn, network.DirInbound, "", connScope)
if err != nil {
// Don't bother bubbling this up. We just failed
// to completely negotiate the connection.
log.Debugf("accept upgrade error: %s (%s <--> %s)",
err,
maconn.LocalMultiaddr(),
maconn.RemoteMultiaddr())
connScope.Done()
return
}

log.Debugf("listener %s accepted connection: %s", l, conn)

// This records the fact that the connection has been
// setup and is waiting to be accepted. This call
// *never* blocks, even if we go over the threshold. It
// simply ensures that calls to Wait block while we're
// over the threshold.
l.threshold.Acquire()
defer l.threshold.Release()

select {
case l.incoming <- conn:
case <-ctx.Done():
if l.ctx.Err() == nil {
// Listener *not* closed but the accept timeout expired.
log.Warn("listener dropped connection due to slow accept")
}
// Wait on the context with a timeout. This way,
// if we stop accepting connections for some reason,
// we'll eventually close all the open ones
// instead of hanging onto them.
conn.Close()
}
}()
}
}

// Accept accepts a connection.
func (l *listener) Accept() (transport.CapableConn, error) {
for c := range l.incoming {
// Could have been sitting there for a while.
if !c.IsClosed() {
return c, nil
}
}
return nil, l.err
}

func (l *listener) String() string {
if s, ok := l.transport.(fmt.Stringer); ok {
return fmt.Sprintf("<stream.Listener[%s] %s>", s, l.Multiaddr())
}
return fmt.Sprintf("<stream.Listener %s>", l.Multiaddr())
}

var _ transport.Listener = (*listener)(nil)
Loading

0 comments on commit 9f11ccf

Please sign in to comment.