Move gtid/pkt_id to ObReqTransport::AsyncCB and support terminating RPCs in ObAsyncRpcProxy

This commit is contained in:
liucc1997 2024-08-22 05:11:17 +00:00 committed by ob-robot
parent 78a2108dc6
commit e378f7f147
9 changed files with 80 additions and 34 deletions

View File

@ -94,7 +94,8 @@ public:
{
public:
AsyncCB(int pcode)
: low_level_cb_(NULL), dst_(), timeout_(0), tenant_id_(0),
: low_level_cb_(NULL), gtid_(0), pkt_id_(0),
dst_(), timeout_(0), tenant_id_(0),
err_(0), pcode_(pcode), send_ts_(0), payload_(0)
{}
virtual ~AsyncCB() {}
@ -130,6 +131,8 @@ public:
obrpc::ObRpcPacketCode get_pcode() const { return static_cast<obrpc::ObRpcPacketCode>(pcode_); }
void* low_level_cb_;
// Identify the in-flight packet that belongs to this callback. The pair
// (gtid_, pkt_id_) is what callers pass to pn_terminate_pkt() to cancel the
// request locally.
// gtid_ is assigned by the send path as (pnio_group_id << 32) + thread_id.
uint64_t gtid_;
// The send path takes the address of pkt_id_ (pkt_id_ptr), presumably so the
// network layer can write the packet id into it -- confirm against the sender.
uint32_t pkt_id_;
private:
static const int64_t REQUEST_ITEM_COST_RT = 100 * 1000; // 100ms
protected:

View File

@ -67,7 +67,8 @@ int ObSyncRespCallback::wait(const int64_t wait_timeout_us, const int64_t pcode,
const struct timespec ts = {1, 0};
bool has_terminated = false;
while(ATOMIC_LOAD(&cond_) == 0) {
if (OB_UNLIKELY((obrpc::OB_REMOTE_SYNC_EXECUTE == pcode || obrpc::OB_REMOTE_EXECUTE == pcode)
if (OB_UNLIKELY((obrpc::OB_REMOTE_SYNC_EXECUTE == pcode || obrpc::OB_REMOTE_EXECUTE == pcode
|| proxy_.is_detect_session_killed())
&& !has_terminated
&& OB_ERR_SESSION_INTERRUPTED == THIS_WORKER.check_status())) {
RPC_LOG(INFO, "check session killed, will execute pn_terminate_pkt", K(gtid_), K(pkt_id_));

View File

@ -28,8 +28,8 @@ namespace obrpc
class ObSyncRespCallback
{
public:
ObSyncRespCallback(ObRpcMemPool& pool)
: pkt_nio_cb_(NULL), pool_(pool), resp_(NULL), sz_(0), cond_(0), send_ret_(common::OB_SUCCESS), gtid_(0), pkt_id_(0){}
ObSyncRespCallback(ObRpcMemPool& pool, ObRpcProxy& proxy)
: pkt_nio_cb_(NULL), pool_(pool), proxy_(proxy), resp_(NULL), sz_(0), cond_(0), send_ret_(common::OB_SUCCESS){}
~ObSyncRespCallback() {}
void* alloc(int64_t sz) { return pool_.alloc(sz); }
int handle_resp(int io_err, const char* buf, int64_t sz);
@ -45,6 +45,7 @@ public:
private:
void* pkt_nio_cb_;
ObRpcMemPool& pool_;
const ObRpcProxy& proxy_;
char* resp_;
int64_t sz_;
int cond_;
@ -77,9 +78,6 @@ private:
void* pkt_nio_cb_;
ObRpcMemPool& pool_;
UAsyncCB* ucb_;
public:
uint64_t gtid_;
uint32_t pkt_id_;
};
void init_ucb(ObRpcProxy& proxy, UAsyncCB* ucb, const common::ObAddr& addr, int64_t send_ts, int64_t payload_sz);
@ -125,7 +123,7 @@ public:
auto &set = obrpc::ObRpcPacketSet::instance();
const char* pcode_label = set.name_of_idx(set.idx_of_pcode(pcode));
ObRpcMemPool pool(src_tenant_id, pcode_label);
ObSyncRespCallback cb(pool);
ObSyncRespCallback cb(pool, proxy);
char* req = NULL;
int64_t req_sz = 0;
const char* resp = NULL;
@ -244,8 +242,8 @@ public:
set_ucb_args(newcb, args);
init_ucb(proxy, cb->get_ucb(), addr, start_ts, req_sz);
}
cb->gtid_ = (pnio_group_id<<32) + thread_id;
pkt_id_ptr = &cb->pkt_id_;
ucb->gtid_ = (pnio_group_id<<32) + thread_id;
pkt_id_ptr = &ucb->pkt_id_;
}
IGNORE_RETURN snprintf(rpc_timeguard_str, sizeof(rpc_timeguard_str), "sz=%ld,pcode=%x,id=%ld", req_sz, pcode, src_tenant_id);
timeguard.click(rpc_timeguard_str);

View File

@ -119,7 +119,8 @@ public:
max_process_handler_time_(0), compressor_type_(common::INVALID_COMPRESSOR),
src_cluster_id_(common::OB_INVALID_CLUSTER_ID),
dst_cluster_id_(common::OB_INVALID_CLUSTER_ID), init_(false),
active_(true), is_trace_time_(false), do_ratelimit_(false), is_bg_flow_(0), rcode_() {}
active_(true), is_trace_time_(false), do_ratelimit_(false),
do_detect_session_killed_(false), is_bg_flow_(0), rcode_() {}
virtual ~ObRpcProxy() = default;
int init(const rpc::frame::ObReqTransport *transport,
@ -133,6 +134,8 @@ public:
int64_t get_timeout() const { return timeout_; }
void set_trace_time(const bool is_trace_time) { is_trace_time_ = is_trace_time; }
void set_ratelimit(const bool do_ratelimit) { do_ratelimit_ = do_ratelimit; }
// Enables or disables session-kill detection for RPCs sent through this proxy.
// The wait loops that consult is_detect_session_killed() use this flag to
// decide whether to check the worker status for OB_ERR_SESSION_INTERRUPTED and
// terminate outstanding packets. The flag is initialized to false.
void set_detect_session_killed(const bool do_detect) { do_detect_session_killed_ = do_detect; }
// Returns the flag stored by set_detect_session_killed(), i.e. whether waiters
// on this proxy should poll for a killed session while waiting for responses.
bool is_detect_session_killed() const { return do_detect_session_killed_; }
void set_bg_flow(const int8_t is_bg_flow) { is_bg_flow_ = is_bg_flow;}
void set_max_process_handler_time(const uint32_t max_process_handler_time)
{ max_process_handler_time_ = max_process_handler_time; }
@ -261,6 +264,7 @@ protected:
bool active_;
bool is_trace_time_;
bool do_ratelimit_;
bool do_detect_session_killed_;
int8_t is_bg_flow_;
ObRpcResultCode rcode_;
};
@ -391,6 +395,11 @@ extern ObRpcProxy::NoneT None;
set_ratelimit(do_ratelimit); \
return *this; \
} \
inline CLS& detect_session_killed(const bool do_detect) \
{ \
set_detect_session_killed(do_detect); \
return *this; \
} \
inline CLS& bg_flow(const uint32_t is_bg_flow) \
{ \
set_bg_flow(is_bg_flow); \

View File

@ -55,7 +55,7 @@ int SSHandle<pcodeStruct>::get_more(typename pcodeStruct::Response &result)
auto &set = obrpc::ObRpcPacketSet::instance();
const char* pcode_label = set.name_of_idx(set.idx_of_pcode(pcode_));
ObRpcMemPool pool(src_tenant_id, pcode_label);
ObSyncRespCallback cb(pool);
ObSyncRespCallback cb(pool, proxy_);
char* pnio_req = NULL;
int64_t pnio_req_sz = 0, resp_sz = 0;
const char* resp = NULL;
@ -205,7 +205,7 @@ int SSHandle<pcodeStruct>::abort()
auto &set = obrpc::ObRpcPacketSet::instance();
const char* pcode_label = set.name_of_idx(set.idx_of_pcode(pcode_));
ObRpcMemPool pool(src_tenant_id, pcode_label);
ObSyncRespCallback cb(pool);
ObSyncRespCallback cb(pool, proxy_);
char* pnio_req = NULL;
int64_t pnio_req_sz = 0, resp_sz = 0;
const char* resp = NULL;

View File

@ -21,6 +21,8 @@
#include "rpc/obrpc/ob_rpc_packet.h"
#include "rpc/obrpc/ob_rpc_result_code.h"
#include "rpc/obrpc/ob_rpc_proxy.h"
#include "share/ob_errno.h"
#include "lib/worker.h"
namespace oceanbase
{
@ -403,8 +405,36 @@ int ObAsyncRpcProxy<PC, RpcArg, RpcResult, Func, RpcProxy>::wait(
RPC_LOG(WARN, "inner stat error", K_(response_count), "cb_count",
cb_list_.get_size(), K(ret));
} else {
bool has_terminated = false;
while (response_count_ < cb_list_.get_size()) {
cond_.wait();
cond_.wait(1000);
if (OB_UNLIKELY(rpc_proxy_.is_detect_session_killed()
&& !has_terminated
&& OB_ERR_SESSION_INTERRUPTED == THIS_WORKER.check_status())) {
RPC_LOG(INFO, "check session killed, will terminate all rpc locally", K(response_count_), K(cb_list_.get_size()));
int tmp_ret = OB_SUCCESS;
int index = 0;
ObAsyncCB<PC, ObAsyncRpcProxy, RpcProxy> *cb = cb_list_.get_first();
while (common::OB_SUCCESS == tmp_ret && cb != cb_list_.get_header()) {
if (NULL == cb) {
tmp_ret = common::OB_ERR_UNEXPECTED;
RPC_LOG_RET(WARN, tmp_ret, "cb is null", KP(cb));
} else {
RPC_LOG(INFO, "terminate the rpc of cb_list", K(cb->gtid_), K(cb->pkt_id_), K(index));
int err = 0;
if ((err = pn_terminate_pkt(cb->gtid_, cb->pkt_id_)) != 0) {
tmp_ret = tranlate_to_ob_error(err);
RPC_LOG_RET(WARN, tmp_ret, "pn_terminate_pkt failed", K(err));
} else {
cb = cb->get_next();
++index;
}
}
}
if (index == cb_list_.get_size()) {
has_terminated = true;
}
}
}
// set results

View File

@ -253,20 +253,15 @@ void ObPxSqcAsyncProxy::fail_process() {
K(return_cb_count_), K(callbacks_.count()));
ARRAY_FOREACH_X(callbacks_, idx, count, true) {
ObSqcAsyncCB &callback = *callbacks_.at(idx);
{
// avoid rpc thread access the callback currently.
ObThreadCondGuard guard(callback.get_cond());
if (!callback.is_visited() &&
!(callback.is_processed() || callback.is_timeout() || callback.is_invalid())) {
// unregister async callbacks that have not received response.
ObAsyncRespCallback *async_cb = static_cast<ObAsyncRespCallback *>(callback.low_level_cb_);
uint64_t gtid = async_cb->gtid_;
uint32_t pkt_id = async_cb->pkt_id_;
int err = 0;
if ((err = pn_terminate_pkt(gtid, pkt_id)) != 0) {
int ret = tranlate_to_ob_error(err);
LOG_WARN("terminate pkt failed", K(ret), K(err));
}
if (!callback.is_visited() &&
!(callback.is_processed() || callback.is_timeout() || callback.is_invalid())) {
// unregister async callbacks that have not received response.
uint64_t gtid = callback.gtid_;
uint32_t pkt_id = callback.pkt_id_;
int err = 0;
if ((err = pn_terminate_pkt(gtid, pkt_id)) != 0) {
int ret = tranlate_to_ob_error(err);
LOG_WARN("terminate pkt failed", K(ret), K(err));
}
}
}

View File

@ -59,6 +59,8 @@ int ObTableLockRpcClient::init()
if (OB_FAIL(table_lock_rpc_proxy_.init(GCTX.net_frame_->get_req_transport(),
GCTX.self_addr()))) {
LOG_WARN("failed to init rpc proxy", K(ret));
} else {
table_lock_rpc_proxy_.set_detect_session_killed(true);
}
return ret;
}

View File

@ -1760,7 +1760,9 @@ int ObTableLockService::batch_pre_check_lock_(ObTableLockCtx &ctx,
int last_ret = OB_SUCCESS;
int64_t USLEEP_TIME = 100; // 0.1 ms
bool need_retry = false;
ObBatchLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::batch_lock_obj);
obrpc::ObSrvRpcProxy rpc_proxy(*GCTX.srv_rpc_proxy_);
rpc_proxy.set_detect_session_killed(true);
ObBatchLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::batch_lock_obj);
// only used in LOCK_TABLE/LOCK_PARTITION
if (LOCK_TABLE == ctx.task_type_ ||
LOCK_PARTITION == ctx.task_type_) {
@ -1864,7 +1866,9 @@ int ObTableLockService::pre_check_lock_old_version_(ObTableLockCtx &ctx,
ObRetryCtx retry_ctx;
ObAddr addr;
ObTableLockTaskRequest request;
ObTableLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::lock_table);
obrpc::ObSrvRpcProxy rpc_proxy(*GCTX.srv_rpc_proxy_);
rpc_proxy.set_detect_session_killed(true);
ObTableLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::lock_table);
// only used in LOCK_TABLE/LOCK_PARTITION
if (LOCK_TABLE == ctx.task_type_ ||
LOCK_PARTITION == ctx.task_type_) {
@ -2306,11 +2310,13 @@ int ObTableLockService::inner_process_obj_lock_batch_(ObTableLockCtx &ctx,
const ObTableLockOwnerID lock_owner)
{
int ret = OB_SUCCESS;
obrpc::ObSrvRpcProxy rpc_proxy(*GCTX.srv_rpc_proxy_);
rpc_proxy.set_detect_session_killed(true);
if (ctx.is_unlock_task()) {
ObHighPriorityBatchLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::batch_unlock_obj);
ObHighPriorityBatchLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::batch_unlock_obj);
ret = batch_rpc_handle_(proxy_batch, ctx, lock_map, lock_mode, lock_owner);
} else {
ObBatchLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::batch_lock_obj);
ObBatchLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::batch_lock_obj);
ret = batch_rpc_handle_(proxy_batch, ctx, lock_map, lock_mode, lock_owner);
}
@ -2324,12 +2330,14 @@ int ObTableLockService::inner_process_obj_lock_old_version_(ObTableLockCtx &ctx,
const ObTableLockOwnerID lock_owner)
{
int ret = OB_SUCCESS;
obrpc::ObSrvRpcProxy rpc_proxy(*GCTX.srv_rpc_proxy_);
rpc_proxy.set_detect_session_killed(true);
// TODO: yanyuan.cxf we process the rpc one by one and do parallel later.
if (ctx.is_unlock_task()) {
ObHighPriorityTableLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::unlock_table);
ObHighPriorityTableLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::unlock_table);
ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, ls_lock_map, lock_mode, lock_owner);
} else {
ObTableLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::lock_table);
ObTableLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::lock_table);
ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, ls_lock_map, lock_mode, lock_owner);
}