Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify initListReadingState #1138

Merged
merged 1 commit into from
Dec 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@
namespace kuzu {
namespace processor {

class ListExtendAndScanRelProperties : public BaseExtendAndScanRelProperties {
class ScanRelTableLists : public BaseExtendAndScanRelProperties {
public:
ListExtendAndScanRelProperties(const DataPos& inNodeIDVectorPos,
const DataPos& outNodeIDVectorPos, vector<DataPos> outPropertyVectorsPos, Lists* adjList,
vector<Lists*> propertyLists, unique_ptr<PhysicalOperator> child, uint32_t id,
const string& paramsString)
ScanRelTableLists(const DataPos& inNodeIDVectorPos, const DataPos& outNodeIDVectorPos,
vector<DataPos> outPropertyVectorsPos, Lists* adjList, vector<Lists*> propertyLists,
unique_ptr<PhysicalOperator> child, uint32_t id, const string& paramsString)
: BaseExtendAndScanRelProperties{PhysicalOperatorType::LIST_EXTEND, inNodeIDVectorPos,
outNodeIDVectorPos, std::move(outPropertyVectorsPos), std::move(child), id,
paramsString},
adjList{adjList}, propertyLists{std::move(propertyLists)} {}
~ListExtendAndScanRelProperties() override = default;
~ScanRelTableLists() override = default;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

bool getNextTuplesInternal() override;

inline unique_ptr<PhysicalOperator> clone() override {
return make_unique<ListExtendAndScanRelProperties>(inNodeIDVectorPos, outNodeIDVectorPos,
return make_unique<ScanRelTableLists>(inNodeIDVectorPos, outNodeIDVectorPos,
outPropertyVectorsPos, adjList, propertyLists, children[0]->clone(), id, paramsString);
}

Expand Down
12 changes: 7 additions & 5 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ class DataBlockCollection {
DataBlockCollection(uint32_t numBytesPerTuple, uint32_t numTuplesPerBlock)
: numBytesPerTuple{numBytesPerTuple}, numTuplesPerBlock{numTuplesPerBlock} {}

inline void append(unique_ptr<DataBlock> otherBlock) { blocks.push_back(move(otherBlock)); }
inline void append(unique_ptr<DataBlock> otherBlock) {
blocks.push_back(std::move(otherBlock));
}
inline void append(vector<unique_ptr<DataBlock>> otherBlocks) {
move(begin(otherBlocks), end(otherBlocks), back_inserter(blocks));
}
inline void append(unique_ptr<DataBlockCollection> other) { append(move(other->blocks)); }
inline void append(unique_ptr<DataBlockCollection> other) { append(std::move(other->blocks)); }
inline bool isEmpty() { return blocks.empty(); }
inline vector<unique_ptr<DataBlock>>& getBlocks() { return blocks; }
inline DataBlock* getBlock(uint32_t blockIdx) { return blocks[blockIdx].get(); }
Expand Down Expand Up @@ -121,7 +123,7 @@ class FactorizedTableSchema {
FactorizedTableSchema(const FactorizedTableSchema& other);

explicit FactorizedTableSchema(vector<unique_ptr<ColumnSchema>> columns)
: columns{move(columns)} {}
: columns{std::move(columns)} {}

void appendColumn(unique_ptr<ColumnSchema> column);

Expand Down Expand Up @@ -240,7 +242,7 @@ class FactorizedTable {
// inside overflowFileOfInMemList.
void copyToInMemList(uint32_t colIdx, vector<uint64_t>& tupleIdxesToRead, uint8_t* data,
NullMask* nullMask, uint64_t startElemPosInList, DiskOverflowFile* overflowFileOfInMemList,
DataType type, NodeIDCompressionScheme* nodeIDCompressionScheme) const;
const DataType& type, NodeIDCompressionScheme* nodeIDCompressionScheme) const;
void clear();
int64_t findValueInFlatColumn(uint64_t colIdx, int64_t value) const;

Expand Down Expand Up @@ -292,7 +294,7 @@ class FactorizedTable {
readFlatColToUnflatVector(tuplesToRead, colIdx, vector, numTuplesToRead);
}
static void copyOverflowIfNecessary(
uint8_t* dst, uint8_t* src, DataType type, DiskOverflowFile* diskOverflowFile);
uint8_t* dst, uint8_t* src, const DataType& type, DiskOverflowFile* diskOverflowFile);

private:
MemoryManager* memoryManager;
Expand Down
42 changes: 26 additions & 16 deletions src/include/storage/storage_structure/lists/list_sync_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ namespace kuzu {
namespace storage {

enum class ListSourceStore : uint8_t {
PersistentStore = 0,
ListsUpdateStore = 1,
PERSISTENT_STORE = 0,
UPDATE_STORE = 1,
};

// ListSyncState holds the data that is required to synchronize reading from multiple Lists that
Expand All @@ -28,12 +28,16 @@ class ListSyncState {
public:
ListSyncState() { reset(); };

inline void setNumValuesInList(uint64_t numValuesInList_) {
this->numValuesInList = numValuesInList_;
}
inline void setBoundNodeOffset(node_offset_t boundNodeOffset_) {
inline void init(node_offset_t boundNodeOffset_, list_header_t listHeader_,
uint64_t numValuesInUpdateStore_, uint64_t numValuesInPersistentStore_,
ListSourceStore sourceStore_) {
this->boundNodeOffset = boundNodeOffset_;
this->listHeader = listHeader_;
this->numValuesInUpdateStore = numValuesInUpdateStore_;
this->numValuesInPersistentStore = numValuesInPersistentStore_;
this->sourceStore = sourceStore_;
}

inline void setRangeToRead(uint32_t startIdx_, uint32_t numValuesToRead_) {
this->startElemOffset = startIdx_;
this->numValuesToRead = numValuesToRead_;
Expand All @@ -43,27 +47,33 @@ class ListSyncState {
inline uint32_t getEndElemOffset() const { return startElemOffset + numValuesToRead; }
inline bool hasValidRangeToRead() const { return UINT32_MAX != startElemOffset; }
inline uint32_t getNumValuesToRead() const { return numValuesToRead; }
inline uint64_t getNumValuesInList() const { return numValuesInList; }
inline ListSourceStore getListSourceStore() const { return sourceStore; }
inline void setSourceStore(ListSourceStore sourceStore) { this->sourceStore = sourceStore; }
inline void setDataToReadFromUpdateStore(bool dataToReadFromUpdateStore_) {
dataToReadFromUpdateStore = dataToReadFromUpdateStore_;
}
inline bool hasDataToReadFromUpdateStore() const { return dataToReadFromUpdateStore; }
inline list_header_t getListHeader() const { return listHeader; }
inline void setListHeader(list_header_t listHeader_) { listHeader = listHeader_; }
inline uint32_t getNumValuesInList() {
return sourceStore == ListSourceStore::PERSISTENT_STORE ? numValuesInPersistentStore :
numValuesInUpdateStore;
}

bool hasMoreToRead();
bool hasMoreAndSwitchSourceIfNecessary();
void reset();

private:
inline bool hasMoreLeftInList() {
return (startElemOffset + numValuesToRead) < getNumValuesInList();
}
inline void switchToUpdateStore() {
sourceStore = ListSourceStore::UPDATE_STORE;
startElemOffset = UINT32_MAX;
}

private:
node_offset_t boundNodeOffset;
list_header_t listHeader;
uint32_t numValuesInUpdateStore;
uint32_t numValuesInPersistentStore;
uint32_t startElemOffset;
uint32_t numValuesToRead;
uint64_t numValuesInList;
ListSourceStore sourceStore;
bool dataToReadFromUpdateStore;
};

} // namespace storage
Expand Down
15 changes: 9 additions & 6 deletions src/include/storage/storage_structure/lists/lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ class Lists : public BaseColumnOrList {
Lists(const StorageStructureIDAndFName& storageStructureIDAndFName, const DataType& dataType,
const size_t& elementSize, shared_ptr<ListHeaders> headers, BufferManager& bufferManager,
bool isInMemory, WAL* wal, ListsUpdateStore* listsUpdateStore)
: Lists{storageStructureIDAndFName, dataType, elementSize, move(headers), bufferManager,
true /*hasNULLBytes*/, isInMemory, wal, listsUpdateStore} {};
: Lists{storageStructureIDAndFName, dataType, elementSize, std::move(headers),
bufferManager, true /*hasNULLBytes*/, isInMemory, wal, listsUpdateStore} {};
inline ListsMetadata& getListsMetadata() { return metadata; };
inline shared_ptr<ListHeaders> getHeaders() const { return headers; };
// TODO(Guodong): change the input to header.
inline uint64_t getNumElementsFromListHeader(node_offset_t nodeOffset) const {
auto header = headers->getHeader(nodeOffset);
return ListHeaders::isALargeList(header) ?
Expand Down Expand Up @@ -150,7 +151,7 @@ class Lists : public BaseColumnOrList {
: BaseColumnOrList{storageStructureIDAndFName, dataType, elementSize, bufferManager,
hasNULLBytes, isInMemory, wal},
storageStructureIDAndFName{storageStructureIDAndFName},
metadata{storageStructureIDAndFName, &bufferManager, wal}, headers{move(headers)},
metadata{storageStructureIDAndFName, &bufferManager, wal}, headers{std::move(headers)},
listsUpdateStore{listsUpdateStore} {};

private:
Expand All @@ -171,7 +172,7 @@ class PropertyListsWithOverflow : public Lists {
const DataType& dataType, shared_ptr<ListHeaders> headers, BufferManager& bufferManager,
bool isInMemory, WAL* wal, ListsUpdateStore* listsUpdateStore)
: Lists{storageStructureIDAndFName, dataType, Types::getDataTypeSize(dataType),
move(headers), bufferManager, isInMemory, wal, listsUpdateStore},
std::move(headers), bufferManager, isInMemory, wal, listsUpdateStore},
diskOverflowFile{storageStructureIDAndFName, bufferManager, isInMemory, wal} {}

private:
Expand Down Expand Up @@ -253,6 +254,8 @@ class AdjLists : public Lists {
const shared_ptr<ValueVector>& valueVector, ListHandle& listHandle) override;
void readFromListsUpdateStore(
ListSyncState& listSyncState, const shared_ptr<ValueVector>& valueVector);
void readFromListsPersistentStore(
ListHandle& listHandle, const shared_ptr<ValueVector>& valueVector);

private:
NodeIDCompressionScheme nodeIDCompressionScheme;
Expand All @@ -264,8 +267,8 @@ class RelIDList : public Lists {
RelIDList(const StorageStructureIDAndFName& storageStructureIDAndFName,
const DataType& dataType, const size_t& elementSize, shared_ptr<ListHeaders> headers,
BufferManager& bufferManager, bool isInMemory, WAL* wal, ListsUpdateStore* listsUpdateStore)
: Lists{storageStructureIDAndFName, dataType, elementSize, headers, bufferManager,
isInMemory, wal, listsUpdateStore} {}
: Lists{storageStructureIDAndFName, dataType, elementSize, std::move(headers),
bufferManager, isInMemory, wal, listsUpdateStore} {}
void setDeletedRelsIfNecessary(Transaction* transaction, ListSyncState& listSyncState,
const shared_ptr<ValueVector>& relIDVector) override;
unordered_set<uint64_t> getDeletedRelOffsetsInListForNodeOffset(node_offset_t nodeOffset);
Expand Down
9 changes: 4 additions & 5 deletions src/processor/mapper/map_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include "processor/mapper/plan_mapper.h"
#include "processor/operator/generic_extend.h"
#include "processor/operator/scan_column/adj_column_extend.h"
#include "processor/operator/scan_list/adj_list_extend.h"
#include "processor/operator/scan_list/scan_rel_table_lists.h"
#include "processor/operator/var_length_extend/var_length_adj_list_extend.h"
#include "processor/operator/var_length_extend/var_length_column_extend.h"

Expand Down Expand Up @@ -133,10 +133,9 @@ unique_ptr<PhysicalOperator> PlanMapper::mapLogicalExtendToPhysical(
} else {
auto propertyLists = populatePropertyLists(
boundNodeTableID, relTableID, direction, extend->getProperties(), relsStore);
return make_unique<ListExtendAndScanRelProperties>(inNodeIDVectorPos,
outNodeIDVectorPos, std::move(outPropertyVectorsPos), adjList,
std::move(propertyLists), std::move(prevOperator), getOperatorID(),
extend->getExpressionsForPrinting());
return make_unique<ScanRelTableLists>(inNodeIDVectorPos, outNodeIDVectorPos,
std::move(outPropertyVectorsPos), adjList, std::move(propertyLists),
std::move(prevOperator), getOperatorID(), extend->getExpressionsForPrinting());
}
}
} else { // map to generic extend
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/generic_extend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ bool AdjAndPropertyCollection::scanLists(const shared_ptr<ValueVector>& inVector
if (currentListIdx != UINT32_MAX) { // check current list
auto currentAdjList = adjCollection->lists[currentListIdx];
auto currentAdjListHandle = adjCollection->listHandles[currentListIdx].get();
if (currentAdjListHandle->listSyncState.hasMoreToRead()) {
if (currentAdjListHandle->listSyncState.hasMoreAndSwitchSourceIfNecessary()) {
// scan current adjList
currentAdjList->readValues(outNodeVector, *currentAdjListHandle);
scanPropertyList(currentListIdx, outPropertyVectors, transaction);
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/scan_list/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
add_library(kuzu_processor_operator_scan_list
OBJECT
adj_list_extend.cpp
scan_rel_table_lists.cpp
)

set(ALL_OBJECT_FILES
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#include "processor/operator/scan_list/adj_list_extend.h"
#include "processor/operator/scan_list/scan_rel_table_lists.h"

namespace kuzu {
namespace processor {

void ListExtendAndScanRelProperties::initLocalStateInternal(
ResultSet* resultSet, ExecutionContext* context) {
void ScanRelTableLists::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
BaseExtendAndScanRelProperties::initLocalStateInternal(resultSet, context);
syncState = make_unique<ListSyncState>();
adjListHandle = make_shared<ListHandle>(*syncState);
Expand All @@ -13,8 +12,8 @@ void ListExtendAndScanRelProperties::initLocalStateInternal(
}
}

bool ListExtendAndScanRelProperties::getNextTuplesInternal() {
if (adjListHandle->listSyncState.hasMoreToRead()) {
bool ScanRelTableLists::getNextTuplesInternal() {
if (adjListHandle->listSyncState.hasMoreAndSwitchSourceIfNecessary()) {
adjList->readValues(outNodeIDVector, *adjListHandle);
} else {
do {
Expand All @@ -41,7 +40,7 @@ bool ListExtendAndScanRelProperties::getNextTuplesInternal() {
return true;
}

void ListExtendAndScanRelProperties::scanPropertyLists() {
void ScanRelTableLists::scanPropertyLists() {
for (auto i = 0u; i < propertyLists.size(); ++i) {
outPropertyVectors[i]->resetOverflowBuffer();
propertyLists[i]->readValues(outPropertyVectors[i], *propertyListHandles[i]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ bool VarLengthAdjListExtend::addDFSLevelToStackIfParentExtends(uint64_t parent,

bool VarLengthAdjListExtend::getNextBatchOfNbrNodes(
shared_ptr<AdjListExtendDFSLevelInfo>& dfsLevel) const {
if (dfsLevel->listHandle->listSyncState.hasMoreToRead()) {
if (dfsLevel->listHandle->listSyncState.hasMoreAndSwitchSourceIfNecessary()) {
((AdjLists*)storage)->readValues(dfsLevel->children, *dfsLevel->listHandle);
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions src/processor/result/factorized_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ void FactorizedTable::setNonOverflowColNull(uint8_t* nullBuffer, uint32_t colIdx

void FactorizedTable::copyToInMemList(uint32_t colIdx, vector<uint64_t>& tupleIdxesToRead,
uint8_t* data, NullMask* nullMask, uint64_t startElemPosInList,
DiskOverflowFile* overflowFileOfInMemList, DataType type,
DiskOverflowFile* overflowFileOfInMemList, const DataType& type,
NodeIDCompressionScheme* nodeIDCompressionScheme) const {
auto column = tableSchema->getColumn(colIdx);
assert(column->isFlat() == true);
Expand Down Expand Up @@ -641,7 +641,7 @@ void FactorizedTable::readFlatColToUnflatVector(
}

void FactorizedTable::copyOverflowIfNecessary(
uint8_t* dst, uint8_t* src, DataType type, DiskOverflowFile* diskOverflowFile) {
uint8_t* dst, uint8_t* src, const DataType& type, DiskOverflowFile* diskOverflowFile) {
switch (type.typeID) {
case STRING: {
ku_string_t* stringToWriteFrom = (ku_string_t*)src;
Expand Down
15 changes: 9 additions & 6 deletions src/storage/storage_structure/lists/list_sync_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
namespace kuzu {
namespace storage {

bool ListSyncState::hasMoreToRead() {
if (hasValidRangeToRead() && (startElemOffset + numValuesToRead != numValuesInList)) {
bool ListSyncState::hasMoreAndSwitchSourceIfNecessary() {
if (hasValidRangeToRead() && hasMoreLeftInList()) {
// Has more in the current source store.
return true;
}
if (dataToReadFromUpdateStore && sourceStore == ListSourceStore::PersistentStore) {
if (sourceStore == ListSourceStore::PERSISTENT_STORE && numValuesInUpdateStore > 0) {
// Switch from PERSISTENT_STORE to UPDATE_STORE.
switchToUpdateStore();
return true;
}
return false;
Expand All @@ -17,9 +20,9 @@ void ListSyncState::reset() {
boundNodeOffset = UINT64_MAX;
startElemOffset = UINT32_MAX;
numValuesToRead = UINT32_MAX;
numValuesInList = UINT64_MAX;
sourceStore = ListSourceStore::PersistentStore;
dataToReadFromUpdateStore = false;
numValuesInUpdateStore = 0;
numValuesInPersistentStore = 0;
sourceStore = ListSourceStore::PERSISTENT_STORE;
}

} // namespace storage
Expand Down
Loading