Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement parquet-reader #2076

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ include_directories(third_party/pybind11/include)
include_directories(third_party/re2/include)
include_directories(third_party/concurrentqueue)
include_directories(third_party/serd/include)
include_directories(third_party/miniparquet/src)

add_subdirectory(third_party)
add_subdirectory(src)
Expand Down
2 changes: 1 addition & 1 deletion dataset/copy-test/node/parquet/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, listOfInt64 INT64[], listOfString STRING[], listOfListOfInt64 INT64[][], fixedSizeList INT64[3], listOfFixedSizeList INT64[3][], structColumn STRUCT(ID int64, name STRING), PRIMARY KEY (id));
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, stringColumn STRING, listOfInt64 INT64[], listOfString STRING[], listOfListOfInt64 INT64[][], structColumn STRUCT(ID int64, name STRING), PRIMARY KEY (id));
Binary file modified dataset/copy-test/node/parquet/types_50k_0.parquet
Binary file not shown.
Binary file modified dataset/copy-test/node/parquet/types_50k_1.parquet
Binary file not shown.
Binary file modified dataset/copy-test/node/parquet/types_50k_2.parquet
Binary file not shown.
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor miniparquet)
target_include_directories(kuzu
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})
Expand All @@ -24,6 +24,6 @@ else()
set_target_properties(kuzu_shared PROPERTIES OUTPUT_NAME kuzu)
endif()
target_link_libraries(kuzu_shared
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor miniparquet)
target_include_directories(kuzu_shared
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
4 changes: 2 additions & 2 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ expression_vector Binder::bindExpectedNodeFileColumns(
columns.push_back(createVariable(columnName, stringType));
}
} break;
case FileType::PARQUET:
case FileType::CSV: {
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
Expand All @@ -207,8 +208,7 @@ expression_vector Binder::bindExpectedNodeFileColumns(
columns.push_back(createVariable(property->getName(), *property->getDataType()));
}
} break;
case FileType::NPY:
case FileType::PARQUET: {
case FileType::NPY: {
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "parser/query/reading_clause/in_query_call_clause.h"
#include "parser/query/reading_clause/load_from.h"
#include "parser/query/reading_clause/unwind_clause.h"
#include "processor/operator/persistent/reader/csv_reader.h"
#include "processor/operator/persistent/reader/csv/csv_reader.h"

using namespace kuzu::common;
using namespace kuzu::parser;
Expand Down
17 changes: 17 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,23 @@ void ListVector::copyFromVectorData(ValueVector* dstVector, uint8_t* dstData,
}
}

void ListVector::appendDataVector(kuzu::common::ValueVector* dstVector,
kuzu::common::ValueVector* srcDataVector, uint64_t numValuesToAppend) {
auto offset = getDataVectorSize(dstVector);
resizeDataVector(dstVector, offset + numValuesToAppend);
auto dstDataVector = getDataVector(dstVector);
for (auto i = 0u; i < numValuesToAppend; i++) {
dstDataVector->copyFromVectorData(offset + i, srcDataVector, i);
}
}

void ListVector::sliceDataVector(
ValueVector* vectorToSlice, uint64_t childIdx, uint64_t numValues) {
for (auto i = 0u; i < numValues - childIdx; i++) {
vectorToSlice->copyFromVectorData(i, vectorToSlice, i + childIdx);
}
}

void StructVector::copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData) {
assert(vector->dataType.getPhysicalType() == PhysicalTypeID::STRUCT);
auto& structFields = getFieldVectors(vector);
Expand Down
10 changes: 10 additions & 0 deletions src/include/common/types/ku_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ struct ku_string_t {
value.len);
}

void setFromRawStr(const char* value, uint64_t length) {
this->len = length;
if (isShortString(length)) {
setShortString(value, length);
} else {
memcpy(prefix, value, PREFIX_LENGTH);
overflowPtr = reinterpret_cast<uint64_t>(value);
}
}

std::string getAsShortString() const;
std::string getAsString() const;

