Loki: Flush chunks one at a time #5894
Conversation
- Calls `store.Put` (which flushes a chunk to the store) for each chunk individually until one fails, instead of calling it once for all chunks
Nice work! This isn't easy code and it's a great surprise to wake up and see a bug fix for this. I've left one suggestion around locking, then LGTM
```go
if ok && compressedSize > 0 {
	chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}
// ...
if err := i.flushChunk(ctx, ch); err != nil {
```
This `flushChunk` call is done with the mutex held, meaning we need to wait for the chunk to finish flushing to remote storage before the stream's `chunkMtx` is unlocked. Instead, we should mimic the original implementation, which:
- locked the mutex
- closed the chunks
- released the lock
- flushed the chunks
- locked again
- updated the chunks' `chunk.flushed = time.Now()`
- unlocked again
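The sequence suggested above can be sketched as follows. This is a minimal illustration, not Loki's actual code: `chunkDesc`, `stream`, and `flushStream` are hypothetical stand-ins, and the point is only that the mutex is released during the slow remote flush and re-acquired briefly to mark each chunk.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// chunkDesc is a hypothetical stand-in for the ingester's chunk descriptor.
type chunkDesc struct {
	closed  bool
	flushed time.Time
}

// stream is a hypothetical stand-in for a stream guarded by chunkMtx.
type stream struct {
	chunkMtx sync.Mutex
	chunks   []*chunkDesc
}

// flushStream follows the suggested sequence: hold the lock only while
// mutating chunk state, not during the (slow) remote-storage flush.
func (s *stream) flushStream(flush func(*chunkDesc) error) {
	// 1. Lock the mutex and close the chunks.
	s.chunkMtx.Lock()
	toFlush := make([]*chunkDesc, 0, len(s.chunks))
	for _, c := range s.chunks {
		c.closed = true
		toFlush = append(toFlush, c)
	}
	// 2. Release the lock before calling out to remote storage.
	s.chunkMtx.Unlock()

	for _, c := range toFlush {
		// 3. Flush without holding the lock.
		if err := flush(c); err != nil {
			continue // failed chunks stay unmarked and can be retried later
		}
		// 4. Re-lock only to mark the chunk as flushed, then unlock again.
		s.chunkMtx.Lock()
		c.flushed = time.Now()
		s.chunkMtx.Unlock()
	}
}

func main() {
	s := &stream{chunks: []*chunkDesc{{}, {}}}
	s.flushStream(func(c *chunkDesc) error { return nil })
	fmt.Println(!s.chunks[0].flushed.IsZero(), !s.chunks[1].flushed.IsZero()) // true true
}
```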
Thanks for catching this. Pushed a commit addressing it, and now I'm checking how it's going.
edit: I believe it is working fine. WDYT?
```go
// If the flush isn't successful, the operation for this userID is requeued, allowing this and all other unflushed
// chunks to have another opportunity to be flushed.
func (i *Ingester) flushChunk(ctx context.Context, ch chunk.Chunk) error {
	if err := i.store.Put(ctx, []chunk.Chunk{ch}); err != nil {
```
Should this call `i.store.PutOne` for clarity?
Hmm, that's a good call, but after digging into it, it looks like a bigger decision has to be made because:
- The ChunkStore interface doesn't have a `PutOne`. I could add one, though `Put` with multiple chunks is used by other parts of the code, so I can't just get rid of it.

Do you think it is worth it if I add a new `PutOne` that basically calls the Store `PutOne`?
I think it's fine calling `Put`; as Dylan said, there isn't a `PutOne` method on the store interface being used here, and it isn't really worth adding one IMO.
Ah, I got confused by the different store interfaces. Definitely not worth adding an interface method if it isn't there already.

I'm also not sure that this is in scope for this PR (let me know if I should open a follow-up PR after this one is merged): we're incrementing the flushed metric. Should we also measure the number of chunks that were not flushed?
- Updates our locking strategy to release the chunk lock more frequently. This releases the lock after closing and marking a chunk as flushed.
I'm pushing a commit addressing this problem.

Regarding measuring the number of chunks not flushed, I think we can derive it from the total number of chunks.
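The derivation mentioned here can be illustrated with a tiny sketch. The counter names below (`chunksCreated`, `chunksFlushed`) are hypothetical stand-ins for the ingester's real metrics, which may be named differently; the point is only that an unflushed count can be computed from existing counters rather than adding a new metric.

```go
package main

import "fmt"

// flushMetrics holds hypothetical counters standing in for the real
// ingester metrics; the actual Loki metric names may differ.
type flushMetrics struct {
	chunksCreated int // total chunks cut by the ingester
	chunksFlushed int // chunks successfully flushed to the store
}

// unflushed derives the number of not-yet-flushed chunks from the two
// existing counters instead of tracking a dedicated metric.
func (m flushMetrics) unflushed() int {
	return m.chunksCreated - m.chunksFlushed
}

func main() {
	m := flushMetrics{chunksCreated: 10, chunksFlushed: 7}
	fmt.Println(m.unflushed()) // 3
}
```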
LGTM!
Oof, rationalizing these seems like a bigger change (probably out of scope here). Maybe revert the change to move the flushed metric?
I think moving the flushed metric still makes sense, to be consistent in only updating the metric if the chunk is actually flushed.
The flush queue length tracks the number of streams enqueued to be flushed. That number can stay flat while the ingester is falling behind because a large number of chunks are going unflushed. Breaking down by tenant would be preferable, since that is how one would apply limits/shed load, I think. Definitely outside the scope of this patch.

Glad this was merged. Going to test the new version soon. Thanks @slim-bean and @DylanGuedes for getting this in so quickly!
This reverts commit b3b9656.
What this PR does / why we need it:
This modifies our ingesters' flushing strategy to flush one chunk at a time instead of flushing them in bulk. This has two benefits/fixes relative to the current implementation:
- Marks each chunk as flushed individually via `cs[i].flushed = time.Now()`. In the previous implementation, a chunk flush could only be marked if all other flushes also succeeded. This makes it more granular, helping chunks not be flushed multiple times.

This PR also refactors related functions into smaller pieces.
Which issue(s) this PR fixes:
N/A
Special notes for your reviewer:
For reference, see #5267 (comment).
Checklist
- Add an entry in `CHANGELOG.md` about the changes.