Fix bug of 1. canceled dag_net is not deleted from dag_net_map. 2. invalid tablet_info due to filter

This commit is contained in:
Tsunaou 2023-11-27 07:33:07 +00:00 committed by ob-robot
parent e63f6e8dbe
commit 546a33924f
4 changed files with 98 additions and 71 deletions

View File

@ -1216,6 +1216,11 @@ bool ObIDagNet::is_inited()
return OB_NOT_NULL(allocator_);
}
bool ObIDagNet::is_started()
{
return start_time_ != 0;
}
void ObIDagNet::diagnose_dag(common::ObIArray<compaction::ObDiagnoseTabletCompProgress> &progress_list)
{
int tmp_ret = OB_SUCCESS;
@ -2132,8 +2137,8 @@ int ObDagPrioScheduler::erase_dag_net_without_lock_(ObIDagNet *erase_dag_net)
COMMON_LOG(WARN, "unexpected null scheduler", K(ret), KP_(scheduler));
} else if (OB_ISNULL(erase_dag_net)) {
// do nothing
} else if (OB_FAIL(scheduler_->finish_dag_net(erase_dag_net))) {
COMMON_LOG(WARN, "failed to erase dag net", K(ret), KPC(erase_dag_net));
} else {
(void) scheduler_->finish_dag_net(erase_dag_net); // void
}
return ret;
}
@ -3420,6 +3425,33 @@ int ObDagNetScheduler::init(
return ret;
}
void ObDagNetScheduler::erase_dag_net_or_abort(ObIDagNet &dag_net)
{
int ret = OB_SUCCESS;
if (OB_FAIL(dag_net_map_.erase_refactored(&dag_net))) {
COMMON_LOG(ERROR, "failed to erase dag net from dag_net_map_", K(ret), K(dag_net));
ob_abort();
}
}
void ObDagNetScheduler::erase_dag_net_id_or_abort(ObIDagNet &dag_net)
{
int ret = OB_SUCCESS;
if (OB_FAIL(dag_net_id_map_.erase_refactored(dag_net.get_dag_id()))) {
COMMON_LOG(ERROR, "failed to erase dag_net_id from dag_net_id_map_", K(ret), K(dag_net));
ob_abort();
}
(void) erase_dag_net_or_abort(dag_net);
}
void ObDagNetScheduler::erase_block_dag_net_or_abort(ObIDagNet *dag_net)
{
if (!blocking_dag_net_list_.remove(dag_net)) {
COMMON_LOG_RET(WARN, OB_ERR_UNEXPECTED, "failed to remove dag_net from blocking_dag_net_list", K(dag_net));
ob_abort();
}
}
int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net)
{
int ret = OB_SUCCESS;
@ -3443,25 +3475,11 @@ int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net)
COMMON_LOG(WARN, "exist dag net is", K(dag_net), KPC(exist_dag_net));
}
}
if (OB_TMP_FAIL(dag_net_map_.erase_refactored(&dag_net))) {
COMMON_LOG(ERROR, "failed to erase from running_dag_net_map", K(tmp_ret), K(dag_net));
ob_abort();
}
(void) erase_dag_net_or_abort(dag_net);
} else if (!blocking_dag_net_list_.add_last(&dag_net)) {// add into blocking_dag_net_list
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "failed to add into blocking_dag_net_list", K(ret), K(dag_net));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(dag_net_id_map_.erase_refactored(dag_net.get_dag_id()))) {
COMMON_LOG(ERROR, "failed to erase from dag net id from map", K(tmp_ret), K(dag_net));
ob_abort();
}
if (OB_TMP_FAIL(dag_net_map_.erase_refactored(&dag_net))) {
COMMON_LOG(ERROR, "failed to erase from running_dag_net_map", K(tmp_ret), K(dag_net));
ob_abort();
}
(void) erase_dag_net_id_or_abort(dag_net);
} else {
++dag_net_cnts_[dag_net.get_type()];
dag_net.set_add_time();
@ -3472,19 +3490,23 @@ int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net)
return ret;
}
int ObDagNetScheduler::finish_dag_net(ObIDagNet &dag_net)
void ObDagNetScheduler::finish_dag_net_without_lock(ObIDagNet &dag_net)
{
int ret = OB_SUCCESS;
ObMutexGuard guard(dag_net_map_lock_);
if (OB_FAIL(dag_net_map_.erase_refactored(&dag_net))) {
COMMON_LOG(ERROR, "failed to erase dag from running_dag_net_map", K(ret), K(dag_net));
ob_abort();
} else if (OB_FAIL(dag_net_id_map_.erase_refactored(dag_net.get_dag_id()))) {
COMMON_LOG(ERROR, "failed to erase dag from running_dag_net_map", K(ret), K(dag_net));
ob_abort();
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(dag_net.add_dag_warning_info())) {
COMMON_LOG_RET(WARN, tmp_ret, "failed to add dag warning info in dag net into mgr", K(tmp_ret), K(dag_net));
}
if (OB_TMP_FAIL(dag_net.clear_dag_net_ctx())) {
COMMON_LOG_RET(WARN, tmp_ret, "failed to clear dag net ctx", K(tmp_ret), K(dag_net));
}
(void) erase_dag_net_id_or_abort(dag_net);
--dag_net_cnts_[dag_net.get_type()];
return ret;
}
void ObDagNetScheduler::finish_dag_net(ObIDagNet &dag_net)
{
ObMutexGuard guard(dag_net_map_lock_);
(void) finish_dag_net_without_lock(dag_net);
}
void ObDagNetScheduler::dump_dag_status()
@ -3616,7 +3638,7 @@ int ObDagNetScheduler::loop_running_dag_net_map()
if (OB_ISNULL(dag_net = iter->second)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dag net is unepxected null", K(ret), KP(dag_net));
} else if (OB_FAIL(dag_net->schedule_rest_dag())) {
} else if (dag_net->is_started() && OB_FAIL(dag_net->schedule_rest_dag())) {
LOG_WARN("failed to schedule rest dag", K(ret));
} else if (dag_net->check_finished_and_mark_stop()) {
LOG_WARN("dag net is in finish state", K(ret), KPC(dag_net));
@ -3655,34 +3677,19 @@ int ObDagNetScheduler::loop_blocking_dag_net_list()
int64_t rest_cnt = DEFAULT_MAX_RUNNING_DAG_NET_CNT - (dag_net_map_.size() - blocking_dag_net_list_.get_size());
while (NULL != cur && head != cur && rest_cnt > 0 && !is_dag_map_full()) {
LOG_DEBUG("loop blocking dag net list", K(ret), KPC(cur), K(rest_cnt));
if (OB_FAIL(cur->start_running())) { // call start_running function
COMMON_LOG(WARN, "failed to start running", K(ret), KPC(cur));
int64_t tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(dag_net_map_.erase_refactored(cur))) {
COMMON_LOG(ERROR, "failed to erase from running_dag_net_map", K(tmp_ret), KPC(cur));
ob_abort();
} else if (OB_TMP_FAIL(dag_net_id_map_.erase_refactored(cur->get_dag_id()))) {
COMMON_LOG(ERROR, "failed to erase from running_dag_net_id_map", K(tmp_ret), KPC(cur));
ob_abort();
} else {
tmp = cur;
cur = cur->get_next();
if (!blocking_dag_net_list_.remove(tmp)) {
COMMON_LOG(WARN, "failed to remove dag_net from blocking_dag_net_list", K(tmp));
ob_abort();
}
--dag_net_cnts_[tmp->get_type()];
scheduler_->free_dag_net(tmp);
tmp = cur;
cur = cur->get_next();
if (tmp->is_cancel() || OB_FAIL(tmp->start_running())) { // call start_running function
if (OB_FAIL(ret)) {
COMMON_LOG(WARN, "failed to start running or be canceled", K(ret), KPC(cur));
}
(void) finish_dag_net_without_lock(*tmp);
(void) erase_block_dag_net_or_abort(tmp);
(void) scheduler_->free_dag_net(tmp); // set tmp nullptr
} else {
cur->set_start_time();
tmp = cur;
cur = cur->get_next();
tmp->set_start_time();
--rest_cnt;
if (!blocking_dag_net_list_.remove(tmp)) {
COMMON_LOG(WARN, "failed to remove dag_net from blocking_dag_net_list", K(tmp));
ob_abort();
}
(void) erase_block_dag_net_or_abort(tmp);
}
}
}
@ -3768,6 +3775,10 @@ int ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel(const ObLSID &l
if (OB_ISNULL(cur_dag_net = iter->second)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dag net is unepxected null", K(ret), KP(cur_dag_net));
} else if (cur_dag_net->check_finished_and_mark_stop()) {
LOG_INFO("start finish dag net", K(ret), KPC(cur_dag_net));
(void) finish_dag_net_without_lock(*cur_dag_net);
(void) scheduler_->free_dag_net(cur_dag_net);
} else if (cur_dag_net->is_co_dag_net()) {
compaction::ObCOMergeDagNet *co_dag_net = static_cast<compaction::ObCOMergeDagNet*>(cur_dag_net);
if (ls_id == co_dag_net->get_dag_param().ls_id_) {
@ -4419,23 +4430,13 @@ int ObTenantDagScheduler::deal_with_finish_task(ObITask &task, ObTenantDagWorker
return ret;
}
int ObTenantDagScheduler::finish_dag_net(ObIDagNet *dag_net)
void ObTenantDagScheduler::finish_dag_net(ObIDagNet *dag_net)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (OB_NOT_NULL(dag_net)) {
if (OB_TMP_FAIL(dag_net->add_dag_warning_info())) {
COMMON_LOG(WARN, "failed to add dag warning info in dag net into mgr", K(tmp_ret), KPC(dag_net));
}
if (OB_TMP_FAIL(dag_net->clear_dag_net_ctx())) {
COMMON_LOG(WARN, "failed to clear dag net ctx", K(tmp_ret), KPC(dag_net));
}
if (OB_SUCC(dag_net_sche_.finish_dag_net(*dag_net))) {
COMMON_LOG(INFO, "dag net finished", K(ret), KPC(dag_net));
free_dag_net(dag_net);
}
COMMON_LOG(INFO, "start finish dag net", KPC(dag_net));
(void) dag_net_sche_.finish_dag_net(*dag_net);
(void) free_dag_net(dag_net);
}
return ret;
}
bool ObTenantDagScheduler::try_switch(ObTenantDagWorker &worker)

View File

@ -571,6 +571,7 @@ public:
void set_cancel();
bool is_cancel();
bool is_inited();
bool is_started();
virtual int deal_with_cancel()
{
return OB_SUCCESS;
@ -745,7 +746,11 @@ public:
bool is_empty() const { return 0 == dag_net_map_.size();} // only for unittest
int add_dag_net(ObIDagNet &dag_net);
int finish_dag_net(ObIDagNet &dag_net);
void erase_dag_net_or_abort(ObIDagNet &dag_net);
void erase_dag_net_id_or_abort(ObIDagNet &dag_net);
void erase_block_dag_net_or_abort(ObIDagNet *dag_net);
void finish_dag_net_without_lock(ObIDagNet &dag_net);
void finish_dag_net(ObIDagNet &dag_net);
void dump_dag_status();
int64_t get_dag_net_count();
void get_all_dag_scheduler_info(
@ -1106,7 +1111,7 @@ public:
int deal_with_finish_task(ObITask &task, ObTenantDagWorker &worker, int error_code);
bool try_switch(ObTenantDagWorker &worker);
int dispatch_task(ObITask &task, ObTenantDagWorker *&ret_worker, const int64_t priority);
int finish_dag_net(ObIDagNet *dag_net);
void finish_dag_net(ObIDagNet *dag_net);
// for unittest
int get_first_dag_net(ObIDagNet *&dag_net);

View File

@ -117,6 +117,26 @@ void ObCompactionTabletMetaIterator::reset()
batch_size_ = 0;
}
int ObCompactionTabletMetaIterator::next(ObTabletInfo &tablet_info)
{
int ret = OB_SUCCESS;
do {
if (OB_FAIL(ObTabletMetaIterator::next(tablet_info))) {
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next tablet info", KR(ret));
}
} else if (!tablet_info.is_valid()) {
if (tablet_info.get_replicas().empty()) {
// ObTabletMetaIterator::next may fillter some replica members and make tablet_info invalid, skip and fetch next one
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet_info is invalid", KR(ret), K(tablet_info));
}
}
} while (OB_SUCC(ret) && !tablet_info.is_valid());
return ret;
}
int ObCompactionTabletMetaIterator::init(
const uint64_t tenant_id,
const int64_t batch_size,

View File

@ -34,7 +34,7 @@ public:
ObTabletMetaIterator();
~ObTabletMetaIterator() { reset(); }
virtual void reset();
int next(ObTabletInfo &tablet_info);
virtual int next(ObTabletInfo &tablet_info);
protected:
int inner_init(
const uint64_t tenant_id);
@ -59,6 +59,7 @@ public:
const int64_t batch_size,
share::ObIServerTrace &server_trace);
virtual void reset() override;
virtual int next(ObTabletInfo &tablet_info) override;
private:
virtual int prefetch() override;