Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable errorlint linter and fix issues #914

Merged
merged 2 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
linters:
enable:
- bodyclose
- errorlint
- goconst
- godot
- gofmt
Expand All @@ -10,5 +11,4 @@ linters:
disable:
# Temporarily disabling so it can be addressed in a dedicated PR.
- errcheck
- errorlint
- goerr113
3 changes: 2 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"bytes"
"context"
"errors"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -262,7 +263,7 @@ func TestClientProduceAndConsume(t *testing.T) {
for {
r, err := res.Records.ReadRecord()
if err != nil {
if err != io.EOF {
if !errors.Is(err, io.EOF) {
t.Fatal(err)
}
break
Expand Down
9 changes: 5 additions & 4 deletions compress/snappy/go-xerial-snappy/snappy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package snappy

import (
"bytes"
"errors"
"testing"
)

Expand Down Expand Up @@ -92,7 +93,7 @@ func TestSnappyDecodeMalformedTruncatedHeader(t *testing.T) {
for i := 0; i < len(xerialHeader); i++ {
buf := make([]byte, i)
copy(buf, xerialHeader[:i])
if _, err := Decode(buf); err != ErrMalformed {
if _, err := Decode(buf); !errors.Is(err, ErrMalformed) {
t.Errorf("expected ErrMalformed got %v", err)
}
}
Expand All @@ -104,7 +105,7 @@ func TestSnappyDecodeMalformedTruncatedSize(t *testing.T) {
for _, size := range sizes {
buf := make([]byte, size)
copy(buf, xerialHeader)
if _, err := Decode(buf); err != ErrMalformed {
if _, err := Decode(buf); !errors.Is(err, ErrMalformed) {
t.Errorf("expected ErrMalformed got %v", err)
}
}
Expand All @@ -116,7 +117,7 @@ func TestSnappyDecodeMalformedBNoData(t *testing.T) {
copy(buf, xerialHeader)
// indicate that there's one byte of data to be read
buf[len(buf)-1] = 1
if _, err := Decode(buf); err != ErrMalformed {
if _, err := Decode(buf); !errors.Is(err, ErrMalformed) {
t.Errorf("expected ErrMalformed got %v", err)
}
}
Expand All @@ -128,7 +129,7 @@ func TestSnappyMasterDecodeFailed(t *testing.T) {
buf[len(buf)-2] = 1
// A payload which will not decode
buf[len(buf)-1] = 1
if _, err := Decode(buf); err == ErrMalformed || err == nil {
if _, err := Decode(buf); errors.Is(err, ErrMalformed) || err == nil {
t.Errorf("unexpected err: %v", err)
}
}
Expand Down
14 changes: 6 additions & 8 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,11 +1231,10 @@ func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID

func (c *Conn) readResponse(size int, res interface{}) error {
size, err := read(&c.rbuf, size, res)
switch err.(type) {
case Error:
var e error
if size, e = discardN(&c.rbuf, size, size); e != nil {
err = e
if err != nil {
var kafkaError Error
if errors.As(err, &kafkaError) {
size, err = discardN(&c.rbuf, size, size)
}
}
return expectZeroSize(size, err)
Expand Down Expand Up @@ -1294,9 +1293,8 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func
}

if err = read(deadline, size); err != nil {
switch err.(type) {
case Error:
default:
var kafkaError Error
if !errors.As(err, &kafkaError) {
c.conn.Close()
}
}
Expand Down
46 changes: 26 additions & 20 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,11 @@ func testConnWrite(t *testing.T, conn *Conn) {
func testConnCloseAndWrite(t *testing.T, conn *Conn) {
conn.Close()

switch _, err := conn.Write([]byte("Hello World!")); err.(type) {
case *net.OpError:
default:
_, err := conn.Write([]byte("Hello World!"))

// expect a network error
var netOpError *net.OpError
if !errors.As(err, &netOpError) {
t.Error(err)
}
}
Expand Down Expand Up @@ -489,7 +491,7 @@ func testConnSeekDontCheck(t *testing.T, conn *Conn) {
t.Error("bad offset:", offset)
}

if _, err := conn.ReadMessage(1024); err != OffsetOutOfRange {
if _, err := conn.ReadMessage(1024); !errors.Is(err, OffsetOutOfRange) {
t.Error("unexpected error:", err)
}
}
Expand Down Expand Up @@ -659,13 +661,15 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) {
_, err := conn.findCoordinator(findCoordinatorRequestV0{
CoordinatorKey: groupID,
})
switch err {
case nil:
if err != nil {
if errors.Is(err, GroupCoordinatorNotAvailable) {
time.Sleep(250 * time.Millisecond)
continue
} else {
t.Fatalf("unable to find coordinator for group: %v", err)
}
} else {
return
case GroupCoordinatorNotAvailable:
time.Sleep(250 * time.Millisecond)
default:
t.Fatalf("unable to find coordinator for group: %v", err)
}
}

Expand All @@ -690,15 +694,18 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32,
},
},
})
switch err {
case nil:
if err != nil {
if errors.Is(err, NotCoordinatorForGroup) {
time.Sleep(250 * time.Millisecond)
continue
} else {
t.Fatalf("bad joinGroup: %s", err)
}
} else {
return
case NotCoordinatorForGroup:
time.Sleep(250 * time.Millisecond)
default:
t.Fatalf("bad joinGroup: %s", err)
}
}

return
}

Expand Down Expand Up @@ -742,12 +749,11 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) {
}
response, err := conn.findCoordinator(findCoordinatorRequestV0{CoordinatorKey: groupID})
if err != nil {
switch err {
case GroupCoordinatorNotAvailable:
if errors.Is(err, GroupCoordinatorNotAvailable) {
continue
default:
t.Fatalf("bad findCoordinator: %s", err)
}

t.Fatalf("bad findCoordinator: %s", err)
}

if response.Coordinator.NodeID == 0 {
Expand Down
24 changes: 15 additions & 9 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,19 +523,21 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) {
return
case <-ticker.C:
ops, err := g.conn.readPartitions(topic)
switch err {
case nil, UnknownTopicOrPartition:
switch {
case err == nil, errors.Is(err, UnknownTopicOrPartition):
if len(ops) != oParts {
g.log(func(l Logger) {
l.Printf("Partition changes found, reblancing group: %v.", g.GroupID)
})
return
}

default:
g.logError(func(l Logger) {
l.Printf("Problem getting partitions while checking for changes, %v", err)
})
if _, ok := err.(Error); ok {
var kafkaError Error
if errors.As(err, &kafkaError) {
continue
}
// other errors imply that we lost the connection to the coordinator, so we
Expand Down Expand Up @@ -724,20 +726,24 @@ func (cg *ConsumerGroup) run() {
// to the next generation. it will be non-nil in the case of an error
// joining or syncing the group.
var backoff <-chan time.Time
switch err {
case nil:

switch {
case err == nil:
// no error...the previous generation finished normally.
continue
case ErrGroupClosed:

case errors.Is(err, ErrGroupClosed):
// the CG has been closed...leave the group and exit loop.
_ = cg.leaveGroup(memberID)
return
case RebalanceInProgress:

case errors.Is(err, RebalanceInProgress):
// in case of a RebalanceInProgress, don't leave the group or
// change the member ID, but report the error. the next attempt
// to join the group will then be subject to the rebalance
// timeout, so the broker will be responsible for throttling
// this loop.

default:
// leave the group and report the error if we had gotten far
// enough so as to have a member ID. also clear the member id
Expand Down Expand Up @@ -984,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque
for _, balancer := range cg.config.GroupBalancers {
userData, err := balancer.UserData()
if err != nil {
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %v", balancer.ProtocolName(), err)
return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err)
}
request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{
ProtocolName: balancer.ProtocolName(),
Expand Down Expand Up @@ -1050,7 +1056,7 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
metadata := groupMetadata{}
reader := bufio.NewReader(bytes.NewReader(item.MemberMetadata))
if remain, err := (&metadata).readFrom(reader, len(item.MemberMetadata)); err != nil || remain != 0 {
return nil, fmt.Errorf("unable to read metadata for member, %v: %v", item.MemberID, err)
return nil, fmt.Errorf("unable to read metadata for member, %v: %w", item.MemberID, err)
}

members = append(members, GroupMember{
Expand Down
13 changes: 8 additions & 5 deletions createtopics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"bufio"
"context"
"errors"
"fmt"
"net"
"time"
Expand Down Expand Up @@ -384,12 +385,14 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error {
_, err := c.createTopics(createTopicsRequestV0{
Topics: requestV0Topics,
})
if err != nil {
if errors.Is(err, TopicAlreadyExists) {
// ok
return nil
}

switch err {
case TopicAlreadyExists:
// ok
return nil
default:
return err
}

return nil
}
3 changes: 2 additions & 1 deletion dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -294,7 +295,7 @@ func TestDialerConnectTLSHonorsContext(t *testing.T) {
defer cancel()

_, err := d.connectTLS(ctx, conn, d.TLS)
if context.DeadlineExceeded != err {
if !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("expected err to be %v; got %v", context.DeadlineExceeded, err)
t.FailNow()
}
Expand Down
7 changes: 4 additions & 3 deletions discard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"bufio"
"bytes"
"errors"
"io"
"testing"
)
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestDiscardN(t *testing.T) {
scenario: "discard more than available",
function: func(t *testing.T, r *bufio.Reader, sz int) {
remain, err := discardN(r, sz, sz+1)
if err != errShortRead {
if !errors.Is(err, errShortRead) {
t.Errorf("Expected errShortRead, got %v", err)
}
if remain != 0 {
Expand All @@ -64,7 +65,7 @@ func TestDiscardN(t *testing.T) {
scenario: "discard returns error",
function: func(t *testing.T, r *bufio.Reader, sz int) {
remain, err := discardN(r, sz+2, sz+1)
if err != io.EOF {
if !errors.Is(err, io.EOF) {
t.Errorf("Expected EOF, got %v", err)
}
if remain != 2 {
Expand All @@ -76,7 +77,7 @@ func TestDiscardN(t *testing.T) {
scenario: "errShortRead doesn't mask error",
function: func(t *testing.T, r *bufio.Reader, sz int) {
remain, err := discardN(r, sz+1, sz+2)
if err != io.EOF {
if !errors.Is(err, io.EOF) {
t.Errorf("Expected EOF, got %v", err)
}
if remain != 1 {
Expand Down
23 changes: 13 additions & 10 deletions example_consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka_test

import (
"context"
"errors"
"fmt"
"os"

Expand Down Expand Up @@ -42,18 +43,20 @@ func ExampleGeneration_Start_consumerGroupParallelReaders() {
reader.SetOffset(offset)
for {
msg, err := reader.ReadMessage(ctx)
switch err {
case kafka.ErrGenerationEnded:
// generation has ended. commit offsets. in a real app,
// offsets would be committed periodically.
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}})
return
case nil:
fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
offset = msg.Offset
default:
if err != nil {
if errors.Is(err, kafka.ErrGenerationEnded) {
// generation has ended. commit offsets. in a real app,
// offsets would be committed periodically.
gen.CommitOffsets(map[string]map[int]int64{"my-topic": {partition: offset + 1}})
return
}

fmt.Printf("error reading message: %+v\n", err)
return
}

fmt.Printf("received message %s/%d/%d : %s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
offset = msg.Offset
}
})
}
Expand Down
3 changes: 2 additions & 1 deletion message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"math/rand"
Expand Down Expand Up @@ -551,7 +552,7 @@ func TestMessageSetReaderEmpty(t *testing.T) {
if headers != nil {
t.Errorf("expected nil headers, got %v", headers)
}
if err != RequestTimedOut {
if !errors.Is(err, RequestTimedOut) {
t.Errorf("expected RequestTimedOut, got %v", err)
}

Expand Down
Loading