Skip to content

Commit

Permalink
src: clean up the base library modules
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jul 21, 2024
1 parent df8cabb commit be44cf6
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 93 deletions.
2 changes: 1 addition & 1 deletion src/bin/toysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl ToySQL {
/// Creates a new ToySQL REPL for the given server host and port
fn new(host: &str, port: u16) -> Result<Self> {
Ok(Self {
client: Client::new((host, port))?,
client: Client::connect((host, port))?,
editor: Editor::new()?,
history_path: std::env::var_os("HOME")
.map(|home| std::path::Path::new(&home).join(".toysql.history")),
Expand Down
4 changes: 2 additions & 2 deletions src/bin/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl Runner {
/// Runs the specified workload.
fn run<W: Workload>(self, workload: W) -> Result<()> {
let mut rng = rand::rngs::StdRng::seed_from_u64(self.seed);
let mut client = Client::new(&self.hosts[0])?;
let mut client = Client::connect(&self.hosts[0])?;

// Set up a histogram recording txn latencies as nanoseconds. The
// buckets range from 0.001s to 10s.
Expand All @@ -103,7 +103,7 @@ impl Runner {
let (done_tx, done_rx) = crossbeam::channel::bounded::<()>(0);

for addr in self.hosts.iter().cycle().take(self.concurrency) {
let mut client = Client::new(addr)?;
let mut client = Client::connect(addr)?;
let mut recorder = hist.recorder();
let work_rx = work_rx.clone();
let done_tx = done_tx.clone();
Expand Down
75 changes: 36 additions & 39 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::io::Write as _;

use crate::encoding::Value as _;
use crate::errdata;
use crate::error::{Error, Result};
Expand All @@ -9,66 +7,72 @@ use crate::sql::types::Table;
use crate::storage::mvcc;

use rand::Rng;
use std::io::Write as _;

/// A toyDB client
/// A toyDB client. Connects to a server via TCP and submits SQL statements and
/// other requests.
pub struct Client {
/// Inbound response stream.
reader: std::io::BufReader<std::net::TcpStream>,
/// Outbound request stream.
writer: std::io::BufWriter<std::net::TcpStream>,
/// The current transaction, if any.
txn: Option<mvcc::TransactionState>,
}

impl Client {
/// Creates a new client
pub fn new(addr: impl std::net::ToSocketAddrs) -> Result<Self> {
/// Connects to a toyDB server, creating a new client.
pub fn connect(addr: impl std::net::ToSocketAddrs) -> Result<Self> {
let socket = std::net::TcpStream::connect(addr)?;
let reader = std::io::BufReader::new(socket.try_clone()?);
let writer = std::io::BufWriter::new(socket);
Ok(Self { reader, writer, txn: None })
}

/// Call a server method
fn call(&mut self, request: Request) -> Result<Response> {
/// Sends a request to the server, returning the response.
fn request(&mut self, request: Request) -> Result<Response> {
request.encode_into(&mut self.writer)?;
self.writer.flush()?;
Result::<Response>::decode_from(&mut self.reader)?
Result::decode_from(&mut self.reader)?
}

/// Executes a query
pub fn execute(&mut self, query: &str) -> Result<StatementResult> {
let resultset = match self.call(Request::Execute(query.into()))? {
Response::Execute(rs) => rs,
/// Executes a SQL statement.
pub fn execute(&mut self, statement: &str) -> Result<StatementResult> {
let result = match self.request(Request::Execute(statement.to_string()))? {
Response::Execute(result) => result,
response => return errdata!("unexpected response {response:?}"),
};
match &resultset {
// Update the transaction state.
match &result {
StatementResult::Begin { state } => self.txn = Some(state.clone()),
StatementResult::Commit { .. } => self.txn = None,
StatementResult::Rollback { .. } => self.txn = None,
_ => {}
}
Ok(resultset)
Ok(result)
}

/// Fetches the table schema as SQL
/// Fetches a table schema.
pub fn get_table(&mut self, table: &str) -> Result<Table> {
match self.call(Request::GetTable(table.into()))? {
Response::GetTable(t) => Ok(t),
resp => errdata!("unexpected response: {resp:?}"),
match self.request(Request::GetTable(table.to_string()))? {
Response::GetTable(table) => Ok(table),
response => errdata!("unexpected response: {response:?}"),
}
}

/// Lists database tables
/// Lists database tables.
pub fn list_tables(&mut self) -> Result<Vec<String>> {
match self.call(Request::ListTables)? {
Response::ListTables(t) => Ok(t),
resp => errdata!("unexpected response: {resp:?}"),
match self.request(Request::ListTables)? {
Response::ListTables(tables) => Ok(tables),
response => errdata!("unexpected response: {response:?}"),
}
}

/// Checks server status
/// Returns server status.
pub fn status(&mut self) -> Result<Status> {
match self.call(Request::Status)? {
Response::Status(s) => Ok(s),
resp => errdata!("unexpected response: {resp:?}"),
match self.request(Request::Status)? {
Response::Status(status) => Ok(status),
response => errdata!("unexpected response: {response:?}"),
}
}

Expand All @@ -79,27 +83,20 @@ impl Client {

/// Runs the given closure, automatically retrying serialization and abort
/// errors. If a transaction is open following an error, it is automatically
/// rolled back. It is the caller's responsibility to use a transaction
/// in the closure where appropriate (i.e. when it is not idempotent).
///
/// TODO: test this.
pub fn with_retry<F, T>(&mut self, mut f: F) -> Result<T>
where
F: FnMut(&mut Client) -> Result<T>,
{
/// rolled back. It is the caller's responsibility to use a transaction in
/// the closure where appropriate (i.e. when it is not idempotent).
pub fn with_retry<T>(&mut self, f: impl Fn(&mut Client) -> Result<T>) -> Result<T> {
const MAX_RETRIES: u32 = 10;
const MIN_WAIT: u64 = 10;
const MAX_WAIT: u64 = 2_000;

let mut retries: u32 = 0;
loop {
match f(self) {
Ok(r) => return Ok(r),
Ok(result) => return Ok(result),
Err(Error::Serialization | Error::Abort) if retries < MAX_RETRIES => {
if self.txn().is_some() {
self.execute("ROLLBACK")?;
}

// Use exponential backoff starting at MIN_WAIT doubling up
// to MAX_WAIT, but randomize the wait time in this interval
// to reduce the chance of collisions.
Expand All @@ -108,11 +105,11 @@ impl Client {
std::thread::sleep(std::time::Duration::from_millis(wait));
retries += 1;
}
Err(e) => {
Err(error) => {
if self.txn().is_some() {
self.execute("ROLLBACK").ok(); // ignore rollback error
}
return Err(e);
return Err(error);
}
}
}
Expand Down
33 changes: 17 additions & 16 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Error {
/// The operation was aborted and must be retried. This typically happens
/// with e.g. Raft leader changes.
/// with e.g. Raft leader changes. This is used instead of implementing
/// complex retry logic and replay protection in Raft.
Abort,
/// Invalid data, typically decoding errors or unexpected internal values.
InvalidData(String),
Expand Down Expand Up @@ -35,23 +36,23 @@ impl std::fmt::Display for Error {
}

impl Error {
/// Returns whether the error is considered deterministic. State machine
/// application needs to know whether a command failure is deterministic on
/// the input command -- if it is, the command can be considered applied and
/// the error returned to the client, but otherwise the state machine must
/// panic to prevent replica divergence.
/// Returns whether the error is considered deterministic. Raft state
/// machine application needs to know whether a command failure is
/// deterministic on the input command -- if it is, the command can be
/// considered applied and the error returned to the client, but otherwise
/// the state machine must panic to prevent replica divergence.
pub fn is_deterministic(&self) -> bool {
match self {
// Aborts don't happen during application, only leader changes. But
// we consider them non-deterministic in case a abort should happen
// we consider them non-deterministic in case an abort should happen
// unexpectedly below Raft.
Error::Abort => false,
// Possible data corruption local to this node.
Error::InvalidData(_) => false,
// Input errors are (likely) deterministic. We could employ command
// checksums to be sure.
// Input errors are (likely) deterministic. They might not be in
// case data was corrupted in flight, but we ignore this case.
Error::InvalidInput(_) => true,
// IO errors are typically node-local.
// IO errors are typically local to the node (e.g. faulty disk).
Error::IO(_) => false,
// Write commands in read-only transactions are deterministic.
Error::ReadOnly => true,
Expand All @@ -61,19 +62,19 @@ impl Error {
}
}

/// Constructs an Error::InvalidData via format!() and into().
/// Constructs an Error::InvalidData for the given format string.
#[macro_export]
macro_rules! errdata {
($($args:tt)*) => { $crate::error::Error::InvalidData(format!($($args)*)).into() };
}

/// Constructs an Error::InvalidInput via format!() and into().
/// Constructs an Error::InvalidInput for the given format string.
#[macro_export]
macro_rules! errinput {
($($args:tt)*) => { $crate::error::Error::InvalidInput(format!($($args)*)).into() };
}

/// Result returning Error.
/// A toyDB Result returning Error.
pub type Result<T> = std::result::Result<T, Error>;

impl<T> From<Error> for Result<T> {
Expand Down Expand Up @@ -132,7 +133,7 @@ impl<T> From<crossbeam::channel::TrySendError<T>> for Error {

impl From<hdrhistogram::CreationError> for Error {
fn from(err: hdrhistogram::CreationError) -> Self {
panic!("{err}")
panic!("{err}") // faulty code
}
}

Expand All @@ -150,13 +151,13 @@ impl From<log::ParseLevelError> for Error {

impl From<log::SetLoggerError> for Error {
fn from(err: log::SetLoggerError) -> Self {
panic!("{err}")
panic!("{err}") // faulty code
}
}

impl From<regex::Error> for Error {
fn from(err: regex::Error) -> Self {
panic!("{err}")
panic!("{err}") // faulty code
}
}

Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![warn(clippy::all)]
#![allow(clippy::new_without_default)]
#![allow(clippy::unneeded_field_pattern)]

pub mod client;
pub mod encoding;
Expand Down
2 changes: 1 addition & 1 deletion src/raft/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub struct Log {
/// The underlying storage engine. Uses a trait object instead of generics,
/// to allow runtime selection of the engine and avoid propagating the
/// generic type parameters throughout Raft.
pub(super) engine: Box<dyn storage::Engine>,
pub engine: Box<dyn storage::Engine>,
/// The current term.
term: Term,
/// Our leader vote in the current term, if any.
Expand Down
Loading

0 comments on commit be44cf6

Please sign in to comment.