Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit the number of concurrent goroutines for listing directories #3740

Merged
merged 4 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/limit-concurrency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Limit concurrency in decomposedfs

The number of concurrent goroutines used for listing directories in decomposedfs is now limited to a configurable number.

https://github.com/cs3org/reva/pull/3740
74 changes: 57 additions & 17 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream) (
return nil, fmt.Errorf("unknown metadata backend %s, only 'messagepack' or 'xattrs' (default) supported", o.MetadataBackend)
}

tp := tree.New(o.Root, o.TreeTimeAccounting, o.TreeSizeAccounting, lu, bs)
tp := tree.New(lu, bs, o)

permissionsClient, err := pool.GetPermissionsClient(o.PermissionsSVC, pool.WithTLSMode(o.PermTLSMode))
if err != nil {
Expand Down Expand Up @@ -782,29 +782,69 @@ func (fs *Decomposedfs) ListFolder(ctx context.Context, ref *provider.Reference,
return nil, errtypes.NotFound(f)
}

var children []*node.Node
children, err = fs.tp.ListFolder(ctx, n)
children, err := fs.tp.ListFolder(ctx, n)
if err != nil {
return nil, err
}
finfos := make([]*provider.ResourceInfo, len(children))
eg, ctx := errgroup.WithContext(ctx)
for i := range children {
pos := i
eg.Go(func() error {
np := rp
// add this childs permissions
pset, _ := n.PermissionSet(ctx)
node.AddPermissions(&np, &pset)
ri, err := children[pos].AsResourceInfo(ctx, &np, mdKeys, fieldMask, utils.IsRelativeReference(ref))
if err != nil {
return errtypes.InternalError(err.Error())

numWorkers := fs.o.MaxConcurrency
if len(children) < numWorkers {
numWorkers = len(children)
}
work := make(chan *node.Node, len(children))
results := make(chan *provider.ResourceInfo, len(children))

g, ctx := errgroup.WithContext(ctx)

// Distribute work
g.Go(func() error {
defer close(work)
for _, child := range children {
select {
case work <- child:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})

// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for child := range work {
np := rp
// add this childs permissions
pset, _ := n.PermissionSet(ctx)
node.AddPermissions(&np, &pset)
ri, err := child.AsResourceInfo(ctx, &np, mdKeys, fieldMask, utils.IsRelativeReference(ref))
if err != nil {
return errtypes.InternalError(err.Error())
}
select {
case results <- ri:
case <-ctx.Done():
return ctx.Err()
}
}
finfos[pos] = ri
return nil
})
}
if err := eg.Wait(); err != nil {

// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()

finfos := make([]*provider.ResourceInfo, len(children))
i := 0
for fi := range results {
finfos[i] = fi
i++
}

if err := g.Wait(); err != nil {
return nil, err
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/utils/decomposedfs/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Options struct {

MaxAcquireLockCycles int `mapstructure:"max_acquire_lock_cycles"`
LockCycleDurationFactor int `mapstructure:"lock_cycle_duration_factor"`
MaxConcurrency int `mapstructure:"max_concurrency"`

MaxQuota uint64 `mapstructure:"max_quota"`
}
Expand Down Expand Up @@ -140,5 +141,9 @@ func New(m map[string]interface{}) (*Options, error) {
}
}

if o.MaxConcurrency <= 0 {
o.MaxConcurrency = 100
}

return o, nil
}
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/testhelpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func NewTestEnv(config map[string]interface{}) (*TestEnv, error) {
permissions := &mocks.PermissionsChecker{}
cs3permissionsclient := &mocks.CS3PermissionsClient{}
bs := &treemocks.Blobstore{}
tree := tree.New(o.Root, true, true, lu, bs)
tree := tree.New(lu, bs, o)
fs, err := decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, cs3permissionsclient), tree, nil)
if err != nil {
return nil, err
Expand Down
108 changes: 68 additions & 40 deletions pkg/storage/utils/decomposedfs/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/options"
"github.com/cs3org/reva/v2/pkg/storage/utils/filelocks"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/google/uuid"
Expand Down Expand Up @@ -76,33 +77,29 @@ type Tree struct {
lookup PathLookup
blobstore Blobstore

root string
treeSizeAccounting bool
treeTimeAccounting bool
options *options.Options
}

// PermissionCheckFunc defines the signature of a function used to check
// resource permissions. It receives a pointer to a set of resource
// permissions and returns a bool.
// NOTE(review): presumably true means the check passed — confirm against callers.
type PermissionCheckFunc func(rp *provider.ResourcePermissions) bool

// New returns a new instance of Tree
func New(root string, tta bool, tsa bool, lu PathLookup, bs Blobstore) *Tree {
func New(lu PathLookup, bs Blobstore, o *options.Options) *Tree {
return &Tree{
lookup: lu,
blobstore: bs,
root: root,
treeTimeAccounting: tta,
treeSizeAccounting: tsa,
lookup: lu,
blobstore: bs,
options: o,
}
}

// Setup prepares the tree structure
func (t *Tree) Setup() error {
// create data paths for internal layout
dataPaths := []string{
filepath.Join(t.root, "spaces"),
filepath.Join(t.options.Root, "spaces"),
// notes contain symlinks from nodes/<u-u-i-d>/uploads/<uploadid> to ../../uploads/<uploadid>
// better to keep uploads on a fast / volatile storage before a workflow finally moves them to the nodes dir
filepath.Join(t.root, "uploads"),
filepath.Join(t.options.Root, "uploads"),
}
for _, v := range dataPaths {
err := os.MkdirAll(v, 0700)
Expand Down Expand Up @@ -328,42 +325,73 @@ func (t *Tree) ListFolder(ctx context.Context, n *node.Node) ([]*node.Node, erro
if err != nil {
return nil, err
}
nodes := make([]*node.Node, len(names))

eg, ctx := errgroup.WithContext(ctx)
for i := range names {
pos := i
eg.Go(func() error {
nodeID, err := readChildNodeFromLink(filepath.Join(dir, names[pos]))
if err != nil {
return err
}
numWorkers := t.options.MaxConcurrency
if len(names) < numWorkers {
numWorkers = len(names)
}
work := make(chan string)
results := make(chan *node.Node)

child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n, true)
if err != nil {
return err
g, ctx := errgroup.WithContext(ctx)

// Distribute work
g.Go(func() error {
defer close(work)
for _, name := range names {
select {
case work <- name:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
})

// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for name := range work {
nodeID, err := readChildNodeFromLink(filepath.Join(dir, name))
if err != nil {
return err
}

// prevent listing denied resources
if !child.IsDenied(ctx) {
if child.SpaceRoot == nil {
child.SpaceRoot = n.SpaceRoot
child, err := node.ReadNode(ctx, t.lookup, n.SpaceID, nodeID, false, n, true)
if err != nil {
return err
}

// prevent listing denied resources
if !child.IsDenied(ctx) {
if child.SpaceRoot == nil {
child.SpaceRoot = n.SpaceRoot
}
select {
case results <- child:
case <-ctx.Done():
return ctx.Err()
}
}
nodes[pos] = child
}
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()

retNodes := []*node.Node{}
for _, n := range nodes {
if n != nil {
retNodes = append(retNodes, n)
}
for n := range results {
retNodes = append(retNodes, n)
}

if err := g.Wait(); err != nil {
return nil, err
}

return retNodes, nil
}

Expand Down Expand Up @@ -402,7 +430,7 @@ func (t *Tree) Delete(ctx context.Context, n *node.Node) (err error) {
deletionTime := time.Now().UTC().Format(time.RFC3339Nano)

// Prepare the trash
trashLink := filepath.Join(t.root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2))
trashLink := filepath.Join(t.options.Root, "spaces", lookup.Pathify(n.SpaceRoot.ID, 1, 2), "trash", lookup.Pathify(n.ID, 4, 2))
if err := os.MkdirAll(filepath.Dir(trashLink), 0700); err != nil {
// Roll back changes
_ = n.RemoveXattr(prefixes.TrashOriginAttr)
Expand Down Expand Up @@ -642,7 +670,7 @@ func (t *Tree) removeNode(path string, n *node.Node) error {
// Propagate propagates changes to the root of the tree
func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err error) {
sublog := appctx.GetLogger(ctx).With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()
if !t.treeTimeAccounting && !t.treeSizeAccounting {
if !t.options.TreeTimeAccounting && !t.options.TreeSizeAccounting {
// no propagation enabled
sublog.Debug().Msg("propagation disabled")
return
Expand Down Expand Up @@ -671,7 +699,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
return nil
}

if t.treeTimeAccounting || (t.treeSizeAccounting && sizeDiff != 0) {
if t.options.TreeTimeAccounting || (t.options.TreeSizeAccounting && sizeDiff != 0) {
attrs := node.Attributes{}

var f *lockedfile.File
Expand All @@ -696,7 +724,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
}
}()

if t.treeTimeAccounting {
if t.options.TreeTimeAccounting {
// update the parent tree time if it is older than the nodes mtime
updateSyncTime := false

Expand Down Expand Up @@ -731,7 +759,7 @@ func (t *Tree) Propagate(ctx context.Context, n *node.Node, sizeDiff int64) (err
}

// size accounting
if t.treeSizeAccounting && sizeDiff != 0 {
if t.options.TreeSizeAccounting && sizeDiff != 0 {
var newSize uint64

// read treesize
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/upload_async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ var _ = Describe("Async file uploads", Ordered, func() {

// setup fs
pub, con = make(chan interface{}), make(chan interface{})
tree := tree.New(o.Root, true, true, lu, bs)
tree := tree.New(lu, bs, o)
fs, err = New(o, lu, NewPermissions(permissions, cs3permissionsclient), tree, stream.Chan{pub, con})
Expect(err).ToNot(HaveOccurred())

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils/decomposedfs/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ var _ = Describe("File uploads", func() {
AddGrant: true,
}, nil).Times(1)
var err error
tree := tree.New(o.Root, true, true, lu, bs)
tree := tree.New(lu, bs, o)
fs, err = decomposedfs.New(o, lu, decomposedfs.NewPermissions(permissions, cs3permissionsclient), tree, nil)
Expect(err).ToNot(HaveOccurred())

Expand Down