Skip to content

Commit

Permalink
Add cache pin to readStringToVector
Browse files Browse the repository at this point in the history
Also: I have read and agree to the terms under CLA.md
  • Loading branch information
rfdavid committed Mar 4, 2023
1 parent c6b5317 commit 729aa3e
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 52 deletions.
4 changes: 2 additions & 2 deletions src/include/storage/storage_structure/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ class StringPropertyColumn : public PropertyColumnWithOverflow {
const std::shared_ptr<common::ValueVector>& resultVector,
PageElementCursor& cursor) override {
Column::scan(transaction, resultVector, cursor);
diskOverflowFile.scanSequentialStringOverflow(transaction->getType(), *resultVector);
diskOverflowFile.scanStrings(transaction->getType(), *resultVector);
}
// Scans the column into resultVector through Column::scanWithSelState and then
// hands the vector to the disk overflow file, using the transaction's type, so
// the overflow part of the scanned strings can be read.
// NOTE(review): this view lists both a scanSequentialStringOverflow call and a
// scanStrings call; presumably only scanStrings is meant to remain — confirm.
void scanWithSelState(transaction::Transaction* transaction,
const std::shared_ptr<common::ValueVector>& resultVector,
PageElementCursor& cursor) override {
Column::scanWithSelState(transaction, resultVector, cursor);
diskOverflowFile.scanSequentialStringOverflow(transaction->getType(), *resultVector);
diskOverflowFile.scanStrings(transaction->getType(), *resultVector);
}
};

Expand Down
18 changes: 12 additions & 6 deletions src/include/storage/storage_structure/disk_overflow_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,24 @@ class DiskOverflowFile : public StorageStructure {
return copy;
}

