Skip to content

Commit

Permalink
Merge pull request #12 from josehu07/multipaxos
Browse files Browse the repository at this point in the history
Fix possible circular deadlocks
  • Loading branch information
josehu07 committed Aug 13, 2023
2 parents 7d1955a + 2365990 commit fb036a6
Show file tree
Hide file tree
Showing 17 changed files with 414 additions and 480 deletions.
9 changes: 5 additions & 4 deletions scripts/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def run_process(cmd):

MODE_PARAMS = {
"repl": [],
"bench": ["init_batch_size", "value_size", "put_ratio", "length_s"],
"bench": ["init_batch_size", "value_size", "put_ratio", "length_s", "adaptive"],
}


Expand Down Expand Up @@ -93,9 +93,7 @@ def run_client(protocol, num_replicas, mode, params, release):
parser.add_argument(
"-n", "--num_replicas", type=int, required=True, help="number of replicas"
)
parser.add_argument(
"-r", "--release", action="store_true", help="if set, run release mode"
)
parser.add_argument("-r", "--release", action="store_true", help="run release mode")

subparsers = parser.add_subparsers(
required=True, dest="mode", description="client utility mode"
Expand All @@ -112,6 +110,9 @@ def run_client(protocol, num_replicas, mode, params, release):
)
parser_bench.add_argument("-w", "--put_ratio", type=int, help="percentage of puts")
parser_bench.add_argument("-l", "--length_s", type=int, help="run length in secs")
parser_bench.add_argument(
"-a", "--adaptive", action="store_true", help="adaptive batch size"
)

args = parser.parse_args()

Expand Down
78 changes: 67 additions & 11 deletions src/client/apistub.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Summerset client API communication stub implementation.

use std::io::ErrorKind;
use std::net::SocketAddr;

