Skip to content

Commit

Permalink
Switch to arrow node reader
Browse files Browse the repository at this point in the history
  • Loading branch information
printfCalvin committed Jan 3, 2023
1 parent 881b0ef commit 035c295
Show file tree
Hide file tree
Showing 53 changed files with 51,822 additions and 16 deletions.
Binary file added .DS_Store
Binary file not shown.
9 changes: 9 additions & 0 deletions .github/workflows/ci-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ jobs:
steps:
- uses: actions/checkout@v2
- run: sudo apt install -y python3-pip && sudo apt install -y sqlite3
- run: >
sudo apt-get update &&
sudo apt-get install -y ca-certificates lsb-release wget &&
wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb &&
sudo apt-get install -y ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb &&
rm ./apache-arrow* &&
sudo apt-get update &&
sudo apt-get install -y libarrow-dev libparquet-dev
- run: pip3 install -r tools/python_api/requirements_dev.txt

- name: build
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ build/
__pycache__/
*.py[cod]
*$py.class
cmake-build-debug/
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@ Kùzu is an in-process property graph database management system (GDBMS) built f
Kùzu is being actively developed at University of Waterloo as a feature-rich and usable GDBMS. Kùzu is available under a permissible license. So try it out and help us make it better! We welcome your feedback and feature requests.

