Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate announcement senders from publisher #392

Merged
merged 6 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion delegatedrouting/listener_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,5 @@ func TestShouldProcessMillionCIDsInThirtySeconds(t *testing.T) {
start := time.Now()
provideMany(t, client, ctx, cids)

require.True(t, time.Since(start) < timeExpectation)
require.Less(t, time.Since(start), timeExpectation)
}
4 changes: 2 additions & 2 deletions e2e_retrieve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func testRetrievalRoundTripWithTestCase(t *testing.T, tc testCase) {
// TODO: Review after resolution of https://github.com/filecoin-project/go-legs/issues/95
// For now instantiate a new host for subscriber so that dt constructed by test client works.
subHost := newHost(t)
sub, err := dagsync.NewSubscriber(subHost, client.store, client.lsys, testTopic, nil)
sub, err := dagsync.NewSubscriber(subHost, client.store, client.lsys, testTopic)
require.NoError(t, err)

serverInfo := peer.AddrInfo{
Expand Down Expand Up @@ -193,7 +193,7 @@ func testReimportCarWtihTestCase(t *testing.T, tc testCase) {
// TODO: Review after resolution of https://github.com/filecoin-project/go-legs/issues/95
// For now instantiate a new host for subscriber so that dt constructed by test client works.
subHost := newHost(t)
sub, err := dagsync.NewSubscriber(subHost, client.store, client.lsys, testTopic, nil)
sub, err := dagsync.NewSubscriber(subHost, client.store, client.lsys, testTopic)
require.NoError(t, err)

serverInfo := peer.AddrInfo{
Expand Down
134 changes: 83 additions & 51 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/ipni/go-libipni/metadata"
provider "github.com/ipni/index-provider"
"github.com/ipni/index-provider/engine/chunker"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
Expand Down Expand Up @@ -54,6 +56,7 @@ type Engine struct {
entriesChunker *chunker.CachedEntriesChunker

publisher dagsync.Publisher
senders []announce.Sender

mhLister provider.MultihashLister
cblk sync.Mutex
Expand Down Expand Up @@ -127,9 +130,13 @@ func (e *Engine) Start(ctx context.Context) error {
return fmt.Errorf("could not get latest advertisement cid: %w", err)
}
if adCid != cid.Undef {
if err = e.publisher.SetRoot(ctx, adCid); err != nil {
return err
}
e.publisher.SetRoot(adCid)
}

// If publisher created, then create announcement senders.
e.senders, err = createSenders(e.announceURLs, e.h, e.pubTopicName, e.pubExtraGossipData, e.pubTopic)
if err != nil {
return err
}
}

Expand All @@ -139,49 +146,79 @@ func (e *Engine) Start(ctx context.Context) error {
func (e *Engine) newPublisher() (dagsync.Publisher, error) {
switch e.pubKind {
case NoPublisher:
log.Info("Remote announcements is disabled; all advertisements will only be store locally.")
log.Info("Remote announcements disabled; all advertisements will only be stored locally.")
return nil, nil
case DataTransferPublisher, HttpPublisher:
default:
return nil, fmt.Errorf("unknown publisher kind: %s", e.pubKind)
case HttpPublisher:
httpPub, err := httpsync.NewPublisher(e.pubHttpListenAddr, e.lsys, e.key)
if err != nil {
return nil, fmt.Errorf("cannot create http publisher: %w", err)
}
return httpPub, nil
case DataTransferPublisher:
if e.pubDT != nil {
dtPub, err := dtsync.NewPublisherFromExisting(e.pubDT, e.h, e.pubTopicName, e.lsys, dtsync.WithAllowPeer(e.syncPolicy.Allowed))
if err != nil {
return nil, fmt.Errorf("cannot create data-transfer publisher with existing dt manager: %w", err)
}
return dtPub, nil
}
ds := dsn.Wrap(e.ds, datastore.NewKey("/dagsync/dtsync/pub"))
dtPub, err := dtsync.NewPublisher(e.h, ds, e.lsys, e.pubTopicName, dtsync.WithAllowPeer(e.syncPolicy.Allowed))
if err != nil {
return nil, fmt.Errorf("cannot create data-transfer publisher: %w", err)
}
return dtPub, nil
}
return nil, fmt.Errorf("unknown publisher kind: %s", e.pubKind)
}

func createSenders(directAnnounceURLs []*url.URL, p2pHost host.Host, pubsubTopicName string, extraGossipData []byte, existingTopic *pubsub.Topic) ([]announce.Sender, error) {
var senders []announce.Sender

// If there are announce URLs, then creage an announce sender to send
// direct HTTP announce messages to these URLs.
if len(e.announceURLs) != 0 {
httpSender, err := httpsender.New(e.announceURLs, e.h.ID())
if len(directAnnounceURLs) != 0 {
var peerID peer.ID
if p2pHost != nil {
peerID = p2pHost.ID()
}
httpSender, err := httpsender.New(directAnnounceURLs, peerID)
if err != nil {
return nil, fmt.Errorf("cannot create http announce sender: %w", err)
}
senders = append(senders, httpSender)
}

// If there is a libp2p host, then create a gossip pubsub announce sender.
if e.h != nil {
if p2pHost != nil {
// Create an announce sender to send over gossip pubsub.
p2pSender, err := p2psender.New(e.h, e.pubTopicName, p2psender.WithTopic(e.pubTopic))
p2pSender, err := p2psender.New(p2pHost, pubsubTopicName,
p2psender.WithTopic(existingTopic),
p2psender.WithExtraData(extraGossipData))
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot create p2p pubsub announce sender: %w", err)
}
senders = append(senders, p2pSender)
}

if e.pubKind == HttpPublisher {
return httpsync.NewPublisher(e.pubHttpListenAddr, e.lsys, e.key, httpsync.WithAnnounceSenders(senders...))
}
return senders, nil
}

dtOpts := []dtsync.Option{
dtsync.WithExtraData(e.pubExtraGossipData),
dtsync.WithAllowPeer(e.syncPolicy.Allowed),
dtsync.WithAnnounceSenders(senders...),
// announce uses the engines senders to send advertisement announcement messages.
func (e *Engine) announce(ctx context.Context, c cid.Cid) {
var err error
switch e.pubKind {
case HttpPublisher:
err = announce.Send(ctx, c, e.pubHttpAnnounceAddrs, e.senders...)
case DataTransferPublisher:
// TODO: It may be necessary to specify a set of external addresses to
// put into the announce message, instead of using the libp2p host's
// addresses.
err = announce.Send(ctx, c, e.h.Addrs(), e.senders...)
}
if e.pubDT != nil {
return dtsync.NewPublisherFromExisting(e.pubDT, e.h, e.pubTopicName, e.lsys, dtOpts...)
if err != nil {
log.Errorw("Failed to announce advertisement", "err", err)
}
ds := dsn.Wrap(e.ds, datastore.NewKey("/dagsync/dtsync/pub"))
return dtsync.NewPublisher(e.h, ds, e.lsys, e.pubTopicName, dtOpts...)
}

// PublishLocal stores the advertisement in the local link system and marks it
Expand Down Expand Up @@ -239,24 +276,8 @@ func (e *Engine) Publish(ctx context.Context, adv schema.Advertisement) (cid.Cid
log.Info("Announcing advertisement in pubsub channel and via http")
}

// The publishers have their own senders of announcements. Further, there is a bespoke sender in the engine
// to allow explicit announcements via HTTP. The catch is that their behaviour is inconsistent:
// * engine takes pubHttpAnnounceAddrs option to allow configuring which addrs should be announced.
// But those addrs are only used by the bespoke sender, _not_ the HTTP sender inside publishers.
//
// To work around this issue, check if announce addrs are set, and publisher kind is HTTP, and
// if so announce with explicit addresses configured.
if len(e.pubHttpAnnounceAddrs) > 0 && e.pubKind == HttpPublisher {
err = e.publisher.UpdateRootWithAddrs(ctx, c, e.pubHttpAnnounceAddrs)
} else {
err = e.publisher.UpdateRoot(ctx, c)
}

if err != nil {
log.Errorw("Failed to announce advertisement", "err", err)
// Do not consider a failure to announce an error, since publishing
// locally worked.
}
e.publisher.SetRoot(c)
e.announce(ctx, c)
}

return c, nil
Expand All @@ -282,30 +303,30 @@ func (e *Engine) latestAdToPublish(ctx context.Context) (cid.Cid, error) {
return adCid, nil
}

// PublishLatest re-publishes the latest existing advertisement to pubsub.
// PublishLatest re-publishes the latest existing advertisement and send
// announcements using the engine's configured senders.
func (e *Engine) PublishLatest(ctx context.Context) (cid.Cid, error) {
adCid, err := e.latestAdToPublish(ctx)
if err != nil {
return cid.Undef, err
}
log.Infow("Publishing latest advertisement", "cid", adCid)

err = e.publisher.UpdateRoot(ctx, adCid)
if err != nil {
return adCid, err
}
e.publisher.SetRoot(adCid)
e.announce(ctx, adCid)

return adCid, nil
}

// PublishLatestHTTP publishes the latest existing advertisement to the
// specific indexers.
// PublishLatestHTTP publishes the latest existing advertisement and sends
// direct HTTP announcements to the specified URLs.
func (e *Engine) PublishLatestHTTP(ctx context.Context, announceURLs ...*url.URL) (cid.Cid, error) {
adCid, err := e.latestAdToPublish(ctx)
if err != nil {
return cid.Undef, err
}

e.publisher.SetRoot(adCid)
err = e.httpAnnounce(ctx, adCid, announceURLs)
if err != nil {
return adCid, err
Expand All @@ -314,6 +335,9 @@ func (e *Engine) PublishLatestHTTP(ctx context.Context, announceURLs ...*url.URL
return adCid, nil
}

// httpAnnounce creates an HTTP sender to send an announcement to the specified
// URLs. An HTTP sender is created because the engine may not have an HTTP
// sender that sends to the URLs specified by announceURLs.
func (e *Engine) httpAnnounce(ctx context.Context, adCid cid.Cid, announceURLs []*url.URL) error {
if ctx.Err() != nil {
return ctx.Err()
Expand All @@ -334,6 +358,9 @@ func (e *Engine) httpAnnounce(ctx context.Context, adCid cid.Cid, announceURLs [
log.Info("Remote announcements disabled")
return nil
case DataTransferPublisher:
// TODO: It may be necessary to specify a set of external addresses to
// put into the announce message, instead of using the libp2p host's
// addresses.
msg.SetAddrs(e.h.Addrs())
case HttpPublisher:
if len(e.pubHttpAnnounceAddrs) != 0 {
Expand Down Expand Up @@ -413,13 +440,18 @@ func (e *Engine) LinkSystem() *ipld.LinkSystem {
// Shutdown shuts down the engine and discards all resources opened by the
// engine. The engine is no longer usable after the call to this function.
func (e *Engine) Shutdown() error {
var errs error
var err, errs error
if e.publisher != nil {
if err := e.publisher.Close(); err != nil {
for i := range e.senders {
if err = e.senders[i].Close(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("error closing sender: %s", err))
}
}
if err = e.publisher.Close(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("error closing leg publisher: %s", err))
}
}
if err := e.entriesChunker.Close(); err != nil {
if err = e.entriesChunker.Close(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("error closing link entriesChunker: %s", err))
}
return errs
Expand Down
38 changes: 20 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ require (
github.com/ipld/go-codec-dagpb v1.6.0
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipni/go-libipni v0.2.2
github.com/libp2p/go-libp2p v0.28.0
github.com/ipni/go-libipni v0.3.0
github.com/libp2p/go-libp2p v0.29.0
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.9.0
github.com/multiformats/go-multiaddr v0.10.1
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.3
github.com/prometheus/client_golang v1.15.1
Expand All @@ -39,7 +39,9 @@ require (
github.com/filecoin-project/go-amt-ipld/v4 v4.0.0 // indirect
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 // indirect
github.com/filecoin-project/go-state-types v0.9.9 // indirect
github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 // indirect
github.com/gammazero/channelqueue v0.2.1 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
Expand All @@ -48,18 +50,18 @@ require (
github.com/ipfs/go-unixfs v0.4.5 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.9.7 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-19 v0.3.2 // indirect
github.com/quic-go/qtls-go1-20 v0.2.2 // indirect
github.com/quic-go/quic-go v0.33.0 // indirect
github.com/quic-go/quic-go v0.36.2 // indirect
github.com/quic-go/webtransport-go v0.5.3 // indirect
github.com/samber/lo v1.36.0 // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/dig v1.17.0 // indirect
go.uber.org/fx v1.19.2 // indirect
golang.org/x/text v0.9.0 // indirect
go.uber.org/fx v1.20.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
)

Expand Down Expand Up @@ -119,7 +121,7 @@ require (
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
Expand All @@ -131,11 +133,11 @@ require (
github.com/libp2p/go-nat v0.2.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.3.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.54 // indirect
github.com/miekg/dns v1.1.55 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
Expand Down Expand Up @@ -169,13 +171,13 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/tools v0.9.1 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/tools v0.11.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
Expand Down
Loading