Skip to content

Commit

Permalink
Merge pull request #6 from wetware/feat/workspaces
Browse files Browse the repository at this point in the history
Add workspaces for ww_net and ww_proc.
  • Loading branch information
lthibault authored Apr 25, 2024
2 parents b793759 + 7acd9b1 commit b146c7d
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 14 deletions.
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[workspace]
members = [
"ww_net",
"ww_proc"
]

[dependencies]
anyhow = "1"
futures = "0.3.29"
Expand All @@ -13,3 +19,4 @@ rand = "0.8"
tokio = { version = "1.36", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
ww_net = {path = "ww_net"}
43 changes: 29 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,34 @@ use std::{error::Error, time::Duration};

use anyhow::Result;
use futures::StreamExt;
use libp2p::{
PeerId,
swarm, identity, mdns, noise, ping, tcp, yamux,
swarm::dial_opts::DialOpts,
};
use libp2p::{identity, mdns, noise, ping, swarm, swarm::dial_opts::DialOpts, tcp, yamux, PeerId};
use std::ops::{Deref, DerefMut};
use tracing_subscriber::EnvFilter;

mod net;
use ww_net;

struct DefaultSwarm(swarm::Swarm<ww_net::DefaultBehaviour>);

impl Deref for DefaultSwarm {
type Target = swarm::Swarm<ww_net::DefaultBehaviour>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for DefaultSwarm {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Start configuring a `fmt` subscriber
let subscriber = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact() // use abbreviated log format
.compact() // use abbreviated log format
// .with_file(true)
// .with_thread_ids(true)
.with_max_level(tracing::Level::INFO)
Expand All @@ -34,12 +47,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
let ping_behaviour = ping::Behaviour::default();

// Combine behaviours.
let behaviour = net::DefaultBehaviour {
let behaviour = ww_net::DefaultBehaviour {
mdns: mdns_behaviour,
ping: ping_behaviour,
};

let mut swarm = libp2p::SwarmBuilder::with_existing_identity(id_keys)
let raw_swarm = libp2p::SwarmBuilder::with_existing_identity(id_keys)
.with_tokio()
.with_tcp(
tcp::Config::default(),
Expand All @@ -50,6 +63,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)))
.build();

let mut swarm = DefaultSwarm(raw_swarm);

// Tell the swarm to listen on all interfaces and a random, OS-assigned
// port.
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Expand All @@ -62,11 +77,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
tracing::info!("listening on {address:?}")
}

swarm::SwarmEvent::Behaviour(net::DefaultBehaviourEvent::Mdns(event)) => {
net::mdns::default_handler(&mut swarm, event);
swarm::SwarmEvent::Behaviour(ww_net::DefaultBehaviourEvent::Mdns(event)) => {
ww_net::mdns::default_handler(&mut swarm, event);
}
swarm::SwarmEvent::Behaviour(net::DefaultBehaviourEvent::Ping(event)) => {

swarm::SwarmEvent::Behaviour(ww_net::DefaultBehaviourEvent::Ping(event)) => {
tracing::info!("got PING event: {event:?}");
}
event => {
Expand All @@ -76,7 +91,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
}

impl net::mdns::Dialer for swarm::Swarm<net::DefaultBehaviour> {
impl ww_net::mdns::Dialer for DefaultSwarm {
fn dial(&mut self, opts: DialOpts) -> Result<(), swarm::DialError> {
return self.dial(opts);
}
Expand Down
15 changes: 15 additions & 0 deletions ww_net/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "ww_net"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1"
futures = "0.3.29"
libp2p = { version = "0.53.2", features = ["full"] }
rand = "0.8"
tokio = { version = "1.36", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
File renamed without changes.
File renamed without changes.
22 changes: 22 additions & 0 deletions ww_proc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "ww_proc"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

build = "build.rs"

[build-dependencies]
capnpc = "0.19.0"

[dependencies]
anyhow = "1"
wasmer = "4.2.8"
wasmer-compiler-cranelift = "4.2.8"
wasmer-middlewares = "4.2.8"
futures = "0.3.0"
tokio = { version = "1.0.0", features = ["net", "rt", "macros"]}
tokio-util = { version = "0.7.4", features = ["compat"] }
capnp = "0.19.3"
capnp-rpc = "0.19.0"
6 changes: 6 additions & 0 deletions ww_proc/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
capnpc::CompilerCommand::new()
.file("proc.capnp")
.run()?;
Ok(())
}
5 changes: 5 additions & 0 deletions ww_proc/proc.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
@0x8e6a757a73b70ecd;

interface Proc {
deliver @0 (method :Text, event :Data) -> ();
}
4 changes: 4 additions & 0 deletions ww_proc/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#[test]
fn test_client() -> anyhow::Result<()> {
return Ok(())
}
166 changes: 166 additions & 0 deletions ww_proc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// use anyhow::bail;
// use std::sync::Arc;
// use wasmer::wasmparser::Operator;
// use wasmer::CompilerConfig;
// use wasmer::{imports, wat2wasm, sys::EngineBuilder, Instance, Module, Store, TypedFunction};
// use wasmer_compiler_cranelift::Cranelift;
// use wasmer_middlewares::{
// metering::{get_remaining_points, set_remaining_points, MeteringPoints},
// Metering,
// };

pub mod proc_capnp {
include!(concat!(env!("OUT_DIR"), "/proc_capnp.rs"));
}

pub mod client;
pub mod server;


// // cost_function function will be called for each `Operator` encountered during
// // the Wasm module execution. It should return the cost of the operator that it
// // received as it first argument.
// fn cost_function(operator: &Operator) -> u64 {
// match operator {
// Operator::LocalGet { .. } | Operator::I32Const { .. } => 1,
// Operator::I32Add { .. } => 2,
// _ => 0,
// }
// }

#[test]
fn test_capnp() -> anyhow::Result<()> {
return Ok(())
}

// #[test]
// fn test_metering() -> anyhow::Result<()> {
// // Let's declare the Wasm module.
// //
// // We are using the text representation of the module here but you can also load `.wasm`
// // files using the `include_bytes!` macro.
// let wasm_bytes = wat2wasm(
// br#"
// (module
// (type $add_t (func (param i32) (result i32)))
// (func $add_one_f (type $add_t) (param $value i32) (result i32)
// local.get $value
// i32.const 1
// i32.add)
// (export "add_one" (func $add_one_f)))
// "#,
// )?;




// // Now let's create our metering middleware.
// //
// // `Metering` needs to be configured with a limit and a cost function.
// //
// // For each `Operator`, the metering middleware will call the cost
// // function and subtract the cost from the remaining points.
// let metering = Arc::new(Metering::new(10, cost_function));
// let mut compiler_config = Cranelift::default();
// compiler_config.push_middleware(metering);

// // Create a Store.
// //
// // We use our previously create compiler configuration
// // with the Universal engine.
// let mut store = Store::new(EngineBuilder::new(compiler_config));

// println!("Compiling module...");
// // Let's compile the Wasm module.
// let module = Module::new(&store, wasm_bytes)?;

// // Create an empty import object.
// let import_object = imports! {};

// println!("Instantiating module...");
// // Let's instantiate the Wasm module.
// let instance = Instance::new(&mut store, &module, &import_object)?;

// // We now have an instance ready to be used.
// //
// // Our module exports a single `add_one` function. We want to
// // measure the cost of executing this function.
// let add_one: TypedFunction<i32, i32> = instance
// .exports
// .get_function("add_one")?
// .typed(&mut store)?;

// println!("Calling `add_one` function once...");
// add_one.call(&mut store, 1)?;

// // As you can see here, after the first call we have 6 remaining points.
// //
// // This is correct, here are the details of how it has been computed:
// // * `local.get $value` is a `Operator::LocalGet` which costs 1 point;
// // * `i32.const` is a `Operator::I32Const` which costs 1 point;
// // * `i32.add` is a `Operator::I32Add` which costs 2 points.
// let remaining_points_after_first_call = get_remaining_points(&mut store, &instance);
// assert_eq!(
// remaining_points_after_first_call,
// MeteringPoints::Remaining(6)
// );

// println!(
// "Remaining points after the first call: {:?}",
// remaining_points_after_first_call
// );

// println!("Calling `add_one` function twice...");
// add_one.call(&mut store, 1)?;

// // We spent 4 more points with the second call.
// // We have 2 remaining points.
// let remaining_points_after_second_call = get_remaining_points(&mut store, &instance);
// assert_eq!(
// remaining_points_after_second_call,
// MeteringPoints::Remaining(2)
// );

// println!(
// "Remaining points after the second call: {:?}",
// remaining_points_after_second_call
// );

// // Because calling our `add_one` function consumes 4 points,
// // calling it a third time will fail: we already consume 8
// // points, there are only two remaining.
// println!("Calling `add_one` function a third time...");
// match add_one.call(&mut store, 1) {
// Ok(result) => {
// bail!(
// "Expected failure while calling `add_one`, found: {}",
// result
// );
// }
// Err(_) => {
// println!("Calling `add_one` failed.");

// // Because the last needed more than the remaining points, we should have an error.
// let remaining_points = get_remaining_points(&mut store, &instance);

// match remaining_points {
// MeteringPoints::Remaining(..) => {
// bail!("No metering error: there are remaining points")
// }
// MeteringPoints::Exhausted => println!("Not enough points remaining"),
// }
// }
// }

// // Now let's see how we can set a new limit...
// println!("Set new remaining points to 10");
// let new_limit = 10;
// set_remaining_points(&mut store, &instance, new_limit);

// let remaining_points = get_remaining_points(&mut store, &instance);
// assert_eq!(remaining_points, MeteringPoints::Remaining(new_limit));

// println!("Remaining points: {:?}", remaining_points);

// Ok(())
// }
36 changes: 36 additions & 0 deletions ww_proc/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::any::Any;

use capnp::{Error, capability::Promise};
use capnp_rpc::{pry, rpc_twoparty_capnp, twoparty, RpcSystem};
use crate::proc_capnp::proc_;

use wasmer::{Instance, Store};




struct WasmerProc {
store: Store,
wasm: Instance
}

impl proc_::Server for WasmerProc {
fn deliver(
&mut self,
params: proc_::DeliverParams,
mut results: proc_::DeliverResults,
) -> Promise<(), Error>{
let method = pry!(pry!(pry!(params.get()).get_method()).to_str());
let event = pry!(pry!(params.get()).get_event());

match self.wasm.exports.get_function(method) {
Ok(f) => {
// f.call(&mut self.store, event)?;
Promise::ok(())
}
Err(e) => {
Promise::err(Error::failed(format!("method not found: {:?}", e)))
}
}
}
}

0 comments on commit b146c7d

Please sign in to comment.