Skip to content

Commit

Permalink
var list rework
Browse files Browse the repository at this point in the history
  • Loading branch information
Kuzu CI committed Mar 11, 2024
1 parent dc9771f commit 6d40d16
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ColumnChunk {

inline uint64_t getCapacity() const { return capacity; }
inline uint64_t getNumValues() const { return numValues; }
void setNumValues(uint64_t numValues_);
virtual void setNumValues(uint64_t numValues_);
virtual bool numValuesSanityCheck() const;
bool isCompressionEnabled() const { return enableCompression; }

Expand Down
30 changes: 14 additions & 16 deletions src/include/storage/store/var_list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ namespace kuzu {
namespace storage {

struct ListOffsetInfoInStorage {
common::offset_t prevNodeListOffset;
common::offset_t numTotal;
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors;
std::vector<std::unique_ptr<common::ValueVector>> sizeVectors;

ListOffsetInfoInStorage(common::offset_t prevNodeListOffset,
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors)
: prevNodeListOffset{prevNodeListOffset}, offsetVectors{std::move(offsetVectors)} {}
ListOffsetInfoInStorage(common::offset_t numTotal,
std::vector<std::unique_ptr<common::ValueVector>> offsetVectors,
std::vector<std::unique_ptr<common::ValueVector>> sizeVectors)
: numTotal{numTotal}, offsetVectors{std::move(offsetVectors)}, sizeVectors{std::move(
sizeVectors)} {}

uint64_t getListLength(uint64_t nodePos) const;
common::offset_t getEndListOffset(uint64_t nodePos) const;
common::offset_t getListOffset(uint64_t nodePos) const;

inline uint64_t getListLength(uint64_t nodePos) const {
KU_ASSERT(getListOffset(nodePos + 1) >= getListOffset(nodePos));
return getListOffset(nodePos + 1) - getListOffset(nodePos);
}
};

class VarListColumn : public Column {
Expand Down Expand Up @@ -69,13 +69,6 @@ class VarListColumn : public Column {
void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override;

private:
inline common::offset_t readListOffsetInStorage(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup) {
return offsetInNodeGroup == 0 ?
0 :
readOffset(transaction, nodeGroupIdx, offsetInNodeGroup - 1);
}

void scanUnfiltered(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::ValueVector* resultVector,
const ListOffsetInfoInStorage& listOffsetInfoInStorage);
Expand All @@ -102,12 +95,17 @@ class VarListColumn : public Column {

common::offset_t readOffset(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);

uint64_t readSize(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t offsetInNodeGroup);

ListOffsetInfoInStorage getListOffsetInfoInStorage(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup,
common::offset_t endOffsetInNodeGroup,
const std::shared_ptr<common::DataChunkState>& state);

private:
std::unique_ptr<Column> sizeColumn;
std::unique_ptr<Column> dataColumn;
};

Expand Down
33 changes: 29 additions & 4 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,15 @@ class VarListColumnChunk : public ColumnChunk {
return varListDataColumnChunk->dataColumnChunk.get();
}

inline ColumnChunk* getSizeColumnChunk() const { return sizeColumnChunk.get(); }

void resetToEmpty() final;

inline void setNumValues(uint64_t numValues_) final {
ColumnChunk::setNumValues(numValues_);
sizeColumnChunk->setNumValues(numValues_);
}

void append(common::ValueVector* vector) final;
void appendOne(common::ValueVector* vector, common::vector_idx_t pos) final;
// Note: `write` assumes that no `append` will be called afterward.
Expand All @@ -51,12 +58,32 @@ class VarListColumnChunk : public ColumnChunk {
varListDataColumnChunk->resizeBuffer(numValues);
}

inline void resize(uint64_t newCapacity) final {
ColumnChunk::resize(newCapacity);
sizeColumnChunk->resize(newCapacity);
}

void finalize() final;

inline common::offset_t getListOffset(common::offset_t offset) const {
return offset == 0 ? 0 : getValue<uint64_t>(offset - 1);
if (numValues == 0)
return 0;
KU_ASSERT(offset <= numValues);
return offset == numValues ? getValue<uint64_t>(offset - 1) :
getValue<uint64_t>(offset) - getListLen(offset);
}

inline uint64_t getListLen(common::offset_t offset) const {
if (sizeColumnChunk->getNumValues() == 0)
return 0;
KU_ASSERT(offset <= sizeColumnChunk->getNumValues());
return offset == sizeColumnChunk->getNumValues() ?
0 :
sizeColumnChunk->getValue<uint64_t>(offset);
}

void resetOffset();

protected:
void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector);

Expand All @@ -74,14 +101,12 @@ class VarListColumnChunk : public ColumnChunk {
}
indicesColumnChunk->setNumValues(numValues);
}
inline uint64_t getListLen(common::offset_t offset) const {
return getListOffset(offset + 1) - getListOffset(offset);
}

void resetFromOtherChunk(VarListColumnChunk* other);
void appendNullList();

protected:
std::unique_ptr<ColumnChunk> sizeColumnChunk;
std::unique_ptr<VarListDataColumnChunk> varListDataColumnChunk;
// The following is needed to write var list to random positions in the column chunk.
// We first append var list to the end of the column chunk. Then use indicesColumnChunk to track
Expand Down
2 changes: 2 additions & 0 deletions src/storage/stats/table_statistics_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ std::unique_ptr<MetadataDAHInfo> TablesStatistics::createMetadataDAHInfo(
}
} break;
case PhysicalTypeID::VAR_LIST: {
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*LogicalType::INT64(), metadataFH, bm, wal));
metadataDAHInfo->childrenInfos.push_back(
createMetadataDAHInfo(*VarListType::getChildType(&dataType), metadataFH, bm, wal));
} break;
Expand Down
110 changes: 87 additions & 23 deletions src/storage/store/var_list_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,24 @@ namespace kuzu {
namespace storage {

offset_t ListOffsetInfoInStorage::getListOffset(uint64_t nodePos) const {
if (nodePos == 0) {
return prevNodeListOffset;
} else {
auto offsetVector = offsetVectors[(nodePos - 1) / DEFAULT_VECTOR_CAPACITY].get();
return offsetVector->getValue<offset_t>((nodePos - 1) % DEFAULT_VECTOR_CAPACITY);
}
return getEndListOffset(nodePos) - getListLength(nodePos);
}

offset_t ListOffsetInfoInStorage::getEndListOffset(uint64_t nodePos) const {
if (offsetVectors.size() == 0)
return offset_t(0);
if (nodePos == numTotal)
return getEndListOffset(nodePos - 1);
auto offsetVector = offsetVectors[(nodePos) / DEFAULT_VECTOR_CAPACITY].get();
return offsetVector->getValue<offset_t>((nodePos) % DEFAULT_VECTOR_CAPACITY);
}

uint64_t ListOffsetInfoInStorage::getListLength(uint64_t nodePos) const {
if (nodePos == numTotal)
return uint64_t(0);
KU_ASSERT(nodePos < sizeVectors.size() * common::DEFAULT_VECTOR_CAPACITY);
auto sizeVector = sizeVectors[nodePos / common::DEFAULT_VECTOR_CAPACITY].get();
return sizeVector->getValue<uint64_t>(nodePos % common::DEFAULT_VECTOR_CAPACITY);
}

VarListColumn::VarListColumn(std::string name, LogicalType dataType,
Expand All @@ -26,9 +38,13 @@ VarListColumn::VarListColumn(std::string name, LogicalType dataType,
RWPropertyStats propertyStatistics, bool enableCompression)
: Column{name, std::move(dataType), metaDAHeaderInfo, dataFH, metadataFH, bufferManager, wal,
transaction, propertyStatistics, enableCompression, true /* requireNullColumn */} {
auto sizeColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::OFFSET, "");
auto dataColName = StorageUtils::getColumnName(name, StorageUtils::ColumnType::DATA, "");
sizeColumn = ColumnFactory::createColumn(sizeColName, *LogicalType::INT64(),
*metaDAHeaderInfo.childrenInfos[0], dataFH, metadataFH, bufferManager, wal, transaction,
propertyStatistics, enableCompression);
dataColumn = ColumnFactory::createColumn(dataColName,
*VarListType::getChildType(&this->dataType)->copy(), *metaDAHeaderInfo.childrenInfos[0],
*VarListType::getChildType(&this->dataType)->copy(), *metaDAHeaderInfo.childrenInfos[1],
dataFH, metadataFH, bufferManager, wal, transaction, propertyStatistics, enableCompression);
}

Expand All @@ -39,6 +55,7 @@ void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx
// a bottleneck of the scan performance. We need to further optimize this.
nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector,
offsetInVector);
KU_ASSERT(resultVector->state);
auto listOffsetInfoInStorage = getListOffsetInfoInStorage(
transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector->state);
offset_t listOffsetInVector =
Expand All @@ -53,9 +70,13 @@ void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx
listOffsetInVector += length;
}
ListVector::resizeDataVector(resultVector, listOffsetInVector);
auto dataVector = ListVector::getDataVector(resultVector);
if (!dataVector->state) {
dataVector->setState(std::make_unique<DataChunkState>());
dataVector->state->selVector->resetSelectorToValuePosBuffer();
}
dataColumn->scan(transaction, nodeGroupIdx, listOffsetInfoInStorage.getListOffset(0),
listOffsetInfoInStorage.getListOffset(numValues), ListVector::getDataVector(resultVector),
offsetToWriteListData);
listOffsetInfoInStorage.getListOffset(numValues), dataVector, offsetToWriteListData);
}

