diff --git a/hamt/hamt.go b/hamt/hamt.go index 0b49e9696..b7ac0a2f4 100644 --- a/hamt/hamt.go +++ b/hamt/hamt.go @@ -24,6 +24,7 @@ import ( "context" "fmt" "os" + "sync" bitfield "github.com/Stebalien/go-bitfield" cid "github.com/ipfs/go-cid" @@ -104,7 +105,6 @@ func NewHamtFromDag(dserv ipld.DAGService, nd ipld.Node) (*Shard, error) { return nil, err } - if fsn.Type() != format.THAMTShard { return nil, fmt.Errorf("node was not a dir shard") } @@ -202,6 +202,14 @@ func (sv *shardValue) Label() string { return sv.key } +func (ds *Shard) makeShardValue(lnk *ipld.Link) *shardValue { + lnk2 := *lnk + return &shardValue{ + key: lnk.Name[ds.maxpadlen:], + val: &lnk2, + } +} + func hash(val []byte) []byte { h := murmur3.New64() h.Write(val) @@ -253,6 +261,24 @@ func (ds *Shard) Find(ctx context.Context, name string) (*ipld.Link, error) { return out, nil } +type linkType int + +const ( + invalidLink linkType = iota + shardLink + shardValueLink +) + +func (ds *Shard) childLinkType(lnk *ipld.Link) (linkType, error) { + if len(lnk.Name) < ds.maxpadlen { + return invalidLink, fmt.Errorf("invalid link name '%s'", lnk.Name) + } + if len(lnk.Name) == ds.maxpadlen { + return shardLink, nil + } + return shardValueLink, nil +} + // getChild returns the i'th child of this shard. If it is cached in the // children array, it will return it from there. Otherwise, it loads the child // node from disk. @@ -277,12 +303,13 @@ func (ds *Shard) getChild(ctx context.Context, i int) (child, error) { // as a 'child' interface func (ds *Shard) loadChild(ctx context.Context, i int) (child, error) { lnk := ds.nd.Links()[i] - if len(lnk.Name) < ds.maxpadlen { - return nil, fmt.Errorf("invalid link name '%s'", lnk.Name) + lnkLinkType, err := ds.childLinkType(lnk) + if err != nil { + return nil, err } var c child - if len(lnk.Name) == ds.maxpadlen { + if lnkLinkType == shardLink { nd, err := lnk.GetNode(ctx, ds.dserv) if err != nil { return nil, err @@ -294,11 +321,7 @@ func (ds *Shard) loadChild(ctx context.Context, i int) (child, error) { c = cds } else { - lnk2 := *lnk - c = &shardValue{ - key: lnk.Name[ds.maxpadlen:], - val: &lnk2, - } + c = ds.makeShardValue(lnk) } ds.children[i] = c @@ -383,10 +406,20 @@ func (ds *Shard) getValue(ctx context.Context, hv *hashBits, key string, cb func // EnumLinks collects all links in the Shard. func (ds *Shard) EnumLinks(ctx context.Context) ([]*ipld.Link, error) { var links []*ipld.Link - err := ds.ForEachLink(ctx, func(l *ipld.Link) error { - links = append(links, l) + var setlk sync.Mutex + + getLinks := makeAsyncTrieGetLinks(ds.dserv, func(sv *shardValue) error { + lnk := sv.val + lnk.Name = sv.key + setlk.Lock() + links = append(links, lnk) + setlk.Unlock() return nil }) + + cset := cid.NewSet() + + err := dag.EnumerateChildrenAsync(ctx, getLinks, ds.nd.Cid(), cset.Visit) return links, err } @@ -400,6 +433,44 @@ func (ds *Shard) ForEachLink(ctx context.Context, f func(*ipld.Link) error) erro }) } +// makeAsyncTrieGetLinks builds a getLinks function that can be used with EnumerateChildrenAsync +// to iterate a HAMT shard. It takes an IPLD Dag Service to fetch nodes, and a call back that will get called +// on all links to leaf nodes in a HAMT tree, so they can be collected for an EnumLinks operation +func makeAsyncTrieGetLinks(dagService ipld.DAGService, onShardValue func(*shardValue) error) dag.GetLinks { + + return func(ctx context.Context, currentCid cid.Cid) ([]*ipld.Link, error) { + node, err := dagService.Get(ctx, currentCid) + if err != nil { + return nil, err + } + directoryShard, err := NewHamtFromDag(dagService, node) + if err != nil { + return nil, err + } + + childShards := make([]*ipld.Link, 0, len(directoryShard.children)) + links := directoryShard.nd.Links() + for idx := range directoryShard.children { + lnk := links[idx] + lnkLinkType, err := directoryShard.childLinkType(lnk) + + if err != nil { + return nil, err + } + if lnkLinkType == shardLink { + childShards = append(childShards, lnk) + } else { + sv := directoryShard.makeShardValue(lnk) + err := onShardValue(sv) + if err != nil { + return nil, err + } + } + } + return childShards, nil + } +} + func (ds *Shard) walkTrie(ctx context.Context, cb func(*shardValue) error) error { for idx := range ds.children { c, err := ds.getChild(ctx, idx) diff --git a/hamt/hamt_test.go b/hamt/hamt_test.go index e56d9363c..ffbb676eb 100644 --- a/hamt/hamt_test.go +++ b/hamt/hamt_test.go @@ -11,6 +11,7 @@ import ( dag "github.com/ipfs/go-merkledag" mdtest "github.com/ipfs/go-merkledag/test" + ft "github.com/ipfs/go-unixfs" ipld "github.com/ipfs/go-ipld-format" @@ -100,6 +101,8 @@ func assertSerializationWorks(ds ipld.DAGService, s *Shard) error { return fmt.Errorf("links arrays are different sizes") } + sort.Stable(dag.LinkSlice(linksA)) + sort.Stable(dag.LinkSlice(linksB)) for i, a := range linksA { b := linksB[i] if a.Name != b.Name { @@ -280,14 +283,17 @@ func TestSetAfterMarshal(t *testing.T) { t.Fatal(err) } - empty := ft.EmptyDirNode() for i := 0; i < 100; i++ { + empty := ft.EmptyDirNode() err := nds.Set(ctx, fmt.Sprintf("moredirs%d", i), empty) if err != nil { t.Fatal(err) } } + nd, err = nds.Node() + nds, err = NewHamtFromDag(ds, nd) + links, err := nds.EnumLinks(ctx) if err != nil { t.Fatal(err) @@ -319,6 +325,9 @@ func TestDuplicateAddShard(t *testing.T) { t.Fatal(err) } + node, err := dir.Node() + dir, err = NewHamtFromDag(ds, node) + lnks, err := dir.EnumLinks(ctx) if err != nil { t.Fatal(err) @@ -411,6 +420,9 @@ func TestRemoveElemsAfterMarshal(t *testing.T) { } } + nd, err = nds.Node() + nds, err = NewHamtFromDag(ds, nd) + links, err := nds.EnumLinks(ctx) if err != nil { t.Fatal(err)