Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pin cmd: stream recursive pins #6493

Merged
merged 6 commits into from
Jul 12, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 107 additions & 44 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
}

const (
pinTypeOptionName = "type"
pinQuietOptionName = "quiet"
pinTypeOptionName = "type"
pinQuietOptionName = "quiet"
pinStreamOptionName = "stream"
)

var listPinCmd = &cmds.Command{
Expand Down Expand Up @@ -313,6 +314,7 @@ Example:
Options: []cmds.Option{
cmds.StringOption(pinTypeOptionName, "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\".").WithDefault("all"),
cmds.BoolOption(pinQuietOptionName, "q", "Write just hashes of objects."),
cmds.BoolOption(pinStreamOptionName, "s", "Don't buffer pins before sending."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
Expand All @@ -326,9 +328,7 @@ Example:
}

typeStr, _ := req.Options[pinTypeOptionName].(string)
if err != nil {
return err
}
stream, _ := req.Options[pinStreamOptionName].(bool)

switch typeStr {
case "all", "direct", "indirect", "recursive":
Expand All @@ -337,34 +337,50 @@ Example:
return err
}

enc, err := cmdenv.GetCidEncoder(req)
MichaelMure marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
// For backward compatibility, we accumulate the pins in the same output type as before.
emit := res.Emit
lgcList := map[string]RefObject{}
if !stream {
emit = func(v interface{}) error {
obj := v.(*PinLsOutputWrapper)
lgcList[obj.RefKeyObject.Cid] = RefObject{Type: obj.RefKeyObject.Type}
return nil
}
}

var keys map[cid.Cid]RefKeyObject
if len(req.Arguments) > 0 {
keys, err = pinLsKeys(req.Context, req.Arguments, typeStr, n, api)
err = pinLsKeys(req.Context, req.Arguments, typeStr, n, api, emit)
} else {
keys, err = pinLsAll(req.Context, typeStr, n)
err = pinLsAll(req.Context, typeStr, n, emit)
}
if err != nil {
return err
}

refKeys := make(map[string]RefKeyObject, len(keys))
for k, v := range keys {
refKeys[enc.Encode(k)] = v
MichaelMure marked this conversation as resolved.
Show resolved Hide resolved
if !stream {
return cmds.EmitOnce(res, &PinLsOutputWrapper{
RefKeyList: RefKeyList{Keys: lgcList},
})
}

return cmds.EmitOnce(res, &RefKeyList{Keys: refKeys})
return nil
},
Type: RefKeyList{},
Type: &PinLsOutputWrapper{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefKeyList) error {
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
quiet, _ := req.Options[pinQuietOptionName].(bool)
stream, _ := req.Options[pinStreamOptionName].(bool)

if stream {
if quiet {
fmt.Fprintf(w, "%s\n", out.RefKeyObject.Cid)
} else {
fmt.Fprintf(w, "%s %s\n", out.RefKeyObject.Cid, out.RefKeyObject.Type)
}
return nil
}

for k, v := range out.Keys {
for k, v := range out.RefKeyList.Keys {
if quiet {
fmt.Fprintf(w, "%s\n", k)
} else {
Expand Down Expand Up @@ -492,80 +508,127 @@ var verifyPinCmd = &cmds.Command{
}

type RefKeyObject struct {
Cid string
Type string
}

type RefObject struct {
Type string
}

type RefKeyList struct {
Keys map[string]RefKeyObject
Keys map[string]RefObject
}

func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI) (map[cid.Cid]RefKeyObject, error) {
// Pin ls needs to output two different type depending on if it's streamed or not.
// We use this to bypass the cmds lib refusing to have interface{}
type PinLsOutputWrapper struct {
RefKeyList
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're going to have to use "omitempty" for full backwards compatibility.

RefKeyObject
}

func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI, emit func(value interface{}) error) error {
mode, ok := pin.StringToMode(typeStr)
if !ok {
return nil, fmt.Errorf("invalid pin mode '%s'", typeStr)
return fmt.Errorf("invalid pin mode '%s'", typeStr)
}

keys := make(map[cid.Cid]RefKeyObject)

for _, p := range args {
c, err := api.ResolvePath(ctx, path.New(p))
if err != nil {
return nil, err
return err
}

pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
if err != nil {
return nil, err
return err
}

if !pinned {
return nil, fmt.Errorf("path '%s' is not pinned", p)
return fmt.Errorf("path '%s' is not pinned", p)
}

switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}
keys[c.Cid()] = RefKeyObject{
Type: pinType,

err = emit(&PinLsOutputWrapper{
RefKeyObject: RefKeyObject{
Type: pinType,
Cid: c.Cid().String(),
},
})
if err != nil {
return err
}
}

return keys, nil
return nil
}

func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[cid.Cid]RefKeyObject, error) {

keys := make(map[cid.Cid]RefKeyObject)
func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {
keys := cid.NewSet()

AddToResultKeys := func(keyList []cid.Cid, typeStr string) {
AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
keys[c] = RefKeyObject{
Type: typeStr,
if keys.Visit(c) {
err := emit(&PinLsOutputWrapper{
RefKeyObject: RefKeyObject{
Type: typeStr,
Cid: c.String(),
},
})
if err != nil {
return err
}
}
}
return nil
}

if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(n.Pinning.DirectKeys(), "direct")
err := AddToResultKeys(n.Pinning.DirectKeys(), "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
if err != nil {
return err
}
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit)
var visitErr error
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
if r {
err := emit(&PinLsOutputWrapper{
RefKeyObject: RefKeyObject{
Type: "indirect",
Cid: c.String(),
},
})
if err != nil {
visitErr = err
}
}
return r
})

if visitErr != nil {
return visitErr
}
if err != nil {
return nil, err
return err
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
}

return keys, nil
return nil
}

// PinVerifyRes is the result returned for each pin checked in "pin verify"
Expand Down