Skip to content

Commit

Permalink
Parquet writer rework
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed Oct 12, 2023
1 parent abca5e6 commit d634e8d
Show file tree
Hide file tree
Showing 37 changed files with 2,449 additions and 417 deletions.
16 changes: 16 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,21 @@ struct OrderByConstants {
static constexpr uint64_t MIN_LIMIT_RATIO_TO_REDUCE = 2;
};

// Compile-time constants used by the Parquet writer.
struct ParquetConstants {
    // NOTE(review): presumably the definition-level sentinel marking a valid (non-null) value;
    // 65535 is the largest uint16_t -- confirm against the column writers.
    static constexpr uint64_t PARQUET_DEFINE_VALID = 65535;
    // Magic bytes of the Parquet file format.
    static constexpr const char* PARQUET_MAGIC_WORDS = "PAR1";
    // We limit the uncompressed page size to 100MB.
    // The max size in Parquet is 2GB, but we choose a more conservative limit.
    static constexpr uint64_t MAX_UNCOMPRESSED_PAGE_SIZE = 100000000;
    // Dictionary pages must be below 2GB. Unlike data pages, there's only one dictionary page.
    // For this reason we go with a much higher, but still a conservative upper bound of 1GB.
    // Written as an integer literal (not 1e9) so no floating-point conversion is involved.
    static constexpr uint64_t MAX_UNCOMPRESSED_DICT_PAGE_SIZE = 1000000000;
    // The maximum size a key entry in an RLE page takes.
    static constexpr uint64_t MAX_DICTIONARY_KEY_SIZE = sizeof(uint32_t);
    // The size of encoding the string length.
    static constexpr uint64_t STRING_LENGTH_SIZE = sizeof(uint32_t);
    // NOTE(review): presumably the cap (in bytes) on string min/max statistics -- confirm
    // against the string column writer.
    static constexpr uint64_t MAX_STRING_STATISTICS_SIZE = 10000;
};

} // namespace common
} // namespace kuzu
17 changes: 5 additions & 12 deletions src/include/processor/operator/persistent/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "common/task_system/task_scheduler.h"
#include "processor/operator/persistent/csv_file_writer.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/operator/persistent/parquet_file_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/operator/sink.h"
#include "processor/result/result_set.h"
Expand All @@ -15,16 +14,11 @@ namespace processor {
class CopyToSharedState {
public:
CopyToSharedState(common::FileType fileType, std::string& filePath,
std::vector<std::string>& columnNames, std::vector<common::LogicalType>& columnTypes) {
if (fileType == common::FileType::CSV) {
fileWriter =
std::make_unique<processor::CSVFileWriter>(filePath, columnNames, columnTypes);
} else if (fileType == common::FileType::PARQUET) {
fileWriter =
std::make_unique<processor::ParquetFileWriter>(filePath, columnNames, columnTypes);
} else {
throw common::NotImplementedException("CopyToSharedState::CopyToSharedState");
}
std::vector<std::string>& columnNames,
std::vector<std::unique_ptr<common::LogicalType>> columnTypes) {
assert(fileType == common::FileType::CSV);
fileWriter = std::make_unique<processor::CSVFileWriter>(
filePath, columnNames, std::move(columnTypes));
}
inline std::unique_ptr<FileWriter>& getWriter() { return fileWriter; }

Expand Down Expand Up @@ -53,7 +47,6 @@ class CopyTo : public Sink {
}

protected:
std::string getOutputMsg();
std::vector<DataPos> vectorsToCopyPos;

private:
Expand Down
84 changes: 84 additions & 0 deletions src/include/processor/operator/persistent/copy_to_parquet.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#pragma once

#include "parquet/parquet_types.h"
#include "processor/operator/persistent/writer/parquet/parquet_writer.h"
#include "processor/operator/sink.h"
#include "processor/result/factorized_table.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

// State shared between a CopyToParquet operator and its clones (clone() passes the same
// shared_ptr along).
class CopyToParquetSharedState {
public:
// Created in CopyToParquet::initGlobalStateInternal and finalized in CopyToParquet::finalize.
std::unique_ptr<ParquetWriter> writer;
};

struct CopyToParquetInfo {
kuzu_parquet::format::CompressionCodec::type codec =
kuzu_parquet::format::CompressionCodec::SNAPPY;
std::unique_ptr<FactorizedTableSchema> tableSchema;
std::vector<std::unique_ptr<common::LogicalType>> types;
std::vector<std::string> names;
std::vector<DataPos> dataPoses;
std::string fileName;

CopyToParquetInfo(std::unique_ptr<FactorizedTableSchema> tableSchema,
std::vector<std::unique_ptr<common::LogicalType>> types, std::vector<std::string> names,
std::vector<DataPos> dataPoses, std::string fileName)
: tableSchema{std::move(tableSchema)}, types{std::move(types)}, names{std::move(names)},
dataPoses{std::move(dataPoses)}, fileName{std::move(fileName)} {}

std::vector<std::unique_ptr<common::LogicalType>> copyTypes();

std::unique_ptr<CopyToParquetInfo> copy() {
return std::make_unique<CopyToParquetInfo>(
tableSchema->copy(), copyTypes(), names, dataPoses, fileName);
}
};

// Per-operator-instance state of CopyToParquet: a factorized table that buffers appended
// tuples and the vectors that are appended to it.
struct CopyToParquetLocalState {
    std::unique_ptr<FactorizedTable> ft;
    std::vector<common::ValueVector*> vectorsToAppend;
    // Non-owning; null until assigned (NOTE(review): presumably set by init() -- confirm).
    storage::MemoryManager* mm = nullptr;

    // Defined out of line. NOTE(review): presumably builds `ft` and resolves `vectorsToAppend`
    // from `resultSet` using the positions in `info` -- confirm against the implementation.
    void init(CopyToParquetInfo* info, storage::MemoryManager* mm, ResultSet* resultSet);

    // Appends the current contents of `vectorsToAppend` to the factorized table.
    void append() { ft->append(vectorsToAppend); }
};

// Sink operator that writes its input to a Parquet file through a ParquetWriter held in the
// shared state.
class CopyToParquet : public Sink {
public:
    // Takes ownership of the descriptor, info and child; `sharedState` is shared with clones.
    CopyToParquet(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
        std::unique_ptr<CopyToParquetInfo> info,
        std::shared_ptr<CopyToParquetSharedState> sharedState,
        std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
        : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_TO_PARQUET,
              std::move(child), id, paramsString},
          // Members are listed in declaration order (sharedState, localState, info), which is
          // the order they are actually initialized in; this also avoids -Wreorder.
          sharedState{std::move(sharedState)},
          localState{std::make_unique<CopyToParquetLocalState>()}, info{std::move(info)} {}

    // Finalizes the shared writer. The execution context is not used.
    void finalize(ExecutionContext* /*executionContext*/) final {
        sharedState->writer->finalize();
    }
    void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;
    void executeInternal(ExecutionContext* context) final;
    // The clone gets its own descriptor, info and child copies but the same shared state.
    std::unique_ptr<PhysicalOperator> clone() final {
        return std::make_unique<CopyToParquet>(resultSetDescriptor->copy(), info->copy(),
            sharedState, children[0]->clone(), id, paramsString);
    }

private:
    // Creates the writer in the shared state from the file name, column types/names and codec
    // in `info`, using the context's memory manager.
    void initGlobalStateInternal(ExecutionContext* context) final {
        sharedState->writer = std::make_unique<ParquetWriter>(
            info->fileName, info->copyTypes(), info->names, info->codec, context->memoryManager);
    }

private:
    std::shared_ptr<CopyToParquetSharedState> sharedState;
    std::unique_ptr<CopyToParquetLocalState> localState;
    std::unique_ptr<CopyToParquetInfo> info;
};

} // namespace processor
} // namespace kuzu
9 changes: 5 additions & 4 deletions src/include/processor/operator/persistent/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ namespace processor {
class FileWriter {
public:
FileWriter(std::string filePath, std::vector<std::string> columnNames,
std::vector<common::LogicalType> columnTypes)
: filePath{filePath}, columnNames{columnNames}, columnTypes{columnTypes} {}
virtual ~FileWriter(){};
std::vector<std::unique_ptr<common::LogicalType>> columnTypes)
: filePath{std::move(filePath)}, columnNames{std::move(columnNames)}, columnTypes{std::move(
columnTypes)} {}
virtual ~FileWriter() = default;
virtual void init() = 0;
virtual void openFile() = 0;
virtual void closeFile() = 0;
Expand All @@ -20,7 +21,7 @@ class FileWriter {
protected:
std::string filePath;
std::vector<std::string> columnNames;
std::vector<common::LogicalType> columnTypes;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
};

} // namespace processor
Expand Down
58 changes: 0 additions & 58 deletions src/include/processor/operator/persistent/parquet_file_writer.h

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <list>

