Skip to content

Commit

Permalink
Add distance tracking functionality to provider command
Browse files Browse the repository at this point in the history
The provider command has a new --follow-dist option that continues to show distance updates for providers.

- Add dtrack package that has distance tracking functionslity
- New provider flag to track distance updates
- Default limit to ad chain depth
- Default limit to entries chain depth
- Proper error message if ad chain depth limit hit
- Disable logging done by dependencies
  • Loading branch information
gammazero committed Jul 13, 2023
1 parent a74e76b commit cb5acaa
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 38 deletions.
6 changes: 6 additions & 0 deletions cmd/ipni/ipni.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"

logging "github.com/ipfs/go-log/v2"
"github.com/ipni/ipni-cli"
"github.com/ipni/ipni-cli/pkg/ads"
"github.com/ipni/ipni-cli/pkg/find"
Expand All @@ -13,7 +14,12 @@ import (
"github.com/urfave/cli/v2"
)

var log = logging.Logger("ipni-cli")

Check failure on line 17 in cmd/ipni/ipni.go

View workflow job for this annotation

GitHub Actions / All

var log is unused (U1000)

Check failure on line 17 in cmd/ipni/ipni.go

View workflow job for this annotation

GitHub Actions / All

var log is unused (U1000)

func main() {
// Disable logging that happens in packages such as data-transfer.
_ = logging.SetLogLevel("*", "fatal")

app := &cli.App{
Name: "ipni",
Usage: "Commands to interact with IPNI indexers and index providers",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/filecoin-project/go-address v1.1.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car/v2 v2.10.0
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipni/go-libipni v0.2.10
Expand Down Expand Up @@ -74,7 +75,6 @@ require (
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
github.com/ipfs/go-libipfs v0.6.2 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/ipfs/go-merkledag v0.10.0 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
Expand Down
33 changes: 26 additions & 7 deletions pkg/adpub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
type Client interface {
GetAdvertisement(context.Context, cid.Cid) (*Advertisement, error)
Close() error
ClearStore()
Distance(context.Context, cid.Cid, cid.Cid) (int, cid.Cid, error)
List(context.Context, cid.Cid, int, io.Writer) error
SyncEntriesWithRetry(context.Context, cid.Cid) error
}

type client struct {
entriesDepthLimit selector.RecursionLimit
adChainDepthLimit int64
entriesDepthLimit int64
maxSyncRetry uint64
syncRetryBackoff time.Duration

Expand Down Expand Up @@ -72,6 +74,7 @@ func NewClient(addrInfo peer.AddrInfo, options ...Option) (Client, error) {
})).Node()

return &client{
adChainDepthLimit: opts.adChainDepthLimit,
entriesDepthLimit: opts.entriesDepthLimit,
maxSyncRetry: opts.maxSyncRetry,
syncRetryBackoff: opts.syncRetryBackoff,
Expand All @@ -97,8 +100,7 @@ func (c *client) Distance(ctx context.Context, oldestCid, newestCid cid.Cid) (in
}

// Sync the advertisement without entries first.
var err error
_, err = c.syncAdWithRetry(ctx, oldestCid)
_, err := c.syncAdWithRetry(ctx, oldestCid)
if err != nil {
return 0, cid.Undef, err
}
Expand All @@ -109,8 +111,12 @@ func (c *client) Distance(ctx context.Context, oldestCid, newestCid cid.Cid) (in
return 0, cid.Undef, err
}

// TODO: Allow a maximum depth to be specified for the ad chain.
rLimit := selector.RecursionLimitNone()
var rLimit selector.RecursionLimit
if c.adChainDepthLimit == 0 {
rLimit = selector.RecursionLimitNone()
} else {
rLimit = selector.RecursionLimitDepth(c.adChainDepthLimit)
}

stopAt := cidlink.Link{Cid: ad.PreviousID}

Expand All @@ -127,7 +133,7 @@ func (c *client) Distance(ctx context.Context, oldestCid, newestCid cid.Cid) (in
return 0, cid.Undef, err
}

dist, err := c.store.distance(ctx, oldestCid, newestCid)
dist, err := c.store.distance(ctx, oldestCid, newestCid, c.adChainDepthLimit)
if err != nil {
return 0, cid.Undef, err
}
Expand Down Expand Up @@ -180,6 +186,9 @@ func (c *client) GetAdvertisement(ctx context.Context, adCid cid.Cid) (*Advertis
}

func (c *client) syncAdWithRetry(ctx context.Context, adCid cid.Cid) (cid.Cid, error) {
if c.maxSyncRetry == 0 {
return c.sub.Sync(ctx, c.publisher, adCid, c.adSel)
}
var attempt uint64
var err error
for {
Expand All @@ -204,7 +213,13 @@ func (c *client) syncAdWithRetry(ctx context.Context, adCid cid.Cid) (cid.Cid, e

func (c *client) SyncEntriesWithRetry(ctx context.Context, id cid.Cid) error {
var attempt uint64
recurLimit := c.entriesDepthLimit
var recurLimit selector.RecursionLimit
if c.entriesDepthLimit == 0 {
recurLimit = selector.RecursionLimitNone()
} else {
recurLimit = selector.RecursionLimitDepth(c.entriesDepthLimit)
}

for {
sel := selectEntriesWithLimit(recurLimit)
_, err := c.sub.Sync(ctx, c.publisher, id, sel)
Expand Down Expand Up @@ -250,3 +265,7 @@ func (c *client) findNextMissingChunkLink(ctx context.Context, next cid.Cid) (ci
func (c *client) Close() error {
return c.sub.Close()
}

func (c *client) ClearStore() {
c.store.clear()
}
11 changes: 10 additions & 1 deletion pkg/adpub/client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package adpub
import (
"bytes"
"context"
"fmt"
"io"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -168,7 +169,7 @@ func (s *ClientStore) getAdvertisement(ctx context.Context, id cid.Cid) (*Advert
return a, nil
}

func (s *ClientStore) distance(ctx context.Context, oldestCid, newestCid cid.Cid) (int, error) {
func (s *ClientStore) distance(ctx context.Context, oldestCid, newestCid cid.Cid, depthLimit int64) (int, error) {
var count int
for newestCid != oldestCid {
val, err := s.Batching.Get(ctx, datastore.NewKey(newestCid.String()))
Expand Down Expand Up @@ -199,6 +200,10 @@ func (s *ClientStore) distance(ctx context.Context, oldestCid, newestCid cid.Cid
break
}
newestCid = ad.PreviousID.(cidlink.Link).Cid

if count == int(depthLimit) {
return 0, fmt.Errorf("exceeded limit %d+", depthLimit)
}
}
return count, nil
}
Expand Down Expand Up @@ -241,3 +246,7 @@ func (s *ClientStore) list(ctx context.Context, nextCid cid.Cid, n int, w io.Wri
}
return nil
}

func (s *ClientStore) clear() {
s.Batching = dssync.MutexWrap(datastore.NewMapDatastore())
}
30 changes: 21 additions & 9 deletions pkg/adpub/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import (
"errors"
"fmt"
"time"
)

"github.com/ipld/go-ipld-prime/traversal/selector"
const (
defaultAdChainDepthLimit = 50000
defaultEntriesDepthLimit = 1000
)

type config struct {
entriesDepthLimit selector.RecursionLimit
adChainDepthLimit int64
entriesDepthLimit int64
maxSyncRetry uint64
syncRetryBackoff time.Duration
topic string
Expand All @@ -21,9 +25,9 @@ type Option func(*config) error
// getOpts creates a config and applies Options to it.
func getOpts(opts []Option) (config, error) {
cfg := config{
entriesDepthLimit: selector.RecursionLimitNone(),
adChainDepthLimit: defaultAdChainDepthLimit,
entriesDepthLimit: defaultEntriesDepthLimit,
topic: "/indexer/ingest/mainnet",
maxSyncRetry: 3,
syncRetryBackoff: 500 * time.Millisecond,
}

Expand All @@ -35,6 +39,18 @@ func getOpts(opts []Option) (config, error) {
return cfg, nil
}

// WithAdChainDepthLimit sets the depth limit when syncing an advertisement
// chain. Setting to 0 means no limit.
func WithAdChainDepthLimit(limit int64) Option {
return func(c *config) error {
if limit < 0 {
return errors.New("ad chain depth limit cannot be negative")
}
c.adChainDepthLimit = limit
return nil
}
}

// WithSyncRetryBackoff sets the length of time to wait before retrying a faild
// sync. Defaults to 500ms if unset.
func WithSyncRetryBackoff(d time.Duration) Option {
Expand Down Expand Up @@ -69,11 +85,7 @@ func WithEntriesDepthLimit(depthLimit int64) Option {
if depthLimit < 0 {
return errors.New("ad entries depth limit cannot be negative")
}
if depthLimit == 0 {
c.entriesDepthLimit = selector.RecursionLimitNone()
} else {
c.entriesDepthLimit = selector.RecursionLimitDepth(depthLimit)
}
c.entriesDepthLimit = depthLimit
return nil
}
}
Loading

0 comments on commit cb5acaa

Please sign in to comment.