Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coreapi: PubSub API #4805

Merged
merged 6 commits into from
Oct 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 36 additions & 97 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,16 @@ package commands
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"sort"
"sync"
"time"

core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
blocks "gx/ipfs/QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM/go-block-format"
pstore "gx/ipfs/QmSJ36wcYQyEViJUWUEhJU81tw1KdakTKqLLHbvYbA9zDv/go-libp2p-peerstore"
cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
floodsub "gx/ipfs/QmUK4h113Hh7bR2gPpsMcbUEbbzc7hspocmPi91Bmi69nH/go-libp2p-floodsub"
cmds "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds"
)

Expand Down Expand Up @@ -48,6 +41,13 @@ const (
pubsubDiscoverOptionName = "discover"
)

type pubsubMessage struct {
From []byte `json:"from,omitempty"`
Data []byte `json:"data,omitempty"`
Seqno []byte `json:"seqno,omitempty"`
TopicIDs []string `json:"topicIDs,omitempty"`
}

var PubsubSubCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Subscribe to messages on a given topic.",
Expand Down Expand Up @@ -79,40 +79,19 @@ This command outputs data in the following encodings:
cmdkit.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
}

topic := req.Arguments[0]
sub, err := n.Floodsub.Subscribe(topic)
discover, _ := req.Options[pubsubDiscoverOptionName].(bool)

sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
magik6k marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
defer sub.Cancel()

discover, _ := req.Options[pubsubDiscoverOptionName].(bool)
if discover {
go func() {
blk := blocks.NewBlock([]byte("floodsub:" + topic))
err := n.Blocks.AddBlock(blk)
if err != nil {
log.Error("pubsub discovery: ", err)
return
}

connectToPubSubPeers(req.Context, n, blk.Cid())
}()
}
defer sub.Close()

if f, ok := res.(http.Flusher); ok {
f.Flush()
Expand All @@ -126,15 +105,17 @@ This command outputs data in the following encodings:
return err
}

err = res.Emit(msg)
if err != nil {
return err
}
res.Emit(&pubsubMessage{
Data: msg.Data(),
From: []byte(msg.From()),
Seqno: msg.Seq(),
TopicIDs: msg.Topics(),
})
}
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -143,7 +124,7 @@ This command outputs data in the following encodings:
return err
}),
"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -153,7 +134,7 @@ This command outputs data in the following encodings:
return err
}),
"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -166,31 +147,7 @@ This command outputs data in the following encodings:
return err
}),
},
Type: floodsub.Message{},
}

func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
go func(pi pstore.PeerInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := n.PeerHost.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}

wg.Wait()
Type: pubsubMessage{},
}

var PubsubPubCmd = &cmds.Command{
Expand All @@ -210,20 +167,11 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
}

topic := req.Arguments[0]

err = req.ParseBodyArgs()
Expand All @@ -232,7 +180,7 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
}

for _, data := range req.Arguments[1:] {
if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil {
return err
}
}
Expand All @@ -254,21 +202,17 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
l, err := api.PubSub().Ls(req.Context)
if err != nil {
return err
}

return cmds.EmitOnce(res, stringList{n.Floodsub.GetTopics()})
return cmds.EmitOnce(res, stringList{l})
},
Type: stringList{},
Encoders: cmds.EncoderMap{
Expand Down Expand Up @@ -308,26 +252,21 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
}

var topic string
if len(req.Arguments) == 1 {
topic = req.Arguments[0]
}

peers := n.Floodsub.ListPeers(topic)
peers, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(topic))
if err != nil {
return err
}

list := &stringList{make([]string, 0, len(peers))}

for _, peer := range peers {
Expand Down
9 changes: 9 additions & 0 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ package coreapi
import (
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"

logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log"
)

var log = logging.Logger("core/coreapi")

type CoreAPI struct {
node *core.IpfsNode
}
Expand Down Expand Up @@ -72,3 +76,8 @@ func (api *CoreAPI) Dht() coreiface.DhtAPI {
func (api *CoreAPI) Swarm() coreiface.SwarmAPI {
return (*SwarmAPI)(api)
}

// PubSub returns the PubSubAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) PubSub() coreiface.PubSubAPI {
return (*PubSubAPI)(api)
}
3 changes: 3 additions & 0 deletions core/coreapi/interface/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type CoreAPI interface {
// Swarm returns an implementation of Swarm API
Swarm() SwarmAPI

// PubSub returns an implementation of PubSub API
PubSub() PubSubAPI

// ResolvePath resolves the path using Unixfs resolver
ResolvePath(context.Context, Path) (ResolvedPath, error)

Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/interface/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import "errors"

var (
ErrIsDir = errors.New("object is a directory")
ErrOffline = errors.New("can't resolve, ipfs node is offline")
ErrOffline = errors.New("this action must be run in online mode, try running 'ipfs daemon' first")
)
58 changes: 58 additions & 0 deletions core/coreapi/interface/options/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package options

type PubSubPeersSettings struct {
Topic string
}

type PubSubSubscribeSettings struct {
Discover bool
}

type PubSubPeersOption func(*PubSubPeersSettings) error
type PubSubSubscribeOption func(*PubSubSubscribeSettings) error

func PubSubPeersOptions(opts ...PubSubPeersOption) (*PubSubPeersSettings, error) {
options := &PubSubPeersSettings{
Topic: "",
}

for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}

func PubSubSubscribeOptions(opts ...PubSubSubscribeOption) (*PubSubSubscribeSettings, error) {
options := &PubSubSubscribeSettings{
Discover: false,
}

for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}

type pubsubOpts struct{}

var PubSub pubsubOpts

func (pubsubOpts) Topic(topic string) PubSubPeersOption {
return func(settings *PubSubPeersSettings) error {
settings.Topic = topic
return nil
}
}

func (pubsubOpts) Discover(discover bool) PubSubSubscribeOption {
return func(settings *PubSubSubscribeSettings) error {
settings.Discover = discover
return nil
}
}
48 changes: 48 additions & 0 deletions core/coreapi/interface/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package iface

import (
"context"
"io"

options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
)

// PubSubSubscription is an active PubSub subscription
type PubSubSubscription interface {
io.Closer

// Next return the next incoming message
Next(context.Context) (PubSubMessage, error)
}

// PubSubMessage is a single PubSub message
type PubSubMessage interface {
// From returns id of a peer from which the message has arrived
From() peer.ID

// Data returns the message body
Data() []byte

// Seq returns message identifier
Seq() []byte

// Topics returns list of topics this message was set to
Topics() []string
}

// PubSubAPI specifies the interface to PubSub
type PubSubAPI interface {
// Ls lists subscribed topics by name
Ls(context.Context) ([]string, error)
magik6k marked this conversation as resolved.
Show resolved Hide resolved

// Peers list peers we are currently pubsubbing with
Peers(context.Context, ...options.PubSubPeersOption) ([]peer.ID, error)
magik6k marked this conversation as resolved.
Show resolved Hide resolved

// Publish a message to a given pubsub topic
Publish(context.Context, string, []byte) error

// Subscribe to messages on a given topic
Subscribe(context.Context, string, ...options.PubSubSubscribeOption) (PubSubSubscription, error)
}
Loading