Skip to content

Commit

Permalink
fix rel delete and create
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Nov 17, 2023
1 parent ef27e49 commit 09bea9b
Show file tree
Hide file tree
Showing 53 changed files with 352 additions and 178 deletions.
22 changes: 11 additions & 11 deletions src/include/processor/operator/persistent/delete_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ class NodeDeleteExecutor {
common::ValueVector* nodeIDVector;
};

class SingleLabelNodeDeleteExecutor : public NodeDeleteExecutor {
class SingleLabelNodeDeleteExecutor final : public NodeDeleteExecutor {
public:
SingleLabelNodeDeleteExecutor(storage::NodeTable* table, const DataPos& nodeIDPos)
: NodeDeleteExecutor(nodeIDPos), table{table} {}
SingleLabelNodeDeleteExecutor(const SingleLabelNodeDeleteExecutor& other)
: NodeDeleteExecutor(other.nodeIDPos), table{other.table} {}

void init(ResultSet* resultSet, ExecutionContext* context) final;
void delete_(ExecutionContext* context) final;
void init(ResultSet* resultSet, ExecutionContext* context);
void delete_(ExecutionContext* context);

inline std::unique_ptr<NodeDeleteExecutor> copy() const final {
inline std::unique_ptr<NodeDeleteExecutor> copy() const {
return std::make_unique<SingleLabelNodeDeleteExecutor>(*this);
}

Expand All @@ -44,7 +44,7 @@ class SingleLabelNodeDeleteExecutor : public NodeDeleteExecutor {
std::unique_ptr<common::ValueVector> pkVector;
};

class MultiLabelNodeDeleteExecutor : public NodeDeleteExecutor {
class MultiLabelNodeDeleteExecutor final : public NodeDeleteExecutor {
public:
MultiLabelNodeDeleteExecutor(
std::unordered_map<common::table_id_t, storage::NodeTable*> tableIDToTableMap,
Expand All @@ -53,10 +53,10 @@ class MultiLabelNodeDeleteExecutor : public NodeDeleteExecutor {
MultiLabelNodeDeleteExecutor(const MultiLabelNodeDeleteExecutor& other)
: NodeDeleteExecutor(other.nodeIDPos), tableIDToTableMap{other.tableIDToTableMap} {}

void init(ResultSet* resultSet, ExecutionContext* context) final;
void delete_(ExecutionContext* context) final;
void init(ResultSet* resultSet, ExecutionContext* context);
void delete_(ExecutionContext* context);

inline std::unique_ptr<NodeDeleteExecutor> copy() const final {
inline std::unique_ptr<NodeDeleteExecutor> copy() const {
return std::make_unique<MultiLabelNodeDeleteExecutor>(*this);
}

Expand All @@ -75,7 +75,7 @@ class RelDeleteExecutor {

void init(ResultSet* resultSet, ExecutionContext* context);

virtual void delete_() = 0;
virtual void delete_(ExecutionContext* context) = 0;

virtual std::unique_ptr<RelDeleteExecutor> copy() const = 0;

Expand All @@ -97,7 +97,7 @@ class SingleLabelRelDeleteExecutor final : public RelDeleteExecutor {
relsStatistic{relsStatistic}, table{table} {}
SingleLabelRelDeleteExecutor(const SingleLabelRelDeleteExecutor& other) = default;

void delete_();
void delete_(ExecutionContext* context);

inline std::unique_ptr<RelDeleteExecutor> copy() const {
return std::make_unique<SingleLabelRelDeleteExecutor>(*this);
Expand All @@ -119,7 +119,7 @@ class MultiLabelRelDeleteExecutor final : public RelDeleteExecutor {
tableIDToTableMap)} {}
MultiLabelRelDeleteExecutor(const MultiLabelRelDeleteExecutor& other) = default;

void delete_();
void delete_(ExecutionContext* context);

inline std::unique_ptr<RelDeleteExecutor> copy() const {
return std::make_unique<MultiLabelRelDeleteExecutor>(*this);
Expand Down
5 changes: 3 additions & 2 deletions src/include/storage/local_storage/local_rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class LocalRelNG final : public LocalNodeGroup {
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector,
common::column_id_t columnID, common::ValueVector* propertyVector);
void delete_(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector);
bool delete_(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector);

inline LocalVectorCollection* getAdjChunk() { return adjChunk.get(); }
inline LocalVectorCollection* getPropertyChunk(common::column_id_t columnID) {
Expand All @@ -99,7 +99,8 @@ class LocalRelTableData final : public LocalTableData {
const std::vector<common::ValueVector*>& propertyVectors);
void update(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector,
common::column_id_t columnID, common::ValueVector* propertyVector);
void delete_(common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector);
bool delete_(common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector,
common::ValueVector* relIDVector);

private:
LocalNodeGroup* getOrCreateLocalNodeGroup(common::ValueVector* nodeIDVector);
Expand Down
6 changes: 3 additions & 3 deletions src/include/storage/local_storage/local_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class LocalVector {
// contains 64 vectors (chunks).
class LocalVectorCollection {
public:
LocalVectorCollection(const common::LogicalType* dataType, MemoryManager* mm)
: dataType{dataType}, mm{mm}, numRows{0} {}
LocalVectorCollection(std::unique_ptr<common::LogicalType> dataType, MemoryManager* mm)
: dataType{std::move(dataType)}, mm{mm}, numRows{0} {}

void read(common::row_idx_t rowIdx, common::ValueVector* outputVector,
common::sel_t posInOutputVector);
Expand All @@ -62,7 +62,7 @@ class LocalVectorCollection {
void prepareAppend();

private:
const common::LogicalType* dataType;
std::unique_ptr<common::LogicalType> dataType;
MemoryManager* mm;
std::vector<std::unique_ptr<LocalVector>> vectors;
common::row_idx_t numRows;
Expand Down
5 changes: 5 additions & 0 deletions src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,14 @@ class RelTable : public Table {
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) final;

void insert(transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector,
common::ValueVector* dstNodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::ValueVector* srcNodeIDVector, common::ValueVector* dstNodeIDVector,
common::ValueVector* relIDVector, common::ValueVector* propertyVector);
void delete_(transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector,
common::ValueVector* dstNodeIDVector, common::ValueVector* relIDVector);

void addColumn(transaction::Transaction* transaction, const catalog::Property& property,
common::ValueVector* defaultValueVector) final;
Expand Down
16 changes: 16 additions & 0 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ class RelTableData final : public TableData {
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors);

void insert(transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector,
common::ValueVector* dstNodeIDVector,
const std::vector<common::ValueVector*>& propertyVectors);
void update(transaction::Transaction* transaction, common::column_id_t columnID,
common::ValueVector* srcNodeIDVector, common::ValueVector* relIDVector,
common::ValueVector* propertyVector);
bool delete_(transaction::Transaction* transaction, common::ValueVector* srcNodeIDVector,
common::ValueVector* dstNodeIDVector, common::ValueVector* relIDVector);

void append(NodeGroup* nodeGroup);

Expand Down Expand Up @@ -87,6 +92,17 @@ class RelTableData final : public TableData {
void prepareCommitCSRNGWithoutSliding(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, CSRRelNGInfo* relNodeGroupInfo,
ColumnChunk* csrOffsetChunk, ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup);
void prepareCommitCSRNGWithSliding(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, CSRRelNGInfo* relNodeGroupInfo,
ColumnChunk* csrOffsetChunk, ColumnChunk* relIDChunk, LocalRelNG* localNodeGroup);

std::unique_ptr<ColumnChunk> slideCSROffsetColumnChunk(ColumnChunk* csrOffsetChunk,
CSRRelNGInfo* relNodeGroupInfo, common::offset_t nodeGroupStartOffset);
std::unique_ptr<ColumnChunk> slideCSRColumnChunk(transaction::Transaction* transaction,
ColumnChunk* csrOffsetChunk, ColumnChunk* slidedCSROffsetChunkForCheck,
ColumnChunk* relIDChunk, const offset_to_offset_to_row_idx_t& insertInfo,
const offset_to_offset_to_row_idx_t& updateInfo, const offset_to_offset_set_t& deleteInfo,
common::node_group_idx_t nodeGroupIdx, Column* column, LocalVectorCollection* localChunk);

static inline common::ColumnDataFormat getDataFormatFromSchema(
catalog::RelTableSchema* tableSchema, common::RelDataDirection direction) {
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/persistent/delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ bool DeleteRel::getNextTuplesInternal(ExecutionContext* context) {
return false;
}
for (auto& executor : executors) {
executor->delete_();
executor->delete_(context);
}
return true;
}
Expand Down
15 changes: 6 additions & 9 deletions src/processor/operator/persistent/delete_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,19 @@ void RelDeleteExecutor::init(ResultSet* resultSet, ExecutionContext* /*context*/
relIDVector = resultSet->getValueVector(relIDPos).get();
}

void SingleLabelRelDeleteExecutor::delete_() {
// TODO(Guodong): Fix delete.
// table->deleteRel(srcNodeIDVector, dstNodeIDVector, relIDVector);
// relsStatistic->updateNumRelsByValue(table->getTableID(), -1);
void SingleLabelRelDeleteExecutor::delete_(ExecutionContext* context) {
table->delete_(context->clientContext->getActiveTransaction(), srcNodeIDVector, dstNodeIDVector,
relIDVector);
}

void MultiLabelRelDeleteExecutor::delete_() {
void MultiLabelRelDeleteExecutor::delete_(ExecutionContext* context) {
KU_ASSERT(relIDVector->state->isFlat());
auto pos = relIDVector->state->selVector->selectedPositions[0];
auto relID = relIDVector->getValue<internalID_t>(pos);
KU_ASSERT(tableIDToTableMap.contains(relID.tableID));
auto [table, statistic] = tableIDToTableMap.at(relID.tableID);
// TODO(Guodong): Fix delete.
// table->deleteRel(srcNodeIDVector, dstNodeIDVector, relIDVector);
// KU_ASSERT(table->getTableID() == relID.tableID);
// statistic->updateNumRelsByValue(table->getTableID(), -1);
table->delete_(context->clientContext->getActiveTransaction(), srcNodeIDVector, dstNodeIDVector,
relIDVector);
}

} // namespace processor
Expand Down
4 changes: 1 addition & 3 deletions src/processor/operator/persistent/insert_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ void RelInsertExecutor::insert(transaction::Transaction* tx) {
for (auto i = 1; i < propertyRhsEvaluators.size(); ++i) {
propertyRhsEvaluators[i]->evaluate();
}
// TODO(Guodong): Fix insert.
// table->insertRel(srcNodeIDVector, dstNodeIDVector, propertyRhsVectors);
// relsStatistics.updateNumRelsByValue(table->getRelTableID(), 1);
table->insert(tx, srcNodeIDVector, dstNodeIDVector, propertyRhsVectors);
for (auto i = 0u; i < propertyLhsVectors.size(); ++i) {
auto lhsVector = propertyLhsVectors[i];
auto rhsVector = propertyRhsVectors[i];
Expand Down
27 changes: 16 additions & 11 deletions src/storage/local_storage/local_rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ void RegularRelNGInfo::insert(offset_t srcNodeOffset, offset_t /*relOffset*/,
}

void RegularRelNGInfo::update(
offset_t srcNodeOffset, offset_t relOffset, column_id_t columnID, row_idx_t rowIdx) {
offset_t srcNodeOffset, offset_t /*relOffset*/, column_id_t columnID, row_idx_t rowIdx) {
if (deleteInfo.contains(srcNodeOffset)) {
// We choose to ignore the update operation if the node is deleted.
return;
Expand All @@ -32,9 +32,9 @@ void RegularRelNGInfo::update(
KU_ASSERT(columnID < updateInfoPerChunk.size());
if (insertInfoPerChunk[columnID].contains(srcNodeOffset)) {
// Update newly inserted value.
insertInfoPerChunk[columnID][relOffset] = rowIdx;
insertInfoPerChunk[columnID][srcNodeOffset] = rowIdx;
} else {
updateInfoPerChunk[columnID][relOffset] = rowIdx;
updateInfoPerChunk[columnID][srcNodeOffset] = rowIdx;
}
}

Expand Down Expand Up @@ -141,13 +141,15 @@ LocalRelNG::LocalRelNG(ColumnDataFormat dataFormat, std::vector<LogicalType*> da
KU_UNREACHABLE;
}
}
adjChunk = std::make_unique<LocalVectorCollection>(LogicalType::INTERNAL_ID(), mm);
}

void LocalRelNG::insert(ValueVector* srcNodeIDVector, ValueVector* dstNodeIDVector,
const std::vector<ValueVector*>& propertyVectors) {
KU_ASSERT(propertyVectors.size() == chunks.size() && propertyVectors.size() > 1);
KU_ASSERT(propertyVectors.size() == chunks.size() && propertyVectors.size() >= 1);
auto adjNodeIDRowIdx = adjChunk->append(dstNodeIDVector);
std::vector<row_idx_t> propertyValuesRowIdx;
propertyValuesRowIdx.reserve(propertyVectors.size());
for (auto i = 0u; i < propertyVectors.size(); ++i) {
propertyValuesRowIdx.push_back(chunks[i]->append(propertyVectors[i]));
}
Expand All @@ -169,12 +171,12 @@ void LocalRelNG::update(ValueVector* srcNodeIDVector, ValueVector* relIDVector,
relNGInfo->update(srcNodeOffset, relOffset, columnID, rowIdx);
}

void LocalRelNG::delete_(ValueVector* srcNodeIDVector, ValueVector* relIDVector) {
bool LocalRelNG::delete_(ValueVector* srcNodeIDVector, ValueVector* relIDVector) {
auto srcNodeIDPos = srcNodeIDVector->state->selVector->selectedPositions[0];
auto srcNodeOffset = srcNodeIDVector->getValue<nodeID_t>(srcNodeIDPos).offset;
auto relIDPos = relIDVector->state->selVector->selectedPositions[0];
auto relOffset = relIDVector->getValue<relID_t>(relIDPos).offset;
relNGInfo->delete_(srcNodeOffset, relOffset);
return relNGInfo->delete_(srcNodeOffset, relOffset);
}

void LocalRelTableData::insert(ValueVector* srcNodeIDVector, ValueVector* dstNodeIDVector,
Expand All @@ -197,24 +199,27 @@ void LocalRelTableData::update(ValueVector* srcNodeIDVector, ValueVector* relIDV
KU_ASSERT(srcNodeIDVector->state->selVector->selectedSize == 1 &&
relIDVector->state->selVector->selectedSize == 1);
auto srcNodeIDPos = srcNodeIDVector->state->selVector->selectedPositions[0];
if (srcNodeIDVector->isNull(srcNodeIDPos)) {
auto relIDPos = relIDVector->state->selVector->selectedPositions[0];
if (srcNodeIDVector->isNull(srcNodeIDPos) || relIDVector->isNull(relIDPos)) {
return;
}
auto localNodeGroup =
ku_dynamic_cast<LocalNodeGroup*, LocalRelNG*>(getOrCreateLocalNodeGroup(srcNodeIDVector));
localNodeGroup->update(srcNodeIDVector, relIDVector, columnID, propertyVector);
}

void LocalRelTableData::delete_(ValueVector* srcNodeIDVector, ValueVector* relIDVector) {
bool LocalRelTableData::delete_(
ValueVector* srcNodeIDVector, ValueVector* dstNodeIDVector, ValueVector* relIDVector) {
KU_ASSERT(srcNodeIDVector->state->selVector->selectedSize == 1 &&
relIDVector->state->selVector->selectedSize == 1);
auto srcNodeIDPos = srcNodeIDVector->state->selVector->selectedPositions[0];
if (srcNodeIDVector->isNull(srcNodeIDPos)) {
return;
auto dstNodeIDPos = dstNodeIDVector->state->selVector->selectedPositions[0];
if (srcNodeIDVector->isNull(srcNodeIDPos) || dstNodeIDVector->isNull(dstNodeIDPos)) {
return false;
}
auto localNodeGroup =
ku_dynamic_cast<LocalNodeGroup*, LocalRelNG*>(getOrCreateLocalNodeGroup(srcNodeIDVector));
localNodeGroup->delete_(srcNodeIDVector, relIDVector);
return localNodeGroup->delete_(srcNodeIDVector, relIDVector);
}

LocalNodeGroup* LocalRelTableData::getOrCreateLocalNodeGroup(ValueVector* nodeIDVector) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/local_storage/local_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ LocalNodeGroup::LocalNodeGroup(std::vector<LogicalType*> dataTypes, MemoryManage
for (auto i = 0u; i < dataTypes.size(); ++i) {
// To avoid unnecessary memory consumption, we chunk local changes of each column in the
// node group into chunks of size DEFAULT_VECTOR_CAPACITY.
chunks[i] = std::make_unique<LocalVectorCollection>(dataTypes[i], mm);
chunks[i] = std::make_unique<LocalVectorCollection>(dataTypes[i]->copy(), mm);
}
}

Expand Down
40 changes: 26 additions & 14 deletions src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,17 +388,15 @@ void Column::scan(
columnChunk->setNumValues(0);
} else {
auto chunkMetadata = metadataDA->get(nodeGroupIdx, transaction->getType());
KU_ASSERT(chunkMetadata.numValues <= columnChunk->getCapacity());
auto cursor = PageElementCursor(chunkMetadata.pageIdx, 0);
uint64_t numValuesPerPage =
chunkMetadata.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, *dataType);
uint64_t numValuesScanned = 0u;
auto numValuesToScan = columnChunk->getCapacity();
if (chunkMetadata.numValues < numValuesToScan) {
numValuesToScan = chunkMetadata.numValues;
if (chunkMetadata.numValues > columnChunk->getCapacity()) {
columnChunk->resize(std::bit_ceil(chunkMetadata.numValues));
}
auto numValuesToScan = chunkMetadata.numValues;
KU_ASSERT(chunkMetadata.numValues <= columnChunk->getCapacity());
// TODO(Guodong): We should resize as needed here.
while (numValuesScanned < numValuesToScan) {
auto numValuesToReadInPage =
std::min(numValuesPerPage, numValuesToScan - numValuesScanned);
Expand Down Expand Up @@ -677,29 +675,43 @@ bool Column::canCommitInPlace(Transaction* transaction, node_group_idx_t nodeGro

void Column::commitLocalChunkInPlace(Transaction* /*transaction*/,
LocalVectorCollection* localChunk, const offset_to_row_idx_t& insertInfo,
const offset_to_row_idx_t& updateInfo, const offset_set_t& /*deleteInfo*/) {
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) {
applyLocalChunkToColumn(localChunk, updateInfo);
applyLocalChunkToColumn(localChunk, insertInfo);
// Set nulls based on deleteInfo. Note that this code path actually only gets executed when the
// column is a regular format one. This is not a good design, should be unified with csr one in
// the future.
for (auto nodeOffset : deleteInfo) {
setNull(nodeOffset);
}
}

void Column::commitLocalChunkOutOfPlace(Transaction* transaction, node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localChunk, bool isNewNodeGroup, const offset_to_row_idx_t& insertInfo,
const offset_to_row_idx_t& updateInfo, const offset_set_t& /*deleteInfo*/) {
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) {
auto columnChunk = ColumnChunkFactory::createColumnChunk(dataType->copy(), enableCompression);
auto startNodeOffsetInChunk = nodeGroupIdx << StorageConstants::NODE_GROUP_SIZE_LOG2;
if (isNewNodeGroup) {
KU_ASSERT(updateInfo.empty());
KU_ASSERT(updateInfo.empty() && deleteInfo.empty());
// Apply inserts from the local chunk.
applyLocalChunkToColumnChunk(localChunk, columnChunk.get(),
nodeGroupIdx << StorageConstants::NODE_GROUP_SIZE_LOG2, insertInfo);
applyLocalChunkToColumnChunk(
localChunk, columnChunk.get(), startNodeOffsetInChunk, insertInfo);
} else {
// First, scan the whole column chunk from persistent storage.
scan(transaction, nodeGroupIdx, columnChunk.get());
// Then, apply updates from the local chunk.
applyLocalChunkToColumnChunk(localChunk, columnChunk.get(),
nodeGroupIdx << StorageConstants::NODE_GROUP_SIZE_LOG2, updateInfo);
applyLocalChunkToColumnChunk(
localChunk, columnChunk.get(), startNodeOffsetInChunk, updateInfo);
// Lastly, apply inserts from the local chunk.
applyLocalChunkToColumnChunk(localChunk, columnChunk.get(),
nodeGroupIdx << StorageConstants::NODE_GROUP_SIZE_LOG2, insertInfo);
applyLocalChunkToColumnChunk(
localChunk, columnChunk.get(), startNodeOffsetInChunk, insertInfo);
if (columnChunk->getNullChunk()) {
// Set nulls based on deleteInfo.
for (auto nodeOffset : deleteInfo) {
columnChunk->getNullChunk()->setNull(
nodeOffset - startNodeOffsetInChunk, true /* isNull */);
}
}
}
columnChunk->finalize();
append(columnChunk.get(), nodeGroupIdx);
Expand Down
Loading

0 comments on commit 09bea9b

Please sign in to comment.