Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework arrow array copy in node table copier #1469

Merged
merged 1 commit into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ std::unique_ptr<DataType> DataType::copy() {
}

ExtraTypeInfo* DataType::getExtraTypeInfo() const {
assert(typeID == VAR_LIST || typeID == FIXED_LIST || typeID == STRUCT);
return extraTypeInfo.get();
}

Expand Down
162 changes: 39 additions & 123 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
@@ -1,114 +1,79 @@
#pragma once

#include "storage/in_mem_storage_structure/node_in_mem_column.h"
#include "storage/in_mem_storage_structure/in_mem_node_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"
#include "table_copier.h"

namespace kuzu {
namespace storage {

using set_element_func_t = std::function<void(NodeInMemColumn* column,
InMemColumnChunk* columnChunk, common::offset_t nodeOffset, const std::string& data)>;

template<typename MORSEL_T>
class NodeCopyMorsel {

public:
static constexpr common::block_idx_t INVALID_BLOCK_IDX = -1ull;
static constexpr common::block_idx_t INVALID_BLOCK_IDX = UINT64_MAX;

public:
NodeCopyMorsel(common::offset_t startOffset, common::block_idx_t blockIdx)
: startOffset{startOffset}, blockIdx{blockIdx} {};
NodeCopyMorsel(common::offset_t startOffset, common::block_idx_t blockIdx,
std::shared_ptr<arrow::RecordBatch> recordBatch)
: startOffset{startOffset}, blockIdx{blockIdx}, recordBatch{std::move(recordBatch)} {};

virtual ~NodeCopyMorsel() = default;

virtual const std::vector<std::shared_ptr<MORSEL_T>>& getArrowColumns() = 0;

bool success() { return blockIdx != INVALID_BLOCK_IDX; }
bool success() const { return blockIdx != INVALID_BLOCK_IDX; }

public:
common::offset_t startOffset;
common::block_idx_t blockIdx;
};

class CSVNodeCopyMorsel : public NodeCopyMorsel<arrow::Array> {

public:
CSVNodeCopyMorsel(std::shared_ptr<arrow::RecordBatch> recordBatch, common::offset_t startOffset,
common::block_idx_t blockIdx)
: NodeCopyMorsel{startOffset, blockIdx}, recordBatch{std::move(recordBatch)} {};

const std::vector<std::shared_ptr<arrow::Array>>& getArrowColumns() override {
return recordBatch->columns();
}

private:
std::shared_ptr<arrow::RecordBatch> recordBatch;
};

class ParquetNodeCopyMorsel : public NodeCopyMorsel<arrow::ChunkedArray> {

public:
ParquetNodeCopyMorsel(std::shared_ptr<arrow::Table> currTable, common::offset_t startOffset,
common::block_idx_t blockIdx)
: NodeCopyMorsel{startOffset, blockIdx}, currTable{std::move(currTable)} {};

const std::vector<std::shared_ptr<arrow::ChunkedArray>>& getArrowColumns() override {
return currTable->columns();
}

private:
std::shared_ptr<arrow::Table> currTable;
};

template<typename HASH_INDEX_T, typename MORSEL_T>
template<typename T>
class NodeCopySharedState {

public:
NodeCopySharedState(
std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex, common::offset_t startOffset)
std::string filePath, HashIndexBuilder<T>* pkIndex, common::offset_t startOffset)
: filePath{std::move(filePath)}, pkIndex{pkIndex}, startOffset{startOffset}, blockIdx{0} {};

virtual ~NodeCopySharedState() = default;

virtual std::unique_ptr<NodeCopyMorsel<MORSEL_T>> getMorsel() = 0;
virtual std::unique_ptr<NodeCopyMorsel> getMorsel() = 0;

public:
std::string filePath;
HashIndexBuilder<HASH_INDEX_T>* pkIndex;
HashIndexBuilder<T>* pkIndex;
common::offset_t startOffset;

protected:
common::block_idx_t blockIdx;
std::mutex mtx;
};

template<typename HASH_INDEX_T>
class CSVNodeCopySharedState : public NodeCopySharedState<HASH_INDEX_T, arrow::Array> {
template<typename T>
class CSVNodeCopySharedState : public NodeCopySharedState<T> {

public:
CSVNodeCopySharedState(std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
CSVNodeCopySharedState(std::string filePath, HashIndexBuilder<T>* pkIndex,
common::offset_t startOffset,
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader)
: NodeCopySharedState<HASH_INDEX_T, arrow::Array>{filePath, pkIndex, startOffset},
csvStreamingReader{std::move(csvStreamingReader)} {};
std::unique_ptr<NodeCopyMorsel<arrow::Array>> getMorsel() override;
: NodeCopySharedState<T>{filePath, pkIndex, startOffset}, csvStreamingReader{std::move(
csvStreamingReader)} {};
std::unique_ptr<NodeCopyMorsel> getMorsel() override;

private:
std::shared_ptr<arrow::csv::StreamingReader> csvStreamingReader;
};

template<typename HASH_INDEX_T>
class ParquetNodeCopySharedState : public NodeCopySharedState<HASH_INDEX_T, arrow::ChunkedArray> {
template<typename T>
class ParquetNodeCopySharedState : public NodeCopySharedState<T> {

public:
ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
ParquetNodeCopySharedState(std::string filePath, HashIndexBuilder<T>* pkIndex,
common::offset_t startOffset, uint64_t numBlocks,
std::unique_ptr<parquet::arrow::FileReader> parquetReader)
: NodeCopySharedState<HASH_INDEX_T, arrow::ChunkedArray>{filePath, pkIndex, startOffset},
numBlocks{numBlocks}, parquetReader{std::move(parquetReader)} {};
std::unique_ptr<NodeCopyMorsel<arrow::ChunkedArray>> getMorsel() override;
: NodeCopySharedState<T>{filePath, pkIndex, startOffset}, numBlocks{numBlocks},
parquetReader{std::move(parquetReader)} {};
std::unique_ptr<NodeCopyMorsel> getMorsel() override;

public:
uint64_t numBlocks;
Expand All @@ -133,87 +98,38 @@ class NodeCopier : public TableCopier {

void saveToFile() override;

template<typename HASH_INDEX_T>
template<typename T>
static void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::NullMask* nullMask, HashIndexBuilder<HASH_INDEX_T>* pkIndex,
common::offset_t startOffset, uint64_t numValues);
common::NullMask* nullMask, HashIndexBuilder<T>* pkIndex, common::offset_t startOffset,
uint64_t numValues);

std::unordered_map<common::property_id_t, std::unique_ptr<NodeInMemColumn>> columns;
std::unordered_map<common::property_id_t, std::unique_ptr<InMemNodeColumn>> columns;

private:
template<typename HASH_INDEX_T>
template<typename T>
void populateColumns(processor::ExecutionContext* executionContext);

template<typename HASH_INDEX_T>
template<typename T>
void populateColumnsFromCSV(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<HASH_INDEX_T>>& pkIndex);
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename HASH_INDEX_T>
template<typename T>
void populateColumnsFromParquet(processor::ExecutionContext* executionContext,
std::unique_ptr<HashIndexBuilder<HASH_INDEX_T>>& pkIndex);
std::unique_ptr<HashIndexBuilder<T>>& pkIndex);

template<typename MORSEL_T>
static void putPropsOfLinesIntoColumns(InMemColumnChunk* columnChunk, NodeInMemColumn* column,
std::shared_ptr<MORSEL_T> arrowArray, common::offset_t startNodeOffset,
uint64_t numLinesInCurBlock, common::CopyDescription& copyDescription,
PageByteCursor& overflowCursor);
static void copyArrayIntoColumnChunk(InMemColumnChunk* columnChunk, InMemNodeColumn* column,
arrow::Array& arrowArray, common::offset_t startNodeOffset,
common::CopyDescription& copyDescription, PageByteCursor& overflowCursor);

// Concurrent tasks.
template<typename HASH_INDEX_T, typename MORSEL_T>
static void batchPopulateColumnsTask(NodeCopySharedState<HASH_INDEX_T, MORSEL_T>* sharedState,
NodeCopier* copier, processor::ExecutionContext* executionContext);

template<typename HASH_INDEX_T>
static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t offset, HashIndexBuilder<HASH_INDEX_T>* pkIndex) {
assert(false);
}

static set_element_func_t getSetElementFunc(common::DataTypeID typeID,
common::CopyDescription& copyDescription, PageByteCursor& pageByteCursor);

template<typename T>
inline static void setNumericElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = common::TypeUtils::convertStringToNumber<T>(data.c_str());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}

inline static void setBoolElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = common::TypeUtils::convertToBoolean(data.c_str());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}
static void populateColumnChunksTask(NodeCopySharedState<T>* sharedState, NodeCopier* copier,
processor::ExecutionContext* executionContext, spdlog::logger& logger);

