patch is_schema_changed in medium info & cancel ls compaction waiting dag
This commit is contained in:
@ -2172,32 +2172,57 @@ int ObTenantDagScheduler::get_minor_exe_dag_info(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantDagScheduler::check_ls_compaction_dag_exist(const ObLSID &ls_id, bool &exist)
|
||||
int ObTenantDagScheduler::check_ls_compaction_dag_exist_with_cancel(
|
||||
const ObLSID &ls_id,
|
||||
bool &exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
exist = false;
|
||||
compaction::ObTabletMergeDag *dag = nullptr;
|
||||
ObThreadCondGuard guard(scheduler_sync_);
|
||||
for (int64_t i = 0; i < ObIDag::MergeDagPrioCnt; ++i) {
|
||||
ObIDag *head = dag_list_[READY_DAG_LIST].get_head(ObIDag::MergeDagPrio[i]);
|
||||
ObIDag *cur = head->get_next();
|
||||
while (head != cur) {
|
||||
if (ObDagType::DAG_TYPE_MDS_TABLE_MERGE == cur->get_type()) {
|
||||
// TODO (bowen.gbw) : make ObMdsTableMergeDag inherit from ObTabletMergeDag
|
||||
const mds::ObMdsTableMergeDag *mds_dag = static_cast<const mds::ObMdsTableMergeDag *>(cur);
|
||||
if (ls_id == mds_dag->get_param().ls_id_) {
|
||||
exist = true;
|
||||
break;
|
||||
ObIDagNet *unused_erase_dag_net = nullptr;
|
||||
ObIDag *cancel_dag = nullptr;
|
||||
bool cancel_flag = false;
|
||||
int64_t cancel_dag_cnt = 0;
|
||||
{
|
||||
ObThreadCondGuard guard(scheduler_sync_);
|
||||
for (int64_t i = 0; i < ObIDag::MergeDagPrioCnt; ++i) {
|
||||
ObIDag *head = dag_list_[READY_DAG_LIST].get_head(ObIDag::MergeDagPrio[i]);
|
||||
ObIDag *cur = head->get_next();
|
||||
while (head != cur) {
|
||||
if (ObDagType::DAG_TYPE_MDS_TABLE_MERGE == cur->get_type()) {
|
||||
// TODO (bowen.gbw) : make ObMdsTableMergeDag inherit from ObTabletMergeDag
|
||||
const mds::ObMdsTableMergeDag *mds_dag = static_cast<const mds::ObMdsTableMergeDag *>(cur);
|
||||
cancel_flag = (ls_id == mds_dag->get_param().ls_id_);
|
||||
} else {
|
||||
dag = static_cast<compaction::ObTabletMergeDag *>(cur);
|
||||
cancel_flag = (ls_id == dag->get_ls_id());
|
||||
}
|
||||
} else {
|
||||
dag = static_cast<compaction::ObTabletMergeDag *>(cur);
|
||||
if (ls_id == dag->get_ctx().param_.ls_id_) {
|
||||
exist = true;
|
||||
break;
|
||||
if (cancel_flag) {
|
||||
if (cur->get_dag_status() == ObIDag::DAG_STATUS_READY) {
|
||||
cancel_dag = cur;
|
||||
cur = cur->get_next();
|
||||
if (OB_UNLIKELY(nullptr != cancel_dag->get_dag_net())) {
|
||||
tmp_ret = OB_ERR_UNEXPECTED;
|
||||
COMMON_LOG(WARN, "compaction dag should not in dag net", KR(tmp_ret));
|
||||
} else if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cancel_dag, unused_erase_dag_net))) {
|
||||
COMMON_LOG(WARN, "failed to erase dag", K(tmp_ret), KPC(cancel_dag));
|
||||
ob_abort();
|
||||
} else {
|
||||
++cancel_dag_cnt;
|
||||
}
|
||||
} else {
|
||||
exist = true;
|
||||
cur = cur->get_next();
|
||||
}
|
||||
} else {
|
||||
cur = cur->get_next();
|
||||
}
|
||||
}
|
||||
cur = cur->get_next();
|
||||
}
|
||||
} // end of scheduler_sync_
|
||||
if (OB_SUCC(ret)) {
|
||||
COMMON_LOG(INFO, "cancel dag when check ls compaction dag exist", KR(ret), K(cancel_dag_cnt), K(exist));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user