diff --git a/core/commands/filestore.go b/core/commands/filestore.go index 53095c3dadb2..d684589c4842 100644 --- a/core/commands/filestore.go +++ b/core/commands/filestore.go @@ -1,17 +1,15 @@ package commands import ( - "context" "fmt" + "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/commands/cmdenv" + "github.com/ipfs/go-ipfs/filestore" "io" - core "github.com/ipfs/go-ipfs/core" - cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" - filestore "github.com/ipfs/go-ipfs/filestore" - - cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" - cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds" - cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" + "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" + "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds" + "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit" ) var FileStoreCmd = &cmds.Command{ @@ -56,11 +54,7 @@ The output is: } args := req.Arguments if len(args) > 0 { - out := perKeyActionToChan(req.Context, args, func(c cid.Cid) *filestore.ListRes { - return filestore.List(fs, c) - }) - - return res.Emit(out) + return listByArgs(res, fs, args) } fileOrder, _ := req.Options[fileOrderOptionName].(bool) @@ -69,8 +63,17 @@ The output is: return err } - out := listResToChan(req.Context, next) - return res.Emit(out) + for { + r := next() + if r == nil { + break + } + if err := res.Emit(r); err != nil { + return err + } + } + + return nil }, PostRun: cmds.PostRunMap{ cmds.CLI: streamResult(func(v interface{}, out io.Writer) nonFatalError { @@ -122,10 +125,7 @@ For ERROR entries the error will also be printed to stderr. } args := req.Arguments if len(args) > 0 { - out := perKeyActionToChan(req.Context, args, func(c cid.Cid) *filestore.ListRes { - return filestore.Verify(fs, c) - }) - return res.Emit(out) + return listByArgs(res, fs, args) } fileOrder, _ := req.Options[fileOrderOptionName].(bool) @@ -133,8 +133,18 @@ For ERROR entries the error will also be printed to stderr. if err != nil { return err } - out := listResToChan(req.Context, next) - return res.Emit(out) + + for { + r := next() + if r == nil { + break + } + if err := res.Emit(r); err != nil { + return err + } + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *filestore.ListRes) error { @@ -162,29 +172,19 @@ var dupsFileStore = &cmds.Command{ return err } - out := make(chan interface{}, 128) - - go func() { - defer close(out) - for cid := range ch { - have, err := fs.MainBlockstore().Has(cid) - if err != nil { - select { - case out <- &RefWrapper{Err: err.Error()}: - case <-req.Context.Done(): - } - return - } - if have { - select { - case out <- &RefWrapper{Ref: cid.String()}: - case <-req.Context.Done(): - return - } + for cid := range ch { + have, err := fs.MainBlockstore().Has(cid) + if err != nil { + return res.Emit(&RefWrapper{Err: err.Error()}) + } + if have { + if err := res.Emit(&RefWrapper{Ref: cid.String()}); err != nil { + return err } } - }() - return res.Emit(out) + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefWrapper) error { @@ -212,49 +212,24 @@ func getFilestore(env cmds.Environment) (*core.IpfsNode, *filestore.Filestore, e return n, fs, err } -func listResToChan(ctx context.Context, next func() *filestore.ListRes) <-chan interface{} { - out := make(chan interface{}, 128) - go func() { - defer close(out) - for { - r := next() - if r == nil { - return +func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error { + for _, arg := range args { + c, err := cid.Decode(arg) + if err != nil { + ret := &filestore.ListRes{ + Status: filestore.StatusOtherError, + ErrorMsg: fmt.Sprintf("%s: %v", arg, err), } - select { - case out <- r: - case <-ctx.Done(): - return + if err := res.Emit(ret); err != nil { + return err } + continue } - }() - return out -} - -func perKeyActionToChan(ctx context.Context, args []string, action func(cid.Cid) *filestore.ListRes) <-chan interface{} { - out := make(chan interface{}, 128) - go func() { - defer close(out) - for _, arg := range args { - c, err := cid.Decode(arg) - if err != nil { - select { - case out <- &filestore.ListRes{ - Status: filestore.StatusOtherError, - ErrorMsg: fmt.Sprintf("%s: %v", arg, err), - }: - case <-ctx.Done(): - } - - continue - } - r := action(c) - select { - case out <- r: - case <-ctx.Done(): - return - } + r := filestore.Verify(fs, c) + if err := res.Emit(r); err != nil { + return err } - }() - return out + } + + return nil }