Skip to content

Commit

Permalink
Merge pull request #4296 from ipfs/feat/parallel-batch
Browse files Browse the repository at this point in the history
parallelize batch flushing
  • Loading branch information
whyrusleeping authored Oct 14, 2017
2 parents e7acb96 + 9de031b commit 9c06a0e
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 28 deletions.
99 changes: 99 additions & 0 deletions merkledag/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package merkledag

import (
"runtime"

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
)

// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(#4299): Experiment with multiple datastores, storage devices, and CPUs to find
// the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2

// Batch is a buffer for batching adds to a dag.
type Batch struct {
ds *dagService

activeCommits int
commitError error
commitResults chan error

blocks []blocks.Block
size int

MaxSize int
MaxBlocks int
}

func (t *Batch) processResults() {
for t.activeCommits > 0 && t.commitError == nil {
select {
case err := <-t.commitResults:
t.activeCommits--
if err != nil {
t.commitError = err
}
default:
return
}
}
}

func (t *Batch) asyncCommit() {
numBlocks := len(t.blocks)
if numBlocks == 0 || t.commitError != nil {
return
}
if t.activeCommits >= ParallelBatchCommits {
err := <-t.commitResults
t.activeCommits--

if err != nil {
t.commitError = err
return
}
}
go func(b []blocks.Block) {
_, err := t.ds.Blocks.AddBlocks(b)
t.commitResults <- err
}(t.blocks)

t.activeCommits++
t.blocks = make([]blocks.Block, 0, numBlocks)
t.size = 0

return
}

// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
// Not strictly necessary but allows us to catch errors early.
t.processResults()
if t.commitError != nil {
return nil, t.commitError
}

t.blocks = append(t.blocks, nd)
t.size += len(nd.RawData())
if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
t.asyncCommit()
}
return nd.Cid(), t.commitError
}

// Commit commits batched nodes.
func (t *Batch) Commit() error {
t.asyncCommit()
for t.activeCommits > 0 && t.commitError == nil {
err := <-t.commitResults
t.activeCommits--
if err != nil {
t.commitError = err
}
}

return t.commitError
}
31 changes: 3 additions & 28 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
ipldcbor "gx/ipfs/QmWCs8kMecJwCPK8JThue8TjgM2ieJ2HjTLDu7Cv2NEmZi/go-ipld-cbor"
)

Expand Down Expand Up @@ -75,8 +74,9 @@ func (n *dagService) Add(nd node.Node) (*cid.Cid, error) {

func (n *dagService) Batch() *Batch {
return &Batch{
ds: n,
MaxSize: 8 << 20,
ds: n,
commitResults: make(chan error, ParallelBatchCommits),
MaxSize: 8 << 20,

// By default, only batch up to 128 nodes at a time.
// The current implementation of flatfs opens this many file
Expand Down Expand Up @@ -389,31 +389,6 @@ func (np *nodePromise) Get(ctx context.Context) (node.Node, error) {
}
}

type Batch struct {
ds *dagService

blocks []blocks.Block
size int
MaxSize int
MaxBlocks int
}

func (t *Batch) Add(nd node.Node) (*cid.Cid, error) {
t.blocks = append(t.blocks, nd)
t.size += len(nd.RawData())
if t.size > t.MaxSize || len(t.blocks) > t.MaxBlocks {
return nd.Cid(), t.Commit()
}
return nd.Cid(), nil
}

func (t *Batch) Commit() error {
_, err := t.ds.Blocks.AddBlocks(t.blocks)
t.blocks = nil
t.size = 0
return err
}

type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

// EnumerateChildren will walk the dag below the given root node and add all
Expand Down

0 comments on commit 9c06a0e

Please sign in to comment.