[fix](schema-change) Fix wrong input column for cast validity check (#38894) (#39107)

## Proposed changes

1. Use the column index of the ref block, instead of the new block, to indicate the ref column for the cast validity check (see the sketch after this list).
2. Rename some variables to clarify their meanings.
3. Clarify some log messages.
4. Add a minimal case to verify the change.
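
To make item 1 concrete, here is a minimal, self-contained C++ sketch (not Doris code; the `Column`/`Block` model, names, and indices are simplified and hypothetical) of how indexing the *ref* block with the new-schema column index can select the wrong input column for the cast validity check once a column has been moved, while the base-schema index (`ref_column_idx`) selects the correct one:

```cpp
// Minimal sketch (not Doris code): why the cast validity check must index the
// *ref* block with the base-schema column index instead of the new-schema index.
#include <cassert>
#include <iostream>
#include <string>
#include <vector>

struct Column {
    std::string name;
    std::vector<bool> null_map; // true == row is NULL
};

using Block = std::vector<Column>; // a block is just an ordered list of columns

// Returns false if any row flips between NULL and non-NULL after the cast.
bool check_cast_valid(const Column& input, const Column& output) {
    assert(input.null_map.size() == output.null_map.size());
    for (size_t i = 0; i < input.null_map.size(); ++i) {
        if (input.null_map[i] != output.null_map[i]) return false;
    }
    return true;
}

int main() {
    // Base schema order: k, v, t2 (t2 is the newly added DATETIME, all NULL).
    Block ref_block = {{"k", {false}}, {"v", {false}}, {"t2", {true}}};

    // New schema order after "MODIFY COLUMN v BIGINT AFTER t2": k, t2, v.
    Column casted_v = {"v_as_bigint", {false}}; // cast result, still non-NULL
    int new_schema_idx = 2; // position of v in the new block
    int ref_column_idx = 1; // position of v in the base schema (what the fix uses)

    // Old lookup: new-schema index applied to the ref block picks t2 (all NULL),
    // so the check wrongly reports a NULL -> non-NULL change.
    std::cout << check_cast_valid(ref_block[new_schema_idx], casted_v) << "\n"; // 0
    // Fixed lookup: base-schema index picks the real input column v.
    std::cout << check_cast_valid(ref_block[ref_column_idx], casted_v) << "\n"; // 1
    return 0;
}
```

In the regression case added below, `MODIFY COLUMN v BIGINT AFTER t2` reorders the columns in exactly this way, so the old lookup compared the casted `v` against the all-NULL `t2` column.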
TangSiyang2001 authored Aug 8, 2024
1 parent 21b6b86 commit f8f5be7
Showing 4 changed files with 129 additions and 66 deletions.
4 changes: 2 additions & 2 deletions be/src/olap/column_mapping.h
@@ -30,11 +30,11 @@ struct ColumnMapping {
ColumnMapping() = default;
virtual ~ColumnMapping() = default;

bool has_reference() const { return expr != nullptr || ref_column >= 0; }
bool has_reference() const { return expr != nullptr || ref_column_idx >= 0; }

// <0: use default value
// >=0: use origin column
int32_t ref_column = -1;
int32_t ref_column_idx = -1;
// normally for default value. stores values for filters
WrapperField* default_value = nullptr;
std::shared_ptr<TExpr> expr;
149 changes: 87 additions & 62 deletions be/src/olap/schema_change.cpp
@@ -17,6 +17,10 @@

#include "olap/schema_change.h"

#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <algorithm>
#include <exception>
#include <map>
@@ -279,51 +283,63 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns()));
}

const int row_size = ref_block->rows();
const int column_size = new_block->columns();
const int row_num = ref_block->rows();
const int new_schema_cols_num = new_block->columns();

// swap ref_block[key] and new_block[value]
// will be used for swapping ref_block[entry.first] and new_block[entry.second]
std::list<std::pair<int, int>> swap_idx_list;
for (int idx = 0; idx < column_size; idx++) {
if (_schema_mapping[idx].expr != nullptr) {
for (int idx = 0; idx < new_schema_cols_num; idx++) {
auto expr = _schema_mapping[idx].expr;
if (expr != nullptr) {
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_schema_mapping[idx].expr, ctx));
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*expr, ctx));
RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc));
RETURN_IF_ERROR(ctx->open(state.get()));

int result_column_id = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
if (ref_block->get_by_position(result_column_id).column == nullptr) {
int result_tmp_column_idx = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_tmp_column_idx));
auto& result_tmp_column_def = ref_block->get_by_position(result_tmp_column_idx);
if (result_tmp_column_def.column == nullptr) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"{} result column is nullptr",
ref_block->get_by_position(result_column_id).name);
"result column={} is nullptr, input expr={}", result_tmp_column_def.name,
apache::thrift::ThriftDebugString(*expr));
}
ref_block->replace_by_position_if_const(result_column_id);
ref_block->replace_by_position_if_const(result_tmp_column_idx);

if (ref_block->get_by_position(result_column_id).column->size() != row_size) {
if (result_tmp_column_def.column->size() != row_num) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"{} size invalid, expect={}, real={}", new_block->get_by_position(idx).name,
row_size, ref_block->get_by_position(result_column_id).column->size());
"result size invalid, expect={}, real={}; input expr={}", row_num,
result_tmp_column_def.column->size(),
apache::thrift::ThriftDebugString(*expr));
}

if (_type == SCHEMA_CHANGE) {
// dangerous casts (expected to be rejected by the upstream caller) may cause data to become null and result in data loss during schema change
// for rollup, this check is unnecessary, and ref columns are not set in that case; rollup works on exprs

// column_idx in base schema
int32_t ref_column_idx = _schema_mapping[idx].ref_column_idx;
DCHECK_GE(ref_column_idx, 0);
auto& ref_column_def = ref_block->get_by_position(ref_column_idx);
RETURN_IF_ERROR(
_check_cast_valid(ref_column_def.column, result_tmp_column_def.column));
}
RETURN_IF_ERROR(_check_cast_valid(ref_block->get_by_position(idx).column,
ref_block->get_by_position(result_column_id).column,
_type));
swap_idx_list.emplace_back(result_column_id, idx);
} else if (_schema_mapping[idx].ref_column < 0) {
swap_idx_list.emplace_back(result_tmp_column_idx, idx);
} else if (_schema_mapping[idx].ref_column_idx < 0) {
// new column, write default value
auto* value = _schema_mapping[idx].default_value;
auto column = new_block->get_by_position(idx).column->assume_mutable();
if (value->is_null()) {
DCHECK(column->is_nullable());
column->insert_many_defaults(row_size);
column->insert_many_defaults(row_num);
} else {
auto type_info = get_type_info(_schema_mapping[idx].new_column);
DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
value->ptr(), column, row_size);
value->ptr(), column, row_num);
}
} else {
// same type, just swap column
swap_idx_list.emplace_back(_schema_mapping[idx].ref_column, idx);
swap_idx_list.emplace_back(_schema_mapping[idx].ref_column_idx, idx);
}
}

@@ -361,81 +377,90 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
return Status::OK();
}

// This check is to prevent schema-change from causing data loss
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
vectorized::ColumnPtr new_column,
AlterTabletType type) const {
if (ref_column->size() != new_column->size()) {
// This check can prevent schema-change from causing data loss after type cast
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr input_column,
vectorized::ColumnPtr output_column) {
if (input_column->size() != output_column->size()) {
return Status::InternalError(
"column size is changed, ref_column_size={}, new_column_size={}",
ref_column->size(), new_column->size());
}
if (type == ROLLUP) {
return Status::OK();
"column size is changed, input_column_size={}, output_column_size={}; "
"input_column={}",
input_column->size(), output_column->size(), input_column->get_name());
}
if (ref_column->is_nullable() != new_column->is_nullable()) {
if (ref_column->is_nullable()) {
DCHECK_EQ(input_column->size(), output_column->size())
<< "length check should have done before calling this function!";

if (input_column->is_nullable() != output_column->is_nullable()) {
if (input_column->is_nullable()) {
const auto* ref_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column)
->get_null_map_column()
.get_data()
.data();

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= ref_null_map[i];
}
if (is_changed) {
return Status::DataQualityError("Null data is changed to not nullable");
return Status::DataQualityError(
"some null data is changed to not null, intput_column={}",
input_column->get_name());
}
} else {
const auto& null_map_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_null_map_column();
const auto& nested_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_nested_column();
const auto* new_null_map = null_map_column.get_data().data();

if (null_map_column.size() != new_column->size() ||
nested_column.size() != new_column->size()) {
DCHECK(false) << "null_map_column_size=" << null_map_column.size()
<< " new_column_size=" << new_column->size()
<< " nested_column_size=" << nested_column.size();
if (null_map_column.size() != output_column->size()) {
return Status::InternalError(
"null_map_column size is changed, null_map_column_size={}, "
"new_column_size={}",
null_map_column.size(), new_column->size());
"null_map_column size mismatch output_column_size, "
"null_map_column_size={}, output_column_size={}; input_column={}",
null_map_column.size(), output_column->size(), input_column->get_name());
}

if (nested_column.size() != output_column->size()) {
return Status::InternalError(
"nested_column size is changed, nested_column_size={}, "
"ouput_column_size={}; input_column={}",
nested_column.size(), output_column->size(), input_column->get_name());
}

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= new_null_map[i];
}
if (is_changed) {
return Status::DataQualityError("Some data is changed to null");
return Status::DataQualityError(
"some not null data is changed to null, intput_column={}",
input_column->get_name());
}
}
}

if (ref_column->is_nullable() && new_column->is_nullable()) {
if (input_column->is_nullable() && output_column->is_nullable()) {
const auto* ref_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column)
->get_null_map_column()
.get_data()
.data();
const auto* new_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_null_map_column()
.get_data()
.data();

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= (ref_null_map[i] != new_null_map[i]);
}
if (is_changed) {
return Status::DataQualityError("is_null of data is changed!");
return Status::DataQualityError(
"null map is changed after calculation, input_column={}",
input_column->get_name());
}
}
return Status::OK();
@@ -1197,6 +1222,8 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
column_mapping->new_column = &new_column;

column_mapping->ref_column_idx = base_tablet_schema->field_index(new_column.name());

if (materialized_function_map.find(column_name_lower) != materialized_function_map.end()) {
auto mv_param = materialized_function_map.find(column_name_lower)->second;
column_mapping->expr = mv_param.expr;
@@ -1205,9 +1232,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
}
}

int32_t column_index = base_tablet_schema->field_index(new_column.name());
if (column_index >= 0) {
column_mapping->ref_column = column_index;
if (column_mapping->ref_column_idx >= 0) {
continue;
}

Expand All @@ -1230,7 +1255,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
return Status::InternalError("failed due to operate on shadow column");
}
// Newly added columns go here
column_mapping->ref_column = -1;
column_mapping->ref_column_idx = -1;

if (i < base_tablet_schema->num_short_key_columns()) {
*sc_directly = true;
Expand Down Expand Up @@ -1259,7 +1284,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
continue;
}

if (column_mapping->ref_column != i - num_default_value) {
if (column_mapping->ref_column_idx != i - num_default_value) {
*sc_sorting = true;
return Status::OK();
}
Expand Down Expand Up @@ -1316,9 +1341,9 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
if (column_mapping->expr != nullptr) {
*sc_directly = true;
return Status::OK();
} else if (column_mapping->ref_column >= 0) {
} else if (column_mapping->ref_column_idx >= 0) {
const auto& column_new = new_tablet_schema->column(i);
const auto& column_old = base_tablet_schema->column(column_mapping->ref_column);
const auto& column_old = base_tablet_schema->column(column_mapping->ref_column_idx);
// index changed
if (column_new.is_bf_column() != column_old.is_bf_column() ||
column_new.has_bitmap_index() != column_old.has_bitmap_index()) {
4 changes: 2 additions & 2 deletions be/src/olap/schema_change.h
@@ -84,8 +84,8 @@ class BlockChanger {
bool has_where() const { return _where_expr != nullptr; }

private:
Status _check_cast_valid(vectorized::ColumnPtr ref_column, vectorized::ColumnPtr new_column,
AlterTabletType type) const;
static Status _check_cast_valid(vectorized::ColumnPtr ref_column,
vectorized::ColumnPtr new_column);

// @brief column-mapping specification of new schema
SchemaMapping _schema_mapping;
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_move_column_with_cast") {
def tableName = "test_move_column_with_cast"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
k BIGINT,
v SMALLINT NOT NULL
) DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 4
properties("replication_num" = "1");
"""

sql """ INSERT INTO ${tableName} VALUES(1, 1); """
sql """ ALTER TABLE ${tableName} ADD COLUMN t2 DATETIME DEFAULT NULL; """
sql """ ALTER TABLE ${tableName} MODIFY COLUMN v BIGINT AFTER t2; """

waitForSchemaChangeDone {
sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1"""
time 600
}
}
