Skip to content

Commit

Permalink
commands/filestore: use res.Emit directly
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Overbool <overbool.xu@gmail.com>
  • Loading branch information
overbool committed Oct 28, 2018
1 parent f0ea3ef commit 672dfd0
Showing 1 changed file with 58 additions and 83 deletions.
141 changes: 58 additions & 83 deletions core/commands/filestore.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package commands

import (
"context"
"fmt"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs/filestore"
"io"

core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
filestore "github.com/ipfs/go-ipfs/filestore"

cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
cmds "gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds"
cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
"gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
"gx/ipfs/QmSXUokcP4TJpFfqozT69AVAYRtzXVMUjzQVkYX41R9Svs/go-ipfs-cmds"
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
)

var FileStoreCmd = &cmds.Command{
Expand Down Expand Up @@ -56,11 +54,7 @@ The output is:
}
args := req.Arguments
if len(args) > 0 {
out := perKeyActionToChan(req.Context, args, func(c cid.Cid) *filestore.ListRes {
return filestore.List(fs, c)
})

return res.Emit(out)
return listByArgs(res, fs, args)
}

fileOrder, _ := req.Options[fileOrderOptionName].(bool)
Expand All @@ -69,8 +63,17 @@ The output is:
return err
}

out := listResToChan(req.Context, next)
return res.Emit(out)
for {
r := next()
if r == nil {
break
}
if err := res.Emit(r); err != nil {
return err
}
}

return nil
},
PostRun: cmds.PostRunMap{
cmds.CLI: streamResult(func(v interface{}, out io.Writer) nonFatalError {
Expand Down Expand Up @@ -122,19 +125,26 @@ For ERROR entries the error will also be printed to stderr.
}
args := req.Arguments
if len(args) > 0 {
out := perKeyActionToChan(req.Context, args, func(c cid.Cid) *filestore.ListRes {
return filestore.Verify(fs, c)
})
return res.Emit(out)
return listByArgs(res, fs, args)
}

fileOrder, _ := req.Options[fileOrderOptionName].(bool)
next, err := filestore.VerifyAll(fs, fileOrder)
if err != nil {
return err
}
out := listResToChan(req.Context, next)
return res.Emit(out)

for {
r := next()
if r == nil {
break
}
if err := res.Emit(r); err != nil {
return err
}
}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *filestore.ListRes) error {
Expand Down Expand Up @@ -162,29 +172,19 @@ var dupsFileStore = &cmds.Command{
return err
}

out := make(chan interface{}, 128)

go func() {
defer close(out)
for cid := range ch {
have, err := fs.MainBlockstore().Has(cid)
if err != nil {
select {
case out <- &RefWrapper{Err: err.Error()}:
case <-req.Context.Done():
}
return
}
if have {
select {
case out <- &RefWrapper{Ref: cid.String()}:
case <-req.Context.Done():
return
}
for cid := range ch {
have, err := fs.MainBlockstore().Has(cid)
if err != nil {
return res.Emit(&RefWrapper{Err: err.Error()})
}
if have {
if err := res.Emit(&RefWrapper{Ref: cid.String()}); err != nil {
return err
}
}
}()
return res.Emit(out)
}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefWrapper) error {
Expand Down Expand Up @@ -212,49 +212,24 @@ func getFilestore(env cmds.Environment) (*core.IpfsNode, *filestore.Filestore, e
return n, fs, err
}

func listResToChan(ctx context.Context, next func() *filestore.ListRes) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for {
r := next()
if r == nil {
return
func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error {
for _, arg := range args {
c, err := cid.Decode(arg)
if err != nil {
ret := &filestore.ListRes{
Status: filestore.StatusOtherError,
ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
}
select {
case out <- r:
case <-ctx.Done():
return
if err := res.Emit(ret); err != nil {
return err
}
continue
}
}()
return out
}

func perKeyActionToChan(ctx context.Context, args []string, action func(cid.Cid) *filestore.ListRes) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for _, arg := range args {
c, err := cid.Decode(arg)
if err != nil {
select {
case out <- &filestore.ListRes{
Status: filestore.StatusOtherError,
ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
}:
case <-ctx.Done():
}

continue
}
r := action(c)
select {
case out <- r:
case <-ctx.Done():
return
}
r := filestore.Verify(fs, c)
if err := res.Emit(r); err != nil {
return err
}
}()
return out
}

return nil
}

0 comments on commit 672dfd0

Please sign in to comment.