Skip to content

Commit

Permalink
client: fix race between transport draining and new RPCs (#2919)
Browse files Browse the repository at this point in the history
Before these fixes, it was possible to see errors on new RPCs after a
connection began draining, and before establishing a new connection.  There is
an inherent race between choosing a SubConn and attempting to creating a stream
on it.  We should be able to avoid application-visible RPC errors due to this
with transparent retry.  However, several bugs were preventing this from
working correctly:

1. Non-wait-for-ready RPCs were skipping transparent retry, though the retry
design calls for retrying them.

2. The transport closed itself (and would consequently error new RPCs) before
notifying the SubConn that it was draining.

3. The SubConn wasn't synchronously updating itself once it was notified about
the closing or draining state.

4. The SubConn would go into the TRANSIENT_FAILURE state instantaneously,
causing RPCs to fail instead of queue.
  • Loading branch information
dfawley committed Jul 22, 2019
1 parent a975db9 commit 9771422
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 43 deletions.
12 changes: 6 additions & 6 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,9 +1125,9 @@ func TestGRPCLBStatsUnaryFailedToSend(t *testing.T) {
})

if err := checkStats(stats, &rpcStats{
numCallsStarted: int64(countRPC),
numCallsFinished: int64(countRPC),
numCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
numCallsStarted: int64(countRPC)*2 - 1,
numCallsFinished: int64(countRPC)*2 - 1,
numCallsFinishedWithClientFailedToSend: int64(countRPC-1) * 2,
numCallsFinishedKnownReceived: 1,
}); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1227,9 +1227,9 @@ func TestGRPCLBStatsStreamingFailedToSend(t *testing.T) {
})

if err := checkStats(stats, &rpcStats{
numCallsStarted: int64(countRPC),
numCallsFinished: int64(countRPC),
numCallsFinishedWithClientFailedToSend: int64(countRPC - 1),
numCallsStarted: int64(countRPC)*2 - 1,
numCallsFinished: int64(countRPC)*2 - 1,
numCallsFinishedWithClientFailedToSend: int64(countRPC-1) * 2,
numCallsFinishedKnownReceived: 1,
}); err != nil {
t.Fatal(err)
Expand Down
45 changes: 31 additions & 14 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,8 +1060,8 @@ func (ac *addrConn) resetTransport() {

ac.mu.Lock()
if ac.state == connectivity.Shutdown {
newTr.Close()
ac.mu.Unlock()
newTr.Close()
return
}
ac.curAddr = addr
Expand All @@ -1076,20 +1076,16 @@ func (ac *addrConn) resetTransport() {
// we restart from the top of the addr list.
<-reconnect.Done()
hcancel()

// Need to reconnect after a READY, the addrConn enters
// TRANSIENT_FAILURE.
// restart connecting - the top of the loop will set state to
// CONNECTING. This is against the current connectivity semantics doc,
// however it allows for graceful behavior for RPCs not yet dispatched
// - unfortunate timing would otherwise lead to the RPC failing even
// though the TRANSIENT_FAILURE state (called for by the doc) would be
// instantaneous.
//
// This will set addrConn to TRANSIENT_FAILURE for a very short period
// of time, and turns CONNECTING. It seems reasonable to skip this, but
// READY-CONNECTING is not a valid transition.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.TransientFailure)
ac.mu.Unlock()
// Ideally we should transition to Idle here and block until there is
// RPC activity that leads to the balancer requesting a reconnect of
// the associated SubConn.
}
}

Expand Down Expand Up @@ -1146,14 +1142,35 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
Authority: ac.cc.authority,
}

once := sync.Once{}
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
once.Do(func() {
if ac.state == connectivity.Ready {
// Prevent this SubConn from being used for new RPCs by setting its
// state to Connecting.
//
// TODO: this should be Idle when grpc-go properly supports it.
ac.updateConnectivityState(connectivity.Connecting)
}
})
ac.mu.Unlock()
reconnect.Fire()
}

onClose := func() {
ac.mu.Lock()
once.Do(func() {
if ac.state == connectivity.Ready {
// Prevent this SubConn from being used for new RPCs by setting its
// state to Connecting.
//
// TODO: this should be Idle when grpc-go properly supports it.
ac.updateConnectivityState(connectivity.Connecting)
}
})
ac.mu.Unlock()
close(onCloseCalled)
reconnect.Fire()
}
Expand Down
13 changes: 5 additions & 8 deletions clientconn_state_transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,11 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
}
}

// When a READY connection is closed, the client enters TRANSIENT FAILURE before CONNECTING.
func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) {
// When a READY connection is closed, the client enters CONNECTING.
func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.TransientFailure,
connectivity.Connecting,
}

Expand Down Expand Up @@ -260,8 +259,8 @@ func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) {
}
}

