Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](schema-change) Fix wrong intput column for cast validity check (#38894) #39107

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/olap/column_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ struct ColumnMapping {
ColumnMapping() = default;
virtual ~ColumnMapping() = default;

bool has_reference() const { return expr != nullptr || ref_column >= 0; }
bool has_reference() const { return expr != nullptr || ref_column_idx >= 0; }

// <0: use default value
// >=0: use origin column
int32_t ref_column = -1;
int32_t ref_column_idx = -1;
// normally for default value. stores values for filters
WrapperField* default_value = nullptr;
std::shared_ptr<TExpr> expr;
Expand Down
149 changes: 87 additions & 62 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

#include "olap/schema_change.h"

#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <algorithm>
#include <exception>
#include <map>
Expand Down Expand Up @@ -279,51 +283,63 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns()));
}

const int row_size = ref_block->rows();
const int column_size = new_block->columns();
const int row_num = ref_block->rows();
const int new_schema_cols_num = new_block->columns();

// swap ref_block[key] and new_block[value]
// will be used for swaping ref_block[entry.first] and new_block[entry.second]
std::list<std::pair<int, int>> swap_idx_list;
for (int idx = 0; idx < column_size; idx++) {
if (_schema_mapping[idx].expr != nullptr) {
for (int idx = 0; idx < new_schema_cols_num; idx++) {
auto expr = _schema_mapping[idx].expr;
if (expr != nullptr) {
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_schema_mapping[idx].expr, ctx));
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*expr, ctx));
RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc));
RETURN_IF_ERROR(ctx->open(state.get()));

int result_column_id = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
if (ref_block->get_by_position(result_column_id).column == nullptr) {
int result_tmp_column_idx = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_tmp_column_idx));
auto& result_tmp_column_def = ref_block->get_by_position(result_tmp_column_idx);
if (result_tmp_column_def.column == nullptr) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"{} result column is nullptr",
ref_block->get_by_position(result_column_id).name);
"result column={} is nullptr, input expr={}", result_tmp_column_def.name,
apache::thrift::ThriftDebugString(*expr));
}
ref_block->replace_by_position_if_const(result_column_id);
ref_block->replace_by_position_if_const(result_tmp_column_idx);

if (ref_block->get_by_position(result_column_id).column->size() != row_size) {
if (result_tmp_column_def.column->size() != row_num) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"{} size invalid, expect={}, real={}", new_block->get_by_position(idx).name,
row_size, ref_block->get_by_position(result_column_id).column->size());
"result size invalid, expect={}, real={}; input expr={}", row_num,
result_tmp_column_def.column->size(),
apache::thrift::ThriftDebugString(*expr));
}

if (_type == SCHEMA_CHANGE) {
// danger casts (expected to be rejected by upstream caller) may cause data to be null and result in data loss in schema change
// for rollup, this check is unecessary, and ref columns are not set in this case, it works on exprs

// column_idx in base schema
int32_t ref_column_idx = _schema_mapping[idx].ref_column_idx;
DCHECK_GE(ref_column_idx, 0);
auto& ref_column_def = ref_block->get_by_position(ref_column_idx);
RETURN_IF_ERROR(
_check_cast_valid(ref_column_def.column, result_tmp_column_def.column));
}
RETURN_IF_ERROR(_check_cast_valid(ref_block->get_by_position(idx).column,
ref_block->get_by_position(result_column_id).column,
_type));
swap_idx_list.emplace_back(result_column_id, idx);
} else if (_schema_mapping[idx].ref_column < 0) {
swap_idx_list.emplace_back(result_tmp_column_idx, idx);
} else if (_schema_mapping[idx].ref_column_idx < 0) {
// new column, write default value
auto* value = _schema_mapping[idx].default_value;
auto column = new_block->get_by_position(idx).column->assume_mutable();
if (value->is_null()) {
DCHECK(column->is_nullable());
column->insert_many_defaults(row_size);
column->insert_many_defaults(row_num);
} else {
auto type_info = get_type_info(_schema_mapping[idx].new_column);
DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
value->ptr(), column, row_size);
value->ptr(), column, row_num);
}
} else {
// same type, just swap column
swap_idx_list.emplace_back(_schema_mapping[idx].ref_column, idx);
swap_idx_list.emplace_back(_schema_mapping[idx].ref_column_idx, idx);
}
}

Expand Down Expand Up @@ -361,81 +377,90 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
return Status::OK();
}

