Skip to content

Commit

Permalink
[serverless-1.30] fix: bad connection handling for in cluster dialer (#…
Browse files Browse the repository at this point in the history
…647)

* fix: bad connection handling for in cluster dialer (knative#1800)

* fix: bad connection handling for in cluster dialer

Connections were closed from wrong end of io.Pipe
which resulted in confusing error logs.

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* feat: DialContext() better error handling

Now DialContext() returns some errors immediately
instead of deferring it Read/Write operation on the returned connection.

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* feat: DialContext() more better error handling

Now ContextDial() tries to parse socat's stderr and translate it to Go's net.OpError
instead of just creating error with whole stderr embedded in it.

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* Apply suggestions from code review

Co-authored-by: Lance Ball <lball@redhat.com>

---------

Signed-off-by: Matej Vasek <mvasek@redhat.com>
Co-authored-by: Lance Ball <lball@redhat.com>

* fixup: update golangci-lit version

Signed-off-by: Matej Vasek <mvasek@redhat.com>

* fixup: update CI Go version

Signed-off-by: Matej Vasek <mvasek@redhat.com>

---------

Signed-off-by: Matej Vasek <mvasek@redhat.com>
Co-authored-by: Lance Ball <lball@redhat.com>
  • Loading branch information
matejvasek and lance committed Aug 24, 2023
1 parent 18b62d2 commit 8331fc8
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 37 deletions.
174 changes: 145 additions & 29 deletions pkg/k8s/dialer.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package k8s

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"os"
"regexp"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"

coreV1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -66,21 +72,108 @@ func (c *contextDialer) DialContext(ctx context.Context, network string, addr st
return nil, fmt.Errorf("unsupported network: %q", network)
}

execDone := make(chan struct{})
pr, pw, conn := newConn(execDone)

ctrStdin, ctrStdout, conn := newConn()
connectSuccess := make(chan struct{})
connectFailure := make(chan error, 1)
go func() {
defer close(execDone)
errOut := bytes.NewBuffer(nil)
err := c.exec(addr, pr, pw, errOut)
stderrBuff := bytes.NewBuffer(nil)
ctrStderr := io.MultiWriter(stderrBuff, detectConnSuccess(connectSuccess))

err := c.exec(addr, ctrStdin, ctrStdout, ctrStderr)
if err != nil {
err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, errOut.String())
_ = pr.CloseWithError(err)
_ = pw.CloseWithError(err)
stderrStr := stderrBuff.String()
socatErr := tryParseSocatError(network, addr, stderrStr)
if socatErr != nil {
err = fmt.Errorf("socat error: %w", socatErr)
} else {
err = fmt.Errorf("failed to exec in pod: %w (stderr: %q)", err, stderrStr)
}
}
_ = conn.closeWithError(err)
connectFailure <- err
}()

return conn, nil
select {
case <-connectSuccess:
return conn, nil
case err := <-connectFailure:
return nil, err
case <-ctx.Done():
_ = conn.closeWithError(ctx.Err())
return nil, ctx.Err()
}
}

var connSuccessfulRE = regexp.MustCompile("successfully connected")

// Creates io.Writer which closes connectSuccess channel when string "successfully connected" is written to it.
func detectConnSuccess(connectSuccess chan struct{}) io.Writer {
pr, pw := io.Pipe()
go func() {
ok := connSuccessfulRE.MatchReader(bufio.NewReader(pr))
if ok {
close(connectSuccess)
}
_, _ = io.Copy(io.Discard, pr)
}()
return pw
}

var (
connectionRefusedErrorRE = regexp.MustCompile(`E connect\(\d+, AF=\d+ (?P<hostport>[\[\]0-9.:a-z]+), \d+\): Connection refused`)
nameResolutionErrorRE = regexp.MustCompile(`E getaddrinfo\("(?P<hostname>[a-zA-z-.0-9]+)",.*\): Name does not resolve`)
)

// tries to detect common errors from `socat` stderr
func tryParseSocatError(network, address, stderr string) error {
groups := nameResolutionErrorRE.FindStringSubmatch(stderr)
if groups != nil {
var name string
if len(groups) > 1 {
name = groups[1]
}
return &net.OpError{
Op: "dial",
Net: network,
Source: nil,
Addr: nil,
Err: &net.DNSError{
Err: "no such host",
Name: name,
IsNotFound: true,
},
}
}
groups = connectionRefusedErrorRE.FindStringSubmatch(stderr)
if groups != nil {
var (
addr net.IP
port int
zone string
)
if len(groups) > 1 {
h, p, err := net.SplitHostPort(groups[1])
if err == nil {
addr = net.ParseIP(h)
p, _ := strconv.ParseInt(p, 10, 16)
port = int(p)
}
}
return &net.OpError{
Op: "dial",
Net: network,
Addr: &net.TCPAddr{
IP: addr,
Port: port,
Zone: zone,
},
Err: &os.SyscallError{
Syscall: "connect",
Err: syscall.ECONNREFUSED,
},
}
}
return nil
}

