feat: improve compaction job state management #3519
Conversation
case compactorv1.CompactionStatus_COMPACTION_STATUS_IN_PROGRESS:
	m.compactionJobQueue.update(statusUpdate.JobName, raftAppendedAtNanos, statusUpdate.RaftLogIndex)
	stateUpdate.updatedJobs = append(stateUpdate.updatedJobs, job.Name)
case compactorv1.CompactionStatus_COMPACTION_STATUS_FAILURE:
This is still missing a failure reason from the worker; I am considering adding one so that we can persist it and show it in tooling. However, after reaching the maximum number of failures we currently delete the job to avoid accumulating jobs, so the failure reason would only be observable temporarily.
We could implement a job retention policy for this (we might need one anyway), but I would do that separately.
func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
	return m.db.boltdb.Update(func(tx *bbolt.Tx) error {
		for shard, blocks := range sTable.newBlocks {
We need to ensure atomicity of operations within a shard. Consider this case:
- We remove the source blocks.
- We handle a query that targets the blocks we just removed.
- We add the compacted blocks.
It's not very apparent to me how we're handling this. I suspect that such a query would return incomplete results.
Good point!
Currently, from what I can see, the read path reads data directly from memory. 9442699 locks the shards mutex while replacing source blocks with compacted ones, so things should be a bit better. Ideally we would lock a single shard, but that makes the code more complex, and swapping the blocks should be quite fast anyway.
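To illustrate the locking discussed above, here is a minimal, hypothetical sketch: block metadata lives in memory per shard, and a compacted block replaces its source blocks inside a single critical section, so a concurrent query never observes the intermediate state where the sources are gone but the compacted block is not yet visible. The names (`shardIndex`, `ReplaceBlocks`, `Query`) are illustrative, not the actual metastore API, and a single mutex guards all shards, as in the approach described above.

```go
package main

import (
	"fmt"
	"sync"
)

// shardIndex is a stand-in for the in-memory block metadata index.
type shardIndex struct {
	mu     sync.RWMutex
	blocks map[uint32]map[string]struct{} // shard -> set of block IDs
}

func newShardIndex() *shardIndex {
	return &shardIndex{blocks: make(map[uint32]map[string]struct{})}
}

func (s *shardIndex) AddBlock(shard uint32, id string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.blocks[shard] == nil {
		s.blocks[shard] = make(map[string]struct{})
	}
	s.blocks[shard][id] = struct{}{}
}

// ReplaceBlocks removes the compaction source blocks and adds the compacted
// block in one critical section: readers see either the old or the new set,
// never the partially swapped state.
func (s *shardIndex) ReplaceBlocks(shard uint32, sources []string, compacted string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, id := range sources {
		delete(s.blocks[shard], id)
	}
	s.blocks[shard][compacted] = struct{}{}
}

// Query snapshots the shard's block IDs under the read lock.
func (s *shardIndex) Query(shard uint32) []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]string, 0, len(s.blocks[shard]))
	for id := range s.blocks[shard] {
		out = append(out, id)
	}
	return out
}

func main() {
	idx := newShardIndex()
	idx.AddBlock(1, "a")
	idx.AddBlock(1, "b")
	// Atomically swap the two source blocks for one compacted block.
	idx.ReplaceBlocks(1, []string{"a", "b"}, "ab")
	fmt.Println(idx.Query(1))
}
```

Per-shard locks would allow more read concurrency, but as noted above, the swap itself is cheap, so a single mutex keeps the code simpler.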
The main change here is how we update the state of compaction jobs, in particular when workers poll with state updates and ask for new jobs.
The current implementation intermixes persistence (boltdb) and in-memory state updates. This creates a few cases where an error could leave the two storage layers in an inconsistent state.
The new implementation does everything in memory first and constructs a list of items that need to be durably stored. If we fail to durably store something, or otherwise end up in an unexpected state while persisting, the application panics.
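The two-phase pattern described above can be sketched roughly as follows. This is a hypothetical, simplified model, not the actual metastore code: all in-memory mutations happen first while a `stateUpdate` accumulates what changed, and a second persistence step writes those items durably, panicking on any inconsistency since memory and disk could no longer be reconciled. The names (`metastore`, `applyStatus`, `persist`) and the map-backed "durable store" are stand-ins (the real store is boltdb).

```go
package main

import "fmt"

// stateUpdate collects the items that must be durably stored after the
// in-memory phase, mirroring the pollStateUpdate idea from the diff.
type stateUpdate struct {
	updatedJobs []string
}

type metastore struct {
	jobs    map[string]string // in-memory job state: name -> status
	storage map[string]string // stand-in for the durable store (boltdb)
}

// applyStatus performs phase one: mutate only the in-memory state and
// record the job so it is persisted later.
func (m *metastore) applyStatus(job, status string, u *stateUpdate) {
	m.jobs[job] = status
	u.updatedJobs = append(u.updatedJobs, job)
}

// persist performs phase two: write the collected items durably. Any
// unexpected state here is fatal by design, because continuing would leave
// memory and the durable store inconsistent.
func (m *metastore) persist(u *stateUpdate) {
	for _, job := range u.updatedJobs {
		status, ok := m.jobs[job]
		if !ok {
			panic(fmt.Sprintf("job %q missing while persisting: inconsistent state", job))
		}
		m.storage[job] = status
	}
}

func main() {
	m := &metastore{jobs: map[string]string{}, storage: map[string]string{}}
	var u stateUpdate
	m.applyStatus("job-1", "IN_PROGRESS", &u)
	m.applyStatus("job-2", "SUCCESS", &u)
	m.persist(&u)
	fmt.Println(m.storage["job-1"], m.storage["job-2"])
}
```

Because phase one never touches the durable store, an error during the in-memory phase leaves disk untouched, and a failure during phase two is surfaced as a panic instead of a silent divergence between the two layers.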
Bonus: