Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework rel copy task scheduling and improve copy performance #1621

Merged
merged 1 commit into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.3.5 LANGUAGES CXX)
project(Kuzu VERSION 0.0.4 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ using field_idx_t = uint64_t;
using struct_entry_t = int64_t;
using struct_field_idx_t = uint64_t;
constexpr struct_field_idx_t INVALID_STRUCT_FIELD_IDX = UINT64_MAX;
using tuple_idx_t = uint64_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand Down
1 change: 1 addition & 0 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
namespace kuzu {
namespace processor {

// TODO(Guodong/Ziyi): Move these typedefs to common and unify them with the ones without the `ft_` prefix.
typedef uint64_t ft_tuple_idx_t;
typedef uint32_t ft_col_idx_t;
typedef uint32_t ft_col_offset_t;
Expand Down
204 changes: 62 additions & 142 deletions src/include/storage/copier/node_copier.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "storage/copier/npy_reader.h"
#include "storage/copier/table_copy_executor.h"
#include "storage/copier/table_copy_utils.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/node_table.h"
Expand All @@ -20,133 +20,63 @@
namespace kuzu {
namespace storage {

// One morsel of node-copy work. All fields are public data set once by the constructor.
// Presumably a morsel covers `numNodes` nodes starting at `nodeOffset`, taken from block
// `blockIdx` of the file at `filePath` -- confirm against the code that produces morsels.
class NodeCopyMorsel {
public:
    // Value passed as `blockIdx` when a morsel carries no block index (CSVNodeCopyMorsel does so).
    static constexpr common::block_idx_t BLOCK_IDX_INVALID = UINT64_MAX;

    NodeCopyMorsel(common::offset_t nodeOffset, common::block_idx_t blockIdx, uint64_t numNodes,
        std::string filePath)
        : nodeOffset{nodeOffset}, blockIdx{blockIdx}, numNodes{numNodes},
          filePath{std::move(filePath)} {}

    // Virtual so that derived morsels can be destroyed through a NodeCopyMorsel pointer.
    virtual ~NodeCopyMorsel() = default;

    common::offset_t nodeOffset;
    common::block_idx_t blockIdx;
    uint64_t numNodes;
    std::string filePath;
};

// CSV files are read in streaming mode, one batch at a time, so a CSV morsel carries the record
// batch itself rather than a block index.
class CSVNodeCopyMorsel : public NodeCopyMorsel {
public:
    // The base class's block index and node count are set to BLOCK_IDX_INVALID and UINT64_MAX.
    CSVNodeCopyMorsel(common::offset_t startOffset, std::string filePath,
        std::shared_ptr<arrow::RecordBatch> recordBatch)
        : NodeCopyMorsel{startOffset, BLOCK_IDX_INVALID, UINT64_MAX, std::move(filePath)},
          recordBatch{std::move(recordBatch)} {}

    // The batch of rows this morsel refers to.
    std::shared_ptr<arrow::RecordBatch> recordBatch;
};

// State shared by the node copiers of one copy operation: the input file paths, per-file block
// information, and the file/block/node-offset cursors. getMorsel() returns the next
// NodeCopyMorsel; its definition lives outside this header.
class NodeCopySharedState {
public:
    // Takes ownership of `filePaths` and `fileBlockInfos`; all cursors start at 0.
    // The initializer list follows the member declaration order, which is the order in which the
    // members are actually initialized.
    NodeCopySharedState(std::vector<std::string> filePaths,
        std::unordered_map<std::string, TableCopyExecutor::FileBlockInfo> fileBlockInfos)
        : filePaths{std::move(filePaths)}, fileIdx{0}, nodeOffset{0},
          fileBlockInfos{std::move(fileBlockInfos)}, blockIdx{0} {}
    virtual ~NodeCopySharedState() = default;

    // Overridden by CSVNodeCopySharedState.
    virtual std::unique_ptr<NodeCopyMorsel> getMorsel();

public:
    std::vector<std::string> filePaths;
    common::vector_idx_t fileIdx;
    common::offset_t nodeOffset;

protected:
    // Keyed by a string; presumably the file path -- confirm against the code that fills the map.
    std::unordered_map<std::string, TableCopyExecutor::FileBlockInfo> fileBlockInfos;
    common::block_idx_t blockIdx;
    // NOTE(review): presumably guards the cursors inside getMorsel(); confirm in the .cpp.
    std::mutex mtx;
};

// Shared state for CSV input. CSV files are read in streaming mode, so the streaming reader is
// kept here, next to the reader configuration and the table schema.
class CSVNodeCopySharedState : public NodeCopySharedState {
public:
    CSVNodeCopySharedState(std::vector<std::string> filePaths,
        std::unordered_map<std::string, TableCopyExecutor::FileBlockInfo> fileBlockInfos,
        common::CSVReaderConfig* csvReaderConfig, catalog::TableSchema* tableSchema)
        : NodeCopySharedState{std::move(filePaths), std::move(fileBlockInfos)},
          csvReaderConfig{csvReaderConfig}, tableSchema{tableSchema} {}

    std::unique_ptr<NodeCopyMorsel> getMorsel() override;

private:
    // Raw pointers stored as given; this class declares no destructor that releases them.
    common::CSVReaderConfig* csvReaderConfig;
    catalog::TableSchema* tableSchema;
    // Not set by the constructor, so it starts out null.
    std::shared_ptr<arrow::csv::StreamingReader> reader;
};

class NodeCopier {
public:
NodeCopier(const std::string& directory, std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
catalog::TableSchema* schema, NodeTable* table, common::column_id_t pkColumnID);
NodeCopier(const std::string& directory, std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::TableSchema* schema,
common::tuple_idx_t numTuples, common::column_id_t columnToCopy);

// For clone.
NodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, NodeTable* table, common::property_id_t pkColumnID,
std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: sharedState{std::move(sharedState)}, pkIndex{pkIndex}, copyDesc{copyDesc}, table{table},
pkColumnID{pkColumnID}, columns{std::move(columns)}, properties{std::move(properties)} {}
NodeCopier(std::shared_ptr<CopySharedState> sharedState,
std::shared_ptr<PrimaryKeyIndexBuilder> pkIndex, const common::CopyDescription& copyDesc,
common::property_id_t pkColumnID, std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties, common::column_id_t columnToCopy)
: sharedState{std::move(sharedState)}, pkIndex{std::move(pkIndex)}, copyDesc{copyDesc},
pkColumnID{pkColumnID}, columns{std::move(columns)}, properties{std::move(properties)},
columnToCopy{columnToCopy} {}

virtual ~NodeCopier() = default;

void execute(processor::ExecutionContext* executionContext);

inline virtual void finalize() {
if (pkIndex) {
pkIndex->flush();
}
for (auto& column : columns) {
column->saveToFile();
}
}
virtual void finalize();

virtual std::unique_ptr<NodeCopier> clone() const {
return std::make_unique<NodeCopier>(
sharedState, pkIndex, copyDesc, table, pkColumnID, columns, properties);
sharedState, pkIndex, copyDesc, pkColumnID, columns, properties, columnToCopy);
}

protected:
virtual void executeInternal(std::unique_ptr<NodeCopyMorsel> morsel) {
throw common::CopyException("executeInternal not implemented");
virtual void executeInternal(std::unique_ptr<CopyMorsel> morsel) {
throw common::CopyException("NodeCopier::executeInternal not implemented");
}

void populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overflowFile,
common::offset_t startOffset, uint64_t numValues);

void flushChunksAndPopulatePKIndex(
const std::vector<std::unique_ptr<InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset,
common::column_id_t columnToFlush = common::INVALID_COLUMN_ID);
common::offset_t startNodeOffset, common::offset_t endNodeOffset);

template<typename T, typename... Args>
void appendToPKIndex(
InMemColumnChunk* chunk, common::offset_t startOffset, uint64_t numValues, Args... args) {
throw common::CopyException("appendToPKIndex not implemented");
}

private:
void initializeIndex(
const std::string& directory, catalog::TableSchema* schema, common::tuple_idx_t numTuples);

protected:
std::shared_ptr<NodeCopySharedState> sharedState;
PrimaryKeyIndexBuilder* pkIndex;
std::shared_ptr<CopySharedState> sharedState;
std::shared_ptr<PrimaryKeyIndexBuilder> pkIndex;
common::CopyDescription copyDesc;
NodeTable* table;
// The properties to be copied into. Each property corresponds to a column.
std::vector<catalog::Property> properties;
std::vector<std::shared_ptr<InMemColumn>> columns;
common::column_id_t pkColumnID;
common::column_id_t columnToCopy;
};

template<>
Expand All @@ -159,53 +89,52 @@ void NodeCopier::appendToPKIndex<common::ku_string_t, storage::InMemOverflowFile

class CSVNodeCopier : public NodeCopier {
public:
CSVNodeCopier(const std::string& directory, std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
catalog::TableSchema* schema, NodeTable* table, common::property_id_t pkColumnID)
: NodeCopier{
directory, std::move(sharedState), pkIndex, copyDesc, schema, table, pkColumnID} {}
CSVNodeCopier(const std::string& directory, std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::TableSchema* schema,
common::tuple_idx_t numTuples, common::column_id_t columnToCopy)
: NodeCopier{directory, std::move(sharedState), copyDesc, schema, numTuples, columnToCopy} {
}

// For clone.
CSVNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, NodeTable* table, common::column_id_t pkColumnID,
std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, table, pkColumnID,
std::move(columns), std::move(properties)} {}
CSVNodeCopier(std::shared_ptr<CopySharedState> sharedState,
std::shared_ptr<PrimaryKeyIndexBuilder> pkIndex, const common::CopyDescription& copyDesc,
common::column_id_t pkColumnID, std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties, common::column_id_t columnToCopy)
: NodeCopier{std::move(sharedState), std::move(pkIndex), copyDesc, pkColumnID,
std::move(columns), std::move(properties), columnToCopy} {}

inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<CSVNodeCopier>(
sharedState, pkIndex, copyDesc, table, pkColumnID, columns, properties);
sharedState, pkIndex, copyDesc, pkColumnID, columns, properties, columnToCopy);
}

protected:
void executeInternal(std::unique_ptr<NodeCopyMorsel> morsel) override;
void executeInternal(std::unique_ptr<CopyMorsel> morsel) override;
};

class ParquetNodeCopier : public NodeCopier {
public:
ParquetNodeCopier(const std::string& directory,
std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, catalog::TableSchema* schema, NodeTable* table,
common::column_id_t pkColumnID)
: NodeCopier{
directory, std::move(sharedState), pkIndex, copyDesc, schema, table, pkColumnID} {}
ParquetNodeCopier(const std::string& directory, std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::TableSchema* schema,
common::tuple_idx_t numTuples, common::column_id_t columnToCopy)
: NodeCopier{directory, std::move(sharedState), copyDesc, schema, numTuples, columnToCopy} {
}

// Clone.
ParquetNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc, NodeTable* table,
ParquetNodeCopier(std::shared_ptr<CopySharedState> sharedState,
std::shared_ptr<PrimaryKeyIndexBuilder> pkIndex, const common::CopyDescription& copyDesc,
common::column_id_t pkColumnID, std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, table, pkColumnID,
std::move(columns), std::move(properties)} {}
std::vector<catalog::Property> properties, common::column_id_t columnToCopy)
: NodeCopier{std::move(sharedState), std::move(pkIndex), copyDesc, pkColumnID,
std::move(columns), std::move(properties), columnToCopy} {}

inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<ParquetNodeCopier>(
sharedState, pkIndex, copyDesc, table, pkColumnID, columns, properties);
sharedState, pkIndex, copyDesc, pkColumnID, columns, properties, columnToCopy);
}

protected:
void executeInternal(std::unique_ptr<NodeCopyMorsel> morsel) override;
void executeInternal(std::unique_ptr<CopyMorsel> morsel) override;

private:
std::unique_ptr<parquet::arrow::FileReader> reader;
Expand All @@ -214,33 +143,29 @@ class ParquetNodeCopier : public NodeCopier {

class NPYNodeCopier : public NodeCopier {
public:
NPYNodeCopier(const std::string& directory, std::shared_ptr<NodeCopySharedState> sharedState,
PrimaryKeyIndexBuilder* pkIndex, const common::CopyDescription& copyDesc,
catalog::TableSchema* schema, NodeTable* table, common::column_id_t pkColumnID,
common::column_id_t columnToCopy)
: NodeCopier{directory, std::move(sharedState), pkIndex, copyDesc, schema, table,
pkColumnID},
columnToCopy{columnToCopy} {}
NPYNodeCopier(const std::string& directory, std::shared_ptr<CopySharedState> sharedState,
const common::CopyDescription& copyDesc, catalog::TableSchema* schema,
common::tuple_idx_t numTuples, common::column_id_t columnToCopy)
: NodeCopier{directory, std::move(sharedState), copyDesc, schema, numTuples, columnToCopy} {
}

// Clone.
NPYNodeCopier(std::shared_ptr<NodeCopySharedState> sharedState, PrimaryKeyIndexBuilder* pkIndex,
const common::CopyDescription& copyDesc, NodeTable* table, common::column_id_t pkColumnID,
common::column_id_t columnToCopy, std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties)
: NodeCopier{std::move(sharedState), pkIndex, copyDesc, table, pkColumnID,
std::move(columns), std::move(properties)},
columnToCopy{columnToCopy} {}
NPYNodeCopier(std::shared_ptr<CopySharedState> sharedState,
std::shared_ptr<PrimaryKeyIndexBuilder> pkIndex, const common::CopyDescription& copyDesc,
common::column_id_t pkColumnID, std::vector<std::shared_ptr<InMemColumn>> columns,
std::vector<catalog::Property> properties, common::column_id_t columnToCopy)
: NodeCopier{std::move(sharedState), std::move(pkIndex), copyDesc, pkColumnID,
std::move(columns), std::move(properties), columnToCopy} {}

inline std::unique_ptr<NodeCopier> clone() const override {
return std::make_unique<NPYNodeCopier>(
sharedState, pkIndex, copyDesc, table, pkColumnID, columnToCopy, columns, properties);
sharedState, pkIndex, copyDesc, pkColumnID, columns, properties, columnToCopy);
}

protected:
void executeInternal(std::unique_ptr<NodeCopyMorsel> morsel) override;
void executeInternal(std::unique_ptr<CopyMorsel> morsel) override;

private:
common::column_id_t columnToCopy;
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
std::unique_ptr<NpyReader> reader;
};

