Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New go libipni dagsync without selectors #56

Merged
merged 6 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car/v2 v2.10.1
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipni/go-libipni v0.3.2
github.com/libp2p/go-libp2p v0.29.1
github.com/ipni/go-libipni v0.4.0
github.com/libp2p/go-libp2p v0.29.2
github.com/mattn/go-isatty v0.0.19
github.com/montanaflynn/stats v0.7.0
github.com/multiformats/go-multiaddr v0.10.1
Expand Down Expand Up @@ -123,7 +123,7 @@ require (
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-19 v0.3.3 // indirect
github.com/quic-go/qtls-go1-20 v0.2.3 // indirect
github.com/quic-go/quic-go v0.36.3 // indirect
github.com/quic-go/quic-go v0.36.4 // indirect
github.com/quic-go/webtransport-go v0.5.3 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYt
github.com/ipld/go-ipld-prime v0.20.0 h1:Ud3VwE9ClxpO2LkCYP7vWPc0Fo+dYdYzgxUJZ3uRG4g=
github.com/ipld/go-ipld-prime v0.20.0/go.mod h1:PzqZ/ZR981eKbgdr3y2DJYeD/8bgMawdGVlJDE8kK+M=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo=
github.com/ipni/go-libipni v0.3.2 h1:pzCoWQIefTkIZ0ob2BXCkIxnGoIKIMJOudvt/UgyMJk=
github.com/ipni/go-libipni v0.3.2/go.mod h1:9APtwq1JhcpyEjVsbi8f87nkUtxQcQXokU/HNY7B9g0=
github.com/ipni/go-libipni v0.4.0 h1:zZ8OU2N0D4iYt0E9jInbDapeh9bG10b5sBgqvScflNw=
github.com/ipni/go-libipni v0.4.0/go.mod h1:LxH6NUmEVruK3FjV2bFWfXKougX7AIe7wVjvPqITrDI=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
Expand Down Expand Up @@ -313,8 +313,8 @@ github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38y
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
github.com/libp2p/go-libp2p v0.29.1 h1:yNeg6XgP8gbdc4YSrwiIt5T1TGOrVjH8dzl8h0GIOfQ=
github.com/libp2p/go-libp2p v0.29.1/go.mod h1:20El+LLy3/YhdUYIvGbLnvVJN32nMdqY6KXBENRAfLY=
github.com/libp2p/go-libp2p v0.29.2 h1:uPw/c8hOxoLP/KhFnzlc5Ejqf+OmAL1dwIsqE31WBtY=
github.com/libp2p/go-libp2p v0.29.2/go.mod h1:OU7nSq0aEZMsV2wY8nXn1+XNNt9q2UiR8LjW3Kmp2UE=
github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s=
github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w=
github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qkCnjyaZUPYU=
Expand Down Expand Up @@ -446,8 +446,8 @@ github.com/quic-go/qtls-go1-19 v0.3.3 h1:wznEHvJwd+2X3PqftRha0SUKmGsnb6dfArMhy9P
github.com/quic-go/qtls-go1-19 v0.3.3/go.mod h1:ySOI96ew8lnoKPtSqx2BlI5wCpUVPT05RMAlajtnyOI=
github.com/quic-go/qtls-go1-20 v0.2.3 h1:m575dovXn1y2ATOb1XrRFcrv0F+EQmlowTkoraNkDPI=
github.com/quic-go/qtls-go1-20 v0.2.3/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM=
github.com/quic-go/quic-go v0.36.3 h1:f+yOqeGhMoRX7/M3wmEw/djhzKWr15FtQysox85/834=
github.com/quic-go/quic-go v0.36.3/go.mod h1:qxQumdeKw5GmWs1OsTZZnOxzSI+RJWuhf1O8FN35L2o=
github.com/quic-go/quic-go v0.36.4 h1:CXn/ZLN5Vntlk53fjR+kUMC8Jt7flfQe+I5Ty5A+k0o=
github.com/quic-go/quic-go v0.36.4/go.mod h1:qxQumdeKw5GmWs1OsTZZnOxzSI+RJWuhf1O8FN35L2o=
github.com/quic-go/webtransport-go v0.5.3 h1:5XMlzemqB4qmOlgIus5zB45AcZ2kCgCy2EptUrfOPWU=
github.com/quic-go/webtransport-go v0.5.3/go.mod h1:OhmmgJIzTTqXK5xvtuX0oBpLV2GkLWNDA+UeTGJXErU=
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
Expand Down
145 changes: 47 additions & 98 deletions pkg/adpub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/ipni/go-libipni/dagsync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -38,17 +32,10 @@ type client struct {
maxSyncRetry uint64
syncRetryBackoff time.Duration

sub *dagsync.Subscriber

store *ClientStore
publisher peer.AddrInfo

// adSel is the selector for a single advertisement.
adSel ipld.Node

host host.Host
ownsHost bool
topic string
host host.Host
ownsHost bool
topic string
}

var ErrContentNotFound = errors.New("content not found at publisher")
Expand All @@ -71,65 +58,29 @@ func NewClient(addrInfo peer.AddrInfo, options ...Option) (Client, error) {

opts.p2pHost.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, time.Hour)

store := newClientStore()
sub, err := dagsync.NewSubscriber(opts.p2pHost, store.Batching, store.LinkSystem, opts.topic)
if err != nil {
return nil, err
}

ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
adSel := ssb.ExploreRecursive(selector.RecursionLimitDepth(1), ssb.ExploreFields(
func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("PreviousID", ssb.ExploreRecursiveEdge())
})).Node()

