Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework VAR_LIST #3019

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ColumnChunk {

inline uint64_t getCapacity() const { return capacity; }
inline uint64_t getNumValues() const { return numValues; }
void setNumValues(uint64_t numValues_);
virtual void setNumValues(uint64_t numValues_);
virtual bool numValuesSanityCheck() const;
bool isCompressionEnabled() const { return enableCompression; }

Expand Down
32 changes: 15 additions & 17 deletions src/include/storage/store/var_list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ namespace kuzu {
namespace storage {

struct ListOffsetInfoInStorage {
common::offset_t prevNodeListOffset;
common::offset_t numTotal;
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors;
std::vector<std::unique_ptr<common::ValueVector>> sizeVectors;

ListOffsetInfoInStorage(common::offset_t prevNodeListOffset,
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors)
: prevNodeListOffset{prevNodeListOffset}, offsetVectors{std::move(offsetVectors)} {}
ListOffsetInfoInStorage(common::offset_t numTotal,
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors,
std::vector<std::unique_ptr<common::ValueVector>> sizeVectors)
: numTotal{numTotal}, offsetVectors{std::move(offsetVectors)}, sizeVectors{std::move(
sizeVectors)} {}

common::offset_t getListOffset(uint64_t nodePos) const;

inline uint64_t getListLength(uint64_t nodePos) const {
KU_ASSERT(getListOffset(nodePos + 1) >= getListOffset(nodePos));
return getListOffset(nodePos + 1) - getListOffset(nodePos);
}
uint64_t getListLength(uint64_t nodePos) const;
common::offset_t getListEndOffset(uint64_t nodePos) const;
common::offset_t getListStartOffset(uint64_t nodePos) const;
};

class VarListColumn : public Column {
Expand Down Expand Up @@ -69,13 +69,6 @@ class VarListColumn : public Column {
void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override;

private:
inline common::offset_t readListOffsetInStorage(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup) {
return offsetInNodeGroup == 0 ?
0 :
readOffset(transaction, nodeGroupIdx, offsetInNodeGroup - 1);
}

void scanUnfiltered(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::ValueVector* resultVector,
const ListOffsetInfoInStorage& listOffsetInfoInStorage);
Expand All @@ -102,12 +95,17 @@ class VarListColumn : public Column {

common::offset_t readOffset(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);

uint64_t readSize(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t offsetInNodeGroup);

ListOffsetInfoInStorage getListOffsetInfoInStorage(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup,
common::offset_t endOffsetInNodeGroup,
const std::shared_ptr<common::DataChunkState>& state);

private:
std::unique_ptr<Column> sizeColumn;
std::unique_ptr<Column> dataColumn;
};

Expand Down
35 changes: 30 additions & 5 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ class VarListColumnChunk : public ColumnChunk {
return varListDataColumnChunk->dataColumnChunk.get();
}

// Non-owning access to the child chunk that stores one size value per list entry.
// Ownership stays with this VarListColumnChunk.
inline ColumnChunk* getSizeColumnChunk() const {
    return sizeColumnChunk.get();
}

void resetToEmpty() final;

inline void setNumValues(uint64_t numValues_) final {
ColumnChunk::setNumValues(numValues_);
sizeColumnChunk->setNumValues(numValues_);
}

void append(common::ValueVector* vector, common::SelectionVector& selVector) final;
// Note: `write` assumes that no `append` will be called afterward.
void write(common::ValueVector* vector, common::offset_t offsetInVector,
Expand All @@ -51,12 +58,32 @@ class VarListColumnChunk : public ColumnChunk {
varListDataColumnChunk->resizeBuffer(numValues);
}

inline void resize(uint64_t newCapacity) final {
ColumnChunk::resize(newCapacity);
sizeColumnChunk->resize(newCapacity);
}

void finalize() final;

inline common::offset_t getListOffset(common::offset_t offset) const {
return offset == 0 ? 0 : getValue<uint64_t>(offset - 1);
inline common::offset_t getListStartOffset(common::offset_t offset) const {
if (numValues == 0)
hououou marked this conversation as resolved.
Show resolved Hide resolved
return 0;
return offset == numValues ? getListEndOffset(offset - 1) :
getListEndOffset(offset) - getListLen(offset);
}

// Returns the value stored in this chunk at position `offset`, i.e. the end offset of
// that list. `offset` must be smaller than the number of values (asserted).
inline common::offset_t getListEndOffset(common::offset_t offset) const {
    KU_ASSERT(offset < numValues);
    const auto endOffset = getValue<uint64_t>(offset);
    return endOffset;
}

// Returns the list length recorded in the size child chunk at position `offset`.
// `offset` must be smaller than the size chunk's number of values (asserted).
inline uint64_t getListLen(common::offset_t offset) const {
    const auto* sizeChunk = sizeColumnChunk.get();
    KU_ASSERT(offset < sizeChunk->getNumValues());
    const auto listLen = sizeChunk->getValue<uint64_t>(offset);
    return listLen;
}

void resetOffset();

protected:
void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector);

Expand All @@ -74,14 +101,12 @@ class VarListColumnChunk : public ColumnChunk {
}
indicesColumnChunk->setNumValues(numValues);
}
inline uint64_t getListLen(common::offset_t offset) const {
return getListOffset(offset + 1) - getListOffset(offset);
}

void resetFromOtherChunk(VarListColumnChunk* other);
void appendNullList();

protected:
std::unique_ptr<ColumnChunk> sizeColumnChunk;
std::unique_ptr<VarListDataColumnChunk> varListDataColumnChunk;
// The following is needed to write var list to random positions in the column chunk.
// We first append var list to the end of the column chunk. Then use indicesColumnChunk to track
Expand Down
2 changes: 2 additions & 0 deletions src/storage/stats/table_statistics_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ std::unique_ptr<MetadataDAHInfo> TablesStatistics::createMetadataDAHInfo(
}
} break;
case PhysicalTypeID::VAR_LIST: {
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*LogicalType::INT64(), metadataFH, bm, wal));
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*VarListType::getChildType(&dataType), metadataFH, bm, wal));
} break;
Expand Down
105 changes: 75 additions & 30 deletions src/storage/store/var_list_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,23 @@
namespace kuzu {
namespace storage {

offset_t ListOffsetInfoInStorage::getListOffset(uint64_t nodePos) const {
if (nodePos == 0) {
return prevNodeListOffset;
} else {
auto offsetVector = offsetVectors[(nodePos - 1) / DEFAULT_VECTOR_CAPACITY].get();
return offsetVector->getValue<offset_t>((nodePos - 1) % DEFAULT_VECTOR_CAPACITY);
// Returns the offset at which the list at `nodePos` starts.
//  - With no entries at all (numTotal == 0) the result is 0.
//  - For nodePos == numTotal (one past the last entry) the result is the end offset of
//    the last entry.
//  - Otherwise the start is derived as end offset minus list length.
offset_t ListOffsetInfoInStorage::getListStartOffset(uint64_t nodePos) const {
    if (numTotal == 0) {
        return 0;
    }
    if (nodePos == numTotal) {
        return getListEndOffset(nodePos - 1);
    }
    return getListEndOffset(nodePos) - getListLength(nodePos);
}

offset_t ListOffsetInfoInStorage::getListEndOffset(uint64_t nodePos) const {

Check warning on line 22 in src/storage/store/var_list_column.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/store/var_list_column.cpp#L22

Added line #L22 was not covered by tests
auto offsetVector = offsetVectors[(nodePos) / DEFAULT_VECTOR_CAPACITY].get();
return offsetVector->getValue<offset_t>((nodePos) % DEFAULT_VECTOR_CAPACITY);
}

// Returns the list length stored for `nodePos`. Sizes are kept in a sequence of vectors
// of DEFAULT_VECTOR_CAPACITY slots each, so the position is split into a vector index
// and a slot within that vector.
uint64_t ListOffsetInfoInStorage::getListLength(uint64_t nodePos) const {
    KU_ASSERT(nodePos < sizeVectors.size() * common::DEFAULT_VECTOR_CAPACITY);
    const auto vectorIdx = nodePos / common::DEFAULT_VECTOR_CAPACITY;
    const auto posInVector = nodePos % common::DEFAULT_VECTOR_CAPACITY;
    return sizeVectors[vectorIdx]->getValue<uint64_t>(posInVector);
}

VarListColumn::VarListColumn(std::string name, LogicalType dataType,
Expand All @@ -26,9 +36,13 @@
RWPropertyStats propertyStatistics, bool enableCompression)
: Column{name, std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal,
transaction, propertyStatistics, enableCompression, true /* requireNullColumn */} {
auto sizeColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::OFFSET, "");
auto dataColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::DATA, "");
sizeColumn = ColumnFactory::createColumn(sizeColName, *LogicalType::INT64(),
*metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction,
propertyStatistics, enableCompression);
dataColumn = ColumnFactory::createColumn(dataColName,
*VarListType::getChildType(&this->dataType)->copy(), *metaDAHeaderInfo.childrenInfos[0],
*VarListType::getChildType(&this->dataType)->copy(), *metaDAHeaderInfo.childrenInfos[1],
dataFH, metadataFH, bufferManager, wal, transaction, propertyStatistics, enableCompression);
}

Expand All @@ -53,9 +67,9 @@
listOffsetInVector += length;
}
ListVector::resizeDataVector(resultVector, listOffsetInVector);
dataColumn->scan(transaction, nodeGroupIdx, listOffsetInfoInStorage.getListOffset(0),
listOffsetInfoInStorage.getListOffset(numValues), ListVector::getDataVector(resultVector),
offsetToWriteListData);
auto dataVector = ListVector::getDataVector(resultVector);
dataColumn->scan(transaction, nodeGroupIdx, listOffsetInfoInStorage.getListStartOffset(0),
listOffsetInfoInStorage.getListStartOffset(numValues), dataVector, offsetToWriteListData);
}

