diff --git a/src/common/data_chunk/data_chunk_state.cpp b/src/common/data_chunk/data_chunk_state.cpp
index 261e1cfdc2..75acde78ad 100644
--- a/src/common/data_chunk/data_chunk_state.cpp
+++ b/src/common/data_chunk/data_chunk_state.cpp
@@ -6,7 +6,7 @@ namespace common {
 std::shared_ptr<DataChunkState> DataChunkState::getSingleValueDataChunkState() {
     auto state = std::make_shared<DataChunkState>(1);
     state->initOriginalAndSelectedSize(1);
-    state->currIdx = 0;
+    state->setToFlat();
     return state;
 }
 
diff --git a/src/include/common/data_chunk/data_chunk_state.h b/src/include/common/data_chunk/data_chunk_state.h
index f13fdbc811..670811e9d9 100644
--- a/src/include/common/data_chunk/data_chunk_state.h
+++ b/src/include/common/data_chunk/data_chunk_state.h
@@ -8,7 +8,8 @@ namespace kuzu {
 namespace common {
 
-enum class FactorizationStateType : uint8_t {
+// F stands for Factorization
+enum class FStateType : uint8_t {
     FLAT = 0,
     UNFLAT = 1,
 };
@@ -16,7 +17,7 @@ enum class FactorizationStateType : uint8_t {
 class DataChunkState {
 public:
     DataChunkState() : DataChunkState(DEFAULT_VECTOR_CAPACITY) {}
-    explicit DataChunkState(uint64_t capacity) : currIdx{-1}, originalSize{0} {
+    explicit DataChunkState(uint64_t capacity) : fStateType{FStateType::UNFLAT}, originalSize{0} {
         selVector = std::make_shared<SelectionVector>(capacity);
     }
 
@@ -27,13 +28,19 @@ class DataChunkState {
         originalSize = size;
         selVector->selectedSize = size;
     }
-    inline bool isFlat() const { return currIdx != -1; }
-    inline void setToUnflat() { currIdx = -1; }
-    inline uint64_t getNumSelectedValues() const { return isFlat() ? 1 : selVector->selectedSize; }
+    inline void setOriginalSize(uint64_t size) { originalSize = size; }
+    inline uint64_t getOriginalSize() { return originalSize; }
+    inline bool isFlat() const { return fStateType == FStateType::FLAT; }
+    inline void setToFlat() { fStateType = FStateType::FLAT; }
+    inline void setToUnflat() { fStateType = FStateType::UNFLAT; }
+
+    inline uint64_t getNumSelectedValues() const { return selVector->selectedSize; }
 
 public:
-    // The currIdx is >= 0 when vectors are flattened and -1 if the vectors are unflat.
-    int64_t currIdx;
+    std::shared_ptr<SelectionVector> selVector;
+
+private:
+    FStateType fStateType;
     // We need to keep track of originalSize of DataChunks to perform consistent scans of vectors
     // or lists. This is because all the vectors in a data chunk have to be the same length as they
     // share the same selectedPositions array. Therefore, if there is a scan after a filter on the
@@ -41,7 +48,6 @@ class DataChunkState {
     // has to scan to generate a vector that is consistent with the rest of the vectors in the
     // data chunk.
     uint64_t originalSize;
-    std::shared_ptr<SelectionVector> selVector;
 };
 
 } // namespace common
diff --git a/src/include/processor/operator/flatten.h b/src/include/processor/operator/flatten.h
index d02d2ff8ec..4b08bea8d4 100644
--- a/src/include/processor/operator/flatten.h
+++ b/src/include/processor/operator/flatten.h
@@ -6,10 +6,15 @@ namespace kuzu {
 namespace processor {
 
+struct FlattenLocalState {
+    uint64_t currentIdx = 0;
+    uint64_t sizeToFlatten = 0;
+};
+
 class Flatten : public PhysicalOperator, SelVectorOverWriter {
 public:
-    Flatten(uint32_t dataChunkToFlattenPos, std::unique_ptr<PhysicalOperator> child, uint32_t id,
-        const std::string& paramsString)
+    Flatten(data_chunk_pos_t dataChunkToFlattenPos, std::unique_ptr<PhysicalOperator> child,
+        uint32_t id, const std::string& paramsString)
         : PhysicalOperator{PhysicalOperatorType::FLATTEN, std::move(child), id, paramsString},
           dataChunkToFlattenPos{dataChunkToFlattenPos} {}
 
@@ -22,15 +27,12 @@ class Flatten : public PhysicalOperator, SelVectorOverWriter {
     }
 
 private:
-    inline bool isCurrIdxInitialOrLast() {
-        return dataChunkToFlatten->state->currIdx == -1 ||
-               dataChunkToFlatten->state->currIdx == (prevSelVector->selectedSize - 1);
-    }
     void resetToCurrentSelVector(std::shared_ptr<common::SelectionVector>& selVector) override;
 
 private:
-    uint32_t dataChunkToFlattenPos;
-    std::shared_ptr<common::DataChunk> dataChunkToFlatten;
+    data_chunk_pos_t dataChunkToFlattenPos;
+    common::DataChunkState* dataChunkState;
+    std::unique_ptr<FlattenLocalState> localState;
 };
 
 } // namespace processor
diff --git a/src/include/processor/operator/hash_join/hash_join_build.h b/src/include/processor/operator/hash_join/hash_join_build.h
index f8034407de..6e1a903d68 100644
--- a/src/include/processor/operator/hash_join/hash_join_build.h
+++ b/src/include/processor/operator/hash_join/hash_join_build.h
@@ -38,14 +38,13 @@ class HashJoinBuildInfo {
     friend class HashJoinBuild;
 
 public:
-    HashJoinBuildInfo(std::vector<DataPos> keysPos,
-        std::vector<common::FactorizationStateType> factorizationStateTypes,
+    HashJoinBuildInfo(std::vector<DataPos> keysPos, std::vector<common::FStateType> fStateTypes,
         std::vector<DataPos> payloadsPos, std::unique_ptr<FactorizedTableSchema> tableSchema)
-        : keysPos{std::move(keysPos)}, factorizationStateTypes{std::move(factorizationStateTypes)},
+        : keysPos{std::move(keysPos)}, fStateTypes{std::move(fStateTypes)},
           payloadsPos{std::move(payloadsPos)}, tableSchema{std::move(tableSchema)} {}
     HashJoinBuildInfo(const HashJoinBuildInfo& other)
-        : keysPos{other.keysPos}, factorizationStateTypes{other.factorizationStateTypes},
-          payloadsPos{other.payloadsPos}, tableSchema{other.tableSchema->copy()} {}
+        : keysPos{other.keysPos}, fStateTypes{other.fStateTypes}, payloadsPos{other.payloadsPos},
+          tableSchema{other.tableSchema->copy()} {}
 
     inline uint32_t getNumKeys() const { return keysPos.size(); }
 
@@ -57,7 +56,7 @@ class HashJoinBuildInfo {
 
 private:
     std::vector<DataPos> keysPos;
-    std::vector<common::FactorizationStateType> factorizationStateTypes;
+    std::vector<common::FStateType> fStateTypes;
     std::vector<DataPos> payloadsPos;
     std::unique_ptr<FactorizedTableSchema> tableSchema;
 };
diff --git a/src/processor/map/map_hash_join.cpp b/src/processor/map/map_hash_join.cpp
index 84bb814145..a783f7b10c 100644
--- a/src/processor/map/map_hash_join.cpp
+++ b/src/processor/map/map_hash_join.cpp
@@ -14,7 +14,7 @@ std::unique_ptr<HashJoinBuildInfo> PlanMapper::createHashBuildInfo(
     const Schema& buildSchema, const expression_vector& keys, const expression_vector& payloads) {
     planner::f_group_pos_set keyGroupPosSet;
     std::vector<DataPos> keysPos;
-    std::vector<FactorizationStateType> factorizationStateTypes;
+    std::vector<FStateType> fStateTypes;
     std::vector<DataPos> payloadsPos;
     auto tableSchema = std::make_unique<FactorizedTableSchema>();
     for (auto& key : keys) {
@@ -25,9 +25,9 @@
             LogicalTypeUtils::getRowLayoutSize(key->dataType));
         tableSchema->appendColumn(std::move(columnSchema));
         keysPos.push_back(pos);
-        factorizationStateTypes.push_back(buildSchema.getGroup(pos.dataChunkPos)->isFlat() ?
-                                              FactorizationStateType::FLAT :
-                                              FactorizationStateType::UNFLAT);
+        fStateTypes.push_back(buildSchema.getGroup(pos.dataChunkPos)->isFlat() ?
+                                  FStateType::FLAT :
+                                  FStateType::UNFLAT);
     }
     for (auto& payload : payloads) {
         auto pos = DataPos(buildSchema.getExpressionPos(*payload));
@@ -51,8 +51,8 @@ std::unique_ptr<HashJoinBuildInfo> PlanMapper::createHashBuildInfo(
     auto pointerColumn = std::make_unique<ColumnSchema>(false /* isUnFlat */,
         INVALID_DATA_CHUNK_POS, LogicalTypeUtils::getRowLayoutSize(pointerType));
     tableSchema->appendColumn(std::move(pointerColumn));
-    return std::make_unique<HashJoinBuildInfo>(std::move(keysPos),
-        std::move(factorizationStateTypes), std::move(payloadsPos), std::move(tableSchema));
+    return std::make_unique<HashJoinBuildInfo>(
+        std::move(keysPos), std::move(fStateTypes), std::move(payloadsPos), std::move(tableSchema));
 }
 
 std::unique_ptr<PhysicalOperator> PlanMapper::mapHashJoin(LogicalOperator* logicalOperator) {
diff --git a/src/processor/operator/aggregate/aggregate_hash_table.cpp b/src/processor/operator/aggregate/aggregate_hash_table.cpp
index c505644bb7..1ec732f0e9 100644
--- a/src/processor/operator/aggregate/aggregate_hash_table.cpp
+++ b/src/processor/operator/aggregate/aggregate_hash_table.cpp
@@ -182,7 +182,7 @@ void AggregateHashTable::initializeHashTable(uint64_t numEntriesToAllocate) {
 
 void AggregateHashTable::initializeTmpVectors() {
     hashState = std::make_shared<DataChunkState>();
-    hashState->currIdx = 0;
+    hashState->setToFlat();
     hashVector = std::make_unique<ValueVector>(LogicalTypeID::INT64, &memoryManager);
     hashVector->state = hashState;
     hashSlotsToUpdateAggState = std::make_unique(DEFAULT_VECTOR_CAPACITY);
diff --git a/src/processor/operator/flatten.cpp b/src/processor/operator/flatten.cpp
index 89315eb697..d62bf42048 100644
--- a/src/processor/operator/flatten.cpp
+++ b/src/processor/operator/flatten.cpp
@@ -6,22 +6,25 @@ namespace kuzu {
 namespace processor {
 
 void Flatten::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
-    dataChunkToFlatten = resultSet->dataChunks[dataChunkToFlattenPos];
+    dataChunkState = resultSet->dataChunks[dataChunkToFlattenPos]->state.get();
     currentSelVector->resetSelectorToValuePosBufferWithSize(1 /* size */);
+    localState = std::make_unique<FlattenLocalState>();
 }
 
 bool Flatten::getNextTuplesInternal(ExecutionContext* context) {
-    if (isCurrIdxInitialOrLast()) {
-        dataChunkToFlatten->state->setToUnflat();
-        restoreSelVector(dataChunkToFlatten->state->selVector);
+    if (localState->currentIdx == localState->sizeToFlatten) {
+        dataChunkState->setToUnflat(); // TODO(Xiyang): this should be part of restore/save
+        restoreSelVector(dataChunkState->selVector);
         if (!children[0]->getNextTuple(context)) {
             return false;
         }
-        saveSelVector(dataChunkToFlatten->state->selVector);
+        localState->currentIdx = 0;
+        localState->sizeToFlatten = dataChunkState->selVector->selectedSize;
+        saveSelVector(dataChunkState->selVector);
+        dataChunkState->setToFlat();
     }
-    dataChunkToFlatten->state->currIdx++;
     currentSelVector->selectedPositions[0] =
-        prevSelVector->selectedPositions[dataChunkToFlatten->state->currIdx];
+        prevSelVector->selectedPositions[localState->currentIdx++];
     metrics->numOutputTuple.incrementByOne();
     return true;
 }
diff --git a/src/processor/operator/hash_join/hash_join_build.cpp b/src/processor/operator/hash_join/hash_join_build.cpp
index 9f797a66b1..da8c13058f 100644
--- a/src/processor/operator/hash_join/hash_join_build.cpp
+++ b/src/processor/operator/hash_join/hash_join_build.cpp
@@ -16,7 +16,7 @@ void HashJoinBuild::initLocalStateInternal(ResultSet* resultSet, ExecutionContex
     for (auto i = 0u; i < info->keysPos.size(); ++i) {
         auto vector = resultSet->getValueVector(info->keysPos[i]).get();
         keyTypes.push_back(vector->dataType.copy());
-        if (info->factorizationStateTypes[i] == common::FactorizationStateType::UNFLAT) {
+        if (info->fStateTypes[i] == common::FStateType::UNFLAT) {
             setKeyState(vector->state.get());
         }
         keyVectors.push_back(vector);
diff --git a/src/processor/operator/hash_join/hash_join_probe.cpp b/src/processor/operator/hash_join/hash_join_probe.cpp
index ed9f99ce6f..686cd15167 100644
--- a/src/processor/operator/hash_join/hash_join_probe.cpp
+++ b/src/processor/operator/hash_join/hash_join_probe.cpp
@@ -110,6 +110,13 @@ uint64_t HashJoinProbe::getLeftJoinResult() {
         for (auto& vector : vectorsToReadInto) {
             vector->setAsSingleNullEntry();
         }
+        // TODO(Xiyang): We have a bug in LEFT JOIN which should not discard NULL keys. To be more
+        // clear, NULL keys should only be discarded for probe but should not reflect on the vector.
+        // The following for loop is a temporary hack.
+        for (auto& vector : keyVectors) {
+            assert(vector->state->isFlat());
+            vector->state->selVector->selectedSize = 1;
+        }
         probeState->probedTuples[0] = nullptr;
     }
     return 1;
diff --git a/src/processor/operator/result_collector.cpp b/src/processor/operator/result_collector.cpp
index 63da3c302c..2950dad17a 100644
--- a/src/processor/operator/result_collector.cpp
+++ b/src/processor/operator/result_collector.cpp
@@ -39,7 +39,7 @@ void ResultCollector::finalize(ExecutionContext* context) {
     for (auto i = 0u; i < tableSchema->getNumColumns(); ++i) {
         auto columnSchema = tableSchema->getColumn(i);
         if (columnSchema->isFlat()) {
-            payloadVectors[i]->state->currIdx = 0;
+            payloadVectors[i]->state->setToFlat();
         }
     }
     if (table->isEmpty()) {
diff --git a/src/storage/storage_structure/column.cpp b/src/storage/storage_structure/column.cpp
index b978f2573d..08c518d25b 100644
--- a/src/storage/storage_structure/column.cpp
+++ b/src/storage/storage_structure/column.cpp
@@ -123,7 +123,7 @@ void Column::scan(
     transaction::Transaction* transaction, ValueVector* nodeIDVector, ValueVector* resultVector) {
     // In sequential read, we fetch start offset regardless of selected position.
     auto startOffset = nodeIDVector->readNodeOffset(0);
-    uint64_t numValuesToRead = nodeIDVector->state->originalSize;
+    uint64_t numValuesToRead = nodeIDVector->state->getOriginalSize();
     auto pageCursor = PageUtils::getPageElementCursorForPos(startOffset, numElementsPerPage);
     auto numValuesRead = 0u;
     auto posInSelVector = 0u;
diff --git a/src/storage/storage_structure/lists/lists.cpp b/src/storage/storage_structure/lists/lists.cpp
index a3213e0a42..425eec2b12 100644
--- a/src/storage/storage_structure/lists/lists.cpp
+++ b/src/storage/storage_structure/lists/lists.cpp
@@ -196,7 +196,7 @@ void Lists::readPropertyUpdatesToInMemListIfExists(InMemList& inMemList,
 
 void ListPropertyLists::readListFromPages(
     ValueVector* valueVector, ListHandle& listHandle, PageElementCursor& pageCursor) {
-    uint64_t numValuesToRead = valueVector->state->originalSize;
+    uint64_t numValuesToRead = valueVector->state->getOriginalSize();
     uint64_t vectorPos = 0;
     while (vectorPos != numValuesToRead) {
         uint64_t numValuesInPage = numElementsPerPage - pageCursor.elemPosInPage;
@@ -355,7 +355,7 @@ void RelIDList::setDeletedRelsIfNecessary(
     relIDVector->state->selVector->resetSelectorToValuePosBuffer();
     auto& selVector = relIDVector->state->selVector;
     auto nextSelectedPos = 0u;
-    for (auto pos = 0; pos < relIDVector->state->originalSize; ++pos) {
+    for (auto pos = 0; pos < relIDVector->state->getOriginalSize(); ++pos) {
         auto relID = relIDVector->getValue(pos);
         if (!listsUpdatesStore->isRelDeletedInPersistentStore(
                 storageStructureIDAndFName.storageStructureID.listFileID,
diff --git a/src/storage/storage_structure/lists/lists_update_store.cpp b/src/storage/storage_structure/lists/lists_update_store.cpp
index d9d374e724..33b93ca950 100644
--- a/src/storage/storage_structure/lists/lists_update_store.cpp
+++ b/src/storage/storage_structure/lists/lists_update_store.cpp
@@ -176,7 +176,7 @@ void ListsUpdatesStore::readValues(
                            .at(nodeOffset);
     ftOfInsertedRels->lookup(vectorsToRead, columnsToRead, listUpdates->insertedRelsTupleIdxInFT,
         listHandle.getStartElemOffset(), numTuplesToRead);
-    valueVector->state->originalSize = numTuplesToRead;
+    valueVector->state->setOriginalSize(numTuplesToRead);
 }
 
 bool ListsUpdatesStore::hasAnyDeletedRelsInPersistentStore(
@@ -246,7 +246,7 @@ void ListsUpdatesStore::readUpdatesToPropertyVectorIfExists(ListFileID& listFile
     for (auto& [listOffset, ftTupleIdx] : updatedPersistentListOffsets.listOffsetFTIdxMap) {
         if (startListOffset > listOffset) {
             continue;
-        } else if (startListOffset + propertyVector->state->originalSize <= listOffset) {
+        } else if (startListOffset + propertyVector->state->getOriginalSize() <= listOffset) {
             return;
         }
         auto elemPosInVector = listOffset - startListOffset;
diff --git a/src/storage/storage_structure/storage_structure.cpp b/src/storage/storage_structure/storage_structure.cpp
index e43fbf41c9..59a8f1fb50 100644
--- a/src/storage/storage_structure/storage_structure.cpp
+++ b/src/storage/storage_structure/storage_structure.cpp
@@ -35,7 +35,7 @@ BaseColumnOrList::BaseColumnOrList(const StorageStructureIDAndFName& storageStru
 
 void BaseColumnOrList::readBySequentialCopy(Transaction* transaction, ValueVector* vector,
     PageElementCursor& cursor, const std::function& logicalToPhysicalPageMapper) {
-    uint64_t numValuesToRead = vector->state->originalSize;
+    uint64_t numValuesToRead = vector->state->getOriginalSize();
     uint64_t vectorPos = 0;
     while (vectorPos != numValuesToRead) {
         uint64_t numValuesInPage = numElementsPerPage - cursor.elemPosInPage;
@@ -52,7 +52,7 @@ void
 BaseColumnOrList::readInternalIDsBySequentialCopy(Transaction* transaction, ValueVector* vector,
     PageElementCursor& cursor, const std::function& logicalToPhysicalPageMapper,
     table_id_t commonTableID, bool hasNoNullGuarantee) {
-    uint64_t numValuesToRead = vector->state->originalSize;
+    uint64_t numValuesToRead = vector->state->getOriginalSize();
     uint64_t vectorPos = 0;
     while (vectorPos != numValuesToRead) {
         uint64_t numValuesInPage = numElementsPerPage - cursor.elemPosInPage;
diff --git a/src/storage/store/node_column.cpp b/src/storage/store/node_column.cpp
index 38ce595df3..71f94854d8 100644
--- a/src/storage/store/node_column.cpp
+++ b/src/storage/store/node_column.cpp
@@ -208,7 +208,7 @@ void NodeColumn::scanUnfiltered(Transaction* transaction, PageElementCursor& pag
 
 void NodeColumn::scanFiltered(Transaction* transaction, PageElementCursor& pageCursor,
     ValueVector* nodeIDVector, ValueVector* resultVector) {
-    auto numValuesToScan = nodeIDVector->state->originalSize;
+    auto numValuesToScan = nodeIDVector->state->getOriginalSize();
     auto numValuesScanned = 0u;
     auto posInSelVector = 0u;
     while (numValuesScanned < numValuesToScan) {
diff --git a/src/storage/store/nodes_statistics_and_deleted_ids.cpp b/src/storage/store/nodes_statistics_and_deleted_ids.cpp
index d76c0aeb44..f165499bb0 100644
--- a/src/storage/store/nodes_statistics_and_deleted_ids.cpp
+++ b/src/storage/store/nodes_statistics_and_deleted_ids.cpp
@@ -87,7 +87,7 @@ void NodeStatisticsAndDeletedIDs::setDeletedNodeOffsetsForMorsel(
     auto itr = deletedNodeOffsets.begin();
     sel_t nextDeletedNodeOffset = *itr - morselBeginOffset;
     uint64_t nextSelectedPosition = 0;
-    for (sel_t pos = 0; pos < nodeOffsetVector->state->originalSize; ++pos) {
+    for (sel_t pos = 0; pos < nodeOffsetVector->state->getOriginalSize(); ++pos) {
         if (pos == nextDeletedNodeOffset) {
             itr++;
             if (itr == deletedNodeOffsets.end()) {
@@ -102,7 +102,7 @@
             nodeOffsetVector->state->selVector->selectedPositions[nextSelectedPosition++] = pos;
         }
         nodeOffsetVector->state->selVector->selectedSize =
-            nodeOffsetVector->state->originalSize - deletedNodeOffsets.size();
+            nodeOffsetVector->state->getOriginalSize() - deletedNodeOffsets.size();
     }
 }
diff --git a/src/storage/store/var_list_node_column.cpp b/src/storage/store/var_list_node_column.cpp
index 0c7d8369ad..e96cada95f 100644
--- a/src/storage/store/var_list_node_column.cpp
+++ b/src/storage/store/var_list_node_column.cpp
@@ -57,7 +57,7 @@ void VarListNodeColumn::scanInternal(
         startNodeOffset - StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
     auto listOffsetInfoInStorage = getListOffsetInfoInStorage(transaction, nodeGroupIdx,
         startNodeOffsetInGroup,
-        startNodeOffsetInGroup + nodeIDVector->state->originalSize, resultVector->state);
+        startNodeOffsetInGroup + nodeIDVector->state->getOriginalSize(), resultVector->state);
     if (resultVector->state->selVector->isUnfiltered()) {
         scanUnfiltered(transaction, nodeGroupIdx, resultVector, listOffsetInfoInStorage);
     } else {
diff --git a/test/transaction/CMakeLists.txt b/test/transaction/CMakeLists.txt
index 01d6f14c9c..3c07e48a44 100644
--- a/test/transaction/CMakeLists.txt
+++ b/test/transaction/CMakeLists.txt
@@ -1,2 +1 @@
 add_kuzu_test(transaction_manager_test transaction_manager_test.cpp)
-add_kuzu_test(transaction_test transaction_test.cpp)
diff --git a/test/transaction/transaction_test.cpp b/test/transaction/transaction_test.cpp
deleted file mode 100644
index 8bb434b9e3..0000000000
--- a/test/transaction/transaction_test.cpp
+++ /dev/null
@@ -1,278 +0,0 @@
-#include "common/constants.h"
-#include "graph_test/graph_test.h"
-#include "storage/storage_manager.h"
-#include "storage/store/node_column.h"
-#include "transaction/transaction_manager.h"
-
-using namespace kuzu::common;
-using namespace kuzu::storage;
-using namespace kuzu::testing;
-
-namespace kuzu {
-namespace transaction {
-
-class TransactionTests : public DBTest {
-public:
-    void SetUp() override {
-        DBTest::SetUp();
-        initWithoutLoadingGraph();
-    }
-
-    std::string getInputDir() override {
-        return TestHelper::appendKuzuRootPath("dataset/tinysnb/");
-    }
-
-    void initWithoutLoadingGraph() {
-        systemConfig->bufferPoolSize = (1ull << 22);
-        // Note we do not actually use the connection field in these tests. We only need the
-        // database.
-        createDBAndConn();
-        writeTrx = getTransactionManager(*database)->beginWriteTransaction();
-        readTrx = getTransactionManager(*database)->beginReadOnlyTransaction();
-
-        table_id_t personTableID =
-            getCatalog(*database)->getReadOnlyVersion()->getTableID("person");
-        uint32_t agePropertyID = getCatalog(*database)
-                                     ->getReadOnlyVersion()
-                                     ->getNodeProperty(personTableID, "age")
-                                     ->getPropertyID();
-        uint32_t eyeSightPropertyID = getCatalog(*database)
-                                          ->getReadOnlyVersion()
-                                          ->getNodeProperty(personTableID, "eyeSight")
-                                          ->getPropertyID();
-
-        dataChunk = std::make_shared<DataChunk>(3);
-        nodeVector =
-            std::make_shared<ValueVector>(LogicalTypeID::INTERNAL_ID, getMemoryManager(*database));
-        dataChunk->insert(0, nodeVector);
-        ((nodeID_t*)nodeVector->getData())[0].offset = 0;
-        ((nodeID_t*)nodeVector->getData())[1].offset = 1;
-
-        agePropertyVectorToReadDataInto =
-            std::make_shared<ValueVector>(LogicalTypeID::INT64, getMemoryManager(*database));
-        dataChunk->insert(1, agePropertyVectorToReadDataInto);
-        eyeSightVectorToReadDataInto =
-            std::make_shared<ValueVector>(LogicalTypeID::DOUBLE, getMemoryManager(*database));
-        dataChunk->insert(2, eyeSightVectorToReadDataInto);
-
-        personAgeColumn = getStorageManager(*database)->getNodesStore().getNodePropertyColumn(
-            personTableID, agePropertyID);
-
-        personEyeSightColumn = getStorageManager(*database)->getNodesStore().getNodePropertyColumn(
-            personTableID, eyeSightPropertyID);
-    }
-
-    void readAndAssertAgePropertyNode(
-        uint64_t nodeOffset, Transaction* trx, int64_t expectedValue, bool isNull) {
-        dataChunk->state->currIdx = nodeOffset;
-        dataChunk->state->selVector->resetSelectorToValuePosBuffer();
-        dataChunk->state->selVector->selectedPositions[0] = nodeOffset;
-        dataChunk->state->selVector->selectedSize = 1;
-        personAgeColumn->lookup(trx, nodeVector.get(), agePropertyVectorToReadDataInto.get());
-        if (isNull) {
-            ASSERT_TRUE(agePropertyVectorToReadDataInto->isNull(dataChunk->state->currIdx));
-        } else {
-            ASSERT_FALSE(agePropertyVectorToReadDataInto->isNull(dataChunk->state->currIdx));
-            ASSERT_EQ(expectedValue,
-                agePropertyVectorToReadDataInto->getValue<int64_t>(dataChunk->state->currIdx));
-        }
-    }
-
-    void readAndAssertEyeSightPropertyNode(
-        uint64_t nodeOffset, Transaction* trx, double expectedValue, bool isNull) {
-        dataChunk->state->currIdx = nodeOffset;
-        dataChunk->state->selVector->resetSelectorToValuePosBuffer();
-        dataChunk->state->selVector->selectedPositions[0] = nodeOffset;
-        dataChunk->state->selVector->selectedSize = 1;
-        personEyeSightColumn->lookup(trx, nodeVector.get(), eyeSightVectorToReadDataInto.get());
-        if (isNull) {
-            ASSERT_TRUE(eyeSightVectorToReadDataInto->isNull(dataChunk->state->currIdx));
-        } else {
-            ASSERT_FALSE(eyeSightVectorToReadDataInto->isNull(dataChunk->state->currIdx));
-            ASSERT_EQ(expectedValue,
-                eyeSightVectorToReadDataInto->getValue<double>(dataChunk->state->currIdx));
-        }
-    }
-
-    void writeToAgePropertyNode(uint64_t nodeOffset, int64_t expectedValue, bool isNull) {
-        dataChunk->state->currIdx = nodeOffset;
-        dataChunk->state->selVector->resetSelectorToValuePosBuffer();
-        dataChunk->state->selVector->selectedPositions[0] = nodeOffset;
-        dataChunk->state->selVector->selectedSize = 1;
-        auto propertyVectorToWriteDataTo =
-            std::make_shared<ValueVector>(LogicalTypeID::INT64, getMemoryManager(*database));
-        propertyVectorToWriteDataTo->state = dataChunk->state;
-        if (isNull) {
-            propertyVectorToWriteDataTo->setNull(dataChunk->state->currIdx, true /* is null */);
-        } else {
-            propertyVectorToWriteDataTo->setNull(
-                dataChunk->state->currIdx, false /* is not null */);
-            propertyVectorToWriteDataTo->setValue(
-                dataChunk->state->currIdx, (uint64_t)expectedValue);
-        }
-        personAgeColumn->write(nodeVector.get(), propertyVectorToWriteDataTo.get());
-    }
-
-    void writeToEyeSightPropertyNode(uint64_t nodeOffset, double expectedValue, bool isNull) {
-        dataChunk->state->currIdx = nodeOffset;
-        dataChunk->state->selVector->resetSelectorToValuePosBuffer();
-        dataChunk->state->selVector->selectedPositions[0] = nodeOffset;
-        dataChunk->state->selVector->selectedSize = 1;
-        auto propertyVectorToWriteDataTo =
-            std::make_shared<ValueVector>(LogicalTypeID::DOUBLE, getMemoryManager(*database));
-        propertyVectorToWriteDataTo->state = dataChunk->state;
-        if (isNull) {
-            propertyVectorToWriteDataTo->setNull(dataChunk->state->currIdx, true /* is null */);
-        } else {
-            propertyVectorToWriteDataTo->setNull(
-                dataChunk->state->currIdx, false /* is not null */);
-            propertyVectorToWriteDataTo->setValue(
-                dataChunk->state->currIdx, (double_t)expectedValue);
-        }
-        personEyeSightColumn->write(nodeVector.get(), propertyVectorToWriteDataTo.get());
-    }
-
-    void assertOriginalAgeAndEyeSightPropertiesForNodes0And1(Transaction* transaction) {
-        readAndAssertAgePropertyNode(0 /* node offset */, transaction, 35, false /* is not null */);
-        readAndAssertAgePropertyNode(1 /* node offset */, transaction, 30, false /* is not null */);
-        readAndAssertEyeSightPropertyNode(
-            0 /* node offset */, transaction, 5.0, false /* is not null */);
-        readAndAssertEyeSightPropertyNode(
-            1 /* node offset */, transaction, 5.1, false /* is not null */);
-    }
-
-    void updateAgeAndEyeSightPropertiesForNodes0And1() {
-        // Change the age value of node 0 to 70 and 1 null;
-        writeToAgePropertyNode(0, 70, false /* is not null */);
-        writeToAgePropertyNode(1, 124124 /* this argument is ignored */, true /* is null */);
-
-        // Change the value of eyeSight property for node 0 to 0.4 and 1 to null;
-        writeToEyeSightPropertyNode(0 /* node offset */, 0.4, false /* is not null */);
-        writeToEyeSightPropertyNode(1 /* node offset */, 3.4, true /* is null */);
-    }
-
-    void assertUpdatedAgeAndEyeSightPropertiesForNodes0And1(Transaction* transaction) {
-        readAndAssertAgePropertyNode(0 /* node offset */, transaction, 70, false /* is not null */);
-        readAndAssertAgePropertyNode(1 /* node offset */, transaction,
-            1232532 /* this argument is ignored */, true /* is null */);
-        readAndAssertEyeSightPropertyNode(
-            0 /* node offset */, transaction, 0.4, false /* is not null */);
-        readAndAssertEyeSightPropertyNode(
-            1 /* node offset */, transaction, -423.12 /* this is ignored */, true /* is null */);
-    }
-
-    void assertReadBehaviorForBeforeRollbackAndCommitForConcurrent1Write1ReadTransactionTest() {
-        assertOriginalAgeAndEyeSightPropertiesForNodes0And1(writeTrx.get());
-        assertOriginalAgeAndEyeSightPropertiesForNodes0And1(readTrx.get());
-
-        updateAgeAndEyeSightPropertiesForNodes0And1();
-
-        assertUpdatedAgeAndEyeSightPropertiesForNodes0And1(writeTrx.get());
-        assertOriginalAgeAndEyeSightPropertiesForNodes0And1(readTrx.get());
-    }
-
-    void testRecovery(bool committingTransaction) {
-        assertReadBehaviorForBeforeRollbackAndCommitForConcurrent1Write1ReadTransactionTest();
-        getTransactionManager(*database)->commit(readTrx.get());
-        commitAndCheckpointOrRollback(
-            *database, writeTrx.get(), committingTransaction, true /* skip checkpointing */);
-        readTrx = getTransactionManager(*database)->beginReadOnlyTransaction();
-        // We next destroy and reconstruct the database to start up the system.
-        // initWithoutLoadingGraph will construct a completely new database, writeTrx, readTrx,
-        // and personAgeColumn, and personEyeSightColumn classes. We could use completely
-        // different variables here but we would need to duplicate this construction code.
-        initWithoutLoadingGraph();
-        if (committingTransaction) {
-            assertUpdatedAgeAndEyeSightPropertiesForNodes0And1(writeTrx.get());
-            assertUpdatedAgeAndEyeSightPropertiesForNodes0And1(readTrx.get());
-        } else {
-            assertOriginalAgeAndEyeSightPropertiesForNodes0And1(writeTrx.get());
-            assertOriginalAgeAndEyeSightPropertiesForNodes0And1(readTrx.get());
-        }
-    }
-
-public:
-    std::unique_ptr<Transaction> writeTrx;
-    std::unique_ptr<Transaction> readTrx;
-    std::shared_ptr<DataChunk> dataChunk;
-    std::shared_ptr<ValueVector> nodeVector;
-    std::shared_ptr<ValueVector> agePropertyVectorToReadDataInto;
-    std::shared_ptr<ValueVector> eyeSightVectorToReadDataInto;
-    NodeColumn* personAgeColumn;
-    NodeColumn* personEyeSightColumn;
-};
-
-TEST_F(TransactionTests, Concurrent1Write1ReadTransactionInTheMiddleOfTransaction) {
-    assertReadBehaviorForBeforeRollbackAndCommitForConcurrent1Write1ReadTransactionTest();
-}
-
-TEST_F(TransactionTests, Concurrent1Write1ReadTransactionCommitAndCheckpoint) {
-    assertReadBehaviorForBeforeRollbackAndCommitForConcurrent1Write1ReadTransactionTest();
-
-    // We need to commit the read transaction because commitAndCheckpointOrRollback requires all
-    // read transactions to leave the system.
-    getTransactionManager(*database)->commit(readTrx.get());
-    commitAndCheckpointOrRollback(*database, writeTrx.get(), true /* isCommit */);
-
-    // At this point, the write transaction no longer is valid and is not registered in the
-    // TransactionManager, so in normal operation, we would need to create new transactions.
-    // Although this is not needed, we still do it. This is not needed because this test directly
-    // accesses the columns with a transaction and the storage_structures assume that the given
-    // transaction is active.
-    writeTrx = getTransactionManager(*database)->beginWriteTransaction();
-    readTrx = getTransactionManager(*database)->beginReadOnlyTransaction();
-
-    assertUpdatedAgeAndEyeSightPropertiesForNodes0And1(writeTrx.get());
-    assertUpdatedAgeAndEyeSightPropertiesForNodes0And1(readTrx.get());
-}
-
-// TODO(Semih): Later when we make these tests go through connection.cpp or write new tests that
-// test the same functionality but in an end-to-end manner, check that the WAL is cleared and the
-// active write transaction is removed from the transaction manager etc (e.g., that you can start a
-// new transaction).
-
-TEST_F(TransactionTests, OpenReadOnlyTransactionTriggersTimeoutErrorForWriteTransaction) {
-    // Note that TransactionTests starts 1 read and 1 write transaction by default.
-    getTransactionManager(*database)->setCheckPointWaitTimeoutForTransactionsToLeaveInMicros(
-        10000 /* 10ms */);
-    updateAgeAndEyeSightPropertiesForNodes0And1();
-    try {
-        commitAndCheckpointOrRollback(*database, writeTrx.get(), true /* isCommit */);
-        FAIL();
-    } catch (TransactionManagerException e) {
-    } catch (Exception& e) { FAIL(); }
-    assertOriginalAgeAndEyeSightPropertiesForNodes0And1(readTrx.get());
-}
-
-TEST_F(TransactionTests, Concurrent1Write1ReadTransactionRollback) {
-    assertReadBehaviorForBeforeRollbackAndCommitForConcurrent1Write1ReadTransactionTest();
-
-    commitAndCheckpointOrRollback(*database, writeTrx.get(), false /* rollback */);
-
-    // See the comment inside Concurrent1Write1ReadTransactionCommitAndCheckpoint for why we create
-    // these new transactions
-    writeTrx = getTransactionManager(*database)->beginWriteTransaction();
-    readTrx = getTransactionManager(*database)->beginReadOnlyTransaction();
-
-    assertOriginalAgeAndEyeSightPropertiesForNodes0And1(writeTrx.get());
-    assertOriginalAgeAndEyeSightPropertiesForNodes0And1(readTrx.get());
-}
-
-TEST_F(TransactionTests, RecoverFromCommittedTransaction) {
-    testRecovery(true /* commit the transaction */);
-}
-
-TEST_F(TransactionTests, RecoverFromUncommittedTransaction) {
-    testRecovery(false /* do not commit the transaction */);
-}
-
-TEST_F(TransactionTests, ExecuteWriteQueryInReadOnlyTrx) {
-    conn->beginReadOnlyTransaction();
-    auto result = conn->query("CREATE (p:person {ID: 20})");
-    ASSERT_EQ(
-        result->getErrorMessage(), "Can't execute a write query inside a read-only transaction.");
-}
-
-} // namespace transaction
-} // namespace kuzu
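
Not part of the patch: a minimal usage sketch of the API this change introduces, assuming only the declarations added above in data_chunk_state.h and flatten.h. A DataChunkState is now explicitly FLAT or UNFLAT (the old currIdx == -1 convention is gone), getNumSelectedValues() always returns the selection size, and the position being iterated during flattening lives in the operator-local FlattenLocalState rather than in the shared state. The loop driver below is illustrative only, not code from the repository.

// Illustrative sketch; mirrors the new DataChunkState / FlattenLocalState API.
auto state = std::make_shared<kuzu::common::DataChunkState>();
state->initOriginalAndSelectedSize(4); // sets both originalSize and selectedSize
assert(!state->isFlat());              // states start out UNFLAT
assert(state->getNumSelectedValues() == 4);

// A Flatten-style consumer tracks its own position instead of state->currIdx.
kuzu::processor::FlattenLocalState local;
local.sizeToFlatten = state->selVector->selectedSize;
state->setToFlat();
while (local.currentIdx < local.sizeToFlatten) {
    auto pos = state->selVector->selectedPositions[local.currentIdx++];
    // ... emit the single value at `pos` downstream ...
}
state->setToUnflat();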