return &client{
adChainDepthLimit: opts.adChainDepthLimit,
entriesDepthLimit: opts.entriesDepthLimit,
maxSyncRetry: opts.maxSyncRetry,
syncRetryBackoff: opts.syncRetryBackoff,

sub: sub,
publisher: addrInfo,
store: store,
adSel: adSel,

host: opts.p2pHost,
ownsHost: ownsHost,
topic: opts.topic,
host: opts.p2pHost,
ownsHost: ownsHost,
topic: opts.topic,
}, nil
}

func selectEntriesWithLimit(limit selector.RecursionLimit) datamodel.Node {
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
return ssb.ExploreRecursive(limit, ssb.ExploreFields(
func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("Next", ssb.ExploreRecursiveEdge())
})).Node()
}

func (c *client) Distance(ctx context.Context, oldestCid, newestCid cid.Cid) (int, cid.Cid, error) {
if oldestCid == cid.Undef {
return 0, cid.Undef, errors.New("must specify a oldest CID")
}

var rLimit selector.RecursionLimit
if c.adChainDepthLimit == 0 {
rLimit = selector.RecursionLimitNone()
} else {
rLimit = selector.RecursionLimitDepth(c.adChainDepthLimit + 1)
var depthLimit int64
if c.adChainDepthLimit != 0 {
depthLimit = c.adChainDepthLimit + 1
}

stopAt := cidlink.Link{Cid: oldestCid}

ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
adSeqSel := ssb.ExploreFields(
func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("PreviousID", ssb.ExploreRecursiveEdge())
}).Node()

sel := dagsync.ExploreRecursiveWithStopNode(rLimit, adSeqSel, stopAt)

// Create a linksystem that only counts, and does not store data.
cs := newCountStore()
gsds := dssync.MutexWrap(datastore.NewMapDatastore())
Expand All @@ -139,7 +90,8 @@ func (c *client) Distance(ctx context.Context, oldestCid, newestCid cid.Cid) (in
}
defer sub.Close()

newestCid, err = sub.Sync(ctx, c.publisher, newestCid, sel)
newestCid, err = sub.SyncAdChain(ctx, c.publisher, dagsync.ScopedDepthLimit(depthLimit),
dagsync.WithHeadAdCid(newestCid), dagsync.WithStopAdCid(oldestCid))
if err != nil {
return 0, cid.Undef, err
}
Expand All @@ -153,37 +105,37 @@ func (c *client) Distance(ctx context.Context, oldestCid, newestCid cid.Cid) (in
}

func (c *client) List(ctx context.Context, latestCid cid.Cid, n int, w io.Writer) error {
var rLimit selector.RecursionLimit
if n < 1 {
rLimit = selector.RecursionLimitNone()
} else {
rLimit = selector.RecursionLimitDepth(int64(n))
store := newClientStore()
sub, err := dagsync.NewSubscriber(c.host, store.Batching, store.LinkSystem, c.topic)
if err != nil {
return err
}
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
adSeqSel := ssb.ExploreFields(
func(efsb builder.ExploreFieldsSpecBuilder) {
efsb.Insert("PreviousID", ssb.ExploreRecursiveEdge())
}).Node()
sel := dagsync.ExploreRecursiveWithStopNode(rLimit, adSeqSel, nil)
defer sub.Close()

latestCid, err := c.sub.Sync(ctx, c.publisher, latestCid, sel)
latestCid, err = sub.SyncAdChain(ctx, c.publisher, dagsync.WithHeadAdCid(latestCid), dagsync.ScopedDepthLimit(int64(n)))
if err != nil {
return err
}

return c.store.list(ctx, latestCid, n, w)
return store.list(ctx, latestCid, n, w)
}

func (c *client) GetAdvertisement(ctx context.Context, adCid cid.Cid) (*Advertisement, error) {
store := newClientStore()
sub, err := dagsync.NewSubscriber(c.host, store.Batching, store.LinkSystem, c.topic)
if err != nil {
return nil, err
}
defer sub.Close()

// Sync the advertisement without entries first.
var err error
adCid, err = c.syncAdWithRetry(ctx, adCid)
adCid, err = c.syncAdWithRetry(ctx, adCid, sub)
if err != nil {
return nil, err
}

// Load the synced advertisement from local store.
ad, err := c.store.getAdvertisement(ctx, adCid)
ad, err := store.getAdvertisement(ctx, adCid)
if err != nil {
return nil, err
}
Expand All @@ -196,14 +148,14 @@ func (c *client) GetAdvertisement(ctx context.Context, adCid cid.Cid) (*Advertis
return ad, err
}

func (c *client) syncAdWithRetry(ctx context.Context, adCid cid.Cid) (cid.Cid, error) {
func (c *client) syncAdWithRetry(ctx context.Context, adCid cid.Cid, sub *dagsync.Subscriber) (cid.Cid, error) {
if c.maxSyncRetry == 0 {
return c.sub.Sync(ctx, c.publisher, adCid, c.adSel)
return sub.SyncAdChain(ctx, c.publisher, dagsync.WithHeadAdCid(adCid), dagsync.ScopedDepthLimit(1))
}
var attempt uint64
var err error
for {
adCid, err = c.sub.Sync(ctx, c.publisher, adCid, c.adSel)
adCid, err = sub.SyncAdChain(ctx, c.publisher, dagsync.WithHeadAdCid(adCid), dagsync.ScopedDepthLimit(1))
if err == nil {
return adCid, nil
}
Expand All @@ -223,19 +175,20 @@ func (c *client) syncAdWithRetry(ctx context.Context, adCid cid.Cid) (cid.Cid, e
}

func (c *client) SyncEntriesWithRetry(ctx context.Context, id cid.Cid) error {
var attempt uint64
var recurLimit selector.RecursionLimit
if c.entriesDepthLimit == 0 {
recurLimit = selector.RecursionLimitNone()
} else {
recurLimit = selector.RecursionLimitDepth(c.entriesDepthLimit)
store := newClientStore()
sub, err := dagsync.NewSubscriber(c.host, store.Batching, store.LinkSystem, c.topic)
if err != nil {
return err
}
defer sub.Close()

var attempt uint64
recurLimit := c.entriesDepthLimit

for {
sel := selectEntriesWithLimit(recurLimit)
_, err := c.sub.Sync(ctx, c.publisher, id, sel)
err := sub.SyncEntries(ctx, c.publisher, id, dagsync.ScopedDepthLimit(recurLimit))
if err == nil {
// Synced everything asked for by the selector.
// Synced everything asked for.
return nil
}
if strings.HasSuffix(err.Error(), "content not found") {
Expand All @@ -245,26 +198,25 @@ func (c *client) SyncEntriesWithRetry(ctx context.Context, id cid.Cid) error {
if attempt > c.maxSyncRetry {
return fmt.Errorf("exceeded maximum retries syncing entries: %w", err)
}
nextMissing, visitedDepth, present := c.findNextMissingChunkLink(ctx, id)
nextMissing, visitedDepth, present := findNextMissingChunkLink(ctx, id, store)
if !present {
// Reached the end of the chain.
return nil
}
id = nextMissing
remainingLimit := recurLimit.Depth() - visitedDepth
recurLimit = selector.RecursionLimitDepth(remainingLimit)
recurLimit -= visitedDepth
fmt.Fprintf(os.Stderr, "entries sync retry %d: %s\n", attempt, err)
time.Sleep(c.syncRetryBackoff)
}
}

func (c *client) findNextMissingChunkLink(ctx context.Context, next cid.Cid) (cid.Cid, int64, bool) {
func findNextMissingChunkLink(ctx context.Context, next cid.Cid, store *ClientStore) (cid.Cid, int64, bool) {
var depth int64
for {
if !isPresent(next) {
return cid.Undef, depth, false
}
c, err := c.store.getNextChunkLink(ctx, next)
c, err := store.getNextChunkLink(ctx, next)
if errors.Is(err, datastore.ErrNotFound) {
return next, depth, true
}
Expand All @@ -274,11 +226,8 @@ func (c *client) findNextMissingChunkLink(ctx context.Context, next cid.Cid) (ci
}

func (c *client) Close() error {
err := c.sub.Close()
if c.ownsHost {
if err := c.host.Close(); err != nil {
return err
}
if !c.ownsHost {
return nil
}
return err
return c.host.Close()
}
15 changes: 8 additions & 7 deletions pkg/ads/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ func adsGetAction(cctx *cli.Context) error {
return err
}

for _, adCid := range adCids {
for i, adCid := range adCids {
if i != 0 {
fmt.Println()
}

ad, err := pubClient.GetAdvertisement(cctx.Context, adCid)
if err != nil {
if ad == nil {
Expand Down Expand Up @@ -180,18 +184,16 @@ func adsGetAction(cctx *cli.Context) error {
} else {
fmt.Println("Entries: None")
}

return nil
continue
}

// Sync entries if not a removal advertisement and has entries.
if ad.HasEntries() {
err = pubClient.SyncEntriesWithRetry(cctx.Context, ad.Entries.Root())
if err != nil {
if !errors.Is(err, adpub.ErrContentNotFound) {
fmt.Fprintf(os.Stderr, "⚠️ Failed to sync entries for advertisement %s. Content no longer hosted\n", ad.ID)
}
fmt.Fprintf(os.Stderr, "⚠️ Failed to sync entries for advertisement %s. %s\n", ad.ID, err)
}
continue
}

fmt.Println("Entries:")
Expand All @@ -215,7 +217,6 @@ func adsGetAction(cctx *cli.Context) error {
if entriesOutput != "" {
fmt.Println(entriesOutput)
}
fmt.Println()
}
return nil
}