Skip to content

Commit

Permalink
Add N-Quads file type and support scanning multiple RDF files
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Dec 1, 2023
1 parent 3ab6bf1 commit 5da59c2
Show file tree
Hide file tree
Showing 17 changed files with 1,257 additions and 70 deletions.
4 changes: 4 additions & 0 deletions dataset/rdf/spb1k/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
COPY spb_resource_t FROM "dataset/rdf/spb1k/*.nq";
COPY spb_literal_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb_resource_triples_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb_literal_triples_t FROM glob("dataset/rdf/spb1k/*.nq");
294 changes: 294 additions & 0 deletions dataset/rdf/spb1k/generatedCreativeWorks-000001.nq

Large diffs are not rendered by default.

191 changes: 191 additions & 0 deletions dataset/rdf/spb1k/generatedCreativeWorks-000002.nq

Large diffs are not rendered by default.

296 changes: 296 additions & 0 deletions dataset/rdf/spb1k/generatedCreativeWorks-000003.nq

Large diffs are not rendered by default.

314 changes: 314 additions & 0 deletions dataset/rdf/spb1k/generatedCreativeWorks-000004.nq

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dataset/rdf/spb1k/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE RDF GRAPH spb;
14 changes: 10 additions & 4 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,21 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
}
switch (tableSchema->tableType) {
case TableType::NODE:
if (readerConfig->fileType == FileType::TURTLE) {
switch (readerConfig->fileType) {
case FileType::TURTLE:
case FileType::NQUADS: {
return bindCopyRdfNodeFrom(statement, std::move(readerConfig), tableSchema);
} else {
}
default:
return bindCopyNodeFrom(statement, std::move(readerConfig), tableSchema);
}
case TableType::REL: {
if (readerConfig->fileType == FileType::TURTLE) {
switch (readerConfig->fileType) {
case FileType::TURTLE:
case FileType::NQUADS: {
return bindCopyRdfRelFrom(statement, std::move(readerConfig), tableSchema);
} else {
}
default:
return bindCopyRelFrom(statement, std::move(readerConfig), tableSchema);
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,20 @@ function::TableFunction* Binder::getScanFunction(FileType fileType, const Reader
inputTypes.push_back(&stringType);
auto functions = catalog.getBuiltInFunctions();
switch (fileType) {
case common::FileType::PARQUET: {
case FileType::PARQUET: {
func = functions->matchScalarFunction(READ_PARQUET_FUNC_NAME, inputTypes);
} break;
case common::FileType::NPY: {
case FileType::NPY: {
func = functions->matchScalarFunction(READ_NPY_FUNC_NAME, inputTypes);
} break;
case common::FileType::CSV: {
case FileType::CSV: {
auto csvConfig = reinterpret_cast<CSVReaderConfig*>(config.extraConfig.get());
func = functions->matchScalarFunction(
csvConfig->parallel ? READ_CSV_PARALLEL_FUNC_NAME : READ_CSV_SERIAL_FUNC_NAME,
inputTypes);
} break;
case common::FileType::TURTLE: {
case FileType::NQUADS:
case FileType::TURTLE: {
func = functions->matchScalarFunction(READ_RDF_FUNC_NAME, inputTypes);
} break;
default:
Expand Down
6 changes: 6 additions & 0 deletions src/common/copier_config/reader_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ FileType FileTypeUtils::getFileTypeFromExtension(std::string_view extension) {
if (extension == ".ttl") {
return FileType::TURTLE;
}
if (extension == ".nq") {

Check warning on line 22 in src/common/copier_config/reader_config.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/copier_config/reader_config.cpp#L22

Added line #L22 was not covered by tests
return FileType::NQUADS;
}
throw CopyException(std::string("Unsupported file type ").append(extension));
}

Expand All @@ -39,6 +42,9 @@ std::string FileTypeUtils::toString(FileType fileType) {
case FileType::TURTLE: {
return "TURTLE";
}
case FileType::NQUADS: {
return "NQUADS";

Check warning on line 46 in src/common/copier_config/reader_config.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/copier_config/reader_config.cpp#L46

Added line #L46 was not covered by tests
}
default: {
KU_UNREACHABLE;
}
Expand Down
9 changes: 8 additions & 1 deletion src/include/common/copier_config/reader_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
namespace kuzu {
namespace common {

enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3, TURTLE = 4 };
// File formats known to the reader configuration. Each enumerator carries an explicit
// numeric value; the underlying type is a single byte.
enum class FileType : uint8_t {
    UNKNOWN = 0,
    CSV = 1,
    PARQUET = 2,
    NPY = 3,
    // Turtle: terse triples, see http://www.w3.org/TR/turtle
    TURTLE = 4,
    // N-Quads: line-based quads, see http://www.w3.org/TR/n-quads/
    NQUADS = 5,
};

struct FileTypeUtils {
static FileType getFileTypeFromExtension(std::string_view extension);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SerialCSVReader final : public BaseCSVReader {
};

struct SerialCSVScanSharedState final : public function::ScanSharedState {
explicit SerialCSVScanSharedState(
SerialCSVScanSharedState(
common::ReaderConfig readerConfig, uint64_t numRows, uint64_t numColumns)
: ScanSharedState{std::move(readerConfig), numRows}, numColumns{numColumns} {
initReader();
Expand Down
41 changes: 30 additions & 11 deletions src/include/processor/operator/persistent/reader/rdf/rdf_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,49 @@
#include "function/table_functions.h"
#include "function/table_functions/bind_data.h"
#include "function/table_functions/bind_input.h"
#include "function/table_functions/scan_functions.h"
#include "serd.h"

namespace kuzu {
namespace processor {

class RDFReader {
class RdfReader {
public:
RDFReader(std::string filePath, const common::RdfReaderConfig& config);
RdfReader(
std::string filePath, common::FileType fileType, const common::RdfReaderConfig& config)
: filePath{std::move(filePath)}, fileType{fileType}, mode{config.mode}, index{config.index},
reader{nullptr}, rowOffset{0}, vectorSize{0}, sVector{nullptr}, pVector{nullptr},
oVector{nullptr}, status{SERD_SUCCESS} {}

~RDFReader();
~RdfReader();

inline void initReader() { initReader(prefixHandle, statementHandle); }
inline void initCountReader() { initReader(nullptr, countStatementHandle); }

common::offset_t read(common::DataChunk* dataChunkToRead);
common::offset_t countLine();

private:
static SerdStatus errorHandle(void* handle, const SerdError* error);
static SerdStatus readerStatementSink(void* handle, SerdStatementFlags flags,
const SerdNode* graph, const SerdNode* subject, const SerdNode* predicate,
const SerdNode* object, const SerdNode* object_datatype, const SerdNode* object_lang);
static SerdStatus prefixSink(void* handle, const SerdNode* name, const SerdNode* uri);
static SerdStatus statementHandle(void* handle, SerdStatementFlags flags, const SerdNode* graph,
const SerdNode* subject, const SerdNode* predicate, const SerdNode* object,
const SerdNode* object_datatype, const SerdNode* object_lang);
static SerdStatus prefixHandle(void* handle, const SerdNode* name, const SerdNode* uri);

static SerdStatus counterStatementSink(void* handle, SerdStatementFlags flags,
static SerdStatus countStatementHandle(void* handle, SerdStatementFlags flags,
const SerdNode* graph, const SerdNode* subject, const SerdNode* predicate,
const SerdNode* object, const SerdNode* object_datatype, const SerdNode* object_lang);

void initReader(const SerdPrefixSink prefixSink_, const SerdStatementSink statementSink_);

private:
const std::string filePath;
common::FileType fileType;
common::RdfReaderMode mode;
storage::PrimaryKeyIndex* index;

FILE* fp;
SerdReader* reader;
SerdReader* counter;

// TODO(Xiyang): use prefix to expand CURIE.
const char* currentPrefix;
Expand All @@ -51,8 +61,17 @@ class RDFReader {
common::ValueVector* oVector; // object
};

struct RdfScanLocalState final : public function::TableFuncLocalState {
std::unique_ptr<RDFReader> reader;
// Shared state for the RDF scan function. Holds the RdfReader used by the scan and
// forwards the reader configuration and row count to the ScanSharedState base.
struct RdfScanSharedState final : public function::ScanSharedState {
    std::unique_ptr<RdfReader> reader;

    // readerConfig is taken by value and moved into the base (the same pattern
    // SerialCSVScanSharedState uses) so the configuration is not copied a second time.
    // initReader() is invoked immediately after the base has been constructed.
    RdfScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows)
        : ScanSharedState{std::move(readerConfig), numRows} {
        initReader();
    }

    // Reads into the given data chunk; defined out of line.
    void read(common::DataChunk& dataChunk);

    // Presumably (re)creates `reader` from the stored configuration; defined out of line.
    void initReader();
};

struct RdfScan {
Expand Down
3 changes: 2 additions & 1 deletion src/planner/plan/plan_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static void appendPartitioner(BoundCopyFromInfo* copyFromInfo, LogicalPlan& plan
payloads.push_back(copyFromInfo->fileScanInfo->offset);
// TODO(Xiyang): Merge the RDF (TURTLE/NQUADS) case with the other file types.
switch (fileType) {
case FileType::NQUADS:
case FileType::TURTLE: {
auto extraInfo = reinterpret_cast<ExtraBoundCopyRdfRelInfo*>(copyFromInfo->extraInfo.get());
infos.push_back(std::make_unique<LogicalPartitionerInfo>(
Expand Down Expand Up @@ -83,7 +84,7 @@ std::unique_ptr<LogicalPlan> Planner::planCopyFrom(const BoundStatement& stateme
QueryPlanner::appendScanFile(copyFromInfo->fileScanInfo.get(), *plan);
auto tableType = copyFromInfo->tableSchema->tableType;
if (tableType == TableType::REL) {
if (fileType != FileType::TURTLE) {
if (fileType != FileType::TURTLE && fileType != FileType::NQUADS) {
auto extraInfo = dynamic_cast<ExtraBoundCopyRelInfo*>(copyFromInfo->extraInfo.get());
std::vector<std::unique_ptr<LogicalIndexScanNodeInfo>> infos;
auto srcNodeTableSchema = dynamic_cast<NodeTableSchema*>(extraInfo->srcTableSchema);
Expand Down
7 changes: 4 additions & 3 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(LogicalOperator* l
auto readerConfig =
reinterpret_cast<function::ScanBindData*>(copyFromInfo->fileScanInfo->bindData.get())
->config;
if (readerConfig.fileType == FileType::TURTLE &&
reinterpret_cast<RdfReaderConfig*>(readerConfig.extraConfig.get())->mode ==
RdfReaderMode::RESOURCE) {
bool isRdfFile =
readerConfig.fileType == FileType::TURTLE || readerConfig.fileType == FileType::NQUADS;
if (isRdfFile && reinterpret_cast<RdfReaderConfig*>(readerConfig.extraConfig.get())->mode ==
RdfReaderMode::RESOURCE) {
copyNode = std::make_unique<CopyRdfResource>(sharedState, std::move(info),
std::make_unique<ResultSetDescriptor>(copyFrom->getSchema()), std::move(prevOperator),
getOperatorID(), copyFrom->getExpressionsForPrinting());
Expand Down
Loading

0 comments on commit 5da59c2

Please sign in to comment.