Skip to content

Commit

Permalink
Retry pulling Cloudflare logs after a stream error (#5158)
Browse files Browse the repository at this point in the history
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Jan 17, 2022
1 parent f884f2b commit 144bee1
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 16 deletions.
41 changes: 26 additions & 15 deletions clients/pkg/promtail/targets/cloudflare/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,26 +156,37 @@ func (t *Target) pull(ctx context.Context, start, end time.Time) error {
backoff.Wait()
continue
}
defer it.Close()
for it.Next() {
if err := func() error {
defer it.Close()
var lineRead int64
for it.Next() {
line := it.Line()
ts, err := jsonparser.GetInt(line, "EdgeStartTimestamp")
if err != nil {
ts = time.Now().UnixNano()
}
t.handler.Chan() <- api.Entry{
Labels: t.config.Labels.Clone(),
Entry: logproto.Entry{
Timestamp: time.Unix(0, ts),
Line: string(line),
},
}
lineRead++
t.metrics.Entries.Inc()
}
if it.Err() != nil {
level.Warn(t.logger).Log("msg", "failed iterating over logs", "err", it.Err(), "start", start, "end", end, "retries", backoff.NumRetries(), "lineRead", lineRead)
return it.Err()
}
line := it.Line()
ts, err := jsonparser.GetInt(line, "EdgeStartTimestamp")
if err != nil {
ts = time.Now().UnixNano()
}
t.handler.Chan() <- api.Entry{
Labels: t.config.Labels.Clone(),
Entry: logproto.Entry{
Timestamp: time.Unix(0, ts),
Line: string(line),
},
}
t.metrics.Entries.Inc()
return nil
}(); err != nil {
errs.Add(err)
backoff.Wait()
continue
}
return nil

}
return errs.Err()
}
Expand Down
47 changes: 47 additions & 0 deletions clients/pkg/promtail/targets/cloudflare/target_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cloudflare

import (
"context"
"errors"
"os"
"sort"
Expand Down Expand Up @@ -99,6 +100,52 @@ func Test_CloudflareTarget(t *testing.T) {
require.Greater(t, newPos, end.UnixNano())
}

func Test_RetryErrorIterating(t *testing.T) {
var (
w = log.NewSyncWriter(os.Stderr)
logger = log.NewLogfmtLogger(w)
end = time.Unix(0, time.Hour.Nanoseconds())
start = time.Unix(0, end.Add(-30*time.Minute).UnixNano())
client = fake.New(func() {})
cfClient = newFakeCloudflareClient()
)
cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{
logs: []string{
`{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`,
`error`,
},
}, nil).Once()
// setup response for the first pull batch of 1 minutes.
cfClient.On("LogpullReceived", mock.Anything, start, end).Return(&fakeLogIterator{
logs: []string{
`{"EdgeStartTimestamp":1, "EdgeRequestHost":"foo.com"}`,
`{"EdgeStartTimestamp":2, "EdgeRequestHost":"foo.com"}`,
`{"EdgeStartTimestamp":3, "EdgeRequestHost":"foo.com"}`,
},
}, nil).Once()
// replace the client.
getClient = func(apiKey, zoneID string, fields []string) (Client, error) {
return cfClient, nil
}
// retries as fast as possible.
defaultBackoff.MinBackoff = 0
defaultBackoff.MaxBackoff = 0
ta := &Target{
logger: logger,
handler: client,
client: cfClient,
config: &scrapeconfig.CloudflareConfig{
Labels: make(model.LabelSet),
},
metrics: NewMetrics(prometheus.DefaultRegisterer),
}

require.NoError(t, ta.pull(context.Background(), start, end))
require.Eventually(t, func() bool {
return len(client.Received()) == 4
}, 5*time.Second, 100*time.Millisecond)
}

func Test_CloudflareTargetError(t *testing.T) {
var (
w = log.NewSyncWriter(os.Stderr)
Expand Down
9 changes: 8 additions & 1 deletion clients/pkg/promtail/targets/cloudflare/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cloudflare

import (
"context"
"errors"
"time"

"github.com/cloudflare/cloudflare-go"
Expand All @@ -25,17 +26,23 @@ func (f *fakeCloudflareClient) CallCount() int {
type fakeLogIterator struct {
logs []string
current string

err error
}

func (f *fakeLogIterator) Next() bool {
if len(f.logs) == 0 {
return false
}
f.current = f.logs[0]
if f.current == `error` {
f.err = errors.New("error")
return false
}
f.logs = f.logs[1:]
return true
}
func (f *fakeLogIterator) Err() error { return nil }
func (f *fakeLogIterator) Err() error { return f.err }
func (f *fakeLogIterator) Line() []byte { return []byte(f.current) }
func (f *fakeLogIterator) Fields() (map[string]string, error) { return nil, nil }
func (f *fakeLogIterator) Close() error { return nil }
Expand Down

0 comments on commit 144bee1

Please sign in to comment.