From c802f0cc2863f1aa4f6ab840209a2270a592158a Mon Sep 17 00:00:00 2001 From: fmarek-kindred <123923685+fmarek-kindred@users.noreply.github.com> Date: Tue, 22 Aug 2023 12:46:45 +1000 Subject: [PATCH] feat: Add abort reason to CapturedState. (#71) * feat: Add abort reason to CapturedState. * feat: Make re-usable KafkaConfig structure. (#72) --- Cargo.lock | 352 +++++++++++------ examples/agent_client/Cargo.toml | 9 +- .../agent_client/examples/agent_client.rs | 124 ++---- examples/certifier_kafka_pg/Cargo.toml | 2 + .../examples/certifier_kafka_pg.rs | 7 +- examples/cohort_banking_with_sdk/Cargo.toml | 2 +- .../examples/cohort_banking_with_sdk.rs | 59 ++- .../cohort_replicator_kafka_pg/Cargo.toml | 2 + .../examples/cohort_replicator_kafka_pg.rs | 8 +- packages/cohort_banking/src/app.rs | 11 +- .../src/callbacks/state_provider.rs | 2 + packages/cohort_sdk/Cargo.toml | 1 + packages/cohort_sdk/src/cohort.rs | 23 +- packages/cohort_sdk/src/model/callbacks.rs | 1 + packages/cohort_sdk/src/model/internal.rs | 1 + packages/cohort_sdk/src/model/mod.rs | 76 ++-- packages/talos_agent/Cargo.toml | 8 +- .../talos_agent/src/agent/decision_reader.rs | 2 + packages/talos_agent/src/api.rs | 35 -- packages/talos_agent/src/messaging/api.rs | 3 +- packages/talos_agent/src/messaging/kafka.rs | 120 ++---- packages/talos_certifier/src/lib.rs | 1 - packages/talos_certifier_adapters/Cargo.toml | 14 +- .../histogram_decision_timeline_from_kafka.rs | 16 +- .../src/certifier_kafka_pg.rs | 2 +- .../src/kafka/config.rs | 231 ----------- .../src/kafka/consumer.rs | 3 +- .../src/kafka/kafka_deploy.rs | 4 +- .../talos_certifier_adapters/src/kafka/mod.rs | 1 - .../src/kafka/producer.rs | 3 +- packages/talos_certifier_adapters/src/lib.rs | 1 - .../src/postgres/config.rs | 2 +- packages/talos_common_utils/Cargo.toml | 16 + .../utils => talos_common_utils/src}/env.rs | 9 +- .../mod.rs => talos_common_utils/src/lib.rs} | 0 packages/talos_rdkafka_utils/Cargo.toml | 29 ++ .../talos_rdkafka_utils/src/kafka_config.rs | 363 ++++++++++++++++++ packages/talos_rdkafka_utils/src/lib.rs | 1 + 38 files changed, 829 insertions(+), 715 deletions(-) delete mode 100644 packages/talos_certifier_adapters/src/kafka/config.rs create mode 100644 packages/talos_common_utils/Cargo.toml rename packages/{talos_certifier/src/utils => talos_common_utils/src}/env.rs (98%) rename packages/{talos_certifier/src/utils/mod.rs => talos_common_utils/src/lib.rs} (100%) create mode 100644 packages/talos_rdkafka_utils/Cargo.toml create mode 100644 packages/talos_rdkafka_utils/src/kafka_config.rs create mode 100644 packages/talos_rdkafka_utils/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 61f881b2..088e25f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,13 +27,14 @@ dependencies = [ "examples_support", "log", "rand", - "rdkafka 0.29.0", + "rdkafka 0.33.2", "rdkafka-sys", "serde", "serde_json", - "strum 0.24.1", + "strum 0.25.0", "talos_agent", - "time 0.3.24", + "talos_rdkafka_utils", + "time 0.3.26", "tokio", "uuid", ] @@ -63,9 +64,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.2" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" +checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a" dependencies = [ "memchr", ] @@ -91,6 +92,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" 
+[[package]] +name = "anstyle" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a30da5c5f2d5e72842e00bcb57657162cdabef0931f40e2deb9b4140440cecd" + [[package]] name = "arrayvec" version = "0.7.4" @@ -127,18 +134,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] name = "async-trait" -version = "0.1.72" +version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -187,9 +194,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.3.3" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630be753d4e58660abd17930c71b647fe46c27ea6b63cc59e1e3851406972e42" +checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" [[package]] name = "bitvec" @@ -311,9 +318,12 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.79" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] [[package]] name = "certifier_kafka_pg" @@ -326,6 +336,8 @@ dependencies = [ "refinery", "talos_certifier", "talos_certifier_adapters", + "talos_common_utils", + "talos_rdkafka_utils", "talos_suffix", "tokio", ] @@ -386,11 +398,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "bitflags 1.3.2", - "clap_lex", + "clap_lex 0.2.4", "indexmap 1.9.3", "textwrap", ] +[[package]] +name = "clap" +version = "4.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03aef18ddf7d879c15ce20f04826ef8418101c7e528014c3eeea13321047dca3" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ce6fffb678c9b80a70b6b6de0aad31df727623a70fd9a842c30cd573e2fa98" +dependencies = [ + "anstyle", + "clap_lex 0.5.0", +] + [[package]] name = "clap_lex" version = "0.2.4" @@ -400,6 +431,12 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "clap_lex" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" + [[package]] name = "cohort_banking" version = "0.0.1" @@ -457,8 +494,8 @@ dependencies = [ "talos_agent", "talos_certifier", "talos_certifier_adapters", - "talos_suffix", - "time 0.3.24", + "talos_rdkafka_utils", + "time 0.3.26", "tokio", "tokio-postgres", "uuid", @@ -480,7 +517,9 @@ dependencies = [ "talos_certifier", "talos_certifier_adapters", "talos_cohort_replicator", - "time 0.3.24", + "talos_common_utils", + "talos_rdkafka_utils", + "time 0.3.26", "tokio", "tokio-postgres", "uuid", @@ -506,6 +545,7 @@ dependencies = [ "strum 0.25.0", "talos_agent", "talos_certifier", + "talos_rdkafka_utils", "tokio", 
"uuid", ] @@ -544,7 +584,7 @@ dependencies = [ "atty", "cast", "ciborium", - "clap", + "clap 3.2.25", "criterion-plot", "itertools", "lazy_static", @@ -560,6 +600,32 @@ dependencies = [ "walkdir", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap 4.3.23", + "criterion-plot", + "is-terminal", + "itertools", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + [[package]] name = "criterion-plot" version = "0.5.0" @@ -672,9 +738,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.6" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8810e7e2cf385b1e9b50d68264908ec367ba642c96d02edfe61c39e88e2a3c01" +checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" [[package]] name = "difflib" @@ -738,9 +804,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erased-serde" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da96524cc884f6558f1769b6c46686af2fe8e8b4cd253bd5a3cdba8181b8e070" +checksum = "fc978899517288e3ebbd1a3bfc1d9537dbb87eeab149e53ea490e63bcdff561a" dependencies = [ "serde", ] @@ -786,7 +852,7 @@ dependencies = [ "rust_decimal", "strum 0.24.1", "thiserror", - "time 0.3.24", + "time 0.3.26", "tokio", "tokio-postgres", "uuid", @@ -884,7 +950,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -1125,9 +1191,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" +checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" [[package]] name = "lock_api" @@ -1141,9 +1207,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.19" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" dependencies = [ "serde", "value-bag", @@ -1198,7 +1264,7 @@ dependencies = [ "opentelemetry_sdk", "serde", "serde_json", - "time 0.3.24", + "time 0.3.26", ] [[package]] @@ -1250,9 +1316,9 @@ dependencies = [ [[package]] name = "multimap" -version = "0.8.3" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +checksum = "70db9248a93dc36a36d9a47898caa007a32755c7ad140ec64eeeb50d5a730631" dependencies = [ "serde", ] @@ -1326,9 +1392,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "openssl-sys" -version = "0.9.90" +version = "0.9.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6" +checksum = "866b5f16f90776b9bb8dc1e1802ac6f0513de3a7a7465867bfbc563dc737faac" dependencies = [ "cc", "libc", @@ -1400,9 +1466,9 @@ 
dependencies = [ [[package]] name = "ordered-float" -version = "3.7.0" +version = "3.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213" +checksum = "126d3e6f3926bfb0fb24495b4f4da50626f547e54956594748e3d8882a0320b4" dependencies = [ "num-traits", ] @@ -1472,9 +1538,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" [[package]] name = "pin-utils" @@ -1518,9 +1584,9 @@ dependencies = [ [[package]] name = "postgres" -version = "0.19.5" +version = "0.19.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bed5017bc2ff49649c0075d0d7a9d676933c1292480c1d137776fb205b5cd18" +checksum = "2843af30d9b6dccec9f8779eb50d56cfbec54b38c7b1bbaf50b37d6c8a4f8959" dependencies = [ "bytes", "fallible-iterator", @@ -1532,9 +1598,9 @@ dependencies = [ [[package]] name = "postgres-protocol" -version = "0.6.5" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78b7fa9f396f51dffd61546fd8573ee20592287996568e6175ceb0f8699ad75d" +checksum = "49b6c5ef183cd3ab4ba005f1ca64c21e8bd97ce4699cfea9e8d9a2c4958ca520" dependencies = [ "base64", "byteorder", @@ -1550,9 +1616,9 @@ dependencies = [ [[package]] name = "postgres-types" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6" +checksum = "8d2234cdee9408b523530a9b6d2d6b373d1db34f6a8e51dc03ded1828d7fb67c" dependencies = [ "bytes", "fallible-iterator", @@ -1648,9 +1714,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.32" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -1796,7 +1862,7 @@ dependencies = [ "serde", "siphasher", "thiserror", - "time 0.3.24", + "time 0.3.26", "tokio", "tokio-postgres", "toml 0.7.6", @@ -1814,14 +1880,14 @@ dependencies = [ "quote", "refinery-core", "regex", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] name = "regex" -version = "1.9.1" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ "aho-corasick", "memchr", @@ -1831,9 +1897,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" +checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ "aho-corasick", "memchr", @@ -1916,11 +1982,11 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.4" +version = "0.38.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" +checksum = 
"19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" dependencies = [ - "bitflags 2.3.3", + "bitflags 2.4.0", "errno", "libc", "linux-raw-sys", @@ -1974,22 +2040,22 @@ checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" [[package]] name = "serde" -version = "1.0.179" +version = "1.0.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a5bf42b8d227d4abf38a1ddb08602e229108a517cd4e5bb28f9c7eaafdce5c0" +checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.179" +version = "1.0.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "741e124f5485c7e60c03b043f79f320bff3527f4bbf12cf3831750dc46a0ec2c" +checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -2003,9 +2069,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.104" +version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c" +checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" dependencies = [ "itoa", "ryu", @@ -2023,9 +2089,9 @@ dependencies = [ [[package]] name = "serial_test" -version = "1.0.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "538c30747ae860d6fb88330addbbd3e0ddbe46d662d032855596d8a8ca260611" +checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d" dependencies = [ "dashmap", "futures", @@ -2037,13 +2103,13 @@ dependencies = [ [[package]] name = "serial_test_derive" -version = "1.0.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "079a83df15f85d89a68d64ae1238f142f172b1fa915d0d76b26a7cba1b659a69" +checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.29", ] [[package]] @@ -2103,16 +2169,6 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" -[[package]] -name = "socket2" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.3" @@ -2148,7 +2204,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros 0.25.1", + "strum_macros 0.25.2", ] [[package]] @@ -2166,15 +2222,15 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.25.1" +version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232" +checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" dependencies = [ "heck", "proc-macro2", "quote", "rustversion", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -2264,9 +2320,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.28" +version = "2.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"04361975b3f5e348b2189d8dc55bc942f278b2d482a6a0365de5bdd62d351567" +checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" dependencies = [ "proc-macro2", "quote", @@ -2282,13 +2338,14 @@ dependencies = [ "log", "mockall", "multimap", - "rdkafka 0.29.0", + "rdkafka 0.33.2", "rdkafka-sys", "serde", "serde_json", - "strum 0.24.1", + "strum 0.25.0", + "talos_rdkafka_utils", "thiserror", - "time 0.3.24", + "time 0.3.26", "tokio", "tokio-test", "uuid", @@ -2301,7 +2358,7 @@ dependencies = [ "ahash 0.8.3", "async-trait", "cargo-husky", - "criterion", + "criterion 0.4.0", "env_logger", "futures-util", "log", @@ -2311,7 +2368,7 @@ dependencies = [ "strum 0.24.1", "talos_suffix", "thiserror", - "time 0.3.24", + "time 0.3.26", "tokio", "tokio-test", ] @@ -2328,15 +2385,17 @@ dependencies = [ "logger", "metrics", "mockall", - "rdkafka 0.29.0", + "rdkafka 0.33.2", "refinery", "serde", "serde_json", "serial_test", "talos_certifier", + "talos_common_utils", + "talos_rdkafka_utils", "talos_suffix", "thiserror", - "time 0.3.24", + "time 0.3.26", "tokio", "tokio-postgres", "uuid", @@ -2358,7 +2417,33 @@ dependencies = [ "talos_certifier", "talos_suffix", "thiserror", - "time 0.3.24", + "time 0.3.26", + "tokio", + "tokio-test", +] + +[[package]] +name = "talos_common_utils" +version = "0.1.0" +dependencies = [ + "env_logger", + "log", + "serial_test", +] + +[[package]] +name = "talos_rdkafka_utils" +version = "0.1.0" +dependencies = [ + "async-trait", + "criterion 0.5.1", + "env_logger", + "log", + "mockall", + "rdkafka 0.33.2", + "serial_test", + "talos_common_utils", + "thiserror", "tokio", "tokio-test", ] @@ -2368,7 +2453,7 @@ name = "talos_suffix" version = "0.1.0" dependencies = [ "async-trait", - "criterion", + "criterion 0.4.0", "env_logger", "log", "mockall", @@ -2408,22 +2493,22 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.44" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90" +checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.44" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" +checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] @@ -2439,9 +2524,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.24" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b79eabcd964882a646b3584543ccabeae7869e9ac32a46f6f22b7a5bd405308b" +checksum = "a79d09ac6b08c1ab3906a2f7cc2e81a0e27c7ae89c63812df75e52bef0751e07" dependencies = [ "deranged", "itoa", @@ -2458,9 +2543,9 @@ checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb71511c991639bb078fd5bf97757e03914361c48100d52878b8e52b46fb92cd" +checksum = "75c65469ed6b3a4809d987a41eb1dc918e9bc1d92211cbad7ae82931846f7451" dependencies = [ "time-core", ] @@ -2492,11 +2577,10 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = 
"tokio" -version = "1.29.1" +version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ - "autocfg", "backtrace", "bytes", "libc", @@ -2505,7 +2589,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.4.9", + "socket2", "tokio-macros", "windows-sys", ] @@ -2518,14 +2602,14 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", ] [[package]] name = "tokio-postgres" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e89f6234aa8fd43779746012fcf53603cdb91fdd8399aa0de868c2d56b6dde1" +checksum = "000387915083ea6406ee44b50ca74813aba799fe682a7689e382bf9e13b74ce9" dependencies = [ "async-trait", "byteorder", @@ -2540,9 +2624,11 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "postgres-types", - "socket2 0.5.3", + "rand", + "socket2", "tokio", "tokio-util", + "whoami", ] [[package]] @@ -2790,7 +2876,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", "wasm-bindgen-shared", ] @@ -2812,7 +2898,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.28", + "syn 2.0.29", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2833,6 +2919,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "whoami" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" +dependencies = [ + "wasm-bindgen", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" @@ -2884,9 +2980,9 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.48.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ "windows_aarch64_gnullvm", "windows_aarch64_msvc", @@ -2899,51 +2995,51 @@ dependencies = [ [[package]] name = "windows_aarch64_gnullvm" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_i686_gnu" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" +checksum = 
"8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_x86_64_gnu" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnullvm" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_msvc" -version = "0.48.0" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.2" +version = "0.5.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bd122eb777186e60c3fdf765a58ac76e41c582f1f535fbf3314434c6b58f3f7" +checksum = "d09770118a7eb1ccaf4a594a221334119a44a814fcb0d31c5b85e83e97227a97" dependencies = [ "memchr", ] diff --git a/examples/agent_client/Cargo.toml b/examples/agent_client/Cargo.toml index 4b117068..c54a9d76 100644 --- a/examples/agent_client/Cargo.toml +++ b/examples/agent_client/Cargo.toml @@ -5,19 +5,20 @@ edition = "2021" [dev-dependencies] -talos_agent = { path = "../../packages/talos_agent" } -examples_support = { path = "../../packages/examples_support" } +talos_agent = { path = "../../packages/talos_agent" } +talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" } +examples_support = { path = "../../packages/examples_support" } async-channel = { version = "1.8.0" } async-trait = { workspace = true } env_logger = { workspace = true } log = { workspace = true } rand = { version = "0.8.5" } -rdkafka = { version = "0.29.0", features = ["sasl"] } +rdkafka = { version = "0.33.2", features = ["sasl"] } rdkafka-sys = { version = "4.3.0" } serde = { workspace = true } serde_json = { workspace = true } -strum = { version = "0.24", features = ["derive"] } +strum = { version = "0.25", features = ["derive"] } tokio = { workspace = true, features = ["full"] } uuid = { version = "1.2.2", features = ["v4"] } time = { version = "0.3.17" } diff --git a/examples/agent_client/examples/agent_client.rs b/examples/agent_client/examples/agent_client.rs index 814a411e..e2ec67b4 100644 --- a/examples/agent_client/examples/agent_client.rs +++ b/examples/agent_client/examples/agent_client.rs @@ -1,20 +1,19 @@ use async_channel::Receiver; use examples_support::load_generator::generator::ControlledRateLoadGenerator; use examples_support::load_generator::models::{Generator, StopType}; -use std::num::ParseIntError; -use std::{env, sync::Arc, time::Duration}; - -use rdkafka::config::RDKafkaLogLevel; +use std::collections::HashMap; use std::env::{var, VarError}; +use std::{env, sync::Arc, time::Duration}; use talos_agent::agent::core::TalosAgentImpl; use talos_agent::agent::model::{CancelRequestChannelMessage, CertifyRequestChannelMessage}; -use talos_agent::api::{AgentConfig, CandidateData, CertificationRequest, CertificationResponse, KafkaConfig, TalosAgent, TalosType}; +use talos_agent::api::{AgentConfig, CandidateData, CertificationRequest, CertificationResponse, TalosAgent, TalosType}; use 
talos_agent::messaging::api::DecisionMessage;
 use talos_agent::messaging::kafka::KafkaInitializer;
 use talos_agent::metrics::client::MetricsClient;
 use talos_agent::metrics::core::Metrics;
 use talos_agent::metrics::model::Signal;
 use talos_agent::mpsc::core::{ReceiverWrapper, SenderWrapper};
+use talos_rdkafka_utils::kafka_config::KafkaConfig;
 use time::OffsetDateTime;
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
@@ -23,8 +22,6 @@ use uuid::Uuid;

 #[derive(Clone)]
 struct LaunchParams {
-    stop_max_empty_checks: u64,
-    stop_check_delay: Duration,
     stop_type: StopType,
     target_rate: u64,
     threads: u64,
@@ -51,7 +48,7 @@ async fn certify() -> Result<(), String> {

     // give this to stop controller
     let rx_generated_ref = Arc::clone(&rx_generated);
-    let h_stop_controller: JoinHandle<Result<(), String>> = create_stop_controller(params.clone(), rx_generated_ref);
+    let h_monitor: JoinHandle<Result<(), String>> = create_queue_monitor(rx_generated_ref);

     let h_agent_workers = init_workers(params.clone(), rx_generated);

@@ -61,15 +58,11 @@ async fn certify() -> Result<(), String> {
     });

     let all_async_services = tokio::spawn(async move {
-        let result = try_join!(h_workload_generator, h_agent_workers);
+        let result = try_join!(h_workload_generator, h_agent_workers, h_monitor);
         log::info!("Result from the services ={result:?}");
     });

     tokio::select! {
-        _ = h_stop_controller => {
-            log::info!("Stop controller is active...");
-        }
-
         _ = all_async_services => {}

         // CTRL + C termination signal
@@ -132,24 +125,6 @@ fn init_workers(params: LaunchParams, queue: Arc<Receiver<CertificationRequest>>)

-fn get_kafka_log_level_from_env() -> Result<RDKafkaLogLevel, String> {
-    match read_var("KAFKA_LOG_LEVEL") {
-        Ok(level) => match level.to_lowercase().as_str() {
-            "alert" => Ok(RDKafkaLogLevel::Alert),
-            "critical" => Ok(RDKafkaLogLevel::Critical),
-            "debug" => Ok(RDKafkaLogLevel::Debug),
-            "emerg" => Ok(RDKafkaLogLevel::Emerg),
-            "error" => Ok(RDKafkaLogLevel::Error),
-            "info" => Ok(RDKafkaLogLevel::Info),
-            "notice" => Ok(RDKafkaLogLevel::Notice),
-            "warning" => Ok(RDKafkaLogLevel::Warning),
-            _ => Ok(RDKafkaLogLevel::Info),
-        },
-
-        Err(e) => Err(e),
-    }
-}
-
 fn load_configs() -> Result<(AgentConfig, KafkaConfig), String> {
     let cfg_agent = AgentConfig {
         agent: read_var("AGENT_NAME").unwrap(),
@@ -158,42 +133,28 @@ fn load_configs() -> Result<(AgentConfig, KafkaConfig), String> {
         timeout_ms: read_var("AGENT_TIMEOUT_MS").unwrap().parse().unwrap(),
     };

-    let cfg_kafka = KafkaConfig {
-        brokers: read_var("KAFKA_BROKERS")?,
-        group_id: read_var("KAFKA_GROUP_ID")?,
-        certification_topic: read_var("KAFKA_TOPIC")?,
-        fetch_wait_max_ms: read_var("KAFKA_FETCH_WAIT_MAX_MS")?.parse().map_err(|e: ParseIntError| e.to_string())?,
-        message_timeout_ms: read_var("KAFKA_MESSAGE_TIMEOUT_MS")?.parse().map_err(|e: ParseIntError| e.to_string())?,
-        enqueue_timeout_ms: read_var("KAFKA_ENQUEUE_TIMEOUT_MS")?.parse().map_err(|e: ParseIntError| e.to_string())?,
-        log_level: get_kafka_log_level_from_env()?,
-        talos_type: TalosType::External,
-        sasl_mechanisms: read_var_optional("KAFKA_SASL_MECHANISMS")?,
-        username: read_var_optional("KAFKA_USERNAME")?,
-        password: read_var_optional("KAFKA_PASSWORD")?,
-    };
+    let mut cfg_kafka = KafkaConfig::from_env(Some("AGENT"));
+    let more_producer_values = [
+        ("message.timeout.ms".to_string(), "15000".to_string()),
+        ("queue.buffering.max.messages".to_string(), "1000000".to_string()),
+        ("topic.metadata.refresh.interval.ms".to_string(), "5".to_string()),
+        ("socket.keepalive.enable".to_string(), "true".to_string()),
+        ("acks".to_string(), "0".to_string()),
+    ];
+
+    let more_consumer_values = [
+        ("enable.auto.commit".to_string(), "false".to_string()),
+        ("auto.offset.reset".to_string(), "latest".to_string()),
+        ("fetch.wait.max.ms".to_string(), "600".to_string()),
+        ("socket.keepalive.enable".to_string(), "true".to_string()),
+        ("acks".to_string(), "0".to_string()),
+    ];
+
+    cfg_kafka.extend(Some(HashMap::from(more_producer_values)), Some(HashMap::from(more_consumer_values)));

     Ok((cfg_agent, cfg_kafka))
 }

-fn read_var_optional(name: &str) -> Result<Option<String>, String> {
-    match var(name) {
-        Ok(value) => {
-            if value.is_empty() {
-                Ok(None)
-            } else {
-                Ok(Some(value.trim().to_string()))
-            }
-        }
-        Err(e) => match e {
-            VarError::NotPresent => {
-                log::info!("Environment variable is not found: \"{}\"", name);
-                Ok(None)
-            }
-            VarError::NotUnicode(_) => Err(format!("Environment variable is not unicode: \"{}\"", name)),
-        },
-    }
-}
-
 fn read_var(name: &str) -> Result<String, String> {
     match var(name) {
         Ok(value) => {
@@ -225,7 +186,7 @@ async fn make_agent(params: LaunchParams) -> impl TalosAgent {
     let tx_cancel = SenderWrapper::<CancelRequestChannelMessage> { tx: tx_cancel_ch };
     let rx_cancel = ReceiverWrapper::<CancelRequestChannelMessage> { rx: rx_cancel_ch };

-    let (publisher, consumer) = KafkaInitializer::connect(cfg_agent.agent.clone(), cfg_kafka)
+    let (publisher, consumer) = KafkaInitializer::connect(cfg_agent.agent.clone(), cfg_kafka, TalosType::External)
         .await
         .expect("Cannot connect to kafka...");

@@ -264,30 +225,17 @@ async fn make_agent(params: LaunchParams) -> impl TalosAgent {
     agent
 }

-fn create_stop_controller(params: LaunchParams, queue: Arc<Receiver<CertificationRequest>>) -> JoinHandle<Result<(), String>> {
+fn create_queue_monitor(queue: Arc<Receiver<CertificationRequest>>) -> JoinHandle<Result<(), String>> {
     tokio::spawn(async move {
-        let mut remaining_checks = params.stop_max_empty_checks;
         loop {
-            tokio::time::sleep(params.stop_check_delay).await;
+            tokio::time::sleep(Duration::from_secs(10)).await;
             if queue.is_empty() {
-                log::info!(
-                    "There are no more items to process, finalising in {} sec",
-                    remaining_checks * params.stop_check_delay.as_secs()
-                );
-                remaining_checks -= 1;
-                if remaining_checks == 0 {
-                    break;
-                }
-            } else {
-                let len = queue.len();
-                log::info!("Items remaining to process: {}", len);
-                remaining_checks = params.stop_max_empty_checks;
+                continue;
             }
-        }
-
-        queue.close();
-        Err("Signal from StopController".into())
+
+            let len = queue.len();
+            log::info!("Items remaining to process: {}", len);
+        }
     })
 }

@@ -296,8 +244,6 @@ async fn get_params() -> Result<LaunchParams, String> {
     let mut threads: Option<u64> = Some(1);
     let mut target_rate: Option<u64> = None;
     let mut stop_type: Option<StopType> = None;
-    let mut stop_max_empty_checks: Option<u64> = Some(5);
-    let mut stop_check_delay: Option<u64> = Some(5);
     let mut collect_metrics: Option<bool> = Some(true);

     if args.len() >= 3 {
@@ -322,12 +268,6 @@ async fn get_params() -> Result<LaunchParams, String> {
                 let count: u64 = param_value.parse().unwrap();
                 stop_type = Some(StopType::LimitGeneratedTransactions { count })
             }
-        } else if param_name.eq("--stop-controller-max-empty-checks") {
-            let param_value = &args[i + 1];
-            stop_max_empty_checks = Some(param_value.parse().unwrap());
-        } else if param_name.eq("--stop-controller-delay") {
-            let param_value = &args[i + 1];
-            stop_check_delay = Some(param_value.parse().unwrap());
         } else if param_name.eq("--no-metrics") {
             collect_metrics = Some(false)
         }
@@ -344,8 +284,6 @@ async fn get_params() -> Result<LaunchParams, String> {
         target_rate: target_rate.unwrap(),
         stop_type: stop_type.unwrap(),
         threads: threads.unwrap(),
-        stop_max_empty_checks: stop_max_empty_checks.unwrap(),
-        stop_check_delay: Duration::from_secs(stop_check_delay.unwrap()),
         collect_metrics: collect_metrics.unwrap(),
     })
 }
diff --git a/examples/certifier_kafka_pg/Cargo.toml b/examples/certifier_kafka_pg/Cargo.toml
index ba12649d..950a560c 100644
--- a/examples/certifier_kafka_pg/Cargo.toml
+++ b/examples/certifier_kafka_pg/Cargo.toml
@@ -21,6 +21,8 @@ logger = { path = "../../packages/logger" }
 talos_certifier = { path = "../../packages/talos_certifier" }
 talos_suffix = { path = "../../packages/talos_suffix" }
 talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters" }
+talos_common_utils = { path = "../../packages/talos_common_utils" }
+talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" }

 [dev-dependencies.cargo-husky]
 version = "1"
diff --git a/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs b/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs
index fdeccecc..9c4aa103 100644
--- a/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs
+++ b/examples/certifier_kafka_pg/examples/certifier_kafka_pg.rs
@@ -1,6 +1,7 @@
 use log::{error, info};
-use talos_certifier::env_var;
-use talos_certifier_adapters::{certifier_with_kafka_pg, Configuration, KafkaConfig, PgConfig, TalosCertifierChannelBuffers};
+use talos_certifier_adapters::{certifier_with_kafka_pg, Configuration, PgConfig, TalosCertifierChannelBuffers};
+use talos_common_utils::env_var;
+use talos_rdkafka_utils::kafka_config::KafkaConfig;
 use talos_suffix::core::SuffixConfig;
 use tokio::signal;

@@ -17,7 +18,7 @@ async fn main() -> Result<(), impl std::error::Error> {

     info!("Talos certifier starting...");

-    let kafka_config = KafkaConfig::from_env();
+    let kafka_config = KafkaConfig::from_env(None);
     let pg_config = PgConfig::from_env();
     let mock_config = get_mock_config();
     let suffix_config = Some(SuffixConfig {
diff --git a/examples/cohort_banking_with_sdk/Cargo.toml b/examples/cohort_banking_with_sdk/Cargo.toml
index ca1da297..f6066fce 100644
--- a/examples/cohort_banking_with_sdk/Cargo.toml
+++ b/examples/cohort_banking_with_sdk/Cargo.toml
@@ -12,7 +12,7 @@ metrics = { path = "../../packages/metrics" }
 talos_agent = { path = "../../packages/talos_agent" }
 talos_certifier = { path = "../../packages/talos_certifier" }
 talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters" }
-talos_suffix = { path = "../../packages/talos_suffix" }
+talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" }

 async-trait = { workspace = true }
 env_logger = { workspace = true }
diff --git a/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs b/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs
index f8ef5847..11d90aa1 100644
--- a/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs
+++ b/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs
@@ -1,6 +1,7 @@
 use std::{collections::HashMap, env, sync::Arc, time::Duration};

 use async_channel::Receiver;
+use cohort_banking::state::postgres::database_config::DatabaseConfig;
 use cohort_banking::{app::BankingApp, examples_support::queue_processor::QueueProcessor, model::requests::TransferRequest};
 use cohort_sdk::model::{BackoffConfig, Config};
 use examples_support::load_generator::models::Generator;
@@ -17,6 +18,7 @@ use opentelemetry_sdk::runtime;
 use opentelemetry_stdout::MetricsExporterBuilder;
 use rand::Rng;
 use rust_decimal::prelude::FromPrimitive;
+use talos_rdkafka_utils::kafka_config::KafkaConfig;
 use tokio::{signal, task::JoinHandle, try_join};

 use opentelemetry::global::shutdown_tracer_provider;
@@ -68,7 +70,7 @@ async fn main() -> Result<(), String> {
     let generator = ControlledRateLoadGenerator::generate(params.stop_type, params.target_rate, generator_impl, Arc::new(tx_queue));
     let h_generator = tokio::spawn(generator);

-    let config = Config {
+    let sdk_config = Config {
         //
         // cohort configs
         //
@@ -89,37 +91,30 @@ async fn main() -> Result<(), String> {
         buffer_size: 10_000_000,
         timeout_ms: 600_000,

-        //
-        // Common to kafka configs values
-        //
-        brokers: "127.0.0.1:9092".into(),
-        topic: "dev.ksp.certification".into(),
-        sasl_mechanisms: None,
-        kafka_username: None,
-        kafka_password: None,
-
-        //
-        // Kafka configs for Agent
-        //
-        // Must be unique for each agent instance. Can be the same as AgentConfig.agent_id
-        agent_group_id: "cohort-banking".into(),
-        agent_fetch_wait_max_ms: 6000,
-        // The maximum time librdkafka may use to deliver a message (including retries)
-        agent_message_timeout_ms: 15000,
-        // Controls how long to wait until message is successfully placed on the librdkafka producer queue (including retries).
-        agent_enqueue_timeout_ms: 10,
-        // should be mapped to rdkafka::config::RDKafkaLogLevel
-        agent_log_level: 6,
+        kafka: KafkaConfig {
+            brokers: vec!["127.0.0.1:9092".to_string()],
+            topic: "dev.ksp.certification".into(),
+            client_id: "cohort-banking".into(),
+            // Must be unique for each agent instance. Can be the same as AgentConfig.agent_id
+            group_id: "cohort-banking".into(),
+            username: "".into(),
+            password: "".into(),
+            // The maximum time librdkafka may use to deliver a message (including retries)
+            producer_config_overrides: HashMap::from([("message.timeout.ms".into(), "15000".into())]),
+            consumer_config_overrides: HashMap::from([("fetch.wait.max.ms".into(), "6000".into())]),
+            // consumer_config_overrides: HashMap::new(),
+            producer_send_timeout_ms: Some(10),
+            log_level: Some("info".into()),
+        },
+    };

-        //
-        // Database config
-        //
-        db_pool_size: 100,
-        db_user: "postgres".into(),
-        db_password: "admin".into(),
-        db_host: "127.0.0.1".into(),
-        db_port: "5432".into(),
-        db_database: "talos-sample-cohort-dev".into(),
+    let db_config = DatabaseConfig {
+        pool_size: 100,
+        user: "postgres".into(),
+        password: "admin".into(),
+        host: "127.0.0.1".into(),
+        port: "5432".into(),
+        database: "talos-sample-cohort-dev".into(),
     };

     let printer = MetricsToStringPrinter::new(params.threads, params.metric_print_raw, ScalingConfig { ratios: params.scaling_config });
@@ -144,7 +139,7 @@ async fn main() -> Result<(), String> {
     let meter = Arc::new(meter);

     let h_cohort = tokio::spawn(async move {
-        let mut banking_app = BankingApp::new(config).await.unwrap();
+        let mut banking_app = BankingApp::new(sdk_config, db_config).await.unwrap();
         let _ = banking_app.init().await;
         let tasks = QueueProcessor::process::<TransferRequest, BankingApp>(rx_queue, meter, params.threads, Arc::new(banking_app)).await;

diff --git a/examples/cohort_replicator_kafka_pg/Cargo.toml b/examples/cohort_replicator_kafka_pg/Cargo.toml
index 908a824c..afbe09c3 100644
--- a/examples/cohort_replicator_kafka_pg/Cargo.toml
+++ b/examples/cohort_replicator_kafka_pg/Cargo.toml
@@ -10,6 +10,8 @@ cohort_banking = { path = "../../packages/cohort_banking" }
 talos_cohort_replicator = { path = "../../packages/talos_cohort_replicator" }
 talos_certifier = { path = "../../packages/talos_certifier" }
 talos_certifier_adapters = { path = "../../packages/talos_certifier_adapters" }
+talos_common_utils = { path = "../../packages/talos_common_utils" }
+talos_rdkafka_utils = { path = "../../packages/talos_rdkafka_utils" }

 async-trait = { workspace = true }
 env_logger = { workspace = true }
diff --git a/examples/cohort_replicator_kafka_pg/examples/cohort_replicator_kafka_pg.rs b/examples/cohort_replicator_kafka_pg/examples/cohort_replicator_kafka_pg.rs
index 4d6046b0..d2c6eb54 100644
--- a/examples/cohort_replicator_kafka_pg/examples/cohort_replicator_kafka_pg.rs
+++ b/examples/cohort_replicator_kafka_pg/examples/cohort_replicator_kafka_pg.rs
@@ -5,11 +5,13 @@ use cohort_banking::{
     callbacks::statemap_installer::BankStatemapInstaller,
     state::postgres::{database::Database, database_config::DatabaseConfig},
 };
-use talos_certifier::{env_var, env_var_with_defaults, ports::MessageReciever};
-use talos_certifier_adapters::{KafkaConfig, KafkaConsumer};
+use talos_certifier::ports::MessageReciever;
+use talos_certifier_adapters::KafkaConsumer;
 use talos_cohort_replicator::{talos_cohort_replicator, CohortReplicatorConfig, ReplicatorSnapshotProvider};

 use cohort_banking::state::postgres::database::DatabaseError;
+use talos_common_utils::{env_var, env_var_with_defaults};
+use talos_rdkafka_utils::kafka_config::KafkaConfig;
 use tokio::signal;

 pub static SNAPSHOT_SINGLETON_ROW_ID: &str = "SINGLETON";
@@ -43,7 +45,7 @@ async fn main() {
     // 0. Create required items.
     //  a. Create Kafka consumer
-    let mut kafka_config = KafkaConfig::from_env(None);
+    let mut kafka_config = KafkaConfig::from_env(None);
     kafka_config.group_id = env_var!("BANK_REPLICATOR_KAFKA_GROUP_ID");
     let kafka_consumer = KafkaConsumer::new(&kafka_config);
diff --git a/packages/cohort_banking/src/app.rs b/packages/cohort_banking/src/app.rs
index 4a7a35f5..fe5a692a 100644
--- a/packages/cohort_banking/src/app.rs
+++ b/packages/cohort_banking/src/app.rs
@@ -29,16 +29,7 @@ pub struct BankingApp {
 }

 impl BankingApp {
-    pub async fn new(config: Config) -> Result<Self, String> {
-        let db_config = DatabaseConfig {
-            pool_size: config.db_pool_size,
-            user: config.db_user.clone(),
-            password: config.db_password.clone(),
-            host: config.db_host.clone(),
-            port: config.db_port.clone(),
-            database: config.db_database.clone(),
-        };
-
+    pub async fn new(config: Config, db_config: DatabaseConfig) -> Result<Self, String> {
         let meter = global::meter("banking_cohort");
         let counter_aborts = meter.u64_counter("metric_aborts").with_unit(Unit::new("tx")).init();
         let counter_commits = meter.u64_counter("metric_commits").with_unit(Unit::new("tx")).init();
diff --git a/packages/cohort_banking/src/callbacks/state_provider.rs b/packages/cohort_banking/src/callbacks/state_provider.rs
index a5e27bed..f310e2d4 100644
--- a/packages/cohort_banking/src/callbacks/state_provider.rs
+++ b/packages/cohort_banking/src/callbacks/state_provider.rs
@@ -73,6 +73,7 @@ impl StateProviderImpl {
                     version: account.version,
                 })
                 .collect(),
+            abort_reason: None,
         })
     }

@@ -117,6 +118,7 @@ impl StateProviderImpl {
                     version: tuple.0.version,
                 })
                 .collect(),
+            abort_reason: None,
         })
     }
 }
diff --git a/packages/cohort_sdk/Cargo.toml b/packages/cohort_sdk/Cargo.toml
index a78c6998..efe20e9e 100644
--- a/packages/cohort_sdk/Cargo.toml
+++ b/packages/cohort_sdk/Cargo.toml
@@ -21,6 +21,7 @@ strum = { version = "0.25", features = ["derive"] }
 metrics = { path = "../metrics" }
 talos_agent = { path = "../talos_agent" }
 talos_certifier = { path = "../talos_certifier" }
+talos_rdkafka_utils = { path = "../talos_rdkafka_utils" }
 uuid = { version = "1.2.2", features = ["v4"] }
 tokio = { workspace = true, features = ["full"] }
diff --git a/packages/cohort_sdk/src/cohort.rs b/packages/cohort_sdk/src/cohort.rs
index 31c94bee..12817538 100644
--- a/packages/cohort_sdk/src/cohort.rs
+++ b/packages/cohort_sdk/src/cohort.rs
@@ -12,7 +12,7 @@ use talos_agent::{
        core::{AgentServices, TalosAgentImpl},
         model::{CancelRequestChannelMessage, CertifyRequestChannelMessage},
     },
-    api::{AgentConfig, CandidateData, CertificationRequest, KafkaConfig, TalosAgent},
+    api::{AgentConfig, CandidateData, CertificationRequest, TalosAgent, TalosType},
     messaging::{
         api::{Decision, DecisionMessage},
         kafka::KafkaInitializer,
@@ -20,6 +20,7 @@ use talos_agent::{
     metrics::{client::MetricsClient, model::Signal},
     mpsc::core::{ReceiverWrapper, SenderWrapper},
 };
+use talos_rdkafka_utils::kafka_config::KafkaConfig;

 use crate::{
     delay_controller::DelayController,
@@ -62,7 +63,7 @@ impl Cohort {
         // Returns error description. If string is empty it means there was no error installing
     ) -> Result<Self, ClientError> {
         let agent_config: AgentConfig = config.clone().into();
-        let kafka_config: KafkaConfig = config.clone().into();
+        let kafka_config: KafkaConfig = config.kafka.clone();

         //
         // Create instance of Agent
@@ -93,7 +94,7 @@ impl Cohort {
             },
         );

-        let (publisher, consumer) = KafkaInitializer::connect(agent_config.agent.clone(), kafka_config)
+        let (publisher, consumer) = KafkaInitializer::connect(agent_config.agent.clone(), kafka_config, TalosType::External)
             .await
             .map_err(|me| ClientError {
                 kind: model::ClientErrorKind::Messaging,
@@ -273,6 +274,14 @@ impl Cohort {
             attempts += 1;

             let is_success = match self.send_to_talos_attempt(request.clone(), state_provider, recent_conflict).await {
+                CertificationAttemptOutcome::ClientAborted { reason } => {
+                    result = Some(Err(ClientError {
+                        kind: model::ClientErrorKind::ClientAborted,
+                        reason,
+                        cause: None,
+                    }));
+                    false
+                }
                 CertificationAttemptOutcome::Success { mut response } => {
                     response.metadata.duration_ms = started_at.elapsed().as_millis() as u64;
                     response.metadata.attempts = attempts;
@@ -380,6 +389,10 @@ impl Cohort {
             Ok(local_state) => local_state,
         };

+        if let Some(reason) = local_state.abort_reason {
+            return CertificationAttemptOutcome::ClientAborted { reason };
+        }
+
         log::debug!("loaded state: {}, {:?}", local_state.snapshot_version, local_state.items);

         let (snapshot, readvers) = Self::select_snapshot_and_readvers(local_state.snapshot_version, local_state.items.iter().map(|i| i.version).collect());
@@ -437,7 +450,9 @@ impl Cohort {
             match result_local_state {
                 Err(reason) => return Err(SnapshotPollErrorType::FetchError { reason }),
                 Ok(current_state) => {
-                    if current_state.snapshot_version < conflict {
+                    if current_state.abort_reason.is_some() {
+                        break Ok(current_state);
+                    } else if current_state.snapshot_version < conflict {
                         // not safe yet
                         let waited = poll_started_at.elapsed();
                         if waited >= timeout {
diff --git a/packages/cohort_sdk/src/model/callbacks.rs b/packages/cohort_sdk/src/model/callbacks.rs
index 0c6bb390..9a3e7c2f 100644
--- a/packages/cohort_sdk/src/model/callbacks.rs
+++ b/packages/cohort_sdk/src/model/callbacks.rs
@@ -1,6 +1,7 @@
 use async_trait::async_trait;

 pub struct CapturedState {
+    pub abort_reason: Option<String>,
     pub snapshot_version: u64,
     pub items: Vec<CapturedItemState>,
 }
diff --git a/packages/cohort_sdk/src/model/internal.rs b/packages/cohort_sdk/src/model/internal.rs
index ea32b708..133b8d8b 100644
--- a/packages/cohort_sdk/src/model/internal.rs
+++ b/packages/cohort_sdk/src/model/internal.rs
@@ -5,6 +5,7 @@ use talos_agent::agent::errors::AgentError;
 use super::CertificationResponse;

 pub(crate) enum CertificationAttemptOutcome {
+    ClientAborted { reason: String },
     Success { response: CertificationResponse },
     Aborted { response: CertificationResponse },
     AgentError { error: AgentError },
diff --git a/packages/cohort_sdk/src/model/mod.rs b/packages/cohort_sdk/src/model/mod.rs
index 06910693..adb9c3c0 100644
--- a/packages/cohort_sdk/src/model/mod.rs
+++ b/packages/cohort_sdk/src/model/mod.rs
@@ -6,9 +6,10 @@ use std::{collections::HashMap, fmt::Display};
 use serde_json::Value;
 use talos_agent::{
     agent::errors::{AgentError, AgentErrorKind},
-    api::{AgentConfig, KafkaConfig, TalosType},
+    api::AgentConfig,
     messaging::api::Decision,
 };
+use talos_rdkafka_utils::kafka_config::KafkaConfig;
 use tokio::task::JoinHandle;

 #[derive(Clone)]
@@ -46,6 +47,7 @@ pub struct ResponseMetadata {
 pub enum ClientErrorKind {
     Certification,
     CertificationTimeout,
+    ClientAborted,
     Messaging,
     Persistence,
     Internal,
@@ -74,19 +76,22 @@ impl BackoffConfig {
 #[derive(Clone)]
 pub struct Config {
     //
-    // cohort configs
+    // Cohort configs
     //
+    // Backoff setting before re-polling after Talos returned abort caused by conflict
     pub backoff_on_conflict: BackoffConfig,
+    // Backoff setting before re-trying to send request to Talos
     pub retry_backoff: BackoffConfig,
     pub retry_attempts_max: u32,
+    // Backoff setting before re-trying DB install operations when handling out of order installs
     pub retry_oo_backoff: BackoffConfig,
     pub retry_oo_attempts_max: u32,

     pub snapshot_wait_timeout_ms: u32,

     //
-    // agent config values
+    // Agent config values
     //
     pub agent: String,
     pub cohort: String,
@@ -94,37 +99,30 @@ pub struct Config {
     pub buffer_size: usize,
     pub timeout_ms: u64,

-    //
-    // Common to kafka configs values
-    //
-    pub brokers: String,
-    pub topic: String,
-    pub sasl_mechanisms: Option<String>,
-    pub kafka_username: Option<String>,
-    pub kafka_password: Option<String>,
-
     //
     // Kafka configs for Agent
     //
-    // Must be unique for each agent instance. Can be the same as AgentConfig.agent_id
-    pub agent_group_id: String,
-    pub agent_fetch_wait_max_ms: u32,
-    // The maximum time librdkafka may use to deliver a message (including retries)
-    pub agent_message_timeout_ms: u32,
-    // Controls how long to wait until message is successfully placed on the librdkafka producer queue (including retries).
-    pub agent_enqueue_timeout_ms: u32,
-    // should be mapped to rdkafka::config::RDKafkaLogLevel
-    pub agent_log_level: u32,
+    pub kafka: KafkaConfig,
+}

-    //
-    // Database config
-    //
-    pub db_pool_size: usize,
-    pub db_user: String,
-    pub db_password: String,
-    pub db_host: String,
-    pub db_port: String,
-    pub db_database: String,
+impl Config {
+    pub fn create(agent: String, cohort: String, kafka_config: KafkaConfig) -> Self {
+        Self {
+            backoff_on_conflict: BackoffConfig { min_ms: 1, max_ms: 1500 },
+            retry_backoff: BackoffConfig { min_ms: 20, max_ms: 1500 },
+            retry_attempts_max: 10,
+            retry_oo_backoff: BackoffConfig { min_ms: 20, max_ms: 1500 },
+            retry_oo_attempts_max: 10,
+            snapshot_wait_timeout_ms: 10_000,
+            agent,
+            cohort,
+            buffer_size: 100_000,
+            timeout_ms: 30_000,
+
+            // Kafka
+            kafka: kafka_config,
+        }
+    }
 }

 pub struct ReplicatorServices {
@@ -143,24 +141,6 @@ impl From<Config> for AgentConfig {
     }
 }

-impl From<Config> for KafkaConfig {
-    fn from(val: Config) -> Self {
-        KafkaConfig {
-            brokers: val.brokers,
-            certification_topic: val.topic,
-            sasl_mechanisms: val.sasl_mechanisms,
-            username: val.kafka_username,
-            password: val.kafka_password,
-            group_id: val.agent_group_id,
-            fetch_wait_max_ms: val.agent_fetch_wait_max_ms,
-            message_timeout_ms: val.agent_message_timeout_ms,
-            enqueue_timeout_ms: val.agent_enqueue_timeout_ms,
-            log_level: KafkaConfig::map_log_level(val.agent_log_level),
-            talos_type: TalosType::External,
-        }
-    }
-}
-
 impl From<AgentError> for ClientError {
     fn from(agent_error: AgentError) -> Self {
         let (kind, reason) = match agent_error.kind {
diff --git a/packages/talos_agent/Cargo.toml b/packages/talos_agent/Cargo.toml
index 44afc3f4..9769a96e 100644
--- a/packages/talos_agent/Cargo.toml
+++ b/packages/talos_agent/Cargo.toml
@@ -8,17 +8,19 @@ edition = "2021"
 async-trait = { workspace = true }
 env_logger = { workspace = true }
 log = { workspace = true }
-multimap = { version = "0.8.3" }
-rdkafka = { version = "0.29.0", features = ["sasl"] }
+multimap = { version = "0.9.0" }
+rdkafka = { version = "0.33.2", features = ["sasl"] }
 rdkafka-sys = { version = "4.3.0" }
 serde = { workspace = true }
 serde_json = { workspace = true }
-strum = { version = "0.24", features = ["derive"] }
+strum = { version = "0.25", features = ["derive"] }
 thiserror = { version = "1.0.31" }
 time = { version = "0.3.17" }
 tokio = { workspace = true, features = ["full"] }
 uuid = { version = "1.2.2", features = ["v4"] }

+talos_rdkafka_utils = { path = "../talos_rdkafka_utils" }
+
 [dev-dependencies]
 mockall = { version = "0.11.3" }
 tokio-test = { version = "0.4.2" }
diff --git a/packages/talos_agent/src/agent/decision_reader.rs b/packages/talos_agent/src/agent/decision_reader.rs
index 1582f06e..e5633b67 100644
--- a/packages/talos_agent/src/agent/decision_reader.rs
+++ b/packages/talos_agent/src/agent/decision_reader.rs
@@ -45,6 +45,7 @@ enum ReaderError {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::api::TalosType;
     use crate::messaging::api::Consumer;
     use crate::messaging::api::Decision::Committed;
     use crate::messaging::errors::{MessagingError, MessagingErrorKind};
@@ -66,6 +67,7 @@ mod tests {
         #[async_trait]
         impl Consumer for NoopConsumer {
+            pub fn get_talos_type(&self) -> TalosType;
             pub async fn receive_message(&self) -> Option<Result<DecisionMessage, MessagingError>>;
         }
     }
diff --git a/packages/talos_agent/src/api.rs b/packages/talos_agent/src/api.rs
index aca85637..4e3f40db 100644
--- a/packages/talos_agent/src/api.rs
+++ b/packages/talos_agent/src/api.rs
@@ -2,7 +2,6 @@ use crate::agent::errors::AgentError;
 use crate::messaging::api::{ConflictMessage, Decision};
 use crate::metrics::model::MetricsReport;
 use async_trait::async_trait;
-use rdkafka::config::RDKafkaLogLevel;
 use serde_json::Value;
 use std::collections::HashMap;
 use std::time::Duration;
@@ -58,40 +57,6 @@ pub enum TalosType {
     InProcessMock, // kafka listener and decision publisher is our internal function
 }

-/// Kafka-related configuration
-#[derive(Clone, Debug)]
-pub struct KafkaConfig {
-    pub brokers: String,
-    // Must be unique for each agent instance. Can be the same as AgentConfig.agent_id
-    pub group_id: String,
-    pub certification_topic: String,
-    pub fetch_wait_max_ms: u32,
-    // The maximum time librdkafka may use to deliver a message (including retries)
-    pub message_timeout_ms: u32,
-    // Controls how long to wait until message is successfully placed on the librdkafka producer queue (including retries).
-    pub enqueue_timeout_ms: u32,
-    pub log_level: RDKafkaLogLevel,
-    pub talos_type: TalosType,
-    // defaults to SCRAM-SHA-512
-    pub sasl_mechanisms: Option<String>,
-    pub username: Option<String>,
-    pub password: Option<String>,
-}
-
-impl KafkaConfig {
-    pub fn map_log_level(level: u32) -> RDKafkaLogLevel {
-        match level {
-            0 => RDKafkaLogLevel::Emerg,
-            1 => RDKafkaLogLevel::Alert,
-            2 => RDKafkaLogLevel::Critical,
-            3 => RDKafkaLogLevel::Error,
-            4 => RDKafkaLogLevel::Warning,
-            5 => RDKafkaLogLevel::Notice,
-            6 => RDKafkaLogLevel::Info,
-            _ => RDKafkaLogLevel::Debug,
-        }
-    }
-}

 /// The agent interface exposed to the client
 #[async_trait]
 pub trait TalosAgent {
diff --git a/packages/talos_agent/src/messaging/api.rs b/packages/talos_agent/src/messaging/api.rs
index a4dda59d..6bb98d24 100644
--- a/packages/talos_agent/src/messaging/api.rs
+++ b/packages/talos_agent/src/messaging/api.rs
@@ -1,4 +1,4 @@
-use crate::api::{CandidateData, StateMap};
+use crate::api::{CandidateData, StateMap, TalosType};
 use crate::messaging::errors::MessagingError;
 use async_trait::async_trait;
 use serde::{Deserialize, Serialize};
@@ -113,6 +113,7 @@ pub type PublisherType = dyn Publisher + Sync + Send;
 #[async_trait]
 pub trait Consumer {
     async fn receive_message(&self) -> Option<Result<DecisionMessage, MessagingError>>;
+    fn get_talos_type(&self) -> TalosType;
 }

 pub type ConsumerType = dyn Consumer + Sync + Send;
diff --git a/packages/talos_agent/src/messaging/kafka.rs b/packages/talos_agent/src/messaging/kafka.rs
index 7ce012b5..2b87fed8 100644
--- a/packages/talos_agent/src/messaging/kafka.rs
+++ b/packages/talos_agent/src/messaging/kafka.rs
@@ -1,4 +1,4 @@
-use crate::api::{KafkaConfig, TalosType};
+use crate::api::TalosType;
 use crate::messaging::api::{
     CandidateMessage, ConsumerType, Decision, DecisionMessage, PublishResponse, Publisher, PublisherType, TalosMessageType, HEADER_AGENT_ID, HEADER_MESSAGE_TYPE,
@@ -11,12 +11,13 @@ use rdkafka::error::KafkaError;
 use rdkafka::message::{Header, Headers, OwnedHeaders};
 use rdkafka::producer::{FutureProducer, FutureRecord};
 use rdkafka::util::Timeout;
-use rdkafka::{ClientConfig, ClientContext, Message, Offset, TopicPartitionList};
+use rdkafka::{ClientContext, Message, Offset, TopicPartitionList};
 use std::collections::HashMap;
 use std::str::Utf8Error;
 use std::sync::Arc;
 use std::time::Duration;
 use std::{str, thread};
+use talos_rdkafka_utils::kafka_config::KafkaConfig;
 use time::OffsetDateTime;

 use super::api::TxProcessingTimeline;
@@ -47,24 +48,10 @@ impl KafkaPublisher {
         Ok(Self {
             agent,
             config: config.clone(),
-            producer: Self::create_producer(config)?,
+            producer: config.build_producer_config().create()?,
         })
     }

-    fn create_producer(kafka: &KafkaConfig) -> Result<FutureProducer, KafkaError> {
-        let mut cfg = ClientConfig::new();
-        cfg.set("bootstrap.servers", &kafka.brokers)
-            .set("message.timeout.ms", &kafka.message_timeout_ms.to_string())
-            .set("queue.buffering.max.messages", "1000000")
-            .set("topic.metadata.refresh.interval.ms", "5")
-            .set("socket.keepalive.enable", "true")
-            .set("acks", "0")
-            .set_log_level(kafka.log_level);
-
-        setup_kafka_auth(&mut cfg, kafka);
-        cfg.create()
-    }
-
     pub fn make_record<'a>(agent: String, topic: &'a str, key: &'a str, message: &'a str) -> FutureRecord<'a, str, str> {
         let type_value = TalosMessageType::Candidate.to_string();
         let h_type = Header {
@@ -88,13 +75,13 @@ impl Publisher for KafkaPublisher {
     async fn send_message(&self, key: String, mut message: CandidateMessage) -> Result<PublishResponse, MessagingError> {
         debug!("KafkaPublisher.send_message(): async publishing message {:?} with key: {}", message, key);

-        let topic = self.config.certification_topic.clone();
+        let topic = self.config.topic.clone();
         message.published_at = OffsetDateTime::now_utc().unix_timestamp_nanos();
         let payload = serde_json::to_string(&message).unwrap();
-        let data = KafkaPublisher::make_record(self.agent.clone(), &self.config.certification_topic, key.as_str(), payload.as_str());
+        let data = KafkaPublisher::make_record(self.agent.clone(), &self.config.topic, key.as_str(), payload.as_str());

-        let timeout = Timeout::After(Duration::from_millis(self.config.enqueue_timeout_ms as u64));
+        let timeout = Timeout::After(Duration::from_millis(self.config.producer_send_timeout_ms.unwrap_or(10) as u64));
         return match self.producer.send(data, timeout).await {
             Ok((partition, offset)) => {
                 debug!("KafkaPublisher.send_message(): Published into partition {}, offset: {}", partition, offset);
@@ -113,6 +100,7 @@ pub struct KafkaConsumer {
     agent: String,
     config: KafkaConfig,
     consumer: StreamConsumer<KafkaConsumerContext>,
+    talos_type: TalosType,
 }

 struct KafkaConsumerContext {}
@@ -130,33 +118,24 @@ impl ConsumerContext for KafkaConsumerContext {
 }

 impl KafkaConsumer {
-    pub fn new(agent: String, config: &KafkaConfig) -> Result<Self, KafkaError> {
+    pub fn new(agent: String, config: &KafkaConfig, talos_type: TalosType) -> Result<Self, KafkaError> {
         let consumer = Self::create_consumer(config)?;
         Ok(KafkaConsumer {
             agent,
             config: config.clone(),
             consumer,
+            talos_type,
         })
     }

-    fn create_consumer(kafka: &KafkaConfig) -> Result<StreamConsumer<KafkaConsumerContext>, KafkaError> {
-        let mut cfg = ClientConfig::new();
-        cfg.set("bootstrap.servers", &kafka.brokers)
-            .set("group.id", &kafka.group_id)
-            .set("enable.auto.commit", "false")
-            .set("auto.offset.reset", "latest")
-            .set("socket.keepalive.enable", "true")
-            .set("fetch.wait.max.ms", kafka.fetch_wait_max_ms.to_string())
-            .set_log_level(kafka.log_level);
-
-        setup_kafka_auth(&mut cfg, kafka);
-
+    fn create_consumer(kafka_config: &KafkaConfig) -> Result<StreamConsumer<KafkaConsumerContext>, KafkaError> {
+        let cfg = kafka_config.build_consumer_config();
         cfg.create_with_context(KafkaConsumerContext {})
     }

     pub fn subscribe(&self) -> Result<(), KafkaError> {
-        let topic = self.config.certification_topic.as_str();
+        let topic = &self.config.topic.as_str();
         let partition = 0_i32;

         let mut partition_list = TopicPartitionList::new();
@@ -177,7 +156,7 @@ impl KafkaConsumer {
     }

     fn get_offset(&self, partition: i32, timeout: Duration) -> Result<Offset, KafkaError> {
-        let topic = self.config.certification_topic.as_str();
+        let topic = &self.config.topic.as_str();
         let (_low, high) = self.consumer.fetch_watermarks(topic, partition, timeout)?;

         Ok(Offset::Offset(high))
@@ -186,6 +165,10 @@ impl KafkaConsumer {

 #[async_trait]
 impl crate::messaging::api::Consumer for KafkaConsumer {
+    fn get_talos_type(&self)
-> TalosType { + self.talos_type.clone() + } + async fn receive_message(&self) -> Option> { let rslt_received = self .consumer @@ -238,22 +221,12 @@ impl crate::messaging::api::Consumer for KafkaConsumer { } }); - parsed_type.and_then(|message_type| deserialize_decision(&self.config.talos_type, &message_type, &received.payload_view::())) + parsed_type.and_then(|message_type| deserialize_decision(&self.get_talos_type(), &message_type, &received.payload_view::())) } } /// Utilities. -fn setup_kafka_auth(client: &mut ClientConfig, config: &KafkaConfig) { - if let Some(username) = &config.username { - client - .set("security.protocol", "SASL_PLAINTEXT") - .set("sasl.mechanisms", config.sasl_mechanisms.clone().unwrap_or_else(|| "SCRAM-SHA-512".to_string())) - .set("sasl.username", username) - .set("sasl.password", config.password.clone().unwrap_or_default()); - } -} - fn deserialize_decision( talos_type: &TalosType, message_type: &TalosMessageType, @@ -311,9 +284,13 @@ pub struct KafkaInitializer {} impl KafkaInitializer { /// Creates new instances of initialised and fully connected publisher and consumer - pub async fn connect(agent: String, kafka_config: KafkaConfig) -> Result<(Arc>, Arc>), MessagingError> { + pub async fn connect( + agent: String, + kafka_config: KafkaConfig, + talos_type: TalosType, + ) -> Result<(Arc>, Arc>), MessagingError> { let kafka_publisher = KafkaPublisher::new(agent.clone(), &kafka_config)?; - let kafka_consumer = KafkaConsumer::new(agent, &kafka_config)?; + let kafka_consumer = KafkaConsumer::new(agent, &kafka_config, talos_type)?; kafka_consumer.subscribe()?; let consumer: Arc> = Arc::new(Box::new(kafka_consumer)); @@ -364,50 +341,3 @@ mod tests_publisher { } } // $coverage:ignore-end - -// $coverage:ignore-start -#[cfg(test)] -mod tests { - use super::*; - use rdkafka::config::RDKafkaLogLevel; - - #[test] - fn test_setup_kafka_auth() { - let mut cfg = ClientConfig::new(); - setup_kafka_auth( - &mut cfg, - &KafkaConfig { - brokers: "brokers".to_string(), - group_id: "group_id".to_string(), - certification_topic: "certification_topic".to_string(), - fetch_wait_max_ms: 1_u32, - message_timeout_ms: 1_u32, - enqueue_timeout_ms: 1_u32, - log_level: RDKafkaLogLevel::Debug, - talos_type: TalosType::InProcessMock, - sasl_mechanisms: None, - username: Some("user1".to_string()), - password: None, - }, - ); - assert!(check_key(&mut cfg, "security.protocol", "SASL_PLAINTEXT")); - assert!(check_key(&mut cfg, "sasl.mechanisms", "SCRAM-SHA-512")); - assert!(check_key(&mut cfg, "sasl.username", "user1")); - assert!(check_key(&mut cfg, "sasl.password", "")); - - cfg.set("sasl.password", "pwd".to_string()); - assert!(check_key(&mut cfg, "sasl.password", "pwd")); - - cfg.set("sasl.mechanisms", "ANONYMOUS".to_string()); - assert!(check_key(&mut cfg, "sasl.mechanisms", "ANONYMOUS")); - } - - fn check_key(cfg: &mut ClientConfig, key: &str, value: &str) -> bool { - if let Some(v) = cfg.get(key) { - v == value - } else { - false - } - } -} -// $coverage:ignore-end diff --git a/packages/talos_certifier/src/lib.rs b/packages/talos_certifier/src/lib.rs index 7977ee6a..7cf7e2cb 100644 --- a/packages/talos_certifier/src/lib.rs +++ b/packages/talos_certifier/src/lib.rs @@ -7,7 +7,6 @@ pub mod model; pub mod ports; pub mod services; pub mod talos_certifier_service; -pub mod utils; pub use crate::core::{ChannelMessage, SystemMessage}; pub use certifier::Certifier; diff --git a/packages/talos_certifier_adapters/Cargo.toml b/packages/talos_certifier_adapters/Cargo.toml index 7f70d6f4..2a69f054 
100644 --- a/packages/talos_certifier_adapters/Cargo.toml +++ b/packages/talos_certifier_adapters/Cargo.toml @@ -21,7 +21,7 @@ tokio = { workspace = true } async-trait = { workspace = true } futures-util = "0.3.21" # Kafka -rdkafka = { version = "0.29.0", features = ["sasl"] } +rdkafka = { version = "0.33.2", features = ["sasl"] } # uuid uuid = { version = "1.2.2", features = [] } @@ -42,11 +42,13 @@ thiserror = "1.0.31" mockall = "0.11.0" # internal crates -logger = { path = "../logger" } -metrics = { path = "../metrics" } -talos_certifier = { path = "../talos_certifier" } -talos_suffix = { path = "../talos_suffix" } +logger = { path = "../logger" } +metrics = { path = "../metrics" } +talos_certifier = { path = "../talos_certifier" } +talos_suffix = { path = "../talos_suffix" } +talos_common_utils = { path = "../talos_common_utils" } +talos_rdkafka_utils = { path = "../talos_rdkafka_utils" } [dev-dependencies] -serial_test = "1.0.0" +serial_test = "2.0.0" diff --git a/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs b/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs index 4386f606..d6ba80ac 100644 --- a/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs +++ b/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs @@ -2,7 +2,8 @@ use std::{collections::HashMap, time::Duration}; use metrics::model::MinMax; use talos_certifier::{model::metrics::TxProcessingTimeline, ports::MessageReciever, ChannelMessage}; -use talos_certifier_adapters::{KafkaConfig, KafkaConsumer}; +use talos_certifier_adapters::KafkaConsumer; +use talos_rdkafka_utils::kafka_config::KafkaConfig; use time::OffsetDateTime; use tokio::time::timeout; @@ -10,12 +11,15 @@ use tokio::time::timeout; async fn main() -> Result<(), String> { env_logger::builder().format_timestamp_millis().init(); - let mut kafka_config = KafkaConfig::from_env(); - let mut cfg: HashMap<&'static str, &'static str> = HashMap::new(); - cfg.insert("enable.auto.commit", "false"); - cfg.insert("auto.offset.reset", "earliest"); + let mut kafka_config = KafkaConfig::from_env(None); + kafka_config.extend( + None, + Some(HashMap::from([ + ("auto.commit.enable".into(), "false".into()), + ("auto.offset.reset".into(), "earliest".into()), + ])), + ); - kafka_config.set_overrides(HashMap::new(), cfg); kafka_config.group_id = format!("talos-metric-histogram-timings-{}", uuid::Uuid::new_v4()); let mut kafka_consumer = KafkaConsumer::new(&kafka_config); kafka_consumer.subscribe().await.unwrap(); diff --git a/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs b/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs index 2a95539f..8394e080 100644 --- a/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs +++ b/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs @@ -1,5 +1,4 @@ use crate as Adapters; -use crate::kafka::config::KafkaConfig; use crate::mock_certifier_service::MockCertifierService; use crate::PgConfig; use std::sync::{atomic::AtomicI64, Arc}; @@ -14,6 +13,7 @@ use talos_certifier::{ services::{CertifierService, DecisionOutboxService, MessageReceiverService}, talos_certifier_service::{TalosCertifierService, TalosCertifierServiceBuilder}, }; +use talos_rdkafka_utils::kafka_config::KafkaConfig; use talos_suffix::core::SuffixConfig; use tokio::sync::{broadcast, mpsc}; diff --git a/packages/talos_certifier_adapters/src/kafka/config.rs b/packages/talos_certifier_adapters/src/kafka/config.rs deleted file 
mode 100644 index 98541402..00000000 --- a/packages/talos_certifier_adapters/src/kafka/config.rs +++ /dev/null @@ -1,231 +0,0 @@ -use rdkafka::ClientConfig; -use std::collections::HashMap; -use talos_certifier::env_var; - -#[derive(Debug, Clone)] -pub struct KafkaConfig { - pub brokers: Vec, - pub topic: String, - pub client_id: String, - pub group_id: String, - pub username: String, - pub password: String, - pub producer_config_overrides: HashMap<&'static str, &'static str>, - pub consumer_config_overrides: HashMap<&'static str, &'static str>, -} - -impl KafkaConfig { - pub fn from_env() -> Self { - KafkaConfig { - brokers: env_var!("KAFKA_BROKERS", Vec), - topic: env_var!("KAFKA_TOPIC"), - client_id: env_var!("KAFKA_CLIENT_ID"), - group_id: env_var!("KAFKA_GROUP_ID"), - username: env_var!("KAFKA_USERNAME"), - password: env_var!("KAFKA_PASSWORD"), - producer_config_overrides: HashMap::new(), - consumer_config_overrides: HashMap::new(), - } - } - - pub fn set_overrides( - &mut self, - producer_config_overrides: HashMap<&'static str, &'static str>, - consumer_config_overrides: HashMap<&'static str, &'static str>, - ) { - self.producer_config_overrides = producer_config_overrides; - self.consumer_config_overrides = consumer_config_overrides; - } - - pub fn build_consumer_config(&self) -> ClientConfig { - let mut client_config = ClientConfig::new(); - - let username = self.username.to_owned(); - let password = self.password.to_owned(); - let brokers = self.brokers.join(","); - let mut base_config = HashMap::from([ - ("group.id", self.group_id.as_str()), - ("bootstrap.servers", brokers.as_str()), - ("auto.offset.reset", "earliest"), - ("socket.keepalive.enable", "true"), - ("enable.auto.commit", "false"), - ]); - - base_config.extend(&self.consumer_config_overrides); - - for (k, v) in base_config.into_iter() { - client_config.set(k, v); - } - - if !username.is_empty() && !password.is_empty() { - client_config - .set("security.protocol", "SASL_PLAINTEXT") - .set("sasl.mechanisms", "SCRAM-SHA-512") - .set("sasl.username", username) - .set("sasl.password", password); - } - - client_config - } - - pub fn build_producer_config(&self) -> ClientConfig { - let mut client_config = ClientConfig::new(); - - let username = self.username.to_owned(); - let password = self.password.to_owned(); - let brokers = self.brokers.join(","); - let mut base_config = HashMap::from([ - ("message.timeout.ms", "30000"), - ("bootstrap.servers", brokers.as_str()), - ("message.send.max.retries", "100000"), - ]); - base_config.extend(&self.producer_config_overrides); - for (k, v) in base_config.into_iter() { - client_config.set(k, v); - } - if !username.is_empty() && !password.is_empty() { - client_config - .set("security.protocol", "SASL_PLAINTEXT") - .set("sasl.mechanisms", "SCRAM-SHA-512") - .set("sasl.username", username) - .set("sasl.password", password); - } - - client_config - } -} - -#[cfg(test)] -mod tests { - use std::{collections::HashMap, env}; - - use serial_test::serial; - - use super::*; - - fn set_env_var(key: &str, value: &str) { - env::set_var(key, value) - } - - fn unset_env_var(key: &str) { - env::remove_var(key) - } - - fn get_kafka_env_variables() -> HashMap<&'static str, &'static str> { - let env_hashmap = [ - ("KAFKA_BROKERS", "broker1, broker2 "), - ("KAFKA_TOPIC", "some-topic"), - ("KAFKA_CLIENT_ID", "some-client-id"), - ("KAFKA_GROUP_ID", "some-group-id"), - ("KAFKA_USERNAME", ""), - ("KAFKA_PASSWORD", ""), - ]; - HashMap::from(env_hashmap) - } - - fn build_test_kafka_config() -> KafkaConfig { - 
KafkaConfig { - brokers: vec!["broker1".to_string()], - topic: "topic".to_owned(), - client_id: "client-id-1".to_string(), - group_id: "group-id-1".to_string(), - username: "user_name".to_owned(), - password: "password".to_owned(), - producer_config_overrides: Default::default(), - consumer_config_overrides: Default::default(), - } - } - - #[test] - #[serial] - fn test_from_env_gets_values_successully() { - get_kafka_env_variables().iter().for_each(|(k, v)| { - set_env_var(k, v); - }); - - let config = KafkaConfig::from_env(); - - assert_eq!(config.client_id, "some-client-id"); - assert_eq!(config.brokers.len(), 2); - - get_kafka_env_variables().iter().for_each(|(k, _)| { - unset_env_var(k); - }); - } - #[test] - #[serial] - #[should_panic(expected = "KAFKA_TOPIC environment variable is not defined")] - fn test_from_env_when_env_variable_not_found() { - get_kafka_env_variables().iter().for_each(|(k, v)| { - set_env_var(k, v); - }); - - unset_env_var("KAFKA_TOPIC"); - - let _config = KafkaConfig::from_env(); - - get_kafka_env_variables().iter().for_each(|(k, _)| { - unset_env_var(k); - }); - } - - #[test] - fn test_build_consumer_config_obj() { - let config = build_test_kafka_config().build_consumer_config(); - assert_eq!(config.get("group.id").unwrap(), "group-id-1"); - assert_eq!(config.get("socket.keepalive.enable").unwrap(), "true"); - assert_eq!(config.get("sasl.username").unwrap(), "user_name"); - } - #[test] - fn test_passing_credentials_to_build_consumer_config() { - let config = KafkaConfig { - brokers: vec!["broker1".to_string()], - topic: "consumer-topic-1".to_owned(), - client_id: "client-id-1".to_string(), - group_id: "groud-id-1".to_string(), - username: "user".to_string(), - password: "password".to_string(), - producer_config_overrides: Default::default(), - consumer_config_overrides: Default::default(), - }; - let client_config = config.build_consumer_config(); - assert_eq!(client_config.get("auto.offset.reset").unwrap(), "earliest"); - assert_eq!(client_config.get("socket.keepalive.enable").unwrap(), "true"); - assert_eq!(client_config.get("enable.auto.commit").unwrap(), "false"); - assert_eq!(client_config.get("sasl.username").unwrap(), "user"); - assert_eq!(client_config.get("sasl.password").unwrap(), "password"); - assert_eq!(client_config.get("security.protocol").unwrap(), "SASL_PLAINTEXT"); - } - - #[test] - fn test_build_producer_config_obj() { - let config = build_test_kafka_config().build_producer_config(); - assert!(config.get("group.id").is_none()); - assert_eq!(config.get("sasl.username").unwrap(), "user_name"); - assert_eq!(config.get("sasl.password").unwrap(), "password"); - assert_eq!(config.get("message.timeout.ms").unwrap(), "30000"); - assert_eq!(config.get("message.send.max.retries").unwrap(), "100000"); - } - #[test] - fn test_passing_overrides() { - let mut kafka_config = KafkaConfig { - brokers: vec!["broker1".to_string()], - topic: "topic".to_owned(), - client_id: "client-id-1".to_string(), - group_id: "groud-id-1".to_string(), - username: "user".to_string(), - password: "password".to_string(), - producer_config_overrides: Default::default(), - consumer_config_overrides: Default::default(), - }; - let producer_override = HashMap::from([("message.timeout.ms", "10")]); - let consumer_override = HashMap::from([("auto.offset.reset", "latest")]); - kafka_config.set_overrides(producer_override, consumer_override); - let producer_config = kafka_config.build_producer_config(); - assert_eq!(producer_config.get("sasl.username").unwrap(), "user"); - 
assert_eq!(producer_config.get("message.timeout.ms").unwrap(), "10"); - let consumer_config = kafka_config.build_consumer_config(); - assert_eq!(consumer_config.get("sasl.username").unwrap(), "user"); - assert_eq!(consumer_config.get("auto.offset.reset").unwrap(), "latest"); - } -} diff --git a/packages/talos_certifier_adapters/src/kafka/consumer.rs b/packages/talos_certifier_adapters/src/kafka/consumer.rs index c1909540..e6f6ad6f 100644 --- a/packages/talos_certifier_adapters/src/kafka/consumer.rs +++ b/packages/talos_certifier_adapters/src/kafka/consumer.rs @@ -17,11 +17,12 @@ use talos_certifier::{ }, ChannelMessage, }; +use talos_rdkafka_utils::kafka_config::KafkaConfig; use time::OffsetDateTime; use crate::{kafka::utils::get_message_headers, KafkaAdapterError}; -use super::{config::KafkaConfig, utils}; +use super::utils; // Kafka Consumer Client // #[derive(Debug, Clone)] diff --git a/packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs b/packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs index 0d3563c0..d0b8c408 100644 --- a/packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs +++ b/packages/talos_certifier_adapters/src/kafka/kafka_deploy.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use crate::KafkaConfig; use rdkafka::{ admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}, client::DefaultClientContext, @@ -8,6 +7,7 @@ use rdkafka::{ error::KafkaError, types::RDKafkaErrorCode, }; +use talos_rdkafka_utils::kafka_config::KafkaConfig; use thiserror::Error as ThisError; pub enum KafkaDeployStatus { @@ -24,7 +24,7 @@ pub enum KafkaDeployError { } pub async fn create_topic() -> Result { - let kafka_config = KafkaConfig::from_env(); + let kafka_config = KafkaConfig::from_env(None); println!("kafka configs received from env... 
{kafka_config:#?}"); let consumer: StreamConsumer = kafka_config.build_consumer_config().create()?; diff --git a/packages/talos_certifier_adapters/src/kafka/mod.rs b/packages/talos_certifier_adapters/src/kafka/mod.rs index 8a07f621..fa4cbb74 100644 --- a/packages/talos_certifier_adapters/src/kafka/mod.rs +++ b/packages/talos_certifier_adapters/src/kafka/mod.rs @@ -1,4 +1,3 @@ -pub mod config; pub mod consumer; pub mod errors; pub mod kafka_deploy; diff --git a/packages/talos_certifier_adapters/src/kafka/producer.rs b/packages/talos_certifier_adapters/src/kafka/producer.rs index 99df2125..7037105b 100644 --- a/packages/talos_certifier_adapters/src/kafka/producer.rs +++ b/packages/talos_certifier_adapters/src/kafka/producer.rs @@ -7,11 +7,10 @@ use talos_certifier::{ errors::SystemServiceError, ports::{common::SharedPortTraits, errors::MessagePublishError, MessagePublisher}, }; +use talos_rdkafka_utils::kafka_config::KafkaConfig; use crate::kafka::utils::build_kafka_headers; -use super::config::KafkaConfig; - // Kafka Producer // #[derive(Clone)] pub struct KafkaProducer { diff --git a/packages/talos_certifier_adapters/src/lib.rs b/packages/talos_certifier_adapters/src/lib.rs index 4d3b40a0..78ca0a93 100644 --- a/packages/talos_certifier_adapters/src/lib.rs +++ b/packages/talos_certifier_adapters/src/lib.rs @@ -1,6 +1,5 @@ // Kafka exports pub mod kafka; -pub use kafka::config::KafkaConfig; pub use kafka::consumer::KafkaConsumer; pub use kafka::errors::KafkaAdapterError; pub use kafka::producer::KafkaProducer; diff --git a/packages/talos_certifier_adapters/src/postgres/config.rs b/packages/talos_certifier_adapters/src/postgres/config.rs index 3125e8e2..0a456da2 100644 --- a/packages/talos_certifier_adapters/src/postgres/config.rs +++ b/packages/talos_certifier_adapters/src/postgres/config.rs @@ -1,4 +1,4 @@ -use talos_certifier::env_var; +use talos_common_utils::env_var; #[derive(Debug, Clone)] pub struct PgConfig { diff --git a/packages/talos_common_utils/Cargo.toml b/packages/talos_common_utils/Cargo.toml new file mode 100644 index 00000000..94413e14 --- /dev/null +++ b/packages/talos_common_utils/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "talos_common_utils" +version = "0.1.0" +edition = "2021" + +[lib] +doctest = false +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# Logging +log = { workspace = true } +env_logger = { workspace = true } + +[dev-dependencies] +serial_test = "2.0.0" \ No newline at end of file diff --git a/packages/talos_certifier/src/utils/env.rs b/packages/talos_common_utils/src/env.rs similarity index 98% rename from packages/talos_certifier/src/utils/env.rs rename to packages/talos_common_utils/src/env.rs index 9b079f92..59776c15 100644 --- a/packages/talos_certifier/src/utils/env.rs +++ b/packages/talos_common_utils/src/env.rs @@ -140,6 +140,8 @@ macro_rules! env_var_with_defaults { mod tests { use std::env; + use serial_test::serial; + fn set_env_var(key: &str, value: &str) { env::set_var(key, value) } @@ -149,6 +151,7 @@ mod tests { } #[test] + #[serial] fn test_env_var_macro_get_value_successfully_for_key() { // When only the key is passed a value as String is returned. set_env_var("keyA", "valueA"); @@ -174,6 +177,7 @@ mod tests { unset_env_var("keyA"); } #[test] + #[serial] #[should_panic(expected = "keyB environment variable is not defined")] fn test_env_var_macro_when_key_value_not_found() { // When only the key is passed a value as String is returned. 
@@ -184,6 +188,7 @@ mod tests {
         unset_env_var("keyAE1");
     }
     #[test]
+    #[serial]
     #[should_panic(expected = "error parsing \"valueA\" String -> u32")]
     fn test_env_var_macro_when_parsing_fails() {
         // When only the key is passed a value as String is returned.
@@ -194,6 +199,7 @@ mod tests {
         unset_env_var("keyAE2");
     }
     #[test]
+    #[serial]
     #[should_panic(expected = "error parsing \"1, 2 ,valueA\" String -> Vec")]
     fn test_env_var_macro_when_parsing_fails_for_vector() {
         // When only the key is passed a value as String is returned.
@@ -205,12 +211,11 @@
     }
 
     #[test]
+    #[serial]
     fn test_env_var_with_default_macro_get_value_successfully_for_key() {
         let val = env_var_with_defaults!("keyA", "test_string".to_owned());
         assert_eq!(val, "test_string".to_owned());
 
-        // let val = env_var_with_defaults!("keyA", Option::<u32>, Some(10));
-        // assert_eq!(val, Some(10));
         set_env_var("keyA", "30");
 
         let val = env_var_with_defaults!("keyA", Option::<u32>, 10);
diff --git a/packages/talos_certifier/src/utils/mod.rs b/packages/talos_common_utils/src/lib.rs
similarity index 100%
rename from packages/talos_certifier/src/utils/mod.rs
rename to packages/talos_common_utils/src/lib.rs
diff --git a/packages/talos_rdkafka_utils/Cargo.toml b/packages/talos_rdkafka_utils/Cargo.toml
new file mode 100644
index 00000000..aa8741a1
--- /dev/null
+++ b/packages/talos_rdkafka_utils/Cargo.toml
@@ -0,0 +1,29 @@
+[package]
+name = "talos_rdkafka_utils"
+version = "0.1.0"
+edition = "2021"
+
+[lib]
+doctest = false
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+async-trait = { workspace = true }
+tokio = { workspace = true, features = ["macros", "rt"] }
+talos_common_utils = { path = "../talos_common_utils" }
+# Logging
+log = { workspace = true }
+env_logger = { workspace = true }
+
+# Test
+mockall = "0.11.0"
+
+# Error
+thiserror = "1.0.31"
+
+rdkafka = { version = "0.33.2", features = ["sasl"] }
+
+[dev-dependencies]
+tokio-test = "0.4"
+criterion = "0.5.1"
+serial_test = "2.0.0"
\ No newline at end of file
diff --git a/packages/talos_rdkafka_utils/src/kafka_config.rs b/packages/talos_rdkafka_utils/src/kafka_config.rs
new file mode 100644
index 00000000..5a5b87a7
--- /dev/null
+++ b/packages/talos_rdkafka_utils/src/kafka_config.rs
@@ -0,0 +1,363 @@
+use std::{collections::HashMap, env};
+
+use rdkafka::{config::RDKafkaLogLevel, ClientConfig};
+use talos_common_utils::{env_var, env_var_with_defaults};
+
+#[derive(Debug, Clone)]
+pub struct KafkaConfig {
+    pub brokers: Vec<String>,
+    pub topic: String,
+    pub client_id: String,
+    pub group_id: String,
+    pub username: String,
+    pub password: String,
+    pub producer_config_overrides: HashMap<String, String>,
+    pub consumer_config_overrides: HashMap<String, String>,
+    // Controls how long to wait until message is successfully placed on the librdkafka producer queue (including retries).
+    pub producer_send_timeout_ms: Option<u32>,
+    pub log_level: Option<String>,
+}
+
+impl KafkaConfig {
+    pub fn from_env(prefix: Option<&'static str>) -> Self {
+        let prefix = if let Some(prefix) = prefix { format!("{}_", prefix) } else { "".to_string() };
+
+        let log_level = env_var_with_defaults!(format!("{}KAFKA_LOG_LEVEL", prefix), String, "info".to_string());
+        let mut cfg = KafkaConfig {
+            brokers: env_var!(format!("{}KAFKA_BROKERS", prefix), Vec),
+            topic: env_var!(format!("{}KAFKA_TOPIC", prefix)),
+            client_id: env_var!(format!("{}KAFKA_CLIENT_ID", prefix)),
+            group_id: env_var!(format!("{}KAFKA_GROUP_ID", prefix)),
+            username: env_var!(format!("{}KAFKA_USERNAME", prefix)),
+            password: env_var!(format!("{}KAFKA_PASSWORD", prefix)),
+            producer_config_overrides: HashMap::new(),
+            consumer_config_overrides: HashMap::new(),
+            producer_send_timeout_ms: None,
+            log_level: if log_level.is_empty() { None } else { Some(log_level) },
+        };
+
+        let consumer_prefix = format!("{}KAFKA_CONSUMER_OVERRIDES.", prefix);
+        for (name, _) in env::vars().filter(|(k, _)| k.starts_with(consumer_prefix.as_str())) {
+            let property = name.replace(consumer_prefix.as_str(), "");
+            cfg.consumer_config_overrides.insert(property.clone(), env_var!(property.clone()));
+        }
+
+        let producer_prefix = format!("{}KAFKA_PRODUCER_OVERRIDES.", prefix);
+        for (name, _) in env::vars().filter(|(name, _)| name.starts_with(producer_prefix.as_str())) {
+            let property = name.replace(producer_prefix.as_str(), "");
+            cfg.producer_config_overrides.insert(property.clone(), env_var!(property.clone()));
+        }
+
+        cfg
+    }
+
+    pub fn extend(&mut self, producer_config_overrides: Option<HashMap<String, String>>, consumer_config_overrides: Option<HashMap<String, String>>) {
+        if let Some(more_values) = producer_config_overrides {
+            self.producer_config_overrides.extend(more_values);
+        }
+        if let Some(more_values) = consumer_config_overrides {
+            self.consumer_config_overrides.extend(more_values);
+        }
+    }
+
+    pub fn set_overrides(&mut self, producer_config_overrides: HashMap<String, String>, consumer_config_overrides: HashMap<String, String>) {
+        self.producer_config_overrides = producer_config_overrides;
+        self.consumer_config_overrides = consumer_config_overrides;
+    }
+
+    pub fn build_consumer_config(&self) -> ClientConfig {
+        let mut client_config = ClientConfig::new();
+
+        let brokers = self.brokers.join(",");
+        let base_config = HashMap::<String, String>::from([
+            ("group.id".to_string(), self.group_id.clone()),
+            ("bootstrap.servers".to_string(), brokers),
+            ("socket.keepalive.enable".to_string(), "true".to_string()),
+        ]);
+
+        for (k, v) in base_config.iter() {
+            client_config.set(k, v);
+        }
+        for (k, v) in self.consumer_config_overrides.iter() {
+            client_config.set(k, v);
+        }
+
+        self.setup_auth(&mut client_config, base_config);
+        if let Some(ref level) = self.log_level {
+            client_config.set_log_level(Self::map_log_level((*level).clone()));
+        }
+
+        client_config
+    }
+
+    pub fn build_producer_config(&self) -> ClientConfig {
+        let mut client_config = ClientConfig::new();
+
+        let brokers = self.brokers.join(",");
+        let base_config = HashMap::<String, String>::from([
+            ("message.timeout.ms".to_string(), "30000".to_string()),
+            ("bootstrap.servers".to_string(), brokers),
+            ("message.send.max.retries".to_string(), "100000".to_string()),
+        ]);
+
+        for (k, v) in base_config.iter() {
+            client_config.set(k, v);
+        }
+        for (k, v) in self.producer_config_overrides.iter() {
+            client_config.set(k, v);
+        }
+
+        self.setup_auth(&mut client_config, base_config);
+
+        log::warn!("p: client_config = {:?}", client_config);
+
+        client_config
+    }
+
+    pub fn map_log_level(level: String) -> RDKafkaLogLevel {
+        match level.to_lowercase().as_str() {
+            "alert" => RDKafkaLogLevel::Alert,
+            "critical" => RDKafkaLogLevel::Critical,
+            "debug" => RDKafkaLogLevel::Debug,
+            "emerg" => RDKafkaLogLevel::Emerg,
+            "error" => RDKafkaLogLevel::Error,
+            "info" => RDKafkaLogLevel::Info,
+            "notice" => RDKafkaLogLevel::Notice,
+            "warning" => RDKafkaLogLevel::Warning,
+            _ => RDKafkaLogLevel::Info,
+        }
+    }
+
+    fn setup_auth(&self, client: &mut ClientConfig, base_config: HashMap<String, String>) {
+        if !self.username.is_empty() {
+            client
+                .set(
+                    "security.protocol",
+                    base_config.get("security.protocol").unwrap_or(&"SASL_PLAINTEXT".to_string()),
+                )
+                .set("sasl.mechanisms", base_config.get("sasl.mechanisms").unwrap_or(&"SCRAM-SHA-512".to_string()))
+                .set("sasl.username", self.username.clone())
+                .set("sasl.password", self.password.clone());
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serial_test::serial;
+    use std::{collections::HashMap, env};
+
+    static KAFKA_LOG_LEVEL: i32 = RDKafkaLogLevel::Error as i32;
+    static KAFKA_LOG_LEVEL_VALUE: &str = "error";
+
+    fn set_env_var(key: &str, value: &str) {
+        env::set_var(key, value)
+    }
+
+    fn unset_env_var(key: &str) {
+        env::remove_var(key)
+    }
+
+    fn get_kafka_env_variables(prefix: Option<&str>) -> HashMap<String, String> {
+        let prefix = if let Some(prefix) = prefix { format!("{}_", prefix) } else { "".to_string() };
+
+        let env_hashmap = [
+            (format!("{}KAFKA_BROKERS", prefix), "broker1, broker2 ".to_string()),
+            (format!("{}KAFKA_TOPIC", prefix), "some-topic".to_string()),
+            (format!("{}KAFKA_CLIENT_ID", prefix), "some-client-id".to_string()),
+            (format!("{}KAFKA_GROUP_ID", prefix), "some-group-id".to_string()),
+            (format!("{}KAFKA_USERNAME", prefix), "".to_string()),
+            (format!("{}KAFKA_PASSWORD", prefix), "".to_string()),
+        ];
+        HashMap::from(env_hashmap)
+    }
+
+    fn build_test_kafka_config() -> KafkaConfig {
+        KafkaConfig {
+            brokers: vec!["broker1".to_string()],
+            topic: "topic".to_owned(),
+            client_id: "client-id-1".to_string(),
+            group_id: "group-id-1".to_string(),
+            username: "user_name".to_owned(),
+            password: "password".to_owned(),
+            producer_config_overrides: Default::default(),
+            consumer_config_overrides: Default::default(),
+            producer_send_timeout_ms: None,
+            log_level: Some(KAFKA_LOG_LEVEL_VALUE.into()),
+        }
+    }
+
+    #[test]
+    #[serial]
+    fn test_from_env_gets_values_successully() {
+        let prefix = None as Option<&str>;
+        get_kafka_env_variables(prefix).iter().for_each(|(k, v)| {
+            set_env_var(k, v);
+        });
+
+        let config = KafkaConfig::from_env(prefix);
+
+        assert_eq!(config.client_id, "some-client-id");
+        assert_eq!(config.brokers.len(), 2);
+
+        get_kafka_env_variables(prefix).iter().for_each(|(k, _)| {
+            unset_env_var(k);
+        });
+    }
+
+    #[test]
+    #[serial]
+    fn test_from_env_gets_values_with_prefix_successully() {
+        let prefix1 = Some("TEST_PREFIX1");
+        get_kafka_env_variables(prefix1).iter().for_each(|(k, v)| {
+            set_env_var(k, v);
+        });
+
+        let config = KafkaConfig::from_env(prefix1);
+
+        assert_eq!(config.client_id, "some-client-id");
+        assert_eq!(config.brokers.len(), 2);
+
+        set_env_var("TEST_PREFIX2_KAFKA_CLIENT_ID", "some-client-id2");
+        set_env_var("TEST_PREFIX2_KAFKA_BROKERS", "broker2-server1, broker2-server2, broker2-server3");
+        set_env_var("TEST_PREFIX2_KAFKA_TOPIC", "t2");
+        set_env_var("TEST_PREFIX2_KAFKA_GROUP_ID", "g2");
+        set_env_var("TEST_PREFIX2_KAFKA_USERNAME", "");
+        set_env_var("TEST_PREFIX2_KAFKA_PASSWORD", "");
+        let config2 = KafkaConfig::from_env(Some("TEST_PREFIX2"));
+
+        assert_eq!(config2.client_id, "some-client-id2");
+        assert_eq!(config2.brokers.len(), 3);
+
+        get_kafka_env_variables(prefix1).iter().for_each(|(k, _)| {
+            unset_env_var(k);
+        });
+        unset_env_var("TEST_PREFIX2_KAFKA_CLIENT_ID");
+        unset_env_var("TEST_PREFIX2_KAFKA_BROKERS");
+        unset_env_var("TEST_PREFIX2_KAFKA_TOPIC");
+        unset_env_var("TEST_PREFIX2_KAFKA_GROUP_ID");
+        unset_env_var("TEST_PREFIX2_KAFKA_USERNAME");
+        unset_env_var("TEST_PREFIX2_KAFKA_PASSWORD");
+    }
+
+    #[test]
+    #[serial]
+    #[should_panic(expected = "KAFKA_TOPIC environment variable is not defined")]
+    fn test_from_env_when_env_variable_not_found() {
+        let prefix = None as Option<&str>;
+        get_kafka_env_variables(prefix).iter().for_each(|(k, v)| {
+            set_env_var(k, v);
+        });
+
+        unset_env_var("KAFKA_TOPIC");
+
+        let _config = KafkaConfig::from_env(prefix);
+
+        get_kafka_env_variables(prefix).iter().for_each(|(k, _)| {
+            unset_env_var(k);
+        });
+    }
+
+    #[test]
+    fn test_build_consumer_config_obj() {
+        let config = build_test_kafka_config().build_consumer_config();
+        assert_eq!(config.get("group.id").expect("group.id"), "group-id-1");
+        assert_eq!(config.get("socket.keepalive.enable").expect("socket.keepalive.enable"), "true");
+        assert_eq!(config.get("sasl.username").expect("sasl.username"), "user_name");
+        assert_eq!(config.log_level as i32, KAFKA_LOG_LEVEL);
+    }
+
+    #[test]
+    fn test_passing_credentials_to_build_consumer_config() {
+        let config = KafkaConfig {
+            brokers: vec!["broker1".to_string()],
+            topic: "consumer-topic-1".to_owned(),
+            client_id: "client-id-1".to_string(),
+            group_id: "group-id-1".to_string(),
+            username: "user".to_string(),
+            password: "password".to_string(),
+            producer_config_overrides: Default::default(),
+            consumer_config_overrides: HashMap::from([("auto.offset.reset".into(), "earliest".into()), ("enable.auto.commit".into(), "false".into())]),
+            producer_send_timeout_ms: None,
+            log_level: None,
+        };
+        let client_config = config.build_consumer_config();
+        assert_eq!(client_config.get("auto.offset.reset").expect("auto.offset.reset"), "earliest");
+        assert_eq!(client_config.get("socket.keepalive.enable").expect("socket.keepalive.enable"), "true");
+        assert_eq!(client_config.get("enable.auto.commit").expect("enable.auto.commit"), "false");
+        assert_eq!(client_config.get("sasl.username").expect("sasl.username"), "user");
+        assert_eq!(client_config.get("sasl.password").expect("sasl.password"), "password");
+        assert_eq!(client_config.get("security.protocol").expect("security.protocol"), "SASL_PLAINTEXT");
+    }
+
+    #[test]
+    fn test_build_producer_config_obj() {
+        let config = build_test_kafka_config().build_producer_config();
+        assert!(config.get("group.id").is_none());
+        assert_eq!(config.get("sasl.username").expect("sasl.username"), "user_name");
+        assert_eq!(config.get("sasl.password").expect("sasl.password"), "password");
+        assert_eq!(config.get("message.timeout.ms").expect("message.timeout.ms"), "30000");
+        assert_eq!(config.get("message.send.max.retries").expect("message.send.max.retries"), "100000");
+        assert_eq!(config.log_level as i32, KAFKA_LOG_LEVEL);
+    }
+
+    #[test]
+    fn test_passing_overrides() {
+        let kafka_config = KafkaConfig {
+            brokers: vec!["broker1".to_string()],
+            topic: "topic".to_owned(),
+            client_id: "client-id-1".to_string(),
+            group_id: "groud-id-1".to_string(),
+            username: "user".to_string(),
+            password: "password".to_string(),
+            producer_config_overrides: HashMap::from([("message.timeout.ms".into(), "10".into())]),
+            consumer_config_overrides: HashMap::from([("auto.offset.reset".into(), "latest".into())]),
+            producer_send_timeout_ms: None,
+            log_level: None,
+        };
+
+        let producer_config = kafka_config.build_producer_config();
+        assert_eq!(producer_config.get("sasl.username").expect("sasl.username"), "user");
+        assert_eq!(producer_config.get("message.timeout.ms").expect("message.timeout.ms"), "10");
+        let consumer_config = kafka_config.build_consumer_config();
+        assert_eq!(consumer_config.get("sasl.username").expect("sasl.username"), "user");
+        assert_eq!(consumer_config.get("auto.offset.reset").expect("auto.offset.reset"), "latest");
+    }
+
+    #[test]
+    fn test_setup_kafka_auth() {
+        let mut cfg = ClientConfig::new();
+        let kafka_config = KafkaConfig {
+            brokers: vec!["brokers".to_string()],
+            group_id: "group_id".to_string(),
+            client_id: "client_id".to_string(),
+            topic: "certification_topic".to_string(),
+            log_level: Some("debug".to_string()),
+            username: "user1".to_string(),
+            password: "".to_string(),
+            producer_config_overrides: Default::default(),
+            consumer_config_overrides: Default::default(),
+            producer_send_timeout_ms: None,
+        };
+        kafka_config.setup_auth(&mut cfg, HashMap::new());
+        assert!(check_key(&mut cfg, "security.protocol", "SASL_PLAINTEXT"));
+        assert!(check_key(&mut cfg, "sasl.mechanisms", "SCRAM-SHA-512"));
+        assert!(check_key(&mut cfg, "sasl.username", "user1"));
+        assert!(check_key(&mut cfg, "sasl.password", ""));
+
+        cfg.set("sasl.password", "pwd".to_string());
+        assert!(check_key(&mut cfg, "sasl.password", "pwd"));
+
+        cfg.set("sasl.mechanisms", "ANONYMOUS".to_string());
+        assert!(check_key(&mut cfg, "sasl.mechanisms", "ANONYMOUS"));
+    }
+
+    fn check_key(cfg: &mut ClientConfig, key: &str, value: &str) -> bool {
+        if let Some(v) = cfg.get(key) {
+            v == value
+        } else {
+            false
+        }
+    }
+}
diff --git a/packages/talos_rdkafka_utils/src/lib.rs b/packages/talos_rdkafka_utils/src/lib.rs
new file mode 100644
index 00000000..39b5946b
--- /dev/null
+++ b/packages/talos_rdkafka_utils/src/lib.rs
@@ -0,0 +1 @@
+pub mod kafka_config;
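---

Usage sketch for reviewers. The snippet below wires the relocated KafkaConfig together the same way the histogram binary in this patch does. It is a minimal, hypothetical example rather than code from the patch: the "BANKING" prefix and the override values are made up, and from_env will panic if a required {PREFIX}_KAFKA_* environment variable is not set.

    use std::collections::HashMap;

    use talos_rdkafka_utils::kafka_config::KafkaConfig;

    fn main() {
        // Hypothetical prefix: reads BANKING_KAFKA_BROKERS, BANKING_KAFKA_TOPIC, etc.,
        // plus any BANKING_KAFKA_CONSUMER_OVERRIDES.* / BANKING_KAFKA_PRODUCER_OVERRIDES.* vars.
        let mut kafka_config = KafkaConfig::from_env(Some("BANKING"));

        // Layer service-specific consumer overrides on top of the env-derived config;
        // producer overrides (the first argument) are left untouched here.
        kafka_config.extend(
            None,
            Some(HashMap::from([
                ("enable.auto.commit".to_string(), "false".to_string()),
                ("auto.offset.reset".to_string(), "earliest".to_string()),
            ])),
        );

        // The builders return plain rdkafka ClientConfig values, ready for .create().
        let consumer_config = kafka_config.build_consumer_config();
        println!("consumer config: {:?}", consumer_config);
    }

Because build_consumer_config() applies the base settings first and the overrides second, values passed through extend win over the defaults, which preserves the precedence the old set_overrides path had.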