Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet writer #2177

Merged
merged 1 commit into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,21 @@ struct OrderByConstants {
static constexpr uint64_t MIN_LIMIT_RATIO_TO_REDUCE = 2;
};

struct ParquetConstants {
static constexpr uint64_t PARQUET_DEFINE_VALID = 65535;
static constexpr const char* PARQUET_MAGIC_WORDS = "PAR1";
// We limit the uncompressed page size to 100MB.
// The max size in Parquet is 2GB, but we choose a more conservative limit.
static constexpr uint64_t MAX_UNCOMPRESSED_PAGE_SIZE = 100000000;
// Dictionary pages must be below 2GB. Unlike data pages, there's only one dictionary page.
// For this reason we go with a much higher, but still a conservative upper bound of 1GB.
static constexpr uint64_t MAX_UNCOMPRESSED_DICT_PAGE_SIZE = 1e9;
// The maximum size a key entry in an RLE page takes.
static constexpr uint64_t MAX_DICTIONARY_KEY_SIZE = sizeof(uint32_t);
// The size of encoding the string length.
static constexpr uint64_t STRING_LENGTH_SIZE = sizeof(uint32_t);
static constexpr uint64_t MAX_STRING_STATISTICS_SIZE = 10000;
};

} // namespace common
} // namespace kuzu
17 changes: 5 additions & 12 deletions src/include/processor/operator/persistent/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "common/task_system/task_scheduler.h"
#include "processor/operator/persistent/csv_file_writer.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/operator/persistent/parquet_file_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/operator/sink.h"
#include "processor/result/result_set.h"
Expand All @@ -15,16 +14,11 @@ namespace processor {
class CopyToSharedState {
public:
CopyToSharedState(common::FileType fileType, std::string& filePath,
std::vector<std::string>& columnNames, std::vector<common::LogicalType>& columnTypes) {
if (fileType == common::FileType::CSV) {
fileWriter =
std::make_unique<processor::CSVFileWriter>(filePath, columnNames, columnTypes);
} else if (fileType == common::FileType::PARQUET) {
fileWriter =
std::make_unique<processor::ParquetFileWriter>(filePath, columnNames, columnTypes);
} else {
throw common::NotImplementedException("CopyToSharedState::CopyToSharedState");
}
std::vector<std::string>& columnNames,
std::vector<std::unique_ptr<common::LogicalType>> columnTypes) {
assert(fileType == common::FileType::CSV);
fileWriter = std::make_unique<processor::CSVFileWriter>(
filePath, columnNames, std::move(columnTypes));
}
inline std::unique_ptr<FileWriter>& getWriter() { return fileWriter; }

Expand Down Expand Up @@ -53,7 +47,6 @@ class CopyTo : public Sink {
}

protected:
std::string getOutputMsg();
std::vector<DataPos> vectorsToCopyPos;

private:
Expand Down
84 changes: 84 additions & 0 deletions src/include/processor/operator/persistent/copy_to_parquet.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#pragma once

#include "parquet/parquet_types.h"
#include "processor/operator/persistent/writer/parquet/parquet_writer.h"
#include "processor/operator/sink.h"
#include "processor/result/factorized_table.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class CopyToParquetSharedState {
public:
std::unique_ptr<ParquetWriter> writer;
};

struct CopyToParquetInfo {
kuzu_parquet::format::CompressionCodec::type codec =
kuzu_parquet::format::CompressionCodec::SNAPPY;
std::unique_ptr<FactorizedTableSchema> tableSchema;
std::vector<std::unique_ptr<common::LogicalType>> types;
std::vector<std::string> names;
std::vector<DataPos> dataPoses;
std::string fileName;

CopyToParquetInfo(std::unique_ptr<FactorizedTableSchema> tableSchema,
std::vector<std::unique_ptr<common::LogicalType>> types, std::vector<std::string> names,
std::vector<DataPos> dataPoses, std::string fileName)
: tableSchema{std::move(tableSchema)}, types{std::move(types)}, names{std::move(names)},
dataPoses{std::move(dataPoses)}, fileName{std::move(fileName)} {}

std::vector<std::unique_ptr<common::LogicalType>> copyTypes();

std::unique_ptr<CopyToParquetInfo> copy() {
return std::make_unique<CopyToParquetInfo>(
tableSchema->copy(), copyTypes(), names, dataPoses, fileName);
}
};

struct CopyToParquetLocalState {
std::unique_ptr<FactorizedTable> ft;
std::vector<common::ValueVector*> vectorsToAppend;
storage::MemoryManager* mm;

void init(CopyToParquetInfo* info, storage::MemoryManager* mm, ResultSet* resultSet);

inline void append() { ft->append(vectorsToAppend); }
};

class CopyToParquet : public Sink {
public:
CopyToParquet(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<CopyToParquetInfo> info,
std::shared_ptr<CopyToParquetSharedState> sharedState,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_TO_PARQUET,
std::move(child), id, paramsString},
info{std::move(info)}, localState{std::make_unique<CopyToParquetLocalState>()},
sharedState{std::move(sharedState)} {}

inline void finalize(ExecutionContext* executionContext) final {
sharedState->writer->finalize();
}
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;
void executeInternal(ExecutionContext* context) final;
std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<CopyToParquet>(resultSetDescriptor->copy(), info->copy(),
sharedState, children[0]->clone(), id, paramsString);
}

private:
void initGlobalStateInternal(ExecutionContext* context) final {
sharedState->writer = std::make_unique<ParquetWriter>(
info->fileName, info->copyTypes(), info->names, info->codec, context->memoryManager);
}

private:
std::shared_ptr<CopyToParquetSharedState> sharedState;
std::unique_ptr<CopyToParquetLocalState> localState;
std::unique_ptr<CopyToParquetInfo> info;
};

} // namespace processor
} // namespace kuzu
9 changes: 5 additions & 4 deletions src/include/processor/operator/persistent/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ namespace processor {
class FileWriter {
public:
FileWriter(std::string filePath, std::vector<std::string> columnNames,
std::vector<common::LogicalType> columnTypes)
: filePath{filePath}, columnNames{columnNames}, columnTypes{columnTypes} {}
virtual ~FileWriter(){};
std::vector<std::unique_ptr<common::LogicalType>> columnTypes)
: filePath{std::move(filePath)}, columnNames{std::move(columnNames)}, columnTypes{std::move(
columnTypes)} {}
virtual ~FileWriter() = default;
virtual void init() = 0;
virtual void openFile() = 0;
virtual void closeFile() = 0;
Expand All @@ -20,7 +21,7 @@ class FileWriter {
protected:
std::string filePath;
std::vector<std::string> columnNames;
std::vector<common::LogicalType> columnTypes;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
};

} // namespace processor
Expand Down
58 changes: 0 additions & 58 deletions src/include/processor/operator/persistent/parquet_file_writer.h

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <list>

