Skip to content

Commit

Permalink
upload benchmark copy query times to benchmark server (#2099)
Browse files Browse the repository at this point in the history
* upload benchmark copy query times to benchmark server

* bump Kuzu version to trigger copy queries

* bump Kuzu version to trigger copy queries

* fix formatting

* bump Kuzu version to trigger copy queries

* fix buffered output in serializer subprocess

* fix buffered output in serializer subprocess

* fix copy query naming issues

* fix serializer subprocess logic

---------

Co-authored-by: Russell Liu <russell.liu@uwaterloo.ca>
  • Loading branch information
russell-liu and Russell Liu committed Sep 29, 2023
1 parent c4652f2 commit 951a2ca
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 20 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.8.11 LANGUAGES CXX)
project(Kuzu VERSION 0.0.8.12 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
41 changes: 39 additions & 2 deletions benchmark/benchmark_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import psutil
from serializer import _get_kuzu_version
import multiprocessing
import re

# Get the number of CPUs, try to use sched_getaffinity if available to account
# for Docker CPU limits
Expand Down Expand Up @@ -72,6 +73,10 @@
'ldbc-sf100-ku': os.path.join(serialized_base_dir, 'ldbc-sf100-serialized')
}

# Directory holding one log file per COPY statement emitted by the serializer
# subprocess; recreated from scratch on every run so stale logs from a previous
# benchmark run are never uploaded.
benchmark_copy_log_dir = os.path.join("/tmp", 'benchmark_copy_logs')
shutil.rmtree(benchmark_copy_log_dir, ignore_errors=True)
os.mkdir(benchmark_copy_log_dir)

# Directory for the regular (read) query benchmark logs, likewise reset per run.
benchmark_log_dir = os.path.join("/tmp", 'benchmark_logs')
shutil.rmtree(benchmark_log_dir, ignore_errors=True)
os.mkdir(benchmark_log_dir)
Expand All @@ -82,6 +87,35 @@
num_run = 5


class CopyQueryBenchmark:
    """Result of a single COPY (data-loading) query, parsed from its log file.

    The log file name is expected to look like ``<query-name>_log.txt``; the
    portion before the first underscore becomes the query name. The file body
    must contain a Kuzu shell timing line of the form
    ``Time: <ms> (compiling), <ms> (executing)``.
    """

    def __init__(self, benchmark_copy_log):
        # Query name is encoded in the file name, e.g. "node-person_log.txt".
        self.name = os.path.basename(benchmark_copy_log).split('_')[0]
        self.group = 'copy'
        # The log only exists if the COPY ran; a failed run raises below.
        self.status = 'pass'

        with open(benchmark_copy_log) as log_file:
            self.log = log_file.read()
        # Raw string avoids invalid-escape warnings for \( and \); the numeric
        # character class keeps each group from greedily spanning extra text if
        # the log ever contains more than one timing fragment on a line.
        match = re.search(
            r'Time: ([\d.]+)ms \(compiling\), ([\d.]+)ms \(executing\)',
            self.log)
        if match is None:
            # Fail with context instead of an opaque AttributeError on
            # match.group(...) when the shell output format is unexpected.
            raise ValueError(
                'No timing line found in copy log: %s' % benchmark_copy_log)
        self.compiling_time = float(match.group(1))
        self.execution_time = float(match.group(2))

    def to_json_dict(self):
        """Return the payload dict expected by the benchmark server."""
        return {
            'query_name': self.name,
            'query_group': self.group,
            'log': self.log,
            'records': [
                {
                    'status': self.status,
                    'compiling_time': self.compiling_time,
                    'execution_time': self.execution_time,
                    'query_seq': 3  # value > 2 required by server
                }
            ]
        }


class QueryBenchmark:
def __init__(self, benchmark_log, group_name='NULL'):
self.name = os.path.basename(benchmark_log).split('_')[0]
Expand Down Expand Up @@ -186,7 +220,7 @@ def serialize_dataset(dataset_name):
serializer_script = os.path.join(base_dir, "serializer.py")
try:
subprocess.run([sys.executable, serializer_script, dataset_name,
dataset_path, serialized_graph_path], check=True)
dataset_path, serialized_graph_path, benchmark_copy_log_dir], check=True)
except subprocess.CalledProcessError as e:
logging.error("Failed to serialize dataset: %s", e)
sys.exit(1)
Expand All @@ -209,7 +243,7 @@ def run_kuzu(serialized_graph_path):
tuple(benchmark_cmd), stdout=subprocess.PIPE)
for line in iter(process.stdout.readline, b''):
print(line.decode("utf-8"), end='')
process.communicate()[0]
process.wait()
if process.returncode != 0:
print()
logging.error("An error has occurred while running benchmark!")
Expand Down Expand Up @@ -246,6 +280,9 @@ def get_run_info():

def get_query_info():
results = []
for filename in os.listdir(benchmark_copy_log_dir):
copy_query_benchmark = CopyQueryBenchmark(os.path.join(benchmark_copy_log_dir, filename))
results.append(copy_query_benchmark.to_json_dict())
for path in os.scandir(benchmark_log_dir):
if path.is_dir():
for filename in os.listdir(path):
Expand Down
35 changes: 26 additions & 9 deletions benchmark/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import shutil
import sys
import subprocess
import re

