diff --git a/src/include/storage/storage_structure/column.h b/src/include/storage/storage_structure/column.h index 77f75172da..58251be0b8 100644 --- a/src/include/storage/storage_structure/column.h +++ b/src/include/storage/storage_structure/column.h @@ -8,21 +8,42 @@ namespace kuzu { namespace storage { -class Column : public BaseColumnOrList { +using scan_data_func_t = std::function; +using lookup_data_func_t = std::function; +class Column : public BaseColumnOrList { public: - Column(const common::DataType& dataType) : BaseColumnOrList{dataType} {}; - - Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType, - size_t elementSize, BufferManager* bufferManager, WAL* wal) - : BaseColumnOrList{structureIDAndFName, dataType, elementSize, bufferManager, - true /*hasNULLBytes*/, wal} {}; + // TODO(Guodong): Clean up column constructors. + // Currently extended by SERIAL column. + explicit Column(const common::DataType& dataType) + : BaseColumnOrList{dataType}, tableID{common::INVALID_TABLE_ID} {}; Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType, BufferManager* bufferManager, WAL* wal) : Column(structureIDAndFName, dataType, common::Types::getDataTypeSize(dataType), bufferManager, wal){}; + Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType, + size_t elementSize, BufferManager* bufferManager, WAL* wal) + : Column{structureIDAndFName, dataType, elementSize, bufferManager, wal, + common::INVALID_TABLE_ID} {}; + + // Extended by INTERNAL_ID column. + Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType, + size_t elementSize, BufferManager* bufferManager, WAL* wal, common::table_id_t tableID) + : BaseColumnOrList{structureIDAndFName, dataType, elementSize, bufferManager, + true /*hasNULLBytes*/, wal}, + tableID{tableID} { + scanDataFunc = Column::scanValuesFromPage; + lookupDataFunc = Column::lookupValueFromPage; + } + // Expose for feature store virtual void batchLookup(const common::offset_t* nodeOffsets, size_t size, uint8_t* result); @@ -32,30 +53,28 @@ class Column : public BaseColumnOrList { void writeValues(common::ValueVector* nodeIDVector, common::ValueVector* vectorToWriteFrom); bool isNull(common::offset_t nodeOffset, transaction::Transaction* transaction); - void setNodeOffsetToNull(common::offset_t nodeOffset); + void setNull(common::offset_t nodeOffset); // Currently, used only in CopyCSV tests. + // TODO(Guodong): Remove this function. Use `read` instead. virtual common::Value readValueForTestingOnly(common::offset_t offset); protected: void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector, uint32_t vectorPos); - virtual void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector, - uint32_t vectorPos, PageElementCursor& cursor); - virtual inline void scan(transaction::Transaction* transaction, - common::ValueVector* resultVector, PageElementCursor& cursor) { - readBySequentialCopy(transaction, resultVector, cursor, identityMapper); - } - virtual void scanWithSelState(transaction::Transaction* transaction, - common::ValueVector* resultVector, PageElementCursor& cursor) { - readBySequentialCopyWithSelState(transaction, resultVector, cursor, identityMapper); - } + virtual void lookup(transaction::Transaction* transaction, common::offset_t nodeOffset, + common::ValueVector* resultVector, uint32_t vectorPos); + virtual void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, + common::ValueVector* resultVector); virtual void writeValueForSingleNodeIDPosition(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom); WALPageIdxPosInPageAndFrame beginUpdatingPage(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom); + void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx, + const std::function& func); + private: // The reason why we make this function virtual is: we can't simply do memcpy on nodeIDs if // the adjColumn has tableIDCompression, in this case we only store the nodeOffset in @@ -74,35 +93,42 @@ class Column : public BaseColumnOrList { WALPageIdxPosInPageAndFrame beginUpdatingPageAndWriteOnlyNullBit( common::offset_t nodeOffset, bool isNull); + static void scanValuesFromPage(transaction::Transaction* transaction, uint8_t* frame, + PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector, + uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID, + DiskOverflowFile* diskOverflowFile); + static void lookupValueFromPage(transaction::Transaction* transaction, uint8_t* frame, + PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector, + uint32_t numElementsPerPage, common::table_id_t commonTableID, + DiskOverflowFile* diskOverflowFile); + protected: // no logical-physical page mapping is required for columns std::function identityMapper = [](uint32_t i) { return i; }; + + scan_data_func_t scanDataFunc; + lookup_data_func_t lookupDataFunc; + common::table_id_t tableID; + std::unique_ptr diskOverflowFile; }; class PropertyColumnWithOverflow : public Column { public: PropertyColumnWithOverflow(const StorageStructureIDAndFName& structureIDAndFNameOfMainColumn, const common::DataType& dataType, BufferManager* bufferManager, WAL* wal) - : Column{structureIDAndFNameOfMainColumn, dataType, bufferManager, wal}, - diskOverflowFile{structureIDAndFNameOfMainColumn, bufferManager, wal} {} - - inline void read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, - common::ValueVector* resultVector) override { - common::StringVector::resetOverflowBuffer(resultVector); - Column::read(transaction, nodeIDVector, resultVector); + : Column{structureIDAndFNameOfMainColumn, dataType, bufferManager, wal} { + diskOverflowFile = + std::make_unique(structureIDAndFNameOfMainColumn, bufferManager, wal); } - inline DiskOverflowFile* getDiskOverflowFile() { return &diskOverflowFile; } - inline BMFileHandle* getDiskOverflowFileHandle() { return diskOverflowFile.getFileHandle(); } + inline DiskOverflowFile* getDiskOverflowFile() { return diskOverflowFile.get(); } -protected: - DiskOverflowFile diskOverflowFile; + inline BMFileHandle* getDiskOverflowFileHandle() { return diskOverflowFile->getFileHandle(); } }; class StringPropertyColumn : public PropertyColumnWithOverflow { - public: StringPropertyColumn(const StorageStructureIDAndFName& structureIDAndFNameOfMainColumn, const common::DataType& dataType, BufferManager* bufferManager, WAL* wal) @@ -116,33 +142,32 @@ class StringPropertyColumn : public PropertyColumnWithOverflow { common::Value readValueForTestingOnly(common::offset_t offset) override; private: - inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector, - uint32_t vectorPos, PageElementCursor& cursor) override { - Column::lookup(transaction, resultVector, vectorPos, cursor); + inline void lookup(transaction::Transaction* transaction, common::offset_t nodeOffset, + common::ValueVector* resultVector, uint32_t vectorPos) override { + common::StringVector::resetOverflowBuffer(resultVector); + Column::lookup(transaction, nodeOffset, resultVector, vectorPos); if (!resultVector->isNull(vectorPos)) { - diskOverflowFile.scanSingleStringOverflow( + diskOverflowFile->scanSingleStringOverflow( transaction->getType(), *resultVector, vectorPos); } } - inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector, - PageElementCursor& cursor) override { - Column::scan(transaction, resultVector, cursor); - diskOverflowFile.scanStrings(transaction->getType(), *resultVector); - } - void scanWithSelState(transaction::Transaction* transaction, common::ValueVector* resultVector, - PageElementCursor& cursor) override { - Column::scanWithSelState(transaction, resultVector, cursor); - diskOverflowFile.scanStrings(transaction->getType(), *resultVector); + inline void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, + common::ValueVector* resultVector) override { + common::StringVector::resetOverflowBuffer(resultVector); + Column::scan(transaction, nodeIDVector, resultVector); + diskOverflowFile->scanStrings(transaction->getType(), *resultVector); } }; class ListPropertyColumn : public PropertyColumnWithOverflow { - public: ListPropertyColumn(const StorageStructureIDAndFName& structureIDAndFNameOfMainColumn, const common::DataType& dataType, BufferManager* bufferManager, WAL* wal) : PropertyColumnWithOverflow{ - structureIDAndFNameOfMainColumn, dataType, bufferManager, wal} {}; + structureIDAndFNameOfMainColumn, dataType, bufferManager, wal} { + scanDataFunc = ListPropertyColumn::scanListsFromPage; + lookupDataFunc = ListPropertyColumn::lookupListFromPage; + }; void writeValueForSingleNodeIDPosition(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override; @@ -150,20 +175,14 @@ class ListPropertyColumn : public PropertyColumnWithOverflow { common::Value readValueForTestingOnly(common::offset_t offset) override; private: - // TODO(Ziyi): remove these function once we have changed the storage structure for lists. - void readListsToVector(PageElementCursor& cursor, - const std::function& logicalToPhysicalPageMapper, - transaction::Transaction* transaction, common::ValueVector* resultVector, - uint64_t vectorPos, uint64_t numValuesToRead); - - void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector, - uint32_t vectorPos, PageElementCursor& cursor) override; - - void scan(transaction::Transaction* transaction, common::ValueVector* resultVector, - PageElementCursor& cursor) override; - - void scanWithSelState(transaction::Transaction* transaction, common::ValueVector* resultVector, - PageElementCursor& cursor) override; + static void scanListsFromPage(transaction::Transaction* transaction, uint8_t* frame, + PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector, + uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID, + DiskOverflowFile* diskOverflowFile); + static void lookupListFromPage(transaction::Transaction* transaction, uint8_t* frame, + PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector, + uint32_t numElementsPerPage, common::table_id_t commonTableID, + DiskOverflowFile* diskOverflowFile); }; class StructPropertyColumn : public Column { @@ -178,38 +197,17 @@ class StructPropertyColumn : public Column { std::vector> structFieldColumns; }; -class RelIDColumn : public Column { - +class InternalIDColumn : public Column { public: - RelIDColumn(const StorageStructureIDAndFName& structureIDAndFName, BufferManager* bufferManager, - WAL* wal) + InternalIDColumn(const StorageStructureIDAndFName& structureIDAndFName, + BufferManager* bufferManager, WAL* wal, common::table_id_t tableID) : Column{structureIDAndFName, common::DataType(common::INTERNAL_ID), - sizeof(common::offset_t), bufferManager, wal}, - commonTableID{structureIDAndFName.storageStructureID.columnFileID.relPropertyColumnID - .relNodeTableAndDir.relTableID} { - assert(structureIDAndFName.storageStructureID.columnFileID.columnType == - ColumnType::REL_PROPERTY_COLUMN); - assert(structureIDAndFName.storageStructureID.storageStructureType == - StorageStructureType::COLUMN); + sizeof(common::offset_t), bufferManager, wal, tableID} { + scanDataFunc = InternalIDColumn::scanInternalIDsFromPage; + lookupDataFunc = InternalIDColumn::lookupInternalIDFromPage; } private: - inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector, - uint32_t vectorPos, PageElementCursor& cursor) override { - readInternalIDsFromAPageBySequentialCopy(transaction, resultVector, vectorPos, - cursor.pageIdx, cursor.elemPosInPage, 1 /* numValuesToCopy */, commonTableID, - false /* hasNoNullGuarantee */); - } - inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector, - PageElementCursor& cursor) override { - readInternalIDsBySequentialCopy(transaction, resultVector, cursor, identityMapper, - commonTableID, false /* hasNoNullGuarantee */); - } - inline void scanWithSelState(transaction::Transaction* transaction, - common::ValueVector* resultVector, PageElementCursor& cursor) override { - readInternalIDsBySequentialCopyWithSelState( - transaction, resultVector, cursor, identityMapper, commonTableID); - } inline void writeToPage(WALPageIdxPosInPageAndFrame& walPageInfo, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override { auto relID = vectorToWriteFrom->getValue(posInVectorToWriteFrom); @@ -217,44 +215,21 @@ class RelIDColumn : public Column { sizeof(relID.offset)); } -private: - common::table_id_t commonTableID; + static void scanInternalIDsFromPage(transaction::Transaction* transaction, uint8_t* frame, + PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector, + uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID, + DiskOverflowFile* diskOverflowFile); + static void lookupInternalIDFromPage(transaction::Transaction* transaction, uint8_t* frame, + PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector, + uint32_t numElementsPerPage, common::table_id_t commonTableID, + DiskOverflowFile* diskOverflowFile); }; -class AdjColumn : public Column { - +class AdjColumn : public InternalIDColumn { public: AdjColumn(const StorageStructureIDAndFName& structureIDAndFName, common::table_id_t nbrTableID, BufferManager* bufferManager, WAL* wal) - : Column{structureIDAndFName, common::DataType(common::INTERNAL_ID), - sizeof(common::offset_t), bufferManager, wal}, - nbrTableID{nbrTableID} {}; - -private: - inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector, - uint32_t vectorPos, PageElementCursor& cursor) override { - readInternalIDsFromAPageBySequentialCopy(transaction, resultVector, vectorPos, - cursor.pageIdx, cursor.elemPosInPage, 1 /* numValuesToCopy */, nbrTableID, - false /* hasNoNullGuarantee */); - } - inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector, - PageElementCursor& cursor) override { - readInternalIDsBySequentialCopy(transaction, resultVector, cursor, identityMapper, - nbrTableID, false /* hasNoNullGuarantee */); - } - inline void scanWithSelState(transaction::Transaction* transaction, - common::ValueVector* resultVector, PageElementCursor& cursor) override { - readInternalIDsBySequentialCopyWithSelState( - transaction, resultVector, cursor, identityMapper, nbrTableID); - } - inline void writeToPage(WALPageIdxPosInPageAndFrame& walPageInfo, - common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override { - *(walPageInfo.frame + mapElementPosToByteOffset(walPageInfo.posInPage)) = - vectorToWriteFrom->getValue(posInVectorToWriteFrom).offset; - } - -private: - common::table_id_t nbrTableID; + : InternalIDColumn{structureIDAndFName, bufferManager, wal, nbrTableID} {} }; class SerialColumn : public Column { @@ -266,7 +241,6 @@ class SerialColumn : public Column { }; class ColumnFactory { - public: static std::unique_ptr getColumn(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType, BufferManager* bufferManager, WAL* wal) { @@ -289,11 +263,14 @@ class ColumnFactory { return std::make_unique( structureIDAndFName, dataType, bufferManager, wal); case common::INTERNAL_ID: + // RelID column in rel tables. assert(structureIDAndFName.storageStructureID.storageStructureType == StorageStructureType::COLUMN && structureIDAndFName.storageStructureID.columnFileID.columnType == ColumnType::REL_PROPERTY_COLUMN); - return std::make_unique(structureIDAndFName, bufferManager, wal); + return std::make_unique(structureIDAndFName, bufferManager, wal, + structureIDAndFName.storageStructureID.columnFileID.relPropertyColumnID + .relNodeTableAndDir.relTableID); case common::STRUCT: return std::make_unique( structureIDAndFName, dataType, bufferManager, wal); diff --git a/src/storage/storage_structure/column.cpp b/src/storage/storage_structure/column.cpp index ac8393fffc..6defedf511 100644 --- a/src/storage/storage_structure/column.cpp +++ b/src/storage/storage_structure/column.cpp @@ -13,10 +13,8 @@ void Column::batchLookup(const common::offset_t* nodeOffsets, size_t size, uint8 for (auto i = 0u; i < size; ++i) { auto nodeOffset = nodeOffsets[i]; auto cursor = PageUtils::getPageElementCursorForPos(nodeOffset, numElementsPerPage); - auto [fileHandleToPin, pageIdxToPin] = - StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin( - *fileHandle, cursor.pageIdx, *wal, TransactionType::READ_ONLY); - bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, [&](uint8_t* frame) -> void { + auto dummyReadOnlyTransaction = Transaction::getDummyReadOnlyTrx(); + readFromPage(dummyReadOnlyTransaction.get(), cursor.pageIdx, [&](uint8_t* frame) -> void { auto frameBytesOffset = getElemByteOffset(cursor.elemPosInPage); memcpy(result + i * elementSize, frame + frameBytesOffset, elementSize); }); @@ -29,14 +27,7 @@ void Column::read(Transaction* transaction, common::ValueVector* nodeIDVector, auto pos = nodeIDVector->state->selVector->selectedPositions[0]; lookup(transaction, nodeIDVector, resultVector, pos); } else if (nodeIDVector->isSequential()) { - // In sequential read, we fetch start offset regardless of selected position. - auto startOffset = nodeIDVector->readNodeOffset(0); - auto pageCursor = PageUtils::getPageElementCursorForPos(startOffset, numElementsPerPage); - if (nodeIDVector->state->selVector->isUnfiltered()) { - scan(transaction, resultVector, pageCursor); - } else { - scanWithSelState(transaction, resultVector, pageCursor); - } + scan(transaction, nodeIDVector, resultVector); } else { for (auto i = 0ul; i < nodeIDVector->state->selVector->selectedSize; i++) { auto pos = nodeIDVector->state->selVector->selectedPositions[i]; @@ -73,15 +64,6 @@ void Column::writeValues( } } -Value Column::readValueForTestingOnly(offset_t offset) { - auto cursor = PageUtils::getPageElementCursorForPos(offset, numElementsPerPage); - Value retVal = Value::createDefaultValue(dataType); - bufferManager->optimisticRead(*fileHandle, cursor.pageIdx, [&](uint8_t* frame) { - retVal.copyValueFrom(frame + mapElementPosToByteOffset(cursor.elemPosInPage)); - }); - return retVal; -} - bool Column::isNull(offset_t nodeOffset, Transaction* transaction) { auto cursor = PageUtils::getPageElementCursorForPos(nodeOffset, numElementsPerPage); auto originalPageIdx = cursor.pageIdx; @@ -115,13 +97,23 @@ bool Column::isNull(offset_t nodeOffset, Transaction* transaction) { return isNull; } -void Column::setNodeOffsetToNull(offset_t nodeOffset) { +void Column::setNull(common::offset_t nodeOffset) { auto updatedPageInfoAndWALPageFrame = beginUpdatingPageAndWriteOnlyNullBit(nodeOffset, true /* isNull */); StorageStructureUtils::unpinWALPageAndReleaseOriginalPageLock( updatedPageInfoAndWALPageFrame, *fileHandle, *bufferManager, *wal); } +Value Column::readValueForTestingOnly(offset_t offset) { + auto cursor = PageUtils::getPageElementCursorForPos(offset, numElementsPerPage); + Value retVal = Value::createDefaultValue(dataType); + auto dummyReadOnlyTransaction = Transaction::getDummyReadOnlyTrx(); + readFromPage(dummyReadOnlyTransaction.get(), cursor.pageIdx, [&](uint8_t* frame) { + retVal.copyValueFrom(frame + mapElementPosToByteOffset(cursor.elemPosInPage)); + }); + return retVal; +} + void Column::lookup(Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector, uint32_t vectorPos) { if (nodeIDVector->isNull(vectorPos)) { @@ -129,23 +121,68 @@ void Column::lookup(Transaction* transaction, common::ValueVector* nodeIDVector, return; } auto nodeOffset = nodeIDVector->readNodeOffset(vectorPos); - auto pageCursor = PageUtils::getPageElementCursorForPos(nodeOffset, numElementsPerPage); - lookup(transaction, resultVector, vectorPos, pageCursor); + lookup(transaction, nodeOffset, resultVector, vectorPos); } -void Column::lookup(Transaction* transaction, common::ValueVector* resultVector, uint32_t vectorPos, - PageElementCursor& cursor) { - auto [fileHandleToPin, pageIdxToPin] = - StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin( - *fileHandle, cursor.pageIdx, *wal, transaction->getType()); - bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, [&](uint8_t* frame) -> void { - auto vectorBytesOffset = getElemByteOffset(vectorPos); - auto frameBytesOffset = getElemByteOffset(cursor.elemPosInPage); - memcpy(resultVector->getData() + vectorBytesOffset, frame + frameBytesOffset, elementSize); - readSingleNullBit(resultVector, frame, cursor.elemPosInPage, vectorPos); +void Column::lookup(Transaction* transaction, common::offset_t nodeOffset, + common::ValueVector* resultVector, uint32_t vectorPos) { + auto pageCursor = PageUtils::getPageElementCursorForPos(nodeOffset, numElementsPerPage); + readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) { + lookupDataFunc(transaction, frame, pageCursor, resultVector, vectorPos, numElementsPerPage, + tableID, diskOverflowFile.get()); }); } +void Column::scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, + common::ValueVector* resultVector) { + // In sequential read, we fetch start offset regardless of selected position. + auto startOffset = nodeIDVector->readNodeOffset(0); + uint64_t numValuesToRead = nodeIDVector->state->originalSize; + auto pageCursor = PageUtils::getPageElementCursorForPos(startOffset, numElementsPerPage); + auto numValuesRead = 0u; + auto posInSelVector = 0u; + if (nodeIDVector->state->selVector->isUnfiltered()) { + while (numValuesRead < numValuesToRead) { + uint64_t numValuesToReadInPage = + std::min((uint64_t)numElementsPerPage - pageCursor.elemPosInPage, + numValuesToRead - numValuesRead); + readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void { + scanDataFunc(transaction, frame, pageCursor, resultVector, numValuesRead, + numElementsPerPage, numValuesToReadInPage, tableID, diskOverflowFile.get()); + }); + numValuesRead += numValuesToReadInPage; + pageCursor.nextPage(); + } + } else { + while (numValuesRead < numValuesToRead) { + uint64_t numValuesToReadInPage = + std::min((uint64_t)numElementsPerPage - pageCursor.elemPosInPage, + numValuesToRead - numValuesRead); + if (isInRange(nodeIDVector->state->selVector->selectedPositions[posInSelVector], + numValuesRead, numValuesRead + numValuesToReadInPage)) { + readFromPage(transaction, pageCursor.pageIdx, [&](uint8_t* frame) -> void { + scanDataFunc(transaction, frame, pageCursor, resultVector, numValuesRead, + numElementsPerPage, numValuesToReadInPage, tableID, diskOverflowFile.get()); + }); + } + numValuesRead += numValuesToReadInPage; + pageCursor.nextPage(); + while ( + posInSelVector < nodeIDVector->state->selVector->selectedSize && + nodeIDVector->state->selVector->selectedPositions[posInSelVector] < numValuesRead) { + posInSelVector++; + } + } + } +} +void Column::writeValueForSingleNodeIDPosition( + offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { + auto updatedPageInfoAndWALPageFrame = + beginUpdatingPage(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); + StorageStructureUtils::unpinWALPageAndReleaseOriginalPageLock( + updatedPageInfoAndWALPageFrame, *fileHandle, *bufferManager, *wal); +} + WALPageIdxPosInPageAndFrame Column::beginUpdatingPage( offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { auto isNull = vectorToWriteFrom->isNull(posInVectorToWriteFrom); @@ -156,6 +193,14 @@ WALPageIdxPosInPageAndFrame Column::beginUpdatingPage( return walPageInfo; } +void Column::readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx, + const std::function& func) { + auto [fileHandleToPin, pageIdxToPin] = + StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin( + *fileHandle, pageIdx, *wal, transaction->getType()); + bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, func); +} + WALPageIdxPosInPageAndFrame Column::beginUpdatingPageAndWriteOnlyNullBit( offset_t nodeOffset, bool isNull) { auto walPageInfo = createWALVersionOfPageIfNecessaryForElement(nodeOffset, numElementsPerPage); @@ -163,12 +208,32 @@ WALPageIdxPosInPageAndFrame Column::beginUpdatingPageAndWriteOnlyNullBit( return walPageInfo; } -void Column::writeValueForSingleNodeIDPosition( - offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { - auto updatedPageInfoAndWALPageFrame = - beginUpdatingPage(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); - StorageStructureUtils::unpinWALPageAndReleaseOriginalPageLock( - updatedPageInfoAndWALPageFrame, *fileHandle, *bufferManager, *wal); +void Column::scanValuesFromPage(transaction::Transaction* transaction, uint8_t* frame, + PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector, + uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID, + DiskOverflowFile* diskOverflowFile) { + auto numBytesPerValue = resultVector->getNumBytesPerValue(); + memcpy(resultVector->getData() + posInVector * numBytesPerValue, + frame + pageCursor.elemPosInPage * numBytesPerValue, numValuesToRead * numBytesPerValue); + auto hasNull = NullMask::copyNullMask( + (uint64_t*)(frame + (numElementsPerPage * numBytesPerValue)), pageCursor.elemPosInPage, + resultVector->getNullMaskData(), posInVector, numValuesToRead); + if (hasNull) { + resultVector->setMayContainNulls(); + } +} + +void Column::lookupValueFromPage(transaction::Transaction* transaction, uint8_t* frame, + kuzu::storage::PageElementCursor& pageCursor, common::ValueVector* resultVector, + uint32_t posInVector, uint32_t numElementsPerPage, common::table_id_t commonTableID, + DiskOverflowFile* diskOverflowFile) { + auto numBytesPerValue = resultVector->getNumBytesPerValue(); + auto frameBytesOffset = pageCursor.elemPosInPage * numBytesPerValue; + memcpy(resultVector->getData() + posInVector * numBytesPerValue, frame + frameBytesOffset, + numBytesPerValue); + auto isNull = NullMask::isNull( + (uint64_t*)(frame + (numElementsPerPage * numBytesPerValue)), pageCursor.elemPosInPage); + resultVector->setNull(posInVector, isNull); } void StringPropertyColumn::writeValueForSingleNodeIDPosition( @@ -184,7 +249,7 @@ void StringPropertyColumn::writeValueForSingleNodeIDPosition( // the overflow buffer of vectorToWriteFrom. We need to move it to storage. if (!ku_string_t::isShortString(stringToWriteFrom.len)) { try { - diskOverflowFile.writeStringOverflowAndUpdateOverflowPtr( + diskOverflowFile->writeStringOverflowAndUpdateOverflowPtr( stringToWriteFrom, *stringToWriteTo); } catch (RuntimeException& e) { // Note: The try catch block is to make sure that we correctly unpin the WAL page @@ -202,13 +267,14 @@ void StringPropertyColumn::writeValueForSingleNodeIDPosition( } Value StringPropertyColumn::readValueForTestingOnly(offset_t offset) { - auto cursor = PageUtils::getPageElementCursorForPos(offset, numElementsPerPage); ku_string_t kuString; - bufferManager->optimisticRead(*fileHandle, cursor.pageIdx, [&](uint8_t* frame) -> void { + auto cursor = PageUtils::getPageElementCursorForPos(offset, numElementsPerPage); + auto dummyReadOnlyTransaction = Transaction::getDummyReadOnlyTrx(); + readFromPage(dummyReadOnlyTransaction.get(), cursor.pageIdx, [&](uint8_t* frame) -> void { memcpy(&kuString, frame + mapElementPosToByteOffset(cursor.elemPosInPage), sizeof(ku_string_t)); }); - return Value(diskOverflowFile.readString(TransactionType::READ_ONLY, kuString)); + return Value(diskOverflowFile->readString(TransactionType::READ_ONLY, kuString)); } void ListPropertyColumn::writeValueForSingleNodeIDPosition( @@ -222,7 +288,7 @@ void ListPropertyColumn::writeValueForSingleNodeIDPosition( mapElementPosToByteOffset(updatedPageInfoAndWALPageFrame.posInPage))); auto kuListToWriteFrom = vectorToWriteFrom->getValue(posInVectorToWriteFrom); try { - diskOverflowFile.writeListOverflowAndUpdateOverflowPtr( + diskOverflowFile->writeListOverflowAndUpdateOverflowPtr( kuListToWriteFrom, *kuListToWriteTo, vectorToWriteFrom->dataType); } catch (RuntimeException& e) { // Note: The try catch block is to make sure that we correctly unpin the WAL page @@ -239,88 +305,47 @@ void ListPropertyColumn::writeValueForSingleNodeIDPosition( } Value ListPropertyColumn::readValueForTestingOnly(offset_t offset) { - auto cursor = PageUtils::getPageElementCursorForPos(offset, numElementsPerPage); ku_list_t kuList; - bufferManager->optimisticRead(*fileHandle, cursor.pageIdx, [&](uint8_t* frame) -> void { + auto cursor = PageUtils::getPageElementCursorForPos(offset, numElementsPerPage); + auto dummyReadOnlyTransaction = Transaction::getDummyReadOnlyTrx(); + readFromPage(dummyReadOnlyTransaction.get(), cursor.pageIdx, [&](uint8_t* frame) -> void { memcpy(&kuList, frame + mapElementPosToByteOffset(cursor.elemPosInPage), sizeof(ku_list_t)); }); - return Value(dataType, diskOverflowFile.readList(TransactionType::READ_ONLY, kuList, dataType)); + return Value( + dataType, diskOverflowFile->readList(TransactionType::READ_ONLY, kuList, dataType)); } -void ListPropertyColumn::readListsToVector(PageElementCursor& cursor, - const std::function& logicalToPhysicalPageMapper, - transaction::Transaction* transaction, common::ValueVector* resultVector, uint64_t vectorPos, - uint64_t numValuesToRead) { - auto [fileHandleToPin, pageIdxToPin] = - StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin( - *fileHandle, logicalToPhysicalPageMapper(cursor.pageIdx), *wal, transaction->getType()); - auto frameBytesOffset = getElemByteOffset(cursor.elemPosInPage); - bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, [&](uint8_t* frame) { - auto kuListsToRead = reinterpret_cast(frame + frameBytesOffset); - readNullBitsFromAPage( - resultVector, frame, cursor.elemPosInPage, vectorPos, numValuesToRead); - for (auto i = 0u; i < numValuesToRead; i++) { - if (!resultVector->isNull(vectorPos)) { - diskOverflowFile.readListToVector( - transaction->getType(), kuListsToRead[i], resultVector, vectorPos); - } - vectorPos++; - } - }); -} - -void ListPropertyColumn::lookup(transaction::Transaction* transaction, - common::ValueVector* resultVector, uint32_t vectorPos, PageElementCursor& cursor) { - auto [fileHandleToPin, pageIdxToPin] = - StorageStructureUtils::getFileHandleAndPhysicalPageIdxToPin( - *fileHandle, cursor.pageIdx, *wal, transaction->getType()); - bufferManager->optimisticRead(*fileHandleToPin, pageIdxToPin, [&](uint8_t* frame) -> void { - readSingleNullBit(resultVector, frame, cursor.elemPosInPage, vectorPos); - if (!resultVector->isNull(vectorPos)) { - auto frameBytesOffset = getElemByteOffset(cursor.elemPosInPage); - diskOverflowFile.readListToVector(transaction->getType(), - *(common::ku_list_t*)(frame + frameBytesOffset), resultVector, vectorPos); +void ListPropertyColumn::scanListsFromPage(transaction::Transaction* transaction, uint8_t* frame, + kuzu::storage::PageElementCursor& pageCursor, common::ValueVector* resultVector, + uint32_t posInVector, uint32_t numElementsPerPage, uint32_t numValuesToRead, + common::table_id_t commonTableID, DiskOverflowFile* diskOverflowFile) { + auto frameBytesOffset = pageCursor.elemPosInPage * sizeof(ku_list_t); + auto kuListsToRead = reinterpret_cast(frame + frameBytesOffset); + auto hasNull = NullMask::copyNullMask( + (uint64_t*)(frame + (numElementsPerPage * sizeof(ku_list_t))), pageCursor.elemPosInPage, + resultVector->getNullMaskData(), posInVector, numValuesToRead); + if (hasNull) { + resultVector->setMayContainNulls(); + } + for (auto i = 0u; i < numValuesToRead; i++) { + if (!resultVector->isNull(posInVector + i)) { + diskOverflowFile->readListToVector( + transaction->getType(), kuListsToRead[i], resultVector, posInVector + i); } - }); -} - -void ListPropertyColumn::scan(transaction::Transaction* transaction, - common::ValueVector* resultVector, PageElementCursor& cursor) { - uint64_t numValuesToRead = resultVector->state->originalSize; - uint64_t numValuesRead = 0; - while (numValuesRead != numValuesToRead) { - uint64_t numValuesInPage = numElementsPerPage - cursor.elemPosInPage; - uint64_t numValuesToReadInPage = std::min(numValuesInPage, numValuesToRead - numValuesRead); - readListsToVector(cursor, identityMapper, transaction, resultVector, numValuesRead, - numValuesToReadInPage); - numValuesRead += numValuesToReadInPage; - cursor.nextPage(); } } -void ListPropertyColumn::scanWithSelState(transaction::Transaction* transaction, - common::ValueVector* resultVector, PageElementCursor& cursor) { - auto selectedState = resultVector->state; - auto numValuesToRead = resultVector->state->originalSize; - uint64_t selectedStatePos = 0; - uint64_t vectorPos = 0; - while (true) { - uint64_t numValuesInPage = numElementsPerPage - cursor.elemPosInPage; - uint64_t numValuesToReadInPage = std::min(numValuesInPage, numValuesToRead - vectorPos); - if (StorageStructure::isInRange( - selectedState->selVector->selectedPositions[selectedStatePos], vectorPos, - vectorPos + numValuesToReadInPage)) { - readListsToVector(cursor, identityMapper, transaction, resultVector, vectorPos, - numValuesToReadInPage); - } - vectorPos += numValuesToReadInPage; - while (selectedState->selVector->selectedPositions[selectedStatePos] < vectorPos) { - selectedStatePos++; - if (selectedStatePos == selectedState->selVector->selectedSize) { - return; - } - } - cursor.nextPage(); +void ListPropertyColumn::lookupListFromPage(transaction::Transaction* transaction, uint8_t* frame, + kuzu::storage::PageElementCursor& pageCursor, common::ValueVector* resultVector, + uint32_t posInVector, uint32_t numElementsPerPage, common::table_id_t commonTableID, + DiskOverflowFile* diskOverflowFile) { + auto isNull = NullMask::isNull( + (uint64_t*)(frame + (numElementsPerPage * sizeof(ku_list_t))), pageCursor.elemPosInPage); + resultVector->setNull(posInVector, isNull); + if (!resultVector->isNull(posInVector)) { + auto frameBytesOffset = pageCursor.elemPosInPage * sizeof(ku_list_t); + diskOverflowFile->readListToVector(transaction->getType(), + *(common::ku_list_t*)(frame + frameBytesOffset), resultVector, posInVector); } } @@ -340,7 +365,7 @@ StructPropertyColumn::StructPropertyColumn(const StorageStructureIDAndFName& str void StructPropertyColumn::read(Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector) { - // We currently do not support null struct value. + // TODO(Guodong/Ziyi): We currently do not support null struct value. resultVector->setAllNonNull(); for (auto i = 0u; i < structFieldColumns.size(); i++) { structFieldColumns[i]->read( @@ -348,8 +373,42 @@ void StructPropertyColumn::read(Transaction* transaction, common::ValueVector* n } } +void InternalIDColumn::scanInternalIDsFromPage(transaction::Transaction* transaction, + uint8_t* frame, PageElementCursor& pageCursor, common::ValueVector* resultVector, + uint32_t posInVector, uint32_t numElementsPerPage, uint32_t numValuesToRead, + table_id_t commonTableID, DiskOverflowFile* diskOverflowFile) { + auto numBytesPerValue = sizeof(offset_t); + auto resultData = (internalID_t*)resultVector->getData(); + for (auto i = 0u; i < numValuesToRead; i++) { + auto posInFrame = pageCursor.elemPosInPage + i; + resultData[posInVector + i].offset = *(offset_t*)(frame + (posInFrame * numBytesPerValue)); + resultData[posInVector + i].tableID = commonTableID; + } + auto hasNull = NullMask::copyNullMask( + (uint64_t*)(frame + (numElementsPerPage * numBytesPerValue)), pageCursor.elemPosInPage, + resultVector->getNullMaskData(), posInVector, numValuesToRead); + if (hasNull) { + resultVector->setMayContainNulls(); + } +} + +void InternalIDColumn::lookupInternalIDFromPage(transaction::Transaction* transaction, + uint8_t* frame, kuzu::storage::PageElementCursor& pageCursor, common::ValueVector* resultVector, + uint32_t posInVector, uint32_t numElementsPerPage, common::table_id_t commonTableID, + DiskOverflowFile* diskOverflowFile) { + auto numBytesPerValue = sizeof(offset_t); + auto resultData = (internalID_t*)resultVector->getData(); + resultData[posInVector].offset = + *(offset_t*)(frame + (pageCursor.elemPosInPage * numBytesPerValue)); + resultData[posInVector].tableID = commonTableID; + auto isNull = NullMask::isNull( + (uint64_t*)(frame + (numElementsPerPage * numBytesPerValue)), pageCursor.elemPosInPage); + resultVector->setNull(posInVector, isNull); +} + void SerialColumn::read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector, common::ValueVector* resultVector) { + // Serial column cannot contain null values. for (auto i = 0ul; i < nodeIDVector->state->selVector->selectedSize; i++) { auto pos = nodeIDVector->state->selVector->selectedPositions[i]; auto offset = nodeIDVector->readNodeOffset(pos); diff --git a/src/storage/store/node_table.cpp b/src/storage/store/node_table.cpp index 026bc8be76..138db5079a 100644 --- a/src/storage/store/node_table.cpp +++ b/src/storage/store/node_table.cpp @@ -53,7 +53,7 @@ offset_t NodeTable::addNodeAndResetProperties(ValueVector* primaryKeyVector) { throw RuntimeException(Exception::getExistedPKExceptionMsg(pkStr)); } for (auto& [_, column] : propertyColumns) { - column->setNodeOffsetToNull(nodeOffset); + column->setNull(nodeOffset); } return nodeOffset; } diff --git a/src/storage/store/rel_table.cpp b/src/storage/store/rel_table.cpp index 1b4dc2222c..c7729d38ff 100644 --- a/src/storage/store/rel_table.cpp +++ b/src/storage/store/rel_table.cpp @@ -156,9 +156,9 @@ void DirectedRelTableData::deleteRel(ValueVector* boundVector) { } auto nodeOffset = boundVector->readNodeOffset(boundVector->state->selVector->selectedPositions[0]); - adjColumn->setNodeOffsetToNull(nodeOffset); + adjColumn->setNull(nodeOffset); for (auto& [_, propertyColumn] : propertyColumns) { - propertyColumn->setNodeOffsetToNull(nodeOffset); + propertyColumn->setNull(nodeOffset); } } @@ -298,10 +298,10 @@ void RelTable::updateRel(common::ValueVector* srcNodeIDVector, common::ValueVect void RelTable::initEmptyRelsForNewNode(nodeID_t& nodeID) { if (fwdRelTableData->isSingleMultiplicity() && fwdRelTableData->isBoundTable(nodeID.tableID)) { - fwdRelTableData->getAdjColumn()->setNodeOffsetToNull(nodeID.offset); + fwdRelTableData->getAdjColumn()->setNull(nodeID.offset); } if (bwdRelTableData->isSingleMultiplicity() && bwdRelTableData->isBoundTable(nodeID.tableID)) { - bwdRelTableData->getAdjColumn()->setNodeOffsetToNull(nodeID.offset); + bwdRelTableData->getAdjColumn()->setNull(nodeID.offset); } listsUpdatesStore->initNewlyAddedNodes(nodeID); }