Skip to content

Commit

Permalink
Merge pull request #1858 from libp2p/fx
Browse files Browse the repository at this point in the history
config: use fx dependency injection to construct transports
  • Loading branch information
marten-seemann authored Nov 10, 2022
2 parents c334288 + b90b74f commit e538b40
Show file tree
Hide file tree
Showing 38 changed files with 434 additions and 932 deletions.
99 changes: 56 additions & 43 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/pnet"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/core/sec"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/host/autonat"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
Expand All @@ -28,13 +28,12 @@ import (
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"

logging "github.com/ipfs/go-log/v2"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
)

var log = logging.Logger("p2p-config")

// AddrsFactory is a function that takes a set of multiaddrs we're listening on and
// returns the set of multiaddrs we should advertise to the network.
type AddrsFactory = bhost.AddrsFactory
Expand Down Expand Up @@ -71,9 +70,9 @@ type Config struct {

PeerKey crypto.PrivKey

Transports []TptC
Muxers []MsMuxC
SecurityTransports []MsSecC
Transports []fx.Option
Muxers []Muxer
SecurityTransports []fx.Option
Insecure bool
PSK pnet.PSK

Expand Down Expand Up @@ -168,51 +167,65 @@ func (cfg *Config) addTransports(h host.Host) error {
// Should probably skip this if no transports.
return fmt.Errorf("swarm does not support transports")
}
var secure sec.SecureMuxer

muxers := make([]protocol.ID, 0, len(cfg.Muxers))
for _, m := range cfg.Muxers {
muxers = append(muxers, m.ID)
}

var security []fx.Option
if cfg.Insecure {
secure = makeInsecureTransport(h.ID(), cfg.PeerKey)
security = append(security, fx.Provide(makeInsecureTransport))
} else {
var err error
secure, err = makeSecurityMuxer(h, cfg.SecurityTransports, cfg.Muxers)
if err != nil {
return err
}
}
muxer, err := makeMuxer(h, cfg.Muxers)
if err != nil {
return err
}
var opts []tptu.Option
if len(cfg.PSK) > 0 {
opts = append(opts, tptu.WithPSK(cfg.PSK))
}
if cfg.ConnectionGater != nil {
opts = append(opts, tptu.WithConnectionGater(cfg.ConnectionGater))
}
if cfg.ResourceManager != nil {
opts = append(opts, tptu.WithResourceManager(cfg.ResourceManager))
}
upgrader, err := tptu.New(secure, muxer, opts...)
if err != nil {
return err
security = cfg.SecurityTransports
}
tpts, err := makeTransports(h, upgrader, cfg.ConnectionGater, cfg.PSK, cfg.ResourceManager, cfg.MultiaddrResolver, cfg.Transports)
muxer, err := makeMuxer(cfg.Muxers)
if err != nil {
return err
}
for _, t := range tpts {
if err := swrm.AddTransport(t); err != nil {
return err
}
}

fxopts := []fx.Option{
fx.WithLogger(func() fxevent.Logger { return getFXLogger() }),
fx.Provide(tptu.New),
fx.Provide(func() network.Multiplexer { return muxer }),
fx.Provide(fx.Annotate(
makeSecurityMuxer,
fx.ParamTags(`group:"security"`),
)),
fx.Supply(muxers),
fx.Provide(func() host.Host { return h }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func() *madns.Resolver { return cfg.MultiaddrResolver }),
}
fxopts = append(fxopts, cfg.Transports...)
if !cfg.Insecure {
fxopts = append(fxopts, security...)
}

fxopts = append(fxopts, fx.Invoke(
fx.Annotate(
func(tpts []transport.Transport) error {
for _, t := range tpts {
if err := swrm.AddTransport(t); err != nil {
return err
}
}
return nil
},
fx.ParamTags(`group:"transport"`),
)),
)
if cfg.Relay {
if err := circuitv2.AddTransport(h, upgrader); err != nil {
h.Close()
return err
}
fxopts = append(fxopts, fx.Invoke(circuitv2.AddTransport))
}
app := fx.New(fxopts...)
if err := app.Err(); err != nil {
h.Close()
return err
}

return nil
}

Expand Down
91 changes: 0 additions & 91 deletions config/constructor_types.go

This file was deleted.

28 changes: 28 additions & 0 deletions config/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package config

import (
"strings"
"sync"

logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx/fxevent"
)

var log = logging.Logger("p2p-config")

var (
fxLogger fxevent.Logger
logInitOnce sync.Once
)

type fxLogWriter struct{}

func (l *fxLogWriter) Write(b []byte) (int, error) {
log.Debug(strings.TrimSuffix(string(b), "\n"))
return len(b), nil
}

func getFXLogger() fxevent.Logger {
logInitOnce.Do(func() { fxLogger = &fxevent.ConsoleLogger{W: &fxLogWriter{}} })
return fxLogger
}
58 changes: 12 additions & 46 deletions config/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,27 @@ package config
import (
"fmt"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
msmux "github.com/libp2p/go-libp2p/p2p/muxer/muxer-multistream"
)

// MuxC is a stream multiplex transport constructor.
type MuxC func(h host.Host) (network.Multiplexer, error)

// MsMuxC is a tuple containing a multiplex transport constructor and a protocol
// ID.
type MsMuxC struct {
MuxC
ID string
type Muxer struct {
ID protocol.ID
Multiplexer network.Multiplexer
}

var muxArgTypes = newArgTypeSet(hostType, networkType, peerIDType, pstoreType)

// MuxerConstructor creates a multiplex constructor from the passed parameter
// using reflection.
func MuxerConstructor(m interface{}) (MuxC, error) {
// Already constructed?
if t, ok := m.(network.Multiplexer); ok {
return func(_ host.Host) (network.Multiplexer, error) {
return t, nil
}, nil
}

ctor, err := makeConstructor(m, muxType, muxArgTypes)
if err != nil {
return nil, err
}
return func(h host.Host) (network.Multiplexer, error) {
t, err := ctor(h, nil, nil, nil, nil, nil, nil)
if err != nil {
return nil, err
}
return t.(network.Multiplexer), nil
}, nil
}

func makeMuxer(h host.Host, tpts []MsMuxC) (network.Multiplexer, error) {
func makeMuxer(muxers []Muxer) (network.Multiplexer, error) {
muxMuxer := msmux.NewBlankTransport()
transportSet := make(map[string]struct{}, len(tpts))
for _, tptC := range tpts {
if _, ok := transportSet[tptC.ID]; ok {
return nil, fmt.Errorf("duplicate muxer transport: %s", tptC.ID)
transportSet := make(map[protocol.ID]struct{}, len(muxers))
for _, m := range muxers {
if _, ok := transportSet[m.ID]; ok {
return nil, fmt.Errorf("duplicate muxer transport: %s", m.ID)
}
transportSet[tptC.ID] = struct{}{}
transportSet[m.ID] = struct{}{}
}
for _, tptC := range tpts {
tpt, err := tptC.MuxC(h)
if err != nil {
return nil, err
}
muxMuxer.AddTransport(tptC.ID, tpt)
for _, m := range muxers {
muxMuxer.AddTransport(string(m.ID), m.Multiplexer)
}
return muxMuxer, nil
}
Loading

0 comments on commit e538b40

Please sign in to comment.