Fix two bugs: (1) a canceled dag_net was not deleted from dag_net_map; (2) tablet_info could become invalid after replica filtering
This commit is contained in:
@ -1216,6 +1216,11 @@ bool ObIDagNet::is_inited()
|
|||||||
return OB_NOT_NULL(allocator_);
|
return OB_NOT_NULL(allocator_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ObIDagNet::is_started()
|
||||||
|
{
|
||||||
|
return start_time_ != 0;
|
||||||
|
}
|
||||||
|
|
||||||
void ObIDagNet::diagnose_dag(common::ObIArray<compaction::ObDiagnoseTabletCompProgress> &progress_list)
|
void ObIDagNet::diagnose_dag(common::ObIArray<compaction::ObDiagnoseTabletCompProgress> &progress_list)
|
||||||
{
|
{
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
@ -2132,8 +2137,8 @@ int ObDagPrioScheduler::erase_dag_net_without_lock_(ObIDagNet *erase_dag_net)
|
|||||||
COMMON_LOG(WARN, "unexpected null scheduler", K(ret), KP_(scheduler));
|
COMMON_LOG(WARN, "unexpected null scheduler", K(ret), KP_(scheduler));
|
||||||
} else if (OB_ISNULL(erase_dag_net)) {
|
} else if (OB_ISNULL(erase_dag_net)) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (OB_FAIL(scheduler_->finish_dag_net(erase_dag_net))) {
|
} else {
|
||||||
COMMON_LOG(WARN, "failed to erase dag net", K(ret), KPC(erase_dag_net));
|
(void) scheduler_->finish_dag_net(erase_dag_net); // void
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -3420,6 +3425,33 @@ int ObDagNetScheduler::init(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Removes dag_net from dag_net_map_.
// A failed erase is treated as fatal: an ERROR is logged and the process is
// aborted through ob_abort().
// NOTE(review): no lock is taken here; callers presumably already hold
// dag_net_map_lock_ -- confirm against the call sites.
void ObDagNetScheduler::erase_dag_net_or_abort(ObIDagNet &dag_net)
{
  int ret = OB_SUCCESS;
  if (OB_SUCC(dag_net_map_.erase_refactored(&dag_net))) {
    // entry removed, nothing else to do
  } else {
    COMMON_LOG(ERROR, "failed to erase dag net from dag_net_map_", K(ret), K(dag_net));
    ob_abort();
  }
}
|
||||||
|
|
||||||
|
// Removes dag_net from both bookkeeping maps: first its dag id from
// dag_net_id_map_, then the dag net itself from dag_net_map_ (delegated to
// erase_dag_net_or_abort). A failed id erase is logged at ERROR and followed
// by ob_abort().
// NOTE(review): no lock is taken here; callers presumably already hold
// dag_net_map_lock_ -- confirm against the call sites.
void ObDagNetScheduler::erase_dag_net_id_or_abort(ObIDagNet &dag_net)
{
  int ret = OB_SUCCESS;
  if (OB_SUCC(dag_net_id_map_.erase_refactored(dag_net.get_dag_id()))) {
    // id entry removed
  } else {
    COMMON_LOG(ERROR, "failed to erase dag_net_id from dag_net_id_map_", K(ret), K(dag_net));
    ob_abort();
  }
  erase_dag_net_or_abort(dag_net);
}
|
||||||
|
|
||||||
|
// Unlinks dag_net from blocking_dag_net_list_.
// If the list reports that the removal failed, the failure is logged and the
// process is aborted through ob_abort().
// The log level is ERROR (not WARN) so this fatal path is reported the same
// way as in erase_dag_net_or_abort / erase_dag_net_id_or_abort.
// NOTE(review): dag_net is not null-checked here; callers are expected to pass
// a node that is currently linked in blocking_dag_net_list_ -- confirm.
void ObDagNetScheduler::erase_block_dag_net_or_abort(ObIDagNet *dag_net)
{
  if (!blocking_dag_net_list_.remove(dag_net)) {
    COMMON_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "failed to remove dag_net from blocking_dag_net_list", K(dag_net));
    ob_abort();
  }
}
|
||||||
|
|
||||||
int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net)
|
int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -3443,25 +3475,11 @@ int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net)
|
|||||||
COMMON_LOG(WARN, "exist dag net is", K(dag_net), KPC(exist_dag_net));
|
COMMON_LOG(WARN, "exist dag net is", K(dag_net), KPC(exist_dag_net));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
(void) erase_dag_net_or_abort(dag_net);
|
||||||
if (OB_TMP_FAIL(dag_net_map_.erase_refactored(&dag_net))) {
|
|
||||||
COMMON_LOG(ERROR, "failed to erase from running_dag_net_map", K(tmp_ret), K(dag_net));
|
|
||||||
ob_abort();
|
|
||||||
}
|
|
||||||
} else if (!blocking_dag_net_list_.add_last(&dag_net)) {// add into blocking_dag_net_list
|
} else if (!blocking_dag_net_list_.add_last(&dag_net)) {// add into blocking_dag_net_list
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
COMMON_LOG(ERROR, "failed to add into blocking_dag_net_list", K(ret), K(dag_net));
|
COMMON_LOG(ERROR, "failed to add into blocking_dag_net_list", K(ret), K(dag_net));
|
||||||
int tmp_ret = OB_SUCCESS;
|
(void) erase_dag_net_id_or_abort(dag_net);
|
||||||
|
|
||||||
if (OB_TMP_FAIL(dag_net_id_map_.erase_refactored(dag_net.get_dag_id()))) {
|
|
||||||
COMMON_LOG(ERROR, "failed to erase from dag net id from map", K(tmp_ret), K(dag_net));
|
|
||||||
ob_abort();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (OB_TMP_FAIL(dag_net_map_.erase_refactored(&dag_net))) {
|
|
||||||
COMMON_LOG(ERROR, "failed to erase from running_dag_net_map", K(tmp_ret), K(dag_net));
|
|
||||||
ob_abort();
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
++dag_net_cnts_[dag_net.get_type()];
|
++dag_net_cnts_[dag_net.get_type()];
|
||||||
dag_net.set_add_time();
|
dag_net.set_add_time();
|
||||||
@ -3472,19 +3490,23 @@ int ObDagNetScheduler::add_dag_net(ObIDagNet &dag_net)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObDagNetScheduler::finish_dag_net(ObIDagNet &dag_net)
|
void ObDagNetScheduler::finish_dag_net_without_lock(ObIDagNet &dag_net)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
ObMutexGuard guard(dag_net_map_lock_);
|
if (OB_TMP_FAIL(dag_net.add_dag_warning_info())) {
|
||||||
if (OB_FAIL(dag_net_map_.erase_refactored(&dag_net))) {
|
COMMON_LOG_RET(WARN, tmp_ret, "failed to add dag warning info in dag net into mgr", K(tmp_ret), K(dag_net));
|
||||||
COMMON_LOG(ERROR, "failed to erase dag from running_dag_net_map", K(ret), K(dag_net));
|
|
||||||
ob_abort();
|
|
||||||
} else if (OB_FAIL(dag_net_id_map_.erase_refactored(dag_net.get_dag_id()))) {
|
|
||||||
COMMON_LOG(ERROR, "failed to erase dag from running_dag_net_map", K(ret), K(dag_net));
|
|
||||||
ob_abort();
|
|
||||||
}
|
}
|
||||||
|
if (OB_TMP_FAIL(dag_net.clear_dag_net_ctx())) {
|
||||||
|
COMMON_LOG_RET(WARN, tmp_ret, "failed to clear dag net ctx", K(tmp_ret), K(dag_net));
|
||||||
|
}
|
||||||
|
(void) erase_dag_net_id_or_abort(dag_net);
|
||||||
--dag_net_cnts_[dag_net.get_type()];
|
--dag_net_cnts_[dag_net.get_type()];
|
||||||
return ret;
|
}
|
||||||
|
|
||||||
|
void ObDagNetScheduler::finish_dag_net(ObIDagNet &dag_net)
|
||||||
|
{
|
||||||
|
ObMutexGuard guard(dag_net_map_lock_);
|
||||||
|
(void) finish_dag_net_without_lock(dag_net);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObDagNetScheduler::dump_dag_status()
|
void ObDagNetScheduler::dump_dag_status()
|
||||||
@ -3616,7 +3638,7 @@ int ObDagNetScheduler::loop_running_dag_net_map()
|
|||||||
if (OB_ISNULL(dag_net = iter->second)) {
|
if (OB_ISNULL(dag_net = iter->second)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("dag net is unepxected null", K(ret), KP(dag_net));
|
LOG_WARN("dag net is unepxected null", K(ret), KP(dag_net));
|
||||||
} else if (OB_FAIL(dag_net->schedule_rest_dag())) {
|
} else if (dag_net->is_started() && OB_FAIL(dag_net->schedule_rest_dag())) {
|
||||||
LOG_WARN("failed to schedule rest dag", K(ret));
|
LOG_WARN("failed to schedule rest dag", K(ret));
|
||||||
} else if (dag_net->check_finished_and_mark_stop()) {
|
} else if (dag_net->check_finished_and_mark_stop()) {
|
||||||
LOG_WARN("dag net is in finish state", K(ret), KPC(dag_net));
|
LOG_WARN("dag net is in finish state", K(ret), KPC(dag_net));
|
||||||
@ -3655,34 +3677,19 @@ int ObDagNetScheduler::loop_blocking_dag_net_list()
|
|||||||
int64_t rest_cnt = DEFAULT_MAX_RUNNING_DAG_NET_CNT - (dag_net_map_.size() - blocking_dag_net_list_.get_size());
|
int64_t rest_cnt = DEFAULT_MAX_RUNNING_DAG_NET_CNT - (dag_net_map_.size() - blocking_dag_net_list_.get_size());
|
||||||
while (NULL != cur && head != cur && rest_cnt > 0 && !is_dag_map_full()) {
|
while (NULL != cur && head != cur && rest_cnt > 0 && !is_dag_map_full()) {
|
||||||
LOG_DEBUG("loop blocking dag net list", K(ret), KPC(cur), K(rest_cnt));
|
LOG_DEBUG("loop blocking dag net list", K(ret), KPC(cur), K(rest_cnt));
|
||||||
if (OB_FAIL(cur->start_running())) { // call start_running function
|
|
||||||
COMMON_LOG(WARN, "failed to start running", K(ret), KPC(cur));
|
|
||||||
int64_t tmp_ret = OB_SUCCESS;
|
|
||||||
if (OB_TMP_FAIL(dag_net_map_.erase_refactored(cur))) {
|
|
||||||
COMMON_LOG(ERROR, "failed to erase from running_dag_net_map", K(tmp_ret), KPC(cur));
|
|
||||||
ob_abort();
|
|
||||||
} else if (OB_TMP_FAIL(dag_net_id_map_.erase_refactored(cur->get_dag_id()))) {
|
|
||||||
COMMON_LOG(ERROR, "failed to erase from running_dag_net_id_map", K(tmp_ret), KPC(cur));
|
|
||||||
ob_abort();
|
|
||||||
} else {
|
|
||||||
tmp = cur;
|
tmp = cur;
|
||||||
cur = cur->get_next();
|
cur = cur->get_next();
|
||||||
if (!blocking_dag_net_list_.remove(tmp)) {
|
if (tmp->is_cancel() || OB_FAIL(tmp->start_running())) { // call start_running function
|
||||||
COMMON_LOG(WARN, "failed to remove dag_net from blocking_dag_net_list", K(tmp));
|
if (OB_FAIL(ret)) {
|
||||||
ob_abort();
|
COMMON_LOG(WARN, "failed to start running or be canceled", K(ret), KPC(cur));
|
||||||
}
|
|
||||||
--dag_net_cnts_[tmp->get_type()];
|
|
||||||
scheduler_->free_dag_net(tmp);
|
|
||||||
}
|
}
|
||||||
|
(void) finish_dag_net_without_lock(*tmp);
|
||||||
|
(void) erase_block_dag_net_or_abort(tmp);
|
||||||
|
(void) scheduler_->free_dag_net(tmp); // set tmp nullptr
|
||||||
} else {
|
} else {
|
||||||
cur->set_start_time();
|
tmp->set_start_time();
|
||||||
tmp = cur;
|
|
||||||
cur = cur->get_next();
|
|
||||||
--rest_cnt;
|
--rest_cnt;
|
||||||
if (!blocking_dag_net_list_.remove(tmp)) {
|
(void) erase_block_dag_net_or_abort(tmp);
|
||||||
COMMON_LOG(WARN, "failed to remove dag_net from blocking_dag_net_list", K(tmp));
|
|
||||||
ob_abort();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3768,6 +3775,10 @@ int ObDagNetScheduler::check_ls_compaction_dag_exist_with_cancel(const ObLSID &l
|
|||||||
if (OB_ISNULL(cur_dag_net = iter->second)) {
|
if (OB_ISNULL(cur_dag_net = iter->second)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("dag net is unepxected null", K(ret), KP(cur_dag_net));
|
LOG_WARN("dag net is unepxected null", K(ret), KP(cur_dag_net));
|
||||||
|
} else if (cur_dag_net->check_finished_and_mark_stop()) {
|
||||||
|
LOG_INFO("start finish dag net", K(ret), KPC(cur_dag_net));
|
||||||
|
(void) finish_dag_net_without_lock(*cur_dag_net);
|
||||||
|
(void) scheduler_->free_dag_net(cur_dag_net);
|
||||||
} else if (cur_dag_net->is_co_dag_net()) {
|
} else if (cur_dag_net->is_co_dag_net()) {
|
||||||
compaction::ObCOMergeDagNet *co_dag_net = static_cast<compaction::ObCOMergeDagNet*>(cur_dag_net);
|
compaction::ObCOMergeDagNet *co_dag_net = static_cast<compaction::ObCOMergeDagNet*>(cur_dag_net);
|
||||||
if (ls_id == co_dag_net->get_dag_param().ls_id_) {
|
if (ls_id == co_dag_net->get_dag_param().ls_id_) {
|
||||||
@ -4419,23 +4430,13 @@ int ObTenantDagScheduler::deal_with_finish_task(ObITask &task, ObTenantDagWorker
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTenantDagScheduler::finish_dag_net(ObIDagNet *dag_net)
|
void ObTenantDagScheduler::finish_dag_net(ObIDagNet *dag_net)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
int tmp_ret = OB_SUCCESS;
|
|
||||||
if (OB_NOT_NULL(dag_net)) {
|
if (OB_NOT_NULL(dag_net)) {
|
||||||
if (OB_TMP_FAIL(dag_net->add_dag_warning_info())) {
|
COMMON_LOG(INFO, "start finish dag net", KPC(dag_net));
|
||||||
COMMON_LOG(WARN, "failed to add dag warning info in dag net into mgr", K(tmp_ret), KPC(dag_net));
|
(void) dag_net_sche_.finish_dag_net(*dag_net);
|
||||||
|
(void) free_dag_net(dag_net);
|
||||||
}
|
}
|
||||||
if (OB_TMP_FAIL(dag_net->clear_dag_net_ctx())) {
|
|
||||||
COMMON_LOG(WARN, "failed to clear dag net ctx", K(tmp_ret), KPC(dag_net));
|
|
||||||
}
|
|
||||||
if (OB_SUCC(dag_net_sche_.finish_dag_net(*dag_net))) {
|
|
||||||
COMMON_LOG(INFO, "dag net finished", K(ret), KPC(dag_net));
|
|
||||||
free_dag_net(dag_net);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObTenantDagScheduler::try_switch(ObTenantDagWorker &worker)
|
bool ObTenantDagScheduler::try_switch(ObTenantDagWorker &worker)
|
||||||
|
|||||||
@ -571,6 +571,7 @@ public:
|
|||||||
void set_cancel();
|
void set_cancel();
|
||||||
bool is_cancel();
|
bool is_cancel();
|
||||||
bool is_inited();
|
bool is_inited();
|
||||||
|
bool is_started();
|
||||||
virtual int deal_with_cancel()
|
virtual int deal_with_cancel()
|
||||||
{
|
{
|
||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
@ -745,7 +746,11 @@ public:
|
|||||||
|
|
||||||
bool is_empty() const { return 0 == dag_net_map_.size();} // only for unittest
|
bool is_empty() const { return 0 == dag_net_map_.size();} // only for unittest
|
||||||
int add_dag_net(ObIDagNet &dag_net);
|
int add_dag_net(ObIDagNet &dag_net);
|
||||||
int finish_dag_net(ObIDagNet &dag_net);
|
void erase_dag_net_or_abort(ObIDagNet &dag_net);
|
||||||
|
void erase_dag_net_id_or_abort(ObIDagNet &dag_net);
|
||||||
|
void erase_block_dag_net_or_abort(ObIDagNet *dag_net);
|
||||||
|
void finish_dag_net_without_lock(ObIDagNet &dag_net);
|
||||||
|
void finish_dag_net(ObIDagNet &dag_net);
|
||||||
void dump_dag_status();
|
void dump_dag_status();
|
||||||
int64_t get_dag_net_count();
|
int64_t get_dag_net_count();
|
||||||
void get_all_dag_scheduler_info(
|
void get_all_dag_scheduler_info(
|
||||||
@ -1106,7 +1111,7 @@ public:
|
|||||||
int deal_with_finish_task(ObITask &task, ObTenantDagWorker &worker, int error_code);
|
int deal_with_finish_task(ObITask &task, ObTenantDagWorker &worker, int error_code);
|
||||||
bool try_switch(ObTenantDagWorker &worker);
|
bool try_switch(ObTenantDagWorker &worker);
|
||||||
int dispatch_task(ObITask &task, ObTenantDagWorker *&ret_worker, const int64_t priority);
|
int dispatch_task(ObITask &task, ObTenantDagWorker *&ret_worker, const int64_t priority);
|
||||||
int finish_dag_net(ObIDagNet *dag_net);
|
void finish_dag_net(ObIDagNet *dag_net);
|
||||||
// for unittest
|
// for unittest
|
||||||
int get_first_dag_net(ObIDagNet *&dag_net);
|
int get_first_dag_net(ObIDagNet *&dag_net);
|
||||||
|
|
||||||
|
|||||||
@ -117,6 +117,26 @@ void ObCompactionTabletMetaIterator::reset()
|
|||||||
batch_size_ = 0;
|
batch_size_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fetches the next valid tablet_info from the base ObTabletMetaIterator.
//
// Keeps pulling entries from ObTabletMetaIterator::next until one of:
//   - a valid tablet_info is produced                     -> OB_SUCCESS
//   - the base iterator fails (OB_ITER_END is returned silently,
//     any other error is logged)                          -> that error code
//   - an invalid tablet_info still carries replicas       -> OB_ERR_UNEXPECTED
// An invalid tablet_info whose replica list is empty is skipped:
// ObTabletMetaIterator::next may filter out replica members and thereby leave
// the entry invalid, so the next one is fetched instead.
int ObCompactionTabletMetaIterator::next(ObTabletInfo &tablet_info)
{
  int ret = OB_SUCCESS;
  bool need_fetch = true;
  while (OB_SUCC(ret) && need_fetch) {
    if (OB_FAIL(ObTabletMetaIterator::next(tablet_info))) {
      if (OB_ITER_END != ret) {
        LOG_WARN("fail to get next tablet info", KR(ret));
      }
    } else if (tablet_info.is_valid()) {
      need_fetch = false;
    } else if (!tablet_info.get_replicas().empty()) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("tablet_info is invalid", KR(ret), K(tablet_info));
    } else {
      // all replicas were filtered out: skip this entry and fetch the next one
    }
  }
  return ret;
}
|
||||||
|
|
||||||
int ObCompactionTabletMetaIterator::init(
|
int ObCompactionTabletMetaIterator::init(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const int64_t batch_size,
|
const int64_t batch_size,
|
||||||
|
|||||||
@ -34,7 +34,7 @@ public:
|
|||||||
ObTabletMetaIterator();
|
ObTabletMetaIterator();
|
||||||
~ObTabletMetaIterator() { reset(); }
|
~ObTabletMetaIterator() { reset(); }
|
||||||
virtual void reset();
|
virtual void reset();
|
||||||
int next(ObTabletInfo &tablet_info);
|
virtual int next(ObTabletInfo &tablet_info);
|
||||||
protected:
|
protected:
|
||||||
int inner_init(
|
int inner_init(
|
||||||
const uint64_t tenant_id);
|
const uint64_t tenant_id);
|
||||||
@ -59,6 +59,7 @@ public:
|
|||||||
const int64_t batch_size,
|
const int64_t batch_size,
|
||||||
share::ObIServerTrace &server_trace);
|
share::ObIServerTrace &server_trace);
|
||||||
virtual void reset() override;
|
virtual void reset() override;
|
||||||
|
virtual int next(ObTabletInfo &tablet_info) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
virtual int prefetch() override;
|
virtual int prefetch() override;
|
||||||
|
|||||||
Reference in New Issue
Block a user