Skip to content

Commit

Permalink
feat: add redis cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
joacohoyos committed Oct 7, 2024
1 parent c07705a commit 93dd063
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 14 deletions.
93 changes: 80 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "oura"
description = "The tail of Cardano"
version = "1.10.1"
version = "1.11.0"
edition = "2021"
repository = "https://github.com/txpipe/oura"
homepage = "https://github.com/txpipe/oura"
Expand Down Expand Up @@ -37,6 +37,8 @@ strum_macros = "0.26.4"
prometheus_exporter = { version = "0.8.5", default-features = false }
unicode-truncate = "0.2.0"
time = "0.3.36"
#TODO: put this under a feature
r2d2_redis = { version = "0.14.0" }

# feature logs
file-rotate = { version = "0.7.1", optional = true }
Expand Down Expand Up @@ -76,6 +78,7 @@ google-cloud-googleapis = { version = "0.15.0", optional = true }
# features: rabbitmqsink
lapin = { version = "2.1.1", optional = true }


[features]
default = []
web = ["reqwest"]
Expand All @@ -88,3 +91,4 @@ aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
redissink = ["redis", "tokio"]
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web", "google-cloud-gax"]
rabbitmqsink = ["lapin", "tokio"]

46 changes: 46 additions & 0 deletions src/utils/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
//! initial point from where it should start reading. A sink should use this
//! utility to persist the position once a block has been processed.

use r2d2_redis::{
r2d2::{self, Pool},
redis::Commands,
RedisConnectionManager,
};
use std::{
sync::RwLock,
time::{Duration, Instant},
Expand All @@ -27,29 +32,40 @@ pub struct FileConfig {
pub path: String,
}

#[derive(Debug, Deserialize)]
pub struct RedisConfig {
pub url: String,
pub key: String,
}

/// A cursor provider that uses the file system as the source for persistence
pub(crate) struct FileStorage(FileConfig);

/// An ephemeral cursor that lives only in memory
pub(crate) struct MemoryStorage(PointArg);

pub(crate) struct RedisStorage(RedisConfig);

enum Storage {
File(FileStorage),
Memory(MemoryStorage),
Redis(RedisStorage),
}

impl CanStore for Storage {
fn read_cursor(&self) -> Result<PointArg, Error> {
match self {
Storage::File(x) => x.read_cursor(),
Storage::Memory(x) => x.read_cursor(),
Storage::Redis(x) => x.read_cursor(),
}
}

fn write_cursor(&self, point: PointArg) -> Result<(), Error> {
match self {
Storage::File(x) => x.write_cursor(point),
Storage::Memory(x) => x.write_cursor(point),
Storage::Redis(x) => x.write_cursor(point),
}
}
}
Expand All @@ -59,6 +75,7 @@ impl CanStore for Storage {
pub enum Config {
File(FileConfig),
Memory(PointArg),
Redis(RedisConfig),
}

#[derive(Clone)]
Expand All @@ -80,6 +97,7 @@ impl Provider {
storage: match config {
Config::File(x) => Storage::File(FileStorage(x)),
Config::Memory(x) => Storage::Memory(MemoryStorage(x)),
Config::Redis(x) => Storage::Redis(RedisStorage(x)),
},
}
}
Expand Down Expand Up @@ -168,3 +186,31 @@ impl CanStore for MemoryStorage {
Ok(())
}
}

impl RedisStorage {
pub fn get_pool(&self) -> Result<Pool<RedisConnectionManager>, Error> {
let manager = RedisConnectionManager::new(self.0.url.clone())?;
let pool = r2d2::Pool::builder().build(manager)?;
Ok(pool)
}
}

impl CanStore for RedisStorage {
fn read_cursor(&self) -> Result<PointArg, Error> {
let pool = self.get_pool()?;
let mut conn = pool.get()?;
// let data: String = conn.get("oura-cursor")?;
let data: String = conn.get(self.0.key.clone())?;
let point: PointArg = serde_json::from_str(&data)?;
Ok(point)
}

fn write_cursor(&self, point: PointArg) -> Result<(), Error> {

Check failure on line 208 in src/utils/cursor.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function depends on never type fallback being `()`

Check warning on line 208 in src/utils/cursor.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (windows-latest, stable)

this function depends on never type fallback being `()`

Check warning on line 208 in src/utils/cursor.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (ubuntu-latest, stable)

this function depends on never type fallback being `()`

Check warning on line 208 in src/utils/cursor.rs

View workflow job for this annotation

GitHub Actions / Matrix Check (macOS-latest, stable)

this function depends on never type fallback being `()`
let pool = self.get_pool()?;
let mut conn = pool.get()?;
let data_to_write = serde_json::to_string(&point)?;
// conn.set("oura-cursor", data_to_write)?;
conn.set(self.0.key.clone(), data_to_write)?;
Ok(())
}
}

0 comments on commit 93dd063

Please sign in to comment.