Skip to content

Commit

Permalink
Attempt at using Arrow's low-level API for Parquet writing
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Aug 12, 2023
1 parent 542e3e0 commit 9d4c839
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 42 deletions.
36 changes: 18 additions & 18 deletions examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ int main() {
auto database = std::make_unique<Database>("puffer_fish" /* fill db path */);
auto connection = std::make_unique<Connection>(database.get());

// connection->query("create node table person (ID INt64, fName StRING, gender INT64, isStudent BoOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration interval, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, PRIMARY KEY (ID));");
// connection->query("create node table organisation (ID INT64, name STRING, orgCode INT64, mark DOUBLE, score INT64, history STRING, licenseValidInterval INTERVAL, rating DOUBLE, state STRUCT(revenue INT16, location STRING[], stock STRUCT(price INT64[], volume INT64)), PRIMARY KEY (ID));");
// connection->query("create node table movies (name STRING, length INT32, note STRING, description STRUCT(rating DOUBLE, views INT64, release TIMESTAMP, film DATE), content BYTEA, PRIMARY KEY (name));");
// connection->query("create rel table knows (FROM person TO person, date DATE, meetTime TIMESTAMP, validInterval INTERVAL, comments STRING[], MANY_MANY);");
// connection->query("create rel table studyAt (FROM person TO organisation, year INT64, places STRING[], length INT16,MANY_ONE);");
// connection->query("create rel table workAt (FROM person TO organisation, year INT64, grading DOUBLE[2], rating float, MANY_ONE);");
// connection->query("create rel table meets (FROM person TO person, location FLOAT[2], times INT, data BYTEA, MANY_ONE);");
// connection->query("create rel table marries (FROM person TO person, usedAddress STRING[], address INT16[2], note STRING, ONE_ONE);");
//
// connection->query("COPY person FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vPerson.csv' (HEADER=true, DELIM=',');");
// connection->query("COPY organisation FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vOrganisation.csv';");
// connection->query("COPY movies FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vMovies.csv';");
// connection->query("COPY knows FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eKnows.csv';");
// connection->query("COPY studyAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eStudyAt.csv' (HEADER=true);");
// connection->query("COPY workAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eWorkAt.csv';");
// connection->query("COPY meets FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMeets.csv';");
// connection->query("COPY marries FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMarries.csv';");
connection->query("create node table person (ID INt64, fName StRING, gender INT64, isStudent BoOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration interval, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, PRIMARY KEY (ID));");
connection->query("create node table organisation (ID INT64, name STRING, orgCode INT64, mark DOUBLE, score INT64, history STRING, licenseValidInterval INTERVAL, rating DOUBLE, state STRUCT(revenue INT16, location STRING[], stock STRUCT(price INT64[], volume INT64)), PRIMARY KEY (ID));");
connection->query("create node table movies (name STRING, length INT32, note STRING, description STRUCT(rating DOUBLE, views INT64, release TIMESTAMP, film DATE), content BYTEA, PRIMARY KEY (name));");
connection->query("create rel table knows (FROM person TO person, date DATE, meetTime TIMESTAMP, validInterval INTERVAL, comments STRING[], MANY_MANY);");
connection->query("create rel table studyAt (FROM person TO organisation, year INT64, places STRING[], length INT16,MANY_ONE);");
connection->query("create rel table workAt (FROM person TO organisation, year INT64, grading DOUBLE[2], rating float, MANY_ONE);");
connection->query("create rel table meets (FROM person TO person, location FLOAT[2], times INT, data BYTEA, MANY_ONE);");
connection->query("create rel table marries (FROM person TO person, usedAddress STRING[], address INT16[2], note STRING, ONE_ONE);");

connection->query("COPY person FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vPerson.csv' (HEADER=true, DELIM=',');");
connection->query("COPY organisation FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vOrganisation.csv';");
connection->query("COPY movies FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vMovies.csv';");
connection->query("COPY knows FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eKnows.csv';");
connection->query("COPY studyAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eStudyAt.csv' (HEADER=true);");
connection->query("COPY workAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eWorkAt.csv';");
connection->query("COPY meets FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMeets.csv';");
connection->query("COPY marries FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMarries.csv';");

connection->query("COPY (MATCH (p:person) RETURN p.ID, p.birthdate, p.registerTime) TO 'dates.parquet';");

connection->query("COPY (MATCH (p:person) RETURN p.workedHours) TO 'dates.parquet';");

// connection->query("COPY (MATCH (b:person) RETURN b.courseScoresPerTerm) TO 'out.parquet';");
// connection->query("COPY (MATCH (b:person) RETURN b.workedHours) TO 'out.parquet';");
Expand Down
7 changes: 5 additions & 2 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
auto boundFilePath = copyToStatement.getFilePath();
auto fileType = bindFileType(boundFilePath);
std::vector<std::string> columnNames;
std::vector<LogicalTypeID> columnTypes;
std::vector<LogicalType> columnTypes;
auto query = bindQuery(*copyToStatement.getRegularQuery());
auto columns = query->getStatementResult()->getColumns();
for (auto& column : columns) {
auto columnName = column->hasAlias() ? column->getAlias() : column->toString();
columnNames.push_back(columnName);
columnTypes.push_back(column->getDataType().getLogicalTypeID());
columnTypes.push_back(column->getDataType());
// auto d = column->getDataType();
// auto e = VarListType::getChildType(&d);
// auto f = "ok";
}
if (fileType != CopyDescription::FileType::CSV &&
fileType != CopyDescription::FileType::PARQUET) {
Expand Down
2 changes: 1 addition & 1 deletion src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CopyDescription::CopyDescription(
// Copy To
CopyDescription::CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalTypeID>& columnTypes)
const std::vector<common::LogicalType>& columnTypes)
: filePaths{filePaths}, fileType{fileType}, columnNames{columnNames}, columnTypes{columnTypes} {
}

Expand Down
4 changes: 2 additions & 2 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct CopyDescription {
// Copy To
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalTypeID>& columnTypes);
const std::vector<common::LogicalType>& columnTypes);

