Skip to content

Commit

Permalink
Merge pull request #1469 from kuzudb/copy-rework
Browse files Browse the repository at this point in the history
Rework arrow array copy in node table copier
  • Loading branch information
ray6080 committed Apr 17, 2023
2 parents b45d97a + aca7696 commit 6fc38d0
Show file tree
Hide file tree
Showing 15 changed files with 565 additions and 413 deletions.
1 change: 0 additions & 1 deletion src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ std::unique_ptr<DataType> DataType::copy() {
}

ExtraTypeInfo* DataType::getExtraTypeInfo() const {
assert(typeID == VAR_LIST || typeID == FIXED_LIST || typeID == STRUCT);
return extraTypeInfo.get();
}

Expand Down
162 changes: 39 additions & 123 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
@@ -1,114 +1,79 @@
#pragma once

#include "storage/in_mem_storage_structure/node_in_mem_column.h"
#include "storage/in_mem_storage_structure/in_mem_node_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"
#include "table_copier.h"

namespace kuzu {
namespace storage {

using set_element_func_t = std::function<void(NodeInMemColumn* column,
InMemColumnChunk* columnChunk, common::offset_t nodeOffset, const std::string& data)>;

template<typename MORSEL_T>
class NodeCopyMorsel {

public:
static constexpr common::block_idx_t INVALID_BLOCK_IDX = -1ull;
static constexpr common::block_idx_t INVALID_BLOCK_IDX = UINT64_MAX;

public:
NodeCopyMorsel(common::offset_t startOffset, common::block_idx_t blockIdx)
: startOffset{startOffset}, blockIdx{blockIdx} {};
NodeCopyMorsel(common::offset_t startOffset, common::block_idx_t blockIdx,
std::shared_ptr<arrow::RecordBatch> recordBatch)
: startOffset{startOffset}, blockIdx{blockIdx}, recordBatch{std::move(recordBatch)} {};

virtual ~NodeCopyMorsel() = default;

virtual const std::vector<std::shared_ptr<MORSEL_T>>& getArrowColumns() = 0;

bool success() { return blockIdx != INVALID_BLOCK_IDX; }
bool success() const { return blockIdx != INVALID_BLOCK_IDX; }

public:
common::offset_t startOffset;
common::block_idx_t blockIdx;
};

class CSVNodeCopyMorsel : public NodeCopyMorsel<arrow::Array> {

public:
CSVNodeCopyMorsel(std::shared_ptr<arrow::RecordBatch> recordBatch, common::offset_t startOffset,
common::block_idx_t blockIdx)
: NodeCopyMorsel{startOffset, blockIdx}, recordBatch{std::move(recordBatch)} {};

const std::vector<std::shared_ptr<arrow::Array>>& getArrowColumns() override {
return recordBatch->columns();
}

private:
std::shared_ptr<arrow::RecordBatch> recordBatch;
};

class ParquetNodeCopyMorsel : public NodeCopyMorsel<arrow::ChunkedArray> {

public:
ParquetNodeCopyMorsel(std::shared_ptr<arrow::Table> currTable, common::offset_t startOffset,
common::block_idx_t blockIdx)
: NodeCopyMorsel{startOffset, blockIdx}, currTable{std::move(currTable)} {};

const std::vector<std::shared_ptr<arrow::ChunkedArray>>& getArrowColumns() override {
return currTable->columns();
}

private:
std::shared_ptr<arrow::Table> currTable;
};

template<typename HASH_INDEX_T, typename MORSEL_T>
template<typename T>
class NodeCopySharedState {

public:
NodeCopySharedState(
std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex, common::offset_t startOffset)
std::string filePath, HashIndexBuilder<T>* pkIndex, common::offset_t startOffset)
: filePath{std::move(filePath)}, pkIndex{pkIndex}, startOffset{startOffset}, blockIdx{0} {};

virtual ~NodeCopySharedState() = default;

virtual std::unique_ptr<NodeCopyMorsel<MORSEL_T>> getMorsel() = 0;
virtual std::unique_ptr<NodeCopyMorsel> getMorsel() = 0;

public:
std::string filePath;
HashIndexBuilder<HASH_INDEX_T>* pkIndex;
HashIndexBuilder<T>* pkIndex;
common::offset_t startOffset;

protected:
common::block_idx_t blockIdx;
std::mutex mtx;
};

template<typename HASH_INDEX_T>
class CSVNodeCopySharedState : public NodeCopySharedState<HASH_INDEX_T, arrow::Array> {
template<typename T>
class CSVNodeCopySharedState : public NodeCopySharedState<T> {

public:
CSVNodeCopySharedState(std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
CSVNodeCopySharedState(std::string filePath, HashIndexBuilder<T>* pkIndex,
common::offset_t startOffset,
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader)
: NodeCopySharedState<HASH_INDEX_T, arrow::Array>{filePath, pkIndex, startOffset},
csvStreamingReader{std::move(csvStreamingReader)} {};
std::unique_ptr<NodeCopyMorsel<arrow::Array>> getMorsel() override;
: NodeCopySharedState<T>{filePath, pkIndex, startOffset}, csvStreamingReader{std::move(
csvStreamingReader)} {};
std::unique_ptr<NodeCopyMorsel> getMorsel() override;

private:
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader;
};

template<typename HASH_INDEX_T>
class ParquetNodeCopySharedState : public NodeCopySharedState<HASH_INDEX_T, arrow::ChunkedArray> {
template<typename T>
class ParquetNodeCopySharedState : public NodeCopySharedState<T> {

public:
ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder<T>* pkIndex,
common::offset_t startOffset, uint64_t numBlocks,
std::unique_ptr<parquet::arrow::FileReader> parquetReader)
: NodeCopySharedState<HASH_INDEX_T, arrow::ChunkedArray>{filePath, pkIndex, startOffset},
numBlocks{numBlocks}, parquetReader{std::move(parquetReader)} {};
std::unique_ptr<NodeCopyMorsel<arrow::ChunkedArray>> getMorsel() override;
: NodeCopySharedState<T>{filePath, pkIndex, startOffset}, numBlocks{numBlocks},
parquetReader{std::move(parquetReader)} {};
std::unique_ptr<NodeCopyMorsel> getMorsel() override;

public:
uint64_t numBlocks;
Expand All @@ -133,87 +98,38 @@ class NodeCopier : public TableCopier {

void saveToFile() override;

template<typename HASH_INDEX_T>
template<typename T>
static void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
common::offset_t startOffset, uint64_t numValues);
common::NullMask* nullMask, HashIndexBuilder<T>* pkIndex, common::offset_t startOffset,
uint64_t numValues);

std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>> columns;
std::unordered_map<common::property_id_t, std::unique_ptr<InMemNodeColumn>> columns;

private:
template<typename HASH_INDEX_T>
template<typename T>
void populateColumns(processor::ExecutionContext* executionContext);

template<typename HASH_INDEX_T>
template<typename T>
void populateColumnsFromCSV(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<HASH_INDEX_T>>& pkIndex);
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename HASH_INDEX_T>
template<typename T>
void populateColumnsFromParquet(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<HASH_INDEX_T>>& pkIndex);
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename MORSEL_T>
static void putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<MORSEL_T> arrowArray, common::offset_t startNodeOffset,
uint64_t numLinesInCurBlock, common::CopyDescription& copyDescription,
PageByteCursor& overflowCursor);
static void copyArrayIntoColumnChunk(InMemColumnChunk* columnChunk, InMemNodeColumn* column,
arrow::Array& arrowArray, common::offset_t startNodeOffset,
common::CopyDescription& copyDescription, PageByteCursor& overflowCursor);

// Concurrent tasks.
template<typename HASH_INDEX_T, typename MORSEL_T>
static void batchPopulateColumnsTask(NodeCopySharedState<HASH_INDEX_T, MORSEL_T>* sharedState,
NodeCopier* copier, processor::ExecutionContext* executionContext);

template<typename HASH_INDEX_T>
static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t offset, HashIndexBuilder<HASH_INDEX_T>* pkIndex) {
assert(false);
}

static set_element_func_t getSetElementFunc(common::DataTypeID typeID,
common::CopyDescription& copyDescription, PageByteCursor& pageByteCursor);

template<typename T>
inline static void setNumericElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = common::TypeUtils::convertStringToNumber<T>(data.c_str());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}

