Skip to content

Commit

Permalink
WIP: batch fetching interfaces on dagservice
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
  • Loading branch information
whyrusleeping committed Oct 23, 2017
1 parent 786d81e commit db5b831
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 23 deletions.
7 changes: 5 additions & 2 deletions core/coreunix/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"

core "github.com/ipfs/go-ipfs/core"
mdag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
uio "github.com/ipfs/go-ipfs/unixfs/io"
)

func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (uio.DagReader, error) {
ses := mdag.NewSession(ctx, n.Blocks)

r := &path.Resolver{
DAG: n.DAG,
DAG: ses,
ResolveOnce: uio.ResolveUnixfsOnce,
}

Expand All @@ -19,5 +22,5 @@ func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (uio.DagReader, err
return nil, err
}

return uio.NewDagReader(ctx, dagNode, n.DAG)
return uio.NewDagReader(ctx, dagNode, ses)
}
67 changes: 64 additions & 3 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
ipldcbor "gx/ipfs/QmWCs8kMecJwCPK8JThue8TjgM2ieJ2HjTLDu7Cv2NEmZi/go-ipld-cbor"
)

Expand Down Expand Up @@ -40,6 +41,12 @@ type DAGService interface {
LinkService
}

// TODO: should live in go-ipld-format, not here. See: https://github.com/ipfs/go-ipld-format/pull/8
type NodeFetcher interface {
Get(context.Context, *cid.Cid) (node.Node, error)
GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
}

type LinkService interface {
// GetLinks return all links for a node. The complete node does not
// necessarily have to exist locally, or at all. For example, raw
Expand Down Expand Up @@ -147,6 +154,11 @@ func GetLinksDirect(serv node.NodeGetter) GetLinks {
}
}

func NewSession(ctx context.Context, bs bserv.BlockService) NodeFetcher {
ses := bserv.NewSession(ctx, bs)
return &sesGetter{ses}
}

type sesGetter struct {
bs *bserv.Session
}
Expand All @@ -165,6 +177,10 @@ func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
return node.Decode(blk)
}

func (sg *sesGetter) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOption {
return getMany(ctx, sg.bs, cids)
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
var ng node.NodeGetter = serv
Expand Down Expand Up @@ -207,8 +223,16 @@ type NodeOption struct {
}

func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
return getMany(ctx, ds.Blocks, keys)
}

type blocksGetter interface {
GetBlocks(context.Context, []*cid.Cid) <-chan blocks.Block
}

