From e378f7f147fe12f37220594886beeeace1d11cf3 Mon Sep 17 00:00:00 2001 From: liucc1997 <1192520566@qq.com> Date: Thu, 22 Aug 2024 05:11:17 +0000 Subject: [PATCH] move gtid/pkt_id to ObReqTransport::AsyncCB and support terminate rpc in ObAsyncRpcProxy --- deps/oblib/src/rpc/frame/ob_req_transport.h | 5 ++- deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp | 3 +- deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h | 14 ++++---- deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h | 11 ++++++- deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp | 4 +-- src/share/rpc/ob_async_rpc_proxy.h | 32 ++++++++++++++++++- src/sql/engine/px/ob_px_sqc_async_proxy.cpp | 23 ++++++------- .../tablelock/ob_table_lock_rpc_client.cpp | 2 ++ .../tablelock/ob_table_lock_service.cpp | 20 ++++++++---- 9 files changed, 80 insertions(+), 34 deletions(-) diff --git a/deps/oblib/src/rpc/frame/ob_req_transport.h b/deps/oblib/src/rpc/frame/ob_req_transport.h index 020f91a88..baa7d21b8 100644 --- a/deps/oblib/src/rpc/frame/ob_req_transport.h +++ b/deps/oblib/src/rpc/frame/ob_req_transport.h @@ -94,7 +94,8 @@ public: { public: AsyncCB(int pcode) - : low_level_cb_(NULL), dst_(), timeout_(0), tenant_id_(0), + : low_level_cb_(NULL), gtid_(0), pkt_id_(0), + dst_(), timeout_(0), tenant_id_(0), err_(0), pcode_(pcode), send_ts_(0), payload_(0) {} virtual ~AsyncCB() {} @@ -130,6 +131,8 @@ public: obrpc::ObRpcPacketCode get_pcode() const { return static_cast<obrpc::ObRpcPacketCode>(pcode_); } void* low_level_cb_; + uint64_t gtid_; + uint32_t pkt_id_; private: static const int64_t REQUEST_ITEM_COST_RT = 100 * 1000; // 100ms protected: diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp index 9bcfbf8d8..dc52110f8 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp @@ -67,7 +67,8 @@ int ObSyncRespCallback::wait(const int64_t wait_timeout_us, const int64_t pcode, const struct timespec ts = {1, 0}; bool has_terminated = false; while(ATOMIC_LOAD(&cond_) == 0) 
{ - if (OB_UNLIKELY((obrpc::OB_REMOTE_SYNC_EXECUTE == pcode || obrpc::OB_REMOTE_EXECUTE == pcode) + if (OB_UNLIKELY((obrpc::OB_REMOTE_SYNC_EXECUTE == pcode || obrpc::OB_REMOTE_EXECUTE == pcode + || proxy_.is_detect_session_killed()) && !has_terminated && OB_ERR_SESSION_INTERRUPTED == THIS_WORKER.check_status())) { RPC_LOG(INFO, "check session killed, will execute pn_terminate_pkt", K(gtid_), K(pkt_id_)); diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h index 19bb03310..13750a537 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h @@ -28,8 +28,8 @@ namespace obrpc class ObSyncRespCallback { public: - ObSyncRespCallback(ObRpcMemPool& pool) - : pkt_nio_cb_(NULL), pool_(pool), resp_(NULL), sz_(0), cond_(0), send_ret_(common::OB_SUCCESS), gtid_(0), pkt_id_(0){} + ObSyncRespCallback(ObRpcMemPool& pool, ObRpcProxy& proxy) + : pkt_nio_cb_(NULL), pool_(pool), proxy_(proxy), resp_(NULL), sz_(0), cond_(0), send_ret_(common::OB_SUCCESS){} ~ObSyncRespCallback() {} void* alloc(int64_t sz) { return pool_.alloc(sz); } int handle_resp(int io_err, const char* buf, int64_t sz); @@ -45,6 +45,7 @@ public: private: void* pkt_nio_cb_; ObRpcMemPool& pool_; + const ObRpcProxy& proxy_; char* resp_; int64_t sz_; int cond_; @@ -77,9 +78,6 @@ private: void* pkt_nio_cb_; ObRpcMemPool& pool_; UAsyncCB* ucb_; -public: - uint64_t gtid_; - uint32_t pkt_id_; }; void init_ucb(ObRpcProxy& proxy, UAsyncCB* ucb, const common::ObAddr& addr, int64_t send_ts, int64_t payload_sz); @@ -125,7 +123,7 @@ public: auto &set = obrpc::ObRpcPacketSet::instance(); const char* pcode_label = set.name_of_idx(set.idx_of_pcode(pcode)); ObRpcMemPool pool(src_tenant_id, pcode_label); - ObSyncRespCallback cb(pool); + ObSyncRespCallback cb(pool, proxy); char* req = NULL; int64_t req_sz = 0; const char* resp = NULL; @@ -244,8 +242,8 @@ public: set_ucb_args(newcb, args); init_ucb(proxy, cb->get_ucb(), addr, start_ts, 
req_sz); } - cb->gtid_ = (pnio_group_id<<32) + thread_id; - pkt_id_ptr = &cb->pkt_id_; + ucb->gtid_ = (pnio_group_id<<32) + thread_id; + pkt_id_ptr = &ucb->pkt_id_; } IGNORE_RETURN snprintf(rpc_timeguard_str, sizeof(rpc_timeguard_str), "sz=%ld,pcode=%x,id=%ld", req_sz, pcode, src_tenant_id); timeguard.click(rpc_timeguard_str); diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h index 2d1982e00..9cf5f80b1 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h @@ -119,7 +119,8 @@ public: max_process_handler_time_(0), compressor_type_(common::INVALID_COMPRESSOR), src_cluster_id_(common::OB_INVALID_CLUSTER_ID), dst_cluster_id_(common::OB_INVALID_CLUSTER_ID), init_(false), - active_(true), is_trace_time_(false), do_ratelimit_(false), is_bg_flow_(0), rcode_() {} + active_(true), is_trace_time_(false), do_ratelimit_(false), + do_detect_session_killed_(false), is_bg_flow_(0), rcode_() {} virtual ~ObRpcProxy() = default; int init(const rpc::frame::ObReqTransport *transport, @@ -133,6 +134,8 @@ public: int64_t get_timeout() const { return timeout_; } void set_trace_time(const bool is_trace_time) { is_trace_time_ = is_trace_time; } void set_ratelimit(const bool do_ratelimit) { do_ratelimit_ = do_ratelimit; } + void set_detect_session_killed(const bool do_detect) { do_detect_session_killed_ = do_detect; } + bool is_detect_session_killed() const { return do_detect_session_killed_; } void set_bg_flow(const int8_t is_bg_flow) { is_bg_flow_ = is_bg_flow;} void set_max_process_handler_time(const uint32_t max_process_handler_time) { max_process_handler_time_ = max_process_handler_time; } @@ -261,6 +264,7 @@ protected: bool active_; bool is_trace_time_; bool do_ratelimit_; + bool do_detect_session_killed_; int8_t is_bg_flow_; ObRpcResultCode rcode_; }; @@ -391,6 +395,11 @@ extern ObRpcProxy::NoneT None; set_ratelimit(do_ratelimit); \ return *this; \ } \ + inline CLS& detect_session_killed(const 
bool do_detect) \ + { \ + set_detect_session_killed(do_detect); \ + return *this; \ + } \ inline CLS& bg_flow(const uint32_t is_bg_flow) \ { \ set_bg_flow(is_bg_flow); \ diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp index c28221675..527ec9b45 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp @@ -55,7 +55,7 @@ int SSHandle::get_more(typename pcodeStruct::Response &result) auto &set = obrpc::ObRpcPacketSet::instance(); const char* pcode_label = set.name_of_idx(set.idx_of_pcode(pcode_)); ObRpcMemPool pool(src_tenant_id, pcode_label); - ObSyncRespCallback cb(pool); + ObSyncRespCallback cb(pool, proxy_); char* pnio_req = NULL; int64_t pnio_req_sz = 0, resp_sz = 0; const char* resp = NULL; @@ -205,7 +205,7 @@ int SSHandle::abort() auto &set = obrpc::ObRpcPacketSet::instance(); const char* pcode_label = set.name_of_idx(set.idx_of_pcode(pcode_)); ObRpcMemPool pool(src_tenant_id, pcode_label); - ObSyncRespCallback cb(pool); + ObSyncRespCallback cb(pool, proxy_); char* pnio_req = NULL; int64_t pnio_req_sz = 0, resp_sz = 0; const char* resp = NULL; diff --git a/src/share/rpc/ob_async_rpc_proxy.h b/src/share/rpc/ob_async_rpc_proxy.h index 5a07711d9..351b887b0 100644 --- a/src/share/rpc/ob_async_rpc_proxy.h +++ b/src/share/rpc/ob_async_rpc_proxy.h @@ -21,6 +21,8 @@ #include "rpc/obrpc/ob_rpc_packet.h" #include "rpc/obrpc/ob_rpc_result_code.h" #include "rpc/obrpc/ob_rpc_proxy.h" +#include "share/ob_errno.h" +#include "lib/worker.h" namespace oceanbase { @@ -403,8 +405,36 @@ int ObAsyncRpcProxy::wait( RPC_LOG(WARN, "inner stat error", K_(response_count), "cb_count", cb_list_.get_size(), K(ret)); } else { + bool has_terminated = false; while (response_count_ < cb_list_.get_size()) { - cond_.wait(); + cond_.wait(1000); + if (OB_UNLIKELY(rpc_proxy_.is_detect_session_killed() + && !has_terminated + && OB_ERR_SESSION_INTERRUPTED == THIS_WORKER.check_status())) { + RPC_LOG(INFO, 
"check session killed, will terminate all rpc locally", K(response_count_), K(cb_list_.get_size())); + int tmp_ret = OB_SUCCESS; + int index = 0; + ObAsyncCB *cb = cb_list_.get_first(); + while (common::OB_SUCCESS == tmp_ret && cb != cb_list_.get_header()) { + if (NULL == cb) { + tmp_ret = common::OB_ERR_UNEXPECTED; + RPC_LOG_RET(WARN, tmp_ret, "cb is null", KP(cb)); + } else { + RPC_LOG(INFO, "terminate the rpc of cb_list", K(cb->gtid_), K(cb->pkt_id_), K(index)); + int err = 0; + if ((err = pn_terminate_pkt(cb->gtid_, cb->pkt_id_)) != 0) { + tmp_ret = tranlate_to_ob_error(err); + RPC_LOG_RET(WARN, tmp_ret, "pn_terminate_pkt failed", K(err)); + } else { + cb = cb->get_next(); + ++index; + } + } + } + if (index == cb_list_.get_size()) { + has_terminated = true; + } + } } // set results diff --git a/src/sql/engine/px/ob_px_sqc_async_proxy.cpp b/src/sql/engine/px/ob_px_sqc_async_proxy.cpp index 126d3bd3b..a4ca0c73c 100644 --- a/src/sql/engine/px/ob_px_sqc_async_proxy.cpp +++ b/src/sql/engine/px/ob_px_sqc_async_proxy.cpp @@ -253,20 +253,15 @@ void ObPxSqcAsyncProxy::fail_process() { K(return_cb_count_), K(callbacks_.count())); ARRAY_FOREACH_X(callbacks_, idx, count, true) { ObSqcAsyncCB &callback = *callbacks_.at(idx); - { - // avoid rpc thread access the callback currently. - ObThreadCondGuard guard(callback.get_cond()); - if (!callback.is_visited() && - !(callback.is_processed() || callback.is_timeout() || callback.is_invalid())) { - // unregister async callbacks that have not received response. 
- ObAsyncRespCallback *async_cb = static_cast<ObAsyncRespCallback *>(callback.low_level_cb_); - uint64_t gtid = async_cb->gtid_; - uint32_t pkt_id = async_cb->pkt_id_; - int err = 0; - if ((err = pn_terminate_pkt(gtid, pkt_id)) != 0) { - int ret = tranlate_to_ob_error(err); - LOG_WARN("terminate pkt failed", K(ret), K(err)); - } + if (!callback.is_visited() && + !(callback.is_processed() || callback.is_timeout() || callback.is_invalid())) { + // unregister async callbacks that have not received response. + uint64_t gtid = callback.gtid_; + uint32_t pkt_id = callback.pkt_id_; + int err = 0; + if ((err = pn_terminate_pkt(gtid, pkt_id)) != 0) { + int ret = tranlate_to_ob_error(err); + LOG_WARN("terminate pkt failed", K(ret), K(err)); } } } diff --git a/src/storage/tablelock/ob_table_lock_rpc_client.cpp b/src/storage/tablelock/ob_table_lock_rpc_client.cpp index 180bcd200..9d5a94456 100644 --- a/src/storage/tablelock/ob_table_lock_rpc_client.cpp +++ b/src/storage/tablelock/ob_table_lock_rpc_client.cpp @@ -59,6 +59,8 @@ int ObTableLockRpcClient::init() if (OB_FAIL(table_lock_rpc_proxy_.init(GCTX.net_frame_->get_req_transport(), GCTX.self_addr()))) { LOG_WARN("failed to init rpc proxy", K(ret)); + } else { + table_lock_rpc_proxy_.set_detect_session_killed(true); } return ret; } diff --git a/src/storage/tablelock/ob_table_lock_service.cpp b/src/storage/tablelock/ob_table_lock_service.cpp index 56870ebf6..bdd5c44aa 100644 --- a/src/storage/tablelock/ob_table_lock_service.cpp +++ b/src/storage/tablelock/ob_table_lock_service.cpp @@ -1760,7 +1760,9 @@ int ObTableLockService::batch_pre_check_lock_(ObTableLockCtx &ctx, int last_ret = OB_SUCCESS; int64_t USLEEP_TIME = 100; // 0.1 ms bool need_retry = false; - ObBatchLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::batch_lock_obj); + obrpc::ObSrvRpcProxy rpc_proxy(*GCTX.srv_rpc_proxy_); + rpc_proxy.set_detect_session_killed(true); + ObBatchLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::batch_lock_obj); // only used in 
LOCK_TABLE/LOCK_PARTITION if (LOCK_TABLE == ctx.task_type_ || LOCK_PARTITION == ctx.task_type_) { @@ -1864,7 +1866,9 @@ int ObTableLockService::pre_check_lock_old_version_(ObTableLockCtx &ctx, ObRetryCtx retry_ctx; ObAddr addr; ObTableLockTaskRequest request; - ObTableLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::lock_table); + obrpc::ObSrvRpcProxy rpc_proxy(*GCTX.srv_rpc_proxy_); + rpc_proxy.set_detect_session_killed(true); + ObTableLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::lock_table); // only used in LOCK_TABLE/LOCK_PARTITION if (LOCK_TABLE == ctx.task_type_ || LOCK_PARTITION == ctx.task_type_) { @@ -2306,11 +2310,13 @@ int ObTableLockService::inner_process_obj_lock_batch_(ObTableLockCtx &ctx, const ObTableLockOwnerID lock_owner) { int ret = OB_SUCCESS; + obrpc::ObSrvRpcProxy rpc_proxy(*GCTX.srv_rpc_proxy_); + rpc_proxy.set_detect_session_killed(true); if (ctx.is_unlock_task()) { - ObHighPriorityBatchLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::batch_unlock_obj); + ObHighPriorityBatchLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::batch_unlock_obj); ret = batch_rpc_handle_(proxy_batch, ctx, lock_map, lock_mode, lock_owner); } else { - ObBatchLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::batch_lock_obj); + ObBatchLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::batch_lock_obj); ret = batch_rpc_handle_(proxy_batch, ctx, lock_map, lock_mode, lock_owner); } @@ -2324,12 +2330,14 @@ int ObTableLockService::inner_process_obj_lock_old_version_(ObTableLockCtx &ctx, const ObTableLockOwnerID lock_owner) { int ret = OB_SUCCESS; + obrpc::ObSrvRpcProxy rpc_proxy(*GCTX.srv_rpc_proxy_); + rpc_proxy.set_detect_session_killed(true); // TODO: yanyuan.cxf we process the rpc one by one and do parallel later. 
if (ctx.is_unlock_task()) { - ObHighPriorityTableLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::unlock_table); + ObHighPriorityTableLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::unlock_table); ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, ls_lock_map, lock_mode, lock_owner); } else { - ObTableLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::lock_table); + ObTableLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::lock_table); ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, ls_lock_map, lock_mode, lock_owner); }