inline static void setBoolElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = common::TypeUtils::convertToBoolean(data.c_str());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}
static void populateColumnChunksTask(NodeCopySharedState<T>* sharedState, NodeCopier* copier,
processor::ExecutionContext* executionContext, spdlog::logger& logger);

template<typename T>
inline static void setTimeElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = T::FromCString(data.c_str(), data.length());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}

inline static void setStringElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data, PageByteCursor& overflowCursor) {
auto val = column->getInMemOverflowFile()->copyString(
data.substr(0, common::BufferPoolConstants::PAGE_4KB_SIZE).c_str(), overflowCursor);
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
}

inline static void setVarListElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data,
common::CopyDescription& copyDescription, PageByteCursor& overflowCursor) {
auto varListVal =
getArrowVarList(data, 1, data.length() - 2, column->getDataType(), copyDescription);
auto kuList = column->getInMemOverflowFile()->copyList(*varListVal, overflowCursor);
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&kuList));
}

inline static void setFixedListElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data,
common::CopyDescription& copyDescription) {
auto fixedListVal =
getArrowFixedList(data, 1, data.length() - 2, column->getDataType(), copyDescription);
column->setElementInChunk(columnChunk, nodeOffset, fixedListVal.get());
static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t offset, HashIndexBuilder<T>* pkIndex) {
assert(false);
}
};

Expand Down
23 changes: 8 additions & 15 deletions src/include/storage/copier/table_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ class TableCopier {

uint64_t copy(processor::ExecutionContext* executionContext);

static void throwCopyExceptionIfNotOK(const arrow::Status& status);

static std::unique_ptr<common::Value> getArrowVarList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

static std::unique_ptr<uint8_t[]> getArrowFixedList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

protected:
virtual void initializeColumnsAndLists() = 0;

Expand All @@ -64,24 +72,9 @@ class TableCopier {
static std::vector<std::pair<int64_t, int64_t>> getListElementPos(
const std::string& l, int64_t from, int64_t to, common::CopyDescription& copyDescription);

static std::unique_ptr<common::Value> getArrowVarList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

static std::unique_ptr<uint8_t[]> getArrowFixedList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

static void throwCopyExceptionIfNotOK(const arrow::Status& status);

inline void updateTableStatistics() {
tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows);
}
inline uint64_t getNumBlocks() const {
uint64_t numBlocks = 0;
for (auto& [_, info] : fileBlockInfos) {
numBlocks += info.numBlocks;
}
return numBlocks;
}

static std::shared_ptr<arrow::DataType> toArrowDataType(const common::DataType& dataType);

Expand Down
Loading

0 comments on commit 6fc38d0

Please sign in to comment.