Skip to content

Commit

Permalink
Mirror only publishes over HTTP
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Aug 17, 2023
1 parent ba7bfd9 commit c2251f3
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 73 deletions.
33 changes: 23 additions & 10 deletions cmd/provider/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ var Mirror struct {
source *cli.StringFlag
syncInterval *cli.DurationFlag
identityPath *cli.PathFlag
listenAddrs *cli.StringSliceFlag
listenAddr *cli.StringFlag
p2pListenAddrs *cli.StringSliceFlag
storePath *cli.PathFlag
initAdRecurLimit *cli.UintFlag
entriesRecurLimit *cli.UintFlag
Expand Down Expand Up @@ -57,9 +58,15 @@ func init() {
Usage: "The path to the file containing the marshalled libp2p private key that the mirror should use as its identity.",
DefaultText: "Randomly generated",
}
Mirror.flags.listenAddrs = &cli.StringSliceFlag{
Name: "listenAddrs",
Usage: "The mirror listen addresses in form of multiaddr.",
Mirror.flags.listenAddr = &cli.StringFlag{
Name: "listenAddr",
Usage: "The HTTP address:port that the mirror publishes advertisements over HTTP on.",
// TODO: when libp2phttp available: "none, publish http over libp2p"
DefaultText: "Local host with a random open port",
}
Mirror.flags.p2pListenAddrs = &cli.StringSliceFlag{
Name: "p2pListenAddrs",
Usage: "The mirror p2p host listen addresses in form of multiaddr.",
DefaultText: "Local host with a random open port",
}
Mirror.flags.storePath = &cli.PathFlag{
Expand Down Expand Up @@ -125,7 +132,8 @@ func init() {
Mirror.flags.source,
Mirror.flags.syncInterval,
Mirror.flags.identityPath,
Mirror.flags.listenAddrs,
Mirror.flags.listenAddr,
Mirror.flags.p2pListenAddrs,
Mirror.flags.storePath,
Mirror.flags.initAdRecurLimit,
Mirror.flags.entriesRecurLimit,
Expand Down Expand Up @@ -153,28 +161,33 @@ func beforeMirror(cctx *cli.Context) error {
Mirror.options = append(Mirror.options, mirror.WithSyncInterval(Mirror.flags.syncInterval.Get(cctx)))
}
var hostOpts []libp2p.Option
var pk crypto.PrivKey
if cctx.IsSet(Mirror.flags.identityPath.Name) {
pkPath := Mirror.flags.identityPath.Get(cctx)
pkBytes, err := os.ReadFile(pkPath)
if err != nil {
return err
}
pk, err := crypto.UnmarshalPrivateKey(pkBytes)
pk, err = crypto.UnmarshalPrivateKey(pkBytes)
if err != nil {
return err
}
hostOpts = append(hostOpts, libp2p.Identity(pk))
}
if cctx.IsSet(Mirror.flags.listenAddrs.Name) {
listenAddrs := Mirror.flags.listenAddrs.Get(cctx)
hostOpts = append(hostOpts, libp2p.ListenAddrStrings(listenAddrs...))
if cctx.IsSet(Mirror.flags.listenAddr.Name) {
listenAddr := Mirror.flags.listenAddr.Get(cctx)
Mirror.options = append(Mirror.options, mirror.WithHTTPListenAddr(listenAddr))
}
if cctx.IsSet(Mirror.flags.p2pListenAddrs.Name) {
p2pListenAddrs := Mirror.flags.p2pListenAddrs.Get(cctx)
hostOpts = append(hostOpts, libp2p.ListenAddrStrings(p2pListenAddrs...))
}
if len(hostOpts) != 0 {
h, err := libp2p.New(hostOpts...)
if err != nil {
return err
}
Mirror.options = append(Mirror.options, mirror.WithHost(h))
Mirror.options = append(Mirror.options, mirror.WithHost(h, pk))
}
if cctx.IsSet(Mirror.flags.storePath.Name) {
path := Mirror.flags.storePath.Get(cctx)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/ipld/go-codec-dagpb v1.6.0
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipni/go-libipni v0.3.5-0.20230810183809-74fc137eba82
github.com/ipni/go-libipni v0.4.0
github.com/libp2p/go-libp2p v0.29.2
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/mitchellh/go-homedir v1.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,8 @@ github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0/go.mod h1:od
github.com/ipld/go-ipld-prime v0.20.0 h1:Ud3VwE9ClxpO2LkCYP7vWPc0Fo+dYdYzgxUJZ3uRG4g=
github.com/ipld/go-ipld-prime v0.20.0/go.mod h1:PzqZ/ZR981eKbgdr3y2DJYeD/8bgMawdGVlJDE8kK+M=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo=
github.com/ipni/go-libipni v0.3.5-0.20230810183809-74fc137eba82 h1:BRA4D5DAOulJ6+WrA51q28xINcQ8QSW5aCE12UdC194=
github.com/ipni/go-libipni v0.3.5-0.20230810183809-74fc137eba82/go.mod h1:LxH6NUmEVruK3FjV2bFWfXKougX7AIe7wVjvPqITrDI=
github.com/ipni/go-libipni v0.4.0 h1:zZ8OU2N0D4iYt0E9jInbDapeh9bG10b5sBgqvScflNw=
github.com/ipni/go-libipni v0.4.0/go.mod h1:LxH6NUmEVruK3FjV2bFWfXKougX7AIe7wVjvPqITrDI=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down
54 changes: 15 additions & 39 deletions mirror/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,9 @@ import (
"io"
"time"

dt "github.com/filecoin-project/go-data-transfer/v2"
datatransfer "github.com/filecoin-project/go-data-transfer/v2/impl"
dtnetwork "github.com/filecoin-project/go-data-transfer/v2/network"
gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-graphsync"
gsimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
Expand All @@ -26,12 +19,12 @@ import (
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/announce/p2psender"
"github.com/ipni/go-libipni/dagsync"
"github.com/ipni/go-libipni/dagsync/dtsync"
"github.com/ipni/go-libipni/dagsync/ipnisync"
"github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/index-provider/engine/chunker"
"github.com/ipni/index-provider/metrics"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
Expand Down Expand Up @@ -62,6 +55,7 @@ func New(ctx context.Context, source peer.AddrInfo, o ...Option) (*Mirror, error
if err != nil {
return nil, err
}

m := &Mirror{
options: opts,
source: source,
Expand All @@ -81,17 +75,15 @@ func New(ctx context.Context, source peer.AddrInfo, o ...Option) (*Mirror, error
}
}

dtds := namespace.Wrap(opts.ds, datastore.NewKey("datatransfer"))
dm, gx, err := newDataTransfer(ctx, m.h, dtds, m.ls)
// Create ipnisync publisher.
//
// TODO: When libp2phttp available, Listen on http over libp2p if
// httpListenAddr is not set.
m.pub, err = ipnisync.NewPublisher(m.httpListenAddr, m.ls, m.privKey, ipnisync.WithHeadTopic(m.topic), ipnisync.WithServer(true))
if err != nil {
return nil, err
}

// TODO: make the publisher kind configurable just like we do for Engine.
m.pub, err = dtsync.NewPublisherFromExisting(dm, m.h, m.topic, m.ls)
if err != nil {
return nil, err
}
// TODO: If a mirror should send its own announcements, then pubsub senders
// will need a storage provider ID, set as the sender's extra data, in
// order to relayed through gateways. HTTP senders will new destination
Expand All @@ -102,34 +94,14 @@ func New(ctx context.Context, source peer.AddrInfo, o ...Option) (*Mirror, error
}
m.senders = append(m.senders, p2pSender)

m.sub, err = dagsync.NewSubscriber(m.h, nil, m.ls, m.topic, dagsync.DtManager(dm, gx), dagsync.RecvAnnounce())
dtds := namespace.Wrap(opts.ds, datastore.NewKey("datatransfer"))
m.sub, err = dagsync.NewSubscriber(m.h, dtds, m.ls, m.topic, dagsync.RecvAnnounce())
if err != nil {
return nil, err
}
return m, nil
}

// TODO: add option to override this
func newDataTransfer(ctx context.Context, host host.Host, ds datastore.Batching, ls ipld.LinkSystem) (dt.Manager, graphsync.GraphExchange, error) {
gn := gsnet.NewFromLibp2pHost(host)
gx := gsimpl.New(ctx, gn, ls)
dn := dtnetwork.NewFromLibp2pHost(host)
tp := gstransport.NewTransport(host.ID(), gx)
dm, err := datatransfer.NewDataTransfer(ds, dn, tp)
if err != nil {
return nil, nil, err
}
dtReady := make(chan error)
dm.OnReady(func(e error) { dtReady <- e })
if err := dm.Start(ctx); err != nil {
return nil, nil, err
}
if err := <-dtReady; err != nil {
return nil, nil, err
}
return dm, gx, nil
}

func (m *Mirror) Start() error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel
Expand Down Expand Up @@ -203,6 +175,10 @@ func (m *Mirror) Shutdown() error {
return nil
}

func (m *Mirror) PublisherAddrs() []multiaddr.Multiaddr {
return m.pub.Addrs()
}

func (m *Mirror) mirror(ctx context.Context, adCid cid.Cid) error {
log := log.With("originalAd", adCid)
ad, err := m.loadAd(ctx, adCid)
Expand Down Expand Up @@ -296,7 +272,7 @@ func (m *Mirror) mirror(ctx context.Context, adCid cid.Cid) error {
}

m.pub.SetRoot(mirroredAdCid)
if err = announce.Send(ctx, mirroredAdCid, m.h.Addrs(), m.senders...); err != nil {
if err = announce.Send(ctx, mirroredAdCid, m.pub.Addrs(), m.senders...); err != nil {
return err
}
log.Infow("Mirrored successfully", "originalAdCid", adCid, "mirroredAdCid", mirroredAdCid)
Expand Down
29 changes: 13 additions & 16 deletions mirror/mirror_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,30 @@ package mirror_test

import (
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"testing"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
hamt "github.com/ipld/go-ipld-adl-hamt"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/bindnode"
"github.com/ipld/go-ipld-prime/storage/memstore"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/ipni/go-libipni/dagsync/dtsync"
"github.com/ipni/go-libipni/dagsync/ipnisync"
"github.com/ipni/go-libipni/ingest/schema"
"github.com/ipni/go-libipni/metadata"
provider "github.com/ipni/index-provider"
"github.com/ipni/index-provider/engine"
"github.com/ipni/index-provider/mirror"
"github.com/ipni/index-provider/testutil"
"github.com/libp2p/go-libp2p"
p2pcrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)
Expand All @@ -38,37 +37,35 @@ type testEnv struct {

mirror *mirror.Mirror
mirrorHost host.Host
mirrorSync *dtsync.Sync
mirrorSync *ipnisync.Sync
mirrorSyncHost host.Host

Check failure on line 41 in mirror/mirror_env_test.go

View workflow job for this annotation

GitHub Actions / All

field mirrorSyncHost is unused (U1000)

Check failure on line 41 in mirror/mirror_env_test.go

View workflow job for this annotation

GitHub Actions / go-check / All

field mirrorSyncHost is unused (U1000)
mirrorSyncLs ipld.LinkSystem
mirrorSyncer *dtsync.Syncer
mirrorSyncer *ipnisync.Syncer
mirrorSyncLsStore *memstore.Store
}

func (te *testEnv) startMirror(t *testing.T, ctx context.Context, opts ...mirror.Option) {
var err error
te.mirrorHost, err = libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
privKey, _, err := p2pcrypto.GenerateEd25519Key(rand.Reader)
require.NoError(t, err)
te.mirrorHost, err = libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), libp2p.Identity(privKey))
require.NoError(t, err)
// Override the host, since test environment needs explicit access to it.
opts = append(opts, mirror.WithHost(te.mirrorHost))
opts = append(opts, mirror.WithHost(te.mirrorHost, privKey))
te.mirror, err = mirror.New(ctx, te.sourceAddrInfo(t), opts...)
require.NoError(t, err)
require.NoError(t, te.mirror.Start())
t.Cleanup(func() { require.NoError(t, te.mirror.Shutdown()) })

te.mirrorSyncHost, err = libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
require.NoError(t, err)
te.mirrorSyncHost.Peerstore().AddAddrs(te.mirrorHost.ID(), te.mirrorHost.Addrs(), peerstore.PermanentAddrTTL)

te.mirrorSyncLsStore = &memstore.Store{}
te.mirrorSyncLs = cidlink.DefaultLinkSystem()
te.mirrorSyncLs.SetReadStorage(te.mirrorSyncLsStore)
te.mirrorSyncLs.SetWriteStorage(te.mirrorSyncLsStore)

te.mirrorSync, err = dtsync.NewSync(te.mirrorSyncHost, dssync.MutexWrap(datastore.NewMapDatastore()), te.mirrorSyncLs, nil, 0, 0)
te.mirrorSync = ipnisync.NewSync(te.mirrorSyncLs, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { te.mirrorSync.Close() })
te.mirrorSyncer, err = te.mirrorSync.NewSyncer(te.mirrorHost.ID(), te.mirror.PublisherAddrs())
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, te.mirrorSync.Close()) })
te.mirrorSyncer = te.mirrorSync.NewSyncer(te.mirrorHost.ID(), te.mirror.GetTopicName())
}

func (te *testEnv) sourceAddrInfo(t *testing.T) peer.AddrInfo {
Expand Down
Loading

0 comments on commit c2251f3

Please sign in to comment.