From 36ded0389863aafe675ea318bcc56660098d5011 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 16 Jun 2023 20:13:39 +1000 Subject: [PATCH 1/4] feat: http publisher without fully setting up server Ref: https://github.com/ipni/go-libipni/pull/56 --- engine/engine.go | 47 ++++++++++++++++++++++++++++++++++++++--------- engine/options.go | 38 ++++++++++++++++++++++++++++++++++---- 2 files changed, 72 insertions(+), 13 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index aa3ece2f..8be627d4 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "net/url" "sync" @@ -119,7 +120,7 @@ func (e *Engine) Start(ctx context.Context) error { e.publisher, err = e.newPublisher() if err != nil { - log.Errorw("Failed to instantiate dagsync publisher", "err", err, "kind", e.pubKind) + log.Errorw("Failed to instantiate publisher", "err", err, "kind", e.pubKind) return err } @@ -149,7 +150,13 @@ func (e *Engine) newPublisher() (dagsync.Publisher, error) { log.Info("Remote announcements disabled; all advertisements will only be stored locally.") return nil, nil case HttpPublisher: - httpPub, err := httpsync.NewPublisher(e.pubHttpListenAddr, e.lsys, e.key) + var httpPub *httpsync.Publisher + var err error + if e.pubHttpWithoutServer { + httpPub, err = httpsync.NewPublisherWithoutServer(e.pubHttpListenAddr, e.pubHttpHandlerPath, e.lsys, e.key) + } else { + httpPub, err = httpsync.NewPublisher(e.pubHttpListenAddr, e.lsys, e.key) + } if err != nil { return nil, fmt.Errorf("cannot create http publisher: %w", err) } @@ -177,12 +184,12 @@ func createSenders(directAnnounceURLs []*url.URL, p2pHost host.Host, pubsubTopic // If there are announce URLs, then creage an announce sender to send // direct HTTP announce messages to these URLs. 
- if len(directAnnounceURLs) != 0 { - var peerID peer.ID - if p2pHost != nil { - peerID = p2pHost.ID() + if len(e.announceURLs) != 0 { + id, err := peer.IDFromPrivateKey(e.key) + if err != nil { + return nil, fmt.Errorf("cannot get peer ID from private key: %w", err) } - httpSender, err := httpsender.New(directAnnounceURLs, peerID) + httpSender, err := httpsender.New(e.announceURLs, id) if err != nil { return nil, fmt.Errorf("cannot create http announce sender: %w", err) } @@ -270,10 +277,14 @@ func (e *Engine) Publish(ctx context.Context, adv schema.Advertisement) (cid.Cid // Only announce the advertisement CID if publisher is configured. if e.publisher != nil { log := log.With("adCid", c) - if len(e.announceURLs) == 0 { + if len(e.announceURLs) == 0 && e.h != nil { log.Info("Announcing advertisement in pubsub channel") - } else { + } else if len(e.announceURLs) != 0 && e.h == nil { + log.Info("Announcing advertisement via http") + } else if len(e.announceURLs) != 0 && e.h != nil { log.Info("Announcing advertisement in pubsub channel and via http") + } else { + return cid.Undef, fmt.Errorf("unexpected publisher state, no announceURLs or libp2p host") } e.publisher.SetRoot(c) @@ -457,6 +468,24 @@ func (e *Engine) Shutdown() error { return errs } +// GetPublisherHttpFunc gets the http.HandlerFunc that can be used to serve +// advertisements over HTTP. The returned handler is only valid if the +// PublisherKind is HttpPublisher and the HttpPublisherWithoutServer option is +// set. +func (e *Engine) GetPublisherHttpFunc() (http.HandlerFunc, error) { + if e.publisher == nil { + return nil, errors.New("no publisher configured") + } + if !e.pubHttpWithoutServer { + return nil, errors.New("HttpPublisherWithoutServer option not set") + } + if hp, ok := e.publisher.(*httpsync.Publisher); !ok { + return nil, errors.New("publisher is not an http publisher") + } else { + return hp.ServeHTTP, nil + } +} + // GetAdv gets the advertisement associated to the given cid c. 
The context is // not used. func (e *Engine) GetAdv(_ context.Context, adCid cid.Cid) (*schema.Advertisement, error) { diff --git a/engine/options.go b/engine/options.go index 46425fbd..3852017d 100644 --- a/engine/options.go +++ b/engine/options.go @@ -72,6 +72,8 @@ type ( // published. pubHttpAnnounceAddrs []multiaddr.Multiaddr pubHttpListenAddr string + pubHttpWithoutServer bool + pubHttpHandlerPath string pubTopicName string pubTopic *pubsub.Topic pubExtraGossipData []byte @@ -116,7 +118,8 @@ func newOptions(o ...Option) (*options, error) { opts.ds = dssync.MutexWrap(datastore.NewMapDatastore()) } - if opts.h == nil { + if (opts.key == nil || len(opts.provider.Addrs) == 0 || opts.provider.ID == "") && opts.h == nil { + // need a host h, err := libp2p.New() if err != nil { return nil, err @@ -125,8 +128,10 @@ func newOptions(o ...Option) (*options, error) { opts.h = h } - // Initialize private key from libp2p host - opts.key = opts.h.Peerstore().PrivKey(opts.h.ID()) + if opts.key == nil { + // Initialize private key from libp2p host + opts.key = opts.h.Peerstore().PrivKey(opts.h.ID()) + } // Defensively check that host's self private key is indeed set. if opts.key == nil { return nil, fmt.Errorf("cannot find private key in self peerstore; libp2p host is misconfigured") @@ -240,6 +245,24 @@ func WithHttpPublisherListenAddr(addr string) Option { } } +// WithHttpPublisherWithoutServer sets the HTTP publisher to not start a server. +// Setting up the handler is left to the user. 
+func WithHttpPublisherWithoutServer() Option { + return func(o *options) error { + o.pubHttpWithoutServer = true + return nil + } +} + +// WithHttpPublisherHandlerPath sets the handler path given to the HTTP +// publisher. It is only used together with WithHttpPublisherWithoutServer. +func WithHttpPublisherHandlerPath(handlerPath string) Option { + return func(o *options) error { + o.pubHttpHandlerPath = handlerPath + return nil + } +} + // WithHttpPublisherAnnounceAddr sets the address to be supplied in announce // messages to tell indexers where to retrieve advertisements. // @@ -249,7 +272,7 @@ func WithHttpPublisherAnnounceAddr(addr string) Option { if addr != "" { maddr, err := multiaddr.NewMultiaddr(addr) if err != nil { - return fmt.Errorf("here: %w", err) + return err } o.pubHttpAnnounceAddrs = append(o.pubHttpAnnounceAddrs, maddr) } @@ -365,6 +388,13 @@ func WithExtraGossipData(extraData []byte) Option { } } +func WithPrivateKey(key crypto.PrivKey) Option { + return func(o *options) error { + o.key = key + return nil + } +} + // WithDirectAnnounce sets indexer URLs to send direct HTTP announcements to. 
func WithDirectAnnounce(announceURLs ...string) Option { return func(o *options) error { From a0086943c2e47bfb22253d8acb0a7b47e4ce1021 Mon Sep 17 00:00:00 2001 From: gammazero Date: Tue, 25 Jul 2023 14:06:25 -0700 Subject: [PATCH 2/4] Simplify createSenders --- engine/engine.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 8be627d4..46700563 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -28,8 +28,6 @@ import ( "github.com/ipni/go-libipni/metadata" provider "github.com/ipni/index-provider" "github.com/ipni/index-provider/engine/chunker" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" ) @@ -135,7 +133,7 @@ func (e *Engine) Start(ctx context.Context) error { } // If publisher created, then create announcement senders. - e.senders, err = createSenders(e.announceURLs, e.h, e.pubTopicName, e.pubExtraGossipData, e.pubTopic) + e.senders, err = e.createSenders() if err != nil { return err } @@ -179,7 +177,7 @@ func (e *Engine) newPublisher() (dagsync.Publisher, error) { return nil, fmt.Errorf("unknown publisher kind: %s", e.pubKind) } -func createSenders(directAnnounceURLs []*url.URL, p2pHost host.Host, pubsubTopicName string, extraGossipData []byte, existingTopic *pubsub.Topic) ([]announce.Sender, error) { +func (e *Engine) createSenders() ([]announce.Sender, error) { var senders []announce.Sender // If there are announce URLs, then creage an announce sender to send @@ -197,11 +195,11 @@ func createSenders(directAnnounceURLs []*url.URL, p2pHost host.Host, pubsubTopic } // If there is a libp2p host, then create a gossip pubsub announce sender. - if p2pHost != nil { + if e.h != nil { // Create an announce sender to send over gossip pubsub. 
- p2pSender, err := p2psender.New(p2pHost, pubsubTopicName, - p2psender.WithTopic(existingTopic), - p2psender.WithExtraData(extraGossipData)) + p2pSender, err := p2psender.New(e.h, e.pubTopicName, + p2psender.WithTopic(e.pubTopic), + p2psender.WithExtraData(e.pubExtraGossipData)) if err != nil { return nil, fmt.Errorf("cannot create p2p pubsub announce sender: %w", err) } From 93a4029b0c67bca8cc9d48628d65f7530835300c Mon Sep 17 00:00:00 2001 From: gammazero Date: Tue, 25 Jul 2023 14:21:09 -0700 Subject: [PATCH 3/4] format comments --- engine/engine.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index 46700563..0d72d292 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -441,7 +441,8 @@ func (e *Engine) NotifyRemove(ctx context.Context, provider peer.ID, contextID [ return e.publishAdvForIndex(ctx, provider, nil, contextID, metadata.Metadata{}, true) } -// LinkSystem gets the link system used by the engine to store and retrieve advertisement data. +// LinkSystem gets the link system used by the engine to store and retrieve +// advertisement data. func (e *Engine) LinkSystem() *ipld.LinkSystem { return &e.lsys } @@ -479,9 +480,8 @@ func (e *Engine) GetPublisherHttpFunc() (http.HandlerFunc, error) { } if hp, ok := e.publisher.(*httpsync.Publisher); !ok { return nil, errors.New("publisher is not an http publisher") - } else { - return hp.ServeHTTP, nil } + return hp.ServeHTTP, nil } // GetAdv gets the advertisement associated to the given cid c. The context is @@ -531,9 +531,8 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, p peer.ID, addrs []mult } } - // If not removing, then generate the link for the list of - // CIDs from the contextID using the multihash lister, and store the - // relationship. + // If not removing, then generate the link for the list of CIDs from the + // contextID using the multihash lister, and store the relationship. 
if !isRm { log.Info("Creating advertisement") @@ -691,14 +690,14 @@ func (e *Engine) keyToMetadataKey(provider peer.ID, contextID []byte) datastore. func (e *Engine) putKeyCidMap(ctx context.Context, provider peer.ID, contextID []byte, c cid.Cid) error { // Store the map Key-Cid to know what CidLink to put in advertisement when // notifying about a removal. - err := e.ds.Put(ctx, e.keyToCidKey(provider, contextID), c.Bytes()) if err != nil { return err } // And the other way around when graphsync is making a request, so the // lister in the linksystem knows to what contextID the CID referrs to. - // it's enough for us to store just a single mapping of cid to provider and context to generate chunks + // it's enough for us to store just a single mapping of cid to provider and + // context to generate chunks pB, err := provider.Marshal() if err != nil { @@ -737,13 +736,15 @@ type providerAndContext struct { ContextID []byte `json:"c"` } -// getCidKeyMap returns the provider and contextID for a given cid. Provider and Context ID are guaranteed to be -// not nil. In the case if legacy index exists, the default provider identity is assumed. +// getCidKeyMap returns the provider and contextID for a given cid. Provider +// and Context ID are guaranteed to be not nil. If the mapping exists in the +// legacy index, the default provider identity is assumed. func (e *Engine) getCidKeyMap(ctx context.Context, c cid.Cid) (*providerAndContext, error) { // first see whether the mapping exists in the legacy index val, err := e.ds.Get(ctx, e.cidToKeyKey(c)) if err == nil { - // if the mapping has been found in the legacy index - return the default provider identity + // if the mapping has been found in the legacy index - return the + // default provider identity. 
return &providerAndContext{Provider: []byte(e.provider.ID), ContextID: val}, nil } if !errors.Is(err, datastore.ErrNotFound) { @@ -760,7 +761,7 @@ func (e *Engine) getCidKeyMap(ctx context.Context, c cid.Cid) (*providerAndConte if err != nil { return nil, err } - // in case if provider is empty (which should never happen), assume the default one + // if provider is empty (which should never happen), assume the default one if len(pAndC.Provider) == 0 { pAndC.Provider = []byte(e.provider.ID) } From 7222d38b74a6dc70ad3f961d01197154a6b20993 Mon Sep 17 00:00:00 2001 From: gammazero Date: Tue, 25 Jul 2023 14:24:54 -0700 Subject: [PATCH 4/4] fix hp scope in GetPublisherHttpFunc --- engine/engine.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/engine.go b/engine/engine.go index 0d72d292..7238ab75 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -478,7 +478,8 @@ func (e *Engine) GetPublisherHttpFunc() (http.HandlerFunc, error) { if !e.pubHttpWithoutServer { return nil, errors.New("HttpPublisherWithoutServer option not set") } - if hp, ok := e.publisher.(*httpsync.Publisher); !ok { + hp, ok := e.publisher.(*httpsync.Publisher) + if !ok { return nil, errors.New("publisher is not an http publisher") } return hp.ServeHTTP, nil