finetune thread_mgr interface

This commit is contained in:
zhjc1124
2023-06-02 07:47:52 +00:00
committed by ob-robot
parent 72b490a340
commit 972185dc81
17 changed files with 248 additions and 468 deletions

View File

@ -11,29 +11,19 @@
*/
#ifdef TG_DEF
// (tg_def_id, name, desc, scope, type, args...)
// (tg_def_id, name, type, args...)
// for test
TG_DEF(TEST1, test1, "", TG_STATIC, TIMER)
TG_DEF(TEST2, test2, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(1 ,1), 10)
TG_DEF(TEST3, test3, "", TG_STATIC, DEDUP_QUEUE, ThreadCountPair(1, 1), 8, 8, 16L << 20, 16L << 20,8192, "test")
TG_DEF(TEST4, test4, "", TG_STATIC, THREAD_POOL, ThreadCountPair(1 ,1))
TG_DEF(TEST5, test5, "", TG_STATIC, ASYNC_TASK_QUEUE, ThreadCountPair(1 ,1), 16)
TG_DEF(TEST6, test6, "", TG_STATIC, TIMER_GROUP, ThreadCountPair(2 ,2))
TG_DEF(TEST7, test7, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(10 ,10), 10)
TG_DEF(TEST8, test8, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(1 ,1))
TG_DEF(TEST1, test1, TIMER)
TG_DEF(TEST2, test2, QUEUE_THREAD, 1, 10)
TG_DEF(TEST3, test3, DEDUP_QUEUE, 1, 8, 8, 16L << 20, 16L << 20,8192, "test")
TG_DEF(TEST4, test4, THREAD_POOL, 1)
TG_DEF(TEST5, test5, ASYNC_TASK_QUEUE, 1, 16)
TG_DEF(TEST6, test6, MAP_QUEUE_THREAD, 2)
TG_DEF(TEST7, test7, QUEUE_THREAD, 10, 10)
TG_DEF(TEST8, test8, REENTRANT_THREAD_POOL, 1)
// other
TG_DEF(MEMORY_DUMP, memDump, "", TG_STATIC, THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(SchemaRefTask, SchemaRefTask, "", TG_STATIC, DEDUP_QUEUE, ThreadCountPair(1, 1), 1024, 1024, 1L << 30, 512L << 20, common::OB_MALLOC_BIG_BLOCK_SIZE, "SchemaDedupQueu")
TG_DEF(CONFIG_MGR, ConfigMgr, "", TG_STATIC, TIMER, 1024)
TG_DEF(ReqMemEvict, ReqMemEvict, "", TG_DYNAMIC, TIMER)
TG_DEF(IO_TUNING, IO_TUNING, "", TG_STATIC, THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(IO_SCHEDULE, IO_SCHEDULE, "", TG_DYNAMIC, THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(IO_CALLBACK, IO_CALLBACK, "", TG_DYNAMIC, THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(IO_CHANNEL, IO_CHANNEL, "", TG_DYNAMIC, THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(IO_HEALTH, IO_HEALTH, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(1 ,1), 100)
TG_DEF(IO_BENCHMARK, IO_BENCHMARK, "", TG_DYNAMIC, THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(replica_control, replica_control, "", TG_STATIC, THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(TIMEZONE_MGR, TimezoneMgr, "", TG_STATIC, TIMER)
TG_DEF(MASTER_KEY_MGR, MasterKeyMgr, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(1 ,1), 100)
TG_DEF(SRS_MGR, SrsMgr, "", TG_STATIC, TIMER, 128)
TG_DEF(MEMORY_DUMP, memDump, THREAD_POOL, 1)
TG_DEF(SchemaRefTask, SchemaRefTask, DEDUP_QUEUE, 1, 1024, 1024, 1L << 30, 512L << 20, common::OB_MALLOC_BIG_BLOCK_SIZE, "SchemaDedupQueu")
TG_DEF(ReqMemEvict, ReqMemEvict, TIMER)
TG_DEF(replica_control, replica_control, THREAD_POOL, 1)
#endif

View File

@ -30,7 +30,7 @@ void set_tenant_tg_helper(TGHelper *tg_helper)
}
// define TGConfig
#define TG_DEF(id, name, desc, scope, type, arg...) const ThreadCountPair TGConfig::id = ThreadCountPair(arg);
#define TG_DEF(id, name, type, arg...) const ThreadCountPair TGConfig::id = ThreadCountPair(arg);
#include "lib/thread/thread_define.h"
#undef TG_DEF
@ -44,10 +44,10 @@ void __attribute__((weak)) init_create_func()
void lib_init_create_func()
{
#define TG_DEF(id, name, desc, scope, type, args...) \
#define TG_DEF(id, name, type, args...) \
create_funcs_[TGDefIDs::id] = []() { \
auto ret = OB_NEW(TGCLSMap<TGType::type>::CLS, SET_USE_500("tg"), args); \
ret->attr_ = {#name, desc, TGScope::scope, TGType::type}; \
auto ret = OB_NEW(TG_##type, SET_USE_500("tg"), args); \
ret->attr_ = {#name, TGType::type}; \
return ret; \
};
#include "lib/thread/thread_define.h"
@ -64,12 +64,6 @@ TGMgr::TGMgr()
for (int i = 0; OB_SUCC(ret) && i < TGDefIDs::END; i++) {
int tg_id = -1;
ret = create_tg(i, tg_id, 0);
if (OB_FAIL(ret)) {
} else if (tg_id < 0) {
// do-nothing
} else {
default_tg_id_map_[i] = tg_id;
}
}
abort_unless(OB_SUCCESS == ret);
}

View File

@ -74,21 +74,12 @@ namespace TGDefIDs {
};
}
enum class TGScope
{
INVALID,
TG_STATIC,
TG_DYNAMIC,
};
enum class TGType
{
INVALID,
REENTRANT_THREAD_POOL,
THREAD_POOL,
OB_THREAD_POOL,
TIMER,
TIMER_GROUP,
QUEUE_THREAD,
DEDUP_QUEUE,
ASYNC_TASK_QUEUE,
@ -99,10 +90,8 @@ class TGCommonAttr
{
public:
const char *name_;
const char *desc_;
enum TGScope scope_;
enum TGType type_;
TO_STRING_KV(KCSTRING_(name), KCSTRING_(desc), K_(scope), K_(type));
TO_STRING_KV(KCSTRING_(name), K_(type));
};
class ITG;
@ -126,7 +115,7 @@ extern void set_tenant_tg_helper(TGHelper *tg_helper);
class ITG
{
public:
ITG() : tg_helper_(nullptr), tg_cgroup_(lib::ThreadCGroup::INVALID_CGROUP) {}
ITG() : tg_helper_(nullptr) {}
virtual ~ITG() {}
int64_t get_tenant_id() const
{ return NULL == tg_helper_ ? common::OB_SERVER_TENANT_ID : tg_helper_->id(); }
@ -206,7 +195,6 @@ public:
UNUSED(qsize);
}
TGHelper *tg_helper_;
lib::ThreadCGroup tg_cgroup_;
TGCommonAttr attr_;
};
@ -229,14 +217,16 @@ public:
TGRunnable *runnable_ = nullptr;
};
template<>
class TG<TGType::REENTRANT_THREAD_POOL> : public ITG
class TG_REENTRANT_THREAD_POOL : public ITG
{
public:
TG(ThreadCountPair pair)
TG_REENTRANT_THREAD_POOL(ThreadCountPair pair)
: thread_cnt_(pair.get_thread_cnt())
{}
~TG() { destroy(); }
TG_REENTRANT_THREAD_POOL(int64_t thread_cnt)
: thread_cnt_(thread_cnt)
{}
~TG_REENTRANT_THREAD_POOL() { destroy(); }
int thread_cnt() override { return (int)thread_cnt_; }
int set_thread_cnt(int64_t thread_cnt)
{
@ -293,7 +283,7 @@ public:
} else if(nullptr == th_->runnable_) {
ret = common::OB_ERR_UNEXPECTED;
} else {
th_->set_run_wrapper(tg_helper_, tg_cgroup_);
th_->set_run_wrapper(tg_helper_);
ret = th_->create(thread_cnt_, attr_.name_);
}
return ret;
@ -337,14 +327,16 @@ public:
TGRunnable *runnable_ = nullptr;
};
template<>
class TG<TGType::THREAD_POOL> : public ITG
class TG_THREAD_POOL : public ITG
{
public:
TG(ThreadCountPair pair)
TG_THREAD_POOL(ThreadCountPair pair)
: thread_cnt_(pair.get_thread_cnt())
{}
~TG() { destroy(); }
TG_THREAD_POOL(int64_t thread_cnt)
: thread_cnt_(thread_cnt)
{}
~TG_THREAD_POOL() { destroy(); }
int thread_cnt() override { return (int)thread_cnt_; }
int set_thread_cnt(int64_t thread_cnt)
{
@ -379,7 +371,7 @@ public:
} else {
th_->runnable_->set_stop(false);
th_->set_thread_count(thread_cnt_);
th_->set_run_wrapper(tg_helper_, tg_cgroup_);
th_->set_run_wrapper(tg_helper_);
ret = th_->start();
}
return ret;
@ -432,15 +424,18 @@ public:
TGTaskHandler *handler_ = nullptr;
};
template<>
class TG<TGType::QUEUE_THREAD> : public ITG
class TG_QUEUE_THREAD : public ITG
{
public:
TG(ThreadCountPair pair, const int64_t task_num_limit)
TG_QUEUE_THREAD(ThreadCountPair pair, const int64_t task_num_limit)
: thread_num_(pair.get_thread_cnt()),
task_num_limit_(task_num_limit)
{}
~TG() { destroy(); }
TG_QUEUE_THREAD(int64_t thread_num, const int64_t task_num_limit)
: thread_num_(thread_num),
task_num_limit_(task_num_limit)
{}
~TG_QUEUE_THREAD() { destroy(); }
int thread_cnt() override { return (int)thread_num_; }
int set_thread_cnt(int64_t thread_cnt) override
{
@ -462,7 +457,7 @@ public:
} else {
qth_ = new (buf_) MySimpleThreadPool();
qth_->handler_ = &handler;
qth_->set_run_wrapper(tg_helper_, tg_cgroup_);
qth_->set_run_wrapper(tg_helper_);
ret = qth_->init(thread_num_, task_num_limit_, attr_.name_, tenant_id);
}
return ret;
@ -549,14 +544,16 @@ public:
TGTaskHandler *handler_ = nullptr;
};
template<>
class TG<TGType::MAP_QUEUE_THREAD> : public ITG
class TG_MAP_QUEUE_THREAD : public ITG
{
public:
TG(ThreadCountPair pair)
TG_MAP_QUEUE_THREAD(ThreadCountPair pair)
: thread_num_(pair.get_thread_cnt())
{}
~TG() { destroy(); }
TG_MAP_QUEUE_THREAD(int64_t thread_num)
: thread_num_(thread_num)
{}
~TG_MAP_QUEUE_THREAD() { destroy(); }
int thread_cnt() override { return (int)thread_num_; }
int set_thread_cnt(int64_t thread_cnt) override
{
@ -578,7 +575,7 @@ public:
} else {
qth_ = new (buf_) MyMapQueueThreadPool();
qth_->handler_ = &handler;
qth_->set_run_wrapper(tg_helper_, tg_cgroup_);
qth_->set_run_wrapper(tg_helper_);
ret = qth_->init(tenant_id, thread_num_, attr_.name_);
}
return ret;
@ -604,6 +601,7 @@ public:
{
if (qth_ != nullptr) {
qth_->wait();
destroy();
}
}
void destroy()
@ -631,11 +629,10 @@ private:
int64_t thread_num_;
};
template<>
class TG<TGType::DEDUP_QUEUE> : public ITG
class TG_DEDUP_QUEUE : public ITG
{
public:
TG(ThreadCountPair pair,
TG_DEDUP_QUEUE(ThreadCountPair pair,
const int64_t queue_size = ObDedupQueue::TASK_QUEUE_SIZE,
const int64_t task_map_size = ObDedupQueue::TASK_MAP_SIZE,
const int64_t total_mem_limit = ObDedupQueue::TOTAL_LIMIT,
@ -650,7 +647,22 @@ public:
page_size_(page_size),
label_(label)
{}
~TG() { destroy(); }
TG_DEDUP_QUEUE(int64_t thread_num,
const int64_t queue_size = ObDedupQueue::TASK_QUEUE_SIZE,
const int64_t task_map_size = ObDedupQueue::TASK_MAP_SIZE,
const int64_t total_mem_limit = ObDedupQueue::TOTAL_LIMIT,
const int64_t hold_mem_limit = ObDedupQueue::HOLD_LIMIT,
const int64_t page_size = ObDedupQueue::PAGE_SIZE,
const char *label = nullptr)
: thread_num_(thread_num),
queue_size_(queue_size),
task_map_size_(task_map_size),
total_mem_limit_(total_mem_limit),
hold_mem_limit_(hold_mem_limit),
page_size_(page_size),
label_(label)
{}
~TG_DEDUP_QUEUE() { destroy(); }
int thread_cnt() override { return (int)thread_num_; }
int set_thread_cnt(int64_t thread_cnt) override
{
@ -670,7 +682,7 @@ public:
ret = common::OB_ERR_UNEXPECTED;
} else {
qth_ = new (buf_) common::ObDedupQueue();
qth_->set_run_wrapper(tg_helper_, tg_cgroup_);
qth_->set_run_wrapper(tg_helper_);
if (OB_FAIL(qth_->init(thread_num_,
attr_.name_,
queue_size_,
@ -734,14 +746,13 @@ private:
const char *label_;
};
template<>
class TG<TGType::TIMER> : public ITG
class TG_TIMER : public ITG
{
public:
TG(int64_t max_task_num = 32)
TG_TIMER(int64_t max_task_num = 32)
: max_task_num_(max_task_num)
{}
~TG() { destroy(); }
~TG_TIMER() { destroy(); }
int thread_cnt() override { return 1; }
int set_thread_cnt(int64_t thread_cnt) override
{
@ -756,7 +767,7 @@ public:
ret = common::OB_ERR_UNEXPECTED;
} else {
timer_ = new (buf_) common::ObTimer(max_task_num_);
timer_->set_run_wrapper(tg_helper_, tg_cgroup_);
timer_->set_run_wrapper(tg_helper_);
if (OB_FAIL(timer_->init(attr_.name_,
ObMemAttr(get_tenant_id(), "TGTimer")))) {
OB_LOG(WARN, "init failed", K(ret));
@ -832,15 +843,18 @@ private:
int64_t max_task_num_;
};
template<>
class TG<TGType::ASYNC_TASK_QUEUE> : public ITG
class TG_ASYNC_TASK_QUEUE : public ITG
{
public:
TG(lib::ThreadCountPair pair, const int64_t queue_size)
TG_ASYNC_TASK_QUEUE(lib::ThreadCountPair pair, const int64_t queue_size)
: thread_cnt_(pair.get_thread_cnt()),
queue_size_(queue_size)
{}
~TG() { destroy(); }
TG_ASYNC_TASK_QUEUE(int64_t thread_cnt, const int64_t queue_size)
: thread_cnt_(thread_cnt),
queue_size_(queue_size)
{}
~TG_ASYNC_TASK_QUEUE() { destroy(); }
int thread_cnt() override { return (int)thread_cnt_; }
int set_thread_cnt(int64_t thread_cnt) override
{
@ -860,7 +874,7 @@ public:
ret = common::OB_ERR_UNEXPECTED;
} else {
qth_ = new (buf_) share::ObAsyncTaskQueue();
qth_->set_run_wrapper(tg_helper_, tg_cgroup_);
qth_->set_run_wrapper(tg_helper_);
if (OB_FAIL(qth_->init(thread_cnt_,
queue_size_,
attr_.name_))) {
@ -908,123 +922,8 @@ private:
int64_t queue_size_;
};
template<>
class TG<TGType::TIMER_GROUP> : public ITG
{
static constexpr int MAX_CNT = 32;
using TimerType = TG<TGType::TIMER>;
public:
TG(ThreadCountPair pair)
: cnt_(pair.get_thread_cnt())
{}
~TG() { destroy(); }
int thread_cnt() override { return (int)cnt_; }
int set_thread_cnt(int64_t thread_cnt)
{
UNUSED(thread_cnt);
int ret = common::OB_ERR_UNEXPECTED;
return ret;
}
int start() override
{
int ret = common::OB_SUCCESS;
if (is_inited_) {
ret = common::OB_ERR_UNEXPECTED;
} else {
if (cnt_ >= MAX_CNT) {
ret = common::OB_ERR_UNEXPECTED;
}
for (int i = 0; OB_SUCC(ret) && i < cnt_; i++) {
auto *tmp = new (buf_ + sizeof(TimerType) * i) TimerType();
tmp->attr_ = attr_;
if (OB_FAIL(tmp->start())) {
} else {
timers_[i] = tmp;
}
}
is_inited_ = true;
}
return ret;
}
void stop() override
{
if (is_inited_) {
for (int i = 0; i < cnt_; i++) {
if (timers_[i] != nullptr) {
timers_[i]->stop();
}
}
}
}
void wait() override
{
if (is_inited_) {
for (int i = 0; i < cnt_; i++) {
if (timers_[i] != nullptr) {
timers_[i]->wait();
}
}
destroy();
}
}
int schedule(int idx, common::ObTimerTask &task, const int64_t delay,
bool repeate = false, bool immediate = false) override
{
int ret = common::OB_SUCCESS;
if (!is_inited_) {
ret = common::OB_ERR_UNEXPECTED;
} else if (idx >= cnt_) {
ret = common::OB_INVALID_ARGUMENT;
} else if (OB_ISNULL(timers_[idx])) {
ret = common::OB_ERR_UNEXPECTED;
} else {
ret = timers_[idx]->schedule(task, delay, repeate, immediate);
}
return ret;
}
void destroy()
{
if (is_inited_) {
for (int i = 0; i < cnt_; i++) {
if (timers_[i] != nullptr) {
timers_[i]->~TimerType();
timers_[i] = nullptr;
}
}
is_inited_ = false;
}
}
private:
char buf_[sizeof(TimerType) * MAX_CNT];
TimerType *timers_[MAX_CNT] = {nullptr};
const int64_t cnt_ = 0;
bool is_inited_ = false;
};
template<enum TGType type>
class TGCLSMap;
#define BIND_TG_CLS(type, CLS_) \
namespace lib \
{ \
template<> \
class TGCLSMap<type> \
{ \
public: \
using CLS = CLS_; \
}; \
}
} // end of namespace lib
BIND_TG_CLS(TGType::REENTRANT_THREAD_POOL, TG<TGType::REENTRANT_THREAD_POOL>);
BIND_TG_CLS(TGType::THREAD_POOL, TG<TGType::THREAD_POOL>);
BIND_TG_CLS(TGType::QUEUE_THREAD, TG<TGType::QUEUE_THREAD>);
BIND_TG_CLS(TGType::DEDUP_QUEUE, TG<TGType::DEDUP_QUEUE>);
BIND_TG_CLS(TGType::TIMER, TG<TGType::TIMER>);
BIND_TG_CLS(TGType::TIMER_GROUP, TG<TGType::TIMER_GROUP>);
BIND_TG_CLS(TGType::ASYNC_TASK_QUEUE, TG<TGType::ASYNC_TASK_QUEUE>);
BIND_TG_CLS(TGType::MAP_QUEUE_THREAD, TG<TGType::MAP_QUEUE_THREAD>);
namespace lib {
class TGMgr
@ -1071,8 +970,7 @@ public:
// tenant isolation
int create_tg_tenant(int tg_def_id,
int &tg_id,
int64_t qsize = 0,
lib::ThreadCGroup cgroup = lib::ThreadCGroup::FRONT_CGROUP)
int64_t qsize = 0)
{
tg_id = -1;
int ret = common::OB_SUCCESS;
@ -1097,7 +995,6 @@ public:
OB_LOG(WARN, "create tg tenant but tenant tg helper is null", K(tg_def_id), K(tg_id));
} else {
tg->tg_helper_ = tg_helper;
tg->tg_cgroup_ = cgroup;
tg_helper->tg_create_cb(tg_id);
}
tg->set_queue_size(qsize);
@ -1118,7 +1015,6 @@ private:
char bs_buf_[ABitSet::buf_len(MAX_ID)];
public:
ITG *tgs_[MAX_ID] = {nullptr};
int default_tg_id_map_[TGDefIDs::END] = {-1};
};
template<typename Func, bool return_void=false>
@ -1149,8 +1045,7 @@ public:
OB_LOG(ERROR, "invalid tg id"); \
} else { \
lib::ITG *tg = \
TG_MGR.tgs_[static_cast<int>(tg_id) < static_cast<int>(lib::TGDefIDs::END) ? \
TG_MGR.default_tg_id_map_[tg_id] : tg_id]; \
TG_MGR.tgs_[tg_id]; \
if (OB_ISNULL(tg)) { \
ret = common::OB_ERR_UNEXPECTED; \
} else { \
@ -1221,7 +1116,7 @@ public:
tg_type = tg->attr_.type_; \
} \
if (TGType::REENTRANT_THREAD_POOL == tg_type) { \
TG<TGType::REENTRANT_THREAD_POOL>* tmp_tg = static_cast<TG<TGType::REENTRANT_THREAD_POOL>*>(tg); \
TG_REENTRANT_THREAD_POOL* tmp_tg = static_cast<TG_REENTRANT_THREAD_POOL*>(tg); \
ret = tmp_tg->logical_start(); \
} else { \
ret = common::OB_ERR_UNEXPECTED; \
@ -1239,7 +1134,7 @@ public:
tg_type = tg->attr_.type_; \
} \
if (TGType::REENTRANT_THREAD_POOL == tg_type) { \
TG<TGType::REENTRANT_THREAD_POOL>* tmp_tg = static_cast<TG<TGType::REENTRANT_THREAD_POOL>*>(tg); \
TG_REENTRANT_THREAD_POOL* tmp_tg = static_cast<TG_REENTRANT_THREAD_POOL*>(tg); \
tmp_tg->logical_stop(); \
} else { \
ret = common::OB_ERR_UNEXPECTED; \
@ -1256,7 +1151,7 @@ public:
tg_type = tg->attr_.type_; \
} \
if (TGType::REENTRANT_THREAD_POOL == tg_type) { \
TG<TGType::REENTRANT_THREAD_POOL>* tmp_tg = static_cast<TG<TGType::REENTRANT_THREAD_POOL>*>(tg); \
TG_REENTRANT_THREAD_POOL* tmp_tg = static_cast<TG_REENTRANT_THREAD_POOL*>(tg); \
tmp_tg->logical_wait(); \
} else { \
ret = common::OB_ERR_UNEXPECTED; \

View File

@ -25,13 +25,6 @@ extern int64_t global_thread_stack_size;
namespace oceanbase {
namespace lib {
enum ThreadCGroup
{
INVALID_CGROUP = 0,
FRONT_CGROUP = 1,
BACK_CGROUP = 2,
};
class Threads;
class IRunWrapper
{
@ -59,8 +52,7 @@ public:
threads_(nullptr),
stack_size_(global_thread_stack_size),
stop_(true),
run_wrapper_(nullptr),
cgroup_(INVALID_CGROUP)
run_wrapper_(nullptr)
{}
virtual ~Threads();
static IRunWrapper *&get_expect_run_wrapper();
@ -87,10 +79,9 @@ public:
int init();
// IRunWrapper 用于创建多租户线程时指定租户上下文
// cgroup_ctrl 和IRunWrapper配合使用,实现多租户线程的CPU隔离
void set_run_wrapper(IRunWrapper *run_wrapper, ThreadCGroup cgroup = ThreadCGroup::FRONT_CGROUP)
void set_run_wrapper(IRunWrapper *run_wrapper)
{
run_wrapper_ = run_wrapper;
cgroup_ = cgroup;
}
virtual int start();
virtual void stop();
@ -105,7 +96,6 @@ public:
int ret = OB_SUCCESS;
return ret;
}
ThreadCGroup get_cgroup() { return cgroup_; }
virtual bool has_set_stop() const
{
IGNORE_RETURN lib::Thread::update_loop_ts();
@ -143,9 +133,6 @@ private:
common::SpinRWLock lock_ __attribute__((__aligned__(16)));
// tenant ctx
IRunWrapper *run_wrapper_;
// thread cgroups
ThreadCGroup cgroup_;
//
};
using ThreadPool = Threads;

View File

@ -271,46 +271,49 @@ TEST(TG, async_task_queue)
ASSERT_FALSE(TG_EXIST(tg_id));
}
TEST(TG, timer_group)
class MapQueueThreadHandler : public TGTaskHandler
{
public:
void handle(void *task) override
{}
void handle(void *task, volatile bool &is_stopped) override
{
UNUSED(task);
UNUSED(is_stopped);
++handle_count_;
::usleep(50000);
}
int64_t handle_count_ = 0;
};
TEST(TG, map_queue_thread)
{
int tg_id = TGDefIDs::TEST6;
TestTimerTask tasks[2];
MapQueueThreadHandler handler;
// start
ASSERT_EQ(OB_SUCCESS, TG_SET_HANDLER(tg_id, handler));
ASSERT_EQ(OB_SUCCESS, TG_START(tg_id));
for (int i = 0; i < ARRAYSIZEOF(tasks); i++) {
ASSERT_EQ(OB_SUCCESS, TG_SCHEDULE(tg_id, i, tasks[i], 0, true));
}
::usleep(40000);
for (int i = 0; i < ARRAYSIZEOF(tasks); i++) {
ASSERT_TRUE(tasks[i].running_);
}
::usleep(60000);
for (int i = 0; i < ARRAYSIZEOF(tasks); i++) {
ASSERT_EQ(1, tasks[i].task_run_count_);
}
ASSERT_EQ(OB_SUCCESS, TG_PUSH_TASK(tg_id, &tg_id, 0));
::usleep(50000);
ASSERT_EQ(OB_SUCCESS, TG_STOP_R(tg_id));
ASSERT_EQ(OB_SUCCESS, TG_WAIT_R(tg_id));
ASSERT_EQ(1, handler.handle_count_);
// restart
ASSERT_EQ(OB_SUCCESS, TG_SET_HANDLER(tg_id, handler));
ASSERT_EQ(OB_SUCCESS, TG_START(tg_id));
for (int i = 0; i < ARRAYSIZEOF(tasks); i++) {
ASSERT_EQ(OB_SUCCESS, TG_SCHEDULE(tg_id, i, tasks[i], 0, true));
}
::usleep(40000);
for (int i = 0; i < ARRAYSIZEOF(tasks); i++) {
ASSERT_TRUE(tasks[i].running_);
}
::usleep(60000);
for (int i = 0; i < ARRAYSIZEOF(tasks); i++) {
ASSERT_EQ(2, tasks[i].task_run_count_);
}
ASSERT_EQ(OB_SUCCESS, TG_PUSH_TASK(tg_id, &tg_id, 1));
::usleep(50000);
ASSERT_EQ(OB_SUCCESS, TG_STOP_R(tg_id));
ASSERT_EQ(OB_SUCCESS, TG_WAIT_R(tg_id));
ASSERT_EQ(2, handler.handle_count_);
ASSERT_TRUE(TG_EXIST(tg_id));
TG_DESTROY(tg_id);
ASSERT_FALSE(TG_EXIST(tg_id));
}
int main(int argc, char *argv[])
{
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");

View File

@ -142,7 +142,7 @@ void ObArchiveFetcher::destroy()
int ObArchiveFetcher::start()
{
int ret = OB_SUCCESS;
ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP);
ObThreadPool::set_run_wrapper(MTL_CTX());
if (OB_UNLIKELY(! inited_)) {
ARCHIVE_LOG(ERROR, "ObArchiveFetcher not init");
ret = OB_NOT_INIT;

View File

@ -116,7 +116,7 @@ void ObArchiveSender::destroy()
int ObArchiveSender::start()
{
int ret = OB_SUCCESS;
ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP);
ObThreadPool::set_run_wrapper(MTL_CTX());
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
ARCHIVE_LOG(INFO, "ObArchiveSender has not been initialized", KR(ret));

View File

@ -90,7 +90,7 @@ void ObArchiveSequencer::destroy()
int ObArchiveSequencer::start()
{
int ret = OB_SUCCESS;
ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP);
ObThreadPool::set_run_wrapper(MTL_CTX());
if (OB_UNLIKELY(! inited_)) {
ARCHIVE_LOG(ERROR, "ObArchiveSequencer not init");
ret = OB_NOT_INIT;

View File

@ -109,7 +109,7 @@ int ObArchiveService::init(logservice::ObLogService *log_service,
int ObArchiveService::start()
{
int ret = OB_SUCCESS;
ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP);
ObThreadPool::set_run_wrapper(MTL_CTX());
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
ARCHIVE_LOG(ERROR, "archive service not init", K(ret));

View File

@ -73,7 +73,7 @@ void ObArchiveTimer::destroy()
int ObArchiveTimer::start()
{
int ret = OB_SUCCESS;
ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP);
ObThreadPool::set_run_wrapper(MTL_CTX());
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
ARCHIVE_LOG(ERROR, "ObArchiveTimer not init", K(ret), K(inited_), K(tenant_id_));

View File

@ -183,7 +183,7 @@ int ObArchiveLSMgr::init(const uint64_t tenant_id,
int ObArchiveLSMgr::start()
{
int ret = OB_SUCCESS;
ObThreadPool::set_run_wrapper(MTL_CTX(), lib::ThreadCGroup::BACK_CGROUP);
ObThreadPool::set_run_wrapper(MTL_CTX());
if (OB_UNLIKELY(! inited_)) {
ARCHIVE_LOG(ERROR, "ObArchiveLSMgr not init");
ret = OB_NOT_INIT;

View File

@ -14,128 +14,138 @@
#define GET_THREAD_NUM_BY_NPROCESSORS(factor) (sysconf(_SC_NPROCESSORS_ONLN) / (factor) > 0 ? sysconf(_SC_NPROCESSORS_ONLN) / (factor) : 1)
#define GET_THREAD_NUM_BY_NPROCESSORS_WITH_LIMIT(factor, limit) (sysconf(_SC_NPROCESSORS_ONLN) / (factor) > 0 ? min(sysconf(_SC_NPROCESSORS_ONLN) / (factor), limit) : 1)
#define GET_MYSQL_THREAD_COUNT(default_cnt) (GCONF.sql_login_thread_count ? GCONF.sql_login_thread_count : (default_cnt))
TG_DEF(TEST_OB_TH, testObTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1 ,1))
TG_DEF(COMMON_THREAD_POOL, ComTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1 ,1))
TG_DEF(COMMON_QUEUE_THREAD, ComQueueTh, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(1 ,1), 100)
TG_DEF(COMMON_TIMER_THREAD, ComTimerTh, "", TG_STATIC, TIMER)
TG_DEF(Blacklist, Blacklist, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(PartSerMigRetryQt, PartSerMigRetryQt, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
// TG_DEF(PartSerCb, PartSerCb, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(storage::ObCallbackQueueThread::QUEUE_THREAD_NUM, storage::ObCallbackQueueThread::MINI_MODE_QUEUE_THREAD_NUM),
TG_DEF(TEST_OB_TH, testObTh, THREAD_POOL, 1)
TG_DEF(COMMON_THREAD_POOL, ComTh, THREAD_POOL, 1)
TG_DEF(COMMON_QUEUE_THREAD, ComQueueTh, QUEUE_THREAD, 1, 100)
TG_DEF(COMMON_TIMER_THREAD, ComTimerTh, TIMER)
TG_DEF(Blacklist, Blacklist, THREAD_POOL, 1)
TG_DEF(PartSerMigRetryQt, PartSerMigRetryQt, THREAD_POOL, 1)
// TG_DEF(PartSerCb, PartSerCb, QUEUE_THREAD, ThreadCountPair(storage::ObCallbackQueueThread::QUEUE_THREAD_NUM, storage::ObCallbackQueueThread::MINI_MODE_QUEUE_THREAD_NUM),
// (!lib::is_mini_mode() ? OB_MAX_PARTITION_NUM_PER_SERVER : OB_MINI_MODE_MAX_PARTITION_NUM_PER_SERVER) * 2)
// TG_DEF(PartSerLargeCb, PartSerLargeCb, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(storage::ObCallbackQueueThread::QUEUE_THREAD_NUM, storage::ObCallbackQueueThread::MINI_MODE_QUEUE_THREAD_NUM),
// TG_DEF(PartSerLargeCb, PartSerLargeCb, QUEUE_THREAD, ThreadCountPair(storage::ObCallbackQueueThread::QUEUE_THREAD_NUM, storage::ObCallbackQueueThread::MINI_MODE_QUEUE_THREAD_NUM),
// (!lib::is_mini_mode() ? OB_MAX_PARTITION_NUM_PER_SERVER : OB_MINI_MODE_MAX_PARTITION_NUM_PER_SERVER) * 2)
TG_DEF(ReplayEngine, ReplayEngine, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(sysconf(_SC_NPROCESSORS_ONLN), 2),
TG_DEF(ReplayEngine, ReplayEngine, QUEUE_THREAD, ThreadCountPair(sysconf(_SC_NPROCESSORS_ONLN), 2),
!lib::is_mini_mode() ? (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MAX_PARTITION_NUM_PER_SERVER : (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MINI_MODE_MAX_PARTITION_NUM_PER_SERVER)
TG_DEF(TransMigrate, TransMigrate, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(24), 1), 10000)
TG_DEF(StandbyTimestampService, StandbyTimestampService, "", TG_DYNAMIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(WeakReadService, WeakRdSrv, "", TG_DYNAMIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(TransTaskWork, TransTaskWork, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(12), 1), transaction::ObThreadLocalTransCtx::MAX_BIG_TRANS_TASK)
TG_DEF(DDLTaskExecutor3, DDLTaskExecutor3, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(8, 2))
TG_DEF(TSWorker, TSWorker, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(12), 1), transaction::ObTsWorker::MAX_TASK_NUM)
TG_DEF(BRPC, BRPC, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(obrpc::ObBatchRpc::MAX_THREAD_COUNT, obrpc::ObBatchRpc::MINI_MODE_THREAD_COUNT))
TG_DEF(RLMGR, RLMGR, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(LeaseQueueTh, LeaseQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::LEASE_TASK_THREAD_CNT, observer::ObSrvDeliver::MINI_MODE_LEASE_TASK_THREAD_CNT))
TG_DEF(DDLQueueTh, DDLQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::DDL_TASK_THREAD_CNT, observer::ObSrvDeliver::DDL_TASK_THREAD_CNT))
TG_DEF(MysqlQueueTh, MysqlQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(GET_MYSQL_THREAD_COUNT(observer::ObSrvDeliver::MYSQL_TASK_THREAD_CNT), GET_MYSQL_THREAD_COUNT(observer::ObSrvDeliver::MINI_MODE_MYSQL_TASK_THREAD_CNT)))
TG_DEF(DDLPQueueTh, DDLPQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS_WITH_LIMIT(2, 24), 2))
TG_DEF(DiagnoseQueueTh, DiagnoseQueueTh, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::MYSQL_DIAG_TASK_THREAD_CNT, observer::ObSrvDeliver::MINI_MODE_MYSQL_DIAG_TASK_THREAD_CNT))
TG_DEF(DdlBuild, DdlBuild, "", TG_STATIC, ASYNC_TASK_QUEUE, ThreadCountPair(16, 1), 4 << 10)
TG_DEF(LSService, LSService, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(2 ,2))
TG_DEF(ObCreateStandbyFromNetActor, ObCreateStandbyFromNetActor, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(SimpleLSService, SimpleLSService, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(1 ,1))
TG_DEF(IntermResGC, IntermResGC, "", TG_STATIC, TIMER)
TG_DEF(ServerGTimer, ServerGTimer, "", TG_STATIC, TIMER)
TG_DEF(FreezeTimer, FreezeTimer, "", TG_STATIC, TIMER)
TG_DEF(SqlMemTimer, SqlMemTimer, "", TG_STATIC, TIMER)
TG_DEF(ServerTracerTimer, ServerTracerTimer, "", TG_STATIC, TIMER)
TG_DEF(RSqlPool, RSqlPool, "", TG_STATIC, TIMER)
TG_DEF(KVCacheWash, KVCacheWash, "", TG_STATIC, TIMER)
TG_DEF(KVCacheRep, KVCacheRep, "", TG_STATIC, TIMER)
TG_DEF(ObHeartbeat, ObHeartbeat, "", TG_STATIC, TIMER)
TG_DEF(PlanCacheEvict, PlanCacheEvict, "", TG_DYNAMIC, TIMER)
TG_DEF(TabletStatRpt, TabletStatRpt, "", TG_STATIC, TIMER)
TG_DEF(PsCacheEvict, PsCacheEvict, "", TG_DYNAMIC, TIMER)
TG_DEF(MergeLoop, MergeLoop, "", TG_STATIC, TIMER)
TG_DEF(SSTableGC, SSTableGC, "", TG_STATIC, TIMER)
TG_DEF(MediumLoop, MediumLoop, "", TG_STATIC, TIMER)
TG_DEF(WriteCkpt, WriteCkpt, "", TG_STATIC, TIMER)
TG_DEF(EXTLogWash, EXTLogWash, "", TG_STATIC, TIMER)
TG_DEF(LineCache, LineCache, "", TG_STATIC, TIMER)
TG_DEF(LocalityReload, LocalityReload, "", TG_STATIC, TIMER)
TG_DEF(MemstoreGC, MemstoreGC, "", TG_STATIC, TIMER)
TG_DEF(DiskUseReport, DiskUseReport, "", TG_STATIC, TIMER)
TG_DEF(CLOGReqMinor, CLOGReqMinor, "", TG_STATIC, TIMER)
TG_DEF(PGArchiveLog, PGArchiveLog, "", TG_STATIC, TIMER)
TG_DEF(CKPTLogRep, CKPTLogRep, "", TG_STATIC, TIMER)
TG_DEF(RebuildRetry, RebuildRetry, "", TG_STATIC, TIMER)
TG_DEF(TableMgrGC, TableMgrGC, "", TG_STATIC, TIMER)
TG_DEF(IndexSche, IndexSche, "", TG_STATIC, TIMER)
TG_DEF(FreInfoReload, FreInfoReload, "", TG_DYNAMIC, TIMER)
TG_DEF(HAGtsMgr, HAGtsMgr, "", TG_STATIC, TIMER)
TG_DEF(HAGtsHB, HAGtsHB, "", TG_STATIC, TIMER)
TG_DEF(RebuildTask, RebuildTask, "", TG_STATIC, TIMER)
TG_DEF(LogDiskMon, LogDiskMon, "", TG_DYNAMIC, TIMER)
TG_DEF(ILOGFlush, ILOGFlush, "", TG_STATIC, TIMER)
TG_DEF(ILOGPurge, ILOGPurge, "", TG_STATIC, TIMER)
TG_DEF(RLogClrCache, RLogClrCache, "", TG_STATIC, TIMER)
TG_DEF(TableStatRpt, TableStatRpt, "", TG_STATIC, TIMER)
TG_DEF(MacroMetaMgr, MacroMetaMgr, "", TG_STATIC, TIMER)
TG_DEF(StoreFileGC, StoreFileGC, "", TG_STATIC, TIMER)
TG_DEF(LeaseHB, LeaseHB, "", TG_STATIC, TIMER)
TG_DEF(ClusterTimer, ClusterTimer, "", TG_STATIC, TIMER)
TG_DEF(MergeTimer, MergeTimer, "", TG_STATIC, TIMER)
TG_DEF(CFC, CFC, "", TG_STATIC, TIMER)
TG_DEF(CCDF, CCDF, "", TG_STATIC, TIMER)
TG_DEF(LogMysqlPool, LogMysqlPool, "", TG_STATIC, TIMER)
TG_DEF(TblCliSqlPool, TblCliSqlPool, "", TG_STATIC, TIMER)
TG_DEF(QueryExecCtxGC, QueryExecCtxGC, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(DtlDfc, DtlDfc, "", TG_STATIC, TIMER)
TG_DEF(LogIOTaskCbThreadPool, LogIOCb, "", TG_STATIC, QUEUE_THREAD,
TG_DEF(TransMigrate, TransMigrate, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(24), 1), 10000)
TG_DEF(StandbyTimestampService, StandbyTimestampService, THREAD_POOL, 1)
TG_DEF(WeakReadService, WeakRdSrv, THREAD_POOL, 1)
TG_DEF(TransTaskWork, TransTaskWork, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(12), 1), transaction::ObThreadLocalTransCtx::MAX_BIG_TRANS_TASK)
TG_DEF(DDLTaskExecutor3, DDLTaskExecutor3, THREAD_POOL, ThreadCountPair(8, 2))
TG_DEF(TSWorker, TSWorker, QUEUE_THREAD, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS(12), 1), transaction::ObTsWorker::MAX_TASK_NUM)
TG_DEF(BRPC, BRPC, THREAD_POOL, ThreadCountPair(obrpc::ObBatchRpc::MAX_THREAD_COUNT, obrpc::ObBatchRpc::MINI_MODE_THREAD_COUNT))
TG_DEF(RLMGR, RLMGR, THREAD_POOL, 1)
TG_DEF(LeaseQueueTh, LeaseQueueTh, THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::LEASE_TASK_THREAD_CNT, observer::ObSrvDeliver::MINI_MODE_LEASE_TASK_THREAD_CNT))
TG_DEF(DDLQueueTh, DDLQueueTh, THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::DDL_TASK_THREAD_CNT, observer::ObSrvDeliver::DDL_TASK_THREAD_CNT))
TG_DEF(MysqlQueueTh, MysqlQueueTh, THREAD_POOL, ThreadCountPair(GET_MYSQL_THREAD_COUNT(observer::ObSrvDeliver::MYSQL_TASK_THREAD_CNT), GET_MYSQL_THREAD_COUNT(observer::ObSrvDeliver::MINI_MODE_MYSQL_TASK_THREAD_CNT)))
TG_DEF(DDLPQueueTh, DDLPQueueTh, THREAD_POOL, ThreadCountPair(GET_THREAD_NUM_BY_NPROCESSORS_WITH_LIMIT(2, 24), 2))
TG_DEF(DiagnoseQueueTh, DiagnoseQueueTh, THREAD_POOL, ThreadCountPair(observer::ObSrvDeliver::MYSQL_DIAG_TASK_THREAD_CNT, observer::ObSrvDeliver::MINI_MODE_MYSQL_DIAG_TASK_THREAD_CNT))
TG_DEF(DdlBuild, DdlBuild, ASYNC_TASK_QUEUE, ThreadCountPair(16, 1), 4 << 10)
TG_DEF(LSService, LSService, REENTRANT_THREAD_POOL, 2)
TG_DEF(ObCreateStandbyFromNetActor, ObCreateStandbyFromNetActor, REENTRANT_THREAD_POOL, 1)
TG_DEF(SimpleLSService, SimpleLSService, REENTRANT_THREAD_POOL, 1)
TG_DEF(IntermResGC, IntermResGC, TIMER)
TG_DEF(ServerGTimer, ServerGTimer, TIMER)
TG_DEF(FreezeTimer, FreezeTimer, TIMER)
TG_DEF(SqlMemTimer, SqlMemTimer, TIMER)
TG_DEF(ServerTracerTimer, ServerTracerTimer, TIMER)
TG_DEF(RSqlPool, RSqlPool, TIMER)
TG_DEF(KVCacheWash, KVCacheWash, TIMER)
TG_DEF(KVCacheRep, KVCacheRep, TIMER)
TG_DEF(ObHeartbeat, ObHeartbeat, TIMER)
TG_DEF(PlanCacheEvict, PlanCacheEvict, TIMER)
TG_DEF(TabletStatRpt, TabletStatRpt, TIMER)
TG_DEF(PsCacheEvict, PsCacheEvict, TIMER)
TG_DEF(MergeLoop, MergeLoop, TIMER)
TG_DEF(SSTableGC, SSTableGC, TIMER)
TG_DEF(MediumLoop, MediumLoop, TIMER)
TG_DEF(WriteCkpt, WriteCkpt, TIMER)
TG_DEF(EXTLogWash, EXTLogWash, TIMER)
TG_DEF(LineCache, LineCache, TIMER)
TG_DEF(LocalityReload, LocalityReload, TIMER)
TG_DEF(MemstoreGC, MemstoreGC, TIMER)
TG_DEF(DiskUseReport, DiskUseReport, TIMER)
TG_DEF(CLOGReqMinor, CLOGReqMinor, TIMER)
TG_DEF(PGArchiveLog, PGArchiveLog, TIMER)
TG_DEF(CKPTLogRep, CKPTLogRep, TIMER)
TG_DEF(RebuildRetry, RebuildRetry, TIMER)
TG_DEF(TableMgrGC, TableMgrGC, TIMER)
TG_DEF(IndexSche, IndexSche, TIMER)
TG_DEF(FreInfoReload, FreInfoReload, TIMER)
TG_DEF(HAGtsMgr, HAGtsMgr, TIMER)
TG_DEF(HAGtsHB, HAGtsHB, TIMER)
TG_DEF(RebuildTask, RebuildTask, TIMER)
TG_DEF(LogDiskMon, LogDiskMon, TIMER)
TG_DEF(ILOGFlush, ILOGFlush, TIMER)
TG_DEF(ILOGPurge, ILOGPurge, TIMER)
TG_DEF(RLogClrCache, RLogClrCache, TIMER)
TG_DEF(TableStatRpt, TableStatRpt, TIMER)
TG_DEF(MacroMetaMgr, MacroMetaMgr, TIMER)
TG_DEF(StoreFileGC, StoreFileGC, TIMER)
TG_DEF(LeaseHB, LeaseHB, TIMER)
TG_DEF(ClusterTimer, ClusterTimer, TIMER)
TG_DEF(MergeTimer, MergeTimer, TIMER)
TG_DEF(CFC, CFC, TIMER)
TG_DEF(CCDF, CCDF, TIMER)
TG_DEF(LogMysqlPool, LogMysqlPool, TIMER)
TG_DEF(TblCliSqlPool, TblCliSqlPool, TIMER)
TG_DEF(QueryExecCtxGC, QueryExecCtxGC, THREAD_POOL, 1)
TG_DEF(DtlDfc, DtlDfc, TIMER)
TG_DEF(LogIOTaskCbThreadPool, LogIOCb, QUEUE_THREAD,
ThreadCountPair(palf::LogIOTaskCbThreadPool::THREAD_NUM,
palf::LogIOTaskCbThreadPool::MINI_MODE_THREAD_NUM),
palf::LogIOTaskCbThreadPool::MAX_LOG_IO_CB_TASK_NUM)
TG_DEF(ReplayService, ReplaySrv, "", TG_DYNAMIC, QUEUE_THREAD, ThreadCountPair(1, 1),
TG_DEF(ReplayService, ReplaySrv, QUEUE_THREAD, 1,
!lib::is_mini_mode() ? (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER : (common::REPLAY_TASK_QUEUE_SIZE + 1) * OB_MINI_MODE_MAX_LS_NUM_PER_TENANT_PER_SERVER)
TG_DEF(LogRouteService, LogRouteSrv, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(1, 1),
TG_DEF(LogRouteService, LogRouteSrv, QUEUE_THREAD, 1,
!lib::is_mini_mode() ? (common::MAX_SERVER_COUNT) * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER : (common::MAX_SERVER_COUNT) * OB_MINI_MODE_MAX_LS_NUM_PER_TENANT_PER_SERVER)
TG_DEF(LogRouterTimer, LogRouterTimer, "", TG_STATIC, TIMER)
TG_DEF(LogFetcherLSWorker, LSWorker, "", TG_STATIC, MAP_QUEUE_THREAD, ThreadCountPair(4, 1))
TG_DEF(LogFetcherIdlePool, LSIdlePool, "", TG_STATIC, MAP_QUEUE_THREAD, ThreadCountPair(1, 1))
TG_DEF(LogFetcherDeadPool, LSDeadPool, "", TG_STATIC, MAP_QUEUE_THREAD, ThreadCountPair(1, 1))
TG_DEF(LogFetcherTimer, LSTimer, "", TG_STATIC, TIMER)
TG_DEF(PalfBlockGC, PalfGC, "", TG_DYNAMIC, TIMER)
TG_DEF(LSFreeze, LSFreeze, "", TG_STATIC, QUEUE_THREAD, ThreadCountPair(storage::ObLSFreezeThread::QUEUE_THREAD_NUM, storage::ObLSFreezeThread::MINI_MODE_QUEUE_THREAD_NUM),
TG_DEF(LogRouterTimer, LogRouterTimer, TIMER)
TG_DEF(LogFetcherLSWorker, LSWorker, MAP_QUEUE_THREAD, ThreadCountPair(4, 1))
TG_DEF(LogFetcherIdlePool, LSIdlePool, MAP_QUEUE_THREAD, 1)
TG_DEF(LogFetcherDeadPool, LSDeadPool, MAP_QUEUE_THREAD, 1)
TG_DEF(LogFetcherTimer, LSTimer, TIMER)
TG_DEF(PalfBlockGC, PalfGC, TIMER)
TG_DEF(LSFreeze, LSFreeze, QUEUE_THREAD, ThreadCountPair(storage::ObLSFreezeThread::QUEUE_THREAD_NUM, storage::ObLSFreezeThread::MINI_MODE_QUEUE_THREAD_NUM),
storage::ObLSFreezeThread::MAX_FREE_TASK_NUM)
TG_DEF(LSFetchLogEngine, FetchLog, "", TG_DYNAMIC, QUEUE_THREAD,
TG_DEF(LSFetchLogEngine, FetchLog, QUEUE_THREAD,
ThreadCountPair(palf::FetchLogEngine::FETCH_LOG_THREAD_COUNT, palf::FetchLogEngine::MINI_MODE_FETCH_LOG_THREAD_COUNT),
!lib::is_mini_mode() ? palf::FetchLogEngine::FETCH_LOG_TASK_MAX_COUNT_PER_LS * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER : palf::FetchLogEngine::FETCH_LOG_TASK_MAX_COUNT_PER_LS * OB_MINI_MODE_MAX_LS_NUM_PER_TENANT_PER_SERVER)
TG_DEF(DagScheduler, DagScheduler, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(DagWorker, DagWorker, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(RCService, RCSrv, "", TG_DYNAMIC, QUEUE_THREAD,
TG_DEF(DagScheduler, DagScheduler, THREAD_POOL, 1)
TG_DEF(DagWorker, DagWorker, THREAD_POOL, 1)
TG_DEF(RCService, RCSrv, QUEUE_THREAD,
ThreadCountPair(logservice::ObRoleChangeService::MAX_THREAD_NUM,
logservice::ObRoleChangeService::MAX_THREAD_NUM),
logservice::ObRoleChangeService::MAX_RC_EVENT_TASK)
TG_DEF(ApplyService, ApplySrv, "", TG_DYNAMIC, QUEUE_THREAD, ThreadCountPair(1, 1),
TG_DEF(ApplyService, ApplySrv, QUEUE_THREAD, 1,
!lib::is_mini_mode() ? (common::APPLY_TASK_QUEUE_SIZE + 1) * OB_MAX_LS_NUM_PER_TENANT_PER_SERVER : (common::APPLY_TASK_QUEUE_SIZE + 1) * OB_MINI_MODE_MAX_LS_NUM_PER_TENANT_PER_SERVER)
TG_DEF(GlobalCtxTimer, GlobalCtxTimer, "", TG_STATIC, TIMER)
TG_DEF(StorageLogWriter, StorageLogWriter, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(ReplayProcessStat, ReplayProcessStat, "", TG_STATIC, TIMER)
TG_DEF(ActiveSessHist, ActiveSessHist, "", TG_STATIC, TIMER)
TG_DEF(CTASCleanUpTimer, CTASCleanUpTimer, "", TG_STATIC, TIMER)
TG_DEF(DDLScanTask, DDLScanTask, "", TG_STATIC, TIMER)
TG_DEF(TenantLSMetaChecker, LSMetaCh, "", TG_STATIC, TIMER)
TG_DEF(TenantTabletMetaChecker, TbMetaCh, "", TG_STATIC, TIMER)
TG_DEF(ServerMetaChecker, SvrMetaCh, "", TG_STATIC, TIMER)
TG_DEF(ArbGCSTh, ArbGCTimerP, "", TG_STATIC, TIMER)
TG_DEF(DataDictTimer, DataDictTimer, "", TG_STATIC, TIMER)
TG_DEF(CDCService, CDCSrv, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(LogUpdater, LogUpdater, "", TG_STATIC, TIMER)
TG_DEF(HeartBeatCheckTask, HeartBeatCheckTask, "", TG_STATIC, TIMER)
TG_DEF(RedefHeartBeatTask, RedefHeartBeatTask, "", TG_STATIC, TIMER)
TG_DEF(MemDumpTimer, MemDumpTimer, "", TG_STATIC, TIMER)
TG_DEF(SSTableDefragment, SSTableDefragment, "", TG_STATIC, TIMER)
TG_DEF(TenantMetaMemMgr, TenantMetaMemMgr, "", TG_STATIC, TIMER)
TG_DEF(IngressService, IngressService, "", TG_STATIC, TIMER)
TG_DEF(HeartbeatService, HeartbeatService, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(2, 2))
TG_DEF(DetectManager, DetectManager, "", TG_STATIC, OB_THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(GlobalCtxTimer, GlobalCtxTimer, TIMER)
TG_DEF(StorageLogWriter, StorageLogWriter, THREAD_POOL, 1)
TG_DEF(ReplayProcessStat, ReplayProcessStat, TIMER)
TG_DEF(ActiveSessHist, ActiveSessHist, TIMER)
TG_DEF(CTASCleanUpTimer, CTASCleanUpTimer, TIMER)
TG_DEF(DDLScanTask, DDLScanTask, TIMER)
TG_DEF(TenantLSMetaChecker, LSMetaCh, TIMER)
TG_DEF(TenantTabletMetaChecker, TbMetaCh, TIMER)
TG_DEF(ServerMetaChecker, SvrMetaCh, TIMER)
TG_DEF(ArbGCSTh, ArbGCTimerP, TIMER)
TG_DEF(DataDictTimer, DataDictTimer, TIMER)
TG_DEF(CDCService, CDCSrv, THREAD_POOL, 1)
TG_DEF(LogUpdater, LogUpdater, TIMER)
TG_DEF(HeartBeatCheckTask, HeartBeatCheckTask, TIMER)
TG_DEF(RedefHeartBeatTask, RedefHeartBeatTask, TIMER)
TG_DEF(MemDumpTimer, MemDumpTimer, TIMER)
TG_DEF(SSTableDefragment, SSTableDefragment, TIMER)
TG_DEF(TenantMetaMemMgr, TenantMetaMemMgr, TIMER)
TG_DEF(IngressService, IngressService, TIMER)
TG_DEF(HeartbeatService, HeartbeatService, REENTRANT_THREAD_POOL, 2)
TG_DEF(DetectManager, DetectManager, THREAD_POOL, 1)
TG_DEF(CONFIG_MGR, ConfigMgr, TIMER, 1024)
TG_DEF(IO_TUNING, IO_TUNING, THREAD_POOL, 1)
TG_DEF(IO_SCHEDULE, IO_SCHEDULE, THREAD_POOL, 1)
TG_DEF(IO_CALLBACK, IO_CALLBACK, THREAD_POOL, 1)
TG_DEF(IO_CHANNEL, IO_CHANNEL, THREAD_POOL, 1)
TG_DEF(IO_HEALTH, IO_HEALTH, QUEUE_THREAD, 1, 100)
TG_DEF(IO_BENCHMARK, IO_BENCHMARK, THREAD_POOL, 1)
TG_DEF(TIMEZONE_MGR, TimezoneMgr, TIMER)
TG_DEF(MASTER_KEY_MGR, MasterKeyMgr, QUEUE_THREAD, 1, 100)
TG_DEF(SRS_MGR, SrsMgr, TIMER, 128)
#endif

View File

@ -31,10 +31,10 @@ namespace share
{
void ob_init_create_func()
{
#define TG_DEF(id, name, desc, scope, type, args...) \
#define TG_DEF(id, name, type, args...) \
lib::create_funcs_[lib::TGDefIDs::id] = []() { \
auto ret = OB_NEW(TGCLSMap<TGType::type>::CLS, SET_USE_500("tg"), args); \
ret->attr_ = {#name, desc, TGScope::scope, TGType::type}; \
auto ret = OB_NEW(TG_##type, SET_USE_500("tg"), args); \
ret->attr_ = {#name, TGType::type}; \
return ret; \
};
#include "share/ob_thread_define.h"

View File

@ -30,103 +30,6 @@ enum OBTGDefIDEnum
};
}
} // end of namespace lib
namespace share
{
using lib::TGType;
using lib::ITG;
template<enum TGType type>
class ObTG;
class MyObThreadPool : public share::ObThreadPool
{
public:
void run1() override
{
runnable_->set_thread_idx(get_thread_idx());
runnable_->run1();
}
lib::TGRunnable *runnable_ = nullptr;
};
template<>
class ObTG<TGType::OB_THREAD_POOL> : public ITG
{
public:
ObTG(lib::ThreadCountPair pair)
: thread_cnt_(pair.get_thread_cnt())
{}
~ObTG() { destroy(); }
int thread_cnt() override { return (int)thread_cnt_; }
int set_thread_cnt(int64_t thread_cnt) override
{
int ret = common::OB_SUCCESS;
if (th_ == nullptr) {
ret = common::OB_ERR_UNEXPECTED;
} else {
thread_cnt_ = thread_cnt;
th_->set_thread_count(thread_cnt_);
}
return ret;
}
int set_runnable(lib::TGRunnable &runnable)
{
int ret = common::OB_SUCCESS;
if (th_ != nullptr) {
ret = common::OB_ERR_UNEXPECTED;
} else {
th_ = new (buf_) MyObThreadPool();
th_->runnable_= &runnable;
}
return ret;
}
int start() override
{
int ret = common::OB_SUCCESS;
if (nullptr == th_) {
ret = common::OB_ERR_UNEXPECTED;
} else if(nullptr == th_->runnable_) {
ret = common::OB_ERR_UNEXPECTED;
} else {
th_->runnable_->set_stop(false);
th_->set_thread_count(thread_cnt_);
th_->set_run_wrapper(tg_helper_, tg_cgroup_);
ret = th_->start();
}
return ret;
}
void stop() override
{
if (th_ != nullptr) {
th_->runnable_->set_stop(true);
th_->stop();
}
}
void wait() override
{
if (th_ != nullptr) {
th_->wait();
destroy();
}
}
void destroy()
{
if (th_ != nullptr) {
th_->destroy();
th_->~MyObThreadPool();
th_ = nullptr;
}
}
private:
char buf_[sizeof(MyObThreadPool)];
MyObThreadPool *th_ = nullptr;
int64_t thread_cnt_;
};
} // end of namespace share
BIND_TG_CLS(lib::TGType::OB_THREAD_POOL, share::ObTG<lib::TGType::OB_THREAD_POOL>);
} // end of namespace oceanbase
#endif // OB_THREAD_MGR_H_

View File

@ -270,10 +270,8 @@ void ObTenantBase::destroy_mtl_module()
created_ = false;
}
ObCgroupCtrl *ObTenantBase::get_cgroup(lib::ThreadCGroup cgroup)
ObCgroupCtrl *ObTenantBase::get_cgroup()
{
// TODO
UNUSED(cgroup);
ObCgroupCtrl *cgroup_ctrl = nullptr;
cgroup_ctrl = cgroups_;
return cgroup_ctrl;
@ -283,7 +281,7 @@ int ObTenantBase::pre_run(lib::Threads *th)
{
int ret = OB_SUCCESS;
ObTenantEnv::set_tenant(this);
ObCgroupCtrl *cgroup_ctrl = get_cgroup(th->get_cgroup());
ObCgroupCtrl *cgroup_ctrl = get_cgroup();
if (cgroup_ctrl != nullptr) {
ret = cgroup_ctrl->add_self_to_cgroup(id_);
}
@ -296,7 +294,7 @@ int ObTenantBase::end_run(lib::Threads *th)
{
int ret = OB_SUCCESS;
ObTenantEnv::set_tenant(nullptr);
ObCgroupCtrl *cgroup_ctrl = get_cgroup(th->get_cgroup());
ObCgroupCtrl *cgroup_ctrl = get_cgroup();
if (cgroup_ctrl != nullptr) {
ret = cgroup_ctrl->remove_self_from_cgroup(id_);
}

View File

@ -416,7 +416,7 @@ public:
int init(ObCgroupCtrl *cgroup = nullptr);
void destroy();
virtual inline uint64_t id() const override { return id_; }
ObCgroupCtrl *get_cgroup(lib::ThreadCGroup cgroup);
ObCgroupCtrl *get_cgroup();
const ObTenantModuleInitCtx *get_mtl_init_ctx() const { return mtl_init_ctx_; }

View File

@ -1312,7 +1312,7 @@ int ObTenantDagWorker::init(const int64_t check_period)
COMMON_LOG(WARN, "dag worker is inited twice", K(ret));
} else if (OB_FAIL(cond_.init(ObWaitEventIds::DAG_WORKER_COND_WAIT))) {
COMMON_LOG(WARN, "failed to init cond", K(ret));
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::DagWorker, tg_id_, lib::ThreadCGroup::BACK_CGROUP))) {
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::DagWorker, tg_id_))) {
COMMON_LOG(WARN, "TG create dag worker failed", K(ret));
} else {
check_period_ = check_period;
@ -1599,7 +1599,7 @@ int ObTenantDagScheduler::init(
get_default_config();
dag_limit_ = dag_limit;
if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::DagScheduler, tg_id_, lib::ThreadCGroup::BACK_CGROUP))) {
if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::DagScheduler, tg_id_))) {
COMMON_LOG(WARN, "TG create dag scheduler failed", K(ret));
}