From 046956a4ad202f528d132fa7473855a2f1821838 Mon Sep 17 00:00:00 2001 From: "Joaquin Hoyos (Clark)" Date: Mon, 7 Oct 2024 20:24:02 +0200 Subject: [PATCH] feat: add redis cursor (#5) --- Cargo.lock | 93 ++++++++++++++++++++++++++++++++++++++------- Cargo.toml | 6 ++- src/utils/cursor.rs | 46 ++++++++++++++++++++++ 3 files changed, 131 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd05acfb..8a6806ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -677,7 +677,7 @@ version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d60bf15d33cbbe6cf0708a6908fab91166187550ac963c62427d2befea8e648f" dependencies = [ - "itoa", + "itoa 1.0.11", "num-integer", "ryu", "time", @@ -721,7 +721,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "itoa", + "itoa 1.0.11", "matchit", "memchr", "mime", @@ -1252,6 +1252,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "dtoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0" + [[package]] name = "dyn-clone" version = "1.0.17" @@ -1762,7 +1768,7 @@ checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" dependencies = [ "bytes", "fnv", - "itoa", + "itoa 1.0.11", ] [[package]] @@ -1773,7 +1779,7 @@ checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", - "itoa", + "itoa 1.0.11", ] [[package]] @@ -1843,7 +1849,7 @@ dependencies = [ "http-body 0.4.6", "httparse", "httpdate", - "itoa", + "itoa 1.0.11", "pin-project-lite", "socket2 0.5.7", "tokio", @@ -1866,7 +1872,7 @@ dependencies = [ "http-body 1.0.1", "httparse", "httpdate", - "itoa", + "itoa 1.0.11", "pin-project-lite", "smallvec", "tokio", @@ -2093,6 +2099,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" + [[package]] name = "itoa" version = "1.0.11" @@ -2492,7 +2504,7 @@ checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" [[package]] name = "oura" -version = "1.10.1" +version = "1.11.0" dependencies = [ "aws-config", "aws-sdk-lambda", @@ -2523,7 +2535,8 @@ dependencies = [ "pallas-primitives", "pallas-traverse", "prometheus_exporter", - "redis", + "r2d2_redis", + "redis 0.24.0", "reqwest 0.12.7", "serde", "serde_json", @@ -2551,7 +2564,7 @@ dependencies = [ "pkcs5", "rand", "rc2", - "sha1", + "sha1 0.10.6", "sha2", "thiserror", "x509-parser", @@ -2985,6 +2998,27 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + +[[package]] +name = "r2d2_redis" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "182473b876b0b93e353682ec58e207dd1cb4a62278bbe0045fe52b86b74363bb" +dependencies = [ + "r2d2", + "redis 0.20.2", +] + [[package]] name = "rand" version = "0.8.5" @@ -3035,6 +3069,21 @@ dependencies = [ "futures-io", ] +[[package]] +name = "redis" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4f0ceb2ec0dd769483ecd283f6615aa83dcd0be556d5294c6e659caefe7cc54" +dependencies = [ + "async-trait", + "combine", + "dtoa", + "itoa 0.4.8", + "percent-encoding", + "sha1 0.6.1", + "url", +] + [[package]] name = "redis" version = "0.24.0" @@ -3045,7 +3094,7 @@ dependencies = [ "bytes", "combine", "futures-util", - "itoa", + "itoa 1.0.11", "percent-encoding", "pin-project-lite", "ryu", @@ -3408,6 +3457,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -3505,7 +3563,7 @@ version = "1.0.125" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" dependencies = [ - "itoa", + "itoa 1.0.11", "memchr", "ryu", "serde", @@ -3518,7 +3576,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" dependencies = [ "form_urlencoded", - "itoa", + "itoa 1.0.11", "ryu", "serde", ] @@ -3545,6 +3603,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "sha1" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1da05c97445caa12d05e848c4a4fcbbea29e748ac28f7e80e9b010392063770" +dependencies = [ + "sha1_smol", +] + [[package]] name = "sha1" version = "0.10.6" @@ -3881,7 +3948,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" dependencies = [ "deranged", - "itoa", + "itoa 1.0.11", "num-conv", "powerfmt", "serde", diff --git a/Cargo.toml b/Cargo.toml index fd4f65ee..48840398 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "oura" description = "The tail of Cardano" -version = "1.10.1" +version = "1.11.0" edition = "2021" repository = "https://github.com/txpipe/oura" homepage = "https://github.com/txpipe/oura" @@ -37,6 +37,8 @@ strum_macros = "0.26.4" prometheus_exporter = { version = "0.8.5", default-features = false } unicode-truncate = "0.2.0" time = "0.3.36" +#TODO: put this under a feature +r2d2_redis = { version = "0.14.0" } # feature logs file-rotate = { version = "0.7.1", optional = true } @@ -76,6 +78,7 @@ google-cloud-googleapis = { version = "0.15.0", optional = true } # features: rabbitmqsink lapin = { version = "2.1.1", optional = true } + [features] default = [] web = ["reqwest"] @@ -88,3 +91,4 @@ aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"] redissink = ["redis", "tokio"] gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web", "google-cloud-gax"] rabbitmqsink = ["lapin", "tokio"] + diff --git a/src/utils/cursor.rs b/src/utils/cursor.rs index f462033b..84068c64 100644 --- a/src/utils/cursor.rs +++ b/src/utils/cursor.rs @@ -5,6 +5,11 @@ //! initial point from where it should start reading. A sink should use this //! utility to persist the position once a block has been processed. +use r2d2_redis::{ + r2d2::{self, Pool}, + redis::Commands, + RedisConnectionManager, +}; use std::{ sync::RwLock, time::{Duration, Instant}, @@ -27,15 +32,24 @@ pub struct FileConfig { pub path: String, } +#[derive(Debug, Deserialize)] +pub struct RedisConfig { + pub url: String, + pub key: String, +} + /// A cursor provider that uses the file system as the source for persistence pub(crate) struct FileStorage(FileConfig); /// An ephemeral cursor that lives only in memory pub(crate) struct MemoryStorage(PointArg); +pub(crate) struct RedisStorage(RedisConfig); + enum Storage { File(FileStorage), Memory(MemoryStorage), + Redis(RedisStorage), } impl CanStore for Storage { @@ -43,6 +57,7 @@ impl CanStore for Storage { match self { Storage::File(x) => x.read_cursor(), Storage::Memory(x) => x.read_cursor(), + Storage::Redis(x) => x.read_cursor(), } } @@ -50,6 +65,7 @@ impl CanStore for Storage { match self { Storage::File(x) => x.write_cursor(point), Storage::Memory(x) => x.write_cursor(point), + Storage::Redis(x) => x.write_cursor(point), } } } @@ -59,6 +75,7 @@ impl CanStore for Storage { pub enum Config { File(FileConfig), Memory(PointArg), + Redis(RedisConfig), } #[derive(Clone)] @@ -80,6 +97,7 @@ impl Provider { storage: match config { Config::File(x) => Storage::File(FileStorage(x)), Config::Memory(x) => Storage::Memory(MemoryStorage(x)), + Config::Redis(x) => Storage::Redis(RedisStorage(x)), }, } } @@ -168,3 +186,31 @@ impl CanStore for MemoryStorage { Ok(()) } } + +impl RedisStorage { + pub fn get_pool(&self) -> Result, Error> { + let manager = RedisConnectionManager::new(self.0.url.clone())?; + let pool = r2d2::Pool::builder().build(manager)?; + Ok(pool) + } +} + +impl CanStore for RedisStorage { + fn read_cursor(&self) -> Result { + let pool = self.get_pool()?; + let mut conn = pool.get()?; + // let data: String = conn.get("oura-cursor")?; + let data: String = conn.get(self.0.key.clone())?; + let point: PointArg = serde_json::from_str(&data)?; + Ok(point) + } + + fn write_cursor(&self, point: PointArg) -> Result<(), Error> { + let pool = self.get_pool()?; + let mut conn = pool.get()?; + let data_to_write = serde_json::to_string(&point)?; + // conn.set("oura-cursor", data_to_write)?; + conn.set(self.0.key.clone(), data_to_write)?; + Ok(()) + } +}