-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: Add ingester handler for shutdown and forget tokens #6179
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -11,6 +11,7 @@ import ( | |||
"time" | ||||
|
||||
"github.com/go-kit/log/level" | ||||
"github.com/grafana/dskit/modules" | ||||
"github.com/grafana/dskit/ring" | ||||
"github.com/grafana/dskit/services" | ||||
"github.com/grafana/dskit/tenant" | ||||
|
@@ -172,8 +173,10 @@ type Interface interface { | |||
logproto.QuerierServer | ||||
CheckReady(ctx context.Context) error | ||||
FlushHandler(w http.ResponseWriter, _ *http.Request) | ||||
ShutdownHandler(w http.ResponseWriter, r *http.Request) | ||||
GetOrCreateInstance(instanceID string) (*instance, error) | ||||
// deprecated | ||||
LegacyShutdownHandler(w http.ResponseWriter, r *http.Request) | ||||
ShutdownHandler(w http.ResponseWriter, r *http.Request) | ||||
} | ||||
|
||||
// Ingester builds chunks for incoming log streams. | ||||
|
@@ -209,6 +212,10 @@ type Ingester struct { | |||
// Denotes whether the ingester should flush on shutdown. | ||||
// Currently only used by the WAL to signal when the disk is full. | ||||
flushOnShutdownSwitch *OnceSwitch | ||||
// Flag for whether stopping the ingester service should also terminate the | ||||
// loki process. | ||||
// This is set when calling the shutdown handler. | ||||
terminateOnShutdown bool | ||||
|
||||
// Only used by WAL & flusher to coordinate backpressure during replay. | ||||
replayController *replayController | ||||
|
@@ -245,6 +252,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid | |||
tailersQuit: make(chan struct{}), | ||||
metrics: metrics, | ||||
flushOnShutdownSwitch: &OnceSwitch{}, | ||||
terminateOnShutdown: false, | ||||
} | ||||
i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i}) | ||||
|
||||
|
@@ -506,6 +514,12 @@ func (i *Ingester) stopping(_ error) error { | |||
} | ||||
i.flushQueuesDone.Wait() | ||||
|
||||
// In case the flag to terminate on shutdown is set we need to mark the | ||||
// ingester service as "failed", so Loki will shut down entirely. | ||||
// The module manager logs the failure `modules.ErrStopProcess` in a special way. | ||||
if i.terminateOnShutdown && errs.Err() == nil { | ||||
return modules.ErrStopProcess | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can't we just rely on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, because we need to be able to distinguish between stopping of the ingester service that happened through the signal handler, and stopping of the service that was triggered by the shutdown handler. In case of the former, the error will be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In both cases, we are returning There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see what you mean, but the returned error Line 385 in 3b3fcf6
|
||||
} | ||||
return errs.Err() | ||||
} | ||||
|
||||
|
@@ -526,10 +540,16 @@ func (i *Ingester) loop() { | |||
} | ||||
} | ||||
|
||||
// ShutdownHandler triggers the following set of operations in order: | ||||
// LegacyShutdownHandler triggers the following set of operations in order: | ||||
// * Change the state of ring to stop accepting writes. | ||||
// * Flush all the chunks. | ||||
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { | ||||
// Note: This handler does not trigger a termination of the Loki process, | ||||
// despite its name. Instead, the ingester service is stopped, so an external | ||||
// source can trigger a safe termination through a signal to the process. | ||||
// The handler is deprecated and usage is discouraged. Use ShutdownHandler | ||||
// instead. | ||||
func (i *Ingester) LegacyShutdownHandler(w http.ResponseWriter, r *http.Request) { | ||||
level.Warn(util_log.Logger).Log("msg", "The handler /ingester/flush_shutdown is deprecated and usage is discouraged. Please use /ingester/shutdown?flush=true instead.") | ||||
originalState := i.lifecycler.FlushOnShutdown() | ||||
// We want to flush the chunks if transfer fails irrespective of original flag. | ||||
i.lifecycler.SetFlushOnShutdown(true) | ||||
|
@@ -538,6 +558,45 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { | |||
w.WriteHeader(http.StatusNoContent) | ||||
} | ||||
|
||||
// ShutdownHandler handles a graceful shutdown of the ingester service and | ||||
// termination of the Loki process. | ||||
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { | ||||
// Don't allow calling the shutdown handler multiple times | ||||
if i.State() != services.Running { | ||||
w.WriteHeader(http.StatusServiceUnavailable) | ||||
_, _ = w.Write([]byte("Ingester is stopping or already stopped.")) | ||||
dannykopping marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
return | ||||
} | ||||
params := r.URL.Query() | ||||
doFlush := util.FlagFromValues(params, "flush", true) | ||||
doDeleteRingTokens := util.FlagFromValues(params, "delete_ring_tokens", false) | ||||
doTerminate := util.FlagFromValues(params, "terminate", true) | ||||
err := i.handleShutdown(doTerminate, doFlush, doDeleteRingTokens) | ||||
|
||||
// Stopping the module will return the modules.ErrStopProcess error. This is | ||||
// needed so the Loki process is shut down completely. | ||||
if err == nil || err == modules.ErrStopProcess { | ||||
w.WriteHeader(http.StatusNoContent) | ||||
} else { | ||||
w.WriteHeader(http.StatusInternalServerError) | ||||
_, _ = w.Write([]byte(err.Error())) | ||||
} | ||||
} | ||||
|
||||
// handleShutdown triggers the following operations: | ||||
// * Change the state of ring to stop accepting writes. | ||||
// * optional: Flush all the chunks. | ||||
// * optional: Delete ring tokens file | ||||
// * Unregister from KV store | ||||
// * optional: Terminate process (handled by service manager in loki.go) | ||||
func (i *Ingester) handleShutdown(terminate, flush, del bool) error { | ||||
i.lifecycler.SetFlushOnShutdown(flush) | ||||
i.lifecycler.SetClearTokensOnShutdown(del) | ||||
i.lifecycler.SetUnregisterOnShutdown(true) | ||||
i.terminateOnShutdown = terminate | ||||
return services.StopAndAwaitTerminated(context.Background(), i) | ||||
} | ||||
|
||||
// Push implements logproto.Pusher. | ||||
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { | ||||
instanceID, err := tenant.TenantID(ctx) | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is a POST, curious why we opt for url params intead of a request body?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Query params are I guess easier to write when you execute the request using
curl
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I think we do not follow any strict, restful HTTP API design guides in our endpoint design.