Skip to content

Commit

Permalink
Merge pull request #1376 from kuzudb/copy-glob
Browse files Browse the repository at this point in the history
Add glob support to copycsv
  • Loading branch information
acquamarin committed Mar 14, 2023
2 parents c8a4542 + 95f462d commit 3ec4124
Show file tree
Hide file tree
Showing 16 changed files with 3,365 additions and 3,281 deletions.
5 changes: 4 additions & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ kU_CopyCSV

kU_FilePaths
: '[' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ']'
| StringLiteral ;
| StringLiteral
| GLOB SP? '(' SP? StringLiteral SP? ')' ;

GLOB : ( 'G' | 'g' ) ( 'L' | 'l' ) ( 'O' | 'o' ) ( 'B' | 'b' ) ;

kU_ParsingOptions
: kU_ParsingOption ( SP? ',' SP? kU_ParsingOption )* ;
Expand Down
45 changes: 22 additions & 23 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,26 @@ std::unique_ptr<BoundStatement> Binder::bindCopy(const Statement& statement) {
auto tableName = copyCSV.getTableName();
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto filePaths = copyCSV.getFilePaths();
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
auto filePaths = bindFilePaths(copyCSV.getFilePaths());
auto fileType = bindFileType(filePaths);
return make_unique<BoundCopy>(
CopyDescription(filePaths, csvReaderConfig, fileType), tableID, tableName);
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
std::vector<std::string> boundFilePaths;
for (auto& filePath : filePaths) {
auto globbedFilePaths = FileUtils::globFilePath(filePath);
boundFilePaths.insert(
boundFilePaths.end(), globbedFilePaths.begin(), globbedFilePaths.end());
}
if (boundFilePaths.empty()) {
throw BinderException{StringUtils::string_format("Invalid file path: {}.", filePaths[0])};
}
return boundFilePaths;
}

CSVReaderConfig Binder::bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>* parsingOptions) {
CSVReaderConfig csvReaderConfig;
Expand Down Expand Up @@ -85,29 +98,15 @@ CopyDescription::FileType Binder::bindFileType(std::vector<std::string> filePath
auto csvSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::CSV);
auto arrowSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::ARROW);
auto parquetSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::PARQUET);

if (fileName.length() >= csvSuffix.length()) {
if (!fileName.compare(
fileName.length() - csvSuffix.length(), csvSuffix.length(), csvSuffix)) {
return CopyDescription::FileType::CSV;
}
if (fileName.ends_with(csvSuffix)) {
return CopyDescription::FileType::CSV;
} else if (fileName.ends_with(arrowSuffix)) {
return CopyDescription::FileType::ARROW;
} else if (fileName.ends_with(parquetSuffix)) {
return CopyDescription::FileType::PARQUET;
} else {
throw CopyException("Unsupported file type: " + fileName);
}

if (fileName.length() >= arrowSuffix.length()) {
if (!fileName.compare(
fileName.length() - arrowSuffix.length(), arrowSuffix.length(), arrowSuffix)) {
return CopyDescription::FileType::ARROW;
}
}

if (fileName.length() >= parquetSuffix.length()) {
if (!fileName.compare(fileName.length() - parquetSuffix.length(), parquetSuffix.length(),
parquetSuffix)) {
return CopyDescription::FileType::PARQUET;
}
}

throw CopyException("Unsupported file type: " + fileName);
}

} // namespace binder
Expand Down
10 changes: 10 additions & 0 deletions src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,15 @@ void FileUtils::truncateFileToEmpty(FileInfo* fileInfo) {
ftruncate(fileInfo->fd, 0);
}

std::vector<std::string> FileUtils::globFilePath(const std::string& path) {
std::vector<std::string> result;
glob_t globResult;
glob(path.c_str(), GLOB_TILDE, nullptr, &globResult);
for (auto i = 0u; i < globResult.gl_pathc; ++i) {
result.emplace_back(globResult.gl_pathv[i]);
}
return result;
}

} // namespace common
} // namespace kuzu
2 changes: 2 additions & 0 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class Binder {
/*** bind copy csv ***/
std::unique_ptr<BoundStatement> bindCopy(const parser::Statement& statement);

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

common::CSVReaderConfig bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
parsingOptions);
Expand Down
5 changes: 5 additions & 0 deletions src/include/common/file_utils.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#pragma once

#include <fcntl.h>
#include <glob.h>
#include <sys/stat.h>
#include <unistd.h>

#include <filesystem>
#include <string>
#include <vector>

namespace kuzu {
namespace common {
Expand Down Expand Up @@ -54,6 +56,9 @@ class FileUtils {
}
return s.st_size;
}

static std::vector<std::string> globFilePath(const std::string& path);
};

} // namespace common
} // namespace kuzu
2 changes: 0 additions & 2 deletions src/include/processor/operator/copy/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ class Copy : public PhysicalOperator {
virtual uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) = 0;

