
feat(meta): make complete barrier non-async in worker loop #15109

Merged

25 commits merged into main from yiming/non-async-complete-barrier on Mar 5, 2024

Conversation

@wenym1 (Contributor) commented Feb 18, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Previously, after we collect a barrier, we call complete_barrier to commit a new hummock snapshot and do some post-collection work to update the catalog metadata. complete_barrier is an async method that involves work such as doing IO to the meta store backend and acquiring async locks. The call to complete_barrier blocks the global barrier manager worker loop in place, which may delay the handling of other events.

In this PR, we instead spawn a task to do the complete_barrier work, so that the call to it is non-blocking. We also refactor the method into two parts: update_snapshot, which updates the hummock snapshot, and update_tracking_jobs, which updates the information of tracked creating streaming jobs.
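For intuition, here is a minimal, self-contained sketch of the idea; the names (EpochNode, Worker, update_snapshot, update_tracking_jobs) are simplified stand-ins rather than the actual RisingWave types:

struct EpochNode {
    prev_epoch: u64,
}

async fn update_snapshot(node: &EpochNode) {
    // Stand-in for committing the new hummock snapshot (meta store IO).
    println!("commit snapshot for epoch {}", node.prev_epoch);
}

async fn update_tracking_jobs(node: &EpochNode) {
    // Stand-in for updating the tracked creating streaming jobs.
    println!("update tracking jobs for epoch {}", node.prev_epoch);
}

struct Worker {
    completing_task: Option<tokio::task::JoinHandle<()>>,
}

impl Worker {
    fn start_completing(&mut self, node: EpochNode) {
        assert!(self.completing_task.is_none());
        // Returns immediately: the worker loop stores the handle and polls
        // it later instead of awaiting the work in place.
        self.completing_task = Some(tokio::spawn(async move {
            update_snapshot(&node).await;
            update_tracking_jobs(&node).await;
        }));
    }
}

#[tokio::main]
async fn main() {
    let mut worker = Worker { completing_task: None };
    worker.start_completing(EpochNode { prev_epoch: 1 });
    worker.completing_task.take().unwrap().await.unwrap();
}

How the stored handle is polled without blocking the loop is discussed in the review comments below.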

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

pub fn clear_changes(&mut self) {
    self.finished_jobs.clear();

pub async fn clear_and_fail_all_nodes(&mut self, err: &MetaError) {
    // join spawned completing command to finish no matter it succeeds or not.
Collaborator
IIUC, after this PR, there will be no pending join handle when we reach here, because failure_recovery will only be triggered when next_complete_barrier returns an error. At that point, the pending join handle must have finished and self.completing_command must be None.

That being said, should we assert here?

Contributor Author

I have made some changes to the code.

In the current code, we call failure_recovery when any barrier fails to be collected, which can happen at any time, including while a spawned complete_barrier task is still unfinished.

@yezizp2012 (Contributor) left a comment

LGTM~

@@ -518,7 +518,7 @@ steps:
      retry: *auto-retry

  - label: "recovery test (deterministic simulation)"
-    command: "TEST_NUM=8 KILL_RATE=0.5 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh"
+    command: "TEST_NUM=8 KILL_RATE=0.4 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh"
Member

May I ask why we need to change this?

Contributor Author

After this PR, handling a new command is no longer blocked by the call to the async complete_barrier, so commands are handled more promptly. Once a command starts being handled, an injected failure will fail the command, and the test will then retry the DDL. This increases the time taken to finish the test, so we reduce the kill rate to keep the test duration in check.

Comment on lines 944 to 946
pub(super) fn next_completed_barrier(
    &mut self,
) -> impl Future<Output = MetaResult<(Arc<CommandContext>, bool)>> + '_ {
Member
Any reason for not implementing this as an async function? 👀

Contributor Author

The future is used in a select!, so we need to make it cancellation safe. Here we need to poll the join handle in place. If we implemented it as an async method, we would have to take the join handle out and await it, which would make the future not cancellation safe.
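For illustration, here is a sketch of this manual-poll pattern, assuming a hypothetical State struct with a single in-flight join handle (a simplification of the PR's completing_command):

use std::future::{poll_fn, Future};
use std::pin::Pin;
use tokio::task::JoinHandle;

struct State {
    // Stand-in for self.completing_command.
    completing: Option<JoinHandle<u64>>,
}

impl State {
    // Cancellation safe: the join handle is only borrowed while polling, so
    // if select! drops this future early, the handle stays in `self` and is
    // polled again on the next loop iteration.
    async fn next_completed(&mut self) -> u64 {
        let result = poll_fn(|cx| {
            let handle = self.completing.as_mut().expect("no completing task");
            Pin::new(handle).poll(cx)
        })
        .await;
        // Clear the slot only after the task has actually finished.
        self.completing = None;
        result.expect("join error")
    }
}

#[tokio::main]
async fn main() {
    let mut state = State {
        completing: Some(tokio::spawn(async { 42 })),
    };
    assert_eq!(state.next_completed().await, 42);
}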

Member

&mut Future is also a Future. Does it work in this case?

https://doc.rust-lang.org/stable/std/future/trait.Future.html#impl-Future-for-%26mut+F

Contributor Author

Oops. Surprised that it works. I used to think that the .await sugar must take ownership of the future, but it seems that it doesn't.

So now we can do things like awaiting a oneshot rx multiple times and still pass compilation, which is somewhat counterintuitive:

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::oneshot::channel();
    tx.send(()).unwrap();

    // Awaiting &mut rx does not consume the receiver...
    let rx_mut = &mut rx;
    rx_mut.await.unwrap();

    // ...so awaiting it a second time compiles, but panics at runtime:
    let rx_mut = &mut rx;
    rx_mut.await.unwrap();
}

thread 'main' panicked at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.36.0/src/sync/oneshot.rs:1109:13:
called after complete

Member

Yes. This is what FusedFuture is for.

I believe the manual poll style has the same issue. It works now only because there is no other point that returns Pending between the join_handle becoming ready and self.completing_command being replaced to drop the future. This corresponds exactly to there being no yield point after awaiting on &mut join_handle.
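As a small runnable illustration of that point (using the futures crate; this is not code from the PR): once a fused future completes, is_terminated() becomes true, and futures::select! checks this flag to avoid re-polling a finished future.

use futures::future::{FusedFuture, FutureExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
    tx.send(()).unwrap();

    // Fuse tracks completion: after the first await, is_terminated() turns
    // true, so futures::select! would skip this future rather than panic
    // like the oneshot example above.
    let mut rx = rx.fuse();
    (&mut rx).await.unwrap();
    assert!(rx.is_terminated());
}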

let prev_epoch = node.command_ctx.prev_epoch.value().0;
match &mut node.state {
    Completed(resps) => {

async fn complete_barrier(self, node: EpochNode) -> MetaResult<bool> {
@BugenZhao (Member) commented Mar 1, 2024

I find myself missing the main point of this PR. The diff is not friendly to reviewers, since it is far apart from the original Manager::handle_barrier_complete. 🤣

Contributor Author

The original logic was: an event is fired in select to notify that a barrier has been collected from all CNs, which then calls handle_barrier_complete, and that in turn calls complete_barrier on each collected node one by one to update the hummock snapshot and the creating MV jobs.

In this PR, when the event is fired, we only change the node state to Completed (which actually means collected) in checkpoint control, without calling handle complete. Later, in next_completed_barrier, if there is no pending completing node, we take the first collected node from the queue and spawn a task to complete it. When the join handle finishes, we return the command and other output.
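Putting the pieces together, a condensed, runnable sketch of this flow with illustrative types (the real checkpoint control tracks much more state than a queue of epochs):

use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::task::Poll;
use tokio::task::JoinHandle;

struct CheckpointControl {
    // Epochs collected from all CNs, waiting to be completed.
    collected: VecDeque<u64>,
    // At most one spawned completion task in flight.
    completing: Option<JoinHandle<u64>>,
}

impl CheckpointControl {
    fn barrier_collected(&mut self, epoch: u64) {
        // Only flip the state; no completion work is awaited here.
        self.collected.push_back(epoch);
    }

    // Spawn completion for the first collected epoch if nothing is in
    // flight, then poll the in-flight handle in place (cancellation safe,
    // as in the earlier sketch).
    async fn next_completed_barrier(&mut self) -> u64 {
        if self.completing.is_none() {
            if let Some(epoch) = self.collected.pop_front() {
                self.completing = Some(tokio::spawn(async move {
                    // Stand-in for update_snapshot + update_tracking_jobs.
                    epoch
                }));
            }
        }
        let result = poll_fn(|cx| match self.completing.as_mut() {
            Some(handle) => Pin::new(handle).poll(cx),
            None => Poll::Pending, // nothing to complete yet
        })
        .await;
        self.completing = None;
        result.expect("join error")
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    tx.send(1u64).unwrap();

    let mut ctrl = CheckpointControl {
        collected: VecDeque::new(),
        completing: None,
    };
    loop {
        tokio::select! {
            Some(epoch) = rx.recv() => ctrl.barrier_collected(epoch),
            epoch = ctrl.next_completed_barrier() => {
                println!("barrier {epoch} completed");
                break;
            }
        }
    }
}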

@wenym1 wenym1 added this pull request to the merge queue Mar 5, 2024
Comment on lines +978 to +980
.map_err(|e| {
    anyhow!("failed to join completing command: {:?}", e.as_report()).into()
})
Member

Suggested change:
-    .map_err(|e| {
-        anyhow!("failed to join completing command: {:?}", e.as_report()).into()
-    })
+    .context("failed to join completing command")?

Merged via the queue into main with commit 71504d7 Mar 5, 2024
26 of 27 checks passed
@wenym1 wenym1 deleted the yiming/non-async-complete-barrier branch March 5, 2024 07:17