From f774ffa26b19b64f96a8b6eb02c7336f6568d00a Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 7 Feb 2024 23:12:05 +0000 Subject: [PATCH] fix the bug of canceled merge dag not deleted from dag_net_map --- deps/oblib/src/lib/utility/ob_tracepoint.h | 2 + .../scheduler/ob_tenant_dag_scheduler.cpp | 388 ++++++++++-------- src/share/scheduler/ob_tenant_dag_scheduler.h | 61 +-- src/storage/column_store/ob_co_merge_ctx.cpp | 6 + src/storage/column_store/ob_co_merge_dag.cpp | 33 +- src/storage/column_store/ob_co_merge_dag.h | 1 + .../test_dag_net_in_dag_scheduler.cpp | 6 +- 7 files changed, 302 insertions(+), 195 deletions(-) diff --git a/deps/oblib/src/lib/utility/ob_tracepoint.h b/deps/oblib/src/lib/utility/ob_tracepoint.h index e93dd6412a..a9e261a1c0 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint.h @@ -692,6 +692,8 @@ class EventTable EN_COMPACTION_SCHEDULE_META_MERGE = 735, EN_COMPACTION_ESTIMATE_ROW_FAILED = 736, EN_COMPACTION_UPDATE_REPORT_SCN = 737, + EN_CO_MREGE_DAG_READY_FOREVER = 738, + EN_CO_MREGE_DAG_SCHEDULE_REST = 739, // please add new trace point after 750 EN_SESSION_LEAK_COUNT_THRESHOLD = 751, diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.cpp b/src/share/scheduler/ob_tenant_dag_scheduler.cpp index 25924bfa23..ed138f3e41 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.cpp +++ b/src/share/scheduler/ob_tenant_dag_scheduler.cpp @@ -610,12 +610,12 @@ int ObIDag::add_child(ObIDag &child) return ret; } -int ObIDag::update_status_in_dag_net() +int ObIDag::update_status_in_dag_net(bool &dag_net_finished) { int ret = OB_SUCCESS; ObMutexGuard guard(lock_); if (OB_NOT_NULL(dag_net_)) { - dag_net_->update_dag_status(*this); + dag_net_->update_dag_status(*this, dag_net_finished); } return ret; } @@ -872,7 +872,7 @@ int ObIDag::reset_status_for_retry() return ret; } -int ObIDag::finish(const ObDagStatus status) +int ObIDag::finish(const ObDagStatus status, bool &dag_net_finished) { int ret = 
OB_SUCCESS; { @@ -887,7 +887,7 @@ int ObIDag::finish(const ObDagStatus status) if (OB_SUCC(ret)) { set_dag_status(status); set_dag_error_location(); - update_status_in_dag_net(); + update_status_in_dag_net(dag_net_finished); } return ret; } @@ -970,6 +970,24 @@ int ObIDag::fill_comment(char *buf, const int64_t buf_len) /*************************************ObIDagNet***********************************/ +const static char * ObDagNetListStr[] = { + "BLOCKING_DAG_NET_LIST", + "RUNNING_DAG_NET_LIST", + "FINISHED_DAG_NET_LIST" +}; + +const char *dag_net_list_to_str(const ObDagNetListIndex &dag_net_list_index) +{ + STATIC_ASSERT(static_cast(DAG_NET_LIST_MAX) == ARRAYSIZEOF(ObDagNetListStr), "dag net list str len is mismatch"); + const char *str = ""; + if (is_valid_dag_net_list(dag_net_list_index)) { + str = ObDagNetListStr[dag_net_list_index]; + } else { + str = "invalid_dag_net_list"; + } + return str; +} + ObIDagNet::ObIDagNet( const ObDagNetType::ObDagNetTypeEnum type) : is_stopped_(false), @@ -980,7 +998,8 @@ ObIDagNet::ObIDagNet( start_time_(0), dag_record_map_(), first_fail_dag_info_(nullptr), - is_cancel_(false) + is_cancel_(false), + is_finishing_last_dag_(false) { } @@ -1040,6 +1059,7 @@ void ObIDagNet::reset() { ObMutexGuard guard(lock_); is_stopped_ = true; + is_finishing_last_dag_ = false; if (dag_record_map_.created()) { for (DagRecordMap::iterator iter = dag_record_map_.begin(); iter != dag_record_map_.end(); ++iter) { @@ -1086,14 +1106,14 @@ bool ObIDagNet::check_finished_and_mark_stop() { ObMutexGuard guard(lock_); int ret = OB_SUCCESS; - if ((is_cancel_ || inner_check_finished()) && dag_record_map_.empty()) { + if (inner_check_finished_without_lock() && !is_finishing_last_dag_) { WEAK_BARRIER(); is_stopped_ = true; } return is_stopped_; } -int ObIDagNet::update_dag_status(ObIDag &dag) +int ObIDagNet::update_dag_status(ObIDag &dag, bool &dag_net_finished) { int ret = OB_SUCCESS; ObDagRecord *dag_record = nullptr; @@ -1120,6 +1140,11 @@ int 
ObIDagNet::update_dag_status(ObIDag &dag) } else if (FALSE_IT(dag_record->dag_status_ = dag.get_dag_status())) { } else if (ObIDag::is_finish_status(dag.get_dag_status())) { remove_dag_record_(*dag_record); + if (inner_check_finished_without_lock()) { + dag_net_finished = true; + is_finishing_last_dag_ = true; + COMMON_LOG(INFO, "last dag in dag_net is finishing", K(dag), KP(this)); + } } return ret; } @@ -1174,7 +1199,7 @@ int64_t ObIDagNet::to_string(char* buf, const int64_t buf_len) const J_NAME("ObIDagNet"); J_COLON(); J_KV(KP(this), K_(type), K_(dag_net_id), "dag_record_cnt", dag_record_map_.size(), - K_(is_stopped), K_(is_cancel), KP_(allocator)); + K_(is_stopped), K_(is_cancel), K_(is_finishing_last_dag), KP_(allocator)); J_OBJ_END(); } return pos; @@ -1211,6 +1236,12 @@ bool ObIDagNet::is_cancel() return is_cancel_; } +void ObIDagNet::set_last_dag_finished() +{ + ObMutexGuard guard(lock_); + is_finishing_last_dag_ = false; +} + bool ObIDagNet::is_inited() { return OB_NOT_NULL(allocator_); @@ -1788,7 +1819,6 @@ void ObDagPrioScheduler::destroy() { int tmp_ret = OB_SUCCESS; int64_t abort_dag_cnt = 0; - ObIDagNet *tmp_dag_net = nullptr; for (int64_t j = 0; j < DAG_LIST_MAX; ++j) { ObIDag *head = dag_list_[j].get_header(); ObIDag *cur_dag = head->get_next(); @@ -1801,7 +1831,7 @@ void ObDagPrioScheduler::destroy() } } - if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, tmp_dag_net, false/*try_move_child*/))) { + if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, false/*try_move_child*/))) { STORAGE_LOG_RET(WARN, tmp_ret, "failed to abort dag", K(tmp_ret), KPC(cur_dag)); } else { ++abort_dag_cnt; @@ -2028,9 +2058,7 @@ void ObDagPrioScheduler::add_added_info_(const ObDagType::ObDagTypeEnum dag_type scheduler_->add_added_dag_cnts(dag_type); } -int ObDagPrioScheduler::schedule_one_( - common::ObIArray &delayed_erase_dag_nets, - ObIDagNet *&extra_erase_dag_net) +int ObDagPrioScheduler::schedule_one_() { int ret = OB_SUCCESS; 
ObTenantDagWorker *worker = NULL; @@ -2040,7 +2068,7 @@ int ObDagPrioScheduler::schedule_one_( COMMON_LOG(WARN, "unexpected null scheduler", K(ret), KP_(scheduler)); } else if (!waiting_workers_.is_empty()) { worker = waiting_workers_.remove_first(); - } else if (OB_FAIL(pop_task_from_ready_list_(task, delayed_erase_dag_nets, extra_erase_dag_net))) { + } else if (OB_FAIL(pop_task_from_ready_list_(task))) { if (OB_ENTRY_NOT_EXIST != ret) { COMMON_LOG(WARN, "failed to pop task", K(ret), K_(priority)); } @@ -2102,6 +2130,7 @@ int ObDagPrioScheduler::schedule_dag_(ObIDag &dag, bool &move_dag_to_waiting_lis { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; + bool unused_tmp_bret = false; ObIDag::ObDagStatus next_dag_status = dag.get_dag_status(); if (OB_NOT_NULL(dag.get_dag_net()) && dag.get_dag_net()->is_cancel()) { next_dag_status = ObIDag::DAG_STATUS_NODE_FAILED; @@ -2121,47 +2150,13 @@ int ObDagPrioScheduler::schedule_dag_(ObIDag &dag, bool &move_dag_to_waiting_lis next_dag_status = ObIDag::DAG_STATUS_NODE_RUNNING; } dag.set_dag_status(next_dag_status); - dag.update_status_in_dag_net(); + dag.update_status_in_dag_net(unused_tmp_bret /* dag_net_finished */); dag.set_start_time(); // dag start running scheduler_->add_scheduled_dag_cnts(dag.get_type()); COMMON_LOG(DEBUG, "dag start running", K(ret), K(dag)); return ret; } -int ObDagPrioScheduler::erase_dag_net_without_lock_(ObIDagNet *erase_dag_net) -{ - int ret = OB_SUCCESS; - - if (OB_UNLIKELY(OB_ISNULL(scheduler_))) { - ret = OB_ERR_UNEXPECTED; - COMMON_LOG(WARN, "unexpected null scheduler", K(ret), KP_(scheduler)); - } else if (OB_ISNULL(erase_dag_net)) { - // do nothing - } else { - (void) scheduler_->finish_dag_net(erase_dag_net); // void - } - return ret; -} - -void ObDagPrioScheduler::erase_dag_nets_without_lock_( - common::ObIArray &delayed_erase_dag_nets, - ObIDagNet *extra_erase_dag_net) -{ - int tmp_ret = OB_SUCCESS; - if (OB_UNLIKELY(!delayed_erase_dag_nets.empty())) { - for (int64_t i = 0; i < 
delayed_erase_dag_nets.count(); ++i) { - if (OB_TMP_FAIL(erase_dag_net_without_lock_(delayed_erase_dag_nets.at(i)))) { - COMMON_LOG_RET(WARN, tmp_ret, "failed to erase dag net", K(i), K(delayed_erase_dag_nets)); - } - } - } - if (OB_NOT_NULL(extra_erase_dag_net)) { - if (OB_TMP_FAIL(erase_dag_net_without_lock_(extra_erase_dag_net))) { - COMMON_LOG_RET(WARN, tmp_ret, "failed to erase dag net", K(extra_erase_dag_net)); - } - } -} - // should hold prio_lock_ before calling this func bool ObDagPrioScheduler::check_need_compaction_rank_() const { @@ -2347,15 +2342,11 @@ void ObDagPrioScheduler::try_update_adaptive_task_limit_(const int64_t batch_siz } // under prio_lock_ -int ObDagPrioScheduler::pop_task_from_ready_list_( - ObITask *&task, - common::ObIArray &delayed_erase_dag_nets, - ObIDagNet *&extra_erase_dag_net) +int ObDagPrioScheduler::pop_task_from_ready_list_(ObITask *&task) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; task = nullptr; - extra_erase_dag_net = nullptr; // adaptive compaction scheduling if (is_rank_dag_prio() && OB_TMP_FAIL(rank_compaction_dags_())) { @@ -2368,11 +2359,26 @@ int ObDagPrioScheduler::pop_task_from_ready_list_( ObIDag *tmp_dag = nullptr; ObITask *ready_task = nullptr; ObIDag::ObDagStatus dag_status = ObIDag::DAG_STATUS_MAX; - ObIDagNet *erase_dag_net = nullptr; while (NULL != cur && head != cur && OB_SUCC(ret)) { bool move_dag_to_waiting_list = false; dag_status = cur->get_dag_status(); +#ifdef ERRSIM + ObIDagNet *tmp_dag_net = nullptr; + ret = OB_E(EventTable::EN_CO_MREGE_DAG_READY_FOREVER) ret; + if (OB_FAIL(ret)) { + ret = OB_SUCCESS; + if (OB_NOT_NULL(tmp_dag_net = cur->get_dag_net()) && tmp_dag_net->is_co_dag_net()) { + LOG_INFO("ERRSIM EN_CO_MREGE_DAG_READY_FOREVER", K(ret)); + if (tmp_dag_net->is_cancel()) { + LOG_INFO("ERRSIM EN_CO_MREGE_DAG_READY_FOREVER CO MERGE DAG IS CANCELED", K(ret)); + } else { + cur = cur->get_next(); + continue; + } + } + } +#endif if (!cur->check_with_lock()) { // TODO(@jingshui) cancel dag } 
else if (cur->get_indegree() > 0) { @@ -2386,14 +2392,9 @@ int ObDagPrioScheduler::pop_task_from_ready_list_( && 0 == cur->get_running_task_count()) { // no task running failed dag, need free tmp_dag = cur; cur = cur->get_next(); - if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *tmp_dag, erase_dag_net, true/*try_move_child*/))) { // will report result + if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *tmp_dag, true/*try_move_child*/))) { // will report result COMMON_LOG(WARN, "failed to deal with failed dag", K(ret), KPC(tmp_dag)); ob_abort(); - } else if (OB_ISNULL(erase_dag_net)) { - } else if (OB_FAIL(delayed_erase_dag_nets.push_back(erase_dag_net))) { - COMMON_LOG(WARN, "failed to add erase dag net, stop looping dag_list to deal with the erase_dag_net", K(ret)); - // if failed, we will lose this dag_net and never release it, so we should capture it with an extra ptr. - extra_erase_dag_net = erase_dag_net; } continue; } @@ -2483,20 +2484,36 @@ int ObDagPrioScheduler::generate_next_dag_(ObIDag &dag) return ret; } +int ObDagPrioScheduler::add_dag_warning_info_into_dag_net_(ObIDag &dag, bool &need_add) +{ + int ret = OB_SUCCESS; + ObIDagNet *dag_net = dag.get_dag_net(); + if (OB_NOT_NULL(dag_net) && dag_net->is_co_dag_net()) { + if (OB_FAIL(dag_net->add_dag_warning_info(&dag))) { + COMMON_LOG(WARN, "failed to add dag warning info into dag net", K(ret), K(dag), KPC(dag_net)); + } else { + need_add = false; + } + } + return ret; +} + // when cancel_dag(), the dag may have parent int ObDagPrioScheduler::finish_dag_( const ObIDag::ObDagStatus status, ObIDag &dag, - ObIDagNet *&finish_dag_net, const bool try_move_child) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - finish_dag_net = nullptr; + bool need_add = is_compaction_dag(dag.get_type()); // need add compaction dag warning info + bool dag_net_finished = false; + ObIDagNet *dag_net = dag.get_dag_net(); if (OB_UNLIKELY(dag.get_priority() != priority_ || OB_ISNULL(scheduler_))) { ret = OB_ERR_UNEXPECTED; 
COMMON_LOG(WARN, "unexpect value", K(ret), K(dag.get_priority()), K_(priority), KP_(scheduler)); - } else if (OB_FAIL(dag.finish(status))) { + } else if (FALSE_IT(add_dag_warning_info_into_dag_net_(dag, need_add))) { + } else if (OB_FAIL(dag.finish(status, dag_net_finished))) { // dag record will be erase, making this dag_net could be free. COMMON_LOG(WARN, "dag finished failed", K(ret), K(dag), "dag_ret", dag.get_dag_ret()); } else if (try_move_child && OB_FAIL(try_move_child_to_ready_list_(dag))) { LOG_WARN("failed to try move child to ready list", K(ret), K(&dag)); @@ -2530,21 +2547,6 @@ int ObDagPrioScheduler::finish_dag_( } } - ObIDagNet *dag_net = dag.get_dag_net(); - bool need_add = is_compaction_dag(dag.get_type()); - if (OB_ISNULL(dag_net)) { - } else { - if (dag_net->check_finished_and_mark_stop()) { - finish_dag_net = dag_net; - } - if (dag_net->is_co_dag_net()) { - if (OB_TMP_FAIL(dag_net->add_dag_warning_info(&dag))) { - COMMON_LOG(WARN, "failed to add dag warning info into dag net", K(tmp_ret), K(dag), KPC(dag_net)); - } else { - need_add = false; - } - } - } if (need_add) { if (OB_TMP_FAIL(MTL(ObDagWarningHistoryManager*)->add_dag_warning_info(&dag))) { COMMON_LOG(WARN, "failed to add dag warning info", K(tmp_ret), K(dag)); @@ -2567,6 +2569,10 @@ int ObDagPrioScheduler::finish_dag_( } scheduler_->inner_free_dag(dag); // free after log print + if (OB_NOT_NULL(dag_net) && dag_net_finished) { + dag_net->set_last_dag_finished(); + scheduler_->notify_when_dag_net_finish(); + } } return ret; } @@ -2637,8 +2643,7 @@ int ObDagPrioScheduler::deal_with_fail_dag_(ObIDag &dag, bool &retry_flag) int ObDagPrioScheduler::finish_task_in_dag_( ObITask &task, - ObIDag &dag, - ObIDagNet *&finish_dag_net) + ObIDag &dag) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -2656,7 +2661,7 @@ int ObDagPrioScheduler::finish_task_in_dag_( if (dag.has_finished()) { ObIDag::ObDagStatus status = dag.is_dag_failed() ? 
ObIDag::DAG_STATUS_ABORT : ObIDag::DAG_STATUS_FINISH; - if (OB_FAIL(finish_dag_(status, dag, finish_dag_net, true/*try_move_child*/))) { + if (OB_FAIL(finish_dag_(status, dag, true/*try_move_child*/))) { COMMON_LOG(WARN, "failed to finish dag and dag net", K(ret), K(dag)); ob_abort(); } @@ -2686,36 +2691,30 @@ int ObDagPrioScheduler::loop_ready_dag_list(bool &is_found) int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - common::ObSEArray delayed_erase_dag_nets; - ObIDagNet *extra_erase_dag_net = nullptr; { ObMutexGuard guard(prio_lock_); if (running_task_cnts_ < adaptive_task_limit_) { // if extra_erase_dag_net not null, the is_found must be false. if (!check_need_load_shedding_(true/*for_schedule*/)) { - is_found = (OB_SUCCESS == schedule_one_(delayed_erase_dag_nets, extra_erase_dag_net)); + is_found = (OB_SUCCESS == schedule_one_()); } while (running_task_cnts_ < adaptive_task_limit_ && is_found) { if (check_need_load_shedding_(true/*for_schedule*/)) { break; - } else if (OB_SUCCESS != schedule_one_(delayed_erase_dag_nets, extra_erase_dag_net)) { + } else if (OB_SUCCESS != schedule_one_()) { break; } } } } - // clear delayed_erase_dag_nets after releasing prio_lock, ignore ret - (void)erase_dag_nets_without_lock_(delayed_erase_dag_nets, extra_erase_dag_net); return ret; } int ObDagPrioScheduler::loop_waiting_dag_list() { int ret = OB_SUCCESS; - common::ObSEArray delayed_erase_dag_nets; - ObIDagNet *erase_dag_net = nullptr; { ObMutexGuard guard(prio_lock_); if (!dag_list_[WAITING_DAG_LIST].is_empty()) { @@ -2727,14 +2726,9 @@ int ObDagPrioScheduler::loop_waiting_dag_list() if (0 == cur->get_indegree() && OB_NOT_NULL(cur->get_dag_net()) && cur->get_dag_net()->is_cancel()) { move_dag = cur; cur = cur->get_next(); - if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *move_dag, erase_dag_net, true/*try_move_child*/))) { // will report result + if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *move_dag, true/*try_move_child*/))) { // will report result COMMON_LOG(WARN, 
"failed to deal with failed dag", K(ret), KPC(cur)); ob_abort(); - } else if (OB_ISNULL(erase_dag_net)) { - } else if (OB_FAIL(delayed_erase_dag_nets.push_back(erase_dag_net))) { - COMMON_LOG(WARN, "failed to add erase dag net, stop looping dag_list to deal with the erase_dag_net", K(ret)); - } else { - erase_dag_net = nullptr; // dag_net is in dag_nets array } } else if (0 == cur->get_indegree() && cur->check_can_schedule()) { move_dag = cur; @@ -2762,8 +2756,6 @@ int ObDagPrioScheduler::loop_waiting_dag_list() } } } // prio_lock_ unlock - // clear delayed_erase_dag_nets after releasing prio_lock, ignore ret - (void)erase_dag_nets_without_lock_(delayed_erase_dag_nets, erase_dag_net); return ret; } @@ -2933,7 +2925,6 @@ int ObDagPrioScheduler::check_ls_compaction_dag_exist_with_cancel( compaction::ObTabletMergeDag *dag = nullptr; exist = false; ObDagListIndex loop_list[2] = { READY_DAG_LIST, RANK_DAG_LIST }; - ObIDagNet *unused_erase_dag_net = nullptr; ObIDag *cancel_dag = nullptr; bool cancel_flag = false; int64_t cancel_dag_cnt = 0; @@ -2954,7 +2945,7 @@ int ObDagPrioScheduler::check_ls_compaction_dag_exist_with_cancel( if (OB_UNLIKELY(nullptr != cancel_dag->get_dag_net())) { tmp_ret = OB_ERR_UNEXPECTED; COMMON_LOG(WARN, "compaction dag should not in dag net", KR(tmp_ret)); - } else if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cancel_dag, unused_erase_dag_net, false/*try_move_child*/))) { + } else if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cancel_dag, false/*try_move_child*/))) { COMMON_LOG(WARN, "failed to erase dag", K(tmp_ret), KPC(cancel_dag)); ob_abort(); } else { @@ -3149,7 +3140,6 @@ int ObDagPrioScheduler::deal_with_finish_task( { int ret = OB_SUCCESS; ObIDag *dag = nullptr; - ObIDagNet *erase_dag_net = nullptr; if (OB_ISNULL(dag = task.get_dag())) { ret = OB_ERR_UNEXPECTED; @@ -3196,19 +3186,12 @@ int ObDagPrioScheduler::deal_with_finish_task( finish_task_flag = false; } - if (finish_task_flag && OB_FAIL(finish_task_in_dag_(task, 
*dag, erase_dag_net))) { + if (finish_task_flag && OB_FAIL(finish_task_in_dag_(task, *dag))) { COMMON_LOG(WARN, "failed to finish task", K(ret), KPC(dag)); } scheduler_->sub_running_dag_cnts(dag_type); } - // ensure that prio_lock is not locked before locking dag_net_map_lock - if (OB_SUCC(ret) && nullptr != erase_dag_net) { - if (OB_FAIL(erase_dag_net_without_lock_(erase_dag_net))) { - COMMON_LOG(WARN, "failed to finish dag net", K(ret)); - } - } - if (OB_SUCC(ret)) { scheduler_->sub_total_running_task_cnt(); ObMutexGuard guard(prio_lock_); @@ -3224,7 +3207,6 @@ int ObDagPrioScheduler::cancel_dag(const ObIDag &dag, const bool force_cancel) int hash_ret = OB_SUCCESS; bool free_flag = false; ObIDag *cur_dag = nullptr; - ObIDagNet *erase_dag_net = nullptr; { ObMutexGuard guard(prio_lock_); if (OB_SUCCESS != (hash_ret = dag_map_.get_refactored(&dag, cur_dag))) { @@ -3243,7 +3225,7 @@ int ObDagPrioScheduler::cancel_dag(const ObIDag &dag, const bool force_cancel) } else if (cur_dag->get_dag_status() == ObIDag::DAG_STATUS_READY) { LOG_INFO("cancel dag", K(ret), KP(cur_dag)); if (OB_FAIL(ret)) { - } else if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, erase_dag_net, true/*try_move_child*/))) { + } else if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, true/*try_move_child*/))) { COMMON_LOG(WARN, "failed to erase dag and dag net", K(ret), KPC(cur_dag)); ob_abort(); } @@ -3252,13 +3234,6 @@ int ObDagPrioScheduler::cancel_dag(const ObIDag &dag, const bool force_cancel) cur_dag->set_force_cancel_flag(); } } - - // ensure that prio_lock is not locked before locking dag_net_map_lock - if (OB_SUCC(ret) && nullptr != erase_dag_net) { - if (OB_FAIL(erase_dag_net_without_lock_(erase_dag_net))) { - COMMON_LOG(WARN, "failed to finish dag net", K(ret)); - } - } return ret; } @@ -3325,8 +3300,6 @@ bool ObDagPrioScheduler::try_switch(ObTenantDagWorker &worker) bool need_pause = false; int tmp_ret = OB_SUCCESS; - common::ObSEArray delayed_erase_dag_nets; - ObIDagNet 
*extra_erase_dag_net = nullptr; { ObMutexGuard guard(prio_lock_); if (running_task_cnts_ > adaptive_task_limit_) { @@ -3340,7 +3313,7 @@ bool ObDagPrioScheduler::try_switch(ObTenantDagWorker &worker) if (!need_pause && !waiting_workers_.is_empty()) { if (waiting_workers_.get_first()->need_wake_up()) { // schedule_one will schedule the first worker on the waiting list first - if (OB_TMP_FAIL(schedule_one_(delayed_erase_dag_nets, extra_erase_dag_net))) { + if (OB_TMP_FAIL(schedule_one_())) { if (OB_ENTRY_NOT_EXIST != tmp_ret) { COMMON_LOG_RET(WARN, tmp_ret, "failed to schedule one task", K_(priority)); } @@ -3354,7 +3327,6 @@ bool ObDagPrioScheduler::try_switch(ObTenantDagWorker &worker) } } - (void)erase_dag_nets_without_lock_(delayed_erase_dag_nets, extra_erase_dag_net); // ignore tmp_ret return need_pause; } @@ -3392,7 +3364,9 @@ bool ObDagPrioScheduler::check_need_load_shedding_(const bool for_schedule) /***************************************ObDagNetScheduler impl********************************************/ void ObDagNetScheduler::destroy() { - blocking_dag_net_list_.reset(); + for (int i = 0; i < DAG_NET_LIST_MAX; i++ ) { + dag_net_list_[i].reset(); + } const ObIDagNet *cur_dag_net = nullptr; ObIAllocator *allocator = nullptr; @@ -3462,14 +3436,43 @@ void ObDagNetScheduler::erase_dag_net_id_or_abort(ObIDagNet &dag_net) (void) erase_dag_net_or_abort(dag_net); } -void ObDagNetScheduler::erase_block_dag_net_or_abort(ObIDagNet *dag_net) +void ObDagNetScheduler::erase_dag_net_list_or_abort(const ObDagNetListIndex &dag_net_list_index, ObIDagNet *dag_net) { - if (!blocking_dag_net_list_.remove(dag_net)) { - COMMON_LOG_RET(WARN, OB_ERR_UNEXPECTED, "failed to remove dag_net from blocking_dag_net_list", K(dag_net)); + if (!is_valid_dag_net_list(dag_net_list_index) || !dag_net_list_[dag_net_list_index].remove(dag_net)) { + COMMON_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "failed to remove dag_net from", + "list", dag_net_list_to_str(dag_net_list_index), K(dag_net_list_index), 
K(dag_net)); ob_abort(); } } +void ObDagNetScheduler::add_dag_net_list_or_abort(const ObDagNetListIndex &dag_net_list_index, ObIDagNet *dag_net) +{ + if (!is_valid_dag_net_list(dag_net_list_index) || !dag_net_list_[dag_net_list_index].add_last(dag_net)) { + COMMON_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "failed to add dag_net into", + "list", dag_net_list_to_str(dag_net_list_index), K(dag_net_list_index), K(dag_net)); + ob_abort(); + } +} + +bool ObDagNetScheduler::is_empty() { + bool bret = true; + ObMutexGuard guard(dag_net_map_lock_); + if (!dag_net_list_[BLOCKING_DAG_NET_LIST].is_empty()) { + bret = false; + } else if (!dag_net_list_[RUNNING_DAG_NET_LIST].is_empty()) { + ObIDagNet *head = dag_net_list_[RUNNING_DAG_NET_LIST].get_header(); + ObIDagNet *cur = head->get_next(); + while (NULL != cur && head != cur && bret) { + if (!cur->check_finished_and_mark_stop()) { + bret = false; + } else { + cur = cur->get_next(); + } + } + } + return bret; +} + int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net) { int ret = OB_SUCCESS; @@ -3494,7 +3497,7 @@ int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net) } } (void) erase_dag_net_or_abort(dag_net); - } else if (!blocking_dag_net_list_.add_last(&dag_net)) {// add into blocking_dag_net_list + } else if (!dag_net_list_[BLOCKING_DAG_NET_LIST].add_last(&dag_net)) {// add into blocking_dag_net_list ret = OB_ERR_UNEXPECTED; COMMON_LOG(ERROR, "failed to add into blocking_dag_net_list", K(ret), K(dag_net)); (void) erase_dag_net_id_or_abort(dag_net); @@ -3524,6 +3527,8 @@ void ObDagNetScheduler::dump_dag_status() { int64_t running_dag_net_map_size = 0; int64_t blocking_dag_net_list_size = 0; + int64_t running_dag_net_list_size = 0; + int64_t dag_net_count[ObDagNetType::DAG_NET_TYPE_MAX]; { ObMutexGuard guard(dag_net_map_lock_); @@ -3531,9 +3536,10 @@ void ObDagNetScheduler::dump_dag_status() dag_net_count[i] = dag_net_cnts_[i]; } running_dag_net_map_size = dag_net_map_.size(); - blocking_dag_net_list_size = 
blocking_dag_net_list_.get_size(); + blocking_dag_net_list_size = dag_net_list_[BLOCKING_DAG_NET_LIST].get_size(); + running_dag_net_list_size = dag_net_list_[RUNNING_DAG_NET_LIST].get_size(); } - COMMON_LOG(INFO, "dump_dag_status[DAG_NET]", K(running_dag_net_map_size), K(blocking_dag_net_list_size)); + COMMON_LOG(INFO, "dump_dag_status[DAG_NET]", K(running_dag_net_map_size), K(blocking_dag_net_list_size), K(running_dag_net_list_size)); for (int64_t i = 0; i < ObDagNetType::DAG_NET_TYPE_MAX; ++i) { COMMON_LOG(INFO, "dump_dag_status[DAG_NET]", "type", OB_DAG_NET_TYPES[i].dag_net_type_str_, "dag_count", dag_net_count[i]); @@ -3566,17 +3572,21 @@ void ObDagNetScheduler::get_all_dag_info( { if (OB_NOT_NULL(info_list)) { ObMutexGuard guard(dag_net_map_lock_); - DagNetMap::iterator iter = dag_net_map_.begin(); - for (; iter != dag_net_map_.end() && idx < total_cnt; ++iter) { // ignore failure - ADD_DAG_INFO(iter->second, "RUNNING_DAG_NET_MAP"); - } - ObIDagNet *head = blocking_dag_net_list_.get_header(); + ObIDagNet *head = dag_net_list_[BLOCKING_DAG_NET_LIST].get_header(); ObIDagNet *cur = head->get_next(); while (NULL != cur && head != cur && idx < total_cnt) { - ADD_DAG_INFO(cur, "BLOCKING_DAG_NET_MAP"); + ADD_DAG_INFO(cur, "BLOCKING_DAG_NET_LIST"); cur = cur->get_next(); } + + head = dag_net_list_[RUNNING_DAG_NET_LIST].get_header(); + cur = head->get_next(); + while (NULL != cur && head != cur && idx < total_cnt) { + ADD_DAG_INFO(cur, "RUNNING_DAG_NET_LIST"); + cur = cur->get_next(); + } + } } @@ -3628,7 +3638,8 @@ bool ObDagNetScheduler::is_dag_map_full() ObMutexGuard guard(dag_net_map_lock_); COMMON_LOG(INFO, "dag map is almost full, stop loop blocking dag_net_map", "dag_map_size", scheduler_->get_cur_dag_cnt(), "dag_net_map_size", dag_net_map_.size(), - "blocking_dag_net_list_size", blocking_dag_net_list_.get_size()); + "blocking_dag_net_list_size", dag_net_list_[BLOCKING_DAG_NET_LIST].get_size(), + "running_dag_net_list_size", 
dag_net_list_[RUNNING_DAG_NET_LIST].get_size()); } } } @@ -3636,23 +3647,27 @@ bool ObDagNetScheduler::is_dag_map_full() return bret; } -int ObDagNetScheduler::loop_running_dag_net_map() +int ObDagNetScheduler::loop_running_dag_net_list() { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - ObIDagNet *dag_net = nullptr; int64_t slow_dag_net_cnt = 0; ObMutexGuard guard(dag_net_map_lock_); - DagNetMap::iterator iter = dag_net_map_.begin(); - for (; iter != dag_net_map_.end() && !is_dag_map_full(); ++iter) { // ignore failure - if (OB_ISNULL(dag_net = iter->second)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("dag net is unepxected null", K(ret), KP(dag_net)); - } else if (dag_net->is_started() && OB_FAIL(dag_net->schedule_rest_dag())) { - LOG_WARN("failed to schedule rest dag", K(ret)); + ObIDagNet *head = dag_net_list_[RUNNING_DAG_NET_LIST].get_header(); + ObIDagNet *cur = head->get_next(); + ObIDagNet *dag_net = nullptr; + + while (NULL != cur && head != cur) { // ignore failure + LOG_DEBUG("loop running dag net list", K(ret), KPC(cur)); + dag_net = cur; + cur = cur->get_next(); + if (dag_net->is_started() && OB_TMP_FAIL(dag_net->schedule_rest_dag())) { + LOG_WARN("failed to schedule rest dag", K(tmp_ret)); } else if (dag_net->check_finished_and_mark_stop()) { - LOG_WARN("dag net is in finish state", K(ret), KPC(dag_net)); + LOG_WARN("dag net is in finish state, move to finished list", K(ret), KPC(dag_net)); + (void) erase_dag_net_list_or_abort(RUNNING_DAG_NET_LIST, dag_net); + (void) add_dag_net_list_or_abort(FINISHED_DAG_NET_LIST, dag_net); } else if (dag_net->is_co_dag_net() && dag_net->get_start_time() + SLOW_COMPACTION_DAG_NET_THREASHOLD < ObTimeUtility::fast_current_time()) { ++slow_dag_net_cnt; @@ -3669,12 +3684,35 @@ int ObDagNetScheduler::loop_running_dag_net_map() } } } - COMMON_LOG(INFO, "loop running dag net map", K(ret), - "running_dag_net_map_size", dag_net_map_.size(), K(slow_dag_net_cnt)); + COMMON_LOG(INFO, "loop running dag net list", K(ret), + 
"running_dag_net_list_size", dag_net_list_[RUNNING_DAG_NET_LIST].get_size(), K(slow_dag_net_cnt), + "finished_dag_net_list_size", dag_net_list_[FINISHED_DAG_NET_LIST].get_size()); return ret; } +int ObDagNetScheduler::loop_finished_dag_net_list() +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(scheduler_)) { + ret = OB_ERR_UNEXPECTED; + COMMON_LOG(WARN, "scheduler is null", KP(scheduler_)); + } else { + ObIDagNet *head = dag_net_list_[FINISHED_DAG_NET_LIST].get_header(); + ObIDagNet *cur = head->get_next(); + ObIDagNet *dag_net = nullptr; + while (NULL != cur && head != cur) { + LOG_DEBUG("loop blocking dag net list", K(ret), KPC(cur)); + dag_net = cur; + cur = cur->get_next(); + (void) erase_dag_net_list_or_abort(FINISHED_DAG_NET_LIST, dag_net); + (void) scheduler_->finish_dag_net(dag_net); + } + } + COMMON_LOG(INFO, "loop finsihed dag net list", K(ret), "finished_dag_net_list_size", dag_net_list_[FINISHED_DAG_NET_LIST].get_size()); + return ret; +} + int ObDagNetScheduler::loop_blocking_dag_net_list() { int ret = OB_SUCCESS; @@ -3683,10 +3721,10 @@ int ObDagNetScheduler::loop_blocking_dag_net_list() COMMON_LOG(WARN, "scheduler is null", KP(scheduler_)); } else { ObMutexGuard guard(dag_net_map_lock_); - ObIDagNet *head = blocking_dag_net_list_.get_header(); + ObIDagNet *head = dag_net_list_[BLOCKING_DAG_NET_LIST].get_header(); ObIDagNet *cur = head->get_next(); ObIDagNet *tmp = nullptr; - int64_t rest_cnt = DEFAULT_MAX_RUNNING_DAG_NET_CNT - (dag_net_map_.size() - blocking_dag_net_list_.get_size()); + int64_t rest_cnt = DEFAULT_MAX_RUNNING_DAG_NET_CNT - (dag_net_map_.size() - dag_net_list_[BLOCKING_DAG_NET_LIST].get_size()); while (NULL != cur && head != cur && rest_cnt > 0 && !is_dag_map_full()) { LOG_DEBUG("loop blocking dag net list", K(ret), KPC(cur), K(rest_cnt)); tmp = cur; @@ -3696,12 +3734,13 @@ int ObDagNetScheduler::loop_blocking_dag_net_list() COMMON_LOG(WARN, "failed to start running or be canceled", K(ret), KPC(cur)); } (void) 
finish_dag_net_without_lock(*tmp); - (void) erase_block_dag_net_or_abort(tmp); + (void) erase_dag_net_list_or_abort(BLOCKING_DAG_NET_LIST, tmp); (void) scheduler_->free_dag_net(tmp); // set tmp nullptr } else { tmp->set_start_time(); --rest_cnt; - (void) erase_block_dag_net_or_abort(tmp); + (void) erase_dag_net_list_or_abort(BLOCKING_DAG_NET_LIST, tmp); + (void) add_dag_net_list_or_abort(RUNNING_DAG_NET_LIST, tmp); } } } @@ -3781,21 +3820,29 @@ int ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel(const ObLSID &l exist = false; ObMutexGuard dag_net_guard(dag_net_map_lock_); int64_t cancel_dag_cnt = 0; + ObIDagNet *head = nullptr; + ObIDagNet *cur = nullptr; ObIDagNet *cur_dag_net = nullptr; - DagNetMap::iterator iter = dag_net_map_.begin(); - for (; OB_SUCC(ret) && iter != dag_net_map_.end(); ++iter) { - if (OB_ISNULL(cur_dag_net = iter->second)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("dag net is unepxected null", K(ret), KP(cur_dag_net)); - } else if (cur_dag_net->is_co_dag_net()) { - compaction::ObCOMergeDagNet *co_dag_net = static_cast(cur_dag_net); - if (ls_id == co_dag_net->get_dag_param().ls_id_) { - cur_dag_net->set_cancel(); - ++cancel_dag_cnt; - exist = true; + + for (int i = BLOCKING_DAG_NET_LIST; i <= RUNNING_DAG_NET_LIST; i++) { + head = dag_net_list_[i].get_header(); + cur = head->get_next(); + cur_dag_net = nullptr; + + while (NULL != cur && head != cur) { + cur_dag_net = cur; + cur = cur->get_next(); + if (cur_dag_net->is_co_dag_net()) { + compaction::ObCOMergeDagNet *co_dag_net = static_cast(cur_dag_net); + if (ls_id == co_dag_net->get_dag_param().ls_id_) { + cur_dag_net->set_cancel(); + ++cancel_dag_cnt; + exist = true; + } } } - } // end of while + } + if (OB_SUCC(ret)) { LOG_INFO("success to cancel dag net", KR(ret), K(ls_id), K(cancel_dag_cnt), K(exist)); } @@ -4475,6 +4522,12 @@ void ObTenantDagScheduler::notify() scheduler_sync_.signal(); } +void ObTenantDagScheduler::notify_when_dag_net_finish() +{ + set_fast_schedule_dag_net(); + 
notify(); +} + int ObTenantDagScheduler::deal_with_finish_task(ObITask &task, ObTenantDagWorker &worker, int error_code) { int ret = OB_SUCCESS; @@ -4594,8 +4647,11 @@ void ObTenantDagScheduler::loop_dag_net() clear_fast_schedule_dag_net(); } if (need_loop_running_dag_net || REACH_TENANT_TIME_INTERVAL(LOOP_RUNNING_DAG_NET_MAP_INTERVAL)) { - if (OB_TMP_FAIL(dag_net_sche_.loop_running_dag_net_map())) { - COMMON_LOG_RET(WARN, tmp_ret, "failed to add dag from running_dag_net_map"); + if (OB_TMP_FAIL(dag_net_sche_.loop_running_dag_net_list())) { + COMMON_LOG_RET(WARN, tmp_ret, "failed to loop running_dag_net_list"); + } + if (OB_TMP_FAIL(dag_net_sche_.loop_finished_dag_net_list())) { + COMMON_LOG_RET(WARN, tmp_ret, "failed to loop finished_dag_net_list"); } } } diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index da3097ce2e..8511bda81e 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -280,6 +280,20 @@ enum ObDagListIndex DAG_LIST_MAX }; +enum ObDagNetListIndex +{ + BLOCKING_DAG_NET_LIST = 0, + RUNNING_DAG_NET_LIST = 1, + FINISHED_DAG_NET_LIST = 2, + DAG_NET_LIST_MAX +}; + +const char *dag_net_list_to_str(const ObDagNetListIndex &dag_net_list_index); +inline bool is_valid_dag_net_list(const ObDagNetListIndex &dag_net_list_index) +{ + return dag_net_list_index >= 0 && dag_net_list_index < DAG_NET_LIST_MAX; +} + struct ObIDagInitParam { ObIDagInitParam() {} @@ -406,8 +420,8 @@ public: return true; } int add_child(ObIDag &child); - int update_status_in_dag_net(); - int finish(const ObDagStatus status); + int update_status_in_dag_net(bool &dag_net_finished); + int finish(const ObDagStatus status, bool &dag_net_finished); void gene_dag_info(ObDagInfo &info, const char *list_info); virtual int gene_compaction_info(compaction::ObTabletCompactionProgress &progress) { @@ -550,8 +564,9 @@ public: { return true; } + OB_INLINE bool 
inner_check_finished_without_lock() { return (is_cancel_ || inner_check_finished()) && dag_record_map_.empty(); } bool check_finished_and_mark_stop(); - int update_dag_status(ObIDag &dag); + int update_dag_status(ObIDag &dag, bool &dag_net_finished); int erase_dag_from_dag_net(ObIDag &dag); static const char *get_dag_net_type_str(enum ObDagNetType::ObDagNetTypeEnum type); @@ -574,6 +589,7 @@ public: } void set_cancel(); bool is_cancel(); + void set_last_dag_finished(); bool is_inited(); bool is_started(); virtual int deal_with_cancel() @@ -610,6 +626,7 @@ private: ObDagId dag_net_id_; ObDagWarningInfo *first_fail_dag_info_; bool is_cancel_; + bool is_finishing_last_dag_; // making dag net freed after last dag freed if dag net can be freed after finish last dag }; struct ObDagInfo @@ -748,12 +765,13 @@ public: ObIAllocator &ha_allocator, ObTenantDagScheduler &scheduler); - bool is_empty() const { return 0 == dag_net_map_.size();} // only for unittest + bool is_empty(); // only for unittest int add_dag_net(ObIDagNet &dag_net); void erase_dag_net_or_abort(ObIDagNet &dag_net); void erase_dag_net_id_or_abort(ObIDagNet &dag_net); - void erase_block_dag_net_or_abort(ObIDagNet *dag_net); void finish_dag_net_without_lock(ObIDagNet &dag_net); + void erase_dag_net_list_or_abort(const ObDagNetListIndex &dag_net_list_index, ObIDagNet *dag_net); + void add_dag_net_list_or_abort(const ObDagNetListIndex &dag_net_list_index, ObIDagNet *dag_net); void finish_dag_net(ObIDagNet &dag_net); void dump_dag_status(); int64_t get_dag_net_count(); @@ -772,7 +790,9 @@ public: int64_t &start_time); int64_t get_dag_net_count(const ObDagNetType::ObDagNetTypeEnum type); bool is_dag_map_full(); - int loop_running_dag_net_map(); + int loop_running_dag_net_list(); + // do not hold dag_net_map_lock_, otherwise deadlock when clear_dag_net_ctx, see + int loop_finished_dag_net_list(); int loop_blocking_dag_net_list(); int check_dag_net_exist( const ObDagId &dag_id, bool &exist); @@ -801,7 +821,11 @@ 
private: ObTenantDagScheduler *scheduler_; lib::ObMutex dag_net_map_lock_; DagNetMap dag_net_map_; // lock by dag_net_map_lock_ - DagNetList blocking_dag_net_list_; // lock by dag_net_map_lock_ + /* + * blocking and running list should always locked by dag_net_map_lock_, but finished not. + * finished dag net list must without lock when free dag net, otherwise it would deadlock when clearing dag net ctx + */ + DagNetList dag_net_list_[DAG_NET_LIST_MAX]; DagNetIdMap dag_net_id_map_; // for HA to search dag_net of specified dag_id // lock by dag_net_map_lock_ int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX]; // lock by dag_net_map_lock_ }; @@ -932,14 +956,9 @@ private: ObIDag *&dag); void add_schedule_info_(const ObDagType::ObDagTypeEnum dag_type, const int64_t data_size); void add_added_info_(const ObDagType::ObDagTypeEnum dag_type); - int schedule_one_( - common::ObIArray &delayed_erase_dag_nets, - ObIDagNet *&extra_erase_dag_net); + int schedule_one_(); int schedule_dag_(ObIDag &dag, bool &move_dag_to_waiting_list); - int pop_task_from_ready_list_( - ObITask *&task, - common::ObIArray &delayed_erase_dag_nets, - ObIDagNet *&extra_erase_dag_net); + int pop_task_from_ready_list_(ObITask *&task); int rank_compaction_dags_(); void try_update_adaptive_task_limit_(const int64_t batch_size); int batch_move_compaction_dags_(const int64_t batch_size); @@ -948,20 +967,15 @@ private: const int64_t batch_size, common::ObSEArray &rank_dags); int generate_next_dag_(ObIDag &dag); + int add_dag_warning_info_into_dag_net_(ObIDag &dag, bool &need_add); int finish_dag_( const ObIDag::ObDagStatus status, ObIDag &dag, - ObIDagNet *&erase_dag_net, const bool try_move_child); - // ensure that prio_lock is not locked before calling this func - int erase_dag_net_without_lock_(ObIDagNet *erase_dag_net); - void erase_dag_nets_without_lock_( - common::ObIArray &delayed_erase_dag_nets, - ObIDagNet *extra_erase_dag_net); int try_move_child_to_ready_list_(ObIDag &dag); int erase_dag_(ObIDag 
&dag); int deal_with_fail_dag_(ObIDag &dag, bool &retry_flag); - int finish_task_in_dag_(ObITask &task, ObIDag &dag, ObIDagNet *&erase_dag_net); + int finish_task_in_dag_(ObITask &task, ObIDag &dag); void pause_worker_(ObTenantDagWorker &worker); bool check_need_load_shedding_(const bool for_schedule); @@ -1041,6 +1055,7 @@ public: void free_dag_net(T *&dag_net); void run1() final; void notify(); + void notify_when_dag_net_finish(); void reset(); void destroy(); int64_t get_work_thread_num() @@ -1049,7 +1064,7 @@ public: return work_thread_num_; } int64_t get_dag_limit() const { return dag_limit_; } - bool is_empty() const + bool is_empty() { bool bret = true; for (int64_t i = 0; i < ObDagPrio::DAG_PRIO_MAX; ++i) { @@ -1130,7 +1145,7 @@ private: static const int64_t DUMP_DAG_STATUS_INTERVAL = 10 * 1000LL * 1000LL; // 10s static const int64_t DEFAULT_CHECK_PERIOD = 3 * 1000 * 1000; // 3s static const int64_t LOOP_WAITING_DAG_LIST_INTERVAL = 5 * 1000 * 1000L; // 5s - static const int64_t LOOP_RUNNING_DAG_NET_MAP_INTERVAL = 3 * 60 * 1000 * 1000L; // 3m + static const int64_t LOOP_RUNNING_DAG_NET_MAP_INTERVAL = 1 * 60 * 1000 * 1000L; // 1m static const int32_t MAX_SHOW_DAG_NET_CNT_PER_PRIO = 500; static const int64_t MANY_DAG_COUNT = 2000; private: diff --git a/src/storage/column_store/ob_co_merge_ctx.cpp b/src/storage/column_store/ob_co_merge_ctx.cpp index e525fe5cae..1574df63cc 100644 --- a/src/storage/column_store/ob_co_merge_ctx.cpp +++ b/src/storage/column_store/ob_co_merge_ctx.cpp @@ -33,6 +33,12 @@ ObCOTabletMergeCtx::ObCOTabletMergeCtx( { } +/* + * ATTENTION: NEVER USE ANY LOG STREEM VARIABLES IN THIS FUNCTION. + * Destructor will be called when finish dag net. + * ObCOMergeDagNet is special, it will be check canceled when ls offine in ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel. + * But dag_net is only moved into finished dag net list and delaying freed. 
So if log stream variables are used in this function after the ls goes offline, it will be dangerous + */ ObCOTabletMergeCtx::~ObCOTabletMergeCtx() { if (OB_NOT_NULL(cg_merge_info_array_)) { diff --git a/src/storage/column_store/ob_co_merge_dag.cpp b/src/storage/column_store/ob_co_merge_dag.cpp index af2954b864..61c4fb2a7d 100644 --- a/src/storage/column_store/ob_co_merge_dag.cpp +++ b/src/storage/column_store/ob_co_merge_dag.cpp @@ -725,6 +725,12 @@ int ObCOMergeBatchExeTask::process() } void *buf = nullptr; +#ifdef ERRSIM + ret = OB_E(EventTable::EN_CO_MREGE_DAG_SCHEDULE_REST) ret; + if (OB_FAIL(ret)) { + LOG_INFO("ERRSIM EN_CO_MREGE_DAG_SCHEDULE_REST PROCESS FAILED", K(ret)); + } +#endif if (OB_FAIL(ret)) { } else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObCOMerger)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -997,6 +1003,12 @@ ObCOMergeDagNet::ObCOMergeDagNet() { } +/* + * ATTENTION: NEVER USE ANY LOG STREAM VARIABLES IN THIS FUNCTION. + * Destructor will be called when finish dag net. + * ObCOMergeDagNet is special: it will be checked and canceled when the ls goes offline in ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel. + * But the dag_net is only moved into the finished dag net list and freed with a delay. 
So if log stream variables are used in this function after the ls goes offline, it will be dangerous + */ ObCOMergeDagNet::~ObCOMergeDagNet() { finish_dag_ = nullptr; @@ -1052,7 +1064,7 @@ int ObCOMergeDagNet::start_running() return ret; } -// schedule_rest_dag is called in loop_running_dag_net_map(Timer) +// schedule_rest_dag is called in loop_running_dag_net_list(Timer) // schedule dag may not finish yet, need wait merge_status >= PREPARE_FINISHED int ObCOMergeDagNet::schedule_rest_dag() { @@ -1067,13 +1079,30 @@ int ObCOMergeDagNet::schedule_rest_dag() ret = OB_ERR_UNEXPECTED; LOG_WARN("merge ctx is null or schema invalid", K(ret), KPC(co_merge_ctx_)); } else if (!is_cancel() && COMergeStatus::PREPARE_FINISHED <= ATOMIC_LOAD(&merge_status_)) { - if (OB_FAIL(inner_create_and_schedule_dags())) { +#ifdef ERRSIM + ret = OB_E(EventTable::EN_CO_MREGE_DAG_SCHEDULE_REST) ret; + if (OB_FAIL(ret)) { + LOG_INFO("ERRSIM EN_CO_MREGE_DAG_SCHEDULE_REST SCHEDULE FAILED", K(ret)); + } +#endif + if (FAILEDx(inner_create_and_schedule_dags())) { LOG_WARN("failed to create and schedule rest dags", K(ret)); } } return ret; } +/* + * ATTENTION: NEVER USE ANY LOG STREAM VARIABLES IN THIS FUNCTION. + * clear_dag_net_ctx() will be called when finish dag net. + * ObCOMergeDagNet is special: it will be checked and canceled when the ls goes offline in ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel. + * But the dag_net is only moved into the finished dag net list and freed with a delay. 
So if log stream variables are used in this function after the ls goes offline, it will be dangerous + */ +int ObCOMergeDagNet::clear_dag_net_ctx() +{ + return ObIDagNet::clear_dag_net_ctx(); +} + #define MARK_CG_SCHEDULE_STATUS(start_cg_idx, end_cg_idx, target_status) \ for (int64_t i = start_cg_idx; i < end_cg_idx; ++i) { \ if (ObCOTabletMergeCtx::CG_SCHE_STATUS_FINISHED != co_merge_ctx_->cg_schedule_status_array_[i]) { \ diff --git a/src/storage/column_store/ob_co_merge_dag.h b/src/storage/column_store/ob_co_merge_dag.h index 8eaa7d3afe..b122b8b0fd 100644 --- a/src/storage/column_store/ob_co_merge_dag.h +++ b/src/storage/column_store/ob_co_merge_dag.h @@ -269,6 +269,7 @@ public: virtual int fill_dag_net_key(char *buf, const int64_t buf_len) const override; virtual int fill_comment(char *buf, const int64_t buf_len) const override; virtual int schedule_rest_dag() override; + virtual int clear_dag_net_ctx() override; virtual bool inner_check_finished() override { return ATOMIC_LOAD(&finish_added_); diff --git a/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp b/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp index 95bc0a3e55..f92daa06ee 100644 --- a/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp +++ b/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp @@ -1582,7 +1582,7 @@ TEST_F(TestDagScheduler, test_cancel_dag_func) scheduler->cancel_dag(dag_array[i]); } - EXPECT_EQ(true, scheduler->is_empty()); + wait_scheduler(); EXPECT_EQ(0, MTL(ObDagWarningHistoryManager *)->size()); } @@ -1614,9 +1614,7 @@ TEST_F(TestDagScheduler, test_cancel_dag_net_func) } EXPECT_EQ(OB_SUCCESS, ret); - ob_usleep(5000 * 1000); - - EXPECT_EQ(true, scheduler->is_empty()); + wait_scheduler(); EXPECT_EQ(0, MTL(ObDagWarningHistoryManager *)->size()); }