Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pin cmd: stream recursive pins #6493

Merged
merged 6 commits into from
Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ The JSON output contains type information.
cmds.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."),
cmds.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true),
cmds.BoolOption(lsSizeOptionName, "Resolve linked objects to find out their file size.").WithDefault(true),
cmds.BoolOption(lsStreamOptionName, "s", "Enable exprimental streaming of directory entries as they are traversed."),
cmds.BoolOption(lsStreamOptionName, "s", "Enable experimental streaming of directory entries as they are traversed."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env, req)
Expand Down
267 changes: 172 additions & 95 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,9 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
}

const (
pinTypeOptionName = "type"
pinQuietOptionName = "quiet"
pinTypeOptionName = "type"
pinQuietOptionName = "quiet"
pinStreamOptionName = "stream"
)

var listPinCmd = &cmds.Command{
Expand Down Expand Up @@ -313,6 +314,7 @@ Example:
Options: []cmds.Option{
cmds.StringOption(pinTypeOptionName, "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\".").WithDefault("all"),
cmds.BoolOption(pinQuietOptionName, "q", "Write just hashes of objects."),
cmds.BoolOption(pinStreamOptionName, "s", "Enable streaming of pins as they are discovered."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
Expand All @@ -326,9 +328,7 @@ Example:
}

typeStr, _ := req.Options[pinTypeOptionName].(string)
if err != nil {
return err
}
stream, _ := req.Options[pinStreamOptionName].(bool)

switch typeStr {
case "all", "direct", "indirect", "recursive":
Expand All @@ -337,34 +337,50 @@ Example:
return err
}

enc, err := cmdenv.GetCidEncoder(req)
MichaelMure marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
// For backward compatibility, we accumulate the pins in the same output type as before.
emit := res.Emit
lgcList := map[string]PinLsType{}
if !stream {
emit = func(v interface{}) error {
obj := v.(*PinLsOutputWrapper)
lgcList[obj.PinLsObject.Cid] = PinLsType{Type: obj.PinLsObject.Type}
return nil
}
}

var keys map[cid.Cid]RefKeyObject
if len(req.Arguments) > 0 {
keys, err = pinLsKeys(req.Context, req.Arguments, typeStr, n, api)
err = pinLsKeys(req, typeStr, n, api, emit)
} else {
keys, err = pinLsAll(req.Context, typeStr, n)
err = pinLsAll(req, typeStr, n, emit)
}
if err != nil {
return err
}

refKeys := make(map[string]RefKeyObject, len(keys))
for k, v := range keys {
refKeys[enc.Encode(k)] = v
MichaelMure marked this conversation as resolved.
Show resolved Hide resolved
if !stream {
return cmds.EmitOnce(res, &PinLsOutputWrapper{
PinLsList: PinLsList{Keys: lgcList},
})
}

return cmds.EmitOnce(res, &RefKeyList{Keys: refKeys})
return nil
},
Type: RefKeyList{},
Type: &PinLsOutputWrapper{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefKeyList) error {
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
quiet, _ := req.Options[pinQuietOptionName].(bool)
stream, _ := req.Options[pinStreamOptionName].(bool)

if stream {
if quiet {
fmt.Fprintf(w, "%s\n", out.PinLsObject.Cid)
} else {
fmt.Fprintf(w, "%s %s\n", out.PinLsObject.Cid, out.PinLsObject.Type)
}
return nil
}

for k, v := range out.Keys {
for k, v := range out.PinLsList.Keys {
if quiet {
fmt.Fprintf(w, "%s\n", k)
} else {
Expand All @@ -377,6 +393,144 @@ Example:
},
}

// PinLsOutputWrapper is the output type of the pin ls command.
// Pin ls needs to output two different type depending on if it's streamed or not.
// We use this to bypass the cmds lib refusing to have interface{}
type PinLsOutputWrapper struct {
PinLsList
PinLsObject
}

// PinLsList is a set of pins with their type
type PinLsList struct {
Keys map[string]PinLsType `json:",omitempty"`
}

// PinLsType contains the type of a pin
type PinLsType struct {
Type string
}

// PinLsObject contains the description of a pin
type PinLsObject struct {
Cid string `json:",omitempty"`
Type string `json:",omitempty"`
}

func pinLsKeys(req *cmds.Request, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI, emit func(value interface{}) error) error {
mode, ok := pin.StringToMode(typeStr)
if !ok {
return fmt.Errorf("invalid pin mode '%s'", typeStr)
}

enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
}

for _, p := range req.Arguments {
c, err := api.ResolvePath(req.Context, path.New(p))
if err != nil {
return err
}

pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
if err != nil {
return err
}

if !pinned {
return fmt.Errorf("path '%s' is not pinned", p)
}

switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}

err = emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: pinType,
Cid: enc.Encode(c.Cid()),
},
})
if err != nil {
return err
}
}

return nil
}

func pinLsAll(req *cmds.Request, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
}

keys := cid.NewSet()

AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
if keys.Visit(c) {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: typeStr,
Cid: enc.Encode(c),
},
})
if err != nil {
return err
}
}
}
return nil
}

if typeStr == "direct" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.DirectKeys(), "direct")
if err != nil {
return err
}
}
if typeStr == "recursive" || typeStr == "all" {
err := AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
if err != nil {
return err
}
}
if typeStr == "indirect" || typeStr == "all" {
for _, k := range n.Pinning.RecursiveKeys() {
var visitErr error
err := dag.EnumerateChildren(req.Context, dag.GetLinksWithDAG(n.DAG), k, func(c cid.Cid) bool {
r := keys.Visit(c)
if r {
err := emit(&PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: "indirect",
Cid: enc.Encode(c),
},
})
if err != nil {
visitErr = err
}
}
return r
})

if visitErr != nil {
return visitErr
}
if err != nil {
return err
}
}
}

return nil
}

const (
pinUnpinOptionName = "unpin"
)
Expand Down Expand Up @@ -491,83 +645,6 @@ var verifyPinCmd = &cmds.Command{
},
}

type RefKeyObject struct {
Type string
}

type RefKeyList struct {
Keys map[string]RefKeyObject
}

func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode, api coreiface.CoreAPI) (map[cid.Cid]RefKeyObject, error) {

mode, ok := pin.StringToMode(typeStr)
if !ok {
return nil, fmt.Errorf("invalid pin mode '%s'", typeStr)
}

keys := make(map[cid.Cid]RefKeyObject)

for _, p := range args {
c, err := api.ResolvePath(ctx, path.New(p))
if err != nil {
return nil, err
}

pinType, pinned, err := n.Pinning.IsPinnedWithType(c.Cid(), mode)
if err != nil {
return nil, err
}

if !pinned {
return nil, fmt.Errorf("path '%s' is not pinned", p)
}

switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}
keys[c.Cid()] = RefKeyObject{
Type: pinType,
}
}

return keys, nil
}

func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[cid.Cid]RefKeyObject, error) {

keys := make(map[cid.Cid]RefKeyObject)

AddToResultKeys := func(keyList []cid.Cid, typeStr string) {
for _, c := range keyList {
keys[c] = RefKeyObject{
Type: typeStr,
}
}
}

if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(n.Pinning.DirectKeys(), "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit)
if err != nil {
return nil, err
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
}

return keys, nil
}

// PinVerifyRes is the result returned for each pin checked in "pin verify"
type PinVerifyRes struct {
Cid string
Expand Down