void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx,
Expand All @@ -64,14 +85,17 @@ void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx
columnChunk->setNumValues(0);
} else {
Column::scan(transaction, nodeGroupIdx, columnChunk, startOffset, endOffset);
// TODO: FIX-ME.
auto sizeColumnChunk =
ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(columnChunk)->getSizeColumnChunk();
sizeColumn->scan(transaction, nodeGroupIdx, sizeColumnChunk, startOffset, endOffset);
auto varListColumnChunk = ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(columnChunk);
auto startVarListOffset = varListColumnChunk->getListOffset(0);
auto endVarListOffset = varListColumnChunk->getListOffset(columnChunk->getNumValues());
auto numElements = endVarListOffset - startVarListOffset + 1;
auto numElements = endVarListOffset - startVarListOffset;
varListColumnChunk->resizeDataColumnChunk(std::bit_ceil(numElements));
dataColumn->scan(transaction, nodeGroupIdx, varListColumnChunk->getDataColumnChunk(),
startVarListOffset, endVarListOffset);
varListColumnChunk->resetOffset();
}
}

Expand All @@ -82,6 +106,7 @@ void VarListColumn::scanInternal(
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(startNodeOffset);
auto startNodeOffsetInGroup =
startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
KU_ASSERT(resultVector->state);
auto listOffsetInfoInStorage =
getListOffsetInfoInStorage(transaction, nodeGroupIdx, startNodeOffsetInGroup,
startNodeOffsetInGroup + nodeIDVector->state->getOriginalSize(), resultVector->state);
Expand All @@ -96,21 +121,28 @@ void VarListColumn::lookupValue(Transaction* transaction, offset_t nodeOffset,
ValueVector* resultVector, uint32_t posInVector) {
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto nodeOffsetInGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
auto listOffset = readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto length = readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup + 1) -
readListOffsetInStorage(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto listEndOffset = readOffset(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto length = readSize(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto listOffset = listEndOffset - length;
auto offsetInVector = posInVector == 0 ? 0 : resultVector->getValue<offset_t>(posInVector - 1);
resultVector->setValue(posInVector, list_entry_t{offsetInVector, length});
ListVector::resizeDataVector(resultVector, offsetInVector + length);
auto dataVector = ListVector::getDataVector(resultVector);
if (!dataVector->state) {
dataVector->setState(std::make_unique<DataChunkState>());
dataVector->state->selVector->resetSelectorToValuePosBuffer();
}
dataColumn->scan(transaction, StorageUtils::getNodeGroupIdx(nodeOffset), listOffset,
listOffset + length, ListVector::getDataVector(resultVector), offsetInVector);
listOffset + length, dataVector, offsetInVector);
}

void VarListColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) {
KU_ASSERT(columnChunk->getDataType().getPhysicalType() == dataType.getPhysicalType());
Column::append(columnChunk, nodeGroupIdx);
auto dataColumnChunk =
ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(columnChunk)->getDataColumnChunk();
auto varListColumnChunk = ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(columnChunk);
Column::append(varListColumnChunk, nodeGroupIdx);
auto sizeColumnChunk = varListColumnChunk->getSizeColumnChunk();
sizeColumn->append(sizeColumnChunk, nodeGroupIdx);
auto dataColumnChunk = varListColumnChunk->getDataColumnChunk();
dataColumn->append(dataColumnChunk, nodeGroupIdx);
}

Expand All @@ -126,8 +158,13 @@ void VarListColumn::scanUnfiltered(Transaction* transaction, node_group_idx_t no
ListVector::resizeDataVector(resultVector, offsetInVector);
auto startListOffsetInStorage = listOffsetInfoInStorage.getListOffset(0);
auto endListOffsetInStorage = listOffsetInfoInStorage.getListOffset(numValuesToScan);
auto dataVector = ListVector::getDataVector(resultVector);
if (!dataVector->state) {
dataVector->setState(std::make_unique<DataChunkState>());
dataVector->state->selVector->resetSelectorToValuePosBuffer();
}
dataColumn->scan(transaction, nodeGroupIdx, startListOffsetInStorage, endListOffsetInStorage,
ListVector::getDataVector(resultVector), 0 /* offsetInVector */);
dataVector, 0 /* offsetInVector */);
}

