From 6ebf451ffcfd94bbfef9e38c91a708f551c66a00 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Fri, 7 Feb 2020 17:23:14 -0500 Subject: [PATCH 01/20] basics working, needs some more cleanup and more flags for config and a Dockerfile Signed-off-by: Edward Welch --- cmd/migrate/main.go | 154 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 cmd/migrate/main.go diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go new file mode 100644 index 000000000000..33d8a93016ee --- /dev/null +++ b/cmd/migrate/main.go @@ -0,0 +1,154 @@ +package main + +import ( + "bufio" + "context" + "flag" + "fmt" + "log" + "os" + "sort" + "strings" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/cfg" + "github.com/grafana/loki/pkg/loki" + "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/validation" +) + +func main() { + + var defaultsConfig loki.Config + + sf := flag.String("source.config.file", "", "source datasource config") + df := flag.String("dest.config.file", "", "dest datasource config") + id := flag.String("tenant", "fake", "Tenant identifier, default is `fake` for single tenant Loki") + batch := flag.Int("batchLen", 1000, "Specify how many chunks to read/write in one batch") + flag.Parse() + + if err := cfg.Unmarshal(&defaultsConfig, cfg.Defaults()); err != nil { + fmt.Fprintf(os.Stderr, "failed parsing defaults config: %v\n", err) + os.Exit(1) + } + + sourceConfig := defaultsConfig + destConfig := defaultsConfig + + if err := cfg.YAML(sf)(&sourceConfig); err != nil { + fmt.Fprintf(os.Stderr, "failed parsing source config file %v: %v\n", *sf, err) + os.Exit(1) + } + if err := cfg.YAML(df)(&destConfig); err != nil { + fmt.Fprintf(os.Stderr, "failed parsing dest config file %v: %v\n", *df, err) + os.Exit(1) + } + + //if sourceConfig.StorageConfig.IndexQueriesCacheConfig + + limits, err := validation.NewOverrides(sourceConfig.LimitsConfig, nil) + s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig, limits) + if err != nil { + panic(err) + } + + d, err := storage.NewStore(destConfig.StorageConfig, destConfig.ChunkStoreConfig, destConfig.SchemaConfig, limits) + if err != nil { + panic(err) + } + + nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs") + if err != nil { + panic(err) + } + ctx := context.Background() + ctx = user.InjectOrgID(ctx, *id) + userID, err := user.ExtractOrgID(ctx) + if err != nil { + panic(err) + } + + from, through := util.RoundToMilliseconds(time.Now().Add(-60*time.Minute), time.Now()) + + schemaGroups, fetchers, err := s.GetChunkRefs(ctx, userID, from, through, nameLabelMatcher) + + var totalChunks int + for i := range schemaGroups { + totalChunks += len(schemaGroups[i]) + } + rdr := bufio.NewReader(os.Stdin) + fmt.Printf("Timespan will sync %v chunks spanning %v schemas.\n", totalChunks, len(fetchers)) + fmt.Print("Proceed? (Y/n):") + in, err := rdr.ReadString('\n') + if err != nil { + log.Fatalf("Error reading input: %v", err) + } + if strings.ToLower(strings.TrimSpace(in)) == "n" { + fmt.Println("Exiting") + os.Exit(0) + } + totalBytes := 0 + start := time.Now() + for i, f := range fetchers { + fmt.Printf("Processing Schema %v which contains %v chunks\n", i, len(schemaGroups[i])) + + //Slice up into batches + for j := 0; j < len(schemaGroups[i]); j += *batch { + k := j + *batch + if k > len(schemaGroups[i]) { + k = len(schemaGroups[i]) + } + + chunks := schemaGroups[i][j:k] + fmt.Printf("Processing chunks %v-%v of %v\n", j, k, len(schemaGroups[i])) + + keys := make([]string, 0, len(chunks)) + chks := make([]chunk.Chunk, 0, len(chunks)) + + // For retry purposes, find the earliest "Through" and keep it, + // if restarting you would want to set the "From" to this value + // This might send some duplicate chunks but should ensure nothing is missed. + earliestThrough := through + for _, c := range chunks { + if c.Through < earliestThrough { + earliestThrough = c.Through + } + } + + // FetchChunks requires chunks to be ordered by external key. + sort.Slice(chunks, func(l, m int) bool { return chunks[l].ExternalKey() < chunks[m].ExternalKey() }) + for _, chk := range chunks { + key := chk.ExternalKey() + keys = append(keys, key) + chks = append(chks, chk) + } + chks, err := f.FetchChunks(ctx, chks, keys) + if err != nil { + log.Fatalf("Error fetching chunks: %v\n", err) + } + + // Calculate some size stats + for _, chk := range chks { + if enc, err := chk.Encoded(); err == nil { + totalBytes += len(enc) + } else { + log.Fatalf("Error encoding a chunk: %v\n", err) + } + } + + err = d.Put(ctx, chks) + if err != nil { + log.Fatalf("Error sending chunks to new store: %v\n", err) + } + fmt.Printf("Batch sent successfully, if restarting after this batch you can safely start at time: %v\n", earliestThrough.Time()) + } + } + + fmt.Printf("Finished transfering %v chunks totalling %v bytes in %v\n", totalChunks, totalBytes, time.Now().Sub(start)) + +} From 4d3b81060f6856d7c1561f8f74389428b4160532 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 16 Mar 2020 19:01:02 -0400 Subject: [PATCH 02/20] add label matchers start end time params and ability to remap the chunk to a different user Signed-off-by: Edward Welch --- cmd/migrate/main.go | 58 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index 33d8a93016ee..18dd9e8b3eb5 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -16,6 +16,7 @@ import ( "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/cfg" + "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/loki" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/util" @@ -26,9 +27,14 @@ func main() { var defaultsConfig loki.Config + from := flag.String("from", "", "Start Time RFC339Nano 2006-01-02T15:04:05.999999999Z07:00") + to := flag.String("to", "", "End Time RFC339Nano 2006-01-02T15:04:05.999999999Z07:00") sf := flag.String("source.config.file", "", "source datasource config") df := flag.String("dest.config.file", "", "dest datasource config") - id := flag.String("tenant", "fake", "Tenant identifier, default is `fake` for single tenant Loki") + source := flag.String("source.tenant", "fake", "Source tenant identifier, default is `fake` for single tenant Loki") + dest := flag.String("dest.tenant", "fake", "Destination tenant identifier, default is `fake` for single tenant Loki") + match := flag.String("match", "", "Optional label match") + batch := flag.Int("batchLen", 1000, "Specify how many chunks to read/write in one batch") flag.Parse() @@ -66,16 +72,27 @@ func main() { if err != nil { panic(err) } + + matchers := []*labels.Matcher{nameLabelMatcher} + + if *match != "" { + m, err := logql.ParseMatchers(*match) + if err != nil { + panic(err) + } + matchers = append(matchers, m...) + } + ctx := context.Background() - ctx = user.InjectOrgID(ctx, *id) + ctx = user.InjectOrgID(ctx, *source) userID, err := user.ExtractOrgID(ctx) if err != nil { panic(err) } - from, through := util.RoundToMilliseconds(time.Now().Add(-60*time.Minute), time.Now()) + f, t := util.RoundToMilliseconds(mustParse(*from), mustParse(*to)) - schemaGroups, fetchers, err := s.GetChunkRefs(ctx, userID, from, through, nameLabelMatcher) + schemaGroups, fetchers, err := s.GetChunkRefs(ctx, userID, f, t, matchers...) var totalChunks int for i := range schemaGroups { @@ -113,7 +130,7 @@ func main() { // For retry purposes, find the earliest "Through" and keep it, // if restarting you would want to set the "From" to this value // This might send some duplicate chunks but should ensure nothing is missed. - earliestThrough := through + earliestThrough := t for _, c := range chunks { if c.Through < earliestThrough { earliestThrough = c.Through @@ -132,16 +149,30 @@ func main() { log.Fatalf("Error fetching chunks: %v\n", err) } - // Calculate some size stats - for _, chk := range chks { + output := make([]chunk.Chunk, 0, len(chks)) + + // Calculate some size stats and change the tenant ID if necessary + for i, chk := range chks { if enc, err := chk.Encoded(); err == nil { totalBytes += len(enc) } else { log.Fatalf("Error encoding a chunk: %v\n", err) } + if *source != *dest { + // Because the incoming chunks are already encoded, to change the username we have to make a new chunk + nc := chunk.NewChunk(*dest, chk.Fingerprint, chk.Metric, chk.Data, chk.From, chk.Through) + err := nc.Encode() + if err != nil { + log.Fatalf("Failed to encode new chunk with new user: %v\n", err) + } + output = append(output, nc) + } else { + output = append(output, chks[i]) + } + } - err = d.Put(ctx, chks) + err = d.Put(ctx, output) if err != nil { log.Fatalf("Error sending chunks to new store: %v\n", err) } @@ -152,3 +183,14 @@ func main() { fmt.Printf("Finished transfering %v chunks totalling %v bytes in %v\n", totalChunks, totalBytes, time.Now().Sub(start)) } + +func mustParse(t string) time.Time { + + ret, err := time.Parse(time.RFC3339Nano, t) + + if err != nil { + log.Fatalf("Unable to parse time %v", err) + } + + return ret +} From 6971001eebeaa555024b91a0385dd412bfa1b24d Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 8 Apr 2020 11:48:27 -0400 Subject: [PATCH 03/20] adding dockerfile and makefile instructions Signed-off-by: Edward Welch --- Makefile | 17 +++++++++++++++++ cmd/migrate/Dockerfile | 9 +++++++++ cmd/migrate/main.go | 2 -- 3 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 cmd/migrate/Dockerfile diff --git a/Makefile b/Makefile index ca538b8f564a..2f8fbe8169df 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,7 @@ .PHONY: push-images push-latest save-images load-images promtail-image loki-image build-image .PHONY: bigtable-backup, push-bigtable-backup .PHONY: benchmark-store, drone, check-mod +.PHONY: migrate migrate-image SHELL = /usr/bin/env bash @@ -225,6 +226,16 @@ cmd/promtail/promtail-debug: $(APP_GO_FILES) pkg/promtail/server/ui/assets_vfsda CGO_ENABLED=$(PROMTAIL_CGO) go build $(PROMTAIL_DEBUG_GO_FLAGS) -o $@ ./$(@D) $(NETGO_CHECK) +############### +# Migrate # +############### + +migrate: cmd/migrate/migrate + +cmd/migrate/migrate: $(APP_GO_FILES) cmd/migrate/main.go + CGO_ENABLED=0 go build $(GO_FLAGS) -o $@ ./$(@D) + $(NETGO_CHECK) + ############# # Releasing # ############# @@ -273,6 +284,7 @@ clean: rm -rf dist/ rm -rf cmd/fluent-bit/out_grafana_loki.h rm -rf cmd/fluent-bit/out_grafana_loki.so + rm -rf cmd/migrate/migrate go clean $(MOD_FLAG) ./... ######### @@ -502,6 +514,11 @@ loki-querytee-image-cross: loki-querytee-push: loki-querytee-image-cross $(SUDO) $(PUSH_OCI) $(IMAGE_PREFIX)/loki-querytee:$(IMAGE_TAG) +# migrate-image +migrate-image: + $(SUDO) docker build -t $(IMAGE_PREFIX)/loki-migrate:$(IMAGE_TAG) -f cmd/migrate/Dockerfile . + + # build-image (only amd64) build-image: OCI_PLATFORMS= build-image: diff --git a/cmd/migrate/Dockerfile b/cmd/migrate/Dockerfile new file mode 100644 index 000000000000..3c057aa83a81 --- /dev/null +++ b/cmd/migrate/Dockerfile @@ -0,0 +1,9 @@ +FROM golang:1.13 as build +COPY . /src/loki +WORKDIR /src/loki +RUN make clean && make BUILD_IN_CONTAINER=false migrate + +FROM alpine:3.9 +RUN apk add --update --no-cache ca-certificates +COPY --from=build /src/loki/cmd/migrate/migrate /usr/bin/migrate +ENTRYPOINT [ "/usr/bin/migrate" ] diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index 18dd9e8b3eb5..b7bac386b08b 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -55,8 +55,6 @@ func main() { os.Exit(1) } - //if sourceConfig.StorageConfig.IndexQueriesCacheConfig - limits, err := validation.NewOverrides(sourceConfig.LimitsConfig, nil) s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig, limits) if err != nil { From e8d03895ed61c54566b2fdf6d17227cd77af4a0c Mon Sep 17 00:00:00 2001 From: Ed Date: Sat, 11 Apr 2020 15:12:12 -0400 Subject: [PATCH 04/20] push out some of the limits show errors from index query --- cmd/migrate/main.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index b7bac386b08b..ce00e322944c 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -55,6 +55,9 @@ func main() { os.Exit(1) } + // The long nature of queries requires stretching out the cardinality limit some and removing the query length limit + sourceConfig.LimitsConfig.CardinalityLimit = 1e9 + sourceConfig.LimitsConfig.MaxQueryLength = 0 limits, err := validation.NewOverrides(sourceConfig.LimitsConfig, nil) s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig, limits) if err != nil { @@ -91,6 +94,10 @@ func main() { f, t := util.RoundToMilliseconds(mustParse(*from), mustParse(*to)) schemaGroups, fetchers, err := s.GetChunkRefs(ctx, userID, f, t, matchers...) + if err != nil { + fmt.Println("Error querying index for chunk refs:", err) + os.Exit(1) + } var totalChunks int for i := range schemaGroups { From 2fe95d97f98d70f50972eac67a8f616fb184ffbe Mon Sep 17 00:00:00 2001 From: Ed Date: Sat, 11 Apr 2020 15:27:17 -0400 Subject: [PATCH 05/20] for testing have docker container not run migrate command but instead stay running by tailing /dev/null --- cmd/migrate/Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/migrate/Dockerfile b/cmd/migrate/Dockerfile index 3c057aa83a81..8c6e28aa6708 100644 --- a/cmd/migrate/Dockerfile +++ b/cmd/migrate/Dockerfile @@ -6,4 +6,5 @@ RUN make clean && make BUILD_IN_CONTAINER=false migrate FROM alpine:3.9 RUN apk add --update --no-cache ca-certificates COPY --from=build /src/loki/cmd/migrate/migrate /usr/bin/migrate -ENTRYPOINT [ "/usr/bin/migrate" ] +#ENTRYPOINT [ "/usr/bin/migrate" ] +CMD tail -f /dev/null From 23b57482fdb11c34b53cb917913105976e0ab9de Mon Sep 17 00:00:00 2001 From: Ed Date: Sat, 11 Apr 2020 21:17:46 -0400 Subject: [PATCH 06/20] big overhaul to add parallelization --- cmd/migrate/main.go | 260 +++++++++++++++++++++++++++++---------- cmd/migrate/main_test.go | 112 +++++++++++++++++ 2 files changed, 305 insertions(+), 67 deletions(-) create mode 100644 cmd/migrate/main_test.go diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index ce00e322944c..f842ec97b4e5 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -5,10 +5,12 @@ import ( "context" "flag" "fmt" + "github.com/prometheus/common/model" "log" "os" "sort" "strings" + "sync" "time" "github.com/cortexproject/cortex/pkg/chunk" @@ -23,6 +25,11 @@ import ( "github.com/grafana/loki/pkg/util/validation" ) +type syncRange struct { + from int64 + to int64 +} + func main() { var defaultsConfig loki.Config @@ -36,10 +43,12 @@ func main() { match := flag.String("match", "", "Optional label match") batch := flag.Int("batchLen", 1000, "Specify how many chunks to read/write in one batch") + shardBy := flag.Duration("shardBy", 6*time.Hour, "Break down the total interval into shards of this size, making this too small can lead to syncing a lot of duplicate chunks") + parallel := flag.Int("parallel", 4, "How many parallel threads to process each shard") flag.Parse() if err := cfg.Unmarshal(&defaultsConfig, cfg.Defaults()); err != nil { - fmt.Fprintf(os.Stderr, "failed parsing defaults config: %v\n", err) + log.Println("Failed parsing defaults config:", err) os.Exit(1) } @@ -47,11 +56,11 @@ func main() { destConfig := defaultsConfig if err := cfg.YAML(sf)(&sourceConfig); err != nil { - fmt.Fprintf(os.Stderr, "failed parsing source config file %v: %v\n", *sf, err) + log.Printf("Failed parsing source config file %v: %v\n", *sf, err) os.Exit(1) } if err := cfg.YAML(df)(&destConfig); err != nil { - fmt.Fprintf(os.Stderr, "failed parsing dest config file %v: %v\n", *df, err) + log.Printf("Failed parsing dest config file %v: %v\n", *df, err) os.Exit(1) } @@ -61,17 +70,20 @@ func main() { limits, err := validation.NewOverrides(sourceConfig.LimitsConfig, nil) s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig, limits) if err != nil { - panic(err) + log.Println("Failed to create source store:", err) + os.Exit(1) } d, err := storage.NewStore(destConfig.StorageConfig, destConfig.ChunkStoreConfig, destConfig.SchemaConfig, limits) if err != nil { - panic(err) + log.Println("Failed to create destination store:", err) + os.Exit(1) } nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs") if err != nil { - panic(err) + log.Println("Failed to create label matcher:", err) + os.Exit(1) } matchers := []*labels.Matcher{nameLabelMatcher} @@ -79,7 +91,8 @@ func main() { if *match != "" { m, err := logql.ParseMatchers(*match) if err != nil { - panic(err) + log.Println("Failed to parse log matcher:", err) + os.Exit(1) } matchers = append(matchers, m...) } @@ -90,12 +103,13 @@ func main() { if err != nil { panic(err) } - - f, t := util.RoundToMilliseconds(mustParse(*from), mustParse(*to)) + parsedFrom := mustParse(*from) + parsedTo := mustParse(*to) + f, t := util.RoundToMilliseconds(parsedFrom, parsedTo) schemaGroups, fetchers, err := s.GetChunkRefs(ctx, userID, f, t, matchers...) if err != nil { - fmt.Println("Error querying index for chunk refs:", err) + log.Println("Error querying index for chunk refs:", err) os.Exit(1) } @@ -111,82 +125,194 @@ func main() { log.Fatalf("Error reading input: %v", err) } if strings.ToLower(strings.TrimSpace(in)) == "n" { - fmt.Println("Exiting") + log.Println("Exiting") os.Exit(0) } totalBytes := 0 start := time.Now() - for i, f := range fetchers { - fmt.Printf("Processing Schema %v which contains %v chunks\n", i, len(schemaGroups[i])) - - //Slice up into batches - for j := 0; j < len(schemaGroups[i]); j += *batch { - k := j + *batch - if k > len(schemaGroups[i]) { - k = len(schemaGroups[i]) - } - chunks := schemaGroups[i][j:k] - fmt.Printf("Processing chunks %v-%v of %v\n", j, k, len(schemaGroups[i])) + shardByNs := *shardBy + syncRanges := calcSyncRanges(parsedFrom.UnixNano(), parsedTo.UnixNano(), shardByNs.Nanoseconds()) + log.Printf("With a shard duration of %v, %v ranges have been calculated.\n", shardByNs, len(syncRanges)) + + cm := newChunkMover(ctx, s, d, *source, *dest, matchers, *batch) + syncChan := make(chan *syncRange) + errorChan := make(chan error) + + //Start the parallel processors + var wg sync.WaitGroup + cancelContext, cancelFunc := context.WithCancel(ctx) + for i := 0; i < *parallel; i++ { + wg.Add(1) + go func(threadId int) { + defer wg.Done() + cm.moveChunks(threadId, cancelContext, syncChan, errorChan) + }(i) + } - keys := make([]string, 0, len(chunks)) - chks := make([]chunk.Chunk, 0, len(chunks)) + // Launch a thread to dispatch requests: + go func() { + i := 0 + length := len(syncRanges) + for i < length { + log.Printf("Dispatching sync range %v of %v\n", i+1, length) + syncChan <- syncRanges[i] + i++ + } + //Everything processed, exit + cancelFunc() + }() + + // Wait for an error or the context to be canceled + select { + case <-cancelContext.Done(): + log.Println("Received done call, waiting on threads") + log.Printf("Finished transfering %v chunks totalling %v bytes in %v\n", totalChunks, totalBytes, time.Now().Sub(start)) + case err := <-errorChan: + log.Println("Received an error from processing thread, shutting down: ", err) + cancelFunc() + } + log.Println("Waiting for threads to exit") + wg.Wait() + log.Println("All threads finished") + log.Println("Going to sleep....") + for { + time.Sleep(100 * time.Second) + } - // For retry purposes, find the earliest "Through" and keep it, - // if restarting you would want to set the "From" to this value - // This might send some duplicate chunks but should ensure nothing is missed. - earliestThrough := t - for _, c := range chunks { - if c.Through < earliestThrough { - earliestThrough = c.Through - } - } +} - // FetchChunks requires chunks to be ordered by external key. - sort.Slice(chunks, func(l, m int) bool { return chunks[l].ExternalKey() < chunks[m].ExternalKey() }) - for _, chk := range chunks { - key := chk.ExternalKey() - keys = append(keys, key) - chks = append(chks, chk) - } - chks, err := f.FetchChunks(ctx, chks, keys) +func calcSyncRanges(from, to int64, shardBy int64) []*syncRange { + //Calculate the sync ranges + syncRanges := []*syncRange{} + //diff := to - from + //shards := diff / shardBy + currentFrom := from + //currentTo := from + currentTo := from + shardBy + for currentFrom < to && currentTo <= to { + s := &syncRange{ + from: currentFrom, + to: currentTo, + } + syncRanges = append(syncRanges, s) + + currentFrom = currentTo + 1 + currentTo = currentTo + shardBy + + if currentTo > to { + currentTo = to + } + } + return syncRanges +} + +type chunkMover struct { + ctx context.Context + source storage.Store + dest storage.Store + sourceUser string + destUser string + matchers []*labels.Matcher + batch int +} + +func newChunkMover(ctx context.Context, source, dest storage.Store, sourceUser, destUser string, matchers []*labels.Matcher, batch int) *chunkMover { + cm := &chunkMover{ + ctx: ctx, + source: source, + dest: dest, + sourceUser: sourceUser, + destUser: destUser, + matchers: matchers, + batch: batch, + } + return cm +} + +func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh <-chan *syncRange, errCh chan<- error) { + + for { + select { + case <-ctx.Done(): + log.Println(threadId, "Requested to be done, context cancelled, quitting.") + return + case sr := <-syncRangeCh: + log.Println(threadId, "Processing", time.Unix(0, sr.from).UTC(), time.Unix(0, sr.to).UTC()) + schemaGroups, fetchers, err := m.source.GetChunkRefs(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), m.matchers...) if err != nil { - log.Fatalf("Error fetching chunks: %v\n", err) + log.Println(threadId, "Error querying index for chunk refs:", err) + errCh <- err + return } + for i, f := range fetchers { + log.Printf("%v Processing Schema %v which contains %v chunks\n", threadId, i, len(schemaGroups[i])) + + //Slice up into batches + for j := 0; j < len(schemaGroups[i]); j += m.batch { + k := j + m.batch + if k > len(schemaGroups[i]) { + k = len(schemaGroups[i]) + } - output := make([]chunk.Chunk, 0, len(chks)) + chunks := schemaGroups[i][j:k] + log.Printf("%v Processing chunks %v-%v of %v\n", threadId, j, k, len(schemaGroups[i])) - // Calculate some size stats and change the tenant ID if necessary - for i, chk := range chks { - if enc, err := chk.Encoded(); err == nil { - totalBytes += len(enc) - } else { - log.Fatalf("Error encoding a chunk: %v\n", err) - } - if *source != *dest { - // Because the incoming chunks are already encoded, to change the username we have to make a new chunk - nc := chunk.NewChunk(*dest, chk.Fingerprint, chk.Metric, chk.Data, chk.From, chk.Through) - err := nc.Encode() + keys := make([]string, 0, len(chunks)) + chks := make([]chunk.Chunk, 0, len(chunks)) + + // FetchChunks requires chunks to be ordered by external key. + sort.Slice(chunks, func(l, m int) bool { return chunks[l].ExternalKey() < chunks[m].ExternalKey() }) + for _, chk := range chunks { + key := chk.ExternalKey() + keys = append(keys, key) + chks = append(chks, chk) + } + chks, err := f.FetchChunks(m.ctx, chks, keys) if err != nil { - log.Fatalf("Failed to encode new chunk with new user: %v\n", err) + log.Println(threadId, "Error fetching chunks:", err) + errCh <- err + return } - output = append(output, nc) - } else { - output = append(output, chks[i]) - } - } + output := make([]chunk.Chunk, 0, len(chks)) + + // Calculate some size stats and change the tenant ID if necessary + for i, chk := range chks { + if _, err := chk.Encoded(); err == nil { + //totalBytes += len(enc) + } else { + log.Println(threadId, "Error encoding a chunk:", err) + errCh <- err + return + } + if m.sourceUser != m.destUser { + // Because the incoming chunks are already encoded, to change the username we have to make a new chunk + nc := chunk.NewChunk(m.destUser, chk.Fingerprint, chk.Metric, chk.Data, chk.From, chk.Through) + err := nc.Encode() + if err != nil { + log.Println(threadId, "Failed to encode new chunk with new user:", err) + errCh <- err + return + } + output = append(output, nc) + } else { + output = append(output, chks[i]) + } - err = d.Put(ctx, output) - if err != nil { - log.Fatalf("Error sending chunks to new store: %v\n", err) + } + + err = m.dest.Put(m.ctx, output) + if err != nil { + log.Println(threadId, "Error sending chunks to new store:", err) + errCh <- err + return + } + log.Println(threadId, "Batch sent successfully") + } } - fmt.Printf("Batch sent successfully, if restarting after this batch you can safely start at time: %v\n", earliestThrough.Time()) } } - - fmt.Printf("Finished transfering %v chunks totalling %v bytes in %v\n", totalChunks, totalBytes, time.Now().Sub(start)) - } func mustParse(t string) time.Time { diff --git a/cmd/migrate/main_test.go b/cmd/migrate/main_test.go new file mode 100644 index 000000000000..f7ebecfe365b --- /dev/null +++ b/cmd/migrate/main_test.go @@ -0,0 +1,112 @@ +package main + +import ( + "github.com/stretchr/testify/assert" + "reflect" + "testing" +) + +func Test_calcSyncRanges(t *testing.T) { + type args struct { + from int64 + to int64 + shardBy int64 + } + tests := []struct { + name string + args args + want []*syncRange + }{ + { + name: "one range", + args: args{ + from: 0, + to: 10, + shardBy: 10, + }, + want: []*syncRange{ + &syncRange{ + from: 0, + to: 10, + }, + }, + }, + { + name: "two ranges", + args: args{ + from: 0, + to: 20, + shardBy: 10, + }, + want: []*syncRange{ + { + from: 0, + to: 10, + }, + { + from: 11, + to: 20, + }, + }, + }, + { + name: "three ranges", + args: args{ + from: 0, + to: 20, + shardBy: 6, + }, + want: []*syncRange{ + { + from: 0, + to: 6, + }, + { + from: 7, + to: 12, + }, + { + from: 13, + to: 18, + }, + { + from: 19, + to: 20, + }, + }, + }, + { + name: "four ranges actual data", + args: args{ + from: 1583798400000000000, + to: 1583884800000000000, + shardBy: 21600000000000, + }, + want: []*syncRange{ + { + from: 1583798400000000000, + to: 1583820000000000000, + }, + { + from: 1583820000000000001, + to: 1583841600000000000, + }, + { + from: 1583841600000000001, + to: 1583863200000000000, + }, + { + from: 1583863200000000001, + to: 1583884800000000000, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := calcSyncRanges(tt.args.from, tt.args.to, tt.args.shardBy); !reflect.DeepEqual(got, tt.want) { + assert.Equal(t, tt.want, got) + } + }) + } +} From af896082c0cfe4a56710979c05719785ac105f1e Mon Sep 17 00:00:00 2001 From: Ed Date: Sat, 11 Apr 2020 21:45:20 -0400 Subject: [PATCH 07/20] adding better stats --- cmd/migrate/main.go | 42 +++++++++++++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index f842ec97b4e5..1f10d092d298 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -128,7 +128,6 @@ func main() { log.Println("Exiting") os.Exit(0) } - totalBytes := 0 start := time.Now() shardByNs := *shardBy @@ -138,6 +137,7 @@ func main() { cm := newChunkMover(ctx, s, d, *source, *dest, matchers, *batch) syncChan := make(chan *syncRange) errorChan := make(chan error) + statsChan := make(chan stats) //Start the parallel processors var wg sync.WaitGroup @@ -146,7 +146,7 @@ func main() { wg.Add(1) go func(threadId int) { defer wg.Done() - cm.moveChunks(threadId, cancelContext, syncChan, errorChan) + cm.moveChunks(threadId, cancelContext, syncChan, errorChan, statsChan) }(i) } @@ -163,18 +163,32 @@ func main() { cancelFunc() }() + processedChunks := 0 + processedBytes := 0 + + // Launch a thread to track stats + go func() { + for stat := range statsChan { + processedChunks += stat.totalChunks + processedBytes += stat.totalBytes + } + log.Printf("Transfering %v chunks totalling %v bytes in %v\n", processedChunks, processedBytes, time.Now().Sub(start)) + log.Println("Exiting stats thread") + }() + // Wait for an error or the context to be canceled select { case <-cancelContext.Done(): - log.Println("Received done call, waiting on threads") - log.Printf("Finished transfering %v chunks totalling %v bytes in %v\n", totalChunks, totalBytes, time.Now().Sub(start)) + log.Println("Received done call") case err := <-errorChan: log.Println("Received an error from processing thread, shutting down: ", err) cancelFunc() } log.Println("Waiting for threads to exit") wg.Wait() + close(statsChan) log.Println("All threads finished") + log.Println("Going to sleep....") for { time.Sleep(100 * time.Second) @@ -207,6 +221,11 @@ func calcSyncRanges(from, to int64, shardBy int64) []*syncRange { return syncRanges } +type stats struct { + totalChunks int + totalBytes int +} + type chunkMover struct { ctx context.Context source storage.Store @@ -230,7 +249,7 @@ func newChunkMover(ctx context.Context, source, dest storage.Store, sourceUser, return cm } -func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh <-chan *syncRange, errCh chan<- error) { +func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh <-chan *syncRange, errCh chan<- error, statsCh chan<- stats) { for { select { @@ -238,6 +257,9 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < log.Println(threadId, "Requested to be done, context cancelled, quitting.") return case sr := <-syncRangeCh: + start := time.Now() + totalBytes := 0 + totalChunks := 0 log.Println(threadId, "Processing", time.Unix(0, sr.from).UTC(), time.Unix(0, sr.to).UTC()) schemaGroups, fetchers, err := m.source.GetChunkRefs(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), m.matchers...) if err != nil { @@ -274,13 +296,14 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < errCh <- err return } + totalChunks += len(chks) output := make([]chunk.Chunk, 0, len(chks)) // Calculate some size stats and change the tenant ID if necessary for i, chk := range chks { - if _, err := chk.Encoded(); err == nil { - //totalBytes += len(enc) + if enc, err := chk.Encoded(); err == nil { + totalBytes += len(enc) } else { log.Println(threadId, "Error encoding a chunk:", err) errCh <- err @@ -311,6 +334,11 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < log.Println(threadId, "Batch sent successfully") } } + log.Printf("%v Finished processing sync range, %v chunks, %v bytes in %v seconds\n", threadId, totalChunks, totalBytes, time.Now().Sub(start).Seconds()) + statsCh <- stats{ + totalChunks: totalChunks, + totalBytes: totalBytes, + } } } } From 24da86ae28fb163c3bef5a1285487cedb04c5d88 Mon Sep 17 00:00:00 2001 From: Ed Date: Sat, 11 Apr 2020 22:09:27 -0400 Subject: [PATCH 08/20] adding retry logic --- cmd/migrate/main.go | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index 1f10d092d298..ea5235369670 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -290,12 +290,21 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < keys = append(keys, key) chks = append(chks, chk) } - chks, err := f.FetchChunks(m.ctx, chks, keys) - if err != nil { - log.Println(threadId, "Error fetching chunks:", err) - errCh <- err - return + retry := 4 + for retry > 0 { + chks, err = f.FetchChunks(m.ctx, chks, keys) + if err != nil { + if retry == 0 { + log.Println(threadId, "Final error retrieving chunks, giving up:", err) + errCh <- err + return + } else { + log.Println(threadId, "Error fetching chunks, will retry:", err) + retry-- + } + } } + totalChunks += len(chks) output := make([]chunk.Chunk, 0, len(chks)) @@ -325,11 +334,19 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < } - err = m.dest.Put(m.ctx, output) - if err != nil { - log.Println(threadId, "Error sending chunks to new store:", err) - errCh <- err - return + retry = 4 + for retry > 0 { + err = m.dest.Put(m.ctx, output) + if err != nil { + if retry == 0 { + log.Println(threadId, "Final error sending chunks to new store, giving up:", err) + errCh <- err + return + } else { + log.Println(threadId, "Error sending chunks to new store, will retry:", err) + retry-- + } + } } log.Println(threadId, "Batch sent successfully") } From 39db8c968779207722b1763d06d99fe8310f249f Mon Sep 17 00:00:00 2001 From: Ed Date: Sat, 11 Apr 2020 22:13:05 -0400 Subject: [PATCH 09/20] tweaking retry logic --- cmd/migrate/main.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index ea5235369670..2cec60310de2 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -290,8 +290,7 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < keys = append(keys, key) chks = append(chks, chk) } - retry := 4 - for retry > 0 { + for retry := 4; retry >= 0; retry-- { chks, err = f.FetchChunks(m.ctx, chks, keys) if err != nil { if retry == 0 { @@ -302,6 +301,8 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < log.Println(threadId, "Error fetching chunks, will retry:", err) retry-- } + } else { + break } } @@ -333,9 +334,7 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < } } - - retry = 4 - for retry > 0 { + for retry := 4; retry >= 0; retry-- { err = m.dest.Put(m.ctx, output) if err != nil { if retry == 0 { @@ -346,6 +345,8 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < log.Println(threadId, "Error sending chunks to new store, will retry:", err) retry-- } + } else { + break } } log.Println(threadId, "Batch sent successfully") From 90c65d270792c9cf0199038d7012b3f6d9408df3 Mon Sep 17 00:00:00 2001 From: Ed Date: Sat, 11 Apr 2020 22:14:00 -0400 Subject: [PATCH 10/20] tweaking retry logic --- cmd/migrate/main.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index 2cec60310de2..abff53e022e6 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -299,7 +299,6 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < return } else { log.Println(threadId, "Error fetching chunks, will retry:", err) - retry-- } } else { break @@ -343,7 +342,6 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < return } else { log.Println(threadId, "Error sending chunks to new store, will retry:", err) - retry-- } } else { break From 2d5b590556c00868e17c97b2ee7a7a3da408a0c6 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Sun, 26 Apr 2020 19:21:03 -0400 Subject: [PATCH 11/20] work around changes in latest cortex Signed-off-by: Ed Welch --- cmd/migrate/main.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index abff53e022e6..96bb71eedbfb 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -5,6 +5,7 @@ import ( "context" "flag" "fmt" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "log" "os" @@ -14,6 +15,7 @@ import ( "time" "github.com/cortexproject/cortex/pkg/chunk" + gokit "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" @@ -68,12 +70,25 @@ func main() { sourceConfig.LimitsConfig.CardinalityLimit = 1e9 sourceConfig.LimitsConfig.MaxQueryLength = 0 limits, err := validation.NewOverrides(sourceConfig.LimitsConfig, nil) + lg := gokit.NewLogfmtLogger(gokit.NewSyncWriter(os.Stdout)) + err = sourceConfig.Validate(lg) + if err != nil { + log.Println("Failed to validate source store config:", err) + os.Exit(1) + } + err = destConfig.Validate(lg) + if err != nil { + log.Println("Failed to validate dest store config:", err) + os.Exit(1) + } + prometheus.DefaultRegisterer = prometheus.NewRegistry() s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig, limits) if err != nil { log.Println("Failed to create source store:", err) os.Exit(1) } + prometheus.DefaultRegisterer = prometheus.NewRegistry() d, err := storage.NewStore(destConfig.StorageConfig, destConfig.ChunkStoreConfig, destConfig.SchemaConfig, limits) if err != nil { log.Println("Failed to create destination store:", err) @@ -369,3 +384,10 @@ func mustParse(t string) time.Time { return ret } + +type logger struct{} + +func (l *logger) Log(keyvals ...interface{}) error { + log.Println(keyvals) + return nil +} From e01b635ec8fa407ebec714dbf89e11e001bfb20a Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 10 Jan 2021 18:10:26 -0500 Subject: [PATCH 12/20] fix up imports and changes since the last time this was merged added a readme fixed up the makefile added some comments --- .gitignore | 1 + cmd/migrate/Dockerfile | 2 +- cmd/migrate/README.md | 57 ++++++++++++++++++++++++++++++++++++++++++ cmd/migrate/main.go | 27 ++++++++++++++------ pkg/cfg/flag.go | 6 +++++ 5 files changed, 84 insertions(+), 9 deletions(-) create mode 100644 cmd/migrate/README.md diff --git a/.gitignore b/.gitignore index cc5171101664..00a9126ca355 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ cmd/docker-driver/docker-driver cmd/loki-canary/loki-canary cmd/fluent-bit/out_loki.so cmd/fluent-bit/out_loki.h +cmd/migrate/migrate /loki /promtail /logcli diff --git a/cmd/migrate/Dockerfile b/cmd/migrate/Dockerfile index 8c6e28aa6708..41d560e179b5 100644 --- a/cmd/migrate/Dockerfile +++ b/cmd/migrate/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.13 as build +FROM golang:1.15.3 as build COPY . /src/loki WORKDIR /src/loki RUN make clean && make BUILD_IN_CONTAINER=false migrate diff --git a/cmd/migrate/README.md b/cmd/migrate/README.md new file mode 100644 index 000000000000..0689296a5465 --- /dev/null +++ b/cmd/migrate/README.md @@ -0,0 +1,57 @@ +# Loki Migrate Tool + +**WARNING: THIS TOOL IS NOT WELL TESTED, ALWAYS MAKE BACKUPS AND TEST ON LESS IMPORTANT DATA FIRST!** + +This is sort of a bare minimum code hooked directly into the store interfaces within Loki.a + +Two stores are created, a source store and dest store. + +Chunks are queried from the source store and written to the dest store, new index entries are created in the dest store as well. + +This _should_ handle schema changes and different schemas on both the source and dest store, I say should because for the original use this was created this use case was not tested, but there is no reason it shouldn't work for this. + +You should be able to: + +* Migrate between clusters +* Change tenant ID during migration +* Migrate data between schemas + +All data is read and re-written (even when migrating within the same cluster), there are really no optimizations in this code for performance, there are much faster ways to move data depending on what you want to change. + +This is simple and because it uses the storage interfaces should be complete and should stay working, but it's not optimized to be fast. + +There is however some parallelism built in and there are a few flags to tune this, `migrate -help` for more info + +This does not remove any source data, it only reads existing source data and writes to the destination. + +## Usage + +Build with + +``` +make migrate +``` + +or + +``` +make migrate-image +``` + +The docker image currently runs and doesn't do anything, it's intended you exec into the container and run commands manually. + + +### Examples + +Migrate between clusters + +``` +migrate -source.config.file=/etc/loki-us-west1/config/config.yaml -dest.config.file=/etc/loki-us-central1/config/config.yaml -source.tenant=2289 -dest.tenant=2289 -from=2020-06-16T14:00:00-00:00 -to=2020-07-01T00:00:00-00:00 +``` + +Migrate tenant ID within a cluster + +``` +migrate -source.config.file=/etc/loki-us-west1/config/config.yaml -dest.config.file=/etc/loki-us-west1/config/config.yaml -source.tenant=fake -dest.tenant=1 -from=2020-06-16T14:00:00-00:00 -to=2020-07-01T00:00:00-00:00 +``` + diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index 96bb71eedbfb..132eec7a53d3 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -5,8 +5,6 @@ import ( "context" "flag" "fmt" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "log" "os" "sort" @@ -14,6 +12,11 @@ import ( "sync" "time" + cortex_storage "github.com/cortexproject/cortex/pkg/chunk/storage" + cortex_util "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/cortexproject/cortex/pkg/chunk" gokit "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/pkg/labels" @@ -44,24 +47,27 @@ func main() { dest := flag.String("dest.tenant", "fake", "Destination tenant identifier, default is `fake` for single tenant Loki") match := flag.String("match", "", "Optional label match") - batch := flag.Int("batchLen", 1000, "Specify how many chunks to read/write in one batch") + batch := flag.Int("batchLen", 500, "Specify how many chunks to read/write in one batch") shardBy := flag.Duration("shardBy", 6*time.Hour, "Break down the total interval into shards of this size, making this too small can lead to syncing a lot of duplicate chunks") - parallel := flag.Int("parallel", 4, "How many parallel threads to process each shard") + parallel := flag.Int("parallel", 8, "How many parallel threads to process each shard") flag.Parse() + // Create a set of defaults if err := cfg.Unmarshal(&defaultsConfig, cfg.Defaults()); err != nil { log.Println("Failed parsing defaults config:", err) os.Exit(1) } + // Copy each defaults to a source and dest config sourceConfig := defaultsConfig destConfig := defaultsConfig - if err := cfg.YAML(sf)(&sourceConfig); err != nil { + // Load each from provided files + if err := cfg.YAML(*sf, true)(&sourceConfig); err != nil { log.Printf("Failed parsing source config file %v: %v\n", *sf, err) os.Exit(1) } - if err := cfg.YAML(df)(&destConfig); err != nil { + if err := cfg.YAML(*df, true)(&destConfig); err != nil { log.Printf("Failed parsing dest config file %v: %v\n", *df, err) os.Exit(1) } @@ -81,15 +87,19 @@ func main() { log.Println("Failed to validate dest store config:", err) os.Exit(1) } + // Create a new registerer to avoid registering duplicate metrics prometheus.DefaultRegisterer = prometheus.NewRegistry() - s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig, limits) + sourceStore, err := cortex_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, cortex_util.Logger) + s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.SchemaConfig, sourceStore, prometheus.DefaultRegisterer) if err != nil { log.Println("Failed to create source store:", err) os.Exit(1) } + // Create a new registerer to avoid registering duplicate metrics prometheus.DefaultRegisterer = prometheus.NewRegistry() - d, err := storage.NewStore(destConfig.StorageConfig, destConfig.ChunkStoreConfig, destConfig.SchemaConfig, limits) + destStore, err := cortex_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, cortex_util.Logger) + d, err := storage.NewStore(destConfig.StorageConfig, destConfig.SchemaConfig, destStore, prometheus.DefaultRegisterer) if err != nil { log.Println("Failed to create destination store:", err) os.Exit(1) @@ -113,6 +123,7 @@ func main() { } ctx := context.Background() + // This is a little weird but it was the easiest way to guarantee the userID is in the right format ctx = user.InjectOrgID(ctx, *source) userID, err := user.ExtractOrgID(ctx) if err != nil { diff --git a/pkg/cfg/flag.go b/pkg/cfg/flag.go index cabf3c8ea230..1813941fe288 100644 --- a/pkg/cfg/flag.go +++ b/pkg/cfg/flag.go @@ -11,6 +11,12 @@ import ( "github.com/pkg/errors" ) +// Defaults registers flags to the command line using dst as the +// flagext.Registerer +func Defaults() Source { + return dDefaults(flag.CommandLine) +} + // dDefaults registers flags to the flagSet using dst as the flagext.Registerer func dDefaults(fs *flag.FlagSet) Source { return func(dst Cloneable) error { From 1bd65f6bb1b411939c0637ff1967ff73d48d80de Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 11 Jan 2021 09:04:39 -0500 Subject: [PATCH 13/20] fixing some of my garbage --- cmd/migrate/main.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index 132eec7a53d3..b9bbc64bf1cb 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -395,10 +395,3 @@ func mustParse(t string) time.Time { return ret } - -type logger struct{} - -func (l *logger) Log(keyvals ...interface{}) error { - log.Println(keyvals) - return nil -} From 70b5340c349080dd87bfc4a236b2becf0ee6acae Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Mon, 11 Jan 2021 09:38:38 -0500 Subject: [PATCH 14/20] fixing more of my garbage --- cmd/migrate/main.go | 46 ++++++++++++++++++++++++---------------- cmd/migrate/main_test.go | 5 +++-- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index b9bbc64bf1cb..9f8f3a06a4fa 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -76,6 +76,10 @@ func main() { sourceConfig.LimitsConfig.CardinalityLimit = 1e9 sourceConfig.LimitsConfig.MaxQueryLength = 0 limits, err := validation.NewOverrides(sourceConfig.LimitsConfig, nil) + if err != nil { + log.Println("Failed to create limit overrides:", err) + os.Exit(1) + } lg := gokit.NewLogfmtLogger(gokit.NewSyncWriter(os.Stdout)) err = sourceConfig.Validate(lg) if err != nil { @@ -90,6 +94,10 @@ func main() { // Create a new registerer to avoid registering duplicate metrics prometheus.DefaultRegisterer = prometheus.NewRegistry() sourceStore, err := cortex_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, cortex_util.Logger) + if err != nil { + log.Println("Failed to create source store:", err) + os.Exit(1) + } s, err := storage.NewStore(sourceConfig.StorageConfig, sourceConfig.SchemaConfig, sourceStore, prometheus.DefaultRegisterer) if err != nil { log.Println("Failed to create source store:", err) @@ -99,6 +107,10 @@ func main() { // Create a new registerer to avoid registering duplicate metrics prometheus.DefaultRegisterer = prometheus.NewRegistry() destStore, err := cortex_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, cortex_util.Logger) + if err != nil { + log.Println("Failed to create destination store:", err) + os.Exit(1) + } d, err := storage.NewStore(destConfig.StorageConfig, destConfig.SchemaConfig, destStore, prometheus.DefaultRegisterer) if err != nil { log.Println("Failed to create destination store:", err) @@ -172,7 +184,7 @@ func main() { wg.Add(1) go func(threadId int) { defer wg.Done() - cm.moveChunks(threadId, cancelContext, syncChan, errorChan, statsChan) + cm.moveChunks(cancelContext, threadId, syncChan, errorChan, statsChan) }(i) } @@ -198,7 +210,7 @@ func main() { processedChunks += stat.totalChunks processedBytes += stat.totalBytes } - log.Printf("Transfering %v chunks totalling %v bytes in %v\n", processedChunks, processedBytes, time.Now().Sub(start)) + log.Printf("Transferring %v chunks totalling %v bytes in %v\n", processedChunks, processedBytes, time.Since(start)) log.Println("Exiting stats thread") }() @@ -275,26 +287,26 @@ func newChunkMover(ctx context.Context, source, dest storage.Store, sourceUser, return cm } -func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh <-chan *syncRange, errCh chan<- error, statsCh chan<- stats) { +func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <-chan *syncRange, errCh chan<- error, statsCh chan<- stats) { for { select { case <-ctx.Done(): - log.Println(threadId, "Requested to be done, context cancelled, quitting.") + log.Println(threadID, "Requested to be done, context cancelled, quitting.") return case sr := <-syncRangeCh: start := time.Now() totalBytes := 0 totalChunks := 0 - log.Println(threadId, "Processing", time.Unix(0, sr.from).UTC(), time.Unix(0, sr.to).UTC()) + log.Println(threadID, "Processing", time.Unix(0, sr.from).UTC(), time.Unix(0, sr.to).UTC()) schemaGroups, fetchers, err := m.source.GetChunkRefs(m.ctx, m.sourceUser, model.TimeFromUnixNano(sr.from), model.TimeFromUnixNano(sr.to), m.matchers...) if err != nil { - log.Println(threadId, "Error querying index for chunk refs:", err) + log.Println(threadID, "Error querying index for chunk refs:", err) errCh <- err return } for i, f := range fetchers { - log.Printf("%v Processing Schema %v which contains %v chunks\n", threadId, i, len(schemaGroups[i])) + log.Printf("%v Processing Schema %v which contains %v chunks\n", threadID, i, len(schemaGroups[i])) //Slice up into batches for j := 0; j < len(schemaGroups[i]); j += m.batch { @@ -304,7 +316,7 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < } chunks := schemaGroups[i][j:k] - log.Printf("%v Processing chunks %v-%v of %v\n", threadId, j, k, len(schemaGroups[i])) + log.Printf("%v Processing chunks %v-%v of %v\n", threadID, j, k, len(schemaGroups[i])) keys := make([]string, 0, len(chunks)) chks := make([]chunk.Chunk, 0, len(chunks)) @@ -320,12 +332,11 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < chks, err = f.FetchChunks(m.ctx, chks, keys) if err != nil { if retry == 0 { - log.Println(threadId, "Final error retrieving chunks, giving up:", err) + log.Println(threadID, "Final error retrieving chunks, giving up:", err) errCh <- err return - } else { - log.Println(threadId, "Error fetching chunks, will retry:", err) } + log.Println(threadID, "Error fetching chunks, will retry:", err) } else { break } @@ -340,7 +351,7 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < if enc, err := chk.Encoded(); err == nil { totalBytes += len(enc) } else { - log.Println(threadId, "Error encoding a chunk:", err) + log.Println(threadID, "Error encoding a chunk:", err) errCh <- err return } @@ -349,7 +360,7 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < nc := chunk.NewChunk(m.destUser, chk.Fingerprint, chk.Metric, chk.Data, chk.From, chk.Through) err := nc.Encode() if err != nil { - log.Println(threadId, "Failed to encode new chunk with new user:", err) + log.Println(threadID, "Failed to encode new chunk with new user:", err) errCh <- err return } @@ -363,20 +374,19 @@ func (m *chunkMover) moveChunks(threadId int, ctx context.Context, syncRangeCh < err = m.dest.Put(m.ctx, output) if err != nil { if retry == 0 { - log.Println(threadId, "Final error sending chunks to new store, giving up:", err) + log.Println(threadID, "Final error sending chunks to new store, giving up:", err) errCh <- err return - } else { - log.Println(threadId, "Error sending chunks to new store, will retry:", err) } + log.Println(threadID, "Error sending chunks to new store, will retry:", err) } else { break } } - log.Println(threadId, "Batch sent successfully") + log.Println(threadID, "Batch sent successfully") } } - log.Printf("%v Finished processing sync range, %v chunks, %v bytes in %v seconds\n", threadId, totalChunks, totalBytes, time.Now().Sub(start).Seconds()) + log.Printf("%v Finished processing sync range, %v chunks, %v bytes in %v seconds\n", threadID, totalChunks, totalBytes, time.Since(start).Seconds()) statsCh <- stats{ totalChunks: totalChunks, totalBytes: totalBytes, diff --git a/cmd/migrate/main_test.go b/cmd/migrate/main_test.go index f7ebecfe365b..3133c22c8b77 100644 --- a/cmd/migrate/main_test.go +++ b/cmd/migrate/main_test.go @@ -1,9 +1,10 @@ package main import ( - "github.com/stretchr/testify/assert" "reflect" "testing" + + "github.com/stretchr/testify/assert" ) func Test_calcSyncRanges(t *testing.T) { @@ -25,7 +26,7 @@ func Test_calcSyncRanges(t *testing.T) { shardBy: 10, }, want: []*syncRange{ - &syncRange{ + { from: 0, to: 10, }, From 6913a18eeb38439010cb5fa37f5b626682ada670 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Wed, 20 Jan 2021 10:19:18 -0500 Subject: [PATCH 15/20] Update cmd/migrate/README.md Co-authored-by: Owen Diehl --- cmd/migrate/README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/migrate/README.md b/cmd/migrate/README.md index 0689296a5465..d187a581372c 100644 --- a/cmd/migrate/README.md +++ b/cmd/migrate/README.md @@ -2,7 +2,7 @@ **WARNING: THIS TOOL IS NOT WELL TESTED, ALWAYS MAKE BACKUPS AND TEST ON LESS IMPORTANT DATA FIRST!** -This is sort of a bare minimum code hooked directly into the store interfaces within Loki.a +This is sort of a bare minimum code hooked directly into the store interfaces within Loki. Two stores are created, a source store and dest store. @@ -54,4 +54,3 @@ Migrate tenant ID within a cluster ``` migrate -source.config.file=/etc/loki-us-west1/config/config.yaml -dest.config.file=/etc/loki-us-west1/config/config.yaml -source.tenant=fake -dest.tenant=1 -from=2020-06-16T14:00:00-00:00 -to=2020-07-01T00:00:00-00:00 ``` - From 3e00b36d4ca6c51efc8b8fee55a18772212cc8c3 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Wed, 20 Jan 2021 10:19:25 -0500 Subject: [PATCH 16/20] Update cmd/migrate/README.md Co-authored-by: Owen Diehl --- cmd/migrate/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/migrate/README.md b/cmd/migrate/README.md index d187a581372c..aa57e75a7884 100644 --- a/cmd/migrate/README.md +++ b/cmd/migrate/README.md @@ -4,7 +4,7 @@ This is sort of a bare minimum code hooked directly into the store interfaces within Loki. -Two stores are created, a source store and dest store. +Two stores are created, a source store and destination (abbreviated dest) store. Chunks are queried from the source store and written to the dest store, new index entries are created in the dest store as well. From 2420b266cc41a3bc5a4a8c39dd6b22fee5881972 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Wed, 20 Jan 2021 10:19:41 -0500 Subject: [PATCH 17/20] Update cmd/migrate/README.md Co-authored-by: Owen Diehl --- cmd/migrate/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/migrate/README.md b/cmd/migrate/README.md index aa57e75a7884..75c31f94878f 100644 --- a/cmd/migrate/README.md +++ b/cmd/migrate/README.md @@ -16,7 +16,7 @@ You should be able to: * Change tenant ID during migration * Migrate data between schemas -All data is read and re-written (even when migrating within the same cluster), there are really no optimizations in this code for performance, there are much faster ways to move data depending on what you want to change. +All data is read and re-written (even when migrating within the same cluster). There are really no optimizations in this code for performance and there are much faster ways to move data depending on what you want to change. This is simple and because it uses the storage interfaces should be complete and should stay working, but it's not optimized to be fast. From a7adba05dddaa65d937fc92d8ce1c8b5347196c9 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Wed, 20 Jan 2021 10:19:48 -0500 Subject: [PATCH 18/20] Update cmd/migrate/README.md Co-authored-by: Owen Diehl --- cmd/migrate/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/migrate/README.md b/cmd/migrate/README.md index 75c31f94878f..7cd79e18dbda 100644 --- a/cmd/migrate/README.md +++ b/cmd/migrate/README.md @@ -18,7 +18,7 @@ You should be able to: All data is read and re-written (even when migrating within the same cluster). There are really no optimizations in this code for performance and there are much faster ways to move data depending on what you want to change. -This is simple and because it uses the storage interfaces should be complete and should stay working, but it's not optimized to be fast. +This is simple and because it uses the storage interfaces, should be complete and should stay working, but it's not optimized to be fast. There is however some parallelism built in and there are a few flags to tune this, `migrate -help` for more info From 3e23467a00e702ba2c4c9e4879d7879c8b553c7c Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Wed, 20 Jan 2021 10:20:17 -0500 Subject: [PATCH 19/20] Update cmd/migrate/README.md Co-authored-by: Owen Diehl --- cmd/migrate/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/migrate/README.md b/cmd/migrate/README.md index 7cd79e18dbda..07601448d56d 100644 --- a/cmd/migrate/README.md +++ b/cmd/migrate/README.md @@ -8,7 +8,7 @@ Two stores are created, a source store and destination (abbreviated dest) store. Chunks are queried from the source store and written to the dest store, new index entries are created in the dest store as well. -This _should_ handle schema changes and different schemas on both the source and dest store, I say should because for the original use this was created this use case was not tested, but there is no reason it shouldn't work for this. +This _should_ handle schema changes and different schemas on both the source and dest store. You should be able to: From 09078f14779cc12f0bd48ce840ca18dfa4593254 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 28 Jan 2021 15:21:51 -0500 Subject: [PATCH 20/20] tabs not spaces --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 2f8fbe8169df..410f6a7bdb1f 100644 --- a/Makefile +++ b/Makefile @@ -284,7 +284,7 @@ clean: rm -rf dist/ rm -rf cmd/fluent-bit/out_grafana_loki.h rm -rf cmd/fluent-bit/out_grafana_loki.so - rm -rf cmd/migrate/migrate + rm -rf cmd/migrate/migrate go clean $(MOD_FLAG) ./... #########