Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
ethcore-io retries failed work steal (#9651)
Browse files Browse the repository at this point in the history
* ethcore-io uses newer version of crossbeam && retries failed work steal

* ethcore-io non-mio service uses newer crossbeam
  • Loading branch information
debris authored and 5chdn committed Sep 30, 2018
1 parent 723cb33 commit 0eee5da
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion ethcore/src/engines/tendermint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,6 @@ impl Engine<EthereumMachine> for Tendermint {
}

fn stop(&self) {
self.step_service.stop()
}

fn is_proposal(&self, header: &Header) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion util/io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
fnv = "1.0"
mio = { version = "0.6.8", optional = true }
crossbeam = "0.3"
crossbeam-deque = "0.6"
parking_lot = "0.6"
log = "0.4"
slab = "0.4"
Expand Down
2 changes: 1 addition & 1 deletion util/io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ extern crate mio;
#[macro_use]
extern crate log as rlog;
extern crate slab;
extern crate crossbeam;
extern crate crossbeam_deque as deque;
extern crate parking_lot;
extern crate num_cpus;
extern crate timer;
Expand Down
14 changes: 7 additions & 7 deletions util/io/src/service_mio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::collections::HashMap;
use mio::*;
use mio::timer::{Timeout};
use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder};
use crossbeam::sync::chase_lev;
use deque;
use slab::Slab;
use {IoError, IoHandler};
use worker::{Worker, Work, WorkType};
Expand Down Expand Up @@ -184,7 +184,7 @@ pub struct IoManager<Message> where Message: Send + Sync {
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
workers: Vec<Worker>,
worker_channel: chase_lev::Worker<Work<Message>>,
worker_channel: deque::Worker<Work<Message>>,
work_ready: Arc<Condvar>,
}

Expand All @@ -194,7 +194,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + 'static {
event_loop: &mut EventLoop<IoManager<Message>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>
) -> Result<(), IoError> {
let (worker, stealer) = chase_lev::deque();
let (worker, stealer) = deque::fifo();
let num_workers = 4;
let work_ready_mutex = Arc::new(Mutex::new(()));
let work_ready = Arc::new(Condvar::new());
Expand Down Expand Up @@ -430,7 +430,7 @@ impl<Message> IoChannel<Message> where Message: Send + Sync + 'static {
/// General IO Service. Starts an event loop and dispatches IO requests.
/// 'Message' is a notification message type
pub struct IoService<Message> where Message: Send + Sync + 'static {
thread: Mutex<Option<JoinHandle<()>>>,
thread: Option<JoinHandle<()>>,
host_channel: Mutex<Sender<IoMessage<Message>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
}
Expand All @@ -448,19 +448,19 @@ impl<Message> IoService<Message> where Message: Send + Sync + 'static {
IoManager::<Message>::start(&mut event_loop, h).expect("Error starting IO service");
});
Ok(IoService {
thread: Mutex::new(Some(thread)),
thread: Some(thread),
host_channel: Mutex::new(channel),
handlers: handlers,
})
}

