/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX SQL_ENG #include "ob_px_rpc_processor.h" #include "ob_px_sub_coord.h" #include "ob_px_task_process.h" #include "ob_px_admission.h" #include "ob_px_sqc_handler.h" #include "lib/ash/ob_active_session_guard.h" #include "sql/executor/ob_executor_rpc_processor.h" #include "sql/dtl/ob_dtl_channel_group.h" #include "storage/memtable/ob_lock_wait_mgr.h" #include "sql/engine/px/ob_px_target_mgr.h" #include "sql/engine/px/ob_px_sqc_handler.h" #include "sql/dtl/ob_dtl_basic_channel.h" using namespace oceanbase::common; using namespace oceanbase::sql; int ObInitSqcP::init() { int ret = OB_SUCCESS; ObPxSqcHandler *sqc_handler = nullptr; if (OB_ISNULL(sqc_handler = ObPxSqcHandler::get_sqc_handler())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected sqc handler", K(ret)); } else if (OB_FAIL(sqc_handler->init())) { LOG_WARN("Failed to init sqc handler", K(ret)); } else { arg_.sqc_handler_ = sqc_handler; arg_.sqc_handler_->reset_reference_count(); //设置sqc_handler的引用计数为1. } return ret; } void ObInitSqcP::destroy() { obrpc::ObRpcProcessor >::destroy(); /** * 如果经历了after process这个流程,arg_.sqc_handler_会被置为空。 * 如果这里arg_.sqc_handler_不为空,则意味着,init以后,没有进行 * after process这个流程,那么就应当由自己来做释放。 */ if (OB_NOT_NULL(arg_.sqc_handler_)) { ObPxSqcHandler::release_handler(arg_.sqc_handler_); } ObActiveSessionGuard::setup_default_ash(); } int ObInitSqcP::process() { ObActiveSessionGuard::get_stat().in_px_execution_ = true; int ret = OB_SUCCESS; LOG_TRACE("receive dfo", K_(arg)); ObPxSqcHandler *sqc_handler = arg_.sqc_handler_; /** * 只要能进process,after process一定会被调用,所以可以用中断覆盖整个 * SQC的生命周期。 */ if (OB_NOT_NULL(sqc_handler)) { ObPxRpcInitSqcArgs &arg = sqc_handler->get_sqc_init_arg(); SET_INTERRUPTABLE(arg.sqc_.get_interrupt_id().px_interrupt_id_); unregister_interrupt_ = true; } if (OB_ISNULL(sqc_handler)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Sqc handler can't be nullptr", K(ret)); } else if (OB_FAIL(sqc_handler->init_env())) { LOG_WARN("Failed to init sqc env", K(ret)); } else if (OB_FAIL(sqc_handler->pre_acquire_px_worker(result_.reserved_thread_count_))) { LOG_WARN("Failed to pre acquire px worker", K(ret)); } else if (OB_FAIL(pre_setup_op_input(*sqc_handler))) { LOG_WARN("pre setup op input failed", K(ret)); } else if (OB_FAIL(sqc_handler->thread_count_auto_scaling(result_.reserved_thread_count_))) { LOG_WARN("fail to do thread auto scaling", K(ret), K(result_.reserved_thread_count_)); } else if (result_.reserved_thread_count_ <= 0) { ret = OB_ERR_INSUFFICIENT_PX_WORKER; LOG_WARN("Worker thread res not enough", K_(result)); } else if (OB_FAIL(sqc_handler->link_qc_sqc_channel())) { LOG_WARN("Failed to link qc sqc channel", K(ret)); } else { /*do nothing*/ } if (OB_FAIL(ret) && OB_NOT_NULL(sqc_handler)) { if (unregister_interrupt_) { ObPxRpcInitSqcArgs &arg = sqc_handler->get_sqc_init_arg(); UNSET_INTERRUPTABLE(arg.sqc_.get_interrupt_id().px_interrupt_id_); unregister_interrupt_ = false; } ObPxSqcHandler::release_handler(sqc_handler); arg_.sqc_handler_ = nullptr; } // https://work.aone.alibaba-inc.com/issue/37723456 if (OB_SUCCESS != ret && is_schema_error(ret)) { if (OB_NOT_NULL(sqc_handler) && GSCHEMASERVICE.is_schema_error_need_retry(NULL, sqc_handler->get_tenant_id())) { ret = OB_ERR_REMOTE_SCHEMA_NOT_FULL; } else { ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH; } } // 非rpc框架的错误内容设置到response消息中 // rpc框架的错误码在process中返回OB_SUCCESS result_.rc_ = ret; // 异步逻辑处理接口,总返回OB_SUCCESS,逻辑处理中的错误码 // 通过result_.rc_返回 return OB_SUCCESS; } int ObInitSqcP::pre_setup_op_input(ObPxSqcHandler &sqc_handler) { int ret = OB_SUCCESS; ObPxSubCoord &sub_coord = sqc_handler.get_sub_coord(); ObExecContext *ctx = sqc_handler.get_sqc_init_arg().exec_ctx_; ObOpSpec *root = sqc_handler.get_sqc_init_arg().op_spec_root_; ObPxSqcMeta &sqc = sqc_handler.get_sqc_init_arg().sqc_; sub_coord.set_is_single_tsc_leaf_dfo(sqc.is_single_tsc_leaf_dfo()); CK(OB_NOT_NULL(ctx) && OB_NOT_NULL(root)); if (sqc.is_single_tsc_leaf_dfo() && OB_FAIL(sub_coord.rebuild_sqc_access_table_locations())) { LOG_WARN("fail to rebuild sqc access location", K(ret)); } else if (OB_FAIL(sub_coord.pre_setup_op_input(*ctx, *root, sub_coord.get_sqc_ctx(), sqc.get_access_table_locations(), sqc.get_access_table_location_keys()))) { LOG_WARN("pre_setup_op_input failed", K(ret)); } return ret; } int ObInitSqcP::startup_normal_sqc(ObPxSqcHandler &sqc_handler) { int ret = OB_SUCCESS; int64_t dispatched_worker_count = 0; ObSQLSessionInfo *session = sqc_handler.get_exec_ctx().get_my_session(); ObPxSubCoord &sub_coord = sqc_handler.get_sub_coord(); if (OB_ISNULL(session)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("session is NULL", K(ret)); } else { ObPxRpcInitSqcArgs &arg = sqc_handler.get_sqc_init_arg(); ObWorkerSessionGuard worker_session_guard(session); ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock()); session->set_current_trace_id(ObCurTraceId::get_trace_id()); session->set_peer_addr(arg.sqc_.get_qc_addr()); if (OB_FAIL(session->store_query_string(ObString::make_string("PX SUB COORDINATOR")))) { LOG_WARN("store query string to session failed", K(ret)); } else if (OB_FAIL(sub_coord.pre_process())) { LOG_WARN("fail process sqc", K(arg), K(ret)); } else if (OB_FAIL(sub_coord.try_start_tasks(dispatched_worker_count))) { /** * 启动部分worker失败的时候,我们这里主动将已经启动的worker中断掉。 * 这个操作是阻塞的,中断成功后,后续直接释放sqc handler。 */ LOG_WARN("Notity all dispatched worker to exit", K(ret), K(dispatched_worker_count)); sub_coord.notify_dispatched_task_exit(dispatched_worker_count); LOG_WARN("All dispatched worker exit", K(ret), K(dispatched_worker_count)); } else { sqc_handler.get_notifier().wait_all_worker_start(); /** * 检查中断,如果自己这边已收到中断,传递给各个worker,避免worker掉中断。 * process流程一旦结束,sqc就可能收到中断,但是此时worker不一定注册了中断, * 所以这是sqc需要将中断传递给各个worker。 */ sqc_handler.check_interrupt(); sqc_handler.worker_end_hook(); } } return ret; } int ObInitSqcP::after_process(int error_code) { int ret = OB_SUCCESS; UNUSED(error_code); ObSQLSessionInfo *session = nullptr; ObPxSqcHandler *sqc_handler = arg_.sqc_handler_; bool no_need_startup_normal_sqc = (OB_SUCCESS != result_.rc_); if (no_need_startup_normal_sqc) { /** * rc_不等于OB_SUCCESS,不再进行sqc的流程,直接在最后面去进行sqc的释放。 */ } else if (OB_ISNULL(sqc_handler = arg_.sqc_handler_) || !sqc_handler->valid()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Invalid sqc handler", K(ret), KPC(sqc_handler)); } else if (OB_ISNULL(session = sqc_handler->get_exec_ctx().get_my_session())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Session can't be null", K(ret)); } else { lib::CompatModeGuard g(session->get_compatibility_mode() == ORACLE_MODE ? lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL); sqc_handler->set_tenant_id(sqc_handler->get_exec_ctx().get_my_session()->get_effective_tenant_id()); ObPxRpcInitSqcArgs &arg = sqc_handler->get_sqc_init_arg(); /** * 根据 arg_ 参数来获取本机线程,并执行 task */ LOG_TRACE("process dfo", K(arg), K(session->get_compatibility_mode()), K(sqc_handler->get_reserved_px_thread_count())); ret = startup_normal_sqc(*sqc_handler); session->set_session_sleep(); } ObActiveSessionGuard::get_stat().in_px_execution_ = false; /** * 此处需要清理中断,并把分配的线程数和handler释放. * worker正常启动后,此时它的引用计数被更新成了 * worker数量+rpc,release_handler会做减掉一个引用计数,最后一个引用计数的人 * 会真正的对sqc handler进行释放。 * 最后一个工作线程,需要释放内存; */ if (!no_need_startup_normal_sqc) { if (unregister_interrupt_) { if (OB_ISNULL(sqc_handler = arg_.sqc_handler_) || !sqc_handler->valid()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Invalid sqc handler", K(ret), KPC(sqc_handler)); } else { ObPxRpcInitSqcArgs &arg = sqc_handler->get_sqc_init_arg(); UNSET_INTERRUPTABLE(arg.sqc_.get_interrupt_id().px_interrupt_id_); } } if (OB_NOT_NULL(sqc_handler) && OB_SUCCESS == sqc_handler->get_end_ret()) { sqc_handler->set_end_ret(ret); } ObPxSqcHandler::release_handler(sqc_handler); arg_.sqc_handler_ = nullptr; } return ret; } // 已经未使用了,后续移除 int ObInitTaskP::init() { return OB_NOT_SUPPORTED; } int ObInitTaskP::process() { // 根据 arg_ 参数来获取本机线程,并执行 task return OB_NOT_SUPPORTED; } int ObInitTaskP::after_process(int error_code) { UNUSED(error_code); return OB_NOT_SUPPORTED; } void ObFastInitSqcReportQCMessageCall::operator()(hash::HashMapPair &entry) { UNUSED(entry); if (OB_NOT_NULL(sqc_)) { if (sqc_->is_ignore_vtable_error() && err_ != OB_SUCCESS && err_ != OB_TIMEOUT) { // 当该SQC是虚拟表查询时, 调度RPC失败时需要忽略错误结果. // 并mock一个sqc finsh msg发送给正在轮询消息的PX算子 // 此操作已确认是线程安全的. mock_sqc_finish_msg(); } else { sqc_->set_need_report(false); if (need_set_not_alive_) { sqc_->set_server_not_alive(true); } } } } int ObFastInitSqcReportQCMessageCall::mock_sqc_finish_msg() { int ret = OB_SUCCESS; if (OB_NOT_NULL(sqc_)) { dtl::ObDtlBasicChannel *ch = reinterpret_cast( sqc_->get_qc_channel()); if (OB_ISNULL(ch)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ch is unexpected", K(ret)); } else { MTL_SWITCH(ch->get_tenant_id()) { ObPxFinishSqcResultMsg finish_msg; finish_msg.rc_ = err_; finish_msg.dfo_id_ = sqc_->get_dfo_id(); finish_msg.sqc_id_ = sqc_->get_sqc_id(); dtl::ObDtlMsgHeader header; header.nbody_ = static_cast(finish_msg.get_serialize_size()); header.type_ = static_cast(finish_msg.get_type()); int64_t need_size = header.get_serialize_size() + finish_msg.get_serialize_size(); dtl::ObDtlLinkedBuffer *buffer = nullptr; if (OB_ISNULL(buffer = ch->alloc_buf(need_size))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc buffer failed", K(ret)); } else { auto buf = buffer->buf(); auto size = buffer->size(); auto &pos = buffer->pos(); buffer->set_data_msg(false); buffer->timeout_ts() = timeout_ts_; buffer->set_msg_type(dtl::ObDtlMsgType::FINISH_SQC_RESULT); if (OB_FAIL(common::serialization::encode(buf, size, pos, header))) { LOG_WARN("fail to encode buffer", K(ret)); } else if (OB_FAIL(common::serialization::encode(buf, size, pos, finish_msg))) { LOG_WARN("serialize RPC channel message fail", K(ret)); } else if (FALSE_IT(buffer->size() = pos)) { } else if (FALSE_IT(pos = 0)) { } else if (FALSE_IT(buffer->tenant_id() = ch->get_tenant_id())) { } else if (OB_FAIL(ch->attach(buffer))) { LOG_WARN("fail to feedup buffer", K(ret)); } else if (FALSE_IT(ch->free_buffer_count())) { } else { need_interrupt_ = false; } } if (NULL != buffer) { ch->free_buffer_count(); } } } } return ret; } // ObInitFastSqcP相关函数. int ObInitFastSqcP::init() { int ret = OB_SUCCESS; ObPxSqcHandler *sqc_handler = nullptr; if (OB_ISNULL(sqc_handler = ObPxSqcHandler::get_sqc_handler())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected sqc handler", K(ret)); } else if (OB_FAIL(sqc_handler->init())) { LOG_WARN("Failed to init sqc handler", K(ret)); } else { arg_.sqc_handler_ = sqc_handler; arg_.sqc_handler_->reset_reference_count(); //设置sqc_handler的引用计数为1. } return ret; } void ObInitFastSqcP::destroy() { obrpc::ObRpcProcessor >::destroy(); /** * 如果经历了after process这个流程,arg_.sqc_handler_会被置为空。 * 如果这里arg_.sqc_handler_不为空,则意味着,init以后,没有进行 * after process这个流程,那么就应当由自己来做释放。 */ if (OB_NOT_NULL(arg_.sqc_handler_)) { ObPxSqcHandler::release_handler(arg_.sqc_handler_); } ObActiveSessionGuard::setup_default_ash(); } int ObInitFastSqcP::process() { ObActiveSessionGuard::get_stat().in_sql_execution_ = true; int ret = OB_SUCCESS; LOG_TRACE("receive dfo", K_(arg)); ObPxSqcHandler *sqc_handler = arg_.sqc_handler_; ObSQLSessionInfo *session = nullptr; if (OB_ISNULL(sqc_handler)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Sqc handler can't be nullptr", K(ret)); } else if (OB_FAIL(sqc_handler->init_env())) { LOG_WARN("Failed to init sqc env", K(ret)); } else if (OB_ISNULL(sqc_handler = arg_.sqc_handler_) || !sqc_handler->valid()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Invalid sqc handler", K(ret), KPC(sqc_handler)); } else if (OB_FAIL(OB_E(EventTable::EN_PX_SQC_EXECUTE_FAILED) OB_SUCCESS)) { LOG_WARN("match sqc execute errism", K(ret)); } else if (OB_ISNULL(session = sqc_handler->get_exec_ctx().get_my_session())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Session can't be null", K(ret)); } else if (OB_FAIL(sqc_handler->link_qc_sqc_channel())) { LOG_WARN("fail to link qc sqc channel", K(ret)); } else { ObPxRpcInitSqcArgs &arg = sqc_handler->get_sqc_init_arg(); arg.sqc_.set_task_count(1); ObPxInterruptGuard px_int_guard(arg.sqc_.get_interrupt_id().px_interrupt_id_); lib::CompatModeGuard g(session->get_compatibility_mode() == ORACLE_MODE ? lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL); sqc_handler->set_tenant_id(session->get_effective_tenant_id()); LOG_TRACE("process dfo", K(arg), K(session->get_compatibility_mode()), K(sqc_handler->get_reserved_px_thread_count())); if (OB_FAIL(startup_normal_sqc(*sqc_handler))) { LOG_WARN("fail to startup normal sqc", K(ret)); } } // https://work.aone.alibaba-inc.com/issue/37723456 if (OB_SUCCESS != ret && is_schema_error(ret)) { if (OB_NOT_NULL(sqc_handler) && GSCHEMASERVICE.is_schema_error_need_retry(NULL, sqc_handler->get_tenant_id())) { ret = OB_ERR_REMOTE_SCHEMA_NOT_FULL; } else { ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH; } } ObActiveSessionGuard::get_stat().in_sql_execution_ = false; if (OB_NOT_NULL(sqc_handler)) { // link channel之前或者link过程可能会失败. // 如果sqc和qc没有link, 由response将 ret 通知给px. // 如果sqc和qc已经link, 由dtl msg report通知px. sqc_handler->set_end_ret(ret); if (sqc_handler->has_flag(OB_SQC_HANDLER_QC_SQC_LINKED)) { ret = OB_SUCCESS; } sqc_handler->reset_reference_count(); ObPxSqcHandler::release_handler(sqc_handler); arg_.sqc_handler_ = nullptr; } return ret; } int ObInitFastSqcP::startup_normal_sqc(ObPxSqcHandler &sqc_handler) { int ret = OB_SUCCESS; int64_t dispatched_worker_count = 0; ObSQLSessionInfo *session = sqc_handler.get_exec_ctx().get_my_session(); ObPxSubCoord &sub_coord = sqc_handler.get_sub_coord(); if (OB_ISNULL(session)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("session is NULL", K(ret)); } else { ObPxRpcInitSqcArgs &arg = sqc_handler.get_sqc_init_arg(); ObWorkerSessionGuard worker_session_guard(session); ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock()); session->set_peer_addr(arg.sqc_.get_qc_addr()); if (OB_FAIL(session->store_query_string(ObString::make_string("PX SUB COORDINATOR")))) { LOG_WARN("store query string to session failed", K(ret)); } else if (OB_FAIL(sub_coord.pre_process())) { LOG_WARN("fail process sqc", K(arg), K(ret)); } else if (OB_FAIL(sub_coord.try_start_tasks(dispatched_worker_count, true))) { LOG_WARN("fail to start tasks", K(ret)); } } return ret; } void ObFastInitSqcCB::on_timeout() { int ret = OB_TIMEOUT; ret = deal_with_rpc_timeout_err_safely(); const bool is_timeout = true; interrupt_qc(ret, is_timeout); } int ObFastInitSqcCB::process() { // https://work.aone.alibaba-inc.com/issue/26171617 int ret = rcode_.rcode_; if (OB_FAIL(ret)) { int64_t cur_timestamp = ::oceanbase::common::ObTimeUtility::current_time(); if (timeout_ts_ - cur_timestamp > 0) { const bool is_timeout = false; interrupt_qc(ret, is_timeout); LOG_WARN("init fast sqc cb async interrupt qc", K_(trace_id), K(addr_), K(timeout_ts_), K(interrupt_id_), K(ret)); } else { LOG_WARN("init fast sqc cb async timeout", K_(trace_id), K(addr_), K(timeout_ts_), K(cur_timestamp), K(ret)); } } return ret; } int ObFastInitSqcCB::deal_with_rpc_timeout_err_safely() { int ret = OB_SUCCESS; ObDealWithRpcTimeoutCall call(addr_, retry_info_, timeout_ts_, trace_id_); call.ret_ = OB_TIMEOUT; ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance(); if (OB_NOT_NULL(manager)) { if (OB_FAIL(manager->get_map().atomic_refactored(interrupt_id_, call))) { LOG_WARN("fail to deal with rpc timeout call", K(interrupt_id_)); } } return call.ret_; } void ObFastInitSqcCB::interrupt_qc(int err, bool is_timeout) { int ret = OB_SUCCESS; ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance(); if (OB_NOT_NULL(manager)) { // if we are sure init_sqc msg is not sent to sqc successfully, we don't have to set sqc not alive. bool init_sqc_not_send_out = (get_error() == EASY_TIMEOUT_NOT_SENT_OUT || get_error() == EASY_DISCONNECT_NOT_SENT_OUT); const bool need_set_not_alive = is_timeout && !init_sqc_not_send_out; ObFastInitSqcReportQCMessageCall call(sqc_, err, timeout_ts_, need_set_not_alive); if (OB_FAIL(manager->get_map().atomic_refactored(interrupt_id_, call))) { LOG_WARN("fail to set need report", K(interrupt_id_)); } else if (!call.need_interrupt_) { /* do nothing*/ LOG_WARN("ignore virtual table error,no need interrupt qc", K(ret)); } else { int tmp_ret = OB_SUCCESS; ObInterruptCode int_code(err, GETTID(), GCTX.self_addr(), "RPC ABORT PX"); if (OB_SUCCESS != (tmp_ret = manager->interrupt(interrupt_id_, int_code))) { LOG_WARN("fail to send interrupt message", K_(trace_id), K(tmp_ret), K(int_code), K(interrupt_id_)); } } } } void ObDealWithRpcTimeoutCall::deal_with_rpc_timeout_err() { if (OB_TIMEOUT == ret_) { int64_t cur_timestamp = ::oceanbase::common::ObTimeUtility::current_time(); // 由于存在时间精度不一致导致的时间差, 这里需要满足大于100ms才认为不是超时. // 一个容错的处理. if (timeout_ts_ - cur_timestamp > 100 * 1000) { LOG_DEBUG("rpc return OB_TIMEOUT, but it is actually not timeout, " "change error code to OB_CONNECT_ERROR", K(ret_), K(timeout_ts_), K(cur_timestamp)); if (NULL != retry_info_) { int a_ret = OB_SUCCESS; if (OB_UNLIKELY(OB_SUCCESS != (a_ret = retry_info_->add_invalid_server_distinctly( addr_)))) { LOG_WARN_RET(a_ret, "fail to add invalid server distinctly", K_(trace_id), K(a_ret), K_(addr)); } } ret_ = OB_RPC_CONNECT_ERROR; } else { LOG_DEBUG("rpc return OB_TIMEOUT, and it is actually timeout, " "do not change error code", K(ret_), K(timeout_ts_), K(cur_timestamp)); if (NULL != retry_info_) { retry_info_->set_is_rpc_timeout(true); } } } } void ObDealWithRpcTimeoutCall::operator() (hash::HashMapPair &entry) { UNUSED(entry); deal_with_rpc_timeout_err(); } int ObPxTenantTargetMonitorP::init() { return OB_SUCCESS; } void ObPxTenantTargetMonitorP::destroy() { } // leader 接收各个 follower 的资源汇报,并将 leader 看到的最新视图作为结果返回给 follower int ObPxTenantTargetMonitorP::process() { int ret = OB_SUCCESS; ObTimeGuard timeguard("px_target_request", 100000); const uint64_t tenant_id = arg_.get_tenant_id(); const uint64_t follower_version = arg_.get_version(); bool is_leader; uint64_t leader_version; if (OB_FAIL(OB_PX_TARGET_MGR.is_leader(tenant_id, is_leader))) { LOG_WARN("get is_leader failed", K(ret), K(tenant_id)); } else if (OB_FAIL(OB_PX_TARGET_MGR.get_version(tenant_id, leader_version))) { LOG_WARN("get master_version failed", K(ret), K(tenant_id)); } else { result_.set_tenant_id(tenant_id); result_.set_version(leader_version); if (!is_leader) { result_.set_status(MONITOR_NOT_MASTER); } else if (follower_version != leader_version) { result_.set_status(MONITOR_VERSION_NOT_MATCH); } else { result_.set_status(MONITOR_READY); for (int i = 0; OB_SUCC(ret) && i < arg_.addr_target_array_.count(); i++) { ObAddr &server = arg_.addr_target_array_.at(i).addr_; int64_t peer_used_inc = arg_.addr_target_array_.at(i).target_; if (OB_FAIL(OB_PX_TARGET_MGR.update_peer_target_used(tenant_id, server, peer_used_inc))) { LOG_WARN("set thread count failed", K(ret), K(tenant_id), K(server), K(peer_used_inc)); } } // A simple and rude exception handling, re-statistics if (OB_FAIL(ret)) { int tem_ret = OB_SUCCESS; if ((tem_ret = OB_PX_TARGET_MGR.reset_statistics(tenant_id, leader_version + 1)) != OB_SUCCESS) { LOG_WARN("reset statistics failed", K(tem_ret)); } } else { const hash::ObHashMap *global_target_usage = NULL; if (OB_FAIL(OB_PX_TARGET_MGR.get_global_target_usage(tenant_id, global_target_usage))) { LOG_WARN("get global thread count failed", K(ret), K(tenant_id)); } else { for (hash::ObHashMap::const_iterator it = global_target_usage->begin(); OB_SUCC(ret) && it != global_target_usage->end(); ++it) { if (OB_FAIL(result_.push_peer_target_usage(it->first, it->second.get_peer_used()))) { COMMON_LOG(WARN, "push_back peer_used failed", K(ret)); } } } } } } return ret; }