Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework column scan and lookup functions #1539

Merged
merged 1 commit into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 98 additions & 121 deletions src/include/storage/storage_structure/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,42 @@
namespace kuzu {
namespace storage {

class Column : public BaseColumnOrList {
using scan_data_func_t = std::function<void(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile)>;
using lookup_data_func_t = std::function<void(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile)>;

class Column : public BaseColumnOrList {
public:
Column(const common::DataType& dataType) : BaseColumnOrList{dataType} {};

Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType,
size_t elementSize, BufferManager* bufferManager, WAL* wal)
: BaseColumnOrList{structureIDAndFName, dataType, elementSize, bufferManager,
true /*hasNULLBytes*/, wal} {};
// TODO(Guodong): Clean up column constructors.
// Currently extended by SERIAL column.
explicit Column(const common::DataType& dataType)
: BaseColumnOrList{dataType}, tableID{common::INVALID_TABLE_ID} {};

Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType,
BufferManager* bufferManager, WAL* wal)
: Column(structureIDAndFName, dataType, common::Types::getDataTypeSize(dataType),
bufferManager, wal){};

Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType,
size_t elementSize, BufferManager* bufferManager, WAL* wal)
: Column{structureIDAndFName, dataType, elementSize, bufferManager, wal,
common::INVALID_TABLE_ID} {};

// Extended by INTERNAL_ID column.
Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType,
size_t elementSize, BufferManager* bufferManager, WAL* wal, common::table_id_t tableID)
: BaseColumnOrList{structureIDAndFName, dataType, elementSize, bufferManager,
true /*hasNULLBytes*/, wal},
tableID{tableID} {
scanDataFunc = Column::scanValuesFromPage;
lookupDataFunc = Column::lookupValueFromPage;
}

// Expose for feature store
virtual void batchLookup(const common::offset_t* nodeOffsets, size_t size, uint8_t* result);

Expand All @@ -32,30 +53,28 @@ class Column : public BaseColumnOrList {
void writeValues(common::ValueVector* nodeIDVector, common::ValueVector* vectorToWriteFrom);

bool isNull(common::offset_t nodeOffset, transaction::Transaction* transaction);
void setNodeOffsetToNull(common::offset_t nodeOffset);
void setNull(common::offset_t nodeOffset);

// Currently, used only in CopyCSV tests.
// TODO(Guodong): Remove this function. Use `read` instead.
virtual common::Value readValueForTestingOnly(common::offset_t offset);

protected:
void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector, uint32_t vectorPos);

virtual void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor);
virtual inline void scan(transaction::Transaction* transaction,
common::ValueVector* resultVector, PageElementCursor& cursor) {
readBySequentialCopy(transaction, resultVector, cursor, identityMapper);
}
virtual void scanWithSelState(transaction::Transaction* transaction,
common::ValueVector* resultVector, PageElementCursor& cursor) {
readBySequentialCopyWithSelState(transaction, resultVector, cursor, identityMapper);
}
virtual void lookup(transaction::Transaction* transaction, common::offset_t nodeOffset,
common::ValueVector* resultVector, uint32_t vectorPos);
virtual void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);
virtual void writeValueForSingleNodeIDPosition(common::offset_t nodeOffset,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom);
WALPageIdxPosInPageAndFrame beginUpdatingPage(common::offset_t nodeOffset,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom);

void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx,
const std::function<void(uint8_t*)>& func);

private:
// The reason why we make this function virtual is: we can't simply do memcpy on nodeIDs if
// the adjColumn has tableIDCompression, in this case we only store the nodeOffset in
Expand All @@ -74,35 +93,42 @@ class Column : public BaseColumnOrList {
WALPageIdxPosInPageAndFrame beginUpdatingPageAndWriteOnlyNullBit(
common::offset_t nodeOffset, bool isNull);

static void scanValuesFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);
static void lookupValueFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);

protected:
// no logical-physical page mapping is required for columns
std::function<common::page_idx_t(common::page_idx_t)> identityMapper = [](uint32_t i) {
return i;
};

scan_data_func_t scanDataFunc;
lookup_data_func_t lookupDataFunc;
common::table_id_t tableID;
std::unique_ptr<DiskOverflowFile> diskOverflowFile;
};

