add more context to logs, improve comments for clarity in boltdb-shipper code #5341

Merged
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/compactor/index_set.go
@@ -99,7 +99,7 @@ func (is *indexSet) initUserIndexSet(workingDir string) {
}

compactedDBName := filepath.Join(workingDir, fmt.Sprint(time.Now().Unix()))
- seedFileIdx := findSeedFileIdx(is.sourceObjects)
+ seedFileIdx := compactedFileIdx(is.sourceObjects)

if len(is.sourceObjects) > 0 {
// we would only have compacted files in user index folder, so it is not expected to have -1 for seedFileIdx but
@@ -236,7 +236,7 @@ func (is *indexSet) writeBatch(_ string, batch []indexEntry) error {

// runRetention runs the retention on index set
func (is *indexSet) runRetention(tableMarker retention.TableMarker) error {
- empty, modified, err := tableMarker.MarkForDelete(is.ctx, is.tableName, is.compactedDB)
+ empty, modified, err := tableMarker.MarkForDelete(is.ctx, is.tableName, is.userID, is.compactedDB, is.logger)
if err != nil {
return err
}
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/compactor/retention/metrics.go
@@ -59,8 +59,8 @@ func newMarkerMetrics(r prometheus.Registerer) *markerMetrics {
tableProcessedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki_boltdb_shipper",
Name: "retention_marker_table_processed_total",
Help: "Total amount of table processed per action.",
}, []string{"table", "action"}),
Help: "Total amount of table processed for each user per action. Empty string for user_id is for common index",
}, []string{"table", "user_id", "action"}),
tableMarksCreatedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki_boltdb_shipper",
Name: "retention_marker_count_total",
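Reviewer note: to see how the new `user_id` label plays out, here is a minimal standalone sketch (not code from the PR; the registry, table name, and action strings are illustrative, and the diff's tableAction* constants are assumed to be plain strings along these lines):

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	// Same shape as the counter in the diff; the registry here is local to the example.
	tableProcessedTotal := promauto.With(prometheus.NewRegistry()).NewCounterVec(prometheus.CounterOpts{
		Namespace: "loki_boltdb_shipper",
		Name:      "retention_marker_table_processed_total",
		Help:      "Total amount of table processed for each user per action. Empty string for user_id is for common index",
	}, []string{"table", "user_id", "action"})

	// Empty user_id identifies the common index for the table.
	tableProcessedTotal.WithLabelValues("index_18500", "", "modified").Inc()
	// A non-empty user_id identifies that user's per-user index.
	tableProcessedTotal.WithLabelValues("index_18500", "user1", "deleted").Inc()
}
```

Dashboards that aggregated the old two-label series may need a `sum without (user_id)` to preserve the previous semantics.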
19 changes: 10 additions & 9 deletions pkg/storage/stores/shipper/compactor/retention/retention.go
@@ -7,6 +7,7 @@ import (
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@@ -32,7 +33,7 @@ var errNoChunksFound = errors.New("no chunks found in table, please check if the

type TableMarker interface {
// MarkForDelete marks chunks to delete for a given table and returns if it's empty or modified.
- MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error)
+ MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error)
}

type Marker struct {
@@ -58,24 +59,24 @@ func NewMarker(workingDirectory string, config storage.SchemaConfig, expiration
}

// MarkForDelete marks all chunks expired for a given table.
- func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ func (t *Marker) MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
start := time.Now()
status := statusSuccess
defer func() {
t.markerMetrics.tableProcessedDurationSeconds.WithLabelValues(tableName, status).Observe(time.Since(start).Seconds())
level.Debug(util_log.Logger).Log("msg", "finished to process table", "table", tableName, "duration", time.Since(start))
level.Debug(logger).Log("msg", "finished to process table", "duration", time.Since(start))
}()
level.Debug(util_log.Logger).Log("msg", "starting to process table", "table", tableName)
level.Debug(logger).Log("msg", "starting to process table")

- empty, modified, err := t.markTable(ctx, tableName, db)
+ empty, modified, err := t.markTable(ctx, tableName, userID, db)
if err != nil {
status = statusFailure
return false, false, err
}
return empty, modified, nil
}

- func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ func (t *Marker) markTable(ctx context.Context, tableName, userID string, db *bbolt.DB) (bool, bool, error) {
schemaCfg, ok := schemaPeriodForTable(t.config, tableName)
if !ok {
return false, false, fmt.Errorf("could not find schema for table: %s", tableName)
@@ -119,14 +120,14 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB)
return false, false, err
}
if empty {
- t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionDeleted).Inc()
+ t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, userID, tableActionDeleted).Inc()
return empty, true, nil
}
if !modified {
- t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionNone).Inc()
+ t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, userID, tableActionNone).Inc()
return empty, modified, nil
}
- t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionModified).Inc()
+ t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, userID, tableActionModified).Inc()
return empty, modified, nil
}

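Reviewer note: the signature change pushes per-table and per-user context into the logger instead of repeating it on every log line. A minimal sketch of the intended call pattern (the helper below is hypothetical; in the PR, the compactor's indexSet passes its own pre-scoped logger):

```go
package main

import (
	"context"

	"github.com/go-kit/log"
	"go.etcd.io/bbolt"
)

// TableMarker matches the updated interface from the diff.
type TableMarker interface {
	MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error)
}

// markOne is a hypothetical caller: scope the logger once, and every line
// MarkForDelete logs then carries the table and user fields automatically.
func markOne(ctx context.Context, tm TableMarker, tableName, userID string, db *bbolt.DB, base log.Logger) (empty, modified bool, err error) {
	logger := log.With(base, "table-name", tableName, "user-id", userID)
	return tm.MarkForDelete(ctx, tableName, userID, db, logger)
}
```

An empty userID denotes the common index, matching the metric's help text above.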
@@ -28,6 +28,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/objectclient"
"github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)

@@ -155,7 +156,7 @@ func Test_Retention(t *testing.T) {
marker, err := NewMarker(workDir, store.schemaCfg, expiration, nil, prometheus.NewRegistry())
require.NoError(t, err)
for _, table := range store.indexTables() {
- _, _, err := marker.MarkForDelete(context.Background(), table.name, table.DB)
+ _, _, err := marker.MarkForDelete(context.Background(), table.name, "", table.DB, util_log.Logger)
require.Nil(t, err)
table.Close()

67 changes: 50 additions & 17 deletions pkg/storage/stores/shipper/compactor/table.go
@@ -25,6 +25,38 @@ import (
util_log "github.com/grafana/loki/pkg/util/log"
)

+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Below we show the various formats that we have for structuring the index in the object store.
+ //
+ //                    FORMAT1                          FORMAT2                          FORMAT3
+ //
+ //               table1                          table1                          table1
+ //                |                               |                               |
+ //                 ----> db1.gz                    ----> db1.gz                    ----> user1
+ //                        |                               |                        |      |
+ //                         ----> index                     ----> user1            |       ----> db1.gz
+ //                                                         ----> user2            |              |
+ //                                                                                |               ----> index
+ //                                                                                 ----> user2
+ //                                                                                        |
+ //                                                                                         ----> db1.gz
+ //                                                                                                |
+ //                                                                                                 ----> index
+ //
+ // FORMAT1 - `table1` has 1 db named db1.gz and 1 boltdb bucket named `index`, which contains the index for all users.
+ //           It is used when the flag to build per-user indexes is not enabled.
+ //           Ingesters write the index in FORMAT1, which the compactor then compacts down in the same format.
+ //
+ // FORMAT2 - `table1` has 1 db named db1.gz and 1 boltdb bucket each for `user1` and `user2`, containing the index
+ //           for just that user.
+ //           It is an intermediate format built by ingesters when the flag to build per-user indexes is enabled.
+ //
+ // FORMAT3 - `table1` has 1 folder each for `user1` and `user2`, containing index files with the index for just that user.
+ //           The compactor builds index in this format from FORMAT2.
+ //
+ // THE KEY THING TO NOTE HERE IS THAT THE COMPACTOR BUILDS FORMAT1 INDEX FROM FORMAT1 AND FORMAT3 INDEX FROM FORMAT2.
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////

const (
uploaderName = "compactor"

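Reviewer note: since FORMAT1 and FORMAT2 differ only in the bucket names inside a single boltdb file, telling them apart comes down to scanning buckets. A rough sketch of that idea (the helper and the "index" literal are assumptions drawn from the diagram above, not code from this PR):

```go
package main

import "go.etcd.io/bbolt"

// classifyDB is a hypothetical helper: a bucket named "index" indicates a
// FORMAT1 common index; any other bucket name is treated as a user ID (FORMAT2).
func classifyDB(db *bbolt.DB) (users []string, hasCommonIndex bool, err error) {
	err = db.View(func(tx *bbolt.Tx) error {
		return tx.ForEach(func(name []byte, _ *bbolt.Bucket) error {
			if string(name) == "index" {
				hasCommonIndex = true
				return nil
			}
			users = append(users, string(name))
			return nil
		})
	})
	return users, hasCommonIndex, err
}
```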
@@ -60,7 +92,6 @@ type table struct {
usersWithPerUserIndex []string
uploadCompactedDB bool
compactedDB *bbolt.DB
- seedSourceFileIdx int
logger log.Logger

ctx context.Context
@@ -83,7 +114,6 @@ func newTable(ctx context.Context, workingDirectory string, indexStorageClient s
indexSets: map[string]*indexSet{},
baseUserIndexSet: storage.NewIndexSet(indexStorageClient, true),
baseCommonIndexSet: storage.NewIndexSet(indexStorageClient, false),
- seedSourceFileIdx: -1,
}
table.logger = log.With(util_log.Logger, "table-name", table.name)

@@ -130,8 +160,8 @@ func (t *table) compact(applyRetention bool) error {
return err
}
} else if len(indexFiles) == 1 && (applyRetention || mustRecreateCompactedDB(indexFiles)) {
// we have just 1 common index file which is already compacted.
+ // initialize common compacted db if we need to apply retention, or we need to recreate it
- t.seedSourceFileIdx = 0
downloadAt := filepath.Join(t.workingDirectory, indexFiles[0].Name)
err = shipper_util.DownloadFileFromStorage(downloadAt, shipper_util.IsCompressedFile(indexFiles[0].Name),
false, shipper_util.LoggerWithFilename(t.logger, indexFiles[0].Name),
@@ -182,6 +212,9 @@ func (t *table) done() error {
return err
}

+ // initialize the user index sets for:
+ // - compaction if we have more than 1 index file, taken care of by index set initialization
+ // - recreation if mustRecreateCompactedDB says so, taken care of by indexSet.done call below
if len(indexFiles) > 1 || mustRecreateCompactedDB(indexFiles) {
t.indexSets[userID], err = t.getOrCreateUserIndex(userID)
if err != nil {
@@ -243,16 +276,17 @@ func (t *table) compactFiles(files []storage.IndexFile) error {
level.Info(t.logger).Log("msg", "starting compaction of dbs")

compactedDBName := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))
- t.seedSourceFileIdx = findSeedFileIdx(files)
+ // if we find a previously compacted file, use it as a seed file to copy other index into it
+ seedSourceFileIdx := compactedFileIdx(files)

- if t.seedSourceFileIdx != -1 {
+ if seedSourceFileIdx != -1 {
t.uploadCompactedDB = true
- compactedDBName = filepath.Join(t.workingDirectory, files[t.seedSourceFileIdx].Name)
+ compactedDBName = filepath.Join(t.workingDirectory, files[seedSourceFileIdx].Name)

level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", files[t.seedSourceFileIdx].Name))
err = shipper_util.DownloadFileFromStorage(compactedDBName, shipper_util.IsCompressedFile(files[t.seedSourceFileIdx].Name),
false, shipper_util.LoggerWithFilename(t.logger, files[t.seedSourceFileIdx].Name), func() (io.ReadCloser, error) {
return t.baseCommonIndexSet.GetFile(t.ctx, t.name, "", files[t.seedSourceFileIdx].Name)
level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", files[seedSourceFileIdx].Name))
err = shipper_util.DownloadFileFromStorage(compactedDBName, shipper_util.IsCompressedFile(files[seedSourceFileIdx].Name),
false, shipper_util.LoggerWithFilename(t.logger, files[seedSourceFileIdx].Name), func() (io.ReadCloser, error) {
return t.baseCommonIndexSet.GetFile(t.ctx, t.name, "", files[seedSourceFileIdx].Name)
})
if err != nil {
return err
@@ -264,10 +298,11 @@ func (t *table) compactFiles(files []storage.IndexFile) error {
return err
}

+ // go through each file and build index in FORMAT1 from FORMAT1 files and FORMAT3 from FORMAT2 files
return concurrency.ForEachJob(t.ctx, len(files), readDBsConcurrency, func(ctx context.Context, idx int) error {
workNum := idx
// skip seed file
- if workNum == t.seedSourceFileIdx {
+ if workNum == seedSourceFileIdx {
return nil
}
fileName := files[idx].Name
@@ -293,7 +328,7 @@ func (t *table) writeBatch(bucketName string, batch []indexEntry) error {
return t.writeUserIndex(bucketName, batch)
}

- // writeCommonIndex writes a batch to compactedDB
+ // writeCommonIndex writes a batch to compactedDB which is for FORMAT1 index
func (t *table) writeCommonIndex(batch []indexEntry) error {
t.uploadCompactedDB = true
return t.compactedDB.Batch(func(tx *bbolt.Tx) error {
@@ -313,7 +348,7 @@ func (t *table) writeCommonIndex(batch []indexEntry) error {
})
}

- // writeUserIndex sends a batch to write to the user index set
+ // writeUserIndex sends a batch to write to the user index set which is for FORMAT3 index
func (t *table) writeUserIndex(userID string, batch []indexEntry) error {
ui, err := t.getOrCreateUserIndex(userID)
if err != nil {
@@ -364,11 +399,9 @@ func openBoltdbFileWithNoSync(path string) (*bbolt.DB, error) {
return boltdb, nil
}

- // findSeedFileIdx returns index of file to use as seed which would then get index from all the files written to.
- // It tries to find previously compacted file(which has uploaderName) which would be the biggest file.
- // In a large cluster, using previously compacted file as seed would significantly reduce compaction time.
+ // compactedFileIdx returns index of previously compacted file(which starts with uploaderName).
// If it can't find a previously compacted file, it would return -1.
- func findSeedFileIdx(files []storage.IndexFile) int {
+ func compactedFileIdx(files []storage.IndexFile) int {
for i, file := range files {
if strings.HasPrefix(file.Name, uploaderName) {
return i
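Reviewer note: the rename from `findSeedFileIdx` to `compactedFileIdx` matches what the function actually does: it only locates a previously compacted file; using it as a seed is now the caller's decision in `compactFiles`. A self-contained sketch of the same logic (file names here are made up, not Loki's real naming scheme, and it operates on bare strings instead of storage.IndexFile so it runs standalone):

```go
package main

import (
	"fmt"
	"strings"
)

const uploaderName = "compactor"

// compactedFileIdx mirrors the helper from the diff: it returns the index of
// a file whose name starts with uploaderName, or -1 if none exists.
func compactedFileIdx(names []string) int {
	for i, name := range names {
		if strings.HasPrefix(name, uploaderName) {
			return i
		}
	}
	return -1
}

func main() {
	names := []string{"1634500000-ingester-0.gz", "compactor-1634500000.gz"}
	// Prints 1: the previously compacted (typically largest) file becomes the
	// seed, so the smaller files are merged into it rather than the reverse.
	fmt.Println(compactedFileIdx(names))
}
```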
23 changes: 12 additions & 11 deletions pkg/storage/stores/shipper/compactor/table_test.go
@@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
@@ -269,10 +270,10 @@ func TestTable_Compaction(t *testing.T) {
}
}

- type TableMarkerFunc func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error)
+ type TableMarkerFunc func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error)

- func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
- return t(ctx, tableName, db)
+ func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
+ return t(ctx, tableName, userID, db, logger)
}

type IntervalMayHaveExpiredChunksFunc func(interval model.Interval, userID string) bool
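Reviewer note: TableMarkerFunc follows the standard http.HandlerFunc-style adapter pattern, so tests can stub retention behavior with a closure, as the test cases below do. The two booleans are (empty, modified), per the interface comment in retention.go. A standalone sketch of the pattern (types redeclared here only so the example compiles on its own; in the PR they live in the compactor package):

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"go.etcd.io/bbolt"
)

type TableMarker interface {
	MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error)
}

// TableMarkerFunc adapts a bare function to the TableMarker interface.
type TableMarkerFunc func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error)

func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
	return t(ctx, tableName, userID, db, logger)
}

func main() {
	// A stub reporting the table as non-empty but modified; it never touches the db.
	var tm TableMarker = TableMarkerFunc(func(_ context.Context, _, _ string, _ *bbolt.DB, _ log.Logger) (bool, bool, error) {
		return false, true, nil
	})
	empty, modified, err := tm.MarkForDelete(context.Background(), "index_18500", "user1", nil, log.NewNopLogger())
	fmt.Println(empty, modified, err) // false true <nil>
}
```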
@@ -309,7 +310,7 @@ func TestTable_CompactionRetention(t *testing.T) {
_, err := ioutil.ReadDir(filepath.Join(storagePath, tableName))
require.True(t, os.IsNotExist(err))
},
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
return true, true, nil
}),
},
@@ -321,7 +322,7 @@
})
compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
},
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
return false, true, nil
}),
},
@@ -333,7 +334,7 @@
})
compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
},
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
return false, false, nil
}),
},
@@ -554,7 +555,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) {
})
compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
},
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
return false, false, nil
}),
expectedIndexSetState: indexSetState{
@@ -571,7 +572,7 @@
})
compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
},
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
return false, false, nil
}),
compactedDBMtime: time.Now().Add(-recreateCompactedDBOlderThan / 2),
@@ -585,7 +586,7 @@
})
compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
},
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
return false, true, nil
}),
expectedIndexSetState: indexSetState{
@@ -599,7 +600,7 @@
_, err := ioutil.ReadDir(filepath.Join(storagePath, tableName))
require.True(t, os.IsNotExist(err))
},
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
return true, true, nil
}),
expectedIndexSetState: indexSetState{
@@ -616,7 +617,7 @@
})
compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
},
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
return false, false, nil
}),
compactedDBMtime: time.Now().Add(-(recreateCompactedDBOlderThan + time.Minute)),