template<typename T>
inline static void setTimeElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data) {
auto val = T::FromCString(data.c_str(), data.length());
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<const uint8_t*>(&val));
}

inline static void setStringElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data, PageByteCursor& overflowCursor) {
auto val = column->getInMemOverflowFile()->copyString(
data.substr(0, common::BufferPoolConstants::PAGE_4KB_SIZE).c_str(), overflowCursor);
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&val));
}

inline static void setVarListElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data,
common::CopyDescription& copyDescription, PageByteCursor& overflowCursor) {
auto varListVal =
getArrowVarList(data, 1, data.length() - 2, column->getDataType(), copyDescription);
auto kuList = column->getInMemOverflowFile()->copyList(*varListVal, overflowCursor);
column->setElementInChunk(columnChunk, nodeOffset, reinterpret_cast<uint8_t*>(&kuList));
}

inline static void setFixedListElement(NodeInMemColumn* column, InMemColumnChunk* columnChunk,
common::offset_t nodeOffset, const std::string& data,
common::CopyDescription& copyDescription) {
auto fixedListVal =
getArrowFixedList(data, 1, data.length() - 2, column->getDataType(), copyDescription);
column->setElementInChunk(columnChunk, nodeOffset, fixedListVal.get());
static void appendPKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t offset, HashIndexBuilder<T>* pkIndex) {
assert(false);
}
};

