Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: use fx dependency injection to construct transports #1858

Merged
merged 8 commits into from
Nov 10, 2022
99 changes: 56 additions & 43 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/pnet"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/core/sec"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/autonat"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
Expand All @@ -28,13 +28,12 @@ import (
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"

logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
)

var log = logging.Logger("p2p-config")

// AddrsFactory is a function that takes a set of multiaddrs we're listening on and
// returns the set of multiaddrs we should advertise to the network.
type AddrsFactory = bhost.AddrsFactory
Expand Down Expand Up @@ -71,9 +70,9 @@ type Config struct {

PeerKey crypto.PrivKey

Transports []TptC
Muxers []MsMuxC
SecurityTransports []MsSecC
Transports []fx.Option
Muxers []Muxer
SecurityTransports []fx.Option
Insecure bool
PSK pnet.PSK

Expand Down Expand Up @@ -168,51 +167,65 @@ func (cfg *Config) addTransports(h host.Host) error {
// Should probably skip this if no transports.
return fmt.Errorf("swarm does not support transports")
}
var secure sec.SecureMuxer

muxers := make([]protocol.ID, 0, len(cfg.Muxers))
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
for _, m := range cfg.Muxers {
muxers = append(muxers, m.ID)
}

var security []fx.Option
if cfg.Insecure {
secure = makeInsecureTransport(h.ID(), cfg.PeerKey)
security = append(security, fx.Provide(makeInsecureTransport))
} else {
var err error
secure, err = makeSecurityMuxer(h, cfg.SecurityTransports, cfg.Muxers)
if err != nil {
return err
}
}
muxer, err := makeMuxer(h, cfg.Muxers)
if err != nil {
return err
}
var opts []tptu.Option
if len(cfg.PSK) > 0 {
opts = append(opts, tptu.WithPSK(cfg.PSK))
}
if cfg.ConnectionGater != nil {
opts = append(opts, tptu.WithConnectionGater(cfg.ConnectionGater))
}
if cfg.ResourceManager != nil {
opts = append(opts, tptu.WithResourceManager(cfg.ResourceManager))
}
upgrader, err := tptu.New(secure, muxer, opts...)
if err != nil {
return err
security = cfg.SecurityTransports
}
tpts, err := makeTransports(h, upgrader, cfg.ConnectionGater, cfg.PSK, cfg.ResourceManager, cfg.MultiaddrResolver, cfg.Transports)
muxer, err := makeMuxer(cfg.Muxers)
if err != nil {
return err
}
for _, t := range tpts {
if err := swrm.AddTransport(t); err != nil {
return err
}
}

fxopts := []fx.Option{
fx.WithLogger(func() fxevent.Logger { return getFXLogger() }),
fx.Provide(tptu.New),
fx.Provide(func() network.Multiplexer { return muxer }),
fx.Provide(fx.Annotate(
makeSecurityMuxer,
fx.ParamTags(`group:"security"`),
)),
fx.Supply(muxers),
fx.Provide(func() host.Host { return h }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func() *madns.Resolver { return cfg.MultiaddrResolver }),
}
fxopts = append(fxopts, cfg.Transports...)
if !cfg.Insecure {
fxopts = append(fxopts, security...)
}

fxopts = append(fxopts, fx.Invoke(
fx.Annotate(
func(tpts []transport.Transport) error {
for _, t := range tpts {
if err := swrm.AddTransport(t); err != nil {
return err
}
}
return nil
},
fx.ParamTags(`group:"transport"`),
)),
)
if cfg.Relay {
if err := circuitv2.AddTransport(h, upgrader); err != nil {
h.Close()
return err
}
fxopts = append(fxopts, fx.Invoke(circuitv2.AddTransport))
}
app := fx.New(fxopts...)
if err := app.Err(); err != nil {
h.Close()
return err
}

return nil
}

Expand Down
91 changes: 0 additions & 91 deletions config/constructor_types.go

This file was deleted.

28 changes: 28 additions & 0 deletions config/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package config

import (
"strings"
"sync"

logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx/fxevent"
)

var log = logging.Logger("p2p-config")

var (
fxLogger fxevent.Logger
logInitOnce sync.Once
)

type fxLogWriter struct{}

func (l *fxLogWriter) Write(b []byte) (int, error) {
log.Debug(strings.TrimSuffix(string(b), "\n"))
return len(b), nil
}

func getFXLogger() fxevent.Logger {
logInitOnce.Do(func() { fxLogger = &fxevent.ConsoleLogger{W: &fxLogWriter{}} })
return fxLogger
}
58 changes: 12 additions & 46 deletions config/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,27 @@ package config
import (
"fmt"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
msmux "github.com/libp2p/go-libp2p/p2p/muxer/muxer-multistream"
)

// MuxC is a stream multiplex transport constructor.
type MuxC func(h host.Host) (network.Multiplexer, error)

// MsMuxC is a tuple containing a multiplex transport constructor and a protocol
// ID.
type MsMuxC struct {
MuxC
ID string
type Muxer struct {
ID protocol.ID
Multiplexer network.Multiplexer
}

var muxArgTypes = newArgTypeSet(hostType, networkType, peerIDType, pstoreType)

// MuxerConstructor creates a multiplex constructor from the passed parameter
// using reflection.
func MuxerConstructor(m interface{}) (MuxC, error) {
// Already constructed?
if t, ok := m.(network.Multiplexer); ok {
return func(_ host.Host) (network.Multiplexer, error) {
return t, nil
}, nil
}

ctor, err := makeConstructor(m, muxType, muxArgTypes)
if err != nil {
return nil, err
}
return func(h host.Host) (network.Multiplexer, error) {
t, err := ctor(h, nil, nil, nil, nil, nil, nil)
if err != nil {
return nil, err
}
return t.(network.Multiplexer), nil
}, nil
}

func makeMuxer(h host.Host, tpts []MsMuxC) (network.Multiplexer, error) {
func makeMuxer(muxers []Muxer) (network.Multiplexer, error) {
muxMuxer := msmux.NewBlankTransport()
transportSet := make(map[string]struct{}, len(tpts))
for _, tptC := range tpts {
if _, ok := transportSet[tptC.ID]; ok {
return nil, fmt.Errorf("duplicate muxer transport: %s", tptC.ID)
transportSet := make(map[protocol.ID]struct{}, len(muxers))
for _, m := range muxers {
if _, ok := transportSet[m.ID]; ok {
return nil, fmt.Errorf("duplicate muxer transport: %s", m.ID)
}
transportSet[tptC.ID] = struct{}{}
transportSet[m.ID] = struct{}{}
}
for _, tptC := range tpts {
tpt, err := tptC.MuxC(h)
if err != nil {
return nil, err
}
muxMuxer.AddTransport(tptC.ID, tpt)
for _, m := range muxers {
muxMuxer.AddTransport(string(m.ID), m.Multiplexer)
}
return muxMuxer, nil
}
Loading