fix px hang when sqc report failed
This commit is contained in:
@ -54,7 +54,8 @@ void ObInitSqcP::destroy()
|
||||
* after process这个流程,那么就应当由自己来做释放。
|
||||
*/
|
||||
if (OB_NOT_NULL(arg_.sqc_handler_)) {
|
||||
ObPxSqcHandler::release_handler(arg_.sqc_handler_);
|
||||
int report_ret = OB_SUCCESS;
|
||||
ObPxSqcHandler::release_handler(arg_.sqc_handler_, report_ret);
|
||||
}
|
||||
ObActiveSessionGuard::setup_default_ash();
|
||||
}
|
||||
@ -111,7 +112,8 @@ int ObInitSqcP::process()
|
||||
UNSET_INTERRUPTABLE(arg.sqc_.get_interrupt_id().px_interrupt_id_);
|
||||
unregister_interrupt_ = false;
|
||||
}
|
||||
ObPxSqcHandler::release_handler(sqc_handler);
|
||||
int report_ret = OB_SUCCESS;
|
||||
ObPxSqcHandler::release_handler(sqc_handler, report_ret);
|
||||
arg_.sqc_handler_ = nullptr;
|
||||
}
|
||||
|
||||
@ -247,10 +249,10 @@ int ObInitSqcP::after_process(int error_code)
|
||||
if (OB_NOT_NULL(sqc_handler) && OB_SUCCESS == sqc_handler->get_end_ret()) {
|
||||
sqc_handler->set_end_ret(ret);
|
||||
}
|
||||
ObPxSqcHandler::release_handler(sqc_handler);
|
||||
int report_ret = OB_SUCCESS;
|
||||
ObPxSqcHandler::release_handler(sqc_handler, report_ret);
|
||||
arg_.sqc_handler_ = nullptr;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -372,7 +374,8 @@ void ObInitFastSqcP::destroy()
|
||||
* after process这个流程,那么就应当由自己来做释放。
|
||||
*/
|
||||
if (OB_NOT_NULL(arg_.sqc_handler_)) {
|
||||
ObPxSqcHandler::release_handler(arg_.sqc_handler_);
|
||||
int report_ret = OB_SUCCESS;
|
||||
ObPxSqcHandler::release_handler(arg_.sqc_handler_, report_ret);
|
||||
}
|
||||
ObActiveSessionGuard::setup_default_ash();
|
||||
}
|
||||
@ -430,7 +433,6 @@ int ObInitFastSqcP::process()
|
||||
}
|
||||
|
||||
ObActiveSessionGuard::get_stat().in_sql_execution_ = false;
|
||||
|
||||
if (OB_NOT_NULL(sqc_handler)) {
|
||||
// link channel之前或者link过程可能会失败.
|
||||
// 如果sqc和qc没有link, 由response将 ret 通知给px.
|
||||
@ -440,7 +442,9 @@ int ObInitFastSqcP::process()
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
sqc_handler->reset_reference_count();
|
||||
ObPxSqcHandler::release_handler(sqc_handler);
|
||||
int report_ret = OB_SUCCESS;
|
||||
ObPxSqcHandler::release_handler(sqc_handler, report_ret);
|
||||
ret = OB_SUCCESS == ret ? report_ret : ret;
|
||||
arg_.sqc_handler_ = nullptr;
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -136,14 +136,14 @@ ObPxSqcHandler *ObPxSqcHandler::get_sqc_handler()
|
||||
return op_reclaim_alloc(ObPxSqcHandler);
|
||||
}
|
||||
|
||||
void ObPxSqcHandler::release_handler(ObPxSqcHandler *sqc_handler)
|
||||
void ObPxSqcHandler::release_handler(ObPxSqcHandler *sqc_handler, int &report_ret)
|
||||
{
|
||||
bool all_released = false;
|
||||
if (OB_ISNULL(sqc_handler)) {
|
||||
LOG_ERROR_RET(OB_INVALID_ARGUMENT, "Get null sqc handler", K(sqc_handler));
|
||||
} else if (FALSE_IT(sqc_handler->release(all_released))) {
|
||||
} else if (all_released) {
|
||||
IGNORE_RETURN sqc_handler->destroy_sqc();
|
||||
IGNORE_RETURN sqc_handler->destroy_sqc(report_ret);
|
||||
sqc_handler->reset();
|
||||
op_reclaim_free(sqc_handler);
|
||||
}
|
||||
@ -302,10 +302,11 @@ bool ObPxSqcHandler::all_task_success()
|
||||
return bret;
|
||||
}
|
||||
|
||||
int ObPxSqcHandler::destroy_sqc()
|
||||
int ObPxSqcHandler::destroy_sqc(int &report_ret)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int end_ret = OB_SUCCESS;
|
||||
report_ret = OB_SUCCESS;
|
||||
sub_coord_->destroy_first_buffer_cache();
|
||||
// end_ret_记录的错误时SQC end process的时候发生的错误,该时刻语句已经执行完成,收尾工作发生了
|
||||
// 问题。相比起事务的错误码,收尾的错误码优先级更低。
|
||||
@ -330,7 +331,8 @@ int ObPxSqcHandler::destroy_sqc()
|
||||
* 了。有任何一个标记启动的sqc不report,qc都会一直等待直到超时。
|
||||
*/
|
||||
if (OB_FAIL(sub_coord_->report_sqc_finish(end_ret))) {
|
||||
LOG_WARN("fail report sqc to qc", K(ret));
|
||||
LOG_WARN("fail report sqc to qc", K(ret), K(end_ret_));
|
||||
report_ret = ret;
|
||||
}
|
||||
if (OB_NOT_NULL(des_phy_plan_) && des_phy_plan_->is_enable_px_fast_reclaim()) {
|
||||
(void) ObDetectManagerUtils::sqc_unregister_detectable_id_from_dm(
|
||||
|
||||
@ -69,7 +69,7 @@ public:
|
||||
~ObPxSqcHandler() = default;
|
||||
static constexpr const char *OP_LABEL = ObModIds::ObModIds::OB_SQL_SQC_HANDLER;
|
||||
static ObPxSqcHandler *get_sqc_handler();
|
||||
static void release_handler(ObPxSqcHandler *sqc_handler);
|
||||
static void release_handler(ObPxSqcHandler *sqc_handler, int &report_ret);
|
||||
void reset() ;
|
||||
void release(bool &all_released) {
|
||||
int64_t reference_count = ATOMIC_AAF(&reference_count_, -1);
|
||||
@ -130,7 +130,7 @@ public:
|
||||
|
||||
private:
|
||||
void init_flt_content();
|
||||
int destroy_sqc();
|
||||
int destroy_sqc(int &report_ret);
|
||||
private:
|
||||
lib::MemoryContext mem_context_;
|
||||
uint64_t tenant_id_;
|
||||
|
||||
@ -217,7 +217,8 @@ void PxWorkerFunctor::operator ()()
|
||||
* 当计数器为0的时候会真正释放sqc handler的内存。请保证所有的
|
||||
* 内存使用超出次函数。释放的时候必须在租户的space中,所以不能放到外面了。
|
||||
*/
|
||||
ObPxSqcHandler::release_handler(sqc_handler);
|
||||
int report_ret = OB_SUCCESS;
|
||||
ObPxSqcHandler::release_handler(sqc_handler, report_ret);
|
||||
// 环境清理
|
||||
ObCurTraceId::reset();
|
||||
if (enable_trace_log) {
|
||||
|
||||
Reference in New Issue
Block a user