diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 2f5449bbe988..52f522ac1af3 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -48,6 +48,7 @@ import ( "github.com/grafana/loki/pkg/ruler" loki_storage "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/stores/shipper" + "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" serverutil "github.com/grafana/loki/pkg/util/server" "github.com/grafana/loki/pkg/util/validation" ) @@ -297,11 +298,13 @@ func (t *Loki) initStore() (_ services.Service, err error) { Validity: t.cfg.StorageConfig.IndexCacheValidity - 1*time.Minute, }, } + t.cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.cfg) + 2*time.Minute case Querier, Ruler: // We do not want query to do any updates to index t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly default: t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite + t.cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.cfg) + 2*time.Minute } } @@ -311,13 +314,13 @@ func (t *Loki) initStore() (_ services.Service, err error) { } if loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) { + boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.cfg) switch t.cfg.Target { case Querier, Ruler: // Use AsyncStore to query both ingesters local store and chunk store for store queries. // Only queriers should use the AsyncStore, it should never be used in ingesters. 
chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier, - calculateAsyncStoreQueryIngestersWithin(t.cfg.Querier.QueryIngestersWithin, - t.cfg.Ingester.MaxChunkAge, t.cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval), + calculateAsyncStoreQueryIngestersWithin(t.cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration), ) case All: // We want ingester to also query the store when using boltdb-shipper but only when running with target All. @@ -329,7 +332,7 @@ func (t *Loki) initStore() (_ services.Service, err error) { boltdbShipperConfigIdx++ } mlb, err := calculateMaxLookBack(t.cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.cfg.Ingester.QueryStoreMaxLookBackPeriod, - t.cfg.Ingester.MaxChunkAge, t.cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval) + boltdbShipperMinIngesterQueryStoreDuration) if err != nil { return nil, err } @@ -566,34 +569,49 @@ func (t *Loki) initCompactor() (services.Service, error) { return t.compactor, nil } -func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge, querierResyncInterval time.Duration) (time.Duration, error) { +func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) { if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 { return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`") } - // When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet - defaultMaxLookBack := maxChunkAge + shipper.UploadInterval + querierResyncInterval + (15 * time.Minute) if maxLookBackConfig == 0 { - // If the QueryStoreMaxLookBackPeriod is still it's default value of 0, set it to the default calculated value. 
- return defaultMaxLookBack, nil - } else if maxLookBackConfig > 0 && maxLookBackConfig < defaultMaxLookBack { - // If the QueryStoreMaxLookBackPeriod is > 0 (-1 is allowed for infinite), make sure it's at least greater than the default or throw an error + // If the QueryStoreMaxLookBackPeriod is still its default value of 0, set it to the minDuration. + return minDuration, nil + } else if maxLookBackConfig > 0 && maxLookBackConfig < minDuration { + // If the QueryStoreMaxLookBackPeriod is > 0 (-1 is allowed for infinite), make sure it's at least greater than minDuration or throw an error return 0, fmt.Errorf("the configured query_store_max_look_back_period of '%v' is less than the calculated default of '%v' "+ "which is calculated based on the max_chunk_age + 15 minute boltdb-shipper interval + 15 min additional buffer. Increase this value"+ - "greater than the default or remove it from the configuration to use the default", maxLookBackConfig, defaultMaxLookBack) + "greater than the default or remove it from the configuration to use the default", maxLookBackConfig, minDuration) } return maxLookBackConfig, nil } -func calculateAsyncStoreQueryIngestersWithin(queryIngestersWithinConfig, maxChunkAge, querierResyncInterval time.Duration) time.Duration { +func calculateAsyncStoreQueryIngestersWithin(queryIngestersWithinConfig, minDuration time.Duration) time.Duration { // 0 means do not limit queries, we would also not limit ingester queries from AsyncStore. if queryIngestersWithinConfig == 0 { return 0 } - minVal := maxChunkAge + shipper.UploadInterval + querierResyncInterval + (15 * time.Minute) - if queryIngestersWithinConfig < minVal { - return minVal + if queryIngestersWithinConfig < minDuration { + return minDuration } return queryIngestersWithinConfig } + +// boltdbShipperQuerierIndexUpdateDelay returns duration it could take for queriers to serve the index since it was uploaded. 
+// It also considers index cache validity because a querier could have cached index just before it was going to resync which means +// it would keep serving index until the cache entries expire. +func boltdbShipperQuerierIndexUpdateDelay(cfg Config) time.Duration { + return cfg.StorageConfig.IndexCacheValidity + cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval +} + +// boltdbShipperIngesterIndexUploadDelay returns duration it could take for an index file containing id of a chunk to be uploaded to the shared store since it got flushed. +func boltdbShipperIngesterIndexUploadDelay() time.Duration { + return uploads.ShardDBsByDuration + shipper.UploadInterval +} + +// boltdbShipperMinIngesterQueryStoreDuration returns minimum duration(with some buffer) ingesters should query their stores to +// avoid missing any logs or chunk ids due to async nature of BoltDB Shipper. +func boltdbShipperMinIngesterQueryStoreDuration(cfg Config) time.Duration { + return cfg.Ingester.MaxChunkAge + boltdbShipperIngesterIndexUploadDelay() + boltdbShipperQuerierIndexUpdateDelay(cfg) + 2*time.Minute +} diff --git a/pkg/loki/modules_test.go b/pkg/loki/modules_test.go index 627b4a910613..3cec7814409c 100644 --- a/pkg/loki/modules_test.go +++ b/pkg/loki/modules_test.go @@ -9,10 +9,9 @@ import ( func Test_calculateMaxLookBack(t *testing.T) { type args struct { - pc chunk.PeriodConfig - maxLookBackConfig time.Duration - maxChunkAge time.Duration - querierBoltDBFilesResyncInterval time.Duration + pc chunk.PeriodConfig + maxLookBackConfig time.Duration + minDuration time.Duration } tests := []struct { name string @@ -26,11 +25,10 @@ func Test_calculateMaxLookBack(t *testing.T) { pc: chunk.PeriodConfig{ ObjectType: "filesystem", }, - maxLookBackConfig: 0, - maxChunkAge: 1 * time.Hour, - querierBoltDBFilesResyncInterval: 5 * time.Minute, + maxLookBackConfig: 0, + minDuration: time.Hour, }, - want: 81 * time.Minute, + want: time.Hour, wantErr: false, }, { @@ -39,9 +37,8 @@ func 
Test_calculateMaxLookBack(t *testing.T) { pc: chunk.PeriodConfig{ ObjectType: "filesystem", }, - maxLookBackConfig: -1, - maxChunkAge: 1 * time.Hour, - querierBoltDBFilesResyncInterval: 5 * time.Minute, + maxLookBackConfig: -1, + minDuration: time.Hour, }, want: -1, wantErr: false, @@ -52,22 +49,20 @@ func Test_calculateMaxLookBack(t *testing.T) { pc: chunk.PeriodConfig{ ObjectType: "gcs", }, - maxLookBackConfig: -1, - maxChunkAge: 1 * time.Hour, - querierBoltDBFilesResyncInterval: 5 * time.Minute, + maxLookBackConfig: -1, + minDuration: time.Hour, }, want: 0, wantErr: true, }, { - name: "less than default", + name: "less than minDuration", args: args{ pc: chunk.PeriodConfig{ ObjectType: "filesystem", }, - maxLookBackConfig: 1 * time.Hour, - maxChunkAge: 1 * time.Hour, - querierBoltDBFilesResyncInterval: 5 * time.Minute, + maxLookBackConfig: 1 * time.Hour, + minDuration: 2 * time.Hour, }, want: 0, wantErr: true, @@ -75,7 +70,7 @@ func Test_calculateMaxLookBack(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := calculateMaxLookBack(tt.args.pc, tt.args.maxLookBackConfig, tt.args.maxChunkAge, tt.args.querierBoltDBFilesResyncInterval) + got, err := calculateMaxLookBack(tt.args.pc, tt.args.maxLookBackConfig, tt.args.minDuration) if (err != nil) != tt.wantErr { t.Errorf("calculateMaxLookBack() error = %v, wantErr %v", err, tt.wantErr) return @@ -86,53 +81,3 @@ func Test_calculateMaxLookBack(t *testing.T) { }) } } - -func Test_calculateAsyncStoreQueryIngestersWithin(t *testing.T) { - type args struct { - queryIngestersWithin time.Duration - maxChunkAge time.Duration - querierBoltDBFilesResyncInterval time.Duration - } - tests := []struct { - name string - args args - want time.Duration - }{ - { - name: "default", - args: args{ - 0, - time.Hour, - 5 * time.Minute, - }, - want: 0, - }, - { - name: "queryIngestersWithin more than min val", - args: args{ - 3 * time.Hour, - time.Hour, - 5 * time.Minute, - }, - want: 3 * time.Hour, - 
}, - { - name: "queryIngestersWithin less than min val", - args: args{ - time.Hour, - time.Hour, - 5 * time.Minute, - }, - want: 81 * time.Minute, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := calculateAsyncStoreQueryIngestersWithin(tt.args.queryIngestersWithin, tt.args.maxChunkAge, tt.args.querierBoltDBFilesResyncInterval) - - if got != tt.want { - t.Errorf("calculateAsyncStoreQueryIngestersWithin() got = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index 4eeeb30a9ebe..0a1f53f3c0e5 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -55,14 +55,15 @@ type boltDBIndexClient interface { } type Config struct { - ActiveIndexDirectory string `yaml:"active_index_directory"` - SharedStoreType string `yaml:"shared_store"` - CacheLocation string `yaml:"cache_location"` - CacheTTL time.Duration `yaml:"cache_ttl"` - ResyncInterval time.Duration `yaml:"resync_interval"` - QueryReadyNumDays int `yaml:"query_ready_num_days"` - IngesterName string `yaml:"-"` - Mode int `yaml:"-"` + ActiveIndexDirectory string `yaml:"active_index_directory"` + SharedStoreType string `yaml:"shared_store"` + CacheLocation string `yaml:"cache_location"` + CacheTTL time.Duration `yaml:"cache_ttl"` + ResyncInterval time.Duration `yaml:"resync_interval"` + QueryReadyNumDays int `yaml:"query_ready_num_days"` + IngesterName string `yaml:"-"` + Mode int `yaml:"-"` + IngesterDBRetainPeriod time.Duration `yaml:"-"` } // RegisterFlags registers flags. 
@@ -132,7 +133,7 @@ func (s *Shipper) init(objectClient chunk.ObjectClient, registerer prometheus.Re Uploader: uploader, IndexDir: s.cfg.ActiveIndexDirectory, UploadInterval: UploadInterval, - DBRetainPeriod: s.cfg.ResyncInterval + 2*time.Minute, + DBRetainPeriod: s.cfg.IngesterDBRetainPeriod, } uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, objectClient, registerer) if err != nil { diff --git a/pkg/storage/stores/shipper/uploads/table.go b/pkg/storage/stores/shipper/uploads/table.go index 6747b88c03a9..46cd87153825 100644 --- a/pkg/storage/stores/shipper/uploads/table.go +++ b/pkg/storage/stores/shipper/uploads/table.go @@ -26,7 +26,7 @@ import ( const ( // create a new db sharded by time based on when write request is received - shardDBsByDuration = 15 * time.Minute + ShardDBsByDuration = 15 * time.Minute // a temp file is created during uploads with name of the db + tempFileSuffix tempFileSuffix = ".temp" @@ -241,12 +241,12 @@ func (lt *Table) Write(ctx context.Context, writes local.TableWrites) error { return lt.write(ctx, time.Now(), writes) } -// write writes to a db locally. It shards the db files by truncating the passed time by shardDBsByDuration using https://golang.org/pkg/time/#Time.Truncate +// write writes to a db locally. It shards the db files by truncating the passed time by ShardDBsByDuration using https://golang.org/pkg/time/#Time.Truncate // db files are named after the time shard i.e epoch of the truncated time. // If a db file does not exist for a shard it gets created. func (lt *Table) write(ctx context.Context, tm time.Time, writes local.TableWrites) error { // do not write to files older than init time otherwise we might endup modifying file which was already created and uploaded before last shutdown. 
- shard := tm.Truncate(shardDBsByDuration).Unix() + shard := tm.Truncate(ShardDBsByDuration).Unix() if shard < lt.modifyShardsSince { shard = lt.modifyShardsSince } @@ -501,5 +501,5 @@ func loadBoltDBsFromDir(dir string) (map[string]*bbolt.DB, error) { func getOldestActiveShardTime() time.Time { // upload files excluding active shard. It could so happen that we just started a new shard but the file for last shard is still being updated due to pending writes or pending flush to disk. // To avoid uploading it, excluding previous active shard as well if it has been not more than a minute since it became inactive. - return time.Now().Add(-time.Minute).Truncate(shardDBsByDuration) + return time.Now().Add(-time.Minute).Truncate(ShardDBsByDuration) } diff --git a/pkg/storage/stores/shipper/uploads/table_test.go b/pkg/storage/stores/shipper/uploads/table_test.go index 4716c9f143fe..8ba5d6b0f448 100644 --- a/pkg/storage/stores/shipper/uploads/table_test.go +++ b/pkg/storage/stores/shipper/uploads/table_test.go @@ -109,7 +109,7 @@ func TestTable_Write(t *testing.T) { now := time.Now() // allow modifying last 5 shards - table.modifyShardsSince = now.Add(-5 * shardDBsByDuration).Unix() + table.modifyShardsSince = now.Add(-5 * ShardDBsByDuration).Unix() // a couple of times for which we want to do writes to make the table create different shards testCases := []struct { @@ -120,13 +120,13 @@ func TestTable_Write(t *testing.T) { writeTime: now, }, { - writeTime: now.Add(-(shardDBsByDuration + 5*time.Minute)), + writeTime: now.Add(-(ShardDBsByDuration + 5*time.Minute)), }, { - writeTime: now.Add(-(shardDBsByDuration*3 + 3*time.Minute)), + writeTime: now.Add(-(ShardDBsByDuration*3 + 3*time.Minute)), }, { - writeTime: now.Add(-6 * shardDBsByDuration), // write with time older than table.modifyShardsSince + writeTime: now.Add(-6 * ShardDBsByDuration), // write with time older than table.modifyShardsSince dbName: fmt.Sprint(table.modifyShardsSince), }, } @@ -145,7 +145,7 @@ func 
TestTable_Write(t *testing.T) { expectedDBName := tc.dbName if expectedDBName == "" { - expectedDBName = fmt.Sprint(tc.writeTime.Truncate(shardDBsByDuration).Unix()) + expectedDBName = fmt.Sprint(tc.writeTime.Truncate(ShardDBsByDuration).Unix()) } db, ok := table.dbs[expectedDBName] require.True(t, ok) @@ -189,7 +189,7 @@ func TestTable_Upload(t *testing.T) { // write a batch to another shard batch = boltIndexClient.NewWriteBatch() testutil.AddRecordsToBatch(batch, "test", 20, 10) - require.NoError(t, table.write(context.Background(), now.Add(shardDBsByDuration), batch.(*local.BoltWriteBatch).Writes["test"])) + require.NoError(t, table.write(context.Background(), now.Add(ShardDBsByDuration), batch.(*local.BoltWriteBatch).Writes["test"])) // upload the dbs to storage require.NoError(t, table.Upload(context.Background(), true)) @@ -382,9 +382,9 @@ func TestTable_ImmutableUploads(t *testing.T) { // some dbs to setup dbNames := []int64{ - shardCutoff.Add(-shardDBsByDuration).Unix(), // inactive shard, should upload + shardCutoff.Add(-ShardDBsByDuration).Unix(), // inactive shard, should upload shardCutoff.Add(-1 * time.Minute).Unix(), // 1 minute before shard cutoff, should upload - time.Now().Truncate(shardDBsByDuration).Unix(), // active shard, should not upload + time.Now().Truncate(ShardDBsByDuration).Unix(), // active shard, should not upload } dbs := map[string]testutil.DBRecords{}