Skip to content

Commit

Permalink
impl semaphore to limit lease tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
anil0906 committed Apr 22, 2024
1 parent 6566e63 commit 56e5652
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
7 changes: 6 additions & 1 deletion scylla_pg_lib/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl DbExecute for PgAdapter {
})
.collect());
if let Err(e) = tx.commit().await {
try_count = try_count + 1;
log::error!("commit for tx failed : {}", e.to_string());
error = Some(PgAdapterError::DbError(e));
} else {
Expand Down Expand Up @@ -163,7 +164,7 @@ impl Persistence for PgAdapter {
worker: worker.clone(),
progress: Some(0.0),
}));

self.execute(UPDATE_BATCH_TASK_SQL, &[&queue, &limit, &worker_json, &deadline, &updated, &task_history])
.await
}
Expand All @@ -173,3 +174,7 @@ impl Persistence for PgAdapter {
self.execute_count(DELETE_BATCH_TASK_SQL, &[&deletion_time]).await
}
}

// impl PgAdapter {
// async fn retry_operation(&self, count: i32, operation: Box<dyn Fn() -> Result<Vec<Task>, Self::PersistenceError>>) -> Result<Vec<Task>, Self::PersistenceError> {}
// }
8 changes: 6 additions & 2 deletions scylla_pg_lib/src/bin/load_lease_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use uuid::{Uuid};
use scylla_models::AddTaskModel;
use scylla_pg_core::config::PGConfig;
use scylla_pg_lib::manager::PgManager;
use tokio::sync::Semaphore;

#[tokio::main]
pub async fn main() {
Expand All @@ -15,13 +16,15 @@ pub async fn main() {
let args: Vec<String> = env::args().collect();
let worker = &args[1];
log::info!("worker: {}", worker);
let semaphore = Arc::new(Semaphore::new(5));

let pgm_rw: Arc<RwLock<PgManager>> = Arc::new(RwLock::new(pgm));

loop {
tokio::time::sleep(Duration::from_millis(10)).await;
tokio::time::sleep(Duration::from_millis(5)).await;
let pgm_clone = pgm_rw.clone();
let worker_clone = worker.clone();
let permit = semaphore.clone().acquire_owned().await.unwrap();
tokio::spawn(async move {
match pgm_clone.read().await.lease_n_tasks("load_test".to_string(), 10, worker_clone, Some(5)).await {
Err(e) => { log::error!("error occurred while leasing tasks {e}"); }
Expand All @@ -31,11 +34,12 @@ pub async fn main() {
log::error!("error occurred while heartbeat tasks {e}");
}
if let Err(e) = pgm_clone.read().await.complete_task(t.rn.clone()).await {
log::error!("error occurred while heartbeat tasks {e}");
log::error!("error occurred while complete tasks {e}");
}
}
}
}
drop(permit);
});
}
}

0 comments on commit 56e5652

Please sign in to comment.