Fix the bug where a canceled merge dag is not deleted from dag_net_map

This commit is contained in:
Tsunaou
2023-12-18 16:13:21 +00:00
committed by ob-robot
parent e99cc037cc
commit c8d19338c9
7 changed files with 302 additions and 195 deletions

View File

@ -692,6 +692,8 @@ class EventTable
EN_COMPACTION_SCHEDULE_META_MERGE = 735, EN_COMPACTION_SCHEDULE_META_MERGE = 735,
EN_COMPACTION_ESTIMATE_ROW_FAILED = 736, EN_COMPACTION_ESTIMATE_ROW_FAILED = 736,
EN_COMPACTION_UPDATE_REPORT_SCN = 737, EN_COMPACTION_UPDATE_REPORT_SCN = 737,
EN_CO_MREGE_DAG_READY_FOREVER = 738,
EN_CO_MREGE_DAG_SCHEDULE_REST = 739,
// please add new trace point after 750 // please add new trace point after 750
EN_SESSION_LEAK_COUNT_THRESHOLD = 751, EN_SESSION_LEAK_COUNT_THRESHOLD = 751,

View File

@ -610,12 +610,12 @@ int ObIDag::add_child(ObIDag &child)
return ret; return ret;
} }
int ObIDag::update_status_in_dag_net() int ObIDag::update_status_in_dag_net(bool &dag_net_finished)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObMutexGuard guard(lock_); ObMutexGuard guard(lock_);
if (OB_NOT_NULL(dag_net_)) { if (OB_NOT_NULL(dag_net_)) {
dag_net_->update_dag_status(*this); dag_net_->update_dag_status(*this, dag_net_finished);
} }
return ret; return ret;
} }
@ -872,7 +872,7 @@ int ObIDag::reset_status_for_retry()
return ret; return ret;
} }
int ObIDag::finish(const ObDagStatus status) int ObIDag::finish(const ObDagStatus status, bool &dag_net_finished)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
{ {
@ -887,7 +887,7 @@ int ObIDag::finish(const ObDagStatus status)
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
set_dag_status(status); set_dag_status(status);
set_dag_error_location(); set_dag_error_location();
update_status_in_dag_net(); update_status_in_dag_net(dag_net_finished);
} }
return ret; return ret;
} }
@ -970,6 +970,24 @@ int ObIDag::fill_comment(char *buf, const int64_t buf_len)
/*************************************ObIDagNet***********************************/ /*************************************ObIDagNet***********************************/
// Display names for the dag net lists; the table is indexed by ObDagNetListIndex
// in dag_net_list_to_str() below.
const static char * ObDagNetListStr[] = {
  "BLOCKING_DAG_NET_LIST",
  "RUNNING_DAG_NET_LIST",
  "FINISHED_DAG_NET_LIST"
};

