Skip to content

Commit

Permalink
Fix size of data scanned from disk in VarList updates
Browse files Browse the repository at this point in the history
Also check page bounds of reads/writes to make sure they are within the bounds set by the metadata
  • Loading branch information
benjaminwinger committed Nov 6, 2023
1 parent 84f145c commit beba021
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
4 changes: 2 additions & 2 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ class Column {
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void scanUnfiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
uint64_t numValuesToScan, common::ValueVector* resultVector,
const CompressionMetadata& compMeta, uint64_t startPosInVector = 0);
const ColumnChunkMetadata& chunkMeta, uint64_t startPosInVector = 0);
void scanFiltered(transaction::Transaction* transaction, PageElementCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector,
const CompressionMetadata& compMeta);
const ColumnChunkMetadata& chunkMeta);
virtual void lookupInternal(transaction::Transaction* transaction,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupValue(transaction::Transaction* transaction, common::offset_t nodeOffset,
Expand Down
28 changes: 19 additions & 9 deletions src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <memory>

#include "common/assert.h"
#include "common/exception/not_implemented.h"
#include "storage/stats/property_statistics.h"
#include "storage/store/column_chunk.h"
Expand Down Expand Up @@ -313,6 +314,7 @@ void Column::batchLookup(
auto cursor = getPageCursorForOffset(transaction->getType(), nodeOffset);
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
KU_ASSERT(cursor.pageIdx < chunkMeta.pageIdx + chunkMeta.numPages);
readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void {
batchLookupFunc(frame, cursor, result, i, 1, chunkMeta.compMeta);
});
Expand All @@ -339,7 +341,7 @@ void Column::scan(transaction::Transaction* transaction, node_group_idx_t nodeGr
pageCursor.pageIdx += chunkMeta.pageIdx;
auto numValuesToScan = endOffsetInGroup - startOffsetInGroup;
scanUnfiltered(
transaction, pageCursor, numValuesToScan, resultVector, chunkMeta.compMeta, offsetInVector);
transaction, pageCursor, numValuesToScan, resultVector, chunkMeta, offsetInVector);
}

void Column::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
Expand All @@ -357,6 +359,7 @@ void Column::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
while (numValuesScanned < columnChunk->getCapacity()) {
auto numValuesToReadInPage =
std::min(numValuesPerPage, columnChunk->getCapacity() - numValuesScanned);
KU_ASSERT(cursor.pageIdx < chunkMetadata.pageIdx + chunkMetadata.numPages);
readFromPage(&DUMMY_READ_TRANSACTION, cursor.pageIdx, [&](uint8_t* frame) -> void {
readToPageFunc(frame, cursor, columnChunk->getData(), numValuesScanned,
numValuesToReadInPage, chunkMetadata.compMeta);
Expand All @@ -377,45 +380,49 @@ void Column::scanInternal(
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
if (nodeIDVector->state->selVector->isUnfiltered()) {
scanUnfiltered(transaction, cursor, nodeIDVector->state->selVector->selectedSize,
resultVector, chunkMeta.compMeta);
resultVector, chunkMeta);
} else {
scanFiltered(transaction, cursor, nodeIDVector, resultVector, chunkMeta.compMeta);
scanFiltered(transaction, cursor, nodeIDVector, resultVector, chunkMeta);
}
}

void Column::scanUnfiltered(Transaction* transaction, PageElementCursor& pageCursor,
uint64_t numValuesToScan, ValueVector* resultVector, const CompressionMetadata& compMeta,
uint64_t numValuesToScan, ValueVector* resultVector, const ColumnChunkMetadata& chunkMeta,
uint64_t startPosInVector) {
uint64_t numValuesScanned = 0;
auto numValuesPerPage = compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
auto numValuesPerPage =
chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)numValuesPerPage - pageCursor.elemPosInPage,
numValuesToScan - numValuesScanned);
KU_ASSERT(pageCursor.pageIdx < chunkMeta.pageIdx + chunkMeta.numPages);
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readToVectorFunc(frame, pageCursor, resultVector, numValuesScanned + startPosInVector,
numValuesToScanInPage, compMeta);
numValuesToScanInPage, chunkMeta.compMeta);
});
numValuesScanned += numValuesToScanInPage;
pageCursor.nextPage();
}
}