Expand All @@ -253,12 +178,7 @@ class NodeCopyTask : public common::Task {
: Task{executionContext->numThreads}, nodeCopier{std::move(nodeCopier)},
executionContext{executionContext} {};

inline void run() override {
mtx.lock();
auto clonedNodeCopier = nodeCopier->clone();
mtx.unlock();
clonedNodeCopier->execute(executionContext);
}
void run() override;
inline void finalizeIfNecessary() override { nodeCopier->finalize(); }

private:
Expand Down
30 changes: 16 additions & 14 deletions src/include/storage/copier/node_copy_executor.h
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@
#pragma once

#include "storage/copier/node_copier.h"
#include "storage/copier/table_copy_utils.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/store/node_table.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"

namespace kuzu {
namespace storage {

class NodeCopyExecutor : public TableCopyExecutor {
class NodeCopyExecutor {

public:
NodeCopyExecutor(common::CopyDescription& copyDescription, std::string outputDirectory,
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog, storage::NodeTable* table,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: TableCopyExecutor{copyDescription, std::move(outputDirectory), taskScheduler, catalog,
table->getTableID(), nodesStatisticsAndDeletedIDs},
table{table} {}
: copyDescription{copyDescription}, outputDirectory{std::move(outputDirectory)},
taskScheduler{taskScheduler}, catalog{catalog},
tableSchema{catalog.getReadOnlyVersion()->getTableSchema(table->getTableID())},
table{table}, nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs}, numRows{0} {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the terms "row" and "tuple" consistently instead of mixing them. I would prefer "tuple".


protected:
void initializeColumnsAndLists() override {
// DO NOTHING
}

void populateColumnsAndLists(processor::ExecutionContext* executionContext) override;

void saveToFile() override {
// DO NOTHING
}
public:
common::offset_t copy(processor::ExecutionContext* executionContext);

private:
void populateColumns(processor::ExecutionContext* executionContext);

private:
common::CopyDescription& copyDescription;
std::string outputDirectory;
common::TaskScheduler& taskScheduler;
catalog::Catalog& catalog;
catalog::TableSchema* tableSchema;
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
storage::NodeTable* table;
std::unordered_map<std::string, FileBlockInfo> fileBlockInfos;
uint64_t numRows;
};

} // namespace storage
Expand Down
Loading