feat: improve compaction job state management #3519

Merged
13 commits merged from feat/v2/compaction-planning into main on Aug 28, 2024

Conversation

@aleks-p (Contributor) commented on Aug 26, 2024

The main change here is how we update the state of compaction jobs, in particular when workers poll with state updates and ask for new jobs.

The current implementation intermixes persistence (boltdb) and in-memory state updates. This creates a few cases where an error can leave the two storage layers in an inconsistent state.

The new implementation does everything in memory first and constructs a list of items that need to be durably stored. If we fail to durably store something, or otherwise end up in an unexpected state while persisting, the application will panic.
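
A rough sketch of that flow, for illustration only; apart from `pollStateUpdate` and `writeToDb`, which appear in the snippets quoted below, the helper and type names are hypothetical:

```go
// Sketch only: apply all changes to the in-memory state while collecting
// everything that has to be stored durably, then persist that set in one
// boltdb transaction. If persistence fails after the in-memory state has
// already changed, the two layers can no longer be reconciled, so we panic.
func (m *metastoreState) applyStatusUpdates(updates []*compactionJobStatusUpdate) {
	stateUpdate := &pollStateUpdate{} // jobs and blocks that must be written to boltdb
	for _, u := range updates {
		m.applyInMemory(u, stateUpdate) // hypothetical helper: in-memory changes only, no I/O
	}
	if err := m.writeToDb(stateUpdate); err != nil {
		panic(err)
	}
}
```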

Bonus:

  • handle failed compaction jobs (with max retries)
  • prioritize jobs by compaction level before lease expiry (see the sketch after this list)
  • unit tests for compaction job creation and state management
  • convert some flags to config variables
  • rename "job pre queue" to "job block queue"
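
As an illustration of the prioritization item above, a minimal sketch of how the queue ordering could be expressed; the type and field names here are hypothetical, not the actual queue implementation:

```go
// Hypothetical ordering for the in-memory job queue: jobs at lower compaction
// levels are handed out first; within the same level, the job whose lease
// expires soonest goes first.
type jobEntry struct {
	name            string
	compactionLevel uint32
	leaseExpiresAt  int64 // unix nanos
}

func less(a, b *jobEntry) bool {
	if a.compactionLevel != b.compactionLevel {
		return a.compactionLevel < b.compactionLevel
	}
	return a.leaseExpiresAt < b.leaseExpiresAt
}
```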

@aleks-p requested review from a team as code owners on August 26, 2024 11:40
```go
case compactorv1.CompactionStatus_COMPACTION_STATUS_IN_PROGRESS:
	// refresh the job's lease and record it as updated so it gets persisted
	m.compactionJobQueue.update(statusUpdate.JobName, raftAppendedAtNanos, statusUpdate.RaftLogIndex)
	stateUpdate.updatedJobs = append(stateUpdate.updatedJobs, job.Name)
case compactorv1.CompactionStatus_COMPACTION_STATUS_FAILURE:
```
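
For context, a sketch of what the failure branch could do given the max-retries behavior described in the PR description; the field and method names (Failures, JobMaxFailures, evict, release, deletedJobs) are illustrative, not the actual code:

```go
case compactorv1.CompactionStatus_COMPACTION_STATUS_FAILURE:
	// Hypothetical handling: count the failure and put the job back in the
	// queue until the retry limit is hit, then delete it so failed jobs
	// do not accumulate.
	job.Failures++
	if job.Failures >= m.compactionConfig.JobMaxFailures {
		m.compactionJobQueue.evict(job.Name)
		stateUpdate.deletedJobs = append(stateUpdate.deletedJobs, job.Name)
	} else {
		m.compactionJobQueue.release(job.Name)
		stateUpdate.updatedJobs = append(stateUpdate.updatedJobs, job.Name)
	}
```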
@aleks-p (Contributor, author) commented on Aug 26, 2024:
This is still missing a failure reason from the worker; I am considering adding it so that we can persist it and surface it in tooling. However, after reaching the max number of failures we currently delete the job to avoid accumulating jobs, so the failure reason would only be observable temporarily.

We could implement a job retention policy for this (we might need one anyway), but I would do that separately.


```go
func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
	return m.db.boltdb.Update(func(tx *bbolt.Tx) error {
		for shard, blocks := range sTable.newBlocks {
			// ...
```
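
For completeness, a sketch of what the body of that loop might look like; the bucket naming, key choice, and value encoding are assumptions rather than the actual storage layout (uses `fmt` and `google.golang.org/protobuf/proto`, and assumes the block metadata is a protobuf message):

```go
// Hypothetical continuation of the loop above: store each new block's
// metadata under a per-shard bucket, keyed by block ID.
bucket, err := tx.CreateBucketIfNotExists([]byte(fmt.Sprintf("blocks/%d", shard)))
if err != nil {
	return err
}
for _, block := range blocks {
	value, err := proto.Marshal(block)
	if err != nil {
		return err
	}
	if err := bucket.Put([]byte(block.Id), value); err != nil {
		return err
	}
}
```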
A collaborator commented:
We need to ensure atomicity of operations within a shard. Consider a case:

  • We remove source blocks.
  • We handle query that targets the blocks we just have removed.
  • We add compacted blocks.

It's not very apparent to me how we're handling this; I suspect the query would return incomplete results.

@aleks-p (Contributor, author) replied:
Good point!

Currently, from what I can see, the read path reads data directly from memory. 9442699 locks the shards mutex while replacing source blocks with compacted ones, so things should be a bit better. Ideally we would lock only a single shard, but that makes the code more complex, and swapping the blocks should be quite fast anyway.
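
For reference, the swap described in that commit could look roughly like this; a sketch only, with placeholder field and type names (shardsMutex, shards, blocks, blockMeta) rather than the actual metastore structures:

```go
// Illustrative sketch: remove the source blocks and add the compacted blocks
// while holding the shard map lock, so a concurrent query never observes the
// sources gone but the compacted blocks not yet added.
func (m *metastoreState) replaceBlocks(shard uint32, sourceIDs []string, compacted []*blockMeta) {
	m.shardsMutex.Lock()
	defer m.shardsMutex.Unlock()

	s := m.shards[shard]
	for _, id := range sourceIDs {
		delete(s.blocks, id) // drop the blocks that were compacted away
	}
	for _, b := range compacted {
		s.blocks[b.id] = b // add the blocks produced by the compaction job
	}
}
```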

@aleks-p force-pushed the feat/v2/compaction-planning branch from 4c5630e to 9442699 on August 27, 2024 19:40
@aleks-p merged commit 32621d5 into main on Aug 28, 2024
18 checks passed
@aleks-p deleted the feat/v2/compaction-planning branch on August 28, 2024 11:02