fix dag scheduler destroy when init failed

This commit is contained in:
a1iive
2023-09-12 02:40:35 +00:00
committed by ob-robot
parent 45573c67e5
commit ffd6829b5b
9 changed files with 144 additions and 111 deletions

View File

@ -1329,6 +1329,9 @@ int ObTenantDagWorker::init(const int64_t check_period)
check_period_ = check_period; check_period_ = check_period;
is_inited_ = true; is_inited_ = true;
} }
if (!is_inited_) {
reset();
}
return ret; return ret;
} }
@ -1358,19 +1361,23 @@ void ObTenantDagWorker::wait()
void ObTenantDagWorker::destroy() void ObTenantDagWorker::destroy()
{ {
if (IS_INIT) { if (IS_INIT) {
stop(); reset();
wait();
task_ = NULL;
status_ = DWS_FREE;
check_period_ = 0;
last_check_time_ = 0;
function_type_ = 0;
group_id_ = 0;
self_ = NULL;
is_inited_ = false;
TG_DESTROY(tg_id_);
} }
} }
void ObTenantDagWorker::reset()
{
stop();
wait();
task_ = NULL;
status_ = DWS_FREE;
check_period_ = 0;
last_check_time_ = 0;
function_type_ = 0;
group_id_ = 0;
self_ = NULL;
is_inited_ = false;
TG_DESTROY(tg_id_);
}
void ObTenantDagWorker::notify(DagWorkerStatus status) void ObTenantDagWorker::notify(DagWorkerStatus status)
{ {
@ -1637,7 +1644,7 @@ int ObTenantDagScheduler::init(
} }
} }
if (!is_inited_) { if (!is_inited_) {
destroy(); reset();
COMMON_LOG(WARN, "failed to init ObTenantDagScheduler", K(ret)); COMMON_LOG(WARN, "failed to init ObTenantDagScheduler", K(ret));
} else { } else {
dump_dag_status(); dump_dag_status();
@ -1649,81 +1656,86 @@ int ObTenantDagScheduler::init(
void ObTenantDagScheduler::destroy() void ObTenantDagScheduler::destroy()
{ {
if (IS_INIT) { if (IS_INIT) {
COMMON_LOG(INFO, "ObTenantDagScheduler starts to destroy"); reset();
stop();
notify();
wait();
destroy_all_workers();
is_inited_ = false; // avoid alloc dag/dag_net
WEAK_BARRIER();
int tmp_ret = OB_SUCCESS;
int64_t abort_dag_cnt = 0;
for (int64_t j = 0; j < DAG_LIST_MAX; ++j) {
for (int64_t i = 0; i < PriorityDagList::PRIO_CNT; ++i) {
ObIDag *head = dag_list_[j].get_head(i);
ObIDag *cur_dag = head->get_next();
ObIDag *next = NULL;
ObIDagNet *tmp_dag_net = nullptr;
while (NULL != cur_dag && head != cur_dag) {
next = cur_dag->get_next();
if (cur_dag->get_dag_id().is_valid()
&& OB_TMP_FAIL(ObSysTaskStatMgr::get_instance().del_task(cur_dag->get_dag_id()))) {
if (OB_ENTRY_NOT_EXIST != tmp_ret) {
STORAGE_LOG_RET(WARN, tmp_ret, "failed to del sys task", K(tmp_ret), K(cur_dag->get_dag_id()));
}
}
if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, tmp_dag_net))) {
STORAGE_LOG_RET(WARN, tmp_ret, "failed to abort dag", K(tmp_ret), KPC(cur_dag));
} else {
++abort_dag_cnt;
}
cur_dag = next;
} // end of while
} // end of prio loop
dag_list_[j].reset();
} // end of for
blocking_dag_net_list_.reset();
if (dag_map_.created()) {
dag_map_.destroy();
}
if (dag_net_map_[RUNNING_DAG_NET_MAP].created()) {
for (DagNetMap::iterator iter = dag_net_map_[RUNNING_DAG_NET_MAP].begin(); iter != dag_net_map_[RUNNING_DAG_NET_MAP].end(); ++iter) {
const bool ha_dag_net = iter->second->is_ha_dag_net();
iter->second->~ObIDagNet();
if (ha_dag_net) {
ha_allocator_.free(iter->second);
} else {
allocator_.free(iter->second);
}
} // end of for
dag_net_map_[RUNNING_DAG_NET_MAP].destroy();
}
if (dag_net_id_map_.created()) {
dag_net_id_map_.destroy();
}
COMMON_LOG(INFO, "ObTenantDagScheduler before allocator destroyed", K(abort_dag_cnt), K(allocator_.used()), K(ha_allocator_.used()));
allocator_.reset();
ha_allocator_.reset();
scheduler_sync_.destroy();
dag_cnt_ = 0;
dag_limit_ = 0;
total_worker_cnt_ = 0;
work_thread_num_ = 0;
total_running_task_cnt_ = 0;
scheduled_task_cnt_ = 0;
MEMSET(running_task_cnts_, 0, sizeof(running_task_cnts_));
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
MEMSET(scheduled_task_cnts_, 0, sizeof(scheduled_task_cnts_));
MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_));
waiting_workers_.reset();
running_workers_.reset();
COMMON_LOG(INFO, "ObTenantDagScheduler destroyed");
TG_DESTROY(tg_id_);
} }
} }
void ObTenantDagScheduler::reset()
{
COMMON_LOG(INFO, "ObTenantDagScheduler starts to destroy");
stop();
notify();
wait();
destroy_all_workers();
is_inited_ = false; // avoid alloc dag/dag_net
WEAK_BARRIER();
int tmp_ret = OB_SUCCESS;
int64_t abort_dag_cnt = 0;
for (int64_t j = 0; j < DAG_LIST_MAX; ++j) {
for (int64_t i = 0; i < PriorityDagList::PRIO_CNT; ++i) {
ObIDag *head = dag_list_[j].get_head(i);
ObIDag *cur_dag = head->get_next();
ObIDag *next = NULL;
ObIDagNet *tmp_dag_net = nullptr;
while (NULL != cur_dag && head != cur_dag) {
next = cur_dag->get_next();
if (cur_dag->get_dag_id().is_valid()
&& OB_TMP_FAIL(ObSysTaskStatMgr::get_instance().del_task(cur_dag->get_dag_id()))) {
if (OB_ENTRY_NOT_EXIST != tmp_ret) {
STORAGE_LOG_RET(WARN, tmp_ret, "failed to del sys task", K(tmp_ret), K(cur_dag->get_dag_id()));
}
}
if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, tmp_dag_net))) {
STORAGE_LOG_RET(WARN, tmp_ret, "failed to abort dag", K(tmp_ret), KPC(cur_dag));
} else {
++abort_dag_cnt;
}
cur_dag = next;
} // end of while
} // end of prio loop
dag_list_[j].reset();
} // end of for
blocking_dag_net_list_.reset();
if (dag_map_.created()) {
dag_map_.destroy();
}
if (dag_net_map_[RUNNING_DAG_NET_MAP].created()) {
for (DagNetMap::iterator iter = dag_net_map_[RUNNING_DAG_NET_MAP].begin(); iter != dag_net_map_[RUNNING_DAG_NET_MAP].end(); ++iter) {
const bool ha_dag_net = iter->second->is_ha_dag_net();
iter->second->~ObIDagNet();
if (ha_dag_net) {
ha_allocator_.free(iter->second);
} else {
allocator_.free(iter->second);
}
} // end of for
dag_net_map_[RUNNING_DAG_NET_MAP].destroy();
}
if (dag_net_id_map_.created()) {
dag_net_id_map_.destroy();
}
COMMON_LOG(INFO, "ObTenantDagScheduler before allocator destroyed", K(abort_dag_cnt), K(allocator_.used()), K(ha_allocator_.used()));
allocator_.reset();
ha_allocator_.reset();
scheduler_sync_.destroy();
dag_cnt_ = 0;
dag_limit_ = 0;
total_worker_cnt_ = 0;
work_thread_num_ = 0;
total_running_task_cnt_ = 0;
scheduled_task_cnt_ = 0;
MEMSET(running_task_cnts_, 0, sizeof(running_task_cnts_));
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
MEMSET(scheduled_task_cnts_, 0, sizeof(scheduled_task_cnts_));
MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_));
waiting_workers_.reset();
running_workers_.reset();
TG_DESTROY(tg_id_);
COMMON_LOG(INFO, "ObTenantDagScheduler destroyed");
}
void ObTenantDagScheduler::free_dag(ObIDag &dag, ObIDag *parent_dag) void ObTenantDagScheduler::free_dag(ObIDag &dag, ObIDag *parent_dag)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -2704,7 +2716,7 @@ int ObTenantDagScheduler::sys_task_start(ObIDag *dag)
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
// allow comment truncation, no need to set ret // allow comment truncation, no need to set ret
if (OB_TMP_FAIL(dag->fill_comment(sys_task_status.comment_, sizeof(sys_task_status.comment_)))) { if (OB_TMP_FAIL(dag->fill_comment(sys_task_status.comment_, sizeof(sys_task_status.comment_)))) {
COMMON_LOG(WARN, "failed to fill comment", K(ret)); COMMON_LOG(WARN, "failed to fill comment", K(tmp_ret));
} }
if (OB_SUCCESS != (ret = ObSysTaskStatMgr::get_instance().add_task(sys_task_status))) { if (OB_SUCCESS != (ret = ObSysTaskStatMgr::get_instance().add_task(sys_task_status))) {
COMMON_LOG(WARN, "failed to add sys task", K(ret), K(sys_task_status)); COMMON_LOG(WARN, "failed to add sys task", K(ret), K(sys_task_status));

View File

@ -718,6 +718,7 @@ public:
int start(); int start();
void stop(); void stop();
void wait(); void wait();
void reset();
void destroy(); void destroy();
void resume(); void resume();
void run1() override; void run1() override;
@ -795,6 +796,7 @@ public:
void free_dag_net(T *&dag_net); void free_dag_net(T *&dag_net);
void run1() final; void run1() final;
void notify(); void notify();
void reset();
void destroy(); void destroy();
void get_default_config(); void get_default_config();
int64_t get_work_thread_num() const { return work_thread_num_; } int64_t get_work_thread_num() const { return work_thread_num_; }

View File

@ -219,7 +219,7 @@ int ObIDiagnoseInfoMgr::init(bool with_map,
seq_num_ = 1; seq_num_ = 1;
is_inited_ = true; is_inited_ = true;
} else { } else {
destroy(); reset();
} }
return ret; return ret;
} }
@ -227,16 +227,20 @@ int ObIDiagnoseInfoMgr::init(bool with_map,
void ObIDiagnoseInfoMgr::destroy() void ObIDiagnoseInfoMgr::destroy()
{ {
if (IS_INIT) { if (IS_INIT) {
common::SpinWLockGuard guard(lock_); reset();
common::SpinWLockGuard WLockGuard(rwlock_);
clear_with_no_lock();
if (info_map_.created()) {
info_map_.destroy();
}
allocator_.reset();
is_inited_ = false;
} }
} }
void ObIDiagnoseInfoMgr::reset()
{
common::SpinWLockGuard guard(lock_);
common::SpinWLockGuard WLockGuard(rwlock_);
clear_with_no_lock();
if (info_map_.created()) {
info_map_.destroy();
}
allocator_.reset();
is_inited_ = false;
}
void ObIDiagnoseInfoMgr::clear() void ObIDiagnoseInfoMgr::clear()
{ {

View File

@ -236,6 +236,7 @@ public:
const int64_t page_size=INFO_PAGE_SIZE, const int64_t page_size=INFO_PAGE_SIZE,
int64_t max_size=INFO_MAX_SIZE); int64_t max_size=INFO_MAX_SIZE);
void reset();
void destroy(); void destroy();
void clear(); void clear();
void clear_with_no_lock(); void clear_with_no_lock();

View File

@ -98,7 +98,7 @@ int ObTenantSSTableMergeInfoMgr::init(const int64_t page_size)
} }
if (!is_inited_) { if (!is_inited_) {
destroy(); reset();
} }
return ret; return ret;
} }
@ -106,13 +106,18 @@ int ObTenantSSTableMergeInfoMgr::init(const int64_t page_size)
void ObTenantSSTableMergeInfoMgr::destroy() void ObTenantSSTableMergeInfoMgr::destroy()
{ {
if (IS_INIT) { if (IS_INIT) {
major_info_pool_.destroy(); reset();
minor_info_pool_.destroy();
is_inited_ =false;
STORAGE_LOG(INFO, "ObTenantSSTableMergeInfoMgr destroy finish");
} }
} }
void ObTenantSSTableMergeInfoMgr::reset()
{
major_info_pool_.destroy();
minor_info_pool_.destroy();
is_inited_ = false;
STORAGE_LOG(INFO, "ObTenantSSTableMergeInfoMgr destroy finish");
}
int ObTenantSSTableMergeInfoMgr::open_iter(compaction::ObIDiagnoseInfoMgr::Iterator &major_iter, int ObTenantSSTableMergeInfoMgr::open_iter(compaction::ObIDiagnoseInfoMgr::Iterator &major_iter,
compaction::ObIDiagnoseInfoMgr::Iterator &minor_iter) compaction::ObIDiagnoseInfoMgr::Iterator &minor_iter)
{ {

View File

@ -39,6 +39,7 @@ public:
virtual ~ObTenantSSTableMergeInfoMgr(); virtual ~ObTenantSSTableMergeInfoMgr();
int init(const int64_t page_size=compaction::ObIDiagnoseInfoMgr::INFO_PAGE_SIZE); int init(const int64_t page_size=compaction::ObIDiagnoseInfoMgr::INFO_PAGE_SIZE);
int add_sstable_merge_info(ObSSTableMergeInfo &input_info); int add_sstable_merge_info(ObSSTableMergeInfo &input_info);
void reset();
void destroy(); void destroy();
int open_iter(compaction::ObIDiagnoseInfoMgr::Iterator &major_iter, int open_iter(compaction::ObIDiagnoseInfoMgr::Iterator &major_iter,
compaction::ObIDiagnoseInfoMgr::Iterator &minor_iter); compaction::ObIDiagnoseInfoMgr::Iterator &minor_iter);

View File

@ -269,19 +269,23 @@ void ObTableStoreStatMgr::wait()
} }
void ObTableStoreStatMgr::destroy() void ObTableStoreStatMgr::destroy()
{ {
if(is_inited_){ if (IS_INIT) {
is_inited_ = false; reset();
TG_DESTROY(lib::TGDefIDs::TableStatRpt);
report_task_.destroy();
report_cursor_ = 0;
pending_cursor_ = 0;
lru_head_ = NULL;
lru_tail_ = NULL;
cur_cnt_ = 0;
limit_cnt_ = 0;
quick_map_.destroy();
} }
} }
void ObTableStoreStatMgr::reset()
{
is_inited_ = false;
TG_DESTROY(lib::TGDefIDs::TableStatRpt);
report_task_.destroy();
report_cursor_ = 0;
pending_cursor_ = 0;
lru_head_ = NULL;
lru_tail_ = NULL;
cur_cnt_ = 0;
limit_cnt_ = 0;
quick_map_.destroy();
}
int ObTableStoreStatMgr::init(const int64_t limit_cnt) int ObTableStoreStatMgr::init(const int64_t limit_cnt)
{ {
@ -317,6 +321,9 @@ int ObTableStoreStatMgr::init(const int64_t limit_cnt)
is_inited_ = true; is_inited_ = true;
} }
} }
if (!is_inited_) {
reset();
}
return ret; return ret;
} }

View File

@ -216,6 +216,7 @@ public:
int init(const int64_t limit_cnt = DEFAULT_MAX_CNT); int init(const int64_t limit_cnt = DEFAULT_MAX_CNT);
void stop(); void stop();
void wait(); void wait();
void reset();
void destroy(); void destroy();
static ObTableStoreStatMgr &get_instance(); static ObTableStoreStatMgr &get_instance();
int report_stat(const ObTableStoreStat &stat); int report_stat(const ObTableStoreStat &stat);

View File

@ -695,7 +695,7 @@ public:
wait(); wait();
int64_t elapsed_time = ObTimeUtility::current_time() - start_time; int64_t elapsed_time = ObTimeUtility::current_time() - start_time;
COMMON_LOG(INFO, "stress test finished", K(elapsed_time / 1000)); COMMON_LOG(INFO, "stress test finished", K(elapsed_time / 1000));
int ret_code = system("grep ERROR test_dag_scheduler.log -q"); int ret_code = system("grep ERROR test_dag_scheduler.log -q | grep -v 'Fail to lock' | grep -v 'invalid tg id'");
ret_code = WEXITSTATUS(ret_code); ret_code = WEXITSTATUS(ret_code);
if (ret_code == 0) if (ret_code == 0)
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;