Skip to content

Commit

Permalink
Pass cids instead of nodes around in EnumerateChildrenAsync
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Iaroslav Gridin <voker57@gmail.com>
  • Loading branch information
Voker57 committed Jan 19, 2017
1 parent a90c508 commit 88023a9
Showing 1 changed file with 27 additions and 35 deletions.
62 changes: 27 additions & 35 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,38 +394,21 @@ func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit
var FetchGraphConcurrency = 8

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
if !visit(c) {
return nil
}

root, err := ds.Get(ctx, c)
if err != nil {
return err
}

feed := make(chan node.Node)
out := make(chan *NodeOption)
feed := make(chan *cid.Cid)
out := make(chan *cid.Cid)
done := make(chan struct{})

var setlk sync.Mutex

for i := 0; i < FetchGraphConcurrency; i++ {
go func() {
for n := range feed {
links := n.Links()
cids := make([]*cid.Cid, 0, len(links))
for _, l := range links {
setlk.Lock()
unseen := visit(l.Cid)
setlk.Unlock()
if unseen {
cids = append(cids, l.Cid)
}
}

for nopt := range ds.GetMany(ctx, cids) {
for l := range feed {
setlk.Lock()
unseen := visit(l)
setlk.Unlock()
if unseen {
select {
case out <- nopt:
case out <- l:
case <-ctx.Done():
return
}
Expand All @@ -440,10 +423,10 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
defer close(feed)

send := feed
var todobuffer []node.Node
var todobuffer []*cid.Cid
var inProgress int

next := root
next := c
for {
select {
case send <- next:
Expand All @@ -460,16 +443,25 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
if inProgress == 0 && next == nil {
return nil
}
case nc := <-out:
if nc.Err != nil {
return nc.Err
case l := <-out:
n, err := ds.Get(ctx, l)
if err != nil {
return err
}

if next == nil {
next = nc.Node
send = feed
} else {
todobuffer = append(todobuffer, nc.Node)
links := n.Links()
if len(links) > 0 {
cids := make([]*cid.Cid, len(links), len(links))
for i, l := range links {
cids[i] = l.Cid
}
if next == nil {
next = cids[0]
todobuffer = append(todobuffer, cids[1:]...)
send = feed
} else {
todobuffer = append(todobuffer, cids...)
}
}

case <-ctx.Done():
Expand Down

0 comments on commit 88023a9

Please sign in to comment.