Skip to content

Commit

Permalink
Merge pull request #1832 from libp2p/webtransport-rcmgr
Browse files Browse the repository at this point in the history
webtransport: use the rcmgr to control flow control window increases
  • Loading branch information
marten-seemann authored Oct 24, 2022
2 parents 012b8dd + b50b460 commit c33f910
Show file tree
Hide file tree
Showing 22 changed files with 295 additions and 127 deletions.
85 changes: 41 additions & 44 deletions core/network/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,60 +271,57 @@ type ScopeStat struct {
}

// NullResourceManager is a stub for tests and initialization of default values
var NullResourceManager ResourceManager = &nullResourceManager{}

type nullResourceManager struct{}
type nullScope struct{}

var _ ResourceScope = (*nullScope)(nil)
var _ ResourceScopeSpan = (*nullScope)(nil)
var _ ServiceScope = (*nullScope)(nil)
var _ ProtocolScope = (*nullScope)(nil)
var _ PeerScope = (*nullScope)(nil)
var _ ConnManagementScope = (*nullScope)(nil)
var _ ConnScope = (*nullScope)(nil)
var _ StreamManagementScope = (*nullScope)(nil)
var _ StreamScope = (*nullScope)(nil)
type NullResourceManager struct{}

var _ ResourceScope = (*NullScope)(nil)
var _ ResourceScopeSpan = (*NullScope)(nil)
var _ ServiceScope = (*NullScope)(nil)
var _ ProtocolScope = (*NullScope)(nil)
var _ PeerScope = (*NullScope)(nil)
var _ ConnManagementScope = (*NullScope)(nil)
var _ ConnScope = (*NullScope)(nil)
var _ StreamManagementScope = (*NullScope)(nil)
var _ StreamScope = (*NullScope)(nil)

// NullScope is a stub for tests and initialization of default values
var NullScope = &nullScope{}
type NullScope struct{}