func (c *contextDialer) Close() error {
Expand Down Expand Up @@ -197,7 +290,7 @@ func (c *contextDialer) exec(hostPort string, in io.Reader, out, errOut io.Write
Namespace(c.namespace).
SubResource("exec")
req.VersionedParams(&coreV1.PodExecOptions{
Command: []string{"socat", "-", fmt.Sprintf("TCP:%s", hostPort)},
Command: []string{"socat", "-dd", "-", fmt.Sprintf("TCP:%s", hostPort)},
Container: c.podName,
Stdin: true,
Stdout: true,
Expand Down Expand Up @@ -324,50 +417,73 @@ func (a addr) String() string {
}

type conn struct {
pr *io.PipeReader
pw *io.PipeWriter
execDone <-chan struct{}
pr *io.PipeReader
pw *io.PipeWriter
err atomic.Pointer[error]
}

func (c conn) Read(b []byte) (n int, err error) {
return c.pr.Read(b)
func (c *conn) Read(b []byte) (n int, err error) {
n, err = c.pr.Read(b)
if errors.Is(err, io.ErrClosedPipe) {
if p := c.err.Load(); p != nil {
err = *p
}
}
return
}

func (c conn) Write(b []byte) (n int, err error) {
return c.pw.Write(b)
func (c *conn) Write(b []byte) (n int, err error) {
n, err = c.pw.Write(b)
if errors.Is(err, io.ErrClosedPipe) {
if p := c.err.Load(); p != nil {
err = *p
}
}
return
}

func (c conn) Close() error {
err := c.pw.Close()
func (c *conn) closeWithError(err error) error {
if err == nil {
err = net.ErrClosed
}

{
e := err
c.err.CompareAndSwap(nil, &e)
}
err = c.pw.CloseWithError(io.EOF)
if err != nil {
return fmt.Errorf("failed to close writer: %w", err)
}
<-c.execDone
err = c.pr.Close()
err = c.pr.CloseWithError(net.ErrClosed)
if err != nil {
return fmt.Errorf("failed to close reader: %w", err)
}
return nil
}

func (c conn) LocalAddr() net.Addr {
func (c *conn) Close() error {
return c.closeWithError(nil)
}

func (c *conn) LocalAddr() net.Addr {
return addr{}
}

func (c conn) RemoteAddr() net.Addr {
func (c *conn) RemoteAddr() net.Addr {
return addr{}
}

func (c conn) SetDeadline(t time.Time) error { return nil }
func (c *conn) SetDeadline(t time.Time) error { return nil }

func (c conn) SetReadDeadline(t time.Time) error { return nil }
func (c *conn) SetReadDeadline(t time.Time) error { return nil }

func (c conn) SetWriteDeadline(t time.Time) error { return nil }
func (c *conn) SetWriteDeadline(t time.Time) error { return nil }

func newConn(execDone <-chan struct{}) (*io.PipeReader, *io.PipeWriter, conn) {
func newConn() (*io.PipeReader, *io.PipeWriter, *conn) {
pr0, pw0 := io.Pipe()
pr1, pw1 := io.Pipe()
rwc := conn{pr: pr0, pw: pw1, execDone: execDone}
rwc := &conn{pr: pr0, pw: pw1}
return pr1, pw0, rwc
}

Expand Down
17 changes: 9 additions & 8 deletions pkg/k8s/dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,21 @@ func TestDialUnreachable(t *testing.T) {
dialer.Close()
})

transport := &http.Transport{
DialContext: dialer.DialContext,
_, err = dialer.DialContext(ctx, "tcp", "does-not.exists.svc:80")
if err == nil {
t.Error("error was expected but got nil")
return
}

var client = http.Client{
Transport: transport,
if !strings.Contains(err.Error(), "no such host") {
t.Errorf("error %q doesn't contain expected substring: ", err.Error())
}

_, err = client.Get("http://does-not.exists.svc")
_, err = dialer.DialContext(ctx, "tcp", "localhost:80")
if err == nil {
t.Error("error was expected but got nil")
return
}
if !strings.Contains(err.Error(), "not resolve") {
t.Errorf("error %q doesn't containe expected sub-string: ", err.Error())
if !strings.Contains(err.Error(), "connection refused") {
t.Errorf("error %q doesn't contain expected substring: ", err.Error())
}
}

0 comments on commit 8331fc8

Please sign in to comment.