Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement GetObjectRange for all storage providers #13650

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/ingester-rf1/objstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ func (m *Multi) GetObject(ctx context.Context, objectKey string) (io.ReadCloser,
return s.GetObject(ctx, objectKey)
}

func (m *Multi) GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return nil, err
}
return s.GetObjectRange(ctx, objectKey, off, length)
}

func (m *Multi) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/chunk/client/alibaba/oss_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,26 @@ func (s *OssObjectClient) GetObject(ctx context.Context, objectKey string) (io.R
return resp.Response.Body, int64(size), err
}

// GetObject returns a reader and the size for the specified object key from the configured OSS bucket.
func (s *OssObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
var resp *oss.GetObjectResult
options := []oss.Option{
oss.Range(offset, offset+length-1),
}
err := instrument.CollectedRequest(ctx, "OSS.GetObject", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
resp, requestErr = s.defaultBucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: objectKey}, options)
if requestErr != nil {
return requestErr
}
return nil
})
if err != nil {
return nil, err
}
return resp.Response.Body, err
}

// PutObject puts the specified bytes into the configured OSS bucket at the provided key
func (s *OssObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return instrument.CollectedRequest(ctx, "OSS.PutObject", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
Expand Down
34 changes: 34 additions & 0 deletions pkg/storage/chunk/client/aws/s3_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,40 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re
return nil, 0, errors.Wrap(lastErr, "failed to get s3 object")
}

// GetObject from the store
func (a *S3ObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
var resp *s3.GetObjectOutput

// Map the key into a bucket
bucket := a.bucketFromKey(objectKey)

var lastErr error

retries := backoff.New(ctx, a.cfg.BackoffConfig)
for retries.Ongoing() {
if ctx.Err() != nil {
return nil, errors.Wrap(ctx.Err(), "ctx related error during s3 getObject")
}

lastErr = loki_instrument.TimeRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
resp, requestErr = a.hedgedS3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(objectKey),
Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)),
})
return requestErr
})

if lastErr == nil && resp.Body != nil {
return resp.Body, nil
}
retries.Wait()
}

return nil, errors.Wrap(lastErr, "failed to get s3 object")
}

// PutObject into the store
func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return loki_instrument.TimeRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
Expand Down
35 changes: 32 additions & 3 deletions pkg/storage/chunk/client/azure/blob_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC
)
err := loki_instrument.TimeRequest(ctx, "azure.GetObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
var err error
rc, size, err = b.getObject(ctx, objectKey)
rc, size, err = b.getObject(ctx, objectKey, 0, 0)
return err
})
b.metrics.egressBytesTotal.Add(float64(size))
Expand All @@ -262,14 +262,43 @@ func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC
return client_util.NewReadCloserWithContextCancelFunc(rc, cancel), size, nil
}

func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, size int64, err error) {
// GetObject returns a reader and the size for the specified object key.
func (b *BlobStorage) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
var cancel context.CancelFunc = func() {}
if b.cfg.RequestTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, (time.Duration(b.cfg.MaxRetries)*b.cfg.RequestTimeout)+(time.Duration(b.cfg.MaxRetries-1)*b.cfg.MaxRetryDelay)) // timeout only after azure client's built in retries
}

var (
size int64
rc io.ReadCloser
)
err := loki_instrument.TimeRequest(ctx, "azure.GetObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error {
var err error
rc, size, err = b.getObject(ctx, objectKey, offset, length)
return err
})
b.metrics.egressBytesTotal.Add(float64(size))
if err != nil {
// cancel the context if there is an error.
cancel()
return nil, err
}
// else return a wrapped ReadCloser which cancels the context while closing the reader.
return client_util.NewReadCloserWithContextCancelFunc(rc, cancel), nil
}

func (b *BlobStorage) getObject(ctx context.Context, objectKey string, offset, length int64) (rc io.ReadCloser, size int64, err error) {
if offset == 0 && length == 0 {
length = azblob.CountToEnd // azblob.CountToEnd == 0 but leaving this here for clarity
}
blockBlobURL, err := b.getBlobURL(objectKey, true)
if err != nil {
return nil, 0, err
}

// Request access to the blob
downloadResponse, err := blockBlobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, noClientKey)
downloadResponse, err := blockBlobURL.Download(ctx, offset, length, azblob.BlobAccessConditions{}, false, noClientKey)
if err != nil {
return nil, 0, err
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/storage/chunk/client/baidubce/bos_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,19 @@ func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io.
return res.Body, size, nil
}

func (b *BOSObjectStorage) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
var res *api.GetObjectResult
err := instrument.CollectedRequest(ctx, "BOS.GetObject", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
res, requestErr = b.client.GetObject(b.cfg.BucketName, objectKey, nil, offset, offset+length-1)
return requestErr
})
if err != nil {
return nil, errors.Wrapf(err, "failed to get BOS object [ %s ]", objectKey)
}
return res.Body, nil
}

func (b *BOSObjectStorage) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
var storageObjects []client.StorageObject
var commonPrefixes []client.StorageCommonPrefix
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/chunk/client/gcp/gcs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,24 @@ func (s *GCSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R
return util.NewReadCloserWithContextCancelFunc(rc, cancel), size, nil
}

// GetObject returns a reader and the size for the specified object key from the configured GCS bucket.
func (s *GCSObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
var cancel context.CancelFunc = func() {}
if s.cfg.RequestTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, s.cfg.RequestTimeout)
}

