diff --git a/Cargo.toml b/Cargo.toml index 58f79546..ba850f45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ config = { version = "0.13.2", default-features = false, features = [ "json", ] } serde = { version = "1.0.152", features = ["derive"] } -serde_json = { version = "1.0.89", features = ["arbitrary_precision"] } +serde_json = { version = "1.0.89" } strum = "0.24" strum_macros = "0.24" prometheus_exporter = { version = "0.8.5", default-features = false } diff --git a/examples/deno/daemon.toml b/examples/deno/daemon.toml deleted file mode 100644 index cb72b62d..00000000 --- a/examples/deno/daemon.toml +++ /dev/null @@ -1,17 +0,0 @@ -[source] -type = "N2N" -peers = ["relays-new.cardano-mainnet.iohk.io:3001"] - -[intersect] -type = "Point" -value = [ - 87486641, - "43b2980fa1cd003c886cc867344ee5fd53ef481e1a96532c4ac3aba77c51ccaa", -] - -[mapper] -type = "Deno" -main_module = "./mapper.js" - -[sink] -type = "Noop" diff --git a/examples/deno/mapper.js b/examples/deno/mapper.js deleted file mode 100644 index d653046a..00000000 --- a/examples/deno/mapper.js +++ /dev/null @@ -1,12 +0,0 @@ -// global state -let counter = 0; - -globalThis.mapEvent = function (record) { - // I can store custom state that will be available on the next iteration - counter += 1; - - return { - msg: "I can change this at runtime without having to compile Oura!", - counter, - }; -}; diff --git a/examples/deno_cip68/.gitignore b/examples/deno_cip68/.gitignore new file mode 100644 index 00000000..4c43fe68 --- /dev/null +++ b/examples/deno_cip68/.gitignore @@ -0,0 +1 @@ +*.js \ No newline at end of file diff --git a/examples/deno_cip68/README.md b/examples/deno_cip68/README.md new file mode 100644 index 00000000..84eb1d34 --- /dev/null +++ b/examples/deno_cip68/README.md @@ -0,0 +1,76 @@ +# CIP68 Parser Example + +This example shows how to leverage the Deno filter stage to apply custom parsing logic and extract CIP68 reference NFT data from transaction without the need to change Oura's internal processing pipeline. + +## Configuration + +The relevant section of the daemon.toml is the following: + +```toml +[[filters]] +type = "Deno" +main_module = "./parser.js" +``` + +The above configuration fragment instructs _Oura_ to introduce a _Deno_ filter that uses the logic specified in the file `parser.js`, which holds your custom filter logic. + +The following section explains how to create a .js file compatible with what the Deno filter is expecting. + +## Custom Deno Filter + +To create the custom logic for your Deno-base filter, you need to start by creating a _Typescript_ and implementing a `mapEvent` function: + +```ts +export function mapEvent(record: oura.Event) { + // your custom logic goes here +} +``` + +The above function will be called for each record ths goes through _Oura_'s pipeline. Depending on your configuration, records could represent blocks, transactions or other payloads generated by previous filter stages. + +## CIP68 + +The goal for this particular example is to extract data from transactions that corresponds to reference NFT as defined by [CIP68](https://cips.cardano.org/cips/cip68). + +By inspecting the outputs and Plutus datums from the transactions, we can parse relevant information and generate custom JSON objects such as this one: + +```json +{ + "label": "000643b042756438363031", + "policy": "4523c5e21d409b81c95b45b0aea275b8ea1406e6cafea5583b9f8a5f", + "metadata": { + "name": "SpaceBud #8601", + "traits": "", + "type": "Shark", + "image": "ipfs://bafkreidrqwxpxhyc5bo364fzzwv7nhnjel6y6zkywriw33jopb2p4tba5u", + "sha256": "7185aefb9f02e85dbf70b9cdabf69da922fd8f6558b4516ded2e7874fe4c20ed" + }, + "version": 1, + "txHash": "720dd358d2b28531e181f93eed0e0d24db364232ddaccf6655abf92790a062d5" +} +``` + +You can check the actual code for the parser in the file `parser.ts`. Without going much into detail, the relevant part of the parser is a function called `extractRefNFT`: + +```ts +function extractNFTRef( + output: oura.TxOutputRecord, + allDatums?: oura.PlutusDatumRecord[] | null +): Partial | null { + const asset = output.assets?.find((a) => a.asset.startsWith("000")); + + if (!asset) return null; + + const datum = + output.inline_datum || + allDatums?.find((d) => d.datum_hash == output.datum_hash); + + if (!datum) return null; + + return { + label: asset.asset, + policy: asset.policy, + ...parseDatum(datum), + }; +} +``` \ No newline at end of file diff --git a/examples/deno_cip68/daemon.toml b/examples/deno_cip68/daemon.toml new file mode 100644 index 00000000..c85d6843 --- /dev/null +++ b/examples/deno_cip68/daemon.toml @@ -0,0 +1,21 @@ +[source] +type = "N2N" +peers = ["relays-new.cardano-mainnet.iohk.io:3001"] + +[intersect] +type = "Point" +value = [ + 87938927, + "7d6f25ff981bdc92f6b61c04d7f9d0b7236783da9c66b2d0ba5dc6953b68b34f", +] + +[[filters]] +type = "LegacyV1" +include_transaction_details = true + +[[filters]] +type = "Deno" +main_module = "./parser.js" + +[sink] +type = "Noop" diff --git a/examples/deno_cip68/ouraTypes.ts b/examples/deno_cip68/ouraTypes.ts new file mode 100644 index 00000000..c38964d9 --- /dev/null +++ b/examples/deno_cip68/ouraTypes.ts @@ -0,0 +1,254 @@ +export type Era = + | "Undefined" + | "Unknown" + | "Byron" + | "Shelley" + | "Allegra" + | "Mary" + | "Alonzo" + | "Babbage"; + +export type GenericJson = Record; + +export type MetadatumRendition = { + map_json?: GenericJson; + array_json?: GenericJson; + int_scalar?: string; + text_scalar?: string; + bytes_hex?: string; +}; + +export type MetadataRecord = { + label: string; + content: MetadatumRendition; +}; + +export type CIP25AssetRecord = { + version: string; + policy: string; + asset: string; + name: string | null; + image: string | null; + media_type: string | null; + description: string | null; + raw_json: GenericJson; +}; + +export type CIP15AssetRecord = { + voting_key: string; + stake_pub: string; + reward_address: string; + nonce: number; + raw_json: GenericJson; +}; + +export type TxInputRecord = { + tx_id: string; + index: number; +}; + +export type OutputAssetRecord = { + policy: string; + asset: string; + asset_ascii: string | null; + amount: number; +}; + +export type TxOutputRecord = { + address: string; + amount: number; + assets: OutputAssetRecord[] | null; + datum_hash: string | null; + inline_datum: PlutusDatumRecord | null; +}; + +export type MintRecord = { + policy: string; + asset: string; + quantity: number; +}; + +export type WithdrawalRecord = { + reward_account: string; + coin: number; +}; + +export type TransactionRecord = { + hash: string; + fee: number; + ttl: number | null; + validity_interval_start: number | null; + network_id: number | null; + input_count: number; + collateral_input_count: number; + has_collateral_output: boolean; + output_count: number; + mint_count: number; + total_output: number; + + // include_details + metadata: MetadataRecord[] | null; + inputs: TxInputRecord[] | null; + outputs: TxOutputRecord[] | null; + collateral_inputs: TxInputRecord[] | null; + collateral_output: TxOutputRecord | null; + mint: MintRecord[] | null; + vkey_witnesses: VKeyWitnessRecord[] | null; + native_witnesses: NativeWitnessRecord[] | null; + plutus_witnesses: PlutusWitnessRecord[] | null; + plutus_redeemers: PlutusRedeemerRecord[] | null; + plutus_data: PlutusDatumRecord[] | null; + withdrawals: WithdrawalRecord[] | null; + size: number; +}; + +export type EventContext = { + block_hash: string | null; + block_number: number | null; + slot: number | null; + timestamp: number | null; + tx_idx: number | null; + tx_hash: string | null; + input_idx: number | null; + output_idx: number | null; + output_address: string | null; + certificate_idx: number | null; +}; + +export type StakeCredential = { + addr_keyhash?: string; + scripthash?: string; +}; + +export type VKeyWitnessRecord = { + vkey_hex: string; + signature_hex: string; +}; + +export type NativeWitnessRecord = { + policy_id: string; + script_json: GenericJson; +}; + +export type PlutusWitnessRecord = { + script_hash: string; + script_hex: string; +}; + +export type PlutusRedeemerRecord = { + purpose: string; + ex_units_mem: number; + ex_units_steps: number; + input_idx: number; + plutus_data: GenericJson; +}; + +export type PlutusDatumRecord = { + datum_hash: string; + plutus_data: GenericJson; +}; + +export type BlockRecord = { + era: Era; + epoch: number | null; + epoch_slot: number | null; + body_size: number; + issuer_vkey: string; + vrf_vkey: string; + tx_count: number; + slot: number; + hash: string; + number: number; + previous_hash: string; + cbor_hex: string | null; + transactions: TransactionRecord[] | null; +}; + +export type CollateralRecord = { + tx_id: string; + index: number; +}; + +export type PoolRegistrationRecord = { + operator: string; + vrf_keyhash: string; + pledge: number; + cost: number; + margin: number; + reward_account: string; + pool_owners: string[]; + relays: string[]; + pool_metadata: string | null; + pool_metadata_hash: string | null; +}; + +export type RollBackRecord = { + block_slot: number; + block_hash: string; +}; + +export type MoveInstantaneousRewardsCertRecord = { + from_reserves: boolean; + from_treasury: boolean; + to_stake_credentials: Array<[StakeCredential, number]> | null; + to_other_pot: number | null; +}; + +export type NativeScriptRecord = { + policy_id: string; + script: GenericJson; +}; + +export type PlutusScriptRecord = { + hash: string; + data: string; +}; + +export type StakeRegistrationRecord = { credential: StakeCredential }; + +export type StakeDeregistrationRecord = { credential: StakeCredential }; + +export type StakeDelegation = { + credential: StakeCredential; + pool_hash: string; +}; + +export type PoolRetirementRecord = { + pool: string; + epoch: number; +}; + +export type GenesisKeyDelegationRecord = {}; + +export type Event = { + context: EventContext; + fingerprint?: string; + + block?: BlockRecord; + block_end?: BlockRecord; + transaction?: TransactionRecord; + transaction_end?: TransactionRecord; + tx_input?: TxInputRecord; + tx_output?: TxOutputRecord; + output_asset?: OutputAssetRecord; + metadata?: MetadataRecord; + v_key_witness?: VKeyWitnessRecord; + native_witness?: NativeWitnessRecord; + plutus_witness?: PlutusWitnessRecord; + plutus_redeemer?: PlutusRedeemerRecord; + plutus_datum?: PlutusDatumRecord; + cip25_asset?: CIP25AssetRecord; + cip15_asset?: CIP15AssetRecord; + mint?: MintRecord; + collateral?: CollateralRecord; + native_script?: NativeScriptRecord; + plutus_script?: PlutusScriptRecord; + stake_registration?: StakeRegistrationRecord; + stake_deregistration?: StakeDeregistrationRecord; + stake_delegation?: StakeDelegation; + pool_registration?: PoolRegistrationRecord; + pool_retirement?: PoolRetirementRecord; + genesis_key_delegation?: GenesisKeyDelegationRecord; + move_instantaneous_rewards_cert?: MoveInstantaneousRewardsCertRecord; + roll_back?: RollBackRecord; +}; diff --git a/examples/deno_cip68/parser.ts b/examples/deno_cip68/parser.ts new file mode 100644 index 00000000..c7a2b846 --- /dev/null +++ b/examples/deno_cip68/parser.ts @@ -0,0 +1,64 @@ +import * as oura from "./ouraTypes.ts"; +import { PlutusMap, plutusMapToPlainJson } from "./plutusData.ts"; + +interface RefNFT { + txHash: string; + label: string; + policy: string; + metadata?: oura.GenericJson; + version?: number; +} + +function parseDatum(raw: oura.PlutusDatumRecord): Partial | null { + try { + const [metaField, versionField, _] = raw.plutus_data.fields as [ + { map: oura.GenericJson[] }, + { int: number }, + { fields: oura.GenericJson[] } + ]; + + return { + metadata: plutusMapToPlainJson(metaField.map as PlutusMap), + version: versionField.int as number, + }; + } catch (err) { + console.error(err); + return null; + } +} + +function extractRefNFT( + output: oura.TxOutputRecord, + allDatums?: oura.PlutusDatumRecord[] | null +): Partial | null { + const asset = output.assets?.find((a) => a.asset.startsWith("000")); + + if (!asset) return null; + + const datum = + output.inline_datum || + allDatums?.find((d) => d.datum_hash == output.datum_hash); + + if (!datum) return null; + + return { + label: asset.asset, + policy: asset.policy, + ...parseDatum(datum), + }; +} + +function processTx(tx: oura.TransactionRecord) { + return tx.outputs + ?.map((output) => extractRefNFT(output, tx.plutus_data)) + .filter((x) => !!x) + .map((x) => ({ ...x, txHash: tx.hash })); +} + +export function mapEvent(record: oura.Event) { + if (!record.transaction) { + return; + } + + return processTx(record.transaction); +} diff --git a/examples/deno_cip68/plutusData.ts b/examples/deno_cip68/plutusData.ts new file mode 100644 index 00000000..ef9ec2b4 --- /dev/null +++ b/examples/deno_cip68/plutusData.ts @@ -0,0 +1,32 @@ +import * as hex from "https://deno.land/std/encoding/hex.ts"; +import * as oura from "./ouraTypes.ts"; + +export type PlutusMap = Array<{ + k: { bytes: string }; + v: { bytes: string }; +}>; + +const TEXT_ENCODER = new TextEncoder(); +const TEXT_DECODER = new TextDecoder(); + +function isReadableAscii(raw: string): boolean { + return raw.split("").every((char) => { + const charCode = char.charCodeAt(0); + return 0x20 <= charCode && charCode <= 0x7e; + }); +} + +function hexToText(hexString: string): string { + const hexBytes = TEXT_ENCODER.encode(hexString); + const utfBytes = hex.decode(hexBytes); + return TEXT_DECODER.decode(utfBytes); +} + +export function plutusMapToPlainJson(source: PlutusMap): oura.GenericJson { + return source.reduce((all, item) => { + const key = hexToText(item.k.bytes); + const maybeText = hexToText(item.v.bytes); + all[key] = isReadableAscii(maybeText) ? maybeText : item.v.bytes; + return all; + }, {}); +} diff --git a/src/filters/deno/mod.rs b/src/filters/deno/mod.rs index 34523705..c20ad314 100644 --- a/src/filters/deno/mod.rs +++ b/src/filters/deno/mod.rs @@ -1,43 +1,33 @@ //! A mapper with custom logic from using the Deno runtime -use deno_core::{op, Extension, ModuleSpecifier, OpState}; +use deno_core::{op, Extension, ModuleSpecifier, OpState, Snapshot}; use deno_runtime::permissions::PermissionsContainer; -use deno_runtime::worker::{MainWorker, WorkerOptions}; +use deno_runtime::worker::{MainWorker as DenoWorker, WorkerOptions}; +use deno_runtime::BootstrapOptions; use gasket::{messaging::*, runtime::Tether}; use serde::Deserialize; use serde_json::json; use std::ops::Deref; use std::ops::DerefMut; -use tracing::debug; +use std::path::PathBuf; +use tokio::runtime::Runtime as TokioRuntime; +use tracing::{debug, trace}; use crate::framework::*; -//struct WrappedRuntime(JsRuntime); -struct WrappedRuntime(MainWorker); +struct WrappedRuntime(DenoWorker, TokioRuntime); unsafe impl Send for WrappedRuntime {} -impl Deref for WrappedRuntime { - type Target = MainWorker; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for WrappedRuntime { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - #[op] fn op_pop_record(state: &mut OpState) -> Result { let r: Record = state.take(); let j = match r { - Record::CborBlock(x) => json!({ "len": x.len() as u32 }), - _ => todo!(), + Record::CborBlock(x) => json!({ "hex": hex::encode(x) }), + Record::CborTx(x) => json!({ "hex": hex::encode(x) }), + Record::OuraV1Event(x) => json!(x), + Record::GenericJson(x) => x, }; Ok(j) @@ -48,7 +38,10 @@ fn op_put_record( state: &mut OpState, value: serde_json::Value, ) -> Result<(), deno_core::error::AnyError> { - state.put(value); + match value { + serde_json::Value::Null => (), + _ => state.put(value), + }; Ok(()) } @@ -56,30 +49,21 @@ fn op_put_record( struct Worker { ops_count: gasket::metrics::Counter, runtime: Option, - main_module: ModuleSpecifier, + main_module: PathBuf, input: MapperInputPort, output: MapperOutputPort, } impl Worker { fn eval_apply(&mut self, record: Record) -> Result, String> { - let deno = self.runtime.as_mut().unwrap(); + let WrappedRuntime(deno, tokio) = self.runtime.as_mut().unwrap(); - { - deno.js_runtime.op_state().borrow_mut().put(record); - } + deno.js_runtime.op_state().borrow_mut().put(record); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - rt.block_on(async { + tokio.block_on(async { let res = deno.execute_script( "", - r#" -Deno[Deno.internal].core.ops.op_put_record(mapEvent(Deno[Deno.internal].core.ops.op_pop_record())); -"#, + r#"Deno[Deno.internal].core.ops.op_put_record(mapEvent(Deno[Deno.internal].core.ops.op_pop_record()));"#, ); deno.run_event_loop(false).await.unwrap(); @@ -88,7 +72,7 @@ Deno[Deno.internal].core.ops.op_put_record(mapEvent(Deno[Deno.internal].core.ops }); let out = deno.js_runtime.op_state().borrow_mut().try_take(); - debug!(?out, "deno mapping finished"); + trace!(?out, "deno mapping finished"); Ok(out) } @@ -106,24 +90,38 @@ impl gasket::runtime::Worker for Worker { .ops(vec![op_pop_record::decl(), op_put_record::decl()]) .build(); - let mut worker = MainWorker::bootstrap_from_options( - self.main_module.clone(), + let empty_module = + deno_core::ModuleSpecifier::parse("data:text/javascript;base64,").unwrap(); + + let mut worker = DenoWorker::bootstrap_from_options( + empty_module, PermissionsContainer::allow_all(), WorkerOptions { extensions: vec![ext], + bootstrap: BootstrapOptions { + ..Default::default() + }, ..Default::default() }, ); - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); + let reactor = deno_runtime::tokio_util::create_basic_runtime(); - rt.block_on(async { - worker.execute_main_module(&self.main_module).await.unwrap(); + reactor.block_on(async { + let code = std::fs::read_to_string(&self.main_module).unwrap(); + + worker + .js_runtime + .load_side_module(&ModuleSpecifier::parse("oura:mapper").unwrap(), Some(code)) + .await + .unwrap(); + + let res = worker.execute_script("[oura:runtime.js]", include_str!("./runtime.js")); + worker.run_event_loop(false).await.unwrap(); + res.unwrap(); }); - self.runtime = Some(WrappedRuntime(worker)); + self.runtime = Some(WrappedRuntime(worker, reactor)); Ok(()) } @@ -137,8 +135,20 @@ impl gasket::runtime::Worker for Worker { if let Some(mapped) = mapped { self.ops_count.inc(1); - self.output - .send(ChainEvent::Apply(p, Record::GenericJson(mapped)).into())?; + + match mapped { + serde_json::Value::Array(items) => { + for item in items { + self.output.send( + ChainEvent::Apply(p.clone(), Record::GenericJson(item)).into(), + )?; + } + } + _ => { + self.output + .send(ChainEvent::Apply(p, Record::GenericJson(mapped)).into())?; + } + } } } ChainEvent::Undo(p, r) => todo!(), @@ -180,11 +190,13 @@ pub struct Config { impl Config { pub fn bootstrapper(self, ctx: &Context) -> Result { - let main_module = - deno_core::resolve_path(&self.main_module, &ctx.current_dir).map_err(Error::config)?; + // let main_module = + // deno_core::resolve_path(&self.main_module, + // &ctx.current_dir).map_err(Error::config)?; let worker = Worker { - main_module, + //main_module, + main_module: PathBuf::from(self.main_module), ops_count: Default::default(), runtime: Default::default(), input: Default::default(), diff --git a/src/filters/deno/runtime.js b/src/filters/deno/runtime.js new file mode 100644 index 00000000..f3723d74 --- /dev/null +++ b/src/filters/deno/runtime.js @@ -0,0 +1,3 @@ +import("oura:mapper").then(({ mapEvent }) => { + globalThis["mapEvent"] = mapEvent; +}); diff --git a/src/sinks/terminal/run.rs b/src/sinks/terminal/run.rs index a4bdea56..3acc256e 100644 --- a/src/sinks/terminal/run.rs +++ b/src/sinks/terminal/run.rs @@ -17,7 +17,6 @@ pub struct Worker { pub(crate) adahandle_policy: Option, pub(crate) cursor: Cursor, pub(crate) input: MapperInputPort, - pub(crate) output: MapperOutputPort, } impl Worker { diff --git a/src/sinks/terminal/setup.rs b/src/sinks/terminal/setup.rs index 0cee1a82..be8fb7bf 100644 --- a/src/sinks/terminal/setup.rs +++ b/src/sinks/terminal/setup.rs @@ -14,11 +14,8 @@ impl Bootstrapper { } pub fn spawn(self) -> Result, Error> { - let worker_tether = gasket::runtime::spawn_stage( - self.0, - gasket::runtime::Policy::default(), - Some("sink_terminal"), - ); + let worker_tether = + gasket::runtime::spawn_stage(self.0, gasket::runtime::Policy::default(), Some("sink")); Ok(vec![worker_tether]) } @@ -40,7 +37,6 @@ impl Config { adahandle_policy: self.adahandle_policy, msg_count: Default::default(), input: Default::default(), - output: Default::default(), cursor: ctx.cursor.clone(), };