Skip to content

Commit

Permalink
Add cache pin to readStringToVector
Browse files Browse the repository at this point in the history
Also: I have read and agree to the terms under CLA.md
  • Loading branch information
rfdavid committed Mar 3, 2023
1 parent 308fe34 commit 12263d0
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 54 deletions.
4 changes: 2 additions & 2 deletions src/include/storage/storage_structure/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ class StringPropertyColumn : public PropertyColumnWithOverflow {
const std::shared_ptr<common::ValueVector>& resultVector,
PageElementCursor& cursor) override {
Column::scan(transaction, resultVector, cursor);
diskOverflowFile.scanSequentialStringOverflow(transaction->getType(), *resultVector);
diskOverflowFile.readStringsToVector(transaction->getType(), *resultVector);
}
void scanWithSelState(transaction::Transaction* transaction,
const std::shared_ptr<common::ValueVector>& resultVector,
PageElementCursor& cursor) override {
Column::scanWithSelState(transaction, resultVector, cursor);
diskOverflowFile.scanSequentialStringOverflow(transaction->getType(), *resultVector);
diskOverflowFile.readStringsToVector(transaction->getType(), *resultVector);
}
};

Expand Down
15 changes: 11 additions & 4 deletions src/include/storage/storage_structure/disk_overflow_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,25 @@ class DiskOverflowFile : public StorageStructure {
return copy;
}

struct OverflowCache {
FileHandle* fileHandle = nullptr;
common::page_idx_t pageIdx = UINT32_MAX;
uint8_t* frame = nullptr;
};

void readStringsToVector(
transaction::TransactionType trxType, common::ValueVector& valueVector);
void readStringToVector(transaction::TransactionType trxType, common::ku_string_t& kuStr,
common::InMemOverflowBuffer& inMemOverflowBuffer);
common::InMemOverflowBuffer& inMemOverflowBuffer, OverflowCache& overflowCache);

