Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework offset column of LIST #3198

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/include/storage/store/list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ class ListColumn : public Column {
void checkpointInMemory() final;
void rollbackInMemory() final;

common::offset_t readOffset(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);

common::list_size_t readSize(transaction::Transaction* transaction,
common::offset_t readOffsetSize(Column* column, transaction::Transaction* transaction,
hououou marked this conversation as resolved.
Show resolved Hide resolved
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);

ListOffsetSizeInfo getListOffsetSizeInfo(transaction::Transaction* transaction,
Expand All @@ -99,6 +96,7 @@ class ListColumn : public Column {
ColumnChunk* chunk, common::offset_t startSrcOffset) override;

private:
std::unique_ptr<Column> offsetColumn;
std::unique_ptr<Column> sizeColumn;
std::unique_ptr<Column> dataColumn;
};
Expand Down
5 changes: 5 additions & 0 deletions src/include/storage/store/list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ class ListColumnChunk final : public ColumnChunk {

// Returns a non-owning pointer to the chunk held in sizeColumnChunk.
inline ColumnChunk* getSizeColumnChunk() const {
    return sizeColumnChunk.get();
}

// Returns a non-owning pointer to the chunk held in offsetColumnChunk.
inline ColumnChunk* getOffsetColumnChunk() const {
    return offsetColumnChunk.get();
}

void resetToEmpty() override;

inline void setNumValues(uint64_t numValues_) override {
ColumnChunk::setNumValues(numValues_);
offsetColumnChunk->setNumValues(numValues_);
sizeColumnChunk->setNumValues(numValues_);
}

Expand All @@ -65,6 +68,7 @@ class ListColumnChunk final : public ColumnChunk {

inline void resize(uint64_t newCapacity) override {
ColumnChunk::resize(newCapacity);
offsetColumnChunk->resize(newCapacity);
sizeColumnChunk->resize(newCapacity);
}

Expand All @@ -90,6 +94,7 @@ class ListColumnChunk final : public ColumnChunk {
void appendNullList();

protected:
std::unique_ptr<ColumnChunk> offsetColumnChunk;
std::unique_ptr<ColumnChunk> sizeColumnChunk;
std::unique_ptr<ListDataColumnChunk> listDataColumnChunk;
// we use checkOffsetSortedAsc flag to indicate that we do not trigger random write
Expand Down
2 changes: 2 additions & 0 deletions src/storage/stats/table_statistics_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ std::unique_ptr<MetadataDAHInfo> TablesStatistics::createMetadataDAHInfo(
}
} break;
case PhysicalTypeID::LIST: {
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*LogicalType::UINT64(), metadataFH, bm, wal));
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*LogicalType::UINT32(), metadataFH, bm, wal));
metadataDAHInfo->childrenInfos.push_back(
Expand Down
79 changes: 33 additions & 46 deletions src/storage/store/list_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,18 @@ ListColumn::ListColumn(std::string name, LogicalType dataType,
RWPropertyStats propertyStatistics, bool enableCompression)
: Column{name, std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal,
transaction, propertyStatistics, enableCompression, true /* requireNullColumn */} {
auto sizeColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::OFFSET, "");
auto offsetColName =
StorageUtils::getColumnName(name, StorageUtils::ColumnType::OFFSET, "offset");
auto sizeColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::OFFSET, "size");
auto dataColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::DATA, "");
sizeColumn = ColumnFactory::createColumn(sizeColName, *LogicalType::UINT32(),
offsetColumn = ColumnFactory::createColumn(offsetColName, *LogicalType::UINT64(),
*metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction,
propertyStatistics, enableCompression);
sizeColumn = ColumnFactory::createColumn(sizeColName, *LogicalType::UINT32(),
*metaDAHeaderInfo.childrenInfos[1], dataFH, metadataFH, bufferManager, wal, transaction,
propertyStatistics, enableCompression);
dataColumn = ColumnFactory::createColumn(dataColName,
*ListType::getChildType(&this->dataType)->copy(), *metaDAHeaderInfo.childrenInfos[1],
*ListType::getChildType(&this->dataType)->copy(), *metaDAHeaderInfo.childrenInfos[2],
dataFH, metadataFH, bufferManager, wal, transaction, propertyStatistics, enableCompression);
}

