allow cancel waiting dag when parent dag or dag net is canceled

This commit is contained in:
Tsunaou 2024-01-09 07:12:53 +00:00 committed by ob-robot
parent a1dda71c11
commit bb72497741
4 changed files with 86 additions and 5 deletions

View File

@ -876,9 +876,7 @@ int ObIDag::finish(const ObDagStatus status, bool &dag_net_finished)
{
int ret = OB_SUCCESS;
{
// If force_cancel_flag is set but status is not DAG_STATUS_ABORT, the final task actually succeed.
if (OB_UNLIKELY(force_cancel_flag_ && DAG_STATUS_ABORT == status)) {
} else if (OB_FAIL(remove_child_for_parents())) {
if (OB_FAIL(remove_child_for_parents())) {
COMMON_LOG(WARN, "failed to remove child for parents", K(ret));
} else if (OB_FAIL(remove_parent_for_children())) {
COMMON_LOG(WARN, "failed to remove parent for children", K(ret));
@ -892,6 +890,15 @@ int ObIDag::finish(const ObDagStatus status, bool &dag_net_finished)
return ret;
}
void ObIDag::set_force_cancel_flag()
{
force_cancel_flag_ = true;
// dag_net and dags in the same dag net should be canceled too.
if (OB_NOT_NULL(dag_net_)) {
dag_net_->set_cancel();
}
}
int ObIDag::add_child_without_inheritance(ObIDag &child)
{
int ret = OB_SUCCESS;
@ -1003,6 +1010,10 @@ ObIDagNet::ObIDagNet(
{
}
/*
* ATTENTION: DO NOT call this function if if this dag has parent dag.
* When parent adds child, child will be add into the same dag net with parent.
*/
int ObIDagNet::add_dag_into_dag_net(ObIDag &dag)
{
int ret = OB_SUCCESS;

View File

@ -435,7 +435,7 @@ public:
}
void set_start_time() { start_time_ = ObTimeUtility::fast_current_time(); }
int64_t get_start_time() const { return start_time_; }
void set_force_cancel_flag() { force_cancel_flag_ = true; }
void set_force_cancel_flag();
bool get_force_cancel_flag() { return force_cancel_flag_; }
int add_child_without_inheritance(ObIDag &child);
int add_child_without_inheritance(const common::ObIArray<ObINodeWithChild*> &child_array);

View File

@ -1,4 +1,4 @@
# owner: lixia.yq
# owner: gengli.wzy
# owner group: transaction
# description: 本case是为了测试在含未提交事务的状态下,with savepoint
--disable_query_log

View File

@ -1618,6 +1618,76 @@ TEST_F(TestDagScheduler, test_cancel_dag_net_func)
EXPECT_EQ(0, MTL(ObDagWarningHistoryManager *)->size());
}
class ObCancelWaitingDagInDagNet: public ObCancelDagNet
{
public:
ObCancelWaitingDagInDagNet()
: ObCancelDagNet(),
target_dag_cnt_(4),
first_dag_(nullptr)
{}
virtual int start_running() override
{
int ret = OB_SUCCESS;
ObCancelDag *dag = nullptr;
ObCancelDag *last_dag = nullptr;
// Dag1 -> Dag2/Dag3
// Dag2
// Dag3 -> Dag2/Dag4
// Dag4 -> Dag2
for (int64_t i = 0; i < target_dag_cnt_ && OB_SUCC(ret); i++) {
if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(dag))) {
COMMON_LOG(WARN, "Fail to create dag", K(ret));
} else if (OB_ISNULL(first_dag_) && OB_FAIL(add_dag_into_dag_net(*dag))) { // add first dag into this dag_net
COMMON_LOG(WARN, "Fail to add dag into dag_net", K(ret));
} else if (OB_NOT_NULL(first_dag_) && i != (target_dag_cnt_ - 1) && OB_FAIL(first_dag_->add_child(*dag))) {
COMMON_LOG(WARN, "Fail to add child of first_dag", K(ret), KPC(first_dag_), KPC(dag));
} else if (OB_NOT_NULL(last_dag) && i == (target_dag_cnt_ - 1) && OB_FAIL(last_dag->add_child(*dag))) {
COMMON_LOG(WARN, "Fail to add child of last_dag", K(ret), KPC(last_dag), KPC(dag));
} else if (OB_FAIL(dag->create_first_task())) {
COMMON_LOG(WARN, "Fail to create first task", K(ret));
} else if (OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(dag))) {
COMMON_LOG(WARN, "failed to add dag", K(ret), K(dag));
} else if (OB_ISNULL(first_dag_)) {
first_dag_ = dag;
}
last_dag = dag;
EXPECT_EQ(OB_SUCCESS, ret);
}
return ret;
}
public:
int64_t target_dag_cnt_;
ObCancelDag *first_dag_;
};
TEST_F(TestDagScheduler, test_cancel_waiting_dag)
{
int ret = OB_SUCCESS;
ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
ASSERT_TRUE(nullptr != scheduler);
EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag_net<ObCancelWaitingDagInDagNet>(nullptr));
ObIDagNet *tmp_dag_net = nullptr;
EXPECT_EQ(OB_SUCCESS, scheduler->get_first_dag_net(tmp_dag_net));
EXPECT_NE(nullptr, tmp_dag_net);
while (scheduler->get_cur_dag_cnt() < 4 /*target_dag_cnt*/) {
usleep(100);
}
ObCancelWaitingDagInDagNet *dag_net = static_cast<ObCancelWaitingDagInDagNet *>(tmp_dag_net);
EXPECT_NE(nullptr, dag_net);
ObArray<ObIDag *> dag_array;
dag_net->get_dag_list(dag_array);
EXPECT_EQ(4, dag_array.count());
ObCancelDag *first_dag = dag_net->first_dag_;
EXPECT_NE(nullptr, first_dag);
first_dag->set_force_cancel_flag();
first_dag->can_schedule_ = true;
wait_scheduler();
}
TEST_F(TestDagScheduler, test_destroy_when_running) //TODO(renju.rj): fix it
{