From c67913252b958322c7fb23ca49a5e2cca51bd246 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 4 Dec 2023 21:56:14 +0530 Subject: [PATCH 1/7] feat: track disk space utilized by persistence --- storage/src/lib.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 03919e21c..53d738fd3 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -97,6 +97,10 @@ impl Storage { next_file.file.write_all(&hash.to_be_bytes())?; next_file.file.write_all(&self.current_write_file[..])?; next_file.file.flush()?; + + // 8 is the number of bytes the hash(u64) occupies + persistence.bytes_occupied += 8 + self.current_write_file.len(); + self.current_write_file.clear(); Ok(next_file.deleted) } @@ -213,6 +217,8 @@ struct Persistence { // /// Deleted file id // deleted: Option, non_destructive_read: bool, + /// Disk space(in bytes) currently occupied by persistence files + bytes_occupied: usize, } impl Persistence { @@ -228,6 +234,7 @@ impl Persistence { current_read_file_id: None, // deleted: None, non_destructive_read: false, + bytes_occupied: 0, }) } @@ -239,8 +246,13 @@ impl Persistence { } /// Removes a file with provided id - fn remove(&self, id: u64) -> Result { + fn remove(&mut self, id: u64) -> Result { let path = self.path(id)?; + + // Query the fs to track size of removed persistence file + let metadata = fs::metadata(&path)?; + self.bytes_occupied -= metadata.len() as usize; + fs::remove_file(&path)?; Ok(path) From d6390f7418f97167a279bbe27d559d28b6370fd3 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 4 Dec 2023 22:13:17 +0530 Subject: [PATCH 2/7] feat: track disk utilization by persistence --- storage/src/lib.rs | 7 +++++++ uplink/src/base/serializer/metrics.rs | 7 +++++++ uplink/src/base/serializer/mod.rs | 9 +++++++++ 3 files changed, 23 insertions(+) diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 53d738fd3..d5b1dc12f 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -70,6 +70,13 @@ impl Storage { } } + pub fn disk_utilized(&self) -> usize { + match &self.persistence { + Some(p) => p.bytes_occupied, + None => 0, + } + } + pub fn inmemory_read_size(&self) -> usize { self.current_read_file.len() } diff --git a/uplink/src/base/serializer/metrics.rs b/uplink/src/base/serializer/metrics.rs index 2844aeadd..6f9543e0e 100644 --- a/uplink/src/base/serializer/metrics.rs +++ b/uplink/src/base/serializer/metrics.rs @@ -17,6 +17,8 @@ pub struct SerializerMetrics { pub read_memory: usize, /// Number of files that have been written to disk pub disk_files: usize, + /// Number of bytes that have been written to disk + pub disk_utilized: usize, /// Nuber of persistence files that had to deleted before being consumed pub lost_segments: usize, /// Number of errors faced during serializer operation @@ -35,6 +37,7 @@ impl SerializerMetrics { write_memory: 0, read_memory: 0, disk_files: 0, + disk_utilized: 0, lost_segments: 0, errors: 0, sent_size: 0, @@ -68,6 +71,10 @@ impl SerializerMetrics { self.disk_files = count; } + pub fn set_disk_utilized(&mut self, bytes: usize) { + self.disk_utilized = bytes; + } + pub fn increment_errors(&mut self) { self.errors += 1; } diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 603308345..61de05127 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -578,16 +578,19 @@ fn check_metrics(metrics: &mut SerializerMetrics, storage_handler: &StorageHandl let mut inmemory_write_size = 0; let mut inmemory_read_size = 0; let mut file_count = 0; + let mut disk_utilized = 0; for storage in storage_handler.map.values() { inmemory_read_size += storage.inmemory_read_size(); inmemory_write_size += storage.inmemory_write_size(); file_count += storage.file_count(); + disk_utilized += storage.disk_utilized(); } metrics.set_write_memory(inmemory_write_size); metrics.set_read_memory(inmemory_read_size); metrics.set_disk_files(file_count); + metrics.set_disk_files(disk_utilized); info!( "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} write_memory = {} read_memory = {}", @@ -609,16 +612,19 @@ fn save_and_prepare_next_metrics( let mut inmemory_write_size = 0; let mut inmemory_read_size = 0; let mut file_count = 0; + let mut disk_utilized = 0; for storage in storage_handler.map.values() { inmemory_write_size += storage.inmemory_write_size(); inmemory_read_size += storage.inmemory_read_size(); file_count += storage.file_count(); + disk_utilized += storage.disk_utilized(); } metrics.set_write_memory(inmemory_write_size); metrics.set_read_memory(inmemory_read_size); metrics.set_disk_files(file_count); + metrics.set_disk_utilized(disk_utilized); let m = metrics.clone(); pending.push_back(m); @@ -637,16 +643,19 @@ fn check_and_flush_metrics( let mut inmemory_write_size = 0; let mut inmemory_read_size = 0; let mut file_count = 0; + let mut disk_utilized = 0; for storage in storage_handler.map.values() { inmemory_write_size += storage.inmemory_write_size(); inmemory_read_size += storage.inmemory_read_size(); file_count += storage.file_count(); + disk_utilized += storage.disk_utilized(); } metrics.set_write_memory(inmemory_write_size); metrics.set_read_memory(inmemory_read_size); metrics.set_disk_files(file_count); + metrics.set_disk_utilized(disk_utilized); // Send pending metrics. This signifies state change while let Some(metrics) = pending.get(0) { From fc39c5f2e2834cdd9070858b7f4702116554acc3 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 5 Dec 2023 15:34:44 +0530 Subject: [PATCH 3/7] log: disk utilization --- uplink/src/base/serializer/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 61de05127..bc0c87ea3 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -593,12 +593,13 @@ fn check_metrics(metrics: &mut SerializerMetrics, storage_handler: &StorageHandl metrics.set_disk_files(disk_utilized); info!( - "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} write_memory = {} read_memory = {}", + "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} disk_utilized = {} write_memory = {} read_memory = {}", metrics.mode, metrics.batches, metrics.errors, metrics.lost_segments, metrics.disk_files, + convert(metrics.disk_utilized as f64), convert(metrics.write_memory as f64), convert(metrics.read_memory as f64), ); @@ -661,12 +662,13 @@ fn check_and_flush_metrics( while let Some(metrics) = pending.get(0) { // Always send pending metrics. They represent state changes info!( - "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} write_memory = {} read_memory = {}", + "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} disk_utilized = {} write_memory = {} read_memory = {}", metrics.mode, metrics.batches, metrics.errors, metrics.lost_segments, metrics.disk_files, + convert(metrics.disk_utilized as f64), convert(metrics.write_memory as f64), convert(metrics.read_memory as f64), ); From f7a57038acfbe2c8a588e492796c8dafb491969a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 5 Dec 2023 15:39:16 +0530 Subject: [PATCH 4/7] fix: call the right method --- uplink/src/base/serializer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index bc0c87ea3..81846d3e9 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -590,7 +590,7 @@ fn check_metrics(metrics: &mut SerializerMetrics, storage_handler: &StorageHandl metrics.set_write_memory(inmemory_write_size); metrics.set_read_memory(inmemory_read_size); metrics.set_disk_files(file_count); - metrics.set_disk_files(disk_utilized); + metrics.set_disk_utilized(disk_utilized); info!( "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} disk_utilized = {} write_memory = {} read_memory = {}", From 3a0fc16c804120edd5dcc0c2a85f1038dd5e9cea Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 5 Dec 2023 15:51:43 +0530 Subject: [PATCH 5/7] fix: init `bytes_occupied` --- storage/src/lib.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/storage/src/lib.rs b/storage/src/lib.rs index d5b1dc12f..02a009a99 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -234,6 +234,13 @@ impl Persistence { let backlog_files = get_file_ids(&path)?; info!("List of file ids loaded from disk: {backlog_files:?}"); + let bytes_occupied = backlog_files.iter().fold(0, |acc, id| { + let mut file = PathBuf::from(&path); + let file_name = format!("backup@{id}"); + file.push(file_name); + fs::metadata(&file).unwrap().len() as usize + acc + }); + Ok(Persistence { path, max_file_count, @@ -241,7 +248,7 @@ impl Persistence { current_read_file_id: None, // deleted: None, non_destructive_read: false, - bytes_occupied: 0, + bytes_occupied, }) } From 7bdae5e12ae3e0b65bc3bc05131811403991b800 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 5 Dec 2023 15:54:01 +0530 Subject: [PATCH 6/7] log: add missing line --- uplink/src/base/serializer/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 81846d3e9..aedb4ff21 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -678,12 +678,13 @@ fn check_and_flush_metrics( if metrics.batches() > 0 { info!( - "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} write_memory = {} read_memory = {}", + "{:>17}: batches = {:<3} errors = {} lost = {} disk_files = {:<3} disk_utilized = {} write_memory = {} read_memory = {}", metrics.mode, metrics.batches, metrics.errors, metrics.lost_segments, metrics.disk_files, + convert(metrics.disk_utilized as f64), convert(metrics.write_memory as f64), convert(metrics.read_memory as f64), ); From 13ed9db7682b0c8a744e27f98cb2bd83104feb82 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 5 Dec 2023 18:46:48 +0530 Subject: [PATCH 7/7] doc: why `disk_utilized` --- uplink/src/base/serializer/metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uplink/src/base/serializer/metrics.rs b/uplink/src/base/serializer/metrics.rs index 6f9543e0e..68e1fc435 100644 --- a/uplink/src/base/serializer/metrics.rs +++ b/uplink/src/base/serializer/metrics.rs @@ -17,7 +17,7 @@ pub struct SerializerMetrics { pub read_memory: usize, /// Number of files that have been written to disk pub disk_files: usize, - /// Number of bytes that have been written to disk + /// Disk size currently occupied by persistence files pub disk_utilized: usize, /// Nuber of persistence files that had to deleted before being consumed pub lost_segments: usize,