Skip to content

Commit

Permalink
use Fx to start and stop the host, swarm, autorelay and quicreuse (#2118
Browse files Browse the repository at this point in the history
)

* config: refactor AutoNAT construction into separate method

* config: use a lifecycle hook to start listening on swarm addresses

* use Fx to construct the host

* add a test for constructing a routed host

* use Fx hooks to start the host

* config: use Fx lifecycle hooks to start AutoRelay and for PeerRouting

* basichost: don't close the swarm

The swarm is not constructed by the basic host, thus is shouldn't be
closed by it.

* config: use Fx hook to close the quicreuse connection manager

* test for goroutine leaks when starting/stopping fx

To do this, I've had to move a few leaky tests into a separate package.
I've filed a bug for the AutoNAT issue (#2743) but the "error on
startup" issue is going to require some pretty invasive changes (we need
to construct _then_ start).

* go fmt

* Ignore one more top function

* Typo

* Ignore any not top

---------

Co-authored-by: Sukun <sukunrt@gmail.com>
Co-authored-by: Steven Allen <steven@stebalien.com>
Co-authored-by: Marco Munizaga <git@marcopolo.io>
  • Loading branch information
4 people authored Mar 21, 2024
1 parent 39242a4 commit 9d149fa
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 141 deletions.
232 changes: 138 additions & 94 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"context"
"crypto/rand"
"errors"
"fmt"
Expand Down Expand Up @@ -38,6 +39,7 @@ import (

ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/quic-go/quic-go"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
)
Expand Down Expand Up @@ -190,20 +192,11 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa
return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...)
}

