Skip to content

Commit

Permalink
Add /ingester/shutdown_and_forget handler
Browse files Browse the repository at this point in the history
This handler can be used to gracefully shut down a Loki instance and
delete the file that persists the tokens of the ingester ring.

In production environments you usually want to persist ring tokens so
that during a restart of an ingester instance, or during rollout, the
tokens from that instance are not re-distributed to other instances, but
instead kept so that the same streams end up on the same instance once
it is up and running again. For that, the tokens are written to a file
that can be specified via the `-ingester.tokens-file-path` argument.

In certain cases, however, you want to forget the tokens and
re-distribute them when shutting down an ingester instance. This was
already possible by calling `/ingester/flush_shutdown`, deleting the
tokens file and terminating the instance. The new handler
`/ingester/shutdown_and_forget` combines these manual steps into a
single handler.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed May 17, 2022
1 parent 3e04537 commit 5211bb5
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 2 deletions.
28 changes: 28 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
Expand Down Expand Up @@ -173,6 +174,7 @@ type Interface interface {
CheckReady(ctx context.Context) error
FlushHandler(w http.ResponseWriter, _ *http.Request)
ShutdownHandler(w http.ResponseWriter, r *http.Request)
ShutdownAndForgetHandler(w http.ResponseWriter, r *http.Request)
GetOrCreateInstance(instanceID string) *instance
}

Expand Down Expand Up @@ -506,6 +508,12 @@ func (i *Ingester) stopping(_ error) error {
}
i.flushQueuesDone.Wait()

// In case the flag to clear tokens on shutdown is set we need to mark the
// ingester service as "failed", so Loki will shut down entirely.
// The module manager logs the failure `modules.ErrStopProcess` in a special way.
if i.lifecycler.ClearTokensOnShutdown() && errs.Err() == nil {
return modules.ErrStopProcess
}
return errs.Err()
}

Expand Down Expand Up @@ -538,6 +546,26 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// ShutdownAndForgetHandler triggers the following operations:
// * Change the state of ring to stop accepting writes.
// * Flush all the chunks.
// * Unregister from KV store
// * Terminate process
func (i *Ingester) ShutdownAndForgetHandler(w http.ResponseWriter, r *http.Request) {
i.lifecycler.SetFlushOnShutdown(true)
i.lifecycler.SetUnregisterOnShutdown(true)
i.lifecycler.SetClearTokensOnShutdown(true)
err := services.StopAndAwaitTerminated(context.Background(), i)
// Stopping the module will return the modules.ErrStopProcess error. This is
// needed so the Loki process is shut down completely.
if err == nil || err == modules.ErrStopProcess {
w.WriteHeader(http.StatusNoContent)
} else {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
}
}

// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
instanceID, err := tenant.TenantID(ctx)
Expand Down
11 changes: 9 additions & 2 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,15 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
)
t.Server.HTTP.Path("/flush").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)))
t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)))
t.Server.HTTP.Methods("GET", "POST").Path("/flush").Handler(
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)),
)
t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)),
)
t.Server.HTTP.Methods("POST").Path("/ingester/shutdown_and_forget").Handler(
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownAndForgetHandler)),
)

return t.Ingester, nil
}
Expand Down
76 changes: 76 additions & 0 deletions pkg/storage/stores/series/series_index_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package series

import (
"context"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/validation"
)

type mockStore struct {
index.Client
queries []index.Query
results index.ReadBatchResult
}

func (m *mockStore) QueryPages(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error {
for _, query := range queries {
callback(query, m.results)
}
m.queries = append(m.queries, queries...)
return nil
}

func defaultLimits() (*validation.Overrides, error) {
var defaults validation.Limits
flagext.DefaultValues(&defaults)
defaults.CardinalityLimit = 5
return validation.NewOverrides(defaults, nil)
}

func BenchmarkIndexStore(b *testing.B) {
store := &mockStore{
results: index.ReadBatch{
Entries: []index.CacheEntry{{
Column: []byte("foo"),
Value: []byte("bar"),
}},
},
}
limits, err := defaultLimits()
require.NoError(b, err)
logger := log.NewNopLogger()
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 10, TTL: 10 * time.Second}, nil, logger)
client := index.NewCachingIndexClient(store, cache, 100*time.Millisecond, limits, logger, false)

schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
{From: config.DayTime{Time: model.Now().Add(-24 * time.Hour)}, Schema: "v12", RowShards: 16},
},
}
schema, err := index.CreateSchema(schemaCfg.Configs[0])
require.NoError(b, err)
idxStore := NewIndexStore(schemaCfg, schema, client, nil, 10)

userID := "fake"
from := model.Now().Add(-1 * time.Hour)
through := model.Now()
ctx := user.InjectOrgID(context.Background(), userID)

b.ResetTimer()
b.ReportAllocs()
for x := 0; x < b.N; x++ {
res, err := idxStore.LabelValuesForMetricName(ctx, userID, from, through, "logs", "foo")
b.Log(res, err)
}
}

0 comments on commit 5211bb5

Please sign in to comment.