diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 0cfa58ca745d..3ae9d20627bd 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -131,6 +131,12 @@ func (c *Config) Validate() error { if err := c.Ingester.Validate(); err != nil { return errors.Wrap(err, "invalid ingester config") } + if err := c.StorageConfig.BoltDBShipperConfig.Validate(); err != nil { + return errors.Wrap(err, "invalid boltdb-shipper config") + } + if err := c.CompactorConfig.Validate(); err != nil { + return errors.Wrap(err, "invalid compactor config") + } return nil } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index a3fdfb49f80d..312c530b0ab3 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -362,7 +362,7 @@ func RegisterCustomIndexClients(cfg *Config, registerer prometheus.Registerer) { return nil, err } - return shipper.NewBoltDBShipperTableClient(objectClient), nil + return shipper.NewBoltDBShipperTableClient(objectClient, cfg.BoltDBShipperConfig.SharedStoreKeyPrefix), nil }) } diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 56c04902a26d..94828f4ed299 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -17,22 +17,24 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/loki/pkg/storage/stores/shipper" + shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" "github.com/grafana/loki/pkg/storage/stores/util" ) const delimiter = "/" type Config struct { - WorkingDirectory string `yaml:"working_directory"` - SharedStoreType string `yaml:"shared_store"` - CompactionInterval time.Duration `yaml:"compaction_interval"` + WorkingDirectory string `yaml:"working_directory"` + SharedStoreType string `yaml:"shared_store"` + SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"` + CompactionInterval time.Duration `yaml:"compaction_interval"` } // RegisterFlags registers flags. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.WorkingDirectory, "boltdb.shipper.compactor.working-directory", "", "Directory where files can be downloaded for compaction.") f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem") + f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.") f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 2*time.Hour, "Interval at which to re-run the compaction operation.") } @@ -42,6 +44,10 @@ func (cfg *Config) IsDefaults() bool { return reflect.DeepEqual(cfg, cpy) } +func (cfg *Config) Validate() error { + return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix) +} + type Compactor struct { services.Service @@ -68,7 +74,7 @@ func NewCompactor(cfg Config, storageConfig storage.Config, r prometheus.Registe compactor := Compactor{ cfg: cfg, - objectClient: util.NewPrefixedObjectClient(objectClient, shipper.StorageKeyPrefix), + objectClient: util.NewPrefixedObjectClient(objectClient, cfg.SharedStoreKeyPrefix), metrics: newMetrics(r), } diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go index c8d2d0bfc362..f291a0fff7d2 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_test.go +++ b/pkg/storage/stores/shipper/compactor/compactor_test.go @@ -18,7 +18,8 @@ func TestIsDefaults(t *testing.T) { }, false}, {&Config{}, false}, {&Config{ - CompactionInterval: 2 * time.Hour, + SharedStoreKeyPrefix: "index/", + CompactionInterval: 2 * time.Hour, }, true}, } { t.Run(fmt.Sprint(i), func(t *testing.T) { diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index ccf8b7d2b029..4b6bd5803a22 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/downloads" "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" + shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" "github.com/grafana/loki/pkg/storage/stores/util" ) @@ -39,8 +40,6 @@ const ( // FilesystemObjectStoreType holds the periodic config type for the filesystem store FilesystemObjectStoreType = "filesystem" - StorageKeyPrefix = "index/" - // UploadInterval defines interval for when we check if there are new index files to upload. // It's also used to snapshot the currently written index tables so the snapshots can be used for reads. UploadInterval = 1 * time.Minute @@ -56,6 +55,7 @@ type boltDBIndexClient interface { type Config struct { ActiveIndexDirectory string `yaml:"active_index_directory"` SharedStoreType string `yaml:"shared_store"` + SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"` CacheLocation string `yaml:"cache_location"` CacheTTL time.Duration `yaml:"cache_ttl"` ResyncInterval time.Duration `yaml:"resync_interval"` @@ -69,12 +69,17 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage") f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.shared-store", "", "Shared store for keeping boltdb files. Supported types: gcs, s3, azure, filesystem") + f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it") f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries") f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries") f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage") f.IntVar(&cfg.QueryReadyNumDays, "boltdb.shipper.query-ready-num-days", 0, "Number of days of index to be kept downloaded for queries. Works only with tables created with 24h period.") } +func (cfg *Config) Validate() error { + return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix) +} + type Shipper struct { cfg Config boltDBIndexClient boltDBIndexClient @@ -116,7 +121,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R return err } - prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, StorageKeyPrefix) + prefixedObjectClient := util.NewPrefixedObjectClient(storageClient, s.cfg.SharedStoreKeyPrefix) if s.cfg.Mode != ModeReadOnly { uploader, err := s.getUploaderName() diff --git a/pkg/storage/stores/shipper/table_client.go b/pkg/storage/stores/shipper/table_client.go index 6d2f0661e40e..f64fbdae38f2 100644 --- a/pkg/storage/stores/shipper/table_client.go +++ b/pkg/storage/stores/shipper/table_client.go @@ -21,8 +21,8 @@ type boltDBShipperTableClient struct { objectClient chunk.ObjectClient } -func NewBoltDBShipperTableClient(objectClient chunk.ObjectClient) chunk.TableClient { - return &boltDBShipperTableClient{util.NewPrefixedObjectClient(objectClient, StorageKeyPrefix)} +func NewBoltDBShipperTableClient(objectClient chunk.ObjectClient, storageKeyPrefix string) chunk.TableClient { + return &boltDBShipperTableClient{util.NewPrefixedObjectClient(objectClient, storageKeyPrefix)} } func (b *boltDBShipperTableClient) ListTables(ctx context.Context) ([]string, error) { diff --git a/pkg/storage/stores/shipper/table_client_test.go b/pkg/storage/stores/shipper/table_client_test.go index 04454c1e6197..76d7fa278538 100644 --- a/pkg/storage/stores/shipper/table_client_test.go +++ b/pkg/storage/stores/shipper/table_client_test.go @@ -36,7 +36,7 @@ func TestBoltDBShipperTableClient(t *testing.T) { } // we need to use prefixed object client while creating files/folder - prefixedObjectClient := util.NewPrefixedObjectClient(objectClient, StorageKeyPrefix) + prefixedObjectClient := util.NewPrefixedObjectClient(objectClient, "index/") for folder, files := range foldersWithFiles { for _, fileName := range files { @@ -45,7 +45,7 @@ func TestBoltDBShipperTableClient(t *testing.T) { } } - tableClient := NewBoltDBShipperTableClient(objectClient) + tableClient := NewBoltDBShipperTableClient(objectClient, "index/") // check list of tables returns all the folders/tables created above checkExpectedTables(t, tableClient, foldersWithFiles) diff --git a/pkg/storage/stores/shipper/util/util.go b/pkg/storage/stores/shipper/util/util.go index 7753c48652b1..48bedcdf75a0 100644 --- a/pkg/storage/stores/shipper/util/util.go +++ b/pkg/storage/stores/shipper/util/util.go @@ -2,6 +2,7 @@ package util import ( "context" + "errors" "fmt" "io" "os" @@ -17,6 +18,8 @@ import ( "github.com/grafana/loki/pkg/chunkenc" ) +const delimiter = "/" + type StorageClient interface { GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) } @@ -172,3 +175,19 @@ func RemoveDirectories(incoming []chunk.StorageObject) []chunk.StorageObject { func IsDirectory(key string) bool { return strings.HasSuffix(key, "/") } + +func ValidateSharedStoreKeyPrefix(prefix string) error { + if prefix == "" { + return errors.New("shared store key prefix must be set") + } else if strings.Contains(prefix, "\\") { + // When using windows filesystem as object store the implementation of ObjectClient in Cortex takes care of conversion of separator. + // We just need to always use `/` as a path separator. + return fmt.Errorf("shared store key prefix should only have '%s' as a path separator", delimiter) + } else if strings.HasPrefix(prefix, delimiter) { + return errors.New("shared store key prefix should never start with a path separator i.e '/'") + } else if !strings.HasSuffix(prefix, delimiter) { + return errors.New("shared store key prefix should end with a path separator i.e '/'") + } + + return nil +}