Skip to content

Commit

Permalink
rewrite add command to use dagwriter, moved a pinner into the dagwrit…
Browse files Browse the repository at this point in the history
…er for inline pinning
  • Loading branch information
whyrusleeping committed Oct 30, 2014
1 parent a29b9d3 commit 18ada93
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 4 deletions.
4 changes: 2 additions & 2 deletions core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func addDir(n *core.IpfsNode, fpath string, depth int, out io.Writer) (*dag.Node
}

func addFile(n *core.IpfsNode, fpath string, depth int, out io.Writer) (*dag.Node, error) {
root, err := importer.NewDagFromFile(fpath)
root, err := importer.NewDagFromFileWServer(fpath, n.DAG, n.Pinning)
if err != nil {
return nil, err
}
Expand All @@ -98,7 +98,7 @@ func addFile(n *core.IpfsNode, fpath string, depth int, out io.Writer) (*dag.Nod
log.Info("adding subblock: %s %s", l.Name, l.Hash.B58String())
}

return root, addNode(n, root, fpath, out)
return root, nil
}

// addNode adds the node to the graph + local storage
Expand Down
41 changes: 41 additions & 0 deletions importer/importer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package importer

import (
"errors"
"fmt"
"io"
"os"

"github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
uio "github.com/jbenet/go-ipfs/unixfs/io"
"github.com/jbenet/go-ipfs/util"
)

Expand Down Expand Up @@ -72,3 +75,41 @@ func NewDagFromFile(fpath string) (*dag.Node, error) {

return NewDagFromReader(f)
}

func NewDagFromFileWServer(fpath string, dserv dag.DAGService, p pin.Pinner) (*dag.Node, error) {
stat, err := os.Stat(fpath)
if err != nil {
return nil, err
}

if stat.IsDir() {
return nil, fmt.Errorf("`%s` is a directory", fpath)
}

f, err := os.Open(fpath)
if err != nil {
return nil, err
}
defer f.Close()

return NewDagFromReaderWServer(f, dserv, p)
}

func NewDagFromReaderWServer(r io.Reader, dserv dag.DAGService, p pin.Pinner) (*dag.Node, error) {
dw := uio.NewDagWriter(dserv, chunk.DefaultSplitter)

mp, ok := p.(pin.ManualPinner)
if !ok {
return nil, errors.New("Needed to be passed a manual pinner!")
}
dw.Pinner = mp
_, err := io.Copy(dw, r)
if err != nil {
return nil, err
}
err = dw.Close()
if err != nil {
return nil, err
}
return dw.GetNode(), nil
}
26 changes: 26 additions & 0 deletions pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,28 @@ var recursePinDatastoreKey = ds.NewKey("/local/pins/recursive/keys")
var directPinDatastoreKey = ds.NewKey("/local/pins/direct/keys")
var indirectPinDatastoreKey = ds.NewKey("/local/pins/indirect/keys")

type PinMode int

const (
Recursive PinMode = iota
Direct
Indirect
)

type Pinner interface {
IsPinned(util.Key) bool
Pin(*mdag.Node, bool) error
Unpin(util.Key, bool) error
Flush() error
}

// ManualPinner is for manually editing the pin structure
// Use with care
type ManualPinner interface {
PinWithMode(util.Key, PinMode)
Pinner
}

type pinner struct {
lock sync.RWMutex
recursePin set.BlockSet
Expand Down Expand Up @@ -228,3 +243,14 @@ func loadSet(d ds.Datastore, k ds.Key, val interface{}) error {
}
return json.Unmarshal(bf, val)
}

func (p *pinner) PinWithMode(k util.Key, mode PinMode) {
switch mode {
case Recursive:
p.recursePin.AddBlock(k)
case Direct:
p.directPin.AddBlock(k)
case Indirect:
p.indirPin.Increment(k)
}
}
12 changes: 10 additions & 2 deletions unixfs/io/dagwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io
import (
"github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
"github.com/jbenet/go-ipfs/util"
)
Expand All @@ -17,6 +18,7 @@ type DagWriter struct {
done chan struct{}
splitter chunk.BlockSplitter
seterr error
Pinner pin.ManualPinner
}

func NewDagWriter(ds dag.DAGService, splitter chunk.BlockSplitter) *DagWriter {
Expand Down Expand Up @@ -48,7 +50,10 @@ func (dw *DagWriter) startSplitter() {
// Store the block size in the root node
mbf.AddBlockSize(uint64(len(blkData)))
node := &dag.Node{Data: ft.WrapData(blkData)}
_, err := dw.dagserv.Add(node)
nk, err := dw.dagserv.Add(node)
if dw.Pinner != nil {
dw.Pinner.PinWithMode(nk, pin.Indirect)
}
if err != nil {
dw.seterr = err
log.Critical("Got error adding created node to dagservice: %s", err)
Expand All @@ -75,12 +80,15 @@ func (dw *DagWriter) startSplitter() {
root.Data = data

// Add root node to the dagservice
_, err = dw.dagserv.Add(root)
rootk, err := dw.dagserv.Add(root)
if err != nil {
dw.seterr = err
log.Critical("Got error adding created node to dagservice: %s", err)
return
}
if dw.Pinner != nil {
dw.Pinner.PinWithMode(rootk, pin.Recursive)
}
dw.node = root
dw.done <- struct{}{}
}
Expand Down

0 comments on commit 18ada93

Please sign in to comment.