allow cancel waiting dag when parent dag or dag net is canceled
This commit is contained in:
@ -1618,6 +1618,76 @@ TEST_F(TestDagScheduler, test_cancel_dag_net_func)
|
||||
EXPECT_EQ(0, MTL(ObDagWarningHistoryManager *)->size());
|
||||
}
|
||||
|
||||
class ObCancelWaitingDagInDagNet: public ObCancelDagNet
|
||||
{
|
||||
public:
|
||||
ObCancelWaitingDagInDagNet()
|
||||
: ObCancelDagNet(),
|
||||
target_dag_cnt_(4),
|
||||
first_dag_(nullptr)
|
||||
{}
|
||||
|
||||
virtual int start_running() override
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObCancelDag *dag = nullptr;
|
||||
ObCancelDag *last_dag = nullptr;
|
||||
// Dag1 -> Dag2/Dag3
|
||||
// Dag2
|
||||
// Dag3 -> Dag2/Dag4
|
||||
// Dag4 -> Dag2
|
||||
for (int64_t i = 0; i < target_dag_cnt_ && OB_SUCC(ret); i++) {
|
||||
if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(dag))) {
|
||||
COMMON_LOG(WARN, "Fail to create dag", K(ret));
|
||||
} else if (OB_ISNULL(first_dag_) && OB_FAIL(add_dag_into_dag_net(*dag))) { // add first dag into this dag_net
|
||||
COMMON_LOG(WARN, "Fail to add dag into dag_net", K(ret));
|
||||
} else if (OB_NOT_NULL(first_dag_) && i != (target_dag_cnt_ - 1) && OB_FAIL(first_dag_->add_child(*dag))) {
|
||||
COMMON_LOG(WARN, "Fail to add child of first_dag", K(ret), KPC(first_dag_), KPC(dag));
|
||||
} else if (OB_NOT_NULL(last_dag) && i == (target_dag_cnt_ - 1) && OB_FAIL(last_dag->add_child(*dag))) {
|
||||
COMMON_LOG(WARN, "Fail to add child of last_dag", K(ret), KPC(last_dag), KPC(dag));
|
||||
} else if (OB_FAIL(dag->create_first_task())) {
|
||||
COMMON_LOG(WARN, "Fail to create first task", K(ret));
|
||||
} else if (OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(dag))) {
|
||||
COMMON_LOG(WARN, "failed to add dag", K(ret), K(dag));
|
||||
} else if (OB_ISNULL(first_dag_)) {
|
||||
first_dag_ = dag;
|
||||
}
|
||||
last_dag = dag;
|
||||
EXPECT_EQ(OB_SUCCESS, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
public:
|
||||
int64_t target_dag_cnt_;
|
||||
ObCancelDag *first_dag_;
|
||||
};
|
||||
|
||||
TEST_F(TestDagScheduler, test_cancel_waiting_dag)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
|
||||
ASSERT_TRUE(nullptr != scheduler);
|
||||
EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag_net<ObCancelWaitingDagInDagNet>(nullptr));
|
||||
ObIDagNet *tmp_dag_net = nullptr;
|
||||
EXPECT_EQ(OB_SUCCESS, scheduler->get_first_dag_net(tmp_dag_net));
|
||||
EXPECT_NE(nullptr, tmp_dag_net);
|
||||
|
||||
while (scheduler->get_cur_dag_cnt() < 4 /*target_dag_cnt*/) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
ObCancelWaitingDagInDagNet *dag_net = static_cast<ObCancelWaitingDagInDagNet *>(tmp_dag_net);
|
||||
EXPECT_NE(nullptr, dag_net);
|
||||
ObArray<ObIDag *> dag_array;
|
||||
dag_net->get_dag_list(dag_array);
|
||||
EXPECT_EQ(4, dag_array.count());
|
||||
|
||||
ObCancelDag *first_dag = dag_net->first_dag_;
|
||||
EXPECT_NE(nullptr, first_dag);
|
||||
first_dag->set_force_cancel_flag();
|
||||
first_dag->can_schedule_ = true;
|
||||
wait_scheduler();
|
||||
}
|
||||
|
||||
TEST_F(TestDagScheduler, test_destroy_when_running) //TODO(renju.rj): fix it
|
||||
{
|
||||
|
Reference in New Issue
Block a user