Skip to content

Commit

Permalink
Rework CSV_TO_PARQUET testing feature
Browse files Browse the repository at this point in the history
  • Loading branch information
manh9203 committed Mar 6, 2024
1 parent 1b858cf commit 9e23995
Show file tree
Hide file tree
Showing 16 changed files with 316 additions and 86 deletions.
2 changes: 1 addition & 1 deletion dataset/long-string-pk-tests/schema.cypher
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
CREATE NODE TABLE Person(name STRING, spouse STRING, PRIMARY KEY (name))
CREATE NODE TABLE Person (name STRING, spouse STRING, PRIMARY KEY (name))
create REL TABLE Knows (FROM Person TO Person);
4 changes: 4 additions & 0 deletions src/common/file_system/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ void FileSystem::overwriteFile(const std::string& /*from*/, const std::string& /
KU_UNREACHABLE;
}

// Base-class stub: copying files is not supported by the generic FileSystem.
// Subclasses that can copy (e.g. LocalFileSystem) override this; reaching this
// implementation indicates a programming error (KU_UNREACHABLE presumably
// aborts/asserts — confirm against the macro's definition).
void FileSystem::copyFile(const std::string& /*from*/, const std::string& /*to*/) const {
    KU_UNREACHABLE;
}

// Base-class stub: directory creation is not supported by the generic
// FileSystem and must be overridden by concrete implementations; reaching this
// implementation indicates a programming error (KU_UNREACHABLE presumably
// aborts/asserts — confirm against the macro's definition).
void FileSystem::createDir(const std::string& /*dir*/) const {
    KU_UNREACHABLE;
}
Expand Down
12 changes: 12 additions & 0 deletions src/common/file_system/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ void LocalFileSystem::overwriteFile(const std::string& from, const std::string&
}
}

// Copies `from` to `to` on the local file system.
// A non-existent source is treated as a silent no-op. The copy uses
// copy_options::none, so an already-existing destination makes the copy fail
// and this method throw. Throws Exception with the underlying error message on
// any copy failure.
void LocalFileSystem::copyFile(const std::string& from, const std::string& to) const {
    if (!fileOrPathExists(from)) {
        // Nothing to copy; mirror the permissive behavior of sibling file ops.
        return;
    }
    std::error_code errorCode;
    const bool copySucceeded =
        std::filesystem::copy_file(from, to, std::filesystem::copy_options::none, errorCode);
    if (!copySucceeded) {
        // LCOV_EXCL_START
        throw Exception(stringFormat(
            "Error copying file {} to {}. ErrorMessage: {}", from, to, errorCode.message()));
        // LCOV_EXCL_STOP
    }
}