class PropertyColumnWithOverflow : public Column {
public:
PropertyColumnWithOverflow(const StorageStructureIDAndFName& structureIDAndFNameOfMainColumn,
const common::DataType& dataType, BufferManager* bufferManager, WAL* wal)
: Column{structureIDAndFNameOfMainColumn, dataType, bufferManager, wal},
diskOverflowFile{structureIDAndFNameOfMainColumn, bufferManager, wal} {}

inline void read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) override {
common::StringVector::resetOverflowBuffer(resultVector);
Column::read(transaction, nodeIDVector, resultVector);
: Column{structureIDAndFNameOfMainColumn, dataType, bufferManager, wal} {
diskOverflowFile =
std::make_unique<DiskOverflowFile>(structureIDAndFNameOfMainColumn, bufferManager, wal);
}
inline DiskOverflowFile* getDiskOverflowFile() { return &diskOverflowFile; }

inline BMFileHandle* getDiskOverflowFileHandle() { return diskOverflowFile.getFileHandle(); }
inline DiskOverflowFile* getDiskOverflowFile() { return diskOverflowFile.get(); }

protected:
DiskOverflowFile diskOverflowFile;
inline BMFileHandle* getDiskOverflowFileHandle() { return diskOverflowFile->getFileHandle(); }
};

class StringPropertyColumn : public PropertyColumnWithOverflow {

public:
StringPropertyColumn(const StorageStructureIDAndFName& structureIDAndFNameOfMainColumn,
const common::DataType& dataType, BufferManager* bufferManager, WAL* wal)
Expand All @@ -116,54 +142,47 @@ class StringPropertyColumn : public PropertyColumnWithOverflow {
common::Value readValueForTestingOnly(common::offset_t offset) override;

private:
inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor) override {
Column::lookup(transaction, resultVector, vectorPos, cursor);
inline void lookup(transaction::Transaction* transaction, common::offset_t nodeOffset,
common::ValueVector* resultVector, uint32_t vectorPos) override {
common::StringVector::resetOverflowBuffer(resultVector);
Column::lookup(transaction, nodeOffset, resultVector, vectorPos);
if (!resultVector->isNull(vectorPos)) {
diskOverflowFile.scanSingleStringOverflow(
diskOverflowFile->scanSingleStringOverflow(
transaction->getType(), *resultVector, vectorPos);
}
}
inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
Column::scan(transaction, resultVector, cursor);
diskOverflowFile.scanStrings(transaction->getType(), *resultVector);
}
void scanWithSelState(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
Column::scanWithSelState(transaction, resultVector, cursor);
diskOverflowFile.scanStrings(transaction->getType(), *resultVector);
inline void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) override {
common::StringVector::resetOverflowBuffer(resultVector);
Column::scan(transaction, nodeIDVector, resultVector);
diskOverflowFile->scanStrings(transaction->getType(), *resultVector);
}
};

class ListPropertyColumn : public PropertyColumnWithOverflow {

public:
ListPropertyColumn(const StorageStructureIDAndFName& structureIDAndFNameOfMainColumn,
const common::DataType& dataType, BufferManager* bufferManager, WAL* wal)
: PropertyColumnWithOverflow{
structureIDAndFNameOfMainColumn, dataType, bufferManager, wal} {};
structureIDAndFNameOfMainColumn, dataType, bufferManager, wal} {
scanDataFunc = ListPropertyColumn::scanListsFromPage;
lookupDataFunc = ListPropertyColumn::lookupListFromPage;
};

void writeValueForSingleNodeIDPosition(common::offset_t nodeOffset,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override;

common::Value readValueForTestingOnly(common::offset_t offset) override;

private:
// TODO(Ziyi): remove these function once we have changed the storage structure for lists.
void readListsToVector(PageElementCursor& cursor,
const std::function<common::page_idx_t(common::page_idx_t)>& logicalToPhysicalPageMapper,
transaction::Transaction* transaction, common::ValueVector* resultVector,
uint64_t vectorPos, uint64_t numValuesToRead);

void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor) override;

void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override;

void scanWithSelState(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override;
static void scanListsFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);
static void lookupListFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);
};

