Skip to content

Commit

Permalink
Added datatypes to var list and struct
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Aug 31, 2023
1 parent f460fbe commit 8ff867d
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 283 deletions.
13 changes: 11 additions & 2 deletions examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,17 @@ int main() {
connection->query("COPY meets FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMeets.csv';");
connection->query("COPY marries FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMarries.csv';");

connection->query("COPY (MATCH (p:person) RETURN 1, p.eyeSight) TO 'out.parquet';");
// connection->query("COPY (return {a: {b: 1, c:2}, d: {b: 3, c:'asdasdd'}}) to 'out.parquet';");
// connection->query("copy (return [0,1,null,2]) to 'out.parquet';");

connection->query("COPY (RETURN {a: {b: 1, c: 2}, d: {b: 3, c: 4}}) TO 'out.parquet';");
// connection->query("COPY (return {b: [{c: [1,2], d: [3]}, {c: [4], d: [5,6]} ]}) to 'out.parquet';");

// connection->query("COPY (return {b: [{c: [1,2], d: [3]}, {c: [4], d: [5,6]} ], e: [{c: [7,8], d: [9]}] }) to 'out.parquet';");

// connection->query("COPY (return {b: [{c: [1,2], d: [3]}, {c: [4], d: [5,6]} ], e: [{c: [7,8], d: [9]}] }) to 'out.parquet';");
// connection->query("COPY (RETURN {a: {c: [1,3], d: [2,77,8]}, b: {c: [3], d: [4,9,0]}}) TO 'out.parquet';");
// connection->query("COPY (RETURN {a: {b: 1, c: 2}, d: {b: 3, c: 4}}) TO 'out.parquet';");
// connection->query("COPY (MATCH (p:person) RETURN p.fName, p.gender) TO 'out.parquet';");
// connection->query("COPY (MATCH (p:person) RETURN p.fName, p.gender, p.isStudent, p.age, p.birthdate, p.registerTime) TO 'out.parquet';");

Expand Down Expand Up @@ -56,7 +65,7 @@ int main() {


// ERROR
// connection->query("COPY (RETURN {a: [{b: 1}, {b: 2}], c: [{d: 3}]}) TO 'out.parquet';");
// connection->query("COPY (RETURN {a: [{b: 1}, {b: 2}], c: [{d: 3}]}) TO 'out.parquet';");
// connection->query("COPY (RETURN {a: {b: 1, c: 2}, b: {b: 3, c: 4}}) TO 'out.parquet';");


Expand Down
2 changes: 0 additions & 2 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,6 @@ class LogicalType {
static std::vector<std::unique_ptr<LogicalType>> copy(
const std::vector<std::unique_ptr<LogicalType>>& types);

static const bool isPrimitiveDataType();

private:
void setPhysicalType();

Expand Down
75 changes: 34 additions & 41 deletions src/include/processor/operator/copy_to/parquet_column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,70 +25,63 @@ class ParquetColumnWriter {
int kuzuColumn, common::ValueVector* vector, parquet::RowGroupWriter* rowWriter);

private:
// define the writers
// using BoolWriter = TypedColumnWriter<BooleanType>;
// using Int32Writer = TypedColumnWriter<Int32Type>;
// using Int64Writer = TypedColumnWriter<Int64Type>;
// using Int96Writer = TypedColumnWriter<Int96Type>;
// using FloatWriter = TypedColumnWriter<FloatType>;
// using DoubleWriter = TypedColumnWriter<DoubleType>;
// using ByteArrayWriter = TypedColumnWriter<ByteArrayType>;
// using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>;
parquet::Int64Writer* int64Writer;
parquet::ByteArrayWriter* byteArrayWriter;
parquet::BoolWriter* boolWriter;
parquet::RowGroupWriter* rowWriter;
parquet::Int32Writer* int32Writer;
parquet::DoubleWriter* doubleWriter;
parquet::FloatWriter* floatWriter;

void nextParquetColumn(common::LogicalTypeID logicalTypeID);
void writePrimitiveValue(common::LogicalTypeID logicalTypeID, uint8_t* value,
int16_t definitionLevel = 0, int16_t repetitionLevel = 0);

// ParquetBatch contains the information needed by low-level arrow API writeBatch:
// WriteBatch(int64_t num_values, const int16_t* def_levels,
// const int16_t* rep_levels, const T* values
template<typename T>
struct ParquetBatch {
std::vector<T> values;
std::vector<int16_t> repetitionLevels;
std::vector<int16_t> definitionLevels;
};

template<typename T>
struct ParquetColumn {
common::LogicalTypeID logicalTypeID;
std::vector<int16_t> repetitionLevels;
std::vector<int16_t> definitionLevels;
std::vector<T> values;
std::vector<uint8_t*> values;
};

inline void initNewColumn() {
currentParquetColumn = 0;
isListStarting = true;
}
inline void initNewColumn() { isListStarting = true; }

// template<typename WriterType, typename DataType>
// void writeParquetColumn(parquet::RowGroupWriter* rowWriter, ParquetColumn<DataType>& parquetColumn);
void castValueToVector(uint8_t* value, common::ValueVector* vector,
std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

void setParquetColumn(bool isParentAList);

// This is a wrapper to simplify the parquet writeBatch API
template<typename WriterType, typename DataType>
void writeBatch(parquet::RowGroupWriter* rowWriter, int64_t numValues, DataType* values,
const int16_t* definitionLevels = nullptr, const int16_t* repetitionLevels = nullptr);

template<typename T>
void castValueToVector(const common::LogicalType& dataType, uint8_t* value,
common::ValueVector* vector, std::unordered_map<int, ParquetBatch<T>>& parquetBatch,
int currentElementIndex = 0, int parentElementIndex = 0, int depth = 0,
bool isParentAList = false);

template<typename T>
void extractList(const common::list_entry_t& list, const common::ValueVector* vector,
std::unordered_map<int, ParquetBatch<T>>& parquetBatch, int currentElementIndex = 0,
int parentElementIndex = 0, int depth = 0);
std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

template<typename T>
void extractStruct(const common::struct_entry_t& val, const common::ValueVector* vector,
std::unordered_map<int, ParquetBatch<T>>& parquetBatch, int currentElementIndex,
int parentElementIndex, int depth);
std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

void extractStruct2(const common::struct_entry_t& val, const common::ValueVector* vector);

template<typename T>
T getValue(common::LogicalTypeID logicalTypeID, uint8_t* value);
void extractNested(uint8_t* value, const common::ValueVector* vector,
std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

// Extract dremel encoding levels
int getRepetitionLevel(int currentElementIndex, int parentElementIndex, int depth);
int getRepetitionLevel(int currentElementIdx, int parentElementIdx, int depth);

std::unordered_map<int, ParquetColumnDescriptor> columnDescriptors;

// Properties for nested lists and structs
bool isListStarting;

int currentParquetColumn;

int currentKuzuColumn;
};

Expand Down
Loading

0 comments on commit 8ff867d

Please sign in to comment.