
Commit 8e6519b
Merge pull request #4640 from ipfs/doc/godoc-importer
Docs: golint-ify "importers" module
whyrusleeping authored Feb 3, 2018
2 parents cb24cfa + b382191 commit 8e6519b
Showing 15 changed files with 139 additions and 56 deletions.
4 changes: 2 additions & 2 deletions core/coreunix/add.go
@@ -148,10 +148,10 @@ func (adder *Adder) add(reader io.Reader) (ipld.Node, error) {
}

if adder.Trickle {
return trickle.TrickleLayout(params.New(chnk))
return trickle.Layout(params.New(chnk))
}

return balanced.BalancedLayout(params.New(chnk))
return balanced.Layout(params.New(chnk))
}

// RootNode returns the root node of the Adder.
2 changes: 1 addition & 1 deletion importer/balanced/balanced_test.go
@@ -27,7 +27,7 @@ func buildTestDag(ds ipld.DAGService, spl chunk.Splitter) (*dag.ProtoNode, error
Maxlinks: h.DefaultLinksPerBlock,
}

nd, err := BalancedLayout(dbp.New(spl))
nd, err := Layout(dbp.New(spl))
if err != nil {
return nil, err
}
22 changes: 20 additions & 2 deletions importer/balanced/builder.go
@@ -1,3 +1,18 @@
// Package balanced provides methods to build balanced DAGs.
// In a balanced DAG, nodes are added to a single root
// until the maximum number of links is reached (with leaves
// being at depth 0). Then a new root is created, pointing to the
// old root and incorporating a new child, which is in turn
// filled with links to more leaves. In all cases, the data (chunks)
// is stored only at the leaves, with the rest of the nodes
// storing only links to their children.
//
// In a balanced DAG, nodes fill their link capacity before
// new ones are created, so depth only increases when the
// current tree is completely full.
//
// Balanced DAGs are general-purpose DAGs in which all leaves
// sit at the same distance from the root.
package balanced

import (
@@ -8,8 +23,11 @@ import (
ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format"
)

func BalancedLayout(db *h.DagBuilderHelper) (ipld.Node, error) {
var offset uint64 = 0
// Layout builds a balanced DAG. Data is stored at the leaves
// and depth only increases when the tree is full, that is, when
// the root node has reached the maximum number of links.
func Layout(db *h.DagBuilderHelper) (ipld.Node, error) {
var offset uint64
var root *h.UnixfsNode
for level := 0; !db.Done(); level++ {

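To see how callers drive the renamed function, here is a minimal sketch modeled on the add.go and balanced_test.go hunks above. The package name, buildBalanced, and the plain import paths are illustrative assumptions; in-tree the imports are gx-rewritten.

package example

import (
	"strings"

	"github.com/ipfs/go-ipfs/importer/balanced"
	"github.com/ipfs/go-ipfs/importer/chunk"
	h "github.com/ipfs/go-ipfs/importer/helpers"

	ipld "github.com/ipfs/go-ipld-format"
)

// buildBalanced chunks the input with the default splitter and lays
// the chunks out as a balanced DAG, returning the root node.
func buildBalanced(ds ipld.DAGService, data string) (ipld.Node, error) {
	dbp := h.DagBuilderParams{
		Dagserv:  ds,
		Maxlinks: h.DefaultLinksPerBlock,
	}
	spl := chunk.DefaultSplitter(strings.NewReader(data))
	return balanced.Layout(dbp.New(spl))
}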
3 changes: 3 additions & 0 deletions importer/chunk/parse.go
@@ -8,6 +8,9 @@ import (
"strings"
)

// FromString returns a Splitter depending on the given string:
// it supports "default" (""), "size-{size}", "rabin", "rabin-{blocksize}" and
// "rabin-{min}-{avg}-{max}".
func FromString(r io.Reader, chunker string) (Splitter, error) {
switch {
case chunker == "" || chunker == "default":
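A usage sketch of FromString, sharing the imports of the sketch above plus "io"; the wrapper function and the chosen chunker string are illustrative only.

// selectSplitter picks a fixed-size splitter producing 256 KiB chunks;
// "rabin" or a "rabin-{min}-{avg}-{max}" string would select
// content-defined chunking instead.
func selectSplitter(r io.Reader) (chunk.Splitter, error) {
	return chunk.FromString(r, "size-262144")
}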
9 changes: 9 additions & 0 deletions importer/chunk/rabin.go
@@ -7,20 +7,27 @@ import (
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/chunker"
)

// IpfsRabinPoly is the irreducible polynomial of degree 53 used for Rabin fingerprinting.
var IpfsRabinPoly = chunker.Pol(17437180132763653)

// Rabin implements the Splitter interface and splits content with Rabin
// fingerprints.
type Rabin struct {
r *chunker.Chunker
reader io.Reader
}

// NewRabin creates a new Rabin splitter with the given
// average block size.
func NewRabin(r io.Reader, avgBlkSize uint64) *Rabin {
min := avgBlkSize / 3
max := avgBlkSize + (avgBlkSize / 2)

return NewRabinMinMax(r, min, avgBlkSize, max)
}

// NewRabinMinMax returns a new Rabin splitter which uses
// the given min, average and max block sizes.
func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
h := fnv.New32a()
ch := chunker.New(r, IpfsRabinPoly, h, avg, min, max)
@@ -31,6 +38,7 @@ func NewRabinMinMax(r io.Reader, min, avg, max uint64) *Rabin {
}
}

// NextBytes reads the next bytes from the reader and returns a slice.
func (r *Rabin) NextBytes() ([]byte, error) {
ch, err := r.r.Next()
if err != nil {
@@ -40,6 +48,7 @@ func (r *Rabin) NextBytes() ([]byte, error) {
return ch.Data, nil
}

// Reader returns the io.Reader associated with this Splitter.
func (r *Rabin) Reader() io.Reader {
return r.reader
}
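As a worked example of the defaults encoded in NewRabin above (the wrapper function is illustrative):

// rabinDefaults expands chunk.NewRabin's derivation for a 256 KiB
// average block size.
func rabinDefaults(r io.Reader) chunk.Splitter {
	avg := uint64(256 * 1024) // 262144 bytes
	min := avg / 3            // 87381 bytes (integer division)
	max := avg + (avg / 2)    // 393216 bytes
	// Equivalent to chunk.NewRabin(r, avg).
	return chunk.NewRabinMinMax(r, min, avg, max)
}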
2 changes: 1 addition & 1 deletion importer/chunk/rabin_test.go
@@ -68,7 +68,7 @@ func TestRabinChunkReuse(t *testing.T) {
ch2 := chunkData(t, data)

var extra int
for k, _ := range ch2 {
for k := range ch2 {
_, ok := ch1[k]
if !ok {
extra++
17 changes: 16 additions & 1 deletion importer/chunk/splitting.go
@@ -1,4 +1,7 @@
// package chunk implements streaming block splitters
// Package chunk implements streaming block splitters.
// Splitters read data from a reader and provide byte slices (chunks).
// The size and contents of these slices depend on the splitting method
// used.
package chunk

import (
@@ -10,25 +13,34 @@ import (

var log = logging.Logger("chunk")

// DefaultBlockSize is the chunk size that splitters produce (or aim to).
var DefaultBlockSize int64 = 1024 * 256

// A Splitter reads bytes from a Reader and creates "chunks" (byte slices)
// that can be used to build DAG nodes.
type Splitter interface {
Reader() io.Reader
NextBytes() ([]byte, error)
}

// SplitterGen is a splitter generator, given a reader.
type SplitterGen func(r io.Reader) Splitter

// DefaultSplitter returns a SizeSplitter with the DefaultBlockSize.
func DefaultSplitter(r io.Reader) Splitter {
return NewSizeSplitter(r, DefaultBlockSize)
}

// SizeSplitterGen returns a SplitterGen function which will create
// a splitter with the given size when called.
func SizeSplitterGen(size int64) SplitterGen {
return func(r io.Reader) Splitter {
return NewSizeSplitter(r, size)
}
}

// Chan returns a channel that receives each of the chunks produced
// by a splitter, along with another one for errors.
func Chan(s Splitter) (<-chan []byte, <-chan error) {
out := make(chan []byte)
errs := make(chan error, 1)
@@ -56,13 +68,15 @@ type sizeSplitterv2 struct {
err error
}

// NewSizeSplitter returns a new size-based Splitter with the given block size.
func NewSizeSplitter(r io.Reader, size int64) Splitter {
return &sizeSplitterv2{
r: r,
size: uint32(size),
}
}

// NextBytes produces a new chunk.
func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
if ss.err != nil {
return nil, ss.err
@@ -85,6 +99,7 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {
}
}

// Reader returns the io.Reader associated with this Splitter.
func (ss *sizeSplitterv2) Reader() io.Reader {
return ss.r
}
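A sketch of consuming any Splitter by hand; it assumes NextBytes signals end of input with io.EOF, which these hunks do not show explicitly.

// drain reads every chunk a Splitter produces and returns the total
// byte count.
func drain(s chunk.Splitter) (int, error) {
	total := 0
	for {
		b, err := s.NextBytes()
		if err == io.EOF { // assumed end-of-input sentinel
			return total, nil
		}
		if err != nil {
			return total, err
		}
		total += len(b)
	}
}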
50 changes: 30 additions & 20 deletions importer/helpers/dagbuilder.go
@@ -29,6 +29,8 @@ type DagBuilderHelper struct {
prefix *cid.Prefix
}

// DagBuilderParams wraps configuration options to create a DagBuilderHelper
// from a chunk.Splitter.
type DagBuilderParams struct {
// Maximum number of links per intermediate node
Maxlinks int
@@ -48,8 +50,8 @@ type DagBuilderParams struct {
NoCopy bool
}

// Generate a new DagBuilderHelper from the given params, which data source comes
// from chunks object
// New generates a new DagBuilderHelper from the given params and a given
// chunk.Splitter as data source.
func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
db := &DagBuilderHelper{
dserv: dbp.Dagserv,
@@ -94,16 +96,15 @@ func (db *DagBuilderHelper) Done() bool {

// Next returns the next chunk of data to be inserted into the dag
// if it returns nil, that signifies that the stream is at an end, and
// that the current building operation should finish
// that the current building operation should finish.
func (db *DagBuilderHelper) Next() ([]byte, error) {
db.prepareNext() // idempotent
d := db.nextData
db.nextData = nil // signal we've consumed it
if db.recvdErr != nil {
return nil, db.recvdErr
} else {
return d, nil
}
return d, nil
}

// GetDagServ returns the dagservice object this Helper is using
@@ -132,8 +133,7 @@ func (db *DagBuilderHelper) newUnixfsBlock() *UnixfsNode {
}

// FillNodeLayer will add datanodes as children to the given node until
// at most db.indirSize ndoes are added
//
// at most db.indirSize nodes are added.
func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {

// while we have room AND we're not done
@@ -151,6 +151,9 @@ func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
return nil
}

// GetNextDataNode builds a UnixFsNode with the data obtained from the
// Splitter, given the constraints (BlockSizeLimit, RawLeaves) specified
// when creating the DagBuilderHelper.
func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
data, err := db.Next()
if err != nil {
@@ -171,29 +174,31 @@ func (db *DagBuilderHelper) GetNextDataNode() (*UnixfsNode, error) {
rawnode: dag.NewRawNode(data),
raw: true,
}, nil
} else {
rawnode, err := dag.NewRawNodeWPrefix(data, *db.prefix)
if err != nil {
return nil, err
}
return &UnixfsNode{
rawnode: rawnode,
raw: true,
}, nil
}
} else {
blk := db.newUnixfsBlock()
blk.SetData(data)
return blk, nil
rawnode, err := dag.NewRawNodeWPrefix(data, *db.prefix)
if err != nil {
return nil, err
}
return &UnixfsNode{
rawnode: rawnode,
raw: true,
}, nil
}

blk := db.newUnixfsBlock()
blk.SetData(data)
return blk, nil
}

// SetPosInfo sets the offset information of a node using the fullpath and stat
// from the DagBuilderHelper.
func (db *DagBuilderHelper) SetPosInfo(node *UnixfsNode, offset uint64) {
if db.fullPath != "" {
node.SetPosInfo(offset, db.fullPath, db.stat)
}
}

// Add sends a node to the DAGService, and returns it.
func (db *DagBuilderHelper) Add(node *UnixfsNode) (ipld.Node, error) {
dn, err := node.GetDagNode()
if err != nil {
@@ -208,10 +213,15 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (ipld.Node, error) {
return dn, nil
}

// Maxlinks returns the configured maximum number of links
// for nodes built with this helper.
func (db *DagBuilderHelper) Maxlinks() int {
return db.maxlinks
}

// Close has the DAGService perform a batch Commit operation.
// It should be called at the end of the building process to make
// sure all data is persisted.
func (db *DagBuilderHelper) Close() error {
return db.batch.Commit()
}
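Pulling the dagbuilder.go pieces together, a builder's lifecycle looks roughly like this. It is a flat sketch (no parent links are created), not a substitute for the balanced or trickle layouts.

// ingest drains the helper's splitter into leaf nodes, sends each to
// the DAGService, and commits the batch.
func ingest(db *h.DagBuilderHelper) error {
	for !db.Done() {
		leaf, err := db.GetNextDataNode()
		if err != nil {
			return err
		}
		if _, err := db.Add(leaf); err != nil {
			return err
		}
	}
	// Flush the batched writes (see Close above).
	return db.Close()
}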
15 changes: 11 additions & 4 deletions importer/helpers/helpers.go
@@ -70,7 +70,8 @@ func (n *UnixfsNode) NumChildren() int {
return n.ufmt.NumChildren()
}

// Set replaces this UnixfsNode with another UnixfsNode
// Set replaces the current UnixfsNode with another one. It performs
// a shallow copy.
func (n *UnixfsNode) Set(other *UnixfsNode) {
n.node = other.node
n.raw = other.raw
@@ -97,7 +98,7 @@ func (n *UnixfsNode) GetChild(ctx context.Context, i int, ds ipld.DAGService) (*

// AddChild adds the given UnixfsNode as a child of the receiver.
// The passed-in DagBuilderHelper is used to store the child node and
// pin it locally so it doesnt get lost
// pin it locally so it doesn't get lost.
func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
n.ufmt.AddBlockSize(child.FileSize())

@@ -118,23 +119,29 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
return err
}

// RemoveChild removes the child node at the given index
// RemoveChild deletes the child node at the given index.
func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) {
n.ufmt.RemoveBlockSize(index)
n.node.SetLinks(append(n.node.Links()[:index], n.node.Links()[index+1:]...))
}

// SetData stores data in this node.
func (n *UnixfsNode) SetData(data []byte) {
n.ufmt.Data = data
}

// FileSize returns the total file size of this tree (including children).
// In the case of raw nodes, it returns the length of the
// raw data.
func (n *UnixfsNode) FileSize() uint64 {
if n.raw {
return uint64(len(n.rawnode.RawData()))
}
return n.ufmt.FileSize()
}

// SetPosInfo sets information about the offset of the data of this node in a
// filesystem file.
func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo) {
n.posInfo = &pi.PosInfo{
Offset: offset,
@@ -144,7 +151,7 @@ func (n *UnixfsNode) SetPosInfo(offset uint64, fullPath string, stat os.FileInfo
}

// GetDagNode fills out the proper formatting for the unixfs node
// inside of a DAG node and returns the dag node
// inside of a DAG node and returns the dag node.
func (n *UnixfsNode) GetDagNode() (ipld.Node, error) {
nd, err := n.getBaseDagNode()
if err != nil {
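To illustrate the bookkeeping these helpers.go hunks describe: AddChild both links the child and records its block size, so the parent's FileSize covers the whole subtree. The wrapper below is a sketch under that reading, sharing the imports of the first sketch.

// linkChild attaches child to parent via the helper and reports the
// parent's resulting subtree size.
func linkChild(db *h.DagBuilderHelper, parent, child *h.UnixfsNode) (uint64, error) {
	if err := parent.AddChild(child, db); err != nil {
		return 0, err
	}
	return parent.FileSize(), nil
}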
(The remaining 6 changed files are not shown.)
