Skip to content

Commit

Permalink
Merge pull request #2246 from kuzudb/node-group
Browse files Browse the repository at this point in the history
Node-group-based rel table
  • Loading branch information
ray6080 committed Nov 2, 2023
2 parents 7b408f1 + b89f443 commit 2f30023
Show file tree
Hide file tree
Showing 279 changed files with 3,348 additions and 9,046 deletions.
64 changes: 1 addition & 63 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.12 LANGUAGES CXX C)
project(Kuzu VERSION 0.0.12.1 LANGUAGES CXX C)

find_package(Threads REQUIRED)

Expand Down Expand Up @@ -78,7 +78,6 @@ if(MSVC)
# Required for M_PI on Windows
add_compile_definitions(_USE_MATH_DEFINES)
add_compile_definitions(NOMINMAX)
add_compile_definitions(ARROW_STATIC PARQUET_STATIC)
add_compile_definitions(SERD_STATIC)
# TODO (bmwinger): Figure out if this can be set automatically by cmake,
# or at least better integrated with user-specified options
Expand Down Expand Up @@ -175,67 +174,6 @@ endfunction()
add_definitions(-DKUZU_ROOT_DIRECTORY="${PROJECT_SOURCE_DIR}")
add_definitions(-DKUZU_STORAGE_VERSION="${CMAKE_PROJECT_VERSION}")

if (${USE_SYSTEM_ARROW})
find_package(Arrow REQUIRED)
find_package(Parquet REQUIRED)
if (TARGET arrow_shared)
set(ARROW_LIB arrow_shared)
else()
set(ARROW_LIB arrow_static)
endif()
if (TARGET parquet_shared)
set(PARQUET_LIB parquet_shared)
else()
set(PARQUET_LIB parquet_static)
endif()
else()
if (NOT DEFINED ARROW_INSTALL)
message(STATUS "Configuring arrow for bundled install")
set(ARROW_INSTALL ${CMAKE_CURRENT_BINARY_DIR}/arrow-build/arrow/install)
set(ARROW_BUILD_DIR ${CMAKE_CURRENT_BINARY_DIR}/arrow-build)
file(MAKE_DIRECTORY ${ARROW_BUILD_DIR})
execute_process(COMMAND ${CMAKE_COMMAND}
${CMAKE_CURRENT_SOURCE_DIR}/external
-G ${CMAKE_GENERATOR}
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DENABLE_ADDRESS_SANITIZER=${ENABLE_ADDRESS_SANITIZER}
-DENABLE_THREAD_SANITIZER=${ENABLE_THREAD_SANITIZER}
-DENABLE_UBSAN=${ENABLE_UBSAN}
-DFORCE_COLORED_OUTPUT=${FORCE_COLORED_OUTPUT}
WORKING_DIRECTORY ${ARROW_BUILD_DIR}
RESULT_VARIABLE ARROW_CONFIGURE_RESULT)
if (ARROW_CONFIGURE_RESULT)
message(FATAL_ERROR "Failed to configure arrow")
endif()
execute_process(COMMAND ${CMAKE_COMMAND} --build ${ARROW_BUILD_DIR} --config ${CMAKE_BUILD_TYPE}
RESULT_VARIABLE ARROW_BUILD_RESULT)
if (ARROW_BUILD_RESULT)
message(FATAL_ERROR "Failed to build arrow")
endif()
else()
message(STATUS "Using arrow at path ${ARROW_INSTALL}")
endif()
find_library(ARROW_DEPS_PATH arrow_bundled_dependencies HINTS ${ARROW_INSTALL}/lib ${ARROW_INSTALL}/lib64)
if(WIN32)
find_library(PARQUET_PATH parquet_static HINTS ${ARROW_INSTALL}/lib ${ARROW_INSTALL}/lib64)
find_library(ARROW_PATH arrow_static HINTS ${ARROW_INSTALL}/lib ${ARROW_INSTALL}/lib64)
else()
find_library(PARQUET_PATH parquet HINTS ${ARROW_INSTALL}/lib ${ARROW_INSTALL}/lib64)
find_library(ARROW_PATH arrow HINTS ${ARROW_INSTALL}/lib ${ARROW_INSTALL}/lib64)
endif()

