Fix RPC hang when a PX task fails to start
This commit is contained in:
		@ -41,8 +41,14 @@ int ObPxWorkNotifier::wait_all_worker_start()
 | 
			
		||||
   * 如果丢了信号,则wait一段时间,
 | 
			
		||||
   * 如果未丢信号量,则是直接被唤醒。
 | 
			
		||||
   */
 | 
			
		||||
  while (start_worker_count_ != expect_worker_count_) {
 | 
			
		||||
  bool is_interrupted = false;
 | 
			
		||||
  int64_t cnt = 1;
 | 
			
		||||
  while (start_worker_count_ != expect_worker_count_ && !is_interrupted) {
 | 
			
		||||
    cond_.wait(wait_key, wait_us);
 | 
			
		||||
    // check status after at most 1 second.
 | 
			
		||||
    if (0 == (cnt++ % 32)) {
 | 
			
		||||
      is_interrupted = IS_INTERRUPTED();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
@ -377,6 +383,7 @@ int ObPxSqcHandler::link_qc_sqc_channel()
 | 
			
		||||
void ObPxSqcHandler::check_interrupt()
 | 
			
		||||
{
 | 
			
		||||
  if (OB_UNLIKELY(IS_INTERRUPTED())) {
 | 
			
		||||
    has_interrupted_ = true;
 | 
			
		||||
    // 中断错误处理
 | 
			
		||||
    ObInterruptCode code = GET_INTERRUPT_CODE();
 | 
			
		||||
    int ret = code.code_;
 | 
			
		||||
 | 
			
		||||
@ -64,7 +64,7 @@ public:
 | 
			
		||||
    mem_context_(NULL), tenant_id_(UINT64_MAX), reserved_px_thread_count_(0), process_flags_(0),
 | 
			
		||||
    end_ret_(OB_SUCCESS), reference_count_(1), notifier_(nullptr), exec_ctx_(nullptr),
 | 
			
		||||
    des_phy_plan_(nullptr), sqc_init_args_(nullptr), sub_coord_(nullptr), rpc_level_(INT32_MAX),
 | 
			
		||||
    node_sequence_id_(0) {
 | 
			
		||||
    node_sequence_id_(0), has_interrupted_(false) {
 | 
			
		||||
  }
 | 
			
		||||
  ~ObPxSqcHandler() = default;
 | 
			
		||||
  static constexpr const char *OP_LABEL = ObModIds::ObModIds::OB_SQL_SQC_HANDLER;
 | 
			
		||||
@ -125,6 +125,7 @@ public:
 | 
			
		||||
  void set_rpc_level(int64_t level) { rpc_level_ = level; }
 | 
			
		||||
  void set_node_sequence_id(uint64_t node_sequence_id) { node_sequence_id_ = node_sequence_id; }
 | 
			
		||||
  int thread_count_auto_scaling(int64_t &reserved_px_thread_count);
 | 
			
		||||
  bool has_interrupted() const { return has_interrupted_; }
 | 
			
		||||
  TO_STRING_KV(K_(tenant_id), K_(reserved_px_thread_count), KP_(notifier),
 | 
			
		||||
      K_(exec_ctx), K_(des_phy_plan), K_(sqc_init_args), KP_(sub_coord), K_(rpc_level));
 | 
			
		||||
 | 
			
		||||
@ -147,6 +148,15 @@ private:
 | 
			
		||||
  trace::FltTransCtx flt_ctx_;
 | 
			
		||||
  int64_t rpc_level_;
 | 
			
		||||
  uint64_t node_sequence_id_;
 | 
			
		||||
  /* Originally, sqc first waited for all workers to start, and only then checked whether it had been interrupted.
 | 
			
		||||
   * If so, sqc broadcast the interruption to all workers, in case some workers had not yet registered for interruption when qc sent it.
 | 
			
		||||
   * We then found that sqc could hang while waiting for all workers to start, so sqc now also checks for interruption while waiting.
 | 
			
		||||
   * As a consequence of this change, a worker that starts after sqc has broadcast the interruption will miss the interruption.
 | 
			
		||||
   * So has_interrupted_ is added to sqc_handler.
 | 
			
		||||
   * 1. sqc sets has_interrupted_ = true before broadcasting the interruption.
 | 
			
		||||
   * 2. A worker registers for interruption first, then checks has_interrupted_, and skips execution if has_interrupted_ = true.
 | 
			
		||||
   */
 | 
			
		||||
  bool has_interrupted_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -138,10 +138,13 @@ void ObPxSubCoord::notify_dispatched_task_exit(int64_t dispatched_worker_count)
 | 
			
		||||
{
 | 
			
		||||
  (void) thread_worker_factory_.join();
 | 
			
		||||
  auto &tasks = sqc_ctx_.get_tasks();
 | 
			
		||||
  for (int64_t idx = 0; idx < dispatched_worker_count && dispatched_worker_count <= tasks.count(); ++idx) {
 | 
			
		||||
  bool is_interrupted = false;
 | 
			
		||||
  for (int64_t idx = 0;
 | 
			
		||||
       idx < dispatched_worker_count && dispatched_worker_count <= tasks.count() && !is_interrupted;
 | 
			
		||||
       ++idx) {
 | 
			
		||||
    int tick = 1;
 | 
			
		||||
    ObPxTask &task = tasks.at(idx);
 | 
			
		||||
    while (false == task.is_task_state_set(SQC_TASK_EXIT)) {
 | 
			
		||||
    while (false == task.is_task_state_set(SQC_TASK_EXIT) && !is_interrupted) {
 | 
			
		||||
      // 每秒给当前 sqc 中未完成的 tasks 发送一次中断
 | 
			
		||||
      // 首次发中断的时间为 100ms 时。定这个时间是为了
 | 
			
		||||
      // cover px pool 调度 task 的延迟
 | 
			
		||||
@ -151,7 +154,8 @@ void ObPxSubCoord::notify_dispatched_task_exit(int64_t dispatched_worker_count)
 | 
			
		||||
      }
 | 
			
		||||
      // 如果 10s 还没有退出,则打印一条日志。按照设计,不会出现这种情况
 | 
			
		||||
      if (tick++ % 10000 == 0) {
 | 
			
		||||
        LOG_INFO("waiting for task exit", K(idx), K(dispatched_worker_count), K(tick));
 | 
			
		||||
        is_interrupted = IS_INTERRUPTED();
 | 
			
		||||
        LOG_INFO("waiting for task exit", K(idx), K(dispatched_worker_count), K(tick), K(is_interrupted));
 | 
			
		||||
      }
 | 
			
		||||
      ob_usleep(1000);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -126,17 +126,44 @@ int ObPxCoroWorker::deep_copy_assign(const ObPxRpcInitTaskArgs &src,
 | 
			
		||||
//////////////////////////////////////////////////////////////////////////////
 | 
			
		||||
//////////////////////////////////////////////////////////////////////////////
 | 
			
		||||
//////////////////////////////////////////////////////////////////////////////
 | 
			
		||||
 | 
			
		||||
class SQCHandlerGuard
 | 
			
		||||
{
 | 
			
		||||
public:
 | 
			
		||||
  SQCHandlerGuard(ObPxSqcHandler *h) : sqc_handler_(h)
 | 
			
		||||
  {
 | 
			
		||||
    if (OB_LIKELY(sqc_handler_)) {
 | 
			
		||||
      sqc_handler_->get_notifier().worker_start(GETTID());
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  ~SQCHandlerGuard()
 | 
			
		||||
  {
 | 
			
		||||
    if (OB_LIKELY(sqc_handler_)) {
 | 
			
		||||
      sqc_handler_->worker_end_hook();
 | 
			
		||||
      int report_ret = OB_SUCCESS;
 | 
			
		||||
      ObPxSqcHandler::release_handler(sqc_handler_, report_ret);
 | 
			
		||||
      sqc_handler_ = nullptr;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
private:
 | 
			
		||||
  ObPxSqcHandler *sqc_handler_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
void PxWorkerFunctor::operator ()()
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObCurTraceId::set(env_arg_.get_trace_id());
 | 
			
		||||
  /**
 | 
			
		||||
   * 中断必须覆盖到release handler,因为它的流程含有sqc向qc发送消息,
 | 
			
		||||
   * 需要check中断。而中断本身是线程局部,不依赖于租户空间才对。
 | 
			
		||||
   */
 | 
			
		||||
  ObPxInterruptGuard px_int_guard(task_arg_.task_.get_interrupt_id().px_interrupt_id_);
 | 
			
		||||
  ObPxSqcHandler *sqc_handler = task_arg_.get_sqc_handler();
 | 
			
		||||
  SQCHandlerGuard sqc_handler_guard(sqc_handler);
 | 
			
		||||
  lib::MemoryContext mem_context = nullptr;
 | 
			
		||||
  const bool enable_trace_log = lib::is_trace_log_enabled();
 | 
			
		||||
  //ensure PX worker skip updating timeout_ts_ by ntp offset
 | 
			
		||||
  THIS_WORKER.set_ntp_offset(0);
 | 
			
		||||
  if (OB_NOT_NULL(sqc_handler)) {
 | 
			
		||||
  if (OB_NOT_NULL(sqc_handler) && OB_LIKELY(!sqc_handler->has_interrupted())) {
 | 
			
		||||
    THIS_WORKER.set_worker_level(sqc_handler->get_rpc_level());
 | 
			
		||||
    THIS_WORKER.set_curr_request_level(sqc_handler->get_rpc_level());
 | 
			
		||||
    LOG_TRACE("init flt ctx", K(sqc_handler->get_flt_ctx()));
 | 
			
		||||
@ -150,13 +177,6 @@ void PxWorkerFunctor::operator ()()
 | 
			
		||||
                sqc_id, task_arg_.task_.get_sqc_id(),
 | 
			
		||||
                qc_id, task_arg_.task_.get_qc_id(),
 | 
			
		||||
                group_id, THIS_WORKER.get_group_id());
 | 
			
		||||
    /**
 | 
			
		||||
     * 中断必须覆盖到release handler,因为它的流程含有sqc向qc发送消息,
 | 
			
		||||
     * 需要check中断。而中断本身是线程局部,不依赖于租户空间才对。
 | 
			
		||||
     */
 | 
			
		||||
    ObPxInterruptGuard px_int_guard(task_arg_.task_.get_interrupt_id().px_interrupt_id_);
 | 
			
		||||
    // 环境初始化
 | 
			
		||||
    ObCurTraceId::set(env_arg_.get_trace_id());
 | 
			
		||||
    // Do not set the thread-local log level while the log level is being upgraded (OB_LOGGER.is_info_as_wdiag)
 | 
			
		||||
    if (OB_LOGGER.is_info_as_wdiag()) {
 | 
			
		||||
      ObThreadLogLevelUtils::clear();
 | 
			
		||||
@ -191,14 +211,10 @@ void PxWorkerFunctor::operator ()()
 | 
			
		||||
            // 执行
 | 
			
		||||
            ObPxTaskProcess worker(*env_arg_.get_gctx(), runtime_arg);
 | 
			
		||||
            worker.set_is_oracle_mode(env_arg_.is_oracle_mode());
 | 
			
		||||
            sqc_handler->get_notifier().worker_start(GETTID());
 | 
			
		||||
            if (OB_SUCC(ret)) {
 | 
			
		||||
              worker.run();
 | 
			
		||||
            }
 | 
			
		||||
            runtime_arg.destroy();
 | 
			
		||||
 | 
			
		||||
            LOG_TRACE("Is finish all worker", K(ret), K(sqc_handler->get_notifier()));
 | 
			
		||||
            sqc_handler->worker_end_hook();
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
@ -212,25 +228,19 @@ void PxWorkerFunctor::operator ()()
 | 
			
		||||
          LOG_ERROR("page manager's used should be 0, unexpected!!!", KP(pm));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      /**
 | 
			
		||||
       * worker在经历了release handler的时候进行内存引用计数的释放。
 | 
			
		||||
       * 当计数器为0的时候会真正释放sqc handler的内存。请保证所有的
 | 
			
		||||
       * 内存使用超出次函数。释放的时候必须在租户的space中,所以不能放到外面了。
 | 
			
		||||
       */
 | 
			
		||||
      int report_ret = OB_SUCCESS;
 | 
			
		||||
      ObPxSqcHandler::release_handler(sqc_handler, report_ret);
 | 
			
		||||
      // 环境清理
 | 
			
		||||
      ObCurTraceId::reset();
 | 
			
		||||
      if (enable_trace_log) {
 | 
			
		||||
        ObThreadLogLevelUtils::clear();
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    if (enable_trace_log) {
 | 
			
		||||
      ObThreadLogLevelUtils::clear();
 | 
			
		||||
    }
 | 
			
		||||
  } else if (OB_ISNULL(sqc_handler)) {
 | 
			
		||||
    LOG_ERROR("Unexpected null sqc handler", K(sqc_handler));
 | 
			
		||||
  } else {
 | 
			
		||||
    LOG_ERROR("Unexpected", K(ret), K(sqc_handler));
 | 
			
		||||
    LOG_WARN("already interrupted");
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  PxWorkerFinishFunctor on_func_finish;
 | 
			
		||||
  on_func_finish();
 | 
			
		||||
  ObCurTraceId::reset();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void PxWorkerFinishFunctor::operator ()()
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user