Skip to content

Commit

Permalink
Add specialized buffers to memqueue (#5148)
Browse files Browse the repository at this point in the history
* memqueue cleanup

Move the buffer and message handling from memqueue broker type
to the actual go routines handling the messaging.

* Add specialized buffers to memqueue

Use specialized buffer types and eventloops depending on the memqueue
configuration.

If flushing is disabled, a region based ring buffer is used, as loads of
small batches can be generated and we want to minimize additional
allocations of small objects.

If flushing is enabled a list of active and flushed buffers is managed
by the queue. If a buffer is flushed by timeout, but not yet processed
(and not full), additional events being published are added to the
already flushed buffer.

* ensure flush list does not contain empty buffers
  • Loading branch information
Steffen Siering authored and exekias committed Sep 12, 2017
1 parent 6082603 commit ffd23a9
Show file tree
Hide file tree
Showing 9 changed files with 909 additions and 505 deletions.
61 changes: 41 additions & 20 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@ package memqueue
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
broker *Broker
sig chan batchAckRequest
sig chan batchAckMsg
lst chanList

totalACK uint64
totalSched uint64

batchesSched uint64
batchesACKed uint64

processACK func(chanList, int)
}

func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop {
l := &ackLoop{broker: b}
l.processACK = processACK
return l
}

func (l *ackLoop) run() {
Expand Down Expand Up @@ -72,38 +80,51 @@ func (l *ackLoop) run() {
// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {
lst := l.collectAcked()

count := 0
for current := lst.front(); current != nil; current = current.next {
count += current.count
}

if e := l.broker.eventer; e != nil {
e.OnACK(count)
}

// report acks to waiting clients
l.processACK(lst, count)

for !lst.empty() {
releaseACKChan(lst.pop())
}

// return final ACK to EventLoop, in order to clean up internal buffer
l.broker.logger.Debug("ackloop: return ack to broker loop:", count)

l.totalACK += uint64(count)
l.broker.logger.Debug("ackloop: done send ack")
return count
}

func (l *ackLoop) collectAcked() chanList {
lst := chanList{}

acks := l.lst.pop()
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
start := acks.start
count := acks.count
l.batchesACKed++
releaseACKChan(acks)
lst.append(acks)

done := false
// collect pending ACKs
for !l.lst.empty() && !done {
acks := l.lst.front()
select {
case <-acks.ch:
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)

count += acks.count
l.batchesACKed++
releaseACKChan(l.lst.pop())
lst.append(l.lst.pop())

default:
done = true
}
}

// report acks to waiting clients
states := l.broker.buf.buf.clients
l.broker.reportACK(states, start, count)

// return final ACK to EventLoop, in order to clean up internal buffer
l.broker.logger.Debug("ackloop: return ack to broker loop:", count)

l.totalACK += uint64(count)
l.broker.logger.Debug("ackloop: done send ack")
return count
return lst
}
65 changes: 65 additions & 0 deletions libbeat/publisher/queue/memqueue/batchbuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package memqueue

import "github.com/elastic/beats/libbeat/publisher"

type batchBuffer struct {
next *batchBuffer
flushed bool
events []publisher.Event
clients []clientState
}

func newBatchBuffer(sz int) *batchBuffer {
b := &batchBuffer{}
b.init(sz)
return b
}

func (b *batchBuffer) init(sz int) {
b.events = make([]publisher.Event, 0, sz)
b.clients = make([]clientState, 0, sz)
}

func (b *batchBuffer) initWith(sz int, old batchBuffer) {
events, clients := old.events, old.clients
L := len(events)

b.events = make([]publisher.Event, L, sz)
b.clients = make([]clientState, L, sz)

copy(b.events, events)
copy(b.clients, clients)
}

func (b *batchBuffer) add(event publisher.Event, st clientState) {
b.events = append(b.events, event)
b.clients = append(b.clients, st)
}

func (b *batchBuffer) length() int {
return len(b.events)
}

func (b *batchBuffer) capacity() int {
return cap(b.events)
}

func (b *batchBuffer) cancel(st *produceState) int {
events := b.events[:0]
clients := b.clients[:0]

removed := 0
for i := range b.clients {
if b.clients[i].state == st {
removed++
continue
}

events = append(events, b.events[i])
clients = append(clients, b.clients[i])
}

b.events = events
b.clients = clients
return removed
}
165 changes: 33 additions & 132 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package memqueue

import (
"fmt"
"math"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/queue"
)

