diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.cpp b/src/share/scheduler/ob_tenant_dag_scheduler.cpp index f2afb9360e..ed9a72a004 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.cpp +++ b/src/share/scheduler/ob_tenant_dag_scheduler.cpp @@ -876,9 +876,7 @@ int ObIDag::finish(const ObDagStatus status, bool &dag_net_finished) { int ret = OB_SUCCESS; { - // If force_cancel_flag is set but status is not DAG_STATUS_ABORT, the final task actually succeed. - if (OB_UNLIKELY(force_cancel_flag_ && DAG_STATUS_ABORT == status)) { - } else if (OB_FAIL(remove_child_for_parents())) { + if (OB_FAIL(remove_child_for_parents())) { COMMON_LOG(WARN, "failed to remove child for parents", K(ret)); } else if (OB_FAIL(remove_parent_for_children())) { COMMON_LOG(WARN, "failed to remove parent for children", K(ret)); @@ -892,6 +890,15 @@ int ObIDag::finish(const ObDagStatus status, bool &dag_net_finished) return ret; } +void ObIDag::set_force_cancel_flag() +{ + force_cancel_flag_ = true; + // dag_net and dags in the same dag net should be canceled too. + if (OB_NOT_NULL(dag_net_)) { + dag_net_->set_cancel(); + } +} + int ObIDag::add_child_without_inheritance(ObIDag &child) { int ret = OB_SUCCESS; @@ -1003,6 +1010,10 @@ ObIDagNet::ObIDagNet( { } +/* + * ATTENTION: DO NOT call this function if if this dag has parent dag. + * When parent adds child, child will be add into the same dag net with parent. + */ int ObIDagNet::add_dag_into_dag_net(ObIDag &dag) { int ret = OB_SUCCESS; diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index 1d5652a0e2..e6b551c8a5 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -435,7 +435,7 @@ public: } void set_start_time() { start_time_ = ObTimeUtility::fast_current_time(); } int64_t get_start_time() const { return start_time_; } - void set_force_cancel_flag() { force_cancel_flag_ = true; } + void set_force_cancel_flag(); bool get_force_cancel_flag() { return force_cancel_flag_; } int add_child_without_inheritance(ObIDag &child); int add_child_without_inheritance(const common::ObIArray &child_array); diff --git a/tools/deploy/mysql_test/test_suite/merge_uncommitted/t/rollback_sql_sequence_check_lock.test b/tools/deploy/mysql_test/test_suite/merge_uncommitted/t/rollback_sql_sequence_check_lock.test index 8267557b44..96ce054db7 100644 --- a/tools/deploy/mysql_test/test_suite/merge_uncommitted/t/rollback_sql_sequence_check_lock.test +++ b/tools/deploy/mysql_test/test_suite/merge_uncommitted/t/rollback_sql_sequence_check_lock.test @@ -1,4 +1,4 @@ -# owner: lixia.yq +# owner: gengli.wzy # owner group: transaction # description: 本case是为了测试在含未提交事务的状态下,with savepoint --disable_query_log diff --git a/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp b/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp index f92daa06ee..9faf93fdf7 100644 --- a/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp +++ b/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp @@ -1618,6 +1618,76 @@ TEST_F(TestDagScheduler, test_cancel_dag_net_func) EXPECT_EQ(0, MTL(ObDagWarningHistoryManager *)->size()); } +class ObCancelWaitingDagInDagNet: public ObCancelDagNet +{ +public: + ObCancelWaitingDagInDagNet() + : ObCancelDagNet(), + target_dag_cnt_(4), + first_dag_(nullptr) + {} + + virtual int start_running() override + { + int ret = OB_SUCCESS; + ObCancelDag *dag = nullptr; + ObCancelDag *last_dag = nullptr; + // Dag1 -> Dag2/Dag3 + // Dag2 + // Dag3 -> Dag2/Dag4 + // Dag4 -> Dag2 + for (int64_t i = 0; i < target_dag_cnt_ && OB_SUCC(ret); i++) { + if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(dag))) { + COMMON_LOG(WARN, "Fail to create dag", K(ret)); + } else if (OB_ISNULL(first_dag_) && OB_FAIL(add_dag_into_dag_net(*dag))) { // add first dag into this dag_net + COMMON_LOG(WARN, "Fail to add dag into dag_net", K(ret)); + } else if (OB_NOT_NULL(first_dag_) && i != (target_dag_cnt_ - 1) && OB_FAIL(first_dag_->add_child(*dag))) { + COMMON_LOG(WARN, "Fail to add child of first_dag", K(ret), KPC(first_dag_), KPC(dag)); + } else if (OB_NOT_NULL(last_dag) && i == (target_dag_cnt_ - 1) && OB_FAIL(last_dag->add_child(*dag))) { + COMMON_LOG(WARN, "Fail to add child of last_dag", K(ret), KPC(last_dag), KPC(dag)); + } else if (OB_FAIL(dag->create_first_task())) { + COMMON_LOG(WARN, "Fail to create first task", K(ret)); + } else if (OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(dag))) { + COMMON_LOG(WARN, "failed to add dag", K(ret), K(dag)); + } else if (OB_ISNULL(first_dag_)) { + first_dag_ = dag; + } + last_dag = dag; + EXPECT_EQ(OB_SUCCESS, ret); + } + return ret; + } +public: + int64_t target_dag_cnt_; + ObCancelDag *first_dag_; +}; + +TEST_F(TestDagScheduler, test_cancel_waiting_dag) +{ + int ret = OB_SUCCESS; + ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*); + ASSERT_TRUE(nullptr != scheduler); + EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag_net(nullptr)); + ObIDagNet *tmp_dag_net = nullptr; + EXPECT_EQ(OB_SUCCESS, scheduler->get_first_dag_net(tmp_dag_net)); + EXPECT_NE(nullptr, tmp_dag_net); + + while (scheduler->get_cur_dag_cnt() < 4 /*target_dag_cnt*/) { + usleep(100); + } + + ObCancelWaitingDagInDagNet *dag_net = static_cast(tmp_dag_net); + EXPECT_NE(nullptr, dag_net); + ObArray dag_array; + dag_net->get_dag_list(dag_array); + EXPECT_EQ(4, dag_array.count()); + + ObCancelDag *first_dag = dag_net->first_dag_; + EXPECT_NE(nullptr, first_dag); + first_dag->set_force_cancel_flag(); + first_dag->can_schedule_ = true; + wait_scheduler(); +} TEST_F(TestDagScheduler, test_destroy_when_running) //TODO(renju.rj): fix it {