void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx,
Expand All @@ -64,14 +78,17 @@
columnChunk->setNumValues(0);
} else {
Column::scan(transaction, nodeGroupIdx, columnChunk, startOffset, endOffset);
// TODO: FIX-ME.
auto sizeColumnChunk =
ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(columnChunk)->getSizeColumnChunk();
sizeColumn->scan(transaction, nodeGroupIdx, sizeColumnChunk, startOffset, endOffset);
auto varListColumnChunk = ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(columnChunk);
auto startVarListOffset = varListColumnChunk->getListOffset(0);
auto endVarListOffset = varListColumnChunk->getListOffset(columnChunk->getNumValues());
auto numElements = endVarListOffset - startVarListOffset + 1;
auto startVarListOffset = varListColumnChunk->getListStartOffset(0);
auto endVarListOffset = varListColumnChunk->getListStartOffset(columnChunk->getNumValues());
auto numElements = endVarListOffset - startVarListOffset;
varListColumnChunk->resizeDataColumnChunk(std::bit_ceil(numElements));
dataColumn->scan(transaction, nodeGroupIdx, varListColumnChunk->getDataColumnChunk(),
startVarListOffset, endVarListOffset);
varListColumnChunk->resetOffset();
}
}

Expand All @@ -82,6 +99,7 @@
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset);
auto startNodeOffsetInGroup =
startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
KU_ASSERT(resultVector->state);
auto listOffsetInfoInStorage =
getListOffsetInfoInStorage(transaction, nodeGroupIdx, startNodeOffsetInGroup,
startNodeOffsetInGroup + nodeIDVector->state->getOriginalSize(), resultVector->state);
Expand All @@ -96,21 +114,24 @@
ValueVector* resultVector, uint32_t posInVector) {
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto nodeOffsetInGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
auto listOffset = readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto length = readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup + 1) -
readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto listEndOffset = readOffset(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto length = readSize(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto listOffset = listEndOffset - length;
auto offsetInVector = posInVector == 0 ? 0 : resultVector->getValue<offset_t>(posInVector - 1);
resultVector->setValue(posInVector, list_entry_t{offsetInVector, length});
ListVector::resizeDataVector(resultVector, offsetInVector + length);
auto dataVector = ListVector::getDataVector(resultVector);
dataColumn->scan(transaction, StorageUtils::getNodeGroupIdx(nodeOffset), listOffset,
listOffset + length, ListVector::getDataVector(resultVector), offsetInVector);
listOffset + length, dataVector, offsetInVector);
}

void VarListColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) {
KU_ASSERT(columnChunk->getDataType().getPhysicalType() == dataType.getPhysicalType());
Column::append(columnChunk, nodeGroupIdx);
auto dataColumnChunk =
ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(columnChunk)->getDataColumnChunk();
auto varListColumnChunk = ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(columnChunk);
Column::append(varListColumnChunk, nodeGroupIdx);
auto sizeColumnChunk = varListColumnChunk->getSizeColumnChunk();
sizeColumn->append(sizeColumnChunk, nodeGroupIdx);
auto dataColumnChunk = varListColumnChunk->getDataColumnChunk();
dataColumn->append(dataColumnChunk, nodeGroupIdx);
}

Expand All @@ -124,10 +145,11 @@
offsetInVector += listLen;
}
ListVector::resizeDataVector(resultVector, offsetInVector);
auto startListOffsetInStorage = listOffsetInfoInStorage.getListOffset(0);
auto endListOffsetInStorage = listOffsetInfoInStorage.getListOffset(numValuesToScan);
auto startListOffsetInStorage = listOffsetInfoInStorage.getListStartOffset(0);
auto endListOffsetInStorage = listOffsetInfoInStorage.getListStartOffset(numValuesToScan);
auto dataVector = ListVector::getDataVector(resultVector);
dataColumn->scan(transaction, nodeGroupIdx, startListOffsetInStorage, endListOffsetInStorage,
ListVector::getDataVector(resultVector), 0 /* offsetInVector */);
dataVector, 0 /* offsetInVector */);
}

