Skip to content

Commit

Permalink
Merge pull request #10 from josehu07/multipaxos
Browse files Browse the repository at this point in the history
Fix transport connections logic & Client-side utilities
  • Loading branch information
josehu07 committed Aug 9, 2023
2 parents 72a89ad + 1846051 commit a43fc07
Show file tree
Hide file tree
Showing 30 changed files with 1,605 additions and 732 deletions.
687 changes: 407 additions & 280 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
[workspace]
members = [
"summerset_server",
"summerset_client",
]
members = ["summerset_server", "summerset_client"]

[package]
name = "summerset"
Expand All @@ -15,8 +12,10 @@ authors = ["Guanzhou Jose Hu <huguanzhou123@gmail.com>"]
async-trait = "0.1"
fixedbitset = "0.4"
flashmap = "0.1"
tokio = { version = "1.27", features = ["full"] }
futures = "0.3"
tokio = { version = "1.29", features = ["full"] }
rand = "0.8"
lazy_static = "1.4"
rmp-serde = "1.1"
serde = { version = "1.0", features = ["derive"] }
toml = { version = "0.7", features = ["parse"] }
Expand Down
23 changes: 14 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ The default logging level is set as >= `info`. To display debugging or even trac
RUST_LOG=debug cargo run ...
```

A helper script, `cluster.py`, for running a set of server nodes as local processes is also provided:

```bash
python3 cluster.py -h
```

Complete cluster management and benchmarking scripts are available in another repo, [Wayrest](https://github.com/josehu07/wayrest), which is a Python module for managing replication protocol clusters and running distributed experiments.

### Run Clients

Run a client executable:
Expand All @@ -59,6 +51,15 @@ Run a client executable:
cargo run [-r] -p summerset_client -- -h
```

Some helper scripts for running server nodes and clients as local processes are available in `scripts/`:

```bash
python3 scripts/local_cluster.py -h
python3 scripts/local_client.py -h
```

Complete cluster management and benchmarking scripts are available in another repo, [Wayrest](https://github.com/josehu07/wayrest), which is a Python module for managing replication protocol clusters and running distributed experiments.

## TODO List

- [x] event-based programming structure
Expand All @@ -72,7 +73,11 @@ cargo run [-r] -p summerset_client -- -h
- [ ] membership discovery & view changes
- [ ] implementation of Raft
- [ ] implementation of Crossword prototype
- [ ] complete client, tests, & benchmarks
- [ ] complete client-side utilities
- [x] REPL-style client
- [x] random benchmarking client
- [ ] testing client
- [ ] benchmarking with YCSB input
- [ ] better README

---
Expand Down
80 changes: 0 additions & 80 deletions cluster.py

This file was deleted.

130 changes: 130 additions & 0 deletions scripts/local_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import os
import argparse
import subprocess
from pathlib import Path


def run_process(cmd):
print("Run:", " ".join(cmd))
proc = subprocess.Popen(cmd)
return proc


PROTOCOL_CONFIGS = {
"RepNothing": lambda n: "",
"SimplePush": lambda n: "",
"MultiPaxos": lambda n: "",
}

MODE_PARAMS = {
"repl": [],
"bench": ["init_batch_size", "value_size", "put_ratio", "length_s"],
}


def glue_params_str(cli_args, params_list):
params_strs = []

for param in params_list:
value = getattr(cli_args, param)
if value is None:
continue

if isinstance(value, str):
params_strs.append(f"{param}='{value}'")
elif isinstance(value, bool):
params_strs.append(f"{param}={'true' if value else 'false'}")
else:
params_strs.append(f"{param}={value}")

return "+".join(params_strs)


def compose_client_cmd(protocol, replica_list, config, mode, params, release):
cmd = [
"cargo",
"run",
"-p",
"summerset_client",
]
if release:
cmd.append("-r")

cmd += [
"--",
"-p",
protocol,
]
cmd += replica_list
if len(config) > 0:
cmd += ["--config", config]

cmd += ["-m", mode]
if len(params) > 0:
cmd += ["--params", params]

return cmd


def run_client(protocol, num_replicas, mode, params, release):
api_ports = list(range(52700, 52700 + num_replicas * 10, 10))
replica_list = []
for replica in range(num_replicas):
replica_list += ["-r", f"127.0.0.1:{api_ports[replica]}"]

cmd = compose_client_cmd(
protocol,
replica_list,
PROTOCOL_CONFIGS[protocol](num_replicas),
mode,
params,
release,
)
proc = run_process(cmd)

return proc


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-p", "--protocol", type=str, required=True, help="protocol name"
)
parser.add_argument(
"-n", "--num_replicas", type=int, required=True, help="number of replicas"
)
parser.add_argument(
"-r", "--release", action="store_true", help="if set, run release mode"
)

