diff --git a/test/end2end_test.go b/test/end2end_test.go index 0bb72891e698..882b87e0fc0c 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -34,11 +34,14 @@ package grpc_test import ( + "bytes" "flag" "fmt" "io" + "log" "math" "net" + "os" "reflect" "runtime" "sort" @@ -53,6 +56,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/grpclog" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1alpha" "google.golang.org/grpc/metadata" @@ -279,6 +283,13 @@ const tlsDir = "testdata/" func TestReconnectTimeout(t *testing.T) { defer leakCheck(t)() + restore := declareLogNoise(t, + "transport: http2Client.notifyError got notified that the client transport was broken", + "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport", + "grpc: Conn.transportMonitor exits due to: grpc: timed out trying to connect", + ) + defer restore() + lis, err := net.Listen("tcp", ":0") if err != nil { t.Fatalf("Failed to listen: %v", err) @@ -448,6 +459,14 @@ func TestTimeoutOnDeadServer(t *testing.T) { } func testTimeoutOnDeadServer(t *testing.T, e env) { + restore := declareLogNoise(t, + "transport: http2Client.notifyError got notified that the client transport was broken EOF", + "grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing", + "grpc: Conn.resetTransport failed to create client transport: connection error", + "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix", + ) + defer restore() + s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e) cc := clientSetUp(t, addr, nil, nil, "", e) tc := testpb.NewTestServiceClient(cc) @@ -482,6 +501,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { t.Fatalf("cc.State() = %s, %v, want %s or %s, ", state, err, grpc.Connecting, grpc.TransientFailure) } cc.Close() + awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry") } func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) { @@ -519,6 +539,12 @@ func TestHealthCheckOnFailure(t *testing.T) { } func testHealthCheckOnFailure(t *testing.T, e env) { + defer leakCheck(t)() + restore := declareLogNoise(t, + "Failed to dial ", + "grpc: the client connection is closing; please retry", + ) + defer restore() hs := health.NewHealthServer() hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1) s, addr := serverSetUp(t, true, hs, math.MaxUint32, nil, nil, e) @@ -527,6 +553,7 @@ func testHealthCheckOnFailure(t *testing.T, e env) { if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.Health"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %d", err, codes.DeadlineExceeded) } + awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry") } func TestHealthCheckOff(t *testing.T) { @@ -769,6 +796,10 @@ func TestRetry(t *testing.T) { // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy // and error-prone paths. func testRetry(t *testing.T, e env) { + restore := declareLogNoise(t, + "transport: http2Client.notifyError got notified that the client transport was broken", + ) + defer restore() s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e) cc := clientSetUp(t, addr, nil, nil, "", e) tc := testpb.NewTestServiceClient(cc) @@ -850,6 +881,10 @@ func TestCancel(t *testing.T) { } func testCancel(t *testing.T, e env) { + restore := declareLogNoise(t, + "grpc: the client connection is closing; please retry", + ) + defer restore() s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e) cc := clientSetUp(t, addr, nil, nil, "", e) tc := testpb.NewTestServiceClient(cc) @@ -873,6 +908,14 @@ func testCancel(t *testing.T, e env) { if grpc.Code(err) != codes.Canceled { t.Fatalf(`TestService/UnaryCall(_, _) = %v, %v; want , error code: %d`, reply, err, codes.Canceled) } + cc.Close() + + // Wait for any NewConn's goroutine to terminate, if it's + // still running. It spams logs with this message. We wait + // for it so our log filter is still active. Otherwise the + // "defer restore()" at the top restores our log filter and + // then the goroutine spams. + awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry") } func TestCancelNoIO(t *testing.T) { @@ -1417,3 +1460,117 @@ func leakCheck(t testing.TB) func() { } } } + +type lockingWriter struct { + mu sync.Mutex + w io.Writer +} + +func (lw *lockingWriter) Write(p []byte) (n int, err error) { + lw.mu.Lock() + defer lw.mu.Unlock() + return lw.w.Write(p) +} + +func (lw *lockingWriter) setWriter(w io.Writer) { + lw.mu.Lock() + defer lw.mu.Unlock() + lw.w = w +} + +var testLogOutput = &lockingWriter{w: os.Stderr} + +func awaitLogOutput(maxWait time.Duration, phrase string) { + pb := []byte(phrase) + + timer := time.NewTimer(maxWait) + defer timer.Stop() + wakeup := make(chan bool, 1) + for { + if logOutputHasContents(pb, wakeup) { + return + } + select { + case <-timer.C: + // Too slow. Oh well. + return + case <-wakeup: + } + } +} + +func logOutputHasContents(v []byte, wakeup chan<- bool) bool { + testLogOutput.mu.Lock() + defer testLogOutput.mu.Unlock() + fw, ok := testLogOutput.w.(*filterWriter) + if !ok { + return false + } + fw.mu.Lock() + defer fw.mu.Unlock() + if bytes.Contains(fw.buf.Bytes(), v) { + return true + } + fw.wakeup = wakeup + return false +} + +func init() { + grpclog.SetLogger(log.New(testLogOutput, "", log.LstdFlags)) +} + +var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering") + +func noop() {} + +// declareLogNoise declares that t is expected to emit the following noisy phrases, +// even on success. Those phrases will be filtered from grpclog output +// and only be shown if *verbose_logs or t ends up failing. +// The returned restore function should be called with defer to be run +// before the test ends. +func declareLogNoise(t *testing.T, phrases ...string) (restore func()) { + if *verboseLogs { + return noop + } + fw := &filterWriter{dst: os.Stderr, filter: phrases} + testLogOutput.setWriter(fw) + return func() { + if t.Failed() { + fw.mu.Lock() + defer fw.mu.Unlock() + if fw.buf.Len() > 0 { + t.Logf("Complete log output:\n%s", fw.buf.Bytes()) + } + } + testLogOutput.setWriter(os.Stderr) + } +} + +type filterWriter struct { + dst io.Writer + filter []string + + mu sync.Mutex + buf bytes.Buffer + wakeup chan<- bool // if non-nil, gets true on write +} + +func (fw *filterWriter) Write(p []byte) (n int, err error) { + fw.mu.Lock() + fw.buf.Write(p) + if fw.wakeup != nil { + select { + case fw.wakeup <- true: + default: + } + } + fw.mu.Unlock() + + ps := string(p) + for _, f := range fw.filter { + if strings.Contains(ps, f) { + return len(p), nil + } + } + return fw.dst.Write(p) +}