Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove blocks counting in copying rel pipeline #2013

Merged
merged 1 commit into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace kuzu {
namespace common {

constexpr char KUZU_VERSION[] = "v0.4.0";
constexpr char KUZU_VERSION[] = "v0.0.8.5";

constexpr uint64_t DEFAULT_VECTOR_CAPACITY_LOG_2 = 11;
constexpr uint64_t DEFAULT_VECTOR_CAPACITY = (uint64_t)1 << DEFAULT_VECTOR_CAPACITY_LOG_2;
Expand Down Expand Up @@ -118,9 +118,6 @@ struct CopyConstants {
// Size (in bytes) of the chunks to be read in Node/Rel Copier
static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23;

// Number of rows per block for npy files
static constexpr uint64_t NUM_ROWS_PER_BLOCK_FOR_NPY = 2048;

// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
Expand Down
5 changes: 4 additions & 1 deletion src/include/common/data_chunk/sel_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ class SelectionVector {

public:
sel_t* selectedPositions;
sel_t selectedSize;
// TODO: type of `selectedSize` was changed from `sel_t` to `uint64_t`, which should be reverted
// when we removed arrow array in ValueVector. Currently, we need to keep size of arrow array,
// which could be larger than MAX of `sel_t`.
uint64_t selectedSize;

private:
std::unique_ptr<sel_t[]> selectedPositionsBuffer;
Expand Down
3 changes: 0 additions & 3 deletions src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class CopyNodeSharedState {

struct CopyNodeInfo {
std::vector<DataPos> dataColumnPoses;
DataPos nodeOffsetPos;
common::CopyDescription copyDesc;
storage::NodeTable* table;
storage::RelsStore* relsStore;
Expand All @@ -71,7 +70,6 @@ class CopyNode : public Sink {
for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
nodeOffsetVector = resultSet->getValueVector(copyNodeInfo.nodeOffsetPos).get();
localNodeGroup =
std::make_unique<storage::NodeGroup>(sharedState->tableSchema, &sharedState->copyDesc);
}
Expand Down Expand Up @@ -111,7 +109,6 @@ class CopyNode : public Sink {
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);

private:
common::ValueVector* nodeOffsetVector;
std::shared_ptr<CopyNodeSharedState> sharedState;
CopyNodeInfo copyNodeInfo;
std::vector<common::ValueVector*> dataColumnVectors;
Expand Down
21 changes: 16 additions & 5 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,21 @@ class Reader : public PhysicalOperator {
bool getNextTuplesInternal(ExecutionContext* context) final;

private:
void getNextNodeGroupInSerial();
void getNextNodeGroupInParallel();
void readNextNodeGroupInParallel();
template<ReaderSharedState::ReadMode READ_MODE>
void readNextDataChunk();

template<ReaderSharedState::ReadMode READ_MODE>
inline void lockForSerial() {
if constexpr (READ_MODE == ReaderSharedState::ReadMode::SERIAL) {
sharedState->mtx.lock();
}
}
template<ReaderSharedState::ReadMode READ_MODE>
inline void unlockForSerial() {
if constexpr (READ_MODE == ReaderSharedState::ReadMode::SERIAL) {
sharedState->mtx.unlock();
}
}

private:
std::unique_ptr<ReaderInfo> info;
Expand All @@ -66,8 +78,7 @@ class Reader : public PhysicalOperator {

read_rows_func_t readFunc;
init_reader_data_func_t initFunc;
// For parallel reading.
std::unique_ptr<ReaderFunctionData> readFuncData;
std::shared_ptr<ReaderFunctionData> readFuncData;
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace processor
Expand Down
113 changes: 113 additions & 0 deletions src/include/processor/operator/persistent/reader_functions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#pragma once

#include "processor/operator/persistent/csv_reader.h"
#include "storage/copier/npy_reader.h"
#include "storage/copier/table_copy_utils.h"

namespace kuzu {
namespace processor {

// TODO(Xiyang): Move functors to system built-in functions.
struct ReaderFunctionData {
common::CSVReaderConfig csvReaderConfig;
catalog::TableSchema* tableSchema;
common::vector_idx_t fileIdx;

virtual ~ReaderFunctionData() = default;
};

struct RelCSVReaderFunctionData : public ReaderFunctionData {
std::shared_ptr<arrow::csv::StreamingReader> reader = nullptr;
};

struct NodeCSVReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<BufferedCSVReader> reader = nullptr;
};

struct ParquetReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<parquet::arrow::FileReader> reader = nullptr;
};

struct NPYReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<storage::NpyMultiFileReader> reader = nullptr;
};

struct FileBlocksInfo {
common::row_idx_t numRows = 0;
common::block_idx_t numBlocks = 0;
};

using validate_func_t =
std::function<void(const std::vector<std::string>& paths, catalog::TableSchema* tableSchema)>;
using init_reader_data_func_t = std::function<void(ReaderFunctionData& funcData,
const std::vector<std::string>& paths, common::vector_idx_t fileIdx,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema)>;
using count_blocks_func_t = std::function<std::vector<FileBlocksInfo>(
const std::vector<std::string>& paths, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager)>;
using read_rows_func_t = std::function<void(
const ReaderFunctionData& funcData, common::block_idx_t blockIdx, common::DataChunk*)>;

struct ReaderFunctions {
static validate_func_t getValidateFunc(common::CopyDescription::FileType fileType);
static count_blocks_func_t getCountBlocksFunc(
common::CopyDescription::FileType fileType, common::TableType tableType);
static init_reader_data_func_t getInitDataFunc(
common::CopyDescription::FileType fileType, common::TableType tableType);
static read_rows_func_t getReadRowsFunc(
common::CopyDescription::FileType fileType, common::TableType tableType);
static std::shared_ptr<ReaderFunctionData> getReadFuncData(
common::CopyDescription::FileType fileType, common::TableType tableType);

static inline void validateCSVFiles(
const std::vector<std::string>& paths, catalog::TableSchema* tableSchema) {
// DO NOTHING.
}
static inline void validateParquetFiles(
const std::vector<std::string>& paths, catalog::TableSchema* tableSchema) {
// DO NOTHING.
}
static void validateNPYFiles(
const std::vector<std::string>& paths, catalog::TableSchema* tableSchema);

static std::vector<FileBlocksInfo> countRowsInRelCSVFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInNodeCSVFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInParquetFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInNPYFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);

static void initRelCSVReadData(ReaderFunctionData& funcData,
const std::vector<std::string>& paths, common::vector_idx_t fileIdx,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema);
static void initNodeCSVReadData(ReaderFunctionData& funcData,
const std::vector<std::string>& paths, common::vector_idx_t fileIdx,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema);
static void initParquetReadData(ReaderFunctionData& funcData,
const std::vector<std::string>& paths, common::vector_idx_t fileIdx,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema);
static void initNPYReadData(ReaderFunctionData& funcData, const std::vector<std::string>& paths,
common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema);

static void readRowsFromRelCSVFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead);
static void readRowsFromNodeCSVFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead);
static void readRowsFromParquetFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromNPYFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);

static std::unique_ptr<common::DataChunk> getDataChunkToRead(
catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager);
};

} // namespace processor
} // namespace kuzu
Loading