Skip to content

Commit

Permalink
refactor: unify CopyNode and CopyRel operator
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 3, 2024
1 parent f9e9e29 commit ca5d82a
Show file tree
Hide file tree
Showing 28 changed files with 656 additions and 596 deletions.
1 change: 1 addition & 0 deletions src/include/catalog/catalog_entry/table_catalog_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class TableCatalogEntry : public CatalogEntry {
std::string getComment() const { return comment; }
void setComment(std::string newComment) { comment = std::move(newComment); }
virtual bool isParent(common::table_id_t tableID) = 0;
// TODO(Guodong/Ziyi): This function should be removed. Instead we should use CatalogEntryType.
virtual common::TableType getTableType() const = 0;

//===--------------------------------------------------------------------===//
Expand Down
1 change: 1 addition & 0 deletions src/include/common/enums/table_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace kuzu {
namespace common {

// TODO(Guodong/Ziyi/Xiyang): Should we remove this and instead use `CatalogEntryType`?
enum class TableType : uint8_t {
UNKNOWN = 0,
NODE = 1,
Expand Down
10 changes: 5 additions & 5 deletions src/include/processor/operator/index_lookup.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include "processor/operator/persistent/copy_node.h"
#include "processor/operator/physical_operator.h"

namespace kuzu {
Expand All @@ -9,23 +8,24 @@ class PrimaryKeyIndex;
} // namespace storage
namespace processor {

struct BatchInsertSharedState;
struct IndexLookupInfo {
std::unique_ptr<common::LogicalType> pkDataType;
storage::PrimaryKeyIndex* index; // NULL if the PK data type is SERIAL.
// In copy rdf, we need to perform lookup before primary key is persist on disk. So we need to
// use index builder.
std::shared_ptr<CopyNodeSharedState> copyNodeSharedState;
std::shared_ptr<BatchInsertSharedState> batchInsertSharedState;
DataPos keyVectorPos;
DataPos resultVectorPos;

IndexLookupInfo(std::unique_ptr<common::LogicalType> pkDataType,
storage::PrimaryKeyIndex* index, const DataPos& keyVectorPos,
const DataPos& resultVectorPos)
: pkDataType{std::move(pkDataType)}, index{index}, copyNodeSharedState{nullptr},
: pkDataType{std::move(pkDataType)}, index{index}, batchInsertSharedState{nullptr},
keyVectorPos{keyVectorPos}, resultVectorPos{resultVectorPos} {}
IndexLookupInfo(const IndexLookupInfo& other)
: pkDataType{other.pkDataType->copy()}, index{other.index},
copyNodeSharedState{other.copyNodeSharedState}, keyVectorPos{other.keyVectorPos},
batchInsertSharedState{other.batchInsertSharedState}, keyVectorPos{other.keyVectorPos},
resultVectorPos{other.resultVectorPos} {}

inline std::unique_ptr<IndexLookupInfo> copy() {
Expand All @@ -40,7 +40,7 @@ class IndexLookup : public PhysicalOperator {
: PhysicalOperator{PhysicalOperatorType::INDEX_LOOKUP, std::move(child), id, paramsString},
infos{std::move(infos)} {}

void setCopyNodeSharedState(std::shared_ptr<CopyNodeSharedState> sharedState);
void setBatchInsertSharedState(std::shared_ptr<BatchInsertSharedState> sharedState);

bool getNextTuplesInternal(ExecutionContext* context) final;

Expand Down
13 changes: 8 additions & 5 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#pragma once

#include "common/data_chunk/data_chunk_collection.h"
#include "processor/operator/persistent/copy_node.h"
#include "processor/operator/sink.h"

namespace kuzu {
namespace storage {
class NodeTable;
}
namespace processor {

using partitioner_func_t =
Expand All @@ -23,8 +25,9 @@ struct PartitioningBuffer {
void merge(std::unique_ptr<PartitioningBuffer> localPartitioningStates);
};

// NOTE: Currently, Partitioner is tightly coupled with CopyRel. We should generalize it later when
// necessary. Here, each partition is essentially a node group.
// NOTE: Currently, Partitioner is tightly coupled with RelBatchInsert. We should generalize it
// later when necessary. Here, each partition is essentially a node group.
struct BatchInsertSharedState;
struct PartitionerSharedState {
std::mutex mtx;
storage::MemoryManager* mm;
Expand All @@ -37,7 +40,7 @@ struct PartitionerSharedState {
std::vector<std::unique_ptr<PartitioningBuffer>> partitioningBuffers;
common::partition_idx_t nextPartitionIdx = 0;
// In copy rdf, we need to access num nodes before it is available in statistics.
std::vector<std::shared_ptr<CopyNodeSharedState>> copyNodeSharedStates;
std::vector<std::shared_ptr<BatchInsertSharedState>> nodeBatchInsertSharedStates;

explicit PartitionerSharedState(storage::MemoryManager* mm) : mm{mm} {}

Expand Down Expand Up @@ -102,7 +105,7 @@ class Partitioner : public Sink {
std::vector<common::partition_idx_t> numPartitions, storage::MemoryManager* mm);

private:
// TODO: For now, CopyRel will guarantee all data are inside one data chunk. Should be
// TODO: For now, RelBatchInsert will guarantee all data are inside one data chunk. Should be
// generalized to resultSet later if needed.
void copyDataToPartitions(
common::partition_idx_t partitioningIdx, common::DataChunk* chunkToCopyFrom);
Expand Down
86 changes: 86 additions & 0 deletions src/include/processor/operator/persistent/batch_insert.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#pragma once

#include "processor/operator/sink.h"
#include "storage/store/table.h"

namespace kuzu {
namespace processor {

struct BatchInsertInfo {
catalog::TableCatalogEntry* tableEntry;
bool compressionEnabled;

BatchInsertInfo(catalog::TableCatalogEntry* tableEntry, bool compressionEnabled)
: tableEntry{tableEntry}, compressionEnabled{compressionEnabled} {}
virtual ~BatchInsertInfo() = default;

BatchInsertInfo(const BatchInsertInfo& other) = delete;

inline virtual std::unique_ptr<BatchInsertInfo> copy() const = 0;
};

struct BatchInsertSharedState {
std::mutex mtx;
std::atomic<common::row_idx_t> numRows;
storage::Table* table;
std::shared_ptr<FactorizedTable> fTable;
storage::WAL* wal;

BatchInsertSharedState(
storage::Table* table, std::shared_ptr<FactorizedTable> fTable, storage::WAL* wal)
: numRows{0}, table{table}, fTable{std::move(fTable)}, wal{wal} {};
BatchInsertSharedState(const BatchInsertSharedState& other) = delete;

virtual ~BatchInsertSharedState() = default;

std::unique_ptr<BatchInsertSharedState> copy() const {
auto result = std::make_unique<BatchInsertSharedState>(table, fTable, wal);
result->numRows.store(numRows.load());
return result;
}

inline void incrementNumRows(common::row_idx_t numRowsToIncrement) {
numRows.fetch_add(numRowsToIncrement);
}
inline common::row_idx_t getNumRows() { return numRows.load(); }
// NOLINTNEXTLINE(readability-make-member-function-const): Semantically non-const.
inline void logBatchInsertWALRecord() {
wal->logCopyTableRecord(table->getTableID());
wal->flushAllPages();
}
inline void setNumTuplesForTable() { table->setNumTuples(getNumRows()); }
};

struct BatchInsertLocalState {
std::unique_ptr<storage::NodeGroup> nodeGroup;

virtual ~BatchInsertLocalState() = default;

Check warning on line 57 in src/include/processor/operator/persistent/batch_insert.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/batch_insert.h#L57

Added line #L57 was not covered by tests
};

class BatchInsert : public Sink {
public:
BatchInsert(std::unique_ptr<BatchInsertInfo> info,
std::shared_ptr<BatchInsertSharedState> sharedState,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor, uint32_t id,
const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::BATCH_INSERT, id,
paramsString},
info{std::move(info)}, sharedState{std::move(sharedState)} {}

~BatchInsert() override = default;

std::unique_ptr<PhysicalOperator> clone() override = 0;

inline std::shared_ptr<BatchInsertSharedState> getSharedState() const { return sharedState; }

protected:
void checkIfTableIsEmpty();

protected:
std::unique_ptr<BatchInsertInfo> info;
std::shared_ptr<BatchInsertSharedState> sharedState;
std::unique_ptr<BatchInsertLocalState> localState;
};

} // namespace processor
} // namespace kuzu
139 changes: 0 additions & 139 deletions src/include/processor/operator/persistent/copy_node.h

This file was deleted.

Loading

0 comments on commit ca5d82a

Please sign in to comment.