Skip to content

Commit

Permalink
consider previous schema as well for boltdb-shipper for calculating m…
Browse files Browse the repository at this point in the history
…ax look back
  • Loading branch information
sandeepsukhani committed Sep 18, 2020
1 parent e4450ae commit 2946f52
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 42 deletions.
4 changes: 2 additions & 2 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ func (s *testStore) GetChunkRefs(ctx context.Context, userID string, from, throu
return nil, nil, nil
}

func (s *testStore) ActivePeriodConfig() chunk.PeriodConfig {
return chunk.PeriodConfig{}
func (s *testStore) GetSchemaConfigs() []chunk.PeriodConfig {
return nil
}

func (s *testStore) Stop() {}
Expand Down
55 changes: 39 additions & 16 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/grafana/loki/pkg/storage"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -106,7 +108,8 @@ type Ingester struct {
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher

store ChunkStore
store ChunkStore
periodicConfigs []chunk.PeriodConfig

loopDone sync.WaitGroup
loopQuit chan struct{}
Expand All @@ -127,7 +130,7 @@ type ChunkStore interface {
SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error)
SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error)
GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error)
ActivePeriodConfig() chunk.PeriodConfig
GetSchemaConfigs() []chunk.PeriodConfig
}

// New makes a new Ingester.
Expand All @@ -141,13 +144,14 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
}

i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
factory: func() chunkenc.Chunk {
return chunkenc.NewMemChunk(enc, cfg.BlockSize, cfg.TargetChunkSize)
},
Expand Down Expand Up @@ -358,20 +362,39 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
return sendSampleBatches(queryServer.Context(), heapItr, queryServer)
}

// boltdbShipperMaxLookBack returns a max look back period only if active index type is boltdb-shipper.
// max look back is limited to from time of boltdb-shipper config.
// It considers previous periodic config's from time if that also has index type set to boltdb-shipper.
func (i *Ingester) boltdbShipperMaxLookBack() time.Duration {
activePeriodicConfigIndex := storage.ActivePeriodConfig(i.periodicConfigs)
activePeriodicConfig := i.periodicConfigs[activePeriodicConfigIndex]
if activePeriodicConfig.IndexType != shipper.BoltDBShipperType {
return 0
}

startTime := activePeriodicConfig.From
if activePeriodicConfigIndex != 0 && i.periodicConfigs[activePeriodicConfigIndex-1].IndexType == shipper.BoltDBShipperType {
startTime = i.periodicConfigs[activePeriodicConfigIndex-1].From
}

maxLookBack := time.Since(startTime.Time.Time())
return maxLookBack
}

// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper.
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
orgID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

activePeriodicConfig := i.store.ActivePeriodConfig()
if activePeriodicConfig.IndexType != shipper.BoltDBShipperType {
boltdbShipperMaxLookBack := i.boltdbShipperMaxLookBack()
if boltdbShipperMaxLookBack == 0 {
return nil, nil
}

reqStart := req.Start
reqStart = adjustQueryStartTime(time.Since(activePeriodicConfig.From.Time.Time()), reqStart, time.Now())
reqStart = adjustQueryStartTime(boltdbShipperMaxLookBack, reqStart, time.Now())

// parse the request
start, end := listutil.RoundToMilliseconds(reqStart, req.End)
Expand Down Expand Up @@ -411,8 +434,8 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
}

// Only continue if the active index type is boltdb-shipper or QueryStore flag is true.
activePeriodicConfig := i.store.ActivePeriodConfig()
if activePeriodicConfig.IndexType != shipper.BoltDBShipperType && !i.cfg.QueryStore {
boltdbShipperMaxLookBack := i.boltdbShipperMaxLookBack()
if boltdbShipperMaxLookBack == 0 && !i.cfg.QueryStore {
return resp, nil
}

Expand All @@ -429,8 +452,8 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
}

maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod
if activePeriodicConfig.IndexType == shipper.BoltDBShipperType {
maxLookBackPeriod = time.Since(activePeriodicConfig.From.Time.Time())
if boltdbShipperMaxLookBack != 0 {
maxLookBackPeriod = boltdbShipperMaxLookBack
}
// Adjust the start time based on QueryStoreMaxLookBackPeriod.
start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now())
Expand Down
81 changes: 79 additions & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ func (s *mockStore) GetChunkRefs(ctx context.Context, userID string, from, throu
return nil, nil, nil
}

