From 6f1fcba33ed7e918618021b623d2e44e4925e8dd Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Thu, 28 Sep 2023 06:09:26 -0600 Subject: [PATCH] [chore][pkg/stanza] Migrate flush func tests (#27229) --- pkg/stanza/flush/flush_test.go | 125 ++++++------------ .../split/splittest/splittest_detailed.go | 44 ++++-- pkg/stanza/split/splittest/splittest_test.go | 29 +++- 3 files changed, 99 insertions(+), 99 deletions(-) diff --git a/pkg/stanza/flush/flush_test.go b/pkg/stanza/flush/flush_test.go index 140308274033..b40c829e9392 100644 --- a/pkg/stanza/flush/flush_test.go +++ b/pkg/stanza/flush/flush_test.go @@ -8,93 +8,48 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split/splittest" ) -func TestFlusher(t *testing.T) { - - // bufio.ScanWords is a simple split function which with tokenize based on newlines. - // It will return a partial token if atEOF=true. In order to test the flusher, - // we don't want the split func to return partial tokens on its own. Instead, we only - // want the flusher to force the partial out based on its own behavior. Therefore, we - // always use atEOF=false. - - flushPeriod := 100 * time.Millisecond - f := WithPeriod(bufio.ScanWords, flushPeriod) - - content := []byte("foo bar hellowo") - - // The first token is complete - advance, token, err := f(content, false) - assert.NoError(t, err) - assert.Equal(t, 4, advance) - assert.Equal(t, []byte("foo"), token) - - // The second token is also complete - advance, token, err = f(content[4:], false) - assert.NoError(t, err) - assert.Equal(t, 4, advance) - assert.Equal(t, []byte("bar"), token) - - // We find a partial token, but we just updated, so don't flush it yet - advance, token, err = f(content[8:], false) - assert.NoError(t, err) - assert.Equal(t, 0, advance) - assert.Equal(t, []byte(nil), token) - - // We find the same partial token, but we updated quite recently, so still don't flush it yet - advance, token, err = f(content[8:], false) - assert.NoError(t, err) - assert.Equal(t, 0, advance) - assert.Equal(t, []byte(nil), token) - - time.Sleep(2 * flushPeriod) - - // Now it's been a while, so we should just flush the partial token - advance, token, err = f(content[8:], false) - assert.NoError(t, err) - assert.Equal(t, 7, advance) - assert.Equal(t, []byte("hellowo"), token) +func TestNewlineSplitFunc(t *testing.T) { + testCases := []struct { + name string + flushPeriod time.Duration + baseFunc bufio.SplitFunc + input []byte + steps []splittest.Step + }{ + { + name: "FlushNoPeriod", + input: []byte("complete line\nincomplete"), + baseFunc: scanLinesStrict, + steps: []splittest.Step{ + splittest.ExpectAdvanceToken(len("complete line\n"), "complete line"), + }, + }, + { + name: "FlushIncompleteLineAfterPeriod", + input: []byte("complete line\nincomplete"), + baseFunc: scanLinesStrict, + flushPeriod: 100 * time.Millisecond, + steps: []splittest.Step{ + splittest.ExpectAdvanceToken(len("complete line\n"), "complete line"), + splittest.ExpectReadMore(), + splittest.Eventually(splittest.ExpectToken("incomplete"), 150*time.Millisecond, 10*time.Millisecond), + }, + }, + } + + for _, tc := range testCases { + splitFunc := WithPeriod(tc.baseFunc, tc.flushPeriod) + t.Run(tc.name, splittest.New(splitFunc, tc.input, tc.steps...)) + } } -func TestNoFlushPeriod(t *testing.T) { - // Same test as above, but with a flush period of 0 we should never force flush. - // In other words, we should expect exactly the behavior of bufio.ScanWords. - - flushPeriod := time.Duration(0) - f := WithPeriod(bufio.ScanWords, flushPeriod) - - content := []byte("foo bar hellowo") - - // The first token is complete - advance, token, err := f(content, false) - assert.NoError(t, err) - assert.Equal(t, 4, advance) - assert.Equal(t, []byte("foo"), token) - - // The second token is also complete - advance, token, err = f(content[4:], false) - assert.NoError(t, err) - assert.Equal(t, 4, advance) - assert.Equal(t, []byte("bar"), token) - - // We find a partial token, but we're using flushPeriod = 0 so we should never flush - advance, token, err = f(content[8:], false) - assert.NoError(t, err) - assert.Equal(t, 0, advance) - assert.Equal(t, []byte(nil), token) - - // We find the same partial token, but we're using flushPeriod = 0 so we should never flush - advance, token, err = f(content[8:], false) - assert.NoError(t, err) - assert.Equal(t, 0, advance) - assert.Equal(t, []byte(nil), token) - - time.Sleep(2 * flushPeriod) - - // Now it's been a while, but we are using flushPeriod=0, so we should never not flush - advance, token, err = f(content[8:], false) - assert.NoError(t, err) - assert.Equal(t, 0, advance) - assert.Equal(t, []byte(nil), token) +func scanLinesStrict(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = bufio.ScanLines(data, atEOF) + if advance == len(token) { + return 0, nil, nil + } + return } diff --git a/pkg/stanza/split/splittest/splittest_detailed.go b/pkg/stanza/split/splittest/splittest_detailed.go index 3fe9a1f7f580..07960bd253f6 100644 --- a/pkg/stanza/split/splittest/splittest_detailed.go +++ b/pkg/stanza/split/splittest/splittest_detailed.go @@ -6,15 +6,26 @@ package splittest // import "github.com/open-telemetry/opentelemetry-collector-c import ( "bufio" "testing" + "time" "github.com/stretchr/testify/assert" ) type Step struct { - validate validate + waitAtEOF func() bool + validate func(t *testing.T, advance int, token []byte, err error) } -type validate func(t *testing.T, advance int, token []byte, err error) +var noWait = func() bool { return false } + +func ExpectReadMore() Step { + return Step{ + waitAtEOF: noWait, + validate: func(t *testing.T, advance int, token []byte, err error) { + assert.True(t, needMoreData(advance, token, err)) + }, + } +} func ExpectToken(expectToken string) Step { return ExpectAdvanceToken(len(expectToken), expectToken) @@ -22,6 +33,7 @@ func ExpectToken(expectToken string) Step { func ExpectAdvanceToken(expectAdvance int, expectToken string) Step { return Step{ + waitAtEOF: noWait, validate: func(t *testing.T, advance int, token []byte, err error) { assert.Equal(t, expectAdvance, advance) assert.Equal(t, []byte(expectToken), token) @@ -32,6 +44,7 @@ func ExpectAdvanceToken(expectAdvance int, expectToken string) Step { func ExpectAdvanceNil(expectAdvance int) Step { return Step{ + waitAtEOF: noWait, validate: func(t *testing.T, advance int, token []byte, err error) { assert.Equal(t, expectAdvance, advance) assert.Equal(t, []byte(nil), token) @@ -42,26 +55,38 @@ func ExpectAdvanceNil(expectAdvance int) Step { func ExpectError(expectErr string) Step { return Step{ + waitAtEOF: noWait, validate: func(t *testing.T, advance int, token []byte, err error) { assert.EqualError(t, err, expectErr) }, } } +func Eventually(step Step, maxTime time.Duration, tick time.Duration) Step { + var waited time.Duration + step.waitAtEOF = func() bool { + time.Sleep(tick) + waited += tick + return waited < maxTime + } + return step +} + func New(splitFunc bufio.SplitFunc, input []byte, steps ...Step) func(*testing.T) { return func(t *testing.T) { var offset int - var atEOF bool - for _, step := range append(steps, expectMoreRequestAtEOF()) { + for _, step := range append(steps, ExpectReadMore()) { // Split funcs do not have control over the size of the // buffer so must be able to ask for more data as needed. // Start with a tiny buffer and grow it slowly to ensure // the split func is capable of asking appropriately. var bufferSize int + var atEOF bool var advance int var token []byte var err error - for !atEOF && needMoreData(advance, token, err) { + + for needMoreData(advance, token, err) && (!atEOF || step.waitAtEOF()) { // Grow the buffer at a slow pace to ensure that we're // exercising the split func's ability to ask for more data. bufferSize = 1 + bufferSize + bufferSize/8 @@ -73,6 +98,7 @@ func New(splitFunc bufio.SplitFunc, input []byte, steps ...Step) func(*testing.T data = append(data, input[offset:offset+bufferSize]...) } advance, token, err = splitFunc(data, atEOF) + // t.Errorf("\nbuffer: %d, advance: %d, token: %q, err: %v", bufferSize, advance, token, err) } offset += advance step.validate(t, advance, token, err) @@ -80,14 +106,6 @@ func New(splitFunc bufio.SplitFunc, input []byte, steps ...Step) func(*testing.T } } -func expectMoreRequestAtEOF() Step { - return Step{ - validate: func(t *testing.T, advance int, token []byte, err error) { - assert.True(t, needMoreData(advance, token, err)) - }, - } -} - func needMoreData(advance int, token []byte, err error) bool { return advance == 0 && token == nil && err == nil } diff --git a/pkg/stanza/split/splittest/splittest_test.go b/pkg/stanza/split/splittest/splittest_test.go index 44fce018d86a..0d3854201f44 100644 --- a/pkg/stanza/split/splittest/splittest_test.go +++ b/pkg/stanza/split/splittest/splittest_test.go @@ -8,6 +8,7 @@ import ( "errors" "strings" "testing" + "time" ) func TestNew(t *testing.T) { @@ -111,6 +112,17 @@ func TestNew(t *testing.T) { ExpectError("hello error!"), }, }, + { + name: "ScanLinesStrictWithFlush", + splitFunc: scanLinesStrictWithFlush(100 * time.Millisecond), + input: []byte("foo bar.\nhello world!\nincomplete line"), + steps: []Step{ + ExpectAdvanceToken(len("foo bar.\n"), "foo bar."), + ExpectAdvanceToken(len("hello world!\n"), "hello world!"), + ExpectReadMore(), + Eventually(ExpectToken("incomplete line"), 150*time.Millisecond, 10*time.Millisecond), + }, + }, } for _, tc := range testCases { @@ -129,7 +141,22 @@ func scanLinesStrict(data []byte, atEOF bool) (advance int, token []byte, err er func scanLinesError(data []byte, atEOF bool) (advance int, token []byte, err error) { advance, token, err = bufio.ScanLines(data, atEOF) if strings.Contains(string(token), "error") { - return 0, nil, errors.New(string(token)) + return advance, token, errors.New(string(token)) } return } + +func scanLinesStrictWithFlush(flushPeriod time.Duration) bufio.SplitFunc { + now := time.Now() + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = scanLinesStrict(data, atEOF) + if advance > 0 || token != nil || err != nil { + return + } + if time.Since(now) > flushPeriod { + now = time.Now() + return len(data), data, nil + } + return 0, nil, nil + } +}