diff --git a/pkg/storage/chunk/local/fixtures.go b/pkg/storage/chunk/local/fixtures.go
index 32839eb8b09b..6b9c24de7729 100644
--- a/pkg/storage/chunk/local/fixtures.go
+++ b/pkg/storage/chunk/local/fixtures.go
@@ -43,7 +43,7 @@ func (f *fixture) Clients() (
 		return
 	}
 
-	chunkClient = objectclient.NewClient(oClient, objectclient.Base64Encoder, chunk.SchemaConfig{})
+	chunkClient = objectclient.NewClient(oClient, objectclient.FSEncoder, chunk.SchemaConfig{})
 
 	tableClient, err = NewTableClient(f.dirname)
 	if err != nil {
diff --git a/pkg/storage/chunk/objectclient/client.go b/pkg/storage/chunk/objectclient/client.go
index 7aac07319c53..78d8b24a61c9 100644
--- a/pkg/storage/chunk/objectclient/client.go
+++ b/pkg/storage/chunk/objectclient/client.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/base64"
+	"strings"
 
 	"github.com/pkg/errors"
 
@@ -13,14 +14,30 @@ import (
 
 // KeyEncoder is used to encode chunk keys before writing/retrieving chunks
 // from the underlying ObjectClient
-type KeyEncoder func(string) string
+// Schema/Chunk are passed as arguments to allow the encoding to evolve across schema versions
+type KeyEncoder func(schema chunk.SchemaConfig, chk chunk.Chunk) string
 
-// Base64Encoder is used to encode chunk keys in base64 before storing/retrieving
+// base64Encoder is used to encode chunk keys in base64 before storing/retrieving
 // them from the ObjectClient
-var Base64Encoder = func(key string) string {
+var base64Encoder = func(key string) string {
 	return base64.StdEncoding.EncodeToString([]byte(key))
 }
 
+// FSEncoder is used to encode chunk keys before storing/retrieving chunks on the filesystem
+var FSEncoder = func(schema chunk.SchemaConfig, chk chunk.Chunk) string {
+	// Pre-v12, the filesystem encoder encodes the whole chunk key as one base64 string.
+	// This has the downside of making keys opaque and storing all chunks in a single
+	// directory, hurting performance at scale and discoverability.
+	// From v12 onwards, we respect the directory structure imposed by chunk keys.
+	key := schema.ExternalKey(chk)
+	if schema.VersionForChunk(chk) > 11 {
+		split := strings.LastIndexByte(key, '/')
+		encodedTail := base64Encoder(key[split+1:])
+		return strings.Join([]string{key[:split], encodedTail}, "/")
+	}
+	return base64Encoder(key)
+}
+
 const defaultMaxParallel = 150
 
 // Client is used to store chunks in object store backends
@@ -56,7 +73,6 @@ func (o *Client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
 	var (
 		chunkKeys []string
 		chunkBufs [][]byte
-		key       string
 	)
 
 	for i := range chunks {
@@ -65,10 +81,11 @@ func (o *Client) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
 			return err
 		}
 
-		key = o.schema.ExternalKey(chunks[i])
-
+		var key string
 		if o.keyEncoder != nil {
-			key = o.keyEncoder(key)
+			key = o.keyEncoder(o.schema, chunks[i])
+		} else {
+			key = o.schema.ExternalKey(chunks[i])
 		}
 
 		chunkKeys = append(chunkKeys, key)
@@ -109,7 +126,7 @@ func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContex
 	key := o.schema.ExternalKey(c)
 
 	if o.keyEncoder != nil {
-		key = o.keyEncoder(key)
+		key = o.keyEncoder(o.schema, c)
 	}
 
 	readCloser, size, err := o.store.GetObject(ctx, key)
@@ -137,7 +154,11 @@ func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContex
 func (o *Client) DeleteChunk(ctx context.Context, userID, chunkID string) error {
 	key := chunkID
 	if o.keyEncoder != nil {
-		key = o.keyEncoder(key)
+		c, err := chunk.ParseExternalKey(userID, key)
+		if err != nil {
+			return err
+		}
+		key = o.keyEncoder(o.schema, c)
 	}
 	return o.store.DeleteObject(ctx, key)
 }
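For illustration, a minimal standalone sketch of the key shapes FSEncoder produces. The example key and the version split are assumptions based on the v12 external-key layout `<user>/<fingerprint>/<from>:<through>:<checksum>`; it deliberately avoids the real chunk.SchemaConfig/chunk.Chunk types:

    package main

    import (
    	"encoding/base64"
    	"fmt"
    	"strings"
    )

    func main() {
    	// Hypothetical v12 external key: <user>/<fingerprint>/<from>:<through>:<checksum>
    	key := "fake/1c8/17e1815f800:17e1d3c5400:7b"

    	// v12+ behaviour: keep the directory prefix, base64-encode only the
    	// final path segment, preserving the on-disk directory structure.
    	split := strings.LastIndexByte(key, '/')
    	tail := base64.StdEncoding.EncodeToString([]byte(key[split+1:]))
    	fmt.Println(key[:split] + "/" + tail)
    	// Output: fake/1c8/MTdlMTgxNWY4MDA6MTdlMWQzYzU0MDA6N2I=

    	// Pre-v12 behaviour: the whole key becomes one opaque base64 blob,
    	// so every chunk lands in a single flat directory.
    	fmt.Println(base64.StdEncoding.EncodeToString([]byte(key)))
    }

The v12+ output matches the expected value asserted in the test below.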
diff --git a/pkg/storage/chunk/objectclient/client_test.go b/pkg/storage/chunk/objectclient/client_test.go
new file mode 100644
index 000000000000..2fa659a2806a
--- /dev/null
+++ b/pkg/storage/chunk/objectclient/client_test.go
@@ -0,0 +1,79 @@
+package objectclient
+
+import (
+	"testing"
+	"time"
+
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/pkg/storage/chunk"
+)
+
+func MustParseDayTime(s string) chunk.DayTime {
+	t, err := time.Parse("2006-01-02", s)
+	if err != nil {
+		panic(err)
+	}
+	return chunk.DayTime{
+		Time: model.TimeFromUnix(t.Unix()),
+	}
+}
+
+func TestFSEncoder(t *testing.T) {
+	schema := chunk.SchemaConfig{
+		Configs: []chunk.PeriodConfig{
+			{
+				From:   MustParseDayTime("2020-01-01"),
+				Schema: "v11",
+			},
+			{
+				From:   MustParseDayTime("2022-01-01"),
+				Schema: "v12",
+			},
+		},
+	}
+
+	// chunk that resolves to v11
+	oldChunk := chunk.Chunk{
+		UserID:      "fake",
+		From:        MustParseDayTime("2020-01-02").Time,
+		Through:     MustParseDayTime("2020-01-03").Time,
+		Checksum:    123,
+		Fingerprint: 456,
+		ChecksumSet: true,
+	}
+
+	// chunk that resolves to v12
+	newChunk := chunk.Chunk{
+		UserID:      "fake",
+		From:        MustParseDayTime("2022-01-02").Time,
+		Through:     MustParseDayTime("2022-01-03").Time,
+		Checksum:    123,
+		Fingerprint: 456,
+		ChecksumSet: true,
+	}
+
+	for _, tc := range []struct {
+		desc string
+		from string
+		exp  string
+	}{
+		{
+			desc: "before v12 encodes entire chunk",
+			from: schema.ExternalKey(oldChunk),
+			exp:  "ZmFrZS8xYzg6MTZmNjM4ZDQ0MDA6MTZmNjhiM2EwMDA6N2I=",
+		},
+		{
+			desc: "v12+ encodes only the non-directory tail",
+			from: schema.ExternalKey(newChunk),
+			exp:  "fake/1c8/MTdlMTgxNWY4MDA6MTdlMWQzYzU0MDA6N2I=",
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			chk, err := chunk.ParseExternalKey("fake", tc.from)
+			require.Nil(t, err)
+			require.Equal(t, tc.exp, FSEncoder(schema, chk))
+		})
+	}
+}
diff --git a/pkg/storage/chunk/schema_config.go b/pkg/storage/chunk/schema_config.go
index fb23243d7cb6..cd2d67636e0d 100644
--- a/pkg/storage/chunk/schema_config.go
+++ b/pkg/storage/chunk/schema_config.go
@@ -504,6 +504,14 @@ func (cfg SchemaConfig) ExternalKey(chunk Chunk) string {
 	}
 }
 
+// VersionForChunk will return the schema version associated with the `From` timestamp of a chunk.
+// The schema and chunk must be valid+compatible as the errors are not checked.
+func (cfg SchemaConfig) VersionForChunk(c Chunk) int {
+	p, _ := cfg.SchemaForTime(c.From)
+	v, _ := p.VersionAsInt()
+	return v
+}
+
 // pre-checksum
 func (cfg SchemaConfig) legacyExternalKey(chunk Chunk) string {
 	// This is the inverse of chunk.parseLegacyExternalKey, with "/" prepended.
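VersionForChunk delegates to the existing SchemaForTime/VersionAsInt helpers: conceptually it resolves the last period whose From is at or before the chunk's start time, then parses that period's "vNN" schema string into an int. A rough standalone sketch of that resolution, using simplified stand-in types rather than the real chunk.PeriodConfig:

    package main

    import (
    	"fmt"
    	"strconv"
    	"strings"
    )

    // period is a simplified stand-in for chunk.PeriodConfig.
    type period struct {
    	from   int64  // period start in Unix milliseconds (chunk.DayTime in the real code)
    	schema string // e.g. "v11", "v12"
    }

    // versionForTime mirrors what VersionForChunk does with c.From: resolve the
    // last period whose start is at or before t, then parse its "vNN" version.
    func versionForTime(periods []period, t int64) int {
    	v := 0
    	for _, p := range periods {
    		if p.from <= t {
    			v, _ = strconv.Atoi(strings.TrimPrefix(p.schema, "v"))
    		}
    	}
    	return v
    }

    func main() {
    	periods := []period{
    		{from: 1577836800000, schema: "v11"}, // 2020-01-01
    		{from: 1640995200000, schema: "v12"}, // 2022-01-01
    	}
    	fmt.Println(versionForTime(periods, 1577923200000)) // 2020-01-02 chunk -> 11
    	fmt.Println(versionForTime(periods, 1641081600000)) // 2022-01-02 chunk -> 12
    }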
diff --git a/pkg/storage/chunk/storage/factory.go b/pkg/storage/chunk/storage/factory.go
index a960d4d413e0..fa977f6231ed 100644
--- a/pkg/storage/chunk/storage/factory.go
+++ b/pkg/storage/chunk/storage/factory.go
@@ -330,7 +330,7 @@ func NewChunkClient(name string, cfg Config, schemaCfg chunk.SchemaConfig, clien
 		if err != nil {
 			return nil, err
 		}
-		return objectclient.NewClientWithMaxParallel(store, objectclient.Base64Encoder, cfg.MaxParallelGetChunk, schemaCfg), nil
+		return objectclient.NewClientWithMaxParallel(store, objectclient.FSEncoder, cfg.MaxParallelGetChunk, schemaCfg), nil
 	case StorageTypeGrpc:
 		return grpc.NewStorageClient(cfg.GrpcConfig, schemaCfg)
 	default:
diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go
index 5f9abefced04..43d01f264bcf 100644
--- a/pkg/storage/stores/shipper/compactor/compactor.go
+++ b/pkg/storage/stores/shipper/compactor/compactor.go
@@ -191,7 +191,7 @@ func (c *Compactor) init(storageConfig storage.Config, schemaConfig loki_storage
 	if c.cfg.RetentionEnabled {
 		var encoder objectclient.KeyEncoder
 		if _, ok := objectClient.(*local.FSObjectClient); ok {
-			encoder = objectclient.Base64Encoder
+			encoder = objectclient.FSEncoder
 		}
 
 		chunkClient := objectclient.NewClient(objectClient, encoder, schemaConfig.SchemaConfig)
diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go
index 10dffb698b5d..dc62d298f5dd 100644
--- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go
+++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go
@@ -389,7 +389,7 @@ func TestChunkRewriter(t *testing.T) {
 			require.NoError(t, store.Put(context.TODO(), []chunk.Chunk{tt.chunk}))
 			store.Stop()
 
-			chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir, cm), objectclient.Base64Encoder, schemaCfg.SchemaConfig)
+			chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir, cm), objectclient.FSEncoder, schemaCfg.SchemaConfig)
 			for _, indexTable := range store.indexTables() {
 				err := indexTable.DB.Update(func(tx *bbolt.Tx) error {
 					bucket := tx.Bucket(local.IndexBucketName)
@@ -668,7 +668,7 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
 			tables := store.indexTables()
 			require.Len(t, tables, len(tc.expectedDeletedSeries))
 
-			chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir, cm), objectclient.Base64Encoder, schemaCfg.SchemaConfig)
+			chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir, cm), objectclient.FSEncoder, schemaCfg.SchemaConfig)
 			for i, table := range tables {
 				seriesCleanRecorder := newSeriesCleanRecorder()
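Taken together, the wiring above means callers opt into FSEncoder only for filesystem-backed stores; other backends pass a nil encoder and the client falls back to plain external keys. A rough sketch of that pattern as a helper — the function name newFSAwareChunkClient is hypothetical, and it assumes this revision's NewClient signature and that *objectclient.Client satisfies chunk.Client, as the factory and compactor code above use it:

    package example

    import (
    	"github.com/grafana/loki/pkg/storage/chunk"
    	"github.com/grafana/loki/pkg/storage/chunk/local"
    	"github.com/grafana/loki/pkg/storage/chunk/objectclient"
    )

    // newFSAwareChunkClient mirrors the compactor wiring: only the local
    // filesystem store gets the version-aware FSEncoder; any other backend
    // keeps a nil encoder, so keys remain plain external keys.
    func newFSAwareChunkClient(oc chunk.ObjectClient, schemaCfg chunk.SchemaConfig) chunk.Client {
    	var encoder objectclient.KeyEncoder
    	if _, ok := oc.(*local.FSObjectClient); ok {
    		encoder = objectclient.FSEncoder
    	}
    	return objectclient.NewClient(oc, encoder, schemaCfg)
    }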