From 728ff6dd2166cb780cfb3a2cbc83de196ed8ff48 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 27 Jan 2017 17:19:58 -0800 Subject: [PATCH] Make pinset sharding deterministic Making this deterministic keeps us from creating an exponential amount of objects as the number of pins in the set increases. License: MIT Signed-off-by: Jeromy --- pin/set.go | 23 ++++------------- pin/set_test.go | 65 +++++++++++++++++++++++++++++++++++++------------ 2 files changed, 55 insertions(+), 33 deletions(-) diff --git a/pin/set.go b/pin/set.go index 89791b1b606..01e0e198b07 100644 --- a/pin/set.go +++ b/pin/set.go @@ -3,7 +3,6 @@ package pin import ( "bytes" "context" - "crypto/rand" "encoding/binary" "errors" "fmt" @@ -26,14 +25,6 @@ const ( maxItems = 8192 ) -func randomSeed() (uint32, error) { - var buf [4]byte - if _, err := rand.Read(buf[:]); err != nil { - return 0, err - } - return binary.LittleEndian.Uint32(buf[:]), nil -} - func hash(seed uint32, c *cid.Cid) uint32 { var buf [4]byte binary.LittleEndian.PutUint32(buf[:], seed) @@ -63,11 +54,7 @@ func (s sortByHash) Swap(a, b int) { s.links[a], s.links[b] = s.links[b], s.links[a] } -func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) { - seed, err := randomSeed() - if err != nil { - return nil, err - } +func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) { links := make([]*node.Link, 0, defaultFanout+maxItems) for i := 0; i < defaultFanout; i++ { links = append(links, &node.Link{Cid: emptyKey}) @@ -82,7 +69,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint hdr := &pb.Set{ Version: proto.Uint32(1), Fanout: proto.Uint32(defaultFanout), - Seed: proto.Uint32(seed), + Seed: proto.Uint32(depth), } if err := writeHdr(n, hdr); err != nil { return nil, err @@ -129,7 +116,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint if !ok { break } - h := hash(seed, k) % defaultFanout + h := hash(depth, k) % defaultFanout hashed[h] = append(hashed[h], k) } @@ -142,7 +129,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint childIter := getCidListIterator(items) // recursively create a pinset from the items for this bucket index - child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys) + child, err := storeItems(ctx, dag, uint64(len(items)), depth+1, childIter, internalKeys) if err != nil { return nil, err } @@ -296,7 +283,7 @@ func getCidListIterator(cids []*cid.Cid) itemIterator { func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) { iter := getCidListIterator(cids) - n, err := storeItems(ctx, dag, uint64(len(cids)), iter, internalKeys) + n, err := storeItems(ctx, dag, uint64(len(cids)), 0, iter, internalKeys) if err != nil { return nil, err } diff --git a/pin/set_test.go b/pin/set_test.go index c409fae4b95..788af5a46c0 100644 --- a/pin/set_test.go +++ b/pin/set_test.go @@ -2,40 +2,75 @@ package pin import ( "context" - "fmt" - "os" + "encoding/binary" "testing" + blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" + bserv "github.com/ipfs/go-ipfs/blockservice" + offline "github.com/ipfs/go-ipfs/exchange/offline" dag "github.com/ipfs/go-ipfs/merkledag" - mdtest "github.com/ipfs/go-ipfs/merkledag/test" + ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore" + dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query" cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid" ) func ignoreCids(_ *cid.Cid) {} -func TestSet(t *testing.T) { - ds := mdtest.Mock() - limit := 10000 // 10000 reproduces the pinloss issue fairly reliably - - if os.Getenv("STRESS_IT_OUT_YO") != "" { - limit = 10000000 +func objCount(d ds.Datastore) int { + q := dsq.Query{KeysOnly: true} + res, err := d.Query(q) + if err != nil { + panic(err) } - var inputs []*cid.Cid - for i := 0; i < limit; i++ { - c, err := ds.Add(dag.NodeWithData([]byte(fmt.Sprint(i)))) - if err != nil { - t.Fatal(err) + + var count int + for { + _, ok := res.NextSync() + if !ok { + break } + count++ + } + return count +} + +func TestSet(t *testing.T) { + dst := ds.NewMapDatastore() + bstore := blockstore.NewBlockstore(dst) + ds := dag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore))) + + // this value triggers the creation of a recursive shard. + // If the recursive sharding is done improperly, this will result in + // an infinite recursion and crash (OOM) + limit := uint32((defaultFanout * maxItems) + 1) + + var inputs []*cid.Cid + buf := make([]byte, 4) + for i := uint32(0); i < limit; i++ { + binary.BigEndian.PutUint32(buf, i) + c := dag.NewRawNode(buf).Cid() inputs = append(inputs, c) } + _, err := storeSet(context.Background(), ds, inputs[:len(inputs)-1], ignoreCids) + if err != nil { + t.Fatal(err) + } + + objs1 := objCount(dst) + out, err := storeSet(context.Background(), ds, inputs, ignoreCids) if err != nil { t.Fatal(err) } + objs2 := objCount(dst) + if objs2-objs1 > 2 { + t.Fatal("set sharding does not appear to be deterministic") + } + // weird wrapper node because loadSet expects us to pass an // object pointing to multiple named sets setroot := &dag.ProtoNode{} @@ -49,7 +84,7 @@ func TestSet(t *testing.T) { t.Fatal(err) } - if len(outset) != limit { + if uint32(len(outset)) != limit { t.Fatal("got wrong number", len(outset), limit) }