inline void scanSingleStringOverflow(
transaction::TransactionType trxType, common::ValueVector& vector, uint64_t vectorPos) {
OverflowCache overflowCache;
assert(vector.dataType.typeID == common::STRING && !vector.isNull(vectorPos));
auto& kuString = ((common::ku_string_t*)vector.getData())[vectorPos];
readStringToVector(trxType, kuString, vector.getOverflowBuffer());
readStringToVector(trxType, kuString, vector.getOverflowBuffer(), overflowCache);
unpinOverflowCache(overflowCache);
}
void scanSequentialStringOverflow(
transaction::TransactionType trxType, common::ValueVector& vector);
inline void scanSingleListOverflow(
transaction::TransactionType trxType, common::ValueVector& vector, uint64_t vectorPos) {
assert(vector.dataType.typeID == common::VAR_LIST && !vector.isNull(vectorPos));
Expand Down Expand Up @@ -86,6 +92,7 @@ class DiskOverflowFile : public StorageStructure {
void setListRecursiveIfNestedWithoutLock(const common::ku_list_t& inMemSrcList,
common::ku_list_t& diskDstList, const common::DataType& dataType);
void logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock();
void unpinOverflowCache(OverflowCache& overflowCache);

private:
// This is the index of the last free byte to which we can write.
Expand Down
71 changes: 23 additions & 48 deletions src/storage/storage_structure/disk_overflow_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,28 @@ using namespace kuzu::common;
namespace kuzu {
namespace storage {

void DiskOverflowFile::unpinOverflowCache(OverflowCache& overflowCache) {
if (overflowCache.pageIdx != UINT32_MAX) {
bufferManager.unpin(*overflowCache.fileHandle, overflowCache.pageIdx);
}
}

void DiskOverflowFile::readStringsToVector(TransactionType trxType, ValueVector& valueVector) {
assert(!valueVector.state->isFlat());
OverflowCache overflowCache;
for (auto i = 0u; i < valueVector.state->selVector->selectedSize; i++) {
auto pos = valueVector.state->selVector->selectedPositions[i];
if (valueVector.isNull(pos)) {
continue;
}
readStringToVector(
trxType, ((ku_string_t*)valueVector.getData())[pos], valueVector.getOverflowBuffer());
readStringToVector(trxType, ((ku_string_t*)valueVector.getData())[pos],
valueVector.getOverflowBuffer(), overflowCache);
}
unpinOverflowCache(overflowCache);
}

void DiskOverflowFile::readStringToVector(
TransactionType trxType, ku_string_t& kuStr, InMemOverflowBuffer& inMemOverflowBuffer) {
void DiskOverflowFile::readStringToVector(TransactionType trxType, ku_string_t& kuStr,
InMemOverflowBuffer& inMemOverflowBuffer, OverflowCache& overflowCache) {
if (ku_string_t::isShortString(kuStr.len)) {
return;
}
Expand All @@ -33,51 +41,16 @@ void DiskOverflowFile::readStringToVector(
auto [fileHandleToPin, pageIdxToPin] =
StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin(
fileHandle, cursor.pageIdx, *wal, trxType);
auto frame = bufferManager.pin(*fileHandleToPin, pageIdxToPin);
InMemOverflowBufferUtils::copyString(
(char*)(frame + cursor.offsetInPage), kuStr.len, kuStr, inMemOverflowBuffer);
bufferManager.unpin(*fileHandleToPin, pageIdxToPin);
}

void DiskOverflowFile::scanSequentialStringOverflow(TransactionType trxType, ValueVector& vector) {
FileHandle* cachedFileHandle = nullptr;
page_idx_t cachedPageIdx = UINT32_MAX;
uint8_t* cachedFrame = nullptr;
for (auto i = 0u; i < vector.state->selVector->selectedSize; ++i) {
auto pos = vector.state->selVector->selectedPositions[i];
if (vector.isNull(pos)) {
continue;
if (pageIdxToPin != overflowCache.pageIdx) { // cache miss
if (overflowCache.pageIdx != UINT32_MAX) {
bufferManager.unpin(*overflowCache.fileHandle, overflowCache.pageIdx);
}
auto& kuString = ((ku_string_t*)vector.getData())[pos];
if (ku_string_t::isShortString(kuString.len)) {
continue;
}
page_idx_t pageIdx = UINT32_MAX;
uint16_t pagePos = UINT16_MAX;
TypeUtils::decodeOverflowPtr(kuString.overflowPtr, pageIdx, pagePos);
auto [fileHandleToPin, pageIdxToPin] =
StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin(
fileHandle, pageIdx, *wal, trxType);
if (pageIdxToPin == cachedPageIdx) { // cache hit
InMemOverflowBufferUtils::copyString(
(char*)(cachedFrame + pagePos), kuString.len, kuString, vector.getOverflowBuffer());
continue;
}
// cache miss
if (cachedPageIdx != UINT32_MAX) { // unpin cached frame
bufferManager.unpin(*cachedFileHandle, cachedPageIdx);
}
// pin new frame and update cache
auto frame = bufferManager.pin(*fileHandleToPin, pageIdxToPin);
InMemOverflowBufferUtils::copyString(
(char*)(frame + pagePos), kuString.len, kuString, vector.getOverflowBuffer());
cachedFileHandle = fileHandleToPin;
cachedPageIdx = pageIdxToPin;
cachedFrame = frame;
}
if (cachedPageIdx != UINT32_MAX) {
bufferManager.unpin(*cachedFileHandle, cachedPageIdx);
overflowCache.frame = bufferManager.pin(*fileHandleToPin, pageIdxToPin);
overflowCache.fileHandle = fileHandleToPin;
overflowCache.pageIdx = pageIdxToPin;
}
InMemOverflowBufferUtils::copyString(
(char*)(overflowCache.frame + cursor.offsetInPage), kuStr.len, kuStr, inMemOverflowBuffer);
}

void DiskOverflowFile::readListsToVector(TransactionType trxType, ValueVector& valueVector) {
Expand All @@ -104,9 +77,11 @@ void DiskOverflowFile::readListToVector(TransactionType trxType, ku_list_t& kuLi
bufferManager.unpin(*fileHandleToPin, pageIdxToPin);
if (dataType.childType->typeID == STRING) {
auto kuStrings = (ku_string_t*)(kuList.overflowPtr);
OverflowCache overflowCache;
for (auto i = 0u; i < kuList.size; i++) {
readStringToVector(trxType, kuStrings[i], inMemOverflowBuffer);
readStringToVector(trxType, kuStrings[i], inMemOverflowBuffer, overflowCache);
}
unpinOverflowCache(overflowCache);
} else if (dataType.childType->typeID == VAR_LIST) {
auto kuLists = (ku_list_t*)(kuList.overflowPtr);
for (auto i = 0u; i < kuList.size; i++) {
Expand Down

0 comments on commit 12263d0

Please sign in to comment.