Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NQUAD file type, add multiple rdf file scanner #2531

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dataset/rdf/spb1k/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
COPY spb_resource_t FROM "dataset/rdf/spb1k/*.nq";
COPY spb_literal_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb_resource_triples_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb_literal_triples_t FROM glob("dataset/rdf/spb1k/*.nq");
294 changes: 294 additions & 0 deletions dataset/rdf/spb1k/generatedCreativeWorks-000001.nq

Large diffs are not rendered by default.

191 changes: 191 additions & 0 deletions dataset/rdf/spb1k/generatedCreativeWorks-000002.nq

Large diffs are not rendered by default.

296 changes: 296 additions & 0 deletions dataset/rdf/spb1k/generatedCreativeWorks-000003.nq

Large diffs are not rendered by default.

314 changes: 314 additions & 0 deletions dataset/rdf/spb1k/generatedCreativeWorks-000004.nq

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dataset/rdf/spb1k/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE RDF GRAPH spb;
14 changes: 10 additions & 4 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,21 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
}
switch (tableSchema->tableType) {
case TableType::NODE:
if (readerConfig->fileType == FileType::TURTLE) {
switch (readerConfig->fileType) {
case FileType::TURTLE:
case FileType::NQUADS: {
return bindCopyRdfNodeFrom(statement, std::move(readerConfig), tableSchema);
} else {
}
default:
return bindCopyNodeFrom(statement, std::move(readerConfig), tableSchema);
}
case TableType::REL: {
if (readerConfig->fileType == FileType::TURTLE) {
switch (readerConfig->fileType) {
case FileType::TURTLE:
case FileType::NQUADS: {
return bindCopyRdfRelFrom(statement, std::move(readerConfig), tableSchema);
} else {
}
default:
return bindCopyRelFrom(statement, std::move(readerConfig), tableSchema);
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,20 @@ function::TableFunction* Binder::getScanFunction(FileType fileType, const Reader
inputTypes.push_back(&stringType);
auto functions = catalog.getBuiltInFunctions();
switch (fileType) {
case common::FileType::PARQUET: {
case FileType::PARQUET: {
func = functions->matchScalarFunction(READ_PARQUET_FUNC_NAME, inputTypes);
} break;
case common::FileType::NPY: {
case FileType::NPY: {
func = functions->matchScalarFunction(READ_NPY_FUNC_NAME, inputTypes);
} break;
case common::FileType::CSV: {
case FileType::CSV: {
auto csvConfig = reinterpret_cast<CSVReaderConfig*>(config.extraConfig.get());
func = functions->matchScalarFunction(
csvConfig->parallel ? READ_CSV_PARALLEL_FUNC_NAME : READ_CSV_SERIAL_FUNC_NAME,
inputTypes);
} break;
case common::FileType::TURTLE: {
case FileType::NQUADS:
case FileType::TURTLE: {
func = functions->matchScalarFunction(READ_RDF_FUNC_NAME, inputTypes);
} break;
default:
Expand Down
6 changes: 6 additions & 0 deletions src/common/copier_config/reader_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
if (extension == ".ttl") {
return FileType::TURTLE;
}
if (extension == ".nq") {

Check warning on line 22 in src/common/copier_config/reader_config.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/copier_config/reader_config.cpp#L22

Added line #L22 was not covered by tests
return FileType::NQUADS;
}
throw CopyException(std::string("Unsupported file type ").append(extension));
}

Expand All @@ -39,6 +42,9 @@
case FileType::TURTLE: {
return "TURTLE";
}
case FileType::NQUADS: {
return "NQUADS";

Check warning on line 46 in src/common/copier_config/reader_config.cpp

View check run for this annotation

Codecov / codecov/patch

src/common/copier_config/reader_config.cpp#L46

Added line #L46 was not covered by tests
}
default: {
KU_UNREACHABLE;
}
Expand Down
9 changes: 8 additions & 1 deletion src/include/common/copier_config/reader_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
namespace kuzu {
namespace common {

enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3, TURTLE = 4 };
enum class FileType : uint8_t {
UNKNOWN = 0,
CSV = 1,
PARQUET = 2,
NPY = 3,
TURTLE = 4, // Terse triples http://www.w3.org/TR/turtle
NQUADS = 5 // Line-based quads http://www.w3.org/TR/n-quads/
};

struct FileTypeUtils {
static FileType getFileTypeFromExtension(std::string_view extension);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SerialCSVReader final : public BaseCSVReader {
};

struct SerialCSVScanSharedState final : public function::ScanSharedState {
explicit SerialCSVScanSharedState(
SerialCSVScanSharedState(
common::ReaderConfig readerConfig, uint64_t numRows, uint64_t numColumns)
: ScanSharedState{std::move(readerConfig), numRows}, numColumns{numColumns} {
initReader();
Expand Down
41 changes: 30 additions & 11 deletions src/include/processor/operator/persistent/reader/rdf/rdf_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,49 @@
#include "function/table_functions.h"
#include "function/table_functions/bind_data.h"
#include "function/table_functions/bind_input.h"
#include "function/table_functions/scan_functions.h"
#include "serd.h"

namespace kuzu {
namespace processor {

class RDFReader {
class RdfReader {
public:
RDFReader(std::string filePath, const common::RdfReaderConfig& config);
RdfReader(
std::string filePath, common::FileType fileType, const common::RdfReaderConfig& config)
: filePath{std::move(filePath)}, fileType{fileType}, mode{config.mode}, index{config.index},
reader{nullptr}, rowOffset{0}, vectorSize{0}, sVector{nullptr}, pVector{nullptr},
oVector{nullptr}, status{SERD_SUCCESS} {}

~RDFReader();
~RdfReader();

inline void initReader() { initReader(prefixHandle, statementHandle); }
inline void initCountReader() { initReader(nullptr, countStatementHandle); }

common::offset_t read(common::DataChunk* dataChunkToRead);
common::offset_t countLine();

private:
static SerdStatus errorHandle(void* handle, const SerdError* error);
static SerdStatus readerStatementSink(void* handle, SerdStatementFlags flags,
const SerdNode* graph, const SerdNode* subject, const SerdNode* predicate,
const SerdNode* object, const SerdNode* object_datatype, const SerdNode* object_lang);
static SerdStatus prefixSink(void* handle, const SerdNode* name, const SerdNode* uri);
static SerdStatus statementHandle(void* handle, SerdStatementFlags flags, const SerdNode* graph,
const SerdNode* subject, const SerdNode* predicate, const SerdNode* object,
const SerdNode* object_datatype, const SerdNode* object_lang);
static SerdStatus prefixHandle(void* handle, const SerdNode* name, const SerdNode* uri);

static SerdStatus counterStatementSink(void* handle, SerdStatementFlags flags,
static SerdStatus countStatementHandle(void* handle, SerdStatementFlags flags,
const SerdNode* graph, const SerdNode* subject, const SerdNode* predicate,
const SerdNode* object, const SerdNode* object_datatype, const SerdNode* object_lang);

void initReader(const SerdPrefixSink prefixSink_, const SerdStatementSink statementSink_);

private:
const std::string filePath;
common::FileType fileType;
common::RdfReaderMode mode;
storage::PrimaryKeyIndex* index;

FILE* fp;
SerdReader* reader;
SerdReader* counter;

// TODO(Xiyang): use prefix to expand CURIE.
const char* currentPrefix;
Expand All @@ -51,8 +61,17 @@ class RDFReader {
common::ValueVector* oVector; // object
};

struct RdfScanLocalState final : public function::TableFuncLocalState {
std::unique_ptr<RDFReader> reader;
struct RdfScanSharedState final : public function::ScanSharedState {
std::unique_ptr<RdfReader> reader;

RdfScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows)
: ScanSharedState{readerConfig, numRows} {
initReader();
}

void read(common::DataChunk& dataChunk);

void initReader();
};

struct RdfScan {
Expand Down
3 changes: 2 additions & 1 deletion src/planner/plan/plan_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static void appendPartitioner(BoundCopyFromInfo* copyFromInfo, LogicalPlan& plan
payloads.push_back(copyFromInfo->fileScanInfo->offset);
// TODO(Xiyang): Merge TURTLE case with other data types.
switch (fileType) {
case FileType::NQUADS:
case FileType::TURTLE: {
auto extraInfo = reinterpret_cast<ExtraBoundCopyRdfRelInfo*>(copyFromInfo->extraInfo.get());
infos.push_back(std::make_unique<LogicalPartitionerInfo>(
Expand Down Expand Up @@ -83,7 +84,7 @@ std::unique_ptr<LogicalPlan> Planner::planCopyFrom(const BoundStatement& stateme
QueryPlanner::appendScanFile(copyFromInfo->fileScanInfo.get(), *plan);
auto tableType = copyFromInfo->tableSchema->tableType;
if (tableType == TableType::REL) {
if (fileType != FileType::TURTLE) {
if (fileType != FileType::TURTLE && fileType != FileType::NQUADS) {
auto extraInfo = dynamic_cast<ExtraBoundCopyRelInfo*>(copyFromInfo->extraInfo.get());
std::vector<std::unique_ptr<LogicalIndexScanNodeInfo>> infos;
auto srcNodeTableSchema = dynamic_cast<NodeTableSchema*>(extraInfo->srcTableSchema);
Expand Down
7 changes: 4 additions & 3 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(LogicalOperator* l
auto readerConfig =
reinterpret_cast<function::ScanBindData*>(copyFromInfo->fileScanInfo->bindData.get())
->config;
if (readerConfig.fileType == FileType::TURTLE &&
reinterpret_cast<RdfReaderConfig*>(readerConfig.extraConfig.get())->mode ==
RdfReaderMode::RESOURCE) {
bool isRdfFile =
readerConfig.fileType == FileType::TURTLE || readerConfig.fileType == FileType::NQUADS;
if (isRdfFile && reinterpret_cast<RdfReaderConfig*>(readerConfig.extraConfig.get())->mode ==
RdfReaderMode::RESOURCE) {
copyNode = std::make_unique<CopyRdfResource>(sharedState, std::move(info),
std::make_unique<ResultSetDescriptor>(copyFrom->getSchema()), std::move(prevOperator),
getOperatorID(), copyFrom->getExpressionsForPrinting());
Expand Down
Loading