Skip to content

Commit

Permalink
Asynchronous Promtail stages (#2996)
Browse files Browse the repository at this point in the history
* Introducing go pipelines to promtail.

Based off @jeschkies  idea.

WIP

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes all tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* More tests and code cleanup.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Wip breaking things.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixing tests, adding Stop to the interface.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes all tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes more test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Moar fixes for tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Close correctly client before reading.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Use defer.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add some comments.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes lint issues

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Attempt to fix journald without linux.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes journald json test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add missing stop in the filetarget.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update pkg/logentry/stages/match_test.go

Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>

* First set of feeback review fixes.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Use newEntry as suggested by Karsten.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>
  • Loading branch information
cyriltovena and jeschkies authored Dec 2, 2020
1 parent 05f3d6e commit c4faa57
Show file tree
Hide file tree
Showing 58 changed files with 1,046 additions and 968 deletions.
16 changes: 15 additions & 1 deletion cmd/docker-driver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/client"
)
Expand All @@ -21,6 +22,8 @@ type loki struct {
handler api.EntryHandler
labels model.LabelSet
logger log.Logger

stop func()
}

// New create a new Loki logger that forward logs to Loki instance
Expand All @@ -35,18 +38,21 @@ func New(logCtx logger.Info, logger log.Logger) (logger.Logger, error) {
return nil, err
}
var handler api.EntryHandler = c
var stop func() = func() {}
if len(cfg.pipeline.PipelineStages) != 0 {
pipeline, err := stages.NewPipeline(logger, cfg.pipeline.PipelineStages, &jobName, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
handler = pipeline.Wrap(c)
stop = handler.Stop
}
return &loki{
client: c,
labels: cfg.labels,
logger: logger,
handler: handler,
stop: stop,
}, nil
}

Expand All @@ -60,7 +66,14 @@ func (l *loki) Log(m *logger.Message) error {
if m.Source != "" {
lbs["source"] = model.LabelValue(m.Source)
}
return l.handler.Handle(lbs, m.Timestamp, string(m.Line))
l.handler.Chan() <- api.Entry{
Labels: lbs,
Entry: logproto.Entry{
Timestamp: m.Timestamp,
Line: string(m.Line),
},
}
return nil
}

// Log implements `logger.Logger`
Expand All @@ -70,6 +83,7 @@ func (l *loki) Name() string {

// Log implements `logger.Logger`
func (l *loki) Close() error {
l.stop()
l.client.StopNow()
return nil
}
58 changes: 42 additions & 16 deletions cmd/fluent-bit/dque.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/joncrlsn/dque"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/client"
)

Expand Down Expand Up @@ -39,10 +41,12 @@ func dqueEntryBuilder() interface{} {
}

type dqueClient struct {
logger log.Logger
queue *dque.DQue
loki client.Client
once sync.Once
logger log.Logger
queue *dque.DQue
loki client.Client
once sync.Once
wg sync.WaitGroup
entries chan api.Entry
}

// New makes a new dque loki client
Expand Down Expand Up @@ -72,11 +76,16 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) {
return nil, err
}

q.entries = make(chan api.Entry)

q.wg.Add(2)
go q.enqueuer()
go q.dequeuer()
return q, nil
}

func (c *dqueClient) dequeuer() {
defer c.wg.Done()
for {
// Dequeue the next item in the queue
entry, err := c.queue.DequeueBlock()
Expand All @@ -97,29 +106,46 @@ func (c *dqueClient) dequeuer() {
continue
}

if err := c.loki.Handle(record.Lbs, record.Ts, record.Line); err != nil {
level.Error(c.logger).Log("msg", "error sending record to Loki", "error", err)
c.loki.Chan() <- api.Entry{
Labels: record.Lbs,
Entry: logproto.Entry{
Timestamp: record.Ts,
Line: record.Line,
},
}
}
}

// Stop the client
func (c *dqueClient) Stop() {
c.once.Do(func() { c.queue.Close() })
c.loki.Stop()
c.once.Do(func() {
close(c.entries)
c.queue.Close()
c.loki.Stop()
c.wg.Wait()
})

}

func (c *dqueClient) Chan() chan<- api.Entry {
return c.entries
}

// Stop the client
func (c *dqueClient) StopNow() {
c.once.Do(func() { c.queue.Close() })
c.loki.StopNow()
c.once.Do(func() {
close(c.entries)
c.queue.Close()
c.loki.StopNow()
c.wg.Wait()
})
}

// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error {
if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil {
return fmt.Errorf("cannot enqueue record %s: %s", s, err)
func (c *dqueClient) enqueuer() {
defer c.wg.Done()
for e := range c.entries {
if err := c.queue.Enqueue(&dqueEntry{e.Labels, e.Timestamp, e.Line}); err != nil {
level.Warn(c.logger).Log("msg", fmt.Sprintf("cannot enqueue record %s:", e.Line), "err", err)
}
}

return nil
}
21 changes: 18 additions & 3 deletions cmd/fluent-bit/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/prometheus/common/model"
"github.com/weaveworks/common/logging"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/client"
)

Expand Down Expand Up @@ -63,15 +65,28 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error {
}
if l.cfg.dropSingleKey && len(records) == 1 {
for _, v := range records {
return l.client.Handle(lbs, ts, fmt.Sprintf("%v", v))
l.client.Chan() <- api.Entry{
Labels: lbs,
Entry: logproto.Entry{
Timestamp: ts,
Line: fmt.Sprintf("%v", v),
},
}
return nil
}
}
line, err := createLine(records, l.cfg.lineFormat)
if err != nil {
return fmt.Errorf("error creating line: %v", err)
}

return l.client.Handle(lbs, ts, line)
l.client.Chan() <- api.Entry{
Labels: lbs,
Entry: logproto.Entry{
Timestamp: ts,
Line: line,
},
}
return nil
}

