Skip to content

Commit

Permalink
Merge pull request #28 from StoriqaTeam/feature/lapin_fix
Browse files Browse the repository at this point in the history
moved to new lapin
  • Loading branch information
serejkaaa512 authored Mar 4, 2019
2 parents b81c942 + 089c54b commit 7bcd596
Show file tree
Hide file tree
Showing 11 changed files with 1,028 additions and 3,264 deletions.
1,607 changes: 875 additions & 732 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ gelf = { git = "https://github.com/StoriqaTeam/gelf-rust", rev = "b05956244f020b
http_router = "0.1"
hyper = "0.12"
hyper-tls = "0.3"
lapin-async = {version = "0.13", git = "https://github.com/StoriqaTeam/lapin", branch = "storiqa" }
lapin-futures = {version = "0.13.1", git = "https://github.com/StoriqaTeam/lapin", branch = "storiqa" }
lapin-async = {version = "0.17", git = "https://github.com/StoriqaTeam/lapin", branch = "0.17.1" }
lapin-futures = {version = "0.17", git = "https://github.com/StoriqaTeam/lapin", branch = "0.17.1" }
log = { version = "0.4", features = ["std", "serde"] }
r2d2 = "0.8"
rand = "0.5"
Expand Down
11 changes: 6 additions & 5 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ mode = "nightly"

[client]
dns_threads = 4
bitcoin_rpc_url = "http://localhost:18332"
bitcoin_rpc_user = "xyz"
bitcoin_rpc_password = "xyz"
blockcypher_token = "5979b604fb114080b5761dba871009c0"
bitcoin_rpc_url = "http://btc-bitcoind:18332"
bitcoin_rpc_user = "rpcstq"
bitcoin_rpc_password = "ER95KefTsNaMjuVa"
infura_key = "16a351590d9946989cdc80712e74f25e"
infura_secret = "b18f14aa53094950a51597600e41c422"
stq_contract_address = "0x1bf2092a42166b2ae19b7b23752e7d2dab5ba91a"
Expand All @@ -15,15 +16,15 @@ stq_balance_method = "0x70a08231"
[poller]
enabled = true
bitcoin_interval_secs = 60
bitcoin_number_of_tracked_confirmations = 4
bitcoin_number_of_tracked_confirmations = 1
ethereum_interval_secs = 10
ethereum_number_of_tracked_confirmations = 18
storiqa_interval_secs = 10
storiqa_number_of_tracked_confirmations = 18


[rabbit]
url = "amqp://guest:guest@localhost:5672//?heartbeat=3&frame_max=131072"
url = "amqp://user:ttAOXTnkA3@amqp-rabbitmq:5672//?heartbeat=3&frame_max=131072"
thread_pool_size = 2
connection_timeout_secs = 10
connection_pool_size = 10
40 changes: 20 additions & 20 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::net::SocketAddr;
use std::sync::Arc;

use failure::{Compat, Fail};
use futures::future;
use futures::prelude::*;
use hyper;
use hyper::Server;
Expand Down Expand Up @@ -173,23 +172,24 @@ impl Service for ApiService {
}
}

