Skip to content

Commit

Permalink
Benchmarking
Browse files Browse the repository at this point in the history
  • Loading branch information
neilotoole committed Jan 25, 2024
1 parent 10ee50a commit fadd9b2
Show file tree
Hide file tree
Showing 3 changed files with 332 additions and 30 deletions.
42 changes: 21 additions & 21 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,31 +169,31 @@ func (r *tweakableReader) Read(p []byte) (n int, err error) {
}

// requireNoTake fails if a value is taken from c.
func requireNoTake[C any](t *testing.T, c <-chan C, msgAndArgs ...any) {
t.Helper()
func requireNoTake[C any](tb testing.TB, c <-chan C, msgAndArgs ...any) {
tb.Helper()
select {
case <-c:
require.Fail(t, "unexpected take from channel", msgAndArgs...)
require.Fail(tb, "unexpected take from channel", msgAndArgs...)
default:
}
}

// requireTake fails if a value is not taken from c.
func requireTake[C any](t *testing.T, c <-chan C, msgAndArgs ...any) {
t.Helper()
func requireTake[C any](tb testing.TB, c <-chan C, msgAndArgs ...any) {
tb.Helper()
select {
case <-c:
default:
require.Fail(t, "unexpected failure to take from channel", msgAndArgs...)
require.Fail(tb, "unexpected failure to take from channel", msgAndArgs...)
}
}

// totalTimeout is used by requireTotal and requireNoTotal.
const totalTimeout = time.Millisecond * 100

// requireNoTotal requires that s.Total blocks.
func requireNoTotal(t *testing.T, s *streamcache.Stream) {
t.Helper()
func requireNoTotal(tb testing.TB, s *streamcache.Stream) {
tb.Helper()

failErr := errors.New("fail")
ctx, cancel := context.WithCancelCause(context.Background())
Expand All @@ -213,15 +213,15 @@ func requireNoTotal(t *testing.T, s *streamcache.Stream) {
}()

<-wait
require.Error(t, err)
require.True(t, errors.Is(err, failErr))
require.Equal(t, 0, size)
require.Error(tb, err)
require.True(tb, errors.Is(err, failErr))
require.Equal(tb, 0, size)
}

// requireTotal requires that s.Total doesn't block, and
// that s.Total returns want and no error.
func requireTotal(t *testing.T, s *streamcache.Stream, want int) {
t.Helper()
func requireTotal(tb testing.TB, s *streamcache.Stream, want int) {
tb.Helper()

var (
ctx, cancel = context.WithCancelCause(context.Background())
Expand All @@ -239,16 +239,16 @@ func requireTotal(t *testing.T, s *streamcache.Stream, want int) {
}()

<-wait
require.NoError(t, err)
require.Equal(t, want, size)
require.NoError(tb, err)
require.Equal(tb, want, size)
}

// generateSampleFile generates a temp file of sample data with the
// specified number of rows. It is the caller's responsibility to
// close the file. Note that the file is removed by t.Cleanup.
func generateSampleFile(t *testing.T, rows int) (size int, fp string) {
func generateSampleFile(tb testing.TB, rows int) (size int, fp string) {
f, err := os.CreateTemp("", "")
require.NoError(t, err)
require.NoError(tb, err)
fp = f.Name()

const line = "A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z"
Expand All @@ -258,14 +258,14 @@ func generateSampleFile(t *testing.T, rows int) (size int, fp string) {
// 1,A,B,C...
s := strconv.Itoa(i) + "," + line
_, err = fmt.Fprintln(f, s)
require.NoError(t, err)
require.NoError(tb, err)
}

require.NoError(t, f.Close())
require.NoError(tb, f.Close())
fi, err := os.Stat(fp)
require.NoError(t, err)
require.NoError(tb, err)
size = int(fi.Size())
t.Logf("Generated sample file [%d]: %s", size, fp)
tb.Logf("Generated sample file [%d]: %s", size, fp)
return int(fi.Size()), fp
}

Expand Down
18 changes: 9 additions & 9 deletions streamcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,8 @@ type Reader struct {
// call to Read will read. It is incremented by each Read.
offset int

// mu guards Reader's methods.
mu sync.Mutex
// closeOnce guards Reader's Close method.
closeOnce sync.Once
}

// Read reads from the stream. If a non-nil context was provided to Stream.NewReader
Expand All @@ -624,8 +624,8 @@ type Reader struct {
//
// Use io.ReadFull or io.ReadAtLeast if you want to ensure that p is filled.
func (r *Reader) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()
// r.mu.Lock()
// defer r.mu.Unlock()

if r.ctx != nil {
select {
Expand Down Expand Up @@ -665,18 +665,18 @@ func (r *Reader) Read(p []byte) (n int, err error) {
// Note that subsequent calls to Close are no-op and return the same result
// as the first call.
func (r *Reader) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
r.closeOnce.Do(func() {
closeErr := r.s.close(r)
r.pCloseErr = &closeErr
})

if r.pCloseErr != nil {
// Already closed. Return the same error as the first call
// to Close (which may be nil).
return *r.pCloseErr
}

closeErr := r.s.close(r)
r.pCloseErr = &closeErr
return closeErr
return nil
}

// removeElement returns a (possibly new) slice that contains the
Expand Down
Loading

0 comments on commit fadd9b2

Please sign in to comment.