Expand Down
23 changes: 8 additions & 15 deletions src/include/storage/copier/table_copier.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ class TableCopier {

uint64_t copy(processor::ExecutionContext* executionContext);

static void throwCopyExceptionIfNotOK(const arrow::Status& status);

static std::unique_ptr<common::Value> getArrowVarList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

static std::unique_ptr<uint8_t[]> getArrowFixedList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

protected:
virtual void initializeColumnsAndLists() = 0;

Expand All @@ -64,24 +72,9 @@ class TableCopier {
static std::vector<std::pair<int64_t, int64_t>> getListElementPos(
const std::string& l, int64_t from, int64_t to, common::CopyDescription& copyDescription);

static std::unique_ptr<common::Value> getArrowVarList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

static std::unique_ptr<uint8_t[]> getArrowFixedList(const std::string& l, int64_t from,
int64_t to, const common::DataType& dataType, common::CopyDescription& copyDescription);

static void throwCopyExceptionIfNotOK(const arrow::Status& status);

inline void updateTableStatistics() {
tablesStatistics->setNumTuplesForTable(tableSchema->tableID, numRows);
}
inline uint64_t getNumBlocks() const {
uint64_t numBlocks = 0;
for (auto& [_, info] : fileBlockInfos) {
numBlocks += info.numBlocks;
}
return numBlocks;
}

static std::shared_ptr<arrow::DataType> toArrowDataType(const common::DataType& dataType);

Expand Down
Loading