pub fn start_server(config: Config) {
hyper::rt::run(future::lazy(move || {
ApiService::from_config(&config)
.into_future()
.and_then(move |api| {
let api_clone = api.clone();
let new_service = move || {
let res: Result<_, hyper::Error> = Ok(api_clone.clone());
res
};
let addr = api.server_address.clone();
let server = Server::bind(&api.server_address)
.serve(new_service)
.map_err(ectx!(ErrorSource::Hyper, ErrorKind::Internal => addr));
info!("Listening on http://{}", addr);
server
})
.map_err(|e: Error| log_error(&e))
}));
pub fn start_server(config: Config) -> Box<Future<Item = (), Error = ()> + Send> {
let fut = ApiService::from_config(&config)
.into_future()
.and_then(move |api| {
let api_clone = api.clone();
let new_service = move || {
let res: Result<_, hyper::Error> = Ok(api_clone.clone());
res
};
let addr = api.server_address.clone();
let server = Server::bind(&api.server_address)
.serve(new_service)
.map_err(ectx!(ErrorSource::Hyper, ErrorKind::Internal => addr));
info!("Listening on http://{}", addr);
server
})
.map_err(|e: Error| log_error(&e))
.map(|_| ());

Box::new(fut)
}
132 changes: 72 additions & 60 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,14 @@ mod services;
mod utils;

use std::sync::Arc;
use std::thread;
use std::time::Duration;

use self::client::{BitcoinClient, BitcoinClientImpl, EthereumClient, EthereumClientImpl, HttpClientImpl};
use self::pollers::{BitcoinPollerService, EthereumPollerService, StoriqaPollerService};
use self::utils::log_error;
use config::Config;
use prelude::*;
use rabbit::{ConnectionHooks, Error as RabbitError, RabbitConnectionManager, TransactionPublisherImpl};
use rabbit::{RabbitConnectionManager, TransactionPublisherImpl};

pub fn print_config() {
println!("Parsed config: {:?}", get_config());
Expand All @@ -72,6 +71,8 @@ pub fn start_server() {
// Prepare logger
logger::init(&config);

let mut rt = tokio::runtime::Runtime::new().unwrap();

let http_client = HttpClientImpl::new(&config, log::Level::Trace);
let bitcoin_client = Arc::new(BitcoinClientImpl::new(
Arc::new(http_client.clone()),
Expand All @@ -90,47 +91,51 @@ pub fn start_server() {
config.client.stq_balance_method.clone(),
));

let config_clone = config.clone();
debug!("Started creating rabbit connection pool");
let rabbit_connection_manager = rt
.block_on(RabbitConnectionManager::create(&config))
.map_err(|e| {
log_error(&e);
})
.expect("Can not create rabbit connection manager");
debug!("Finished creating rabbit connection manager");
if config.poller.enabled {
thread::spawn(move || {
let mut core = tokio_core::reactor::Core::new().unwrap();
debug!("Started creating rabbit connection pool");
let config = config_clone.clone();
let f = create_transactions_publisher(&config_clone)
.map(|publisher| {
let publisher = Arc::new(publisher);
let ethereum_poller = EthereumPollerService::new(
Duration::from_secs(config.poller.ethereum_interval_secs as u64),
ethereum_client.clone(),
publisher.clone(),
config.poller.ethereum_number_of_tracked_confirmations,
);
let storiqa_poller = StoriqaPollerService::new(
Duration::from_secs(config.poller.storiqa_interval_secs as u64),
ethereum_client.clone(),
publisher.clone(),
config.poller.storiqa_number_of_tracked_confirmations,
);
let bitcoin_poller = BitcoinPollerService::new(
Duration::from_secs(config.poller.bitcoin_interval_secs as u64),
bitcoin_client.clone(),
publisher.clone(),
config.poller.bitcoin_number_of_tracked_confirmations,
);

bitcoin_poller.start();
ethereum_poller.start();
storiqa_poller.start();
})
.map_err(|e| {
log_error(&e);
});
let _ = core.run(f.and_then(|_| futures::future::empty::<(), ()>()));
warn!("Poller process exited!");
});
let channel = Arc::new(rabbit_connection_manager.get_channel().expect("Can not get channel from pool"));
let publisher = rt
.block_on(TransactionPublisherImpl::init(channel))
.map_err(|e| {
log_error(&e);
})
.expect("Can not create rabbit connection manager");

let publisher = Arc::new(publisher);
let ethereum_poller = EthereumPollerService::new(
Duration::from_secs(config.poller.ethereum_interval_secs as u64),
ethereum_client.clone(),
publisher.clone(),
config.poller.ethereum_number_of_tracked_confirmations,
);
let storiqa_poller = StoriqaPollerService::new(
Duration::from_secs(config.poller.storiqa_interval_secs as u64),
ethereum_client.clone(),
publisher.clone(),
config.poller.storiqa_number_of_tracked_confirmations,
);
let bitcoin_poller = BitcoinPollerService::new(
Duration::from_secs(config.poller.bitcoin_interval_secs as u64),
bitcoin_client.clone(),
publisher.clone(),
config.poller.bitcoin_number_of_tracked_confirmations,
);

rt.spawn(bitcoin_poller.start());
rt.spawn(ethereum_poller.start());
rt.spawn(storiqa_poller.start());
}

api::start_server(config);
rt.spawn(api::start_server(config));

rt.shutdown_on_idle().wait().expect("Tokio runtime shutdown failed");
}

pub fn get_btc_blocks(hash: Option<String>, number: u64) {
Expand Down Expand Up @@ -189,7 +194,15 @@ pub fn get_btc_transactions(hash: Option<String>, number: u64) {
pub fn publish_btc_transactions(hash: Option<String>, number: u64) {
let config = get_config();
let bitcoin_client = Arc::new(create_btc_client(&config));
let f = create_transactions_publisher(&config)
let mut core = ::tokio_core::reactor::Core::new().unwrap();
let rabbit_connection_manager = core
.run(RabbitConnectionManager::create(&config))
.map_err(|e| {
log_error(&e);
})
.expect("Can not create rabbit connection manager");
let channel = Arc::new(rabbit_connection_manager.get_channel().expect("Can not get channel from pool"));
let f = TransactionPublisherImpl::init(channel)
.map_err(|e| {
log_error(&e);
})
Expand All @@ -204,7 +217,6 @@ pub fn publish_btc_transactions(hash: Option<String>, number: u64) {
log_error(&e);
})
});
let mut core = ::tokio_core::reactor::Core::new().unwrap();
let _ = core.run(f);
}

Expand Down Expand Up @@ -246,7 +258,15 @@ pub fn get_eth_transactions(hash: Option<String>, number: u64) {
pub fn publish_eth_transactions(hash: Option<String>, number: u64) {
let config = get_config();
let ethereum_client = Arc::new(create_eth_client(&config));
let f = create_transactions_publisher(&config)
let mut core = ::tokio_core::reactor::Core::new().unwrap();
let rabbit_connection_manager = core
.run(RabbitConnectionManager::create(&config))
.map_err(|e| {
log_error(&e);
})
.expect("Can not create rabbit connection manager");
let channel = Arc::new(rabbit_connection_manager.get_channel().expect("Can not get channel from pool"));
let f = TransactionPublisherImpl::init(channel)
.map_err(|e| {
log_error(&e);
})
Expand All @@ -261,7 +281,6 @@ pub fn publish_eth_transactions(hash: Option<String>, number: u64) {
log_error(&e);
})
});
let mut core = ::tokio_core::reactor::Core::new().unwrap();
let _ = core.run(f);
}

