diff --git a/deps/oblib/src/lib/ob_define.h b/deps/oblib/src/lib/ob_define.h index cb5c8bd50a..3574c4f3d1 100644 --- a/deps/oblib/src/lib/ob_define.h +++ b/deps/oblib/src/lib/ob_define.h @@ -167,6 +167,7 @@ const int64_t OB_MAX_SPAN_LENGTH = 1024; const int64_t OB_MAX_SPAN_TAG_LENGTH = 8 * 1024L; const int64_t OB_MAX_REF_TYPE_LENGTH = 10; const int64_t OB_MAX_LS_FLAG_LENGTH = 2048; +const int64_t RESOURCE_GROUP_START_ID = 10000; // See ObDeviceHealthStatus for more information const int64_t OB_MAX_DEVICE_HEALTH_STATUS_STR_LENGTH = 20; diff --git a/src/observer/ob_inner_sql_rpc_processor.cpp b/src/observer/ob_inner_sql_rpc_processor.cpp index bb214dbc4a..97d52dfd03 100644 --- a/src/observer/ob_inner_sql_rpc_processor.cpp +++ b/src/observer/ob_inner_sql_rpc_processor.cpp @@ -103,12 +103,13 @@ int ObInnerSqlRpcP::process_write( { int ret = OB_SUCCESS; int64_t affected_rows = -1; + ResourceGroupGuard guard(transmit_arg.get_consumer_group_id()); if (OB_FAIL(conn->execute_write(transmit_arg.get_tenant_id(), write_sql.ptr(), affected_rows))) { LOG_WARN("execute write failed", K(ret), K(transmit_arg), K(write_sql)); } else { transmit_result.set_affected_rows(affected_rows); transmit_result.set_stmt_type( - static_cast(conn)->get_session().get_stmt_type()); + static_cast(conn)->get_session().get_stmt_type()); } return ret; @@ -534,7 +535,6 @@ int ObInnerSqlRpcP::process() LOG_WARN("failed to acquire inner connection", K(ret), K(transmit_arg)); } /* init session info */ - const int64_t group_id = transmit_arg.get_consumer_group_id(); if (OB_SUCC(ret) && OB_NOT_NULL(tmp_session)) { tmp_session->set_current_trace_id(ObCurTraceId::get_trace_id()); tmp_session->switch_tenant(transmit_arg.get_tenant_id()); @@ -717,6 +717,23 @@ int ObInnerSqlRpcP::set_session_param_to_conn( return ret; } +ResourceGroupGuard::ResourceGroupGuard(const int32_t group_id) + : group_change_(false), old_group_id_(0) +{ + if (group_id >= RESOURCE_GROUP_START_ID) { + old_group_id_ = THIS_WORKER.get_group_id(); + THIS_WORKER.set_group_id(group_id); + group_change_ = true; + } +} + +ResourceGroupGuard::~ResourceGroupGuard() +{ + if (group_change_) { + THIS_WORKER.set_group_id(old_group_id_); + } +} + } } // namespace oceanbase diff --git a/src/observer/ob_inner_sql_rpc_processor.h b/src/observer/ob_inner_sql_rpc_processor.h index 73439121e6..9f5330eaca 100644 --- a/src/observer/ob_inner_sql_rpc_processor.h +++ b/src/observer/ob_inner_sql_rpc_processor.h @@ -95,6 +95,18 @@ private: DISALLOW_COPY_AND_ASSIGN(ObInnerSqlRpcP); }; +class ResourceGroupGuard +{ + //todo qilu:revert after ddl_back_threads are split under tenants +public: + ResourceGroupGuard(const int32_t group_id); + ~ResourceGroupGuard(); +public: + bool group_change_; + int32_t old_group_id_; + +}; + } } #endif /* OBDEV_SRC_OBSERVER_INNER_SQL_RPC_PROCESSOR_H_ */ diff --git a/src/share/io/ob_io_define.cpp b/src/share/io/ob_io_define.cpp index fb3817237c..6c748d2361 100644 --- a/src/share/io/ob_io_define.cpp +++ b/src/share/io/ob_io_define.cpp @@ -438,7 +438,7 @@ int64_t ObIORequest::get_group_id() const uint64_t ObIORequest::get_io_usage_index() { uint64_t index = 0; - if (get_group_id() < GROUP_START_ID) { + if (get_group_id() < RESOURCE_GROUP_START_ID) { //other group , do nothing } else { index = tenant_io_mgr_.get_ptr()->get_usage_index(get_group_id()); @@ -1173,7 +1173,7 @@ int ObTenantIOConfig::parse_group_config(const char *config_str) int ObTenantIOConfig::add_single_group_config(const uint64_t tenant_id, const int64_t group_id, int64_t min_percent, int64_t max_percent, int64_t weight_percent) { int ret = OB_SUCCESS; - if (group_id < GROUP_START_ID || !is_valid_tenant_id(tenant_id) || + if (group_id < RESOURCE_GROUP_START_ID || !is_valid_tenant_id(tenant_id) || min_percent < 0 || min_percent > 100 || max_percent < 0 || max_percent > 100 || weight_percent < 0 || weight_percent > 100 || diff --git a/src/share/io/ob_io_define.h b/src/share/io/ob_io_define.h index 794f32d330..826d253933 100644 --- a/src/share/io/ob_io_define.h +++ b/src/share/io/ob_io_define.h @@ -31,7 +31,6 @@ namespace common static constexpr int64_t DEFAULT_IO_WAIT_TIME_MS = 5000L; // 5s static constexpr int64_t MAX_IO_WAIT_TIME_MS = 300L * 1000L; // 5min -static constexpr int64_t GROUP_START_ID = 10000L; // start id = 10000 static constexpr int64_t GROUP_START_NUM = 8L; enum class ObIOMode : uint8_t { diff --git a/src/share/io/ob_io_manager.cpp b/src/share/io/ob_io_manager.cpp index f1513eec38..549234665a 100644 --- a/src/share/io/ob_io_manager.cpp +++ b/src/share/io/ob_io_manager.cpp @@ -980,7 +980,7 @@ int ObTenantIOManager::modify_io_config(const uint64_t group_id, } else { uint64_t index = INT64_MAX; DRWLock::WRLockGuard guard(io_config_lock_); - if (group_id < GROUP_START_ID && group_id > 0) { + if (group_id < RESOURCE_GROUP_START_ID && group_id > 0) { ret = OB_INVALID_CONFIG; LOG_WARN("invalid group id", K(ret), K(tenant_id_), K(group_id)); } else if (min_percent < 0 || min_percent > 100 || @@ -1046,7 +1046,7 @@ int ObTenantIOManager::add_group_io_config(const int64_t group_id, } else if (OB_UNLIKELY(!is_working())) { ret = OB_STATE_NOT_MATCH; LOG_WARN("tenant not working", K(ret), K(tenant_id_)); - } else if (group_id < GROUP_START_ID || min_percent < 0 || min_percent > 100 || + } else if (group_id < RESOURCE_GROUP_START_ID || min_percent < 0 || min_percent > 100 || max_percent < 0 || max_percent > 100 || max_percent < min_percent || weight_percent < 0 || weight_percent > 100) { ret = OB_INVALID_CONFIG; @@ -1099,7 +1099,7 @@ int ObTenantIOManager::reset_consumer_group_config(const int64_t group_id) } else if (OB_UNLIKELY(!is_working())) { ret = OB_STATE_NOT_MATCH; LOG_WARN("tenant not working", K(ret), K(tenant_id_)); - } else if (group_id < GROUP_START_ID) { + } else if (group_id < RESOURCE_GROUP_START_ID) { ret = OB_INVALID_CONFIG; LOG_WARN("cannot reset other group io config", K(ret), K(group_id)); } else { @@ -1138,7 +1138,7 @@ int ObTenantIOManager::delete_consumer_group_config(const int64_t group_id) } else if (OB_UNLIKELY(!is_working())) { ret = OB_STATE_NOT_MATCH; LOG_WARN("tenant not working", K(ret), K(tenant_id_)); - } else if (group_id < GROUP_START_ID) { + } else if (group_id < RESOURCE_GROUP_START_ID) { ret = OB_INVALID_CONFIG; LOG_WARN("cannot delete other group io config", K(ret), K(group_id)); } else { diff --git a/src/share/io/ob_io_struct.cpp b/src/share/io/ob_io_struct.cpp index 26077a183f..f13c59485b 100644 --- a/src/share/io/ob_io_struct.cpp +++ b/src/share/io/ob_io_struct.cpp @@ -992,7 +992,7 @@ int ObIOSender::enqueue_request(ObIORequest &req) } else { uint64_t index = INT_MAX64; const int64_t group_id = tmp_req->get_group_id(); - if (group_id < GROUP_START_ID) { //other + if (group_id < RESOURCE_GROUP_START_ID) { //other tmp_phy_queue = &(io_group_queues->other_phy_queue_); } else if (OB_FAIL(req.tenant_io_mgr_.get_ptr()->get_group_index(group_id, index))) { // 防止删除group、新建group等情况发生时在途req无法找到对应的group diff --git a/src/share/resource_manager/ob_cgroup_ctrl.cpp b/src/share/resource_manager/ob_cgroup_ctrl.cpp index d4db66668f..7fd46f6df8 100644 --- a/src/share/resource_manager/ob_cgroup_ctrl.cpp +++ b/src/share/resource_manager/ob_cgroup_ctrl.cpp @@ -375,7 +375,7 @@ int ObCgroupCtrl::add_self_to_cgroup(const uint64_t tenant_id, int64_t group_id) bool exist_cgroup = false; if (OB_FAIL(get_group_path(group_path, PATH_BUFSIZE, tenant_id, group_id))) { - LOG_WARN("fail get group path", K(tenant_id), K(ret)); + LOG_WARN("fail get group path", K(tenant_id), K(ret), K(group_id)); } else if (OB_FAIL(FileDirectoryUtils::is_exists(group_path, exist_cgroup))) { LOG_WARN("fail check file exist", K(group_path), K(ret)); } else if (!exist_cgroup && OB_FAIL(init_cgroup_dir_(group_path))) { @@ -752,7 +752,7 @@ int ObCgroupCtrl::reset_group_iops(const uint64_t tenant_id, } else { LOG_WARN("fail get group id", K(ret), K(group_id), K(group_name)); } - } else if (group_id < GROUP_START_ID) { + } else if (group_id < RESOURCE_GROUP_START_ID) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid group id", K(ret), K(group_id), K(group_name)); } else if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(tenant_id, tenant_holder))) { @@ -787,7 +787,7 @@ int ObCgroupCtrl::delete_group_iops(const uint64_t tenant_id, } else { LOG_WARN("fail get group id", K(ret), K(group_id), K(group_name)); } - } else if (group_id < GROUP_START_ID) { + } else if (group_id < RESOURCE_GROUP_START_ID) { //OTHER_GROUPS and all cannot be deleted ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid group id", K(ret), K(group_id), K(group_name)); diff --git a/src/share/resource_manager/ob_resource_manager_proxy.cpp b/src/share/resource_manager/ob_resource_manager_proxy.cpp index bcf945ffbd..7827184a14 100644 --- a/src/share/resource_manager/ob_resource_manager_proxy.cpp +++ b/src/share/resource_manager/ob_resource_manager_proxy.cpp @@ -820,7 +820,7 @@ int ObResourceManagerProxy::check_iops_validity( uint64_t total_min = 0; for (int64_t i = 0; OB_SUCC(ret) && i < directives.count(); ++i) { ObPlanDirective &cur_directive = directives.at(i); - if (cur_directive.group_id_ < GROUP_START_ID) { + if (cur_directive.group_id_ < RESOURCE_GROUP_START_ID) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected group id", K(cur_directive)); } else if (OB_UNLIKELY(!cur_directive.is_valid())) { diff --git a/src/share/resource_manager/ob_resource_plan_manager.cpp b/src/share/resource_manager/ob_resource_plan_manager.cpp index 8e91e48012..785bc83b69 100644 --- a/src/share/resource_manager/ob_resource_plan_manager.cpp +++ b/src/share/resource_manager/ob_resource_plan_manager.cpp @@ -222,7 +222,7 @@ int ObResourcePlanManager::normalize_iops_directives(const uint64_t tenant_id, } else { for (int64_t i = 0; OB_SUCC(ret) && i < directives.count(); ++i) { ObPlanDirective &cur_directive = directives.at(i); - if (cur_directive.group_id_ < GROUP_START_ID) { + if (cur_directive.group_id_ < RESOURCE_GROUP_START_ID) { ret = OB_ERR_UNEXPECTED; // 理论上不应该出现 LOG_WARN("unexpected error!!!", K(cur_directive));