feat: Add abort reason to CapturedState. (#71)
* feat: Add abort reason to CapturedState.

* feat: Make re-usable KafkaConfig structure. (#72)
fmarek-kindred authored Aug 22, 2023
1 parent a447b57 commit c802f0c
Showing 38 changed files with 829 additions and 715 deletions.
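The second change bundled into this commit replaces the agent-local KafkaConfig with the shared one from the new talos_rdkafka_utils package. As a reading aid before the per-file diffs, here is a minimal sketch of the usage pattern the examples switch to, assuming only the two calls visible in this diff (KafkaConfig::from_env and KafkaConfig::extend); the struct definition itself is not among the rendered files.

use std::collections::HashMap;
use talos_rdkafka_utils::kafka_config::KafkaConfig;

fn build_kafka_config() -> KafkaConfig {
    // Load the shared config from environment variables; the agent example
    // below passes Some("AGENT") as a prefix, the certifier example passes None.
    let mut cfg_kafka = KafkaConfig::from_env(Some("AGENT"));

    // Layer example-specific librdkafka overrides on top of the defaults,
    // as examples/agent_client does in load_configs() further down.
    let producer_overrides = HashMap::from([("message.timeout.ms".to_string(), "15000".to_string())]);
    let consumer_overrides = HashMap::from([("fetch.wait.max.ms".to_string(), "600".to_string())]);
    cfg_kafka.extend(Some(producer_overrides), Some(consumer_overrides));

    cfg_kafka
}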
352 changes: 224 additions & 128 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions examples/agent_client/Cargo.toml
@@ -5,19 +5,20 @@ edition = "2021"

[dev-dependencies]

talos_agent = { path = "../../packages/talos_agent" }
examples_support = { path = "../../packages/examples_support" }
talos_agent = { path = "../../packages/talos_agent" }
talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" }
examples_support = { path = "../../packages/examples_support" }

async-channel = { version = "1.8.0" }
async-trait = { workspace = true }
env_logger = { workspace = true }
log = { workspace = true }
rand = { version = "0.8.5" }
rdkafka = { version = "0.29.0", features = ["sasl"] }
rdkafka = { version = "0.33.2", features = ["sasl"] }
rdkafka-sys = { version = "4.3.0" }
serde = { workspace = true }
serde_json = { workspace = true }
strum = { version = "0.24", features = ["derive"] }
strum = { version = "0.25", features = ["derive"] }
tokio = { workspace = true, features = ["full"] }
uuid = { version = "1.2.2", features = ["v4"] }
time = { version = "0.3.17" }
124 changes: 31 additions & 93 deletions examples/agent_client/examples/agent_client.rs
@@ -1,20 +1,19 @@
use async_channel::Receiver;
use examples_support::load_generator::generator::ControlledRateLoadGenerator;
use examples_support::load_generator::models::{Generator, StopType};
use std::num::ParseIntError;
use std::{env, sync::Arc, time::Duration};

use rdkafka::config::RDKafkaLogLevel;
use std::collections::HashMap;
use std::env::{var, VarError};
use std::{env, sync::Arc, time::Duration};
use talos_agent::agent::core::TalosAgentImpl;
use talos_agent::agent::model::{CancelRequestChannelMessage, CertifyRequestChannelMessage};
use talos_agent::api::{AgentConfig, CandidateData, CertificationRequest, CertificationResponse, KafkaConfig, TalosAgent, TalosType};
use talos_agent::api::{AgentConfig, CandidateData, CertificationRequest, CertificationResponse, TalosAgent, TalosType};
use talos_agent::messaging::api::DecisionMessage;
use talos_agent::messaging::kafka::KafkaInitializer;
use talos_agent::metrics::client::MetricsClient;
use talos_agent::metrics::core::Metrics;
use talos_agent::metrics::model::Signal;
use talos_agent::mpsc::core::{ReceiverWrapper, SenderWrapper};
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use time::OffsetDateTime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
@@ -23,8 +22,6 @@ use uuid::Uuid;

#[derive(Clone)]
struct LaunchParams {
stop_max_empty_checks: u64,
stop_check_delay: Duration,
stop_type: StopType,
target_rate: u64,
threads: u64,
@@ -51,7 +48,7 @@ async fn certify() -> Result<(), String> {
// give this to stop controller
let rx_generated_ref = Arc::clone(&rx_generated);

let h_stop_controller: JoinHandle<Result<(), String>> = create_stop_controller(params.clone(), rx_generated_ref);
let h_monitor: JoinHandle<Result<(), String>> = create_queue_monitor(rx_generated_ref);

let h_agent_workers = init_workers(params.clone(), rx_generated);

@@ -61,15 +58,11 @@ async fn certify() -> Result<(), String> {
});

let all_async_services = tokio::spawn(async move {
let result = try_join!(h_workload_generator, h_agent_workers);
let result = try_join!(h_workload_generator, h_agent_workers, h_monitor);
log::info!("Result from the services ={result:?}");
});

tokio::select! {
_ = h_stop_controller => {
log::info!("Stop controller is active...");
}

_ = all_async_services => {}

// CTRL + C termination signal
@@ -132,24 +125,6 @@ fn init_workers(params: LaunchParams, queue: Arc<Receiver<(CertificationRequest,
})
}

fn get_kafka_log_level_from_env() -> Result<RDKafkaLogLevel, String> {
match read_var("KAFKA_LOG_LEVEL") {
Ok(level) => match level.to_lowercase().as_str() {
"alert" => Ok(RDKafkaLogLevel::Alert),
"critical" => Ok(RDKafkaLogLevel::Critical),
"debug" => Ok(RDKafkaLogLevel::Debug),
"emerg" => Ok(RDKafkaLogLevel::Emerg),
"error" => Ok(RDKafkaLogLevel::Error),
"info" => Ok(RDKafkaLogLevel::Info),
"notice" => Ok(RDKafkaLogLevel::Notice),
"warning" => Ok(RDKafkaLogLevel::Warning),
_ => Ok(RDKafkaLogLevel::Info),
},

Err(e) => Err(e),
}
}

fn load_configs() -> Result<(AgentConfig, KafkaConfig), String> {
let cfg_agent = AgentConfig {
agent: read_var("AGENT_NAME").unwrap(),
@@ -158,42 +133,28 @@ fn load_configs() -> Result<(AgentConfig, KafkaConfig), String> {
timeout_ms: read_var("AGENT_TIMEOUT_MS").unwrap().parse().unwrap(),
};

let cfg_kafka = KafkaConfig {
brokers: read_var("KAFKA_BROKERS")?,
group_id: read_var("KAFKA_GROUP_ID")?,
certification_topic: read_var("KAFKA_TOPIC")?,
fetch_wait_max_ms: read_var("KAFKA_FETCH_WAIT_MAX_MS")?.parse().map_err(|e: ParseIntError| e.to_string())?,
message_timeout_ms: read_var("KAFKA_MESSAGE_TIMEOUT_MS")?.parse().map_err(|e: ParseIntError| e.to_string())?,
enqueue_timeout_ms: read_var("KAFKA_ENQUEUE_TIMEOUT_MS")?.parse().map_err(|e: ParseIntError| e.to_string())?,
log_level: get_kafka_log_level_from_env()?,
talos_type: TalosType::External,
sasl_mechanisms: read_var_optional("KAFKA_SASL_MECHANISMS")?,
username: read_var_optional("KAFKA_USERNAME")?,
password: read_var_optional("KAFKA_PASSWORD")?,
};
let mut cfg_kafka = KafkaConfig::from_env(Some("AGENT"));
let more_producer_values = [
("message.timeout.ms".to_string(), "15000".to_string()),
("queue.buffering.max.messages".to_string(), "1000000".to_string()),
("topic.metadata.refresh.interval.ms".to_string(), "5".to_string()),
("socket.keepalive.enable".to_string(), "true".to_string()),
("acks".to_string(), "0".to_string()),
];

let more_consumer_values = [
("enable.auto.commit".to_string(), "false".to_string()),
("auto.offset.reset".to_string(), "latest".to_string()),
("fetch.wait.max.ms".to_string(), "600".to_string()),
("socket.keepalive.enable".to_string(), "true".to_string()),
("acks".to_string(), "0".to_string()),
];

cfg_kafka.extend(Some(HashMap::from(more_producer_values)), Some(HashMap::from(more_consumer_values)));

Ok((cfg_agent, cfg_kafka))
}

fn read_var_optional(name: &str) -> Result<Option<String>, String> {
match var(name) {
Ok(value) => {
if value.is_empty() {
Ok(None)
} else {
Ok(Some(value.trim().to_string()))
}
}
Err(e) => match e {
VarError::NotPresent => {
log::info!("Environment variable is not found: \"{}\"", name);
Ok(None)
}
VarError::NotUnicode(_) => Err(format!("Environment variable is not unique: \"{}\"", name)),
},
}
}

fn read_var(name: &str) -> Result<String, String> {
match var(name) {
Ok(value) => {
@@ -225,7 +186,7 @@ async fn make_agent(params: LaunchParams) -> impl TalosAgent {
let tx_cancel = SenderWrapper::<CancelRequestChannelMessage> { tx: tx_cancel_ch };
let rx_cancel = ReceiverWrapper::<CancelRequestChannelMessage> { rx: rx_cancel_ch };

let (publisher, consumer) = KafkaInitializer::connect(cfg_agent.agent.clone(), cfg_kafka)
let (publisher, consumer) = KafkaInitializer::connect(cfg_agent.agent.clone(), cfg_kafka, TalosType::External)
.await
.expect("Cannot connect to kafka...");

@@ -264,30 +225,17 @@ async fn make_agent(params: LaunchParams) -> impl TalosAgent {
agent
}

fn create_stop_controller(params: LaunchParams, queue: Arc<Receiver<(CertificationRequest, f64)>>) -> JoinHandle<Result<(), String>> {
fn create_queue_monitor(queue: Arc<Receiver<(CertificationRequest, f64)>>) -> JoinHandle<Result<(), String>> {
tokio::spawn(async move {
let mut remaining_checks = params.stop_max_empty_checks;
loop {
tokio::time::sleep(params.stop_check_delay).await;
tokio::time::sleep(Duration::from_secs(10)).await;
if queue.is_empty() {
log::info!(
"There are no more items to process, finalising in {} sec",
remaining_checks * params.stop_check_delay.as_secs()
);
remaining_checks -= 1;
if remaining_checks == 0 {
break;
}
} else {
let len = queue.len();
log::info!("Items remaining to process: {}", len);
remaining_checks = params.stop_max_empty_checks;
continue;
}
}

queue.close();

Err("Signal from StopController".into())
let len = queue.len();
log::info!("Items remaining to process: {}", len);
}
})
}

Expand All @@ -296,8 +244,6 @@ async fn get_params() -> Result<LaunchParams, String> {
let mut threads: Option<u64> = Some(1);
let mut target_rate: Option<u64> = None;
let mut stop_type: Option<StopType> = None;
let mut stop_max_empty_checks: Option<u64> = Some(5);
let mut stop_check_delay: Option<u64> = Some(5);
let mut collect_metrics: Option<bool> = Some(true);

if args.len() >= 3 {
@@ -322,12 +268,6 @@
let count: u64 = param_value.parse().unwrap();
stop_type = Some(StopType::LimitGeneratedTransactions { count })
}
} else if param_name.eq("--stop-controller-max-empty-checks") {
let param_value = &args[i + 1];
stop_max_empty_checks = Some(param_value.parse().unwrap());
} else if param_name.eq("--stop-controller-delay") {
let param_value = &args[i + 1];
stop_check_delay = Some(param_value.parse().unwrap());
} else if param_name.eq("--no-metrics") {
collect_metrics = Some(false)
}
@@ -344,8 +284,6 @@
target_rate: target_rate.unwrap(),
stop_type: stop_type.unwrap(),
threads: threads.unwrap(),
stop_max_empty_checks: stop_max_empty_checks.unwrap(),
stop_check_delay: Duration::from_secs(stop_check_delay.unwrap()),
collect_metrics: collect_metrics.unwrap(),
})
}
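The fields carried by the shared config can be read off the cohort_banking_with_sdk example further down. A sketch of the structure under that assumption follows (field names and literal types inferred from this diff only; the authoritative definition lives in packages/talos_rdkafka_utils and is not rendered on this page).

use std::collections::HashMap;

// Inferred sketch only; the real struct may carry extra fields or use
// different integer widths.
pub struct KafkaConfig {
    pub brokers: Vec<String>,
    pub topic: String,
    pub client_id: String,
    pub group_id: String,
    pub username: String,
    pub password: String,
    pub producer_config_overrides: HashMap<String, String>,
    pub consumer_config_overrides: HashMap<String, String>,
    pub producer_send_timeout_ms: Option<u32>,
    pub log_level: Option<String>,
}

impl KafkaConfig {
    // Merge optional producer/consumer overrides into the maps loaded from
    // the environment, matching the call made by load_configs() above.
    pub fn extend(
        &mut self,
        producer: Option<HashMap<String, String>>,
        consumer: Option<HashMap<String, String>>,
    ) {
        if let Some(overrides) = producer {
            self.producer_config_overrides.extend(overrides);
        }
        if let Some(overrides) = consumer {
            self.consumer_config_overrides.extend(overrides);
        }
    }
}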
2 changes: 2 additions & 0 deletions examples/certifier_kafka_pg/Cargo.toml
@@ -21,6 +21,8 @@ logger = { path = "../../packages/logger" }
talos_certifier = { path = "../../packages/talos_certifier" }
talos_suffix = { path = "../../packages/talos_suffix" }
talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters" }
talos_common_utils = { path = "../../packages/talos_common_utils" }
talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" }

[dev-dependencies.cargo-husky]
version = "1"
7 changes: 4 additions & 3 deletions examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs
@@ -1,6 +1,7 @@
use log::{error, info};
use talos_certifier::env_var;
use talos_certifier_adapters::{certifier_with_kafka_pg, Configuration, KafkaConfig, PgConfig, TalosCertifierChannelBuffers};
use talos_certifier_adapters::{certifier_with_kafka_pg, Configuration, PgConfig, TalosCertifierChannelBuffers};
use talos_common_utils::env_var;
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use talos_suffix::core::SuffixConfig;
use tokio::signal;

@@ -17,7 +18,7 @@ async fn main() -> Result<(), impl std::error::Error> {

info!("Talos certifier starting...");

let kafka_config = KafkaConfig::from_env();
let kafka_config = KafkaConfig::from_env(None);
let pg_config = PgConfig::from_env();
let mock_config = get_mock_config();
let suffix_config = Some(SuffixConfig {
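The only call-site difference for the certifier is that from_env now takes an optional prefix: None keeps the plain KAFKA_* variables, while the agent example above passes Some("AGENT"). How the prefix is applied is not shown in this commit; one plausible resolution strategy, purely as an assumption, is a prefixed lookup with a fallback, along these lines.

use std::env;

// Hypothetical helper, not taken from this commit: with a prefix such as
// "AGENT", try AGENT_KAFKA_BROKERS first and fall back to KAFKA_BROKERS;
// with None, read the plain variable name only.
fn prefixed_var(prefix: Option<&str>, name: &str) -> Option<String> {
    if let Some(p) = prefix {
        if let Ok(value) = env::var(format!("{}_{}", p, name)) {
            return Some(value);
        }
    }
    env::var(name).ok()
}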
2 changes: 1 addition & 1 deletion examples/cohort_banking_with_sdk/Cargo.toml
@@ -12,7 +12,7 @@ metrics = { path = "../../packages/metrics" }
talos_agent = { path = "../../packages/talos_agent" }
talos_certifier = { path = "../../packages/talos_certifier" }
talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters" }
talos_suffix = { path = "../../packages/talos_suffix" }
talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" }

async-trait = { workspace = true }
env_logger = { workspace = true }
@@ -1,6 +1,7 @@
use std::{collections::HashMap, env, sync::Arc, time::Duration};

use async_channel::Receiver;
use cohort_banking::state::postgres::database_config::DatabaseConfig;
use cohort_banking::{app::BankingApp, examples_support::queue_processor::QueueProcessor, model::requests::TransferRequest};
use cohort_sdk::model::{BackoffConfig, Config};
use examples_support::load_generator::models::Generator;
@@ -17,6 +18,7 @@ use opentelemetry_sdk::runtime;
use opentelemetry_stdout::MetricsExporterBuilder;
use rand::Rng;
use rust_decimal::prelude::FromPrimitive;
use talos_rdkafka_utils::kafka_config::KafkaConfig;
use tokio::{signal, task::JoinHandle, try_join};

use opentelemetry::global::shutdown_tracer_provider;
@@ -68,7 +70,7 @@ async fn main() -> Result<(), String> {
let generator = ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate, generator_impl, Arc::new(tx_queue));
let h_generator = tokio::spawn(generator);

let config = Config {
let sdk_config = Config {
//
// cohort configs
//
@@ -89,37 +91,30 @@ async fn main() -> Result<(), String> {
buffer_size: 10_000_000,
timeout_ms: 600_000,

//
// Common to kafka configs values
//
brokers: "127.0.0.1:9092".into(),
topic: "dev.ksp.certification".into(),
sasl_mechanisms: None,
kafka_username: None,
kafka_password: None,

//
// Kafka configs for Agent
//
// Must be unique for each agent instance. Can be the same as AgentConfig.agent_id
agent_group_id: "cohort-banking".into(),
agent_fetch_wait_max_ms: 6000,
// The maximum time librdkafka may use to deliver a message (including retries)
agent_message_timeout_ms: 15000,
// Controls how long to wait until message is successfully placed on the librdkafka producer queue (including retries).
agent_enqueue_timeout_ms: 10,
// should be mapped to rdkafka::config::RDKafkaLogLevel
agent_log_level: 6,
kafka: KafkaConfig {
brokers: vec!["127.0.0.1:9092".to_string()],
topic: "dev.ksp.certification".into(),
client_id: "cohort-banking".into(),
// Must be unique for each agent instance. Can be the same as AgentConfig.agent_id
group_id: "cohort-banking".into(),
username: "".into(),
password: "".into(),
// The maximum time librdkafka may use to deliver a message (including retries)
producer_config_overrides: HashMap::from([("message.timeout.ms".into(), "15000".into())]),
consumer_config_overrides: HashMap::from([("fetch.wait.max.ms".into(), "6000".into())]),
// consumer_config_overrides: HashMap::new(),
producer_send_timeout_ms: Some(10),
log_level: Some("info".into()),
},
};

//
// Database config
//
db_pool_size: 100,
db_user: "postgres".into(),
db_password: "admin".into(),
db_host: "127.0.0.1".into(),
db_port: "5432".into(),
db_database: "talos-sample-cohort-dev".into(),
let db_config = DatabaseConfig {
pool_size: 100,
user: "postgres".into(),
password: "admin".into(),
host: "127.0.0.1".into(),
port: "5432".into(),
database: "talos-sample-cohort-dev".into(),
};

let printer = MetricsToStringPrinter::new(params.threads, params.metric_print_raw, ScalingConfig { ratios: params.scaling_config });
@@ -144,7 +139,7 @@ async fn main() -> Result<(), String> {
let meter = Arc::new(meter);

let h_cohort = tokio::spawn(async move {
let mut banking_app = BankingApp::new(config).await.unwrap();
let mut banking_app = BankingApp::new(sdk_config, db_config).await.unwrap();
let _ = banking_app.init().await;
let tasks = QueueProcessor::process::<TransferRequest, BankingApp>(rx_queue, meter, params.threads, Arc::new(banking_app)).await;

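Two things change in this example: the flat database fields on the SDK Config move into cohort_banking's own DatabaseConfig, and BankingApp::new now takes that config alongside the SDK config. A condensed sketch of the new call site, assuming only the names visible in the diff above, looks like this.

use cohort_banking::app::BankingApp;
use cohort_banking::state::postgres::database_config::DatabaseConfig;
use cohort_sdk::model::Config;

// Mirrors the construction in main() above: the SDK config keeps the agent
// and kafka settings, while the database settings travel separately.
async fn start_banking_app(sdk_config: Config) {
    let db_config = DatabaseConfig {
        pool_size: 100,
        user: "postgres".into(),
        password: "admin".into(),
        host: "127.0.0.1".into(),
        port: "5432".into(),
        database: "talos-sample-cohort-dev".into(),
    };

    let mut banking_app = BankingApp::new(sdk_config, db_config).await.unwrap();
    let _ = banking_app.init().await;
}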
2 changes: 2 additions & 0 deletions examples/cohort_replicator_kafka_pg/Cargo.toml
@@ -10,6 +10,8 @@ cohort_banking = { path = "../../packages/cohort_banking" }
talos_cohort_replicator = { path = "../../packages/talos_cohort_replicator" }
talos_certifier = { path = "../../packages/talos_certifier" }
talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters" }
talos_common_utils = { path = "../../packages/talos_common_utils" }
talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" }

async-trait = { workspace = true }
env_logger = { workspace = true }
The remaining changed files are not rendered on this page.