func (s *mockStore) ActivePeriodConfig() chunk.PeriodConfig {
return chunk.PeriodConfig{}
func (s *mockStore) GetSchemaConfigs() []chunk.PeriodConfig {
return nil
}

type mockQuerierServer struct {
Expand Down Expand Up @@ -367,3 +367,80 @@ func TestIngester_buildStoreRequest(t *testing.T) {
})
}
}

func TestIngester_boltdbShipperMaxLookBack(t *testing.T) {
now := model.Now()

for _, tc := range []struct {
name string
periodicConfigs []chunk.PeriodConfig
expectedMaxLookBack time.Duration
}{
{
name: "not using boltdb-shipper",
periodicConfigs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "bigtable",
},
},
},
{
name: "just one periodic config with boltdb-shipper",
periodicConfigs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "boltdb-shipper",
},
},
expectedMaxLookBack: time.Since(now.Add(-24 * time.Hour).Time()),
},
{
name: "active config boltdb-shipper, previous config non boltdb-shipper",
periodicConfigs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: now.Add(-48 * time.Hour)},
IndexType: "bigtable",
},
{
From: chunk.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "boltdb-shipper",
},
},
expectedMaxLookBack: time.Since(now.Add(-24 * time.Hour).Time()),
},
{
name: "current and previous config both using boltdb-shipper",
periodicConfigs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: now.Add(-48 * time.Hour)},
IndexType: "boltdb-shipper",
},
{
From: chunk.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "boltdb-shipper",
},
},
expectedMaxLookBack: time.Since(now.Add(-48 * time.Hour).Time()),
},
{
name: "active config non boltdb-shipper, previous config boltdb-shipper",
periodicConfigs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: now.Add(-48 * time.Hour)},
IndexType: "boltdb-shipper",
},
{
From: chunk.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "bigtable",
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
ingester := Ingester{periodicConfigs: tc.periodicConfigs}
mlb := ingester.boltdbShipperMaxLookBack()
require.InDelta(t, tc.expectedMaxLookBack, mlb, float64(time.Second))
})
}
}
8 changes: 4 additions & 4 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,12 @@ func (t *Loki) initTableManager() (services.Service, error) {
func (t *Loki) initStore() (_ services.Service, err error) {
// If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache.
// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
if t.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig) {
if t.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) {
t.cfg.ChunkStoreConfig.DisableIndexDeduplication = true
t.cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{}
}

if loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig) {
if loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) {
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
switch t.cfg.Target {
case Ingester:
Expand All @@ -276,7 +276,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
return
}

if loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig) {
if loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) {
switch t.cfg.Target {
case Querier:
// Use AsyncStore to query both ingesters local store and chunk store for store queries.
Expand All @@ -287,7 +287,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
// ToDo: See if we can avoid doing this when not running loki in clustered mode.
t.cfg.Ingester.QueryStore = true
boltdbShipperConfigIdx := loki_storage.ActivePeriodConfig(t.cfg.SchemaConfig)
boltdbShipperConfigIdx := loki_storage.ActivePeriodConfig(t.cfg.SchemaConfig.Configs)
if t.cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != shipper.BoltDBShipperType {
boltdbShipperConfigIdx++
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (s *storeMock) GetChunkFetcher(_ model.Time) *chunk.Fetcher {
panic("don't call me please")
}

func (s *storeMock) ActivePeriodConfig() chunk.PeriodConfig {
func (s *storeMock) GetSchemaConfigs() []chunk.PeriodConfig {
panic("don't call me please")
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (cfg *SchemaConfig) Validate() error {
if len(cfg.Configs) == 0 {
return zeroLengthConfigError
}
activePCIndex := ActivePeriodConfig(*cfg)
activePCIndex := ActivePeriodConfig((*cfg).Configs)

// if current index type is boltdb-shipper and there are no upcoming index types then it should be set to 24 hours.
if cfg.Configs[activePCIndex].IndexType == shipper.BoltDBShipperType && cfg.Configs[activePCIndex].IndexTables.Period != 24*time.Hour && len(cfg.Configs)-1 == activePCIndex {
Expand All @@ -75,7 +75,7 @@ type Store interface {
SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error)
SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error)
GetSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error)
ActivePeriodConfig() chunk.PeriodConfig
GetSchemaConfigs() []chunk.PeriodConfig
}

type store struct {
Expand Down Expand Up @@ -316,8 +316,8 @@ func (s *store) SelectSamples(ctx context.Context, req logql.SelectSampleParams)
return newSampleBatchIterator(ctx, s.chunkMetrics, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, extractor, req.Start, req.End)
}

func (s *store) ActivePeriodConfig() chunk.PeriodConfig {
return s.schemaCfg.Configs[ActivePeriodConfig(s.schemaCfg)]
func (s *store) GetSchemaConfigs() []chunk.PeriodConfig {
return s.schemaCfg.Configs
}

func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.Chunk {
Expand Down Expand Up @@ -362,10 +362,10 @@ func RegisterCustomIndexClients(cfg *Config, registerer prometheus.Registerer) {

// ActivePeriodConfig returns index of active PeriodicConfig which would be applicable to logs that would be pushed starting now.
// Note: Another PeriodicConfig might be applicable for future logs which can change index type.
func ActivePeriodConfig(cfg SchemaConfig) int {
func ActivePeriodConfig(configs []chunk.PeriodConfig) int {
now := model.Now()
i := sort.Search(len(cfg.Configs), func(i int) bool {
return cfg.Configs[i].From.Time > now
i := sort.Search(len(configs), func(i int) bool {
return configs[i].From.Time > now
})
if i > 0 {
i--
Expand All @@ -374,10 +374,10 @@ func ActivePeriodConfig(cfg SchemaConfig) int {
}

// UsingBoltdbShipper checks whether current or the next index type is boltdb-shipper, returns true if yes.
func UsingBoltdbShipper(cfg SchemaConfig) bool {
activePCIndex := ActivePeriodConfig(cfg)
if cfg.Configs[activePCIndex].IndexType == shipper.BoltDBShipperType ||
(len(cfg.Configs)-1 > activePCIndex && cfg.Configs[activePCIndex+1].IndexType == shipper.BoltDBShipperType) {
func UsingBoltdbShipper(configs []chunk.PeriodConfig) bool {
activePCIndex := ActivePeriodConfig(configs)
if configs[activePCIndex].IndexType == shipper.BoltDBShipperType ||
(len(configs)-1 > activePCIndex && configs[activePCIndex+1].IndexType == shipper.BoltDBShipperType) {
return true
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,21 +907,21 @@ func TestActiveIndexType(t *testing.T) {
IndexType: "first",
}}

assert.Equal(t, 0, ActivePeriodConfig(cfg))
assert.Equal(t, 0, ActivePeriodConfig(cfg.Configs))

// add a newer PeriodConfig in the past which should be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)},
IndexType: "second",
})
assert.Equal(t, 1, ActivePeriodConfig(cfg))
assert.Equal(t, 1, ActivePeriodConfig(cfg.Configs))

// add a newer PeriodConfig in the future which should not be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(time.Hour)},
IndexType: "third",
})
assert.Equal(t, 1, ActivePeriodConfig(cfg))
assert.Equal(t, 1, ActivePeriodConfig(cfg.Configs))
}

func TestUsingBoltdbShipper(t *testing.T) {
Expand All @@ -932,18 +932,18 @@ func TestUsingBoltdbShipper(t *testing.T) {
From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
IndexType: "boltdb-shipper",
}}
assert.Equal(t, true, UsingBoltdbShipper(cfg))
assert.Equal(t, true, UsingBoltdbShipper(cfg.Configs))

// just one PeriodConfig in the past not using boltdb-shipper
cfg.Configs[0].IndexType = "boltdb"
assert.Equal(t, false, UsingBoltdbShipper(cfg))
assert.Equal(t, false, UsingBoltdbShipper(cfg.Configs))

// add a newer PeriodConfig in the future using boltdb-shipper
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(time.Hour)},
IndexType: "boltdb-shipper",
})
assert.Equal(t, true, UsingBoltdbShipper(cfg))
assert.Equal(t, true, UsingBoltdbShipper(cfg.Configs))
}

func TestSchemaConfig_Validate(t *testing.T) {
Expand Down

0 comments on commit 2946f52

Please sign in to comment.