add_library(arrow_deps STATIC IMPORTED)
set_target_properties(arrow_deps PROPERTIES IMPORTED_LOCATION ${ARROW_DEPS_PATH})
add_library(parquet_lib STATIC IMPORTED)
set_target_properties(parquet_lib PROPERTIES IMPORTED_LOCATION ${PARQUET_PATH})
add_library(arrow_lib STATIC IMPORTED)
set_target_properties(arrow_lib PROPERTIES IMPORTED_LOCATION ${ARROW_PATH})
include_directories(${ARROW_INSTALL}/include)

set(ARROW_LIB arrow_lib arrow_deps)
set(PARQUET_LIB parquet_lib)
endif()

include_directories(src/include)
include_directories(third_party/antlr4_cypher/include)
include_directories(third_party/antlr4_runtime/src)
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: release debug test benchmark all alldebug arrow clean clean-external clean-all
.PHONY: release debug test benchmark all alldebug clean clean-all

GENERATOR=
FORCE_COLOR=
Expand Down Expand Up @@ -110,7 +110,7 @@ nodejstest: nodejs
cd $(ROOT_DIR)/tools/nodejs_api/ && \
npm test

javatest: arrow java
javatest: java
ifeq ($(OS),Windows_NT)
$(call mkdirp,tools/java_api/build/test) && cd tools/java_api/ && \
javac -d build/test -cp ".;build/kuzu_java.jar;third_party/junit-platform-console-standalone-1.9.3.jar" -sourcepath src/test/java/com/kuzudb/test/*.java && \
Expand Down
21 changes: 0 additions & 21 deletions external/CMakeLists.txt

This file was deleted.

36 changes: 0 additions & 36 deletions external/arrow/apache_arrow.cmake

This file was deleted.

4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu
PUBLIC antlr4_cypher antlr4_runtime fast_float utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor miniparquet)
PUBLIC antlr4_cypher antlr4_runtime fast_float utf8proc re2 serd Threads::Threads fastpfor miniparquet)
target_include_directories(kuzu
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})
if(NOT WIN32)
set_target_properties(kuzu_shared PROPERTIES OUTPUT_NAME kuzu)
endif()
target_link_libraries(kuzu_shared
PUBLIC antlr4_cypher antlr4_runtime fast_float utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor miniparquet)
PUBLIC antlr4_cypher antlr4_runtime fast_float utf8proc re2 serd Threads::Threads fastpfor miniparquet)
target_include_directories(kuzu_shared
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)

Expand Down
90 changes: 52 additions & 38 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ using namespace kuzu::parser;
namespace kuzu {
namespace binder {

static constexpr uint64_t NUM_COLUMNS_TO_SKIP_IN_REL_FILE = 2;

std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statement) {
auto& copyToStatement = (CopyTo&)statement;
auto boundFilePath = copyToStatement.getFilePath();
Expand Down Expand Up @@ -118,21 +120,19 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(
// For table with SERIAL columns, we need to read in serial from files.
auto containsSerial = bindContainsSerial(tableSchema);
auto columns = bindExpectedNodeFileColumns(tableSchema, *readerConfig);
auto nodeID =
createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INTERNAL_ID);
auto nodeID = createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INT64);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(nodeID), TableType::NODE);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, nullptr /* extraInfo */);
std::move(readerConfig), columns, std::move(nodeID), TableType::NODE);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(tableSchema,
std::move(boundFileScanInfo), containsSerial, std::move(columns), nullptr /* extraInfo */);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

