Skip to content

Commit

Permalink
feat: allow to flush region before migrating
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 10, 2024
1 parent 545a80c commit 858ba01
Show file tree
Hide file tree
Showing 13 changed files with 373 additions and 45 deletions.
11 changes: 10 additions & 1 deletion src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,20 @@ impl OpenRegion {
pub struct DowngradeRegion {
/// The [RegionId].
pub region_id: RegionId,
/// The timeout of waiting for flush the region.
///
/// `None` stands for don't flush before downgrading the region.
#[serde(default)]
pub wait_for_flush_timeout: Option<Duration>,
}

impl Display for DowngradeRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DowngradeRegion(region_id={})", self.region_id)
write!(
f,
"DowngradeRegion(region_id={}, flush={:?})",
self.region_id, self.wait_for_flush_timeout,
)
}
}

Expand Down
22 changes: 21 additions & 1 deletion src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::region_server::RegionServer;
pub struct RegionHeartbeatResponseHandler {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
}

/// Handler of the instruction.
Expand All @@ -47,12 +48,22 @@ pub type InstructionHandler =
pub struct HandlerContext {
region_server: RegionServer,
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
}

impl HandlerContext {
fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId {
RegionId::new(region_ident.table_id, region_ident.region_number)
}

#[cfg(test)]
pub fn new_for_test(region_server: RegionServer) -> Self {
Self {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
}
}
}

impl RegionHeartbeatResponseHandler {
Expand All @@ -61,6 +72,7 @@ impl RegionHeartbeatResponseHandler {
Self {
region_server,
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
}
}

Expand Down Expand Up @@ -107,11 +119,13 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let mailbox = ctx.mailbox.clone();
let region_server = self.region_server.clone();
let catchup_tasks = self.catchup_tasks.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let handler = Self::build_handler(instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler(HandlerContext {
region_server,
catchup_tasks,
downgrade_tasks,
})
.await;

Expand All @@ -129,6 +143,7 @@ mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use common_meta::heartbeat::mailbox::{
HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
Expand Down Expand Up @@ -197,6 +212,7 @@ mod tests {
// Downgrade region
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
wait_for_flush_timeout: Some(Duration::from_secs(1)),
});
assert!(heartbeat_handler
.is_acceptable(&heartbeat_env.create_handler_ctx((meta.clone(), instruction))));
Expand Down Expand Up @@ -392,7 +408,10 @@ mod tests {
// Should be ok, if we try to downgrade it twice.
for _ in 0..2 {
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
let instruction = Instruction::DowngradeRegion(DowngradeRegion { region_id });
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id,
wait_for_flush_timeout: Some(Duration::from_secs(1)),
});

let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
Expand All @@ -413,6 +432,7 @@ mod tests {
let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
let instruction = Instruction::DowngradeRegion(DowngradeRegion {
region_id: RegionId::new(2048, 1),
wait_for_flush_timeout: Some(Duration::from_secs(1)),
});
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
Expand Down
Loading

0 comments on commit 858ba01

Please sign in to comment.