Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add glob support to copycsv #1376

Merged
merged 1 commit into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ kU_CopyCSV

kU_FilePaths
: '[' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ']'
| StringLiteral ;
| StringLiteral
| GLOB SP? '(' SP? StringLiteral SP? ')' ;

GLOB : ( 'G' | 'g' ) ( 'L' | 'l' ) ( 'O' | 'o' ) ( 'B' | 'b' ) ;

kU_ParsingOptions
: kU_ParsingOption ( SP? ',' SP? kU_ParsingOption )* ;
Expand Down
45 changes: 22 additions & 23 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,26 @@ std::unique_ptr<BoundStatement> Binder::bindCopy(const Statement& statement) {
auto tableName = copyCSV.getTableName();
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto filePaths = copyCSV.getFilePaths();
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
auto filePaths = bindFilePaths(copyCSV.getFilePaths());
auto fileType = bindFileType(filePaths);
return make_unique<BoundCopy>(
CopyDescription(filePaths, csvReaderConfig, fileType), tableID, tableName);
}

/**
 * Binds the file paths of a COPY statement by expanding every entry through
 * FileUtils::globFilePath and concatenating the expansions in input order.
 *
 * @param filePaths paths or patterns exactly as written in the statement.
 * @return every path produced by the expansions; never empty.
 * @throws BinderException if the expansions yield no path at all (this includes an empty
 *         `filePaths`). The message quotes the first entry when there is one.
 */
std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
    std::vector<std::string> boundFilePaths;
    for (const auto& filePath : filePaths) {
        const auto globbedFilePaths = FileUtils::globFilePath(filePath);
        boundFilePaths.insert(
            boundFilePaths.end(), globbedFilePaths.begin(), globbedFilePaths.end());
    }
    if (boundFilePaths.empty()) {
        // Indexing filePaths[0] is only valid when at least one entry was supplied; fall back to
        // an empty string so an empty input raises the exception instead of reading out of bounds.
        const std::string offendingPath = filePaths.empty() ? std::string{} : filePaths.front();
        throw BinderException{StringUtils::string_format("Invalid file path: {}.", offendingPath)};
    }
    return boundFilePaths;
}

CSVReaderConfig Binder::bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>* parsingOptions) {
CSVReaderConfig csvReaderConfig;
Expand Down Expand Up @@ -85,29 +98,15 @@ CopyDescription::FileType Binder::bindFileType(std::vector<std::string> filePath
auto csvSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::CSV);
auto arrowSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::ARROW);
auto parquetSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::PARQUET);

if (fileName.length() >= csvSuffix.length()) {
if (!fileName.compare(
fileName.length() - csvSuffix.length(), csvSuffix.length(), csvSuffix)) {
return CopyDescription::FileType::CSV;
}
if (fileName.ends_with(csvSuffix)) {
return CopyDescription::FileType::CSV;
} else if (fileName.ends_with(arrowSuffix)) {
return CopyDescription::FileType::ARROW;
} else if (fileName.ends_with(parquetSuffix)) {
return CopyDescription::FileType::PARQUET;
} else {
throw CopyException("Unsupported file type: " + fileName);
}

if (fileName.length() >= arrowSuffix.length()) {
if (!fileName.compare(
fileName.length() - arrowSuffix.length(), arrowSuffix.length(), arrowSuffix)) {
return CopyDescription::FileType::ARROW;
}
}

if (fileName.length() >= parquetSuffix.length()) {
if (!fileName.compare(fileName.length() - parquetSuffix.length(), parquetSuffix.length(),
parquetSuffix)) {
return CopyDescription::FileType::PARQUET;
}
}

throw CopyException("Unsupported file type: " + fileName);
}

} // namespace binder
Expand Down
10 changes: 10 additions & 0 deletions src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,15 @@ void FileUtils::truncateFileToEmpty(FileInfo* fileInfo) {
ftruncate(fileInfo->fd, 0);
}

/**
 * Expands `path` with glob(3), using GLOB_TILDE, and returns the resulting paths.
 *
 * @param path a literal path or a wildcard pattern.
 * @return the paths reported by glob(), in the order glob() returned them; empty when glob()
 *         does not succeed (no match, out of memory, or read error).
 */
std::vector<std::string> FileUtils::globFilePath(const std::string& path) {
    std::vector<std::string> result;
    // Zero-initialise so that globfree() is well-defined even if glob() fails early.
    glob_t globResult{};
    // glob() allocates gl_pathv; this guard releases it on every exit path, including an
    // exception thrown while copying the strings into `result`.
    struct GlobGuard {
        glob_t* handle;
        ~GlobGuard() { globfree(handle); }
    } guard{&globResult};
    // A non-zero return is GLOB_NOMATCH, GLOB_NOSPACE or GLOB_ABORTED; none of them is
    // treated as a usable expansion.
    if (glob(path.c_str(), GLOB_TILDE, nullptr, &globResult) != 0) {
        return result;
    }
    result.reserve(globResult.gl_pathc);
    for (size_t i = 0; i < globResult.gl_pathc; ++i) {
        result.emplace_back(globResult.gl_pathv[i]);
    }
    return result;
}

} // namespace common
} // namespace kuzu
2 changes: 2 additions & 0 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class Binder {
/*** bind copy csv ***/
std::unique_ptr<BoundStatement> bindCopy(const parser::Statement& statement);

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

common::CSVReaderConfig bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
parsingOptions);
Expand Down
5 changes: 5 additions & 0 deletions src/include/common/file_utils.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#pragma once

#include <fcntl.h>
#include <glob.h>
#include <sys/stat.h>
#include <unistd.h>

#include <filesystem>
#include <string>
#include <vector>

