Skip to content

Commit

Permalink
List offset Column Refactor (#3219)
Browse files Browse the repository at this point in the history
 rework list offset commit
  • Loading branch information
hououou committed Apr 8, 2024
1 parent 7cb6fab commit c6e62e9
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 34 deletions.
12 changes: 12 additions & 0 deletions src/include/storage/store/list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ class ListColumn : public Column {
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset) override;

void prepareCommitForOffsetChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset);

void commitOffsetColumnChunkOutOfPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset);

void commitOffsetColumnChunkInPlace(common::node_group_idx_t nodeGroupIdx,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t srcOffset);

private:
std::unique_ptr<Column> sizeColumn;
std::unique_ptr<Column> dataColumn;
Expand Down
112 changes: 78 additions & 34 deletions src/storage/store/list_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "storage/store/list_column_chunk.h"
#include "storage/store/null_column.h"
#include <bit>

using namespace kuzu::common;
using namespace kuzu::transaction;

Expand Down Expand Up @@ -346,47 +345,92 @@ void ListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_
if (isNewNodeGroup) {
commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets, chunk,
startSrcOffset);
return;
}
// we first check if we can in place commit data column chunk
// if we can not in place commit data column chunk, we will out place commit offset/size/data
// column chunk otherwise, we commit data column chunk in place and separately commit
// offset/size column chunk
auto listChunk = ku_dynamic_cast<ColumnChunk*, ListColumnChunk*>(chunk);
auto dataColumnSize = dataColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues;
auto dataColumnChunk = listChunk->getDataColumnChunk();
auto numListsToAppend = std::min(chunk->getNumValues(), (uint64_t)dstOffsets.size());
auto dataSize = 0u;
auto startListOffset = listChunk->getListStartOffset(startSrcOffset);
std::vector<common::offset_t> dstOffsetsInDataColumn;
for (auto i = 0u; i < numListsToAppend; i++) {
for (auto j = 0u; j < listChunk->getListSize(startSrcOffset + i); j++) {
dstOffsetsInDataColumn.push_back(dataColumnSize + dataSize++);
}
}
bool dataColumnCanCommitInPlace = dataColumn->canCommitInPlace(transaction, nodeGroupIdx,
dstOffsetsInDataColumn, dataColumnChunk, startListOffset);
if (!dataColumnCanCommitInPlace) {
commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, dstOffsets, chunk,
startSrcOffset);
} else {
// we separate the commit into three parts: offset chunk commit, size column chunk commit,
// data column chunk
auto listChunk = ku_dynamic_cast<ColumnChunk*, ListColumnChunk*>(chunk);
dataColumn->commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup,
dstOffsetsInDataColumn, dataColumnChunk, startListOffset);
sizeColumn->prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets,
listChunk->getSizeColumnChunk(), startSrcOffset);
auto dataColumnSize =
dataColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues;
auto dataColumnChunk = listChunk->getDataColumnChunk();
auto numListsToAppend = std::min(chunk->getNumValues(), (uint64_t)dstOffsets.size());
auto dataSize = 0u;
auto startListOffset = listChunk->getListStartOffset(startSrcOffset);
std::vector<common::offset_t> dstOffsetsInDataColumn;
for (auto i = 0u; i < numListsToAppend; i++) {
for (auto j = 0u; j < listChunk->getListSize(startSrcOffset + i); j++) {
dstOffsetsInDataColumn.push_back(dataColumnSize + dataSize++);
}
}
dataColumn->prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsetsInDataColumn,
dataColumnChunk, startListOffset);
// we need to update the offset since we do not do in-place list data update but append data
// in the end of list data column we need to plus to original data column size to get the
// new offset
// TODO(Jiamin): A better way is to store the offset in a offset column, just like size
// column. Then we can reuse prepareCommitForChunk interface for offset column.
auto offsetChunkMeta = getMetadata(nodeGroupIdx, transaction->getType());
auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(*dataType.copy(),
enableCompression, 1.5 * std::bit_ceil(offsetChunkMeta.numValues + dstOffsets.size()));
Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get());
for (auto i = 0u; i < numListsToAppend; i++) {
auto listEndOffset = listChunk->getListEndOffset(startSrcOffset + i);
auto isNull = listChunk->getNullChunk()->isNull(startSrcOffset + i);
offsetColumnChunk->setValue<offset_t>(dataColumnSize + listEndOffset, dstOffsets[i]);
offsetColumnChunk->getNullChunk()->setNull(dstOffsets[i], isNull);
chunk->setValue<offset_t>(dataColumnSize + listEndOffset, startSrcOffset + i);
}
prepareCommitForOffsetChunk(transaction, nodeGroupIdx, dstOffsets, chunk, startSrcOffset);
}
}

