Skip to content

Commit

Permalink
Merge pull request #1539 from kuzudb/column-rework
Browse files Browse the repository at this point in the history
Rework column scan and lookup functions
  • Loading branch information
ray6080 committed May 15, 2023
2 parents 4f75391 + 96706a0 commit e9e27a1
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 245 deletions.
219 changes: 98 additions & 121 deletions src/include/storage/storage_structure/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,42 @@
namespace kuzu {
namespace storage {

class Column : public BaseColumnOrList {
using scan_data_func_t = std::function<void(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile)>;
using lookup_data_func_t = std::function<void(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile)>;

class Column : public BaseColumnOrList {
public:
Column(const common::DataType& dataType) : BaseColumnOrList{dataType} {};

Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType,
size_t elementSize, BufferManager* bufferManager, WAL* wal)
: BaseColumnOrList{structureIDAndFName, dataType, elementSize, bufferManager,
true /*hasNULLBytes*/, wal} {};
// TODO(Guodong): Clean up column constructors.
// Currently extended by SERIAL column.
explicit Column(const common::DataType& dataType)
: BaseColumnOrList{dataType}, tableID{common::INVALID_TABLE_ID} {};

Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType,
BufferManager* bufferManager, WAL* wal)
: Column(structureIDAndFName, dataType, common::Types::getDataTypeSize(dataType),
bufferManager, wal){};

Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType,
size_t elementSize, BufferManager* bufferManager, WAL* wal)
: Column{structureIDAndFName, dataType, elementSize, bufferManager, wal,
common::INVALID_TABLE_ID} {};

// Extended by INTERNAL_ID column.
Column(const StorageStructureIDAndFName& structureIDAndFName, const common::DataType& dataType,
size_t elementSize, BufferManager* bufferManager, WAL* wal, common::table_id_t tableID)
: BaseColumnOrList{structureIDAndFName, dataType, elementSize, bufferManager,
true /*hasNULLBytes*/, wal},
tableID{tableID} {
scanDataFunc = Column::scanValuesFromPage;
lookupDataFunc = Column::lookupValueFromPage;
}

// Expose for feature store
virtual void batchLookup(const common::offset_t* nodeOffsets, size_t size, uint8_t* result);

Expand All @@ -32,30 +53,28 @@ class Column : public BaseColumnOrList {
void writeValues(common::ValueVector* nodeIDVector, common::ValueVector* vectorToWriteFrom);

bool isNull(common::offset_t nodeOffset, transaction::Transaction* transaction);
void setNodeOffsetToNull(common::offset_t nodeOffset);
void setNull(common::offset_t nodeOffset);

// Currently, used only in CopyCSV tests.
// TODO(Guodong): Remove this function. Use `read` instead.
virtual common::Value readValueForTestingOnly(common::offset_t offset);

protected:
void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector, uint32_t vectorPos);

virtual void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor);
virtual inline void scan(transaction::Transaction* transaction,
common::ValueVector* resultVector, PageElementCursor& cursor) {
readBySequentialCopy(transaction, resultVector, cursor, identityMapper);
}
virtual void scanWithSelState(transaction::Transaction* transaction,
common::ValueVector* resultVector, PageElementCursor& cursor) {
readBySequentialCopyWithSelState(transaction, resultVector, cursor, identityMapper);
}
virtual void lookup(transaction::Transaction* transaction, common::offset_t nodeOffset,
common::ValueVector* resultVector, uint32_t vectorPos);
virtual void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);
virtual void writeValueForSingleNodeIDPosition(common::offset_t nodeOffset,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom);
WALPageIdxPosInPageAndFrame beginUpdatingPage(common::offset_t nodeOffset,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom);

void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx,
const std::function<void(uint8_t*)>& func);

private:
// The reason why we make this function virtual is: we can't simply do memcpy on nodeIDs if
// the adjColumn has tableIDCompression, in this case we only store the nodeOffset in
Expand All @@ -74,35 +93,42 @@ class Column : public BaseColumnOrList {
WALPageIdxPosInPageAndFrame beginUpdatingPageAndWriteOnlyNullBit(
common::offset_t nodeOffset, bool isNull);

static void scanValuesFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);
static void lookupValueFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);

protected:
// no logical-physical page mapping is required for columns
std::function<common::page_idx_t(common::page_idx_t)> identityMapper = [](uint32_t i) {
return i;
};

scan_data_func_t scanDataFunc;
lookup_data_func_t lookupDataFunc;
common::table_id_t tableID;
std::unique_ptr<DiskOverflowFile> diskOverflowFile;
};