// Maps a dag net list index to its display name.
// Returns "invalid_dag_net_list" when is_valid_dag_net_list() rejects the index.
const char *dag_net_list_to_str(const ObDagNetListIndex &dag_net_list_index)
{
  // Compile-time guard: the name table must have exactly DAG_NET_LIST_MAX entries.
  STATIC_ASSERT(static_cast<int64_t>(DAG_NET_LIST_MAX) == ARRAYSIZEOF(ObDagNetListStr), "dag net list str len is mismatch");
  return is_valid_dag_net_list(dag_net_list_index)
      ? ObDagNetListStr[dag_net_list_index]
      : "invalid_dag_net_list";
}
ObIDagNet::ObIDagNet( ObIDagNet::ObIDagNet(
const ObDagNetType::ObDagNetTypeEnum type) const ObDagNetType::ObDagNetTypeEnum type)
: is_stopped_(false), : is_stopped_(false),
@ -980,7 +998,8 @@ ObIDagNet::ObIDagNet(
start_time_(0), start_time_(0),
dag_record_map_(), dag_record_map_(),
first_fail_dag_info_(nullptr), first_fail_dag_info_(nullptr),
is_cancel_(false) is_cancel_(false),
is_finishing_last_dag_(false)
{ {
} }
@ -1040,6 +1059,7 @@ void ObIDagNet::reset()
{ {
ObMutexGuard guard(lock_); ObMutexGuard guard(lock_);
is_stopped_ = true; is_stopped_ = true;
is_finishing_last_dag_ = false;
if (dag_record_map_.created()) { if (dag_record_map_.created()) {
for (DagRecordMap::iterator iter = dag_record_map_.begin(); for (DagRecordMap::iterator iter = dag_record_map_.begin();
iter != dag_record_map_.end(); ++iter) { iter != dag_record_map_.end(); ++iter) {
@ -1086,14 +1106,14 @@ bool ObIDagNet::check_finished_and_mark_stop()
{ {
ObMutexGuard guard(lock_); ObMutexGuard guard(lock_);
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if ((is_cancel_ || inner_check_finished()) && dag_record_map_.empty()) { if (inner_check_finished_without_lock() && !is_finishing_last_dag_) {
WEAK_BARRIER(); WEAK_BARRIER();
is_stopped_ = true; is_stopped_ = true;
} }
return is_stopped_; return is_stopped_;
} }
int ObIDagNet::update_dag_status(ObIDag &dag) int ObIDagNet::update_dag_status(ObIDag &dag, bool &dag_net_finished)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObDagRecord *dag_record = nullptr; ObDagRecord *dag_record = nullptr;
@ -1120,6 +1140,11 @@ int ObIDagNet::update_dag_status(ObIDag &dag)
} else if (FALSE_IT(dag_record->dag_status_ = dag.get_dag_status())) { } else if (FALSE_IT(dag_record->dag_status_ = dag.get_dag_status())) {
} else if (ObIDag::is_finish_status(dag.get_dag_status())) { } else if (ObIDag::is_finish_status(dag.get_dag_status())) {
remove_dag_record_(*dag_record); remove_dag_record_(*dag_record);
if (inner_check_finished_without_lock()) {
dag_net_finished = true;
is_finishing_last_dag_ = true;
COMMON_LOG(INFO, "last dag in dag_net is finishing", K(dag), KP(this));
}
} }
return ret; return ret;
} }
@ -1174,7 +1199,7 @@ int64_t ObIDagNet::to_string(char* buf, const int64_t buf_len) const
J_NAME("ObIDagNet"); J_NAME("ObIDagNet");
J_COLON(); J_COLON();
J_KV(KP(this), K_(type), K_(dag_net_id), "dag_record_cnt", dag_record_map_.size(), J_KV(KP(this), K_(type), K_(dag_net_id), "dag_record_cnt", dag_record_map_.size(),
K_(is_stopped), K_(is_cancel), KP_(allocator)); K_(is_stopped), K_(is_cancel), K_(is_finishing_last_dag), KP_(allocator));
J_OBJ_END(); J_OBJ_END();
} }
return pos; return pos;
@ -1211,6 +1236,12 @@ bool ObIDagNet::is_cancel()
return is_cancel_; return is_cancel_;
} }
// Clears the is_finishing_last_dag_ flag while holding lock_.
// update_dag_status() raises the flag when the last dag record is removed, and
// check_finished_and_mark_stop() refuses to mark the dag net stopped while it is set.
void ObIDagNet::set_last_dag_finished()
{
  ObMutexGuard guard(lock_);
  is_finishing_last_dag_ = false;
}
bool ObIDagNet::is_inited() bool ObIDagNet::is_inited()
{ {
return OB_NOT_NULL(allocator_); return OB_NOT_NULL(allocator_);
@ -1788,7 +1819,6 @@ void ObDagPrioScheduler::destroy()
{ {
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
int64_t abort_dag_cnt = 0; int64_t abort_dag_cnt = 0;
ObIDagNet *tmp_dag_net = nullptr;
for (int64_t j = 0; j < DAG_LIST_MAX; ++j) { for (int64_t j = 0; j < DAG_LIST_MAX; ++j) {
ObIDag *head = dag_list_[j].get_header(); ObIDag *head = dag_list_[j].get_header();
ObIDag *cur_dag = head->get_next(); ObIDag *cur_dag = head->get_next();
@ -1801,7 +1831,7 @@ void ObDagPrioScheduler::destroy()
} }
} }
if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, tmp_dag_net, false/*try_move_child*/))) { if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, false/*try_move_child*/))) {
STORAGE_LOG_RET(WARN, tmp_ret, "failed to abort dag", K(tmp_ret), KPC(cur_dag)); STORAGE_LOG_RET(WARN, tmp_ret, "failed to abort dag", K(tmp_ret), KPC(cur_dag));
} else { } else {
++abort_dag_cnt; ++abort_dag_cnt;
@ -2028,9 +2058,7 @@ void ObDagPrioScheduler::add_added_info_(const ObDagType::ObDagTypeEnum dag_type
scheduler_->add_added_dag_cnts(dag_type); scheduler_->add_added_dag_cnts(dag_type);
} }
int ObDagPrioScheduler::schedule_one_( int ObDagPrioScheduler::schedule_one_()
common::ObIArray<ObIDagNet *> &delayed_erase_dag_nets,
ObIDagNet *&extra_erase_dag_net)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTenantDagWorker *worker = NULL; ObTenantDagWorker *worker = NULL;
@ -2040,7 +2068,7 @@ int ObDagPrioScheduler::schedule_one_(
COMMON_LOG(WARN, "unexpected null scheduler", K(ret), KP_(scheduler)); COMMON_LOG(WARN, "unexpected null scheduler", K(ret), KP_(scheduler));
} else if (!waiting_workers_.is_empty()) { } else if (!waiting_workers_.is_empty()) {
worker = waiting_workers_.remove_first(); worker = waiting_workers_.remove_first();
} else if (OB_FAIL(pop_task_from_ready_list_(task, delayed_erase_dag_nets, extra_erase_dag_net))) { } else if (OB_FAIL(pop_task_from_ready_list_(task))) {
if (OB_ENTRY_NOT_EXIST != ret) { if (OB_ENTRY_NOT_EXIST != ret) {
COMMON_LOG(WARN, "failed to pop task", K(ret), K_(priority)); COMMON_LOG(WARN, "failed to pop task", K(ret), K_(priority));
} }
@ -2102,6 +2130,7 @@ int ObDagPrioScheduler::schedule_dag_(ObIDag &dag, bool &move_dag_to_waiting_lis
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
bool unused_tmp_bret = false;
ObIDag::ObDagStatus next_dag_status = dag.get_dag_status(); ObIDag::ObDagStatus next_dag_status = dag.get_dag_status();
if (OB_NOT_NULL(dag.get_dag_net()) && dag.get_dag_net()->is_cancel()) { if (OB_NOT_NULL(dag.get_dag_net()) && dag.get_dag_net()->is_cancel()) {
next_dag_status = ObIDag::DAG_STATUS_NODE_FAILED; next_dag_status = ObIDag::DAG_STATUS_NODE_FAILED;
@ -2121,47 +2150,13 @@ int ObDagPrioScheduler::schedule_dag_(ObIDag &dag, bool &move_dag_to_waiting_lis
next_dag_status = ObIDag::DAG_STATUS_NODE_RUNNING; next_dag_status = ObIDag::DAG_STATUS_NODE_RUNNING;
} }
dag.set_dag_status(next_dag_status); dag.set_dag_status(next_dag_status);
dag.update_status_in_dag_net(); dag.update_status_in_dag_net(unused_tmp_bret /* dag_net_finished */);
dag.set_start_time(); // dag start running dag.set_start_time(); // dag start running
scheduler_->add_scheduled_dag_cnts(dag.get_type()); scheduler_->add_scheduled_dag_cnts(dag.get_type());
COMMON_LOG(DEBUG, "dag start running", K(ret), K(dag)); COMMON_LOG(DEBUG, "dag start running", K(ret), K(dag));
return ret; return ret;
} }
// Hands erase_dag_net to scheduler_->finish_dag_net().
// A null erase_dag_net is a no-op that still returns OB_SUCCESS.
// Returns OB_ERR_UNEXPECTED when scheduler_ is null.
int ObDagPrioScheduler::erase_dag_net_without_lock_(ObIDagNet *erase_dag_net)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(OB_ISNULL(scheduler_))) {
    ret = OB_ERR_UNEXPECTED;
    COMMON_LOG(WARN, "unexpected null scheduler", K(ret), KP_(scheduler));
  } else if (OB_NOT_NULL(erase_dag_net)) {
    // the result of finish_dag_net() is deliberately discarded
    (void) scheduler_->finish_dag_net(erase_dag_net);
  }
  return ret;
}
// Erases every dag net in delayed_erase_dag_nets, then extra_erase_dag_net if it is non-null.
// Failures are logged and otherwise ignored, so one failing entry does not stop the rest.
void ObDagPrioScheduler::erase_dag_nets_without_lock_(
    common::ObIArray<ObIDagNet *> &delayed_erase_dag_nets,
    ObIDagNet *extra_erase_dag_net)
{
  int tmp_ret = OB_SUCCESS;

  if (OB_UNLIKELY(!delayed_erase_dag_nets.empty())) {
    for (int64_t i = 0; i < delayed_erase_dag_nets.count(); ++i) {
      ObIDagNet *pending_dag_net = delayed_erase_dag_nets.at(i);
      if (OB_TMP_FAIL(erase_dag_net_without_lock_(pending_dag_net))) {
        COMMON_LOG_RET(WARN, tmp_ret, "failed to erase dag net", K(i), K(delayed_erase_dag_nets));
      }
    }
  }

  if (OB_NOT_NULL(extra_erase_dag_net)
      && OB_TMP_FAIL(erase_dag_net_without_lock_(extra_erase_dag_net))) {
    COMMON_LOG_RET(WARN, tmp_ret, "failed to erase dag net", K(extra_erase_dag_net));
  }
}
// should hold prio_lock_ before calling this func // should hold prio_lock_ before calling this func
bool ObDagPrioScheduler::check_need_compaction_rank_() const bool ObDagPrioScheduler::check_need_compaction_rank_() const
{ {
@ -2347,15 +2342,11 @@ void ObDagPrioScheduler::try_update_adaptive_task_limit_(const int64_t batch_siz
} }
// under prio_lock_ // under prio_lock_
int ObDagPrioScheduler::pop_task_from_ready_list_( int ObDagPrioScheduler::pop_task_from_ready_list_(ObITask *&task)
ObITask *&task,
common::ObIArray<ObIDagNet *> &delayed_erase_dag_nets,
ObIDagNet *&extra_erase_dag_net)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
task = nullptr; task = nullptr;
extra_erase_dag_net = nullptr;
// adaptive compaction scheduling // adaptive compaction scheduling
if (is_rank_dag_prio() && OB_TMP_FAIL(rank_compaction_dags_())) { if (is_rank_dag_prio() && OB_TMP_FAIL(rank_compaction_dags_())) {
@ -2368,11 +2359,26 @@ int ObDagPrioScheduler::pop_task_from_ready_list_(
ObIDag *tmp_dag = nullptr; ObIDag *tmp_dag = nullptr;
ObITask *ready_task = nullptr; ObITask *ready_task = nullptr;
ObIDag::ObDagStatus dag_status = ObIDag::DAG_STATUS_MAX; ObIDag::ObDagStatus dag_status = ObIDag::DAG_STATUS_MAX;
ObIDagNet *erase_dag_net = nullptr;
while (NULL != cur && head != cur && OB_SUCC(ret)) { while (NULL != cur && head != cur && OB_SUCC(ret)) {
bool move_dag_to_waiting_list = false; bool move_dag_to_waiting_list = false;
dag_status = cur->get_dag_status(); dag_status = cur->get_dag_status();
#ifdef ERRSIM
ObIDagNet *tmp_dag_net = nullptr;
ret = OB_E(EventTable::EN_CO_MREGE_DAG_READY_FOREVER) ret;
if (OB_FAIL(ret)) {
ret = OB_SUCCESS;
if (OB_NOT_NULL(tmp_dag_net = cur->get_dag_net()) && tmp_dag_net->is_co_dag_net()) {
LOG_INFO("ERRSIM EN_CO_MREGE_DAG_READY_FOREVER", K(ret));
if (tmp_dag_net->is_cancel()) {
LOG_INFO("ERRSIM EN_CO_MREGE_DAG_READY_FOREVER CO MERGE DAG IS CANCELED", K(ret));
} else {
cur = cur->get_next();
continue;
}
}
}
#endif
if (!cur->check_with_lock()) { if (!cur->check_with_lock()) {
// TODO(@jingshui) cancel dag // TODO(@jingshui) cancel dag
} else if (cur->get_indegree() > 0) { } else if (cur->get_indegree() > 0) {
@ -2386,14 +2392,9 @@ int ObDagPrioScheduler::pop_task_from_ready_list_(
&& 0 == cur->get_running_task_count()) { // no task running failed dag, need free && 0 == cur->get_running_task_count()) { // no task running failed dag, need free
tmp_dag = cur; tmp_dag = cur;
cur = cur->get_next(); cur = cur->get_next();
if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *tmp_dag, erase_dag_net, true/*try_move_child*/))) { // will report result if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *tmp_dag, true/*try_move_child*/))) { // will report result
COMMON_LOG(WARN, "failed to deal with failed dag", K(ret), KPC(tmp_dag)); COMMON_LOG(WARN, "failed to deal with failed dag", K(ret), KPC(tmp_dag));
ob_abort(); ob_abort();
} else if (OB_ISNULL(erase_dag_net)) {
} else if (OB_FAIL(delayed_erase_dag_nets.push_back(erase_dag_net))) {
COMMON_LOG(WARN, "failed to add erase dag net, stop looping dag_list to deal with the erase_dag_net", K(ret));
// if failed, we will lose this dag_net and never release it, so we should capture it with an extra ptr.
extra_erase_dag_net = erase_dag_net;
} }
continue; continue;
} }
@ -2483,20 +2484,36 @@ int ObDagPrioScheduler::generate_next_dag_(ObIDag &dag)
return ret; return ret;
} }
// Records the dag's warning info in its dag net when that dag net is a co dag net.
// need_add is set to false only after the dag net accepted the warning info;
// it is left untouched when the dag has no dag net, the dag net is not a co dag net,
// or add_dag_warning_info() fails.
// Returns the error code from add_dag_warning_info(), OB_SUCCESS otherwise.
int ObDagPrioScheduler::add_dag_warning_info_into_dag_net_(ObIDag &dag, bool &need_add)
{
  int ret = OB_SUCCESS;
  ObIDagNet *dag_net = dag.get_dag_net();
  if (OB_ISNULL(dag_net) || !dag_net->is_co_dag_net()) {
    // nothing to record in the dag net
  } else if (OB_FAIL(dag_net->add_dag_warning_info(&dag))) {
    COMMON_LOG(WARN, "failed to add dag warning info into dag net", K(ret), K(dag), KPC(dag_net));
  } else {
    need_add = false;
  }
  return ret;
}
// when cancel_dag(), the dag may have parent // when cancel_dag(), the dag may have parent
int ObDagPrioScheduler::finish_dag_( int ObDagPrioScheduler::finish_dag_(
const ObIDag::ObDagStatus status, const ObIDag::ObDagStatus status,
ObIDag &dag, ObIDag &dag,
ObIDagNet *&finish_dag_net,
const bool try_move_child) const bool try_move_child)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
finish_dag_net = nullptr; bool need_add = is_compaction_dag(dag.get_type()); // need add compaction dag warning info
bool dag_net_finished = false;
ObIDagNet *dag_net = dag.get_dag_net();
if (OB_UNLIKELY(dag.get_priority() != priority_ || OB_ISNULL(scheduler_))) { if (OB_UNLIKELY(dag.get_priority() != priority_ || OB_ISNULL(scheduler_))) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "unexpect value", K(ret), K(dag.get_priority()), K_(priority), KP_(scheduler)); COMMON_LOG(WARN, "unexpect value", K(ret), K(dag.get_priority()), K_(priority), KP_(scheduler));
} else if (OB_FAIL(dag.finish(status))) { } else if (FALSE_IT(add_dag_warning_info_into_dag_net_(dag, need_add))) {
} else if (OB_FAIL(dag.finish(status, dag_net_finished))) { // dag record will be erase, making this dag_net could be free.
COMMON_LOG(WARN, "dag finished failed", K(ret), K(dag), "dag_ret", dag.get_dag_ret()); COMMON_LOG(WARN, "dag finished failed", K(ret), K(dag), "dag_ret", dag.get_dag_ret());
} else if (try_move_child && OB_FAIL(try_move_child_to_ready_list_(dag))) { } else if (try_move_child && OB_FAIL(try_move_child_to_ready_list_(dag))) {
LOG_WARN("failed to try move child to ready list", K(ret), K(&dag)); LOG_WARN("failed to try move child to ready list", K(ret), K(&dag));
@ -2530,21 +2547,6 @@ int ObDagPrioScheduler::finish_dag_(
} }
} }
ObIDagNet *dag_net = dag.get_dag_net();
bool need_add = is_compaction_dag(dag.get_type());
if (OB_ISNULL(dag_net)) {
} else {
if (dag_net->check_finished_and_mark_stop()) {
finish_dag_net = dag_net;
}
if (dag_net->is_co_dag_net()) {
if (OB_TMP_FAIL(dag_net->add_dag_warning_info(&dag))) {
COMMON_LOG(WARN, "failed to add dag warning info into dag net", K(tmp_ret), K(dag), KPC(dag_net));
} else {
need_add = false;
}
}
}
if (need_add) { if (need_add) {
if (OB_TMP_FAIL(MTL(ObDagWarningHistoryManager*)->add_dag_warning_info(&dag))) { if (OB_TMP_FAIL(MTL(ObDagWarningHistoryManager*)->add_dag_warning_info(&dag))) {
COMMON_LOG(WARN, "failed to add dag warning info", K(tmp_ret), K(dag)); COMMON_LOG(WARN, "failed to add dag warning info", K(tmp_ret), K(dag));
@ -2567,6 +2569,10 @@ int ObDagPrioScheduler::finish_dag_(
} }
scheduler_->inner_free_dag(dag); // free after log print scheduler_->inner_free_dag(dag); // free after log print
if (OB_NOT_NULL(dag_net) && dag_net_finished) {
dag_net->set_last_dag_finished();
scheduler_->notify_when_dag_net_finish();
}
} }
return ret; return ret;
} }
@ -2637,8 +2643,7 @@ int ObDagPrioScheduler::deal_with_fail_dag_(ObIDag &dag, bool &retry_flag)
int ObDagPrioScheduler::finish_task_in_dag_( int ObDagPrioScheduler::finish_task_in_dag_(
ObITask &task, ObITask &task,
ObIDag &dag, ObIDag &dag)
ObIDagNet *&finish_dag_net)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
@ -2656,7 +2661,7 @@ int ObDagPrioScheduler::finish_task_in_dag_(
if (dag.has_finished()) { if (dag.has_finished()) {
ObIDag::ObDagStatus status = ObIDag::ObDagStatus status =
dag.is_dag_failed() ? ObIDag::DAG_STATUS_ABORT : ObIDag::DAG_STATUS_FINISH; dag.is_dag_failed() ? ObIDag::DAG_STATUS_ABORT : ObIDag::DAG_STATUS_FINISH;
if (OB_FAIL(finish_dag_(status, dag, finish_dag_net, true/*try_move_child*/))) { if (OB_FAIL(finish_dag_(status, dag, true/*try_move_child*/))) {
COMMON_LOG(WARN, "failed to finish dag and dag net", K(ret), K(dag)); COMMON_LOG(WARN, "failed to finish dag and dag net", K(ret), K(dag));
ob_abort(); ob_abort();
} }
@ -2686,36 +2691,30 @@ int ObDagPrioScheduler::loop_ready_dag_list(bool &is_found)
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
common::ObSEArray<ObIDagNet *, 64> delayed_erase_dag_nets;
ObIDagNet *extra_erase_dag_net = nullptr;
{ {
ObMutexGuard guard(prio_lock_); ObMutexGuard guard(prio_lock_);
if (running_task_cnts_ < adaptive_task_limit_) { if (running_task_cnts_ < adaptive_task_limit_) {
// if extra_erase_dag_net not null, the is_found must be false. // if extra_erase_dag_net not null, the is_found must be false.
if (!check_need_load_shedding_(true/*for_schedule*/)) { if (!check_need_load_shedding_(true/*for_schedule*/)) {
is_found = (OB_SUCCESS == schedule_one_(delayed_erase_dag_nets, extra_erase_dag_net)); is_found = (OB_SUCCESS == schedule_one_());
} }
while (running_task_cnts_ < adaptive_task_limit_ && is_found) { while (running_task_cnts_ < adaptive_task_limit_ && is_found) {
if (check_need_load_shedding_(true/*for_schedule*/)) { if (check_need_load_shedding_(true/*for_schedule*/)) {
break; break;
} else if (OB_SUCCESS != schedule_one_(delayed_erase_dag_nets, extra_erase_dag_net)) { } else if (OB_SUCCESS != schedule_one_()) {
break; break;
} }
} }
} }
} }
// clear delayed_erase_dag_nets after releasing prio_lock, ignore ret
(void)erase_dag_nets_without_lock_(delayed_erase_dag_nets, extra_erase_dag_net);
return ret; return ret;
} }
int ObDagPrioScheduler::loop_waiting_dag_list() int ObDagPrioScheduler::loop_waiting_dag_list()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
common::ObSEArray<ObIDagNet *, 64> delayed_erase_dag_nets;
ObIDagNet *erase_dag_net = nullptr;
{ {
ObMutexGuard guard(prio_lock_); ObMutexGuard guard(prio_lock_);
if (!dag_list_[WAITING_DAG_LIST].is_empty()) { if (!dag_list_[WAITING_DAG_LIST].is_empty()) {
@ -2727,14 +2726,9 @@ int ObDagPrioScheduler::loop_waiting_dag_list()
if (0 == cur->get_indegree() && OB_NOT_NULL(cur->get_dag_net()) && cur->get_dag_net()->is_cancel()) { if (0 == cur->get_indegree() && OB_NOT_NULL(cur->get_dag_net()) && cur->get_dag_net()->is_cancel()) {
move_dag = cur; move_dag = cur;
cur = cur->get_next(); cur = cur->get_next();
if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *move_dag, erase_dag_net, true/*try_move_child*/))) { // will report result if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *move_dag, true/*try_move_child*/))) { // will report result
COMMON_LOG(WARN, "failed to deal with failed dag", K(ret), KPC(cur)); COMMON_LOG(WARN, "failed to deal with failed dag", K(ret), KPC(cur));
ob_abort(); ob_abort();
} else if (OB_ISNULL(erase_dag_net)) {
} else if (OB_FAIL(delayed_erase_dag_nets.push_back(erase_dag_net))) {
COMMON_LOG(WARN, "failed to add erase dag net, stop looping dag_list to deal with the erase_dag_net", K(ret));
} else {
erase_dag_net = nullptr; // dag_net is in dag_nets array
} }
} else if (0 == cur->get_indegree() && cur->check_can_schedule()) { } else if (0 == cur->get_indegree() && cur->check_can_schedule()) {
move_dag = cur; move_dag = cur;
@ -2762,8 +2756,6 @@ int ObDagPrioScheduler::loop_waiting_dag_list()
} }
} }
} // prio_lock_ unlock } // prio_lock_ unlock
// clear delayed_erase_dag_nets after releasing prio_lock, ignore ret
(void)erase_dag_nets_without_lock_(delayed_erase_dag_nets, erase_dag_net);
return ret; return ret;
} }
@ -2933,7 +2925,6 @@ int ObDagPrioScheduler::check_ls_compaction_dag_exist_with_cancel(
compaction::ObTabletMergeDag *dag = nullptr; compaction::ObTabletMergeDag *dag = nullptr;
exist = false; exist = false;
ObDagListIndex loop_list[2] = { READY_DAG_LIST, RANK_DAG_LIST }; ObDagListIndex loop_list[2] = { READY_DAG_LIST, RANK_DAG_LIST };
ObIDagNet *unused_erase_dag_net = nullptr;
ObIDag *cancel_dag = nullptr; ObIDag *cancel_dag = nullptr;
bool cancel_flag = false; bool cancel_flag = false;
int64_t cancel_dag_cnt = 0; int64_t cancel_dag_cnt = 0;
@ -2954,7 +2945,7 @@ int ObDagPrioScheduler::check_ls_compaction_dag_exist_with_cancel(
if (OB_UNLIKELY(nullptr != cancel_dag->get_dag_net())) { if (OB_UNLIKELY(nullptr != cancel_dag->get_dag_net())) {
tmp_ret = OB_ERR_UNEXPECTED; tmp_ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "compaction dag should not in dag net", KR(tmp_ret)); COMMON_LOG(WARN, "compaction dag should not in dag net", KR(tmp_ret));
} else if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cancel_dag, unused_erase_dag_net, false/*try_move_child*/))) { } else if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cancel_dag, false/*try_move_child*/))) {
COMMON_LOG(WARN, "failed to erase dag", K(tmp_ret), KPC(cancel_dag)); COMMON_LOG(WARN, "failed to erase dag", K(tmp_ret), KPC(cancel_dag));
ob_abort(); ob_abort();
} else { } else {
@ -3149,7 +3140,6 @@ int ObDagPrioScheduler::deal_with_finish_task(
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObIDag *dag = nullptr; ObIDag *dag = nullptr;
ObIDagNet *erase_dag_net = nullptr;
if (OB_ISNULL(dag = task.get_dag())) { if (OB_ISNULL(dag = task.get_dag())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
@ -3196,19 +3186,12 @@ int ObDagPrioScheduler::deal_with_finish_task(
finish_task_flag = false; finish_task_flag = false;
} }
if (finish_task_flag && OB_FAIL(finish_task_in_dag_(task, *dag, erase_dag_net))) { if (finish_task_flag && OB_FAIL(finish_task_in_dag_(task, *dag))) {
COMMON_LOG(WARN, "failed to finish task", K(ret), KPC(dag)); COMMON_LOG(WARN, "failed to finish task", K(ret), KPC(dag));
} }
scheduler_->sub_running_dag_cnts(dag_type); scheduler_->sub_running_dag_cnts(dag_type);
} }
// ensure that prio_lock is not locked before locking dag_net_map_lock
if (OB_SUCC(ret) && nullptr != erase_dag_net) {
if (OB_FAIL(erase_dag_net_without_lock_(erase_dag_net))) {
COMMON_LOG(WARN, "failed to finish dag net", K(ret));
}
}
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
scheduler_->sub_total_running_task_cnt(); scheduler_->sub_total_running_task_cnt();
ObMutexGuard guard(prio_lock_); ObMutexGuard guard(prio_lock_);
@ -3224,7 +3207,6 @@ int ObDagPrioScheduler::cancel_dag(const ObIDag &dag, const bool force_cancel)
int hash_ret = OB_SUCCESS; int hash_ret = OB_SUCCESS;
bool free_flag = false; bool free_flag = false;
ObIDag *cur_dag = nullptr; ObIDag *cur_dag = nullptr;
ObIDagNet *erase_dag_net = nullptr;
{ {
ObMutexGuard guard(prio_lock_); ObMutexGuard guard(prio_lock_);
if (OB_SUCCESS != (hash_ret = dag_map_.get_refactored(&dag, cur_dag))) { if (OB_SUCCESS != (hash_ret = dag_map_.get_refactored(&dag, cur_dag))) {
@ -3243,7 +3225,7 @@ int ObDagPrioScheduler::cancel_dag(const ObIDag &dag, const bool force_cancel)
} else if (cur_dag->get_dag_status() == ObIDag::DAG_STATUS_READY) { } else if (cur_dag->get_dag_status() == ObIDag::DAG_STATUS_READY) {
LOG_INFO("cancel dag", K(ret), KP(cur_dag)); LOG_INFO("cancel dag", K(ret), KP(cur_dag));
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, erase_dag_net, true/*try_move_child*/))) { } else if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, true/*try_move_child*/))) {
COMMON_LOG(WARN, "failed to erase dag and dag net", K(ret), KPC(cur_dag)); COMMON_LOG(WARN, "failed to erase dag and dag net", K(ret), KPC(cur_dag));
ob_abort(); ob_abort();
} }
@ -3252,13 +3234,6 @@ int ObDagPrioScheduler::cancel_dag(const ObIDag &dag, const bool force_cancel)
cur_dag->set_force_cancel_flag(); cur_dag->set_force_cancel_flag();
} }
} }
// ensure that prio_lock is not locked before locking dag_net_map_lock
if (OB_SUCC(ret) && nullptr != erase_dag_net) {
if (OB_FAIL(erase_dag_net_without_lock_(erase_dag_net))) {
COMMON_LOG(WARN, "failed to finish dag net", K(ret));
}
}
return ret; return ret;
} }
@ -3325,8 +3300,6 @@ bool ObDagPrioScheduler::try_switch(ObTenantDagWorker &worker)
bool need_pause = false; bool need_pause = false;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
common::ObSEArray<ObIDagNet *, 64> delayed_erase_dag_nets;
ObIDagNet *extra_erase_dag_net = nullptr;
{ {
ObMutexGuard guard(prio_lock_); ObMutexGuard guard(prio_lock_);
if (running_task_cnts_ > adaptive_task_limit_) { if (running_task_cnts_ > adaptive_task_limit_) {
@ -3340,7 +3313,7 @@ bool ObDagPrioScheduler::try_switch(ObTenantDagWorker &worker)
if (!need_pause && !waiting_workers_.is_empty()) { if (!need_pause && !waiting_workers_.is_empty()) {
if (waiting_workers_.get_first()->need_wake_up()) { if (waiting_workers_.get_first()->need_wake_up()) {
// schedule_one will schedule the first worker on the waiting list first // schedule_one will schedule the first worker on the waiting list first
if (OB_TMP_FAIL(schedule_one_(delayed_erase_dag_nets, extra_erase_dag_net))) { if (OB_TMP_FAIL(schedule_one_())) {
if (OB_ENTRY_NOT_EXIST != tmp_ret) { if (OB_ENTRY_NOT_EXIST != tmp_ret) {
COMMON_LOG_RET(WARN, tmp_ret, "failed to schedule one task", K_(priority)); COMMON_LOG_RET(WARN, tmp_ret, "failed to schedule one task", K_(priority));
} }
@ -3354,7 +3327,6 @@ bool ObDagPrioScheduler::try_switch(ObTenantDagWorker &worker)
} }
} }
(void)erase_dag_nets_without_lock_(delayed_erase_dag_nets, extra_erase_dag_net); // ignore tmp_ret
return need_pause; return need_pause;
} }
@ -3392,7 +3364,9 @@ bool ObDagPrioScheduler::check_need_load_shedding_(const bool for_schedule)
/***************************************ObDagNetScheduler impl********************************************/ /***************************************ObDagNetScheduler impl********************************************/
void ObDagNetScheduler::destroy() void ObDagNetScheduler::destroy()
{ {
blocking_dag_net_list_.reset(); for (int i = 0; i < DAG_NET_LIST_MAX; i++ ) {
dag_net_list_[i].reset();
}
const ObIDagNet *cur_dag_net = nullptr; const ObIDagNet *cur_dag_net = nullptr;
ObIAllocator *allocator = nullptr; ObIAllocator *allocator = nullptr;
@ -3462,14 +3436,43 @@ void ObDagNetScheduler::erase_dag_net_id_or_abort(ObIDagNet &dag_net)
(void) erase_dag_net_or_abort(dag_net); (void) erase_dag_net_or_abort(dag_net);
} }
void ObDagNetScheduler::erase_block_dag_net_or_abort(ObIDagNet *dag_net) void ObDagNetScheduler::erase_dag_net_list_or_abort(const ObDagNetListIndex &dag_net_list_index, ObIDagNet *dag_net)
{ {
if (!blocking_dag_net_list_.remove(dag_net)) { if (!is_valid_dag_net_list(dag_net_list_index) || !dag_net_list_[dag_net_list_index].remove(dag_net)) {
COMMON_LOG_RET(WARN, OB_ERR_UNEXPECTED, "failed to remove dag_net from blocking_dag_net_list", K(dag_net)); COMMON_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "failed to remove dag_net from",
"list", dag_net_list_to_str(dag_net_list_index), K(dag_net_list_index), K(dag_net));
ob_abort(); ob_abort();
} }
} }
// Appends dag_net to the tail of the list selected by dag_net_list_index.
// An invalid index or a failed add_last() is logged at ERROR level and then aborts via ob_abort().
void ObDagNetScheduler::add_dag_net_list_or_abort(const ObDagNetListIndex &dag_net_list_index, ObIDagNet *dag_net)
{
  // The index is validated first; add_last() only runs for a valid list index.
  const bool added = is_valid_dag_net_list(dag_net_list_index)
      && dag_net_list_[dag_net_list_index].add_last(dag_net);
  if (!added) {
    COMMON_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "failed to add dag_net into",
        "list", dag_net_list_to_str(dag_net_list_index), K(dag_net_list_index), K(dag_net));
    ob_abort();
  }
}
// Reports whether the scheduler has no pending dag net work, checked under dag_net_map_lock_.
// Returns false if the blocking list has any entry, or if any dag net in the running list
// fails check_finished_and_mark_stop(); returns true otherwise.
// Note: check_finished_and_mark_stop() may mark a finished dag net as stopped, so this
// check is not purely read-only.
bool ObDagNetScheduler::is_empty()
{
  bool all_idle = true;
  ObMutexGuard guard(dag_net_map_lock_);
  if (!dag_net_list_[BLOCKING_DAG_NET_LIST].is_empty()) {
    all_idle = false;
  } else if (!dag_net_list_[RUNNING_DAG_NET_LIST].is_empty()) {
    ObIDagNet *head = dag_net_list_[RUNNING_DAG_NET_LIST].get_header();
    // Walk the running list and stop at the first dag net that is still unfinished.
    for (ObIDagNet *cur = head->get_next(); NULL != cur && head != cur; cur = cur->get_next()) {
      if (!cur->check_finished_and_mark_stop()) {
        all_idle = false;
        break;
      }
    }
  }
  return all_idle;
}
int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net) int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -3494,7 +3497,7 @@ int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net)
} }
} }
(void) erase_dag_net_or_abort(dag_net); (void) erase_dag_net_or_abort(dag_net);
} else if (!blocking_dag_net_list_.add_last(&dag_net)) {// add into blocking_dag_net_list } else if (!dag_net_list_[BLOCKING_DAG_NET_LIST].add_last(&dag_net)) {// add into blocking_dag_net_list
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "failed to add into blocking_dag_net_list", K(ret), K(dag_net)); COMMON_LOG(ERROR, "failed to add into blocking_dag_net_list", K(ret), K(dag_net));
(void) erase_dag_net_id_or_abort(dag_net); (void) erase_dag_net_id_or_abort(dag_net);
@ -3524,6 +3527,8 @@ void ObDagNetScheduler::dump_dag_status()
{ {
int64_t running_dag_net_map_size = 0; int64_t running_dag_net_map_size = 0;
int64_t blocking_dag_net_list_size = 0; int64_t blocking_dag_net_list_size = 0;
int64_t running_dag_net_list_size = 0;
int64_t dag_net_count[ObDagNetType::DAG_NET_TYPE_MAX]; int64_t dag_net_count[ObDagNetType::DAG_NET_TYPE_MAX];
{ {
ObMutexGuard guard(dag_net_map_lock_); ObMutexGuard guard(dag_net_map_lock_);
@ -3531,9 +3536,10 @@ void ObDagNetScheduler::dump_dag_status()
dag_net_count[i] = dag_net_cnts_[i]; dag_net_count[i] = dag_net_cnts_[i];
} }
running_dag_net_map_size = dag_net_map_.size(); running_dag_net_map_size = dag_net_map_.size();
blocking_dag_net_list_size = blocking_dag_net_list_.get_size(); blocking_dag_net_list_size = dag_net_list_[BLOCKING_DAG_NET_LIST].get_size();
running_dag_net_list_size = dag_net_list_[RUNNING_DAG_NET_LIST].get_size();
} }
COMMON_LOG(INFO, "dump_dag_status[DAG_NET]", K(running_dag_net_map_size), K(blocking_dag_net_list_size)); COMMON_LOG(INFO, "dump_dag_status[DAG_NET]", K(running_dag_net_map_size), K(blocking_dag_net_list_size), K(running_dag_net_list_size));
for (int64_t i = 0; i < ObDagNetType::DAG_NET_TYPE_MAX; ++i) { for (int64_t i = 0; i < ObDagNetType::DAG_NET_TYPE_MAX; ++i) {
COMMON_LOG(INFO, "dump_dag_status[DAG_NET]", "type", OB_DAG_NET_TYPES[i].dag_net_type_str_, COMMON_LOG(INFO, "dump_dag_status[DAG_NET]", "type", OB_DAG_NET_TYPES[i].dag_net_type_str_,
"dag_count", dag_net_count[i]); "dag_count", dag_net_count[i]);
@ -3566,17 +3572,21 @@ void ObDagNetScheduler::get_all_dag_info(
{ {
if (OB_NOT_NULL(info_list)) { if (OB_NOT_NULL(info_list)) {
ObMutexGuard guard(dag_net_map_lock_); ObMutexGuard guard(dag_net_map_lock_);
DagNetMap::iterator iter = dag_net_map_.begin();
for (; iter != dag_net_map_.end() && idx < total_cnt; ++iter) { // ignore failure
ADD_DAG_INFO(iter->second, "RUNNING_DAG_NET_MAP");
}
ObIDagNet *head = blocking_dag_net_list_.get_header(); ObIDagNet *head = dag_net_list_[BLOCKING_DAG_NET_LIST].get_header();
ObIDagNet *cur = head->get_next(); ObIDagNet *cur = head->get_next();
while (NULL != cur && head != cur && idx < total_cnt) { while (NULL != cur && head != cur && idx < total_cnt) {
ADD_DAG_INFO(cur, "BLOCKING_DAG_NET_MAP"); ADD_DAG_INFO(cur, "BLOCKING_DAG_NET_LIST");
cur = cur->get_next(); cur = cur->get_next();
} }
head = dag_net_list_[RUNNING_DAG_NET_LIST].get_header();
cur = head->get_next();
while (NULL != cur && head != cur && idx < total_cnt) {
ADD_DAG_INFO(cur, "RUNNING_DAG_NET_LIST");
cur = cur->get_next();
}
} }
} }
@ -3628,7 +3638,8 @@ bool ObDagNetScheduler::is_dag_map_full()
ObMutexGuard guard(dag_net_map_lock_); ObMutexGuard guard(dag_net_map_lock_);
COMMON_LOG(INFO, "dag map is almost full, stop loop blocking dag_net_map", COMMON_LOG(INFO, "dag map is almost full, stop loop blocking dag_net_map",
"dag_map_size", scheduler_->get_cur_dag_cnt(), "dag_net_map_size", dag_net_map_.size(), "dag_map_size", scheduler_->get_cur_dag_cnt(), "dag_net_map_size", dag_net_map_.size(),
"blocking_dag_net_list_size", blocking_dag_net_list_.get_size()); "blocking_dag_net_list_size", dag_net_list_[BLOCKING_DAG_NET_LIST].get_size(),
"running_dag_net_list_size", dag_net_list_[RUNNING_DAG_NET_LIST].get_size());
} }
} }
} }
@ -3636,23 +3647,27 @@ bool ObDagNetScheduler::is_dag_map_full()
return bret; return bret;
} }
int ObDagNetScheduler::loop_running_dag_net_map() int ObDagNetScheduler::loop_running_dag_net_list()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
ObIDagNet *dag_net = nullptr;
int64_t slow_dag_net_cnt = 0; int64_t slow_dag_net_cnt = 0;
ObMutexGuard guard(dag_net_map_lock_); ObMutexGuard guard(dag_net_map_lock_);
DagNetMap::iterator iter = dag_net_map_.begin(); ObIDagNet *head = dag_net_list_[RUNNING_DAG_NET_LIST].get_header();
for (; iter != dag_net_map_.end() && !is_dag_map_full(); ++iter) { // ignore failure ObIDagNet *cur = head->get_next();
if (OB_ISNULL(dag_net = iter->second)) { ObIDagNet *dag_net = nullptr;
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dag net is unepxected null", K(ret), KP(dag_net)); while (NULL != cur && head != cur) { // ignore failure
} else if (dag_net->is_started() && OB_FAIL(dag_net->schedule_rest_dag())) { LOG_DEBUG("loop running dag net list", K(ret), KPC(cur));
LOG_WARN("failed to schedule rest dag", K(ret)); dag_net = cur;
cur = cur->get_next();
if (dag_net->is_started() && OB_TMP_FAIL(dag_net->schedule_rest_dag())) {
LOG_WARN("failed to schedule rest dag", K(tmp_ret));
} else if (dag_net->check_finished_and_mark_stop()) { } else if (dag_net->check_finished_and_mark_stop()) {
LOG_WARN("dag net is in finish state", K(ret), KPC(dag_net)); LOG_WARN("dag net is in finish state, move to finished list", K(ret), KPC(dag_net));
(void) erase_dag_net_list_or_abort(RUNNING_DAG_NET_LIST, dag_net);
(void) add_dag_net_list_or_abort(FINISHED_DAG_NET_LIST, dag_net);
} else if (dag_net->is_co_dag_net() } else if (dag_net->is_co_dag_net()
&& dag_net->get_start_time() + SLOW_COMPACTION_DAG_NET_THREASHOLD < ObTimeUtility::fast_current_time()) { && dag_net->get_start_time() + SLOW_COMPACTION_DAG_NET_THREASHOLD < ObTimeUtility::fast_current_time()) {
++slow_dag_net_cnt; ++slow_dag_net_cnt;
@ -3669,12 +3684,35 @@ int ObDagNetScheduler::loop_running_dag_net_map()
} }
} }
} }
COMMON_LOG(INFO, "loop running dag net map", K(ret), COMMON_LOG(INFO, "loop running dag net list", K(ret),
"running_dag_net_map_size", dag_net_map_.size(), K(slow_dag_net_cnt)); "running_dag_net_list_size", dag_net_list_[RUNNING_DAG_NET_LIST].get_size(), K(slow_dag_net_cnt),
"finished_dag_net_list_size", dag_net_list_[FINISHED_DAG_NET_LIST].get_size());
return ret; return ret;
} }
/*
 * Drain FINISHED_DAG_NET_LIST: every dag net on the list is unlinked and then
 * handed to scheduler_->finish_dag_net().
 *
 * This function does not take dag_net_map_lock_; the comments on the class
 * declaration state that finishing a dag net under that lock would deadlock
 * while clearing the dag net ctx.
 *
 * @return OB_ERR_UNEXPECTED when scheduler_ is null, OB_SUCCESS otherwise.
 *         Results of the per-dag-net calls are intentionally ignored.
 */
