Skip to content

Commit

Permalink
Option to enable/disable pubsub announcement (#422)
Browse files Browse the repository at this point in the history
* Option to enable/disable pubsub announcement

Make it optional to send announcements via gossip pubsub, instead of always sending them if a libp2p host is present.

This PR also separates announcement sender configuration from publisher configuration.

Other minor updates:
- Always close http response body
- Update go-libp2p and go-libipni

Fixes issue #364
  • Loading branch information
gammazero authored Nov 24, 2023
1 parent b5a3c45 commit 791a485
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 69 deletions.
2 changes: 2 additions & 0 deletions cmd/provider/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func connectCommand(cctx *cli.Context) error {
if err != nil {
return err
}
defer resp.Body.Close()

// Handle failed requests
if resp.StatusCode != http.StatusOK {
return errFromHttpResp(resp)
Expand Down
1 change: 1 addition & 0 deletions cmd/provider/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func daemonCommand(cctx *cli.Context) error {
engine.WithPublisherKind(engine.PublisherKind(cfg.Ingest.PublisherKind)),
engine.WithHttpPublisherListenAddr(httpListenAddr),
engine.WithHttpPublisherAnnounceAddr(cfg.Ingest.HttpPublisher.AnnounceMultiaddr),
engine.WithPubsubAnnounce(!cfg.DirectAnnounce.NoPubsubAnnounce),
engine.WithSyncPolicy(syncPolicy),
engine.WithRetrievalAddrs(cfg.ProviderServer.RetrievalMultiaddrs...),
)
Expand Down
3 changes: 2 additions & 1 deletion cmd/provider/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ func beforeImportCar(cctx *cli.Context) error {
}

func doImportCar(cctx *cli.Context) error {

mdBytes, err := md.MarshalBinary()
if err != nil {
return err
Expand All @@ -101,6 +100,8 @@ func doImportCar(cctx *cli.Context) error {
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return errFromHttpResp(resp)
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/provider/internal/config/announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
// DirectAnnounce configures the target indexers that advertisement announce
// messages are sent directly to via HTTP.
type DirectAnnounce struct {
// NoPubsubAnnounce disables pubsub announce when set to true. The default
// behavior (false) is to enable sending advertisement announcements via
// gossib pubsub.
NoPubsubAnnounce bool
// URLs is a list of indexer URLs to send HTTP announce messages to.
URLs []string
}
Expand Down
1 change: 1 addition & 0 deletions cmd/provider/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func doRemoveCar(cctx *cli.Context) error {
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return errFromHttpResp(resp)
Expand Down
35 changes: 22 additions & 13 deletions doc/publisher-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,25 @@ The index-provider engine must be configured to use a libp2p publisher, HTTP pub
- `Libp2pHttpPublisher` serves advertisements using both HTTP and libp2p servers.
- `DataTransferPublisher` exposes a data-transfer/graphsync server that allows peers in the network to sync advertisements. This option is being discontinued. Only provided as a fallback in case HttpPublisher and Libp2pHttpPublisher are not working.

If `WithPublisherKind` is not provided a value, it defaults to `NoPublisher` and advertisements are only stored locally and no announcements are made. If configuring the command-line application, `WithPublisherKind` is configured by setting the `config.Ingest.PublisherKind` item in the configuration file to a value of "http", "libp2p", "libp2phttp, "dtsync", or "".
If `WithPublisherKind` is not provided a value, it defaults to `NoPublisher` and advertisements are only stored locally and no announcements are made. If configuring the command-line application, `WithPublisherKind` is configured by setting the `Ingest.PublisherKind` item in the configuration file to a value of "http", "libp2p", "libp2phttp, "dtsync", or "".

For all publisher kinds, except the `DataTransfer` publisher, the `WithHttpPublisherAnnounceAddr` option sets the addresses that are announced to indexers, telling the indexers where to fetch advertisements from. If configuring the command-line application, `WithHttpPublisherAnnounceAddr` is configured by specifying multiaddr strings in `config.Ingest.HttpPublisher.AnnounceMultiaddr`.
For all publisher kinds, except the `DataTransfer` publisher, the `WithHttpPublisherAnnounceAddr` option sets the addresses that are announced to indexers, telling the indexers where to fetch advertisements from. If configuring the command-line application, `WithHttpPublisherAnnounceAddr` is configured by specifying multiaddr strings in `Ingest.HttpPublisher.AnnounceMultiaddr`.

In future index-provider releases support for the DataTransfer publisher kind will be removed.

## Publishing vs Announcing

When a new advertisement is made available by a publisher, the new advertisement's CID is generally announced to one or more indexers. An announcement message is constructed and is sent to indexers over libp2p gossip pubsub where it is received by any indexers subscribed to the topic on which the message was publisher. Or, the announcement message is sent directly to specific indexers by HTTP.
When a new advertisement is made available by a publisher, the new advertisement's CID is generally announced to one or more indexers. An announcement message is constructed and is sent to indexers over libp2p gossip pubsub where it is received by any indexers subscribed to the topic on which the message was publisher. The announcement message may also be sent directly to specific indexers via HTTP.

The `WithDirectAnnounce` option enables sending announcements directly, via HTTP, to the indexer URLs specified. If this option is not configured or no URLs specified, then direct HTTP announcement is disabled. The corresponding config file item is `DirectAnnounce.URLs`. The `WithPubsubAnnounce` option configures whether or not to broadcast announcements to all subscribed indexers. The corresponding config file item is `DirectAnnounce.NoPubsubAnnounce`.

One, both, or none of these announcement methods may be used to make announcements for the publisher, without regard to what kind of publisher is configured. It is only necessary that the announcement message is created with one or more addresses that indexers can use to contact the publisher. The address(es) configured by `WithHttpPublisherAnnounceAddr` may depend on the type of publisher. Otherwise, the configuration of announcement senders is totally separate from the publisher.

If using a plain HTTP server, then provide addresses that specify the "http" or "https" protocol. For example "/dns4/ipni.example.com/tcp/80/https" uses "ipni.example.com" as a DNS address that is expected to resolve to a public address that handles TLS termination, and indicated by the "https" portion. If using a Libp2p server, then specify address(es) that your libp2p host can be contacted on _without_ the "http". Specifying multiple addresses to announce is OK. If no addresses are specified, then the listening HTTP addresses are used if there is an HTTP publisher, and the libp2p host addresses are used if there is a libp2p server.
If using a plain HTTP server, then provide addresses that specify the "http" or "https" protocol. For example "/dns4/ipni.example.com/tcp/80/https" uses "ipni.example.com" as a DNS address that is expected to resolve to a public address that handles TLS termination, as indicated by the "https" portion. If using a Libp2p server, then specify address(es) that your libp2p host can be contacted on _without_ the "http". Specifying multiple addresses to announce is OK. If no addresses are specified, then the listening HTTP addresses are used if there is an HTTP publisher, and the libp2p host addresses are used if there is a libp2p server.

## Configure HTTP server `HttpPublisher` publisher kind

The publisher can be configured to server HTTP using a plain (non-libp2p) HTTP server. This is configured by calling `WithPublisherKind(HttpPublisher)`. If configuring the command-line application, this is set in the configuration file as `config.Ingest.HttpPublisher.ListenMultiaddr`.
The publisher can be configured to server HTTP using a plain (non-libp2p) HTTP server. This is configured by calling `WithPublisherKind(HttpPublisher)`. If configuring the command-line application, this is set in the configuration file as `Ingest.HttpPublisher.ListenMultiaddr`.

The publisher's HTTP listen address is configured using the engine option `WithHttpPublisherListenAddr`. If unset, the default net listen address of '0.0.0.0:3104' is used.

Expand All @@ -51,7 +53,7 @@ The `HttpPublisher` kind can use either the publisher's HTTP server, or use an e

This is the default for the `HttpPublisher`.

At least one HTTP listen address is required for the HTTP server to listen on. An HTTP listen address is supplied by the `WithHttpPublisherListenAddr` option. The corresponding config file item is `config.Ingest.HttpPublisher.ListenMultiaddr`. If left unspecified a default listen address is provided. This does not apply if using one of the http provider kinds.
At least one HTTP listen address is required for the HTTP server to listen on. An HTTP listen address is supplied by the `WithHttpPublisherListenAddr` option. The corresponding config file item is `Ingest.HttpPublisher.ListenMultiaddr`. If left unspecified a default listen address is provided. This does not apply if using one of the http provider kinds.

#### Existing HTTP server

Expand All @@ -77,7 +79,7 @@ When serving HTTP over libp2p, it is not necessary to specify a publisher HTTP l

The engine always has a libp2p stream host supplied to it with the `WithHost` option or created internally. This libp2p host is give to the engine's HTTP publisher. The private key associated with the libp2p host's Identity is given to the engine using the `WithPrivateKey` option, and is also given to the publisher. This allows advertisements to be signed.

If using the command-line, the libp2p host Identity and private key are configured using the `config.Identiry.PeerID` and `config.Identity.PrivKey` configuration file items. The libp2p host's listen address is configured using the config file item `config.ProviderServer.ListenMultiaddr`.
If using the command-line, the libp2p host Identity and private key are configured using the `Identiry.PeerID` and `Identity.PrivKey` configuration file items. The libp2p host's listen address is configured using the config file item `ProviderServer.ListenMultiaddr`.

## Configure HTTP over libp2p and HTTP with `Libp2pHttpPublisher` publisher kind

Expand All @@ -91,15 +93,22 @@ Publishing with data-transfer/graphsync is legacy configuration, and is only sup

- Tell indexers where to fetch advertisements from:
- `WithHttpPublisherAnnounceAddr`
- `"config"."Ingest"."HttpPublisher"."AnnounceMultiaddr"`
- `"Ingest"."HttpPublisher"."AnnounceMultiaddr"`
- Listen for plain HTTP requests for advertisements
- `WithHttpPublisherListenAddr`
- `"config"."Ingest"."HttpPublisher"."ListenMultiaddr"`
- `"Ingest"."HttpPublisher"."ListenMultiaddr"`
- Tell retrieval clients where to retrieve content from, by advertising these addrs:
- `WithRetrievalAddrs`
- `"ProviderServer"."RetrievalMultiaddrs"
- `"ProviderServer"."RetrievalMultiaddrs"`
- Configure the interface that the content-retrieval-server listens on and provider ID:
- `WithProvider`
- `WithProvider`
- `"ProviderServer"."ListenMultiaddr"

A data-transfer publisher is configured by specifying the engine option `WithPublisherKind(DataTransferPublisher)`. When this option is specified, all of the `HttpPublisher` and `Libp2pPublisher` options are ignored.
- Send advertisement announcements to specific indexers via HTTP:
- `WithDirectAnnounce`
- `"DirectAnnounce"."URLs"`
- Disable/enable sending advertisement announcements via pubsub:
- `WithPubsubAnnounce`
- `"DirectAnnounce"."NoPubsubAnnounce"`


A data-transfer publisher is configured by specifying the engine option `WithPublisherKind(DataTransferPublisher)`. When this option is specified, all of the `HttpPublisher` and `Libp2pPublisher` options are ignored. This option is deprecated and will not be supported in the future.
65 changes: 38 additions & 27 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type Engine struct {

publisher dagsync.Publisher
senders []announce.Sender
// announceMsg is the message that is logged when an announcement is sent.
announceMsg string

mhLister provider.MultihashLister
cblk sync.Mutex
Expand Down Expand Up @@ -116,7 +118,7 @@ func (e *Engine) Start(ctx context.Context) error {
return err
}

e.publisher, err = e.newPublisher()
e.publisher, err = e.newPublisher(e.pubHttpListenAddr, e.pubHttpHandlerPath)
if err != nil {
log.Errorw("Failed to create publisher", "err", err)
return err
Expand All @@ -133,7 +135,7 @@ func (e *Engine) Start(ctx context.Context) error {
}

// If publisher created, then create announcement senders.
e.senders, err = e.createSenders()
e.senders, err = e.createSenders(e.announceURLs, e.pubsubAnnounce, e.pubsubExtraGossipData)
if err != nil {
return err
}
Expand All @@ -142,16 +144,16 @@ func (e *Engine) Start(ctx context.Context) error {
return nil
}

func (e *Engine) newPublisher() (dagsync.Publisher, error) {
func (e *Engine) newPublisher(httpListenAddr, httpPath string) (dagsync.Publisher, error) {
switch e.pubKind {
case NoPublisher:
log.Info("Remote announcements disabled; all advertisements will only be stored locally.")
return nil, nil
case HttpPublisher:
httpPub, err := ipnisync.NewPublisher(e.lsys, e.key,
ipnisync.WithHTTPListenAddrs(e.pubHttpListenAddr),
ipnisync.WithHTTPListenAddrs(httpListenAddr),
ipnisync.WithHeadTopic(e.pubTopicName),
ipnisync.WithHandlerPath(e.pubHttpHandlerPath),
ipnisync.WithHandlerPath(httpPath),
ipnisync.WithStartServer(!e.pubHttpWithoutServer))
if err != nil {
return nil, fmt.Errorf("cannot create publisher: %w", err)
Expand All @@ -176,9 +178,9 @@ func (e *Engine) newPublisher() (dagsync.Publisher, error) {
case Libp2pHttpPublisher:
libp2phttpPub, err := ipnisync.NewPublisher(e.lsys, e.key,
ipnisync.WithStreamHost(e.h),
ipnisync.WithHTTPListenAddrs(e.pubHttpListenAddr),
ipnisync.WithHTTPListenAddrs(httpListenAddr),
ipnisync.WithHeadTopic(e.pubTopicName),
ipnisync.WithHandlerPath(e.pubHttpHandlerPath),
ipnisync.WithHandlerPath(httpPath),
ipnisync.WithStartServer(!e.pubHttpWithoutServer))
if err != nil {
return nil, fmt.Errorf("cannot create publisher: %w", err)
Expand Down Expand Up @@ -209,33 +211,49 @@ func (e *Engine) newPublisher() (dagsync.Publisher, error) {
return nil, fmt.Errorf("unknown publisher kind %s, expecting one of %v", e.pubKind, []PublisherKind{HttpPublisher, Libp2pPublisher, Libp2pHttpPublisher, DataTransferPublisher})
}

func (e *Engine) createSenders() ([]announce.Sender, error) {
func (e *Engine) createSenders(announceURLs []*url.URL, pubsubOK bool, extraGossipData []byte) ([]announce.Sender, error) {
var senders []announce.Sender
var hasHttpSender, hasP2pSender bool

// If there are announce URLs, then creage an announce sender to send
// direct HTTP announce messages to these URLs.
if len(e.announceURLs) != 0 {
if len(announceURLs) != 0 {
id, err := peer.IDFromPrivateKey(e.key)
if err != nil {
return nil, fmt.Errorf("cannot get peer ID from private key: %w", err)
}
httpSender, err := httpsender.New(e.announceURLs, id)
httpSender, err := httpsender.New(announceURLs, id)
if err != nil {
return nil, fmt.Errorf("cannot create http announce sender: %w", err)
}
senders = append(senders, httpSender)
hasHttpSender = true
log.Info("HTTP announcements enabled")
}

// If there is a libp2p host, then create a gossip pubsub announce sender.
if e.h != nil {
// If pubsub announcements are enabled and there is a libp2p host, then
// create a gossip pubsub announce sender.
if pubsubOK && e.h != nil {
// Create an announce sender to send over gossip pubsub.
p2pSender, err := p2psender.New(e.h, e.pubTopicName,
p2psender.WithTopic(e.pubTopic),
p2psender.WithExtraData(e.pubExtraGossipData))
p2psender.WithExtraData(extraGossipData))
if err != nil {
return nil, fmt.Errorf("cannot create p2p pubsub announce sender: %w", err)
}
senders = append(senders, p2pSender)
hasP2pSender = true
log.Info("Pubsub announcements enabled")
}

if hasHttpSender && hasP2pSender {
e.announceMsg = "Announcing advertisement in pubsub channel and via http"
} else if hasHttpSender {
e.announceMsg = "Announcing advertisement via http"
} else if hasP2pSender {
e.announceMsg = "Announcing advertisement in pubsub channel"
} else {
e.announceMsg = "Cannot announce advertisement, no http or pubsub senders configured"
}

return senders, nil
Expand All @@ -253,6 +271,8 @@ func (e *Engine) announce(ctx context.Context, c cid.Cid) {
// put into the announce message, instead of using the libp2p host's
// addresses.
err = announce.Send(ctx, c, e.h.Addrs(), e.senders...)
case NoPublisher:
// Announcements disabled.
}
if err != nil {
log.Errorw("Failed to announce advertisement", "err", err)
Expand Down Expand Up @@ -308,17 +328,7 @@ func (e *Engine) Publish(ctx context.Context, adv schema.Advertisement) (cid.Cid

// Only announce the advertisement CID if publisher is configured.
if e.publisher != nil {
log := log.With("adCid", c)
if len(e.announceURLs) == 0 && e.h != nil {
log.Info("Announcing advertisement in pubsub channel")
} else if len(e.announceURLs) != 0 && e.h == nil {
log.Info("Announcing advertisement via http")
} else if len(e.announceURLs) != 0 && e.h != nil {
log.Info("Announcing advertisement in pubsub channel and via http")
} else {
return cid.Undef, fmt.Errorf("unexpected publisher state, no announceURLs or libp2p host")
}

log.Infow(e.announceMsg, "adCid", c)
e.publisher.SetRoot(c)
e.announce(ctx, c)
}
Expand Down Expand Up @@ -388,6 +398,10 @@ func (e *Engine) httpAnnounce(ctx context.Context, adCid cid.Cid, announceURLs [
if len(announceURLs) == 0 {
return nil
}
if e.publisher == nil {
log.Info("No publisher to announce advertisements for")
return nil
}

// Create announce message.
msg := message.Message{
Expand All @@ -397,9 +411,6 @@ func (e *Engine) httpAnnounce(ctx context.Context, adCid cid.Cid, announceURLs [
// The publisher kind determines what addresses to put into the announce
// message.
switch e.pubKind {
case NoPublisher:
log.Info("Remote announcements disabled")
return nil
case HttpPublisher, Libp2pPublisher, Libp2pHttpPublisher:
// e.pubHttpAnnounceAddrs is always set in newPublisher.
msg.SetAddrs(e.pubHttpAnnounceAddrs)
Expand Down
Loading

0 comments on commit 791a485

Please sign in to comment.