From 2f35326ee0ce928a27838e6665c2ccc377404a80 Mon Sep 17 00:00:00 2001 From: Andrew Gillis Date: Mon, 24 Jul 2023 06:55:45 -0700 Subject: [PATCH] Decoupe announce Senders from Publishers (#88) There is no strong reason to associate a set of announce Senders with a Publisher. The only association is that a Sender is often used to after updating the Sublisher's head CID. By separating these, it allows more flexibility in where in handling serving an advertisement chain is done and where sending advertisement announcements is done. --- announce/httpsender/option.go | 13 ++- announce/httpsender/sender.go | 8 ++ announce/p2psender/option.go | 13 ++- announce/p2psender/sender.go | 5 + announce/sender.go | 32 ++++++ dagsync/announce_test.go | 12 +-- dagsync/dtsync/option.go | 23 ----- dagsync/dtsync/publisher.go | 83 +--------------- dagsync/dtsync/syncer_test.go | 2 +- dagsync/example_test.go | 8 +- dagsync/http_test.go | 9 +- dagsync/httpsync/option.go | 47 --------- dagsync/httpsync/publisher.go | 129 ++++--------------------- dagsync/httpsync/publisher_test.go | 7 +- dagsync/httpsync/sync_test.go | 2 +- dagsync/interface.go | 14 +-- dagsync/legs_test.go | 30 +++--- dagsync/p2p/protocol/head/head.go | 25 ++--- dagsync/p2p/protocol/head/head_test.go | 6 +- dagsync/subscriber_test.go | 80 +++++++-------- dagsync/sync_test.go | 46 +++++---- 21 files changed, 201 insertions(+), 393 deletions(-) delete mode 100644 dagsync/httpsync/option.go diff --git a/announce/httpsender/option.go b/announce/httpsender/option.go index 584d85e..5005c9d 100644 --- a/announce/httpsender/option.go +++ b/announce/httpsender/option.go @@ -11,8 +11,9 @@ import ( const defaultTimeout = time.Minute type config struct { - timeout time.Duration client *http.Client + extraData []byte + timeout time.Duration userAgent string } @@ -33,6 +34,16 @@ func getOpts(opts []Option) (config, error) { return cfg, nil } +// WithExtraData sets the extra data to include in the announce message. +func WithExtraData(data []byte) Option { + return func(c *config) error { + if len(data) != 0 { + c.extraData = data + } + return nil + } +} + // WithTimeout configures the timeout to wait for a response. func WithTimeout(timeout time.Duration) Option { return func(cfg *config) error { diff --git a/announce/httpsender/sender.go b/announce/httpsender/sender.go index 1dc4597..14663ed 100644 --- a/announce/httpsender/sender.go +++ b/announce/httpsender/sender.go @@ -22,6 +22,7 @@ const DefaultAnnouncePath = "/announce" type Sender struct { announceURLs []string client *http.Client + extraData []byte peerID peer.ID userAgent string } @@ -68,6 +69,7 @@ func New(announceURLs []*url.URL, peerID peer.ID, options ...Option) (*Sender, e return &Sender{ announceURLs: urls, + extraData: opts.extraData, client: client, peerID: peerID, userAgent: opts.userAgent, @@ -86,6 +88,9 @@ func (s *Sender) Send(ctx context.Context, msg message.Message) error { if err != nil { return fmt.Errorf("cannot add p2p id to message addrs: %w", err) } + if len(s.extraData) != 0 { + msg.ExtraData = s.extraData + } buf := bytes.NewBuffer(nil) if err = msg.MarshalCBOR(buf); err != nil { return fmt.Errorf("cannot cbor encode announce message: %w", err) @@ -98,6 +103,9 @@ func (s *Sender) SendJson(ctx context.Context, msg message.Message) error { if err != nil { return fmt.Errorf("cannot add p2p id to message addrs: %w", err) } + if len(s.extraData) != 0 { + msg.ExtraData = s.extraData + } buf := new(bytes.Buffer) if err = json.NewEncoder(buf).Encode(msg); err != nil { return fmt.Errorf("cannot json encode announce message: %w", err) diff --git a/announce/p2psender/option.go b/announce/p2psender/option.go index 9d3684d..eeae2fc 100644 --- a/announce/p2psender/option.go +++ b/announce/p2psender/option.go @@ -8,7 +8,8 @@ import ( // config contains all options for configuring dtsync.publisher. type config struct { - topic *pubsub.Topic + topic *pubsub.Topic + extraData []byte } // Option is a function that sets a value in a config. @@ -32,3 +33,13 @@ func WithTopic(topic *pubsub.Topic) Option { return nil } } + +// WithExtraData sets the extra data to include in the announce message. +func WithExtraData(data []byte) Option { + return func(c *config) error { + if len(data) != 0 { + c.extraData = data + } + return nil + } +} diff --git a/announce/p2psender/sender.go b/announce/p2psender/sender.go index 41002ff..135209e 100644 --- a/announce/p2psender/sender.go +++ b/announce/p2psender/sender.go @@ -18,6 +18,7 @@ const shutdownTime = 5 * time.Second type Sender struct { cancelPubSub context.CancelFunc topic *pubsub.Topic + extraData []byte } // New creates a new Sender that sends announce messages over pubsub. @@ -41,6 +42,7 @@ func New(p2pHost host.Host, topicName string, options ...Option) (*Sender, error return &Sender{ cancelPubSub: cancelPubsub, topic: topic, + extraData: opts.extraData, }, nil } @@ -65,6 +67,9 @@ func (s *Sender) Close() error { // Send sends the Message to the pubsub topic. func (s *Sender) Send(ctx context.Context, msg message.Message) error { + if len(s.extraData) != 0 { + msg.ExtraData = s.extraData + } buf := bytes.NewBuffer(nil) if err := msg.MarshalCBOR(buf); err != nil { return err diff --git a/announce/sender.go b/announce/sender.go index 62dc95f..1639b0c 100644 --- a/announce/sender.go +++ b/announce/sender.go @@ -2,8 +2,12 @@ package announce import ( "context" + "errors" + "github.com/hashicorp/go-multierror" + "github.com/ipfs/go-cid" "github.com/ipni/go-libipni/announce/message" + "github.com/multiformats/go-multiaddr" ) // Sender is the interface for announce sender implementations. @@ -13,3 +17,31 @@ type Sender interface { // Send sends the announce Message. Send(context.Context, message.Message) error } + +// Send sends an advertisement announcement message, containing the specified +// addresses and extra data, to all senders. +func Send(ctx context.Context, c cid.Cid, addrs []multiaddr.Multiaddr, senders ...Sender) error { + // Do nothing if nothing to announce or no means to announce it. + if c == cid.Undef || len(senders) == 0 { + return nil + } + + msg := message.Message{ + Cid: c, + } + msg.SetAddrs(addrs) + + var errs error + for _, sender := range senders { + if sender == nil { + continue + } + if err := sender.Send(ctx, msg); err != nil { + errs = multierror.Append(errs, err) + if errors.Is(err, context.Canceled) { + return err + } + } + } + return errs +} diff --git a/dagsync/announce_test.go b/dagsync/announce_test.go index 5a33ecf..48f1563 100644 --- a/dagsync/announce_test.go +++ b/dagsync/announce_test.go @@ -58,8 +58,7 @@ func TestAnnounceReplace(t *testing.T) { hnd.subscriber.scopedBlockHookMutex.Lock() firstCid := chainLnks[2].(cidlink.Link).Cid - err = pub.SetRoot(context.Background(), firstCid) - require.NoError(t, err) + pub.SetRoot(firstCid) // Have the subscriber receive an announce. This is the same as if it was // published by the publisher without having to wait for it to arrive. @@ -79,16 +78,14 @@ func TestAnnounceReplace(t *testing.T) { // Announce two more times. c := chainLnks[1].(cidlink.Link).Cid - err = pub.SetRoot(context.Background(), c) - require.NoError(t, err) + pub.SetRoot(c) err = sub.Announce(context.Background(), c, srcHost.ID(), srcHost.Addrs()) require.NoError(t, err) t.Log("Sent announce for second CID", c) lastCid := chainLnks[0].(cidlink.Link).Cid - err = pub.SetRoot(context.Background(), lastCid) - require.NoError(t, err) + pub.SetRoot(lastCid) err = sub.Announce(context.Background(), lastCid, srcHost.ID(), srcHost.Addrs()) require.NoError(t, err) @@ -240,8 +237,7 @@ func TestAnnounceRepublish(t *testing.T) { chainLnks := test.MkChain(srcLnkS, true) firstCid := chainLnks[2].(cidlink.Link).Cid - err = pub.SetRoot(context.Background(), firstCid) - require.NoError(t, err) + pub.SetRoot(firstCid) // Announce one CID to subscriber1. err = sub1.Announce(context.Background(), firstCid, srcHost.ID(), srcHost.Addrs()) diff --git a/dagsync/dtsync/option.go b/dagsync/dtsync/option.go index 8f40faa..b30770d 100644 --- a/dagsync/dtsync/option.go +++ b/dagsync/dtsync/option.go @@ -3,7 +3,6 @@ package dtsync import ( "fmt" - "github.com/ipni/go-libipni/announce" "github.com/libp2p/go-libp2p/core/peer" ) @@ -15,9 +14,7 @@ const ( // config contains all options for configuring dtsync.publisher. type config struct { - extraData []byte allowPeer func(peer.ID) bool - senders []announce.Sender gsMaxInRequests uint64 gsMaxOutRequests uint64 @@ -40,16 +37,6 @@ func getOpts(opts []Option) (config, error) { return cfg, nil } -// WithExtraData sets the extra data to include in the pubsub message. -func WithExtraData(data []byte) Option { - return func(c *config) error { - if len(data) != 0 { - c.extraData = data - } - return nil - } -} - // WithAllowPeer sets the function that determines whether to allow or reject // graphsync sessions from a peer. func WithAllowPeer(allowPeer func(peer.ID) bool) Option { @@ -59,16 +46,6 @@ func WithAllowPeer(allowPeer func(peer.ID) bool) Option { } } -// WithAnnounceSenders sets announce.Senders to use for sending announcements. -func WithAnnounceSenders(senders ...announce.Sender) Option { - return func(c *config) error { - if len(senders) != 0 { - c.senders = senders - } - return nil - } -} - func WithMaxGraphsyncRequests(maxIn, maxOut uint64) Option { return func(c *config) error { c.gsMaxInRequests = maxIn diff --git a/dagsync/dtsync/publisher.go b/dagsync/dtsync/publisher.go index 0af8744..4546099 100644 --- a/dagsync/dtsync/publisher.go +++ b/dagsync/dtsync/publisher.go @@ -2,7 +2,6 @@ package dtsync import ( "context" - "errors" "fmt" "net/http" "sync" @@ -12,24 +11,19 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipld/go-ipld-prime" - "github.com/ipni/go-libipni/announce" - "github.com/ipni/go-libipni/announce/message" "github.com/ipni/go-libipni/dagsync/p2p/protocol/head" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" ) -// Publisher is a data-transfer publisher that announces the head of an -// advertisement chain to a set of configured senders. +// Publisher serves an advertisement over libp2p using data-transfer. type Publisher struct { closeOnce sync.Once dtManager dt.Manager dtClose dtCloseFunc - extraData []byte headPublisher *head.Publisher host host.Host - senders []announce.Sender } // NewPublisher creates a new dagsync publisher. @@ -50,10 +44,8 @@ func NewPublisher(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, t return &Publisher{ dtManager: dtManager, dtClose: dtClose, - extraData: opts.extraData, headPublisher: headPublisher, host: host, - senders: opts.senders, }, nil } @@ -85,10 +77,8 @@ func NewPublisherFromExisting(dtManager dt.Manager, host host.Host, topicName st startHeadPublisher(host, topicName, headPublisher) return &Publisher{ - extraData: opts.extraData, headPublisher: headPublisher, host: host, - senders: opts.senders, }, nil } @@ -108,65 +98,12 @@ func (p *Publisher) Protocol() int { return multiaddr.P_P2P } -// AnnounceHead announces the current head of the advertisement chain to the -// configured senders. -func (p *Publisher) AnnounceHead(ctx context.Context) error { - return p.announce(ctx, p.headPublisher.Root(), p.Addrs()) -} - -// AnnounceHeadWithAddrs announces the current head of the advertisement chain -// to the configured senders with the given addresses. -func (p *Publisher) AnnounceHeadWithAddrs(ctx context.Context, addrs []multiaddr.Multiaddr) error { - return p.announce(ctx, p.headPublisher.Root(), addrs) -} - -func (p *Publisher) announce(ctx context.Context, c cid.Cid, addrs []multiaddr.Multiaddr) error { - // Do nothing if nothing to announce or no means to announce it. - if c == cid.Undef || len(p.senders) == 0 { - return nil - } - - msg := message.Message{ - Cid: c, - ExtraData: p.extraData, - } - msg.SetAddrs(addrs) - - var errs error - for _, sender := range p.senders { - if err := sender.Send(ctx, msg); err != nil { - errs = multierror.Append(errs, err) - } - } - return errs -} - // SetRoot sets the root CID of the advertisement chain. -func (p *Publisher) SetRoot(ctx context.Context, c cid.Cid) error { - if c == cid.Undef { - return errors.New("cannot update to an undefined cid") - } - log.Debugf("Setting root CID: %s", c) - return p.headPublisher.UpdateRoot(ctx, c) -} - -// UpdateRoot updates the root CID of the advertisement chain and announces it -// to the configured senders. -func (p *Publisher) UpdateRoot(ctx context.Context, c cid.Cid) error { - return p.UpdateRootWithAddrs(ctx, c, p.Addrs()) -} - -// UpdateRootWithAddrs updates the root CID of the advertisement chain and -// announces it to the configured senders with the given addresses. -func (p *Publisher) UpdateRootWithAddrs(ctx context.Context, c cid.Cid, addrs []multiaddr.Multiaddr) error { - err := p.SetRoot(ctx, c) - if err != nil { - return err - } - return p.announce(ctx, c, addrs) +func (p *Publisher) SetRoot(c cid.Cid) { + p.headPublisher.SetRoot(c) } -// Close closes the publisher and all of its senders. +// Close closes the publisher. func (p *Publisher) Close() error { var errs error p.closeOnce.Do(func() { @@ -175,24 +112,12 @@ func (p *Publisher) Close() error { errs = multierror.Append(errs, err) } - for _, sender := range p.senders { - if err = sender.Close(); err != nil { - errs = multierror.Append(errs, err) - } - } - if p.dtClose != nil { err = p.dtClose() if err != nil { errs = multierror.Append(errs, err) } } - - for _, sender := range p.senders { - if err = sender.Close(); err != nil { - errs = multierror.Append(errs, err) - } - } }) return errs } diff --git a/dagsync/dtsync/syncer_test.go b/dagsync/dtsync/syncer_test.go index 834e3cc..7079eba 100644 --- a/dagsync/dtsync/syncer_test.go +++ b/dagsync/dtsync/syncer_test.go @@ -146,7 +146,7 @@ func TestDTSync_CallsBlockHookWhenCIDsArePartiallyFoundLocally(t *testing.T) { t.Cleanup(func() { require.NoError(t, pub.Close()) }) // Publish head. - require.NoError(t, pub.SetRoot(ctx, l3.(cidlink.Link).Cid)) + pub.SetRoot(l3.(cidlink.Link).Cid) } // Set up a syncer. diff --git a/dagsync/example_test.go b/dagsync/example_test.go index 4ce465a..2793d2f 100644 --- a/dagsync/example_test.go +++ b/dagsync/example_test.go @@ -41,9 +41,7 @@ func ExamplePublisher() { if err != nil { panic(err) } - if err = pub.UpdateRoot(context.Background(), lnk1.(cidlink.Link).Cid); err != nil { - panic(err) - } + pub.SetRoot(lnk1.(cidlink.Link).Cid) log.Print("Publish 1:", lnk1.(cidlink.Link).Cid) // Update root on publisher one with item @@ -52,9 +50,7 @@ func ExamplePublisher() { if err != nil { panic(err) } - if err = pub.UpdateRoot(context.Background(), lnk2.(cidlink.Link).Cid); err != nil { - panic(err) - } + pub.SetRoot(lnk2.(cidlink.Link).Cid) log.Print("Publish 2:", lnk2.(cidlink.Link).Cid) } diff --git a/dagsync/http_test.go b/dagsync/http_test.go index bda5e51..c92782a 100644 --- a/dagsync/http_test.go +++ b/dagsync/http_test.go @@ -77,8 +77,7 @@ func TestManualSync(t *testing.T) { rootLnk, err := test.Store(te.srcStore, basicnode.NewString("hello world")) require.NoError(t, err) - err = te.pub.SetRoot(context.Background(), rootLnk.(cidlink.Link).Cid) - require.NoError(t, err) + te.pub.SetRoot(rootLnk.(cidlink.Link).Cid) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -101,8 +100,7 @@ func TestSyncHttpFailsUnexpectedPeer(t *testing.T) { rootLnk, err := test.Store(te.srcStore, basicnode.NewString("hello world")) require.NoError(t, err) - err = te.pub.SetRoot(context.Background(), rootLnk.(cidlink.Link).Cid) - require.NoError(t, err) + te.pub.SetRoot(rootLnk.(cidlink.Link).Cid) ctx, cancel := context.WithTimeout(context.Background(), updateTimeout) @@ -158,8 +156,7 @@ func TestSyncFnHttp(t *testing.T) { // Assert the latestSync is updated by explicit sync when cid and selector are unset. newHead := chainLnks[0].(cidlink.Link).Cid - err = te.pub.SetRoot(context.Background(), newHead) - require.NoError(t, err) + te.pub.SetRoot(newHead) lnk := chainLnks[1] diff --git a/dagsync/httpsync/option.go b/dagsync/httpsync/option.go deleted file mode 100644 index fa862a7..0000000 --- a/dagsync/httpsync/option.go +++ /dev/null @@ -1,47 +0,0 @@ -package httpsync - -import ( - "fmt" - - "github.com/ipni/go-libipni/announce" -) - -// config contains all options for configuring dtsync.publisher. -type config struct { - extraData []byte - senders []announce.Sender -} - -// Option is a function that sets a value in a config. -type Option func(*config) error - -// getOpts creates a config and applies Options to it. -func getOpts(opts []Option) (config, error) { - var cfg config - for i, opt := range opts { - if err := opt(&cfg); err != nil { - return config{}, fmt.Errorf("option %d failed: %s", i, err) - } - } - return cfg, nil -} - -// WithAnnounceSenders sets announce.Senders to use for sending announcements. -func WithAnnounceSenders(senders ...announce.Sender) Option { - return func(c *config) error { - if len(senders) != 0 { - c.senders = senders - } - return nil - } -} - -// WithExtraData sets the extra data to include in the pubsub message. -func WithExtraData(data []byte) Option { - return func(c *config) error { - if len(data) != 0 { - c.extraData = data - } - return nil - } -} diff --git a/dagsync/httpsync/publisher.go b/dagsync/httpsync/publisher.go index cddcc95..2347774 100644 --- a/dagsync/httpsync/publisher.go +++ b/dagsync/httpsync/publisher.go @@ -1,7 +1,6 @@ package httpsync import ( - "context" "errors" "fmt" "io" @@ -12,22 +11,18 @@ import ( "strings" "sync" - "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/codec/dagjson" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" - "github.com/ipni/go-libipni/announce" - "github.com/ipni/go-libipni/announce/message" ic "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) -// Publisher is an HTTP publisher that announces the head of an advertisement -// chain to a set of configured senders. +// Publisher serves an advertisement chain over HTTP. type Publisher struct { addr multiaddr.Multiaddr closer io.Closer @@ -35,22 +30,15 @@ type Publisher struct { handlerPath string peerID peer.ID privKey ic.PrivKey - rl sync.RWMutex + lock sync.Mutex root cid.Cid - senders []announce.Sender - extraData []byte } var _ http.Handler = (*Publisher)(nil) // NewPublisher creates a new http publisher, listening on the specified // address. -func NewPublisher(address string, lsys ipld.LinkSystem, privKey ic.PrivKey, options ...Option) (*Publisher, error) { - opts, err := getOpts(options) - if err != nil { - return nil, err - } - +func NewPublisher(address string, lsys ipld.LinkSystem, privKey ic.PrivKey) (*Publisher, error) { if privKey == nil { return nil, errors.New("private key required to sign head requests") } @@ -72,13 +60,11 @@ func NewPublisher(address string, lsys ipld.LinkSystem, privKey ic.PrivKey, opti proto, _ := multiaddr.NewMultiaddr("/http") pub := &Publisher{ - addr: multiaddr.Join(maddr, proto), - closer: l, - lsys: lsys, - peerID: peerID, - privKey: privKey, - senders: opts.senders, - extraData: opts.extraData, + addr: multiaddr.Join(maddr, proto), + closer: l, + lsys: lsys, + peerID: peerID, + privKey: privKey, } // Run service on configured port. @@ -98,8 +84,8 @@ func NewPublisher(address string, lsys ipld.LinkSystem, privKey ic.PrivKey, opti // requests on, e.g. "ipni" for `/ipni/...` requests. // // DEPRECATED: use NewPublisherWithoutServer(listener.Addr(), ...) -func NewPublisherForListener(listener net.Listener, handlerPath string, lsys ipld.LinkSystem, privKey ic.PrivKey, options ...Option) (*Publisher, error) { - return NewPublisherWithoutServer(listener.Addr().String(), handlerPath, lsys, privKey, options...) +func NewPublisherForListener(listener net.Listener, handlerPath string, lsys ipld.LinkSystem, privKey ic.PrivKey) (*Publisher, error) { + return NewPublisherWithoutServer(listener.Addr().String(), handlerPath, lsys, privKey) } // NewPublisherWithoutServer creates a new http publisher for an existing @@ -107,12 +93,7 @@ func NewPublisherForListener(listener net.Listener, handlerPath string, lsys ipl // the HTTP server is the caller's responsibility. ServeHTTP on the // returned Publisher can be used to handle requests. handlerPath is the // path to handle requests on, e.g. "ipni" for `/ipni/...` requests. -func NewPublisherWithoutServer(address string, handlerPath string, lsys ipld.LinkSystem, privKey ic.PrivKey, options ...Option) (*Publisher, error) { - opts, err := getOpts(options) - if err != nil { - return nil, err - } - +func NewPublisherWithoutServer(address string, handlerPath string, lsys ipld.LinkSystem, privKey ic.PrivKey) (*Publisher, error) { if privKey == nil { return nil, errors.New("private key required to sign head requests") } @@ -147,8 +128,6 @@ func NewPublisherWithoutServer(address string, handlerPath string, lsys ipld.Lin handlerPath: handlerPath, peerID: peerID, privKey: privKey, - senders: opts.senders, - extraData: opts.extraData, }, nil } @@ -169,83 +148,16 @@ func (p *Publisher) Protocol() int { return multiaddr.P_HTTP } -// AnnounceHead announces the head of the advertisement chain to the configured -// senders. -func (p *Publisher) AnnounceHead(ctx context.Context) error { - p.rl.Lock() - c := p.root - p.rl.Unlock() - return p.announce(ctx, c, p.Addrs()) -} - -// AnnounceHeadWithAddrs announces the head of the advertisement chain to the -// configured senders, with the provided addresses. -func (p *Publisher) AnnounceHeadWithAddrs(ctx context.Context, addrs []multiaddr.Multiaddr) error { - p.rl.Lock() - c := p.root - p.rl.Unlock() - return p.announce(ctx, c, addrs) -} - -func (p *Publisher) announce(ctx context.Context, c cid.Cid, addrs []multiaddr.Multiaddr) error { - // Do nothing if nothing to announce or no means to announce it. - if c == cid.Undef || len(p.senders) == 0 { - return nil - } - - log.Debugf("Publishing CID and addresses over HTTP: %s", c) - msg := message.Message{ - Cid: c, - ExtraData: p.extraData, - } - msg.SetAddrs(addrs) - - var errs error - for _, sender := range p.senders { - if err := sender.Send(ctx, msg); err != nil { - errs = multierror.Append(errs, err) - } - } - return errs -} - // SetRoot sets the head of the advertisement chain. -func (p *Publisher) SetRoot(_ context.Context, c cid.Cid) error { - p.rl.Lock() - defer p.rl.Unlock() +func (p *Publisher) SetRoot(c cid.Cid) { + p.lock.Lock() p.root = c - return nil -} - -// UpdateRoot updates the head of the advertisement chain and announces it to -// the configured senders. -func (p *Publisher) UpdateRoot(ctx context.Context, c cid.Cid) error { - return p.UpdateRootWithAddrs(ctx, c, p.Addrs()) + p.lock.Unlock() } -// UpdateRootWithAddrs updates the head of the advertisement chain and announces -// it to the configured senders, with the provided addresses. -func (p *Publisher) UpdateRootWithAddrs(ctx context.Context, c cid.Cid, addrs []multiaddr.Multiaddr) error { - err := p.SetRoot(ctx, c) - if err != nil { - return err - } - return p.announce(ctx, c, addrs) -} - -// Close closes the Publisher and all of its senders. +// Close closes the Publisher. func (p *Publisher) Close() error { - var errs error - err := p.closer.Close() - if err != nil { - errs = multierror.Append(errs, err) - } - for _, sender := range p.senders { - if err = sender.Close(); err != nil { - errs = multierror.Append(errs, err) - } - } - return errs + return p.closer.Close() } // ServeHTTP implements the http.Handler interface. @@ -262,14 +174,15 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if ask == "head" { // serve the head - p.rl.RLock() - defer p.rl.RUnlock() + p.lock.Lock() + rootCid := p.root + p.lock.Unlock() - if p.root == cid.Undef { + if rootCid == cid.Undef { http.Error(w, "", http.StatusNoContent) return } - marshalledMsg, err := newEncodedSignedHead(p.root, p.privKey) + marshalledMsg, err := newEncodedSignedHead(rootCid, p.privKey) if err != nil { http.Error(w, "Failed to encode", http.StatusInternalServerError) log.Errorw("Failed to serve root", "err", err) diff --git a/dagsync/httpsync/publisher_test.go b/dagsync/httpsync/publisher_test.go index fe1f705..e6fb11f 100644 --- a/dagsync/httpsync/publisher_test.go +++ b/dagsync/httpsync/publisher_test.go @@ -50,11 +50,12 @@ func TestNewPublisherForListener(t *testing.T) { privKey, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, rand.Reader) req.NoError(err) sender := &fakeSender{} - subject, err := httpsync.NewPublisherForListener(l, handlerPath, lsys, privKey, httpsync.WithAnnounceSenders(sender)) + subject, err := httpsync.NewPublisherForListener(l, handlerPath, lsys, privKey) req.NoError(err) - subject.SetRoot(ctx, rootLnk.(cidlink.Link).Cid) - req.NoError(subject.AnnounceHead(ctx)) + rootCid := rootLnk.(cidlink.Link).Cid + subject.SetRoot(rootCid) + req.NoError(announce.Send(ctx, rootCid, subject.Addrs(), sender)) req.Len(sender.msgs, 1) req.Equal(rootLnk.(cidlink.Link).Cid, sender.msgs[0].Cid) req.Len(sender.msgs[0].Addrs, 1) diff --git a/dagsync/httpsync/sync_test.go b/dagsync/httpsync/sync_test.go index 8068147..eae2a67 100644 --- a/dagsync/httpsync/sync_test.go +++ b/dagsync/httpsync/sync_test.go @@ -153,7 +153,7 @@ func TestHttpsync_AcceptsSpecCompliantDagJson(t *testing.T) { na.AssembleEntry("fish0").AssignString("lobster0") })) require.NoError(t, err) - require.NoError(t, pub.SetRoot(ctx, link.(cidlink.Link).Cid)) + pub.SetRoot(link.(cidlink.Link).Cid) ls := cidlink.DefaultLinkSystem() store := &memstore.Store{} diff --git a/dagsync/interface.go b/dagsync/interface.go index a61e041..3fa6643 100644 --- a/dagsync/interface.go +++ b/dagsync/interface.go @@ -17,20 +17,8 @@ type Publisher interface { ID() peer.ID // Protocol returns multiaddr protocol code (P_P2P or P_HTTP). Protocol() int - // AnnounceHead sends an announce messag, via all senders, to announce the - // current head advertisement CID. If there is no head, then does nothing. - AnnounceHead(context.Context) error - // AnnounceHeadWithAddrs sends an announce messag containing the specified - // addresses, via all senders, to announce the current head advertisement - // CID. If there is no head, then does nothing. - AnnounceHeadWithAddrs(context.Context, []multiaddr.Multiaddr) error // SetRoot sets the root CID without publishing it. - SetRoot(context.Context, cid.Cid) error - // UpdateRoot sets the root CID and publishes its update via all senders. - UpdateRoot(context.Context, cid.Cid) error - // UpdateRootWithAddrs publishes an update for the DAG, using custom - // multiaddrs, via all senders. - UpdateRootWithAddrs(context.Context, cid.Cid, []multiaddr.Multiaddr) error + SetRoot(cid.Cid) // Close publisher. Close() error } diff --git a/dagsync/legs_test.go b/dagsync/legs_test.go index 86de8f7..4ca74cc 100644 --- a/dagsync/legs_test.go +++ b/dagsync/legs_test.go @@ -16,6 +16,7 @@ import ( _ "github.com/ipld/go-ipld-prime/codec/dagjson" cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipni/go-libipni/announce" "github.com/ipni/go-libipni/announce/p2psender" "github.com/ipni/go-libipni/dagsync" "github.com/ipni/go-libipni/dagsync/dtsync" @@ -35,7 +36,7 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber) { +func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, host.Host, dagsync.Publisher, *dagsync.Subscriber, announce.Sender) { srcHost := test.MkTestHost() dstHost := test.MkTestHost() @@ -43,10 +44,10 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, srcLnkS := test.MkLinkSystem(srcStore) - p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0])) + p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0]), p2psender.WithExtraData([]byte("t01000"))) require.NoError(t, err) - pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic, dtsync.WithExtraData([]byte("t01000")), dtsync.WithAnnounceSenders(p2pSender)) + pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic) require.NoError(t, err) srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour) @@ -61,7 +62,7 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching) (host.Host, require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic)) - return srcHost, dstHost, pub, sub + return srcHost, dstHost, pub, sub, p2pSender } func TestAllowPeerReject(t *testing.T) { @@ -69,7 +70,7 @@ func TestAllowPeerReject(t *testing.T) { // Init dagsync publisher and subscriber srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) - srcHost, dstHost, pub, sub := initPubSub(t, srcStore, dstStore) + srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore) defer srcHost.Close() defer dstHost.Close() defer pub.Close() @@ -87,7 +88,8 @@ func TestAllowPeerReject(t *testing.T) { c := mkLnk(t, srcStore) // Update root with item - err := pub.UpdateRoot(context.Background(), c) + pub.SetRoot(c) + err := announce.Send(context.Background(), c, pub.Addrs(), sender) require.NoError(t, err) select { @@ -102,7 +104,7 @@ func TestAllowPeerAllows(t *testing.T) { // Init dagsync publisher and subscriber srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) - srcHost, dstHost, pub, sub := initPubSub(t, srcStore, dstStore) + srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore) defer srcHost.Close() defer dstHost.Close() defer pub.Close() @@ -119,7 +121,8 @@ func TestAllowPeerAllows(t *testing.T) { c := mkLnk(t, srcStore) // Update root with item - err := pub.UpdateRoot(context.Background(), c) + pub.SetRoot(c) + err := announce.Send(context.Background(), c, pub.Addrs(), sender) require.NoError(t, err) select { @@ -156,7 +159,7 @@ func TestPublisherRejectsPeer(t *testing.T) { p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0])) require.NoError(t, err) - pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic, dtsync.WithAllowPeer(allowPeer), dtsync.WithAnnounceSenders(p2pSender)) + pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic, dtsync.WithAllowPeer(allowPeer)) require.NoError(t, err) defer pub.Close() @@ -179,7 +182,8 @@ func TestPublisherRejectsPeer(t *testing.T) { c := mkLnk(t, srcStore) // Update root with item - err = pub.UpdateRoot(context.Background(), c) + pub.SetRoot(c) + err = announce.Send(context.Background(), c, pub.Addrs(), p2pSender) require.NoError(t, err) select { @@ -196,7 +200,8 @@ func TestPublisherRejectsPeer(t *testing.T) { c = mkLnk(t, srcStore) // Update root with item - err = pub.UpdateRoot(context.Background(), c) + pub.SetRoot(c) + err = announce.Send(context.Background(), c, pub.Addrs(), p2pSender) require.NoError(t, err) select { @@ -219,8 +224,7 @@ func TestIdleHandlerCleaner(t *testing.T) { rootLnk, err := test.Store(te.srcStore, basicnode.NewString("hello world")) require.NoError(t, err) - err = te.pub.SetRoot(context.Background(), rootLnk.(cidlink.Link).Cid) - require.NoError(t, err) + te.pub.SetRoot(rootLnk.(cidlink.Link).Cid) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/dagsync/p2p/protocol/head/head.go b/dagsync/p2p/protocol/head/head.go index 1be74e6..d39690a 100644 --- a/dagsync/p2p/protocol/head/head.go +++ b/dagsync/p2p/protocol/head/head.go @@ -25,7 +25,7 @@ const closeTimeout = 30 * time.Second var log = logging.Logger("dagsync/head") type Publisher struct { - rl sync.RWMutex + lock sync.Mutex root cid.Cid server *http.Server } @@ -134,11 +134,13 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - p.rl.RLock() - defer p.rl.RUnlock() + p.lock.Lock() + rootCid := p.root + p.lock.Unlock() + var out []byte - if p.root != cid.Undef { - currentHead := p.root.String() + if rootCid != cid.Undef { + currentHead := rootCid.String() log.Debug("Found current head: %s", currentHead) out = []byte(currentHead) } else { @@ -151,12 +153,11 @@ func (p *Publisher) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -// UpdateRoot sets the CID being published. -func (p *Publisher) UpdateRoot(_ context.Context, c cid.Cid) error { - p.rl.Lock() - defer p.rl.Unlock() +// SetRoot sets the CID being published. +func (p *Publisher) SetRoot(c cid.Cid) { + p.lock.Lock() p.root = c - return nil + p.lock.Unlock() } // Close stops the server. @@ -168,7 +169,7 @@ func (p *Publisher) Close() error { // Root returns the current root being publisher. func (p *Publisher) Root() cid.Cid { - p.rl.Lock() - defer p.rl.Unlock() + p.lock.Lock() + defer p.lock.Unlock() return p.root } diff --git a/dagsync/p2p/protocol/head/head_test.go b/dagsync/p2p/protocol/head/head_test.go index 24bb360..7bd8b51 100644 --- a/dagsync/p2p/protocol/head/head_test.go +++ b/dagsync/p2p/protocol/head/head_test.go @@ -64,8 +64,7 @@ func TestFetchLatestHead(t *testing.T) { require.NoError(t, err) require.Equal(t, cid.Undef, c, "Expected cid undef because there is no root") - err = p.UpdateRoot(context.Background(), rootLnk.(cidlink.Link).Cid) - require.NoError(t, err) + p.SetRoot(rootLnk.(cidlink.Link).Cid) c, err = head.QueryRootCid(ctx, client, testTopic, publisher.ID()) require.NoError(t, err) @@ -91,8 +90,7 @@ func TestOldProtocolID(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err = p.UpdateRoot(context.Background(), rootLnk.(cidlink.Link).Cid) - require.NoError(t, err) + p.SetRoot(rootLnk.(cidlink.Link).Cid) c, err := head.QueryRootCid(ctx, client, testTopic, publisher.ID()) require.NoError(t, err) diff --git a/dagsync/subscriber_test.go b/dagsync/subscriber_test.go index df18e49..09e8132 100644 --- a/dagsync/subscriber_test.go +++ b/dagsync/subscriber_test.go @@ -57,8 +57,7 @@ func TestScopedBlockHook(t *testing.T) { return } - err = pub.SetRoot(context.Background(), head.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(head.(cidlink.Link).Cid) subHost := test.MkTestHost() subDS := dssync.MutexWrap(datastore.NewMapDatastore()) @@ -93,8 +92,7 @@ func TestScopedBlockHook(t *testing.T) { Seed: ll.Seed + 1, }.Build(t, lsys) - err = pub.SetRoot(context.Background(), anotherLL.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(anotherLL.(cidlink.Link).Cid) _, err = sub.Sync(context.Background(), peerInfo, cid.Undef, nil) require.NoError(t, err) @@ -123,8 +121,7 @@ func TestSyncedCidsReturned(t *testing.T) { return } - err = pub.SetRoot(context.Background(), head.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(head.(cidlink.Link).Cid) subHost := test.MkTestHost() subDS := dssync.MutexWrap(datastore.NewMapDatastore()) @@ -188,8 +185,7 @@ func TestConcurrentSync(t *testing.T) { return } - err = pub.SetRoot(context.Background(), head.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(head.(cidlink.Link).Cid) } subDS := dssync.MutexWrap(datastore.NewMapDatastore()) @@ -252,7 +248,7 @@ func TestSync(t *testing.T) { defer subSys.close() calledTimes := 0 - pub, sub := dpsb.Build(t, testTopic, pubSys, subSys, + pub, sub, _ := dpsb.Build(t, testTopic, pubSys, subSys, []dagsync.Option{dagsync.BlockHook(func(i peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) { calledTimes++ })}, @@ -264,14 +260,13 @@ func TestSync(t *testing.T) { return } - err := pub.SetRoot(context.Background(), head.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(head.(cidlink.Link).Cid) peerInfo := peer.AddrInfo{ ID: pub.ID(), Addrs: pub.Addrs(), } - _, err = sub.Sync(context.Background(), peerInfo, cid.Undef, nil) + _, err := sub.Sync(context.Background(), peerInfo, cid.Undef, nil) require.NoError(t, err) calledTimesFirstSync := calledTimes latestSync := sub.GetLatestSync(pubSys.host.ID()) @@ -314,7 +309,7 @@ func TestSyncWithHydratedDataStore(t *testing.T) { calledTimes := 0 var calledWith []cid.Cid - pub, sub := dpsb.Build(t, testTopic, pubSys, subSys, + pub, sub, _ := dpsb.Build(t, testTopic, pubSys, subSys, []dagsync.Option{dagsync.BlockHook(func(i peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) { calledWith = append(calledWith, c) calledTimes++ @@ -327,8 +322,7 @@ func TestSyncWithHydratedDataStore(t *testing.T) { return } - err = pub.SetRoot(context.Background(), head.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(head.(cidlink.Link).Cid) // Sync once to hydrate the datastore // Note we set the cid we are syncing to so we don't update the latestSync. @@ -358,7 +352,7 @@ func TestRoundTripSimple(t *testing.T) { // Init dagsync publisher and subscriber srcStore := dssync.MutexWrap(datastore.NewMapDatastore()) dstStore := dssync.MutexWrap(datastore.NewMapDatastore()) - srcHost, dstHost, pub, sub := initPubSub(t, srcStore, dstStore) + srcHost, dstHost, pub, sub, sender := initPubSub(t, srcStore, dstStore) defer srcHost.Close() defer dstHost.Close() defer pub.Close() @@ -372,7 +366,9 @@ func TestRoundTripSimple(t *testing.T) { lnk, err := test.Store(srcStore, itm) require.NoError(t, err) - err = pub.UpdateRoot(context.Background(), lnk.(cidlink.Link).Cid) + rootCid := lnk.(cidlink.Link).Cid + pub.SetRoot(rootCid) + err = announce.Send(context.Background(), rootCid, pub.Addrs(), sender) require.NoError(t, err) select { @@ -405,17 +401,17 @@ func TestRoundTrip(t *testing.T) { topics := test.WaitForMeshWithMessage(t, "testTopic", srcHost1, srcHost2, dstHost) - p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0])) + p2pSender1, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0])) require.NoError(t, err) - pub1, err := dtsync.NewPublisher(srcHost1, srcStore1, srcLnkS1, "", dtsync.WithAnnounceSenders(p2pSender)) + pub1, err := dtsync.NewPublisher(srcHost1, srcStore1, srcLnkS1, "") require.NoError(t, err) defer pub1.Close() - p2pSender, err = p2psender.New(nil, "", p2psender.WithTopic(topics[1])) + p2pSender2, err := p2psender.New(nil, "", p2psender.WithTopic(topics[1])) require.NoError(t, err) - pub2, err := dtsync.NewPublisher(srcHost2, srcStore2, srcLnkS2, "", dtsync.WithAnnounceSenders(p2pSender)) + pub2, err := dtsync.NewPublisher(srcHost2, srcStore2, srcLnkS2, "") require.NoError(t, err) defer pub2.Close() @@ -443,13 +439,17 @@ func TestRoundTrip(t *testing.T) { lnk2, err := test.Store(srcStore2, itm2) require.NoError(t, err) - err = pub1.UpdateRoot(context.Background(), lnk1.(cidlink.Link).Cid) + rootCid1 := lnk1.(cidlink.Link).Cid + pub1.SetRoot(rootCid1) + err = announce.Send(context.Background(), rootCid1, pub1.Addrs(), p2pSender1) require.NoError(t, err) t.Log("Publish 1:", lnk1.(cidlink.Link).Cid) waitForSync(t, "Watcher 1", dstStore, lnk1.(cidlink.Link), watcher1) waitForSync(t, "Watcher 2", dstStore, lnk1.(cidlink.Link), watcher2) - err = pub2.UpdateRoot(context.Background(), lnk2.(cidlink.Link).Cid) + rootCid2 := lnk2.(cidlink.Link).Cid + pub2.SetRoot(rootCid2) + err = announce.Send(context.Background(), rootCid2, pub2.Addrs(), p2pSender2) require.NoError(t, err) t.Log("Publish 2:", lnk2.(cidlink.Link).Cid) waitForSync(t, "Watcher 1", dstStore, lnk2.(cidlink.Link), watcher1) @@ -470,7 +470,7 @@ func TestHttpPeerAddrPeerstore(t *testing.T) { defer pubHostSys.close() defer subHostSys.close() - pub, sub := dagsyncPubSubBuilder{ + pub, sub, _ := dagsyncPubSubBuilder{ IsHttp: true, }.Build(t, testTopic, pubHostSys, subHostSys, nil) @@ -488,18 +488,16 @@ func TestHttpPeerAddrPeerstore(t *testing.T) { prevHead := ll head := nextLL - err := pub.SetRoot(context.Background(), prevHead.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(prevHead.(cidlink.Link).Cid) peerInfo := peer.AddrInfo{ ID: pub.ID(), Addrs: pub.Addrs(), } - _, err = sub.Sync(context.Background(), peerInfo, cid.Undef, nil) + _, err := sub.Sync(context.Background(), peerInfo, cid.Undef, nil) require.NoError(t, err) - err = pub.SetRoot(context.Background(), head.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(head.(cidlink.Link).Cid) // Now call sync again with no address. The subscriber should re-use the // previous address and succeeed. @@ -515,7 +513,7 @@ func TestSyncFinishedAlwaysDelivered(t *testing.T) { defer pubHostSys.close() defer subHostSys.close() - pub, sub := dagsyncPubSubBuilder{}.Build(t, testTopic, pubHostSys, subHostSys, nil) + pub, sub, _ := dagsyncPubSubBuilder{}.Build(t, testTopic, pubHostSys, subHostSys, nil) ll := llBuilder{ Length: 1, @@ -537,24 +535,21 @@ func TestSyncFinishedAlwaysDelivered(t *testing.T) { onSyncFinishedChan, cncl := sub.OnSyncFinished() defer cncl() - err := pub.SetRoot(context.Background(), ll.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(ll.(cidlink.Link).Cid) peerInfo := peer.AddrInfo{ ID: pub.ID(), Addrs: pub.Addrs(), } - _, err = sub.Sync(context.Background(), peerInfo, cid.Undef, nil) + _, err := sub.Sync(context.Background(), peerInfo, cid.Undef, nil) require.NoError(t, err) - err = pub.SetRoot(context.Background(), nextLL.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(nextLL.(cidlink.Link).Cid) _, err = sub.Sync(context.Background(), peerInfo, cid.Undef, nil) require.NoError(t, err) - err = pub.SetRoot(context.Background(), headLL.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(headLL.(cidlink.Link).Cid) _, err = sub.Sync(context.Background(), peerInfo, cid.Undef, nil) require.NoError(t, err) @@ -564,8 +559,7 @@ func TestSyncFinishedAlwaysDelivered(t *testing.T) { Seed: 2, }.BuildWithPrev(t, pubHostSys.lsys, headLL) - err = pub.SetRoot(context.Background(), head.(cidlink.Link).Cid) - require.NoError(t, err) + pub.SetRoot(head.(cidlink.Link).Cid) // This is blocked until we read from onSyncFinishedChan syncDoneCh := make(chan error) @@ -682,7 +676,7 @@ func (h *hostSystem) close() { h.host.Close() } -func (b dagsyncPubSubBuilder) Build(t *testing.T, topicName string, pubSys hostSystem, subSys hostSystem, subOpts []dagsync.Option) (dagsync.Publisher, *dagsync.Subscriber) { +func (b dagsyncPubSubBuilder) Build(t *testing.T, topicName string, pubSys hostSystem, subSys hostSystem, subOpts []dagsync.Option) (dagsync.Publisher, *dagsync.Subscriber, []announce.Sender) { var senders []announce.Sender if !b.P2PAnnounce { p2pSender, err := p2psender.New(pubSys.host, topicName) @@ -693,11 +687,11 @@ func (b dagsyncPubSubBuilder) Build(t *testing.T, topicName string, pubSys hostS var pub dagsync.Publisher var err error if b.IsHttp { - pub, err = httpsync.NewPublisher("127.0.0.1:0", pubSys.lsys, pubSys.privKey, httpsync.WithAnnounceSenders(senders...)) + pub, err = httpsync.NewPublisher("127.0.0.1:0", pubSys.lsys, pubSys.privKey) require.NoError(t, err) require.NoError(t, test.WaitForHttpPublisher(pub)) } else { - pub, err = dtsync.NewPublisher(pubSys.host, pubSys.ds, pubSys.lsys, topicName, dtsync.WithAnnounceSenders(senders...)) + pub, err = dtsync.NewPublisher(pubSys.host, pubSys.ds, pubSys.lsys, topicName) require.NoError(t, err) require.NoError(t, test.WaitForP2PPublisher(pub, subSys.host, topicName)) } @@ -705,7 +699,7 @@ func (b dagsyncPubSubBuilder) Build(t *testing.T, topicName string, pubSys hostS sub, err := dagsync.NewSubscriber(subSys.host, subSys.ds, subSys.lsys, topicName, nil, subOpts...) require.NoError(t, err) - return pub, sub + return pub, sub, senders } type llBuilder struct { diff --git a/dagsync/sync_test.go b/dagsync/sync_test.go index 440f51b..f3129ad 100644 --- a/dagsync/sync_test.go +++ b/dagsync/sync_test.go @@ -12,6 +12,7 @@ import ( dssync "github.com/ipfs/go-datastore/sync" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipni/go-libipni/announce" "github.com/ipni/go-libipni/announce/p2psender" "github.com/ipni/go-libipni/dagsync" "github.com/ipni/go-libipni/dagsync/dtsync" @@ -39,7 +40,7 @@ func TestLatestSyncSuccess(t *testing.T) { p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0])) require.NoError(t, err) - pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic, dtsync.WithAnnounceSenders(p2pSender)) + pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic) require.NoError(t, err) defer pub.Close() @@ -55,11 +56,11 @@ func TestLatestSyncSuccess(t *testing.T) { // Store the whole chain in source node chainLnks := test.MkChain(srcLnkS, true) - err = newUpdateTest(pub, sub, dstStore, watcher, srcHost.ID(), chainLnks[2], false, chainLnks[2].(cidlink.Link).Cid) + err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[2], false, chainLnks[2].(cidlink.Link).Cid) require.NoError(t, err) - err = newUpdateTest(pub, sub, dstStore, watcher, srcHost.ID(), chainLnks[1], false, chainLnks[1].(cidlink.Link).Cid) + err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[1], false, chainLnks[1].(cidlink.Link).Cid) require.NoError(t, err) - err = newUpdateTest(pub, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid) + err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid) require.NoError(t, err) } @@ -82,7 +83,7 @@ func TestSyncFn(t *testing.T) { p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0])) require.NoError(t, err) - pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic, dtsync.WithAnnounceSenders(p2pSender)) + pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic) require.NoError(t, err) defer pub.Close() @@ -150,7 +151,8 @@ func TestSyncFn(t *testing.T) { // Assert the latestSync is updated by explicit sync when cid and selector are unset. newHead := chainLnks[0].(cidlink.Link).Cid - err = pub.UpdateRoot(context.Background(), newHead) + pub.SetRoot(newHead) + err = announce.Send(context.Background(), newHead, pub.Addrs(), p2pSender) require.NoError(t, err) select { @@ -201,7 +203,7 @@ func TestPartialSync(t *testing.T) { p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0])) require.NoError(t, err) - pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic, dtsync.WithAnnounceSenders(p2pSender)) + pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic) require.NoError(t, err) defer pub.Close() test.MkChain(srcLnkS, true) @@ -222,7 +224,7 @@ func TestPartialSync(t *testing.T) { defer cncl() // Fetching first few nodes. - err = newUpdateTest(pub, sub, dstStore, watcher, srcHost.ID(), chainLnks[2], false, chainLnks[2].(cidlink.Link).Cid) + err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[2], false, chainLnks[2].(cidlink.Link).Cid) require.NoError(t, err) // Check that first nodes hadn't been synced @@ -236,7 +238,7 @@ func TestPartialSync(t *testing.T) { require.NoError(t, err) // Update all the chain from scratch again. - err = newUpdateTest(pub, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid) + err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid) require.NoError(t, err) // Check if the node we pass through was retrieved @@ -261,7 +263,7 @@ func TestStepByStepSync(t *testing.T) { p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0])) require.NoError(t, err) - pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic, dtsync.WithAnnounceSenders(p2pSender)) + pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic) require.NoError(t, err) defer pub.Close() @@ -282,9 +284,9 @@ func TestStepByStepSync(t *testing.T) { test.MkChain(dstLnkS, true) // Sync the rest of the chain - err = newUpdateTest(pub, sub, dstStore, watcher, srcHost.ID(), chainLnks[1], false, chainLnks[1].(cidlink.Link).Cid) + err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[1], false, chainLnks[1].(cidlink.Link).Cid) require.NoError(t, err) - err = newUpdateTest(pub, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid) + err = newUpdateTest(pub, p2pSender, sub, dstStore, watcher, srcHost.ID(), chainLnks[0], false, chainLnks[0].(cidlink.Link).Cid) require.NoError(t, err) } @@ -325,7 +327,7 @@ func TestLatestSyncFailure(t *testing.T) { watcher, cncl := sub.OnSyncFinished() t.Log("Testing sync fail when the other end does not have the data") - err = newUpdateTest(pub, sub, dstStore, watcher, srcHost.ID(), cidlink.Link{Cid: cid.Undef}, true, chainLnks[3].(cidlink.Link).Cid) + err = newUpdateTest(pub, nil, sub, dstStore, watcher, srcHost.ID(), cidlink.Link{Cid: cid.Undef}, true, chainLnks[3].(cidlink.Link).Cid) cncl() require.NoError(t, err) sub.Close() @@ -340,7 +342,7 @@ func TestLatestSyncFailure(t *testing.T) { watcher, cncl = sub2.OnSyncFinished() t.Log("Testing sync fail when not able to run the full exchange") - err = newUpdateTest(pub, sub2, dstStore, watcher, srcHost.ID(), chainLnks[2], true, chainLnks[3].(cidlink.Link).Cid) + err = newUpdateTest(pub, nil, sub2, dstStore, watcher, srcHost.ID(), chainLnks[2], true, chainLnks[3].(cidlink.Link).Cid) cncl() require.NoError(t, err) } @@ -411,8 +413,7 @@ func TestCancelDeadlock(t *testing.T) { chainLnks := test.MkChain(srcLnkS, true) c := chainLnks[2].(cidlink.Link).Cid - err = pub.SetRoot(context.Background(), c) - require.NoError(t, err) + pub.SetRoot(c) peerInfo := peer.AddrInfo{ ID: srcHost.ID(), @@ -423,8 +424,7 @@ func TestCancelDeadlock(t *testing.T) { // Now there should be an event on the watcher channel. c = chainLnks[1].(cidlink.Link).Cid - err = pub.SetRoot(context.Background(), c) - require.NoError(t, err) + pub.SetRoot(c) _, err = sub.Sync(context.Background(), peerInfo, cid.Undef, nil) require.NoError(t, err) @@ -455,10 +455,7 @@ func newAnnounceTest(pub dagsync.Publisher, sub *dagsync.Subscriber, dstStore da var err error c := lnk.(cidlink.Link).Cid if c != cid.Undef { - err = pub.SetRoot(context.Background(), c) - if err != nil { - return err - } + pub.SetRoot(c) } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -486,11 +483,12 @@ func newAnnounceTest(pub dagsync.Publisher, sub *dagsync.Subscriber, dstStore da return assertLatestSyncEquals(sub, peerID, expectedSync) } -func newUpdateTest(pub dagsync.Publisher, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerID peer.ID, lnk ipld.Link, withFailure bool, expectedSync cid.Cid) error { +func newUpdateTest(pub dagsync.Publisher, sender announce.Sender, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerID peer.ID, lnk ipld.Link, withFailure bool, expectedSync cid.Cid) error { var err error c := lnk.(cidlink.Link).Cid if c != cid.Undef { - err = pub.UpdateRoot(context.Background(), c) + pub.SetRoot(c) + err = announce.Send(context.Background(), c, pub.Addrs(), sender) if err != nil { return err }