diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.cpp b/src/share/scheduler/ob_tenant_dag_scheduler.cpp index cc1a6de74b..9c62b54087 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.cpp +++ b/src/share/scheduler/ob_tenant_dag_scheduler.cpp @@ -1216,6 +1216,11 @@ bool ObIDagNet::is_inited() return OB_NOT_NULL(allocator_); } +bool ObIDagNet::is_started() +{ + return start_time_ != 0; +} + void ObIDagNet::diagnose_dag(common::ObIArray &progress_list) { int tmp_ret = OB_SUCCESS; @@ -2132,8 +2137,8 @@ int ObDagPrioScheduler::erase_dag_net_without_lock_(ObIDagNet *erase_dag_net) COMMON_LOG(WARN, "unexpected null scheduler", K(ret), KP_(scheduler)); } else if (OB_ISNULL(erase_dag_net)) { // do nothing - } else if (OB_FAIL(scheduler_->finish_dag_net(erase_dag_net))) { - COMMON_LOG(WARN, "failed to erase dag net", K(ret), KPC(erase_dag_net)); + } else { + (void) scheduler_->finish_dag_net(erase_dag_net); // void } return ret; } @@ -3420,6 +3425,33 @@ int ObDagNetScheduler::init( return ret; } +void ObDagNetScheduler::erase_dag_net_or_abort(ObIDagNet &dag_net) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(dag_net_map_.erase_refactored(&dag_net))) { + COMMON_LOG(ERROR, "failed to erase dag net from dag_net_map_", K(ret), K(dag_net)); + ob_abort(); + } +} + +void ObDagNetScheduler::erase_dag_net_id_or_abort(ObIDagNet &dag_net) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(dag_net_id_map_.erase_refactored(dag_net.get_dag_id()))) { + COMMON_LOG(ERROR, "failed to erase dag_net_id from dag_net_id_map_", K(ret), K(dag_net)); + ob_abort(); + } + (void) erase_dag_net_or_abort(dag_net); +} + +void ObDagNetScheduler::erase_block_dag_net_or_abort(ObIDagNet *dag_net) +{ + if (!blocking_dag_net_list_.remove(dag_net)) { + COMMON_LOG_RET(WARN, OB_ERR_UNEXPECTED, "failed to remove dag_net from blocking_dag_net_list", K(dag_net)); + ob_abort(); + } +} + int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net) { int ret = OB_SUCCESS; @@ -3443,25 +3475,11 @@ int 
ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net) COMMON_LOG(WARN, "exist dag net is", K(dag_net), KPC(exist_dag_net)); } } - - if (OB_TMP_FAIL(dag_net_map_.erase_refactored(&dag_net))) { - COMMON_LOG(ERROR, "failed to erase from running_dag_net_map", K(tmp_ret), K(dag_net)); - ob_abort(); - } + (void) erase_dag_net_or_abort(dag_net); } else if (!blocking_dag_net_list_.add_last(&dag_net)) {// add into blocking_dag_net_list ret = OB_ERR_UNEXPECTED; COMMON_LOG(ERROR, "failed to add into blocking_dag_net_list", K(ret), K(dag_net)); - int tmp_ret = OB_SUCCESS; - - if (OB_TMP_FAIL(dag_net_id_map_.erase_refactored(dag_net.get_dag_id()))) { - COMMON_LOG(ERROR, "failed to erase from dag net id from map", K(tmp_ret), K(dag_net)); - ob_abort(); - } - - if (OB_TMP_FAIL(dag_net_map_.erase_refactored(&dag_net))) { - COMMON_LOG(ERROR, "failed to erase from running_dag_net_map", K(tmp_ret), K(dag_net)); - ob_abort(); - } + (void) erase_dag_net_id_or_abort(dag_net); } else { ++dag_net_cnts_[dag_net.get_type()]; dag_net.set_add_time(); @@ -3472,19 +3490,23 @@ int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net) return ret; } -int ObDagNetScheduler::finish_dag_net(ObIDagNet &dag_net) +void ObDagNetScheduler::finish_dag_net_without_lock(ObIDagNet &dag_net) { - int ret = OB_SUCCESS; - ObMutexGuard guard(dag_net_map_lock_); - if (OB_FAIL(dag_net_map_.erase_refactored(&dag_net))) { - COMMON_LOG(ERROR, "failed to erase dag from running_dag_net_map", K(ret), K(dag_net)); - ob_abort(); - } else if (OB_FAIL(dag_net_id_map_.erase_refactored(dag_net.get_dag_id()))) { - COMMON_LOG(ERROR, "failed to erase dag from running_dag_net_map", K(ret), K(dag_net)); - ob_abort(); + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(dag_net.add_dag_warning_info())) { + COMMON_LOG_RET(WARN, tmp_ret, "failed to add dag warning info in dag net into mgr", K(tmp_ret), K(dag_net)); } + if (OB_TMP_FAIL(dag_net.clear_dag_net_ctx())) { + COMMON_LOG_RET(WARN, tmp_ret, "failed to clear dag net ctx", K(tmp_ret), 
K(dag_net)); + } + (void) erase_dag_net_id_or_abort(dag_net); --dag_net_cnts_[dag_net.get_type()]; - return ret; +} + +void ObDagNetScheduler::finish_dag_net(ObIDagNet &dag_net) +{ + ObMutexGuard guard(dag_net_map_lock_); + (void) finish_dag_net_without_lock(dag_net); } void ObDagNetScheduler::dump_dag_status() @@ -3616,7 +3638,7 @@ int ObDagNetScheduler::loop_running_dag_net_map() if (OB_ISNULL(dag_net = iter->second)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("dag net is unepxected null", K(ret), KP(dag_net)); - } else if (OB_FAIL(dag_net->schedule_rest_dag())) { + } else if (dag_net->is_started() && OB_FAIL(dag_net->schedule_rest_dag())) { LOG_WARN("failed to schedule rest dag", K(ret)); } else if (dag_net->check_finished_and_mark_stop()) { LOG_WARN("dag net is in finish state", K(ret), KPC(dag_net)); @@ -3655,34 +3677,19 @@ int ObDagNetScheduler::loop_blocking_dag_net_list() int64_t rest_cnt = DEFAULT_MAX_RUNNING_DAG_NET_CNT - (dag_net_map_.size() - blocking_dag_net_list_.get_size()); while (NULL != cur && head != cur && rest_cnt > 0 && !is_dag_map_full()) { LOG_DEBUG("loop blocking dag net list", K(ret), KPC(cur), K(rest_cnt)); - if (OB_FAIL(cur->start_running())) { // call start_running function - COMMON_LOG(WARN, "failed to start running", K(ret), KPC(cur)); - int64_t tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(dag_net_map_.erase_refactored(cur))) { - COMMON_LOG(ERROR, "failed to erase from running_dag_net_map", K(tmp_ret), KPC(cur)); - ob_abort(); - } else if (OB_TMP_FAIL(dag_net_id_map_.erase_refactored(cur->get_dag_id()))) { - COMMON_LOG(ERROR, "failed to erase from running_dag_net_id_map", K(tmp_ret), KPC(cur)); - ob_abort(); - } else { - tmp = cur; - cur = cur->get_next(); - if (!blocking_dag_net_list_.remove(tmp)) { - COMMON_LOG(WARN, "failed to remove dag_net from blocking_dag_net_list", K(tmp)); - ob_abort(); - } - --dag_net_cnts_[tmp->get_type()]; - scheduler_->free_dag_net(tmp); + tmp = cur; + cur = cur->get_next(); + if (tmp->is_cancel() || 
OB_FAIL(tmp->start_running())) { // call start_running function + if (OB_FAIL(ret)) { + COMMON_LOG(WARN, "failed to start running or be canceled", K(ret), KPC(cur)); } + (void) finish_dag_net_without_lock(*tmp); + (void) erase_block_dag_net_or_abort(tmp); + (void) scheduler_->free_dag_net(tmp); // set tmp nullptr } else { - cur->set_start_time(); - tmp = cur; - cur = cur->get_next(); + tmp->set_start_time(); --rest_cnt; - if (!blocking_dag_net_list_.remove(tmp)) { - COMMON_LOG(WARN, "failed to remove dag_net from blocking_dag_net_list", K(tmp)); - ob_abort(); - } + (void) erase_block_dag_net_or_abort(tmp); } } } @@ -3768,6 +3775,10 @@ int ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel(const ObLSID &l if (OB_ISNULL(cur_dag_net = iter->second)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("dag net is unepxected null", K(ret), KP(cur_dag_net)); + } else if (cur_dag_net->check_finished_and_mark_stop()) { + LOG_INFO("start finish dag net", K(ret), KPC(cur_dag_net)); + (void) finish_dag_net_without_lock(*cur_dag_net); + (void) scheduler_->free_dag_net(cur_dag_net); } else if (cur_dag_net->is_co_dag_net()) { compaction::ObCOMergeDagNet *co_dag_net = static_cast(cur_dag_net); if (ls_id == co_dag_net->get_dag_param().ls_id_) { @@ -4419,23 +4430,13 @@ int ObTenantDagScheduler::deal_with_finish_task(ObITask &task, ObTenantDagWorker return ret; } -int ObTenantDagScheduler::finish_dag_net(ObIDagNet *dag_net) +void ObTenantDagScheduler::finish_dag_net(ObIDagNet *dag_net) { - int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; if (OB_NOT_NULL(dag_net)) { - if (OB_TMP_FAIL(dag_net->add_dag_warning_info())) { - COMMON_LOG(WARN, "failed to add dag warning info in dag net into mgr", K(tmp_ret), KPC(dag_net)); - } - if (OB_TMP_FAIL(dag_net->clear_dag_net_ctx())) { - COMMON_LOG(WARN, "failed to clear dag net ctx", K(tmp_ret), KPC(dag_net)); - } - if (OB_SUCC(dag_net_sche_.finish_dag_net(*dag_net))) { - COMMON_LOG(INFO, "dag net finished", K(ret), KPC(dag_net)); - 
free_dag_net(dag_net); - } + COMMON_LOG(INFO, "start finish dag net", KPC(dag_net)); + (void) dag_net_sche_.finish_dag_net(*dag_net); + (void) free_dag_net(dag_net); } - return ret; } bool ObTenantDagScheduler::try_switch(ObTenantDagWorker &worker) diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index 5a99a1803e..6afc208064 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -571,6 +571,7 @@ public: void set_cancel(); bool is_cancel(); bool is_inited(); + bool is_started(); virtual int deal_with_cancel() { return OB_SUCCESS; @@ -745,7 +746,11 @@ public: bool is_empty() const { return 0 == dag_net_map_.size();} // only for unittest int add_dag_net(ObIDagNet &dag_net); - int finish_dag_net(ObIDagNet &dag_net); + void erase_dag_net_or_abort(ObIDagNet &dag_net); + void erase_dag_net_id_or_abort(ObIDagNet &dag_net); + void erase_block_dag_net_or_abort(ObIDagNet *dag_net); + void finish_dag_net_without_lock(ObIDagNet &dag_net); + void finish_dag_net(ObIDagNet &dag_net); void dump_dag_status(); int64_t get_dag_net_count(); void get_all_dag_scheduler_info( @@ -1106,7 +1111,7 @@ public: int deal_with_finish_task(ObITask &task, ObTenantDagWorker &worker, int error_code); bool try_switch(ObTenantDagWorker &worker); int dispatch_task(ObITask &task, ObTenantDagWorker *&ret_worker, const int64_t priority); - int finish_dag_net(ObIDagNet *dag_net); + void finish_dag_net(ObIDagNet *dag_net); // for unittest int get_first_dag_net(ObIDagNet *&dag_net); diff --git a/src/share/tablet/ob_tablet_table_iterator.cpp b/src/share/tablet/ob_tablet_table_iterator.cpp index 1f5212a27d..2edae92aef 100644 --- a/src/share/tablet/ob_tablet_table_iterator.cpp +++ b/src/share/tablet/ob_tablet_table_iterator.cpp @@ -117,6 +117,26 @@ void ObCompactionTabletMetaIterator::reset() batch_size_ = 0; } +int ObCompactionTabletMetaIterator::next(ObTabletInfo &tablet_info) +{ + int ret = 
OB_SUCCESS; + do { + if (OB_FAIL(ObTabletMetaIterator::next(tablet_info))) { + if (OB_ITER_END != ret) { + LOG_WARN("fail to get next tablet info", KR(ret)); + } + } else if (!tablet_info.is_valid()) { + if (tablet_info.get_replicas().empty()) { + // ObTabletMetaIterator::next may filter some replica members and make tablet_info invalid, skip and fetch next one + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tablet_info is invalid", KR(ret), K(tablet_info)); + } + } + } while (OB_SUCC(ret) && !tablet_info.is_valid()); + return ret; +} + int ObCompactionTabletMetaIterator::init( const uint64_t tenant_id, const int64_t batch_size, diff --git a/src/share/tablet/ob_tablet_table_iterator.h b/src/share/tablet/ob_tablet_table_iterator.h index b6f947e215..67397f1f91 100644 --- a/src/share/tablet/ob_tablet_table_iterator.h +++ b/src/share/tablet/ob_tablet_table_iterator.h @@ -34,7 +34,7 @@ public: ObTabletMetaIterator(); ~ObTabletMetaIterator() { reset(); } virtual void reset(); - int next(ObTabletInfo &tablet_info); + virtual int next(ObTabletInfo &tablet_info); protected: int inner_init( const uint64_t tenant_id); @@ -59,6 +59,7 @@ public: const int64_t batch_size, share::ObIServerTrace &server_trace); virtual void reset() override; + virtual int next(ObTabletInfo &tablet_info) override; private: virtual int prefetch() override;