void VarListColumn::scanFiltered(Transaction* transaction, node_group_idx_t nodeGroupIdx,
Expand All @@ -145,19 +182,26 @@ void VarListColumn::scanFiltered(Transaction* transaction, node_group_idx_t node
auto pos = resultVector->state->selVector->selectedPositions[i];
auto startOffsetInStorageToScan = listOffsetInfoInStorage.getListOffset(pos);
auto endOffsetInStorageToScan = listOffsetInfoInStorage.getListOffset(pos + 1);
auto dataVector = ListVector::getDataVector(resultVector);
if (!dataVector->state) {
dataVector->setState(std::make_unique<DataChunkState>());
dataVector->state->selVector->resetSelectorToValuePosBuffer();
}
dataColumn->scan(transaction, nodeGroupIdx, startOffsetInStorageToScan,
endOffsetInStorageToScan, ListVector::getDataVector(resultVector), listOffset);
endOffsetInStorageToScan, dataVector, listOffset);
listOffset += resultVector->getValue<list_entry_t>(pos).size;
}
}

void VarListColumn::checkpointInMemory() {
Column::checkpointInMemory();
sizeColumn->checkpointInMemory();
dataColumn->checkpointInMemory();
}

void VarListColumn::rollbackInMemory() {
Column::rollbackInMemory();
sizeColumn->rollbackInMemory();
dataColumn->rollbackInMemory();
}

