diff --git a/changelog/unreleased/limit-concurrency.md b/changelog/unreleased/limit-concurrency.md new file mode 100644 index 0000000000..5f48ab1285 --- /dev/null +++ b/changelog/unreleased/limit-concurrency.md @@ -0,0 +1,5 @@ +Enhancement: Limit concurrency in decomposedfs + +The number of concurrent goroutines used for listing directories in decomposedfs are now limited to a configurable number. + +https://github.com/cs3org/reva/pull/3740 diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 74dd741576..b1cc166f4c 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -115,7 +115,7 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) ( return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend) } - tp := tree.New(o.Root, o.TreeTimeAccounting, o.TreeSizeAccounting, lu, bs) + tp := tree.New(lu, bs, o) permissionsClient, err := pool.GetPermissionsClient(o.PermissionsSVC, pool.WithTLSMode(o.PermTLSMode)) if err != nil { @@ -782,29 +782,69 @@ func (fs *Decomposedfs) ListFolder(ctx context.Context, ref *provider.Reference, return nil, errtypes.NotFound(f) } - var children []*node.Node - children, err = fs.tp.ListFolder(ctx, n) + children, err := fs.tp.ListFolder(ctx, n) if err != nil { return nil, err } - finfos := make([]*provider.ResourceInfo, len(children)) - eg, ctx := errgroup.WithContext(ctx) - for i := range children { - pos := i - eg.Go(func() error { - np := rp - // add this childs permissions - pset, _ := n.PermissionSet(ctx) - node.AddPermissions(&np, &pset) - ri, err := children[pos].AsResourceInfo(ctx, &np, mdKeys, fieldMask, utils.IsRelativeReference(ref)) - if err != nil { - return errtypes.InternalError(err.Error()) + + numWorkers := fs.o.MaxConcurrency + if len(children) < numWorkers { + numWorkers = len(children) + } + work := make(chan *node.Node, len(children)) + results := make(chan *provider.ResourceInfo, len(children)) + + g, ctx := errgroup.WithContext(ctx) + + // Distribute work + g.Go(func() error { + defer close(work) + for _, child := range children { + select { + case work <- child: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + + // Spawn workers that'll concurrently work the queue + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + for child := range work { + np := rp + // add this childs permissions + pset, _ := n.PermissionSet(ctx) + node.AddPermissions(&np, &pset) + ri, err := child.AsResourceInfo(ctx, &np, mdKeys, fieldMask, utils.IsRelativeReference(ref)) + if err != nil { + return errtypes.InternalError(err.Error()) + } + select { + case results <- ri: + case <-ctx.Done(): + return ctx.Err() + } } - finfos[pos] = ri return nil }) } - if err := eg.Wait(); err != nil { + + // Wait for things to settle down, then close results chan + go func() { + _ = g.Wait() // error is checked later + close(results) + }() + + finfos := make([]*provider.ResourceInfo, len(children)) + i := 0 + for fi := range results { + finfos[i] = fi + i++ + } + + if err := g.Wait(); err != nil { return nil, err } diff --git a/pkg/storage/utils/decomposedfs/options/options.go b/pkg/storage/utils/decomposedfs/options/options.go index 3b9e2007ed..bb4c8342e3 100644 --- a/pkg/storage/utils/decomposedfs/options/options.go +++ b/pkg/storage/utils/decomposedfs/options/options.go @@ -68,6 +68,7 @@ type Options struct { MaxAcquireLockCycles int `mapstructure:"max_acquire_lock_cycles"` LockCycleDurationFactor int `mapstructure:"lock_cycle_duration_factor"` + MaxConcurrency int `mapstructure:"max_concurrency"` MaxQuota uint64 `mapstructure:"max_quota"` } @@ -140,5 +141,9 @@ func New(m map[string]interface{}) (*Options, error) { } } + if o.MaxConcurrency <= 0 { + o.MaxConcurrency = 100 + } + return o, nil } diff --git a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go index 230a3d35af..25f9c7e02b 100644 --- a/pkg/storage/utils/decomposedfs/testhelpers/helpers.go +++ b/pkg/storage/utils/decomposedfs/testhelpers/helpers.go @@ -156,7 +156,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) { permissions := &mocks.PermissionsChecker{} cs3permissionsclient := &mocks.CS3PermissionsClient{} bs := &treemocks.Blobstore{} - tree := tree.New(o.Root, true, true, lu, bs) + tree := tree.New(lu, bs, o) fs, err := decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, cs3permissionsclient), tree, nil) if err != nil { return nil, err diff --git a/pkg/storage/utils/decomposedfs/tree/tree.go b/pkg/storage/utils/decomposedfs/tree/tree.go index 0fce22d50e..19de08ad4b 100644 --- a/pkg/storage/utils/decomposedfs/tree/tree.go +++ b/pkg/storage/utils/decomposedfs/tree/tree.go @@ -39,6 +39,7 @@ import ( "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options" "github.com/cs3org/reva/v2/pkg/storage/utils/filelocks" "github.com/cs3org/reva/v2/pkg/utils" "github.com/google/uuid" @@ -76,22 +77,18 @@ type Tree struct { lookup PathLookup blobstore Blobstore - root string - treeSizeAccounting bool - treeTimeAccounting bool + options *options.Options } // PermissionCheckFunc defined a function used to check resource permissions type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool // New returns a new instance of Tree -func New(root string, tta bool, tsa bool, lu PathLookup, bs Blobstore) *Tree { +func New(lu PathLookup, bs Blobstore, o *options.Options) *Tree { return &Tree{ - lookup: lu, - blobstore: bs, - root: root, - treeTimeAccounting: tta, - treeSizeAccounting: tsa, + lookup: lu, + blobstore: bs, + options: o, } } @@ -99,10 +96,10 @@ func New(root string, tta bool, tsa bool, lu PathLookup, bs Blobstore) *Tree { func (t *Tree) Setup() error { // create data paths for internal layout dataPaths := []string{ - filepath.Join(t.root, "spaces"), + filepath.Join(t.options.Root, "spaces"), // notes contain symlinks from nodes//uploads/ to ../../uploads/ // better to keep uploads on a fast / volatile storage before a workflow finally moves them to the nodes dir - filepath.Join(t.root, "uploads"), + filepath.Join(t.options.Root, "uploads"), } for _, v := range dataPaths { err := os.MkdirAll(v, 0700) @@ -328,42 +325,73 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro if err != nil { return nil, err } - nodes := make([]*node.Node, len(names)) - eg, ctx := errgroup.WithContext(ctx) - for i := range names { - pos := i - eg.Go(func() error { - nodeID, err := readChildNodeFromLink(filepath.Join(dir, names[pos])) - if err != nil { - return err - } + numWorkers := t.options.MaxConcurrency + if len(names) < numWorkers { + numWorkers = len(names) + } + work := make(chan string) + results := make(chan *node.Node) - child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n, true) - if err != nil { - return err + g, ctx := errgroup.WithContext(ctx) + + // Distribute work + g.Go(func() error { + defer close(work) + for _, name := range names { + select { + case work <- name: + case <-ctx.Done(): + return ctx.Err() } + } + return nil + }) + + // Spawn workers that'll concurrently work the queue + for i := 0; i < numWorkers; i++ { + g.Go(func() error { + for name := range work { + nodeID, err := readChildNodeFromLink(filepath.Join(dir, name)) + if err != nil { + return err + } - // prevent listing denied resources - if !child.IsDenied(ctx) { - if child.SpaceRoot == nil { - child.SpaceRoot = n.SpaceRoot + child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n, true) + if err != nil { + return err + } + + // prevent listing denied resources + if !child.IsDenied(ctx) { + if child.SpaceRoot == nil { + child.SpaceRoot = n.SpaceRoot + } + select { + case results <- child: + case <-ctx.Done(): + return ctx.Err() + } } - nodes[pos] = child } return nil }) } - if err := eg.Wait(); err != nil { - return nil, err - } + // Wait for things to settle down, then close results chan + go func() { + _ = g.Wait() // error is checked later + close(results) + }() retNodes := []*node.Node{} - for _, n := range nodes { - if n != nil { - retNodes = append(retNodes, n) - } + for n := range results { + retNodes = append(retNodes, n) + } + + if err := g.Wait(); err != nil { + return nil, err } + return retNodes, nil } @@ -402,7 +430,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) { deletionTime := time.Now().UTC().Format(time.RFC3339Nano) // Prepare the trash - trashLink := filepath.Join(t.root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2)) + trashLink := filepath.Join(t.options.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2)) if err := os.MkdirAll(filepath.Dir(trashLink), 0700); err != nil { // Roll back changes _ = n.RemoveXattr(prefixes.TrashOriginAttr) @@ -642,7 +670,7 @@ func (t *Tree) removeNode(path string, n *node.Node) error { // Propagate propagates changes to the root of the tree func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err error) { sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger() - if !t.treeTimeAccounting && !t.treeSizeAccounting { + if !t.options.TreeTimeAccounting && !t.options.TreeSizeAccounting { // no propagation enabled sublog.Debug().Msg("propagation disabled") return @@ -671,7 +699,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err return nil } - if t.treeTimeAccounting || (t.treeSizeAccounting && sizeDiff != 0) { + if t.options.TreeTimeAccounting || (t.options.TreeSizeAccounting && sizeDiff != 0) { attrs := node.Attributes{} var f *lockedfile.File @@ -696,7 +724,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err } }() - if t.treeTimeAccounting { + if t.options.TreeTimeAccounting { // update the parent tree time if it is older than the nodes mtime updateSyncTime := false @@ -731,7 +759,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err } // size accounting - if t.treeSizeAccounting && sizeDiff != 0 { + if t.options.TreeSizeAccounting && sizeDiff != 0 { var newSize uint64 // read treesize diff --git a/pkg/storage/utils/decomposedfs/upload_async_test.go b/pkg/storage/utils/decomposedfs/upload_async_test.go index 7d335b8b95..6cef81838d 100644 --- a/pkg/storage/utils/decomposedfs/upload_async_test.go +++ b/pkg/storage/utils/decomposedfs/upload_async_test.go @@ -105,7 +105,7 @@ var _ = Describe("Async file uploads", Ordered, func() { // setup fs pub, con = make(chan interface{}), make(chan interface{}) - tree := tree.New(o.Root, true, true, lu, bs) + tree := tree.New(lu, bs, o) fs, err = New(o, lu, NewPermissions(permissions, cs3permissionsclient), tree, stream.Chan{pub, con}) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/storage/utils/decomposedfs/upload_test.go b/pkg/storage/utils/decomposedfs/upload_test.go index 84480c2ca7..1102c63690 100644 --- a/pkg/storage/utils/decomposedfs/upload_test.go +++ b/pkg/storage/utils/decomposedfs/upload_test.go @@ -119,7 +119,7 @@ var _ = Describe("File uploads", func() { AddGrant: true, }, nil).Times(1) var err error - tree := tree.New(o.Root, true, true, lu, bs) + tree := tree.New(lu, bs, o) fs, err = decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, cs3permissionsclient), tree, nil) Expect(err).ToNot(HaveOccurred())