From 7b6d8a16fd139e5733c0ea9048f007e5baa0f781 Mon Sep 17 00:00:00 2001 From: rht Date: Sat, 3 Oct 2015 13:59:50 +0700 Subject: [PATCH] Move parts of `ipfs add` into core/coreunix License: MIT Signed-off-by: rht --- core/commands/add.go | 297 ++------------------------------------ core/commands/tar.go | 7 +- core/coreunix/add.go | 332 ++++++++++++++++++++++++++++++++++++++----- 3 files changed, 311 insertions(+), 325 deletions(-) diff --git a/core/commands/add.go b/core/commands/add.go index 51f529c17a2..8a350564748 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -3,34 +3,19 @@ package commands import ( "fmt" "io" - "path" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb" - ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" - cxt "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/ipfs/go-ipfs/core/coreunix" - bstore "github.com/ipfs/go-ipfs/blocks/blockstore" - bserv "github.com/ipfs/go-ipfs/blockservice" cmds "github.com/ipfs/go-ipfs/commands" files "github.com/ipfs/go-ipfs/commands/files" core "github.com/ipfs/go-ipfs/core" - offline "github.com/ipfs/go-ipfs/exchange/offline" - importer "github.com/ipfs/go-ipfs/importer" - "github.com/ipfs/go-ipfs/importer/chunk" - dag "github.com/ipfs/go-ipfs/merkledag" - dagutils "github.com/ipfs/go-ipfs/merkledag/utils" - pin "github.com/ipfs/go-ipfs/pin" - ft "github.com/ipfs/go-ipfs/unixfs" u "github.com/ipfs/go-ipfs/util" ) // Error indicating the max depth has been exceded. var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded") -// how many bytes of progress to wait before sending a progress update message -const progressReaderIncrement = 1024 * 256 - const ( quietOptionName = "quiet" progressOptionName = "progress" @@ -41,12 +26,6 @@ const ( chunkerOptionName = "chunker" ) -type AddedObject struct { - Name string - Hash string `json:",omitempty"` - Bytes int64 `json:",omitempty"` -} - var AddCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Add an object to ipfs.", @@ -108,7 +87,6 @@ remains to be implemented. hidden, _, _ := req.Option(hiddenOptionName).Bool() chunker, _, _ := req.Option(chunkerOptionName).String() - e := dagutils.NewDagEditor(NewMemoryDagService(), newDirNode()) if hash { nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{ //TODO: need this to be true or all files @@ -125,17 +103,12 @@ remains to be implemented. outChan := make(chan interface{}, 8) res.SetOutput((<-chan interface{})(outChan)) - fileAdder := adder{ - ctx: req.Context(), - node: n, - editor: e, - out: outChan, - chunker: chunker, - progress: progress, - hidden: hidden, - trickle: trickle, - wrap: wrap, - } + fileAdder := coreunix.NewAdder(req.Context(), n, outChan) + fileAdder.Chunker = chunker + fileAdder.Progress = progress + fileAdder.Hidden = hidden + fileAdder.Trickle = trickle + fileAdder.Wrap = wrap // addAllFiles loops over a convenience slice file to // add each file individually. e.g. 'ipfs add a b c' @@ -149,22 +122,12 @@ remains to be implemented. return nil // done } - if _, err := fileAdder.addFile(file); err != nil { + if _, err := fileAdder.AddFile(file); err != nil { return err } } } - pinRoot := func(rootnd *dag.Node) error { - rnk, err := rootnd.Key() - if err != nil { - return err - } - - n.Pinning.PinWithMode(rnk, pin.Recursive) - return n.Pinning.Flush() - } - addAllAndPin := func(f files.File) error { if err := addAllFiles(f); err != nil { return err @@ -172,19 +135,14 @@ remains to be implemented. if !hash { // copy intermediary nodes from editor to our actual dagservice - err := e.WriteOutputTo(n.DAG) + err := fileAdder.WriteOutputTo(n.DAG) if err != nil { log.Error("WRITE OUT: ", err) return err } } - rootnd, err := fileAdder.RootNode() - if err != nil { - return err - } - - return pinRoot(rootnd) + return fileAdder.PinRoot() } go func() { @@ -243,7 +201,7 @@ remains to be implemented. var totalProgress, prevFiles, lastBytes int64 for out := range outChan { - output := out.(*AddedObject) + output := out.(*coreunix.AddedObject) if len(output.Hash) > 0 { if showProgressBar { // clear progress bar line before we print "added x" output @@ -279,236 +237,5 @@ remains to be implemented. } } }, - Type: AddedObject{}, -} - -func NewMemoryDagService() dag.DAGService { - // build mem-datastore for editor's intermediary nodes - bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) - bsrv := bserv.New(bs, offline.Exchange(bs)) - return dag.NewDAGService(bsrv) -} - -// Internal structure for holding the switches passed to the `add` call -type adder struct { - ctx cxt.Context - node *core.IpfsNode - editor *dagutils.Editor - out chan interface{} - progress bool - hidden bool - trickle bool - wrap bool - chunker string - - nextUntitled int -} - -// Perform the actual add & pin locally, outputting results to reader -func add(n *core.IpfsNode, reader io.Reader, useTrickle bool, chunker string) (*dag.Node, error) { - chnk, err := chunk.FromString(reader, chunker) - if err != nil { - return nil, err - } - - var node *dag.Node - if useTrickle { - node, err = importer.BuildTrickleDagFromReader( - n.DAG, - chnk, - ) - } else { - node, err = importer.BuildDagFromReader( - n.DAG, - chnk, - ) - } - - if err != nil { - return nil, err - } - - return node, nil -} - -func (params *adder) RootNode() (*dag.Node, error) { - r := params.editor.GetNode() - - // if not wrapping, AND one root file, use that hash as root. - if !params.wrap && len(r.Links) == 1 { - var err error - r, err = r.Links[0].GetNode(params.ctx, params.editor.GetDagService()) - // no need to output, as we've already done so. - return r, err - } - - // otherwise need to output, as we have not. - err := outputDagnode(params.out, "", r) - return r, err -} - -func (params *adder) addNode(node *dag.Node, path string) error { - // patch it into the root - if path == "" { - key, err := node.Key() - if err != nil { - return err - } - - path = key.Pretty() - } - - if err := params.editor.InsertNodeAtPath(params.ctx, path, node, newDirNode); err != nil { - return err - } - - return outputDagnode(params.out, path, node) -} - -// Add the given file while respecting the params. -func (params *adder) addFile(file files.File) (*dag.Node, error) { - // Check if file is hidden - if fileIsHidden := files.IsHidden(file); fileIsHidden && !params.hidden { - log.Debugf("%s is hidden, skipping", file.FileName()) - return nil, &hiddenFileError{file.FileName()} - } - - // Check if "file" is actually a directory - if file.IsDirectory() { - return params.addDir(file) - } - - if s, ok := file.(*files.Symlink); ok { - sdata, err := ft.SymlinkData(s.Target) - if err != nil { - return nil, err - } - - dagnode := &dag.Node{Data: sdata} - _, err = params.node.DAG.Add(dagnode) - if err != nil { - return nil, err - } - - err = params.addNode(dagnode, s.FileName()) - return dagnode, err - } - - // if the progress flag was specified, wrap the file so that we can send - // progress updates to the client (over the output channel) - var reader io.Reader = file - if params.progress { - reader = &progressReader{file: file, out: params.out} - } - - dagnode, err := add(params.node, reader, params.trickle, params.chunker) - if err != nil { - return nil, err - } - - // patch it into the root - log.Infof("adding file: %s", file.FileName()) - err = params.addNode(dagnode, file.FileName()) - return dagnode, err -} - -func (params *adder) addDir(file files.File) (*dag.Node, error) { - tree := &dag.Node{Data: ft.FolderPBData()} - log.Infof("adding directory: %s", file.FileName()) - - for { - file, err := file.NextFile() - if err != nil && err != io.EOF { - return nil, err - } - if file == nil { - break - } - - node, err := params.addFile(file) - if _, ok := err.(*hiddenFileError); ok { - // hidden file error, set the node to nil for below - node = nil - } else if err != nil { - return nil, err - } - - if node != nil { - _, name := path.Split(file.FileName()) - - err = tree.AddNodeLink(name, node) - if err != nil { - return nil, err - } - } - } - - if err := params.addNode(tree, file.FileName()); err != nil { - return nil, err - } - - _, err := params.node.DAG.Add(tree) - if err != nil { - return nil, err - } - - return tree, nil -} - -// outputDagnode sends dagnode info over the output channel -func outputDagnode(out chan interface{}, name string, dn *dag.Node) error { - o, err := getOutput(dn) - if err != nil { - return err - } - - out <- &AddedObject{ - Hash: o.Hash, - Name: name, - } - - return nil -} - -type hiddenFileError struct { - fileName string -} - -func (e *hiddenFileError) Error() string { - return fmt.Sprintf("%s is a hidden file", e.fileName) -} - -type ignoreFileError struct { - fileName string -} - -func (e *ignoreFileError) Error() string { - return fmt.Sprintf("%s is an ignored file", e.fileName) -} - -type progressReader struct { - file files.File - out chan interface{} - bytes int64 - lastProgress int64 -} - -func (i *progressReader) Read(p []byte) (int, error) { - n, err := i.file.Read(p) - - i.bytes += int64(n) - if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF { - i.lastProgress = i.bytes - i.out <- &AddedObject{ - Name: i.file.FileName(), - Bytes: i.bytes, - } - } - - return n, err -} - -// TODO: generalize this to more than unix-fs nodes. -func newDirNode() *dag.Node { - return &dag.Node{Data: ft.FolderPBData()} + Type: coreunix.AddedObject{}, } diff --git a/core/commands/tar.go b/core/commands/tar.go index 0d6fc1318fa..53eaca12586 100644 --- a/core/commands/tar.go +++ b/core/commands/tar.go @@ -6,6 +6,7 @@ import ( cmds "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/coreunix" path "github.com/ipfs/go-ipfs/path" tar "github.com/ipfs/go-ipfs/tar" ) @@ -58,15 +59,15 @@ var tarAddCmd = &cmds.Command{ } fi.FileName() - res.SetOutput(&AddedObject{ + res.SetOutput(&coreunix.AddedObject{ Name: fi.FileName(), Hash: k.B58String(), }) }, - Type: AddedObject{}, + Type: coreunix.AddedObject{}, Marshalers: cmds.MarshalerMap{ cmds.Text: func(res cmds.Response) (io.Reader, error) { - o := res.Output().(*AddedObject) + o := res.Output().(*coreunix.AddedObject) return strings.NewReader(o.Hash), nil }, }, diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 8c762cb3f19..27f9ed620a6 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -1,17 +1,25 @@ package coreunix import ( + "fmt" "io" "io/ioutil" "os" gopath "path" + ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + bstore "github.com/ipfs/go-ipfs/blocks/blockstore" + bserv "github.com/ipfs/go-ipfs/blockservice" + "github.com/ipfs/go-ipfs/exchange/offline" + importer "github.com/ipfs/go-ipfs/importer" + "github.com/ipfs/go-ipfs/importer/chunk" + dagutils "github.com/ipfs/go-ipfs/merkledag/utils" + "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/commands/files" core "github.com/ipfs/go-ipfs/core" - importer "github.com/ipfs/go-ipfs/importer" - chunk "github.com/ipfs/go-ipfs/importer/chunk" merkledag "github.com/ipfs/go-ipfs/merkledag" "github.com/ipfs/go-ipfs/thirdparty/eventlog" unixfs "github.com/ipfs/go-ipfs/unixfs" @@ -19,22 +27,146 @@ import ( var log = eventlog.Logger("coreunix") +// how many bytes of progress to wait before sending a progress update message +const progressReaderIncrement = 1024 * 256 + +type Link struct { + Name, Hash string + Size uint64 +} + +type Object struct { + Hash string + Links []Link +} + +type hiddenFileError struct { + fileName string +} + +func (e *hiddenFileError) Error() string { + return fmt.Sprintf("%s is a hidden file", e.fileName) +} + +type ignoreFileError struct { + fileName string +} + +func (e *ignoreFileError) Error() string { + return fmt.Sprintf("%s is an ignored file", e.fileName) +} + +type AddedObject struct { + Name string + Hash string `json:",omitempty"` + Bytes int64 `json:",omitempty"` +} + +func NewAdder(ctx context.Context, n *core.IpfsNode, out chan interface{}) *Adder { + e := dagutils.NewDagEditor(NewMemoryDagService(), newDirNode()) + return &Adder{ + ctx: ctx, + node: n, + editor: e, + out: out, + Progress: false, + Hidden: true, + Pin: true, + Trickle: false, + Wrap: false, + Chunker: "", + } +} + +// Internal structure for holding the switches passed to the `add` call +type Adder struct { + ctx context.Context + node *core.IpfsNode + editor *dagutils.Editor + out chan interface{} + Progress bool + Hidden bool + Pin bool + Trickle bool + Wrap bool + Chunker string + root *merkledag.Node +} + +// Perform the actual add & pin locally, outputting results to reader +func (params Adder) add(reader io.Reader) (*merkledag.Node, error) { + chnk, err := chunk.FromString(reader, params.Chunker) + if err != nil { + return nil, err + } + + if params.Trickle { + return importer.BuildTrickleDagFromReader( + params.node.DAG, + chnk, + ) + } + return importer.BuildDagFromReader( + params.node.DAG, + chnk, + ) +} + +func (params *Adder) RootNode() (*merkledag.Node, error) { + // for memoizing + if params.root != nil { + return params.root, nil + } + + root := params.editor.GetNode() + + // if not wrapping, AND one root file, use that hash as root. + if !params.Wrap && len(root.Links) == 1 { + var err error + root, err = root.Links[0].GetNode(params.ctx, params.editor.GetDagService()) + params.root = root + // no need to output, as we've already done so. + return root, err + } + + // otherwise need to output, as we have not. + err := outputDagnode(params.out, "", root) + params.root = root + return root, err +} + +func (params *Adder) PinRoot() error { + root, err := params.RootNode() + if err != nil { + return err + } + + rnk, err := root.Key() + if err != nil { + return err + } + + params.node.Pinning.PinWithMode(rnk, pin.Recursive) + return params.node.Pinning.Flush() +} + +func (params *Adder) WriteOutputTo(DAG merkledag.DAGService) error { + return params.editor.WriteOutputTo(DAG) +} + // Add builds a merkledag from the a reader, pinning all objects to the local // datastore. Returns a key representing the root node. func Add(n *core.IpfsNode, r io.Reader) (string, error) { unlock := n.Blockstore.PinLock() defer unlock() - // TODO more attractive function signature importer.BuildDagFromReader + fileAdder := NewAdder(n.Context(), n, nil) - dagNode, err := importer.BuildDagFromReader( - n.DAG, - chunk.NewSizeSplitter(r, chunk.DefaultBlockSize), - ) + node, err := fileAdder.add(r) if err != nil { return "", err } - k, err := dagNode.Key() + k, err := node.Key() if err != nil { return "", err } @@ -58,7 +190,9 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { } defer f.Close() - dagnode, err := addFile(n, f) + fileAdder := NewAdder(n.Context(), n, nil) + + dagnode, err := fileAdder.AddFile(f) if err != nil { return "", err } @@ -78,10 +212,11 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *merkledag.Node, error) { file := files.NewReaderFile(filename, filename, ioutil.NopCloser(r), nil) dir := files.NewSliceFile("", "", []files.File{file}) + fileAdder := NewAdder(n.Context(), n, nil) unlock := n.Blockstore.PinLock() defer unlock() - dagnode, err := addDir(n, dir) + dagnode, err := fileAdder.addDir(dir) if err != nil { return "", nil, err } @@ -92,46 +227,88 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *merkle return gopath.Join(k.String(), filename), dagnode, nil } -func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) { - return importer.BuildDagFromReader( - n.DAG, - chunk.DefaultSplitter(reader), - ) -} +func (params *Adder) addNode(node *merkledag.Node, path string) error { + // patch it into the root + if path == "" { + key, err := node.Key() + if err != nil { + return err + } + + path = key.Pretty() + } -func addNode(n *core.IpfsNode, node *merkledag.Node) error { - if err := n.DAG.AddRecursive(node); err != nil { // add the file to the graph + local storage + if err := params.editor.InsertNodeAtPath(params.ctx, path, node, newDirNode); err != nil { return err } - ctx, cancel := context.WithCancel(n.Context()) - defer cancel() - err := n.Pinning.Pin(ctx, node, true) // ensure we keep it - return err + + return outputDagnode(params.out, path, node) } -func addFile(n *core.IpfsNode, file files.File) (*merkledag.Node, error) { - if file.IsDirectory() { - return addDir(n, file) +// Add the given file while respecting the params. +func (params *Adder) AddFile(file files.File) (*merkledag.Node, error) { + switch { + case files.IsHidden(file) && !params.Hidden: + log.Debugf("%s is hidden, skipping", file.FileName()) + return nil, &hiddenFileError{file.FileName()} + case file.IsDirectory(): + return params.addDir(file) } - return add(n, file) -} -func addDir(n *core.IpfsNode, dir files.File) (*merkledag.Node, error) { + // case for symlink + if s, ok := file.(*files.Symlink); ok { + sdata, err := unixfs.SymlinkData(s.Target) + if err != nil { + return nil, err + } - tree := &merkledag.Node{Data: unixfs.FolderPBData()} + dagnode := &merkledag.Node{Data: sdata} + _, err = params.node.DAG.Add(dagnode) + if err != nil { + return nil, err + } + + err = params.addNode(dagnode, s.FileName()) + return dagnode, err + } + + // case for regular file + // if the progress flag was specified, wrap the file so that we can send + // progress updates to the client (over the output channel) + var reader io.Reader = file + if params.Progress { + reader = &progressReader{file: file, out: params.out} + } + + dagnode, err := params.add(reader) + if err != nil { + return nil, err + } + + // patch it into the root + log.Infof("adding file: %s", file.FileName()) + err = params.addNode(dagnode, file.FileName()) + return dagnode, err +} + +func (params *Adder) addDir(dir files.File) (*merkledag.Node, error) { + tree := newDirNode() + log.Infof("adding directory: %s", dir.FileName()) -Loop: for { file, err := dir.NextFile() - switch { - case err != nil && err != io.EOF: + if err != nil && err != io.EOF { return nil, err - case err == io.EOF: - break Loop + } + if file == nil { + break } - node, err := addFile(n, file) - if err != nil { + node, err := params.AddFile(file) + if _, ok := err.(*hiddenFileError); ok { + // hidden file error, skip file + continue + } else if err != nil { return nil, err } @@ -142,8 +319,89 @@ Loop: } } - if err := addNode(n, tree); err != nil { + if err := params.addNode(tree, dir.FileName()); err != nil { + return nil, err + } + + if _, err := params.node.DAG.Add(tree); err != nil { return nil, err } + return tree, nil } + +// outputDagnode sends dagnode info over the output channel +func outputDagnode(out chan interface{}, name string, dn *merkledag.Node) error { + if out == nil { + return nil + } + + o, err := getOutput(dn) + if err != nil { + return err + } + + out <- &AddedObject{ + Hash: o.Hash, + Name: name, + } + + return nil +} + +func NewMemoryDagService() merkledag.DAGService { + // build mem-datastore for editor's intermediary nodes + bs := bstore.NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) + bsrv := bserv.New(bs, offline.Exchange(bs)) + return merkledag.NewDAGService(bsrv) +} + +// TODO: generalize this to more than unix-fs nodes. +func newDirNode() *merkledag.Node { + return &merkledag.Node{Data: unixfs.FolderPBData()} +} + +// from core/commands/object.go +func getOutput(dagnode *merkledag.Node) (*Object, error) { + key, err := dagnode.Key() + if err != nil { + return nil, err + } + + output := &Object{ + Hash: key.Pretty(), + Links: make([]Link, len(dagnode.Links)), + } + + for i, link := range dagnode.Links { + output.Links[i] = Link{ + Name: link.Name, + Hash: link.Hash.B58String(), + Size: link.Size, + } + } + + return output, nil +} + +type progressReader struct { + file files.File + out chan interface{} + bytes int64 + lastProgress int64 +} + +func (i *progressReader) Read(p []byte) (int, error) { + n, err := i.file.Read(p) + + i.bytes += int64(n) + if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF { + i.lastProgress = i.bytes + i.out <- &AddedObject{ + Name: i.file.FileName(), + Bytes: i.bytes, + } + } + + return n, err +}