Cherry commit from 3.1 to 3.1 opensource

This commit is contained in:
stdliu 2021-07-26 11:51:13 +08:00 committed by wangzelin.wzl
parent f2b3572e34
commit a24b0eb4e7
7 changed files with 39 additions and 15 deletions

View File

@ -4784,6 +4784,9 @@ int ObRebalanceSqlBKGTask::log_execute_result(const common::ObIArray<int>& rc_ar
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ret code count not match", K(ret), "ret_code_count", rc_array.count());
} else {
if (OB_SUCCESS != rc_array.at(0) && task_infos_.count() > 0) {
notify_sql_scheduler(rc_array.at(0));
}
for (int64_t i = 0; i < task_infos_.count(); i++) {
const sql::ObSchedBKGDDistTask& info = task_infos_.at(i).get_sql_bkgd_task();
ROOTSERVICE_EVENT_ADD("balancer",
@ -5341,15 +5344,20 @@ int ObValidateTask::assign(const ObValidateTask& that)
}
void ObRebalanceSqlBKGTask::notify_cancel() const
{
notify_sql_scheduler(OB_CANCELED);
}
void ObRebalanceSqlBKGTask::notify_sql_scheduler(const int rc) const
{
int tmp_ret = OB_SUCCESS;
if (1 != task_infos_.count()) {
tmp_ret = OB_ERR_UNEXPECTED;
LOG_WARN("should be one task info", K(tmp_ret), K(task_infos_.count()));
} else if (OB_SUCCESS !=
(tmp_ret = sql::ObRpcBKGDTaskCompleteP::notify_error(task_infos_.at(0).get_sql_bkgd_task().get_task_id(),
task_infos_.at(0).get_sql_bkgd_task().get_scheduler_id(),
OB_CANCELED))) {
} else if (OB_SUCCESS != (tmp_ret = sql::ObRpcBKGDTaskCompleteP::notify_error(
task_infos_.at(0).get_sql_bkgd_task().get_task_id(),
task_infos_.at(0).get_sql_bkgd_task().get_scheduler_id(),
rc))) {
LOG_WARN("notify task canceled filed", K(tmp_ret));
}
}
@ -5829,6 +5837,7 @@ int ObRebalanceSqlBKGTask::check_before_execute(
}
}
}
ret = E(EventTable::EN_BALANCE_TASK_EXE_ERR) OB_SUCCESS;
return ret;
}

View File

@ -2731,6 +2731,8 @@ public:
int assign(const ObRebalanceSqlBKGTask& that);
private:
void notify_sql_scheduler(const int rc) const;
common::ObArray<ObSqlBKGDDistTaskInfo> task_infos_;
};

View File

@ -217,6 +217,7 @@ OB_DEF_SERIALIZE(ObExprDllUdf)
LOG_WARN("serialize aggregate column expression failed", K(ret));
}
}
OZ(ObFuncExprOperator::serialize(buf, buf_len, pos));
return ret;
}
@ -238,13 +239,14 @@ OB_DEF_DESERIALIZE(ObExprDllUdf)
LOG_WARN("fail to deserialize expression", K(ret));
}
}
OZ(ObFuncExprOperator::deserialize(buf, data_len, pos));
OZ(udf_func_.init(udf_meta_));
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObExprDllUdf)
{
int64_t len = 0;
len += ObFuncExprOperator::get_serialize_size();
OB_UNIS_ADD_LEN(udf_meta_);
OB_UNIS_ADD_LEN(udf_attributes_);
OB_UNIS_ADD_LEN(udf_attributes_types_);
@ -258,6 +260,7 @@ OB_DEF_SERIALIZE_SIZE(ObExprDllUdf)
len += expr->get_serialize_size();
}
}
len += ObFuncExprOperator::get_serialize_size();
return len;
}

View File

@ -68,7 +68,7 @@ int ObExprLastInsertID::calc_resultN(
} else {
EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NO_RANGE_CHECK);
expr_ctx.cast_mode_ &= ~(CM_WARN_ON_FAIL);
const ObObj& obj_tmp = objs_array[0];
const ObObj &obj_tmp = objs_array[0];
EXPR_GET_UINT64_V2(obj_tmp, param_value);
result.set_uint64(param_value);
}

View File

