From a02623fbe31b8ab699791d126ed365c667388d30 Mon Sep 17 00:00:00 2001 From: Matej Vasek Date: Thu, 8 Jun 2023 19:04:49 +0200 Subject: [PATCH 1/4] fix: bad connection handling for in cluster dialer Connections were closed from wrong end of io.Pipe which resulted in confusing error logs. Signed-off-by: Matej Vasek --- pkg/k8s/dialer.go | 63 ++++++++++++++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 23 deletions(-) diff --git a/pkg/k8s/dialer.go b/pkg/k8s/dialer.go index 7dc9a06a4..76a1cbf53 100644 --- a/pkg/k8s/dialer.go +++ b/pkg/k8s/dialer.go @@ -8,6 +8,7 @@ import ( "io" "net" "sync" + "sync/atomic" "time" coreV1 "k8s.io/api/core/v1" @@ -66,18 +67,15 @@ func (c *contextDialer) DialContext(ctx context.Context, network string, addr st return nil, fmt.Errorf("unsupported network: %q", network) } - execDone := make(chan struct{}) - pr, pw, conn := newConn(execDone) + pr, pw, conn := newConn() go func() { - defer close(execDone) errOut := bytes.NewBuffer(nil) err := c.exec(addr, pr, pw, errOut) if err != nil { err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, errOut.String()) - _ = pr.CloseWithError(err) - _ = pw.CloseWithError(err) } + _ = conn.closeWithError(err) }() return conn, nil @@ -324,50 +322,69 @@ func (a addr) String() string { } type conn struct { - pr *io.PipeReader - pw *io.PipeWriter - execDone <-chan struct{} + pr *io.PipeReader + pw *io.PipeWriter + err atomic.Pointer[error] } -func (c conn) Read(b []byte) (n int, err error) { - return c.pr.Read(b) +func (c *conn) Read(b []byte) (n int, err error) { + n, err = c.pr.Read(b) + if errors.Is(err, io.ErrClosedPipe) { + if p := c.err.Load(); p != nil { + err = *p + } + } + return } -func (c conn) Write(b []byte) (n int, err error) { - return c.pw.Write(b) +func (c *conn) Write(b []byte) (n int, err error) { + n, err = c.pw.Write(b) + if errors.Is(err, io.ErrClosedPipe) { + if p := c.err.Load(); p != nil { + err = *p + } + } + return } -func (c conn) Close() error { - err := c.pw.Close() +func (c *conn) closeWithError(err error) error { + { + e := err + c.err.CompareAndSwap(nil, &e) + } + err = c.pw.CloseWithError(io.EOF) if err != nil { return fmt.Errorf("failed to close writer: %w", err) } - <-c.execDone - err = c.pr.Close() + err = c.pr.CloseWithError(net.ErrClosed) if err != nil { return fmt.Errorf("failed to close reader: %w", err) } return nil } -func (c conn) LocalAddr() net.Addr { +func (c *conn) Close() error { + return c.closeWithError(net.ErrClosed) +} + +func (c *conn) LocalAddr() net.Addr { return addr{} } -func (c conn) RemoteAddr() net.Addr { +func (c *conn) RemoteAddr() net.Addr { return addr{} } -func (c conn) SetDeadline(t time.Time) error { return nil } +func (c *conn) SetDeadline(t time.Time) error { return nil } -func (c conn) SetReadDeadline(t time.Time) error { return nil } +func (c *conn) SetReadDeadline(t time.Time) error { return nil } -func (c conn) SetWriteDeadline(t time.Time) error { return nil } +func (c *conn) SetWriteDeadline(t time.Time) error { return nil } -func newConn(execDone <-chan struct{}) (*io.PipeReader, *io.PipeWriter, conn) { +func newConn() (*io.PipeReader, *io.PipeWriter, *conn) { pr0, pw0 := io.Pipe() pr1, pw1 := io.Pipe() - rwc := conn{pr: pr0, pw: pw1, execDone: execDone} + rwc := &conn{pr: pr0, pw: pw1} return pr1, pw0, rwc } From 823f03ae5e8148452cab074d8f2ffb5bb507f7d2 Mon Sep 17 00:00:00 2001 From: Matej Vasek Date: Thu, 8 Jun 2023 23:06:46 +0200 Subject: [PATCH 2/4] feat: DialContext() better error handling Now DialContext() returns some errors immediately instead of deferring it Read/Write operation on the returned connection. Signed-off-by: Matej Vasek --- pkg/k8s/dialer.go | 49 +++++++++++++++++++++++++++++++++++------- pkg/k8s/dialer_test.go | 12 ++--------- 2 files changed, 43 insertions(+), 18 deletions(-) diff --git a/pkg/k8s/dialer.go b/pkg/k8s/dialer.go index 76a1cbf53..e6799c802 100644 --- a/pkg/k8s/dialer.go +++ b/pkg/k8s/dialer.go @@ -1,12 +1,14 @@ package k8s import ( + "bufio" "bytes" "context" "errors" "fmt" "io" "net" + "regexp" "sync" "sync/atomic" "time" @@ -67,18 +69,45 @@ func (c *contextDialer) DialContext(ctx context.Context, network string, addr st return nil, fmt.Errorf("unsupported network: %q", network) } - pr, pw, conn := newConn() - + ctrStdin, ctrStdout, conn := newConn() + connectSuccess := make(chan struct{}) + connectFailure := make(chan error, 1) go func() { - errOut := bytes.NewBuffer(nil) - err := c.exec(addr, pr, pw, errOut) + stderrBuff := bytes.NewBuffer(nil) + ctrStderr := io.MultiWriter(stderrBuff, detectConnSuccess(connectSuccess)) + + err := c.exec(addr, ctrStdin, ctrStdout, ctrStderr) if err != nil { - err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, errOut.String()) + err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, stderrBuff.String()) } _ = conn.closeWithError(err) + connectFailure <- err }() - return conn, nil + select { + case <-connectSuccess: + return conn, nil + case err := <-connectFailure: + return nil, err + case <-ctx.Done(): + _ = conn.closeWithError(ctx.Err()) + return nil, ctx.Err() + } +} + +var connSuccessfulRE = regexp.MustCompile("successfully connected") + +// Creates io.Writer which closes connectSuccess channel when string "successfully connected" is written to it. +func detectConnSuccess(connectSuccess chan struct{}) io.Writer { + pr, pw := io.Pipe() + go func() { + ok := connSuccessfulRE.MatchReader(bufio.NewReader(pr)) + if ok { + close(connectSuccess) + } + _, _ = io.Copy(io.Discard, pr) + }() + return pw } func (c *contextDialer) Close() error { @@ -195,7 +224,7 @@ func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Write Namespace(c.namespace). SubResource("exec") req.VersionedParams(&coreV1.PodExecOptions{ - Command: []string{"socat", "-", fmt.Sprintf("TCP:%s", hostPort)}, + Command: []string{"socat", "-dd", "-", fmt.Sprintf("TCP:%s", hostPort)}, Container: c.podName, Stdin: true, Stdout: true, @@ -348,6 +377,10 @@ func (c *conn) Write(b []byte) (n int, err error) { } func (c *conn) closeWithError(err error) error { + if err == nil { + err = net.ErrClosed + } + { e := err c.err.CompareAndSwap(nil, &e) @@ -364,7 +397,7 @@ func (c *conn) closeWithError(err error) error { } func (c *conn) Close() error { - return c.closeWithError(net.ErrClosed) + return c.closeWithError(nil) } func (c *conn) LocalAddr() net.Addr { diff --git a/pkg/k8s/dialer_test.go b/pkg/k8s/dialer_test.go index 157d26e43..df19f7412 100644 --- a/pkg/k8s/dialer_test.go +++ b/pkg/k8s/dialer_test.go @@ -167,16 +167,8 @@ func TestDialUnreachable(t *testing.T) { t.Cleanup(func() { dialer.Close() }) - - transport := &http.Transport{ - DialContext: dialer.DialContext, - } - - var client = http.Client{ - Transport: transport, - } - - _, err = client.Get("http://does-not.exists.svc") + + _, err = dialer.DialContext(ctx, "tcp", "does-not.exists.svc:80") if err == nil { t.Error("error was expected but got nil") return From 91586177efcbb79596a404f4b2e5966ea7ce9f63 Mon Sep 17 00:00:00 2001 From: Matej Vasek Date: Fri, 9 Jun 2023 00:39:59 +0200 Subject: [PATCH 3/4] feat: DialContext() more better error handling Now ContextDial() tries to parse socat's stderr and translate it to Go's net.OpError instead of just creating error with whole stderr embedded in it. Signed-off-by: Matej Vasek --- pkg/k8s/dialer.go | 68 +++++++++++++++++++++++++++++++++++++++++- pkg/k8s/dialer_test.go | 15 ++++++++-- 2 files changed, 79 insertions(+), 4 deletions(-) diff --git a/pkg/k8s/dialer.go b/pkg/k8s/dialer.go index e6799c802..cd9b946b0 100644 --- a/pkg/k8s/dialer.go +++ b/pkg/k8s/dialer.go @@ -8,9 +8,12 @@ import ( "fmt" "io" "net" + "os" "regexp" + "strconv" "sync" "sync/atomic" + "syscall" "time" coreV1 "k8s.io/api/core/v1" @@ -78,7 +81,13 @@ func (c *contextDialer) DialContext(ctx context.Context, network string, addr st err := c.exec(addr, ctrStdin, ctrStdout, ctrStderr) if err != nil { - err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, stderrBuff.String()) + stderrStr := stderrBuff.String() + socatErr := tryParseSocatError(network, addr, stderrStr) + if socatErr != nil { + err = fmt.Errorf("socat error: %w", socatErr) + } else { + err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, stderrStr) + } } _ = conn.closeWithError(err) connectFailure <- err @@ -110,6 +119,63 @@ func detectConnSuccess(connectSuccess chan struct{}) io.Writer { return pw } +var ( + connectionRefusedErrorRE = regexp.MustCompile(`E connect\(\d+, AF=\d+ (?P[\[\]0-9.:a-z]+), \d+\): Connection refused`) + nameResolutionErrorRE = regexp.MustCompile(`E getaddrinfo\("(?P[a-zA-z-.0-9]+)",.*\): Name does not resolve`) +) + +// tries to detect common errors from `socat` stderr +func tryParseSocatError(network, address, stderr string) error { + groups := nameResolutionErrorRE.FindStringSubmatch(stderr) + if groups != nil { + var name string + if len(groups) > 1 { + name = groups[1] + } + return &net.OpError{ + Op: "dial", + Net: network, + Source: nil, + Addr: nil, + Err: &net.DNSError{ + Err: "no such host", + Name: name, + IsNotFound: true, + }, + } + } + groups = connectionRefusedErrorRE.FindStringSubmatch(stderr) + if groups != nil { + var ( + addr net.IP + port int + zone string + ) + if len(groups) > 1 { + h, p, err := net.SplitHostPort(groups[1]) + if err == nil { + addr = net.ParseIP(h) + p, _ := strconv.ParseInt(p, 10, 16) + port = int(p) + } + } + return &net.OpError{ + Op: "dial", + Net: network, + Addr: &net.TCPAddr{ + IP: addr, + Port: port, + Zone: zone, + }, + Err: &os.SyscallError{ + Syscall: "connect", + Err: syscall.ECONNREFUSED, + }, + } + } + return nil +} + func (c *contextDialer) Close() error { // closing the channel will cause stdin of the attached container to return EOF // as a result the pod exits -- it transits to Completed state diff --git a/pkg/k8s/dialer_test.go b/pkg/k8s/dialer_test.go index df19f7412..aaef149d3 100644 --- a/pkg/k8s/dialer_test.go +++ b/pkg/k8s/dialer_test.go @@ -167,13 +167,22 @@ func TestDialUnreachable(t *testing.T) { t.Cleanup(func() { dialer.Close() }) - + _, err = dialer.DialContext(ctx, "tcp", "does-not.exists.svc:80") if err == nil { t.Error("error was expected but got nil") return } - if !strings.Contains(err.Error(), "not resolve") { - t.Errorf("error %q doesn't containe expected sub-string: ", err.Error()) + if !strings.Contains(err.Error(), "no such host") { + t.Errorf("error %q doesn't containe expected substring: ", err.Error()) + } + + _, err = dialer.DialContext(ctx, "tcp", "localhost:80") + if err == nil { + t.Error("error was expected but got nil") + return + } + if !strings.Contains(err.Error(), "connection refused") { + t.Errorf("error %q doesn't containe expected substring: ", err.Error()) } } From 7a18b2f133087e1cbaccba04f84e90a9af9150ca Mon Sep 17 00:00:00 2001 From: Matej Vasek Date: Fri, 9 Jun 2023 21:48:41 +0200 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: Lance Ball --- pkg/k8s/dialer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/k8s/dialer_test.go b/pkg/k8s/dialer_test.go index aaef149d3..22f3ba8e9 100644 --- a/pkg/k8s/dialer_test.go +++ b/pkg/k8s/dialer_test.go @@ -174,7 +174,7 @@ func TestDialUnreachable(t *testing.T) { return } if !strings.Contains(err.Error(), "no such host") { - t.Errorf("error %q doesn't containe expected substring: ", err.Error()) + t.Errorf("error %q doesn't contain expected substring: ", err.Error()) } _, err = dialer.DialContext(ctx, "tcp", "localhost:80") @@ -183,6 +183,6 @@ func TestDialUnreachable(t *testing.T) { return } if !strings.Contains(err.Error(), "connection refused") { - t.Errorf("error %q doesn't containe expected substring: ", err.Error()) + t.Errorf("error %q doesn't contain expected substring: ", err.Error()) } }