Skip to content

Commit

Permalink
initial list spaces implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed Jun 18, 2021
1 parent 9740eed commit 087ec08
Show file tree
Hide file tree
Showing 13 changed files with 434 additions and 15 deletions.
99 changes: 90 additions & 9 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,37 +115,118 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag

func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
log := appctx.GetLogger(ctx)
// TODO: needs to be fixed
var id *provider.StorageSpaceId
for _, f := range req.Filters {
if f.Type == provider.ListStorageSpacesRequest_Filter_TYPE_ID {
id = f.GetId()
}
}

var providers []*registry.ProviderInfo
var err error
c, err := pool.GetStorageRegistryClient(s.c.StorageRegistryEndpoint)
if err != nil {
return nil, errors.Wrap(err, "gateway: error getting storage registry client")
}

if id != nil {
// query that specific story provider
parts := strings.SplitN(id.OpaqueId, "!", 2)
if len(parts) != 2 {
return &provider.ListStorageSpacesResponse{
Status: status.NewInvalidArg(ctx, "space id must be separated by !"),
}, nil
}
c, err := s.find(ctx, &provider.Reference{ResourceId: &provider.ResourceId{
StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + a opaqueid
res, err := c.GetStorageProviders(ctx, &registry.GetStorageProvidersRequest{
Ref: &provider.Reference{ResourceId: &provider.ResourceId{
StorageId: parts[0], // FIXME REFERENCE the StorageSpaceId is a storageid + an opaqueid
OpaqueId: parts[1],
}})
}},
})
if err != nil {
return &provider.ListStorageSpacesResponse{
Status: status.NewStatusFromErrType(ctx, "error finding path", err),
Status: status.NewStatusFromErrType(ctx, "ListStorageSpaces filters: req "+req.String(), err),
}, nil
}
if res.Status.Code != rpc.Code_CODE_OK {
return &provider.ListStorageSpacesResponse{
Status: res.Status,
}, nil
}
providers = res.Providers
} else {
// get list of all storage providers
res, err := c.ListStorageProviders(ctx, &registry.ListStorageProvidersRequest{})

res, err := c.ListStorageSpaces(ctx, req)
if err != nil {
log.Err(err).Msg("gateway: error listing storage space on storage provider")
return &provider.ListStorageSpacesResponse{
Status: status.NewInternal(ctx, err, "error calling ListStorageSpaces"),
Status: status.NewStatusFromErrType(ctx, "error listing providers", err),
}, nil
}
if res.Status.Code != rpc.Code_CODE_OK {
return &provider.ListStorageSpacesResponse{
Status: res.Status,
}, nil
}

providers = []*registry.ProviderInfo{}
// FIXME filter only providers that have an id set ... currently none have?
// bug? only ProviderPath is set
for i := range res.Providers {
// use only providers whose path does not start with a /?
if strings.HasPrefix(res.Providers[i].ProviderPath, "/") {
continue
}
providers = append(providers, res.Providers[i])
}
}

spacesFromProviders := make([][]*provider.StorageSpace, len(providers))
errors := make([]error, len(providers))
var wg sync.WaitGroup

for i, p := range providers {
wg.Add(1)
go s.listStorageSpacesOnProvider(ctx, req, &spacesFromProviders[i], p, &errors[i], &wg)
}
wg.Wait()

uniqueSpaces := map[string]*provider.StorageSpace{}
for i := range providers {
if errors[i] != nil {
log.Debug().Err(errors[i]).Msg("skipping provider")
continue
}
for j := range spacesFromProviders[i] {
uniqueSpaces[spacesFromProviders[i][j].Id.OpaqueId] = spacesFromProviders[i][j]
}
}
spaces := []*provider.StorageSpace{}
for spaceID := range uniqueSpaces {
spaces = append(spaces, uniqueSpaces[spaceID])
}

return &provider.ListStorageSpacesResponse{
Status: status.NewOK(ctx),
StorageSpaces: spaces,
}, nil
}
return res, nil

func (s *svc) listStorageSpacesOnProvider(ctx context.Context, req *provider.ListStorageSpacesRequest, res *[]*provider.StorageSpace, p *registry.ProviderInfo, e *error, wg *sync.WaitGroup) {
defer wg.Done()
c, err := s.getStorageProviderClient(ctx, p)
if err != nil {
*e = errors.Wrap(err, "error connecting to storage provider="+p.Address)
return
}

r, err := c.ListStorageSpaces(ctx, req)
if err != nil {
*e = errors.Wrap(err, "gateway: error calling ListStorageSpaces")
return
}

*res = r.StorageSpaces
}

