Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

use the Resource Manager #249

Merged
merged 2 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/interop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ jobs:
if [[ `git merge-base --is-ancestor HEAD 3123af36d6cec13e31dac75058c8046e6e4a6690; echo $?` != "1" ]]; then
TAGS+=("stream_open_no_context")
fi
# This command doesn't take into account off-master releases. That's why we need a special case for the v0.11.2 release.
if [[ `git merge-base --is-ancestor HEAD 5c11755be71c11950e107f0c2c7b900e1d59ce6d; echo $?` != "1" || `git merge-base --is-ancestor HEAD v0.11.2; echo $?` != "1" ]]; then
TAGS+=("new_transport_no_rcmgr")
fi
if [[ "${{ matrix.cfg.retireBugBackwardsCompatiblityMode }}" == "true" ]]; then
TAGS+=("retirebugcompatmode")
fi
Expand Down
2 changes: 1 addition & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func run(raddr string, p string) error {
return err
}

t, err := libp2pquic.NewTransport(priv, nil, nil)
t, err := libp2pquic.NewTransport(priv, nil, nil, nil)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func run(port string) error {
return err
}

t, err := libp2pquic.NewTransport(priv, nil, nil)
t, err := libp2pquic.NewTransport(priv, nil, nil, nil)
if err != nil {
return err
}
Expand Down
22 changes: 17 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"

ic "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
tpt "github.com/libp2p/go-libp2p-core/transport"

