Skip to content

Commit

Permalink
upgrader: absorb the muxer_multistream.Transport into the upgrader
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Nov 16, 2022
1 parent d8d2efa commit 96ccea1
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 175 deletions.
15 changes: 2 additions & 13 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/pnet"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/autonat"
Expand Down Expand Up @@ -71,7 +70,7 @@ type Config struct {
PeerKey crypto.PrivKey

Transports []fx.Option
Muxers []Muxer
Muxers []tptu.StreamMuxer
SecurityTransports []fx.Option
Insecure bool
PSK pnet.PSK
Expand Down Expand Up @@ -168,31 +167,21 @@ func (cfg *Config) addTransports(h host.Host) error {
return fmt.Errorf("swarm does not support transports")
}

muxers := make([]protocol.ID, 0, len(cfg.Muxers))
for _, m := range cfg.Muxers {
muxers = append(muxers, m.ID)
}

var security []fx.Option
if cfg.Insecure {
security = append(security, fx.Provide(makeInsecureTransport))
} else {
security = cfg.SecurityTransports
}
muxer, err := makeMuxer(cfg.Muxers)
if err != nil {
return err
}

fxopts := []fx.Option{
fx.WithLogger(func() fxevent.Logger { return getFXLogger() }),
fx.Provide(tptu.New),
fx.Provide(func() network.Multiplexer { return muxer }),
fx.Provide(fx.Annotate(
makeSecurityMuxer,
fx.ParamTags(`group:"security"`),
)),
fx.Supply(muxers),
fx.Supply(cfg.Muxers),
fx.Provide(func() host.Host { return h }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
Expand Down
29 changes: 0 additions & 29 deletions config/muxer.go

This file was deleted.

3 changes: 2 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"

Expand Down Expand Up @@ -106,7 +107,7 @@ var NoSecurity Option = func(cfg *Config) error {
// name is the protocol name.
func Muxer(name string, muxer network.Multiplexer) Option {
return func(cfg *Config) error {
cfg.Muxers = append(cfg.Muxers, config.Muxer{Multiplexer: muxer, ID: protocol.ID(name)})
cfg.Muxers = append(cfg.Muxers, tptu.StreamMuxer{Muxer: muxer, ID: protocol.ID(name)})
return nil
}
}
Expand Down
80 changes: 0 additions & 80 deletions p2p/muxer/muxer-multistream/multistream.go

This file was deleted.

5 changes: 1 addition & 4 deletions p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/libp2p/go-libp2p/core/sec/insecure"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
msmux "github.com/libp2p/go-libp2p/p2p/muxer/muxer-multistream"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
Expand Down Expand Up @@ -79,9 +78,7 @@ func makeUpgrader(t *testing.T, n *Swarm) transport.Upgrader {
secMuxer := new(csms.SSMuxer)
secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, pk))

stMuxer := msmux.NewBlankTransport()
stMuxer.AddTransport("/yamux/1.0.0", yamux.DefaultTransport)
u, err := tptu.New(secMuxer, stMuxer, nil, nil, nil)
u, err := tptu.New(secMuxer, []tptu.StreamMuxer{{ID: "/yamux/1.0.0", Muxer: yamux.DefaultTransport}}, nil, nil, nil)
require.NoError(t, err)
return u
}
Expand Down
5 changes: 1 addition & 4 deletions p2p/net/swarm/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/libp2p/go-libp2p/core/sec/insecure"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
msmux "github.com/libp2p/go-libp2p/p2p/muxer/muxer-multistream"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
csms "github.com/libp2p/go-libp2p/p2p/net/conn-security-multistream"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
Expand Down Expand Up @@ -105,9 +104,7 @@ func GenUpgrader(t *testing.T, n *swarm.Swarm, connGater connmgr.ConnectionGater
secMuxer := new(csms.SSMuxer)
secMuxer.AddTransport(insecure.ID, insecure.NewWithIdentity(insecure.ID, id, pk))

stMuxer := msmux.NewBlankTransport()
stMuxer.AddTransport("/yamux/1.0.0", yamux.DefaultTransport)
u, err := tptu.New(secMuxer, stMuxer, nil, nil, connGater, opts...)
u, err := tptu.New(secMuxer, []tptu.StreamMuxer{{ID: "/yamux/1.0.0", Muxer: yamux.DefaultTransport}}, nil, nil, connGater, opts...)
require.NoError(t, err)
return u
}
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) {
func TestFailedUpgradeOnListen(t *testing.T) {
require := require.New(t)

id, u := createUpgraderWithMuxer(t, &errorMuxer{}, nil, nil)
id, u := createUpgraderWithMuxers(t, []upgrader.StreamMuxer{{ID: "errorMuxer", Muxer: &errorMuxer{}}}, nil, nil)
ln := createListener(t, u)

errCh := make(chan error)
Expand Down Expand Up @@ -225,7 +225,7 @@ func TestConcurrentAccept(t *testing.T) {
var num = 3 * upgrader.AcceptQueueLength

blockingMuxer := newBlockingMuxer()
id, u := createUpgraderWithMuxer(t, blockingMuxer, nil, nil)
id, u := createUpgraderWithMuxers(t, []upgrader.StreamMuxer{{ID: "blockingMuxer", Muxer: blockingMuxer}}, nil, nil)
ln := createListener(t, u)
defer ln.Close()

Expand Down
84 changes: 72 additions & 12 deletions p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ipnet "github.com/libp2p/go-libp2p/core/pnet"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/sec"
"github.com/libp2p/go-libp2p/core/transport"
msmux "github.com/libp2p/go-libp2p/p2p/muxer/muxer-multistream"
"github.com/libp2p/go-libp2p/p2p/net/pnet"

manet "github.com/multiformats/go-multiaddr/net"
mss "github.com/multiformats/go-multistream"
)

// ErrNilPeer is returned when attempting to upgrade an outbound connection
Expand All @@ -26,7 +27,10 @@ var ErrNilPeer = errors.New("nil peer")
// AcceptQueueLength is the number of connections to fully setup before not accepting any new connections
var AcceptQueueLength = 16

const defaultAcceptTimeout = 15 * time.Second
const (
defaultAcceptTimeout = 15 * time.Second
defaultNegotiateTimeout = 60 * time.Second
)

type Option func(*upgrader) error

Expand All @@ -37,16 +41,24 @@ func WithAcceptTimeout(t time.Duration) Option {
}
}

type StreamMuxer struct {
ID protocol.ID
Muxer network.Multiplexer
}

// Upgrader is a multistream upgrader that can upgrade an underlying connection
// to a full transport connection (secure and multiplexed).
type upgrader struct {
secure sec.SecureMuxer
muxer network.Multiplexer

psk ipnet.PSK
connGater connmgr.ConnectionGater
rcmgr network.ResourceManager

msmuxer *mss.MultistreamMuxer
muxers []StreamMuxer
muxerIDs []string

// AcceptTimeout is the maximum duration an Accept is allowed to take.
// This includes the time between accepting the raw network connection,
// protocol selection as well as the handshake, if applicable.
Expand All @@ -57,14 +69,15 @@ type upgrader struct {

var _ transport.Upgrader = &upgrader{}

func New(secureMuxer sec.SecureMuxer, muxer network.Multiplexer, psk ipnet.PSK, rcmgr network.ResourceManager, connGater connmgr.ConnectionGater, opts ...Option) (transport.Upgrader, error) {
func New(secureMuxer sec.SecureMuxer, muxers []StreamMuxer, psk ipnet.PSK, rcmgr network.ResourceManager, connGater connmgr.ConnectionGater, opts ...Option) (transport.Upgrader, error) {
u := &upgrader{
secure: secureMuxer,
muxer: muxer,
acceptTimeout: defaultAcceptTimeout,
rcmgr: rcmgr,
connGater: connGater,
psk: psk,
msmuxer: mss.NewMultistreamMuxer(),
muxers: muxers,
}
for _, opt := range opts {
if err := opt(u); err != nil {
Expand All @@ -74,6 +87,11 @@ func New(secureMuxer sec.SecureMuxer, muxer network.Multiplexer, psk ipnet.PSK,
if u.rcmgr == nil {
u.rcmgr = &network.NullResourceManager{}
}
u.muxerIDs = make([]string, 0, len(muxers))
for _, m := range muxers {
u.msmuxer.AddHandler(string(m.ID), nil)
u.muxerIDs = append(u.muxerIDs, string(m.ID))
}
return u, nil
}

Expand Down Expand Up @@ -177,17 +195,54 @@ func (u *upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID,
return u.secure.SecureOutbound(ctx, conn, p)
}

func (u *upgrader) negotiateMuxer(nc net.Conn, isServer bool) (*StreamMuxer, error) {
if err := nc.SetDeadline(time.Now().Add(defaultNegotiateTimeout)); err != nil {
return nil, err
}

var proto string
if isServer {
selected, _, err := u.msmuxer.Negotiate(nc)
if err != nil {
return nil, err
}
proto = selected
} else {
selected, err := mss.SelectOneOf(u.muxerIDs, nc)
if err != nil {
return nil, err
}
proto = selected
}

if err := nc.SetDeadline(time.Time{}); err != nil {
return nil, err
}

if m := u.getMuxerByID(proto); m != nil {
return m, nil
}
return nil, fmt.Errorf("selected protocol we don't have a transport for")
}

func (u *upgrader) getMuxerByID(id string) *StreamMuxer {
for _, m := range u.muxers {
if string(m.ID) == id {
return &m
}
}
return nil
}

func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server bool, scope network.PeerScope) (network.MuxedConn, error) {
msmuxer, ok := u.muxer.(*msmux.Transport)
muxerSelected := conn.ConnState().NextProto
// Use muxer selected from security handshake if available. Otherwise fall back to multistream-selection.
if ok && len(muxerSelected) > 0 {
tpt, ok := msmuxer.GetTransportByKey(muxerSelected)
if !ok {
if len(muxerSelected) > 0 {
m := u.getMuxerByID(muxerSelected)
if m == nil {
return nil, fmt.Errorf("selected a muxer we don't know: %s", muxerSelected)
}

return tpt.NewConn(conn, server, scope)
return m.Muxer.NewConn(conn, server, scope)
}

done := make(chan struct{})
Expand All @@ -197,7 +252,12 @@ func (u *upgrader) setupMuxer(ctx context.Context, conn sec.SecureConn, server b
// TODO: The muxer should take a context.
go func() {
defer close(done)
smconn, err = u.muxer.NewConn(conn, server, scope)
var m *StreamMuxer
m, err = u.negotiateMuxer(conn, server)
if err != nil {
return
}
smconn, err = m.Muxer.NewConn(conn, server, scope)
}()

select {
Expand Down
Loading

0 comments on commit 96ccea1

Please sign in to comment.