Expand Down Expand Up @@ -107,7 +112,9 @@ void ListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx,
columnChunk->setNumValues(0);
} else {
auto listColumnChunk = ku_dynamic_cast<ColumnChunk*, ListColumnChunk*>(columnChunk);
Column::scan(transaction, nodeGroupIdx, columnChunk, startOffset, endOffset);
Column::scan(transaction, nodeGroupIdx, listColumnChunk, startOffset, endOffset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you've moved the offsets to offsetColumn, do we need to scan the list column itself?

offsetColumn->scan(transaction, nodeGroupIdx, listColumnChunk->getOffsetColumnChunk(),
startOffset, endOffset);
auto sizeColumnChunk = listColumnChunk->getSizeColumnChunk();
sizeColumn->scan(transaction, nodeGroupIdx, sizeColumnChunk, startOffset, endOffset);
auto resizeNumValues = listColumnChunk->getDataColumnChunk()->getNumValues();
Expand Down Expand Up @@ -173,11 +180,12 @@ void ListColumn::lookupValue(Transaction* transaction, offset_t nodeOffset,
ValueVector* resultVector, uint32_t posInVector) {
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto nodeOffsetInGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
auto listEndOffset = readOffset(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto size = readSize(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto listEndOffset =
readOffsetSize(offsetColumn.get(), transaction, nodeGroupIdx, nodeOffsetInGroup);
auto size = readOffsetSize(sizeColumn.get(), transaction, nodeGroupIdx, nodeOffsetInGroup);
auto listStartOffset = listEndOffset - size;
auto offsetInVector = posInVector == 0 ? 0 : resultVector->getValue<offset_t>(posInVector - 1);
resultVector->setValue(posInVector, list_entry_t{offsetInVector, size});
resultVector->setValue(posInVector, list_entry_t{offsetInVector, (list_size_t)size});
ListVector::resizeDataVector(resultVector, offsetInVector + size);
auto dataVector = ListVector::getDataVector(resultVector);
dataColumn->scan(transaction, StorageUtils::getNodeGroupIdx(nodeOffset), listStartOffset,
Expand All @@ -188,10 +196,9 @@ void ListColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) {
KU_ASSERT(columnChunk->getDataType().getPhysicalType() == dataType.getPhysicalType());
auto listColumnChunk = ku_dynamic_cast<ColumnChunk*, ListColumnChunk*>(columnChunk);
Column::append(listColumnChunk, nodeGroupIdx);
auto sizeColumnChunk = listColumnChunk->getSizeColumnChunk();
sizeColumn->append(sizeColumnChunk, nodeGroupIdx);
auto dataColumnChunk = listColumnChunk->getDataColumnChunk();
dataColumn->append(dataColumnChunk, nodeGroupIdx);
offsetColumn->append(listColumnChunk->getOffsetColumnChunk(), nodeGroupIdx);
sizeColumn->append(listColumnChunk->getSizeColumnChunk(), nodeGroupIdx);
dataColumn->append(listColumnChunk->getDataColumnChunk(), nodeGroupIdx);
}

void ListColumn::scanUnfiltered(Transaction* transaction, node_group_idx_t nodeGroupIdx,
Expand Down Expand Up @@ -248,39 +255,28 @@ void ListColumn::scanFiltered(Transaction* transaction, node_group_idx_t nodeGro

void ListColumn::prepareCommit() {
Column::prepareCommit();
offsetColumn->prepareCommit();
sizeColumn->prepareCommit();
dataColumn->prepareCommit();
}

void ListColumn::checkpointInMemory() {
Column::checkpointInMemory();
offsetColumn->checkpointInMemory();
sizeColumn->checkpointInMemory();
dataColumn->checkpointInMemory();
}

void ListColumn::rollbackInMemory() {
Column::rollbackInMemory();
offsetColumn->rollbackInMemory();
sizeColumn->rollbackInMemory();
dataColumn->rollbackInMemory();
}

offset_t ListColumn::readOffset(
Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) {
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
auto pageCursor = PageUtils::getPageCursorForPos(offsetInNodeGroup,
chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType));
pageCursor.pageIdx += chunkMeta.pageIdx;
offset_t value;
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readToPageFunc(frame, pageCursor, (uint8_t*)&value, 0 /* posInVector */,
1 /* numValuesToRead */, chunkMeta.compMeta);
});
return value;
}

list_size_t ListColumn::readSize(
Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) {
auto chunkMeta = sizeColumn->getMetadataDA()->get(nodeGroupIdx, transaction->getType());
offset_t ListColumn::readOffsetSize(Column* column, Transaction* transaction,
node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) {
auto chunkMeta = column->getMetadataDA()->get(nodeGroupIdx, transaction->getType());
auto pageCursor = PageUtils::getPageCursorForPos(offsetInNodeGroup,
chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType));
pageCursor.pageIdx += chunkMeta.pageIdx;
Expand All @@ -296,10 +292,10 @@ ListOffsetSizeInfo ListColumn::getListOffsetSizeInfo(Transaction* transaction,
node_group_idx_t nodeGroupIdx, offset_t startOffsetInNodeGroup, offset_t endOffsetInNodeGroup) {
auto numOffsetsToRead = endOffsetInNodeGroup - startOffsetInNodeGroup;
auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(
*common::LogicalType::INT64(), enableCompression, numOffsetsToRead);
*common::LogicalType::UINT64(), enableCompression, numOffsetsToRead);
auto sizeColumnChunk = ColumnChunkFactory::createColumnChunk(
*common::LogicalType::UINT32(), enableCompression, numOffsetsToRead);
Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get(), startOffsetInNodeGroup,
offsetColumn->scan(transaction, nodeGroupIdx, offsetColumnChunk.get(), startOffsetInNodeGroup,
endOffsetInNodeGroup);
sizeColumn->scan(transaction, nodeGroupIdx, sizeColumnChunk.get(), startOffsetInNodeGroup,
endOffsetInNodeGroup);
Expand Down Expand Up @@ -346,6 +342,7 @@ void ListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_
} else {
// We separate the commit into three parts: the offset column chunk commit, the size column
// chunk commit, and the data column chunk commit.
Column::prepareCommitForChunk(transaction, nodeGroupIdx, dstOffsets, chunk, startSrcOffset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here: what are we committing, given that (as I understand it) you've moved the offsets to offsetColumn?

auto listChunk = ku_dynamic_cast<ColumnChunk*, ListColumnChunk*>(chunk);
sizeColumn->prepareCommitForChunk(
transaction, nodeGroupIdx, dstOffsets, listChunk->getSizeColumnChunk(), startSrcOffset);
Expand All @@ -363,25 +360,15 @@ void ListColumn::prepareCommitForChunk(Transaction* transaction, node_group_idx_
}
dataColumn->prepareCommitForChunk(
transaction, nodeGroupIdx, dstOffsetsInDataColumn, dataColumnChunk, startListOffset);
// We need to update the offsets because list data is not updated in place: new data is appended
// to the end of the list data column, so the original data column size must be added to each
// list end offset to get the new offset.
// TODO(Jiamin): A better way is to store the offsets in an offset column, just like the size
// column. Then we can reuse the prepareCommitForChunk interface for the offset column.
auto offsetChunkMeta = getMetadata(nodeGroupIdx, transaction->getType());
auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(*dataType.copy(),
enableCompression, 1.5 * std::bit_ceil(offsetChunkMeta.numValues + dstOffsets.size()));
Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get());

auto offsetColumnChunk = listChunk->getOffsetColumnChunk();
for (auto i = 0u; i < numListsToAppend; i++) {
auto listEndOffset = listChunk->getListEndOffset(startSrcOffset + i);
auto isNull = listChunk->getNullChunk()->isNull(startSrcOffset + i);
offsetColumnChunk->setValue<offset_t>(dataColumnSize + listEndOffset, dstOffsets[i]);
offsetColumnChunk->getNullChunk()->setNull(dstOffsets[i], isNull);
offsetColumnChunk->setValue<offset_t>(
dataColumnSize + listEndOffset, startSrcOffset + i);
}
auto offsetListChunk =
ku_dynamic_cast<ColumnChunk*, ListColumnChunk*>(offsetColumnChunk.get());
offsetListChunk->getSizeColumnChunk()->setNumValues(offsetColumnChunk->getNumValues());
Column::append(offsetColumnChunk.get(), nodeGroupIdx);
offsetColumn->prepareCommitForChunk(
transaction, nodeGroupIdx, dstOffsets, offsetColumnChunk, startSrcOffset);
}
}

Expand Down
Loading
Loading