int ObDagNetScheduler::loop_finished_dag_net_list()
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(scheduler_)) {
    ret = OB_ERR_UNEXPECTED;
    COMMON_LOG(WARN, "scheduler is null", KP(scheduler_));
  } else {
    ObIDagNet *head = dag_net_list_[FINISHED_DAG_NET_LIST].get_header();
    ObIDagNet *cur = head->get_next();
    ObIDagNet *dag_net = nullptr;
    while (nullptr != cur && head != cur) {
      // was logged as "loop blocking dag net list" (copy-paste from the blocking loop)
      LOG_DEBUG("loop finished dag net list", K(ret), KPC(cur));
      dag_net = cur;
      // step to the next node first: dag_net is unlinked from the list below
      cur = cur->get_next();
      (void) erase_dag_net_list_or_abort(FINISHED_DAG_NET_LIST, dag_net);
      (void) scheduler_->finish_dag_net(dag_net);
    }
  }
  COMMON_LOG(INFO, "loop finished dag net list", K(ret), "finished_dag_net_list_size", dag_net_list_[FINISHED_DAG_NET_LIST].get_size());
  return ret;
}
int ObDagNetScheduler::loop_blocking_dag_net_list() int ObDagNetScheduler::loop_blocking_dag_net_list()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -3683,10 +3721,10 @@ int ObDagNetScheduler::loop_blocking_dag_net_list()
COMMON_LOG(WARN, "scheduler is null", KP(scheduler_)); COMMON_LOG(WARN, "scheduler is null", KP(scheduler_));
} else { } else {
ObMutexGuard guard(dag_net_map_lock_); ObMutexGuard guard(dag_net_map_lock_);
ObIDagNet *head = blocking_dag_net_list_.get_header(); ObIDagNet *head = dag_net_list_[BLOCKING_DAG_NET_LIST].get_header();
ObIDagNet *cur = head->get_next(); ObIDagNet *cur = head->get_next();
ObIDagNet *tmp = nullptr; ObIDagNet *tmp = nullptr;
int64_t rest_cnt = DEFAULT_MAX_RUNNING_DAG_NET_CNT - (dag_net_map_.size() - blocking_dag_net_list_.get_size()); int64_t rest_cnt = DEFAULT_MAX_RUNNING_DAG_NET_CNT - (dag_net_map_.size() - dag_net_list_[BLOCKING_DAG_NET_LIST].get_size());
while (NULL != cur && head != cur && rest_cnt > 0 && !is_dag_map_full()) { while (NULL != cur && head != cur && rest_cnt > 0 && !is_dag_map_full()) {
LOG_DEBUG("loop blocking dag net list", K(ret), KPC(cur), K(rest_cnt)); LOG_DEBUG("loop blocking dag net list", K(ret), KPC(cur), K(rest_cnt));
tmp = cur; tmp = cur;
@ -3696,12 +3734,13 @@ int ObDagNetScheduler::loop_blocking_dag_net_list()
COMMON_LOG(WARN, "failed to start running or be canceled", K(ret), KPC(cur)); COMMON_LOG(WARN, "failed to start running or be canceled", K(ret), KPC(cur));
} }
(void) finish_dag_net_without_lock(*tmp); (void) finish_dag_net_without_lock(*tmp);
(void) erase_block_dag_net_or_abort(tmp); (void) erase_dag_net_list_or_abort(BLOCKING_DAG_NET_LIST, tmp);
(void) scheduler_->free_dag_net(tmp); // set tmp nullptr (void) scheduler_->free_dag_net(tmp); // set tmp nullptr
} else { } else {
tmp->set_start_time(); tmp->set_start_time();
--rest_cnt; --rest_cnt;
(void) erase_block_dag_net_or_abort(tmp); (void) erase_dag_net_list_or_abort(BLOCKING_DAG_NET_LIST, tmp);
(void) add_dag_net_list_or_abort(RUNNING_DAG_NET_LIST, tmp);
} }
} }
} }
@ -3781,21 +3820,29 @@ int ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel(const ObLSID &l
exist = false; exist = false;
ObMutexGuard dag_net_guard(dag_net_map_lock_); ObMutexGuard dag_net_guard(dag_net_map_lock_);
int64_t cancel_dag_cnt = 0; int64_t cancel_dag_cnt = 0;
ObIDagNet *head = nullptr;
ObIDagNet *cur = nullptr;
ObIDagNet *cur_dag_net = nullptr; ObIDagNet *cur_dag_net = nullptr;
DagNetMap::iterator iter = dag_net_map_.begin();
for (; OB_SUCC(ret) && iter != dag_net_map_.end(); ++iter) { for (int i = BLOCKING_DAG_NET_LIST; i <= RUNNING_DAG_NET_LIST; i++) {
if (OB_ISNULL(cur_dag_net = iter->second)) { head = dag_net_list_[i].get_header();
ret = OB_ERR_UNEXPECTED; cur = head->get_next();
LOG_WARN("dag net is unepxected null", K(ret), KP(cur_dag_net)); cur_dag_net = nullptr;
} else if (cur_dag_net->is_co_dag_net()) {
compaction::ObCOMergeDagNet *co_dag_net = static_cast<compaction::ObCOMergeDagNet*>(cur_dag_net); while (NULL != cur && head != cur) {
if (ls_id == co_dag_net->get_dag_param().ls_id_) { cur_dag_net = cur;
cur_dag_net->set_cancel(); cur = cur->get_next();
++cancel_dag_cnt; if (cur_dag_net->is_co_dag_net()) {
exist = true; compaction::ObCOMergeDagNet *co_dag_net = static_cast<compaction::ObCOMergeDagNet*>(cur_dag_net);
if (ls_id == co_dag_net->get_dag_param().ls_id_) {
cur_dag_net->set_cancel();
++cancel_dag_cnt;
exist = true;
}
} }
} }
} // end of while }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
LOG_INFO("success to cancel dag net", KR(ret), K(ls_id), K(cancel_dag_cnt), K(exist)); LOG_INFO("success to cancel dag net", KR(ret), K(ls_id), K(cancel_dag_cnt), K(exist));
} }
@ -4475,6 +4522,12 @@ void ObTenantDagScheduler::notify()
scheduler_sync_.signal(); scheduler_sync_.signal();
} }
// Wake the scheduler because a dag net has finished: first raise the
// fast-schedule-dag-net flag, then call notify().
// NOTE(review): presumably the flag makes the next loop_dag_net() pass process the
// dag net lists without waiting for the time interval -- confirm against loop_dag_net().
void ObTenantDagScheduler::notify_when_dag_net_finish()
{
// the flag must be set before the wake-up so the woken thread can observe it
set_fast_schedule_dag_net();
notify();
}
int ObTenantDagScheduler::deal_with_finish_task(ObITask &task, ObTenantDagWorker &worker, int error_code) int ObTenantDagScheduler::deal_with_finish_task(ObITask &task, ObTenantDagWorker &worker, int error_code)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -4594,8 +4647,11 @@ void ObTenantDagScheduler::loop_dag_net()
clear_fast_schedule_dag_net(); clear_fast_schedule_dag_net();
} }
if (need_loop_running_dag_net || REACH_TENANT_TIME_INTERVAL(LOOP_RUNNING_DAG_NET_MAP_INTERVAL)) { if (need_loop_running_dag_net || REACH_TENANT_TIME_INTERVAL(LOOP_RUNNING_DAG_NET_MAP_INTERVAL)) {
if (OB_TMP_FAIL(dag_net_sche_.loop_running_dag_net_map())) { if (OB_TMP_FAIL(dag_net_sche_.loop_running_dag_net_list())) {
COMMON_LOG_RET(WARN, tmp_ret, "failed to add dag from running_dag_net_map"); COMMON_LOG_RET(WARN, tmp_ret, "failed to loop running_dag_net_list");
}
if (OB_TMP_FAIL(dag_net_sche_.loop_finished_dag_net_list())) {
COMMON_LOG_RET(WARN, tmp_ret, "failed to loop finished_dag_net_list");
} }
} }
} }

