From 2416feb22cd2d0c207c916750982b5f83ab12bab Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Sun, 8 Nov 2020 18:05:52 +0100 Subject: [PATCH] fix(docker-driver): Multiplex the client.batchSend promtail `client` stop is not propagated properly to its `batchSend` method in the `run` method. Though the `run` method multiplexes on the `c.quit` channel, the other branch can block in `batchSend` without ever listening on the `c.quit` channel in the `run` method. This fixes it by multiplexing on `c.quit` inside `batchSend` as well. Other suggestion: expose `client.run` to the promtail client and let it be controlled via context. But that requires API changes like accepting `ctx` in the `client.Run` method. --- cmd/docker-driver/Dockerfile | 1 + pkg/promtail/client/client.go | 99 +++++++++++++++++++++-------------- 2 files changed, 60 insertions(+), 40 deletions(-) diff --git a/cmd/docker-driver/Dockerfile b/cmd/docker-driver/Dockerfile index f00ca8f35619..3e9e66e06fbe 100644 --- a/cmd/docker-driver/Dockerfile +++ b/cmd/docker-driver/Dockerfile @@ -11,6 +11,7 @@ RUN make clean && make BUILD_IN_CONTAINER=false cmd/docker-driver/docker-driver FROM alpine:3.11 RUN apk add --update --no-cache ca-certificates tzdata +RUN mkdir -p /run/docker/plugins COPY --from=build /src/loki/cmd/docker-driver/docker-driver /bin/docker-driver WORKDIR /bin/ ENTRYPOINT [ "/bin/docker-driver" ] diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go index 543268b68fb2..4dafb32c6435 100644 --- a/pkg/promtail/client/client.go +++ b/pkg/promtail/client/client.go @@ -246,54 +246,73 @@ func (c *client) sendBatch(tenantID string, batch *batch) { bufBytes := float64(len(buf)) encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig) var status int - for backoff.Ongoing() { - start := time.Now() - status, err = c.send(ctx, tenantID, buf) - 
requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) - - if err == nil { - sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) - sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) - for _, s := range batch.streams { - lbls, err := parser.ParseMetric(s.Labels) - if err != nil { - // is this possible? - level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err) - return - } - var lblSet model.LabelSet - for i := range lbls { - if lbls[i].Name == LatencyLabel { - lblSet = model.LabelSet{ - model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host), - model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value), + + // instrument + defer func() { + if err != nil { + level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err) + droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) + droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) + } + }() + + var nextDelay time.Duration + + // sendBatch can block till backoff.DelayMax. So need to multiplex with `c.quit` channel to propagate the client's stop. + for { + select { + case <-c.quit: + return + case <-time.After(nextDelay): + + if !backoff.Ongoing() { + return + } + + start := time.Now() + status, err = c.send(ctx, tenantID, buf) + requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) + + if err == nil { + sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) + sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) + for _, s := range batch.streams { + lbls, err := parser.ParseMetric(s.Labels) + if err != nil { + // is this possible? 
+ level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err) + return + } + var lblSet model.LabelSet + for i := range lbls { + if lbls[i].Name == LatencyLabel { + lblSet = model.LabelSet{ + model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host), + model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value), + } } } + if lblSet != nil { + streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) + } } - if lblSet != nil { - streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds()) - } + return } - return - } - // Only retry 429s, 500s and connection-level errors. - if status > 0 && status != 429 && status/100 != 5 { - break - } - - level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err) - batchRetries.WithLabelValues(c.cfg.URL.Host).Inc() - backoff.Wait() - } + // Only retry 429s, 500s and connection-level errors. + if status > 0 && status != 429 && status/100 != 5 { + return + } - if err != nil { - level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err) - droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) - droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) + level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err) + batchRetries.WithLabelValues(c.cfg.URL.Host).Inc() + nextDelay = backoff.NextDelay() + } } }