rangeReader, err := s.getsBuckets.Object(objectKey).NewRangeReader(ctx, offset, length)
if err != nil {
// cancel the context if there is an error.
cancel()
return nil, err
}

// else return a wrapped ReadCloser which cancels the context while closing the reader.
return util.NewReadCloserWithContextCancelFunc(rangeReader, cancel), nil
}

func (s *GCSObjectClient) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, size int64, err error) {
reader, err := s.getsBuckets.Object(objectKey).NewReader(ctx)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/storage/chunk/client/ibmcloud/cos_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ibmcloud
import (
"context"
"flag"
"fmt"
"hash/fnv"
"io"
"net"
Expand Down Expand Up @@ -368,6 +369,36 @@ func (c *COSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R
return nil, 0, errors.Wrap(err, "failed to get cos object")
}

// GetObject returns a reader and the size for the specified object key from the configured S3 bucket.
func (c *COSObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
var resp *cos.GetObjectOutput

// Map the key into a bucket
bucket := c.bucketFromKey(objectKey)

retries := backoff.New(ctx, c.cfg.BackoffConfig)
err := ctx.Err()
for retries.Ongoing() {
if ctx.Err() != nil {
return nil, errors.Wrap(ctx.Err(), "ctx related error during cos getObject")
}
err = instrument.CollectedRequest(ctx, "COS.GetObject", cosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
var requestErr error
resp, requestErr = c.hedgedCOS.GetObjectWithContext(ctx, &cos.GetObjectInput{
Bucket: ibm.String(bucket),
Key: ibm.String(objectKey),
Range: ibm.String(fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)),
})
return requestErr
})
if err == nil && resp.Body != nil {
return resp.Body, nil
}
retries.Wait()
}
return nil, errors.Wrap(err, "failed to get cos object")
}

// PutObject into the store
func (c *COSObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
return instrument.CollectedRequest(ctx, "COS.PutObject", cosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error {
Expand Down
24 changes: 24 additions & 0 deletions pkg/storage/chunk/client/local/fs_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type FSObjectClient struct {
pathSeparator string
}

var _ client.ObjectClient = (*FSObjectClient)(nil)

// NewFSObjectClient makes a chunk.Client which stores chunks as files in the local filesystem.
func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) {
// filepath.Clean cleans up the path by removing unwanted duplicate slashes, dots etc.
Expand Down Expand Up @@ -88,6 +90,28 @@ func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.Read
return fl, stats.Size(), nil
}

type SectionReadCloser struct {
io.Reader
closeFn func() error
}

func (l SectionReadCloser) Close() error {
return l.closeFn()
}

// GetObject from the store
func (f *FSObjectClient) GetObjectRange(_ context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
fl, err := os.Open(filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey)))
if err != nil {
return nil, err
}
closer := SectionReadCloser{
Reader: io.NewSectionReader(fl, offset, length),
closeFn: fl.Close,
}
return closer, nil
}

// PutObject into the store
func (f *FSObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error {
fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/chunk/client/object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ObjectClient interface {
PutObject(ctx context.Context, objectKey string, object io.Reader) error
// NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak.
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)
GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error)

// List objects with given prefix.
//
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/chunk/client/openstack/swift_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ func (s *SwiftObjectClient) GetObject(_ context.Context, objectKey string) (io.R
return io.NopCloser(&buf), int64(buf.Len()), nil
}

// GetObject returns a reader and the size for the specified object key from the configured swift container.
func (s *SwiftObjectClient) GetObjectRange(_ context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
var buf bytes.Buffer
h := swift.Headers{
"Range": fmt.Sprintf("bytes=%d-%d", offset, offset+length-1),
}
_, err := s.hedgingConn.ObjectGet(s.cfg.ContainerName, objectKey, &buf, false, h)
if err != nil {
return nil, err
}

return io.NopCloser(&buf), nil
}

// PutObject puts the specified bytes into the configured Swift container at the provided key
func (s *SwiftObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error {
_, err := s.conn.ObjectPut(s.cfg.ContainerName, objectKey, object, false, "", "", nil)
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/client/prefixed_object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (
return p.downstreamClient.GetObject(ctx, p.prefix+objectKey)
}

func (p PrefixedObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
return p.downstreamClient.GetObjectRange(ctx, p.prefix+objectKey, offset, length)
}

func (p PrefixedObjectClient) List(ctx context.Context, prefix, delimiter string) ([]StorageObject, []StorageCommonPrefix, error) {
objects, commonPrefixes, err := p.downstreamClient.List(ctx, p.prefix+prefix, delimiter)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions pkg/storage/chunk/client/testutils/inmemory_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,26 @@ func (m *InMemoryObjectClient) GetObject(_ context.Context, objectKey string) (i
return io.NopCloser(bytes.NewReader(buf)), int64(len(buf)), nil
}

// GetObject implements client.ObjectClient.
func (m *InMemoryObjectClient) GetObjectRange(_ context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) {
m.mtx.RLock()
defer m.mtx.RUnlock()

if m.mode == MockStorageModeWriteOnly {
return nil, errPermissionDenied
}

buf, ok := m.objects[objectKey]
if !ok {
return nil, errStorageObjectNotFound
}
if len(buf) < int(offset+length) {
return nil, io.ErrUnexpectedEOF
}

return io.NopCloser(bytes.NewReader(buf[offset : offset+length])), nil
}

// PutObject implements client.ObjectClient.
func (m *InMemoryObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error {
buf, err := io.ReadAll(object)
Expand Down
Loading