use crate::utils::SummersetError;
Expand Down Expand Up @@ -48,33 +49,88 @@ impl ClientApiStub {
/// Client write stub that owns a TCP write half.
pub struct ClientSendStub {
/// My client ID.
_id: ClientId,
id: ClientId,

/// Write-half split of the TCP connection stream.
conn_write: OwnedWriteHalf,

/// Request write buffer for deadlock avoidance.
req_buf: BytesMut,

/// Request write buffer cursor at first unwritten byte.
req_buf_cursor: usize,
}

impl ClientSendStub {
/// Creates a new write stub.
fn new(id: ClientId, conn_write: OwnedWriteHalf) -> Self {
ClientSendStub {
_id: id,
id,
conn_write,
req_buf: BytesMut::with_capacity(8 + 1024),
req_buf_cursor: 0,
}
}

/// Sends a request to established server connection.
pub async fn send_req(
/// Sends a request to established server connection. Returns:
/// - `Ok(true)` if successful
/// - `Ok(false)` if socket full and may block; in this case, the input
/// request is saved and the next calls to `send_req()`
/// must give arg `req == None` to retry until successful
/// (typically after doing a few `recv_reply()`s to free
/// up some buffer space)
/// - `Err(err)` if any unexpected error occurs
pub fn send_req(
&mut self,
req: ApiRequest,
) -> Result<(), SummersetError> {
let req_bytes = encode_to_vec(&req)?;
let req_len = req_bytes.len();
self.conn_write.write_u64(req_len as u64).await?; // send length first
self.conn_write.write_all(&req_bytes[..]).await?;
req: Option<&ApiRequest>,
) -> Result<bool, SummersetError> {
// DEADLOCK AVOIDANCE: we avoid using `write_u64()` and `write_all()`
// here because, in the case of TCP buffers being full, the service and
// the client may both be blocking on trying to send (write) into the
// buffers, resulting in a circular deadlock

// if last write was not successful, cannot make a new request
if req.is_some() && !self.req_buf.is_empty() {
return logged_err!(self.id; "attempting new request while should retry");
} else if req.is_none() && self.req_buf.is_empty() {
return logged_err!(self.id; "attempting to retry while buffer is empty");
} else if req.is_some() {
// sending a new request, fill req_buf
assert_eq!(self.req_buf_cursor, 0);
let req_bytes = encode_to_vec(req.unwrap())?;
let req_len = req_bytes.len();
self.req_buf.extend_from_slice(&req_len.to_be_bytes());
assert_eq!(self.req_buf.len(), 8);
self.req_buf.extend_from_slice(req_bytes.as_slice());
} else {
// retrying last unsuccessful write
assert!(self.req_buf_cursor < self.req_buf.len());
pf_debug!(self.id; "retrying last unsuccessful send_req");
}

// try until length + the request are all written
while self.req_buf_cursor < self.req_buf.len() {
match self
.conn_write
.try_write(&self.req_buf[self.req_buf_cursor..])
{
Ok(n) => {
self.req_buf_cursor += n;
}
Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
pf_debug!(self.id; "send_req would block; TCP buffer full?");
return Ok(false);
}
Err(err) => return Err(err.into()),
}
}

// everything written, clear req_buf
self.req_buf.clear();
self.req_buf_cursor = 0;

// pf_trace!(self.id; "send req {:?}", req);
Ok(())
Ok(true)
}
}

Expand Down
10 changes: 6 additions & 4 deletions src/client/handle.rs → src/client/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Summerset generic client trait to be implemented by all protocol-specific
//! Summerset generic client traits to be implemented by all protocol-specific
//! client stub structs.

use std::collections::HashMap;
Expand All @@ -14,7 +14,7 @@ pub type ClientId = u64;

/// Client trait to be implement by all protocol-specific client structs.
#[async_trait]
pub trait GenericClient {
pub trait GenericEndpoint {
/// Creates a new client stub.
fn new(
id: ClientId,
Expand All @@ -31,8 +31,10 @@ pub trait GenericClient {
async fn setup(&mut self) -> Result<(), SummersetError>;

/// Sends a request to the service according to protocol-specific logic.
async fn send_req(&mut self, req: ApiRequest)
-> Result<(), SummersetError>;
fn send_req(
&mut self,
req: Option<&ApiRequest>,
) -> Result<bool, SummersetError>;

/// Receives a reply from the service according to protocol-specific logic.
async fn recv_reply(&mut self) -> Result<ApiReply, SummersetError>;
Expand Down
4 changes: 2 additions & 2 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Summerset's client functionality modules and trait.

mod handle;
mod endpoint;
mod apistub;

pub use handle::{GenericClient, ClientId};
pub use endpoint::{GenericEndpoint, ClientId};
pub use apistub::{ClientApiStub, ClientSendStub, ClientRecvStub};
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use crate::server::{
};

#[doc(inline)]
pub use crate::client::{ClientId, GenericClient};
pub use crate::client::{ClientId, GenericEndpoint};

#[doc(inline)]
pub use crate::protocols::SMRProtocol;
Expand Down
4 changes: 2 additions & 2 deletions src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::net::SocketAddr;

use crate::utils::SummersetError;
use crate::server::{GenericReplica, ReplicaId};
use crate::client::{GenericClient, ClientId};
use crate::client::{GenericEndpoint, ClientId};

mod rep_nothing;
use rep_nothing::{RepNothingReplica, RepNothingClient};
Expand Down Expand Up @@ -86,7 +86,7 @@ impl SMRProtocol {
id: ClientId,
servers: HashMap<ReplicaId, SocketAddr>,
config_str: Option<&str>,
) -> Result<Box<dyn GenericClient>, SummersetError> {
) -> Result<Box<dyn GenericEndpoint>, SummersetError> {
match self {
Self::RepNothing => {
box_if_ok!(RepNothingClient::new(id, servers, config_str))
Expand Down
Loading

0 comments on commit fb036a6

Please sign in to comment.