diff --git a/pkg/storage/stores/shipper/compactor/index_set.go b/pkg/storage/stores/shipper/compactor/index_set.go index dc5cd05dd213..d784fecd00ba 100644 --- a/pkg/storage/stores/shipper/compactor/index_set.go +++ b/pkg/storage/stores/shipper/compactor/index_set.go @@ -99,7 +99,7 @@ func (is *indexSet) initUserIndexSet(workingDir string) { } compactedDBName := filepath.Join(workingDir, fmt.Sprint(time.Now().Unix())) - seedFileIdx := findSeedFileIdx(is.sourceObjects) + seedFileIdx := compactedFileIdx(is.sourceObjects) if len(is.sourceObjects) > 0 { // we would only have compacted files in user index folder, so it is not expected to have -1 for seedFileIdx but @@ -236,7 +236,7 @@ func (is *indexSet) writeBatch(_ string, batch []indexEntry) error { // runRetention runs the retention on index set func (is *indexSet) runRetention(tableMarker retention.TableMarker) error { - empty, modified, err := tableMarker.MarkForDelete(is.ctx, is.tableName, is.compactedDB) + empty, modified, err := tableMarker.MarkForDelete(is.ctx, is.tableName, is.userID, is.compactedDB, is.logger) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/compactor/retention/metrics.go b/pkg/storage/stores/shipper/compactor/retention/metrics.go index 6734b9b24a79..1a50fe3e8261 100644 --- a/pkg/storage/stores/shipper/compactor/retention/metrics.go +++ b/pkg/storage/stores/shipper/compactor/retention/metrics.go @@ -59,8 +59,8 @@ func newMarkerMetrics(r prometheus.Registerer) *markerMetrics { tableProcessedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: "loki_boltdb_shipper", Name: "retention_marker_table_processed_total", - Help: "Total amount of table processed per action.", - }, []string{"table", "action"}), + Help: "Total amount of table processed for each user per action. Empty string for user_id is for common index", + }, []string{"table", "user_id", "action"}), tableMarksCreatedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: "loki_boltdb_shipper", Name: "retention_marker_count_total", diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index f67ecd4748fa..b242e3b0a127 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -7,6 +7,7 @@ import ( "fmt" "time" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -32,7 +33,7 @@ var errNoChunksFound = errors.New("no chunks found in table, please check if the type TableMarker interface { // MarkForDelete marks chunks to delete for a given table and returns if it's empty or modified. - MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) + MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) } type Marker struct { @@ -58,16 +59,16 @@ func NewMarker(workingDirectory string, config storage.SchemaConfig, expiration } // MarkForDelete marks all chunks expired for a given table. -func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { +func (t *Marker) MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) { start := time.Now() status := statusSuccess defer func() { t.markerMetrics.tableProcessedDurationSeconds.WithLabelValues(tableName, status).Observe(time.Since(start).Seconds()) - level.Debug(util_log.Logger).Log("msg", "finished to process table", "table", tableName, "duration", time.Since(start)) + level.Debug(logger).Log("msg", "finished to process table", "duration", time.Since(start)) }() - level.Debug(util_log.Logger).Log("msg", "starting to process table", "table", tableName) + level.Debug(logger).Log("msg", "starting to process table") - empty, modified, err := t.markTable(ctx, tableName, db) + empty, modified, err := t.markTable(ctx, tableName, userID, db) if err != nil { status = statusFailure return false, false, err @@ -75,7 +76,7 @@ func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt. return empty, modified, nil } -func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { +func (t *Marker) markTable(ctx context.Context, tableName, userID string, db *bbolt.DB) (bool, bool, error) { schemaCfg, ok := schemaPeriodForTable(t.config, tableName) if !ok { return false, false, fmt.Errorf("could not find schema for table: %s", tableName) @@ -119,14 +120,14 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) return false, false, err } if empty { - t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionDeleted).Inc() + t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, userID, tableActionDeleted).Inc() return empty, true, nil } if !modified { - t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionNone).Inc() + t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, userID, tableActionNone).Inc() return empty, modified, nil } - t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionModified).Inc() + t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, userID, tableActionModified).Inc() return empty, modified, nil } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index dc62d298f5dd..28424e0d14be 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -28,6 +28,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/objectclient" "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/util" + util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/validation" ) @@ -155,7 +156,7 @@ func Test_Retention(t *testing.T) { marker, err := NewMarker(workDir, store.schemaCfg, expiration, nil, prometheus.NewRegistry()) require.NoError(t, err) for _, table := range store.indexTables() { - _, _, err := marker.MarkForDelete(context.Background(), table.name, table.DB) + _, _, err := marker.MarkForDelete(context.Background(), table.name, "", table.DB, util_log.Logger) require.Nil(t, err) table.Close() diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index c1c5434725ac..bfe626a98e0d 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -25,6 +25,38 @@ import ( util_log "github.com/grafana/loki/pkg/util/log" ) +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Below we show various formats that we have for structuring index in the object store. // +// // +// FORMAT1 FORMAT2 FORMAT3 // +// // +// table1 table1 table1 // +// | | | // +// ----> db1.gz ----> db1.gz ----> user1 // +// | | | | // +// ----> index ----> user1 | ----> db1.gz // +// ----> user2 | | // +// | ----> index // +// ----> user2 // +// | // +// ----> db1.gz // +// | // +// ----> index // +// // +// FORMAT1 - `table1` has 1 db named db1.gz and 1 boltdb bucket named `index` which contains index for all the users. // +// It is in use when the flag to build per user index is not enabled. // +// Ingesters write the index in Format1 which then compactor compacts down in same format. // +// // +// FORMAT2 - `table1` has 1 db named db1.gz and 1 boltdb bucket each for `user1` and `user2` containing // +// index just for those users. // +// It is an intermediate format built by ingesters when the flag to build per user index is enabled. // +// // +// FORMAT3 - `table1` has 1 folder each for `user1` and `user2` containing index files having index just for those users. // +// Compactor builds index in this format from Format2. // +// // +// THING TO NOTE HERE IS COMPACTOR BUILDS INDEX IN FORMAT1 FROM FORMAT1 AND FORMAT3 FROM FORMAT2. // +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + const ( uploaderName = "compactor" @@ -60,7 +92,6 @@ type table struct { usersWithPerUserIndex []string uploadCompactedDB bool compactedDB *bbolt.DB - seedSourceFileIdx int logger log.Logger ctx context.Context @@ -83,7 +114,6 @@ func newTable(ctx context.Context, workingDirectory string, indexStorageClient s indexSets: map[string]*indexSet{}, baseUserIndexSet: storage.NewIndexSet(indexStorageClient, true), baseCommonIndexSet: storage.NewIndexSet(indexStorageClient, false), - seedSourceFileIdx: -1, } table.logger = log.With(util_log.Logger, "table-name", table.name) @@ -130,8 +160,8 @@ func (t *table) compact(applyRetention bool) error { return err } } else if len(indexFiles) == 1 && (applyRetention || mustRecreateCompactedDB(indexFiles)) { + // we have just 1 common index file which is already compacted. // initialize common compacted db if we need to apply retention, or we need to recreate it - t.seedSourceFileIdx = 0 downloadAt := filepath.Join(t.workingDirectory, indexFiles[0].Name) err = shipper_util.DownloadFileFromStorage(downloadAt, shipper_util.IsCompressedFile(indexFiles[0].Name), false, shipper_util.LoggerWithFilename(t.logger, indexFiles[0].Name), @@ -182,6 +212,9 @@ func (t *table) done() error { return err } + // initialize the user index sets for: + // - compaction if we have more than 1 index file, taken care of by index set initialization + // - recreation if mustRecreateCompactedDB says so, taken care of by indexSet.done call below if len(indexFiles) > 1 || mustRecreateCompactedDB(indexFiles) { t.indexSets[userID], err = t.getOrCreateUserIndex(userID) if err != nil { @@ -243,16 +276,17 @@ func (t *table) compactFiles(files []storage.IndexFile) error { level.Info(t.logger).Log("msg", "starting compaction of dbs") compactedDBName := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())) - t.seedSourceFileIdx = findSeedFileIdx(files) + // if we find a previously compacted file, use it as a seed file to copy other index into it + seedSourceFileIdx := compactedFileIdx(files) - if t.seedSourceFileIdx != -1 { + if seedSourceFileIdx != -1 { t.uploadCompactedDB = true - compactedDBName = filepath.Join(t.workingDirectory, files[t.seedSourceFileIdx].Name) + compactedDBName = filepath.Join(t.workingDirectory, files[seedSourceFileIdx].Name) - level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", files[t.seedSourceFileIdx].Name)) - err = shipper_util.DownloadFileFromStorage(compactedDBName, shipper_util.IsCompressedFile(files[t.seedSourceFileIdx].Name), - false, shipper_util.LoggerWithFilename(t.logger, files[t.seedSourceFileIdx].Name), func() (io.ReadCloser, error) { - return t.baseCommonIndexSet.GetFile(t.ctx, t.name, "", files[t.seedSourceFileIdx].Name) + level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", files[seedSourceFileIdx].Name)) + err = shipper_util.DownloadFileFromStorage(compactedDBName, shipper_util.IsCompressedFile(files[seedSourceFileIdx].Name), + false, shipper_util.LoggerWithFilename(t.logger, files[seedSourceFileIdx].Name), func() (io.ReadCloser, error) { + return t.baseCommonIndexSet.GetFile(t.ctx, t.name, "", files[seedSourceFileIdx].Name) }) if err != nil { return err @@ -264,10 +298,11 @@ func (t *table) compactFiles(files []storage.IndexFile) error { return err } + // go through each file and build index in FORMAT1 from FORMAT1 files and FORMAT3 from FORMAT2 files return concurrency.ForEachJob(t.ctx, len(files), readDBsConcurrency, func(ctx context.Context, idx int) error { workNum := idx // skip seed file - if workNum == t.seedSourceFileIdx { + if workNum == seedSourceFileIdx { return nil } fileName := files[idx].Name @@ -293,7 +328,7 @@ func (t *table) writeBatch(bucketName string, batch []indexEntry) error { return t.writeUserIndex(bucketName, batch) } -// writeCommonIndex writes a batch to compactedDB +// writeCommonIndex writes a batch to compactedDB which is for FORMAT1 index func (t *table) writeCommonIndex(batch []indexEntry) error { t.uploadCompactedDB = true return t.compactedDB.Batch(func(tx *bbolt.Tx) error { @@ -313,7 +348,7 @@ func (t *table) writeCommonIndex(batch []indexEntry) error { }) } -// writeUserIndex sends a batch to write to the user index set +// writeUserIndex sends a batch to write to the user index set which is for FORMAT3 index func (t *table) writeUserIndex(userID string, batch []indexEntry) error { ui, err := t.getOrCreateUserIndex(userID) if err != nil { @@ -364,11 +399,9 @@ func openBoltdbFileWithNoSync(path string) (*bbolt.DB, error) { return boltdb, nil } -// findSeedFileIdx returns index of file to use as seed which would then get index from all the files written to. -// It tries to find previously compacted file(which has uploaderName) which would be the biggest file. -// In a large cluster, using previously compacted file as seed would significantly reduce compaction time. +// compactedFileIdx returns index of previously compacted file(which starts with uploaderName). // If it can't find a previously compacted file, it would return -1. -func findSeedFileIdx(files []storage.IndexFile) int { +func compactedFileIdx(files []storage.IndexFile) int { for i, file := range files { if strings.HasPrefix(file.Name, uploaderName) { return i diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/shipper/compactor/table_test.go index adac8c4832cd..34467df1e6f9 100644 --- a/pkg/storage/stores/shipper/compactor/table_test.go +++ b/pkg/storage/stores/shipper/compactor/table_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" @@ -269,10 +270,10 @@ func TestTable_Compaction(t *testing.T) { } } -type TableMarkerFunc func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) +type TableMarkerFunc func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) -func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { - return t(ctx, tableName, db) +func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) { + return t(ctx, tableName, userID, db, logger) } type IntervalMayHaveExpiredChunksFunc func(interval model.Interval, userID string) bool @@ -309,7 +310,7 @@ func TestTable_CompactionRetention(t *testing.T) { _, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) require.True(t, os.IsNotExist(err)) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) { return true, true, nil }), }, @@ -321,7 +322,7 @@ func TestTable_CompactionRetention(t *testing.T) { }) compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName))) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) { return false, true, nil }), }, @@ -333,7 +334,7 @@ func TestTable_CompactionRetention(t *testing.T) { }) compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName))) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) { return false, false, nil }), }, @@ -554,7 +555,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) { }) compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName))) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) { return false, false, nil }), expectedIndexSetState: indexSetState{ @@ -571,7 +572,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) { }) compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName))) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) { return false, false, nil }), compactedDBMtime: time.Now().Add(-recreateCompactedDBOlderThan / 2), @@ -585,7 +586,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) { }) compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName))) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) { return false, true, nil }), expectedIndexSetState: indexSetState{ @@ -599,7 +600,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) { _, err := ioutil.ReadDir(filepath.Join(storagePath, tableName)) require.True(t, os.IsNotExist(err)) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) { return true, true, nil }), expectedIndexSetState: indexSetState{ @@ -616,7 +617,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) { }) compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName))) }, - tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) { + tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) { return false, false, nil }), compactedDBMtime: time.Now().Add(-(recreateCompactedDBOlderThan + time.Minute)), diff --git a/pkg/storage/stores/shipper/downloads/index_set.go b/pkg/storage/stores/shipper/downloads/index_set.go index 3c9d0f8c32b3..36c9d1b7ff46 100644 --- a/pkg/storage/stores/shipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/downloads/index_set.go @@ -117,6 +117,7 @@ func (t *indexSet) Init() (err error) { return err } + // open all the locally present files first to avoid downloading them again during sync operation below. for _, fileInfo := range filesInfo { if fileInfo.IsDir() { continue diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 141f74fcf064..b2ab47418988 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -101,13 +101,15 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI level.Debug(table.logger).Log("msg", fmt.Sprintf("opening locally present files for table %s", name), "files", fmt.Sprint(filesInfo)) + // common index files are outside the directories and user index files are in the directories for _, fileInfo := range filesInfo { if !fileInfo.IsDir() { continue } - userIndexSet, err := NewIndexSet(name, fileInfo.Name(), filepath.Join(cacheLocation, fileInfo.Name()), - table.baseUserIndexSet, boltDBIndexClient, table.logger, metrics) + userID := fileInfo.Name() + userIndexSet, err := NewIndexSet(name, userID, filepath.Join(cacheLocation, userID), + table.baseUserIndexSet, boltDBIndexClient, loggerWithUserID(table.logger, userID), metrics) if err != nil { return nil, err } @@ -117,7 +119,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI return nil, err } - table.indexSets[fileInfo.Name()] = userIndexSet + table.indexSets[userID] = userIndexSet } commonIndexSet, err := NewIndexSet(name, "", cacheLocation, table.baseCommonIndexSet, @@ -155,6 +157,7 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca return err } + // query both user and common index for _, uid := range []string{userID, ""} { indexSet, err := t.getOrCreateIndexSet(uid) if err != nil { @@ -277,7 +280,8 @@ func (t *Table) getOrCreateIndexSet(id string) (IndexSet, error) { } // instantiate the index set, add it to the map - indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.boltDBIndexClient, t.logger, t.metrics) + indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.boltDBIndexClient, + loggerWithUserID(t.logger, id), t.metrics) if err != nil { return nil, err } @@ -336,3 +340,11 @@ func (t *Table) downloadUserIndexes(ctx context.Context, userIDs []string) error return indexSet.AwaitReady(ctx) }) } + +func loggerWithUserID(logger log.Logger, userID string) log.Logger { + if userID == "" { + return logger + } + + return log.With(logger, "user-id", userID) +}