void readStringsToVector(
transaction::TransactionType trxType, common::ValueVector& valueVector);
void readStringToVector(transaction::TransactionType trxType, common::ku_string_t& kuStr,
// Remembers the one overflow page that is currently pinned, so that consecutive
// string lookups landing on the same page can reuse its frame instead of
// pinning the page again.
struct OverflowPageCache {
    // File handle the cached page was pinned through; null while the cache is empty.
    FileHandle* fileHandle{nullptr};
    // Physical index of the pinned page; UINT32_MAX means no page is pinned.
    common::page_idx_t pageIdx{UINT32_MAX};
    // Frame returned by the buffer manager when the cached page was pinned.
    uint8_t* frame{nullptr};
};

// Resolves the strings of every selected, non-null position of valueVector,
// sharing one OverflowPageCache across the whole vector.
void scanStrings(transaction::TransactionType trxType, common::ValueVector& valueVector);
// Resolves a single string into inMemOverflowBuffer; short strings are left untouched.
void lookupString(transaction::TransactionType trxType, common::ku_string_t& kuStr,
common::InMemOverflowBuffer& inMemOverflowBuffer);
// Same as above, but pins the overflow page through overflowPageCache. The page
// stays pinned after the call; the caller releases it with unpinOverflowPageCache.
void lookupString(transaction::TransactionType trxType, common::ku_string_t& kuStr,
common::InMemOverflowBuffer& inMemOverflowBuffer, OverflowPageCache& overflowPageCache);

// Resolves the string stored at vectorPos of a STRING vector into the vector's
// own overflow buffer. The position must not be null (checked by the assert).
// NOTE(review): this view lists both a readStringToVector call and a lookupString
// call; presumably only lookupString is meant to remain — confirm.
inline void scanSingleStringOverflow(
transaction::TransactionType trxType, common::ValueVector& vector, uint64_t vectorPos) {
assert(vector.dataType.typeID == common::STRING && !vector.isNull(vectorPos));
auto& kuString = ((common::ku_string_t*)vector.getData())[vectorPos];
readStringToVector(trxType, kuString, vector.getOverflowBuffer());
lookupString(trxType, kuString, vector.getOverflowBuffer());
}
void scanSequentialStringOverflow(
transaction::TransactionType trxType, common::ValueVector& vector);
inline void scanSingleListOverflow(
transaction::TransactionType trxType, common::ValueVector& vector, uint64_t vectorPos) {
assert(vector.dataType.typeID == common::VAR_LIST && !vector.isNull(vectorPos));
Expand Down Expand Up @@ -86,6 +91,7 @@ class DiskOverflowFile : public StorageStructure {
void setListRecursiveIfNestedWithoutLock(const common::ku_list_t& inMemSrcList,
common::ku_list_t& diskDstList, const common::DataType& dataType);
void logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock();
void unpinOverflowPageCache(OverflowPageCache& overflowPageCache);

private:
// This is the index of the last free byte to which we can write.
Expand Down
73 changes: 31 additions & 42 deletions src/storage/storage_structure/disk_overflow_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,27 @@ using namespace kuzu::common;
namespace kuzu {
namespace storage {

void DiskOverflowFile::readStringsToVector(TransactionType trxType, ValueVector& valueVector) {
// Releases the page held by overflowPageCache, if any, and puts the cache back
// into its empty state. Resetting matters: without it the cache keeps naming a
// page that is no longer pinned, so a repeated call would unpin it a second time.
// Calling this on an empty cache is a no-op.
void DiskOverflowFile::unpinOverflowPageCache(OverflowPageCache& overflowPageCache) {
    if (overflowPageCache.pageIdx == UINT32_MAX) {
        return; // Nothing is pinned.
    }
    bufferManager.unpin(*overflowPageCache.fileHandle, overflowPageCache.pageIdx);
    // Forget the released page so the cache never refers to an unpinned frame.
    overflowPageCache.fileHandle = nullptr;
    overflowPageCache.pageIdx = UINT32_MAX;
    overflowPageCache.frame = nullptr;
}

// Resolves the string at every selected, non-null position of an unflat
// valueVector into the vector's overflow buffer. A single OverflowPageCache is
// shared by all lookups in the loop and is unpinned once the loop is done.
// NOTE(review): this view lists both a readStringToVector call and a lookupString
// call inside the loop; presumably only the cached lookupString call is meant
// to remain — confirm.
void DiskOverflowFile::scanStrings(TransactionType trxType, ValueVector& valueVector) {
assert(!valueVector.state->isFlat());
OverflowPageCache overflowPageCache;
for (auto i = 0u; i < valueVector.state->selVector->selectedSize; i++) {
auto pos = valueVector.state->selVector->selectedPositions[i];
// Null entries carry no string to resolve.
if (valueVector.isNull(pos)) {
continue;
}
readStringToVector(
trxType, ((ku_string_t*)valueVector.getData())[pos], valueVector.getOverflowBuffer());
lookupString(trxType, ((ku_string_t*)valueVector.getData())[pos],
valueVector.getOverflowBuffer(), overflowPageCache);
}
// Release whichever page the last lookup left pinned.
unpinOverflowPageCache(overflowPageCache);
}

void DiskOverflowFile::readStringToVector(
void DiskOverflowFile::lookupString(
TransactionType trxType, ku_string_t& kuStr, InMemOverflowBuffer& inMemOverflowBuffer) {
if (ku_string_t::isShortString(kuStr.len)) {
return;
Expand All @@ -39,45 +47,24 @@ void DiskOverflowFile::readStringToVector(
bufferManager.unpin(*fileHandleToPin, pageIdxToPin);
}

void DiskOverflowFile::scanSequentialStringOverflow(TransactionType trxType, ValueVector& vector) {
FileHandle* cachedFileHandle = nullptr;
page_idx_t cachedPageIdx = UINT32_MAX;
uint8_t* cachedFrame = nullptr;
for (auto i = 0u; i < vector.state->selVector->selectedSize; ++i) {
auto pos = vector.state->selVector->selectedPositions[i];
if (vector.isNull(pos)) {
continue;
}
auto& kuString = ((ku_string_t*)vector.getData())[pos];
if (ku_string_t::isShortString(kuString.len)) {
continue;
}
page_idx_t pageIdx = UINT32_MAX;
uint16_t pagePos = UINT16_MAX;
TypeUtils::decodeOverflowPtr(kuString.overflowPtr, pageIdx, pagePos);
auto [fileHandleToPin, pageIdxToPin] =
StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin(
fileHandle, pageIdx, *wal, trxType);
if (pageIdxToPin == cachedPageIdx) { // cache hit
InMemOverflowBufferUtils::copyString(
(char*)(cachedFrame + pagePos), kuString.len, kuString, vector.getOverflowBuffer());
continue;
}
// cache miss
if (cachedPageIdx != UINT32_MAX) { // unpin cached frame
bufferManager.unpin(*cachedFileHandle, cachedPageIdx);
}
// pin new frame and update cache
auto frame = bufferManager.pin(*fileHandleToPin, pageIdxToPin);
InMemOverflowBufferUtils::copyString(
(char*)(frame + pagePos), kuString.len, kuString, vector.getOverflowBuffer());
cachedFileHandle = fileHandleToPin;
cachedPageIdx = pageIdxToPin;
cachedFrame = frame;
// Resolves one string through overflowPageCache and copies it into
// inMemOverflowBuffer. Short strings return immediately. On a cache miss the
// previously cached page is unpinned and the new one is pinned and recorded; the
// page is left pinned on return so the next lookup can reuse it.
// NOTE(review): the two lines that reference cachedPageIdx / cachedFileHandle use
// names this function does not declare; presumably they should not be part of
// this body — confirm.
// NOTE(review): a cache hit is decided by the page index alone; fileHandleToPin
// is not compared. Confirm that two different file handles cannot yield the same
// physical page index within one scan.
void DiskOverflowFile::lookupString(TransactionType trxType, ku_string_t& kuStr,
InMemOverflowBuffer& inMemOverflowBuffer, OverflowPageCache& overflowPageCache) {
if (ku_string_t::isShortString(kuStr.len)) {
return;
}
if (cachedPageIdx != UINT32_MAX) {
bufferManager.unpin(*cachedFileHandle, cachedPageIdx);
PageByteCursor cursor;
// Split the overflow pointer into a page index and a byte offset in that page.
TypeUtils::decodeOverflowPtr(kuStr.overflowPtr, cursor.pageIdx, cursor.offsetInPage);
// Pick the file handle and physical page index to pin for this transaction type.
auto [fileHandleToPin, pageIdxToPin] =
StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin(
fileHandle, cursor.pageIdx, *wal, trxType);
if (pageIdxToPin != overflowPageCache.pageIdx) { // cache miss
unpinOverflowPageCache(overflowPageCache);
overflowPageCache.frame = bufferManager.pin(*fileHandleToPin, pageIdxToPin);
overflowPageCache.fileHandle = fileHandleToPin;
overflowPageCache.pageIdx = pageIdxToPin;
}
// Copy kuStr.len bytes starting at the decoded offset inside the cached frame.
InMemOverflowBufferUtils::copyString((char*)(overflowPageCache.frame + cursor.offsetInPage),
kuStr.len, kuStr, inMemOverflowBuffer);
}

void DiskOverflowFile::readListsToVector(TransactionType trxType, ValueVector& valueVector) {
Expand All @@ -104,9 +91,11 @@ void DiskOverflowFile::readListToVector(TransactionType trxType, ku_list_t& kuLi
bufferManager.unpin(*fileHandleToPin, pageIdxToPin);
if (dataType.childType->typeID == STRING) {
auto kuStrings = (ku_string_t*)(kuList.overflowPtr);
OverflowPageCache overflowPageCache;
for (auto i = 0u; i < kuList.size; i++) {
readStringToVector(trxType, kuStrings[i], inMemOverflowBuffer);
lookupString(trxType, kuStrings[i], inMemOverflowBuffer, overflowPageCache);
}
unpinOverflowPageCache(overflowPageCache);
} else if (dataType.childType->typeID == VAR_LIST) {
auto kuLists = (ku_list_t*)(kuList.overflowPtr);
for (auto i = 0u; i < kuList.size; i++) {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_structure/lists/lists.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,14 @@ void StringPropertyLists::readFromLargeList(
const std::shared_ptr<ValueVector>& valueVector, ListHandle& listHandle) {
valueVector->resetOverflowBuffer();
Lists::readFromLargeList(valueVector, listHandle);
diskOverflowFile.readStringsToVector(TransactionType::READ_ONLY, *valueVector);
diskOverflowFile.scanStrings(TransactionType::READ_ONLY, *valueVector);
}

// Resets the vector's overflow buffer, reads the small list into valueVector via
// Lists::readFromSmallList, then has the disk overflow file read the strings of
// the vector as a READ_ONLY transaction.
// NOTE(review): this view lists both a readStringsToVector call and a scanStrings
// call; presumably only scanStrings is meant to remain — confirm.
void StringPropertyLists::readFromSmallList(
const std::shared_ptr<ValueVector>& valueVector, ListHandle& listHandle) {
valueVector->resetOverflowBuffer();
Lists::readFromSmallList(valueVector, listHandle);
diskOverflowFile.readStringsToVector(TransactionType::READ_ONLY, *valueVector);
diskOverflowFile.scanStrings(TransactionType::READ_ONLY, *valueVector);
}

void ListPropertyLists::readFromLargeList(
Expand Down

0 comments on commit 729aa3e

Please sign in to comment.