Skip to content

Commit

Permalink
Merge pull request #446 from Altinity/backports/23.8/50209_engine_mer…
Browse files Browse the repository at this point in the history
…ge_obeys_row_policy

23.8 Backport of ClickHouse#50209 - Engine merge obeys row policy
  • Loading branch information
Enmk authored Sep 12, 2024
2 parents 4b15687 + d9495d4 commit 7f88020
Show file tree
Hide file tree
Showing 6 changed files with 787 additions and 78 deletions.
289 changes: 220 additions & 69 deletions src/Storages/StorageMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
#include <Processors/Sources/NullSource.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Databases/IDatabase.h>
Expand Down Expand Up @@ -77,6 +79,7 @@ namespace ErrorCodes
extern const int SAMPLING_NOT_SUPPORTED;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int LOGICAL_ERROR;
}

StorageMerge::StorageMerge(
Expand Down Expand Up @@ -371,6 +374,38 @@ void StorageMerge::read(
query_plan.addStep(std::move(step));
}

/// A transient object of this helper class is created
/// when processing a Merge table data source (subordinary table)
/// that has row policies
/// to guarantee that these row policies are applied
class ReadFromMerge::RowPolicyData
{
public:
RowPolicyData(RowPolicyFilterPtr, std::shared_ptr<DB::IStorage>, ContextPtr);

/// Add to data stream columns that are needed only for row policies
/// SELECT x from T if T has row policy y=42
/// required y in data pipeline
void extendNames(Names &);

/// Use storage facilities to filter data
/// optimization
/// does not guarantee accuracy, but reduces number of rows
void addStorageFilter(SourceStepWithFilter *);

/// Create explicit filter transform to exclude
/// rows that are not conform to row level policy
void addFilterTransform(QueryPipelineBuilder &);

private:
RowPolicyFilterPtr row_policy_filter_ptr;
std::string filter_column_name; // complex filter, may contain logic operations
ActionsDAGPtr actions_dag;
ExpressionActionsPtr filter_actions;
StorageMetadataPtr storage_metadata_snapshot;
};


ReadFromMerge::ReadFromMerge(
Block common_header_,
StorageListWithLocks selected_tables_,
Expand Down Expand Up @@ -438,8 +473,6 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
query_info.input_order_info = input_sorting_info;
}

auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock();

std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines;
QueryPlanResourceHolder resources;

Expand All @@ -460,70 +493,13 @@ void ReadFromMerge::initializePipeline(QueryPipelineBuilder & pipeline, const Bu
if (sampling_requested && !storage->supportsSampling())
throw Exception(ErrorCodes::SAMPLING_NOT_SUPPORTED, "Illegal SAMPLE: table doesn't support sampling");

Aliases aliases;
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, context);

auto modified_query_info = getModifiedQueryInfo(query_info, context, table, nested_storage_snaphsot);
Names column_names_as_aliases;

if (!context->getSettingsRef().allow_experimental_analyzer)
{
auto storage_columns = storage_metadata_snapshot->getColumns();
auto syntax_result = TreeRewriter(context).analyzeSelect(
modified_query_info.query, TreeRewriterResult({}, storage, nested_storage_snaphsot));

bool with_aliases = common_processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty();
if (with_aliases)
{
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
ASTPtr column_expr;

for (const auto & column : column_names)
{
const auto column_default = storage_columns.getDefault(column);
bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias;

if (is_alias)
{
column_expr = column_default->expression->clone();
replaceAliasColumnsInQuery(column_expr, storage_metadata_snapshot->getColumns(),
syntax_result->array_join_result_to_source, context);

auto column_description = storage_columns.get(column);
column_expr = addTypeConversionToAST(std::move(column_expr), column_description.type->getName(),
storage_metadata_snapshot->getColumns().getAll(), context);
column_expr = setAlias(column_expr, column);

auto type = sample_block.getByName(column).type;
aliases.push_back({ .name = column, .type = type, .expression = column_expr->clone() });
}
else
column_expr = std::make_shared<ASTIdentifier>(column);

required_columns_expr_list->children.emplace_back(std::move(column_expr));
}

syntax_result = TreeRewriter(context).analyze(
required_columns_expr_list, storage_columns.getAllPhysical(), storage, storage->getStorageSnapshot(storage_metadata_snapshot, context));

auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, context).getActionsDAG(true);

column_names_as_aliases = alias_actions->getRequiredColumns().getNames();
if (column_names_as_aliases.empty())
column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()).name);
}
}

auto source_pipeline = createSources(
nested_storage_snaphsot,
modified_query_info,
common_processed_stage,
required_max_block_size,
common_header,
aliases,
table,
column_names_as_aliases.empty() ? column_names : column_names_as_aliases,
column_names,
merge_storage_snapshot->getMetadataForQuery()->getSampleBlock(),
context,
current_streams);

