Skip to content

Commit

Permalink
transport: fix race sending RPC status that could lead to a panic (#1687
Browse files Browse the repository at this point in the history
)

WriteStatus can be called concurrently: one by SendMsg,
the other by RecvMsg. Then, closing writes channel
becomes racey without proper locking.

Make transport closing synchronous in such case.
  • Loading branch information
gyuho authored and dfawley committed Nov 30, 2017
1 parent 00383af commit c6b4608
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
23 changes: 11 additions & 12 deletions transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,9 @@ type serverHandlerTransport struct {
// when WriteStatus is called.
writes chan func()

mu sync.Mutex
// streamDone indicates whether WriteStatus has been called and writes channel
// has been closed.
streamDone bool
// block concurrent WriteStatus calls
// e.g. grpc/(*serverStream).SendMsg/RecvMsg
writeStatusMu sync.Mutex
}

func (ht *serverHandlerTransport) Close() error {
Expand Down Expand Up @@ -177,13 +176,9 @@ func (ht *serverHandlerTransport) do(fn func()) error {
}

func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
ht.mu.Lock()
if ht.streamDone {
ht.mu.Unlock()
return nil
}
ht.streamDone = true
ht.mu.Unlock()
ht.writeStatusMu.Lock()
defer ht.writeStatusMu.Unlock()

err := ht.do(func() {
ht.writeCommonHeaders(s)

Expand Down Expand Up @@ -222,7 +217,11 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
}
})
close(ht.writes)

if err == nil { // transport has not been closed
ht.Close()
close(ht.writes)
}
return err
}

Expand Down
27 changes: 23 additions & 4 deletions transport/handler_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ func TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
}
}

// TestHandlerTransport_HandleStreams_MultiWriteStatus ensures that
// concurrent "WriteStatus"s do not panic writing to closed "writes" channel.
func TestHandlerTransport_HandleStreams_MultiWriteStatus(t *testing.T) {
st := newHandleStreamTest(t)
handleStream := func(s *Stream) {
testHandlerTransportHandleStreams(t, func(st *handleStreamTest, s *Stream) {
if want := "/service/foo.bar"; s.method != want {
t.Errorf("stream method = %q; want %q", s.method, want)
}
Expand All @@ -408,9 +409,27 @@ func TestHandlerTransport_HandleStreams_MultiWriteStatus(t *testing.T) {
}()
}
wg.Wait()
}
})
}

// TestHandlerTransport_HandleStreams_WriteStatusWrite ensures that "Write"
// following "WriteStatus" does not panic writing to closed "writes" channel.
func TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) {
testHandlerTransportHandleStreams(t, func(st *handleStreamTest, s *Stream) {
if want := "/service/foo.bar"; s.method != want {
t.Errorf("stream method = %q; want %q", s.method, want)
}
st.bodyw.Close() // no body

st.ht.WriteStatus(s, status.New(codes.OK, ""))
st.ht.Write(s, []byte("hdr"), []byte("data"), &Options{})
})
}

func testHandlerTransportHandleStreams(t *testing.T, handleStream func(st *handleStreamTest, s *Stream)) {
st := newHandleStreamTest(t)
st.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
func(s *Stream) { go handleStream(st, s) },
func(ctx context.Context, method string) context.Context { return ctx },
)
}
Expand Down

0 comments on commit c6b4608

Please sign in to comment.