-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
journaltarget.go
415 lines (356 loc) · 10.7 KB
/
journaltarget.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
//go:build linux && cgo && promtail_journal_enabled
// +build linux,cgo,promtail_journal_enabled
package journal
import (
"fmt"
"io"
"strings"
"syscall"
"time"
"github.com/coreos/go-systemd/sdjournal"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/grafana/loki/v3/clients/pkg/promtail/api"
"github.com/grafana/loki/v3/clients/pkg/promtail/positions"
"github.com/grafana/loki/v3/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/target"
"github.com/grafana/loki/v3/pkg/logproto"
)
const (
// journalEmptyStr is the string the formatter hands back to sdjournal for
// every entry. It is represented as a single-character space because
// returning an empty string from sdjournal.JournalReaderConfig's
// Formatter causes an immediate EOF and induces performance issues
// with how that is handled in sdjournal.
journalEmptyStr = " "
// journalDefaultMaxAgeTime (7 hours) is the look-back window used when the
// target config leaves "max_age" empty. It represents the default earliest
// entry that will be read by the journal reader if there is no saved
// position newer than the "max_age" time.
journalDefaultMaxAgeTime = time.Hour * 7
)
// journalReader is the subset of reader behavior this file relies on:
// Follow, which is called in a loop by the goroutine started in
// journalTargetWithReader, and Close, which is called from Stop. The value
// returned by sdjournal.NewJournalReader is used as the default
// implementation; tests can substitute their own.
type journalReader interface {
io.Closer
// Follow is invoked with the target's "until" channel and a writer; in
// this file the writer is always io.Discard because entries are delivered
// through the configured Formatter instead.
Follow(until <-chan time.Time, writer io.Writer) error
}
// Abstracted functions for interacting with the journal, used for mocking in tests:
type (
// journalReaderFunc builds a journalReader from a reader configuration.
journalReaderFunc func(sdjournal.JournalReaderConfig) (journalReader, error)
// journalEntryFunc returns the journal entry associated with a saved
// cursor string; generateJournalConfig uses it to check how old the saved
// position is.
journalEntryFunc func(cfg sdjournal.JournalReaderConfig, cursor string) (*sdjournal.JournalEntry, error)
)
// Default implementations of abstracted functions:

