This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

ethcore-io retries failed work steal #9651

Merged 2 commits on Sep 29, 2018
4 changes: 2 additions & 2 deletions Cargo.lock (generated file; diff not rendered by default)

1 change: 0 additions & 1 deletion ethcore/src/engines/tendermint/mod.rs
@@ -693,7 +693,6 @@ impl Engine<EthereumMachine> for Tendermint {
}

fn stop(&self) {
Contributor: I think we can remove this method altogether; there's a default empty implementation in the Engine trait.

-self.step_service.stop()
Collaborator (author): stop doesn't need to be called, as it is already called on drop.

}

fn is_proposal(&self, header: &Header) -> bool {
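For context on that exchange, a minimal sketch of the stop-on-drop idiom being described (illustrative only; the real StepService's fields and stepping logic differ):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// A background stepper that shuts itself down in Drop, which is why the
// explicit stop() call in Engine::stop above is redundant.
struct StepService {
    stop_flag: Arc<AtomicBool>,
    thread: Option<JoinHandle<()>>,
}

impl StepService {
    fn start() -> Self {
        let stop_flag = Arc::new(AtomicBool::new(false));
        let flag = stop_flag.clone();
        let thread = thread::spawn(move || {
            while !flag.load(Ordering::SeqCst) {
                // ... perform one consensus step ...
                thread::yield_now();
            }
        });
        StepService { stop_flag, thread: Some(thread) }
    }

    fn stop(&mut self) {
        self.stop_flag.store(true, Ordering::SeqCst);
        if let Some(t) = self.thread.take() {
            let _ = t.join();
        }
    }
}

impl Drop for StepService {
    fn drop(&mut self) {
        self.stop(); // idempotent: a second call finds thread == None
    }
}

fn main() {
    let _svc = StepService::start();
    // _svc is stopped and joined automatically when it goes out of scope.
}
```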
2 changes: 1 addition & 1 deletion util/io/Cargo.toml
@@ -9,7 +9,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
fnv = "1.0"
mio = { version = "0.6.8", optional = true }
-crossbeam = "0.3"
+crossbeam-deque = "0.6"
parking_lot = "0.6"
log = "0.4"
slab = "0.4"
2 changes: 1 addition & 1 deletion util/io/src/lib.rs
@@ -74,7 +74,7 @@ extern crate mio;
#[macro_use]
extern crate log as rlog;
extern crate slab;
-extern crate crossbeam;
+extern crate crossbeam_deque as deque;
extern crate parking_lot;
extern crate num_cpus;
extern crate timer;
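Aside (not part of the diff): a self-contained sketch of the crossbeam-deque 0.6 API this PR migrates to. fifo() yields a Worker/Stealer pair, and steal() returns Retry when it loses a race with another stealer, which callers are expected to loop on:

```rust
extern crate crossbeam_deque as deque;

use std::thread;

fn main() {
    // fifo() returns the owning Worker handle plus a cloneable Stealer.
    let (worker, stealer) = deque::fifo();
    for i in 0..100u64 {
        worker.push(i);
    }

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let stealer = stealer.clone();
            thread::spawn(move || {
                let mut sum = 0u64;
                loop {
                    match stealer.steal() {
                        // Lost a race with another stealer: try again.
                        deque::Steal::Retry => continue,
                        deque::Steal::Empty => break,
                        deque::Steal::Data(n) => sum += n,
                    }
                }
                sum
            })
        })
        .collect();

    let total: u64 = handles.into_iter().map(|h| h.join().unwrap()).sum();
    assert_eq!(total, (0..100u64).sum()); // every item is stolen exactly once
    println!("stolen total: {}", total);
}
```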
14 changes: 7 additions & 7 deletions util/io/src/service_mio.rs
@@ -20,7 +20,7 @@ use std::collections::HashMap;
use mio::*;
use mio::timer::{Timeout};
use mio::deprecated::{EventLoop, Handler, Sender, EventLoopBuilder};
-use crossbeam::sync::chase_lev;
+use deque;
use slab::Slab;
use {IoError, IoHandler};
use worker::{Worker, Work, WorkType};
@@ -184,7 +184,7 @@ pub struct IoManager<Message> where Message: Send + Sync {
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
workers: Vec<Worker>,
-worker_channel: chase_lev::Worker<Work<Message>>,
+worker_channel: deque::Worker<Work<Message>>,
work_ready: Arc<Condvar>,
}

@@ -194,7 +194,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + 'static {
event_loop: &mut EventLoop<IoManager<Message>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>
) -> Result<(), IoError> {
-let (worker, stealer) = chase_lev::deque();
+let (worker, stealer) = deque::fifo();
let num_workers = 4;
let work_ready_mutex = Arc::new(Mutex::new(()));
let work_ready = Arc::new(Condvar::new());
@@ -430,7 +430,7 @@ impl<Message> IoChannel<Message> where Message: Send + Sync + 'static {
/// General IO Service. Starts an event loop and dispatches IO requests.
/// 'Message' is a notification message type
pub struct IoService<Message> where Message: Send + Sync + 'static {
-thread: Mutex<Option<JoinHandle<()>>>,
+thread: Option<JoinHandle<()>>,
Collaborator (author): since Rust 1.29, JoinHandle is Send + Sync, so the Mutex wrapper is no longer needed (see the compile-time check after this hunk).

host_channel: Mutex<Sender<IoMessage<Message>>>,
handlers: Arc<RwLock<Slab<Arc<IoHandler<Message>>>>>,
}
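A quick compile-time check of that claim (standalone sketch):

```rust
use std::thread::{self, JoinHandle};

// This only compiles if JoinHandle<()> is Send + Sync, which holds since
// Rust 1.29; hence Mutex<Option<JoinHandle<()>>> can become Option<JoinHandle<()>>.
fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    assert_send_sync::<JoinHandle<()>>();
    let handle: JoinHandle<()> = thread::spawn(|| ());
    handle.join().unwrap();
}
```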
@@ -448,19 +448,19 @@ impl<Message> IoService<Message> where Message: Send + Sync + 'static {
IoManager::<Message>::start(&mut event_loop, h).expect("Error starting IO service");
});
Ok(IoService {
-thread: Mutex::new(Some(thread)),
+thread: Some(thread),
host_channel: Mutex::new(channel),
handlers: handlers,
})
}

-pub fn stop(&self) {
+pub fn stop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
// Clear handlers so that shared pointers are not stuck on stack
// in Channel::send_sync
self.handlers.write().clear();
self.host_channel.lock().send(IoMessage::Shutdown).unwrap_or_else(|e| warn!("Error on IO service shutdown: {:?}", e));
-if let Some(thread) = self.thread.lock().take() {
+if let Some(thread) = self.thread.take() {
thread.join().unwrap_or_else(|e| {
debug!(target: "shutdown", "Error joining IO service event loop thread: {:?}", e);
});
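The shape of that change in isolation (a sketch using std::sync::Mutex, hence the extra unwrap; parity uses parking_lot, whose lock() returns the guard directly). Taking &mut self gives stop exclusive access, so the handle can live in a plain Option:

```rust
use std::sync::Mutex;
use std::thread::{self, JoinHandle};

// Before: stop(&self) needed interior mutability to move the handle out.
struct Before { thread: Mutex<Option<JoinHandle<()>>> }

impl Before {
    fn stop(&self) {
        if let Some(t) = self.thread.lock().unwrap().take() {
            let _ = t.join();
        }
    }
}

// After: stop(&mut self) has exclusive access; Option::take needs no lock.
struct After { thread: Option<JoinHandle<()>> }

impl After {
    fn stop(&mut self) {
        if let Some(t) = self.thread.take() {
            let _ = t.join();
        }
    }
}

fn main() {
    let b = Before { thread: Mutex::new(Some(thread::spawn(|| ()))) };
    b.stop();
    let mut a = After { thread: Some(thread::spawn(|| ())) };
    a.stop();
}
```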
22 changes: 11 additions & 11 deletions util/io/src/service_non_mio.rs
@@ -16,7 +16,7 @@

use std::sync::{Arc, Weak};
use std::thread;
-use crossbeam::sync::chase_lev;
+use deque;
use slab::Slab;
use fnv::FnvHashMap;
use {IoError, IoHandler};
@@ -198,7 +198,7 @@ struct Shared<Message> where Message: Send + Sync + 'static {
// necessary.
timers: Mutex<FnvHashMap<TimerToken, TimerGuard>>,
// Channel used to send work to the worker threads.
-channel: Mutex<Option<chase_lev::Worker<WorkTask<Message>>>>,
+channel: Mutex<Option<deque::Worker<WorkTask<Message>>>>,
}

// Messages used to communicate with the event loop from other threads.
@@ -224,7 +224,7 @@ impl<Message> Clone for WorkTask<Message> where Message: Send + Sized {
impl<Message> IoService<Message> where Message: Send + Sync + 'static {
/// Starts IO event loop
pub fn start() -> Result<IoService<Message>, IoError> {
-let (tx, rx) = chase_lev::deque();
+let (tx, rx) = deque::fifo();

let shared = Arc::new(Shared {
handlers: RwLock::new(Slab::with_capacity(MAX_HANDLERS)),
@@ -251,7 +251,7 @@ impl<Message> IoService<Message> where Message: Send + Sync + 'static {
}

/// Stops the IO service.
-pub fn stop(&self) {
+pub fn stop(&mut self) {
trace!(target: "shutdown", "[IoService] Closing...");
// Clear handlers so that shared pointers are not stuck on stack
// in Channel::send_sync
@@ -307,23 +307,23 @@ impl<Message> Drop for IoService<Message> where Message: Send + Sync {
}
}

-fn do_work<Message>(shared: &Arc<Shared<Message>>, rx: chase_lev::Stealer<WorkTask<Message>>)
-where Message: Send + Sync + 'static
+fn do_work<Message>(shared: &Arc<Shared<Message>>, rx: deque::Stealer<WorkTask<Message>>)
+where Message: Send + Sync + 'static
{
loop {
match rx.steal() {
-chase_lev::Steal::Abort => continue,
-chase_lev::Steal::Empty => thread::park(),
-chase_lev::Steal::Data(WorkTask::Shutdown) => break,
-chase_lev::Steal::Data(WorkTask::UserMessage(message)) => {
+deque::Steal::Retry => continue,
+deque::Steal::Empty => thread::park(),
+deque::Steal::Data(WorkTask::Shutdown) => break,
+deque::Steal::Data(WorkTask::UserMessage(message)) => {
for id in 0 .. MAX_HANDLERS {
if let Some(handler) = shared.handlers.read().get(id) {
let ctxt = IoContext { handler: id, shared: shared.clone() };
handler.message(&ctxt, &message);
}
}
},
-chase_lev::Steal::Data(WorkTask::TimerTrigger { handler_id, token }) => {
+deque::Steal::Data(WorkTask::TimerTrigger { handler_id, token }) => {
if let Some(handler) = shared.handlers.read().get(handler_id) {
let ctxt = IoContext { handler: handler_id, shared: shared.clone() };
handler.timeout(&ctxt, token);
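For reference, the park/unpark handshake the `thread::park()` arm above relies on (a standalone sketch; in the PR it is the sending side that unparks worker threads after pushing work):

```rust
use std::thread;
use std::time::Duration;

fn main() {
    let worker = thread::spawn(|| {
        // Consumer: block until a producer signals that work arrived.
        // park() may also wake spuriously, so a real loop re-checks the
        // deque after waking rather than assuming work exists.
        thread::park();
        println!("unparked: stealing work again");
    });

    thread::sleep(Duration::from_millis(50)); // producer pushes work here
    worker.thread().unpark(); // wake the parked worker
    worker.join().unwrap();
}
```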
14 changes: 8 additions & 6 deletions util/io/src/worker.rs
@@ -17,7 +17,7 @@
use std::sync::Arc;
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
-use crossbeam::sync::chase_lev;
+use deque;
use service_mio::{HandlerId, IoChannel, IoContext};
use IoHandler;
use LOCAL_STACK_SIZE;
@@ -53,7 +53,7 @@ pub struct Worker {
impl Worker {
/// Creates a new worker instance.
pub fn new<Message>(index: usize,
-stealer: chase_lev::Stealer<Work<Message>>,
+stealer: deque::Stealer<Work<Message>>,
channel: IoChannel<Message>,
wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>,
@@ -75,8 +75,9 @@ impl Worker {
worker
}

-fn work_loop<Message>(stealer: chase_lev::Stealer<Work<Message>>,
-channel: IoChannel<Message>, wait: Arc<Condvar>,
+fn work_loop<Message>(stealer: deque::Stealer<Work<Message>>,
+channel: IoChannel<Message>,
+wait: Arc<Condvar>,
wait_mutex: Arc<Mutex<()>>,
deleting: Arc<AtomicBool>)
where Message: Send + Sync + 'static {
@@ -91,8 +91,9 @@

while !deleting.load(AtomicOrdering::Acquire) {
match stealer.steal() {
-chase_lev::Steal::Data(work) => Worker::do_work(work, channel.clone()),
-_ => break,
+deque::Steal::Data(work) => Worker::do_work(work, channel.clone()),
+deque::Steal::Retry => {},
@debris (Collaborator, author), Sep 25, 2018: this was a bug: if a worker lost the race for stealing data, it was incorrectly killed.

@ordian (Collaborator), Sep 26, 2018: The break is from the inner while loop, so the worker wasn't really killed, but rather put to sleep until there was more work to do. I wonder whether this change can increase CPU usage because of the busy loop.

Collaborator: I'm pretty sure it's going to increase CPU usage, but it should also improve performance, because previously a thread that lost the race would go to sleep instead of retrying and getting the work done. If it went to sleep, other threads had to finish whatever they were doing and then pick up the new task. (A yield-based middle ground is sketched after this diff.)

Collaborator (author): there is also the case when the Worker is being dropped: if steal returns Retry and the worker sleeps instead of retrying, the work will never be resumed.

+deque::Steal::Empty => break,
}
}
}
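Following up on the CPU-usage concern in the thread above, one possible middle ground (an illustrative variant, not what this PR ships) is to yield to the scheduler on Steal::Retry, so a stealer that loses a race backs off instead of spinning hot:

```rust
extern crate crossbeam_deque as deque;

use std::thread;

// Illustrative variant of the worker loop: on Steal::Retry, yield instead
// of retrying immediately, trading a little latency for less CPU burn when
// several workers race for the same deque.
fn drain<T, F: FnMut(T)>(stealer: &deque::Stealer<T>, mut handle: F) {
    loop {
        match stealer.steal() {
            deque::Steal::Data(item) => handle(item),
            deque::Steal::Retry => thread::yield_now(), // back off, then retry
            deque::Steal::Empty => break,               // queue drained: stop
        }
    }
}

fn main() {
    let (worker, stealer) = deque::fifo();
    for i in 0..10 {
        worker.push(i);
    }
    let mut out = Vec::new();
    drain(&stealer, |i| out.push(i));
    assert_eq!(out.len(), 10);
}
```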