Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequential read for StringPropertyList #1327

Merged
merged 1 commit into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/include/storage/storage_structure/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ class StringPropertyColumn : public PropertyColumnWithOverflow {
inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
Column::scan(transaction, resultVector, cursor);
diskOverflowFile.scanSequentialStringOverflow(transaction->getType(), *resultVector);
diskOverflowFile.scanStrings(transaction->getType(), *resultVector);
}
void scanWithSelState(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
Column::scanWithSelState(transaction, resultVector, cursor);
diskOverflowFile.scanSequentialStringOverflow(transaction->getType(), *resultVector);
diskOverflowFile.scanStrings(transaction->getType(), *resultVector);
}
};

Expand Down
21 changes: 14 additions & 7 deletions src/include/storage/storage_structure/disk_overflow_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,14 @@ class DiskOverflowFile : public StorageStructure {
return copy;
}

void readStringsToVector(
transaction::TransactionType trxType, common::ValueVector& valueVector);
void readStringToVector(transaction::TransactionType trxType, common::ku_string_t& kuStr,
common::InMemOverflowBuffer& inMemOverflowBuffer);
void scanStrings(transaction::TransactionType trxType, common::ValueVector& valueVector);

inline void scanSingleStringOverflow(
transaction::TransactionType trxType, common::ValueVector& vector, uint64_t vectorPos) {
assert(vector.dataType.typeID == common::STRING && !vector.isNull(vectorPos));
auto& kuString = ((common::ku_string_t*)vector.getData())[vectorPos];
readStringToVector(trxType, kuString, vector.getOverflowBuffer());
lookupString(trxType, kuString, vector.getOverflowBuffer());
}
void scanSequentialStringOverflow(
transaction::TransactionType trxType, common::ValueVector& vector);
inline void scanSingleListOverflow(
transaction::TransactionType trxType, common::ValueVector& vector, uint64_t vectorPos) {
assert(vector.dataType.typeID == common::VAR_LIST && !vector.isNull(vectorPos));
Expand Down Expand Up @@ -77,6 +72,15 @@ class DiskOverflowFile : public StorageStructure {
}

private:
// Remembers one pinned overflow page so that consecutive string lookups that land on the
// same page can reuse its frame instead of pinning the page again.
struct OverflowPageCache {
// File handle the cached page was pinned through; nullptr until a page has been pinned.
BufferManagedFileHandle* bufferManagedFileHandle = nullptr;
// Index of the cached page. UINT32_MAX is the sentinel for "no page is cached".
common::page_idx_t pageIdx = UINT32_MAX;
// Frame returned by the buffer manager for the cached page; nullptr until a page has been
// pinned.
uint8_t* frame = nullptr;
};
// Looks up a single string. Short strings (ku_string_t::isShortString) are left untouched;
// for longer strings the overflow page involved is unpinned again before returning.
// NOTE(review): the middle of this overload's definition is not visible in this diff --
// presumably it pins the page and copies the bytes into inMemOverflowBuffer; confirm.
void lookupString(transaction::TransactionType trxType, common::ku_string_t& kuStr,
common::InMemOverflowBuffer& inMemOverflowBuffer);
// Cached variant used when many strings are read in a row. Short strings are left untouched.
// Otherwise the overflow pointer of kuStr is decoded, the page is pinned only if it differs
// from the one recorded in overflowPageCache (the previously cached page is unpinned first),
// and the string bytes are copied out of the cached frame via inMemOverflowBuffer.
// The page stays pinned on return: the caller must call unpinOverflowPageCache when done.
void lookupString(transaction::TransactionType trxType, common::ku_string_t& kuStr,
common::InMemOverflowBuffer& inMemOverflowBuffer, OverflowPageCache& overflowPageCache);
void addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend);
void readListToVector(transaction::TransactionType trxType, common::ku_list_t& kuList,
const common::DataType& dataType, common::InMemOverflowBuffer& inMemOverflowBuffer);
Expand All @@ -85,6 +89,9 @@ class DiskOverflowFile : public StorageStructure {
void setListRecursiveIfNestedWithoutLock(const common::ku_list_t& inMemSrcList,
common::ku_list_t& diskDstList, const common::DataType& dataType);
void logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock();
// Pins pageIdxToPin through bufferManagedFileHandleToPin and records the handle, the page
// index and the returned frame in overflowPageCache. Does not unpin a page that the cache
// may already hold.
void pinOverflowPageCache(BufferManagedFileHandle* bufferManagedFileHandleToPin,
common::page_idx_t pageIdxToPin, OverflowPageCache& overflowPageCache);
// Unpins the page recorded in overflowPageCache; does nothing when the cache holds no page
// (pageIdx == UINT32_MAX).
void unpinOverflowPageCache(OverflowPageCache& overflowPageCache);

private:
// This is the index of the last free byte to which we can write.
Expand Down
78 changes: 36 additions & 42 deletions src/storage/storage_structure/disk_overflow_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,34 @@ using namespace kuzu::common;
namespace kuzu {
namespace storage {

void DiskOverflowFile::readStringsToVector(TransactionType trxType, ValueVector& valueVector) {
// Pins the requested page and makes it the page remembered by overflowPageCache.
// A page already recorded in the cache is not unpinned here; the caller is expected to
// have released it beforehand.
void DiskOverflowFile::pinOverflowPageCache(BufferManagedFileHandle* bufferManagedFileHandleToPin,
    page_idx_t pageIdxToPin, OverflowPageCache& overflowPageCache) {
    // Pin first: the cache is only modified once the buffer manager has handed out a frame.
    uint8_t* pinnedFrame = bufferManager.pin(*bufferManagedFileHandleToPin, pageIdxToPin);
    overflowPageCache.bufferManagedFileHandle = bufferManagedFileHandleToPin;
    overflowPageCache.pageIdx = pageIdxToPin;
    overflowPageCache.frame = pinnedFrame;
}

// Releases the page held by overflowPageCache, if any, and resets the cache to its empty
// state (no handle, pageIdx == UINT32_MAX, no frame).
// Resetting makes the call idempotent: a second call on the same cache is a no-op instead
// of a second unpin, and the cache never keeps referring to a frame that is no longer pinned
// (e.g. when the pin that normally follows a cache miss does not complete).
void DiskOverflowFile::unpinOverflowPageCache(OverflowPageCache& overflowPageCache) {
    if (overflowPageCache.pageIdx == UINT32_MAX) {
        return; // Nothing is cached, so there is nothing to unpin.
    }
    bufferManager.unpin(*overflowPageCache.bufferManagedFileHandle, overflowPageCache.pageIdx);
    // Forget the released page so stale handle/frame pointers cannot be reused.
    overflowPageCache.bufferManagedFileHandle = nullptr;
    overflowPageCache.pageIdx = UINT32_MAX;
    overflowPageCache.frame = nullptr;
}

void DiskOverflowFile::scanStrings(TransactionType trxType, ValueVector& valueVector) {
assert(!valueVector.state->isFlat());
OverflowPageCache overflowPageCache;
for (auto i = 0u; i < valueVector.state->selVector->selectedSize; i++) {
auto pos = valueVector.state->selVector->selectedPositions[i];
if (valueVector.isNull(pos)) {
continue;
}
readStringToVector(
trxType, ((ku_string_t*)valueVector.getData())[pos], valueVector.getOverflowBuffer());
lookupString(trxType, ((ku_string_t*)valueVector.getData())[pos],
valueVector.getOverflowBuffer(), overflowPageCache);
}
unpinOverflowPageCache(overflowPageCache);
}

void DiskOverflowFile::readStringToVector(
void DiskOverflowFile::lookupString(
TransactionType trxType, ku_string_t& kuStr, InMemOverflowBuffer& inMemOverflowBuffer) {
if (ku_string_t::isShortString(kuStr.len)) {
return;
Expand All @@ -39,45 +54,22 @@ void DiskOverflowFile::readStringToVector(
bufferManager.unpin(*fileHandleToPin, pageIdxToPin);
}

void DiskOverflowFile::scanSequentialStringOverflow(TransactionType trxType, ValueVector& vector) {
BufferManagedFileHandle* cachedFileHandle = nullptr;
page_idx_t cachedPageIdx = UINT32_MAX;
uint8_t* cachedFrame = nullptr;
for (auto i = 0u; i < vector.state->selVector->selectedSize; ++i) {
auto pos = vector.state->selVector->selectedPositions[i];
if (vector.isNull(pos)) {
continue;
}
auto& kuString = ((ku_string_t*)vector.getData())[pos];
if (ku_string_t::isShortString(kuString.len)) {
continue;
}
page_idx_t pageIdx = UINT32_MAX;
uint16_t pagePos = UINT16_MAX;
TypeUtils::decodeOverflowPtr(kuString.overflowPtr, pageIdx, pagePos);
auto [fileHandleToPin, pageIdxToPin] =
StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin(
*fileHandle, pageIdx, *wal, trxType);
if (pageIdxToPin == cachedPageIdx) { // cache hit
InMemOverflowBufferUtils::copyString(
(char*)(cachedFrame + pagePos), kuString.len, kuString, vector.getOverflowBuffer());
continue;
}
// cache miss
if (cachedPageIdx != UINT32_MAX) { // unpin cached frame
bufferManager.unpin(*cachedFileHandle, cachedPageIdx);
}
// pin new frame and update cache
auto frame = bufferManager.pin(*fileHandleToPin, pageIdxToPin);
InMemOverflowBufferUtils::copyString(
(char*)(frame + pagePos), kuString.len, kuString, vector.getOverflowBuffer());
cachedFileHandle = fileHandleToPin;
cachedPageIdx = pageIdxToPin;
cachedFrame = frame;
void DiskOverflowFile::lookupString(TransactionType trxType, ku_string_t& kuStr,
InMemOverflowBuffer& inMemOverflowBuffer, OverflowPageCache& overflowPageCache) {
if (ku_string_t::isShortString(kuStr.len)) {
return;
}
if (cachedPageIdx != UINT32_MAX) {
bufferManager.unpin(*cachedFileHandle, cachedPageIdx);
PageByteCursor cursor;
TypeUtils::decodeOverflowPtr(kuStr.overflowPtr, cursor.pageIdx, cursor.offsetInPage);
auto [fileHandleToPin, pageIdxToPin] =
StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin(
*fileHandle, cursor.pageIdx, *wal, trxType);
if (pageIdxToPin != overflowPageCache.pageIdx) { // cache miss
unpinOverflowPageCache(overflowPageCache);
pinOverflowPageCache(fileHandleToPin, pageIdxToPin, overflowPageCache);
}
InMemOverflowBufferUtils::copyString((char*)(overflowPageCache.frame + cursor.offsetInPage),
kuStr.len, kuStr, inMemOverflowBuffer);
}

void DiskOverflowFile::readListsToVector(TransactionType trxType, ValueVector& valueVector) {
Expand All @@ -104,9 +96,11 @@ void DiskOverflowFile::readListToVector(TransactionType trxType, ku_list_t& kuLi
bufferManager.unpin(*fileHandleToPin, pageIdxToPin);
if (dataType.childType->typeID == STRING) {
auto kuStrings = (ku_string_t*)(kuList.overflowPtr);
OverflowPageCache overflowPageCache;
for (auto i = 0u; i < kuList.size; i++) {
readStringToVector(trxType, kuStrings[i], inMemOverflowBuffer);
lookupString(trxType, kuStrings[i], inMemOverflowBuffer, overflowPageCache);
}
unpinOverflowPageCache(overflowPageCache);
} else if (dataType.childType->typeID == VAR_LIST) {
auto kuLists = (ku_list_t*)(kuList.overflowPtr);
for (auto i = 0u; i < kuList.size; i++) {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_structure/lists/lists.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,13 @@ void Lists::readPropertyUpdatesToInMemListIfExists(InMemList& inMemList,
void StringPropertyLists::readFromLargeList(ValueVector* valueVector, ListHandle& listHandle) {
valueVector->resetOverflowBuffer();
Lists::readFromLargeList(valueVector, listHandle);
diskOverflowFile.readStringsToVector(TransactionType::READ_ONLY, *valueVector);
diskOverflowFile.scanStrings(TransactionType::READ_ONLY, *valueVector);
}

void StringPropertyLists::readFromSmallList(ValueVector* valueVector, ListHandle& listHandle) {
valueVector->resetOverflowBuffer();
Lists::readFromSmallList(valueVector, listHandle);
diskOverflowFile.readStringsToVector(TransactionType::READ_ONLY, *valueVector);
diskOverflowFile.scanStrings(TransactionType::READ_ONLY, *valueVector);
}

void ListPropertyLists::readFromLargeList(ValueVector* valueVector, ListHandle& listHandle) {
Expand Down