func getMany(ctx context.Context, bg blocksGetter, keys []*cid.Cid) <-chan *NodeOption {
out := make(chan *NodeOption, len(keys))
blocks := ds.Blocks.GetBlocks(ctx, keys)
blocks := bg.GetBlocks(ctx, keys)
var count int

go func() {
Expand Down Expand Up @@ -244,7 +268,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *Node
// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
func GetDAG(ctx context.Context, ds NodeFetcher, root node.Node) []NodeGetter {
var cids []*cid.Cid
for _, lnk := range root.Links() {
cids = append(cids, lnk.Cid)
Expand All @@ -255,7 +279,7 @@ func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {

// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
func GetNodes(ctx context.Context, ds NodeFetcher, keys []*cid.Cid) []NodeGetter {

// Early out if no work to do
if len(keys) == 0 {
Expand Down Expand Up @@ -334,6 +358,7 @@ type nodePromise struct {
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
// TODO: this is a name clash with node.NodeGetter
type NodeGetter interface {
Get(context.Context) (node.Node, error)
Fail(err error)
Expand Down Expand Up @@ -516,3 +541,39 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid,
}

}

type roDagService struct {
NodeFetcher
}

var ErrReadOnly = fmt.Errorf("cannot write to read-only dagservice")

func (r *roDagService) Add(nd node.Node) (*cid.Cid, error) {
return nil, ErrReadOnly
}

func (r *roDagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
if c.Type() == cid.Raw {
return nil, nil
}
node, err := r.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links(), nil
}

func (r *roDagService) GetOfflineLinkService() LinkService {
panic("do not call this method on this object")
}
func (r *roDagService) Batch() *Batch {
panic("do not call this method on this object")
}

func (r *roDagService) Remove(node.Node) error {
return ErrReadOnly
}

func NewReadOnlyDagService(nf NodeFetcher) DAGService {
return &roDagService{nf}
}
8 changes: 4 additions & 4 deletions path/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func (e ErrNoLink) Error() string {
// TODO: now that this is more modular, try to unify this code with the
// the resolvers in namesys
type Resolver struct {
DAG dag.DAGService
DAG dag.NodeFetcher

ResolveOnce func(ctx context.Context, ds dag.DAGService, nd node.Node, names []string) (*node.Link, []string, error)
ResolveOnce func(ctx context.Context, ds dag.NodeFetcher, nd node.Node, names []string) (*node.Link, []string, error)
}

func NewBasicResolver(ds dag.DAGService) *Resolver {
func NewBasicResolver(ds dag.NodeFetcher) *Resolver {
return &Resolver{
DAG: ds,
ResolveOnce: ResolveSingle,
Expand Down Expand Up @@ -123,7 +123,7 @@ func (s *Resolver) ResolvePath(ctx context.Context, fpath Path) (node.Node, erro

// ResolveSingle simply resolves one hop of a path through a graph with no
// extra context (does not opaquely resolve through sharded nodes)
func ResolveSingle(ctx context.Context, ds dag.DAGService, nd node.Node, names []string) (*node.Link, []string, error) {
func ResolveSingle(ctx context.Context, ds dag.NodeFetcher, nd node.Node, names []string) (*node.Link, []string, error) {
return nd.ResolveLink(names)
}

Expand Down
53 changes: 44 additions & 9 deletions unixfs/hamt/hamt.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type HamtShard struct {
prefixPadStr string
maxpadlen int

dserv dag.DAGService
dsadd dag.DAGService
dsget dag.NodeFetcher
}

// child can either be another shard, or a leaf node value
Expand All @@ -66,19 +67,22 @@ type child interface {
Label() string
}

var ErrReadOnly = fmt.Errorf("cannot modify readonly hamt shard")

func NewHamtShard(dserv dag.DAGService, size int) (*HamtShard, error) {
ds, err := makeHamtShard(dserv, size)
if err != nil {
return nil, err
}
ds.setWriterDag(dserv)

ds.bitfield = big.NewInt(0)
ds.nd = new(dag.ProtoNode)
ds.hashFunc = HashMurmur3
return ds, nil
}

func makeHamtShard(ds dag.DAGService, size int) (*HamtShard, error) {
func makeHamtShard(rodag dag.NodeFetcher, size int) (*HamtShard, error) {
lg2s := int(math.Log2(float64(size)))
if 1<<uint(lg2s) != size {
return nil, fmt.Errorf("hamt size should be a power of two")
Expand All @@ -89,11 +93,25 @@ func makeHamtShard(ds dag.DAGService, size int) (*HamtShard, error) {
prefixPadStr: fmt.Sprintf("%%0%dX", len(maxpadding)),
maxpadlen: len(maxpadding),
tableSize: size,
dserv: ds,
dsget: rodag,
}, nil
}

func (s *HamtShard) setWriterDag(ds dag.DAGService) {
s.dsadd = ds
}

func NewHamtFromDag(dserv dag.DAGService, nd node.Node) (*HamtShard, error) {
hs, err := NewHamtReader(dserv, nd)
if err != nil {
return nil, err
}

hs.setWriterDag(dserv)
return hs, nil
}

func NewHamtReader(dserv dag.NodeFetcher, nd node.Node) (*HamtShard, error) {
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
return nil, dag.ErrLinkNotFound
Expand Down Expand Up @@ -132,6 +150,10 @@ func (ds *HamtShard) SetPrefix(prefix *cid.Prefix) {

// Node serializes the HAMT structure into a merkledag node with unixfs formatting
func (ds *HamtShard) Node() (node.Node, error) {
if ds.dsadd == nil {
return nil, ErrReadOnly
}

out := new(dag.ProtoNode)
out.SetPrefix(ds.prefix)

Expand Down Expand Up @@ -178,7 +200,7 @@ func (ds *HamtShard) Node() (node.Node, error) {

out.SetData(data)

_, err = ds.dserv.Add(out)
_, err = ds.dsadd.Add(out)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -214,8 +236,12 @@ func (ds *HamtShard) Label() string {

// Set sets 'name' = nd in the HAMT
func (ds *HamtShard) Set(ctx context.Context, name string, nd node.Node) error {
if ds.dsadd == nil {
return ErrReadOnly
}

hv := &hashBits{b: hash([]byte(name))}
_, err := ds.dserv.Add(nd)
_, err := ds.dsadd.Add(nd)
if err != nil {
return err
}
Expand Down Expand Up @@ -279,7 +305,7 @@ func (ds *HamtShard) loadChild(ctx context.Context, i int) (child, error) {
return nil, fmt.Errorf("invalid link name '%s'", lnk.Name)
}

nd, err := lnk.GetNode(ctx, ds.dserv)
nd, err := lnk.GetNode(ctx, ds.dsget)
if err != nil {
return nil, err
}
Expand All @@ -300,11 +326,15 @@ func (ds *HamtShard) loadChild(ctx context.Context, i int) (child, error) {
return nil, fmt.Errorf("HAMT entries must have non-zero length name")
}

cds, err := NewHamtFromDag(ds.dserv, nd)
cds, err := NewHamtReader(ds.dsget, nd)
if err != nil {
return nil, err
}

if ds.dsadd != nil {
cds.setWriterDag(ds.dsadd)
}

c = cds
} else {
lnk2 := *lnk
Expand All @@ -329,7 +359,7 @@ func (ds *HamtShard) Link() (*node.Link, error) {
return nil, err
}

_, err = ds.dserv.Add(nd)
_, err = ds.dsadd.Add(nd)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -445,6 +475,10 @@ func (ds *HamtShard) walkTrie(ctx context.Context, cb func(*shardValue) error) e
}

func (ds *HamtShard) modifyValue(ctx context.Context, hv *hashBits, key string, val *node.Link) error {
if ds.dsadd == nil {
return ErrReadOnly
}

idx := hv.Next(ds.tableSizeLg2)

if ds.bitfield.Bit(idx) != 1 {
Expand Down Expand Up @@ -496,10 +530,11 @@ func (ds *HamtShard) modifyValue(ctx context.Context, hv *hashBits, key string,
return nil

default: // replace value with another shard, one level deeper
ns, err := NewHamtShard(ds.dserv, ds.tableSize)
ns, err := NewHamtShard(ds.dsadd, ds.tableSize)
if err != nil {
return err
}

chhv := &hashBits{
b: hash([]byte(child.key)),
consumed: hv.consumed,
Expand Down
2 changes: 1 addition & 1 deletion unixfs/io/dagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ReadSeekCloser interface {

// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed in DAGService for data retreival
func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (DagReader, error) {
func NewDagReader(ctx context.Context, n node.Node, serv mdag.NodeFetcher) (DagReader, error) {
switch n := n.(type) {
case *mdag.RawNode:
return NewBufDagReader(n.RawData()), nil
Expand Down
4 changes: 2 additions & 2 deletions unixfs/io/pbdagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// DagReader provides a way to easily read the data contained in a dag.
type pbDagReader struct {
serv mdag.DAGService
serv mdag.NodeFetcher

// the node being read
node *mdag.ProtoNode
Expand Down Expand Up @@ -45,7 +45,7 @@ type pbDagReader struct {

var _ DagReader = (*pbDagReader)(nil)

func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader {
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.NodeFetcher) *pbDagReader {
fctx, cancel := context.WithCancel(ctx)
promises := mdag.GetDAG(fctx, serv, n)
return &pbDagReader{
Expand Down
4 changes: 2 additions & 2 deletions unixfs/io/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// ResolveUnixfsOnce resolves a single hop of a path through a graph in a
// unixfs context. This includes handling traversing sharded directories.
func ResolveUnixfsOnce(ctx context.Context, ds dag.DAGService, nd node.Node, names []string) (*node.Link, []string, error) {
func ResolveUnixfsOnce(ctx context.Context, ds dag.NodeFetcher, nd node.Node, names []string) (*node.Link, []string, error) {
switch nd := nd.(type) {
case *dag.ProtoNode:
upb, err := ft.FromBytes(nd.Data())
Expand All @@ -28,7 +28,7 @@ func ResolveUnixfsOnce(ctx context.Context, ds dag.DAGService, nd node.Node, nam

switch upb.GetType() {
case ft.THAMTShard:
s, err := hamt.NewHamtFromDag(ds, nd)
s, err := hamt.NewHamtReader(ds, nd)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit db5b831

Please sign in to comment.