## Build
<<<<<<< Updated upstream
<<<<<<< Updated upstream
To build from source code, Kùzu requires Cmake(>=3.11), Python 3, and a compiler that supports `C++20`.
=======
To build from source code, Kùzu requires Cmake(>=3.11), Python 3, [Apache Arrow](https://arrow.apache.org/), [xsimd](https://xsimd.readthedocs.io/en/latest/), and a compiler that supports `C++20`.
>>>>>>> Stashed changes
=======
To build from source code, Kùzu requires Cmake(>=3.11), Python 3, [Apache Arrow](https://arrow.apache.org/), [xsimd](https://xsimd.readthedocs.io/en/latest/), and a compiler that supports `C++20`.
>>>>>>> Stashed changes
- Perform a full clean build
- `make clean && make`
- Run tests (optional)
Expand Down
8 changes: 8 additions & 0 deletions dataset/copy-csv-node-property-test/vPerson.csv
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@
5,"qifidjufri"
6,"gqpnpbdmrb"
7,"dgzbiqjkaz"
<<<<<<< Updated upstream
<<<<<<< Updated upstream
8,"ebf,,uq\buqma""
=======
8,"ebf,,uq\buqma"
>>>>>>> Stashed changes
=======
8,"ebf,,uq\buqma"
>>>>>>> Stashed changes
9,"rwhnybogfy"
10,"enqpnymvdb"
11,"axgwwhhohf"
Expand Down
1 change: 1 addition & 0 deletions dataset/copy-test/node/arrow/copy_csv.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY tableOfTypes FROM "dataset/copy-test/node/arrow/types_50k.arrow" (HEADER=true);
1 change: 1 addition & 0 deletions dataset/copy-test/node/arrow/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));
Binary file added dataset/copy-test/node/arrow/types_50k.arrow
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/copy-test/node/csv/copy_csv.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY tableOfTypes FROM "dataset/copy-test/node/csv/types_50k.csv" (HEADER=true);
1 change: 1 addition & 0 deletions dataset/copy-test/node/csv/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));
50,000 changes: 50,000 additions & 0 deletions dataset/copy-test/node/csv/types_50k.csv

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dataset/copy-test/node/parquet/copy_csv.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY tableOfTypes FROM "dataset/copy-test/node/parquet/types_50k.parquet" (HEADER=true);
1 change: 1 addition & 0 deletions dataset/copy-test/node/parquet/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, PRIMARY KEY (id));
Binary file added dataset/copy-test/node/parquet/types_50k.parquet
Binary file not shown.
17 changes: 17 additions & 0 deletions dataset/tinysnb/vPerson.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
id,fname,Gender,ISStudent,isWorker,age,eyeSight,birthdate,registerTime,lastJobDuration,workedHours,usedNames,courseScoresPerTerm
<<<<<<< Updated upstream
<<<<<<< Updated upstream
0,Alice,1,true,false,35,5.0,1900-01-01,2011-08-20 11:25:30Z+00:00,3 years 2 days 13 hours 2 minutes,[10,5],["Aida"],[[10,8],[6,7,8]]
2,Bob,2,true,false,30,5.1,1900-01-01,2008-11-03 13:25:30.000526-02:00,10 years 5 months 13 hours 24 us,[12,8],[Bobby],[[8,9],[9,10]]
3,Carol,1,false,true,45,5.0,1940-06-22,1911-08-20 02:32:21,48 hours 24 minutes 11 seconds,[4,5],[Carmen,Fred],[[8,10]]
Expand All @@ -7,3 +9,18 @@ id,fname,Gender,ISStudent,isWorker,age,eyeSight,birthdate,registerTime,lastJobDu
8,Farooq,2,true,false,25,4.5,1980-10-26,1972-07-31 13:22:30.678559,18 minutes 24 milliseconds,[3,4,5,6,7],[Fesdwe],[[8]]
9,Greg,2,false,false,40,4.9,1980-10-26,1976-12-23 11:21:42Z+06:40,10 years 5 months 13 hours 24 us,[1],[Grad],[[10]]
10,Hubert Blaine Wolfeschlegelsteinhausenbergerdorff,2,false,true,83,4.9,1990-11-27,2023-02-21 13:25:30,3 years 2 days 13 hours 2 minutes,[10,11,12,3,4,5,6,7],[Ad,De,Hi,Kye,Orlan],[[7],[10],[6,7]]
=======
=======
>>>>>>> Stashed changes
0,Alice,1,true,false,35,5.0,1900-01-01,2011-08-20 11:25:30Z+00:00,3 years 2 days 13 hours 2 minutes,"[10,5]","[Aida]","[[10,8],[6,7,8]]"
2,Bob,2,true,false,30,5.1,1900-01-01,2008-11-03 13:25:30.000526-02:00,10 years 5 months 13 hours 24 us,"[12,8]","[Bobby]","[[8,9],[9,10]]"
3,Carol,1,false,true,45,5.0,1940-06-22,1911-08-20 02:32:21,48 hours 24 minutes 11 seconds,"[4,5]","[Carmen,Fred]","[[8,10]]"
5,Dan,2,false,true,20,4.8,1950-7-23,2031-11-30 12:25:30Z,10 years 5 months 13 hours 24 us,"[1,9]","[Wolfeschlegelstein,Daniel]","[[7,4],[8,8],[9]]"
7,Elizabeth,1,false,true,20,4.7,1980-10-26,1976-12-23 11:21:42,48 hours 24 minutes 11 seconds,"[2]","[Ein]","[[6],[7],[8]]"
8,Farooq,2,true,false,25,4.5,1980-10-26,1972-07-31 13:22:30.678559,18 minutes 24 milliseconds,"[3,4,5,6,7]","[Fesdwe]","[[8]]"
9,Greg,2,false,false,40,4.9,1980-10-26,1976-12-23 11:21:42Z+06:40,10 years 5 months 13 hours 24 us,"[1]","[Grad]","[[10]]"
10,Hubert Blaine Wolfeschlegelsteinhausenbergerdorff,2,false,true,83,4.9,1990-11-27,2023-02-21 13:25:30,3 years 2 days 13 hours 2 minutes,"[10,11,12,3,4,5,6,7]","[Ad,De,Hi,Kye,Orlan]","[[7],[10],[6,7]]"
<<<<<<< Updated upstream
>>>>>>> Stashed changes
=======
>>>>>>> Stashed changes
6 changes: 6 additions & 0 deletions scripts/dockerized-ci-tests-runner/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ RUN apt-get install -y clang-format
RUN apt-get install -y clang-13
RUN apt-get install -y nodejs
RUN apt-get install -y jq
RUN apt-get install -y ca-certificates lsb-release wget
RUN wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
RUN apt-get install -y ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb
RUN rm ./apache-arrow*
RUN apt-get update
RUN apt-get install -y libarrow-dev libparquet-dev

