Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

positions.ignore-corruptions #1472

Merged
merged 3 commits into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## master / unreleased

* [FEATURE] promtail positions file corruptions can be ignored with the `positions.ignore-invalid-yaml` flag. In the case the positions yaml is corrupted an empty positions config will be used and should later overwrite the malformed yaml.

# 1.2.0 (2019-12-09)

One week has passed since the last Loki release, and it's time for a new one!
Expand Down
3 changes: 3 additions & 0 deletions docs/clients/promtail/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ is restarted to allow it to continue from where it left off.

# How often to update the positions file
[sync_period: <duration> | default = 10s]

# Whether to ignore & later overwrite positions files that are corrupted
[ignore_invalid_yaml: <boolean> | default = false]
```

## scrape_config
Expand Down
24 changes: 17 additions & 7 deletions pkg/promtail/positions/positions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ const positionFileMode = 0600

// Config describes where to get postition information from.
type Config struct {
SyncPeriod time.Duration `yaml:"sync_period"`
PositionsFile string `yaml:"filename"`
SyncPeriod time.Duration `yaml:"sync_period"`
PositionsFile string `yaml:"filename"`
IgnoreInvalidYaml bool `yaml:"ignore_invalid_yaml"`
}

// RegisterFlags register flags.
func (cfg *Config) RegisterFlags(flags *flag.FlagSet) {
flags.DurationVar(&cfg.SyncPeriod, "positions.sync-period", 10*time.Second, "Period with this to sync the position file.")
flag.StringVar(&cfg.PositionsFile, "positions.file", "/var/log/positions.yaml", "Location to read/write positions from.")
flag.BoolVar(&cfg.IgnoreInvalidYaml, "positions.ignore-invalid-yaml", false, "whether to ignore & later overwrite positions files that are corrupted")
}

// Positions tracks how far through each file we've read.
Expand All @@ -47,7 +49,7 @@ type File struct {

// New makes a new Positions.
func New(logger log.Logger, cfg Config) (*Positions, error) {
positions, err := readPositionsFile(cfg.PositionsFile)
positions, err := readPositionsFile(cfg, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,8 +183,9 @@ func (p *Positions) cleanup() {
}
}

func readPositionsFile(filename string) (map[string]string, error) {
cleanfn := filepath.Clean(filename)
func readPositionsFile(cfg Config, logger log.Logger) (map[string]string, error) {

cleanfn := filepath.Clean(cfg.PositionsFile)
buf, err := ioutil.ReadFile(cleanfn)
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -192,8 +195,15 @@ func readPositionsFile(filename string) (map[string]string, error) {
}

var p File
if err := yaml.UnmarshalStrict(buf, &p); err != nil {
return nil, fmt.Errorf("%s: %v", cleanfn, err)
err = yaml.UnmarshalStrict(buf, &p)
if err != nil {
// return empty if cfg option enabled
if cfg.IgnoreInvalidYaml {
level.Debug(logger).Log("msg", "ignoring invalid positions file", "file", cleanfn, "error", err)
return map[string]string{}, nil
}

return nil, fmt.Errorf("invalid yaml positions file [%s]: %v", cleanfn, err)
}

return p.Positions, nil
Expand Down
39 changes: 36 additions & 3 deletions pkg/promtail/positions/positions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"
"testing"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -44,7 +45,10 @@ func TestReadPositionsOK(t *testing.T) {
t.Fatal(err)
}

pos, err := readPositionsFile(temp)
pos, err := readPositionsFile(Config{
PositionsFile: temp,
}, log.NewNopLogger())

require.NoError(t, err)
require.Equal(t, "17623", pos["/tmp/random.log"])
}
Expand All @@ -60,7 +64,10 @@ func TestReadPositionsFromDir(t *testing.T) {
_ = os.Remove(temp)
}()

_, err = readPositionsFile(temp)
_, err = readPositionsFile(Config{
PositionsFile: temp,
}, log.NewNopLogger())

require.Error(t, err)
require.True(t, strings.Contains(err.Error(), temp)) // error must contain filename
}
Expand All @@ -79,7 +86,33 @@ func TestReadPositionsFromBadYaml(t *testing.T) {
t.Fatal(err)
}

_, err = readPositionsFile(temp)
_, err = readPositionsFile(Config{
PositionsFile: temp,
}, log.NewNopLogger())

require.Error(t, err)
require.True(t, strings.Contains(err.Error(), temp)) // error must contain filename
}

func TestReadPositionsFromBadYamlIgnoreCorruption(t *testing.T) {
temp := tempFilename(t)
defer func() {
_ = os.Remove(temp)
}()

badYaml := []byte(`positions:
/tmp/random.log: "176
`)
err := ioutil.WriteFile(temp, badYaml, 0644)
if err != nil {
t.Fatal(err)
}

out, err := readPositionsFile(Config{
PositionsFile: temp,
IgnoreInvalidYaml: true,
}, log.NewNopLogger())

require.NoError(t, err)
require.Equal(t, map[string]string{}, out)
}