Skip to content

Commit

Permalink
remove currIdx from datachunk state
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Aug 27, 2023
1 parent 8174d25 commit d917264
Show file tree
Hide file tree
Showing 16 changed files with 54 additions and 315 deletions.
2 changes: 1 addition & 1 deletion src/common/data_chunk/data_chunk_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace common {
// Builds and returns a shared DataChunkState constructed with capacity 1, with
// initOriginalAndSelectedSize(1) applied to it.
// NOTE(review): this text is a rendered diff whose +/- markers were stripped, so the
// `currIdx = 0` assignment and the `setToFlat()` call both appear below. Given the commit title
// ("remove currIdx from datachunk state"), presumably only `setToFlat()` is live — confirm
// against the repository.
std::shared_ptr<DataChunkState> DataChunkState::getSingleValueDataChunkState() {
auto state = std::make_shared<DataChunkState>(1);
state->initOriginalAndSelectedSize(1);
state->currIdx = 0;
state->setToFlat();
return state;
}

Expand Down
20 changes: 13 additions & 7 deletions src/include/common/data_chunk/data_chunk_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ enum class FactorizationStateType : uint8_t {
class DataChunkState {
public:
DataChunkState() : DataChunkState(DEFAULT_VECTOR_CAPACITY) {}
explicit DataChunkState(uint64_t capacity) : currIdx{-1}, originalSize{0} {
explicit DataChunkState(uint64_t capacity)
: factorizationState{FactorizationStateType::UNFLAT}, originalSize{0} {
selVector = std::make_shared<SelectionVector>(capacity);
}

Expand All @@ -27,21 +28,26 @@ class DataChunkState {
originalSize = size;
selVector->selectedSize = size;
}
inline bool isFlat() const { return currIdx != -1; }
inline void setToUnflat() { currIdx = -1; }
inline uint64_t getNumSelectedValues() const { return isFlat() ? 1 : selVector->selectedSize; }
inline void setOriginalSize(uint64_t size) { originalSize = size; }
inline uint64_t getOriginalSize() { return originalSize; }
inline bool isFlat() const { return factorizationState == FactorizationStateType::FLAT; }
inline void setToFlat() { factorizationState = FactorizationStateType::FLAT; }
inline void setToUnflat() { factorizationState = FactorizationStateType::UNFLAT; }

inline uint64_t getNumSelectedValues() const { return selVector->selectedSize; }

public:
// The currIdx is >= 0 when vectors are flattened and -1 if the vectors are unflat.
int64_t currIdx;
std::shared_ptr<SelectionVector> selVector;

private:
FactorizationStateType factorizationState = FactorizationStateType::UNFLAT;
// We need to keep track of the originalSize of DataChunks to perform consistent scans of vectors
// or lists. This is because all the vectors in a data chunk have to be the same length, as they
// share the same selectedPositions array. Therefore, if there is a scan after a filter on the
// data chunk, the selectedSize of selVector might decrease, so the scan cannot know how much it
// has to scan to generate a vector that is consistent with the rest of the vectors in the
// data chunk.
uint64_t originalSize;
std::shared_ptr<SelectionVector> selVector;
};

} // namespace common
Expand Down
18 changes: 10 additions & 8 deletions src/include/processor/operator/flatten.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
namespace kuzu {
namespace processor {

// Cursor state kept by a Flatten operator while it walks the selected positions of one chunk.
// Both counters start at zero.
struct FlattenLocalState {
    // Index of the next selected position to emit.
    uint64_t currentIdx{0};
    // Number of selected positions in the chunk currently being flattened.
    uint64_t sizeToFlatten{0};
};

class Flatten : public PhysicalOperator, SelVectorOverWriter {
public:
Flatten(uint32_t dataChunkToFlattenPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
Flatten(data_chunk_pos_t dataChunkToFlattenPos, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::FLATTEN, std::move(child), id, paramsString},
dataChunkToFlattenPos{dataChunkToFlattenPos} {}

Expand All @@ -22,15 +27,12 @@ class Flatten : public PhysicalOperator, SelVectorOverWriter {
}

private:
inline bool isCurrIdxInitialOrLast() {
return dataChunkToFlatten->state->currIdx == -1 ||
dataChunkToFlatten->state->currIdx == (prevSelVector->selectedSize - 1);
}
void resetToCurrentSelVector(std::shared_ptr<common::SelectionVector>& selVector) override;

private:
uint32_t dataChunkToFlattenPos;
std::shared_ptr<common::DataChunk> dataChunkToFlatten;
data_chunk_pos_t dataChunkToFlattenPos;
common::DataChunkState* dataChunkState;
std::unique_ptr<FlattenLocalState> localState;
};

} // namespace processor
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/aggregate/aggregate_hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void AggregateHashTable::initializeHashTable(uint64_t numEntriesToAllocate) {

void AggregateHashTable::initializeTmpVectors() {
hashState = std::make_shared<DataChunkState>();
hashState->currIdx = 0;
hashState->setToFlat();
hashVector = std::make_unique<ValueVector>(LogicalTypeID::INT64, &memoryManager);
hashVector->state = hashState;
hashSlotsToUpdateAggState = std::make_unique<HashSlot*[]>(DEFAULT_VECTOR_CAPACITY);
Expand Down
17 changes: 10 additions & 7 deletions src/processor/operator/flatten.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@ namespace kuzu {
namespace processor {

// Prepares the operator's per-thread state: looks up the data chunk at dataChunkToFlattenPos in
// the result set, resets currentSelVector through resetSelectorToValuePosBufferWithSize with
// size 1, and allocates a FlattenLocalState whose counters start at zero.
// NOTE(review): this is a rendered diff with the +/- markers stripped. The
// `dataChunkToFlatten = ...` line is presumably the removed version of the
// `dataChunkState = ...` line that follows it — confirm against the repository.
void Flatten::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
dataChunkToFlatten = resultSet->dataChunks[dataChunkToFlattenPos];
// Keeps a raw, non-owning pointer to the chunk's state obtained via shared_ptr::get().
dataChunkState = resultSet->dataChunks[dataChunkToFlattenPos]->state.get();
currentSelVector->resetSelectorToValuePosBufferWithSize(1 /* size */);
localState = std::make_unique<FlattenLocalState>();
}

// Emits one position of the chunk being flattened per call. Returns false when the child
// operator has no further tuples, and true after a position has been written to
// currentSelVector.
// NOTE(review): this is a rendered diff with the +/- markers stripped, so pre-change statements
// (those using isCurrIdxInitialOrLast() or dataChunkToFlatten->state) are interleaved with their
// post-change replacements (those using localState or dataChunkState). As a result the braces do
// not balance as shown. Presumably only the localState/dataChunkState statements are live after
// the commit — confirm against the repository.
bool Flatten::getNextTuplesInternal(ExecutionContext* context) {
if (isCurrIdxInitialOrLast()) {
dataChunkToFlatten->state->setToUnflat();
restoreSelVector(dataChunkToFlatten->state->selVector);
// True on the first call (both counters start at 0) and again once every position of the
// current chunk has been emitted; in both cases the next chunk is pulled from the child.
if (localState->currentIdx == localState->sizeToFlatten) {
dataChunkState->setToUnflat(); // TODO(Xiyang): this should be part of restore/save
restoreSelVector(dataChunkState->selVector);
if (!children[0]->getNextTuple(context)) {
return false;
}
saveSelVector(dataChunkToFlatten->state->selVector);
// Restart the cursor and record how many selected positions the new chunk holds.
localState->currentIdx = 0;
localState->sizeToFlatten = dataChunkState->selVector->selectedSize;
saveSelVector(dataChunkState->selVector);
dataChunkState->setToFlat();
}
dataChunkToFlatten->state->currIdx++;
// Copy the next position from prevSelVector into slot 0 of currentSelVector, then advance the
// cursor (post-increment in the localState variant).
currentSelVector->selectedPositions[0] =
prevSelVector->selectedPositions[dataChunkToFlatten->state->currIdx];
prevSelVector->selectedPositions[localState->currentIdx++];
metrics->numOutputTuple.incrementByOne();
return true;
}
Expand Down
7 changes: 7 additions & 0 deletions src/processor/operator/hash_join/hash_join_probe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ uint64_t HashJoinProbe::getLeftJoinResult() {
for (auto& vector : vectorsToReadInto) {
vector->setAsSingleNullEntry();
}
// TODO(Xiyang): We have a bug in LEFT JOIN, which should not discard NULL keys. To be
// clearer, NULL keys should only be discarded for the probe and should not be reflected in the
// vector. The following for loop is a temporary hack.
for (auto& vector : keyVectors) {
assert(vector->state->isFlat());
vector->state->selVector->selectedSize = 1;
}
probeState->probedTuples[0] = nullptr;
}
return 1;
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/result_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void ResultCollector::finalize(ExecutionContext* context) {
for (auto i = 0u; i < tableSchema->getNumColumns(); ++i) {
auto columnSchema = tableSchema->getColumn(i);
if (columnSchema->isFlat()) {
payloadVectors[i]->state->currIdx = 0;
payloadVectors[i]->state->setToFlat();
}
}
if (table->isEmpty()) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/storage_structure/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void Column::scan(
transaction::Transaction* transaction, ValueVector* nodeIDVector, ValueVector* resultVector) {
// In sequential read, we fetch start offset regardless of selected position.
auto startOffset = nodeIDVector->readNodeOffset(0);
uint64_t numValuesToRead = nodeIDVector->state->originalSize;
uint64_t numValuesToRead = nodeIDVector->state->getOriginalSize();
auto pageCursor = PageUtils::getPageElementCursorForPos(startOffset, numElementsPerPage);
auto numValuesRead = 0u;
auto posInSelVector = 0u;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_structure/lists/lists.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ void Lists::readPropertyUpdatesToInMemListIfExists(InMemList& inMemList,

void ListPropertyLists::readListFromPages(
ValueVector* valueVector, ListHandle& listHandle, PageElementCursor& pageCursor) {
uint64_t numValuesToRead = valueVector->state->originalSize;
uint64_t numValuesToRead = valueVector->state->getOriginalSize();
uint64_t vectorPos = 0;
while (vectorPos != numValuesToRead) {
uint64_t numValuesInPage = numElementsPerPage - pageCursor.elemPosInPage;
Expand Down Expand Up @@ -355,7 +355,7 @@ void RelIDList::setDeletedRelsIfNecessary(
relIDVector->state->selVector->resetSelectorToValuePosBuffer();
auto& selVector = relIDVector->state->selVector;
auto nextSelectedPos = 0u;
for (auto pos = 0; pos < relIDVector->state->originalSize; ++pos) {
for (auto pos = 0; pos < relIDVector->state->getOriginalSize(); ++pos) {
auto relID = relIDVector->getValue<relID_t>(pos);
if (!listsUpdatesStore->isRelDeletedInPersistentStore(
storageStructureIDAndFName.storageStructureID.listFileID,
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_structure/lists/lists_update_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void ListsUpdatesStore::readValues(
.at(nodeOffset);
ftOfInsertedRels->lookup(vectorsToRead, columnsToRead, listUpdates->insertedRelsTupleIdxInFT,
listHandle.getStartElemOffset(), numTuplesToRead);
valueVector->state->originalSize = numTuplesToRead;
valueVector->state->setOriginalSize(numTuplesToRead);
}

bool ListsUpdatesStore::hasAnyDeletedRelsInPersistentStore(
Expand Down Expand Up @@ -246,7 +246,7 @@ void ListsUpdatesStore::readUpdatesToPropertyVectorIfExists(ListFileID& listFile
for (auto& [listOffset, ftTupleIdx] : updatedPersistentListOffsets.listOffsetFTIdxMap) {
if (startListOffset > listOffset) {
continue;
} else if (startListOffset + propertyVector->state->originalSize <= listOffset) {
} else if (startListOffset + propertyVector->state->getOriginalSize() <= listOffset) {

Check warning on line 249 in src/storage/storage_structure/lists/lists_update_store.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/storage_structure/lists/lists_update_store.cpp#L249

Added line #L249 was not covered by tests
return;
}
auto elemPosInVector = listOffset - startListOffset;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_structure/storage_structure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ BaseColumnOrList::BaseColumnOrList(const StorageStructureIDAndFName& storageStru
void BaseColumnOrList::readBySequentialCopy(Transaction* transaction, ValueVector* vector,
PageElementCursor& cursor,
const std::function<page_idx_t(page_idx_t)>& logicalToPhysicalPageMapper) {
uint64_t numValuesToRead = vector->state->originalSize;
uint64_t numValuesToRead = vector->state->getOriginalSize();
uint64_t vectorPos = 0;
while (vectorPos != numValuesToRead) {
uint64_t numValuesInPage = numElementsPerPage - cursor.elemPosInPage;
Expand All @@ -52,7 +52,7 @@ void BaseColumnOrList::readInternalIDsBySequentialCopy(Transaction* transaction,
ValueVector* vector, PageElementCursor& cursor,
const std::function<page_idx_t(page_idx_t)>& logicalToPhysicalPageMapper,
table_id_t commonTableID, bool hasNoNullGuarantee) {
uint64_t numValuesToRead = vector->state->originalSize;
uint64_t numValuesToRead = vector->state->getOriginalSize();
uint64_t vectorPos = 0;
while (vectorPos != numValuesToRead) {
uint64_t numValuesInPage = numElementsPerPage - cursor.elemPosInPage;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ void NodeColumn::scanUnfiltered(Transaction* transaction, PageElementCursor& pag

void NodeColumn::scanFiltered(Transaction* transaction, PageElementCursor& pageCursor,
ValueVector* nodeIDVector, ValueVector* resultVector) {
auto numValuesToScan = nodeIDVector->state->originalSize;
auto numValuesToScan = nodeIDVector->state->getOriginalSize();
auto numValuesScanned = 0u;
auto posInSelVector = 0u;
while (numValuesScanned < numValuesToScan) {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/store/nodes_statistics_and_deleted_ids.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void NodeStatisticsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
auto itr = deletedNodeOffsets.begin();
sel_t nextDeletedNodeOffset = *itr - morselBeginOffset;
uint64_t nextSelectedPosition = 0;
for (sel_t pos = 0; pos < nodeOffsetVector->state->originalSize; ++pos) {
for (sel_t pos = 0; pos < nodeOffsetVector->state->getOriginalSize(); ++pos) {
if (pos == nextDeletedNodeOffset) {
itr++;
if (itr == deletedNodeOffsets.end()) {
Expand All @@ -102,7 +102,7 @@ void NodeStatisticsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
nodeOffsetVector->state->selVector->selectedPositions[nextSelectedPosition++] = pos;
}
nodeOffsetVector->state->selVector->selectedSize =
nodeOffsetVector->state->originalSize - deletedNodeOffsets.size();
nodeOffsetVector->state->getOriginalSize() - deletedNodeOffsets.size();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/var_list_node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void VarListNodeColumn::scanInternal(
startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
auto listOffsetInfoInStorage =
getListOffsetInfoInStorage(transaction, nodeGroupIdx, startNodeOffsetInGroup,
startNodeOffsetInGroup + nodeIDVector->state->originalSize, resultVector->state);
startNodeOffsetInGroup + nodeIDVector->state->getOriginalSize(), resultVector->state);
if (resultVector->state->selVector->isUnfiltered()) {
scanUnfiltered(transaction, nodeGroupIdx, resultVector, listOffsetInfoInStorage);
} else {
Expand Down
1 change: 0 additions & 1 deletion test/transaction/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
add_kuzu_test(transaction_manager_test transaction_manager_test.cpp)
add_kuzu_test(transaction_test transaction_test.cpp)
Loading

0 comments on commit d917264

Please sign in to comment.