class PropertyColumnWithOverflow : public Column {
public:
PropertyColumnWithOverflow(const StorageStructureIDAndFName& structureIDAndFNameOfMainColumn,
const common::DataType& dataType, BufferManager* bufferManager, WAL* wal)
: Column{structureIDAndFNameOfMainColumn, dataType, bufferManager, wal},
diskOverflowFile{structureIDAndFNameOfMainColumn, bufferManager, wal} {}

inline void read(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) override {
common::StringVector::resetOverflowBuffer(resultVector);
Column::read(transaction, nodeIDVector, resultVector);
: Column{structureIDAndFNameOfMainColumn, dataType, bufferManager, wal} {
diskOverflowFile =
std::make_unique<DiskOverflowFile>(structureIDAndFNameOfMainColumn, bufferManager, wal);
}
inline DiskOverflowFile* getDiskOverflowFile() { return &diskOverflowFile; }

inline BMFileHandle* getDiskOverflowFileHandle() { return diskOverflowFile.getFileHandle(); }
inline DiskOverflowFile* getDiskOverflowFile() { return diskOverflowFile.get(); }

protected:
DiskOverflowFile diskOverflowFile;
inline BMFileHandle* getDiskOverflowFileHandle() { return diskOverflowFile->getFileHandle(); }
};

class StringPropertyColumn : public PropertyColumnWithOverflow {

public:
StringPropertyColumn(const StorageStructureIDAndFName& structureIDAndFNameOfMainColumn,
const common::DataType& dataType, BufferManager* bufferManager, WAL* wal)
Expand All @@ -116,54 +142,47 @@ class StringPropertyColumn : public PropertyColumnWithOverflow {
common::Value readValueForTestingOnly(common::offset_t offset) override;

private:
inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor) override {
Column::lookup(transaction, resultVector, vectorPos, cursor);
inline void lookup(transaction::Transaction* transaction, common::offset_t nodeOffset,
common::ValueVector* resultVector, uint32_t vectorPos) override {
common::StringVector::resetOverflowBuffer(resultVector);
Column::lookup(transaction, nodeOffset, resultVector, vectorPos);
if (!resultVector->isNull(vectorPos)) {
diskOverflowFile.scanSingleStringOverflow(
diskOverflowFile->scanSingleStringOverflow(
transaction->getType(), *resultVector, vectorPos);
}
}
inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
Column::scan(transaction, resultVector, cursor);
diskOverflowFile.scanStrings(transaction->getType(), *resultVector);
}
void scanWithSelState(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
Column::scanWithSelState(transaction, resultVector, cursor);
diskOverflowFile.scanStrings(transaction->getType(), *resultVector);
inline void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) override {
common::StringVector::resetOverflowBuffer(resultVector);
Column::scan(transaction, nodeIDVector, resultVector);
diskOverflowFile->scanStrings(transaction->getType(), *resultVector);
}
};

class ListPropertyColumn : public PropertyColumnWithOverflow {

public:
ListPropertyColumn(const StorageStructureIDAndFName& structureIDAndFNameOfMainColumn,
const common::DataType& dataType, BufferManager* bufferManager, WAL* wal)
: PropertyColumnWithOverflow{
structureIDAndFNameOfMainColumn, dataType, bufferManager, wal} {};
structureIDAndFNameOfMainColumn, dataType, bufferManager, wal} {
scanDataFunc = ListPropertyColumn::scanListsFromPage;
lookupDataFunc = ListPropertyColumn::lookupListFromPage;
};

void writeValueForSingleNodeIDPosition(common::offset_t nodeOffset,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override;

common::Value readValueForTestingOnly(common::offset_t offset) override;

private:
// TODO(Ziyi): remove these function once we have changed the storage structure for lists.
void readListsToVector(PageElementCursor& cursor,
const std::function<common::page_idx_t(common::page_idx_t)>& logicalToPhysicalPageMapper,
transaction::Transaction* transaction, common::ValueVector* resultVector,
uint64_t vectorPos, uint64_t numValuesToRead);

void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor) override;

void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override;

void scanWithSelState(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override;
static void scanListsFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);
static void lookupListFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);
};

