Skip to content

Commit

Permalink
Add nquad file type, add scan multiple rdf files
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Nov 30, 2023
1 parent 975aed4 commit 7acfb58
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 16 deletions.
4 changes: 4 additions & 0 deletions dataset/rdf/spb1k/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
COPY spb_resource_t FROM "dataset/rdf/spb1k/*.nq";
COPY spb_literal_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb_resource_triples_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb_literal_triples_t FROM glob("dataset/rdf/spb1k/*.nq");
1 change: 1 addition & 0 deletions dataset/rdf/spb1k/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE RDF GRAPH spb;
14 changes: 10 additions & 4 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,21 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
}
switch (tableSchema->tableType) {
case TableType::NODE:
if (readerConfig->fileType == FileType::TURTLE) {
switch (readerConfig->fileType) {
case FileType::TURTLE:
case FileType::NQUADS: {
return bindCopyRdfNodeFrom(statement, std::move(readerConfig), tableSchema);
} else {
}
default:
return bindCopyNodeFrom(statement, std::move(readerConfig), tableSchema);
}
case TableType::REL: {
if (readerConfig->fileType == FileType::TURTLE) {
switch (readerConfig->fileType) {
case FileType::TURTLE:
case FileType::NQUADS: {
return bindCopyRdfRelFrom(statement, std::move(readerConfig), tableSchema);
} else {
}
default:
return bindCopyRelFrom(statement, std::move(readerConfig), tableSchema);
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,19 +217,20 @@ function::TableFunction* Binder::getScanFunction(FileType fileType, const Reader
inputTypes.push_back(&stringType);
auto functions = catalog.getBuiltInFunctions();
switch (fileType) {
case common::FileType::PARQUET: {
case FileType::PARQUET: {
func = functions->matchScalarFunction(READ_PARQUET_FUNC_NAME, inputTypes);
} break;
case common::FileType::NPY: {
case FileType::NPY: {
func = functions->matchScalarFunction(READ_NPY_FUNC_NAME, inputTypes);
} break;
case common::FileType::CSV: {
case FileType::CSV: {
auto csvConfig = reinterpret_cast<CSVReaderConfig*>(config.extraConfig.get());
func = functions->matchScalarFunction(
csvConfig->parallel ? READ_CSV_PARALLEL_FUNC_NAME : READ_CSV_SERIAL_FUNC_NAME,
inputTypes);
} break;
case common::FileType::TURTLE: {
case FileType::NQUADS:
case FileType::TURTLE: {
func = functions->matchScalarFunction(READ_RDF_FUNC_NAME, inputTypes);
} break;
default:
Expand Down
4 changes: 3 additions & 1 deletion src/common/copier_config/reader_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ FileType FileTypeUtils::getFileTypeFromExtension(std::string_view extension) {
if (extension == ".ttl") {
return FileType::TURTLE;
}
if (extension == ".nq") {}
if (extension == ".nq") {
return FileType::NQUADS;
}
throw CopyException(std::string("Unsupported file type ").append(extension));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SerialCSVReader final : public BaseCSVReader {
};

struct SerialCSVScanSharedState final : public function::ScanSharedState {
explicit SerialCSVScanSharedState(
SerialCSVScanSharedState(
common::ReaderConfig readerConfig, uint64_t numRows, uint64_t numColumns)
: ScanSharedState{std::move(readerConfig), numRows}, numColumns{numColumns} {
initReader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class RdfReader {
struct RdfScanSharedState final : public function::ScanSharedState {
std::unique_ptr<RdfReader> reader;

RdfScanSharedState(common::ReaderConfig readerConfig, uint64_t numRows)
: ScanSharedState{readerConfig, numRows} {
initReader();
}

void read(common::DataChunk& dataChunk);

void initReader();
Expand Down
3 changes: 2 additions & 1 deletion src/planner/plan/plan_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static void appendPartitioner(BoundCopyFromInfo* copyFromInfo, LogicalPlan& plan
payloads.push_back(copyFromInfo->fileScanInfo->offset);
// TODO(Xiyang): Merge TURTLE case with other data types.
switch (fileType) {
case FileType::NQUADS:
case FileType::TURTLE: {
auto extraInfo = reinterpret_cast<ExtraBoundCopyRdfRelInfo*>(copyFromInfo->extraInfo.get());
infos.push_back(std::make_unique<LogicalPartitionerInfo>(
Expand Down Expand Up @@ -83,7 +84,7 @@ std::unique_ptr<LogicalPlan> Planner::planCopyFrom(const BoundStatement& stateme
QueryPlanner::appendScanFile(copyFromInfo->fileScanInfo.get(), *plan);
auto tableType = copyFromInfo->tableSchema->tableType;
if (tableType == TableType::REL) {
if (fileType != FileType::TURTLE) {
if (fileType != FileType::TURTLE && fileType != FileType::NQUADS) {
auto extraInfo = dynamic_cast<ExtraBoundCopyRelInfo*>(copyFromInfo->extraInfo.get());
std::vector<std::unique_ptr<LogicalIndexScanNodeInfo>> infos;
auto srcNodeTableSchema = dynamic_cast<NodeTableSchema*>(extraInfo->srcTableSchema);
Expand Down
7 changes: 4 additions & 3 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(LogicalOperator* l
auto readerConfig =
reinterpret_cast<function::ScanBindData*>(copyFromInfo->fileScanInfo->bindData.get())
->config;
if (readerConfig.fileType == FileType::TURTLE &&
reinterpret_cast<RdfReaderConfig*>(readerConfig.extraConfig.get())->mode ==
RdfReaderMode::RESOURCE) {
bool isRdfFile =
readerConfig.fileType == FileType::TURTLE || readerConfig.fileType == FileType::NQUADS;
if (isRdfFile && reinterpret_cast<RdfReaderConfig*>(readerConfig.extraConfig.get())->mode ==
RdfReaderMode::RESOURCE) {
copyNode = std::make_unique<CopyRdfResource>(sharedState, std::move(info),
std::make_unique<ResultSetDescriptor>(copyFrom->getSchema()), std::move(prevOperator),
getOperatorID(), copyFrom->getExpressionsForPrinting());
Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/persistent/reader/rdf/rdf_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ void RdfScanSharedState::initReader() {
auto path = readerConfig.filePaths[fileIdx];
auto rdfConfig = reinterpret_cast<RdfReaderConfig*>(readerConfig.extraConfig.get());
reader = std::make_unique<RdfReader>(path, readerConfig.fileType, *rdfConfig);
reader->initReader();
}

function::function_set RdfScan::getFunctionSet() {
Expand Down Expand Up @@ -336,7 +337,7 @@ std::unique_ptr<function::TableFuncSharedState> RdfScan::initSharedState(
reader->initCountReader();
numRows += reader->countLine();
}
return std::make_unique<function::ScanSharedState>(bindData->config, numRows);
return std::make_unique<RdfScanSharedState>(bindData->config, numRows);
}

std::unique_ptr<function::TableFuncLocalState> RdfScan::initLocalState(
Expand Down
3 changes: 2 additions & 1 deletion test/graph_test/base_graph_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ void TestHelper::executeScript(const std::string& cypherScript, Connection& conn
if (substrLower.find(".csv") != std::string::npos ||
substrLower.find(".parquet") != std::string::npos ||
substrLower.find(".npy") != std::string::npos ||
substrLower.find(".ttl") != std::string::npos) {
substrLower.find(".ttl") != std::string::npos ||
substrLower.find(".nq") != std::string::npos) {
csvFilePaths.push_back(substr);
}
index = end + 1;
Expand Down
16 changes: 16 additions & 0 deletions test/test_files/rdf/spb1k.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-GROUP RdfoxExample
-DATASET TTL rdf/spb1k

--

-CASE SPB

-STATEMENT MATCH (s:spb_resource_t) RETURN COUNT(*);
---- 1
173
-STATEMENT MATCH (s:spb_resource_t)-[]->(:spb_resource_t) WHERE s.iri = "http://www.bbc.co.uk/things/3#id" RETURN COUNT(*);
---- 1
16
-STATEMENT MATCH (s:spb_resource_t)-[]->(:spb_literal_t) WHERE s.iri = "http://www.bbc.co.uk/things/3#id" RETURN COUNT(*);
---- 1
7

0 comments on commit 7acfb58

Please sign in to comment.