Skip to content

Commit

Permalink
fix COPY; rework Reader op
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Aug 27, 2023
1 parent 9526e5e commit c222939
Show file tree
Hide file tree
Showing 53 changed files with 905 additions and 822 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.11)

project(Kuzu VERSION 0.0.7 LANGUAGES CXX)
project(Kuzu VERSION 0.0.7.1 LANGUAGES CXX)

find_package(Threads REQUIRED)

Expand Down
4 changes: 4 additions & 0 deletions src/common/exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ std::string ExceptionMessage::existedPKException(const std::string& pkString) {
pkString);
}

// Formats the error text for a primary key value that does not exist; pkString is substituted
// into the "{}" placeholder of the message.
std::string ExceptionMessage::nonExistPKException(const std::string& pkString) {
    std::string message =
        StringUtils::string_format("Found non-existed primary key value {}.", pkString);
    return message;
}

std::string ExceptionMessage::invalidPKType(const std::string& type) {
return StringUtils::string_format(
"Invalid primary key column type {}. Primary key must be either INT64, STRING or SERIAL.",
Expand Down
1 change: 1 addition & 0 deletions src/include/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace common {

struct ExceptionMessage {
static std::string existedPKException(const std::string& pkString);
static std::string nonExistPKException(const std::string& pkString);
static std::string invalidPKType(const std::string& type);
static inline std::string nullPKException() {
return "Found NULL, which violates the non-null constraint of the primary key column.";
Expand Down
29 changes: 0 additions & 29 deletions src/include/processor/operator/copy_from/read_csv.h

This file was deleted.

39 changes: 0 additions & 39 deletions src/include/processor/operator/copy_from/read_file.h

This file was deleted.

32 changes: 0 additions & 32 deletions src/include/processor/operator/copy_from/read_npy.h

This file was deleted.

30 changes: 0 additions & 30 deletions src/include/processor/operator/copy_from/read_parquet.h

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ class CopyNodeSharedState {
std::unique_lock<std::mutex> lck{mtx};
return getNextNodeGroupIdxWithoutLock();
}
// Raises the shared node group index to nextNodeGroupIdx while holding mtx. A value that is not
// strictly greater than the current index leaves the state untouched, so the index never moves
// backwards through this setter.
inline void setNextNodeGroupIdx(common::node_group_idx_t nextNodeGroupIdx) {
    std::lock_guard<std::mutex> guard{mtx};
    if (nextNodeGroupIdx <= currentNodeGroupIdx) {
        return;
    }
    currentNodeGroupIdx = nextNodeGroupIdx;
}

void logCopyNodeWALRecord(storage::WAL* wal);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "processor/operator/copy_from/copy.h"
#include "processor/operator/persistent/copy.h"
#include "storage/store/nodes_store.h"
#include "storage/store/rels_store.h"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include "common/copier_config/copier_config.h"
#include "common/task_system/task_scheduler.h"
#include "processor/operator/copy_to/csv_file_writer.h"
#include "processor/operator/persistent/csv_file_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/result/result_set.h"

Expand Down
54 changes: 54 additions & 0 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include "processor/operator/physical_operator.h"
#include "storage/copier/reader_state.h"

namespace kuzu {
namespace processor {

// Configuration bundle handed to the Reader operator at construction time and copied into each
// clone of the operator.
struct ReaderInfo {
// Position of the node offset output — presumably a slot in the operator's result set; confirm
// against the Reader implementation.
DataPos nodeOffsetPos;
// Positions of the data columns produced by the reader (one DataPos per column).
std::vector<DataPos> dataColumnPoses;
// Flag selecting order-preserving reading. NOTE(review): presumably chooses between the serial
// and parallel code paths of Reader — verify in the .cpp file.
bool isOrderPreserving;
// Callback of type storage::read_rows_func_t; its exact contract is defined in
// storage/copier/reader_state.h.
storage::read_rows_func_t readFunc;
// Callback of type storage::init_reader_data_func_t; its exact contract is defined in
// storage/copier/reader_state.h.
storage::init_reader_data_func_t initFunc;
};

// Source physical operator of type PhysicalOperatorType::READER. It owns a copy of the ReaderInfo
// configuration and shares a storage::ReaderSharedState with all of its clones.
class Reader : public PhysicalOperator {
public:
    // Constructs the operator.
    //   readerInfo    - configuration, moved into the operator.
    //   sharedState   - reader state shared between this operator and its clones.
    //   id            - operator id forwarded to PhysicalOperator.
    //   paramsString  - parameter description forwarded to PhysicalOperator.
    // Per-instance progress (leftNumRows, currFileIdx, readFuncData) starts out empty/invalid.
    Reader(ReaderInfo readerInfo, std::shared_ptr<storage::ReaderSharedState> sharedState,
        uint32_t id, const std::string& paramsString)
        : PhysicalOperator{PhysicalOperatorType::READER, id, paramsString},
          readerInfo{std::move(readerInfo)}, sharedState{std::move(sharedState)}, leftNumRows{0},
          currFileIdx{common::INVALID_VECTOR_IDX}, readFuncData{nullptr} {}

    // Validates the shared state and then counts its blocks. The execution context is not used
    // here, so the parameter is left unnamed to avoid unused-parameter warnings.
    inline void initGlobalStateInternal(ExecutionContext* /*context*/) final {
        sharedState->validate();
        sharedState->countBlocks();
    }
    // This operator is a source of its pipeline.
    inline bool isSource() const final { return true; }

    // Creates a new Reader with a copy of readerInfo and the same shared state, operator id and
    // params string. Per-instance progress is not copied; the clone starts fresh.
    inline std::unique_ptr<PhysicalOperator> clone() final {
        // Explicitly qualified: an unqualified make_unique<T>(...) in a header only resolves via a
        // using-directive leaked from another header or C++20 ADL for explicit template arguments.
        return std::make_unique<Reader>(readerInfo, sharedState, getOperatorID(), paramsString);
    }

protected:
    // Defined out of line in the corresponding source file.
    bool getNextTuplesInternal(ExecutionContext* context) final;

private:
    // Two strategies for fetching the next node group into `table`; defined out of line.
    // NOTE(review): presumably selected by readerInfo.isOrderPreserving — confirm in the .cpp.
    void getNextNodeGroupInSerial(std::shared_ptr<arrow::Table>& table);
    void getNextNodeGroupInParallel(std::shared_ptr<arrow::Table>& table);

private:
    ReaderInfo readerInfo;
    std::shared_ptr<storage::ReaderSharedState> sharedState;
    // Record batches carried over between calls, and the row count tracked alongside them.
    // NOTE(review): presumably rows read but not yet emitted — confirm in the .cpp.
    std::vector<std::shared_ptr<arrow::RecordBatch>> leftRecordBatches;
    common::row_idx_t leftNumRows;

    // For parallel reading.
    common::vector_idx_t currFileIdx;
    std::unique_ptr<storage::ReaderFunctionData> readFuncData;
};

} // namespace processor
} // namespace kuzu
9 changes: 3 additions & 6 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@ enum class PhysicalOperatorType : uint8_t {
IN_QUERY_CALL,
COPY_NODE,
COPY_REL,
COPY_NPY,
COPY_TO,
READ_CSV,
READ_NPY,
READ_PARQUET,
INSERT_NODE,
CREATE_NODE_TABLE,
INSERT_REL,
CREATE_REL_TABLE,
CROSS_PRODUCT,
DELETE_NODE,
Expand All @@ -38,6 +32,8 @@ enum class PhysicalOperatorType : uint8_t {
HASH_JOIN_BUILD,
HASH_JOIN_PROBE,
INDEX_SCAN,
INSERT_NODE,
INSERT_REL,
INTERSECT_BUILD,
INTERSECT,
LIMIT,
Expand All @@ -46,6 +42,7 @@ enum class PhysicalOperatorType : uint8_t {
PATH_PROPERTY_PROBE,
PROJECTION,
PROFILE,
READER,
RECURSIVE_JOIN,
RENAME_PROPERTY,
RENAME_TABLE,
Expand Down
26 changes: 14 additions & 12 deletions src/include/storage/copier/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,33 @@ namespace storage {

class NullColumnChunk;

struct ColumnChunkMetadata {
common::page_idx_t pageIdx = common::INVALID_PAGE_IDX;
common::page_idx_t numPages = 0;
struct BaseColumnChunkMetadata {
common::page_idx_t pageIdx;
common::page_idx_t numPages;

ColumnChunkMetadata() = default;
ColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages)
BaseColumnChunkMetadata() : pageIdx{common::INVALID_PAGE_IDX}, numPages{0} {}
BaseColumnChunkMetadata(common::page_idx_t pageIdx, common::page_idx_t numPages)
: pageIdx(pageIdx), numPages(numPages) {}
virtual ~BaseColumnChunkMetadata() = default;
};

struct MainColumnChunkMetadata : public ColumnChunkMetadata {
struct ColumnChunkMetadata : public BaseColumnChunkMetadata {
uint64_t numValues;

MainColumnChunkMetadata() = default;
MainColumnChunkMetadata(
ColumnChunkMetadata() : BaseColumnChunkMetadata(), numValues{UINT64_MAX} {}
ColumnChunkMetadata(
common::page_idx_t pageIdx, common::page_idx_t numPages, uint64_t numNodesInChunk)
: ColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk) {}
: BaseColumnChunkMetadata{pageIdx, numPages}, numValues(numNodesInChunk) {}
};

struct OverflowColumnChunkMetadata : public ColumnChunkMetadata {
struct OverflowColumnChunkMetadata : public BaseColumnChunkMetadata {
common::offset_t lastOffsetInPage;

OverflowColumnChunkMetadata() = default;
OverflowColumnChunkMetadata()
: BaseColumnChunkMetadata(), lastOffsetInPage{common::INVALID_OFFSET} {}
OverflowColumnChunkMetadata(
common::page_idx_t pageIdx, common::page_idx_t numPages, common::offset_t lastOffsetInPage)
: ColumnChunkMetadata{pageIdx, numPages}, lastOffsetInPage(lastOffsetInPage) {}
: BaseColumnChunkMetadata{pageIdx, numPages}, lastOffsetInPage(lastOffsetInPage) {}
};

// Base data segment covers all fixed-sized data types.
Expand Down
Loading

0 comments on commit c222939

Please sign in to comment.