#include "common/file_utils.h"
#include "thrift/protocol/TCompactProtocol.h"
#include "thrift/transport/TBufferTransports.h"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#pragma once

#include "parquet/parquet_types.h"
#include "processor/operator/persistent/writer/parquet/column_writer.h"

namespace kuzu {
namespace processor {

// Write state of a BasicColumnWriter for one column within one row group.
class BasicColumnWriterState : public ColumnWriterState {
public:
    // Binds the state to `targetRowGroup` (held by reference, so it must outlive this object)
    // and to the column index `columnIdx`.
    BasicColumnWriterState(kuzu_parquet::format::RowGroup& targetRowGroup, uint64_t columnIdx)
        : rowGroup{targetRowGroup}, colIdx{columnIdx} {
        // The page list always starts with one default-constructed entry.
        pageInfo.emplace_back();
    }

    kuzu_parquet::format::RowGroup& rowGroup;
    uint64_t colIdx;
    std::vector<PageInformation> pageInfo;
    std::vector<PageWriteInformation> writeInfo;
    std::unique_ptr<ColumnWriterStatistics> statsState;
    // NOTE(review): presumably an index into the page list -- confirm against nextPage/flushPage.
    uint64_t currentPage = 0;
};

// Column writer base class that declares the prepare / beginWrite / write / finalizeWrite
// entry points plus a set of virtual hooks (statistics, page state, row size, dictionary)
// that concrete writers override. writeVector is the only pure-virtual hook.
class BasicColumnWriter : public ColumnWriter {
public:
// Forwards all arguments unchanged to the ColumnWriter base constructor.
BasicColumnWriter(ParquetWriter& writer, uint64_t schemaIdx,
std::vector<std::string> schemaPath, uint64_t maxRepeat, uint64_t maxDefine,
bool canHaveNulls)
: ColumnWriter(
writer, schemaIdx, std::move(schemaPath), maxRepeat, maxDefine, canHaveNulls) {}

public:
std::unique_ptr<ColumnWriterState> initializeWriteState(
kuzu_parquet::format::RowGroup& rowGroup) override;
void prepare(ColumnWriterState& state, ColumnWriterState* parent, common::ValueVector* vector,
uint64_t count) override;
void beginWrite(ColumnWriterState& state) override;
void write(ColumnWriterState& state, common::ValueVector* vector, uint64_t count) override;
void finalizeWrite(ColumnWriterState& state) override;

protected:
// NOTE(review): presumably serializes `count` entries of `levels` starting at `startOffset`
// into `bufferedSerializer` -- defined out of line, confirm there.
void writeLevels(BufferedSerializer& bufferedSerializer, const std::vector<uint16_t>& levels,
uint64_t maxValue, uint64_t startOffset, uint64_t count);

// Encoding used for this column; PLAIN unless a subclass overrides it.
virtual kuzu_parquet::format::Encoding::type getEncoding(BasicColumnWriterState& state) {
return kuzu_parquet::format::Encoding::PLAIN;
}

void nextPage(BasicColumnWriterState& state);
void flushPage(BasicColumnWriterState& state);

// Initializes the state used to track statistics during writing. Only used for scalar types.
virtual std::unique_ptr<ColumnWriterStatistics> initializeStatsState() {
return std::make_unique<ColumnWriterStatistics>();

Check warning on line 54 in src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h#L53-L54

Added lines #L53 - L54 were not covered by tests
}

// Initialize the writer for a specific page. Only used for scalar types.
// The default implementation returns nullptr (no per-page state).
virtual std::unique_ptr<ColumnWriterPageState> initializePageState(
BasicColumnWriterState& state) {
return nullptr;
}

// Flushes the writer for a specific page. Only used for scalar types.
// The default implementation does nothing.
virtual void flushPageState(
BufferedSerializer& bufferedSerializer, ColumnWriterPageState* state) {}

// Retrieves the row size of a vector at the specified location. Only used for scalar types.
// The default implementation throws NotImplementedException.
virtual uint64_t getRowSize(

Check warning on line 68 in src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h#L68

Added line #L68 was not covered by tests
common::ValueVector* vector, uint64_t index, BasicColumnWriterState& state) {
throw common::NotImplementedException{"BasicColumnWriter::getRowSize"};

Check warning on line 70 in src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h#L70

Added line #L70 was not covered by tests
}
// Writes a (subset of a) vector to the specified serializer. Only used for scalar types.
virtual void writeVector(BufferedSerializer& bufferedSerializer, ColumnWriterStatistics* stats,
ColumnWriterPageState* pageState, common::ValueVector* vector, uint64_t chunkStart,
uint64_t chunkEnd) = 0;

// Whether this writer uses a dictionary; false unless a subclass overrides it.
virtual bool hasDictionary(BasicColumnWriterState& writerState) { return false; }
// The number of elements in the dictionary.
// The default implementation throws NotImplementedException.
virtual uint64_t dictionarySize(BasicColumnWriterState& writerState) {
throw common::NotImplementedException{"BasicColumnWriter::dictionarySize"};

Check warning on line 80 in src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h#L79-L80

Added lines #L79 - L80 were not covered by tests
}
// Takes ownership of `bufferedSerializer`. NOTE(review): presumably it holds the already
// serialized dictionary contents for `rowCount` entries -- confirm in the implementation.
void writeDictionary(BasicColumnWriterState& state,
std::unique_ptr<BufferedSerializer> bufferedSerializer, uint64_t rowCount);
// The default implementation throws NotImplementedException.
virtual void flushDictionary(BasicColumnWriterState& state, ColumnWriterStatistics* stats) {
throw common::NotImplementedException{"BasicColumnWriter::flushDictionary"};

Check warning on line 85 in src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h#L84-L85

Added lines #L84 - L85 were not covered by tests
}

void setParquetStatistics(
BasicColumnWriterState& state, kuzu_parquet::format::ColumnChunk& column);
void registerToRowGroup(kuzu_parquet::format::RowGroup& rowGroup);
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include "processor/operator/persistent/writer/parquet/basic_column_writer.h"

namespace kuzu {
namespace processor {

// Min/max statistics for a boolean column.
class BooleanStatisticsState : public ColumnWriterStatistics {
public:
// Starts with min=true and max=false, which is exactly the combination hasStats() reports as
// "no statistics".
BooleanStatisticsState() : min{true}, max{false} {}

bool min;
bool max;

public:
// False only while min is true and max is false (the initial state).
bool hasStats() { return !(min && !max); }

Check warning on line 16 in src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h#L16

Added line #L16 was not covered by tests

// getMin/getMax simply forward to getMinValue/getMaxValue.
std::string getMin() override { return getMinValue(); }
std::string getMax() override { return getMaxValue(); }
// Returns the raw sizeof(bool) bytes of `min`, or an empty string when hasStats() is false.
std::string getMinValue() override {
return hasStats() ? std::string(reinterpret_cast<const char*>(&min), sizeof(bool)) :
std::string();
}
// Returns the raw sizeof(bool) bytes of `max`, or an empty string when hasStats() is false.
std::string getMaxValue() override {
return hasStats() ? std::string(reinterpret_cast<const char*>(&max), sizeof(bool)) :
std::string();
}
};

// Per-page state of BooleanColumnWriter (returned by its initializePageState override).
class BooleanWriterPageState : public ColumnWriterPageState {
public:
// NOTE(review): presumably the byte currently being filled with packed boolean bits -- confirm
// against BooleanColumnWriter::writeVector.
uint8_t byte = 0;
// NOTE(review): presumably the next bit position inside `byte` -- confirm.
uint8_t bytePos = 0;
};

// Column writer for boolean columns. Supplies boolean-specific statistics and page state and
// overrides the writeVector / flushPageState hooks of BasicColumnWriter.
class BooleanColumnWriter : public BasicColumnWriter {
public:
    // Forwards all arguments unchanged to the BasicColumnWriter constructor.
    BooleanColumnWriter(ParquetWriter& writer, uint64_t schemaIdx,
        std::vector<std::string> schemaPath, uint64_t maxRepeat, uint64_t maxDefine,
        bool canHaveNulls)
        : BasicColumnWriter(
              writer, schemaIdx, std::move(schemaPath), maxRepeat, maxDefine, canHaveNulls) {}

