Skip to content

Commit

Permalink
rework init reader
Browse files (browse the repository at this point in the history)
  • Loading branch information
ray6080 committed Apr 13, 2023
1 parent b001e03 commit 0d1e363
Show file tree
Hide file tree
Showing 16 changed files with 652 additions and 762 deletions.
3 changes: 2 additions & 1 deletion src/include/catalog/catalog_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ struct RelTableSchema : TableSchema {

// Default constructor: delegates to TableSchema with an empty name, common::INVALID_TABLE_ID,
// isNodeTable = false and an empty property list, and sets relMultiplicity to MANY_MANY.
// NOTE(review): this listing is a rendered diff without +/- markers. The one-line member
// initializer below is the removed version; the two lines after it are its replacement, which
// additionally initializes srcTableID and dstTableID to common::INVALID_TABLE_ID.
RelTableSchema()
: TableSchema{"", common::INVALID_TABLE_ID, false /* isNodeTable */, {} /* properties */},
relMultiplicity{MANY_MANY} {}
relMultiplicity{MANY_MANY}, srcTableID{common::INVALID_TABLE_ID},
dstTableID{common::INVALID_TABLE_ID} {}
RelTableSchema(std::string tableName, common::table_id_t tableID,
RelMultiplicity relMultiplicity, std::vector<Property> properties,
common::table_id_t srcTableID, common::table_id_t dstTableID)
Expand Down
68 changes: 33 additions & 35 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
namespace kuzu {
namespace storage {

using lock_t = std::unique_lock<std::mutex>;

using set_element_func_t = std::function<void(NodeInMemColumn* column,
InMemColumnChunk* columnChunk, common::offset_t nodeOffset, const std::string& data)>;

Expand Down Expand Up @@ -39,7 +37,7 @@ class CSVNodeCopyMorsel : public NodeCopyMorsel<arrow::Array> {
public:
CSVNodeCopyMorsel(std::shared_ptr<arrow::RecordBatch> recordBatch, common::offset_t startOffset,
common::block_idx_t blockIdx)
: NodeCopyMorsel{startOffset, blockIdx}, recordBatch{recordBatch} {};
: NodeCopyMorsel{startOffset, blockIdx}, recordBatch{std::move(recordBatch)} {};

const std::vector<std::shared_ptr<arrow::Array>>& getArrowColumns() override {
return recordBatch->columns();
Expand All @@ -54,7 +52,7 @@ class ParquetNodeCopyMorsel : public NodeCopyMorsel<arrow::ChunkedArray> {
public:
ParquetNodeCopyMorsel(std::shared_ptr<arrow::Table> currTable, common::offset_t startOffset,
common::block_idx_t blockIdx)
: NodeCopyMorsel{startOffset, blockIdx}, currTable{currTable} {};
: NodeCopyMorsel{startOffset, blockIdx}, currTable{std::move(currTable)} {};

const std::vector<std::shared_ptr<arrow::ChunkedArray>>& getArrowColumns() override {
return currTable->columns();
Expand All @@ -64,51 +62,51 @@ class ParquetNodeCopyMorsel : public NodeCopyMorsel<arrow::ChunkedArray> {
std::shared_ptr<arrow::Table> currTable;
};

// Abstract base for the state handed to NodeCopier::batchPopulateColumnsTask (which takes a
// NodeCopySharedState pointer). It stores the input file path, a raw pointer to the primary-key
// HashIndexBuilder, the starting offset, a block counter initialized to 0, and a mutex.
// Subclasses supply getMorsel() for a concrete file format.
// NOTE(review): this listing is a rendered diff without +/- markers. Lines that use T1/T2 are
// the removed text; the lines using HASH_INDEX_T/MORSEL_T directly after them are the
// replacements (template parameters renamed, filePath now moved instead of copied).
template<typename T1, typename T2>
template<typename HASH_INDEX_T, typename MORSEL_T>
class NodeCopySharedState {

public:
// pkIndex is stored as a raw pointer; presumably the caller keeps ownership — TODO confirm.
NodeCopySharedState(
std::string filePath, HashIndexBuilder<T1>* pkIndex, common::offset_t startOffset)
: filePath{filePath}, pkIndex{pkIndex}, startOffset{startOffset}, blockIdx{0} {};
std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex, common::offset_t startOffset)
: filePath{std::move(filePath)}, pkIndex{pkIndex}, startOffset{startOffset}, blockIdx{0} {};

// Virtual destructor so derived shared states can be destroyed through a base pointer.
virtual ~NodeCopySharedState() = default;

// Returns the next morsel as a unique_ptr. The end-of-input convention is defined by the
// overrides, whose definitions are not part of this listing.
virtual std::unique_ptr<NodeCopyMorsel<T2>> getMorsel() = 0;
virtual std::unique_ptr<NodeCopyMorsel<MORSEL_T>> getMorsel() = 0;

public:
std::string filePath;
HashIndexBuilder<T1>* pkIndex;
HashIndexBuilder<HASH_INDEX_T>* pkIndex;
common::offset_t startOffset;

protected:
// Index of the next block; starts at 0 (see the constructor).
common::block_idx_t blockIdx;
// Presumably serializes getMorsel() across concurrent tasks — verify against the overrides.
std::mutex mtx;
};

// Shared state for a CSV input: fixes the morsel column type of NodeCopySharedState to
// arrow::Array and additionally holds the Arrow streaming CSV reader.
// NOTE(review): this listing is a rendered diff without +/- markers. Lines that use T (and the
// unqualified move) are the removed text; the HASH_INDEX_T / std::move lines directly after
// them are the replacements.
template<typename T>
class CSVNodeCopySharedState : public NodeCopySharedState<T, arrow::Array> {
template<typename HASH_INDEX_T>
class CSVNodeCopySharedState : public NodeCopySharedState<HASH_INDEX_T, arrow::Array> {

public:
// NOTE(review): filePath is taken by value but passed to the base constructor as an lvalue,
// so it is copied once more there; std::move(filePath) would avoid that copy.
CSVNodeCopySharedState(std::string filePath, HashIndexBuilder<T>* pkIndex,
CSVNodeCopySharedState(std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
common::offset_t startOffset,
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader)
: NodeCopySharedState<T, arrow::Array>{filePath, pkIndex, startOffset},
csvStreamingReader{move(csvStreamingReader)} {};
: NodeCopySharedState<HASH_INDEX_T, arrow::Array>{filePath, pkIndex, startOffset},
csvStreamingReader{std::move(csvStreamingReader)} {};
// Declared here only; the definition is not part of this listing.
std::unique_ptr<NodeCopyMorsel<arrow::Array>> getMorsel() override;

private:
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader;
};

template<typename T>
class ParquetNodeCopySharedState : public NodeCopySharedState<T, arrow::ChunkedArray> {
template<typename HASH_INDEX_T>
class ParquetNodeCopySharedState : public NodeCopySharedState<HASH_INDEX_T, arrow::ChunkedArray> {

public:
ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder<T>* pkIndex,
ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
common::offset_t startOffset, uint64_t numBlocks,
std::unique_ptr<parquet::arrow::FileReader> parquetReader)
: NodeCopySharedState<T, arrow::ChunkedArray>{filePath, pkIndex, startOffset},
: NodeCopySharedState<HASH_INDEX_T, arrow::ChunkedArray>{filePath, pkIndex, startOffset},
numBlocks{numBlocks}, parquetReader{std::move(parquetReader)} {};
std::unique_ptr<NodeCopyMorsel<arrow::ChunkedArray>> getMorsel() override;

Expand All @@ -135,39 +133,39 @@ class NodeCopier : public TableCopier {

void saveToFile() override;

// Static helper declared here only; its definition is not part of this listing.
// Presumably inserts numValues keys read from chunk (with overflowFile and nullMask) into
// pkIndex, starting at startOffset — TODO confirm against the definition.
// NOTE(review): rendered diff without +/- markers. The lines using T are the removed
// declaration; the lines using HASH_INDEX_T are the re-wrapped replacement.
template<typename T>
template<typename HASH_INDEX_T>
static void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, HashIndexBuilder<T>* pkIndex, common::offset_t startOffset,
uint64_t numValues);
common::NullMask* nullMask, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
common::offset_t startOffset, uint64_t numValues);

std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>> columns;

private:
// Column-population entry points, declared here only (definitions are not part of this
// listing). Each declaration appears twice because this is a rendered diff without +/-
// markers: the `template<typename T>` / arrow::Status version is the removed one, and the
// `template<typename HASH_INDEX_T>` / void version after it is the replacement.
// NOTE(review): with the arrow::Status return value gone, failures presumably surface as
// exceptions (TableCopier declares throwCopyExceptionIfNotOK) — confirm in the .cpp.
template<typename T>
arrow::Status populateColumns(processor::ExecutionContext* executionContext);
template<typename HASH_INDEX_T>
void populateColumns(processor::ExecutionContext* executionContext);

// CSV variant; pkIndex is passed as a reference to the owning unique_ptr.
template<typename T>
arrow::Status populateColumnsFromCSV(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
template<typename HASH_INDEX_T>
void populateColumnsFromCSV(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<HASH_INDEX_T>>& pkIndex);

// Parquet variant; same parameters as the CSV one.
template<typename T>
arrow::Status populateColumnsFromParquet(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
template<typename HASH_INDEX_T>
void populateColumnsFromParquet(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<HASH_INDEX_T>>& pkIndex);

// Static helper declared here only; its definition is not part of this listing.
// Presumably writes numLinesInCurBlock values from arrowArray into columnChunk/column
// beginning at startNodeOffset — TODO confirm against the definition.
// NOTE(review): rendered diff without +/- markers. The `T` template line and the
// std::shared_ptr<T> parameter line are the removed text; the MORSEL_T lines replace them.
template<typename T>
template<typename MORSEL_T>
static void putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<T> arrowArray, common::offset_t startNodeOffset,
std::shared_ptr<MORSEL_T> arrowArray, common::offset_t startNodeOffset,
uint64_t numLinesInCurBlock, common::CopyDescription& copyDescription,
PageByteCursor& overflowCursor);

// Concurrent tasks.
// Task entry point that receives the shared state, the copier and the execution context.
// Declared here only; the definition is not part of this listing.
// NOTE(review): rendered diff without +/- markers. The T1/T2 template line and first
// signature line are the removed text; the HASH_INDEX_T/MORSEL_T lines replace them.
template<typename T1, typename T2>
static void batchPopulateColumnsTask(NodeCopySharedState<T1, T2>* sharedState,
template<typename HASH_INDEX_T, typename MORSEL_T>
static void batchPopulateColumnsTask(NodeCopySharedState<HASH_INDEX_T, MORSEL_T>* sharedState,
NodeCopier* copier, processor::ExecutionContext* executionContext);

// Generic primary template: its body only asserts false, so reaching it in a build with
// assertions enabled aborts. Presumably explicit specializations for the supported key types
// are provided elsewhere — TODO confirm.
// NOTE(review): rendered diff without +/- markers. The `T` template line and the
// HashIndexBuilder<T> parameter line are the removed text; the HASH_INDEX_T lines replace them.
template<typename T>
template<typename HASH_INDEX_T>
static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t offset, HashIndexBuilder<T>* pkIndex) {
common::offset_t offset, HashIndexBuilder<HASH_INDEX_T>* pkIndex) {
assert(false);
}

Expand Down
6 changes: 3 additions & 3 deletions src/include/storage/copier/rel_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ class RelCopier : public TableCopier {

void initializePkIndexes(common::table_id_t nodeTableID, BufferManager& bufferManager);

// Populate entry points of RelCopier, declared here only (definitions are not part of this
// listing). Each declaration appears twice because this is a rendered diff without +/-
// markers: the arrow::Status version is the removed one and the void version directly below
// it is the replacement ("3 additions & 3 deletions" for this file).
// NOTE(review): how errors are reported after the return type became void is not visible
// here; presumably they are thrown as exceptions — confirm in the .cpp.
arrow::Status executePopulateTask(PopulateTaskType populateTaskType);
void executePopulateTask(PopulateTaskType populateTaskType);

arrow::Status populateFromCSV(PopulateTaskType populateTaskType);
void populateFromCSV(PopulateTaskType populateTaskType);

arrow::Status populateFromParquet(PopulateTaskType populateTaskType);
void populateFromParquet(PopulateTaskType populateTaskType);

void populateAdjColumnsAndCountRelsInAdjLists();

Expand Down
31 changes: 11 additions & 20 deletions src/include/storage/copier/table_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include <arrow/pretty_print.h>
#include <arrow/result.h>
#include <arrow/scalar.h>
#include <arrow/status.h>
#include <arrow/table.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
Expand Down Expand Up @@ -52,28 +51,15 @@ class TableCopier {

virtual void populateInMemoryStructures(processor::ExecutionContext* executionContext);

inline void updateTableStatistics() {
tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows);
}

// Line-counting and reader-initialization declarations of TableCopier (definitions are not
// part of this listing).
// NOTE(review): this is a rendered diff without +/- markers, so removed and added lines are
// interleaved. Judging from the signatures:
//   - removed: countNumLines(filePath), the arrow::Status-returning countNumLinesCSV /
//     countNumLinesParquet, initCSVReaderAndCheckStatus, initParquetReaderAndCheckStatus, and
//     the initCSVReader / initParquetReader overloads that fill an out-parameter;
//   - added: countNumLines(filePaths), the void countNumLinesCSV / countNumLinesParquet, and
//     initCSVReader / initParquetReader that take only the file path and return the reader.
// Failure handling of the new reader factories is not visible here; presumably they throw
// (throwCopyExceptionIfNotOK is declared in this class) — confirm in the .cpp.
void countNumLines(const std::vector<std::string>& filePath);

arrow::Status countNumLinesCSV(const std::vector<std::string>& filePaths);
void countNumLines(const std::vector<std::string>& filePaths);

arrow::Status countNumLinesParquet(const std::vector<std::string>& filePaths);
void countNumLinesCSV(const std::vector<std::string>& filePaths);

arrow::Status initCSVReaderAndCheckStatus(
std::shared_ptr<arrow::csv::StreamingReader>& csv_streaming_reader,
const std::string& filePath);
void countNumLinesParquet(const std::vector<std::string>& filePaths);

arrow::Status initCSVReader(std::shared_ptr<arrow::csv::StreamingReader>& csv_streaming_reader,
const std::string& filePath);
// Returns the CSV streaming reader for filePath as a shared_ptr; const member function.
std::shared_ptr<arrow::csv::StreamingReader> initCSVReader(const std::string& filePath) const;

arrow::Status initParquetReaderAndCheckStatus(
std::unique_ptr<parquet::arrow::FileReader>& reader, const std::string& filePath);

arrow::Status initParquetReader(
std::unique_ptr<parquet::arrow::FileReader>& reader, const std::string& filePath);
// Returns the Parquet file reader for filePath as a unique_ptr (ownership goes to the caller).
std::unique_ptr<parquet::arrow::FileReader> initParquetReader(const std::string& filePath);

static std::vector<std::pair<int64_t, int64_t>> getListElementPos(
const std::string& l, int64_t from, int64_t to, common::CopyDescription& copyDescription);
Expand All @@ -86,14 +72,19 @@ class TableCopier {

static void throwCopyExceptionIfNotOK(const arrow::Status& status);

// NOTE(review): rendered diff without +/- markers. The first `uint64_t getNumBlocks() const {`
// line is the removed signature; updateTableStatistics() (moved here from higher up in the
// class) and the `inline` getNumBlocks signature are the added lines, followed by the
// unchanged getNumBlocks body.
uint64_t getNumBlocks() const {
// Records numRows as the tuple count of this table (tableSchema->tableID) in tablesStatistics.
inline void updateTableStatistics() {
tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows);
}
// Returns the sum of numBlocks over every entry of fileBlockInfos (0 when it is empty).
inline uint64_t getNumBlocks() const {
uint64_t numBlocks = 0;
for (auto& [_, info] : fileBlockInfos) {
numBlocks += info.numBlocks;
}
return numBlocks;
}

static std::shared_ptr<arrow::DataType> toArrowDataType(const common::DataType& dataType);

protected:
std::shared_ptr<spdlog::logger> logger;
common::CopyDescription& copyDescription;
Expand Down
Loading

0 comments on commit 0d1e363

Please sign in to comment.