Commit

Switch to Arrow node reader
printfCalvin committed Jan 3, 2023
1 parent c49cb1c commit 4f84fce
Showing 35 changed files with 50,860 additions and 329 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/ci-workflow.yml
@@ -53,6 +53,15 @@ jobs:
steps:
- uses: actions/checkout@v2
- run: sudo apt install -y python3-pip && sudo apt install -y sqlite3
- run: >
sudo apt-get update &&
sudo apt-get install -y ca-certificates lsb-release wget &&
wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb &&
sudo apt-get install -y ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb &&
rm ./apache-arrow* &&
sudo apt-get update &&
sudo apt-get install -y libarrow-dev libparquet-dev
- run: pip3 install -r tools/python_api/requirements_dev.txt

- name: build
1 change: 1 addition & 0 deletions .gitignore
@@ -12,3 +12,4 @@ build/
__pycache__/
*.py[cod]
*$py.class
cmake-build-debug/
5 changes: 5 additions & 0 deletions CMakeLists.txt
@@ -4,6 +4,9 @@ project(Kuzu VERSION 0.0.1 LANGUAGES CXX)

find_package(Threads REQUIRED)

set(CMAKE_FIND_PACKAGE_RESOLVE_SYMLINKS TRUE)
find_package(Arrow REQUIRED)
find_package(Parquet REQUIRED)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
@@ -69,6 +72,8 @@ include_directories(third_party/spdlog)
include_directories(third_party/nlohmann_json)
include_directories(third_party/utf8proc/include)
include_directories(third_party/pybind11/include)
include_directories(${ARROW_INCLUDE_DIR})
include_directories(${PARQUET_INCLUDE_DIR})

add_subdirectory(third_party)
add_subdirectory(src)
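With both packages required at configure time, the build now fails fast if the Arrow development files are missing. A minimal sanity check (an illustrative sketch, not part of this commit) that the headers and libraries located by `find_package(Arrow)` are usable from C++20 code:

#include <arrow/config.h>  // arrow::GetBuildInfo(); found via ${ARROW_INCLUDE_DIR}
#include <iostream>

int main() {
    // Prints the version of the Arrow library that CMake resolved.
    std::cout << "Arrow " << arrow::GetBuildInfo().version_string << std::endl;
    return 0;
}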
2 changes: 1 addition & 1 deletion README.md
@@ -17,7 +17,7 @@ Kùzu is an in-process property graph database management system (GDBMS) built f
Kùzu is being actively developed at the University of Waterloo as a feature-rich and usable GDBMS. Kùzu is available under a permissive license. So try it out and help us make it better! We welcome your feedback and feature requests.

## Build
To build from source code, Kùzu requires CMake (>=3.11), Python 3, and a compiler that supports `C++20`.
To build from source code, Kùzu requires CMake (>=3.11), Python 3, [Apache Arrow](https://arrow.apache.org/), [xsimd](https://xsimd.readthedocs.io/en/latest/), and a compiler that supports `C++20`.
- Perform a full clean build
- `make clean && make`
- Run tests (optional)
2 changes: 1 addition & 1 deletion dataset/copy-csv-node-property-test/vPerson.csv
@@ -6,7 +6,7 @@
5,"qifidjufri"
6,"gqpnpbdmrb"
7,"dgzbiqjkaz"
8,"ebf,,uq\buqma""
8,"ebf,,uq\buqma"
9,"rwhnybogfy"
10,"enqpnymvdb"
11,"axgwwhhohf"
1 change: 1 addition & 0 deletions dataset/copy-test/node/arrow/copy_csv.cypher
@@ -0,0 +1 @@
COPY tableOfTypes FROM "dataset/copy-test/node/arrow/types_50k.arrow" (HEADER=true);
1 change: 1 addition & 0 deletions dataset/copy-test/node/arrow/schema.cypher
@@ -0,0 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));
Binary file added dataset/copy-test/node/arrow/types_50k.arrow
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/copy-test/node/csv/copy_csv.cypher
@@ -0,0 +1 @@
COPY tableOfTypes FROM "dataset/copy-test/node/csv/types_50k.csv" (HEADER=true);
1 change: 1 addition & 0 deletions dataset/copy-test/node/csv/schema.cypher
@@ -0,0 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));
50,000 changes: 50,000 additions & 0 deletions dataset/copy-test/node/csv/types_50k.csv

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dataset/copy-test/node/parquet/copy_csv.cypher
@@ -0,0 +1 @@
COPY tableOfTypes FROM "dataset/copy-test/node/parquet/types_50k.parquet" (HEADER=true);
1 change: 1 addition & 0 deletions dataset/copy-test/node/parquet/schema.cypher
@@ -0,0 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));
Binary file added dataset/copy-test/node/parquet/types_50k.parquet
Binary file not shown.
16 changes: 8 additions & 8 deletions dataset/tinysnb/vPerson.csv
@@ -1,9 +1,9 @@
id,fname,Gender,ISStudent,isWorker,age,eyeSight,birthdate,registerTime,lastJobDuration,workedHours,usedNames,courseScoresPerTerm
0,Alice,1,true,false,35,5.0,1900-01-01,2011-08-20 11:25:30Z+00:00,3 years 2 days 13 hours 2 minutes,[10,5],["Aida"],[[10,8],[6,7,8]]
2,Bob,2,true,false,30,5.1,1900-01-01,2008-11-03 13:25:30.000526-02:00,10 years 5 months 13 hours 24 us,[12,8],[Bobby],[[8,9],[9,10]]
3,Carol,1,false,true,45,5.0,1940-06-22,1911-08-20 02:32:21,48 hours 24 minutes 11 seconds,[4,5],[Carmen,Fred],[[8,10]]
5,Dan,2,false,true,20,4.8,1950-7-23,2031-11-30 12:25:30Z,10 years 5 months 13 hours 24 us,[1,9],[Wolfeschlegelstein,Daniel],[[7,4],[8,8],[9]]
7,Elizabeth,1,false,true,20,4.7,1980-10-26,1976-12-23 11:21:42,48 hours 24 minutes 11 seconds,[2],[Ein],[[6],[7],[8]]
8,Farooq,2,true,false,25,4.5,1980-10-26,1972-07-31 13:22:30.678559,18 minutes 24 milliseconds,[3,4,5,6,7],[Fesdwe],[[8]]
9,Greg,2,false,false,40,4.9,1980-10-26,1976-12-23 11:21:42Z+06:40,10 years 5 months 13 hours 24 us,[1],[Grad],[[10]]
10,Hubert Blaine Wolfeschlegelsteinhausenbergerdorff,2,false,true,83,4.9,1990-11-27,2023-02-21 13:25:30,3 years 2 days 13 hours 2 minutes,[10,11,12,3,4,5,6,7],[Ad,De,Hi,Kye,Orlan],[[7],[10],[6,7]]
0,Alice,1,true,false,35,5.0,1900-01-01,2011-08-20 11:25:30Z+00:00,3 years 2 days 13 hours 2 minutes,"[10,5]","[Aida]","[[10,8],[6,7,8]]"
2,Bob,2,true,false,30,5.1,1900-01-01,2008-11-03 13:25:30.000526-02:00,10 years 5 months 13 hours 24 us,"[12,8]","[Bobby]","[[8,9],[9,10]]"
3,Carol,1,false,true,45,5.0,1940-06-22,1911-08-20 02:32:21,48 hours 24 minutes 11 seconds,"[4,5]","[Carmen,Fred]","[[8,10]]"
5,Dan,2,false,true,20,4.8,1950-7-23,2031-11-30 12:25:30Z,10 years 5 months 13 hours 24 us,"[1,9]","[Wolfeschlegelstein,Daniel]","[[7,4],[8,8],[9]]"
7,Elizabeth,1,false,true,20,4.7,1980-10-26,1976-12-23 11:21:42,48 hours 24 minutes 11 seconds,"[2]","[Ein]","[[6],[7],[8]]"
8,Farooq,2,true,false,25,4.5,1980-10-26,1972-07-31 13:22:30.678559,18 minutes 24 milliseconds,"[3,4,5,6,7]","[Fesdwe]","[[8]]"
9,Greg,2,false,false,40,4.9,1980-10-26,1976-12-23 11:21:42Z+06:40,10 years 5 months 13 hours 24 us,"[1]","[Grad]","[[10]]"
10,Hubert Blaine Wolfeschlegelsteinhausenbergerdorff,2,false,true,83,4.9,1990-11-27,2023-02-21 13:25:30,3 years 2 days 13 hours 2 minutes,"[10,11,12,3,4,5,6,7]","[Ad,De,Hi,Kye,Orlan]","[[7],[10],[6,7]]"
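The quoting change matters because Arrow's CSV parser, unlike the previous hand-rolled reader, splits every unquoted field on the delimiter, so a bare list such as [10,5] would arrive as two fields. Quoting keeps each list a single string cell that the copier can then parse into a Kùzu LIST. A small sketch of that behavior against Arrow's CSV reader (illustrative, assuming the system-installed Arrow from this commit's CI steps):

#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
#include <iostream>
#include <memory>

int main() {
    // Quoted, the bracketed list survives as one cell; unquoted, the comma
    // inside [10,5] would split it into two fields.
    auto buffer = arrow::Buffer::FromString("id,workedHours\n0,\"[10,5]\"\n");
    auto input = std::make_shared<arrow::io::BufferReader>(buffer);
    auto reader = arrow::csv::TableReader::Make(arrow::io::default_io_context(),
        input, arrow::csv::ReadOptions::Defaults(),
        arrow::csv::ParseOptions::Defaults(),
        arrow::csv::ConvertOptions::Defaults()).ValueOrDie();
    std::shared_ptr<arrow::Table> table = reader->Read().ValueOrDie();
    std::cout << table->ToString() << std::endl;  // workedHours holds "[10,5]"
    return 0;
}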
6 changes: 6 additions & 0 deletions scripts/dockerized-ci-tests-runner/Dockerfile
@@ -11,6 +11,12 @@ RUN apt-get install -y clang-format
RUN apt-get install -y clang-13
RUN apt-get install -y nodejs
RUN apt-get install -y jq
RUN apt-get install -y ca-certificates lsb-release wget
RUN wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
RUN apt-get install -y ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
RUN rm ./apache-arrow*
RUN apt-get update
RUN apt-get install -y libarrow-dev libparquet-dev

# Install GitHub action runner
RUN mkdir /actions-runner
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
@@ -11,7 +11,7 @@ add_subdirectory(storage)
add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu PUBLIC antlr4_cypher antlr4_runtime utf8proc Threads::Threads)
target_link_libraries(kuzu PUBLIC antlr4_cypher antlr4_runtime utf8proc Threads::Threads Arrow::arrow_shared parquet_shared)
target_include_directories(
kuzu PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
37 changes: 27 additions & 10 deletions src/common/task_system/task_scheduler.cpp
@@ -29,22 +29,32 @@ shared_ptr<ScheduledTask> TaskScheduler::scheduleTask(const shared_ptr<Task>& ta
return scheduledTask;
}

void TaskScheduler::errorIfThereIsAnException() {
lock_t lck{mtx};
errorIfThereIsAnExceptionNoLock();
lck.unlock();
}

void TaskScheduler::errorIfThereIsAnExceptionNoLock() {
for (auto it = taskQueue.begin(); it != taskQueue.end(); ++it) {
auto task = (*it)->task;
if (task->hasException()) {
taskQueue.erase(it);
std::rethrow_exception(task->getExceptionPtr());
}
// TODO(Semih): We can optimize to stop after finding a registrable task. This is
// because tasks after the first registrable task in the queue cannot have any thread
// yet registered to them, so they cannot have errored.
}
}

void TaskScheduler::waitAllTasksToCompleteOrError() {
while (true) {
lock_t lck{mtx};
if (taskQueue.empty()) {
return;
}
for (auto it = taskQueue.begin(); it != taskQueue.end(); ++it) {
auto task = (*it)->task;
if (task->hasException()) {
taskQueue.erase(it);
std::rethrow_exception(task->getExceptionPtr());
}
// TODO(Semih): We can optimize to stop after finding a registrable task. This is
// because tasks after the first registrable task in the queue cannot have any thread
// yet registered to them, so they cannot have errored.
}
errorIfThereIsAnExceptionNoLock();
lck.unlock();
this_thread::sleep_for(chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS));
}
@@ -64,6 +74,13 @@ void TaskScheduler::scheduleTaskAndWaitOrError(const shared_ptr<Task>& task) {
}
}

void TaskScheduler::waitUntilEnoughTasksFinish(int64_t minimumNumTasksToScheduleMore) {
while (getNumTasks() > minimumNumTasksToScheduleMore) {
errorIfThereIsAnException();
this_thread::sleep_for(chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS));
}
}

shared_ptr<ScheduledTask> TaskScheduler::getTaskAndRegister() {
lock_t lck{mtx};
if (taskQueue.empty()) {
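Taken together, these changes give the copier backpressure: waitUntilEnoughTasksFinish blocks while more than minimumNumTasksToScheduleMore tasks are queued, re-raising any exception from an already-scheduled task on each poll. A sketch of the intended producer loop (illustrative; makeBlockCopyTask is a hypothetical stand-in for the copier's task factory, and the two constants are the ones added to configs.h below):

void scheduleAllBlocks(TaskScheduler& taskScheduler, uint64_t numBlocks) {
    for (uint64_t blockId = 0; blockId < numBlocks; ++blockId) {
        taskScheduler.scheduleTask(makeBlockCopyTask(blockId));  // hypothetical factory
        // After every batch of 200 scheduled tasks, block until fewer than 50
        // remain, so the queue stays bounded and task errors surface early.
        if ((blockId + 1) % CopyCSVConfig::NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH == 0) {
            taskScheduler.waitUntilEnoughTasksFinish(
                CopyCSVConfig::MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE);
        }
    }
    taskScheduler.waitAllTasksToCompleteOrError();  // rethrows the first task error
}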
6 changes: 6 additions & 0 deletions src/include/common/configs.h
@@ -87,6 +87,12 @@ struct CopyCSVConfig {
// Size (in bytes) of the chunks to be read in InMemNode/RelCSVCopier
static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23;

// Number of tasks to be assigned in a batch when reading files.
static constexpr uint64_t NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH = 200;

// Lower bound for number of incomplete tasks in copier to trigger scheduling a new batch.
static constexpr uint64_t MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE = 50;

// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
9 changes: 9 additions & 0 deletions src/include/common/task_system/task_scheduler.h
@@ -76,11 +76,20 @@ class TaskScheduler {
// from the task queue and remain in the queue. So for now, use this function if you
// want the system to crash if any of the tasks fails.
void waitAllTasksToCompleteOrError();

void waitUntilEnoughTasksFinish(int64_t minimumNumTasksToScheduleMore);

// Checks if there is an erroring task in the queue and if so, errors.
void errorIfThereIsAnException();

bool isTaskQueueEmpty() { return taskQueue.empty(); }
uint64_t getNumTasks() { return taskQueue.size(); }

private:
void removeErroringTask(uint64_t scheduledTaskID);

void errorIfThereIsAnExceptionNoLock();

// Functions to launch worker threads and for the worker threads to use to grab task from queue.
void runWorkerThread();
shared_ptr<ScheduledTask> getTaskAndRegister();
105 changes: 105 additions & 0 deletions src/include/storage/in_mem_csv_copier/in_mem_arrow_node_copier.h
@@ -0,0 +1,105 @@
#pragma once

#include "in_mem_structures_copier.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"
#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/reader.h>
#include <arrow/pretty_print.h>
#include <arrow/result.h>
#include <arrow/scalar.h>
#include <arrow/status.h>
#include <arrow/table.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>

namespace kuzu {
namespace storage {

class InMemArrowNodeCopier : public InMemStructuresCopier {

public:
InMemArrowNodeCopier(CSVDescription& csvDescription, string outputDirectory,
TaskScheduler& taskScheduler, Catalog& catalog, table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs);

~InMemArrowNodeCopier() override = default;

uint64_t copy();

void saveToFile() override;

enum class FileTypes { CSV, ARROW, PARQUET };

static std::string getFileTypeName(FileTypes fileTypes);

static std::string getFileTypeSuffix(FileTypes fileTypes);

private:
void setFileType(std::string const& fileName);

void countNumLines(const std::string& filePath);

arrow::Status countNumLinesCSV(std::string const& filePath);

arrow::Status countNumLinesArrow(std::string const& filePath);

arrow::Status countNumLinesParquet(std::string const& filePath);

void initializeColumnsAndList();

template<typename T>
arrow::Status populateColumns();

template<typename T>
arrow::Status populateColumnsFromCSV(unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
arrow::Status populateColumnsFromArrow(unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
arrow::Status populateColumnsFromParquet(unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename T>
static void putPropsOfLineIntoColumns(vector<unique_ptr<InMemColumn>>& columns,
vector<PageByteCursor>& overflowCursors, const std::vector<shared_ptr<T>>& arrow_columns,
uint64_t nodeOffset, uint64_t bufferOffset, char delimiter);

template<typename T>
static void populatePKIndex(InMemColumn* column, HashIndexBuilder<T>* pkIndex,
node_offset_t startOffset, uint64_t numValues);

// Concurrent tasks.
// Note that primaryKeyPropertyIdx is *NOT* the property ID of the primary key property.
// Instead, it is the index in the structured columns at which we expect it to appear.

template<typename T1, typename T2>
static arrow::Status batchPopulateColumnsTask(uint64_t primaryKeyPropertyIdx, uint64_t blockId,
uint64_t offsetStart, HashIndexBuilder<T1>* pkIndex, InMemArrowNodeCopier* copier,
const vector<shared_ptr<T2>>& batchColumns, char delimiter);

arrow::Status initCSVReader(
shared_ptr<arrow::csv::StreamingReader>& csv_streaming_reader, const std::string& filePath);

arrow::Status initArrowReader(std::shared_ptr<arrow::ipc::RecordBatchFileReader>& ipc_reader,
const std::string& filePath);

arrow::Status initParquetReader(
std::unique_ptr<parquet::arrow::FileReader>& reader, const std::string& filePath);

static Literal getArrowList(
string& l, int64_t from, int64_t to, const DataType& dataType, char delimiter);

private:
NodeTableSchema* nodeTableSchema;
uint64_t numNodes;
vector<unique_ptr<InMemColumn>> structuredColumns;
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
FileTypes inputFileType;
};

} // namespace storage
} // namespace kuzu
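The header declares one reader-initialization path per input format. Plausible bodies for the Arrow IPC and Parquet cases, using stock Arrow APIs (assumed implementations; the corresponding .cpp file is not shown in this excerpt):

#include <arrow/io/file.h>
#include <arrow/ipc/reader.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <parquet/arrow/reader.h>
#include <memory>
#include <string>

arrow::Status initArrowReader(std::shared_ptr<arrow::ipc::RecordBatchFileReader>& ipc_reader,
    const std::string& filePath) {
    // Open the IPC file and build a random-access reader over its record batches.
    ARROW_ASSIGN_OR_RAISE(auto file, arrow::io::ReadableFile::Open(filePath));
    ARROW_ASSIGN_OR_RAISE(ipc_reader, arrow::ipc::RecordBatchFileReader::Open(file));
    return arrow::Status::OK();
}

arrow::Status initParquetReader(std::unique_ptr<parquet::arrow::FileReader>& reader,
    const std::string& filePath) {
    // parquet::arrow::FileReader wraps the low-level Parquet reader and
    // exposes the file's row groups as Arrow tables/batches.
    ARROW_ASSIGN_OR_RAISE(auto file, arrow::io::ReadableFile::Open(filePath));
    return parquet::arrow::OpenFile(file, arrow::default_memory_pool(), &reader);
}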
55 changes: 0 additions & 55 deletions src/include/storage/in_mem_csv_copier/in_mem_node_csv_copier.h

This file was deleted.

4 changes: 2 additions & 2 deletions src/include/storage/in_mem_csv_copier/in_mem_rel_csv_copier.h
@@ -1,6 +1,6 @@
#pragma once

#include "in_mem_structures_csv_copier.h"
#include "in_mem_structures_copier.h"
#include "storage/index/hash_index.h"
#include "storage/store/rels_statistics.h"

@@ -13,7 +13,7 @@ using table_adj_in_mem_lists_map_t = unordered_map<table_id_t, unique_ptr<InMemA
using table_property_in_mem_columns_map_t =
unordered_map<table_id_t, vector<unique_ptr<InMemColumn>>>;

class InMemRelCSVCopier : public InMemStructuresCSVCopier {
class InMemRelCSVCopier : public InMemStructuresCopier {

public:
InMemRelCSVCopier(CSVDescription& csvDescription, string outputDirectory,