diff --git a/cmd/docker-driver/loki.go b/cmd/docker-driver/loki.go index 6496bf380e3f..63a736bdcb93 100644 --- a/cmd/docker-driver/loki.go +++ b/cmd/docker-driver/loki.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/logentry/stages" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/client" ) @@ -21,6 +22,8 @@ type loki struct { handler api.EntryHandler labels model.LabelSet logger log.Logger + + stop func() } // New create a new Loki logger that forward logs to Loki instance @@ -35,18 +38,21 @@ func New(logCtx logger.Info, logger log.Logger) (logger.Logger, error) { return nil, err } var handler api.EntryHandler = c + var stop func() = func() {} if len(cfg.pipeline.PipelineStages) != 0 { pipeline, err := stages.NewPipeline(logger, cfg.pipeline.PipelineStages, &jobName, prometheus.DefaultRegisterer) if err != nil { return nil, err } handler = pipeline.Wrap(c) + stop = handler.Stop } return &loki{ client: c, labels: cfg.labels, logger: logger, handler: handler, + stop: stop, }, nil } @@ -60,7 +66,14 @@ func (l *loki) Log(m *logger.Message) error { if m.Source != "" { lbs["source"] = model.LabelValue(m.Source) } - return l.handler.Handle(lbs, m.Timestamp, string(m.Line)) + l.handler.Chan() <- api.Entry{ + Labels: lbs, + Entry: logproto.Entry{ + Timestamp: m.Timestamp, + Line: string(m.Line), + }, + } + return nil } // Log implements `logger.Logger` @@ -70,6 +83,7 @@ func (l *loki) Name() string { // Log implements `logger.Logger` func (l *loki) Close() error { + l.stop() l.client.StopNow() return nil } diff --git a/cmd/fluent-bit/dque.go b/cmd/fluent-bit/dque.go index 8636eb2ef4e5..6515b11307f9 100644 --- a/cmd/fluent-bit/dque.go +++ b/cmd/fluent-bit/dque.go @@ -11,6 +11,8 @@ import ( "github.com/joncrlsn/dque" "github.com/prometheus/common/model" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/client" ) @@ -39,10 +41,12 @@ func dqueEntryBuilder() interface{} { } type dqueClient struct { - logger log.Logger - queue *dque.DQue - loki client.Client - once sync.Once + logger log.Logger + queue *dque.DQue + loki client.Client + once sync.Once + wg sync.WaitGroup + entries chan api.Entry } // New makes a new dque loki client @@ -72,11 +76,16 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) { return nil, err } + q.entries = make(chan api.Entry) + + q.wg.Add(2) + go q.enqueuer() go q.dequeuer() return q, nil } func (c *dqueClient) dequeuer() { + defer c.wg.Done() for { // Dequeue the next item in the queue entry, err := c.queue.DequeueBlock() @@ -97,29 +106,46 @@ func (c *dqueClient) dequeuer() { continue } - if err := c.loki.Handle(record.Lbs, record.Ts, record.Line); err != nil { - level.Error(c.logger).Log("msg", "error sending record to Loki", "error", err) + c.loki.Chan() <- api.Entry{ + Labels: record.Lbs, + Entry: logproto.Entry{ + Timestamp: record.Ts, + Line: record.Line, + }, } } } // Stop the client func (c *dqueClient) Stop() { - c.once.Do(func() { c.queue.Close() }) - c.loki.Stop() + c.once.Do(func() { + close(c.entries) + c.queue.Close() + c.loki.Stop() + c.wg.Wait() + }) + +} + +func (c *dqueClient) Chan() chan<- api.Entry { + return c.entries } // Stop the client func (c *dqueClient) StopNow() { - c.once.Do(func() { c.queue.Close() }) - c.loki.StopNow() + c.once.Do(func() { + close(c.entries) + c.queue.Close() + c.loki.StopNow() + c.wg.Wait() + }) } -// Handle implement 
EntryHandler; adds a new line to the next batch; send is async. -func (c *dqueClient) Handle(ls model.LabelSet, t time.Time, s string) error { - if err := c.queue.Enqueue(&dqueEntry{ls, t, s}); err != nil { - return fmt.Errorf("cannot enqueue record %s: %s", s, err) +func (c *dqueClient) enqueuer() { + defer c.wg.Done() + for e := range c.entries { + if err := c.queue.Enqueue(&dqueEntry{e.Labels, e.Timestamp, e.Line}); err != nil { + level.Warn(c.logger).Log("msg", fmt.Sprintf("cannot enqueue record %s:", e.Line), "err", err) + } } - - return nil } diff --git a/cmd/fluent-bit/loki.go b/cmd/fluent-bit/loki.go index e9e036060d28..4b0d62102325 100644 --- a/cmd/fluent-bit/loki.go +++ b/cmd/fluent-bit/loki.go @@ -16,6 +16,8 @@ import ( "github.com/prometheus/common/model" "github.com/weaveworks/common/logging" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/client" ) @@ -63,15 +65,28 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error { } if l.cfg.dropSingleKey && len(records) == 1 { for _, v := range records { - return l.client.Handle(lbs, ts, fmt.Sprintf("%v", v)) + l.client.Chan() <- api.Entry{ + Labels: lbs, + Entry: logproto.Entry{ + Timestamp: ts, + Line: fmt.Sprintf("%v", v), + }, + } + return nil } } line, err := createLine(records, l.cfg.lineFormat) if err != nil { return fmt.Errorf("error creating line: %v", err) } - - return l.client.Handle(lbs, ts, line) + l.client.Chan() <- api.Entry{ + Labels: lbs, + Entry: logproto.Entry{ + Timestamp: ts, + Line: line, + }, + } + return nil } // prevent base64-encoding []byte values (default json.Encoder rule) by diff --git a/cmd/fluent-bit/loki_test.go b/cmd/fluent-bit/loki_test.go index 3c3f437a75b1..a2754669e849 100644 --- a/cmd/fluent-bit/loki_test.go +++ b/cmd/fluent-bit/loki_test.go @@ -8,32 +8,11 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/prometheus/common/model" -) - -type entry struct { - lbs model.LabelSet - line string - ts time.Time -} -type recorder struct { - *entry -} - -func (r *recorder) Handle(labels model.LabelSet, time time.Time, e string) error { - r.entry = &entry{ - labels, - e, - time, - } - return nil -} - -func (r *recorder) toEntry() *entry { return r.entry } - -func (r *recorder) Stop() {} - -func (r *recorder) StopNow() {} + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" + "github.com/grafana/loki/pkg/promtail/client/fake" +) var now = time.Now() @@ -78,24 +57,24 @@ func Test_loki_sendRecord(t *testing.T) { name string cfg *config record map[interface{}]interface{} - want *entry + want []api.Entry wantErr bool }{ - {"map to JSON", &config{labelKeys: []string{"A"}, lineFormat: jsonFormat}, mapRecordFixture, &entry{model.LabelSet{"A": "A"}, `{"B":"B","C":"C","D":"D","E":"E","F":"F","G":"G","H":"H"}`, now}, false}, - {"map to kvPairFormat", &config{labelKeys: []string{"A"}, lineFormat: kvPairFormat}, mapRecordFixture, &entry{model.LabelSet{"A": "A"}, `B=B C=C D=D E=E F=F G=G H=H`, now}, false}, - {"not enough records", &config{labelKeys: []string{"foo"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, nil, false}, - {"labels", &config{labelKeys: []string{"bar", "fake"}, lineFormat: jsonFormat, removeKeys: []string{"fuzz", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"bar": "500"}, `{"foo":"bar"}`, now}, false}, - {"remove key", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo", 
"error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `{"bar":500}`, now}, false}, - {"error", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, simpleRecordFixture, nil, true}, - {"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `bar=500`, now}, false}, - {"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, &entry{model.LabelSet{}, `500`, now}, false}, - {"labelmap", &config{labelMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, &entry{model.LabelSet{"other": "500"}, `{"foo":"bar"}`, now}, false}, - {"byte array", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, byteArrayRecordFixture, &entry{model.LabelSet{"label": "label"}, `{"map":{"inner":"bar"},"outer":"foo"}`, now}, false}, - {"mixed types", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, mixedTypesRecordFixture, &entry{model.LabelSet{"label": "label"}, `{"array":[42,42.42,"foo"],"float":42.42,"int":42,"map":{"nested":{"foo":"bar","invalid":"a\ufffdz"}}}`, now}, false}, + {"map to JSON", &config{labelKeys: []string{"A"}, lineFormat: jsonFormat}, mapRecordFixture, []api.Entry{{Labels: model.LabelSet{"A": "A"}, Entry: logproto.Entry{Line: `{"B":"B","C":"C","D":"D","E":"E","F":"F","G":"G","H":"H"}`, Timestamp: now}}}, false}, + {"map to kvPairFormat", &config{labelKeys: []string{"A"}, lineFormat: kvPairFormat}, mapRecordFixture, []api.Entry{{Labels: model.LabelSet{"A": "A"}, Entry: logproto.Entry{Line: `B=B C=C D=D E=E F=F G=G H=H`, Timestamp: now}}}, false}, + {"not enough records", &config{labelKeys: []string{"foo"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, []api.Entry{}, false}, + {"labels", &config{labelKeys: []string{"bar", "fake"}, lineFormat: jsonFormat, removeKeys: []string{"fuzz", "error"}}, simpleRecordFixture, []api.Entry{{Labels: model.LabelSet{"bar": "500"}, Entry: logproto.Entry{Line: `{"foo":"bar"}`, Timestamp: now}}}, false}, + {"remove key", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, []api.Entry{{Labels: model.LabelSet{}, Entry: logproto.Entry{Line: `{"bar":500}`, Timestamp: now}}}, false}, + {"error", &config{labelKeys: []string{"fake"}, lineFormat: jsonFormat, removeKeys: []string{"foo"}}, simpleRecordFixture, []api.Entry{}, true}, + {"key value", &config{labelKeys: []string{"fake"}, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, []api.Entry{{Labels: model.LabelSet{}, Entry: logproto.Entry{Line: `bar=500`, Timestamp: now}}}, false}, + {"single", &config{labelKeys: []string{"fake"}, dropSingleKey: true, lineFormat: kvPairFormat, removeKeys: []string{"foo", "error", "fake"}}, simpleRecordFixture, []api.Entry{{Labels: model.LabelSet{}, Entry: logproto.Entry{Line: `500`, Timestamp: now}}}, false}, + {"labelmap", &config{labelMap: map[string]interface{}{"bar": "other"}, lineFormat: jsonFormat, removeKeys: []string{"bar", "error"}}, simpleRecordFixture, []api.Entry{{Labels: model.LabelSet{"other": "500"}, Entry: logproto.Entry{Line: `{"foo":"bar"}`, Timestamp: now}}}, false}, + {"byte array", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, 
byteArrayRecordFixture, []api.Entry{{Labels: model.LabelSet{"label": "label"}, Entry: logproto.Entry{Line: `{"map":{"inner":"bar"},"outer":"foo"}`, Timestamp: now}}}, false}, + {"mixed types", &config{labelKeys: []string{"label"}, lineFormat: jsonFormat}, mixedTypesRecordFixture, []api.Entry{{Labels: model.LabelSet{"label": "label"}, Entry: logproto.Entry{Line: `{"array":[42,42.42,"foo"],"float":42.42,"int":42,"map":{"nested":{"foo":"bar","invalid":"a\ufffdz"}}}`, Timestamp: now}}}, false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - rec := &recorder{} + rec := fake.New(func() {}) l := &loki{ cfg: tt.cfg, client: rec, @@ -106,7 +85,8 @@ func Test_loki_sendRecord(t *testing.T) { t.Errorf("sendRecord() error = %v, wantErr %v", err, tt.wantErr) return } - got := rec.toEntry() + rec.Stop() + got := rec.Received() if !reflect.DeepEqual(got, tt.want) { t.Errorf("sendRecord() want:%v got:%v", tt.want, got) } diff --git a/pkg/logentry/stages/drop.go b/pkg/logentry/stages/drop.go index ee67dc5a9988..368d641f6fe0 100644 --- a/pkg/logentry/stages/drop.go +++ b/pkg/logentry/stages/drop.go @@ -10,7 +10,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" - "github.com/prometheus/common/model" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/util/flagext" ) @@ -76,7 +76,7 @@ func validateDropConfig(cfg *DropConfig) error { } // newDropStage creates a DropStage from config -func newDropStage(logger log.Logger, config interface{}) (Stage, error) { +func newDropStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) { cfg := &DropConfig{} err := mapstructure.WeakDecode(config, cfg) if err != nil { @@ -88,55 +88,71 @@ func newDropStage(logger log.Logger, config interface{}) (Stage, error) { } return &dropStage{ - logger: log.With(logger, "component", "stage", "type", "drop"), - cfg: cfg, + logger: log.With(logger, "component", "stage", "type", "drop"), + cfg: cfg, + dropCount: getDropCountMetric(registerer), }, nil } // dropStage applies Label matchers to determine if the include stages should be run type dropStage struct { - logger log.Logger - cfg *DropConfig + logger log.Logger + cfg *DropConfig + dropCount *prometheus.CounterVec } -// Process implements Stage -func (m *dropStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (m *dropStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for e := range in { + if !m.shouldDrop(e) { + out <- e + continue + } + m.dropCount.WithLabelValues(*m.cfg.DropReason) + } + }() + return out +} + +func (m *dropStage) shouldDrop(e Entry) bool { // There are many options for dropping a log and if multiple are defined it's treated like an AND condition // where all drop conditions must be met to drop the log. // Therefore if at any point there is a condition which does not match we can return. 
// The order is what I roughly think would be fastest check to slowest check to try to quit early whenever possible if m.cfg.LongerThan != nil { - if len([]byte(*entry)) > m.cfg.longerThan.Val() { + if len(e.Line) > m.cfg.longerThan.Val() { // Too long, drop if Debug { - level.Debug(m.logger).Log("msg", fmt.Sprintf("line met drop criteria for length %v > %v", len([]byte(*entry)), m.cfg.longerThan.Val())) + level.Debug(m.logger).Log("msg", fmt.Sprintf("line met drop criteria for length %v > %v", len(e.Line), m.cfg.longerThan.Val())) } } else { if Debug { - level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, it did not meet criteria for drop length %v is not greater than %v", len([]byte(*entry)), m.cfg.longerThan.Val())) + level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, it did not meet criteria for drop length %v is not greater than %v", len(e.Line), m.cfg.longerThan.Val())) } - return + return false } } if m.cfg.OlderThan != nil { ct := time.Now() - if t.Before(ct.Add(-m.cfg.olderThan)) { + if e.Timestamp.Before(ct.Add(-m.cfg.olderThan)) { // Too old, drop if Debug { - level.Debug(m.logger).Log("msg", fmt.Sprintf("line met drop criteria for age; current time=%v, drop before=%v, log timestamp=%v", ct, ct.Add(-m.cfg.olderThan), t)) + level.Debug(m.logger).Log("msg", fmt.Sprintf("line met drop criteria for age; current time=%v, drop before=%v, log timestamp=%v", ct, ct.Add(-m.cfg.olderThan), e.Timestamp)) } } else { if Debug { - level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, it did not meet drop criteria for age; current time=%v, drop before=%v, log timestamp=%v", ct, ct.Add(-m.cfg.olderThan), t)) + level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, it did not meet drop criteria for age; current time=%v, drop before=%v, log timestamp=%v", ct, ct.Add(-m.cfg.olderThan), e.Timestamp)) } - return + return false } } if m.cfg.Source != nil && m.cfg.Expression == nil { - if v, ok := extracted[*m.cfg.Source]; ok { + if v, ok := e.Extracted[*m.cfg.Source]; ok { if m.cfg.Value == nil { // Found in map, no value set meaning drop if found in map if Debug { @@ -153,7 +169,7 @@ func (m *dropStage) Process(labels model.LabelSet, extracted map[string]interfac if Debug { level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, source key was found in extracted map but value '%v' did not match desired value '%v'", v, *m.cfg.Value)) } - return + return false } } } else { @@ -161,19 +177,19 @@ func (m *dropStage) Process(labels model.LabelSet, extracted map[string]interfac if Debug { level.Debug(m.logger).Log("msg", "line will not be dropped, the provided source was not found in the extracted map") } - return + return false } } if m.cfg.Expression != nil { if m.cfg.Source != nil { - if v, ok := extracted[*m.cfg.Source]; ok { + if v, ok := e.Extracted[*m.cfg.Source]; ok { s, err := getString(v) if err != nil { if Debug { level.Debug(m.logger).Log("msg", "Failed to convert extracted map value to string, cannot test regex line will not be dropped.", "err", err, "type", reflect.TypeOf(v)) } - return + return false } match := m.cfg.regex.FindStringSubmatch(s) if match == nil { @@ -181,7 +197,7 @@ func (m *dropStage) Process(labels model.LabelSet, extracted map[string]interfac if Debug { level.Debug(m.logger).Log("msg", fmt.Sprintf("line will not be dropped, the provided regular expression did not match the value found in the extracted map for source key: %v", *m.cfg.Source)) } - return + return false } // regex 
match, will be dropped if Debug { @@ -193,28 +209,19 @@ func (m *dropStage) Process(labels model.LabelSet, extracted map[string]interfac if Debug { level.Debug(m.logger).Log("msg", "line will not be dropped, the provided source was not found in the extracted map") } - return + return false } } else { - if entry != nil { - match := m.cfg.regex.FindStringSubmatch(*entry) - if match == nil { - // Not a match to the regex, don't drop - if Debug { - level.Debug(m.logger).Log("msg", "line will not be dropped, the provided regular expression did not match the log line") - } - return - } + match := m.cfg.regex.FindStringSubmatch(e.Line) + if match == nil { + // Not a match to the regex, don't drop if Debug { - level.Debug(m.logger).Log("msg", "line met drop criteria, the provided regular expression matched the log line") + level.Debug(m.logger).Log("msg", "line will not be dropped, the provided regular expression did not match the log line") } - - } else { - // Not a match to entry was nil, do not drop - if Debug { - level.Debug(m.logger).Log("msg", "line will not be dropped, because it was nil and we can't regex match to nil") - } - return + return false + } + if Debug { + level.Debug(m.logger).Log("msg", "line met drop criteria, the provided regular expression matched the log line") } } } @@ -223,8 +230,7 @@ func (m *dropStage) Process(labels model.LabelSet, extracted map[string]interfac if Debug { level.Debug(m.logger).Log("msg", "all criteria met, line will be dropped") } - // Adds the drop label to not be sent by the api.EntryHandler - labels[dropLabel] = model.LabelValue(*m.cfg.DropReason) + return true } // Name implements Stage diff --git a/pkg/logentry/stages/drop_test.go b/pkg/logentry/stages/drop_test.go index 2958a6a18868..b82066272013 100644 --- a/pkg/logentry/stages/drop_test.go +++ b/pkg/logentry/stages/drop_test.go @@ -47,8 +47,8 @@ func Test_dropStage_Process(t *testing.T) { config *DropConfig labels model.LabelSet extracted map[string]interface{} - t *time.Time - entry *string + t time.Time + entry string shouldDrop bool }{ { @@ -58,8 +58,7 @@ func Test_dropStage_Process(t *testing.T) { }, labels: model.LabelSet{}, extracted: map[string]interface{}{}, - t: nil, - entry: ptrFromString("12345678901"), + entry: "12345678901", shouldDrop: true, }, { @@ -69,8 +68,7 @@ func Test_dropStage_Process(t *testing.T) { }, labels: model.LabelSet{}, extracted: map[string]interface{}{}, - t: nil, - entry: ptrFromString("1234567890"), + entry: "1234567890", shouldDrop: false, }, { @@ -80,8 +78,7 @@ func Test_dropStage_Process(t *testing.T) { }, labels: model.LabelSet{}, extracted: map[string]interface{}{}, - t: nil, - entry: ptrFromString("123456789"), + entry: "123456789", shouldDrop: false, }, { @@ -91,8 +88,7 @@ func Test_dropStage_Process(t *testing.T) { }, labels: model.LabelSet{}, extracted: map[string]interface{}{}, - t: ptrFromTime(time.Now().Add(-2 * time.Hour)), - entry: nil, + t: time.Now().Add(-2 * time.Hour), shouldDrop: true, }, { @@ -102,8 +98,7 @@ func Test_dropStage_Process(t *testing.T) { }, labels: model.LabelSet{}, extracted: map[string]interface{}{}, - t: ptrFromTime(time.Now().Add(-5 * time.Minute)), - entry: nil, + t: time.Now().Add(-5 * time.Minute), shouldDrop: false, }, { @@ -194,7 +189,7 @@ func Test_dropStage_Process(t *testing.T) { Expression: ptrFromString(".*val.*"), }, labels: model.LabelSet{}, - entry: ptrFromString("this is a line which does not match the regex"), + entry: "this is a line which does not match the regex", extracted: map[string]interface{}{}, 
shouldDrop: false, }, @@ -204,7 +199,7 @@ func Test_dropStage_Process(t *testing.T) { Expression: ptrFromString(".*val.*"), }, labels: model.LabelSet{}, - entry: ptrFromString("this is a line with the word value in it"), + entry: "this is a line with the word value in it", extracted: map[string]interface{}{}, shouldDrop: true, }, @@ -218,8 +213,7 @@ func Test_dropStage_Process(t *testing.T) { extracted: map[string]interface{}{ "key": "pal1", }, - t: nil, - entry: ptrFromString("12345678901"), + entry: "12345678901", shouldDrop: true, }, { @@ -232,8 +226,7 @@ func Test_dropStage_Process(t *testing.T) { extracted: map[string]interface{}{ "key": "pal1", }, - t: nil, - entry: ptrFromString("123456789"), + entry: "123456789", shouldDrop: false, }, { @@ -246,8 +239,7 @@ func Test_dropStage_Process(t *testing.T) { extracted: map[string]interface{}{ "WOOOOOOOOOOOOOO": "pal1", }, - t: nil, - entry: ptrFromString("123456789012"), + entry: "123456789012", shouldDrop: false, }, { @@ -262,8 +254,8 @@ func Test_dropStage_Process(t *testing.T) { extracted: map[string]interface{}{ "key": "must contain value to match", }, - t: ptrFromTime(time.Now().Add(-2 * time.Hour)), - entry: ptrFromString("12345678901"), + t: time.Now().Add(-2 * time.Hour), + entry: "12345678901", shouldDrop: true, }, } @@ -273,15 +265,13 @@ func Test_dropStage_Process(t *testing.T) { if err != nil { t.Error(err) } - m := &dropStage{ - cfg: tt.config, - logger: util.Logger, - } - m.Process(tt.labels, tt.extracted, tt.t, tt.entry) + m, err := newDropStage(util.Logger, tt.config, prometheus.DefaultRegisterer) + require.NoError(t, err) + out := processEntries(m, newEntry(tt.extracted, tt.labels, tt.entry, tt.t)) if tt.shouldDrop { - assert.Contains(t, tt.labels.String(), dropLabel) + assert.Len(t, out, 0) } else { - assert.NotContains(t, tt.labels.String(), dropLabel) + assert.Len(t, out, 1) } }) } @@ -291,31 +281,20 @@ func ptrFromString(str string) *string { return &str } -func ptrFromTime(t time.Time) *time.Time { - return &t -} - // TestDropPipeline is used to verify we properly parse the yaml config and create a working pipeline func TestDropPipeline(t *testing.T) { registry := prometheus.NewRegistry() plName := "test_pipeline" pl, err := NewPipeline(util.Logger, loadConfig(testDropYaml), &plName, registry) require.NoError(t, err) - lbls := model.LabelSet{} - ts := time.Now() - - // Process the first log line which should be dropped - entry := testMatchLogLineApp1 - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Contains(t, lbls.String(), dropLabel) + out := processEntries(pl, + newEntry(nil, nil, testMatchLogLineApp1, time.Now()), + newEntry(nil, nil, testMatchLogLineApp2, time.Now()), + ) - // Process the second line which should not be dropped. - entry = testMatchLogLineApp2 - extracted = map[string]interface{}{} - lbls = model.LabelSet{} - pl.Process(lbls, extracted, &ts, &entry) - assert.NotContains(t, lbls.String(), dropLabel) + // Only the second line will go through. 
+ assert.Len(t, out, 1) + assert.Equal(t, out[0].Line, testMatchLogLineApp2) } var ( diff --git a/pkg/logentry/stages/extensions_test.go b/pkg/logentry/stages/extensions_test.go index f544614cf901..1852db978023 100644 --- a/pkg/logentry/stages/extensions_test.go +++ b/pkg/logentry/stages/extensions_test.go @@ -69,13 +69,11 @@ func TestNewDocker(t *testing.T) { if err != nil { t.Fatalf("failed to create Docker parser: %s", err) } - lbs := toLabelSet(tt.labels) - extr := map[string]interface{}{} - p.Process(lbs, extr, &tt.t, &tt.entry) + out := processEntries(p, newEntry(nil, toLabelSet(tt.labels), tt.entry, tt.t))[0] - assertLabels(t, tt.expectedLabels, lbs) - assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry") - if tt.t.Unix() != tt.expectedT.Unix() { + assertLabels(t, tt.expectedLabels, out.Labels) + assert.Equal(t, tt.expectedEntry, out.Line, "did not receive expected log entry") + if out.Timestamp.Unix() != tt.expectedT.Unix() { t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t) } }) @@ -145,13 +143,11 @@ func TestNewCri(t *testing.T) { if err != nil { t.Fatalf("failed to create CRI parser: %s", err) } - lbs := toLabelSet(tt.labels) - extr := map[string]interface{}{} - p.Process(lbs, extr, &tt.t, &tt.entry) + out := processEntries(p, newEntry(nil, toLabelSet(tt.labels), tt.entry, tt.t))[0] - assertLabels(t, tt.expectedLabels, lbs) - assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry") - if tt.t.Unix() != tt.expectedT.Unix() { + assertLabels(t, tt.expectedLabels, out.Labels) + assert.Equal(t, tt.expectedEntry, out.Line, "did not receive expected log entry") + if out.Timestamp.Unix() != tt.expectedT.Unix() { t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t) } }) diff --git a/pkg/logentry/stages/json.go b/pkg/logentry/stages/json.go index a980408d38ef..c516ce5140e5 100644 --- a/pkg/logentry/stages/json.go +++ b/pkg/logentry/stages/json.go @@ -66,7 +66,7 @@ type jsonStage struct { } // newJSONStage creates a new json pipeline stage from a config. 
-func newJSONStage(logger log.Logger, config interface{}) (*jsonStage, error) { +func newJSONStage(logger log.Logger, config interface{}) (Stage, error) { cfg, err := parseJSONConfig(config) if err != nil { return nil, err @@ -75,11 +75,11 @@ func newJSONStage(logger log.Logger, config interface{}) (*jsonStage, error) { if err != nil { return nil, err } - return &jsonStage{ + return toStage(&jsonStage{ cfg: cfg, expressions: expressions, logger: log.With(logger, "component", "stage", "type", "json"), - }, nil + }), nil } func parseJSONConfig(config interface{}) (*JSONConfig, error) { diff --git a/pkg/logentry/stages/json_test.go b/pkg/logentry/stages/json_test.go index 60aae3f9bf99..8b94215b4b62 100644 --- a/pkg/logentry/stages/json_test.go +++ b/pkg/logentry/stages/json_test.go @@ -8,7 +8,6 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "gopkg.in/yaml.v2" ) @@ -87,12 +86,8 @@ func TestPipeline_JSON(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} - ts := time.Now() - entry := testData.entry - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, testData.expectedExtract, extracted) + out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0] + assert.Equal(t, testData.expectedExtract, out.Extracted) }) } } @@ -364,12 +359,9 @@ func TestJSONParser_Parse(t *testing.T) { if err != nil { t.Fatalf("failed to create json parser: %s", err) } - lbs := model.LabelSet{} - extr := tt.extracted - ts := time.Now() - p.Process(lbs, extr, &ts, &tt.entry) + out := processEntries(p, newEntry(tt.extracted, nil, tt.entry, time.Now()))[0] - assert.Equal(t, tt.expectedExtract, extr) + assert.Equal(t, tt.expectedExtract, out.Extracted) }) } } diff --git a/pkg/logentry/stages/labeldrop.go b/pkg/logentry/stages/labeldrop.go index c04c57feb7bb..f423b6ea4f05 100644 --- a/pkg/logentry/stages/labeldrop.go +++ b/pkg/logentry/stages/labeldrop.go @@ -24,7 +24,7 @@ func validateLabelDropConfig(c LabelDropConfig) error { return nil } -func newLabelDropStage(configs interface{}) (*labelDropStage, error) { +func newLabelDropStage(configs interface{}) (Stage, error) { cfgs := &LabelDropConfig{} err := mapstructure.Decode(configs, cfgs) if err != nil { @@ -36,9 +36,9 @@ func newLabelDropStage(configs interface{}) (*labelDropStage, error) { return nil, err } - return &labelDropStage{ + return toStage(&labelDropStage{ cfgs: *cfgs, - }, nil + }), nil } type labelDropStage struct { diff --git a/pkg/logentry/stages/labeldrop_test.go b/pkg/logentry/stages/labeldrop_test.go index 5987f3ef39b6..1e7baeb91178 100644 --- a/pkg/logentry/stages/labeldrop_test.go +++ b/pkg/logentry/stages/labeldrop_test.go @@ -2,6 +2,7 @@ package stages import ( "testing" + "time" "github.com/cortexproject/cortex/pkg/util" "github.com/prometheus/common/model" @@ -63,8 +64,8 @@ func Test_dropLabelStage_Process(t *testing.T) { if err != nil { t.Fatal(err) } - st.Process(test.inputLabels, map[string]interface{}{}, nil, nil) - assert.Equal(t, test.expectedLabels, test.inputLabels) + out := processEntries(st, newEntry(nil, test.inputLabels, "", time.Now()))[0] + assert.Equal(t, test.expectedLabels, out.Labels) }) } } diff --git a/pkg/logentry/stages/labels.go b/pkg/logentry/stages/labels.go index a2d364ff81a2..4e49c7b037ce 100644 --- a/pkg/logentry/stages/labels.go +++ b/pkg/logentry/stages/labels.go @@ -39,7 +39,7 @@ 
func validateLabelsConfig(c LabelsConfig) error { } // newLabelStage creates a new label stage to set labels from extracted data -func newLabelStage(logger log.Logger, configs interface{}) (*labelStage, error) { +func newLabelStage(logger log.Logger, configs interface{}) (Stage, error) { cfgs := &LabelsConfig{} err := mapstructure.Decode(configs, cfgs) if err != nil { @@ -49,10 +49,10 @@ func newLabelStage(logger log.Logger, configs interface{}) (*labelStage, error) if err != nil { return nil, err } - return &labelStage{ + return toStage(&labelStage{ cfgs: *cfgs, logger: logger, - }, nil + }), nil } // labelStage sets labels from extracted data diff --git a/pkg/logentry/stages/labels_test.go b/pkg/logentry/stages/labels_test.go index 0191a4b79977..532d16e4be4c 100644 --- a/pkg/logentry/stages/labels_test.go +++ b/pkg/logentry/stages/labels_test.go @@ -47,16 +47,13 @@ func TestLabelsPipeline_Labels(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} expectedLbls := model.LabelSet{ "level": "WARN", "app": "loki", } - ts := time.Now() - entry := testLabelsLogLine - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, expectedLbls, lbls) + + out := processEntries(pl, newEntry(nil, nil, testLabelsLogLine, time.Now()))[0] + assert.Equal(t, expectedLbls, out.Labels) } func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) { @@ -67,12 +64,10 @@ func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} Debug = true - ts := time.Now() - entry := testLabelsLogLineWithMissingKey - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) + + _ = processEntries(pl, newEntry(nil, nil, testLabelsLogLineWithMissingKey, time.Now())) + expectedLog := "level=debug msg=\"failed to convert extracted label value to string\" err=\"Can't convert to string\" type=null" if !(strings.Contains(buf.String(), expectedLog)) { t.Errorf("\nexpected: %s\n+actual: %s", expectedLog, buf.String()) @@ -187,8 +182,9 @@ func TestLabelStage_Process(t *testing.T) { if err != nil { t.Fatal(err) } - st.Process(test.inputLabels, test.extractedData, nil, nil) - assert.Equal(t, test.expectedLabels, test.inputLabels) + + out := processEntries(st, newEntry(test.extractedData, test.inputLabels, "", time.Time{}))[0] + assert.Equal(t, test.expectedLabels, out.Labels) }) } } diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go index c6e94fad43dc..e4746e20e332 100644 --- a/pkg/logentry/stages/match.go +++ b/pkg/logentry/stages/match.go @@ -1,8 +1,6 @@ package stages import ( - "time" - "github.com/prometheus/prometheus/pkg/labels" "github.com/go-kit/kit/log" @@ -11,7 +9,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/util" ) @@ -108,6 +108,7 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg return &matcherStage{ dropReason: dropReason, + dropCount: getDropCountMetric(registerer), matchers: selector.Matchers(), stage: pl, action: cfg.Action, @@ -115,40 +116,110 @@ func newMatcherStage(logger log.Logger, jobName *string, config interface{}, reg }, nil } +func getDropCountMetric(registerer prometheus.Registerer) *prometheus.CounterVec { + dropCount := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "logentry", + Name: 
"dropped_lines_total", + Help: "A count of all log lines dropped as a result of a pipeline stage", + }, []string{"reason"}) + err := registerer.Register(dropCount) + if err != nil { + if existing, ok := err.(prometheus.AlreadyRegisteredError); ok { + dropCount = existing.ExistingCollector.(*prometheus.CounterVec) + } else { + // Same behavior as MustRegister if the error is not for AlreadyRegistered + panic(err) + } + } + return dropCount +} + // matcherStage applies Label matchers to determine if the include stages should be run type matcherStage struct { dropReason string + dropCount *prometheus.CounterVec matchers []*labels.Matcher pipeline logql.Pipeline stage Stage action string } -// Process implements Stage -func (m *matcherStage) Process(lbs model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { - for _, filter := range m.matchers { - if !filter.Matches(string(lbs[model.LabelName(filter.Name)])) { - return - } +func (m *matcherStage) Run(in chan Entry) chan Entry { + switch m.action { + case MatchActionDrop: + return m.runDrop(in) + case MatchActionKeep: + return m.runKeep(in) } + panic("unexpected action") +} - sp := m.pipeline.ForStream(labels.FromMap(util.ModelLabelSetToMap(lbs))) - if newLine, newLabels, ok := sp.ProcessString(*entry); ok { - switch m.action { - case MatchActionDrop: - // Adds the drop label to not be sent by the api.EntryHandler - lbs[dropLabel] = model.LabelValue(m.dropReason) - case MatchActionKeep: - *entry = newLine - for k := range lbs { - delete(lbs, k) +func (m *matcherStage) runKeep(in chan Entry) chan Entry { + next := make(chan Entry) + out := make(chan Entry) + outNext := m.stage.Run(next) + go func() { + defer close(out) + for e := range outNext { + out <- e + } + }() + go func() { + defer close(next) + for e := range in { + e, ok := m.processLogQL(e) + if !ok { + out <- e + continue } - for _, l := range newLabels.Labels() { - lbs[model.LabelName(l.Name)] = model.LabelValue(l.Value) + next <- e + } + }() + return out +} + +func (m *matcherStage) runDrop(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for e := range in { + if e, ok := m.processLogQL(e); !ok { + out <- e + continue } - m.stage.Process(lbs, extracted, t, entry) + m.dropCount.WithLabelValues(m.dropReason).Inc() + } + }() + return out +} + +func (m *matcherStage) processLogQL(e Entry) (Entry, bool) { + for _, filter := range m.matchers { + if !filter.Matches(string(e.Labels[model.LabelName(filter.Name)])) { + return e, false } } + sp := m.pipeline.ForStream(labels.FromMap(util.ModelLabelSetToMap(e.Labels))) + newLine, newLabels, ok := sp.ProcessString(e.Line) + if !ok { + return e, false + } + for k := range e.Labels { + delete(e.Labels, k) + } + for _, l := range newLabels.Labels() { + e.Labels[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + return Entry{ + Extracted: e.Extracted, + Entry: api.Entry{ + Labels: e.Labels, + Entry: logproto.Entry{ + Line: newLine, + Timestamp: e.Timestamp, + }, + }, + }, true } // Name implements Stage diff --git a/pkg/logentry/stages/match_test.go b/pkg/logentry/stages/match_test.go index 112876ca895a..ee285dbc6523 100644 --- a/pkg/logentry/stages/match_test.go +++ b/pkg/logentry/stages/match_test.go @@ -1,15 +1,12 @@ package stages import ( - "bytes" "fmt" "testing" "time" "github.com/cortexproject/cortex/pkg/util" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/expfmt" - "github.com/prometheus/common/model" 
"github.com/stretchr/testify/assert" ) @@ -64,36 +61,24 @@ func TestMatchPipeline(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} - ts := time.Now() - // Process the first log line which should extract the output from the `message` field - entry := testMatchLogLineApp1 - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, "app1 log line", entry) - // Process the second log line which should extract the output from the `msg` field - entry = testMatchLogLineApp2 - extracted = map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, "app2 log line", entry) + in := make(chan Entry) - got, err := registry.Gather() - if err != nil { - t.Fatalf("gathering metrics failed: %s", err) - } - var gotBuf bytes.Buffer - enc := expfmt.NewEncoder(&gotBuf, expfmt.FmtText) - for _, mf := range got { - if err := enc.Encode(mf); err != nil { - t.Fatalf("encoding gathered metrics failed: %s", err) - } - } - gotStr := gotBuf.String() - // We should only get metrics from the main pipeline and the second match which defines the pipeline_name - assert.Contains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline\"") - assert.Contains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline_app2\"") - assert.NotContains(t, gotStr, "logentry_pipeline_duration_seconds_bucket{job_name=\"test_pipeline_app1\"") + out := pl.Run(in) + + in <- newEntry(nil, nil, testMatchLogLineApp1, time.Now()) + + e := <-out + + assert.Equal(t, "app1 log line", e.Line) + + // Process the second log line which should extract the output from the `msg` field + e.Line = testMatchLogLineApp2 + e.Extracted = map[string]interface{}{} + in <- e + e = <-out + assert.Equal(t, "app2 log line", e.Line) + close(in) } func TestMatcher(t *testing.T) { @@ -171,24 +156,24 @@ func TestMatcher(t *testing.T) { return } if s != nil { - ts, entry := time.Now(), "foo" - extracted := map[string]interface{}{ + + out := processEntries(s, newEntry(map[string]interface{}{ "test_label": "unimportant value", - } - labels := toLabelSet(tt.labels) - s.Process(labels, extracted, &ts, &entry) + }, toLabelSet(tt.labels), "foo", time.Now())) + if tt.shouldDrop { + if len(out) != 0 { + t.Errorf("stage should have been dropped but got %v", out) + } + return + } // test_label should only be in the label set if the stage ran - if _, ok := labels["test_label"]; ok { + if _, ok := out[0].Labels["test_label"]; ok { if !tt.shouldRun { t.Error("stage ran but should have not") } } - if tt.shouldDrop { - if _, ok := labels[dropLabel]; !ok { - t.Error("stage should have been dropped") - } - } + } }) } diff --git a/pkg/logentry/stages/metrics.go b/pkg/logentry/stages/metrics.go index 684dda4423e2..0453372d4de9 100644 --- a/pkg/logentry/stages/metrics.go +++ b/pkg/logentry/stages/metrics.go @@ -85,7 +85,7 @@ func validateMetricsConfig(cfg MetricsConfig) error { } // newMetricStage creates a new set of metrics to process for each log entry -func newMetricStage(logger log.Logger, config interface{}, registry prometheus.Registerer) (*metricStage, error) { +func newMetricStage(logger log.Logger, config interface{}, registry prometheus.Registerer) (Stage, error) { cfgs := &MetricsConfig{} err := mapstructure.Decode(config, cfgs) if err != nil { @@ -128,11 +128,11 @@ func newMetricStage(logger log.Logger, config interface{}, registry prometheus.R metrics[name] = collector } } - return &metricStage{ + return toStage(&metricStage{ logger: logger, 
cfg: *cfgs, metrics: metrics, - }, nil + }), nil } // metricStage creates and updates prometheus metrics based on extracted pipeline data @@ -144,9 +144,6 @@ type metricStage struct { // Process implements Stage func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { - if _, ok := labels[dropLabel]; ok { - return - } for name, collector := range m.metrics { // There is a special case for counters where we count even if there is no match in the extracted map. if c, ok := collector.(*metric.Counters); ok { diff --git a/pkg/logentry/stages/metrics_test.go b/pkg/logentry/stages/metrics_test.go index 7722c6aa4b01..0505cf99af22 100644 --- a/pkg/logentry/stages/metrics_test.go +++ b/pkg/logentry/stages/metrics_test.go @@ -114,14 +114,10 @@ func TestMetricsPipeline(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} - lbls["test"] = "app" - ts := time.Now() - extracted := map[string]interface{}{} - entry := testMetricLogLine1 - pl.Process(lbls, extracted, &ts, &entry) - entry = testMetricLogLine2 - pl.Process(lbls, extracted, &ts, &entry) + + out := <-pl.Run(withInboundEntries(newEntry(nil, model.LabelSet{"test": "app"}, testMetricLogLine1, time.Now()))) + out.Line = testMetricLogLine2 + <-pl.Run(withInboundEntries(out)) if err := testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics)); err != nil { @@ -137,12 +133,8 @@ func TestPipelineWithMissingKey_Metrics(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} Debug = true - ts := time.Now() - entry := testMetricLogLineWithMissingKey - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) + processEntries(pl, newEntry(nil, nil, testMetricLogLineWithMissingKey, time.Now())) expectedLog := "level=debug msg=\"failed to convert extracted value to string, can't perform value comparison\" metric=bloki_count err=\"can't convert to string\"" if !(strings.Contains(buf.String(), expectedLog)) { t.Errorf("\nexpected: %s\n+actual: %s", expectedLog, buf.String()) @@ -167,9 +159,12 @@ pipeline_stages: action: inc ` -const expectedDropMetrics = `# HELP promtail_custom_loki_count should only inc on non dropped labels +const expectedDropMetrics = `# HELP logentry_dropped_lines_total A count of all log lines dropped as a result of a pipeline stage +# TYPE logentry_dropped_lines_total counter +logentry_dropped_lines_total{reason="match_stage"} 1 +# HELP promtail_custom_loki_count should only inc on non dropped labels # TYPE promtail_custom_loki_count counter -promtail_custom_loki_count 1.0 +promtail_custom_loki_count 1 ` func TestMetricsWithDropInPipeline(t *testing.T) { @@ -182,13 +177,16 @@ func TestMetricsWithDropInPipeline(t *testing.T) { droppingLabels := model.LabelSet{ "drop": "true", } + in := make(chan Entry) + out := pl.Run(in) - ts := time.Now() - extracted := map[string]interface{}{} - entry := testMetricLogLine1 - pl.Process(lbls, extracted, &ts, &entry) - entry = testMetricLogLine2 - pl.Process(droppingLabels, extracted, &ts, &entry) + in <- newEntry(nil, lbls, testMetricLogLine1, time.Now()) + e := <-out + e.Labels = droppingLabels + e.Line = testMetricLogLine2 + in <- e + close(in) + <-out if err := testutil.GatherAndCompare(registry, strings.NewReader(expectedDropMetrics)); err != nil { @@ -198,7 +196,7 @@ func TestMetricsWithDropInPipeline(t *testing.T) { var metricTestInvalidIdle = "10f" -func Test(t *testing.T) { +func TestValidateMetricsConfig(t *testing.T) { tests := map[string]struct { config 
MetricsConfig err error @@ -266,7 +264,7 @@ func TestDefaultIdleDuration(t *testing.T) { if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } - assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*metricStage).cfg["total_keys"].maxIdleSec) + assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*stageProcessor).Processor.(*metricStage).cfg["total_keys"].maxIdleSec) } var labelFoo = model.LabelSet(map[model.LabelName]model.LabelValue{"foo": "bar", "bar": "foo"}) @@ -372,15 +370,13 @@ func TestMetricStage_Process(t *testing.T) { if err != nil { t.Fatalf("failed to create stage with metrics: %v", err) } - var ts = time.Now() - var entry = logFixture - extr := map[string]interface{}{} - jsonStage.Process(labelFoo, extr, &ts, &entry) - regexStage.Process(labelFoo, extr, &ts, ®exLogFixture) - metricStage.Process(labelFoo, extr, &ts, &entry) + out := processEntries(jsonStage, newEntry(nil, labelFoo, logFixture, time.Now())) + out[0].Line = regexLogFixture + out = processEntries(regexStage, out...) + out = processEntries(metricStage, out...) + out[0].Labels = labelFu // Process the same extracted values again with different labels so we can verify proper metric/label assignments - metricStage.Process(labelFu, extr, &ts, &entry) - + _ = processEntries(metricStage, out...) names := metricNames(metricsConfig) if err := testutil.GatherAndCompare(registry, strings.NewReader(goldenMetrics), names...); err != nil { diff --git a/pkg/logentry/stages/output.go b/pkg/logentry/stages/output.go index 2201f50241a4..d187f57b59d7 100644 --- a/pkg/logentry/stages/output.go +++ b/pkg/logentry/stages/output.go @@ -34,7 +34,7 @@ func validateOutputConfig(cfg *OutputConfig) error { } // newOutputStage creates a new outputStage -func newOutputStage(logger log.Logger, config interface{}) (*outputStage, error) { +func newOutputStage(logger log.Logger, config interface{}) (Stage, error) { cfg := &OutputConfig{} err := mapstructure.Decode(config, cfg) if err != nil { @@ -44,10 +44,10 @@ func newOutputStage(logger log.Logger, config interface{}) (*outputStage, error) if err != nil { return nil, err } - return &outputStage{ + return toStage(&outputStage{ cfgs: cfg, logger: logger, - }, nil + }), nil } // outputStage will mutate the incoming entry and set it from extracted data diff --git a/pkg/logentry/stages/output_test.go b/pkg/logentry/stages/output_test.go index 351eed90b3fb..760617b0ca67 100644 --- a/pkg/logentry/stages/output_test.go +++ b/pkg/logentry/stages/output_test.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" ) @@ -48,12 +47,9 @@ func TestPipeline_Output(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} - ts := time.Now() - entry := testOutputLogLine - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, "this is a log line", entry) + out := processEntries(pl, newEntry(nil, nil, testOutputLogLine, time.Now()))[0] + + assert.Equal(t, "this is a log line", out.Line) } func TestPipelineWithMissingKey_Output(t *testing.T) { @@ -64,12 +60,8 @@ func TestPipelineWithMissingKey_Output(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} Debug = true - ts := time.Now() - entry := testOutputLogLineWithMissingKey - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) + _ = processEntries(pl, newEntry(nil, nil, 
testOutputLogLineWithMissingKey, time.Now())) expectedLog := "level=debug msg=\"extracted output could not be converted to a string\" err=\"Can't convert to string\" type=null" if !(strings.Contains(buf.String(), expectedLog)) { t.Errorf("\nexpected: %s\n+actual: %s", expectedLog, buf.String()) @@ -134,10 +126,9 @@ func TestOutputStage_Process(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} - entry := "replaceme" - st.Process(lbls, test.extracted, nil, &entry) - assert.Equal(t, test.expectedOutput, entry) + out := processEntries(st, newEntry(test.extracted, nil, "replaceme", time.Time{}))[0] + + assert.Equal(t, test.expectedOutput, out.Line) }) } } diff --git a/pkg/logentry/stages/pipeline.go b/pkg/logentry/stages/pipeline.go index dafa6e50e80f..d982bd527426 100644 --- a/pkg/logentry/stages/pipeline.go +++ b/pkg/logentry/stages/pipeline.go @@ -1,19 +1,15 @@ package stages import ( - "time" + "sync" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/promtail/api" ) -const dropLabel = "__drop__" - // PipelineStages contains configuration for each stage within a pipeline type PipelineStages = []interface{} @@ -22,45 +18,13 @@ type PipelineStage = map[interface{}]interface{} // Pipeline pass down a log entry to each stage for mutation and/or label extraction. type Pipeline struct { - logger log.Logger - stages []Stage - jobName *string - plDuration *prometheus.HistogramVec - dropCount *prometheus.CounterVec + logger log.Logger + stages []Stage + jobName *string } // NewPipeline creates a new log entry pipeline from a configuration func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) { - hist := prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "logentry", - Name: "pipeline_duration_seconds", - Help: "Label and metric extraction pipeline processing time, in seconds", - Buckets: []float64{.000005, .000010, .000025, .000050, .000100, .000250, .000500, .001000, .002500, .005000, .010000, .025000}, - }, []string{"job_name"}) - err := registerer.Register(hist) - if err != nil { - if existing, ok := err.(prometheus.AlreadyRegisteredError); ok { - hist = existing.ExistingCollector.(*prometheus.HistogramVec) - } else { - // Same behavior as MustRegister if the error is not for AlreadyRegistered - panic(err) - } - } - dropCount := prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "logentry", - Name: "dropped_lines_total", - Help: "A count of all log lines dropped as a result of a pipeline stage", - }, []string{"reason"}) - err = registerer.Register(dropCount) - if err != nil { - if existing, ok := err.(prometheus.AlreadyRegisteredError); ok { - dropCount = existing.ExistingCollector.(*prometheus.CounterVec) - } else { - // Same behavior as MustRegister if the error is not for AlreadyRegistered - panic(err) - } - } - st := []Stage{} for _, s := range stgs { stage, ok := s.(PipelineStage) @@ -84,37 +48,39 @@ func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, regist } } return &Pipeline{ - logger: log.With(logger, "component", "pipeline"), - stages: st, - jobName: jobName, - plDuration: hist, - dropCount: dropCount, + logger: log.With(logger, "component", "pipeline"), + stages: st, + jobName: jobName, }, nil } -// Process implements Stage allowing a pipeline stage to also be an entire pipeline -func (p 
*Pipeline) Process(labels model.LabelSet, extracted map[string]interface{}, ts *time.Time, entry *string) { - start := time.Now() - - // Initialize the extracted map with the initial labels (ie. "filename"), - // so that stages can operate on initial labels too - for labelName, labelValue := range labels { - extracted[string(labelName)] = string(labelValue) - } +// RunWith will reads from the input channel entries, mutate them with the process function and returns them via the output channel. +func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for e := range input { + out <- process(e) + } + }() + return out +} - for i, stage := range p.stages { - if Debug { - level.Debug(p.logger).Log("msg", "processing pipeline", "stage", i, "name", stage.Name(), "labels", labels, "time", ts, "entry", entry) +// Run implements Stage +func (p *Pipeline) Run(in chan Entry) chan Entry { + in = RunWith(in, func(e Entry) Entry { + // Initialize the extracted map with the initial labels (ie. "filename"), + // so that stages can operate on initial labels too + for labelName, labelValue := range e.Labels { + e.Extracted[string(labelName)] = string(labelValue) } - stage.Process(labels, extracted, ts, entry) - } - dur := time.Since(start).Seconds() - if Debug { - level.Debug(p.logger).Log("msg", "finished processing log line", "labels", labels, "time", ts, "entry", entry, "duration_s", dur) - } - if p.jobName != nil { - p.plDuration.WithLabelValues(*p.jobName).Observe(dur) + return e + }) + // chain all stages together. + for _, m := range p.stages { + in = m.Run(in) } + return in } // Name implements Stage @@ -124,26 +90,34 @@ func (p *Pipeline) Name() string { // Wrap implements EntryMiddleware func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler { - return api.EntryHandlerFunc(func(labels model.LabelSet, timestamp time.Time, line string) error { - extracted := map[string]interface{}{} - p.Process(labels, extracted, ×tamp, &line) - // if the labels set contains the __drop__ label we don't send this entry to the next EntryHandler - if reason, ok := labels[dropLabel]; ok { - if reason == "" { - reason = "undefined" + handlerIn := make(chan api.Entry) + nextChan := next.Chan() + wg, once := sync.WaitGroup{}, sync.Once{} + pipelineIn := make(chan Entry) + pipelineOut := p.Run(pipelineIn) + wg.Add(2) + go func() { + defer wg.Done() + for e := range pipelineOut { + nextChan <- e.Entry + } + }() + go func() { + defer wg.Done() + defer close(pipelineIn) + for e := range handlerIn { + pipelineIn <- Entry{ + Extracted: map[string]interface{}{}, + Entry: e, } - p.dropCount.WithLabelValues(string(reason)).Inc() - return nil } - return next.Handle(labels, timestamp, line) + }() + return api.NewEntryHandler(handlerIn, func() { + once.Do(func() { close(handlerIn) }) + wg.Wait() }) } -// AddStage adds a stage to the pipeline -func (p *Pipeline) AddStage(stage Stage) { - p.stages = append(p.stages, stage) -} - // Size gets the current number of stages in the pipeline func (p *Pipeline) Size() int { return len(p.stages) diff --git a/pkg/logentry/stages/pipeline_test.go b/pkg/logentry/stages/pipeline_test.go index 6a01fe5fadae..f3ee08aaa047 100644 --- a/pkg/logentry/stages/pipeline_test.go +++ b/pkg/logentry/stages/pipeline_test.go @@ -12,6 +12,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" 
+ "github.com/grafana/loki/pkg/promtail/client/fake" ) var ( @@ -38,6 +42,9 @@ pipeline_stages: action: service: status_code: "status" +- match: + selector: "{match=\"false\"}" + action: drop ` var testLabelsFromJSONYaml = ` @@ -52,6 +59,24 @@ pipeline_stages: source: message ` +func withInboundEntries(entries ...Entry) chan Entry { + in := make(chan Entry, len(entries)) + defer close(in) + for _, e := range entries { + in <- e + } + return in +} + +func processEntries(s Stage, entries ...Entry) []Entry { + out := s.Run(withInboundEntries(entries...)) + var res []Entry + for e := range out { + res = append(res, e) + } + return res +} + func loadConfig(yml string) PipelineStages { var config map[string]interface{} err := yaml.Unmarshal([]byte(yml), &config) @@ -67,7 +92,7 @@ func TestNewPipeline(t *testing.T) { if err != nil { panic(err) } - require.Equal(t, 1, len(p.stages)) + require.Len(t, p.stages, 2) } func TestPipeline_Process(t *testing.T) { @@ -178,12 +203,11 @@ func TestPipeline_Process(t *testing.T) { p, err := NewPipeline(util.Logger, config["pipeline_stages"].([]interface{}), nil, prometheus.DefaultRegisterer) require.NoError(t, err) - extracted := map[string]interface{}{} - p.Process(tt.initialLabels, extracted, &tt.t, &tt.entry) + out := processEntries(p, newEntry(nil, tt.initialLabels, tt.entry, tt.t))[0] - assert.Equal(t, tt.expectedLabels, tt.initialLabels, "did not get expected labels") - assert.Equal(t, tt.expectedEntry, tt.entry, "did not receive expected log entry") - if tt.t.Unix() != tt.expectedT.Unix() { + assert.Equal(t, tt.expectedLabels, out.Labels, "did not get expected labels") + assert.Equal(t, tt.expectedEntry, out.Line, "did not receive expected log entry") + if out.Timestamp.Unix() != tt.expectedT.Unix() { t.Fatalf("mismatch ts want: %s got:%s", tt.expectedT, tt.t) } }) @@ -224,24 +248,24 @@ func BenchmarkPipeline(b *testing.B) { } lb := model.LabelSet{} ts := time.Now() + + in := make(chan Entry) + out := pl.Run(in) + b.ResetTimer() + + go func() { + for range out { + + } + }() for i := 0; i < b.N; i++ { - entry := bm.entry - extracted := map[string]interface{}{} - pl.Process(lb, extracted, &ts, &entry) + in <- newEntry(nil, lb, bm.entry, ts) } + close(in) }) } } -type stubHandler struct { - bool -} - -func (s *stubHandler) Handle(labels model.LabelSet, time time.Time, entry string) error { - s.bool = true - return nil -} - func TestPipeline_Wrap(t *testing.T) { now := time.Now() var config map[string]interface{} @@ -260,10 +284,10 @@ func TestPipeline_Wrap(t *testing.T) { }{ "should drop": { map[model.LabelName]model.LabelValue{ - dropLabel: "true", "stream": "stderr", "action": "GET", "status_code": "200", + "match": "false", }, false, }, @@ -281,14 +305,25 @@ func TestPipeline_Wrap(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { t.Parallel() - extracted := map[string]interface{}{} - p.Process(tt.labels, extracted, &now, &rawTestLine) - stub := &stubHandler{} - handler := p.Wrap(stub) - if err := handler.Handle(tt.labels, now, rawTestLine); err != nil { - t.Fatalf("failed to handle entry: %v", err) + c := fake.New(func() {}) + handler := p.Wrap(c) + + handler.Chan() <- api.Entry{ + Labels: tt.labels, + Entry: logproto.Entry{ + Line: rawTestLine, + Timestamp: now, + }, + } + handler.Stop() + c.Stop() + var received bool + + if len(c.Received()) != 0 { + received = true } - assert.Equal(t, stub.bool, tt.shouldSend) + + assert.Equal(t, tt.shouldSend, received) }) } diff --git a/pkg/logentry/stages/regex.go b/pkg/logentry/stages/regex.go index 
c53a4ade8507..d91f89c841d6 100644
--- a/pkg/logentry/stages/regex.go
+++ b/pkg/logentry/stages/regex.go
@@ -65,11 +65,11 @@ func newRegexStage(logger log.Logger, config interface{}) (Stage, error) {
if err != nil {
return nil, err
}
- return &regexStage{
+ return toStage(&regexStage{
cfg: cfg,
expression: expression,
logger: log.With(logger, "component", "stage", "type", "regex"),
- }, nil
+ }), nil
}
// parseRegexConfig processes an incoming configuration into a RegexConfig
diff --git a/pkg/logentry/stages/regex_test.go b/pkg/logentry/stages/regex_test.go
index 70f8c028ef2c..8773b4d564c5 100644
--- a/pkg/logentry/stages/regex_test.go
+++ b/pkg/logentry/stages/regex_test.go
@@ -106,12 +106,8 @@ func TestPipeline_Regex(t *testing.T) {
t.Fatal(err)
}
- lbls := model.LabelSet{}
- ts := time.Now()
- entry := testData.entry
- extracted := map[string]interface{}{}
- pl.Process(lbls, extracted, &ts, &entry)
- assert.Equal(t, testData.expectedExtract, extracted)
+ out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0]
+ assert.Equal(t, testData.expectedExtract, out.Extracted)
})
}
}
@@ -124,12 +120,9 @@ func TestPipelineWithMissingKey_Regex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- lbls := model.LabelSet{}
Debug = true
- ts := time.Now()
- entry := testRegexLogLineWithMissingKey
- extracted := map[string]interface{}{}
- pl.Process(lbls, extracted, &ts, &entry)
+ _ = processEntries(pl, newEntry(nil, nil, testRegexLogLineWithMissingKey, time.Now()))[0]
+
expectedLog := "level=debug component=stage type=regex msg=\"failed to convert source value to string\" source=time err=\"Can't convert to string\" type=null"
if !(strings.Contains(buf.String(), expectedLog)) {
t.Errorf("\nexpected: %s\n+actual: %s", expectedLog, buf.String())
@@ -333,11 +326,8 @@ func TestRegexParser_Parse(t *testing.T) {
if err != nil {
t.Fatalf("failed to create regex parser: %s", err)
}
- lbs := model.LabelSet{}
- extr := tt.extracted
- ts := time.Now()
- p.Process(lbs, extr, &ts, &tt.entry)
- assert.Equal(t, tt.expectedExtract, extr)
+ out := processEntries(p, newEntry(tt.extracted, nil, tt.entry, time.Now()))[0]
+ assert.Equal(t, tt.expectedExtract, out.Extracted)
})
}
@@ -378,10 +368,18 @@ func BenchmarkRegexStage(b *testing.B) {
labels := model.LabelSet{}
ts := time.Now()
extr := map[string]interface{}{}
+
+ in := make(chan Entry)
+ out := stage.Run(in)
+ go func() {
+ for range out {
+
+ }
+ }()
for i := 0; i < b.N; i++ {
- entry := bm.entry
- stage.Process(labels, extr, &ts, &entry)
+ in <- newEntry(extr, labels, bm.entry, ts)
}
+ close(in)
})
}
}
diff --git a/pkg/logentry/stages/replace.go b/pkg/logentry/stages/replace.go
index c548309c1e2d..c24ced7c61c7 100644
--- a/pkg/logentry/stages/replace.go
+++ b/pkg/logentry/stages/replace.go
@@ -66,11 +66,11 @@ func newReplaceStage(logger log.Logger, config interface{}) (Stage, error) {
return nil, err
}
- return &replaceStage{
+ return toStage(&replaceStage{
cfg: cfg,
expression: expression,
logger: log.With(logger, "component", "stage", "type", "replace"),
- }, nil
+ }), nil
}
// parseReplaceConfig processes an incoming configuration into a ReplaceConfig
diff --git a/pkg/logentry/stages/replace_test.go b/pkg/logentry/stages/replace_test.go
index c94d7af92461..4e2176ed82de 100644
--- a/pkg/logentry/stages/replace_test.go
+++ b/pkg/logentry/stages/replace_test.go
@@ -8,14 +8,13 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/common/model"
"github.com/stretchr/testify/assert" "gopkg.in/yaml.v2" ) var testReplaceYamlSingleStageWithoutSource = ` -pipeline_stages: -- replace: +pipeline_stages: +- replace: expression: "11.11.11.11 - (\\S+) .*" replace: "dummy" ` @@ -32,19 +31,19 @@ pipeline_stages: ` var testReplaceYamlWithNamedCaputedGroupWithTemplate = ` ---- -pipeline_stages: - - - replace: +--- +pipeline_stages: + - + replace: expression: "^(?P\\S+) (?P\\S+) (?P\\S+) \\[(?P[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P\\S+)\\s?(?P\\S+)?\\s?(?P\\S+)?\" (?P\\d{3}|-) (\\d+|-)\\s?\"?(?P[^\"]*)\"?\\s?\"?(?P[^\"]*)?\"?$" replace: '{{ if eq .Value "200" }}{{ Replace .Value "200" "HttpStatusOk" -1 }}{{ else }}{{ .Value | ToUpper }}{{ end }}' ` var testReplaceYamlWithTemplate = ` ---- -pipeline_stages: - - - replace: +--- +pipeline_stages: + - + replace: expression: "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+)\\s?(\\S+)?\\s?(\\S+)?\" (\\d{3}|-) (\\d+|-)\\s?\"?([^\"]*)\"?\\s?\"?([^\"]*)?\"?$" replace: '{{ if eq .Value "200" }}{{ Replace .Value "200" "HttpStatusOk" -1 }}{{ else }}{{ .Value | ToUpper }}{{ end }}' ` @@ -126,14 +125,9 @@ func TestPipeline_Replace(t *testing.T) { if err != nil { t.Fatal(err) } - - lbls := model.LabelSet{} - ts := time.Now() - entry := testData.entry - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, testData.expectedEntry, entry) - assert.Equal(t, testData.extracted, extracted) + out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0] + assert.Equal(t, testData.expectedEntry, out.Line) + assert.Equal(t, testData.extracted, out.Extracted) }) } } diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index 144e006ee14f..28eb917c2891 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -7,6 +7,8 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/promtail/api" ) const ( @@ -27,19 +29,38 @@ const ( StageTypeDrop = "drop" ) -// Stage takes an existing set of labels, timestamp and log entry and returns either a possibly mutated +// Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated // timestamp and log entry -type Stage interface { +type Processor interface { Process(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string) Name() string } -// StageFunc is modelled on http.HandlerFunc. -type StageFunc func(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string) +type Entry struct { + Extracted map[string]interface{} + api.Entry +} + +// Stage can receive entries via an inbound channel and forward mutated entries to an outbound channel. +type Stage interface { + Name() string + Run(chan Entry) chan Entry +} + +// stageProcessor Allow to transform a Processor (old synchronous pipeline stage) into an async Stage +type stageProcessor struct { + Processor +} + +func (s stageProcessor) Run(in chan Entry) chan Entry { + return RunWith(in, func(e Entry) Entry { + s.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line) + return e + }) +} -// Process implements EntryHandler. -func (s StageFunc) Process(labels model.LabelSet, extracted map[string]interface{}, time *time.Time, entry *string) { - s(labels, extracted, time, entry) +func toStage(p Processor) Stage { + return &stageProcessor{Processor: p} } // New creates a new stage for the given type and configuration. 
@@ -114,7 +135,7 @@ func New(logger log.Logger, jobName *string, stageType string, return nil, err } case StageTypeDrop: - s, err = newDropStage(logger, cfg) + s, err = newDropStage(logger, cfg, registerer) if err != nil { return nil, err } diff --git a/pkg/logentry/stages/template.go b/pkg/logentry/stages/template.go index c8d0152cd518..31698f808fae 100644 --- a/pkg/logentry/stages/template.go +++ b/pkg/logentry/stages/template.go @@ -74,7 +74,7 @@ func validateTemplateConfig(cfg *TemplateConfig) (*template.Template, error) { } // newTemplateStage creates a new templateStage -func newTemplateStage(logger log.Logger, config interface{}) (*templateStage, error) { +func newTemplateStage(logger log.Logger, config interface{}) (Stage, error) { cfg := &TemplateConfig{} err := mapstructure.Decode(config, cfg) if err != nil { @@ -85,11 +85,11 @@ func newTemplateStage(logger log.Logger, config interface{}) (*templateStage, er return nil, err } - return &templateStage{ + return toStage(&templateStage{ cfgs: cfg, logger: logger, template: t, - }, nil + }), nil } // templateStage will mutate the incoming entry and set it from extracted data diff --git a/pkg/logentry/stages/template_test.go b/pkg/logentry/stages/template_test.go index b77179298468..0c09e90d9175 100644 --- a/pkg/logentry/stages/template_test.go +++ b/pkg/logentry/stages/template_test.go @@ -60,17 +60,13 @@ func TestPipeline_Template(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} expectedLbls := model.LabelSet{ "app": "LOKI doki", "level": "OK", "type": "TEST", } - ts := time.Now() - entry := testTemplateLogLine - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) - assert.Equal(t, expectedLbls, lbls) + out := processEntries(pl, newEntry(nil, nil, testTemplateLogLine, time.Now()))[0] + assert.Equal(t, expectedLbls, out.Labels) } func TestPipelineWithMissingKey_Template(t *testing.T) { @@ -81,12 +77,10 @@ func TestPipelineWithMissingKey_Template(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} Debug = true - ts := time.Now() - entry := testTemplateLogLineWithMissingKey - extracted := map[string]interface{}{} - pl.Process(lbls, extracted, &ts, &entry) + + _ = processEntries(pl, newEntry(nil, nil, testTemplateLogLineWithMissingKey, time.Now())) + expectedLog := "level=debug msg=\"extracted template could not be converted to a string\" err=\"Can't convert to string\" type=null" if !(strings.Contains(buf.String(), expectedLog)) { t.Errorf("\nexpected: %s\n+actual: %s", expectedLog, buf.String()) @@ -375,10 +369,9 @@ func TestTemplateStage_Process(t *testing.T) { if err != nil { t.Fatal(err) } - lbls := model.LabelSet{} - entry := "not important for this test" - st.Process(lbls, test.extracted, nil, &entry) - assert.Equal(t, test.expectedExtracted, test.extracted) + + out := processEntries(st, newEntry(test.expectedExtracted, nil, "not important for this test", time.Time{}))[0] + assert.Equal(t, test.expectedExtracted, out.Extracted) }) } } diff --git a/pkg/logentry/stages/tenant.go b/pkg/logentry/stages/tenant.go index 010488fc766e..1ee2863c0a80 100644 --- a/pkg/logentry/stages/tenant.go +++ b/pkg/logentry/stages/tenant.go @@ -42,7 +42,7 @@ func validateTenantConfig(c TenantConfig) error { } // newTenantStage creates a new tenant stage to override the tenant ID from extracted data -func newTenantStage(logger log.Logger, configs interface{}) (*tenantStage, error) { +func newTenantStage(logger log.Logger, configs interface{}) (Stage, error) { cfg := 
TenantConfig{}
err := mapstructure.Decode(configs, &cfg)
if err != nil {
@@ -54,10 +54,10 @@ func newTenantStage(logger log.Logger, configs interface{}) (*tenantStage, error)
return nil, err
}
- return &tenantStage{
+ return toStage(&tenantStage{
cfg: cfg,
logger: logger,
- }, nil
+ }), nil
}
// Process implements Stage
diff --git a/pkg/logentry/stages/tenant_test.go b/pkg/logentry/stages/tenant_test.go
index dec3d9626760..0999ec71ad6a 100644
--- a/pkg/logentry/stages/tenant_test.go
+++ b/pkg/logentry/stages/tenant_test.go
@@ -43,12 +43,9 @@ func TestPipelineWithMissingKey_Tenant(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- lbls := model.LabelSet{}
Debug = true
- ts := time.Now()
- entry := testTenantLogLineWithMissingKey
- extracted := map[string]interface{}{}
- pl.Process(lbls, extracted, &ts, &entry)
+
+ _ = processEntries(pl, newEntry(nil, nil, testTenantLogLineWithMissingKey, time.Now()))
expectedLog := "level=debug msg=\"failed to convert value to string\" err=\"Can't convert to string\" type=null"
if !(strings.Contains(buf.String(), expectedLog)) {
t.Errorf("\nexpected: %s\n+actual: %s", expectedLog, buf.String())
@@ -178,17 +175,13 @@ func TestTenantStage_Process(t *testing.T) {
// Process and dummy line and ensure nothing has changed except
// the tenant reserved label
- timestamp := time.Unix(1, 1)
- entry := "hello world"
- labels := testData.inputLabels.Clone()
- extracted := testData.inputExtracted
- stage.Process(labels, extracted, &timestamp, &entry)
+ out := processEntries(stage, newEntry(testData.inputExtracted, testData.inputLabels.Clone(), "hello world", time.Unix(1, 1)))[0]
- assert.Equal(t, time.Unix(1, 1), timestamp)
- assert.Equal(t, "hello world", entry)
+ assert.Equal(t, time.Unix(1, 1), out.Timestamp)
+ assert.Equal(t, "hello world", out.Line)
- actualTenant, ok := labels[client.ReservedLabelTenantID]
+ actualTenant, ok := out.Labels[client.ReservedLabelTenantID]
if testData.expectedTenant == nil {
assert.False(t, ok)
} else {
diff --git a/pkg/logentry/stages/timestamp.go b/pkg/logentry/stages/timestamp.go
index 2b06e3ffaf49..1851b43cb6f1 100644
--- a/pkg/logentry/stages/timestamp.go
+++ b/pkg/logentry/stages/timestamp.go
@@ -103,7 +103,7 @@ func validateTimestampConfig(cfg *TimestampConfig) (parser, error) {
}
// newTimestampStage creates a new timestamp extraction pipeline stage.
-func newTimestampStage(logger log.Logger, config interface{}) (*timestampStage, error) {
+func newTimestampStage(logger log.Logger, config interface{}) (Stage, error) {
cfg := &TimestampConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
@@ -122,12 +122,12 @@ func newTimestampStage(logger log.Logger, config interface{}) (*timestampStage,
}
}
- return &timestampStage{
+ return toStage(&timestampStage{
cfg: cfg,
logger: logger,
parser: parser,
lastKnownTimestamps: lastKnownTimestamps,
- }, nil
+ }), nil
}
// timestampStage will set the timestamp using extracted data
diff --git a/pkg/logentry/stages/timestamp_test.go b/pkg/logentry/stages/timestamp_test.go
index abc460d018fa..422b8a0f1abd 100644
--- a/pkg/logentry/stages/timestamp_test.go
+++ b/pkg/logentry/stages/timestamp_test.go
@@ -50,12 +50,8 @@ func TestTimestampPipeline(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- lbls := model.LabelSet{}
- ts := time.Now()
- entry := testTimestampLogLine
- extracted := map[string]interface{}{}
- pl.Process(lbls, extracted, &ts, &entry)
- assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", -4*60*60)).Unix(), ts.Unix())
+ out := processEntries(pl, newEntry(nil, nil, testTimestampLogLine, time.Now()))[0]
+ assert.Equal(t, time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", -4*60*60)).Unix(), out.Timestamp.Unix())
}
var (
@@ -72,12 +68,9 @@ func TestPipelineWithMissingKey_Timestamp(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- lbls := model.LabelSet{}
Debug = true
- ts := time.Now()
- entry := testTimestampLogLineWithMissingKey
- extracted := map[string]interface{}{}
- pl.Process(lbls, extracted, &ts, &entry)
+ _ = processEntries(pl, newEntry(nil, nil, testTimestampLogLineWithMissingKey, time.Now()))
+
expectedLog := fmt.Sprintf("level=debug msg=\"%s\" err=\"Can't convert to string\" type=null", ErrTimestampConversionFailed)
if !(strings.Contains(buf.String(), expectedLog)) {
t.Errorf("\nexpected: %s\n+actual: %s", expectedLog, buf.String())
@@ -309,10 +302,8 @@ func TestTimestampStage_Process(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- ts := time.Now()
- lbls := model.LabelSet{}
- st.Process(lbls, test.extracted, &ts, nil)
- assert.Equal(t, test.expected.UnixNano(), ts.UnixNano())
+ out := processEntries(st, newEntry(test.extracted, nil, "hello world", time.Now()))[0]
+ assert.Equal(t, test.expected.UnixNano(), out.Timestamp.UnixNano())
})
}
}
@@ -452,12 +443,8 @@ func TestTimestampStage_ProcessActionOnFailure(t *testing.T) {
require.NoError(t, err)
for i, inputEntry := range testData.inputEntries {
- extracted := inputEntry.extracted
- timestamp := inputEntry.timestamp
- entry := ""
-
- s.Process(inputEntry.labels, extracted, &timestamp, &entry)
- assert.Equal(t, testData.expectedTimestamps[i], timestamp, "entry: %d", i)
+ out := processEntries(s, newEntry(inputEntry.extracted, inputEntry.labels, "", inputEntry.timestamp))[0]
+ assert.Equal(t, testData.expectedTimestamps[i], out.Timestamp, "entry: %d", i)
}
})
}
diff --git a/pkg/logentry/stages/util_test.go b/pkg/logentry/stages/util_test.go
index 6e25d245db1e..5bcc8be673a7 100644
--- a/pkg/logentry/stages/util_test.go
+++ b/pkg/logentry/stages/util_test.go
@@ -7,8 +7,30 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
+
+ "github.com/grafana/loki/pkg/logproto"
+ "github.com/grafana/loki/pkg/promtail/api"
)
+func newEntry(ex map[string]interface{}, lbs model.LabelSet, line string, ts time.Time) Entry {
+ if ex == nil {
+ ex = map[string]interface{}{}
+ }
+ if lbs == nil {
+ lbs =
model.LabelSet{} + } + return Entry{ + Extracted: ex, + Entry: api.Entry{ + Labels: lbs, + Entry: logproto.Entry{ + Timestamp: ts, + Line: line, + }, + }, + } +} + // nolint func mustParseTime(layout, value string) time.Time { t, err := time.Parse(layout, value) diff --git a/pkg/promtail/api/types.go b/pkg/promtail/api/types.go index 69efd0eb317d..f1a4aa87b390 100644 --- a/pkg/promtail/api/types.go +++ b/pkg/promtail/api/types.go @@ -1,48 +1,91 @@ package api import ( - "time" + "sync" "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/logproto" ) +// Entry is a log entry with labels. +type Entry struct { + Labels model.LabelSet + logproto.Entry +} + type InstrumentedEntryHandler interface { EntryHandler UnregisterLatencyMetric(labels model.LabelSet) } -// EntryHandler is something that can "handle" entries. +// EntryHandler is something that can "handle" entries via a channel. +// Stop must be called to gracefully shutdown the EntryHandler type EntryHandler interface { - Handle(labels model.LabelSet, time time.Time, entry string) error + Chan() chan<- Entry + Stop() } -// EntryHandlerFunc is modelled on http.HandlerFunc. -type EntryHandlerFunc func(labels model.LabelSet, time time.Time, entry string) error - -// Handle implements EntryHandler. -func (e EntryHandlerFunc) Handle(labels model.LabelSet, time time.Time, entry string) error { - return e(labels, time, entry) -} - -// EntryMiddleware is something that takes on EntryHandler and produces another. +// EntryMiddleware takes an EntryHandler and returns another one that will intercept and forward entries. +// The newly created EntryHandler should be Stopped independently from the original one. type EntryMiddleware interface { - Wrap(next EntryHandler) EntryHandler + Wrap(EntryHandler) EntryHandler } -// EntryMiddlewareFunc is modelled on http.HandlerFunc. -type EntryMiddlewareFunc func(next EntryHandler) EntryHandler +// EntryMiddlewareFunc allows to create EntryMiddleware via a function. +type EntryMiddlewareFunc func(EntryHandler) EntryHandler -// Wrap implements EntryMiddleware. func (e EntryMiddlewareFunc) Wrap(next EntryHandler) EntryHandler { return e(next) } +// EntryMutatorFunc is a function that can mutate an entry +type EntryMutatorFunc func(Entry) Entry + +type entryHandler struct { + stop func() + entries chan<- Entry +} + +func (e entryHandler) Chan() chan<- Entry { + return e.entries +} + +func (e entryHandler) Stop() { + e.stop() +} + +// NewEntryHandler creates a new EntryHandler using a input channel and a stop function. +func NewEntryHandler(entries chan<- Entry, stop func()) EntryHandler { + return entryHandler{ + stop: stop, + entries: entries, + } +} + +// NewEntryMutatorHandler creates a EntryHandler that mutates incoming entries from another EntryHandler. +func NewEntryMutatorHandler(next EntryHandler, f EntryMutatorFunc) EntryHandler { + in, wg, once := make(chan Entry), sync.WaitGroup{}, sync.Once{} + nextChan := next.Chan() + wg.Add(1) + go func() { + defer wg.Done() + for e := range in { + nextChan <- f(e) + } + }() + return NewEntryHandler(in, func() { + once.Do(func() { close(in) }) + wg.Wait() + }) +} + // AddLabelsMiddleware is an EntryMiddleware that adds some labels. 
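
To make the new contract concrete, a minimal sketch (illustrative only, not part of this changeset) of wiring two handlers together with NewEntryHandler and NewEntryMutatorHandler from the api package above; the main function, the printing sink and the "source" label are hypothetical.

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/prometheus/common/model"

	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/promtail/api"
)

func main() {
	// A terminal handler that just prints what it receives.
	in := make(chan api.Entry)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for e := range in {
			fmt.Println(e.Labels, e.Line)
		}
	}()
	sink := api.NewEntryHandler(in, func() { close(in); wg.Wait() })

	// Wrap it so every entry gets an extra label before being forwarded.
	h := api.NewEntryMutatorHandler(sink, func(e api.Entry) api.Entry {
		e.Labels = model.LabelSet{"source": "example"}.Merge(e.Labels)
		return e
	})

	h.Chan() <- api.Entry{
		Labels: model.LabelSet{"job": "demo"},
		Entry:  logproto.Entry{Timestamp: time.Now(), Line: "hello"},
	}
	h.Stop()    // stops the wrapper and waits for it to drain into the sink
	sink.Stop() // then stop the terminal handler
}
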
func AddLabelsMiddleware(additionalLabels model.LabelSet) EntryMiddleware { - return EntryMiddlewareFunc(func(next EntryHandler) EntryHandler { - return EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { - labels = additionalLabels.Merge(labels) // Add the additionalLabels but preserves the original labels. - return next.Handle(labels, time, entry) + return EntryMiddlewareFunc(func(eh EntryHandler) EntryHandler { + return NewEntryMutatorHandler(eh, func(e Entry) Entry { + e.Labels = additionalLabels.Merge(e.Labels) + return e }) }) } diff --git a/pkg/promtail/client/batch.go b/pkg/promtail/client/batch.go index 31853568623e..16bb23ebbe92 100644 --- a/pkg/promtail/client/batch.go +++ b/pkg/promtail/client/batch.go @@ -7,6 +7,7 @@ import ( "github.com/golang/snappy" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" ) // batch holds pending log streams waiting to be sent to Loki, and it's used @@ -19,7 +20,7 @@ type batch struct { createdAt time.Time } -func newBatch(entries ...entry) *batch { +func newBatch(entries ...api.Entry) *batch { b := &batch{ streams: map[string]*logproto.Stream{}, bytes: 0, @@ -35,11 +36,11 @@ func newBatch(entries ...entry) *batch { } // add an entry to the batch -func (b *batch) add(entry entry) { +func (b *batch) add(entry api.Entry) { b.bytes += len(entry.Line) // Append the entry to an already existing stream (if any) - labels := entry.labels.String() + labels := entry.Labels.String() if stream, ok := b.streams[labels]; ok { stream.Entries = append(stream.Entries, entry.Entry) return @@ -59,7 +60,7 @@ func (b *batch) sizeBytes() int { // sizeBytesAfter returns the size of the batch after the input entry // will be added to the batch itself -func (b *batch) sizeBytesAfter(entry entry) int { +func (b *batch) sizeBytesAfter(entry api.Entry) int { return b.bytes + len(entry.Line) } diff --git a/pkg/promtail/client/batch_test.go b/pkg/promtail/client/batch_test.go index d029c6c4c951..9cdee0fbda06 100644 --- a/pkg/promtail/client/batch_test.go +++ b/pkg/promtail/client/batch_test.go @@ -10,37 +10,38 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" ) func TestBatch_add(t *testing.T) { t.Parallel() tests := map[string]struct { - inputEntries []entry + inputEntries []api.Entry expectedSizeBytes int }{ "empty batch": { - inputEntries: []entry{}, + inputEntries: []api.Entry{}, expectedSizeBytes: 0, }, "single stream with single log entry": { - inputEntries: []entry{ - {"tenant", model.LabelSet{}, logEntries[0].Entry}, + inputEntries: []api.Entry{ + {Labels: model.LabelSet{}, Entry: logEntries[0].Entry}, }, expectedSizeBytes: len(logEntries[0].Entry.Line), }, "single stream with multiple log entries": { - inputEntries: []entry{ - {"tenant", model.LabelSet{}, logEntries[0].Entry}, - {"tenant", model.LabelSet{}, logEntries[1].Entry}, + inputEntries: []api.Entry{ + {Labels: model.LabelSet{}, Entry: logEntries[0].Entry}, + {Labels: model.LabelSet{}, Entry: logEntries[1].Entry}, }, expectedSizeBytes: len(logEntries[0].Entry.Line) + len(logEntries[1].Entry.Line), }, "multiple streams with multiple log entries": { - inputEntries: []entry{ - {"tenant", model.LabelSet{"type": "a"}, logEntries[0].Entry}, - {"tenant", model.LabelSet{"type": "a"}, logEntries[1].Entry}, - {"tenant", model.LabelSet{"type": "b"}, logEntries[2].Entry}, + inputEntries: []api.Entry{ + {Labels: model.LabelSet{"type": "a"}, Entry: logEntries[0].Entry}, + 
{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[1].Entry}, + {Labels: model.LabelSet{"type": "b"}, Entry: logEntries[2].Entry}, }, expectedSizeBytes: len(logEntries[0].Entry.Line) + len(logEntries[1].Entry.Line) + len(logEntries[2].Entry.Line), }, @@ -74,22 +75,22 @@ func TestBatch_encode(t *testing.T) { }, "single stream with single log entry": { inputBatch: newBatch( - entry{"tenant", model.LabelSet{}, logEntries[0].Entry}, + api.Entry{Labels: model.LabelSet{}, Entry: logEntries[0].Entry}, ), expectedEntriesCount: 1, }, "single stream with multiple log entries": { inputBatch: newBatch( - entry{"tenant", model.LabelSet{}, logEntries[0].Entry}, - entry{"tenant", model.LabelSet{}, logEntries[1].Entry}, + api.Entry{Labels: model.LabelSet{}, Entry: logEntries[0].Entry}, + api.Entry{Labels: model.LabelSet{}, Entry: logEntries[1].Entry}, ), expectedEntriesCount: 2, }, "multiple streams with multiple log entries": { inputBatch: newBatch( - entry{"tenant", model.LabelSet{"type": "a"}, logEntries[0].Entry}, - entry{"tenant", model.LabelSet{"type": "a"}, logEntries[1].Entry}, - entry{"tenant", model.LabelSet{"type": "b"}, logEntries[2].Entry}, + api.Entry{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[0].Entry}, + api.Entry{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[1].Entry}, + api.Entry{Labels: model.LabelSet{"type": "b"}, Entry: logEntries[2].Entry}, ), expectedEntriesCount: 3, }, @@ -120,8 +121,8 @@ func TestHashCollisions(t *testing.T) { const entriesPerLabel = 10 for i := 0; i < entriesPerLabel; i++ { - b.add(entry{labels: ls1, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}) - b.add(entry{labels: ls2, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}) + b.add(api.Entry{Labels: ls1, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}) + b.add(api.Entry{Labels: ls2, Entry: logproto.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}) } // make sure that colliding labels are stored properly as independent streams diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go index fdadb6a69e73..f59dea5146bc 100644 --- a/pkg/promtail/client/client.go +++ b/pkg/promtail/client/client.go @@ -27,7 +27,6 @@ import ( "github.com/prometheus/common/version" "github.com/grafana/loki/pkg/helpers" - "github.com/grafana/loki/pkg/logproto" ) const ( @@ -110,25 +109,19 @@ func init() { // Client pushes entries to Loki and can be stopped type Client interface { api.EntryHandler - // Stop goroutine sending batch of entries. - Stop() - // Stop goroutine sending batch of entries without retries. StopNow() } // Client for pushing logs in snappy-compressed protos over HTTP. type client struct { - logger log.Logger - cfg Config - client *http.Client - - // quit chan is depricated. Will be removed. Use `client.ctx` and `client.cancel` instead. - quit chan struct{} + logger log.Logger + cfg Config + client *http.Client + entries chan api.Entry - once sync.Once - entries chan entry - wg sync.WaitGroup + once sync.Once + wg sync.WaitGroup externalLabels model.LabelSet @@ -137,12 +130,6 @@ type client struct { cancel context.CancelFunc } -type entry struct { - tenantID string - labels model.LabelSet - logproto.Entry -} - // New makes a new Client. 
func New(cfg Config, logger log.Logger) (Client, error) { if cfg.URL.URL == nil { @@ -154,8 +141,7 @@ func New(cfg Config, logger log.Logger) (Client, error) { c := &client{ logger: log.With(logger, "component", "client", "host", cfg.URL.Host), cfg: cfg, - quit: make(chan struct{}), - entries: make(chan entry), + entries: make(chan api.Entry), externalLabels: cfg.ExternalLabels.LabelSet, ctx: ctx, @@ -213,24 +199,25 @@ func (c *client) run() { for { select { - case <-c.quit: - return - - case e := <-c.entries: - batch, ok := batches[e.tenantID] + case e, ok := <-c.entries: + if !ok { + return + } + e, tenantID := c.processEntry(e) + batch, ok := batches[tenantID] // If the batch doesn't exist yet, we create a new one with the entry if !ok { - batches[e.tenantID] = newBatch(e) + batches[tenantID] = newBatch(e) break } // If adding the entry to the batch will increase the size over the max // size allowed, we do send the current batch and then create a new one if batch.sizeBytesAfter(e) > c.cfg.BatchSize { - c.sendBatch(e.tenantID, batch) + c.sendBatch(tenantID, batch) - batches[e.tenantID] = newBatch(e) + batches[tenantID] = newBatch(e) break } @@ -251,6 +238,10 @@ func (c *client) run() { } } +func (c *client) Chan() chan<- api.Entry { + return c.entries +} + func (c *client) sendBatch(tenantID string, batch *batch) { buf, entriesCount, err := batch.encode() if err != nil { @@ -369,37 +360,24 @@ func (c *client) getTenantID(labels model.LabelSet) string { // Stop the client. func (c *client) Stop() { - c.once.Do(func() { close(c.quit) }) + c.once.Do(func() { close(c.entries) }) c.wg.Wait() } // StopNow stops the client without retries func (c *client) StopNow() { - // cancel any upstream calls made using client's `ctx`. + // cancel will stop retrying http requests. c.cancel() c.Stop() } -// Handle implement EntryHandler; adds a new line to the next batch; send is async. 
-func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error { +func (c *client) processEntry(e api.Entry) (api.Entry, string) { if len(c.externalLabels) > 0 { - ls = c.externalLabels.Merge(ls) + e.Labels = c.externalLabels.Merge(e.Labels) } - - // Get the tenant ID in case it has been overridden while processing - // the pipeline stages, then remove the special label - tenantID := c.getTenantID(ls) - if _, ok := ls[ReservedLabelTenantID]; ok { - // Clone the label set to not manipulate the input one - ls = ls.Clone() - delete(ls, ReservedLabelTenantID) - } - - c.entries <- entry{tenantID, ls, logproto.Entry{ - Timestamp: t, - Line: s, - }} - return nil + tenantID := c.getTenantID(e.Labels) + delete(e.Labels, ReservedLabelTenantID) + return e, tenantID } func (c *client) UnregisterLatencyMetric(labels model.LabelSet) { diff --git a/pkg/promtail/client/client_test.go b/pkg/promtail/client/client_test.go index 9384997cdfa4..321c356be313 100644 --- a/pkg/promtail/client/client_test.go +++ b/pkg/promtail/client/client_test.go @@ -20,17 +20,18 @@ import ( "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" lokiflag "github.com/grafana/loki/pkg/util/flagext" ) var ( - logEntries = []entry{ - {labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}}, - {labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}}, - {labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}}, - {labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}}, - {labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}}, - {labels: model.LabelSet{"__tenant_id__": "tenant-2"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}}, + logEntries = []api.Entry{ + {Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(1, 0).UTC(), Line: "line1"}}, + {Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(2, 0).UTC(), Line: "line2"}}, + {Labels: model.LabelSet{}, Entry: logproto.Entry{Timestamp: time.Unix(3, 0).UTC(), Line: "line3"}}, + {Labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(4, 0).UTC(), Line: "line4"}}, + {Labels: model.LabelSet{"__tenant_id__": "tenant-1"}, Entry: logproto.Entry{Timestamp: time.Unix(5, 0).UTC(), Line: "line5"}}, + {Labels: model.LabelSet{"__tenant_id__": "tenant-2"}, Entry: logproto.Entry{Timestamp: time.Unix(6, 0).UTC(), Line: "line6"}}, } ) @@ -46,7 +47,7 @@ func TestClient_Handle(t *testing.T) { clientMaxRetries int clientTenantID string serverResponseStatus int - inputEntries []entry + inputEntries []api.Entry inputDelay time.Duration expectedReqs []receivedReq expectedMetrics string @@ -56,7 +57,7 @@ func TestClient_Handle(t *testing.T) { clientBatchWait: 100 * time.Millisecond, clientMaxRetries: 3, serverResponseStatus: 200, - inputEntries: []entry{logEntries[0], logEntries[1], logEntries[2]}, + inputEntries: []api.Entry{logEntries[0], logEntries[1], logEntries[2]}, expectedReqs: []receivedReq{ { tenantID: "", @@ -81,7 +82,7 @@ func TestClient_Handle(t *testing.T) { clientBatchWait: 100 * time.Millisecond, clientMaxRetries: 3, serverResponseStatus: 200, - inputEntries: []entry{logEntries[0], logEntries[1]}, + inputEntries: []api.Entry{logEntries[0], logEntries[1]}, 
inputDelay: 110 * time.Millisecond, expectedReqs: []receivedReq{ { @@ -107,7 +108,7 @@ func TestClient_Handle(t *testing.T) { clientBatchWait: 10 * time.Millisecond, clientMaxRetries: 3, serverResponseStatus: 500, - inputEntries: []entry{logEntries[0]}, + inputEntries: []api.Entry{logEntries[0]}, expectedReqs: []receivedReq{ { tenantID: "", @@ -136,7 +137,7 @@ func TestClient_Handle(t *testing.T) { clientBatchWait: 10 * time.Millisecond, clientMaxRetries: 3, serverResponseStatus: 400, - inputEntries: []entry{logEntries[0]}, + inputEntries: []api.Entry{logEntries[0]}, expectedReqs: []receivedReq{ { tenantID: "", @@ -157,7 +158,7 @@ func TestClient_Handle(t *testing.T) { clientBatchWait: 10 * time.Millisecond, clientMaxRetries: 3, serverResponseStatus: 429, - inputEntries: []entry{logEntries[0]}, + inputEntries: []api.Entry{logEntries[0]}, expectedReqs: []receivedReq{ { tenantID: "", @@ -187,7 +188,7 @@ func TestClient_Handle(t *testing.T) { clientMaxRetries: 3, clientTenantID: "tenant-default", serverResponseStatus: 200, - inputEntries: []entry{logEntries[0], logEntries[1]}, + inputEntries: []api.Entry{logEntries[0], logEntries[1]}, expectedReqs: []receivedReq{ { tenantID: "tenant-default", @@ -209,7 +210,7 @@ func TestClient_Handle(t *testing.T) { clientMaxRetries: 3, clientTenantID: "tenant-default", serverResponseStatus: 200, - inputEntries: []entry{logEntries[0], logEntries[3], logEntries[4], logEntries[5]}, + inputEntries: []api.Entry{logEntries[0], logEntries[3], logEntries[4], logEntries[5]}, expectedReqs: []receivedReq{ { tenantID: "tenant-default", @@ -271,8 +272,7 @@ func TestClient_Handle(t *testing.T) { // Send all the input log entries for i, logEntry := range testData.inputEntries { - err = c.Handle(logEntry.labels, logEntry.Timestamp, logEntry.Line) - require.NoError(t, err) + c.Chan() <- logEntry if testData.inputDelay > 0 && i < len(testData.inputEntries)-1 { time.Sleep(testData.inputDelay) @@ -315,7 +315,7 @@ func TestClient_StopNow(t *testing.T) { clientMaxRetries int clientTenantID string serverResponseStatus int - inputEntries []entry + inputEntries []api.Entry inputDelay time.Duration expectedReqs []receivedReq expectedMetrics string @@ -326,7 +326,7 @@ func TestClient_StopNow(t *testing.T) { clientBatchWait: 100 * time.Millisecond, clientMaxRetries: 3, serverResponseStatus: 200, - inputEntries: []entry{logEntries[0], logEntries[1], logEntries[2]}, + inputEntries: []api.Entry{logEntries[0], logEntries[1], logEntries[2]}, expectedReqs: []receivedReq{ { tenantID: "", @@ -352,7 +352,7 @@ func TestClient_StopNow(t *testing.T) { clientBatchWait: 10 * time.Millisecond, clientMaxRetries: 3, serverResponseStatus: 429, - inputEntries: []entry{logEntries[0]}, + inputEntries: []api.Entry{logEntries[0]}, expectedReqs: []receivedReq{ { tenantID: "", @@ -406,8 +406,7 @@ func TestClient_StopNow(t *testing.T) { // Send all the input log entries for i, logEntry := range c.inputEntries { - err = cl.Handle(logEntry.labels, logEntry.Timestamp, logEntry.Line) - require.NoError(t, err) + cl.Chan() <- logEntry if c.inputDelay > 0 && i < len(c.inputEntries)-1 { time.Sleep(c.inputDelay) diff --git a/pkg/promtail/client/fake/client.go b/pkg/promtail/client/fake/client.go index ab7bd1686c00..80f440aecda7 100644 --- a/pkg/promtail/client/fake/client.go +++ b/pkg/promtail/client/fake/client.go @@ -1,30 +1,58 @@ package fake import ( - "time" - - "github.com/prometheus/common/model" + "sync" "github.com/grafana/loki/pkg/promtail/api" ) // Client is a fake client used for testing. 
type Client struct { - OnHandleEntry api.EntryHandlerFunc - OnStop func() + entries chan api.Entry + received []api.Entry + once sync.Once + mtx sync.Mutex + wg sync.WaitGroup + OnStop func() +} + +func New(stop func()) *Client { + c := &Client{ + OnStop: stop, + entries: make(chan api.Entry), + } + c.wg.Add(1) + go func() { + defer c.wg.Done() + for e := range c.entries { + c.mtx.Lock() + c.received = append(c.received, e) + c.mtx.Unlock() + } + }() + return c } // Stop implements client.Client func (c *Client) Stop() { + c.once.Do(func() { close(c.entries) }) + c.wg.Wait() c.OnStop() } -// StopNow implements client.Client -func (c *Client) StopNow() { - c.OnStop() +func (c *Client) Chan() chan<- api.Entry { + return c.entries +} + +func (c *Client) Received() []api.Entry { + c.mtx.Lock() + defer c.mtx.Unlock() + cpy := make([]api.Entry, len(c.received)) + copy(cpy, c.received) + return cpy } -// Handle implements client.Client -func (c *Client) Handle(labels model.LabelSet, time time.Time, entry string) error { - return c.OnHandleEntry.Handle(labels, time, entry) +// StopNow implements client.Client +func (c *Client) StopNow() { + c.Stop() } diff --git a/pkg/promtail/client/logger.go b/pkg/promtail/client/logger.go index dc164fa1141b..9a8f960179d4 100644 --- a/pkg/promtail/client/logger.go +++ b/pkg/promtail/client/logger.go @@ -6,13 +6,12 @@ import ( "runtime" "sync" "text/tabwriter" - "time" "github.com/fatih/color" "github.com/go-kit/kit/log" - "github.com/prometheus/common/model" "gopkg.in/yaml.v2" + "github.com/grafana/loki/pkg/promtail/api" lokiflag "github.com/grafana/loki/pkg/util/flagext" ) @@ -31,6 +30,9 @@ func init() { type logger struct { *tabwriter.Writer sync.Mutex + entries chan api.Entry + + once sync.Once } // NewLogger creates a new client logger that logs entries instead of sending them. 
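
For reference, a minimal sketch (illustrative only, not part of this changeset) of how a test is expected to use the reworked fake client with the channel-based API shown above; the test name is hypothetical.

package fake_test

import (
	"testing"
	"time"

	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/promtail/api"
	"github.com/grafana/loki/pkg/promtail/client/fake"
)

func TestFakeClientSketch(t *testing.T) {
	c := fake.New(func() {}) // the callback runs once the client is stopped

	// Producers push entries through the channel instead of calling Handle.
	c.Chan() <- api.Entry{
		Labels: model.LabelSet{"job": "demo"},
		Entry:  logproto.Entry{Timestamp: time.Now(), Line: "line1"},
	}

	// Stop closes the channel and waits for the receive loop, so Received()
	// is complete and race-free afterwards.
	c.Stop()
	require.Len(t, c.Received(), 1)
	require.Equal(t, "line1", c.Received()[0].Line)
}
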
@@ -51,24 +53,33 @@ func NewLogger(log log.Logger, externalLabels lokiflag.LabelSet, cfgs ...Config) fmt.Println("----------------------") fmt.Println(string(yaml)) } - return &logger{ - Writer: tabwriter.NewWriter(os.Stdout, 0, 8, 0, '\t', 0), - }, nil + entries := make(chan api.Entry) + l := &logger{ + Writer: tabwriter.NewWriter(os.Stdout, 0, 8, 0, '\t', 0), + entries: entries, + } + go l.run() + return l, nil +} + +func (l *logger) Stop() { + l.once.Do(func() { close(l.entries) }) } -func (*logger) Stop() {} +func (l *logger) Chan() chan<- api.Entry { + return l.entries +} -func (*logger) StopNow() {} +func (l *logger) run() { + for e := range l.entries { + fmt.Fprint(l.Writer, blue.Sprint(e.Timestamp.Format("2006-01-02T15:04:05"))) + fmt.Fprint(l.Writer, "\t") + fmt.Fprint(l.Writer, yellow.Sprint(e.Labels.String())) + fmt.Fprint(l.Writer, "\t") + fmt.Fprint(l.Writer, e.Line) + fmt.Fprint(l.Writer, "\n") + l.Flush() + } -func (l *logger) Handle(labels model.LabelSet, time time.Time, entry string) error { - l.Lock() - defer l.Unlock() - fmt.Fprint(l.Writer, blue.Sprint(time.Format("2006-01-02T15:04:05"))) - fmt.Fprint(l.Writer, "\t") - fmt.Fprint(l.Writer, yellow.Sprint(labels.String())) - fmt.Fprint(l.Writer, "\t") - fmt.Fprint(l.Writer, entry) - fmt.Fprint(l.Writer, "\n") - l.Flush() - return nil } +func (l *logger) StopNow() { l.Stop() } diff --git a/pkg/promtail/client/logger_test.go b/pkg/promtail/client/logger_test.go index ae2f1fd5d696..72f17a4c7ba4 100644 --- a/pkg/promtail/client/logger_test.go +++ b/pkg/promtail/client/logger_test.go @@ -10,6 +10,8 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/util/flagext" ) @@ -19,6 +21,7 @@ func TestNewLogger(t *testing.T) { l, err := NewLogger(util.Logger, flagext.LabelSet{}, []Config{{URL: cortexflag.URLValue{URL: &url.URL{Host: "string"}}}}...) require.NoError(t, err) - err = l.Handle(model.LabelSet{"foo": "bar"}, time.Now(), "entry") - require.NoError(t, err) + l.Chan() <- api.Entry{Labels: model.LabelSet{"foo": "bar"}, Entry: logproto.Entry{Timestamp: time.Now(), Line: "entry"}} + l.Stop() + } diff --git a/pkg/promtail/client/multi.go b/pkg/promtail/client/multi.go index 0e8720ca5da3..9f962c3fd2e6 100644 --- a/pkg/promtail/client/multi.go +++ b/pkg/promtail/client/multi.go @@ -2,17 +2,22 @@ package client import ( "errors" - "time" + "sync" "github.com/go-kit/kit/log" - "github.com/prometheus/common/model" - "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/util/flagext" ) // MultiClient is client pushing to one or more loki instances. 
-type MultiClient []Client +type MultiClient struct { + clients []Client + entries chan api.Entry + wg sync.WaitGroup + + once sync.Once +} // NewMulti creates a new client func NewMulti(logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config) (Client, error) { @@ -35,30 +40,42 @@ func NewMulti(logger log.Logger, externalLabels flagext.LabelSet, cfgs ...Config } clients = append(clients, client) } - return MultiClient(clients), nil + multi := &MultiClient{ + clients: clients, + entries: make(chan api.Entry), + } + multi.start() + return multi, nil } -// Handle Implements api.EntryHandler -func (m MultiClient) Handle(labels model.LabelSet, time time.Time, entry string) error { - var result util.MultiError - for _, client := range m { - if err := client.Handle(labels, time, entry); err != nil { - result.Add(err) +func (m *MultiClient) start() { + m.wg.Add(1) + go func() { + defer m.wg.Done() + for e := range m.entries { + for _, c := range m.clients { + c.Chan() <- e + } } - } - return result.Err() + }() +} + +func (m *MultiClient) Chan() chan<- api.Entry { + return m.entries } // Stop implements Client -func (m MultiClient) Stop() { - for _, c := range m { +func (m *MultiClient) Stop() { + m.once.Do(func() { close(m.entries) }) + m.wg.Wait() + for _, c := range m.clients { c.Stop() } } // StopNow implements Client -func (m MultiClient) StopNow() { - for _, c := range m { +func (m *MultiClient) StopNow() { + for _, c := range m.clients { c.StopNow() } } diff --git a/pkg/promtail/client/multi_test.go b/pkg/promtail/client/multi_test.go index b5e1fbb50ea0..980c7ed7ee90 100644 --- a/pkg/promtail/client/multi_test.go +++ b/pkg/promtail/client/multi_test.go @@ -1,7 +1,6 @@ package client import ( - "errors" "net/url" "reflect" "testing" @@ -11,6 +10,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/prometheus/common/model" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/promtail/api" lokiflag "github.com/grafana/loki/pkg/util/flagext" @@ -41,11 +41,11 @@ func TestNewMulti(t *testing.T) { if err != nil { t.Fatalf("expected err: nil got:%v", err) } - multi := clients.(MultiClient) - if len(multi) != 2 { - t.Fatalf("expected client: 2 got:%d", len(multi)) + multi := clients.(*MultiClient) + if len(multi.clients) != 2 { + t.Fatalf("expected client: 2 got:%d", len(multi.clients)) } - actualCfg1 := clients.(MultiClient)[0].(*client).cfg + actualCfg1 := clients.(*MultiClient).clients[0].(*client).cfg // Yaml should overried the command line so 'order: yaml' should be expected expectedCfg1 := Config{ BatchSize: 20, @@ -58,7 +58,7 @@ func TestNewMulti(t *testing.T) { t.Fatalf("expected cfg: %v got:%v", expectedCfg1, actualCfg1) } - actualCfg2 := clients.(MultiClient)[1].(*client).cfg + actualCfg2 := clients.(*MultiClient).clients[1].(*client).cfg // No overlapping label keys so both should be in the output expectedCfg2 := Config{ BatchSize: 10, @@ -83,10 +83,13 @@ func TestMultiClient_Stop(t *testing.T) { stopping := func() { stopped++ } - fc := &fake.Client{OnStop: stopping} + fc := fake.New(stopping) clients := []Client{fc, fc, fc, fc} - m := MultiClient(clients) - + m := &MultiClient{ + clients: clients, + entries: make(chan api.Entry), + } + m.start() m.Stop() if stopped != len(clients) { @@ -96,39 +99,20 @@ func TestMultiClient_Stop(t *testing.T) { func TestMultiClient_Handle(t *testing.T) { - var called int - - errorFn := api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { called++; return errors.New("") 
}) - okFn := api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error { called++; return nil }) - - errfc := &fake.Client{OnHandleEntry: errorFn} - okfc := &fake.Client{OnHandleEntry: okFn} - t.Run("some error", func(t *testing.T) { - clients := []Client{okfc, errfc, okfc, errfc, errfc, okfc} - m := MultiClient(clients) - - if err := m.Handle(nil, time.Now(), ""); err == nil { - t.Fatal("expected err got nil") - } - - if called != len(clients) { - t.Fatal("missing handle call") - } - - }) - t.Run("no error", func(t *testing.T) { - called = 0 - clients := []Client{okfc, okfc, okfc, okfc, okfc, okfc} - m := MultiClient(clients) + f := fake.New(func() {}) + clients := []Client{f, f, f, f, f, f} + m := &MultiClient{ + clients: clients, + entries: make(chan api.Entry), + } + m.start() - if err := m.Handle(nil, time.Now(), ""); err != nil { - t.Fatal("expected err to be nil") - } + m.Chan() <- api.Entry{Labels: model.LabelSet{"foo": "bar"}, Entry: logproto.Entry{Line: "foo"}} - if called != len(clients) { - t.Fatal("missing handle call") - } + m.Stop() - }) + if len(f.Received()) != len(clients) { + t.Fatal("missing handle call") + } } diff --git a/pkg/promtail/promtail.go b/pkg/promtail/promtail.go index fe02183b1eb0..82a6859cd6e6 100644 --- a/pkg/promtail/promtail.go +++ b/pkg/promtail/promtail.go @@ -114,5 +114,6 @@ func (p *Promtail) Shutdown() { if p.targetManagers != nil { p.targetManagers.Stop() } + // todo work out the stop. p.client.Stop() } diff --git a/pkg/promtail/promtail_test.go b/pkg/promtail/promtail_test.go index b0a4c311a80c..da148086dd8d 100644 --- a/pkg/promtail/promtail_test.go +++ b/pkg/promtail/promtail_test.go @@ -656,5 +656,5 @@ func Test_DryRun(t *testing.T) { }, }, false) require.NoError(t, err) - require.IsType(t, client.MultiClient{}, p.client) + require.IsType(t, &client.MultiClient{}, p.client) } diff --git a/pkg/promtail/targets/file/filetarget.go b/pkg/promtail/targets/file/filetarget.go index 3809ad319b59..f22170f26dbd 100644 --- a/pkg/promtail/targets/file/filetarget.go +++ b/pkg/promtail/targets/file/filetarget.go @@ -134,6 +134,7 @@ func (t *FileTarget) Ready() bool { func (t *FileTarget) Stop() { close(t.quit) <-t.done + t.handler.Stop() } // Type implements a Target diff --git a/pkg/promtail/targets/file/filetarget_test.go b/pkg/promtail/targets/file/filetarget_test.go index 9dd29af6e306..15725a57ab70 100644 --- a/pkg/promtail/targets/file/filetarget_test.go +++ b/pkg/promtail/targets/file/filetarget_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/kit/log" "gopkg.in/yaml.v2" + "github.com/grafana/loki/pkg/promtail/client/fake" "github.com/grafana/loki/pkg/promtail/positions" "github.com/grafana/loki/pkg/promtail/targets/testutils" ) @@ -41,10 +42,8 @@ func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) { t.Fatal(err) } - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) + defer client.Stop() f, err := os.Create(logFile) if err != nil { @@ -67,7 +66,7 @@ func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) { } countdown := 10000 - for len(client.Messages) != 10 && countdown > 0 { + for len(client.Received()) != 10 && countdown > 0 { time.Sleep(1 * time.Millisecond) countdown-- } @@ -94,13 +93,13 @@ func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) { } // Assert the number of messages the handler received is correct. 
- if len(client.Messages) != 10 { - t.Error("Handler did not receive the correct number of messages, expected 10 received", len(client.Messages)) + if len(client.Received()) != 10 { + t.Error("Handler did not receive the correct number of messages, expected 10 received", len(client.Received())) } // Spot check one of the messages. - if client.Messages[0].Log != "test" { - t.Error("Expected first log message to be 'test' but was", client.Messages[0]) + if client.Received()[0].Line != "test" { + t.Error("Expected first log message to be 'test' but was", client.Received()[0]) } } @@ -134,10 +133,8 @@ func TestWatchEntireDirectory(t *testing.T) { t.Fatal(err) } - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) + defer client.Stop() f, err := os.Create(logFileDir + "test.log") if err != nil { @@ -160,7 +157,7 @@ func TestWatchEntireDirectory(t *testing.T) { } countdown := 10000 - for len(client.Messages) != 10 && countdown > 0 { + for len(client.Received()) != 10 && countdown > 0 { time.Sleep(1 * time.Millisecond) countdown-- } @@ -187,13 +184,13 @@ func TestWatchEntireDirectory(t *testing.T) { } // Assert the number of messages the handler received is correct. - if len(client.Messages) != 10 { - t.Error("Handler did not receive the correct number of messages, expected 10 received", len(client.Messages)) + if len(client.Received()) != 10 { + t.Error("Handler did not receive the correct number of messages, expected 10 received", len(client.Received())) } // Spot check one of the messages. - if client.Messages[0].Log != "test" { - t.Error("Expected first log message to be 'test' but was", client.Messages[0]) + if client.Received()[0].Line != "test" { + t.Error("Expected first log message to be 'test' but was", client.Received()[0]) } } @@ -223,10 +220,8 @@ func TestFileRolls(t *testing.T) { t.Fatal(err) } - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) + defer client.Stop() f, err := os.Create(logFile) if err != nil { @@ -249,7 +244,7 @@ func TestFileRolls(t *testing.T) { } countdown := 10000 - for len(client.Messages) != 10 && countdown > 0 { + for len(client.Received()) != 10 && countdown > 0 { time.Sleep(1 * time.Millisecond) countdown-- } @@ -273,7 +268,7 @@ func TestFileRolls(t *testing.T) { } countdown = 10000 - for len(client.Messages) != 20 && countdown > 0 { + for len(client.Received()) != 20 && countdown > 0 { time.Sleep(1 * time.Millisecond) countdown-- } @@ -281,18 +276,18 @@ func TestFileRolls(t *testing.T) { target.Stop() positions.Stop() - if len(client.Messages) != 20 { - t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.Messages)) + if len(client.Received()) != 20 { + t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.Received())) } // Spot check one of the messages. - if client.Messages[0].Log != "test1" { - t.Error("Expected first log message to be 'test1' but was", client.Messages[0]) + if client.Received()[0].Line != "test1" { + t.Error("Expected first log message to be 'test1' but was", client.Received()[0]) } // Spot check the first message from the second file. 
- if client.Messages[10].Log != "test2" { - t.Error("Expected first log message to be 'test2' but was", client.Messages[10]) + if client.Received()[10].Line != "test2" { + t.Error("Expected first log message to be 'test2' but was", client.Received()[10]) } } @@ -321,10 +316,8 @@ func TestResumesWhereLeftOff(t *testing.T) { t.Fatal(err) } - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) + defer client.Stop() f, err := os.Create(logFile) if err != nil { @@ -347,7 +340,7 @@ func TestResumesWhereLeftOff(t *testing.T) { } countdown := 10000 - for len(client.Messages) != 10 && countdown > 0 { + for len(client.Received()) != 10 && countdown > 0 { time.Sleep(1 * time.Millisecond) countdown-- } @@ -381,7 +374,7 @@ func TestResumesWhereLeftOff(t *testing.T) { } countdown = 10000 - for len(client.Messages) != 20 && countdown > 0 { + for len(client.Received()) != 20 && countdown > 0 { time.Sleep(1 * time.Millisecond) countdown-- } @@ -389,18 +382,18 @@ func TestResumesWhereLeftOff(t *testing.T) { target2.Stop() ps2.Stop() - if len(client.Messages) != 20 { - t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.Messages)) + if len(client.Received()) != 20 { + t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.Received())) } // Spot check one of the messages. - if client.Messages[0].Log != "test1" { - t.Error("Expected first log message to be 'test1' but was", client.Messages[0]) + if client.Received()[0].Line != "test1" { + t.Error("Expected first log message to be 'test1' but was", client.Received()[0]) } // Spot check the first message from the second file. - if client.Messages[10].Log != "test2" { - t.Error("Expected first log message to be 'test2' but was", client.Messages[10]) + if client.Received()[10].Line != "test2" { + t.Error("Expected first log message to be 'test2' but was", client.Received()[10]) } } @@ -430,10 +423,8 @@ func TestGlobWithMultipleFiles(t *testing.T) { t.Fatal(err) } - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) + defer client.Stop() f1, err := os.Create(logFile1) if err != nil { @@ -466,7 +457,7 @@ func TestGlobWithMultipleFiles(t *testing.T) { } countdown := 10000 - for len(client.Messages) != 20 && countdown > 0 { + for len(client.Received()) != 20 && countdown > 0 { time.Sleep(1 * time.Millisecond) countdown-- } @@ -500,8 +491,8 @@ func TestGlobWithMultipleFiles(t *testing.T) { } // Assert the number of messages the handler received is correct. 
- if len(client.Messages) != 20 { - t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.Messages)) + if len(client.Received()) != 20 { + t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.Received())) } } @@ -533,10 +524,8 @@ func TestFileTargetSync(t *testing.T) { t.Fatal(err) } - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) + defer client.Stop() target, err := NewFileTarget(logger, client, ps, logDir1+"/*.log", nil, nil, &Config{ SyncPeriod: 10 * time.Second, diff --git a/pkg/promtail/targets/file/filetargetmanager.go b/pkg/promtail/targets/file/filetargetmanager.go index c513bb7e9f08..4eaf98c3af39 100644 --- a/pkg/promtail/targets/file/filetargetmanager.go +++ b/pkg/promtail/targets/file/filetargetmanager.go @@ -323,6 +323,7 @@ func (s *targetSyncer) stop() { target.Stop() delete(s.targets, key) } + s.entryHandler.Stop() } func hostname() (string, error) { diff --git a/pkg/promtail/targets/file/tailer.go b/pkg/promtail/targets/file/tailer.go index 25d82f03c266..38fd94da7a28 100644 --- a/pkg/promtail/targets/file/tailer.go +++ b/pkg/promtail/targets/file/tailer.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/common/model" "go.uber.org/atomic" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/positions" "github.com/grafana/loki/pkg/util" @@ -131,7 +132,7 @@ func (t *tailer) readLines() { level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path) close(t.done) }() - + entries := t.handler.Chan() for { line, ok := <-t.tail.Lines if !ok { @@ -147,8 +148,12 @@ func (t *tailer) readLines() { readLines.WithLabelValues(t.path).Inc() logLengthHistogram.WithLabelValues(t.path).Observe(float64(len(line.Text))) - if err := t.handler.Handle(model.LabelSet{}, line.Time, line.Text); err != nil { - level.Error(t.logger).Log("msg", "tail routine: error handling line", "path", t.path, "error", err) + entries <- api.Entry{ + Labels: model.LabelSet{}, + Entry: logproto.Entry{ + Timestamp: line.Time, + Line: line.Text, + }, } } @@ -202,6 +207,7 @@ func (t *tailer) stop() { // Wait for readLines() to consume all the remaining messages and exit when the channel is closed <-t.done level.Info(t.logger).Log("msg", "stopped tailing file", "path", t.path) + t.handler.Stop() }) } diff --git a/pkg/promtail/targets/journal/journaltarget.go b/pkg/promtail/targets/journal/journaltarget.go index 84f3989ff20b..de303d3b35ae 100644 --- a/pkg/promtail/targets/journal/journaltarget.go +++ b/pkg/promtail/targets/journal/journaltarget.go @@ -9,24 +9,20 @@ import ( "strings" "time" + "github.com/coreos/go-systemd/sdjournal" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" - "github.com/go-kit/kit/log/level" - + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/positions" - "github.com/grafana/loki/pkg/promtail/targets/target" - - "github.com/go-kit/kit/log" - "github.com/grafana/loki/pkg/promtail/scrapeconfig" - - "github.com/coreos/go-systemd/sdjournal" - "github.com/pkg/errors" - "github.com/prometheus/common/model" - - "github.com/grafana/loki/pkg/promtail/api" + 
"github.com/grafana/loki/pkg/promtail/targets/target" ) const ( @@ -294,8 +290,14 @@ func (t *JournalTarget) formatter(entry *sdjournal.JournalEntry) (string, error) } t.positions.PutString(t.positionPath, entry.Cursor) - err := t.handler.Handle(labels, ts, msg) - return journalEmptyStr, err + t.handler.Chan() <- api.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Line: msg, + Timestamp: ts, + }, + } + return journalEmptyStr, nil } // Type returns JournalTargetType. @@ -332,7 +334,9 @@ func (t *JournalTarget) Details() interface{} { // Stop shuts down the JournalTarget. func (t *JournalTarget) Stop() error { t.until <- time.Now() - return t.r.Close() + err := t.r.Close() + t.handler.Stop() + return err } func makeJournalFields(fields map[string]string) map[string]string { diff --git a/pkg/promtail/targets/journal/journaltarget_test.go b/pkg/promtail/targets/journal/journaltarget_test.go index 6aed7075438e..67ec2dd0abfc 100644 --- a/pkg/promtail/targets/journal/journaltarget_test.go +++ b/pkg/promtail/targets/journal/journaltarget_test.go @@ -3,27 +3,22 @@ package journal import ( - "fmt" "io" "os" "testing" "time" "github.com/coreos/go-systemd/sdjournal" - - "gopkg.in/yaml.v2" - + "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/pkg/relabel" - "github.com/stretchr/testify/assert" - - "github.com/grafana/loki/pkg/promtail/scrapeconfig" - "github.com/grafana/loki/pkg/promtail/targets/testutils" - - "github.com/go-kit/kit/log" "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + "github.com/grafana/loki/pkg/promtail/client/fake" "github.com/grafana/loki/pkg/promtail/positions" + "github.com/grafana/loki/pkg/promtail/scrapeconfig" + "github.com/grafana/loki/pkg/promtail/targets/testutils" ) type mockJournalReader struct { @@ -86,10 +81,7 @@ func TestJournalTarget(t *testing.T) { t.Fatal(err) } - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) relabelCfg := ` - source_labels: ['__journal_code_file'] @@ -115,9 +107,9 @@ func TestJournalTarget(t *testing.T) { }) assert.NoError(t, err) } - fmt.Println(client.Messages) - assert.Len(t, client.Messages, 10) require.NoError(t, jt.Stop()) + client.Stop() + assert.Len(t, client.Received(), 10) } func TestJournalTarget_JSON(t *testing.T) { @@ -139,10 +131,7 @@ func TestJournalTarget_JSON(t *testing.T) { t.Fatal(err) } - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) relabelCfg := ` - source_labels: ['__journal_code_file'] @@ -171,14 +160,16 @@ func TestJournalTarget_JSON(t *testing.T) { }) assert.NoError(t, err) - expectMsg := `{"CODE_FILE":"journaltarget_test.go","MESSAGE":"ping","OTHER_FIELD":"foobar"}` + } + expectMsg := `{"CODE_FILE":"journaltarget_test.go","MESSAGE":"ping","OTHER_FIELD":"foobar"}` + require.NoError(t, jt.Stop()) + client.Stop() - require.Greater(t, len(client.Messages), 0) - require.Equal(t, expectMsg, client.Messages[len(client.Messages)-1].Log) + assert.Len(t, client.Received(), 10) + for i := 0; i < 10; i++ { + require.Equal(t, expectMsg, client.Received()[i].Line) } - assert.Len(t, client.Messages, 10) - require.NoError(t, jt.Stop()) } func TestJournalTarget_Since(t *testing.T) { @@ -200,10 +191,7 @@ func TestJournalTarget_Since(t *testing.T) { t.Fatal(err) } - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) cfg := scrapeconfig.JournalTargetConfig{ MaxAge: "4h", @@ -215,6 
+203,7 @@ func TestJournalTarget_Since(t *testing.T) { r := jt.r.(*mockJournalReader) require.Equal(t, r.config.Since, -1*time.Hour*4) + client.Stop() } func TestJournalTarget_Cursor_TooOld(t *testing.T) { @@ -237,10 +226,7 @@ func TestJournalTarget_Cursor_TooOld(t *testing.T) { } ps.PutString("journal-test", "foobar") - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) cfg := scrapeconfig.JournalTargetConfig{} @@ -257,6 +243,7 @@ func TestJournalTarget_Cursor_TooOld(t *testing.T) { r := jt.r.(*mockJournalReader) require.Equal(t, r.config.Since, -1*time.Hour*7) + client.Stop() } func TestJournalTarget_Cursor_NotTooOld(t *testing.T) { @@ -279,10 +266,7 @@ func TestJournalTarget_Cursor_NotTooOld(t *testing.T) { } ps.PutString("journal-test", "foobar") - client := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + client := fake.New(func() {}) cfg := scrapeconfig.JournalTargetConfig{} @@ -300,6 +284,7 @@ func TestJournalTarget_Cursor_NotTooOld(t *testing.T) { r := jt.r.(*mockJournalReader) require.Equal(t, r.config.Since, time.Duration(0)) require.Equal(t, r.config.Cursor, "foobar") + client.Stop() } func Test_MakeJournalFields(t *testing.T) { diff --git a/pkg/promtail/targets/lokipush/pushtarget.go b/pkg/promtail/targets/lokipush/pushtarget.go index dc91720f39b8..fce793dcebe5 100644 --- a/pkg/promtail/targets/lokipush/pushtarget.go +++ b/pkg/promtail/targets/lokipush/pushtarget.go @@ -16,6 +16,7 @@ import ( "github.com/weaveworks/common/server" "github.com/grafana/loki/pkg/distributor" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/scrapeconfig" @@ -139,17 +140,18 @@ func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) { } for _, entry := range stream.Entries { - var err error + e := api.Entry{ + Labels: filtered.Clone(), + Entry: logproto.Entry{ + Line: entry.Line, + }, + } if t.config.KeepTimestamp { - err = t.handler.Handle(filtered.Clone(), entry.Timestamp, entry.Line) + e.Timestamp = entry.Timestamp } else { - err = t.handler.Handle(filtered.Clone(), time.Now(), entry.Line) - } - - if err != nil { - lastErr = err - continue + e.Timestamp = time.Now() } + t.handler.Chan() <- e } } @@ -193,5 +195,6 @@ func (t *PushTarget) Details() interface{} { func (t *PushTarget) Stop() error { level.Info(t.logger).Log("msg", "stopping push server", "job", t.jobName) t.server.Shutdown() + t.handler.Stop() return nil } diff --git a/pkg/promtail/targets/lokipush/pushtarget_test.go b/pkg/promtail/targets/lokipush/pushtarget_test.go index c464ab9ab29b..bf95e10f9fb7 100644 --- a/pkg/promtail/targets/lokipush/pushtarget_test.go +++ b/pkg/promtail/targets/lokipush/pushtarget_test.go @@ -15,9 +15,11 @@ import ( "github.com/stretchr/testify/require" "github.com/weaveworks/common/server" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/client" + "github.com/grafana/loki/pkg/promtail/client/fake" "github.com/grafana/loki/pkg/promtail/scrapeconfig" - "github.com/grafana/loki/pkg/promtail/targets/testutils" ) func TestPushTarget(t *testing.T) { @@ -25,10 +27,8 @@ func TestPushTarget(t *testing.T) { logger := log.NewLogfmtLogger(w) //Create PushTarget - eh := &testutils.TestClient{ - Log: logger, - Messages: make([]*testutils.Entry, 0), - } + eh := fake.New(func() {}) + defer eh.Stop() // Get a randomly 
available port by open and closing a TCP socket addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0") @@ -79,6 +79,7 @@ func TestPushTarget(t *testing.T) { } pc, err := client.New(ccfg, logger) require.NoError(t, err) + defer pc.Stop() // Send some logs labels := model.LabelSet{ @@ -86,19 +87,24 @@ func TestPushTarget(t *testing.T) { "__anotherdroplabel": "dropme", } for i := 0; i < 100; i++ { - err := pc.Handle(labels, time.Unix(int64(i), 0), "line"+strconv.Itoa(i)) - require.NoError(t, err) + pc.Chan() <- api.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: "line" + strconv.Itoa(i), + }, + } } // Wait for them to appear in the test handler countdown := 10000 - for len(eh.Messages) != 100 && countdown > 0 { + for len(eh.Received()) != 100 && countdown > 0 { time.Sleep(1 * time.Millisecond) countdown-- } // Make sure we didn't timeout - require.Equal(t, 100, len(eh.Messages)) + require.Equal(t, 100, len(eh.Received())) // Verify labels expectedLabels := model.LabelSet{ @@ -106,10 +112,10 @@ func TestPushTarget(t *testing.T) { "stream": "stream1", } // Spot check the first value in the result to make sure relabel rules were applied properly - require.Equal(t, expectedLabels, eh.Messages[0].Labels) + require.Equal(t, expectedLabels, eh.Received()[0].Labels) // With keep timestamp enabled, verify timestamp - require.Equal(t, time.Unix(99, 0).Unix(), eh.Messages[99].Time.Unix()) + require.Equal(t, time.Unix(99, 0).Unix(), eh.Received()[99].Timestamp.Unix()) _ = pt.Stop() diff --git a/pkg/promtail/targets/stdin/stdin_target_manager.go b/pkg/promtail/targets/stdin/stdin_target_manager.go index 2425b22b929e..f19cd5bae420 100644 --- a/pkg/promtail/targets/stdin/stdin_target_manager.go +++ b/pkg/promtail/targets/stdin/stdin_target_manager.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/grafana/loki/pkg/logentry/stages" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/scrapeconfig" "github.com/grafana/loki/pkg/promtail/targets/target" @@ -129,7 +130,9 @@ func newReaderTarget(logger log.Logger, in io.Reader, client api.EntryHandler, c func (t *readerTarget) read() { defer t.cancel() + defer t.out.Stop() + entries := t.out.Chan() for { if t.ctx.Err() != nil { return @@ -146,8 +149,12 @@ func (t *readerTarget) read() { } continue } - if err := t.out.Handle(t.lbs.Clone(), time.Now(), line); err != nil { - level.Error(t.logger).Log("msg", "error sending line", "err", err) + entries <- api.Entry{ + Labels: t.lbs.Clone(), + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: line, + }, } if err == io.EOF { return diff --git a/pkg/promtail/targets/stdin/stdin_target_manager_test.go b/pkg/promtail/targets/stdin/stdin_target_manager_test.go index eef72e3874a9..26ec7b1b448a 100644 --- a/pkg/promtail/targets/stdin/stdin_target_manager_test.go +++ b/pkg/promtail/targets/stdin/stdin_target_manager_test.go @@ -6,7 +6,6 @@ import ( "os" "strings" "testing" - "time" "github.com/cortexproject/cortex/pkg/util" "github.com/prometheus/common/model" @@ -14,37 +13,26 @@ import ( "gopkg.in/yaml.v2" "github.com/grafana/loki/pkg/logentry/stages" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" + "github.com/grafana/loki/pkg/promtail/client/fake" "github.com/grafana/loki/pkg/promtail/scrapeconfig" ) -type line struct { - labels model.LabelSet - entry string -} - -type clientRecorder struct { - recorded []line -} - 
-func (c *clientRecorder) Handle(labels model.LabelSet, time time.Time, entry string) error { - c.recorded = append(c.recorded, line{labels: labels, entry: entry}) - return nil -} - func Test_newReaderTarget(t *testing.T) { tests := []struct { name string in io.Reader cfg scrapeconfig.Config - want []line + want []api.Entry wantErr bool }{ { "no newlines", bytes.NewReader([]byte("bar")), scrapeconfig.Config{}, - []line{ - {model.LabelSet{}, "bar"}, + []api.Entry{ + {Labels: model.LabelSet{}, Entry: logproto.Entry{Line: "bar"}}, }, false, }, @@ -59,9 +47,9 @@ func Test_newReaderTarget(t *testing.T) { "newlines", bytes.NewReader([]byte("\nfoo\r\nbar")), scrapeconfig.Config{}, - []line{ - {model.LabelSet{}, "foo"}, - {model.LabelSet{}, "bar"}, + []api.Entry{ + {Labels: model.LabelSet{}, Entry: logproto.Entry{Line: "foo"}}, + {Labels: model.LabelSet{}, Entry: logproto.Entry{Line: "bar"}}, }, false, }, @@ -71,9 +59,9 @@ func Test_newReaderTarget(t *testing.T) { scrapeconfig.Config{ PipelineStages: loadConfig(stagesConfig), }, - []line{ - {model.LabelSet{"new_key": "hello world!"}, "foo"}, - {model.LabelSet{"new_key": "hello world!"}, "bar"}, + []api.Entry{ + {Labels: model.LabelSet{"new_key": "hello world!"}, Entry: logproto.Entry{Line: "foo"}}, + {Labels: model.LabelSet{"new_key": "hello world!"}, Entry: logproto.Entry{Line: "bar"}}, }, false, }, @@ -81,17 +69,17 @@ func Test_newReaderTarget(t *testing.T) { "default config", bytes.NewReader([]byte("\nfoo\r\nbar")), defaultStdInCfg, - []line{ - {model.LabelSet{"job": "stdin", "hostname": model.LabelValue(hostName)}, "foo"}, - {model.LabelSet{"job": "stdin", "hostname": model.LabelValue(hostName)}, "bar"}, + []api.Entry{ + {Labels: model.LabelSet{"job": "stdin", "hostname": model.LabelValue(hostName)}, Entry: logproto.Entry{Line: "foo"}}, + {Labels: model.LabelSet{"job": "stdin", "hostname": model.LabelValue(hostName)}, Entry: logproto.Entry{Line: "bar"}}, }, false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - recorder := &clientRecorder{} - got, err := newReaderTarget(util.Logger, tt.in, recorder, tt.cfg) + c := fake.New(func() {}) + got, err := newReaderTarget(util.Logger, tt.in, c, tt.cfg) if (err != nil) != tt.wantErr { t.Errorf("newReaderTarget() error = %v, wantErr %v", err, tt.wantErr) return @@ -100,7 +88,8 @@ func Test_newReaderTarget(t *testing.T) { return } <-got.ctx.Done() - require.Equal(t, tt.want, recorder.recorded) + c.Stop() + compareEntries(t, tt.want, c.Received()) }) } } @@ -129,13 +118,22 @@ func (f fakeStdin) Stat() (os.FileInfo, error) { return f.FileInfo, nil } func Test_Shutdown(t *testing.T) { stdIn = newFakeStdin("line") appMock := &mockShutdownable{called: make(chan bool, 1)} - recorder := &clientRecorder{} + recorder := fake.New(func() {}) manager, err := NewStdinTargetManager(util.Logger, appMock, recorder, []scrapeconfig.Config{{}}) require.NoError(t, err) require.NotNil(t, manager) - called := <-appMock.called - require.Equal(t, true, called) - require.Equal(t, []line{{labels: model.LabelSet{}, entry: "line"}}, recorder.recorded) + require.Equal(t, true, <-appMock.called) + recorder.Stop() + compareEntries(t, []api.Entry{{Labels: model.LabelSet{}, Entry: logproto.Entry{Line: "line"}}}, recorder.Received()) +} + +func compareEntries(t *testing.T, expected, actual []api.Entry) { + t.Helper() + require.Equal(t, len(expected), len(actual)) + for i := range expected { + require.Equal(t, expected[i].Entry.Line, actual[i].Entry.Line) + require.Equal(t, expected[i].Labels, actual[i].Labels) + } } 
func Test_StdinConfigs(t *testing.T) { diff --git a/pkg/promtail/targets/syslog/syslogtarget.go b/pkg/promtail/targets/syslog/syslogtarget.go index 501c3aa67a86..d52b4ad73ecc 100644 --- a/pkg/promtail/targets/syslog/syslogtarget.go +++ b/pkg/promtail/targets/syslog/syslogtarget.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/promtail/api" "github.com/grafana/loki/pkg/promtail/scrapeconfig" "github.com/grafana/loki/pkg/promtail/targets/syslog/syslogparser" @@ -91,7 +92,7 @@ func NewSyslogTarget( } t.messages = make(chan message) - go t.messageSender() + go t.messageSender(handler.Chan()) err := t.run() return t, err @@ -241,10 +242,14 @@ func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Messag t.messages <- message{filtered, *rfc5424Msg.Message, timestamp} } -func (t *SyslogTarget) messageSender() { +func (t *SyslogTarget) messageSender(entries chan<- api.Entry) { for msg := range t.messages { - if err := t.handler.Handle(msg.labels, msg.timestamp, msg.message); err != nil { - level.Error(t.logger).Log("msg", "error handling line", "error", err) + entries <- api.Entry{ + Labels: msg.labels, + Entry: logproto.Entry{ + Timestamp: msg.timestamp, + Line: msg.message, + }, } syslogEntries.Inc() } @@ -310,6 +315,7 @@ func (t *SyslogTarget) Stop() error { err := t.listener.Close() t.openConnections.Wait() close(t.messages) + t.handler.Stop() return err } diff --git a/pkg/promtail/targets/syslog/syslogtarget_test.go b/pkg/promtail/targets/syslog/syslogtarget_test.go index cb1af821b60b..d32f2fbfb93c 100644 --- a/pkg/promtail/targets/syslog/syslogtarget_test.go +++ b/pkg/promtail/targets/syslog/syslogtarget_test.go @@ -5,50 +5,20 @@ import ( "io" "net" "os" - "sync" "testing" "time" "unicode/utf8" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/relabel" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" + "github.com/grafana/loki/pkg/promtail/client/fake" "github.com/grafana/loki/pkg/promtail/scrapeconfig" ) -type ClientMessage struct { - Labels model.LabelSet - Timestamp time.Time - Message string -} - -type TestLabeledClient struct { - log log.Logger - - messagesMtx sync.RWMutex - messages []ClientMessage -} - -func (c *TestLabeledClient) Handle(ls model.LabelSet, t time.Time, s string) error { - level.Debug(c.log).Log("msg", "received log", "log", s) - - c.messagesMtx.Lock() - defer c.messagesMtx.Unlock() - c.messages = append(c.messages, ClientMessage{ls, t, s}) - return nil -} - -func (c *TestLabeledClient) Messages() []ClientMessage { - c.messagesMtx.RLock() - defer c.messagesMtx.RUnlock() - - return c.messages -} - func TestSyslogTarget_NewlineSeparatedMessages(t *testing.T) { testSyslogTarget(t, false) } @@ -60,7 +30,7 @@ func TestSyslogTarget_OctetCounting(t *testing.T) { func testSyslogTarget(t *testing.T, octetCounting bool) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - client := &TestLabeledClient{log: logger} + client := fake.New(func() {}) tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ ListenAddress: "127.0.0.1:0", @@ -89,8 +59,8 @@ func testSyslogTarget(t *testing.T, octetCounting bool) { require.NoError(t, c.Close()) require.Eventuallyf(t, func() bool { - return len(client.Messages()) == len(messages) - }, time.Second, time.Millisecond, "Expected to 
receive %d messages, got %d.", len(messages), len(client.Messages())) + return len(client.Received()) == len(messages) + }, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received())) require.Equal(t, model.LabelSet{ "test": "syslog_target", @@ -102,10 +72,10 @@ func testSyslogTarget(t *testing.T, octetCounting bool) { "msg_id": "id1", "sd_custom_exkey": "1", - }, client.Messages()[0].Labels) - require.Equal(t, "An application event log entry...", client.Messages()[0].Message) + }, client.Received()[0].Labels) + require.Equal(t, "An application event log entry...", client.Received()[0].Line) - require.NotZero(t, client.Messages()[0].Timestamp) + require.NotZero(t, client.Received()[0].Timestamp) } func relabelConfig(t *testing.T) []*relabel.Config { @@ -159,7 +129,7 @@ func writeMessagesToStream(w io.Writer, messages []string, octetCounting bool) e func TestSyslogTarget_InvalidData(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - client := &TestLabeledClient{log: logger} + client := fake.New(func() {}) tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ ListenAddress: "127.0.0.1:0", @@ -189,7 +159,7 @@ func TestSyslogTarget_InvalidData(t *testing.T) { func TestSyslogTarget_NonUTF8Message(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - client := &TestLabeledClient{log: logger} + client := fake.New(func() {}) tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ ListenAddress: "127.0.0.1:0", @@ -216,17 +186,17 @@ func TestSyslogTarget_NonUTF8Message(t *testing.T) { require.NoError(t, c.Close()) require.Eventuallyf(t, func() bool { - return len(client.Messages()) == 2 - }, time.Second, time.Millisecond, "Expected to receive 2 messages, got %d.", len(client.Messages())) + return len(client.Received()) == 2 + }, time.Second, time.Millisecond, "Expected to receive 2 messages, got %d.", len(client.Received())) - require.Equal(t, msg1, client.Messages()[0].Message) - require.Equal(t, msg2, client.Messages()[1].Message) + require.Equal(t, msg1, client.Received()[0].Line) + require.Equal(t, msg2, client.Received()[1].Line) } func TestSyslogTarget_IdleTimeout(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - client := &TestLabeledClient{log: logger} + client := fake.New(func() {}) tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrapeconfig.SyslogTargetConfig{ ListenAddress: "127.0.0.1:0", diff --git a/pkg/promtail/targets/testutils/testutils.go b/pkg/promtail/targets/testutils/testutils.go index 092c3b1e089f..41444769c28d 100644 --- a/pkg/promtail/targets/testutils/testutils.go +++ b/pkg/promtail/targets/testutils/testutils.go @@ -2,35 +2,9 @@ package testutils import ( "math/rand" - "sync" "time" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/common/model" ) -type Entry struct { - Labels model.LabelSet - Time time.Time - Log string -} - -type TestClient struct { - Log log.Logger - Messages []*Entry - sync.Mutex -} - -func (c *TestClient) Handle(ls model.LabelSet, t time.Time, s string) error { - level.Debug(c.Log).Log("msg", "received log", "log", s) - - c.Lock() - defer c.Unlock() - c.Messages = append(c.Messages, &Entry{ls, t, s}) - return nil -} - func InitRandom() { rand.Seed(time.Now().UnixNano()) }