diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h index 5c3e41f9c5..96f2566869 100644 --- a/src/observer/omt/ob_tenant.h +++ b/src/observer/omt/ob_tenant.h @@ -609,6 +609,8 @@ OB_INLINE int64_t ObResourceGroup::min_worker_cnt() const cnt = std::max(cnt, 8L); } else if (share::OBCG_WR == group_id_) { cnt = 2; // one for take snapshot, one for purge + } else if (share::OBCG_DBA_COMMAND == group_id_) { + cnt = 1; } return cnt; } @@ -623,6 +625,8 @@ OB_INLINE int64_t ObResourceGroup::max_worker_cnt() const cnt = std::max(cnt, tenant_->max_worker_cnt()); } else if (share::OBCG_WR == group_id_) { cnt = 2; // one for take snapshot, one for purge + } else if (share::OBCG_DBA_COMMAND == group_id_) { + cnt = 1; } return cnt; } diff --git a/src/rootserver/ob_primary_ls_service.cpp b/src/rootserver/ob_primary_ls_service.cpp index c9f6964a5c..d0ad2a03a1 100755 --- a/src/rootserver/ob_primary_ls_service.cpp +++ b/src/rootserver/ob_primary_ls_service.cpp @@ -422,11 +422,13 @@ int ObPrimaryLSService::check_ls_can_offline_by_rpc_(const share::ObLSStatusInfo const int64_t timeout = GCONF.rpc_timeout; obrpc::ObCheckLSCanOfflineArg arg; can_offline = false; + const uint64_t group_id = info.ls_is_tenant_dropping() ? OBCG_DBA_COMMAND : OBCG_DEFAULT; if (OB_FAIL(arg.init(info.tenant_id_, info.ls_id_, info.status_))) { LOG_WARN("failed to init arg", KR(ret), K(arg)); } else if (OB_FAIL(GCTX.srv_rpc_proxy_->to(leader) .by(info.tenant_id_) .timeout(timeout) + .group_id(group_id) .check_ls_can_offline(arg))) { can_offline = false; LOG_WARN("failed to check ls can offline", KR(ret), K(arg), K(info), diff --git a/src/rootserver/ob_tenant_role_transition_service.cpp b/src/rootserver/ob_tenant_role_transition_service.cpp index a6deecd678..f5c0fb131f 100644 --- a/src/rootserver/ob_tenant_role_transition_service.cpp +++ b/src/rootserver/ob_tenant_role_transition_service.cpp @@ -745,6 +745,7 @@ int ObTenantRoleTransitionService::get_ls_access_mode_( int64_t rpc_count = 0; ObArray return_code_array; int tmp_ret = OB_SUCCESS; + const uint64_t group_id = share::OBCG_DBA_COMMAND; for (int64_t i = 0; OB_SUCC(ret) && i < status_info_array.count(); ++i) { return_code_array.reset(); const ObLSStatusInfo &info = status_info_array.at(i); @@ -755,9 +756,10 @@ int ObTenantRoleTransitionService::get_ls_access_mode_( } else if (OB_FAIL(arg.init(tenant_id_, info.ls_id_))) { LOG_WARN("failed to init arg", KR(ret), K(tenant_id_), K(info)); // use meta rpc process thread - } else if (OB_FAIL(proxy.call(leader, timeout, GCONF.cluster_id, gen_meta_tenant_id(tenant_id_), arg))) { + } else if (OB_FAIL(proxy.call(leader, timeout, GCONF.cluster_id, tenant_id_, group_id, arg))) { //can not ignore error of each ls - LOG_WARN("failed to send rpc", KR(ret), K(leader), K(timeout), K(tenant_id_), K(arg)); + LOG_WARN("failed to send rpc", KR(ret), K(leader), K(timeout), + K(tenant_id_), K(arg), K(group_id)); } else { rpc_count++; } @@ -841,6 +843,7 @@ int ObTenantRoleTransitionService::do_change_ls_access_mode_( ObChangeLSAccessModeProxy proxy(*rpc_proxy_, &obrpc::ObSrvRpcProxy::change_ls_access_mode); ObAddr leader; obrpc::ObLSAccessModeInfo arg; + const uint64_t group_id = share::OBCG_DBA_COMMAND; for (int64_t i = 0; OB_SUCC(ret) && i < ls_access_info.count(); ++i) { const obrpc::ObLSAccessModeInfo &info = ls_access_info.at(i); const int64_t timeout = ctx.get_timeout(); @@ -851,9 +854,10 @@ int ObTenantRoleTransitionService::do_change_ls_access_mode_( } else if (OB_FAIL(arg.init(tenant_id_, info.get_ls_id(), info.get_mode_version(), target_access_mode, ref_scn, sys_ls_sync_scn))) { LOG_WARN("failed to init arg", KR(ret), K(info), K(target_access_mode), K(ref_scn), K(sys_ls_sync_scn)); - } else if (OB_FAIL(proxy.call(leader, timeout, GCONF.cluster_id, gen_meta_tenant_id(tenant_id_), arg))) { + } else if (OB_FAIL(proxy.call(leader, timeout, GCONF.cluster_id, tenant_id_, group_id, arg))) { //can not ignore of each ls - LOG_WARN("failed to send rpc", KR(ret), K(arg), K(timeout), K(tenant_id_), K(info)); + LOG_WARN("failed to send rpc", KR(ret), K(arg), K(timeout), + K(tenant_id_), K(info), K(group_id)); } }//end for //result @@ -1244,6 +1248,7 @@ int ObTenantRoleTransitionService::get_checkpoints_by_rpc(const uint64_t tenant_ *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::get_ls_sync_scn); obrpc::ObGetLSSyncScnArg arg; int64_t rpc_count = 0; + const uint64_t group_id = share::OBCG_DBA_COMMAND; for (int64_t i = 0; OB_SUCC(ret) && i < status_info_array.count(); ++i) { const ObLSStatusInfo &info = status_info_array.at(i); const int64_t timeout_us = !THIS_WORKER.is_timeout_ts_valid() ? @@ -1254,8 +1259,9 @@ int ObTenantRoleTransitionService::get_checkpoints_by_rpc(const uint64_t tenant_ } else if (OB_FAIL(arg.init(tenant_id, info.ls_id_, check_sync_to_latest))) { LOG_WARN("failed to init arg", KR(ret), K(tenant_id), K(info)); // use meta rpc process thread - } else if (OB_FAIL(proxy.call(leader, timeout_us, gen_meta_tenant_id(tenant_id), arg))) { - LOG_WARN("failed to send rpc", KR(ret), K(leader), K(timeout_us), K(tenant_id), K(arg)); + } else if (OB_FAIL(proxy.call(leader, timeout_us, GCONF.cluster_id, tenant_id, group_id, arg))) { + LOG_WARN("failed to send rpc", KR(ret), K(leader), K(timeout_us), + K(tenant_id), K(arg), K(group_id)); } else { rpc_count++; } @@ -1418,6 +1424,7 @@ void ObTenantRoleTransitionService::broadcast_tenant_info(const char* const log_ } else if (OB_FAIL(unit_operator.get_units_by_tenant(tenant_id_, units))) { LOG_WARN("failed to get tenant unit", KR(ret), K_(tenant_id)); } else { + //no need user special group OBCG_DBA_COMMAND ObRefreshTenantInfoProxy proxy( *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::refresh_tenant_info); int64_t rpc_count = 0; @@ -1433,7 +1440,7 @@ void ObTenantRoleTransitionService::broadcast_tenant_info(const char* const log_ } else if (OB_FAIL(arg.init(tenant_id_))) { LOG_WARN("failed to init arg", KR(ret), K_(tenant_id)); // use meta rpc process thread - } else if (OB_FAIL(proxy.call(unit.server_, timeout_us, gen_meta_tenant_id(tenant_id_), arg))) { + } else if (OB_FAIL(proxy.call(unit.server_, timeout_us, GCONF.cluster_id, gen_meta_tenant_id(tenant_id_), arg))) { LOG_WARN("failed to send rpc", KR(ret), K(unit), K(timeout_us), K_(tenant_id), K(arg)); } else { rpc_count++; diff --git a/src/share/resource_manager/ob_group_list.h b/src/share/resource_manager/ob_group_list.h index 81519e4961..0c3edf5ac2 100644 --- a/src/share/resource_manager/ob_group_list.h +++ b/src/share/resource_manager/ob_group_list.h @@ -18,4 +18,5 @@ CGID_DEF(OBCG_DIAG_TENANT, 14) CGID_DEF(OBCG_WR, 15) CGID_DEF(OBCG_STORAGE_HA_LEVEL1, 16) CGID_DEF(OBCG_STORAGE_HA_LEVEL2, 17) +CGID_DEF(OBCG_DBA_COMMAND, 18, 1) CGID_DEF(OBCG_LQ, 100)