Skip to content

Commit

Permalink
VAR_LIST with INT64 type
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Aug 20, 2023
1 parent aa2058c commit bc3809d
Show file tree
Hide file tree
Showing 8 changed files with 935 additions and 343 deletions.
20 changes: 19 additions & 1 deletion examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,25 @@ int main() {
connection->query("COPY marries FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMarries.csv';");


connection->query("COPY (MATCH (p:person) RETURN p.age) TO 'out.parquet';");
connection->query("COPY (MATCH (p:person) RETURN p.courseScoresPerTerm) TO 'out.parquet';");

// connection->query("COPY (RETURN {first: {f: 44}, second: {s: 38}}) TO 'out.parquet';");
// connection->query("COPY (RETURN {first:[[7],[5]], second:[[4],[3],[2]]}) TO 'out.parquet';");

// connection->query("COPY (RETURN [[[1,2],[3,4]]]) TO 'out.parquet';");
// connection->query("COPY (RETURN [[1,2],[3,4],[5,6,7,8]]) TO 'out.parquet';");
// connection->query("COPY (RETURN [1,2,3,4]) TO 'out.parquet';");

// connection->query("COPY (RETURN [ 1,2,3,4]) TO 'out.parquet';");

// connection->query("COPY (RETURN {first:[1,2,3], second:[4,5,6,7]}) TO 'out.parquet';");


// connection->query("COPY (RETURN {first: {f: 'please'}, second: {s: 'work'}}) TO 'out.parquet';");

// connection->query("COPY (MATCH (p:person) RETURN p.fName, p.gender, p.isStudent, p.age, p.birthdate, p.registerTime) TO 'out.parquet';");
// connection->query("COPY (RETURN {first: {f: 'please'}, second: {s: 'work'}}) TO 'out.parquet';");
// connection->query("COPY (RETURN {first: [[44]], second: [[12]]}) TO 'out.parquet';");
// connection->query("COPY (RETURN {first:[['a'],['b']], second:[['c'],['d'],['e']]}) TO 'out.parquet';");

// connection->query("COPY (MATCH (p:person) RETURN p.workedHours) TO 'dates.parquet';");
Expand Down
6 changes: 3 additions & 3 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
auto columnName = column->hasAlias() ? column->getAlias() : column->toString();
columnNames.push_back(columnName);
columnTypes.push_back(column->getDataType());
// auto d = column->getDataType();
// auto e = VarListType::getChildType(&d);
// auto f = "ok";
// auto d = column->getDataType();
// auto e = VarListType::getChildType(&d);
// auto f = "ok";
}
if (fileType != CopyDescription::FileType::CSV &&
fileType != CopyDescription::FileType::PARQUET) {
Expand Down
12 changes: 12 additions & 0 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,18 @@ bool LogicalTypeUtils::isNumerical(const LogicalType& dataType) {
}
}

// TODO (Rui): review the datatypes
// Reports whether `dataType` is treated as primitive: every type ID other than the
// nested ones (STRUCT, VAR_LIST, FIXED_LIST) counts as primitive.
bool LogicalTypeUtils::isPrimitive(const LogicalType& dataType) {
    const auto id = dataType.typeID;
    const bool isNested = id == LogicalTypeID::STRUCT || id == LogicalTypeID::VAR_LIST ||
                          id == LogicalTypeID::FIXED_LIST;
    return !isNested;
}

