fix das retry thread deadlock

This commit is contained in:
leslieyuchen
2023-08-24 07:40:33 +00:00
committed by ob-robot
parent a9fbca91d0
commit bb35a1f410
24 changed files with 105 additions and 81 deletions

View File

@ -394,7 +394,7 @@ int ObLoadDataBase::memory_wait_local(ObExecContext &ctx,
}
//if it is location exception, refresh location cache with block interface
//because load data can only local retry
loc_router.refresh_location_cache(false, ret);
loc_router.refresh_location_cache_by_errno(false, ret);
}
//print info

View File

@ -902,6 +902,7 @@ public:
task_channel_(NULL),
sqc_channel_(NULL),
rc_(TASK_DEFAULT_RET_VALUE), // 小于等于 0 表示设置了 rc 值
das_retry_rc_(common::OB_SUCCESS),
state_(0),
task_co_id_(0),
px_int_id_(),
@ -956,6 +957,7 @@ public:
K_(exec_addr),
K_(qc_addr),
K_(rc),
K_(das_retry_rc),
K_(task_co_id),
K_(px_int_id),
K_(is_fulltree),
@ -990,6 +992,9 @@ public:
inline void set_result(int rc) { rc_ = rc; }
inline bool has_result() const { return rc_ <= 0; }
inline int get_result() const { return rc_; }
void set_das_retry_rc(int das_retry_rc)
{ das_retry_rc_ = (das_retry_rc_ == common::OB_SUCCESS ? das_retry_rc : das_retry_rc_); }
int get_das_retry_rc() const { return das_retry_rc_; }
void set_exec_addr(const common::ObAddr &addr) { exec_addr_ = addr; }
void set_sqc_addr(const common::ObAddr &addr) { sqc_addr_ = addr; }
void set_qc_addr(const common::ObAddr &addr) { qc_addr_ = addr; }
@ -1025,6 +1030,7 @@ public:
common::ObAddr exec_addr_; /* Task 的运行地址 */
common::ObAddr qc_addr_; /*记录 QC 的地址,用于中断*/
int rc_;
int das_retry_rc_;
volatile int32_t state_; // 被 task 线程设置
volatile uint64_t task_co_id_; /* task 的协程 id */
ObPxInterruptID px_int_id_;

View File

@ -31,7 +31,7 @@ OB_SERIALIZE_MEMBER(ObPxTransmitDataChannelMsg, ch_sets_, part_affinity_map_, ch
OB_SERIALIZE_MEMBER(ObPxInitSqcResultMsg, dfo_id_, sqc_id_, rc_, task_count_, err_msg_);
OB_SERIALIZE_MEMBER(ObPxFinishSqcResultMsg, dfo_id_, sqc_id_, rc_, trans_result_,
task_monitor_info_array_, sqc_affected_rows_, dml_row_info_,
temp_table_id_, interm_result_ids_, fb_info_, err_msg_);
temp_table_id_, interm_result_ids_, fb_info_, err_msg_, das_retry_rc_);
OB_SERIALIZE_MEMBER(ObPxFinishTaskResultMsg, dfo_id_, sqc_id_, task_id_, rc_);
OB_SERIALIZE_MEMBER((ObPxBloomFilterChInfo, dtl::ObDtlChTotalInfo), filter_id_);
OB_SERIALIZE_MEMBER((ObPxBloomFilterChSet, dtl::ObDtlChSet), filter_id_, sqc_id_);

View File

