Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Prevent blockchain & miner racing when accessing pending block. #9310

Merged
merged 2 commits into from
Aug 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,15 @@ impl Miner {
/// Retrieves an existing pending block iff it's not older than given block number.
///
/// NOTE: This will not prepare a new pending block if it's not existing.
/// See `map_pending_block` for alternative behaviour.
fn map_existing_pending_block<F, T>(&self, f: F, latest_block_number: BlockNumber) -> Option<T> where
F: FnOnce(&ClosedBlock) -> T,
{
self.sealing.lock().queue
.peek_last_ref()
.and_then(|b| {
if b.block().header().number() > latest_block_number {
// to prevent a data race between block import and updating pending block
// we allow the number to be equal.
if b.block().header().number() >= latest_block_number {
Some(f(b))
} else {
None
Expand Down Expand Up @@ -365,7 +366,7 @@ impl Miner {
// if at least one was pushed successfully, close and enqueue new ClosedBlock;
// otherwise, leave everything alone.
// otherwise, author a fresh block.
let mut open_block = match sealing.queue.pop_if(|b| b.block().header().parent_hash() == &best_hash) {
let mut open_block = match sealing.queue.get_pending_if(|b| b.block().header().parent_hash() == &best_hash) {
Some(old_block) => {
trace!(target: "miner", "prepare_block: Already have previous work; updating and returning");
// add transactions to old_block
Expand Down Expand Up @@ -628,7 +629,7 @@ impl Miner {
{
let mut sealing = self.sealing.lock();
sealing.next_mandatory_reseal = Instant::now() + self.options.reseal_max_period;
sealing.queue.push(block.clone());
sealing.queue.set_pending(block.clone());
sealing.queue.use_last_ref();
}

Expand Down Expand Up @@ -690,7 +691,7 @@ impl Miner {
);
let is_new = original_work_hash.map_or(true, |h| h != block_hash);

sealing.queue.push(block);
sealing.queue.set_pending(block);

#[cfg(feature = "work-notify")]
{
Expand Down Expand Up @@ -1108,7 +1109,7 @@ impl miner::MinerService for Miner {
Some(false) => {
trace!(target: "miner", "update_sealing: engine is not keen to seal internally right now");
// anyway, save the block for later use
self.sealing.lock().queue.push(block);
self.sealing.lock().queue.set_pending(block);
},
None => {
trace!(target: "miner", "update_sealing: engine does not seal internally, preparing work");
Expand Down
85 changes: 42 additions & 43 deletions util/using_queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<T> UsingQueue<T> {

/// Return a reference to the item at the top of the queue (or `None` if the queue is empty);
/// this constitutes using the item and will remain in the queue for at least another
/// `max_size` invocations of `push()`.
/// `max_size` invocations of `set_pending() + use_last_ref()`.
pub fn use_last_ref(&mut self) -> Option<&T> {
if let Some(x) = self.pending.take() {
self.in_use.push(x);
Expand All @@ -65,9 +65,9 @@ impl<T> UsingQueue<T> {
self.in_use.last()
}

/// Place an item on the end of the queue. The previously `push()`ed item will be removed
/// if `use_last_ref()` since it was `push()`ed.
pub fn push(&mut self, b: T) {
/// Place an item on the end of the queue. The previously pending item will be removed
/// if `use_last_ref()` since it was set.
pub fn set_pending(&mut self, b: T) {
self.pending = Some(b);
}

Expand Down Expand Up @@ -100,17 +100,16 @@ impl<T> UsingQueue<T> {
}
}

/// Returns the most recently pushed block if `f` returns `true` with a reference to it as
/// Returns a clone of the pending block if `f` returns `true` with a reference to it as
/// a parameter, otherwise `None`.
/// Will not destroy a block if a reference to it has previously been returned by `use_last_ref`,
/// but rather clone it.
pub fn pop_if<P>(&mut self, predicate: P) -> Option<T> where P: Fn(&T) -> bool, T: Clone {
///
/// If pending block is not available will clone the first of the used blocks that match the predicate.
pub fn get_pending_if<P>(&mut self, predicate: P) -> Option<T> where P: Fn(&T) -> bool, T: Clone {
// a bit clumsy - TODO: think about a nicer way of expressing this.
if let Some(x) = self.pending.take() {
if predicate(&x) {
Some(x)
if let Some(ref x) = self.pending {
if predicate(x) {
Some(x.clone())
} else {
self.pending = Some(x);
None
}
} else {
Expand All @@ -122,29 +121,29 @@ impl<T> UsingQueue<T> {
#[test]
fn should_not_find_when_pushed() {
let mut q = UsingQueue::new(2);
q.push(1);
q.set_pending(1);
assert!(q.take_used_if(|i| i == &1).is_none());
}

#[test]
fn should_not_find_when_pushed_with_clone() {
let mut q = UsingQueue::new(2);
q.push(1);
q.set_pending(1);
assert!(q.clone_used_if(|i| i == &1).is_none());
}

#[test]
fn should_find_when_pushed_and_used() {
let mut q = UsingQueue::new(2);
q.push(1);
q.set_pending(1);
q.use_last_ref();
assert!(q.take_used_if(|i| i == &1).unwrap() == 1);
}

#[test]
fn should_have_same_semantics_for_get_take_clone() {
let mut q = UsingQueue::new(2);
q.push(1);
q.set_pending(1);
assert!(q.get_used_if(GetAction::Clone, |i| i == &1).is_none());
assert!(q.get_used_if(GetAction::Take, |i| i == &1).is_none());
q.use_last_ref();
Expand All @@ -158,15 +157,15 @@ fn should_have_same_semantics_for_get_take_clone() {
#[test]
fn should_find_when_pushed_and_used_with_clone() {
let mut q = UsingQueue::new(2);
q.push(1);
q.set_pending(1);
q.use_last_ref();
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
}

#[test]
fn should_not_find_again_when_pushed_and_taken() {
let mut q = UsingQueue::new(2);
q.push(1);
q.set_pending(1);
q.use_last_ref();
assert!(q.take_used_if(|i| i == &1).unwrap() == 1);
assert!(q.clone_used_if(|i| i == &1).is_none());
Expand All @@ -175,7 +174,7 @@ fn should_not_find_again_when_pushed_and_taken() {
#[test]
fn should_find_again_when_pushed_and_cloned() {
let mut q = UsingQueue::new(2);
q.push(1);
q.set_pending(1);
q.use_last_ref();
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
assert!(q.clone_used_if(|i| i == &1).unwrap() == 1);
Expand All @@ -185,93 +184,93 @@ fn should_find_again_when_pushed_and_cloned() {
#[test]
fn should_find_when_others_used() {
let mut q = UsingQueue::new(2);
q.push(1);
q.set_pending(1);
q.use_last_ref();
q.push(2);
q.set_pending(2);
q.use_last_ref();
assert!(q.take_used_if(|i| i == &1).is_some());
}

#[test]
fn should_not_find_when_too_many_used() {
let mut q = UsingQueue::new(1);
q.push(1);
q.set_pending(1);
q.use_last_ref();
q.push(2);
q.set_pending(2);
q.use_last_ref();
assert!(q.take_used_if(|i| i == &1).is_none());
}

#[test]
fn should_not_find_when_not_used_and_then_pushed() {
let mut q = UsingQueue::new(3);
q.push(1);
q.push(2);
q.set_pending(1);
q.set_pending(2);
q.use_last_ref();
assert!(q.take_used_if(|i| i == &1).is_none());
}

#[test]
fn should_peek_correctly_after_push() {
let mut q = UsingQueue::new(3);
q.push(1);
q.set_pending(1);
assert_eq!(q.peek_last_ref(), Some(&1));
q.push(2);
q.set_pending(2);
assert_eq!(q.peek_last_ref(), Some(&2));
}

#[test]
fn should_inspect_correctly() {
let mut q = UsingQueue::new(3);
q.push(1);
q.set_pending(1);
assert_eq!(q.use_last_ref(), Some(&1));
assert_eq!(q.peek_last_ref(), Some(&1));
q.push(2);
q.set_pending(2);
assert_eq!(q.use_last_ref(), Some(&2));
assert_eq!(q.peek_last_ref(), Some(&2));
}

#[test]
fn should_not_find_when_not_used_peeked_and_then_pushed() {
let mut q = UsingQueue::new(3);
q.push(1);
q.set_pending(1);
q.peek_last_ref();
q.push(2);
q.set_pending(2);
q.use_last_ref();
assert!(q.take_used_if(|i| i == &1).is_none());
}

#[test]
fn should_pop_used() {
let mut q = UsingQueue::new(3);
q.push(1);
q.set_pending(1);
q.use_last_ref();
let popped = q.pop_if(|i| i == &1);
let popped = q.get_pending_if(|i| i == &1);
assert_eq!(popped, Some(1));
}

#[test]
fn should_pop_unused() {
fn should_not_pop_last_pending() {
let mut q = UsingQueue::new(3);
q.push(1);
assert_eq!(q.pop_if(|i| i == &1), Some(1));
assert_eq!(q.pop_if(|i| i == &1), None);
q.set_pending(1);
assert_eq!(q.get_pending_if(|i| i == &1), Some(1));
assert_eq!(q.get_pending_if(|i| i == &1), Some(1));
}

#[test]
fn should_not_pop_unused_before_used() {
let mut q = UsingQueue::new(3);
q.push(1);
q.push(2);
let popped = q.pop_if(|i| i == &1);
q.set_pending(1);
q.set_pending(2);
let popped = q.get_pending_if(|i| i == &1);
assert_eq!(popped, None);
}

#[test]
fn should_not_remove_used_popped() {
let mut q = UsingQueue::new(3);
q.push(1);
q.set_pending(1);
q.use_last_ref();
assert_eq!(q.pop_if(|i| i == &1), Some(1));
assert_eq!(q.pop_if(|i| i == &1), Some(1));
assert_eq!(q.get_pending_if(|i| i == &1), Some(1));
assert_eq!(q.get_pending_if(|i| i == &1), Some(1));
}