diff --git a/deps/oblib/src/lib/thread/thread_define.h b/deps/oblib/src/lib/thread/thread_define.h index 8befe27344..a528ff5574 100644 --- a/deps/oblib/src/lib/thread/thread_define.h +++ b/deps/oblib/src/lib/thread/thread_define.h @@ -11,29 +11,19 @@ */ #ifdef TG_DEF -// (tg_def_id, name, desc, scope, type, args...) +// (tg_def_id, name, type, args...) // for test -TG_DEF(TEST1, test1, "", TG_STATIC, TIMER) -TG_DEF(TEST2, test2, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(1 ,1), 10) -TG_DEF(TEST3, test3, "", TG_STATIC, DEDUP_QUEUE, ThreadCountPair(1, 1), 8, 8, 16L << 20, 16L << 20,8192, "test") -TG_DEF(TEST4, test4, "", TG_STATIC, THREAD_POOL, ThreadCountPair(1 ,1)) -TG_DEF(TEST5, test5, "", TG_STATIC, ASYNC_TASK_QUEUE, ThreadCountPair(1 ,1), 16) -TG_DEF(TEST6, test6, "", TG_STATIC, TIMER_GROUP, ThreadCountPair(2 ,2)) -TG_DEF(TEST7, test7, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(10 ,10), 10) -TG_DEF(TEST8, test8, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(1 ,1)) +TG_DEF(TEST1, test1, TIMER) +TG_DEF(TEST2, test2, QUEUE_THREAD, 1, 10) +TG_DEF(TEST3, test3, DEDUP_QUEUE, 1, 8, 8, 16L << 20, 16L << 20,8192, "test") +TG_DEF(TEST4, test4, THREAD_POOL, 1) +TG_DEF(TEST5, test5, ASYNC_TASK_QUEUE, 1, 16) +TG_DEF(TEST6, test6, MAP_QUEUE_THREAD, 2) +TG_DEF(TEST7, test7, QUEUE_THREAD, 10, 10) +TG_DEF(TEST8, test8, REENTRANT_THREAD_POOL, 1) // other -TG_DEF(MEMORY_DUMP, memDump, "", TG_STATIC, THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(SchemaRefTask, SchemaRefTask, "", TG_STATIC, DEDUP_QUEUE, ThreadCountPair(1, 1), 1024, 1024, 1L << 30, 512L << 20, common::OB_MALLOC_BIG_BLOCK_SIZE, "SchemaDedupQueu") -TG_DEF(CONFIG_MGR, ConfigMgr, "", TG_STATIC, TIMER, 1024) -TG_DEF(ReqMemEvict, ReqMemEvict, "", TG_DYNAMIC, TIMER) -TG_DEF(IO_TUNING, IO_TUNING, "", TG_STATIC, THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(IO_SCHEDULE, IO_SCHEDULE, "", TG_DYNAMIC, THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(IO_CALLBACK, IO_CALLBACK, "", TG_DYNAMIC, THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(IO_CHANNEL, IO_CHANNEL, "", TG_DYNAMIC, THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(IO_HEALTH, IO_HEALTH, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(1 ,1), 100) -TG_DEF(IO_BENCHMARK, IO_BENCHMARK, "", TG_DYNAMIC, THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(replica_control, replica_control, "", TG_STATIC, THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(TIMEZONE_MGR, TimezoneMgr, "", TG_STATIC, TIMER) -TG_DEF(MASTER_KEY_MGR, MasterKeyMgr, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(1 ,1), 100) -TG_DEF(SRS_MGR, SrsMgr, "", TG_STATIC, TIMER, 128) +TG_DEF(MEMORY_DUMP, memDump, THREAD_POOL, 1) +TG_DEF(SchemaRefTask, SchemaRefTask, DEDUP_QUEUE, 1, 1024, 1024, 1L << 30, 512L << 20, common::OB_MALLOC_BIG_BLOCK_SIZE, "SchemaDedupQueu") +TG_DEF(ReqMemEvict, ReqMemEvict, TIMER) +TG_DEF(replica_control, replica_control, THREAD_POOL, 1) #endif diff --git a/deps/oblib/src/lib/thread/thread_mgr.cpp b/deps/oblib/src/lib/thread/thread_mgr.cpp index c32f6ce041..0f5d0e5710 100644 --- a/deps/oblib/src/lib/thread/thread_mgr.cpp +++ b/deps/oblib/src/lib/thread/thread_mgr.cpp @@ -30,7 +30,7 @@ void set_tenant_tg_helper(TGHelper *tg_helper) } // define TGConfig -#define TG_DEF(id, name, desc, scope, type, arg...) const ThreadCountPair TGConfig::id = ThreadCountPair(arg); +#define TG_DEF(id, name, type, arg...) const ThreadCountPair TGConfig::id = ThreadCountPair(arg); #include "lib/thread/thread_define.h" #undef TG_DEF @@ -44,10 +44,10 @@ void __attribute__((weak)) init_create_func() void lib_init_create_func() { - #define TG_DEF(id, name, desc, scope, type, args...) \ + #define TG_DEF(id, name, type, args...) \ create_funcs_[TGDefIDs::id] = []() { \ - auto ret = OB_NEW(TGCLSMap::CLS, SET_USE_500("tg"), args); \ - ret->attr_ = {#name, desc, TGScope::scope, TGType::type}; \ + auto ret = OB_NEW(TG_##type, SET_USE_500("tg"), args); \ + ret->attr_ = {#name, TGType::type}; \ return ret; \ }; #include "lib/thread/thread_define.h" @@ -64,12 +64,6 @@ TGMgr::TGMgr() for (int i = 0; OB_SUCC(ret) && i < TGDefIDs::END; i++) { int tg_id = -1; ret = create_tg(i, tg_id, 0); - if (OB_FAIL(ret)) { - } else if (tg_id < 0) { - // do-nothing - } else { - default_tg_id_map_[i] = tg_id; - } } abort_unless(OB_SUCCESS == ret); } diff --git a/deps/oblib/src/lib/thread/thread_mgr.h b/deps/oblib/src/lib/thread/thread_mgr.h index 0b4cfd0f15..f817014874 100644 --- a/deps/oblib/src/lib/thread/thread_mgr.h +++ b/deps/oblib/src/lib/thread/thread_mgr.h @@ -74,21 +74,12 @@ namespace TGDefIDs { }; } -enum class TGScope -{ - INVALID, - TG_STATIC, - TG_DYNAMIC, -}; - enum class TGType { INVALID, REENTRANT_THREAD_POOL, THREAD_POOL, - OB_THREAD_POOL, TIMER, - TIMER_GROUP, QUEUE_THREAD, DEDUP_QUEUE, ASYNC_TASK_QUEUE, @@ -99,10 +90,8 @@ class TGCommonAttr { public: const char *name_; - const char *desc_; - enum TGScope scope_; enum TGType type_; - TO_STRING_KV(KCSTRING_(name), KCSTRING_(desc), K_(scope), K_(type)); + TO_STRING_KV(KCSTRING_(name), K_(type)); }; class ITG; @@ -126,7 +115,7 @@ extern void set_tenant_tg_helper(TGHelper *tg_helper); class ITG { public: - ITG() : tg_helper_(nullptr), tg_cgroup_(lib::ThreadCGroup::INVALID_CGROUP) {} + ITG() : tg_helper_(nullptr) {} virtual ~ITG() {} int64_t get_tenant_id() const { return NULL == tg_helper_ ? common::OB_SERVER_TENANT_ID : tg_helper_->id(); } @@ -206,7 +195,6 @@ public: UNUSED(qsize); } TGHelper *tg_helper_; - lib::ThreadCGroup tg_cgroup_; TGCommonAttr attr_; }; @@ -229,14 +217,16 @@ public: TGRunnable *runnable_ = nullptr; }; -template<> -class TG : public ITG +class TG_REENTRANT_THREAD_POOL : public ITG { public: - TG(ThreadCountPair pair) + TG_REENTRANT_THREAD_POOL(ThreadCountPair pair) : thread_cnt_(pair.get_thread_cnt()) {} - ~TG() { destroy(); } + TG_REENTRANT_THREAD_POOL(int64_t thread_cnt) + : thread_cnt_(thread_cnt) + {} + ~TG_REENTRANT_THREAD_POOL() { destroy(); } int thread_cnt() override { return (int)thread_cnt_; } int set_thread_cnt(int64_t thread_cnt) { @@ -293,7 +283,7 @@ public: } else if(nullptr == th_->runnable_) { ret = common::OB_ERR_UNEXPECTED; } else { - th_->set_run_wrapper(tg_helper_, tg_cgroup_); + th_->set_run_wrapper(tg_helper_); ret = th_->create(thread_cnt_, attr_.name_); } return ret; @@ -337,14 +327,16 @@ public: TGRunnable *runnable_ = nullptr; }; -template<> -class TG : public ITG +class TG_THREAD_POOL : public ITG { public: - TG(ThreadCountPair pair) + TG_THREAD_POOL(ThreadCountPair pair) : thread_cnt_(pair.get_thread_cnt()) {} - ~TG() { destroy(); } + TG_THREAD_POOL(int64_t thread_cnt) + : thread_cnt_(thread_cnt) + {} + ~TG_THREAD_POOL() { destroy(); } int thread_cnt() override { return (int)thread_cnt_; } int set_thread_cnt(int64_t thread_cnt) { @@ -379,7 +371,7 @@ public: } else { th_->runnable_->set_stop(false); th_->set_thread_count(thread_cnt_); - th_->set_run_wrapper(tg_helper_, tg_cgroup_); + th_->set_run_wrapper(tg_helper_); ret = th_->start(); } return ret; @@ -432,15 +424,18 @@ public: TGTaskHandler *handler_ = nullptr; }; -template<> -class TG : public ITG +class TG_QUEUE_THREAD : public ITG { public: - TG(ThreadCountPair pair, const int64_t task_num_limit) + TG_QUEUE_THREAD(ThreadCountPair pair, const int64_t task_num_limit) : thread_num_(pair.get_thread_cnt()), task_num_limit_(task_num_limit) {} - ~TG() { destroy(); } + TG_QUEUE_THREAD(int64_t thread_num, const int64_t task_num_limit) + : thread_num_(thread_num), + task_num_limit_(task_num_limit) + {} + ~TG_QUEUE_THREAD() { destroy(); } int thread_cnt() override { return (int)thread_num_; } int set_thread_cnt(int64_t thread_cnt) override { @@ -462,7 +457,7 @@ public: } else { qth_ = new (buf_) MySimpleThreadPool(); qth_->handler_ = &handler; - qth_->set_run_wrapper(tg_helper_, tg_cgroup_); + qth_->set_run_wrapper(tg_helper_); ret = qth_->init(thread_num_, task_num_limit_, attr_.name_, tenant_id); } return ret; @@ -549,14 +544,16 @@ public: TGTaskHandler *handler_ = nullptr; }; -template<> -class TG : public ITG +class TG_MAP_QUEUE_THREAD : public ITG { public: - TG(ThreadCountPair pair) + TG_MAP_QUEUE_THREAD(ThreadCountPair pair) : thread_num_(pair.get_thread_cnt()) {} - ~TG() { destroy(); } + TG_MAP_QUEUE_THREAD(int64_t thread_num) + : thread_num_(thread_num) + {} + ~TG_MAP_QUEUE_THREAD() { destroy(); } int thread_cnt() override { return (int)thread_num_; } int set_thread_cnt(int64_t thread_cnt) override { @@ -578,7 +575,7 @@ public: } else { qth_ = new (buf_) MyMapQueueThreadPool(); qth_->handler_ = &handler; - qth_->set_run_wrapper(tg_helper_, tg_cgroup_); + qth_->set_run_wrapper(tg_helper_); ret = qth_->init(tenant_id, thread_num_, attr_.name_); } return ret; @@ -604,6 +601,7 @@ public: { if (qth_ != nullptr) { qth_->wait(); + destroy(); } } void destroy() @@ -631,11 +629,10 @@ private: int64_t thread_num_; }; -template<> -class TG : public ITG +class TG_DEDUP_QUEUE : public ITG { public: - TG(ThreadCountPair pair, + TG_DEDUP_QUEUE(ThreadCountPair pair, const int64_t queue_size = ObDedupQueue::TASK_QUEUE_SIZE, const int64_t task_map_size = ObDedupQueue::TASK_MAP_SIZE, const int64_t total_mem_limit = ObDedupQueue::TOTAL_LIMIT, @@ -650,7 +647,22 @@ public: page_size_(page_size), label_(label) {} - ~TG() { destroy(); } + TG_DEDUP_QUEUE(int64_t thread_num, + const int64_t queue_size = ObDedupQueue::TASK_QUEUE_SIZE, + const int64_t task_map_size = ObDedupQueue::TASK_MAP_SIZE, + const int64_t total_mem_limit = ObDedupQueue::TOTAL_LIMIT, + const int64_t hold_mem_limit = ObDedupQueue::HOLD_LIMIT, + const int64_t page_size = ObDedupQueue::PAGE_SIZE, + const char *label = nullptr) + : thread_num_(thread_num), + queue_size_(queue_size), + task_map_size_(task_map_size), + total_mem_limit_(total_mem_limit), + hold_mem_limit_(hold_mem_limit), + page_size_(page_size), + label_(label) + {} + ~TG_DEDUP_QUEUE() { destroy(); } int thread_cnt() override { return (int)thread_num_; } int set_thread_cnt(int64_t thread_cnt) override { @@ -670,7 +682,7 @@ public: ret = common::OB_ERR_UNEXPECTED; } else { qth_ = new (buf_) common::ObDedupQueue(); - qth_->set_run_wrapper(tg_helper_, tg_cgroup_); + qth_->set_run_wrapper(tg_helper_); if (OB_FAIL(qth_->init(thread_num_, attr_.name_, queue_size_, @@ -734,14 +746,13 @@ private: const char *label_; }; -template<> -class TG : public ITG +class TG_TIMER : public ITG { public: - TG(int64_t max_task_num = 32) + TG_TIMER(int64_t max_task_num = 32) : max_task_num_(max_task_num) {} - ~TG() { destroy(); } + ~TG_TIMER() { destroy(); } int thread_cnt() override { return 1; } int set_thread_cnt(int64_t thread_cnt) override { @@ -756,7 +767,7 @@ public: ret = common::OB_ERR_UNEXPECTED; } else { timer_ = new (buf_) common::ObTimer(max_task_num_); - timer_->set_run_wrapper(tg_helper_, tg_cgroup_); + timer_->set_run_wrapper(tg_helper_); if (OB_FAIL(timer_->init(attr_.name_, ObMemAttr(get_tenant_id(), "TGTimer")))) { OB_LOG(WARN, "init failed", K(ret)); @@ -832,15 +843,18 @@ private: int64_t max_task_num_; }; -template<> -class TG : public ITG +class TG_ASYNC_TASK_QUEUE : public ITG { public: - TG(lib::ThreadCountPair pair, const int64_t queue_size) + TG_ASYNC_TASK_QUEUE(lib::ThreadCountPair pair, const int64_t queue_size) : thread_cnt_(pair.get_thread_cnt()), queue_size_(queue_size) {} - ~TG() { destroy(); } + TG_ASYNC_TASK_QUEUE(int64_t thread_cnt, const int64_t queue_size) + : thread_cnt_(thread_cnt), + queue_size_(queue_size) + {} + ~TG_ASYNC_TASK_QUEUE() { destroy(); } int thread_cnt() override { return (int)thread_cnt_; } int set_thread_cnt(int64_t thread_cnt) override { @@ -860,7 +874,7 @@ public: ret = common::OB_ERR_UNEXPECTED; } else { qth_ = new (buf_) share::ObAsyncTaskQueue(); - qth_->set_run_wrapper(tg_helper_, tg_cgroup_); + qth_->set_run_wrapper(tg_helper_); if (OB_FAIL(qth_->init(thread_cnt_, queue_size_, attr_.name_))) { @@ -908,123 +922,8 @@ private: int64_t queue_size_; }; -template<> -class TG : public ITG -{ - static constexpr int MAX_CNT = 32; - using TimerType = TG; -public: - TG(ThreadCountPair pair) - : cnt_(pair.get_thread_cnt()) - {} - ~TG() { destroy(); } - int thread_cnt() override { return (int)cnt_; } - int set_thread_cnt(int64_t thread_cnt) - { - UNUSED(thread_cnt); - int ret = common::OB_ERR_UNEXPECTED; - return ret; - } - int start() override - { - int ret = common::OB_SUCCESS; - if (is_inited_) { - ret = common::OB_ERR_UNEXPECTED; - } else { - if (cnt_ >= MAX_CNT) { - ret = common::OB_ERR_UNEXPECTED; - } - for (int i = 0; OB_SUCC(ret) && i < cnt_; i++) { - auto *tmp = new (buf_ + sizeof(TimerType) * i) TimerType(); - tmp->attr_ = attr_; - if (OB_FAIL(tmp->start())) { - } else { - timers_[i] = tmp; - } - } - is_inited_ = true; - } - return ret; - } - void stop() override - { - if (is_inited_) { - for (int i = 0; i < cnt_; i++) { - if (timers_[i] != nullptr) { - timers_[i]->stop(); - } - } - } - } - void wait() override - { - if (is_inited_) { - for (int i = 0; i < cnt_; i++) { - if (timers_[i] != nullptr) { - timers_[i]->wait(); - } - } - destroy(); - } - } - int schedule(int idx, common::ObTimerTask &task, const int64_t delay, - bool repeate = false, bool immediate = false) override - { - int ret = common::OB_SUCCESS; - if (!is_inited_) { - ret = common::OB_ERR_UNEXPECTED; - } else if (idx >= cnt_) { - ret = common::OB_INVALID_ARGUMENT; - } else if (OB_ISNULL(timers_[idx])) { - ret = common::OB_ERR_UNEXPECTED; - } else { - ret = timers_[idx]->schedule(task, delay, repeate, immediate); - } - return ret; - } - void destroy() - { - if (is_inited_) { - for (int i = 0; i < cnt_; i++) { - if (timers_[i] != nullptr) { - timers_[i]->~TimerType(); - timers_[i] = nullptr; - } - } - is_inited_ = false; - } - } -private: - char buf_[sizeof(TimerType) * MAX_CNT]; - TimerType *timers_[MAX_CNT] = {nullptr}; - const int64_t cnt_ = 0; - bool is_inited_ = false; -}; - -template -class TGCLSMap; -#define BIND_TG_CLS(type, CLS_) \ - namespace lib \ - { \ - template<> \ - class TGCLSMap \ - { \ - public: \ - using CLS = CLS_; \ - }; \ - } } // end of namespace lib -BIND_TG_CLS(TGType::REENTRANT_THREAD_POOL, TG); -BIND_TG_CLS(TGType::THREAD_POOL, TG); -BIND_TG_CLS(TGType::QUEUE_THREAD, TG); -BIND_TG_CLS(TGType::DEDUP_QUEUE, TG); -BIND_TG_CLS(TGType::TIMER, TG); -BIND_TG_CLS(TGType::TIMER_GROUP, TG); -BIND_TG_CLS(TGType::ASYNC_TASK_QUEUE, TG); -BIND_TG_CLS(TGType::MAP_QUEUE_THREAD, TG); - - namespace lib { class TGMgr @@ -1071,8 +970,7 @@ public: // tenant isolation int create_tg_tenant(int tg_def_id, int &tg_id, - int64_t qsize = 0, - lib::ThreadCGroup cgroup = lib::ThreadCGroup::FRONT_CGROUP) + int64_t qsize = 0) { tg_id = -1; int ret = common::OB_SUCCESS; @@ -1097,7 +995,6 @@ public: OB_LOG(WARN, "create tg tenant but tenant tg helper is null", K(tg_def_id), K(tg_id)); } else { tg->tg_helper_ = tg_helper; - tg->tg_cgroup_ = cgroup; tg_helper->tg_create_cb(tg_id); } tg->set_queue_size(qsize); @@ -1118,7 +1015,6 @@ private: char bs_buf_[ABitSet::buf_len(MAX_ID)]; public: ITG *tgs_[MAX_ID] = {nullptr}; - int default_tg_id_map_[TGDefIDs::END] = {-1}; }; template @@ -1149,8 +1045,7 @@ public: OB_LOG(ERROR, "invalid tg id"); \ } else { \ lib::ITG *tg = \ - TG_MGR.tgs_[static_cast(tg_id) < static_cast(lib::TGDefIDs::END) ? \ - TG_MGR.default_tg_id_map_[tg_id] : tg_id]; \ + TG_MGR.tgs_[tg_id]; \ if (OB_ISNULL(tg)) { \ ret = common::OB_ERR_UNEXPECTED; \ } else { \ @@ -1221,7 +1116,7 @@ public: tg_type = tg->attr_.type_; \ } \ if (TGType::REENTRANT_THREAD_POOL == tg_type) { \ - TG* tmp_tg = static_cast*>(tg); \ + TG_REENTRANT_THREAD_POOL* tmp_tg = static_cast(tg); \ ret = tmp_tg->logical_start(); \ } else { \ ret = common::OB_ERR_UNEXPECTED; \ @@ -1239,7 +1134,7 @@ public: tg_type = tg->attr_.type_; \ } \ if (TGType::REENTRANT_THREAD_POOL == tg_type) { \ - TG* tmp_tg = static_cast*>(tg); \ + TG_REENTRANT_THREAD_POOL* tmp_tg = static_cast(tg); \ tmp_tg->logical_stop(); \ } else { \ ret = common::OB_ERR_UNEXPECTED; \ @@ -1256,7 +1151,7 @@ public: tg_type = tg->attr_.type_; \ } \ if (TGType::REENTRANT_THREAD_POOL == tg_type) { \ - TG* tmp_tg = static_cast*>(tg); \ + TG_REENTRANT_THREAD_POOL* tmp_tg = static_cast(tg); \ tmp_tg->logical_wait(); \ } else { \ ret = common::OB_ERR_UNEXPECTED; \ diff --git a/deps/oblib/src/lib/thread/threads.h b/deps/oblib/src/lib/thread/threads.h index 39ad3b39d8..cbe84b380e 100644 --- a/deps/oblib/src/lib/thread/threads.h +++ b/deps/oblib/src/lib/thread/threads.h @@ -25,13 +25,6 @@ extern int64_t global_thread_stack_size; namespace oceanbase { namespace lib { -enum ThreadCGroup -{ - INVALID_CGROUP = 0, - FRONT_CGROUP = 1, - BACK_CGROUP = 2, -}; - class Threads; class IRunWrapper { @@ -59,8 +52,7 @@ public: threads_(nullptr), stack_size_(global_thread_stack_size), stop_(true), - run_wrapper_(nullptr), - cgroup_(INVALID_CGROUP) + run_wrapper_(nullptr) {} virtual ~Threads(); static IRunWrapper *&get_expect_run_wrapper(); @@ -87,10 +79,9 @@ public: int init(); // IRunWrapper 用于创建多租户线程时指定租户上下文 // cgroup_ctrl 和IRunWrapper配合使用,实现多租户线程的CPU隔离 - void set_run_wrapper(IRunWrapper *run_wrapper, ThreadCGroup cgroup = ThreadCGroup::FRONT_CGROUP) + void set_run_wrapper(IRunWrapper *run_wrapper) { run_wrapper_ = run_wrapper; - cgroup_ = cgroup; } virtual int start(); virtual void stop(); @@ -105,7 +96,6 @@ public: int ret = OB_SUCCESS; return ret; } - ThreadCGroup get_cgroup() { return cgroup_; } virtual bool has_set_stop() const { IGNORE_RETURN lib::Thread::update_loop_ts(); @@ -143,9 +133,6 @@ private: common::SpinRWLock lock_ __attribute__((__aligned__(16))); // tenant ctx IRunWrapper *run_wrapper_; - // thread cgroups - ThreadCGroup cgroup_; - // }; using ThreadPool = Threads; diff --git a/deps/oblib/unittest/lib/thread/test_tg_mgr.cpp b/deps/oblib/unittest/lib/thread/test_tg_mgr.cpp index 74556f38be..0a76a717a7 100644 --- a/deps/oblib/unittest/lib/thread/test_tg_mgr.cpp +++ b/deps/oblib/unittest/lib/thread/test_tg_mgr.cpp @@ -271,46 +271,49 @@ TEST(TG, async_task_queue) ASSERT_FALSE(TG_EXIST(tg_id)); } -TEST(TG, timer_group) +class MapQueueThreadHandler : public TGTaskHandler +{ +public: + void handle(void *task) override + {} + void handle(void *task, volatile bool &is_stopped) override + { + UNUSED(task); + UNUSED(is_stopped); + ++handle_count_; + ::usleep(50000); + } + + int64_t handle_count_ = 0; +}; + +TEST(TG, map_queue_thread) { int tg_id = TGDefIDs::TEST6; - TestTimerTask tasks[2]; + MapQueueThreadHandler handler; // start + ASSERT_EQ(OB_SUCCESS, TG_SET_HANDLER(tg_id, handler)); ASSERT_EQ(OB_SUCCESS, TG_START(tg_id)); - for (int i = 0; i < ARRAYSIZEOF(tasks); i++) { - ASSERT_EQ(OB_SUCCESS, TG_SCHEDULE(tg_id, i, tasks[i], 0, true)); - } - ::usleep(40000); - for (int i = 0; i < ARRAYSIZEOF(tasks); i++) { - ASSERT_TRUE(tasks[i].running_); - } - ::usleep(60000); - for (int i = 0; i < ARRAYSIZEOF(tasks); i++) { - ASSERT_EQ(1, tasks[i].task_run_count_); - } + ASSERT_EQ(OB_SUCCESS, TG_PUSH_TASK(tg_id, &tg_id, 0)); + ::usleep(50000); ASSERT_EQ(OB_SUCCESS, TG_STOP_R(tg_id)); ASSERT_EQ(OB_SUCCESS, TG_WAIT_R(tg_id)); + ASSERT_EQ(1, handler.handle_count_); // restart + ASSERT_EQ(OB_SUCCESS, TG_SET_HANDLER(tg_id, handler)); ASSERT_EQ(OB_SUCCESS, TG_START(tg_id)); - for (int i = 0; i < ARRAYSIZEOF(tasks); i++) { - ASSERT_EQ(OB_SUCCESS, TG_SCHEDULE(tg_id, i, tasks[i], 0, true)); - } - ::usleep(40000); - for (int i = 0; i < ARRAYSIZEOF(tasks); i++) { - ASSERT_TRUE(tasks[i].running_); - } - ::usleep(60000); - for (int i = 0; i < ARRAYSIZEOF(tasks); i++) { - ASSERT_EQ(2, tasks[i].task_run_count_); - } + ASSERT_EQ(OB_SUCCESS, TG_PUSH_TASK(tg_id, &tg_id, 1)); + ::usleep(50000); ASSERT_EQ(OB_SUCCESS, TG_STOP_R(tg_id)); ASSERT_EQ(OB_SUCCESS, TG_WAIT_R(tg_id)); + ASSERT_EQ(2, handler.handle_count_); ASSERT_TRUE(TG_EXIST(tg_id)); TG_DESTROY(tg_id); ASSERT_FALSE(TG_EXIST(tg_id)); } + int main(int argc, char *argv[]) { oceanbase::common::ObLogger::get_logger().set_log_level("INFO"); diff --git a/src/logservice/archiveservice/ob_archive_fetcher.cpp b/src/logservice/archiveservice/ob_archive_fetcher.cpp index 322be9c275..3edb65c975 100644 --- a/src/logservice/archiveservice/ob_archive_fetcher.cpp +++ b/src/logservice/archiveservice/ob_archive_fetcher.cpp @@ -142,7 +142,7 @@ void ObArchiveFetcher::destroy() int ObArchiveFetcher::start() { int ret = OB_SUCCESS; - ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP); + ObThreadPool::set_run_wrapper(MTL_CTX()); if (OB_UNLIKELY(! inited_)) { ARCHIVE_LOG(ERROR, "ObArchiveFetcher not init"); ret = OB_NOT_INIT; diff --git a/src/logservice/archiveservice/ob_archive_sender.cpp b/src/logservice/archiveservice/ob_archive_sender.cpp index 373c98d2d9..b1d087ea17 100644 --- a/src/logservice/archiveservice/ob_archive_sender.cpp +++ b/src/logservice/archiveservice/ob_archive_sender.cpp @@ -116,7 +116,7 @@ void ObArchiveSender::destroy() int ObArchiveSender::start() { int ret = OB_SUCCESS; - ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP); + ObThreadPool::set_run_wrapper(MTL_CTX()); if (OB_UNLIKELY(! inited_)) { ret = OB_NOT_INIT; ARCHIVE_LOG(INFO, "ObArchiveSender has not been initialized", KR(ret)); diff --git a/src/logservice/archiveservice/ob_archive_sequencer.cpp b/src/logservice/archiveservice/ob_archive_sequencer.cpp index 4a9561eb0b..0892848786 100644 --- a/src/logservice/archiveservice/ob_archive_sequencer.cpp +++ b/src/logservice/archiveservice/ob_archive_sequencer.cpp @@ -90,7 +90,7 @@ void ObArchiveSequencer::destroy() int ObArchiveSequencer::start() { int ret = OB_SUCCESS; - ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP); + ObThreadPool::set_run_wrapper(MTL_CTX()); if (OB_UNLIKELY(! inited_)) { ARCHIVE_LOG(ERROR, "ObArchiveSequencer not init"); ret = OB_NOT_INIT; diff --git a/src/logservice/archiveservice/ob_archive_service.cpp b/src/logservice/archiveservice/ob_archive_service.cpp index 4ba8805859..b52ed74768 100644 --- a/src/logservice/archiveservice/ob_archive_service.cpp +++ b/src/logservice/archiveservice/ob_archive_service.cpp @@ -109,7 +109,7 @@ int ObArchiveService::init(logservice::ObLogService *log_service, int ObArchiveService::start() { int ret = OB_SUCCESS; - ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP); + ObThreadPool::set_run_wrapper(MTL_CTX()); if (OB_UNLIKELY(! inited_)) { ret = OB_NOT_INIT; ARCHIVE_LOG(ERROR, "archive service not init", K(ret)); diff --git a/src/logservice/archiveservice/ob_archive_timer.cpp b/src/logservice/archiveservice/ob_archive_timer.cpp index dfcc7eb07b..6cf259f347 100644 --- a/src/logservice/archiveservice/ob_archive_timer.cpp +++ b/src/logservice/archiveservice/ob_archive_timer.cpp @@ -73,7 +73,7 @@ void ObArchiveTimer::destroy() int ObArchiveTimer::start() { int ret = OB_SUCCESS; - ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP); + ObThreadPool::set_run_wrapper(MTL_CTX()); if (OB_UNLIKELY(! inited_)) { ret = OB_NOT_INIT; ARCHIVE_LOG(ERROR, "ObArchiveTimer not init", K(ret), K(inited_), K(tenant_id_)); diff --git a/src/logservice/archiveservice/ob_ls_mgr.cpp b/src/logservice/archiveservice/ob_ls_mgr.cpp index 7c4e9eca40..aa8c02a769 100644 --- a/src/logservice/archiveservice/ob_ls_mgr.cpp +++ b/src/logservice/archiveservice/ob_ls_mgr.cpp @@ -183,7 +183,7 @@ int ObArchiveLSMgr::init(const uint64_t tenant_id, int ObArchiveLSMgr::start() { int ret = OB_SUCCESS; - ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP); + ObThreadPool::set_run_wrapper(MTL_CTX()); if (OB_UNLIKELY(! inited_)) { ARCHIVE_LOG(ERROR, "ObArchiveLSMgr not init"); ret = OB_NOT_INIT; diff --git a/src/share/ob_thread_define.h b/src/share/ob_thread_define.h index b40513ce3a..22a117316b 100644 --- a/src/share/ob_thread_define.h +++ b/src/share/ob_thread_define.h @@ -14,128 +14,138 @@ #define GET_THREAD_NUM_BY_NPROCESSORS(factor) (sysconf(_SC_NPROCESSORS_ONLN) / (factor) > 0 ? sysconf(_SC_NPROCESSORS_ONLN) / (factor) : 1) #define GET_THREAD_NUM_BY_NPROCESSORS_WITH_LIMIT(factor, limit) (sysconf(_SC_NPROCESSORS_ONLN) / (factor) > 0 ? min(sysconf(_SC_NPROCESSORS_ONLN) / (factor), limit) : 1) #define GET_MYSQL_THREAD_COUNT(default_cnt) (GCONF.sql_login_thread_count ? GCONF.sql_login_thread_count : (default_cnt)) -TG_DEF(TEST_OB_TH, testObTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1 ,1)) -TG_DEF(COMMON_THREAD_POOL, ComTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1 ,1)) -TG_DEF(COMMON_QUEUE_THREAD, ComQueueTh, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(1 ,1), 100) -TG_DEF(COMMON_TIMER_THREAD, ComTimerTh, "", TG_STATIC, TIMER) -TG_DEF(Blacklist, Blacklist, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(PartSerMigRetryQt, PartSerMigRetryQt, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) -// TG_DEF(PartSerCb, PartSerCb, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(storage::ObCallbackQueueThread::QUEUE_THREAD_NUM, storage::ObCallbackQueueThread::MINI_MODE_QUEUE_THREAD_NUM), +TG_DEF(TEST_OB_TH, testObTh, THREAD_POOL, 1) +TG_DEF(COMMON_THREAD_POOL, ComTh, THREAD_POOL, 1) +TG_DEF(COMMON_QUEUE_THREAD, ComQueueTh, QUEUE_THREAD, 1, 100) +TG_DEF(COMMON_TIMER_THREAD, ComTimerTh, TIMER) +TG_DEF(Blacklist, Blacklist, THREAD_POOL, 1) +TG_DEF(PartSerMigRetryQt, PartSerMigRetryQt, THREAD_POOL, 1) +// TG_DEF(PartSerCb, PartSerCb, QUEUE_THREAD, ThreadCountPair(storage::ObCallbackQueueThread::QUEUE_THREAD_NUM, storage::ObCallbackQueueThread::MINI_MODE_QUEUE_THREAD_NUM), // (!lib::is_mini_mode() ? OB_MAX_PARTITION_NUM_PER_SERVER : OB_MINI_MODE_MAX_PARTITION_NUM_PER_SERVER) * 2) -// TG_DEF(PartSerLargeCb, PartSerLargeCb, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(storage::ObCallbackQueueThread::QUEUE_THREAD_NUM, storage::ObCallbackQueueThread::MINI_MODE_QUEUE_THREAD_NUM), +// TG_DEF(PartSerLargeCb, PartSerLargeCb, QUEUE_THREAD, ThreadCountPair(storage::ObCallbackQueueThread::QUEUE_THREAD_NUM, storage::ObCallbackQueueThread::MINI_MODE_QUEUE_THREAD_NUM), // (!lib::is_mini_mode() ? OB_MAX_PARTITION_NUM_PER_SERVER : OB_MINI_MODE_MAX_PARTITION_NUM_PER_SERVER) * 2) -TG_DEF(ReplayEngine, ReplayEngine, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(sysconf(_SC_NPROCESSORS_ONLN), 2), +TG_DEF(ReplayEngine, ReplayEngine, QUEUE_THREAD, ThreadCountPair(sysconf(_SC_NPROCESSORS_ONLN), 2), !lib::is_mini_mode() ? (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MAX_PARTITION_NUM_PER_SERVER : (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MINI_MODE_MAX_PARTITION_NUM_PER_SERVER) -TG_DEF(TransMigrate, TransMigrate, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(24), 1), 10000) -TG_DEF(StandbyTimestampService, StandbyTimestampService, "", TG_DYNAMIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(WeakReadService, WeakRdSrv, "", TG_DYNAMIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(TransTaskWork, TransTaskWork, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(12), 1), transaction::ObThreadLocalTransCtx::MAX_BIG_TRANS_TASK) -TG_DEF(DDLTaskExecutor3, DDLTaskExecutor3, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(8, 2)) -TG_DEF(TSWorker, TSWorker, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(12), 1), transaction::ObTsWorker::MAX_TASK_NUM) -TG_DEF(BRPC, BRPC, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(obrpc::ObBatchRpc::MAX_THREAD_COUNT, obrpc::ObBatchRpc::MINI_MODE_THREAD_COUNT)) -TG_DEF(RLMGR, RLMGR, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(LeaseQueueTh, LeaseQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::LEASE_TASK_THREAD_CNT, observer::ObSrvDeliver::MINI_MODE_LEASE_TASK_THREAD_CNT)) -TG_DEF(DDLQueueTh, DDLQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::DDL_TASK_THREAD_CNT, observer::ObSrvDeliver::DDL_TASK_THREAD_CNT)) -TG_DEF(MysqlQueueTh, MysqlQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(GET_MYSQL_THREAD_COUNT(observer::ObSrvDeliver::MYSQL_TASK_THREAD_CNT), GET_MYSQL_THREAD_COUNT(observer::ObSrvDeliver::MINI_MODE_MYSQL_TASK_THREAD_CNT))) -TG_DEF(DDLPQueueTh, DDLPQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS_WITH_LIMIT(2, 24), 2)) -TG_DEF(DiagnoseQueueTh, DiagnoseQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::MYSQL_DIAG_TASK_THREAD_CNT, observer::ObSrvDeliver::MINI_MODE_MYSQL_DIAG_TASK_THREAD_CNT)) -TG_DEF(DdlBuild, DdlBuild, "", TG_STATIC, ASYNC_TASK_QUEUE, ThreadCountPair(16, 1), 4 << 10) -TG_DEF(LSService, LSService, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(2 ,2)) -TG_DEF(ObCreateStandbyFromNetActor, ObCreateStandbyFromNetActor, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(SimpleLSService, SimpleLSService, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(1 ,1)) -TG_DEF(IntermResGC, IntermResGC, "", TG_STATIC, TIMER) -TG_DEF(ServerGTimer, ServerGTimer, "", TG_STATIC, TIMER) -TG_DEF(FreezeTimer, FreezeTimer, "", TG_STATIC, TIMER) -TG_DEF(SqlMemTimer, SqlMemTimer, "", TG_STATIC, TIMER) -TG_DEF(ServerTracerTimer, ServerTracerTimer, "", TG_STATIC, TIMER) -TG_DEF(RSqlPool, RSqlPool, "", TG_STATIC, TIMER) -TG_DEF(KVCacheWash, KVCacheWash, "", TG_STATIC, TIMER) -TG_DEF(KVCacheRep, KVCacheRep, "", TG_STATIC, TIMER) -TG_DEF(ObHeartbeat, ObHeartbeat, "", TG_STATIC, TIMER) -TG_DEF(PlanCacheEvict, PlanCacheEvict, "", TG_DYNAMIC, TIMER) -TG_DEF(TabletStatRpt, TabletStatRpt, "", TG_STATIC, TIMER) -TG_DEF(PsCacheEvict, PsCacheEvict, "", TG_DYNAMIC, TIMER) -TG_DEF(MergeLoop, MergeLoop, "", TG_STATIC, TIMER) -TG_DEF(SSTableGC, SSTableGC, "", TG_STATIC, TIMER) -TG_DEF(MediumLoop, MediumLoop, "", TG_STATIC, TIMER) -TG_DEF(WriteCkpt, WriteCkpt, "", TG_STATIC, TIMER) -TG_DEF(EXTLogWash, EXTLogWash, "", TG_STATIC, TIMER) -TG_DEF(LineCache, LineCache, "", TG_STATIC, TIMER) -TG_DEF(LocalityReload, LocalityReload, "", TG_STATIC, TIMER) -TG_DEF(MemstoreGC, MemstoreGC, "", TG_STATIC, TIMER) -TG_DEF(DiskUseReport, DiskUseReport, "", TG_STATIC, TIMER) -TG_DEF(CLOGReqMinor, CLOGReqMinor, "", TG_STATIC, TIMER) -TG_DEF(PGArchiveLog, PGArchiveLog, "", TG_STATIC, TIMER) -TG_DEF(CKPTLogRep, CKPTLogRep, "", TG_STATIC, TIMER) -TG_DEF(RebuildRetry, RebuildRetry, "", TG_STATIC, TIMER) -TG_DEF(TableMgrGC, TableMgrGC, "", TG_STATIC, TIMER) -TG_DEF(IndexSche, IndexSche, "", TG_STATIC, TIMER) -TG_DEF(FreInfoReload, FreInfoReload, "", TG_DYNAMIC, TIMER) -TG_DEF(HAGtsMgr, HAGtsMgr, "", TG_STATIC, TIMER) -TG_DEF(HAGtsHB, HAGtsHB, "", TG_STATIC, TIMER) -TG_DEF(RebuildTask, RebuildTask, "", TG_STATIC, TIMER) -TG_DEF(LogDiskMon, LogDiskMon, "", TG_DYNAMIC, TIMER) -TG_DEF(ILOGFlush, ILOGFlush, "", TG_STATIC, TIMER) -TG_DEF(ILOGPurge, ILOGPurge, "", TG_STATIC, TIMER) -TG_DEF(RLogClrCache, RLogClrCache, "", TG_STATIC, TIMER) -TG_DEF(TableStatRpt, TableStatRpt, "", TG_STATIC, TIMER) -TG_DEF(MacroMetaMgr, MacroMetaMgr, "", TG_STATIC, TIMER) -TG_DEF(StoreFileGC, StoreFileGC, "", TG_STATIC, TIMER) -TG_DEF(LeaseHB, LeaseHB, "", TG_STATIC, TIMER) -TG_DEF(ClusterTimer, ClusterTimer, "", TG_STATIC, TIMER) -TG_DEF(MergeTimer, MergeTimer, "", TG_STATIC, TIMER) -TG_DEF(CFC, CFC, "", TG_STATIC, TIMER) -TG_DEF(CCDF, CCDF, "", TG_STATIC, TIMER) -TG_DEF(LogMysqlPool, LogMysqlPool, "", TG_STATIC, TIMER) -TG_DEF(TblCliSqlPool, TblCliSqlPool, "", TG_STATIC, TIMER) -TG_DEF(QueryExecCtxGC, QueryExecCtxGC, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(DtlDfc, DtlDfc, "", TG_STATIC, TIMER) -TG_DEF(LogIOTaskCbThreadPool, LogIOCb, "", TG_STATIC, QUEUE_THREAD, +TG_DEF(TransMigrate, TransMigrate, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(24), 1), 10000) +TG_DEF(StandbyTimestampService, StandbyTimestampService, THREAD_POOL, 1) +TG_DEF(WeakReadService, WeakRdSrv, THREAD_POOL, 1) +TG_DEF(TransTaskWork, TransTaskWork, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(12), 1), transaction::ObThreadLocalTransCtx::MAX_BIG_TRANS_TASK) +TG_DEF(DDLTaskExecutor3, DDLTaskExecutor3, THREAD_POOL, ThreadCountPair(8, 2)) +TG_DEF(TSWorker, TSWorker, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(12), 1), transaction::ObTsWorker::MAX_TASK_NUM) +TG_DEF(BRPC, BRPC, THREAD_POOL, ThreadCountPair(obrpc::ObBatchRpc::MAX_THREAD_COUNT, obrpc::ObBatchRpc::MINI_MODE_THREAD_COUNT)) +TG_DEF(RLMGR, RLMGR, THREAD_POOL, 1) +TG_DEF(LeaseQueueTh, LeaseQueueTh, THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::LEASE_TASK_THREAD_CNT, observer::ObSrvDeliver::MINI_MODE_LEASE_TASK_THREAD_CNT)) +TG_DEF(DDLQueueTh, DDLQueueTh, THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::DDL_TASK_THREAD_CNT, observer::ObSrvDeliver::DDL_TASK_THREAD_CNT)) +TG_DEF(MysqlQueueTh, MysqlQueueTh, THREAD_POOL, ThreadCountPair(GET_MYSQL_THREAD_COUNT(observer::ObSrvDeliver::MYSQL_TASK_THREAD_CNT), GET_MYSQL_THREAD_COUNT(observer::ObSrvDeliver::MINI_MODE_MYSQL_TASK_THREAD_CNT))) +TG_DEF(DDLPQueueTh, DDLPQueueTh, THREAD_POOL, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS_WITH_LIMIT(2, 24), 2)) +TG_DEF(DiagnoseQueueTh, DiagnoseQueueTh, THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::MYSQL_DIAG_TASK_THREAD_CNT, observer::ObSrvDeliver::MINI_MODE_MYSQL_DIAG_TASK_THREAD_CNT)) +TG_DEF(DdlBuild, DdlBuild, ASYNC_TASK_QUEUE, ThreadCountPair(16, 1), 4 << 10) +TG_DEF(LSService, LSService, REENTRANT_THREAD_POOL, 2) +TG_DEF(ObCreateStandbyFromNetActor, ObCreateStandbyFromNetActor, REENTRANT_THREAD_POOL, 1) +TG_DEF(SimpleLSService, SimpleLSService, REENTRANT_THREAD_POOL, 1) +TG_DEF(IntermResGC, IntermResGC, TIMER) +TG_DEF(ServerGTimer, ServerGTimer, TIMER) +TG_DEF(FreezeTimer, FreezeTimer, TIMER) +TG_DEF(SqlMemTimer, SqlMemTimer, TIMER) +TG_DEF(ServerTracerTimer, ServerTracerTimer, TIMER) +TG_DEF(RSqlPool, RSqlPool, TIMER) +TG_DEF(KVCacheWash, KVCacheWash, TIMER) +TG_DEF(KVCacheRep, KVCacheRep, TIMER) +TG_DEF(ObHeartbeat, ObHeartbeat, TIMER) +TG_DEF(PlanCacheEvict, PlanCacheEvict, TIMER) +TG_DEF(TabletStatRpt, TabletStatRpt, TIMER) +TG_DEF(PsCacheEvict, PsCacheEvict, TIMER) +TG_DEF(MergeLoop, MergeLoop, TIMER) +TG_DEF(SSTableGC, SSTableGC, TIMER) +TG_DEF(MediumLoop, MediumLoop, TIMER) +TG_DEF(WriteCkpt, WriteCkpt, TIMER) +TG_DEF(EXTLogWash, EXTLogWash, TIMER) +TG_DEF(LineCache, LineCache, TIMER) +TG_DEF(LocalityReload, LocalityReload, TIMER) +TG_DEF(MemstoreGC, MemstoreGC, TIMER) +TG_DEF(DiskUseReport, DiskUseReport, TIMER) +TG_DEF(CLOGReqMinor, CLOGReqMinor, TIMER) +TG_DEF(PGArchiveLog, PGArchiveLog, TIMER) +TG_DEF(CKPTLogRep, CKPTLogRep, TIMER) +TG_DEF(RebuildRetry, RebuildRetry, TIMER) +TG_DEF(TableMgrGC, TableMgrGC, TIMER) +TG_DEF(IndexSche, IndexSche, TIMER) +TG_DEF(FreInfoReload, FreInfoReload, TIMER) +TG_DEF(HAGtsMgr, HAGtsMgr, TIMER) +TG_DEF(HAGtsHB, HAGtsHB, TIMER) +TG_DEF(RebuildTask, RebuildTask, TIMER) +TG_DEF(LogDiskMon, LogDiskMon, TIMER) +TG_DEF(ILOGFlush, ILOGFlush, TIMER) +TG_DEF(ILOGPurge, ILOGPurge, TIMER) +TG_DEF(RLogClrCache, RLogClrCache, TIMER) +TG_DEF(TableStatRpt, TableStatRpt, TIMER) +TG_DEF(MacroMetaMgr, MacroMetaMgr, TIMER) +TG_DEF(StoreFileGC, StoreFileGC, TIMER) +TG_DEF(LeaseHB, LeaseHB, TIMER) +TG_DEF(ClusterTimer, ClusterTimer, TIMER) +TG_DEF(MergeTimer, MergeTimer, TIMER) +TG_DEF(CFC, CFC, TIMER) +TG_DEF(CCDF, CCDF, TIMER) +TG_DEF(LogMysqlPool, LogMysqlPool, TIMER) +TG_DEF(TblCliSqlPool, TblCliSqlPool, TIMER) +TG_DEF(QueryExecCtxGC, QueryExecCtxGC, THREAD_POOL, 1) +TG_DEF(DtlDfc, DtlDfc, TIMER) +TG_DEF(LogIOTaskCbThreadPool, LogIOCb, QUEUE_THREAD, ThreadCountPair(palf::LogIOTaskCbThreadPool::THREAD_NUM, palf::LogIOTaskCbThreadPool::MINI_MODE_THREAD_NUM), palf::LogIOTaskCbThreadPool::MAX_LOG_IO_CB_TASK_NUM) -TG_DEF(ReplayService, ReplaySrv, "", TG_DYNAMIC, QUEUE_THREAD, ThreadCountPair(1, 1), +TG_DEF(ReplayService, ReplaySrv, QUEUE_THREAD, 1, !lib::is_mini_mode() ? (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER : (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MINI_MODE_MAX_LS_NUM_PER_TENANT_PER_SERVER) -TG_DEF(LogRouteService, LogRouteSrv, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(1, 1), +TG_DEF(LogRouteService, LogRouteSrv, QUEUE_THREAD, 1, !lib::is_mini_mode() ? (common::MAX_SERVER_COUNT) * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER : (common::MAX_SERVER_COUNT) * OB_MINI_MODE_MAX_LS_NUM_PER_TENANT_PER_SERVER) -TG_DEF(LogRouterTimer, LogRouterTimer, "", TG_STATIC, TIMER) -TG_DEF(LogFetcherLSWorker, LSWorker, "", TG_STATIC, MAP_QUEUE_THREAD, ThreadCountPair(4, 1)) -TG_DEF(LogFetcherIdlePool, LSIdlePool, "", TG_STATIC, MAP_QUEUE_THREAD, ThreadCountPair(1, 1)) -TG_DEF(LogFetcherDeadPool, LSDeadPool, "", TG_STATIC, MAP_QUEUE_THREAD, ThreadCountPair(1, 1)) -TG_DEF(LogFetcherTimer, LSTimer, "", TG_STATIC, TIMER) -TG_DEF(PalfBlockGC, PalfGC, "", TG_DYNAMIC, TIMER) -TG_DEF(LSFreeze, LSFreeze, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(storage::ObLSFreezeThread::QUEUE_THREAD_NUM, storage::ObLSFreezeThread::MINI_MODE_QUEUE_THREAD_NUM), +TG_DEF(LogRouterTimer, LogRouterTimer, TIMER) +TG_DEF(LogFetcherLSWorker, LSWorker, MAP_QUEUE_THREAD, ThreadCountPair(4, 1)) +TG_DEF(LogFetcherIdlePool, LSIdlePool, MAP_QUEUE_THREAD, 1) +TG_DEF(LogFetcherDeadPool, LSDeadPool, MAP_QUEUE_THREAD, 1) +TG_DEF(LogFetcherTimer, LSTimer, TIMER) +TG_DEF(PalfBlockGC, PalfGC, TIMER) +TG_DEF(LSFreeze, LSFreeze, QUEUE_THREAD, ThreadCountPair(storage::ObLSFreezeThread::QUEUE_THREAD_NUM, storage::ObLSFreezeThread::MINI_MODE_QUEUE_THREAD_NUM), storage::ObLSFreezeThread::MAX_FREE_TASK_NUM) -TG_DEF(LSFetchLogEngine, FetchLog, "", TG_DYNAMIC, QUEUE_THREAD, +TG_DEF(LSFetchLogEngine, FetchLog, QUEUE_THREAD, ThreadCountPair(palf::FetchLogEngine::FETCH_LOG_THREAD_COUNT, palf::FetchLogEngine::MINI_MODE_FETCH_LOG_THREAD_COUNT), !lib::is_mini_mode() ? palf::FetchLogEngine::FETCH_LOG_TASK_MAX_COUNT_PER_LS * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER : palf::FetchLogEngine::FETCH_LOG_TASK_MAX_COUNT_PER_LS * OB_MINI_MODE_MAX_LS_NUM_PER_TENANT_PER_SERVER) -TG_DEF(DagScheduler, DagScheduler, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(DagWorker, DagWorker, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(RCService, RCSrv, "", TG_DYNAMIC, QUEUE_THREAD, +TG_DEF(DagScheduler, DagScheduler, THREAD_POOL, 1) +TG_DEF(DagWorker, DagWorker, THREAD_POOL, 1) +TG_DEF(RCService, RCSrv, QUEUE_THREAD, ThreadCountPair(logservice::ObRoleChangeService::MAX_THREAD_NUM, logservice::ObRoleChangeService::MAX_THREAD_NUM), logservice::ObRoleChangeService::MAX_RC_EVENT_TASK) -TG_DEF(ApplyService, ApplySrv, "", TG_DYNAMIC, QUEUE_THREAD, ThreadCountPair(1, 1), +TG_DEF(ApplyService, ApplySrv, QUEUE_THREAD, 1, !lib::is_mini_mode() ? (common::APPLY_TASK_QUEUE_SIZE + 1) * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER : (common::APPLY_TASK_QUEUE_SIZE + 1) * OB_MINI_MODE_MAX_LS_NUM_PER_TENANT_PER_SERVER) -TG_DEF(GlobalCtxTimer, GlobalCtxTimer, "", TG_STATIC, TIMER) -TG_DEF(StorageLogWriter, StorageLogWriter, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(ReplayProcessStat, ReplayProcessStat, "", TG_STATIC, TIMER) -TG_DEF(ActiveSessHist, ActiveSessHist, "", TG_STATIC, TIMER) -TG_DEF(CTASCleanUpTimer, CTASCleanUpTimer, "", TG_STATIC, TIMER) -TG_DEF(DDLScanTask, DDLScanTask, "", TG_STATIC, TIMER) -TG_DEF(TenantLSMetaChecker, LSMetaCh, "", TG_STATIC, TIMER) -TG_DEF(TenantTabletMetaChecker, TbMetaCh, "", TG_STATIC, TIMER) -TG_DEF(ServerMetaChecker, SvrMetaCh, "", TG_STATIC, TIMER) -TG_DEF(ArbGCSTh, ArbGCTimerP, "", TG_STATIC, TIMER) -TG_DEF(DataDictTimer, DataDictTimer, "", TG_STATIC, TIMER) -TG_DEF(CDCService, CDCSrv, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) -TG_DEF(LogUpdater, LogUpdater, "", TG_STATIC, TIMER) -TG_DEF(HeartBeatCheckTask, HeartBeatCheckTask, "", TG_STATIC, TIMER) -TG_DEF(RedefHeartBeatTask, RedefHeartBeatTask, "", TG_STATIC, TIMER) -TG_DEF(MemDumpTimer, MemDumpTimer, "", TG_STATIC, TIMER) -TG_DEF(SSTableDefragment, SSTableDefragment, "", TG_STATIC, TIMER) -TG_DEF(TenantMetaMemMgr, TenantMetaMemMgr, "", TG_STATIC, TIMER) -TG_DEF(IngressService, IngressService, "", TG_STATIC, TIMER) -TG_DEF(HeartbeatService, HeartbeatService, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(2, 2)) -TG_DEF(DetectManager, DetectManager, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1)) +TG_DEF(GlobalCtxTimer, GlobalCtxTimer, TIMER) +TG_DEF(StorageLogWriter, StorageLogWriter, THREAD_POOL, 1) +TG_DEF(ReplayProcessStat, ReplayProcessStat, TIMER) +TG_DEF(ActiveSessHist, ActiveSessHist, TIMER) +TG_DEF(CTASCleanUpTimer, CTASCleanUpTimer, TIMER) +TG_DEF(DDLScanTask, DDLScanTask, TIMER) +TG_DEF(TenantLSMetaChecker, LSMetaCh, TIMER) +TG_DEF(TenantTabletMetaChecker, TbMetaCh, TIMER) +TG_DEF(ServerMetaChecker, SvrMetaCh, TIMER) +TG_DEF(ArbGCSTh, ArbGCTimerP, TIMER) +TG_DEF(DataDictTimer, DataDictTimer, TIMER) +TG_DEF(CDCService, CDCSrv, THREAD_POOL, 1) +TG_DEF(LogUpdater, LogUpdater, TIMER) +TG_DEF(HeartBeatCheckTask, HeartBeatCheckTask, TIMER) +TG_DEF(RedefHeartBeatTask, RedefHeartBeatTask, TIMER) +TG_DEF(MemDumpTimer, MemDumpTimer, TIMER) +TG_DEF(SSTableDefragment, SSTableDefragment, TIMER) +TG_DEF(TenantMetaMemMgr, TenantMetaMemMgr, TIMER) +TG_DEF(IngressService, IngressService, TIMER) +TG_DEF(HeartbeatService, HeartbeatService, REENTRANT_THREAD_POOL, 2) +TG_DEF(DetectManager, DetectManager, THREAD_POOL, 1) +TG_DEF(CONFIG_MGR, ConfigMgr, TIMER, 1024) +TG_DEF(IO_TUNING, IO_TUNING, THREAD_POOL, 1) +TG_DEF(IO_SCHEDULE, IO_SCHEDULE, THREAD_POOL, 1) +TG_DEF(IO_CALLBACK, IO_CALLBACK, THREAD_POOL, 1) +TG_DEF(IO_CHANNEL, IO_CHANNEL, THREAD_POOL, 1) +TG_DEF(IO_HEALTH, IO_HEALTH, QUEUE_THREAD, 1, 100) +TG_DEF(IO_BENCHMARK, IO_BENCHMARK, THREAD_POOL, 1) +TG_DEF(TIMEZONE_MGR, TimezoneMgr, TIMER) +TG_DEF(MASTER_KEY_MGR, MasterKeyMgr, QUEUE_THREAD, 1, 100) +TG_DEF(SRS_MGR, SrsMgr, TIMER, 128) #endif diff --git a/src/share/ob_thread_mgr.cpp b/src/share/ob_thread_mgr.cpp index 0cb1eb3979..e6699262df 100644 --- a/src/share/ob_thread_mgr.cpp +++ b/src/share/ob_thread_mgr.cpp @@ -31,10 +31,10 @@ namespace share { void ob_init_create_func() { - #define TG_DEF(id, name, desc, scope, type, args...) \ + #define TG_DEF(id, name, type, args...) \ lib::create_funcs_[lib::TGDefIDs::id] = []() { \ - auto ret = OB_NEW(TGCLSMap::CLS, SET_USE_500("tg"), args); \ - ret->attr_ = {#name, desc, TGScope::scope, TGType::type}; \ + auto ret = OB_NEW(TG_##type, SET_USE_500("tg"), args); \ + ret->attr_ = {#name, TGType::type}; \ return ret; \ }; #include "share/ob_thread_define.h" diff --git a/src/share/ob_thread_mgr.h b/src/share/ob_thread_mgr.h index ede342ca34..012e0e3965 100644 --- a/src/share/ob_thread_mgr.h +++ b/src/share/ob_thread_mgr.h @@ -30,103 +30,6 @@ enum OBTGDefIDEnum }; } } // end of namespace lib - -namespace share -{ -using lib::TGType; -using lib::ITG; -template -class ObTG; - -class MyObThreadPool : public share::ObThreadPool -{ -public: - void run1() override - { - runnable_->set_thread_idx(get_thread_idx()); - runnable_->run1(); - } - lib::TGRunnable *runnable_ = nullptr; -}; - -template<> -class ObTG : public ITG -{ -public: - ObTG(lib::ThreadCountPair pair) - : thread_cnt_(pair.get_thread_cnt()) - {} - ~ObTG() { destroy(); } - int thread_cnt() override { return (int)thread_cnt_; } - int set_thread_cnt(int64_t thread_cnt) override - { - int ret = common::OB_SUCCESS; - if (th_ == nullptr) { - ret = common::OB_ERR_UNEXPECTED; - } else { - thread_cnt_ = thread_cnt; - th_->set_thread_count(thread_cnt_); - } - return ret; - } - int set_runnable(lib::TGRunnable &runnable) - { - int ret = common::OB_SUCCESS; - if (th_ != nullptr) { - ret = common::OB_ERR_UNEXPECTED; - } else { - th_ = new (buf_) MyObThreadPool(); - th_->runnable_= &runnable; - } - return ret; - } - int start() override - { - int ret = common::OB_SUCCESS; - if (nullptr == th_) { - ret = common::OB_ERR_UNEXPECTED; - } else if(nullptr == th_->runnable_) { - ret = common::OB_ERR_UNEXPECTED; - } else { - th_->runnable_->set_stop(false); - th_->set_thread_count(thread_cnt_); - th_->set_run_wrapper(tg_helper_, tg_cgroup_); - ret = th_->start(); - } - return ret; - } - void stop() override - { - if (th_ != nullptr) { - th_->runnable_->set_stop(true); - th_->stop(); - } - } - void wait() override - { - if (th_ != nullptr) { - th_->wait(); - destroy(); - } - } - void destroy() - { - if (th_ != nullptr) { - th_->destroy(); - th_->~MyObThreadPool(); - th_ = nullptr; - } - } -private: - char buf_[sizeof(MyObThreadPool)]; - MyObThreadPool *th_ = nullptr; - int64_t thread_cnt_; -}; - -} // end of namespace share - -BIND_TG_CLS(lib::TGType::OB_THREAD_POOL, share::ObTG); - } // end of namespace oceanbase #endif // OB_THREAD_MGR_H_ diff --git a/src/share/rc/ob_tenant_base.cpp b/src/share/rc/ob_tenant_base.cpp index 70783aebba..ff1ce50862 100644 --- a/src/share/rc/ob_tenant_base.cpp +++ b/src/share/rc/ob_tenant_base.cpp @@ -270,10 +270,8 @@ void ObTenantBase::destroy_mtl_module() created_ = false; } -ObCgroupCtrl *ObTenantBase::get_cgroup(lib::ThreadCGroup cgroup) +ObCgroupCtrl *ObTenantBase::get_cgroup() { - // TODO - UNUSED(cgroup); ObCgroupCtrl *cgroup_ctrl = nullptr; cgroup_ctrl = cgroups_; return cgroup_ctrl; @@ -283,7 +281,7 @@ int ObTenantBase::pre_run(lib::Threads *th) { int ret = OB_SUCCESS; ObTenantEnv::set_tenant(this); - ObCgroupCtrl *cgroup_ctrl = get_cgroup(th->get_cgroup()); + ObCgroupCtrl *cgroup_ctrl = get_cgroup(); if (cgroup_ctrl != nullptr) { ret = cgroup_ctrl->add_self_to_cgroup(id_); } @@ -296,7 +294,7 @@ int ObTenantBase::end_run(lib::Threads *th) { int ret = OB_SUCCESS; ObTenantEnv::set_tenant(nullptr); - ObCgroupCtrl *cgroup_ctrl = get_cgroup(th->get_cgroup()); + ObCgroupCtrl *cgroup_ctrl = get_cgroup(); if (cgroup_ctrl != nullptr) { ret = cgroup_ctrl->remove_self_from_cgroup(id_); } diff --git a/src/share/rc/ob_tenant_base.h b/src/share/rc/ob_tenant_base.h index c180464bf2..4f00d8d8bb 100644 --- a/src/share/rc/ob_tenant_base.h +++ b/src/share/rc/ob_tenant_base.h @@ -416,7 +416,7 @@ public: int init(ObCgroupCtrl *cgroup = nullptr); void destroy(); virtual inline uint64_t id() const override { return id_; } - ObCgroupCtrl *get_cgroup(lib::ThreadCGroup cgroup); + ObCgroupCtrl *get_cgroup(); const ObTenantModuleInitCtx *get_mtl_init_ctx() const { return mtl_init_ctx_; } diff --git a/src/share/scheduler/ob_dag_scheduler.cpp b/src/share/scheduler/ob_dag_scheduler.cpp index 1a017d7199..0aea9ac4dd 100644 --- a/src/share/scheduler/ob_dag_scheduler.cpp +++ b/src/share/scheduler/ob_dag_scheduler.cpp @@ -1312,7 +1312,7 @@ int ObTenantDagWorker::init(const int64_t check_period) COMMON_LOG(WARN, "dag worker is inited twice", K(ret)); } else if (OB_FAIL(cond_.init(ObWaitEventIds::DAG_WORKER_COND_WAIT))) { COMMON_LOG(WARN, "failed to init cond", K(ret)); - } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::DagWorker, tg_id_, lib::ThreadCGroup::BACK_CGROUP))) { + } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::DagWorker, tg_id_))) { COMMON_LOG(WARN, "TG create dag worker failed", K(ret)); } else { check_period_ = check_period; @@ -1599,7 +1599,7 @@ int ObTenantDagScheduler::init( get_default_config(); dag_limit_ = dag_limit; - if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::DagScheduler, tg_id_, lib::ThreadCGroup::BACK_CGROUP))) { + if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::DagScheduler, tg_id_))) { COMMON_LOG(WARN, "TG create dag scheduler failed", K(ret)); }