Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make prefix for keys of objects created by boltdb-shipper configurable #3491

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func (c *Config) Validate() error {
if err := c.Ingester.Validate(); err != nil {
return errors.Wrap(err, "invalid ingester config")
}
if err := c.StorageConfig.BoltDBShipperConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid boltdb-shipper config")
}
if err := c.CompactorConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid compactor config")
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func RegisterCustomIndexClients(cfg *Config, registerer prometheus.Registerer) {
return nil, err
}

return shipper.NewBoltDBShipperTableClient(objectClient), nil
return shipper.NewBoltDBShipperTableClient(objectClient, cfg.BoltDBShipperConfig.SharedStoreKeyPrefix), nil
})
}

Expand Down
16 changes: 11 additions & 5 deletions pkg/storage/stores/shipper/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/storage/stores/shipper"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/util"
)

const delimiter = "/"

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
WorkingDirectory string `yaml:"working_directory"`
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
}

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.WorkingDirectory, "boltdb.shipper.compactor.working-directory", "", "Directory where files can be downloaded for compaction.")
f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.")
f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 2*time.Hour, "Interval at which to re-run the compaction operation.")
}

Expand All @@ -42,6 +44,10 @@ func (cfg *Config) IsDefaults() bool {
return reflect.DeepEqual(cfg, cpy)
}

func (cfg *Config) Validate() error {
return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}

type Compactor struct {
services.Service

Expand All @@ -68,7 +74,7 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe

compactor := Compactor{
cfg: cfg,
objectClient: util.NewPrefixedObjectClient(objectClient, shipper.StorageKeyPrefix),
objectClient: util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix),
metrics: newMetrics(r),
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/stores/shipper/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func TestIsDefaults(t *testing.T) {
}, false},
{&Config{}, false},
{&Config{
CompactionInterval: 2 * time.Hour,
SharedStoreKeyPrefix: "index/",
CompactionInterval: 2 * time.Hour,
}, true},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
Expand Down
11 changes: 8 additions & 3 deletions pkg/storage/stores/shipper/shipper_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/grafana/loki/pkg/storage/stores/shipper/downloads"
"github.com/grafana/loki/pkg/storage/stores/shipper/uploads"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/storage/stores/util"
)

Expand All @@ -39,8 +40,6 @@ const (
// FilesystemObjectStoreType holds the periodic config type for the filesystem store
FilesystemObjectStoreType = "filesystem"

StorageKeyPrefix = "index/"

// UploadInterval defines interval for when we check if there are new index files to upload.
// It's also used to snapshot the currently written index tables so the snapshots can be used for reads.
UploadInterval = 1 * time.Minute
Expand All @@ -56,6 +55,7 @@ type boltDBIndexClient interface {
type Config struct {
ActiveIndexDirectory string `yaml:"active_index_directory"`
SharedStoreType string `yaml:"shared_store"`
SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
CacheLocation string `yaml:"cache_location"`
CacheTTL time.Duration `yaml:"cache_ttl"`
ResyncInterval time.Duration `yaml:"resync_interval"`
Expand All @@ -69,12 +69,17 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage")
f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem")
f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it")
f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries")
f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries")
f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage")
f.IntVar(&cfg.QueryReadyNumDays, "boltdb.shipper.query-ready-num-days", 0, "Number of days of index to be kept downloaded for queries. Works only with tables created with 24h period.")
}

func (cfg *Config) Validate() error {
return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}

type Shipper struct {
cfg Config
boltDBIndexClient boltDBIndexClient
Expand Down Expand Up @@ -116,7 +121,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R
return err
}

prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, StorageKeyPrefix)
prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, s.cfg.SharedStoreKeyPrefix)

if s.cfg.Mode != ModeReadOnly {
uploader, err := s.getUploaderName()
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ type boltDBShipperTableClient struct {
objectClient chunk.ObjectClient
}

func NewBoltDBShipperTableClient(objectClient chunk.ObjectClient) chunk.TableClient {
return &boltDBShipperTableClient{util.NewPrefixedObjectClient(objectClient, StorageKeyPrefix)}
func NewBoltDBShipperTableClient(objectClient chunk.ObjectClient, storageKeyPrefix string) chunk.TableClient {
return &boltDBShipperTableClient{util.NewPrefixedObjectClient(objectClient, storageKeyPrefix)}
}

func (b *boltDBShipperTableClient) ListTables(ctx context.Context) ([]string, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/table_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestBoltDBShipperTableClient(t *testing.T) {
}

// we need to use prefixed object client while creating files/folder
prefixedObjectClient := util.NewPrefixedObjectClient(objectClient, StorageKeyPrefix)
prefixedObjectClient := util.NewPrefixedObjectClient(objectClient, "index/")

for folder, files := range foldersWithFiles {
for _, fileName := range files {
Expand All @@ -45,7 +45,7 @@ func TestBoltDBShipperTableClient(t *testing.T) {
}
}

tableClient := NewBoltDBShipperTableClient(objectClient)
tableClient := NewBoltDBShipperTableClient(objectClient, "index/")

// check list of tables returns all the folders/tables created above
checkExpectedTables(t, tableClient, foldersWithFiles)
Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/stores/shipper/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -17,6 +18,8 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
)

const delimiter = "/"

type StorageClient interface {
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error)
}
Expand Down Expand Up @@ -172,3 +175,19 @@ func RemoveDirectories(incoming []chunk.StorageObject) []chunk.StorageObject {
func IsDirectory(key string) bool {
return strings.HasSuffix(key, "/")
}

func ValidateSharedStoreKeyPrefix(prefix string) error {
if prefix == "" {
return errors.New("shared store key prefix must be set")
} else if strings.Contains(prefix, "\\") {
// When using windows filesystem as object store the implementation of ObjectClient in Cortex takes care of conversion of separator.
// We just need to always use `/` as a path separator.
return fmt.Errorf("shared store key prefix should only have '%s' as a path separator", delimiter)
} else if strings.HasPrefix(prefix, delimiter) {
return errors.New("shared store key prefix should never start with a path separator i.e '/'")
} else if !strings.HasSuffix(prefix, delimiter) {
return errors.New("shared store key prefix should end with a path separator i.e '/'")
}

return nil
}