// This check is to prevent schema-change from causing data loss
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
vectorized::ColumnPtr new_column,
AlterTabletType type) const {
if (ref_column->size() != new_column->size()) {
// This check can prevent schema-change from causing data loss after type cast
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr input_column,
TangSiyang2001 marked this conversation as resolved.
Show resolved Hide resolved
vectorized::ColumnPtr output_column) {
if (input_column->size() != output_column->size()) {
return Status::InternalError(
"column size is changed, ref_column_size={}, new_column_size={}",
ref_column->size(), new_column->size());
}
if (type == ROLLUP) {
return Status::OK();
"column size is changed, input_column_size={}, output_column_size={}; "
"input_column={}",
input_column->size(), output_column->size(), input_column->get_name());
}
if (ref_column->is_nullable() != new_column->is_nullable()) {
if (ref_column->is_nullable()) {
DCHECK_EQ(input_column->size(), output_column->size())
<< "length check should have done before calling this function!";

if (input_column->is_nullable() != output_column->is_nullable()) {
if (input_column->is_nullable()) {
const auto* ref_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column)
->get_null_map_column()
.get_data()
.data();

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= ref_null_map[i];
}
if (is_changed) {
return Status::DataQualityError("Null data is changed to not nullable");
return Status::DataQualityError(
"some null data is changed to not null, intput_column={}",
input_column->get_name());
}
} else {
const auto& null_map_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_null_map_column();
const auto& nested_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_nested_column();
const auto* new_null_map = null_map_column.get_data().data();

if (null_map_column.size() != new_column->size() ||
nested_column.size() != new_column->size()) {
DCHECK(false) << "null_map_column_size=" << null_map_column.size()
<< " new_column_size=" << new_column->size()
<< " nested_column_size=" << nested_column.size();
if (null_map_column.size() != output_column->size()) {
return Status::InternalError(
"null_map_column size is changed, null_map_column_size={}, "
"new_column_size={}",
null_map_column.size(), new_column->size());
"null_map_column size mismatch output_column_size, "
"null_map_column_size={}, output_column_size={}; input_column={}",
null_map_column.size(), output_column->size(), input_column->get_name());
}

if (nested_column.size() != output_column->size()) {
return Status::InternalError(
"nested_column size is changed, nested_column_size={}, "
"ouput_column_size={}; input_column={}",
nested_column.size(), output_column->size(), input_column->get_name());
}

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= new_null_map[i];
}
if (is_changed) {
return Status::DataQualityError("Some data is changed to null");
return Status::DataQualityError(
"some not null data is changed to null, intput_column={}",
input_column->get_name());
}
}
}

if (ref_column->is_nullable() && new_column->is_nullable()) {
if (input_column->is_nullable() && output_column->is_nullable()) {
const auto* ref_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column)
->get_null_map_column()
.get_data()
.data();
const auto* new_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_null_map_column()
.get_data()
.data();

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= (ref_null_map[i] != new_null_map[i]);
}
if (is_changed) {
return Status::DataQualityError("is_null of data is changed!");
return Status::DataQualityError(
"null map is changed after calculation, input_column={}",
input_column->get_name());
}
}
return Status::OK();
Expand Down Expand Up @@ -1197,6 +1222,8 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
column_mapping->new_column = &new_column;

column_mapping->ref_column_idx = base_tablet_schema->field_index(new_column.name());

if (materialized_function_map.find(column_name_lower) != materialized_function_map.end()) {
auto mv_param = materialized_function_map.find(column_name_lower)->second;
column_mapping->expr = mv_param.expr;
Expand All @@ -1205,9 +1232,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
}
}

int32_t column_index = base_tablet_schema->field_index(new_column.name());
if (column_index >= 0) {
column_mapping->ref_column = column_index;
if (column_mapping->ref_column_idx >= 0) {
continue;
}

Expand All @@ -1230,7 +1255,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
return Status::InternalError("failed due to operate on shadow column");
}
// Newly added column go here
column_mapping->ref_column = -1;
column_mapping->ref_column_idx = -1;

if (i < base_tablet_schema->num_short_key_columns()) {
*sc_directly = true;
Expand Down Expand Up @@ -1259,7 +1284,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
continue;
}

if (column_mapping->ref_column != i - num_default_value) {
if (column_mapping->ref_column_idx != i - num_default_value) {
*sc_sorting = true;
return Status::OK();
}
Expand Down Expand Up @@ -1316,9 +1341,9 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
if (column_mapping->expr != nullptr) {
*sc_directly = true;
return Status::OK();
} else if (column_mapping->ref_column >= 0) {
} else if (column_mapping->ref_column_idx >= 0) {
const auto& column_new = new_tablet_schema->column(i);
const auto& column_old = base_tablet_schema->column(column_mapping->ref_column);
const auto& column_old = base_tablet_schema->column(column_mapping->ref_column_idx);
// index changed
if (column_new.is_bf_column() != column_old.is_bf_column() ||
column_new.has_bitmap_index() != column_old.has_bitmap_index()) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ class BlockChanger {
bool has_where() const { return _where_expr != nullptr; }

private:
Status _check_cast_valid(vectorized::ColumnPtr ref_column, vectorized::ColumnPtr new_column,
AlterTabletType type) const;
static Status _check_cast_valid(vectorized::ColumnPtr ref_column,
vectorized::ColumnPtr new_column);

// @brief column-mapping specification of new schema
SchemaMapping _schema_mapping;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_move_column_with_cast") {
def tableName = "test_move_column_with_cast"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
k BIGINT,
v SMALLINT NOT NULL
) DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 4
properties("replication_num" = "1");
"""

sql """ INSERT INTO ${tableName} VALUES(1, 1); """
sql """ ALTER TABLE ${tableName} ADD COLUMN t2 DATETIME DEFAULT NULL; """
sql """ ALTER TABLE ${tableName} MODIFY COLUMN v BIGINT AFTER t2; """

waitForSchemaChangeDone {
sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1"""
time 600
}
}
Loading