Skip to content

Commit

Permalink
fix(docker-driver): Multiplex the client.batchSend
Browse files Browse the repository at this point in the history
The promtail `client` stop is not propagated properly to its `sendBatch` method,
which is called from the `run` method.

Though the `run` method multiplexes on the `c.quit` channel, its other branch can block
in `sendBatch`, which does not listen on the `c.quit` channel at all.
This fixes it by multiplexing on `c.quit` inside `sendBatch` as well.

Another option: expose `client.run` to the promtail client's callers and let them control it
via a context. But that requires API changes, such as accepting a `ctx` in the `client.Run` method.
  • Loading branch information
kavirajk committed Nov 9, 2020
1 parent fd451d9 commit 2416feb
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 40 deletions.
1 change: 1 addition & 0 deletions cmd/docker-driver/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ RUN make clean && make BUILD_IN_CONTAINER=false cmd/docker-driver/docker-driver

FROM alpine:3.11
RUN apk add --update --no-cache ca-certificates tzdata
RUN mkdir -p /run/docker/plugins
COPY --from=build /src/loki/cmd/docker-driver/docker-driver /bin/docker-driver
WORKDIR /bin/
ENTRYPOINT [ "/bin/docker-driver" ]
99 changes: 59 additions & 40 deletions pkg/promtail/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,54 +246,73 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
bufBytes := float64(len(buf))
encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)

ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

backoff := util.NewBackoff(ctx, c.cfg.BackoffConfig)
var status int
for backoff.Ongoing() {
start := time.Now()
status, err = c.send(ctx, tenantID, buf)
requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

if err == nil {
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
for _, s := range batch.streams {
lbls, err := parser.ParseMetric(s.Labels)
if err != nil {
// is this possible?
level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err)
return
}
var lblSet model.LabelSet
for i := range lbls {
if lbls[i].Name == LatencyLabel {
lblSet = model.LabelSet{
model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host),
model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value),

// instrument
defer func() {
if err != nil {
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
}
}()

var nextDelay time.Duration

// sendBatch can block till backoff.DelayMax. So need to multiplex with `c.quit` channel to propagate the client's stop.
for {
select {
case <-c.quit:
return
case <-time.After(nextDelay):

if !backoff.Ongoing() {
return
}

start := time.Now()
status, err = c.send(ctx, tenantID, buf)
requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds())

if err == nil {
sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
for _, s := range batch.streams {
lbls, err := parser.ParseMetric(s.Labels)
if err != nil {
// is this possible?
level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", err)
return
}
var lblSet model.LabelSet
for i := range lbls {
if lbls[i].Name == LatencyLabel {
lblSet = model.LabelSet{
model.LabelName(HostLabel): model.LabelValue(c.cfg.URL.Host),
model.LabelName(LatencyLabel): model.LabelValue(lbls[i].Value),
}
}
}
if lblSet != nil {
streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
}
}
if lblSet != nil {
streamLag.With(lblSet).Set(time.Since(s.Entries[len(s.Entries)-1].Timestamp).Seconds())
}
return
}
return
}

// Only retry 429s, 500s and connection-level errors.
if status > 0 && status != 429 && status/100 != 5 {
break
}

level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
backoff.Wait()
}
// Only retry 429s, 500s and connection-level errors.
if status > 0 && status != 429 && status/100 != 5 {
return
}

if err != nil {
level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes)
droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount))
level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
batchRetries.WithLabelValues(c.cfg.URL.Host).Inc()
nextDelay = backoff.NextDelay()
}
}
}

Expand Down

0 comments on commit 2416feb

Please sign in to comment.