#include "common/file_utils.h"
acquamarin marked this conversation as resolved.
Show resolved Hide resolved
#include "thrift/protocol/TCompactProtocol.h"
#include "thrift/transport/TBufferTransports.h"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#pragma once

#include "parquet/parquet_types.h"
#include "processor/operator/persistent/writer/parquet/column_writer.h"

namespace kuzu {
namespace processor {

class BasicColumnWriterState : public ColumnWriterState {
public:
BasicColumnWriterState(kuzu_parquet::format::RowGroup& rowGroup, uint64_t colIdx)
: rowGroup{rowGroup}, colIdx{colIdx} {
pageInfo.emplace_back();
}

kuzu_parquet::format::RowGroup& rowGroup;
uint64_t colIdx;
std::vector<PageInformation> pageInfo;
std::vector<PageWriteInformation> writeInfo;
std::unique_ptr<ColumnWriterStatistics> statsState;
uint64_t currentPage = 0;
};

class BasicColumnWriter : public ColumnWriter {
public:
BasicColumnWriter(ParquetWriter& writer, uint64_t schemaIdx,
std::vector<std::string> schemaPath, uint64_t maxRepeat, uint64_t maxDefine,
bool canHaveNulls)
: ColumnWriter(
writer, schemaIdx, std::move(schemaPath), maxRepeat, maxDefine, canHaveNulls) {}

public:
std::unique_ptr<ColumnWriterState> initializeWriteState(
kuzu_parquet::format::RowGroup& rowGroup) override;
void prepare(ColumnWriterState& state, ColumnWriterState* parent, common::ValueVector* vector,
uint64_t count) override;
void beginWrite(ColumnWriterState& state) override;
void write(ColumnWriterState& state, common::ValueVector* vector, uint64_t count) override;
void finalizeWrite(ColumnWriterState& state) override;

protected:
void writeLevels(BufferedSerializer& bufferedSerializer, const std::vector<uint16_t>& levels,
uint64_t maxValue, uint64_t startOffset, uint64_t count);

virtual kuzu_parquet::format::Encoding::type getEncoding(BasicColumnWriterState& state) {
return kuzu_parquet::format::Encoding::PLAIN;
}

void nextPage(BasicColumnWriterState& state);
void flushPage(BasicColumnWriterState& state);

// Initializes the state used to track statistics during writing. Only used for scalar types.
virtual std::unique_ptr<ColumnWriterStatistics> initializeStatsState() {
return std::make_unique<ColumnWriterStatistics>();

Check warning on line 54 in src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h#L53-L54

Added lines #L53 - L54 were not covered by tests
}

// Initialize the writer for a specific page. Only used for scalar types.
virtual std::unique_ptr<ColumnWriterPageState> initializePageState(
BasicColumnWriterState& state) {
return nullptr;
}

// Flushes the writer for a specific page. Only used for scalar types.
virtual void flushPageState(
BufferedSerializer& bufferedSerializer, ColumnWriterPageState* state) {}

// Retrieves the row size of a vector at the specified location. Only used for scalar types.
virtual uint64_t getRowSize(

Check warning on line 68 in src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h#L68

Added line #L68 was not covered by tests
common::ValueVector* vector, uint64_t index, BasicColumnWriterState& state) {
throw common::NotImplementedException{"BasicColumnWriter::getRowSize"};

Check warning on line 70 in src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h#L70

Added line #L70 was not covered by tests
}
// Writes a (subset of a) vector to the specified serializer. Only used for scalar types.
virtual void writeVector(BufferedSerializer& bufferedSerializer, ColumnWriterStatistics* stats,
ColumnWriterPageState* pageState, common::ValueVector* vector, uint64_t chunkStart,
uint64_t chunkEnd) = 0;

virtual bool hasDictionary(BasicColumnWriterState& writerState) { return false; }
// The number of elements in the dictionary.
virtual uint64_t dictionarySize(BasicColumnWriterState& writerState) {
throw common::NotImplementedException{"BasicColumnWriter::dictionarySize"};

Check warning on line 80 in src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h#L79-L80

Added lines #L79 - L80 were not covered by tests
}
void writeDictionary(BasicColumnWriterState& state,
std::unique_ptr<BufferedSerializer> bufferedSerializer, uint64_t rowCount);
virtual void flushDictionary(BasicColumnWriterState& state, ColumnWriterStatistics* stats) {
throw common::NotImplementedException{"BasicColumnWriter::flushDictionary"};

Check warning on line 85 in src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h#L84-L85

Added lines #L84 - L85 were not covered by tests
}

void setParquetStatistics(
BasicColumnWriterState& state, kuzu_parquet::format::ColumnChunk& column);
void registerToRowGroup(kuzu_parquet::format::RowGroup& rowGroup);
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include "processor/operator/persistent/writer/parquet/basic_column_writer.h"

namespace kuzu {
namespace processor {

class BooleanStatisticsState : public ColumnWriterStatistics {
public:
BooleanStatisticsState() : min{true}, max{false} {}

bool min;
bool max;

public:
bool hasStats() { return !(min && !max); }

Check warning on line 16 in src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h#L16

Added line #L16 was not covered by tests

std::string getMin() override { return getMinValue(); }
std::string getMax() override { return getMaxValue(); }
std::string getMinValue() override {
return hasStats() ? std::string(reinterpret_cast<const char*>(&min), sizeof(bool)) :
std::string();
}
std::string getMaxValue() override {
return hasStats() ? std::string(reinterpret_cast<const char*>(&max), sizeof(bool)) :
std::string();
}
};

class BooleanWriterPageState : public ColumnWriterPageState {
public:
uint8_t byte = 0;
uint8_t bytePos = 0;
};

class BooleanColumnWriter : public BasicColumnWriter {
public:
BooleanColumnWriter(ParquetWriter& writer, uint64_t schemaIdx,
std::vector<std::string> schemaPath, uint64_t maxRepeat, uint64_t maxDefine,
bool canHaveNulls)
: BasicColumnWriter(
writer, schemaIdx, std::move(schemaPath), maxRepeat, maxDefine, canHaveNulls) {}

inline std::unique_ptr<ColumnWriterStatistics> initializeStatsState() override {
return std::make_unique<BooleanStatisticsState>();
}

inline uint64_t getRowSize(
common::ValueVector* vector, uint64_t index, BasicColumnWriterState& state) override {
return sizeof(bool);
}

inline std::unique_ptr<ColumnWriterPageState> initializePageState(
BasicColumnWriterState& state) override {
return std::make_unique<BooleanWriterPageState>();
}

void writeVector(BufferedSerializer& bufferedSerializer,
ColumnWriterStatistics* writerStatistics, ColumnWriterPageState* writerPageState,
common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) override;

void flushPageState(
BufferedSerializer& temp_writer, ColumnWriterPageState* writerPageState) override;
};

} // namespace processor
} // namespace kuzu
Loading
Loading