std::unique_ptr<BoundStatement> Binder::bindCopyRdfNodeFrom(
std::unique_ptr<ReaderConfig> readerConfig, TableSchema* tableSchema) {
auto containsSerial = bindContainsSerial(tableSchema);
auto stringType = LogicalType{LogicalTypeID::STRING};
auto nodeID =
createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INTERNAL_ID);
auto nodeID = createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INT64);
expression_vector columns;
auto columnName = std::string(RDFKeyword::ANONYMOUS);
readerConfig->columnNames.push_back(columnName);
Expand All @@ -147,9 +147,9 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfNodeFrom(
std::make_unique<RdfReaderConfig>(RdfReaderMode::LITERAL, nullptr /* index */);
}
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(nodeID), TableType::NODE);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, nullptr /* extraInfo */);
std::move(readerConfig), columns, std::move(nodeID), TableType::NODE);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(tableSchema,
std::move(boundFileScanInfo), containsSerial, std::move(columns), nullptr /* extraInfo */);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand All @@ -158,32 +158,38 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(
// For table with SERIAL columns, we need to read in serial from files.
auto containsSerial = bindContainsSerial(tableSchema);
assert(containsSerial == false);
auto columns = bindExpectedRelFileColumns(tableSchema, *readerConfig);
auto srcKey = columns[0];
auto dstKey = columns[1];
auto relID =
createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INTERNAL_ID);
auto columnsToRead = bindExpectedRelFileColumns(tableSchema, *readerConfig);
auto relID = createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INT64);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(relID), TableType::REL);
std::move(readerConfig), columnsToRead, relID->copy(), TableType::REL);
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
auto srcTableSchema =
catalog.getReadOnlyVersion()->getTableSchema(relTableSchema->getSrcTableID());
auto dstTableSchema =
catalog.getReadOnlyVersion()->getTableSchema(relTableSchema->getDstTableID());
auto arrowColumnType = LogicalType{LogicalTypeID::ARROW_COLUMN};
auto srcOffset = createVariable(std::string(Property::REL_BOUND_OFFSET_NAME), arrowColumnType);
auto dstOffset = createVariable(std::string(Property::REL_NBR_OFFSET_NAME), arrowColumnType);
auto srcKey = columnsToRead[0];
auto dstKey = columnsToRead[1];
auto srcNodeID =
createVariable(std::string(Property::REL_BOUND_OFFSET_NAME), LogicalTypeID::INT64);
auto dstNodeID =
createVariable(std::string(Property::REL_NBR_OFFSET_NAME), LogicalTypeID::INT64);
auto extraCopyRelInfo = std::make_unique<ExtraBoundCopyRelInfo>(
srcTableSchema, dstTableSchema, srcOffset, dstOffset, srcKey, dstKey);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, std::move(extraCopyRelInfo));
srcTableSchema, dstTableSchema, srcNodeID, dstNodeID, srcKey, dstKey);
// Skip the first two columns.
expression_vector columnsToCopy{std::move(srcNodeID), std::move(dstNodeID), std::move(relID)};
for (auto i = NUM_COLUMNS_TO_SKIP_IN_REL_FILE; i < columnsToRead.size(); i++) {
columnsToCopy.push_back(std::move(columnsToRead[i]));
}
auto boundCopyFromInfo =
std::make_unique<BoundCopyFromInfo>(tableSchema, std::move(boundFileScanInfo),
containsSerial, std::move(columnsToCopy), std::move(extraCopyRelInfo));
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(
std::unique_ptr<ReaderConfig> readerConfig, TableSchema* tableSchema) {
auto containsSerial = bindContainsSerial(tableSchema);
auto offsetType = std::make_unique<LogicalType>(LogicalTypeID::ARROW_COLUMN);
auto offsetType = std::make_unique<LogicalType>(LogicalTypeID::INT64);
expression_vector columns;
for (auto i = 0u; i < 3; ++i) {
auto columnName = std::string(RDFKeyword::ANONYMOUS) + std::to_string(i);
Expand All @@ -201,13 +207,13 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(
readerConfig->rdfReaderConfig =
std::make_unique<RdfReaderConfig>(RdfReaderMode::LITERAL_TRIPLE, index);
}
auto relID =
createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INTERNAL_ID);
auto relID = createVariable(std::string(Property::INTERNAL_ID_NAME), LogicalTypeID::INT64);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), columns, std::move(relID), TableType::REL);
std::move(readerConfig), columns, relID, TableType::REL);
auto extraInfo = std::make_unique<ExtraBoundCopyRdfRelInfo>(columns[0], columns[1], columns[2]);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, std::move(extraInfo));
columns.push_back(std::move(relID));
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(tableSchema,
std::move(boundFileScanInfo), containsSerial, std::move(columns), std::move(extraInfo));
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand All @@ -234,7 +240,7 @@ expression_vector Binder::bindExpectedNodeFileColumns(
}
} break;
default: {
throw NotImplementedException{"Binder::bindCopyNodeColumns"};
throw NotImplementedException{"Binder::bindCopyNodeFileColumns"};
}
}
// Detect columns from file.
Expand All @@ -258,7 +264,6 @@ expression_vector Binder::bindExpectedRelFileColumns(
case FileType::CSV:
case FileType::PARQUET:
case FileType::NPY: {
auto arrowColumnType = LogicalType{LogicalTypeID::ARROW_COLUMN};
auto srcColumnName = std::string(Property::REL_FROM_PROPERTY_NAME);
auto dstColumnName = std::string(Property::REL_TO_PROPERTY_NAME);
readerConfig.columnNames.push_back(srcColumnName);
Expand All @@ -269,19 +274,28 @@ expression_vector Binder::bindExpectedRelFileColumns(
auto dstTable =
catalog.getReadOnlyVersion()->getTableSchema(relTableSchema->getDstTableID());
assert(dstTable->tableType == TableType::NODE);
readerConfig.columnTypes.push_back(
reinterpret_cast<NodeTableSchema*>(srcTable)->getPrimaryKey()->getDataType()->copy());
readerConfig.columnTypes.push_back(
reinterpret_cast<NodeTableSchema*>(dstTable)->getPrimaryKey()->getDataType()->copy());
columns.push_back(createVariable(srcColumnName, arrowColumnType));
columns.push_back(createVariable(dstColumnName, arrowColumnType));
auto srcPKColumnType =
reinterpret_cast<NodeTableSchema*>(srcTable)->getPrimaryKey()->getDataType()->copy();
if (srcPKColumnType->getLogicalTypeID() == LogicalTypeID::SERIAL) {
srcPKColumnType = std::make_unique<LogicalType>(LogicalTypeID::INT64);
}
auto dstPKColumnType =
reinterpret_cast<NodeTableSchema*>(dstTable)->getPrimaryKey()->getDataType()->copy();
if (dstPKColumnType->getLogicalTypeID() == LogicalTypeID::SERIAL) {
dstPKColumnType = std::make_unique<LogicalType>(LogicalTypeID::INT64);
}
columns.push_back(createVariable(srcColumnName, *srcPKColumnType));
columns.push_back(createVariable(dstColumnName, *dstPKColumnType));
readerConfig.columnTypes.push_back(std::move(srcPKColumnType));
readerConfig.columnTypes.push_back(std::move(dstPKColumnType));
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
}
readerConfig.columnNames.push_back(property->getName());
readerConfig.columnTypes.push_back(property->getDataType()->copy());
columns.push_back(createVariable(property->getName(), arrowColumnType));
auto columnType = property->getDataType()->copy();
columns.push_back(createVariable(property->getName(), *columnType));
readerConfig.columnTypes.push_back(std::move(columnType));
}
} break;
default: {
Expand Down
11 changes: 2 additions & 9 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ void Catalog::prepareCommitOrRollback(TransactionAction action) {
if (hasUpdates()) {
wal->logCatalogRecord();
if (action == TransactionAction::COMMIT) {
catalogContentForWriteTrx->saveToFile(wal->getDirectory(), DBFileType::WAL_VERSION);
catalogContentForWriteTrx->saveToFile(
wal->getDirectory(), FileVersionType::WAL_VERSION);
}
}
}
Expand All @@ -48,18 +49,12 @@ table_id_t Catalog::addNodeTableSchema(const binder::BoundCreateTableInfo& info)
table_id_t Catalog::addRelTableSchema(const binder::BoundCreateTableInfo& info) {
initCatalogContentForWriteTrxIfNecessary();
auto tableID = catalogContentForWriteTrx->addRelTableSchema(info);
wal->logRelTableRecord(tableID);
return tableID;
}

common::table_id_t Catalog::addRelTableGroupSchema(const binder::BoundCreateTableInfo& info) {
initCatalogContentForWriteTrxIfNecessary();
auto tableID = catalogContentForWriteTrx->addRelTableGroupSchema(info);
auto relTableGroupSchema =
(RelTableGroupSchema*)catalogContentForWriteTrx->getTableSchema(tableID);
for (auto relTableID : relTableGroupSchema->getRelTableIDs()) {
wal->logRelTableRecord(relTableID);
}
return tableID;
}

Expand Down Expand Up @@ -104,8 +99,6 @@ void Catalog::addRelProperty(
initCatalogContentForWriteTrxIfNecessary();
catalogContentForWriteTrx->getTableSchema(tableID)->addRelProperty(
propertyName, std::move(dataType));
wal->logAddPropertyRecord(
tableID, catalogContentForWriteTrx->getTableSchema(tableID)->getPropertyID(propertyName));
}

void Catalog::dropProperty(table_id_t tableID, property_id_t propertyID) {
Expand Down
Loading

0 comments on commit 2f30023

Please sign in to comment.