virtual uint64_t getNumTuplesInTable() = 0;

virtual bool allowCopyCSV() = 0;

protected:
Expand Down
2 changes: 0 additions & 2 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class CopyNode : public Copy {
uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

uint64_t getNumTuplesInTable() override;

private:
inline bool allowCopyCSV() override {
return nodesStatistics->getNodeStatisticsAndDeletedIDs(tableID)->getNumTuples() == 0;
Expand Down
2 changes: 0 additions & 2 deletions src/include/processor/operator/copy/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class CopyRel : public Copy {
uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

uint64_t getNumTuplesInTable() override;

private:
inline bool allowCopyCSV() override {
return relsStatistics->getRelStatistics(tableID)->getNextRelOffset() == 0;
Expand Down
1 change: 0 additions & 1 deletion src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class DDL : public PhysicalOperator {
uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog}, outputPos{outputPos} {
}
virtual ~DDL() override = default;

inline bool isSource() const override { return true; }

Expand Down
6 changes: 0 additions & 6 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,5 @@ uint64_t CopyNode::executeInternal(
return numNodesCopied;
}

uint64_t CopyNode::getNumTuplesInTable() {
// TODO(Ziyi): this chains looks weird. Fix when refactoring table statistics. Ditto in
// CopyRel.
return nodesStatistics->getReadOnlyVersion()->tableStatisticPerTable[tableID]->getNumTuples();
}

} // namespace processor
} // namespace kuzu
4 changes: 0 additions & 4 deletions src/processor/operator/copy/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,5 @@ uint64_t CopyRel::executeInternal(
return numRelsCopied;
}

uint64_t CopyRel::getNumTuplesInTable() {
return relsStatistics->getReadOnlyVersion()->tableStatisticPerTable[tableID]->getNumTuples();
}

} // namespace processor
} // namespace kuzu
42 changes: 33 additions & 9 deletions test/copy/copy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ class CopyNodeInitRelTablesTest : public BaseGraphTest {
};

class CopyMultipleFilesTest : public DBTest {

public:
void validatePersonTableAfterCopying() {
auto result = conn->query("MATCH (p:person) RETURN p.ID");
ASSERT_TRUE(result->isSuccess());
auto groundTruth =
std::vector<std::string>{"1", "12", "15", "20", "21", "4", "5", "6", "7", "8", "9"};
ASSERT_EQ(TestHelper::convertResultToString(*result), groundTruth);
}

void validateKnowsTableAfterCopying() {
auto result = conn->query("MATCH (:person)-[e:knows]->(:person) RETURN e.weight");
ASSERT_TRUE(result->isSuccess());
auto groundTruth = std::vector<std::string>{"22", "25", "33", "41", "44", "79", "80", "85"};
ASSERT_EQ(TestHelper::convertResultToString(*result), groundTruth);
}

private:
std::string getInputDir() override {
return TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/");
}
Expand Down Expand Up @@ -303,19 +321,25 @@ TEST_F(CopyMultipleFilesTest, CopyMultipleFilesTest) {
TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/vPerson2.csv"),
TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/vPerson3.csv")))
->isSuccess());
validatePersonTableAfterCopying();
ASSERT_TRUE(
conn->query(
StringUtils::string_format(R"(COPY knows FROM ["{}", "{}"])",
TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/eKnows1.csv"),
TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/eKnows2.csv")))
->isSuccess());
auto result = conn->query("MATCH (p:person) RETURN p.ID");
ASSERT_TRUE(result->isSuccess());
auto groundTruth =
std::vector<std::string>{"1", "12", "15", "20", "21", "4", "5", "6", "7", "8", "9"};
ASSERT_EQ(TestHelper::convertResultToString(*result), groundTruth);
result = conn->query("MATCH (:person)-[e:knows]->(:person) RETURN e.weight");
ASSERT_TRUE(result->isSuccess());
groundTruth = std::vector<std::string>{"22", "25", "33", "41", "44", "79", "80", "85"};
ASSERT_EQ(TestHelper::convertResultToString(*result), groundTruth);
validateKnowsTableAfterCopying();
}

TEST_F(CopyMultipleFilesTest, CopyFilesWithWildcardPattern) {
ASSERT_TRUE(conn->query(StringUtils::string_format(R"(COPY person FROM "{}")",
TestHelper::appendKuzuRootPath(
"dataset/copy-multiple-files-test/vPerson?.csv")))
->isSuccess());
validatePersonTableAfterCopying();
ASSERT_TRUE(
conn->query(StringUtils::string_format(R"(COPY knows FROM glob("{}"))",
TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/eK*")))
->isSuccess());
validateKnowsTableAfterCopying();
}
Loading

0 comments on commit 3ec4124

Please sign in to comment.