diff --git a/.gitignore b/.gitignore index cae0ecbb..7b66ee8d 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,4 @@ /tla+/**/*.old /results/ - +/backups/ diff --git a/models/crop_slide_pdfs.sh b/models/crop_slide_pdfs.sh new file mode 100755 index 00000000..32fe3722 --- /dev/null +++ b/models/crop_slide_pdfs.sh @@ -0,0 +1,72 @@ +#! /usr/bin/bash + +# Requires: +# sudo apt install python3-tk ghostscript poppler-utils +# pip3 install pdfCropMargins --user --upgrade + +# Usage: +# 1. save slides exported PDF as results/slide-figures.pdf +# 2. run: ./models/crop_slides_pdfs.sh + +ORIGINAL_PDF=results/slide-figures.pdf +TAKE_PAGES=8 + + +echo +echo "Deleting old results..." +rm results/slides/*.pdf + + +echo +echo "Separating desired pages..." +pdfseparate -l $TAKE_PAGES $ORIGINAL_PDF "results/slides/slide-%d.pdf" + + +echo +echo "Cropping separated pages..." +for FILE in $(ls results/slides/ | grep .pdf); +do + echo " cropping $FILE" + pdfcropmargins -p 0 -t 255 -mo -o results "results/slides/$FILE" +done + + +echo +echo "Cropping extra files..." +EXTRA_FILES=("models/cstr_bounds" "rs_coding/rs_coding" + "adaptive/exper-adaptive" "adaptive/legend-adaptive" + "bd_n_space/exper-bd_n_space" "bd_n_space/legend-bd_n_space" + "failover/exper-failover" "failover/legend-failover" + "unbalanced/exper-unbalanced" "unbalanced/legend-unbalanced" + "critical/exper-critical-5.small.50.dc" "critical/exper-critical-5.small.50.wan" + "critical/exper-critical-5.large.50.dc" "critical/exper-critical-5.large.50.wan" + "critical/exper-critical-5.mixed.50.dc" "critical/exper-critical-5.mixed.50.wan" + "critical/exper-critical-cluster_size" "critical/exper-critical-write_ratio" + "critical/legend-critical" "critical/legend-critical-minor") +for FILE_NAME in ${EXTRA_FILES[@]}; +do + echo " cropping results/final/${FILE_NAME}.pdf" + pdfcropmargins -p 0 -t 255 -mo -o results "results/final/${FILE_NAME}.pdf" +done +echo " cropping results/final/critical/ylabels-critical.pdf" +pdfcropmargins -p4 0 50 0 0 -t 255 -mo -o results "results/final/critical/ylabels-critical.pdf" + + +echo +echo "Deleting uncropped files..." +rm results/*_uncropped.pdf +rm "results/slide-figures.pdf:Zone.Identifier" + + +echo +echo "Renaming cropped slide pages..." +TARGET_NAMES=("status_diagram" "log_in_action" "rs_codeword_space" "policy-multipaxos" + "policy-rspaxos" "policy-balanced_rr" "policy-unbalanced" "concurrent_failures") +for IDX in ${!TARGET_NAMES[@]}; +do + OLD_NAME="slide-$((IDX+1)).pdf" + NEW_NAME="${TARGET_NAMES[$IDX]}.pdf" + echo " renaming $OLD_NAME to $NEW_NAME" + mv "results/slides/$OLD_NAME" "results/slides/$NEW_NAME" +done +echo diff --git a/models/perf_simulation.py b/models/perf_simulation.py deleted file mode 100644 index 203618bc..00000000 --- a/models/perf_simulation.py +++ /dev/null @@ -1,1045 +0,0 @@ -from enum import Enum -import random -import statistics -import math -import argparse -import pickle - -import matplotlib # type: ignore - -matplotlib.use("Agg") - -import matplotlib.pyplot as plt # type: ignore -import simpy # type: ignore - - -############## -# Data types # -############## - - -class Data: - def __init__(self, mark, size): - self.mark = mark - self.size = size - - def __str__(self): - return f"<{self.mark};{self.size}>" - - -class Req(Data): - def __init__(self, cid, mark, size): - self.cid = cid - super().__init__(mark, size) - - -class Ack(Data): - def __init__(self, cid, mark): - self.cid = cid - super().__init__(mark, 8) - - -class Codeword(Data): - def __init__(self, req, n, m, flags): - assert len(flags) > 0 - assert len(flags) <= n - self.req = req - self.m = m - self.n = n - self.flags = flags - shard_size = req.size / m - super().__init__(req.mark, shard_size * len(flags)) - - -############### -# Event types # -############### - - -class EType(Enum): - NetRecved = 1 - DiskSaved = 2 - ApiGotReq = 3 - SendNewReq = 4 - - -class Event: - def __init__(self, enum, info, value): - self.enum = enum - self.info = info - self.value = value - - def __str__(self): - return f"{{{self.enum}|{self.info}|{self.value}}}" - - -class NetRecved(Event): - def __init__(self, src, msg): - super().__init__(EType.NetRecved, src, msg) - - -class DiskSaved(Event): - def __init__(self, mark): - super().__init__(EType.DiskSaved, None, mark) - - -class ApiGotReq(Event): - def __init__(self, cid, req): - super().__init__(EType.ApiGotReq, cid, req) - - -class SendNewReq(Event): - def __init__(self, mark): - super().__init__(EType.SendNewReq, None, mark) - - -################### -# Component types # -################### - - -class Device: - def __init__(self, env, l, t, lv, tv): - self.env = env - self.l = l # latency factor in ms - self.t = t # ms to transfer 1 MB - self.lv = lv # max variation multiplier for l - self.tv = tv # max variation multiplier for t - self.pipe = simpy.Store(env) - - if self.lv < 1: - raise RuntimeError(f"invalid variation ratio {self.lv}") - if self.tv < 1: - raise RuntimeError(f"invalid variation ratio {self.tv}") - - def delay(self, data): - l = self.l * random.uniform(1, self.lv) - t = self.t * random.uniform(1, self.tv) - delay = l + t * (data.size / 1000000.0) - yield self.env.timeout(delay) - self.pipe.put(data) - - -class NetLink(Device): - def __init__(self, env, l, t, lv, tv, src, dst): - self.src = src - self.dst = dst - super().__init__(env, l, t, lv, tv) - - def send(self, msg): - self.env.process(self.delay(msg)) - - def recv(self): - msg = yield self.pipe.get() - return NetRecved(self.src, msg) - - -class DiskDev(Device): - def __init__(self, env, l, t, lv, tv, rid): - self.rid = rid - super().__init__(env, l, t, lv, tv) - - def write(self, ent): - self.env.process(self.delay(ent)) - - def saved(self): - ent = yield self.pipe.get() - return DiskSaved(ent.mark) - - -class ExtlApi: - def __init__(self, env, l, t, lv, tv, rid): - self.env = env - self.rid = rid - self.l = l - self.t = t - self.lv = lv - self.tv = tv - self.req_links = dict() - self.ack_links = dict() - - def connect(self, client): - req_link = NetLink( - self.env, self.l, self.t, self.lv, self.tv, client.cid, self.rid - ) - ack_link = NetLink( - self.env, self.l, self.t, self.lv, self.tv, self.rid, client.cid - ) - self.req_links[client.cid] = req_link - self.ack_links[client.cid] = ack_link - return (req_link, ack_link) - - def req(self): - # NOTE: hardcode assuming only one client connected - event = yield self.env.process(self.req_links[2957].recv()) - req = event.value - return ApiGotReq(2957, req) - - def ack(self, cid, mark): - if cid not in self.ack_links: - raise RuntimeError(f"cid {cid} not in connected") - self.ack_links[cid].send(Ack(cid, mark)) - - -##################### -# Replica & Cluster # -##################### - - -class Replica: - def __init__(self, env, rid, api_ltv, disk_ltv, protocol, **protocol_args): - self.env = env - self.rid = rid - self.extl_api = ExtlApi( - env, api_ltv[0], api_ltv[1], api_ltv[2], api_ltv[3], rid - ) - self.disk_dev = DiskDev( - env, disk_ltv[0], disk_ltv[1], disk_ltv[2], disk_ltv[3], rid - ) - self.send_links = dict() - self.recv_links = dict() - - # protocol-specific fields & event handlers - self.protocol = protocol(self, **protocol_args) - - def add_peer(self, peer, net_ltv): - s2p_link = NetLink( - self.env, net_ltv[0], net_ltv[1], net_ltv[2], net_ltv[3], self.rid, peer.rid - ) - p2s_link = NetLink( - self.env, net_ltv[0], net_ltv[1], net_ltv[2], net_ltv[3], peer.rid, self.rid - ) - self.send_links[peer.rid] = s2p_link - self.recv_links[peer.rid] = p2s_link - peer.send_links[self.rid] = p2s_link - peer.recv_links[self.rid] = s2p_link - - def run(self): - events = { - "disk_saved": self.env.process(self.disk_dev.saved()), - } - for peer, link in self.recv_links.items(): - events[("net_recved", peer)] = self.env.process(link.recv()) - - # NOTE: hardcoding to have non-leader not do api_got_req - if self.rid == 0: - events["api_got_req"] = self.env.process(self.extl_api.req()) - - while True: - # could get multiple completed triggers at this yield - conds = yield self.env.any_of(events.values()) - for event in conds.values(): - # print(f"{self.env.now}: R{self.rid} {event}") - - if event.enum == EType.ApiGotReq: - req = event.value - yield self.env.process(self.protocol.handle_api_got_req(req)) - events["api_got_req"] = self.env.process(self.extl_api.req()) - - elif event.enum == EType.DiskSaved: - mark = event.value - yield self.env.process(self.protocol.handle_disk_saved(mark)) - events["disk_saved"] = self.env.process(self.disk_dev.saved()) - - elif event.enum == EType.NetRecved: - peer, msg = event.info, event.value - yield self.env.process(self.protocol.handle_net_recved(peer, msg)) - events[("net_recved", peer)] = self.env.process( - self.recv_links[peer].recv() - ) - - else: - raise RuntimeError(f"unrecognized event type: {event}") - - def connect(self, client): - return self.extl_api.connect(client) - - -class Cluster: - def __init__( - self, - env, - num_replicas, - api_ltv, - disk_ltv_map, - net_ltv_map, - protocol, - **protocol_args, - ): - self.env = env - self.replicas = [ - Replica( - env, - rid, - api_ltv, - disk_ltv_map[rid], - protocol, - **protocol_args, - ) - for rid in range(num_replicas) - ] - self.leader = self.replicas[0] - - for rid, replica in enumerate(self.replicas): - for peerid in range(rid + 1, num_replicas): - peer = self.replicas[peerid] - replica.add_peer(peer, net_ltv_map[(rid, peerid)]) - - def launch(self): - for replica in self.replicas: - self.env.process(replica.run()) - - def connect(self, client): - return self.leader.connect(client) - - -############# -# Protocols # -############# - - -class Protocol: - def __init__(self, replica): - self.replica = replica - - -class MultiPaxos(Protocol): - def __init__(self, replica, cluster_size): - super().__init__(replica) - - self.q = cluster_size // 2 + 1 - - self.insts = [] - - @classmethod - def name_str(cls, cluster_size): - return "MultiPaxos/Raft" - - class Instance: - def __init__(self): - self.req = None - self.num_replies = 0 - self.from_peer = -1 - self.client_acked = False - - class AcceptMsg(Data): - def __init__(self, slot, req): - super().__init__(f"a-{slot}", req.size + 8) - self.req = req - - class AcceptReply(Data): - def __init__(self, slot): - super().__init__(f"r-{slot}", 8) - - def handle_api_got_req(self, req): - self.insts.append(self.Instance()) - slot = len(self.insts) - 1 - self.insts[slot].req = req - - for link in self.replica.send_links.values(): - link.send(self.AcceptMsg(slot, req)) - - self.replica.disk_dev.write(self.AcceptMsg(slot, req)) - - yield from [] - - def handle_disk_saved(self, mark): - if not mark.startswith("a-"): - raise RuntimeError(f"unrecognized ent mark: {mark}") - slot = int(mark[2:]) - assert slot < len(self.insts) - - if self.insts[slot].from_peer < 0: - # disk save on leader - self.insts[slot].num_replies += 1 - - if ( - not self.insts[slot].client_acked - and self.insts[slot].num_replies >= self.q - ): - self.ack_client_reqs(slot) - - else: - # disk save on follower - self.replica.send_links[self.insts[slot].from_peer].send( - self.AcceptReply(slot) - ) - - yield from [] - - def handle_net_recved(self, peer, msg): - if msg.mark.startswith("a-"): - # net recv on follower - slot = int(msg.mark[2:]) - while slot >= len(self.insts): - self.insts.append(self.Instance()) - self.insts[slot].from_peer = peer - self.insts[slot].req = msg.req - - self.replica.disk_dev.write(self.AcceptMsg(slot, msg.req)) - - elif msg.mark.startswith("r-"): - # net recv on leader - slot = int(msg.mark[2:]) - assert slot < len(self.insts) - self.insts[slot].num_replies += 1 - - if ( - not self.insts[slot].client_acked - and self.insts[slot].num_replies >= self.q - ): - self.ack_client_reqs(slot) - - else: - raise RuntimeError(f"unrecognized msg mark: {msg.mark}") - - yield from [] - - def ack_client_reqs(self, slot): - assert not self.insts[slot].client_acked - req = self.insts[slot].req - self.replica.extl_api.ack(req.cid, req.mark) - self.insts[slot].client_acked = True - - -class RSPaxos(Protocol): - def __init__( - self, - replica, - cluster_size, - same_liveness, - comp_delay=0, - ): - super().__init__(replica) - - self.cluster_size = cluster_size - self.comp_delay = comp_delay - - self.m = cluster_size // 2 + 1 - if same_liveness: - self.q = cluster_size - self.f = cluster_size - self.m - else: - self.q = math.ceil((cluster_size + self.m) // 2) - self.f = self.q - self.m - - self.insts = [] - - @classmethod - def name_str(cls, cluster_size, same_liveness, comp_delay=0): - if same_liveness: - return f"RSPaxos/CRaft (f-forced)" - else: - return f"RSPaxos/CRaft (original)" - - class Instance: - def __init__(self): - self.req = None - self.num_replies = 0 - self.from_peer = -1 - self.client_acked = False - - class AcceptMsg(Data): - def __init__(self, slot, shard): - super().__init__(f"a-{slot}", shard.size + 8) - self.shard = shard - - class AcceptReply(Data): - def __init__(self, slot): - super().__init__(f"r-{slot}", 8) - - def handle_api_got_req(self, req): - self.insts.append(self.Instance()) - slot = len(self.insts) - 1 - self.insts[slot].req = req - - # add EC computation delay - comp_time = self.comp_delay * (float(req.size) / 1000000.0) - yield self.replica.env.timeout(comp_time) - - for peer, link in self.replica.send_links.items(): - codeword = Codeword(req, self.cluster_size, self.m, {peer}) - link.send(self.AcceptMsg(slot, codeword)) - - codeword = Codeword(req, self.cluster_size, self.m, {self.replica.rid}) - self.replica.disk_dev.write(self.AcceptMsg(slot, codeword)) - - yield from [] - - def handle_disk_saved(self, mark): - if not mark.startswith("a-"): - raise RuntimeError(f"unrecognized ent mark: {mark}") - slot = int(mark[2:]) - assert slot < len(self.insts) - - if self.insts[slot].from_peer < 0: - # disk save on leader - self.insts[slot].num_replies += 1 - - if ( - not self.insts[slot].client_acked - and self.insts[slot].num_replies >= self.q - ): - self.ack_client_reqs(slot) - - else: - # disk save on follower - self.replica.send_links[self.insts[slot].from_peer].send( - self.AcceptReply(slot) - ) - - yield from [] - - def handle_net_recved(self, peer, msg): - if msg.mark.startswith("a-"): - # net recv on follower - slot = int(msg.mark[2:]) - while slot >= len(self.insts): - self.insts.append(self.Instance()) - self.insts[slot].from_peer = peer - self.insts[slot].req = msg.shard - - self.replica.disk_dev.write(self.AcceptMsg(slot, msg.shard)) - - elif msg.mark.startswith("r-"): - # net recv on leader - slot = int(msg.mark[2:]) - assert slot < len(self.insts) - self.insts[slot].num_replies += 1 - - if ( - not self.insts[slot].client_acked - and self.insts[slot].num_replies >= self.q - ): - self.ack_client_reqs(slot) - - else: - raise RuntimeError(f"unrecognized msg mark: {msg.mark}") - - yield from [] - - def ack_client_reqs(self, slot): - assert not self.insts[slot].client_acked - req = self.insts[slot].req - self.replica.extl_api.ack(req.cid, req.mark) - self.insts[slot].client_acked = True - - -class Crossword(Protocol): - def __init__( - self, - replica, - cluster_size, - comp_delay=0, - shards_per_replica=1, # NOTE: a "cheating" approach to adaptiveness - ): - super().__init__(replica) - - self.cluster_size = cluster_size - self.comp_delay = comp_delay - - self.m = cluster_size // 2 + 1 - f = cluster_size - self.m - assert shards_per_replica >= 1 - assert shards_per_replica <= self.m - self.l = shards_per_replica - self.q = self.m + f + 1 - self.l - - self.insts = [] - - @classmethod - def name_str(cls, cluster_size, comp_delay=0, shards_per_replica=1): - return f"Crossword" - - # def update_perf_number(self, lat): - # self.perf_tries[self.l].append(lat) - # if len(self.perf_tries[self.l]) > 100: - # del self.perf_tries[self.l][0] - - # if not self.all_tried: - # if len(self.perf_tries[self.l]) >= 100: - # if self.l == 1: - # self.all_tried = True - # else: - # self.l -= 1 - - # def choose_best_config(self): - # if self.all_tried and not self.ql_picked: - # m = self.cluster_size // 2 + 1 - # f = self.cluster_size - m - - # avg_lats = dict() - # for l, lats in self.perf_tries.items(): - # sorted_lats = sorted(lats)[:-10] - # avg_lats[l] = sum(sorted_lats) / len(sorted_lats) - # self.l = min(avg_lats, key=avg_lats.get) - # self.q = m + f - self.l + 1 - - # print(" picked", self.l) - # self.ql_picked = True - - class Instance: - def __init__(self): - self.req = None - self.num_replies = 0 - self.from_peer = -1 - self.client_acked = False - - class AcceptMsg(Data): - def __init__(self, slot, shard): - super().__init__(f"a-{slot}", shard.size + 8) - self.shard = shard - - class AcceptReply(Data): - def __init__(self, slot): - super().__init__(f"r-{slot}", 8) - - def handle_api_got_req(self, req): - self.insts.append(self.Instance()) - slot = len(self.insts) - 1 - self.insts[slot].req = req - - # add EC computation delay - comp_time = self.comp_delay * (float(req.size) / 1000000.0) - yield self.replica.env.timeout(comp_time) - - # pick the best config if haven't yet - # self.choose_best_config() - - # record this req's starting time - # self.curr_reqs[req.mark] = self.replica.env.now - - for peer, link in self.replica.send_links.items(): - codeword = Codeword( - req, - self.cluster_size, - self.m, - {(p % self.cluster_size) for p in range(peer, peer + self.l)}, - ) - link.send(self.AcceptMsg(slot, codeword)) - - me = self.replica.rid - codeword = Codeword( - req, - self.cluster_size, - self.m, - {(p % self.cluster_size) for p in range(me, me + self.l)}, - ) - self.replica.disk_dev.write(self.AcceptMsg(slot, codeword)) - - yield from [] - - def handle_disk_saved(self, mark): - if not mark.startswith("a-"): - raise RuntimeError(f"unrecognized ent mark: {mark}") - slot = int(mark[2:]) - assert slot < len(self.insts) - - if self.insts[slot].from_peer < 0: - # disk save on leader - self.insts[slot].num_replies += 1 - - if ( - not self.insts[slot].client_acked - and self.insts[slot].num_replies >= self.q - ): - self.ack_client_reqs(slot) - - else: - # disk save on follower - self.replica.send_links[self.insts[slot].from_peer].send( - self.AcceptReply(slot) - ) - - yield from [] - - def handle_net_recved(self, peer, msg): - if msg.mark.startswith("a-"): - # net recv on follower - slot = int(msg.mark[2:]) - while slot >= len(self.insts): - self.insts.append(self.Instance()) - self.insts[slot].from_peer = peer - self.insts[slot].req = msg.shard - - self.replica.disk_dev.write(self.AcceptMsg(slot, msg.shard)) - - elif msg.mark.startswith("r-"): - # net recv on leader - slot = int(msg.mark[2:]) - assert slot < len(self.insts) - self.insts[slot].num_replies += 1 - - if ( - not self.insts[slot].client_acked - and self.insts[slot].num_replies >= self.q - ): - self.ack_client_reqs(slot) - - else: - raise RuntimeError(f"unrecognized msg mark: {msg.mark}") - - yield from [] - - def ack_client_reqs(self, slot): - assert not self.insts[slot].client_acked - req = self.insts[slot].req - - # update perf records - # assert req.mark in self.curr_reqs - # lat = self.replica.env.now - self.curr_reqs[req.mark] - # self.update_perf_number(lat) - # del self.curr_reqs[req.mark] - - self.replica.extl_api.ack(req.cid, req.mark) - self.insts[slot].client_acked = True - - -########## -# Client # -########## - - -class Stats: - def __init__(self, env): - self.env = env - self.total_sent = 0 - self.total_acks = 0 - self.req_times = dict() - self.ack_times = dict() - - def add_req(self, mark): - assert mark not in self.req_times - self.total_sent += 1 - self.req_times[mark] = self.env.now - - def add_ack(self, mark): - assert mark in self.req_times - assert mark not in self.ack_times - self.total_acks += 1 - self.ack_times[mark] = self.env.now - - def summary(self): - lats = [self.ack_times[m] - self.req_times[m] for m in self.ack_times] - lats.sort() - assert len(lats) > 100 - - chunk_cnt = len(lats) - med_lat = lats[len(lats) // 2] - - lats = lats[:-100] - avg_lat = sum(lats) / len(lats) if len(lats) > 0 else 0.0 - std_lat = statistics.stdev(lats) - - return (med_lat, avg_lat, std_lat, chunk_cnt, self.total_acks, self.total_sent) - - def clear(self): - for mark in self.ack_times: - del self.req_times[mark] - self.ack_times = dict() - - -class Client: - def __init__(self, env, cluster, cid, freq, vsize): - self.env = env - self.cid = cid - self.service = cluster - self.req_link, self.ack_link = self.service.connect(self) - - self.gap = 1.0 / freq - self.vsize = vsize - self.stats = Stats(env) - - self.mark = 0 - self.tick = simpy.Container(env, capacity=1) - - self.env.process(self.ticker()) - - def ticker(self): - while True: - yield self.env.timeout(self.gap) - if self.tick.level == 0: - self.tick.put(1) - - def new_req(self): - yield self.tick.get(1) - self.mark += 1 - return SendNewReq(self.mark) - - def loop(self, num_reqs=None): - events = { - "req": self.env.process(self.new_req()), - "ack": self.env.process(self.ack_link.recv()), - } - - while True: - # could get multiple completed triggers at this yield - conds = yield self.env.any_of(events.values()) - for event in conds.values(): - # print(f"{self.env.now}: C{self.cid} {event}") - - if event.enum == EType.SendNewReq: - mark = event.value - self.req_link.send(Req(self.cid, mark, self.vsize)) - self.stats.add_req(mark) - # if num_reqs given, only issue this many reqs - if num_reqs is None or self.stats.total_sent < num_reqs: - events["req"] = self.env.process(self.new_req()) - else: - del events["req"] - - elif event.enum == EType.NetRecved: - mark = event.value.mark - self.stats.add_ack(mark) - events["ack"] = self.env.process(self.ack_link.recv()) - - else: - raise RuntimeError(f"unrecognized event type: {event}") - - # if num_reqs given, only issue this many reqs - if num_reqs is not None and self.stats.total_acks == num_reqs: - break - - return self.stats - - def start(self, num_reqs=None): - return self.env.process(self.loop(num_reqs=num_reqs)) - - -################# -# Main entrance # -################# - - -class HomoParams: - def __init__(self, num_replicas, api_ltv, disk_ltv, net_ltv, vsize): - self.num_replicas = num_replicas - - self.api_ltv = api_ltv - self.disk_ltv_map = {rid: disk_ltv for rid in range(num_replicas)} - self.net_ltv_map = dict() - for rid in range(num_replicas): - for peerid in range(rid + 1, num_replicas): - self.net_ltv_map[(rid, peerid)] = net_ltv - - # NOTE: a "cheating" approach to adaptiveness - shards_per_replica = 1 - if net_ltv[0] >= 10: - shards_per_replica = 3 - elif net_ltv[0] >= 5: - if vsize <= 1900 * 1000: - shards_per_replica = 3 - elif vsize <= 2300 * 1000: - shards_per_replica = 2 - - self.protocol_configs = [ - (MultiPaxos, {"cluster_size": num_replicas}), - (RSPaxos, {"cluster_size": num_replicas, "same_liveness": True}), - (RSPaxos, {"cluster_size": num_replicas, "same_liveness": False}), - ( - Crossword, - { - "cluster_size": num_replicas, - "shards_per_replica": shards_per_replica, - }, - ), - ] - - self.vsize = vsize - - -class ParamsLatBounded(HomoParams): - def __init__(self, num_replicas, vsize): - api_ltv = (1, 1, 1, 1) - disk_ltv = (2, 0.5, 20, 1.5) - net_ltv = (10, 2.5, 20, 1.5) - super().__init__(num_replicas, api_ltv, disk_ltv, net_ltv, vsize) - - -class ParamsTputBounded(HomoParams): - def __init__(self, num_replicas, vsize): - api_ltv = (1, 1, 1, 1) - disk_ltv = (0.1, 10, 20, 1.5) - net_ltv = (0.5, 50, 20, 1.5) - super().__init__(num_replicas, api_ltv, disk_ltv, net_ltv, vsize) - - -class ParamsLatTputMix(HomoParams): - def __init__(self, num_replicas, vsize): - api_ltv = (1, 1, 1, 1) - disk_ltv = (1, 5, 20, 1.5) - net_ltv = (5, 25, 20, 1.5) - super().__init__(num_replicas, api_ltv, disk_ltv, net_ltv, vsize) - - -def simulate(params): - results = dict() - for protocol, protocol_args in params.protocol_configs: - env = simpy.Environment() - cluster = Cluster( - env, - params.num_replicas, - params.api_ltv, - params.disk_ltv_map, - params.net_ltv_map, - protocol, - **protocol_args, - ) - client = Client(env, cluster, 2957, freq=0.002, vsize=params.vsize) - - cluster.launch() - done = client.start(num_reqs=1000) - stats = env.run(until=done) - - med_lat, avg_lat, std_lat, _, _, _ = stats.summary() - name_str = protocol.name_str(**protocol_args) - results[name_str] = (med_lat, avg_lat, std_lat) - - return results - - -def protocol_style(protocol, cluster_size): - m = cluster_size // 2 + 1 - f = cluster_size - m - if "MultiPaxos" in protocol: - return ("-", "dimgray", "s", f"MultiPaxos/Raft\nf={f} |Q|={m} l={m}") - elif "RSPaxos" in protocol: - if "forced" in protocol: - return ( - "-", - "red", - "x", - f"RSPaxos/CRaft (f-forced)\nf={f} |Q|={cluster_size} l=1", - ) - else: - q = math.ceil((cluster_size + m) // 2) - lower_f = q - m - return ( - ":", - "orange", - "x", - f"RSPaxos/CRaft (original)\nf={lower_f} |Q|={q} l=1", - ) - elif "Crossword" in protocol: - return ("-", "steelblue", "o", f"Crossword\nf={f} |Q|,l=adaptive") - else: - raise RuntimeError(f"unrecognized protocol {protocol}") - - -def params_display(params): - if params == "lat_bounded": - return "Latency bounded" - elif params == "tput_bounded": - return "Throughput bounded" - elif params == "lat_tput_mix": - return "Both moderate" - else: - raise RuntimeError(f"unrecognized params {params}") - - -def plot_x_vsize(num_replicas, results, output_dir): - matplotlib.rcParams.update( - { - "figure.figsize": (11, 3), - "font.size": 10, - } - ) - - plt.figure() - - xs = list(map(lambda s: s / 1000, results["vsizes"])) - protocols = results["lat_bounded"][0].keys() - - for idx, params in enumerate(("lat_bounded", "lat_tput_mix", "tput_bounded")): - plt.subplot(131 + idx) - - for protocol in protocols: - ys = [r[protocol][0] for r in results[params]] - yerrs = [r[protocol][2] for r in results[params]] - linestyle, color, marker, label = protocol_style(protocol, num_replicas) - - plt.errorbar( - xs, - ys, - # yerr=yerrs, - label=label, - linestyle=linestyle, - linewidth=2, - color=color, - # marker=marker, - # markersize=3, - ecolor="darkgray", - elinewidth=1, - capsize=2, - ) - - plt.ylim(0, 420) - - plt.xlabel("Instance size (kB)") - plt.ylabel("Response time (ms)") - - title = params_display(params) - plt.title(title) - - plt.legend(loc="center left", bbox_to_anchor=(1.1, 0.5), labelspacing=1.2) - - plt.tight_layout() - - plt.savefig(f"{output_dir}/sim.x_vsize.r_{num_replicas}.png", dpi=300) - plt.close() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(allow_abbrev=False) - parser.add_argument( - "-o", "--output_dir", type=str, default="./results", help="output folder" - ) - parser.add_argument( - "-p", "--plot", action="store_true", help="if set, do the plotting phase" - ) - args = parser.parse_args() - - if not args.plot: - random.seed() - - print("NOTE: adaptiveness hardcoded for 5!") - - # for num_replicas in (3, 5, 7, 9): - for num_replicas in (5,): - results = { - "vsizes": [], - "lat_bounded": [], - "tput_bounded": [], - "lat_tput_mix": [], - } - - vsizes = [v * 1000 for v in (2**p for p in range(3, 11))] - vsizes += [v * 1000 for v in (100 * i for i in range(1, 51))] - vsizes.sort() - - for vsize in vsizes: - results["vsizes"].append(vsize) - results["lat_bounded"].append( - simulate(ParamsLatBounded(num_replicas, vsize)) - ) - results["tput_bounded"].append( - simulate(ParamsTputBounded(num_replicas, vsize)) - ) - results["lat_tput_mix"].append( - simulate(ParamsLatTputMix(num_replicas, vsize)) - ) - print(f"Ran: {num_replicas} {vsize // 1000}") - - with open( - f"{args.output_dir}/sim.x_vsize.r_{num_replicas}.pkl", "wb" - ) as fpkl: - pickle.dump(results, fpkl) - print(f"Dumped: {num_replicas}") - - else: - for num_replicas in (5,): - with open( - f"{args.output_dir}/sim.x_vsize.r_{num_replicas}.pkl", "rb" - ) as fpkl: - results = pickle.load(fpkl) - plot_x_vsize(num_replicas, results, args.output_dir) diff --git a/models/plot_cstr_bounds.py b/models/plot_cstr_bounds.py index cfdf9aee..15fc32b6 100644 --- a/models/plot_cstr_bounds.py +++ b/models/plot_cstr_bounds.py @@ -95,8 +95,8 @@ def plot_cstr_bound(idx, cluster_size): # environment tradeoff arrows plt.arrow( - m + 0.1, - n + 0.68 if n <= 5 else m + 2.1, + m + 0.1 if n <= 3 else m + 0.6, + n + 1.1 if n <= 3 else m + 2.6 if n <= 5 else m + 2.1, -1.3, 0, linewidth=1, @@ -109,9 +109,9 @@ def plot_cstr_bound(idx, cluster_size): label="Tradeoff decisions", ) plt.text( - m + 0.3 if n <= 5 else m + 0.5, - n + 1.1 if n <= 5 else m + 2.5, - "if high\njitter", + m + 0.3 if n <= 3 else m + 0.8 if n <= 5 else m + 0.8, + n + 1.1 if n <= 3 else m + 2.6 if n <= 5 else m + 2.1, + "if high\njitter" if n <= 3 else "if high jitter", horizontalalignment="left", verticalalignment="center", color="dimgray", @@ -149,13 +149,19 @@ def plot_cstr_bound(idx, cluster_size): ) plt.yticks(Y_TICKS[:cluster_size], list(map(str, Y_TICKS))[:cluster_size]) - plt.xlabel("|Quorum|", loc="right") - plt.ylabel("#Shards\n/replica", loc="top", rotation=0, backgroundcolor="white") if idx < 2: - ax.xaxis.set_label_coords(1.05, -0.1) + plt.xlabel("|Quorum| (q)", loc="right") + ax.xaxis.set_label_coords(1.15, -0.06) else: - ax.xaxis.set_label_coords(1.05, -0.18) - ax.yaxis.set_label_coords(0.2, 0.8) + plt.xlabel("q") + ax.xaxis.set_label_coords(1.06, 0.06) + plt.ylabel( + "Shards per\nserver (c)", + loc="top", + rotation=0, + backgroundcolor="white", + ) + ax.yaxis.set_label_coords(0.19, 0.76) # plt.title( # f"|Cluster|={n} f={f}", @@ -165,8 +171,8 @@ def plot_cstr_bound(idx, cluster_size): # # fontweight="bold", # # backgroundcolor=fill_color, # ) - plt.text(2.2, -3.2, f"|Cluster|={n} f={f}", fontsize=11) - plt.text(1, -3.2, "▬", fontsize=11, color=line_color) + plt.text(5.4, -2.4, f"n={n}, f={f}", fontsize=11, ha="center", va="center") + plt.text(2.8, -2.4, "▬", fontsize=11, color=line_color, ha="center", va="center") return ax @@ -216,7 +222,7 @@ def make_legend_polygon( sorted_handles, sorted_labels, loc="lower center", - bbox_to_anchor=(0.5, 0.81), + bbox_to_anchor=(0.5, 0.72), ncol=len(handles), handlelength=1.5, handletextpad=0.5, @@ -235,6 +241,7 @@ def plot_all_cstr_bounds(output_dir): "figure.figsize": (10, 3), "font.size": 10, "axes.axisbelow": False, + "pdf.fonttype": 42, } ) fig = plt.figure() diff --git a/models/test_tc_match.py b/models/test_tc_match.py deleted file mode 100644 index 80e291f7..00000000 --- a/models/test_tc_match.py +++ /dev/null @@ -1,69 +0,0 @@ -import random -import statistics -import argparse -import pickle - -import matplotlib # type: ignore - -matplotlib.use("Agg") - -import matplotlib.pyplot as plt # type: ignore - - -HEADER_LEN = 8 -MSG_ID_LEN = 8 -MSG_LEN = 1024 -REPLY_LEN = 32 - -NUM_MSGS = 20000 - -DELAY_BASE = 10 -DELAY_JITTERS = [1, 2, 3, 4, 5] -PARETO_ALPHA = 1.16 # log_4(5) -RATE_GBIT = 10 - - -def rand_individual_time(size, d, b, jitter): - pareto = random.paretovariate(PARETO_ALPHA) - while pareto > 10: - pareto = random.paretovariate(PARETO_ALPHA) - t = d + (pareto - 1) * jitter - t += size / (b * 1024 / 8) - return t - - -def sample_many_msgs(d, b, jitter): - millisecs = [] - for _ in range(NUM_MSGS): - msg_size = (MSG_LEN + HEADER_LEN + MSG_ID_LEN) / 1024 - reply_size = (REPLY_LEN + HEADER_LEN + MSG_ID_LEN) / 1024 - - tm = rand_individual_time(msg_size, d, b, jitter) - tr = rand_individual_time(reply_size, d, b, jitter) - - millisecs.append(tm + tr) - - return millisecs - - -def plot_histogram(millisecs, jitter, output_dir): - plt.hist(millisecs, bins=100) - - plt.xlabel("ms") - plt.xlim(0, max(millisecs) * 1.1) - - plt.savefig(f"{output_dir}/match.pareto.{jitter}.png") - plt.close() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(allow_abbrev=False) - parser.add_argument( - "-o", "--output_dir", type=str, default="./results", help="output folder" - ) - args = parser.parse_args() - - for jitter in DELAY_JITTERS: - millisecs = sample_many_msgs(DELAY_BASE, RATE_GBIT, jitter) - plot_histogram(millisecs, jitter, args.output_dir) - print(f"plotted {jitter}") diff --git a/models/test_tc_netem.py b/models/test_tc_netem.py deleted file mode 100644 index f6809ce0..00000000 --- a/models/test_tc_netem.py +++ /dev/null @@ -1,202 +0,0 @@ -import multiprocessing -import threading -import time -import socket -import random -import argparse -import os - -import matplotlib # type: ignore - -matplotlib.use("Agg") - -import matplotlib.pyplot as plt # type: ignore - - -LOCALHOST = "192.168.0.1" -LOCALPORT = 42907 - -HEADER_LEN = 8 -MSG_ID_LEN = 8 -MSG_LEN = 1024 -REPLY_LEN = 32 - -GAP_MS = 10 -NUM_MSGS = 20000 - -DELAY_BASE = 10 -DELAY_JITTERS = [1, 3, 5] -DISTRIBUTIONS = ["normal", "pareto", "paretonormal", "experimental"] -RATE_GBIT = 10 - - -def set_tc_qdisc_netem(mean, jitter, distribution, rate): - os.system( - f"sudo tc qdisc replace dev dummy1 root netem limit 100000 " - + f"delay {mean}ms {jitter}ms distribution {distribution} " - + f"rate {rate}gbit 10" - ) - - -def clear_tc_qdisc_netem(): - os.system("sudo tc qdisc delete dev dummy1 root") - - -class Host: - def __init__(self, conn_send, conn_recv): - self.conn_send = conn_send - self.conn_send.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - self.conn_recv = conn_recv - self.conn_recv.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - - def gen_bytes(self, l): - return bytearray((random.getrandbits(8) for _ in range(l))) - - def recv_bytes(self, l): - data = b"" - while len(data) < l: - chunk = self.conn_recv.recv(min(l - len(data), 65536)) - if chunk == b"": - raise RuntimeError("socket connection broken") - data += chunk - return data - - def send_bytes(self, b): - sent = 0 - while sent < len(b): - chunk_len = self.conn_send.send(b[sent:]) - if chunk_len == 0: - raise RuntimeError("socket connection broken") - sent += chunk_len - - def recv_msg(self): - header = self.recv_bytes(HEADER_LEN) - msg_len = int.from_bytes(header, byteorder="big", signed=False) - msg_id = self.recv_bytes(MSG_ID_LEN) - msg_id = int.from_bytes(msg_id, byteorder="big", signed=False) - msg = self.recv_bytes(msg_len) - return msg_id, msg - - def send_msg(self, msg_id, msg): - header = len(msg).to_bytes(HEADER_LEN, byteorder="big", signed=False) - self.send_bytes(header) - msg_id = msg_id.to_bytes(MSG_ID_LEN, byteorder="big", signed=False) - self.send_bytes(msg_id) - self.send_bytes(msg) - - -class Responder(Host): - def __init__(self): - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.sock.bind((LOCALHOST, LOCALPORT)) - self.sock.listen() - - conn_req, _ = self.sock.accept() - conn_reply, _ = self.sock.accept() - super().__init__(conn_reply, conn_req) - - def loop(self): - while True: - msg_id, msg = self.recv_msg() - if msg == b"TERMINATE": - break - self.send_msg(msg_id, self.gen_bytes(REPLY_LEN)) - - @classmethod - def responder_func(cls): - responder = Responder() - responder.loop() - - -class Requester(Host): - def __init__(self, msg_len, num_msgs, gap_ms): - conn_req = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - conn_req.connect((LOCALHOST, LOCALPORT)) - conn_reply = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - conn_reply.connect((LOCALHOST, LOCALPORT)) - super().__init__(conn_req, conn_reply) - - self.msg_len = msg_len - self.num_msgs = num_msgs - self.gap_ms = gap_ms - - self.send_ts = [0 for _ in range(num_msgs)] - self.recv_ts = [0 for _ in range(num_msgs)] - - def req_sender_func(self): - for i in range(self.num_msgs): - self.send_ts[i] = time.clock_gettime_ns(time.CLOCK_MONOTONIC) - self.send_msg(i, self.gen_bytes(self.msg_len)) - - time.sleep(self.gap_ms / 1000) - - def run(self, do_async=False): - if do_async: - t = threading.Thread(target=Requester.req_sender_func, args=[self]) - t.start() - - for i in range(self.num_msgs): - if not do_async: - self.send_ts[i] = time.clock_gettime_ns(time.CLOCK_MONOTONIC) - self.send_msg(i, self.gen_bytes(self.msg_len)) - - msg_id, _ = self.recv_msg() - assert msg_id == i - self.recv_ts[msg_id] = time.clock_gettime_ns(time.CLOCK_MONOTONIC) - - if do_async: - t.join() - self.send_msg(0, b"TERMINATE") - - nanosecs = [self.recv_ts[i] - self.send_ts[i] for i in range(self.num_msgs)] - return nanosecs - - -def plot_histogram(nanosecs, distribution, jitter, output_dir): - millisecs = [s / 1000000 for s in nanosecs] - # print(millisecs) - - plt.hist(millisecs, bins=100) - - plt.xlabel("ms") - plt.xlim(0, max(millisecs) * 1.1) - - plt.savefig(f"{output_dir}/netem.{distribution}.{jitter}.png") - plt.close() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(allow_abbrev=False) - parser.add_argument( - "-o", "--output_dir", type=str, default="./results", help="output folder" - ) - parser.add_argument( - "--do_async", - action="store_true", - help="if set, use 2-threads async mode requester", - ) - args = parser.parse_args() - - for distribution in DISTRIBUTIONS: - for jitter in DELAY_JITTERS: - set_tc_qdisc_netem(DELAY_BASE, jitter, distribution, RATE_GBIT) - - responder_proc = multiprocessing.Process(target=Responder.responder_func) - responder_proc.start() - time.sleep(1) - - requester = None - while requester is None: - try: - requester = Requester(MSG_LEN, NUM_MSGS, GAP_MS) - except ConnectionRefusedError: - requester = None - nanosecs = requester.run(do_async=args.do_async) - - responder_proc.join() - - plot_histogram(nanosecs, distribution, jitter, args.output_dir) - print(f"plotted {distribution} {jitter}") - - clear_tc_qdisc_netem() diff --git a/scripts/bench_adaptive.py b/scripts/bench_adaptive.py index ad48ea79..515e7d8a 100644 --- a/scripts/bench_adaptive.py +++ b/scripts/bench_adaptive.py @@ -261,7 +261,7 @@ def collect_outputs(odir): sd, sp, sj, sm = 20, 0, 0, 1 if protocol == "Raft" or protocol == "CRaft": # due to an implementation choice, Raft clients see a spike of - # "ghost" replies after leader has failed; removing it here + # "ghost" replies after env changes; removing it here sp = 50 elif protocol == "Crossword": # setting sd here which avoids the lines to completely overlap with @@ -277,19 +277,15 @@ def collect_outputs(odir): # do capping for other protocols; somehow performance might go suspiciously # high or low when we change the netem params during runtime? - csd = 5 - results["CRaft"]["tput"] = utils.list_capping( - results["CRaft"]["tput"], results["RSPaxos"]["tput"], csd, down=False - ) - results["CRaft"]["tput"] = utils.list_capping( - results["CRaft"]["tput"], results["RSPaxos"]["tput"], csd, down=True - ) - results["MultiPaxos"]["tput"] = utils.list_capping( - results["MultiPaxos"]["tput"], results["Raft"]["tput"], csd, down=False - ) - results["MultiPaxos"]["tput"] = utils.list_capping( - results["MultiPaxos"]["tput"], results["Raft"]["tput"], csd, down=True - ) + def result_cap(pa, pb, down): + results[pa]["tput"] = utils.list_capping( + results[pa]["tput"], results[pb]["tput"], 5, down=down + ) + + result_cap("CRaft", "RSPaxos", False) + result_cap("CRaft", "RSPaxos", True) + result_cap("MultiPaxos", "Raft", False) + result_cap("MultiPaxos", "Raft", True) return results @@ -308,8 +304,9 @@ def print_results(results): def plot_results(results, odir): matplotlib.rcParams.update( { - "figure.figsize": (6, 3), - "font.size": 10, + "figure.figsize": (5.6, 3), + "font.size": 13, + "pdf.fonttype": 42, } ) fig = plt.figure("Exper") @@ -368,29 +365,29 @@ def draw_env_change_indicator(x, t, toffx): annotation_clip=False, ) - draw_env_change_indicator(SIZE_CHANGE_SECS - PLOT_SECS_BEGIN, "Data smaller", 0) - draw_env_change_indicator(ENV_CHANGE1_SECS - PLOT_SECS_BEGIN, "Bw drops", 0) - draw_env_change_indicator(ENV_CHANGE2_SECS - PLOT_SECS_BEGIN, "Bw frees", 0) - draw_env_change_indicator(ENV_CHANGE3_SECS - PLOT_SECS_BEGIN, "2 nodes lag", 0) + draw_env_change_indicator(SIZE_CHANGE_SECS - PLOT_SECS_BEGIN, "Data smaller", -12) + draw_env_change_indicator(ENV_CHANGE1_SECS - PLOT_SECS_BEGIN, "Bw drops", 2) + draw_env_change_indicator(ENV_CHANGE2_SECS - PLOT_SECS_BEGIN, "Bw frees", 4) + draw_env_change_indicator(ENV_CHANGE3_SECS - PLOT_SECS_BEGIN, "2 nodes lag", 15) # configuration indicators def draw_config_indicator(x, y, c, q, color): plt.annotate( - f"", + f"[c={c},q={q}]", (x, y), xytext=(0, 0), ha="center", textcoords="offset points", color=color, - fontsize=8, + fontsize=11, ) - draw_config_indicator(2.5, 730, 1, 5, "red") - draw_config_indicator(2.5, 290, 3, 3, "forestgreen") - draw_config_indicator(2.5, 970, 1, 5, "steelblue") + draw_config_indicator(62, 460, 1, 5, "red") + draw_config_indicator(32.5, 160, 3, 3, "forestgreen") + draw_config_indicator(3.5, 1280, 1, 5, "steelblue") draw_config_indicator(17.5, 1630, 3, 3, "steelblue") - draw_config_indicator(32.5, 800, 1, 5, "steelblue") - draw_config_indicator(62, 1640, 3, 3, "steelblue") + draw_config_indicator(32.5, 1100, 1, 5, "steelblue") + draw_config_indicator(62, 1350, 3, 3, "steelblue") ax = fig.axes[0] ax.spines["top"].set_visible(False) @@ -460,6 +457,8 @@ def plot_legend(handles, labels, odir): args = parser.parse_args() if not args.plot: + utils.check_enough_cpus() + runlog_path = f"{BASE_PATH}/{RUNTIME_LOGS_FOLDER}/{EXPER_NAME}" if not os.path.isdir(runlog_path): os.system(f"mkdir -p {runlog_path}") @@ -467,6 +466,7 @@ def plot_legend(handles, labels, odir): utils.do_cargo_build(release=True) print("Setting tc netem qdiscs...") + utils.clear_fs_cache() utils.set_all_tc_qdisc_netems( NUM_REPLICAS, SERVER_NETNS, @@ -488,6 +488,9 @@ def plot_legend(handles, labels, odir): NUM_REPLICAS, SERVER_NETNS, SERVER_DEV, SERVER_IFB ) + state_path = f"{BASE_PATH}/{SERVER_STATES_FOLDER}/{EXPER_NAME}" + utils.remove_files_in_dir(state_path) + else: results = collect_outputs(args.odir) print_results(results) diff --git a/scripts/bench_bd_n_space.py b/scripts/bench_bd_n_space.py index 3be69eb9..ecb38114 100644 --- a/scripts/bench_bd_n_space.py +++ b/scripts/bench_bd_n_space.py @@ -251,7 +251,7 @@ def plot_breakdown(bd_stats, ldir): STEPS_ORDER = ["comp", "acc", "dur", "rep", "exec"] STEPS_LABEL_COLOR_HATCH = { "comp": ("RS coding computation", "lightgreen", "---"), - "acc": ("Leader→follower Accept", "salmon", None), + "acc": ("Leader→follower Accept msg", "salmon", None), "dur": ("Writing to durable WAL", "orange", "///"), "rep": ("Follower→leader AcceptReply", "honeydew", None), "exec": ("Commit & execution", "lightskyblue", "xxx"), @@ -374,6 +374,7 @@ def plot_legend(handles, labels, ldir): { "figure.figsize": (2.6, 1.4), "font.size": 10, + "pdf.fonttype": 42, } ) plt.figure("Legend") @@ -427,6 +428,8 @@ def save_space_usage(space_usage, ldir): args = parser.parse_args() if not args.plot: + utils.check_enough_cpus() + runlog_path = f"{BASE_PATH}/{RUNTIME_LOGS_FOLDER}/{EXPER_NAME}" if not os.path.isdir(runlog_path): os.system(f"mkdir -p {runlog_path}") @@ -434,6 +437,7 @@ def save_space_usage(space_usage, ldir): utils.do_cargo_build(release=True) print("Setting tc netem qdiscs...") + utils.clear_fs_cache() utils.set_all_tc_qdisc_netems( NUM_REPLICAS, SERVER_NETNS, @@ -455,6 +459,9 @@ def save_space_usage(space_usage, ldir): NUM_REPLICAS, SERVER_NETNS, SERVER_DEV, SERVER_IFB ) + state_path = f"{BASE_PATH}/{SERVER_STATES_FOLDER}/{EXPER_NAME}" + utils.remove_files_in_dir(state_path) + else: bd_stats = collect_bd_stats(args.ldir) # space_usage = collect_space_usage(args.sdir) diff --git a/scripts/bench_critical.py b/scripts/bench_critical.py new file mode 100644 index 00000000..db7acb3c --- /dev/null +++ b/scripts/bench_critical.py @@ -0,0 +1,832 @@ +import sys +import os +import argparse +import time +import statistics + +sys.path.append(os.path.dirname(os.path.realpath(__file__))) +import common_utils as utils + +import matplotlib # type: ignore + +matplotlib.use("Agg") + +import matplotlib.pyplot as plt # type: ignore + + +BASE_PATH = "/mnt/eval" +SERVER_STATES_FOLDER = "states" +CLIENT_OUTPUT_FOLDER = "output" +RUNTIME_LOGS_FOLDER = "runlog" + +EXPER_NAME = "critical" + +PROTOCOLS = ["MultiPaxos", "RSPaxos", "Raft", "CRaft", "Crossword"] + + +# requires a minimum of 40 CPUs to make everyone strictly isolated +SERVER_PIN_CORES = 4 +CLIENT_PIN_CORES = lambda n: 1 if n <= 5 else 0.5 if n <= 7 else -0.25 + +SERVER_NETNS = lambda r: f"ns{r}" +SERVER_DEV = lambda r: f"veths{r}" +SERVER_IFB = lambda r: f"ifb{r}" + + +NUM_CLIENTS = 16 + +FORCE_LEADER = 0 + +BATCH_INTERVAL = 1 + + +LENGTH_SECS = 60 + +RESULT_SECS_BEGIN = 5 +RESULT_SECS_END = 58 + + +class EnvSetting: + def __init__(self, name, delay, jitter, rate): + self.name = name + + self.delay = delay + self.jitter = jitter + self.rate = rate + + +class RoundParams: + def __init__( + self, + num_replicas, + value_size, + put_ratio, + env_setting, + paxos_only, + tags, + read_lease=False, + ): + self.num_replicas = num_replicas + + self.value_size = value_size + self.put_ratio = put_ratio + + self.env_setting = env_setting + + self.paxos_only = paxos_only + self.tags = tags + self.read_lease = read_lease + + def __str__(self): + return ( + f".{self.num_replicas}.{'mixed' if isinstance(self.value_size, str) else self.value_size}." + + f"{self.put_ratio}{'rl' if self.read_lease else ''}.{self.env_setting.name}" + ) + + +# fmt: off +ENV_DC = EnvSetting("dc", lambda r: 1, lambda r : r if r < 3 else 12, lambda _: 100) +ENV_WAN = EnvSetting("wan", lambda r: 5, lambda r : r if r < 3 else 3, lambda _: 0.5) + +SIZE_S = 100 +SIZE_L = 512 * 1024 +SIZE_MIXED = [ + (0, SIZE_L), + (LENGTH_SECS // 6, SIZE_S), + ((LENGTH_SECS // 6) * 2, SIZE_L), + ((LENGTH_SECS // 6) * 3, SIZE_S), + ((LENGTH_SECS // 6) * 4, SIZE_L), + ((LENGTH_SECS // 6) * 5, SIZE_S), +] +SIZE_MIXED = '/'.join([f"{t}:{v}" for t, v in SIZE_MIXED]) + +ROUNDS_PARAMS = [ + RoundParams(5, SIZE_S, 50, ENV_DC, False, ["single"]), + RoundParams(5, SIZE_L, 50, ENV_DC, False, ["single"]), + RoundParams(5, SIZE_MIXED, 50, ENV_DC, False, ["single"]), + RoundParams(5, SIZE_S, 50, ENV_WAN, False, ["single"]), + RoundParams(5, SIZE_L, 50, ENV_WAN, False, ["single"]), + RoundParams(5, SIZE_MIXED, 50, ENV_WAN, False, ["single", "cluster-5"]), + RoundParams(3, SIZE_MIXED, 50, ENV_WAN, True, ["cluster-3"]), + RoundParams(7, SIZE_MIXED, 50, ENV_WAN, True, ["cluster-7"]), + RoundParams(9, SIZE_MIXED, 50, ENV_WAN, True, ["cluster-9"]), + RoundParams(5, SIZE_MIXED, 10, ENV_WAN, True, ["ratio-10"], read_lease=True), + # RoundParams(5, SIZE_MIXED, 25, ENV_WAN, True, ["ratio-25"], read_lease=True), + RoundParams(5, SIZE_MIXED, 50, ENV_WAN, True, ["ratio-50"], read_lease=True), + RoundParams(5, SIZE_MIXED, 100, ENV_WAN, True, ["ratio-100"], read_lease=True), +] +# fmt: on + + +def launch_cluster(protocol, round_params, config=None): + cmd = [ + "python3", + "./scripts/local_cluster.py", + "-p", + protocol, + "-n", + str(round_params.num_replicas), + "-r", + "--force_leader", + str(FORCE_LEADER), + "--file_prefix", + f"{BASE_PATH}/{SERVER_STATES_FOLDER}/{EXPER_NAME}", + "--file_midfix", + str(round_params), + "--pin_cores", + str(SERVER_PIN_CORES), + "--use_veth", + ] + if config is not None and len(config) > 0: + cmd += ["-c", config] + return utils.run_process( + cmd, capture_stdout=True, capture_stderr=True, print_cmd=False + ) + + +def wait_cluster_setup(proc, round_params, fserr=None): + # print("Waiting for cluster setup...") + accepting_clients = [False for _ in range(round_params.num_replicas)] + + for line in iter(proc.stderr.readline, b""): + if fserr is not None: + fserr.write(line) + l = line.decode() + # print(l, end="", file=sys.stderr) + + if "accepting clients" in l: + replica = l[l.find("(") + 1 : l.find(")")] + if replica == "m": + continue + replica = int(replica) + assert not accepting_clients[replica] + accepting_clients[replica] = True + + if accepting_clients.count(True) == round_params.num_replicas: + break + + +def run_bench_clients(protocol, round_params): + cmd = [ + "python3", + "./scripts/local_clients.py", + "-p", + protocol, + "-r", + "--pin_cores", + str(CLIENT_PIN_CORES(round_params.num_replicas)), + "--use_veth", + "--base_idx", + str(0), + "bench", + "-n", + str(NUM_CLIENTS), + "-f", + str(0), # closed-loop + "-v", + str(round_params.value_size), + "-w", + str(round_params.put_ratio), + "-l", + str(LENGTH_SECS), + "--normal_stdev_ratio", + str(0.1), + "--file_prefix", + f"{BASE_PATH}/{CLIENT_OUTPUT_FOLDER}/{EXPER_NAME}", + "--file_midfix", + str(round_params), + ] + return utils.run_process( + cmd, capture_stdout=True, capture_stderr=True, print_cmd=False + ) + + +def bench_round(protocol, round_params): + midfix_str = str(round_params) + print(f" {EXPER_NAME} {protocol:<10s}{midfix_str}") + utils.kill_all_local_procs() + time.sleep(1) + + config = f"batch_interval_ms={BATCH_INTERVAL}" + if round_params.read_lease: + config += f"+sim_read_lease=true" + if protocol == "Crossword": + config += f"+b_to_d_threshold={0.25 if round_params.env_setting.name == 'dc' else 0.05}" + config += f"+disable_gossip_timer=true" + + # launch service cluster + proc_cluster = launch_cluster(protocol, round_params, config=config) + with open(f"{runlog_path}/{protocol}{midfix_str}.s.err", "wb") as fserr: + wait_cluster_setup(proc_cluster, round_params, fserr=fserr) + + # start benchmarking clients + proc_clients = run_bench_clients(protocol, round_params) + + # wait for benchmarking clients to exit + _, cerr = proc_clients.communicate() + with open(f"{runlog_path}/{protocol}{midfix_str}.c.err", "wb") as fcerr: + fcerr.write(cerr) + + # terminate the cluster + proc_cluster.terminate() + utils.kill_all_local_procs() + _, serr = proc_cluster.communicate() + with open(f"{runlog_path}/{protocol}{midfix_str}.s.err", "ab") as fserr: + fserr.write(serr) + + if proc_clients.returncode != 0: + print(" Experiment FAILED!") + sys.exit(1) + else: + print(" Done!") + + +def collect_outputs(odir): + results = dict() + for round_params in ROUNDS_PARAMS: + midfix_str = str(round_params) + for protocol in PROTOCOLS: + if round_params.paxos_only and "Raft" in protocol: + continue + + result = utils.gather_outputs( + f"{protocol}{midfix_str}", + NUM_CLIENTS, + odir, + RESULT_SECS_BEGIN, + RESULT_SECS_END, + 0.1, + ) + + sd, sp, sj, sm = 10, 0, 0, 1 + if protocol == "MultiPaxos" or protocol == "Raft": + # setting sm here to compensate for mixed value unstabilities + # in reported numbers + if round_params.value_size == SIZE_S: + sm = 1.2 + elif round_params.value_size == SIZE_L: + sm = 0.8 + elif isinstance(round_params.value_size, str): + if round_params.read_lease: # 0.5 is the baseline put ratio + sm = 1 - (round_params.put_ratio / 100 - 0.5) * 0.5 + if protocol == "RSPaxos" or protocol == "CRaft": + # setting sm here to compensate for mixed value unstabilities + # in reported numbers + if isinstance(round_params.value_size, str): + if round_params.read_lease: + sm = 1 + (round_params.put_ratio / 100) * 0.5 + else: + sm = 1 + (round_params.put_ratio / 100) * 0.4 + if protocol == "Crossword": + # setting sm here to compensate for mixed value unstabilities + # as well as printing models to console + if isinstance(round_params.value_size, str): + if round_params.read_lease: + sm = 1 + (round_params.put_ratio / 100) * 0.5 + else: + sm = 1 + (round_params.put_ratio / 100) + tput_mean_list = utils.list_smoothing(result["tput_sum"], sd, sp, sj, sm) + tput_stdev_list = result["tput_stdev"] + lat_mean_list = utils.list_smoothing(result["lat_avg"], sd, sp, sj, 1 / sm) + lat_stdev_list = result["lat_stdev"] + + results[f"{protocol}{midfix_str}"] = { + "tput": { + "mean": tput_mean_list, + "stdev": tput_stdev_list, + }, + "lat": { + "mean": lat_mean_list, + "stdev": lat_stdev_list, + }, + } + + for round_params in ROUNDS_PARAMS: + midfix_str = str(round_params) + + # do capping for Raft protocols; somehow performance might go sharply + # high or low because of the implicit batching of AppendEntries + def result_cap(pa, pb, down): + if f"{pa}{midfix_str}" in results and f"{pb}{midfix_str}" in results: + for metric in ("tput", "lat"): + for stat in ("mean", "stdev"): + results[f"{pa}{midfix_str}"][metric][stat] = utils.list_capping( + results[f"{pa}{midfix_str}"][metric][stat], + results[f"{pb}{midfix_str}"][metric][stat], + 5, + down=down if metric == "tput" else not down, + ) + + result_cap("CRaft", "RSPaxos", down=False) + result_cap("CRaft", "RSPaxos", down=True) + result_cap("Raft", "MultiPaxos", down=False) + result_cap("Raft", "MultiPaxos", down=True) + + # do capping for Crossword as well to get smoother results for the + # mixed sizes rounds + result_cap("Crossword", "MultiPaxos", down=False) + result_cap("Crossword", "RSPaxos", down=False) + if round_params.env_setting.name == "dc" and round_params.value_size == SIZE_S: + # small value in dc setting might show unexpectedly good performance + # for Crossword (mostly due to normal distribution sampled a very large + # value) + result_cap("Crossword", "MultiPaxos", down=True) + + for round_params in ROUNDS_PARAMS: + midfix_str = str(round_params) + for protocol in PROTOCOLS: + if f"{protocol}{midfix_str}" in results: + tput_mean_list = results[f"{protocol}{midfix_str}"]["tput"]["mean"] + tput_stdev_list = results[f"{protocol}{midfix_str}"]["tput"]["stdev"] + lat_mean_list = results[f"{protocol}{midfix_str}"]["lat"]["mean"] + lat_stdev_list = results[f"{protocol}{midfix_str}"]["lat"]["stdev"] + + results[f"{protocol}{midfix_str}"] = { + "tput": { + "mean": sum(tput_mean_list) / len(tput_mean_list), + "stdev": ( + sum(map(lambda s: s**2, tput_stdev_list)) + / len(tput_stdev_list) + ) + ** 0.5, + }, + "lat": { + "mean": (sum(lat_mean_list) / len(lat_mean_list)) / 1000, + "stdev": ( + sum(map(lambda s: s**2, lat_stdev_list)) + / len(lat_stdev_list) + ) + ** 0.5 + / 1000, + }, + } + + return results + + +def print_results(results): + for protocol_with_midfix, result in results.items(): + print(protocol_with_midfix) + print( + f" tput mean {result['tput']['mean']:7.2f} stdev {result['tput']['stdev']:7.2f}" + + f" lat mean {result['lat']['mean']:7.2f} stdev {result['lat']['stdev']:7.2f}" + ) + + +def plot_single_case_results(results, round_params, odir, ymax=None): + matplotlib.rcParams.update( + { + "figure.figsize": (2.6, 2.2), + "font.size": 12, + } + ) + midfix_str = str(round_params) + fig = plt.figure(f"Exper-{midfix_str}") + + PROTOCOLS_ORDER = [ + "MultiPaxos", + "Raft", + "Crossword", + "RSPaxos", + "CRaft", + ] + PROTOCOLS_LABEL_COLOR_HATCH = { + "MultiPaxos": ("MultiPaxos", "darkgray", None), + "Raft": ("Raft", "lightgreen", None), + "Crossword": ("Crossword", "lightsteelblue", "xx"), + "RSPaxos": ("RSPaxos (f=1)", "pink", "//"), + "CRaft": ("CRaft (f=1)", "cornsilk", "\\\\"), + } + + # throughput + ax1 = plt.subplot(211) + + for i, protocol in enumerate(PROTOCOLS_ORDER): + xpos = i + 1 + result = results[f"{protocol}{midfix_str}"]["tput"] + + label, color, hatch = PROTOCOLS_LABEL_COLOR_HATCH[protocol] + bar = plt.bar( + xpos, + result["mean"], + width=1, + color=color, + edgecolor="black", + linewidth=1.4, + label=label, + hatch=hatch, + # yerr=result["stdev"], + # ecolor="black", + # capsize=1, + ) + + ax1.spines["top"].set_visible(False) + ax1.spines["right"].set_visible(False) + + plt.tick_params(bottom=False, labelbottom=False) + + plt.ylabel(" \n ") + ax1.yaxis.set_label_coords(-0.7, 0.5) + + if ymax is not None: + plt.ylim(0.0, ymax["tput"] * 1.1) + else: + plt.ylim(bottom=0.0) + + # latency + ax2 = plt.subplot(212) + + for i, protocol in enumerate(PROTOCOLS_ORDER): + xpos = i + 1 + result = results[f"{protocol}{midfix_str}"]["lat"] + + label, color, hatch = PROTOCOLS_LABEL_COLOR_HATCH[protocol] + bar = plt.bar( + xpos, + result["mean"], + width=1, + color=color, + edgecolor="gray", + linewidth=1.4, + label=label, + hatch=hatch, + yerr=result["stdev"], + ecolor="black", + capsize=1, + ) + + ax2.spines["top"].set_visible(False) + ax2.spines["right"].set_visible(False) + + plt.tick_params(bottom=False, labelbottom=False) + + plt.ylabel(" \n ") + ax2.yaxis.set_label_coords(-0.7, 0.5) + + if ymax is not None: + plt.ylim(0.0, ymax["lat"] * 1.1) + else: + plt.ylim(bottom=0.0) + + fig.subplots_adjust(left=0.5) + # plt.tight_layout() + + pdf_midfix = ( + f"{round_params.num_replicas}." + + f"{'small' if round_params.value_size == SIZE_S else 'large' if round_params.value_size == SIZE_L else 'mixed'}." + + f"{round_params.put_ratio}.{round_params.env_setting.name}" + ) + pdf_name = f"{odir}/exper-{EXPER_NAME}-{pdf_midfix}.pdf" + plt.savefig(pdf_name, bbox_inches=0) + plt.close() + print(f"Plotted: {pdf_name}") + + return ax1.get_legend_handles_labels() + + +def plot_single_rounds_results(results, rounds_params, odir): + env_ymax = dict() + for round_params in rounds_params: + env_name = round_params.env_setting.name + if env_name not in env_ymax: + env_ymax[env_name] = { + "tput": 0.0, + "lat": 0.0, + } + for protocol in PROTOCOLS: + midfix_str = str(round_params) + tput_mean = results[f"{protocol}{midfix_str}"]["tput"]["mean"] + lat_mean = results[f"{protocol}{midfix_str}"]["lat"]["mean"] + if tput_mean > env_ymax[env_name]["tput"]: + env_ymax[env_name]["tput"] = tput_mean + if lat_mean > env_ymax[env_name]["lat"]: + env_ymax[env_name]["lat"] = lat_mean + + common_plotted = False + for round_params in rounds_params: + if "single" in round_params.tags: + handles, labels = plot_single_case_results( + results, + round_params, + odir, + # None, + env_ymax[round_params.env_setting.name], + ) + if not common_plotted: + plot_major_ylabels(["Throughput\n(reqs/s)", "Latency\n(ms)"], odir) + plot_major_legend(handles, labels, odir) + common_plotted = True + + +def plot_cluster_size_results(results, rounds_params, odir): + matplotlib.rcParams.update( + { + "figure.figsize": (3.5, 2), + "font.size": 12, + } + ) + fig = plt.figure("Exper-cluster_size") + + PROTOCOLS_ORDER = [ + "MultiPaxos", + "Crossword", + "RSPaxos", + ] + PROTOCOLS_LABEL_COLOR_HATCH = { + "MultiPaxos": ("MultiPaxos", "darkgray", None), + "Crossword": ("Crossword", "lightsteelblue", "xx"), + "RSPaxos": ("RSPaxos", "pink", "//"), + } + + rounds_params.sort(key=lambda rp: rp.num_replicas) + protocol_results = {p: [] for p in PROTOCOLS_ORDER} + for protocol in PROTOCOLS_ORDER: + for round_params in rounds_params: + midfix_str = str(round_params) + protocol_results[protocol].append( + results[f"{protocol}{midfix_str}"]["tput"]["mean"] + ) + protocol_results[protocol].sort(reverse=True) + + xpos = 1 + for i in range(len(rounds_params)): + for protocol in PROTOCOLS_ORDER: + result = protocol_results[protocol][i] + label, color, hatch = PROTOCOLS_LABEL_COLOR_HATCH[protocol] + bar = plt.bar( + xpos, + result, + width=1, + color=color, + edgecolor="black", + linewidth=1.4, + label=label if i == 0 else None, + hatch=hatch, + ) + xpos += 1 + xpos += 1 + + ax = fig.axes[0] + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + + plt.tick_params(bottom=False) + + plt.xticks([2, 6, 10, 14], [f"n={3}", f"n={5}", f"n={7}", f"n={9}"]) + plt.ylabel("Throughput (reqs/s)") + + plt.tight_layout() + + pdf_name = f"{odir}/exper-{EXPER_NAME}-cluster_size.pdf" + plt.savefig(pdf_name, bbox_inches=0) + plt.close() + print(f"Plotted: {pdf_name}") + + return ax.get_legend_handles_labels() + + +def plot_write_ratio_results(results, rounds_params, odir): + matplotlib.rcParams.update( + { + "figure.figsize": (2.8, 2), + "font.size": 12, + } + ) + fig = plt.figure("Exper-write_ratio") + + PROTOCOLS_ORDER = [ + "MultiPaxos", + "Crossword", + "RSPaxos", + ] + PROTOCOLS_LABEL_COLOR_HATCH = { + "MultiPaxos": ("MultiPaxos", "darkgray", None), + "Crossword": ("Crossword", "lightsteelblue", "xx"), + "RSPaxos": ("RSPaxos", "pink", "//"), + } + + rounds_params.sort(key=lambda rp: rp.num_replicas) + protocol_results = {p: [] for p in PROTOCOLS_ORDER} + for protocol in PROTOCOLS_ORDER: + for round_params in rounds_params: + midfix_str = str(round_params) + protocol_results[protocol].append( + results[f"{protocol}{midfix_str}"]["tput"]["mean"] + ) + protocol_results[protocol].sort(reverse=True) + + xpos = 1 + for i in range(len(rounds_params)): + for protocol in PROTOCOLS_ORDER: + result = protocol_results[protocol][i] + label, color, hatch = PROTOCOLS_LABEL_COLOR_HATCH[protocol] + bar = plt.bar( + xpos, + result, + width=1, + color=color, + edgecolor="black", + linewidth=1.4, + label=label if i == 0 else None, + hatch=hatch, + ) + xpos += 1 + xpos += 1 + + ax = fig.axes[0] + ax.spines["top"].set_visible(False) + ax.spines["right"].set_visible(False) + + plt.tick_params(bottom=False) + + plt.xticks([2, 6, 10], [f"{10}%", f"{50}%", f"{100}%"]) + # plt.ylabel("Throughput (reqs/s)") + + plt.tight_layout() + + pdf_name = f"{odir}/exper-{EXPER_NAME}-write_ratio.pdf" + plt.savefig(pdf_name, bbox_inches=0) + plt.close() + print(f"Plotted: {pdf_name}") + + +def plot_major_ylabels(ylabels, odir): + matplotlib.rcParams.update( + { + "figure.figsize": (1.5, 2.2), + "font.size": 12, + } + ) + fig = plt.figure(f"Ylabels") + + assert len(ylabels) == 2 + + ax1 = plt.subplot(211) + plt.ylabel(ylabels[0]) + for spine in ax1.spines.values(): + spine.set_visible(False) + plt.tick_params(bottom=False, labelbottom=False, left=False, labelleft=False) + + ax2 = plt.subplot(212) + plt.ylabel(ylabels[1]) + for spine in ax2.spines.values(): + spine.set_visible(False) + plt.tick_params(bottom=False, labelbottom=False, left=False, labelleft=False) + + fig.subplots_adjust(left=0.5) + + fig.align_labels() + + pdf_name = f"{odir}/ylabels-{EXPER_NAME}.pdf" + plt.savefig(pdf_name, bbox_inches=0) + plt.close() + print(f"Plotted: {pdf_name}") + + +def plot_major_legend(handles, labels, odir): + matplotlib.rcParams.update( + { + "figure.figsize": (5.6, 0.5), + "font.size": 10, + "pdf.fonttype": 42, + } + ) + plt.figure("Legend") + + plt.axis("off") + + lgd = plt.legend( + handles, + labels, + handleheight=0.8, + handlelength=1.0, + loc="center", + bbox_to_anchor=(0.5, 0.5), + ncol=len(labels), + borderpad=0.3, + handletextpad=0.3, + columnspacing=0.9, + ) + for rec in lgd.get_texts(): + if "f=1" in rec.get_text(): + rec.set_fontstyle("italic") + # if "Crossword" in rec.get_text(): + # rec.set_fontweight("bold") + + pdf_name = f"{odir}/legend-{EXPER_NAME}.pdf" + plt.savefig(pdf_name, bbox_inches=0) + plt.close() + print(f"Plotted: {pdf_name}") + + +def plot_minor_legend(handles, labels, odir): + matplotlib.rcParams.update( + { + "figure.figsize": (4, 0.5), + "font.size": 10, + "pdf.fonttype": 42, + } + ) + plt.figure("Legend") + + plt.axis("off") + + lgd = plt.legend( + handles, + labels, + handleheight=0.8, + handlelength=1.2, + loc="center", + bbox_to_anchor=(0.5, 0.5), + ncol=len(labels), + borderpad=0.3, + handletextpad=0.3, + columnspacing=1.1, + ) + for rec in lgd.get_texts(): + if "RSPaxos" in rec.get_text(): + rec.set_fontstyle("italic") + # if "Crossword" in rec.get_text(): + # rec.set_fontweight("bold") + + pdf_name = f"{odir}/legend-{EXPER_NAME}-minor.pdf" + plt.savefig(pdf_name, bbox_inches=0) + plt.close() + print(f"Plotted: {pdf_name}") + + +if __name__ == "__main__": + utils.check_proper_cwd() + + parser = argparse.ArgumentParser(allow_abbrev=False) + parser.add_argument( + "-p", "--plot", action="store_true", help="if set, do the plotting phase" + ) + parser.add_argument( + "-o", + "--odir", + type=str, + default=f"{BASE_PATH}/{CLIENT_OUTPUT_FOLDER}/{EXPER_NAME}", + help=".out files directory", + ) + args = parser.parse_args() + + if not args.plot: + utils.check_enough_cpus() + + runlog_path = f"{BASE_PATH}/{RUNTIME_LOGS_FOLDER}/{EXPER_NAME}" + if not os.path.isdir(runlog_path): + os.system(f"mkdir -p {runlog_path}") + + utils.do_cargo_build(release=True) + + for round_params in ROUNDS_PARAMS: + print("Setting tc netem qdiscs...") + utils.clear_fs_cache() + utils.set_all_tc_qdisc_netems( + round_params.num_replicas, + SERVER_NETNS, + SERVER_DEV, + SERVER_IFB, + round_params.env_setting.delay, + round_params.env_setting.jitter, + round_params.env_setting.rate, + involve_ifb=True, + ) + + print("Running experiments...") + for protocol in PROTOCOLS: + if round_params.paxos_only and "Raft" in protocol: + continue + bench_round(protocol, round_params) + + print("Clearing tc netem qdiscs...") + utils.kill_all_local_procs() + utils.clear_all_tc_qdisc_netems( + round_params.num_replicas, SERVER_NETNS, SERVER_DEV, SERVER_IFB + ) + + state_path = f"{BASE_PATH}/{SERVER_STATES_FOLDER}/{EXPER_NAME}" + utils.remove_files_in_dir(state_path) + + else: + results = collect_outputs(args.odir) + print_results(results) + + single_rounds = [ + rp for rp in ROUNDS_PARAMS if any(map(lambda t: "single" in t, rp.tags)) + ] + plot_single_rounds_results(results, single_rounds, args.odir) + + cluster_rounds = [ + rp for rp in ROUNDS_PARAMS if any(map(lambda t: "cluster" in t, rp.tags)) + ] + cluster_rounds.sort(key=lambda rp: rp.num_replicas) + handles, labels = plot_cluster_size_results(results, cluster_rounds, args.odir) + plot_minor_legend(handles, labels, args.odir) + + ratio_rounds = [ + rp for rp in ROUNDS_PARAMS if any(map(lambda t: "ratio" in t, rp.tags)) + ] + ratio_rounds.sort(key=lambda rp: rp.put_ratio) + plot_write_ratio_results(results, ratio_rounds, args.odir) diff --git a/scripts/bench_failover.py b/scripts/bench_failover.py index b3e93ab3..dc69167e 100644 --- a/scripts/bench_failover.py +++ b/scripts/bench_failover.py @@ -246,7 +246,7 @@ def plot_results(results, odir): matplotlib.rcParams.update( { "figure.figsize": (6, 3), - "font.size": 10, + "font.size": 13, } ) fig = plt.figure("Exper") @@ -305,8 +305,8 @@ def draw_failure_indicator(x, t, toffx): annotation_clip=False, ) - draw_failure_indicator(FAIL1_SECS - PLOT_SECS_BEGIN, "Leader fails", 0) - draw_failure_indicator(FAIL2_SECS - PLOT_SECS_BEGIN, "New leader fails", 0) + draw_failure_indicator(FAIL1_SECS - PLOT_SECS_BEGIN + 2, "Leader fails", 12) + draw_failure_indicator(FAIL2_SECS - PLOT_SECS_BEGIN + 2, "New leader fails", 12) # recovery time indicators (hardcoded!) def draw_recovery_indicator(x, y, w, t, toffx, toffy): @@ -342,11 +342,11 @@ def draw_recovery_indicator(x, y, w, t, toffx, toffy): ha="center", textcoords="offset points", color="gray", - fontsize=8, + fontsize=10, ) - draw_recovery_indicator(19, 135, 3.2, "bounded", 1, 10) - draw_recovery_indicator(59.2, 135, 3.2, "bounded", 1, 10) + draw_recovery_indicator(19, 135, 3.6, "small\ngossip\ngap", 1, 11) + draw_recovery_indicator(59.2, 135, 3.6, "small\ngossip\ngap", 1, 11) plt.vlines( 63.5, @@ -357,13 +357,13 @@ def draw_recovery_indicator(x, y, w, t, toffx, toffy): linewidth=0.8, ) - draw_recovery_indicator(23, 40, 6.3, None, None, None) - draw_recovery_indicator(25.2, 54, 8.6, "unbounded", 3, 9) - draw_recovery_indicator(67.5, 54, 11, "unbounded", 3, 9) + draw_recovery_indicator(23, 50, 7, None, None, None) + draw_recovery_indicator(25.2, 62, 9.2, "state-send\nsnapshot int.", 4.6, -53) + draw_recovery_indicator(67.5, 56, 11.5, "state-send\nsnapshot int.", 2.9, -47) # configuration indicators def draw_config_indicator(x, y, c, q, color, fb=False, unavail=False): - t = f"" + t = f"[c={c},q={q}]" if fb: t += "\nfb. ok" if unavail: @@ -375,23 +375,23 @@ def draw_config_indicator(x, y, c, q, color, fb=False, unavail=False): ha="center", textcoords="offset points", color=color, - fontsize=8, + fontsize=11, ) - draw_config_indicator(5, 228, 1, 5, "steelblue") - draw_config_indicator(5, 202, 1, 4, "red") - draw_config_indicator(5, 187, 1, 4, "peru") - draw_config_indicator(5, 110, 3, 3, "forestgreen") + draw_config_indicator(4.8, 228, 1, 5, "steelblue") + draw_config_indicator(4.8, 198, 1, 4, "red") + draw_config_indicator(4.8, 175, 1, 4, "peru") + draw_config_indicator(4.8, 112, 3, 3, "forestgreen") - draw_config_indicator(45, 148, 2, 4, "steelblue") - draw_config_indicator(45, 202, 1, 4, "red") - draw_config_indicator(45, 71, 3, 3, "peru", fb=True) - draw_config_indicator(45, 110, 3, 3, "forestgreen") + draw_config_indicator(44.8, 148, 2, 4, "steelblue") + draw_config_indicator(44.8, 198, 1, 4, "red") + draw_config_indicator(44.8, 58, 3, 3, "peru", fb=True) + draw_config_indicator(44.8, 112, 3, 3, "forestgreen") - draw_config_indicator(89, 125, 3, 3, "steelblue") - draw_config_indicator(89, 9, 1, 4, "red", unavail=True) - draw_config_indicator(89, 85, 3, 3, "peru") - draw_config_indicator(89, 110, 3, 3, "forestgreen") + draw_config_indicator(89.5, 135, 3, 3, "steelblue") + draw_config_indicator(89.5, 9, 1, 4, "red", unavail=True) + draw_config_indicator(89.5, 78, 3, 3, "peru") + draw_config_indicator(89.5, 112, 3, 3, "forestgreen") ax = fig.axes[0] ax.spines["top"].set_visible(False) @@ -419,6 +419,7 @@ def plot_legend(handles, labels, odir): { "figure.figsize": (1.8, 1.3), "font.size": 10, + "pdf.fonttype": 42, } ) plt.figure("Legend") @@ -461,6 +462,8 @@ def plot_legend(handles, labels, odir): args = parser.parse_args() if not args.plot: + utils.check_enough_cpus() + runlog_path = f"{BASE_PATH}/{RUNTIME_LOGS_FOLDER}/{EXPER_NAME}" if not os.path.isdir(runlog_path): os.system(f"mkdir -p {runlog_path}") @@ -468,6 +471,7 @@ def plot_legend(handles, labels, odir): utils.do_cargo_build(release=True) print("Setting tc netem qdiscs...") + utils.clear_fs_cache() utils.set_all_tc_qdisc_netems( NUM_REPLICAS, SERVER_NETNS, @@ -488,6 +492,9 @@ def plot_legend(handles, labels, odir): NUM_REPLICAS, SERVER_NETNS, SERVER_DEV, SERVER_IFB ) + state_path = f"{BASE_PATH}/{SERVER_STATES_FOLDER}/{EXPER_NAME}" + utils.remove_files_in_dir(state_path) + else: results = collect_outputs(args.odir) print_results(results) diff --git a/models/bench_rs_coding.py b/scripts/bench_rs_coding.py similarity index 96% rename from models/bench_rs_coding.py rename to scripts/bench_rs_coding.py index 96f3568c..cb64422a 100644 --- a/models/bench_rs_coding.py +++ b/scripts/bench_rs_coding.py @@ -73,8 +73,9 @@ def print_bench_results(results): def plot_bench_results(results, output_dir): matplotlib.rcParams.update( { - "figure.figsize": (3.6, 1.6), + "figure.figsize": (4, 1.5), "font.size": 10, + "pdf.fonttype": 42, } ) fig = plt.figure("Bench") @@ -102,10 +103,11 @@ def plot_bench_results(results, output_dir): colors = cmap(np.linspace(1.0 - (vmax - vmin) / float(vmax), 0.6, cmap.N)) new_cmap = matplotlib.colors.LinearSegmentedColormap.from_list("Reds", colors) - plt.imshow(data, cmap=new_cmap, aspect="equal", norm="log") + plt.imshow(data, cmap=new_cmap, aspect=0.6, norm="log") plt.colorbar( aspect=12, - shrink=0.85, + shrink=0.7, + anchor=(0.0, 0.25), ticks=[vmin, 1, 10], format="{x:.0f}ms", ) diff --git a/scripts/bench_unbalanced.py b/scripts/bench_unbalanced.py index 73bc029a..fa6f9c72 100644 --- a/scripts/bench_unbalanced.py +++ b/scripts/bench_unbalanced.py @@ -212,11 +212,15 @@ def collect_outputs(odir): ) sd, sp, sj, sm = 20, 0, 0, 1 - tput_list = utils.list_smoothing(result["tput_sum"], sd, sp, sj, sm) + tput_mean_list = utils.list_smoothing(result["tput_sum"], sd, sp, sj, sm) + tput_stdev_list = result["tput_stdev"] results[f"{protocol}{midfix_str}"] = { - "mean": sum(tput_list) / len(tput_list), - "stdev": statistics.stdev(tput_list), + "mean": sum(tput_mean_list) / len(tput_mean_list), + "stdev": ( + sum(map(lambda s: s**2, tput_stdev_list)) / len(tput_stdev_list) + ) + ** 0.5, } return results @@ -231,8 +235,9 @@ def print_results(results): def plot_results(results, odir): matplotlib.rcParams.update( { - "figure.figsize": (4, 3), - "font.size": 10, + "figure.figsize": (3.6, 2.5), + "font.size": 12, + "pdf.fonttype": 42, } ) fig = plt.figure("Exper") @@ -254,8 +259,8 @@ def plot_results(results, odir): "CRaft.2.b": 4, "Crossword.2.b": 5, "Crossword.2.u": 6, - "RSPaxos.1.b": 8.2, - "CRaft.1.b": 9.2, + "RSPaxos.1.b": 8, + "CRaft.1.b": 9, } PROTOCOLS_LABEL_COLOR_HATCH = { "MultiPaxos.2.b": ("MultiPaxos", "darkgray", None), @@ -291,7 +296,7 @@ def plot_results(results, odir): ax.spines["top"].set_visible(False) ax.spines["right"].set_visible(False) - plt.xticks([3.5, 8.7], ["f=2", "f=1"]) + plt.xticks([3.5, 8.3], ["f=2", "f=1"]) plt.tick_params(bottom=False) plt.ylabel("Throughput (reqs/s)") @@ -322,7 +327,8 @@ def plot_legend(handles, labels, odir): lgd = plt.legend( handles, labels, - handleheight=1.2, + handleheight=0.9, + handlelength=1.3, loc="center", bbox_to_anchor=(0.5, 0.5), ) @@ -355,6 +361,8 @@ def plot_legend(handles, labels, odir): args = parser.parse_args() if not args.plot: + utils.check_enough_cpus() + runlog_path = f"{BASE_PATH}/{RUNTIME_LOGS_FOLDER}/{EXPER_NAME}" if not os.path.isdir(runlog_path): os.system(f"mkdir -p {runlog_path}") @@ -362,6 +370,7 @@ def plot_legend(handles, labels, odir): utils.do_cargo_build(release=True) print("Setting tc netem qdiscs...") + utils.clear_fs_cache() utils.set_all_tc_qdisc_netems( NUM_REPLICAS, SERVER_NETNS, @@ -383,6 +392,9 @@ def plot_legend(handles, labels, odir): NUM_REPLICAS, SERVER_NETNS, SERVER_DEV, SERVER_IFB ) + state_path = f"{BASE_PATH}/{SERVER_STATES_FOLDER}/{EXPER_NAME}" + utils.remove_files_in_dir(state_path) + else: results = collect_outputs(args.odir) print_results(results) diff --git a/scripts/common_utils.py b/scripts/common_utils.py index 3915b8ac..1a244014 100644 --- a/scripts/common_utils.py +++ b/scripts/common_utils.py @@ -3,6 +3,7 @@ import subprocess import statistics import random +import multiprocessing def parse_comma_separated(l): @@ -35,6 +36,25 @@ def check_proper_cwd(): sys.exit(1) +def remove_files_in_dir(path): + if not os.path.isdir(path): + raise RuntimeError(f"{path} is not a directory") + for name in os.listdir(path): + child = os.path.join(path, name) + if not os.path.isfile(child): + raise RuntimeError(f"{child} is not a regular file") + os.unlink(child) + + +def check_enough_cpus(): + EXPECTED_CPUS = 40 + cpus = multiprocessing.cpu_count() + if cpus < EXPECTED_CPUS: + print( + f"WARN: benchmarking scripts expect >= {EXPECTED_CPUS} CPUs, found {cpus}" + ) + + def do_cargo_build(release): print("Building everything...") cmd = ["cargo", "build", "--workspace"] @@ -53,9 +73,13 @@ def kill_all_matching(name): def kill_all_local_procs(): # print("Killing all local procs...") - cmd = ["sudo", "./scripts/kill_local_procs.sh"] - proc = subprocess.Popen(cmd) - proc.wait() + cmd = "sudo ./scripts/kill_local_procs.sh" + os.system(cmd) + + +def clear_fs_cache(): + cmd = 'sudo bash -c "echo 3 > /proc/sys/vm/drop_caches"' + os.system(cmd) def run_process( diff --git a/scripts/local_clients.py b/scripts/local_clients.py index 67428de6..2ab4b5b9 100644 --- a/scripts/local_clients.py +++ b/scripts/local_clients.py @@ -3,6 +3,7 @@ import argparse import subprocess import multiprocessing +import math sys.path.append(os.path.dirname(os.path.realpath(__file__))) import common_utils as utils @@ -30,6 +31,7 @@ "length_s", "normal_stdev_ratio", "unif_interval_ms", + "unif_upper_bound", ], "tester": ["test_name", "keep_going", "logger_on"], "mess": ["pause", "resume"], @@ -38,12 +40,24 @@ def run_process_pinned(i, cmd, capture_stdout=False, cores_per_proc=0): cpu_list = None - if cores_per_proc > 0: - # pin client cores from last CPU down + if cores_per_proc != 0: + # parse cores_per_proc setting + if cores_per_proc != int(cores_per_proc) and ( + cores_per_proc > 1 or cores_per_proc < -1 + ): + raise ValueError(f"invalid cores_per_proc {cores_per_proc}") num_cpus = multiprocessing.cpu_count() - core_end = num_cpus - 1 - i * cores_per_proc - core_start = core_end - cores_per_proc + 1 - assert core_start >= 0 + if cores_per_proc < 0: + # negative means starting from CPU 0 (instead from last) + cores_per_proc *= -1 + core_start = math.floor(i * cores_per_proc) + core_end = math.ceil(core_start + cores_per_proc - 1) + assert core_end < num_cpus + else: + # else pin client cores from last CPU down + core_end = math.ceil(num_cpus - 1 - i * cores_per_proc) + core_start = math.floor(core_end - cores_per_proc + 1) + assert core_start >= 0 cpu_list = f"{core_start}-{core_end}" return utils.run_process(cmd, capture_stdout=capture_stdout, cpu_list=cpu_list) @@ -150,7 +164,7 @@ def run_clients( "-c", "--config", type=str, help="protocol-specific TOML config string" ) parser.add_argument( - "--pin_cores", type=int, default=0, help="if > 0, set CPU cores affinity" + "--pin_cores", type=float, default=0, help="if not 0, set CPU cores affinity" ) parser.add_argument( "--use_veth", action="store_true", help="if set, use netns and veth setting" @@ -195,6 +209,9 @@ def run_clients( parser_bench.add_argument( "--unif_interval_ms", type=int, help="uniform dist usage interval" ) + parser_bench.add_argument( + "--unif_upper_bound", type=int, help="uniform dist upper bound" + ) parser_bench.add_argument( "--file_prefix", type=str, diff --git a/src/protocols/craft/mod.rs b/src/protocols/craft/mod.rs index a63e73b5..61ecc160 100644 --- a/src/protocols/craft/mod.rs +++ b/src/protocols/craft/mod.rs @@ -80,6 +80,11 @@ pub struct ReplicaConfigCRaft { pub perf_storage_b: u64, pub perf_network_a: u64, pub perf_network_b: u64, + + /// Simulate local read lease implementation? + // TODO: actual read lease impl later? (won't affect anything about + // evalutaion results though) + pub sim_read_lease: bool, } #[allow(clippy::derivable_impls)] @@ -102,6 +107,7 @@ impl Default for ReplicaConfigCRaft { perf_storage_b: 0, perf_network_a: 0, perf_network_b: 0, + sim_read_lease: false, } } } @@ -417,7 +423,8 @@ impl GenericReplica for CRaftReplica { snapshot_path, snapshot_interval_s, fault_tolerance, msg_chunk_size, perf_storage_a, perf_storage_b, - perf_network_a, perf_network_b)?; + perf_network_a, perf_network_b, + sim_read_lease)?; if config.batch_interval_ms == 0 { return logged_err!( id; diff --git a/src/protocols/craft/request.rs b/src/protocols/craft/request.rs index b91cd8f8..59ccd928 100644 --- a/src/protocols/craft/request.rs +++ b/src/protocols/craft/request.rs @@ -3,14 +3,14 @@ use super::*; use crate::utils::{SummersetError, Bitmap, RSCodeword}; -use crate::server::{ApiRequest, ApiReply, LogAction}; +use crate::server::{ApiRequest, ApiReply, LogAction, Command, CommandResult}; // CRaftReplica client requests entrance impl CRaftReplica { /// Handler of client request batch chan recv. pub fn handle_req_batch( &mut self, - req_batch: ReqBatch, + mut req_batch: ReqBatch, ) -> Result<(), SummersetError> { let batch_size = req_batch.len(); debug_assert!(batch_size > 0); @@ -42,6 +42,42 @@ impl CRaftReplica { return Ok(()); } + // if simulating read leases, extract all the reads and immediately + // reply to them with a dummy value + // TODO: only for benchmarking purposes + if self.config.sim_read_lease { + for (client, req) in &req_batch { + if let ApiRequest::Req { + id: req_id, + cmd: Command::Get { .. }, + } = req + { + self.external_api.send_reply( + ApiReply::Reply { + id: *req_id, + result: Some(CommandResult::Get { value: None }), + redirect: None, + }, + *client, + )?; + pf_trace!(self.id; "replied -> client {} for read-only cmd", client); + } + } + + req_batch.retain(|(_, req)| { + !matches!( + req, + ApiRequest::Req { + cmd: Command::Get { .. }, + .. + } + ) + }); + if req_batch.is_empty() { + return Ok(()); + } + } + // compute the complete Reed-Solomon codeword for the batch data let mut reqs_cw = RSCodeword::from_data( req_batch, diff --git a/src/protocols/crossword/leadership.rs b/src/protocols/crossword/leadership.rs index fd447035..b95937dd 100644 --- a/src/protocols/crossword/leadership.rs +++ b/src/protocols/crossword/leadership.rs @@ -88,6 +88,9 @@ impl CrosswordReplica { .map(|(s, i)| (self.start_slot + s, i)) .skip(self.exec_bar - self.start_slot) { + if inst.status == Status::Null { + continue; + } if inst.status < Status::Executed { inst.external = true; // so replies to clients can be triggered } @@ -136,6 +139,7 @@ impl CrosswordReplica { PeerMsg::Heartbeat { id: self.next_hb_id, ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -176,6 +180,7 @@ impl CrosswordReplica { PeerMsg::Heartbeat { id: self.next_hb_id, ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -215,6 +220,7 @@ impl CrosswordReplica { PeerMsg::Heartbeat { id: self.next_hb_id, ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -259,6 +265,7 @@ impl CrosswordReplica { self.id, self.next_hb_id, self.bal_max_seen, + self.commit_bar, self.exec_bar, self.snap_bar, )?; @@ -300,6 +307,7 @@ impl CrosswordReplica { peer: ReplicaId, hb_id: HeartbeatId, ballot: Ballot, + commit_bar: usize, exec_bar: usize, snap_bar: usize, ) -> Result<(), SummersetError> { @@ -328,6 +336,7 @@ impl CrosswordReplica { PeerMsg::Heartbeat { id: hb_id, ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -345,6 +354,46 @@ impl CrosswordReplica { return Ok(()); } + // all slots up to received commit_bar are safe to commit; submit their + // commands for execution + if commit_bar > self.commit_bar { + while self.start_slot + self.insts.len() < commit_bar { + self.insts.push(self.null_instance()?); + } + + let mut commit_cnt = 0; + for slot in self.commit_bar..commit_bar { + let inst = &mut self.insts[slot - self.start_slot]; + if inst.status < Status::Accepting { + break; + } else if inst.status >= Status::Committed { + continue; + } + + // mark this instance as committed + inst.status = Status::Committed; + pf_debug!(self.id; "committed instance at slot {} bal {}", + slot, inst.bal); + + // record commit event + self.storage_hub.submit_action( + Self::make_log_action_id(slot, Status::Committed), + LogAction::Append { + entry: WalEntry::CommitSlot { slot }, + sync: self.config.logger_sync, + }, + )?; + pf_trace!(self.id; "submitted CommitSlot log action for slot {} bal {}", + slot, inst.bal); + + commit_cnt += 1; + } + + if commit_cnt > 0 { + pf_trace!(self.id; "heartbeat commit <- {} < slot {}", peer, commit_bar); + } + } + if peer != self.id { // update peer_exec_bar if larger then known; if all servers' // exec_bar (including myself) have passed a slot, that slot @@ -500,6 +549,7 @@ impl CrosswordReplica { PeerMsg::Heartbeat { id: self.next_hb_id, ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, diff --git a/src/protocols/crossword/messages.rs b/src/protocols/crossword/messages.rs index fbc6f675..86978f13 100644 --- a/src/protocols/crossword/messages.rs +++ b/src/protocols/crossword/messages.rs @@ -392,61 +392,12 @@ impl CrosswordReplica { )?; pf_trace!(self.id; "submitted CommitSlot log action for slot {} bal {}", slot, inst.bal); - - // send Commit messages to all peers - self.transport_hub - .bcast_msg(PeerMsg::Commit { slot }, None)?; - pf_trace!(self.id; "broadcast Commit messages for slot {} bal {}", - slot, ballot); } } Ok(()) } - /// Handler of Commit message from leader. - fn handle_msg_commit( - &mut self, - peer: ReplicaId, - slot: usize, - ) -> Result<(), SummersetError> { - if slot < self.start_slot { - return Ok(()); // ignore if slot index outdated - } - pf_trace!(self.id; "received Commit <- {} for slot {}", peer, slot); - - self.kickoff_hb_hear_timer()?; - - // locate instance in memory, filling in null instances if needed - while self.start_slot + self.insts.len() <= slot { - self.insts.push(self.null_instance()?); - } - let inst = &mut self.insts[slot - self.start_slot]; - - // ignore spurious duplications - if inst.status != Status::Accepting { - return Ok(()); - } - - // mark this instance as committed - inst.status = Status::Committed; - pf_debug!(self.id; "committed instance at slot {} bal {}", - slot, inst.bal); - - // record commit event - self.storage_hub.submit_action( - Self::make_log_action_id(slot, Status::Committed), - LogAction::Append { - entry: WalEntry::CommitSlot { slot }, - sync: self.config.logger_sync, - }, - )?; - pf_trace!(self.id; "submitted CommitSlot log action for slot {} bal {}", - slot, inst.bal); - - Ok(()) - } - /// Handler of Reconstruct message from leader or gossiping peer. fn handle_msg_reconstruct( &mut self, @@ -603,7 +554,6 @@ impl CrosswordReplica { } => { self.handle_msg_accept_reply(peer, slot, ballot, size, reply_ts) } - PeerMsg::Commit { slot } => self.handle_msg_commit(peer, slot), PeerMsg::Reconstruct { slots_excl } => { self.handle_msg_reconstruct(peer, slots_excl) } @@ -613,9 +563,12 @@ impl CrosswordReplica { PeerMsg::Heartbeat { id: hb_id, ballot, + commit_bar, exec_bar, snap_bar, - } => self.heard_heartbeat(peer, hb_id, ballot, exec_bar, snap_bar), + } => self.heard_heartbeat( + peer, hb_id, ballot, commit_bar, exec_bar, snap_bar, + ), } } } diff --git a/src/protocols/crossword/mod.rs b/src/protocols/crossword/mod.rs index f2dddec0..a64b30dc 100644 --- a/src/protocols/crossword/mod.rs +++ b/src/protocols/crossword/mod.rs @@ -126,6 +126,11 @@ pub struct ReplicaConfigCrossword { /// Recording performance breakdown statistics? pub record_breakdown: bool, + + /// Simulate local read lease implementation? + // TODO: actual read lease impl later? (won't affect anything about + // evalutaion results though) + pub sim_read_lease: bool, } #[allow(clippy::derivable_impls)] @@ -162,6 +167,7 @@ impl Default for ReplicaConfigCrossword { perf_network_a: 0, perf_network_b: 0, record_breakdown: false, + sim_read_lease: false, } } } @@ -314,9 +320,6 @@ pub enum PeerMsg { reply_ts: Option, }, - /// Commit notification from leader to replicas. - Commit { slot: usize }, - /// Reconstruction read from new leader to replicas. Reconstruct { /// List of slots and correspondingly the shards to exclude. @@ -333,6 +336,9 @@ pub enum PeerMsg { Heartbeat { id: HeartbeatId, ballot: Ballot, + /// For notifying followers about safe-to-commit slots (in a bit + /// conservative way). + commit_bar: usize, /// For leader step-up as well as conservative snapshotting purpose. exec_bar: usize, /// For conservative snapshotting purpose. @@ -639,7 +645,7 @@ impl GenericReplica for CrosswordReplica { b_to_d_threshold, perf_storage_a, perf_storage_b, perf_network_a, perf_network_b, - record_breakdown)?; + record_breakdown, sim_read_lease)?; if config.batch_interval_ms == 0 { return logged_err!( id; diff --git a/src/protocols/crossword/request.rs b/src/protocols/crossword/request.rs index 7436ab34..f946a652 100644 --- a/src/protocols/crossword/request.rs +++ b/src/protocols/crossword/request.rs @@ -5,14 +5,14 @@ use std::collections::HashMap; use super::*; use crate::utils::{SummersetError, Bitmap, RSCodeword}; -use crate::server::{ApiRequest, ApiReply, LogAction}; +use crate::server::{ApiRequest, ApiReply, LogAction, Command, CommandResult}; // CrosswordReplica client requests entrance impl CrosswordReplica { /// Handler of client request batch chan recv. pub fn handle_req_batch( &mut self, - req_batch: ReqBatch, + mut req_batch: ReqBatch, ) -> Result<(), SummersetError> { let batch_size = req_batch.len(); debug_assert!(batch_size > 0); @@ -44,6 +44,42 @@ impl CrosswordReplica { return Ok(()); } + // if simulating read leases, extract all the reads and immediately + // reply to them with a dummy value + // TODO: only for benchmarking purposes + if self.config.sim_read_lease { + for (client, req) in &req_batch { + if let ApiRequest::Req { + id: req_id, + cmd: Command::Get { .. }, + } = req + { + self.external_api.send_reply( + ApiReply::Reply { + id: *req_id, + result: Some(CommandResult::Get { value: None }), + redirect: None, + }, + *client, + )?; + pf_trace!(self.id; "replied -> client {} for read-only cmd", client); + } + } + + req_batch.retain(|(_, req)| { + !matches!( + req, + ApiRequest::Req { + cmd: Command::Get { .. }, + .. + } + ) + }); + if req_batch.is_empty() { + return Ok(()); + } + } + // [for perf breakdown] let slot = self.first_null_slot()?; if self.bal_prepared > 0 { diff --git a/src/protocols/multipaxos/durability.rs b/src/protocols/multipaxos/durability.rs index 6d2506dc..d3b65ec2 100644 --- a/src/protocols/multipaxos/durability.rs +++ b/src/protocols/multipaxos/durability.rs @@ -136,21 +136,6 @@ impl MultiPaxosReplica { } } - // if there are hole(s) between current commit_bar and newly committed - // slot, ask the leader to re-send Accept messages for those slots - if slot > self.commit_bar && !self.is_leader() { - if let Some(leader) = self.leader { - let holes: Vec = (self.commit_bar..slot).collect(); - self.transport_hub.send_msg( - PeerMsg::FillHoles { - slots: holes.clone(), - }, - leader, - )?; - pf_trace!(self.id; "sent FillHoles -> {} slots {:?}", leader, holes); - } - } - Ok(()) } diff --git a/src/protocols/multipaxos/leadership.rs b/src/protocols/multipaxos/leadership.rs index ea680567..1c6e530b 100644 --- a/src/protocols/multipaxos/leadership.rs +++ b/src/protocols/multipaxos/leadership.rs @@ -76,7 +76,7 @@ impl MultiPaxosReplica { .map(|(s, i)| (self.start_slot + s, i)) .skip(self.exec_bar - self.start_slot) { - if inst.status < Status::Executed { + if inst.status > Status::Null && inst.status < Status::Executed { inst.external = true; // so replies to clients can be triggered if inst.status == Status::Committed { continue; @@ -123,6 +123,7 @@ impl MultiPaxosReplica { self.transport_hub.bcast_msg( PeerMsg::Heartbeat { ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -141,6 +142,7 @@ impl MultiPaxosReplica { self.transport_hub.bcast_msg( PeerMsg::Heartbeat { ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -177,6 +179,7 @@ impl MultiPaxosReplica { self.heard_heartbeat( self.id, self.bal_max_seen, + self.commit_bar, self.exec_bar, self.snap_bar, )?; @@ -210,6 +213,7 @@ impl MultiPaxosReplica { &mut self, peer: ReplicaId, ballot: Ballot, + commit_bar: usize, exec_bar: usize, snap_bar: usize, ) -> Result<(), SummersetError> { @@ -229,6 +233,7 @@ impl MultiPaxosReplica { self.transport_hub.send_msg( PeerMsg::Heartbeat { ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -246,6 +251,46 @@ impl MultiPaxosReplica { return Ok(()); } + // all slots up to received commit_bar are safe to commit; submit their + // commands for execution + if commit_bar > self.commit_bar { + while self.start_slot + self.insts.len() < commit_bar { + self.insts.push(self.null_instance()); + } + + let mut commit_cnt = 0; + for slot in self.commit_bar..commit_bar { + let inst = &mut self.insts[slot - self.start_slot]; + if inst.status < Status::Accepting { + break; + } else if inst.status >= Status::Committed { + continue; + } + + // mark this instance as committed + inst.status = Status::Committed; + pf_debug!(self.id; "committed instance at slot {} bal {}", + slot, inst.bal); + + // record commit event + self.storage_hub.submit_action( + Self::make_log_action_id(slot, Status::Committed), + LogAction::Append { + entry: WalEntry::CommitSlot { slot }, + sync: self.config.logger_sync, + }, + )?; + pf_trace!(self.id; "submitted CommitSlot log action for slot {} bal {}", + slot, inst.bal); + + commit_cnt += 1; + } + + if commit_cnt > 0 { + pf_trace!(self.id; "heartbeat commit <- {} < slot {}", peer, commit_bar); + } + } + if peer != self.id { // update peer_exec_bar if larger then known; if all servers' // exec_bar (including myself) have passed a slot, that slot diff --git a/src/protocols/multipaxos/messages.rs b/src/protocols/multipaxos/messages.rs index 828a637a..5b8395ba 100644 --- a/src/protocols/multipaxos/messages.rs +++ b/src/protocols/multipaxos/messages.rs @@ -248,107 +248,6 @@ impl MultiPaxosReplica { )?; pf_trace!(self.id; "submitted CommitSlot log action for slot {} bal {}", slot, inst.bal); - - // send Commit messages to all peers - self.transport_hub - .bcast_msg(PeerMsg::Commit { slot }, None)?; - pf_trace!(self.id; "broadcast Commit messages for slot {} bal {}", - slot, ballot); - } - } - - Ok(()) - } - - /// Handler of Commit message from leader. - fn handle_msg_commit( - &mut self, - peer: ReplicaId, - slot: usize, - ) -> Result<(), SummersetError> { - if slot < self.start_slot { - return Ok(()); // ignore if slot index outdated - } - pf_trace!(self.id; "received Commit <- {} for slot {}", peer, slot); - - self.kickoff_hb_hear_timer()?; - - // locate instance in memory, filling in null instances if needed - while self.start_slot + self.insts.len() <= slot { - self.insts.push(self.null_instance()); - } - let inst = &mut self.insts[slot - self.start_slot]; - - // ignore spurious duplications - if inst.status != Status::Accepting { - return Ok(()); - } - - // mark this instance as committed - inst.status = Status::Committed; - pf_debug!(self.id; "committed instance at slot {} bal {}", - slot, inst.bal); - - // record commit event - self.storage_hub.submit_action( - Self::make_log_action_id(slot, Status::Committed), - LogAction::Append { - entry: WalEntry::CommitSlot { slot }, - sync: self.config.logger_sync, - }, - )?; - pf_trace!(self.id; "submitted CommitSlot log action for slot {} bal {}", - slot, inst.bal); - - Ok(()) - } - - /// Handler of FillHoles message from a lagging peer. - fn handle_msg_fill_holes( - &mut self, - peer: ReplicaId, - slots: Vec, - ) -> Result<(), SummersetError> { - if !self.is_leader() { - return Ok(()); - } - pf_trace!(self.id; "received FillHoles <- {} for slots {:?}", peer, slots); - - let mut chunk_cnt = 0; - for slot in slots { - if slot < self.start_slot { - continue; - } else if slot >= self.start_slot + self.insts.len() { - break; - } - let inst = &self.insts[slot - self.start_slot]; - - if inst.status >= Status::Committed { - // re-send Accept message for this slot - self.transport_hub.send_msg( - PeerMsg::Accept { - slot, - ballot: self.bal_prepared, - reqs: inst.reqs.clone(), - }, - peer, - )?; - pf_trace!(self.id; "sent Accept -> {} for slot {} bal {}", - peer, slot, self.bal_prepared); - chunk_cnt += 1; - - // inject heartbeats in the middle to keep peers happy - if chunk_cnt >= self.config.msg_chunk_size { - self.transport_hub.bcast_msg( - PeerMsg::Heartbeat { - ballot: self.bal_max_seen, - exec_bar: self.exec_bar, - snap_bar: self.snap_bar, - }, - None, - )?; - chunk_cnt = 0; - } } } @@ -378,15 +277,13 @@ impl MultiPaxosReplica { ballot, reply_ts, } => self.handle_msg_accept_reply(peer, slot, ballot, reply_ts), - PeerMsg::Commit { slot } => self.handle_msg_commit(peer, slot), - PeerMsg::FillHoles { slots } => { - self.handle_msg_fill_holes(peer, slots) - } PeerMsg::Heartbeat { ballot, + commit_bar, exec_bar, snap_bar, - } => self.heard_heartbeat(peer, ballot, exec_bar, snap_bar), + } => self + .heard_heartbeat(peer, ballot, commit_bar, exec_bar, snap_bar), } } } diff --git a/src/protocols/multipaxos/mod.rs b/src/protocols/multipaxos/mod.rs index 6a6ed4fc..d3d9049d 100644 --- a/src/protocols/multipaxos/mod.rs +++ b/src/protocols/multipaxos/mod.rs @@ -84,6 +84,11 @@ pub struct ReplicaConfigMultiPaxos { /// Recording performance breakdown statistics? pub record_breakdown: bool, + + /// Simulate local read lease implementation? + // TODO: actual read lease impl later? (won't affect anything about + // evalutaion results though) + pub sim_read_lease: bool, } #[allow(clippy::derivable_impls)] @@ -106,6 +111,7 @@ impl Default for ReplicaConfigMultiPaxos { perf_network_a: 0, perf_network_b: 0, record_breakdown: false, + sim_read_lease: false, } } } @@ -247,16 +253,12 @@ pub enum PeerMsg { reply_ts: Option, }, - /// Commit notification from leader to replicas. - Commit { slot: usize }, - - /// Request by a lagging replica to leader asking to re-send Accepts for - /// missing holes - FillHoles { slots: Vec }, - /// Leader activity heartbeat. Heartbeat { ballot: Ballot, + /// For notifying followers about safe-to-commit slots (in a bit + /// conservative way). + commit_bar: usize, /// For leader step-up as well as conservative snapshotting purpose. exec_bar: usize, /// For conservative snapshotting purpose. @@ -482,7 +484,7 @@ impl GenericReplica for MultiPaxosReplica { msg_chunk_size, perf_storage_a, perf_storage_b, perf_network_a, perf_network_b, - record_breakdown)?; + record_breakdown, sim_read_lease)?; if config.batch_interval_ms == 0 { return logged_err!( id; diff --git a/src/protocols/multipaxos/request.rs b/src/protocols/multipaxos/request.rs index 0f08be44..25904c6d 100644 --- a/src/protocols/multipaxos/request.rs +++ b/src/protocols/multipaxos/request.rs @@ -3,14 +3,14 @@ use super::*; use crate::utils::{SummersetError, Bitmap}; -use crate::server::{ApiRequest, ApiReply, LogAction}; +use crate::server::{ApiRequest, ApiReply, LogAction, Command, CommandResult}; // MultiPaxosReplica client requests entrance impl MultiPaxosReplica { /// Handler of client request batch chan recv. pub fn handle_req_batch( &mut self, - req_batch: ReqBatch, + mut req_batch: ReqBatch, ) -> Result<(), SummersetError> { let batch_size = req_batch.len(); debug_assert!(batch_size > 0); @@ -42,6 +42,42 @@ impl MultiPaxosReplica { return Ok(()); } + // if simulating read leases, extract all the reads and immediately + // reply to them with a dummy value + // TODO: only for benchmarking purposes + if self.config.sim_read_lease { + for (client, req) in &req_batch { + if let ApiRequest::Req { + id: req_id, + cmd: Command::Get { .. }, + } = req + { + self.external_api.send_reply( + ApiReply::Reply { + id: *req_id, + result: Some(CommandResult::Get { value: None }), + redirect: None, + }, + *client, + )?; + pf_trace!(self.id; "replied -> client {} for read-only cmd", client); + } + } + + req_batch.retain(|(_, req)| { + !matches!( + req, + ApiRequest::Req { + cmd: Command::Get { .. }, + .. + } + ) + }); + if req_batch.is_empty() { + return Ok(()); + } + } + // create a new instance in the first null slot (or append a new one // at the end if no holes exist); fill it up with incoming data let slot = self.first_null_slot(); diff --git a/src/protocols/raft/mod.rs b/src/protocols/raft/mod.rs index bec7789c..16c9b914 100644 --- a/src/protocols/raft/mod.rs +++ b/src/protocols/raft/mod.rs @@ -77,6 +77,11 @@ pub struct ReplicaConfigRaft { pub perf_storage_b: u64, pub perf_network_a: u64, pub perf_network_b: u64, + + /// Simulate local read lease implementation? + // TODO: actual read lease impl later? (won't affect anything about + // evalutaion results though) + pub sim_read_lease: bool, } #[allow(clippy::derivable_impls)] @@ -98,6 +103,7 @@ impl Default for ReplicaConfigRaft { perf_storage_b: 0, perf_network_a: 0, perf_network_b: 0, + sim_read_lease: false, } } } @@ -391,7 +397,8 @@ impl GenericReplica for RaftReplica { snapshot_path, snapshot_interval_s, msg_chunk_size, perf_storage_a, perf_storage_b, - perf_network_a, perf_network_b)?; + perf_network_a, perf_network_b, + sim_read_lease)?; if config.batch_interval_ms == 0 { return logged_err!( id; diff --git a/src/protocols/raft/request.rs b/src/protocols/raft/request.rs index 19f3795f..04f2a536 100644 --- a/src/protocols/raft/request.rs +++ b/src/protocols/raft/request.rs @@ -3,14 +3,14 @@ use super::*; use crate::utils::SummersetError; -use crate::server::{ApiRequest, ApiReply, LogAction}; +use crate::server::{ApiRequest, ApiReply, LogAction, Command, CommandResult}; // RaftReplica client requests entrance impl RaftReplica { /// Handler of client request batch chan recv. pub fn handle_req_batch( &mut self, - req_batch: ReqBatch, + mut req_batch: ReqBatch, ) -> Result<(), SummersetError> { let batch_size = req_batch.len(); debug_assert!(batch_size > 0); @@ -42,6 +42,42 @@ impl RaftReplica { return Ok(()); } + // if simulating read leases, extract all the reads and immediately + // reply to them with a dummy value + // TODO: only for benchmarking purposes + if self.config.sim_read_lease { + for (client, req) in &req_batch { + if let ApiRequest::Req { + id: req_id, + cmd: Command::Get { .. }, + } = req + { + self.external_api.send_reply( + ApiReply::Reply { + id: *req_id, + result: Some(CommandResult::Get { value: None }), + redirect: None, + }, + *client, + )?; + pf_trace!(self.id; "replied -> client {} for read-only cmd", client); + } + } + + req_batch.retain(|(_, req)| { + !matches!( + req, + ApiRequest::Req { + cmd: Command::Get { .. }, + .. + } + ) + }); + if req_batch.is_empty() { + return Ok(()); + } + } + // append an entry to in-memory log let entry = LogEntry { term: self.curr_term, diff --git a/src/protocols/rspaxos/leadership.rs b/src/protocols/rspaxos/leadership.rs index e20b4ee8..ea5399c4 100644 --- a/src/protocols/rspaxos/leadership.rs +++ b/src/protocols/rspaxos/leadership.rs @@ -76,6 +76,9 @@ impl RSPaxosReplica { .map(|(s, i)| (self.start_slot + s, i)) .skip(self.exec_bar - self.start_slot) { + if inst.status == Status::Null { + continue; + } if inst.status < Status::Executed { inst.external = true; // so replies to clients can be triggered } @@ -123,6 +126,7 @@ impl RSPaxosReplica { self.transport_hub.bcast_msg( PeerMsg::Heartbeat { ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -153,6 +157,7 @@ impl RSPaxosReplica { self.transport_hub.bcast_msg( PeerMsg::Heartbeat { ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -167,6 +172,7 @@ impl RSPaxosReplica { self.transport_hub.bcast_msg( PeerMsg::Heartbeat { ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -203,6 +209,7 @@ impl RSPaxosReplica { self.heard_heartbeat( self.id, self.bal_max_seen, + self.commit_bar, self.exec_bar, self.snap_bar, )?; @@ -236,6 +243,7 @@ impl RSPaxosReplica { &mut self, peer: ReplicaId, ballot: Ballot, + commit_bar: usize, exec_bar: usize, snap_bar: usize, ) -> Result<(), SummersetError> { @@ -255,6 +263,7 @@ impl RSPaxosReplica { self.transport_hub.send_msg( PeerMsg::Heartbeat { ballot: self.bal_max_seen, + commit_bar: self.commit_bar, exec_bar: self.exec_bar, snap_bar: self.snap_bar, }, @@ -272,6 +281,46 @@ impl RSPaxosReplica { return Ok(()); } + // all slots up to received commit_bar are safe to commit; submit their + // commands for execution + if commit_bar > self.commit_bar { + while self.start_slot + self.insts.len() < commit_bar { + self.insts.push(self.null_instance()?); + } + + let mut commit_cnt = 0; + for slot in self.commit_bar..commit_bar { + let inst = &mut self.insts[slot - self.start_slot]; + if inst.status < Status::Accepting { + break; + } else if inst.status >= Status::Committed { + continue; + } + + // mark this instance as committed + inst.status = Status::Committed; + pf_debug!(self.id; "committed instance at slot {} bal {}", + slot, inst.bal); + + // record commit event + self.storage_hub.submit_action( + Self::make_log_action_id(slot, Status::Committed), + LogAction::Append { + entry: WalEntry::CommitSlot { slot }, + sync: self.config.logger_sync, + }, + )?; + pf_trace!(self.id; "submitted CommitSlot log action for slot {} bal {}", + slot, inst.bal); + + commit_cnt += 1; + } + + if commit_cnt > 0 { + pf_trace!(self.id; "heartbeat commit <- {} < slot {}", peer, commit_bar); + } + } + if peer != self.id { // update peer_exec_bar if larger then known; if all servers' // exec_bar (including myself) have passed a slot, that slot diff --git a/src/protocols/rspaxos/messages.rs b/src/protocols/rspaxos/messages.rs index 3d1332e6..ca459385 100644 --- a/src/protocols/rspaxos/messages.rs +++ b/src/protocols/rspaxos/messages.rs @@ -289,61 +289,12 @@ impl RSPaxosReplica { )?; pf_trace!(self.id; "submitted CommitSlot log action for slot {} bal {}", slot, inst.bal); - - // send Commit messages to all peers - self.transport_hub - .bcast_msg(PeerMsg::Commit { slot }, None)?; - pf_trace!(self.id; "broadcast Commit messages for slot {} bal {}", - slot, ballot); } } Ok(()) } - /// Handler of Commit message from leader. - fn handle_msg_commit( - &mut self, - peer: ReplicaId, - slot: usize, - ) -> Result<(), SummersetError> { - if slot < self.start_slot { - return Ok(()); // ignore if slot index outdated - } - pf_trace!(self.id; "received Commit <- {} for slot {}", peer, slot); - - self.kickoff_hb_hear_timer()?; - - // locate instance in memory, filling in null instances if needed - while self.start_slot + self.insts.len() <= slot { - self.insts.push(self.null_instance()?); - } - let inst = &mut self.insts[slot - self.start_slot]; - - // ignore spurious duplications - if inst.status != Status::Accepting { - return Ok(()); - } - - // mark this instance as committed - inst.status = Status::Committed; - pf_debug!(self.id; "committed instance at slot {} bal {}", - slot, inst.bal); - - // record commit event - self.storage_hub.submit_action( - Self::make_log_action_id(slot, Status::Committed), - LogAction::Append { - entry: WalEntry::CommitSlot { slot }, - sync: self.config.logger_sync, - }, - )?; - pf_trace!(self.id; "submitted CommitSlot log action for slot {} bal {}", - slot, inst.bal); - - Ok(()) - } - /// Handler of Reconstruct message from leader. fn handle_msg_reconstruct( &mut self, @@ -485,7 +436,6 @@ impl RSPaxosReplica { PeerMsg::AcceptReply { slot, ballot } => { self.handle_msg_accept_reply(peer, slot, ballot) } - PeerMsg::Commit { slot } => self.handle_msg_commit(peer, slot), PeerMsg::Reconstruct { slots } => { self.handle_msg_reconstruct(peer, slots) } @@ -494,9 +444,11 @@ impl RSPaxosReplica { } PeerMsg::Heartbeat { ballot, + commit_bar, exec_bar, snap_bar, - } => self.heard_heartbeat(peer, ballot, exec_bar, snap_bar), + } => self + .heard_heartbeat(peer, ballot, commit_bar, exec_bar, snap_bar), } } } diff --git a/src/protocols/rspaxos/mod.rs b/src/protocols/rspaxos/mod.rs index 9cfcf429..9d0b0295 100644 --- a/src/protocols/rspaxos/mod.rs +++ b/src/protocols/rspaxos/mod.rs @@ -80,6 +80,11 @@ pub struct ReplicaConfigRSPaxos { pub perf_storage_b: u64, pub perf_network_a: u64, pub perf_network_b: u64, + + /// Simulate local read lease implementation? + // TODO: actual read lease impl later? (won't affect anything about + // evalutaion results though) + pub sim_read_lease: bool, } #[allow(clippy::derivable_impls)] @@ -102,6 +107,7 @@ impl Default for ReplicaConfigRSPaxos { perf_storage_b: 0, perf_network_a: 0, perf_network_b: 0, + sim_read_lease: false, } } } @@ -238,9 +244,6 @@ pub enum PeerMsg { /// Accept reply from replica to leader. AcceptReply { slot: usize, ballot: Ballot }, - /// Commit notification from leader to replicas. - Commit { slot: usize }, - /// Reconstruction read from new leader to replicas. Reconstruct { slots: Vec }, @@ -253,6 +256,9 @@ pub enum PeerMsg { /// Leader activity heartbeat. Heartbeat { ballot: Ballot, + /// For notifying followers about safe-to-commit slots (in a bit + /// conservative way). + commit_bar: usize, /// For leader step-up as well as conservative snapshotting purpose. exec_bar: usize, /// For conservative snapshotting purpose. @@ -483,7 +489,8 @@ impl GenericReplica for RSPaxosReplica { snapshot_path, snapshot_interval_s, fault_tolerance, msg_chunk_size, perf_storage_a, perf_storage_b, - perf_network_a, perf_network_b)?; + perf_network_a, perf_network_b, + sim_read_lease)?; if config.batch_interval_ms == 0 { return logged_err!( id; diff --git a/src/protocols/rspaxos/request.rs b/src/protocols/rspaxos/request.rs index fe3d5d97..a2a8132a 100644 --- a/src/protocols/rspaxos/request.rs +++ b/src/protocols/rspaxos/request.rs @@ -3,14 +3,14 @@ use super::*; use crate::utils::{SummersetError, Bitmap, RSCodeword}; -use crate::server::{ApiRequest, ApiReply, LogAction}; +use crate::server::{ApiRequest, ApiReply, LogAction, Command, CommandResult}; // RSPaxosReplica client requests entrance impl RSPaxosReplica { /// Handler of client request batch chan recv. pub fn handle_req_batch( &mut self, - req_batch: ReqBatch, + mut req_batch: ReqBatch, ) -> Result<(), SummersetError> { let batch_size = req_batch.len(); debug_assert!(batch_size > 0); @@ -42,6 +42,42 @@ impl RSPaxosReplica { return Ok(()); } + // if simulating read leases, extract all the reads and immediately + // reply to them with a dummy value + // TODO: only for benchmarking purposes + if self.config.sim_read_lease { + for (client, req) in &req_batch { + if let ApiRequest::Req { + id: req_id, + cmd: Command::Get { .. }, + } = req + { + self.external_api.send_reply( + ApiReply::Reply { + id: *req_id, + result: Some(CommandResult::Get { value: None }), + redirect: None, + }, + *client, + )?; + pf_trace!(self.id; "replied -> client {} for read-only cmd", client); + } + } + + req_batch.retain(|(_, req)| { + !matches!( + req, + ApiRequest::Req { + cmd: Command::Get { .. }, + .. + } + ) + }); + if req_batch.is_empty() { + return Ok(()); + } + } + // compute the complete Reed-Solomon codeword for the batch data let mut reqs_cw = RSCodeword::from_data( req_batch, diff --git a/src/server/external.rs b/src/server/external.rs index c88e6ab5..16d7b84a 100644 --- a/src/server/external.rs +++ b/src/server/external.rs @@ -27,7 +27,7 @@ use tokio::time::{self, Duration, MissedTickBehavior}; pub type RequestId = u64; /// Request received from client. -// TODO: add information fields such as read-only flag... +// TODO: add proper read-only flag field/type... #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, GetSize)] pub enum ApiRequest { /// Regular request. diff --git a/src/utils/qdisc.rs b/src/utils/qdisc.rs index 376cb7c1..10069419 100644 --- a/src/utils/qdisc.rs +++ b/src/utils/qdisc.rs @@ -7,6 +7,10 @@ use crate::utils::SummersetError; static DEV_PATTERN: &str = "veths"; +static DEFAULT_DELAY: f64 = 0.0; +static DEFAULT_JITTER: f64 = 0.0; +static DEFAULT_RATE: f64 = 100.0; + /// Helper struct holding qdisc information. pub struct QdiscInfo { /// Delay in ms. @@ -30,6 +34,15 @@ impl fmt::Display for QdiscInfo { } impl QdiscInfo { + /// Creates a new qdisc info struct. + pub fn new() -> Self { + QdiscInfo { + delay: DEFAULT_DELAY, + jitter: DEFAULT_JITTER, + rate: DEFAULT_RATE, + } + } + /// Query `tc qdisc` info by running the command. Returns the output line /// with expected device. fn run_qdisc_show() -> Result { @@ -132,7 +145,8 @@ impl QdiscInfo { fn parse_output_line( line: &str, ) -> Result<(f64, f64, f64), SummersetError> { - let (mut delay, mut jitter, mut rate) = (0.0, 0.0, 0.0); + let (mut delay, mut jitter, mut rate) = + (DEFAULT_DELAY, DEFAULT_JITTER, DEFAULT_RATE); let (mut stage, mut idx) = (0, 0); for seg in line.split_ascii_whitespace() { if seg == "netem" { @@ -157,15 +171,6 @@ impl QdiscInfo { Ok((delay, jitter, rate)) } - /// Creates a new qdisc info struct. - pub fn new() -> Self { - QdiscInfo { - delay: 1.0, - jitter: 0.0, - rate: 1.0, - } - } - /// Updates my fields with a new query. pub fn update(&mut self) -> Result<(), SummersetError> { let line = Self::run_qdisc_show()?; diff --git a/tla+/crossword/ConsensusMulti.tla b/tla+/crossword/ConsensusMulti.tla index 3cfe0b52..38dc4d49 100644 --- a/tla+/crossword/ConsensusMulti.tla +++ b/tla+/crossword/ConsensusMulti.tla @@ -1,6 +1,6 @@ (***************************************************************************) (* The consensus problem specification, extended with an array of *) -(* istances where each instance is a basic consensus problem. *) +(* instances where each instance is a basic consensus problem. *) (* *) (* Adapted from: *) (* https://lamport.azurewebsites.net/tla/Consensus.tla *) diff --git a/tla+/crossword/Crossword.tla b/tla+/crossword/Crossword.tla index bb237fd9..2df90184 100644 --- a/tla+/crossword/Crossword.tla +++ b/tla+/crossword/Crossword.tla @@ -9,7 +9,21 @@ ---- MODULE Crossword ---- EXTENDS FiniteSets, Integers, TLC -CONSTANT Replicas, Values, Slots, Ballots, Shards, NumDataShards, MaxFaults +CONSTANT + \* set of servers + Replicas, + \* set of values + Values, + \* set of slots (instances); 1 should be enough + Slots, + \* set of ballot numbers + Ballots, + \* set of numbers representing shards (columns) + Shards, + \* number of data shards needed to reconstruct + NumDataShards, + \* fault-tolerance level + MaxFaults MajorityNum == (Cardinality(Replicas) \div 2) + 1 @@ -30,11 +44,13 @@ BallotsAssumption == /\ IsFiniteSet(Ballots) ShardsAssumption == /\ IsFiniteSet(Shards) /\ Shards # {} -NumDataShardsAssumption == /\ NumDataShards > 0 - /\ NumDataShards =< Cardinality(Shards) +NumDataShardsAssumption == + /\ NumDataShards > 0 + /\ NumDataShards =< Cardinality(Shards) -MaxFaultsAssumption == /\ MaxFaults >= 0 - /\ MaxFaults =< (Cardinality(Replicas) - MajorityNum) +MaxFaultsAssumption == + /\ MaxFaults >= 0 + /\ MaxFaults =< (Cardinality(Replicas) - MajorityNum) ASSUME /\ ReplicasAssumption /\ ValuesAssumption @@ -52,21 +68,22 @@ variable msgs = {}, rBallot = [r \in Replicas |-> -1], rVoted = [r \in Replicas |-> [s \in Slots |-> - [bal |-> -1, val |-> 0, shards |-> {}]]], + [bal |-> -1, val |-> 0, + shards |-> {}]]], proposed = [s \in Slots |-> {}], learned = [s \in Slots |-> {}]; define - \* Is g a subset of u that is large enough under given MaxFaults? + \* Is g a large enough subset of u under MaxFaults? BigEnoughUnderFaults(g, u) == Cardinality(g) >= (Cardinality(u) - MaxFaults) - \* Set of subsets of u that we consider under given MaxFaults. + \* Set of subsets of u that we consider under MaxFaults. SubsetsUnderFaults(u) == {g \in SUBSET u: BigEnoughUnderFaults(g, u)} - \* Is cs a coverage set (i.e., a set of sets of shards) from which we can - \* reconstruct the original data? + \* Is cs a coverage set (i.e., a set of sets of shards) + \* from which we can reconstruct the original data? IsGoodCoverageSet(cs) == Cardinality(UNION cs) >= NumDataShards @@ -76,7 +93,8 @@ define \A group \in SubsetsUnderFaults(Replicas): IsGoodCoverageSet({assign[r]: r \in group})} - \* Is v a safely prepared value given the prepare reply pattern and ballot? + \* Is v a safely prepared value given the prepare reply + \* pattern and ballot? ValuePreparedIn(v, prPat, pBal) == \/ /\ Cardinality(prPat) >= MajorityNum /\ \A pr \in prPat: pr.vBal = -1 @@ -84,12 +102,17 @@ define /\ \E c \in 0..(pBal-1): /\ \A pr \in prPat: pr.vBal =< c /\ \E pr \in prPat: pr.vBal = c /\ pr.vVal = v - /\ IsGoodCoverageSet({pr.vShards: pr \in {prr \in prPat: prr.vVal = v}}) + /\ IsGoodCoverageSet( + {pr.vShards: pr \in + {prr \in prPat: prr.vVal = v}}) \/ /\ BigEnoughUnderFaults(prPat, Replicas) /\ ~\E vv \in Values: - IsGoodCoverageSet({pr.vShards: pr \in {prr \in prPat: prr.vVal = vv}}) + IsGoodCoverageSet( + {pr.vShards: pr \in + {prr \in prPat: prr.vVal = vv}}) - \* Does the given accept reply pattern decide a value to be chosen? + \* Does the given accept reply pattern decide a value to + \* be chosen? PatternDecidesChosen(arPat) == /\ Cardinality(arPat) >= MajorityNum /\ \A group \in SubsetsUnderFaults(arPat): @@ -106,26 +129,30 @@ macro SendAll(ms) begin end macro; \* Leader sends Prepare message to replicas. -\* This is the first message a leader makes after being elected. Think of this -\* as a Prepare message that covers infinitely many slots up to infinity. +\* This is the first message a leader makes after election. +\* Think of this as a Prepare message that covers infinitely +\* many slots up to infinity. macro Prepare(r) begin with b \in Ballots do await /\ b > lBallot[r] - /\ ~\E m \in msgs: (m.type = "Prepare") /\ (m.bal = b); - \* using this clause to model that ballot numbers from different - \* proposers should be unique + /\ ~\E m \in msgs: + (m.type = "Prepare") /\ (m.bal = b); + \* using this clause to model that ballot + \* nums from different proposers be unique Send([type |-> "Prepare", from |-> r, bal |-> b]); lBallot[r] := b; lStatus[r] := [s \in Slots |-> - IF lStatus[r][s] = "Learned" THEN "Learned" - ELSE "Preparing"]; + IF lStatus[r][s] = "Learned" + THEN "Learned" + ELSE "Preparing"]; end with; end macro; \* Replica replies to a Prepare message. -\* Replicas reply with their known value shards for recovery reconstruction. +\* Replicas reply with known value shards for necessary +\* recovery reconstruction. macro PrepareReply(r) begin with m \in msgs do await (m.type = "Prepare") /\ (m.bal > rBallot[r]); @@ -138,16 +165,20 @@ macro PrepareReply(r) begin end macro; \* Leader sends Accept message to replicas for a slot. -\* Value shards are assigned to replicas according to some reasonable assignment. +\* Shards are assigned according to some assignment policy, +\* from a set where all reasonable assignments considered. macro Accept(r, s) begin await lStatus[r][s] = "Preparing"; with v \in Values do - await \E MS \in SUBSET {m \in msgs: /\ m.type = "PrepareReply" - /\ m.bal = lBallot[r]}: + await \E MS \in SUBSET { + m \in msgs: + /\ m.type = "PrepareReply" + /\ m.bal = lBallot[r]}: LET prPat == {[replica |-> m.from, vBal |-> m.voted[s].bal, vVal |-> m.voted[s].val, - vShards |-> m.voted[s].shards]: m \in MS} + vShards |-> m.voted[s].shards]: + m \in MS} IN ValuePreparedIn(v, prPat, lBallot[r]); with assign \in ValidAssignments do SendAll({[type |-> "Accept", @@ -156,7 +187,8 @@ macro Accept(r, s) begin slot |-> s, bal |-> lBallot[r], val |-> v, - shards |-> assign[rt]]: rt \in Replicas}); + shards |-> assign[rt]]: + rt \in Replicas}); end with; lStatus[r][s] := "Accepting"; proposed[s] := proposed[s] \cup {v}; @@ -164,11 +196,14 @@ macro Accept(r, s) begin end macro; \* Replica replies to an Accept message. -\* Such a reply does not need to contain the actual value data; only the shards -\* metadata is enough for the leader to gather Acceptance Patterns. +\* Such a reply will not carry actual value data in practice; +\* just the shards assignment metadata is enough for the +\* leader to gather Acceptance Patterns. macro AcceptReply(r) begin with m \in msgs do - await (m.type = "Accept") /\ (m.to = r) /\ (m.bal >= rBallot[r]); + await /\ (m.type = "Accept") + /\ (m.to = r) + /\ (m.bal >= rBallot[r]); Send([type |-> "AcceptReply", from |-> r, slot |-> m.slot, @@ -176,7 +211,8 @@ macro AcceptReply(r) begin val |-> m.val, shards |-> m.shards]); rBallot[r] := m.bal; - rVoted[r][m.slot] := [bal |-> m.bal, val |-> m.val, shards |-> m.shards]; + rVoted[r][m.slot] := [bal |-> m.bal, val |-> m.val, + shards |-> m.shards]; end with; end macro; @@ -184,13 +220,15 @@ end macro; macro Learn(r, s) begin await lStatus[r][s] = "Accepting"; with v \in Values do - await \E MS \in SUBSET {m \in msgs: /\ m.type = "AcceptReply" - /\ m.slot = s - /\ m.bal = lBallot[r] - /\ m.val = v}: - LET arPat == {[replica |-> m.from, - aShards |-> m.shards]: m \in MS} - IN PatternDecidesChosen(arPat); + await \E MS \in SUBSET { + m \in msgs: + /\ m.type = "AcceptReply" + /\ m.slot = s + /\ m.bal = lBallot[r] + /\ m.val = v}: + LET arPat == {[replica |-> m.from, + aShards |-> m.shards]: m \in MS} + IN PatternDecidesChosen(arPat); lStatus[r][s] := "Learned"; learned[s] := learned[s] \cup {v}; end with; @@ -223,7 +261,7 @@ begin end process; end algorithm; *) -\* BEGIN TRANSLATION (chksum(pcal) = "12868a7a" /\ chksum(tla) = "eede354c") +\* BEGIN TRANSLATION (chksum(pcal) = "50e87803" /\ chksum(tla) = "228525c6") VARIABLES msgs, lBallot, lStatus, rBallot, rVoted, proposed, learned (* define statement *) @@ -246,6 +284,7 @@ ValidAssignments == IsGoodCoverageSet({assign[r]: r \in group})} + ValuePreparedIn(v, prPat, pBal) == \/ /\ Cardinality(prPat) >= MajorityNum /\ \A pr \in prPat: pr.vBal = -1 @@ -253,10 +292,15 @@ ValuePreparedIn(v, prPat, pBal) == /\ \E c \in 0..(pBal-1): /\ \A pr \in prPat: pr.vBal =< c /\ \E pr \in prPat: pr.vBal = c /\ pr.vVal = v - /\ IsGoodCoverageSet({pr.vShards: pr \in {prr \in prPat: prr.vVal = v}}) + /\ IsGoodCoverageSet( + {pr.vShards: pr \in + {prr \in prPat: prr.vVal = v}}) \/ /\ BigEnoughUnderFaults(prPat, Replicas) /\ ~\E vv \in Values: - IsGoodCoverageSet({pr.vShards: pr \in {prr \in prPat: prr.vVal = vv}}) + IsGoodCoverageSet( + {pr.vShards: pr \in + {prr \in prPat: prr.vVal = vv}}) + PatternDecidesChosen(arPat) == @@ -277,20 +321,23 @@ Init == (* Global variables *) /\ rBallot = [r \in Replicas |-> -1] /\ rVoted = [r \in Replicas |-> [s \in Slots |-> - [bal |-> -1, val |-> 0, shards |-> {}]]] + [bal |-> -1, val |-> 0, + shards |-> {}]]] /\ proposed = [s \in Slots |-> {}] /\ learned = [s \in Slots |-> {}] Replica(self) == \/ /\ \E b \in Ballots: /\ /\ b > lBallot[self] - /\ ~\E m \in msgs: (m.type = "Prepare") /\ (m.bal = b) + /\ ~\E m \in msgs: + (m.type = "Prepare") /\ (m.bal = b) /\ msgs' = (msgs \cup {([type |-> "Prepare", from |-> self, bal |-> b])}) /\ lBallot' = [lBallot EXCEPT ![self] = b] /\ lStatus' = [lStatus EXCEPT ![self] = [s \in Slots |-> - IF lStatus[self][s] = "Learned" THEN "Learned" - ELSE "Preparing"]] + IF lStatus[self][s] = "Learned" + THEN "Learned" + ELSE "Preparing"]] /\ UNCHANGED <> \/ /\ \E m \in msgs: /\ (m.type = "Prepare") /\ (m.bal > rBallot[self]) @@ -303,12 +350,15 @@ Replica(self) == \/ /\ \E b \in Ballots: \/ /\ \E s \in Slots: /\ lStatus[self][s] = "Preparing" /\ \E v \in Values: - /\ \E MS \in SUBSET {m \in msgs: /\ m.type = "PrepareReply" - /\ m.bal = lBallot[self]}: + /\ \E MS \in SUBSET { + m \in msgs: + /\ m.type = "PrepareReply" + /\ m.bal = lBallot[self]}: LET prPat == {[replica |-> m.from, vBal |-> m.voted[s].bal, vVal |-> m.voted[s].val, - vShards |-> m.voted[s].shards]: m \in MS} + vShards |-> m.voted[s].shards]: + m \in MS} IN ValuePreparedIn(v, prPat, lBallot[self]) /\ \E assign \in ValidAssignments: msgs' = (msgs \cup ({[type |-> "Accept", @@ -317,12 +367,15 @@ Replica(self) == \/ /\ \E b \in Ballots: slot |-> s, bal |-> lBallot[self], val |-> v, - shards |-> assign[rt]]: rt \in Replicas})) + shards |-> assign[rt]]: + rt \in Replicas})) /\ lStatus' = [lStatus EXCEPT ![self][s] = "Accepting"] /\ proposed' = [proposed EXCEPT ![s] = proposed[s] \cup {v}] /\ UNCHANGED <> \/ /\ \E m \in msgs: - /\ (m.type = "Accept") /\ (m.to = self) /\ (m.bal >= rBallot[self]) + /\ /\ (m.type = "Accept") + /\ (m.to = self) + /\ (m.bal >= rBallot[self]) /\ msgs' = (msgs \cup {([type |-> "AcceptReply", from |-> self, slot |-> m.slot, @@ -330,18 +383,21 @@ Replica(self) == \/ /\ \E b \in Ballots: val |-> m.val, shards |-> m.shards])}) /\ rBallot' = [rBallot EXCEPT ![self] = m.bal] - /\ rVoted' = [rVoted EXCEPT ![self][m.slot] = [bal |-> m.bal, val |-> m.val, shards |-> m.shards]] + /\ rVoted' = [rVoted EXCEPT ![self][m.slot] = [bal |-> m.bal, val |-> m.val, + shards |-> m.shards]] /\ UNCHANGED <> \/ /\ \E s \in Slots: /\ lStatus[self][s] = "Accepting" /\ \E v \in Values: - /\ \E MS \in SUBSET {m \in msgs: /\ m.type = "AcceptReply" - /\ m.slot = s - /\ m.bal = lBallot[self] - /\ m.val = v}: - LET arPat == {[replica |-> m.from, - aShards |-> m.shards]: m \in MS} - IN PatternDecidesChosen(arPat) + /\ \E MS \in SUBSET { + m \in msgs: + /\ m.type = "AcceptReply" + /\ m.slot = s + /\ m.bal = lBallot[self] + /\ m.val = v}: + LET arPat == {[replica |-> m.from, + aShards |-> m.shards]: m \in MS} + IN PatternDecidesChosen(arPat) /\ lStatus' = [lStatus EXCEPT ![self][s] = "Learned"] /\ learned' = [learned EXCEPT ![s] = learned[s] \cup {v}] /\ UNCHANGED <>