Skip to content

Commit

Permalink
Merge pull request #2028 from kuzudb/rdf-copy
Browse files Browse the repository at this point in the history
Implement copy node rdf
  • Loading branch information
acquamarin committed Sep 13, 2023
2 parents defa82b + f99d35a commit 678055d
Show file tree
Hide file tree
Showing 54 changed files with 578,250 additions and 48 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ if(MSVC)
add_compile_definitions(_USE_MATH_DEFINES)
add_compile_definitions(NOMINMAX)
add_compile_definitions(ARROW_STATIC PARQUET_STATIC)
add_compile_definitions(SERD_STATIC)
# TODO (bmwinger): Figure out if this can be set automatically by cmake,
# or at least better integrated with user-specified options
# For now, hardcode _AMD64_
Expand Down Expand Up @@ -215,6 +216,7 @@ include_directories(third_party/utf8proc/include)
include_directories(third_party/pybind11/include)
include_directories(third_party/re2/include)
include_directories(third_party/concurrentqueue)
include_directories(third_party/serd/include)

add_subdirectory(third_party)
add_subdirectory(src)
Expand Down
1 change: 1 addition & 0 deletions dataset/copy-test/node/turtle/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY taxonomy_RESOURCE FROM "dataset/copy-test/node/turtle/taxonomy.ttl" ;
1 change: 1 addition & 0 deletions dataset/copy-test/node/turtle/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE RDF GRAPH taxonomy;
570,274 changes: 570,274 additions & 0 deletions dataset/copy-test/node/turtle/taxonomy.ttl

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
target_include_directories(kuzu
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})
Expand All @@ -24,6 +24,6 @@ else()
set_target_properties(kuzu_shared PROPERTIES OUTPUT_NAME kuzu)
endif()
target_link_libraries(kuzu_shared
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
target_include_directories(kuzu_shared
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
35 changes: 28 additions & 7 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,35 @@ static bool skipPropertyInFile(const Property& property) {
expression_vector Binder::bindCopyNodeColumns(
TableSchema* tableSchema, CopyDescription::FileType fileType) {
expression_vector columnExpressions;
auto isCopyCSV = fileType == CopyDescription::FileType::CSV;
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
switch (fileType) {
case common::CopyDescription::FileType::TURTLE: {
columnExpressions.push_back(createVariable("SUBJECT", LogicalType{LogicalTypeID::STRING}));
columnExpressions.push_back(
createVariable("PREDICATE", LogicalType{LogicalTypeID::STRING}));
columnExpressions.push_back(createVariable("OBJECT", LogicalType{LogicalTypeID::STRING}));
} break;
case common::CopyDescription::FileType::CSV: {
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
}
columnExpressions.push_back(
createVariable(property->getName(), *property->getDataType()));
}
} break;
case common::CopyDescription::FileType::NPY:
case common::CopyDescription::FileType::PARQUET: {
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
}
columnExpressions.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
auto dataType =
isCopyCSV ? *property->getDataType() : LogicalType{LogicalTypeID::ARROW_COLUMN};
columnExpressions.push_back(createVariable(property->getName(), dataType));
} break;
default: {
throw NotImplementedException{"Binder::bindCopyNodeColumns"};
}
}
return columnExpressions;
}
Expand Down
1 change: 1 addition & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ struct ListsMetadataConstants {
// Hash Index Configurations
struct HashIndexConstants {
static constexpr uint8_t SLOT_CAPACITY = 3;
static constexpr double MAX_LOAD_FACTOR = 0.8;
};

struct CopyConstants {
Expand Down
1 change: 0 additions & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ struct CSVReaderConfig {

struct CopyDescription {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3, TURTLE = 4 };

FileType fileType;
std::vector<std::string> filePaths;
std::vector<std::string> columnNames;
Expand Down
10 changes: 9 additions & 1 deletion src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class CopyNodeSharedState {
}
}

inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; }

void logCopyNodeWALRecord(storage::WAL* wal);

void appendLocalNodeGroup(std::unique_ptr<storage::NodeGroup> localNodeGroup);
Expand All @@ -48,6 +50,7 @@ class CopyNodeSharedState {
uint64_t currentNodeGroupIdx;
// The sharedNodeGroup is to accumulate left data within local node groups in CopyNode ops.
std::unique_ptr<storage::NodeGroup> sharedNodeGroup;
bool isCopyTurtle;
};

struct CopyNodeInfo {
Expand Down Expand Up @@ -90,7 +93,7 @@ class CopyNode : public Sink {

static void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx,
storage::PrimaryKeyIndexBuilder* pkIndex, common::column_id_t pkColumnID,
storage::NodeTable* table, storage::NodeGroup* nodeGroup);
storage::NodeTable* table, storage::NodeGroup* nodeGroup, bool isCopyTurtle);

private:
inline bool isCopyAllowed() const {
Expand All @@ -108,6 +111,11 @@ class CopyNode : public Sink {
static uint64_t appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);

void appendUniqueValueToPKIndex(
storage::PrimaryKeyIndexBuilder* pkIndex, common::ValueVector* vectorToAppend);

void copyToNodeGroup(std::vector<DataPos> dataPoses);

private:
std::shared_ptr<CopyNodeSharedState> sharedState;
CopyNodeInfo copyNodeInfo;
Expand Down
4 changes: 4 additions & 0 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class Reader : public PhysicalOperator {
return make_unique<Reader>(info->copy(), sharedState, getOperatorID(), paramsString);
}

inline bool isCopyTurtleFile() const {
return sharedState->copyDescription->fileType == common::CopyDescription::FileType::TURTLE;
}

inline bool getContainsSerial() const { return info->containsSerial; }

protected:
Expand Down
37 changes: 37 additions & 0 deletions src/include/processor/operator/persistent/reader/rdf_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include "common/data_chunk/data_chunk.h"
#include "common/string_utils.h"
#include "serd.h"

namespace kuzu {
namespace storage {

class RDFReader {
public:
explicit RDFReader(std::string filePath);

~RDFReader();

common::offset_t read(common::DataChunk* dataChunkToRead);

private:
static SerdStatus errorHandle(void* handle, const SerdError* error);
static SerdStatus handleStatements(void* handle, SerdStatementFlags flags,
const SerdNode* graph, const SerdNode* subject, const SerdNode* predicate,
const SerdNode* object, const SerdNode* object_datatype, const SerdNode* object_lang);
static bool isSerdTypeSupported(SerdType serdType);

private:
const std::string filePath;
FILE* fp;
SerdReader* reader;
common::offset_t numLinesRead;
SerdStatus status;
common::ValueVector* subjectVector;
common::ValueVector* predicateVector;
common::ValueVector* objectVector;
};

} // namespace storage
} // namespace kuzu
19 changes: 18 additions & 1 deletion src/include/processor/operator/persistent/reader_functions.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "processor/operator/persistent/csv_reader.h"
#include "processor/operator/persistent/reader/csv_reader.h"
#include "processor/operator/persistent/reader/rdf_reader.h"
#include "storage/copier/npy_reader.h"
#include "storage/copier/table_copy_utils.h"

Expand Down Expand Up @@ -32,6 +33,10 @@ struct NPYReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<storage::NpyMultiFileReader> reader = nullptr;
};

struct RDFReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<storage::RDFReader> reader = nullptr;
};

