Skip to content

Commit

Permalink
[libbeat] Add more disk queue unit tests and fix a size-check bug (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed Oct 22, 2020
1 parent f33bfd9 commit 2e7b902
Show file tree
Hide file tree
Showing 3 changed files with 594 additions and 58 deletions.
24 changes: 17 additions & 7 deletions libbeat/publisher/queue/diskqueue/core_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) {
// than an entire segment all by itself (as long as it isn't, it is
// guaranteed to eventually enter the queue assuming no disk errors).
frameSize := request.frame.sizeOnDisk()
if dq.settings.MaxSegmentSize < frameSize {
if dq.settings.maxSegmentOffset() < segmentOffset(frameSize) {
dq.logger.Warnf(
"Rejecting event with size %v because the maximum segment size is %v",
frameSize, dq.settings.MaxSegmentSize)
"Rejecting event with size %v because the segment buffer limit is %v",
frameSize, dq.settings.maxSegmentOffset())
request.responseChan <- false
return
}
Expand Down Expand Up @@ -326,13 +326,19 @@ func (dq *diskQueue) maybeWritePending() {
// Nothing to do right now
return
}

// Remove everything from pendingFrames and forward it to the writer loop.
frames := dq.pendingFrames
dq.pendingFrames = nil
dq.writerLoop.requestChan <- writerLoopRequest{frames: frames}

dq.writerLoop.requestChan <- writerLoopRequest{
frames: frames,
// Compute the size of the request so we know how full the queue is going
// to be.
totalSize := uint64(0)
for _, sf := range frames {
totalSize += sf.frame.sizeOnDisk()
}
dq.writeRequestSize = totalSize
dq.writing = true
}

Expand Down Expand Up @@ -471,8 +477,12 @@ func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool {
// left in the queue after accounting for the existing segments and the
// pending writes that were already accepted.
pendingBytes := uint64(0)
for _, request := range dq.pendingFrames {
pendingBytes += request.frame.sizeOnDisk()
for _, sf := range dq.pendingFrames {
pendingBytes += sf.frame.sizeOnDisk()
}
// If a writing request is outstanding, include it in the size total.
if dq.writing {
pendingBytes += dq.writeRequestSize
}
currentSize := pendingBytes + dq.segments.sizeOnDisk()

Expand Down
Loading

0 comments on commit 2e7b902

Please sign in to comment.