From c73ec49e301bdb2a3fcd862c1f66393d2cec9e97 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Fri, 4 Aug 2023 15:51:52 +0800 Subject: [PATCH] refactor: refactor the error handling logic in compact Closes: 188 Signed-off-by: Phoeniix Zhao --- xline/src/server/kv_server.rs | 78 +++++-------------- xline/src/server/watch_server.rs | 32 +++----- xline/src/storage/compact/mod.rs | 25 ++++-- .../src/storage/compact/periodic_compactor.rs | 25 ++++-- .../src/storage/compact/revision_compactor.rs | 23 ++++-- xline/src/storage/mod.rs | 1 - 6 files changed, 85 insertions(+), 99 deletions(-) diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index 38f6e6702..9d8e244a1 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -437,18 +437,12 @@ mod test { ..Default::default() }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} is higher than current revision {}", - range_request_with_future_rev.revision, current_revision - )) - .to_string(); - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( range_request_with_future_rev .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); - assert_eq!(message, expected_err_message); + ); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); let range_request_with_compacted_rev = RangeRequest { key: b"foo".to_vec(), @@ -456,19 +450,13 @@ mod test { ..Default::default() }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} has been compacted, compacted revision is {}", - range_request_with_compacted_rev.revision, compacted_revision - )) - .to_string(); - - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( range_request_with_compacted_rev .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); - assert_eq!(message, expected_err_message); + ); + + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); } #[tokio::test] @@ -487,20 +475,13 @@ mod test { failure: vec![], }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} is higher than current revision {}", - 20, current_revision - )) - .to_string(); - - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( txn_request_with_future_revision .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); + ); - assert_eq!(message, expected_err_message); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); let txn_request_with_compacted_revision = TxnRequest { compare: vec![], @@ -514,20 +495,13 @@ mod test { failure: vec![], }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} has been compacted, compacted revision is {}", - 3, compacted_revision - )) - .to_string(); - - let message = tonic::Status::from( + let expected_tonic_status = tonic::Status::from( txn_request_with_compacted_revision .check_revision(compacted_revision, current_revision) .unwrap_err(), - ) - .to_string(); + ); - assert_eq!(message, expected_err_message); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); } #[tokio::test] @@ -537,24 +511,12 @@ mod test { ..Default::default() }; - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} is higher than current revision {}", - compact_request.revision, 8 - )) - .to_string(); - - let message = - tonic::Status::from(compact_request.check_revision(3, 8).unwrap_err()).to_string(); - assert_eq!(message, expected_err_message); - - let expected_err_message = tonic::Status::out_of_range(format!( - "required revision {} has been compacted, compacted revision is {}", - compact_request.revision, 13 - )) - .to_string(); - - let message = - tonic::Status::from(compact_request.check_revision(13, 18).unwrap_err()).to_string(); - assert_eq!(message, expected_err_message); + let expected_tonic_status = + tonic::Status::from(compact_request.check_revision(3, 8).unwrap_err()); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); + + let expected_tonic_status = + tonic::Status::from(compact_request.check_revision(13, 18).unwrap_err()); + assert_eq!(expected_tonic_status.code(), tonic::Code::OutOfRange); } } diff --git a/xline/src/server/watch_server.rs b/xline/src/server/watch_server.rs index b3345ed32..736574c72 100644 --- a/xline/src/server/watch_server.rs +++ b/xline/src/server/watch_server.rs @@ -290,17 +290,17 @@ where /// Handle watch event async fn handle_watch_event(&mut self, mut watch_event: WatchEvent) { let watch_id = watch_event.watch_id(); - let response = if watch_event.compacted() { - WatchResponse { - header: Some(ResponseHeader { - revision: watch_event.revision(), - ..ResponseHeader::default() - }), - watch_id, - compact_revision: self.kv_watcher.compacted_revision(), - canceled: true, - ..WatchResponse::default() - } + let mut response = WatchResponse { + header: Some(ResponseHeader { + revision: watch_event.revision(), + ..ResponseHeader::default() + }), + watch_id, + ..WatchResponse::default() + }; + if watch_event.compacted() { + response.compact_revision = self.kv_watcher.compacted_revision(); + response.canceled = true; } else { let mut events = watch_event.take_events(); if events.is_empty() { @@ -318,15 +318,7 @@ where } } } - WatchResponse { - header: Some(ResponseHeader { - revision: watch_event.revision(), - ..ResponseHeader::default() - }), - watch_id, - events, - ..WatchResponse::default() - } + response.events = events; }; if self.response_tx.send(Ok(response)).await.is_err() { diff --git a/xline/src/storage/compact/mod.rs b/xline/src/storage/compact/mod.rs index 56abb8bc7..4036131e3 100644 --- a/xline/src/storage/compact/mod.rs +++ b/xline/src/storage/compact/mod.rs @@ -1,7 +1,11 @@ use std::{sync::Arc, time::Duration}; use async_trait::async_trait; -use curp::{client::Client, cmd::ProposeId, error::ProposeError}; +use curp::{ + client::Client, + cmd::ProposeId, + error::CommandProposeError::{AfterSync, Execute}, +}; use event_listener::Event; use periodic_compactor::PeriodicCompactor; use revision_compactor::RevisionCompactor; @@ -12,7 +16,7 @@ use uuid::Uuid; use super::{ index::{Index, IndexOperate}, storage_api::StorageApi, - KvStore, + ExecuteError, KvStore, }; use crate::{ revision_number::RevisionNumberGenerator, @@ -45,12 +49,12 @@ pub(crate) trait Compactor: std::fmt::Debug + Send + Sync { #[async_trait] pub(crate) trait Compactable: std::fmt::Debug + Send + Sync { /// do compact - async fn compact(&self, revision: i64) -> Result<(), ProposeError>; + async fn compact(&self, revision: i64) -> Result<(), ExecuteError>; } #[async_trait] impl Compactable for Client { - async fn compact(&self, revision: i64) -> Result<(), ProposeError> { + async fn compact(&self, revision: i64) -> Result<(), ExecuteError> { let request = CompactionRequest { revision, physical: false, @@ -58,8 +62,17 @@ impl Compactable for Client { let request_wrapper = RequestWithToken::new_with_token(request.into(), None); let propose_id = ProposeId::new(format!("auto-compactor-{}", Uuid::new_v4())); let cmd = Command::new(vec![], request_wrapper, propose_id); - let _cmd_res = self.propose(cmd).await?; - Ok(()) + if let Err(e) = self.propose(cmd).await { + #[allow(clippy::wildcard_enum_match_arm)] + match e { + Execute(e) | AfterSync(e) => Err(e), + _ => { + unreachable!("Compaction should not receive any errors other than ExecuteError, but it receives {e:?}"); + } + } + } else { + Ok(()) + } } } diff --git a/xline/src/storage/compact/periodic_compactor.rs b/xline/src/storage/compact/periodic_compactor.rs index a3d8b12af..76caf8812 100644 --- a/xline/src/storage/compact/periodic_compactor.rs +++ b/xline/src/storage/compact/periodic_compactor.rs @@ -12,7 +12,7 @@ use event_listener::Event; use tracing::{info, warn}; use super::{Compactable, Compactor}; -use crate::revision_number::RevisionNumberGenerator; +use crate::{revision_number::RevisionNumberGenerator, storage::ExecuteError}; /// `RevisionWindow` is a ring buffer used to store periodically sampled revision. struct RevisionWindow { @@ -106,13 +106,24 @@ impl PeriodicCompactor { "starting auto periodic compaction, revision = {}, period = {:?}", revision, self.period ); - // TODO: add more error processing logic + if let Err(e) = self.client.compact(revision).await { - warn!( - "failed auto revision compaction, revision = {}, period = {:?}, error: {:?}", - revision, self.period, e - ); - None + if let ExecuteError::RevisionCompacted(_rev, compacted_rev) = e { + info!( + "required revision {} has been compacted, the current compacted revision is {}, period = {:?}, took {:?}", + revision, + compacted_rev, + self.period, + now.elapsed().as_secs() + ); + Some(compacted_rev) + } else { + warn!( + "failed auto revision compaction, revision = {}, period = {:?}, error: {:?}", + revision, self.period, e + ); + None + } } else { info!( "completed auto revision compaction, revision = {}, period = {:?}, took {:?}", diff --git a/xline/src/storage/compact/revision_compactor.rs b/xline/src/storage/compact/revision_compactor.rs index 8603ffad9..6f38cc0a0 100644 --- a/xline/src/storage/compact/revision_compactor.rs +++ b/xline/src/storage/compact/revision_compactor.rs @@ -11,7 +11,7 @@ use event_listener::Event; use tracing::{info, warn}; use super::{Compactable, Compactor}; -use crate::revision_number::RevisionNumberGenerator; +use crate::{revision_number::RevisionNumberGenerator, storage::ExecuteError}; /// check for the need of compaction every 5 minutes const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60); @@ -65,13 +65,22 @@ impl RevisionCompactor { "starting auto revision compaction, revision = {}, retention = {}", target_revision, self.retention ); - // TODO: add more error processing logic if let Err(e) = self.client.compact(target_revision).await { - warn!( - "failed auto revision compaction, revision = {}, retention = {}, error: {:?}", - target_revision, self.retention, e - ); - None + if let ExecuteError::RevisionCompacted(_rev, compacted_rev) = e { + info!( + "required revision {} has been compacted, the current compacted revision is {}, retention = {:?}", + target_revision, + compacted_rev, + self.retention, + ); + Some(compacted_rev) + } else { + warn!( + "failed auto revision compaction, revision = {}, retention = {}, error: {:?}", + target_revision, self.retention, e + ); + None + } } else { info!( "completed auto revision compaction, revision = {}, retention = {}, took {:?}", diff --git a/xline/src/storage/mod.rs b/xline/src/storage/mod.rs index a3961d980..1463cdcb1 100644 --- a/xline/src/storage/mod.rs +++ b/xline/src/storage/mod.rs @@ -22,7 +22,6 @@ pub(crate) mod snapshot_allocator; pub(crate) mod storage_api; pub use self::execute_error::ExecuteError; - pub(crate) use self::{ auth_store::AuthStore, kv_store::KvStore, lease_store::LeaseStore, revision::Revision, };