void ListColumn::prepareCommitForOffsetChunk(Transaction* transaction,
node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, offset_t startSrcOffset) {
metadataDA->prepareCommit();
bool didInPlaceCommit = false;
if (canCommitInPlace(transaction, nodeGroupIdx, dstOffsets, chunk, startSrcOffset)) {
commitOffsetColumnChunkInPlace(nodeGroupIdx, dstOffsets, chunk, startSrcOffset);
didInPlaceCommit = true;
} else {
commitOffsetColumnChunkOutOfPlace(transaction, nodeGroupIdx, dstOffsets, chunk,
startSrcOffset);
}
if (nullColumn) {
if (nullColumn->canCommitInPlace(transaction, nodeGroupIdx, dstOffsets,
chunk->getNullChunk(), startSrcOffset)) {
nullColumn->commitColumnChunkInPlace(nodeGroupIdx, dstOffsets, chunk->getNullChunk(),
startSrcOffset);
} else if (didInPlaceCommit) {
nullColumn->commitColumnChunkOutOfPlace(transaction, nodeGroupIdx, false, dstOffsets,
chunk->getNullChunk(), startSrcOffset);
}
auto offsetListChunk =
ku_dynamic_cast<ColumnChunk*, ListColumnChunk*>(offsetColumnChunk.get());
offsetListChunk->getSizeColumnChunk()->setNumValues(offsetColumnChunk->getNumValues());
Column::append(offsetColumnChunk.get(), nodeGroupIdx);
}
}

void ListColumn::commitOffsetColumnChunkInPlace(node_group_idx_t nodeGroupIdx,
const std::vector<offset_t>& dstOffsets, ColumnChunk* chunk, offset_t srcOffset) {
for (auto i = 0u; i < dstOffsets.size(); i++) {
Column::write(nodeGroupIdx, dstOffsets[i], chunk, srcOffset + i, 1 /* numValues */);
}
}

void ListColumn::commitOffsetColumnChunkOutOfPlace(Transaction* transaction,
node_group_idx_t nodeGroupIdx, const std::vector<offset_t>& dstOffsets, ColumnChunk* chunk,
offset_t startSrcOffset) {
auto listChunk = ku_dynamic_cast<ColumnChunk*, ListColumnChunk*>(chunk);
auto offsetChunkMeta = getMetadata(nodeGroupIdx, transaction->getType());
auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(*dataType.copy(),
enableCompression, 1.5 * std::bit_ceil(offsetChunkMeta.numValues + dstOffsets.size()));
Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get());
auto numListsToAppend = std::min(chunk->getNumValues(), (uint64_t)dstOffsets.size());
for (auto i = 0u; i < numListsToAppend; i++) {
auto listEndOffset = listChunk->getListEndOffset(startSrcOffset + i);
auto isNull = listChunk->getNullChunk()->isNull(startSrcOffset + i);
offsetColumnChunk->setValue<offset_t>(listEndOffset, dstOffsets[i]);
offsetColumnChunk->getNullChunk()->setNull(dstOffsets[i], isNull);
}
auto offsetListChunk = ku_dynamic_cast<ColumnChunk*, ListColumnChunk*>(offsetColumnChunk.get());
offsetListChunk->getSizeColumnChunk()->setNumValues(offsetColumnChunk->getNumValues());
Column::append(offsetColumnChunk.get(), nodeGroupIdx);
}

} // namespace storage
} // namespace kuzu

0 comments on commit c6e62e9

Please sign in to comment.