Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify prepare commit of null columns and nested columns with chunk states #3398

Merged
merged 1 commit into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,24 @@ class Column {
Column* getNullColumn();

virtual void prepareCommit();
virtual void prepareCommitForChunk(transaction::Transaction* transaction,
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo);
virtual void prepareCommitForChunk(transaction::Transaction* transaction,
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset);

// Prepares the commit of local insert/update/delete changes for a chunk that is not part of a
// new node group. The caller supplies a ChunkState it has already initialized
// (prepareCommitForChunk calls initChunkState before delegating here).
// Virtual: ListColumn and StructColumn declare overrides of this overload.
virtual void prepareCommitForExistingChunk(transaction::Transaction* transaction,
ChunkState& state, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo);
// Offset-vector overload: prepares the commit of values taken from `chunk` (starting at
// `startSrcOffset`) into the positions listed in `dstOffsets`, for a chunk whose ChunkState
// the caller has already initialized.
// Virtual: ListColumn and StructColumn declare overrides of this overload.
virtual void prepareCommitForExistingChunk(transaction::Transaction* transaction,
ChunkState& state, const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset);

virtual void checkpointInMemory();
virtual void rollbackInMemory();

Expand Down Expand Up @@ -187,7 +195,7 @@ class Column {
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t srcOffset);

virtual void commitLocalChunkInPlace(transaction::Transaction* transaction, ChunkState& state,
virtual void commitLocalChunkInPlace(ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo);
Expand Down
11 changes: 3 additions & 8 deletions src/include/storage/store/list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,20 @@ class ListColumn final : public Column {
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup,
common::offset_t endOffsetInNodeGroup);

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
void prepareCommitForExistingChunk(transaction::Transaction* transaction, ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) override;
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
void prepareCommitForExistingChunk(transaction::Transaction* transaction, ChunkState& state,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset) override;

void prepareCommitForOffsetChunk(transaction::Transaction* transaction, ChunkState& offsetState,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset);
void commitOffsetColumnChunkOutOfPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
const ChunkState& offsetState, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset);
void commitOffsetColumnChunkInPlace(ChunkState& offsetChunk,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t srcOffset);

private:
std::unique_ptr<Column> sizeColumn;
Expand Down
7 changes: 3 additions & 4 deletions src/include/storage/store/null_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ class NullColumn final : public Column {
void write(ChunkState& state, common::offset_t offsetInChunk, ColumnChunk* data,
common::offset_t dataOffset, common::length_t numValues) override;

void commitLocalChunkInPlace(Transaction* transaction, ChunkState& state,
const ChunkCollection& localInsertChunk, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunk, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) override;
void commitLocalChunkInPlace(ChunkState& state, const ChunkCollection& localInsertChunk,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunk,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override;

private:
std::unique_ptr<ColumnChunk> getEmptyChunkForCommit(uint64_t capacity) override {
Expand Down
16 changes: 4 additions & 12 deletions src/include/storage/store/struct_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@ class StructColumn final : public Column {
void write(ChunkState& state, common::offset_t offsetInChunk, ColumnChunk* data,
common::offset_t dataOffset, common::length_t numValues) override;

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
const ChunkCollection& localInsertChunk, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunk, const offset_to_row_idx_t& updateInfo,
void prepareCommitForExistingChunk(transaction::Transaction* transaction, ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) override;
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
void prepareCommitForExistingChunk(transaction::Transaction* transaction, ChunkState& state,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset) override;

Expand All @@ -52,12 +50,6 @@ class StructColumn final : public Column {
void lookupInternal(transaction::Transaction* transaction, ChunkState& readState,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector) override;

bool canCommitInPlace(const ChunkState& state, const ChunkCollection& localInsertChunk,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunk,
const offset_to_row_idx_t& updateInfo) override;
bool canCommitInPlace(const ChunkState& state, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t dataOffset) override;

private:
static ChunkCollection getStructChildChunkCollection(const ChunkCollection& chunkCollection,
common::vector_idx_t childIdx);
Expand Down
101 changes: 43 additions & 58 deletions src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class SerialColumn final : public Column {
KU_UNREACHABLE;
}

void commitLocalChunkInPlace(Transaction*, ChunkState& state, const ChunkCollection&,
void commitLocalChunkInPlace(ChunkState& state, const ChunkCollection&,
const offset_to_row_idx_t& insertInfo, const ChunkCollection&,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override {
// Avoid unused parameter warnings during release build.
Expand Down Expand Up @@ -610,42 +610,10 @@ void Column::prepareCommitForChunk(Transaction* transaction, common::node_group_
commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, localInsertChunks,
insertInfo, localUpdateChunks, updateInfo, deleteInfo);
} else {
bool didInPlaceCommit = false;
ChunkState state;
initChunkState(transaction, nodeGroupIdx, state);
// If this is not a new node group, we should first check if we can perform in-place commit.
if (canCommitInPlace(state, localInsertChunks, insertInfo, localUpdateChunks, updateInfo)) {
commitLocalChunkInPlace(transaction, state, localInsertChunks, insertInfo,
localUpdateChunks, updateInfo, deleteInfo);
didInPlaceCommit = true;
KU_ASSERT(sanityCheckForWrites(state.metadata, dataType));
metadataDA->update(nodeGroupIdx, state.metadata);
} else {
commitLocalChunkOutOfPlace(transaction, state.nodeGroupIdx, isNewNodeGroup,
localInsertChunks, insertInfo, localUpdateChunks, updateInfo, deleteInfo);
}
// TODO(Guodong/Ben): The logic here on NullColumn is confusing as out-of-place commits and
// in-place commits handle it differently. See if we can unify them.
if (nullColumn) {
// Uses functions written for the null chunk which only access the localColumnChunk's
// null information
KU_ASSERT(state.nullState);
if (nullColumn->canCommitInPlace(*state.nullState,
getNullChunkCollection(localInsertChunks), insertInfo,
getNullChunkCollection(localUpdateChunks), updateInfo)) {
nullColumn->commitLocalChunkInPlace(transaction, *state.nullState,
getNullChunkCollection(localInsertChunks), insertInfo,
getNullChunkCollection(localUpdateChunks), updateInfo, deleteInfo);
nullColumn->metadataDA->update(nodeGroupIdx, state.nullState->metadata);
} else if (didInPlaceCommit) {
// Out-of-place commits also commit the null chunk out of place,
// so we only need to do a separate out of place commit for the null chunk if the
// main chunk did an in-place commit.
nullColumn->commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup,
getNullChunkCollection(localInsertChunks), insertInfo,
getNullChunkCollection(localUpdateChunks), updateInfo, deleteInfo);
}
}
prepareCommitForExistingChunk(transaction, state, localInsertChunks, insertInfo,
localUpdateChunks, updateInfo, deleteInfo);
}
}

Expand All @@ -657,31 +625,49 @@ void Column::prepareCommitForChunk(Transaction* transaction, node_group_idx_t no
commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets, chunk,
startSrcOffset);
} else {
bool didInPlaceCommit = false;
// If this is not a new node group, we should first check if we can perform in-place commit.
ChunkState state;
initChunkState(transaction, nodeGroupIdx, state);
if (canCommitInPlace(state, dstOffsets, chunk, startSrcOffset)) {
commitColumnChunkInPlace(state, dstOffsets, chunk, startSrcOffset);
didInPlaceCommit = true;
KU_ASSERT(sanityCheckForWrites(state.metadata, dataType));
metadataDA->update(nodeGroupIdx, state.metadata);
} else {
commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets,
chunk, startSrcOffset);
}
prepareCommitForExistingChunk(transaction, state, dstOffsets, chunk, startSrcOffset);
}
}

void Column::prepareCommitForExistingChunk(Transaction* transaction, Column::ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) {
// If this is not a new node group, we should first check if we can perform in-place commit.
if (canCommitInPlace(state, localInsertChunks, insertInfo, localUpdateChunks, updateInfo)) {
commitLocalChunkInPlace(state, localInsertChunks, insertInfo, localUpdateChunks, updateInfo,
deleteInfo);
KU_ASSERT(sanityCheckForWrites(state.metadata, dataType));
metadataDA->update(state.nodeGroupIdx, state.metadata);
if (nullColumn) {
KU_ASSERT(state.nullState);
if (nullColumn->canCommitInPlace(*state.nullState, dstOffsets, chunk->getNullChunk(),
startSrcOffset)) {
nullColumn->commitColumnChunkInPlace(*state.nullState, dstOffsets,
chunk->getNullChunk(), startSrcOffset);
nullColumn->metadataDA->update(nodeGroupIdx, state.nullState->metadata);
} else if (didInPlaceCommit) {
nullColumn->commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup,
dstOffsets, chunk->getNullChunk(), startSrcOffset);
}
auto nullInsertChunks = getNullChunkCollection(localInsertChunks);
auto nullUpdateChunks = getNullChunkCollection(localUpdateChunks);
nullColumn->prepareCommitForExistingChunk(transaction, *state.nullState,
nullInsertChunks, insertInfo, nullUpdateChunks, updateInfo, deleteInfo);
}
} else {
commitLocalChunkOutOfPlace(transaction, state.nodeGroupIdx, false /*isNewNodeGroup*/,
localInsertChunks, insertInfo, localUpdateChunks, updateInfo, deleteInfo);
}
}

// Base implementation of the offset-vector overload for a chunk that already exists; `state`
// must have been initialized by the caller.
//  - If canCommitInPlace(...) accepts the write, the values are committed in place, the
//    metadata entry for state.nodeGroupIdx is refreshed, and the null column (when present)
//    is taken through the same procedure using chunk->getNullChunk() and *state.nullState.
//  - Otherwise the chunk is committed out of place with isNewNodeGroup = false.
void Column::prepareCommitForExistingChunk(Transaction* transaction, ChunkState& state,
const std::vector<offset_t>& dstOffsets, ColumnChunk* chunk, offset_t startSrcOffset) {
if (canCommitInPlace(state, dstOffsets, chunk, startSrcOffset)) {
commitColumnChunkInPlace(state, dstOffsets, chunk, startSrcOffset);
// Assertion on the metadata produced by the in-place write, checked before it is published.
KU_ASSERT(sanityCheckForWrites(state.metadata, dataType));
metadataDA->update(state.nodeGroupIdx, state.metadata);
if (nullColumn) {
// The null column makes its own in-place vs. out-of-place decision.
// NOTE(review): state.nullState is dereferenced without the KU_ASSERT(state.nullState)
// guard that appears before the null-column handling elsewhere in this file — confirm
// it is always set when nullColumn is non-null.
nullColumn->prepareCommitForExistingChunk(transaction, *state.nullState, dstOffsets,
chunk->getNullChunk(), startSrcOffset);
}
} else {
// NOTE(review): the null column is not handled separately on this path; presumably the
// out-of-place commit also rewrites the null chunk — confirm in commitColumnChunkOutOfPlace.
commitColumnChunkOutOfPlace(transaction, state.nodeGroupIdx, false /*isNewNodeGroup*/,
dstOffsets, chunk, startSrcOffset);
}
}

Expand Down Expand Up @@ -762,10 +748,9 @@ bool Column::canCommitInPlace(const ChunkState& state,
return true;
}

void Column::commitLocalChunkInPlace(Transaction*, ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t&) {
// In-place commit of local changes to the chunk described by `state`: the local update chunks
// are applied first, then the local insert chunks. The trailing offset_set_t parameter is
// unnamed and not used by this implementation.
void Column::commitLocalChunkInPlace(ChunkState& state, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo, const offset_set_t&) {
applyLocalChunkToColumn(state, localUpdateChunks, updateInfo);
applyLocalChunkToColumn(state, localInsertChunks, insertInfo);
}
Expand Down
Loading
Loading