Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Apr 12, 2023
1 parent a8c1731 commit f2264ff
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 56 deletions.
56 changes: 26 additions & 30 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once

#include <utility>

#include "storage/in_mem_storage_structure/node_in_mem_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"
Expand All @@ -10,8 +8,6 @@
namespace kuzu {
namespace storage {

using lock_t = std::unique_lock<std::mutex>;

// Callable type for a per-value setter used while copying node properties.
// It returns nothing and receives the target column, the in-memory chunk of
// that column, a node offset, and the value as a string.
// NOTE(review): presumably parses `data` and stores it at `nodeOffset` within
// `columnChunk` -- confirm against the setter implementations.
using set_element_func_t = std::function<void(NodeInMemColumn* column,
InMemColumnChunk* columnChunk, common::offset_t nodeOffset, const std::string& data)>;

Expand Down Expand Up @@ -66,51 +62,51 @@ class ParquetNodeCopyMorsel : public NodeCopyMorsel<arrow::ChunkedArray> {
std::shared_ptr<arrow::Table> currTable;
};

template<typename T1, typename T2>
template<typename HASH_INDEX_T, typename MORSEL_T>
class NodeCopySharedState {

public:
NodeCopySharedState(
std::string filePath, HashIndexBuilder<T1>* pkIndex, common::offset_t startOffset)
std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex, common::offset_t startOffset)
: filePath{std::move(filePath)}, pkIndex{pkIndex}, startOffset{startOffset}, blockIdx{0} {};

virtual ~NodeCopySharedState() = default;

virtual std::unique_ptr<NodeCopyMorsel<T2>> getMorsel() = 0;
virtual std::unique_ptr<NodeCopyMorsel<MORSEL_T>> getMorsel() = 0;

public:
std::string filePath;
HashIndexBuilder<T1>* pkIndex;
HashIndexBuilder<HASH_INDEX_T>* pkIndex;
common::offset_t startOffset;

protected:
common::block_idx_t blockIdx;
std::mutex mtx;
};

template<typename T>
class CSVNodeCopySharedState : public NodeCopySharedState<T, arrow::Array> {
template<typename HASH_INDEX_T>
class CSVNodeCopySharedState : public NodeCopySharedState<HASH_INDEX_T, arrow::Array> {

public:
CSVNodeCopySharedState(std::string filePath, HashIndexBuilder<T>* pkIndex,
CSVNodeCopySharedState(std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
common::offset_t startOffset,
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader)
: NodeCopySharedState<T, arrow::Array>{filePath, pkIndex, startOffset},
: NodeCopySharedState<HASH_INDEX_T, arrow::Array>{filePath, pkIndex, startOffset},
csvStreamingReader{std::move(csvStreamingReader)} {};
std::unique_ptr<NodeCopyMorsel<arrow::Array>> getMorsel() override;

private:
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader;
};

template<typename T>
class ParquetNodeCopySharedState : public NodeCopySharedState<T, arrow::ChunkedArray> {
template<typename HASH_INDEX_T>
class ParquetNodeCopySharedState : public NodeCopySharedState<HASH_INDEX_T, arrow::ChunkedArray> {

public:
ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder<T>* pkIndex,
ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
common::offset_t startOffset, uint64_t numBlocks,
std::unique_ptr<parquet::arrow::FileReader> parquetReader)
: NodeCopySharedState<T, arrow::ChunkedArray>{filePath, pkIndex, startOffset},
: NodeCopySharedState<HASH_INDEX_T, arrow::ChunkedArray>{filePath, pkIndex, startOffset},
numBlocks{numBlocks}, parquetReader{std::move(parquetReader)} {};
std::unique_ptr<NodeCopyMorsel<arrow::ChunkedArray>> getMorsel() override;

Expand All @@ -137,39 +133,39 @@ class NodeCopier : public TableCopier {

void saveToFile() override;

template<typename T>
template<typename HASH_INDEX_T>
static void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, HashIndexBuilder<T>* pkIndex, common::offset_t startOffset,
uint64_t numValues);
common::NullMask* nullMask, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
common::offset_t startOffset, uint64_t numValues);

std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>> columns;

private:
template<typename T>
template<typename HASH_INDEX_T>
void populateColumns(processor::ExecutionContext* executionContext);

template<typename T>
template<typename HASH_INDEX_T>
void populateColumnsFromCSV(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
std::unique_ptr<HashIndexBuilder<HASH_INDEX_T>>& pkIndex);

template<typename T>
template<typename HASH_INDEX_T>
void populateColumnsFromParquet(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);
std::unique_ptr<HashIndexBuilder<HASH_INDEX_T>>& pkIndex);

template<typename T>
template<typename MORSEL_T>
static void putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<T> arrowArray, common::offset_t startNodeOffset,
std::shared_ptr<MORSEL_T> arrowArray, common::offset_t startNodeOffset,
uint64_t numLinesInCurBlock, common::CopyDescription& copyDescription,
PageByteCursor& overflowCursor);

// Concurrent tasks.
template<typename T1, typename T2>
static void batchPopulateColumnsTask(NodeCopySharedState<T1, T2>* sharedState,
template<typename HASH_INDEX_T, typename MORSEL_T>
static void batchPopulateColumnsTask(NodeCopySharedState<HASH_INDEX_T, MORSEL_T>* sharedState,
NodeCopier* copier, processor::ExecutionContext* executionContext);

template<typename T>
template<typename HASH_INDEX_T>
static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t offset, HashIndexBuilder<T>* pkIndex) {
common::offset_t offset, HashIndexBuilder<HASH_INDEX_T>* pkIndex) {
assert(false);
}

Expand Down
54 changes: 28 additions & 26 deletions src/storage/copier/node_copier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ using namespace kuzu::common;
namespace kuzu {
namespace storage {

template<typename T>
std::unique_ptr<NodeCopyMorsel<arrow::Array>> CSVNodeCopySharedState<T>::getMorsel() {
template<typename MORSEL_T>
std::unique_ptr<NodeCopyMorsel<arrow::Array>> CSVNodeCopySharedState<MORSEL_T>::getMorsel() {
lock_t lck{this->mtx};
std::shared_ptr<arrow::RecordBatch> recordBatch;
auto result = csvStreamingReader->ReadNext(&recordBatch);
Expand All @@ -30,8 +30,9 @@ std::unique_ptr<NodeCopyMorsel<arrow::Array>> CSVNodeCopySharedState<T>::getMors
std::move(recordBatch), this->startOffset - numRows, this->blockIdx - 1);
}

template<typename T>
std::unique_ptr<NodeCopyMorsel<arrow::ChunkedArray>> ParquetNodeCopySharedState<T>::getMorsel() {
template<typename MORSEL_T>
std::unique_ptr<NodeCopyMorsel<arrow::ChunkedArray>>
ParquetNodeCopySharedState<MORSEL_T>::getMorsel() {
lock_t lck{this->mtx};
std::shared_ptr<arrow::Table> currTable;
if (this->blockIdx == numBlocks) {
Expand Down Expand Up @@ -92,20 +93,20 @@ void NodeCopier::saveToFile() {
logger->debug("Done writing node columns to disk.");
}

template<typename T>
template<typename HASH_INDEX_T>
void NodeCopier::populateColumns(processor::ExecutionContext* executionContext) {
logger->info("Populating properties");
auto pkIndex =
std::make_unique<HashIndexBuilder<T>>(StorageUtils::getNodeIndexFName(this->outputDirectory,
tableSchema->tableID, DBFileType::WAL_VERSION),
reinterpret_cast<NodeTableSchema*>(tableSchema)->getPrimaryKey().dataType);
auto pkIndex = std::make_unique<HashIndexBuilder<HASH_INDEX_T>>(
StorageUtils::getNodeIndexFName(
this->outputDirectory, tableSchema->tableID, DBFileType::WAL_VERSION),
reinterpret_cast<NodeTableSchema*>(tableSchema)->getPrimaryKey().dataType);
pkIndex->bulkReserve(numRows);
switch (copyDescription.fileType) {
case CopyDescription::FileType::CSV:
populateColumnsFromCSV<T>(executionContext, pkIndex);
populateColumnsFromCSV<HASH_INDEX_T>(executionContext, pkIndex);
break;
case CopyDescription::FileType::PARQUET:
populateColumnsFromParquet<T>(executionContext, pkIndex);
populateColumnsFromParquet<HASH_INDEX_T>(executionContext, pkIndex);
break;
default: {
throw CopyException(StringUtils::string_format("Unsupported file type {}.",
Expand All @@ -117,39 +118,40 @@ void NodeCopier::populateColumns(processor::ExecutionContext* executionContext)
logger->info("Done populating properties, constructing the pk index.");
}

template<typename T>
void NodeCopier::populateColumnsFromCSV(
processor::ExecutionContext* executionContext, std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
template<typename HASH_INDEX_T>
void NodeCopier::populateColumnsFromCSV(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<HASH_INDEX_T>>& pkIndex) {
for (auto& filePath : copyDescription.filePaths) {
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader = initCSVReader(filePath);
CSVNodeCopySharedState sharedState{
filePath, pkIndex.get(), fileBlockInfos.at(filePath).startOffset, csvStreamingReader};
taskScheduler.scheduleTaskAndWaitOrError(
CopyTaskFactory::createParallelCopyTask(executionContext->numThreads,
batchPopulateColumnsTask<T, arrow::Array>, &sharedState, this, executionContext),
batchPopulateColumnsTask<HASH_INDEX_T, arrow::Array>, &sharedState, this,
executionContext),
executionContext);
}
}

template<typename T>
void NodeCopier::populateColumnsFromParquet(
processor::ExecutionContext* executionContext, std::unique_ptr<HashIndexBuilder<T>>& pkIndex) {
template<typename HASH_INDEX_T>
void NodeCopier::populateColumnsFromParquet(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<HASH_INDEX_T>>& pkIndex) {
for (auto& filePath : copyDescription.filePaths) {
std::unique_ptr<parquet::arrow::FileReader> parquetReader = initParquetReader(filePath);
ParquetNodeCopySharedState sharedState{filePath, pkIndex.get(),
fileBlockInfos.at(filePath).startOffset, fileBlockInfos.at(filePath).numBlocks,
std::move(parquetReader)};
taskScheduler.scheduleTaskAndWaitOrError(
CopyTaskFactory::createParallelCopyTask(executionContext->numThreads,
batchPopulateColumnsTask<T, arrow::ChunkedArray>, &sharedState, this,
batchPopulateColumnsTask<HASH_INDEX_T, arrow::ChunkedArray>, &sharedState, this,
executionContext),
executionContext);
}
}

template<typename T>
template<typename HASH_INDEX_T>
void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, HashIndexBuilder<T>* pkIndex, offset_t startOffset,
common::NullMask* nullMask, HashIndexBuilder<HASH_INDEX_T>* pkIndex, offset_t startOffset,
uint64_t numValues) {
for (auto i = 0u; i < numValues; i++) {
auto offset = i + startOffset;
Expand All @@ -160,8 +162,8 @@ void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* ove
}
}

template<typename T1, typename T2>
void NodeCopier::batchPopulateColumnsTask(NodeCopySharedState<T1, T2>* sharedState,
template<typename HASH_INDEX_T, typename MORSEL_T>
void NodeCopier::batchPopulateColumnsTask(NodeCopySharedState<HASH_INDEX_T, MORSEL_T>* sharedState,
NodeCopier* copier, processor::ExecutionContext* executionContext) {
while (true) {
if (executionContext->clientContext->isInterrupted()) {
Expand Down Expand Up @@ -198,10 +200,10 @@ void NodeCopier::batchPopulateColumnsTask(NodeCopySharedState<T1, T2>* sharedSta
}
}

template<typename T>
template<typename MORSEL_T>
void NodeCopier::putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<T> arrowArray, common::offset_t startNodeOffset, uint64_t numLinesInCurBlock,
CopyDescription& copyDescription, PageByteCursor& overflowCursor) {
std::shared_ptr<MORSEL_T> arrowArray, common::offset_t startNodeOffset,
uint64_t numLinesInCurBlock, CopyDescription& copyDescription, PageByteCursor& overflowCursor) {
auto setElementFunc =
getSetElementFunc(column->getDataType().typeID, copyDescription, overflowCursor);
for (auto i = 0u; i < numLinesInCurBlock; i++) {
Expand Down

0 comments on commit f2264ff

Please sign in to comment.