Skip to content

Commit

Permalink
Merge pull request #2346 from kuzudb/check_chunk_bounds
Browse files Browse the repository at this point in the history
Fix bounds of data read in VarList updates
  • Loading branch information
benjaminwinger committed Nov 9, 2023
2 parents cc7f6ac + beba021 commit b698dca
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ class Column {
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void scanUnfiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
uint64_t numValuesToScan, common::ValueVector* resultVector,
const CompressionMetadata& compMeta, uint64_t startPosInVector = 0);
const ColumnChunkMetadata& chunkMeta, uint64_t startPosInVector = 0);
void scanFiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector,
const CompressionMetadata& compMeta);
const ColumnChunkMetadata& chunkMeta);
virtual void lookupInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
Expand Down
28 changes: 19 additions & 9 deletions src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <memory>

#include "common/assert.h"
#include "common/exception/not_implemented.h"
#include "storage/stats/property_statistics.h"
#include "storage/store/column_chunk.h"
Expand Down Expand Up @@ -312,6 +313,7 @@ void Column::batchLookup(
auto cursor = getPageCursorForOffset(transaction->getType(), nodeOffset);
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
KU_ASSERT(cursor.pageIdx < chunkMeta.pageIdx + chunkMeta.numPages);
readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void {
batchLookupFunc(frame, cursor, result, i, 1, chunkMeta.compMeta);
});
Expand All @@ -338,7 +340,7 @@ void Column::scan(transaction::Transaction* transaction, node_group_idx_t nodeGr
pageCursor.pageIdx += chunkMeta.pageIdx;
auto numValuesToScan = endOffsetInGroup - startOffsetInGroup;
scanUnfiltered(
transaction, pageCursor, numValuesToScan, resultVector, chunkMeta.compMeta, offsetInVector);
transaction, pageCursor, numValuesToScan, resultVector, chunkMeta, offsetInVector);
}

void Column::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
Expand All @@ -356,6 +358,7 @@ void Column::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
while (numValuesScanned < columnChunk->getCapacity()) {
auto numValuesToReadInPage =
std::min(numValuesPerPage, columnChunk->getCapacity() - numValuesScanned);
KU_ASSERT(cursor.pageIdx < chunkMetadata.pageIdx + chunkMetadata.numPages);
readFromPage(&DUMMY_READ_TRANSACTION, cursor.pageIdx, [&](uint8_t* frame) -> void {
readToPageFunc(frame, cursor, columnChunk->getData(), numValuesScanned,
numValuesToReadInPage, chunkMetadata.compMeta);
Expand All @@ -376,45 +379,49 @@ void Column::scanInternal(
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
if (nodeIDVector->state->selVector->isUnfiltered()) {
scanUnfiltered(transaction, cursor, nodeIDVector->state->selVector->selectedSize,
resultVector, chunkMeta.compMeta);
resultVector, chunkMeta);
} else {
scanFiltered(transaction, cursor, nodeIDVector, resultVector, chunkMeta.compMeta);
scanFiltered(transaction, cursor, nodeIDVector, resultVector, chunkMeta);
}
}

void Column::scanUnfiltered(Transaction* transaction, PageElementCursor& pageCursor,
uint64_t numValuesToScan, ValueVector* resultVector, const CompressionMetadata& compMeta,
uint64_t numValuesToScan, ValueVector* resultVector, const ColumnChunkMetadata& chunkMeta,
uint64_t startPosInVector) {
uint64_t numValuesScanned = 0;
auto numValuesPerPage = compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
auto numValuesPerPage =
chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)numValuesPerPage - pageCursor.elemPosInPage,
numValuesToScan - numValuesScanned);
KU_ASSERT(pageCursor.pageIdx < chunkMeta.pageIdx + chunkMeta.numPages);
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readToVectorFunc(frame, pageCursor, resultVector, numValuesScanned + startPosInVector,
numValuesToScanInPage, compMeta);
numValuesToScanInPage, chunkMeta.compMeta);
});
numValuesScanned += numValuesToScanInPage;
pageCursor.nextPage();
}
}

void Column::scanFiltered(Transaction* transaction, PageElementCursor& pageCursor,
ValueVector* nodeIDVector, ValueVector* resultVector, const CompressionMetadata& compMeta) {
ValueVector* nodeIDVector, ValueVector* resultVector, const ColumnChunkMetadata& chunkMeta) {
auto numValuesToScan = nodeIDVector->state->getOriginalSize();
auto numValuesScanned = 0u;
auto posInSelVector = 0u;
auto numValuesPerPage = compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
auto numValuesPerPage =
chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)numValuesPerPage - pageCursor.elemPosInPage,
numValuesToScan - numValuesScanned);
if (isInRange(nodeIDVector->state->selVector->selectedPositions[posInSelVector],
numValuesScanned, numValuesScanned + numValuesToScanInPage)) {
KU_ASSERT(pageCursor.pageIdx < chunkMeta.pageIdx + chunkMeta.numPages);
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readToVectorFunc(frame, pageCursor, resultVector, numValuesScanned,
numValuesToScanInPage, compMeta);
numValuesToScanInPage, chunkMeta.compMeta);
});
}
numValuesScanned += numValuesToScanInPage;
Expand Down Expand Up @@ -452,6 +459,7 @@ void Column::lookupValue(transaction::Transaction* transaction, offset_t nodeOff
auto cursor = getPageCursorForOffset(transaction->getType(), nodeOffset);
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
KU_ASSERT(cursor.pageIdx < chunkMeta.pageIdx + chunkMeta.numPages);
readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void {
readToVectorFunc(
frame, cursor, resultVector, posInVector, 1 /* numValuesToRead */, chunkMeta.compMeta);
Expand Down Expand Up @@ -495,6 +503,8 @@ void Column::writeValue(
auto walPageInfo = createWALVersionOfPageForValue(nodeOffset);
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE);
// The WAL page must lie inside this chunk's page range [pageIdx, pageIdx + numPages).
// Each bound is tested explicitly: C++ parses `a <= b < c` as `(a <= b) < c`, which
// compares a 0/1 boolean against the upper bound and never checks the range.
KU_ASSERT(chunkMeta.pageIdx <= walPageInfo.originalPageIdx &&
          walPageInfo.originalPageIdx < chunkMeta.pageIdx + chunkMeta.numPages);
try {
writeFromVectorFunc(walPageInfo.frame, walPageInfo.posInPage, vectorToWriteFrom,
posInVectorToWriteFrom, chunkMeta.compMeta);
Expand Down
6 changes: 3 additions & 3 deletions src/storage/store/var_list_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ void VarListColumn::scan(node_group_idx_t nodeGroupIdx, kuzu::storage::ColumnChu
varListColumnChunk->setNumValues(0);
} else {
Column::scan(nodeGroupIdx, columnChunk);
auto metadata = metadataDA->get(nodeGroupIdx, transaction::TransactionType::READ_ONLY);
varListColumnChunk->setNumValues(metadata.numValues);
auto dataColumnMetadata =
dataColumn->getMetadata(nodeGroupIdx, transaction::TransactionType::WRITE);
varListColumnChunk->resizeDataColumnChunk(
metadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE);
dataColumnMetadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE);
dataColumn->scan(nodeGroupIdx, varListColumnChunk->getDataColumnChunk());
}
}
Expand Down

0 comments on commit b698dca

Please sign in to comment.