Skip to content

Commit

Permalink
namesys/pubsub: --enable-namesys-pubsub option and management
Browse files Browse the repository at this point in the history
Commits:
package.json: update go-libp2p-blankhost
namesys: fix stale package imports
update go-testutil
namesys/pubsub: reduce bootstrap provide period to 8hr
namesys/pubsub: try to extract the key from id first
option to enable ipns pubsub: --enable-namesys-pubsub
ipfs name pubsub management subcommands
corehttp/gateway_test: mockNamesys needs to implement GetResolver
pacify code climate

License: MIT
Signed-off-by: vyzo <vyzo@hackzen.org>
  • Loading branch information
vyzo committed Nov 18, 2017
1 parent e7e88e5 commit 98bba87
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 23 deletions.
4 changes: 4 additions & 0 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
unrestrictedApiAccessKwd = "unrestricted-api"
writableKwd = "writable"
enableFloodSubKwd = "enable-pubsub-experiment"
enableIPNSPubSubKwd = "enable-namesys-pubsub"
enableMultiplexKwd = "enable-mplex-experiment"
// apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm"
Expand Down Expand Up @@ -156,6 +157,7 @@ Headers.
cmds.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API.").Default(false),
cmds.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmds.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
cmds.BoolOption(enableIPNSPubSubKwd, "Enable IPNS record distribution through pubsub; enables pubsub."),
cmds.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction.").Default(true),
// TODO: add way to override addresses. tricky part: updating the config if also --init.
// cmds.StringOption(apiAddrKwd, "Address for the daemon rpc API (overrides config)"),
Expand Down Expand Up @@ -281,6 +283,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {

offline, _, _ := req.Option(offlineKwd).Bool()
pubsub, _, _ := req.Option(enableFloodSubKwd).Bool()
ipnsps, _, _ := req.Option(enableIPNSPubSubKwd).Bool()
mplex, _, _ := req.Option(enableMultiplexKwd).Bool()

// Start assembling node config
Expand All @@ -290,6 +293,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
Online: !offline,
ExtraOpts: map[string]bool{
"pubsub": pubsub,
"ipnsps": ipnsps,
"mplex": mplex,
},
//TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral
Expand Down
2 changes: 1 addition & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {

if cfg.Online {
do := setupDiscoveryOption(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("mplex")); err != nil {
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil {
return err
}
} else {
Expand Down
129 changes: 129 additions & 0 deletions core/commands/ipnsps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package commands

import (
"errors"
"fmt"
"io"
"strings"

cmds "github.com/ipfs/go-ipfs/commands"
ns "github.com/ipfs/go-ipfs/namesys"

u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util"
)

type ipnsPubsubState struct {
Enabled bool
}

// IpnsPubsubCmd is the subcommand that allows us to manage the IPNS pubsub system
var IpnsPubsubCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "IPNS pubsub management",
ShortDescription: `
Manage and inspect the state of the IPNS pubsub resolver.
Note: this command is experimental and subject to change as the system is refined
`,
},
Subcommands: map[string]*cmds.Command{
"state": ipnspsStateCmd,
"subs": ipnspsSubsCmd,
"cancel": ipnspsCancelCmd,
},
}

var ipnspsStateCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Query the state of IPNS pubsub",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

_, ok := n.Namesys.GetResolver("pubsub")
res.SetOutput(&ipnsPubsubState{ok})
},
Type: ipnsPubsubState{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
output, ok := res.Output().(*ipnsPubsubState)
if !ok {
return nil, u.ErrCast()
}

var state string
if output.Enabled {
state = "enabled"
} else {
state = "disabled"
}

return strings.NewReader(state + "\n"), nil
},
},
}

var ipnspsSubsCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Show current name subscriptions",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

r, ok := n.Namesys.GetResolver("pubsub")
if !ok {
res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmds.ErrClient)
return
}

psr, ok := r.(*ns.PubsubResolver)
if !ok {
res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmds.ErrNormal)
return
}

res.SetOutput(&stringList{psr.GetSubscriptions()})
},
Type: stringList{},
Marshalers: cmds.MarshalerMap{
cmds.Text: stringListMarshaler,
},
}

var ipnspsCancelCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Cancel a name subscription",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

r, ok := n.Namesys.GetResolver("pubsub")
if !ok {
res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmds.ErrClient)
return
}

psr, ok := r.(*ns.PubsubResolver)
if !ok {
res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmds.ErrNormal)
return
}

psr.Cancel(req.Arguments()[0])
},
Arguments: []cmds.Argument{
cmds.StringArg("name", true, false, "Name to cancel the subscription for."),
},
}
1 change: 1 addition & 0 deletions core/commands/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ Resolve the value of a dnslink:
Subcommands: map[string]*cmds.Command{
"publish": PublishCmd,
"resolve": IpnsCmd,
"pubsub": IpnsPubsubCmd,
},
}
7 changes: 5 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ type Mounts struct {
Ipns mount.Mount
}

