Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: make vacuum temp files batch #15430

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/ee/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ databend-enterprise-vacuum-handler = { path = "../ee_features/vacuum_handler" }
databend-enterprise-virtual-column = { path = "../ee_features/virtual_column" }
databend-query = { path = "../service" }
databend-storages-common-cache = { path = "../storages/common/cache" }
databend-storages-common-io = { path = "../storages/common/io" }
databend-storages-common-pruner = { path = "../storages/common/pruner" }
databend-storages-common-table-meta = { path = "../storages/common/table_meta" }
opendal = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion src/query/ee/src/storages/fuse/operations/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ impl VacuumHandler for RealVacuumHandler {

async fn do_vacuum_temporary_files(
&self,
ctx: Arc<dyn TableContext>,
temporary_dir: String,
retain: Option<Duration>,
vacuum_limit: Option<usize>,
) -> Result<Vec<String>> {
do_vacuum_temporary_files(temporary_dir, retain, vacuum_limit).await
do_vacuum_temporary_files(ctx, temporary_dir, retain, vacuum_limit).await
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_storage::DataOperator;
use databend_storages_common_io::Files;
use futures_util::TryStreamExt;
use log::info;
use opendal::Metakey;

const BATCH_SIZE: usize = 10_000;

#[async_backtrace::framed]
pub async fn do_vacuum_temporary_files(
ctx: Arc<dyn TableContext>,
temporary_dir: String,
retain: Option<Duration>,
limit: Option<usize>,
) -> Result<Vec<String>> {
let operator = DataOperator::instance().operator();
let files = Files::create(ctx, operator.clone());

let mut ds = operator
.lister_with(&temporary_dir)
Expand All @@ -43,20 +51,41 @@ pub async fn do_vacuum_temporary_files(
.as_millis() as i64;

let mut remove_temp_files_name = Vec::new();
let mut expired_files = Vec::with_capacity(BATCH_SIZE);
while let Some(de) = ds.try_next().await? {
let meta = de.metadata();

if let Some(modified) = meta.last_modified() {
if timestamp - modified.timestamp_millis() >= expire_time {
operator.delete(de.path()).await?;
expired_files.push(de.path().to_string());
remove_temp_files_name.push(de.name().to_string());
}

// If the batch size is reached, remove the files in batch.
if expired_files.len() >= BATCH_SIZE {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe `operator.remove_via` would be a better fit here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a look :)

info!("Removing {} expired files in batch", expired_files.len());
files.remove_file_in_batch(expired_files.clone()).await?;
expired_files.clear();
}

if remove_temp_files_name.len() >= limit {
break;
}
}
}

if !expired_files.is_empty() {
info!(
"Removing the remaining {} expired files",
expired_files.len()
);
files.remove_file_in_batch(expired_files).await?;
}

info!(
"Finished vacuuming temporary files, removed {} files in total",
remove_temp_files_name.len()
);

Ok(remove_temp_files_name)
}
12 changes: 10 additions & 2 deletions src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn test_fuse_do_vacuum_drop_tables() -> Result<()> {

#[tokio::test(flavor = "multi_thread")]
async fn test_do_vacuum_temporary_files() -> Result<()> {
let _fixture = TestFixture::setup().await?;
let fixture = TestFixture::setup().await?;

let operator = DataOperator::instance().operator();
operator.write("test_dir/test1", vec![1, 2]).await?;
Expand All @@ -120,7 +120,9 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {
assert_eq!(3, operator.list("test_dir/").await?.len());

tokio::time::sleep(Duration::from_secs(2)).await;
let ctx = fixture.new_query_ctx().await?;
do_vacuum_temporary_files(
ctx.clone(),
"test_dir/".to_string(),
Some(Duration::from_secs(2)),
Some(1),
Expand All @@ -129,7 +131,13 @@ async fn test_do_vacuum_temporary_files() -> Result<()> {

assert_eq!(2, operator.list("test_dir/").await?.len());

do_vacuum_temporary_files("test_dir/".to_string(), Some(Duration::from_secs(2)), None).await?;
do_vacuum_temporary_files(
ctx,
"test_dir/".to_string(),
Some(Duration::from_secs(2)),
None,
)
.await?;
assert_eq!(0, operator.list("test_dir/").await?.len());

Ok(())
Expand Down
4 changes: 3 additions & 1 deletion src/query/ee_features/vacuum_handler/src/vacuum_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub trait VacuumHandler: Sync + Send {

async fn do_vacuum_temporary_files(
&self,
ctx: Arc<dyn TableContext>,
temporary_dir: String,
retain: Option<Duration>,
vacuum_limit: Option<usize>,
Expand Down Expand Up @@ -86,12 +87,13 @@ impl VacuumHandlerWrapper {
#[async_backtrace::framed]
pub async fn do_vacuum_temporary_files(
&self,
ctx: Arc<dyn TableContext>,
temporary_dir: String,
retain: Option<Duration>,
vacuum_limit: Option<usize>,
) -> Result<Vec<String>> {
self.handler
.do_vacuum_temporary_files(temporary_dir, retain, vacuum_limit)
.do_vacuum_temporary_files(ctx, temporary_dir, retain, vacuum_limit)
.await
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl Interpreter for VacuumTemporaryFilesInterpreter {
let temporary_files_prefix = query_spill_prefix(self.ctx.get_tenant().tenant_name());
let remove_files = handler
.do_vacuum_temporary_files(
self.ctx.clone(),
temporary_files_prefix,
self.plan.retain,
self.plan.limit.map(|x| x as usize),
Expand Down
Loading