base_dir = os.path.dirname(os.path.realpath(__file__))
kuzu_exec_path = os.path.join(
Expand All @@ -17,7 +18,7 @@ def _get_kuzu_version():
return line.split(' ')[2].strip()


def serialize(dataset_name, dataset_path, serialized_graph_path):
def serialize(dataset_name, dataset_path, serialized_graph_path, benchmark_copy_log_dir):
bin_version = _get_kuzu_version()

if not os.path.exists(serialized_graph_path):
Expand Down Expand Up @@ -46,16 +47,31 @@ def serialize(dataset_name, dataset_path, serialized_graph_path):
serialize_queries = [q.strip().replace('{}', dataset_path)
for q in serialize_queries]

table_types = {}

for s in serialize_queries:
logging.info('Executing query: %s', s)
try:
# Run kuzu shell one query at a time. This ensures a new process is
# created for each query to avoid memory leaks.
subprocess.run([kuzu_exec_path, serialized_graph_path],
input=(s + ";" + "\n").encode("ascii"), check=True)
except subprocess.CalledProcessError as e:
create_match = re.match('create\s+(.+?)\s+table\s+(.+?)\s*\(', s, re.IGNORECASE)
# Run kuzu shell one query at a time. This ensures a new process is
# created for each query to avoid memory leaks.
process = subprocess.Popen([kuzu_exec_path, serialized_graph_path],
stdin=subprocess.PIPE, stdout=sys.stdout if create_match else subprocess.PIPE)
process.stdin.write((s + ";" + "\n").encode("ascii"))
process.stdin.close()
if create_match:
table_types[create_match.group(2)] = create_match.group(1).lower()
else:
copy_match = re.match('copy\s+(.+?)\s+from', s, re.IGNORECASE)
filename = table_types[copy_match.group(1)] + '-' + copy_match.group(1).replace('_', '-') + '_log.txt'
with open(os.path.join(benchmark_copy_log_dir, filename), 'ab') as f:
for line in iter(process.stdout.readline, b''):
sys.stdout.write(line.decode("utf-8"))
sys.stdout.flush()
f.write(line)
process.wait()
if process.returncode != 0:
logging.error('Error executing query: %s', s)
raise e
raise subprocess.CalledProcessError

with open(os.path.join(serialized_graph_path, 'version.txt'), 'w') as f:
f.write(bin_version)
Expand All @@ -66,8 +82,9 @@ def serialize(dataset_name, dataset_path, serialized_graph_path):
dataset_name = sys.argv[1]
dataset_path = sys.argv[2]
serialized_graph_path = sys.argv[3]
benchmark_copy_log_dir = sys.argv[4]
try:
serialize(dataset_name, dataset_path, serialized_graph_path)
serialize(dataset_name, dataset_path, serialized_graph_path, benchmark_copy_log_dir)
except Exception as e:
logging.error('Error serializing dataset %s', dataset_name)
logging.error(e)
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace kuzu {
namespace common {

constexpr char KUZU_VERSION[] = "v0.0.8.11";
constexpr char KUZU_VERSION[] = "v0.0.8.12";

constexpr uint64_t DEFAULT_VECTOR_CAPACITY_LOG_2 = 11;
constexpr uint64_t DEFAULT_VECTOR_CAPACITY = (uint64_t)1 << DEFAULT_VECTOR_CAPACITY_LOG_2;
Expand Down
12 changes: 6 additions & 6 deletions src/include/storage/storage_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ using storage_version_t = uint64_t;

struct StorageVersionInfo {
static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.0.8.11", 22}, {"0.0.8.10", 22}, {"0.0.8.9", 22}, {"0.0.8.8", 21},
{"0.0.8.7", 21}, {"0.0.8.6", 20}, {"0.0.8.5", 19}, {"0.0.8.4", 19}, {"0.0.8.3", 19},
{"0.0.8.2", 19}, {"0.0.8.1", 18}, {"0.0.8", 17}, {"0.0.7.1", 16}, {"0.0.7", 15},
{"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11}, {"0.0.6.1", 10},
{"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5},
{"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
return {{"0.0.8.12", 22}, {"0.0.8.11", 22}, {"0.0.8.10", 22}, {"0.0.8.9", 22},
{"0.0.8.8", 21}, {"0.0.8.7", 21}, {"0.0.8.6", 20}, {"0.0.8.5", 19}, {"0.0.8.4", 19},
{"0.0.8.3", 19}, {"0.0.8.2", 19}, {"0.0.8.1", 18}, {"0.0.8", 17}, {"0.0.7.1", 16},
{"0.0.7", 15}, {"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11},
{"0.0.6.1", 10}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6},
{"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
}

static storage_version_t getStorageVersion();
Expand Down
2 changes: 1 addition & 1 deletion test/test_files/tinysnb/function/table.test
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@ height
-LOG ReturnDBVersion
-STATEMENT CALL db_version() RETURN version
---- 1
v0.0.8.11
v0.0.8.12

0 comments on commit 951a2ca

Please sign in to comment.