Skip to content

Commit

Permalink
remove count for copy rel
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Sep 11, 2023
1 parent 46f4039 commit c756eff
Show file tree
Hide file tree
Showing 17 changed files with 589 additions and 686 deletions.
5 changes: 1 addition & 4 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace kuzu {
namespace common {

constexpr char KUZU_VERSION[] = "v0.4.0";
constexpr char KUZU_VERSION[] = "v0.0.8.5";

constexpr uint64_t DEFAULT_VECTOR_CAPACITY_LOG_2 = 11;
constexpr uint64_t DEFAULT_VECTOR_CAPACITY = (uint64_t)1 << DEFAULT_VECTOR_CAPACITY_LOG_2;
Expand Down Expand Up @@ -118,9 +118,6 @@ struct CopyConstants {
// Size (in bytes) of the chunks to be read in Node/Rel Copier
static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23;

// Number of rows per block for npy files
static constexpr uint64_t NUM_ROWS_PER_BLOCK_FOR_NPY = 2048;

// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
Expand Down
5 changes: 4 additions & 1 deletion src/include/common/data_chunk/sel_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ class SelectionVector {

public:
sel_t* selectedPositions;
sel_t selectedSize;
// TODO: type of `selectedSize` was changed from `sel_t` to `uint64_t`, which should be reverted
// when we removed arrow array in ValueVector. Currently, we need to keep size of arrow array,
// which could be larger than MAX of `sel_t`.
uint64_t selectedSize;

private:
std::unique_ptr<sel_t[]> selectedPositionsBuffer;
Expand Down
3 changes: 0 additions & 3 deletions src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class CopyNodeSharedState {

struct CopyNodeInfo {
std::vector<DataPos> dataColumnPoses;
DataPos nodeOffsetPos;
common::CopyDescription copyDesc;
storage::NodeTable* table;
storage::RelsStore* relsStore;
Expand All @@ -71,7 +70,6 @@ class CopyNode : public Sink {
for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
nodeOffsetVector = resultSet->getValueVector(copyNodeInfo.nodeOffsetPos).get();
localNodeGroup =
std::make_unique<storage::NodeGroup>(sharedState->tableSchema, &sharedState->copyDesc);
}
Expand Down Expand Up @@ -111,7 +109,6 @@ class CopyNode : public Sink {
storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes);

private:
common::ValueVector* nodeOffsetVector;
std::shared_ptr<CopyNodeSharedState> sharedState;
CopyNodeInfo copyNodeInfo;
std::vector<common::ValueVector*> dataColumnVectors;
Expand Down
21 changes: 16 additions & 5 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,21 @@ class Reader : public PhysicalOperator {
bool getNextTuplesInternal(ExecutionContext* context) final;

private:
void getNextNodeGroupInSerial();
void getNextNodeGroupInParallel();
void readNextNodeGroupInParallel();
template<ReaderSharedState::ReadMode READ_MODE>
void readNextDataChunk();

template<ReaderSharedState::ReadMode READ_MODE>
inline void lockForSerial() {
if constexpr (READ_MODE == ReaderSharedState::ReadMode::SERIAL) {
sharedState->mtx.lock();
}
}
template<ReaderSharedState::ReadMode READ_MODE>
inline void unlockForSerial() {
if constexpr (READ_MODE == ReaderSharedState::ReadMode::SERIAL) {
sharedState->mtx.unlock();
}
}

private:
std::unique_ptr<ReaderInfo> info;
Expand All @@ -66,8 +78,7 @@ class Reader : public PhysicalOperator {

read_rows_func_t readFunc;
init_reader_data_func_t initFunc;
// For parallel reading.
std::unique_ptr<ReaderFunctionData> readFuncData;
std::shared_ptr<ReaderFunctionData> readFuncData;
};

} // namespace processor
Expand Down
113 changes: 113 additions & 0 deletions src/include/processor/operator/persistent/reader_functions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#pragma once

#include "processor/operator/persistent/csv_reader.h"
#include "storage/copier/npy_reader.h"
#include "storage/copier/table_copy_utils.h"

namespace kuzu {
namespace processor {

// TODO(Xiyang): Move functors to system built-in functions.
struct ReaderFunctionData {
common::CSVReaderConfig csvReaderConfig;
catalog::TableSchema* tableSchema;
common::vector_idx_t fileIdx;

virtual ~ReaderFunctionData() = default;
};

struct RelCSVReaderFunctionData : public ReaderFunctionData {
std::shared_ptr<arrow::csv::StreamingReader> reader = nullptr;
};

struct NodeCSVReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<BufferedCSVReader> reader = nullptr;
};

struct ParquetReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<parquet::arrow::FileReader> reader = nullptr;
};

struct NPYReaderFunctionData : public ReaderFunctionData {
std::unique_ptr<storage::NpyMultiFileReader> reader = nullptr;
};

struct FileBlocksInfo {
common::row_idx_t numRows = 0;
common::block_idx_t numBlocks = 0;
};

using validate_func_t =
std::function<void(const std::vector<std::string>& paths, catalog::TableSchema* tableSchema)>;
using init_reader_data_func_t = std::function<void(ReaderFunctionData& funcData,
const std::vector<std::string>& paths, common::vector_idx_t fileIdx,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema)>;
using count_blocks_func_t = std::function<std::vector<FileBlocksInfo>(
const std::vector<std::string>& paths, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager)>;
using read_rows_func_t = std::function<void(
const ReaderFunctionData& funcData, common::block_idx_t blockIdx, common::DataChunk*)>;

struct ReaderFunctions {
static validate_func_t getValidateFunc(common::CopyDescription::FileType fileType);
static count_blocks_func_t getCountBlocksFunc(
common::CopyDescription::FileType fileType, common::TableType tableType);
static init_reader_data_func_t getInitDataFunc(
common::CopyDescription::FileType fileType, common::TableType tableType);
static read_rows_func_t getReadRowsFunc(
common::CopyDescription::FileType fileType, common::TableType tableType);
static std::shared_ptr<ReaderFunctionData> getReadFuncData(
common::CopyDescription::FileType fileType, common::TableType tableType);

static inline void validateCSVFiles(
const std::vector<std::string>& paths, catalog::TableSchema* tableSchema) {
// DO NOTHING.
}
static inline void validateParquetFiles(
const std::vector<std::string>& paths, catalog::TableSchema* tableSchema) {
// DO NOTHING.
}
static void validateNPYFiles(
const std::vector<std::string>& paths, catalog::TableSchema* tableSchema);

static std::vector<FileBlocksInfo> countRowsInRelCSVFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInNodeCSVFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInParquetFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);
static std::vector<FileBlocksInfo> countRowsInNPYFile(const std::vector<std::string>& paths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema,
storage::MemoryManager* memoryManager);

static void initRelCSVReadData(ReaderFunctionData& funcData,
const std::vector<std::string>& paths, common::vector_idx_t fileIdx,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema);
static void initNodeCSVReadData(ReaderFunctionData& funcData,
const std::vector<std::string>& paths, common::vector_idx_t fileIdx,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema);
static void initParquetReadData(ReaderFunctionData& funcData,
const std::vector<std::string>& paths, common::vector_idx_t fileIdx,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema);
static void initNPYReadData(ReaderFunctionData& funcData, const std::vector<std::string>& paths,
common::vector_idx_t fileIdx, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema);

static void readRowsFromRelCSVFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead);
static void readRowsFromNodeCSVFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* dataChunkToRead);
static void readRowsFromParquetFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);
static void readRowsFromNPYFile(const ReaderFunctionData& funcData,
common::block_idx_t blockIdx, common::DataChunk* vectorsToRead);

static std::unique_ptr<common::DataChunk> getDataChunkToRead(
catalog::TableSchema* tableSchema, storage::MemoryManager* memoryManager);
};

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit c756eff

Please sign in to comment.