Skip to content

Commit

Permalink
safekeeper: use CancellationToken instead of watch channel (#7836)
Browse files Browse the repository at this point in the history
## Problem

Safekeeper Timeline uses a channel for cancellation, but we have a
dedicated type for that.

## Summary of changes

- Use CancellationToken in Timeline
  • Loading branch information
jcsp committed May 22, 2024
1 parent a7f31f1 commit e015b2b
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 52 deletions.
10 changes: 2 additions & 8 deletions safekeeper/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,11 @@ use crate::{
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
info!("started");
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};

let cancel = tli.cancel.clone();
select! {
_ = recovery_main_loop(tli, conf) => { unreachable!() }
_ = cancellation_rx.changed() => {
_ = cancel.cancelled() => {
info!("stopped");
}
}
Expand Down
31 changes: 7 additions & 24 deletions safekeeper/src/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use camino::Utf8PathBuf;
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio_util::sync::CancellationToken;

use std::cmp::max;
use std::ops::{Deref, DerefMut};
Expand Down Expand Up @@ -342,12 +343,8 @@ pub struct Timeline {
walsenders: Arc<WalSenders>,
walreceivers: Arc<WalReceivers>,

/// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
cancellation_tx: watch::Sender<bool>,

/// Timeline should not be used after cancellation. Background tasks should
/// monitor this channel and stop eventually after receiving `true` from this channel.
cancellation_rx: watch::Receiver<bool>,
/// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
pub(crate) cancel: CancellationToken,

/// Directory where timeline state is stored.
pub timeline_dir: Utf8PathBuf,
Expand Down Expand Up @@ -376,7 +373,6 @@ impl Timeline {
shared_state.sk.flush_lsn(),
)));
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
let (cancellation_tx, cancellation_rx) = watch::channel(false);

let walreceivers = WalReceivers::new();
Ok(Timeline {
Expand All @@ -390,8 +386,7 @@ impl Timeline {
mutex: RwLock::new(shared_state),
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancellation_rx,
cancellation_tx,
cancel: CancellationToken::default(),
timeline_dir: conf.timeline_dir(&ttid),
walsenders_keep_horizon: conf.walsenders_keep_horizon,
broker_active: AtomicBool::new(false),
Expand All @@ -411,7 +406,6 @@ impl Timeline {
let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) =
watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID)));
let (shared_state_version_tx, shared_state_version_rx) = watch::channel(0);
let (cancellation_tx, cancellation_rx) = watch::channel(false);

let state =
TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn);
Expand All @@ -428,8 +422,7 @@ impl Timeline {
mutex: RwLock::new(SharedState::create_new(conf, &ttid, state)?),
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancellation_rx,
cancellation_tx,
cancel: CancellationToken::default(),
timeline_dir: conf.timeline_dir(&ttid),
walsenders_keep_horizon: conf.walsenders_keep_horizon,
broker_active: AtomicBool::new(false),
Expand Down Expand Up @@ -535,25 +528,15 @@ impl Timeline {
/// eventually after receiving cancellation signal.
fn cancel(&self, shared_state: &mut WriteGuardSharedState<'_>) {
info!("timeline {} is cancelled", self.ttid);
let _ = self.cancellation_tx.send(true);
self.cancel.cancel();
// Close associated FDs. Nobody will be able to touch timeline data once
// it is cancelled, so WAL storage won't be opened again.
shared_state.sk.wal_store.close();
}

/// Returns if timeline is cancelled.
pub fn is_cancelled(&self) -> bool {
*self.cancellation_rx.borrow()
}

/// Returns watch channel which gets value when timeline is cancelled. It is
/// guaranteed to have not cancelled value observed (errors otherwise).
pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
let rx = self.cancellation_rx.clone();
if *rx.borrow() {
bail!(TimelineError::Cancelled(self.ttid));
}
Ok(rx)
self.cancel.is_cancelled()
}

/// Take a writing mutual exclusive lock on timeline shared_state.
Expand Down
10 changes: 1 addition & 9 deletions safekeeper/src/timeline_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,6 @@ pub async fn main_task(
conf: SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
) {
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};

scopeguard::defer! {
if tli.is_cancelled() {
info!("manager task finished");
Expand Down Expand Up @@ -129,7 +121,7 @@ pub async fn main_task(
// wait until something changes. tx channels are stored under Arc, so they will not be
// dropped until the manager task is finished.
tokio::select! {
_ = cancellation_rx.changed() => {
_ = tli.cancel.cancelled() => {
// timeline was deleted
break 'outer state_snapshot;
}
Expand Down
14 changes: 3 additions & 11 deletions safekeeper/src/wal_backup_partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,6 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
debug!("started");
let await_duration = conf.partial_backup_timeout;

let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};

// sleep for random time to avoid thundering herd
{
let randf64 = rand::thread_rng().gen_range(0.0..1.0);
Expand Down Expand Up @@ -327,7 +319,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
&& flush_lsn_rx.borrow().term == seg.term
{
tokio::select! {
_ = cancellation_rx.changed() => {
_ = backup.tli.cancel.cancelled() => {
info!("timeline canceled");
return;
}
Expand All @@ -340,7 +332,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
// if we don't have any data and zero LSNs, wait for something
while flush_lsn_rx.borrow().lsn == Lsn(0) {
tokio::select! {
_ = cancellation_rx.changed() => {
_ = backup.tli.cancel.cancelled() => {
info!("timeline canceled");
return;
}
Expand All @@ -357,7 +349,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
// waiting until timeout expires OR segno changes
'inner: loop {
tokio::select! {
_ = cancellation_rx.changed() => {
_ = backup.tli.cancel.cancelled() => {
info!("timeline canceled");
return;
}
Expand Down

1 comment on commit e015b2b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3190 tests run: 3049 passed, 1 failed, 140 skipped (full report)


Failures on Postgres 14

  • test_download_churn[github-actions-selfhosted-100-tokio-epoll-uring-30]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_download_churn[release-pg14-github-actions-selfhosted-100-tokio-epoll-uring-30]"

Code coverage* (full report)

  • functions: 31.3% (6413 of 20477 functions)
  • lines: 48.0% (49309 of 102644 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
e015b2b at 2024-05-22T16:38:01.823Z :recycle:

Please sign in to comment.