namespace kuzu {
namespace common {
Expand Down Expand Up @@ -54,6 +56,9 @@ class FileUtils {
}
return s.st_size;
}

static std::vector<std::string> globFilePath(const std::string& path);
};

} // namespace common
} // namespace kuzu
2 changes: 0 additions & 2 deletions src/include/processor/operator/copy/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ class Copy : public PhysicalOperator {
virtual uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) = 0;

virtual uint64_t getNumTuplesInTable() = 0;

virtual bool allowCopyCSV() = 0;

protected:
Expand Down
2 changes: 0 additions & 2 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class CopyNode : public Copy {
uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

uint64_t getNumTuplesInTable() override;

private:
inline bool allowCopyCSV() override {
return nodesStatistics->getNodeStatisticsAndDeletedIDs(tableID)->getNumTuples() == 0;
Expand Down
2 changes: 0 additions & 2 deletions src/include/processor/operator/copy/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class CopyRel : public Copy {
uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

uint64_t getNumTuplesInTable() override;

private:
inline bool allowCopyCSV() override {
return relsStatistics->getRelStatistics(tableID)->getNextRelOffset() == 0;
Expand Down
1 change: 0 additions & 1 deletion src/include/processor/operator/ddl/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class DDL : public PhysicalOperator {
uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, catalog{catalog}, outputPos{outputPos} {
}
virtual ~DDL() override = default;

inline bool isSource() const override { return true; }

Expand Down
6 changes: 0 additions & 6 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,5 @@ uint64_t CopyNode::executeInternal(
return numNodesCopied;
}

uint64_t CopyNode::getNumTuplesInTable() {
// TODO(Ziyi): this chains looks weird. Fix when refactoring table statistics. Ditto in
// CopyRel.
return nodesStatistics->getReadOnlyVersion()->tableStatisticPerTable[tableID]->getNumTuples();
}

} // namespace processor
} // namespace kuzu
4 changes: 0 additions & 4 deletions src/processor/operator/copy/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,5 @@ uint64_t CopyRel::executeInternal(
return numRelsCopied;
}

uint64_t CopyRel::getNumTuplesInTable() {
return relsStatistics->getReadOnlyVersion()->tableStatisticPerTable[tableID]->getNumTuples();
}

} // namespace processor
} // namespace kuzu
42 changes: 33 additions & 9 deletions test/copy/copy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ class CopyNodeInitRelTablesTest : public BaseGraphTest {
};

class CopyMultipleFilesTest : public DBTest {

public:
// Queries the ID of every person node and asserts that the query succeeds and that its
// stringified result equals the expected list of IDs.
void validatePersonTableAfterCopying() {
    const std::vector<std::string> expectedIDs{
        "1", "12", "15", "20", "21", "4", "5", "6", "7", "8", "9"};
    auto queryResult = conn->query("MATCH (p:person) RETURN p.ID");
    ASSERT_TRUE(queryResult->isSuccess());
    ASSERT_EQ(TestHelper::convertResultToString(*queryResult), expectedIDs);
}

// Queries the weight of every knows edge between two person nodes and asserts that the query
// succeeds and that its stringified result equals the expected list of weights.
void validateKnowsTableAfterCopying() {
    const std::vector<std::string> expectedWeights{
        "22", "25", "33", "41", "44", "79", "80", "85"};
    auto queryResult = conn->query("MATCH (:person)-[e:knows]->(:person) RETURN e.weight");
    ASSERT_TRUE(queryResult->isSuccess());
    ASSERT_EQ(TestHelper::convertResultToString(*queryResult), expectedWeights);
}

private:
std::string getInputDir() override {
return TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/");
}
Expand Down Expand Up @@ -303,19 +321,25 @@ TEST_F(CopyMultipleFilesTest, CopyMultipleFilesTest) {
TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/vPerson2.csv"),
TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/vPerson3.csv")))
->isSuccess());
validatePersonTableAfterCopying();
ASSERT_TRUE(
conn->query(
StringUtils::string_format(R"(COPY knows FROM ["{}", "{}"])",
TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/eKnows1.csv"),
TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/eKnows2.csv")))
->isSuccess());
auto result = conn->query("MATCH (p:person) RETURN p.ID");
ASSERT_TRUE(result->isSuccess());
auto groundTruth =
std::vector<std::string>{"1", "12", "15", "20", "21", "4", "5", "6", "7", "8", "9"};
ASSERT_EQ(TestHelper::convertResultToString(*result), groundTruth);
result = conn->query("MATCH (:person)-[e:knows]->(:person) RETURN e.weight");
ASSERT_TRUE(result->isSuccess());
groundTruth = std::vector<std::string>{"22", "25", "33", "41", "44", "79", "80", "85"};
ASSERT_EQ(TestHelper::convertResultToString(*result), groundTruth);
validateKnowsTableAfterCopying();
}

// Copies the person table from a bare string path containing a `?` wildcard, then the knows
// table through the explicit glob("...") form with a `*` wildcard, validating each table
// right after its copy.
TEST_F(CopyMultipleFilesTest, CopyFilesWithWildcardPattern) {
    const auto personPattern =
        TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/vPerson?.csv");
    const auto copyPersonQuery =
        StringUtils::string_format(R"(COPY person FROM "{}")", personPattern);
    ASSERT_TRUE(conn->query(copyPersonQuery)->isSuccess());
    validatePersonTableAfterCopying();

    const auto knowsPattern =
        TestHelper::appendKuzuRootPath("dataset/copy-multiple-files-test/eK*");
    const auto copyKnowsQuery =
        StringUtils::string_format(R"(COPY knows FROM glob("{}"))", knowsPattern);
    ASSERT_TRUE(conn->query(copyKnowsQuery)->isSuccess());
    validateKnowsTableAfterCopying();
}
Loading