diff --git a/consumergroup.go b/consumergroup.go index 92c622c7..00380ce9 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -8,6 +8,8 @@ import ( "fmt" "io" "math" + "net" + "strconv" "strings" "sync" "time" @@ -861,7 +863,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { return nil, err } - address := fmt.Sprintf("%v:%v", out.Coordinator.Host, out.Coordinator.Port) + address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port))) return cg.config.connect(cg.config.Dialer, address) } diff --git a/initproducerid_test.go b/initproducerid_test.go index 5f1bb59a..480175f4 100644 --- a/initproducerid_test.go +++ b/initproducerid_test.go @@ -3,7 +3,8 @@ package kafka import ( "context" "errors" - "fmt" + "net" + "strconv" "testing" "time" @@ -31,7 +32,7 @@ func TestClientInitProducerId(t *testing.T) { } // Now establish a connection with the transaction coordinator - transactionCoordinator := TCP(fmt.Sprintf("%s:%d", respc.Coordinator.Host, respc.Coordinator.Port)) + transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port)))) client, shutdown = newClient(transactionCoordinator) defer shutdown()