pub fn stop(&self) {
pub fn stop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
// Clear handlers so that shared pointers are not stuck on stack
// in Channel::send_sync
self.handlers.write().clear();
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
if let Some(thread) = self.thread.lock().take() {
if let Some(thread) = self.thread.take() {
thread.join().unwrap_or_else(|e| {
debug!(target: "shutdown", "Error joining IO service event loop thread: {:?}", e);
});
Expand Down
22 changes: 11 additions & 11 deletions util/io/src/service_non_mio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use std::sync::{Arc, Weak};
use std::thread;
use crossbeam::sync::chase_lev;
use deque;
use slab::Slab;
use fnv::FnvHashMap;
use {IoError, IoHandler};
Expand Down Expand Up @@ -198,7 +198,7 @@ struct Shared<Message> where Message: Send + Sync + 'static {
// necessary.
timers: Mutex<FnvHashMap<TimerToken, TimerGuard>>,
// Channel used to send work to the worker threads.
channel: Mutex<Option<chase_lev::Worker<WorkTask<Message>>>>,
channel: Mutex<Option<deque::Worker<WorkTask<Message>>>>,
}

// Messages used to communicate with the event loop from other threads.
Expand All @@ -224,7 +224,7 @@ impl<Message> Clone for WorkTask<Message> where Message: Send + Sized {
impl<Message> IoService<Message> where Message: Send + Sync + 'static {
/// Starts IO event loop
pub fn start() -> Result<IoService<Message>, IoError> {
let (tx, rx) = chase_lev::deque();
let (tx, rx) = deque::fifo();

let shared = Arc::new(Shared {
handlers: RwLock::new(Slab::with_capacity(MAX_HANDLERS)),
Expand All @@ -251,7 +251,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + 'static {
}

/// Stops the IO service.
pub fn stop(&self) {
pub fn stop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
// Clear handlers so that shared pointers are not stuck on stack
// in Channel::send_sync
Expand Down Expand Up @@ -307,23 +307,23 @@ impl<Message> Drop for IoService<Message> where Message: Send + Sync {
}
}

fn do_work<Message>(shared: &Arc<Shared<Message>>, rx: chase_lev::Stealer<WorkTask<Message>>)
where Message: Send + Sync + 'static
fn do_work<Message>(shared: &Arc<Shared<Message>>, rx: deque::Stealer<WorkTask<Message>>)
where Message: Send + Sync + 'static
{
loop {
match rx.steal() {
chase_lev::Steal::Abort => continue,
chase_lev::Steal::Empty => thread::park(),
chase_lev::Steal::Data(WorkTask::Shutdown) => break,
chase_lev::Steal::Data(WorkTask::UserMessage(message)) => {
deque::Steal::Retry => continue,
deque::Steal::Empty => thread::park(),
deque::Steal::Data(WorkTask::Shutdown) => break,
deque::Steal::Data(WorkTask::UserMessage(message)) => {
for id in 0 .. MAX_HANDLERS {
if let Some(handler) = shared.handlers.read().get(id) {
let ctxt = IoContext { handler: id, shared: shared.clone() };
handler.message(&ctxt, &message);
}
}
},
chase_lev::Steal::Data(WorkTask::TimerTrigger { handler_id, token }) => {
deque::Steal::Data(WorkTask::TimerTrigger { handler_id, token }) => {
if let Some(handler) = shared.handlers.read().get(handler_id) {
let ctxt = IoContext { handler: handler_id, shared: shared.clone() };
handler.timeout(&ctxt, token);
Expand Down
14 changes: 8 additions & 6 deletions util/io/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::sync::Arc;
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use crossbeam::sync::chase_lev;
use deque;
use service_mio::{HandlerId, IoChannel, IoContext};
use IoHandler;
use LOCAL_STACK_SIZE;
Expand Down Expand Up @@ -53,7 +53,7 @@ pub struct Worker {
impl Worker {
/// Creates a new worker instance.
pub fn new<Message>(index: usize,
stealer: chase_lev::Stealer<Work<Message>>,
stealer: deque::Stealer<Work<Message>>,
channel: IoChannel<Message>,
wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>,
Expand All @@ -75,8 +75,9 @@ impl Worker {
worker
}

fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
channel: IoChannel<Message>, wait: Arc<Condvar>,
fn work_loop<Message>(stealer: deque::Stealer<Work<Message>>,
channel: IoChannel<Message>,
wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>,
deleting: Arc<AtomicBool>)
where Message: Send + Sync + 'static {
Expand All @@ -91,8 +92,9 @@ impl Worker {

while !deleting.load(AtomicOrdering::Acquire) {
match stealer.steal() {
chase_lev::Steal::Data(work) => Worker::do_work(work, channel.clone()),
_ => break,
deque::Steal::Data(work) => Worker::do_work(work, channel.clone()),
deque::Steal::Retry => {},
deque::Steal::Empty => break,
}
}
}
Expand Down

0 comments on commit 0eee5da

Please sign in to comment.