// prevent base64-encoding []byte values (default json.Encoder rule) by
Expand Down
58 changes: 19 additions & 39 deletions cmd/fluent-bit/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,11 @@ import (

jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
)

type entry struct {
lbs model.LabelSet
line string
ts time.Time
}

type recorder struct {
*entry
}

func (r *recorder) Handle(labels model.LabelSet, time time.Time, e string) error {
r.entry = &entry{
labels,
e,
time,
}
return nil
}

func (r *recorder) toEntry() *entry { return r.entry }

func (r *recorder) Stop() {}

func (r *recorder) StopNow() {}
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/client/fake"
)

var now = time.Now()

Expand Down Expand Up @@ -78,24 +57,24 @@ func Test_loki_sendRecord(t *testing.T) {
name string
cfg *config
record map[interface{}]interface{}
want *entry
want []api.Entry
wantErr bool
}{
{"map to JSON", &config{labelKeys: []string{"A"}, lineFormat: jsonFormat}, mapRecordFixture, &entry{model.LabelSet{"A": "A"}, `{"B":"B","C":"C","D":"D","E":"E","F":"F","G":"G","H":"H"}`, now}, false},
{"map to kvPairFormat", &config{labelKeys: []string{"A"}, lineFormat: kvPairFormat}, mapRecordFixture, &entry{model.LabelSet{"A": "A"}, `B=B C=C D=D E=E F=F G=G H=H`, now}, false},
{"not enough records", &config{labelKeys: []string{"foo"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, nil, false},
{"labels", &config{labelKeys: []string{"bar", "fake"}, lineFormat: jsonFormat, removeKeys: []string{"fuzz", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"bar": "500"}, `{"foo":"bar"}`, now}, false},
{"remove key", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `{"bar":500}`, now}, false},
{"error", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, simpleRecordFixture, nil, true},
{"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `bar=500`, now}, false},
{"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `500`, now}, false},
{"labelmap", &config{labelMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}, false},
{"byte array", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, byteArrayRecordFixture, &entry{model.LabelSet{"label": "label"}, `{"map":{"inner":"bar"},"outer":"foo"}`, now}, false},
{"mixed types", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, mixedTypesRecordFixture, &entry{model.LabelSet{"label": "label"}, `{"array":[42,42.42,"foo"],"float":42.42,"int":42,"map":{"nested":{"foo":"bar","invalid":"a\ufffdz"}}}`, now}, false},
{"map to JSON", &config{labelKeys: []string{"A"}, lineFormat: jsonFormat}, mapRecordFixture, []api.Entry{{Labels: model.LabelSet{"A": "A"}, Entry: logproto.Entry{Line: `{"B":"B","C":"C","D":"D","E":"E","F":"F","G":"G","H":"H"}`, Timestamp: now}}}, false},
{"map to kvPairFormat", &config{labelKeys: []string{"A"}, lineFormat: kvPairFormat}, mapRecordFixture, []api.Entry{{Labels: model.LabelSet{"A": "A"}, Entry: logproto.Entry{Line: `B=B C=C D=D E=E F=F G=G H=H`, Timestamp: now}}}, false},
{"not enough records", &config{labelKeys: []string{"foo"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, []api.Entry{}, false},
{"labels", &config{labelKeys: []string{"bar", "fake"}, lineFormat: jsonFormat, removeKeys: []string{"fuzz", "error"}}, simpleRecordFixture, []api.Entry{{Labels: model.LabelSet{"bar": "500"}, Entry: logproto.Entry{Line: `{"foo":"bar"}`, Timestamp: now}}}, false},
{"remove key", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, []api.Entry{{Labels: model.LabelSet{}, Entry: logproto.Entry{Line: `{"bar":500}`, Timestamp: now}}}, false},
{"error", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, simpleRecordFixture, []api.Entry{}, true},
{"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, []api.Entry{{Labels: model.LabelSet{}, Entry: logproto.Entry{Line: `bar=500`, Timestamp: now}}}, false},
{"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, []api.Entry{{Labels: model.LabelSet{}, Entry: logproto.Entry{Line: `500`, Timestamp: now}}}, false},
{"labelmap", &config{labelMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, []api.Entry{{Labels: model.LabelSet{"other": "500"}, Entry: logproto.Entry{Line: `{"foo":"bar"}`, Timestamp: now}}}, false},
{"byte array", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, byteArrayRecordFixture, []api.Entry{{Labels: model.LabelSet{"label": "label"}, Entry: logproto.Entry{Line: `{"map":{"inner":"bar"},"outer":"foo"}`, Timestamp: now}}}, false},
{"mixed types", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, mixedTypesRecordFixture, []api.Entry{{Labels: model.LabelSet{"label": "label"}, Entry: logproto.Entry{Line: `{"array":[42,42.42,"foo"],"float":42.42,"int":42,"map":{"nested":{"foo":"bar","invalid":"a\ufffdz"}}}`, Timestamp: now}}}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rec := &recorder{}
rec := fake.New(func() {})
l := &loki{
cfg: tt.cfg,
client: rec,
Expand All @@ -106,7 +85,8 @@ func Test_loki_sendRecord(t *testing.T) {
t.Errorf("sendRecord() error = %v, wantErr %v", err, tt.wantErr)
return
}
got := rec.toEntry()
rec.Stop()
got := rec.Received()
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("sendRecord() want:%v got:%v", tt.want, got)
}
Expand Down
Loading

0 comments on commit c4faa57

Please sign in to comment.