Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: http publisher without fully setting up server #382

Merged
merged 4 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 55 additions & 27 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"sync"

Expand All @@ -27,8 +28,6 @@
"github.com/ipni/go-libipni/metadata"
provider "github.com/ipni/index-provider"
"github.com/ipni/index-provider/engine/chunker"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
Expand Down Expand Up @@ -119,7 +118,7 @@

e.publisher, err = e.newPublisher()
if err != nil {
log.Errorw("Failed to instantiate dagsync publisher", "err", err, "kind", e.pubKind)
log.Errorw("Failed to instantiate publisher", "err", err, "kind", e.pubKind)
return err
}

Expand All @@ -134,7 +133,7 @@
}

// If publisher created, then create announcement senders.
e.senders, err = createSenders(e.announceURLs, e.h, e.pubTopicName, e.pubExtraGossipData, e.pubTopic)
e.senders, err = e.createSenders()
if err != nil {
return err
}
Expand All @@ -149,7 +148,13 @@
log.Info("Remote announcements disabled; all advertisements will only be stored locally.")
return nil, nil
case HttpPublisher:
httpPub, err := httpsync.NewPublisher(e.pubHttpListenAddr, e.lsys, e.key)
var httpPub *httpsync.Publisher
var err error
if e.pubHttpWithoutServer {
httpPub, err = httpsync.NewPublisherWithoutServer(e.pubHttpListenAddr, e.pubHttpHandlerPath, e.lsys, e.key)
} else {
httpPub, err = httpsync.NewPublisher(e.pubHttpListenAddr, e.lsys, e.key)
}
if err != nil {
return nil, fmt.Errorf("cannot create http publisher: %w", err)
}
Expand All @@ -172,29 +177,29 @@
return nil, fmt.Errorf("unknown publisher kind: %s", e.pubKind)
}

