Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Refactor resuming snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
ngotchac committed May 9, 2018
1 parent ddf2f3d commit 75fd4b5
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 119 deletions.
141 changes: 65 additions & 76 deletions ethcore/src/snapshot/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ pub struct Service {
progress: super::Progress,
taking_snapshot: AtomicBool,
restoring_snapshot: AtomicBool,
restoration_ready: AtomicBool,
}

impl Service {
Expand All @@ -263,7 +262,6 @@ impl Service {
progress: Default::default(),
taking_snapshot: AtomicBool::new(false),
restoring_snapshot: AtomicBool::new(false),
restoration_ready: AtomicBool::new(false),
};

// create the root snapshot dir if it doesn't exist.
Expand Down Expand Up @@ -416,81 +414,78 @@ impl Service {
/// Initialize the restoration synchronously.
/// The recover flag indicates whether to recover the restored snapshot.
pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> {
{
let mut res = self.restoration.lock();

let rest_dir = self.restoration_dir();
let rest_db = self.restoration_db();
let recovery_temp = self.temp_recovery_dir();
let prev_chunks = self.prev_chunks_dir();

// delete the previous chunks dir left over from an earlier restoration, if any.
if let Err(e) = fs::remove_dir_all(&prev_chunks) {
match e.kind() {
ErrorKind::NotFound => {},
_ => return Err(e.into()),
}
}
let mut res = self.restoration.lock();

self.restoration_ready.store(false, Ordering::SeqCst);
let rest_dir = self.restoration_dir();
let rest_db = self.restoration_db();
let recovery_temp = self.temp_recovery_dir();
let prev_chunks = self.prev_chunks_dir();

// Move the previous recovery temp directory
// to `prev_chunks` to be able to restart restoring
// with previously downloaded blocks
// This step is optional, so don't fail on error
fs::rename(&recovery_temp, &prev_chunks).ok();
// delete the previous chunks dir left over from an earlier restoration, if any.
if let Err(e) = fs::remove_dir_all(&prev_chunks) {
match e.kind() {
ErrorKind::NotFound => {},
_ => return Err(e.into()),
}
}

self.state_chunks.store(0, Ordering::SeqCst);
self.block_chunks.store(0, Ordering::SeqCst);
// Move the previous recovery temp directory
// to `prev_chunks` to be able to restart restoring
// with previously downloaded blocks
// This step is optional, so don't fail on error
fs::rename(&recovery_temp, &prev_chunks).ok();

// tear down existing restoration.
*res = None;
self.state_chunks.store(0, Ordering::SeqCst);
self.block_chunks.store(0, Ordering::SeqCst);

// delete and restore the restoration dir.
if let Err(e) = fs::remove_dir_all(&rest_dir) {
match e.kind() {
ErrorKind::NotFound => {},
_ => return Err(e.into()),
}
}
// tear down existing restoration.
*res = None;

fs::create_dir_all(&rest_dir)?;

// make new restoration.
let writer = match recover {
true => Some(LooseWriter::new(self.temp_recovery_dir())?),
false => None
};

let params = RestorationParams {
manifest: manifest.clone(),
pruning: self.pruning,
db: self.restoration_db_handler.open(&self.restoration_db())?,
writer: writer,
genesis: &self.genesis_block,
guard: Guard::new(rest_db),
engine: &*self.engine,
};

let state_chunks = manifest.state_hashes.len();
let block_chunks = manifest.block_hashes.len();

*res = Some(Restoration::new(params)?);

*self.status.lock() = RestorationStatus::Ongoing {
state_chunks: state_chunks as u32,
block_chunks: block_chunks as u32,
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
};

self.restoring_snapshot.store(true, Ordering::SeqCst);

// Import previous chunks, continue if it fails
self.import_prev_chunks(&mut res, manifest).ok();
self.restoration_ready.store(true, Ordering::SeqCst);
// delete and restore the restoration dir.
if let Err(e) = fs::remove_dir_all(&rest_dir) {
match e.kind() {
ErrorKind::NotFound => {},
_ => return Err(e.into()),
}
}

{ *self.status.lock() = RestorationStatus::Initializing; }

fs::create_dir_all(&rest_dir)?;

// make new restoration.
let writer = match recover {
true => Some(LooseWriter::new(self.temp_recovery_dir())?),
false => None
};

let params = RestorationParams {
manifest: manifest.clone(),
pruning: self.pruning,
db: self.restoration_db_handler.open(&self.restoration_db())?,
writer: writer,
genesis: &self.genesis_block,
guard: Guard::new(rest_db),
engine: &*self.engine,
};

let state_chunks = manifest.state_hashes.len();
let block_chunks = manifest.block_hashes.len();

*res = Some(Restoration::new(params)?);

self.restoring_snapshot.store(true, Ordering::SeqCst);

// Import previous chunks, continue if it fails
self.import_prev_chunks(&mut res, manifest).ok();

*self.status.lock() = RestorationStatus::Ongoing {
state_chunks: state_chunks as u32,
block_chunks: block_chunks as u32,
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
};

Ok(())
}

Expand Down Expand Up @@ -583,7 +578,6 @@ impl Service {

let _ = fs::remove_dir_all(self.restoration_dir());
*self.status.lock() = RestorationStatus::Inactive;
self.restoration_ready.store(false, Ordering::SeqCst);

Ok(())
}
Expand All @@ -603,7 +597,7 @@ impl Service {
trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive or failed", hash);
return Ok(());
},
RestorationStatus::Ongoing { .. } => {
RestorationStatus::Ongoing { .. } | RestorationStatus::Initializing => {
let (res, db) = {
let rest = match *restoration {
Some(ref mut r) => r,
Expand Down Expand Up @@ -682,10 +676,6 @@ impl SnapshotService for Service {
self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok())
}

fn restoration_ready(&self) -> bool {
self.restoration_ready.load(Ordering::SeqCst)
}

fn completed_chunks(&self) -> Option<Vec<H256>> {
let restoration = self.restoration.lock();

Expand Down Expand Up @@ -727,7 +717,6 @@ impl SnapshotService for Service {
fn abort_restore(&self) {
trace!(target: "snapshot", "Aborting restore");
self.restoring_snapshot.store(false, Ordering::SeqCst);
self.restoration_ready.store(false, Ordering::SeqCst);
*self.restoration.lock() = None;
*self.status.lock() = RestorationStatus::Inactive;
}
Expand Down
3 changes: 0 additions & 3 deletions ethcore/src/snapshot/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ pub trait SnapshotService : Sync + Send {
/// `None` indicates warp sync isn't supported by the consensus engine.
fn supported_versions(&self) -> Option<(u64, u64)>;

/// Returns whether the Snapshot Service restoration is ready
fn restoration_ready(&self) -> bool;

/// Returns a list of the completed chunks
fn completed_chunks(&self) -> Option<Vec<H256>>;

Expand Down
6 changes: 5 additions & 1 deletion ethcore/sync/src/chain/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ impl SyncHandler {
}
sync.snapshot.reset_to(&manifest, &keccak(manifest_rlp.as_raw()));
io.snapshot_service().begin_restore(manifest);
sync.state = SyncState::SnapshotInit;
sync.state = SyncState::SnapshotData;

// give a task to the same peer first.
sync.sync_peer(io, peer_id, false);
Expand Down Expand Up @@ -578,6 +578,10 @@ impl SyncHandler {
sync.continue_sync(io);
return Ok(());
},
RestorationStatus::Initializing => {
trace!(target: "warp", "{}: Snapshot restoration is initializing", peer_id);
return Ok(());
}
RestorationStatus::Ongoing { .. } => {
trace!(target: "sync", "{}: Snapshot restoration is ongoing", peer_id);
},
Expand Down
52 changes: 24 additions & 28 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ pub enum SyncState {
WaitingPeers,
/// Waiting for snapshot manifest download
SnapshotManifest,
/// Snapshot service is initializing
SnapshotInit,
/// Downloading snapshot data
SnapshotData,
/// Waiting for snapshot restoration progress.
Expand Down Expand Up @@ -254,7 +252,6 @@ impl SyncStatus {
match self.state {
SyncState::SnapshotManifest |
SyncState::SnapshotData |
SyncState::SnapshotInit |
SyncState::SnapshotWaiting => true,
_ => false,
}
Expand Down Expand Up @@ -653,17 +650,6 @@ impl ChainSync {
}
}

/// Check if the snapshot service is ready
fn check_snapshot_service(&mut self, io: &SyncIo) {
if io.snapshot_service().restoration_ready() {
trace!(target: "snapshot", "Snapshot Service is ready!");
// Sync the previously resumed chunks
self.snapshot.sync(io);
// Move to fetching snapshot data
self.state = SyncState::SnapshotData;
}
}

/// Resume downloading
fn continue_sync(&mut self, io: &mut SyncIo) {
// Collect active peers that can sync
Expand Down Expand Up @@ -726,10 +712,6 @@ impl ChainSync {
trace!(target: "sync", "Waiting for the block queue");
return;
}
if self.state == SyncState::SnapshotInit {
trace!(target: "sync", "Waiting for the snapshot service to initialize");
return;
}
if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
return;
Expand Down Expand Up @@ -788,22 +770,36 @@ impl ChainSync {
}
},
SyncState::SnapshotData => {
if let RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } = io.snapshot_service().status() {
if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target: "sync", "Snapshot queue full, pausing sync");
self.state = SyncState::SnapshotWaiting;
match io.snapshot_service().status() {
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
// Initialize the snapshot if not already done
if !self.snapshot.is_initialized() {
self.snapshot.initialize(io);
}

if self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize > MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target: "sync", "Snapshot queue full, pausing sync");
self.state = SyncState::SnapshotWaiting;
return;
}
},
RestorationStatus::Initializing => {
trace!(target: "warp", "Snapshot is stil initializing.");
return;
}
},
_ => {
return;
},
}

if peer_snapshot_hash.is_some() && peer_snapshot_hash == self.snapshot.snapshot_hash() {
self.clear_peer_download(peer_id);
SyncRequester::request_snapshot_data(self, io, peer_id);
}
},
SyncState::SnapshotManifest | //already downloading from other peer
SyncState::Waiting |
SyncState::SnapshotWaiting |
SyncState::SnapshotInit => ()
SyncState::SnapshotWaiting => ()
}
} else {
trace!(target: "sync", "Skipping peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state);
Expand Down Expand Up @@ -949,6 +945,9 @@ impl ChainSync {
trace!(target:"sync", "Snapshot restoration is complete");
self.restart(io);
},
RestorationStatus::Initializing => {
trace!(target:"sync", "Snapshot restoration is initializing");
},
RestorationStatus::Ongoing { state_chunks_done, block_chunks_done, .. } => {
if !self.snapshot.is_complete() && self.snapshot.done_chunks() - (state_chunks_done + block_chunks_done) as usize <= MAX_SNAPSHOT_CHUNKS_DOWNLOAD_AHEAD {
trace!(target:"sync", "Resuming snapshot sync");
Expand All @@ -964,9 +963,6 @@ impl ChainSync {
},
}
},
SyncState::SnapshotInit => {
self.check_snapshot_service(io);
}
_ => (),
}
}
Expand Down
17 changes: 15 additions & 2 deletions ethcore/sync/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct Snapshot {
completed_chunks: HashSet<H256>,
snapshot_hash: Option<H256>,
bad_hashes: HashSet<H256>,
initialized: bool,
}

impl Snapshot {
Expand All @@ -48,20 +49,27 @@ impl Snapshot {
completed_chunks: HashSet::new(),
snapshot_hash: None,
bad_hashes: HashSet::new(),
initialized: false,
}
}

/// Sync the Snapshot completed chunks with the Snapshot Service
pub fn sync (&mut self, io: &SyncIo) {
pub fn initialize (&mut self, io: &SyncIo) {
if self.initialized {
return;
}

if let Some(completed_chunks) = io.snapshot_service().completed_chunks() {
self.completed_chunks = HashSet::from_iter(completed_chunks);
}

trace!(
target: "snapshot",
"Synced ChainSync snapshot with {} completed chunks",
"Snapshot is now initialized with {} completed chunks.",
self.completed_chunks.len(),
);

self.initialized = true;
}

/// Clear everything.
Expand All @@ -71,6 +79,7 @@ impl Snapshot {
self.downloading_chunks.clear();
self.completed_chunks.clear();
self.snapshot_hash = None;
self.initialized = false;
}

/// Check if currently downloading a snapshot.
Expand Down Expand Up @@ -153,6 +162,10 @@ impl Snapshot {
pub fn is_complete(&self) -> bool {
self.total_chunks() == self.completed_chunks.len()
}

/// Whether `initialize` has already run for this snapshot.
///
/// The flag starts out `false`, is set to `true` at the end of `initialize`,
/// and is reset to `false` when the snapshot state is cleared.
pub fn is_initialized(&self) -> bool {
self.initialized
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 75fd4b5

Please sign in to comment.