Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: batch fetching interfaces on dagservice #4314

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/coreunix/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"

core "github.com/ipfs/go-ipfs/core"
mdag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
uio "github.com/ipfs/go-ipfs/unixfs/io"
)

func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (uio.DagReader, error) {
ses := mdag.NewSession(ctx, n.Blocks)

r := &path.Resolver{
DAG: n.DAG,
DAG: ses,
ResolveOnce: uio.ResolveUnixfsOnce,
}

Expand All @@ -19,5 +22,5 @@ func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (uio.DagReader, err
return nil, err
}

return uio.NewDagReader(ctx, dagNode, n.DAG)
return uio.NewDagReader(ctx, dagNode, ses)
}
67 changes: 64 additions & 3 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid"
node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format"
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format"
ipldcbor "gx/ipfs/QmWCs8kMecJwCPK8JThue8TjgM2ieJ2HjTLDu7Cv2NEmZi/go-ipld-cbor"
)

Expand Down Expand Up @@ -40,6 +41,12 @@ type DAGService interface {
LinkService
}

// TODO: should live in go-ipld-format, not here. See: https://github.com/ipfs/go-ipld-format/pull/8
type NodeFetcher interface {
Get(context.Context, *cid.Cid) (node.Node, error)
GetMany(context.Context, []*cid.Cid) <-chan *NodeOption
}

type LinkService interface {
// GetLinks return all links for a node. The complete node does not
// necessarily have to exist locally, or at all. For example, raw
Expand Down Expand Up @@ -147,6 +154,11 @@ func GetLinksDirect(serv node.NodeGetter) GetLinks {
}
}

func NewSession(ctx context.Context, bs bserv.BlockService) NodeFetcher {
ses := bserv.NewSession(ctx, bs)
return &sesGetter{ses}
}

type sesGetter struct {
bs *bserv.Session
}
Expand All @@ -165,6 +177,10 @@ func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) {
return node.Decode(blk)
}