func createSenders(directAnnounceURLs []*url.URL, p2pHost host.Host, pubsubTopicName string, extraGossipData []byte, existingTopic *pubsub.Topic) ([]announce.Sender, error) {
func (e *Engine) createSenders() ([]announce.Sender, error) {
var senders []announce.Sender

// If there are announce URLs, then creage an announce sender to send
// direct HTTP announce messages to these URLs.
if len(directAnnounceURLs) != 0 {
var peerID peer.ID
if p2pHost != nil {
peerID = p2pHost.ID()
if len(e.announceURLs) != 0 {
id, err := peer.IDFromPrivateKey(e.key)
if err != nil {
return nil, fmt.Errorf("cannot get peer ID from private key: %w", err)
}
httpSender, err := httpsender.New(directAnnounceURLs, peerID)
httpSender, err := httpsender.New(e.announceURLs, id)
if err != nil {
return nil, fmt.Errorf("cannot create http announce sender: %w", err)
}
senders = append(senders, httpSender)
}

// If there is a libp2p host, then create a gossip pubsub announce sender.
if p2pHost != nil {
if e.h != nil {
// Create an announce sender to send over gossip pubsub.
p2pSender, err := p2psender.New(p2pHost, pubsubTopicName,
p2psender.WithTopic(existingTopic),
p2psender.WithExtraData(extraGossipData))
p2pSender, err := p2psender.New(e.h, e.pubTopicName,
p2psender.WithTopic(e.pubTopic),
p2psender.WithExtraData(e.pubExtraGossipData))
if err != nil {
return nil, fmt.Errorf("cannot create p2p pubsub announce sender: %w", err)
}
Expand Down Expand Up @@ -270,10 +275,14 @@
// Only announce the advertisement CID if publisher is configured.
if e.publisher != nil {
log := log.With("adCid", c)
if len(e.announceURLs) == 0 {
if len(e.announceURLs) == 0 && e.h != nil {
log.Info("Announcing advertisement in pubsub channel")
} else {
} else if len(e.announceURLs) != 0 && e.h == nil {
log.Info("Announcing advertisement via http")
} else if len(e.announceURLs) != 0 && e.h != nil {
log.Info("Announcing advertisement in pubsub channel and via http")
} else {
return cid.Undef, fmt.Errorf("unexpected publisher state, no announceURLs or libp2p host")
}

e.publisher.SetRoot(c)
Expand Down Expand Up @@ -432,7 +441,8 @@
return e.publishAdvForIndex(ctx, provider, nil, contextID, metadata.Metadata{}, true)
}

// LinkSystem gets the link system used by the engine to store and retrieve advertisement data.
// LinkSystem gets the link system used by the engine to store and retrieve
// advertisement data.
func (e *Engine) LinkSystem() *ipld.LinkSystem {
return &e.lsys
}
Expand All @@ -457,6 +467,23 @@
return errs
}

// GetPublisherHttpFunc gets the http.HandlerFunc that can be used to serve
// advertisements over HTTP. The returned handler is only valid if the
// PublisherKind is HttpPublisher and the HttpPublisherWithoutServer option is
// set.
func (e *Engine) GetPublisherHttpFunc() (http.HandlerFunc, error) {
if e.publisher == nil {
return nil, errors.New("no publisher configured")
}
if !e.pubHttpWithoutServer {
return nil, errors.New("HttpPublisherWithoutServer option not set")
}
if hp, ok := e.publisher.(*httpsync.Publisher); !ok {

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / ubuntu (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / ubuntu (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / ubuntu (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / ubuntu (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / macos (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / macos (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / windows (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / windows (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / macos (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / macos (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / windows (go 1.20.x)

hp declared and not used

Check failure on line 481 in engine/engine.go

View workflow job for this annotation

GitHub Actions / windows (go 1.20.x)

hp declared and not used
return nil, errors.New("publisher is not an http publisher")
}
return hp.ServeHTTP, nil

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / ubuntu (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / ubuntu (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / ubuntu (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / ubuntu (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / macos (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / macos (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

undefined: hp (compile)

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / All

undefined: hp (compile)

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / windows (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / windows (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / macos (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / macos (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / windows (go 1.20.x)

undefined: hp

Check failure on line 484 in engine/engine.go

View workflow job for this annotation

GitHub Actions / windows (go 1.20.x)

undefined: hp
}

// GetAdv gets the advertisement associated to the given cid c. The context is
// not used.
func (e *Engine) GetAdv(_ context.Context, adCid cid.Cid) (*schema.Advertisement, error) {
Expand Down Expand Up @@ -504,9 +531,8 @@
}
}

// If not removing, then generate the link for the list of
// CIDs from the contextID using the multihash lister, and store the
// relationship.
// If not removing, then generate the link for the list of CIDs from the
// contextID using the multihash lister, and store the relationship.
if !isRm {
log.Info("Creating advertisement")

Expand Down Expand Up @@ -664,14 +690,14 @@
func (e *Engine) putKeyCidMap(ctx context.Context, provider peer.ID, contextID []byte, c cid.Cid) error {
// Store the map Key-Cid to know what CidLink to put in advertisement when
// notifying about a removal.

err := e.ds.Put(ctx, e.keyToCidKey(provider, contextID), c.Bytes())
if err != nil {
return err
}
// And the other way around when graphsync is making a request, so the
// lister in the linksystem knows to what contextID the CID referrs to.
// it's enough for us to store just a single mapping of cid to provider and context to generate chunks
// it's enough for us to store just a single mapping of cid to provider and
// context to generate chunks

pB, err := provider.Marshal()
if err != nil {
Expand Down Expand Up @@ -710,13 +736,15 @@
ContextID []byte `json:"c"`
}

// getCidKeyMap returns the provider and contextID for a given cid. Provider and Context ID are guaranteed to be
// not nil. In the case if legacy index exists, the default provider identity is assumed.
// getCidKeyMap returns the provider and contextID for a given cid. Provider
// and Context ID are guaranteed to be not nil. In the case if legacy index
// exists, the default provider identity is assumed.
func (e *Engine) getCidKeyMap(ctx context.Context, c cid.Cid) (*providerAndContext, error) {
// first see whether the mapping exists in the legacy index
val, err := e.ds.Get(ctx, e.cidToKeyKey(c))
if err == nil {
// if the mapping has been found in the legacy index - return the default provider identity
// if the mapping has been found in the legacy index - return the
// default provider identity.
return &providerAndContext{Provider: []byte(e.provider.ID), ContextID: val}, nil
}
if !errors.Is(err, datastore.ErrNotFound) {
Expand All @@ -733,7 +761,7 @@
if err != nil {
return nil, err
}
// in case if provider is empty (which should never happen), assume the default one
// if provider is empty (which should never happen), assume the default one
if len(pAndC.Provider) == 0 {
pAndC.Provider = []byte(e.provider.ID)
}
Expand Down
38 changes: 34 additions & 4 deletions engine/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type (
// published.
pubHttpAnnounceAddrs []multiaddr.Multiaddr
pubHttpListenAddr string
pubHttpWithoutServer bool
pubHttpHandlerPath string
pubTopicName string
pubTopic *pubsub.Topic
pubExtraGossipData []byte
Expand Down Expand Up @@ -116,7 +118,8 @@ func newOptions(o ...Option) (*options, error) {
opts.ds = dssync.MutexWrap(datastore.NewMapDatastore())
}

if opts.h == nil {
if (opts.key == nil || len(opts.provider.Addrs) == 0 || opts.provider.ID == "") && opts.h == nil {
// need a host
h, err := libp2p.New()
if err != nil {
return nil, err
Expand All @@ -125,8 +128,10 @@ func newOptions(o ...Option) (*options, error) {
opts.h = h
}

// Initialize private key from libp2p host
opts.key = opts.h.Peerstore().PrivKey(opts.h.ID())
if opts.key == nil {
// Initialize private key from libp2p host
opts.key = opts.h.Peerstore().PrivKey(opts.h.ID())
}
// Defensively check that host's self private key is indeed set.
if opts.key == nil {
return nil, fmt.Errorf("cannot find private key in self peerstore; libp2p host is misconfigured")
Expand Down Expand Up @@ -240,6 +245,24 @@ func WithHttpPublisherListenAddr(addr string) Option {
}
}

// WithHttpPublisherWithoutServer sets the HTTP publisher to not start a server.
// Setting up the handler is left to the user.
func WithHttpPublisherWithoutServer() Option {
return func(o *options) error {
o.pubHttpWithoutServer = true
return nil
}
}

// WithHttpPublisherHandlerPath should only be used with
// WithHttpPublisherWithoutServer
func WithHttpPublisherHandlerPath(handlerPath string) Option {
return func(o *options) error {
o.pubHttpHandlerPath = handlerPath
return nil
}
}

// WithHttpPublisherAnnounceAddr sets the address to be supplied in announce
// messages to tell indexers where to retrieve advertisements.
//
Expand All @@ -249,7 +272,7 @@ func WithHttpPublisherAnnounceAddr(addr string) Option {
if addr != "" {
maddr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return fmt.Errorf("here: %w", err)
return err
}
o.pubHttpAnnounceAddrs = append(o.pubHttpAnnounceAddrs, maddr)
}
Expand Down Expand Up @@ -365,6 +388,13 @@ func WithExtraGossipData(extraData []byte) Option {
}
}

func WithPrivateKey(key crypto.PrivKey) Option {
return func(o *options) error {
o.key = key
return nil
}
}

// WithDirectAnnounce sets indexer URLs to send direct HTTP announcements to.
func WithDirectAnnounce(announceURLs ...string) Option {
return func(o *options) error {
Expand Down
Loading