// When the first connection is closed, the client enters stays in CONNECTING
// until it tries the second address (which succeeds, and then it enters READY).
// When the first connection is closed, the client stays in CONNECTING until it
// tries the second address (which succeeds, and then it enters READY).
func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
Expand Down Expand Up @@ -354,13 +353,11 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
}

// When there are multiple addresses, and we enter READY on one of them, a
// later closure should cause the client to enter TRANSIENT FAILURE before it
// re-enters CONNECTING.
// later closure should cause the client to enter CONNECTING
func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.TransientFailure,
connectivity.Connecting,
}

Expand Down
12 changes: 7 additions & 5 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
close(s.headerChan)
}

}
hdr := &headerFrame{
hf: headerFields,
Expand Down Expand Up @@ -769,6 +768,9 @@ func (t *http2Client) Close() error {
t.mu.Unlock()
return nil
}
// Call t.onClose before setting the state to closing to prevent the client
// from attempting to create new streams ASAP.
t.onClose()
t.state = closing
streams := t.activeStreams
t.activeStreams = nil
Expand All @@ -789,7 +791,6 @@ func (t *http2Client) Close() error {
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
t.onClose()
return err
}

Expand Down Expand Up @@ -1085,11 +1086,12 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
default:
t.setGoAwayReason(f)
close(t.goAway)
t.state = draining
t.controlBuf.put(&incomingGoAway{})

// This has to be a new goroutine because we're still using the current goroutine to read in the transport.
// Notify the clientconn about the GOAWAY before we set the state to
// draining, to allow the client to stop attempting to create streams
// before disallowing new streams on this connection.
t.onGoAway(t.goAwayReason)
t.state = draining
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
Expand Down
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ func (cs *clientStream) shouldRetry(err error) error {
if cs.attempt.s != nil {
<-cs.attempt.s.Done()
}
if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
// First attempt, wait-for-ready, stream unprocessed: transparently retry.
if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
// First attempt, stream unprocessed: transparently retry.
cs.firstAttempt = false
return nil
}
Expand Down
12 changes: 4 additions & 8 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3627,7 +3627,10 @@ func testTransparentRetry(t *testing.T, e env) {
}, {
successAttempt: 2,
failFast: true,
errCode: codes.Unavailable, // We won't retry on fail fast.
}, {
successAttempt: 3,
failFast: true,
errCode: codes.Unavailable,
}}
for _, tc := range testCases {
attempts = 0
Expand Down Expand Up @@ -7133,9 +7136,7 @@ func (s) TestGoAwayThenClose(t *testing.T) {
}
s2 := grpc.NewServer()
defer s2.Stop()
conn2Ready := grpcsync.NewEvent()
ts2 := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
conn2Ready.Fire()
return &testpb.SimpleResponse{}, nil
}}
testpb.RegisterTestServiceServer(s2, ts2)
Expand Down Expand Up @@ -7177,11 +7178,6 @@ func (s) TestGoAwayThenClose(t *testing.T) {
t.Fatal("expected the stream to die, but got a successful Recv")
}

// Connection was dialed, so ac is either in connecting or ready. Because there's a race
// between ac state change and balancer state change, so cc could still be transient
// failure. This wait make sure cc is at least in connecting, and RPCs won't fail after
// this.
cc.WaitForStateChange(ctx, connectivity.TransientFailure)
// Do a bunch of RPCs, make sure it stays stable. These should go to connection 2.
for i := 0; i < 10; i++ {
if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
Expand Down
74 changes: 74 additions & 0 deletions test/goaway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package test

import (
"context"
"net"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
testpb "google.golang.org/grpc/test/grpc_testing"
)

// TestGracefulClientOnGoAway attempts to ensure that when the server sends a
// GOAWAY (in this test, by configuring max connection age on the server), a
// client will never see an error. This requires that the client is appraised
// of the GOAWAY and updates its state accordingly before the transport stops
// accepting new streams. If a subconn is chosen by a picker and receives the
// goaway before creating the stream, an error will occur, but upon transparent
// retry, the clientconn will ensure a ready subconn is chosen.
func (s) TestGracefulClientOnGoAway(t *testing.T) {
const maxConnAge = 100 * time.Millisecond
const testTime = maxConnAge * 10

ss := &stubServer{
emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
},
}

s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge}))
defer s.Stop()
testpb.RegisterTestServiceServer(s, ss)

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to create listener: %v", err)
}
go s.Serve(lis)

cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
if err != nil {
t.Fatalf("Failed to dial server: %v", err)
}
defer cc.Close()
c := testpb.NewTestServiceClient(cc)

endTime := time.Now().Add(testTime)
for time.Now().Before(endTime) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall(_, _) = _, %v; want _, <nil>", err)
}
cancel()
}
}

0 comments on commit 9771422

Please sign in to comment.