Merge remote-tracking branch 'origin/main' into preferential-ordering
de-sh committed Oct 7, 2023
2 parents 71d552a + 0dca427 commit 25d36b3
Showing 4 changed files with 72 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

53 changes: 31 additions & 22 deletions storage/src/lib.rs
@@ -1,5 +1,5 @@
use bytes::{Buf, BufMut, BytesMut};
use log::{self, error, info, warn};
use log::{self, debug, error, info, warn};
use seahash::hash;

use std::collections::VecDeque;
@@ -19,6 +19,7 @@ pub enum Error {
}

pub struct Storage {
name: String,
/// maximum allowed file size
max_file_size: usize,
/// current open file
@@ -30,8 +31,9 @@ pub struct Storage {
}

impl Storage {
pub fn new(max_file_size: usize) -> Storage {
pub fn new(name: impl Into<String>, max_file_size: usize) -> Storage {
Storage {
name: name.into(),
max_file_size,
current_write_file: BytesMut::with_capacity(max_file_size * 2),
current_read_file: BytesMut::with_capacity(max_file_size * 2),
@@ -87,7 +89,10 @@ impl Storage {
Some(persistence) => {
let hash = hash(&self.current_write_file[..]);
let mut next_file = persistence.open_next_write_file()?;
info!("Flushing data to disk!! {:?}", next_file.path);
info!(
"Flushing data to disk for stoarge: {}; path = {:?}",
self.name, next_file.path
);

next_file.file.write_all(&hash.to_be_bytes())?;
next_file.file.write_all(&self.current_write_file[..])?;
@@ -99,6 +104,10 @@
// TODO(RT): Make sure that disk files start with id 1 to represent the in-memory file
// with id 0
self.current_write_file.clear();
warn!(
"Persistence disabled for storage: {}. Deleted in-memory buffer on overflow",
self.name
);
Ok(Some(0))
}
}
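
As an aside on the flush path above: each persisted file is framed as an 8-byte big-endian seahash checksum of the buffer followed by the buffer itself. A minimal sketch of that framing (the function name and error handling are illustrative, not part of this crate):

use std::{fs::File, io::Write};

use bytes::BytesMut;
use seahash::hash;

// Sketch of the on-disk framing flush uses: an 8-byte big-endian
// seahash checksum, then the raw contents of the write buffer.
fn persist(buf: &BytesMut, file: &mut File) -> std::io::Result<()> {
    let checksum = hash(&buf[..]);
    file.write_all(&checksum.to_be_bytes())?;
    file.write_all(&buf[..])?;
    file.flush()
}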
@@ -116,9 +125,12 @@ impl Storage {
}

if let Some(persistence) = &mut self.persistence {
// Remove read file on completion
if let Some(id) = persistence.current_read_file_id.take() {
persistence.remove(id)?;
// Remove read file on completion in destructive-read mode
let read_is_destructive = !persistence.non_destructive_read;
let read_file_id = persistence.current_read_file_id.take();
if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() {
let deleted_file = persistence.remove(id)?;
debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
}

// Swap read buffer with write buffer to read data in in-memory write
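
The completion check above pairs `bool::then_some` with `Option::flatten` so the file is deleted only in destructive-read mode and only when a read file was actually open. A standalone sketch of that idiom (the values are illustrative):

fn main() {
    let read_is_destructive = true;
    let read_file_id: Option<u64> = Some(3);

    // then_some wraps the Option in Some(..) only when the flag is true,
    // yielding an Option<Option<u64>>; flatten collapses it, so the branch
    // runs only when the read is destructive AND a file was being read.
    if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() {
        println!("would delete persistence file {id}");
    }
}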
@@ -227,15 +239,11 @@ impl Persistence {
}

/// Removes a file with provided id
fn remove(&self, id: u64) -> Result<(), Error> {
if self.non_destructive_read {
return Ok(());
}

fn remove(&self, id: u64) -> Result<PathBuf, Error> {
let path = self.path(id)?;
fs::remove_file(path)?;
fs::remove_file(&path)?;

Ok(())
Ok(path)
}

/// Move corrupt file to special directory
@@ -284,10 +292,11 @@ impl Persistence {
_ => self.backlog_files.pop_front().unwrap(),
};

warn!("file limit reached. deleting backup@{}", id);

next.deleted = Some(id);
self.remove(id)?;
if !self.non_destructive_read {
let deleted_file = self.remove(id)?;
warn!("file limit reached. deleting backup@{}; path = {deleted_file:?}", id);
}

Ok(next)
}
@@ -373,7 +382,7 @@ mod test {
fn flush_creates_new_file_after_size_limit() {
// 1036 is the size of a publish message with topic = "hello", qos = 1, payload = 1024 bytes
let backup = init_backup_folders();
let mut storage = Storage::new(10 * 1036);
let mut storage = Storage::new("test", 10 * 1036);
storage.set_persistence(backup.path(), 10).unwrap();

// 2 files on disk and a partially filled in memory buffer
@@ -390,7 +399,7 @@
#[test]
fn old_file_is_deleted_after_limit() {
let backup = init_backup_folders();
let mut storage = Storage::new(10 * 1036);
let mut storage = Storage::new("test", 10 * 1036);
storage.set_persistence(backup.path(), 10).unwrap();

// 11 files created. 10 on disk
@@ -410,7 +419,7 @@
#[test]
fn reload_loads_correct_file_into_memory() {
let backup = init_backup_folders();
let mut storage = Storage::new(10 * 1036);
let mut storage = Storage::new("test", 10 * 1036);
storage.set_persistence(backup.path(), 10).unwrap();

// 10 files on disk
@@ -428,7 +437,7 @@
#[test]
fn reload_loads_partially_written_write_buffer_correctly() {
let backup = init_backup_folders();
let mut storage = Storage::new(10 * 1036);
let mut storage = Storage::new("test", 10 * 1036);
storage.set_persistence(backup.path(), 10).unwrap();

// 10 files on disk and partially filled current write buffer
@@ -447,7 +456,7 @@
#[test]
fn ensure_file_remove_on_read_completion_only() {
let backup = init_backup_folders();
let mut storage = Storage::new(10 * 1036);
let mut storage = Storage::new("test", 10 * 1036);
storage.set_persistence(backup.path(), 10).unwrap();
// 10 files on disk and partially filled current write buffer, 10 publishes per file
write_n_publishes(&mut storage, 105);
@@ -481,7 +490,7 @@
#[test]
fn ensure_files_including_read_removed_post_flush_on_overflow() {
let backup = init_backup_folders();
let mut storage = Storage::new(10 * 1036);
let mut storage = Storage::new("test", 10 * 1036);
storage.set_persistence(backup.path(), 10).unwrap();
// 10 files on disk and partially filled current write buffer, 10 publishes per file
write_n_publishes(&mut storage, 105);
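Taken together, the storage changes thread a human-readable name into every log line and make `remove` report which file it deleted. A rough usage sketch of the updated API, mirroring the tests above (the name, path, and sizes are illustrative, and `set_persistence` is assumed to accept any path-like argument):

// Named storage with a ~10 KiB in-memory buffer; persistence keeps
// at most 10 backlog files on disk before rotating out the oldest.
let mut storage = Storage::new("demo_stream", 10 * 1024);
storage.set_persistence("/tmp/uplink_demo", 10).unwrap();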
2 changes: 1 addition & 1 deletion uplink/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "uplink"
version = "2.8.0"
version = "2.8.1"
authors = ["tekjar <raviteja@bytebeam.io>"]
edition = "2021"

57 changes: 39 additions & 18 deletions uplink/src/base/serializer/mod.rs
@@ -140,13 +140,16 @@ impl MqttClient for AsyncClient {

struct StorageHandler {
map: BTreeMap<Arc<StreamConfig>, Storage>,
// Stream being read from
read_stream: Option<Arc<StreamConfig>>,
}

impl StorageHandler {
fn new(config: Arc<Config>) -> Result<Self, Error> {
let mut map = BTreeMap::new();
for (stream_name, stream_config) in config.streams.iter() {
let mut storage = Storage::new(stream_config.persistence.max_file_size);
let mut storage =
Storage::new(&stream_config.topic, stream_config.persistence.max_file_size);
if stream_config.persistence.max_file_count > 0 {
let mut path = config.persistence_path.clone();
path.push(stream_name);
@@ -164,11 +167,13 @@ impl StorageHandler {
map.insert(Arc::new(stream_config.clone()), storage);
}

Ok(Self { map })
Ok(Self { map, read_stream: None })
}

fn select(&mut self, stream_config: Arc<StreamConfig>) -> &mut Storage {
self.map.entry(stream_config).or_insert_with(|| Storage::new(default_file_size()))
self.map
.entry(stream_config.to_owned())
.or_insert_with(|| Storage::new(&stream_config.topic, default_file_size()))
}

fn next(
@@ -178,12 +183,28 @@
let storages = self.map.iter_mut();

for (stream_config, storage) in storages {
match storage.reload_on_eof() {
// Done reading all the pending files
Ok(true) => continue,
Ok(false) => return Some((stream_config, storage)),
match (storage.reload_on_eof(), &mut self.read_stream) {
// Done reading all pending files for a persisted stream
(Ok(true), Some(curr_stream)) => {
if curr_stream == stream_config {
self.read_stream.take();
debug!("Completed reading from: {}", stream_config.topic);
}

continue;
}
// Persisted stream is empty
(Ok(true), _) => continue,
// Reading from a newly loaded non-empty persisted stream
(Ok(false), None) => {
debug!("Reading from: {}", stream_config.topic);
self.read_stream = Some(stream_config.to_owned());
return Some((stream_config, storage));
}
// Continuing to read from persisted stream loaded earlier
(Ok(false), _) => return Some((stream_config, storage)),
// Reload again on encountering a corrupted file
Err(e) => {
(Err(e), _) => {
metrics.increment_errors();
metrics.increment_lost_segments();
error!("Failed to reload from storage. Error = {e}");
@@ -859,7 +880,7 @@ mod test {
let config = Arc::new(default_config());

let (serializer, _, _) = defaults(config);
let mut storage = Storage::new(1024);
let mut storage = Storage::new("hello/world", 1024);

let mut publish = Publish::new(
"hello/world",
@@ -966,12 +987,12 @@ mod test {
let config = Arc::new(default_config());

let (mut serializer, data_tx, net_rx) = defaults(config);

let stream_config = Arc::new(Default::default());
let mut storage = serializer
.storage_handler
.map
.entry(Arc::new(Default::default()))
.or_insert(Storage::new(1024));
.entry(stream_config.clone())
.or_insert(Storage::new(&stream_config.topic, 1024));

let mut collector = MockCollector::new(data_tx);
// Run a collector practically once
@@ -1018,12 +1039,12 @@ mod test {
let config = Arc::new(default_config());

let (mut serializer, data_tx, _) = defaults(config);

let stream_config =
Arc::new(StreamConfig { topic: "hello/world".to_owned(), ..Default::default() });

let mut storage =
serializer.storage_handler.map.entry(stream_config).or_insert(Storage::new(1024));
let stream_config = Arc::new(Default::default());
let mut storage = serializer
.storage_handler
.map
.entry(stream_config.clone())
.or_insert(Storage::new(&stream_config.topic, 1024));

let mut collector = MockCollector::new(data_tx);
// Run a collector
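The rewritten `next` above is the heart of the preferential-ordering change: the handler pins the stream it is currently draining so start-of-read and end-of-read transitions are logged exactly once, while iteration order still decides which storage is served. A simplified, self-contained analogue of that bookkeeping (stream names reduced to strings; `eof` stands in for `Ok(true)` from `reload_on_eof`):

struct ReadTracker {
    read_stream: Option<String>,
}

impl ReadTracker {
    // Returns true when this stream should be read from, mirroring the
    // arms of the (reload_on_eof, read_stream) match above.
    fn observe(&mut self, stream: &str, eof: bool) -> bool {
        match (eof, &mut self.read_stream) {
            // EOF on some stream while one is pinned: unpin it only if
            // it is the stream we were draining.
            (true, Some(curr)) => {
                if curr.as_str() == stream {
                    self.read_stream.take();
                }
                false
            }
            // An empty stream while nothing is pinned; skip it.
            (true, None) => false,
            // A non-empty stream and nothing pinned: pin it and read.
            (false, None) => {
                self.read_stream = Some(stream.to_owned());
                true
            }
            // A non-empty stream while a pin exists: still read; the pin
            // is log bookkeeping, iteration order decides priority.
            (false, Some(_)) => true,
        }
    }
}

fn main() {
    let mut tracker = ReadTracker { read_stream: None };
    assert!(tracker.observe("imu", false)); // pins "imu"
    assert!(tracker.observe("imu", false)); // keeps reading it
    assert!(!tracker.observe("imu", true)); // drained: unpins
}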