Expand Down
2 changes: 2 additions & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ class VarListTypeInfo : public ExtraTypeInfo {
VarListTypeInfo() = default;
explicit VarListTypeInfo(std::unique_ptr<LogicalType> childType)
: childType{std::move(childType)} {}
explicit VarListTypeInfo(LogicalTypeID childTypeID)
: childType{std::make_unique<LogicalType>(childTypeID)} {}
inline LogicalType* getChildType() const { return childType.get(); }
bool operator==(const VarListTypeInfo& other) const;
std::unique_ptr<ExtraTypeInfo> copy() const override;
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ class ListVector {
InMemOverflowBuffer* rowOverflowBuffer);
static void copyFromVectorData(ValueVector* dstVector, uint8_t* dstData,
const ValueVector* srcVector, const uint8_t* srcData);
static void appendDataVector(
ValueVector* dstVector, ValueVector* srcDataVector, uint64_t numValuesToAppend);
static void sliceDataVector(ValueVector* vectorToSlice, uint64_t childIdx, uint64_t numValues);
};

class StructVector {
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Reader : public PhysicalOperator {
read_rows_func_t readFunc;
init_reader_data_func_t initFunc;
std::shared_ptr<ReaderFunctionData> readFuncData;
storage::MemoryManager* memoryManager;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include "column_reader.h"
#include "templated_column_reader.h"

namespace kuzu {
namespace processor {

struct BooleanParquetValueConversion;

class BooleanColumnReader : public TemplatedColumnReader<bool, BooleanParquetValueConversion> {
public:
static constexpr const common::PhysicalTypeID TYPE = common::PhysicalTypeID::BOOL;

public:
BooleanColumnReader(ParquetReader& reader, std::unique_ptr<common::LogicalType> type,
const kuzu_parquet::format::SchemaElement& schema, uint64_t schemaIdx, uint64_t maxDefine,
uint64_t maxRepeat)
: TemplatedColumnReader<bool, BooleanParquetValueConversion>(
reader, std::move(type), schema, schemaIdx, maxDefine, maxRepeat),
bytePos(0){};

uint8_t bytePos;

void initializeRead(uint64_t rowGroupIdx,
const std::vector<kuzu_parquet::format::ColumnChunk>& columns,
kuzu_apache::thrift::protocol::TProtocol& protocol) override;

inline void resetPage() override { bytePos = 0; }
};

struct BooleanParquetValueConversion {
static bool dictRead(ByteBuffer& dict, uint32_t& offset, ColumnReader& reader) {
throw common::CopyException{"Dicts for booleans make no sense"};

Check warning on line 34 in src/include/processor/operator/persistent/reader/parquet/boolean_column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/boolean_column_reader.h#L33-L34

Added lines #L33 - L34 were not covered by tests
}

static bool plainRead(ByteBuffer& plainData, ColumnReader& reader);

static inline void plainSkip(ByteBuffer& plainData, ColumnReader& reader) {
plainRead(plainData, reader);
}

Check warning on line 41 in src/include/processor/operator/persistent/reader/parquet/boolean_column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/boolean_column_reader.h#L40-L41

Added lines #L40 - L41 were not covered by tests
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include "column_reader.h"
#include "parquet_reader.h"
#include "templated_column_reader.h"

namespace kuzu {
namespace processor {

template<class PARQUET_PHYSICAL_TYPE, class KU_PHYSICAL_TYPE,
KU_PHYSICAL_TYPE (*FUNC)(const PARQUET_PHYSICAL_TYPE& input)>
class CallbackColumnReader
: public TemplatedColumnReader<KU_PHYSICAL_TYPE,
CallbackParquetValueConversion<PARQUET_PHYSICAL_TYPE, KU_PHYSICAL_TYPE, FUNC>> {
using BaseType = TemplatedColumnReader<KU_PHYSICAL_TYPE,
CallbackParquetValueConversion<PARQUET_PHYSICAL_TYPE, KU_PHYSICAL_TYPE, FUNC>>;

public:
static constexpr const common::PhysicalTypeID TYPE = common::PhysicalTypeID::ANY;

public:
CallbackColumnReader(ParquetReader& reader, std::unique_ptr<common::LogicalType> type_p,
const kuzu_parquet::format::SchemaElement& schema_p, uint64_t file_idx_p,
uint64_t max_define_p, uint64_t max_repeat_p)
: TemplatedColumnReader<KU_PHYSICAL_TYPE,
CallbackParquetValueConversion<PARQUET_PHYSICAL_TYPE, KU_PHYSICAL_TYPE, FUNC>>(
reader, std::move(type_p), schema_p, file_idx_p, max_define_p, max_repeat_p) {}

protected:
void dictionary(std::shared_ptr<ResizeableBuffer> dictionaryData, uint64_t numEntries) {
BaseType::allocateDict(numEntries * sizeof(KU_PHYSICAL_TYPE));
auto dictPtr = (KU_PHYSICAL_TYPE*)this->dict->ptr;
for (auto i = 0u; i < numEntries; i++) {
dictPtr[i] = FUNC(dictionaryData->read<PARQUET_PHYSICAL_TYPE>());
}
}
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#pragma once

#include <bitset>

#include "common/constants.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "parquet/parquet_types.h"
#include "parquet_dbp_decoder.h"
#include "parquet_rle_bp_decoder.h"
#include "resizable_buffer.h"
#include "thrift_tools.h"

namespace kuzu {
namespace processor {
class ParquetReader;

typedef std::bitset<common::DEFAULT_VECTOR_CAPACITY> parquet_filter_t;

class ColumnReader {
public:
ColumnReader(ParquetReader& reader, std::unique_ptr<common::LogicalType> type,
const kuzu_parquet::format::SchemaElement& schema, uint64_t fileIdx, uint64_t maxDefinition,
uint64_t maxRepeat);
virtual ~ColumnReader() = default;
inline common::LogicalType* getDataType() const { return type.get(); }
inline bool hasDefines() { return maxDefine > 0; }
inline bool hasRepeats() { return maxRepeat > 0; }
virtual inline void skip(uint64_t numValues) { pendingSkips += numValues; }
virtual inline void dictionary(std::shared_ptr<ResizeableBuffer> data, uint64_t num_entries) {
throw common::NotImplementedException{"Dictionary"};

Check warning on line 31 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L29-L31

Added lines #L29 - L31 were not covered by tests
}
virtual inline void offsets(uint32_t* offsets, uint8_t* defines, uint64_t numValues,

Check warning on line 33 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L33

Added line #L33 was not covered by tests
parquet_filter_t& filter, uint64_t resultOffset, common::ValueVector* result) {
throw common::NotImplementedException{"ColumnReader::offsets"};

Check warning on line 35 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L35

Added line #L35 was not covered by tests
}
virtual inline void plain(std::shared_ptr<ByteBuffer> plainData, uint8_t* defines,

Check warning on line 37 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L37

Added line #L37 was not covered by tests
uint64_t numValues, parquet_filter_t& filter, uint64_t resultOffset,
common::ValueVector* result) {
throw common::NotImplementedException{"ColumnReader::plain"};

Check warning on line 40 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L40

Added line #L40 was not covered by tests
}
virtual inline void resetPage() {}
virtual inline uint64_t getGroupRowsAvailable() { return groupRowsAvailable; }
virtual void initializeRead(uint64_t rowGroupIdx,
const std::vector<kuzu_parquet::format::ColumnChunk>& columns,
kuzu_apache::thrift::protocol::TProtocol& protocol);
virtual uint64_t getTotalCompressedSize();
virtual void registerPrefetch(ThriftFileTransport& transport, bool allowMerge);
virtual uint64_t fileOffset() const;
virtual void applyPendingSkips(uint64_t numValues);
virtual uint64_t read(uint64_t numValues, parquet_filter_t& filter, uint8_t* defineOut,
uint8_t* repeatOut, common::ValueVector* resultOut);
static std::unique_ptr<ColumnReader> createReader(ParquetReader& reader,
std::unique_ptr<common::LogicalType> type,
const kuzu_parquet::format::SchemaElement& schema, uint64_t fileIdx, uint64_t maxDefine,
uint64_t maxRepeat);
void prepareRead(parquet_filter_t& filter);
void allocateBlock(uint64_t size);
void allocateCompressed(uint64_t size);
void decompressInternal(kuzu_parquet::format::CompressionCodec::type codec, const uint8_t* src,
uint64_t srcSize, uint8_t* dst, uint64_t dstSize);
void preparePageV2(kuzu_parquet::format::PageHeader& pageHdr);
void preparePage(kuzu_parquet::format::PageHeader& pageHdr);
void prepareDataPage(kuzu_parquet::format::PageHeader& pageHdr);
template<class VALUE_TYPE, class CONVERSION>
void plainTemplated(std::shared_ptr<ByteBuffer> plainData, uint8_t* defines, uint64_t numValues,
parquet_filter_t& filter, uint64_t resultOffset, common::ValueVector* result) {
for (auto i = 0u; i < numValues; i++) {
if (hasDefines() && defines[i + resultOffset] != maxDefine) {
result->setNull(i + resultOffset, true);
continue;

Check warning on line 71 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L70-L71

Added lines #L70 - L71 were not covered by tests
}
if (filter[i + resultOffset]) {
VALUE_TYPE val = CONVERSION::plainRead(*plainData, *this);
result->setValue(i + resultOffset, val);
} else { // there is still some data there that we have to skip over
CONVERSION::plainSkip(*plainData, *this);

Check warning on line 77 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L77

Added line #L77 was not covered by tests
}
}
}

protected:
const kuzu_parquet::format::SchemaElement& schema;

uint64_t fileIdx;
uint64_t maxDefine;
uint64_t maxRepeat;

ParquetReader& reader;
std::unique_ptr<common::LogicalType> type;

uint64_t pendingSkips = 0;

const kuzu_parquet::format::ColumnChunk* chunk = nullptr;

kuzu_apache::thrift::protocol::TProtocol* protocol;
uint64_t pageRowsAvailable;
uint64_t groupRowsAvailable;
uint64_t chunkReadOffset;

std::shared_ptr<ResizeableBuffer> block;

ResizeableBuffer compressedBuffer;
ResizeableBuffer offsetBuffer;

std::unique_ptr<RleBpDecoder> dictDecoder;
std::unique_ptr<RleBpDecoder> defineDecoder;
std::unique_ptr<RleBpDecoder> repeatedDecoder;
std::unique_ptr<DbpDecoder> dbpDecoder;
std::unique_ptr<RleBpDecoder> rleDecoder;

// dummies for Skip()
parquet_filter_t noneFilter;
ResizeableBuffer dummyDefine;
ResizeableBuffer dummyRepeat;
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#pragma once

#include "common/exception/copy.h"
#include "common/string_utils.h"
#include "resizable_buffer.h"

namespace kuzu {
namespace processor {
class ParquetDecodeUtils {

public:
template<class T>
static T ZigzagToInt(const T n) {
return (n >> 1) ^ -(n & 1);

Check warning on line 14 in src/include/processor/operator/persistent/reader/parquet/decode_utils.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/decode_utils.h#L14

Added line #L14 was not covered by tests
}

static const uint64_t BITPACK_MASKS[];
static const uint64_t BITPACK_MASKS_SIZE;
static const uint8_t BITPACK_DLEN;

template<typename T>
static uint32_t BitUnpack(
ByteBuffer& buffer, uint8_t& bitpack_pos, T* dest, uint32_t count, uint8_t width) {
if (width >= ParquetDecodeUtils::BITPACK_MASKS_SIZE) {
throw common::CopyException(common::StringUtils::string_format(

Check warning on line 25 in src/include/processor/operator/persistent/reader/parquet/decode_utils.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/decode_utils.h#L25

Added line #L25 was not covered by tests
"The width ({}) of the bitpacked data exceeds the supported max width ({}), "
"the file might be corrupted.",
width, ParquetDecodeUtils::BITPACK_MASKS_SIZE));
}
auto mask = BITPACK_MASKS[width];

for (uint32_t i = 0; i < count; i++) {
T val = (buffer.get<uint8_t>() >> bitpack_pos) & mask;
bitpack_pos += width;
while (bitpack_pos > BITPACK_DLEN) {
buffer.inc(1);
val |= (T(buffer.get<uint8_t>()) << T(BITPACK_DLEN - (bitpack_pos - width))) & mask;
bitpack_pos -= BITPACK_DLEN;
}
dest[i] = val;
}
return count;
}

template<class T>
static T VarintDecode(ByteBuffer& buf) {
T result = 0;
uint8_t shift = 0;
while (true) {
auto byte = buf.read<uint8_t>();
result |= T(byte & 127) << shift;
if ((byte & 128) == 0) {
break;
}
shift += 7;
if (shift > sizeof(T) * 8) {
throw std::runtime_error("Varint-decoding found too large number");

Check warning on line 57 in src/include/processor/operator/persistent/reader/parquet/decode_utils.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/decode_utils.h#L57

Added line #L57 was not covered by tests
}
}
return result;
}
};

} // namespace processor
} // namespace kuzu
Loading