diff --git a/core/commands/add.go b/core/commands/add.go index be15b302476..db4e21e6a00 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb" - context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" cmds "github.com/ipfs/go-ipfs/commands" files "github.com/ipfs/go-ipfs/commands/files" @@ -16,6 +15,7 @@ import ( importer "github.com/ipfs/go-ipfs/importer" "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" + pin "github.com/ipfs/go-ipfs/pin" ft "github.com/ipfs/go-ipfs/unixfs" u "github.com/ipfs/go-ipfs/util" ) @@ -113,12 +113,16 @@ remains to be implemented. return } - err = n.Pinning.Pin(context.Background(), rootnd, true) + rnk, err := rootnd.Key() if err != nil { res.SetError(err, cmds.ErrNormal) return } + mp := n.Pinning.GetManual() + mp.RemovePinWithMode(rnk, pin.Indirect) + mp.PinWithMode(rnk, pin.Recursive) + err = n.Pinning.Flush() if err != nil { res.SetError(err, cmds.ErrNormal) @@ -214,7 +218,12 @@ remains to be implemented. } func add(n *core.IpfsNode, reader io.Reader) (*dag.Node, error) { - node, err := importer.BuildDagFromReader(reader, n.DAG, nil, chunk.DefaultSplitter) + node, err := importer.BuildDagFromReader( + reader, + n.DAG, + chunk.DefaultSplitter, + importer.PinIndirectCB(n.Pinning.GetManual()), + ) if err != nil { return nil, err } @@ -290,11 +299,13 @@ func addDir(n *core.IpfsNode, dir files.File, out chan interface{}, progress boo return nil, err } - _, err = n.DAG.Add(tree) + k, err := n.DAG.Add(tree) if err != nil { return nil, err } + n.Pinning.GetManual().PinWithMode(k, pin.Indirect) + return tree, nil } diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index efc35c445f1..4fc84dbc508 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -72,7 +72,7 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.Node, error) { // TODO(cryptix): change and remove this helper once PR1136 is merged // return ufs.AddFromReader(i.node, r.Body) return importer.BuildDagFromReader( - r, i.node.DAG, i.node.Pinning.GetManual(), chunk.DefaultSplitter) + r, i.node.DAG, chunk.DefaultSplitter, importer.BasicPinnerCB(i.node.Pinning.GetManual())) } // TODO(btc): break this apart into separate handlers using a more expressive muxer diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 69acbe320e3..c1b9586c7a7 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -1,7 +1,6 @@ package coreunix import ( - "errors" "io" "io/ioutil" "os" @@ -29,15 +28,12 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) { dagNode, err := importer.BuildDagFromReader( r, n.DAG, - n.Pinning.GetManual(), // Fix this interface chunk.DefaultSplitter, + importer.BasicPinnerCB(n.Pinning.GetManual()), ) if err != nil { return "", err } - if err := n.Pinning.Flush(); err != nil { - return "", err - } k, err := dagNode.Key() if err != nil { return "", err @@ -53,18 +49,28 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { return "", err } defer f.Close() + ff, err := files.NewSerialFile(root, f) if err != nil { return "", err } + dagnode, err := addFile(n, ff) if err != nil { return "", err } + k, err := dagnode.Key() if err != nil { return "", err } + + n.Pinning.GetManual().RemovePinWithMode(k, pin.Indirect) + err = n.Pinning.Flush() + if err != nil { + return "", err + } + return k.String(), nil } @@ -87,17 +93,14 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *merkle } func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) { - mp, ok := n.Pinning.(pin.ManualPinner) - if !ok { - return nil, errors.New("invalid pinner type! expected manual pinner") - } - - node, err := importer.BuildDagFromReader(reader, n.DAG, mp, chunk.DefaultSplitter) - if err != nil { - return nil, err - } + mp := n.Pinning.GetManual() - err = n.Pinning.Flush() + node, err := importer.BuildDagFromReader( + reader, + n.DAG, + chunk.DefaultSplitter, + importer.PinIndirectCB(mp), + ) if err != nil { return nil, err } diff --git a/core/coreunix/metadata_test.go b/core/coreunix/metadata_test.go index e3a9e5441c4..f752bb4521c 100644 --- a/core/coreunix/metadata_test.go +++ b/core/coreunix/metadata_test.go @@ -37,7 +37,7 @@ func TestMetadata(t *testing.T) { data := make([]byte, 1000) u.NewTimeSeededRand().Read(data) r := bytes.NewReader(data) - nd, err := importer.BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter) + nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil) if err != nil { t.Fatal(err) } diff --git a/fuse/readonly/ipfs_test.go b/fuse/readonly/ipfs_test.go index 8a139fc69bb..2f476568117 100644 --- a/fuse/readonly/ipfs_test.go +++ b/fuse/readonly/ipfs_test.go @@ -34,7 +34,7 @@ func randObj(t *testing.T, nd *core.IpfsNode, size int64) (*dag.Node, []byte) { buf := make([]byte, size) u.NewTimeSeededRand().Read(buf) read := bytes.NewReader(buf) - obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, nil, chunk.DefaultSplitter) + obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, chunk.DefaultSplitter, nil) if err != nil { t.Fatal(err) } diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go index a5ca4e0a2e8..e6c7e721d31 100644 --- a/importer/helpers/dagbuilder.go +++ b/importer/helpers/dagbuilder.go @@ -5,6 +5,14 @@ import ( "github.com/ipfs/go-ipfs/pin" ) +// NodeCB is callback function for dag generation +// the `last` flag signifies whether or not this is the last +// (top-most root) node being added. useful for things like +// only pinning the first node recursively. +type NodeCB func(node *dag.Node, last bool) error + +var nilFunc NodeCB = func(_ *dag.Node, _ bool) error { return nil } + // DagBuilderHelper wraps together a bunch of objects needed to // efficiently create unixfs dag trees type DagBuilderHelper struct { @@ -13,6 +21,7 @@ type DagBuilderHelper struct { in <-chan []byte nextData []byte // the next item to return. maxlinks int + ncb NodeCB } type DagBuilderParams struct { @@ -22,18 +31,23 @@ type DagBuilderParams struct { // DAGService to write blocks to (required) Dagserv dag.DAGService - // Pinner to use for pinning files (optionally nil) - Pinner pin.ManualPinner + // Callback for each block added + NodeCB NodeCB } // Generate a new DagBuilderHelper from the given params, using 'in' as a // data source func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper { + ncb := dbp.NodeCB + if ncb == nil { + ncb = nilFunc + } + return &DagBuilderHelper{ dserv: dbp.Dagserv, - mp: dbp.Pinner, in: in, maxlinks: dbp.Maxlinks, + ncb: ncb, } } @@ -125,17 +139,15 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) { return nil, err } - key, err := db.dserv.Add(dn) + _, err = db.dserv.Add(dn) if err != nil { return nil, err } - if db.mp != nil { - db.mp.PinWithMode(key, pin.Recursive) - err := db.mp.Flush() - if err != nil { - return nil, err - } + // node callback + err = db.ncb(dn, true) + if err != nil { + return nil, err } return dn, nil diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index 96aafab2da9..e6a1a2012e0 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -107,14 +107,15 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error { return err } - childkey, err := db.dserv.Add(childnode) + _, err = db.dserv.Add(childnode) if err != nil { return err } // Pin the child node indirectly - if db.mp != nil { - db.mp.PinWithMode(childkey, pin.Indirect) + err = db.ncb(childnode, false) + if err != nil { + return err } return nil diff --git a/importer/importer.go b/importer/importer.go index 3c70c443374..f499b190a4f 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -13,10 +13,10 @@ import ( trickle "github.com/ipfs/go-ipfs/importer/trickle" dag "github.com/ipfs/go-ipfs/merkledag" "github.com/ipfs/go-ipfs/pin" - "github.com/ipfs/go-ipfs/util" + u "github.com/ipfs/go-ipfs/util" ) -var log = util.Logger("importer") +var log = u.Logger("importer") // Builds a DAG from the given file, writing created blocks to disk as they are // created @@ -36,31 +36,60 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da } defer f.Close() - return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter) + return BuildDagFromReader(f, ds, chunk.DefaultSplitter, BasicPinnerCB(mp)) } -func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) { +func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) { // Start the splitter blkch := spl.Split(r) dbp := h.DagBuilderParams{ Dagserv: ds, Maxlinks: h.DefaultLinksPerBlock, - Pinner: mp, + NodeCB: ncb, } return bal.BalancedLayout(dbp.New(blkch)) } -func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) { +func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) { // Start the splitter blkch := spl.Split(r) dbp := h.DagBuilderParams{ Dagserv: ds, Maxlinks: h.DefaultLinksPerBlock, - Pinner: mp, + NodeCB: ncb, } return trickle.TrickleLayout(dbp.New(blkch)) } + +func BasicPinnerCB(p pin.ManualPinner) h.NodeCB { + return func(n *dag.Node, last bool) error { + k, err := n.Key() + if err != nil { + return err + } + + if last { + p.PinWithMode(k, pin.Recursive) + return p.Flush() + } else { + p.PinWithMode(k, pin.Indirect) + return nil + } + } +} + +func PinIndirectCB(p pin.ManualPinner) h.NodeCB { + return func(n *dag.Node, last bool) error { + k, err := n.Key() + if err != nil { + return err + } + + p.PinWithMode(k, pin.Indirect) + return nil + } +} diff --git a/importer/importer_test.go b/importer/importer_test.go index f9e83f19dca..3641fb1b09b 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -17,7 +17,7 @@ import ( func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { ds := mdtest.Mock(t) r := io.LimitReader(u.NewTimeSeededRand(), size) - nd, err := BuildDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize}) + nd, err := BuildDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil) if err != nil { t.Fatal(err) } @@ -27,7 +27,7 @@ func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGSe func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { ds := mdtest.Mock(t) r := io.LimitReader(u.NewTimeSeededRand(), size) - nd, err := BuildTrickleDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize}) + nd, err := BuildTrickleDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil) if err != nil { t.Fatal(err) } @@ -40,7 +40,7 @@ func TestBalancedDag(t *testing.T) { u.NewTimeSeededRand().Read(buf) r := bytes.NewReader(buf) - nd, err := BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter) + nd, err := BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil) if err != nil { t.Fatal(err) } diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 07525b8911d..28b58f88303 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -156,7 +156,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) { spl := &chunk.SizeSplitter{512} - root, err := imp.BuildDagFromReader(read, dagservs[0], nil, spl) + root, err := imp.BuildDagFromReader(read, dagservs[0], spl, nil) if err != nil { t.Fatal(err) } diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index e0e09f71157..bba3139cb76 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -11,6 +11,7 @@ import ( mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + imp "github.com/ipfs/go-ipfs/importer" chunk "github.com/ipfs/go-ipfs/importer/chunk" help "github.com/ipfs/go-ipfs/importer/helpers" trickle "github.com/ipfs/go-ipfs/importer/trickle" @@ -308,7 +309,7 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No dbp := &help.DagBuilderParams{ Dagserv: dm.dagserv, Maxlinks: help.DefaultLinksPerBlock, - Pinner: dm.mp, + NodeCB: imp.BasicPinnerCB(dm.mp), } return trickle.TrickleAppend(node, dbp.New(blks)) diff --git a/unixfs/mod/dagmodifier_test.go b/unixfs/mod/dagmodifier_test.go index abc8268e361..3e2bea6cbf9 100644 --- a/unixfs/mod/dagmodifier_test.go +++ b/unixfs/mod/dagmodifier_test.go @@ -52,7 +52,7 @@ func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blocksto func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.ManualPinner) ([]byte, *mdag.Node) { in := io.LimitReader(u.NewTimeSeededRand(), size) - node, err := imp.BuildTrickleDagFromReader(in, dserv, pinner, &chunk.SizeSplitter{500}) + node, err := imp.BuildTrickleDagFromReader(in, dserv, &chunk.SizeSplitter{500}, imp.BasicPinnerCB(pinner)) if err != nil { t.Fatal(err) }