Expand Down Expand Up @@ -304,7 +323,15 @@ pub fn get_stq_transactions(hash: Option<String>, number: u64) {
pub fn publish_stq_transactions(hash: Option<String>, number: u64) {
let config = get_config();
let storiqa_client = Arc::new(create_eth_client(&config));
let f = create_transactions_publisher(&config)
let mut core = ::tokio_core::reactor::Core::new().unwrap();
let rabbit_connection_manager = core
.run(RabbitConnectionManager::create(&config))
.map_err(|e| {
log_error(&e);
})
.expect("Can not create rabbit connection manager");
let channel = Arc::new(rabbit_connection_manager.get_channel().expect("Can not get channel from pool"));
let f = TransactionPublisherImpl::init(channel)
.map_err(|e| {
log_error(&e);
})
Expand All @@ -319,24 +346,9 @@ pub fn publish_stq_transactions(hash: Option<String>, number: u64) {
log_error(&e);
})
});
let mut core = ::tokio_core::reactor::Core::new().unwrap();
let _ = core.run(f);
}

fn create_transactions_publisher(config: &Config) -> impl Future<Item = TransactionPublisherImpl, Error = RabbitError> {
let config_clone = config.clone();
let rabbit_thread_pool = futures_cpupool::CpuPool::new(config_clone.rabbit.thread_pool_size);
RabbitConnectionManager::create(&config).and_then(move |rabbit_connection_manager| {
let rabbit_connection_pool = r2d2::Pool::builder()
.max_size(config_clone.rabbit.connection_pool_size as u32)
.connection_customizer(Box::new(ConnectionHooks))
.build(rabbit_connection_manager)
.expect("Cannot build rabbit connection pool");
let publisher = TransactionPublisherImpl::new(rabbit_connection_pool, rabbit_thread_pool);
publisher.init().map(|_| publisher)
})
}

fn create_btc_client(config: &Config) -> BitcoinClientImpl {
let http_client = Arc::new(HttpClientImpl::new(config, log::Level::Debug));
BitcoinClientImpl::new(
Expand Down
4 changes: 2 additions & 2 deletions src/pollers/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ impl BitcoinPollerService {
}
}

pub fn start(&self) {
pub fn start(&self) -> impl Future<Item = (), Error = ()> {
let self_clone = self.clone();
let interval = Interval::new_interval(self.interval).for_each(move |_| {
self_clone.tick();
Ok(())
});
tokio::spawn(interval.map_err(|_| ()));
interval.map_err(|_| ())
}

pub fn publish_transactions(
Expand Down
4 changes: 2 additions & 2 deletions src/pollers/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ impl EthereumPollerService {
}
}

pub fn start(&self) {
pub fn start(&self) -> impl Future<Item = (), Error = ()> {
let self_clone = self.clone();
let interval = Interval::new_interval(self.interval).for_each(move |_| {
self_clone.tick();
Ok(())
});
tokio::spawn(interval.map_err(|_| ()));
interval.map_err(|_| ())
}

pub fn publish_transactions(
Expand Down
4 changes: 2 additions & 2 deletions src/pollers/storiqa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ impl StoriqaPollerService {
}
}

pub fn start(&self) {
pub fn start(&self) -> impl Future<Item = (), Error = ()> {
let self_clone = self.clone();
let interval = Interval::new_interval(self.interval).for_each(move |_| {
self_clone.tick();
Ok(())
});
tokio::spawn(interval.map_err(|_| ()));
interval.map_err(|_| ())
}

pub fn publish_transactions(
Expand Down
Loading

0 comments on commit 7bcd596

Please sign in to comment.