subparsers = parser.add_subparsers(
required=True, dest="mode", description="client utility mode"
)

parser_repl = subparsers.add_parser("repl", help="REPL mode")

parser_bench = subparsers.add_parser("bench", help="benchmark mode")
parser_bench.add_argument(
"-b", "--init_batch_size", type=int, help="initial batch size"
)
parser_bench.add_argument(
"-v", "--value_size", type=int, help="value size in bytes"
)
parser_bench.add_argument("-w", "--put_ratio", type=int, help="percentage of puts")
parser_bench.add_argument("-l", "--length_s", type=int, help="run length in secs")

args = parser.parse_args()

if args.protocol not in PROTOCOL_CONFIGS:
raise ValueError(f"unknown protocol name '{args.protocol}'")
if args.num_replicas <= 0 or args.num_replicas > 9:
raise ValueError(f"invalid number of replicas {args.num_replicas}")

client_proc = run_client(
args.protocol,
args.num_replicas,
args.mode,
glue_params_str(args, MODE_PARAMS[args.mode]),
args.release,
)
client_proc.wait()
105 changes: 105 additions & 0 deletions scripts/local_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import os
import argparse
import subprocess
from pathlib import Path


def run_process(cmd):
print("Run:", " ".join(cmd))
proc = subprocess.Popen(cmd)
return proc


PROTOCOL_CONFIGS = {
"RepNothing": lambda r, n: f"backer_path='/tmp/summerset.rep_nothing.{r}.wal'",
"SimplePush": lambda r, n: f"backer_path='/tmp/summerset.simple_push.{r}.wal'+rep_degree={n-1}",
"MultiPaxos": lambda r, n: f"backer_path='/tmp/summerset.multipaxos.{r}.wal'+logger_sync=false",
}


def compose_server_cmd(
protocol, api_port, base_conn_port, replica_id, replica_list, config, release
):
cmd = [
"cargo",
"run",
"-p",
"summerset_server",
]
if release:
cmd.append("-r")

cmd += [
"--",
"-p",
protocol,
"-a",
str(api_port),
"-b",
str(base_conn_port),
"-i",
str(replica_id),
]
cmd += replica_list
if len(config) > 0:
cmd += ["--config", config]

return cmd


def launch_servers(protocol, num_replicas, release):
api_ports = list(range(52700, 52700 + num_replicas * 10, 10))
base_conn_ports = list(range(52800, 52800 + num_replicas * 10, 10))
replica_lists = [[] for _ in range(num_replicas)]
for replica in range(num_replicas):
for peer in range(num_replicas):
replica_lists[replica] += [
"-r",
f"127.0.0.1:{base_conn_ports[peer] + replica}",
]

server_procs = []
for replica in range(num_replicas):
cmd = compose_server_cmd(
protocol,
api_ports[replica],
base_conn_ports[replica],
replica,
replica_lists[replica],
PROTOCOL_CONFIGS[protocol](replica, num_replicas),
release,
)
proc = run_process(cmd)
server_procs.append(proc)

return server_procs


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-p", "--protocol", type=str, required=True, help="protocol name"
)
parser.add_argument(
"-n", "--num_replicas", type=int, required=True, help="number of replicas"
)
parser.add_argument(
"-r", "--release", action="store_true", help="if set, run release mode"
)
args = parser.parse_args()

if args.protocol not in PROTOCOL_CONFIGS:
raise ValueError(f"unknown protocol name '{args.protocol}'")
if args.num_replicas <= 0 or args.num_replicas > 9:
raise ValueError(f"invalid number of replicas {args.num_replicas}")

# kill all existing server processes
os.system("pkill summerset_server")

# remove all existing wal files
for path in Path("/tmp").glob("summerset.*.wal"):
path.unlink()

server_procs = launch_servers(args.protocol, args.num_replicas, args.release)
for proc in server_procs:
proc.wait()
4 changes: 3 additions & 1 deletion src/client/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ pub trait GenericClient {
/// Creates a new client stub.
fn new(
id: ClientId,
// remote addresses of server replicas
servers: HashMap<ReplicaId, SocketAddr>,
config_str: Option<&str>, // protocol-specific config in TOML format
// protocol-specific config in TOML format
config_str: Option<&str>,
) -> Result<Self, SummersetError>
where
Self: Sized;
Expand Down
Loading

0 comments on commit a43fc07

Please sign in to comment.