Expand All @@ -15,7 +15,8 @@ import (
type conn struct {
sess quic.Session
pconn *reuseConn
transport tpt.Transport
transport *transport
scope network.ConnManagementScope

localPeer peer.ID
privKey ic.PrivKey
Expand All @@ -32,23 +33,30 @@ var _ tpt.CapableConn = &conn{}
// It must be called even if the peer closed the connection in order for
// garbage collection to properly work in this package.
func (c *conn) Close() error {
c.transport.removeConn(c.sess)
err := c.sess.CloseWithError(0, "")
c.pconn.DecreaseCount()
return c.sess.CloseWithError(0, "")
c.scope.Done()
return err
}

// IsClosed returns whether a connection is fully closed.
func (c *conn) IsClosed() bool {
return c.sess.Context().Err() != nil
}

func (c *conn) allowWindowIncrease(size uint64) bool {
return c.scope.ReserveMemory(int(size), network.ReservationPriorityMedium) == nil
}

// OpenStream creates a new stream.
func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) {
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
qstr, err := c.sess.OpenStreamSync(ctx)
return &stream{Stream: qstr}, err
}

// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (mux.MuxedStream, error) {
func (c *conn) AcceptStream() (network.MuxedStream, error) {
qstr, err := c.sess.AcceptStream(context.Background())
return &stream{Stream: qstr}, err
}
Expand Down Expand Up @@ -86,3 +94,7 @@ func (c *conn) RemoteMultiaddr() ma.Multiaddr {
func (c *conn) Transport() tpt.Transport {
return c.transport
}

func (c *conn) Scope() network.ConnScope {
return c.scope
}
159 changes: 135 additions & 24 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -14,9 +15,12 @@ import (
"time"

ic "github.com/libp2p/go-libp2p-core/crypto"
n "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
tpt "github.com/libp2p/go-libp2p-core/transport"

mocknetwork "github.com/libp2p/go-libp2p-testing/mocks/network"

quicproxy "github.com/lucas-clemente/quic-go/integrationtests/tools/proxy"
ma "github.com/multiformats/go-multiaddr"

Expand Down Expand Up @@ -46,24 +50,22 @@ func createPeer(t *testing.T) (peer.ID, ic.PrivKey) {
return id, priv
}

func runServer(t *testing.T, tr tpt.Transport, multiaddr string) tpt.Listener {
func runServer(t *testing.T, tr tpt.Transport, addr string) tpt.Listener {
t.Helper()
addr, err := ma.NewMultiaddr(multiaddr)
require.NoError(t, err)
ln, err := tr.Listen(addr)
ln, err := tr.Listen(ma.StringCast(addr))
require.NoError(t, err)
return ln
}

func TestHandshake(t *testing.T) {
serverID, serverKey := createPeer(t)
clientID, clientKey := createPeer(t)
serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()

handshake := func(t *testing.T, ln tpt.Listener) {
clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expand Down Expand Up @@ -97,17 +99,126 @@ func TestHandshake(t *testing.T) {
})
}

func TestResourceManagerSuccess(t *testing.T) {
serverID, serverKey := createPeer(t)
clientID, clientKey := createPeer(t)

ctrl := gomock.NewController(t)
defer ctrl.Finish()

serverRcmgr := mocknetwork.NewMockResourceManager(ctrl)
serverTransport, err := NewTransport(serverKey, nil, nil, serverRcmgr)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln, err := serverTransport.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/quic"))
require.NoError(t, err)
defer ln.Close()

clientRcmgr := mocknetwork.NewMockResourceManager(ctrl)
clientTransport, err := NewTransport(clientKey, nil, nil, clientRcmgr)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()

connChan := make(chan tpt.CapableConn)
serverConnScope := mocknetwork.NewMockConnManagementScope(ctrl)
go func() {
serverRcmgr.EXPECT().OpenConnection(network.DirInbound, false).Return(serverConnScope, nil)
serverConnScope.EXPECT().SetPeer(clientID)
serverConn, err := ln.Accept()
require.NoError(t, err)
connChan <- serverConn
}()

connScope := mocknetwork.NewMockConnManagementScope(ctrl)
clientRcmgr.EXPECT().OpenConnection(network.DirOutbound, false).Return(connScope, nil)
connScope.EXPECT().SetPeer(serverID)
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
require.NoError(t, err)
serverConn := <-connChan
t.Log("received conn")
connScope.EXPECT().Done().MinTimes(1) // for dialed connections, we might call Done multiple times
conn.Close()
serverConnScope.EXPECT().Done()
serverConn.Close()
}

func TestResourceManagerDialDenied(t *testing.T) {
_, clientKey := createPeer(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

rcmgr := mocknetwork.NewMockResourceManager(ctrl)
clientTransport, err := NewTransport(clientKey, nil, nil, rcmgr)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()

connScope := mocknetwork.NewMockConnManagementScope(ctrl)
rcmgr.EXPECT().OpenConnection(network.DirOutbound, false).Return(connScope, nil)
rerr := errors.New("nope")
p := peer.ID("server")
connScope.EXPECT().SetPeer(p).Return(rerr)
connScope.EXPECT().Done()

_, err = clientTransport.Dial(context.Background(), ma.StringCast("/ip4/127.0.0.1/udp/1234/quic"), p)
require.ErrorIs(t, err, rerr)
}

func TestResourceManagerAcceptDenied(t *testing.T) {
serverID, serverKey := createPeer(t)
clientID, clientKey := createPeer(t)
ctrl := gomock.NewController(t)
defer ctrl.Finish()

clientRcmgr := mocknetwork.NewMockResourceManager(ctrl)
clientTransport, err := NewTransport(clientKey, nil, nil, clientRcmgr)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()

serverRcmgr := mocknetwork.NewMockResourceManager(ctrl)
serverConnScope := mocknetwork.NewMockConnManagementScope(ctrl)
rerr := errors.New("denied")
gomock.InOrder(
serverRcmgr.EXPECT().OpenConnection(network.DirInbound, false).Return(serverConnScope, nil),
serverConnScope.EXPECT().SetPeer(clientID).Return(rerr),
serverConnScope.EXPECT().Done(),
)
serverTransport, err := NewTransport(serverKey, nil, nil, serverRcmgr)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln, err := serverTransport.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/quic"))
require.NoError(t, err)
defer ln.Close()
connChan := make(chan tpt.CapableConn)
go func() {
ln.Accept()
close(connChan)
}()

clientConnScope := mocknetwork.NewMockConnManagementScope(ctrl)
clientRcmgr.EXPECT().OpenConnection(network.DirOutbound, false).Return(clientConnScope, nil)
clientConnScope.EXPECT().SetPeer(serverID)
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
require.NoError(t, err)
_, err = conn.AcceptStream()
require.Error(t, err)
select {
case <-connChan:
t.Fatal("didn't expect to accept a connection")
default:
}
}

func TestStreams(t *testing.T) {
serverID, serverKey := createPeer(t)
_, clientKey := createPeer(t)

serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")
defer ln.Close()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()
conn, err := clientTransport.Dial(context.Background(), ln.Multiaddr(), serverID)
Expand All @@ -134,12 +245,12 @@ func TestHandshakeFailPeerIDMismatch(t *testing.T) {
_, clientKey := createPeer(t)
thirdPartyID, _ := createPeer(t)

serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
// dial, but expect the wrong peer ID
_, err = clientTransport.Dial(context.Background(), ln.Multiaddr(), thirdPartyID)
Expand Down Expand Up @@ -172,7 +283,7 @@ func TestConnectionGating(t *testing.T) {
cg := NewMockConnectionGater(mockCtrl)

t.Run("accepted connections", func(t *testing.T) {
serverTransport, err := NewTransport(serverKey, nil, cg)
serverTransport, err := NewTransport(serverKey, nil, cg, nil)
defer serverTransport.(io.Closer).Close()
require.NoError(t, err)
ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -187,7 +298,7 @@ func TestConnectionGating(t *testing.T) {
require.NoError(t, err)
}()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()
// make sure that connection attempts fails
Expand Down Expand Up @@ -215,7 +326,7 @@ func TestConnectionGating(t *testing.T) {
})

t.Run("secured connections", func(t *testing.T) {
serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -224,7 +335,7 @@ func TestConnectionGating(t *testing.T) {
cg := NewMockConnectionGater(mockCtrl)
cg.EXPECT().InterceptSecured(gomock.Any(), gomock.Any(), gomock.Any())

clientTransport, err := NewTransport(clientKey, nil, cg)
clientTransport, err := NewTransport(clientKey, nil, cg, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()

Expand All @@ -247,12 +358,12 @@ func TestDialTwo(t *testing.T) {
_, clientKey := createPeer(t)
serverID2, serverKey2 := createPeer(t)

serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln1 := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")
defer ln1.Close()
serverTransport2, err := NewTransport(serverKey2, nil, nil)
serverTransport2, err := NewTransport(serverKey2, nil, nil, nil)
require.NoError(t, err)
defer serverTransport2.(io.Closer).Close()
ln2 := runServer(t, serverTransport2, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -278,7 +389,7 @@ func TestDialTwo(t *testing.T) {
}
}()

clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()
c1, err := clientTransport.Dial(context.Background(), ln1.Multiaddr(), serverID)
Expand Down Expand Up @@ -329,7 +440,7 @@ func TestStatelessReset(t *testing.T) {
serverID, serverKey := createPeer(t)
_, clientKey := createPeer(t)

serverTransport, err := NewTransport(serverKey, nil, nil)
serverTransport, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer serverTransport.(io.Closer).Close()
ln := runServer(t, serverTransport, "/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -346,7 +457,7 @@ func TestStatelessReset(t *testing.T) {
defer proxy.Close()

// establish a connection
clientTransport, err := NewTransport(clientKey, nil, nil)
clientTransport, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer clientTransport.(io.Closer).Close()
proxyAddr, err := toQuicMultiaddr(proxy.LocalAddr())
Expand Down Expand Up @@ -395,7 +506,7 @@ func TestHolePunching(t *testing.T) {
serverID, serverKey := createPeer(t)
clientID, clientKey := createPeer(t)

t1, err := NewTransport(serverKey, nil, nil)
t1, err := NewTransport(serverKey, nil, nil, nil)
require.NoError(t, err)
defer t1.(io.Closer).Close()
laddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic")
Expand All @@ -409,7 +520,7 @@ func TestHolePunching(t *testing.T) {
require.Error(t, err, "didn't expect to accept any connections")
}()

t2, err := NewTransport(clientKey, nil, nil)
t2, err := NewTransport(clientKey, nil, nil, nil)
require.NoError(t, err)
defer t2.(io.Closer).Close()
ln2, err := t2.Listen(laddr)
Expand All @@ -423,15 +534,15 @@ func TestHolePunching(t *testing.T) {
connChan := make(chan tpt.CapableConn)
go func() {
conn, err := t2.Dial(
n.WithSimultaneousConnect(context.Background(), false, ""),
network.WithSimultaneousConnect(context.Background(), false, ""),
ln1.Multiaddr(),
serverID,
)
require.NoError(t, err)
connChan <- conn
}()
conn1, err := t1.Dial(
n.WithSimultaneousConnect(context.Background(), true, ""),
network.WithSimultaneousConnect(context.Background(), true, ""),
ln2.Multiaddr(),
clientID,
)
Expand Down
Loading