diff --git a/src/sql/engine/px/ob_px_sqc_handler.cpp b/src/sql/engine/px/ob_px_sqc_handler.cpp index 68b196acd1..93d8b0b993 100644 --- a/src/sql/engine/px/ob_px_sqc_handler.cpp +++ b/src/sql/engine/px/ob_px_sqc_handler.cpp @@ -41,8 +41,14 @@ int ObPxWorkNotifier::wait_all_worker_start() * 如果丢了信号,则wait一段时间, * 如果未丢信号量,则是直接被唤醒。 */ - while (start_worker_count_ != expect_worker_count_) { + bool is_interrupted = false; + int64_t cnt = 1; + while (start_worker_count_ != expect_worker_count_ && !is_interrupted) { cond_.wait(wait_key, wait_us); + // check status after at most 1 second. + if (0 == (cnt++ % 32)) { + is_interrupted = IS_INTERRUPTED(); + } } return ret; } @@ -377,6 +383,7 @@ int ObPxSqcHandler::link_qc_sqc_channel() void ObPxSqcHandler::check_interrupt() { if (OB_UNLIKELY(IS_INTERRUPTED())) { + has_interrupted_ = true; // 中断错误处理 ObInterruptCode code = GET_INTERRUPT_CODE(); int ret = code.code_; diff --git a/src/sql/engine/px/ob_px_sqc_handler.h b/src/sql/engine/px/ob_px_sqc_handler.h index d03e5b7337..c62fb8bd8f 100644 --- a/src/sql/engine/px/ob_px_sqc_handler.h +++ b/src/sql/engine/px/ob_px_sqc_handler.h @@ -64,7 +64,7 @@ public: mem_context_(NULL), tenant_id_(UINT64_MAX), reserved_px_thread_count_(0), process_flags_(0), end_ret_(OB_SUCCESS), reference_count_(1), notifier_(nullptr), exec_ctx_(nullptr), des_phy_plan_(nullptr), sqc_init_args_(nullptr), sub_coord_(nullptr), rpc_level_(INT32_MAX), - node_sequence_id_(0) { + node_sequence_id_(0), has_interrupted_(false) { } ~ObPxSqcHandler() = default; static constexpr const char *OP_LABEL = ObModIds::ObModIds::OB_SQL_SQC_HANDLER; @@ -125,6 +125,7 @@ public: void set_rpc_level(int64_t level) { rpc_level_ = level; } void set_node_sequence_id(uint64_t node_sequence_id) { node_sequence_id_ = node_sequence_id; } int thread_count_auto_scaling(int64_t &reserved_px_thread_count); + bool has_interrupted() const { return has_interrupted_; } TO_STRING_KV(K_(tenant_id), K_(reserved_px_thread_count), KP_(notifier), K_(exec_ctx), K_(des_phy_plan), K_(sqc_init_args), KP_(sub_coord), K_(rpc_level)); @@ -147,6 +148,15 @@ private: trace::FltTransCtx flt_ctx_; int64_t rpc_level_; uint64_t node_sequence_id_; + /* At first, sqc must wait for all workers start, and then check whether it is interrupted. + * If so, sqc will broadcast interruption to all workers in case that some workers have not registered interruption when qc send interruption. + * Then we find that sqc may hang at waiting for all workers start, so sqc check whether interrupted while waiting now. + * This change makes that if worker starts after sqc broadcast interruption, it will miss the interruption. + * So we add has_interrupted_ in sqc_handler. + * 1. sqc set has_interrupted_ = true before broadcast interruption. + * 2. worker register interruption first, then check has_interrupted_, skip execution if has_interrupted_ = true. + */ + bool has_interrupted_; }; } diff --git a/src/sql/engine/px/ob_px_sub_coord.cpp b/src/sql/engine/px/ob_px_sub_coord.cpp index d5c49b5a8b..6a76fd8bc7 100644 --- a/src/sql/engine/px/ob_px_sub_coord.cpp +++ b/src/sql/engine/px/ob_px_sub_coord.cpp @@ -138,10 +138,13 @@ void ObPxSubCoord::notify_dispatched_task_exit(int64_t dispatched_worker_count) { (void) thread_worker_factory_.join(); auto &tasks = sqc_ctx_.get_tasks(); - for (int64_t idx = 0; idx < dispatched_worker_count && dispatched_worker_count <= tasks.count(); ++idx) { + bool is_interrupted = false; + for (int64_t idx = 0; + idx < dispatched_worker_count && dispatched_worker_count <= tasks.count() && !is_interrupted; + ++idx) { int tick = 1; ObPxTask &task = tasks.at(idx); - while (false == task.is_task_state_set(SQC_TASK_EXIT)) { + while (false == task.is_task_state_set(SQC_TASK_EXIT) && !is_interrupted) { // 每秒给当前 sqc 中未完成的 tasks 发送一次中断 // 首次发中断的时间为 100ms 时。定这个时间是为了 // cover px pool 调度 task 的延迟 @@ -151,7 +154,8 @@ void ObPxSubCoord::notify_dispatched_task_exit(int64_t dispatched_worker_count) } // 如果 10s 还没有退出,则打印一条日志。按照设计,不会出现这种情况 if (tick++ % 10000 == 0) { - LOG_INFO("waiting for task exit", K(idx), K(dispatched_worker_count), K(tick)); + is_interrupted = IS_INTERRUPTED(); + LOG_INFO("waiting for task exit", K(idx), K(dispatched_worker_count), K(tick), K(is_interrupted)); } ob_usleep(1000); } diff --git a/src/sql/engine/px/ob_px_worker.cpp b/src/sql/engine/px/ob_px_worker.cpp index 346d8ac489..6b6976176b 100644 --- a/src/sql/engine/px/ob_px_worker.cpp +++ b/src/sql/engine/px/ob_px_worker.cpp @@ -126,17 +126,44 @@ int ObPxCoroWorker::deep_copy_assign(const ObPxRpcInitTaskArgs &src, ////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////// - +class SQCHandlerGuard +{ +public: + SQCHandlerGuard(ObPxSqcHandler *h) : sqc_handler_(h) + { + if (OB_LIKELY(sqc_handler_)) { + sqc_handler_->get_notifier().worker_start(GETTID()); + } + } + ~SQCHandlerGuard() + { + if (OB_LIKELY(sqc_handler_)) { + sqc_handler_->worker_end_hook(); + int report_ret = OB_SUCCESS; + ObPxSqcHandler::release_handler(sqc_handler_, report_ret); + sqc_handler_ = nullptr; + } + } +private: + ObPxSqcHandler *sqc_handler_; +}; void PxWorkerFunctor::operator ()() { int ret = OB_SUCCESS; + ObCurTraceId::set(env_arg_.get_trace_id()); + /** + * 中断必须覆盖到release handler,因为它的流程含有sqc向qc发送消息, + * 需要check中断。而中断本身是线程局部,不依赖于租户空间才对。 + */ + ObPxInterruptGuard px_int_guard(task_arg_.task_.get_interrupt_id().px_interrupt_id_); ObPxSqcHandler *sqc_handler = task_arg_.get_sqc_handler(); + SQCHandlerGuard sqc_handler_guard(sqc_handler); lib::MemoryContext mem_context = nullptr; const bool enable_trace_log = lib::is_trace_log_enabled(); //ensure PX worker skip updating timeout_ts_ by ntp offset THIS_WORKER.set_ntp_offset(0); - if (OB_NOT_NULL(sqc_handler)) { + if (OB_NOT_NULL(sqc_handler) && OB_LIKELY(!sqc_handler->has_interrupted())) { THIS_WORKER.set_worker_level(sqc_handler->get_rpc_level()); THIS_WORKER.set_curr_request_level(sqc_handler->get_rpc_level()); LOG_TRACE("init flt ctx", K(sqc_handler->get_flt_ctx())); @@ -150,13 +177,6 @@ void PxWorkerFunctor::operator ()() sqc_id, task_arg_.task_.get_sqc_id(), qc_id, task_arg_.task_.get_qc_id(), group_id, THIS_WORKER.get_group_id()); - /** - * 中断必须覆盖到release handler,因为它的流程含有sqc向qc发送消息, - * 需要check中断。而中断本身是线程局部,不依赖于租户空间才对。 - */ - ObPxInterruptGuard px_int_guard(task_arg_.task_.get_interrupt_id().px_interrupt_id_); - // 环境初始化 - ObCurTraceId::set(env_arg_.get_trace_id()); // Do not set thread local log level while log level upgrading (OB_LOGGER.is_info_as_wdiag) if (OB_LOGGER.is_info_as_wdiag()) { ObThreadLogLevelUtils::clear(); @@ -191,14 +211,10 @@ void PxWorkerFunctor::operator ()() // 执行 ObPxTaskProcess worker(*env_arg_.get_gctx(), runtime_arg); worker.set_is_oracle_mode(env_arg_.is_oracle_mode()); - sqc_handler->get_notifier().worker_start(GETTID()); if (OB_SUCC(ret)) { worker.run(); } runtime_arg.destroy(); - - LOG_TRACE("Is finish all worker", K(ret), K(sqc_handler->get_notifier())); - sqc_handler->worker_end_hook(); } } } @@ -212,25 +228,19 @@ void PxWorkerFunctor::operator ()() LOG_ERROR("page manager's used should be 0, unexpected!!!", KP(pm)); } } - /** - * worker在经历了release handler的时候进行内存引用计数的释放。 - * 当计数器为0的时候会真正释放sqc handler的内存。请保证所有的 - * 内存使用超出次函数。释放的时候必须在租户的space中,所以不能放到外面了。 - */ - int report_ret = OB_SUCCESS; - ObPxSqcHandler::release_handler(sqc_handler, report_ret); - // 环境清理 - ObCurTraceId::reset(); - if (enable_trace_log) { - ObThreadLogLevelUtils::clear(); - } } + if (enable_trace_log) { + ObThreadLogLevelUtils::clear(); + } + } else if (OB_ISNULL(sqc_handler)) { + LOG_ERROR("Unexpected null sqc handler", K(sqc_handler)); } else { - LOG_ERROR("Unexpected", K(ret), K(sqc_handler)); + LOG_WARN("already interrupted"); } PxWorkerFinishFunctor on_func_finish; on_func_finish(); + ObCurTraceId::reset(); } void PxWorkerFinishFunctor::operator ()()