Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Oct 10, 2023
1 parent d4ee1fb commit 75d5431
Show file tree
Hide file tree
Showing 15 changed files with 430 additions and 478 deletions.
5 changes: 5 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,10 @@ struct OrderByConstants {
static constexpr uint64_t MIN_LIMIT_RATIO_TO_REDUCE = 2;
};

// Constants used by the Parquet support code.
struct ParquetConstants {
    // 0xFFFF == 65535, i.e. the largest value representable in a uint16_t.
    // NOTE(review): presumably a sentinel definition level marking a value as
    // valid -- confirm against the column writers that consume it.
    static constexpr uint64_t PARQUET_DEFINE_VALID = 0xFFFF;
    // Four-character marker string.
    // NOTE(review): presumably the magic bytes written at the start and end of
    // a Parquet file -- confirm at the use site.
    static constexpr const char* PARQUET_MAGIC_WORDS = "PAR1";
};

} // namespace common
} // namespace kuzu
4 changes: 3 additions & 1 deletion src/include/processor/operator/persistent/copy_to_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ class CopyToParquet : public Sink {

void executeInternal(ExecutionContext* context) final;

void finalize(ExecutionContext* executionContext) final { sharedState->writer->Finalize(); }
inline void finalize(ExecutionContext* executionContext) final {
sharedState->writer->finalize();
}

std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<CopyToParquet>(resultSetDescriptor->copy(), info->copy(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ class BasicColumnWriter : public ColumnWriter {
static constexpr const uint64_t STRING_LENGTH_SIZE = sizeof(uint32_t);

public:
std::unique_ptr<ColumnWriterState> InitializeWriteState(
std::unique_ptr<ColumnWriterState> initializeWriteState(
kuzu_parquet::format::RowGroup& row_group) override;
void Prepare(ColumnWriterState& state, ColumnWriterState* parent, common::ValueVector* vector,
void prepare(ColumnWriterState& state, ColumnWriterState* parent, common::ValueVector* vector,
uint64_t count) override;
void BeginWrite(ColumnWriterState& state) override;
void Write(ColumnWriterState& state, common::ValueVector* vector, uint64_t count) override;
void FinalizeWrite(ColumnWriterState& state) override;
void beginWrite(ColumnWriterState& state) override;
void write(ColumnWriterState& state, common::ValueVector* vector, uint64_t count) override;
void finalizeWrite(ColumnWriterState& state) override;

protected:
void WriteLevels(BufferedSerializer& temp_writer, const std::vector<uint16_t>& levels,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ class BooleanStatisticsState : public ColumnWriterStatistics {
public:
bool HasStats() { return !(min && !max); }

std::string GetMin() override { return GetMinValue(); }
std::string GetMax() override { return GetMaxValue(); }
std::string GetMinValue() override {
std::string getMin() override { return getMinValue(); }
std::string getMax() override { return getMaxValue(); }
std::string getMinValue() override {
return HasStats() ? std::string(reinterpret_cast<const char*>(&min), sizeof(bool)) :
std::string();
}
std::string GetMaxValue() override {
std::string getMaxValue() override {
return HasStats() ? std::string(reinterpret_cast<const char*>(&max), sizeof(bool)) :
std::string();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ namespace kuzu {
namespace processor {
class ParquetWriter;

#define PARQUET_DEFINE_VALID 65535

struct PageInformation {
uint64_t offset = 0;
uint64_t row_count = 0;
uint64_t empty_count = 0;
uint64_t estimated_page_size = 0;
uint64_t rowCount = 0;
uint64_t emptyCount = 0;
uint64_t estimatedPageSize = 0;
};

class ColumnWriterPageState {
Expand All @@ -25,104 +23,90 @@ class ColumnWriterPageState {
};

struct PageWriteInformation {
kuzu_parquet::format::PageHeader page_header;
std::unique_ptr<BufferedSerializer> temp_writer;
std::unique_ptr<ColumnWriterPageState> page_state;
uint64_t write_page_idx = 0;
uint64_t write_count = 0;
uint64_t max_write_count = 0;
size_t compressed_size;
uint8_t* compressed_data;
std::unique_ptr<uint8_t[]> compressed_buf;
kuzu_parquet::format::PageHeader pageHeader;
std::unique_ptr<BufferedSerializer> bufferWriter;
std::unique_ptr<ColumnWriterPageState> pageState;
uint64_t writePageIdx = 0;
uint64_t writeCount = 0;
uint64_t maxWriteCount = 0;
size_t compressedSize;
uint8_t* compressedData;
std::unique_ptr<uint8_t[]> compressedBuf;
};

class ColumnWriterState {
public:
virtual ~ColumnWriterState() = default;

std::vector<uint16_t> definition_levels;
std::vector<uint16_t> repetition_levels;
std::vector<bool> is_empty;
std::vector<uint16_t> definitionLevels;
std::vector<uint16_t> repetitionLevels;
std::vector<bool> isEmpty;
};

// TODO(Ziyi): We don't have statistics for each column, so we simply return an empty string for
// min/max.
class ColumnWriterStatistics {
public:
virtual ~ColumnWriterStatistics() = default;

virtual std::string GetMin() { return std::string(); }
virtual std::string GetMax() { return std::string(); }
virtual std::string GetMinValue() { return std::string(); }
virtual std::string GetMaxValue() { return std::string(); }

public:
template<class TARGET>
TARGET& Cast() {
D_ASSERT(dynamic_cast<TARGET*>(this));
return reinterpret_cast<TARGET&>(*this);
}
template<class TARGET>
const TARGET& Cast() const {
D_ASSERT(dynamic_cast<const TARGET*>(this));
return reinterpret_cast<const TARGET&>(*this);
}
virtual std::string getMin() { return {}; }
virtual std::string getMax() { return {}; }
virtual std::string getMinValue() { return {}; }
virtual std::string getMaxValue() { return {}; }
};

class ColumnWriter {

public:
ColumnWriter(ParquetWriter& writer, uint64_t schema_idx, std::vector<std::string> schema_path,
uint64_t max_repeat, uint64_t max_define, bool can_have_nulls);
ColumnWriter(ParquetWriter& writer, uint64_t schemaIdx, std::vector<std::string> schemaPath,
uint64_t maxRepeat, uint64_t maxDefine, bool canHaveNulls);
virtual ~ColumnWriter() = default;

ParquetWriter& writer;
uint64_t schema_idx;
std::vector<std::string> schema_path;
uint64_t max_repeat;
uint64_t max_define;
bool can_have_nulls;
uint64_t schemaIdx;
std::vector<std::string> schemaPath;
uint64_t maxRepeat;
uint64_t maxDefine;
bool canHaveNulls;
// collected stats
uint64_t null_count;
uint64_t nullCount;

public:
//! Create the column writer for a specific type recursively
static std::unique_ptr<ColumnWriter> createWriterRecursive(
std::vector<kuzu_parquet::format::SchemaElement>& schemas, ParquetWriter& writer,
common::LogicalType* type, const std::string& name, std::vector<std::string> schema_path,
uint64_t max_repeat = 0, uint64_t max_define = 1, bool can_have_nulls = true);
common::LogicalType* type, const std::string& name,
std::vector<std::string> schemaPathToCreate, uint64_t maxRepeatToCreate = 0,
uint64_t maxDefineToCreate = 1, bool canHaveNullsToCreate = true);

virtual std::unique_ptr<ColumnWriterState> InitializeWriteState(
kuzu_parquet::format::RowGroup& row_group) = 0;
virtual std::unique_ptr<ColumnWriterState> initializeWriteState(
kuzu_parquet::format::RowGroup& rowGroup) = 0;

//! indicates whether the write need to analyse the data before preparing it
virtual bool HasAnalyze() { return false; }
// Indicates whether the writer needs to analyse the data before preparing it.
virtual bool hasAnalyze() { return false; }

virtual void Analyze(ColumnWriterState& state, ColumnWriterState* parent,
virtual void analyze(ColumnWriterState& state, ColumnWriterState* parent,
common::ValueVector* vector, uint64_t count) {
throw common::NotImplementedException{"Writer does not need analysis"};
throw common::NotImplementedException{"ColumnWriter::analyze"};
}

//! Called after all data has been passed to Analyze
virtual void FinalizeAnalyze(ColumnWriterState& state) {
throw common::NotImplementedException{"Writer does not need analysis"};
// Called after all data has been passed to analyze().
virtual void finalizeAnalyze(ColumnWriterState& state) {
throw common::NotImplementedException{"ColumnWriter::finalizeAnalyze"};
}

virtual void Prepare(ColumnWriterState& state, ColumnWriterState* parent,
virtual void prepare(ColumnWriterState& state, ColumnWriterState* parent,
common::ValueVector* vector, uint64_t count) = 0;

virtual void BeginWrite(ColumnWriterState& state) = 0;
virtual void Write(ColumnWriterState& state, common::ValueVector* vector, uint64_t count) = 0;
virtual void FinalizeWrite(ColumnWriterState& state) = 0;
virtual void beginWrite(ColumnWriterState& state) = 0;
virtual void write(ColumnWriterState& state, common::ValueVector* vector, uint64_t count) = 0;
virtual void finalizeWrite(ColumnWriterState& state) = 0;

protected:
void HandleDefineLevels(ColumnWriterState& state, ColumnWriterState* parent,
common::ValueVector* vector, uint64_t count, uint16_t define_value, uint16_t null_value);
void HandleRepeatLevels(
ColumnWriterState& state_p, ColumnWriterState* parent, uint64_t count, uint64_t max_repeat);

void CompressPage(BufferedSerializer& temp_writer, size_t& compressed_size,
uint8_t*& compressed_data, std::unique_ptr<uint8_t[]>& compressed_buf);
void handleDefineLevels(ColumnWriterState& state, ColumnWriterState* parent,
common::ValueVector* vector, uint64_t count, uint16_t defineValue, uint16_t nullValue);
void handleRepeatLevels(ColumnWriterState& stateToHandle, ColumnWriterState* parent,
uint64_t count, uint64_t maxRepeatToHandle);
void compressPage(BufferedSerializer& bufferedSerializer, size_t& compressedSize,
uint8_t*& compressedData, std::unique_ptr<uint8_t[]>& compressedBuf);
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ class ParquetWriterTransport : public kuzu_apache::thrift::protocol::TTransport
explicit ParquetWriterTransport(common::FileInfo* fileInfo, common::offset_t& offset)
: fileInfo{fileInfo}, offset{offset} {}

bool isOpen() const override { return true; }
inline bool isOpen() const override { return true; }

void open() override {}

void close() override {}

void write_virt(const uint8_t* buf, uint32_t len) override {
inline void write_virt(const uint8_t* buf, uint32_t len) override {
common::FileUtils::writeToFile(fileInfo, buf, len, offset);
offset += len;
}
Expand All @@ -32,7 +32,7 @@ class ParquetWriterTransport : public kuzu_apache::thrift::protocol::TTransport
};

struct PreparedRowGroup {
kuzu_parquet::format::RowGroup row_group;
kuzu_parquet::format::RowGroup rowGroup;
std::vector<std::unique_ptr<ColumnWriterState>> states;
};

Expand All @@ -42,39 +42,36 @@ class ParquetWriter {
std::vector<std::string> names, kuzu_parquet::format::CompressionCodec::type codec,
storage::MemoryManager* mm);

public:
void PrepareRowGroup(FactorizedTable& buffer, PreparedRowGroup& result);
void FlushRowGroup(PreparedRowGroup& row_group);
void Flush(FactorizedTable& buffer);
void Finalize();

static kuzu_parquet::format::Type::type KUZUTypeToParquetType(common::LogicalType* type);
static void SetSchemaProperties(
common::LogicalType* type, kuzu_parquet::format::SchemaElement& schema_ele);

kuzu_apache::thrift::protocol::TProtocol* GetProtocol() { return protocol.get(); }
kuzu_parquet::format::CompressionCodec::type GetCodec() { return codec; }
kuzu_parquet::format::Type::type GetType(uint64_t schema_idx) {
return file_meta_data.schema[schema_idx].type;
}
inline common::offset_t getOffset() const { return fileOffset; }
inline void write(const uint8_t* buf, uint32_t len) {
common::FileUtils::writeToFile(fileInfo.get(), buf, len, fileOffset);
fileOffset += len;
}
inline kuzu_parquet::format::CompressionCodec::type getCodec() { return codec; }
inline kuzu_apache::thrift::protocol::TProtocol* getProtocol() { return protocol.get(); }
inline kuzu_parquet::format::Type::type getParquetType(uint64_t schemaIdx) {
return fileMetaData.schema[schemaIdx].type;
}
void flush(FactorizedTable& ft);
void finalize();
static kuzu_parquet::format::Type::type convertToParquetType(common::LogicalType* type);
static void setSchemaProperties(
common::LogicalType* type, kuzu_parquet::format::SchemaElement& schemaElement);

private:
void prepareRowGroup(FactorizedTable& ft, PreparedRowGroup& result);
void flushRowGroup(PreparedRowGroup& rowGroup);

private:
std::string fileName;
std::vector<std::unique_ptr<common::LogicalType>> types;
std::vector<std::string> columnNames;
kuzu_parquet::format::CompressionCodec::type codec;

std::unique_ptr<common::FileInfo> fileInfo;
std::shared_ptr<kuzu_apache::thrift::protocol::TProtocol> protocol;
kuzu_parquet::format::FileMetaData file_meta_data;
kuzu_parquet::format::FileMetaData fileMetaData;
std::mutex lock;

std::vector<std::unique_ptr<ColumnWriter>> column_writers;
std::vector<std::unique_ptr<ColumnWriter>> columnWriters;
common::offset_t fileOffset;
storage::MemoryManager* mm;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ class NumericStatisticsState : public ColumnWriterStatistics {
public:
bool HasStats() { return min <= max; }

std::string GetMin() override {
return function::NumericLimits<SRC>::isSigned() ? GetMinValue() : std::string();
std::string getMin() override {
return function::NumericLimits<SRC>::isSigned() ? getMinValue() : std::string();
}
std::string GetMax() override {
return function::NumericLimits<SRC>::isSigned() ? GetMaxValue() : std::string();
std::string getMax() override {
return function::NumericLimits<SRC>::isSigned() ? getMaxValue() : std::string();
}
std::string GetMinValue() override {
std::string getMinValue() override {
return HasStats() ? std::string((char*)&min, sizeof(T)) : std::string();
}
std::string GetMaxValue() override {
std::string getMaxValue() override {
return HasStats() ? std::string((char*)&max, sizeof(T)) : std::string();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ class StringStatisticsState : public ColumnWriterStatistics {
has_stats = true;
}

std::string GetMin() override { return GetMinValue(); }
std::string GetMax() override { return GetMaxValue(); }
std::string GetMinValue() override { return HasStats() ? min : std::string(); }
std::string GetMaxValue() override { return HasStats() ? max : std::string(); }
std::string getMin() override { return getMinValue(); }
std::string getMax() override { return getMaxValue(); }
std::string getMinValue() override { return HasStats() ? min : std::string(); }
std::string getMaxValue() override { return HasStats() ? max : std::string(); }
};

class StringColumnWriterState : public BasicColumnWriterState {
Expand Down Expand Up @@ -122,30 +122,30 @@ class StringColumnWriter : public BasicColumnWriter {
return std::make_unique<StringStatisticsState>();
}

std::unique_ptr<ColumnWriterState> InitializeWriteState(
std::unique_ptr<ColumnWriterState> initializeWriteState(
kuzu_parquet::format::RowGroup& row_group) override {
auto result =
std::make_unique<StringColumnWriterState>(row_group, row_group.columns.size());
RegisterToRowGroup(row_group);
return std::move(result);
}

bool HasAnalyze() override { return true; }
bool hasAnalyze() override { return true; }

void Analyze(ColumnWriterState& state_p, ColumnWriterState* parent, common::ValueVector* vector,
void analyze(ColumnWriterState& state_p, ColumnWriterState* parent, common::ValueVector* vector,
uint64_t count) override {
auto& state = reinterpret_cast<StringColumnWriterState&>(state_p);
uint64_t vcount =
parent ? parent->definition_levels.size() - state.definition_levels.size() : count;
uint64_t parent_index = state.definition_levels.size();
parent ? parent->definitionLevels.size() - state.definitionLevels.size() : count;
uint64_t parent_index = state.definitionLevels.size();
uint64_t vector_index = 0;
uint32_t new_value_index = state.dictionary.size();
uint32_t last_value_index = -1;
uint64_t run_length = 0;
uint64_t run_count = 0;
for (auto i = 0u; i < vcount; i++) {

if (parent && !parent->is_empty.empty() && parent->is_empty[parent_index + i]) {
if (parent && !parent->isEmpty.empty() && parent->isEmpty[parent_index + i]) {
continue;
}
auto pos = vector->state->selVector->selectedPositions[vector_index];
Expand Down Expand Up @@ -178,7 +178,7 @@ class StringColumnWriter : public BasicColumnWriter {
state.estimated_rle_pages_size += MAX_DICTIONARY_KEY_SIZE * run_count;
}

void FinalizeAnalyze(ColumnWriterState& state_p) override {
void finalizeAnalyze(ColumnWriterState& state_p) override {
auto& state = reinterpret_cast<StringColumnWriterState&>(state_p);

// check if a dictionary will require more space than a plain write, or if the dictionary
Expand Down
Loading

0 comments on commit 75d5431

Please sign in to comment.