Skip to content

Commit

Permalink
Add cache pin on readStringToVector
Browse files Browse the repository at this point in the history
Also: I have read and agree to the terms under CLA.md
  • Loading branch information
rfdavid committed Mar 14, 2023
1 parent 855cb4d commit b8f50e1
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 53 deletions.
4 changes: 2 additions & 2 deletions src/include/storage/storage_structure/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ class StringPropertyColumn : public PropertyColumnWithOverflow {
inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
Column::scan(transaction, resultVector, cursor);
diskOverflowFile.scanSequentialStringOverflow(transaction->getType(), *resultVector);
diskOverflowFile.scanStrings(transaction->getType(), *resultVector);
}
void scanWithSelState(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
Column::scanWithSelState(transaction, resultVector, cursor);
diskOverflowFile.scanSequentialStringOverflow(transaction->getType(), *resultVector);
diskOverflowFile.scanStrings(transaction->getType(), *resultVector);
}
};

Expand Down
21 changes: 14 additions & 7 deletions src/include/storage/storage_structure/disk_overflow_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,14 @@ class DiskOverflowFile : public StorageStructure {
return copy;
}

void readStringsToVector(
transaction::TransactionType trxType, common::ValueVector& valueVector);
void readStringToVector(transaction::TransactionType trxType, common::ku_string_t& kuStr,
common::InMemOverflowBuffer& inMemOverflowBuffer);
void scanStrings(transaction::TransactionType trxType, common::ValueVector& valueVector);

