From 144bee11af8262be94c5a04bd94f3b2b9bc22178 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 17 Jan 2022 10:38:56 +0100 Subject: [PATCH] Retry pulling Cloudflare logs after a stream error (#5158) Signed-off-by: Cyril Tovena --- .../pkg/promtail/targets/cloudflare/target.go | 41 ++++++++++------ .../targets/cloudflare/target_test.go | 47 +++++++++++++++++++ .../promtail/targets/cloudflare/util_test.go | 9 +++- 3 files changed, 81 insertions(+), 16 deletions(-) diff --git a/clients/pkg/promtail/targets/cloudflare/target.go b/clients/pkg/promtail/targets/cloudflare/target.go index baba1343d4f0..806eefabfdc1 100644 --- a/clients/pkg/promtail/targets/cloudflare/target.go +++ b/clients/pkg/promtail/targets/cloudflare/target.go @@ -156,26 +156,37 @@ func (t *Target) pull(ctx context.Context, start, end time.Time) error { backoff.Wait() continue } - defer it.Close() - for it.Next() { + if err := func() error { + defer it.Close() + var lineRead int64 + for it.Next() { + line := it.Line() + ts, err := jsonparser.GetInt(line, "EdgeStartTimestamp") + if err != nil { + ts = time.Now().UnixNano() + } + t.handler.Chan() <- api.Entry{ + Labels: t.config.Labels.Clone(), + Entry: logproto.Entry{ + Timestamp: time.Unix(0, ts), + Line: string(line), + }, + } + lineRead++ + t.metrics.Entries.Inc() + } if it.Err() != nil { + level.Warn(t.logger).Log("msg", "failed iterating over logs", "err", it.Err(), "start", start, "end", end, "retries", backoff.NumRetries(), "lineRead", lineRead) return it.Err() } - line := it.Line() - ts, err := jsonparser.GetInt(line, "EdgeStartTimestamp") - if err != nil { - ts = time.Now().UnixNano() - } - t.handler.Chan() <- api.Entry{ - Labels: t.config.Labels.Clone(), - Entry: logproto.Entry{ - Timestamp: time.Unix(0, ts), - Line: string(line), - }, - } - t.metrics.Entries.Inc() + return nil + }(); err != nil { + errs.Add(err) + backoff.Wait() + continue } return nil + } return errs.Err() } diff --git 
a/clients/pkg/promtail/targets/cloudflare/target_test.go b/clients/pkg/promtail/targets/cloudflare/target_test.go index f2c870e23730..eec6e5197df9 100644 --- a/clients/pkg/promtail/targets/cloudflare/target_test.go +++ b/clients/pkg/promtail/targets/cloudflare/target_test.go @@ -1,6 +1,7 @@ package cloudflare import ( + "context" "errors" "os" "sort" @@ -99,6 +100,52 @@ func Test_CloudflareTarget(t *testing.T) { require.Greater(t, newPos, end.UnixNano()) } +func Test_RetryErrorIterating(t *testing.T) { + var ( + w = log.NewSyncWriter(os.Stderr) + logger = log.NewLogfmtLogger(w) + end = time.Unix(0, time.Hour.Nanoseconds()) + start = time.Unix(0, end.Add(-30*time.Minute).UnixNano()) + client = fake.New(func() {}) + cfClient = newFakeCloudflareClient() + ) + cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{ + logs: []string{ + `{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`, + `error`, + }, + }, nil).Once() + // setup the response for the retried pull of the same window, served after the first iterator errors. + cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{ + logs: []string{ + `{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`, + `{"EdgeStartTimestamp":2, "EdgeRequestHost":"foo.com"}`, + `{"EdgeStartTimestamp":3, "EdgeRequestHost":"foo.com"}`, + }, + }, nil).Once() + // replace the client. + getClient = func(apiKey, zoneID string, fields []string) (Client, error) { + return cfClient, nil + } + // retries as fast as possible. 
+ defaultBackoff.MinBackoff = 0 + defaultBackoff.MaxBackoff = 0 + ta := &Target{ + logger: logger, + handler: client, + client: cfClient, + config: &scrapeconfig.CloudflareConfig{ + Labels: make(model.LabelSet), + }, + metrics: NewMetrics(prometheus.DefaultRegisterer), + } + + require.NoError(t, ta.pull(context.Background(), start, end)) + require.Eventually(t, func() bool { + return len(client.Received()) == 4 + }, 5*time.Second, 100*time.Millisecond) +} + func Test_CloudflareTargetError(t *testing.T) { var ( w = log.NewSyncWriter(os.Stderr) diff --git a/clients/pkg/promtail/targets/cloudflare/util_test.go b/clients/pkg/promtail/targets/cloudflare/util_test.go index 4d9077e12fa7..14d241c51375 100644 --- a/clients/pkg/promtail/targets/cloudflare/util_test.go +++ b/clients/pkg/promtail/targets/cloudflare/util_test.go @@ -2,6 +2,7 @@ package cloudflare import ( "context" + "errors" "time" "github.com/cloudflare/cloudflare-go" @@ -25,6 +26,8 @@ func (f *fakeCloudflareClient) CallCount() int { type fakeLogIterator struct { logs []string current string + + err error } func (f *fakeLogIterator) Next() bool { @@ -32,10 +35,14 @@ func (f *fakeLogIterator) Next() bool { return false } f.current = f.logs[0] + if f.current == `error` { + f.err = errors.New("error") + return false + } f.logs = f.logs[1:] return true } -func (f *fakeLogIterator) Err() error { return nil } +func (f *fakeLogIterator) Err() error { return f.err } func (f *fakeLogIterator) Line() []byte { return []byte(f.current) } func (f *fakeLogIterator) Fields() (map[string]string, error) { return nil, nil } func (f *fakeLogIterator) Close() error { return nil }