diff --git a/packages/cohort_banking/src/app.rs b/packages/cohort_banking/src/app.rs index fe5a692a..1a8d4334 100644 --- a/packages/cohort_banking/src/app.rs +++ b/packages/cohort_banking/src/app.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use cohort_sdk::{ cohort::Cohort, - model::{CandidateData, CertificationRequest, ClientErrorKind, Config}, + model::{ClientErrorKind, Config}, }; use opentelemetry_api::{ @@ -13,9 +13,9 @@ use opentelemetry_api::{ use talos_agent::messaging::api::Decision; use crate::{ - callbacks::{oo_installer::OutOfOrderInstallerImpl, state_provider::StateProviderImpl}, + callbacks::{certification_candidate_provider::CertificationCandidateProviderImpl, oo_installer::OutOfOrderInstallerImpl}, examples_support::queue_processor::Handler, - model::requests::{BusinessActionType, TransferRequest}, + model::requests::{BusinessActionType, CandidateData, CertificationRequest, TransferRequest}, state::postgres::{database::Database, database_config::DatabaseConfig}, }; @@ -63,14 +63,12 @@ impl Handler for BankingApp { async fn handle(&self, request: TransferRequest) -> Result<(), String> { log::debug!("processig new banking transfer request: {:?}", request); - let request_copy = request.clone(); - let statemap = vec![HashMap::from([( BusinessActionType::TRANSFER.to_string(), TransferRequest::new(request.from.clone(), request.to.clone(), request.amount).json(), )])]; - let request = CertificationRequest { + let certification_request = CertificationRequest { timeout_ms: 0, candidate: CandidateData { readset: vec![request.from.clone(), request.to.clone()], @@ -80,15 +78,14 @@ impl Handler for BankingApp { }; let single_query_strategy = true; - let state_provider = StateProviderImpl { + let state_provider = CertificationCandidateProviderImpl { database: Arc::clone(&self.database), - request: request_copy.clone(), single_query_strategy, }; + let request_payload_callback = || state_provider.get_certification_candidate(certification_request.clone()); let oo_inst = OutOfOrderInstallerImpl { database: Arc::clone(&self.database), - request: request_copy, detailed_logging: false, counter_oo_no_data_found: Arc::clone(&self.counter_oo_no_data_found), single_query_strategy, @@ -98,7 +95,7 @@ impl Handler for BankingApp { .cohort_api .as_ref() .expect("Banking app is not initialised") - .certify(request, &state_provider, &oo_inst) + .certify(&request_payload_callback, &oo_inst) .await { Ok(rsp) => { diff --git a/packages/cohort_banking/src/callbacks/state_provider.rs b/packages/cohort_banking/src/callbacks/certification_candidate_provider.rs similarity index 56% rename from packages/cohort_banking/src/callbacks/state_provider.rs rename to packages/cohort_banking/src/callbacks/certification_candidate_provider.rs index f310e2d4..fb122769 100644 --- a/packages/cohort_banking/src/callbacks/state_provider.rs +++ b/packages/cohort_banking/src/callbacks/certification_candidate_provider.rs @@ -1,23 +1,30 @@ use std::sync::Arc; -use async_trait::async_trait; -use cohort_sdk::model::callbacks::{CapturedItemState, CapturedState, ItemStateProvider}; +use cohort_sdk::model::callback::{CertificationCandidate, CertificationCandidateCallbackResponse, CertificationRequestPayload}; use rust_decimal::Decimal; use tokio_postgres::Row; use crate::{ - model::{bank_account::BankAccount, requests::TransferRequest}, + model::{bank_account::BankAccount, requests::CertificationRequest}, state::postgres::database::{Database, DatabaseError}, }; -pub struct StateProviderImpl 
{ - pub request: TransferRequest, - pub database: Arc, - pub single_query_strategy: bool, +#[derive(Debug, PartialEq, PartialOrd)] +pub struct CapturedState { + pub snapshot_version: u64, + pub items: Vec, +} + +#[derive(Debug, PartialEq, PartialOrd)] +pub struct CapturedItemState { + pub id: String, + pub version: u64, } -impl StateProviderImpl { - pub fn account_from_row(row: &Row) -> Result { +impl TryFrom<&Row> for BankAccount { + type Error = DatabaseError; + + fn try_from(row: &Row) -> Result { Ok(BankAccount { name: row .try_get::<&str, String>("name") @@ -33,20 +40,27 @@ impl StateProviderImpl { .map_err(|e| DatabaseError::deserialise_payload(e.to_string(), "Cannot read account amount".into()))?, }) } +} + +pub struct CertificationCandidateProviderImpl { + pub database: Arc, + pub single_query_strategy: bool, +} - async fn get_state_using_two_queries(&self) -> Result { +impl CertificationCandidateProviderImpl { + async fn get_state_using_two_queries(&self, from_account: &str, to_account: &str) -> Result { let list = self .database .query_many( r#"SELECT ba.* FROM bank_accounts ba WHERE ba."number" = $1 OR ba."number" = $2"#, - &[&self.request.from, &self.request.to], - Self::account_from_row, + &[&from_account, &to_account], + |row| BankAccount::try_from(row), ) .await .map_err(|e| e.to_string())?; if list.len() != 2 { - return Err(format!("Unable to load state of accounts: '{}' and '{}'", self.request.from, self.request.to)); + return Err(format!("Unable to load state of accounts: '{:?}' and '{:?}'", from_account, to_account)); } let snapshot_version = self @@ -73,11 +87,10 @@ impl StateProviderImpl { version: account.version, }) .collect(), - abort_reason: None, }) } - async fn get_state_using_one_query(&self) -> Result { + async fn get_state_using_one_query(&self, from_account: &str, to_account: &str) -> Result { let list = self .database .query_many( @@ -92,10 +105,10 @@ impl StateProviderImpl { bank_accounts ba, cohort_snapshot cs WHERE ba."number" = $1 OR ba."number" = $2"#, - &[&self.request.from, &self.request.to], + &[&from_account, &to_account], // convert RAW output into tuple (bank account, snap ver) |row| { - let account = Self::account_from_row(row)?; + let account = BankAccount::try_from(row)?; let snapshot_version = row .try_get::<&str, i64>("snapshot_version") .map_err(|e| DatabaseError::deserialise_payload(e.to_string(), "Cannot read snapshot_version".into()))?; @@ -106,7 +119,7 @@ impl StateProviderImpl { .map_err(|e| e.to_string())?; if list.len() != 2 { - return Err(format!("Unable to load state of accounts: '{}' and '{}'", self.request.from, self.request.to)); + return Err(format!("Unable to load state of accounts: '{:?}' and '{:?}'", from_account, to_account)); } Ok(CapturedState { @@ -118,18 +131,38 @@ impl StateProviderImpl { version: tuple.0.version, }) .collect(), - abort_reason: None, }) } -} -#[async_trait] -impl ItemStateProvider for StateProviderImpl { - async fn get_state(&self) -> Result { - if self.single_query_strategy { - self.get_state_using_one_query().await + pub async fn get_certification_candidate(&self, request: CertificationRequest) -> Result { + // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + // This example doesn't handle `Cancelled` scenario. + // If user cancellation is needed, add additional logic in this fn to return `Cancelled` instead of `Proceed` in the result. 
+        // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+
+        // The order of the accounts doesn't matter in this example; we only use them to read their respective versions.
+        // We assume the writeset has exactly two items, since this example transfers between two accounts.
+        // The alternative is to deserialise the statemap `Value`, which is comparatively expensive, and the statemap may not be present at all.
+        let first_account = &request.candidate.writeset[0];
+        let second_account = &request.candidate.writeset[1];
+
+        let state = if self.single_query_strategy {
+            self.get_state_using_one_query(first_account, second_account).await
         } else {
-            self.get_state_using_two_queries().await
-        }
+            self.get_state_using_two_queries(first_account, second_account).await
+        }?;
+
+        let candidate = CertificationCandidate {
+            readset: request.candidate.readset,
+            writeset: request.candidate.writeset,
+            statemaps: request.candidate.statemap,
+            readvers: state.items.into_iter().map(|x| x.version).collect(),
+        };
+
+        Ok(CertificationCandidateCallbackResponse::Proceed(CertificationRequestPayload {
+            candidate,
+            snapshot: state.snapshot_version,
+            timeout_ms: request.timeout_ms,
+        }))
     }
 }
diff --git a/packages/cohort_banking/src/callbacks/mod.rs b/packages/cohort_banking/src/callbacks/mod.rs
index 474d49c6..1dbc99b9 100644
--- a/packages/cohort_banking/src/callbacks/mod.rs
+++ b/packages/cohort_banking/src/callbacks/mod.rs
@@ -1,3 +1,3 @@
+pub mod certification_candidate_provider;
 pub mod oo_installer;
-pub mod state_provider;
 pub mod statemap_installer;
diff --git a/packages/cohort_banking/src/callbacks/oo_installer.rs b/packages/cohort_banking/src/callbacks/oo_installer.rs
index 6278eb09..8f195c9f 100644
--- a/packages/cohort_banking/src/callbacks/oo_installer.rs
+++ b/packages/cohort_banking/src/callbacks/oo_installer.rs
@@ -1,13 +1,8 @@
-use std::{
-    collections::HashSet,
-    sync::Arc,
-    time::{Duration, Instant},
-};
+use std::{collections::HashSet, sync::Arc};
 
 use async_trait::async_trait;
-use cohort_sdk::model::callbacks::{OutOfOrderInstallOutcome, OutOfOrderInstaller};
+use cohort_sdk::model::callback::{OutOfOrderInstallOutcome, OutOfOrderInstallRequest, OutOfOrderInstaller};
 use opentelemetry_api::metrics::Counter;
-use tokio::task::JoinHandle;
 use tokio_postgres::types::ToSql;
 
 use crate::{
@@ -17,7 +12,6 @@ use crate::{
 
 pub struct OutOfOrderInstallerImpl {
     pub database: Arc<Database>,
-    pub request: TransferRequest,
     pub detailed_logging: bool,
     pub counter_oo_no_data_found: Arc<Counter<u64>>,
     pub single_query_strategy: bool,
@@ -26,72 +20,13 @@ pub struct OutOfOrderInstallerImpl {
 pub static SNAPSHOT_SINGLETON_ROW_ID: &str = "SINGLETON";
 
 impl OutOfOrderInstallerImpl {
-    async fn is_safe_to_proceed(db: Arc<Database>, safepoint: u64) -> Result<bool, String> {
-        let snapshot = db
-            .query_one(r#"SELECT "version" FROM cohort_snapshot WHERE id = $1"#, &[&SNAPSHOT_SINGLETON_ROW_ID], |row| {
-                let snapshot = row
-                    .try_get::<&str, i64>("version")
-                    .map(|v| v as u64)
-                    .map_err(|e| DatabaseError::deserialise_payload(e.to_string(), "Cannot read snapshot version".into()))?;
-                Ok(snapshot)
-            })
-            .await
-            .map_err(|e| e.to_string())?;
-        Ok(snapshot >= safepoint)
-    }
-
-    async fn install_item(&self, new_version: u64) -> Result<OutOfOrderInstallOutcome, String> {
-        let sql = r#"
-            UPDATE bank_accounts ba SET
-                "amount" =
-                    (CASE
-                        WHEN ba."number" = ($1)::TEXT THEN ba."amount" + ($3)::DECIMAL
-                        WHEN ba."number" = ($2)::TEXT THEN ba."amount" - ($3)::DECIMAL
-                    END),
-                "version" = 
($4)::BIGINT - WHERE ba."number" IN (($1)::TEXT, ($2)::TEXT) AND ba."version" < ($4)::BIGINT - "#; - - let params: &[&(dyn ToSql + Sync)] = &[&self.request.from, &self.request.to, &self.request.amount, &(new_version as i64)]; - - let result = self.database.execute(sql, params).await.map_err(|e| e.to_string())?; - - if result == 0 { - Ok(OutOfOrderInstallOutcome::InstalledAlready) - } else { - Ok(OutOfOrderInstallOutcome::Installed) - } - } - - async fn install_using_polling(&self, _xid: String, safepoint: u64, new_version: u64, _attempt_nr: u32) -> Result { - let db = Arc::clone(&self.database); - let wait_handle: JoinHandle> = tokio::spawn(async move { - let mut safe_now = Self::is_safe_to_proceed(Arc::clone(&db), safepoint).await?; - let poll_frequency = Duration::from_secs(1); - let started_at = Instant::now(); - loop { - if safe_now { - return Ok(true); - } - - tokio::time::sleep(poll_frequency).await; - if started_at.elapsed().as_secs() >= 600 { - return Ok(false); - } - - safe_now = Self::is_safe_to_proceed(Arc::clone(&db), safepoint).await?; - } - }); - - let is_safe_now = wait_handle.await.map_err(|e| e.to_string())??; - if is_safe_now { - self.install_item(new_version).await - } else { - Ok(OutOfOrderInstallOutcome::SafepointCondition) - } - } - - async fn install_using_single_query(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u32) -> Result { + async fn install_using_single_query( + &self, + xid: &str, + safepoint: u64, + new_version: u64, + request: &TransferRequest, + ) -> Result { // Params order: // 1 - from, 2 - to, 3 - amount // 4 - new_ver, 5 - safepoint @@ -119,13 +54,7 @@ impl OutOfOrderInstallerImpl { WHERE ba."number" IN (($1)::TEXT, ($2)::TEXT) "#; - let params: &[&(dyn ToSql + Sync)] = &[ - &self.request.from, - &self.request.to, - &self.request.amount, - &(new_version as i64), - &(safepoint as i64), - ]; + let params: &[&(dyn ToSql + Sync)] = &[&request.from, &request.to, &request.amount, &(new_version as i64), &(safepoint as i64)]; let result = self .database @@ -150,10 +79,7 @@ impl OutOfOrderInstallerImpl { if result.is_empty() { // there were no items found to work with - log::warn!( - "No bank accounts where found by these IDs: {:?}", - (self.request.from.clone(), self.request.to.clone()) - ); + log::warn!("No bank accounts where found by these IDs: {:?}", (request.from.clone(), request.to.clone())); let c = Arc::clone(&self.counter_oo_no_data_found); tokio::spawn(async move { c.add(1, &[]); @@ -162,8 +88,8 @@ impl OutOfOrderInstallerImpl { } // Quickly grab the snapshot to check whether safepoint condition is satisfied. Any row can be used for that. - let (_, _, _, snapshot) = &result[0]; - if (*snapshot as u64) < safepoint { + let (_, _, _, snapshot) = result[0]; + if (snapshot as u64) < safepoint { return Ok(OutOfOrderInstallOutcome::SafepointCondition); } @@ -185,19 +111,17 @@ impl OutOfOrderInstallerImpl { let (number, new_ver, version, _snapshot) = &result[0]; if new_ver.is_none() { log::debug!( - "Case 1: No rows were updated for xid '{}' when installing out of order data with new version {} using attempts: {}. Account {} version is now {:?}. Another candidate account was not found", + "Case 1: No rows were updated for xid '{}' when installing out of order data with new version {}. Account {} version is now {:?}. 
Another candidate account was not found",
                    xid,
                    new_version,
-                    attempt_nr,
                    number,
                    version,
                );
            } else {
                log::debug!(
-                    "Case 1: 1 row was updated for xid '{}' when installing out of order data with new version {} using attempts: {}. Account {} version is now {:?}. Another candidate account was not found",
+                    "Case 1: 1 row was updated for xid '{}' when installing out of order data with new version {}. Account {} version is now {:?}. Another candidate account was not found",
                    xid,
                    new_version,
-                    attempt_nr,
                    number,
                    new_ver,
                );
@@ -209,20 +133,18 @@ impl OutOfOrderInstallerImpl {
             let (_, _, version_from, _) = &result[0];
             let (_, _, version_to, _) = &result[1];
             log::debug!(
-                    "Case 2.1: No rows were updated for xid '{}' when installing out of order data with new version {} using attempts: {}. Current versions have moved to {:?}",
+                    "Case 2.1: No rows were updated for xid '{}' when installing out of order data with new version {}. Current versions have moved to {:?}",
                    xid,
                    new_version,
-                    attempt_nr,
                    (version_from, version_to)
                );
            } else {
                // 2.2
                let (number, new_ver, _, _) = &result[0];
                log::debug!(
-                    "Case 2.2: 1 row was updated for xid '{}' when installing out of order data with new version {} using attempts: {}. Account {} version is now {:?}. Another candidate account was not found",
+                    "Case 2.2: 1 row was updated for xid '{}' when installing out of order data with new version {}. Account {} version is now {:?}. Another candidate account was not found",
                    xid,
                    new_version,
-                    attempt_nr,
                    number,
                    new_ver,
                );
@@ -234,10 +156,9 @@ impl OutOfOrderInstallerImpl {
             let (_, _, version, _) = if *number_a == *number { &result[2] } else { &result[1] };
 
             log::debug!(
-                    "Case 3: 1 row was updated for xid '{}' when installing out of order data with new version {} using attempts: {}. Account {} version is now {:?}. Another accout was already set to: {:?}",
+                    "Case 3: 1 row was updated for xid '{}' when installing out of order data with new version {}. Account {} version is now {:?}. Another account was already set to: {:?}",
                    xid,
                    new_version,
-                    attempt_nr,
                    number,
                    new_ver,
                    version,
                );
            }
        }
 
+        Ok(OutOfOrderInstallOutcome::Installed)
+    }
+
+    pub async fn install_statemap(&self, install_item: OutOfOrderInstallRequest) -> Result<OutOfOrderInstallOutcome, String> {
+        // TODO: GK -
+        // For our testing the statemap size is 1, so the approach below is fine. But if we had to install multiple statemaps, we would ideally
+        // use a transaction and install either all or none, depending on errors, and handle the return value accordingly.
+ for statemap in install_item.statemaps.iter() { + let payload = statemap.values().next().unwrap(); + let request: TransferRequest = serde_json::from_value(payload.clone()).unwrap(); + let result = self + .install_using_single_query(&install_item.xid, install_item.safepoint, install_item.version, &request) + .await?; + + if result != OutOfOrderInstallOutcome::Installed { + return Ok(result); + }; + } + Ok(OutOfOrderInstallOutcome::Installed) } } #[async_trait] impl OutOfOrderInstaller for OutOfOrderInstallerImpl { - async fn install(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u32) -> Result { - if self.single_query_strategy { - self.install_using_single_query(xid, safepoint, new_version, attempt_nr).await - } else { - self.install_using_polling(xid, safepoint, new_version, attempt_nr).await - } + async fn install(&self, install_item: OutOfOrderInstallRequest) -> Result { + self.install_statemap(install_item).await } } diff --git a/packages/cohort_banking/src/model/requests.rs b/packages/cohort_banking/src/model/requests.rs index a465ab17..d51adca4 100644 --- a/packages/cohort_banking/src/model/requests.rs +++ b/packages/cohort_banking/src/model/requests.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + // $coverage:ignore-start use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; @@ -6,6 +8,20 @@ use serde_json::Value; use strum::{Display, EnumString}; +#[derive(Clone)] +pub struct CandidateData { + pub readset: Vec, + pub writeset: Vec, + pub statemap: Option>>, + // The "snapshot" is intentionally messing here. We will compute it ourselves before feeding this data to Talos +} + +#[derive(Clone)] +pub struct CertificationRequest { + pub candidate: CandidateData, + pub timeout_ms: u64, +} + #[derive(Display, Debug, Deserialize, EnumString, PartialEq, Eq)] pub enum BusinessActionType { TRANSFER, diff --git a/packages/cohort_sdk/src/cohort.rs b/packages/cohort_sdk/src/cohort.rs index 12817538..2da60400 100644 --- a/packages/cohort_sdk/src/cohort.rs +++ b/packages/cohort_sdk/src/cohort.rs @@ -3,6 +3,7 @@ use std::{ time::{Duration, Instant}, }; +use futures::Future; use opentelemetry_api::{ global, metrics::{Counter, Histogram, Unit}, @@ -26,7 +27,7 @@ use crate::{ delay_controller::DelayController, model::{ self, - callbacks::{CapturedState, ItemStateProvider, OutOfOrderInstallOutcome, OutOfOrderInstaller}, + callback::{CertificationCandidateCallbackResponse, OutOfOrderInstallOutcome, OutOfOrderInstallRequest, OutOfOrderInstaller}, internal::CertificationAttemptOutcome, CertificationResponse, ClientError, Config, ResponseMetadata, }, @@ -144,18 +145,59 @@ impl Cohort { }) } - pub async fn certify( - &self, - request: model::CertificationRequest, - state_provider: &S, - oo_installer: &O, - ) -> Result + fn select_snapshot_and_readvers(cpt_snapshot: u64, cpt_versions: Vec) -> (u64, Vec) { + if cpt_versions.is_empty() { + log::debug!( + "select_snapshot_and_readvers({}, {:?}): {:?}", + cpt_snapshot, + cpt_versions, + (cpt_snapshot, Vec::::new()) + ); + return (cpt_snapshot, vec![]); + } + + let mut cpt_version_min: u64 = u64::MAX; + for v in cpt_versions.iter() { + if cpt_version_min > *v { + cpt_version_min = *v; + } + } + let snapshot_version = std::cmp::max(cpt_snapshot, cpt_version_min); + let mut read_vers = Vec::::new(); + for v in cpt_versions.iter() { + if snapshot_version < *v { + read_vers.push(*v); + } + } + + log::debug!( + "select_snapshot_and_readvers({}, {:?}): {:?}", + cpt_snapshot, + cpt_versions, + (snapshot_version, read_vers.clone()) + ); 
+        (snapshot_version, read_vers)
+    }
+
+    pub async fn shutdown(&self) {
+        self.agent_services.decision_reader.abort();
+        self.agent_services.state_manager.abort();
+    }
+
+    /// Certifies a candidate in Talos and installs the statemaps if the certification is successful.
+    /// Uses two callbacks:
+    /// - The first callback builds the request payload which is sent to Talos for certification.
+    /// - The installer callback is called to do the out-of-order install.
+    pub async fn certify<F, Fut, O>(&self, get_certification_candidate_callback: &F, oo_installer: &O) -> Result<CertificationResponse, ClientError>
     where
-        S: ItemStateProvider,
+        F: Fn() -> Fut,
+        Fut: Future<Output = Result<CertificationCandidateCallbackResponse, String>>,
         O: OutOfOrderInstaller,
     {
         let span_1 = Instant::now();
-        let response = self.send_to_talos(request, state_provider).await?;
+        // 1. Get the snapshot
+        // 2. Send for certification
+        let response = self.send_to_talos(get_certification_candidate_callback).await?;
 
         let span_1_val = span_1.elapsed().as_nanos() as f64 / 1_000_000_f64;
         let h_talos = Arc::clone(&self.talos_histogram);
@@ -163,14 +205,28 @@ impl Cohort {
             h_talos.record(span_1_val * 100.0, &[]);
         });
 
-        if response.decision == Decision::Aborted {
+        if response.safepoint.is_none() || response.statemaps.is_none() {
             return Ok(response);
         }
 
-        // system error if we have Commit decision but no safepoint is given
-        let safepoint = response.safepoint.unwrap();
-        let new_version = response.version;
+        let oooinstall_payload = OutOfOrderInstallRequest {
+            xid: response.xid.clone(),
+            version: response.version,
+            safepoint: response.safepoint.unwrap(),
+            statemaps: response.statemaps.clone().unwrap(),
+        };
+
+        // 3. OOO install
+        self.install_statemaps_oo(oooinstall_payload, oo_installer).await?;
+
+        Ok(response)
+    }
+    /// Installs the statemaps for candidate messages with committed decisions received from Talos.
+    async fn install_statemaps_oo<O>(&self, install_payload: OutOfOrderInstallRequest, oo_installer: &O) -> Result<(), ClientError>
+    where
+        O: OutOfOrderInstaller,
+    {
         let mut controller = DelayController::new(self.config.retry_oo_backoff.min_ms, self.config.retry_oo_backoff.max_ms);
         let mut attempt = 0;
 
         let span_2 = Instant::now();
@@ -178,11 +234,13 @@
         let mut is_not_save = 0_u64;
         let mut giveups = 0_u64;
 
+        let safepoint = install_payload.safepoint;
         let result = loop {
             attempt += 1;
             let span_3 = Instant::now();
-            let install_result = oo_installer.install(response.xid.clone(), safepoint, new_version, attempt).await;
+
+            let install_result = oo_installer.install(install_payload.clone()).await;
 
             let span_3_val = span_3.elapsed().as_nanos() as f64 / 1_000_000_f64;
             let h_install = Arc::clone(&self.oo_install_histogram);
@@ -192,8 +250,6 @@
             });
 
             let error = match install_result {
-                Ok(OutOfOrderInstallOutcome::Installed) => None,
-                Ok(OutOfOrderInstallOutcome::InstalledAlready) => None,
                 Ok(OutOfOrderInstallOutcome::SafepointCondition) => {
                     is_not_save += 1;
                     // We create this error as "safepoint timeout" in advance. Error is erased if further attempt will be successfull or replaced with anotuer error.
@@ -203,6 +259,7 @@ impl Cohort { cause: None, }) } + Ok(_) => None, Err(error) => Some(ClientError { kind: model::ClientErrorKind::OutOfOrderCallbackFailed, reason: error, @@ -219,7 +276,7 @@ impl Cohort { // try again controller.sleep().await; } else { - break Ok(response); + break Ok(()); } }; @@ -250,17 +307,22 @@ impl Cohort { h_attempts.record(attempt as u64, &[]); h_span_2.record(span_2_val * 100.0, &[]); }); + + log::debug!("Total attempts used to install: {attempt}"); result } - async fn send_to_talos(&self, request: model::CertificationRequest, state_provider: &S) -> Result + async fn send_to_talos(&self, get_certification_candidate_callback: &F) -> Result where - S: ItemStateProvider, + F: Fn() -> Fut, + Fut: Future>, { let started_at = Instant::now(); - let mut attempts = 0; + let mut result: Option>; let mut delay_controller = DelayController::new(self.config.retry_backoff.min_ms, self.config.retry_backoff.max_ms); + + let mut attempts = 0; let mut talos_aborts = 0_u64; let mut agent_errors = 0_u64; let mut db_errors = 0_u64; @@ -268,45 +330,44 @@ impl Cohort { let mut recent_conflict: Option = None; let mut recent_abort: Option = None; - let result = loop { - // One of these will be sent to client if we failed - let result: Option>; - - attempts += 1; - let is_success = match self.send_to_talos_attempt(request.clone(), state_provider, recent_conflict).await { - CertificationAttemptOutcome::ClientAborted { reason } => { - result = Some(Err(ClientError { - kind: model::ClientErrorKind::ClientAborted, - reason, - cause: None, - })); - false - } + // The loop exits when either of the below conditions are met. + // 1. When commit decision is received from talos agent/certifier. + // 2. When an clientabort is requested. + // 3. When all retries are exhausted. + let final_result = loop { + // Await for snapshot and build the certification request payload. + // Send the certification payload to talos + match self.send_to_talos_attempt(&get_certification_candidate_callback, recent_conflict).await { CertificationAttemptOutcome::Success { mut response } => { response.metadata.duration_ms = started_at.elapsed().as_millis() as u64; response.metadata.attempts = attempts; - result = Some(Ok(response)); - true + break Ok(response); } CertificationAttemptOutcome::Aborted { mut response } => { talos_aborts += 1; response.metadata.duration_ms = started_at.elapsed().as_millis() as u64; response.metadata.attempts = attempts; + + // TODO: GK - aborts by rule 2 will not have any conflict version. recent_conflict = response.conflict; recent_abort = Some(response.clone()); - // result = recent_abort.map(|a| Ok(a)); result = Some(Ok(response)); - false - } - CertificationAttemptOutcome::SnapshotTimeout { waited, conflict } => { - log::error!("Timeout wating for snapshot: {:?}. Waited: {:.2} sec", conflict, waited.as_secs_f32()); - result = recent_abort.clone().map(Ok); - false } CertificationAttemptOutcome::AgentError { error } => { result = Some(Err(ClientError::from(error))); agent_errors += 1; - false + } + + CertificationAttemptOutcome::Cancelled { reason } => { + break Err(ClientError { + kind: model::ClientErrorKind::Cancelled, + reason, + cause: None, + }); + } + CertificationAttemptOutcome::SnapshotTimeout { waited, conflict } => { + log::error!("Timeout wating for snapshot: {:?}. 
Waited: {:.2} sec", conflict, waited.as_secs_f32()); + result = recent_abort.clone().map(Ok); } CertificationAttemptOutcome::DataError { reason } => { result = Some(Err(ClientError { @@ -315,12 +376,12 @@ impl Cohort { cause: None, })); db_errors += 1; - false } - }; + } let rslt_response = result.unwrap(); - if is_success || self.config.retry_attempts_max <= attempts { + attempts += 1; + if attempts >= self.config.retry_attempts_max { break rslt_response; } @@ -328,18 +389,13 @@ impl Cohort { Ok(response) => { log::debug!( "Unsuccessful transaction: {:?}. Response: {:?} This might retry. Attempts: {}", - request.candidate.statemap, + response.statemaps, response.decision, attempts ); } Err(error) => { - log::debug!( - "Unsuccessful transaction with error: {:?}. {} This might retry. Attempts: {}", - request.candidate.statemap, - error, - attempts - ); + log::debug!("Unsuccessful transaction with error: {:?}. This might retry. Attempts: {}", error, attempts); } } @@ -360,25 +416,20 @@ impl Cohort { }); } - result + final_result } - async fn send_to_talos_attempt( - &self, - request: model::CertificationRequest, - state_provider: &S, - previous_conflict: Option, - ) -> CertificationAttemptOutcome + async fn send_to_talos_attempt(&self, get_certification_candidate_callback: &F, previous_conflict: Option) -> CertificationAttemptOutcome where - S: ItemStateProvider, + F: Fn() -> Fut, + Fut: Future>, { - let timeout = if request.timeout_ms > 0 { - Duration::from_millis(request.timeout_ms) - } else { - Duration::from_millis(self.config.snapshot_wait_timeout_ms as u64) - }; + let timeout = Duration::from_millis(self.config.snapshot_wait_timeout_ms as u64); - let local_state: CapturedState = match self.await_for_snapshot(state_provider, previous_conflict, timeout).await { + let request = match self + .create_candidate_for_certification(get_certification_candidate_callback, previous_conflict, timeout) + .await + { Err(SnapshotPollErrorType::FetchError { reason }) => return CertificationAttemptOutcome::DataError { reason }, Err(SnapshotPollErrorType::Timeout { waited }) => { return CertificationAttemptOutcome::SnapshotTimeout { @@ -386,23 +437,22 @@ impl Cohort { conflict: previous_conflict.unwrap(), } } - Ok(local_state) => local_state, + Ok(CertificationCandidateCallbackResponse::Cancelled(reason)) => { + return CertificationAttemptOutcome::Cancelled { reason }; + } + Ok(CertificationCandidateCallbackResponse::Proceed(request)) => request, }; - if let Some(reason) = local_state.abort_reason { - return CertificationAttemptOutcome::ClientAborted { reason }; - } - - log::debug!("loaded state: {}, {:?}", local_state.snapshot_version, local_state.items); + log::debug!("loaded state: {}, {:?}", request.snapshot, request.candidate); - let (snapshot, readvers) = Self::select_snapshot_and_readvers(local_state.snapshot_version, local_state.items.iter().map(|i| i.version).collect()); + let (snapshot, readvers) = Self::select_snapshot_and_readvers(request.snapshot, request.candidate.readvers); let xid = uuid::Uuid::new_v4().to_string(); let agent_request = CertificationRequest { message_key: xid.clone(), candidate: CandidateData { - xid: xid.clone(), - statemap: request.candidate.statemap, + xid, + statemap: request.candidate.statemaps.clone(), readset: request.candidate.readset, writeset: request.candidate.writeset, readvers, @@ -424,6 +474,7 @@ impl Cohort { version: agent_response.version, metadata: ResponseMetadata { duration_ms: 0, attempts: 0 }, conflict: agent_response.conflict.map(|cm| cm.version), + 
statemaps: request.candidate.statemaps, }; if response.decision == Decision::Aborted { @@ -436,79 +487,39 @@ impl Cohort { } } - async fn await_for_snapshot(&self, state_provider: &S, previous_conflict: Option, timeout: Duration) -> Result + async fn create_candidate_for_certification( + &self, + get_candidate_callback: &F, + previous_conflict: Option, + timeout: Duration, + ) -> Result where - S: ItemStateProvider, + F: Fn() -> Fut, + Fut: Future>, { - match previous_conflict { - None => state_provider.get_state().await.map_err(|reason| SnapshotPollErrorType::FetchError { reason }), - Some(conflict) => { - let mut delay_controller = DelayController::new(self.config.backoff_on_conflict.min_ms, self.config.backoff_on_conflict.max_ms); - let poll_started_at = Instant::now(); - loop { - let result_local_state = state_provider.get_state().await; - match result_local_state { - Err(reason) => return Err(SnapshotPollErrorType::FetchError { reason }), - Ok(current_state) => { - if current_state.abort_reason.is_some() { - break Ok(current_state); - } else if current_state.snapshot_version < conflict { - // not safe yet - let waited = poll_started_at.elapsed(); - if waited >= timeout { - return Err(SnapshotPollErrorType::Timeout { waited }); - } - delay_controller.sleep().await; - continue; - } else { - break Ok(current_state); - } - } + let conflict = previous_conflict.unwrap_or(0); + + let mut delay_controller = DelayController::new(self.config.backoff_on_conflict.min_ms, self.config.backoff_on_conflict.max_ms); + let poll_started_at = Instant::now(); + + loop { + let candidate_callback_result = get_candidate_callback().await; + match candidate_callback_result { + Err(reason) => return Err(SnapshotPollErrorType::FetchError { reason }), + Ok(CertificationCandidateCallbackResponse::Proceed(request)) if request.snapshot < conflict => { + let waited = poll_started_at.elapsed(); + if waited >= timeout { + return Err(SnapshotPollErrorType::Timeout { waited }); } + delay_controller.sleep().await; + continue; } + Ok(request) => return Ok(request), } } } - - fn select_snapshot_and_readvers(cpt_snapshot: u64, cpt_versions: Vec) -> (u64, Vec) { - if cpt_versions.is_empty() { - log::debug!( - "select_snapshot_and_readvers({}, {:?}): {:?}", - cpt_snapshot, - cpt_versions, - (cpt_snapshot, Vec::::new()) - ); - return (cpt_snapshot, vec![]); - } - - let mut cpt_version_min: u64 = u64::MAX; - for v in cpt_versions.iter() { - if cpt_version_min > *v { - cpt_version_min = *v; - } - } - let snapshot_version = std::cmp::max(cpt_snapshot, cpt_version_min); - let mut read_vers = Vec::::new(); - for v in cpt_versions.iter() { - if snapshot_version < *v { - read_vers.push(*v); - } - } - - log::debug!( - "select_snapshot_and_readvers({}, {:?}): {:?}", - cpt_snapshot, - cpt_versions, - (snapshot_version, read_vers.clone()) - ); - (snapshot_version, read_vers) - } - - pub async fn shutdown(&self) { - self.agent_services.decision_reader.abort(); - self.agent_services.state_manager.abort(); - } } +#[derive(Debug)] pub enum SnapshotPollErrorType { Timeout { waited: Duration }, FetchError { reason: String }, diff --git a/packages/cohort_sdk/src/model/callback.rs b/packages/cohort_sdk/src/model/callback.rs new file mode 100644 index 00000000..c3e23644 --- /dev/null +++ b/packages/cohort_sdk/src/model/callback.rs @@ -0,0 +1,51 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use serde_json::Value; + +#[derive(Debug, PartialEq)] +pub enum CertificationCandidateCallbackResponse { + Cancelled(String), + 
Proceed(CertificationRequestPayload), +} + +#[derive(Debug, Clone, PartialEq)] +pub struct CertificationRequestPayload { + pub candidate: CertificationCandidate, + pub snapshot: u64, + pub timeout_ms: u64, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct CertificationCandidate { + pub readset: Vec, + pub writeset: Vec, + pub readvers: Vec, + pub statemaps: Option>>, +} + +#[derive(Debug, Clone)] +pub struct OutOfOrderInstallRequest { + pub xid: String, + pub version: u64, + pub safepoint: u64, + pub statemaps: Vec>, +} + +#[derive(Debug, PartialEq, PartialOrd)] + +pub enum OutOfOrderInstallOutcome { + Installed, + InstalledAlready, + SafepointCondition, +} + +// #[async_trait] +// pub trait CertificationRequestProvider { +// async fn get_candidate_to_certify(&self) -> Result; +// } + +#[async_trait] +pub trait OutOfOrderInstaller { + async fn install(&self, install_item: OutOfOrderInstallRequest) -> Result; +} diff --git a/packages/cohort_sdk/src/model/callbacks.rs b/packages/cohort_sdk/src/model/callbacks.rs deleted file mode 100644 index 9a3e7c2f..00000000 --- a/packages/cohort_sdk/src/model/callbacks.rs +++ /dev/null @@ -1,29 +0,0 @@ -use async_trait::async_trait; - -pub struct CapturedState { - pub abort_reason: Option, - pub snapshot_version: u64, - pub items: Vec, -} - -#[derive(Debug)] -pub struct CapturedItemState { - pub id: String, - pub version: u64, -} - -#[async_trait] -pub trait ItemStateProvider { - async fn get_state(&self) -> Result; -} - -#[async_trait] -pub trait OutOfOrderInstaller { - async fn install(&self, xid: String, safepoint: u64, new_version: u64, attempt_nr: u32) -> Result; -} - -pub enum OutOfOrderInstallOutcome { - Installed, - InstalledAlready, - SafepointCondition, -} diff --git a/packages/cohort_sdk/src/model/internal.rs b/packages/cohort_sdk/src/model/internal.rs index 133b8d8b..0f0ec394 100644 --- a/packages/cohort_sdk/src/model/internal.rs +++ b/packages/cohort_sdk/src/model/internal.rs @@ -5,7 +5,7 @@ use talos_agent::agent::errors::AgentError; use super::CertificationResponse; pub(crate) enum CertificationAttemptOutcome { - ClientAborted { reason: String }, + Cancelled { reason: String }, Success { response: CertificationResponse }, Aborted { response: CertificationResponse }, AgentError { error: AgentError }, diff --git a/packages/cohort_sdk/src/model/mod.rs b/packages/cohort_sdk/src/model/mod.rs index adb9c3c0..2a4b4178 100644 --- a/packages/cohort_sdk/src/model/mod.rs +++ b/packages/cohort_sdk/src/model/mod.rs @@ -1,4 +1,4 @@ -pub mod callbacks; +pub mod callback; pub mod internal; use std::{collections::HashMap, fmt::Display}; @@ -12,20 +12,6 @@ use talos_agent::{ use talos_rdkafka_utils::kafka_config::KafkaConfig; use tokio::task::JoinHandle; -#[derive(Clone)] -pub struct CandidateData { - pub readset: Vec, - pub writeset: Vec, - pub statemap: Option>>, - // The "snapshot" is intentionally messing here. 
We will compute it ourselves before feeding this data to Talos -} - -#[derive(Clone)] -pub struct CertificationRequest { - pub candidate: CandidateData, - pub timeout_ms: u64, -} - #[derive(Clone)] pub struct CertificationResponse { pub xid: String, @@ -34,6 +20,7 @@ pub struct CertificationResponse { pub safepoint: Option, pub conflict: Option, pub metadata: ResponseMetadata, + pub statemaps: Option>>, } #[derive(Clone)] @@ -42,12 +29,12 @@ pub struct ResponseMetadata { pub duration_ms: u64, } -#[derive(strum::Display)] +#[derive(strum::Display, Debug)] // this is napi friendly copy of talos_agent::agent::errors::AgentErrorKind pub enum ClientErrorKind { Certification, CertificationTimeout, - ClientAborted, + Cancelled, Messaging, Persistence, Internal, @@ -55,6 +42,7 @@ pub enum ClientErrorKind { OutOfOrderSnapshotTimeout, } +#[derive(Debug)] pub struct ClientError { pub kind: ClientErrorKind, pub reason: String, diff --git a/packages/metrics/src/opentel/printer.rs b/packages/metrics/src/opentel/printer.rs index 86104c15..5341d4f5 100644 --- a/packages/metrics/src/opentel/printer.rs +++ b/packages/metrics/src/opentel/printer.rs @@ -51,7 +51,7 @@ impl MetricsToStringPrinter { let mut out: String = "".to_owned(); let serde_value = serde_json::to_value(metrics).map_err(|e| e.to_string())?; let container = serde_json::from_value::(serde_value).map_err(|e| e.to_string())?; - let percentile_labels = vec![25.0, 50.0, 75.0, 90.0, 95.0, 98.0, 99.0, 99.9, 99.99, 100.0]; + let percentile_labels = [25.0, 50.0, 75.0, 90.0, 95.0, 98.0, 99.0, 99.9, 99.99, 100.0]; if let Some(ref filter) = self.resource_filter { let mut filter_passed = filter.is_empty(); diff --git a/packages/talos_cohort_replicator/src/models/statemap_installer_queue.rs b/packages/talos_cohort_replicator/src/models/statemap_installer_queue.rs index 0277c8fa..525f6e53 100644 --- a/packages/talos_cohort_replicator/src/models/statemap_installer_queue.rs +++ b/packages/talos_cohort_replicator/src/models/statemap_installer_queue.rs @@ -44,7 +44,9 @@ impl StatemapInstallerQueue { } pub fn remove_installed(&mut self) -> Option { - let Some(index) = self.queue.get_index_of(&self.snapshot_version) else { return None;}; + let Some(index) = self.queue.get_index_of(&self.snapshot_version) else { + return None; + }; let items = self.queue.drain(..index); diff --git a/packages/talos_suffix/src/suffix.rs b/packages/talos_suffix/src/suffix.rs index 77c31443..e0ae433a 100644 --- a/packages/talos_suffix/src/suffix.rs +++ b/packages/talos_suffix/src/suffix.rs @@ -144,9 +144,7 @@ where pub fn get_safe_prune_index(&mut self) -> Option { // If `prune_start_threshold=None` don't prune. let Some(prune_threshold) = self.meta.prune_start_threshold else { - debug!( - "[SUFFIX PRUNE CHECK] As suffix.meta.prune_start_threshold is None, pruning is disabled." - ); + debug!("[SUFFIX PRUNE CHECK] As suffix.meta.prune_start_threshold is None, pruning is disabled."); return None; }; @@ -194,13 +192,14 @@ where return Ok(()); } - let Some(sfx_item) = self - .get(version)? - else { - info!("Returned due item not found in suffix for version={version} with index={:?} and decision version={decision_ver}", self.index_from_head(version)); - // info!("All some items on suffix.... {:?}", self.retrieve_all_some_vec_items()); - return Ok(()); - }; + let Some(sfx_item) = self.get(version)? 
else {
+            info!(
+                "Returned due item not found in suffix for version={version} with index={:?} and decision version={decision_ver}",
+                self.index_from_head(version)
+            );
+            // info!("All some items on suffix.... {:?}", self.retrieve_all_some_vec_items());
+            return Ok(());
+        };
 
         let new_sfx_item = SuffixItem {
             decision_ver: Some(decision_ver),
diff --git a/packages/talos_suffix/src/tests.rs b/packages/talos_suffix/src/tests.rs
index d031f550..8734bb04 100644
--- a/packages/talos_suffix/src/tests.rs
+++ b/packages/talos_suffix/src/tests.rs
@@ -30,7 +30,7 @@ mod suffix_tests {
         let full_vec = r.collect::<Vec<u64>>();
 
         let Some(ignore_vec) = ignore_vers else {
-                return full_vec;
+            return full_vec;
         };
 
         let vec_remaining = full_vec.iter().filter_map(|&v| (!ignore_vec.contains(&v)).then_some(v));
@@ -359,7 +359,7 @@ mod suffix_tests {
 
     #[test]
     fn test_get_non_empty_items_when_no_empty_items() {
-        let list_to_scan = vec![Some(10_u8); 100];
+        let list_to_scan = [Some(10_u8); 100];
 
         let result = get_nonempty_suffix_items(list_to_scan.iter());
         assert_eq!(result.count(), 100);
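
Usage sketch for the reworked two-callback `certify` API introduced above. The types and the trait signature are taken from `cohort_sdk::model::callback` and `cohort_sdk::cohort` as they appear in this patch; the account ids, the zero snapshot, and the no-op installer are placeholder assumptions, not part of the change.

use async_trait::async_trait;
use cohort_sdk::cohort::Cohort;
use cohort_sdk::model::callback::{
    CertificationCandidate, CertificationCandidateCallbackResponse, CertificationRequestPayload,
    OutOfOrderInstallOutcome, OutOfOrderInstallRequest, OutOfOrderInstaller,
};
use cohort_sdk::model::ClientError;

struct NoopInstaller;

#[async_trait]
impl OutOfOrderInstaller for NoopInstaller {
    async fn install(&self, _install_item: OutOfOrderInstallRequest) -> Result<OutOfOrderInstallOutcome, String> {
        // A real installer would apply `_install_item.statemaps`, guarded by `_install_item.safepoint`.
        Ok(OutOfOrderInstallOutcome::Installed)
    }
}

async fn certify_once(cohort: &Cohort) -> Result<(), ClientError> {
    // The candidate callback is a `Fn() -> Future`; the SDK calls it on every retry so the
    // caller can re-read its current state and return either `Proceed` or `Cancelled`.
    let get_candidate = || async {
        Ok::<_, String>(CertificationCandidateCallbackResponse::Proceed(CertificationRequestPayload {
            candidate: CertificationCandidate {
                readset: vec!["account-1".into(), "account-2".into()],
                writeset: vec!["account-1".into(), "account-2".into()],
                readvers: vec![],
                statemaps: None,
            },
            snapshot: 0, // would normally be read from the local snapshot table
            timeout_ms: 0,
        }))
    };

    let installer = NoopInstaller;
    let _response = cohort.certify(&get_candidate, &installer).await?;
    Ok(())
}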
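
The provider in certification_candidate_provider.rs deliberately never returns `Cancelled`. A minimal sketch of how a caller could add that path, assuming the `cohort_banking` modules shown in this patch are publicly reachable under these paths; the cancellation rule itself is a made-up example.

use cohort_banking::callbacks::certification_candidate_provider::CertificationCandidateProviderImpl;
use cohort_banking::model::requests::CertificationRequest;
use cohort_sdk::model::callback::CertificationCandidateCallbackResponse;

pub async fn candidate_or_cancel(
    provider: &CertificationCandidateProviderImpl,
    request: CertificationRequest,
) -> Result<CertificationCandidateCallbackResponse, String> {
    // Example rule: this provider only understands two-account transfers, so anything else
    // is cancelled up-front instead of being sent to Talos for certification.
    if request.candidate.writeset.len() != 2 {
        return Ok(CertificationCandidateCallbackResponse::Cancelled(format!(
            "expected 2 accounts in the writeset, got {}",
            request.candidate.writeset.len()
        )));
    }
    provider.get_certification_candidate(request).await
}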
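
The TODO in `install_statemap` mentions installing multiple statemaps atomically. A sketch of that all-or-nothing alternative, written directly against tokio_postgres rather than the repo's `Database` wrapper (whose full API is not shown here); the SQL and the JSON field access are illustrative only.

use serde_json::Value;
use std::collections::HashMap;
use tokio_postgres::Client;

pub async fn install_statemaps_atomically(
    client: &mut Client,
    statemaps: &[HashMap<String, Value>],
    new_version: i64,
) -> Result<(), String> {
    let tx = client.transaction().await.map_err(|e| e.to_string())?;
    for statemap in statemaps {
        for (_action, payload) in statemap {
            // Illustrative update; a real installer would reuse the UPDATE from
            // `install_using_single_query`, including the safepoint check.
            tx.execute(
                r#"UPDATE bank_accounts SET "version" = $1 WHERE "number" = $2 AND "version" < $1"#,
                &[&new_version, &payload["from"].as_str()],
            )
            .await
            .map_err(|e| e.to_string())?;
        }
    }
    // Dropping `tx` without committing rolls back, so an early `?` return above undoes all updates.
    tx.commit().await.map_err(|e| e.to_string())
}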
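
The `select_snapshot_and_readvers` helper moved to the top of cohort.rs decides which snapshot and read versions accompany the candidate. A compact restatement with a worked example; this is an illustration of the logic, not the SDK's private function.

fn select_snapshot_and_readvers(cpt_snapshot: u64, cpt_versions: Vec<u64>) -> (u64, Vec<u64>) {
    if cpt_versions.is_empty() {
        return (cpt_snapshot, vec![]);
    }
    // Effective snapshot is the max of the local snapshot and the smallest captured version;
    // only captured versions strictly above that snapshot survive as readvers.
    let min_version = *cpt_versions.iter().min().unwrap();
    let snapshot_version = std::cmp::max(cpt_snapshot, min_version);
    let read_vers = cpt_versions.into_iter().filter(|v| *v > snapshot_version).collect();
    (snapshot_version, read_vers)
}

fn main() {
    // Local snapshot 5, captured account versions [3, 7, 9]:
    // min is 3, effective snapshot is max(5, 3) = 5, and 7 and 9 become readvers.
    assert_eq!(select_snapshot_and_readvers(5, vec![3, 7, 9]), (5, vec![7, 9]));
    // If every captured version is at or below the snapshot, readvers is empty.
    assert_eq!(select_snapshot_and_readvers(10, vec![3, 7, 9]), (10, Vec::<u64>::new()));
}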