Skip to content

Commit

Permalink
Primitive datatypes support
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Aug 11, 2023
1 parent c6d83a5 commit 33a6ab4
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 152 deletions.
41 changes: 23 additions & 18 deletions examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,32 @@ int main() {
auto database = std::make_unique<Database>("puffer_fish" /* fill db path */);
auto connection = std::make_unique<Connection>(database.get());

connection->query("create node table person (ID INt64, fName StRING, gender INT64, isStudent BoOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration interval, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, PRIMARY KEY (ID));");
connection->query("create node table organisation (ID INT64, name STRING, orgCode INT64, mark DOUBLE, score INT64, history STRING, licenseValidInterval INTERVAL, rating DOUBLE, state STRUCT(revenue INT16, location STRING[], stock STRUCT(price INT64[], volume INT64)), PRIMARY KEY (ID));");
connection->query("create node table movies (name STRING, length INT32, note STRING, description STRUCT(rating DOUBLE, views INT64, release TIMESTAMP, film DATE), content BYTEA, PRIMARY KEY (name));");
connection->query("create rel table knows (FROM person TO person, date DATE, meetTime TIMESTAMP, validInterval INTERVAL, comments STRING[], MANY_MANY);");
connection->query("create rel table studyAt (FROM person TO organisation, year INT64, places STRING[], length INT16,MANY_ONE);");
connection->query("create rel table workAt (FROM person TO organisation, year INT64, grading DOUBLE[2], rating float, MANY_ONE);");
connection->query("create rel table meets (FROM person TO person, location FLOAT[2], times INT, data BYTEA, MANY_ONE);");
connection->query("create rel table marries (FROM person TO person, usedAddress STRING[], address INT16[2], note STRING, ONE_ONE);");
// connection->query("create node table person (ID INt64, fName StRING, gender INT64, isStudent BoOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration interval, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, PRIMARY KEY (ID));");
// connection->query("create node table organisation (ID INT64, name STRING, orgCode INT64, mark DOUBLE, score INT64, history STRING, licenseValidInterval INTERVAL, rating DOUBLE, state STRUCT(revenue INT16, location STRING[], stock STRUCT(price INT64[], volume INT64)), PRIMARY KEY (ID));");
// connection->query("create node table movies (name STRING, length INT32, note STRING, description STRUCT(rating DOUBLE, views INT64, release TIMESTAMP, film DATE), content BYTEA, PRIMARY KEY (name));");
// connection->query("create rel table knows (FROM person TO person, date DATE, meetTime TIMESTAMP, validInterval INTERVAL, comments STRING[], MANY_MANY);");
// connection->query("create rel table studyAt (FROM person TO organisation, year INT64, places STRING[], length INT16,MANY_ONE);");
// connection->query("create rel table workAt (FROM person TO organisation, year INT64, grading DOUBLE[2], rating float, MANY_ONE);");
// connection->query("create rel table meets (FROM person TO person, location FLOAT[2], times INT, data BYTEA, MANY_ONE);");
// connection->query("create rel table marries (FROM person TO person, usedAddress STRING[], address INT16[2], note STRING, ONE_ONE);");
//
// connection->query("COPY person FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vPerson.csv' (HEADER=true, DELIM=',');");
// connection->query("COPY organisation FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vOrganisation.csv';");
// connection->query("COPY movies FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vMovies.csv';");
// connection->query("COPY knows FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eKnows.csv';");
// connection->query("COPY studyAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eStudyAt.csv' (HEADER=true);");
// connection->query("COPY workAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eWorkAt.csv';");
// connection->query("COPY meets FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMeets.csv';");
// connection->query("COPY marries FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMarries.csv';");

connection->query("COPY person FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vPerson.csv' (HEADER=true, DELIM=',');");
connection->query("COPY organisation FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vOrganisation.csv';");
connection->query("COPY movies FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vMovies.csv';");
connection->query("COPY knows FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eKnows.csv';");
connection->query("COPY studyAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eStudyAt.csv' (HEADER=true);");
connection->query("COPY workAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eWorkAt.csv';");
connection->query("COPY meets FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMeets.csv';");
connection->query("COPY marries FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMarries.csv';");

// connection->query("COPY (MATCH (a:person)-[e]->(b:person) RETURN a.ID, a.birthdate, a.registerTime, b.ID) TO 'out.parquet';");
connection->query("COPY (MATCH (p:person) RETURN p.ID, p.birthdate, p.registerTime) TO 'dates.parquet';");

// connection->query("COPY (MATCH (b:person) RETURN b.courseScoresPerTerm) TO 'out.parquet';");
// connection->query("COPY (MATCH (b:person) RETURN b.workedHours) TO 'out.parquet';");
// connection->query("COPY (MATCH (a:person)-[e]->(b:person) RETURN a.ID, a.birthdate, a.registerTime, b.ID) TO 'out.parquet';");
// connection->query("COPY (MATCH (b:person) RETURN b.isStudent) TO 'out.csv';");
// connection->query("COPY (MATCH (a:person)-[e]->(b:person) RETURN ID(e), a.ID, b.ID) TO 'a.csv';");
// connection->query("COPY (MATCH (a:person) RETURN a.fName, a.workedHours) TO 'a.csv';");
connection->query("COPY (MATCH (a:person) RETURN a.fName, a.workedHours) TO 'a.csv';");
// connection->query("COPY (MATCH (a:person) RETURN a.fName, a.workedHours) TO 'a.csv';");
}
3 changes: 1 addition & 2 deletions src/include/processor/mapper/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "common/statement_type.h"
#include "planner/logical_plan/logical_plan.h"
#include "processor/mapper/expression_mapper.h"
#include "processor/operator/copy_to/copy_to.h"
#include "processor/operator/result_collector.h"
#include "processor/physical_plan.h"
#include "storage/storage_manager.h"
Expand Down Expand Up @@ -66,6 +65,7 @@ class PlanMapper {
std::unique_ptr<PhysicalOperator> mapCreateNodeTable(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCreateRelTable(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCopyFrom(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCopyTo(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCopyNode(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCopyRel(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapDropTable(planner::LogicalOperator* logicalOperator);
Expand All @@ -78,7 +78,6 @@ class PlanMapper {
std::unique_ptr<PhysicalOperator> mapExplain(planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapCreateMacro(planner::LogicalOperator* logicalOperator);

std::unique_ptr<CopyTo> mapCopyTo(planner::LogicalOperator* logicalOperator);
std::unique_ptr<ResultCollector> createResultCollector(common::AccumulateType accumulateType,
const binder::expression_vector& expressions, planner::Schema* schema,
std::unique_ptr<PhysicalOperator> prevOperator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class CSVParquetWriter {
virtual ~CSVParquetWriter(){};
virtual void open(const std::string& filePath) = 0;
virtual void init() = 0;
virtual void closeFile() = 0;
virtual void writeValues(std::vector<common::ValueVector*>& outputVectors) = 0;

// TODO (Rui): check setters and getters vs directly access
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/copy_to/csv_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class CSVWriter : public CSVParquetWriter {
CSVWriter(){};
void open(const std::string& filePath) override;
void init() override;
inline void closeFile() override { flush(); }
void writeValues(std::vector<common::ValueVector*>& outputVectors) override;

private:
Expand Down
8 changes: 3 additions & 5 deletions src/include/processor/operator/copy_to/parquet_writer.h
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
#pragma once

// TODO (Rui): check what is needed here
#include "arrow/io/file.h"
#include "common/arrow/arrow_converter.h"
#include "common/copier_config/copier_config.h"
#include "parquet/stream_writer.h"
#include "processor/operator/copy_to/csv_parquet_writer.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class ParquetWriter : public CSVParquetWriter {
public:
ParquetWriter(){};
// TODO (Rui): rename to openFile to not confuse with open syscall
void open(const std::string& filePath) override;
void init() override;
void closeFile() override;
void writeValues(std::vector<common::ValueVector*>& outputVectors) override;

private:
Expand All @@ -26,9 +24,9 @@ class ParquetWriter : public CSVParquetWriter {

parquet::StreamWriter streamWriter;
std::shared_ptr<arrow::io::FileOutputStream> outputStream;

std::shared_ptr<arrow::io::FileOutputStream> outFile;
std::shared_ptr<parquet::ParquetFileWriter> fileWriter;
parquet::RowGroupWriter* rowWriter;
};

} // namespace processor
Expand Down
24 changes: 20 additions & 4 deletions src/processor/mapper/map_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyFrom(LogicalOperator* logic
throw common::NotImplementedException{"PlanMapper::mapCopy"};
}
}

std::unique_ptr<CopyTo> PlanMapper::mapCopyTo(LogicalOperator* logicalOperator) {
std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyTo(LogicalOperator* logicalOperator) {
auto copy = (LogicalCopyTo*)logicalOperator;
auto childSchema = logicalOperator->getChild(0)->getSchema();
auto prevOperator = mapOperator(logicalOperator->getChild(0).get());
Expand All @@ -39,9 +38,26 @@ std::unique_ptr<CopyTo> PlanMapper::mapCopyTo(LogicalOperator* logicalOperator)
}
auto sharedState =
std::make_shared<CSVParquetWriterSharedState>(copy->getCopyDescription().fileType);
return std::make_unique<CopyTo>(std::make_unique<ResultSetDescriptor>(childSchema), sharedState,
std::move(vectorsToCopyPos), copy->getCopyDescription(), getOperatorID(),
// return std::make_unique<CopyTo>(std::make_unique<ResultSetDescriptor>(childSchema),
// sharedState,
// std::move(vectorsToCopyPos), copy->getCopyDescription(), getOperatorID(),
// copy->getExpressionsForPrinting(), std::move(prevOperator));

auto copyTo = std::make_unique<CopyTo>(std::make_unique<ResultSetDescriptor>(childSchema),
sharedState, std::move(vectorsToCopyPos), copy->getCopyDescription(), getOperatorID(),
copy->getExpressionsForPrinting(), std::move(prevOperator));

std::shared_ptr<FactorizedTable> fTable;

auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
// ftTableSchema->appendColumn(
// std::make_unique<ColumnSchema>(false /* flat */, 0 /* dataChunkPos */,
// common::LogicalTypeUtils::getRowLayoutSize(common::LogicalType{common::LogicalTypeID::STRING})));

fTable = std::make_shared<FactorizedTable>(memoryManager, std::move(ftTableSchema));

return createFactorizedTableScan(
binder::expression_vector{}, childSchema, fTable, std::move(copyTo));
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNode(
Expand Down
1 change: 0 additions & 1 deletion src/processor/mapper/plan_mapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ std::unique_ptr<PhysicalOperator> PlanMapper::appendResultCollectorIfNotCopy(
// We have a special code path for executing copy rel and copy npy, so we don't need to append
// the resultCollector.
if (lastOperator->getOperatorType() != PhysicalOperatorType::COPY_REL &&
lastOperator->getOperatorType() != PhysicalOperatorType::COPY_TO &&
lastOperator->getOperatorType() != PhysicalOperatorType::COPY_NPY) {
lastOperator = createResultCollector(
common::AccumulateType::REGULAR, expressionsToCollect, schema, std::move(lastOperator));
Expand Down
4 changes: 1 addition & 3 deletions src/processor/operator/copy_to/copy_to.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,14 @@ void CopyTo::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont
}
}

// bool CopyTo::getNextTuplesInternal(ExecutionContext* context) {
void CopyTo::executeInternal(ExecutionContext* context) {
while (children[0]->getNextTuple(context)) {
std::cout << "CopyTo::executeInternal" << std::endl;
sharedState->getWriter()->writeValues(outputVectors);
}
}

void CopyTo::finalize(ExecutionContext* context) {
std::cout << "Finalized." << std::endl;
sharedState->getWriter()->closeFile();
}

} // namespace processor
Expand Down
Loading

0 comments on commit 33a6ab4

Please sign in to comment.