Expand Down Expand Up @@ -616,22 +592,111 @@ SelectQueryInfo ReadFromMerge::getModifiedQueryInfo(const SelectQueryInfo & quer
return modified_query_info;
}

void ReadFromMerge::processAliases(
Names & real_column_names,
const StorageWithLockAndName & storage_with_lock,
Aliases & aliases,
const Block & sample_block,
ContextMutablePtr modified_context)
{
auto storage = std::get<1>(storage_with_lock);
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, modified_context);

auto modified_query_info = getModifiedQueryInfo(query_info, context, storage_with_lock, nested_storage_snaphsot);
Names column_names_as_aliases;

if (!context->getSettingsRef().allow_experimental_analyzer)
{
auto storage_columns = storage_metadata_snapshot->getColumns();
auto syntax_result = TreeRewriter(context).analyzeSelect(
modified_query_info.query, TreeRewriterResult({}, storage, nested_storage_snaphsot));

bool with_aliases = common_processed_stage == QueryProcessingStage::FetchColumns && !storage_columns.getAliases().empty();
if (with_aliases)
{
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
ASTPtr column_expr;

for (const auto & column : real_column_names)
{
const auto column_default = storage_columns.getDefault(column);
bool is_alias = column_default && column_default->kind == ColumnDefaultKind::Alias;

if (is_alias)
{
column_expr = column_default->expression->clone();
replaceAliasColumnsInQuery(column_expr, storage_metadata_snapshot->getColumns(),
syntax_result->array_join_result_to_source, context);

const auto & column_description = storage_columns.get(column);
column_expr = addTypeConversionToAST(std::move(column_expr), column_description.type->getName(),
storage_metadata_snapshot->getColumns().getAll(), context);
column_expr = setAlias(column_expr, column);

/// use storage type for transient columns that are not represented in result
/// e.g. for columns that needed to evaluate row policy
auto type = sample_block.has(column) ? sample_block.getByName(column).type : column_description.type;

aliases.push_back({ .name = column, .type = type, .expression = column_expr->clone() });
}
else
column_expr = std::make_shared<ASTIdentifier>(column);

required_columns_expr_list->children.emplace_back(std::move(column_expr));
}

syntax_result = TreeRewriter(context).analyze(
required_columns_expr_list, storage_columns.getAllPhysical(), storage, storage->getStorageSnapshot(storage_metadata_snapshot, context));

auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, context).getActionsDAG(true);

column_names_as_aliases = alias_actions->getRequiredColumns().getNames();

if (column_names_as_aliases.empty())
column_names_as_aliases.push_back(ExpressionActions::getSmallestColumn(storage_metadata_snapshot->getColumns().getAllPhysical()).name);
}
}
if (!column_names_as_aliases.empty())
{
real_column_names = column_names_as_aliases;
}
}


