From 9bcf072ccb07ff2981487a5144dab2691079ec06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 26 Apr 2019 17:03:07 +0200 Subject: [PATCH 1/4] WIP cleanup config handling in core MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/node/builder.go | 11 ++++++++--- core/node/groups.go | 25 +++++++++++++++++++------ core/node/helpers.go | 7 ------- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/core/node/builder.go b/core/node/builder.go index 029ebd37ddf..5228310f2ed 100644 --- a/core/node/builder.go +++ b/core/node/builder.go @@ -84,10 +84,10 @@ func (cfg *BuildCfg) fillDefaults() error { } // options creates fx option group from this build config -func (cfg *BuildCfg) options(ctx context.Context) fx.Option { +func (cfg *BuildCfg) options(ctx context.Context) (fx.Option, *cfg.Config) { err := cfg.fillDefaults() if err != nil { - return fx.Error(err) + return fx.Error(err), nil } repoOption := fx.Provide(func(lc fx.Lifecycle) repo.Repo { @@ -112,12 +112,17 @@ func (cfg *BuildCfg) options(ctx context.Context) fx.Option { return cfg.Routing }) + conf, err := cfg.Repo.Config() + if err != nil { + return fx.Error(err), nil + } + return fx.Options( repoOption, hostOption, routingOption, metricsCtx, - ) + ), conf } func defaultRepo(dstore repo.Datastore) (repo.Repo, error) { diff --git a/core/node/groups.go b/core/node/groups.go index 1a69f03de40..56f2f353835 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -9,6 +9,7 @@ import ( offline "github.com/ipfs/go-ipfs-exchange-offline" offroute "github.com/ipfs/go-ipfs-routing/offline" + uio "github.com/ipfs/go-unixfs/io" "github.com/ipfs/go-path/resolver" "go.uber.org/fx" ) @@ -122,21 +123,33 @@ func Networked(cfg *BuildCfg) fx.Option { } // IPFS builds a group of fx Options based on the passed BuildCfg -func IPFS(ctx context.Context, cfg *BuildCfg) fx.Option { +func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option { + if bcfg == nil { + bcfg = new(BuildCfg) + } + + bcfgOpts, cfg := bcfg.options(ctx) if cfg == nil { - cfg = new(BuildCfg) + return bcfgOpts // error } + // TEMP: setting global sharding switch here + uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled + + + + + + return fx.Options( - cfg.options(ctx), + bcfgOpts, fx.Provide(baseProcess), - fx.Invoke(setupSharding), - Storage(cfg), + Storage(bcfg), Identity, IPNS, - Networked(cfg), + Networked(bcfg), Core, ) diff --git a/core/node/helpers.go b/core/node/helpers.go index 17954b63a89..2c91ff5f965 100644 --- a/core/node/helpers.go +++ b/core/node/helpers.go @@ -3,8 +3,6 @@ package node import ( "context" - config "github.com/ipfs/go-ipfs-config" - uio "github.com/ipfs/go-unixfs/io" "github.com/jbenet/goprocess" "github.com/pkg/errors" "go.uber.org/fx" @@ -45,11 +43,6 @@ func maybeProvide(opt interface{}, enable bool) fx.Option { return fx.Options() } -func setupSharding(cfg *config.Config) { - // TEMP: setting global sharding switch here - uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled -} - // baseProcess creates a goprocess which is closed when the lifecycle signals it to stop func baseProcess(lc fx.Lifecycle) goprocess.Process { p := goprocess.WithParent(goprocess.Background()) From ed514b91774de87fb1c96f878943e78db0f17fc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 29 Apr 2019 23:37:37 +0200 Subject: [PATCH 2/4] Invert constructor config handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/core.go | 4 +- core/node/groups.go | 268 ++++++++++++++++++++++++++++------ core/node/identity.go | 49 ++----- core/node/ipns.go | 51 ++----- core/node/libp2p/discovery.go | 29 ++-- core/node/libp2p/libp2p.go | 229 ++++++++++++----------------- core/node/provider.go | 32 +--- core/node/storage.go | 35 ++--- reprovide/providers.go | 36 ++--- 9 files changed, 399 insertions(+), 334 deletions(-) diff --git a/core/core.go b/core/core.go index 23656e72295..d7ededb8224 100644 --- a/core/core.go +++ b/core/core.go @@ -71,13 +71,13 @@ type IpfsNode struct { // Local node Pinning pin.Pinner // the pinning manager Mounts Mounts `optional:"true"` // current mount state, if any. - PrivateKey ic.PrivKey // the local node's private Key + PrivateKey ic.PrivKey `optional:"true"` // the local node's private Key PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network // Services Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances Blockstore bstore.GCBlockstore // the block store (lower level) - Filestore *filestore.Filestore // the filestore blockstore + Filestore *filestore.Filestore `optional:"true"` // the filestore blockstore BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc Blocks bserv.BlockService // the block service, get/add blocks. diff --git a/core/node/groups.go b/core/node/groups.go index 56f2f353835..549aef81d4d 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -2,72 +2,187 @@ package node import ( "context" + "errors" + "fmt" + "time" + + blockstore "github.com/ipfs/go-ipfs-blockstore" + "github.com/ipfs/go-ipfs-config" + util "github.com/ipfs/go-ipfs-util" + peer "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-peerstore/pstoremem" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/ipfs/go-ipfs/core/node/libp2p" "github.com/ipfs/go-ipfs/p2p" "github.com/ipfs/go-ipfs/provider" + "github.com/ipfs/go-ipfs/reprovide" offline "github.com/ipfs/go-ipfs-exchange-offline" offroute "github.com/ipfs/go-ipfs-routing/offline" - uio "github.com/ipfs/go-unixfs/io" "github.com/ipfs/go-path/resolver" + uio "github.com/ipfs/go-unixfs/io" "go.uber.org/fx" ) var BaseLibP2P = fx.Options( - fx.Provide(libp2p.AddrFilters), - fx.Provide(libp2p.BandwidthCounter), fx.Provide(libp2p.PNet), - fx.Provide(libp2p.AddrsFactory), fx.Provide(libp2p.ConnectionManager), - fx.Provide(libp2p.NatPortMap), - fx.Provide(libp2p.Relay), - fx.Provide(libp2p.AutoRealy), fx.Provide(libp2p.DefaultTransports), - fx.Provide(libp2p.QUIC), fx.Provide(libp2p.Host), fx.Provide(libp2p.DiscoveryHandler), - fx.Invoke(libp2p.AutoNATService), fx.Invoke(libp2p.PNetChecker), - fx.Invoke(libp2p.StartListening), - fx.Invoke(libp2p.SetupDiscovery), ) -func LibP2P(cfg *BuildCfg) fx.Option { +func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { + + // parse ConnMgr config + + grace := config.DefaultConnMgrGracePeriod + low := config.DefaultConnMgrHighWater + high := config.DefaultConnMgrHighWater + + connmgr := fx.Options() + + if cfg.Swarm.ConnMgr.Type != "none" { + switch cfg.Swarm.ConnMgr.Type { + case "": + // 'default' value is the basic connection manager + break + case "basic": + var err error + grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod) + if err != nil { + return fx.Error(fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err)) + } + + low = cfg.Swarm.ConnMgr.LowWater + high = cfg.Swarm.ConnMgr.HighWater + default: + return fx.Error(fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type)) + } + + connmgr = fx.Provide(libp2p.ConnectionManager(low, high, grace)) + } + + // parse PubSub config + + ps := fx.Options() + if bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps") { + var pubsubOptions []pubsub.Option + if cfg.Pubsub.DisableSigning { + pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false)) + } + + if cfg.Pubsub.StrictSignatureVerification { + pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true)) + } + + switch cfg.Pubsub.Router { + case "": + fallthrough + case "floodsub": + ps = fx.Provide(libp2p.FloodSub(pubsubOptions...)) + case "gossipsub": + ps = fx.Provide(libp2p.GossipSub(pubsubOptions...)) + default: + return fx.Error(fmt.Errorf("unknown pubsub router %s", cfg.Pubsub.Router)) + } + } + + // Gather all the options + opts := fx.Options( BaseLibP2P, - fx.Provide(libp2p.Security(!cfg.DisableEncryptedConnections)), - maybeProvide(libp2p.Pubsub, cfg.getOpt("pubsub") || cfg.getOpt("ipnsps")), + fx.Provide(libp2p.AddrFilters(cfg.Swarm.AddrFilters)), + fx.Invoke(libp2p.SetupDiscovery(cfg.Discovery.MDNS.Enabled, cfg.Discovery.MDNS.Interval)), + fx.Provide(libp2p.AddrsFactory(cfg.Addresses.Announce, cfg.Addresses.NoAnnounce)), + fx.Provide(libp2p.SmuxTransport(bcfg.getOpt("mplex"))), + fx.Provide(libp2p.Relay(cfg.Swarm.DisableRelay, cfg.Swarm.EnableRelayHop)), + fx.Invoke(libp2p.StartListening(cfg.Addresses.Swarm)), + + fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Experimental.PreferTLS)), - fx.Provide(libp2p.SmuxTransport(cfg.getOpt("mplex"))), fx.Provide(libp2p.Routing), fx.Provide(libp2p.BaseRouting), - maybeProvide(libp2p.PubsubRouter, cfg.getOpt("ipnsps")), + maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")), + + maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics), + maybeProvide(libp2p.NatPortMap, !cfg.Swarm.DisableNatPortMap), + maybeProvide(libp2p.AutoRealy, cfg.Swarm.EnableAutoRelay), + maybeProvide(libp2p.QUIC, cfg.Experimental.QUIC), + maybeProvide(libp2p.AutoNATService(cfg.Experimental.QUIC), cfg.Swarm.EnableAutoNATService), + connmgr, + ps, ) return opts } // Storage groups units which setup datastore based persistence and blockstore layers -func Storage(cfg *BuildCfg) fx.Option { +func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option { + cacheOpts := blockstore.DefaultCacheOpts() + cacheOpts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize + if !bcfg.Permanent { + cacheOpts.HasBloomFilterSize = 0 + } + + finalBstore := fx.Provide(GcBlockstoreCtor) + if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled { + finalBstore = fx.Provide(FilestoreBlockstoreCtor) + } + return fx.Options( fx.Provide(RepoConfig), fx.Provide(Datastore), - fx.Provide(BaseBlockstoreCtor(cfg.Permanent, cfg.NilRepo)), - fx.Provide(GcBlockstoreCtor), + fx.Provide(BaseBlockstoreCtor(cacheOpts, bcfg.NilRepo, cfg.Datastore.HashOnRead)), + finalBstore, ) } // Identity groups units providing cryptographic identity -var Identity = fx.Options( - fx.Provide(PeerID), - fx.Provide(PrivateKey), - fx.Provide(libp2p.Peerstore), -) +func Identity(cfg *config.Config) fx.Option { + // PeerID + + cid := cfg.Identity.PeerID + if cid == "" { + return fx.Error(errors.New("identity was not set in config (was 'ipfs init' run?)")) + } + if len(cid) == 0 { + return fx.Error(errors.New("no peer ID in config! (was 'ipfs init' run?)")) + } + + id, err := peer.IDB58Decode(cid) + if err != nil { + return fx.Error(fmt.Errorf("peer ID invalid: %s", err)) + } + + // Private Key + + if cfg.Identity.PrivKey == "" { + return fx.Options( // No PK (usually in tests) + fx.Provide(PeerID(id)), + fx.Provide(pstoremem.NewPeerstore), + ) + } + + sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!") + if err != nil { + return fx.Error(err) + } + + return fx.Options( // Full identity + fx.Provide(PeerID(id)), + fx.Provide(PrivateKey(sk)), + fx.Provide(pstoremem.NewPeerstore), + + fx.Invoke(libp2p.PstoreAddSelfKeys), + ) +} // IPNS groups namesys related units var IPNS = fx.Options( @@ -75,33 +190,97 @@ var IPNS = fx.Options( ) // Providers groups units managing provider routing records -var Providers = fx.Options( - fx.Provide(ProviderQueue), - fx.Provide(ProviderCtor), - fx.Provide(ReproviderCtor), +func Providers(cfg *config.Config) fx.Option { + reproviderInterval := kReprovideFrequency + if cfg.Reprovider.Interval != "" { + dur, err := time.ParseDuration(cfg.Reprovider.Interval) + if err != nil { + return fx.Error(err) + } + + reproviderInterval = dur + } - fx.Invoke(Reprovider), -) + var keyProvider fx.Option + switch cfg.Reprovider.Strategy { + case "all": + fallthrough + case "": + keyProvider = fx.Provide(reprovide.NewBlockstoreProvider) + case "roots": + keyProvider = fx.Provide(reprovide.NewPinnedProvider(true)) + case "pinned": + keyProvider = fx.Provide(reprovide.NewPinnedProvider(false)) + default: + return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy)) + } + + return fx.Options( + fx.Provide(ProviderQueue), + fx.Provide(ProviderCtor), + fx.Provide(ReproviderCtor(reproviderInterval)), + keyProvider, + + fx.Invoke(Reprovider), + ) +} // Online groups online-only units -func Online(cfg *BuildCfg) fx.Option { +func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option { + + // Namesys params + + ipnsCacheSize := cfg.Ipns.ResolveCacheSize + if ipnsCacheSize == 0 { + ipnsCacheSize = DefaultIpnsCacheSize + } + if ipnsCacheSize < 0 { + return fx.Error(fmt.Errorf("cannot specify negative resolve cache size")) + } + + // Republisher params + + var repubPeriod, recordLifetime time.Duration + + if cfg.Ipns.RepublishPeriod != "" { + d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod) + if err != nil { + return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err)) + } + + if !util.Debug && (d < time.Minute || d > (time.Hour*24)) { + return fx.Error(fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d)) + } + + repubPeriod = d + } + + if cfg.Ipns.RecordLifetime != "" { + d, err := time.ParseDuration(cfg.Ipns.RecordLifetime) + if err != nil { + return fx.Error(fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err)) + } + + recordLifetime = d + } + return fx.Options( fx.Provide(OnlineExchange), - fx.Provide(OnlineNamesys), + fx.Provide(Namesys(ipnsCacheSize)), - fx.Invoke(IpnsRepublisher), + fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)), fx.Provide(p2p.New), - LibP2P(cfg), - Providers, + LibP2P(bcfg, cfg), + Providers(cfg), ) } // Offline groups offline alternatives to Online units var Offline = fx.Options( fx.Provide(offline.Exchange), - fx.Provide(OfflineNamesys), + fx.Provide(Namesys(0)), fx.Provide(offroute.NewOfflineRouter), fx.Provide(provider.NewOfflineProvider), ) @@ -115,9 +294,9 @@ var Core = fx.Options( fx.Provide(Files), ) -func Networked(cfg *BuildCfg) fx.Option { - if cfg.Online { - return Online(cfg) +func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option { + if bcfg.Online { + return Online(bcfg, cfg) } return Offline } @@ -136,20 +315,15 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option { // TEMP: setting global sharding switch here uio.UseHAMTSharding = cfg.Experimental.ShardingEnabled - - - - - return fx.Options( bcfgOpts, fx.Provide(baseProcess), - Storage(bcfg), - Identity, + Storage(bcfg, cfg), + Identity(cfg), IPNS, - Networked(bcfg), + Networked(bcfg, cfg), Core, ) diff --git a/core/node/identity.go b/core/node/identity.go index 336750082c2..46baa84942c 100644 --- a/core/node/identity.go +++ b/core/node/identity.go @@ -1,50 +1,29 @@ package node import ( - "errors" "fmt" - "github.com/ipfs/go-ipfs-config" "github.com/libp2p/go-libp2p-crypto" "github.com/libp2p/go-libp2p-peer" ) -// PeerID loads peer identity form config -func PeerID(cfg *config.Config) (peer.ID, error) { - cid := cfg.Identity.PeerID - if cid == "" { - return "", errors.New("identity was not set in config (was 'ipfs init' run?)") +func PeerID(id peer.ID) func() peer.ID { + return func() peer.ID { + return id } - if len(cid) == 0 { - return "", errors.New("no peer ID in config! (was 'ipfs init' run?)") - } - - id, err := peer.IDB58Decode(cid) - if err != nil { - return "", fmt.Errorf("peer ID invalid: %s", err) - } - - return id, nil } // PrivateKey loads the private key from config -func PrivateKey(cfg *config.Config, id peer.ID) (crypto.PrivKey, error) { - if cfg.Identity.PrivKey == "" { - return nil, nil - } - - sk, err := cfg.Identity.DecodePrivateKey("passphrase todo!") - if err != nil { - return nil, err - } - - id2, err := peer.IDFromPrivateKey(sk) - if err != nil { - return nil, err - } - - if id2 != id { - return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2) +func PrivateKey(sk crypto.PrivKey) func(id peer.ID) (crypto.PrivKey, error) { + return func(id peer.ID) (crypto.PrivKey, error) { + id2, err := peer.IDFromPrivateKey(sk) + if err != nil { + return nil, err + } + + if id2 != id { + return nil, fmt.Errorf("private key in config does not match id: %s != %s", id, id2) + } + return sk, nil } - return sk, nil } diff --git a/core/node/ipns.go b/core/node/ipns.go index 58e9955f0f5..1f245760331 100644 --- a/core/node/ipns.go +++ b/core/node/ipns.go @@ -4,7 +4,6 @@ import ( "fmt" "time" - "github.com/ipfs/go-ipfs-config" "github.com/ipfs/go-ipfs-util" "github.com/ipfs/go-ipns" "github.com/libp2p/go-libp2p-crypto" @@ -27,49 +26,31 @@ func RecordValidator(ps peerstore.Peerstore) record.Validator { } } -// OfflineNamesys creates namesys setup for offline operation -func OfflineNamesys(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) { - return namesys.NewNameSystem(rt, repo.Datastore(), 0), nil -} - -// OnlineNamesys createn new namesys setup for online operation -func OnlineNamesys(rt routing.IpfsRouting, repo repo.Repo, cfg *config.Config) (namesys.NameSystem, error) { - cs := cfg.Ipns.ResolveCacheSize - if cs == 0 { - cs = DefaultIpnsCacheSize +// Namesys creates new name system +func Namesys(cacheSize int) func(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) { + return func(rt routing.IpfsRouting, repo repo.Repo) (namesys.NameSystem, error) { + return namesys.NewNameSystem(rt, repo.Datastore(), cacheSize), nil } - if cs < 0 { - return nil, fmt.Errorf("cannot specify negative resolve cache size") - } - return namesys.NewNameSystem(rt, repo.Datastore(), cs), nil } // IpnsRepublisher runs new IPNS republisher service -func IpnsRepublisher(lc lcProcess, cfg *config.Config, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error { - repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore()) +func IpnsRepublisher(repubPeriod time.Duration, recordLifetime time.Duration) func(lcProcess, namesys.NameSystem, repo.Repo, crypto.PrivKey) error { + return func(lc lcProcess, namesys namesys.NameSystem, repo repo.Repo, privKey crypto.PrivKey) error { + repub := republisher.NewRepublisher(namesys, repo.Datastore(), privKey, repo.Keystore()) - if cfg.Ipns.RepublishPeriod != "" { - d, err := time.ParseDuration(cfg.Ipns.RepublishPeriod) - if err != nil { - return fmt.Errorf("failure to parse config setting IPNS.RepublishPeriod: %s", err) - } + if repubPeriod != 0 { + if !util.Debug && (repubPeriod < time.Minute || repubPeriod > (time.Hour*24)) { + return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", repubPeriod) + } - if !util.Debug && (d < time.Minute || d > (time.Hour*24)) { - return fmt.Errorf("config setting IPNS.RepublishPeriod is not between 1min and 1day: %s", d) + repub.Interval = repubPeriod } - repub.Interval = d - } - - if cfg.Ipns.RecordLifetime != "" { - d, err := time.ParseDuration(cfg.Ipns.RecordLifetime) - if err != nil { - return fmt.Errorf("failure to parse config setting IPNS.RecordLifetime: %s", err) + if recordLifetime != 0 { + repub.RecordLifetime = recordLifetime } - repub.RecordLifetime = d + lc.Append(repub.Run) + return nil } - - lc.Append(repub.Run) - return nil } diff --git a/core/node/libp2p/discovery.go b/core/node/libp2p/discovery.go index 1dd8def2ed7..f1351d6468f 100644 --- a/core/node/libp2p/discovery.go +++ b/core/node/libp2p/discovery.go @@ -4,12 +4,12 @@ import ( "context" "time" - "github.com/ipfs/go-ipfs-config" - "github.com/ipfs/go-ipfs/core/node/helpers" "github.com/libp2p/go-libp2p-host" "github.com/libp2p/go-libp2p-peerstore" "github.com/libp2p/go-libp2p/p2p/discovery" "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/core/node/helpers" ) const discoveryConnTimeout = time.Second * 30 @@ -35,18 +35,19 @@ func DiscoveryHandler(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) } } -func SetupDiscovery(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host, handler *discoveryHandler) error { - if cfg.Discovery.MDNS.Enabled { - mdns := cfg.Discovery.MDNS - if mdns.Interval == 0 { - mdns.Interval = 5 - } - service, err := discovery.NewMdnsService(helpers.LifecycleCtx(mctx, lc), host, time.Duration(mdns.Interval)*time.Second, discovery.ServiceTag) - if err != nil { - log.Error("mdns error: ", err) - return nil +func SetupDiscovery(mdns bool, mdnsInterval int) func(helpers.MetricsCtx, fx.Lifecycle, host.Host, *discoveryHandler) error { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, handler *discoveryHandler) error { + if mdns { + if mdnsInterval == 0 { + mdnsInterval = 5 + } + service, err := discovery.NewMdnsService(helpers.LifecycleCtx(mctx, lc), host, time.Duration(mdnsInterval)*time.Second, discovery.ServiceTag) + if err != nil { + log.Error("mdns error: ", err) + return nil + } + service.RegisterNotifee(handler) } - service.RegisterNotifee(handler) + return nil } - return nil } diff --git a/core/node/libp2p/libp2p.go b/core/node/libp2p/libp2p.go index 95497386fa7..7faccb5971c 100644 --- a/core/node/libp2p/libp2p.go +++ b/core/node/libp2p/libp2p.go @@ -11,7 +11,6 @@ import ( "time" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-ipfs-config" nilrouting "github.com/ipfs/go-ipfs-routing/none" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" @@ -25,7 +24,6 @@ import ( "github.com/libp2p/go-libp2p-metrics" "github.com/libp2p/go-libp2p-peer" "github.com/libp2p/go-libp2p-peerstore" - "github.com/libp2p/go-libp2p-peerstore/pstoremem" "github.com/libp2p/go-libp2p-pnet" "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p-pubsub-router" @@ -87,38 +85,30 @@ var DHTOption RoutingOption = constructDHTRouting var DHTClientOption RoutingOption = constructClientDHTRouting var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting -func Peerstore(id peer.ID, sk crypto.PrivKey) (peerstore.Peerstore, error) { - ps := pstoremem.NewPeerstore() - - if sk != nil { - if err := ps.AddPubKey(id, sk.GetPublic()); err != nil { - return nil, err - } - if err := ps.AddPrivKey(id, sk); err != nil { - return nil, err - } +func PstoreAddSelfKeys(id peer.ID, sk crypto.PrivKey, ps peerstore.Peerstore) error { + if err := ps.AddPubKey(id, sk.GetPublic()); err != nil { + return err } - return ps, nil + return ps.AddPrivKey(id, sk) } -func AddrFilters(cfg *config.Config) (opts Libp2pOpts, err error) { - for _, s := range cfg.Swarm.AddrFilters { - f, err := mamask.NewMask(s) - if err != nil { - return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s) +func AddrFilters(filters []string) func() (opts Libp2pOpts, err error) { + return func() (opts Libp2pOpts, err error) { + for _, s := range filters { + f, err := mamask.NewMask(s) + if err != nil { + return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s) + } + opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f)) } - opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f)) + return opts, nil } - return opts, nil } -func BandwidthCounter(cfg *config.Config) (opts Libp2pOpts, reporter metrics.Reporter) { +func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) { reporter = metrics.NewBandwidthCounter() - - if !cfg.Swarm.DisableBandwidthMetrics { - opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter)) - } + opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter)) return opts, reporter } @@ -183,9 +173,9 @@ func PNetChecker(repo repo.Repo, ph host.Host, lc fx.Lifecycle) error { return nil } -func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) { +func makeAddrsFactory(announce []string, noAnnounce []string) (p2pbhost.AddrsFactory, error) { var annAddrs []ma.Multiaddr - for _, addr := range cfg.Announce { + for _, addr := range announce { maddr, err := ma.NewMultiaddr(addr) if err != nil { return nil, err @@ -195,7 +185,7 @@ func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) { filters := mafilter.NewFilters() noAnnAddrs := map[string]bool{} - for _, addr := range cfg.NoAnnounce { + for _, addr := range noAnnounce { f, err := mamask.NewMask(addr) if err == nil { filters.AddDialFilter(f) @@ -229,41 +219,23 @@ func makeAddrsFactory(cfg config.Addresses) (p2pbhost.AddrsFactory, error) { }, nil } -func AddrsFactory(cfg *config.Config) (opts Libp2pOpts, err error) { - addrsFactory, err := makeAddrsFactory(cfg.Addresses) - if err != nil { - return opts, err +func AddrsFactory(announce []string, noAnnounce []string) func() (opts Libp2pOpts, err error) { + return func() (opts Libp2pOpts, err error) { + addrsFactory, err := makeAddrsFactory(announce, noAnnounce) + if err != nil { + return opts, err + } + opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory)) + return } - opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory)) - return } -func ConnectionManager(cfg *config.Config) (opts Libp2pOpts, err error) { - grace := config.DefaultConnMgrGracePeriod - low := config.DefaultConnMgrHighWater - high := config.DefaultConnMgrHighWater - - switch cfg.Swarm.ConnMgr.Type { - case "": - // 'default' value is the basic connection manager +func ConnectionManager(low, high int, grace time.Duration) func() (opts Libp2pOpts, err error) { + return func() (opts Libp2pOpts, err error) { + cm := connmgr.NewConnManager(low, high, grace) + opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm)) return - case "none": - return opts, nil - case "basic": - grace, err = time.ParseDuration(cfg.Swarm.ConnMgr.GracePeriod) - if err != nil { - return opts, fmt.Errorf("parsing Swarm.ConnMgr.GracePeriod: %s", err) - } - - low = cfg.Swarm.ConnMgr.LowWater - high = cfg.Swarm.ConnMgr.HighWater - default: - return opts, fmt.Errorf("unrecognized ConnMgr.Type: %q", cfg.Swarm.ConnMgr.Type) } - - cm := connmgr.NewConnManager(low, high, grace) - opts.Opts = append(opts.Opts, libp2p.ConnectionManager(cm)) - return } func makeSmuxTransportOption(mplexExp bool) libp2p.Option { @@ -315,32 +287,29 @@ func SmuxTransport(mplex bool) func() (opts Libp2pOpts, err error) { } } -func NatPortMap(cfg *config.Config) (opts Libp2pOpts, err error) { - if !cfg.Swarm.DisableNatPortMap { - opts.Opts = append(opts.Opts, libp2p.NATPortMap()) - } +func NatPortMap() (opts Libp2pOpts, err error) { + opts.Opts = append(opts.Opts, libp2p.NATPortMap()) return } -func Relay(cfg *config.Config) (opts Libp2pOpts, err error) { - if cfg.Swarm.DisableRelay { - // Enabled by default. - opts.Opts = append(opts.Opts, libp2p.DisableRelay()) - } else { - relayOpts := []relay.RelayOpt{relay.OptDiscovery} - if cfg.Swarm.EnableRelayHop { - relayOpts = append(relayOpts, relay.OptHop) +func Relay(disable, enableHop bool) func() (opts Libp2pOpts, err error) { + return func() (opts Libp2pOpts, err error) { + if disable { + // Enabled by default. + opts.Opts = append(opts.Opts, libp2p.DisableRelay()) + } else { + relayOpts := []relay.RelayOpt{relay.OptDiscovery} + if enableHop { + relayOpts = append(relayOpts, relay.OptHop) + } + opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...)) } - opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...)) + return } - return } -func AutoRealy(cfg *config.Config) (opts Libp2pOpts, err error) { - // enable autorelay - if cfg.Swarm.EnableAutoRelay { - opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay()) - } +func AutoRealy() (opts Libp2pOpts, err error) { + opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay()) return } @@ -349,14 +318,12 @@ func DefaultTransports() (opts Libp2pOpts, err error) { return } -func QUIC(cfg *config.Config) (opts Libp2pOpts, err error) { - if cfg.Experimental.QUIC { - opts.Opts = append(opts.Opts, libp2p.Transport(libp2pquic.NewTransport)) - } +func QUIC() (opts Libp2pOpts, err error) { + opts.Opts = append(opts.Opts, libp2p.Transport(libp2pquic.NewTransport)) return } -func Security(enabled bool) interface{} { +func Security(enabled, preferTLS bool) interface{} { if !enabled { return func() (opts Libp2pOpts) { // TODO: shouldn't this be Errorf to guarantee visibility? @@ -366,8 +333,8 @@ func Security(enabled bool) interface{} { return opts } } - return func(cfg *config.Config) (opts Libp2pOpts) { - if cfg.Experimental.PreferTLS { + return func() (opts Libp2pOpts) { + if preferTLS { opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(tls.ID, tls.New), libp2p.Security(secio.ID, secio.New))) } else { opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(secio.ID, secio.New), libp2p.Security(tls.ID, tls.New))) @@ -524,58 +491,42 @@ func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) ( }, psRouter } -func AutoNATService(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, host host.Host) error { - if !cfg.Swarm.EnableAutoNATService { - return nil - } +func AutoNATService(quic bool) func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error { + return func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error { + // collect private net option in case swarm.key is presented + opts, _, err := PNet(repo) + if err != nil { + // swarm key exists but was failed to decode + return err + } - // collect private net option in case swarm.key is presented - opts, _, err := PNet(repo) - if err != nil { - // swarm key exists but was failed to decode - return err - } + if quic { + opts.Opts = append(opts.Opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport)) + } - if cfg.Experimental.QUIC { - opts.Opts = append(opts.Opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport)) + _, err = autonat.NewAutoNATService(helpers.LifecycleCtx(mctx, lc), host, opts.Opts...) + return err } - - _, err = autonat.NewAutoNATService(helpers.LifecycleCtx(mctx, lc), host, opts.Opts...) - return err } -func Pubsub(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cfg *config.Config) (service *pubsub.PubSub, err error) { - var pubsubOptions []pubsub.Option - if cfg.Pubsub.DisableSigning { - pubsubOptions = append(pubsubOptions, pubsub.WithMessageSigning(false)) +func FloodSub(pubsubOptions ...pubsub.Option) interface{} { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) { + return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...) } +} - if cfg.Pubsub.StrictSignatureVerification { - pubsubOptions = append(pubsubOptions, pubsub.WithStrictSignatureVerification(true)) - } - - switch cfg.Pubsub.Router { - case "": - fallthrough - case "floodsub": - service, err = pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...) - - case "gossipsub": - service, err = pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...) - - default: - err = fmt.Errorf("Unknown pubsub router %s", cfg.Pubsub.Router) +func GossipSub(pubsubOptions ...pubsub.Option) interface{} { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) { + return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...) } - - return service, err } -func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { +func listenAddresses(addresses []string) ([]ma.Multiaddr, error) { var listen []ma.Multiaddr - for _, addr := range cfg.Addresses.Swarm { + for _, addr := range addresses { maddr, err := ma.NewMultiaddr(addr) if err != nil { - return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", cfg.Addresses.Swarm) + return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", addresses) } listen = append(listen, maddr) } @@ -583,22 +534,24 @@ func listenAddresses(cfg *config.Config) ([]ma.Multiaddr, error) { return listen, nil } -func StartListening(host host.Host, cfg *config.Config) error { - listenAddrs, err := listenAddresses(cfg) - if err != nil { - return err - } +func StartListening(addresses []string) func(host host.Host) error { + return func(host host.Host) error { + listenAddrs, err := listenAddresses(addresses) + if err != nil { + return err + } - // Actually start listening: - if err := host.Network().Listen(listenAddrs...); err != nil { - return err - } + // Actually start listening: + if err := host.Network().Listen(listenAddrs...); err != nil { + return err + } - // list out our addresses - addrs, err := host.Network().InterfaceListenAddresses() - if err != nil { - return err + // list out our addresses + addrs, err := host.Network().InterfaceListenAddresses() + if err != nil { + return err + } + log.Infof("Swarm listening at: %s", addrs) + return nil } - log.Infof("Swarm listening at: %s", addrs) - return nil } diff --git a/core/node/provider.go b/core/node/provider.go index 37b9637a7dd..e85a909141d 100644 --- a/core/node/provider.go +++ b/core/node/provider.go @@ -2,16 +2,12 @@ package node import ( "context" - "fmt" "time" - "github.com/ipfs/go-ipfs-config" - "github.com/ipfs/go-ipld-format" "github.com/libp2p/go-libp2p-routing" "go.uber.org/fx" "github.com/ipfs/go-ipfs/core/node/helpers" - "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/provider" "github.com/ipfs/go-ipfs/repo" "github.com/ipfs/go-ipfs/reprovide" @@ -42,32 +38,10 @@ func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queu } // ReproviderCtor creates new reprovider -func ReproviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, cfg *config.Config, bs BaseBlocks, ds format.DAGService, pinning pin.Pinner, rt routing.IpfsRouting) (*reprovide.Reprovider, error) { - var keyProvider reprovide.KeyChanFunc - - reproviderInterval := kReprovideFrequency - if cfg.Reprovider.Interval != "" { - dur, err := time.ParseDuration(cfg.Reprovider.Interval) - if err != nil { - return nil, err - } - - reproviderInterval = dur - } - - switch cfg.Reprovider.Strategy { - case "all": - fallthrough - case "": - keyProvider = reprovide.NewBlockstoreProvider(bs) - case "roots": - keyProvider = reprovide.NewPinnedProvider(pinning, ds, true) - case "pinned": - keyProvider = reprovide.NewPinnedProvider(pinning, ds, false) - default: - return nil, fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy) +func ReproviderCtor(reproviderInterval time.Duration) func(helpers.MetricsCtx, fx.Lifecycle, routing.IpfsRouting, reprovide.KeyChanFunc) (*reprovide.Reprovider, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.IpfsRouting, keyProvider reprovide.KeyChanFunc) (*reprovide.Reprovider, error) { + return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil } - return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil } // Reprovider runs the reprovider service diff --git a/core/node/storage.go b/core/node/storage.go index 6b58be60b16..5d705359b87 100644 --- a/core/node/storage.go +++ b/core/node/storage.go @@ -42,8 +42,8 @@ func Datastore(repo repo.Repo) datastore.Datastore { type BaseBlocks blockstore.Blockstore // BaseBlockstoreCtor creates cached blockstore backed by the provided datastore -func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) { - return func(mctx helpers.MetricsCtx, repo repo.Repo, cfg *config.Config, lc fx.Lifecycle) (bs BaseBlocks, err error) { +func BaseBlockstoreCtor(cacheOpts blockstore.CacheOpts, nilRepo bool, hashOnRead bool) func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) { + return func(mctx helpers.MetricsCtx, repo repo.Repo, lc fx.Lifecycle) (bs BaseBlocks, err error) { rds := &retrystore.Datastore{ Batching: repo.Datastore(), Delay: time.Millisecond * 200, @@ -54,12 +54,6 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC bs = blockstore.NewBlockstore(rds) bs = &verifbs.VerifBS{Blockstore: bs} - opts := blockstore.DefaultCacheOpts() - opts.HasBloomFilterSize = cfg.Datastore.BloomFilterSize - if !permanent { - opts.HasBloomFilterSize = 0 - } - if !nilRepo { ctx, cancel := context.WithCancel(mctx) @@ -69,7 +63,7 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC return nil }, }) - bs, err = blockstore.CachedBlockstore(ctx, bs, opts) + bs, err = blockstore.CachedBlockstore(ctx, bs, cacheOpts) if err != nil { return nil, err } @@ -78,7 +72,7 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC bs = blockstore.NewIdStore(bs) bs = cidv0v1.NewBlockstore(bs) - if cfg.Datastore.HashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly? + if hashOnRead { // TODO: review: this is how it was done originally, is there a reason we can't just pass this directly? bs.HashOnRead(true) } @@ -87,16 +81,23 @@ func BaseBlockstoreCtor(permanent bool, nilRepo bool) func(mctx helpers.MetricsC } // GcBlockstoreCtor wraps the base blockstore with GC and Filestore layers -func GcBlockstoreCtor(repo repo.Repo, bb BaseBlocks, cfg *config.Config) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) { +func GcBlockstoreCtor(bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore) { gclocker = blockstore.NewGCLocker() gcbs = blockstore.NewGCBlockstore(bb, gclocker) - if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled { - // hash security - fstore = filestore.NewFilestore(bb, repo.FileManager()) // TODO: mark optional - gcbs = blockstore.NewGCBlockstore(fstore, gclocker) - gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs} - } + bs = gcbs + return +} + +// GcBlockstoreCtor wraps GcBlockstore and adds Filestore support +func FilestoreBlockstoreCtor(repo repo.Repo, bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) { + gclocker, gcbs, bs = GcBlockstoreCtor(bb) + + // hash security + fstore = filestore.NewFilestore(bb, repo.FileManager()) + gcbs = blockstore.NewGCBlockstore(fstore, gclocker) + gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs} + bs = gcbs return } diff --git a/reprovide/providers.go b/reprovide/providers.go index 77b19e2f826..bef56a0b720 100644 --- a/reprovide/providers.go +++ b/reprovide/providers.go @@ -20,27 +20,29 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { } // NewPinnedProvider returns provider supplying pinned keys -func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - set, err := pinSet(ctx, pinning, dag, onlyRoots) - if err != nil { - return nil, err - } +func NewPinnedProvider(onlyRoots bool) func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc { + return func(pinning pin.Pinner, dag ipld.DAGService) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + set, err := pinSet(ctx, pinning, dag, onlyRoots) + if err != nil { + return nil, err + } - outCh := make(chan cid.Cid) - go func() { - defer close(outCh) - for c := range set.New { - select { - case <-ctx.Done(): - return - case outCh <- c: + outCh := make(chan cid.Cid) + go func() { + defer close(outCh) + for c := range set.New { + select { + case <-ctx.Done(): + return + case outCh <- c: + } } - } - }() + }() - return outCh, nil + return outCh, nil + } } } From 3a12454c952ea191a803eca305213ed4ac20acce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 29 Apr 2019 23:45:00 +0200 Subject: [PATCH 3/4] constructor: libp2p simpleOpts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/node/libp2p/libp2p.go | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/core/node/libp2p/libp2p.go b/core/node/libp2p/libp2p.go index 7faccb5971c..300cb0948e9 100644 --- a/core/node/libp2p/libp2p.go +++ b/core/node/libp2p/libp2p.go @@ -280,6 +280,11 @@ func makeSmuxTransportOption(mplexExp bool) libp2p.Option { return libp2p.ChainOptions(opts...) } +var NatPortMap = simpleOpt(libp2p.NATPortMap()) +var AutoRealy = simpleOpt(libp2p.EnableAutoRelay()) +var DefaultTransports = simpleOpt(libp2p.DefaultTransports) +var QUIC = simpleOpt(libp2p.Transport(libp2pquic.NewTransport)) + func SmuxTransport(mplex bool) func() (opts Libp2pOpts, err error) { return func() (opts Libp2pOpts, err error) { opts.Opts = append(opts.Opts, makeSmuxTransportOption(mplex)) @@ -287,11 +292,6 @@ func SmuxTransport(mplex bool) func() (opts Libp2pOpts, err error) { } } -func NatPortMap() (opts Libp2pOpts, err error) { - opts.Opts = append(opts.Opts, libp2p.NATPortMap()) - return -} - func Relay(disable, enableHop bool) func() (opts Libp2pOpts, err error) { return func() (opts Libp2pOpts, err error) { if disable { @@ -308,21 +308,6 @@ func Relay(disable, enableHop bool) func() (opts Libp2pOpts, err error) { } } -func AutoRealy() (opts Libp2pOpts, err error) { - opts.Opts = append(opts.Opts, libp2p.EnableAutoRelay()) - return -} - -func DefaultTransports() (opts Libp2pOpts, err error) { - opts.Opts = append(opts.Opts, libp2p.DefaultTransports) - return -} - -func QUIC() (opts Libp2pOpts, err error) { - opts.Opts = append(opts.Opts, libp2p.Transport(libp2pquic.NewTransport)) - return -} - func Security(enabled, preferTLS bool) interface{} { if !enabled { return func() (opts Libp2pOpts) { @@ -555,3 +540,10 @@ func StartListening(addresses []string) func(host host.Host) error { return nil } } + +func simpleOpt(opt libp2p.Option) func() (opts Libp2pOpts, err error) { + return func() (opts Libp2pOpts, err error) { + opts.Opts = append(opts.Opts, opt) + return + } +} From e133058487c61939f928c33e4f0f28a1e19867ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 30 Apr 2019 00:03:02 +0200 Subject: [PATCH 4/4] constructor: break down libp2p logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Łukasz Magiera --- core/node/groups.go | 1 - core/node/libp2p/addrs.go | 117 ++++++++ core/node/libp2p/host.go | 76 +++++ core/node/libp2p/hostopt.go | 25 ++ core/node/libp2p/libp2p.go | 511 +-------------------------------- core/node/libp2p/nat.go | 32 +++ core/node/libp2p/pnet.go | 70 +++++ core/node/libp2p/pubsub.go | 21 ++ core/node/libp2p/relay.go | 24 ++ core/node/libp2p/routing.go | 108 +++++++ core/node/libp2p/routingopt.go | 36 +++ core/node/libp2p/smux.go | 62 ++++ core/node/libp2p/transport.go | 38 +++ 13 files changed, 613 insertions(+), 508 deletions(-) create mode 100644 core/node/libp2p/addrs.go create mode 100644 core/node/libp2p/host.go create mode 100644 core/node/libp2p/hostopt.go create mode 100644 core/node/libp2p/nat.go create mode 100644 core/node/libp2p/pnet.go create mode 100644 core/node/libp2p/pubsub.go create mode 100644 core/node/libp2p/relay.go create mode 100644 core/node/libp2p/routing.go create mode 100644 core/node/libp2p/routingopt.go create mode 100644 core/node/libp2p/smux.go create mode 100644 core/node/libp2p/transport.go diff --git a/core/node/groups.go b/core/node/groups.go index 549aef81d4d..b0c7fbbbd8f 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -38,7 +38,6 @@ var BaseLibP2P = fx.Options( ) func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { - // parse ConnMgr config grace := config.DefaultConnMgrGracePeriod diff --git a/core/node/libp2p/addrs.go b/core/node/libp2p/addrs.go new file mode 100644 index 00000000000..7bdd0f7a5fc --- /dev/null +++ b/core/node/libp2p/addrs.go @@ -0,0 +1,117 @@ +package libp2p + +import ( + "fmt" + + "github.com/libp2p/go-libp2p" + host "github.com/libp2p/go-libp2p-host" + p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic" + mafilter "github.com/libp2p/go-maddr-filter" + ma "github.com/multiformats/go-multiaddr" + mamask "github.com/whyrusleeping/multiaddr-filter" +) + +func AddrFilters(filters []string) func() (opts Libp2pOpts, err error) { + return func() (opts Libp2pOpts, err error) { + for _, s := range filters { + f, err := mamask.NewMask(s) + if err != nil { + return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s) + } + opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f)) + } + return opts, nil + } +} + +func makeAddrsFactory(announce []string, noAnnounce []string) (p2pbhost.AddrsFactory, error) { + var annAddrs []ma.Multiaddr + for _, addr := range announce { + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, err + } + annAddrs = append(annAddrs, maddr) + } + + filters := mafilter.NewFilters() + noAnnAddrs := map[string]bool{} + for _, addr := range noAnnounce { + f, err := mamask.NewMask(addr) + if err == nil { + filters.AddDialFilter(f) + continue + } + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, err + } + noAnnAddrs[string(maddr.Bytes())] = true + } + + return func(allAddrs []ma.Multiaddr) []ma.Multiaddr { + var addrs []ma.Multiaddr + if len(annAddrs) > 0 { + addrs = annAddrs + } else { + addrs = allAddrs + } + + var out []ma.Multiaddr + for _, maddr := range addrs { + // check for exact matches + ok := noAnnAddrs[string(maddr.Bytes())] + // check for /ipcidr matches + if !ok && !filters.AddrBlocked(maddr) { + out = append(out, maddr) + } + } + return out + }, nil +} + +func AddrsFactory(announce []string, noAnnounce []string) func() (opts Libp2pOpts, err error) { + return func() (opts Libp2pOpts, err error) { + addrsFactory, err := makeAddrsFactory(announce, noAnnounce) + if err != nil { + return opts, err + } + opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory)) + return + } +} + +func listenAddresses(addresses []string) ([]ma.Multiaddr, error) { + var listen []ma.Multiaddr + for _, addr := range addresses { + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", addresses) + } + listen = append(listen, maddr) + } + + return listen, nil +} + +func StartListening(addresses []string) func(host host.Host) error { + return func(host host.Host) error { + listenAddrs, err := listenAddresses(addresses) + if err != nil { + return err + } + + // Actually start listening: + if err := host.Network().Listen(listenAddrs...); err != nil { + return err + } + + // list out our addresses + addrs, err := host.Network().InterfaceListenAddresses() + if err != nil { + return err + } + log.Infof("Swarm listening at: %s", addrs) + return nil + } +} diff --git a/core/node/libp2p/host.go b/core/node/libp2p/host.go new file mode 100644 index 00000000000..f2a9069ce24 --- /dev/null +++ b/core/node/libp2p/host.go @@ -0,0 +1,76 @@ +package libp2p + +import ( + "context" + + "github.com/libp2p/go-libp2p" + host "github.com/libp2p/go-libp2p-host" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + record "github.com/libp2p/go-libp2p-record" + routing "github.com/libp2p/go-libp2p-routing" + routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/core/node/helpers" + "github.com/ipfs/go-ipfs/repo" +) + +type P2PHostIn struct { + fx.In + + Repo repo.Repo + Validator record.Validator + HostOption HostOption + RoutingOption RoutingOption + ID peer.ID + Peerstore peerstore.Peerstore + + Opts [][]libp2p.Option `group:"libp2p"` +} + +type P2PHostOut struct { + fx.Out + + Host host.Host + Routing BaseIpfsRouting +} + +func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) { + opts := []libp2p.Option{libp2p.NoListenAddrs} + for _, o := range params.Opts { + opts = append(opts, o...) + } + + ctx := helpers.LifecycleCtx(mctx, lc) + + opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { + r, err := params.RoutingOption(ctx, h, params.Repo.Datastore(), params.Validator) + out.Routing = r + return r, err + })) + + out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...) + if err != nil { + return P2PHostOut{}, err + } + + // this code is necessary just for tests: mock network constructions + // ignore the libp2p constructor options that actually construct the routing! + if out.Routing == nil { + r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator) + if err != nil { + return P2PHostOut{}, err + } + out.Routing = r + out.Host = routedhost.Wrap(out.Host, out.Routing) + } + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return out.Host.Close() + }, + }) + + return out, err +} diff --git a/core/node/libp2p/hostopt.go b/core/node/libp2p/hostopt.go new file mode 100644 index 00000000000..984c038bd51 --- /dev/null +++ b/core/node/libp2p/hostopt.go @@ -0,0 +1,25 @@ +package libp2p + +import ( + "context" + "fmt" + + "github.com/libp2p/go-libp2p" + host "github.com/libp2p/go-libp2p-host" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" +) + +type HostOption func(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) + +var DefaultHostOption HostOption = constructPeerHost + +// isolates the complex initialization steps +func constructPeerHost(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) { + pkey := ps.PrivKey(id) + if pkey == nil { + return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty()) + } + options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...) + return libp2p.New(ctx, options...) +} diff --git a/core/node/libp2p/libp2p.go b/core/node/libp2p/libp2p.go index 300cb0948e9..758994b5622 100644 --- a/core/node/libp2p/libp2p.go +++ b/core/node/libp2p/libp2p.go @@ -1,234 +1,26 @@ package libp2p import ( - "bytes" - "context" - "fmt" - "io/ioutil" - "os" - "sort" - "strings" "time" - "github.com/ipfs/go-datastore" - nilrouting "github.com/ipfs/go-ipfs-routing/none" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-autonat-svc" - "github.com/libp2p/go-libp2p-circuit" "github.com/libp2p/go-libp2p-connmgr" "github.com/libp2p/go-libp2p-crypto" - "github.com/libp2p/go-libp2p-host" - "github.com/libp2p/go-libp2p-kad-dht" - dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts" - "github.com/libp2p/go-libp2p-metrics" "github.com/libp2p/go-libp2p-peer" "github.com/libp2p/go-libp2p-peerstore" - "github.com/libp2p/go-libp2p-pnet" - "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p-pubsub-router" - "github.com/libp2p/go-libp2p-quic-transport" - "github.com/libp2p/go-libp2p-record" - "github.com/libp2p/go-libp2p-routing" - "github.com/libp2p/go-libp2p-routing-helpers" - secio "github.com/libp2p/go-libp2p-secio" - tls "github.com/libp2p/go-libp2p-tls" - p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic" - "github.com/libp2p/go-libp2p/p2p/host/routed" - mafilter "github.com/libp2p/go-maddr-filter" - smux "github.com/libp2p/go-stream-muxer" - ma "github.com/multiformats/go-multiaddr" - mplex "github.com/whyrusleeping/go-smux-multiplex" - yamux "github.com/whyrusleeping/go-smux-yamux" - mamask "github.com/whyrusleeping/multiaddr-filter" "go.uber.org/fx" - - "github.com/ipfs/go-ipfs/core/node/helpers" - "github.com/ipfs/go-ipfs/repo" ) var log = logging.Logger("p2pnode") -type HostOption func(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) -type RoutingOption func(context.Context, host.Host, datastore.Batching, record.Validator) (routing.IpfsRouting, error) - -var DefaultHostOption HostOption = constructPeerHost - -// isolates the complex initialization steps -func constructPeerHost(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) { - pkey := ps.PrivKey(id) - if pkey == nil { - return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty()) - } - options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...) - return libp2p.New(ctx, options...) -} - -func constructDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) { - return dht.New( - ctx, host, - dhtopts.Datastore(dstore), - dhtopts.Validator(validator), - ) -} - -func constructClientDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) { - return dht.New( - ctx, host, - dhtopts.Client(true), - dhtopts.Datastore(dstore), - dhtopts.Validator(validator), - ) -} - -var DHTOption RoutingOption = constructDHTRouting -var DHTClientOption RoutingOption = constructClientDHTRouting -var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting - -func PstoreAddSelfKeys(id peer.ID, sk crypto.PrivKey, ps peerstore.Peerstore) error { - if err := ps.AddPubKey(id, sk.GetPublic()); err != nil { - return err - } - - return ps.AddPrivKey(id, sk) -} - -func AddrFilters(filters []string) func() (opts Libp2pOpts, err error) { - return func() (opts Libp2pOpts, err error) { - for _, s := range filters { - f, err := mamask.NewMask(s) - if err != nil { - return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s) - } - opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f)) - } - return opts, nil - } -} - -func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) { - reporter = metrics.NewBandwidthCounter() - opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter)) - return opts, reporter -} - type Libp2pOpts struct { fx.Out Opts []libp2p.Option `group:"libp2p"` } -type PNetFingerprint []byte - -func PNet(repo repo.Repo) (opts Libp2pOpts, fp PNetFingerprint, err error) { - swarmkey, err := repo.SwarmKey() - if err != nil || swarmkey == nil { - return opts, nil, err - } - - protec, err := pnet.NewProtector(bytes.NewReader(swarmkey)) - if err != nil { - return opts, nil, fmt.Errorf("failed to configure private network: %s", err) - } - fp = protec.Fingerprint() - - opts.Opts = append(opts.Opts, libp2p.PrivateNetwork(protec)) - return opts, fp, nil -} - -func PNetChecker(repo repo.Repo, ph host.Host, lc fx.Lifecycle) error { - // TODO: better check? - swarmkey, err := repo.SwarmKey() - if err != nil || swarmkey == nil { - return err - } - - done := make(chan struct{}) - lc.Append(fx.Hook{ - OnStart: func(_ context.Context) error { - go func() { - t := time.NewTicker(30 * time.Second) - defer t.Stop() - - <-t.C // swallow one tick - for { - select { - case <-t.C: - if len(ph.Network().Peers()) == 0 { - log.Warning("We are in private network and have no peers.") - log.Warning("This might be configuration mistake.") - } - case <-done: - return - } - } - }() - return nil - }, - OnStop: func(_ context.Context) error { - close(done) - return nil - }, - }) - return nil -} - -func makeAddrsFactory(announce []string, noAnnounce []string) (p2pbhost.AddrsFactory, error) { - var annAddrs []ma.Multiaddr - for _, addr := range announce { - maddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - annAddrs = append(annAddrs, maddr) - } - - filters := mafilter.NewFilters() - noAnnAddrs := map[string]bool{} - for _, addr := range noAnnounce { - f, err := mamask.NewMask(addr) - if err == nil { - filters.AddDialFilter(f) - continue - } - maddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - noAnnAddrs[string(maddr.Bytes())] = true - } - - return func(allAddrs []ma.Multiaddr) []ma.Multiaddr { - var addrs []ma.Multiaddr - if len(annAddrs) > 0 { - addrs = annAddrs - } else { - addrs = allAddrs - } - - var out []ma.Multiaddr - for _, maddr := range addrs { - // check for exact matches - ok := noAnnAddrs[string(maddr.Bytes())] - // check for /ipcidr matches - if !ok && !filters.AddrBlocked(maddr) { - out = append(out, maddr) - } - } - return out - }, nil -} - -func AddrsFactory(announce []string, noAnnounce []string) func() (opts Libp2pOpts, err error) { - return func() (opts Libp2pOpts, err error) { - addrsFactory, err := makeAddrsFactory(announce, noAnnounce) - if err != nil { - return opts, err - } - opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory)) - return - } -} +// Misc options func ConnectionManager(low, high int, grace time.Duration) func() (opts Libp2pOpts, err error) { return func() (opts Libp2pOpts, err error) { @@ -238,307 +30,12 @@ func ConnectionManager(low, high int, grace time.Duration) func() (opts Libp2pOp } } -func makeSmuxTransportOption(mplexExp bool) libp2p.Option { - const yamuxID = "/yamux/1.0.0" - const mplexID = "/mplex/6.7.0" - - ymxtpt := &yamux.Transport{ - AcceptBacklog: 512, - ConnectionWriteTimeout: time.Second * 10, - KeepAliveInterval: time.Second * 30, - EnableKeepAlive: true, - MaxStreamWindowSize: uint32(16 * 1024 * 1024), // 16MiB - LogOutput: ioutil.Discard, - } - - if os.Getenv("YAMUX_DEBUG") != "" { - ymxtpt.LogOutput = os.Stderr - } - - muxers := map[string]smux.Transport{yamuxID: ymxtpt} - if mplexExp { - muxers[mplexID] = mplex.DefaultTransport - } - - // Allow muxer preference order overriding - order := []string{yamuxID, mplexID} - if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" { - order = strings.Fields(prefs) - } - - opts := make([]libp2p.Option, 0, len(order)) - for _, id := range order { - tpt, ok := muxers[id] - if !ok { - log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id) - continue - } - delete(muxers, id) - opts = append(opts, libp2p.Muxer(id, tpt)) - } - - return libp2p.ChainOptions(opts...) -} - -var NatPortMap = simpleOpt(libp2p.NATPortMap()) -var AutoRealy = simpleOpt(libp2p.EnableAutoRelay()) -var DefaultTransports = simpleOpt(libp2p.DefaultTransports) -var QUIC = simpleOpt(libp2p.Transport(libp2pquic.NewTransport)) - -func SmuxTransport(mplex bool) func() (opts Libp2pOpts, err error) { - return func() (opts Libp2pOpts, err error) { - opts.Opts = append(opts.Opts, makeSmuxTransportOption(mplex)) - return - } -} - -func Relay(disable, enableHop bool) func() (opts Libp2pOpts, err error) { - return func() (opts Libp2pOpts, err error) { - if disable { - // Enabled by default. - opts.Opts = append(opts.Opts, libp2p.DisableRelay()) - } else { - relayOpts := []relay.RelayOpt{relay.OptDiscovery} - if enableHop { - relayOpts = append(relayOpts, relay.OptHop) - } - opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...)) - } - return - } -} - -func Security(enabled, preferTLS bool) interface{} { - if !enabled { - return func() (opts Libp2pOpts) { - // TODO: shouldn't this be Errorf to guarantee visibility? - log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS. - You will not be able to connect to any nodes configured to use encrypted connections`) - opts.Opts = append(opts.Opts, libp2p.NoSecurity) - return opts - } - } - return func() (opts Libp2pOpts) { - if preferTLS { - opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(tls.ID, tls.New), libp2p.Security(secio.ID, secio.New))) - } else { - opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(secio.ID, secio.New), libp2p.Security(tls.ID, tls.New))) - } - return opts - } -} - -type P2PHostIn struct { - fx.In - - Repo repo.Repo - Validator record.Validator - HostOption HostOption - RoutingOption RoutingOption - ID peer.ID - Peerstore peerstore.Peerstore - - Opts [][]libp2p.Option `group:"libp2p"` -} - -type BaseIpfsRouting routing.IpfsRouting -type P2PHostOut struct { - fx.Out - - Host host.Host - Routing BaseIpfsRouting -} - -func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) { - opts := []libp2p.Option{libp2p.NoListenAddrs} - for _, o := range params.Opts { - opts = append(opts, o...) - } - - ctx := helpers.LifecycleCtx(mctx, lc) - - opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - r, err := params.RoutingOption(ctx, h, params.Repo.Datastore(), params.Validator) - out.Routing = r - return r, err - })) - - out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...) - if err != nil { - return P2PHostOut{}, err - } - - // this code is necessary just for tests: mock network constructions - // ignore the libp2p constructor options that actually construct the routing! - if out.Routing == nil { - r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator) - if err != nil { - return P2PHostOut{}, err - } - out.Routing = r - out.Host = routedhost.Wrap(out.Host, out.Routing) - } - - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return out.Host.Close() - }, - }) - - return out, err -} - -type Router struct { - routing.IpfsRouting - - Priority int // less = more important -} - -type p2pRouterOut struct { - fx.Out - - Router Router `group:"routers"` -} - -func BaseRouting(lc fx.Lifecycle, in BaseIpfsRouting) (out p2pRouterOut, dr *dht.IpfsDHT) { - if dht, ok := in.(*dht.IpfsDHT); ok { - dr = dht - - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return dr.Close() - }, - }) - } - - return p2pRouterOut{ - Router: Router{ - Priority: 1000, - IpfsRouting: in, - }, - }, dr -} - -type p2pOnlineRoutingIn struct { - fx.In - - Routers []Router `group:"routers"` - Validator record.Validator -} - -func Routing(in p2pOnlineRoutingIn) routing.IpfsRouting { - routers := in.Routers - - sort.SliceStable(routers, func(i, j int) bool { - return routers[i].Priority < routers[j].Priority - }) - - irouters := make([]routing.IpfsRouting, len(routers)) - for i, v := range routers { - irouters[i] = v.IpfsRouting - } - - return routinghelpers.Tiered{ - Routers: irouters, - Validator: in.Validator, - } -} - -type p2pPSRoutingIn struct { - fx.In - - BaseRouting BaseIpfsRouting - Repo repo.Repo - Validator record.Validator - Host host.Host - PubSub *pubsub.PubSub `optional:"true"` -} - -func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore) { - psRouter := namesys.NewPubsubValueStore( - helpers.LifecycleCtx(mctx, lc), - in.Host, - in.BaseRouting, - in.PubSub, - in.Validator, - ) - - return p2pRouterOut{ - Router: Router{ - IpfsRouting: &routinghelpers.Compose{ - ValueStore: &routinghelpers.LimitedValueStore{ - ValueStore: psRouter, - Namespaces: []string{"ipns"}, - }, - }, - Priority: 100, - }, - }, psRouter -} - -func AutoNATService(quic bool) func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error { - return func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error { - // collect private net option in case swarm.key is presented - opts, _, err := PNet(repo) - if err != nil { - // swarm key exists but was failed to decode - return err - } - - if quic { - opts.Opts = append(opts.Opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport)) - } - - _, err = autonat.NewAutoNATService(helpers.LifecycleCtx(mctx, lc), host, opts.Opts...) +func PstoreAddSelfKeys(id peer.ID, sk crypto.PrivKey, ps peerstore.Peerstore) error { + if err := ps.AddPubKey(id, sk.GetPublic()); err != nil { return err } -} - -func FloodSub(pubsubOptions ...pubsub.Option) interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) { - return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...) - } -} - -func GossipSub(pubsubOptions ...pubsub.Option) interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) { - return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...) - } -} -func listenAddresses(addresses []string) ([]ma.Multiaddr, error) { - var listen []ma.Multiaddr - for _, addr := range addresses { - maddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", addresses) - } - listen = append(listen, maddr) - } - - return listen, nil -} - -func StartListening(addresses []string) func(host host.Host) error { - return func(host host.Host) error { - listenAddrs, err := listenAddresses(addresses) - if err != nil { - return err - } - - // Actually start listening: - if err := host.Network().Listen(listenAddrs...); err != nil { - return err - } - - // list out our addresses - addrs, err := host.Network().InterfaceListenAddresses() - if err != nil { - return err - } - log.Infof("Swarm listening at: %s", addrs) - return nil - } + return ps.AddPrivKey(id, sk) } func simpleOpt(opt libp2p.Option) func() (opts Libp2pOpts, err error) { diff --git a/core/node/libp2p/nat.go b/core/node/libp2p/nat.go new file mode 100644 index 00000000000..b4aadf68593 --- /dev/null +++ b/core/node/libp2p/nat.go @@ -0,0 +1,32 @@ +package libp2p + +import ( + "github.com/libp2p/go-libp2p" + autonat "github.com/libp2p/go-libp2p-autonat-svc" + host "github.com/libp2p/go-libp2p-host" + libp2pquic "github.com/libp2p/go-libp2p-quic-transport" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/core/node/helpers" + "github.com/ipfs/go-ipfs/repo" +) + +var NatPortMap = simpleOpt(libp2p.NATPortMap()) + +func AutoNATService(quic bool) func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error { + return func(repo repo.Repo, mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) error { + // collect private net option in case swarm.key is presented + opts, _, err := PNet(repo) + if err != nil { + // swarm key exists but was failed to decode + return err + } + + if quic { + opts.Opts = append(opts.Opts, libp2p.DefaultTransports, libp2p.Transport(libp2pquic.NewTransport)) + } + + _, err = autonat.NewAutoNATService(helpers.LifecycleCtx(mctx, lc), host, opts.Opts...) + return err + } +} diff --git a/core/node/libp2p/pnet.go b/core/node/libp2p/pnet.go new file mode 100644 index 00000000000..5f7a3763269 --- /dev/null +++ b/core/node/libp2p/pnet.go @@ -0,0 +1,70 @@ +package libp2p + +import ( + "bytes" + "context" + "fmt" + "time" + + "github.com/libp2p/go-libp2p" + host "github.com/libp2p/go-libp2p-host" + pnet "github.com/libp2p/go-libp2p-pnet" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/repo" +) + +type PNetFingerprint []byte + +func PNet(repo repo.Repo) (opts Libp2pOpts, fp PNetFingerprint, err error) { + swarmkey, err := repo.SwarmKey() + if err != nil || swarmkey == nil { + return opts, nil, err + } + + protec, err := pnet.NewProtector(bytes.NewReader(swarmkey)) + if err != nil { + return opts, nil, fmt.Errorf("failed to configure private network: %s", err) + } + fp = protec.Fingerprint() + + opts.Opts = append(opts.Opts, libp2p.PrivateNetwork(protec)) + return opts, fp, nil +} + +func PNetChecker(repo repo.Repo, ph host.Host, lc fx.Lifecycle) error { + // TODO: better check? + swarmkey, err := repo.SwarmKey() + if err != nil || swarmkey == nil { + return err + } + + done := make(chan struct{}) + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + go func() { + t := time.NewTicker(30 * time.Second) + defer t.Stop() + + <-t.C // swallow one tick + for { + select { + case <-t.C: + if len(ph.Network().Peers()) == 0 { + log.Warning("We are in private network and have no peers.") + log.Warning("This might be configuration mistake.") + } + case <-done: + return + } + } + }() + return nil + }, + OnStop: func(_ context.Context) error { + close(done) + return nil + }, + }) + return nil +} diff --git a/core/node/libp2p/pubsub.go b/core/node/libp2p/pubsub.go new file mode 100644 index 00000000000..4dd3f096566 --- /dev/null +++ b/core/node/libp2p/pubsub.go @@ -0,0 +1,21 @@ +package libp2p + +import ( + host "github.com/libp2p/go-libp2p-host" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/core/node/helpers" +) + +func FloodSub(pubsubOptions ...pubsub.Option) interface{} { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) { + return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...) + } +} + +func GossipSub(pubsubOptions ...pubsub.Option) interface{} { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) { + return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...) + } +} diff --git a/core/node/libp2p/relay.go b/core/node/libp2p/relay.go new file mode 100644 index 00000000000..b9e8afa49cc --- /dev/null +++ b/core/node/libp2p/relay.go @@ -0,0 +1,24 @@ +package libp2p + +import ( + "github.com/libp2p/go-libp2p" + relay "github.com/libp2p/go-libp2p-circuit" +) + +func Relay(disable, enableHop bool) func() (opts Libp2pOpts, err error) { + return func() (opts Libp2pOpts, err error) { + if disable { + // Enabled by default. + opts.Opts = append(opts.Opts, libp2p.DisableRelay()) + } else { + relayOpts := []relay.RelayOpt{relay.OptDiscovery} + if enableHop { + relayOpts = append(relayOpts, relay.OptHop) + } + opts.Opts = append(opts.Opts, libp2p.EnableRelay(relayOpts...)) + } + return + } +} + +var AutoRealy = simpleOpt(libp2p.EnableAutoRelay()) diff --git a/core/node/libp2p/routing.go b/core/node/libp2p/routing.go new file mode 100644 index 00000000000..9dc518871d4 --- /dev/null +++ b/core/node/libp2p/routing.go @@ -0,0 +1,108 @@ +package libp2p + +import ( + "context" + "sort" + + host "github.com/libp2p/go-libp2p-host" + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p-pubsub" + namesys "github.com/libp2p/go-libp2p-pubsub-router" + record "github.com/libp2p/go-libp2p-record" + routing "github.com/libp2p/go-libp2p-routing" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" + "go.uber.org/fx" + + "github.com/ipfs/go-ipfs/core/node/helpers" + "github.com/ipfs/go-ipfs/repo" +) + +type BaseIpfsRouting routing.IpfsRouting + +type Router struct { + routing.IpfsRouting + + Priority int // less = more important +} + +type p2pRouterOut struct { + fx.Out + + Router Router `group:"routers"` +} + +func BaseRouting(lc fx.Lifecycle, in BaseIpfsRouting) (out p2pRouterOut, dr *dht.IpfsDHT) { + if dht, ok := in.(*dht.IpfsDHT); ok { + dr = dht + + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return dr.Close() + }, + }) + } + + return p2pRouterOut{ + Router: Router{ + Priority: 1000, + IpfsRouting: in, + }, + }, dr +} + +type p2pOnlineRoutingIn struct { + fx.In + + Routers []Router `group:"routers"` + Validator record.Validator +} + +func Routing(in p2pOnlineRoutingIn) routing.IpfsRouting { + routers := in.Routers + + sort.SliceStable(routers, func(i, j int) bool { + return routers[i].Priority < routers[j].Priority + }) + + irouters := make([]routing.IpfsRouting, len(routers)) + for i, v := range routers { + irouters[i] = v.IpfsRouting + } + + return routinghelpers.Tiered{ + Routers: irouters, + Validator: in.Validator, + } +} + +type p2pPSRoutingIn struct { + fx.In + + BaseRouting BaseIpfsRouting + Repo repo.Repo + Validator record.Validator + Host host.Host + PubSub *pubsub.PubSub `optional:"true"` +} + +func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore) { + psRouter := namesys.NewPubsubValueStore( + helpers.LifecycleCtx(mctx, lc), + in.Host, + in.BaseRouting, + in.PubSub, + in.Validator, + ) + + return p2pRouterOut{ + Router: Router{ + IpfsRouting: &routinghelpers.Compose{ + ValueStore: &routinghelpers.LimitedValueStore{ + ValueStore: psRouter, + Namespaces: []string{"ipns"}, + }, + }, + Priority: 100, + }, + }, psRouter +} diff --git a/core/node/libp2p/routingopt.go b/core/node/libp2p/routingopt.go new file mode 100644 index 00000000000..5c756743668 --- /dev/null +++ b/core/node/libp2p/routingopt.go @@ -0,0 +1,36 @@ +package libp2p + +import ( + "context" + + "github.com/ipfs/go-datastore" + nilrouting "github.com/ipfs/go-ipfs-routing/none" + host "github.com/libp2p/go-libp2p-host" + dht "github.com/libp2p/go-libp2p-kad-dht" + dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts" + record "github.com/libp2p/go-libp2p-record" + routing "github.com/libp2p/go-libp2p-routing" +) + +type RoutingOption func(context.Context, host.Host, datastore.Batching, record.Validator) (routing.IpfsRouting, error) + +func constructDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) { + return dht.New( + ctx, host, + dhtopts.Datastore(dstore), + dhtopts.Validator(validator), + ) +} + +func constructClientDHTRouting(ctx context.Context, host host.Host, dstore datastore.Batching, validator record.Validator) (routing.IpfsRouting, error) { + return dht.New( + ctx, host, + dhtopts.Client(true), + dhtopts.Datastore(dstore), + dhtopts.Validator(validator), + ) +} + +var DHTOption RoutingOption = constructDHTRouting +var DHTClientOption RoutingOption = constructClientDHTRouting +var NilRouterOption RoutingOption = nilrouting.ConstructNilRouting diff --git a/core/node/libp2p/smux.go b/core/node/libp2p/smux.go new file mode 100644 index 00000000000..0fe27bb1984 --- /dev/null +++ b/core/node/libp2p/smux.go @@ -0,0 +1,62 @@ +package libp2p + +import ( + "io/ioutil" + "os" + "strings" + "time" + + "github.com/libp2p/go-libp2p" + smux "github.com/libp2p/go-stream-muxer" + mplex "github.com/whyrusleeping/go-smux-multiplex" + yamux "github.com/whyrusleeping/go-smux-yamux" +) + +func makeSmuxTransportOption(mplexExp bool) libp2p.Option { + const yamuxID = "/yamux/1.0.0" + const mplexID = "/mplex/6.7.0" + + ymxtpt := &yamux.Transport{ + AcceptBacklog: 512, + ConnectionWriteTimeout: time.Second * 10, + KeepAliveInterval: time.Second * 30, + EnableKeepAlive: true, + MaxStreamWindowSize: uint32(16 * 1024 * 1024), // 16MiB + LogOutput: ioutil.Discard, + } + + if os.Getenv("YAMUX_DEBUG") != "" { + ymxtpt.LogOutput = os.Stderr + } + + muxers := map[string]smux.Transport{yamuxID: ymxtpt} + if mplexExp { + muxers[mplexID] = mplex.DefaultTransport + } + + // Allow muxer preference order overriding + order := []string{yamuxID, mplexID} + if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" { + order = strings.Fields(prefs) + } + + opts := make([]libp2p.Option, 0, len(order)) + for _, id := range order { + tpt, ok := muxers[id] + if !ok { + log.Warning("unknown or duplicate muxer in LIBP2P_MUX_PREFS: %s", id) + continue + } + delete(muxers, id) + opts = append(opts, libp2p.Muxer(id, tpt)) + } + + return libp2p.ChainOptions(opts...) +} + +func SmuxTransport(mplex bool) func() (opts Libp2pOpts, err error) { + return func() (opts Libp2pOpts, err error) { + opts.Opts = append(opts.Opts, makeSmuxTransportOption(mplex)) + return + } +} diff --git a/core/node/libp2p/transport.go b/core/node/libp2p/transport.go new file mode 100644 index 00000000000..c4572724eaa --- /dev/null +++ b/core/node/libp2p/transport.go @@ -0,0 +1,38 @@ +package libp2p + +import ( + "github.com/libp2p/go-libp2p" + metrics "github.com/libp2p/go-libp2p-metrics" + libp2pquic "github.com/libp2p/go-libp2p-quic-transport" + secio "github.com/libp2p/go-libp2p-secio" + tls "github.com/libp2p/go-libp2p-tls" +) + +var DefaultTransports = simpleOpt(libp2p.DefaultTransports) +var QUIC = simpleOpt(libp2p.Transport(libp2pquic.NewTransport)) + +func Security(enabled, preferTLS bool) interface{} { + if !enabled { + return func() (opts Libp2pOpts) { + // TODO: shouldn't this be Errorf to guarantee visibility? + log.Warningf(`Your IPFS node has been configured to run WITHOUT ENCRYPTED CONNECTIONS. + You will not be able to connect to any nodes configured to use encrypted connections`) + opts.Opts = append(opts.Opts, libp2p.NoSecurity) + return opts + } + } + return func() (opts Libp2pOpts) { + if preferTLS { + opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(tls.ID, tls.New), libp2p.Security(secio.ID, secio.New))) + } else { + opts.Opts = append(opts.Opts, libp2p.ChainOptions(libp2p.Security(secio.ID, secio.New), libp2p.Security(tls.ID, tls.New))) + } + return opts + } +} + +func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) { + reporter = metrics.NewBandwidthCounter() + opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter)) + return opts, reporter +}