void Column::scanFiltered(Transaction* transaction, PageElementCursor& pageCursor,
ValueVector* nodeIDVector, ValueVector* resultVector, const CompressionMetadata& compMeta) {
ValueVector* nodeIDVector, ValueVector* resultVector, const ColumnChunkMetadata& chunkMeta) {
auto numValuesToScan = nodeIDVector->state->getOriginalSize();
auto numValuesScanned = 0u;
auto posInSelVector = 0u;
auto numValuesPerPage = compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
auto numValuesPerPage =
chunkMeta.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, dataType);
while (numValuesScanned < numValuesToScan) {
uint64_t numValuesToScanInPage =
std::min((uint64_t)numValuesPerPage - pageCursor.elemPosInPage,
numValuesToScan - numValuesScanned);
if (isInRange(nodeIDVector->state->selVector->selectedPositions[posInSelVector],
numValuesScanned, numValuesScanned + numValuesToScanInPage)) {
KU_ASSERT(pageCursor.pageIdx < chunkMeta.pageIdx + chunkMeta.numPages);
readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void {
readToVectorFunc(frame, pageCursor, resultVector, numValuesScanned,
numValuesToScanInPage, compMeta);
numValuesToScanInPage, chunkMeta.compMeta);
});
}
numValuesScanned += numValuesToScanInPage;
Expand Down Expand Up @@ -453,6 +460,7 @@ void Column::lookupValue(transaction::Transaction* transaction, offset_t nodeOff
auto cursor = getPageCursorForOffset(transaction->getType(), nodeOffset);
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, transaction->getType());
KU_ASSERT(cursor.pageIdx < chunkMeta.pageIdx + chunkMeta.numPages);
readFromPage(transaction, cursor.pageIdx, [&](uint8_t* frame) -> void {
readToVectorFunc(
frame, cursor, resultVector, posInVector, 1 /* numValuesToRead */, chunkMeta.compMeta);
Expand Down Expand Up @@ -496,6 +504,8 @@ void Column::writeValue(
auto walPageInfo = createWALVersionOfPageForValue(nodeOffset);
auto nodeGroupIdx = StorageUtils::getNodeGroupIdx(nodeOffset);
auto chunkMeta = metadataDA->get(nodeGroupIdx, TransactionType::WRITE);
KU_ASSERT(
chunkMeta.pageIdx <= walPageInfo.originalPageIdx < chunkMeta.pageIdx + chunkMeta.numPages);
try {
writeFromVectorFunc(walPageInfo.frame, walPageInfo.posInPage, vectorToWriteFrom,
posInVectorToWriteFrom, chunkMeta.compMeta);
Expand Down
6 changes: 3 additions & 3 deletions src/storage/store/var_list_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ void VarListColumn::scan(node_group_idx_t nodeGroupIdx, kuzu::storage::ColumnChu
varListColumnChunk->setNumValues(0);
} else {
Column::scan(nodeGroupIdx, columnChunk);
auto metadata = metadataDA->get(nodeGroupIdx, transaction::TransactionType::READ_ONLY);
varListColumnChunk->setNumValues(metadata.numValues);
auto dataColumnMetadata =
dataColumn->getMetadata(nodeGroupIdx, transaction::TransactionType::WRITE);
varListColumnChunk->resizeDataColumnChunk(
metadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE);
dataColumnMetadata.numPages * BufferPoolConstants::PAGE_4KB_SIZE);
dataColumn->scan(nodeGroupIdx, varListColumnChunk->getDataColumnChunk());
}
}
Expand Down

0 comments on commit beba021

Please sign in to comment.