Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement copy node rdf #2028

Merged
merged 1 commit into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ if(MSVC)
add_compile_definitions(_USE_MATH_DEFINES)
add_compile_definitions(NOMINMAX)
add_compile_definitions(ARROW_STATIC PARQUET_STATIC)
add_compile_definitions(SERD_STATIC)
# TODO (bmwinger): Figure out if this can be set automatically by cmake,
# or at least better integrated with user-specified options
# For now, hardcode _AMD64_
Expand Down Expand Up @@ -209,6 +210,7 @@ include_directories(third_party/utf8proc/include)
include_directories(third_party/pybind11/include)
include_directories(third_party/re2/include)
include_directories(third_party/concurrentqueue)
include_directories(third_party/serd/include)

add_subdirectory(third_party)
add_subdirectory(src)
Expand Down
1 change: 1 addition & 0 deletions dataset/copy-test/node/turtle/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY taxonomy_RESOURCE FROM "dataset/copy-test/node/turtle/taxonomy.ttl" ;
1 change: 1 addition & 0 deletions dataset/copy-test/node/turtle/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE RDF GRAPH taxonomy;
570,274 changes: 570,274 additions & 0 deletions dataset/copy-test/node/turtle/taxonomy.ttl

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
target_include_directories(kuzu
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})
Expand All @@ -24,6 +24,6 @@ else()
set_target_properties(kuzu_shared PROPERTIES OUTPUT_NAME kuzu)
endif()
target_link_libraries(kuzu_shared
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads)
target_include_directories(kuzu_shared
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
35 changes: 28 additions & 7 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,35 @@
expression_vector Binder::bindCopyNodeColumns(
TableSchema* tableSchema, CopyDescription::FileType fileType) {
expression_vector columnExpressions;
auto isCopyCSV = fileType == CopyDescription::FileType::CSV;
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
switch (fileType) {
case common::CopyDescription::FileType::TURTLE: {
columnExpressions.push_back(createVariable("SUBJECT", LogicalType{LogicalTypeID::STRING}));
columnExpressions.push_back(
createVariable("PREDICATE", LogicalType{LogicalTypeID::STRING}));
columnExpressions.push_back(createVariable("OBJECT", LogicalType{LogicalTypeID::STRING}));
} break;
case common::CopyDescription::FileType::CSV: {
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
}
columnExpressions.push_back(
createVariable(property->getName(), *property->getDataType()));
}
} break;
case common::CopyDescription::FileType::NPY:
case common::CopyDescription::FileType::PARQUET: {
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
}
columnExpressions.push_back(
createVariable(property->getName(), LogicalType{LogicalTypeID::ARROW_COLUMN}));
}
auto dataType =
isCopyCSV ? *property->getDataType() : LogicalType{LogicalTypeID::ARROW_COLUMN};
columnExpressions.push_back(createVariable(property->getName(), dataType));
} break;
default: {
throw NotImplementedException{"Binder::bindCopyNodeColumns"};

Check warning on line 247 in src/binder/bind/bind_copy.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_copy.cpp#L246-L247

Added lines #L246 - L247 were not covered by tests
}
}
return columnExpressions;
}
Expand Down
1 change: 1 addition & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ struct ListsMetadataConstants {
// Hash Index Configurations
struct HashIndexConstants {
static constexpr uint8_t SLOT_CAPACITY = 3;
static constexpr double MAX_LOAD_FACTOR = 0.8;
};

struct CopyConstants {
Expand Down
1 change: 0 additions & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ struct CSVReaderConfig {

struct CopyDescription {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3, TURTLE = 4 };

FileType fileType;
std::vector<std::string> filePaths;
std::vector<std::string> columnNames;
Expand Down
10 changes: 9 additions & 1 deletion src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class CopyNodeSharedState {
}
}

inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; }

void logCopyNodeWALRecord(storage::WAL* wal);

void appendLocalNodeGroup(std::unique_ptr<storage::NodeGroup> localNodeGroup);
Expand All @@ -48,6 +50,7 @@ class CopyNodeSharedState {
uint64_t currentNodeGroupIdx;
// The sharedNodeGroup is to accumulate left data within local node groups in CopyNode ops.
std::unique_ptr<storage::NodeGroup> sharedNodeGroup;
bool isCopyTurtle;
};

struct CopyNodeInfo {
Expand Down Expand Up @@ -90,7 +93,7 @@ class CopyNode : public Sink {

static void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx,
storage::PrimaryKeyIndexBuilder* pkIndex, common::column_id_t pkColumnID,
storage::NodeTable* table, storage::NodeGroup* nodeGroup);
storage::NodeTable* table, storage::NodeGroup* nodeGroup, bool isCopyTurtle);

private:
inline bool isCopyAllowed() const {
Expand All @@ -108,6 +111,11 @@ class CopyNode : public Sink {
static uint64_t appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex,
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);

void appendUniqueValueToPKIndex(
storage::PrimaryKeyIndexBuilder* pkIndex, common::ValueVector* vectorToAppend);

void copyToNodeGroup(std::vector<DataPos> dataPoses);

private:
std::shared_ptr<CopyNodeSharedState> sharedState;
CopyNodeInfo copyNodeInfo;
Expand Down
4 changes: 4 additions & 0 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class Reader : public PhysicalOperator {
return make_unique<Reader>(info->copy(), sharedState, getOperatorID(), paramsString);
}

inline bool isCopyTurtleFile() const {
return sharedState->copyDescription->fileType == common::CopyDescription::FileType::TURTLE;
}

inline bool getContainsSerial() const { return info->containsSerial; }

protected:
Expand Down
37 changes: 37 additions & 0 deletions src/include/processor/operator/persistent/reader/rdf_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include "common/data_chunk/data_chunk.h"
#include "common/string_utils.h"
#include "serd.h"

namespace kuzu {
namespace storage {

class RDFReader {
public:
explicit RDFReader(std::string filePath);

~RDFReader();

common::offset_t read(common::DataChunk* dataChunkToRead);

private:
static SerdStatus errorHandle(void* handle, const SerdError* error);
static SerdStatus handleStatements(void* handle, SerdStatementFlags flags,
const SerdNode* graph, const SerdNode* subject, const SerdNode* predicate,
const SerdNode* object, const SerdNode* object_datatype, const SerdNode* object_lang);
static bool isSerdTypeSupported(SerdType serdType);

private:
const std::string filePath;
FILE* fp;
SerdReader* reader;
common::offset_t numLinesRead;
SerdStatus status;
common::ValueVector* subjectVector;
common::ValueVector* predicateVector;
common::ValueVector* objectVector;
};

} // namespace storage
} // namespace kuzu
19 changes: 18 additions & 1 deletion src/include/processor/operator/persistent/reader_functions.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "processor/operator/persistent/csv_reader.h"
#include "processor/operator/persistent/reader/csv_reader.h"
#include "processor/operator/persistent/reader/rdf_reader.h"
#include "storage/copier/npy_reader.h"
#include "storage/copier/table_copy_utils.h"

Expand Down Expand Up @@ -32,6 +33,10 @@ struct NPYReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<storage::NpyMultiFileReader> reader = nullptr;
};

struct RDFReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<storage::RDFReader> reader = nullptr;
};

