Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove currIdx from DataChunkState #1966

Merged
merged 1 commit into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/data_chunk/data_chunk_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace common {
std::shared_ptr<DataChunkState> DataChunkState::getSingleValueDataChunkState() {
auto state = std::make_shared<DataChunkState>(1);
state->initOriginalAndSelectedSize(1);
state->currIdx = 0;
state->setToFlat();
return state;
}

Expand Down
22 changes: 14 additions & 8 deletions src/include/common/data_chunk/data_chunk_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
namespace kuzu {
namespace common {

enum class FactorizationStateType : uint8_t {
// F stands for Factorization
enum class FStateType : uint8_t {
FLAT = 0,
UNFLAT = 1,
};

class DataChunkState {
public:
DataChunkState() : DataChunkState(DEFAULT_VECTOR_CAPACITY) {}
explicit DataChunkState(uint64_t capacity) : currIdx{-1}, originalSize{0} {
explicit DataChunkState(uint64_t capacity) : fStateType{FStateType::UNFLAT}, originalSize{0} {
selVector = std::make_shared<SelectionVector>(capacity);
}

Expand All @@ -27,21 +28,26 @@ class DataChunkState {
originalSize = size;
selVector->selectedSize = size;
}
inline bool isFlat() const { return currIdx != -1; }
inline void setToUnflat() { currIdx = -1; }
inline uint64_t getNumSelectedValues() const { return isFlat() ? 1 : selVector->selectedSize; }
inline void setOriginalSize(uint64_t size) { originalSize = size; }
inline uint64_t getOriginalSize() { return originalSize; }
inline bool isFlat() const { return fStateType == FStateType::FLAT; }
inline void setToFlat() { fStateType = FStateType::FLAT; }
inline void setToUnflat() { fStateType = FStateType::UNFLAT; }

inline uint64_t getNumSelectedValues() const { return selVector->selectedSize; }

public:
// The currIdx is >= 0 when vectors are flattened and -1 if the vectors are unflat.
int64_t currIdx;
std::shared_ptr<SelectionVector> selVector;
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved

private:
FStateType fStateType;
// We need to keep track of originalSize of DataChunks to perform consistent scans of vectors
// or lists. This is because all the vectors in a data chunk have to be the same length as they
// share the same selectedPositions array. Therefore, if there is a scan after a filter on the
// data chunk, the selectedSize of selVector might decrease, so the scan cannot know how much it
// has to scan to generate a vector that is consistent with the rest of the vectors in the
// data chunk.
uint64_t originalSize;
std::shared_ptr<SelectionVector> selVector;
};

} // namespace common
Expand Down
18 changes: 10 additions & 8 deletions src/include/processor/operator/flatten.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
namespace kuzu {
namespace processor {

// Local iteration state for the Flatten operator: tracks how far it has advanced through the
// selected positions of the data chunk it is flattening.
struct FlattenLocalState {
// Index of the next entry to read from the saved selection vector's selectedPositions; it is
// reset to 0 each time a new batch is pulled from the child and incremented once per output tuple.
uint64_t currentIdx = 0;
// selectedSize of the data chunk's selection vector captured right after the child produced a
// batch. When currentIdx reaches this value the current batch is exhausted (both start at 0, so
// the first call always pulls from the child).
uint64_t sizeToFlatten = 0;
};

class Flatten : public PhysicalOperator, SelVectorOverWriter {
public:
Flatten(uint32_t dataChunkToFlattenPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
const std::string& paramsString)
Flatten(data_chunk_pos_t dataChunkToFlattenPos, std::unique_ptr<PhysicalOperator> child,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{PhysicalOperatorType::FLATTEN, std::move(child), id, paramsString},
dataChunkToFlattenPos{dataChunkToFlattenPos} {}

Expand All @@ -22,15 +27,12 @@ class Flatten : public PhysicalOperator, SelVectorOverWriter {
}

private:
inline bool isCurrIdxInitialOrLast() {
return dataChunkToFlatten->state->currIdx == -1 ||
dataChunkToFlatten->state->currIdx == (prevSelVector->selectedSize - 1);
}
void resetToCurrentSelVector(std::shared_ptr<common::SelectionVector>& selVector) override;

private:
uint32_t dataChunkToFlattenPos;
std::shared_ptr<common::DataChunk> dataChunkToFlatten;
data_chunk_pos_t dataChunkToFlattenPos;
common::DataChunkState* dataChunkState;
std::unique_ptr<FlattenLocalState> localState;
};

} // namespace processor
Expand Down
11 changes: 5 additions & 6 deletions src/include/processor/operator/hash_join/hash_join_build.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,13 @@ class HashJoinBuildInfo {
friend class HashJoinBuild;

public:
HashJoinBuildInfo(std::vector<DataPos> keysPos,
std::vector<common::FactorizationStateType> factorizationStateTypes,
HashJoinBuildInfo(std::vector<DataPos> keysPos, std::vector<common::FStateType> fStateTypes,
std::vector<DataPos> payloadsPos, std::unique_ptr<FactorizedTableSchema> tableSchema)
: keysPos{std::move(keysPos)}, factorizationStateTypes{std::move(factorizationStateTypes)},
: keysPos{std::move(keysPos)}, fStateTypes{std::move(fStateTypes)},
payloadsPos{std::move(payloadsPos)}, tableSchema{std::move(tableSchema)} {}
HashJoinBuildInfo(const HashJoinBuildInfo& other)
: keysPos{other.keysPos}, factorizationStateTypes{other.factorizationStateTypes},
payloadsPos{other.payloadsPos}, tableSchema{other.tableSchema->copy()} {}
: keysPos{other.keysPos}, fStateTypes{other.fStateTypes}, payloadsPos{other.payloadsPos},
tableSchema{other.tableSchema->copy()} {}

inline uint32_t getNumKeys() const { return keysPos.size(); }

Expand All @@ -57,7 +56,7 @@ class HashJoinBuildInfo {

private:
std::vector<DataPos> keysPos;
std::vector<common::FactorizationStateType> factorizationStateTypes;
std::vector<common::FStateType> fStateTypes;
std::vector<DataPos> payloadsPos;
std::unique_ptr<FactorizedTableSchema> tableSchema;
};
Expand Down
12 changes: 6 additions & 6 deletions src/processor/map/map_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ std::unique_ptr<HashJoinBuildInfo> PlanMapper::createHashBuildInfo(
const Schema& buildSchema, const expression_vector& keys, const expression_vector& payloads) {
planner::f_group_pos_set keyGroupPosSet;
std::vector<DataPos> keysPos;
std::vector<FactorizationStateType> factorizationStateTypes;
std::vector<FStateType> fStateTypes;
std::vector<DataPos> payloadsPos;
auto tableSchema = std::make_unique<FactorizedTableSchema>();
for (auto& key : keys) {
Expand All @@ -25,9 +25,9 @@ std::unique_ptr<HashJoinBuildInfo> PlanMapper::createHashBuildInfo(
LogicalTypeUtils::getRowLayoutSize(key->dataType));
tableSchema->appendColumn(std::move(columnSchema));
keysPos.push_back(pos);
factorizationStateTypes.push_back(buildSchema.getGroup(pos.dataChunkPos)->isFlat() ?
FactorizationStateType::FLAT :
FactorizationStateType::UNFLAT);
fStateTypes.push_back(buildSchema.getGroup(pos.dataChunkPos)->isFlat() ?
FStateType::FLAT :
FStateType::UNFLAT);
}
for (auto& payload : payloads) {
auto pos = DataPos(buildSchema.getExpressionPos(*payload));
Expand All @@ -51,8 +51,8 @@ std::unique_ptr<HashJoinBuildInfo> PlanMapper::createHashBuildInfo(
auto pointerColumn = std::make_unique<ColumnSchema>(false /* isUnFlat */,
INVALID_DATA_CHUNK_POS, LogicalTypeUtils::getRowLayoutSize(pointerType));
tableSchema->appendColumn(std::move(pointerColumn));
return std::make_unique<HashJoinBuildInfo>(std::move(keysPos),
std::move(factorizationStateTypes), std::move(payloadsPos), std::move(tableSchema));
return std::make_unique<HashJoinBuildInfo>(
std::move(keysPos), std::move(fStateTypes), std::move(payloadsPos), std::move(tableSchema));
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapHashJoin(LogicalOperator* logicalOperator) {
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/aggregate/aggregate_hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void AggregateHashTable::initializeHashTable(uint64_t numEntriesToAllocate) {

void AggregateHashTable::initializeTmpVectors() {
hashState = std::make_shared<DataChunkState>();
hashState->currIdx = 0;
hashState->setToFlat();
hashVector = std::make_unique<ValueVector>(LogicalTypeID::INT64, &memoryManager);
hashVector->state = hashState;
hashSlotsToUpdateAggState = std::make_unique<HashSlot*[]>(DEFAULT_VECTOR_CAPACITY);
Expand Down
17 changes: 10 additions & 7 deletions src/processor/operator/flatten.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,25 @@ namespace kuzu {
namespace processor {

void Flatten::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
dataChunkToFlatten = resultSet->dataChunks[dataChunkToFlattenPos];
dataChunkState = resultSet->dataChunks[dataChunkToFlattenPos]->state.get();
currentSelVector->resetSelectorToValuePosBufferWithSize(1 /* size */);
localState = std::make_unique<FlattenLocalState>();
}

bool Flatten::getNextTuplesInternal(ExecutionContext* context) {
if (isCurrIdxInitialOrLast()) {
dataChunkToFlatten->state->setToUnflat();
restoreSelVector(dataChunkToFlatten->state->selVector);
if (localState->currentIdx == localState->sizeToFlatten) {
dataChunkState->setToUnflat(); // TODO(Xiyang): this should be part of restore/save
restoreSelVector(dataChunkState->selVector);
if (!children[0]->getNextTuple(context)) {
return false;
}
saveSelVector(dataChunkToFlatten->state->selVector);
localState->currentIdx = 0;
localState->sizeToFlatten = dataChunkState->selVector->selectedSize;
saveSelVector(dataChunkState->selVector);
dataChunkState->setToFlat();
}
dataChunkToFlatten->state->currIdx++;
currentSelVector->selectedPositions[0] =
prevSelVector->selectedPositions[dataChunkToFlatten->state->currIdx];
prevSelVector->selectedPositions[localState->currentIdx++];
metrics->numOutputTuple.incrementByOne();
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/hash_join/hash_join_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ void HashJoinBuild::initLocalStateInternal(ResultSet* resultSet, ExecutionContex
for (auto i = 0u; i < info->keysPos.size(); ++i) {
auto vector = resultSet->getValueVector(info->keysPos[i]).get();
keyTypes.push_back(vector->dataType.copy());
if (info->factorizationStateTypes[i] == common::FactorizationStateType::UNFLAT) {
if (info->fStateTypes[i] == common::FStateType::UNFLAT) {
setKeyState(vector->state.get());
}
keyVectors.push_back(vector);
Expand Down
7 changes: 7 additions & 0 deletions src/processor/operator/hash_join/hash_join_probe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ uint64_t HashJoinProbe::getLeftJoinResult() {
for (auto& vector : vectorsToReadInto) {
vector->setAsSingleNullEntry();
}
// TODO(Xiyang): We have a bug in LEFT JOIN, which should not discard NULL keys. To be
// clearer, NULL keys should only be discarded for the probe, and that discarding should not be
// reflected in the vector. The following for loop is a temporary hack.
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved
for (auto& vector : keyVectors) {
assert(vector->state->isFlat());
vector->state->selVector->selectedSize = 1;
}
probeState->probedTuples[0] = nullptr;
}
return 1;
Expand Down
2 changes: 1 addition & 1 deletion src/processor/operator/result_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void ResultCollector::finalize(ExecutionContext* context) {
for (auto i = 0u; i < tableSchema->getNumColumns(); ++i) {
auto columnSchema = tableSchema->getColumn(i);
if (columnSchema->isFlat()) {
payloadVectors[i]->state->currIdx = 0;
payloadVectors[i]->state->setToFlat();
}
}
if (table->isEmpty()) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/storage_structure/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void Column::scan(
transaction::Transaction* transaction, ValueVector* nodeIDVector, ValueVector* resultVector) {
// In sequential read, we fetch start offset regardless of selected position.
auto startOffset = nodeIDVector->readNodeOffset(0);
uint64_t numValuesToRead = nodeIDVector->state->originalSize;
uint64_t numValuesToRead = nodeIDVector->state->getOriginalSize();
auto pageCursor = PageUtils::getPageElementCursorForPos(startOffset, numElementsPerPage);
auto numValuesRead = 0u;
auto posInSelVector = 0u;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_structure/lists/lists.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ void Lists::readPropertyUpdatesToInMemListIfExists(InMemList& inMemList,

void ListPropertyLists::readListFromPages(
ValueVector* valueVector, ListHandle& listHandle, PageElementCursor& pageCursor) {
uint64_t numValuesToRead = valueVector->state->originalSize;
uint64_t numValuesToRead = valueVector->state->getOriginalSize();
uint64_t vectorPos = 0;
while (vectorPos != numValuesToRead) {
uint64_t numValuesInPage = numElementsPerPage - pageCursor.elemPosInPage;
Expand Down Expand Up @@ -355,7 +355,7 @@ void RelIDList::setDeletedRelsIfNecessary(
relIDVector->state->selVector->resetSelectorToValuePosBuffer();
auto& selVector = relIDVector->state->selVector;
auto nextSelectedPos = 0u;
for (auto pos = 0; pos < relIDVector->state->originalSize; ++pos) {
for (auto pos = 0; pos < relIDVector->state->getOriginalSize(); ++pos) {
auto relID = relIDVector->getValue<relID_t>(pos);
if (!listsUpdatesStore->isRelDeletedInPersistentStore(
storageStructureIDAndFName.storageStructureID.listFileID,
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_structure/lists/lists_update_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@
.at(nodeOffset);
ftOfInsertedRels->lookup(vectorsToRead, columnsToRead, listUpdates->insertedRelsTupleIdxInFT,
listHandle.getStartElemOffset(), numTuplesToRead);
valueVector->state->originalSize = numTuplesToRead;
valueVector->state->setOriginalSize(numTuplesToRead);
}

bool ListsUpdatesStore::hasAnyDeletedRelsInPersistentStore(
Expand Down Expand Up @@ -246,7 +246,7 @@
for (auto& [listOffset, ftTupleIdx] : updatedPersistentListOffsets.listOffsetFTIdxMap) {
if (startListOffset > listOffset) {
continue;
} else if (startListOffset + propertyVector->state->originalSize <= listOffset) {
} else if (startListOffset + propertyVector->state->getOriginalSize() <= listOffset) {

Check warning on line 249 in src/storage/storage_structure/lists/lists_update_store.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/storage_structure/lists/lists_update_store.cpp#L249

Added line #L249 was not covered by tests
return;
}
auto elemPosInVector = listOffset - startListOffset;
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage_structure/storage_structure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ BaseColumnOrList::BaseColumnOrList(const StorageStructureIDAndFName& storageStru
void BaseColumnOrList::readBySequentialCopy(Transaction* transaction, ValueVector* vector,
PageElementCursor& cursor,
const std::function<page_idx_t(page_idx_t)>& logicalToPhysicalPageMapper) {
uint64_t numValuesToRead = vector->state->originalSize;
uint64_t numValuesToRead = vector->state->getOriginalSize();
uint64_t vectorPos = 0;
while (vectorPos != numValuesToRead) {
uint64_t numValuesInPage = numElementsPerPage - cursor.elemPosInPage;
Expand All @@ -52,7 +52,7 @@ void BaseColumnOrList::readInternalIDsBySequentialCopy(Transaction* transaction,
ValueVector* vector, PageElementCursor& cursor,
const std::function<page_idx_t(page_idx_t)>& logicalToPhysicalPageMapper,
table_id_t commonTableID, bool hasNoNullGuarantee) {
uint64_t numValuesToRead = vector->state->originalSize;
uint64_t numValuesToRead = vector->state->getOriginalSize();
uint64_t vectorPos = 0;
while (vectorPos != numValuesToRead) {
uint64_t numValuesInPage = numElementsPerPage - cursor.elemPosInPage;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ void NodeColumn::scanUnfiltered(Transaction* transaction, PageElementCursor& pag

void NodeColumn::scanFiltered(Transaction* transaction, PageElementCursor& pageCursor,
ValueVector* nodeIDVector, ValueVector* resultVector) {
auto numValuesToScan = nodeIDVector->state->originalSize;
auto numValuesToScan = nodeIDVector->state->getOriginalSize();
auto numValuesScanned = 0u;
auto posInSelVector = 0u;
while (numValuesScanned < numValuesToScan) {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/store/nodes_statistics_and_deleted_ids.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void NodeStatisticsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
auto itr = deletedNodeOffsets.begin();
sel_t nextDeletedNodeOffset = *itr - morselBeginOffset;
uint64_t nextSelectedPosition = 0;
for (sel_t pos = 0; pos < nodeOffsetVector->state->originalSize; ++pos) {
for (sel_t pos = 0; pos < nodeOffsetVector->state->getOriginalSize(); ++pos) {
if (pos == nextDeletedNodeOffset) {
itr++;
if (itr == deletedNodeOffsets.end()) {
Expand All @@ -102,7 +102,7 @@ void NodeStatisticsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
nodeOffsetVector->state->selVector->selectedPositions[nextSelectedPosition++] = pos;
}
nodeOffsetVector->state->selVector->selectedSize =
nodeOffsetVector->state->originalSize - deletedNodeOffsets.size();
nodeOffsetVector->state->getOriginalSize() - deletedNodeOffsets.size();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/var_list_node_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void VarListNodeColumn::scanInternal(
startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
auto listOffsetInfoInStorage =
getListOffsetInfoInStorage(transaction, nodeGroupIdx, startNodeOffsetInGroup,
startNodeOffsetInGroup + nodeIDVector->state->originalSize, resultVector->state);
startNodeOffsetInGroup + nodeIDVector->state->getOriginalSize(), resultVector->state);
if (resultVector->state->selVector->isUnfiltered()) {
scanUnfiltered(transaction, nodeGroupIdx, resultVector, listOffsetInfoInStorage);
} else {
Expand Down
1 change: 0 additions & 1 deletion test/transaction/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
add_kuzu_test(transaction_manager_test transaction_manager_test.cpp)
add_kuzu_test(transaction_test transaction_test.cpp)
Loading