Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce Deno mapper stage #560

Merged
merged 1 commit into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5,712 changes: 4,404 additions & 1,308 deletions Cargo.lock

Large diffs are not rendered by default.

32 changes: 15 additions & 17 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ authors = ["Santiago Carmuega <santiago@carmuega.me>"]
# pallas = "0.18.0"
# pallas = { git = "https://github.com/txpipe/pallas" }
pallas = { path = "../pallas/pallas" }
gasket = { path = "../../construkts/gasket-rs" }
# gasket = { git = "https://github.com/construkts/gasket-rs.git" }
# gasket = { path = "../../construkts/gasket-rs" }
gasket = { git = "https://github.com/construkts/gasket-rs.git" }
hex = "0.4.3"
net2 = "0.2.37"
bech32 = "0.9.1"
Expand All @@ -36,6 +36,11 @@ strum = "0.24"
strum_macros = "0.24"
prometheus_exporter = { version = "0.8.5", default-features = false }
unicode-truncate = "0.2.0"
thiserror = "1.0.39"
indicatif = "0.17.3"
lazy_static = "1.4.0"
tracing = "0.1.37"
tracing-subscriber = "0.3.16"

# feature logs
file-rotate = { version = "0.7.1", optional = true }
Expand All @@ -58,7 +63,7 @@ aws-sdk-sqs = { version = "0.14.0", optional = true }
aws-sdk-lambda = { version = "0.14.0", optional = true }
aws-sdk-s3 = { version = "0.14.0", optional = true }

# features: elasticsearch || aws
# features: elasticsearch || aws || deno
tokio = { version = "1.24.2", optional = true, features = ["rt"] }

# required for CI to complete successfully
Expand All @@ -73,19 +78,12 @@ google-cloud-googleapis = { version = "0.7.0", optional = true }

# features: rabbitmqsink
lapin = { version = "2.1.1", optional = true }
thiserror = "1.0.39"
indicatif = "0.17.3"
lazy_static = "1.4.0"

# features: deno
deno_core = { version = "0.175.0", optional = true }
serde_v8 = { version = "0.86.0", optional = true }
deno_runtime = { version = "0.101.0", optional = true }

[features]
default = []
web = ["reqwest"]
logs = ["file-rotate"]
webhook = ["web"]
kafkasink = ["kafka", "openssl"]
elasticsink = ["elasticsearch", "tokio"]
fingerprint = ["murmur3"]
aws = ["aws-config", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3", "tokio"]
redissink = ["redis", "tokio"]
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "tokio", "web"]
rabbitmqsink = ["lapin", "tokio"]
default = ["deno"]
deno = ["deno_core", "deno_runtime", "serde_v8", "tokio"]
9 changes: 0 additions & 9 deletions examples/basic/daemon.toml

This file was deleted.

17 changes: 17 additions & 0 deletions examples/deno/daemon.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[source]
type = "N2N"
peers = ["relays-new.cardano-mainnet.iohk.io:3001"]

[intersect]
type = "Point"
value = [
87486641,
"43b2980fa1cd003c886cc867344ee5fd53ef481e1a96532c4ac3aba77c51ccaa",
]

[mapper]
type = "Deno"
main_module = "./mapper.js"

[sink]
type = "Noop"
12 changes: 12 additions & 0 deletions examples/deno/mapper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// global state
let counter = 0;

