[fix](schema_change) remove shadow prefix of schema for tablesink (#18822)
LSC updates tablet's schema in writing. Be optimized adding columns via linked schema change and it distinguishes adding by comparing column name. e.g. if new column's name is not found in old schema, then it is a newly-add column. When a table is under schema-changing, it adds __doris_shadow_ prefix in name of columns in shadow index. Then writes during schema-changing would bring schema with __doris_shadow_ to be. If schema change request arrives at be after writes, then be do it as a add-column schema change due to __doris_shadow_ is not in base tablet.
This commit is contained in:
@ -1704,7 +1704,7 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
|
||||
for (int i = 0, new_schema_size = new_tablet->tablet_schema()->num_columns();
|
||||
i < new_schema_size; ++i) {
|
||||
const TabletColumn& new_column = new_tablet->tablet_schema()->column(i);
|
||||
const string& column_name = new_column.name();
|
||||
const std::string& column_name = new_column.name();
|
||||
ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
|
||||
column_mapping->new_column = &new_column;
|
||||
|
||||
@ -1729,6 +1729,11 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
|
||||
continue;
|
||||
}
|
||||
|
||||
if (column_name.find("__doris_shadow_") == 0) {
|
||||
// Should delete in the future, just a protection for bug.
|
||||
LOG(INFO) << "a shadow column is encountered " << column_name;
|
||||
return Status::InternalError("failed due to operate on shadow column");
|
||||
}
|
||||
// Newly added column go here
|
||||
column_mapping->ref_column = -1;
|
||||
|
||||
@ -1738,8 +1743,9 @@ Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
|
||||
RETURN_IF_ERROR(
|
||||
_init_column_mapping(column_mapping, new_column, new_column.default_value()));
|
||||
|
||||
VLOG_TRACE << "A column with default value will be added after schema changing. "
|
||||
<< "column=" << column_name << ", default_value=" << new_column.default_value();
|
||||
LOG(INFO) << "A column with default value will be added after schema changing. "
|
||||
<< "column=" << column_name << ", default_value=" << new_column.default_value()
|
||||
<< " to table " << new_tablet->get_table_id();
|
||||
}
|
||||
|
||||
if (materialized_function_map.count(WHERE_SIGN)) {
|
||||
|
||||
@ -483,6 +483,8 @@ public:
|
||||
return config::max_tablet_io_errors > 0 && _io_error_times >= config::max_tablet_io_errors;
|
||||
}
|
||||
|
||||
int64_t get_table_id() { return _tablet_meta->table_id(); }
|
||||
|
||||
private:
|
||||
Status _init_once_action();
|
||||
void _print_missed_versions(const std::vector<Version>& missed_versions) const;
|
||||
|
||||
@ -698,6 +698,7 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
*/
|
||||
modColumn.setName(SHADOW_NAME_PREFIX + modColumn.getName());
|
||||
}
|
||||
LOG.info("modify column {} ", modColumn);
|
||||
return lightSchemaChange;
|
||||
}
|
||||
|
||||
|
||||
@ -270,6 +270,10 @@ public class Column implements Writable, GsonPostProcessable {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
public String getNonShadowName() {
|
||||
return removeNamePrefix(name);
|
||||
}
|
||||
|
||||
public String getNameWithoutMvPrefix() {
|
||||
return CreateMaterializedViewStmt.mvColumnBreaker(name);
|
||||
}
|
||||
|
||||
@ -221,7 +221,7 @@ public class OlapTableSink extends DataSink {
|
||||
List<String> columns = Lists.newArrayList();
|
||||
List<TColumn> columnsDesc = Lists.newArrayList();
|
||||
List<TOlapTableIndex> indexDesc = Lists.newArrayList();
|
||||
columns.addAll(indexMeta.getSchema().stream().map(Column::getName).collect(Collectors.toList()));
|
||||
columns.addAll(indexMeta.getSchema().stream().map(Column::getNonShadowName).collect(Collectors.toList()));
|
||||
for (Column column : indexMeta.getSchema()) {
|
||||
TColumn tColumn = column.toThrift();
|
||||
column.setIndexFlag(tColumn, table);
|
||||
|
||||
@ -21,6 +21,7 @@ suite("test_agg_keys_schema_change_decimalv3") {
|
||||
def tbName = "test_agg_keys_schema_change_decimalv3"
|
||||
def getJobState = { tableName ->
|
||||
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
|
||||
logger.info(jobStateResult.toString());
|
||||
return jobStateResult[0][9]
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user