void LocalFileSystem::createDir(const std::string& dir) const {
try {
if (std::filesystem::exists(dir)) {
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/file_system/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class KUZU_API FileSystem {

virtual void overwriteFile(const std::string& from, const std::string& to) const;

virtual void copyFile(const std::string& from, const std::string& to) const;

virtual void createDir(const std::string& dir) const;

virtual void removeFileIfExists(const std::string& path) const;
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/file_system/local_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class LocalFileSystem final : public FileSystem {

void overwriteFile(const std::string& from, const std::string& to) const override;

void copyFile(const std::string& from, const std::string& to) const override;

void createDir(const std::string& dir) const override;

void removeFileIfExists(const std::string& path) const override;
Expand Down
64 changes: 38 additions & 26 deletions test/include/test_runner/csv_to_parquet_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,60 @@
#include <string>
#include <vector>

#include "main/kuzu.h"

namespace kuzu {
namespace testing {

// Convert an entire CSV dataset directory to parquet.
// The dataset directory must contain schema and copy files.
class CSVToParquetConverter {
public:
static std::string convertCSVDatasetToParquet(
const std::string& csvDatasetPath, const std::string& parquetDatasetPath);
explicit CSVToParquetConverter(
std::string csvDatasetPath, std::string parquetDatasetPath, uint64_t bufferPoolSize)
: csvDatasetPath{csvDatasetPath}, parquetDatasetPath{parquetDatasetPath},
bufferPoolSize{bufferPoolSize} {}

void convertCSVDatasetToParquet();

inline static std::string replaceSlashesWithUnderscores(std::string dataset) {
std::replace(dataset.begin(), dataset.end(), '/', '_');
return dataset;
}
private:
void copySchemaFile();
void createTableInfo(std::string schemaFile);
void readCopyCommandsFromCSVDataset();
void createCopyFile();
void convertCSVFilesToParquet();

private:
struct CopyCommandInfo {
std::string table;
struct TableInfo {
std::string name;
std::string csvFilePath;
std::string parquetFilePath;
bool csvHasHeader;
char delimiter;
// get cypher query to convert csv file to parquet file
virtual std::string getConverterQuery() const = 0;
};

static std::vector<CopyCommandInfo> readCopyCommandsFromCopyCypherFile(
const std::string& csvDatasetPath, const std::string& parquetDatasetPath);

static void convertCSVFilesToParquet(
const std::vector<CSVToParquetConverter::CopyCommandInfo>& copyCommands);

static CopyCommandInfo createCopyCommandInfo(
const std::string& parquetDatasetPath, std::string copyStatement);

// TODO: This has to be re-implemented to not rely on arrow, instead of on our own parquet lib.
// static arrow::Status runCSVToParquetConversion(const std::string& inputFile,
// const std::string& outputFile, char delimiter, bool hasHeader);
struct NodeTableInfo final : public TableInfo {
std::string primaryKey;
// get converter query for node table
std::string getConverterQuery() const override;
};

static void copySchema(
const std::string& csvDatasetPath, const std::string& parquetDatasetPath);
struct RelTableInfo final : public TableInfo {
std::shared_ptr<NodeTableInfo> fromTable;
std::shared_ptr<NodeTableInfo> toTable;
// get converter query for rel table
std::string getConverterQuery() const override;
};

static void createCopyFile(const std::string& parquetDatasetPath,
const std::vector<CSVToParquetConverter::CopyCommandInfo>& copyCommands);
private:
std::string csvDatasetPath;
std::string parquetDatasetPath;
std::vector<std::shared_ptr<TableInfo>> tables;
std::unordered_map<std::string, std::shared_ptr<TableInfo>> tableNameMap;
uint64_t bufferPoolSize;
std::unique_ptr<main::SystemConfig> systemConfig;
std::unique_ptr<main::Database> tempDb;
std::unique_ptr<main::Connection> tempConn;
};

} // namespace testing
Expand Down
23 changes: 15 additions & 8 deletions test/runner/e2e_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,23 @@ class EndToEndTest : public DBTest {
}

void setUpDataset() {
parquetTempDatasetPath = generateParquetTempDatasetPath();
dataset = TestHelper::appendKuzuRootPath("dataset/" + dataset);
if (datasetType == TestGroup::DatasetType::CSV_TO_PARQUET) {
throw NotImplementedException("CSV_TO_PARQUET dataset type is not implemented yet.");
auto csvDatasetPath = TestHelper::appendKuzuRootPath("dataset/" + dataset);
parquetTempDatasetPath = generateParquetTempDatasetPath();
CSVToParquetConverter converter(csvDatasetPath, parquetTempDatasetPath, bufferPoolSize);
converter.convertCSVDatasetToParquet();
dataset = parquetTempDatasetPath;
} else {
dataset = TestHelper::appendKuzuRootPath("dataset/" + dataset);
}
}

void TearDown() override {
std::filesystem::remove_all(databasePath);
std::filesystem::remove_all(parquetTempDatasetPath);
BaseGraphTest::removeIEDBPath();
if (datasetType == TestGroup::DatasetType::CSV_TO_PARQUET) {
std::filesystem::remove_all(parquetTempDatasetPath);
}
}

void TestBody() override { runTest(testStatements, checkpointWaitTimeout, connNames); }
Expand All @@ -56,10 +62,11 @@ class EndToEndTest : public DBTest {
std::set<std::string> connNames;

std::string generateParquetTempDatasetPath() {
return TestHelper::appendKuzuRootPath(
TestHelper::PARQUET_TEMP_DATASET_PATH +
CSVToParquetConverter::replaceSlashesWithUnderscores(dataset) + getTestGroupAndName() +
TestHelper::getMillisecondsSuffix());
std::string datasetName = dataset;
std::replace(datasetName.begin(), datasetName.end(), '/', '_');
return TestHelper::appendKuzuRootPath(TestHelper::PARQUET_TEMP_DATASET_PATH + datasetName +
"_" + getTestGroupAndName() + "_" +
TestHelper::getMillisecondsSuffix());
}
};

Expand Down
1 change: 0 additions & 1 deletion test/test_files/copy/copy_pk_long_string_parquet.test
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
-GROUP LongStringPKTest
-DATASET PARQUET CSV_TO_PARQUET(long-string-pk-tests)
-SKIP

--

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
# FIXME: this test is failing on Parquet dataset
-GROUP LDBCTest
-DATASET PARQUET CSV_TO_PARQUET(ldbc-sf01)
-BUFFER_POOL_SIZE 1073741824
-SKIP

--

-CASE LDBCInteractiveShortParquet
Expand All @@ -22,6 +20,20 @@
---- 1
Mahinda|Perera|19891203|119.235.7.103|Firefox|1353|male|20100214153210447

# IS2 should be changed to use Kleene Star relationship once that is implemented.
# The 'Case When' statement should be supported as coalesce().
-LOG IS2
-STATEMENT MATCH (:Person {id: 21990232555803})<-[:Post_hasCreator|:Comment_hasCreator]-(message)
WITH message,message.id AS messageId, message.creationDate AS messageCreationDate
ORDER BY messageCreationDate DESC, messageId ASC LIMIT 10
MATCH (message)-[:replyOf_Post|:replyOf_Comment*1..2]->(post:Post), (post)-[:Post_hasCreator]->(person)
RETURN messageId, CASE WHEN message.imageFile is NULL THEN message.content ELSE message.imageFile END AS messageContent, messageCreationDate, post.id AS postId, person.id AS personId, person.firstName AS personFirstName, person.lastName AS personLastName
ORDER BY messageCreationDate DESC, messageId ASC;
---- 3
1030792343617|About H•A•M, d Kanye West, released as the first single from their collaborati|20120823142307823|1030792343610|21990232556463|Victor|Antonescu
1030792343876|thx|20120818110412997|1030792343869|21990232555803|Carlos|Lopez
1030792343986|fine|20120810030544084|1030792343978|21990232556837|Bing|Li

-LOG IS3
-STATEMENT MATCH (n:Person {id: 1129})-[r:knows]-(friend)
RETURN friend.id AS personId,
Expand Down Expand Up @@ -58,17 +70,11 @@ Mahinda|Perera|19891203|119.235.7.103|Firefox|1353|male|20100214153210447
2199023256077|Ibrahim Bare|Ousmane

# IS6 should be changed to use Kleene Star relationship once that is implemented
# This query is currently commented out, but will work as soon as multilabelled recursive rels are merged to master.
# -LOG IS6
# -STATEMENT MATCH (m:Comment {id: 962072675825 })-[:replyOf_Post|:replyOf_Comment*1..2]->(p:Post)<-[:containerOf]-(f:Forum)-[:hasModerator]->(mod:Person)
# RETURN
# f.id AS forumId,
# f.title AS forumTitle,
# mod.id AS moderatorId,
# mod.firstName AS moderatorFirstName,
# mod.lastName AS moderatorLastName;
# ---- 1
# 687194767491|Wall of Faisal Malik|21990232556585|Faisal|Malik
-LOG IS6
-STATEMENT MATCH (m:Comment {id: 962072675825 })-[:replyOf_Post|:replyOf_Comment*1..2]->(p:Post)<-[:containerOf]-(f:Forum)-[:hasModerator]->(mod:Person)
RETURN f.id AS forumId,f.title AS forumTitle,mod.id AS moderatorId,mod.firstName AS moderatorFirstName, mod.lastName AS moderatorLastName;
---- 1
687194767491|Wall of Faisal Malik|21990232556585|Faisal|Malik

-LOG IS7
-STATEMENT MATCH (m:Post:Comment {id: 962072971887})<-[:replyOf_Comment|:replyOf_Post]-(c:Comment)-[:Comment_hasCreator]->(p:Person)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
-DATASET PARQUET CSV_TO_PARQUET(lsqb-sf01)
-BUFFER_POOL_SIZE 1073741824
-SKIP

--

-CASE LSQBTestParquet
Expand Down
1 change: 0 additions & 1 deletion test/test_files/order_by/order_by_parquet.test
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

-GROUP OrderByTests
-DATASET PARQUET CSV_TO_PARQUET(order-by-tests)
-SKIP

--

Expand Down
1 change: 0 additions & 1 deletion test/test_files/read_list/large_adj_list_parquet.test
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
-GROUP EndToEndReadLargeListsTest
-DATASET PARQUET CSV_TO_PARQUET(read-list-tests/large-list)
-SKIP

--

Expand Down
Loading

0 comments on commit 9e23995

Please sign in to comment.