func (cfg *Config) addTransports(h host.Host) error {
swrm, ok := h.Network().(transport.TransportNetwork)
if !ok {
// Should probably skip this if no transports.
return fmt.Errorf("swarm does not support transports")
}

func (cfg *Config) addTransports() ([]fx.Option, error) {
fxopts := []fx.Option{
fx.WithLogger(func() fxevent.Logger { return getFXLogger() }),
fx.Provide(fx.Annotate(tptu.New, fx.ParamTags(`name:"security"`))),
fx.Supply(cfg.Muxers),
fx.Supply(h.ID()),
fx.Provide(func() host.Host { return h }),
fx.Provide(func() crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
fx.Provide(func() connmgr.ConnectionGater { return cfg.ConnectionGater }),
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
Expand Down Expand Up @@ -265,56 +258,38 @@ func (cfg *Config) addTransports(h host.Host) error {
if cfg.QUICReuse != nil {
fxopts = append(fxopts, cfg.QUICReuse...)
} else {
fxopts = append(fxopts, fx.Provide(quicreuse.NewConnManager)) // TODO: close the ConnManager when shutting down the node
fxopts = append(fxopts,
fx.Provide(func(key quic.StatelessResetKey, tokenGenerator quic.TokenGeneratorKey, _ *swarm.Swarm, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) {
cm, err := quicreuse.NewConnManager(key, tokenGenerator)
if err != nil {
return nil, err
}
lifecycle.Append(fx.StopHook(cm.Close))
return cm, nil
}),
)
}

fxopts = append(fxopts, fx.Invoke(
fx.Annotate(
func(tpts []transport.Transport) error {
func(swrm *swarm.Swarm, tpts []transport.Transport) error {
for _, t := range tpts {
if err := swrm.AddTransport(t); err != nil {
return err
}
}
return nil
},
fx.ParamTags(`group:"transport"`),
fx.ParamTags("", `group:"transport"`),
)),
)
if cfg.Relay {
fxopts = append(fxopts, fx.Invoke(circuitv2.AddTransport))
}
app := fx.New(fxopts...)
if err := app.Err(); err != nil {
h.Close()
return err
}
return nil
return fxopts, nil
}

// NewNode constructs a new libp2p Host from the Config.
//
// This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode() (host.Host, error) {
// If possible check that the resource manager conn limit is higher than the
// limit set in the conn manager.
if l, ok := cfg.ResourceManager.(connmgr.GetConnLimiter); ok {
err := cfg.ConnManager.CheckLimit(l)
if err != nil {
log.Warn(fmt.Sprintf("rcmgr limit conflicts with connmgr limit: %v", err))
}
}

eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer))))
swrm, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
if err != nil {
return nil, err
}

if !cfg.DisableMetrics {
rcmgr.MustRegisterWith(cfg.PrometheusRegisterer)
}

func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.BasicHost, error) {
h, err := bhost.NewHost(swrm, &bhost.HostOpts{
EventBus: eventBus,
ConnManager: cfg.ConnManager,
Expand All @@ -331,10 +306,8 @@ func (cfg *Config) NewNode() (host.Host, error) {
PrometheusRegisterer: cfg.PrometheusRegisterer,
})
if err != nil {
swrm.Close()
return nil, err
}

if cfg.Relay {
// If we've enabled the relay, we should filter out relay
// addresses by default.
Expand All @@ -345,60 +318,137 @@ func (cfg *Config) NewNode() (host.Host, error) {
return oldFactory(autorelay.Filter(addrs))
}
}
return h, nil
}

if err := cfg.addTransports(h); err != nil {
h.Close()
return nil, err
// NewNode constructs a new libp2p Host from the Config.
//
// This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode() (host.Host, error) {
if cfg.EnableAutoRelay && !cfg.Relay {
return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled")
}
// If possible check that the resource manager conn limit is higher than the
// limit set in the conn manager.
if l, ok := cfg.ResourceManager.(connmgr.GetConnLimiter); ok {
err := cfg.ConnManager.CheckLimit(l)
if err != nil {
log.Warn(fmt.Sprintf("rcmgr limit conflicts with connmgr limit: %v", err))
}
}

// TODO: This method succeeds if listening on one address succeeds. We
// should probably fail if listening on *any* addr fails.
if err := h.Network().Listen(cfg.ListenAddrs...); err != nil {
h.Close()
if !cfg.DisableMetrics {
rcmgr.MustRegisterWith(cfg.PrometheusRegisterer)
}

fxopts := []fx.Option{
fx.Provide(func() event.Bus {
return eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer))))
}),
fx.Provide(func(eventBus event.Bus, lifecycle fx.Lifecycle) (*swarm.Swarm, error) {
sw, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
if err != nil {
return nil, err
}
lifecycle.Append(fx.StopHook(sw.Close))
return sw, nil
}),
// Make sure the swarm constructor depends on the quicreuse.ConnManager.
// That way, the ConnManager will be started before the swarm, and more importantly,
// the swarm will be stopped before the ConnManager.
fx.Decorate(func(sw *swarm.Swarm, _ *quicreuse.ConnManager, lifecycle fx.Lifecycle) *swarm.Swarm {
lifecycle.Append(fx.Hook{
OnStart: func(context.Context) error {
// TODO: This method succeeds if listening on one address succeeds. We
// should probably fail if listening on *any* addr fails.
return sw.Listen(cfg.ListenAddrs...)
},
OnStop: func(context.Context) error {
return sw.Close()
},
})
return sw
}),
fx.Provide(cfg.newBasicHost),
fx.Provide(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) host.Host {
lifecycle.Append(fx.StartHook(h.Start))
return h
}),
fx.Provide(func(h host.Host) peer.ID { return h.ID() }),
fx.Provide(func(h host.Host) crypto.PrivKey { return h.Peerstore().PrivKey(h.ID()) }),
}
transportOpts, err := cfg.addTransports()
if err != nil {
return nil, err
}
fxopts = append(fxopts, transportOpts...)

// Configure routing and autorelay
var router routing.PeerRouting
if cfg.Routing != nil {
router, err = cfg.Routing(h)
if err != nil {
h.Close()
return nil, err
}
fxopts = append(fxopts,
fx.Provide(cfg.Routing),
fx.Provide(func(h host.Host, router routing.PeerRouting) *routed.RoutedHost {
return routed.Wrap(h, router)
}),
)
}

// Note: h.AddrsFactory may be changed by relayFinder, but non-relay version is
// used by AutoNAT below.
var ar *autorelay.AutoRelay
addrF := h.AddrsFactory
if cfg.EnableAutoRelay {
if !cfg.Relay {
h.Close()
return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled")
}
if !cfg.DisableMetrics {
mt := autorelay.WithMetricsTracer(
autorelay.NewMetricsTracer(autorelay.WithRegisterer(cfg.PrometheusRegisterer)))
mtOpts := []autorelay.Option{mt}
cfg.AutoRelayOpts = append(mtOpts, cfg.AutoRelayOpts...)
}
fxopts = append(fxopts,
fx.Invoke(func(h *bhost.BasicHost, lifecycle fx.Lifecycle) (*autorelay.AutoRelay, error) {
ar, err := autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...)
if err != nil {
return nil, err
}
lifecycle.Append(fx.StartStopHook(ar.Start, ar.Close))
return ar, nil
}),
)
}

