[CP] bugfix: thread deadlock caused by local retries
This commit is contained in:
parent
4ec2a9f224
commit
11be84be20
@ -502,6 +502,7 @@ int ObMPQuery::process_single_stmt(const ObMultiStmtItem &multi_stmt_item,
|
||||
//每次执行不同sql都需要更新
|
||||
ctx_.self_add_plan_ = false;
|
||||
retry_ctrl_.reset_retry_times();//每个statement单独记录retry times
|
||||
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
|
||||
do {
|
||||
ret = OB_SUCCESS; //当发生本地重试的时候,需要重置错误码,不然无法推进重试
|
||||
need_disconnect = true;
|
||||
@ -580,7 +581,6 @@ OB_NOINLINE int ObMPQuery::process_with_tmp_context(ObSQLSessionInfo &session,
|
||||
bool &need_disconnect)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
|
||||
//create a temporary memory context to process retry or the rest sql of multi-query,
|
||||
//avoid memory dynamic leaks caused by query retry or too many multi-query items
|
||||
lib::ContextParam param;
|
||||
|
@ -1496,7 +1496,6 @@ OB_NOINLINE int ObMPStmtExecute::process_retry(ObSQLSessionInfo &session,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
//create a temporary memory context to process retry, avoid memory bloat caused by retries
|
||||
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
|
||||
lib::ContextParam param;
|
||||
param.set_mem_attr(MTL_ID(),
|
||||
ObModIds::OB_SQL_EXECUTOR, ObCtxIds::DEFAULT_CTX_ID)
|
||||
@ -1524,6 +1523,7 @@ int ObMPStmtExecute::do_process_single(ObSQLSessionInfo &session,
|
||||
int ret = OB_SUCCESS;
|
||||
// 每次执行不同sql都需要更新
|
||||
ctx_.self_add_plan_ = false;
|
||||
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
|
||||
do {
|
||||
// 每次都必须设置为OB_SCCESS, 否则可能会因为没有调用do_process()造成死循环
|
||||
ret = OB_SUCCESS;
|
||||
|
@ -196,6 +196,7 @@ int ObMPStmtPrexecute::before_process()
|
||||
session->get_effective_tenant_id()))) {
|
||||
LOG_WARN("failed to check_and_refresh_schema", K(ret));
|
||||
} else {
|
||||
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
|
||||
do {
|
||||
share::schema::ObSchemaGetterGuard schema_guard;
|
||||
const uint64_t tenant_id = session->get_effective_tenant_id();
|
||||
@ -253,9 +254,6 @@ int ObMPStmtPrexecute::before_process()
|
||||
session->set_session_in_retry(retry_ctrl_.need_retry());
|
||||
}
|
||||
}
|
||||
if (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type()) {
|
||||
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
|
||||
}
|
||||
} while (RETRY_TYPE_LOCAL == retry_ctrl_.get_retry_type());
|
||||
if (OB_SUCC(ret) && retry_ctrl_.get_retry_times() > 0) {
|
||||
LOG_TRACE("sql retry succeed", K(ret),
|
||||
|
@ -37,7 +37,7 @@ int ObDASCtx::init(const ObPhysicalPlan &plan, ObExecContext &ctx)
|
||||
const ObIArray<ObTableLocation> &normal_locations = plan.get_table_locations();
|
||||
const ObIArray<ObTableLocation> &das_locations = plan.get_das_table_locations();
|
||||
location_router_.set_last_errno(ctx.get_my_session()->get_retry_info().get_last_query_retry_err());
|
||||
location_router_.set_total_retry_cnt(ctx.get_my_session()->get_retry_info().get_retry_cnt());
|
||||
location_router_.set_history_retry_cnt(ctx.get_my_session()->get_retry_info().get_retry_cnt());
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < das_locations.count(); ++i) {
|
||||
const ObTableLocation &das_location = das_locations.at(i);
|
||||
ObDASTableLoc *table_loc = nullptr;
|
||||
|
@ -743,7 +743,7 @@ int ObDASTabletMapper::get_partition_id_map(ObObjectID partition_id,
|
||||
ObDASLocationRouter::ObDASLocationRouter(ObIAllocator &allocator)
|
||||
: last_errno_(OB_SUCCESS),
|
||||
cur_errno_(OB_SUCCESS),
|
||||
total_retry_cnt_(0),
|
||||
history_retry_cnt_(0),
|
||||
cur_retry_cnt_(0),
|
||||
all_tablet_list_(allocator),
|
||||
succ_tablet_list_(allocator),
|
||||
@ -1238,7 +1238,7 @@ int ObDASLocationRouter::block_renew_tablet_location(const ObTabletID &tablet_id
|
||||
void ObDASLocationRouter::set_retry_info(const ObQueryRetryInfo* retry_info)
|
||||
{
|
||||
last_errno_ = retry_info->get_last_query_retry_err();
|
||||
total_retry_cnt_ = retry_info->get_retry_cnt();
|
||||
history_retry_cnt_ = retry_info->get_retry_cnt();
|
||||
}
|
||||
|
||||
int ObDASLocationRouter::get_external_table_ls_location(ObLSLocation &location)
|
||||
|
@ -310,13 +310,13 @@ public:
|
||||
int save_touched_tablet_id(const common::ObTabletID &tablet_id) { return all_tablet_list_.push_back(tablet_id); }
|
||||
void set_last_errno(int err_no) { last_errno_ = err_no; }
|
||||
int get_last_errno() const { return last_errno_; }
|
||||
void set_total_retry_cnt(int64_t total_retry_cnt) { total_retry_cnt_ = total_retry_cnt; }
|
||||
void set_history_retry_cnt(int64_t history_retry_cnt) { history_retry_cnt_ = history_retry_cnt; }
|
||||
void accumulate_retry_count()
|
||||
{
|
||||
total_retry_cnt_ += cur_retry_cnt_;
|
||||
history_retry_cnt_ += cur_retry_cnt_;
|
||||
cur_retry_cnt_ = 0;
|
||||
}
|
||||
int64_t get_total_retry_cnt() const { return total_retry_cnt_; }
|
||||
int64_t get_total_retry_cnt() const { return history_retry_cnt_ + cur_retry_cnt_; }
|
||||
int64_t get_cur_retry_cnt() const { return cur_retry_cnt_; }
|
||||
void reset_cur_retry_cnt() { cur_retry_cnt_ = 0; }
|
||||
void inc_cur_retry_cnt() { ++cur_retry_cnt_; }
|
||||
@ -345,7 +345,7 @@ private:
|
||||
private:
|
||||
int last_errno_;
|
||||
int cur_errno_;
|
||||
int64_t total_retry_cnt_;
|
||||
int64_t history_retry_cnt_; //Total number of retries before the current retry round.
|
||||
int64_t cur_retry_cnt_; // the counter of continuous retry
|
||||
// NOTE: Only all_tablet_list_ needs to be serialized and send to other server to perform das remote execution;
|
||||
// And other members will be collected by execution server self, No need to perform serialization;
|
||||
|
@ -248,6 +248,7 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op
|
||||
bool retry_continue = false;
|
||||
ObDASLocationRouter &location_router = DAS_CTX(das_ref.get_exec_ctx()).get_location_router();
|
||||
location_router.reset_cur_retry_cnt();
|
||||
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
|
||||
do {
|
||||
ObDASRetryCtrl::retry_func retry_func = nullptr;
|
||||
|
||||
@ -277,7 +278,6 @@ int ObDataAccessService::retry_das_task(ObDASRef &das_ref, ObIDASTaskOp &task_op
|
||||
task_op.in_part_retry_ = true;
|
||||
location_router.set_last_errno(task_op.get_errcode());
|
||||
location_router.inc_cur_retry_cnt();
|
||||
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
|
||||
if (OB_TMP_FAIL(clear_task_exec_env(das_ref, task_op))) {
|
||||
LOG_WARN("clear task execution environment failed", K(tmp_ret));
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user