Add specialized buffers to memqueue #5148

Merged: 3 commits merged on Sep 12, 2017

@urso urso commented Sep 11, 2017

Use specialized buffer types and eventloops depending on the memqueue
configuration.

If flushing is disabled, a region based ring buffer is used, as loads of
small batches can be generated and we want to minimize additional
allocations of small objects.

If flushing is enabled a list of active and flushed buffers is managed
by the queue. If a buffer is flushed by timeout, but not yet processed
(and not full), additional events being published are added to the
already flushed buffer.
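
The flush-enabled behavior described above can be sketched roughly as follows. This is a minimal illustration under assumptions, not the code from this PR: the `batch` and `flushQueue` types, their methods, and the use of `string` events are hypothetical names chosen for the example. It shows a partially filled batch that is flushed by a timeout still accepting events until a consumer picks it up or it becomes full.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for a flushable buffer of events.
type batch struct {
	events  []string
	flushed bool // true once the batch has been put on the flush list
}

// flushQueue tracks the batch currently being filled plus the batches
// that are flushed and waiting for a consumer.
type flushQueue struct {
	maxEvents int
	active    *batch
	flushList []*batch
}

func newFlushQueue(maxEvents int) *flushQueue {
	return &flushQueue{maxEvents: maxEvents, active: &batch{}}
}

// flush makes the active batch available to consumers (for example on a
// timeout). Empty batches are never added to the flush list.
func (q *flushQueue) flush() {
	b := q.active
	if len(b.events) == 0 || b.flushed {
		return
	}
	b.flushed = true
	q.flushList = append(q.flushList, b)
}

// publish appends to the active batch, even if that batch was already
// flushed by a timeout. A full batch is flushed and replaced.
func (q *flushQueue) publish(ev string) {
	b := q.active
	b.events = append(b.events, ev)
	if len(b.events) >= q.maxEvents {
		q.flush()
		q.active = &batch{}
	}
}

// next hands the oldest flushed batch to a consumer, or nil if none is ready.
func (q *flushQueue) next() []string {
	if len(q.flushList) == 0 {
		return nil
	}
	b := q.flushList[0]
	q.flushList = q.flushList[1:]
	if b == q.active {
		// The consumer took the batch that was still being filled.
		q.active = &batch{}
	}
	return b.events
}

func main() {
	q := newFlushQueue(3)
	q.publish("a")
	q.flush()      // simulated flush timeout
	q.publish("b") // lands in the already flushed batch
	fmt.Println(q.next())
}
```

In this sketch the timeout itself is simulated by calling `flush` directly; a real event loop would presumably drive it from a timer.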

This PR fixes a bug in beta2 where the flush timeout did not work properly. Having distinct state machines and buffer types for flush-based and non-flush-based queue configurations helps solve the issue, because each state machine is simpler and specific to its use case.

  • buffer handling is moved from 'broker' to actual eventloop implementation
  • 'broker' only provides common types/channels/interfaces
  • renamed brokerBuffer to ringBuffer
  • introduce batchBuffer used to hold flushable batches of events
  • introduce separate event loop types

urso added 3 commits September 11, 2017 16:33
Move the buffer and message handling from memqueue broker type
to the actual go routines handling the messaging.
@urso urso added the bug, libbeat, needs_backport, and review labels Sep 11, 2017
@urso urso mentioned this pull request Sep 11, 2017
Member

@ruflin ruflin left a comment

Quite challenging to follow the full change, as I think the tricky parts are in the nitty-gritty details. As the tests barely changed, it seems things are working as before.

Would be nice if we could get an additional pair of eyes on this PR.

minEvents int
idleTimeout time.Duration
bufSize int
// buf brokerBuffer
Member

Leftover?

Author

yep

 count := len(buf)
 if count == 0 {
     panic("empty batch returned")
 }

 // log.Debug("newACKChan: ", b.ackSeq, count)
-ackCH := newACKChan(l.ackSeq, start, count)
+ackCH := newACKChan(l.ackSeq, start, count, l.buf.buf.clients)
Member

Naming smells a bit here ;-)

Author

it's due to the old code. Some more cleanups will come at some point.


// Internal event ring buffer.
// The ring is split into 2 regions.
// Region A contains active events to be send to consumers, while region B can
Member

s/send/sent/

}

func (b *ringBuffer) insert(event publisher.Event, client clientState) (bool, int) {
// log := b.buf.logger
Member

Leftover?


// region B does not exist yet, check if region A is available for use
idx := b.regA.index + b.regA.size
// log.Debug(" - index: ", idx)
Member

Quite a few logging leftovers in the code here and below, I think.

urso commented Sep 12, 2017

The debug logging 'leftovers' are on purpose. It's some 'ExtraDebug' :)
The buffer handling is quite tricky, and these statements are super helpful in case I need to debug. They are just commented out by default, as they create too much noise when debug logging is enabled in beats.

type ringBuffer struct {
buf eventBuffer

regA, regB region
Contributor

Just for the record: I think regA and regB could be renamed to regConsumer and regProducer, so the code can make more sense without the comments. But as it is just moved from a different file, I can live with it. It is already an improvement over the previous state.

Author

tl;dr: The only purpose of regB is to handle filling the ring buffer when the indices in the ring buffer 'wrap around'.

regA is regConsumer and regProducer as long as there is space left in regA :)
regB only comes into existence if the ring buffer wraps around (no more space in regA).
Once regB exists, regA is indeed consumer only and regB is producer only.
After wrapping around regB becomes regA, once regA becomes empty. From then on regA is again shared by producers and consumers until another 'wrap around' on the ring buffer.
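
The region hand-off described in this comment can be sketched in a few lines of Go. This is a minimal illustration under assumptions, not the code from this PR: the `ring` type, its `insert` and `pop` methods, and the `int` payloads are hypothetical; only the names `regA`, `regB`, and `region` (with `index` and `size`) are taken from the snippets quoted in this review.

```go
package main

import "fmt"

// region describes a contiguous span of occupied slots in the ring.
type region struct {
	index int // first slot of the region
	size  int // number of occupied slots
}

// ring is a simplified, hypothetical two-region ring buffer.
type ring struct {
	slots      []int
	regA, regB region
}

func newRing(n int) *ring { return &ring{slots: make([]int, n)} }

// insert stores v and reports whether there was room for it.
func (r *ring) insert(v int) bool {
	if r.regB.size > 0 {
		// Wrapped around: producers fill regB, which starts at slot 0 and
		// may only grow up to the first slot still used by regA.
		if r.regB.size >= r.regA.index {
			return false
		}
		r.slots[r.regB.size] = v
		r.regB.size++
		return true
	}
	// Not wrapped: regA is shared by producers and consumers.
	if idx := r.regA.index + r.regA.size; idx < len(r.slots) {
		r.slots[idx] = v
		r.regA.size++
		return true
	}
	// No space left behind regA: create regB at slot 0, if consumers
	// have already freed slots at the start of the ring.
	if r.regA.index == 0 {
		return false
	}
	r.slots[0] = v
	r.regB = region{index: 0, size: 1}
	return true
}

// pop removes the oldest value; consumers only ever read from regA.
func (r *ring) pop() (int, bool) {
	if r.regA.size == 0 {
		return 0, false
	}
	v := r.slots[r.regA.index]
	r.regA.index++
	r.regA.size--
	if r.regA.size == 0 {
		// regA is empty: regB (possibly empty) becomes the new regA.
		r.regA, r.regB = r.regB, region{}
	}
	return v, true
}

func main() {
	r := newRing(4)
	for i := 1; i <= 4; i++ {
		r.insert(i)
	}
	r.pop()     // frees slot 0
	r.insert(5) // wraps around and creates regB
	fmt.Println(r.regA, r.regB)
}
```

The sketch hands out one event at a time; the ring buffer discussed here works on batches, so the index arithmetic in the real code is more involved.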

Contributor

Yes, I have seen that. I was just trying to come up with names which could help the understanding more. But ofc, the proposed names were not descriptive either. :(

@exekias exekias merged commit ffd23a9 into elastic:master Sep 12, 2017
urso pushed a commit to urso/beats that referenced this pull request Sep 12, 2017
* memqueue cleanup

Move the buffer and message handling from memqueue broker type
to the actual go routines handling the messaging.

* Add specialized buffers to memqueue

Use specialized buffer types and eventloops depending on the memqueue
configuration.

If flushing is disabled, a region based ring buffer is used, as loads of
small batches can be generated and we want to minimize additional
allocations of small objects.

If flushing is enabled a list of active and flushed buffers is managed
by the queue. If a buffer is flushed by timeout, but not yet processed
(and not full), additional events being published are added to the
already flushed buffer.

* ensure flush list does not contain empty buffers

(cherry picked from commit ffd23a9)
@urso urso removed the needs_backport label Sep 12, 2017
andrewkroh pushed a commit that referenced this pull request Sep 12, 2017
@urso urso deleted the memqueue/special-buffers branch February 19, 2019 18:41
leweafan pushed a commit to leweafan/beats that referenced this pull request Apr 28, 2023
(cherry picked from commit 7968572)
4 participants