class StructPropertyColumn : public Column {
Expand All @@ -178,83 +197,39 @@ class StructPropertyColumn : public Column {
std::vector<std::unique_ptr<Column>> structFieldColumns;
};

class RelIDColumn : public Column {

class InternalIDColumn : public Column {
public:
RelIDColumn(const StorageStructureIDAndFName& structureIDAndFName, BufferManager* bufferManager,
WAL* wal)
InternalIDColumn(const StorageStructureIDAndFName& structureIDAndFName,
BufferManager* bufferManager, WAL* wal, common::table_id_t tableID)
: Column{structureIDAndFName, common::DataType(common::INTERNAL_ID),
sizeof(common::offset_t), bufferManager, wal},
commonTableID{structureIDAndFName.storageStructureID.columnFileID.relPropertyColumnID
.relNodeTableAndDir.relTableID} {
assert(structureIDAndFName.storageStructureID.columnFileID.columnType ==
ColumnType::REL_PROPERTY_COLUMN);
assert(structureIDAndFName.storageStructureID.storageStructureType ==
StorageStructureType::COLUMN);
sizeof(common::offset_t), bufferManager, wal, tableID} {
scanDataFunc = InternalIDColumn::scanInternalIDsFromPage;
lookupDataFunc = InternalIDColumn::lookupInternalIDFromPage;
}

private:
inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor) override {
readInternalIDsFromAPageBySequentialCopy(transaction, resultVector, vectorPos,
cursor.pageIdx, cursor.elemPosInPage, 1 /* numValuesToCopy */, commonTableID,
false /* hasNoNullGuarantee */);
}
inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
readInternalIDsBySequentialCopy(transaction, resultVector, cursor, identityMapper,
commonTableID, false /* hasNoNullGuarantee */);
}
inline void scanWithSelState(transaction::Transaction* transaction,
common::ValueVector* resultVector, PageElementCursor& cursor) override {
readInternalIDsBySequentialCopyWithSelState(
transaction, resultVector, cursor, identityMapper, commonTableID);
}
inline void writeToPage(WALPageIdxPosInPageAndFrame& walPageInfo,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override {
auto relID = vectorToWriteFrom->getValue<common::relID_t>(posInVectorToWriteFrom);
memcpy(walPageInfo.frame + mapElementPosToByteOffset(walPageInfo.posInPage), &relID.offset,
sizeof(relID.offset));
}

private:
common::table_id_t commonTableID;
static void scanInternalIDsFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);
static void lookupInternalIDFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);
};

class AdjColumn : public Column {

class AdjColumn : public InternalIDColumn {
public:
AdjColumn(const StorageStructureIDAndFName& structureIDAndFName, common::table_id_t nbrTableID,
BufferManager* bufferManager, WAL* wal)
: Column{structureIDAndFName, common::DataType(common::INTERNAL_ID),
sizeof(common::offset_t), bufferManager, wal},
nbrTableID{nbrTableID} {};

private:
inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor) override {
readInternalIDsFromAPageBySequentialCopy(transaction, resultVector, vectorPos,
cursor.pageIdx, cursor.elemPosInPage, 1 /* numValuesToCopy */, nbrTableID,
false /* hasNoNullGuarantee */);
}
inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
readInternalIDsBySequentialCopy(transaction, resultVector, cursor, identityMapper,
nbrTableID, false /* hasNoNullGuarantee */);
}
inline void scanWithSelState(transaction::Transaction* transaction,
common::ValueVector* resultVector, PageElementCursor& cursor) override {
readInternalIDsBySequentialCopyWithSelState(
transaction, resultVector, cursor, identityMapper, nbrTableID);
}
inline void writeToPage(WALPageIdxPosInPageAndFrame& walPageInfo,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override {
*(walPageInfo.frame + mapElementPosToByteOffset(walPageInfo.posInPage)) =
vectorToWriteFrom->getValue<common::nodeID_t>(posInVectorToWriteFrom).offset;
}

private:
common::table_id_t nbrTableID;
: InternalIDColumn{structureIDAndFName, bufferManager, wal, nbrTableID} {}
};

class SerialColumn : public Column {
Expand All @@ -266,7 +241,6 @@ class SerialColumn : public Column {
};

class ColumnFactory {

public:
static std::unique_ptr<Column> getColumn(const StorageStructureIDAndFName& structureIDAndFName,
const common::DataType& dataType, BufferManager* bufferManager, WAL* wal) {
Expand All @@ -289,11 +263,14 @@ class ColumnFactory {
return std::make_unique<ListPropertyColumn>(
structureIDAndFName, dataType, bufferManager, wal);
case common::INTERNAL_ID:
// RelID column in rel tables.
assert(structureIDAndFName.storageStructureID.storageStructureType ==
StorageStructureType::COLUMN &&
structureIDAndFName.storageStructureID.columnFileID.columnType ==
ColumnType::REL_PROPERTY_COLUMN);
return std::make_unique<RelIDColumn>(structureIDAndFName, bufferManager, wal);
return std::make_unique<InternalIDColumn>(structureIDAndFName, bufferManager, wal,
structureIDAndFName.storageStructureID.columnFileID.relPropertyColumnID
.relNodeTableAndDir.relTableID);
case common::STRUCT:
return std::make_unique<StructPropertyColumn>(
structureIDAndFName, dataType, bufferManager, wal);
Expand Down
Loading