CopyDescription(const CopyDescription& copyDescription);

Expand All @@ -54,7 +54,7 @@ struct CopyDescription {

const std::vector<std::string> filePaths;
const std::vector<std::string> columnNames;
const std::vector<common::LogicalTypeID> columnTypes;
const std::vector<common::LogicalType> columnTypes;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
};
Expand Down
8 changes: 4 additions & 4 deletions src/include/processor/operator/copy_to/csv_parquet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ namespace processor {
class CSVParquetWriter {
public:
virtual ~CSVParquetWriter(){};
virtual void open(const std::string& filePath) = 0;
virtual void open(const std::string& filePath) = 0; // rename
virtual void init() = 0;
virtual void closeFile() = 0;
virtual void writeValues(std::vector<common::ValueVector*>& outputVectors) = 0;

// TODO (Rui): decide between setters/getters and direct member access
inline void setColumns(const std::vector<std::string>& columnNames,
const std::vector<common::LogicalTypeID>& columnTypes) {
const std::vector<common::LogicalType>& columnTypes) {
this->columnNames = columnNames;
this->columnTypes = columnTypes;
}
inline std::vector<std::string>& getColumnNames() { return columnNames; }
inline std::vector<common::LogicalTypeID>& getColumnTypes() { return columnTypes; }
inline std::vector<common::LogicalType>& getColumnTypes() { return columnTypes; }

private:
std::vector<std::string> columnNames;
std::vector<common::LogicalTypeID> columnTypes;
std::vector<common::LogicalType> columnTypes;
};

} // namespace processor
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/copy_to/parquet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ class ParquetWriter : public CSVParquetWriter {

private:
static std::shared_ptr<parquet::schema::Node> kuzuTypeToParquetType(
std::string& columnName, common::LogicalTypeID& typeID, int length = -1);
std::string& columnName, const common::LogicalType& logicalType, int length = -1);

void streamWrite(common::ValueVector* vector);
void* getValueToWrite(common::ValueVector* valueVector, uint32_t selPos);
void writeValue(common::LogicalTypeID type, void* value);

parquet::StreamWriter streamWriter;
std::shared_ptr<arrow::io::FileOutputStream> outputStream;
std::shared_ptr<arrow::io::FileOutputStream> outFile;
std::shared_ptr<parquet::ParquetFileWriter> fileWriter;
Expand Down
Loading

0 comments on commit 9d4c839

Please sign in to comment.