Skip to content

Commit

Permalink
Implement parquet-reader
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Sep 25, 2023
1 parent da22d9e commit b43a63c
Show file tree
Hide file tree
Showing 100 changed files with 21,257 additions and 65 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ include_directories(third_party/pybind11/include)
include_directories(third_party/re2/include)
include_directories(third_party/concurrentqueue)
include_directories(third_party/serd/include)
include_directories(third_party/miniparquet/src)

add_subdirectory(third_party)
add_subdirectory(src)
Expand Down
2 changes: 1 addition & 1 deletion dataset/copy-test/node/parquet/schema.cypher
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, listOfInt64 INT64[], listOfString STRING[], listOfListOfInt64 INT64[][], fixedSizeList INT64[3], listOfFixedSizeList INT64[3][], structColumn STRUCT(ID int64, name STRING), PRIMARY KEY (id));
create node table tableOfTypes (id INT64, int64Column INT64, doubleColumn DOUBLE, booleanColumn BOOLEAN, dateColumn DATE, stringColumn STRING, listOfInt64 INT64[], listOfString STRING[], listOfListOfInt64 INT64[][], structColumn STRUCT(ID int64, name STRING), PRIMARY KEY (id));
Binary file modified dataset/copy-test/node/parquet/types_50k_0.parquet
Binary file not shown.
Binary file modified dataset/copy-test/node/parquet/types_50k_1.parquet
Binary file not shown.
Binary file modified dataset/copy-test/node/parquet/types_50k_2.parquet
Binary file not shown.
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor miniparquet)
target_include_directories(kuzu
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})
Expand All @@ -24,6 +24,6 @@ else()
set_target_properties(kuzu_shared PROPERTIES OUTPUT_NAME kuzu)
endif()
target_link_libraries(kuzu_shared
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor)
PUBLIC antlr4_cypher antlr4_runtime utf8proc re2 serd ${PARQUET_LIB} ${ARROW_LIB} Threads::Threads fastpfor miniparquet)
target_include_directories(kuzu_shared
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
4 changes: 2 additions & 2 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ expression_vector Binder::bindExpectedNodeFileColumns(
columns.push_back(createVariable(columnName, stringType));
}
} break;
case FileType::PARQUET:
case FileType::CSV: {
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
Expand All @@ -207,8 +208,7 @@ expression_vector Binder::bindExpectedNodeFileColumns(
columns.push_back(createVariable(property->getName(), *property->getDataType()));
}
} break;
case FileType::NPY:
case FileType::PARQUET: {
case FileType::NPY: {
for (auto& property : tableSchema->properties) {
if (skipPropertyInFile(*property)) {
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "parser/query/reading_clause/in_query_call_clause.h"
#include "parser/query/reading_clause/load_from.h"
#include "parser/query/reading_clause/unwind_clause.h"
#include "processor/operator/persistent/reader/csv_reader.h"
#include "processor/operator/persistent/reader/csv/csv_reader.h"

using namespace kuzu::common;
using namespace kuzu::parser;
Expand Down
17 changes: 17 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,23 @@ void ListVector::copyFromVectorData(ValueVector* dstVector, uint8_t* dstData,
}
}

void ListVector::appendDataVector(kuzu::common::ValueVector* dstVector,
kuzu::common::ValueVector* srcDataVector, uint64_t numValuesToAppend) {
auto offset = getDataVectorSize(dstVector);
resizeDataVector(dstVector, offset + numValuesToAppend);
auto dstDataVector = getDataVector(dstVector);
for (auto i = 0u; i < numValuesToAppend; i++) {
dstDataVector->copyFromVectorData(offset + i, srcDataVector, i);
}
}

void ListVector::sliceDataVector(
ValueVector* vectorToSlice, uint64_t childIdx, uint64_t numValues) {
for (auto i = 0u; i < numValues - childIdx; i++) {
vectorToSlice->copyFromVectorData(i, vectorToSlice, i + childIdx);
}
}

void StructVector::copyFromRowData(ValueVector* vector, uint32_t pos, const uint8_t* rowData) {
assert(vector->dataType.getPhysicalType() == PhysicalTypeID::STRUCT);
auto& structFields = getFieldVectors(vector);
Expand Down
10 changes: 10 additions & 0 deletions src/include/common/types/ku_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ struct ku_string_t {
value.len);
}

void setFromRawStr(const char* value, uint64_t length) {
this->len = length;
if (isShortString(length)) {
setShortString(value, length);
} else {
memcpy(prefix, value, PREFIX_LENGTH);
overflowPtr = reinterpret_cast<uint64_t>(value);
}
}

std::string getAsShortString() const;
std::string getAsString() const;

Expand Down
2 changes: 2 additions & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ class VarListTypeInfo : public ExtraTypeInfo {
VarListTypeInfo() = default;
explicit VarListTypeInfo(std::unique_ptr<LogicalType> childType)
: childType{std::move(childType)} {}
explicit VarListTypeInfo(LogicalTypeID childTypeID)
: childType{std::make_unique<LogicalType>(childTypeID)} {}
inline LogicalType* getChildType() const { return childType.get(); }
bool operator==(const VarListTypeInfo& other) const;
std::unique_ptr<ExtraTypeInfo> copy() const override;
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ class ListVector {
InMemOverflowBuffer* rowOverflowBuffer);
static void copyFromVectorData(ValueVector* dstVector, uint8_t* dstData,
const ValueVector* srcVector, const uint8_t* srcData);
static void appendDataVector(
ValueVector* dstVector, ValueVector* srcDataVector, uint64_t numValuesToAppend);
static void sliceDataVector(ValueVector* vectorToSlice, uint64_t childIdx, uint64_t numValues);
};

class StructVector {
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Reader : public PhysicalOperator {
read_rows_func_t readFunc;
init_reader_data_func_t initFunc;
std::shared_ptr<ReaderFunctionData> readFuncData;
storage::MemoryManager* memoryManager;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include "column_reader.h"
#include "templated_column_reader.h"

namespace kuzu {
namespace processor {

struct BooleanParquetValueConversion;

class BooleanColumnReader : public TemplatedColumnReader<bool, BooleanParquetValueConversion> {
public:
static constexpr const common::PhysicalTypeID TYPE = common::PhysicalTypeID::BOOL;

public:
BooleanColumnReader(ParquetReader& reader, std::unique_ptr<common::LogicalType> type,
const kuzu_parquet::format::SchemaElement& schema, uint64_t schemaIdx, uint64_t maxDefine,
uint64_t maxRepeat)
: TemplatedColumnReader<bool, BooleanParquetValueConversion>(
reader, std::move(type), schema, schemaIdx, maxDefine, maxRepeat),
bytePos(0){};

uint8_t bytePos;

void initializeRead(uint64_t rowGroupIdx,
const std::vector<kuzu_parquet::format::ColumnChunk>& columns,
kuzu_apache::thrift::protocol::TProtocol& protocol) override;

inline void resetPage() override { bytePos = 0; }
};

struct BooleanParquetValueConversion {
static bool dictRead(ByteBuffer& dict, uint32_t& offset, ColumnReader& reader) {
throw common::CopyException{"Dicts for booleans make no sense"};

Check warning on line 34 in src/include/processor/operator/persistent/reader/parquet/boolean_column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/boolean_column_reader.h#L33-L34

Added lines #L33 - L34 were not covered by tests
}

static bool plainRead(ByteBuffer& plainData, ColumnReader& reader);

static inline void plainSkip(ByteBuffer& plainData, ColumnReader& reader) {
plainRead(plainData, reader);
}

Check warning on line 41 in src/include/processor/operator/persistent/reader/parquet/boolean_column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/boolean_column_reader.h#L40-L41

Added lines #L40 - L41 were not covered by tests
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include "column_reader.h"
#include "parquet_reader.h"
#include "templated_column_reader.h"

namespace kuzu {
namespace processor {

template<class PARQUET_PHYSICAL_TYPE, class KU_PHYSICAL_TYPE,
KU_PHYSICAL_TYPE (*FUNC)(const PARQUET_PHYSICAL_TYPE& input)>
class CallbackColumnReader
: public TemplatedColumnReader<KU_PHYSICAL_TYPE,
CallbackParquetValueConversion<PARQUET_PHYSICAL_TYPE, KU_PHYSICAL_TYPE, FUNC>> {
using BaseType = TemplatedColumnReader<KU_PHYSICAL_TYPE,
CallbackParquetValueConversion<PARQUET_PHYSICAL_TYPE, KU_PHYSICAL_TYPE, FUNC>>;

public:
static constexpr const common::PhysicalTypeID TYPE = common::PhysicalTypeID::ANY;

public:
CallbackColumnReader(ParquetReader& reader, std::unique_ptr<common::LogicalType> type_p,
const kuzu_parquet::format::SchemaElement& schema_p, uint64_t file_idx_p,
uint64_t max_define_p, uint64_t max_repeat_p)
: TemplatedColumnReader<KU_PHYSICAL_TYPE,
CallbackParquetValueConversion<PARQUET_PHYSICAL_TYPE, KU_PHYSICAL_TYPE, FUNC>>(
reader, std::move(type_p), schema_p, file_idx_p, max_define_p, max_repeat_p) {}

protected:
void dictionary(std::shared_ptr<ResizeableBuffer> dictionaryData, uint64_t numEntries) {
BaseType::allocateDict(numEntries * sizeof(KU_PHYSICAL_TYPE));
auto dictPtr = (KU_PHYSICAL_TYPE*)this->dict->ptr;
for (auto i = 0u; i < numEntries; i++) {
dictPtr[i] = FUNC(dictionaryData->read<PARQUET_PHYSICAL_TYPE>());
}
}
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#pragma once

#include <bitset>

#include "common/constants.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
#include "parquet/parquet_types.h"
#include "parquet_dbp_decoder.h"
#include "parquet_rle_bp_decoder.h"
#include "resizable_buffer.h"
#include "thrift_tools.h"

namespace kuzu {
namespace processor {
class ParquetReader;

typedef std::bitset<common::DEFAULT_VECTOR_CAPACITY> parquet_filter_t;

class ColumnReader {
public:
ColumnReader(ParquetReader& reader, std::unique_ptr<common::LogicalType> type,
const kuzu_parquet::format::SchemaElement& schema, uint64_t fileIdx, uint64_t maxDefinition,
uint64_t maxRepeat);
virtual ~ColumnReader() = default;
inline common::LogicalType* getDataType() const { return type.get(); }
inline bool hasDefines() { return maxDefine > 0; }
inline bool hasRepeats() { return maxRepeat > 0; }
virtual inline void skip(uint64_t numValues) { pendingSkips += numValues; }
virtual inline void dictionary(std::shared_ptr<ResizeableBuffer> data, uint64_t num_entries) {
throw common::NotImplementedException{"Dictionary"};

Check warning on line 31 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L29-L31

Added lines #L29 - L31 were not covered by tests
}
virtual inline void offsets(uint32_t* offsets, uint8_t* defines, uint64_t numValues,

Check warning on line 33 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L33

Added line #L33 was not covered by tests
parquet_filter_t& filter, uint64_t resultOffset, common::ValueVector* result) {
throw common::NotImplementedException{"ColumnReader::offsets"};

Check warning on line 35 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L35

Added line #L35 was not covered by tests
}
virtual inline void plain(std::shared_ptr<ByteBuffer> plainData, uint8_t* defines,

Check warning on line 37 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L37

Added line #L37 was not covered by tests
uint64_t numValues, parquet_filter_t& filter, uint64_t resultOffset,
common::ValueVector* result) {
throw common::NotImplementedException{"ColumnReader::plain"};

Check warning on line 40 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L40

Added line #L40 was not covered by tests
}
virtual inline void resetPage() {}
virtual inline uint64_t getGroupRowsAvailable() { return groupRowsAvailable; }
virtual void initializeRead(uint64_t rowGroupIdx,
const std::vector<kuzu_parquet::format::ColumnChunk>& columns,
kuzu_apache::thrift::protocol::TProtocol& protocol);
virtual uint64_t getTotalCompressedSize();
virtual void registerPrefetch(ThriftFileTransport& transport, bool allowMerge);
virtual uint64_t fileOffset() const;
virtual void applyPendingSkips(uint64_t numValues);
virtual uint64_t read(uint64_t numValues, parquet_filter_t& filter, uint8_t* defineOut,
uint8_t* repeatOut, common::ValueVector* resultOut);
static std::unique_ptr<ColumnReader> createReader(ParquetReader& reader,
std::unique_ptr<common::LogicalType> type,
const kuzu_parquet::format::SchemaElement& schema, uint64_t fileIdx, uint64_t maxDefine,
uint64_t maxRepeat);
void prepareRead(parquet_filter_t& filter);
void allocateBlock(uint64_t size);
void allocateCompressed(uint64_t size);
void decompressInternal(kuzu_parquet::format::CompressionCodec::type codec, const uint8_t* src,
uint64_t srcSize, uint8_t* dst, uint64_t dstSize);
void preparePageV2(kuzu_parquet::format::PageHeader& pageHdr);
void preparePage(kuzu_parquet::format::PageHeader& pageHdr);
void prepareDataPage(kuzu_parquet::format::PageHeader& pageHdr);
template<class VALUE_TYPE, class CONVERSION>
void plainTemplated(std::shared_ptr<ByteBuffer> plainData, uint8_t* defines, uint64_t numValues,
parquet_filter_t& filter, uint64_t resultOffset, common::ValueVector* result) {
for (auto i = 0u; i < numValues; i++) {
if (hasDefines() && defines[i + resultOffset] != maxDefine) {
result->setNull(i + resultOffset, true);
continue;

Check warning on line 71 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L70-L71

Added lines #L70 - L71 were not covered by tests
}
if (filter[i + resultOffset]) {
VALUE_TYPE val = CONVERSION::plainRead(*plainData, *this);
result->setValue(i + resultOffset, val);
} else { // there is still some data there that we have to skip over
CONVERSION::plainSkip(*plainData, *this);

Check warning on line 77 in src/include/processor/operator/persistent/reader/parquet/column_reader.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/column_reader.h#L77

Added line #L77 was not covered by tests
}
}
}

protected:
const kuzu_parquet::format::SchemaElement& schema;

uint64_t fileIdx;
uint64_t maxDefine;
uint64_t maxRepeat;

ParquetReader& reader;
std::unique_ptr<common::LogicalType> type;

uint64_t pendingSkips = 0;

const kuzu_parquet::format::ColumnChunk* chunk = nullptr;

kuzu_apache::thrift::protocol::TProtocol* protocol;
uint64_t pageRowsAvailable;
uint64_t groupRowsAvailable;
uint64_t chunkReadOffset;

std::shared_ptr<ResizeableBuffer> block;

ResizeableBuffer compressedBuffer;
ResizeableBuffer offsetBuffer;

std::unique_ptr<RleBpDecoder> dictDecoder;
std::unique_ptr<RleBpDecoder> defineDecoder;
std::unique_ptr<RleBpDecoder> repeatedDecoder;
std::unique_ptr<DbpDecoder> dbpDecoder;
std::unique_ptr<RleBpDecoder> rleDecoder;

// dummies for Skip()
parquet_filter_t noneFilter;
ResizeableBuffer dummyDefine;
ResizeableBuffer dummyRepeat;
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#pragma once

#include "common/exception/copy.h"
#include "common/string_utils.h"
#include "resizable_buffer.h"

namespace kuzu {
namespace processor {
class ParquetDecodeUtils {

public:
template<class T>
static T ZigzagToInt(const T n) {
return (n >> 1) ^ -(n & 1);

Check warning on line 14 in src/include/processor/operator/persistent/reader/parquet/decode_utils.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/decode_utils.h#L14

Added line #L14 was not covered by tests
}

static const uint64_t BITPACK_MASKS[];
static const uint64_t BITPACK_MASKS_SIZE;
static const uint8_t BITPACK_DLEN;

template<typename T>
static uint32_t BitUnpack(
ByteBuffer& buffer, uint8_t& bitpack_pos, T* dest, uint32_t count, uint8_t width) {
if (width >= ParquetDecodeUtils::BITPACK_MASKS_SIZE) {
throw common::CopyException(common::StringUtils::string_format(

Check warning on line 25 in src/include/processor/operator/persistent/reader/parquet/decode_utils.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/decode_utils.h#L25

Added line #L25 was not covered by tests
"The width ({}) of the bitpacked data exceeds the supported max width ({}), "
"the file might be corrupted.",
width, ParquetDecodeUtils::BITPACK_MASKS_SIZE));
}
auto mask = BITPACK_MASKS[width];

for (uint32_t i = 0; i < count; i++) {
T val = (buffer.get<uint8_t>() >> bitpack_pos) & mask;
bitpack_pos += width;
while (bitpack_pos > BITPACK_DLEN) {
buffer.inc(1);
val |= (T(buffer.get<uint8_t>()) << T(BITPACK_DLEN - (bitpack_pos - width))) & mask;
bitpack_pos -= BITPACK_DLEN;
}
dest[i] = val;
}
return count;
}

template<class T>
static T VarintDecode(ByteBuffer& buf) {
T result = 0;
uint8_t shift = 0;
while (true) {
auto byte = buf.read<uint8_t>();
result |= T(byte & 127) << shift;
if ((byte & 128) == 0) {
break;
}
shift += 7;
if (shift > sizeof(T) * 8) {
throw std::runtime_error("Varint-decoding found too large number");

Check warning on line 57 in src/include/processor/operator/persistent/reader/parquet/decode_utils.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/reader/parquet/decode_utils.h#L57

Added line #L57 was not covered by tests
}
}
return result;
}
};

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit b43a63c

Please sign in to comment.