From f056f1f1feb3df03ccfc5e6117f0f9a45d9d244d Mon Sep 17 00:00:00 2001 From: Amninder Kaur Date: Fri, 13 Oct 2023 10:08:06 +1100 Subject: [PATCH 1/4] fix: handle retry and graceful close for all threads if one is stopped --- .env.example | 27 +++- .github/workflows/release.yml | 18 +-- Cargo.lock | 4 +- How-to.md | 24 +++ chronos_bin/Cargo.toml | 2 +- chronos_bin/src/bin/chronos.rs | 20 ++- chronos_bin/src/kafka/mod.rs | 1 - chronos_bin/src/kafka/producer.rs | 2 +- chronos_bin/src/message_processor.rs | 2 +- chronos_bin/src/message_receiver.rs | 1 - chronos_bin/src/postgres/config.rs | 2 +- chronos_bin/src/postgres/pg.rs | 230 ++++++++++++++++----------- chronos_bin/src/runner.rs | 5 +- chronos_bin/src/utils/config.rs | 20 +-- chronos_bin/src/utils/util.rs | 4 +- pg_mig/Cargo.toml | 2 +- 16 files changed, 230 insertions(+), 134 deletions(-) diff --git a/.env.example b/.env.example index e74c43a..799e49c 100644 --- a/.env.example +++ b/.env.example @@ -14,6 +14,17 @@ KAFKA_OUT_TOPIC="chronos.out" KAFKA_USERNAME= KAFKA_PASSWORD= +# KAFKA +# KAFKA_BROKERS=kb001.ksp-sbx.syd1.kc.thinkbig.local:9092,kb002.ksp-sbx.syd1.kc.thinkbig.local:9092,kb003.ksp-sbx.syd1.kc.thinkbig.local:9092,kb004.ksp-sbx.syd1.kc.thinkbig.local:9092,kb005.ksp-sbx.syd1.kc.thinkbig.local:9092 +# KAFKA_CLIENT_ID="chronos-dev" +# KAFKA_GROUP_ID="chronos-dev" +# KAFKA_INPUT_TOPIC="sbx.ksp.bet.private.delay.in" +# KAFKA_OUTPUT_TOPIC="sbx.ksp.bet.private.delay.out" +# KAFKA_USERNAME="chimera-chronos" +# KAFKA_PASSWORD="Lbosg675kzTGkyXUw97r0Mt3gGiAfpa4" +# KAFKA_SASL_MECH="SCRAM-SHA-512" +# KAFKA_SEC_PROTOCOL="SASL_PLAINTEXT" + # POSTGRES # NB: `node-postgres` AND `node-pg-migrate` USE THE SAME ENVIRONMENT VARIABLES AS `libpq` TO CONNECT TO A POSTGRESQL SERVER # NODE_PG_FORCE_NATIVE=1 @@ -24,6 +35,17 @@ PG_PASSWORD=admin PG_DATABASE=chronos_db PG_POOL_SIZE=50 +# POSTGRES +# NB: `node-postgres` AND `node-pg-migrate` USE THE SAME ENVIRONMENT VARIABLES AS `libpq` TO CONNECT TO A POSTGRESQL SERVER +# NODE_PG_FORCE_NATIVE=1 +# PG_HOST=pgsql-ksp-sbx.unibet.com.au +# PG_PORT=5432 +# PG_USER=chimera-chronos +# PG_PASSWORD=Npr0QfoU4TJNb3BH7fe21vfwhPTVwB4Q +# PG_DATABASE=chimera_chronos +# PG_POOL_SIZE=50 + + # CONFIG RUST_LOG=info @@ -35,7 +57,10 @@ TIMING_ADVANCE=0 FAIL_DETECT_INTERVAL=500 MAX_RETRIES=3 PROCESSOR_DB_POLL=10 +<<<<<<< HEAD # TRACING OTEL_SERVICE_NAME=chronos -OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces \ No newline at end of file +OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces +======= +>>>>>>> 6107c18 (fix: handle retry and graceful close for all threads if one is stopped) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 350ce26..c26d2a7 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -6,15 +6,15 @@ name: release app binary on tag on: release: types: [created] - # push: - # tags: - # # only build on tags that start with 'v' - # # having major, minor and path version numbers - # # along with alpha beta support - # # e.g. v1.0.0-alpha.1, v1.0.0-beta.1, v1.0.0 - # - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)+.[0-9]?' - # - 'v[0-9]+.[0-9]+.[0-9]' - # - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)' + push: + tags: + # only build on tags that start with 'v' + # having major, minor and path version numbers + # along with alpha beta support + # e.g. v1.0.0-alpha.1, v1.0.0-beta.1, v1.0.0 + - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)+.[0-9]?' + - 'v[0-9]+.[0-9]+.[0-9]' + - 'v[0-9]+.[0-9]+.[0-9]+(-[a-zA-Z0-9]+)' jobs: build: diff --git a/Cargo.lock b/Cargo.lock index b1ee408..7df7f80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -294,7 +294,7 @@ dependencies = [ [[package]] name = "chronos_bin" -version = "0.1.0" +version = "0.2.1" dependencies = [ "anyhow", "async-trait", @@ -1437,7 +1437,7 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pg_mig" -version = "0.1.0" +version = "0.2.1" dependencies = [ "async-trait", "cargo-husky", diff --git a/How-to.md b/How-to.md index 9cdce64..3004e21 100644 --- a/How-to.md +++ b/How-to.md @@ -23,6 +23,7 @@ Use `make withenv RECIPE=docker.up` ## ENV vars All the required configurations for Chronos can be passed in environment variables mentioned below +<<<<<<< HEAD ### Required Vars |Env Var|Example Value| @@ -50,6 +51,29 @@ These values are set to fine tune performance Chrono in need, refer to [Chronos] | PROCESSOR_DB_POLL|5 milli sec | TIMING_ADVANCE|0 sec | FAIL_DETECT_INTERVAL|10 sec +======= +|Env Var|Example Value| Required| +|----|----|----| +|KAFKA_BROKERS|"localhost:9093"|True +| KAFKA_CLIENT_ID|"chronos"|True +| KAFKA_GROUP_ID|"chronos"|True +| KAFKA_IN_TOPIC|"chronos.in"|True +| KAFKA_OUT_TOPIC|"chronos.out"|True +| KAFKA_USERNAME||True +| KAFKA_PASSWORD||True +| PG_HOST|localhost|True +| PG_PORT|5432|True +| PG_USER|admin|True +| PG_PASSWORD|admin|True +| PG_DATABASE|chronos_db|True +| PG_POOL_SIZE|50|True +|NODE_ID|UUID|False +| DELAY_TIME|0|False +| RANDOMNESS_DELAY|100|False +| MONITOR_DB_POLL|5|False +| TIMING_ADVANCE|0|False +| FAIL_DETECT_INTERVAL|500|False +>>>>>>> 6107c18 (fix: handle retry and graceful close for all threads if one is stopped) ## Observability diff --git a/chronos_bin/Cargo.toml b/chronos_bin/Cargo.toml index a3c2dfc..cee9e01 100644 --- a/chronos_bin/Cargo.toml +++ b/chronos_bin/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "chronos_bin" -version = "0.1.0" +version = "0.2.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/chronos_bin/src/bin/chronos.rs b/chronos_bin/src/bin/chronos.rs index 5467490..1e6cb15 100644 --- a/chronos_bin/src/bin/chronos.rs +++ b/chronos_bin/src/bin/chronos.rs @@ -5,8 +5,9 @@ use chronos_bin::postgres::config::PgConfig; use chronos_bin::postgres::pg::Pg; use chronos_bin::runner::Runner; use chronos_bin::telemetry::register_telemetry::{TelemetryCollector, TelemetryCollectorType}; -use log::debug; +use log::{debug, info}; use std::sync::Arc; +use std::time::Duration; #[tokio::main] async fn main() { @@ -23,8 +24,23 @@ async fn main() { let kafka_consumer = KafkaConsumer::new(&kafka_config); let kafka_producer = KafkaProducer::new(&kafka_config); - let data_store = Pg::new(pg_config).await.unwrap(); + let data_store = match Pg::new(pg_config).await { + Ok(pg) => pg, + Err(e) => loop { + log::error!("couldnt connect to PG DB due to error::{} will retry ", e); + tokio::time::sleep(Duration::from_secs(10)).await; + let pg_config = PgConfig::from_env(); + match Pg::new(pg_config).await { + Ok(pg) => pg, + Err(e) => { + log::error!("error while creating PG intance {}", e); + continue; + } + }; + }, + }; + info!("starting chronos establish connections"); let r = Runner { data_store: Arc::new(data_store), producer: Arc::new(kafka_producer), diff --git a/chronos_bin/src/kafka/mod.rs b/chronos_bin/src/kafka/mod.rs index 2f7e54f..106b01f 100644 --- a/chronos_bin/src/kafka/mod.rs +++ b/chronos_bin/src/kafka/mod.rs @@ -1,5 +1,4 @@ pub mod config; pub mod consumer; pub mod errors; -pub mod kafka_deploy; pub mod producer; diff --git a/chronos_bin/src/kafka/producer.rs b/chronos_bin/src/kafka/producer.rs index 7df7df9..3b94743 100644 --- a/chronos_bin/src/kafka/producer.rs +++ b/chronos_bin/src/kafka/producer.rs @@ -18,7 +18,7 @@ pub struct KafkaProducer { impl KafkaProducer { pub fn new(config: &KafkaConfig) -> Self { - // Kafka Producer + // rdlibkafka goes infinitely trying to connect to kafka broker let producer = config.build_producer_config().create().expect("Failed to create producer"); let topic = config.out_topic.to_owned(); diff --git a/chronos_bin/src/message_processor.rs b/chronos_bin/src/message_processor.rs index 813db76..ea30437 100644 --- a/chronos_bin/src/message_processor.rs +++ b/chronos_bin/src/message_processor.rs @@ -28,7 +28,7 @@ impl MessageProcessor { let params = GetReady { readied_at: deadline, - readied_by: Uuid::parse_str(&node_id).unwrap(), + readied_by: node_id, deadline, // limit: 1000, // order: "asc", diff --git a/chronos_bin/src/message_receiver.rs b/chronos_bin/src/message_receiver.rs index bc6bc32..9d8caa1 100644 --- a/chronos_bin/src/message_receiver.rs +++ b/chronos_bin/src/message_receiver.rs @@ -11,7 +11,6 @@ use rdkafka::message::{BorrowedMessage, Message}; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; -use std::time::Instant; pub struct MessageReceiver { pub(crate) consumer: Arc, diff --git a/chronos_bin/src/postgres/config.rs b/chronos_bin/src/postgres/config.rs index a896fa5..e5fae15 100644 --- a/chronos_bin/src/postgres/config.rs +++ b/chronos_bin/src/postgres/config.rs @@ -18,7 +18,7 @@ impl PgConfig { host: env_var!("PG_HOST"), port: env_var!("PG_PORT"), database: env_var!("PG_DATABASE"), - pool_size: env_var!("PG_POOL_SIZE").parse().unwrap(), + pool_size: env_var!("PG_POOL_SIZE").parse().unwrap_or(10), } } pub fn get_base_connection_string(&self) -> String { diff --git a/chronos_bin/src/postgres/pg.rs b/chronos_bin/src/postgres/pg.rs index 69bdf1b..aa50e50 100644 --- a/chronos_bin/src/postgres/pg.rs +++ b/chronos_bin/src/postgres/pg.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; use deadpool_postgres::{Config, GenericClient, ManagerConfig, Object, Pool, PoolConfig, Runtime, Transaction}; -use log::error; +use log::{error, info}; use std::time::{Duration, Instant}; use tokio_postgres::error::SqlState; use tokio_postgres::types::ToSql; @@ -63,26 +63,32 @@ struct PgAccess { } impl PgAccess { - pub async fn get_txn(&mut self) -> PgTxn { + pub async fn build_txn(&mut self) -> Result { let txn = self .client .build_transaction() .isolation_level(tokio_postgres::IsolationLevel::RepeatableRead) - .start() - .await - .unwrap(); - PgTxn { txn } + .start(); + + match txn.await { + Ok(txn) => Ok(PgTxn { txn }), + Err(e) => { + error!("Unable to start transaction: {}", e); + Err(PgError::UnknownException(e)) + } + } } } impl Pg { pub async fn new(pg_config: PgConfig) -> Result { + let port = pg_config.port.parse::().unwrap_or(0); // make the connection fail and send pack PgError let mut config = Config::new(); config.dbname = Some(pg_config.database); config.user = Some(pg_config.user); config.password = Some(pg_config.password); config.host = Some(pg_config.host); - config.port = Some(pg_config.port.parse::().expect("Failed to parse port to u16")); + config.port = Some(port); config.manager = Some(ManagerConfig { recycling_method: deadpool_postgres::RecyclingMethod::Fast, }); @@ -123,12 +129,12 @@ impl Pg { } }; - let rs = client.query_one("show transaction_isolation", &[]).await.unwrap(); + let rs = client.query_one("show transaction_isolation", &[]).await?; let value: String = rs.get(0); log::debug!("init: db-isolation-level: {}", value); } - println!("pool.status: {:?}", pool.status()); + log::info!("pool.status: {:?}", pool.status()); Ok(Pg { pool }) } @@ -149,13 +155,12 @@ impl Pg { pub(crate) async fn insert_to_delay_db(&self, params: &TableInsertRow<'_>) -> Result { let pg_client = self.get_client().await?; let mut pg_access = PgAccess { client: pg_client }; - let pg_txn: PgTxn = pg_access.get_txn().await; - + let pg_txn: PgTxn = pg_access.build_txn().await?; let insert_query = "INSERT INTO hanger (id, deadline, message_headers, message_key, message_value) VALUES ($1, $2 ,$3, $4, $5 )"; let query_execute_instant = Instant::now(); - let stmt = pg_txn.txn.prepare(insert_query).await.unwrap(); + let stmt = pg_txn.txn.prepare(insert_query).await?; let outcome = pg_txn .txn .execute( @@ -168,28 +173,41 @@ impl Pg { ¶ms.message_value, ], ) - .await; + .await?; let time_elapsed = query_execute_instant.elapsed(); if time_elapsed > Duration::from_millis(100) { - println!("insert_to_delay query_execute_instant: {:?} ", time_elapsed); + log::warn!("insert_to_delay query_execute_instant: {:?} ", time_elapsed); } if outcome.is_ok() { event!(tracing::Level::INFO, "insert_to_delay success"); let cmt_rdy = pg_txn.txn.commit().await; if let Err(e) = cmt_rdy { - error!("Unable to commit: {}. The original transaction updated: {} rows", e, outcome.unwrap()); + error!("Unable to commit: {}. The original transaction updated rows", e); return Err(PgError::UnknownException(e)); } } - Ok(outcome.unwrap()) + Ok(outcome) } #[tracing::instrument(skip_all)] - pub(crate) async fn delete_fired_db(&self, ids: &Vec) -> Result { - let pg_client = self.get_client().await.expect("Failed to get client from pool"); + pub(crate) async fn delete_fired(&self, ids: &Vec) -> Result { + // let query_execute_instant = Instant::now(); + let pg_client = match self.get_client().await { + Ok(client) => client, + Err(e) => { + error!("delete_fired: Unable to get client: {}", e); + return Err(format!("delete_fired: Unable to get client: {}", e)); + } + }; let mut pg_access = PgAccess { client: pg_client }; - let pg_txn: PgTxn = pg_access.get_txn().await; + let pg_txn: PgTxn = match pg_access.build_txn().await { + Ok(txn) => txn, + Err(e) => { + error!("delete_fired: Unable to start transaction: {}", e); + return Err(format!("delete_fired: Unable to start transaction: {}", e)); + } + }; let values_as_slice: Vec<_> = ids.iter().map(|x| x as &(dyn ToSql + Sync)).collect(); @@ -197,29 +215,35 @@ impl Pg { for i in 0..ids.len() { query = query + "$" + (i + 1).to_string().as_str() + ","; } - query = query.strip_suffix(',').unwrap().to_string(); + query = match query.strip_suffix(',') { + Some(query) => query.to_string(), + None => { + error!("reset_to_init: Unable to strip suffix"); + return Err("reset_to_init: Unable to strip suffix".to_string()); + } + }; query += ")"; - let stmt = pg_txn.txn.prepare(query.as_str()).await.unwrap(); + let stmt = match pg_txn.txn.prepare(query.as_str()).await { + Ok(stmt) => stmt, + Err(e) => { + error!("delete_fired: Unable to prepare query: {}", e); + return Err(format!("delete_fired: Unable to prepare query: {}", e)); + } + }; let response = pg_txn.txn.execute(&stmt, &values_as_slice).await; match response { Ok(resp) => { let cmt_rdy = pg_txn.txn.commit().await; if let Err(e) = cmt_rdy { - error!( - "delete_fired: Unable to commit: {}. The original transaction updated: {} rows", - e, - response.unwrap() - ); + error!("delete_fired: Unable to commit: {}. The original transaction updated: rows", e,); return Err(format!("Unable to commit: {}. The original transaction updated rows", e)); } Ok(resp) } Err(e) => { - let err_code = e.code(); - if err_code.is_some() { - let db_err = err_code.unwrap(); - if db_err == &SqlState::T_R_SERIALIZATION_FAILURE { + if let Some(err_code) = e.code() { + if err_code == &SqlState::T_R_SERIALIZATION_FAILURE { error!("delete_fired: Unable to execute txn due to : {}", e); return Err(format!("delete_fired: Unable to execute txn due to : {}", e)); } @@ -232,59 +256,58 @@ impl Pg { #[tracing::instrument(skip_all)] pub(crate) async fn ready_to_fire_db(&self, param: &GetReady) -> Result, String> { //TODO handle get client error gracefully - let pg_client = self.get_client().await.expect("Unable to get client"); + let pg_client = match self.get_client().await { + Ok(client) => client, + Err(e) => { + error!("ready_to_fire: Unable to get client: {}", e); + return Err(format!("ready_to_fire: Unable to get client: {}", e)); + } + }; let mut pg_access = PgAccess { client: pg_client }; - let pg_txn: PgTxn = pg_access.get_txn().await; + let pg_txn: PgTxn = match pg_access.build_txn().await { + Ok(txn) => txn, + Err(e) => { + error!("delete_fired: Unable to start transaction: {}", e); + return Err(format!("delete_fired: Unable to start transaction: {}", e)); + } + }; let ready_query = "UPDATE hanger SET readied_at = $1, readied_by = $2 where deadline < $3 AND readied_at IS NULL RETURNING id, deadline, readied_at, readied_by, message_headers, message_key, message_value"; - // let stmt = pg_client.prepare(ready_query).await.expect("Unable to prepare query"); - // let query_execute_instant = Instant::now(); - // let response = pg_client - // .query(&stmt, &[¶m.readied_at, ¶m.readied_by, ¶m.deadline]) - // .await - // .expect("update failed"); - // let time_elapsed = query_execute_instant.elapsed(); - // if time_elapsed > Duration::from_millis(100) { - // println!(" ready_to_fire query_execute_instant: {:?} params: {:?}", time_elapsed, param); - // } - // println!("redying success {:?}", &response); - // Ok(response) - // println!("ready_to_fire query {}", ¶m.deadline); - - // ready_to_fire_query_span.record("query", ready_query); - - let stmt = pg_txn.txn.prepare(ready_query).await.expect("Unable to prepare query"); - let query_execute_instant = Instant::now(); - let response = pg_txn.txn.query(&stmt, &[¶m.readied_at, ¶m.readied_by, ¶m.deadline]).await; - match response { - Ok(resp) => { - let cmt_rdy = pg_txn.txn.commit().await; - if let Err(e) = cmt_rdy { - error!("Unable to commit: {}. The original transaction updated: {:?} rows", e, resp); - return Err(format!( - "ready_to_fire: Unable to commit: {}. The original transaction updated: {:?} rows", - e, resp - )); - } - let time_elapsed = query_execute_instant.elapsed(); - if time_elapsed > Duration::from_millis(100) { - error!(" ready_to_fire query_execute_instant: {:?} params: {:?}", time_elapsed, param); + if let Ok(stmt) = pg_txn.txn.prepare(ready_query).await { + let query_execute_instant = Instant::now(); + let response = pg_txn.txn.query(&stmt, &[¶m.readied_at, ¶m.readied_by, ¶m.deadline]).await; + + match response { + Ok(resp) => { + let cmt_rdy = pg_txn.txn.commit().await; + if let Err(e) = cmt_rdy { + error!("Unable to commit: {}. The original transaction updated: {:?} rows", e, resp); + return Err(format!( + "ready_to_fire: Unable to commit: {}. The original transaction updated: {:?} rows", + e, resp + )); + } + let time_elapsed = query_execute_instant.elapsed(); + if time_elapsed > Duration::from_millis(100) { + log::warn!(" ready_to_fire query_execute_instant: {:?} params: {:?}", time_elapsed, param); + } + Ok(resp) } - Ok(resp) - } - Err(e) => { - let err_code = e.code(); - if err_code.is_some() { - let db_err = err_code.unwrap(); - if db_err == &SqlState::T_R_SERIALIZATION_FAILURE { - error!("ready_to_fire: Unable to execute txn due to : {}", e); - return Err(format!("ready_to_fire: Unable to execute txn due to : {}", e)); + Err(e) => { + if let Some(err_code) = e.code() { + if err_code == &SqlState::T_R_SERIALIZATION_FAILURE { + error!("ready_to_fire: Unable to execute txn due to : {}", e); + return Err(format!("ready_to_fire: Unable to execute txn due to : {}", e)); + } } + error!("ready_to_fire: Unknow exception {:?}", e); + Err(format!("ready_to_fire: Unknow exception {:?}", e)) } - error!("ready_to_fire: Unknow exception {:?}", e); - Err(format!("ready_to_fire: Unknow exception {:?}", e)) } + } else { + error!("ready_to_fire: Unable to prepare query"); + Err("ready_to_fire: Unable to prepare query".to_string()) } } @@ -292,15 +315,19 @@ impl Pg { pub(crate) async fn failed_to_fire_db(&self, delay_time: &DateTime) -> Result, PgError> { let query_execute_instant = Instant::now(); let pg_client = self.get_client().await?; + let mut pg_access = PgAccess { client: pg_client }; + let pg_txn: PgTxn = pg_access.build_txn().await?; + log::info!("failed_to_fire delay_time: {:?}", delay_time); let get_query = "SELECT * from hanger where readied_at > $1 ORDER BY deadline DESC"; - let stmt = pg_client.prepare(get_query).await?; + let stmt = pg_txn.txn.prepare(get_query).await?; - let response = pg_client.query(&stmt, &[&delay_time]).await.expect("get delayed messages query failed"); + let response = pg_txn.txn.query(&stmt, &[&delay_time]).await?; let time_elapsed = query_execute_instant.elapsed(); if time_elapsed > Duration::from_millis(100) { - error!(" failed_to_fire query_execute_instant: {:?} ", time_elapsed); + log::warn!(" failed_to_fire query_execute_instant: {:?} ", time_elapsed); } + Ok(response) } @@ -312,14 +339,21 @@ impl Pg { id_list.push(row.get("id")); } - let pg_client = self.get_client().await.expect("Unable to get client"); + let pg_client = match self.get_client().await { + Ok(client) => client, + Err(e) => { + error!("reset_to_init: Unable to get client: {}", e); + return Err(format!("reset_to_init: Unable to get client: {}", e)); + } + }; let mut pg_access = PgAccess { client: pg_client }; - let pg_txn: PgTxn = pg_access.get_txn().await; - - // let reset_query = format!( - // "UPDATE hanger SET readied_at=null , readied_by=null WHERE id IN ({})", - // ids_list - // ); + let pg_txn: PgTxn = match pg_access.build_txn().await { + Ok(txn) => txn, + Err(e) => { + error!("delete_fired: Unable to start transaction: {}", e); + return Err(format!("delete_fired: Unable to start transaction: {}", e)); + } + }; let values_as_slice: Vec<_> = id_list.iter().map(|x| x as &(dyn ToSql + Sync)).collect(); @@ -327,12 +361,22 @@ impl Pg { for i in 0..id_list.len() { query = query + "$" + (i + 1).to_string().as_str() + ","; } - query = query.strip_suffix(',').unwrap().to_string(); + query = match query.strip_suffix(',') { + Some(query) => query.to_string(), + None => { + error!("reset_to_init: Unable to strip suffix"); + return Err("reset_to_init: Unable to strip suffix".to_string()); + } + }; query += ")"; - // println!("reset query {}", query); - let stmt = pg_txn.txn.prepare(query.as_str()).await.expect("Unable to prepare query"); - + let stmt = match pg_txn.txn.prepare(query.as_str()).await { + Ok(stmt) => stmt, + Err(e) => { + error!("reset_to_init: Unable to prepare query: {}", e); + return Err(format!("reset_to_init: Unable to prepare query: {}", e)); + } + }; let response = pg_txn.txn.execute(&stmt, &values_as_slice[..]).await; match response { @@ -344,16 +388,14 @@ impl Pg { } let time_elapsed = query_execute_instant.elapsed(); if time_elapsed > Duration::from_millis(100) { - error!(" ready_to_fire query_execute_instant: {:?} ", time_elapsed); + log::warn!(" ready_to_fire query_execute_instant: {:?} ", time_elapsed); } Ok(id_list) } Err(e) => { error!("reset_to_init: Unable to execute txn due to : {}", e); - let err_code = e.code(); - if err_code.is_some() { - let db_err = err_code.unwrap(); - if db_err == &SqlState::T_R_SERIALIZATION_FAILURE { + if let Some(err_code) = e.code() { + if err_code == &SqlState::T_R_SERIALIZATION_FAILURE { error!("reset_to_init: Unable to execute txn due to : {}", e); return Err(format!("reset_to_init: Unable to execute txn due to : {}", e)); } diff --git a/chronos_bin/src/runner.rs b/chronos_bin/src/runner.rs index 0e610b0..550000e 100644 --- a/chronos_bin/src/runner.rs +++ b/chronos_bin/src/runner.rs @@ -46,6 +46,9 @@ impl Runner { message_receiver.run().await; }); - futures::future::join_all([monitor_handler, message_processor_handler, message_receiver_handler]).await; + let future_tuple = futures::future::try_join3(monitor_handler, message_processor_handler, message_receiver_handler).await; + if future_tuple.is_err() { + log::error!("Chronos Stopping all threads {:?}", future_tuple); + } } } diff --git a/chronos_bin/src/utils/config.rs b/chronos_bin/src/utils/config.rs index 58d599b..03d0fbb 100644 --- a/chronos_bin/src/utils/config.rs +++ b/chronos_bin/src/utils/config.rs @@ -11,22 +11,10 @@ impl ChronosConfig { pub fn from_env() -> ChronosConfig { ChronosConfig { // random_delay: env_var!("RANDOMNESS_DELAY").parse().unwrap(), - monitor_db_poll: std::env::var("MONITOR_DB_POLL") - .unwrap_or_else(|_| "5".to_string()) - .parse() - .expect("Failed to parse MONITOR_DB_POLL!!"), - processor_db_poll: std::env::var("PROCESSOR_DB_POLL") - .unwrap_or_else(|_| "5".to_string()) - .parse() - .expect("Failed to parse PROCESSOR_DB_POLL!!"), - time_advance: std::env::var("TIMING_ADVANCE") - .unwrap_or_else(|_| "0".to_string()) - .parse() - .expect("Failed to parse TIMING_ADVANCE!!"), - fail_detect_interval: std::env::var("FAIL_DETECT_INTERVAL") - .unwrap_or_else(|_| "10".to_string()) - .parse() - .expect("Failed to parse FAIL_DETECT_INTERVAL!!"), + monitor_db_poll: std::env::var("MONITOR_DB_POLL").unwrap_or_else(|_| 5.to_string()).parse().unwrap_or(5), + processor_db_poll: std::env::var("PROCESSOR_DB_POLL").unwrap_or_else(|_| 5.to_string()).parse().unwrap_or(5), + time_advance: std::env::var("TIMING_ADVANCE").unwrap_or_else(|_| 0.to_string()).parse().unwrap_or(0), + fail_detect_interval: std::env::var("FAIL_DETECT_INTERVAL").unwrap_or_else(|_| 10.to_string()).parse().unwrap_or(10), } } } diff --git a/chronos_bin/src/utils/util.rs b/chronos_bin/src/utils/util.rs index 1f21f0c..7ef0150 100644 --- a/chronos_bin/src/utils/util.rs +++ b/chronos_bin/src/utils/util.rs @@ -47,8 +47,8 @@ pub fn headers_check(headers: &BorrowedHeaders) -> bool { outcome } -pub fn get_payload_utf8<'a>(message: &'a BorrowedMessage) -> &'a [u8] { - message.payload().expect("parsing payload failed") +pub fn get_payload_utf8<'a>(message: &'a BorrowedMessage) -> Option<&'a [u8]> { + message.payload() } pub fn get_message_key(message: &BorrowedMessage) -> String { diff --git a/pg_mig/Cargo.toml b/pg_mig/Cargo.toml index 414701d..5374141 100644 --- a/pg_mig/Cargo.toml +++ b/pg_mig/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pg_mig" -version = "0.1.0" +version = "0.2.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From cbce134c4d8f8f9f9b958ef76e000b0c377e30d7 Mon Sep 17 00:00:00 2001 From: Amninder Kaur Date: Fri, 13 Oct 2023 17:07:19 +1100 Subject: [PATCH 2/4] fix: add health check file with 0 or 1 --- .gitignore | 4 ++++ How-to.md | 1 - chronos_bin/src/message_processor.rs | 25 +++++++++++++++++-------- chronos_bin/src/runner.rs | 25 ++++++++++++++++++++++--- 4 files changed, 43 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index cf9db16..b2204e5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,10 @@ /target +<<<<<<< HEAD .env +======= +/heathcheck +>>>>>>> a5a05c4 (fix: add health check file with 0 or 1) ### Linux ### diff --git a/How-to.md b/How-to.md index 3004e21..c96fee4 100644 --- a/How-to.md +++ b/How-to.md @@ -67,7 +67,6 @@ These values are set to fine tune performance Chrono in need, refer to [Chronos] | PG_PASSWORD|admin|True | PG_DATABASE|chronos_db|True | PG_POOL_SIZE|50|True -|NODE_ID|UUID|False | DELAY_TIME|0|False | RANDOMNESS_DELAY|100|False | MONITOR_DB_POLL|5|False diff --git a/chronos_bin/src/message_processor.rs b/chronos_bin/src/message_processor.rs index ea30437..e2cac4b 100644 --- a/chronos_bin/src/message_processor.rs +++ b/chronos_bin/src/message_processor.rs @@ -16,14 +16,23 @@ pub struct MessageProcessor { impl MessageProcessor { pub async fn run(&self) { //Get UUID for the node that deployed this thread - let node_id: String = std::env::var("NODE_ID").unwrap_or_else(|_| uuid::Uuid::new_v4().to_string()); - - log::info!("MessageProcessor ON @ node_id: {}", node_id); - + let node_id: Uuid = match std::env::var("NODE_ID") { + Ok(val) => Uuid::parse_str(&val).unwrap_or_else(|_e| { + let uuid = uuid::Uuid::new_v4(); + log::info!("NODE_ID not found in env assigning {}", uuid); + uuid + }), + Err(_e) => { + log::info!("NODE_ID not found in env"); + uuid::Uuid::new_v4() + } + }; + log::info!("node_id {}", node_id); let mut delay_controller = DelayController::new(100); loop { - tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().processor_db_poll)).await; - + log::debug!("MessageProcessor loop"); + tokio::time::sleep(Duration::from_millis(10)).await; + // tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().db_poll_interval)).await; let deadline = Utc::now() - Duration::from_secs(ChronosConfig::from_env().time_advance); let params = GetReady { @@ -104,7 +113,7 @@ impl MessageProcessor { } if !ids.is_empty() { - if let Err(outcome_error) = &self.data_store.delete_fired_db(&ids).await { + if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await { println!("Error: error occurred in message processor delete_fired {}", outcome_error); //add retry logic here } @@ -150,7 +159,7 @@ impl MessageProcessor { let mut retry_count = 0; loop { if retry_count < max_retries { - match &self.data_store.delete_fired_db(&ids).await { + match &self.data_store.delete_fired(&ids).await { Ok(_) => { tracing::Span::current().record("correlationId", ids.join(",")); break; diff --git a/chronos_bin/src/runner.rs b/chronos_bin/src/runner.rs index 550000e..c74ae62 100644 --- a/chronos_bin/src/runner.rs +++ b/chronos_bin/src/runner.rs @@ -1,12 +1,12 @@ use crate::kafka::consumer::KafkaConsumer; use crate::kafka::producer::KafkaProducer; -use std::sync::Arc; -use tracing::{info_span, instrument}; - use crate::message_processor::MessageProcessor; use crate::message_receiver::MessageReceiver; use crate::monitor::FailureDetector; use crate::postgres::pg::Pg; +use log::debug; +use std::fs::{create_dir, read, remove_file, write}; +use std::sync::Arc; pub struct Runner { pub consumer: Arc, @@ -46,9 +46,28 @@ impl Runner { message_receiver.run().await; }); + // check if healthcheck file exists in healthcheck dir + let healthcheck_file = "healthcheck/chronos_healthcheck"; + let healthcheck_file_exists = read(healthcheck_file).is_ok(); + if healthcheck_file_exists { + log::info!("healthcheck file exists"); + let write_resp = write(healthcheck_file, b"1"); + if write_resp.is_err() { + log::error!("error while writing to healthcheck file {:?}", write_resp); + } + } else if create_dir("healthcheck").is_ok() { + let write_resp = write(healthcheck_file, b"1"); + if write_resp.is_err() { + log::error!("error while writing to healthcheck file {:?}", write_resp); + } + } let future_tuple = futures::future::try_join3(monitor_handler, message_processor_handler, message_receiver_handler).await; if future_tuple.is_err() { log::error!("Chronos Stopping all threads {:?}", future_tuple); + let write_resp = write(healthcheck_file, b"0"); + if write_resp.is_err() { + log::error!("error while writing to healthcheck file {:?}", write_resp); + } } } } From 00f7c0597fa5e63b857575e4f99dd9c2aca474b1 Mon Sep 17 00:00:00 2001 From: Amninder Kaur Date: Tue, 24 Oct 2023 11:06:43 +1100 Subject: [PATCH 3/4] fix: refactor processor and receiver modules --- .env.example | 3 - .gitignore | 4 - How-to.md | 48 ++---- chronos_bin/src/bin/chronos.rs | 2 +- chronos_bin/src/kafka/producer.rs | 4 +- chronos_bin/src/message_processor.rs | 224 +++++++++++++-------------- chronos_bin/src/message_receiver.rs | 167 ++++++++++---------- chronos_bin/src/monitor.rs | 38 ++--- chronos_bin/src/postgres/pg.rs | 73 +++++---- chronos_bin/src/runner.rs | 29 ++-- chronos_bin/src/utils/util.rs | 57 +++---- docker-compose.yml | 126 +++++++-------- infra/otelcol-config.yml | 6 +- 13 files changed, 365 insertions(+), 416 deletions(-) diff --git a/.env.example b/.env.example index 799e49c..40a1135 100644 --- a/.env.example +++ b/.env.example @@ -57,10 +57,7 @@ TIMING_ADVANCE=0 FAIL_DETECT_INTERVAL=500 MAX_RETRIES=3 PROCESSOR_DB_POLL=10 -<<<<<<< HEAD # TRACING OTEL_SERVICE_NAME=chronos OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces -======= ->>>>>>> 6107c18 (fix: handle retry and graceful close for all threads if one is stopped) diff --git a/.gitignore b/.gitignore index b2204e5..7567c49 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,6 @@ /target -<<<<<<< HEAD .env - -======= /heathcheck ->>>>>>> a5a05c4 (fix: add health check file with 0 or 1) ### Linux ### diff --git a/How-to.md b/How-to.md index c96fee4..a7cd568 100644 --- a/How-to.md +++ b/How-to.md @@ -23,25 +23,24 @@ Use `make withenv RECIPE=docker.up` ## ENV vars All the required configurations for Chronos can be passed in environment variables mentioned below -<<<<<<< HEAD ### Required Vars |Env Var|Example Value| |----|----| |KAFKA_HOST|"localhost" |KAFKA_PORT|9093 -| KAFKA_CLIENT_ID|"chronos" -| KAFKA_GROUP_ID|"chronos" -| KAFKA_IN_TOPIC|"chronos.in" -| KAFKA_OUT_TOPIC|"chronos.out" -| KAFKA_USERNAME| -| KAFKA_PASSWORD| -| PG_HOST|localhost -| PG_PORT|5432 -| PG_USER|admin -| PG_PASSWORD|admin -| PG_DATABASE|chronos_db -| PG_POOL_SIZE|50 +|KAFKA_CLIENT_ID|"chronos" +|KAFKA_GROUP_ID|"chronos" +|KAFKA_IN_TOPIC|"chronos.in" +|KAFKA_OUT_TOPIC|"chronos.out" +|KAFKA_USERNAME| +|KAFKA_PASSWORD| +|PG_HOST|localhost +|PG_PORT|5432 +|PG_USER|admin +|PG_PASSWORD|admin +|PG_DATABASE|chronos_db +|PG_POOL_SIZE|50 ### Optional Vars These values are set to fine tune performance Chrono in need, refer to [Chronos](./README.md) @@ -51,28 +50,7 @@ These values are set to fine tune performance Chrono in need, refer to [Chronos] | PROCESSOR_DB_POLL|5 milli sec | TIMING_ADVANCE|0 sec | FAIL_DETECT_INTERVAL|10 sec -======= -|Env Var|Example Value| Required| -|----|----|----| -|KAFKA_BROKERS|"localhost:9093"|True -| KAFKA_CLIENT_ID|"chronos"|True -| KAFKA_GROUP_ID|"chronos"|True -| KAFKA_IN_TOPIC|"chronos.in"|True -| KAFKA_OUT_TOPIC|"chronos.out"|True -| KAFKA_USERNAME||True -| KAFKA_PASSWORD||True -| PG_HOST|localhost|True -| PG_PORT|5432|True -| PG_USER|admin|True -| PG_PASSWORD|admin|True -| PG_DATABASE|chronos_db|True -| PG_POOL_SIZE|50|True -| DELAY_TIME|0|False -| RANDOMNESS_DELAY|100|False -| MONITOR_DB_POLL|5|False -| TIMING_ADVANCE|0|False -| FAIL_DETECT_INTERVAL|500|False ->>>>>>> 6107c18 (fix: handle retry and graceful close for all threads if one is stopped) +| HEALTHCHECK_FILE|healthcheck/chronos_healthcheck ## Observability diff --git a/chronos_bin/src/bin/chronos.rs b/chronos_bin/src/bin/chronos.rs index 1e6cb15..0d9a36b 100644 --- a/chronos_bin/src/bin/chronos.rs +++ b/chronos_bin/src/bin/chronos.rs @@ -14,7 +14,7 @@ async fn main() { env_logger::init(); dotenvy::dotenv().ok(); - let protocol = std::env::var("TELEMETRY_PROTOCOL").unwrap_or_else(|_| "http/json".to_string()); + let protocol = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL").unwrap_or_else(|_| "http/json".to_string()); let tracing_opentelemetry = TelemetryCollector::new(protocol, TelemetryCollectorType::Otlp); tracing_opentelemetry.register_traces(); diff --git a/chronos_bin/src/kafka/producer.rs b/chronos_bin/src/kafka/producer.rs index 3b94743..3fc3b94 100644 --- a/chronos_bin/src/kafka/producer.rs +++ b/chronos_bin/src/kafka/producer.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; use std::time::Duration; -use crate::kafka::errors::KafkaAdapterError; use crate::utils::util::into_headers; +use crate::{kafka::errors::KafkaAdapterError, utils::util::CHRONOS_ID}; use rdkafka::producer::{FutureProducer, FutureRecord}; use super::config::KafkaConfig; @@ -44,6 +44,6 @@ impl KafkaProducer { ) .await .map_err(|(kafka_error, _record)| KafkaAdapterError::PublishMessage(kafka_error, "message publishing failed".to_string()))?; - Ok(unwrap_header["chronosId"].to_string()) + Ok(unwrap_header[CHRONOS_ID].to_string()) } } diff --git a/chronos_bin/src/message_processor.rs b/chronos_bin/src/message_processor.rs index e2cac4b..8a4ced7 100644 --- a/chronos_bin/src/message_processor.rs +++ b/chronos_bin/src/message_processor.rs @@ -6,6 +6,7 @@ use chrono::Utc; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use tokio_postgres::Row; use uuid::Uuid; pub struct MessageProcessor { @@ -14,8 +15,7 @@ pub struct MessageProcessor { } impl MessageProcessor { - pub async fn run(&self) { - //Get UUID for the node that deployed this thread + fn assign_node_id() -> Uuid { let node_id: Uuid = match std::env::var("NODE_ID") { Ok(val) => Uuid::parse_str(&val).unwrap_or_else(|_e| { let uuid = uuid::Uuid::new_v4(); @@ -27,15 +27,88 @@ impl MessageProcessor { uuid::Uuid::new_v4() } }; - log::info!("node_id {}", node_id); - let mut delay_controller = DelayController::new(100); + node_id + } + + fn gather_ids(result: Result) -> String { + match result { + Ok(m) => m, + Err(e) => { + log::error!("Error: delayed message publish failed {:?}", e); + "".to_string() + } + } + } + + #[tracing::instrument(skip_all, fields(correlationId))] + async fn prepare_to_publish(&self, row: Row) -> Result { + let updated_row = TableRow { + id: row.get("id"), + deadline: row.get("deadline"), + readied_at: row.get("readied_at"), + readied_by: row.get("readied_by"), + message_headers: row.get("message_headers"), + message_key: row.get("message_key"), + message_value: row.get("message_value"), + }; + let mut headers: HashMap = match serde_json::from_str(&updated_row.message_headers.to_string()) { + Ok(t) => t, + Err(_e) => { + println!("error occurred while parsing"); + HashMap::new() + } + }; + + let readied_by_column = Some(updated_row.readied_by.to_string()); + tracing::Span::current().record("correlationId", &readied_by_column); + + match readied_by_column { + Some(id) => { + headers.insert("readied_by".to_string(), id); + if let Ok(id) = self + .producer + .kafka_publish(updated_row.message_value.to_string(), Some(headers), updated_row.message_key.to_string()) + .await + { + Ok(id) + } else { + Err("error occurred while publishing".to_string()) + } + } + None => { + log::error!("Error: readied_by not found in db row {:?}", updated_row); + Err("error occurred while publishing".to_string()) + } + } + } + + #[tracing::instrument(skip_all, fields(deleted_ids))] + async fn delete_fired_records_from_db(&self, ids: &Vec) { + //retry loop + let max_retry_count = 3; + let mut retry_count = 0; + while let Err(outcome_error) = &self.data_store.delete_fired(ids).await { + log::error!("Error: error occurred in message processor {}", outcome_error); + log::debug!("retrying"); + retry_count += 1; + if retry_count == max_retry_count { + log::error!("Error: max retry count {} reached by node {:?} for deleting fired ids ", max_retry_count, ids); + break; + } + } + } + + #[tracing::instrument(skip_all)] + async fn processor_message_ready(&self, node_id: Uuid) { loop { - log::debug!("MessageProcessor loop"); - tokio::time::sleep(Duration::from_millis(10)).await; - // tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().db_poll_interval)).await; + log::debug!("retry loop"); + // thread::sleep(Duration::from_millis(100)); + let max_retry_count = 3; + let mut retry_count = 0; + let deadline = Utc::now() - Duration::from_secs(ChronosConfig::from_env().time_advance); - let params = GetReady { + let param = GetReady { readied_at: deadline, readied_by: node_id, deadline, @@ -43,137 +116,56 @@ impl MessageProcessor { // order: "asc", }; - //retry loop - let _ = &self.processor_message_ready(¶ms).await; - - delay_controller.sleep().await; - } - } - - #[tracing::instrument(skip_all, fields(node_id, correlationId, is_published, error))] - async fn processor_message_ready(&self, params: &GetReady) { - loop { - let max_retry_count = 3; - let mut retry_count = 0; + let readied_by_column: Option = None; + let resp: Result, String> = self.data_store.ready_to_fire_db(¶m).await; + match resp { + Ok(ready_to_publish_rows) => { + if ready_to_publish_rows.is_empty() { + log::debug!("no rows ready to fire for dealine {}", deadline); + break; + } else { + let publish_futures = ready_to_publish_rows.into_iter().map(|row| self.prepare_to_publish(row)); - match &self.data_store.ready_to_fire_db(params).await { - Ok(publish_rows) => { - let rdy_to_pblsh_count = publish_rows.len(); - if rdy_to_pblsh_count > 0 { - let mut ids: Vec = Vec::with_capacity(rdy_to_pblsh_count); - let mut publish_futures = Vec::with_capacity(rdy_to_pblsh_count); - for row in publish_rows { - let updated_row = TableRow { - id: row.get("id"), - deadline: row.get("deadline"), - readied_at: row.get("readied_at"), - readied_by: row.get("readied_by"), - message_headers: row.get("message_headers"), - message_key: row.get("message_key"), - message_value: row.get("message_value"), - }; - let mut headers: HashMap = match serde_json::from_str(&updated_row.message_headers.to_string()) { - Ok(t) => t, - Err(_e) => { - println!("error occurred while parsing"); - HashMap::new() - } - }; - //TODO: handle empty headers - - let readied_by = updated_row.readied_by.to_string(); - tracing::Span::current().record("node_id", &readied_by); - headers.insert("readied_by".to_string(), readied_by); - - tracing::Span::current().record("correlationId", updated_row.id.to_string()); - - publish_futures.push(self.producer.kafka_publish( - updated_row.message_value.to_string(), - Some(headers), - updated_row.message_key.to_string(), - // updated_row.id.to_string(), - )) - } let results = futures::future::join_all(publish_futures).await; - for result in results { - match result { - Ok(m) => { - tracing::Span::current().record("is_published", "true"); - ids.push(m); - } - Err(e) => { - tracing::Span::current().record("is_published", "false"); - tracing::Span::current().record("error", &e.to_string()); - - log::error!("Error: delayed message publish failed {:?}", e); - break; - // failure detection needs to pick - } - } - } + + let ids: Vec = results.into_iter().map(Self::gather_ids).collect(); if !ids.is_empty() { - if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await { - println!("Error: error occurred in message processor delete_fired {}", outcome_error); - //add retry logic here - } - // println!("delete ids {:?} and break", ids); + let _ = self.delete_fired_records_from_db(&ids).await; + log::debug!("number of rows published successfully and deleted from DB {}", ids.len()); break; } - log::debug!("number of rows published successfully and deleted from DB {}", ids.len()); - } else { - log::debug!("no rows ready to fire for dealine "); - break; } } Err(e) => { if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count { //retry goes here - eprintln!("retrying"); + log::debug!("retrying"); retry_count += 1; if retry_count == max_retry_count { - log::error!( - "Error: max retry count {} reached by node {} for row ", - max_retry_count, - "node_id_option.unwrap()", - // row_id.unwrap() - ); + log::error!("Error: max retry count {} reached by node {:?} for row ", max_retry_count, readied_by_column); break; } - // &self.process_db_rows(¶ms).await; } log::error!("Error: error occurred in message processor while publishing {}", e); - break; } } } - - // let node_id_option: Option = node_id.clone().into(); - // let mut row_id: Option = None; } + pub async fn run(&self) { + log::info!("MessageProcessor ON!"); - #[tracing::instrument(skip_all, fields(correlationId))] - async fn clean_db(&self, ids: Vec) { - //rety in case delete fails - let max_retries = 3; - let mut retry_count = 0; + //Get UUID for the node that deployed this thread + let node_id = Self::assign_node_id(); + + log::info!("node_id {}", node_id); + let mut delay_controller = DelayController::new(100); loop { - if retry_count < max_retries { - match &self.data_store.delete_fired(&ids).await { - Ok(_) => { - tracing::Span::current().record("correlationId", ids.join(",")); - break; - } - Err(e) => { - println!("Error: error occurred in message processor delete_fired {}", e); - retry_count += 1; - continue; - } - } - } else { - log::error!("Error: max retry count {} reached by node {} for row ", max_retries, "node_id_option.unwrap()",); - break; - } + log::debug!("MessageProcessor loop"); + tokio::time::sleep(Duration::from_millis(10)).await; + self.processor_message_ready(node_id).await; + + delay_controller.sleep().await; } } } diff --git a/chronos_bin/src/message_receiver.rs b/chronos_bin/src/message_receiver.rs index 9d8caa1..93c0ea8 100644 --- a/chronos_bin/src/message_receiver.rs +++ b/chronos_bin/src/message_receiver.rs @@ -1,16 +1,13 @@ use chrono::{DateTime, Utc}; -use log::{debug, error, info, warn}; use serde_json::json; use tracing::instrument; use crate::kafka::consumer::KafkaConsumer; use crate::kafka::producer::KafkaProducer; use crate::postgres::pg::{Pg, TableInsertRow}; -use crate::utils::util::{get_message_key, get_payload_utf8, headers_check, required_headers, CHRONOS_ID, DEADLINE}; -use rdkafka::message::{BorrowedMessage, Message}; -use std::collections::HashMap; -use std::str::FromStr; -use std::sync::Arc; +use crate::utils::util::{get_message_key, get_payload_utf8, required_headers, CHRONOS_ID, DEADLINE}; +use rdkafka::message::BorrowedMessage; +use std::{collections::HashMap, str::FromStr, sync::Arc}; pub struct MessageReceiver { pub(crate) consumer: Arc, @@ -20,110 +17,102 @@ pub struct MessageReceiver { impl MessageReceiver { #[instrument(skip_all, fields(correlationId))] - pub async fn receiver_publish_to_kafka(&self, new_message: &BorrowedMessage<'_>, headers: HashMap) { - let string_payload = String::from_utf8_lossy(get_payload_utf8(new_message)).to_string(); - let message_key = get_message_key(new_message); - tracing::Span::current().record("correlationId", &message_key); - let outcome = &self.producer.kafka_publish(string_payload, Some(headers), message_key.to_string()).await; - match outcome { - Ok(_) => { - debug!("Published message to Kafka {}", &message_key); - } - Err(e) => { - error!("Failed to publish message to Kafka: {:?}", e); - // TODO check if needs to retry publishing + async fn insert_into_db( + &self, + new_message: &BorrowedMessage<'_>, + reqd_headers: HashMap, + message_deadline: DateTime, + ) -> Option { + let max_retry_count = 3; + let mut retry_count = 0; + //retry loop + loop { + if let Some(payload) = get_payload_utf8(new_message) { + if let Ok(message_value) = &serde_json::from_slice(payload) { + if let Some(message_key) = get_message_key(new_message) { + let params = TableInsertRow { + id: &reqd_headers[CHRONOS_ID], + deadline: message_deadline, + message_headers: &json!(&reqd_headers), + message_key: message_key.as_str(), + message_value, + }; + + if let Err(e) = self.data_store.insert_to_delay_db(¶ms).await { + log::error!("insert to delay failed {}", e); + retry_count += 1; + if retry_count == max_retry_count { + return Some("max retry count reached for insert to delay query".to_string()); + } + continue; + } + tracing::Span::current().record("correlationId", &message_key); + } + + log::debug!("Message publish success {:?}", new_message); + return None; + } else { + return Some("json conversion of payload failed".to_string()); + } + } else { + return Some("message payload is not utf8 encoded".to_string()); } } } #[instrument(skip_all, fields(correlationId))] - pub async fn receiver_insert_to_db(&self, new_message: &BorrowedMessage<'_>, headers: HashMap, deadline: DateTime) { - let result_value = &serde_json::from_slice(get_payload_utf8(new_message)); - let payload = match result_value { - Ok(payload) => payload, - Err(e) => { - error!("de-ser failed for payload: {:?}", e); - return; - } - }; - - let message_key = get_message_key(new_message); - tracing::Span::current().record("correlationId", &message_key); - - let params = TableInsertRow { - id: &headers[CHRONOS_ID], - deadline, - message_headers: &json!(&headers), - message_key: message_key.as_str(), - message_value: payload, - }; - let _insert_time = Instant::now(); - - //retry - let total_retry_count = 3; - let mut retry_count = 0; - loop { - match self.data_store.insert_to_delay_db(¶ms).await { - Ok(_) => { - break; - } - Err(e) => { - error!("insert_to_delay failed: {:?} retrying again", e); - retry_count += 1; - if retry_count == total_retry_count { - error!("max retry count {} exceeded aborting insert_to_db for {}", total_retry_count, message_key); - break; + async fn prepare_and_publish(&self, message: &BorrowedMessage<'_>, reqd_headers: HashMap) -> Option { + match get_payload_utf8(message) { + Some(string_payload) => { + if let Some(message_key) = get_message_key(message) { + let string_payload = String::from_utf8_lossy(string_payload).to_string(); + tracing::Span::current().record("correlationId", &message_key); + if let Err(e) = &self.producer.kafka_publish(string_payload, Some(reqd_headers.clone()), message_key).await { + return Some(format!("publish failed for received message {:?} with error :: {}", message, e)); } + } else { + return Some("message key not found".to_string()); } } - } + None => return None, + }; + None } - #[tracing::instrument(name = "receiver_handle_message", skip_all, fields(correlationId))] + #[tracing::instrument(name = "receiver_handle_message", skip_all, fields(correlationId, error))] pub async fn handle_message(&self, message: &BorrowedMessage<'_>) { - if headers_check(message.headers().unwrap()) { - let new_message = &message; - - if let Some(headers) = required_headers(new_message) { - tracing::Span::current().record("correlationId", &headers[CHRONOS_ID]); - let message_deadline: DateTime = match DateTime::::from_str(&headers[DEADLINE]) { - Ok(d) => d, - Err(e) => { - error!("failed to parse deadline: {}", e); - return; - } - }; - + let new_message = &message; + if let Some(reqd_headers) = required_headers(new_message) { + tracing::Span::current().record("correlationId", &reqd_headers[CHRONOS_ID]); + if let Ok(message_deadline) = DateTime::::from_str(&reqd_headers[DEADLINE]) { if message_deadline <= Utc::now() { - debug!("message deadline is in the past, sending directly to out_topic"); - // direct_sent_count += 1; - self.receiver_publish_to_kafka(new_message, headers).await - } else { - debug!("message deadline is in the future, sending to kafka"); - // db_insert_count += 1; - - self.receiver_insert_to_db(new_message, headers, message_deadline).await - // println!("insert took: {:?}", insert_time.elapsed()) + if let Some(err) = self.prepare_and_publish(new_message, reqd_headers).await { + log::error!("{}", err); + tracing::Span::current().record("error", &err); + } + } else if let Some(err_string) = self.insert_into_db(new_message, reqd_headers, message_deadline).await { + log::error!("{}", err_string); + tracing::Span::current().record("error", &err_string); } - } else { - warn!("message with improper headers on inbox.topic "); } - } else { - warn!("message with improper headers on inbox.topic "); } - // println!("{direct_sent_count} messages sent directly and {db_insert_count} added to db from total of {total_count} "); } pub async fn run(&self) { - info!("MessageReceiver ON!"); + log::info!("MessageReceiver ON!"); let _ = &self.consumer.subscribe().await; - // let mut total_count = 0; - // let mut direct_sent_count = 0; - // let mut db_insert_count = 0; loop { - if let Ok(message) = &self.consumer.kafka_consume_message().await { - self.handle_message(message).await; + match &self.consumer.kafka_consume_message().await { + Ok(message) => { + self.handle_message(message).await; + } + Err(e) => { + log::error!("error while consuming message {:?}", e); + } } + // if let Ok(message) = &self.consumer.kafka_consume_message().await { + // self.handle_message(message).await; + // } } } } diff --git a/chronos_bin/src/monitor.rs b/chronos_bin/src/monitor.rs index 474451e..aaaffd3 100644 --- a/chronos_bin/src/monitor.rs +++ b/chronos_bin/src/monitor.rs @@ -9,40 +9,42 @@ pub struct FailureDetector { pub(crate) data_store: Arc, } -//Needs to accept the poll time impl FailureDetector { - // #[instrument] pub async fn run(&self) { log::info!("Monitoring On!"); loop { - // TODO multiple rows are fetched, what to track in the monitor? - let _ = tokio::time::sleep(Duration::from_secs(ChronosConfig::from_env().monitor_db_poll)).await; - let _ = &self.monitor_failed().await; + let _ = &self.monitor_failed_fire_records().await; + } + } + + #[tracing::instrument(skip_all, fields(error))] + async fn reset_to_init_db(&self, fetched_rows: &std::vec::Vec) { + if !fetched_rows.is_empty() { + if let Err(e) = &self.data_store.reset_to_init_db(fetched_rows).await { + tracing::Span::current().record("error", e); + log::error!("error in monitor reset_to_init {}", e); + } else { + log::debug!("reset_to_init_db success for {:?}", fetched_rows) + } } } - #[tracing::instrument(skip_all, fields(message_key, error, monitoring_len))] - async fn monitor_failed(&self) { + + #[tracing::instrument(skip_all, fields(error, fail_to_fire_rows))] + async fn monitor_failed_fire_records(&self) { match &self .data_store .failed_to_fire_db(&(Utc::now() - Duration::from_secs(ChronosConfig::from_env().fail_detect_interval))) .await { Ok(fetched_rows) => { - if !fetched_rows.is_empty() { - if let Err(e) = &self.data_store.reset_to_init_db(fetched_rows).await { - tracing::Span::current().record("error", e); - println!("error in monitor reset_to_init {}", e); - } - tracing::Span::current().record("monitoring_len", fetched_rows.len()); - // TODO Need to monitor the node that redied but never fired - } else { - tracing::Span::current().record("monitoring_len", "empty"); - } + tracing::Span::current().record("fail_to_fire_rows", fetched_rows.len()); + self.reset_to_init_db(fetched_rows).await; } Err(e) => { - println!("error in monitor {}", e); + log::error!("error in monitor {}", e); + tracing::Span::current().record("error", e.to_string()); } } } diff --git a/chronos_bin/src/postgres/pg.rs b/chronos_bin/src/postgres/pg.rs index aa50e50..34318d0 100644 --- a/chronos_bin/src/postgres/pg.rs +++ b/chronos_bin/src/postgres/pg.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; use deadpool_postgres::{Config, GenericClient, ManagerConfig, Object, Pool, PoolConfig, Runtime, Transaction}; -use log::{error, info}; +use log::error; use std::time::{Duration, Instant}; use tokio_postgres::error::SqlState; use tokio_postgres::types::ToSql; @@ -179,7 +179,7 @@ impl Pg { log::warn!("insert_to_delay query_execute_instant: {:?} ", time_elapsed); } - if outcome.is_ok() { + if outcome > 0 { event!(tracing::Level::INFO, "insert_to_delay success"); let cmt_rdy = pg_txn.txn.commit().await; if let Err(e) = cmt_rdy { @@ -274,40 +274,45 @@ impl Pg { let ready_query = "UPDATE hanger SET readied_at = $1, readied_by = $2 where deadline < $3 AND readied_at IS NULL RETURNING id, deadline, readied_at, readied_by, message_headers, message_key, message_value"; - if let Ok(stmt) = pg_txn.txn.prepare(ready_query).await { - let query_execute_instant = Instant::now(); - let response = pg_txn.txn.query(&stmt, &[¶m.readied_at, ¶m.readied_by, ¶m.deadline]).await; - - match response { - Ok(resp) => { - let cmt_rdy = pg_txn.txn.commit().await; - if let Err(e) = cmt_rdy { - error!("Unable to commit: {}. The original transaction updated: {:?} rows", e, resp); - return Err(format!( - "ready_to_fire: Unable to commit: {}. The original transaction updated: {:?} rows", - e, resp - )); - } - let time_elapsed = query_execute_instant.elapsed(); - if time_elapsed > Duration::from_millis(100) { - log::warn!(" ready_to_fire query_execute_instant: {:?} params: {:?}", time_elapsed, param); - } - Ok(resp) + // println!("ready_query: {}", ready_query); + // if let Ok(stmt) = pg_txn.txn.prepare(ready_query).await { + let stmt = match pg_txn.txn.prepare(ready_query).await { + Ok(stmt) => stmt, + Err(e) => { + error!("ready_to_fire: Unable to prepare query: {}", e); + return Err(format!("ready_to_fire: Unable to prepare query: {}", e)); + } + }; + + let query_execute_instant = Instant::now(); + let response = pg_txn.txn.query(&stmt, &[¶m.readied_at, ¶m.readied_by, ¶m.deadline]).await; + + match response { + Ok(resp) => { + let cmt_rdy = pg_txn.txn.commit().await; + if let Err(e) = cmt_rdy { + error!("Unable to commit: {}. The original transaction updated: {:?} rows", e, resp); + return Err(format!( + "ready_to_fire: Unable to commit: {}. The original transaction updated: {:?} rows", + e, resp + )); } - Err(e) => { - if let Some(err_code) = e.code() { - if err_code == &SqlState::T_R_SERIALIZATION_FAILURE { - error!("ready_to_fire: Unable to execute txn due to : {}", e); - return Err(format!("ready_to_fire: Unable to execute txn due to : {}", e)); - } + let time_elapsed = query_execute_instant.elapsed(); + if time_elapsed > Duration::from_millis(100) { + log::warn!(" ready_to_fire query_execute_instant: {:?} params: {:?}", time_elapsed, param); + } + Ok(resp) + } + Err(e) => { + if let Some(err_code) = e.code() { + if err_code == &SqlState::T_R_SERIALIZATION_FAILURE { + error!("ready_to_fire: Unable to execute txn due to : {}", e); + return Err(format!("ready_to_fire: Unable to execute txn due to : {}", e)); } - error!("ready_to_fire: Unknow exception {:?}", e); - Err(format!("ready_to_fire: Unknow exception {:?}", e)) } + error!("ready_to_fire: Unknow exception {:?}", e); + Err(format!("ready_to_fire: Unknow exception {:?}", e)) } - } else { - error!("ready_to_fire: Unable to prepare query"); - Err("ready_to_fire: Unable to prepare query".to_string()) } } @@ -318,7 +323,7 @@ impl Pg { let mut pg_access = PgAccess { client: pg_client }; let pg_txn: PgTxn = pg_access.build_txn().await?; - log::info!("failed_to_fire delay_time: {:?}", delay_time); + log::debug!("failed_to_fire param delay_time: {:?}", delay_time); let get_query = "SELECT * from hanger where readied_at > $1 ORDER BY deadline DESC"; let stmt = pg_txn.txn.prepare(get_query).await?; @@ -370,6 +375,8 @@ impl Pg { }; query += ")"; + println!("query: {}", query); + let stmt = match pg_txn.txn.prepare(query.as_str()).await { Ok(stmt) => stmt, Err(e) => { diff --git a/chronos_bin/src/runner.rs b/chronos_bin/src/runner.rs index c74ae62..563382a 100644 --- a/chronos_bin/src/runner.rs +++ b/chronos_bin/src/runner.rs @@ -4,8 +4,7 @@ use crate::message_processor::MessageProcessor; use crate::message_receiver::MessageReceiver; use crate::monitor::FailureDetector; use crate::postgres::pg::Pg; -use log::debug; -use std::fs::{create_dir, read, remove_file, write}; +use std::fs::{create_dir, read, write}; use std::sync::Arc; pub struct Runner { @@ -47,27 +46,29 @@ impl Runner { }); // check if healthcheck file exists in healthcheck dir - let healthcheck_file = "healthcheck/chronos_healthcheck"; - let healthcheck_file_exists = read(healthcheck_file).is_ok(); + let healthcheck_file = std::env::var("HEALTHCHECK_FILE").unwrap_or_else(|_| "healthcheck/chronos_healthcheck".to_string()); + let healthcheck_file_exists = read(&healthcheck_file).is_ok(); if healthcheck_file_exists { log::info!("healthcheck file exists"); - let write_resp = write(healthcheck_file, b"1"); + let write_resp = write(&healthcheck_file, b"1"); if write_resp.is_err() { log::error!("error while writing to healthcheck file {:?}", write_resp); } } else if create_dir("healthcheck").is_ok() { - let write_resp = write(healthcheck_file, b"1"); - if write_resp.is_err() { - log::error!("error while writing to healthcheck file {:?}", write_resp); - } - } - let future_tuple = futures::future::try_join3(monitor_handler, message_processor_handler, message_receiver_handler).await; - if future_tuple.is_err() { - log::error!("Chronos Stopping all threads {:?}", future_tuple); - let write_resp = write(healthcheck_file, b"0"); + let write_resp = write(&healthcheck_file, b"1"); if write_resp.is_err() { log::error!("error while writing to healthcheck file {:?}", write_resp); } } + message_receiver_handler.await.unwrap(); + // futures::future::join(monitor_handler); + // let future_tuple = futures::future::try_join3(monitor_handler, message_processor_handler, message_receiver_handler).await; + // if future_tuple.is_err() { + // log::error!("Chronos Stopping all threads {:?}", future_tuple); + // let write_resp = write(&healthcheck_file, b"0"); + // if write_resp.is_err() { + // log::error!("error while writing to healthcheck file {:?}", write_resp); + // } + // } } } diff --git a/chronos_bin/src/utils/util.rs b/chronos_bin/src/utils/util.rs index 7ef0150..b9375ae 100644 --- a/chronos_bin/src/utils/util.rs +++ b/chronos_bin/src/utils/util.rs @@ -5,23 +5,24 @@ use std::collections::HashMap; pub static CHRONOS_ID: &str = "chronosMessageId"; pub static DEADLINE: &str = "chronosDeadline"; -//TODO check correctness for two headers in this method pub fn required_headers(message: &BorrowedMessage) -> Option> { if let Some(headers) = message.headers() { - let reqd_headers = headers.iter().fold(HashMap::::new(), |mut acc, header| { - let key: String = match header.key.parse() { - Ok(key) => key, - Err(e) => { - log::error!("Error parsing header key: {}", e); - return acc; + if headers_check(headers) { + let reqd_headers = headers.iter().fold(HashMap::::new(), |mut acc, header| { + if let Ok(key) = header.key.parse() { + if let Some(value) = header.value { + let value: String = String::from_utf8_lossy(value).into_owned(); + acc.insert(key, value); + acc + } else { + acc + } + } else { + acc } - }; - let value: String = String::from_utf8_lossy(header.value.expect("utf8 parsing for header value failed")).into_owned(); - - acc.insert(key, value); - acc - }); - return Some(reqd_headers); + }); + return Some(reqd_headers); + } } None } @@ -35,38 +36,24 @@ pub fn into_headers(headers: &HashMap) -> OwnedHeaders { } pub fn headers_check(headers: &BorrowedHeaders) -> bool { + // println!("headers_check {:?}", headers); let outcome = headers .iter() .filter(|h| { let header_keys = [CHRONOS_ID, DEADLINE]; header_keys.contains(&h.key) && h.value.is_some() }) - .count() - == 2; + .count(); - outcome + outcome == 2 } pub fn get_payload_utf8<'a>(message: &'a BorrowedMessage) -> Option<&'a [u8]> { message.payload() } -pub fn get_message_key(message: &BorrowedMessage) -> String { - let key = String::from_utf8_lossy(message.key().expect("No key found for message")).to_string(); - key -} - -pub fn get_chronos_id(headers: &BorrowedHeaders) -> String { - let value = headers - .iter() - .find(|h| { - let header_keys = [CHRONOS_ID]; - header_keys.contains(&h.key) && h.value.is_some() - }) - .expect("No chronosId found for message") - .value - .expect("No chronosId found for message"); - - String::from_utf8_lossy(value).into_owned() - // return value; +pub fn get_message_key(message: &BorrowedMessage) -> Option { + message.key().map(|key| String::from_utf8_lossy(key).to_string()) + // let key = String::from_utf8_lossy(.expect("No key found for message")).to_string(); + // key } diff --git a/docker-compose.yml b/docker-compose.yml index 39f2789..0f4f21f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,18 +2,18 @@ version: '3.1' services: #postgres DB - postgres: - image: postgres:13.3 - ports: - - 5432:5432 - volumes: - - postgres:/var/lib/postgresql/data/ - environment: - POSTGRES_USER: admin - POSTGRES_PASSWORD: admin - POSTGRES_DB: chronos_db - networks: - - chronos + # postgres: + # image: postgres:13.3 + # ports: + # - 5432:5432 + # volumes: + # - postgres:/var/lib/postgresql/data/ + # environment: + # POSTGRES_USER: admin + # POSTGRES_PASSWORD: admin + # POSTGRES_DB: chronos_db + # networks: + # - chronos # migration / init container # chronos-pg-mig: # image: mig @@ -33,59 +33,59 @@ services: # depends_on: # - postgres - zookeeper: - image: bitnami/zookeeper:3.7.0 - ports: - - 2180:2181 - volumes: - - zookeeper:/bitnami/zookeeper - environment: - ALLOW_ANONYMOUS_LOGIN: "yes" - networks: - - chronos + # zookeeper: + # image: bitnami/zookeeper:3.7.0 + # ports: + # - 2180:2181 + # volumes: + # - zookeeper:/bitnami/zookeeper + # environment: + # ALLOW_ANONYMOUS_LOGIN: "yes" + # networks: + # - chronos - kafka: - image: bitnami/kafka:2.8.0 - ports: - - 9092:9092 - - 9093:9093 - - 9094:9094 - volumes: - - kafka:/bitnami/kafka - - ./infra:/opt/infra - environment: - KAFKA_BROKER_ID: "1" - KAFKA_CFG_LISTENERS: "INTERNAL://:9092, EXTERNAL://:9093, K8S://:9094" - KAFKA_CFG_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092, EXTERNAL://localhost:9093" - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, K8S:PLAINTEXT" - KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL" - KAFKA_INTER_BROKER_USER: "admin" - KAFKA_INTER_BROKER_PASSWORD: "admin-secret" - KAFKA_CFG_NUM_PARTITIONS: "1" - KAFKA_LOG_RETENTION_BYTES: -1 - KAFKA_LOG_RETENTION_MS: -1 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" - KAFKA_CFG_SUPER_USERS: "User:admin" - KAFKA_CLIENT_USERS: "admin,kafdrop" - KAFKA_CLIENT_PASSWORDS: "admin-secret,admin-secret" - ALLOW_PLAINTEXT_LISTENER: "yes" - networks: - - chronos - depends_on: - - zookeeper + # kafka: + # image: bitnami/kafka:2.8.0 + # ports: + # - 9092:9092 + # - 9093:9093 + # - 9094:9094 + # volumes: + # - kafka:/bitnami/kafka + # - ./infra:/opt/infra + # environment: + # KAFKA_BROKER_ID: "1" + # KAFKA_CFG_LISTENERS: "INTERNAL://:9092, EXTERNAL://:9093, K8S://:9094" + # KAFKA_CFG_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092, EXTERNAL://localhost:9093" + # KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT, K8S:PLAINTEXT" + # KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181" + # KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL" + # KAFKA_INTER_BROKER_USER: "admin" + # KAFKA_INTER_BROKER_PASSWORD: "admin-secret" + # KAFKA_CFG_NUM_PARTITIONS: "1" + # KAFKA_LOG_RETENTION_BYTES: -1 + # KAFKA_LOG_RETENTION_MS: -1 + # KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" + # KAFKA_CFG_SUPER_USERS: "User:admin" + # KAFKA_CLIENT_USERS: "admin,kafdrop" + # KAFKA_CLIENT_PASSWORDS: "admin-secret,admin-secret" + # ALLOW_PLAINTEXT_LISTENER: "yes" + # networks: + # - chronos + # depends_on: + # - zookeeper - kowl: - image: quay.io/cloudhut/kowl:master - ports: - - 9091:8080 - environment: - KAFKA_BROKERS: "kafka:9092" - networks: - - chronos - depends_on: - - kafka - - zookeeper + # kowl: + # image: quay.io/cloudhut/kowl:master + # ports: + # - 9091:8080 + # environment: + # KAFKA_BROKERS: "kafka:9092" + # networks: + # - chronos + # depends_on: + # - kafka + # - zookeeper # chronos # chronos-delay-scheduler: diff --git a/infra/otelcol-config.yml b/infra/otelcol-config.yml index bf3193e..41af551 100644 --- a/infra/otelcol-config.yml +++ b/infra/otelcol-config.yml @@ -41,11 +41,11 @@ extensions: health_check: pprof: endpoint: :1888 - zpages: - endpoint: :55679 + # zpages: + # endpoint: :55679 service: - extensions: [pprof, zpages, health_check] + extensions: [pprof, health_check] pipelines: traces: receivers: [otlp] From d4ce18ba3df8ef81abc00884fbb00fb64b0a4e4f Mon Sep 17 00:00:00 2001 From: Amninder Kaur Date: Wed, 25 Oct 2023 17:38:03 +1100 Subject: [PATCH 4/4] fix: updates to fail fast and capture the right ids --- chronos_bin/src/message_processor.rs | 21 ++++++--------------- chronos_bin/src/postgres/pg.rs | 2 +- chronos_bin/src/runner.rs | 18 ++++++++---------- 3 files changed, 15 insertions(+), 26 deletions(-) diff --git a/chronos_bin/src/message_processor.rs b/chronos_bin/src/message_processor.rs index 8a4ced7..9e03172 100644 --- a/chronos_bin/src/message_processor.rs +++ b/chronos_bin/src/message_processor.rs @@ -30,16 +30,6 @@ impl MessageProcessor { node_id } - fn gather_ids(result: Result) -> String { - match result { - Ok(m) => m, - Err(e) => { - log::error!("Error: delayed message publish failed {:?}", e); - "".to_string() - } - } - } - #[tracing::instrument(skip_all, fields(correlationId))] async fn prepare_to_publish(&self, row: Row) -> Result { let updated_row = TableRow { @@ -54,8 +44,8 @@ impl MessageProcessor { let mut headers: HashMap = match serde_json::from_str(&updated_row.message_headers.to_string()) { Ok(t) => t, Err(_e) => { - println!("error occurred while parsing"); - HashMap::new() + log::error!("error occurred while parsing"); + return Err("error occurred while parsing headers field".to_string()); } }; @@ -75,6 +65,7 @@ impl MessageProcessor { Err("error occurred while publishing".to_string()) } } + None => { log::error!("Error: readied_by not found in db row {:?}", updated_row); Err("error occurred while publishing".to_string()) @@ -89,7 +80,6 @@ impl MessageProcessor { let mut retry_count = 0; while let Err(outcome_error) = &self.data_store.delete_fired(ids).await { log::error!("Error: error occurred in message processor {}", outcome_error); - log::debug!("retrying"); retry_count += 1; if retry_count == max_retry_count { log::error!("Error: max retry count {} reached by node {:?} for deleting fired ids ", max_retry_count, ids); @@ -128,7 +118,9 @@ impl MessageProcessor { let results = futures::future::join_all(publish_futures).await; - let ids: Vec = results.into_iter().map(Self::gather_ids).collect(); + // closure to gather ids from results vector and ignore error from result + + let ids: Vec = results.into_iter().filter_map(|result| result.ok()).collect(); if !ids.is_empty() { let _ = self.delete_fired_records_from_db(&ids).await; @@ -140,7 +132,6 @@ impl MessageProcessor { Err(e) => { if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count { //retry goes here - log::debug!("retrying"); retry_count += 1; if retry_count == max_retry_count { log::error!("Error: max retry count {} reached by node {:?} for row ", max_retry_count, readied_by_column); diff --git a/chronos_bin/src/postgres/pg.rs b/chronos_bin/src/postgres/pg.rs index 34318d0..9e39462 100644 --- a/chronos_bin/src/postgres/pg.rs +++ b/chronos_bin/src/postgres/pg.rs @@ -82,7 +82,7 @@ impl PgAccess { impl Pg { pub async fn new(pg_config: PgConfig) -> Result { - let port = pg_config.port.parse::().unwrap_or(0); // make the connection fail and send pack PgError + let port = pg_config.port.parse::().unwrap(); // make the connection fail let mut config = Config::new(); config.dbname = Some(pg_config.database); config.user = Some(pg_config.user); diff --git a/chronos_bin/src/runner.rs b/chronos_bin/src/runner.rs index 563382a..7431ec7 100644 --- a/chronos_bin/src/runner.rs +++ b/chronos_bin/src/runner.rs @@ -60,15 +60,13 @@ impl Runner { log::error!("error while writing to healthcheck file {:?}", write_resp); } } - message_receiver_handler.await.unwrap(); - // futures::future::join(monitor_handler); - // let future_tuple = futures::future::try_join3(monitor_handler, message_processor_handler, message_receiver_handler).await; - // if future_tuple.is_err() { - // log::error!("Chronos Stopping all threads {:?}", future_tuple); - // let write_resp = write(&healthcheck_file, b"0"); - // if write_resp.is_err() { - // log::error!("error while writing to healthcheck file {:?}", write_resp); - // } - // } + let future_tuple = futures::future::try_join3(monitor_handler, message_processor_handler, message_receiver_handler).await; + if future_tuple.is_err() { + log::error!("Chronos Stopping all threads {:?}", future_tuple); + let write_resp = write(&healthcheck_file, b"0"); + if write_resp.is_err() { + log::error!("error while writing to healthcheck file {:?}", write_resp); + } + } } }