Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #43 from mjs/timestamp-filtering
Browse files Browse the repository at this point in the history
Rejection of invalid timestamps
  • Loading branch information
oplehto authored Mar 13, 2018
2 parents 15c080c + ffc45b8 commit a67dc47
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 51 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ nats_subject_monitor = "influx-spout-monitor"
# The number of filter workers to spawn.
workers = 8

# Incoming metrics with timestamps ± this value from the current time will be
# rejected. Metrics with timestamps that are significantly different from previously
# written timestamps negatively impact InfluxDB performance.
max_time_delta_secs = 600

# At least one rule should be defined. Rules are defined using TOML's table
# syntax. The following examples show each rule type.

Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Config struct {
NATSPendingMaxMB int `toml:"nats_pending_max_mb"`
ListenerBatchBytes int `toml:"listener_batch_bytes"`
Rule []Rule `toml:"rule"`
MaxTimeDeltaSecs int `toml:"max_time_delta_secs"`
Debug bool `toml:"debug"`
}

Expand Down Expand Up @@ -77,6 +78,7 @@ func newDefaultConfig() *Config {
ReadBufferBytes: 4 * 1024 * 1024,
NATSPendingMaxMB: 200,
ListenerBatchBytes: 1024 * 1024,
MaxTimeDeltaSecs: 600,
}
}