ar, err = autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...)
if err != nil {
return nil, err
}
var bh *bhost.BasicHost
fxopts = append(fxopts, fx.Invoke(func(bho *bhost.BasicHost) { bh = bho }))

var rh *routed.RoutedHost
if cfg.Routing != nil {
fxopts = append(fxopts, fx.Invoke(func(bho *routed.RoutedHost) { rh = bho }))
}

app := fx.New(fxopts...)
if err := app.Start(context.Background()); err != nil {
return nil, err
}

if err := cfg.addAutoNAT(bh); err != nil {
rh.Close()
return nil, err
}

if cfg.Routing != nil {
return &closableRoutedHost{App: app, RoutedHost: rh}, nil
}
return &closableBasicHost{App: app, BasicHost: bh}, nil
}

func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error {
addrF := h.AddrsFactory
autonatOpts := []autonat.Option{
autonat.UsingAddresses(func() []ma.Multiaddr {
return addrF(h.AllAddrs())
}),
}
if !cfg.DisableMetrics {
autonatOpts = append(autonatOpts,
autonat.WithMetricsTracer(
autonat.NewMetricsTracer(autonat.WithRegisterer(cfg.PrometheusRegisterer))))
autonatOpts = append(autonatOpts, autonat.WithMetricsTracer(
autonat.NewMetricsTracer(autonat.WithRegisterer(cfg.PrometheusRegisterer)),
))
}
if cfg.AutoNATConfig.ThrottleInterval != 0 {
autonatOpts = append(autonatOpts,
Expand All @@ -408,11 +458,11 @@ func (cfg *Config) NewNode() (host.Host, error) {
if cfg.AutoNATConfig.EnableService {
autonatPrivKey, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
return err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
return err
}

// Pull out the pieces of the config that we _actually_ care about.
Expand All @@ -438,14 +488,23 @@ func (cfg *Config) NewNode() (host.Host, error) {

dialer, err := autoNatCfg.makeSwarm(eventbus.NewBus(), false)
if err != nil {
h.Close()
return nil, err
return err
}
dialerHost := blankhost.NewBlankHost(dialer)
if err := autoNatCfg.addTransports(dialerHost); err != nil {
fxopts, err := autoNatCfg.addTransports()
if err != nil {
dialerHost.Close()
return err
}
fxopts = append(fxopts,
fx.Supply(dialerHost.ID()),
fx.Supply(dialer),
fx.Provide(func() crypto.PrivKey { return autonatPrivKey }),
)
app := fx.New(fxopts...)
if err := app.Err(); err != nil {
dialerHost.Close()
h.Close()
return nil, err
return err
}
// NOTE: We're dropping the blank host here but that's fine. It
// doesn't really _do_ anything and doesn't even need to be
Expand All @@ -458,25 +517,10 @@ func (cfg *Config) NewNode() (host.Host, error) {

autonat, err := autonat.New(h, autonatOpts...)
if err != nil {
h.Close()
return nil, fmt.Errorf("cannot enable autorelay; autonat failed to start: %v", err)
return fmt.Errorf("cannot enable autorelay; autonat failed to start: %v", err)
}
h.SetAutoNat(autonat)

// start the host background tasks
h.Start()

var ho host.Host
ho = h
if router != nil {
ho = routed.Wrap(h, router)
}
if ar != nil {
arh := autorelay.NewAutoRelayHost(ho, ar)
arh.Start()
ho = arh
}
return ho, nil
return nil
}

// Option is a libp2p config option that can be given to the libp2p constructor
Expand Down
30 changes: 30 additions & 0 deletions config/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package config

import (
"context"

basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
routed "github.com/libp2p/go-libp2p/p2p/host/routed"

"go.uber.org/fx"
)

type closableBasicHost struct {
*fx.App
*basichost.BasicHost
}

func (h *closableBasicHost) Close() error {
_ = h.App.Stop(context.Background())
return h.BasicHost.Close()
}

type closableRoutedHost struct {
*fx.App
*routed.RoutedHost
}

func (h *closableRoutedHost) Close() error {
_ = h.App.Stop(context.Background())
return h.RoutedHost.Close()
}
1 change: 1 addition & 0 deletions leaky_tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Tests that leak goroutines for various reasons. Mostly because libp2p node shutdown logic doesn't run if we fail to construct the node.
Loading

0 comments on commit 9d149fa

Please sign in to comment.