// defaultJournalReaderFunc is the production journalReaderFunc: it simply
// delegates to sdjournal.NewJournalReader and returns the result as a
// journalReader.
var defaultJournalReaderFunc = func(c sdjournal.JournalReaderConfig) (journalReader, error) {
return sdjournal.NewJournalReader(c)
}
// defaultJournalEntryFunc is the production journalEntryFunc. It opens the
// journal (from c.Path when set, otherwise the default journal), seeks to
// cursor, advances once and returns the entry found there. The journal is
// always closed before returning; a close error is reported only when no
// earlier error occurred.
var defaultJournalEntryFunc = func(c sdjournal.JournalReaderConfig, cursor string) (entry *sdjournal.JournalEntry, err error) {
	var j *sdjournal.Journal
	switch c.Path {
	case "":
		j, err = sdjournal.NewJournal()
	default:
		j, err = sdjournal.NewJournalFromDir(c.Path)
	}
	if err != nil {
		return nil, err
	}

	// Runs after the return values are set, so it can surface a close failure
	// through the named err result without masking an earlier error.
	defer func() {
		closeErr := j.Close()
		if err == nil {
			err = closeErr
		}
	}()

	if err = j.SeekCursor(cursor); err != nil {
		return nil, err
	}

	// Seeking alone does not position us on an entry: Next() or Previous()
	// must be called to land on the closest following or preceding one. Next()
	// is used because Previous() would re-read an entry that was already read.
	// See https://www.freedesktop.org/software/systemd/man/sd_journal_seek_cursor.html#
	if _, err = j.Next(); err != nil {
		return nil, err
	}

	return j.GetEntry()
}
// JournalTarget tails systemd journal entries.
// nolint
type JournalTarget struct {
// metrics holds the counters the formatter increments (journalLines,
// journalErrors).
metrics *Metrics
// logger receives the error and debug messages emitted by this target.
logger log.Logger
// handler receives every formatted entry on handler.Chan() and is stopped
// in Stop.
handler api.EntryHandler
// positions stores the cursor of the last processed entry under
// positionPath, which is derived from the job name via positions.CursorKey.
positions positions.Positions
positionPath string
// relabelConfig is applied to each entry's label set in the formatter.
relabelConfig []*relabel.Config
// config is the target configuration; the formatter reads its JSON flag.
config *scrapeconfig.JournalTargetConfig
// labels are the constant labels (config.Labels) merged into every entry
// before relabeling.
labels model.LabelSet
// r is the journal reader created by the reader func in
// journalTargetWithReader.
r journalReader
// until is the channel passed to r.Follow; Stop sends on it.
until chan time.Time
}
// NewJournalTarget configures a new JournalTarget that talks to the real
// systemd journal. It forwards all of its arguments to
// journalTargetWithReader together with the default reader and entry
// functions, and returns whatever that call returns.
func NewJournalTarget(
	metrics *Metrics,
	logger log.Logger,
	handler api.EntryHandler,
	positions positions.Positions,
	jobName string,
	relabelConfig []*relabel.Config,
	targetConfig *scrapeconfig.JournalTargetConfig,
) (*JournalTarget, error) {
	// Production wiring: the sdjournal-backed implementations.
	readerFunc, entryFunc := defaultJournalReaderFunc, defaultJournalEntryFunc

	return journalTargetWithReader(
		metrics, logger, handler, positions, jobName, relabelConfig, targetConfig,
		readerFunc, entryFunc,
	)
}
// journalTargetWithReader builds a JournalTarget using the supplied reader
// and entry functions (nil values fall back to the defaults), which lets
// tests substitute mocks for the real journal.
//
// It resolves the saved cursor for jobName, parses the "max_age" and
// "matches" settings from targetConfig, creates the journal reader and starts
// a goroutine that keeps calling Follow on it.
//
// An error is returned when "max_age" is not a valid duration, when a
// "matches" token is not of the form FIELD=VALUE, or when the reader cannot
// be created.
func journalTargetWithReader(
	metrics *Metrics,
	logger log.Logger,
	handler api.EntryHandler,
	pos positions.Positions,
	jobName string,
	relabelConfig []*relabel.Config,
	targetConfig *scrapeconfig.JournalTargetConfig,
	readerFunc journalReaderFunc,
	entryFunc journalEntryFunc,
) (*JournalTarget, error) {
	positionPath := positions.CursorKey(jobName)
	position := pos.GetString(positionPath)

	if readerFunc == nil {
		readerFunc = defaultJournalReaderFunc
	}
	if entryFunc == nil {
		entryFunc = defaultJournalEntryFunc
	}

	until := make(chan time.Time)
	t := &JournalTarget{
		metrics:       metrics,
		logger:        logger,
		handler:       handler,
		positions:     pos,
		positionPath:  positionPath,
		relabelConfig: relabelConfig,
		labels:        targetConfig.Labels,
		config:        targetConfig,
		until:         until,
	}

	var maxAge time.Duration
	var err error
	if targetConfig.MaxAge == "" {
		maxAge = journalDefaultMaxAgeTime
	} else {
		maxAge, err = time.ParseDuration(targetConfig.MaxAge)
	}
	if err != nil {
		return nil, errors.Wrap(err, "parsing journal reader 'max_age' config value")
	}

	cb := journalConfigBuilder{
		JournalPath: targetConfig.Path,
		Position:    position,
		MaxAge:      maxAge,
		EntryFunc:   entryFunc,
	}

	// "matches" is a whitespace-separated list of FIELD=VALUE tokens.
	for _, m := range strings.Fields(targetConfig.Matches) {
		// Split on the first '=' only: the value part of a journal match may
		// itself contain '=' characters and must be kept intact.
		fv := strings.SplitN(m, "=", 2)
		if len(fv) != 2 {
			return nil, errors.New("Error parsing journal reader 'matches' config value")
		}
		cb.Matches = append(cb.Matches, sdjournal.Match{
			Field: fv[0],
			Value: fv[1],
		})
	}

	cfg := t.generateJournalConfig(cb)
	t.r, err = readerFunc(cfg)
	if err != nil {
		return nil, errors.Wrap(err, "creating journal reader")
	}

	go func() {
		for {
			// Output is discarded here because entries are delivered through
			// the Formatter configured in cfg (t.formatter).
			err := t.r.Follow(until, io.Discard)
			if err != nil {
				// ErrExpired ends the loop silently.
				if err == sdjournal.ErrExpired {
					return
				}
				// These errors are treated as unrecoverable: log and stop
				// following.
				if err == syscall.EBADMSG || err == io.EOF || strings.HasPrefix(err.Error(), "failed to iterate journal:") {
					level.Error(t.logger).Log("msg", "unable to follow journal", "err", err.Error())
					return
				}
				// Anything else is logged and Follow is retried.
				level.Error(t.logger).Log("msg", "received unexpected error while following the journal", "err", err.Error())
			}

			// prevent tight loop
			time.Sleep(100 * time.Millisecond)
		}
	}()

	return t, nil
}
// journalConfigBuilder bundles the inputs generateJournalConfig needs to
// produce an sdjournal.JournalReaderConfig.
type journalConfigBuilder struct {
// JournalPath is copied into the reader config's Path.
JournalPath string
// Position is the saved cursor; empty means no saved position.
Position string
// Matches is copied into the reader config's Matches.
Matches []sdjournal.Match
// MaxAge bounds how far back reading may start; it is applied as a
// negative Since offset.
MaxAge time.Duration
// EntryFunc fetches the entry at Position so its age can be checked.
EntryFunc journalEntryFunc
}
// generateJournalConfig builds the reader configuration and decides where
// reading should start: at the saved cursor, or at a time offset of
// cb.MaxAge before now.
//
// The cursor is preferred because it identifies one exact journal entry, but
// it is used only when a position is saved, the entry at that position can
// be read, and that entry is not older than cb.MaxAge. In every other case
// the less precise Since offset is used. Since takes precedence over Cursor
// in sdjournal, so exactly one of the two is ever set.
func (t *JournalTarget) generateJournalConfig(
	cb journalConfigBuilder,
) sdjournal.JournalReaderConfig {
	cfg := sdjournal.JournalReaderConfig{
		Path:      cb.JournalPath,
		Matches:   cb.Matches,
		Formatter: t.formatter,
	}

	cursorUsable := false
	if cb.Position != "" {
		// Look up the saved entry to find out how old it is.
		saved, err := cb.EntryFunc(cfg, cb.Position)
		switch {
		case err != nil:
			level.Error(t.logger).Log("msg", "received error reading saved journal position", "err", err.Error())
		default:
			// RealtimeTimestamp is in microseconds; convert to nanoseconds.
			savedAt := time.Unix(0, int64(saved.RealtimeTimestamp)*int64(time.Microsecond))
			cursorUsable = time.Since(savedAt) <= cb.MaxAge
		}
	}

	if cursorUsable {
		cfg.Cursor = cb.Position
	} else {
		// Since is a negative duration back from the current system time.
		cfg.Since = -1 * cb.MaxAge
	}
	return cfg
}
// formatter is installed as the sdjournal Formatter callback. Rather than
// producing text for the reader's writer, it turns each journal entry into an
// api.Entry and sends it to the handler.
//
// The log line is either the JSON encoding of all entry fields (when
// config.JSON is set) or the MESSAGE field. Labels are built from the entry
// fields (prefixed with "__journal_"), merged with the target's constant
// labels and passed through the relabel rules; labels still starting with
// "__" afterwards are discarded. Entries that cannot be marshaled, have no
// MESSAGE, or end up without labels are dropped.
//
// It always returns journalEmptyStr and a nil error: a single space is
// returned instead of "" because an empty string makes sdjournal signal EOF
// immediately.
func (t *JournalTarget) formatter(entry *sdjournal.JournalEntry) (string, error) {
	// RealtimeTimestamp is in microseconds; convert to nanoseconds.
	ts := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))

	var msg string
	if t.config.JSON {
		json := jsoniter.ConfigCompatibleWithStandardLibrary

		bb, err := json.Marshal(entry.Fields)
		if err != nil {
			level.Error(t.logger).Log("msg", "could not marshal journal fields to JSON", "err", err, "unit", entry.Fields["_SYSTEMD_UNIT"])
			return journalEmptyStr, nil
		}
		msg = string(bb)
	} else {
		var ok bool
		msg, ok = entry.Fields["MESSAGE"]
		if !ok {
			level.Debug(t.logger).Log("msg", "received journal entry with no MESSAGE field", "unit", entry.Fields["_SYSTEMD_UNIT"])
			t.metrics.journalErrors.WithLabelValues(noMessageError).Inc()
			return journalEmptyStr, nil
		}
	}

	entryLabels := makeJournalFields(entry.Fields)

	// Add constant labels
	for k, v := range t.labels {
		entryLabels[string(k)] = string(v)
	}

	// NOTE(review): the second result of relabel.Process is ignored here;
	// presumably a dropped entry comes back without labels and is caught by
	// the empty-label check below — confirm against the relabel package docs.
	processedLabels, _ := relabel.Process(labels.FromMap(entryLabels), t.relabelConfig...)

	processedLabelsMap := processedLabels.Map()
	lset := make(model.LabelSet, len(processedLabelsMap))
	for k, v := range processedLabelsMap {
		// Internal labels are not forwarded. HasPrefix is used instead of
		// slicing k[0:2], which panics on label names shorter than two bytes.
		if strings.HasPrefix(k, "__") {
			continue
		}
		lset[model.LabelName(k)] = model.LabelValue(v)
	}
	if len(lset) == 0 {
		// No labels, drop journal entry
		level.Debug(t.logger).Log("msg", "received journal entry with no labels", "unit", entry.Fields["_SYSTEMD_UNIT"])
		t.metrics.journalErrors.WithLabelValues(emptyLabelsError).Inc()
		return journalEmptyStr, nil
	}

	t.metrics.journalLines.Inc()
	// Remember the cursor of this entry as the new saved position.
	t.positions.PutString(t.positionPath, entry.Cursor)
	t.handler.Chan() <- api.Entry{
		Labels: lset,
		Entry: logproto.Entry{
			Line:      msg,
			Timestamp: ts,
		},
	}
	return journalEmptyStr, nil
}
// Type reports the kind of this target, which is always
// target.JournalTargetType.
func (t *JournalTarget) Type() target.TargetType {
return target.JournalTargetType
}
// Ready indicates whether or not the journal is ready to be
// read from. No readiness state is tracked: it unconditionally
// returns true.
func (t *JournalTarget) Ready() bool {
return true
}
// DiscoveredLabels returns the set of labels discovered by
// the JournalTarget, which is always nil (this target keeps no
// discovered label set). Implements Target.
func (t *JournalTarget) DiscoveredLabels() model.LabelSet {
return nil
}
// Labels returns the set of labels that statically apply to
// all log entries produced by the JournalTarget. This is the
// label set taken from the target config at construction time;
// the returned value is the target's own set, not a copy.
func (t *JournalTarget) Labels() model.LabelSet {
return t.labels
}
// Details returns target-specific details: a map[string]string with a single
// "position" key holding the cursor currently saved for this target.
func (t *JournalTarget) Details() interface{} {
	details := make(map[string]string, 1)
	details["position"] = t.positions.GetString(t.positionPath)
	return details
}
// Stop shuts down the JournalTarget. It sends the current time on the
// "until" channel that was handed to the reader's Follow call, then closes
// the reader and stops the entry handler. The returned error is the one
// from closing the reader.
//
// NOTE(review): the until channel is created unbuffered, so the send below
// blocks until something receives from it. The follow goroutine returns
// after certain errors (EBADMSG, EOF, "failed to iterate journal:"); if
// nothing is receiving any more at that point, Stop may block here —
// confirm and consider guarding this send.
func (t *JournalTarget) Stop() error {
t.until <- time.Now()
err := t.r.Close()
t.handler.Stop()
return err
}
// makeJournalFields converts raw journal fields into label candidates. Every
// field NAME becomes "__journal_<name>" with the name lowercased and the
// value unchanged. The PRIORITY field additionally yields
// "__journal_priority_keyword", whose value is the keyword form of the
// numeric priority as produced by makeJournalPriority.
func makeJournalFields(fields map[string]string) map[string]string {
	out := make(map[string]string, len(fields))
	for name, value := range fields {
		lowered := strings.ToLower(name)
		out[fmt.Sprintf("__journal_%s", lowered)] = value
		if name == "PRIORITY" {
			out[fmt.Sprintf("__journal_%s_%s", lowered, "keyword")] = makeJournalPriority(value)
		}
	}
	return out
}
// makeJournalPriority maps a journal priority given as a single decimal
// digit "0" through "7" to its keyword ("emerg", "alert", "crit", "error",
// "warning", "notice", "info", "debug"). Any other input is returned
// unchanged.
func makeJournalPriority(priority string) string {
	// Indexed by the numeric priority value.
	keywords := [...]string{
		"emerg",
		"alert",
		"crit",
		"error",
		"warning",
		"notice",
		"info",
		"debug",
	}

	// Only exact one-character strings in the range '0'..'7' are translated.
	if len(priority) != 1 {
		return priority
	}
	digit := priority[0]
	if digit < '0' || digit > '7' {
		return priority
	}
	return keywords[digit-'0']
}