diff --git a/core/commands/add.go b/core/commands/add.go index be15b302476..5dffaff895e 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/cheggaaa/pb" - context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" cmds "github.com/ipfs/go-ipfs/commands" files "github.com/ipfs/go-ipfs/commands/files" @@ -16,6 +15,7 @@ import ( importer "github.com/ipfs/go-ipfs/importer" "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" + pin "github.com/ipfs/go-ipfs/pin" ft "github.com/ipfs/go-ipfs/unixfs" u "github.com/ipfs/go-ipfs/util" ) @@ -113,12 +113,16 @@ remains to be implemented. return } - err = n.Pinning.Pin(context.Background(), rootnd, true) + rnk, err := rootnd.Key() if err != nil { res.SetError(err, cmds.ErrNormal) return } + mp := n.Pinning.GetManual() + mp.RemovePinWithMode(rnk, pin.Indirect) + mp.PinWithMode(rnk, pin.Recursive) + err = n.Pinning.Flush() if err != nil { res.SetError(err, cmds.ErrNormal) @@ -214,7 +218,12 @@ remains to be implemented. } func add(n *core.IpfsNode, reader io.Reader) (*dag.Node, error) { - node, err := importer.BuildDagFromReader(reader, n.DAG, nil, chunk.DefaultSplitter) + node, err := importer.BuildDagFromReader( + reader, + n.DAG, + chunk.DefaultSplitter, + importer.PinIndirectCB(n.Pinning.GetManual()), + ) if err != nil { return nil, err } diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index efc35c445f1..4fc84dbc508 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -72,7 +72,7 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.Node, error) { // TODO(cryptix): change and remove this helper once PR1136 is merged // return ufs.AddFromReader(i.node, r.Body) return importer.BuildDagFromReader( - r, i.node.DAG, i.node.Pinning.GetManual(), chunk.DefaultSplitter) + r, i.node.DAG, chunk.DefaultSplitter, importer.BasicPinnerCB(i.node.Pinning.GetManual())) } // TODO(btc): break this apart into separate handlers using a more expressive muxer diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 69acbe320e3..07105bdb0f5 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -1,7 +1,6 @@ package coreunix import ( - "errors" "io" "io/ioutil" "os" @@ -18,6 +17,7 @@ import ( "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/thirdparty/eventlog" unixfs "github.com/ipfs/go-ipfs/unixfs" + u "github.com/ipfs/go-ipfs/util" ) var log = eventlog.Logger("coreunix") @@ -29,15 +29,12 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) { dagNode, err := importer.BuildDagFromReader( r, n.DAG, - n.Pinning.GetManual(), // Fix this interface chunk.DefaultSplitter, + importer.BasicPinnerCB(n.Pinning.GetManual()), ) if err != nil { return "", err } - if err := n.Pinning.Flush(); err != nil { - return "", err - } k, err := dagNode.Key() if err != nil { return "", err @@ -53,18 +50,28 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { return "", err } defer f.Close() + ff, err := files.NewSerialFile(root, f) if err != nil { return "", err } + dagnode, err := addFile(n, ff) if err != nil { return "", err } + k, err := dagnode.Key() if err != nil { return "", err } + + n.Pinning.GetManual().RemovePinWithMode(k, pin.Indirect) + err = n.Pinning.Flush() + if err != nil { + return "", err + } + return k.String(), nil } @@ -87,17 +94,17 @@ func AddWrapped(n *core.IpfsNode, r io.Reader, filename string) (string, *merkle } func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) { - mp, ok := n.Pinning.(pin.ManualPinner) - if !ok { - return nil, errors.New("invalid pinner type! expected manual pinner") - } - - node, err := importer.BuildDagFromReader(reader, n.DAG, mp, chunk.DefaultSplitter) - if err != nil { - return nil, err - } + mp := n.Pinning.GetManual() - err = n.Pinning.Flush() + node, err := importer.BuildDagFromReader( + reader, + n.DAG, + chunk.DefaultSplitter, + func(k u.Key, root bool) error { + mp.PinWithMode(k, pin.Indirect) + return nil + }, + ) if err != nil { return nil, err } diff --git a/core/coreunix/metadata_test.go b/core/coreunix/metadata_test.go index e3a9e5441c4..f752bb4521c 100644 --- a/core/coreunix/metadata_test.go +++ b/core/coreunix/metadata_test.go @@ -37,7 +37,7 @@ func TestMetadata(t *testing.T) { data := make([]byte, 1000) u.NewTimeSeededRand().Read(data) r := bytes.NewReader(data) - nd, err := importer.BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter) + nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil) if err != nil { t.Fatal(err) } diff --git a/fuse/readonly/ipfs_test.go b/fuse/readonly/ipfs_test.go index 8a139fc69bb..2f476568117 100644 --- a/fuse/readonly/ipfs_test.go +++ b/fuse/readonly/ipfs_test.go @@ -34,7 +34,7 @@ func randObj(t *testing.T, nd *core.IpfsNode, size int64) (*dag.Node, []byte) { buf := make([]byte, size) u.NewTimeSeededRand().Read(buf) read := bytes.NewReader(buf) - obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, nil, chunk.DefaultSplitter) + obj, err := importer.BuildTrickleDagFromReader(read, nd.DAG, chunk.DefaultSplitter, nil) if err != nil { t.Fatal(err) } diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go index a5ca4e0a2e8..99d4de5c2f6 100644 --- a/importer/helpers/dagbuilder.go +++ b/importer/helpers/dagbuilder.go @@ -3,8 +3,13 @@ package helpers import ( dag "github.com/ipfs/go-ipfs/merkledag" "github.com/ipfs/go-ipfs/pin" + u "github.com/ipfs/go-ipfs/util" ) +type BlockCB func(u.Key, bool) error + +var nilFunc BlockCB = func(_ u.Key, _ bool) error { return nil } + // DagBuilderHelper wraps together a bunch of objects needed to // efficiently create unixfs dag trees type DagBuilderHelper struct { @@ -13,6 +18,7 @@ type DagBuilderHelper struct { in <-chan []byte nextData []byte // the next item to return. maxlinks int + bcb BlockCB } type DagBuilderParams struct { @@ -22,18 +28,23 @@ type DagBuilderParams struct { // DAGService to write blocks to (required) Dagserv dag.DAGService - // Pinner to use for pinning files (optionally nil) - Pinner pin.ManualPinner + // Callback for each block added + BlockCB BlockCB } // Generate a new DagBuilderHelper from the given params, using 'in' as a // data source func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper { + bcb := dbp.BlockCB + if bcb == nil { + bcb = nilFunc + } + return &DagBuilderHelper{ dserv: dbp.Dagserv, - mp: dbp.Pinner, in: in, maxlinks: dbp.Maxlinks, + bcb: bcb, } } @@ -130,12 +141,10 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) { return nil, err } - if db.mp != nil { - db.mp.PinWithMode(key, pin.Recursive) - err := db.mp.Flush() - if err != nil { - return nil, err - } + // block callback + err = db.bcb(key, true) + if err != nil { + return nil, err } return dn, nil diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index 96aafab2da9..39605f93f1a 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -113,8 +113,9 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error { } // Pin the child node indirectly - if db.mp != nil { - db.mp.PinWithMode(childkey, pin.Indirect) + err = db.bcb(childkey, false) + if err != nil { + return err } return nil diff --git a/importer/importer.go b/importer/importer.go index 3c70c443374..c33b6091197 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -13,10 +13,10 @@ import ( trickle "github.com/ipfs/go-ipfs/importer/trickle" dag "github.com/ipfs/go-ipfs/merkledag" "github.com/ipfs/go-ipfs/pin" - "github.com/ipfs/go-ipfs/util" + u "github.com/ipfs/go-ipfs/util" ) -var log = util.Logger("importer") +var log = u.Logger("importer") // Builds a DAG from the given file, writing created blocks to disk as they are // created @@ -36,31 +36,50 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da } defer f.Close() - return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter) + return BuildDagFromReader(f, ds, chunk.DefaultSplitter, BasicPinnerCB(mp)) } -func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) { +func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, bcb h.BlockCB) (*dag.Node, error) { // Start the splitter blkch := spl.Split(r) dbp := h.DagBuilderParams{ Dagserv: ds, Maxlinks: h.DefaultLinksPerBlock, - Pinner: mp, + BlockCB: bcb, } return bal.BalancedLayout(dbp.New(blkch)) } -func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) { +func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, bcb h.BlockCB) (*dag.Node, error) { // Start the splitter blkch := spl.Split(r) dbp := h.DagBuilderParams{ Dagserv: ds, Maxlinks: h.DefaultLinksPerBlock, - Pinner: mp, + BlockCB: bcb, } return trickle.TrickleLayout(dbp.New(blkch)) } + +func BasicPinnerCB(p pin.ManualPinner) h.BlockCB { + return func(k u.Key, root bool) error { + if root { + p.PinWithMode(k, pin.Recursive) + return p.Flush() + } else { + p.PinWithMode(k, pin.Indirect) + return nil + } + } +} + +func PinIndirectCB(p pin.ManualPinner) h.BlockCB { + return func(k u.Key, root bool) error { + p.PinWithMode(k, pin.Indirect) + return nil + } +} diff --git a/importer/importer_test.go b/importer/importer_test.go index f9e83f19dca..3641fb1b09b 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -17,7 +17,7 @@ import ( func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { ds := mdtest.Mock(t) r := io.LimitReader(u.NewTimeSeededRand(), size) - nd, err := BuildDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize}) + nd, err := BuildDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil) if err != nil { t.Fatal(err) } @@ -27,7 +27,7 @@ func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGSe func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { ds := mdtest.Mock(t) r := io.LimitReader(u.NewTimeSeededRand(), size) - nd, err := BuildTrickleDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize}) + nd, err := BuildTrickleDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil) if err != nil { t.Fatal(err) } @@ -40,7 +40,7 @@ func TestBalancedDag(t *testing.T) { u.NewTimeSeededRand().Read(buf) r := bytes.NewReader(buf) - nd, err := BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter) + nd, err := BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil) if err != nil { t.Fatal(err) } diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 07525b8911d..28b58f88303 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -156,7 +156,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) { spl := &chunk.SizeSplitter{512} - root, err := imp.BuildDagFromReader(read, dagservs[0], nil, spl) + root, err := imp.BuildDagFromReader(read, dagservs[0], spl, nil) if err != nil { t.Fatal(err) } diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index e0e09f71157..78f7282fbeb 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -11,6 +11,7 @@ import ( mh "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + imp "github.com/ipfs/go-ipfs/importer" chunk "github.com/ipfs/go-ipfs/importer/chunk" help "github.com/ipfs/go-ipfs/importer/helpers" trickle "github.com/ipfs/go-ipfs/importer/trickle" @@ -308,7 +309,7 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No dbp := &help.DagBuilderParams{ Dagserv: dm.dagserv, Maxlinks: help.DefaultLinksPerBlock, - Pinner: dm.mp, + BlockCB: imp.BasicPinnerCB(dm.mp), } return trickle.TrickleAppend(node, dbp.New(blks)) diff --git a/unixfs/mod/dagmodifier_test.go b/unixfs/mod/dagmodifier_test.go index abc8268e361..3e2bea6cbf9 100644 --- a/unixfs/mod/dagmodifier_test.go +++ b/unixfs/mod/dagmodifier_test.go @@ -52,7 +52,7 @@ func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blocksto func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.ManualPinner) ([]byte, *mdag.Node) { in := io.LimitReader(u.NewTimeSeededRand(), size) - node, err := imp.BuildTrickleDagFromReader(in, dserv, pinner, &chunk.SizeSplitter{500}) + node, err := imp.BuildTrickleDagFromReader(in, dserv, &chunk.SizeSplitter{500}, imp.BasicPinnerCB(pinner)) if err != nil { t.Fatal(err) }