Skip to content

Commit

Permalink
[chore][pkg/stanza] Migrate flush func tests (#27229)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Sep 28, 2023
1 parent 189a64b commit 6f1fcba
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 99 deletions.
125 changes: 40 additions & 85 deletions pkg/stanza/flush/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,93 +8,48 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split/splittest"
)

func TestFlusher(t *testing.T) {

// bufio.ScanWords is a simple split function which with tokenize based on newlines.
// It will return a partial token if atEOF=true. In order to test the flusher,
// we don't want the split func to return partial tokens on its own. Instead, we only
// want the flusher to force the partial out based on its own behavior. Therefore, we
// always use atEOF=false.

flushPeriod := 100 * time.Millisecond
f := WithPeriod(bufio.ScanWords, flushPeriod)

content := []byte("foo bar hellowo")

// The first token is complete
advance, token, err := f(content, false)
assert.NoError(t, err)
assert.Equal(t, 4, advance)
assert.Equal(t, []byte("foo"), token)

// The second token is also complete
advance, token, err = f(content[4:], false)
assert.NoError(t, err)
assert.Equal(t, 4, advance)
assert.Equal(t, []byte("bar"), token)

// We find a partial token, but we just updated, so don't flush it yet
advance, token, err = f(content[8:], false)
assert.NoError(t, err)
assert.Equal(t, 0, advance)
assert.Equal(t, []byte(nil), token)

// We find the same partial token, but we updated quite recently, so still don't flush it yet
advance, token, err = f(content[8:], false)
assert.NoError(t, err)
assert.Equal(t, 0, advance)
assert.Equal(t, []byte(nil), token)

time.Sleep(2 * flushPeriod)

// Now it's been a while, so we should just flush the partial token
advance, token, err = f(content[8:], false)
assert.NoError(t, err)
assert.Equal(t, 7, advance)
assert.Equal(t, []byte("hellowo"), token)
func TestNewlineSplitFunc(t *testing.T) {
testCases := []struct {
name string
flushPeriod time.Duration
baseFunc bufio.SplitFunc
input []byte
steps []splittest.Step
}{
{
name: "FlushNoPeriod",
input: []byte("complete line\nincomplete"),
baseFunc: scanLinesStrict,
steps: []splittest.Step{
splittest.ExpectAdvanceToken(len("complete line\n"), "complete line"),
},
},
{
name: "FlushIncompleteLineAfterPeriod",
input: []byte("complete line\nincomplete"),
baseFunc: scanLinesStrict,
flushPeriod: 100 * time.Millisecond,
steps: []splittest.Step{
splittest.ExpectAdvanceToken(len("complete line\n"), "complete line"),
splittest.ExpectReadMore(),
splittest.Eventually(splittest.ExpectToken("incomplete"), 150*time.Millisecond, 10*time.Millisecond),
},
},
}

for _, tc := range testCases {
splitFunc := WithPeriod(tc.baseFunc, tc.flushPeriod)
t.Run(tc.name, splittest.New(splitFunc, tc.input, tc.steps...))
}
}

func TestNoFlushPeriod(t *testing.T) {
// Same test as above, but with a flush period of 0 we should never force flush.
// In other words, we should expect exactly the behavior of bufio.ScanWords.

flushPeriod := time.Duration(0)
f := WithPeriod(bufio.ScanWords, flushPeriod)

content := []byte("foo bar hellowo")

// The first token is complete
advance, token, err := f(content, false)
assert.NoError(t, err)
assert.Equal(t, 4, advance)
assert.Equal(t, []byte("foo"), token)

// The second token is also complete
advance, token, err = f(content[4:], false)
assert.NoError(t, err)
assert.Equal(t, 4, advance)
assert.Equal(t, []byte("bar"), token)

// We find a partial token, but we're using flushPeriod = 0 so we should never flush
advance, token, err = f(content[8:], false)
assert.NoError(t, err)
assert.Equal(t, 0, advance)
assert.Equal(t, []byte(nil), token)

// We find the same partial token, but we're using flushPeriod = 0 so we should never flush
advance, token, err = f(content[8:], false)
assert.NoError(t, err)
assert.Equal(t, 0, advance)
assert.Equal(t, []byte(nil), token)

time.Sleep(2 * flushPeriod)

// Now it's been a while, but we are using flushPeriod=0, so we should never not flush
advance, token, err = f(content[8:], false)
assert.NoError(t, err)
assert.Equal(t, 0, advance)
assert.Equal(t, []byte(nil), token)
func scanLinesStrict(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = bufio.ScanLines(data, atEOF)
if advance == len(token) {
return 0, nil, nil
}
return
}
44 changes: 31 additions & 13 deletions pkg/stanza/split/splittest/splittest_detailed.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,34 @@ package splittest // import "github.com/open-telemetry/opentelemetry-collector-c
import (
"bufio"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

type Step struct {
validate validate
waitAtEOF func() bool
validate func(t *testing.T, advance int, token []byte, err error)
}

type validate func(t *testing.T, advance int, token []byte, err error)
var noWait = func() bool { return false }

func ExpectReadMore() Step {
return Step{
waitAtEOF: noWait,
validate: func(t *testing.T, advance int, token []byte, err error) {
assert.True(t, needMoreData(advance, token, err))
},
}
}

func ExpectToken(expectToken string) Step {
return ExpectAdvanceToken(len(expectToken), expectToken)
}

func ExpectAdvanceToken(expectAdvance int, expectToken string) Step {
return Step{
waitAtEOF: noWait,
validate: func(t *testing.T, advance int, token []byte, err error) {
assert.Equal(t, expectAdvance, advance)
assert.Equal(t, []byte(expectToken), token)
Expand All @@ -32,6 +44,7 @@ func ExpectAdvanceToken(expectAdvance int, expectToken string) Step {

func ExpectAdvanceNil(expectAdvance int) Step {
return Step{
waitAtEOF: noWait,
validate: func(t *testing.T, advance int, token []byte, err error) {
assert.Equal(t, expectAdvance, advance)
assert.Equal(t, []byte(nil), token)
Expand All @@ -42,26 +55,38 @@ func ExpectAdvanceNil(expectAdvance int) Step {

func ExpectError(expectErr string) Step {
return Step{
waitAtEOF: noWait,
validate: func(t *testing.T, advance int, token []byte, err error) {
assert.EqualError(t, err, expectErr)
},
}
}

func Eventually(step Step, maxTime time.Duration, tick time.Duration) Step {
var waited time.Duration
step.waitAtEOF = func() bool {
time.Sleep(tick)
waited += tick
return waited < maxTime
}
return step
}

func New(splitFunc bufio.SplitFunc, input []byte, steps ...Step) func(*testing.T) {
return func(t *testing.T) {
var offset int
var atEOF bool
for _, step := range append(steps, expectMoreRequestAtEOF()) {
for _, step := range append(steps, ExpectReadMore()) {
// Split funcs do not have control over the size of the
// buffer so must be able to ask for more data as needed.
// Start with a tiny buffer and grow it slowly to ensure
// the split func is capable of asking appropriately.
var bufferSize int
var atEOF bool
var advance int
var token []byte
var err error
for !atEOF && needMoreData(advance, token, err) {

for needMoreData(advance, token, err) && (!atEOF || step.waitAtEOF()) {
// Grow the buffer at a slow pace to ensure that we're
// exercising the split func's ability to ask for more data.
bufferSize = 1 + bufferSize + bufferSize/8
Expand All @@ -73,21 +98,14 @@ func New(splitFunc bufio.SplitFunc, input []byte, steps ...Step) func(*testing.T
data = append(data, input[offset:offset+bufferSize]...)
}
advance, token, err = splitFunc(data, atEOF)
// t.Errorf("\nbuffer: %d, advance: %d, token: %q, err: %v", bufferSize, advance, token, err)
}
offset += advance
step.validate(t, advance, token, err)
}
}
}

func expectMoreRequestAtEOF() Step {
return Step{
validate: func(t *testing.T, advance int, token []byte, err error) {
assert.True(t, needMoreData(advance, token, err))
},
}
}

func needMoreData(advance int, token []byte, err error) bool {
return advance == 0 && token == nil && err == nil
}
29 changes: 28 additions & 1 deletion pkg/stanza/split/splittest/splittest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"strings"
"testing"
"time"
)

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -111,6 +112,17 @@ func TestNew(t *testing.T) {
ExpectError("hello error!"),
},
},
{
name: "ScanLinesStrictWithFlush",
splitFunc: scanLinesStrictWithFlush(100 * time.Millisecond),
input: []byte("foo bar.\nhello world!\nincomplete line"),
steps: []Step{
ExpectAdvanceToken(len("foo bar.\n"), "foo bar."),
ExpectAdvanceToken(len("hello world!\n"), "hello world!"),
ExpectReadMore(),
Eventually(ExpectToken("incomplete line"), 150*time.Millisecond, 10*time.Millisecond),
},
},
}

for _, tc := range testCases {
Expand All @@ -129,7 +141,22 @@ func scanLinesStrict(data []byte, atEOF bool) (advance int, token []byte, err er
func scanLinesError(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = bufio.ScanLines(data, atEOF)
if strings.Contains(string(token), "error") {
return 0, nil, errors.New(string(token))
return advance, token, errors.New(string(token))
}
return
}

func scanLinesStrictWithFlush(flushPeriod time.Duration) bufio.SplitFunc {
now := time.Now()
return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = scanLinesStrict(data, atEOF)
if advance > 0 || token != nil || err != nil {
return
}
if time.Since(now) > flushPeriod {
now = time.Now()
return len(data), data, nil
}
return 0, nil, nil
}
}

0 comments on commit 6f1fcba

Please sign in to comment.