Skip to content

Commit

Permalink
cephfs: initial implementation of recycle bin functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
glpatcern committed Jun 5, 2024
1 parent d0695a4 commit acab3ea
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 11 deletions.
129 changes: 118 additions & 11 deletions pkg/storage/fs/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/cs3org/reva/pkg/utils"
"github.com/cs3org/reva/pkg/utils/cfg"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -171,6 +172,21 @@ func (fs *cephfs) CreateDir(ctx context.Context, ref *provider.Reference) error
return getRevaError(err)
}

func getRecycleTargetFromPath(path string, recyclePath string, recyclePathDepth int) (string, error) {
// Tokenize the given (absolute) path
components := strings.Split(filepath.Clean(filepath.Separator+path), string(filepath.Separator))
if recyclePathDepth > len(components)-1 {
return "", errors.New("path is too short")
}

// And construct the target by injecting the recyclePath at the required depth
var target []string = []string{filepath.Separator}
target = append(target, components[:recyclePathDepth+1]...)
target = append(target, recyclePath, time.Now().Format("2006/01/02"))
target = append(target, components[recyclePathDepth+1:]...)
return filepath.Join(target...), nil
}

func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err error) {
var path string
user := fs.makeUser(ctx)
Expand All @@ -180,10 +196,17 @@ func (fs *cephfs) Delete(ctx context.Context, ref *provider.Reference) (err erro
}

user.op(func(cv *cacheVal) {
if err = cv.mount.Unlink(path); err != nil && err.Error() == errIsADirectory {
err = cv.mount.RemoveDir(path)
if fs.c.RecyclePath != "" {
// Recycle bin is configured, move to recycle as opposed to unlink
targetPath, err := getRecycleTargetFromPath(path, fs.c.RecyclePath, fs.c.RecyclePathDepth)
if err == nil {
err = cv.mount.Rename(path, targetPath)
}
} else {
if err = cv.mount.Unlink(path); err != nil && err.Error() == errIsADirectory {
err = cv.mount.RemoveDir(path)
}
}

//TODO(tmourati): Add entry id logic
})

Expand Down Expand Up @@ -597,24 +620,108 @@ func (fs *cephfs) TouchFile(ctx context.Context, ref *provider.Reference) error
return getRevaError(err)
}

func (fs *cephfs) EmptyRecycle(ctx context.Context) error {
return errtypes.NotSupported("unimplemented")
}
func (fs *cephfs) listDeletedEntries(ctx context.Context, maxentries int, basePath string, from, to time.Time) ([]*provider.RecycleItem, error) {
deleted := []*provider.RecycleItem{}
count := 0
rootRecyclePath := filePath.Join(basePath, fs.conf.RecyclePath)
for d := to; !d.Before(from); d = d.AddDate(0, 0, -1) {

func (fs *cephfs) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (r *provider.CreateStorageSpaceResponse, err error) {
return nil, errtypes.NotSupported("unimplemented")
user.op(func(cv *cacheVal) {
var dir *goceph.Directory
if dir, err = cv.mount.OpenDir(filepath.Join(rootRecyclePath, d.Format("2006/01/02"))); err != nil {
return
}
defer closeDir(dir)

var entry *goceph.DirEntryPlus
for entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0); entry != nil && err == nil; entry, err = dir.ReadDirPlus(goceph.StatxBasicStats, 0) {
//TODO(lopresti) validate content of entry.Name() here.
targetPath := filepath.Join(basePath, entry.Name())
deleted = append(deleted, &provider.RecycleItem{
Ref: &provider.Reference{Path: targetPath},
Key: filepath.Join(rootRecyclePath, targetPath),
Size: entry.size,
DeletionTime: &types.Timestamp{Seconds: entry.mtime_sec},
})

count += 1
if count > maxentries {
err = errtypes.BadRequest("list too long")
return
}
}
})
}
return deleted, err
}

func (fs *cephfs) ListRecycle(ctx context.Context, basePath, key, relativePath string, from, to *typepb.Timestamp) ([]*provider.RecycleItem, error) {
return nil, errtypes.NotSupported("unimplemented")
md, err := fs.GetMD(ctx, &provider.Reference{Path: basePath}, nil)
if err != nil {
return nil, err
}
if !md.PermissionSet.ListRecycle {
return nil, errtypes.PermissionDenied("cephfs: user doesn't have permissions to restore recycled items")
}

var dateFrom, dateTo time.Time
if from != nil && to != nil {
dateFrom = time.Unix(int64(from.Seconds), 0)
dateTo = time.Unix(int64(to.Seconds), 0)
if dateFrom.AddDate(0, 0, fs.c.MaxDaysInRecycleList).Before(dateTo) {
return nil, errtypes.BadRequest("cephfs: too many days requested in listing the recycle bin")
}
} else {
// if no date range was given, list up to two days ago
dateTo = time.Now()
dateFrom = dateTo.AddDate(0, 0, -2)
}

sublog := appctx.GetLogger(ctx).With().Logger()
sublog.Debug().Time("from", dateFrom).Time("to", dateTo).Msg("executing ListDeletedEntries")
recycleEntries, err := fs.listDeletedEntries(ctx, fs.c.MaxRecycleEntries, basePath, dateFrom, dateTo)
if err != nil {
switch err.(type) {
case errtypes.IsBadRequest:
return nil, errtypes.BadRequest("cephfs: too many entries found in listing the recycle bin")
default:
return nil, errors.Wrap(err, "cephfs: error listing deleted entries")
}
}
return recycleEntries, nil
}

func (fs *cephfs) RestoreRecycleItem(ctx context.Context, basePath, key, relativePath string, restoreRef *provider.Reference) error {
return errtypes.NotSupported("unimplemented")
user := fs.makeUser(ctx)
md, err := fs.GetMD(ctx, &provider.Reference{Path: basePath}, nil)
if err != nil {
return err
}
if !md.PermissionSet.RestoreRecycleItem {
return errtypes.PermissionDenied("eosfs: user doesn't have permissions to restore recycled items")
}

user.op(func(cv *cacheVal) {
//TODO(lopresti) validate content of basePath and relativePath. Key is expected to contain the recycled path
if err = cv.mount.Rename(key, filepath.Join(basePath, relativePath)); err != nil {
return
}
//TODO(tmourati): Add entry id logic, handle already moved file error
})

return getRevaError(err)
}

func (fs *cephfs) PurgeRecycleItem(ctx context.Context, basePath, key, relativePath string) error {
return errtypes.NotSupported("unimplemented")
return errtypes.NotSupported("cephfs: operation not supported")
}

func (fs *cephfs) EmptyRecycle(ctx context.Context) error {
return errtypes.NotSupported("cephfs: operation not supported")
}

func (fs *cephfs) CreateStorageSpace(ctx context.Context, req *provider.CreateStorageSpaceRequest) (r *provider.CreateStorageSpaceResponse, err error) {
return nil, errtypes.NotSupported("unimplemented")
}

func (fs *cephfs) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
Expand Down
23 changes: 23 additions & 0 deletions pkg/storage/fs/cephfs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ type Options struct {
DirPerms uint32 `mapstructure:"dir_perms"`
FilePerms uint32 `mapstructure:"file_perms"`
UserQuotaBytes uint64 `mapstructure:"user_quota_bytes"`
// Path of the recycle bin. If empty, recycling is disabled.
RecyclePath string `mapstructure:"recycle_path"`
// Depth of the Recycle bin location, that is after how many path components
// the recycle path is located: this allows supporting recycles such as
// /top-level/s/space/.recycle with a depth = 3. Defaults to 0.
RecyclePathDepth int `mapstructure:"recycle_path_depth"`
// Maximum entries count a ListRecycle call may return: if exceeded, ListRecycle
// will return a BadRequest error
MaxRecycleEntries int `mapstructure:"max_recycle_entries"`
// Maximum time span in days a ListRecycle call may return: if exceeded, ListRecycle
// will override the "to" date with "from" + this value
MaxDaysInRecycleList int `mapstructure:"max_days_in_recycle_list"`
HiddenDirs map[string]bool
}

Expand Down Expand Up @@ -102,6 +114,9 @@ func (c *Options) ApplyDefaults() {
"..": true,
removeLeadingSlash(c.ShadowFolder): true,
}
if c.RecyclePath != "" {
c.HiddenDirs.append({c.RecyclePath: true})
}

if c.DirPerms == 0 {
c.DirPerms = dirPermDefault
Expand All @@ -114,4 +129,12 @@ func (c *Options) ApplyDefaults() {
if c.UserQuotaBytes == 0 {
c.UserQuotaBytes = 50000000000
}

if c.MaxDaysInRecycleList == 0 {
c.MaxDaysInRecycleList = 14
}

if c.MaxRecycleEntries == 0 {
c.MaxRecycleEntries = 2000
}
}

0 comments on commit acab3ea

Please sign in to comment.