Skip to content

Commit

Permalink
Calculate distance by counting but not storing ads (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero authored Jul 28, 2023
1 parent e48b504 commit 13a0193
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 47 deletions.
23 changes: 19 additions & 4 deletions pkg/adpub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/ipni/go-libipni/dagsync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -44,6 +46,9 @@ type client struct {

// adSel is the selector for a single advertisement.
adSel ipld.Node

host host.Host
topic string
}

var ErrContentNotFound = errors.New("content not found at publisher")
Expand Down Expand Up @@ -83,6 +88,9 @@ func NewClient(addrInfo peer.AddrInfo, options ...Option) (Client, error) {
publisher: addrInfo,
store: store,
adSel: adSel,

host: h,
topic: opts.topic,
}, nil
}

Expand All @@ -103,7 +111,7 @@ func (c *client) Distance(ctx context.Context, oldestCid, newestCid cid.Cid) (in
if c.adChainDepthLimit == 0 {
rLimit = selector.RecursionLimitNone()
} else {
rLimit = selector.RecursionLimitDepth(c.adChainDepthLimit)
rLimit = selector.RecursionLimitDepth(c.adChainDepthLimit + 1)
}

stopAt := cidlink.Link{Cid: oldestCid}
Expand All @@ -116,16 +124,23 @@ func (c *client) Distance(ctx context.Context, oldestCid, newestCid cid.Cid) (in

sel := dagsync.ExploreRecursiveWithStopNode(rLimit, adSeqSel, stopAt)

newestCid, err := c.sub.Sync(ctx, c.publisher, newestCid, sel)
// Create a linksystem that only counts, and does not store data.
cs := newCountStore()
gsds := dssync.MutexWrap(datastore.NewMapDatastore())
sub, err := dagsync.NewSubscriber(c.host, gsds, cs.LinkSystem, c.topic)
if err != nil {
return 0, cid.Undef, err
}

dist, err := c.store.distance(ctx, oldestCid, newestCid, c.adChainDepthLimit)
newestCid, err = sub.Sync(ctx, c.publisher, newestCid, sel)
if err != nil {
return 0, cid.Undef, err
}

dist := cs.distance()
if int64(dist) > c.adChainDepthLimit {
dist = -1
}

return dist, newestCid, nil
}

Expand Down
40 changes: 0 additions & 40 deletions pkg/adpub/client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package adpub
import (
"bytes"
"context"
"fmt"
"io"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -169,45 +168,6 @@ func (s *ClientStore) getAdvertisement(ctx context.Context, id cid.Cid) (*Advert
return a, nil
}

// distance counts the advertisements between newestCid and oldestCid by
// reading each advertisement from the datastore and following its PreviousID
// link. The walk ends when the current CID equals oldestCid or when an
// advertisement has no previous link. If depthLimit is non-zero and the count
// reaches depthLimit after following a link, an error is returned instead of
// a count. Any datastore, decoder-lookup, decode, or unwrap error aborts the
// walk and is returned as-is with a zero count.
func (s *ClientStore) distance(ctx context.Context, oldestCid, newestCid cid.Cid, depthLimit int64) (int, error) {
	hops := 0
	cur := newestCid
	for {
		if cur == oldestCid {
			return hops, nil
		}

		raw, err := s.Batching.Get(ctx, datastore.NewKey(cur.String()))
		if err != nil {
			return 0, err
		}

		adBuilder := schema.AdvertisementPrototype.NewBuilder()
		decode, err := multicodec.LookupDecoder(cur.Prefix().Codec)
		if err != nil {
			return 0, err
		}
		if err = decode(adBuilder, bytes.NewBuffer(raw)); err != nil {
			return 0, err
		}

		adv, err := schema.UnwrapAdvertisement(adBuilder.Build())
		if err != nil {
			return 0, err
		}

		hops++

		// No previous link: this advertisement is the end of the chain.
		prev := adv.PreviousID
		if prev == nil {
			return hops, nil
		}
		cur = prev.(cidlink.Link).Cid

		// The limit is checked only after stepping to the previous
		// advertisement, and only when a limit is configured.
		if depthLimit != 0 && hops == int(depthLimit) {
			return 0, fmt.Errorf("exceeded limit %d+", depthLimit)
		}
	}
}

func (s *ClientStore) list(ctx context.Context, nextCid cid.Cid, n int, w io.Writer) error {
for i := 0; i < n; i++ {
val, err := s.Batching.Get(ctx, datastore.NewKey(nextCid.String()))
Expand Down
46 changes: 46 additions & 0 deletions pkg/adpub/count_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package adpub

import (
"io"

"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"

// Import so these codecs get registered.
_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
)

// CountStore embeds a datastore and an IPLD link system and carries a counter.
// When built by newCountStore, the link system discards written block data and
// increments count each time a block write is committed.
type CountStore struct {
// Batching is the embedded datastore; newCountStore sets it to a null
// datastore.
datastore.Batching
// LinkSystem is the embedded link system whose write path updates count.
ipld.LinkSystem
// count is the number of committed block writes seen so far.
count int
}

// newCountStore returns a CountStore backed by a null datastore and a link
// system that stores nothing: every read reports datastore.ErrNotFound, every
// write goes to io.Discard, and each committed write increments the store's
// count.
func newCountStore() *CountStore {
	counter := new(CountStore)
	counter.Batching = datastore.NewNullDatastore()

	// Reads always miss, since no block data is ever kept.
	readOpener := func(_ ipld.LinkContext, _ ipld.Link) (io.Reader, error) {
		return nil, datastore.ErrNotFound
	}

	// Writes are thrown away; committing a block only bumps the counter.
	writeOpener := func(_ ipld.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
		onCommit := func(_ ipld.Link) error {
			counter.count++
			return nil
		}
		return io.Discard, onCommit, nil
	}

	linkSys := cidlink.DefaultLinkSystem()
	linkSys.StorageReadOpener = readOpener
	linkSys.StorageWriteOpener = writeOpener
	counter.LinkSystem = linkSys

	return counter
}

// distance reports the current value of the store's counter.
func (s *CountStore) distance() int {
	counted := s.count
	return counted
}

func (s *CountStore) clear() {

Check failure on line 43 in pkg/adpub/count_store.go

View workflow job for this annotation

GitHub Actions / All

func (*CountStore).clear is unused (U1000)
s.count = 0
panic("hre")
}
22 changes: 21 additions & 1 deletion pkg/dtrack/distance_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
track.errType = errTypeNone
track.ad = pinfo.LastAdvertisement
track.dist = dist
track.head = head
if dist != -1 {
track.head = head
}
updates <- DistanceUpdate{
ID: pid,
Distance: dist,
Expand All @@ -197,6 +199,15 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
}
track.err = nil
track.errType = errTypeNone
if dist == -1 {
track.dist = -1
track.head = cid.Undef
updates <- DistanceUpdate{
ID: pid,
Distance: -1,
}
return
}
if head != track.head {
track.dist += dist
track.head = head
Expand All @@ -219,6 +230,15 @@ func updateTrack(ctx context.Context, pid peer.ID, track *distTrack, provCache *
}
track.err = nil
track.errType = errTypeNone
if dist == -1 {
track.dist = -1
track.head = cid.Undef
updates <- DistanceUpdate{
ID: pid,
Distance: -1,
}
return
}
track.ad = pinfo.LastAdvertisement
track.dist -= dist
updated = true
Expand Down
13 changes: 11 additions & 2 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,20 @@ func followDistance(cctx *cli.Context, include, exclude map[peer.ID]struct{}, pc
}

fmt.Fprintln(os.Stderr, "Showing provider distance updates, ctrl-c to cancel...")
updates := dtrack.RunDistanceTracker(cctx.Context, include, exclude, pc, cctx.Int64("ad-depth-limit"), trackUpdateIn)
limit := cctx.Int64("ad-depth-limit")
updates := dtrack.RunDistanceTracker(cctx.Context, include, exclude, pc, limit, trackUpdateIn)
for update := range updates {
if update.Err != nil {
fmt.Fprintln(os.Stderr, "Provider", update.ID, "distance error:", update.Err)
continue
}
fmt.Println("Provider", update.ID, "distance to head advertisement:", update.Distance)
var dist string
if update.Distance == -1 {
dist = fmt.Sprintf("exceeded limit %d+", limit)
} else {
dist = fmt.Sprintf("%d", update.Distance)
}
fmt.Println("Provider", update.ID, "distance to head advertisement:", dist)
}
return nil
}
Expand Down Expand Up @@ -320,6 +327,8 @@ func showProviderInfo(cctx *cli.Context, pinfo *model.ProviderInfo) {
dist, _, err := getLastSeenDistance(cctx, pinfo)
if err != nil {
fmt.Println("error:", err)
} else if dist == -1 {
fmt.Printf("exceeded limit %d+", cctx.Int64("ad-depth-limit"))
} else {
fmt.Println(dist)
}
Expand Down

0 comments on commit 13a0193

Please sign in to comment.