globalThis.mapEvent = function (record) {
// I can store custom state that will be available on the next iteration
counter += 1;

return {
msg: "I can change this at runtime without having to compile Oura!",
counter,
};
};
2 changes: 2 additions & 0 deletions src/bin/oura/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ pub fn run(args: &Args) -> Result<(), Error> {
let cursor = Cursor::new(config.intersect.into());
let error_policy = config.policy.unwrap_or_default().into();
let finalize = config.finalize;
let current_dir = std::env::current_dir().unwrap();

let ctx = Context {
chain,
error_policy,
finalize,
cursor,
current_dir,
};

let source = config.source.bootstrapper(&ctx)?;
Expand Down
2 changes: 2 additions & 0 deletions src/framework/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use pallas::ledger::traverse::wellknown::GenesisValues;
use serde::Deserialize;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::path::PathBuf;

use pallas::network::miniprotocols::Point;
use pallas::network::upstream::cursor::{Cursor, Intersection};
Expand Down Expand Up @@ -46,6 +47,7 @@ pub struct Context {
pub cursor: Cursor,
pub error_policy: RuntimePolicy,
pub finalize: Option<FinalizeConfig>,
pub current_dir: PathBuf,
}

use serde_json::Value as JsonValue;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use thiserror::Error;
#![feature(pin_macro)]

pub mod filters;
pub mod framework;
Expand Down
196 changes: 196 additions & 0 deletions src/mappers/deno/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
//! A mapper with custom logic from using the Deno runtime

use deno_core::{op, Extension, ModuleSpecifier, OpState};
use deno_runtime::permissions::PermissionsContainer;
use deno_runtime::worker::{MainWorker, WorkerOptions};
use gasket::{messaging::*, runtime::Tether};
use serde::Deserialize;
use serde_json::json;
use std::ops::Deref;
use std::ops::DerefMut;
use tracing::debug;

use crate::framework::*;

//struct WrappedRuntime(JsRuntime);
struct WrappedRuntime(MainWorker);

unsafe impl Send for WrappedRuntime {}

impl Deref for WrappedRuntime {
type Target = MainWorker;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for WrappedRuntime {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

#[op]
fn op_pop_record(state: &mut OpState) -> Result<serde_json::Value, deno_core::error::AnyError> {
let r: Record = state.take();

let j = match r {
Record::CborBlock(x) => json!({ "len": x.len() as u32 }),
_ => todo!(),
};

Ok(j)
}

#[op]
fn op_put_record(
state: &mut OpState,
value: serde_json::Value,
) -> Result<(), deno_core::error::AnyError> {
state.put(value);

Ok(())
}

struct Worker {
ops_count: gasket::metrics::Counter,
runtime: Option<WrappedRuntime>,
main_module: ModuleSpecifier,
input: MapperInputPort,
output: MapperOutputPort,
}

impl Worker {
fn eval_apply(&mut self, record: Record) -> Result<Option<serde_json::Value>, String> {
let deno = self.runtime.as_mut().unwrap();

{
deno.js_runtime.op_state().borrow_mut().put(record);
}

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

rt.block_on(async {
let res = deno.execute_script(
"<anon>",
r#"
Deno[Deno.internal].core.ops.op_put_record(mapEvent(Deno[Deno.internal].core.ops.op_pop_record()));
"#,
);

deno.run_event_loop(false).await.unwrap();

res.unwrap();
});

let out = deno.js_runtime.op_state().borrow_mut().try_take();
debug!(?out, "deno mapping finished");

Ok(out)
}
}

impl gasket::runtime::Worker for Worker {
fn metrics(&self) -> gasket::metrics::Registry {
gasket::metrics::Builder::new()
.with_counter("ops_count", &self.ops_count)
.build()
}

fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
let ext = Extension::builder("oura")
.ops(vec![op_pop_record::decl(), op_put_record::decl()])
.build();

let mut worker = MainWorker::bootstrap_from_options(
self.main_module.clone(),
PermissionsContainer::allow_all(),
WorkerOptions {
extensions: vec![ext],
..Default::default()
},
);

let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

rt.block_on(async {
worker.execute_main_module(&self.main_module).await.unwrap();
});

self.runtime = Some(WrappedRuntime(worker));

Ok(())
}

fn work(&mut self) -> gasket::runtime::WorkResult {
let msg = self.input.recv_or_idle()?;

match msg.payload {
ChainEvent::Apply(p, r) => {
let mapped = self.eval_apply(r).unwrap();

if let Some(mapped) = mapped {
self.ops_count.inc(1);
self.output
.send(ChainEvent::Apply(p, Record::GenericJson(mapped)).into())?;
}
}
ChainEvent::Undo(p, r) => todo!(),
ChainEvent::Reset(p) => {
self.output.send(ChainEvent::reset(p))?;
}
}

Ok(gasket::runtime::WorkOutcome::Partial)
}
}

pub struct Bootstrapper(Worker);

impl Bootstrapper {
pub fn connect_input(&mut self, adapter: MapperInputAdapter) {
self.0.input.connect(adapter);
}

pub fn connect_output(&mut self, adapter: MapperOutputAdapter) {
self.0.output.connect(adapter);
}

pub fn spawn(self) -> Result<Vec<Tether>, Error> {
let worker_tether = gasket::runtime::spawn_stage(
self.0,
gasket::runtime::Policy::default(),
Some("mapper"),
);

Ok(vec![worker_tether])
}
}

#[derive(Deserialize)]
pub struct Config {
main_module: String,
}

impl Config {
pub fn bootstrapper(self, ctx: &Context) -> Result<Bootstrapper, Error> {
let main_module =
deno_core::resolve_path(&self.main_module, &ctx.current_dir).map_err(Error::config)?;

let worker = Worker {
main_module,
ops_count: Default::default(),
runtime: Default::default(),
input: Default::default(),
output: Default::default(),
};

Ok(Bootstrapper(worker))
}
}
7 changes: 7 additions & 0 deletions src/mappers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::Deserialize;

use crate::framework::*;

pub mod deno;
pub mod json;
pub mod legacy_v1;
pub mod wasm;
Expand All @@ -11,6 +12,7 @@ pub enum Bootstrapper {
Json(json::Bootstrapper),
LegacyV1(legacy_v1::Bootstrapper),
Wasm(wasm::Bootstrapper),
Deno(deno::Bootstrapper),
}

impl Bootstrapper {
Expand All @@ -19,6 +21,7 @@ impl Bootstrapper {
Bootstrapper::Json(p) => p.connect_input(adapter),
Bootstrapper::LegacyV1(p) => p.connect_input(adapter),
Bootstrapper::Wasm(p) => p.connect_input(adapter),
Bootstrapper::Deno(p) => p.connect_input(adapter),
}
}

Expand All @@ -27,6 +30,7 @@ impl Bootstrapper {
Bootstrapper::Json(p) => p.connect_output(adapter),
Bootstrapper::LegacyV1(p) => p.connect_output(adapter),
Bootstrapper::Wasm(p) => p.connect_output(adapter),
Bootstrapper::Deno(p) => p.connect_output(adapter),
}
}

Expand All @@ -35,6 +39,7 @@ impl Bootstrapper {
Bootstrapper::Json(x) => x.spawn(),
Bootstrapper::LegacyV1(x) => x.spawn(),
Bootstrapper::Wasm(x) => x.spawn(),
Bootstrapper::Deno(x) => x.spawn(),
}
}
}
Expand All @@ -45,6 +50,7 @@ pub enum Config {
Json(json::Config),
LegacyV1(legacy_v1::Config),
Wasm(wasm::Config),
Deno(deno::Config),
}

impl Config {
Expand All @@ -53,6 +59,7 @@ impl Config {
Config::Json(c) => Ok(Bootstrapper::Json(c.bootstrapper(ctx)?)),
Config::LegacyV1(c) => Ok(Bootstrapper::LegacyV1(c.bootstrapper(ctx)?)),
Config::Wasm(c) => Ok(Bootstrapper::Wasm(c.bootstrapper(ctx)?)),
Config::Deno(c) => Ok(Bootstrapper::Deno(c.bootstrapper(ctx)?)),
}
}
}
Expand Down
Loading