Skip to content

Commit

Permalink
add state for prepare commit of nested columns; simplify null column …
Browse files Browse the repository at this point in the history
…commit (#3398)
  • Loading branch information
ray6080 committed Apr 28, 2024
1 parent c4af99e commit 88d7436
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 240 deletions.
14 changes: 11 additions & 3 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,24 @@ class Column {
Column* getNullColumn();

virtual void prepareCommit();
virtual void prepareCommitForChunk(transaction::Transaction* transaction,
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo);
virtual void prepareCommitForChunk(transaction::Transaction* transaction,
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset);

virtual void prepareCommitForExistingChunk(transaction::Transaction* transaction,
ChunkState& state, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo);
virtual void prepareCommitForExistingChunk(transaction::Transaction* transaction,
ChunkState& state, const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset);

virtual void checkpointInMemory();
virtual void rollbackInMemory();

Expand Down Expand Up @@ -187,7 +195,7 @@ class Column {
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t srcOffset);

virtual void commitLocalChunkInPlace(transaction::Transaction* transaction, ChunkState& state,
virtual void commitLocalChunkInPlace(ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo);
Expand Down
11 changes: 3 additions & 8 deletions src/include/storage/store/list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,20 @@ class ListColumn final : public Column {
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup,
common::offset_t endOffsetInNodeGroup);

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
void prepareCommitForExistingChunk(transaction::Transaction* transaction, ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) override;
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
void prepareCommitForExistingChunk(transaction::Transaction* transaction, ChunkState& state,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset) override;

void prepareCommitForOffsetChunk(transaction::Transaction* transaction, ChunkState& offsetState,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset);
void commitOffsetColumnChunkOutOfPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
const ChunkState& offsetState, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset);
void commitOffsetColumnChunkInPlace(ChunkState& offsetChunk,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t srcOffset);

private:
std::unique_ptr<Column> sizeColumn;
Expand Down
7 changes: 3 additions & 4 deletions src/include/storage/store/null_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ class NullColumn final : public Column {
void write(ChunkState& state, common::offset_t offsetInChunk, ColumnChunk* data,
common::offset_t dataOffset, common::length_t numValues) override;

void commitLocalChunkInPlace(Transaction* transaction, ChunkState& state,
const ChunkCollection& localInsertChunk, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunk, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) override;
void commitLocalChunkInPlace(ChunkState& state, const ChunkCollection& localInsertChunk,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunk,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override;

private:
std::unique_ptr<ColumnChunk> getEmptyChunkForCommit(uint64_t capacity) override {
Expand Down
16 changes: 4 additions & 12 deletions src/include/storage/store/struct_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@ class StructColumn final : public Column {
void write(ChunkState& state, common::offset_t offsetInChunk, ColumnChunk* data,
common::offset_t dataOffset, common::length_t numValues) override;

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
const ChunkCollection& localInsertChunk, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunk, const offset_to_row_idx_t& updateInfo,
void prepareCommitForExistingChunk(transaction::Transaction* transaction, ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) override;
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
void prepareCommitForExistingChunk(transaction::Transaction* transaction, ChunkState& state,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset) override;

Expand All @@ -52,12 +50,6 @@ class StructColumn final : public Column {
void lookupInternal(transaction::Transaction* transaction, ChunkState& readState,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector) override;

bool canCommitInPlace(const ChunkState& state, const ChunkCollection& localInsertChunk,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunk,
const offset_to_row_idx_t& updateInfo) override;
bool canCommitInPlace(const ChunkState& state, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t dataOffset) override;

private:
static ChunkCollection getStructChildChunkCollection(const ChunkCollection& chunkCollection,
common::vector_idx_t childIdx);
Expand Down
101 changes: 43 additions & 58 deletions src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class SerialColumn final : public Column {
KU_UNREACHABLE;
}

void commitLocalChunkInPlace(Transaction*, ChunkState& state, const ChunkCollection&,
void commitLocalChunkInPlace(ChunkState& state, const ChunkCollection&,
const offset_to_row_idx_t& insertInfo, const ChunkCollection&,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo) override {
// Avoid unused parameter warnings during release build.
Expand Down Expand Up @@ -610,42 +610,10 @@ void Column::prepareCommitForChunk(Transaction* transaction, common::node_group_
commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, localInsertChunks,
insertInfo, localUpdateChunks, updateInfo, deleteInfo);
} else {
bool didInPlaceCommit = false;
ChunkState state;
initChunkState(transaction, nodeGroupIdx, state);
// If this is not a new node group, we should first check if we can perform in-place commit.
if (canCommitInPlace(state, localInsertChunks, insertInfo, localUpdateChunks, updateInfo)) {
commitLocalChunkInPlace(transaction, state, localInsertChunks, insertInfo,
localUpdateChunks, updateInfo, deleteInfo);
didInPlaceCommit = true;
KU_ASSERT(sanityCheckForWrites(state.metadata, dataType));
metadataDA->update(nodeGroupIdx, state.metadata);
} else {
commitLocalChunkOutOfPlace(transaction, state.nodeGroupIdx, isNewNodeGroup,
localInsertChunks, insertInfo, localUpdateChunks, updateInfo, deleteInfo);
}
// TODO(Guodong/Ben): The logic here on NullColumn is confusing as out-of-place commits and
// in-place commits handle it differently. See if we can unify them.
if (nullColumn) {
// Uses functions written for the null chunk which only access the localColumnChunk's
// null information
KU_ASSERT(state.nullState);
if (nullColumn->canCommitInPlace(*state.nullState,
getNullChunkCollection(localInsertChunks), insertInfo,
getNullChunkCollection(localUpdateChunks), updateInfo)) {
nullColumn->commitLocalChunkInPlace(transaction, *state.nullState,
getNullChunkCollection(localInsertChunks), insertInfo,
getNullChunkCollection(localUpdateChunks), updateInfo, deleteInfo);
nullColumn->metadataDA->update(nodeGroupIdx, state.nullState->metadata);
} else if (didInPlaceCommit) {
// Out-of-place commits also commit the null chunk out of place,
// so we only need to do a separate out of place commit for the null chunk if the
// main chunk did an in-place commit.
nullColumn->commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup,
getNullChunkCollection(localInsertChunks), insertInfo,
getNullChunkCollection(localUpdateChunks), updateInfo, deleteInfo);
}
}
prepareCommitForExistingChunk(transaction, state, localInsertChunks, insertInfo,
localUpdateChunks, updateInfo, deleteInfo);
}
}

Expand All @@ -657,31 +625,49 @@ void Column::prepareCommitForChunk(Transaction* transaction, node_group_idx_t no
commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets, chunk,
startSrcOffset);
} else {
bool didInPlaceCommit = false;
// If this is not a new node group, we should first check if we can perform in-place commit.
ChunkState state;
initChunkState(transaction, nodeGroupIdx, state);
if (canCommitInPlace(state, dstOffsets, chunk, startSrcOffset)) {
commitColumnChunkInPlace(state, dstOffsets, chunk, startSrcOffset);
didInPlaceCommit = true;
KU_ASSERT(sanityCheckForWrites(state.metadata, dataType));
metadataDA->update(nodeGroupIdx, state.metadata);
} else {
commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets,
chunk, startSrcOffset);
}
prepareCommitForExistingChunk(transaction, state, dstOffsets, chunk, startSrcOffset);
}
}

void Column::prepareCommitForExistingChunk(Transaction* transaction, Column::ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo) {
// If this is not a new node group, we should first check if we can perform in-place commit.
if (canCommitInPlace(state, localInsertChunks, insertInfo, localUpdateChunks, updateInfo)) {
commitLocalChunkInPlace(state, localInsertChunks, insertInfo, localUpdateChunks, updateInfo,
deleteInfo);
KU_ASSERT(sanityCheckForWrites(state.metadata, dataType));
metadataDA->update(state.nodeGroupIdx, state.metadata);
if (nullColumn) {
KU_ASSERT(state.nullState);
if (nullColumn->canCommitInPlace(*state.nullState, dstOffsets, chunk->getNullChunk(),
startSrcOffset)) {
nullColumn->commitColumnChunkInPlace(*state.nullState, dstOffsets,
chunk->getNullChunk(), startSrcOffset);
nullColumn->metadataDA->update(nodeGroupIdx, state.nullState->metadata);
} else if (didInPlaceCommit) {
nullColumn->commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup,
dstOffsets, chunk->getNullChunk(), startSrcOffset);
}
auto nullInsertChunks = getNullChunkCollection(localInsertChunks);
auto nullUpdateChunks = getNullChunkCollection(localUpdateChunks);
nullColumn->prepareCommitForExistingChunk(transaction, *state.nullState,
nullInsertChunks, insertInfo, nullUpdateChunks, updateInfo, deleteInfo);
}
} else {
commitLocalChunkOutOfPlace(transaction, state.nodeGroupIdx, false /*isNewNodeGroup*/,
localInsertChunks, insertInfo, localUpdateChunks, updateInfo, deleteInfo);
}
}

void Column::prepareCommitForExistingChunk(Transaction* transaction, ChunkState& state,
const std::vector<offset_t>& dstOffsets, ColumnChunk* chunk, offset_t startSrcOffset) {
if (canCommitInPlace(state, dstOffsets, chunk, startSrcOffset)) {
commitColumnChunkInPlace(state, dstOffsets, chunk, startSrcOffset);
KU_ASSERT(sanityCheckForWrites(state.metadata, dataType));
metadataDA->update(state.nodeGroupIdx, state.metadata);
if (nullColumn) {
nullColumn->prepareCommitForExistingChunk(transaction, *state.nullState, dstOffsets,
chunk->getNullChunk(), startSrcOffset);
}
} else {
commitColumnChunkOutOfPlace(transaction, state.nodeGroupIdx, false /*isNewNodeGroup*/,
dstOffsets, chunk, startSrcOffset);
}
}

Expand Down Expand Up @@ -762,10 +748,9 @@ bool Column::canCommitInPlace(const ChunkState& state,
return true;
}

void Column::commitLocalChunkInPlace(Transaction*, ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t&) {
void Column::commitLocalChunkInPlace(ChunkState& state, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo, const offset_set_t&) {
applyLocalChunkToColumn(state, localUpdateChunks, updateInfo);
applyLocalChunkToColumn(state, localInsertChunks, insertInfo);
}
Expand Down
Loading

0 comments on commit 88d7436

Please sign in to comment.