From a24b0eb4e75ad91ab7e13bfa19e006c7bb955eb5 Mon Sep 17 00:00:00 2001 From: stdliu Date: Mon, 26 Jul 2021 11:51:13 +0800 Subject: [PATCH] Cherry commit from 3.1 to 3.1 opensource --- src/rootserver/ob_rebalance_task.cpp | 17 +++++++++++++---- src/rootserver/ob_rebalance_task.h | 2 ++ src/sql/engine/expr/ob_expr_dll_udf.cpp | 5 ++++- src/sql/engine/expr/ob_expr_last_insert_id.cpp | 2 +- src/sql/executor/ob_distributed_transmit.cpp | 12 +++++------- .../executor/ob_remote_executor_processor.cpp | 7 +++++-- src/sql/ob_index_sstable_builder.cpp | 9 +++++++++ 7 files changed, 39 insertions(+), 15 deletions(-) diff --git a/src/rootserver/ob_rebalance_task.cpp b/src/rootserver/ob_rebalance_task.cpp index 977fe0ef9..12bd7aef6 100644 --- a/src/rootserver/ob_rebalance_task.cpp +++ b/src/rootserver/ob_rebalance_task.cpp @@ -4784,6 +4784,9 @@ int ObRebalanceSqlBKGTask::log_execute_result(const common::ObIArray& rc_ar ret = OB_ERR_UNEXPECTED; LOG_WARN("ret code count not match", K(ret), "ret_code_count", rc_array.count()); } else { + if (OB_SUCCESS != rc_array.at(0) && task_infos_.count() > 0) { + notify_sql_scheduler(rc_array.at(0)); + } for (int64_t i = 0; i < task_infos_.count(); i++) { const sql::ObSchedBKGDDistTask& info = task_infos_.at(i).get_sql_bkgd_task(); ROOTSERVICE_EVENT_ADD("balancer", @@ -5341,15 +5344,20 @@ int ObValidateTask::assign(const ObValidateTask& that) } void ObRebalanceSqlBKGTask::notify_cancel() const +{ + notify_sql_scheduler(OB_CANCELED); +} + +void ObRebalanceSqlBKGTask::notify_sql_scheduler(const int rc) const { int tmp_ret = OB_SUCCESS; if (1 != task_infos_.count()) { tmp_ret = OB_ERR_UNEXPECTED; LOG_WARN("should be one task info", K(tmp_ret), K(task_infos_.count())); - } else if (OB_SUCCESS != - (tmp_ret = sql::ObRpcBKGDTaskCompleteP::notify_error(task_infos_.at(0).get_sql_bkgd_task().get_task_id(), - task_infos_.at(0).get_sql_bkgd_task().get_scheduler_id(), - OB_CANCELED))) { + } else if (OB_SUCCESS != (tmp_ret = sql::ObRpcBKGDTaskCompleteP::notify_error( + task_infos_.at(0).get_sql_bkgd_task().get_task_id(), + task_infos_.at(0).get_sql_bkgd_task().get_scheduler_id(), + rc))) { LOG_WARN("notify task canceled filed", K(tmp_ret)); } } @@ -5829,6 +5837,7 @@ int ObRebalanceSqlBKGTask::check_before_execute( } } } + ret = E(EventTable::EN_BALANCE_TASK_EXE_ERR) OB_SUCCESS; return ret; } diff --git a/src/rootserver/ob_rebalance_task.h b/src/rootserver/ob_rebalance_task.h index d21294617..b9a4a53be 100644 --- a/src/rootserver/ob_rebalance_task.h +++ b/src/rootserver/ob_rebalance_task.h @@ -2731,6 +2731,8 @@ public: int assign(const ObRebalanceSqlBKGTask& that); private: + void notify_sql_scheduler(const int rc) const; + common::ObArray task_infos_; }; diff --git a/src/sql/engine/expr/ob_expr_dll_udf.cpp b/src/sql/engine/expr/ob_expr_dll_udf.cpp index 5268946e0..7d251c40c 100644 --- a/src/sql/engine/expr/ob_expr_dll_udf.cpp +++ b/src/sql/engine/expr/ob_expr_dll_udf.cpp @@ -217,6 +217,7 @@ OB_DEF_SERIALIZE(ObExprDllUdf) LOG_WARN("serialize aggregate column expression failed", K(ret)); } } + OZ(ObFuncExprOperator::serialize(buf, buf_len, pos)); return ret; } @@ -238,13 +239,14 @@ OB_DEF_DESERIALIZE(ObExprDllUdf) LOG_WARN("fail to deserialize expression", K(ret)); } } + OZ(ObFuncExprOperator::deserialize(buf, data_len, pos)); + OZ(udf_func_.init(udf_meta_)); return ret; } OB_DEF_SERIALIZE_SIZE(ObExprDllUdf) { int64_t len = 0; - len += ObFuncExprOperator::get_serialize_size(); OB_UNIS_ADD_LEN(udf_meta_); OB_UNIS_ADD_LEN(udf_attributes_); OB_UNIS_ADD_LEN(udf_attributes_types_); @@ -258,6 +260,7 @@ OB_DEF_SERIALIZE_SIZE(ObExprDllUdf) len += expr->get_serialize_size(); } } + len += ObFuncExprOperator::get_serialize_size(); return len; } diff --git a/src/sql/engine/expr/ob_expr_last_insert_id.cpp b/src/sql/engine/expr/ob_expr_last_insert_id.cpp index fb8ace2d0..f6d42a500 100644 --- a/src/sql/engine/expr/ob_expr_last_insert_id.cpp +++ b/src/sql/engine/expr/ob_expr_last_insert_id.cpp @@ -68,7 +68,7 @@ int ObExprLastInsertID::calc_resultN( } else { EXPR_DEFINE_CAST_CTX(expr_ctx, CM_NO_RANGE_CHECK); expr_ctx.cast_mode_ &= ~(CM_WARN_ON_FAIL); - const ObObj& obj_tmp = objs_array[0]; + const ObObj &obj_tmp = objs_array[0]; EXPR_GET_UINT64_V2(obj_tmp, param_value); result.set_uint64(param_value); } diff --git a/src/sql/executor/ob_distributed_transmit.cpp b/src/sql/executor/ob_distributed_transmit.cpp index 3c71575f6..9eccf52a7 100644 --- a/src/sql/executor/ob_distributed_transmit.cpp +++ b/src/sql/executor/ob_distributed_transmit.cpp @@ -160,11 +160,11 @@ int ObDistributedTransmit::inner_open(ObExecContext& exec_ctx) const ObIArray* slice_events = NULL; uint64_t slice_table_id = repartition_table_id_; - ObSchemaGetterGuard schema_guard; - const ObTableSchema* table_schema = NULL; + ObSchemaGetterGuard *schema_guard = exec_ctx.get_sql_ctx()->schema_guard_; + const ObTableSchema *table_schema = NULL; - int64_t interm_result_buf_len = get_split_task_count() * sizeof(ObIntermResult*); - if (OB_ISNULL(child_op_)) { + int64_t interm_result_buf_len = get_split_task_count() * sizeof(ObIntermResult *); + if (OB_ISNULL(child_op_) || OB_ISNULL(schema_guard)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("child op is NULL", K(ret)); } else if (OB_UNLIKELY(get_split_task_count() <= 0)) { @@ -212,9 +212,7 @@ int ObDistributedTransmit::inner_open(ObExecContext& exec_ctx) const } if (OB_SUCC(ret) && OB_INVALID_ID != slice_table_id) { - if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(session->get_effective_tenant_id(), schema_guard))) { - LOG_WARN("faile to get schema guard", K(ret)); - } else if (OB_FAIL(schema_guard.get_table_schema(slice_table_id, table_schema))) { + if (OB_FAIL(schema_guard->get_table_schema(slice_table_id, table_schema))) { LOG_WARN("faile to get table schema", K(ret), K(slice_table_id)); } else if (OB_ISNULL(table_schema)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/sql/executor/ob_remote_executor_processor.cpp b/src/sql/executor/ob_remote_executor_processor.cpp index 522703e53..4d1a7a540 100644 --- a/src/sql/executor/ob_remote_executor_processor.cpp +++ b/src/sql/executor/ob_remote_executor_processor.cpp @@ -295,7 +295,9 @@ int ObRemoteBaseExecuteP::sync_send_result(ObExecContext& exec_ctx, const ObP bool need_flush = false; if (is_static_engine) { bool added = false; - if (OB_FAIL(scanner.try_add_row(se_op->get_spec().output_, exec_ctx.get_eval_ctx(), added))) { + if (OB_FAIL(scanner.try_add_row(se_op->get_spec().output_, + exec_ctx.get_eval_ctx(), + added))) { LOG_WARN("fail add row to scanner", K(ret)); } else if (!added) { need_flush = true; @@ -1178,7 +1180,8 @@ int ObRpcRemoteASyncExecuteP::send_result_to_controller(ObExecContext& exec_ctx, if (OB_UNLIKELY(OB_ITER_END != ret)) { LOG_WARN("failed to get next row", K(ret)); } - } else if (OB_FAIL(scanner.try_add_row(se_op->get_spec().output_, exec_ctx.get_eval_ctx(), added))) { + } else if (OB_FAIL(scanner.try_add_row(se_op->get_spec().output_, + exec_ctx.get_eval_ctx(), added))) { LOG_WARN("fail add row to scanner", K(ret)); } else if (!added) { buffer_enough = true; diff --git a/src/sql/ob_index_sstable_builder.cpp b/src/sql/ob_index_sstable_builder.cpp index ea71c5859..87259bf7a 100644 --- a/src/sql/ob_index_sstable_builder.cpp +++ b/src/sql/ob_index_sstable_builder.cpp @@ -789,6 +789,9 @@ int ObIndexSSTableBuilder::gen_data_exchange(ObPhysicalPlan& phy_plan, ObPhyOper if (OB_FAIL( transmit->get_range_locations().push_back(ObTaskInfo::ObRangeLocation(phy_plan.get_allocator())))) { LOG_WARN("array push back failed", K(ret)); + } else if (FALSE_IT(transmit->get_range_locations().at( + transmit->get_range_locations().count() - 1).part_locs_.set_allocator( + &phy_plan.get_allocator()))) { } else if (OB_FAIL(transmit->get_range_locations().at(idx).part_locs_.init(1))) { LOG_WARN("init fix array failed", K(ret)); } else { @@ -1025,6 +1028,9 @@ int ObIndexSSTableBuilder::gen_macro_exchange(ObPhysicalPlan& phy_plan, ObPhyOpe for (int64_t i = 0; OB_SUCC(ret) && i < index_keys.get_partition_num(); i++) { if (OB_FAIL(range_locations.push_back(ObTaskInfo::ObRangeLocation(phy_plan.get_allocator())))) { LOG_WARN("array push back failed", K(ret)); + } else if (FALSE_IT( + range_locations.at(range_locations.count() - 1).part_locs_.set_allocator( + &phy_plan.get_allocator()))) { } else { auto& ranges = index_ranges_.at(std::min(i, index_ranges_.count() - 1)); ObPartitionKey pkey; @@ -1145,6 +1151,9 @@ int ObIndexSSTableBuilder::gen_sstable_exchange(ObPhysicalPlan& phy_plan, ObPhyO for (int64_t i = 0; OB_SUCC(ret) && i < index_keys.get_partition_num(); i++) { if (OB_FAIL(range_locations.push_back(ObTaskInfo::ObRangeLocation(phy_plan.get_allocator())))) { LOG_WARN("array push back failed", K(ret)); + } else if (FALSE_IT( + range_locations.at(range_locations.count() - 1).part_locs_.set_allocator( + &phy_plan.get_allocator()))) { } else { auto& range_location = range_locations.at(i); if (OB_FAIL(range_location.part_locs_.init(1))) {