func (sg *sesGetter) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOption {
return getMany(ctx, sg.bs, cids)
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
var ng node.NodeGetter = serv
Expand Down Expand Up @@ -207,8 +223,16 @@ type NodeOption struct {
}

func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption {
return getMany(ctx, ds.Blocks, keys)
}

type blocksGetter interface {
GetBlocks(context.Context, []*cid.Cid) <-chan blocks.Block
}

func getMany(ctx context.Context, bg blocksGetter, keys []*cid.Cid) <-chan *NodeOption {
out := make(chan *NodeOption, len(keys))
blocks := ds.Blocks.GetBlocks(ctx, keys)
blocks := bg.GetBlocks(ctx, keys)
var count int

go func() {
Expand Down Expand Up @@ -244,7 +268,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *Node
// GetDAG will fill out all of the links of the given Node.
// It returns a channel of nodes, which the caller can receive
// all the child nodes of 'root' on, in proper order.
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {
func GetDAG(ctx context.Context, ds NodeFetcher, root node.Node) []NodeGetter {
var cids []*cid.Cid
for _, lnk := range root.Links() {
cids = append(cids, lnk.Cid)
Expand All @@ -255,7 +279,7 @@ func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter {

// GetNodes returns an array of 'NodeGetter' promises, with each corresponding
// to the key with the same index as the passed in keys
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter {
func GetNodes(ctx context.Context, ds NodeFetcher, keys []*cid.Cid) []NodeGetter {

// Early out if no work to do
if len(keys) == 0 {
Expand Down Expand Up @@ -334,6 +358,7 @@ type nodePromise struct {
// the first call to Get will block until the Node is received
// from its internal channels, subsequent calls will return the
// cached node.
// TODO: this is a name clash with node.NodeGetter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm removing this interface in ipfs/go-ipld-format#8 (introducing a single, public NodePromise type.

type NodeGetter interface {
Get(context.Context) (node.Node, error)
Fail(err error)
Expand Down Expand Up @@ -516,3 +541,39 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid,
}

}

type roDagService struct {
NodeFetcher
}

var ErrReadOnly = fmt.Errorf("cannot write to read-only dagservice")

func (r *roDagService) Add(nd node.Node) (*cid.Cid, error) {
return nil, ErrReadOnly
}

func (r *roDagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function will also go away wit that PR.

if c.Type() == cid.Raw {
return nil, nil
}
node, err := r.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links(), nil
}

func (r *roDagService) GetOfflineLinkService() LinkService {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with this one.

panic("do not call this method on this object")
}
func (r *roDagService) Batch() *Batch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this one.

panic("do not call this method on this object")
}

func (r *roDagService) Remove(node.Node) error {
return ErrReadOnly
}

func NewReadOnlyDagService(nf NodeFetcher) DAGService {
return &roDagService{nf}
}
8 changes: 4 additions & 4 deletions path/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func (e ErrNoLink) Error() string {
// TODO: now that this is more modular, try to unify this code with the
// the resolvers in namesys
type Resolver struct {
DAG dag.DAGService
DAG dag.NodeFetcher

ResolveOnce func(ctx context.Context, ds dag.DAGService, nd node.Node, names []string) (*node.Link, []string, error)
ResolveOnce func(ctx context.Context, ds dag.NodeFetcher, nd node.Node, names []string) (*node.Link, []string, error)
}

func NewBasicResolver(ds dag.DAGService) *Resolver {
func NewBasicResolver(ds dag.NodeFetcher) *Resolver {
return &Resolver{
DAG: ds,
ResolveOnce: ResolveSingle,
Expand Down Expand Up @@ -123,7 +123,7 @@ func (s *Resolver) ResolvePath(ctx context.Context, fpath Path) (node.Node, erro

// ResolveSingle simply resolves one hop of a path through a graph with no
// extra context (does not opaquely resolve through sharded nodes)
func ResolveSingle(ctx context.Context, ds dag.DAGService, nd node.Node, names []string) (*node.Link, []string, error) {
func ResolveSingle(ctx context.Context, ds dag.NodeFetcher, nd node.Node, names []string) (*node.Link, []string, error) {
return nd.ResolveLink(names)
}

Expand Down
53 changes: 44 additions & 9 deletions unixfs/hamt/hamt.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type HamtShard struct {
prefixPadStr string
maxpadlen int

dserv dag.DAGService
dsadd dag.DAGService
dsget dag.NodeFetcher
}

// child can either be another shard, or a leaf node value
Expand All @@ -66,19 +67,22 @@ type child interface {
Label() string
}

var ErrReadOnly = fmt.Errorf("cannot modify readonly hamt shard")

func NewHamtShard(dserv dag.DAGService, size int) (*HamtShard, error) {
ds, err := makeHamtShard(dserv, size)
if err != nil {
return nil, err
}
ds.setWriterDag(dserv)

ds.bitfield = big.NewInt(0)
ds.nd = new(dag.ProtoNode)
ds.hashFunc = HashMurmur3
return ds, nil
}

func makeHamtShard(ds dag.DAGService, size int) (*HamtShard, error) {
func makeHamtShard(rodag dag.NodeFetcher, size int) (*HamtShard, error) {
lg2s := int(math.Log2(float64(size)))
if 1<<uint(lg2s) != size {
return nil, fmt.Errorf("hamt size should be a power of two")
Expand All @@ -89,11 +93,25 @@ func makeHamtShard(ds dag.DAGService, size int) (*HamtShard, error) {
prefixPadStr: fmt.Sprintf("%%0%dX", len(maxpadding)),
maxpadlen: len(maxpadding),
tableSize: size,
dserv: ds,
dsget: rodag,
}, nil
}

func (s *HamtShard) setWriterDag(ds dag.DAGService) {
s.dsadd = ds
}

func NewHamtFromDag(dserv dag.DAGService, nd node.Node) (*HamtShard, error) {
hs, err := NewHamtReader(dserv, nd)
if err != nil {
return nil, err
}

hs.setWriterDag(dserv)
return hs, nil
}

func NewHamtReader(dserv dag.NodeFetcher, nd node.Node) (*HamtShard, error) {
pbnd, ok := nd.(*dag.ProtoNode)
if !ok {
return nil, dag.ErrLinkNotFound
Expand Down Expand Up @@ -132,6 +150,10 @@ func (ds *HamtShard) SetPrefix(prefix *cid.Prefix) {

// Node serializes the HAMT structure into a merkledag node with unixfs formatting
func (ds *HamtShard) Node() (node.Node, error) {
if ds.dsadd == nil {
return nil, ErrReadOnly
}

out := new(dag.ProtoNode)
out.SetPrefix(ds.prefix)

Expand Down Expand Up @@ -178,7 +200,7 @@ func (ds *HamtShard) Node() (node.Node, error) {

out.SetData(data)

_, err = ds.dserv.Add(out)
_, err = ds.dsadd.Add(out)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -214,8 +236,12 @@ func (ds *HamtShard) Label() string {

// Set sets 'name' = nd in the HAMT
func (ds *HamtShard) Set(ctx context.Context, name string, nd node.Node) error {
if ds.dsadd == nil {
return ErrReadOnly
}

hv := &hashBits{b: hash([]byte(name))}
_, err := ds.dserv.Add(nd)
_, err := ds.dsadd.Add(nd)
if err != nil {
return err
}
Expand Down Expand Up @@ -279,7 +305,7 @@ func (ds *HamtShard) loadChild(ctx context.Context, i int) (child, error) {
return nil, fmt.Errorf("invalid link name '%s'", lnk.Name)
}

nd, err := lnk.GetNode(ctx, ds.dserv)
nd, err := lnk.GetNode(ctx, ds.dsget)
if err != nil {
return nil, err
}
Expand All @@ -300,11 +326,15 @@ func (ds *HamtShard) loadChild(ctx context.Context, i int) (child, error) {
return nil, fmt.Errorf("HAMT entries must have non-zero length name")
}

cds, err := NewHamtFromDag(ds.dserv, nd)
cds, err := NewHamtReader(ds.dsget, nd)
if err != nil {
return nil, err
}

if ds.dsadd != nil {
cds.setWriterDag(ds.dsadd)
}

c = cds
} else {
lnk2 := *lnk
Expand All @@ -329,7 +359,7 @@ func (ds *HamtShard) Link() (*node.Link, error) {
return nil, err
}

_, err = ds.dserv.Add(nd)
_, err = ds.dsadd.Add(nd)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -445,6 +475,10 @@ func (ds *HamtShard) walkTrie(ctx context.Context, cb func(*shardValue) error) e
}

func (ds *HamtShard) modifyValue(ctx context.Context, hv *hashBits, key string, val *node.Link) error {
if ds.dsadd == nil {
return ErrReadOnly
}

idx := hv.Next(ds.tableSizeLg2)

if ds.bitfield.Bit(idx) != 1 {
Expand Down Expand Up @@ -496,10 +530,11 @@ func (ds *HamtShard) modifyValue(ctx context.Context, hv *hashBits, key string,
return nil

default: // replace value with another shard, one level deeper
ns, err := NewHamtShard(ds.dserv, ds.tableSize)
ns, err := NewHamtShard(ds.dsadd, ds.tableSize)
if err != nil {
return err
}

chhv := &hashBits{
b: hash([]byte(child.key)),
consumed: hv.consumed,
Expand Down
2 changes: 1 addition & 1 deletion unixfs/io/dagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ReadSeekCloser interface {

// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed in DAGService for data retreival
func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (DagReader, error) {
func NewDagReader(ctx context.Context, n node.Node, serv mdag.NodeFetcher) (DagReader, error) {
switch n := n.(type) {
case *mdag.RawNode:
return NewBufDagReader(n.RawData()), nil
Expand Down
4 changes: 2 additions & 2 deletions unixfs/io/pbdagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// DagReader provides a way to easily read the data contained in a dag.
type pbDagReader struct {
serv mdag.DAGService
serv mdag.NodeFetcher

// the node being read
node *mdag.ProtoNode
Expand Down Expand Up @@ -45,7 +45,7 @@ type pbDagReader struct {

var _ DagReader = (*pbDagReader)(nil)

func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *pbDagReader {
func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.NodeFetcher) *pbDagReader {
fctx, cancel := context.WithCancel(ctx)
promises := mdag.GetDAG(fctx, serv, n)
return &pbDagReader{
Expand Down
4 changes: 2 additions & 2 deletions unixfs/io/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// ResolveUnixfsOnce resolves a single hop of a path through a graph in a
// unixfs context. This includes handling traversing sharded directories.
func ResolveUnixfsOnce(ctx context.Context, ds dag.DAGService, nd node.Node, names []string) (*node.Link, []string, error) {
func ResolveUnixfsOnce(ctx context.Context, ds dag.NodeFetcher, nd node.Node, names []string) (*node.Link, []string, error) {
switch nd := nd.(type) {
case *dag.ProtoNode:
upb, err := ft.FromBytes(nd.Data())
Expand All @@ -28,7 +28,7 @@ func ResolveUnixfsOnce(ctx context.Context, ds dag.DAGService, nd node.Node, nam

switch upb.GetType() {
case ft.THAMTShard:
s, err := hamt.NewHamtFromDag(ds, nd)
s, err := hamt.NewHamtReader(ds, nd)
if err != nil {
return nil, nil, err
}
Expand Down