[CP] support ob_timer to assign max_task_num by argument

This commit is contained in:
obdev
2022-11-22 08:38:18 +00:00
committed by wangzelin.wzl
parent f72d873722
commit 16fc3d31e2
5 changed files with 45 additions and 21 deletions

View File

@ -24,7 +24,6 @@ namespace common
using namespace obutil;
using namespace lib;
const int32_t ObTimer::MAX_TASK_NUM;
int ObTimer::init(const char* thread_name)
{
@ -32,12 +31,23 @@ int ObTimer::init(const char* thread_name)
if (is_inited_) {
ret = OB_INIT_TWICE;
} else {
is_inited_ = true;
is_destroyed_ = false;
is_stopped_ = true;
has_running_repeat_task_ = false;
thread_name_ = thread_name;
ret = create();
tokens_ = reinterpret_cast<Token*>(ob_malloc(sizeof(Token) * max_task_num_, "timer"));
if (nullptr == tokens_) {
ret = OB_ALLOCATE_MEMORY_FAILED;
OB_LOG(ERROR, "failed to alloc memory", K(ret));
} else {
for (int i = 0; i < max_task_num_; i++) {
new (&tokens_[i]) Token();
}
if (OB_SUCC(ret)) {
is_inited_ = true;
is_destroyed_ = false;
is_stopped_ = true;
has_running_repeat_task_ = false;
thread_name_ = thread_name;
ret = create();
}
}
}
return ret;
}
@ -119,6 +129,13 @@ void ObTimer::destroy()
}
tasks_num_ = 0;
}
if (nullptr != tokens_) {
for (int i = 0; i < max_task_num_; i++) {
tokens_[i].~Token();
}
ob_free(tokens_);
tokens_ = NULL;
}
OB_LOG(INFO, "ObTimer destroy", KP(this), K_(thread_id));
}
ThreadPool::destroy();
@ -172,9 +189,9 @@ int ObTimer::schedule_task(ObTimerTask &task, const int64_t delay, const bool re
} else if (delay < 0) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid argument", K(ret), K(delay));
} else if (tasks_num_ >= MAX_TASK_NUM) {
} else if (tasks_num_ >= max_task_num_) {
ret = OB_ERR_UNEXPECTED;
OB_LOG(WARN, "too much timer task", K(ret), K_(tasks_num), "max_task_num", MAX_TASK_NUM);
OB_LOG(WARN, "too much timer task", K(ret), K_(tasks_num), "max_task_num", max_task_num_);
} else {
int64_t time = ObSysTime::now(ObSysTime::Monotonic).toMicroSeconds();
if(!is_scheduled_immediately) {
@ -200,15 +217,15 @@ int ObTimer::schedule_task(ObTimerTask &task, const int64_t delay, const bool re
int ObTimer::insert_token(const Token &token)
{
int ret = OB_SUCCESS;
int32_t max_task_num= MAX_TASK_NUM;
int32_t max_task_num= max_task_num_;
if (!is_inited_) {
ret = OB_NOT_INIT;
} else {
if (has_running_repeat_task_) {
max_task_num = MAX_TASK_NUM - 1;
max_task_num = max_task_num_ - 1;
}
if (tasks_num_ >= max_task_num) {
OB_LOG(WARN, "tasks_num_ exceed max_task_num", K_(tasks_num), "max_task_num", MAX_TASK_NUM);
OB_LOG(WARN, "tasks_num_ exceed max_task_num", K_(tasks_num), "max_task_num", max_task_num_);
ret = OB_ERR_UNEXPECTED;
} else {
int64_t pos = 0;

View File

@ -66,8 +66,8 @@ class ObTimer
{
public:
friend class oceanbase::tests::blocksstable::FakeTabletManager;
ObTimer(): tasks_num_(0), wakeup_time_(0), is_inited_(false), is_stopped_(false),
is_destroyed_(false), has_running_task_(false), has_running_repeat_task_(false),
ObTimer(int64_t max_task_num = 32): tasks_num_(0), max_task_num_(max_task_num), wakeup_time_(0), is_inited_(false), is_stopped_(false),
is_destroyed_(false), tokens_(nullptr), has_running_task_(false), has_running_repeat_task_(false),
thread_id_(-1), thread_name_(nullptr) {}
~ObTimer();
int init(const char* thread_name = nullptr);
@ -101,19 +101,19 @@ private:
int64_t delay;
ObTimerTask *task;
};
static const int32_t MAX_TASK_NUM = 32;
int insert_token(const Token &token);
void run1() final;
int schedule_task(ObTimerTask &task, const int64_t delay, const bool repeate, const bool is_scheduled_immediately);
DISALLOW_COPY_AND_ASSIGN(ObTimer);
private:
int32_t tasks_num_;
int64_t max_task_num_;
int64_t wakeup_time_;
bool is_inited_;
bool is_stopped_;
bool is_destroyed_;
obutil::ObMonitor<obutil::Mutex> monitor_;
Token tokens_[MAX_TASK_NUM];
Token* tokens_;
bool has_running_task_;
bool has_running_repeat_task_;
int64_t thread_id_;

View File

@ -23,7 +23,7 @@ TG_DEF(TEST7, test7, "", TG_STATIC, REENTRANT_THREAD_POOL, ThreadCountPair(1 ,1)
// other
TG_DEF(MEMORY_DUMP, memDump, "", TG_STATIC, THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(SchemaRefTask, SchemaRefTask, "", TG_STATIC, DEDUP_QUEUE, ThreadCountPair(1, 1), 1024, 1024, 1L << 30, 512L << 20, common::OB_MALLOC_BIG_BLOCK_SIZE, common::ObModIds::OB_SCHEMA_DEDUP_QUEUE)
TG_DEF(CONFIG_MGR, ConfigMgr, "", TG_STATIC, TIMER)
TG_DEF(CONFIG_MGR, ConfigMgr, "", TG_STATIC, TIMER, 1024)
TG_DEF(ReqMemEvict, ReqMemEvict, "", TG_DYNAMIC, TIMER)
TG_DEF(IO_TUNING, IO_TUNING, "", TG_STATIC, THREAD_POOL, ThreadCountPair(1, 1))
TG_DEF(IO_SCHEDULE, IO_SCHEDULE, "", TG_DYNAMIC, THREAD_POOL, ThreadCountPair(1, 1))

View File

@ -41,6 +41,9 @@ public:
ThreadCountPair(int64_t cnt, int64_t mini_mode_cnt)
: cnt_(cnt), mini_mode_cnt_(mini_mode_cnt)
{}
ThreadCountPair(int64_t dummy)
: cnt_(0), mini_mode_cnt_(0)
{}
int64_t get_thread_cnt() const
{
return !is_mini_mode() ? cnt_ : mini_mode_cnt_;
@ -619,6 +622,9 @@ template<>
class TG<TGType::TIMER> : public ITG
{
public:
TG(int64_t max_task_num = 32)
: max_task_num_(max_task_num)
{}
~TG() { destroy(); }
int thread_cnt() override { return 1; }
int set_thread_cnt(int64_t thread_cnt) override
@ -633,7 +639,7 @@ public:
if (timer_ != nullptr) {
ret = common::OB_ERR_UNEXPECTED;
} else {
timer_ = new (buf_) common::ObTimer();
timer_ = new (buf_) common::ObTimer(max_task_num_);
timer_->set_run_wrapper(tg_helper_, tg_cgroup_);
if (OB_FAIL(timer_->init(attr_.name_))) {
OB_LOG(WARN, "init failed", K(ret));
@ -706,6 +712,7 @@ public:
private:
char buf_[sizeof(common::ObTimer)];
common::ObTimer *timer_ = nullptr;
int64_t max_task_num_;
};
template<>

View File

@ -40,18 +40,18 @@ public:
TEST(TestTimer, timer_task)
{
TestTimerTask task[ObTimer::MAX_TASK_NUM + 1];
TestTimerTask task[32 + 1];
ObTimer timer;
ASSERT_EQ(OB_SUCCESS, timer.init());
ASSERT_EQ(OB_SUCCESS, timer.start());
const bool is_repeat = true;
ASSERT_EQ(OB_SUCCESS, timer.schedule(task[0], 100, is_repeat));
for(int i=1; i<ObTimer::MAX_TASK_NUM; ++i)
for(int i=1; i<32; ++i)
{
ASSERT_EQ(OB_SUCCESS,timer.schedule(task[i], 5000000000, is_repeat));//5000s
}
::usleep(5000);//5ms
ASSERT_EQ(OB_ERR_UNEXPECTED, timer.schedule(task[ObTimer::MAX_TASK_NUM], 50000000, is_repeat));
ASSERT_EQ(OB_ERR_UNEXPECTED, timer.schedule(task[32], 50000000, is_repeat));
::usleep(1000000);//1s
ASSERT_GT(task[0].task_run_count_, 1);
}