upload benchmark copy query times to benchmark server #2099

Merged · 9 commits · Sep 29, 2023
Changes from 8 commits
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.8.11 LANGUAGES CXX)
project(Kuzu VERSION 0.0.8.12 LANGUAGES CXX)

find_package(Threads REQUIRED)

41 changes: 39 additions & 2 deletions benchmark/benchmark_runner.py
@@ -9,6 +9,7 @@
import psutil
from serializer import _get_kuzu_version
import multiprocessing
import re

# Get the number of CPUs, try to use sched_getaffinity if available to account
# for Docker CPU limits
@@ -72,6 +73,10 @@
'ldbc-sf100-ku': os.path.join(serialized_base_dir, 'ldbc-sf100-serialized')
}

benchmark_copy_log_dir = os.path.join("/tmp", 'benchmark_copy_logs')
shutil.rmtree(benchmark_copy_log_dir, ignore_errors=True)
os.mkdir(benchmark_copy_log_dir)

benchmark_log_dir = os.path.join("/tmp", 'benchmark_logs')
shutil.rmtree(benchmark_log_dir, ignore_errors=True)
os.mkdir(benchmark_log_dir)
@@ -82,6 +87,35 @@
num_run = 5


class CopyQueryBenchmark:
    def __init__(self, benchmark_copy_log):
        self.name = os.path.basename(benchmark_copy_log).split('_')[0]
        self.group = 'copy'
        self.status = 'pass'

        with open(benchmark_copy_log) as log_file:
            self.log = log_file.read()
        match = re.search('Time: (.*)ms \(compiling\), (.*)ms \(executing\)', self.log)
        self.compiling_time = float(match.group(1))
        self.execution_time = float(match.group(2))

    def to_json_dict(self):
        result = {
            'query_name': self.name,
            'query_group': self.group,
            'log': self.log,
            'records': [
                {
                    'status': self.status,
                    'compiling_time': self.compiling_time,
                    'execution_time': self.execution_time,
                    'query_seq': 3  # value > 2 required by server
                }
            ]
        }
        return result


class QueryBenchmark:
    def __init__(self, benchmark_log, group_name='NULL'):
        self.name = os.path.basename(benchmark_log).split('_')[0]
@@ -186,7 +220,7 @@ def serialize_dataset(dataset_name):
serializer_script = os.path.join(base_dir, "serializer.py")
try:
subprocess.run([sys.executable, serializer_script, dataset_name,
dataset_path, serialized_graph_path], check=True)
dataset_path, serialized_graph_path, benchmark_copy_log_dir], check=True)
except subprocess.CalledProcessError as e:
logging.error("Failed to serialize dataset: %s", e)
sys.exit(1)
@@ -209,7 +243,7 @@ def run_kuzu(serialized_graph_path):
tuple(benchmark_cmd), stdout=subprocess.PIPE)
for line in iter(process.stdout.readline, b''):
print(line.decode("utf-8"), end='')
process.communicate()[0]
process.wait()
if process.returncode != 0:
print()
logging.error("An error has occurred while running benchmark!")
@@ -246,6 +280,9 @@ def get_run_info():

def get_query_info():
    results = []
    for filename in os.listdir(benchmark_copy_log_dir):
        copy_query_benchmark = CopyQueryBenchmark(os.path.join(benchmark_copy_log_dir, filename))
        results.append(copy_query_benchmark.to_json_dict())
    for path in os.scandir(benchmark_log_dir):
        if path.is_dir():
            for filename in os.listdir(path):
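For context, a minimal sketch of what the new CopyQueryBenchmark class consumes and produces. The log file name, log line, and timing numbers below are invented examples inferred from the regular expression and filename handling in this pull request, not output captured from a real benchmark run:

import json
import re

# Hypothetical copy log written under /tmp/benchmark_copy_logs by serializer.py
log_name = 'node-comment_log.txt'
sample_log = 'Time: 123.45ms (compiling), 6789.01ms (executing)\n'

# Same parsing as CopyQueryBenchmark.__init__
match = re.search(r'Time: (.*)ms \(compiling\), (.*)ms \(executing\)', sample_log)

# Same shape as CopyQueryBenchmark.to_json_dict()
payload = {
    'query_name': log_name.split('_')[0],         # 'node-comment'
    'query_group': 'copy',
    'log': sample_log,
    'records': [{
        'status': 'pass',
        'compiling_time': float(match.group(1)),  # 123.45
        'execution_time': float(match.group(2)),  # 6789.01
        'query_seq': 3,                           # server requires a value > 2
    }],
}
print(json.dumps(payload, indent=2))

get_query_info() appends one such dictionary per file found in the copy log directory before the results are uploaded to the benchmark server.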
40 changes: 31 additions & 9 deletions benchmark/serializer.py
@@ -3,6 +3,7 @@
import shutil
import sys
import subprocess
import re

base_dir = os.path.dirname(os.path.realpath(__file__))
kuzu_exec_path = os.path.join(
@@ -17,7 +18,7 @@ def _get_kuzu_version():
return line.split(' ')[2].strip()


def serialize(dataset_name, dataset_path, serialized_graph_path):
def serialize(dataset_name, dataset_path, serialized_graph_path, benchmark_copy_log_dir):
    bin_version = _get_kuzu_version()

    if not os.path.exists(serialized_graph_path):
@@ -46,16 +47,36 @@ def serialize(dataset_name, dataset_path, serialized_graph_path):
    serialize_queries = [q.strip().replace('{}', dataset_path)
                         for q in serialize_queries]

    table_types = {}

    for s in serialize_queries:
        logging.info('Executing query: %s', s)
        try:
            # Run kuzu shell one query at a time. This ensures a new process is
            # created for each query to avoid memory leaks.
            subprocess.run([kuzu_exec_path, serialized_graph_path],
                           input=(s + ";" + "\n").encode("ascii"), check=True)
        except subprocess.CalledProcessError as e:
        # Run kuzu shell one query at a time. This ensures a new process is
        # created for each query to avoid memory leaks.
        process = subprocess.Popen([kuzu_exec_path, serialized_graph_path],
                                   stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        process.stdin.write((s + ";" + "\n").encode("ascii"))
        process.stdin.close()
        match = re.match('create\s+(.+?)\s+table\s+(.+?)\s*\(', s, re.IGNORECASE)
        if match:
            table_types[match.group(2)] = match.group(1).lower()
            match = False
@mewim (Member) commented on Sep 29, 2023:

This logic is not very readable.

  1. You should run the check for the create statement before subprocess.Popen. If it is a create, you should simply pass the output through via sys.stdout and sys.stderr redirection instead of manually getting the output and printing it.
  2. Instead of setting match to False here, you should indent lines 66-75 so they are under your else. If you do this, you can also avoid the if check for your write and flush.
  3. Do not reuse your flags. Call them create_match and copy_match, maybe. Reusing a flag-type variable is error-prone and makes the code hard to understand.

(A rough sketch of this restructuring follows the serializer.py diff below.)

        else:
            match = re.match('copy\s+(.+?)\s+from', s, re.IGNORECASE)
            if match:
                filename = table_types[match.group(1)] + '-' + match.group(1).replace('_', '-') + '_log.txt'
                f = open(os.path.join(benchmark_copy_log_dir, filename), 'ab')
        for line in iter(process.stdout.readline, b''):
            sys.stdout.write(line.decode("utf-8"))
            sys.stdout.flush()
            if match:
                f.write(line)
        if match:
            f.close()
        process.wait()
        if process.returncode != 0:
            logging.error('Error executing query: %s', s)
            raise e
            raise subprocess.CalledProcessError(process.returncode, process.args)

    with open(os.path.join(serialized_graph_path, 'version.txt'), 'w') as f:
        f.write(bin_version)
@@ -66,8 +87,9 @@ def serialize(dataset_name, dataset_path, serialized_graph_path):
dataset_name = sys.argv[1]
dataset_path = sys.argv[2]
serialized_graph_path = sys.argv[3]
benchmark_copy_log_dir = sys.argv[4]
try:
serialize(dataset_name, dataset_path, serialized_graph_path)
serialize(dataset_name, dataset_path, serialized_graph_path, benchmark_copy_log_dir)
except Exception as e:
logging.error('Error serializing dataset %s', dataset_name)
logging.error(e)
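A rough sketch of the restructuring @mewim suggests above. This is not code from the pull request: the helper name run_serialize_query and its parameter list are assumptions made for illustration, the create_match/copy_match names follow the review comment, and the stdout/stderr handling is one plausible reading of point 1.

import logging
import os
import re
import subprocess
import sys


def run_serialize_query(s, kuzu_exec_path, serialized_graph_path,
                        table_types, benchmark_copy_log_dir):
    # Hypothetical helper: run one serialization query through the kuzu shell and,
    # for COPY statements, mirror the shell output into a per-table log file.
    create_match = re.match(r'create\s+(.+?)\s+table\s+(.+?)\s*\(', s, re.IGNORECASE)
    if create_match:
        table_types[create_match.group(2)] = create_match.group(1).lower()
        # CREATE statements need no log capture; let the shell write to our stdout/stderr.
        subprocess.run([kuzu_exec_path, serialized_graph_path],
                       input=(s + ";\n").encode("ascii"),
                       stdout=sys.stdout, stderr=sys.stderr, check=True)
        return

    copy_match = re.match(r'copy\s+(.+?)\s+from', s, re.IGNORECASE)
    process = subprocess.Popen([kuzu_exec_path, serialized_graph_path],
                               stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    process.stdin.write((s + ";\n").encode("ascii"))
    process.stdin.close()

    log_file = None
    if copy_match:
        table = copy_match.group(1)
        filename = table_types[table] + '-' + table.replace('_', '-') + '_log.txt'
        log_file = open(os.path.join(benchmark_copy_log_dir, filename), 'ab')

    for line in iter(process.stdout.readline, b''):
        sys.stdout.write(line.decode("utf-8"))
        sys.stdout.flush()
        if log_file:
            log_file.write(line)
    if log_file:
        log_file.close()

    process.wait()
    if process.returncode != 0:
        logging.error('Error executing query: %s', s)
        raise subprocess.CalledProcessError(process.returncode, process.args)

Compared with the merged version, this avoids reusing a single match flag and skips the per-line log check for CREATE statements.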
2 changes: 1 addition & 1 deletion src/include/common/constants.h
@@ -7,7 +7,7 @@
namespace kuzu {
namespace common {

constexpr char KUZU_VERSION[] = "v0.0.8.11";
constexpr char KUZU_VERSION[] = "v0.0.8.12";

constexpr uint64_t DEFAULT_VECTOR_CAPACITY_LOG_2 = 11;
constexpr uint64_t DEFAULT_VECTOR_CAPACITY = (uint64_t)1 << DEFAULT_VECTOR_CAPACITY_LOG_2;
12 changes: 6 additions & 6 deletions src/include/storage/storage_info.h
@@ -12,12 +12,12 @@ using storage_version_t = uint64_t;

struct StorageVersionInfo {
static std::unordered_map<std::string, storage_version_t> getStorageVersionInfo() {
return {{"0.0.8.11", 22}, {"0.0.8.10", 22}, {"0.0.8.9", 22}, {"0.0.8.8", 21},
{"0.0.8.7", 21}, {"0.0.8.6", 20}, {"0.0.8.5", 19}, {"0.0.8.4", 19}, {"0.0.8.3", 19},
{"0.0.8.2", 19}, {"0.0.8.1", 18}, {"0.0.8", 17}, {"0.0.7.1", 16}, {"0.0.7", 15},
{"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11}, {"0.0.6.1", 10},
{"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6}, {"0.0.3.4", 5},
{"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
return {{"0.0.8.12", 22}, {"0.0.8.11", 22}, {"0.0.8.10", 22}, {"0.0.8.9", 22},
{"0.0.8.8", 21}, {"0.0.8.7", 21}, {"0.0.8.6", 20}, {"0.0.8.5", 19}, {"0.0.8.4", 19},
{"0.0.8.3", 19}, {"0.0.8.2", 19}, {"0.0.8.1", 18}, {"0.0.8", 17}, {"0.0.7.1", 16},
{"0.0.7", 15}, {"0.0.6.5", 14}, {"0.0.6.4", 13}, {"0.0.6.3", 12}, {"0.0.6.2", 11},
{"0.0.6.1", 10}, {"0.0.6", 9}, {"0.0.5", 8}, {"0.0.4", 7}, {"0.0.3.5", 6},
{"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}};
}

static storage_version_t getStorageVersion();
2 changes: 1 addition & 1 deletion test/test_files/tinysnb/function/table.test
@@ -79,4 +79,4 @@ height
-LOG ReturnDBVersion
-STATEMENT CALL db_version() RETURN version
---- 1
v0.0.8.11
v0.0.8.12