diff --git a/src/sql/engine/px/ob_px_rpc_processor.cpp b/src/sql/engine/px/ob_px_rpc_processor.cpp index d94501a263..34f5e057b7 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.cpp +++ b/src/sql/engine/px/ob_px_rpc_processor.cpp @@ -54,7 +54,8 @@ void ObInitSqcP::destroy() * after process这个流程,那么就应当由自己来做释放。 */ if (OB_NOT_NULL(arg_.sqc_handler_)) { - ObPxSqcHandler::release_handler(arg_.sqc_handler_); + int report_ret = OB_SUCCESS; + ObPxSqcHandler::release_handler(arg_.sqc_handler_, report_ret); } ObActiveSessionGuard::setup_default_ash(); } @@ -111,7 +112,8 @@ int ObInitSqcP::process() UNSET_INTERRUPTABLE(arg.sqc_.get_interrupt_id().px_interrupt_id_); unregister_interrupt_ = false; } - ObPxSqcHandler::release_handler(sqc_handler); + int report_ret = OB_SUCCESS; + ObPxSqcHandler::release_handler(sqc_handler, report_ret); arg_.sqc_handler_ = nullptr; } @@ -247,10 +249,10 @@ int ObInitSqcP::after_process(int error_code) if (OB_NOT_NULL(sqc_handler) && OB_SUCCESS == sqc_handler->get_end_ret()) { sqc_handler->set_end_ret(ret); } - ObPxSqcHandler::release_handler(sqc_handler); + int report_ret = OB_SUCCESS; + ObPxSqcHandler::release_handler(sqc_handler, report_ret); arg_.sqc_handler_ = nullptr; } - return ret; } @@ -372,7 +374,8 @@ void ObInitFastSqcP::destroy() * after process这个流程,那么就应当由自己来做释放。 */ if (OB_NOT_NULL(arg_.sqc_handler_)) { - ObPxSqcHandler::release_handler(arg_.sqc_handler_); + int report_ret = OB_SUCCESS; + ObPxSqcHandler::release_handler(arg_.sqc_handler_, report_ret); } ObActiveSessionGuard::setup_default_ash(); } @@ -430,7 +433,6 @@ int ObInitFastSqcP::process() } ObActiveSessionGuard::get_stat().in_sql_execution_ = false; - if (OB_NOT_NULL(sqc_handler)) { // link channel之前或者link过程可能会失败. // 如果sqc和qc没有link, 由response将 ret 通知给px. @@ -440,7 +442,9 @@ int ObInitFastSqcP::process() ret = OB_SUCCESS; } sqc_handler->reset_reference_count(); - ObPxSqcHandler::release_handler(sqc_handler); + int report_ret = OB_SUCCESS; + ObPxSqcHandler::release_handler(sqc_handler, report_ret); + ret = OB_SUCCESS == ret ? report_ret : ret; arg_.sqc_handler_ = nullptr; } return ret; diff --git a/src/sql/engine/px/ob_px_sqc_handler.cpp b/src/sql/engine/px/ob_px_sqc_handler.cpp index e19ffd714c..68b196acd1 100644 --- a/src/sql/engine/px/ob_px_sqc_handler.cpp +++ b/src/sql/engine/px/ob_px_sqc_handler.cpp @@ -136,14 +136,14 @@ ObPxSqcHandler *ObPxSqcHandler::get_sqc_handler() return op_reclaim_alloc(ObPxSqcHandler); } -void ObPxSqcHandler::release_handler(ObPxSqcHandler *sqc_handler) +void ObPxSqcHandler::release_handler(ObPxSqcHandler *sqc_handler, int &report_ret) { bool all_released = false; if (OB_ISNULL(sqc_handler)) { LOG_ERROR_RET(OB_INVALID_ARGUMENT, "Get null sqc handler", K(sqc_handler)); } else if (FALSE_IT(sqc_handler->release(all_released))) { } else if (all_released) { - IGNORE_RETURN sqc_handler->destroy_sqc(); + IGNORE_RETURN sqc_handler->destroy_sqc(report_ret); sqc_handler->reset(); op_reclaim_free(sqc_handler); } @@ -302,10 +302,11 @@ bool ObPxSqcHandler::all_task_success() return bret; } -int ObPxSqcHandler::destroy_sqc() +int ObPxSqcHandler::destroy_sqc(int &report_ret) { int ret = OB_SUCCESS; int end_ret = OB_SUCCESS; + report_ret = OB_SUCCESS; sub_coord_->destroy_first_buffer_cache(); // end_ret_记录的错误时SQC end process的时候发生的错误,该时刻语句已经执行完成,收尾工作发生了 // 问题。相比起事务的错误码,收尾的错误码优先级更低。 @@ -330,7 +331,8 @@ int ObPxSqcHandler::destroy_sqc() * 了。有任何一个标记启动的sqc不report,qc都会一直等待直到超时。 */ if (OB_FAIL(sub_coord_->report_sqc_finish(end_ret))) { - LOG_WARN("fail report sqc to qc", K(ret)); + LOG_WARN("fail report sqc to qc", K(ret), K(end_ret_)); + report_ret = ret; } if (OB_NOT_NULL(des_phy_plan_) && des_phy_plan_->is_enable_px_fast_reclaim()) { (void) ObDetectManagerUtils::sqc_unregister_detectable_id_from_dm( diff --git a/src/sql/engine/px/ob_px_sqc_handler.h b/src/sql/engine/px/ob_px_sqc_handler.h index bb08aec4e3..d03e5b7337 100644 --- a/src/sql/engine/px/ob_px_sqc_handler.h +++ b/src/sql/engine/px/ob_px_sqc_handler.h @@ -69,7 +69,7 @@ public: ~ObPxSqcHandler() = default; static constexpr const char *OP_LABEL = ObModIds::ObModIds::OB_SQL_SQC_HANDLER; static ObPxSqcHandler *get_sqc_handler(); - static void release_handler(ObPxSqcHandler *sqc_handler); + static void release_handler(ObPxSqcHandler *sqc_handler, int &report_ret); void reset() ; void release(bool &all_released) { int64_t reference_count = ATOMIC_AAF(&reference_count_, -1); @@ -130,7 +130,7 @@ public: private: void init_flt_content(); - int destroy_sqc(); + int destroy_sqc(int &report_ret); private: lib::MemoryContext mem_context_; uint64_t tenant_id_; diff --git a/src/sql/engine/px/ob_px_worker.cpp b/src/sql/engine/px/ob_px_worker.cpp index d882e93b76..346d8ac489 100644 --- a/src/sql/engine/px/ob_px_worker.cpp +++ b/src/sql/engine/px/ob_px_worker.cpp @@ -217,7 +217,8 @@ void PxWorkerFunctor::operator ()() * 当计数器为0的时候会真正释放sqc handler的内存。请保证所有的 * 内存使用超出次函数。释放的时候必须在租户的space中,所以不能放到外面了。 */ - ObPxSqcHandler::release_handler(sqc_handler); + int report_ret = OB_SUCCESS; + ObPxSqcHandler::release_handler(sqc_handler, report_ret); // 环境清理 ObCurTraceId::reset(); if (enable_trace_log) {