diff --git a/docs/README.md b/docs/README.md
index 790e2b3ad5c8..6f87f3b9fadc 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -45,6 +45,7 @@ simplifies the operation and significantly lowers the cost of Loki.
 4. [Storage](operations/storage/README.md)
     1. [Table Manager](operations/storage/table-manager.md)
     2. [Retention](operations/storage/retention.md)
+    3. [BoltDB Shipper](operations/storage/boltdb-shipper.md)
 5. [Multi-tenancy](operations/multi-tenancy.md)
 6. [Loki Canary](operations/loki-canary.md)
 8. [HTTP API](api.md)
diff --git a/docs/operations/storage/boltdb-shipper.md b/docs/operations/storage/boltdb-shipper.md
new file mode 100644
index 000000000000..f6e70d7d217a
--- /dev/null
+++ b/docs/operations/storage/boltdb-shipper.md
@@ -0,0 +1,86 @@
+# Loki with BoltDB Shipper
+
+:warning: BoltDB Shipper is still an experimental feature. It is not recommended for use in production environments.
+
+BoltDB Shipper lets you run Loki without any dependency on a NoSQL store for the index.
+Instead, it stores the index locally in BoltDB files and keeps shipping those files to a shared object store, i.e. the same object store that is used for storing chunks.
+It also keeps syncing BoltDB files from the shared object store to a configured local directory, so it can see index entries created by other services of the same Loki cluster.
+This lets Loki run with one less dependency and also saves storage costs, since object stores are likely to be much cheaper than a hosted NoSQL store or a self-hosted Cassandra cluster.
+
+## Example Configuration
+
+Example configuration with GCS:
+
+```yaml
+schema_config:
+  configs:
+    - from: 2018-04-15
+      store: boltdb-shipper
+      object_store: gcs
+      schema: v11
+      index:
+        prefix: loki_index_
+        period: 168h
+
+storage_config:
+  gcs:
+    bucket_name: GCS_BUCKET_NAME
+
+  boltdb_shipper_config:
+    active_index_directory: /loki/index
+    cache_location: /loki/boltdb-cache
+```
+
+This runs Loki with BoltDB Shipper, storing BoltDB files locally at `/loki/index` and chunks in the configured `GCS_BUCKET_NAME`.
+It also keeps shipping BoltDB files periodically to the same configured bucket, and keeps downloading BoltDB files uploaded by other ingesters from the shared bucket to the local `/loki/boltdb-cache` folder.
+
+## Operational Details
+
+Loki can be configured to run as a single vertically scaled instance, as a cluster of horizontally scaled single-binary instances (each running all Loki services), or in microservices mode with each instance running just one of the services.
+For reads and writes, Ingesters are the services that write the index and chunks to stores, and Queriers are the services that read the index and chunks from the store to serve requests.
+
+Before we get into more details, it is important to understand how Loki manages the index in stores. Loki shards the index by a configured period, which defaults to 7 days, i.e. for table-based stores like Bigtable/Cassandra/DynamoDB there is a separate table per week containing the index for that week.
+BoltDB has no concept of tables, so BoltDB Shipper creates one BoltDB file per week instead. Files/tables created per period are identified by the configured prefix + `<period-number>`.
+With the default config, `<period-number>` is the number of weeks since the Unix epoch.
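+To make the file naming concrete, here is a minimal illustrative sketch (not Loki's actual implementation) of deriving the period number and the resulting file/table name from a timestamp, assuming the default 168h period and the `loki_index_` prefix from the example above:
+
+```go
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+func main() {
+	const period = 168 * time.Hour // weekly index period from the schema config above
+	prefix := "loki_index_"
+
+	ts := time.Date(2020, time.April, 20, 0, 0, 0, 0, time.UTC)
+	// period number = complete periods (weeks) elapsed since the Unix epoch
+	periodNumber := ts.Unix() / int64(period/time.Second)
+
+	fmt.Printf("%s%d\n", prefix, periodNumber) // prints: loki_index_2624
+}
+```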
+For example, if the prefix is set to `loki_index_` and a write request comes in on 20th April 2020, it is stored in a table/file named `loki_index_2624`, because the period number for that date is 2624 (2624 complete weeks have elapsed since the Unix epoch by then).
+Since sharding the index creates multiple files when using BoltDB, BoltDB Shipper creates a folder per week, adds the files for that week to that folder, and names each file after the ingester which created it.
+
+To show how BoltDB files look in the shared object store, consider 2 ingesters named `ingester-0` and `ingester-1` running in a Loki cluster,
+both of which have shipped files for weeks `2623` and `2624` with prefix `loki_index_`:
+
+```
+└── index
+    ├── loki_index_2623
+    │   ├── ingester-0
+    │   └── ingester-1
+    └── loki_index_2624
+        ├── ingester-0
+        └── ingester-1
+```
+*NOTE: We also add a timestamp to the file names to randomize them, so that files are not overwritten when Ingesters run with the same name and without persistent storage. Timestamps are not shown here for simplicity.*
+
+Let us look in more depth at how Ingesters and Queriers work when running with BoltDB Shipper.
+
+### Ingesters
+
+Ingesters keep writing the index to BoltDB files in `active_index_directory`, and BoltDB Shipper looks for new and updated files in that directory every 15 minutes and uploads them to the shared object store.
+When running Loki in clustered mode there can be multiple ingesters serving write requests, each generating BoltDB files locally.
+
+*NOTE: To avoid losing index data when an Ingester crashes, it is recommended to run Ingesters as a StatefulSet (when using Kubernetes) with persistent storage for the index files.*
+
+Another important detail to note: when chunks are flushed they are instantly available for reads in the object store, while the index is not, since BoltDB Shipper only uploads index files every 15 minutes.
+To avoid missing logs in queries which happen to be indexed in BoltDB files that are not shipped yet, while serving queries for in-memory logs, Ingesters also do a store query for the range `now()` - (`max_chunk_age` + `30 min`) to the end time of the query request.
+
+### Queriers
+
+Queriers lazily load BoltDB files from the shared object store to the configured `cache_location`.
+When a querier receives a read request, the query range from the request is resolved to period numbers, and all the files for those period numbers are downloaded to `cache_location` if they are not there already.
+Once files for a period have been downloaded, the querier keeps looking for updates in the shared object store and downloads them every 5 minutes by default.
+The frequency for checking updates can be configured with the `resync_interval` config.
+
+To avoid keeping downloaded index files forever, they have a TTL which defaults to 24 hours: if the index files for a period are not used for 24 hours, they are removed from the cache location.
+The TTL can be configured using the `cache_ttl` config.
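+As a rough sketch of how these two knobs could be set, here is an illustrative `storage_config` snippet; the key names follow the `boltdb_shipper_config` options introduced in this change, and the values are arbitrary examples rather than recommendations. The same options can also be set with the `-boltdb.shipper.resync-interval` and `-boltdb.shipper.cache-ttl` flags:
+
+```yaml
+storage_config:
+  boltdb_shipper_config:
+    active_index_directory: /loki/index
+    cache_location: /loki/boltdb-cache
+    resync_interval: 5m
+    cache_ttl: 24h
+```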
+ + + diff --git a/go.mod b/go.mod index 5c925cf393e8..f03c7d95fc44 100644 --- a/go.mod +++ b/go.mod @@ -51,6 +51,7 @@ require ( github.com/uber/jaeger-client-go v2.20.1+incompatible github.com/ugorji/go v1.1.7 // indirect github.com/weaveworks/common v0.0.0-20200310113808-2708ba4e60a4 + go.etcd.io/bbolt v1.3.3 go.etcd.io/etcd v0.0.0-20200401174654-e694b7bb0875 // indirect golang.org/x/net v0.0.0-20200226121028-0de0cce0169b google.golang.org/grpc v1.25.1 diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index d1a99bdff2fe..82e236e61c76 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/grafana/loki/pkg/logql" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" @@ -223,7 +225,7 @@ func (s *testStore) IsLocal() bool { return false } -func (s *testStore) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) { +func (s *testStore) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) { return nil, nil } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7a5b8c1bf4ee..cc15eadf2a3c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -22,8 +22,12 @@ import ( "github.com/cortexproject/cortex/pkg/util/services" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/stats" "github.com/grafana/loki/pkg/util/validation" ) @@ -63,6 +67,9 @@ type Config struct { // For testing, you can override the address and ID of this ingester. ingesterClientFactory func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error) + + QueryStore bool `yaml:"-"` + QueryStoreMaxLookBackPeriod time.Duration `yaml:"-"` } // RegisterFlags registers the flags. @@ -113,6 +120,7 @@ type Ingester struct { // ChunkStore is the interface we need to store chunks. type ChunkStore interface { Put(ctx context.Context, chunks []chunk.Chunk) error + LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) } // New makes a new Ingester. @@ -241,13 +249,35 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance { // Query the ingests for log streams matching a set of matchers. func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error { - instanceID, err := user.ExtractOrgID(queryServer.Context()) + // initialize stats collection for ingester queries and set grpc trailer with stats. 
+ ctx := stats.NewContext(queryServer.Context()) + defer stats.SendAsTrailer(ctx, queryServer) + + instanceID, err := user.ExtractOrgID(ctx) if err != nil { return err } instance := i.getOrCreateInstance(instanceID) - return instance.Query(req, queryServer) + itrs, err := instance.Query(ctx, req) + if err != nil { + return err + } + + if storeReq := buildStoreRequest(i.cfg, req); storeReq != nil { + storeItr, err := i.store.LazyQuery(ctx, logql.SelectParams{QueryRequest: storeReq}) + if err != nil { + return err + } + + itrs = append(itrs, storeItr) + } + + heapItr := iter.NewHeapIterator(ctx, itrs, req.Direction) + + defer helpers.LogError("closing iterator", heapItr.Close) + + return sendBatches(queryServer.Context(), heapItr, queryServer, req.Limit) } // Label returns the set of labels for the stream this ingester knows about. @@ -356,3 +386,30 @@ func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRe return &resp, nil } + +// buildStoreRequest returns a store request from an ingester request, returns nit if QueryStore is set to false in configuration. +// The request may be truncated due to QueryStoreMaxLookBackPeriod which limits the range of request to make sure +// we only query enough to not miss any data and not add too to many duplicates by covering the who time range in query. +func buildStoreRequest(cfg Config, req *logproto.QueryRequest) *logproto.QueryRequest { + if !cfg.QueryStore { + return nil + } + start := req.Start + end := req.End + if cfg.QueryStoreMaxLookBackPeriod != 0 { + oldestStartTime := time.Now().Add(-cfg.QueryStoreMaxLookBackPeriod) + if oldestStartTime.After(req.Start) { + start = oldestStartTime + } + } + + if start.After(end) { + return nil + } + + newRequest := *req + newRequest.Start = start + newRequest.End = end + + return &newRequest +} diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index acd00e7dff20..4bf9f5b0062b 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -7,6 +7,9 @@ import ( "testing" "time" + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logql" + "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" @@ -247,6 +250,10 @@ func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error { return nil } +func (s *mockStore) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) { + return nil, nil +} + type mockQuerierServer struct { ctx context.Context resps []*logproto.QueryResponse @@ -269,3 +276,77 @@ func defaultLimitsTestConfig() validation.Limits { flagext.DefaultValues(&limits) return limits } + +func TestIngester_buildStoreRequest(t *testing.T) { + ingesterQueryRequest := logproto.QueryRequest{ + Selector: `{foo="bar"}`, + Limit: 100, + } + + now := time.Now() + + for _, tc := range []struct { + name string + queryStore bool + maxLookBackPeriod time.Duration + ingesterQueryRequest *logproto.QueryRequest + expectedStoreQueryRequest *logproto.QueryRequest + }{ + { + name: "do not query store", + queryStore: false, + ingesterQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-time.Minute), now), + expectedStoreQueryRequest: nil, + }, + { + name: "query store with max look back covering whole request duration", + queryStore: true, + maxLookBackPeriod: time.Hour, + ingesterQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-10*time.Minute), now), + expectedStoreQueryRequest: 
recreateRequestWithTime(ingesterQueryRequest, now.Add(-10*time.Minute), now), + }, + { + name: "query store with max look back covering partial request duration", + queryStore: true, + maxLookBackPeriod: time.Hour, + ingesterQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-2*time.Hour), now), + expectedStoreQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-time.Hour), now), + }, + { + name: "query store with max look back not covering request duration at all", + queryStore: true, + maxLookBackPeriod: time.Hour, + ingesterQueryRequest: recreateRequestWithTime(ingesterQueryRequest, now.Add(-4*time.Hour), now.Add(-2*time.Hour)), + expectedStoreQueryRequest: nil, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ingesterConfig := defaultIngesterTestConfig(t) + ingesterConfig.QueryStore = tc.queryStore + ingesterConfig.QueryStoreMaxLookBackPeriod = tc.maxLookBackPeriod + storeRequest := buildStoreRequest(ingesterConfig, tc.ingesterQueryRequest) + if tc.expectedStoreQueryRequest == nil { + require.Nil(t, storeRequest) + return + } + + // because start time of store could be changed and built based on time when function is called we can't predict expected start time. + // So allowing upto 1s difference between expected and actual start time of store query request. + require.Equal(t, tc.expectedStoreQueryRequest.Selector, storeRequest.Selector) + require.Equal(t, tc.expectedStoreQueryRequest.Limit, storeRequest.Limit) + require.Equal(t, tc.expectedStoreQueryRequest.End, storeRequest.End) + + if storeRequest.Start.Sub(tc.expectedStoreQueryRequest.Start) > time.Second { + t.Fatalf("expected upto 1s difference in expected and acutal store request end time but got %d", storeRequest.End.Sub(tc.expectedStoreQueryRequest.End)) + } + }) + } +} + +func recreateRequestWithTime(req logproto.QueryRequest, start, end time.Time) *logproto.QueryRequest { + newReq := req + newReq.Start = start + newReq.End = end + + return &newReq +} diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 5724effb5756..7d05eb97608c 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -186,18 +186,14 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels return s.labels } -func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error { - // initialize stats collection for ingester queries and set grpc trailer with stats. 
- ctx := stats.NewContext(queryServer.Context()) - defer stats.SendAsTrailer(ctx, queryServer) - +func (i *instance) Query(ctx context.Context, req *logproto.QueryRequest) ([]iter.EntryIterator, error) { expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector() if err != nil { - return err + return nil, err } filter, err := expr.Filter() if err != nil { - return err + return nil, err } ingStats := stats.GetIngesterData(ctx) @@ -215,13 +211,10 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie }, ) if err != nil { - return err + return nil, err } - iter := iter.NewHeapIterator(ctx, iters, req.Direction) - defer helpers.LogError("closing iterator", iter.Close) - - return sendBatches(ctx, iter, queryServer, req.Limit) + return iters, nil } func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) { diff --git a/pkg/logql/stats/context.go b/pkg/logql/stats/context.go index 75dacb9e5912..6b5a6800b3e6 100644 --- a/pkg/logql/stats/context.go +++ b/pkg/logql/stats/context.go @@ -138,9 +138,8 @@ func GetStoreData(ctx context.Context) *StoreData { // Snapshot compute query statistics from a context using the total exec time. func Snapshot(ctx context.Context, execTime time.Duration) Result { - var res Result // ingester data is decoded from grpc trailers. - res.Ingester = decodeTrailers(ctx) + res := decodeTrailers(ctx) // collect data from store. s, ok := ctx.Value(storeKey).(*StoreData) if ok { diff --git a/pkg/logql/stats/grpc.go b/pkg/logql/stats/grpc.go index 8dad127c6b60..e1640fda5edc 100644 --- a/pkg/logql/stats/grpc.go +++ b/pkg/logql/stats/grpc.go @@ -14,6 +14,7 @@ import ( const ( ingesterDataKey = "ingester_data" chunkDataKey = "chunk_data" + storeDataKey = "store_data" ) type trailerCollector struct { @@ -70,32 +71,45 @@ func encodeTrailer(ctx context.Context) (metadata.MD, error) { } meta.Set(chunkDataKey, data) } + + storeData, ok := ctx.Value(storeKey).(*StoreData) + if ok { + data, err := jsoniter.MarshalToString(storeData) + if err != nil { + return meta, err + } + meta.Set(storeDataKey, data) + } + return meta, nil } -func decodeTrailers(ctx context.Context) Ingester { - var res Ingester +func decodeTrailers(ctx context.Context) Result { + var res Result collector, ok := ctx.Value(trailersKey).(*trailerCollector) if !ok { return res } - res.TotalReached = int32(len(collector.trailers)) + res.Ingester.TotalReached = int32(len(collector.trailers)) for _, meta := range collector.trailers { ing := decodeTrailer(meta) - res.TotalChunksMatched += ing.TotalChunksMatched - res.TotalBatches += ing.TotalBatches - res.TotalLinesSent += ing.TotalLinesSent - res.HeadChunkBytes += ing.HeadChunkBytes - res.HeadChunkLines += ing.HeadChunkLines - res.DecompressedBytes += ing.DecompressedBytes - res.DecompressedLines += ing.DecompressedLines - res.CompressedBytes += ing.CompressedBytes - res.TotalDuplicates += ing.TotalDuplicates + res.Ingester.TotalChunksMatched += ing.Ingester.TotalChunksMatched + res.Ingester.TotalBatches += ing.Ingester.TotalBatches + res.Ingester.TotalLinesSent += ing.Ingester.TotalLinesSent + res.Ingester.HeadChunkBytes += ing.Ingester.HeadChunkBytes + res.Ingester.HeadChunkLines += ing.Ingester.HeadChunkLines + res.Ingester.DecompressedBytes += ing.Ingester.DecompressedBytes + res.Ingester.DecompressedLines += ing.Ingester.DecompressedLines + res.Ingester.CompressedBytes += ing.Ingester.CompressedBytes + res.Ingester.TotalDuplicates += ing.Ingester.TotalDuplicates + 
res.Store.TotalChunksRef += ing.Store.TotalChunksRef + res.Store.TotalChunksDownloaded += ing.Store.TotalChunksDownloaded + res.Store.ChunksDownloadTime += ing.Store.ChunksDownloadTime } return res } -func decodeTrailer(meta *metadata.MD) Ingester { +func decodeTrailer(meta *metadata.MD) Result { var ingData IngesterData values := meta.Get(ingesterDataKey) if len(values) == 1 { @@ -110,15 +124,29 @@ func decodeTrailer(meta *metadata.MD) Ingester { level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err) } } - return Ingester{ - TotalChunksMatched: ingData.TotalChunksMatched, - TotalBatches: ingData.TotalBatches, - TotalLinesSent: ingData.TotalLinesSent, - HeadChunkBytes: chunkData.HeadChunkBytes, - HeadChunkLines: chunkData.HeadChunkLines, - DecompressedBytes: chunkData.DecompressedBytes, - DecompressedLines: chunkData.DecompressedLines, - CompressedBytes: chunkData.CompressedBytes, - TotalDuplicates: chunkData.TotalDuplicates, + var storeData StoreData + values = meta.Get(storeDataKey) + if len(values) == 1 { + if err := jsoniter.UnmarshalFromString(values[0], &storeData); err != nil { + level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err) + } + } + return Result{ + Ingester: Ingester{ + TotalChunksMatched: ingData.TotalChunksMatched, + TotalBatches: ingData.TotalBatches, + TotalLinesSent: ingData.TotalLinesSent, + HeadChunkBytes: chunkData.HeadChunkBytes, + HeadChunkLines: chunkData.HeadChunkLines, + DecompressedBytes: chunkData.DecompressedBytes, + DecompressedLines: chunkData.DecompressedLines, + CompressedBytes: chunkData.CompressedBytes, + TotalDuplicates: chunkData.TotalDuplicates, + }, + Store: Store{ + TotalChunksRef: storeData.TotalChunksRef, + TotalChunksDownloaded: storeData.TotalChunksDownloaded, + ChunksDownloadTime: storeData.ChunksDownloadTime.Seconds(), + }, } } diff --git a/pkg/logql/stats/grpc_test.go b/pkg/logql/stats/grpc_test.go index 92f68a31cb40..d44538f81cae 100644 --- a/pkg/logql/stats/grpc_test.go +++ b/pkg/logql/stats/grpc_test.go @@ -82,16 +82,16 @@ func TestCollectTrailer(t *testing.T) { t.Fatal(err) } res := decodeTrailers(ctx) - require.Equal(t, int32(2), res.TotalReached) - require.Equal(t, int64(2), res.TotalChunksMatched) - require.Equal(t, int64(4), res.TotalBatches) - require.Equal(t, int64(6), res.TotalLinesSent) - require.Equal(t, int64(2), res.HeadChunkBytes) - require.Equal(t, int64(2), res.HeadChunkLines) - require.Equal(t, int64(2), res.DecompressedBytes) - require.Equal(t, int64(2), res.DecompressedLines) - require.Equal(t, int64(2), res.CompressedBytes) - require.Equal(t, int64(2), res.TotalDuplicates) + require.Equal(t, int32(2), res.Ingester.TotalReached) + require.Equal(t, int64(2), res.Ingester.TotalChunksMatched) + require.Equal(t, int64(4), res.Ingester.TotalBatches) + require.Equal(t, int64(6), res.Ingester.TotalLinesSent) + require.Equal(t, int64(2), res.Ingester.HeadChunkBytes) + require.Equal(t, int64(2), res.Ingester.HeadChunkLines) + require.Equal(t, int64(2), res.Ingester.DecompressedBytes) + require.Equal(t, int64(2), res.Ingester.DecompressedLines) + require.Equal(t, int64(2), res.Ingester.CompressedBytes) + require.Equal(t, int64(2), res.Ingester.TotalDuplicates) } type ingesterFn func(*logproto.QueryRequest, logproto.Querier_QueryServer) error diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index de6717ae6642..3c98f2d71133 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "os" + "sort" "strings" "time" @@ -20,6 +21,7 @@ 
import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" @@ -31,6 +33,7 @@ import ( "github.com/grafana/loki/pkg/querier" "github.com/grafana/loki/pkg/querier/queryrange" loki_storage "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/stores/local" "github.com/grafana/loki/pkg/util/validation" ) @@ -197,6 +200,14 @@ func (t *Loki) initIngester() (err error) { t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV t.cfg.Ingester.LifecyclerConfig.ListenPort = &t.cfg.Server.GRPCListenPort + + // We want ingester to also query the store when using boltdb-shipper + if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType { + t.cfg.Ingester.QueryStore = true + // When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet + t.cfg.Ingester.QueryStoreMaxLookBackPeriod = t.cfg.Ingester.MaxChunkAge + local.ShipperFileUploadInterval + (15 * time.Minute) + } + t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides) if err != nil { return @@ -243,7 +254,7 @@ func (t *Loki) initTableManager() error { os.Exit(1) } - tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig.Config) + tableClient, err := loki_storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig) if err != nil { return err } @@ -277,6 +288,20 @@ func (t *Loki) stopTableManager() error { } func (t *Loki) initStore() (err error) { + if activeIndexType(t.cfg.SchemaConfig) == local.BoltDBShipperType { + t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID + switch t.cfg.Target { + case Ingester: + // We do not want ingester to unnecessarily keep downloading files + t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeWriteOnly + case Querier: + // We do not want query to do any updates to index + t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadOnly + default: + t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadWrite + } + } + t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides) return } @@ -473,3 +498,16 @@ var modules = map[moduleName]module{ deps: []moduleName{Querier, Ingester, Distributor, TableManager}, }, } + +// activeIndexType type returns index type which would be applicable to logs that would be pushed starting now +// Note: Another periodic config can be applicable in future which can change index type +func activeIndexType(cfg chunk.SchemaConfig) string { + now := model.Now() + i := sort.Search(len(cfg.Configs), func(i int) bool { + return cfg.Configs[i].From.Time > now + }) + if i > 0 { + i-- + } + return cfg.Configs[i].IndexType +} diff --git a/pkg/loki/modules_test.go b/pkg/loki/modules_test.go index 99832efd86c1..c30624738820 100644 --- a/pkg/loki/modules_test.go +++ b/pkg/loki/modules_test.go @@ -2,7 +2,10 @@ package loki import ( "testing" + "time" + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" ) @@ -35,3 +38,30 @@ func 
TestUniqueDeps(t *testing.T) { expected := []moduleName{Server, Overrides, Distributor, Ingester} assert.Equal(t, expected, uniqueDeps(input)) } + +func TestActiveIndexType(t *testing.T) { + var cfg chunk.SchemaConfig + + // just one PeriodConfig in the past + cfg.Configs = []chunk.PeriodConfig{{ + From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)}, + IndexType: "first", + }} + + assert.Equal(t, "first", activeIndexType(cfg)) + + // add a newer PeriodConfig in the past which should be considered + cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{ + From: chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)}, + IndexType: "second", + }) + assert.Equal(t, "second", activeIndexType(cfg)) + + // add a newer PeriodConfig in the future which should not be considered + cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{ + From: chunk.DayTime{Time: model.Now().Add(time.Hour)}, + IndexType: "third", + }) + assert.Equal(t, "second", activeIndexType(cfg)) + +} diff --git a/pkg/storage/store.go b/pkg/storage/store.go index b03992c45b1f..5402f0144965 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -5,30 +5,33 @@ import ( "flag" "sort" + "github.com/cortexproject/cortex/pkg/chunk" + cortex_local "github.com/cortexproject/cortex/pkg/chunk/local" + "github.com/cortexproject/cortex/pkg/chunk/storage" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/chunk/storage" - "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/stats" + "github.com/grafana/loki/pkg/storage/stores/local" "github.com/grafana/loki/pkg/util" ) // Config is the loki storage configuration type Config struct { - storage.Config `yaml:",inline"` - MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` + storage.Config `yaml:",inline"` + MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` + BoltDBShipperConfig local.ShipperConfig `yaml:"boltdb_shipper_config"` } // RegisterFlags adds the flags required to configure this flag set. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.Config.RegisterFlags(f) + cfg.BoltDBShipperConfig.RegisterFlags(f) f.IntVar(&cfg.MaxChunkBatchSize, "max-chunk-batch-size", 50, "The maximum number of chunks to fetch per batch.") } @@ -46,6 +49,8 @@ type store struct { // NewStore creates a new Loki Store using configuration supplied. func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits) (Store, error) { + registerCustomIndexClients(cfg, schemaCfg) + s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits) if err != nil { return nil, err @@ -56,6 +61,16 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf }, nil } +// NewTableClient creates a TableClient for managing tables for index/chunk store. +// ToDo: Add support in Cortex for registering custom table client like index client. 
+func NewTableClient(name string, cfg Config) (chunk.TableClient, error) { + if name == local.BoltDBShipperType { + name = "boltdb" + cfg.FSConfig = cortex_local.FSConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory} + } + return storage.NewTableClient(name, cfg.Config) +} + // decodeReq sanitizes an incoming request, rounds bounds, and appends the __name__ matcher func decodeReq(req logql.SelectParams) ([]*labels.Matcher, logql.LineFilter, model.Time, model.Time, error) { expr, err := req.LogSelector() @@ -201,3 +216,36 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk. } return filtered } + +func registerCustomIndexClients(cfg Config, schemaCfg chunk.SchemaConfig) { + boltdbShipperInstances := 0 + storage.RegisterIndexStore(local.BoltDBShipperType, func() (chunk.IndexClient, error) { + // since we do not know which object client is being used for the period for which we are creating this index client, + // we need to iterate through all the periodic configs to find the right one. + // We maintain number of instances that we have already created in boltdbShipperInstances and then count the number of + // encounters of BoltDBShipperType until we find the right periodic config for getting the ObjectType. + // This is done assuming we are creating index client in the order of periodic configs. + // Note: We are assuming that user would never store chunks in table based store otherwise NewObjectClient would return an error. + + // ToDo: Try passing on ObjectType from Cortex to the callback for creating custom index client. + boltdbShipperEncounter := 0 + objectStoreType := "" + for _, config := range schemaCfg.Configs { + if config.IndexType == local.BoltDBShipperType { + boltdbShipperEncounter++ + if boltdbShipperEncounter > boltdbShipperInstances { + objectStoreType = config.ObjectType + break + } + } + } + + boltdbShipperInstances++ + objectClient, err := storage.NewObjectClient(objectStoreType, cfg.Config) + if err != nil { + return nil, err + } + + return local.NewBoltDBIndexClient(cortex_local.BoltDBConfig{Directory: cfg.BoltDBShipperConfig.ActiveIndexDirectory}, objectClient, cfg.BoltDBShipperConfig) + }, nil) +} diff --git a/pkg/storage/stores/local/boltdb_index_client.go b/pkg/storage/stores/local/boltdb_index_client.go new file mode 100644 index 000000000000..286780d79966 --- /dev/null +++ b/pkg/storage/stores/local/boltdb_index_client.go @@ -0,0 +1,61 @@ +package local + +import ( + "context" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/local" + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + "go.etcd.io/bbolt" +) + +type BoltdbIndexClientWithShipper struct { + *local.BoltIndexClient + shipper *Shipper +} + +// NewBoltDBIndexClient creates a new IndexClient that used BoltDB. 
+func NewBoltDBIndexClient(cfg local.BoltDBConfig, archiveStoreClient chunk.ObjectClient, archiverCfg ShipperConfig) (chunk.IndexClient, error) { + boltDBIndexClient, err := local.NewBoltDBIndexClient(cfg) + if err != nil { + return nil, err + } + + shipper, err := NewShipper(archiverCfg, archiveStoreClient, boltDBIndexClient) + if err != nil { + return nil, err + } + + indexClient := BoltdbIndexClientWithShipper{ + BoltIndexClient: boltDBIndexClient, + shipper: shipper, + } + + return &indexClient, nil +} + +func (b *BoltdbIndexClientWithShipper) Stop() { + b.shipper.Stop() + b.BoltIndexClient.Stop() +} + +func (b *BoltdbIndexClientWithShipper) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { + return chunk_util.DoParallelQueries(ctx, b.query, queries, callback) +} + +func (b *BoltdbIndexClientWithShipper) query(ctx context.Context, query chunk.IndexQuery, callback func(chunk.ReadBatch) (shouldContinue bool)) error { + db, err := b.GetDB(query.TableName, local.DBOperationRead) + if err != nil && err != local.ErrUnexistentBoltDB { + return err + } + + if db != nil { + if err := b.QueryDB(ctx, db, query, callback); err != nil { + return err + } + } + + return b.shipper.forEach(ctx, query.TableName, func(db *bbolt.DB) error { + return b.QueryDB(ctx, db, query, callback) + }) +} diff --git a/pkg/storage/stores/local/downloads.go b/pkg/storage/stores/local/downloads.go new file mode 100644 index 000000000000..b98e2df0215e --- /dev/null +++ b/pkg/storage/stores/local/downloads.go @@ -0,0 +1,216 @@ +package local + +import ( + "context" + "fmt" + "io" + "os" + "path" + "strings" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/local" + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" +) + +// checkStorageForUpdates compares files from cache with storage and builds the list of files to be downloaded from storage and to be deleted from cache +func (s *Shipper) checkStorageForUpdates(ctx context.Context, period string, fc *filesCollection) (toDownload []chunk.StorageObject, toDelete []string, err error) { + if s.cfg.Mode == ShipperModeWriteOnly { + return + } + + // listing tables from store + var objects []chunk.StorageObject + objects, err = s.storageClient.List(ctx, period+"/") + if err != nil { + return + } + + listedUploaders := make(map[string]struct{}, len(objects)) + + for _, object := range objects { + uploader := strings.Split(object.Key, "/")[1] + // don't include the file which was uploaded by same ingester + if uploader == s.uploader { + continue + } + listedUploaders[uploader] = struct{}{} + + // Checking whether file was updated in the store after we downloaded it, if not, no need to include it in updates + downloadedFileDetails, ok := fc.files[uploader] + if !ok || downloadedFileDetails.mtime != object.ModifiedAt { + toDownload = append(toDownload, object) + } + } + + for uploader := range fc.files { + if _, isOK := listedUploaders[uploader]; !isOK { + toDelete = append(toDelete, uploader) + } + } + + return +} + +// syncFilesForPeriod downloads updated and new files from for given period from all the uploaders and removes deleted ones +func (s *Shipper) syncFilesForPeriod(ctx context.Context, period string, fc *filesCollection) error { + level.Debug(util.Logger).Log("msg", fmt.Sprintf("syncing files for period %s", period)) + + fc.RLock() + toDownload, 
toDelete, err := s.checkStorageForUpdates(ctx, period, fc) + fc.RUnlock() + + if err != nil { + return err + } + + for _, storageObject := range toDownload { + err = s.downloadFile(ctx, period, storageObject, fc) + if err != nil { + return err + } + } + + for _, uploader := range toDelete { + err := s.deleteFileFromCache(period, uploader, fc) + if err != nil { + return err + } + } + + return nil +} + +// It first downloads file to a temp location so that we close the existing file(if already exists), replace it with new one and then reopen it. +func (s *Shipper) downloadFile(ctx context.Context, period string, storageObject chunk.StorageObject, fc *filesCollection) error { + uploader := strings.Split(storageObject.Key, "/")[1] + folderPath, _ := s.getFolderPathForPeriod(period, false) + filePath := path.Join(folderPath, uploader) + + // download the file temporarily with some other name to allow boltdb client to close the existing file first if it exists + tempFilePath := path.Join(folderPath, fmt.Sprintf("%s.%s", uploader, "temp")) + + err := s.getFileFromStorage(ctx, storageObject.Key, tempFilePath) + if err != nil { + return err + } + + fc.Lock() + defer fc.Unlock() + + df, ok := fc.files[uploader] + if ok { + if err := df.boltdb.Close(); err != nil { + return err + } + } else { + df = downloadedFiles{} + } + + // move the file from temp location to actual location + err = os.Rename(tempFilePath, filePath) + if err != nil { + return err + } + + df.mtime = storageObject.ModifiedAt + df.boltdb, err = local.OpenBoltdbFile(filePath) + if err != nil { + return err + } + + fc.files[uploader] = df + + return nil +} + +// getFileFromStorage downloads a file from storage to given location. +func (s *Shipper) getFileFromStorage(ctx context.Context, objectKey, destination string) error { + readCloser, err := s.storageClient.GetObject(ctx, objectKey) + if err != nil { + return err + } + + defer func() { + if err := readCloser.Close(); err != nil { + level.Error(util.Logger) + } + }() + + f, err := os.Create(destination) + if err != nil { + return err + } + + _, err = io.Copy(f, readCloser) + if err != nil { + return err + } + + level.Info(util.Logger).Log("msg", fmt.Sprintf("downloaded file %s", objectKey)) + + return f.Sync() +} + +// downloadFilesForPeriod should be called when files for a period does not exist i.e they were never downloaded or got cleaned up later on by TTL +// While files are being downloaded it will block all reads/writes on filesCollection by taking an exclusive lock +func (s *Shipper) downloadFilesForPeriod(ctx context.Context, period string, fc *filesCollection) error { + fc.Lock() + defer fc.Unlock() + + objects, err := s.storageClient.List(ctx, period+"/") + if err != nil { + return err + } + + level.Debug(util.Logger).Log("msg", fmt.Sprintf("list of files to download for period %s: %s", period, objects)) + + folderPath, err := s.getFolderPathForPeriod(period, true) + if err != nil { + return err + } + + for _, object := range objects { + uploader := getUploaderFromObjectKey(object.Key) + if uploader == s.uploader { + continue + } + + filePath := path.Join(folderPath, uploader) + df := downloadedFiles{} + + err := s.getFileFromStorage(ctx, object.Key, filePath) + if err != nil { + return err + } + + df.mtime = object.ModifiedAt + df.boltdb, err = local.OpenBoltdbFile(filePath) + if err != nil { + return err + } + + fc.files[uploader] = df + } + + return nil +} + +func (s *Shipper) getFolderPathForPeriod(period string, ensureExists bool) (string, error) { + folderPath := 
path.Join(s.cfg.CacheLocation, period) + + if ensureExists { + err := chunk_util.EnsureDirectory(folderPath) + if err != nil { + return "", err + } + } + + return folderPath, nil +} + +func getUploaderFromObjectKey(objectKey string) string { + return strings.Split(objectKey, "/")[1] +} diff --git a/pkg/storage/stores/local/downloads_test.go b/pkg/storage/stores/local/downloads_test.go new file mode 100644 index 000000000000..47710e892243 --- /dev/null +++ b/pkg/storage/stores/local/downloads_test.go @@ -0,0 +1,121 @@ +package local + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/chunk/local" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/stretchr/testify/require" +) + +func queryTestBoltdb(t *testing.T, boltdbIndexClient *BoltdbIndexClientWithShipper, query chunk.IndexQuery) map[string]string { + resp := map[string]string{} + + require.NoError(t, boltdbIndexClient.query(context.Background(), query, func(batch chunk.ReadBatch) (shouldContinue bool) { + itr := batch.Iterator() + for itr.Next() { + resp[string(itr.RangeValue())] = string(itr.Value()) + } + return true + })) + + return resp +} + +func writeTestData(t *testing.T, indexClient *BoltdbIndexClientWithShipper, tableName string, numRecords, startValue int) { + time.Sleep(time.Second / 2) + + batch := indexClient.NewWriteBatch() + for i := 0; i < numRecords; i++ { + value := []byte(strconv.Itoa(startValue + i)) + batch.Add(tableName, "", value, value) + } + + require.NoError(t, indexClient.BatchWrite(context.Background(), batch)) + + boltdb, err := indexClient.GetDB(tableName, local.DBOperationWrite) + require.NoError(t, err) + + require.NoError(t, boltdb.Sync()) +} + +func TestShipper_Downloads(t *testing.T) { + tempDirForTests, err := ioutil.TempDir("", "test-dir") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDirForTests)) + }() + + localStoreLocation, err := ioutil.TempDir(tempDirForTests, "local-store") + require.NoError(t, err) + + boltDBWithShipper1 := createTestBoltDBWithShipper(t, tempDirForTests, "ingester1", localStoreLocation) + boltDBWithShipper2 := createTestBoltDBWithShipper(t, tempDirForTests, "ingester2", localStoreLocation) + + // add a file to boltDBWithShipper1 + writeTestData(t, boltDBWithShipper1, "1", 10, 0) + + // upload files from boltDBWithShipper1 + require.NoError(t, boltDBWithShipper1.shipper.uploadFiles(context.Background())) + + // query data for same table from boltDBWithShipper2 + resp := queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{ + TableName: "1", + }) + + // make sure we got same data that was added from boltDBWithShipper1 + checkExpectedKVsInBoltdbResp(t, resp, 10, 0) + + // add more data to the previous file added to boltDBWithShipper1 and the upload it + writeTestData(t, boltDBWithShipper1, "1", 10, 10) + require.NoError(t, boltDBWithShipper1.shipper.uploadFiles(context.Background())) + + // sync files in boltDBWithShipper2 + require.NoError(t, boltDBWithShipper2.shipper.syncLocalWithStorage(context.Background())) + + // query data for same table from boltDBWithShipper2 + resp = queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{ + TableName: "1", + }) + + // make sure we also got new data that was added from boltDBWithShipper1 + checkExpectedKVsInBoltdbResp(t, resp, 20, 0) + + // add some data for same table in boltDBWithShipper2 + writeTestData(t, boltDBWithShipper2, "1", 10, 20) + + // query data for same table from boltDBWithShipper2 + 
resp = queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{ + TableName: "1", + }) + + // make sure we data from boltDBWithShipper1 and boltDBWithShipper2 + checkExpectedKVsInBoltdbResp(t, resp, 30, 0) + + // stop boltDBWithShipper1 + boltDBWithShipper1.Stop() + + // delete the file from the store that was uploaded by boltDBWithShipper1 + require.NoError(t, os.Remove(filepath.Join(localStoreLocation, storageKeyPrefix, "1", boltDBWithShipper1.shipper.uploader))) + + // sync files in boltDBWithShipper2 + require.NoError(t, boltDBWithShipper2.shipper.syncLocalWithStorage(context.Background())) + + // query data for same table from boltDBWithShipper2 + resp = queryTestBoltdb(t, boltDBWithShipper2, chunk.IndexQuery{ + TableName: "1", + }) + + // make sure we got only data that was added to boltDBWithShipper2 + checkExpectedKVsInBoltdbResp(t, resp, 10, 20) + + boltDBWithShipper2.Stop() +} diff --git a/pkg/storage/stores/local/shipper.go b/pkg/storage/stores/local/shipper.go new file mode 100644 index 000000000000..59f72ef1f64a --- /dev/null +++ b/pkg/storage/stores/local/shipper.go @@ -0,0 +1,294 @@ +package local + +import ( + "context" + "flag" + "fmt" + "io/ioutil" + "os" + "path" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + pkg_util "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + "go.etcd.io/bbolt" + + "github.com/grafana/loki/pkg/storage/stores/util" +) + +const ( + // ShipperModeReadWrite is to allow both read and write + ShipperModeReadWrite = iota + // ShipperModeReadOnly is to allow only read operations + ShipperModeReadOnly + // ShipperModeWriteOnly is to allow only write operations + ShipperModeWriteOnly + + // ShipperFileUploadInterval defines interval for uploading active boltdb files from local which are being written to by ingesters. + ShipperFileUploadInterval = 15 * time.Minute + + // BoltDBShipperType holds the index type for using boltdb with shipper which keeps flushing them to a shared storage + BoltDBShipperType = "boltdb-shipper" + + cacheCleanupInterval = 24 * time.Hour + storageKeyPrefix = "index/" +) + +type BoltDBGetter interface { + GetDB(name string, operation int) (*bbolt.DB, error) +} + +type ShipperConfig struct { + ActiveIndexDirectory string `yaml:"active_index_directory"` + CacheLocation string `yaml:"cache_location"` + CacheTTL time.Duration `yaml:"cache_ttl"` + ResyncInterval time.Duration `yaml:"resync_interval"` + IngesterName string `yaml:"-"` + Mode int `yaml:"-"` +} + +// RegisterFlags registers flags. +func (cfg *ShipperConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.ActiveIndexDirectory, "boltdb.shipper.active-index-directory", "", "Directory where ingesters would write boltdb files which would then be uploaded by shipper to configured storage") + f.StringVar(&cfg.CacheLocation, "boltdb.shipper.cache-location", "", "Cache location for restoring boltDB files for queries") + f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries") + f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage") +} + +type downloadedFiles struct { + mtime time.Time + boltdb *bbolt.DB +} + +// filesCollection holds info about shipped boltdb index files by other uploaders(ingesters). +// It is generally used to hold boltdb files created by all the ingesters for same period i.e with same name. 
+// In the object store files are uploaded as / to manage files with same name from different ingesters +type filesCollection struct { + sync.RWMutex + lastUsedAt time.Time + files map[string]downloadedFiles +} + +type Shipper struct { + cfg ShipperConfig + boltDBGetter BoltDBGetter + + // downloadedPeriods holds mapping for period -> filesCollection. + // Here period is name of the file created by ingesters for a specific period. + downloadedPeriods map[string]*filesCollection + downloadedPeriodsMtx sync.RWMutex + storageClient chunk.ObjectClient + + uploader string + uploadedFilesMtime map[string]time.Time + uploadedFilesMtimeMtx sync.RWMutex + + done chan struct{} + wait sync.WaitGroup +} + +// NewShipper creates a shipper for syncing local objects with a store +func NewShipper(cfg ShipperConfig, storageClient chunk.ObjectClient, boltDBGetter BoltDBGetter) (*Shipper, error) { + err := chunk_util.EnsureDirectory(cfg.CacheLocation) + if err != nil { + return nil, err + } + + shipper := Shipper{ + cfg: cfg, + boltDBGetter: boltDBGetter, + downloadedPeriods: map[string]*filesCollection{}, + storageClient: util.NewPrefixedObjectClient(storageClient, storageKeyPrefix), + done: make(chan struct{}), + uploadedFilesMtime: map[string]time.Time{}, + } + + shipper.uploader, err = shipper.getUploaderName() + if err != nil { + return nil, err + } + + shipper.wait.Add(1) + go shipper.loop() + + return &shipper, nil +} + +// we would persist uploader name in /uploader/name file so that we use same name on subsequent restarts to +// avoid uploading same files again with different name. If the filed does not exist we would create one with uploader name set to +// ingester name and startup timestamp so that we randomise the name and do not override files from other ingesters. 
+func (s *Shipper) getUploaderName() (string, error) { + uploader := fmt.Sprintf("%s-%d", s.cfg.IngesterName, time.Now().Unix()) + + uploaderFilePath := path.Join(s.cfg.ActiveIndexDirectory, "uploader", "name") + if err := chunk_util.EnsureDirectory(path.Dir(uploaderFilePath)); err != nil { + return "", err + } + + _, err := os.Stat(uploaderFilePath) + if err != nil { + if !os.IsNotExist(err) { + return "", err + } + if err := ioutil.WriteFile(uploaderFilePath, []byte(uploader), 0666); err != nil { + return "", err + } + } else { + ub, err := ioutil.ReadFile(uploaderFilePath) + if err != nil { + return "", err + } + uploader = string(ub) + } + + return uploader, nil +} + +func (s *Shipper) loop() { + defer s.wait.Done() + + resyncTicker := time.NewTicker(s.cfg.ResyncInterval) + defer resyncTicker.Stop() + + uploadFilesTicker := time.NewTicker(ShipperFileUploadInterval) + defer uploadFilesTicker.Stop() + + cacheCleanupTicker := time.NewTicker(cacheCleanupInterval) + defer cacheCleanupTicker.Stop() + + for { + select { + case <-resyncTicker.C: + err := s.syncLocalWithStorage(context.Background()) + if err != nil { + level.Error(pkg_util.Logger).Log("msg", "error syncing local boltdb files with storage", "err", err) + } + case <-uploadFilesTicker.C: + err := s.uploadFiles(context.Background()) + if err != nil { + level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err) + } + case <-cacheCleanupTicker.C: + err := s.cleanupCache() + if err != nil { + level.Error(pkg_util.Logger).Log("msg", "error cleaning up expired tables", "err", err) + } + case <-s.done: + return + } + } +} + +// Stop the shipper and push all the local files to the store +func (s *Shipper) Stop() { + close(s.done) + s.wait.Wait() + + // Push all boltdb files to storage before returning + err := s.uploadFiles(context.Background()) + if err != nil { + level.Error(pkg_util.Logger).Log("msg", "error pushing archivable files to store", "err", err) + } + + s.downloadedPeriodsMtx.Lock() + defer s.downloadedPeriodsMtx.Unlock() + + for _, fc := range s.downloadedPeriods { + fc.Lock() + for _, fl := range fc.files { + _ = fl.boltdb.Close() + } + fc.Unlock() + } +} + +// cleanupCache removes all the files for a period which has not be queried for using the configured TTL +func (s *Shipper) cleanupCache() error { + s.downloadedPeriodsMtx.Lock() + defer s.downloadedPeriodsMtx.Unlock() + + for period, fc := range s.downloadedPeriods { + if fc.lastUsedAt.Add(s.cfg.CacheTTL).Before(time.Now()) { + for uploader := range fc.files { + if err := s.deleteFileFromCache(period, uploader, fc); err != nil { + return err + } + } + + delete(s.downloadedPeriods, period) + } + } + + return nil +} + +// syncLocalWithStorage syncs all the periods that we have in the cache with the storage +// i.e download new and updated files and remove files which were delete from the storage. +func (s *Shipper) syncLocalWithStorage(ctx context.Context) error { + s.downloadedPeriodsMtx.RLock() + defer s.downloadedPeriodsMtx.RUnlock() + + for period := range s.downloadedPeriods { + if err := s.syncFilesForPeriod(ctx, period, s.downloadedPeriods[period]); err != nil { + return err + } + } + + return nil +} + +// deleteFileFromCache removes a file from cache. 
+// It takes care of locking the filesCollection, closing the boltdb file and removing the file from cache +func (s *Shipper) deleteFileFromCache(period, uploader string, fc *filesCollection) error { + fc.Lock() + defer fc.Unlock() + + if err := fc.files[uploader].boltdb.Close(); err != nil { + return err + } + + delete(fc.files, uploader) + + return os.Remove(path.Join(s.cfg.CacheLocation, period, uploader)) +} + +func (s *Shipper) forEach(ctx context.Context, period string, callback func(db *bbolt.DB) error) error { + s.downloadedPeriodsMtx.RLock() + fc, ok := s.downloadedPeriods[period] + s.downloadedPeriodsMtx.RUnlock() + + if !ok { + s.downloadedPeriodsMtx.Lock() + fc, ok = s.downloadedPeriods[period] + if ok { + s.downloadedPeriodsMtx.Unlock() + } else { + level.Info(pkg_util.Logger).Log("msg", fmt.Sprintf("downloading all files for period %s", period)) + + fc = &filesCollection{files: map[string]downloadedFiles{}} + s.downloadedPeriods[period] = fc + s.downloadedPeriodsMtx.Unlock() + + if err := s.downloadFilesForPeriod(ctx, period, fc); err != nil { + return err + } + } + + } + + fc.RLock() + defer fc.RUnlock() + + fc.lastUsedAt = time.Now() + + for uploader := range fc.files { + if err := callback(fc.files[uploader].boltdb); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/storage/stores/local/uploads.go b/pkg/storage/stores/local/uploads.go new file mode 100644 index 000000000000..b44e52928900 --- /dev/null +++ b/pkg/storage/stores/local/uploads.go @@ -0,0 +1,112 @@ +package local + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path" + + "github.com/cortexproject/cortex/pkg/chunk/local" + chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + "go.etcd.io/bbolt" +) + +// uploadFiles uploads all new and updated files to storage. +// It uploads the files from configured boltdb dir where ingester writes the index. +func (s *Shipper) uploadFiles(ctx context.Context) error { + if s.cfg.Mode == ShipperModeReadOnly { + return nil + } + + filesInfo, err := ioutil.ReadDir(s.cfg.ActiveIndexDirectory) + if err != nil { + return err + } + + for _, fileInfo := range filesInfo { + if fileInfo.IsDir() { + continue + } + + s.uploadedFilesMtimeMtx.RLock() + // Checking whether file is updated after last push, if not skipping it + uploadedFileMtime, ok := s.uploadedFilesMtime[fileInfo.Name()] + s.uploadedFilesMtimeMtx.RUnlock() + + if ok && uploadedFileMtime.Equal(fileInfo.ModTime()) { + continue + } + + err := s.uploadFile(ctx, fileInfo.Name()) + if err != nil { + return err + } + + s.uploadedFilesMtimeMtx.Lock() + s.uploadedFilesMtime[fileInfo.Name()] = fileInfo.ModTime() + s.uploadedFilesMtimeMtx.Unlock() + } + + return nil +} + +// uploadFile uploads one of the files locally written by ingesters to storage. 
+func (s *Shipper) uploadFile(ctx context.Context, period string) error { + if s.cfg.Mode == ShipperModeReadOnly { + return nil + } + + level.Debug(util.Logger).Log("msg", fmt.Sprintf("uploading file for period %s", period)) + + snapshotPath := path.Join(s.cfg.CacheLocation, period) + err := chunk_util.EnsureDirectory(snapshotPath) + if err != nil { + return err + } + + filePath := path.Join(snapshotPath, fmt.Sprintf("%s.%s", s.uploader, "temp")) + f, err := os.Create(filePath) + if err != nil { + return err + } + + defer func() { + if err := os.Remove(filePath); err != nil { + level.Error(util.Logger) + } + }() + + db, err := s.boltDBGetter.GetDB(period, local.DBOperationRead) + if err != nil { + return err + } + + err = db.View(func(tx *bbolt.Tx) error { + _, err := tx.WriteTo(f) + return err + }) + if err != nil { + return err + } + + if err := f.Sync(); err != nil { + return err + } + + if _, err := f.Seek(0, 0); err != nil { + return err + } + + defer func() { + if err := f.Close(); err != nil { + level.Error(util.Logger) + } + }() + + // Files are stored with / + objectKey := fmt.Sprintf("%s/%s", period, s.uploader) + return s.storageClient.PutObject(ctx, objectKey, f) +} diff --git a/pkg/storage/stores/local/uploads_test.go b/pkg/storage/stores/local/uploads_test.go new file mode 100644 index 000000000000..4a71da5194b7 --- /dev/null +++ b/pkg/storage/stores/local/uploads_test.go @@ -0,0 +1,165 @@ +package local + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/chunk/local" + "github.com/cortexproject/cortex/pkg/chunk/util" + "github.com/stretchr/testify/require" + "go.etcd.io/bbolt" +) + +const testBucketName = "testBucket" + +func createTestBoltDBWithShipper(t *testing.T, parentTempDir, ingesterName, localStoreLocation string) *BoltdbIndexClientWithShipper { + cacheLocation := filepath.Join(parentTempDir, ingesterName, "cache") + boltdbFilesLocation := filepath.Join(parentTempDir, ingesterName, "boltdb") + + require.NoError(t, util.EnsureDirectory(cacheLocation)) + require.NoError(t, util.EnsureDirectory(boltdbFilesLocation)) + + shipperConfig := ShipperConfig{ + ActiveIndexDirectory: boltdbFilesLocation, + CacheLocation: cacheLocation, + CacheTTL: 1 * time.Hour, + ResyncInterval: 1 * time.Hour, + IngesterName: ingesterName, + Mode: ShipperModeReadWrite, + } + + archiveStoreClient, err := local.NewFSObjectClient(local.FSConfig{ + Directory: localStoreLocation, + }) + require.NoError(t, err) + + boltdbIndexClientWithShipper, err := NewBoltDBIndexClient(local.BoltDBConfig{Directory: shipperConfig.ActiveIndexDirectory}, archiveStoreClient, shipperConfig) + require.NoError(t, err) + + return boltdbIndexClientWithShipper.(*BoltdbIndexClientWithShipper) +} + +func addTestRecordsToBoltDBFile(t *testing.T, boltdb *bbolt.DB, numRecords int, start int) { + time.Sleep(time.Second / 2) + + err := boltdb.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte(testBucketName)) + if err != nil { + return err + } + + for i := 0; i < numRecords; i++ { + kv := []byte(strconv.Itoa(start + i)) + + err = b.Put(kv, kv) + if err != nil { + return err + } + } + + return nil + }) + + require.NoError(t, err) + require.NoError(t, boltdb.Sync()) +} + +func readAllKVsFromBoltdbFile(t *testing.T, boltdb *bbolt.DB) map[string]string { + resp := map[string]string{} + + err := boltdb.View(func(tx *bbolt.Tx) error { + b := tx.Bucket([]byte(testBucketName)) + require.NotNil(t, b) + + return 
b.ForEach(func(k, v []byte) error { + resp[string(k)] = string(v) + return nil + }) + }) + + require.NoError(t, err) + + return resp +} + +func readAllKVsFromBoltdbFileAtPath(t *testing.T, path string) map[string]string { + boltDBFile, err := local.OpenBoltdbFile(path) + require.NoError(t, err) + + defer func() { + require.NoError(t, boltDBFile.Close()) + }() + + return readAllKVsFromBoltdbFile(t, boltDBFile) +} + +func checkExpectedKVsInBoltdbResp(t *testing.T, resp map[string]string, expectedNumRecords, start int) { + require.Equal(t, expectedNumRecords, len(resp), "responses", resp) + + for i := 0; i < expectedNumRecords; i++ { + expectedKV := strconv.Itoa(start + i) + + val, ok := resp[expectedKV] + require.Equal(t, true, ok) + require.Equal(t, expectedKV, val) + } +} + +func TestShipper_Uploads(t *testing.T) { + tempDirForTests, err := ioutil.TempDir("", "test-dir") + require.NoError(t, err) + + defer func() { + require.NoError(t, os.RemoveAll(tempDirForTests)) + }() + + localStoreLocation, err := ioutil.TempDir(tempDirForTests, "local-store") + require.NoError(t, err) + + boltDBWithShipper := createTestBoltDBWithShipper(t, tempDirForTests, "ingester", localStoreLocation) + + // create a boltdb file for boltDBWithShipper to test upload. + boltdbFile1, err := boltDBWithShipper.GetDB("file1", local.DBOperationWrite) + require.NoError(t, err) + file1PathInStorage := filepath.Join(localStoreLocation, storageKeyPrefix, filepath.Base(boltdbFile1.Path()), boltDBWithShipper.shipper.uploader) + + // add some test records to boltdbFile1 + addTestRecordsToBoltDBFile(t, boltdbFile1, 10, 1) + + // Upload files from boltDBWithShipper + err = boltDBWithShipper.shipper.uploadFiles(context.Background()) + require.NoError(t, err) + + // open boltdbFile1 and verify it has expected records + checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file1PathInStorage), 10, 1) + + // create another boltdb file for boltDBWithShipper to test upload. 
+ boltdbFile2, err := boltDBWithShipper.GetDB("file2", local.DBOperationWrite) + require.NoError(t, err) + file2PathInStorage := filepath.Join(localStoreLocation, storageKeyPrefix, filepath.Base(boltdbFile2.Path()), boltDBWithShipper.shipper.uploader) + + // add some test records to boltdbFile2 and some more records to boltdbFile1 + addTestRecordsToBoltDBFile(t, boltdbFile2, 10, 1) + addTestRecordsToBoltDBFile(t, boltdbFile1, 5, 11) + + // Upload files from boltDBWithShipper + err = boltDBWithShipper.shipper.uploadFiles(context.Background()) + require.NoError(t, err) + + // open boltdbFile1 and boltdbFile2 and verify it has expected records + checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file2PathInStorage), 10, 1) + checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file1PathInStorage), 15, 1) + + // modify boltdbFile2 again + addTestRecordsToBoltDBFile(t, boltdbFile2, 10, 11) + + // stop boltDBWithShipper to make it upload all the new and changed to store + boltDBWithShipper.Stop() + + checkExpectedKVsInBoltdbResp(t, readAllKVsFromBoltdbFileAtPath(t, file2PathInStorage), 20, 1) +} diff --git a/pkg/storage/stores/util/object_client.go b/pkg/storage/stores/util/object_client.go new file mode 100644 index 000000000000..bb95ca3ee17a --- /dev/null +++ b/pkg/storage/stores/util/object_client.go @@ -0,0 +1,47 @@ +package util + +import ( + "context" + "io" + "strings" + + "github.com/cortexproject/cortex/pkg/chunk" +) + +type PrefixedObjectClient struct { + downstreamClient chunk.ObjectClient + prefix string +} + +func (p PrefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { + return p.downstreamClient.PutObject(ctx, p.prefix+objectKey, object) +} + +func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error) { + return p.downstreamClient.GetObject(ctx, p.prefix+objectKey) +} + +func (p PrefixedObjectClient) List(ctx context.Context, prefix string) ([]chunk.StorageObject, error) { + objects, err := p.downstreamClient.List(ctx, p.prefix+prefix) + if err != nil { + return nil, err + } + + for i := range objects { + objects[i].Key = strings.TrimPrefix(objects[i].Key, p.prefix) + } + + return objects, nil +} + +func (p PrefixedObjectClient) DeleteObject(ctx context.Context, objectKey string) error { + return p.downstreamClient.DeleteObject(ctx, p.prefix+objectKey) +} + +func (p PrefixedObjectClient) Stop() { + p.downstreamClient.Stop() +} + +func NewPrefixedObjectClient(downstreamClient chunk.ObjectClient, prefix string) chunk.ObjectClient { + return PrefixedObjectClient{downstreamClient: downstreamClient, prefix: prefix} +}