Skip to content

Commit

Permalink
Decoupe announce Senders from Publishers (#88)
Browse files Browse the repository at this point in the history
There is no strong reason to associate a set of announce Senders with a Publisher. The only association is that a Sender is often used to after updating the Sublisher's head CID. By separating these, it allows more flexibility in where in handling serving an advertisement chain is done and where sending advertisement announcements is done.
  • Loading branch information
gammazero authored Jul 24, 2023
1 parent 9116d75 commit 2f35326
Show file tree
Hide file tree
Showing 21 changed files with 201 additions and 393 deletions.
13 changes: 12 additions & 1 deletion announce/httpsender/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (
const defaultTimeout = time.Minute

type config struct {
timeout time.Duration
client *http.Client
extraData []byte
timeout time.Duration
userAgent string
}

Expand All @@ -33,6 +34,16 @@ func getOpts(opts []Option) (config, error) {
return cfg, nil
}

// WithExtraData sets the extra data to include in the announce message.
func WithExtraData(data []byte) Option {
return func(c *config) error {
if len(data) != 0 {
c.extraData = data
}
return nil
}
}

// WithTimeout configures the timeout to wait for a response.
func WithTimeout(timeout time.Duration) Option {
return func(cfg *config) error {
Expand Down
8 changes: 8 additions & 0 deletions announce/httpsender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const DefaultAnnouncePath = "/announce"
type Sender struct {
announceURLs []string
client *http.Client
extraData []byte
peerID peer.ID
userAgent string
}
Expand Down Expand Up @@ -68,6 +69,7 @@ func New(announceURLs []*url.URL, peerID peer.ID, options ...Option) (*Sender, e

return &Sender{
announceURLs: urls,
extraData: opts.extraData,
client: client,
peerID: peerID,
userAgent: opts.userAgent,
Expand All @@ -86,6 +88,9 @@ func (s *Sender) Send(ctx context.Context, msg message.Message) error {
if err != nil {
return fmt.Errorf("cannot add p2p id to message addrs: %w", err)
}
if len(s.extraData) != 0 {
msg.ExtraData = s.extraData
}
buf := bytes.NewBuffer(nil)
if err = msg.MarshalCBOR(buf); err != nil {
return fmt.Errorf("cannot cbor encode announce message: %w", err)
Expand All @@ -98,6 +103,9 @@ func (s *Sender) SendJson(ctx context.Context, msg message.Message) error {
if err != nil {
return fmt.Errorf("cannot add p2p id to message addrs: %w", err)
}
if len(s.extraData) != 0 {
msg.ExtraData = s.extraData
}
buf := new(bytes.Buffer)
if err = json.NewEncoder(buf).Encode(msg); err != nil {
return fmt.Errorf("cannot json encode announce message: %w", err)
Expand Down
13 changes: 12 additions & 1 deletion announce/p2psender/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (

// config contains all options for configuring dtsync.publisher.
type config struct {
topic *pubsub.Topic
topic *pubsub.Topic
extraData []byte
}

// Option is a function that sets a value in a config.
Expand All @@ -32,3 +33,13 @@ func WithTopic(topic *pubsub.Topic) Option {
return nil
}
}

// WithExtraData sets the extra data to include in the announce message.
func WithExtraData(data []byte) Option {
return func(c *config) error {
if len(data) != 0 {
c.extraData = data
}
return nil
}
}
5 changes: 5 additions & 0 deletions announce/p2psender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const shutdownTime = 5 * time.Second
type Sender struct {
cancelPubSub context.CancelFunc
topic *pubsub.Topic
extraData []byte
}

// New creates a new Sender that sends announce messages over pubsub.
Expand All @@ -41,6 +42,7 @@ func New(p2pHost host.Host, topicName string, options ...Option) (*Sender, error
return &Sender{
cancelPubSub: cancelPubsub,
topic: topic,
extraData: opts.extraData,
}, nil
}

Expand All @@ -65,6 +67,9 @@ func (s *Sender) Close() error {

// Send sends the Message to the pubsub topic.
func (s *Sender) Send(ctx context.Context, msg message.Message) error {
if len(s.extraData) != 0 {
msg.ExtraData = s.extraData
}
buf := bytes.NewBuffer(nil)
if err := msg.MarshalCBOR(buf); err != nil {
return err
Expand Down
32 changes: 32 additions & 0 deletions announce/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package announce

import (
"context"
"errors"

"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
"github.com/ipni/go-libipni/announce/message"
"github.com/multiformats/go-multiaddr"
)

// Sender is the interface for announce sender implementations.
Expand All @@ -13,3 +17,31 @@ type Sender interface {
// Send sends the announce Message.
Send(context.Context, message.Message) error
}

// Send sends an advertisement announcement message, containing the specified
// addresses and extra data, to all senders.
func Send(ctx context.Context, c cid.Cid, addrs []multiaddr.Multiaddr, senders ...Sender) error {
// Do nothing if nothing to announce or no means to announce it.
if c == cid.Undef || len(senders) == 0 {
return nil
}

msg := message.Message{
Cid: c,
}
msg.SetAddrs(addrs)

var errs error
for _, sender := range senders {
if sender == nil {
continue
}
if err := sender.Send(ctx, msg); err != nil {
errs = multierror.Append(errs, err)
if errors.Is(err, context.Canceled) {
return err
}
}
}
return errs
}
12 changes: 4 additions & 8 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ func TestAnnounceReplace(t *testing.T) {
hnd.subscriber.scopedBlockHookMutex.Lock()

firstCid := chainLnks[2].(cidlink.Link).Cid
err = pub.SetRoot(context.Background(), firstCid)
require.NoError(t, err)
pub.SetRoot(firstCid)

// Have the subscriber receive an announce. This is the same as if it was
// published by the publisher without having to wait for it to arrive.
Expand All @@ -79,16 +78,14 @@ func TestAnnounceReplace(t *testing.T) {

// Announce two more times.
c := chainLnks[1].(cidlink.Link).Cid
err = pub.SetRoot(context.Background(), c)
require.NoError(t, err)
pub.SetRoot(c)

err = sub.Announce(context.Background(), c, srcHost.ID(), srcHost.Addrs())
require.NoError(t, err)

t.Log("Sent announce for second CID", c)
lastCid := chainLnks[0].(cidlink.Link).Cid
err = pub.SetRoot(context.Background(), lastCid)
require.NoError(t, err)
pub.SetRoot(lastCid)

err = sub.Announce(context.Background(), lastCid, srcHost.ID(), srcHost.Addrs())
require.NoError(t, err)
Expand Down Expand Up @@ -240,8 +237,7 @@ func TestAnnounceRepublish(t *testing.T) {
chainLnks := test.MkChain(srcLnkS, true)

firstCid := chainLnks[2].(cidlink.Link).Cid
err = pub.SetRoot(context.Background(), firstCid)
require.NoError(t, err)
pub.SetRoot(firstCid)

// Announce one CID to subscriber1.
err = sub1.Announce(context.Background(), firstCid, srcHost.ID(), srcHost.Addrs())
Expand Down
23 changes: 0 additions & 23 deletions dagsync/dtsync/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dtsync
import (
"fmt"

"github.com/ipni/go-libipni/announce"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -15,9 +14,7 @@ const (

// config contains all options for configuring dtsync.publisher.
type config struct {
extraData []byte
allowPeer func(peer.ID) bool
senders []announce.Sender

gsMaxInRequests uint64
gsMaxOutRequests uint64
Expand All @@ -40,16 +37,6 @@ func getOpts(opts []Option) (config, error) {
return cfg, nil
}

// WithExtraData sets the extra data to include in the pubsub message.
func WithExtraData(data []byte) Option {
return func(c *config) error {
if len(data) != 0 {
c.extraData = data
}
return nil
}
}

// WithAllowPeer sets the function that determines whether to allow or reject
// graphsync sessions from a peer.
func WithAllowPeer(allowPeer func(peer.ID) bool) Option {
Expand All @@ -59,16 +46,6 @@ func WithAllowPeer(allowPeer func(peer.ID) bool) Option {
}
}

// WithAnnounceSenders sets announce.Senders to use for sending announcements.
func WithAnnounceSenders(senders ...announce.Sender) Option {
return func(c *config) error {
if len(senders) != 0 {
c.senders = senders
}
return nil
}
}

func WithMaxGraphsyncRequests(maxIn, maxOut uint64) Option {
return func(c *config) error {
c.gsMaxInRequests = maxIn
Expand Down
83 changes: 4 additions & 79 deletions dagsync/dtsync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dtsync

import (
"context"
"errors"
"fmt"
"net/http"
"sync"
Expand All @@ -12,24 +11,19 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/announce/message"
"github.com/ipni/go-libipni/dagsync/p2p/protocol/head"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)

// Publisher is a data-transfer publisher that announces the head of an
// advertisement chain to a set of configured senders.
// Publisher serves an advertisement over libp2p using data-transfer.
type Publisher struct {
closeOnce sync.Once
dtManager dt.Manager
dtClose dtCloseFunc
extraData []byte
headPublisher *head.Publisher
host host.Host
senders []announce.Sender
}

// NewPublisher creates a new dagsync publisher.
Expand All @@ -50,10 +44,8 @@ func NewPublisher(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem, t
return &Publisher{
dtManager: dtManager,
dtClose: dtClose,
extraData: opts.extraData,
headPublisher: headPublisher,
host: host,
senders: opts.senders,
}, nil
}

Expand Down Expand Up @@ -85,10 +77,8 @@ func NewPublisherFromExisting(dtManager dt.Manager, host host.Host, topicName st
startHeadPublisher(host, topicName, headPublisher)

return &Publisher{
extraData: opts.extraData,
headPublisher: headPublisher,
host: host,
senders: opts.senders,
}, nil
}

Expand All @@ -108,65 +98,12 @@ func (p *Publisher) Protocol() int {
return multiaddr.P_P2P
}

// AnnounceHead announces the current head of the advertisement chain to the
// configured senders.
func (p *Publisher) AnnounceHead(ctx context.Context) error {
return p.announce(ctx, p.headPublisher.Root(), p.Addrs())
}

// AnnounceHeadWithAddrs announces the current head of the advertisement chain
// to the configured senders with the given addresses.
func (p *Publisher) AnnounceHeadWithAddrs(ctx context.Context, addrs []multiaddr.Multiaddr) error {
return p.announce(ctx, p.headPublisher.Root(), addrs)
}

func (p *Publisher) announce(ctx context.Context, c cid.Cid, addrs []multiaddr.Multiaddr) error {
// Do nothing if nothing to announce or no means to announce it.
if c == cid.Undef || len(p.senders) == 0 {
return nil
}

msg := message.Message{
Cid: c,
ExtraData: p.extraData,
}
msg.SetAddrs(addrs)

var errs error
for _, sender := range p.senders {
if err := sender.Send(ctx, msg); err != nil {
errs = multierror.Append(errs, err)
}
}
return errs
}

// SetRoot sets the root CID of the advertisement chain.
func (p *Publisher) SetRoot(ctx context.Context, c cid.Cid) error {
if c == cid.Undef {
return errors.New("cannot update to an undefined cid")
}
log.Debugf("Setting root CID: %s", c)
return p.headPublisher.UpdateRoot(ctx, c)
}

// UpdateRoot updates the root CID of the advertisement chain and announces it
// to the configured senders.
func (p *Publisher) UpdateRoot(ctx context.Context, c cid.Cid) error {
return p.UpdateRootWithAddrs(ctx, c, p.Addrs())
}

// UpdateRootWithAddrs updates the root CID of the advertisement chain and
// announces it to the configured senders with the given addresses.
func (p *Publisher) UpdateRootWithAddrs(ctx context.Context, c cid.Cid, addrs []multiaddr.Multiaddr) error {
err := p.SetRoot(ctx, c)
if err != nil {
return err
}
return p.announce(ctx, c, addrs)
func (p *Publisher) SetRoot(c cid.Cid) {
p.headPublisher.SetRoot(c)
}

// Close closes the publisher and all of its senders.
// Close closes the publisher.
func (p *Publisher) Close() error {
var errs error
p.closeOnce.Do(func() {
Expand All @@ -175,24 +112,12 @@ func (p *Publisher) Close() error {
errs = multierror.Append(errs, err)
}

for _, sender := range p.senders {
if err = sender.Close(); err != nil {
errs = multierror.Append(errs, err)
}
}

if p.dtClose != nil {
err = p.dtClose()
if err != nil {
errs = multierror.Append(errs, err)
}
}

for _, sender := range p.senders {
if err = sender.Close(); err != nil {
errs = multierror.Append(errs, err)
}
}
})
return errs
}
Loading

0 comments on commit 2f35326

Please sign in to comment.