Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass cids instead of nodes around in EnumerateChildrenAsync #3598

Merged
merged 2 commits into from
Feb 14, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 36 additions & 41 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (n *dagService) Remove(nd node.Node) error {

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
return EnumerateChildren(ctx, serv, c, cid.NewSet().Visit, false)
return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
}

// FindLinks searches this nodes links for the given key,
Expand Down Expand Up @@ -394,56 +394,51 @@ func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit
var FetchGraphConcurrency = 8

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
if !visit(c) {
return nil
}

root, err := ds.Get(ctx, c)
if err != nil {
return err
}

feed := make(chan node.Node)
out := make(chan *NodeOption)
feed := make(chan *cid.Cid)
out := make(chan node.Node)
done := make(chan struct{})

var setlk sync.Mutex


errChan := make(chan error)
fetchersCtx, cancel := context.WithCancel(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put a defer cancel() after this call and then remove the other call youre making to cancel


defer cancel()

for i := 0; i < FetchGraphConcurrency; i++ {
go func() {
for n := range feed {
links := n.Links()
cids := make([]*cid.Cid, 0, len(links))
for _, l := range links {
setlk.Lock()
unseen := visit(l.Cid)
setlk.Unlock()
if unseen {
cids = append(cids, l.Cid)
}
for ic := range feed {
n, err := ds.Get(ctx, ic)
if err != nil {
errChan <- err
return
}

for nopt := range ds.GetMany(ctx, cids) {

setlk.Lock()
unseen := visit(ic)
setlk.Unlock()

if unseen {
select {
case out <- nopt:
case <-ctx.Done():
case out <- n:
case <-fetchersCtx.Done():
return
}
}
select {
case done <- struct{}{}:
case <-ctx.Done():
case <-fetchersCtx.Done():
}
}
}()
}
defer close(feed)

send := feed
var todobuffer []node.Node
var todobuffer []*cid.Cid
var inProgress int

next := root
next := c
for {
select {
case send <- next:
Expand All @@ -460,18 +455,18 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
if inProgress == 0 && next == nil {
return nil
}
case nc := <-out:
if nc.Err != nil {
return nc.Err
}

if next == nil {
next = nc.Node
send = feed
} else {
todobuffer = append(todobuffer, nc.Node)
case nd := <-out:
for _, lnk := range nd.Links() {
if next == nil {
next = lnk.Cid
send = feed
} else {
todobuffer = append(todobuffer, lnk.Cid)
}
}

case err := <-errChan:
return err

case <-ctx.Done():
return ctx.Err()
}
Expand Down