From d9c441efcc159e91a4251c9aca8f8914633c8505 Mon Sep 17 00:00:00 2001 From: benclive Date: Wed, 24 Jul 2024 18:25:42 +0100 Subject: [PATCH] feat: Implement GetObjectRange for all storage providers (#13650) --- pkg/ingester-rf1/objstore/storage.go | 8 +++++ .../chunk/client/alibaba/oss_object_client.go | 20 +++++++++++ .../chunk/client/aws/s3_storage_client.go | 34 ++++++++++++++++++ .../chunk/client/azure/blob_storage_client.go | 35 +++++++++++++++++-- .../client/baidubce/bos_storage_client.go | 13 +++++++ .../chunk/client/congestion/controller.go | 7 ++++ .../client/congestion/controller_test.go | 3 ++ .../chunk/client/gcp/gcs_object_client.go | 18 ++++++++++ .../client/ibmcloud/cos_object_client.go | 31 ++++++++++++++++ .../chunk/client/local/fs_object_client.go | 24 +++++++++++++ pkg/storage/chunk/client/object_client.go | 1 + .../client/openstack/swift_object_client.go | 14 ++++++++ .../chunk/client/prefixed_object_client.go | 4 +++ .../testutils/inmemory_storage_client.go | 20 +++++++++++ 14 files changed, 229 insertions(+), 3 deletions(-) diff --git a/pkg/ingester-rf1/objstore/storage.go b/pkg/ingester-rf1/objstore/storage.go index 8ec7cf697083..beb544e1980a 100644 --- a/pkg/ingester-rf1/objstore/storage.go +++ b/pkg/ingester-rf1/objstore/storage.go @@ -93,6 +93,14 @@ func (m *Multi) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, return s.GetObject(ctx, objectKey) } +func (m *Multi) GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) { + s, err := m.GetStoreFor(model.Now()) + if err != nil { + return nil, err + } + return s.GetObjectRange(ctx, objectKey, off, length) +} + func (m *Multi) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { s, err := m.GetStoreFor(model.Now()) if err != nil { diff --git a/pkg/storage/chunk/client/alibaba/oss_object_client.go b/pkg/storage/chunk/client/alibaba/oss_object_client.go index 
d8446f6db905..cbe449fca9e5 100644 --- a/pkg/storage/chunk/client/alibaba/oss_object_client.go +++ b/pkg/storage/chunk/client/alibaba/oss_object_client.go @@ -108,6 +108,26 @@ func (s *OssObjectClient) GetObject(ctx context.Context, objectKey string) (io.R return resp.Response.Body, int64(size), err } +// GetObjectRange returns a reader for the length bytes starting at offset of the specified object key from the configured OSS bucket. +func (s *OssObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + var resp *oss.GetObjectResult + options := []oss.Option{ + oss.Range(offset, offset+length-1), + } + err := instrument.CollectedRequest(ctx, "OSS.GetObject", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + var requestErr error + resp, requestErr = s.defaultBucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: objectKey}, options) + if requestErr != nil { + return requestErr + } + return nil + }) + if err != nil { + return nil, err + } + return resp.Response.Body, err +} + // PutObject puts the specified bytes into the configured OSS bucket at the provided key func (s *OssObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error { return instrument.CollectedRequest(ctx, "OSS.PutObject", ossRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index c2a50dd16ff6..b0f8152db6b0 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -380,6 +380,40 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re return nil, 0, errors.Wrap(lastErr, "failed to get s3 object") } +// GetObjectRange from the store, reading length bytes starting at offset +func (a *S3ObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + var resp *s3.GetObjectOutput + + // Map 
the key into a bucket + bucket := a.bucketFromKey(objectKey) + + var lastErr error + + retries := backoff.New(ctx, a.cfg.BackoffConfig) + for retries.Ongoing() { + if ctx.Err() != nil { + return nil, errors.Wrap(ctx.Err(), "ctx related error during s3 getObject") + } + + lastErr = loki_instrument.TimeRequest(ctx, "S3.GetObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + var requestErr error + resp, requestErr = a.hedgedS3.GetObjectWithContext(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(objectKey), + Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)), + }) + return requestErr + }) + + if lastErr == nil && resp.Body != nil { + return resp.Body, nil + } + retries.Wait() + } + + return nil, errors.Wrap(lastErr, "failed to get s3 object") +} + // PutObject into the store func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object io.Reader) error { return loki_instrument.TimeRequest(ctx, "S3.PutObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { diff --git a/pkg/storage/chunk/client/azure/blob_storage_client.go b/pkg/storage/chunk/client/azure/blob_storage_client.go index f1e93b993d56..2e66e1e89da3 100644 --- a/pkg/storage/chunk/client/azure/blob_storage_client.go +++ b/pkg/storage/chunk/client/azure/blob_storage_client.go @@ -249,7 +249,7 @@ func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC ) err := loki_instrument.TimeRequest(ctx, "azure.GetObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error { var err error - rc, size, err = b.getObject(ctx, objectKey) + rc, size, err = b.getObject(ctx, objectKey, 0, 0) return err }) b.metrics.egressBytesTotal.Add(float64(size)) @@ -262,14 +262,43 @@ func (b *BlobStorage) GetObject(ctx context.Context, objectKey string) (io.ReadC return client_util.NewReadCloserWithContextCancelFunc(rc, cancel), 
size, nil } -func (b *BlobStorage) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, size int64, err error) { +// GetObjectRange returns a reader for the byte range described by offset and length of the specified object key. +func (b *BlobStorage) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + var cancel context.CancelFunc = func() {} + if b.cfg.RequestTimeout > 0 { + ctx, cancel = context.WithTimeout(ctx, (time.Duration(b.cfg.MaxRetries)*b.cfg.RequestTimeout)+(time.Duration(b.cfg.MaxRetries-1)*b.cfg.MaxRetryDelay)) // timeout only after azure client's built in retries + } + + var ( + size int64 + rc io.ReadCloser + ) + err := loki_instrument.TimeRequest(ctx, "azure.GetObject", instrument.NewHistogramCollector(b.metrics.requestDuration), instrument.ErrorCode, func(ctx context.Context) error { + var err error + rc, size, err = b.getObject(ctx, objectKey, offset, length) + return err + }) + b.metrics.egressBytesTotal.Add(float64(size)) + if err != nil { + // cancel the context if there is an error. + cancel() + return nil, err + } + // else return a wrapped ReadCloser which cancels the context while closing the reader. 
+ return client_util.NewReadCloserWithContextCancelFunc(rc, cancel), nil +} + +func (b *BlobStorage) getObject(ctx context.Context, objectKey string, offset, length int64) (rc io.ReadCloser, size int64, err error) { + if offset == 0 && length == 0 { + length = azblob.CountToEnd // azblob.CountToEnd == 0 but leaving this here for clarity + } blockBlobURL, err := b.getBlobURL(objectKey, true) if err != nil { return nil, 0, err } // Request access to the blob - downloadResponse, err := blockBlobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, noClientKey) + downloadResponse, err := blockBlobURL.Download(ctx, offset, length, azblob.BlobAccessConditions{}, false, noClientKey) if err != nil { return nil, 0, err } diff --git a/pkg/storage/chunk/client/baidubce/bos_storage_client.go b/pkg/storage/chunk/client/baidubce/bos_storage_client.go index b9abd8c90dbf..edb6870033db 100644 --- a/pkg/storage/chunk/client/baidubce/bos_storage_client.go +++ b/pkg/storage/chunk/client/baidubce/bos_storage_client.go @@ -117,6 +117,19 @@ func (b *BOSObjectStorage) GetObject(ctx context.Context, objectKey string) (io. 
return res.Body, size, nil } +func (b *BOSObjectStorage) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + var res *api.GetObjectResult + err := instrument.CollectedRequest(ctx, "BOS.GetObject", bosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + var requestErr error + res, requestErr = b.client.GetObject(b.cfg.BucketName, objectKey, nil, offset, offset+length-1) + return requestErr + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to get BOS object [ %s ]", objectKey) + } + return res.Body, nil +} + func (b *BOSObjectStorage) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { var storageObjects []client.StorageObject var commonPrefixes []client.StorageCommonPrefix diff --git a/pkg/storage/chunk/client/congestion/controller.go b/pkg/storage/chunk/client/congestion/controller.go index 6a60a2ce7aea..1e3e2ee0dcb3 100644 --- a/pkg/storage/chunk/client/congestion/controller.go +++ b/pkg/storage/chunk/client/congestion/controller.go @@ -133,6 +133,10 @@ func (a *AIMDController) GetObject(ctx context.Context, objectKey string) (io.Re return rc, sz, err } +func (a *AIMDController) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + return a.inner.GetObjectRange(ctx, objectKey, offset, length) +} + func (a *AIMDController) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { return a.inner.List(ctx, prefix, delimiter) } @@ -213,6 +217,9 @@ func (n *NoopController) PutObject(context.Context, string, io.Reader) error { r func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error) { return nil, 0, nil } +func (n *NoopController) GetObjectRange(context.Context, string, int64, int64) (io.ReadCloser, error) { + return nil, nil +} func (n *NoopController) 
List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { return nil, nil, nil diff --git a/pkg/storage/chunk/client/congestion/controller_test.go b/pkg/storage/chunk/client/congestion/controller_test.go index 23fa8a419676..a46466ebfc54 100644 --- a/pkg/storage/chunk/client/congestion/controller_test.go +++ b/pkg/storage/chunk/client/congestion/controller_test.go @@ -259,6 +259,9 @@ func (m *mockObjectClient) GetObject(context.Context, string) (io.ReadCloser, in return io.NopCloser(strings.NewReader("bar")), 3, nil } +func (m *mockObjectClient) GetObjectRange(context.Context, string, int64, int64) (io.ReadCloser, error) { + panic("not implemented") +} func (m *mockObjectClient) ObjectExists(context.Context, string) (bool, error) { panic("not implemented") diff --git a/pkg/storage/chunk/client/gcp/gcs_object_client.go b/pkg/storage/chunk/client/gcp/gcs_object_client.go index 57d26b334ece..d1289a61e771 100644 --- a/pkg/storage/chunk/client/gcp/gcs_object_client.go +++ b/pkg/storage/chunk/client/gcp/gcs_object_client.go @@ -151,6 +151,24 @@ func (s *GCSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R return util.NewReadCloserWithContextCancelFunc(rc, cancel), size, nil } +// GetObjectRange returns a reader for the byte range described by offset and length of the specified object key from the configured GCS bucket. +func (s *GCSObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + var cancel context.CancelFunc = func() {} + if s.cfg.RequestTimeout > 0 { + ctx, cancel = context.WithTimeout(ctx, s.cfg.RequestTimeout) + } + + rangeReader, err := s.getsBuckets.Object(objectKey).NewRangeReader(ctx, offset, length) + if err != nil { + // cancel the context if there is an error. + cancel() + return nil, err + } + + // else return a wrapped ReadCloser which cancels the context while closing the reader. 
+ return util.NewReadCloserWithContextCancelFunc(rangeReader, cancel), nil +} + func (s *GCSObjectClient) getObject(ctx context.Context, objectKey string) (rc io.ReadCloser, size int64, err error) { reader, err := s.getsBuckets.Object(objectKey).NewReader(ctx) if err != nil { diff --git a/pkg/storage/chunk/client/ibmcloud/cos_object_client.go b/pkg/storage/chunk/client/ibmcloud/cos_object_client.go index 6126df1b2332..a796ab88dea4 100644 --- a/pkg/storage/chunk/client/ibmcloud/cos_object_client.go +++ b/pkg/storage/chunk/client/ibmcloud/cos_object_client.go @@ -3,6 +3,7 @@ package ibmcloud import ( "context" "flag" + "fmt" "hash/fnv" "io" "net" @@ -368,6 +369,36 @@ func (c *COSObjectClient) GetObject(ctx context.Context, objectKey string) (io.R return nil, 0, errors.Wrap(err, "failed to get cos object") } +// GetObjectRange returns a reader for the length bytes starting at offset of the specified object key from the configured COS bucket. +func (c *COSObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + var resp *cos.GetObjectOutput + + // Map the key into a bucket + bucket := c.bucketFromKey(objectKey) + + retries := backoff.New(ctx, c.cfg.BackoffConfig) + err := ctx.Err() + for retries.Ongoing() { + if ctx.Err() != nil { + return nil, errors.Wrap(ctx.Err(), "ctx related error during cos getObject") + } + err = instrument.CollectedRequest(ctx, "COS.GetObject", cosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { + var requestErr error + resp, requestErr = c.hedgedCOS.GetObjectWithContext(ctx, &cos.GetObjectInput{ + Bucket: ibm.String(bucket), + Key: ibm.String(objectKey), + Range: ibm.String(fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)), + }) + return requestErr + }) + if err == nil && resp.Body != nil { + return resp.Body, nil + } + retries.Wait() + } + return nil, errors.Wrap(err, "failed to get cos object") +} + // PutObject into the store func (c *COSObjectClient) PutObject(ctx context.Context, 
objectKey string, object io.Reader) error { return instrument.CollectedRequest(ctx, "COS.PutObject", cosRequestDuration, instrument.ErrorCode, func(ctx context.Context) error { diff --git a/pkg/storage/chunk/client/local/fs_object_client.go b/pkg/storage/chunk/client/local/fs_object_client.go index bde92c83025d..751e1b94b37b 100644 --- a/pkg/storage/chunk/client/local/fs_object_client.go +++ b/pkg/storage/chunk/client/local/fs_object_client.go @@ -46,6 +46,8 @@ type FSObjectClient struct { pathSeparator string } +var _ client.ObjectClient = (*FSObjectClient)(nil) + // NewFSObjectClient makes a chunk.Client which stores chunks as files in the local filesystem. func NewFSObjectClient(cfg FSConfig) (*FSObjectClient, error) { // filepath.Clean cleans up the path by removing unwanted duplicate slashes, dots etc. @@ -88,6 +90,28 @@ func (f *FSObjectClient) GetObject(_ context.Context, objectKey string) (io.Read return fl, stats.Size(), nil } +type SectionReadCloser struct { + io.Reader + closeFn func() error +} + +func (l SectionReadCloser) Close() error { + return l.closeFn() +} + +// GetObjectRange from the store, reading length bytes starting at offset +func (f *FSObjectClient) GetObjectRange(_ context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + fl, err := os.Open(filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey))) + if err != nil { + return nil, err + } + closer := SectionReadCloser{ + Reader: io.NewSectionReader(fl, offset, length), + closeFn: fl.Close, + } + return closer, nil +} + // PutObject into the store func (f *FSObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error { fullPath := filepath.Join(f.cfg.Directory, filepath.FromSlash(objectKey)) diff --git a/pkg/storage/chunk/client/object_client.go b/pkg/storage/chunk/client/object_client.go index 460c9566f6e7..2f5c3e263e85 100644 --- a/pkg/storage/chunk/client/object_client.go +++ b/pkg/storage/chunk/client/object_client.go @@ -22,6 +22,7 @@ type ObjectClient interface { 
PutObject(ctx context.Context, objectKey string, object io.Reader) error // NOTE: The consumer of GetObject should always call the Close method when it is done reading which otherwise could cause a resource leak. GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) + GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) // List objects with given prefix. // diff --git a/pkg/storage/chunk/client/openstack/swift_object_client.go b/pkg/storage/chunk/client/openstack/swift_object_client.go index 50ac21c12a75..b97dbe1dd9da 100644 --- a/pkg/storage/chunk/client/openstack/swift_object_client.go +++ b/pkg/storage/chunk/client/openstack/swift_object_client.go @@ -144,6 +144,20 @@ func (s *SwiftObjectClient) GetObject(_ context.Context, objectKey string) (io.R return io.NopCloser(&buf), int64(buf.Len()), nil } +// GetObjectRange returns a reader for the length bytes starting at offset of the specified object key from the configured swift container. +func (s *SwiftObjectClient) GetObjectRange(_ context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + var buf bytes.Buffer + h := swift.Headers{ + "Range": fmt.Sprintf("bytes=%d-%d", offset, offset+length-1), + } + _, err := s.hedgingConn.ObjectGet(s.cfg.ContainerName, objectKey, &buf, false, h) + if err != nil { + return nil, err + } + + return io.NopCloser(&buf), nil +} + // PutObject puts the specified bytes into the configured Swift container at the provided key func (s *SwiftObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error { _, err := s.conn.ObjectPut(s.cfg.ContainerName, objectKey, object, false, "", "", nil) diff --git a/pkg/storage/chunk/client/prefixed_object_client.go b/pkg/storage/chunk/client/prefixed_object_client.go index 1f887a64e271..899dcd2b2112 100644 --- a/pkg/storage/chunk/client/prefixed_object_client.go +++ b/pkg/storage/chunk/client/prefixed_object_client.go @@ -27,6 +27,10 @@ func (p PrefixedObjectClient) 
GetObject(ctx context.Context, objectKey string) ( return p.downstreamClient.GetObject(ctx, p.prefix+objectKey) } +func (p PrefixedObjectClient) GetObjectRange(ctx context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + return p.downstreamClient.GetObjectRange(ctx, p.prefix+objectKey, offset, length) +} + func (p PrefixedObjectClient) List(ctx context.Context, prefix, delimiter string) ([]StorageObject, []StorageCommonPrefix, error) { objects, commonPrefixes, err := p.downstreamClient.List(ctx, p.prefix+prefix, delimiter) if err != nil { diff --git a/pkg/storage/chunk/client/testutils/inmemory_storage_client.go b/pkg/storage/chunk/client/testutils/inmemory_storage_client.go index 2056703522d1..da65a35d5317 100644 --- a/pkg/storage/chunk/client/testutils/inmemory_storage_client.go +++ b/pkg/storage/chunk/client/testutils/inmemory_storage_client.go @@ -425,6 +425,26 @@ func (m *InMemoryObjectClient) GetObject(_ context.Context, objectKey string) (i return io.NopCloser(bytes.NewReader(buf)), int64(len(buf)), nil } +// GetObjectRange implements client.ObjectClient. +func (m *InMemoryObjectClient) GetObjectRange(_ context.Context, objectKey string, offset, length int64) (io.ReadCloser, error) { + m.mtx.RLock() + defer m.mtx.RUnlock() + + if m.mode == MockStorageModeWriteOnly { + return nil, errPermissionDenied + } + + buf, ok := m.objects[objectKey] + if !ok { + return nil, errStorageObjectNotFound + } + if len(buf) < int(offset+length) { + return nil, io.ErrUnexpectedEOF + } + + return io.NopCloser(bytes.NewReader(buf[offset : offset+length])), nil +} + // PutObject implements client.ObjectClient. func (m *InMemoryObjectClient) PutObject(_ context.Context, objectKey string, object io.Reader) error { buf, err := io.ReadAll(object)