Skip to content

Commit

Permalink
Merge pull request #1887 from libp2p/muxer-integration-test
Browse files Browse the repository at this point in the history
add an integration test for muxer selection
  • Loading branch information
marten-seemann authored Nov 17, 2022
2 parents 7357a00 + d813808 commit da9005d
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 14 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (cfg *Config) addTransports(h host.Host) error {
fx.ParamTags(`group:"security"`),
)),
fx.Supply(cfg.Muxers),
fx.Supply(h.ID()),
fx.Provide(func() host.Host { return h }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
Expand Down
7 changes: 7 additions & 0 deletions p2p/net/upgrader/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/transport"
)

Expand All @@ -14,6 +15,8 @@ type transportConn struct {
transport transport.Transport
scope network.ConnManagementScope
stat network.ConnStats

muxer protocol.ID
}

var _ transport.CapableConn = &transportConn{}
Expand Down Expand Up @@ -49,3 +52,7 @@ func (t *transportConn) Close() error {
defer t.scope.Done()
return t.MuxedConn.Close()
}

func (t *transportConn) ConnState() network.ConnectionState {
return network.ConnectionState{NextProto: string(t.muxer)}
}
36 changes: 22 additions & 14 deletions p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
}
}

smconn, err := u.setupMuxer(ctx, sconn, server, connScope.PeerScope())
muxer, smconn, err := u.setupMuxer(ctx, sconn, server, connScope.PeerScope())
if err != nil {
sconn.Close()
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err)
Expand All @@ -184,6 +184,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
transport: t,
stat: stat,
scope: connScope,
muxer: muxer,
}
return tc, nil
}
Expand Down Expand Up @@ -234,40 +235,47 @@ func (u *upgrader) getMuxerByID(id string) *StreamMuxer {
return nil
}

func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server bool, scope network.PeerScope) (network.MuxedConn, error) {
func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server bool, scope network.PeerScope) (protocol.ID, network.MuxedConn, error) {
muxerSelected := conn.ConnState().NextProto
// Use muxer selected from security handshake if available. Otherwise fall back to multistream-selection.
if len(muxerSelected) > 0 {
m := u.getMuxerByID(muxerSelected)
if m == nil {
return nil, fmt.Errorf("selected a muxer we don't know: %s", muxerSelected)
return "", nil, fmt.Errorf("selected a muxer we don't know: %s", muxerSelected)
}
return m.Muxer.NewConn(conn, server, scope)
c, err := m.Muxer.NewConn(conn, server, scope)
if err != nil {
return "", nil, err
}
return protocol.ID(muxerSelected), c, nil
}

done := make(chan struct{})
type result struct {
smconn network.MuxedConn
muxerID protocol.ID
err error
}

var smconn network.MuxedConn
var err error
done := make(chan result, 1)
// TODO: The muxer should take a context.
go func() {
defer close(done)
var m *StreamMuxer
m, err = u.negotiateMuxer(conn, server)
m, err := u.negotiateMuxer(conn, server)
if err != nil {
done <- result{err: err}
return
}
smconn, err = m.Muxer.NewConn(conn, server, scope)
smconn, err := m.Muxer.NewConn(conn, server, scope)
done <- result{smconn: smconn, muxerID: m.ID, err: err}
}()

select {
case <-done:
return smconn, err
case r := <-done:
return r.muxerID, r.smconn, r.err
case <-ctx.Done():
// interrupt this process
conn.Close()
// wait to finish
<-done
return nil, ctx.Err()
return "", nil, ctx.Err()
}
}
126 changes: 126 additions & 0 deletions p2p/test/muxer-negotiation/muxer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package muxer_negotiation

import (
"context"
"crypto/rand"
"fmt"
"testing"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/sec/insecure"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"

"github.com/stretchr/testify/require"
)

var (
yamuxOpt = libp2p.Muxer("/yamux", yamux.DefaultTransport)
mplexOpt = libp2p.Muxer("/mplex", mplex.DefaultTransport)
)

type testcase struct {
Name string
ServerPreference []libp2p.Option
ClientPreference []libp2p.Option

Error string
Expected protocol.ID
}

type security struct {
Name string
Option libp2p.Option
}

func TestMuxerNegotiation(t *testing.T) {
testcases := []testcase{
{
Name: "server and client have the same preference",
ServerPreference: []libp2p.Option{yamuxOpt, mplexOpt},
ClientPreference: []libp2p.Option{yamuxOpt, mplexOpt},
Expected: "/yamux",
},
{
Name: "client only supports one muxer",
ServerPreference: []libp2p.Option{yamuxOpt, mplexOpt},
ClientPreference: []libp2p.Option{yamuxOpt},
Expected: "/yamux",
},
{
Name: "server only supports one muxer",
ServerPreference: []libp2p.Option{yamuxOpt},
ClientPreference: []libp2p.Option{mplexOpt, yamuxOpt},
Expected: "/yamux",
},
{
Name: "client preference preferred",
ServerPreference: []libp2p.Option{yamuxOpt, mplexOpt},
ClientPreference: []libp2p.Option{mplexOpt, yamuxOpt},
Expected: "/mplex",
},
{
Name: "no preference overlap",
ServerPreference: []libp2p.Option{yamuxOpt},
ClientPreference: []libp2p.Option{mplexOpt},
Error: "failed to negotiate stream multiplexer: protocol not supported",
},
}

clientID, _, err := crypto.GenerateEd25519Key(rand.Reader)
require.NoError(t, err)
serverID, _, err := crypto.GenerateEd25519Key(rand.Reader)
require.NoError(t, err)

securities := []security{
{Name: "noise", Option: libp2p.Security("/noise", noise.New)},
{Name: "tls", Option: libp2p.Security("/tls", tls.New)},
{Name: "insecure", Option: libp2p.Security("/insecure", insecure.NewWithIdentity)},
}

for _, tc := range testcases {
tc := tc

for _, sec := range securities {
sec := sec

t.Run(fmt.Sprintf("%s: %s", sec.Name, tc.Name), func(t *testing.T) {
server, err := libp2p.New(
libp2p.Identity(serverID),
sec.Option,
libp2p.ChainOptions(tc.ServerPreference...),
libp2p.Transport(tcp.NewTCPTransport),
libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"),
)
require.NoError(t, err)

client, err := libp2p.New(
libp2p.Identity(clientID),
sec.Option,
libp2p.ChainOptions(tc.ClientPreference...),
libp2p.Transport(tcp.NewTCPTransport),
libp2p.NoListenAddrs,
)
require.NoError(t, err)

err = client.Connect(context.Background(), peer.AddrInfo{ID: server.ID(), Addrs: server.Addrs()})
if tc.Error != "" {
require.Error(t, err)
require.ErrorContains(t, err, tc.Error)
return
}

require.NoError(t, err)
conns := client.Network().ConnsToPeer(server.ID())
require.Len(t, conns, 1, "expected exactly one connection")
require.Equal(t, tc.Expected, protocol.ID(conns[0].ConnState().NextProto))
})
}
}
}

0 comments on commit da9005d

Please sign in to comment.