func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, mplex bool) error {
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex bool) error {

if n.PeerHost != nil { // already online.
return errors.New("node already online")
Expand Down Expand Up @@ -249,8 +249,11 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return err
}

if pubsub {
if pubsub || ipnsps {
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}

if ipnsps {
err = namesys.AddPubsubNameSystem(ctx, n.Namesys, n.PeerHost, n.Routing, n.Repo.Datastore(), n.Floodsub)
if err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions core/corehttp/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (m mockNamesys) PublishWithEOL(ctx context.Context, name ci.PrivKey, value
return errors.New("not implemented for mockNamesys")
}

func (m mockNamesys) GetResolver(subs string) (namesys.Resolver, bool) {
return nil, false
}

func newNodeWithMockNamesys(ns mockNamesys) (*core.IpfsNode, error) {
c := config.Config{
Identity: config.Identity{
Expand Down
8 changes: 8 additions & 0 deletions namesys/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ var ErrPublishFailed = errors.New("Could not publish name.")
type NameSystem interface {
Resolver
Publisher
ResolverLookup
}

// Resolver is an object capable of resolving names.
Expand Down Expand Up @@ -111,3 +112,10 @@ type Publisher interface {
// call once the records spec is implemented
PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error
}

// ResolverLookup is an object capable of finding resolvers for a subsystem
type ResolverLookup interface {

// GetResolver retrieves a resolver associated with a subsystem
GetResolver(subs string) (Resolver, bool)
}
19 changes: 16 additions & 3 deletions namesys/namesys.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
path "github.com/ipfs/go-ipfs/path"

routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
floodsub "gx/ipfs/QmUpeULWfmtsgCnfuRN3BHsfhHvBxNphoYh4La4CMxGt2Z/floodsub"
p2phost "gx/ipfs/QmUywuGNZoUKV8B9iyvup9bPkLiMrhTsyVMkeSXW5VxAfC/go-libp2p-host"
mh "gx/ipfs/QmVGtdTZdTFaLsaj2RwdVG8jcjNNcp1DE914DKZ2kHmXHw/go-multihash"
mh "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain"
ci "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"
p2phost "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host"
floodsub "gx/ipfs/Qmdnza7rLi7CMNNwNhNkcs9piX5sf6rxE8FrCsPzYtUEUi/floodsub"
)

// mpns (a multi-protocol NameSystem) implements generic IPFS naming.
Expand Down Expand Up @@ -223,3 +223,16 @@ func (ns *mpns) addToDHTCache(key ci.PrivKey, value path.Path, eol time.Time) {
eol: eol,
})
}

// GetResolver implements ResolverLookup
func (ns *mpns) GetResolver(subs string) (Resolver, bool) {
res, ok := ns.resolvers[subs]
if ok {
ires, ok := res.(Resolver)
if ok {
return ires, true
}
}

return nil, false
}
34 changes: 25 additions & 9 deletions namesys/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ import (
path "github.com/ipfs/go-ipfs/path"
dshelp "github.com/ipfs/go-ipfs/thirdparty/ds-help"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
routing "gx/ipfs/QmPjTrrSfE6TzLv6ya6VWhGcCgPrUAdcgrDcQyRDX2VyW1/go-libp2p-routing"
u "gx/ipfs/QmSU6eubNdhXjFBJBSksTp8kv8YRub8mGAPv8tVJHmL2EU/go-ipfs-util"
cid "gx/ipfs/QmTprEaAA2A9bst5XH7exuyi5KzNMK3SEDNN8rBDnKWcUS/go-cid"
mh "gx/ipfs/QmU9a9NV9RdPNwZQDYd5uKsm6N6LJLSvLbywDDYFbaaC6P/go-multihash"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
dssync "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore/sync"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
proto "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/proto"
floodsub "gx/ipfs/QmZdsQf8BiCpAj61nz9NgqVeRUkw9vATvCs7UHFTxoUMDb/floodsub"
p2phost "gx/ipfs/QmZy7c24mmkEHpNJndwgsEE3wcVxHd8yB969yTnAJFVw7f/go-libp2p-host"
ci "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"
p2phost "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host"
record "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record"
dhtpb "gx/ipfs/QmbxkgUceEcuSZ4ZdBA3x74VUDSSYjHYmmeEqkjxbtZ6Jg/go-libp2p-record/pb"
floodsub "gx/ipfs/Qmdnza7rLi7CMNNwNhNkcs9piX5sf6rxE8FrCsPzYtUEUi/floodsub"
)

// PubsubPublisher is a publisher that distributes IPNS records through pubsub
Expand Down Expand Up @@ -210,10 +210,13 @@ func (r *PubsubResolver) resolveOnce(ctx context.Context, name string) (path.Pat
return "", errors.New("Cannot resolve own name through pubsub")
}

pubk, err := r.pkf.GetPublicKey(ctx, id)
if err != nil {
log.Warningf("PubsubResolve: error fetching public key: %s [%s]", err.Error(), xname)
return "", err
pubk := id.ExtractPublicKey()
if pubk == nil {
pubk, err = r.pkf.GetPublicKey(ctx, id)
if err != nil {
log.Warningf("PubsubResolve: error fetching public key: %s [%s]", err.Error(), xname)
return "", err
}
}

// the topic is /ipns/Qmhash
Expand Down Expand Up @@ -273,6 +276,19 @@ func (r *PubsubResolver) resolveOnce(ctx context.Context, name string) (path.Pat
return value, err
}

// GetSubscriptions retrieves a list of active topic subscriptions
func (r *PubsubResolver) GetSubscriptions() []string {
r.mx.Lock()
defer r.mx.Unlock()

var res []string
for sub := range r.subs {
res = append(res, sub)
}

return res
}

// Cancel cancels a topic subscription
func (r *PubsubResolver) Cancel(name string) {
r.mx.Lock()
Expand Down Expand Up @@ -368,7 +384,7 @@ func bootstrapPubsub(ctx context.Context, cr routing.ContentRouting, host p2phos
go func() {
for {
select {
case <-time.After(24 * time.Hour):
case <-time.After(8 * time.Hour):
err := cr.Provide(ctx, rz, true)
if err != nil {
log.Warningf("bootstrapPubsub: error providing rendezvous for %s: %s", topic, err.Error())
Expand Down
12 changes: 6 additions & 6 deletions namesys/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (

path "github.com/ipfs/go-ipfs/path"
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
testutil "github.com/ipfs/go-ipfs/thirdparty/testutil"

routing "gx/ipfs/QmPR2JzfKd9poHx9XBhzoFeBBC31ZM3W5iUPKJZWyaoZZm/go-libp2p-routing"
bhost "gx/ipfs/QmPZRCaYeNLMo5GfcRS2rv9ZxVuXXt6MFg9dWLmgsdXKCw/go-libp2p-blankhost"
pstore "gx/ipfs/QmPgDWmTmuzvP7QE5zwo1TmjbJme9pmZHNujB2453jkCTr/go-libp2p-peerstore"
routing "gx/ipfs/QmPjTrrSfE6TzLv6ya6VWhGcCgPrUAdcgrDcQyRDX2VyW1/go-libp2p-routing"
testutil "gx/ipfs/QmQgLZP9haZheimMHqqAjJh2LhRmNfEoZDfbtkpeMhi9xK/go-testutil"
ds "gx/ipfs/QmVSase1JP7cq9QkPT46oNwdp9pT6kBkG3oqS14y3QcZjG/go-datastore"
netutil "gx/ipfs/QmViDDJGzv2TKrheoxckReECc72iRgaYsobG2HYUGWuPVF/go-libp2p-netutil"
peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
floodsub "gx/ipfs/QmZdsQf8BiCpAj61nz9NgqVeRUkw9vATvCs7UHFTxoUMDb/floodsub"
p2phost "gx/ipfs/QmZy7c24mmkEHpNJndwgsEE3wcVxHd8yB969yTnAJFVw7f/go-libp2p-host"
ci "gx/ipfs/QmaPbCnUMBohSGo3KnxEa2bHqyJVVeEEcwtqJAYxerieBo/go-libp2p-crypto"
bhost "gx/ipfs/QmbQXcWAa9ZbTH74m6yroexY8QjTS4oivLNEFwjamZCJTU/go-libp2p-blankhost"
p2phost "gx/ipfs/QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6/go-libp2p-host"
floodsub "gx/ipfs/Qmdnza7rLi7CMNNwNhNkcs9piX5sf6rxE8FrCsPzYtUEUi/floodsub"
netutil "gx/ipfs/QmdzuGp4a9pahgXuBeReHdYGUzdVX3FUCwfmWVo5mQfkTi/go-libp2p-netutil"
)

func newNetHost(ctx context.Context, t *testing.T) p2phost.Host {
Expand Down
10 changes: 8 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmbQXcWAa9ZbTH74m6yroexY8QjTS4oivLNEFwjamZCJTU",
"hash": "QmPZRCaYeNLMo5GfcRS2rv9ZxVuXXt6MFg9dWLmgsdXKCw",
"name": "go-libp2p-blankhost",
"version": "0.1.15"
"version": "0.2.0"
},
{
"author": "whyrusleeping",
"hash": "QmQgLZP9haZheimMHqqAjJh2LhRmNfEoZDfbtkpeMhi9xK",
"name": "go-testutil",
"version": "1.1.12"
}
],
"gxVersion": "0.10.0",
Expand Down

0 comments on commit 98bba87

Please sign in to comment.