Refactor benchmark script
mewim committed Jan 5, 2023
1 parent f1471ee commit 2889f49
Showing 9 changed files with 398 additions and 84 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/ci-workflow.yml
@@ -49,14 +49,13 @@ jobs:

   benchmark:
     name: benchmark
-    runs-on: self-hosted-himrod
+    runs-on: kuzu-self-hosted-benchmarking
     steps:
       - uses: actions/checkout@v2
-      - run: sudo apt install -y python3-pip && sudo apt install -y sqlite3
-      - run: pip3 install -r tools/python_api/requirements_dev.txt
+      - run: pip3 install --user -r tools/python_api/requirements_dev.txt
 
       - name: build
-        run: CC=gcc-9 make release NUM_THREADS=32
+        run: make release NUM_THREADS=30
 
       - name: benchmark
-        run: python3 benchmark/benchmark_runner.py --dataset ldbc-sf100
+        run: python3 benchmark/benchmark_runner.py --dataset ldbc-sf100 --thread 1
26 changes: 26 additions & 0 deletions benchmark/Dockerfile
@@ -0,0 +1,26 @@
FROM ubuntu:22.04

ENV CSV_DIR /csv
ENV SERIALIZED_DIR /serialized

RUN DEBIAN_FRONTEND=noninteractive apt-get update && apt-get install -y --no-install-recommends apt-utils
RUN DEBIAN_FRONTEND=noninteractive apt-get -y install python3-dev python3-pip python-is-python3 cmake nodejs jq curl apt-transport-https gnupg sudo git
RUN pip3 install requests psutil

RUN mkdir -p $CSV_DIR $SERIALIZED_DIR

RUN useradd --create-home runner
RUN chown -R runner:runner $CSV_DIR $SERIALIZED_DIR

USER runner
RUN mkdir /home/runner/actions-runner
WORKDIR /home/runner/actions-runner

RUN curl -o actions-runner-linux-x64-2.298.2.tar.gz -L https://github.com/actions/runner/releases/download/v2.298.2/actions-runner-linux-x64-2.298.2.tar.gz
RUN echo "0bfd792196ce0ec6f1c65d2a9ad00215b2926ef2c416b8d97615265194477117 actions-runner-linux-x64-2.298.2.tar.gz" | shasum -a 256
RUN tar xzf ./actions-runner-linux-x64-2.298.2.tar.gz

COPY --chown=runner:runner start.sh start.sh
RUN chmod +x start.sh

ENTRYPOINT ["./start.sh"]
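
As written, the checksum step in the Dockerfile above pipes the expected digest string into `shasum -a 256`, which merely prints a hash of that string and always exits successfully, so the downloaded runner archive is never actually verified. A minimal sketch of a verifying variant, assuming shasum's standard `-c` check mode (an illustration, not part of this commit):

```
# Hypothetical verification step: with -c, shasum reads "digest  filename"
# pairs, hashes the named file, and exits non-zero on a mismatch, which
# would fail the corresponding RUN instruction and abort the image build.
echo "0bfd792196ce0ec6f1c65d2a9ad00215b2926ef2c416b8d97615265194477117  actions-runner-linux-x64-2.298.2.tar.gz" \
  | shasum -a 256 -c
```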
19 changes: 19 additions & 0 deletions benchmark/README.md
@@ -0,0 +1,19 @@
## Build

```
docker build -t kuzu-self-hosted-benchmark-runner .
```

## Start Container

```
docker run --name self-hosted-benchmark-runner-<X> --detach --restart=always \
  -e GITHUB_ACCESS_TOKEN=<YOUR_GITHUB_ACCESS_TOKEN> \
  -e MACHINE_NAME=<NAME_OF_THE_PHYSICAL_MACHINE> \
  -e JWT_TOKEN=<YOUR_JWT_TOKEN> \
  -e BENCHMARK_SERVER_URL=http://<YOUR_SERVER_ADDRESS>/api/post_results \
  -v <PATH_TO_RAW_CSV_FILES>:/csv \
  -v <PATH_TO_SERIALIZED_DATASET>:/serialized \
  --memory=<MEMORY_LIMIT> --cpuset-cpus=<RANGE_OF_CPU_CORES> \
  kuzu-self-hosted-benchmark-runner
```
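
Outside the container, the refactored runner script in the next file reads the same `CSV_DIR` and `SERIALIZED_DIR` variables and skips the result upload when `DRY_RUN` is set, so it can also be exercised locally. A minimal sketch of such a dry run, assuming a release build of `kuzu_benchmark` already exists and using placeholder paths:

```
# Hypothetical local dry run of benchmark/benchmark_runner.py (shown below);
# CSV_DIR, SERIALIZED_DIR and DRY_RUN are the environment variables it reads.
export CSV_DIR=/path/to/csv                 # e.g. $CSV_DIR/ldbc-100/csv for ldbc-sf100
export SERIALIZED_DIR=/path/to/serialized   # serialized graphs are placed here
export DRY_RUN=true                         # skip uploading results to the server
python3 benchmark/benchmark_runner.py --dataset ldbc-sf100 --thread 2
```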
250 changes: 173 additions & 77 deletions benchmark/benchmark_runner.py
@@ -1,28 +1,81 @@
import sys
import argparse
import os
import sqlite3
import datetime
import logging
import requests
import shutil
import subprocess
import logging
import datetime
import os
import argparse
import sys
import psutil
from serializer import _get_kuzu_version
import multiprocessing

# Get the number of CPUs, try to use sched_getaffinity if available to account
# for Docker CPU limits
try:
cpu_count = len(os.sched_getaffinity(0))
except AttributeError:
cpu_count = multiprocessing.cpu_count()

# Use 90% of the available memory size as bm-size
# First try to read the memory limit from cgroup to account for Docker RAM
# limit, if not available use the total memory size
try:
# cgroup v2
max_memory = int(open("/sys/fs/cgroup/memory.max").readline().strip())
except FileNotFoundError:
try:
# cgroup v1
max_memory = int(
open("/sys/fs/cgroup/memory/memory.limit_in_bytes").readline().strip())
except FileNotFoundError:
max_memory = psutil.virtual_memory().total

bm_size = int((max_memory / 1024 ** 2) * .9)
base_dir = os.path.dirname(os.path.realpath(__file__))

# dataset registration
datasets = {'ldbc-sf10', 'ldbc-sf100'}

csv_base_dir = os.getenv('CSV_DIR')
serialized_base_dir = os.getenv('SERIALIZED_DIR')
is_dry_run = os.getenv('DRY_RUN') == 'true'
benchmark_files = os.path.join(base_dir, 'queries')

kuzu_benchmark_tool = os.path.join(
base_dir, '..', 'build', 'release', 'tools', 'benchmark', 'kuzu_benchmark')

if csv_base_dir is None:
logging.error("CSV_DIR is not set, exiting...")
sys.exit(1)
if serialized_base_dir is None:
logging.error("SERIALIZED_DIR is not set, exiting...")
sys.exit(1)

benchmark_server_url = os.getenv('BENCHMARK_SERVER_URL')
if benchmark_server_url is None and not is_dry_run:
logging.error("BENCHMARK_SERVER_URL is not set, exiting...")
sys.exit(1)

jwt_token = os.getenv('JWT_TOKEN')
if jwt_token is None and not is_dry_run:
logging.error("JWT_TOKEN is not set, exiting...")
sys.exit(1)

datasets_path = {
'ldbc-sf10-ku': '/home/x74feng/CI/ldbc-sf10',
'ldbc-sf100-ku': '/home/x74feng/CI/ldbc-sf100'
'ldbc-sf10-ku': os.path.join(csv_base_dir, 'ldbc-10', 'csv'),
'ldbc-sf100-ku': os.path.join(csv_base_dir, 'ldbc-100', 'csv')
}

serialized_graphs_path = {
'ldbc-sf10-ku': '/home/x74feng/CI/ldbc-sf10-serialized',
'ldbc-sf100-ku': '/home/x74feng/CI/ldbc-sf100-serialized'
'ldbc-sf10-ku': os.path.join(serialized_base_dir, 'ldbc-sf10-serialized'),
'ldbc-sf100-ku': os.path.join(serialized_base_dir, 'ldbc-sf100-serialized')
}

benchmark_server_dir = '/home/x74feng/CI/server'
benchmark_log_dir = benchmark_server_dir + '/data/logs'
benchmark_files = os.getenv("GITHUB_WORKSPACE") + '/benchmark/queries'
kuzu_benchmark_tool = os.getenv("GITHUB_WORKSPACE") + '/build/release/tools/benchmark/kuzu_benchmark'
benchmark_log_dir = os.path.join("/tmp", 'benchmark_logs')
shutil.rmtree(benchmark_log_dir, ignore_errors=True)
os.mkdir(benchmark_log_dir)


# benchmark configuration
num_warmup = 1
@@ -36,6 +89,16 @@ def __init__(self, benchmark_log, group_name='NULL'):
self.status = []
self.compiling_time = []
self.execution_time = []

profile_log_path = os.path.join(os.path.dirname(
benchmark_log), self.name + '_profile.txt')
if os.path.exists(profile_log_path):
with open(profile_log_path) as profile_file:
self.profile = profile_file.read()
else:
self.profile = None
with open(benchmark_log) as log_file:
self.log = log_file.read()
with open(benchmark_log) as log_file:
for line in log_file:
if ':' not in line:
@@ -49,23 +112,24 @@ def __init__(self, benchmark_log, group_name='NULL'):
elif key == 'Execution time':
self.execution_time.append(float(value))

def insert_db(self, run_num):
insert_query_record = '''INSERT INTO benchmark_result
(query_name, status, compiling_time, execution_time, run_id, query_group, query_seq)
values(?, ?, ?, ?, ?, ?, ?);'''
con = sqlite3.connect(benchmark_server_dir + '/benchmark.db')
cur = con.cursor()
def to_json_dict(self):
result = {
'query_name': self.name,
'query_group': self.group,
'log': self.log,
'profile': self.profile,
'records': []
}

for index, record in enumerate(self.status):
if record == 'pass':
cur.execute(insert_query_record,
(self.name, record, self.compiling_time[index],
self.execution_time[index], int(run_num), self.group, int(index + 1)))
else:
cur.execute(insert_query_record,
(self.name, record, 'NULL', 'NULL',
int(run_num), self.group, int(index + 1)))
con.commit()
con.close()
curr_dict = {
'status': record,
'compiling_time': self.compiling_time[index] if record == 'pass' else None,
'execution_time': self.execution_time[index] if record == 'pass' else None,
'query_seq': int(index + 1)
}
result['records'].append(curr_dict)
return result


class Benchmark:
@@ -89,7 +153,8 @@ def _load(self, path):
self.query += line + " "
line = next(f)
line = line.strip()
elif line.startswith('expectedNumOutput'): # parse number of output tuples
# parse number of output tuples
elif line.startswith('expectedNumOutput'):
self.expectedNumOutput = line.split(' ')[1]


@@ -115,22 +180,16 @@ def _load_group(self, group_path):
return benchmarks


def get_run_num():
if not os.path.exists(benchmark_server_dir + '/benchmark.db'):
logging.error("Benchmark db not found! PATH: " + benchmark_server_dir + '/benchmark.db')
sys.exit(1)
def serialize_dataset(dataset_name):
dataset_path = datasets_path[dataset_name]
serialized_graph_path = serialized_graphs_path[dataset_name]
serializer_script = os.path.join(base_dir, "serializer.py")
try:
query = 'SELECT MAX(run_id) FROM run_info'
con = sqlite3.connect(benchmark_server_dir + '/benchmark.db')
cur = con.cursor()
result_tuple = cur.execute(query).fetchone()
if result_tuple[0] is None:
return 1
else:
return result_tuple[0] + 1

except:
return 1
subprocess.run([sys.executable, serializer_script, dataset_name,
dataset_path, serialized_graph_path], check=True)
except subprocess.CalledProcessError as e:
logging.error("Failed to serialize dataset: %s", e)
sys.exit(1)


def run_kuzu(serialized_graph_path):
@@ -142,10 +201,12 @@ def run_kuzu(serialized_graph_path):
'--warmup=' + str(num_warmup),
'--run=' + str(num_run),
'--out=' + benchmark_log_dir + '/' + group,
'--bm-size=81920',
'--bm-size=' + str(bm_size),
'--thread=' + args.thread,
'--profile'
]
process = subprocess.Popen(tuple(benchmark_cmd), stdout=subprocess.PIPE)
process = subprocess.Popen(
tuple(benchmark_cmd), stdout=subprocess.PIPE)
for line in iter(process.stdout.readline, b''):
print(line.decode("utf-8"), end='')
process.communicate()[0]
@@ -158,56 +219,91 @@ def run_kuzu(serialized_graph_path):

def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--dataset', default='ldbc-sf10', help='dataset to run benchmark')
parser.add_argument('--thread', default='1', help='number of threads to run benchmark')
parser.add_argument('--note', default='automated benchmark run', help='note about this run')
parser.add_argument('--dataset', default='ldbc-sf100',
help='dataset to run benchmark')
parser.add_argument('--thread', default=str(cpu_count),
help='number of threads to run benchmark')
parser.add_argument(
'--note', default='automated benchmark run', help='note about this run')
return parser.parse_args()


def upload_run_info():
insert_run_info_query = 'INSERT INTO run_info (commit_id, run_timestamp, note, dataset) values(?, ?, ?, ?)'
con = sqlite3.connect(benchmark_server_dir + '/benchmark.db')
cur = con.cursor()
cur.execute(insert_run_info_query,
(os.environ.get('GITHUB_SHA'), datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), args.note,
args.dataset))
con.commit()
con.close()
def _get_git_revision_hash():
try:
return subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode("utf-8").strip()
except:
return None


def get_run_info():
return {
'commit_id': os.environ.get('GITHUB_SHA', _get_git_revision_hash()),
'run_timestamp': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
'note': args.note,
'dataset': args.dataset
}

def upload_query_info(run_num):

def get_query_info():
results = []
for path in os.scandir(benchmark_log_dir):
if path.is_dir():
for filename in os.listdir(path):
if 'log' not in filename:
continue
queryBenchmark = QueryBenchmark(os.path.join(path, filename), path.name)
queryBenchmark.insert_db(run_num)


def upload_benchmark_result(run_num):
upload_run_info()
upload_query_info(run_num)
query_benchmark = QueryBenchmark(
os.path.join(path, filename), path.name)
results.append(query_benchmark.to_json_dict())
return results


def upload_benchmark_result():
run = get_run_info()
queries = get_query_info()
run['queries'] = queries

response = requests.post(
benchmark_server_url, json=run, headers={
'Content-Type': 'application/json; charset=utf-8',
'Authorization': 'Bearer ' + jwt_token
}
)
if response.status_code != 200:
logging.error(
"An error has occurred while uploading benchmark result!")
sys.exit(1)


if __name__ == '__main__':
if not os.path.exists(benchmark_server_dir):
logging.error("Benchmark Server Dir not found! PATH: " + benchmark_server_dir)
sys.exit(1)

args = parse_args()
run_num = get_run_num()
benchmark_log_dir = benchmark_log_dir + "/run" + str(run_num)
if not os.path.exists(benchmark_log_dir):
os.mkdir(benchmark_log_dir)
benchmark_log_dir = benchmark_log_dir
benchmark_files = benchmark_files + '/' + args.dataset
dataset_path = datasets_path[args.dataset + '-ku']

logging.getLogger().setLevel(logging.INFO)
logging.info("Running benchmark for dataset %s", args.dataset)
logging.info("Database version: %s", _get_kuzu_version())
logging.info("CPU cores: %d", cpu_count)
logging.info("Using %s threads", args.thread)
logging.info("Total memory: %d GiB", max_memory / 1024 ** 3)
logging.info("bm-size: %d MiB", bm_size)

# serialize dataset
serialize_dataset(args.dataset + '-ku')

# load benchmark
benchmark_group = BenchmarkGroup(benchmark_files)
benchmark_group.load()

logging.info("Running benchmark...")
run_kuzu(serialized_graphs_path[args.dataset + '-ku'])
logging.info("Benchmark finished")

if is_dry_run:
logging.info("Dry run, skipping upload")
sys.exit(0)

# upload benchmark result and logs
upload_benchmark_result(run_num)
logging.info("Uploading benchmark result...")
upload_benchmark_result()
logging.info("Benchmark result uploaded")
(Diffs for the remaining 5 changed files are not shown here.)
