Skip to content

Commit

Permalink
solve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hououou committed Mar 22, 2024
1 parent 16f03c3 commit 9180039
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 140 deletions.
2 changes: 1 addition & 1 deletion src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ListAuxiliaryBuffer::ListAuxiliaryBuffer(
: capacity{DEFAULT_VECTOR_CAPACITY}, size{0}, dataVector{std::make_shared<ValueVector>(
dataVectorType, memoryManager)} {}

list_entry_t ListAuxiliaryBuffer::addList(uint64_t listSize) {
list_entry_t ListAuxiliaryBuffer::addList(list_size_t listSize) {
auto listEntry = list_entry_t{size, listSize};
bool needResizeDataVector = size + listSize > capacity;
while (size + listSize > capacity) {
Expand Down
7 changes: 4 additions & 3 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ constexpr node_group_idx_t INVALID_NODE_GROUP_IDX = UINT64_MAX;
using partition_idx_t = uint64_t;
constexpr partition_idx_t INVALID_PARTITION_IDX = UINT64_MAX;
using length_t = uint64_t;
using list_size_t = uint32_t;

// System representation for a variable-sized overflow value.
struct overflow_value_t {
Expand All @@ -57,10 +58,10 @@ struct overflow_value_t {

struct list_entry_t {
common::offset_t offset;
uint64_t size;
list_size_t size;

list_entry_t() : offset{INVALID_OFFSET}, size{UINT64_MAX} {}
list_entry_t(common::offset_t offset, uint64_t size) : offset{offset}, size{size} {}
list_entry_t() : offset{INVALID_OFFSET}, size{UINT32_MAX} {}
list_entry_t(common::offset_t offset, list_size_t size) : offset{offset}, size{size} {}
};

struct struct_entry_t {
Expand Down
2 changes: 1 addition & 1 deletion src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ListAuxiliaryBuffer : public AuxiliaryBuffer {
ValueVector* getDataVector() const { return dataVector.get(); }
std::shared_ptr<ValueVector> getSharedDataVector() const { return dataVector; }

list_entry_t addList(uint64_t listSize);
list_entry_t addList(list_size_t listSize);

uint64_t getSize() const { return size; }

Expand Down
21 changes: 11 additions & 10 deletions src/include/storage/store/var_list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
// Size column: [4, 0, 2, 0]
// data column: [4, 7, 8, 12, 2, 3]
// When updating the data, we first append the data to the data column, and then update the offset
// and size. For example, we want to update [4,7,8,12] to [1,3,4]. We first append [1,3,4] to the
// data column, and then update the offset and size. The layout after the update: Offset column: [9,
// 4, 6, 6] Size column: [3, 0, 3, 0] data column: [4, 7, 8, 12, 2, 3, 1, 3, 4] This design is good
// for writing performance. But it is bad for scan performance since it may cause random access to
// the data column. Worse, the data column will be increased every update/insert. To balance the
// write performance and read performance, we rewrite the whole var list column in ascending order
// when the size of data column is larger than a threshold(Currently we use capacity/2).
// and size accordingly. Besides offset column, we introduce an extra size column here to enable
// in-place updates of a list column. In a list column chunk, offsets of lists are not always sorted
// after updates. This is good for writes, but it introduces extra overheads for scans, as lists can
// be scattered, and scans have to be broken into multiple small reads. To achieve a balance between
// reads and writes, during updates, we rewrite the whole var list column chunk in ascending order
// when the offsets are not sorted in ascending order and the size of data column chunk is larger
// than half of its capacity.

namespace kuzu {
namespace storage {
Expand All @@ -35,7 +35,7 @@ struct ListOffsetSizeInfo {
: numTotal{numTotal}, offsetColumnChunk{std::move(offsetColumnChunk)},
sizeColumnChunk{std::move(sizeColumnChunk)} {}

uint32_t getListLength(uint64_t pos) const;
common::list_size_t getListSize(uint64_t pos) const;
common::offset_t getListEndOffset(uint64_t pos) const;
common::offset_t getListStartOffset(uint64_t pos) const;

Expand Down Expand Up @@ -83,8 +83,8 @@ class VarListColumn : public Column {
common::offset_t readOffset(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);

uint32_t readSize(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t offsetInNodeGroup);
common::list_size_t readSize(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInNodeGroup);

ListOffsetSizeInfo getListOffsetSizeInfo(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInNodeGroup,
Expand All @@ -101,6 +101,7 @@ class VarListColumn : public Column {
private:
std::unique_ptr<Column> sizeColumn;
std::unique_ptr<Column> dataColumn;
// TODO(Guodong): This should be moved to table states.
std::unique_ptr<VarListDataColumnChunk> tmpDataColumnChunk;
};

Expand Down
27 changes: 7 additions & 20 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,31 +68,17 @@ class VarListColumnChunk final : public ColumnChunk {
sizeColumnChunk->resize(newCapacity);
}

inline common::offset_t getListStartOffset(common::offset_t offset) const {
if (numValues == 0)
return 0;
return offset == numValues ? getListEndOffset(offset - 1) :
getListEndOffset(offset) - getListLen(offset);
}
common::offset_t getListStartOffset(common::offset_t offset) const;

inline common::offset_t getListEndOffset(common::offset_t offset) const {
if (numValues == 0)
return 0;
KU_ASSERT(offset < numValues);
return getValue<uint64_t>(offset);
}
common::offset_t getListEndOffset(common::offset_t offset) const;

inline uint64_t getListLen(common::offset_t offset) const {
if (numValues == 0)
return 0;
KU_ASSERT(offset < sizeColumnChunk->getNumValues());
return sizeColumnChunk->getValue<uint32_t>(offset);
}
common::list_size_t getListSize(common::offset_t offset) const;

void resetOffset();
void resetFromOtherChunk(VarListColumnChunk* other);
void finalize() override;
bool isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const;
bool isOffsetsConsecutiveAndSortedAscending(uint64_t startPos, uint64_t endPos) const;
bool sanityCheck() override;

protected:
void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector);
Expand All @@ -106,7 +92,8 @@ class VarListColumnChunk final : public ColumnChunk {
protected:
std::unique_ptr<ColumnChunk> sizeColumnChunk;
std::unique_ptr<VarListDataColumnChunk> varListDataColumnChunk;
bool checkOrder;
    // We use the checkOffsetSortedAsc flag to indicate that we do not trigger random writes.
bool checkOffsetSortedAsc;
};

} // namespace storage
Expand Down
5 changes: 3 additions & 2 deletions src/processor/operator/unwind.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ void Unwind::copyTuplesToOutVector(uint64_t startPos, uint64_t endPos) const {

bool Unwind::getNextTuplesInternal(ExecutionContext* context) {
if (hasMoreToRead()) {
auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, listEntry.size - startIndex);
auto totalElementsCopy =
std::min(DEFAULT_VECTOR_CAPACITY, (uint64_t)listEntry.size - startIndex);
copyTuplesToOutVector(startIndex, (totalElementsCopy + startIndex));
startIndex += totalElementsCopy;
outValueVector->state->initOriginalAndSelectedSize(totalElementsCopy);
Expand All @@ -42,7 +43,7 @@ bool Unwind::getNextTuplesInternal(ExecutionContext* context) {
}
listEntry = expressionEvaluator->resultVector->getValue<list_entry_t>(pos);
startIndex = 0;
auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, listEntry.size);
auto totalElementsCopy = std::min(DEFAULT_VECTOR_CAPACITY, (uint64_t)listEntry.size);
copyTuplesToOutVector(0, totalElementsCopy);
startIndex += totalElementsCopy;
outValueVector->state->initOriginalAndSelectedSize(startIndex);
Expand Down
76 changes: 36 additions & 40 deletions src/storage/store/var_list_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ offset_t ListOffsetSizeInfo::getListStartOffset(uint64_t pos) const {
if (numTotal == 0) {
return 0;
}
return pos == numTotal ? getListEndOffset(pos - 1) : getListEndOffset(pos) - getListLength(pos);
return pos == numTotal ? getListEndOffset(pos - 1) : getListEndOffset(pos) - getListSize(pos);
}

offset_t ListOffsetSizeInfo::getListEndOffset(uint64_t pos) const {
Expand All @@ -26,20 +26,20 @@ offset_t ListOffsetSizeInfo::getListEndOffset(uint64_t pos) const {
return offsetColumnChunk->getValue<offset_t>(pos);
}

uint32_t ListOffsetSizeInfo::getListLength(uint64_t pos) const {
list_size_t ListOffsetSizeInfo::getListSize(uint64_t pos) const {
if (numTotal == 0) {
return 0;
}
KU_ASSERT(pos < sizeColumnChunk->getNumValues());
return sizeColumnChunk->getValue<uint32_t>(pos);
return sizeColumnChunk->getValue<list_size_t>(pos);
}

bool ListOffsetSizeInfo::isOffsetSortedAscending(uint64_t startPos, uint64_t endPos) const {
offset_t prevEndOffset = getListStartOffset(startPos);
for (auto i = startPos; i < endPos; i++) {
offset_t currentEndOffset = getListEndOffset(i);
auto length = getListLength(i);
prevEndOffset += length;
auto size = getListSize(i);
prevEndOffset += size;
if (currentEndOffset != prevEndOffset) {
return false;
}
Expand Down Expand Up @@ -69,8 +69,6 @@ VarListColumn::VarListColumn(std::string name, LogicalType dataType,
void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx,
offset_t startOffsetInGroup, offset_t endOffsetInGroup, ValueVector* resultVector,
uint64_t offsetInVector) {
// TODO(Ziyi): the current scan function requires two dynamic allocation of vectors which may be
// a bottleneck of the scan performance. We need to further optimize this.
nullColumn->scan(transaction, nodeGroupIdx, startOffsetInGroup, endOffsetInGroup, resultVector,
offsetInVector);
auto listOffsetInfoInStorage =
Expand All @@ -83,9 +81,9 @@ void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx
auto numValues = endOffsetInGroup - startOffsetInGroup;
KU_ASSERT(numValues >= 0);
for (auto i = 0u; i < numValues; i++) {
uint64_t length = listOffsetInfoInStorage.getListLength(i);
resultVector->setValue(i + offsetInVector, list_entry_t{listOffsetInVector, length});
listOffsetInVector += length;
list_size_t size = listOffsetInfoInStorage.getListSize(i);
resultVector->setValue(i + offsetInVector, list_entry_t{listOffsetInVector, size});
listOffsetInVector += size;
}
ListVector::resizeDataVector(resultVector, listOffsetInVector);
auto dataVector = ListVector::getDataVector(resultVector);
Expand All @@ -97,11 +95,11 @@ void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx
} else {
for (auto i = 0u; i < numValues; i++) {
offset_t startOffset = listOffsetInfoInStorage.getListStartOffset(i);
offset_t appendLen = listOffsetInfoInStorage.getListLength(i);
KU_ASSERT(appendLen >= 0);
dataColumn->scan(transaction, nodeGroupIdx, startOffset, startOffset + appendLen,
offset_t appendSize = listOffsetInfoInStorage.getListSize(i);
KU_ASSERT(appendSize >= 0);
dataColumn->scan(transaction, nodeGroupIdx, startOffset, startOffset + appendSize,
dataVector, offsetToWriteListData);
offsetToWriteListData += appendLen;
offsetToWriteListData += appendSize;
}
}
}
Expand All @@ -120,12 +118,12 @@ void VarListColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx
offset_t prevOffset = varListColumnChunk->getListStartOffset(0);
for (auto i = 0u; i < columnChunk->getNumValues(); i++) {
auto currentEndOffset = varListColumnChunk->getListEndOffset(i);
auto appendLen = varListColumnChunk->getListLen(i);
prevOffset += (uint64_t)appendLen;
auto appendSize = varListColumnChunk->getListSize(i);
prevOffset += appendSize;
if (currentEndOffset != prevOffset) {
isOffsetSortedAscending = false;
}
resizeNumValues += appendLen;
resizeNumValues += appendSize;
}
if (isOffsetSortedAscending) {
varListColumnChunk->resizeDataColumnChunk(std::bit_ceil(resizeNumValues));
Expand Down Expand Up @@ -177,11 +175,11 @@ void VarListColumn::lookupValue(Transaction* transaction, offset_t nodeOffset,
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto nodeOffsetInGroup = nodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
auto listEndOffset = readOffset(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto length = readSize(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto listStartOffset = listEndOffset - length;
auto size = readSize(transaction, nodeGroupIdx, nodeOffsetInGroup);
auto listStartOffset = listEndOffset - size;
auto offsetInVector = posInVector == 0 ? 0 : resultVector->getValue<offset_t>(posInVector - 1);
resultVector->setValue(posInVector, list_entry_t{offsetInVector, (uint64_t)length});
ListVector::resizeDataVector(resultVector, offsetInVector + length);
resultVector->setValue(posInVector, list_entry_t{offsetInVector, size});
ListVector::resizeDataVector(resultVector, offsetInVector + size);
auto dataVector = ListVector::getDataVector(resultVector);
dataColumn->scan(transaction, StorageUtils::getNodeGroupIdx(nodeOffset), listStartOffset,
listEndOffset, dataVector, offsetInVector);
Expand All @@ -202,7 +200,7 @@ void VarListColumn::scanUnfiltered(Transaction* transaction, node_group_idx_t no
auto numValuesToScan = resultVector->state->selVector->selectedSize;
offset_t offsetInVector = 0;
for (auto i = 0u; i < numValuesToScan; i++) {
auto listLen = listOffsetInfoInStorage.getListLength(i);
auto listLen = listOffsetInfoInStorage.getListSize(i);
resultVector->setValue(i, list_entry_t{offsetInVector, listLen});
offsetInVector += listLen;
}
Expand All @@ -219,10 +217,10 @@ void VarListColumn::scanUnfiltered(Transaction* transaction, node_group_idx_t no
} else {
for (auto i = 0u; i < numValuesToScan; i++) {
auto startListOffsetInStorage = listOffsetInfoInStorage.getListStartOffset(i);
auto appendLen = listOffsetInfoInStorage.getListLength(i);
auto appendSize = listOffsetInfoInStorage.getListSize(i);
dataColumn->scan(transaction, nodeGroupIdx, startListOffsetInStorage,
startListOffsetInStorage + appendLen, dataVector, offsetInVector);
offsetInVector += appendLen;
startListOffsetInStorage + appendSize, dataVector, offsetInVector);
offsetInVector += appendSize;
}
}
}
Expand All @@ -232,19 +230,19 @@ void VarListColumn::scanFiltered(Transaction* transaction, node_group_idx_t node
offset_t listOffset = 0;
for (auto i = 0u; i < resultVector->state->selVector->selectedSize; i++) {
auto pos = resultVector->state->selVector->selectedPositions[i];
auto listLen = listOffsetSizeInfo.getListLength(pos);
resultVector->setValue(pos, list_entry_t{(offset_t)listOffset, (uint64_t)listLen});
listOffset += listLen;
auto listSize = listOffsetSizeInfo.getListSize(pos);
resultVector->setValue(pos, list_entry_t{(offset_t)listOffset, listSize});
listOffset += listSize;
}
ListVector::resizeDataVector(resultVector, listOffset);
listOffset = 0;
for (auto i = 0u; i < resultVector->state->selVector->selectedSize; i++) {
auto pos = resultVector->state->selVector->selectedPositions[i];
auto startOffsetInStorageToScan = listOffsetSizeInfo.getListStartOffset(pos);
auto appendLen = listOffsetSizeInfo.getListLength(pos);
auto appendSize = listOffsetSizeInfo.getListSize(pos);
auto dataVector = ListVector::getDataVector(resultVector);
dataColumn->scan(transaction, nodeGroupIdx, startOffsetInStorageToScan,
startOffsetInStorageToScan + appendLen, dataVector, listOffset);
startOffsetInStorageToScan + appendSize, dataVector, listOffset);
listOffset += resultVector->getValue<list_entry_t>(pos).size;
}
}
Expand Down Expand Up @@ -275,7 +273,7 @@ offset_t VarListColumn::readOffset(
return value;
}

uint32_t VarListColumn::readSize(
list_size_t VarListColumn::readSize(
Transaction* transaction, node_group_idx_t nodeGroupIdx, offset_t offsetInNodeGroup) {
auto chunkMeta = sizeColumn->getMetadataDA()->get(nodeGroupIdx, transaction->getType());
auto pageCursor = PageUtils::getPageCursorForPos(offsetInNodeGroup,
Expand Down Expand Up @@ -313,7 +311,7 @@ void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_i
commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, isNewNodeGroup, localInsertChunks,
insertInfo, localUpdateChunks, updateInfo, deleteInfo);
} else {
auto columnChunk = getEmptyChunkForCommit(common::StorageConstants::NODE_GROUP_SIZE);
auto columnChunk = getEmptyChunkForCommit(updateInfo.size() + insertInfo.size());
std::vector<offset_t> dstOffsets;
for (auto& [offsetInDstChunk, rowIdx] : updateInfo) {
auto [chunkIdx, offsetInLocalChunk] =
Expand Down Expand Up @@ -349,16 +347,14 @@ void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_i
auto dataColumnSize =
dataColumn->getMetadata(nodeGroupIdx, transaction->getType()).numValues;
auto dataColumnChunk = varListChunk->getDataColumnChunk();
auto appendListNum = std::min(chunk->getNumValues(), (uint64_t)dstOffsets.size());
auto appendListDataNum = 0u;
auto numListsToAppend = std::min(chunk->getNumValues(), (uint64_t)dstOffsets.size());
auto dataSize = 0u;
auto startVarListOffset = varListChunk->getListStartOffset(startSrcOffset);
std::vector<common::offset_t> dstOffsetsInDataColumn;
for (auto i = 0u; i < appendListNum; i++) {
auto appendLen = varListChunk->getListLen(startSrcOffset + i);
for (auto j = 0u; j < appendLen; j++) {
dstOffsetsInDataColumn.push_back(dataColumnSize + appendListDataNum + j);
for (auto i = 0u; i < numListsToAppend; i++) {
for (auto j = 0u; j < varListChunk->getListSize(startSrcOffset + i); j++) {
dstOffsetsInDataColumn.push_back(dataColumnSize + dataSize++);
}
appendListDataNum += appendLen;
}
dataColumn->prepareCommitForChunk(
transaction, nodeGroupIdx, dstOffsetsInDataColumn, dataColumnChunk, startVarListOffset);
Expand All @@ -371,7 +367,7 @@ void VarListColumn::prepareCommitForChunk(Transaction* transaction, node_group_i
auto offsetColumnChunk = ColumnChunkFactory::createColumnChunk(*dataType.copy(),
enableCompression, 1.5 * std::bit_ceil(offsetChunkMeta.numValues + dstOffsets.size()));
Column::scan(transaction, nodeGroupIdx, offsetColumnChunk.get());
for (auto i = 0u; i < appendListNum; i++) {
for (auto i = 0u; i < numListsToAppend; i++) {
auto listEndOffset = varListChunk->getListEndOffset(startSrcOffset + i);
auto isNull = varListChunk->getNullChunk()->isNull(startSrcOffset + i);
offsetColumnChunk->setValue<offset_t>(dataColumnSize + listEndOffset, dstOffsets[i]);
Expand Down
Loading

0 comments on commit 9180039

Please sign in to comment.