
Loki: Flush chunks one at a time #5894

Merged: 4 commits into grafana:main on Apr 13, 2022

Conversation

@DylanGuedes (Contributor) commented on Apr 12, 2022:

What this PR does / why we need it:
This modifies our ingesters' flushing strategy to flush one chunk at a time instead of flushing them in bulk. This has two benefits/fixes compared to the current implementation:

  1. Imagine we have 250 chunks to flush, but chunk 125 fails to flush. In the previous implementation we would first encode all 250 chunks; when flushing chunk 125 failed, the time spent encoding the last 125 chunks would be wasted.
  2. Flushes are marked with `cs[i].flushed = time.Now()`. In the previous implementation, a chunk could only be marked as flushed if all the other flushes also succeeded. This change makes the marking more granular, helping avoid flushing the same chunk multiple times.

This PR also refactors the related functions into smaller pieces; a rough sketch of the per-chunk idea is included below.
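
For illustration only, here is a minimal, self-contained sketch of that per-chunk idea. The names (`chunkDesc`, `flushOne`, `flushChunksOneByOne`) are invented stand-ins for this example, not the actual Loki functions: each chunk is flushed on its own and marked as flushed as soon as its own write succeeds, so a failure at chunk 125 wastes no work on chunks 126-250 and does not unmark chunks 1-124.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// chunkDesc is a minimal stand-in for an ingester-owned chunk:
// an identifier plus a "flushed" timestamp.
type chunkDesc struct {
	id      int
	flushed time.Time
}

// flushOne stands in for encoding a single chunk and writing it to the store.
func flushOne(ctx context.Context, c *chunkDesc) error {
	if c.id == 125 { // simulate the failing chunk from the PR description
		return errors.New("store write failed")
	}
	return nil
}

// flushChunksOneByOne flushes chunks individually: chunks flushed before the
// failure keep their flushed timestamp, and nothing after the failure is
// encoded or written.
func flushChunksOneByOne(ctx context.Context, cs []*chunkDesc) error {
	for _, c := range cs {
		if c.flushed.IsZero() { // skip chunks already marked as flushed
			if err := flushOne(ctx, c); err != nil {
				return fmt.Errorf("stopping at chunk %d: %w", c.id, err)
			}
			c.flushed = time.Now()
		}
	}
	return nil
}

func main() {
	cs := make([]*chunkDesc, 0, 250)
	for i := 1; i <= 250; i++ {
		cs = append(cs, &chunkDesc{id: i})
	}
	fmt.Println(flushChunksOneByOne(context.Background(), cs)) // stops at chunk 125
	fmt.Println("chunk 124 flushed:", !cs[123].flushed.IsZero()) // true
	fmt.Println("chunk 126 flushed:", !cs[125].flushed.IsZero()) // false
}
```

When the operation is requeued, already-flushed chunks are skipped because their flushed timestamp is already set, which is the more granular behavior described in point 2 above.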

Which issue(s) this PR fixes:
N/A

Special notes for your reviewer:
For reference, see #5267 (comment).

Checklist

  • Documentation added
  • Tests updated
  • Add an entry in the CHANGELOG.md about the changes.

- Calls `store.Put` (which flushes a chunk to the store) for each chunk
  individually, stopping at the first failure, instead of calling it
  once for all chunks.
@DylanGuedes changed the title from "Loki: Flush Ingester chunks one at a time" to "Loki: Flush chunks one at a time" on Apr 12, 2022
@owen-d (Member) left a comment:

Nice work! This isn't easy code and it's a great surprise to wake up and see a bug fix for this. I've left one suggestion around locking, then LGTM


if ok && compressedSize > 0 {
chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
if err := i.flushChunk(ctx, ch); err != nil {
Member (review comment on the diff above):

This flushChunk call is made with the mutex held, meaning we have to wait for the chunk to finish flushing to remote storage before the stream's chunkMtx is unlocked. Instead, we should mimic the original implementation, which (see the sketch after this list):

  • locked the mutex
  • closed the chunks
  • released the lock
  • flushed the chunks
  • locked the mutex again
  • updated each chunk's chunk.flushed = time.Now()
  • unlocked again.
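
For reference, a rough, self-contained sketch of that locking order might look like the following. The `stream`, `chunkDesc`, and `put` names are simplified stand-ins, not the actual Loki types:

```go
package main

import (
	"context"
	"sync"
	"time"
)

// Simplified stand-ins for a stream and its chunks.
type chunkDesc struct {
	closed  bool
	flushed time.Time
}

type stream struct {
	chunkMtx sync.Mutex
	chunks   []*chunkDesc
}

// flushStream follows the order listed above: lock to close the chunks,
// release the lock for the (slow) remote write, then re-lock only long
// enough to record the flush timestamps.
func flushStream(ctx context.Context, s *stream, put func(context.Context, []*chunkDesc) error) error {
	// 1. Lock the mutex and close the chunks that will be flushed.
	s.chunkMtx.Lock()
	toFlush := make([]*chunkDesc, 0, len(s.chunks))
	for _, c := range s.chunks {
		if c.flushed.IsZero() {
			c.closed = true
			toFlush = append(toFlush, c)
		}
	}
	s.chunkMtx.Unlock()

	// 2. Flush to remote storage WITHOUT holding the mutex, so writes to the
	//    stream are not blocked behind the store call.
	if err := put(ctx, toFlush); err != nil {
		return err
	}

	// 3. Lock again, mark the chunks as flushed, and unlock.
	s.chunkMtx.Lock()
	defer s.chunkMtx.Unlock()
	now := time.Now()
	for _, c := range toFlush {
		c.flushed = now
	}
	return nil
}

func main() {
	s := &stream{chunks: []*chunkDesc{{}, {}}}
	_ = flushStream(context.Background(), s, func(context.Context, []*chunkDesc) error { return nil })
}
```

Holding the lock only around the in-memory bookkeeping keeps appends to the stream from stalling on remote-storage latency.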

@DylanGuedes (Contributor, Author) replied on Apr 13, 2022:

Thanks for catching this. I pushed a commit addressing it and am now checking how it's going.
edit: I believe it is working fine. WDYT?

// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
// chunk to have another opportunity to be flushed.
func (i *Ingester) flushChunk(ctx context.Context, ch chunk.Chunk) error {
	if err := i.store.Put(ctx, []chunk.Chunk{ch}); err != nil {
Contributor (review comment on the diff above):

Should this call `i.store.PutOne` for clarity?

@DylanGuedes (Contributor, Author) replied:

Hmm, that's a good call, but after digging into it, it looks like a bigger decision has to be made, because:

  • the ChunkStore interface doesn't have a PutOne. I could add one, though.
  • Put with multiple chunks is used by other parts of the code, so I can't just get rid of it.

Do you think it is worth it if I add a new PutOne that basically calls the store's PutOne?

Collaborator:

I think it's fine calling Put. As Dylan said, there isn't a PutOne method on the store interface being used here, and it isn't really worth adding one IMO.

Contributor:

Ah I got confused with the different store interfaces. Definitely not worth adding an interface method if it isn't there already.

pkg/ingester/flush.go — review thread (outdated, resolved)
@afayngelerindbx (Contributor) commented:

I'm also not sure that this is in scope for this PR (let me know if I should open a PR to do this after this one is merged):

We're incrementing chunksFlushedPerReason in collectChunksToFlush. This means that even if a particular chunk isn't flushed, or even encoded, we count it in that metric. This is very misleading when debugging flush throughput, e.g. the metric says that flushes are happening when, in fact, they are backlogged. It would be nice to move that reporting into reportFlushedChunkStatistics (see the sketch below).

Should we also measure the number of chunks that were not flushed?
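
For illustration, a minimal Prometheus counter sketch of the distinction being drawn here. The metric name and both function names are invented stand-ins for chunksFlushedPerReason, collectChunksToFlush, and reportFlushedChunkStatistics: incrementing at collection time counts chunks that may never reach the store, while incrementing in the post-flush reporting step counts only chunks that were actually written.

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// Stand-in for a per-reason flush counter such as chunksFlushedPerReason.
var chunksFlushed = prometheus.NewCounterVec(prometheus.CounterOpts{
	Name: "example_ingester_chunks_flushed_total",
	Help: "Chunks flushed, by reason.",
}, []string{"reason"})

// Misleading placement: counting while collecting chunks to flush reports a
// "flush" even if the chunk is never encoded or the store write later fails.
func collectCandidates(reason string, n int) {
	chunksFlushed.WithLabelValues(reason).Add(float64(n))
}

// Placement suggested above: count only after the store write succeeded,
// i.e. inside the post-flush statistics reporting.
func reportFlushed(reason string, n int) {
	chunksFlushed.WithLabelValues(reason).Add(float64(n))
}

func main() {
	prometheus.MustRegister(chunksFlushed)
	reportFlushed("idle", 3) // only successfully flushed chunks are counted here
}
```

With the counting done only after a successful write, a backlog shows up as the counter flattening rather than continuing to climb.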

- Updates our locking strategy to release the chunk lock more
  frequently: the lock is released after closing the chunks and again
  after marking them as flushed.
@DylanGuedes (Contributor, Author) commented:

> I'm also not sure that this is in scope for this PR (let me know if I should open a PR to do this after this one is merged):
>
> We're incrementing chunksFlushedPerReason in collectChunksToFlush. This means that even if a particular chunk isn't flushed, or even encoded, we count it in that metric. This is very misleading when debugging flush throughput, e.g. the metric says that flushes are happening when, in fact, they are backlogged. It would be nice to move that reporting into reportFlushedChunkStatistics.
>
> Should we also measure the number of chunks that were not flushed?

I'm pushing a commit addressing this problem with chunksFlushedPerReason, let me know what you think.

Regarding measuring the number of chunks not flushed, I think we can derive it from the total number of chunks (I think it is ingester_chunks_stored_total) minus the number of flushed chunks.

@DylanGuedes marked this pull request as ready for review on April 13, 2022 15:55
@DylanGuedes requested a review from a team as a code owner on April 13, 2022 15:55
@slim-bean (Collaborator) left a comment:

LGTM!

@slim-bean merged commit b3b9656 into grafana:main on Apr 13, 2022
@afayngelerindbx (Contributor) commented:

> Regarding measuring the number of chunks not flushed, I think we can derive it from the total number of chunks (I think it is ingester_chunks_stored_total) minus the number of flushed chunks.

ingester_chunks_stored_total is broken down by tenant, while ingester_chunks_flushed_total is broken down by reason, so you can't really get a per-tenant count of "unflushed" chunks. Also, with this change as it stands, the total aggregated value of these counters should be the same, since both are incremented once in reportFlushedChunkStatistics.

Oof, rationalizing these seems like a bigger change (probably out of scope here). Maybe revert the change that moved the flushed metric. I'm sorry I've caused confusion.

@slim-bean (Collaborator) commented:

I think moving the flushed metric still makes sense, to be consistent about only updating the metric if the chunk is actually flushed.

There is also cortex_ingester_flush_queue_length for tracking the queues to see if they get backed up. It's not per tenant, but I'm not sure you really need per-tenant stats on flushing, as all the chunks are going to the same place?

@afayngelerindbx (Contributor) commented:

> There is also cortex_ingester_flush_queue_length for tracking the queues to see if they get backed up. It's not per tenant, but I'm not sure you really need per-tenant stats on flushing, as all the chunks are going to the same place?

The flush queue length tracks the number of streams enqueued to be flushed. That number can stay flat while the ingester is falling behind because a large number of chunks is going unflushed. Breaking it down by tenant would be preferable, since that is how one would apply limits / shed load, I think.

Definitely outside the scope of this patch. Glad this was merged. Going to test the new version soon. Thanks @slim-bean and @DylanGuedes for getting this in so quickly!