# Install GitHub action runner
RUN mkdir /actions-runner
Expand Down
6 changes: 6 additions & 0 deletions src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
#include "binder/bind/bound_drop_table.h"
#include "binder/binder.h"
#include "parser/ddl/create_node_clause.h"
<<<<<<< Updated upstream
<<<<<<< Updated upstream
#include "parser/ddl/create_rel_clause.h"
=======
>>>>>>> Stashed changes
=======
>>>>>>> Stashed changes
#include "parser/ddl/drop_table.h"

namespace kuzu {
Expand Down
37 changes: 37 additions & 0 deletions src/binder/bind/bind_updating_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,15 @@ unique_ptr<BoundCreateRel> Binder::bindCreateRel(
} else {
auto propertyExpression =
expressionBinder.bindRelPropertyExpression(rel, property.name);
<<<<<<< Updated upstream
<<<<<<< Updated upstream
auto nullExpression = expressionBinder.bindNullLiteralExpression();
=======
=======
>>>>>>> Stashed changes
shared_ptr<Expression> nullExpression =
LiteralExpression::createNullLiteralExpression(getUniqueExpressionName("NULL"));
>>>>>>> Stashed changes
nullExpression = ExpressionBinder::implicitCastIfNecessary(
nullExpression, propertyExpression->dataType);
setItems.emplace_back(std::move(propertyExpression), std::move(nullExpression));
Expand All @@ -108,6 +116,7 @@ unique_ptr<BoundUpdatingClause> Binder::bindSetClause(const UpdatingClause& upda
auto boundSetClause = make_unique<BoundSetClause>();
for (auto i = 0u; i < setClause.getNumSetItems(); ++i) {
auto setItem = setClause.getSetItem(i);
<<<<<<< Updated upstream
auto nodeOrRel = expressionBinder.bindExpression(*setItem.first->getChild(0));
switch (nodeOrRel->dataType.typeID) {
case DataTypeID::NODE: {
Expand All @@ -121,6 +130,21 @@ unique_ptr<BoundUpdatingClause> Binder::bindSetClause(const UpdatingClause& upda
default:
throw BinderException("Set " + expressionTypeToString(nodeOrRel->expressionType) +
" property is supported.");
=======
auto boundLhs = expressionBinder.bindExpression(*setItem->origin);
auto boundNodeOrRel = boundLhs->getChild(0);
if (boundNodeOrRel->dataType.typeID != NODE) {
throw BinderException("Set " + Types::dataTypeToString(boundNodeOrRel->dataType) +
" property is supported.");
}
auto boundNode = static_pointer_cast<NodeExpression>(boundNodeOrRel);
if (boundNode->isMultiLabeled()) {
throw BinderException("Set property of node " + boundNode->getRawName() +
" with multiple node labels is not supported.");
<<<<<<< Updated upstream
>>>>>>> Stashed changes
=======
>>>>>>> Stashed changes
}
}
return boundSetClause;
Expand Down Expand Up @@ -156,6 +180,7 @@ unique_ptr<BoundUpdatingClause> Binder::bindDeleteClause(const UpdatingClause& u
auto& deleteClause = (DeleteClause&)updatingClause;
auto boundDeleteClause = make_unique<BoundDeleteClause>();
for (auto i = 0u; i < deleteClause.getNumExpressions(); ++i) {
<<<<<<< Updated upstream
auto nodeOrRel = expressionBinder.bindExpression(*deleteClause.getExpression(i));
switch (nodeOrRel->dataType.typeID) {
case DataTypeID::NODE: {
Expand All @@ -168,6 +193,18 @@ unique_ptr<BoundUpdatingClause> Binder::bindDeleteClause(const UpdatingClause& u
} break;
default:
throw BinderException("Delete " + expressionTypeToString(nodeOrRel->expressionType) +
=======
auto boundExpression = expressionBinder.bindExpression(*deleteClause.getExpression(i));
if (boundExpression->dataType.typeID == NODE) {
auto deleteNode = bindDeleteNode(static_pointer_cast<NodeExpression>(boundExpression));
boundDeleteClause->addDeleteNode(std::move(deleteNode));
} else if (boundExpression->dataType.typeID == REL) {
auto deleteRel = bindDeleteRel(static_pointer_cast<RelExpression>(boundExpression));
boundDeleteClause->addDeleteRel(std::move(deleteRel));
} else {
throw BinderException("Delete " +
expressionTypeToString(boundExpression->expressionType) +
>>>>>>> Stashed changes
" is not supported.");
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/binder/expression_binder.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
#include "binder/expression_binder.h"

#include "binder/binder.h"
<<<<<<< Updated upstream
<<<<<<< Updated upstream
#include "binder/expression/case_expression.h"
=======
>>>>>>> Stashed changes
=======
>>>>>>> Stashed changes
#include "binder/expression/existential_subquery_expression.h"
#include "binder/expression/function_expression.h"
#include "binder/expression/literal_expression.h"
Expand All @@ -10,7 +16,13 @@
#include "common/type_utils.h"
#include "function/boolean/vector_boolean_operations.h"
#include "function/null/vector_null_operations.h"
<<<<<<< Updated upstream
<<<<<<< Updated upstream
#include "parser/expression/parsed_case_expression.h"
=======
>>>>>>> Stashed changes
=======
>>>>>>> Stashed changes
#include "parser/expression/parsed_function_expression.h"
#include "parser/expression/parsed_literal_expression.h"
#include "parser/expression/parsed_parameter_expression.h"
Expand Down Expand Up @@ -44,8 +56,14 @@ shared_ptr<Expression> ExpressionBinder::bindExpression(const ParsedExpression&
expression = bindVariableExpression(parsedExpression);
} else if (EXISTENTIAL_SUBQUERY == expressionType) {
expression = bindExistentialSubqueryExpression(parsedExpression);
<<<<<<< Updated upstream
<<<<<<< Updated upstream
} else if (CASE_ELSE == expressionType) {
expression = bindCaseExpression(parsedExpression);
=======
>>>>>>> Stashed changes
=======
>>>>>>> Stashed changes
} else {
throw NotImplementedException(
"bindExpression(" + expressionTypeToString(expressionType) + ").");
Expand Down
46 changes: 36 additions & 10 deletions src/common/task_system/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,32 @@ shared_ptr<ScheduledTask> TaskScheduler::scheduleTask(const shared_ptr<Task>& ta
return scheduledTask;
}

void TaskScheduler::errorIfThereIsAnException() {
lock_t lck{mtx};
errorIfThereIsAnExceptionNoLock();
lck.unlock();
}

void TaskScheduler::errorIfThereIsAnExceptionNoLock() {
for (auto it = taskQueue.begin(); it != taskQueue.end(); ++it) {
auto task = (*it)->task;
if (task->hasException()) {
taskQueue.erase(it);
std::rethrow_exception(task->getExceptionPtr());
}
// TODO(Semih): We can optimize to stop after finding a registrable task. This is
// because tasks after the first registrable task in the queue cannot have any thread
// yet registered to them, so they cannot have errored.
}
}

void TaskScheduler::waitAllTasksToCompleteOrError() {
while (true) {
lock_t lck{mtx};
if (taskQueue.empty()) {
return;
}
for (auto it = taskQueue.begin(); it != taskQueue.end(); ++it) {
auto task = (*it)->task;
if (task->hasException()) {
taskQueue.erase(it);
std::rethrow_exception(task->getExceptionPtr());
}
// TODO(Semih): We can optimize to stop after finding a registrable task. This is
// because tasks after the first registrable task in the queue cannot have any thread
// yet registered to them, so they cannot have errored.
}
errorIfThereIsAnExceptionNoLock();
lck.unlock();
this_thread::sleep_for(chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS));
}
Expand All @@ -62,6 +72,22 @@ void TaskScheduler::scheduleTaskAndWaitOrError(const shared_ptr<Task>& task) {
removeErroringTask(scheduledTask->ID);
std::rethrow_exception(task->getExceptionPtr());
}
<<<<<<< Updated upstream
<<<<<<< Updated upstream
=======
=======
>>>>>>> Stashed changes
}

void TaskScheduler::waitUntilEnoughTasksFinish(int64_t minimumNumTasksToScheduleMore) {
while (getNumTasks() > minimumNumTasksToScheduleMore) {
errorIfThereIsAnException();
this_thread::sleep_for(chrono::microseconds(THREAD_SLEEP_TIME_WHEN_WAITING_IN_MICROS));
}
<<<<<<< Updated upstream
>>>>>>> Stashed changes
=======
>>>>>>> Stashed changes
}

shared_ptr<ScheduledTask> TaskScheduler::getTaskAndRegister() {
Expand Down
12 changes: 12 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,20 @@ ValueVector::ValueVector(DataType dataType, MemoryManager* memoryManager)

void ValueVector::addString(uint32_t pos, char* value, uint64_t len) const {
assert(dataType.typeID == STRING);
<<<<<<< Updated upstream
<<<<<<< Updated upstream
auto& entry = ((ku_string_t*)getData())[pos];
InMemOverflowBufferUtils::copyString(value, len, entry, *inMemOverflowBuffer);
=======
auto vectorData = (ku_string_t*)valueBuffer.get();
auto& result = vectorData[pos];
InMemOverflowBufferUtils::copyString(value, len, result, *inMemOverflowBuffer);
>>>>>>> Stashed changes
=======
auto vectorData = (ku_string_t*)valueBuffer.get();
auto& result = vectorData[pos];
InMemOverflowBufferUtils::copyString(value, len, result, *inMemOverflowBuffer);
>>>>>>> Stashed changes
}

bool NodeIDVector::discardNull(ValueVector& vector) {
Expand Down
7 changes: 7 additions & 0 deletions src/expression_evaluator/function_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ void FunctionExpressionEvaluator::init(const ResultSet& resultSet, MemoryManager
if (expression->dataType.typeID == BOOL) {
selectFunc = ((ScalarFunctionExpression&)*expression).selectFunc;
}
<<<<<<< Updated upstream
=======
resultVector = make_shared<ValueVector>(expression->dataType, memoryManager);
if (children.empty()) { // const function, e.g. PI()
resultVector->state = DataChunkState::getSingleValueDataChunkState();
}
>>>>>>> Stashed changes
for (auto& child : children) {
parameters.push_back(child->resultVector);
}
Expand Down
10 changes: 10 additions & 0 deletions src/expression_evaluator/literal_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,15 @@ void LiteralExpressionEvaluator::resolveResultVector(
resultVector->state = DataChunkState::getSingleValueDataChunkState();
}

<<<<<<< Updated upstream
=======
bool LiteralExpressionEvaluator::select(SelectionVector& selVector) {
assert(resultVector->dataType.typeID == BOOL);
auto pos = resultVector->state->selVector->selectedPositions[0];
assert(pos == 0u);
return resultVector->getValue<bool>(pos) && (!resultVector->isNull(pos));
}

>>>>>>> Stashed changes
} // namespace evaluator
} // namespace kuzu
12 changes: 12 additions & 0 deletions src/expression_evaluator/reference_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@ namespace evaluator {
inline static bool isTrue(ValueVector& vector, uint64_t pos) {
assert(vector.dataType.typeID == BOOL);
return !vector.isNull(pos) && vector.getValue<bool>(pos);
<<<<<<< Updated upstream
<<<<<<< Updated upstream
=======
=======
>>>>>>> Stashed changes
}

void ReferenceExpressionEvaluator::init(const ResultSet& resultSet, MemoryManager* memoryManager) {
assert(children.empty());
resultVector =
resultSet.dataChunks[vectorPos.dataChunkPos]->valueVectors[vectorPos.valueVectorPos];
>>>>>>> Stashed changes
}

bool ReferenceExpressionEvaluator::select(SelectionVector& selVector) {
Expand Down
Loading

0 comments on commit 035c295

Please sign in to comment.