-
-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: batch fetching interfaces on dagservice #4314
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ import ( | |
|
||
cid "gx/ipfs/QmNp85zy9RLrQ5oQD4hPyS39ezrrXpcaa7R4Y9kxdWQLLQ/go-cid" | ||
node "gx/ipfs/QmPN7cwmpcc4DWXb4KTB9dNAJgjuPY69h3npsMfhRrQL9c/go-ipld-format" | ||
blocks "gx/ipfs/QmSn9Td7xgxm9EV7iEjTckpUWmWApggzPxu7eFGWkkpwin/go-block-format" | ||
ipldcbor "gx/ipfs/QmWCs8kMecJwCPK8JThue8TjgM2ieJ2HjTLDu7Cv2NEmZi/go-ipld-cbor" | ||
) | ||
|
||
|
@@ -40,6 +41,12 @@ type DAGService interface { | |
LinkService | ||
} | ||
|
||
// TODO: should live in go-ipld-format, not here. See: https://github.com/ipfs/go-ipld-format/pull/8 | ||
type NodeFetcher interface { | ||
Get(context.Context, *cid.Cid) (node.Node, error) | ||
GetMany(context.Context, []*cid.Cid) <-chan *NodeOption | ||
} | ||
|
||
type LinkService interface { | ||
// GetLinks return all links for a node. The complete node does not | ||
// necessarily have to exist locally, or at all. For example, raw | ||
|
@@ -147,6 +154,11 @@ func GetLinksDirect(serv node.NodeGetter) GetLinks { | |
} | ||
} | ||
|
||
func NewSession(ctx context.Context, bs bserv.BlockService) NodeFetcher { | ||
ses := bserv.NewSession(ctx, bs) | ||
return &sesGetter{ses} | ||
} | ||
|
||
type sesGetter struct { | ||
bs *bserv.Session | ||
} | ||
|
@@ -165,6 +177,10 @@ func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) { | |
return node.Decode(blk) | ||
} | ||
|
||
func (sg *sesGetter) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOption { | ||
return getMany(ctx, sg.bs, cids) | ||
} | ||
|
||
// FetchGraph fetches all nodes that are children of the given node | ||
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error { | ||
var ng node.NodeGetter = serv | ||
|
@@ -207,8 +223,16 @@ type NodeOption struct { | |
} | ||
|
||
func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *NodeOption { | ||
return getMany(ctx, ds.Blocks, keys) | ||
} | ||
|
||
type blocksGetter interface { | ||
GetBlocks(context.Context, []*cid.Cid) <-chan blocks.Block | ||
} | ||
|
||
func getMany(ctx context.Context, bg blocksGetter, keys []*cid.Cid) <-chan *NodeOption { | ||
out := make(chan *NodeOption, len(keys)) | ||
blocks := ds.Blocks.GetBlocks(ctx, keys) | ||
blocks := bg.GetBlocks(ctx, keys) | ||
var count int | ||
|
||
go func() { | ||
|
@@ -244,7 +268,7 @@ func (ds *dagService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *Node | |
// GetDAG will fill out all of the links of the given Node. | ||
// It returns a channel of nodes, which the caller can receive | ||
// all the child nodes of 'root' on, in proper order. | ||
func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter { | ||
func GetDAG(ctx context.Context, ds NodeFetcher, root node.Node) []NodeGetter { | ||
var cids []*cid.Cid | ||
for _, lnk := range root.Links() { | ||
cids = append(cids, lnk.Cid) | ||
|
@@ -255,7 +279,7 @@ func GetDAG(ctx context.Context, ds DAGService, root node.Node) []NodeGetter { | |
|
||
// GetNodes returns an array of 'NodeGetter' promises, with each corresponding | ||
// to the key with the same index as the passed in keys | ||
func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodeGetter { | ||
func GetNodes(ctx context.Context, ds NodeFetcher, keys []*cid.Cid) []NodeGetter { | ||
|
||
// Early out if no work to do | ||
if len(keys) == 0 { | ||
|
@@ -334,6 +358,7 @@ type nodePromise struct { | |
// the first call to Get will block until the Node is received | ||
// from its internal channels, subsequent calls will return the | ||
// cached node. | ||
// TODO: this is a name clash with node.NodeGetter | ||
type NodeGetter interface { | ||
Get(context.Context) (node.Node, error) | ||
Fail(err error) | ||
|
@@ -516,3 +541,39 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, | |
} | ||
|
||
} | ||
|
||
type roDagService struct { | ||
NodeFetcher | ||
} | ||
|
||
var ErrReadOnly = fmt.Errorf("cannot write to read-only dagservice") | ||
|
||
func (r *roDagService) Add(nd node.Node) (*cid.Cid, error) { | ||
return nil, ErrReadOnly | ||
} | ||
|
||
func (r *roDagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*node.Link, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function will also go away wit that PR. |
||
if c.Type() == cid.Raw { | ||
return nil, nil | ||
} | ||
node, err := r.Get(ctx, c) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return node.Links(), nil | ||
} | ||
|
||
func (r *roDagService) GetOfflineLinkService() LinkService { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As with this one. |
||
panic("do not call this method on this object") | ||
} | ||
func (r *roDagService) Batch() *Batch { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And this one. |
||
panic("do not call this method on this object") | ||
} | ||
|
||
func (r *roDagService) Remove(node.Node) error { | ||
return ErrReadOnly | ||
} | ||
|
||
func NewReadOnlyDagService(nf NodeFetcher) DAGService { | ||
return &roDagService{nf} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm removing this interface in ipfs/go-ipld-format#8 (introducing a single, public
NodePromise
type.