    // Statistics are tracked with a BooleanStatisticsState.
    std::unique_ptr<ColumnWriterStatistics> initializeStatsState() override {
        return std::make_unique<BooleanStatisticsState>();
    }

    // Every row is reported as sizeof(bool) bytes; none of the arguments are consulted.
    uint64_t getRowSize(common::ValueVector* /*vector*/, uint64_t /*index*/,
        BasicColumnWriterState& /*state*/) override {
        return sizeof(bool);
    }

    // Each page gets a fresh BooleanWriterPageState.
    std::unique_ptr<ColumnWriterPageState> initializePageState(
        BasicColumnWriterState& /*state*/) override {
        return std::make_unique<BooleanWriterPageState>();
    }

    // Defined out of line; writes the rows in [chunkStart, chunkEnd) of `vector`.
    // NOTE(review): half-open range is assumed from the parameter names -- confirm.
    void writeVector(BufferedSerializer& bufferedSerializer,
        ColumnWriterStatistics* writerStatistics, ColumnWriterPageState* writerPageState,
        common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) override;

    // Defined out of line. Parameter named like the base-class declaration for consistency.
    void flushPageState(
        BufferedSerializer& bufferedSerializer, ColumnWriterPageState* writerPageState) override;
};

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit d634e8d

Please sign in to comment.