[fix](delete) fix potential delete fail after adding columns (#25817)
This commit is contained in:
@ -249,6 +249,27 @@ Status DeleteHandler::check_condition_valid(const TabletSchema& schema, const TC
|
||||
}
|
||||
}
|
||||
|
||||
if (!cond.__isset.column_unique_id) {
|
||||
LOG(WARNING) << "column=" << cond.column_name
|
||||
<< " in predicate does not have uid, table id=" << schema.table_id();
|
||||
// TODO(tsy): make it fail here after FE forbidding hard-link-schema-change
|
||||
return Status::OK();
|
||||
}
|
||||
if (schema.field_index(cond.column_unique_id) == -1) {
|
||||
const auto& err_msg =
|
||||
fmt::format("column id does not exists in table={}, schema version={},",
|
||||
schema.table_id(), schema.schema_version());
|
||||
return Status::Error<DELETE_INVALID_CONDITION>(err_msg);
|
||||
}
|
||||
if (!iequal(schema.column_by_uid(cond.column_unique_id).name(), cond.column_name)) {
|
||||
const auto& err_msg = fmt::format(
|
||||
"colum name={} does not belongs to column uid={}, which column name={}, "
|
||||
"delete_cond.column_name ={}",
|
||||
cond.column_name, cond.column_unique_id,
|
||||
schema.column_by_uid(cond.column_unique_id).name(), cond.column_name);
|
||||
return Status::Error<DELETE_INVALID_CONDITION>(err_msg);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -316,8 +337,12 @@ Status DeleteHandler::_parse_column_pred(TabletSchemaSPtr complete_schema,
|
||||
for (const auto& sub_predicate : sub_pred_list) {
|
||||
TCondition condition;
|
||||
RETURN_IF_ERROR(parse_condition(sub_predicate, &condition));
|
||||
int32_t col_unique_id =
|
||||
delete_pred_related_schema->column(condition.column_name).unique_id();
|
||||
int32_t col_unique_id;
|
||||
if constexpr (std::is_same_v<SubPredType, DeletePredicatePB>) {
|
||||
col_unique_id = sub_predicate.col_unique_id;
|
||||
} else {
|
||||
col_unique_id = delete_pred_related_schema->column(condition.column_name).unique_id();
|
||||
}
|
||||
condition.__set_column_unique_id(col_unique_id);
|
||||
const auto& column = complete_schema->column_by_uid(col_unique_id);
|
||||
uint32_t index = complete_schema->field_index(col_unique_id);
|
||||
@ -345,7 +370,7 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
|
||||
bool with_sub_pred_v2) {
|
||||
DCHECK(!_is_inited) << "reinitialize delete handler.";
|
||||
DCHECK(version >= 0) << "invalid parameters. version=" << version;
|
||||
_predicate_arena.reset(new vectorized::Arena());
|
||||
_predicate_arena = std::make_unique<vectorized::Arena>();
|
||||
|
||||
for (const auto& delete_pred : delete_preds) {
|
||||
// Skip the delete condition with large version
|
||||
@ -354,7 +379,7 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
|
||||
}
|
||||
// Need the tablet schema at the delete condition to parse the accurate column
|
||||
const auto& delete_pred_related_schema = delete_pred->tablet_schema();
|
||||
auto& delete_condition = delete_pred->delete_predicate();
|
||||
const auto& delete_condition = delete_pred->delete_predicate();
|
||||
DeleteConditions temp;
|
||||
temp.filter_version = delete_pred->version().first;
|
||||
if (with_sub_pred_v2) {
|
||||
@ -368,8 +393,8 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
|
||||
for (const auto& in_predicate : delete_condition.in_predicates()) {
|
||||
TCondition condition;
|
||||
condition.__set_column_name(in_predicate.column_name());
|
||||
condition.__set_column_unique_id(
|
||||
delete_pred_related_schema->column(condition.column_name).unique_id());
|
||||
auto col_unique_id = in_predicate.column_unique_id();
|
||||
condition.__set_column_unique_id(col_unique_id);
|
||||
|
||||
if (in_predicate.is_not_in()) {
|
||||
condition.__set_condition_op("!*=");
|
||||
@ -379,8 +404,6 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
|
||||
for (const auto& value : in_predicate.values()) {
|
||||
condition.condition_values.push_back(value);
|
||||
}
|
||||
int32_t col_unique_id =
|
||||
delete_pred_related_schema->column(condition.column_name).unique_id();
|
||||
const auto& column = tablet_schema->column_by_uid(col_unique_id);
|
||||
uint32_t index = tablet_schema->field_index(col_unique_id);
|
||||
temp.column_predicate_vec.push_back(
|
||||
@ -401,7 +424,7 @@ void DeleteHandler::finalize() {
|
||||
}
|
||||
|
||||
for (auto& cond : _del_conds) {
|
||||
for (auto pred : cond.column_predicate_vec) {
|
||||
for (const auto* pred : cond.column_predicate_vec) {
|
||||
delete pred;
|
||||
}
|
||||
}
|
||||
@ -414,12 +437,12 @@ void DeleteHandler::get_delete_conditions_after_version(
|
||||
int64_t version, AndBlockColumnPredicate* and_block_column_predicate_ptr,
|
||||
std::unordered_map<int32_t, std::vector<const ColumnPredicate*>>*
|
||||
del_predicates_for_zone_map) const {
|
||||
for (auto& del_cond : _del_conds) {
|
||||
for (const auto& del_cond : _del_conds) {
|
||||
if (del_cond.filter_version > version) {
|
||||
// now, only query support delete column predicate operator
|
||||
if (!del_cond.column_predicate_vec.empty()) {
|
||||
if (del_cond.column_predicate_vec.size() == 1) {
|
||||
auto single_column_block_predicate =
|
||||
auto* single_column_block_predicate =
|
||||
new SingleColumnBlockPredicate(del_cond.column_predicate_vec[0]);
|
||||
and_block_column_predicate_ptr->add_column_predicate(
|
||||
single_column_block_predicate);
|
||||
@ -432,7 +455,7 @@ void DeleteHandler::get_delete_conditions_after_version(
|
||||
(*del_predicates_for_zone_map)[del_cond.column_predicate_vec[0]->column_id()]
|
||||
.push_back(del_cond.column_predicate_vec[0]);
|
||||
} else {
|
||||
auto or_column_predicate = new OrBlockColumnPredicate();
|
||||
auto* or_column_predicate = new OrBlockColumnPredicate();
|
||||
|
||||
// build or_column_predicate
|
||||
// when delete from where a = 1 and b = 2, we can not use del_predicates_for_zone_map to filter zone page,
|
||||
|
||||
@ -102,7 +102,7 @@ public:
|
||||
// * Status::Error<DELETE_INVALID_PARAMETERS>(): input parameters are not valid
|
||||
// * Status::Error<MEM_ALLOC_FAILED>(): alloc memory failed
|
||||
Status init(TabletSchemaSPtr tablet_schema,
|
||||
const std::vector<RowsetMetaSharedPtr>& delete_conditions, int64_t version,
|
||||
const std::vector<RowsetMetaSharedPtr>& delete_preds, int64_t version,
|
||||
bool with_sub_pred_v2 = false);
|
||||
|
||||
[[nodiscard]] bool empty() const { return _del_conds.empty(); }
|
||||
|
||||
@ -141,31 +141,6 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
|
||||
DeletePredicatePB del_pred;
|
||||
TabletSchema tablet_schema;
|
||||
tablet_schema.copy_from(*tablet->tablet_schema());
|
||||
for (const auto& delete_cond : request.delete_conditions) {
|
||||
if (!delete_cond.__isset.column_unique_id) {
|
||||
LOG(WARNING) << "column=" << delete_cond.column_name
|
||||
<< " in predicate does not have uid, table id="
|
||||
<< tablet_schema.table_id();
|
||||
// TODO(tsy): make it fail here after FE forbidding hard-link-schema-change
|
||||
continue;
|
||||
}
|
||||
if (tablet_schema.field_index(delete_cond.column_unique_id) == -1) {
|
||||
const auto& err_msg =
|
||||
fmt::format("column id={} does not exists, table id={}",
|
||||
delete_cond.column_unique_id, tablet_schema.table_id());
|
||||
return Status::Aborted(err_msg);
|
||||
}
|
||||
if (!iequal(tablet_schema.column_by_uid(delete_cond.column_unique_id).name(),
|
||||
delete_cond.column_name)) {
|
||||
const auto& err_msg = fmt::format(
|
||||
"colum name={} does not belongs to column uid={}, which column name={}, "
|
||||
"delete_cond.column_name ={}",
|
||||
delete_cond.column_name, delete_cond.column_unique_id,
|
||||
tablet_schema.column_by_uid(delete_cond.column_unique_id).name(),
|
||||
delete_cond.column_name);
|
||||
return Status::Aborted(err_msg);
|
||||
}
|
||||
}
|
||||
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
|
||||
tablet_schema.clear_columns();
|
||||
for (const auto& column_desc : request.columns_desc) {
|
||||
|
||||
@ -761,6 +761,7 @@ void TabletSchema::copy_from(const TabletSchema& tablet_schema) {
|
||||
TabletSchemaPB tablet_schema_pb;
|
||||
tablet_schema.to_schema_pb(&tablet_schema_pb);
|
||||
init_from_pb(tablet_schema_pb);
|
||||
_table_id = tablet_schema.table_id();
|
||||
}
|
||||
|
||||
std::string TabletSchema::to_key() const {
|
||||
|
||||
Reference in New Issue
Block a user