struct FileBlocksInfo {
common::row_idx_t numRows = 0;
common::block_idx_t numBlocks = 0;
Expand Down Expand Up @@ -69,6 +74,10 @@ struct ReaderFunctions {
}
static void validateNPYFiles(
const std::vector<std::string>& paths, catalog::TableSchema* tableSchema);
static inline void validateRDFFiles(
const std::vector<std::string>& paths, catalog::TableSchema* tableSchema) {
// DO NOTHING.
}

static std::vector<FileBlocksInfo> countRowsInRelCSVFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
Expand All @@ -82,6 +91,9 @@ struct ReaderFunctions {
static std::vector<FileBlocksInfo> countRowsInNPYFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInRDFFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);

static void initRelCSVReadData(ReaderFunctionData& funcData,
const std::vector<std::string>& paths, common::vector_idx_t fileIdx,
Expand All @@ -95,6 +107,9 @@ struct ReaderFunctions {
static void initNPYReadData(ReaderFunctionData& funcData, const std::vector<std::string>& paths,
common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema);
static void initRDFReadData(ReaderFunctionData& funcData, const std::vector<std::string>& paths,
common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema);

static void readRowsFromRelCSVFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead);
Expand All @@ -104,6 +119,8 @@ struct ReaderFunctions {
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromNPYFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromRDFFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);

static std::unique_ptr<common::DataChunk> getDataChunkToRead(
catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager);
Expand Down
4 changes: 0 additions & 4 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ namespace processor {

std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyFrom(LogicalOperator* logicalOperator) {
auto copyFrom = (LogicalCopyFrom*)logicalOperator;
auto info = copyFrom->getInfo();
if (info->copyDesc->fileType == CopyDescription::FileType::TURTLE) {
throw NotImplementedException("PlanMapper::mapCopyFrom");
}
switch (copyFrom->getInfo()->tableSchema->getTableType()) {
case TableType::NODE:
return mapCopyNodeFrom(logicalOperator);
Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/persistent/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
add_subdirectory(reader)

add_library(kuzu_processor_operator_persistent
OBJECT
copy.cpp
Expand All @@ -7,7 +9,6 @@ add_library(kuzu_processor_operator_persistent
copy_rel_lists.cpp
copy_to.cpp
csv_file_writer.cpp
csv_reader.cpp
delete.cpp
delete_executor.cpp
insert.cpp
Expand Down
87 changes: 64 additions & 23 deletions src/processor/operator/persistent/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
CopyNodeSharedState::CopyNodeSharedState(uint64_t& numRows, NodeTableSchema* tableSchema,
NodeTable* table, const CopyDescription& copyDesc, MemoryManager* memoryManager)
: numRows{numRows}, copyDesc{copyDesc}, tableSchema{tableSchema}, table{table}, pkColumnID{0},
hasLoggedWAL{false}, currentNodeGroupIdx{0} {
hasLoggedWAL{false}, currentNodeGroupIdx{0}, isCopyTurtle{copyDesc.fileType ==
CopyDescription::FileType::TURTLE} {
auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
ftTableSchema->appendColumn(
std::make_unique<ColumnSchema>(false /* flat */, 0 /* dataChunkPos */,
Expand Down Expand Up @@ -63,7 +64,7 @@
if (sharedNodeGroup->isFull()) {
auto nodeGroupIdx = getNextNodeGroupIdxWithoutLock();
CopyNode::writeAndResetNodeGroup(
nodeGroupIdx, pkIndex.get(), pkColumnID, table, sharedNodeGroup.get());
nodeGroupIdx, pkIndex.get(), pkColumnID, table, sharedNodeGroup.get(), isCopyTurtle);
}
if (numNodesAppended < localNodeGroup->getNumNodes()) {
sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended);
Expand All @@ -83,24 +84,23 @@
while (children[0]->getNextTuple(context)) {
auto originalSelVector =
resultSet->getDataChunk(copyNodeInfo.dataColumnPoses[0].dataChunkPos)->state->selVector;
// All tuples in the resultSet are in the same data chunk.
auto numTuplesToAppend = originalSelVector->selectedSize;
auto numAppendedTuples = 0ul;
while (numAppendedTuples < numTuplesToAppend) {
auto numAppendedTuplesInNodeGroup = localNodeGroup->append(
resultSet, copyNodeInfo.dataColumnPoses, numTuplesToAppend - numAppendedTuples);
numAppendedTuples += numAppendedTuplesInNodeGroup;
if (localNodeGroup->isFull()) {
node_group_idx_t nodeGroupIdx;
nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(),
sharedState->pkColumnID, sharedState->table, localNodeGroup.get());
}
if (numAppendedTuples < numTuplesToAppend) {
sliceDataChunk(
*resultSet->getDataChunk(copyNodeInfo.dataColumnPoses[0].dataChunkPos),
copyNodeInfo.dataColumnPoses, (offset_t)numAppendedTuplesInNodeGroup);
switch (sharedState->copyDesc.fileType) {
case CopyDescription::FileType::TURTLE: {
for (auto& dataPos : copyNodeInfo.dataColumnPoses) {
// All tuples in the resultSet are in the same data chunk.
auto vectorToAppend = resultSet->getValueVector(dataPos).get();
appendUniqueValueToPKIndex(sharedState->pkIndex.get(), vectorToAppend);
copyToNodeGroup({dataPos});
}
} break;
case CopyDescription::FileType::CSV:
case CopyDescription::FileType::PARQUET:
case CopyDescription::FileType::NPY: {
copyToNodeGroup(copyNodeInfo.dataColumnPoses);
} break;
default: {
throw NotImplementedException{"CopyNode::executeInternal"};

Check warning on line 102 in src/processor/operator/persistent/copy_node.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/operator/persistent/copy_node.cpp#L101-L102

Added lines #L101 - L102 were not covered by tests
}
}
resultSet->getDataChunk(copyNodeInfo.dataColumnPoses[0].dataChunkPos)->state->selVector =
std::move(originalSelVector);
Expand Down Expand Up @@ -130,11 +130,11 @@
}

void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx,
PrimaryKeyIndexBuilder* pkIndex, column_id_t pkColumnID, NodeTable* table,
NodeGroup* nodeGroup) {
PrimaryKeyIndexBuilder* pkIndex, column_id_t pkColumnID, NodeTable* table, NodeGroup* nodeGroup,
bool isCopyTurtle) {
nodeGroup->setNodeGroupIdx(nodeGroupIdx);
auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
if (pkIndex) {
if (pkIndex && !isCopyTurtle) {
populatePKIndex(pkIndex, nodeGroup->getColumnChunk(pkColumnID), startOffset,
nodeGroup->getNumNodes() /* startPageIdx */);
}
Expand Down Expand Up @@ -191,7 +191,7 @@
if (sharedState->sharedNodeGroup) {
auto nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(), sharedState->pkColumnID,
sharedState->table, sharedState->sharedNodeGroup.get());
sharedState->table, sharedState->sharedNodeGroup.get(), sharedState->isCopyTurtle);
}
if (sharedState->pkIndex) {
sharedState->pkIndex->flush();
Expand Down Expand Up @@ -240,5 +240,46 @@
return numValues;
}

void CopyNode::appendUniqueValueToPKIndex(
storage::PrimaryKeyIndexBuilder* pkIndex, common::ValueVector* vectorToAppend) {
auto selVector = std::make_unique<common::SelectionVector>(common::DEFAULT_VECTOR_CAPACITY);
selVector->resetSelectorToValuePosBuffer();
common::sel_t nextPos = 0;
common::offset_t result;
auto offset = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx()) +
localNodeGroup->getNumNodes();
for (auto i = 0u; i < vectorToAppend->state->getNumSelectedValues(); i++) {
auto uriStr = vectorToAppend->getValue<common::ku_string_t>(i).getAsString();
if (!pkIndex->lookup((int64_t)uriStr.c_str(), result)) {
pkIndex->append(uriStr.c_str(), offset++);
selVector->selectedPositions[nextPos++] = i;
}
}
selVector->selectedSize = nextPos;
vectorToAppend->state->selVector = std::move(selVector);
}

void CopyNode::copyToNodeGroup(std::vector<DataPos> dataPoses) {
auto numAppendedTuples = 0ul;
auto numTuplesToAppend =
resultSet->getDataChunk(dataPoses[0].dataChunkPos)->state->getNumSelectedValues();
while (numAppendedTuples < numTuplesToAppend) {
auto numAppendedTuplesInNodeGroup =
localNodeGroup->append(resultSet, dataPoses, numTuplesToAppend - numAppendedTuples);
numAppendedTuples += numAppendedTuplesInNodeGroup;
if (localNodeGroup->isFull()) {
node_group_idx_t nodeGroupIdx;
nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(),
sharedState->pkColumnID, sharedState->table, localNodeGroup.get(),
sharedState->isCopyTurtle);
}
if (numAppendedTuples < numTuplesToAppend) {
sliceDataChunk(*resultSet->getDataChunk(copyNodeInfo.dataColumnPoses[0].dataChunkPos),
copyNodeInfo.dataColumnPoses, (offset_t)numAppendedTuplesInNodeGroup);
}
}
}

} // namespace processor
} // namespace kuzu
Loading