Expand All @@ -175,29 +219,49 @@ offset_t VarListColumn::readOffset(
return value;
}

uint64_t VarListColumn::readSize(
Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) {
auto chunkMeta = sizeColumn->getMetadataDA()->get(nodeGroupIdx, transaction->getType());
auto pageCursor = PageUtils::getPageCursorForPos(offsetInNodeGroup,
chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType));
pageCursor.pageIdx += chunkMeta.pageIdx;
uint64_t value;
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readToPageFunc(frame, pageCursor, (uint8_t*)&value, 0 /* posInVector */,
1 /* numValuesToRead */, chunkMeta.compMeta);
});
return value;
}

ListOffsetInfoInStorage VarListColumn::getListOffsetInfoInStorage(Transaction* transaction,
node_group_idx_t nodeGroupIdx, offset_t startOffsetInNodeGroup, offset_t endOffsetInNodeGroup,
const std::shared_ptr<DataChunkState>& state) {
auto numOffsetsToRead = endOffsetInNodeGroup - startOffsetInNodeGroup;
auto numOffsetVectors = numOffsetsToRead / DEFAULT_VECTOR_CAPACITY +
(numOffsetsToRead % DEFAULT_VECTOR_CAPACITY ? 1 : 0);
std::vector<std::unique_ptr<ValueVector>> offsetVectors;
std::vector<std::unique_ptr<ValueVector>> sizeVectors;
offsetVectors.reserve(numOffsetVectors);
sizeVectors.reserve(numOffsetVectors);
uint64_t numOffsetsRead = 0;
for (auto i = 0u; i < numOffsetVectors; i++) {
auto offsetVector = std::make_unique<ValueVector>(LogicalTypeID::INT64);
auto sizeVector = std::make_unique<ValueVector>(LogicalTypeID::INT64);
auto numOffsetsToReadInCurBatch =
std::min(numOffsetsToRead - numOffsetsRead, DEFAULT_VECTOR_CAPACITY);
offsetVector->setState(state);
sizeVector->setState(state);
Column::scan(transaction, nodeGroupIdx, startOffsetInNodeGroup + numOffsetsRead,
startOffsetInNodeGroup + numOffsetsRead + numOffsetsToReadInCurBatch,
offsetVector.get(), 0 /* offsetInVector */);
sizeColumn->scan(transaction, nodeGroupIdx, startOffsetInNodeGroup + numOffsetsRead,
startOffsetInNodeGroup + numOffsetsRead + numOffsetsToReadInCurBatch, sizeVector.get(),
0 /* offsetInVector */);
offsetVectors.push_back(std::move(offsetVector));
sizeVectors.push_back(std::move(sizeVector));
numOffsetsRead += numOffsetsToReadInCurBatch;
}
auto prevNodeListOffsetInStorage =
readListOffsetInStorage(transaction, nodeGroupIdx, startOffsetInNodeGroup);
return {prevNodeListOffsetInStorage, std::move(offsetVectors)};
return {numOffsetsToRead, std::move(offsetVectors), std::move(sizeVectors)};
}

} // namespace storage
Expand Down

0 comments on commit 6d40d16

Please sign in to comment.