From b77cd1d6f64e80dcc3603e363ab62095a1e10905 Mon Sep 17 00:00:00 2001 From: xiyang Date: Thu, 4 Apr 2024 13:55:45 -0400 Subject: [PATCH] Add distinct aggregate over node and relationships --- .../bind_function_expression.cpp | 5 - src/common/vector/value_vector.cpp | 2 +- src/include/binder/expression/expression.h | 26 ++- src/include/function/aggregate_function.h | 26 ++- .../planner/operator/logical_aggregate.h | 57 +++--- .../planner/operator/logical_operator.h | 6 +- .../operator/aggregate/aggregate_hash_table.h | 45 ++--- .../operator/aggregate/aggregate_input.h | 30 ++-- .../operator/aggregate/base_aggregate.h | 14 +- .../operator/aggregate/hash_aggregate.h | 22 +-- .../operator/aggregate/simple_aggregate.h | 8 +- src/include/processor/plan_mapper.h | 25 ++- .../processor/result/base_hash_table.h | 2 +- .../processor/result/mark_hash_table.h | 9 +- .../agg_key_dependency_optimizer.cpp | 6 +- src/planner/operator/logical_aggregate.cpp | 22 +-- src/processor/map/map_accumulate.cpp | 2 +- src/processor/map/map_aggregate.cpp | 164 ++++++++++-------- src/processor/map/map_distinct.cpp | 19 +- src/processor/map/map_mark_accumulate.cpp | 21 +-- src/processor/map/plan_mapper.cpp | 4 +- .../aggregate/aggregate_hash_table.cpp | 148 ++++++++++------ .../operator/aggregate/base_aggregate.cpp | 26 +-- .../operator/aggregate/hash_aggregate.cpp | 21 ++- .../operator/aggregate/simple_aggregate.cpp | 26 ++- src/processor/result/mark_hash_table.cpp | 10 +- .../ldbc-interactive/interactive-complex.test | 19 +- test/test_files/tinysnb/agg/distinct_agg.test | 55 ++++++ 28 files changed, 470 insertions(+), 350 deletions(-) diff --git a/src/binder/bind_expression/bind_function_expression.cpp b/src/binder/bind_expression/bind_function_expression.cpp index 3b63cb759de..bb26efa1d21 100644 --- a/src/binder/bind_expression/bind_function_expression.cpp +++ b/src/binder/bind_expression/bind_function_expression.cpp @@ -129,11 +129,6 @@ std::shared_ptr ExpressionBinder::bindAggregateFunctionExpression( expression_vector children; for (auto i = 0u; i < parsedExpression.getNumChildren(); ++i) { auto child = bindExpression(*parsedExpression.getChild(i)); - auto childTypeID = child->dataType.getLogicalTypeID(); - if (isDistinct && - (childTypeID == LogicalTypeID::NODE || childTypeID == LogicalTypeID::REL)) { - throw BinderException{"DISTINCT is not supported for NODE or REL type."}; - } childrenTypes.push_back(child->dataType); children.push_back(std::move(child)); } diff --git a/src/common/vector/value_vector.cpp b/src/common/vector/value_vector.cpp index 78491f972e1..06ebea14237 100644 --- a/src/common/vector/value_vector.cpp +++ b/src/common/vector/value_vector.cpp @@ -14,7 +14,7 @@ ValueVector::ValueVector(LogicalType dataType, storage::MemoryManager* memoryMan : dataType{std::move(dataType)} { if (this->dataType.getLogicalTypeID() == LogicalTypeID::ANY) { // LCOV_EXCL_START - // Alternatively we can assign + // Alternatively we can assign a default type here but I don't think it's a good practice. throw RuntimeException("Trying to a create a vector with ANY type. This should not happen. " "Data type is expected to be resolved during binding."); // LCOV_EXCL_STOP diff --git a/src/include/binder/expression/expression.h b/src/include/binder/expression/expression.h index a8fc4a27111..268205d30b7 100644 --- a/src/include/binder/expression/expression.h +++ b/src/include/binder/expression/expression.h @@ -6,6 +6,7 @@ #include #include "common/assert.h" +#include "common/cast.h" #include "common/copy_constructors.h" #include "common/enums/expression_type.h" #include "common/exception/internal.h" @@ -65,21 +66,19 @@ class Expression : public std::enable_shared_from_this { common::LogicalType getDataType() const { return dataType; } common::LogicalType& getDataTypeReference() { return dataType; } - inline bool hasAlias() const { return !alias.empty(); } - inline std::string getAlias() const { return alias; } + bool hasAlias() const { return !alias.empty(); } + std::string getAlias() const { return alias; } - inline uint32_t getNumChildren() const { return children.size(); } - inline std::shared_ptr getChild(common::vector_idx_t idx) const { - return children[idx]; - } - inline expression_vector getChildren() const { return children; } - inline void setChild(common::vector_idx_t idx, std::shared_ptr child) { + uint32_t getNumChildren() const { return children.size(); } + std::shared_ptr getChild(common::vector_idx_t idx) const { return children[idx]; } + expression_vector getChildren() const { return children; } + void setChild(common::vector_idx_t idx, std::shared_ptr child) { children[idx] = std::move(child); } expression_vector splitOnAND(); - inline bool operator==(const Expression& rhs) const { return uniqueName == rhs.uniqueName; } + bool operator==(const Expression& rhs) const { return uniqueName == rhs.uniqueName; } std::string toString() const { return hasAlias() ? alias : toStringInternal(); } @@ -87,6 +86,15 @@ class Expression : public std::enable_shared_from_this { throw common::InternalException("Unimplemented expression copy()."); } + template + const TARGET* safePtrCast() const { + return common::ku_dynamic_cast(this); + } + template + TARGET* unsafePtrCast() { + return common::ku_dynamic_cast(this); + } + protected: virtual std::string toStringInternal() const = 0; diff --git a/src/include/function/aggregate_function.h b/src/include/function/aggregate_function.h index 864afbee2c6..c6a71fc358e 100644 --- a/src/include/function/aggregate_function.h +++ b/src/include/function/aggregate_function.h @@ -10,7 +10,7 @@ namespace kuzu { namespace function { struct AggregateState { - virtual inline uint32_t getStateSize() const = 0; + virtual uint32_t getStateSize() const = 0; virtual void moveResultToVector(common::ValueVector* outputVector, uint64_t pos) = 0; virtual ~AggregateState() = default; @@ -52,45 +52,41 @@ struct AggregateFunction final : public BaseScalarFunction { std::move(combineFunc), std::move(finalizeFunc), isDistinct, nullptr /* bindFunc */, std::move(paramRewriteFunc)} {} - inline uint32_t getAggregateStateSize() const { - return initialNullAggregateState->getStateSize(); - } + uint32_t getAggregateStateSize() const { return initialNullAggregateState->getStateSize(); } // NOLINTNEXTLINE(readability-make-member-function-const): Returns a non-const pointer. - inline AggregateState* getInitialNullAggregateState() { - return initialNullAggregateState.get(); - } + AggregateState* getInitialNullAggregateState() { return initialNullAggregateState.get(); } - inline std::unique_ptr createInitialNullAggregateState() const { + std::unique_ptr createInitialNullAggregateState() const { return initializeFunc(); } - inline void updateAllState(uint8_t* state, common::ValueVector* input, uint64_t multiplicity, + void updateAllState(uint8_t* state, common::ValueVector* input, uint64_t multiplicity, storage::MemoryManager* memoryManager) const { return updateAllFunc(state, input, multiplicity, memoryManager); } - inline void updatePosState(uint8_t* state, common::ValueVector* input, uint64_t multiplicity, + void updatePosState(uint8_t* state, common::ValueVector* input, uint64_t multiplicity, uint32_t pos, storage::MemoryManager* memoryManager) const { return updatePosFunc(state, input, multiplicity, pos, memoryManager); } - inline void combineState(uint8_t* state, uint8_t* otherState, + void combineState(uint8_t* state, uint8_t* otherState, storage::MemoryManager* memoryManager) const { return combineFunc(state, otherState, memoryManager); } - inline void finalizeState(uint8_t* state) const { return finalizeFunc(state); } + void finalizeState(uint8_t* state) const { return finalizeFunc(state); } - inline bool isFunctionDistinct() const { return isDistinct; } + bool isFunctionDistinct() const { return isDistinct; } - inline std::unique_ptr copy() const override { + std::unique_ptr copy() const override { return std::make_unique(name, parameterTypeIDs, returnTypeID, initializeFunc, updateAllFunc, updatePosFunc, combineFunc, finalizeFunc, isDistinct, bindFunc, paramRewriteFunc); } - inline std::unique_ptr clone() const { + std::unique_ptr clone() const { return std::make_unique(name, parameterTypeIDs, returnTypeID, initializeFunc, updateAllFunc, updatePosFunc, combineFunc, finalizeFunc, isDistinct, bindFunc, paramRewriteFunc); diff --git a/src/include/planner/operator/logical_aggregate.h b/src/include/planner/operator/logical_aggregate.h index a5e4e700c2f..50a4c3a980d 100644 --- a/src/include/planner/operator/logical_aggregate.h +++ b/src/include/planner/operator/logical_aggregate.h @@ -7,18 +7,14 @@ namespace planner { class LogicalAggregate : public LogicalOperator { public: - LogicalAggregate(binder::expression_vector keyExpressions, - binder::expression_vector aggregateExpressions, std::shared_ptr child) - : LogicalOperator{LogicalOperatorType::AGGREGATE, std::move(child)}, - keyExpressions{std::move(keyExpressions)}, - aggregateExpressions{std::move(aggregateExpressions)} {} - LogicalAggregate(binder::expression_vector keyExpressions, - binder::expression_vector dependentKeyExpressions, - binder::expression_vector aggregateExpressions, std::shared_ptr child) - : LogicalOperator{LogicalOperatorType::AGGREGATE, std::move(child)}, - keyExpressions{std::move(keyExpressions)}, - dependentKeyExpressions{std::move(dependentKeyExpressions)}, - aggregateExpressions{std::move(aggregateExpressions)} {} + LogicalAggregate(binder::expression_vector keys, binder::expression_vector aggregates, + std::shared_ptr child) + : LogicalOperator{LogicalOperatorType::AGGREGATE, std::move(child)}, keys{std::move(keys)}, + aggregates{std::move(aggregates)} {} + LogicalAggregate(binder::expression_vector keys, binder::expression_vector dependentKeys, + binder::expression_vector aggregates, std::shared_ptr child) + : LogicalOperator{LogicalOperatorType::AGGREGATE, std::move(child)}, keys{std::move(keys)}, + dependentKeys{std::move(dependentKeys)}, aggregates{std::move(aggregates)} {} void computeFactorizedSchema() override; void computeFlatSchema() override; @@ -28,30 +24,23 @@ class LogicalAggregate : public LogicalOperator { std::string getExpressionsForPrinting() const override; - inline bool hasKeyExpressions() const { return !keyExpressions.empty(); } - inline binder::expression_vector getKeyExpressions() const { return keyExpressions; } - inline void setKeyExpressions(binder::expression_vector expressions) { - keyExpressions = std::move(expressions); + bool hasKeys() const { return !keys.empty(); } + binder::expression_vector getKeys() const { return keys; } + void setKeys(binder::expression_vector expressions) { keys = std::move(expressions); } + binder::expression_vector getDependentKeys() const { return dependentKeys; } + void setDependentKeys(binder::expression_vector expressions) { + dependentKeys = std::move(expressions); } - inline binder::expression_vector getDependentKeyExpressions() const { - return dependentKeyExpressions; - } - inline void setDependentKeyExpressions(binder::expression_vector expressions) { - dependentKeyExpressions = std::move(expressions); - } - inline binder::expression_vector getAllKeyExpressions() const { + binder::expression_vector getAllKeys() const { binder::expression_vector result; - result.insert(result.end(), keyExpressions.begin(), keyExpressions.end()); - result.insert(result.end(), dependentKeyExpressions.begin(), dependentKeyExpressions.end()); + result.insert(result.end(), keys.begin(), keys.end()); + result.insert(result.end(), dependentKeys.begin(), dependentKeys.end()); return result; } - inline binder::expression_vector getAggregateExpressions() const { - return aggregateExpressions; - } + binder::expression_vector getAggregates() const { return aggregates; } - inline std::unique_ptr copy() override { - return make_unique(keyExpressions, dependentKeyExpressions, - aggregateExpressions, children[0]->copy()); + std::unique_ptr copy() override { + return make_unique(keys, dependentKeys, aggregates, children[0]->copy()); } private: @@ -59,11 +48,11 @@ class LogicalAggregate : public LogicalOperator { void insertAllExpressionsToGroupAndScope(f_group_pos groupPos); private: - binder::expression_vector keyExpressions; + binder::expression_vector keys; // A dependentKeyExpression depend on a keyExpression (e.g. a.age depends on a.ID) and will not // be treated as a hash key during hash aggregation. - binder::expression_vector dependentKeyExpressions; - binder::expression_vector aggregateExpressions; + binder::expression_vector dependentKeys; + binder::expression_vector aggregates; }; } // namespace planner diff --git a/src/include/planner/operator/logical_operator.h b/src/include/planner/operator/logical_operator.h index fc98ac5d55d..cd9de618a85 100644 --- a/src/include/planner/operator/logical_operator.h +++ b/src/include/planner/operator/logical_operator.h @@ -105,7 +105,11 @@ class LogicalOperator { static logical_op_vector_t copy(const logical_op_vector_t& ops); template - TARGET* ptrCast() { + const TARGET* safePtrCast() const { + return common::ku_dynamic_cast(this); + } + template + TARGET* unsafePtrCast() { return common::ku_dynamic_cast(this); } diff --git a/src/include/processor/operator/aggregate/aggregate_hash_table.h b/src/include/processor/operator/aggregate/aggregate_hash_table.h index e918621d47e..49f15550c99 100644 --- a/src/include/processor/operator/aggregate/aggregate_hash_table.h +++ b/src/include/processor/operator/aggregate/aggregate_hash_table.h @@ -40,19 +40,20 @@ using update_agg_function_t = class AggregateHashTable : public BaseHashTable { public: - // Used by distinct aggregate hash table only. AggregateHashTable(storage::MemoryManager& memoryManager, - const common::logical_type_vec_t& keysDataTypes, - const std::vector>& aggregateFunctions, - uint64_t numEntriesToAllocate, std::unique_ptr tableSchema) - : AggregateHashTable(memoryManager, keysDataTypes, std::vector(), - aggregateFunctions, numEntriesToAllocate, std::move(tableSchema)) {} + const std::vector& keyTypes, + const std::vector& payloadTypes, uint64_t numEntriesToAllocate, + std::unique_ptr tableSchema) + : AggregateHashTable(memoryManager, keyTypes, payloadTypes, + std::vector>{} /* empty aggregates */, + std::vector{} /* empty distinct agg key*/, numEntriesToAllocate, + std::move(tableSchema)) {} AggregateHashTable(storage::MemoryManager& memoryManager, - std::vector keysDataTypes, - std::vector payloadsDataTypes, + std::vector keyTypes, std::vector payloadTypes, const std::vector>& aggregateFunctions, - uint64_t numEntriesToAllocate, std::unique_ptr tableSchema); + const std::vector& distinctAggKeyTypes, uint64_t numEntriesToAllocate, + std::unique_ptr tableSchema); uint8_t* getEntry(uint64_t idx) { return factorizedTable->getTuple(idx); } @@ -62,8 +63,7 @@ class AggregateHashTable : public BaseHashTable { void append(const std::vector& flatKeyVectors, const std::vector& unFlatKeyVectors, - common::DataChunkState* leadingState, - const std::vector>& aggregateInputs, + common::DataChunkState* leadingState, const std::vector& aggregateInputs, uint64_t resultSetMultiplicity) { append(flatKeyVectors, unFlatKeyVectors, std::vector(), leadingState, aggregateInputs, resultSetMultiplicity); @@ -73,8 +73,7 @@ class AggregateHashTable : public BaseHashTable { void append(const std::vector& flatKeyVectors, const std::vector& unFlatKeyVectors, const std::vector& dependentKeyVectors, - common::DataChunkState* leadingState, - const std::vector>& aggregateInputs, + common::DataChunkState* leadingState, const std::vector& aggregateInputs, uint64_t resultSetMultiplicity); bool isAggregateValueDistinctForGroupByKeys( @@ -152,8 +151,7 @@ class AggregateHashTable : public BaseHashTable { void updateAggStates(const std::vector& flatKeyVectors, const std::vector& unFlatKeyVectors, - const std::vector>& aggregateInputs, - uint64_t resultSetMultiplicity); + const std::vector& aggregateInputs, uint64_t resultSetMultiplicity); // ! This function will only be used by distinct aggregate, which assumes that all keyVectors // are flat. @@ -217,7 +215,7 @@ class AggregateHashTable : public BaseHashTable { std::unique_ptr hashSlotsToUpdateAggState; private: - std::vector dependentKeyDataTypes; + std::vector payloadTypes; std::vector> aggregateFunctions; //! special handling of distinct aggregate @@ -233,13 +231,18 @@ class AggregateHashTable : public BaseHashTable { std::unique_ptr tmpSlotIdxes; }; -class AggregateHashTableUtils { +struct AggregateHashTableUtils { -public: - static std::vector> createDistinctHashTables( + static std::unique_ptr createDistinctHashTable( storage::MemoryManager& memoryManager, - const std::vector& groupByKeyDataTypes, - const std::vector>& aggregateFunctions); + const std::vector& groupByKeyTypes, + const common::LogicalType& distinctKeyType); + + // static std::vector> createDistinctHashTables( + // storage::MemoryManager& memoryManager, + // const std::vector& groupByKeyTypes, + // const std::vector& distinctKeyTypes, + // const std::vector>& aggregateFunctions); }; } // namespace processor diff --git a/src/include/processor/operator/aggregate/aggregate_input.h b/src/include/processor/operator/aggregate/aggregate_input.h index 3392c3016e8..9d3a54a6a09 100644 --- a/src/include/processor/operator/aggregate/aggregate_input.h +++ b/src/include/processor/operator/aggregate/aggregate_input.h @@ -6,23 +6,33 @@ namespace kuzu { namespace processor { -struct AggregateInputInfo { - DataPos aggregateVectorPos; +struct AggregateInfo { + DataPos aggVectorPos; std::vector multiplicityChunksPos; + common::LogicalType distinctAggKeyType; - AggregateInputInfo(const DataPos& vectorPos, - std::vector multiplicityChunksPos) - : aggregateVectorPos{vectorPos}, multiplicityChunksPos{std::move(multiplicityChunksPos)} {} - AggregateInputInfo(const AggregateInputInfo& other) - : AggregateInputInfo(other.aggregateVectorPos, other.multiplicityChunksPos) {} - inline std::unique_ptr copy() { - return std::make_unique(*this); - } + AggregateInfo(const DataPos& aggVectorPos, std::vector multiplicityChunksPos, + common::LogicalType distinctAggKeyType) + : aggVectorPos{aggVectorPos}, multiplicityChunksPos{std::move(multiplicityChunksPos)}, + distinctAggKeyType{std::move(distinctAggKeyType)} {} + EXPLICIT_COPY_DEFAULT_MOVE(AggregateInfo); + +private: + AggregateInfo(const AggregateInfo& other) + : aggVectorPos{other.aggVectorPos}, multiplicityChunksPos{other.multiplicityChunksPos}, + distinctAggKeyType{other.distinctAggKeyType} {} }; struct AggregateInput { common::ValueVector* aggregateVector; std::vector multiplicityChunks; + + AggregateInput() = default; + EXPLICIT_COPY_DEFAULT_MOVE(AggregateInput); + +private: + AggregateInput(const AggregateInput& other) + : aggregateVector{other.aggregateVector}, multiplicityChunks{other.multiplicityChunks} {} }; } // namespace processor diff --git a/src/include/processor/operator/aggregate/base_aggregate.h b/src/include/processor/operator/aggregate/base_aggregate.h index 8a1d693ec02..ca3d76740eb 100644 --- a/src/include/processor/operator/aggregate/base_aggregate.h +++ b/src/include/processor/operator/aggregate/base_aggregate.h @@ -26,21 +26,19 @@ class BaseAggregate : public Sink { protected: BaseAggregate(std::unique_ptr resultSetDescriptor, std::vector> aggregateFunctions, - std::vector> aggregateInputInfos, - std::unique_ptr child, uint32_t id, const std::string& paramsString) + std::vector aggInfos, std::unique_ptr child, uint32_t id, + const std::string& paramsString) : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::AGGREGATE, std::move(child), id, paramsString}, - aggregateFunctions{std::move(aggregateFunctions)}, - aggregateInputInfos{std::move(aggregateInputInfos)} {} + aggregateFunctions{std::move(aggregateFunctions)}, aggInfos{std::move(aggInfos)} {} void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; - inline bool canParallel() const final { return !containDistinctAggregate(); } + bool canParallel() const final { return !containDistinctAggregate(); } void finalize(ExecutionContext* context) override = 0; std::vector> cloneAggFunctions(); - std::vector> cloneAggInputInfos(); std::unique_ptr clone() override = 0; private: @@ -48,8 +46,8 @@ class BaseAggregate : public Sink { protected: std::vector> aggregateFunctions; - std::vector> aggregateInputInfos; - std::vector> aggregateInputs; + std::vector aggInfos; + std::vector aggInputs; }; } // namespace processor diff --git a/src/include/processor/operator/aggregate/hash_aggregate.h b/src/include/processor/operator/aggregate/hash_aggregate.h index 0f224db67da..d061d136490 100644 --- a/src/include/processor/operator/aggregate/hash_aggregate.h +++ b/src/include/processor/operator/aggregate/hash_aggregate.h @@ -54,21 +54,21 @@ struct HashAggregateLocalState { std::unique_ptr aggregateHashTable; void init(ResultSet& resultSet, main::ClientContext* context, HashAggregateInfo& info, - std::vector>& aggregateFunctions); - void append(std::vector>& aggregateInputs, - uint64_t multiplicity) const; + std::vector>& aggregateFunctions, + std::vector types); + void append(const std::vector& aggregateInputs, uint64_t multiplicity) const; }; class HashAggregate : public BaseAggregate { public: HashAggregate(std::unique_ptr resultSetDescriptor, - std::shared_ptr sharedState, HashAggregateInfo aggregateInfo, + std::shared_ptr sharedState, HashAggregateInfo hashInfo, std::vector> aggregateFunctions, - std::vector> aggregateInputInfos, - std::unique_ptr child, uint32_t id, const std::string& paramsString) + std::vector aggInfos, std::unique_ptr child, uint32_t id, + const std::string& paramsString) : BaseAggregate{std::move(resultSetDescriptor), std::move(aggregateFunctions), - std::move(aggregateInputInfos), std::move(child), id, paramsString}, - aggregateInfo{std::move(aggregateInfo)}, sharedState{std::move(sharedState)} {} + std::move(aggInfos), std::move(child), id, paramsString}, + hashInfo{std::move(hashInfo)}, sharedState{std::move(sharedState)} {} void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; @@ -77,12 +77,12 @@ class HashAggregate : public BaseAggregate { void finalize(ExecutionContext* context) override; std::unique_ptr clone() override { - return make_unique(resultSetDescriptor->copy(), sharedState, aggregateInfo, - cloneAggFunctions(), cloneAggInputInfos(), children[0]->clone(), id, paramsString); + return make_unique(resultSetDescriptor->copy(), sharedState, hashInfo, + cloneAggFunctions(), copyVector(aggInfos), children[0]->clone(), id, paramsString); } private: - HashAggregateInfo aggregateInfo; + HashAggregateInfo hashInfo; HashAggregateLocalState localState; std::shared_ptr sharedState; }; diff --git a/src/include/processor/operator/aggregate/simple_aggregate.h b/src/include/processor/operator/aggregate/simple_aggregate.h index 83c27efb898..0d5b2e62bcc 100644 --- a/src/include/processor/operator/aggregate/simple_aggregate.h +++ b/src/include/processor/operator/aggregate/simple_aggregate.h @@ -33,10 +33,10 @@ class SimpleAggregate : public BaseAggregate { SimpleAggregate(std::unique_ptr resultSetDescriptor, std::shared_ptr sharedState, std::vector> aggregateFunctions, - std::vector> aggregateInputInfos, - std::unique_ptr child, uint32_t id, const std::string& paramsString) + std::vector aggInfos, std::unique_ptr child, uint32_t id, + const std::string& paramsString) : BaseAggregate{std::move(resultSetDescriptor), std::move(aggregateFunctions), - std::move(aggregateInputInfos), std::move(child), id, paramsString}, + std::move(aggInfos), std::move(child), id, paramsString}, sharedState{std::move(sharedState)} {} void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override; @@ -49,7 +49,7 @@ class SimpleAggregate : public BaseAggregate { inline std::unique_ptr clone() override { return make_unique(resultSetDescriptor->copy(), sharedState, - cloneAggFunctions(), cloneAggInputInfos(), children[0]->clone(), id, paramsString); + cloneAggFunctions(), copyVector(aggInfos), children[0]->clone(), id, paramsString); } private: diff --git a/src/include/processor/plan_mapper.h b/src/include/processor/plan_mapper.h index 9e9ca5b5e78..5afc3261a28 100644 --- a/src/include/processor/plan_mapper.h +++ b/src/include/processor/plan_mapper.h @@ -21,7 +21,7 @@ class LogicalCopyFrom; namespace processor { class HashJoinBuildInfo; -struct AggregateInputInfo; +struct AggregateInfo; class NodeInsertExecutor; class RelInsertExecutor; class NodeSetExecutor; @@ -149,13 +149,21 @@ class PlanMapper { std::unique_ptr createHashBuildInfo(const planner::Schema& buildSideSchema, const binder::expression_vector& keys, const binder::expression_vector& payloads); + + std::unique_ptr createDistinctHashAggregate( + const binder::expression_vector& keys, const binder::expression_vector& payloads, + planner::Schema* inSchema, planner::Schema* outSchema, + std::unique_ptr prevOperator, const std::string& paramsString); + std::unique_ptr createMarkDistinctHashAggregate( + const binder::expression_vector& keys, const binder::expression_vector& payloads, + std::shared_ptr mark, planner::Schema* inSchema, + planner::Schema* outSchema, std::unique_ptr prevOperator, + const std::string& paramsString); std::unique_ptr createHashAggregate(const binder::expression_vector& keys, - const binder::expression_vector& payloads, - std::vector> aggregateFunctions, - std::vector> aggregateInputInfos, - std::vector aggregatesOutputPos, planner::Schema* inSchema, + const binder::expression_vector& payloads, const binder::expression_vector& aggregates, + std::shared_ptr mark, planner::Schema* inSchema, planner::Schema* outSchema, std::unique_ptr prevOperator, - const std::string& paramsString, std::shared_ptr markExpression); + const std::string& paramsString); std::unique_ptr getNodeInsertExecutor( const planner::LogicalInsertInfo* info, const planner::Schema& inSchema, @@ -173,11 +181,10 @@ class PlanMapper { static void mapSIPJoin(PhysicalOperator* probe); - static std::vector getExpressionsDataPos(const binder::expression_vector& expressions, + static std::vector getDataPos(const binder::expression_vector& expressions, const planner::Schema& schema); - static inline DataPos getDataPos(const binder::Expression& expression, - const planner::Schema& schema) { + static DataPos getDataPos(const binder::Expression& expression, const planner::Schema& schema) { return DataPos(schema.getExpressionPos(expression)); } diff --git a/src/include/processor/result/base_hash_table.h b/src/include/processor/result/base_hash_table.h index 36fa78e0e10..ce58eeb417b 100644 --- a/src/include/processor/result/base_hash_table.h +++ b/src/include/processor/result/base_hash_table.h @@ -38,7 +38,7 @@ class BaseHashTable { storage::MemoryManager& memoryManager; std::unique_ptr factorizedTable; std::vector compareEntryFuncs; - common::logical_type_vec_t keyTypes; + std::vector keyTypes; // Temporary arrays to hold intermediate results for appending. std::shared_ptr hashState; std::unique_ptr hashVector; diff --git a/src/include/processor/result/mark_hash_table.h b/src/include/processor/result/mark_hash_table.h index 5f014ae9819..feb31c3362f 100644 --- a/src/include/processor/result/mark_hash_table.h +++ b/src/include/processor/result/mark_hash_table.h @@ -6,13 +6,10 @@ namespace kuzu { namespace processor { class MarkHashTable : public AggregateHashTable { - public: - MarkHashTable(storage::MemoryManager& memoryManager, - std::vector keyDataTypes, - std::vector dependentKeyDataTypes, - const std::vector>& aggregateFunctions, - uint64_t numEntriesToAllocate, std::unique_ptr tableSchema); + MarkHashTable(storage::MemoryManager& memoryManager, std::vector keyTypes, + std::vector payloadTypes, uint64_t numEntriesToAllocate, + std::unique_ptr tableSchema); uint64_t matchFTEntries(const std::vector& flatKeyVectors, const std::vector& unFlatKeyVectors, uint64_t numMayMatches, diff --git a/src/optimizer/agg_key_dependency_optimizer.cpp b/src/optimizer/agg_key_dependency_optimizer.cpp index e027e792e31..2ad308c7b3f 100644 --- a/src/optimizer/agg_key_dependency_optimizer.cpp +++ b/src/optimizer/agg_key_dependency_optimizer.cpp @@ -26,9 +26,9 @@ void AggKeyDependencyOptimizer::visitOperator(planner::LogicalOperator* op) { void AggKeyDependencyOptimizer::visitAggregate(planner::LogicalOperator* op) { auto agg = (LogicalAggregate*)op; - auto [keys, dependentKeys] = resolveKeysAndDependentKeys(agg->getKeyExpressions()); - agg->setKeyExpressions(keys); - agg->setDependentKeyExpressions(dependentKeys); + auto [keys, dependentKeys] = resolveKeysAndDependentKeys(agg->getKeys()); + agg->setKeys(keys); + agg->setDependentKeys(dependentKeys); } void AggKeyDependencyOptimizer::visitDistinct(planner::LogicalOperator* op) { diff --git a/src/planner/operator/logical_aggregate.cpp b/src/planner/operator/logical_aggregate.cpp index daa04431db9..32f6575ba78 100644 --- a/src/planner/operator/logical_aggregate.cpp +++ b/src/planner/operator/logical_aggregate.cpp @@ -22,7 +22,7 @@ void LogicalAggregate::computeFlatSchema() { f_group_pos_set LogicalAggregate::getGroupsPosToFlattenForGroupBy() { f_group_pos_set dependentGroupsPos; - for (auto& expression : getAllKeyExpressions()) { + for (auto& expression : getAllKeys()) { for (auto groupPos : children[0]->getSchema()->getDependentGroupsPos(expression)) { dependentGroupsPos.insert(groupPos); } @@ -38,7 +38,7 @@ f_group_pos_set LogicalAggregate::getGroupsPosToFlattenForGroupBy() { f_group_pos_set LogicalAggregate::getGroupsPosToFlattenForAggregate() { if (hasDistinctAggregate()) { f_group_pos_set dependentGroupsPos; - for (auto& expression : aggregateExpressions) { + for (auto& expression : aggregates) { for (auto groupPos : children[0]->getSchema()->getDependentGroupsPos(expression)) { dependentGroupsPos.insert(groupPos); } @@ -50,14 +50,14 @@ f_group_pos_set LogicalAggregate::getGroupsPosToFlattenForAggregate() { std::string LogicalAggregate::getExpressionsForPrinting() const { std::string result = "Group By ["; - for (auto& expression : keyExpressions) { + for (auto& expression : keys) { result += expression->toString() + ", "; } - for (auto& expression : dependentKeyExpressions) { + for (auto& expression : dependentKeys) { result += expression->toString() + ", "; } result += "], Aggregate ["; - for (auto& expression : aggregateExpressions) { + for (auto& expression : aggregates) { result += expression->toString() + ", "; } result += "]"; @@ -65,9 +65,9 @@ std::string LogicalAggregate::getExpressionsForPrinting() const { } bool LogicalAggregate::hasDistinctAggregate() { - for (auto& expression : aggregateExpressions) { - auto& functionExpression = (binder::AggregateFunctionExpression&)*expression; - if (functionExpression.isDistinct()) { + for (auto& expression : aggregates) { + auto funcExpr = expression->safePtrCast(); + if (funcExpr->isDistinct()) { return true; } } @@ -75,13 +75,13 @@ bool LogicalAggregate::hasDistinctAggregate() { } void LogicalAggregate::insertAllExpressionsToGroupAndScope(f_group_pos groupPos) { - for (auto& expression : keyExpressions) { + for (auto& expression : keys) { schema->insertToGroupAndScopeMayRepeat(expression, groupPos); } - for (auto& expression : dependentKeyExpressions) { + for (auto& expression : dependentKeys) { schema->insertToGroupAndScopeMayRepeat(expression, groupPos); } - for (auto& expression : aggregateExpressions) { + for (auto& expression : aggregates) { schema->insertToGroupAndScopeMayRepeat(expression, groupPos); } } diff --git a/src/processor/map/map_accumulate.cpp b/src/processor/map/map_accumulate.cpp index c42aa2a6199..7b12f0a046c 100644 --- a/src/processor/map/map_accumulate.cpp +++ b/src/processor/map/map_accumulate.cpp @@ -9,7 +9,7 @@ namespace kuzu { namespace processor { std::unique_ptr PlanMapper::mapAccumulate(LogicalOperator* op) { - auto acc = op->ptrCast(); + auto acc = op->unsafePtrCast(); auto outSchema = acc->getSchema(); auto inSchema = acc->getChild(0)->getSchema(); auto prevOperator = mapOperator(acc->getChild(0).get()); diff --git a/src/processor/map/map_aggregate.cpp b/src/processor/map/map_aggregate.cpp index fc811f3809d..93109f8a08e 100644 --- a/src/processor/map/map_aggregate.cpp +++ b/src/processor/map/map_aggregate.cpp @@ -14,12 +14,11 @@ using namespace kuzu::planner; namespace kuzu { namespace processor { -static std::vector> getAggregateInputInfos( - const expression_vector& groupByExpressions, const expression_vector& aggregateExpressions, - const Schema& schema) { +static std::vector getAggregateInputInfos(const expression_vector& keys, + const expression_vector& aggregates, const Schema& schema) { // Collect unFlat groups from std::unordered_set groupByGroupPosSet; - for (auto& expression : groupByExpressions) { + for (auto& expression : keys) { groupByGroupPosSet.insert(schema.getGroupPos(*expression)); } std::unordered_set unFlatAggregateGroupPosSet; @@ -32,9 +31,9 @@ static std::vector> getAggregateInputInfos( } unFlatAggregateGroupPosSet.insert(groupPos); } - std::vector> result; - for (auto& expression : aggregateExpressions) { - DataPos aggregateVectorPos{}; + std::vector result; + for (auto& expression : aggregates) { + auto aggregateVectorPos = DataPos::getInvalidPos(); if (expression->getNumChildren() != 0) { // COUNT(*) has no children auto child = expression->getChild(0); aggregateVectorPos = DataPos{schema.getExpressionPos(*child)}; @@ -45,15 +44,18 @@ static std::vector> getAggregateInputInfos( multiplicityChunksPos.push_back(groupPos); } } - result.emplace_back(std::make_unique(aggregateVectorPos, - std::move(multiplicityChunksPos))); + auto aggExpr = expression->safePtrCast(); + auto distinctAggKeyType = + aggExpr->isDistinct() ? expression->getChild(0)->getDataType() : *LogicalType::ANY(); + result.emplace_back(aggregateVectorPos, std::move(multiplicityChunksPos), + std::move(distinctAggKeyType)); } return result; } -static binder::expression_vector getKeyExpressions(const binder::expression_vector& expressions, +static expression_vector getKeyExpressions(const expression_vector& expressions, const Schema& schema, bool isFlat) { - binder::expression_vector result; + expression_vector result; for (auto& expression : expressions) { if (schema.getGroup(schema.getGroupPos(*expression))->isFlat() == isFlat) { result.emplace_back(expression); @@ -62,100 +64,124 @@ static binder::expression_vector getKeyExpressions(const binder::expression_vect return result; } -std::unique_ptr PlanMapper::mapAggregate(LogicalOperator* logicalOperator) { - auto& logicalAggregate = (const LogicalAggregate&)*logicalOperator; - auto outSchema = logicalAggregate.getSchema(); - auto inSchema = logicalAggregate.getChild(0)->getSchema(); - auto prevOperator = mapOperator(logicalOperator->getChild(0).get()); - auto paramsString = logicalAggregate.getExpressionsForPrinting(); +static std::vector> getAggFunctions( + const expression_vector& aggregates) { std::vector> aggregateFunctions; - for (auto& expression : logicalAggregate.getAggregateExpressions()) { - aggregateFunctions.push_back( - ((AggregateFunctionExpression&)*expression).aggregateFunction->clone()); + for (auto& expression : aggregates) { + auto aggExpr = expression->safePtrCast(); + aggregateFunctions.push_back(aggExpr->aggregateFunction->clone()); } - auto aggregatesOutputPos = - getExpressionsDataPos(logicalAggregate.getAggregateExpressions(), *outSchema); - auto aggregateInputInfos = getAggregateInputInfos(logicalAggregate.getAllKeyExpressions(), - logicalAggregate.getAggregateExpressions(), *inSchema); - if (logicalAggregate.hasKeyExpressions()) { - return createHashAggregate(logicalAggregate.getKeyExpressions(), - logicalAggregate.getDependentKeyExpressions(), std::move(aggregateFunctions), - std::move(aggregateInputInfos), std::move(aggregatesOutputPos), inSchema, outSchema, - std::move(prevOperator), paramsString, nullptr); - } else { - auto sharedState = make_shared(aggregateFunctions); - auto aggregate = - make_unique(std::make_unique(inSchema), - sharedState, std::move(aggregateFunctions), std::move(aggregateInputInfos), - std::move(prevOperator), getOperatorID(), paramsString); - return make_unique(sharedState, aggregatesOutputPos, - std::move(aggregate), getOperatorID(), paramsString); + return aggregateFunctions; +} + +std::unique_ptr PlanMapper::mapAggregate(LogicalOperator* logicalOperator) { + auto agg = logicalOperator->unsafePtrCast(); + auto aggregates = agg->getAggregates(); + auto outSchema = agg->getSchema(); + auto child = agg->getChild(0).get(); + auto inSchema = child->getSchema(); + auto prevOperator = mapOperator(child); + auto paramsString = agg->getExpressionsForPrinting(); + if (agg->hasKeys()) { + return createHashAggregate(agg->getKeys(), agg->getDependentKeys(), aggregates, + nullptr /* mark */, inSchema, outSchema, std::move(prevOperator), paramsString); } + auto aggFunctions = getAggFunctions(aggregates); + auto aggOutputPos = getDataPos(aggregates, *outSchema); + auto aggregateInputInfos = getAggregateInputInfos(agg->getAllKeys(), aggregates, *inSchema); + auto sharedState = make_shared(aggFunctions); + auto aggregate = make_unique(std::make_unique(inSchema), + sharedState, std::move(aggFunctions), std::move(aggregateInputInfos), + std::move(prevOperator), getOperatorID(), paramsString); + return make_unique(sharedState, aggOutputPos, std::move(aggregate), + getOperatorID(), paramsString); } static std::unique_ptr getFactorizedTableSchema( - const binder::expression_vector& flatKeys, const binder::expression_vector& unflatKeys, - const binder::expression_vector& payloads, + const expression_vector& flatKeys, const expression_vector& unFlatKeys, + const expression_vector& payloads, std::vector>& aggregateFunctions, std::shared_ptr markExpression) { - auto isUnflat = false; + auto isUnFlat = false; auto dataChunkPos = 0u; std::unique_ptr tableSchema = std::make_unique(); for (auto& flatKey : flatKeys) { auto size = LogicalTypeUtils::getRowLayoutSize(flatKey->dataType); - tableSchema->appendColumn(std::make_unique(isUnflat, dataChunkPos, size)); + tableSchema->appendColumn(std::make_unique(isUnFlat, dataChunkPos, size)); } - for (auto& unflatKey : unflatKeys) { - auto size = LogicalTypeUtils::getRowLayoutSize(unflatKey->dataType); - tableSchema->appendColumn(std::make_unique(isUnflat, dataChunkPos, size)); + for (auto& unFlatKey : unFlatKeys) { + auto size = LogicalTypeUtils::getRowLayoutSize(unFlatKey->dataType); + tableSchema->appendColumn(std::make_unique(isUnFlat, dataChunkPos, size)); } for (auto& payload : payloads) { auto size = LogicalTypeUtils::getRowLayoutSize(payload->dataType); - tableSchema->appendColumn(std::make_unique(isUnflat, dataChunkPos, size)); + tableSchema->appendColumn(std::make_unique(isUnFlat, dataChunkPos, size)); } for (auto& aggregateFunc : aggregateFunctions) { - tableSchema->appendColumn(std::make_unique(isUnflat, dataChunkPos, + tableSchema->appendColumn(std::make_unique(isUnFlat, dataChunkPos, aggregateFunc->getAggregateStateSize())); } if (markExpression != nullptr) { - tableSchema->appendColumn(std::make_unique(isUnflat, dataChunkPos, + tableSchema->appendColumn(std::make_unique(isUnFlat, dataChunkPos, LogicalTypeUtils::getRowLayoutSize(markExpression->dataType))); } tableSchema->appendColumn( - std::make_unique(isUnflat, dataChunkPos, sizeof(hash_t))); + std::make_unique(isUnFlat, dataChunkPos, sizeof(hash_t))); return tableSchema; } -std::unique_ptr PlanMapper::createHashAggregate( - const binder::expression_vector& keys, const binder::expression_vector& payloads, - std::vector> aggregateFunctions, - std::vector> aggregateInputInfos, - std::vector aggregatesOutputPos, planner::Schema* inSchema, planner::Schema* outSchema, - std::unique_ptr prevOperator, const std::string& paramsString, - std::shared_ptr markExpression) { - auto sharedState = make_shared(aggregateFunctions); +std::unique_ptr PlanMapper::createDistinctHashAggregate( + const expression_vector& keys, const expression_vector& payloads, Schema* inSchema, + Schema* outSchema, std::unique_ptr prevOperator, + const std::string& paramsString) { + return createHashAggregate(keys, payloads, expression_vector{} /* aggregates */, + nullptr /* mark */, inSchema, outSchema, std::move(prevOperator), paramsString); +} + +std::unique_ptr PlanMapper::createMarkDistinctHashAggregate( + const expression_vector& keys, const expression_vector& payloads, + std::shared_ptr mark, Schema* inSchema, Schema* outSchema, + std::unique_ptr prevOperator, const std::string& paramsString) { + return createHashAggregate(keys, payloads, expression_vector{} /* aggregates */, + std::move(mark), inSchema, outSchema, std::move(prevOperator), paramsString); +} + +// Payloads are also group by keys except that they are functional dependent on keys so we don't +// need to hash or compare payloads. +std::unique_ptr PlanMapper::createHashAggregate(const expression_vector& keys, + const expression_vector& payloads, const expression_vector& aggregates, + std::shared_ptr mark, Schema* inSchema, Schema* outSchema, + std::unique_ptr prevOperator, const std::string& paramsString) { + // Create hash aggregate + auto aggFunctions = getAggFunctions(aggregates); + expression_vector allKeys; + allKeys.insert(allKeys.end(), keys.begin(), keys.end()); + allKeys.insert(allKeys.end(), payloads.begin(), payloads.end()); + auto aggregateInputInfos = getAggregateInputInfos(allKeys, aggregates, *inSchema); + auto sharedState = std::make_shared(aggFunctions); auto flatKeys = getKeyExpressions(keys, *inSchema, true /* isFlat */); auto unFlatKeys = getKeyExpressions(keys, *inSchema, false /* isFlat */); - auto tableSchema = getFactorizedTableSchema(flatKeys, unFlatKeys, payloads, aggregateFunctions, - markExpression); - HashAggregateInfo aggregateInfo{getExpressionsDataPos(flatKeys, *inSchema), - getExpressionsDataPos(unFlatKeys, *inSchema), getExpressionsDataPos(payloads, *inSchema), - std::move(tableSchema), - markExpression == nullptr ? HashTableType::AGGREGATE_HASH_TABLE : - HashTableType::MARK_HASH_TABLE}; + auto tableSchema = getFactorizedTableSchema(flatKeys, unFlatKeys, payloads, aggFunctions, mark); + auto hashTableType = + mark == nullptr ? HashTableType::AGGREGATE_HASH_TABLE : HashTableType::MARK_HASH_TABLE; + HashAggregateInfo aggregateInfo{getDataPos(flatKeys, *inSchema), + getDataPos(unFlatKeys, *inSchema), getDataPos(payloads, *inSchema), std::move(tableSchema), + hashTableType}; auto aggregate = make_unique(std::make_unique(inSchema), - sharedState, std::move(aggregateInfo), std::move(aggregateFunctions), + sharedState, std::move(aggregateInfo), std::move(aggFunctions), std::move(aggregateInputInfos), std::move(prevOperator), getOperatorID(), paramsString); - binder::expression_vector outputExpressions; + // Create AggScan. + expression_vector outputExpressions; outputExpressions.insert(outputExpressions.end(), flatKeys.begin(), flatKeys.end()); outputExpressions.insert(outputExpressions.end(), unFlatKeys.begin(), unFlatKeys.end()); outputExpressions.insert(outputExpressions.end(), payloads.begin(), payloads.end()); - if (markExpression != nullptr) { - outputExpressions.emplace_back(markExpression); + if (mark != nullptr) { + outputExpressions.emplace_back(mark); } + auto aggOutputPos = getDataPos(aggregates, *outSchema); return std::make_unique(sharedState, - getExpressionsDataPos(outputExpressions, *outSchema), std::move(aggregatesOutputPos), - std::move(aggregate), getOperatorID(), paramsString); + getDataPos(outputExpressions, *outSchema), std::move(aggOutputPos), std::move(aggregate), + getOperatorID(), paramsString); } } // namespace processor diff --git a/src/processor/map/map_distinct.cpp b/src/processor/map/map_distinct.cpp index a1de19d1a49..31803b21e33 100644 --- a/src/processor/map/map_distinct.cpp +++ b/src/processor/map/map_distinct.cpp @@ -1,5 +1,4 @@ #include "planner/operator/logical_distinct.h" -#include "processor/operator/aggregate/aggregate_input.h" #include "processor/plan_mapper.h" using namespace kuzu::common; @@ -9,17 +8,13 @@ namespace kuzu { namespace processor { std::unique_ptr PlanMapper::mapDistinct(LogicalOperator* logicalOperator) { - auto& logicalDistinct = (const LogicalDistinct&)*logicalOperator; - auto outSchema = logicalDistinct.getSchema(); - auto inSchema = logicalDistinct.getChild(0)->getSchema(); - auto prevOperator = mapOperator(logicalOperator->getChild(0).get()); - std::vector> emptyAggFunctions; - std::vector> emptyAggInputInfos; - std::vector emptyAggregatesOutputPos; - return createHashAggregate(logicalDistinct.getKeys(), logicalDistinct.getPayloads(), - std::move(emptyAggFunctions), std::move(emptyAggInputInfos), - std::move(emptyAggregatesOutputPos), inSchema, outSchema, std::move(prevOperator), - logicalDistinct.getExpressionsForPrinting(), nullptr /* markExpression */); + auto distinct = logicalOperator->safePtrCast(); + auto child = distinct->getChild(0).get(); + auto outSchema = distinct->getSchema(); + auto inSchema = child->getSchema(); + auto prevOperator = mapOperator(child); + return createDistinctHashAggregate(distinct->getKeys(), distinct->getPayloads(), inSchema, + outSchema, std::move(prevOperator), distinct->getExpressionsForPrinting()); } } // namespace processor diff --git a/src/processor/map/map_mark_accumulate.cpp b/src/processor/map/map_mark_accumulate.cpp index 96268c08c7d..c9f4bc4cffd 100644 --- a/src/processor/map/map_mark_accumulate.cpp +++ b/src/processor/map/map_mark_accumulate.cpp @@ -1,5 +1,4 @@ #include "planner/operator/logical_mark_accmulate.h" -#include "processor/operator/aggregate/aggregate_input.h" #include "processor/plan_mapper.h" using namespace kuzu::planner; @@ -9,17 +8,15 @@ namespace kuzu { namespace processor { std::unique_ptr PlanMapper::mapMarkAccumulate(LogicalOperator* op) { - auto logicalMarkAccumulate = ku_dynamic_cast(op); - auto keys = logicalMarkAccumulate->getKeys(); - auto payloads = logicalMarkAccumulate->getPayloads(); - auto outSchema = logicalMarkAccumulate->getSchema(); - auto inSchema = logicalMarkAccumulate->getChild(0)->getSchema(); - auto prevOperator = mapOperator(logicalMarkAccumulate->getChild(0).get()); - return createHashAggregate(keys, payloads, - std::vector>{}, - std::vector>{}, std::vector{}, inSchema, - outSchema, std::move(prevOperator), logicalMarkAccumulate->getExpressionsForPrinting(), - logicalMarkAccumulate->getMark()); + auto acc = op->safePtrCast(); + auto keys = acc->getKeys(); + auto payloads = acc->getPayloads(); + auto outSchema = acc->getSchema(); + auto child = acc->getChild(0).get(); + auto inSchema = child->getSchema(); + auto prevOperator = mapOperator(child); + return createMarkDistinctHashAggregate(keys, payloads, acc->getMark(), inSchema, outSchema, + std::move(prevOperator), acc->getExpressionsForPrinting()); } } // namespace processor diff --git a/src/processor/map/plan_mapper.cpp b/src/processor/map/plan_mapper.cpp index 4103f73afbd..c09c950ee56 100644 --- a/src/processor/map/plan_mapper.cpp +++ b/src/processor/map/plan_mapper.cpp @@ -190,11 +190,11 @@ std::unique_ptr PlanMapper::mapOperator(LogicalOperator* logic return physicalOperator; } -std::vector PlanMapper::getExpressionsDataPos(const binder::expression_vector& expressions, +std::vector PlanMapper::getDataPos(const binder::expression_vector& expressions, const planner::Schema& schema) { std::vector result; for (auto& expression : expressions) { - result.emplace_back(schema.getExpressionPos(*expression)); + result.emplace_back(getDataPos(*expression, schema)); } return result; } diff --git a/src/processor/operator/aggregate/aggregate_hash_table.cpp b/src/processor/operator/aggregate/aggregate_hash_table.cpp index a7a8fa4643b..95630e05ec4 100644 --- a/src/processor/operator/aggregate/aggregate_hash_table.cpp +++ b/src/processor/operator/aggregate/aggregate_hash_table.cpp @@ -11,23 +11,31 @@ namespace kuzu { namespace processor { AggregateHashTable::AggregateHashTable(MemoryManager& memoryManager, - std::vector keyDataTypes, std::vector dependentKeyDataTypes, + std::vector keyTypes, std::vector payloadTypes, const std::vector>& aggregateFunctions, - uint64_t numEntriesToAllocate, std::unique_ptr tableSchema) - : BaseHashTable{memoryManager, std::move(keyDataTypes)}, - dependentKeyDataTypes{std::move(dependentKeyDataTypes)} { + const std::vector& distinctAggKeyTypes, uint64_t numEntriesToAllocate, + std::unique_ptr tableSchema) + : BaseHashTable{memoryManager, std::move(keyTypes)}, payloadTypes{std::move(payloadTypes)} { initializeFT(aggregateFunctions, std::move(tableSchema)); initializeHashTable(numEntriesToAllocate); - distinctHashTables = AggregateHashTableUtils::createDistinctHashTables(memoryManager, - this->keyTypes, this->aggregateFunctions); + KU_ASSERT(aggregateFunctions.size() == distinctAggKeyTypes.size()); + for (auto i = 0u; i < this->aggregateFunctions.size(); ++i) { + std::unique_ptr distinctHT; + if (this->aggregateFunctions[i]->isDistinct) { + distinctHT = AggregateHashTableUtils::createDistinctHashTable(memoryManager, + this->keyTypes, distinctAggKeyTypes[i]); + } else { + distinctHT = nullptr; + } + distinctHashTables.push_back(std::move(distinctHT)); + } initializeTmpVectors(); } void AggregateHashTable::append(const std::vector& flatKeyVectors, const std::vector& unFlatKeyVectors, - const std::vector& dependentKeyVectors, common::DataChunkState* leadingState, - const std::vector>& aggregateInputs, - uint64_t resultSetMultiplicity) { + const std::vector& dependentKeyVectors, DataChunkState* leadingState, + const std::vector& aggregateInputs, uint64_t resultSetMultiplicity) { resizeHashTableIfNecessary(leadingState->selVector->selectedSize); computeVectorHashes(flatKeyVectors, unFlatKeyVectors); findHashSlots(flatKeyVectors, unFlatKeyVectors, dependentKeyVectors, leadingState); @@ -66,9 +74,9 @@ bool AggregateHashTable::isAggregateValueDistinctForGroupByKeys( void AggregateHashTable::merge(AggregateHashTable& other) { std::shared_ptr vectorsToScanState = std::make_shared(); - std::vector vectorsToScan(keyTypes.size() + dependentKeyDataTypes.size()); + std::vector vectorsToScan(keyTypes.size() + payloadTypes.size()); std::vector groupByHashVectors(keyTypes.size()); - std::vector groupByNonHashVectors(dependentKeyDataTypes.size()); + std::vector groupByNonHashVectors(payloadTypes.size()); std::vector> hashKeyVectors(keyTypes.size()); std::vector> nonHashKeyVectors(groupByNonHashVectors.size()); for (auto i = 0u; i < keyTypes.size(); i++) { @@ -78,9 +86,8 @@ void AggregateHashTable::merge(AggregateHashTable& other) { groupByHashVectors[i] = hashKeyVec.get(); hashKeyVectors[i] = std::move(hashKeyVec); } - for (auto i = 0u; i < dependentKeyDataTypes.size(); i++) { - auto nonHashKeyVec = - std::make_unique(dependentKeyDataTypes[i], &memoryManager); + for (auto i = 0u; i < payloadTypes.size(); i++) { + auto nonHashKeyVec = std::make_unique(payloadTypes[i], &memoryManager); nonHashKeyVec->state = vectorsToScanState; vectorsToScan[i + keyTypes.size()] = nonHashKeyVec.get(); groupByNonHashVectors[i] = nonHashKeyVec.get(); @@ -129,11 +136,11 @@ void AggregateHashTable::finalizeAggregateStates() { void AggregateHashTable::initializeFT( const std::vector>& aggFuncs, std::unique_ptr tableSchema) { - aggStateColIdxInFT = keyTypes.size() + dependentKeyDataTypes.size(); + aggStateColIdxInFT = keyTypes.size() + payloadTypes.size(); for (auto& dataType : keyTypes) { numBytesForKeys += LogicalTypeUtils::getRowLayoutSize(dataType); } - for (auto& dataType : dependentKeyDataTypes) { + for (auto& dataType : payloadTypes) { numBytesForDependentKeys += LogicalTypeUtils::getRowLayoutSize(dataType); } aggStateColOffsetInFT = numBytesForKeys + numBytesForDependentKeys; @@ -424,7 +431,7 @@ void AggregateHashTable::increaseHashSlotIdxes(uint64_t numNoMatches) { void AggregateHashTable::findHashSlots(const std::vector& flatKeyVectors, const std::vector& unFlatKeyVectors, - const std::vector& dependentKeyVectors, common::DataChunkState* leadingState) { + const std::vector& dependentKeyVectors, DataChunkState* leadingState) { initTmpHashSlotsAndIdxes(); auto numEntriesToFindHashSlots = leadingState->selVector->selectedSize; while (numEntriesToFindHashSlots > 0) { @@ -506,16 +513,15 @@ void AggregateHashTable::updateAggState(const std::vector& flatKey void AggregateHashTable::updateAggStates(const std::vector& flatKeyVectors, const std::vector& unFlatKeyVectors, - const std::vector>& aggregateInputs, - uint64_t resultSetMultiplicity) { + const std::vector& aggregateInputs, uint64_t resultSetMultiplicity) { auto aggregateStateOffset = aggStateColOffsetInFT; for (auto i = 0u; i < aggregateFunctions.size(); i++) { auto multiplicity = resultSetMultiplicity; - for (auto& dataChunk : aggregateInputs[i]->multiplicityChunks) { + for (auto& dataChunk : aggregateInputs[i].multiplicityChunks) { multiplicity *= dataChunk->state->selVector->selectedSize; } updateAggFuncs[i](this, flatKeyVectors, unFlatKeyVectors, aggregateFunctions[i], - aggregateInputs[i]->aggregateVector, multiplicity, i, aggregateStateOffset); + aggregateInputs[i].aggregateVector, multiplicity, i, aggregateStateOffset); aggregateStateOffset += aggregateFunctions[i]->getAggregateStateSize(); } } @@ -753,40 +759,72 @@ void AggregateHashTable::updateBothUnFlatDifferentDCAggVectorState( } } -std::vector> AggregateHashTableUtils::createDistinctHashTables( - MemoryManager& memoryManager, const std::vector& groupByKeyDataTypes, - const std::vector>& aggregateFunctions) { - // TODO(Xiyang): move the creation of distinct hashtable schema to mapper. - std::vector> distinctHTs; - for (auto& aggregateFunction : aggregateFunctions) { - if (aggregateFunction->isFunctionDistinct()) { - std::vector distinctKeysDataTypes(groupByKeyDataTypes.size() + 1); - auto tableSchema = std::make_unique(); - for (auto i = 0u; i < groupByKeyDataTypes.size(); i++) { - distinctKeysDataTypes[i] = groupByKeyDataTypes[i]; - auto size = LogicalTypeUtils::getRowLayoutSize(distinctKeysDataTypes[i]); - tableSchema->appendColumn(std::make_unique(false /* isUnflat */, - 0 /* dataChunkPos */, size)); - } - distinctKeysDataTypes[groupByKeyDataTypes.size()] = - LogicalType{aggregateFunction->parameterTypeIDs[0]}; - tableSchema->appendColumn( - std::make_unique(false /* isUnflat */, 0 /* dataChunkPos */, - LogicalTypeUtils::getRowLayoutSize( - LogicalType{aggregateFunction->parameterTypeIDs[0]}))); - tableSchema->appendColumn(std::make_unique(false /* isUnflat */, - 0 /* dataChunkPos */, sizeof(hash_t))); - std::vector> emptyFunctions; - auto ht = std::make_unique(memoryManager, - std::move(distinctKeysDataTypes), emptyFunctions, 0 /* numEntriesToAllocate */, - std::move(tableSchema)); - distinctHTs.push_back(std::move(ht)); - } else { - distinctHTs.push_back(nullptr); - } - } - return distinctHTs; -} +std::unique_ptr AggregateHashTableUtils::createDistinctHashTable( + MemoryManager& memoryManager, const std::vector& groupByKeyTypes, + const LogicalType& distinctKeyType) { + std::vector hashKeyTypes(groupByKeyTypes.size() + 1); + auto tableSchema = std::make_unique(); + auto i = 0u; + // Group by key columns + for (; i < groupByKeyTypes.size(); i++) { + hashKeyTypes[i] = groupByKeyTypes[i]; + auto size = LogicalTypeUtils::getRowLayoutSize(hashKeyTypes[i]); + auto columnSchema = + std::make_unique(false /* isUnFlat */, 0 /* dataChunkPos */, size); + tableSchema->appendColumn(std::move(columnSchema)); + } + // Distinct key column + hashKeyTypes[i] = distinctKeyType; + auto columnSchema = std::make_unique(false /* isUnFlat */, 0 /* dataChunkPos */, + LogicalTypeUtils::getRowLayoutSize(distinctKeyType)); + tableSchema->appendColumn(std::move(columnSchema)); + // Hash column + tableSchema->appendColumn( + std::make_unique(false /* isUnFlat */, 0 /* dataChunkPos */, sizeof(hash_t))); + return std::make_unique(memoryManager, std::move(hashKeyTypes), + std::vector{} /* empty payload types */, 0 /* numEntriesToAllocate */, + std::move(tableSchema)); +} + +// std::vector> +// AggregateHashTableUtils::createDistinctHashTables( +// MemoryManager& memoryManager, const std::vector& groupByKeyDataTypes, +// const std::vector& distinctKeyTypes, +// const std::vector& isDistinctAgg) { +// std::vector> distinctHTs; +// KU_ASSERT(isDistinctAgg.size() == distinctKeyTypes.size()); +// for (auto j = 0u; j < isDistinctAgg.size(); ++j) { +// auto distinctKeyType = distinctKeyTypes[j]; +// if (!isDistinctAgg[j]) { +// distinctHTs.push_back(nullptr); +// continue; +// } +// std::vector hashKeyTypes(groupByKeyDataTypes.size() + 1); +// auto tableSchema = std::make_unique(); +// auto i = 0u; +// // Group by key columns +// for (; i < groupByKeyDataTypes.size(); i++) { +// hashKeyTypes[i] = groupByKeyDataTypes[i]; +// auto size = LogicalTypeUtils::getRowLayoutSize(hashKeyTypes[i]); +// auto columnSchema = +// std::make_unique(false /* isUnflat */, 0 /* dataChunkPos */, size); +// tableSchema->appendColumn(std::move(columnSchema)); +// } +// // Distinct key column +// hashKeyTypes[i] = distinctKeyType; +// auto columnSchema = std::make_unique(false /* isUnflat */, +// 0 /* dataChunkPos */, LogicalTypeUtils::getRowLayoutSize(distinctKeyType)); +// tableSchema->appendColumn(std::move(columnSchema)); +// // Hash column +// tableSchema->appendColumn(std::make_unique(false /* isUnflat */, +// 0 /* dataChunkPos */, sizeof(hash_t))); +// std::vector> emptyFunctions; +// auto ht = std::make_unique(memoryManager, std::move(hashKeyTypes), +// emptyFunctions, 0 /* numEntriesToAllocate */, std::move(tableSchema)); +// distinctHTs.push_back(std::move(ht)); +// } +// return distinctHTs; +// } } // namespace processor } // namespace kuzu diff --git a/src/processor/operator/aggregate/base_aggregate.cpp b/src/processor/operator/aggregate/base_aggregate.cpp index f8b3aa7fd58..6171bac30c5 100644 --- a/src/processor/operator/aggregate/base_aggregate.cpp +++ b/src/processor/operator/aggregate/base_aggregate.cpp @@ -23,19 +23,18 @@ bool BaseAggregate::containDistinctAggregate() const { } void BaseAggregate::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) { - for (auto& inputInfo : aggregateInputInfos) { - auto aggregateInput = std::make_unique(); - if (inputInfo->aggregateVectorPos.dataChunkPos == INVALID_DATA_CHUNK_POS) { - aggregateInput->aggregateVector = nullptr; + for (auto& info : aggInfos) { + auto aggregateInput = AggregateInput(); + if (info.aggVectorPos.dataChunkPos == INVALID_DATA_CHUNK_POS) { + aggregateInput.aggregateVector = nullptr; } else { - aggregateInput->aggregateVector = - resultSet->getValueVector(inputInfo->aggregateVectorPos).get(); + aggregateInput.aggregateVector = resultSet->getValueVector(info.aggVectorPos).get(); } - for (auto dataChunkPos : inputInfo->multiplicityChunksPos) { - aggregateInput->multiplicityChunks.push_back( + for (auto dataChunkPos : info.multiplicityChunksPos) { + aggregateInput.multiplicityChunks.push_back( resultSet->getDataChunk(dataChunkPos).get()); } - aggregateInputs.push_back(std::move(aggregateInput)); + aggInputs.push_back(std::move(aggregateInput)); } } @@ -48,14 +47,5 @@ std::vector> BaseAggregate::cloneAg return result; } -std::vector> BaseAggregate::cloneAggInputInfos() { - std::vector> result; - result.reserve(aggregateInputInfos.size()); - for (auto& info : aggregateInputInfos) { - result.push_back(info->copy()); - } - return result; -} - } // namespace processor } // namespace kuzu diff --git a/src/processor/operator/aggregate/hash_aggregate.cpp b/src/processor/operator/aggregate/hash_aggregate.cpp index 9c88638a131..f629b8b4857 100644 --- a/src/processor/operator/aggregate/hash_aggregate.cpp +++ b/src/processor/operator/aggregate/hash_aggregate.cpp @@ -64,7 +64,8 @@ HashAggregateInfo::HashAggregateInfo(const HashAggregateInfo& other) void HashAggregateLocalState::init(ResultSet& resultSet, main::ClientContext* context, HashAggregateInfo& info, - std::vector>& aggregateFunctions) { + std::vector>& aggregateFunctions, + std::vector types) { std::vector keyDataTypes; for (auto& pos : info.flatKeysPos) { auto vector = resultSet.getValueVector(pos).get(); @@ -86,19 +87,20 @@ void HashAggregateLocalState::init(ResultSet& resultSet, main::ClientContext* co unFlatKeyVectors[0]->state.get(); switch (info.hashTableType) { case HashTableType::AGGREGATE_HASH_TABLE: - aggregateHashTable = std::make_unique(*context->getMemoryManager(), - keyDataTypes, payloadDataTypes, aggregateFunctions, 0, std::move(info.tableSchema)); + aggregateHashTable = + std::make_unique(*context->getMemoryManager(), keyDataTypes, + payloadDataTypes, aggregateFunctions, types, 0, std::move(info.tableSchema)); break; case HashTableType::MARK_HASH_TABLE: aggregateHashTable = std::make_unique(*context->getMemoryManager(), - keyDataTypes, payloadDataTypes, aggregateFunctions, 0, std::move(info.tableSchema)); + keyDataTypes, payloadDataTypes, 0, std::move(info.tableSchema)); break; default: KU_UNREACHABLE; } } -void HashAggregateLocalState::append(std::vector>& aggregateInputs, +void HashAggregateLocalState::append(const std::vector& aggregateInputs, uint64_t multiplicity) const { aggregateHashTable->append(flatKeyVectors, unFlatKeyVectors, dependentKeyVectors, leadingState, aggregateInputs, multiplicity); @@ -106,12 +108,17 @@ void HashAggregateLocalState::append(std::vector void HashAggregate::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { BaseAggregate::initLocalStateInternal(resultSet, context); - localState.init(*resultSet, context->clientContext, aggregateInfo, aggregateFunctions); + std::vector distinctAggKeyTypes; + for (auto& info : aggInfos) { + distinctAggKeyTypes.push_back(info.distinctAggKeyType); + } + localState.init(*resultSet, context->clientContext, hashInfo, aggregateFunctions, + distinctAggKeyTypes); } void HashAggregate::executeInternal(ExecutionContext* context) { while (children[0]->getNextTuple(context)) { - localState.append(aggregateInputs, resultSet->multiplicity); + localState.append(aggInputs, resultSet->multiplicity); } sharedState->appendAggregateHashTable(std::move(localState.aggregateHashTable)); } diff --git a/src/processor/operator/aggregate/simple_aggregate.cpp b/src/processor/operator/aggregate/simple_aggregate.cpp index f39a1ac2c17..a7672e658d5 100644 --- a/src/processor/operator/aggregate/simple_aggregate.cpp +++ b/src/processor/operator/aggregate/simple_aggregate.cpp @@ -44,12 +44,20 @@ std::pair SimpleAggregateSharedState::getNextRangeToRead() { void SimpleAggregate::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { BaseAggregate::initLocalStateInternal(resultSet, context); - for (auto& aggregateFunction : this->aggregateFunctions) { - localAggregateStates.push_back(aggregateFunction->createInitialNullAggregateState()); - } - distinctHashTables = AggregateHashTableUtils::createDistinctHashTables( - *context->clientContext->getMemoryManager(), std::vector{}, - this->aggregateFunctions); + for (auto i = 0u; i < aggregateFunctions.size(); ++i) { + auto func = aggregateFunctions[i].get(); + localAggregateStates.push_back(func->createInitialNullAggregateState()); + std::unique_ptr distinctHT; + if (func->isDistinct) { + auto mm = context->clientContext->getMemoryManager(); + distinctHT = AggregateHashTableUtils::createDistinctHashTable(*mm, + std::vector{} /* empty group by keys */, + aggInfos[i].distinctAggKeyType); + } else { + distinctHT = nullptr; + } + distinctHashTables.push_back(std::move(distinctHT)); + }; } void SimpleAggregate::executeInternal(ExecutionContext* context) { @@ -59,10 +67,10 @@ void SimpleAggregate::executeInternal(ExecutionContext* context) { auto aggregateFunction = aggregateFunctions[i].get(); if (aggregateFunction->isFunctionDistinct()) { computeDistinctAggregate(distinctHashTables[i].get(), aggregateFunction, - aggregateInputs[i].get(), localAggregateStates[i].get(), memoryManager); + &aggInputs[i], localAggregateStates[i].get(), memoryManager); } else { - computeAggregate(aggregateFunction, aggregateInputs[i].get(), - localAggregateStates[i].get(), memoryManager); + computeAggregate(aggregateFunction, &aggInputs[i], localAggregateStates[i].get(), + memoryManager); } } } diff --git a/src/processor/result/mark_hash_table.cpp b/src/processor/result/mark_hash_table.cpp index 29c515e2d56..5450d8346da 100644 --- a/src/processor/result/mark_hash_table.cpp +++ b/src/processor/result/mark_hash_table.cpp @@ -4,12 +4,12 @@ namespace kuzu { namespace processor { MarkHashTable::MarkHashTable(storage::MemoryManager& memoryManager, - std::vector keyDataTypes, - std::vector dependentKeyDataTypes, - const std::vector>& aggregateFunctions, + std::vector keyTypes, std::vector payloadTypes, uint64_t numEntriesToAllocate, std::unique_ptr tableSchema) - : AggregateHashTable(memoryManager, std::move(keyDataTypes), std::move(dependentKeyDataTypes), - std::move(aggregateFunctions), numEntriesToAllocate, std::move(tableSchema)) { + : AggregateHashTable(memoryManager, std::move(keyTypes), std::move(payloadTypes), + std::vector>{} /* empty aggregates */, + std::vector{} /* empty distinct agg key*/, numEntriesToAllocate, + std::move(tableSchema)) { distinctColIdxInFT = hashColIdxInFT - 1; } diff --git a/test/test_files/ldbc/ldbc-interactive/interactive-complex.test b/test/test_files/ldbc/ldbc-interactive/interactive-complex.test index 7e8fb9793aa..5844f359175 100644 --- a/test/test_files/ldbc/ldbc-interactive/interactive-complex.test +++ b/test/test_files/ldbc/ldbc-interactive/interactive-complex.test @@ -165,7 +165,6 @@ Euripides|1 # IC12 should be changed to use Kleene Star relationship once that is implemented -LOG IC12 -CHECK_ORDER --PARALLELISM 10 -STATEMENT MATCH (tag:Tag)-[:hasType|:isSubclassOf*1..20]->(baseTagClass:TagClass) WHERE tag.name = "Monarch" OR baseTagClass.name = "Monarch" WITH collect(tag.id) as tags @@ -174,16 +173,14 @@ Euripides|1 RETURN friend.id AS personId, friend.firstName AS personFirstName, friend.lastName AS personLastName, list_sort(collect(DISTINCT tag.name)) AS tagNames, count(DISTINCT comment) AS replyCount ORDER BY replyCount DESC, personId ASC LIMIT 20; ----- error -Binder exception: DISTINCT is not supported for NODE or REL type. - -#8796093022764|Zheng|Xu|[Mahmud_of_Ghazni,Ashoka,Tiberius,Marcus_Aurelius,Genghis_Khan,Justinian_I,Hadrian,Timur]|13 -#10995116278353|Otto|Muller|[Tiberius,Genghis_Khan,Justinian_I,Constantine_the_Great,Trajan]|11 -#17592186044994|Jie|Wang|[Genghis_Khan,David]|7 -#13194139534548|Bing|Zheng|[Genghis_Khan,Hadrian,Solomon]|6 -#13194139533500|Otto|Becker|[Tiberius,Genghis_Khan,Julius_Caesar,David,Alexander_the_Great]|5 -#28587302322537|Anh|Nguyen|[Mahmud_of_Ghazni,Trajan]|3 -#30786325578932|Alexander|Hleb|[Mahmud_of_Ghazni,David]|3 +---- 7 +8796093022764|Zheng|Xu|[Ashoka,Genghis_Khan,Hadrian,Justinian_I,Mahmud_of_Ghazni,Marcus_Aurelius,Tiberius,Timur]|13 +10995116278353|Otto|Muller|[Constantine_the_Great,Genghis_Khan,Justinian_I,Tiberius,Trajan]|11 +17592186044994|Jie|Wang|[David,Genghis_Khan]|7 +13194139534548|Bing|Zheng|[Genghis_Khan,Hadrian,Solomon]|6 +13194139533500|Otto|Becker|[Alexander_the_Great,David,Genghis_Khan,Julius_Caesar,Tiberius]|5 +28587302322537|Anh|Nguyen|[Mahmud_of_Ghazni,Trajan]|3 +30786325578932|Alexander|Hleb|[David,Mahmud_of_Ghazni]|3 # To be completely correct, this query needs to have # (i) Unbounded shortest path diff --git a/test/test_files/tinysnb/agg/distinct_agg.test b/test/test_files/tinysnb/agg/distinct_agg.test index e36bb4c440f..719496fb4f6 100644 --- a/test/test_files/tinysnb/agg/distinct_agg.test +++ b/test/test_files/tinysnb/agg/distinct_agg.test @@ -63,3 +63,58 @@ (0:3)-{_LABEL: knows, _ID: 3:10, date: 1950-05-14, meetTime: 1982-11-11 13:12:05.123, validInterval: 00:23:00, comments: [fewh9182912e3,h9y8y89soidfsf,nuhudf78w78efw,hioshe0f9023sdsd], summary: {locations: ['paris'], transfer: {day: 2000-01-01, amount: [20,5000]}}, notes: happy new year}->(0:1) (0:3)-{_LABEL: knows, _ID: 3:11, date: 2000-01-01, meetTime: 1999-04-21 15:12:11.42, validInterval: 48:00:00.052, comments: [23h9sdslnfowhu2932,shuhf98922323sf], summary: {locations: ['paris'], transfer: {day: 2000-01-01, amount: [20,5000]}}, notes: 4}->(0:2) (0:3)-{_LABEL: knows, _ID: 3:9, date: 2021-06-30, meetTime: 1936-11-02 11:02:01, validInterval: 00:00:00.00048, comments: [fwewe], summary: {locations: ['shanghai','nanjing'], transfer: {day: 1998-11-12, amount: [22,53240]}}, notes: 15}->(0:0) + +-LOG CollectDistinct +-STATEMENT UNWIND [1,1,3] AS a MATCH (b:person) RETURN COUNT(*); +---- 1 +24 +-STATEMENT UNWIND [1,1,3] AS x MATCH (a:person)-[e:knows]->(b:person) WHERE a.ID=5 RETURN COUNT(*); +---- 1 +9 +-STATEMENT UNWIND [1,1,3] AS a MATCH (b:person) WITH COLLECT(DISTINCT b) AS bs UNWIND bs AS newB RETURN newB.fName; +---- 8 +Alice +Bob +Carol +Dan +Elizabeth +Farooq +Greg +Hubert Blaine Wolfeschlegelsteinhausenbergerdorff +-STATEMENT UNWIND [1,1,3] AS x MATCH (a:person)-[e:knows]->(b:person) WHERE a.ID=5 WITH COLLECT(DISTINCT e) AS es UNWIND es AS newE RETURN newE.date; +---- 3 +1950-05-14 +2000-01-01 +2021-06-30 +-STATEMENT UNWIND [1,1,3] AS a MATCH (b:person) WHERE b.ID < 6 WITH a, COLLECT(DISTINCT b) AS bs UNWIND bs AS newB RETURN a, newB.fName; +---- 8 +1|Alice +1|Bob +1|Carol +1|Dan +3|Alice +3|Bob +3|Carol +3|Dan +-STATEMENT UNWIND [1,1,3] AS x MATCH (a:person)-[e:knows]->(b:person) WHERE a.ID=5 WITH x, COLLECT(DISTINCT e) AS es UNWIND es AS newE RETURN x, newE.date; +---- 6 +1|1950-05-14 +1|2000-01-01 +1|2021-06-30 +3|1950-05-14 +3|2000-01-01 +3|2021-06-30 +-STATEMENT UNWIND [1,1] AS x MATCH (a:person) WHERE a.ID < 3 MATCH (b:person) WHERE b.ID < 6 AND b.ID > 2 WITH a, COLLECT(DISTINCT b) AS bs UNWIND bs AS newB RETURN a.fName, newB.fName; +---- 4 +Alice|Carol +Alice|Dan +Bob|Carol +Bob|Dan +-STATEMENT UNWIND [1,1] AS x MATCH (a:person)-[e:knows]->() WHERE a.ID = 5 MATCH (b:person) WHERE b.ID < 6 AND b.ID > 2 WITH e, COLLECT(DISTINCT b) AS bs UNWIND bs AS newB RETURN e.date, newB.fName; +---- 6 +1950-05-14|Carol +1950-05-14|Dan +2000-01-01|Carol +2000-01-01|Dan +2021-06-30|Carol +2021-06-30|Dan