View File

@ -280,6 +280,20 @@ enum ObDagListIndex
DAG_LIST_MAX DAG_LIST_MAX
}; };
// Index of a dag net list inside ObDagNetScheduler::dag_net_list_.
// The numeric values are used directly as array indexes, and
// BLOCKING_DAG_NET_LIST..RUNNING_DAG_NET_LIST is iterated as a contiguous range,
// so the order of the enumerators must not be changed.
enum ObDagNetListIndex
{
// dag nets waiting to be started by loop_blocking_dag_net_list()
BLOCKING_DAG_NET_LIST = 0,
// dag nets that have been started; visited by loop_running_dag_net_list()
RUNNING_DAG_NET_LIST = 1,
// dag nets marked as stopped, waiting for loop_finished_dag_net_list()
FINISHED_DAG_NET_LIST = 2,
// number of lists, not a valid index
DAG_NET_LIST_MAX
};
const char *dag_net_list_to_str(const ObDagNetListIndex &dag_net_list_index);
// Returns true when dag_net_list_index lies in [0, DAG_NET_LIST_MAX),
// i.e. it names one of the dag net lists.
inline bool is_valid_dag_net_list(const ObDagNetListIndex &dag_net_list_index)
{
  const bool not_below_first = dag_net_list_index >= 0;
  const bool below_max = dag_net_list_index < DAG_NET_LIST_MAX;
  return not_below_first && below_max;
}
struct ObIDagInitParam struct ObIDagInitParam
{ {
ObIDagInitParam() {} ObIDagInitParam() {}
@ -406,8 +420,8 @@ public:
return true; return true;
} }
int add_child(ObIDag &child); int add_child(ObIDag &child);
int update_status_in_dag_net(); int update_status_in_dag_net(bool &dag_net_finished);
int finish(const ObDagStatus status); int finish(const ObDagStatus status, bool &dag_net_finished);
void gene_dag_info(ObDagInfo &info, const char *list_info); void gene_dag_info(ObDagInfo &info, const char *list_info);
virtual int gene_compaction_info(compaction::ObTabletCompactionProgress &progress) virtual int gene_compaction_info(compaction::ObTabletCompactionProgress &progress)
{ {
@ -550,8 +564,9 @@ public:
{ {
return true; return true;
} }
OB_INLINE bool inner_check_finished_without_lock() { return (is_cancel_ || inner_check_finished()) && dag_record_map_.empty(); }
bool check_finished_and_mark_stop(); bool check_finished_and_mark_stop();
int update_dag_status(ObIDag &dag); int update_dag_status(ObIDag &dag, bool &dag_net_finished);
int erase_dag_from_dag_net(ObIDag &dag); int erase_dag_from_dag_net(ObIDag &dag);
static const char *get_dag_net_type_str(enum ObDagNetType::ObDagNetTypeEnum type); static const char *get_dag_net_type_str(enum ObDagNetType::ObDagNetTypeEnum type);
@ -574,6 +589,7 @@ public:
} }
void set_cancel(); void set_cancel();
bool is_cancel(); bool is_cancel();
void set_last_dag_finished();
bool is_inited(); bool is_inited();
bool is_started(); bool is_started();
virtual int deal_with_cancel() virtual int deal_with_cancel()
@ -610,6 +626,7 @@ private:
ObDagId dag_net_id_; ObDagId dag_net_id_;
ObDagWarningInfo *first_fail_dag_info_; ObDagWarningInfo *first_fail_dag_info_;
bool is_cancel_; bool is_cancel_;
bool is_finishing_last_dag_; // making dag net freed after last dag freed if dag net can be freed after finish last dag
}; };
struct ObDagInfo struct ObDagInfo
@ -748,12 +765,13 @@ public:
ObIAllocator &ha_allocator, ObIAllocator &ha_allocator,
ObTenantDagScheduler &scheduler); ObTenantDagScheduler &scheduler);
bool is_empty() const { return 0 == dag_net_map_.size();} // only for unittest bool is_empty(); // only for unittest
int add_dag_net(ObIDagNet &dag_net); int add_dag_net(ObIDagNet &dag_net);
void erase_dag_net_or_abort(ObIDagNet &dag_net); void erase_dag_net_or_abort(ObIDagNet &dag_net);
void erase_dag_net_id_or_abort(ObIDagNet &dag_net); void erase_dag_net_id_or_abort(ObIDagNet &dag_net);
void erase_block_dag_net_or_abort(ObIDagNet *dag_net);
void finish_dag_net_without_lock(ObIDagNet &dag_net); void finish_dag_net_without_lock(ObIDagNet &dag_net);
void erase_dag_net_list_or_abort(const ObDagNetListIndex &dag_net_list_index, ObIDagNet *dag_net);
void add_dag_net_list_or_abort(const ObDagNetListIndex &dag_net_list_index, ObIDagNet *dag_net);
void finish_dag_net(ObIDagNet &dag_net); void finish_dag_net(ObIDagNet &dag_net);
void dump_dag_status(); void dump_dag_status();
int64_t get_dag_net_count(); int64_t get_dag_net_count();
@ -772,7 +790,9 @@ public:
int64_t &start_time); int64_t &start_time);
int64_t get_dag_net_count(const ObDagNetType::ObDagNetTypeEnum type); int64_t get_dag_net_count(const ObDagNetType::ObDagNetTypeEnum type);
bool is_dag_map_full(); bool is_dag_map_full();
int loop_running_dag_net_map(); int loop_running_dag_net_list();
// dag_net_map_lock_ must not be held here, otherwise clear_dag_net_ctx would deadlock
int loop_finished_dag_net_list();
int loop_blocking_dag_net_list(); int loop_blocking_dag_net_list();
int check_dag_net_exist( int check_dag_net_exist(
const ObDagId &dag_id, bool &exist); const ObDagId &dag_id, bool &exist);
@ -801,7 +821,11 @@ private:
ObTenantDagScheduler *scheduler_; ObTenantDagScheduler *scheduler_;
lib::ObMutex dag_net_map_lock_; lib::ObMutex dag_net_map_lock_;
DagNetMap dag_net_map_; // lock by dag_net_map_lock_ DagNetMap dag_net_map_; // lock by dag_net_map_lock_
DagNetList blocking_dag_net_list_; // lock by dag_net_map_lock_ /*
* blocking and running list should always locked by dag_net_map_lock_, but finished not.
* finished dag net list must without lock when free dag net, otherwise it would deadlock when clearing dag net ctx
*/
DagNetList dag_net_list_[DAG_NET_LIST_MAX];
DagNetIdMap dag_net_id_map_; // for HA to search dag_net of specified dag_id // lock by dag_net_map_lock_ DagNetIdMap dag_net_id_map_; // for HA to search dag_net of specified dag_id // lock by dag_net_map_lock_
int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX]; // lock by dag_net_map_lock_ int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX]; // lock by dag_net_map_lock_
}; };
@ -932,14 +956,9 @@ private:
ObIDag *&dag); ObIDag *&dag);
void add_schedule_info_(const ObDagType::ObDagTypeEnum dag_type, const int64_t data_size); void add_schedule_info_(const ObDagType::ObDagTypeEnum dag_type, const int64_t data_size);
void add_added_info_(const ObDagType::ObDagTypeEnum dag_type); void add_added_info_(const ObDagType::ObDagTypeEnum dag_type);
int schedule_one_( int schedule_one_();
common::ObIArray<ObIDagNet *> &delayed_erase_dag_nets,
ObIDagNet *&extra_erase_dag_net);
int schedule_dag_(ObIDag &dag, bool &move_dag_to_waiting_list); int schedule_dag_(ObIDag &dag, bool &move_dag_to_waiting_list);
int pop_task_from_ready_list_( int pop_task_from_ready_list_(ObITask *&task);
ObITask *&task,
common::ObIArray<ObIDagNet *> &delayed_erase_dag_nets,
ObIDagNet *&extra_erase_dag_net);
int rank_compaction_dags_(); int rank_compaction_dags_();
void try_update_adaptive_task_limit_(const int64_t batch_size); void try_update_adaptive_task_limit_(const int64_t batch_size);
int batch_move_compaction_dags_(const int64_t batch_size); int batch_move_compaction_dags_(const int64_t batch_size);
@ -948,20 +967,15 @@ private:
const int64_t batch_size, const int64_t batch_size,
common::ObSEArray<compaction::ObTabletMergeDag *, 32> &rank_dags); common::ObSEArray<compaction::ObTabletMergeDag *, 32> &rank_dags);
int generate_next_dag_(ObIDag &dag); int generate_next_dag_(ObIDag &dag);
int add_dag_warning_info_into_dag_net_(ObIDag &dag, bool &need_add);
int finish_dag_( int finish_dag_(
const ObIDag::ObDagStatus status, const ObIDag::ObDagStatus status,
ObIDag &dag, ObIDag &dag,
ObIDagNet *&erase_dag_net,
const bool try_move_child); const bool try_move_child);
// ensure that prio_lock is not locked before calling this func
int erase_dag_net_without_lock_(ObIDagNet *erase_dag_net);
void erase_dag_nets_without_lock_(
common::ObIArray<ObIDagNet *> &delayed_erase_dag_nets,
ObIDagNet *extra_erase_dag_net);
int try_move_child_to_ready_list_(ObIDag &dag); int try_move_child_to_ready_list_(ObIDag &dag);
int erase_dag_(ObIDag &dag); int erase_dag_(ObIDag &dag);
int deal_with_fail_dag_(ObIDag &dag, bool &retry_flag); int deal_with_fail_dag_(ObIDag &dag, bool &retry_flag);
int finish_task_in_dag_(ObITask &task, ObIDag &dag, ObIDagNet *&erase_dag_net); int finish_task_in_dag_(ObITask &task, ObIDag &dag);
void pause_worker_(ObTenantDagWorker &worker); void pause_worker_(ObTenantDagWorker &worker);
bool check_need_load_shedding_(const bool for_schedule); bool check_need_load_shedding_(const bool for_schedule);
@ -1041,6 +1055,7 @@ public:
void free_dag_net(T *&dag_net); void free_dag_net(T *&dag_net);
void run1() final; void run1() final;
void notify(); void notify();
void notify_when_dag_net_finish();
void reset(); void reset();
void destroy(); void destroy();
int64_t get_work_thread_num() int64_t get_work_thread_num()
@ -1049,7 +1064,7 @@ public:
return work_thread_num_; return work_thread_num_;
} }
int64_t get_dag_limit() const { return dag_limit_; } int64_t get_dag_limit() const { return dag_limit_; }
bool is_empty() const bool is_empty()
{ {
bool bret = true; bool bret = true;
for (int64_t i = 0; i < ObDagPrio::DAG_PRIO_MAX; ++i) { for (int64_t i = 0; i < ObDagPrio::DAG_PRIO_MAX; ++i) {
@ -1130,7 +1145,7 @@ private:
static const int64_t DUMP_DAG_STATUS_INTERVAL = 10 * 1000LL * 1000LL; // 10s static const int64_t DUMP_DAG_STATUS_INTERVAL = 10 * 1000LL * 1000LL; // 10s
static const int64_t DEFAULT_CHECK_PERIOD = 3 * 1000 * 1000; // 3s static const int64_t DEFAULT_CHECK_PERIOD = 3 * 1000 * 1000; // 3s
static const int64_t LOOP_WAITING_DAG_LIST_INTERVAL = 5 * 1000 * 1000L; // 5s static const int64_t LOOP_WAITING_DAG_LIST_INTERVAL = 5 * 1000 * 1000L; // 5s
static const int64_t LOOP_RUNNING_DAG_NET_MAP_INTERVAL = 3 * 60 * 1000 * 1000L; // 3m static const int64_t LOOP_RUNNING_DAG_NET_MAP_INTERVAL = 1 * 60 * 1000 * 1000L; // 1m
static const int32_t MAX_SHOW_DAG_NET_CNT_PER_PRIO = 500; static const int32_t MAX_SHOW_DAG_NET_CNT_PER_PRIO = 500;
static const int64_t MANY_DAG_COUNT = 2000; static const int64_t MANY_DAG_COUNT = 2000;
private: private:

View File

@ -33,6 +33,12 @@ ObCOTabletMergeCtx::ObCOTabletMergeCtx(
{ {
} }
/*
* ATTENTION: NEVER USE ANY LOG STREAM VARIABLES IN THIS FUNCTION.
* The destructor is called when the dag net is finished.
* ObCOMergeDagNet is special: it is canceled when the ls goes offline, in ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel.
* However, the dag net is only moved into the finished dag net list and is freed later. So using log stream variables in this function after the ls is offline is dangerous.
*/
ObCOTabletMergeCtx::~ObCOTabletMergeCtx() ObCOTabletMergeCtx::~ObCOTabletMergeCtx()
{ {
if (OB_NOT_NULL(cg_merge_info_array_)) { if (OB_NOT_NULL(cg_merge_info_array_)) {

View File

@ -725,6 +725,12 @@ int ObCOMergeBatchExeTask::process()
} }
void *buf = nullptr; void *buf = nullptr;
#ifdef ERRSIM
ret = OB_E(EventTable::EN_CO_MREGE_DAG_SCHEDULE_REST) ret;
if (OB_FAIL(ret)) {
LOG_INFO("ERRSIM EN_CO_MREGE_DAG_SCHEDULE_REST PROCESS FAILED", K(ret));
}
#endif
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObCOMerger)))) { } else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObCOMerger)))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
@ -997,6 +1003,12 @@ ObCOMergeDagNet::ObCOMergeDagNet()
{ {
} }
/*
* ATTENTION: NEVER USE ANY LOG STREAM VARIABLES IN THIS FUNCTION.
* The destructor is called when the dag net is finished.
* ObCOMergeDagNet is special: it is canceled when the ls goes offline, in ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel.
* However, the dag net is only moved into the finished dag net list and is freed later. So using log stream variables in this function after the ls is offline is dangerous.
*/
ObCOMergeDagNet::~ObCOMergeDagNet() ObCOMergeDagNet::~ObCOMergeDagNet()
{ {
finish_dag_ = nullptr; finish_dag_ = nullptr;
@ -1052,7 +1064,7 @@ int ObCOMergeDagNet::start_running()
return ret; return ret;
} }
// schedule_rest_dag is called in loop_running_dag_net_map(Timer) // schedule_rest_dag is called in loop_running_dag_net_list(Timer)
// schedule dag may not finish yet, need wait merge_status >= PREPARE_FINISHED // schedule dag may not finish yet, need wait merge_status >= PREPARE_FINISHED
int ObCOMergeDagNet::schedule_rest_dag() int ObCOMergeDagNet::schedule_rest_dag()
{ {
@ -1067,13 +1079,30 @@ int ObCOMergeDagNet::schedule_rest_dag()
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("merge ctx is null or schema invalid", K(ret), KPC(co_merge_ctx_)); LOG_WARN("merge ctx is null or schema invalid", K(ret), KPC(co_merge_ctx_));
} else if (!is_cancel() && COMergeStatus::PREPARE_FINISHED <= ATOMIC_LOAD(&merge_status_)) { } else if (!is_cancel() && COMergeStatus::PREPARE_FINISHED <= ATOMIC_LOAD(&merge_status_)) {
if (OB_FAIL(inner_create_and_schedule_dags())) { #ifdef ERRSIM
ret = OB_E(EventTable::EN_CO_MREGE_DAG_SCHEDULE_REST) ret;
if (OB_FAIL(ret)) {
LOG_INFO("ERRSIM EN_CO_MREGE_DAG_SCHEDULE_REST SCHEDULE FAILED", K(ret));
}
#endif
if (FAILEDx(inner_create_and_schedule_dags())) {
LOG_WARN("failed to create and schedule rest dags", K(ret)); LOG_WARN("failed to create and schedule rest dags", K(ret));
} }
} }
return ret; return ret;
} }
/*
* ATTENTION: NEVER USE ANY LOG STREAM VARIABLES IN THIS FUNCTION.
* clear_dag_net_ctx() is called when the dag net is finished.
* ObCOMergeDagNet is special: it is canceled when the ls goes offline, in ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel.
* However, the dag net is only moved into the finished dag net list and is freed later. So using log stream variables in this function after the ls is offline is dangerous.
*
* The override only forwards to the base class implementation and returns its result.
*/
int ObCOMergeDagNet::clear_dag_net_ctx()
{
return ObIDagNet::clear_dag_net_ctx();
}
#define MARK_CG_SCHEDULE_STATUS(start_cg_idx, end_cg_idx, target_status) \ #define MARK_CG_SCHEDULE_STATUS(start_cg_idx, end_cg_idx, target_status) \
for (int64_t i = start_cg_idx; i < end_cg_idx; ++i) { \ for (int64_t i = start_cg_idx; i < end_cg_idx; ++i) { \
if (ObCOTabletMergeCtx::CG_SCHE_STATUS_FINISHED != co_merge_ctx_->cg_schedule_status_array_[i]) { \ if (ObCOTabletMergeCtx::CG_SCHE_STATUS_FINISHED != co_merge_ctx_->cg_schedule_status_array_[i]) { \

View File

@ -269,6 +269,7 @@ public:
virtual int fill_dag_net_key(char *buf, const int64_t buf_len) const override; virtual int fill_dag_net_key(char *buf, const int64_t buf_len) const override;
virtual int fill_comment(char *buf, const int64_t buf_len) const override; virtual int fill_comment(char *buf, const int64_t buf_len) const override;
virtual int schedule_rest_dag() override; virtual int schedule_rest_dag() override;
virtual int clear_dag_net_ctx() override;
virtual bool inner_check_finished() override virtual bool inner_check_finished() override
{ {
return ATOMIC_LOAD(&finish_added_); return ATOMIC_LOAD(&finish_added_);

View File

@ -1582,7 +1582,7 @@ TEST_F(TestDagScheduler, test_cancel_dag_func)
scheduler->cancel_dag(dag_array[i]); scheduler->cancel_dag(dag_array[i]);
} }
EXPECT_EQ(true, scheduler->is_empty()); wait_scheduler();
EXPECT_EQ(0, MTL(ObDagWarningHistoryManager *)->size()); EXPECT_EQ(0, MTL(ObDagWarningHistoryManager *)->size());
} }
@ -1614,9 +1614,7 @@ TEST_F(TestDagScheduler, test_cancel_dag_net_func)
} }
EXPECT_EQ(OB_SUCCESS, ret); EXPECT_EQ(OB_SUCCESS, ret);
ob_usleep(5000 * 1000); wait_scheduler();
EXPECT_EQ(true, scheduler->is_empty());
EXPECT_EQ(0, MTL(ObDagWarningHistoryManager *)->size()); EXPECT_EQ(0, MTL(ObDagWarningHistoryManager *)->size());
} }