inline void scanSingleStringOverflow(
transaction::TransactionType trxType, common::ValueVector& vector, uint64_t vectorPos) {
assert(vector.dataType.typeID == common::STRING && !vector.isNull(vectorPos));
auto& kuString = ((common::ku_string_t*)vector.getData())[vectorPos];
readStringToVector(trxType, kuString, vector.getOverflowBuffer());
lookupString(trxType, kuString, vector.getOverflowBuffer());
}
void scanSequentialStringOverflow(
transaction::TransactionType trxType, common::ValueVector& vector);
inline void scanSingleListOverflow(
transaction::TransactionType trxType, common::ValueVector& vector, uint64_t vectorPos) {
assert(vector.dataType.typeID == common::VAR_LIST && !vector.isNull(vectorPos));
Expand Down Expand Up @@ -77,6 +72,15 @@ class DiskOverflowFile : public StorageStructure {
}

private:
struct OverflowPageCache {
BufferManagedFileHandle* bufferManagedFileHandle = nullptr;
common::page_idx_t pageIdx = UINT32_MAX;
uint8_t* frame = nullptr;
};
void lookupString(transaction::TransactionType trxType, common::ku_string_t& kuStr,
common::InMemOverflowBuffer& inMemOverflowBuffer);
void lookupString(transaction::TransactionType trxType, common::ku_string_t& kuStr,
common::InMemOverflowBuffer& inMemOverflowBuffer, OverflowPageCache& overflowPageCache);
void addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend);
void readListToVector(transaction::TransactionType trxType, common::ku_list_t& kuList,
const common::DataType& dataType, common::InMemOverflowBuffer& inMemOverflowBuffer);
Expand All @@ -85,6 +89,9 @@ class DiskOverflowFile : public StorageStructure {
void setListRecursiveIfNestedWithoutLock(const common::ku_list_t& inMemSrcList,
common::ku_list_t& diskDstList, const common::DataType& dataType);
void logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock();
void pinOverflowPageCache(BufferManagedFileHandle* bufferManagedFileHandleToPin,
common::page_idx_t pageIdxToPin, OverflowPageCache& overflowPageCache);
void unpinOverflowPageCache(OverflowPageCache& overflowPageCache);

private:
// This is the index of the last free byte to which we can write.
Expand Down
78 changes: 36 additions & 42 deletions src/storage/storage_structure/disk_overflow_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,34 @@ using namespace kuzu::common;
namespace kuzu {
namespace storage {

void DiskOverflowFile::readStringsToVector(TransactionType trxType, ValueVector& valueVector) {
void DiskOverflowFile::pinOverflowPageCache(BufferManagedFileHandle* bufferManagedFileHandleToPin,
page_idx_t pageIdxToPin, OverflowPageCache& overflowPageCache) {
overflowPageCache.frame = bufferManager.pin(*bufferManagedFileHandleToPin, pageIdxToPin);
overflowPageCache.bufferManagedFileHandle = bufferManagedFileHandleToPin;
overflowPageCache.pageIdx = pageIdxToPin;
}

void DiskOverflowFile::unpinOverflowPageCache(OverflowPageCache& overflowPageCache) {
if (overflowPageCache.pageIdx != UINT32_MAX) {
bufferManager.unpin(*overflowPageCache.bufferManagedFileHandle, overflowPageCache.pageIdx);
}
}

void DiskOverflowFile::scanStrings(TransactionType trxType, ValueVector& valueVector) {
assert(!valueVector.state->isFlat());
OverflowPageCache overflowPageCache;
for (auto i = 0u; i < valueVector.state->selVector->selectedSize; i++) {
auto pos = valueVector.state->selVector->selectedPositions[i];
if (valueVector.isNull(pos)) {
continue;
}
readStringToVector(
trxType, ((ku_string_t*)valueVector.getData())[pos], valueVector.getOverflowBuffer());
lookupString(trxType, ((ku_string_t*)valueVector.getData())[pos],
valueVector.getOverflowBuffer(), overflowPageCache);
}
unpinOverflowPageCache(overflowPageCache);
}

void DiskOverflowFile::readStringToVector(
void DiskOverflowFile::lookupString(
TransactionType trxType, ku_string_t& kuStr, InMemOverflowBuffer& inMemOverflowBuffer) {
if (ku_string_t::isShortString(kuStr.len)) {
return;
Expand All @@ -39,45 +54,22 @@ void DiskOverflowFile::readStringToVector(
bufferManager.unpin(*fileHandleToPin, pageIdxToPin);
}

void DiskOverflowFile::scanSequentialStringOverflow(TransactionType trxType, ValueVector& vector) {
BufferManagedFileHandle* cachedFileHandle = nullptr;
page_idx_t cachedPageIdx = UINT32_MAX;
uint8_t* cachedFrame = nullptr;
for (auto i = 0u; i < vector.state->selVector->selectedSize; ++i) {
auto pos = vector.state->selVector->selectedPositions[i];
if (vector.isNull(pos)) {
continue;
}
auto& kuString = ((ku_string_t*)vector.getData())[pos];
if (ku_string_t::isShortString(kuString.len)) {
continue;
}
page_idx_t pageIdx = UINT32_MAX;
uint16_t pagePos = UINT16_MAX;
TypeUtils::decodeOverflowPtr(kuString.overflowPtr, pageIdx, pagePos);
auto [fileHandleToPin, pageIdxToPin] =
StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin(
*fileHandle, pageIdx, *wal, trxType);
if (pageIdxToPin == cachedPageIdx) { // cache hit
InMemOverflowBufferUtils::copyString(
(char*)(cachedFrame + pagePos), kuString.len, kuString, vector.getOverflowBuffer());
continue;
}
// cache miss
if (cachedPageIdx != UINT32_MAX) { // unpin cached frame
bufferManager.unpin(*cachedFileHandle, cachedPageIdx);
}
// pin new frame and update cache
auto frame = bufferManager.pin(*fileHandleToPin, pageIdxToPin);
InMemOverflowBufferUtils::copyString(
(char*)(frame + pagePos), kuString.len, kuString, vector.getOverflowBuffer());
cachedFileHandle = fileHandleToPin;
cachedPageIdx = pageIdxToPin;
cachedFrame = frame;
void DiskOverflowFile::lookupString(TransactionType trxType, ku_string_t& kuStr,
InMemOverflowBuffer& inMemOverflowBuffer, OverflowPageCache& overflowPageCache) {
if (ku_string_t::isShortString(kuStr.len)) {
return;
}
if (cachedPageIdx != UINT32_MAX) {
bufferManager.unpin(*cachedFileHandle, cachedPageIdx);
PageByteCursor cursor;
TypeUtils::decodeOverflowPtr(kuStr.overflowPtr, cursor.pageIdx, cursor.offsetInPage);
auto [fileHandleToPin, pageIdxToPin] =
StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin(
*fileHandle, cursor.pageIdx, *wal, trxType);
if (pageIdxToPin != overflowPageCache.pageIdx) { // cache miss
unpinOverflowPageCache(overflowPageCache);
pinOverflowPageCache(fileHandleToPin, pageIdxToPin, overflowPageCache);
}
InMemOverflowBufferUtils::copyString((char*)(overflowPageCache.frame + cursor.offsetInPage),
kuStr.len, kuStr, inMemOverflowBuffer);
}

void DiskOverflowFile::readListsToVector(TransactionType trxType, ValueVector& valueVector) {
Expand All @@ -104,9 +96,11 @@ void DiskOverflowFile::readListToVector(TransactionType trxType, ku_list_t& kuLi
bufferManager.unpin(*fileHandleToPin, pageIdxToPin);
if (dataType.childType->typeID == STRING) {
auto kuStrings = (ku_string_t*)(kuList.overflowPtr);
OverflowPageCache overflowPageCache;
for (auto i = 0u; i < kuList.size; i++) {
readStringToVector(trxType, kuStrings[i], inMemOverflowBuffer);
lookupString(trxType, kuStrings[i], inMemOverflowBuffer, overflowPageCache);
}
unpinOverflowPageCache(overflowPageCache);
} else if (dataType.childType->typeID == VAR_LIST) {
auto kuLists = (ku_list_t*)(kuList.overflowPtr);
for (auto i = 0u; i < kuList.size; i++) {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_structure/lists/lists.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,13 @@ void Lists::readPropertyUpdatesToInMemListIfExists(InMemList& inMemList,
void StringPropertyLists::readFromLargeList(ValueVector* valueVector, ListHandle& listHandle) {
valueVector->resetOverflowBuffer();
Lists::readFromLargeList(valueVector, listHandle);
diskOverflowFile.readStringsToVector(TransactionType::READ_ONLY, *valueVector);
diskOverflowFile.scanStrings(TransactionType::READ_ONLY, *valueVector);
}

void StringPropertyLists::readFromSmallList(ValueVector* valueVector, ListHandle& listHandle) {
valueVector->resetOverflowBuffer();
Lists::readFromSmallList(valueVector, listHandle);
diskOverflowFile.readStringsToVector(TransactionType::READ_ONLY, *valueVector);
diskOverflowFile.scanStrings(TransactionType::READ_ONLY, *valueVector);
}

void ListPropertyLists::readFromLargeList(ValueVector* valueVector, ListHandle& listHandle) {
Expand Down

0 comments on commit b8f50e1

Please sign in to comment.