diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 951c72ba55..b0f207c5bc 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -161,8 +161,6 @@ pub struct TableFlushOptions { /// /// Default is false. pub block_on_write_thread: bool, - /// Flush policy - pub policy: TableFlushPolicy, } impl Default for TableFlushOptions { @@ -171,7 +169,6 @@ impl Default for TableFlushOptions { res_sender: None, compact_after_flush: true, block_on_write_thread: false, - policy: TableFlushPolicy::Dump, } } } @@ -184,23 +181,6 @@ pub struct TableFlushRequest { pub max_sequence: SequenceNumber, } -/// Policy of how to perform flush operation. -#[derive(Default, Debug, Clone, Copy)] -pub enum TableFlushPolicy { - /// Unknown policy, this is the default value and operation will report - /// error for it. Others except `RoleTable` should set policy to this - /// variant. - Unknown, - /// Dump memtable to sst file. - // todo: the default value should be [Unknown]. - #[default] - Dump, - // TODO: use this policy and remove "allow(dead_code)" - /// Drop memtables. - #[allow(dead_code)] - Purge, -} - impl Instance { /// Flush this table. pub async fn flush_table( @@ -348,7 +328,7 @@ impl Instance { let table = table_data.name.clone(); let instance = self.clone(); - let flush_job = async move { instance.flush_memtables(&flush_req, opts.policy).await }; + let flush_job = async move { instance.flush_memtables(&flush_req).await }; let compact_req = TableCompactionRequest::no_waiter( table_data.clone(), @@ -389,11 +369,7 @@ impl Instance { } /// Each table can only have one running flush job. - async fn flush_memtables( - &self, - flush_req: &TableFlushRequest, - policy: TableFlushPolicy, - ) -> Result<()> { + async fn flush_memtables(&self, flush_req: &TableFlushRequest) -> Result<()> { let TableFlushRequest { table_data, max_sequence, @@ -408,28 +384,16 @@ impl Instance { let request_id = RequestId::next_id(); info!( - "Instance try to flush memtables, table:{}, table_id:{}, request_id:{}, mems_to_flush:{:?}, policy:{:?}", - table_data.name, table_data.id, request_id, mems_to_flush, policy, + "Instance try to flush memtables, table:{}, table_id:{}, request_id:{}, mems_to_flush:{:?}", + table_data.name, table_data.id, request_id, mems_to_flush ); // Start flush duration timer. let local_metrics = table_data.metrics.local_flush_metrics(); local_metrics.observe_memtables_num(mems_to_flush.len()); let _timer = local_metrics.flush_duration_histogram.start_timer(); - - match policy { - TableFlushPolicy::Unknown => { - return UnknownPolicy {}.fail(); - } - TableFlushPolicy::Dump => { - self.dump_memtables(table_data, request_id, &mems_to_flush) - .await? - } - TableFlushPolicy::Purge => { - self.purge_memtables(table_data, request_id, &mems_to_flush) - .await? - } - } + self.dump_memtables(table_data, request_id, &mems_to_flush) + .await?; table_data.set_last_flush_time(time::current_time_millis()); @@ -441,48 +405,6 @@ impl Instance { Ok(()) } - /// Flush action for [TableFlushPolicy::Purge]. - /// - /// Purge is simply removing all selected memtables. - async fn purge_memtables( - &self, - table_data: &TableData, - request_id: RequestId, - mems_to_flush: &FlushableMemTables, - ) -> Result<()> { - // calculate largest sequence number purged - let mut last_sequence_purged = SequenceNumber::MIN; - if let Some(sampling_mem) = &mems_to_flush.sampling_mem { - last_sequence_purged = last_sequence_purged.max(sampling_mem.last_sequence()); - } - for mem in &mems_to_flush.memtables { - last_sequence_purged = last_sequence_purged.max(mem.last_sequence()); - } - - // remove these memtables - let mems_to_remove = mems_to_flush.ids(); - let edit = VersionEdit { - flushed_sequence: last_sequence_purged, - mems_to_remove, - files_to_add: vec![], - files_to_delete: vec![], - }; - table_data.current_version().apply_edit(edit); - - info!( - "Instance purged memtables, table:{}, table_id:{}, request_id:{}, mems_to_flush:{:?}, last_sequence_purged:{}", - table_data.name, - table_data.id, - request_id, - mems_to_flush, - last_sequence_purged - ); - - Ok(()) - } - - /// Flush action for [TableFlushPolicy::Dump]. - /// /// This will write picked memtables [FlushableMemTables] to level 0 sst /// files. Sampling memtable may be dumped into multiple sst file according /// to the sampled segment duration. diff --git a/analytic_engine/src/instance/open.rs b/analytic_engine/src/instance/open.rs index 7638a86e30..db08eb1ce7 100644 --- a/analytic_engine/src/instance/open.rs +++ b/analytic_engine/src/instance/open.rs @@ -27,7 +27,7 @@ use crate::{ ApplyMemTable, FlushTable, OperateByWriteWorker, ReadMetaUpdate, ReadWal, RecoverTableData, Result, }, - flush_compaction::{TableFlushOptions, TableFlushPolicy}, + flush_compaction::TableFlushOptions, mem_collector::MemUsageCollector, write_worker, write_worker::{RecoverTableCommand, WorkerLocal, WriteGroup}, @@ -423,7 +423,6 @@ impl Instance { res_sender: None, compact_after_flush: false, block_on_write_thread: false, - policy: TableFlushPolicy::Dump, }; self.flush_table_in_worker(worker_local, table_data, opts) .await diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index af802f3896..d2bffd3bc6 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -25,10 +25,7 @@ use tokio::sync::oneshot; use self::data::TableDataRef; use crate::{ - instance::{ - flush_compaction::{TableFlushOptions, TableFlushPolicy}, - Instance, InstanceRef, - }, + instance::{flush_compaction::TableFlushOptions, Instance, InstanceRef}, space::{SpaceAndTable, SpaceId}, }; @@ -263,7 +260,6 @@ impl Table for TableImpl { } else { None }, - policy: TableFlushPolicy::Dump, }; Instance::flush_table(self.space_table.table_data().clone(), flush_opts)