diff --git a/core/coreunix/add.go b/core/coreunix/add.go index b7841808d3f..c0c247a0e15 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -148,10 +148,10 @@ func (adder *Adder) add(reader io.Reader) (ipld.Node, error) { } if adder.Trickle { - return trickle.TrickleLayout(params.New(chnk)) + return trickle.Layout(params.New(chnk)) } - return balanced.BalancedLayout(params.New(chnk)) + return balanced.Layout(params.New(chnk)) } // RootNode returns the root node of the Added. diff --git a/importer/balanced/balanced_test.go b/importer/balanced/balanced_test.go index 8a9cee75eae..c787cce1888 100644 --- a/importer/balanced/balanced_test.go +++ b/importer/balanced/balanced_test.go @@ -27,7 +27,7 @@ func buildTestDag(ds ipld.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error Maxlinks: h.DefaultLinksPerBlock, } - nd, err := BalancedLayout(dbp.New(spl)) + nd, err := Layout(dbp.New(spl)) if err != nil { return nil, err } diff --git a/importer/balanced/builder.go b/importer/balanced/builder.go index 63d725a7354..c15c56c62de 100644 --- a/importer/balanced/builder.go +++ b/importer/balanced/builder.go @@ -1,3 +1,18 @@ +// Package balanced provides methods to build balanced DAGs. +// In a balanced DAG, nodes are added to a single root +// until the maximum number of links is reached (with leaves +// being at depth 0). Then, a new root is created that points to the +// old root and incorporates a new child, which in turn is +// filled up with links to more leaves. In all cases, the Data (chunks) +// is stored only at the leaves, with the rest of the nodes only +// storing links to their children. +// +// In a balanced DAG, nodes fill their link capacity before +// creating new ones, thus depth only increases when the +// current tree is completely full. +// +// Balanced DAGs are general-purpose DAGs in which all leaves +// are at the same distance from the root. package balanced import ( @@ -8,8 +23,11 @@ import ( ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" ) -func BalancedLayout(db *h.DagBuilderHelper) (ipld.Node, error) { - var offset uint64 = 0 +// Layout builds a balanced DAG. Data is stored at the leaves +// and depth only increases when the tree is full, that is, when +// the root node has reached the maximum number of links. +func Layout(db *h.DagBuilderHelper) (ipld.Node, error) { + var offset uint64 var root *h.UnixfsNode for level := 0; !db.Done(); level++ { diff --git a/importer/chunk/parse.go b/importer/chunk/parse.go index f4cc5629080..7d511c21700 100644 --- a/importer/chunk/parse.go +++ b/importer/chunk/parse.go @@ -8,6 +8,9 @@ import ( "strings" ) +// FromString returns a Splitter depending on the given string: +// it supports "default" (""), "size-{size}", "rabin", "rabin-{blocksize}" and +// "rabin-{min}-{avg}-{max}". func FromString(r io.Reader, chunker string) (Splitter, error) { switch { case chunker == "" || chunker == "default": diff --git a/importer/chunk/rabin.go b/importer/chunk/rabin.go index d2d71460d34..c3d1ebdba0a 100644 --- a/importer/chunk/rabin.go +++ b/importer/chunk/rabin.go @@ -7,13 +7,18 @@ import ( "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/chunker" ) +// IpfsRabinPoly is the irreducible polynomial of degree 53 used for Rabin +// fingerprinting. var IpfsRabinPoly = chunker.Pol(17437180132763653) +// Rabin implements the Splitter interface and splits content with Rabin +// fingerprints.
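+// +// A minimal usage sketch (an assumption for illustration: data is some +// byte slice, and NextBytes is expected to return io.EOF once the +// input is exhausted): +// +//   r := NewRabin(bytes.NewReader(data), uint64(DefaultBlockSize)) +//   for { +//       chunk, err := r.NextBytes() +//       if err == io.EOF { +//           break +//       } +//       // each chunk ends at a content-defined boundary +//   }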
type Rabin struct { r *chunker.Chunker reader io.Reader } +// NewRabin creates a new Rabin splitter with the given +// average block size. func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin { min := avgBlkSize / 3 max := avgBlkSize + (avgBlkSize / 2) @@ -21,6 +26,8 @@ func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin { return NewRabinMinMax(r, min, avgBlkSize, max) } +// NewRabinMinMax returns a new Rabin splitter which uses +// the given min, average and max block sizes. func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin { h := fnv.New32a() ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max) @@ -31,6 +38,7 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin { } } +// NextBytes reads the next bytes from the reader and returns them as a slice. func (r *Rabin) NextBytes() ([]byte, error) { ch, err := r.r.Next() if err != nil { @@ -40,6 +48,7 @@ func (r *Rabin) NextBytes() ([]byte, error) { return ch.Data, nil } +// Reader returns the io.Reader associated with this Splitter. func (r *Rabin) Reader() io.Reader { return r.reader } diff --git a/importer/chunk/rabin_test.go b/importer/chunk/rabin_test.go index 1d5702e3834..2f68f01c440 100644 --- a/importer/chunk/rabin_test.go +++ b/importer/chunk/rabin_test.go @@ -68,7 +68,7 @@ func TestRabinChunkReuse(t *testing.T) { ch2 := chunkData(t, data) var extra int - for k, _ := range ch2 { + for k := range ch2 { _, ok := ch1[k] if !ok { extra++ diff --git a/importer/chunk/splitting.go b/importer/chunk/splitting.go index ddd71e969ca..9586df85024 100644 --- a/importer/chunk/splitting.go +++ b/importer/chunk/splitting.go @@ -1,4 +1,7 @@ -// package chunk implements streaming block splitters +// Package chunk implements streaming block splitters. +// Splitters read data from a reader and provide byte slices (chunks). +// The size and contents of these slices depend on the splitting method +// used. package chunk import ( @@ -10,25 +13,34 @@ import ( var log = logging.Logger("chunk") +// DefaultBlockSize is the chunk size that splitters produce (or aim to produce). var DefaultBlockSize int64 = 1024 * 256 +// A Splitter reads bytes from a Reader and creates "chunks" (byte slices) +// that can be used to build DAG nodes. type Splitter interface { Reader() io.Reader NextBytes() ([]byte, error) } +// SplitterGen is a splitter generator, given a reader. type SplitterGen func(r io.Reader) Splitter +// DefaultSplitter returns a SizeSplitter with the DefaultBlockSize. func DefaultSplitter(r io.Reader) Splitter { return NewSizeSplitter(r, DefaultBlockSize) } +// SizeSplitterGen returns a SplitterGen function which will create +// a splitter with the given size when called. func SizeSplitterGen(size int64) SplitterGen { return func(r io.Reader) Splitter { return NewSizeSplitter(r, size) } } +// Chan returns a channel that receives each of the chunks produced +// by a splitter, along with another one for errors. func Chan(s Splitter) (<-chan []byte, <-chan error) { out := make(chan []byte) errs := make(chan error, 1) @@ -56,6 +68,7 @@ type sizeSplitterv2 struct { err error } +// NewSizeSplitter returns a new size-based Splitter with the given block size. func NewSizeSplitter(r io.Reader, size int64) Splitter { return &sizeSplitterv2{ r: r, @@ -63,6 +76,7 @@ func NewSizeSplitter(r io.Reader, size int64) Splitter { } } +// NextBytes produces a new chunk.
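+// A sketch of the intended consumer loop (rd is a hypothetical io.Reader, +// and the conventional io.EOF sentinel is assumed once rd is drained): +// +//   spl := NewSizeSplitter(rd, DefaultBlockSize) +//   for { +//       b, err := spl.NextBytes() +//       if err == io.EOF { +//           break +//       } +//       if err != nil { +//           return err +//       } +//       // b holds at most DefaultBlockSize bytes +//   }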
func (ss *sizeSplitterv2) NextBytes() ([]byte, error) { if ss.err != nil { return nil, ss.err } @@ -85,6 +99,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) { } } +// Reader returns the io.Reader associated with this Splitter. func (ss *sizeSplitterv2) Reader() io.Reader { return ss.r } diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go index 7d8375b0ad4..ad36bfcefd9 100644 --- a/importer/helpers/dagbuilder.go +++ b/importer/helpers/dagbuilder.go @@ -29,6 +29,8 @@ type DagBuilderHelper struct { prefix *cid.Prefix } +// DagBuilderParams wraps configuration options to create a DagBuilderHelper +// from a chunk.Splitter. type DagBuilderParams struct { // Maximum number of links per intermediate node Maxlinks int @@ -48,8 +50,8 @@ type DagBuilderParams struct { NoCopy bool } -// Generate a new DagBuilderHelper from the given params, which data source comes -// from chunks object +// New generates a new DagBuilderHelper from the given params and a given +// chunk.Splitter as data source. func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper { db := &DagBuilderHelper{ dserv: dbp.Dagserv, @@ -94,16 +96,15 @@ func (db *DagBuilderHelper) Done() bool { // Next returns the next chunk of data to be inserted into the dag // if it returns nil, that signifies that the stream is at an end, and -// that the current building operation should finish +// that the current building operation should finish. func (db *DagBuilderHelper) Next() ([]byte, error) { db.prepareNext() // idempotent d := db.nextData db.nextData = nil // signal we've consumed it if db.recvdErr != nil { return nil, db.recvdErr - } else { - return d, nil } + return d, nil } // GetDagServ returns the dagservice object this Helper is using @@ -132,8 +133,7 @@ func (db *DagBuilderHelper) newUnixfsBlock() *UnixfsNode { } // FillNodeLayer will add datanodes as children to the given node until -// at most db.indirSize ndoes are added - +// at most db.indirSize nodes are added. func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error { // while we have room AND we're not done @@ -151,6 +151,9 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error { return nil } +// GetNextDataNode builds a UnixfsNode with the data obtained from the +// Splitter, given the constraints (BlockSizeLimit, RawLeaves) specified +// when creating the DagBuilderHelper. func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) { data, err := db.Next() if err != nil { @@ -171,29 +174,31 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) { rawnode: dag.NewRawNode(data), raw: true, }, nil - } else { - rawnode, err := dag.NewRawNodeWPrefix(data, *db.prefix) - if err != nil { - return nil, err - } - return &UnixfsNode{ - rawnode: rawnode, - raw: true, - }, nil } - } else { - blk := db.newUnixfsBlock() - blk.SetData(data) - return blk, nil + rawnode, err := dag.NewRawNodeWPrefix(data, *db.prefix) + if err != nil { + return nil, err + } + return &UnixfsNode{ + rawnode: rawnode, + raw: true, + }, nil } + + blk := db.newUnixfsBlock() + blk.SetData(data) + return blk, nil } +// SetPosInfo sets the offset information of a node using the fullpath and stat +// from the DagBuilderHelper. func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) { if db.fullPath != "" { node.SetPosInfo(offset, db.fullPath, db.stat) } } +// Add sends a node to the DAGService and returns it.
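+// Layout implementations typically build a tree of UnixfsNodes and then +// persist the root through this helper, e.g. (a sketch using only names +// from this package; error handling elided): +// +//   root := db.NewUnixfsNode() +//   _ = db.FillNodeLayer(root) +//   nd, _ := db.Add(root) +//   _ = db.Close()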
func (db *DagBuilderHelper) Add(node *UnixfsNode) (ipld.Node, error) { dn, err := node.GetDagNode() if err != nil { @@ -208,10 +213,15 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (ipld.Node, error) { return dn, nil } +// Maxlinks returns the configured maximum number of links +// for nodes built with this helper. func (db *DagBuilderHelper) Maxlinks() int { return db.maxlinks } +// Close has the DAGService perform a batch Commit operation. +// It should be called at the end of the building process to make +// sure all data is persisted. func (db *DagBuilderHelper) Close() error { return db.batch.Commit() } diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index 651a9449bc5..28630c822c6 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -70,7 +70,8 @@ func (n *UnixfsNode) NumChildren() int { return n.ufmt.NumChildren() } -// Set replaces this UnixfsNode with another UnixfsNode +// Set replaces the current UnixfsNode with another one. It performs +// a shallow copy. func (n *UnixfsNode) Set(other *UnixfsNode) { n.node = other.node n.raw = other.raw @@ -97,7 +98,7 @@ func (n *UnixfsNode) GetChild(ctx context.Context, i int, ds ipld.DAGService) (* // AddChild adds the given UnixfsNode as a child of the receiver. // The passed in DagBuilderHelper is used to store the child node and -// pin it locally so it doesnt get lost +// pin it locally so it doesn't get lost. func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error { n.ufmt.AddBlockSize(child.FileSize()) @@ -118,16 +119,20 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error { return err } -// RemoveChild removes the child node at the given index +// RemoveChild deletes the child node at the given index. func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) { n.ufmt.RemoveBlockSize(index) n.node.SetLinks(append(n.node.Links()[:index], n.node.Links()[index+1:]...)) } +// SetData stores data in this node. func (n *UnixfsNode) SetData(data []byte) { n.ufmt.Data = data } +// FileSize returns the total file size of this tree (including children). +// In the case of raw nodes, it returns the length of the +// raw data. func (n *UnixfsNode) FileSize() uint64 { if n.raw { return uint64(len(n.rawnode.RawData())) @@ -135,6 +140,8 @@ func (n *UnixfsNode) FileSize() uint64 { return n.ufmt.FileSize() } +// SetPosInfo sets information about the offset of the data of this node in a +// filesystem file. func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo) { n.posInfo = &pi.PosInfo{ Offset: offset, @@ -144,7 +151,7 @@ func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo } // GetDagNode fills out the proper formatting for the unixfs node -// inside of a DAG node and returns the dag node +// inside of a DAG node and returns the dag node.
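+// For raw leaves (nodes built with the raw flag in GetNextDataNode), the +// wrapped raw node is presumably returned unchanged, since raw leaves carry +// no unixfs envelope.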
func (n *UnixfsNode) GetDagNode() (ipld.Node, error) { nd, err := n.getBaseDagNode() if err != nil { diff --git a/importer/importer.go b/importer/importer.go index 4358ee39c93..7e53eb77603 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -6,17 +6,17 @@ import ( "fmt" "os" + "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit/files" + ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" + bal "github.com/ipfs/go-ipfs/importer/balanced" "github.com/ipfs/go-ipfs/importer/chunk" h "github.com/ipfs/go-ipfs/importer/helpers" trickle "github.com/ipfs/go-ipfs/importer/trickle" - "gx/ipfs/QmceUdzxkimdYsgtX733uNgzf1DLHyBKN6ehGSp85ayppM/go-ipfs-cmdkit/files" - - ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" ) // BuildDagFromFile builds a DAG from the given file, writing created blocks to -// disk as they are created +// disk as they are created. func BuildDagFromFile(fpath string, ds ipld.DAGService) (ipld.Node, error) { stat, err := os.Lstat(fpath) if err != nil { @@ -36,23 +36,24 @@ func BuildDagFromFile(fpath string, ds ipld.DAGService) (ipld.Node, error) { return BuildDagFromReader(ds, chunk.DefaultSplitter(f)) } -// BuildDagFromReader builds a DAG from the chunks returned by the given chunk -// splitter. +// BuildDagFromReader creates a DAG given a DAGService and a Splitter +// implementation (Splitters wrap an io.Reader), using a balanced layout. func BuildDagFromReader(ds ipld.DAGService, spl chunk.Splitter) (ipld.Node, error) { dbp := h.DagBuilderParams{ Dagserv: ds, Maxlinks: h.DefaultLinksPerBlock, } - return bal.BalancedLayout(dbp.New(spl)) + return bal.Layout(dbp.New(spl)) } -// BuildTrickleDagFromReader is similar to BuildDagFromReader but uses the trickle layout. +// BuildTrickleDagFromReader creates a DAG given a DAGService and a Splitter +// implementation (Splitters wrap an io.Reader), using a trickle layout.
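+// +// A minimal sketch of both entry points (dserv is a hypothetical +// ipld.DAGService and rd a hypothetical io.Reader): +// +//   nd, err := BuildDagFromReader(dserv, chunk.DefaultSplitter(rd)) +//   tnd, err := BuildTrickleDagFromReader(dserv, chunk.DefaultSplitter(rd))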
func BuildTrickleDagFromReader(ds ipld.DAGService, spl chunk.Splitter) (ipld.Node, error) { dbp := h.DagBuilderParams{ Dagserv: ds, Maxlinks: h.DefaultLinksPerBlock, } - return trickle.TrickleLayout(dbp.New(spl)) + return trickle.Layout(dbp.New(spl)) } diff --git a/importer/trickle/trickle_test.go b/importer/trickle/trickle_test.go index 3791afe5c8d..618bf69ebfc 100644 --- a/importer/trickle/trickle_test.go +++ b/importer/trickle/trickle_test.go @@ -39,7 +39,7 @@ func buildTestDag(ds ipld.DAGService, spl chunk.Splitter, rawLeaves UseRawLeaves RawLeaves: bool(rawLeaves), } - nd, err := TrickleLayout(dbp.New(spl)) + nd, err := Layout(dbp.New(spl)) if err != nil { return nil, err } @@ -503,7 +503,7 @@ func testAppend(t *testing.T, rawLeaves UseRawLeaves) { r := bytes.NewReader(should[nbytes/2:]) ctx := context.Background() - nnode, err := TrickleAppend(ctx, nd, dbp.New(chunk.NewSizeSplitter(r, 500))) + nnode, err := Append(ctx, nd, dbp.New(chunk.NewSizeSplitter(r, 500))) if err != nil { t.Fatal(err) } @@ -564,7 +564,7 @@ func testMultipleAppends(t *testing.T, rawLeaves UseRawLeaves) { ctx := context.Background() for i := 0; i < len(should); i++ { - nnode, err := TrickleAppend(ctx, nd, dbp.New(spl(bytes.NewReader(should[i:i+1])))) + nnode, err := Append(ctx, nd, dbp.New(spl(bytes.NewReader(should[i:i+1])))) if err != nil { t.Fatal(err) } @@ -612,12 +612,12 @@ func TestAppendSingleBytesToEmpty(t *testing.T) { spl := chunk.SizeSplitterGen(500) ctx := context.Background() - nnode, err := TrickleAppend(ctx, nd, dbp.New(spl(bytes.NewReader(data[:1])))) + nnode, err := Append(ctx, nd, dbp.New(spl(bytes.NewReader(data[:1])))) if err != nil { t.Fatal(err) } - nnode, err = TrickleAppend(ctx, nnode, dbp.New(spl(bytes.NewReader(data[1:])))) + nnode, err = Append(ctx, nnode, dbp.New(spl(bytes.NewReader(data[1:])))) if err != nil { t.Fatal(err) } diff --git a/importer/trickle/trickledag.go b/importer/trickle/trickledag.go index ff5d28f663f..1c7c1c95f1d 100644 --- a/importer/trickle/trickledag.go +++ b/importer/trickle/trickledag.go @@ -1,3 +1,18 @@ +// Package trickle provides methods to build trickle DAGs. +// In this type of DAG, non-leaf nodes are first filled +// with data leaves, and then incorporate "layers" of subtrees +// as additional links. +// +// Each layer is a trickle sub-tree and is limited by an increasing +// maximum depth. Thus, a node's first layer +// can only hold leaves (depth 1), but subsequent layers can grow deeper. +// By default, this module places 4 nodes per layer (that is, 4 subtrees +// of the same maximum depth before increasing it). +// +// Trickle DAGs are very good for sequentially reading data, as the +// first data leaves are directly reachable from the root and those +// coming next are always nearby. They are well +// suited for things like streaming applications. package trickle import ( @@ -18,7 +33,10 @@ import ( // improves seek speeds. const layerRepeat = 4 -func TrickleLayout(db *h.DagBuilderHelper) (ipld.Node, error) { +// Layout builds a new DAG with the trickle format using the provided +// DagBuilderHelper. See the module's description for a more detailed +// explanation.
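+// +// A sketch of direct use, mirroring the updated call sites in the tests +// (dbp is a h.DagBuilderParams and spl a chunk.Splitter): +// +//   nd, err := Layout(dbp.New(spl))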
+func Layout(db *h.DagBuilderHelper) (ipld.Node, error) { root := db.NewUnixfsNode() if err := db.FillNodeLayer(root); err != nil { return nil, err } @@ -68,17 +86,17 @@ func fillTrickleRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error return nil } -// TrickleAppend appends the data in `db` to the dag, using the Trickledag format -func TrickleAppend(ctx context.Context, basen ipld.Node, db *h.DagBuilderHelper) (out ipld.Node, err_out error) { +// Append appends the data in `db` to the dag, using the trickle format. +func Append(ctx context.Context, basen ipld.Node, db *h.DagBuilderHelper) (out ipld.Node, errOut error) { base, ok := basen.(*dag.ProtoNode) if !ok { return nil, dag.ErrNotProtobuf } defer func() { - if err_out == nil { + if errOut == nil { if err := db.Close(); err != nil { - err_out = err + errOut = err } } }() @@ -148,7 +166,7 @@ func appendFillLastChild(ctx context.Context, ufsn *h.UnixfsNode, depth int, lay } // Fill out last child (may not be full tree) - nchild, err := trickleAppendRec(ctx, lastChild, db, depth-1) + nchild, err := appendRec(ctx, lastChild, db, depth-1) if err != nil { return err } @@ -179,8 +197,8 @@ func appendFillLastChild(ctx context.Context, ufsn *h.UnixfsNode, depth int, lay return nil } -// recursive call for TrickleAppend -func trickleAppendRec(ctx context.Context, ufsn *h.UnixfsNode, db *h.DagBuilderHelper, depth int) (*h.UnixfsNode, error) { +// recursive call for Append +func appendRec(ctx context.Context, ufsn *h.UnixfsNode, db *h.DagBuilderHelper, depth int) (*h.UnixfsNode, error) { if depth == 0 || db.Done() { return ufsn, nil } @@ -337,7 +355,7 @@ func verifyTDagRec(n ipld.Node, depth int, p VerifyParams) error { // Recursive trickle dags rdepth := ((i - p.Direct) / p.LayerRepeat) + 1 if rdepth >= depth && depth > 0 { - return errors.New("Child dag was too deep!") + return errors.New("child dag was too deep") } err := verifyTDagRec(child, rdepth, p) if err != nil { diff --git a/unixfs/format.go b/unixfs/format.go index 4157ec0e570..26d4b0cc3b7 100644 --- a/unixfs/format.go +++ b/unixfs/format.go @@ -174,6 +174,8 @@ func (n *FSNode) GetBytes() ([]byte, error) { return proto.Marshal(pbn) } +// FileSize returns the total size of this tree. That is, the size of +// the data in this node plus the size of all its children. func (n *FSNode) FileSize() uint64 { return uint64(len(n.Data)) + n.subtotal } diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index ff2d51f2523..8bf9382ddf4 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -362,7 +362,7 @@ func (dm *DagModifier) appendData(nd ipld.Node, spl chunk.Splitter) (ipld.Node, Prefix: &dm.Prefix, RawLeaves: dm.RawLeaves, } - return trickle.TrickleAppend(dm.ctx, nd, dbp.New(spl)) + return trickle.Append(dm.ctx, nd, dbp.New(spl)) default: return nil, ErrNotUnixfs } diff --git a/unixfs/test/utils.go b/unixfs/test/utils.go index a50c4ecbea6..5e1977ddb23 100644 --- a/unixfs/test/utils.go +++ b/unixfs/test/utils.go @@ -63,7 +63,7 @@ func GetNode(t testing.TB, dserv ipld.DAGService, data []byte, opts NodeOpts) ip RawLeaves: opts.RawLeavesUsed, } - node, err := trickle.TrickleLayout(dbp.New(SizeSplitterGen(500)(in))) + node, err := trickle.Layout(dbp.New(SizeSplitterGen(500)(in))) if err != nil { t.Fatal(err) }
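A usage sketch for the renamed trickle.Append, mirroring the updated test call sites above (ctx, nd, dbp and r are values of the types used in those tests):

    nnode, err := trickle.Append(ctx, nd, dbp.New(chunk.NewSizeSplitter(r, 500)))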