Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport: block reading frames when too many transport control frames are queued #2970

Merged
merged 1 commit into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 81 additions & 3 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"runtime"
"sync"
"sync/atomic"

"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
Expand Down Expand Up @@ -84,12 +85,24 @@ func (il *itemList) isEmpty() bool {
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.

// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport. These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50

type cbItem interface {
isTransportResponseFrame() bool
}

// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
streamID uint32
wq *writeQuota
}

func (*registerStream) isTransportResponseFrame() bool { return false }

// headerFrame is also used to register stream on the client-side.
type headerFrame struct {
streamID uint32
Expand All @@ -102,13 +115,19 @@ type headerFrame struct {
onOrphaned func(error) // Valid on client-side
}

func (h *headerFrame) isTransportResponseFrame() bool {
return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
}

type cleanupStream struct {
streamID uint32
rst bool
rstCode http2.ErrCode
onWrite func()
}

func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM

type dataFrame struct {
streamID uint32
endStream bool
Expand All @@ -119,43 +138,63 @@ type dataFrame struct {
onEachWrite func()
}

func (*dataFrame) isTransportResponseFrame() bool { return false }

type incomingWindowUpdate struct {
streamID uint32
increment uint32
}

func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }

type outgoingWindowUpdate struct {
streamID uint32
increment uint32
}

func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
return false // window updates are throttled by thresholds
}

type incomingSettings struct {
ss []http2.Setting
}

func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK

type outgoingSettings struct {
ss []http2.Setting
}

func (*outgoingSettings) isTransportResponseFrame() bool { return false }

type incomingGoAway struct {
}

func (*incomingGoAway) isTransportResponseFrame() bool { return false }

type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn bool
}

func (*goAway) isTransportResponseFrame() bool { return false }

type ping struct {
ack bool
data [8]byte
}

func (*ping) isTransportResponseFrame() bool { return true }

type outFlowControlSizeRequest struct {
resp chan uint32
}

func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }

type outStreamState int

const (
Expand Down Expand Up @@ -238,6 +277,14 @@ type controlBuffer struct {
consumerWaiting bool
list *itemList
err error

// transportResponseFrames counts the number of queued items that represent
// the response of an action initiated by the peer. trfChan is created
// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
// closed and nilled when transportResponseFrames drops below the
// threshold. Both fields are protected by mu.
transportResponseFrames int
trfChan atomic.Value // *chan struct{}
}

func newControlBuffer(done <-chan struct{}) *controlBuffer {
Expand All @@ -248,12 +295,24 @@ func newControlBuffer(done <-chan struct{}) *controlBuffer {
}
}

func (c *controlBuffer) put(it interface{}) error {
// throttle blocks if there are too many incomingSettings/cleanupStreams in the
// controlbuf.
func (c *controlBuffer) throttle() {
ch, _ := c.trfChan.Load().(*chan struct{})
if ch != nil {
select {
case <-*ch:
case <-c.done:
}
}
}

func (c *controlBuffer) put(it cbItem) error {
_, err := c.executeAndPut(nil, it)
return err
}

func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
var wakeUp bool
c.mu.Lock()
if c.err != nil {
Expand All @@ -271,6 +330,15 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{
c.consumerWaiting = false
}
c.list.enqueue(it)
if it.isTransportResponseFrame() {
c.transportResponseFrames++
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are adding the frame that puts us over the threshold; create
// a throttling channel.
ch := make(chan struct{})
c.trfChan.Store(&ch)
}
}
c.mu.Unlock()
if wakeUp {
select {
Expand Down Expand Up @@ -304,7 +372,17 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
return nil, c.err
}
if !c.list.isEmpty() {
h := c.list.dequeue()
h := c.list.dequeue().(cbItem)
if h.isTransportResponseFrame() {
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are removing the frame that put us over the
// threshold; close and clear the throttling channel.
ch := c.trfChan.Load().(*chan struct{})
close(*ch)
c.trfChan.Store((*chan struct{})(nil))
}
c.transportResponseFrames--
}
c.mu.Unlock()
return h, nil
}
Expand Down
1 change: 1 addition & 0 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,7 @@ func (t *http2Client) reader() {

// loop to keep reading incoming messages on this transport.
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
Expand Down
1 change: 1 addition & 0 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone)
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1)
if err != nil {
Expand Down