From bb35a1f410bb30c2c18d08547a936207ed7c213a Mon Sep 17 00:00:00 2001 From: leslieyuchen Date: Thu, 24 Aug 2023 07:40:33 +0000 Subject: [PATCH] fix das retry thread deadlock --- deps/oblib/src/lib/thread/thread.h | 1 + src/observer/mysql/ob_query_driver.cpp | 3 + src/observer/mysql/ob_query_retry_ctrl.cpp | 4 +- src/observer/mysql/ob_sync_plan_driver.cpp | 2 +- src/observer/mysql/obmp_query.cpp | 1 + src/observer/mysql/obmp_stmt_execute.cpp | 1 + src/observer/mysql/obmp_stmt_prexecute.cpp | 3 + src/observer/ob_inner_sql_result.cpp | 6 +- src/sql/das/ob_das_id_rpc.cpp | 1 + src/sql/das/ob_das_location_router.cpp | 97 ++++++++----------- src/sql/das/ob_das_location_router.h | 5 +- src/sql/das/ob_das_retry_ctrl.cpp | 2 +- src/sql/das/ob_das_utils.cpp | 2 +- src/sql/das/ob_data_access_service.cpp | 7 +- src/sql/engine/cmd/ob_load_data_impl.cpp | 2 +- src/sql/engine/px/ob_dfo.h | 6 ++ src/sql/engine/px/ob_px_dtl_msg.cpp | 2 +- src/sql/engine/px/ob_px_dtl_msg.h | 11 ++- src/sql/engine/px/ob_px_scheduler.cpp | 2 + src/sql/engine/px/ob_px_sqc_proxy.cpp | 7 ++ src/sql/engine/px/ob_px_task_process.cpp | 7 +- .../executor/ob_remote_executor_processor.cpp | 2 +- src/sql/ob_result_set.cpp | 9 +- src/sql/ob_result_set.h | 3 +- 24 files changed, 105 insertions(+), 81 deletions(-) diff --git a/deps/oblib/src/lib/thread/thread.h b/deps/oblib/src/lib/thread/thread.h index bb38186e44..3128534274 100644 --- a/deps/oblib/src/lib/thread/thread.h +++ b/deps/oblib/src/lib/thread/thread.h @@ -142,6 +142,7 @@ public: static constexpr uint8_t WAIT = (1 << 0); static constexpr uint8_t WAIT_IN_TENANT_QUEUE = (1 << 1); static constexpr uint8_t WAIT_FOR_IO_EVENT = (1 << 2); + static constexpr uint8_t WAIT_FOR_LOCAL_RETRY = (1 << 3); //Statistics of local retry waiting time for dynamically increasing threads. // for thread diagnose, maybe replace it with union later. static thread_local int64_t loop_ts_; static thread_local pthread_t thread_joined_; diff --git a/src/observer/mysql/ob_query_driver.cpp b/src/observer/mysql/ob_query_driver.cpp index 3e63ed52db..ef5b4f7239 100644 --- a/src/observer/mysql/ob_query_driver.cpp +++ b/src/observer/mysql/ob_query_driver.cpp @@ -307,6 +307,9 @@ int ObQueryDriver::response_query_result(ObResultSet &result, LOG_WARN("fail to response query header", K(ret), K(row_num), K(can_retry)); } } + if (OB_FAIL(ret) && !can_retry) { + FLOG_INFO("The query has already returned partial results to the client and cannot be retried", KR(ret)); + } return ret; } diff --git a/src/observer/mysql/ob_query_retry_ctrl.cpp b/src/observer/mysql/ob_query_retry_ctrl.cpp index a3429774db..6f6226e45e 100644 --- a/src/observer/mysql/ob_query_retry_ctrl.cpp +++ b/src/observer/mysql/ob_query_retry_ctrl.cpp @@ -113,7 +113,7 @@ public: ~ObRefreshLocationCachePolicy() = default; virtual void test(ObRetryParam &v) const override { - v.result_.refresh_location_cache(is_async, v.err_); + v.result_.force_refresh_location_cache(is_async, v.err_); } }; @@ -238,7 +238,7 @@ public: K(v.session_.get_retry_info().get_last_query_retry_err())); if (v.session_.get_retry_info().is_rpc_timeout() || is_transaction_rpc_timeout_err(v.err_)) { // rpc超时了,可能是location cache不对,异步刷新location cache - v.result_.refresh_location_cache(true, v.err_); // 非阻塞 + v.result_.force_refresh_location_cache(true, v.err_); // 非阻塞 LOG_WARN("sql rpc timeout, or trans rpc timeout, maybe location is changed, " "refresh location cache non blockly", K(v), K(v.session_.get_retry_info().is_rpc_timeout())); diff --git a/src/observer/mysql/ob_sync_plan_driver.cpp b/src/observer/mysql/ob_sync_plan_driver.cpp index 658e40c6f5..39bda33d44 100644 --- a/src/observer/mysql/ob_sync_plan_driver.cpp +++ b/src/observer/mysql/ob_sync_plan_driver.cpp @@ -113,7 +113,7 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet &result) K(ret), K(cli_ret), K(retry_ctrl_.need_retry())); ret = cli_ret; } else { - result.refresh_location_cache(true, ret); + result.refresh_location_cache_by_errno(true, ret); } int cret = result.close(ret); if (cret != OB_SUCCESS) { diff --git a/src/observer/mysql/obmp_query.cpp b/src/observer/mysql/obmp_query.cpp index 9df6e2ee96..d9161a6824 100644 --- a/src/observer/mysql/obmp_query.cpp +++ b/src/observer/mysql/obmp_query.cpp @@ -580,6 +580,7 @@ OB_NOINLINE int ObMPQuery::process_with_tmp_context(ObSQLSessionInfo &session, bool &need_disconnect) { int ret = OB_SUCCESS; + oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY); //create a temporary memory context to process retry or the rest sql of multi-query, //avoid memory dynamic leaks caused by query retry or too many multi-query items lib::ContextParam param; diff --git a/src/observer/mysql/obmp_stmt_execute.cpp b/src/observer/mysql/obmp_stmt_execute.cpp index 0bf83b7cb4..3b5c034dec 100644 --- a/src/observer/mysql/obmp_stmt_execute.cpp +++ b/src/observer/mysql/obmp_stmt_execute.cpp @@ -1493,6 +1493,7 @@ OB_NOINLINE int ObMPStmtExecute::process_retry(ObSQLSessionInfo &session, { int ret = OB_SUCCESS; //create a temporary memory context to process retry, avoid memory bloat caused by retries + oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY); lib::ContextParam param; param.set_mem_attr(MTL_ID(), ObModIds::OB_SQL_EXECUTOR, ObCtxIds::DEFAULT_CTX_ID) diff --git a/src/observer/mysql/obmp_stmt_prexecute.cpp b/src/observer/mysql/obmp_stmt_prexecute.cpp index f5cfb187f7..94ca36e20d 100644 --- a/src/observer/mysql/obmp_stmt_prexecute.cpp +++ b/src/observer/mysql/obmp_stmt_prexecute.cpp @@ -253,6 +253,9 @@ int ObMPStmtPrexecute::before_process() session->set_session_in_retry(retry_ctrl_.need_retry()); } } + if (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type()) { + oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY); + } } while (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type()); if (OB_SUCC(ret) && retry_ctrl_.get_retry_times() > 0) { LOG_TRACE("sql retry succeed", K(ret), diff --git a/src/observer/ob_inner_sql_result.cpp b/src/observer/ob_inner_sql_result.cpp index a83a14cc97..bc0e1f6980 100644 --- a/src/observer/ob_inner_sql_result.cpp +++ b/src/observer/ob_inner_sql_result.cpp @@ -149,7 +149,7 @@ int ObInnerSQLResult::open() ret = OB_INIT_TWICE; LOG_WARN("result set already open", K(ret)); } else if (has_tenant_resource() && OB_FAIL(result_set_->open())) { - result_set_->refresh_location_cache(true, ret); + result_set_->refresh_location_cache_by_errno(true, ret); LOG_WARN("open result set failed", K(ret)); // move after precess_retry(). // result_set_->close(); @@ -214,7 +214,7 @@ int ObInnerSQLResult::inner_close() } else { WITH_CONTEXT(mem_context_) { if (has_tenant_resource() && OB_FAIL(result_set_->close())) { - result_set_->refresh_location_cache(true, ret); + result_set_->refresh_location_cache_by_errno(true, ret); LOG_WARN("result set close failed", K(ret)); } else if(!has_tenant_resource() && OB_FAIL(remote_result_set_->close())) { LOG_WARN("remote_result_set close failed", K(ret)); @@ -248,7 +248,7 @@ int ObInnerSQLResult::next() WITH_CONTEXT(mem_context_) { if (has_tenant_resource() && OB_FAIL(result_set_->get_next_row(row_))) { if (OB_ITER_END != ret) { - result_set_->refresh_location_cache(true, ret); + result_set_->refresh_location_cache_by_errno(true, ret); LOG_WARN("get next row failed", K(ret)); } } else if (!has_tenant_resource() && OB_FAIL(remote_result_set_->get_next_row(row_))) { diff --git a/src/sql/das/ob_das_id_rpc.cpp b/src/sql/das/ob_das_id_rpc.cpp index f31765ff5c..ec0901b598 100644 --- a/src/sql/das/ob_das_id_rpc.cpp +++ b/src/sql/das/ob_das_id_rpc.cpp @@ -111,6 +111,7 @@ int ObDASIDRequestRpc::fetch_new_range(const ObDASIDRequest &msg, .sync_fetch_das_id(msg, res))) { LOG_WARN("fetch new range failed", KR(ret), K(server), K(msg)); } + LOG_INFO("fetch new DAS ID range finish", KR(ret), K(msg), K(res)); return ret; } } // namespace sql diff --git a/src/sql/das/ob_das_location_router.cpp b/src/sql/das/ob_das_location_router.cpp index dad5fcc436..888ea3698d 100755 --- a/src/sql/das/ob_das_location_router.cpp +++ b/src/sql/das/ob_das_location_router.cpp @@ -754,7 +754,7 @@ ObDASLocationRouter::ObDASLocationRouter(ObIAllocator &allocator) ObDASLocationRouter::~ObDASLocationRouter() { //try to refresh location when location exception occurred - refresh_location_cache(true, cur_errno_); + refresh_location_cache_by_errno(true, cur_errno_); cur_errno_ = OB_SUCCESS; } @@ -777,7 +777,7 @@ int ObDASLocationRouter::nonblock_get_readable_replica(const uint64_t tenant_id, ls_loc))) { LOG_WARN("get ls replica location failed", K(ret), K(tablet_loc)); } - if (is_partition_change_error(ret) && OB_SUCCESS == last_errno_ && retry_cnt_ <= 0) { + if (is_partition_change_error(ret)) { /*During the execution phase, if nonblock location interface is used to obtain the location * and an exception occurs, retries are necessary. * However, statement-level retries cannot rollback many execution states, @@ -862,7 +862,7 @@ int ObDASLocationRouter::nonblock_get(const ObDASTableLocMeta &loc_meta, location))) { LOG_WARN("fail to get tablet locations", K(ret), K(tenant_id), K(ls_id)); } - if (is_partition_change_error(ret) && OB_SUCCESS == last_errno_ && retry_cnt_ <= 0) { + if (is_partition_change_error(ret)) { /*During the execution phase, if nonblock location interface is used to obtain the location * and an exception occurs, retries are necessary. * However, statement-level retries cannot rollback many execution states, @@ -965,7 +965,7 @@ int ObDASLocationRouter::nonblock_get_leader(const uint64_t tenant_id, tablet_loc.server_))) { LOG_WARN("nonblock get ls location failed", K(ret), K(tablet_loc)); } - if (is_partition_change_error(ret) && OB_SUCCESS == last_errno_ && retry_cnt_ <= 0) { + if (is_partition_change_error(ret)) { /*During the execution phase, if nonblock location interface is used to obtain the location * and an exception occurs, retries are necessary. * However, statement-level retries cannot rollback many execution states, @@ -1142,7 +1142,7 @@ OB_NOINLINE int ObDASLocationRouter::get_vt_ls_location(uint64_t table_id, return ret; } -void ObDASLocationRouter::refresh_location_cache(bool is_nonblock, int err_no) +void ObDASLocationRouter::refresh_location_cache_by_errno(bool is_nonblock, int err_no) { NG_TRACE_TIMES(1, get_location_cache_begin); if (is_master_changed_error(err_no) @@ -1159,62 +1159,43 @@ void ObDASLocationRouter::refresh_location_cache(bool is_nonblock, int err_no) // Timeout usage priority: ObTimeoutCtx > THIS_WORKER > GCONF // // all_tablet_list_ may contain duplicate tablet_id - int ret = OB_SUCCESS; - lib::ContextParam param; - param.set_mem_attr(MTL_ID(), "DasRefrLoca", ObCtxIds::DEFAULT_CTX_ID) - .set_properties(lib::USE_TL_PAGE_OPTIONAL) - .set_page_size(OB_MALLOC_NORMAL_BLOCK_SIZE) - .set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE); - CREATE_WITH_TEMP_CONTEXT(param) { - ObList failed_list(CURRENT_CONTEXT->get_allocator()); - FOREACH_X(id_iter, all_tablet_list_, OB_SUCC(ret)) { - if (!element_exist(succ_tablet_list_, *id_iter) && !element_exist(failed_list, *id_iter)) { - if (OB_FAIL(failed_list.push_back(*id_iter))) { - LOG_WARN("store failed tablet id failed", KR(ret), K(id_iter)); - } - } - } - if (OB_SUCC(ret)) { - if (OB_ISNULL(GCTX.location_service_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("GCTX.location_service_ is null", KR(ret)); - } else if (OB_FAIL(GCTX.location_service_->batch_renew_tablet_locations(MTL_ID(), - failed_list, - err_no, - is_nonblock))) { - LOG_WARN("batch renew tablet locations failed", KR(ret), - "tenant_id", MTL_ID(), K(err_no), K(is_nonblock), K(failed_list)); - } - } - } - all_tablet_list_.clear(); - succ_tablet_list_.clear(); + force_refresh_location_cache(is_nonblock, err_no); } NG_TRACE_TIMES(1, get_location_cache_end); } -void ObDASLocationRouter::refresh_location_cache(const ObTabletID &tablet_id, - bool is_nonblock, - int err_no) +void ObDASLocationRouter::force_refresh_location_cache(bool is_nonblock, int err_no) { int ret = OB_SUCCESS; - // Refresh tablet ls mapping and ls location according to err_no. - // - // The timeout has been set inner the interface when renewing location synchronously. - // Timeout usage priority: ObTimeoutCtx > THIS_WORKER > GCONF.location_cache_refresh_sql_timeout. - if (OB_ISNULL(GCTX.location_service_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("GCTX.location_service_ is null", KR(ret)); - } else if (OB_FAIL(GCTX.location_service_->renew_tablet_location( - MTL_ID(), - tablet_id, - err_no, - is_nonblock))) { - LOG_WARN("renew tablet location failed", KR(ret), - "tenant_id", MTL_ID(), K(tablet_id), K(err_no), K(is_nonblock)); - } else { - LOG_INFO("LOCATION: refresh tablet location cache succ", K(err_no), K(tablet_id)); + lib::ContextParam param; + param.set_mem_attr(MTL_ID(), "DasRefrLoca", ObCtxIds::DEFAULT_CTX_ID) + .set_properties(lib::USE_TL_PAGE_OPTIONAL) + .set_page_size(OB_MALLOC_NORMAL_BLOCK_SIZE) + .set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE); + CREATE_WITH_TEMP_CONTEXT(param) { + ObList failed_list(CURRENT_CONTEXT->get_allocator()); + FOREACH_X(id_iter, all_tablet_list_, OB_SUCC(ret)) { + if (!element_exist(succ_tablet_list_, *id_iter) && !element_exist(failed_list, *id_iter)) { + if (OB_FAIL(failed_list.push_back(*id_iter))) { + LOG_WARN("store failed tablet id failed", KR(ret), K(id_iter)); + } + } + } + if (OB_SUCC(ret)) { + if (OB_ISNULL(GCTX.location_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("GCTX.location_service_ is null", KR(ret)); + } else if (OB_FAIL(GCTX.location_service_->batch_renew_tablet_locations(MTL_ID(), + failed_list, + err_no, + is_nonblock))) { + LOG_WARN("batch renew tablet locations failed", KR(ret), + "tenant_id", MTL_ID(), K(err_no), K(is_nonblock), K(failed_list)); + } + } } + all_tablet_list_.clear(); + succ_tablet_list_.clear(); } int ObDASLocationRouter::block_renew_tablet_location(const ObTabletID &tablet_id, ObLSLocation &ls_loc) @@ -1225,9 +1206,13 @@ int ObDASLocationRouter::block_renew_tablet_location(const ObTabletID &tablet_id ObLSID ls_id; int64_t query_timeout_ts = THIS_WORKER.get_timeout_ts(); ObTimeoutCtx timeout_ctx; - timeout_ctx.set_timeout(1 * 1000L * 1000L); + timeout_ctx.set_timeout(GCONF.location_cache_refresh_sql_timeout); + //The maximum timeout period is location_cache_refresh_sql_timeout + if (timeout_ctx.get_abs_timeout() > query_timeout_ts && query_timeout_ts > 0) { + timeout_ctx.set_abs_timeout(query_timeout_ts); + } //the timeout limit for "refresh location" is within 1s - THIS_WORKER.set_timeout_ts(timeout_ctx.get_abs_timeout(query_timeout_ts)); + THIS_WORKER.set_timeout_ts(timeout_ctx.get_abs_timeout()); if (OB_FAIL(GCTX.location_service_->get(MTL_ID(), tablet_id, expire_renew_time, diff --git a/src/sql/das/ob_das_location_router.h b/src/sql/das/ob_das_location_router.h index b7b4adfa0b..b172d7bdb5 100644 --- a/src/sql/das/ob_das_location_router.h +++ b/src/sql/das/ob_das_location_router.h @@ -303,11 +303,12 @@ public: int get_full_ls_replica_loc(const common::ObObjectID &tenant_id, const ObDASTabletLoc &tablet_loc, share::ObLSReplicaLocation &replica_loc); - void refresh_location_cache(bool is_nonblock, int err_no); - void refresh_location_cache(const common::ObTabletID &tablet_id, bool is_nonblock, int err_no); + void refresh_location_cache_by_errno(bool is_nonblock, int err_no); + void force_refresh_location_cache(bool is_nonblock, int err_no); int block_renew_tablet_location(const common::ObTabletID &tablet_id, share::ObLSLocation &ls_loc); int save_touched_tablet_id(const common::ObTabletID &tablet_id) { return all_tablet_list_.push_back(tablet_id); } void set_last_errno(int err_no) { last_errno_ = err_no; } + int get_last_errno() const { return last_errno_; } void set_retry_cnt(int64_t retry_cnt) { retry_cnt_ = retry_cnt; } void inc_retry_cnt() { ++retry_cnt_; } void set_retry_info(const ObQueryRetryInfo* retry_info); diff --git a/src/sql/das/ob_das_retry_ctrl.cpp b/src/sql/das/ob_das_retry_ctrl.cpp index 6f3f8ed268..2b03f042fa 100644 --- a/src/sql/das/ob_das_retry_ctrl.cpp +++ b/src/sql/das/ob_das_retry_ctrl.cpp @@ -38,7 +38,7 @@ void ObDASRetryCtrl::tablet_location_retry_proc(ObDASRef &das_ref, ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet loc is nullptr", K(ret)); } else { - loc_router.refresh_location_cache(true, task_op.get_errcode()); + loc_router.force_refresh_location_cache(true, task_op.get_errcode()); need_retry = true; const ObDASTableLocMeta *loc_meta = tablet_loc->loc_meta_; LOG_INFO("refresh tablet location cache and retry DAS task", diff --git a/src/sql/das/ob_das_utils.cpp b/src/sql/das/ob_das_utils.cpp index 842518336c..17d91284d3 100644 --- a/src/sql/das/ob_das_utils.cpp +++ b/src/sql/das/ob_das_utils.cpp @@ -392,7 +392,7 @@ int ObDASUtils::wait_das_retry(int64_t retry_cnt) uint32_t timeout_factor = static_cast((retry_cnt > 100) ? 100 : retry_cnt); int64_t sleep_us = 10000L * timeout_factor > THIS_WORKER.get_timeout_remain() ? THIS_WORKER.get_timeout_remain() - : 1000L * timeout_factor; + : 10000L * timeout_factor; if (sleep_us > 0) { LOG_INFO("will sleep", K(sleep_us), K(THIS_WORKER.get_timeout_remain())); THIS_WORKER.sched_wait(); diff --git a/src/sql/das/ob_data_access_service.cpp b/src/sql/das/ob_data_access_service.cpp index 83f849117e..6ad98ab244 100644 --- a/src/sql/das/ob_data_access_service.cpp +++ b/src/sql/das/ob_data_access_service.cpp @@ -267,20 +267,22 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op //When das scan under px gi with transfor case, we need to disable das retry. need_retry = false; retry_continue = false; + LOG_INFO("[DAS RETRY] The PX task has retried too many times and has exited the DAS retry process"); } if (need_retry) { task_op.in_part_retry_ = true; location_router.set_last_errno(task_op.get_errcode()); location_router.inc_retry_cnt(); + oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY); if (OB_TMP_FAIL(clear_task_exec_env(das_ref, task_op))) { LOG_WARN("clear task execution environment failed", K(tmp_ret)); } if (OB_FAIL(das_ref.get_exec_ctx().check_status())) { - LOG_WARN("query is timeout, terminate retry", K(ret)); + LOG_WARN("query is timeout or interrupted, terminate retry", KR(ret)); } else if (OB_FAIL(refresh_task_location_info(das_ref, task_op))) { LOG_WARN("refresh task location failed", K(ret)); } else { - LOG_INFO("start to retry DAS task now", KPC(task_op.get_tablet_loc())); + LOG_INFO("[DAS RETRY] Start retrying the DAS task now", KPC(task_op.get_tablet_loc())); das_task_wrapper.reuse(); task_op.set_task_status(ObDasTaskStatus::UNSTART); if (OB_FAIL(das_task_wrapper.push_back_task(&task_op))) { @@ -288,6 +290,7 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op } else if (OB_FAIL(execute_dist_das_task(das_ref, das_task_wrapper, false))) { LOG_WARN("execute dist DAS task failed", K(ret)); } + LOG_INFO("[DAS RETRY] Retry completing the DAS Task", KPC(task_op.get_tablet_loc())); } task_op.errcode_ = ret; retry_continue = (OB_SUCCESS != ret); diff --git a/src/sql/engine/cmd/ob_load_data_impl.cpp b/src/sql/engine/cmd/ob_load_data_impl.cpp index 042c1ee608..4fa6fb89fb 100644 --- a/src/sql/engine/cmd/ob_load_data_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_impl.cpp @@ -394,7 +394,7 @@ int ObLoadDataBase::memory_wait_local(ObExecContext &ctx, } //if it is location exception, refresh location cache with block interface //because load data can only local retry - loc_router.refresh_location_cache(false, ret); + loc_router.refresh_location_cache_by_errno(false, ret); } //print info diff --git a/src/sql/engine/px/ob_dfo.h b/src/sql/engine/px/ob_dfo.h index 7f91d796e4..c43b0f0032 100644 --- a/src/sql/engine/px/ob_dfo.h +++ b/src/sql/engine/px/ob_dfo.h @@ -902,6 +902,7 @@ public: task_channel_(NULL), sqc_channel_(NULL), rc_(TASK_DEFAULT_RET_VALUE), // 小于等于 0 表示设置了 rc 值 + das_retry_rc_(common::OB_SUCCESS), state_(0), task_co_id_(0), px_int_id_(), @@ -956,6 +957,7 @@ public: K_(exec_addr), K_(qc_addr), K_(rc), + K_(das_retry_rc), K_(task_co_id), K_(px_int_id), K_(is_fulltree), @@ -990,6 +992,9 @@ public: inline void set_result(int rc) { rc_ = rc; } inline bool has_result() const { return rc_ <= 0; } inline int get_result() const { return rc_; } + void set_das_retry_rc(int das_retry_rc) + { das_retry_rc_ = (das_retry_rc_ == common::OB_SUCCESS ? das_retry_rc : das_retry_rc_); } + int get_das_retry_rc() const { return das_retry_rc_; } void set_exec_addr(const common::ObAddr &addr) { exec_addr_ = addr; } void set_sqc_addr(const common::ObAddr &addr) { sqc_addr_ = addr; } void set_qc_addr(const common::ObAddr &addr) { qc_addr_ = addr; } @@ -1025,6 +1030,7 @@ public: common::ObAddr exec_addr_; /* Task 的运行地址 */ common::ObAddr qc_addr_; /*记录 QC 的地址,用于中断*/ int rc_; + int das_retry_rc_; volatile int32_t state_; // 被 task 线程设置 volatile uint64_t task_co_id_; /* task 的协程 id */ ObPxInterruptID px_int_id_; diff --git a/src/sql/engine/px/ob_px_dtl_msg.cpp b/src/sql/engine/px/ob_px_dtl_msg.cpp index 131439832f..f849a32eba 100644 --- a/src/sql/engine/px/ob_px_dtl_msg.cpp +++ b/src/sql/engine/px/ob_px_dtl_msg.cpp @@ -31,7 +31,7 @@ OB_SERIALIZE_MEMBER(ObPxTransmitDataChannelMsg, ch_sets_, part_affinity_map_, ch OB_SERIALIZE_MEMBER(ObPxInitSqcResultMsg, dfo_id_, sqc_id_, rc_, task_count_, err_msg_); OB_SERIALIZE_MEMBER(ObPxFinishSqcResultMsg, dfo_id_, sqc_id_, rc_, trans_result_, task_monitor_info_array_, sqc_affected_rows_, dml_row_info_, - temp_table_id_, interm_result_ids_, fb_info_, err_msg_); + temp_table_id_, interm_result_ids_, fb_info_, err_msg_, das_retry_rc_); OB_SERIALIZE_MEMBER(ObPxFinishTaskResultMsg, dfo_id_, sqc_id_, task_id_, rc_); OB_SERIALIZE_MEMBER((ObPxBloomFilterChInfo, dtl::ObDtlChTotalInfo), filter_id_); OB_SERIALIZE_MEMBER((ObPxBloomFilterChSet, dtl::ObDtlChSet), filter_id_, sqc_id_); diff --git a/src/sql/engine/px/ob_px_dtl_msg.h b/src/sql/engine/px/ob_px_dtl_msg.h index 8105816741..db0b929542 100644 --- a/src/sql/engine/px/ob_px_dtl_msg.h +++ b/src/sql/engine/px/ob_px_dtl_msg.h @@ -384,6 +384,7 @@ public: : dfo_id_(common::OB_INVALID_ID), sqc_id_(common::OB_INVALID_ID), rc_(common::OB_SUCCESS), + das_retry_rc_(common::OB_SUCCESS), task_monitor_info_array_(), sqc_affected_rows_(0), dml_row_info_(), @@ -396,20 +397,22 @@ public: transaction::ObTxExecResult &get_trans_result() { return trans_result_; } void reset() { - dfo_id_ = OB_INVALID_ID; - sqc_id_ = OB_INVALID_ID; - rc_ = OB_SUCCESS; + dfo_id_ = common::OB_INVALID_ID; + sqc_id_ = common::OB_INVALID_ID; + rc_ = common::OB_SUCCESS; + das_retry_rc_ = common::OB_SUCCESS; trans_result_.reset(); task_monitor_info_array_.reset(); dml_row_info_.reset(); fb_info_.reset(); err_msg_.reset(); } - TO_STRING_KV(K_(dfo_id), K_(sqc_id), K_(rc), K_(sqc_affected_rows)); + TO_STRING_KV(K_(dfo_id), K_(sqc_id), K_(rc), K_(das_retry_rc), K_(sqc_affected_rows)); public: int64_t dfo_id_; int64_t sqc_id_; int rc_; // 错误码 + int das_retry_rc_; //record the error code that cause DAS to retry transaction::ObTxExecResult trans_result_; ObPxTaskMonitorInfoArray task_monitor_info_array_; // deprecated, keep for compatiablity int64_t sqc_affected_rows_; // pdml情况下,一个sqc 影响的行数 diff --git a/src/sql/engine/px/ob_px_scheduler.cpp b/src/sql/engine/px/ob_px_scheduler.cpp index dd6a16bca7..2fc0f9d23a 100644 --- a/src/sql/engine/px/ob_px_scheduler.cpp +++ b/src/sql/engine/px/ob_px_scheduler.cpp @@ -384,10 +384,12 @@ int ObPxMsgProc::on_sqc_finish_msg(ObExecContext &ctx, ObPxErrorUtil::update_qc_error_code(coord_info_.first_error_code_, pkt.rc_, pkt.err_msg_); if (OB_SUCC(ret)) { if (OB_FAIL(pkt.rc_)) { + DAS_CTX(ctx).get_location_router().save_cur_exec_status(pkt.rc_); LOG_WARN("sqc fail, abort qc", K(pkt), K(ret), "sqc_addr", sqc->get_exec_addr()); } else { // pkt rc_ == OB_SUCCESS // 处理 dml + px 框架下的affected row + DAS_CTX(ctx).get_location_router().save_cur_exec_status(pkt.das_retry_rc_); if (OB_ISNULL(ctx.get_physical_plan_ctx())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("phy plan ctx is null", K(ret)); diff --git a/src/sql/engine/px/ob_px_sqc_proxy.cpp b/src/sql/engine/px/ob_px_sqc_proxy.cpp index 45547508d8..231bb63965 100644 --- a/src/sql/engine/px/ob_px_sqc_proxy.cpp +++ b/src/sql/engine/px/ob_px_sqc_proxy.cpp @@ -456,6 +456,13 @@ int ObPxSQCProxy::report(int end_ret) const // overwrite ret ObPxTask &task = tasks.at(i); ObPxErrorUtil::update_sqc_error_code(sqc_ret, task.get_result(), task.err_msg_, finish_msg.err_msg_); + if (OB_SUCCESS == finish_msg.das_retry_rc_) { + //Even if the PX task is successfully retried by DAS, + //it is necessary to collect the retry error code of each task and provide feedback to the QC node of PX, + //so that the QC node can refresh the location cache in a timely manner, + //avoiding the next execution from being sent to the same erroneous node. + finish_msg.das_retry_rc_ = task.get_das_retry_rc(); + } affected_rows += task.get_affected_rows(); finish_msg.dml_row_info_.add_px_dml_row_info(task.dml_row_info_); finish_msg.temp_table_id_ = task.temp_table_id_; diff --git a/src/sql/engine/px/ob_px_task_process.cpp b/src/sql/engine/px/ob_px_task_process.cpp index 225fa0b083..4766a8fca6 100644 --- a/src/sql/engine/px/ob_px_task_process.cpp +++ b/src/sql/engine/px/ob_px_task_process.cpp @@ -456,9 +456,6 @@ int ObPxTaskProcess::do_process() } } } - if (OB_NOT_NULL(arg_.exec_ctx_)) { - DAS_CTX(*arg_.exec_ctx_).get_location_router().refresh_location_cache(true, ret); - } // for forward warning msg and user error msg (void)record_user_error_msg(ret); @@ -474,6 +471,10 @@ int ObPxTaskProcess::do_process() // Task 和 Sqc 在两个不同线程中时,task 需要和 sqc 通信 if (NULL != arg_.sqc_task_ptr_) { arg_.sqc_task_ptr_->set_result(ret); + if (OB_NOT_NULL(arg_.exec_ctx_)) { + int das_retry_rc = DAS_CTX(*arg_.exec_ctx_).get_location_router().get_last_errno(); + arg_.sqc_task_ptr_->set_das_retry_rc(das_retry_rc); + } if (OB_SUCC(ret)) { // nop } else if (IS_INTERRUPTED()) { diff --git a/src/sql/executor/ob_remote_executor_processor.cpp b/src/sql/executor/ob_remote_executor_processor.cpp index e38c26c1ff..ef89fc8399 100644 --- a/src/sql/executor/ob_remote_executor_processor.cpp +++ b/src/sql/executor/ob_remote_executor_processor.cpp @@ -717,7 +717,7 @@ int ObRemoteBaseExecuteP::execute_with_sql(ObRemoteTask &task) NULL, session->get_effective_tenant_id())) { ret = OB_ERR_REMOTE_SCHEMA_NOT_FULL; } - DAS_CTX(exec_ctx_).get_location_router().refresh_location_cache(true, ret); + DAS_CTX(exec_ctx_).get_location_router().refresh_location_cache_by_errno(true, ret); } //监控项统计结束 exec_end_timestamp_ = ObTimeUtility::current_time(); diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index d1444941da..d95b57c7cc 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -1154,9 +1154,14 @@ int ObResultSet::init_cmd_exec_context(ObExecContext &exec_ctx) } // obmp_query中重试整个SQL之前,可能需要调用本接口来刷新Location,以避免总是发给了错误的服务器 -void ObResultSet::refresh_location_cache(bool is_nonblock, int err) +void ObResultSet::refresh_location_cache_by_errno(bool is_nonblock, int err) { - DAS_CTX(get_exec_context()).get_location_router().refresh_location_cache(is_nonblock, err); + DAS_CTX(get_exec_context()).get_location_router().refresh_location_cache_by_errno(is_nonblock, err); +} + +void ObResultSet::force_refresh_location_cache(bool is_nonblock, int err) +{ + DAS_CTX(get_exec_context()).get_location_router().force_refresh_location_cache(is_nonblock, err); } // 告诉mysql是否要传入一个EndTransCallback diff --git a/src/sql/ob_result_set.h b/src/sql/ob_result_set.h index ef53f87ae8..df8c7c87ec 100644 --- a/src/sql/ob_result_set.h +++ b/src/sql/ob_result_set.h @@ -195,7 +195,8 @@ public: const common::ObString& get_stmt_ps_sql() const { return ps_sql_; } common::ObString& get_stmt_ps_sql() { return ps_sql_; } int64_t get_query_string_id() const; - void refresh_location_cache(bool is_nonblock, int err); + void refresh_location_cache_by_errno(bool is_nonblock, int err); + void force_refresh_location_cache(bool is_nonblock, int err); bool need_execute_remote_sql_async() const { return get_exec_context().use_remote_sql() && !is_inner_result_set_; }