From e158ab5e5f335bee7c45d43abd3edf008c4ded7b Mon Sep 17 00:00:00 2001 From: sdc Date: Tue, 23 Jan 2024 14:18:02 +0000 Subject: [PATCH] fix wait init_sqc response core --- src/sql/engine/px/ob_px_sqc_async_proxy.cpp | 24 ++++++++++++--------- src/sql/engine/px/ob_px_sqc_async_proxy.h | 1 + 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/sql/engine/px/ob_px_sqc_async_proxy.cpp b/src/sql/engine/px/ob_px_sqc_async_proxy.cpp index ca6300785e..47428a16c0 100644 --- a/src/sql/engine/px/ob_px_sqc_async_proxy.cpp +++ b/src/sql/engine/px/ob_px_sqc_async_proxy.cpp @@ -251,16 +251,20 @@ void ObPxSqcAsyncProxy::fail_process() { K(return_cb_count_), K(callbacks_.count())); ARRAY_FOREACH_X(callbacks_, idx, count, true) { ObSqcAsyncCB &callback = *callbacks_.at(idx); - if (!callback.is_visited() && - !(callback.is_processed() || callback.is_timeout() || callback.is_invalid())) { - // unregister async callbacks that have not received response. - ObAsyncRespCallback *async_cb = static_cast(callback.low_level_cb_); - uint64_t gtid = async_cb->gtid_; - uint32_t pkt_id = async_cb->pkt_id_; - int err = 0; - if ((err = pn_terminate_pkt(gtid, pkt_id)) != 0) { - int ret = tranlate_to_ob_error(err); - LOG_WARN("terminate pkt failed", K(ret), K(err)); + { + // avoid rpc thread access the callback currently. + ObThreadCondGuard guard(callback.get_cond()); + if (!callback.is_visited() && + !(callback.is_processed() || callback.is_timeout() || callback.is_invalid())) { + // unregister async callbacks that have not received response. + ObAsyncRespCallback *async_cb = static_cast(callback.low_level_cb_); + uint64_t gtid = async_cb->gtid_; + uint32_t pkt_id = async_cb->pkt_id_; + int err = 0; + if ((err = pn_terminate_pkt(gtid, pkt_id)) != 0) { + int ret = tranlate_to_ob_error(err); + LOG_WARN("terminate pkt failed", K(ret), K(err)); + } } } } diff --git a/src/sql/engine/px/ob_px_sqc_async_proxy.h b/src/sql/engine/px/ob_px_sqc_async_proxy.h index c30f07b0d2..2a7f26452a 100644 --- a/src/sql/engine/px/ob_px_sqc_async_proxy.h +++ b/src/sql/engine/px/ob_px_sqc_async_proxy.h @@ -62,6 +62,7 @@ public: const ObPxUserErrorMsg get_ret_code() const { return rcode_; } const common::ObAddr &get_dst() const { return dst_; } int64_t get_timeout() const { return timeout_; } + ObThreadCond &get_cond() { return cond_; } // to string TO_STRING_KV("dst", get_dst(), "timeout", get_timeout(), "ret_code", get_ret_code(), "result", get_result(), "is_visited",