Expand Down
7 changes: 5 additions & 2 deletions config/config_small_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ write_timeout_secs = 32
read_buffer_bytes = 43210
nats_pending_max_mb = 100
listener_batch_bytes = 4096
max_time_delta_secs = 789
`
conf, err := parseConfig(validConfigSample)
require.NoError(t, err, "Couldn't parse a valid config: %v\n", err)
Expand All @@ -67,8 +68,9 @@ listener_batch_bytes = 4096
assert.Equal(t, 96, conf.Workers)
assert.Equal(t, 32, conf.WriteTimeoutSecs, "WriteTimeoutSecs must match")
assert.Equal(t, 43210, conf.ReadBufferBytes)
assert.Equal(t, 100, conf.NATSPendingMaxMB, "NATSPendingMaxMB must match")
assert.Equal(t, 4096, conf.ListenerBatchBytes, "NATSPendingMaxMB must match")
assert.Equal(t, 100, conf.NATSPendingMaxMB)
assert.Equal(t, 4096, conf.ListenerBatchBytes)
assert.Equal(t, 789, conf.MaxTimeDeltaSecs)

assert.Equal(t, 8086, conf.InfluxDBPort, "InfluxDB Port must match")
assert.Equal(t, "junk_nats", conf.DBName, "InfluxDB DBname must match")
Expand Down Expand Up @@ -101,6 +103,7 @@ func TestAllDefaults(t *testing.T) {
assert.Equal(t, 4194304, conf.ReadBufferBytes)
assert.Equal(t, 200, conf.NATSPendingMaxMB)
assert.Equal(t, 1048576, conf.ListenerBatchBytes)
assert.Equal(t, 600, conf.MaxTimeDeltaSecs)
assert.Equal(t, false, conf.Debug)
assert.Len(t, conf.Rule, 0)
}
Expand Down
23 changes: 16 additions & 7 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ import (
"sync"
"time"

"github.com/nats-io/go-nats"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/lineformatter"
"github.com/jumptrading/influx-spout/stats"
"github.com/nats-io/nats"
)

// Name for supported stats
const (
linesPassed = "lines-passed"
linesProcessed = "lines-processed"
linesRejected = "lines-rejected"
linesPassed = "passed"
linesProcessed = "processed"
linesRejected = "rejected"
linesInvalidTime = "invalid-time"
)

// StartFilter creates a Filter instance, sets up its rules based on
Expand Down Expand Up @@ -65,7 +65,14 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) {

jobs := make(chan []byte, 1024)
for i := 0; i < f.c.Workers; i++ {
w, err := newWorker(rules, stats, f.natsConnect, f.c.NATSSubjectJunkyard)
w, err := newWorker(
f.c.MaxTimeDeltaSecs,
rules,
stats,
f.c.Debug,
f.natsConnect,
f.c.NATSSubjectJunkyard,
)
if err != nil {
return nil, fmt.Errorf("failed to start worker: %v", err)
}
Expand Down Expand Up @@ -105,6 +112,7 @@ func initStats(rules *RuleSet) *stats.Stats {
linesPassed,
linesProcessed,
linesRejected,
linesInvalidTime,
}
for i := 0; i < rules.Count(); i++ {
statNames = append(statNames, ruleToStatsName(i))
Expand Down Expand Up @@ -153,7 +161,7 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
totalLine := lineformatter.New(
"spout_stat_filter",
[]string{"filter"},
"passed", "processed", "rejected",
linesPassed, linesProcessed, linesRejected, linesInvalidTime,
)
ruleLine := lineformatter.New(
"spout_stat_filter_rule",
Expand All @@ -170,6 +178,7 @@ func (f *Filter) startStatistician(stats *stats.Stats, rules *RuleSet) {
st.Get(linesPassed),
st.Get(linesProcessed),
st.Get(linesRejected),
st.Get(linesInvalidTime),
))

// publish the per rule stats
Expand Down
93 changes: 79 additions & 14 deletions filter/filter_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package filter

import (
"fmt"
"strings"
"testing"
"time"

"github.com/nats-io/go-nats"
"github.com/stretchr/testify/require"
Expand All @@ -29,25 +31,30 @@ import (

const natsPort = 44446

var conf = config.Config{
Name: "particle",
NATSAddress: fmt.Sprintf("nats://127.0.0.1:%d", natsPort),
NATSSubject: []string{"filter-test"},
NATSSubjectMonitor: "filter-test-monitor",
NATSSubjectJunkyard: "filter-junkyard",
Workers: 1,
Rule: []config.Rule{{
Rtype: "basic",
Match: "hello",
Subject: "hello-subject",
}},
func testConfig() *config.Config {
return &config.Config{
Name: "particle",
NATSAddress: fmt.Sprintf("nats://127.0.0.1:%d", natsPort),
NATSSubject: []string{"filter-test"},
NATSSubjectMonitor: "filter-test-monitor",
NATSSubjectJunkyard: "filter-junkyard",
Workers: 1,
MaxTimeDeltaSecs: 600,
Rule: []config.Rule{{
Rtype: "basic",
Match: "hello",
Subject: "hello-subject",
}},
}
}

func TestFilterWorker(t *testing.T) {
gnatsd := spouttest.RunGnatsd(natsPort)
defer gnatsd.Shutdown()

filter, err := StartFilter(&conf)
conf := testConfig()

filter, err := StartFilter(conf)
require.NoError(t, err)
defer filter.Stop()

Expand Down Expand Up @@ -98,7 +105,65 @@ goodbye,host=gopher01

// Receive total stats
spouttest.AssertRecvMulti(t, statsCh, "stats", `
spout_stat_filter,filter=particle passed=2,processed=3,rejected=1
spout_stat_filter,filter=particle passed=2,processed=3,rejected=1,invalid-time=0
`)

// Receive rule specific stats
spouttest.AssertRecvMulti(t, statsCh, "rule stats", `
spout_stat_filter_rule,filter=particle,rule=hello-subject triggered=2
`)
}

func TestInvalidTimeStamps(t *testing.T) {
gnatsd := spouttest.RunGnatsd(natsPort)
defer gnatsd.Shutdown()

conf := testConfig()
conf.MaxTimeDeltaSecs = 10

filter, err := StartFilter(conf)
require.NoError(t, err)
defer filter.Stop()

nc, err := nats.Connect(conf.NATSAddress)
require.NoError(t, err)
defer nc.Close()

// Subscribe to filter output
helloCh := make(chan string, 1)
_, err = nc.Subscribe(conf.Rule[0].Subject, func(msg *nats.Msg) {
helloCh <- string(msg.Data)
})
require.NoError(t, err)

// Subscribe to stats output
statsCh := make(chan string, 10)
_, err = nc.Subscribe(conf.NATSSubjectMonitor, func(msg *nats.Msg) {
statsCh <- string(msg.Data)
})
require.NoError(t, err)

// Publish 3 lines.
// The first should be rejected because it is too old.
// The second should be rejected because it is too new.
// The third should make it through because it is current.
// The fourth should make it through because it has no timestamp.
now := time.Now()
lines := []string{
fmt.Sprintf("hello,instance=0 foo=0 %d", now.Add(-time.Second*11).UnixNano()),
fmt.Sprintf("hello,instance=1 foo=0 %d", now.Add(time.Second*11).UnixNano()),
fmt.Sprintf("hello,instance=2 foo=1 %d", now.UnixNano()),
"hello,instance=2 foo=3",
}
err = nc.Publish(conf.NATSSubject[0], []byte(strings.Join(lines, "\n")))
require.NoError(t, err)

// Expect to see the 3rd & 4th lines.
spouttest.AssertRecv(t, helloCh, "helloCh", strings.Join(lines[2:], "\n"))

// Receive total stats.
spouttest.AssertRecvMulti(t, statsCh, "stats", `
spout_stat_filter,filter=particle passed=2,processed=4,rejected=0,invalid-time=2
`)

// Receive rule specific stats
Expand Down
36 changes: 24 additions & 12 deletions filter/rules_small_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package filter

import (
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -188,22 +191,31 @@ func BenchmarkProcessBatch(b *testing.B) {
rs.Append(CreateBasicRule("hello", "hello-out"))
rs.Append(CreateRegexRule("foo|bar", "foobar-out"))

w, err := newWorker(rs, initStats(rs), nullNATSConnect, "junk")
w, err := newWorker(600, rs, initStats(rs), false, nullNATSConnect, "junk")
require.NoError(b, err)

batch := []byte(`
hello,host=gopher01 somefield=11,etc=false
bar,host=gopher02 somefield=14
pepsi host=gopher01,cheese=stilton
hello,host=gopher01 somefield=11,etc=false
bar,host=gopher02 somefield=14
pepsi host=gopher01,cheese=stilton
hello,host=gopher01 somefield=11,etc=false
bar,host=gopher02 somefield=14
pepsi host=gopher01,cheese=stilton
`[1:])
lines := []string{
"hello,host=gopher01 somefield=11,etc=false",
"bar,host=gopher02 somefield=14",
"pepsi host=gopher01,cheese=stilton",
"hello,host=gopher01 somefield=11,etc=false",
"bar,host=gopher02 somefield=14",
"pepsi host=gopher01,cheese=stilton",
"hello,host=gopher01 somefield=11,etc=false",
"bar,host=gopher02 somefield=14",
"pepsi host=gopher01,cheese=stilton",
}

// Add a timestamp to each line.
ts := strconv.FormatInt(time.Now().UnixNano(), 10)
for i, line := range lines {
lines[i] = line + " " + ts
}

batch := []byte(strings.Join(lines, "\n"))

b.ResetTimer()

for i := 0; i < b.N; i++ {
w.processBatch(batch)
}
Expand Down
83 changes: 83 additions & 0 deletions filter/timestamp_small_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2018 Jump Trading
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build small

package filter

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestExtractTimestamp(t *testing.T) {
ts := time.Date(1997, 6, 5, 4, 3, 2, 1, time.UTC).UnixNano()
tsStr := strconv.FormatInt(ts, 10)
defaultTs := time.Now().UnixNano()

check := func(input string, expected int64) {
assert.Equal(
t, expected,
extractTimestamp([]byte(input), defaultTs),
"extractTimestamp(%q)", input)
}

check("", defaultTs)
check(" ", defaultTs)
check("weather temp=99", defaultTs)
check("weather,city=paris temp=60", defaultTs)
check("weather,city=paris temp=99,humidity=100", defaultTs)
check("weather temp=99 "+tsStr, ts)
check("weather temp=99 "+tsStr+"\n", ts)
check("weather,city=paris temp=60 "+tsStr, ts)
check("weather,city=paris temp=60,humidity=100 "+tsStr, ts)
check("weather,city=paris temp=60,humidity=100 "+tsStr+"\n", ts)

// Various invalid timestamps
check("weather temp=99 "+tsStr+" ", defaultTs)
check("weather temp=99 xxxxxxxxxxxxxxxxxxx", defaultTs)
check("weather temp=99 152076148x803180202", defaultTs) // non-digit embedded
check("weather temp=99 11520761485803180202", defaultTs) // too long
check("weather temp=99 -"+tsStr, defaultTs)
check(tsStr, defaultTs)
}

func TestFastParseInt(t *testing.T) {
check := func(input string, expected int64) {
actual, err := fastParseInt([]byte(input))
require.NoError(t, err)
assert.Equal(t, expected, actual, "fastParseInt(%q)", input)
}

shouldFail := func(input string) {
_, err := fastParseInt([]byte(input))
assert.Error(t, err)
}

check("0", 0)
check("1", 1)
check("9", 9)
check("10", 10)
check("99", 99)
check("101", 101)
check("9223372036854775807", (1<<63)-1) // max int64 value

shouldFail("9223372036854775808") // max int64 value + 1
shouldFail("-1") // negatives not supported
shouldFail("x")
}
Loading

0 comments on commit a67dc47

Please sign in to comment.