QueryPipelineBuilderPtr ReadFromMerge::createSources(
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & modified_query_info,
const QueryProcessingStage::Enum & processed_stage,
const UInt64 max_block_size,
QueryProcessingStage::Enum processed_stage,
UInt64 max_block_size,
const Block & header,
const Aliases & aliases,
const StorageWithLockAndName & storage_with_lock,
Names real_column_names,
const Block & sample_block,
ContextMutablePtr modified_context,
size_t streams_num,
bool concat_streams)
{
const auto & [database_name, storage, _, table_name] = storage_with_lock;
auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto storage_snapshot = storage->getStorageSnapshot(storage_metadata_snapshot, context);
auto modified_query_info = getModifiedQueryInfo(query_info, context, storage_with_lock, storage_snapshot);

auto & modified_select = modified_query_info.query->as<ASTSelectQuery &>();

std::unique_ptr<RowPolicyData> row_policy_data_ptr;

auto row_policy_filter_ptr = context->getRowPolicyFilter(
database_name,
table_name,
RowPolicyFilterType::SELECT_FILTER);
if (row_policy_filter_ptr)
{
row_policy_data_ptr = std::make_unique<RowPolicyData>(row_policy_filter_ptr, storage, context);
row_policy_data_ptr->extendNames(real_column_names);
}

Aliases aliases;
processAliases(real_column_names, storage_with_lock, aliases, sample_block, modified_context);

QueryPipelineBuilderPtr builder;
if (!InterpreterSelectQuery::isQueryWithFinal(modified_query_info) && storage->needRewriteQueryWithFinal(real_column_names))
{
Expand Down Expand Up @@ -688,6 +753,14 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(
if (!plan.isInitialized())
return {};

if (row_policy_data_ptr)
{
if (auto * source_step_with_filter = dynamic_cast<SourceStepWithFilter*>((plan.getRootNode()->step.get())))
{
row_policy_data_ptr->addStorageFilter(source_step_with_filter);
}
}

if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(plan.getRootNode()->step.get()))
{
size_t filters_dags_size = filter_dags.size();
Expand Down Expand Up @@ -783,12 +856,84 @@ QueryPipelineBuilderPtr ReadFromMerge::createSources(

/// Subordinary tables could have different but convertible types, like numeric types of different width.
/// We must return streams with structure equals to structure of Merge table.
convertingSourceStream(header, storage_snapshot->metadata, aliases, modified_context, *builder, processed_stage);
convertAndFilterSourceStream(header,
storage_snapshot->metadata,
aliases, std::move(row_policy_data_ptr),
modified_context,
*builder,
processed_stage);
}

return builder;
}

ReadFromMerge::RowPolicyData::RowPolicyData(RowPolicyFilterPtr row_policy_filter_ptr_,
std::shared_ptr<DB::IStorage> storage,
ContextPtr local_context)
: row_policy_filter_ptr(row_policy_filter_ptr_)
{
storage_metadata_snapshot = storage->getInMemoryMetadataPtr();
auto storage_columns = storage_metadata_snapshot->getColumns();
auto needed_columns = storage_columns.getAll/*Physical*/();

ASTPtr expr = row_policy_filter_ptr->expression;

auto syntax_result = TreeRewriter(local_context).analyze(expr,
needed_columns /*,
storage,
storage->getStorageSnapshot(storage_metadata_snapshot, local_context)*/);
auto expression_analyzer = ExpressionAnalyzer{expr, syntax_result, local_context};

actions_dag = expression_analyzer.getActionsDAG(false /* add_aliases */, false /* project_result */);
filter_actions = std::make_shared<ExpressionActions>(actions_dag,
ExpressionActionsSettings::fromContext(local_context, CompileExpressions::yes));
const auto & required_columns = filter_actions->getRequiredColumnsWithTypes();
const auto & sample_block_columns = filter_actions->getSampleBlock().getNamesAndTypesList();

NamesAndTypesList added, deleted;
sample_block_columns.getDifference(required_columns, added, deleted);
if (!deleted.empty() || added.size() != 1)
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot determine row level filter; {} columns deleted, {} columns added",
deleted.size(), added.size());
}

filter_column_name = added.getNames().front();
}

void ReadFromMerge::RowPolicyData::extendNames(Names & names)
{
boost::container::flat_set<std::string_view> names_set(names.begin(), names.end());
NameSet added_names;

for (const auto & req_column : filter_actions->getRequiredColumns())
{
if (!names_set.contains(req_column))
{
added_names.emplace(req_column);
}
}

if (!added_names.empty())
{
std::copy(added_names.begin(), added_names.end(), std::back_inserter(names));
}
}

void ReadFromMerge::RowPolicyData::addStorageFilter(SourceStepWithFilter * step)
{
step->addFilter(actions_dag, filter_column_name);
}

void ReadFromMerge::RowPolicyData::addFilterTransform(QueryPipelineBuilder & builder)
{
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<FilterTransform>(stream_header, filter_actions, filter_column_name, true /* remove filter column */);
});
}

StorageMerge::StorageListWithLocks StorageMerge::getSelectedTables(
ContextPtr query_context,
const ASTPtr & query /* = nullptr */,
Expand Down Expand Up @@ -959,13 +1104,14 @@ void StorageMerge::alter(
setInMemoryMetadata(storage_metadata);
}

void ReadFromMerge::convertingSourceStream(
void ReadFromMerge::convertAndFilterSourceStream(
const Block & header,
const StorageMetadataPtr & metadata_snapshot,
const Aliases & aliases,
std::unique_ptr<RowPolicyData> row_policy_data_ptr,
ContextPtr local_context,
QueryPipelineBuilder & builder,
const QueryProcessingStage::Enum & processed_stage)
QueryProcessingStage::Enum processed_stage)
{
Block before_block_header = builder.getHeader();

Expand Down Expand Up @@ -994,6 +1140,11 @@ void ReadFromMerge::convertingSourceStream(
if (local_context->getSettingsRef().allow_experimental_analyzer && processed_stage != QueryProcessingStage::FetchColumns)
convert_actions_match_columns_mode = ActionsDAG::MatchColumnsMode::Position;

if (row_policy_data_ptr)
{
row_policy_data_ptr->addFilterTransform(builder);
}

auto convert_actions_dag = ActionsDAG::makeConvertingActions(builder.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
convert_actions_match_columns_mode);
Expand Down
Loading

0 comments on commit 7f88020

Please sign in to comment.