void VarListColumn::scanFiltered(Transaction* transaction, node_group_idx_t nodeGroupIdx,
Expand All @@ -143,21 +165,24 @@
listOffset = 0;
for (auto i = 0u; i < resultVector->state->selVector->selectedSize; i++) {
auto pos = resultVector->state->selVector->selectedPositions[i];
auto startOffsetInStorageToScan = listOffsetInfoInStorage.getListOffset(pos);
auto endOffsetInStorageToScan = listOffsetInfoInStorage.getListOffset(pos + 1);
auto startOffsetInStorageToScan = listOffsetInfoInStorage.getListStartOffset(pos);
auto endOffsetInStorageToScan = listOffsetInfoInStorage.getListStartOffset(pos + 1);
auto dataVector = ListVector::getDataVector(resultVector);
dataColumn->scan(transaction, nodeGroupIdx, startOffsetInStorageToScan,
endOffsetInStorageToScan, ListVector::getDataVector(resultVector), listOffset);
endOffsetInStorageToScan, dataVector, listOffset);
listOffset += resultVector->getValue<list_entry_t>(pos).size;
}
}

void VarListColumn::checkpointInMemory() {
Column::checkpointInMemory();
sizeColumn->checkpointInMemory();
dataColumn->checkpointInMemory();
}

void VarListColumn::rollbackInMemory() {
Column::rollbackInMemory();
sizeColumn->rollbackInMemory();
dataColumn->rollbackInMemory();
}

Expand All @@ -175,29 +200,49 @@
return value;
}