@ -384,6 +384,7 @@ public:
: dfo_id_(common::OB_INVALID_ID),
sqc_id_(common::OB_INVALID_ID),
rc_(common::OB_SUCCESS),
das_retry_rc_(common::OB_SUCCESS),
task_monitor_info_array_(),
sqc_affected_rows_(0),
dml_row_info_(),
@ -396,20 +397,22 @@ public:
transaction::ObTxExecResult &get_trans_result() { return trans_result_; }
void reset()
{
dfo_id_ = OB_INVALID_ID;
sqc_id_ = OB_INVALID_ID;
rc_ = OB_SUCCESS;
dfo_id_ = common::OB_INVALID_ID;
sqc_id_ = common::OB_INVALID_ID;
rc_ = common::OB_SUCCESS;
das_retry_rc_ = common::OB_SUCCESS;
trans_result_.reset();
task_monitor_info_array_.reset();
dml_row_info_.reset();
fb_info_.reset();
err_msg_.reset();
}
TO_STRING_KV(K_(dfo_id), K_(sqc_id), K_(rc), K_(sqc_affected_rows));
TO_STRING_KV(K_(dfo_id), K_(sqc_id), K_(rc), K_(das_retry_rc), K_(sqc_affected_rows));
public:
int64_t dfo_id_;
int64_t sqc_id_;
int rc_; // 错误码
int das_retry_rc_; //record the error code that cause DAS to retry
transaction::ObTxExecResult trans_result_;
ObPxTaskMonitorInfoArray task_monitor_info_array_; // deprecated, keep for compatiablity
int64_t sqc_affected_rows_; // pdml情况下,一个sqc 影响的行数

View File

@ -384,10 +384,12 @@ int ObPxMsgProc::on_sqc_finish_msg(ObExecContext &ctx,
ObPxErrorUtil::update_qc_error_code(coord_info_.first_error_code_, pkt.rc_, pkt.err_msg_);
if (OB_SUCC(ret)) {
if (OB_FAIL(pkt.rc_)) {
DAS_CTX(ctx).get_location_router().save_cur_exec_status(pkt.rc_);
LOG_WARN("sqc fail, abort qc", K(pkt), K(ret), "sqc_addr", sqc->get_exec_addr());
} else {
// pkt rc_ == OB_SUCCESS
// 处理 dml + px 框架下的affected row
DAS_CTX(ctx).get_location_router().save_cur_exec_status(pkt.das_retry_rc_);
if (OB_ISNULL(ctx.get_physical_plan_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("phy plan ctx is null", K(ret));

View File

@ -456,6 +456,13 @@ int ObPxSQCProxy::report(int end_ret) const
// overwrite ret
ObPxTask &task = tasks.at(i);
ObPxErrorUtil::update_sqc_error_code(sqc_ret, task.get_result(), task.err_msg_, finish_msg.err_msg_);
if (OB_SUCCESS == finish_msg.das_retry_rc_) {
//Even if the PX task is successfully retried by DAS,
//it is necessary to collect the retry error code of each task and provide feedback to the QC node of PX,
//so that the QC node can refresh the location cache in a timely manner,
//avoiding the next execution from being sent to the same erroneous node.
finish_msg.das_retry_rc_ = task.get_das_retry_rc();
}
affected_rows += task.get_affected_rows();
finish_msg.dml_row_info_.add_px_dml_row_info(task.dml_row_info_);
finish_msg.temp_table_id_ = task.temp_table_id_;

View File

@ -456,9 +456,6 @@ int ObPxTaskProcess::do_process()
}
}
}
if (OB_NOT_NULL(arg_.exec_ctx_)) {
DAS_CTX(*arg_.exec_ctx_).get_location_router().refresh_location_cache(true, ret);
}
// for forward warning msg and user error msg
(void)record_user_error_msg(ret);
@ -474,6 +471,10 @@ int ObPxTaskProcess::do_process()
// Task 和 Sqc 在两个不同线程中时,task 需要和 sqc 通信
if (NULL != arg_.sqc_task_ptr_) {
arg_.sqc_task_ptr_->set_result(ret);
if (OB_NOT_NULL(arg_.exec_ctx_)) {
int das_retry_rc = DAS_CTX(*arg_.exec_ctx_).get_location_router().get_last_errno();
arg_.sqc_task_ptr_->set_das_retry_rc(das_retry_rc);
}
if (OB_SUCC(ret)) {
// nop
} else if (IS_INTERRUPTED()) {