diff --git a/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h b/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h index 7d27a40b40..449bc27aa4 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h +++ b/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h @@ -256,8 +256,8 @@ public: } return ret; } - void set_group_id(const int64_t v) {consumer_group_id_ = v; } - int64_t get_group_id() const {return consumer_group_id_; } + void set_group_id(const uint64_t v) {consumer_group_id_ = v; } + uint64_t get_group_id() const {return consumer_group_id_; } void set_reverse_link_creadentials(bool flag) { has_reverse_link_credentials_ = flag; } bool get_reverse_link_creadentials() { return has_reverse_link_credentials_; } void set_usable(bool flag) { usable_ = flag; } @@ -277,7 +277,7 @@ protected: uint64_t dblink_id_; // for dblink, record dblink_id of a connection used by dblink DblinkDriverProto dblink_driver_proto_; //for dblink, record DblinkDriverProto of a connection used by dblink uint32_t sessid_; - int64_t consumer_group_id_; //for resource isolation + uint64_t consumer_group_id_; //for resource isolation bool has_reverse_link_credentials_; // for dblink, mark if this link has credentials set bool usable_; // usable_ = false: connection is unusable, should not execute query again. char *last_set_sql_mode_cstr_; // for mysql dblink to set sql mode diff --git a/deps/oblib/src/lib/worker.cpp b/deps/oblib/src/lib/worker.cpp index 02169ac95d..9e90278199 100644 --- a/deps/oblib/src/lib/worker.cpp +++ b/deps/oblib/src/lib/worker.cpp @@ -44,18 +44,34 @@ int __attribute__((weak)) common_yield() return OB_SUCCESS; } -} +int __attribute__((weak)) SET_GROUP_ID(uint64_t group_id) +{ + int ret = OB_SUCCESS; + THIS_WORKER.set_group_id_(group_id); + return ret; } +int __attribute__((weak)) CONVERT_FUNCTION_TYPE_TO_GROUP_ID(const uint8_t function_type, uint64_t &group_id) +{ + int ret = OB_SUCCESS; + UNUSED(function_type); + group_id = GET_GROUP_ID(); + return ret; +} + +} // namespace lib +} // namespace oceanbase __thread Worker *Worker::self_; Worker::Worker() - : allocator_(nullptr), + : group_(nullptr), + allocator_(nullptr), st_current_priority_(0), session_(nullptr), cur_request_(nullptr), worker_level_(INT32_MAX), curr_request_level_(0), + is_th_worker_(false), group_id_(0), rpc_stat_srv_(nullptr), timeout_ts_(INT64_MAX), diff --git a/deps/oblib/src/lib/worker.h b/deps/oblib/src/lib/worker.h index 48fc3c3950..600f7592f1 100644 --- a/deps/oblib/src/lib/worker.h +++ b/deps/oblib/src/lib/worker.h @@ -82,10 +82,15 @@ public: OB_INLINE void set_curr_request_level(const int32_t level) { curr_request_level_ = level; } OB_INLINE int32_t get_curr_request_level() const { return curr_request_level_; } + OB_INLINE bool is_th_worker() const { return is_th_worker_; } + OB_INLINE void set_group_id_(const uint64_t group_id) { group_id_ = group_id;} - OB_INLINE void set_group_id(int32_t group_id) { group_id_ = group_id; } - OB_INLINE int32_t get_group_id() const { return group_id_; } - + OB_INLINE uint64_t get_group_id() const { return group_id_; } + OB_INLINE void set_group(void *group) { group_ = group; }; + OB_INLINE void *get_group() { return group_;}; + OB_INLINE bool is_group_worker() const { return OB_NOT_NULL(group_); } + OB_INLINE void set_func_type_(uint8_t func_type) { func_type_ = func_type; } + OB_INLINE uint8_t get_func_type() const { return func_type_; } OB_INLINE void set_rpc_stat_srv(void *rpc_stat_srv) { rpc_stat_srv_ = rpc_stat_srv; } OB_INLINE void *get_rpc_stat_srv() const { return 
rpc_stat_srv_; } @@ -125,12 +130,14 @@ public: static void set_module_type(const ObErrsimModuleType &module_type); static ObErrsimModuleType get_module_type(); #endif - +protected: + OB_INLINE void set_is_th_worker(bool is_th_worker) { is_th_worker_ = is_th_worker; } public: static __thread Worker *self_; public: common::ObDLinkNode worker_node_; + void *group_; protected: // thread runtime memory is allocated from this allocator // initially tenant_id=500; while a request is being processed, tenant_id is updated to the request's tenant id @@ -143,7 +150,9 @@ private: // whether worker is in blocking int32_t worker_level_; int32_t curr_request_level_; - int32_t group_id_; + bool is_th_worker_; + uint64_t group_id_; + uint8_t func_type_; void *rpc_stat_srv_; int64_t timeout_ts_; @@ -190,6 +199,76 @@ inline Worker &this_worker() #define THIS_WORKER oceanbase::lib::Worker::self() +#define GET_FUNC_TYPE() (THIS_WORKER.get_func_type()) +#define GET_GROUP_ID() (THIS_WORKER.get_group_id()) + +int SET_GROUP_ID(uint64_t group_id); + +int CONVERT_FUNCTION_TYPE_TO_GROUP_ID(const uint8_t function_type, uint64_t &group_id); + +class ConsumerGroupIdGuard +{ +public: + ConsumerGroupIdGuard(uint64_t group_id) + : thread_group_id_(GET_GROUP_ID()), group_changed_(false), ret_(OB_SUCCESS) + { + group_changed_ = group_id != thread_group_id_; + if (group_changed_) { + ret_ = SET_GROUP_ID(group_id); + } + } + ~ConsumerGroupIdGuard() + { + if (group_changed_) { + // SET_GROUP_ID(thread_group_id_); + } + } + int get_ret() + { + return ret_; + } + +private: + uint64_t thread_group_id_; + bool group_changed_; + int ret_; +}; +#define CONSUMER_GROUP_ID_GUARD(group_id) oceanbase::lib::ConsumerGroupIdGuard consumer_group_id_guard_(group_id) + +class ConsumerGroupFuncGuard +{ +public: + ConsumerGroupFuncGuard(uint8_t func_type) + : thread_group_id_(GET_GROUP_ID()), thread_func_type_(GET_FUNC_TYPE()), group_changed_(false), ret_(OB_SUCCESS) + { + // THIS_WORKER.set_func_type_(func_type); + uint64_t group_id = 0; + ret_ = CONVERT_FUNCTION_TYPE_TO_GROUP_ID(func_type, group_id); + if (OB_SUCCESS == ret_ && is_user_group(group_id) && group_id != thread_group_id_) { + group_changed_ = true; + ret_ = SET_GROUP_ID(group_id); + } + } + ~ConsumerGroupFuncGuard() + { + if (group_changed_) { + // SET_GROUP_ID(thread_group_id_); + // THIS_WORKER.set_func_type_(thread_func_type_); + } + } + int get_ret() + { + return ret_; + } + +private: + uint64_t thread_group_id_; + uint8_t thread_func_type_; + bool group_changed_; + int ret_; +}; +#define CONSUMER_GROUP_FUNC_GUARD(func_type) oceanbase::lib::ConsumerGroupFuncGuard consumer_group_func_guard_(func_type) + class DisableSchedInterGuard { public: diff --git a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp index 226932a025..02aa124295 100644 --- a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp @@ -1233,17 +1233,11 @@ void ObSqlNio::destroy() } } -int __attribute__((weak)) sql_nio_add_cgroup(const uint64_t tenant_id) -{ - return 0; -} void ObSqlNio::run(int64_t idx) { if (NULL != impl_) { lib::set_thread_name("sql_nio", idx); - // if (tenant_id_ != common::OB_INVALID_ID) { - // obmysql::sql_nio_add_cgroup(tenant_id_); - // } + // SET_GROUP_ID(OBCG_SQL_NIO); while(!has_set_stop() && !(OB_NOT_NULL(&lib::Thread::current()) ?
lib::Thread::current().has_set_stop() : false)) { impl_[idx].do_work(); } diff --git a/deps/oblib/src/rpc/obmysql/ob_sql_nio.h b/deps/oblib/src/rpc/obmysql/ob_sql_nio.h index df40686dd9..869d8ddf4b 100644 --- a/deps/oblib/src/rpc/obmysql/ob_sql_nio.h +++ b/deps/oblib/src/rpc/obmysql/ob_sql_nio.h @@ -71,7 +71,6 @@ private: uint64_t dispatch_idx_; uint64_t tenant_id_; }; -extern int sql_nio_add_cgroup(const uint64_t tenant_id); }; // end namespace obmysql }; // end namespace oceanbase diff --git a/src/observer/mysql/obmp_query.cpp b/src/observer/mysql/obmp_query.cpp index 5485cbf92f..52d9ced030 100644 --- a/src/observer/mysql/obmp_query.cpp +++ b/src/observer/mysql/obmp_query.cpp @@ -368,7 +368,7 @@ int ObMPQuery::process() IGNORE_RETURN record_flt_trace(session); } - if (OB_UNLIKELY(NULL != GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && is_conn_valid()) { + if (is_conn_valid()) { int tmp_ret = OB_SUCCESS; // Call setup_user_resource_group no matter OB_SUCC or OB_FAIL // because we have to reset conn.group_id_ according to user_name. diff --git a/src/observer/mysql/obmp_stmt_execute.cpp b/src/observer/mysql/obmp_stmt_execute.cpp index 82951dd7c8..b12a928592 100644 --- a/src/observer/mysql/obmp_stmt_execute.cpp +++ b/src/observer/mysql/obmp_stmt_execute.cpp @@ -1837,7 +1837,7 @@ int ObMPStmtExecute::process_execute_stmt(const ObMultiStmtItem &multi_stmt_item if (OB_FAIL(do_process_single(session, params_, has_more_result, force_sync_resp, async_resp_used))) { LOG_WARN("fail to do process", K(ret), K(ctx_.cur_sql_)); } - if (OB_UNLIKELY(NULL != GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && is_conn_valid()) { + if (is_conn_valid()) { int bak_ret = ret; ObSQLSessionInfo *sess = NULL; if (OB_FAIL(get_session(sess))) { diff --git a/src/observer/ob_inner_sql_connection.cpp b/src/observer/ob_inner_sql_connection.cpp index 6d868d5cb1..5ddec06292 100644 --- a/src/observer/ob_inner_sql_connection.cpp +++ b/src/observer/ob_inner_sql_connection.cpp @@ -697,6 +697,7 @@ int ObInnerSQLConnection::do_query(sqlclient::ObIExecutor &executor, ObInnerSQLR } else { ObSQLSessionInfo &session = res.result_set().get_session(); session.set_expect_group_id(group_id_); + CONSUMER_GROUP_ID_GUARD(consumer_group_id_); if (OB_ISNULL(res.sql_ctx().schema_guard_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("schema guard is null"); diff --git a/src/observer/ob_inner_sql_rpc_processor.cpp b/src/observer/ob_inner_sql_rpc_processor.cpp index a5af6d6a4b..917bc47ca9 100644 --- a/src/observer/ob_inner_sql_rpc_processor.cpp +++ b/src/observer/ob_inner_sql_rpc_processor.cpp @@ -108,7 +108,7 @@ int ObInnerSqlRpcP::process_write( { int ret = OB_SUCCESS; int64_t affected_rows = -1; - ResourceGroupGuard guard(transmit_arg.get_consumer_group_id()); + CONSUMER_GROUP_ID_GUARD(transmit_arg.get_consumer_group_id()); if (OB_FAIL(conn->execute_write(transmit_arg.get_tenant_id(), write_sql.ptr(), affected_rows))) { LOG_WARN("execute write failed", K(ret), K(transmit_arg), K(write_sql)); } else { @@ -129,7 +129,7 @@ int ObInnerSqlRpcP::process_read( int ret = OB_SUCCESS; common::ObScanner &scanner = transmit_result.get_scanner(); scanner.set_found_rows(0); - + CONSUMER_GROUP_ID_GUARD(transmit_arg.get_consumer_group_id()); SMART_VAR(ObMySQLProxy::MySQLResult, res) { sqlclient::ObMySQLResult *sql_result = NULL; if (OB_FAIL(conn->execute_read(GCONF.cluster_id, transmit_arg.get_tenant_id(), read_sql.ptr(), res))) { @@ -486,23 +486,6 @@ int ObInnerSqlRpcP::set_session_param_to_conn( return ret; } -ResourceGroupGuard::ResourceGroupGuard(const 
int32_t group_id) - : group_change_(false), old_group_id_(0) -{ - if (is_user_group(group_id)) { - old_group_id_ = THIS_WORKER.get_group_id(); - THIS_WORKER.set_group_id(group_id); - group_change_ = true; - } -} - -ResourceGroupGuard::~ResourceGroupGuard() -{ - if (group_change_) { - THIS_WORKER.set_group_id(old_group_id_); - } -} - } } // namespace oceanbase diff --git a/src/observer/ob_inner_sql_rpc_processor.h b/src/observer/ob_inner_sql_rpc_processor.h index 23674307ab..e3679a0f26 100644 --- a/src/observer/ob_inner_sql_rpc_processor.h +++ b/src/observer/ob_inner_sql_rpc_processor.h @@ -75,18 +75,6 @@ private: DISALLOW_COPY_AND_ASSIGN(ObInnerSqlRpcP); }; -class ResourceGroupGuard -{ - //todo qilu:revert after ddl_back_threads are split under tenants -public: - ResourceGroupGuard(const int32_t group_id); - ~ResourceGroupGuard(); -public: - bool group_change_; - int32_t old_group_id_; - -}; - } } #endif /* OBDEV_SRC_OBSERVER_INNER_SQL_RPC_PROCESSOR_H_ */ diff --git a/src/observer/ob_inner_sql_rpc_proxy.h b/src/observer/ob_inner_sql_rpc_proxy.h index 1985ffe28c..660bc13b0f 100644 --- a/src/observer/ob_inner_sql_rpc_proxy.h +++ b/src/observer/ob_inner_sql_rpc_proxy.h @@ -124,10 +124,10 @@ public: int64_t get_trx_timeout() const { return trx_timeout_; } - void set_consumer_group_id(const int64_t consumer_group_id) { + void set_consumer_group_id(const uint64_t consumer_group_id) { consumer_group_id_ = consumer_group_id; } - int64_t get_consumer_group_id() const { + uint64_t get_consumer_group_id() const { return consumer_group_id_; } inline int set_tz_info_wrap(const ObTimeZoneInfoWrap &other) { return tz_info_wrap_.deep_copy(other); } @@ -184,7 +184,7 @@ private: bool is_load_data_exec_; common::ObString nls_formats_[common::ObNLSFormatEnum::NLS_MAX]; bool use_external_session_; - int64_t consumer_group_id_; + uint64_t consumer_group_id_; }; class ObInnerSQLTransmitResult diff --git a/src/observer/ob_srv_deliver.h b/src/observer/ob_srv_deliver.h index 6c43734f29..316c61686f 100644 --- a/src/observer/ob_srv_deliver.h +++ b/src/observer/ob_srv_deliver.h @@ -72,11 +72,8 @@ public: if (thread_name_ != nullptr) { lib::set_thread_name(thread_name_, get_thread_idx()); } - if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread && - tenant_id_ != common::OB_INVALID_ID && nullptr != GCTX.cgroup_ctrl_ && - OB_LIKELY(GCTX.cgroup_ctrl_->is_valid())) { - GCTX.cgroup_ctrl_->add_self_to_cgroup(tenant_id_, - share::OBCG_MYSQL_LOGIN); + if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) { + lib::SET_GROUP_ID(share::OBCG_MYSQL_LOGIN); } queue_.loop(); } diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index 442faaa4e6..a4da3d6a89 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -2469,17 +2469,6 @@ int ObMultiTenant::check_if_unit_id_exist(const uint64_t unit_id, bool &exist) return ret; } -int obmysql::sql_nio_add_cgroup(const uint64_t tenant_id) -{ - int ret = OB_SUCCESS; - if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread && - nullptr != GCTX.cgroup_ctrl_ && - OB_LIKELY(GCTX.cgroup_ctrl_->is_valid())) { - ret = GCTX.cgroup_ctrl_->add_self_to_cgroup(tenant_id, OBCG_SQL_NIO); - } - return ret; -} - int ObSrvNetworkFrame::reload_tenant_sql_thread_config(const uint64_t tenant_id) { int ret = OB_SUCCESS; diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 7f2e74bf94..0ae8266d23 100644 --- a/src/observer/omt/ob_tenant.cpp +++ 
b/src/observer/omt/ob_tenant.cpp @@ -291,10 +291,7 @@ void ObPxPool::run1() CLEAR_INTERRUPTABLE(); ObCgroupCtrl *cgroup_ctrl = GCTX.cgroup_ctrl_; LOG_INFO("run px pool", K(group_id_), K(tenant_id_), K_(active_threads)); - if (nullptr != cgroup_ctrl && OB_LIKELY(cgroup_ctrl->is_valid())) { - cgroup_ctrl->add_self_to_cgroup(tenant_id_, group_id_); - LOG_INFO("add thread to group succ", K(tenant_id_), K(group_id_)); - } + SET_GROUP_ID(group_id_); if (!is_inited_) { queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size); @@ -357,7 +354,7 @@ void ObPxPool::stop() } } -ObResourceGroup::ObResourceGroup(int32_t group_id, ObTenant* tenant, share::ObCgroupCtrl *cgroup_ctrl): +ObResourceGroup::ObResourceGroup(uint64_t group_id, ObTenant* tenant, share::ObCgroupCtrl *cgroup_ctrl): ObResourceGroupNode(group_id), workers_lock_(tenant->workers_lock_), inited_(false), @@ -531,9 +528,6 @@ void ObResourceGroup::check_worker_count(ObThWorker &w) if (OB_UNLIKELY(ATOMIC_LOAD(&shrink_)) && OB_LIKELY(ATOMIC_BCAS(&shrink_, true, false))) { w.stop(); - if (cgroup_ctrl_->is_valid() && OB_FAIL(cgroup_ctrl_->remove_self_from_cgroup(tenant_->id()))) { - LOG_WARN("remove thread from cgroup failed", K(ret), "tenant:", tenant_->id(), K_(group_id)); - } LOG_INFO("worker thread exit", K(tenant_->id()), K(workers_.get_size())); } } @@ -624,7 +618,7 @@ int ObResourceGroup::get_throttled_time(int64_t &throttled_time) return ret; } -int GroupMap::create_and_insert_group(int32_t group_id, ObTenant *tenant, ObCgroupCtrl *cgroup_ctrl, ObResourceGroup *&group) +int GroupMap::create_and_insert_group(uint64_t group_id, ObTenant *tenant, ObCgroupCtrl *cgroup_ctrl, ObResourceGroup *&group) { int ret = OB_SUCCESS; if (nullptr == tenant @@ -1240,11 +1234,12 @@ int ObTenant::get_new_request( w.set_large_query(false); w.set_curr_request_level(0); wk_level = w.get_worker_level(); + ObResourceGroup *group = static_cast<ObResourceGroup *>(w.get_group()); if (wk_level < 0 || wk_level >= MAX_REQUEST_LEVEL) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("unexpected level", K(wk_level), K(id_)); } else if (wk_level >= MAX_REQUEST_LEVEL - 1) { - ret = w.get_group()->multi_level_queue_.pop_timeup(task, wk_level, timeout); + ret = group->multi_level_queue_.pop_timeup(task, wk_level, timeout); if ((ret == OB_SUCCESS && nullptr == task) || ret == OB_ENTRY_NOT_EXIST) { ret = OB_ENTRY_NOT_EXIST; usleep(10 * 1000L); @@ -1255,17 +1250,17 @@ int ObTenant::get_new_request( LOG_ERROR("pop queue err", "tenant_id", id_, K(ret)); } } else if (w.is_level_worker()) { - ret = w.get_group()->multi_level_queue_.pop(task, wk_level, timeout); + ret = group->multi_level_queue_.pop(task, wk_level, timeout); } else { for (int32_t level = MAX_REQUEST_LEVEL - 1; level >= GROUP_MULTI_LEVEL_THRESHOLD; level--) { - IGNORE_RETURN w.get_group()->multi_level_queue_.try_pop(task, level); + IGNORE_RETURN group->multi_level_queue_.try_pop(task, level); if (nullptr != task) { ret = OB_SUCCESS; break; } } if (nullptr == task) { - ret = w.get_group()->req_queue_.pop(task, timeout); + ret = group->req_queue_.pop(task, timeout); } } } else { @@ -1628,7 +1623,13 @@ void ObTenant::print_throttled_time() LOG_WARN_RET(tmp_ret, "get throttled time failed", K(tmp_ret), K(group)); } else { tenant_throttled_time += group_throttled_time; - databuff_printf(buf, len, pos, "group_id: %d, group: %s, throttled_time: %ld;", group->group_id_, set.name_of_id(group->group_id_), group_throttled_time); + databuff_printf(buf, + len, + pos, + "group_id: %lu, group: %s, throttled_time: %ld;", + group->group_id_,
+ set.name_of_id(group->group_id_), + group_throttled_time); } } } @@ -1785,9 +1786,6 @@ void ObTenant::check_worker_count(ObThWorker &w) && OB_UNLIKELY(ATOMIC_LOAD(&shrink_)) && OB_LIKELY(ATOMIC_BCAS(&shrink_, true, false))) { w.stop(); - if (cgroup_ctrl_.is_valid() && OB_FAIL(cgroup_ctrl_.remove_self_from_cgroup(id_))) { - LOG_WARN("remove thread from cgroup failed", K(ret), K_(id)); - } LOG_INFO("worker thread exit", K(id_), K(workers_.get_size())); } } @@ -1856,7 +1854,7 @@ void ObTenant::lq_end(ObThWorker &w) { int ret = OB_SUCCESS; if (w.is_lq_yield()) { - if (OB_FAIL(cgroup_ctrl_.add_self_to_cgroup(id_, w.get_group_id()))) { + if (OB_FAIL(cgroup_ctrl_.add_self_to_cgroup_(id_, w.get_group_id()))) { LOG_WARN("move thread from lq group failed", K(ret), K(id_)); } else { w.set_lq_yield(false); @@ -1867,7 +1865,8 @@ void ObTenant::lq_wait(ObThWorker &w) { int64_t last_query_us = ObTimeUtility::current_time() - w.get_last_wakeup_ts(); - int64_t lq_group_worker_cnt = w.get_group()->workers_.get_size(); + ObResourceGroup *group = static_cast<ObResourceGroup *>(w.get_group()); + int64_t lq_group_worker_cnt = group->workers_.get_size(); int64_t default_group_worker_cnt = workers_.get_size(); double large_query_percentage = GCONF.large_query_worker_percentage / 100.0; int64_t wait_us = static_cast<int64_t>(last_query_us * lq_group_worker_cnt / @@ -1890,7 +1889,7 @@ int ObTenant::lq_yield(ObThWorker &w) } } else if (w.is_lq_yield()) { // avoid duplicate change group - } else if (OB_FAIL(cgroup_ctrl_.add_self_to_cgroup(id_, OBCG_LQ))) { + } else if (OB_FAIL(cgroup_ctrl_.add_self_to_cgroup_(id_, share::OBCG_LQ))) { LOG_WARN("move thread to lq group failed", K(ret), K(id_)); } else { w.set_lq_yield(); diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h index 56546e3eec..bee1bee4fd 100644 --- a/src/observer/omt/ob_tenant.h +++ b/src/observer/omt/ob_tenant.h @@ -63,7 +63,6 @@ public: ObPxPool() : tenant_id_(common::OB_INVALID_ID), group_id_(0), - cgroup_ctrl_(nullptr), is_inited_(false), concurrency_(0), active_threads_(0) @@ -71,7 +70,6 @@ public: virtual void stop(); void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; } void set_group_id(uint64_t group_id) { group_id_ = group_id; } - void set_cgroup_ctrl(share::ObCgroupCtrl *cgroup_ctrl) { cgroup_ctrl_ = cgroup_ctrl; } int64_t get_pool_size() const { return get_thread_count(); } int submit(const RunFuncT &func); void set_px_thread_name(); @@ -90,7 +88,6 @@ private: private: uint64_t tenant_id_; uint64_t group_id_; - share::ObCgroupCtrl *cgroup_ctrl_; common::ObPriorityQueue2<0, 1> queue_; bool is_inited_; int64_t concurrency_; @@ -232,12 +229,12 @@ private: class ObResourceGroupNode : public common::SpHashNode { public: - ObResourceGroupNode(int32_t group_id): + ObResourceGroupNode(uint64_t group_id): common::SpHashNode(calc_hash(group_id)), group_id_(group_id) {} ~ObResourceGroupNode() {} - int64_t calc_hash(int32_t group_id) + int64_t calc_hash(uint64_t group_id) { return (common::murmurhash(&group_id, sizeof(group_id), 0)) | 1; } @@ -258,10 +255,10 @@ public: } return ret; } - int32_t get_group_id() const { return group_id_; } - void set_group_id(const int32_t &group_id) { group_id_ = group_id; } + uint64_t get_group_id() const { return group_id_; } + void set_group_id(const uint64_t &group_id) { group_id_ = group_id; } protected: - int32_t group_id_; + uint64_t group_id_; }; class ObResourceGroup : public ObResourceGroupNode // group container, storing thread pool and queue, each group_id corresponds
to one{ @@ -273,7 +270,7 @@ public: using WList = common::ObDList; static constexpr int64_t PRESERVE_INACTIVE_WORKER_TIME = 10 * 1000L * 1000L; - ObResourceGroup(int32_t group_id, ObTenant* tenant, share::ObCgroupCtrl *cgroup_ctrl); + ObResourceGroup(uint64_t group_id, ObTenant* tenant, share::ObCgroupCtrl *cgroup_ctrl); ~ObResourceGroup() {} bool is_inited() const { return inited_; } @@ -282,6 +279,8 @@ public: int64_t min_worker_cnt() const; int64_t max_worker_cnt() const; ObTenant *get_tenant() { return tenant_; } + WList &get_workers() { return workers_; } + lib::ObMutex &get_workers_lock() { return workers_lock_; } share::ObCgroupCtrl *get_cgroup_ctrl() { return cgroup_ctrl_; } int init(); @@ -329,7 +328,7 @@ public: { } ~GroupMap() {} - int create_and_insert_group(int32_t group_id, ObTenant *tenant, share::ObCgroupCtrl *cgroup_ctrl, ObResourceGroup *&group); + int create_and_insert_group(uint64_t group_id, ObTenant *tenant, share::ObCgroupCtrl *cgroup_ctrl, ObResourceGroup *&group); void wait_group(); void destroy_group(); int64_t to_string(char *buf, const int64_t buf_len) const @@ -369,7 +368,7 @@ class ObTenant : public share::ObTenantBase friend class observer::ObAllVirtualDumpTenantInfo; friend class ObResourceGroup; friend int ::select_dump_tenant_info(lua_State*); - friend int create_worker(ObThWorker* &worker, ObTenant *tenant, int32_t group_id, + friend int create_worker(ObThWorker* &worker, ObTenant *tenant, uint64_t group_id, int32_t level, bool force, ObResourceGroup *group); friend int destroy_worker(ObThWorker *worker); using WListNode = common::ObDLinkNode; @@ -508,6 +507,7 @@ public: { return 0; } + GroupMap& get_group_map() { return group_map_;} // OB_INLINE bool has_normal_request() const { return req_queue_.size() != 0; } // OB_INLINE bool has_level_request() const { return OB_NOT_NULL(multi_level_queue_) && multi_level_queue_->get_total_size() != 0; } private: diff --git a/src/observer/omt/ob_th_worker.cpp b/src/observer/omt/ob_th_worker.cpp index b2ddaa4686..f8c5c4e56f 100644 --- a/src/observer/omt/ob_th_worker.cpp +++ b/src/observer/omt/ob_th_worker.cpp @@ -44,7 +44,7 @@ namespace oceanbase namespace omt { -int create_worker(ObThWorker* &worker, ObTenant *tenant, int32_t group_id, +int create_worker(ObThWorker* &worker, ObTenant *tenant, uint64_t group_id, int32_t level, bool force, ObResourceGroup *group) { int ret = OB_SUCCESS; @@ -65,7 +65,7 @@ int create_worker(ObThWorker* &worker, ObTenant *tenant, int32_t group_id, } else { worker->reset(); worker->set_tenant(tenant); - worker->set_group_id(group_id); + worker->set_group_id_(group_id); worker->set_worker_level(level); worker->set_group(group); if (OB_FAIL(worker->start())) { @@ -101,12 +101,12 @@ int destroy_worker(ObThWorker *worker) ObThWorker::ObThWorker() : procor_(ObServer::get_instance().get_net_frame().get_xlator(), ObServer::get_instance().get_self()), is_inited_(false), tenant_(nullptr), - group_(nullptr), run_cond_(), + run_cond_(), pause_flag_(false), large_query_(false), priority_limit_(RQ_LOW), is_lq_yield_(false), query_start_time_(0), last_check_time_(0), can_retry_(true), need_retry_(false), - has_add_to_cgroup_(false), last_wakeup_ts_(0), blocking_ts_(nullptr), + last_wakeup_ts_(0), blocking_ts_(nullptr), idle_us_(0) { } @@ -125,6 +125,7 @@ int ObThWorker::init() } else if (OB_FAIL(run_cond_.init(ObWaitEventIds::TH_WORKER_COND_WAIT))) { LOG_ERROR("init run cond fail, ", K(ret)); } else { + set_is_th_worker(true); is_inited_ = true; } @@ -305,7 +306,7 @@ void 
ObThWorker::set_th_worker_thread_name() char buf[32]; if (serving_tenant_id != tenant_->id()) { serving_tenant_id = tenant_->id(); - snprintf(buf, 32, "L%d_G%d", get_worker_level(), get_group_id()); + snprintf(buf, sizeof(buf), "L%d_G%lu", get_worker_level(), get_group_id()); lib::set_thread_name(buf); } } @@ -332,11 +333,6 @@ void ObThWorker::worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t if (OB_NOT_NULL(tenant_)) { tenant_id = tenant_->id(); } - if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && OB_LIKELY(GCTX.cgroup_ctrl_->is_valid()) && !has_add_to_cgroup_) { - if (OB_SUCC(GCTX.cgroup_ctrl_->add_self_to_cgroup(tenant_->id(), get_group_id()))) { - has_add_to_cgroup_ = true; - } - } if (OB_NOT_NULL(pm)) { if (pm->get_used() != 0) { LOG_ERROR("page manager's used should be 0, unexpected!!!", KP(pm)); @@ -411,10 +407,11 @@ void ObThWorker::worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t if (this->get_worker_level() != 0) { // nesting workers are not allowed to call check_worker_count } else if (this->get_group() == nullptr) { - tenant_->check_worker_count(*this); tenant_->lq_end(*this); + tenant_->check_worker_count(*this); } else { - group_->check_worker_count(*this); + ObResourceGroup *group = static_cast<ObResourceGroup *>(group_); + group->check_worker_count(*this); } } } @@ -431,6 +428,7 @@ void ObThWorker::run(int64_t idx) int64_t tenant_id = -1; int64_t req_recv_timestamp = -1; int32_t worker_level = -1; + SET_GROUP_ID(get_group_id()); this->worker(tenant_id, req_recv_timestamp, worker_level); } diff --git a/src/observer/omt/ob_th_worker.h b/src/observer/omt/ob_th_worker.h index 4dcf10f55b..a7a066d608 100644 --- a/src/observer/omt/ob_th_worker.h +++ b/src/observer/omt/ob_th_worker.h @@ -74,8 +74,6 @@ public: set_run_wrapper(MTL_CTX()); } - OB_INLINE void set_group(ObResourceGroup *group) { group_ = group; } - void worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t &worker_level); void run(int64_t idx) override; @@ -87,8 +85,6 @@ public: OB_INLINE bool large_query() const { return large_query_; } OB_INLINE void set_large_query(bool v=true) { large_query_ = v; } - - OB_INLINE bool is_group_worker() const { return OB_NOT_NULL(group_); } OB_INLINE bool is_level_worker() const { return get_worker_level() > 0; } OB_INLINE void set_priority_limit(uint8_t limit) { priority_limit_ = limit; } OB_INLINE bool is_high_priority() const { return priority_limit_ == QQ_HIGH; } @@ -100,7 +96,6 @@ public: OB_INLINE int64_t get_query_start_time() const { return query_start_time_; } OB_INLINE int64_t get_query_enqueue_time() const { return query_enqueue_time_; } OB_INLINE ObTenant* get_tenant() { return tenant_; } - OB_INLINE ObResourceGroup* get_group() { return group_; } OB_INLINE bool is_lq_yield() const { return is_lq_yield_; } OB_INLINE void set_lq_yield(bool v=true) { is_lq_yield_ = v; } OB_INLINE int64_t get_last_wakeup_ts() { return last_wakeup_ts_; } @@ -118,7 +113,6 @@ private: bool is_inited_; ObTenant *tenant_; - ObResourceGroup *group_; common::ObThreadCond run_cond_; bool pause_flag_; @@ -135,8 +129,6 @@ // if upper scheduler support retry, need this request retry?
bool need_retry_; - bool has_add_to_cgroup_; - int64_t last_wakeup_ts_; int64_t* blocking_ts_; int64_t idle_us_; @@ -156,7 +148,6 @@ inline void ObThWorker::reset() query_enqueue_time_ = 0; can_retry_ = true; need_retry_ = false; - has_add_to_cgroup_ = false; last_wakeup_ts_ = 0; } @@ -168,7 +159,7 @@ group_id: set worker's group_id level: set worker's level, in ObResourceGroup level = INT32_MAX, in ObTenant level = 0, group: set worker's group, in ObResourceGroup level = this, in ObTenant level = nullptr, */ -int create_worker(ObThWorker* &worker, ObTenant *tenant, int32_t group_id, +int create_worker(ObThWorker* &worker, ObTenant *tenant, uint64_t group_id, int32_t level = INT32_MAX, bool force = false, ObResourceGroup *group = nullptr); // default level=INT32_MAX, group=nullptr int destroy_worker(ObThWorker *worker); diff --git a/src/share/rc/ob_tenant_base.cpp b/src/share/rc/ob_tenant_base.cpp index f0a52858e7..e4afaf8855 100644 --- a/src/share/rc/ob_tenant_base.cpp +++ b/src/share/rc/ob_tenant_base.cpp @@ -298,10 +298,7 @@ int ObTenantBase::pre_run() { int ret = OB_SUCCESS; ObTenantEnv::set_tenant(this); - ObCgroupCtrl *cgroup_ctrl = get_cgroup(); - if (cgroup_ctrl != nullptr && cgroup_ctrl->is_valid()) { - ret = cgroup_ctrl->add_self_to_cgroup(id_); - } + ret = lib::SET_GROUP_ID(OBCG_DEFAULT); { ThreadListNode *node = lib::Thread::current().get_thread_list_node(); lib::ObMutexGuard guard(thread_list_lock_); @@ -320,9 +317,6 @@ int ObTenantBase::end_run() int ret = OB_SUCCESS; ObTenantEnv::set_tenant(nullptr); ObCgroupCtrl *cgroup_ctrl = get_cgroup(); - if (cgroup_ctrl != nullptr && cgroup_ctrl->is_valid()) { - ret = cgroup_ctrl->remove_self_from_cgroup(id_); - } { ThreadListNode *node = lib::Thread::current().get_thread_list_node(); lib::ObMutexGuard guard(thread_list_lock_); diff --git a/src/share/resource_manager/ob_cgroup_ctrl.cpp b/src/share/resource_manager/ob_cgroup_ctrl.cpp index 3b9bbf1847..9582d23556 100644 --- a/src/share/resource_manager/ob_cgroup_ctrl.cpp +++ b/src/share/resource_manager/ob_cgroup_ctrl.cpp @@ -22,20 +22,49 @@ #include "share/resource_manager/ob_resource_plan_info.h" #include "share/resource_manager/ob_resource_manager.h" #include "share/resource_manager/ob_cgroup_ctrl.h" +#include "observer/omt/ob_tenant.h" +#include "observer/omt/ob_multi_tenant.h" #include #include using namespace oceanbase::common; using namespace oceanbase::share; +using namespace oceanbase::omt; namespace oceanbase { + +namespace lib +{ + +int SET_GROUP_ID(uint64_t group_id) +{ + int ret = OB_SUCCESS; + + // to do switch group + + THIS_WORKER.set_group_id_(group_id); + int tmp_ret = OB_SUCCESS; + if (OB_NOT_NULL(GCTX.cgroup_ctrl_) + && OB_TMP_FAIL(GCTX.cgroup_ctrl_->add_self_to_cgroup_(MTL_ID(), group_id))) { + LOG_WARN("add self to cgroup fail", K(ret), K(MTL_ID()), K(group_id)); + } + return ret; +} + +int CONVERT_FUNCTION_TYPE_TO_GROUP_ID(const uint8_t function_type, uint64_t &group_id) +{ + return G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_function_type(MTL_ID(), function_type, group_id); +} + +} // namespace lib + namespace share { -ObCgSet ObCgSet::instance_; -} -} +ObCgSet ObCgSet::instance_; +} // namespace share +} // namespace oceanbase // cgroup config name static const char *CPU_SHARES_FILE = "cpu.shares"; @@ -45,6 +74,7 @@ static const char *CPU_CFS_QUOTA_FILE = "cpu.cfs_quota_us"; static const char *CPU_CFS_PERIOD_FILE = "cpu.cfs_period_us"; static const char *CPUACCT_USAGE_FILE = "cpuacct.usage"; static const char *CPU_STAT_FILE = "cpu.stat"; +static const char
*CGROUP_PROCS_FILE = "cgroup.procs"; // integrated IO parameters int OBGroupIOInfo::init(int64_t min_percent, int64_t max_percent, int64_t weight_percent) @@ -100,20 +130,15 @@ int ObCgroupCtrl::init() } else { LOG_ERROR("init cgroup dir failed", K(ret), K(root_cgroup_)); } - } else if (OB_FAIL(init_dir_(other_cgroup_))) { - LOG_WARN("init other cgroup dir failed", K(ret), K_(other_cgroup)); } else { - char procs_path[PATH_BUFSIZE]; char pid_value[VALUE_BUFSIZE]; - snprintf(procs_path, PATH_BUFSIZE, "%s/cgroup.procs", other_cgroup_); snprintf(pid_value, VALUE_BUFSIZE, "%d", getpid()); - if(OB_FAIL(write_string_to_file_(procs_path, pid_value))) { - LOG_ERROR("add tid to cgroup failed", K(ret), K(procs_path), K(pid_value)); + if (OB_FAIL(set_cgroup_config_(other_cgroup_, CGROUP_PROCS_FILE, pid_value))) { + LOG_ERROR("add tid to cgroup failed", K(ret), K(other_cgroup_), K(pid_value)); } else { valid_ = true; } } - return ret; } @@ -363,19 +388,37 @@ int ObCgroupCtrl::get_group_path( return ret; } -int ObCgroupCtrl::add_self_to_cgroup(const uint64_t tenant_id, uint64_t group_id, const char *base_path) +int ObCgroupCtrl::add_self_to_cgroup_(const uint64_t tenant_id, const uint64_t group_id) { int ret = OB_SUCCESS; - char group_path[PATH_BUFSIZE]; - char tid_value[VALUE_BUFSIZE + 1]; + if (is_valid()) { + ResourceGroupType group_type = ResourceGroupType::INVALID_GROUP; + if (is_user_group(group_id) && + OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_type_by_id(tenant_id, group_id, group_type))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("get group type by id failed", K(ret), K(tenant_id), K(group_id), K(group_type)); + } + } - snprintf(tid_value, VALUE_BUFSIZE, "%ld", gettid()); - if (OB_FAIL(get_group_path(group_path, PATH_BUFSIZE, tenant_id, group_id, base_path))) { - LOG_WARN("fail get group path", K(tenant_id), K(ret)); - } else if (OB_FAIL(set_cgroup_config_(group_path, TASKS_FILE, tid_value))) { - LOG_WARN("add tid to cgroup failed", K(ret), K(group_path), K(tid_value), K(tenant_id)); - } else { - LOG_INFO("add tid to cgroup success", K(group_path), K(tid_value), K(tenant_id), K(group_id)); + const char *base_path = + (GCONF.enable_global_background_resource_isolation && ResourceGroupType::FUNCTION_GROUP == group_type) + ?
BACKGROUND_CGROUP + : ""; + char group_path[PATH_BUFSIZE]; + char tid_value[VALUE_BUFSIZE + 1]; + + snprintf(tid_value, VALUE_BUFSIZE, "%ld", gettid()); + if (OB_FAIL(ret)) { + // do nothing + } else if (OB_FAIL(get_group_path(group_path, PATH_BUFSIZE, tenant_id, group_id, base_path))) { + LOG_WARN("fail get group path", K(tenant_id), K(ret)); + } else if (OB_FAIL(set_cgroup_config_(group_path, TASKS_FILE, tid_value))) { + LOG_WARN("add tid to cgroup failed", K(ret), K(group_path), K(tid_value), K(tenant_id)); + } else { + LOG_INFO("add tid to cgroup success", K(group_path), K(tid_value), K(tenant_id), K(group_id)); + } } return ret; } @@ -394,20 +437,6 @@ int ObCgroupCtrl::get_group_info_by_group_id(const uint64_t tenant_id, return ret; } -int ObCgroupCtrl::remove_self_from_cgroup(const uint64_t tenant_id) -{ - int ret = OB_SUCCESS; - char tid_value[VALUE_BUFSIZE]; - // adding this tid to the tasks file under the other_cgroup directory removes it from all other tasks files - snprintf(tid_value, VALUE_BUFSIZE, "%ld", gettid()); - if (OB_FAIL(set_cgroup_config_(other_cgroup_, TASKS_FILE, tid_value))) { - LOG_WARN("remove tid to cgroup failed", K(ret), K(other_cgroup_), K(tid_value), K(tenant_id)); - } else { - LOG_INFO("remove tid to cgroup success", K(other_cgroup_), K(tid_value), K(tenant_id)); - } - return ret; -} - int ObCgroupCtrl::get_cgroup_config_(const char *group_path, const char *config_name, char *config_value) { int ret = OB_SUCCESS; diff --git a/src/share/resource_manager/ob_cgroup_ctrl.h b/src/share/resource_manager/ob_cgroup_ctrl.h index 1be9b9244c..4c86ee910b 100644 --- a/src/share/resource_manager/ob_cgroup_ctrl.h +++ b/src/share/resource_manager/ob_cgroup_ctrl.h @@ -152,10 +152,7 @@ public: int remove_both_cgroup(const uint64_t tenant_id, const uint64_t group_id = OB_INVALID_GROUP_ID, const char *base_path = ""); static int remove_dir_(const char *curr_dir); - int add_self_to_cgroup(const uint64_t tenant_id, uint64_t group_id = OBCG_DEFAULT, const char *base_path = ""); - - // remove the given tid from the given tenant's cgroup - int remove_self_from_cgroup(const uint64_t tenant_id); + int add_self_to_cgroup_(const uint64_t tenant_id, const uint64_t group_id = OBCG_DEFAULT); static int get_cgroup_config_(const char *group_path, const char *config_name, char *config_value); static int set_cgroup_config_(const char *group_path, const char *config_name, char *config_value); diff --git a/src/share/resource_manager/ob_resource_mapping_rule_manager.cpp b/src/share/resource_manager/ob_resource_mapping_rule_manager.cpp index d81f7e9ee6..a7d376756f 100644 --- a/src/share/resource_manager/ob_resource_mapping_rule_manager.cpp +++ b/src/share/resource_manager/ob_resource_mapping_rule_manager.cpp @@ -27,7 +27,7 @@ int ObResourceMappingRuleManager::init() int rule_bucket_size = 4096; int group_bucket_size = 512; if (user_rule_map_.created() || group_id_name_map_.created() || - function_rule_map_.created() || group_name_id_map_.created()) { + function_rule_map_.created() || group_name_id_map_.created() || group_id_type_map_.created()) { ret = OB_INIT_TWICE; LOG_WARN("mapping rule manager should not init multiple times", K(ret)); } else if (OB_FAIL(user_rule_map_.create(rule_bucket_size, "UsrRuleMap", "UsrRuleMapNode"))) { @@ -40,7 +40,7 @@ int ObResourceMappingRuleManager::init() LOG_WARN("fail create function rule map", K(ret)); } else if (OB_FAIL(group_name_id_map_.create(group_bucket_size, "GrpNameIdMap", "GrpNameIdNode"))) { LOG_WARN("fail create name id map", K(ret)); - } + } else if (OB_FAIL(group_id_type_map_.create(group_bucket_size, "GrpIdTypeMap", "GrpIdTypeNode"))) { + LOG_WARN("fail create group id type map", K(ret)); + }
LOG_INFO("resource mapping rule manager init ok"); return ret; } @@ -155,6 +155,10 @@ int ObResourceMappingRuleManager::refresh_resource_user_mapping_rule( 1 /* overwrite on dup key */))) { LOG_WARN("fail set user mapping rule to rule_map", K(rule), K(ret)); } + if (OB_SUCC(ret) && OB_FAIL(group_id_type_map_.set_refactored( + share::ObTenantGroupIdKey(rule.tenant_id_, group_id), ResourceGroupType::USER_GROUP))) { + LOG_WARN("group_id_type_map_ set_refactored failed", K(ret), K(group_id)); + } } LOG_INFO("refresh resource user mapping rule", K(tenant_id), K(plan), K(user_rules)); } @@ -192,6 +196,10 @@ int ObResourceMappingRuleManager::refresh_resource_function_mapping_rule( 1 /* overwrite on dup key */))) { LOG_WARN("fail set user mapping rule to rule_map", K(rule), K(ret)); } + if (OB_SUCC(ret) && OB_FAIL(group_id_type_map_.set_refactored( + share::ObTenantGroupIdKey(rule.tenant_id_, group_id), ResourceGroupType::FUNCTION_GROUP))) { + LOG_WARN("group_id_type_map_ set_refactored failed", K(ret), K(group_id)); + } } LOG_INFO("refresh_resource_function_mapping_rule", K(tenant_id), K(plan), K(rules)); } diff --git a/src/share/resource_manager/ob_resource_mapping_rule_manager.h b/src/share/resource_manager/ob_resource_mapping_rule_manager.h index 749a001b42..f547943fb5 100644 --- a/src/share/resource_manager/ob_resource_mapping_rule_manager.h +++ b/src/share/resource_manager/ob_resource_mapping_rule_manager.h @@ -29,6 +29,15 @@ class ObString; } namespace share { + +enum class ResourceGroupType { + INVALID_GROUP, + USER_GROUP, + FUNCTION_GROUP, + SQL_GROUP, + END_GROUP +}; + class ObResourceManagerProxy; class ObResourceMappingRuleManager { @@ -126,7 +135,11 @@ public: int ret = function_rule_map_.set_refactored(share::ObTenantFunctionKey(tenant_id, func), 0, 1/*overwrite*/); return ret; } - + inline int get_group_type_by_id(const uint64_t tenant_id, uint64_t group_id, ResourceGroupType &group_type) + { + int ret = group_id_type_map_.get_refactored(share::ObTenantGroupIdKey(tenant_id, group_id), group_type); + return ret; + } private: int refresh_resource_function_mapping_rule( ObResourceManagerProxy &proxy, @@ -146,6 +159,7 @@ private: common::hash::ObHashMap group_id_name_map_; // 将 group_name 映射到 group_id, 用于快速根据group_name找到id(主要是用于io控制) common::hash::ObHashMap group_name_id_map_; + common::hash::ObHashMap group_id_type_map_; DISALLOW_COPY_AND_ASSIGN(ObResourceMappingRuleManager); }; } diff --git a/src/share/resource_manager/ob_resource_plan_info.h b/src/share/resource_manager/ob_resource_plan_info.h index 083b7c30a6..70bb3c0bc5 100644 --- a/src/share/resource_manager/ob_resource_plan_info.h +++ b/src/share/resource_manager/ob_resource_plan_info.h @@ -157,6 +157,54 @@ public: ObResMgrVarcharValue func_name_; }; +// ObTenantGroupIdKey +class ObTenantGroupIdKey { +public: + ObTenantGroupIdKey() : tenant_id_(OB_INVALID_TENANT_ID), group_id_(OB_INVALID_ID) + {} + ObTenantGroupIdKey(const uint64_t tenant_id, const uint64_t group_id) : + tenant_id_(tenant_id), group_id_(group_id) + {} + int assign(const ObTenantGroupIdKey &other) + { + tenant_id_ = other.tenant_id_; + group_id_ = other.group_id_; + return common::OB_SUCCESS; + } + uint64_t hash() const + { + uint64_t hash_val = 0; + hash_val = common::murmurhash(&tenant_id_, sizeof(tenant_id_), hash_val); + hash_val = common::murmurhash(&group_id_, sizeof(group_id_), hash_val); + return hash_val; + } + int hash(uint64_t &hash_val) const + { + hash_val = hash(); + return common::OB_SUCCESS; + } + int compare(const ObTenantGroupIdKey& r) const + { 
+ int cmp = 0; + if (tenant_id_ < r.tenant_id_) { + cmp = -1; + } else if (tenant_id_ == r.tenant_id_) { + cmp = group_id_ < r.group_id_ ? -1 : (group_id_ > r.group_id_ ? 1 : 0); + } else { + cmp = 1; + } + return cmp; + } + bool operator== (const ObTenantGroupIdKey &other) const { return 0 == compare(other); } + bool operator!=(const ObTenantGroupIdKey &other) const { return !operator==(other); } + bool operator<(const ObTenantGroupIdKey &other) const { return -1 == compare(other); } + TO_STRING_KV(K_(tenant_id), K_(group_id)); + +public: + uint64_t tenant_id_; + uint64_t group_id_; +}; + class ObPlanDirective { public: diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.cpp b/src/share/scheduler/ob_tenant_dag_scheduler.cpp index 82d909b9a8..225a2f6df8 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.cpp +++ b/src/share/scheduler/ob_tenant_dag_scheduler.cpp @@ -1579,7 +1579,6 @@ ObTenantDagWorker::ObTenantDagWorker() check_period_(0), last_check_time_(0), function_type_(0), - group_id_(OB_INVALID_GROUP_ID), tg_id_(-1), hold_by_compaction_dag_(false), is_inited_(false) @@ -1650,7 +1649,6 @@ void ObTenantDagWorker::reset() check_period_ = 0; last_check_time_ = 0; function_type_ = 0; - group_id_ = OB_INVALID_GROUP_ID; self_ = NULL; is_inited_ = false; TG_DESTROY(tg_id_); @@ -1668,36 +1666,6 @@ void ObTenantDagWorker::resume() notify(DWS_RUNNABLE); } -int ObTenantDagWorker::set_dag_resource(const uint64_t group_id) -{ - int ret = OB_SUCCESS; - uint64_t consumer_group_id = USER_RESOURCE_OTHER_GROUP_ID; - if (is_user_group(group_id)) { - //user level - consumer_group_id = group_id; - } else if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_function_type(MTL_ID(), function_type_, consumer_group_id))) { - //function level - LOG_WARN("fail to get group id by function", K(ret), K(MTL_ID()), K(function_type_), K(consumer_group_id)); - } - - if (OB_SUCC(ret) && consumer_group_id != group_id_) { - // for CPU isolation, depend on cgroup - if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && - OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_cgroup( - MTL_ID(), - consumer_group_id, - GCONF.enable_global_background_resource_isolation ? 
BACKGROUND_CGROUP - : ""))) { - LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(MTL_ID()), K(group_id)); - } else { - // for IOPS isolation, only depend on consumer_group_id - ATOMIC_SET(&group_id_, consumer_group_id); - THIS_WORKER.set_group_id(static_cast(consumer_group_id)); - } - } - return ret; -} - bool ObTenantDagWorker::need_wake_up() const { return (ObTimeUtility::fast_current_time() - last_check_time_) > check_period_ * 10; @@ -1758,6 +1726,7 @@ void ObTenantDagWorker::run1() } if (OB_SUCC(ret)) { + CONSUMER_GROUP_ID_GUARD(dag->get_consumer_group_id()); ObDagId dag_id = dag->get_dag_id(); if (task_->get_sub_task_id() > 0) { dag_id.set_sub_id(task_->get_sub_task_id()); @@ -1778,9 +1747,8 @@ void ObTenantDagWorker::run1() } else { THIS_WORKER.set_log_reduction_mode(LogReductionMode::NONE); } - if (OB_FAIL(set_dag_resource(dag->get_consumer_group_id()))) { - LOG_WARN("isolate dag CPU and IOPS failed", K(ret)); - } else if (OB_FAIL(task_->do_work())) { + CONSUMER_GROUP_FUNC_GUARD(function_type_); + if (OB_FAIL(task_->do_work())) { if (!dag->ignore_warning()) { COMMON_LOG(WARN, "failed to do work", K(ret), K(*task_), K(compat_mode)); } diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index 47eab75984..4b0bf49842 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -728,8 +728,7 @@ public: void run1() override; int yield(); void set_task(ObITask *task); - void set_function_type(const int64_t function_type) { function_type_ = function_type; } - int set_dag_resource(const uint64_t group_id); + void set_function_type(const uint8_t function_type) { function_type_ = function_type; } bool need_wake_up() const; ObITask *get_task() const { return task_; } DagWorkerStatus get_status() { return status_; } @@ -738,7 +737,6 @@ public: static bool is_reserve_mode() { return is_reserve_mode_; } static compaction::ObCompactionMemoryContext* get_mem_ctx() { return mem_ctx_; } static void set_mem_ctx(compaction::ObCompactionMemoryContext *mem_ctx) { if (nullptr == mem_ctx_) { mem_ctx_ = mem_ctx; } } - uint64_t get_group_id() { return group_id_; } bool get_force_cancel_flag(); bool hold_by_compaction_dag() const { return hold_by_compaction_dag_; } private: @@ -755,8 +753,7 @@ private: DagWorkerStatus status_; int64_t check_period_; int64_t last_check_time_; - int64_t function_type_; - uint64_t group_id_; + uint8_t function_type_; int tg_id_; bool hold_by_compaction_dag_; bool is_inited_; diff --git a/src/sql/engine/px/ob_px_worker.cpp b/src/sql/engine/px/ob_px_worker.cpp index 35a74e57ec..00e6050ea2 100644 --- a/src/sql/engine/px/ob_px_worker.cpp +++ b/src/sql/engine/px/ob_px_worker.cpp @@ -188,7 +188,7 @@ void PxWorkerFunctor::operator ()(bool need_exec) ObThreadLogLevelUtils::init(env_arg_.get_log_level()); } } - THIS_WORKER.set_group_id(env_arg_.get_group_id()); + SET_GROUP_ID(env_arg_.get_group_id()); // When deserialize expr, sql mode will affect basic function of expr. CompatModeGuard mode_guard(env_arg_.is_oracle_mode() ? 
Worker::CompatMode::ORACLE : Worker::CompatMode::MYSQL); MTL_SWITCH(sqc_handler->get_tenant_id()) { diff --git a/src/storage/blocksstable/ob_block_manager.cpp b/src/storage/blocksstable/ob_block_manager.cpp index fc46108be3..1b71fb77c7 100644 --- a/src/storage/blocksstable/ob_block_manager.cpp +++ b/src/storage/blocksstable/ob_block_manager.cpp @@ -1112,9 +1112,9 @@ int ObBlockManager::mark_macro_blocks( const uint64_t tenant_id = mtl_tenant_ids.at(i); MacroBlockId macro_id; MTL_SWITCH(tenant_id) { - if (OB_FAIL(set_group_id(tenant_id))) { - LOG_WARN("isolate CPU and IOPS failed", K(ret)); - } else if (OB_FAIL(mark_tenant_blocks(mark_info, macro_id_set, tmp_status))) { + CONSUMER_GROUP_FUNC_GUARD(ObFunctionType::PRIO_OTHER_BACKGROUND); + if (OB_FAIL(mark_tenant_blocks(mark_info, macro_id_set, + tmp_status))) { LOG_WARN("fail to mark tenant blocks", K(ret), K(tenant_id)); } else if (OB_FALSE_IT(MTL(ObSharedMacroBlockMgr*)->get_cur_shared_block(macro_id))) { } else if (OB_FAIL(mark_held_shared_block(macro_id, mark_info, macro_id_set, tmp_status))) { @@ -1598,34 +1598,6 @@ int ObBlockManager::update_mark_info(const MacroBlockId ¯o_id, return ret; } -int ObBlockManager::set_group_id(const uint64_t tenant_id) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(tenant_id <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant id", K(ret), K(tenant_id)); - } else { - uint64_t consumer_group_id = 0; - if (OB_FAIL(G_RES_MGR.get_mapping_rule_mgr().get_group_id_by_function_type(tenant_id, ObFunctionType::PRIO_OTHER_BACKGROUND, consumer_group_id))) { - //function level - LOG_WARN("fail to get group id by function", K(ret), K(tenant_id), K(consumer_group_id)); - } else if (consumer_group_id != group_id_) { - // for CPU isolation, depend on cgroup - if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid() && - OB_FAIL(GCTX.cgroup_ctrl_->add_self_to_cgroup(tenant_id, - consumer_group_id, - GCONF.enable_global_background_resource_isolation ? BACKGROUND_CGROUP : ""))) { - LOG_WARN("bind back thread to group failed", K(ret), K(GETTID()), K(tenant_id), K(consumer_group_id)); - } - } - if (OB_SUCC(ret)) { - group_id_ = consumer_group_id; - THIS_WORKER.set_group_id(static_cast(consumer_group_id)); - } - } - return ret; -} - int ObBlockManager::BlockMapIterator::get_next_block(common::ObIOFd &block_id) { int ret = OB_SUCCESS; diff --git a/src/storage/blocksstable/ob_block_manager.h b/src/storage/blocksstable/ob_block_manager.h index 6fa81efb46..5681756392 100644 --- a/src/storage/blocksstable/ob_block_manager.h +++ b/src/storage/blocksstable/ob_block_manager.h @@ -405,7 +405,6 @@ private: MacroBlkIdMap &mark_info, common::hash::ObHashSet ¯o_id_set, ObMacroBlockMarkerStatus &tmp_status); - int set_group_id(const uint64_t tenant_id); int do_sweep(MacroBlkIdMap &mark_info); int sweep_one_block(const MacroBlockId& macro_id); diff --git a/unittest/observer/omt/test_cgroup_ctrl.cpp b/unittest/observer/omt/test_cgroup_ctrl.cpp index 29ab914d35..1121d1b0e3 100644 --- a/unittest/observer/omt/test_cgroup_ctrl.cpp +++ b/unittest/observer/omt/test_cgroup_ctrl.cpp @@ -24,9 +24,9 @@ using namespace oceanbase::observer; void *thread_func(void *args) { - ASSERT_EQ(OB_SUCCESS, cg_ctrl.add_self_to_cgroup(1001)); + ASSERT_EQ(OB_SUCCESS, cg_ctrl.add_self_to_cgroup_(1001)); sleep(3); - ASSERT_EQ(OB_SUCCESS, cg_ctrl.remove_self_from_cgroup(1001)); + ASSERT_EQ(OB_SUCCESS, cg_ctrl.add_self_to_cgroup_(1001)); return nullptr; }
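For reference, the following is a minimal, standalone sketch of the scoped group-switch idiom this patch introduces (lib::SET_GROUP_ID plus the CONSUMER_GROUP_ID_GUARD / CONSUMER_GROUP_FUNC_GUARD RAII wrappers). Everything prefixed demo_/Demo is hypothetical and exists only for illustration; the real guards route through THIS_WORKER and ObCgroupCtrl::add_self_to_cgroup_(), and, unlike this sketch, the patch's guard destructors currently leave the restore step commented out.

#include <cstdint>
#include <cstdio>

namespace demo {

// Stands in for THIS_WORKER.get_group_id()/set_group_id_(): one group id per thread.
thread_local uint64_t tls_group_id = 0;

// Stands in for lib::SET_GROUP_ID(); the real one also re-binds the calling
// thread into the matching cgroup directory for CPU isolation.
inline int demo_set_group_id(uint64_t group_id)
{
  tls_group_id = group_id;
  return 0;  // OB_SUCCESS
}

// Same shape as ConsumerGroupIdGuard: switch only when the target group
// differs from the thread's current group, and remember whether we switched.
class DemoGroupIdGuard
{
public:
  explicit DemoGroupIdGuard(uint64_t group_id)
      : saved_group_id_(tls_group_id),
        group_changed_(group_id != tls_group_id),
        ret_(0)
  {
    if (group_changed_) {
      ret_ = demo_set_group_id(group_id);
    }
  }
  ~DemoGroupIdGuard()
  {
    if (group_changed_) {
      demo_set_group_id(saved_group_id_);  // restore the previous group on scope exit
    }
  }
  int get_ret() const { return ret_; }
private:
  uint64_t saved_group_id_;
  bool group_changed_;
  int ret_;
};

}  // namespace demo

int main()
{
  demo::demo_set_group_id(1);          // worker thread starts in group 1
  {
    demo::DemoGroupIdGuard guard(42);  // scoped switch, like CONSUMER_GROUP_ID_GUARD(42)
    std::printf("inside guard: group %lu\n", static_cast<unsigned long>(demo::tls_group_id));
  }
  std::printf("after guard: group %lu\n", static_cast<unsigned long>(demo::tls_group_id));
  return 0;
}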