Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove muxer-multistream.Transport and refactor upgrader #1861

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (cfg *Config) addTransports(h host.Host) error {
return err
}
}
muxer, err := makeMuxer(h, cfg.Muxers)
muxer, err := makeMsMuxer(h, cfg.Muxers)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions config/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
msmux "github.com/libp2p/go-libp2p/p2p/muxer/muxer-multistream"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
)

// MuxC is a stream multiplex transport constructor.
Expand Down Expand Up @@ -43,8 +43,8 @@ func MuxerConstructor(m interface{}) (MuxC, error) {
}, nil
}

func makeMuxer(h host.Host, tpts []MsMuxC) (network.Multiplexer, error) {
muxMuxer := msmux.NewBlankTransport()
func makeMsMuxer(h host.Host, tpts []MsMuxC) (tptu.MsTransport, error) {
muxMuxer := tptu.NewMsTransport()
transportSet := make(map[string]struct{}, len(tpts))
for _, tptC := range tpts {
if _, ok := transportSet[tptC.ID]; ok {
Expand All @@ -57,7 +57,7 @@ func makeMuxer(h host.Host, tpts []MsMuxC) (network.Multiplexer, error) {
if err != nil {
return nil, err
}
muxMuxer.AddTransport(tptC.ID, tpt)
muxMuxer.AddMuxer(tptC.ID, tpt)
}
return muxMuxer, nil
}
2 changes: 1 addition & 1 deletion config/muxer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestCatchDuplicateTransportsMuxer(t *testing.T) {
}
for testName, test := range tests {
t.Run(testName, func(t *testing.T) {
_, err = makeMuxer(test.h, test.transports)
_, err = makeMsMuxer(test.h, test.transports)
if err != nil {
if err.Error() != test.expectedError {
t.Errorf(
Expand Down
5 changes: 2 additions & 3 deletions p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/libp2p/go-libp2p/core/sec/insecure"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
msmux "github.com/libp2p/go-libp2p/p2p/muxer/muxer-multistream"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
Expand Down Expand Up @@ -79,8 +78,8 @@ func makeUpgrader(t *testing.T, n *Swarm) transport.Upgrader {
secMuxer := new(csms.SSMuxer)
secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(id, pk))

stMuxer := msmux.NewBlankTransport()
stMuxer.AddTransport("/yamux/1.0.0", yamux.DefaultTransport)
stMuxer := tptu.NewMsTransport()
stMuxer.AddMuxer("/yamux/1.0.0", yamux.DefaultTransport)
u, err := tptu.New(secMuxer, stMuxer)
require.NoError(t, err)
return u
Expand Down
5 changes: 2 additions & 3 deletions p2p/net/swarm/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/libp2p/go-libp2p/core/sec/insecure"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
msmux "github.com/libp2p/go-libp2p/p2p/muxer/muxer-multistream"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
Expand Down Expand Up @@ -105,8 +104,8 @@ func GenUpgrader(t *testing.T, n *swarm.Swarm, opts ...tptu.Option) transport.Up
secMuxer := new(csms.SSMuxer)
secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(id, pk))

stMuxer := msmux.NewBlankTransport()
stMuxer.AddTransport("/yamux/1.0.0", yamux.DefaultTransport)
stMuxer := tptu.NewMsTransport()
stMuxer.AddMuxer("/yamux/1.0.0", yamux.DefaultTransport)
u, err := tptu.New(secMuxer, stMuxer, opts...)
require.NoError(t, err)
return u
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
// Package muxer_multistream implements a peerstream transport using
// go-multistream to select the underlying stream muxer
package muxer_multistream
package upgrader

import (
"fmt"
Expand All @@ -14,6 +12,17 @@ import (

var DefaultNegotiateTimeout = time.Second * 60

type Multiplexer struct {
ID string
StreamMuxer network.Multiplexer
}

type MsTransport interface {
julian88110 marked this conversation as resolved.
Show resolved Hide resolved
julian88110 marked this conversation as resolved.
Show resolved Hide resolved
AddMuxer(path string, tpt network.Multiplexer)
NegotiateMuxer(nc net.Conn, isServer bool) (*Multiplexer, error)
GetTransportByKey(key string) (network.Multiplexer, bool)
}

type Transport struct {
mux *mss.MultistreamMuxer

Expand All @@ -24,21 +33,21 @@ type Transport struct {
OrderPreference []string
}

func NewBlankTransport() *Transport {
func NewMsTransport() MsTransport {
return &Transport{
mux: mss.NewMultistreamMuxer(),
tpts: make(map[string]network.Multiplexer),
NegotiateTimeout: DefaultNegotiateTimeout,
}
}

func (t *Transport) AddTransport(path string, tpt network.Multiplexer) {
func (t *Transport) AddMuxer(path string, tpt network.Multiplexer) {
t.mux.AddHandler(path, nil)
t.tpts[path] = tpt
t.OrderPreference = append(t.OrderPreference, path)
}

func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) {
func (t *Transport) NegotiateMuxer(nc net.Conn, isServer bool) (*Multiplexer, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only function you need – and it's just a thin wrapper around go-multistream. I would say:

  1. we move this to the upgrader (No one else really cares about this package (I think)).
  2. Use a ctx to set the negotiate timeout
  3. Make it private
  4. Delete this whole package. less code!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Agreed, see https://github.com/libp2p/go-libp2p/pull/1861/files#r1017265425.
  2. I wish that was possible, but unfortunately the MultistreamMuxer doesn't take a context (because there's no io.Reader that takes a context), so we can't use a context here. Anyway, fixing this seems orthogonal to this PR.
  3. Yes please!
  4. I assume you mean struct, not package. This code is now living in the upgrader package :)

if t.NegotiateTimeout != 0 {
if err := nc.SetDeadline(time.Now().Add(t.NegotiateTimeout)); err != nil {
return nil, err
Expand Down Expand Up @@ -70,8 +79,10 @@ func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope)
if !ok {
return nil, fmt.Errorf("selected protocol we don't have a transport for")
}

return tpt.NewConn(nc, isServer, scope)
return &Multiplexer{
ID: proto,
StreamMuxer: tpt,
}, nil
}

func (t *Transport) GetTransportByKey(key string) (network.Multiplexer, bool) {
Expand Down
22 changes: 12 additions & 10 deletions p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
ipnet "github.com/libp2p/go-libp2p/core/pnet"
"github.com/libp2p/go-libp2p/core/sec"
"github.com/libp2p/go-libp2p/core/transport"
msmux "github.com/libp2p/go-libp2p/p2p/muxer/muxer-multistream"
"github.com/libp2p/go-libp2p/p2p/net/pnet"

manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -61,8 +60,8 @@ func WithResourceManager(m network.ResourceManager) Option {
// Upgrader is a multistream upgrader that can upgrade an underlying connection
// to a full transport connection (secure and multiplexed).
type upgrader struct {
secure sec.SecureMuxer
muxer network.Multiplexer
secure sec.SecureMuxer
mstream MsTransport

psk ipnet.PSK
connGater connmgr.ConnectionGater
Expand All @@ -78,10 +77,10 @@ type upgrader struct {

var _ transport.Upgrader = &upgrader{}

func New(secureMuxer sec.SecureMuxer, muxer network.Multiplexer, opts ...Option) (transport.Upgrader, error) {
func New(secureMuxer sec.SecureMuxer, mstream MsTransport, opts ...Option) (transport.Upgrader, error) {
u := &upgrader{
secure: secureMuxer,
muxer: muxer,
mstream: mstream,
acceptTimeout: defaultAcceptTimeout,
}
for _, opt := range opts {
Expand Down Expand Up @@ -196,11 +195,10 @@ func (u *upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID,
}

func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server bool, scope network.PeerScope) (network.MuxedConn, error) {
msmuxer, ok := u.muxer.(*msmux.Transport)
muxerSelected := conn.ConnState().NextProto
// Use muxer selected from security handshake if available. Otherwise fall back to multistream-selection.
if ok && len(muxerSelected) > 0 {
tpt, ok := msmuxer.GetTransportByKey(muxerSelected)
if len(muxerSelected) > 0 {
tpt, ok := u.mstream.GetTransportByKey(muxerSelected)
if !ok {
return nil, fmt.Errorf("selected a muxer we don't know: %s", muxerSelected)
}
Expand All @@ -212,10 +210,14 @@ func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server b

var smconn network.MuxedConn
var err error
// TODO: The muxer should take a context.
var streamMuxer *Multiplexer

go func() {
defer close(done)
smconn, err = u.muxer.NewConn(conn, server, scope)
streamMuxer, err = u.mstream.NegotiateMuxer(conn, server)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think NegotiateMuxer should take a ctx and then we don't have to do this select dance below and we don't have to spawn a go routine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not possible since the MultistreamMuxer doesn't take a context (see my comment in https://github.com/libp2p/go-libp2p/pull/1861/files#r1018835597). Let's not touch timeouts in this PR.

if err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Return early if err != nil.

smconn, err = streamMuxer.StreamMuxer.NewConn(conn, server, scope)
}
}()

select {
Expand Down
42 changes: 33 additions & 9 deletions p2p/net/upgrader/upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/test"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/net/upgrader"
upgrader "github.com/libp2p/go-libp2p/p2p/net/upgrader"

"github.com/golang/mock/gomock"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -26,7 +26,7 @@ func createUpgrader(t *testing.T, opts ...upgrader.Option) (peer.ID, transport.U
return createUpgraderWithMuxer(t, &negotiatingMuxer{}, opts...)
}

func createUpgraderWithMuxer(t *testing.T, muxer network.Multiplexer, opts ...upgrader.Option) (peer.ID, transport.Upgrader) {
func createUpgraderWithMuxer(t *testing.T, muxer upgrader.MsTransport, opts ...upgrader.Option) (peer.ID, transport.Upgrader) {
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
require.NoError(t, err)
id, err := peer.IDFromPrivateKey(priv)
Expand All @@ -40,7 +40,9 @@ func createUpgraderWithMuxer(t *testing.T, muxer network.Multiplexer, opts ...up
// It makes sure that this happens at the same time for client and server.
type negotiatingMuxer struct{}

func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) {
var _ upgrader.MsTransport = &negotiatingMuxer{}

func (m *negotiatingMuxer) NegotiateMuxer(c net.Conn, isServer bool) (*upgrader.Multiplexer, error) {
var err error
// run a fake muxer negotiation
if isServer {
Expand All @@ -51,23 +53,39 @@ func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool, scope network.Peer
if err != nil {
return nil, err
}
return yamux.DefaultTransport.NewConn(c, isServer, scope)

return &upgrader.Multiplexer{
ID: "/yamux/1.0.0",
StreamMuxer: yamux.DefaultTransport,
}, nil
}

func (m *negotiatingMuxer) AddMuxer(path string, tpt network.Multiplexer) {}

func (m *negotiatingMuxer) GetTransportByKey(key string) (network.Multiplexer, bool) {
return nil, false
}

// blockingMuxer blocks the muxer negotiation until the contain chan is closed
type blockingMuxer struct {
unblock chan struct{}
}

var _ network.Multiplexer = &blockingMuxer{}
var _ upgrader.MsTransport = &blockingMuxer{}

func newBlockingMuxer() *blockingMuxer {
return &blockingMuxer{unblock: make(chan struct{})}
}

func (m *blockingMuxer) NewConn(c net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) {
func (m *blockingMuxer) AddMuxer(path string, tpt network.Multiplexer) {}

func (m *blockingMuxer) GetTransportByKey(key string) (network.Multiplexer, bool) {
return nil, false
}

func (m *blockingMuxer) NegotiateMuxer(c net.Conn, isServer bool) (*upgrader.Multiplexer, error) {
<-m.unblock
return (&negotiatingMuxer{}).NewConn(c, isServer, scope)
return (&negotiatingMuxer{}).NegotiateMuxer(c, isServer)
}

func (m *blockingMuxer) Unblock() {
Expand All @@ -77,12 +95,18 @@ func (m *blockingMuxer) Unblock() {
// errorMuxer is a muxer that errors while setting up
type errorMuxer struct{}

var _ network.Multiplexer = &errorMuxer{}
var _ upgrader.MsTransport = &errorMuxer{}

func (m *errorMuxer) NewConn(c net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) {
func (m *errorMuxer) NegotiateMuxer(c net.Conn, isServer bool) (*upgrader.Multiplexer, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a different test now. The old test errored when creating a muxed connection. This now errors when negotiation. I don't think we should change this test, but it would maybe we should add another test if it makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the multistream transport itself is removed, this test is no longer relevant so I think it should be removed now.

return nil, errors.New("mux error")
}

func (m *errorMuxer) AddMuxer(path string, tpt network.Multiplexer) {}

func (m *errorMuxer) GetTransportByKey(string) (network.Multiplexer, bool) {
return nil, false
}

func testConn(t *testing.T, clientConn, serverConn transport.CapableConn) {
t.Helper()
require := require.New(t)
Expand Down
18 changes: 12 additions & 6 deletions p2p/transport/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@ import (
"github.com/stretchr/testify/require"
)

func makeMsTransport() tptu.MsTransport {
muxer := tptu.NewMsTransport()
muxer.AddMuxer("/yamux/1.0.0", yamux.DefaultTransport)
return muxer
}

func TestTcpTransport(t *testing.T) {
for i := 0; i < 2; i++ {
peerA, ia := makeInsecureMuxer(t)
_, ib := makeInsecureMuxer(t)

ua, err := tptu.New(ia, yamux.DefaultTransport)
ua, err := tptu.New(ia, makeMsTransport())
require.NoError(t, err)
ta, err := NewTCPTransport(ua, nil)
require.NoError(t, err)
ub, err := tptu.New(ib, yamux.DefaultTransport)
ub, err := tptu.New(ib, makeMsTransport())
require.NoError(t, err)
tb, err := NewTCPTransport(ub, nil)
require.NoError(t, err)
Expand All @@ -48,11 +54,11 @@ func TestTcpTransportWithMetrics(t *testing.T) {
peerA, ia := makeInsecureMuxer(t)
_, ib := makeInsecureMuxer(t)

ua, err := tptu.New(ia, yamux.DefaultTransport)
ua, err := tptu.New(ia, makeMsTransport())
require.NoError(t, err)
ta, err := NewTCPTransport(ua, nil, WithMetrics())
require.NoError(t, err)
ub, err := tptu.New(ib, yamux.DefaultTransport)
ub, err := tptu.New(ib, makeMsTransport())
require.NoError(t, err)
tb, err := NewTCPTransport(ub, nil, WithMetrics())
require.NoError(t, err)
Expand All @@ -68,15 +74,15 @@ func TestResourceManager(t *testing.T) {
peerA, ia := makeInsecureMuxer(t)
_, ib := makeInsecureMuxer(t)

ua, err := tptu.New(ia, yamux.DefaultTransport)
ua, err := tptu.New(ia, makeMsTransport())
require.NoError(t, err)
ta, err := NewTCPTransport(ua, nil)
require.NoError(t, err)
ln, err := ta.Listen(ma.StringCast("/ip4/127.0.0.1/tcp/0"))
require.NoError(t, err)
defer ln.Close()

ub, err := tptu.New(ib, yamux.DefaultTransport)
ub, err := tptu.New(ib, makeMsTransport())
require.NoError(t, err)
rcmgr := mocknetwork.NewMockResourceManager(ctrl)
tb, err := NewTCPTransport(ub, rcmgr)
Expand Down
10 changes: 8 additions & 2 deletions p2p/transport/websocket/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,16 @@ import (
"github.com/stretchr/testify/require"
)

func makeMsTransport() tptu.MsTransport {
muxer := tptu.NewMsTransport()
muxer.AddMuxer("/yamux/1.0.0", yamux.DefaultTransport)
return muxer
}

func newUpgrader(t *testing.T) (peer.ID, transport.Upgrader) {
t.Helper()
id, m := newInsecureMuxer(t)
u, err := tptu.New(m, yamux.DefaultTransport)
u, err := tptu.New(m, makeMsTransport())
julian88110 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatal(err)
}
Expand All @@ -49,7 +55,7 @@ func newUpgrader(t *testing.T) (peer.ID, transport.Upgrader) {
func newSecureUpgrader(t *testing.T) (peer.ID, transport.Upgrader) {
t.Helper()
id, m := newSecureMuxer(t)
u, err := tptu.New(m, yamux.DefaultTransport)
u, err := tptu.New(m, makeMsTransport())
if err != nil {
t.Fatal(err)
}
Expand Down