diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 272f35303bd1..1eb93f13e9b8 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -34,9 +34,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -77,6 +79,7 @@ namespace ErrorCodes extern const int SAMPLING_NOT_SUPPORTED; extern const int ALTER_OF_COLUMN_IS_FORBIDDEN; extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; + extern const int LOGICAL_ERROR; } StorageMerge::StorageMerge( @@ -371,6 +374,38 @@ void StorageMerge::read( query_plan.addStep(std::move(step)); } +/// A transient object of this helper class is created +/// when processing a Merge table data source (subordinary table) +/// that has row policies +/// to guarantee that these row policies are applied +class ReadFromMerge::RowPolicyData +{ +public: + RowPolicyData(RowPolicyFilterPtr, std::shared_ptr, ContextPtr); + + /// Add to data stream columns that are needed only for row policies + /// SELECT x from T if T has row policy y=42 + /// required y in data pipeline + void extendNames(Names &); + + /// Use storage facilities to filter data + /// optimization + /// does not guarantee accuracy, but reduces number of rows + void addStorageFilter(SourceStepWithFilter *); + + /// Create explicit filter transform to exclude + /// rows that are not conform to row level policy + void addFilterTransform(QueryPipelineBuilder &); + +private: + RowPolicyFilterPtr row_policy_filter_ptr; + std::string filter_column_name; // complex filter, may contain logic operations + ActionsDAGPtr actions_dag; + ExpressionActionsPtr filter_actions; + StorageMetadataPtr storage_metadata_snapshot; +}; + + ReadFromMerge::ReadFromMerge( Block common_header_, StorageListWithLocks selected_tables_, @@ -438,8 +473,6 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu query_info.input_order_info = input_sorting_info; } - auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock(); - std::vector> pipelines; QueryPlanResourceHolder resources; @@ -460,70 +493,13 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu if (sampling_requested && !storage->supportsSampling()) throw Exception(ErrorCodes::SAMPLING_NOT_SUPPORTED, "Illegal SAMPLE: table doesn't support sampling"); - Aliases aliases; - auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr(); - auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context); - - auto modified_query_info = getModifiedQueryInfo(query_info, context, table, nested_storage_snaphsot); - Names column_names_as_aliases; - - if (!context->getSettingsRef().allow_experimental_analyzer) - { - auto storage_columns = storage_metadata_snapshot->getColumns(); - auto syntax_result = TreeRewriter(context).analyzeSelect( - modified_query_info.query, TreeRewriterResult({}, storage, nested_storage_snaphsot)); - - bool with_aliases = common_processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty(); - if (with_aliases) - { - ASTPtr required_columns_expr_list = std::make_shared(); - ASTPtr column_expr; - - for (const auto & column : column_names) - { - const auto column_default = storage_columns.getDefault(column); - bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias; - - if (is_alias) - { - column_expr = column_default->expression->clone(); - replaceAliasColumnsInQuery(column_expr, storage_metadata_snapshot->getColumns(), - syntax_result->array_join_result_to_source, context); - - auto column_description = storage_columns.get(column); - column_expr = addTypeConversionToAST(std::move(column_expr), column_description.type->getName(), - storage_metadata_snapshot->getColumns().getAll(), context); - column_expr = setAlias(column_expr, column); - - auto type = sample_block.getByName(column).type; - aliases.push_back({ .name = column, .type = type, .expression = column_expr->clone() }); - } - else - column_expr = std::make_shared(column); - - required_columns_expr_list->children.emplace_back(std::move(column_expr)); - } - - syntax_result = TreeRewriter(context).analyze( - required_columns_expr_list, storage_columns.getAllPhysical(), storage, storage->getStorageSnapshot(storage_metadata_snapshot, context)); - - auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, context).getActionsDAG(true); - - column_names_as_aliases = alias_actions->getRequiredColumns().getNames(); - if (column_names_as_aliases.empty()) - column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()).name); - } - } - auto source_pipeline = createSources( - nested_storage_snaphsot, - modified_query_info, common_processed_stage, required_max_block_size, common_header, - aliases, table, - column_names_as_aliases.empty() ? column_names : column_names_as_aliases, + column_names, + merge_storage_snapshot->getMetadataForQuery()->getSampleBlock(), context, current_streams); @@ -616,22 +592,111 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & quer return modified_query_info; } +void ReadFromMerge::processAliases( + Names & real_column_names, + const StorageWithLockAndName & storage_with_lock, + Aliases & aliases, + const Block & sample_block, + ContextMutablePtr modified_context) +{ + auto storage = std::get<1>(storage_with_lock); + auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr(); + auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, modified_context); + + auto modified_query_info = getModifiedQueryInfo(query_info, context, storage_with_lock, nested_storage_snaphsot); + Names column_names_as_aliases; + + if (!context->getSettingsRef().allow_experimental_analyzer) + { + auto storage_columns = storage_metadata_snapshot->getColumns(); + auto syntax_result = TreeRewriter(context).analyzeSelect( + modified_query_info.query, TreeRewriterResult({}, storage, nested_storage_snaphsot)); + + bool with_aliases = common_processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty(); + if (with_aliases) + { + ASTPtr required_columns_expr_list = std::make_shared(); + ASTPtr column_expr; + + for (const auto & column : real_column_names) + { + const auto column_default = storage_columns.getDefault(column); + bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias; + + if (is_alias) + { + column_expr = column_default->expression->clone(); + replaceAliasColumnsInQuery(column_expr, storage_metadata_snapshot->getColumns(), + syntax_result->array_join_result_to_source, context); + + const auto & column_description = storage_columns.get(column); + column_expr = addTypeConversionToAST(std::move(column_expr), column_description.type->getName(), + storage_metadata_snapshot->getColumns().getAll(), context); + column_expr = setAlias(column_expr, column); + + /// use storage type for transient columns that are not represented in result + /// e.g. for columns that needed to evaluate row policy + auto type = sample_block.has(column) ? sample_block.getByName(column).type : column_description.type; + + aliases.push_back({ .name = column, .type = type, .expression = column_expr->clone() }); + } + else + column_expr = std::make_shared(column); + + required_columns_expr_list->children.emplace_back(std::move(column_expr)); + } + + syntax_result = TreeRewriter(context).analyze( + required_columns_expr_list, storage_columns.getAllPhysical(), storage, storage->getStorageSnapshot(storage_metadata_snapshot, context)); + + auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, context).getActionsDAG(true); + + column_names_as_aliases = alias_actions->getRequiredColumns().getNames(); + + if (column_names_as_aliases.empty()) + column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()).name); + } + } + if (!column_names_as_aliases.empty()) + { + real_column_names = column_names_as_aliases; + } +} + + QueryPipelineBuilderPtr ReadFromMerge::createSources( - const StorageSnapshotPtr & storage_snapshot, - SelectQueryInfo & modified_query_info, - const QueryProcessingStage::Enum & processed_stage, - const UInt64 max_block_size, + QueryProcessingStage::Enum processed_stage, + UInt64 max_block_size, const Block & header, - const Aliases & aliases, const StorageWithLockAndName & storage_with_lock, Names real_column_names, + const Block & sample_block, ContextMutablePtr modified_context, size_t streams_num, bool concat_streams) { const auto & [database_name, storage, _, table_name] = storage_with_lock; + auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr(); + auto storage_snapshot = storage->getStorageSnapshot(storage_metadata_snapshot, context); + auto modified_query_info = getModifiedQueryInfo(query_info, context, storage_with_lock, storage_snapshot); + auto & modified_select = modified_query_info.query->as(); + std::unique_ptr row_policy_data_ptr; + + auto row_policy_filter_ptr = context->getRowPolicyFilter( + database_name, + table_name, + RowPolicyFilterType::SELECT_FILTER); + if (row_policy_filter_ptr) + { + row_policy_data_ptr = std::make_unique(row_policy_filter_ptr, storage, context); + row_policy_data_ptr->extendNames(real_column_names); + } + + Aliases aliases; + processAliases(real_column_names, storage_with_lock, aliases, sample_block, modified_context); + QueryPipelineBuilderPtr builder; if (!InterpreterSelectQuery::isQueryWithFinal(modified_query_info) && storage->needRewriteQueryWithFinal(real_column_names)) { @@ -688,6 +753,14 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( if (!plan.isInitialized()) return {}; + if (row_policy_data_ptr) + { + if (auto * source_step_with_filter = dynamic_cast((plan.getRootNode()->step.get()))) + { + row_policy_data_ptr->addStorageFilter(source_step_with_filter); + } + } + if (auto * read_from_merge_tree = typeid_cast(plan.getRootNode()->step.get())) { size_t filters_dags_size = filter_dags.size(); @@ -783,12 +856,84 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources( /// Subordinary tables could have different but convertible types, like numeric types of different width. /// We must return streams with structure equals to structure of Merge table. - convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder, processed_stage); + convertAndFilterSourceStream(header, + storage_snapshot->metadata, + aliases, std::move(row_policy_data_ptr), + modified_context, + *builder, + processed_stage); } return builder; } +ReadFromMerge::RowPolicyData::RowPolicyData(RowPolicyFilterPtr row_policy_filter_ptr_, + std::shared_ptr storage, + ContextPtr local_context) + : row_policy_filter_ptr(row_policy_filter_ptr_) +{ + storage_metadata_snapshot = storage->getInMemoryMetadataPtr(); + auto storage_columns = storage_metadata_snapshot->getColumns(); + auto needed_columns = storage_columns.getAll/*Physical*/(); + + ASTPtr expr = row_policy_filter_ptr->expression; + + auto syntax_result = TreeRewriter(local_context).analyze(expr, + needed_columns /*, + storage, + storage->getStorageSnapshot(storage_metadata_snapshot, local_context)*/); + auto expression_analyzer = ExpressionAnalyzer{expr, syntax_result, local_context}; + + actions_dag = expression_analyzer.getActionsDAG(false /* add_aliases */, false /* project_result */); + filter_actions = std::make_shared(actions_dag, + ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes)); + const auto & required_columns = filter_actions->getRequiredColumnsWithTypes(); + const auto & sample_block_columns = filter_actions->getSampleBlock().getNamesAndTypesList(); + + NamesAndTypesList added, deleted; + sample_block_columns.getDifference(required_columns, added, deleted); + if (!deleted.empty() || added.size() != 1) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot determine row level filter; {} columns deleted, {} columns added", + deleted.size(), added.size()); + } + + filter_column_name = added.getNames().front(); +} + +void ReadFromMerge::RowPolicyData::extendNames(Names & names) +{ + boost::container::flat_set names_set(names.begin(), names.end()); + NameSet added_names; + + for (const auto & req_column : filter_actions->getRequiredColumns()) + { + if (!names_set.contains(req_column)) + { + added_names.emplace(req_column); + } + } + + if (!added_names.empty()) + { + std::copy(added_names.begin(), added_names.end(), std::back_inserter(names)); + } +} + +void ReadFromMerge::RowPolicyData::addStorageFilter(SourceStepWithFilter * step) +{ + step->addFilter(actions_dag, filter_column_name); +} + +void ReadFromMerge::RowPolicyData::addFilterTransform(QueryPipelineBuilder & builder) +{ + builder.addSimpleTransform([&](const Block & stream_header) + { + return std::make_shared(stream_header, filter_actions, filter_column_name, true /* remove filter column */); + }); +} + StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables( ContextPtr query_context, const ASTPtr & query /* = nullptr */, @@ -959,13 +1104,14 @@ void StorageMerge::alter( setInMemoryMetadata(storage_metadata); } -void ReadFromMerge::convertingSourceStream( +void ReadFromMerge::convertAndFilterSourceStream( const Block & header, const StorageMetadataPtr & metadata_snapshot, const Aliases & aliases, + std::unique_ptr row_policy_data_ptr, ContextPtr local_context, QueryPipelineBuilder & builder, - const QueryProcessingStage::Enum & processed_stage) + QueryProcessingStage::Enum processed_stage) { Block before_block_header = builder.getHeader(); @@ -994,6 +1140,11 @@ void ReadFromMerge::convertingSourceStream( if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns) convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position; + if (row_policy_data_ptr) + { + row_policy_data_ptr->addFilterTransform(builder); + } + auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(), header.getColumnsWithTypeAndName(), convert_actions_match_columns_mode); diff --git a/src/Storages/StorageMerge.h b/src/Storages/StorageMerge.h index babf0dd92e86..d6b4c5d8ad2f 100644 --- a/src/Storages/StorageMerge.h +++ b/src/Storages/StorageMerge.h @@ -170,38 +170,49 @@ class ReadFromMerge final : public SourceStepWithFilter struct AliasData { - String name; - DataTypePtr type; - ASTPtr expression; + String name; /// "size" in "size String Alias formatReadableSize(size_bytes)" + DataTypePtr type; /// String in "size String Alias formatReadableSize(size_bytes)", or something different came from query + ASTPtr expression; /// formatReadableSize(size_bytes) in "size String Alias formatReadableSize(size_bytes)" }; using Aliases = std::vector; + class RowPolicyData; + static SelectQueryInfo getModifiedQueryInfo(const SelectQueryInfo & query_info, const ContextPtr & modified_context, const StorageWithLockAndName & storage_with_lock_and_name, const StorageSnapshotPtr & storage_snapshot); + /// Populates AliasData structures for further processing + /// using types from result query if possible + /// and removes alias columns from real_column_names + void processAliases( + Names & real_column_names, + const StorageWithLockAndName & storage_with_lock, + Aliases & aliases, + const Block & sample_block, + ContextMutablePtr modified_context); + QueryPipelineBuilderPtr createSources( - const StorageSnapshotPtr & storage_snapshot, - SelectQueryInfo & query_info, - const QueryProcessingStage::Enum & processed_stage, + QueryProcessingStage::Enum processed_stage, UInt64 max_block_size, const Block & header, - const Aliases & aliases, const StorageWithLockAndName & storage_with_lock, Names real_column_names, + const Block & sample_block, ContextMutablePtr modified_context, size_t streams_num, bool concat_streams = false); - static void convertingSourceStream( + static void convertAndFilterSourceStream( const Block & header, const StorageMetadataPtr & metadata_snapshot, const Aliases & aliases, + std::unique_ptr row_policy_data_ptr, ContextPtr context, QueryPipelineBuilder & builder, - const QueryProcessingStage::Enum & processed_stage); + QueryProcessingStage::Enum processed_stage); }; } diff --git a/tests/queries/0_stateless/02763_row_policy_storage_merge.reference b/tests/queries/0_stateless/02763_row_policy_storage_merge.reference new file mode 100644 index 000000000000..9fa5612e7cd4 --- /dev/null +++ b/tests/queries/0_stateless/02763_row_policy_storage_merge.reference @@ -0,0 +1,314 @@ +SELECT * FROM 02763_merge_log_1 ORDER BY x +1 11 +2 12 +3 13 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge) ORDER BY x +1 11 +1 11 +1 11 +1 11 +2 12 +2 12 +2 12 +2 12 +3 13 +3 13 +3 13 +3 13 +4 14 +4 14 +4 14 +4 14 +SETTINGS optimize_move_to_prewhere= 0 +SELECT * FROM 02763_merge_log_1 +3 13 +SELECT * FROM merge(currentDatabase(), 02763_merge_log_1) +3 13 +SELECT * FROM merge(currentDatabase(), 02763_merge_log) +1 11 +2 12 +3 13 +3 13 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>2 +3 13 +3 13 +4 14 +SELECT * FROM 02763_merge_merge_1 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_merge_1) +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_merge) +1 11 +2 12 +3 13 +4 14 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>2 +3 13 +4 14 +4 14 +SELECT * FROM engine_merge_12 WHERE x>2 +3 13 +4 14 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge) +1 11 +1 11 +2 12 +2 12 +3 13 +3 13 +3 13 +4 14 +4 14 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge) WHERE x>2 +3 13 +3 13 +3 13 +4 14 +4 14 +4 14 +aaa 6 39 +aaa 6 39 +aaa 6 39 +aaa 8 42 +aaa 8 42 +aaa 8 42 +3 +3 +3 +4 +4 +4 +SELECT * FROM merge(...) LEFT JOIN merge(...) +3 13 13 +3 13 13 +4 14 14 +4 14 14 +SELECT * FROM merge(...) UNION ALL SELECT * FROM merge(...) +1 11 +1 11 +2 12 +2 12 +3 13 +3 13 +3 13 +4 14 +4 14 +4 14 +SELECT x, SUM(x) FROM (SELECT * FROM merge(...) UNION ALL ...) GROUP BY x +1 22 +2 24 +3 39 +4 42 +1 11 0 +2 12 0 +3 13 0 +4 14 1 +4 14 1 +SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>1 -- with y>12 +2 12 +3 13 +3 13 +4 14 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>1 -- with y>12 +2 12 +3 13 +3 13 +4 14 +4 14 +2 12 0 +3 13 1 +3 13 1 +4 14 1 +4 14 1 +SELECT y from merge(currentDatabase(), 02763_merge) +11 +11 +12 +12 +13 +13 +13 +13 +14 +14 +14 +14 +02763_merge_fancycols +SELECT * +SELECT x, lc +SELECT * +1 11 111 111 42 +1 11 111 111 42 +SELECT x, lc +1 111 +1 111 +SELECT x, lc, cnst +1 111 42 +1 111 42 +SELECT x, y from merge(currentDatabase(), 02763_merge +1 11 +1 11 +1 11 +1 11 +2 12 +2 12 +3 13 +3 13 +3 13 +3 13 +4 14 +4 14 +4 14 +4 14 +SETTINGS optimize_move_to_prewhere= 1 +SELECT * FROM 02763_merge_log_1 +3 13 +SELECT * FROM merge(currentDatabase(), 02763_merge_log_1) +3 13 +SELECT * FROM merge(currentDatabase(), 02763_merge_log) +1 11 +2 12 +3 13 +3 13 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>2 +3 13 +3 13 +4 14 +SELECT * FROM 02763_merge_merge_1 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_merge_1) +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_merge) +1 11 +2 12 +3 13 +4 14 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>2 +3 13 +4 14 +4 14 +SELECT * FROM engine_merge_12 WHERE x>2 +3 13 +4 14 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge) +1 11 +1 11 +2 12 +2 12 +3 13 +3 13 +3 13 +4 14 +4 14 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge) WHERE x>2 +3 13 +3 13 +3 13 +4 14 +4 14 +4 14 +aaa 6 39 +aaa 6 39 +aaa 6 39 +aaa 8 42 +aaa 8 42 +aaa 8 42 +3 +3 +3 +4 +4 +4 +SELECT * FROM merge(...) LEFT JOIN merge(...) +3 13 13 +3 13 13 +4 14 14 +4 14 14 +SELECT * FROM merge(...) UNION ALL SELECT * FROM merge(...) +1 11 +1 11 +2 12 +2 12 +3 13 +3 13 +3 13 +4 14 +4 14 +4 14 +SELECT x, SUM(x) FROM (SELECT * FROM merge(...) UNION ALL ...) GROUP BY x +1 22 +2 24 +3 39 +4 42 +1 11 0 +2 12 0 +3 13 0 +4 14 1 +4 14 1 +SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>1 -- with y>12 +2 12 +3 13 +3 13 +4 14 +4 14 +SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>1 -- with y>12 +2 12 +3 13 +3 13 +4 14 +4 14 +2 12 0 +3 13 1 +3 13 1 +4 14 1 +4 14 1 +SELECT y from merge(currentDatabase(), 02763_merge) +11 +11 +12 +12 +13 +13 +13 +13 +14 +14 +14 +14 +02763_merge_fancycols +SELECT * +SELECT x, lc +SELECT * +1 11 111 111 42 +1 11 111 111 42 +SELECT x, lc +1 111 +1 111 +SELECT x, lc, cnst +1 111 42 +1 111 42 +SELECT x, y from merge(currentDatabase(), 02763_merge +1 11 +1 11 +1 11 +1 11 +2 12 +2 12 +3 13 +3 13 +3 13 +3 13 +4 14 +4 14 +4 14 +4 14 diff --git a/tests/queries/0_stateless/02763_row_policy_storage_merge.sql.j2 b/tests/queries/0_stateless/02763_row_policy_storage_merge.sql.j2 new file mode 100644 index 000000000000..0263e1a974f0 --- /dev/null +++ b/tests/queries/0_stateless/02763_row_policy_storage_merge.sql.j2 @@ -0,0 +1,143 @@ +DROP TABLE IF EXISTS 02763_merge_log_1; +DROP TABLE IF EXISTS 02763_merge_log_2; +DROP TABLE IF EXISTS 02763_merge_merge_1; +DROP TABLE IF EXISTS 02763_merge_merge_2; +DROP TABLE IF EXISTS 02763_merge_fancycols; +DROP ROW POLICY IF EXISTS 02763_filter_1 ON 02763_merge_log_1; +DROP ROW POLICY IF EXISTS 02763_filter_2 ON 02763_merge_merge_1; +DROP ROW POLICY IF EXISTS 02763_filter_3 ON 02763_merge_log_1; +DROP ROW POLICY IF EXISTS 02763_filter_4 ON 02763_merge_merge_1; +DROP ROW POLICY IF EXISTS 02763_filter_5 ON 02763_merge_fancycols; +DROP ROW POLICY IF EXISTS 02763_filter_6 ON 02763_merge_fancycols; + + +CREATE TABLE 02763_merge_log_1 (x UInt8, y UInt64) ENGINE = Log; +CREATE TABLE 02763_merge_log_2 (x UInt8, y UInt64) ENGINE = Log; + +CREATE TABLE 02763_merge_merge_1 (x UInt8, y UInt64) ENGINE = MergeTree ORDER BY x; +CREATE TABLE 02763_merge_merge_2 (x UInt8, y UInt64) ENGINE = MergeTree ORDER BY x; + +CREATE TABLE 02763_engine_merge_12 (x UInt8, y UInt64) ENGINE = Merge(currentDatabase(), '02763_merge_merge'); + +INSERT INTO 02763_merge_log_1 VALUES (1, 11), (2, 12), (3, 13), (4, 14); +INSERT INTO 02763_merge_log_2 VALUES (1, 11), (2, 12), (3, 13), (4, 14); +INSERT INTO 02763_merge_merge_1 VALUES (1, 11), (2, 12), (3, 13), (4, 14); +INSERT INTO 02763_merge_merge_2 VALUES (1, 11), (2, 12), (3, 13), (4, 14); + +SELECT 'SELECT * FROM 02763_merge_log_1 ORDER BY x'; +SELECT * FROM 02763_merge_log_1 ORDER BY x; + +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge) ORDER BY x'; +SELECT * FROM merge(currentDatabase(), '02763_merge') ORDER BY x; + + +{% for prew in [0 , 1] -%} + +SELECT 'SETTINGS optimize_move_to_prewhere= {{prew}}'; + +CREATE ROW POLICY 02763_filter_1 ON 02763_merge_log_1 USING x=3 AS permissive TO ALL; + +SELECT 'SELECT * FROM 02763_merge_log_1'; +SELECT * FROM 02763_merge_log_1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_log_1)'; +SELECT * FROM merge(currentDatabase(), '02763_merge_log_1') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_log)'; +SELECT * FROM merge(currentDatabase(), '02763_merge_log') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>2'; +SELECT * FROM merge(currentDatabase(), '02763_merge_log') WHERE x>2 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +CREATE ROW POLICY 02763_filter_2 ON 02763_merge_merge_1 USING x=4 AS permissive TO ALL; + +SELECT 'SELECT * FROM 02763_merge_merge_1'; +SELECT * FROM 02763_merge_merge_1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_merge_1)'; +SELECT * FROM merge(currentDatabase(), '02763_merge_merge_1') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_merge)'; +SELECT * FROM merge(currentDatabase(), '02763_merge_merge') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>2'; +SELECT * FROM merge(currentDatabase(), '02763_merge_merge') WHERE x>2 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + + +SELECT 'SELECT * FROM engine_merge_12 WHERE x>2'; +SELECT * FROM 02763_engine_merge_12 WHERE x>2 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + + +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge)'; +SELECT * FROM merge(currentDatabase(), '02763_merge') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge) WHERE x>2'; +SELECT * FROM merge(currentDatabase(), '02763_merge') WHERE x>2 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +SELECT 'aaa', x*2 as x_2, y*3 as y_3 FROM merge(currentDatabase(), '02763_merge') WHERE x>2 ORDER BY x_2 SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT x FROM (SELECT * FROM merge(currentDatabase(), '02763_merge') WHERE x IN (3,4)) ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +SELECT 'SELECT * FROM merge(...) LEFT JOIN merge(...)'; +SELECT * FROM merge(currentDatabase(), '02763_merge.*1') as a +LEFT JOIN +merge(currentDatabase(), '02763_merge.*2') as b +USING (x) +ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +SELECT 'SELECT * FROM merge(...) UNION ALL SELECT * FROM merge(...)'; +SELECT * FROM +( +SELECT * FROM merge(currentDatabase(), '02763_merge.*1') +UNION ALL +SELECT * FROM merge(currentDatabase(), '02763_merge.*2') +) +ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +SELECT 'SELECT x, SUM(x) FROM (SELECT * FROM merge(...) UNION ALL ...) GROUP BY x'; +SELECT x, SUM(y) FROM +(SELECT * FROM merge(currentDatabase(), '02763_merge.*1') +UNION ALL +SELECT * FROM merge(currentDatabase(), '02763_merge.*2')) +GROUP BY x +ORDER BY x; + +SELECT *, x=4 FROM merge(currentDatabase(), '02763_merge_merge') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +CREATE ROW POLICY 02763_filter_3 ON 02763_merge_log_1 USING y>12 AS permissive TO ALL; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_log) WHERE x>1 -- with y>12'; +SELECT * FROM merge(currentDatabase(), '02763_merge_log') WHERE x>1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +CREATE ROW POLICY 02763_filter_4 ON 02763_merge_merge_1 USING y>12 AS permissive TO ALL; +SELECT 'SELECT * FROM merge(currentDatabase(), 02763_merge_merge) WHERE x>1 -- with y>12'; +SELECT * FROM merge(currentDatabase(), '02763_merge_merge') WHERE x>1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +SELECT *, (x=4 OR y>12) FROM merge(currentDatabase(), '02763_merge_merge') WHERE x>1 ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +SELECT 'SELECT y from merge(currentDatabase(), 02763_merge)'; +SELECT y from merge(currentDatabase(), '02763_merge') ORDER BY y SETTINGS optimize_move_to_prewhere= {{prew}}; + +SELECT '02763_merge_fancycols'; +CREATE TABLE 02763_merge_fancycols (x UInt8, y Nullable(UInt64), z String DEFAULT CONCAT(toString(x), toString(y)), lc LowCardinality(String) DEFAULT z, cnst UInt32 MATERIALIZED 42) ENGINE = MergeTree() ORDER BY tuple(); +INSERT INTO 02763_merge_fancycols (x, y) SELECT x, y from merge(currentDatabase(), '02763_merge'); + +CREATE ROW POLICY 02763_filter_5 ON 02763_merge_fancycols USING cnst<>42 AS permissive TO ALL; +SELECT 'SELECT *'; +SELECT * from merge(currentDatabase(), '02763_merge_fancycols') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT x, lc'; +SELECT x, lc from merge(currentDatabase(), '02763_merge_fancycols') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +CREATE ROW POLICY 02763_filter_6 ON 02763_merge_fancycols USING lc='111' AS permissive TO ALL; +SELECT 'SELECT *'; +SELECT * from merge(currentDatabase(), '02763_merge_fancycols') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT x, lc'; +SELECT x, lc from merge(currentDatabase(), '02763_merge_fancycols') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT x, lc, cnst'; +SELECT x, lc, cnst from merge(currentDatabase(), '02763_merge_fancycols') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT x, y from merge(currentDatabase(), 02763_merge'; +SELECT x, y from merge(currentDatabase(), '02763_merge') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +DROP TABLE 02763_merge_fancycols; + +DROP ROW POLICY 02763_filter_1 ON 02763_merge_log_1; +DROP ROW POLICY 02763_filter_2 ON 02763_merge_merge_1; + +DROP ROW POLICY 02763_filter_3 ON 02763_merge_log_1; +DROP ROW POLICY 02763_filter_4 ON 02763_merge_merge_1; + +DROP ROW POLICY 02763_filter_5 ON 02763_merge_fancycols; +DROP ROW POLICY 02763_filter_6 ON 02763_merge_fancycols; + +{% endfor %} diff --git a/tests/queries/0_stateless/02763_row_policy_storage_merge_alias.reference b/tests/queries/0_stateless/02763_row_policy_storage_merge_alias.reference new file mode 100644 index 000000000000..56bfdbe0b18c --- /dev/null +++ b/tests/queries/0_stateless/02763_row_policy_storage_merge_alias.reference @@ -0,0 +1,49 @@ +02763_merge_aliases +x, y, z FROM 02763_a_merge +3 13 16 +4 14 18 +* FROM 02763_a_merge +3 13 16 +4 14 18 +x, y FROM 02763_a_merge +3 13 +4 14 +SELECT x, y FROM merge(currentDatabase(), 02763_alias) +3 13 +4 14 +SELECT x, y FROM merge(currentDatabase(), 02763_alias) +2 12 +3 13 +4 14 +SELECT x FROM merge(currentDatabase(), 02763_alias) +12 +13 +14 +SELECT y FROM merge(currentDatabase(), 02763_alias) +2 +3 +4 +x, y, z FROM 02763_a_merge +3 13 16 +4 14 18 +* FROM 02763_a_merge +3 13 16 +4 14 18 +x, y FROM 02763_a_merge +3 13 +4 14 +SELECT x, y FROM merge(currentDatabase(), 02763_alias) +3 13 +4 14 +SELECT x, y FROM merge(currentDatabase(), 02763_alias) +2 12 +3 13 +4 14 +SELECT x FROM merge(currentDatabase(), 02763_alias) +12 +13 +14 +SELECT y FROM merge(currentDatabase(), 02763_alias) +2 +3 +4 diff --git a/tests/queries/0_stateless/02763_row_policy_storage_merge_alias.sql.j2 b/tests/queries/0_stateless/02763_row_policy_storage_merge_alias.sql.j2 new file mode 100644 index 000000000000..bdd456951ddd --- /dev/null +++ b/tests/queries/0_stateless/02763_row_policy_storage_merge_alias.sql.j2 @@ -0,0 +1,41 @@ +DROP TABLE IF EXISTS 02763_alias; +DROP TABLE IF EXISTS 02763_a_merge; + + +SELECT '02763_merge_aliases'; +CREATE TABLE 02763_alias (x UInt8, y UInt64, z UInt64 ALIAS plus(x,y)) ENGINE = MergeTree ORDER BY x; +INSERT INTO 02763_alias VALUES (1, 11), (2, 12), (3, 13), (4, 14); + +CREATE ROW POLICY 02763_filter_7 ON 02763_alias USING z>15 AS permissive TO ALL; + +CREATE TABLE 02763_a_merge (x UInt8, y UInt64, z UInt64) ENGINE = Merge(currentDatabase(), '02763_alias'); + +{% for prew in [0 , 1] -%} + + + +SELECT 'x, y, z FROM 02763_a_merge'; +SELECT x, y, z FROM 02763_a_merge ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT '* FROM 02763_a_merge'; +SELECT * FROM 02763_a_merge ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'x, y FROM 02763_a_merge'; +SELECT x, y FROM 02763_a_merge ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT x, y FROM merge(currentDatabase(), 02763_alias)'; +SELECT x, y FROM merge(currentDatabase(), '02763_alias') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +CREATE ROW POLICY 02763_filter_8 ON 02763_alias USING y>11 AS permissive TO ALL; + +SELECT 'SELECT x, y FROM merge(currentDatabase(), 02763_alias)'; +SELECT x, y FROM merge(currentDatabase(), '02763_alias') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT x FROM merge(currentDatabase(), 02763_alias)'; +SELECT y FROM merge(currentDatabase(), '02763_alias') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; +SELECT 'SELECT y FROM merge(currentDatabase(), 02763_alias)'; +SELECT x FROM merge(currentDatabase(), '02763_alias') ORDER BY x SETTINGS optimize_move_to_prewhere= {{prew}}; + +DROP ROW POLICY 02763_filter_8 ON 02763_alias; +{% endfor %} + +DROP TABLE 02763_alias; +DROP TABLE 02763_a_merge; + +DROP ROW POLICY 02763_filter_7 ON 02763_alias;