Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add an integration test for muxer selection #1887

Merged
merged 4 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (cfg *Config) addTransports(h host.Host) error {
fx.ParamTags(`group:"security"`),
)),
fx.Supply(cfg.Muxers),
fx.Supply(h.ID()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? Did this not work before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The insecure transport consume the peer.ID:

func NewWithIdentity(protocolID protocol.ID, id peer.ID, key ci.PrivKey) *Transport {

The other transports consume the private key.

fx.Provide(func() host.Host { return h }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
Expand Down
7 changes: 7 additions & 0 deletions p2p/net/upgrader/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/transport"
)

Expand All @@ -14,6 +15,8 @@ type transportConn struct {
transport transport.Transport
scope network.ConnManagementScope
stat network.ConnStats

muxer protocol.ID
}

var _ transport.CapableConn = &transportConn{}
Expand Down Expand Up @@ -49,3 +52,7 @@ func (t *transportConn) Close() error {
defer t.scope.Done()
return t.MuxedConn.Close()
}

func (t *transportConn) ConnState() network.ConnectionState {
return network.ConnectionState{NextProto: string(t.muxer)}
}
36 changes: 22 additions & 14 deletions p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
}
}

smconn, err := u.setupMuxer(ctx, sconn, server, connScope.PeerScope())
muxer, smconn, err := u.setupMuxer(ctx, sconn, server, connScope.PeerScope())
if err != nil {
sconn.Close()
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err)
Expand All @@ -184,6 +184,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
transport: t,
stat: stat,
scope: connScope,
muxer: muxer,
}
return tc, nil
}
Expand Down Expand Up @@ -234,40 +235,47 @@ func (u *upgrader) getMuxerByID(id string) *StreamMuxer {
return nil
}

func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server bool, scope network.PeerScope) (network.MuxedConn, error) {
func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server bool, scope network.PeerScope) (protocol.ID, network.MuxedConn, error) {
muxerSelected := conn.ConnState().NextProto
// Use muxer selected from security handshake if available. Otherwise fall back to multistream-selection.
if len(muxerSelected) > 0 {
m := u.getMuxerByID(muxerSelected)
if m == nil {
return nil, fmt.Errorf("selected a muxer we don't know: %s", muxerSelected)
return "", nil, fmt.Errorf("selected a muxer we don't know: %s", muxerSelected)
}
return m.Muxer.NewConn(conn, server, scope)
c, err := m.Muxer.NewConn(conn, server, scope)
if err != nil {
return "", nil, err
}
return protocol.ID(muxerSelected), c, nil
}

done := make(chan struct{})
type result struct {
smconn network.MuxedConn
muxerID protocol.ID
err error
}

var smconn network.MuxedConn
var err error
done := make(chan result, 1)
// TODO: The muxer should take a context.
go func() {
defer close(done)
var m *StreamMuxer
m, err = u.negotiateMuxer(conn, server)
m, err := u.negotiateMuxer(conn, server)
if err != nil {
done <- result{err: err}
return
}
smconn, err = m.Muxer.NewConn(conn, server, scope)
smconn, err := m.Muxer.NewConn(conn, server, scope)
done <- result{smconn: smconn, muxerID: m.ID, err: err}
}()

select {
case <-done:
return smconn, err
case r := <-done:
return r.muxerID, r.smconn, r.err
case <-ctx.Done():
// interrupt this process
conn.Close()
// wait to finish
<-done
return nil, ctx.Err()
return "", nil, ctx.Err()
}
}
126 changes: 126 additions & 0 deletions p2p/test/muxer-negotiation/muxer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package muxer_negotiation

import (
"context"
"crypto/rand"
"fmt"
"testing"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/sec/insecure"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"

"github.com/stretchr/testify/require"
)

var (
yamuxOpt = libp2p.Muxer("/yamux", yamux.DefaultTransport)
mplexOpt = libp2p.Muxer("/mplex", mplex.DefaultTransport)
)

type testcase struct {
Name string
ServerPreference []libp2p.Option
ClientPreference []libp2p.Option

Error string
Expected protocol.ID
}

type security struct {
Name string
Option libp2p.Option
}

func TestMuxerNegotiation(t *testing.T) {
testcases := []testcase{
{
Name: "server and client have the same preference",
ServerPreference: []libp2p.Option{yamuxOpt, mplexOpt},
ClientPreference: []libp2p.Option{yamuxOpt, mplexOpt},
Expected: "/yamux",
},
{
Name: "client only supports one muxer",
ServerPreference: []libp2p.Option{yamuxOpt, mplexOpt},
ClientPreference: []libp2p.Option{yamuxOpt},
Expected: "/yamux",
},
{
Name: "server only supports one muxer",
ServerPreference: []libp2p.Option{yamuxOpt},
ClientPreference: []libp2p.Option{mplexOpt, yamuxOpt},
Expected: "/yamux",
},
{
Name: "client preference preferred",
ServerPreference: []libp2p.Option{yamuxOpt, mplexOpt},
ClientPreference: []libp2p.Option{mplexOpt, yamuxOpt},
Expected: "/mplex",
},
{
Name: "no preference overlap",
ServerPreference: []libp2p.Option{yamuxOpt},
ClientPreference: []libp2p.Option{mplexOpt},
Error: "failed to negotiate stream multiplexer: protocol not supported",
},
}

clientID, _, err := crypto.GenerateEd25519Key(rand.Reader)
require.NoError(t, err)
serverID, _, err := crypto.GenerateEd25519Key(rand.Reader)
require.NoError(t, err)

securities := []security{
{Name: "noise", Option: libp2p.Security("/noise", noise.New)},
{Name: "tls", Option: libp2p.Security("/tls", tls.New)},
{Name: "insecure", Option: libp2p.Security("/insecure", insecure.NewWithIdentity)},
}

for _, tc := range testcases {
tc := tc

for _, sec := range securities {
sec := sec

t.Run(fmt.Sprintf("%s: %s", sec.Name, tc.Name), func(t *testing.T) {
server, err := libp2p.New(
libp2p.Identity(serverID),
sec.Option,
libp2p.ChainOptions(tc.ServerPreference...),
libp2p.Transport(tcp.NewTCPTransport),
libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"),
)
require.NoError(t, err)

client, err := libp2p.New(
libp2p.Identity(clientID),
sec.Option,
libp2p.ChainOptions(tc.ClientPreference...),
libp2p.Transport(tcp.NewTCPTransport),
libp2p.NoListenAddrs,
)
require.NoError(t, err)

err = client.Connect(context.Background(), peer.AddrInfo{ID: server.ID(), Addrs: server.Addrs()})
if tc.Error != "" {
require.Error(t, err)
require.ErrorContains(t, err, tc.Error)
return
}

require.NoError(t, err)
conns := client.Network().ConnsToPeer(server.ID())
require.Len(t, conns, 1, "expected exactly one connection")
require.Equal(t, tc.Expected, protocol.ID(conns[0].ConnState().NextProto))
})
}
}
}