feat(meta): make complete barrier non-async in worker loop #15109
Conversation
pub fn clear_changes(&mut self) {
    self.finished_jobs.clear();

pub async fn clear_and_fail_all_nodes(&mut self, err: &MetaError) {
    // join spawned completing command to finish no matter it succeeds or not.
IIUC, after this PR, there will be no pending join handle when we reach here, because failure_recovery will only be triggered when next_complete_barrier returns with an error. At that time, the pending join handle must have finished and self.completing_command must be None.

That being said, should we assert here?
I have made some changes to the code.

In the current code, we call failure_recovery whenever any barrier fails to be collected. This can happen at any time, possibly while a pending spawned complete_barrier task is still unfinished.
LGTM~
@@ -518,7 +518,7 @@ steps:
     retry: *auto-retry

   - label: "recovery test (deterministic simulation)"
-    command: "TEST_NUM=8 KILL_RATE=0.5 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh"
+    command: "TEST_NUM=8 KILL_RATE=0.4 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh"
May I ask why we need to change this?
After this PR, handling a new command is no longer blocked by the call to the async complete_barrier, so commands are handled more promptly. Once a command starts being handled, a failure fails that command, and the test then retries the DDL, which increases the time taken to finish the test. Reducing the kill rate keeps the test duration down.
src/meta/src/barrier/mod.rs
Outdated
pub(super) fn next_completed_barrier(
    &mut self,
) -> impl Future<Output = MetaResult<(Arc<CommandContext>, bool)>> + '_ {
Any reason for not implementing as an async function? 👀
The future is used in a select!, so we must make it cancellation safe. Here we need to poll the join handle. If we implement it as an async method, we would need to take the join handle out and await it, which makes the future not cancellation safe.
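For illustration, a minimal sketch of the hazard (hypothetical names and types, not the actual PR code). If an async method takes the join handle out before awaiting it, cancellation by select! loses the handle:

use tokio::task::JoinHandle;

struct CheckpointControl {
    // The spawned complete_barrier task, if one is in flight.
    completing_command: Option<JoinHandle<u64>>,
}

impl CheckpointControl {
    // NOT cancellation safe: if the surrounding select! drops this future
    // after take() succeeds but before the await resolves, the join handle
    // is gone and the task's result can never be observed.
    async fn next_completed_barrier(&mut self) -> u64 {
        let handle = self.completing_command.take().unwrap();
        handle.await.unwrap()
    }
}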
&mut Future is also a Future. Does it work in this case?

https://doc.rust-lang.org/stable/std/future/trait.Future.html#impl-Future-for-%26mut+F
Oops, surprised that it works. I used to think that the .await sugar must take ownership of the future, but it seems that it doesn't.

So now we can do things like awaiting a oneshot rx multiple times and still pass compilation, which is somewhat counterintuitive.
#[tokio::main]
async fn main() {
let (tx, mut rx) = tokio::sync::oneshot::channel();
tx.send(()).unwrap();
let rx_mut = &mut rx;
rx_mut.await.unwrap();
let rx_mut = &mut rx;
rx_mut.await.unwrap();
}
thread 'main' panicked at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/sync/oneshot.rs:1109:13:
called after complete
Yes. This is what FusedFuture is for.

I believe the manual poll style has the same issue. It works now because there is no other point that returns Pending between the join_handle becoming ready and self.completing_command being replaced to drop the future. This corresponds exactly to there being no yield point after awaiting on &mut join_handle.
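For reference, a small sketch of what fusing buys here (assuming the futures crate; not code from this PR). A fused future reports is_terminated() rather than panicking once it has completed, which is what select! relies on to skip it:

use futures::future::FusedFuture;
use futures::FutureExt;

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::oneshot::channel();
    tx.send(()).unwrap();

    let mut rx = rx.fuse();
    (&mut rx).await.unwrap();

    // Unlike the bare receiver in the earlier snippet, the fused future
    // does not panic on a second poll; it reports that it has terminated.
    assert!(rx.is_terminated());
}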
src/meta/src/barrier/mod.rs
Outdated
let prev_epoch = node.command_ctx.prev_epoch.value().0;
match &mut node.state {
    Completed(resps) => {

async fn complete_barrier(self, node: EpochNode) -> MetaResult<bool> {
I find myself missing the main point of this PR. The diff is not friendly to reviewers since it is spread far from the original Manager::handle_barrier_complete. 🤣
The original logic is: an event fired in the select! loop notifies that a barrier has been collected from all CNs, which then calls handle_barrier_complete; for each collected node, it calls complete_barrier one by one to update the hummock snapshot and the creating MV jobs.

In this PR, when the event fires, we only change the node state to Completed (which actually means collected) in the checkpoint control, without calling the complete handling. Later, in next_completed_barrier, if there is no pending completing node, we take the first collected node out of the queue and spawn a task to complete it. When the join handle finishes, we return with the command and the other output.
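A rough, self-contained sketch of that flow (simplified, hypothetical types; the real code lives in src/meta/src/barrier/mod.rs and differs in detail):

use std::collections::VecDeque;
use tokio::task::JoinHandle;

// Stand-ins for the real barrier types.
struct EpochNode { prev_epoch: u64 }

struct CheckpointControl {
    // Nodes collected from all CNs whose completion work is not yet spawned.
    collected: VecDeque<EpochNode>,
    // The single in-flight completing task, if any.
    completing_command: Option<JoinHandle<u64>>,
}

impl CheckpointControl {
    async fn next_completed_barrier(&mut self) -> u64 {
        // If nothing is completing yet, pop the first collected node and
        // spawn its completion work off the worker loop.
        if self.completing_command.is_none() {
            if let Some(node) = self.collected.pop_front() {
                let epoch = node.prev_epoch;
                self.completing_command = Some(tokio::spawn(async move {
                    // stand-in for update_snapshot + update_tracking_jobs
                    epoch
                }));
            }
        }
        match &mut self.completing_command {
            // Await &mut JoinHandle in place, and clear the slot only once
            // it resolves, so this future stays cancellation safe inside
            // the worker loop's select!.
            Some(join_handle) => {
                let epoch = join_handle.await.unwrap();
                self.completing_command = None;
                epoch
            }
            // Nothing to complete: stay pending so select! takes other
            // branches (simplified; the real code is structured differently).
            None => std::future::pending().await,
        }
    }
}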
.map_err(|e| {
    anyhow!("failed to join completing command: {:?}", e.as_report()).into()
})
Suggested change:

- .map_err(|e| {
-     anyhow!("failed to join completing command: {:?}", e.as_report()).into()
- })
+ .context("failed to join completing command")?
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Previously, after we collect a barrier, we call complete_barrier to commit a new hummock snapshot and also do some post-collection work to update the catalog metadata. complete_barrier is an async method, which involves work like doing IO to the meta store backend and acquiring async locks. The complete_barrier call blocks the global barrier manager worker loop in place, which may block the handling of other events.

In this PR, we change to spawn a task to do the complete_barrier work, so that the call on it is non-blocking. We also refactor the method to split it into two parts: one called update_snapshot, which updates the hummock snapshot, and the other called update_tracking_jobs, which updates the information of tracked create streaming jobs.
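In outline, the split looks roughly like this (a hedged, self-contained sketch; all names are stand-ins for the real meta-node types, not the exact PR code):

type MetaResult<T> = Result<T, String>;

struct EpochNode { is_checkpoint: bool }

struct Context;

impl Context {
    async fn update_snapshot(&self, _node: &EpochNode) -> MetaResult<()> {
        // stand-in for committing the hummock snapshot to the meta store
        Ok(())
    }

    async fn update_tracking_jobs(&self, _node: &EpochNode) -> MetaResult<()> {
        // stand-in for updating the metadata of tracked create streaming jobs
        Ok(())
    }

    // complete_barrier is now the composition of the two parts, and the
    // whole thing runs in a spawned task instead of blocking the loop.
    async fn complete_barrier(&self, node: EpochNode) -> MetaResult<bool> {
        self.update_snapshot(&node).await?;
        self.update_tracking_jobs(&node).await?;
        Ok(node.is_checkpoint)
    }
}

#[tokio::main]
async fn main() -> MetaResult<()> {
    let ctx = Context;
    // Spawned so the barrier manager worker loop is not blocked in place.
    let handle = tokio::spawn(async move {
        ctx.complete_barrier(EpochNode { is_checkpoint: true }).await
    });
    let is_checkpoint = handle.await.expect("join failed")?;
    println!("completed barrier, checkpoint = {is_checkpoint}");
    Ok(())
}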
Checklist

./risedev check (or alias, ./risedev c)

Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.