Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle tashbin file listing concurrently #4374

Merged
merged 4 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelog/unreleased/concurrent-trashbin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Enhancement: Handle trashbin file listings concurrently

We now use a concurrent walker to list files in the trashbin. This
improves performance when listing files in the trashbin.

https://github.com/cs3org/reva/pull/4374
https://github.com/owncloud/ocis/issues/7844
147 changes: 97 additions & 50 deletions pkg/storage/utils/decomposedfs/recycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
Expand All @@ -35,7 +38,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/pkg/errors"
)

// Recycle items are stored inside the node folder and start with the uuid of the deleted node.
Expand Down Expand Up @@ -214,66 +216,111 @@ func readTrashLink(path string) (string, string, string, error) {

func (fs *Decomposedfs) listTrashRoot(ctx context.Context, spaceID string) ([]*provider.RecycleItem, error) {
log := appctx.GetLogger(ctx)
items := make([]*provider.RecycleItem, 0)

trashRoot := fs.getRecycleRoot(spaceID)
matches, err := filepath.Glob(trashRoot + "/*/*/*/*/*")

subTrees, err := filepath.Glob(trashRoot + "/*")
if err != nil {
return nil, err
}

for _, itemPath := range matches {
nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping")
continue
}
numWorkers := fs.o.MaxConcurrency
if len(subTrees) < numWorkers {
numWorkers = len(subTrees)
}

md, err := os.Stat(nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping")
continue
}
work := make(chan string, len(subTrees))
results := make(chan *provider.RecycleItem, len(subTrees))

attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
continue
}
g, ctx := errgroup.WithContext(ctx)

nodeType := fs.lu.TypeFromPath(ctx, nodePath)
if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
continue
}

item := &provider.RecycleItem{
Type: nodeType,
Size: uint64(md.Size()),
Key: nodeID,
}
if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil {
item.DeletionTime = &types.Timestamp{
Seconds: uint64(deletionTime.Unix()),
// TODO nanos
// Distribute work
g.Go(func() error {
defer close(work)
for _, itemPath := range subTrees {
select {
case work <- itemPath:
case <-ctx.Done():
return ctx.Err()
}
} else {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring")
}
return nil
})

// Spawn workers that'll concurrently work the queue
for i := 0; i < numWorkers; i++ {
g.Go(func() error {
for subTree := range work {
matches, err := filepath.Glob(subTree + "/*/*/*/*")
if err != nil {
return err
}

for _, itemPath := range matches {
nodePath, nodeID, timeSuffix, err := readTrashLink(itemPath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Msg("error reading trash link, skipping")
continue
}

md, err := os.Stat(nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not stat trash item, skipping")
continue
}

attrs, err := fs.lu.MetadataBackend().All(ctx, nodePath)
if err != nil {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("could not get extended attributes, skipping")
continue
}

nodeType := fs.lu.TypeFromPath(ctx, nodePath)
if nodeType == provider.ResourceType_RESOURCE_TYPE_INVALID {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node_path", nodePath).Msg("invalid node type, skipping")
continue
}

item := &provider.RecycleItem{
Type: nodeType,
Size: uint64(md.Size()),
Key: nodeID,
}
if deletionTime, err := time.Parse(time.RFC3339Nano, timeSuffix); err == nil {
item.DeletionTime = &types.Timestamp{
Seconds: uint64(deletionTime.Unix()),
// TODO nanos
}
} else {
log.Error().Err(err).Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not parse time format, ignoring")
}

// lookup origin path in extended attributes
if attr, ok := attrs[prefixes.TrashOriginAttr]; ok {
item.Ref = &provider.Reference{Path: string(attr)}
} else {
log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path")
}
select {
case results <- item:
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil
})
}

// lookup origin path in extended attributes
if attr, ok := attrs[prefixes.TrashOriginAttr]; ok {
item.Ref = &provider.Reference{Path: string(attr)}
} else {
log.Error().Str("trashRoot", trashRoot).Str("item", itemPath).Str("node", nodeID).Str("dtime", timeSuffix).Msg("could not read origin path, skipping")
continue
}
// TODO filter results by permission ... on the original parent? or the trashed node?
// if it were on the original parent it would be possible to see files that were trashed before the current user got access
// so -> check the trash node itself
// hmm listing trash currently lists the current users trash or the 'root' trash. from ocs only the home storage is queried for trash items.
// for now we can only really check if the current user is the owner
items = append(items, item)
// Wait for things to settle down, then close results chan
go func() {
_ = g.Wait() // error is checked later
close(results)
}()

// Collect results
items := []*provider.RecycleItem{}
for ri := range results {
items = append(items, ri)
}
return items, nil
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/storage/utils/decomposedfs/recycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,10 @@ var _ = Describe("Recycle", func() {
itemsA, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/")
Expect(err).ToNot(HaveOccurred())
Expect(len(itemsA)).To(Equal(2))

itemsB, err := env.Fs.ListRecycle(env.Ctx, &provider.Reference{ResourceId: env.SpaceRootRes}, "", "/")
Expect(err).ToNot(HaveOccurred())
Expect(len(itemsB)).To(Equal(2))

Expect(itemsA).To(Equal(itemsB))
Expect(itemsA).To(ConsistOf(itemsB))
})

It("they can be permanently deleted by the other user", func() {
Expand Down
Loading