@ -160,11 +160,11 @@ int ObDistributedTransmit::inner_open(ObExecContext& exec_ctx) const
ObIArray<ObSliceEvent>* slice_events = NULL;
uint64_t slice_table_id = repartition_table_id_;
ObSchemaGetterGuard schema_guard;
const ObTableSchema* table_schema = NULL;
ObSchemaGetterGuard *schema_guard = exec_ctx.get_sql_ctx()->schema_guard_;
const ObTableSchema *table_schema = NULL;
int64_t interm_result_buf_len = get_split_task_count() * sizeof(ObIntermResult*);
if (OB_ISNULL(child_op_)) {
int64_t interm_result_buf_len = get_split_task_count() * sizeof(ObIntermResult *);
if (OB_ISNULL(child_op_) || OB_ISNULL(schema_guard)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("child op is NULL", K(ret));
} else if (OB_UNLIKELY(get_split_task_count() <= 0)) {
@ -212,9 +212,7 @@ int ObDistributedTransmit::inner_open(ObExecContext& exec_ctx) const
}
if (OB_SUCC(ret) && OB_INVALID_ID != slice_table_id) {
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(session->get_effective_tenant_id(), schema_guard))) {
LOG_WARN("faile to get schema guard", K(ret));
} else if (OB_FAIL(schema_guard.get_table_schema(slice_table_id, table_schema))) {
if (OB_FAIL(schema_guard->get_table_schema(slice_table_id, table_schema))) {
LOG_WARN("faile to get table schema", K(ret), K(slice_table_id));
} else if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;

View File

@ -295,7 +295,9 @@ int ObRemoteBaseExecuteP<T>::sync_send_result(ObExecContext& exec_ctx, const ObP
bool need_flush = false;
if (is_static_engine) {
bool added = false;
if (OB_FAIL(scanner.try_add_row(se_op->get_spec().output_, exec_ctx.get_eval_ctx(), added))) {
if (OB_FAIL(scanner.try_add_row(se_op->get_spec().output_,
exec_ctx.get_eval_ctx(),
added))) {
LOG_WARN("fail add row to scanner", K(ret));
} else if (!added) {
need_flush = true;
@ -1178,7 +1180,8 @@ int ObRpcRemoteASyncExecuteP::send_result_to_controller(ObExecContext& exec_ctx,
if (OB_UNLIKELY(OB_ITER_END != ret)) {
LOG_WARN("failed to get next row", K(ret));
}
} else if (OB_FAIL(scanner.try_add_row(se_op->get_spec().output_, exec_ctx.get_eval_ctx(), added))) {
} else if (OB_FAIL(scanner.try_add_row(se_op->get_spec().output_,
exec_ctx.get_eval_ctx(), added))) {
LOG_WARN("fail add row to scanner", K(ret));
} else if (!added) {
buffer_enough = true;

View File

@ -789,6 +789,9 @@ int ObIndexSSTableBuilder::gen_data_exchange(ObPhysicalPlan& phy_plan, ObPhyOper
if (OB_FAIL(
transmit->get_range_locations().push_back(ObTaskInfo::ObRangeLocation(phy_plan.get_allocator())))) {
LOG_WARN("array push back failed", K(ret));
} else if (FALSE_IT(transmit->get_range_locations().at(
transmit->get_range_locations().count() - 1).part_locs_.set_allocator(
&phy_plan.get_allocator()))) {
} else if (OB_FAIL(transmit->get_range_locations().at(idx).part_locs_.init(1))) {
LOG_WARN("init fix array failed", K(ret));
} else {
@ -1025,6 +1028,9 @@ int ObIndexSSTableBuilder::gen_macro_exchange(ObPhysicalPlan& phy_plan, ObPhyOpe
for (int64_t i = 0; OB_SUCC(ret) && i < index_keys.get_partition_num(); i++) {
if (OB_FAIL(range_locations.push_back(ObTaskInfo::ObRangeLocation(phy_plan.get_allocator())))) {
LOG_WARN("array push back failed", K(ret));
} else if (FALSE_IT(
range_locations.at(range_locations.count() - 1).part_locs_.set_allocator(
&phy_plan.get_allocator()))) {
} else {
auto& ranges = index_ranges_.at(std::min(i, index_ranges_.count() - 1));
ObPartitionKey pkey;
@ -1145,6 +1151,9 @@ int ObIndexSSTableBuilder::gen_sstable_exchange(ObPhysicalPlan& phy_plan, ObPhyO
for (int64_t i = 0; OB_SUCC(ret) && i < index_keys.get_partition_num(); i++) {
if (OB_FAIL(range_locations.push_back(ObTaskInfo::ObRangeLocation(phy_plan.get_allocator())))) {
LOG_WARN("array push back failed", K(ret));
} else if (FALSE_IT(
range_locations.at(range_locations.count() - 1).part_locs_.set_allocator(
&phy_plan.get_allocator()))) {
} else {
auto& range_location = range_locations.at(i);
if (OB_FAIL(range_location.part_locs_.init(1))) {