// Reads the single size value stored at `offsetInNodeGroup` for node group `nodeGroupIdx`.
// The chunk metadata is taken from `sizeColumn`; the position is translated into a page
// cursor relative to the chunk's first page, and exactly one value is read from that page.
//
// NOTE(review): the values-per-page count is computed with this column's `dataType`, and
// the page is read through this column's `readFromPage`/`readToPageFunc`, while the
// metadata belongs to `sizeColumn`. Confirm these agree with the size column's layout.
uint64_t VarListColumn::readSize(
    Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) {
    auto sizeChunkMeta = sizeColumn->getMetadataDA()->get(nodeGroupIdx, transaction->getType());
    auto cursor = PageUtils::getPageCursorForPos(offsetInNodeGroup,
        sizeChunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType));
    // The cursor is relative to the chunk; shift it by the chunk's starting page.
    cursor.pageIdx += sizeChunkMeta.pageIdx;
    uint64_t listSize;
    readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void {
        readToPageFunc(frame, cursor, reinterpret_cast<uint8_t*>(&listSize),
            0 /* posInVector */, 1 /* numValuesToRead */, sizeChunkMeta.compMeta);
    });
    return listSize;
}

// Collects the per-entry list offsets and list sizes for node-group positions
// [startOffsetInNodeGroup, endOffsetInNodeGroup) of `nodeGroupIdx`.
// Offsets are read with Column::scan and sizes with sizeColumn->scan, in batches of at
// most DEFAULT_VECTOR_CAPACITY values; each batch lands in its own temporary ValueVector
// that shares `state`. The vectors are handed to the returned ListOffsetInfoInStorage.
ListOffsetInfoInStorage VarListColumn::getListOffsetInfoInStorage(Transaction* transaction,
node_group_idx_t nodeGroupIdx, offset_t startOffsetInNodeGroup, offset_t endOffsetInNodeGroup,
const std::shared_ptr<DataChunkState>& state) {
auto numOffsetsToRead = endOffsetInNodeGroup - startOffsetInNodeGroup;
// Ceiling division: one extra vector for a trailing partial batch.
auto numOffsetVectors = numOffsetsToRead / DEFAULT_VECTOR_CAPACITY +
(numOffsetsToRead % DEFAULT_VECTOR_CAPACITY ? 1 : 0);
std::vector<std::unique_ptr<ValueVector>> offsetVectors;
std::vector<std::unique_ptr<ValueVector>> sizeVectors;
offsetVectors.reserve(numOffsetVectors);
sizeVectors.reserve(numOffsetVectors);
uint64_t numOffsetsRead = 0;
for (auto i = 0u; i < numOffsetVectors; i++) {
auto offsetVector = std::make_unique<ValueVector>(LogicalTypeID::INT64);
auto sizeVector = std::make_unique<ValueVector>(LogicalTypeID::INT64);
// The last batch may hold fewer than DEFAULT_VECTOR_CAPACITY values.
auto numOffsetsToReadInCurBatch =
std::min(numOffsetsToRead - numOffsetsRead, DEFAULT_VECTOR_CAPACITY);
offsetVector->setState(state);
sizeVector->setState(state);
// Both scans cover the same position range and write from slot 0 of their vector.
Column::scan(transaction, nodeGroupIdx, startOffsetInNodeGroup + numOffsetsRead,
startOffsetInNodeGroup + numOffsetsRead + numOffsetsToReadInCurBatch,
offsetVector.get(), 0 /* offsetInVector */);
sizeColumn->scan(transaction, nodeGroupIdx, startOffsetInNodeGroup + numOffsetsRead,
startOffsetInNodeGroup + numOffsetsRead + numOffsetsToReadInCurBatch, sizeVector.get(),
0 /* offsetInVector */);
offsetVectors.push_back(std::move(offsetVector));
sizeVectors.push_back(std::move(sizeVector));
numOffsetsRead += numOffsetsToReadInCurBatch;
}
// NOTE(review): two return statements follow back to back, so the second one cannot
// execute as written. This text looks like a diff rendering in which pre-change and
// post-change lines appear together — confirm which return belongs in the file.
auto prevNodeListOffsetInStorage =
readListOffsetInStorage(transaction, nodeGroupIdx, startOffsetInNodeGroup);
return {prevNodeListOffsetInStorage, std::move(offsetVectors)};
return {numOffsetsToRead, std::move(offsetVectors), std::move(sizeVectors)};
}

} // namespace storage
Expand Down
Loading
Loading