[enhancement](delete) use column id in delete push task instead of column name (#24549)
This commit is contained in:
@ -17,6 +17,7 @@
|
||||
|
||||
#include "olap/push_handler.h"
|
||||
|
||||
#include <fmt/core.h>
|
||||
#include <gen_cpp/AgentService_types.h>
|
||||
#include <gen_cpp/Descriptors_types.h>
|
||||
#include <gen_cpp/MasterService_types.h>
|
||||
@ -25,6 +26,7 @@
|
||||
#include <gen_cpp/Types_types.h>
|
||||
#include <gen_cpp/olap_file.pb.h>
|
||||
#include <gen_cpp/types.pb.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
@ -138,6 +140,33 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
|
||||
DeletePredicatePB del_pred;
|
||||
TabletSchema tablet_schema;
|
||||
tablet_schema.copy_from(*tablet->tablet_schema());
|
||||
for (const auto& delete_cond : request.delete_conditions) {
|
||||
if (!delete_cond.__isset.column_unique_id) {
|
||||
LOG(WARNING) << "column=" << delete_cond.column_name
|
||||
<< " in predicate does not have uid, table id="
|
||||
<< tablet_schema.table_id();
|
||||
// TODO(tsy): make it fail here after FE forbidding hard-link-schema-change
|
||||
continue;
|
||||
}
|
||||
if (tablet_schema.field_index(delete_cond.column_unique_id) == -1) {
|
||||
const auto& err_msg =
|
||||
fmt::format("column id={} does not exists, table id={}",
|
||||
delete_cond.column_unique_id, tablet_schema.table_id());
|
||||
LOG(WARNING) << err_msg;
|
||||
DCHECK(false);
|
||||
return Status::Aborted(err_msg);
|
||||
}
|
||||
if (tablet_schema.column_by_uid(delete_cond.column_unique_id).name() !=
|
||||
delete_cond.column_name) {
|
||||
const auto& err_msg = fmt::format(
|
||||
"colum name={} does not belongs to column uid={}, which column name={}",
|
||||
delete_cond.column_name, delete_cond.column_unique_id,
|
||||
tablet_schema.column_by_uid(delete_cond.column_unique_id).name());
|
||||
LOG(WARNING) << err_msg;
|
||||
DCHECK(false);
|
||||
return Status::Aborted(err_msg);
|
||||
}
|
||||
}
|
||||
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
|
||||
tablet_schema.clear_columns();
|
||||
for (const auto& column_desc : request.columns_desc) {
|
||||
|
||||
@ -40,6 +40,9 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class PushTask extends AgentTask {
|
||||
private static final Logger LOG = LogManager.getLogger(PushTask.class);
|
||||
@ -127,33 +130,36 @@ public class PushTask extends AgentTask {
|
||||
break;
|
||||
case DELETE:
|
||||
List<TCondition> tConditions = new ArrayList<TCondition>();
|
||||
Map<String, TColumn> colNameToColDesc = columnsDesc.stream()
|
||||
.collect(Collectors.toMap(TColumn::getColumnName, Function.identity()));
|
||||
for (Predicate condition : conditions) {
|
||||
TCondition tCondition = new TCondition();
|
||||
ArrayList<String> conditionValues = new ArrayList<String>();
|
||||
SlotRef slotRef = (SlotRef) condition.getChild(0);
|
||||
String columnName = slotRef.getColumnName();
|
||||
tCondition.setColumnName(columnName);
|
||||
int uniqueId = colNameToColDesc.get(columnName).getColUniqueId();
|
||||
if (uniqueId >= 0) {
|
||||
tCondition.setColumnUniqueId(uniqueId);
|
||||
}
|
||||
if (condition instanceof BinaryPredicate) {
|
||||
BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
|
||||
String columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName();
|
||||
String value = ((LiteralExpr) binaryPredicate.getChild(1)).getStringValue();
|
||||
Operator op = binaryPredicate.getOp();
|
||||
tCondition.setColumnName(columnName);
|
||||
tCondition.setConditionOp(op.toString());
|
||||
conditionValues.add(value);
|
||||
} else if (condition instanceof IsNullPredicate) {
|
||||
IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
|
||||
String columnName = ((SlotRef) isNullPredicate.getChild(0)).getColumnName();
|
||||
String op = "IS";
|
||||
String value = "NULL";
|
||||
if (isNullPredicate.isNotNull()) {
|
||||
value = "NOT NULL";
|
||||
}
|
||||
tCondition.setColumnName(columnName);
|
||||
tCondition.setConditionOp(op);
|
||||
conditionValues.add(value);
|
||||
} else if (condition instanceof InPredicate) {
|
||||
InPredicate inPredicate = (InPredicate) condition;
|
||||
String columnName = ((SlotRef) inPredicate.getChild(0)).getColumnName();
|
||||
String op = inPredicate.isNotIn() ? "!*=" : "*=";
|
||||
tCondition.setColumnName(columnName);
|
||||
tCondition.setConditionOp(op);
|
||||
for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
|
||||
conditionValues.add(inPredicate.getChild(i).getStringValue());
|
||||
|
||||
Reference in New Issue
Block a user