struct FileBlocksInfo {
common::row_idx_t numRows = 0;
common::block_idx_t numBlocks = 0;
Expand Down Expand Up @@ -69,6 +74,10 @@ struct ReaderFunctions {
}
static void validateNPYFiles(
const std::vector<std::string>& paths, catalog::TableSchema* tableSchema);
static inline void validateRDFFiles(
const std::vector<std::string>& paths, catalog::TableSchema* tableSchema) {
// DO NOTHING.
}

static std::vector<FileBlocksInfo> countRowsInRelCSVFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
Expand All @@ -82,6 +91,9 @@ struct ReaderFunctions {
static std::vector<FileBlocksInfo> countRowsInNPYFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInRDFFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);

static void initRelCSVReadData(ReaderFunctionData& funcData,
const std::vector<std::string>& paths, common::vector_idx_t fileIdx,
Expand All @@ -95,6 +107,9 @@ struct ReaderFunctions {
static void initNPYReadData(ReaderFunctionData& funcData, const std::vector<std::string>& paths,
common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema);
static void initRDFReadData(ReaderFunctionData& funcData, const std::vector<std::string>& paths,
common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema);

static void readRowsFromRelCSVFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead);
Expand All @@ -104,6 +119,8 @@ struct ReaderFunctions {
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromNPYFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromRDFFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);

static std::unique_ptr<common::DataChunk> getDataChunkToRead(
catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager);
Expand Down
4 changes: 0 additions & 4 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ namespace processor {

std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyFrom(LogicalOperator* logicalOperator) {
auto copyFrom = (LogicalCopyFrom*)logicalOperator;
auto info = copyFrom->getInfo();
if (info->copyDesc->fileType == CopyDescription::FileType::TURTLE) {
throw NotImplementedException("PlanMapper::mapCopyFrom");
}
switch (copyFrom->getInfo()->tableSchema->getTableType()) {
case TableType::NODE:
return mapCopyNodeFrom(logicalOperator);
Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/persistent/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
add_subdirectory(reader)

add_library(kuzu_processor_operator_persistent
OBJECT
copy.cpp
Expand All @@ -7,7 +9,6 @@ add_library(kuzu_processor_operator_persistent
copy_rel_lists.cpp
copy_to.cpp
csv_file_writer.cpp
csv_reader.cpp
delete.cpp
delete_executor.cpp
insert.cpp
Expand Down
87 changes: 64 additions & 23 deletions src/processor/operator/persistent/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ namespace processor {
CopyNodeSharedState::CopyNodeSharedState(uint64_t& numRows, NodeTableSchema* tableSchema,
NodeTable* table, const CopyDescription& copyDesc, MemoryManager* memoryManager)
: numRows{numRows}, copyDesc{copyDesc}, tableSchema{tableSchema}, table{table}, pkColumnID{0},
hasLoggedWAL{false}, currentNodeGroupIdx{0} {
hasLoggedWAL{false}, currentNodeGroupIdx{0}, isCopyTurtle{copyDesc.fileType ==
CopyDescription::FileType::TURTLE} {
auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
ftTableSchema->appendColumn(
std::make_unique<ColumnSchema>(false /* flat */, 0 /* dataChunkPos */,
Expand Down Expand Up @@ -63,7 +64,7 @@ void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr<NodeGroup> localN
if (sharedNodeGroup->isFull()) {
auto nodeGroupIdx = getNextNodeGroupIdxWithoutLock();
CopyNode::writeAndResetNodeGroup(
nodeGroupIdx, pkIndex.get(), pkColumnID, table, sharedNodeGroup.get());
nodeGroupIdx, pkIndex.get(), pkColumnID, table, sharedNodeGroup.get(), isCopyTurtle);
}
if (numNodesAppended < localNodeGroup->getNumNodes()) {
sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended);
Expand All @@ -83,24 +84,23 @@ void CopyNode::executeInternal(ExecutionContext* context) {
while (children[0]->getNextTuple(context)) {
auto originalSelVector =
resultSet->getDataChunk(copyNodeInfo.dataColumnPoses[0].dataChunkPos)->state->selVector;
// All tuples in the resultSet are in the same data chunk.
auto numTuplesToAppend = originalSelVector->selectedSize;
auto numAppendedTuples = 0ul;
while (numAppendedTuples < numTuplesToAppend) {
auto numAppendedTuplesInNodeGroup = localNodeGroup->append(
resultSet, copyNodeInfo.dataColumnPoses, numTuplesToAppend - numAppendedTuples);
numAppendedTuples += numAppendedTuplesInNodeGroup;
if (localNodeGroup->isFull()) {
node_group_idx_t nodeGroupIdx;
nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(),
sharedState->pkColumnID, sharedState->table, localNodeGroup.get());
}
if (numAppendedTuples < numTuplesToAppend) {
sliceDataChunk(
*resultSet->getDataChunk(copyNodeInfo.dataColumnPoses[0].dataChunkPos),
copyNodeInfo.dataColumnPoses, (offset_t)numAppendedTuplesInNodeGroup);
switch (sharedState->copyDesc.fileType) {
case CopyDescription::FileType::TURTLE: {
for (auto& dataPos : copyNodeInfo.dataColumnPoses) {
// All tuples in the resultSet are in the same data chunk.
auto vectorToAppend = resultSet->getValueVector(dataPos).get();
appendUniqueValueToPKIndex(sharedState->pkIndex.get(), vectorToAppend);
copyToNodeGroup({dataPos});
}
} break;
case CopyDescription::FileType::CSV:
case CopyDescription::FileType::PARQUET:
case CopyDescription::FileType::NPY: {
copyToNodeGroup(copyNodeInfo.dataColumnPoses);
} break;
default: {
throw NotImplementedException{"CopyNode::executeInternal"};
}
}
resultSet->getDataChunk(copyNodeInfo.dataColumnPoses[0].dataChunkPos)->state->selVector =
std::move(originalSelVector);
Expand Down Expand Up @@ -130,11 +130,11 @@ void CopyNode::sliceDataChunk(
}

void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx,
PrimaryKeyIndexBuilder* pkIndex, column_id_t pkColumnID, NodeTable* table,
NodeGroup* nodeGroup) {
PrimaryKeyIndexBuilder* pkIndex, column_id_t pkColumnID, NodeTable* table, NodeGroup* nodeGroup,
bool isCopyTurtle) {
nodeGroup->setNodeGroupIdx(nodeGroupIdx);
auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
if (pkIndex) {
if (pkIndex && !isCopyTurtle) {
populatePKIndex(pkIndex, nodeGroup->getColumnChunk(pkColumnID), startOffset,
nodeGroup->getNumNodes() /* startPageIdx */);
}
Expand Down Expand Up @@ -191,7 +191,7 @@ void CopyNode::finalize(ExecutionContext* context) {
if (sharedState->sharedNodeGroup) {
auto nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(), sharedState->pkColumnID,
sharedState->table, sharedState->sharedNodeGroup.get());
sharedState->table, sharedState->sharedNodeGroup.get(), sharedState->isCopyTurtle);
}
if (sharedState->pkIndex) {
sharedState->pkIndex->flush();
Expand Down Expand Up @@ -240,5 +240,46 @@ uint64_t CopyNode::appendToPKIndex<ku_string_t>(
return numValues;
}

void CopyNode::appendUniqueValueToPKIndex(
storage::PrimaryKeyIndexBuilder* pkIndex, common::ValueVector* vectorToAppend) {
auto selVector = std::make_unique<common::SelectionVector>(common::DEFAULT_VECTOR_CAPACITY);
selVector->resetSelectorToValuePosBuffer();
common::sel_t nextPos = 0;
common::offset_t result;
auto offset = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx()) +
localNodeGroup->getNumNodes();
for (auto i = 0u; i < vectorToAppend->state->getNumSelectedValues(); i++) {
auto uriStr = vectorToAppend->getValue<common::ku_string_t>(i).getAsString();
if (!pkIndex->lookup((int64_t)uriStr.c_str(), result)) {
pkIndex->append(uriStr.c_str(), offset++);
selVector->selectedPositions[nextPos++] = i;
}
}
selVector->selectedSize = nextPos;
vectorToAppend->state->selVector = std::move(selVector);
}

void CopyNode::copyToNodeGroup(std::vector<DataPos> dataPoses) {
auto numAppendedTuples = 0ul;
auto numTuplesToAppend =
resultSet->getDataChunk(dataPoses[0].dataChunkPos)->state->getNumSelectedValues();
while (numAppendedTuples < numTuplesToAppend) {
auto numAppendedTuplesInNodeGroup =
localNodeGroup->append(resultSet, dataPoses, numTuplesToAppend - numAppendedTuples);
numAppendedTuples += numAppendedTuplesInNodeGroup;
if (localNodeGroup->isFull()) {
node_group_idx_t nodeGroupIdx;
nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(),
sharedState->pkColumnID, sharedState->table, localNodeGroup.get(),
sharedState->isCopyTurtle);
}
if (numAppendedTuples < numTuplesToAppend) {
sliceDataChunk(*resultSet->getDataChunk(copyNodeInfo.dataColumnPoses[0].dataChunkPos),
copyNodeInfo.dataColumnPoses, (offset_t)numAppendedTuplesInNodeGroup);
}
}
}

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit 678055d

Please sign in to comment.