Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split tasks spawned by CLI commands into their own modules #331

Merged
merged 13 commits into from
Oct 22, 2020
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@

### FEATURES

- Added "unreleased" section in `CHANGELOG.MD` to help streamline releases ([#274]).
- [changelog] Added "unreleased" section in `CHANGELOG.MD` to help streamline releases ([#274]).

### IMPROVEMENTS

- [relayer-cli] Split tasks spawned by CLI commands into their own modules ([#331])

[#274]: https://github.com/informalsystems/ibc-rs/issues/274
[#331]: https://github.com/informalsystems/ibc-rs/pulls/331

## v0.0.4
*October 19, 2020*
Expand Down
1 change: 0 additions & 1 deletion relayer-cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ mod listen;
mod query;
mod start;
mod tx;
mod utils;
mod version;

use self::{
Expand Down
80 changes: 13 additions & 67 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
@@ -1,84 +1,30 @@
use crate::prelude::*;
use std::ops::Deref;

use abscissa_core::{
application::fatal_error,
error::BoxError,
tracing::{debug, info},
application::fatal_error, error::BoxError, tracing::debug, Command, Options, Runnable,
};
use abscissa_core::{Command, Options, Runnable};

use crate::commands::utils::block_on;
use relayer::config::ChainConfig;
use relayer::event_handler::*;
use relayer::event_monitor::*;

use std::{ops::Deref, process};
use tokio::sync::mpsc::{channel, Sender};

use crate::config::Config;
use ::tendermint::chain::Id as ChainId;
use futures::future::join_all;
use ibc::events::IBCEvent;
use crate::{application::APPLICATION, prelude::*, tasks::event_listener};

#[derive(Command, Debug, Options)]
pub struct ListenCmd {
// #[options(help = "reset state from trust options", short = "r")]
// reset: bool,
}
pub struct ListenCmd {}

impl ListenCmd {
fn cmd(&self) -> Result<(), BoxError> {
async fn cmd(&self) -> Result<(), BoxError> {
let config = app_config().clone();
debug!("launching 'listen' command");
let local = tokio::task::LocalSet::new();

block_on(local.run_until(listener_task(&config, false)))
debug!("launching 'listen' command");
event_listener::start(&config, false).await
}
}

impl Runnable for ListenCmd {
fn run(&self) {
self.cmd()
.unwrap_or_else(|e| fatal_error(app_reader().deref(), &*e))
}
}

pub async fn listener_task(config: &Config, relay: bool) -> Result<(), BoxError> {
let (tx, rx) = channel(100);
let mut all_futures = Vec::new();
for chain_config in &config.chains {
info!(chain.id = % chain_config.id, "spawning event monitor for");
let mut event_monitor = init_monitor(chain_config.clone(), tx.clone()).await;
let m_handle = tokio::spawn(async move { event_monitor.run().await });
all_futures.push(m_handle);
abscissa_tokio::run(&APPLICATION, async move {
self.cmd()
.await
.unwrap_or_else(|e| fatal_error(app_reader().deref(), &*e));
})
.unwrap();
}

info!("spawning main event handler");
let mut event_handler = EventHandler::new(rx, relay);
let r_handle = tokio::spawn(async move { event_handler.run().await });

all_futures.push(r_handle);
let _res = join_all(all_futures).await;

Ok(())
}

async fn init_monitor(
chain_config: ChainConfig,
tx: Sender<(ChainId, Vec<IBCEvent>)>,
) -> EventMonitor {
let mut event_monitor =
EventMonitor::create(chain_config.id, chain_config.rpc_addr.clone(), tx)
.await
.unwrap_or_else(|e| {
status_err!("couldn't initialize event monitor: {}", e);
process::exit(1);
});

event_monitor.subscribe().await.unwrap_or_else(|e| {
status_err!("couldn't initialize subscriptions: {}", e);
process::exit(1);
});

event_monitor
}
Loading