class StructPropertyColumn : public Column {
Expand All @@ -178,83 +197,39 @@ class StructPropertyColumn : public Column {
std::vector<std::unique_ptr<Column>> structFieldColumns;
};

class RelIDColumn : public Column {

class InternalIDColumn : public Column {
public:
RelIDColumn(const StorageStructureIDAndFName& structureIDAndFName, BufferManager* bufferManager,
WAL* wal)
InternalIDColumn(const StorageStructureIDAndFName& structureIDAndFName,
BufferManager* bufferManager, WAL* wal, common::table_id_t tableID)
: Column{structureIDAndFName, common::DataType(common::INTERNAL_ID),
sizeof(common::offset_t), bufferManager, wal},
commonTableID{structureIDAndFName.storageStructureID.columnFileID.relPropertyColumnID
.relNodeTableAndDir.relTableID} {
assert(structureIDAndFName.storageStructureID.columnFileID.columnType ==
ColumnType::REL_PROPERTY_COLUMN);
assert(structureIDAndFName.storageStructureID.storageStructureType ==
StorageStructureType::COLUMN);
sizeof(common::offset_t), bufferManager, wal, tableID} {
scanDataFunc = InternalIDColumn::scanInternalIDsFromPage;
lookupDataFunc = InternalIDColumn::lookupInternalIDFromPage;
}

private:
inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor) override {
readInternalIDsFromAPageBySequentialCopy(transaction, resultVector, vectorPos,
cursor.pageIdx, cursor.elemPosInPage, 1 /* numValuesToCopy */, commonTableID,
false /* hasNoNullGuarantee */);
}
inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
readInternalIDsBySequentialCopy(transaction, resultVector, cursor, identityMapper,
commonTableID, false /* hasNoNullGuarantee */);
}
inline void scanWithSelState(transaction::Transaction* transaction,
common::ValueVector* resultVector, PageElementCursor& cursor) override {
readInternalIDsBySequentialCopyWithSelState(
transaction, resultVector, cursor, identityMapper, commonTableID);
}
inline void writeToPage(WALPageIdxPosInPageAndFrame& walPageInfo,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override {
auto relID = vectorToWriteFrom->getValue<common::relID_t>(posInVectorToWriteFrom);
memcpy(walPageInfo.frame + mapElementPosToByteOffset(walPageInfo.posInPage), &relID.offset,
sizeof(relID.offset));
}

private:
common::table_id_t commonTableID;
static void scanInternalIDsFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, uint32_t numValuesToRead, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);
static void lookupInternalIDFromPage(transaction::Transaction* transaction, uint8_t* frame,
PageElementCursor& pageCursor, common::ValueVector* resultVector, uint32_t posInVector,
uint32_t numElementsPerPage, common::table_id_t commonTableID,
DiskOverflowFile* diskOverflowFile);
};

class AdjColumn : public Column {

class AdjColumn : public InternalIDColumn {
public:
AdjColumn(const StorageStructureIDAndFName& structureIDAndFName, common::table_id_t nbrTableID,
BufferManager* bufferManager, WAL* wal)
: Column{structureIDAndFName, common::DataType(common::INTERNAL_ID),
sizeof(common::offset_t), bufferManager, wal},
nbrTableID{nbrTableID} {};

private:
inline void lookup(transaction::Transaction* transaction, common::ValueVector* resultVector,
uint32_t vectorPos, PageElementCursor& cursor) override {
readInternalIDsFromAPageBySequentialCopy(transaction, resultVector, vectorPos,
cursor.pageIdx, cursor.elemPosInPage, 1 /* numValuesToCopy */, nbrTableID,
false /* hasNoNullGuarantee */);
}
inline void scan(transaction::Transaction* transaction, common::ValueVector* resultVector,
PageElementCursor& cursor) override {
readInternalIDsBySequentialCopy(transaction, resultVector, cursor, identityMapper,
nbrTableID, false /* hasNoNullGuarantee */);
}
inline void scanWithSelState(transaction::Transaction* transaction,
common::ValueVector* resultVector, PageElementCursor& cursor) override {
readInternalIDsBySequentialCopyWithSelState(
transaction, resultVector, cursor, identityMapper, nbrTableID);
}
inline void writeToPage(WALPageIdxPosInPageAndFrame& walPageInfo,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override {
*(walPageInfo.frame + mapElementPosToByteOffset(walPageInfo.posInPage)) =
vectorToWriteFrom->getValue<common::nodeID_t>(posInVectorToWriteFrom).offset;
}

private:
common::table_id_t nbrTableID;
: InternalIDColumn{structureIDAndFName, bufferManager, wal, nbrTableID} {}
};

class SerialColumn : public Column {
Expand All @@ -266,7 +241,6 @@ class SerialColumn : public Column {
};

class ColumnFactory {

public:
static std::unique_ptr<Column> getColumn(const StorageStructureIDAndFName& structureIDAndFName,
const common::DataType& dataType, BufferManager* bufferManager, WAL* wal) {
Expand All @@ -289,11 +263,14 @@ class ColumnFactory {
return std::make_unique<ListPropertyColumn>(
structureIDAndFName, dataType, bufferManager, wal);
case common::INTERNAL_ID:
// RelID column in rel tables.
assert(structureIDAndFName.storageStructureID.storageStructureType ==
StorageStructureType::COLUMN &&
structureIDAndFName.storageStructureID.columnFileID.columnType ==
ColumnType::REL_PROPERTY_COLUMN);
return std::make_unique<RelIDColumn>(structureIDAndFName, bufferManager, wal);
return std::make_unique<InternalIDColumn>(structureIDAndFName, bufferManager, wal,
structureIDAndFName.storageStructureID.columnFileID.relPropertyColumnID
.relNodeTableAndDir.relTableID);
case common::STRUCT:
return std::make_unique<StructPropertyColumn>(
structureIDAndFName, dataType, bufferManager, wal);
Expand Down
Loading

0 comments on commit e9e27a1

Please sign in to comment.