Expand All @@ -16,9 +13,10 @@ type Broker struct {

logger logger

buf brokerBuffer
minEvents int
idleTimeout time.Duration
bufSize int
// buf brokerBuffer
// minEvents int
// idleTimeout time.Duration

// api channels
events chan pushRequest
Expand Down Expand Up @@ -46,9 +44,10 @@ type Settings struct {

type ackChan struct {
next *ackChan
ch chan batchAckRequest
ch chan batchAckMsg
seq uint
start, count int // number of events waiting for ACK
states []clientState
}

type chanList struct {
Expand Down Expand Up @@ -119,12 +118,20 @@ func NewBroker(

eventer: settings.Eventer,
}
b.buf.init(logger, sz)
b.minEvents = minEvents
b.idleTimeout = flushTimeout

eventLoop := newEventLoop(b)
ack := &ackLoop{broker: b}
var eventLoop interface {
run()
processACK(chanList, int)
}

if minEvents > 1 {
eventLoop = newBufferingEventLoop(b, sz, minEvents, flushTimeout)
} else {
eventLoop = newDirectEventLoop(b, sz)
}

b.bufSize = sz
ack := newACKLoop(b, eventLoop.processACK)

b.wg.Add(2)
go func() {
Expand All @@ -149,7 +156,7 @@ func (b *Broker) Close() error {

func (b *Broker) BufferConfig() queue.BufferConfig {
return queue.BufferConfig{
Events: b.buf.Size(),
Events: b.bufSize,
}
}

Expand All @@ -161,136 +168,21 @@ func (b *Broker) Consumer() queue.Consumer {
return newConsumer(b)
}

func (b *Broker) insert(req *pushRequest) (int, bool) {
var avail int
if req.state == nil {
_, avail = b.buf.insert(req.event, clientState{})
} else {
st := req.state
if st.cancelled {
b.logger.Debugf("cancelled producer - ignore event: %v\t%v\t%p", req.event, req.seq, req.state)

// do not add waiting events if producer did send cancel signal

if cb := st.dropCB; cb != nil {
cb(req.event.Content)
}

return -1, false
}

_, avail = b.buf.insert(req.event, clientState{
seq: req.seq,
state: st,
})
}

return avail, true
}

func (b *Broker) get(max int) (startIndex int, events []publisher.Event) {
return b.buf.reserve(max)
}

func (b *Broker) cancel(st *produceState) int {
return b.buf.cancel(st)
}

func (b *Broker) full() bool {
return b.buf.Full()
}

func (b *Broker) avail() int {
return b.buf.Avail()
}

func (b *Broker) totalAvail() int {
return b.buf.TotalAvail()
}

func (b *Broker) cleanACKs(count int) {
b.buf.ack(count)
}

func (b *Broker) reportACK(states []clientState, start, N int) {
{
start := time.Now()
b.logger.Debug("handle ACKs: ", N)
defer func() {
b.logger.Debug("handle ACK took: ", time.Since(start))
}()
}

if e := b.eventer; e != nil {
e.OnACK(N)
}

// TODO: global boolean to check if clients will need an ACK
// no need to report ACKs if no client is interested in ACKs

idx := start + N - 1
if idx >= len(states) {
idx -= len(states)
}

total := 0
for i := N - 1; i >= 0; i-- {
if idx < 0 {
idx = len(states) - 1
}

st := &states[idx]
b.logger.Debugf("try ack index: (idx=%v, i=%v, seq=%v)\n", idx, i, st.seq)

idx--
if st.state == nil {
b.logger.Debug("no state set")
continue
}

count := (st.seq - st.state.lastACK)
if count == 0 || count > math.MaxUint32/2 {
// seq number comparison did underflow. This happens only if st.seq has
// allready been acknowledged
// b.logger.Debug("seq number already acked: ", st.seq)

st.state = nil
continue
}

b.logger.Debugf("broker ACK events: count=%v, start-seq=%v, end-seq=%v\n",
count,
st.state.lastACK+1,
st.seq,
)

total += int(count)
if total > N {
panic(fmt.Sprintf("Too many events acked (expected=%v, total=%v)",
count, total,
))
}

st.state.cb(int(count))
st.state.lastACK = st.seq
st.state = nil
}
}

var ackChanPool = sync.Pool{
New: func() interface{} {
return &ackChan{
ch: make(chan batchAckRequest, 1),
ch: make(chan batchAckMsg, 1),
}
},
}

func newACKChan(seq uint, start, count int) *ackChan {
func newACKChan(seq uint, start, count int, states []clientState) *ackChan {
ch := ackChanPool.Get().(*ackChan)
ch.next = nil
ch.seq = seq
ch.start = start
ch.count = count
ch.states = states
return ch
}

Expand Down Expand Up @@ -342,7 +234,7 @@ func (l *chanList) front() *ackChan {
return l.head
}

func (l *chanList) channel() chan batchAckRequest {
func (l *chanList) channel() chan batchAckMsg {
if l.head == nil {
return nil
}
Expand All @@ -361,3 +253,12 @@ func (l *chanList) pop() *ackChan {
ch.next = nil
return ch
}

func (l *chanList) reverse() {
tmp := *l
*l = chanList{}

for !tmp.empty() {
l.prepend(tmp.pop())
}
}
Loading

0 comments on commit ffd23a9

Please sign in to comment.