func (s *svc) UpdateStorageSpace(ctx context.Context, req *provider.UpdateStorageSpaceRequest) (*provider.UpdateStorageSpaceResponse, error) {
Expand Down
38 changes: 37 additions & 1 deletion internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,45 @@ func (s *service) CreateStorageSpace(ctx context.Context, req *provider.CreateSt
}, nil
}

func hasNodeID(s *provider.StorageSpace) bool {
return s != nil && s.Root != nil && s.Root.OpaqueId != ""
}

func (s *service) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSpacesRequest) (*provider.ListStorageSpacesResponse, error) {
spaces, err := s.storage.ListStorageSpaces(ctx, req.Filters)
if err != nil {
var st *rpc.Status
switch err.(type) {
case errtypes.IsNotFound:
st = status.NewNotFound(ctx, "not found when listing spaces")
case errtypes.PermissionDenied:
st = status.NewPermissionDenied(ctx, err, "permission denied")
case errtypes.NotSupported:
st = status.NewUnimplemented(ctx, err, "not implemented")
default:
st = status.NewInternal(ctx, err, "error listing spaces")
}
return &provider.ListStorageSpacesResponse{
Status: st,
}, nil
}

for i := range spaces {
if hasNodeID(spaces[i]) {
// fill in storagespace id if it is not set
if spaces[i].Id == nil || spaces[i].Id.OpaqueId == "" {
spaces[i].Id = &provider.StorageSpaceId{OpaqueId: s.mountID + "!" + spaces[i].Root.OpaqueId}
}
// fill in storage id if it is not set
if spaces[i].Root.StorageId == "" {
spaces[i].Root.StorageId = s.mountID
}
}
}

return &provider.ListStorageSpacesResponse{
Status: status.NewUnimplemented(ctx, errtypes.NotSupported("ListStorageSpaces not implemented"), "ListStorageSpaces not implemented"),
Status: status.NewOK(ctx),
StorageSpaces: spaces,
}, nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/fs/owncloud/owncloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2217,6 +2217,10 @@ func (fs *ocfs) RestoreRecycleItem(ctx context.Context, key string, restoreRef *
return fs.propagate(ctx, tgt)
}

func (fs *ocfs) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
return nil, errtypes.NotSupported("list storage spaces")
}

func (fs *ocfs) propagate(ctx context.Context, leafPath string) error {
var root string
if fs.c.EnableHome {
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/fs/owncloudsql/owncloudsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2160,6 +2160,11 @@ func (fs *ocfs) HashFile(path string) (string, string, string, error) {
}
}

func (fs *ocfs) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
// TODO(corby): Implement
return nil, nil
}

func readChecksumIntoResourceChecksum(ctx context.Context, checksums, algo string, ri *provider.ResourceInfo) {
re := regexp.MustCompile(strings.ToUpper(algo) + `:(.*)`)
matches := re.FindStringSubmatch(checksums)
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/fs/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,7 @@ func (fs *s3FS) ListRecycle(ctx context.Context) ([]*provider.RecycleItem, error
func (fs *s3FS) RestoreRecycleItem(ctx context.Context, key string, restoreRef *provider.Reference) error {
return errtypes.NotSupported("restore recycle")
}

func (fs *s3FS) ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error) {
return nil, errtypes.NotSupported("list storage spaces")
}
7 changes: 7 additions & 0 deletions pkg/storage/registry/static/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (b *reg) FindProviders(ctx context.Context, ref *provider.Reference) ([]*re

// If the reference has a resource id set, use it to route
if ref.ResourceId != nil {
if ref.ResourceId.StorageId != "" {
for prefix, rule := range b.c.Rules {
addr := getProviderAddr(ctx, rule)
r, err := regexp.Compile("^" + prefix + "$")
Expand All @@ -159,6 +160,12 @@ func (b *reg) FindProviders(ctx context.Context, ref *provider.Reference) ([]*re
}}, nil
}
}
// TODO if the storage id is not set but node id is set we could poll all storage providers to check if the node is known there
// for now, say the reference is invalid
if ref.ResourceId.OpaqueId != "" {
return nil, errtypes.BadRequest("invalid reference " + ref.String())
}
}
}

// Try to find by path as most storage operations will be done using the path.
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type FS interface {
Shutdown(ctx context.Context) error
SetArbitraryMetadata(ctx context.Context, ref *provider.Reference, md *provider.ArbitraryMetadata) error
UnsetArbitraryMetadata(ctx context.Context, ref *provider.Reference, keys []string) error
ListStorageSpaces(ctx context.Context, filter []*provider.ListStorageSpacesRequest_Filter) ([]*provider.StorageSpace, error)
}

// Registry is the interface that storage registries implement
Expand Down
Loading

0 comments on commit 087ec08

Please sign in to comment.