std::vector<LogicalType> LogicalTypeUtils::getAllValidComparableLogicalTypes() {
return std::vector<LogicalType>{LogicalType{LogicalTypeID::BOOL},
LogicalType{LogicalTypeID::INT64}, LogicalType{LogicalTypeID::INT32},
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ class LogicalType {
static std::vector<std::unique_ptr<LogicalType>> copy(
const std::vector<std::unique_ptr<LogicalType>>& types);

// NOTE(review): no definition of this function is visible in this change; the primitive
// check that is implemented is LogicalTypeUtils::isPrimitive(const LogicalType&). Confirm
// whether this declaration is still needed. Also note that `const` on a by-value return
// type has no effect.
static const bool isPrimitiveDataType();

private:
void setPhysicalType();

Expand Down Expand Up @@ -421,6 +423,7 @@ class LogicalTypeUtils {
KUZU_API static LogicalType dataTypeFromString(const std::string& dataTypeString);
static uint32_t getRowLayoutSize(const LogicalType& logicalType);
static bool isNumerical(const LogicalType& dataType);
static bool isPrimitive(const LogicalType& dataType);
static std::vector<LogicalType> getAllValidComparableLogicalTypes();
static std::vector<LogicalTypeID> getNumericalLogicalTypeIDs();
static std::vector<LogicalType> getAllValidLogicTypes();
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class ValueVector {

void resetAuxiliaryBuffer();

// Whether this vector's data type is primitive, as decided by LogicalTypeUtils::isPrimitive.
// NOTE(review): this only reads `dataType`, so it could be const-qualified — confirm and
// add `const` so it is callable through a const ValueVector.
inline bool isPrimitiveDataType() {
    const bool primitive = LogicalTypeUtils::isPrimitive(dataType);
    return primitive;
}

// If there are still non-null values after discarding, return true. Otherwise, return false.
// For an unflat vector, its selection vector is also updated to the resultSelVector.
static bool discardNull(ValueVector& vector);
Expand Down
64 changes: 55 additions & 9 deletions src/include/processor/operator/copy_to/parquet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <arrow/io/api.h>
#include <parquet/arrow/writer.h>


namespace kuzu {
namespace processor {

Expand All @@ -20,37 +19,84 @@ class ParquetWriter : public CSVParquetWriter {
void writeValues(std::vector<common::ValueVector*>& outputVectors) override;

private:
static std::shared_ptr<parquet::schema::Node> kuzuTypeToParquetType(
std::string& columnName, const common::LogicalType& logicalType, int length = -1);
// struct ParquetValue {
// common::LogicalTypeID logicalTypeID;
// int16_t definitionLevel;
// int16_t repetitionLevel;
// uint8_t* value;
// };
//
static std::shared_ptr<parquet::schema::Node> kuzuTypeToParquetType(std::string& columnName,
const common::LogicalType& logicalType,
parquet::Repetition::type repetition = parquet::Repetition::REQUIRED, int length = -1);

void* getValueToWrite(common::ValueVector* valueVector, uint32_t selPos);
void writeValue(common::LogicalTypeID type, void* value);

void flush();


void generateSchema();
std::shared_ptr<arrow::Table> generateTable();
void writeParquetFile(const arrow::Table& table);
std::shared_ptr<arrow::Schema> schema;

std::shared_ptr<arrow::DataType> getParquetType(const common::LogicalType& logicalType);
std::shared_ptr<arrow::Array> buildParquetArray(common::ValueVector* vector);

std::vector<std::shared_ptr<arrow::Array>> data;
std::vector<std::shared_ptr<arrow::ArrayBuilder>> builders;

std::unique_ptr<parquet::arrow::FileWriter> fileWriter;
std::shared_ptr<parquet::ParquetFileWriter> fileWriter;
parquet::RowGroupWriter* rowWriter;

std::shared_ptr<arrow::io::FileOutputStream> outFile;

std::shared_ptr<arrow::ArrayBuilder> getBuilder(const common::LogicalType& logicalType);

void pushToBuilder(std::shared_ptr<arrow::ArrayBuilder>& builder, common::ValueVector* vector);
// void writeBatch(ParquetValue& parquetValue);

void writeParquetBatch(common::ValueVector* vector);

// void createBatch(std::vector<ParquetValue>& parquetValues, uint16_t& outSize,
// std::vector<int16_t>& outDefinitionLevels, std::vector<int16_t>& outRepetitionLevels,
// std::vector<uint64_t>& outValues);
//
// void pushToBuilder(std::shared_ptr<arrow::ArrayBuilder>& builder, common::ValueVector*
// vector);

// TODO (Rui) : check what is necessary, lots of unused variables
// ParquetValue getParquetValue(
// common::ValueVector* vector, int16_t repetitionLevel = 0, int16_t definitionLevel = 0);
// std::vector<ParquetValue> getParquetValues(
// common::ValueVector* vector, int16_t repetitionLevel = 0, int16_t definitionLevel = 0);
//
std::shared_ptr<arrow::io::FileOutputStream> outputStream;
parquet::RowGroupWriter* rowWriter;

// template<typename T>
// void extractValues(const common::ValueVector* vector, common::list_entry_t list, std::vector<T>& result);


// Bundles the values of element type T collected for a write together with their
// repetition and definition levels.
// NOTE(review): presumably these are the Parquet repetition/definition levels consumed by
// the column writer — confirm against the writer implementation.
template<class T>
struct ParquetBatch {
    // Collected values of the column's element type.
    std::vector<T> values;
    // Repetition levels stored as 16-bit integers.
    std::vector<int16_t> repetitionLevels;
    // Definition levels stored as 16-bit integers.
    std::vector<int16_t> definitionLevels;
};

template<typename T>
void castValueToVector(const common::LogicalType& dataType, uint8_t* value, common::ValueVector* vector, ParquetBatch<T>& parquetBatch, int currentElementIndex=0, int parentElementIndex=0, int depth=0);

template<typename T>
void extractList(const common::list_entry_t list, const common::ValueVector* vector, ParquetBatch<T>& parquetBatch, int currentElementIndex=0, int parentElementIndex=0, int depth=0);

template<typename T>
void extractStruct(const common::ValueVector* vector, ParquetBatch<T>& parquetBatch);


int getRepetitionLevel(int currentElementIndex, int parentElementIndex, int depth);

// std::vector<int64_t> getValues(common::ValueVector* vector, common::list_entry_t list);

// std::shared_ptr<arrow::Array> buildParquetArray(common::ValueVector* vector);
// std::shared_ptr<arrow::Table> generateTable();
};

} // namespace processor
Expand Down
16 changes: 2 additions & 14 deletions src/processor/map/map_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,14 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyTo(LogicalOperator* logical
}
auto sharedState =
std::make_shared<CSVParquetWriterSharedState>(copy->getCopyDescription().fileType);
// return std::make_unique<CopyTo>(std::make_unique<ResultSetDescriptor>(childSchema),
// sharedState,
// std::move(vectorsToCopyPos), copy->getCopyDescription(), getOperatorID(),
// copy->getExpressionsForPrinting(), std::move(prevOperator));

auto copyTo = std::make_unique<CopyTo>(std::make_unique<ResultSetDescriptor>(childSchema),
sharedState, std::move(vectorsToCopyPos), copy->getCopyDescription(), getOperatorID(),
copy->getExpressionsForPrinting(), std::move(prevOperator));

std::shared_ptr<FactorizedTable> fTable;

auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
// ftTableSchema->appendColumn(
// std::make_unique<ColumnSchema>(false /* flat */, 0 /* dataChunkPos */,
// common::LogicalTypeUtils::getRowLayoutSize(common::LogicalType{common::LogicalTypeID::STRING})));

fTable = std::make_shared<FactorizedTable>(memoryManager, std::move(ftTableSchema));

return createFactorizedTableScan(
binder::expression_vector{}, childSchema, fTable, std::move(copyTo));
return createFactorizedTableScan(binder::expression_vector{}, std::vector<ft_col_idx_t>{},
childSchema, fTable, 0, std::move(copyTo));
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNode(
Expand Down
Loading

0 comments on commit bc3809d

Please sign in to comment.