Skip to content

Commit

Permalink
refactor: remove table flush policy
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Mar 7, 2023
1 parent cb99057 commit 2290dbd
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 91 deletions.
90 changes: 6 additions & 84 deletions analytic_engine/src/instance/flush_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,6 @@ pub struct TableFlushOptions {
///
/// Default is false.
pub block_on_write_thread: bool,
/// Flush policy
pub policy: TableFlushPolicy,
}

impl Default for TableFlushOptions {
Expand All @@ -171,7 +169,6 @@ impl Default for TableFlushOptions {
res_sender: None,
compact_after_flush: true,
block_on_write_thread: false,
policy: TableFlushPolicy::Dump,
}
}
}
Expand All @@ -184,23 +181,6 @@ pub struct TableFlushRequest {
pub max_sequence: SequenceNumber,
}

/// Policy of how to perform flush operation.
#[derive(Default, Debug, Clone, Copy)]
pub enum TableFlushPolicy {
/// Unknown policy, this is the default value and operation will report
/// error for it. Others except `RoleTable` should set policy to this
/// variant.
Unknown,
/// Dump memtable to sst file.
// todo: the default value should be [Unknown].
#[default]
Dump,
// TODO: use this policy and remove "allow(dead_code)"
/// Drop memtables.
#[allow(dead_code)]
Purge,
}

impl Instance {
/// Flush this table.
pub async fn flush_table(
Expand Down Expand Up @@ -348,7 +328,7 @@ impl Instance {
let table = table_data.name.clone();

let instance = self.clone();
let flush_job = async move { instance.flush_memtables(&flush_req, opts.policy).await };
let flush_job = async move { instance.flush_memtables(&flush_req).await };

let compact_req = TableCompactionRequest::no_waiter(
table_data.clone(),
Expand Down Expand Up @@ -389,11 +369,7 @@ impl Instance {
}

/// Each table can only have one running flush job.
async fn flush_memtables(
&self,
flush_req: &TableFlushRequest,
policy: TableFlushPolicy,
) -> Result<()> {
async fn flush_memtables(&self, flush_req: &TableFlushRequest) -> Result<()> {
let TableFlushRequest {
table_data,
max_sequence,
Expand All @@ -408,28 +384,16 @@ impl Instance {

let request_id = RequestId::next_id();
info!(
"Instance try to flush memtables, table:{}, table_id:{}, request_id:{}, mems_to_flush:{:?}, policy:{:?}",
table_data.name, table_data.id, request_id, mems_to_flush, policy,
"Instance try to flush memtables, table:{}, table_id:{}, request_id:{}, mems_to_flush:{:?}",
table_data.name, table_data.id, request_id, mems_to_flush
);

// Start flush duration timer.
let local_metrics = table_data.metrics.local_flush_metrics();
local_metrics.observe_memtables_num(mems_to_flush.len());
let _timer = local_metrics.flush_duration_histogram.start_timer();

match policy {
TableFlushPolicy::Unknown => {
return UnknownPolicy {}.fail();
}
TableFlushPolicy::Dump => {
self.dump_memtables(table_data, request_id, &mems_to_flush)
.await?
}
TableFlushPolicy::Purge => {
self.purge_memtables(table_data, request_id, &mems_to_flush)
.await?
}
}
self.dump_memtables(table_data, request_id, &mems_to_flush)
.await?;

table_data.set_last_flush_time(time::current_time_millis());

Expand All @@ -441,48 +405,6 @@ impl Instance {
Ok(())
}

/// Flush action for [TableFlushPolicy::Purge].
///
/// Purge is simply removing all selected memtables.
async fn purge_memtables(
&self,
table_data: &TableData,
request_id: RequestId,
mems_to_flush: &FlushableMemTables,
) -> Result<()> {
// calculate largest sequence number purged
let mut last_sequence_purged = SequenceNumber::MIN;
if let Some(sampling_mem) = &mems_to_flush.sampling_mem {
last_sequence_purged = last_sequence_purged.max(sampling_mem.last_sequence());
}
for mem in &mems_to_flush.memtables {
last_sequence_purged = last_sequence_purged.max(mem.last_sequence());
}

// remove these memtables
let mems_to_remove = mems_to_flush.ids();
let edit = VersionEdit {
flushed_sequence: last_sequence_purged,
mems_to_remove,
files_to_add: vec![],
files_to_delete: vec![],
};
table_data.current_version().apply_edit(edit);

info!(
"Instance purged memtables, table:{}, table_id:{}, request_id:{}, mems_to_flush:{:?}, last_sequence_purged:{}",
table_data.name,
table_data.id,
request_id,
mems_to_flush,
last_sequence_purged
);

Ok(())
}

/// Flush action for [TableFlushPolicy::Dump].
///
/// This will write picked memtables [FlushableMemTables] to level 0 sst
/// files. Sampling memtable may be dumped into multiple sst file according
/// to the sampled segment duration.
Expand Down
3 changes: 1 addition & 2 deletions analytic_engine/src/instance/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
ApplyMemTable, FlushTable, OperateByWriteWorker, ReadMetaUpdate, ReadWal,
RecoverTableData, Result,
},
flush_compaction::{TableFlushOptions, TableFlushPolicy},
flush_compaction::TableFlushOptions,
mem_collector::MemUsageCollector,
write_worker,
write_worker::{RecoverTableCommand, WorkerLocal, WriteGroup},
Expand Down Expand Up @@ -423,7 +423,6 @@ impl Instance {
res_sender: None,
compact_after_flush: false,
block_on_write_thread: false,
policy: TableFlushPolicy::Dump,
};
self.flush_table_in_worker(worker_local, table_data, opts)
.await
Expand Down
6 changes: 1 addition & 5 deletions analytic_engine/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ use tokio::sync::oneshot;

use self::data::TableDataRef;
use crate::{
instance::{
flush_compaction::{TableFlushOptions, TableFlushPolicy},
Instance, InstanceRef,
},
instance::{flush_compaction::TableFlushOptions, Instance, InstanceRef},
space::{SpaceAndTable, SpaceId},
};

Expand Down Expand Up @@ -263,7 +260,6 @@ impl Table for TableImpl {
} else {
None
},
policy: TableFlushPolicy::Dump,
};

Instance::flush_table(self.space_table.table_data().clone(), flush_opts)
Expand Down

0 comments on commit 2290dbd

Please sign in to comment.