Skip to content

Commit

Permalink
ipns(pubsub): utilize persistent pubsub value store
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Dec 9, 2019
1 parent 17e886e commit 0633640
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 12 deletions.
5 changes: 4 additions & 1 deletion core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {

// parse PubSub config

ps := fx.Options()
ps, disc := fx.Options(), fx.Options()
if bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps") {
disc = fx.Provide(libp2p.TopicDiscovery())

var pubsubOptions []pubsub.Option
pubsubOptions = append(
pubsubOptions,
Expand Down Expand Up @@ -113,6 +115,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
maybeInvoke(libp2p.AutoNATService(cfg.Experimental.QUIC), cfg.Swarm.EnableAutoNATService),
connmgr,
ps,
disc,
)

return opts
Expand Down
11 changes: 6 additions & 5 deletions core/node/libp2p/pubsub.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package libp2p

import (
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"

"github.com/ipfs/go-ipfs/core/node/helpers"
)

func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...)
}
}

func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...)
}
}
13 changes: 9 additions & 4 deletions core/node/libp2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package libp2p
import (
"context"
"sort"
"time"

host "github.com/libp2p/go-libp2p-core/host"
routing "github.com/libp2p/go-libp2p-core/routing"
Expand Down Expand Up @@ -83,15 +84,19 @@ type p2pPSRoutingIn struct {
PubSub *pubsub.PubSub `optional:"true"`
}

func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore) {
psRouter := namesys.NewPubsubValueStore(
func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore, error) {
psRouter, err := namesys.NewPubsubValueStore(
helpers.LifecycleCtx(mctx, lc),
in.Host,
in.BaseIpfsRouting,
in.PubSub,
in.Validator,
namesys.WithRebroadcastInterval(time.Minute),
)

if err != nil {
return p2pRouterOut{}, nil, err
}

return p2pRouterOut{
Router: Router{
Routing: &routinghelpers.Compose{
Expand All @@ -102,5 +107,5 @@ func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (
},
Priority: 100,
},
}, psRouter
}, psRouter, nil
}
31 changes: 31 additions & 0 deletions core/node/libp2p/topicdiscovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package libp2p

import (
"math/rand"
"time"

"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
disc "github.com/libp2p/go-libp2p-discovery"

"github.com/ipfs/go-ipfs/core/node/helpers"
"go.uber.org/fx"
)

func TopicDiscovery() interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cr BaseIpfsRouting) (service discovery.Discovery, err error) {
baseDisc := disc.NewRoutingDiscovery(cr)
minBackoff, maxBackoff := time.Second*60, time.Hour
rng := rand.New(rand.NewSource(rand.Int63()))
d, err := disc.NewBackoffDiscovery(
baseDisc,
disc.NewExponentialBackoff(minBackoff, maxBackoff, disc.FullJitter, time.Second, 5.0, 0, rng),
)

if err != nil {
return nil, err
}

return d, nil
}
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/libp2p/go-libp2p-circuit v0.1.4
github.com/libp2p/go-libp2p-connmgr v0.1.1
github.com/libp2p/go-libp2p-core v0.2.5
github.com/libp2p/go-libp2p-discovery v0.2.0
github.com/libp2p/go-libp2p-http v0.1.4
github.com/libp2p/go-libp2p-kad-dht v0.3.1
github.com/libp2p/go-libp2p-kbucket v0.2.1
Expand All @@ -70,7 +71,7 @@ require (
github.com/libp2p/go-libp2p-peerstore v0.1.4
github.com/libp2p/go-libp2p-pnet v0.1.0
github.com/libp2p/go-libp2p-pubsub v0.2.4
github.com/libp2p/go-libp2p-pubsub-router v0.1.0
github.com/libp2p/go-libp2p-pubsub-router v0.1.1-0.20191111061427-2fcdcceaeaab
github.com/libp2p/go-libp2p-quic-transport v0.2.2
github.com/libp2p/go-libp2p-record v0.1.2
github.com/libp2p/go-libp2p-routing-helpers v0.1.0
Expand Down
4 changes: 3 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,12 @@ github.com/libp2p/go-libp2p-pnet v0.1.0/go.mod h1:ZkyZw3d0ZFOex71halXRihWf9WH/j3
github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1VZNHYcK8cLgFJLZ4s=
github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk=
github.com/libp2p/go-libp2p-pubsub v0.1.0/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
github.com/libp2p/go-libp2p-pubsub v0.2.1/go.mod h1:Jscj3fk23R5mCrOwb625xjVs5ZEyTZcx/OlTwMDqU+g=
github.com/libp2p/go-libp2p-pubsub v0.2.4 h1:O4BcaKpPQ9p82yTBtzIzgDFoOXkqhrQpfcVac3FAywU=
github.com/libp2p/go-libp2p-pubsub v0.2.4/go.mod h1:1tJwAfySvZQ49R9uTVlkwtSTMVLeQQdrnLTJrr91gVc=
github.com/libp2p/go-libp2p-pubsub-router v0.1.0 h1:xA5B8Sdx64tNlSRIcay2QUngtlu8LpUJClaUk/dYYrg=
github.com/libp2p/go-libp2p-pubsub-router v0.1.0/go.mod h1:PnHOshBr/2I2ZxVfEsqfgCQPsVg09zo+DhSlWkOhPFM=
github.com/libp2p/go-libp2p-pubsub-router v0.1.1-0.20191111061427-2fcdcceaeaab h1:upGMP9YYJ/+IZSVoEQ14E8WOA56h86KDXCMav/g8DjM=
github.com/libp2p/go-libp2p-pubsub-router v0.1.1-0.20191111061427-2fcdcceaeaab/go.mod h1:CeModTwYOlqcWtbc+7N1F3RhG7nbY3h9s3g5iHHe/AQ=
github.com/libp2p/go-libp2p-quic-transport v0.2.2 h1:XyGRqFHD1oHdI2k98P1tWWRb9s27fl1SfmCcaX8plso=
github.com/libp2p/go-libp2p-quic-transport v0.2.2/go.mod h1:rVzcsiuOFBomAqvNOxeBUcP4vM4wE+NqqRZWvxjkbe0=
github.com/libp2p/go-libp2p-record v0.0.1/go.mod h1:grzqg263Rug/sRex85QrDOLntdFAymLDLm7lxMgU79Q=
Expand Down

0 comments on commit 0633640

Please sign in to comment.