func (n *nullResourceManager) ViewSystem(f func(ResourceScope) error) error {
return f(NullScope)
func (n *NullResourceManager) ViewSystem(f func(ResourceScope) error) error {
return f(&NullScope{})
}
func (n *nullResourceManager) ViewTransient(f func(ResourceScope) error) error {
return f(NullScope)
func (n *NullResourceManager) ViewTransient(f func(ResourceScope) error) error {
return f(&NullScope{})
}
func (n *nullResourceManager) ViewService(svc string, f func(ServiceScope) error) error {
return f(NullScope)
func (n *NullResourceManager) ViewService(svc string, f func(ServiceScope) error) error {
return f(&NullScope{})
}
func (n *nullResourceManager) ViewProtocol(p protocol.ID, f func(ProtocolScope) error) error {
return f(NullScope)
func (n *NullResourceManager) ViewProtocol(p protocol.ID, f func(ProtocolScope) error) error {
return f(&NullScope{})
}
func (n *nullResourceManager) ViewPeer(p peer.ID, f func(PeerScope) error) error {
return f(NullScope)
func (n *NullResourceManager) ViewPeer(p peer.ID, f func(PeerScope) error) error {
return f(&NullScope{})
}
func (n *nullResourceManager) OpenConnection(dir Direction, usefd bool, endpoint multiaddr.Multiaddr) (ConnManagementScope, error) {
return NullScope, nil
func (n *NullResourceManager) OpenConnection(dir Direction, usefd bool, endpoint multiaddr.Multiaddr) (ConnManagementScope, error) {
return &NullScope{}, nil
}
func (n *nullResourceManager) OpenStream(p peer.ID, dir Direction) (StreamManagementScope, error) {
return NullScope, nil
func (n *NullResourceManager) OpenStream(p peer.ID, dir Direction) (StreamManagementScope, error) {
return &NullScope{}, nil
}
func (n *nullResourceManager) Close() error {
func (n *NullResourceManager) Close() error {
return nil
}

func (n *nullScope) ReserveMemory(size int, prio uint8) error { return nil }
func (n *nullScope) ReleaseMemory(size int) {}
func (n *nullScope) Stat() ScopeStat { return ScopeStat{} }
func (n *nullScope) BeginSpan() (ResourceScopeSpan, error) { return NullScope, nil }
func (n *nullScope) Done() {}
func (n *nullScope) Name() string { return "" }
func (n *nullScope) Protocol() protocol.ID { return "" }
func (n *nullScope) Peer() peer.ID { return "" }
func (n *nullScope) PeerScope() PeerScope { return NullScope }
func (n *nullScope) SetPeer(peer.ID) error { return nil }
func (n *nullScope) ProtocolScope() ProtocolScope { return NullScope }
func (n *nullScope) SetProtocol(proto protocol.ID) error { return nil }
func (n *nullScope) ServiceScope() ServiceScope { return NullScope }
func (n *nullScope) SetService(srv string) error { return nil }
func (n *NullScope) ReserveMemory(size int, prio uint8) error { return nil }
func (n *NullScope) ReleaseMemory(size int) {}
func (n *NullScope) Stat() ScopeStat { return ScopeStat{} }
func (n *NullScope) BeginSpan() (ResourceScopeSpan, error) { return &NullScope{}, nil }
func (n *NullScope) Done() {}
func (n *NullScope) Name() string { return "" }
func (n *NullScope) Protocol() protocol.ID { return "" }
func (n *NullScope) Peer() peer.ID { return "" }
func (n *NullScope) PeerScope() PeerScope { return &NullScope{} }
func (n *NullScope) SetPeer(peer.ID) error { return nil }
func (n *NullScope) ProtocolScope() ProtocolScope { return &NullScope{} }
func (n *NullScope) SetProtocol(proto protocol.ID) error { return nil }
func (n *NullScope) ServiceScope() ServiceScope { return &NullScope{} }
func (n *NullScope) SetService(srv string) error { return nil }
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/libp2p/zeroconf/v2 v2.2.0
github.com/lucas-clemente/quic-go v0.30.0
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
github.com/marten-seemann/webtransport-go v0.1.1
github.com/marten-seemann/webtransport-go v0.2.0
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b
github.com/minio/sha256-simd v1.0.0
github.com/mr-tron/base58 v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ github.com/marten-seemann/qtls-go1-19 v0.1.1 h1:mnbxeq3oEyQxQXwI4ReCgW9DPoPR94sN
github.com/marten-seemann/qtls-go1-19 v0.1.1/go.mod h1:5HTDWtVudo/WFsHKRNuOhWlbdjrfs5JHrYb0wIJqGpI=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU=
github.com/marten-seemann/webtransport-go v0.1.1 h1:TnyKp3pEXcDooTaNn4s9dYpMJ7kMnTp7k5h+SgYP/mc=
github.com/marten-seemann/webtransport-go v0.1.1/go.mod h1:kBEh5+RSvOA4troP1vyOVBWK4MIMzDICXVrvCPrYcrM=
github.com/marten-seemann/webtransport-go v0.2.0 h1:987jPVqcyE3vF+CHNIxDhT0P21O+bI4fVF+0NoRujSo=
github.com/marten-seemann/webtransport-go v0.2.0/go.mod h1:XmnWYsWXaxUF7kjeIIzLWPyS+q0OcBY5vA64NuyK0ps=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,5 +189,5 @@ func (c *conn) Stat() network.ConnStats {
}

func (c *conn) Scope() network.ConnScope {
return network.NullScope
return &network.NullScope{}
}
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,5 +370,5 @@ func (pn *peernet) notifyAll(notification func(f network.Notifiee)) {
}

func (pn *peernet) ResourceManager() network.ResourceManager {
return network.NullResourceManager
return &network.NullResourceManager{}
}
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (s *stream) transport() {
}

func (s *stream) Scope() network.StreamScope {
return network.NullScope
return &network.NullScope{}
}

func (s *stream) cancelWrite(err error) {
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm,
}
}
if s.rcmgr == nil {
s.rcmgr = network.NullResourceManager
s.rcmgr = &network.NullResourceManager{}
}

s.dsync = newDialSync(s.dialWorkerLoop)
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/swarm/swarm_dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestAddrsForDial(t *testing.T) {
ps.AddPrivKey(id, priv)
t.Cleanup(func() { ps.Close() })

tpt, err := websocket.New(nil, network.NullResourceManager)
tpt, err := websocket.New(nil, &network.NullResourceManager{})
require.NoError(t, err)
s, err := NewSwarm(id, ps, WithMultiaddrResolver(resolver))
require.NoError(t, err)
Expand Down Expand Up @@ -81,7 +81,7 @@ func newTestSwarmWithResolver(t *testing.T, resolver *madns.Resolver) *Swarm {
})

// Add a tcp transport so that we know we can dial a tcp multiaddr and we don't filter it out.
tpt, err := tcp.NewTCPTransport(nil, network.NullResourceManager)
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{})
require.NoError(t, err)
err = s.AddTransport(tpt)
require.NoError(t, err)
Expand Down
30 changes: 15 additions & 15 deletions p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestAcceptSingleConn(t *testing.T) {
ln := createListener(t, u)
defer ln.Close()

cconn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
cconn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)

sconn, err := ln.Accept()
Expand All @@ -80,7 +80,7 @@ func TestAcceptMultipleConns(t *testing.T) {
}()

for i := 0; i < 10; i++ {
cconn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
cconn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
toClose = append(toClose, cconn)

Expand All @@ -104,7 +104,7 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) {
ln := createListener(t, u)
defer ln.Close()

conn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)

errCh := make(chan error)
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestFailedUpgradeOnListen(t *testing.T) {
errCh <- err
}()

_, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
_, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.Error(err)

// close the listener.
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestListenerClose(t *testing.T) {
require.Contains(err.Error(), "use of closed network connection")

// doesn't accept new connections when it is closed
_, err = dial(t, u, ln.Multiaddr(), peer.ID("1"), network.NullScope)
_, err = dial(t, u, ln.Multiaddr(), peer.ID("1"), &network.NullScope{})
require.Error(err)
}

Expand All @@ -189,7 +189,7 @@ func TestListenerCloseClosesQueued(t *testing.T) {

var conns []transport.CapableConn
for i := 0; i < 10; i++ {
conn, err := dial(t, upgrader, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, upgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
conns = append(conns, conn)
}
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestConcurrentAccept(t *testing.T) {
go func() {
defer wg.Done()

conn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestAcceptQueueBacklogged(t *testing.T) {
// setup AcceptQueueLength connections, but don't accept any of them
var counter int32 // to be used atomically
doDial := func() {
conn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
atomic.AddInt32(&counter, 1)
t.Cleanup(func() { conn.Close() })
Expand Down Expand Up @@ -315,36 +315,36 @@ func TestListenerConnectionGater(t *testing.T) {
defer ln.Close()

// no gating.
conn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()

// rejecting after handshake.
testGater.BlockSecured(true)
testGater.BlockAccept(false)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", network.NullScope)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", &network.NullScope{})
require.Error(err)
require.Nil(conn)

// rejecting on accept will trigger firupgrader.
testGater.BlockSecured(true)
testGater.BlockAccept(true)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", network.NullScope)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", &network.NullScope{})
require.Error(err)
require.Nil(conn)

// rejecting only on acceptance.
testGater.BlockSecured(false)
testGater.BlockAccept(true)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", network.NullScope)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", &network.NullScope{})
require.Error(err)
require.Nil(conn)

// back to normal
testGater.BlockSecured(false)
testGater.BlockAccept(false)
conn, err = dial(t, u, ln.Multiaddr(), id, network.NullScope)
conn, err = dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()
Expand All @@ -366,7 +366,7 @@ func TestListenerResourceManagement(t *testing.T) {
connScope.EXPECT().PeerScope(),
)

cconn, err := dial(t, upgrader, ln.Multiaddr(), id, network.NullScope)
cconn, err := dial(t, upgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(t, err)
defer cconn.Close()

Expand All @@ -384,7 +384,7 @@ func TestListenerResourceManagementDenied(t *testing.T) {
ln := createListener(t, upgrader)

rcmgr.EXPECT().OpenConnection(network.DirInbound, true, gomock.Not(ln.Multiaddr())).Return(nil, errors.New("nope"))
_, err := dial(t, upgrader, ln.Multiaddr(), id, network.NullScope)
_, err := dial(t, upgrader, ln.Multiaddr(), id, &network.NullScope{})
require.Error(t, err)

done := make(chan struct{})
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(secureMuxer sec.SecureMuxer, muxer network.Multiplexer, opts ...Option)
}
}
if u.rcmgr == nil {
u.rcmgr = network.NullResourceManager
u.rcmgr = &network.NullResourceManager{}
}
return u, nil
}
Expand Down
10 changes: 5 additions & 5 deletions p2p/net/upgrader/upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,21 @@ func TestOutboundConnectionGating(t *testing.T) {

testGater := &testGater{}
_, dialUpgrader := createUpgrader(t, upgrader.WithConnectionGater(testGater))
conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.NotNil(conn)
_ = conn.Close()

// blocking accepts doesn't affect the dialling side, only the listener.
testGater.BlockAccept(true)
conn, err = dial(t, dialUpgrader, ln.Multiaddr(), id, network.NullScope)
conn, err = dial(t, dialUpgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.NotNil(conn)
_ = conn.Close()

// now let's block all connections after being secured.
testGater.BlockSecured(true)
conn, err = dial(t, dialUpgrader, ln.Multiaddr(), id, network.NullScope)
conn, err = dial(t, dialUpgrader, ln.Multiaddr(), id, &network.NullScope{})
require.Error(err)
require.Contains(err.Error(), "gater rejected connection")
require.Nil(conn)
Expand All @@ -153,7 +153,7 @@ func TestOutboundResourceManagement(t *testing.T) {
gomock.InOrder(
connScope.EXPECT().PeerScope(),
connScope.EXPECT().SetPeer(id),
connScope.EXPECT().PeerScope().Return(network.NullScope),
connScope.EXPECT().PeerScope().Return(&network.NullScope{}),
)
_, dialUpgrader := createUpgrader(t)
conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id, connScope)
Expand All @@ -174,7 +174,7 @@ func TestOutboundResourceManagement(t *testing.T) {
gomock.InOrder(
connScope.EXPECT().PeerScope(),
connScope.EXPECT().SetPeer(id),
connScope.EXPECT().PeerScope().Return(network.NullScope),
connScope.EXPECT().PeerScope().Return(&network.NullScope{}),
connScope.EXPECT().Done(),
)
_, dialUpgrader := createUpgrader(t)
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/circuitv2/client/reservation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestReservationFailures(t *testing.T) {
host.SetStreamHandler(proto.ProtoIDv2Hop, tc.streamHandler)
}

cl, err := libp2p.New(libp2p.ResourceManager(network.NullResourceManager))
cl, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
require.NoError(t, err)
defer cl.Close()
_, err = client.Reserve(context.Background(), cl, peer.AddrInfo{ID: host.ID(), Addrs: host.Addrs()})
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/internal/circuitv1-deprecated/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (d *RelayTransport) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (t
return nil, err
}
c.tagHop()
scope, _ := network.NullResourceManager.OpenConnection(network.DirOutbound, false, a)
scope, _ := (&network.NullResourceManager{}).OpenConnection(network.DirOutbound, false, a)
return d.upgrader.Upgrade(ctx, d, c, network.DirOutbound, p, scope)
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/quic/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func NewTransport(key ic.PrivKey, psk pnet.PSK, gater connmgr.ConnectionGater, r
return nil, err
}
if rcmgr == nil {
rcmgr = network.NullResourceManager
rcmgr = &network.NullResourceManager{}
}
qconfig := quicConfig.Clone()
keyBytes, err := key.Raw()
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var _ transport.Transport = &TcpTransport{}
// created. It represents an entire TCP stack (though it might not necessarily be).
func NewTCPTransport(upgrader transport.Upgrader, rcmgr network.ResourceManager, opts ...Option) (*TcpTransport, error) {
if rcmgr == nil {
rcmgr = network.NullResourceManager
rcmgr = &network.NullResourceManager{}
}
tr := &TcpTransport{
upgrader: upgrader,
Expand Down
Loading

0 comments on commit c33f910

Please sign in to comment.