1604 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1604 lines
		
	
	
		
			46 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
| #define USING_LOG_PREFIX TEST
 | |
| #include <getopt.h>
 | |
| #include <unistd.h>
 | |
| #include <gtest/gtest.h>
 | |
| #define protected public
 | |
| #define private public
 | |
| #include "share/scheduler/ob_dag_scheduler.h"
 | |
| #include "lib/atomic/ob_atomic.h"
 | |
| #include "observer/omt/ob_tenant_node_balancer.h"
 | |
| #include "share/scheduler/ob_dag_warning_history_mgr.h"
 | |
| #include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
 | |
| 
 | |
| int64_t dag_cnt = 1;
 | |
| int64_t stress_time= 1; // 100ms
 | |
| char log_level[20] = "INFO";
 | |
| uint32_t time_slice = 1000;
 | |
| uint64_t check_waiting_list_period = 1000;
 | |
| uint32_t sleep_slice = 2 * time_slice;
 | |
| const int64_t CHECK_TIMEOUT = 1 * 1000 * 1000;
 | |
| 
 | |
| #define CHECK_EQ_UTIL_TIMEOUT(expected, expr) \
 | |
|   { \
 | |
|     int64_t start_time = oceanbase::common::ObTimeUtility::current_time(); \
 | |
|     auto expr_result = (expr); \
 | |
|     do { \
 | |
|       if ((expected) == (expr_result)) { \
 | |
|         break; \
 | |
|       } else { \
 | |
|         expr_result = (expr); \
 | |
|       }\
 | |
|     } while(oceanbase::common::ObTimeUtility::current_time() - start_time < CHECK_TIMEOUT); \
 | |
|     EXPECT_EQ((expected), (expr_result)); \
 | |
|   }
 | |
| 
 | |
| namespace oceanbase
 | |
| {
 | |
| using namespace common;
 | |
| using namespace share;
 | |
| using namespace omt;
 | |
| 
 | |
| namespace storage
 | |
| {
 | |
| int64_t ObTenantMetaMemMgr::cal_adaptive_bucket_num()
 | |
| {
 | |
|   return 1000;
 | |
| }
 | |
| }
 | |
| 
 | |
| namespace unittest
 | |
| {
 | |
| 
 | |
| class TestDagScheduler : public ::testing::Test
 | |
| {
 | |
| public:
 | |
|   TestDagScheduler()
 | |
|     : tenant_id_(500),
 | |
|       scheduler_(nullptr),
 | |
|       tenant_base_(500)
 | |
|   {}
 | |
|   ~TestDagScheduler() {}
 | |
|   void SetUp()
 | |
|   {
 | |
|     ObUnitInfoGetter::ObTenantConfig unit_config;
 | |
|     unit_config.mode_ = lib::Worker::CompatMode::MYSQL;
 | |
|     unit_config.tenant_id_ = 0;
 | |
|     TenantUnits units;
 | |
|     ASSERT_EQ(OB_SUCCESS, units.push_back(unit_config));
 | |
| 
 | |
|     ObTenantMetaMemMgr *t3m = OB_NEW(ObTenantMetaMemMgr, ObModIds::TEST, 500);
 | |
|     tenant_base_.set(t3m);
 | |
| 
 | |
|     scheduler_ = OB_NEW(ObTenantDagScheduler, ObModIds::TEST);
 | |
|     tenant_base_.set(scheduler_);
 | |
| 
 | |
|     ObTenantEnv::set_tenant(&tenant_base_);
 | |
|     ASSERT_EQ(OB_SUCCESS, tenant_base_.init());
 | |
| 
 | |
|     ASSERT_EQ(OB_SUCCESS, t3m->init());
 | |
|     ASSERT_EQ(OB_SUCCESS, scheduler_->init(tenant_id_, time_slice, check_waiting_list_period, MAX_DAG_CNT));
 | |
|   }
 | |
|   void TearDown()
 | |
|   {
 | |
|     scheduler_->destroy();
 | |
|     scheduler_ = nullptr;
 | |
|     tenant_base_.destroy();
 | |
|     ObTenantEnv::set_tenant(nullptr);
 | |
|   }
 | |
| private:
 | |
|   const static int64_t MAX_DAG_CNT = 64;
 | |
|   const uint64_t tenant_id_;
 | |
|   ObTenantDagScheduler *scheduler_;
 | |
|   ObTenantBase tenant_base_;
 | |
|   DISALLOW_COPY_AND_ASSIGN(TestDagScheduler);
 | |
| };
 | |
| 
 | |
| void wait_scheduler() {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
|   while (!scheduler->is_empty()) {
 | |
|     usleep(100000);
 | |
|   }
 | |
| }
 | |
| 
 | |
| class ObBasicDag : public ObIDag
 | |
| {
 | |
| public:
 | |
|   ObBasicDag() :
 | |
|     ObIDag(ObDagType::DAG_TYPE_MAJOR_MERGE),
 | |
|     id_(ObTimeUtility::current_time() + random())
 | |
|   {}
 | |
|   void init(int64_t id) { id_ = id; }
 | |
|   virtual int64_t hash() const { return murmurhash(&id_, sizeof(id_), 0);}
 | |
|   virtual bool operator == (const ObIDag &other) const
 | |
|   {
 | |
|     bool bret = false;
 | |
|     if (get_type() == other.get_type()) {
 | |
|       const ObBasicDag &dag = static_cast<const ObBasicDag &>(other);
 | |
|       bret = dag.id_ == id_;
 | |
|     }
 | |
|     return bret;
 | |
|   }
 | |
|   virtual int fill_comment(char *buf,const int64_t size) const override { UNUSEDx(buf, size); return OB_SUCCESS; }
 | |
|   virtual int fill_dag_key(char *buf,const int64_t size) const override { UNUSEDx(buf, size); return OB_SUCCESS; }
 | |
|   virtual lib::Worker::CompatMode get_compat_mode() const override
 | |
|   { return lib::Worker::CompatMode::MYSQL; }
 | |
|   virtual uint64_t get_consumer_group_id() const override
 | |
|   { return consumer_group_id_; }
 | |
| 
 | |
|   INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(type), K_(id), K(task_list_.get_size()), K_(dag_ret));
 | |
| 
 | |
| private:
 | |
|   int64_t id_;
 | |
|   DISALLOW_COPY_AND_ASSIGN(ObBasicDag);
 | |
| };
 | |
| 
 | |
| /*
 | |
|  * check dag wait to schedule
 | |
|  * */
 | |
| 
 | |
| class ObWaitTask : public ObITask
 | |
| {
 | |
| public:
 | |
|   ObWaitTask() : ObITask(ObITaskType::TASK_TYPE_UT), cnt_(0), start_time_(0), finish_time_(0) {}
 | |
|   virtual ~ObWaitTask() {}
 | |
|   virtual int process()
 | |
|   {
 | |
|     if (cnt_ == 0) {
 | |
|       start_time_ = ObTimeUtility::current_time();
 | |
|     } else if (cnt_ < FINISH_CNT) {
 | |
|       cnt_++;
 | |
|       dag_yield();
 | |
|     } else {
 | |
|       finish_time_ = ObTimeUtility::current_time();
 | |
|       COMMON_LOG(INFO, "finish process", K(start_time_), K_(finish_time));
 | |
|     }
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
| private:
 | |
|   const static int64_t FINISH_CNT = 5;
 | |
|   int cnt_;
 | |
|   int64_t start_time_;
 | |
|   int64_t finish_time_;
 | |
| };
 | |
| 
 | |
| class ObWaitDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObWaitDag() :
 | |
|     ObBasicDag(),
 | |
|     retry_times_(0),
 | |
|     last_run_time_(0)
 | |
|   {}
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObWaitTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
| 
 | |
|   bool check_can_retry()
 | |
|   {
 | |
|     bool bret = true;
 | |
|     if (retry_times_++ > MAX_RETRY_TIMES) {
 | |
|       bret = false;
 | |
|     }
 | |
|     return bret;
 | |
|   }
 | |
| 
 | |
|   virtual bool check_can_schedule() override
 | |
|   {
 | |
|     bool bret = true;
 | |
|     if (ObTimeUtility::current_time() - last_run_time_ < MAX_CHECK_INTERVAL) {
 | |
|       bret = false;
 | |
|     } else {
 | |
|       last_run_time_ = ObTimeUtility::current_time();
 | |
|       STORAGE_LOG(INFO, "check_can_schedule", KPC(this));
 | |
|     }
 | |
|     return bret;
 | |
|   }
 | |
|   INHERIT_TO_STRING_KV("ObBasicDag", ObBasicDag, K_(retry_times), K_(last_run_time));
 | |
| private:
 | |
|   const int64_t MAX_RETRY_TIMES = 20;
 | |
|   const int64_t MAX_CHECK_INTERVAL = 1000L * 100L; // 100ms
 | |
| 
 | |
|   int64_t retry_times_;
 | |
|   int64_t last_run_time_;
 | |
|   DISALLOW_COPY_AND_ASSIGN(ObWaitDag);
 | |
| };
 | |
| 
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_task_wait_to_schedule)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
|   ObWaitDag *dag = NULL;
 | |
|   for (int i = 0; i < 10; ++i) {
 | |
|     EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag(nullptr, dag));
 | |
|   }
 | |
| 
 | |
|   wait_scheduler();
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * check task retry
 | |
|  * */
 | |
| class ObRetryTask : public ObITask
 | |
| {
 | |
| public:
 | |
|   ObRetryTask() : ObITask(ObITaskType::TASK_TYPE_NORMAL_MINOR_MERGE), cnt_(0), seq_(0) {}
 | |
|   virtual ~ObRetryTask() {}
 | |
|   virtual int process()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (cnt_++ < FINISH_CNT) {
 | |
|       ret = OB_ERROR;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
|   void init(int64_t seq) { seq_ = seq; }
 | |
|   virtual int generate_next_task(ObITask *&next_task)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (seq_ >= MAX_SEQ) {
 | |
|       ret = OB_ITER_END;
 | |
|       COMMON_LOG(INFO, "generate task end", K_(seq));
 | |
|     } else {
 | |
|       ObIDag *dag = get_dag();
 | |
|       ObRetryTask *ntask = NULL;
 | |
|       if (NULL == dag) {
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|         COMMON_LOG(WARN, "dag is null", K(ret));
 | |
|       } else if (OB_FAIL(dag->alloc_task(ntask))) {
 | |
|         COMMON_LOG(WARN, "failed to alloc task", K(ret));
 | |
|       } else if (NULL == ntask) {
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|         COMMON_LOG(WARN, "task is null", K(ret));
 | |
|       } else {
 | |
|         ntask->init(seq_ + 1);
 | |
|         ntask->set_max_retry_times(3);
 | |
|         next_task = ntask;
 | |
|       }
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| private:
 | |
|   const int64_t FINISH_CNT = 3;
 | |
|   const int64_t MAX_SEQ = 3;
 | |
|   int cnt_;
 | |
|   int64_t seq_;
 | |
| };
 | |
| 
 | |
| class ObTaskRetryDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObTaskRetryDag() :
 | |
|     ObBasicDag()
 | |
|   {}
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObRetryTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     } else {
 | |
|       task->init(0);
 | |
|       task->set_max_retry_times(10);
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
| };
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_task_retry)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
| 
 | |
|   for (int i = 0; i < 2; ++i) {
 | |
|     ObTaskRetryDag *dag = NULL;
 | |
|     EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag(nullptr, dag));
 | |
|   }
 | |
| 
 | |
|   wait_scheduler();
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| class ObDagRetryTask : public ObITask
 | |
| {
 | |
| public:
 | |
|   ObDagRetryTask() : ObITask(ObITaskType::TASK_TYPE_NORMAL_MINOR_MERGE) {}
 | |
|   virtual ~ObDagRetryTask() {}
 | |
|   virtual int process()
 | |
|   {
 | |
|     static int cnt_ = 0;
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (cnt_++ < FINISH_CNT) {
 | |
|       ret = OB_ERROR;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| private:
 | |
|   const int64_t FINISH_CNT = 1;
 | |
| };
 | |
| 
 | |
| struct ObRetryDagInitParam : public ObIDagInitParam
 | |
| {
 | |
|   ObRetryDagInitParam() : id_(0), str_() {}
 | |
|   virtual ~ObRetryDagInitParam() {}
 | |
|   virtual bool is_valid() const override
 | |
|   {
 | |
|     return id_ > 0 && !str_.empty();
 | |
|   }
 | |
| 
 | |
|   int assign(const ObRetryDagInitParam &other)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     id_ = other.id_;
 | |
|     if (OB_FAIL(deep_copy_str(other.str_.ptr(), str_))) {
 | |
|       STORAGE_LOG(WARN, "deep copy string", K(ret));
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   int deep_copy_str(const char *src, ObString &dest)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     char *buf = NULL;
 | |
| 
 | |
|     if (OB_ISNULL(src)) {
 | |
|       ret = OB_INVALID_ARGUMENT;
 | |
|       STORAGE_LOG(WARN, "The src is NULL, ", K(ret));
 | |
|     } else {
 | |
|       int64_t len = strlen(src) + 1;
 | |
|       if (NULL == (buf = static_cast<char *>(allocator_.alloc(len)))) {
 | |
|         ret = OB_ALLOCATE_MEMORY_FAILED;
 | |
|         STORAGE_LOG(ERROR, "Fail to allocate memory, ", K(len), K(ret));
 | |
|       } else {
 | |
|         MEMCPY(buf, src, len-1);
 | |
|         buf[len-1] = '\0';
 | |
|         dest.assign_ptr(buf, static_cast<ObString::obstr_size_t>(len-1));
 | |
|       }
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
|   int64_t id_;
 | |
|   ObString str_;
 | |
|   ObArenaAllocator allocator_;
 | |
| };
 | |
| 
 | |
| class ObDagRetryDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObDagRetryDag() : ObBasicDag() {}
 | |
|   virtual int init_by_param(const ObIDagInitParam *param) override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (OB_ISNULL(param) || !param->is_valid()) {
 | |
|       ret = OB_INVALID_ARGUMENT;
 | |
|       COMMON_LOG(WARN, "invalid argument", K(ret), K(param));
 | |
|     } else if (OB_FAIL(param_.assign(*(static_cast<const ObRetryDagInitParam *>(param))))) {
 | |
|       COMMON_LOG(WARN, "failed to assign param", K(ret));
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObDagRetryTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
|   virtual int inner_reset_status_for_retry() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (OB_FAIL(init_by_param(¶m_))) {
 | |
|       COMMON_LOG(WARN, "failed to init param", K(ret));
 | |
|     } else if (OB_FAIL(create_first_task())) {
 | |
|       COMMON_LOG(WARN, "failed to create first task", K(ret));
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
| private:
 | |
|   ObRetryDagInitParam param_;
 | |
|   DISALLOW_COPY_AND_ASSIGN(ObDagRetryDag);
 | |
| };
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_dag_retry)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
| 
 | |
|   int ret = OB_SUCCESS;
 | |
|   for (int i = 0; OB_SUCC(ret) && i < 5; ++i) {
 | |
|     ObDagRetryDag *dag = NULL;
 | |
|     ObRetryDagInitParam param;
 | |
|     const int64_t str_len = 100;
 | |
|     char str[str_len];
 | |
|     param.id_ = i + 1;
 | |
|     snprintf(str, str_len, "Hello OceanBase_%d", i);
 | |
|     param.str_ = ObString(str);
 | |
|     if (OB_FAIL(scheduler->create_and_add_dag(¶m, dag))) {
 | |
|       COMMON_LOG(WARN, "failed to create first task", K(ret));
 | |
|     } else {
 | |
|       dag->set_max_retry_times(3);
 | |
|     }
 | |
|     EXPECT_EQ(OB_SUCCESS, ret);
 | |
|   }
 | |
| 
 | |
|   wait_scheduler();
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| class ObOperator
 | |
| {
 | |
| public:
 | |
|   ObOperator() : num_(0) {}
 | |
|   ~ObOperator() {}
 | |
|   void inc() { ATOMIC_INC(&num_); }
 | |
|   void dec() { ATOMIC_DEC(&num_); }
 | |
| private:
 | |
|   int64_t num_;
 | |
| };
 | |
| 
 | |
| class ObRunningTask : public ObITask
 | |
| {
 | |
| public:
 | |
|   ObRunningTask()
 | |
|     : ObITask(ObITaskType::TASK_TYPE_UT),
 | |
|       seq_(0),
 | |
|       op_(),
 | |
|       is_inc_(true)
 | |
|   {
 | |
|   }
 | |
|   virtual ~ObRunningTask() {}
 | |
|   void init(const int64_t seq, ObOperator &op, bool is_inc) { seq_ = seq; op_ = &op; is_inc_ = is_inc; }
 | |
|   virtual int process()
 | |
|   {
 | |
|     if (is_inc_) {
 | |
|       op_->inc();
 | |
|     } else {
 | |
|       op_->dec();
 | |
|     }
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
|   virtual int generate_next_task(ObITask *&next_task)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (seq_ >= MAX_SEQ) {
 | |
|       ret = OB_ITER_END;
 | |
|       COMMON_LOG(INFO, "generate task end", K_(seq));
 | |
|     } else {
 | |
|       ObIDag *dag = get_dag();
 | |
|       ObRunningTask *ntask = NULL;
 | |
|       if (NULL == dag) {
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|         COMMON_LOG(WARN, "dag is null", K(ret));
 | |
|       } else if (OB_FAIL(dag->alloc_task(ntask))) {
 | |
|         COMMON_LOG(WARN, "failed to alloc task", K(ret));
 | |
|       } else if (NULL == ntask) {
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|         COMMON_LOG(WARN, "task is null", K(ret));
 | |
|       } else {
 | |
|         ntask->init(seq_ + 1, *op_, is_inc_);
 | |
|         next_task = ntask;
 | |
|       }
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| private:
 | |
|   const int64_t MAX_SEQ = 5;
 | |
|   int64_t seq_;
 | |
|   ObOperator *op_;
 | |
|   bool is_inc_;
 | |
| };
 | |
| 
 | |
| class ObChildDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObChildDag()
 | |
|    : ObBasicDag(),
 | |
|      op_(nullptr),
 | |
|      is_inc_(true),
 | |
|      cnt_(0)
 | |
|   {}
 | |
|   static const int64_t MAX_CHILD_DAG_CNT = 3;
 | |
|   void init(ObOperator &op, bool is_inc) { op_ = &op; is_inc_ = is_inc; }
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObRunningTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (FALSE_IT(task->init(0, *op_, is_inc_))) {
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     } else {
 | |
|       COMMON_LOG(INFO, "success to add task", K(ret), KPC(this), KPC(task));
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
|   virtual bool check_can_schedule() override
 | |
|   {
 | |
|     bool bret = true;
 | |
|     if (cnt_ < WAIT_CNT) {
 | |
|       ++cnt_;
 | |
|       bret  = false;
 | |
|     }
 | |
|     COMMON_LOG(INFO, "check_can_schedule", K(bret), KPC(this), K(cnt_));
 | |
|     return bret;
 | |
|   }
 | |
|   INHERIT_TO_STRING_KV("BasicDag", ObBasicDag, K_(is_inited), K_(type), K_(id), K(task_list_.get_size()), K_(cnt));
 | |
| 
 | |
| private:
 | |
|   const int64_t WAIT_CNT = 1;
 | |
|   ObOperator *op_;
 | |
|   bool is_inc_;
 | |
|   int64_t cnt_;
 | |
| };
 | |
| 
 | |
| class ObFatherPrepareTask : public ObITask
 | |
| {
 | |
| public:
 | |
|   ObFatherPrepareTask()
 | |
|     : ObITask(ObITaskType::TASK_TYPE_UT),
 | |
|       op_(nullptr)
 | |
|   {
 | |
|   }
 | |
|   virtual ~ObFatherPrepareTask() {}
 | |
|   void init(ObOperator &op) { op_ = &op; }
 | |
|   virtual int process()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|     if (OB_ISNULL(scheduler)) {
 | |
|       ret = OB_ERR_UNEXPECTED;
 | |
|       COMMON_LOG(WARN, "ObTenantDagScheduler is unexpected null", K(ret));
 | |
|     }
 | |
| 
 | |
|     for (int i = 0; OB_SUCC(ret) && i < CHILD_DAG_CNT; ++i) {
 | |
|       ObChildDag *child_dag = nullptr;
 | |
|       if (OB_FAIL(scheduler->alloc_dag(child_dag))) {
 | |
|         COMMON_LOG(WARN, "failed to alloc child dag", K(ret));
 | |
|       } else if (FALSE_IT(child_dag->init(*op_, 0 == i % 2))) {
 | |
|       } else if (OB_FAIL(child_dag->create_first_task())) {
 | |
|         COMMON_LOG(WARN, "failed to create first task for child dag", K(ret), KPC(child_dag));
 | |
|       } else if (OB_FAIL(dag_->add_child(*child_dag))) {
 | |
|         COMMON_LOG(WARN, "failed to alloc dependency dag", K(ret), KPC(dag_), KPC(child_dag));
 | |
|       } else if (OB_FAIL(scheduler->add_dag(child_dag))) {
 | |
|         if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
 | |
|           COMMON_LOG(WARN, "failed to add dag", K(ret), KPC(dag_), KPC(child_dag));
 | |
|         } else {
 | |
|           ret = OB_SUCCESS;
 | |
|         }
 | |
|       } else {
 | |
|         COMMON_LOG(INFO, "success to alloc child dag", K(ret), KPC(this), KPC(child_dag));
 | |
|       }
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| private:
 | |
|   const static int64_t CHILD_DAG_CNT = 2;
 | |
|   ObOperator *op_;
 | |
| };
 | |
| 
 | |
| class ObFatherPrepareDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObFatherPrepareDag()
 | |
|    : ObBasicDag(),
 | |
|      op_(nullptr)
 | |
|   {}
 | |
|   void init(ObOperator &op) { op_ = &op;}
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObFatherPrepareTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (FALSE_IT(task->init(*op_))) {
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     } else {
 | |
|       COMMON_LOG(INFO, "success to add task", K(ret), KPC(this), KPC(task));
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
|   INHERIT_TO_STRING_KV("BasicDag", ObBasicDag, K_(is_inited), K_(type), K_(id), K(task_list_.get_size()));
 | |
| 
 | |
| private:
 | |
|   ObOperator *op_;
 | |
| };
 | |
| 
 | |
| class ObFatherFinishTask : public ObITask
 | |
| {
 | |
| public:
 | |
|   ObFatherFinishTask()
 | |
|     : ObITask(ObITaskType::TASK_TYPE_UT),
 | |
|       op_(nullptr)
 | |
|   {
 | |
|   }
 | |
|   virtual ~ObFatherFinishTask() {}
 | |
|   void init(ObOperator &op) { op_ = &op; }
 | |
|   virtual int process();
 | |
| 
 | |
| private:
 | |
|   ObOperator *op_;
 | |
| };
 | |
| 
 | |
| class ObFatherFinishDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObFatherFinishDag()
 | |
|    : ObBasicDag(),
 | |
|      op_(nullptr)
 | |
|   {}
 | |
|   void init(ObOperator &op) { op_ = &op;}
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObFatherFinishTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (FALSE_IT(task->init(*op_))) {
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     } else {
 | |
|       COMMON_LOG(INFO, "success to add task", K(ret), KPC(this), KPC(task));
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
|   INHERIT_TO_STRING_KV("BasicDag", ObBasicDag, K_(is_inited), K_(type), K_(id), K(task_list_.get_size()));
 | |
| 
 | |
| private:
 | |
|   ObOperator *op_;
 | |
| };
 | |
| 
 | |
| class ObFatherDagNet : public ObIDagNet
 | |
| {
 | |
| public:
 | |
|   ObFatherDagNet() :
 | |
|     ObIDagNet(ObDagNetType::DAG_NET_TYPE_MIGARTION),
 | |
|     id_(ObTimeUtility::current_time() + random()),
 | |
|     op_()
 | |
|   {}
 | |
|   void init(int64_t id) { id_ = id; }
 | |
|   bool is_valid() const { return true; }
 | |
|   virtual int start_running() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObFatherPrepareDag *prepare_dag = NULL;
 | |
|     ObFatherFinishDag *finish_dag = NULL;
 | |
| 
 | |
|     // create dag and connections
 | |
|     if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(prepare_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to create dag", K(ret));
 | |
|     } else if (FALSE_IT(prepare_dag->init(op_))) {
 | |
|     } else if (OB_FAIL(prepare_dag->create_first_task())) {
 | |
|       COMMON_LOG(WARN, "Fail to create first task", K(ret));
 | |
|     } else if (OB_FAIL(add_dag_into_dag_net(*prepare_dag))) { // add first dag into this dag_net
 | |
|       COMMON_LOG(WARN, "Fail to add dag into dag_net", K(ret));
 | |
|     } else   if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to create dag", K(ret));
 | |
|     } else if (FALSE_IT(finish_dag->init(op_))) {
 | |
|     } else if (OB_FAIL(finish_dag->create_first_task())) {
 | |
|       COMMON_LOG(WARN, "Fail to create first task", K(ret));
 | |
|     } else if (OB_FAIL(prepare_dag->add_child(*finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to add child", K(ret), KPC(prepare_dag), KPC(finish_dag));
 | |
|     } else if (OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(prepare_dag))
 | |
|         || OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to add dag into dag_scheduler", K(ret));
 | |
|     } else {
 | |
|       // add all dags into dag_scheduler
 | |
|       COMMON_LOG(INFO, "success to add dag into dag_scheduler", K(ret));
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
|   virtual int64_t hash() const { return murmurhash(&id_, sizeof(id_), 0);}
 | |
|   virtual bool operator == (const ObIDagNet &other) const
 | |
|   {
 | |
|     bool bret = false;
 | |
|     if (get_type() == other.get_type()) {
 | |
|       const ObFatherDagNet &dag = static_cast<const ObFatherDagNet &>(other);
 | |
|       bret = dag.id_ == id_;
 | |
|     }
 | |
|     return bret;
 | |
|   }
 | |
|   virtual int fill_comment(char *buf, const int64_t buf_len) const override
 | |
|   { UNUSEDx(buf, buf_len); return OB_SUCCESS; }
 | |
|   virtual int fill_dag_net_key(char *buf, const int64_t buf_len) const override
 | |
|   { UNUSEDx(buf, buf_len); return OB_SUCCESS; }
 | |
| 
 | |
|   INHERIT_TO_STRING_KV("ObIDagNet", ObIDagNet, K_(type), K_(id));
 | |
| private:
 | |
| 
 | |
|   int64_t id_;
 | |
|   ObOperator op_;
 | |
|   DISALLOW_COPY_AND_ASSIGN(ObFatherDagNet);
 | |
| };
 | |
| 
 | |
| int ObFatherFinishTask::process()
 | |
| {
 | |
|   EXPECT_EQ(0, op_->num_);
 | |
|   return OB_SUCCESS;
 | |
| }
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_basic_dag_net)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
| 
 | |
|   ObFatherDagNet *dag_net = nullptr;
 | |
|   for (int i = 0; i < 2; ++i) {
 | |
|     EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag_net(nullptr, dag_net));
 | |
|   }
 | |
| 
 | |
|   wait_scheduler();
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| class ObFatherWithRetryDagNet : public ObFatherDagNet
 | |
| {
 | |
| public:
 | |
|   virtual int start_running() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObFatherPrepareDag *prepare_dag = NULL;
 | |
|     ObFatherFinishDag *finish_dag = NULL;
 | |
| 
 | |
|     // create dag and connections
 | |
|     if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(prepare_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to create dag", K(ret));
 | |
|     } else if (FALSE_IT(prepare_dag->init(op_))) {
 | |
|     } else if (OB_FAIL(prepare_dag->create_first_task())) {
 | |
|       COMMON_LOG(WARN, "Fail to create first task", K(ret));
 | |
|     } else if (OB_FAIL(add_dag_into_dag_net(*prepare_dag))) { // add first dag into this dag_net
 | |
|       COMMON_LOG(WARN, "Fail to add dag into dag_net", K(ret));
 | |
|     } else if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to create dag", K(ret));
 | |
|     } else if (FALSE_IT(finish_dag->init(op_))) {
 | |
|     } else if (OB_FAIL(finish_dag->create_first_task())) {
 | |
|       COMMON_LOG(WARN, "Fail to create first task", K(ret));
 | |
|     } else if (OB_FAIL(prepare_dag->add_child(*finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to add child", K(ret), KPC(prepare_dag), KPC(finish_dag));
 | |
|     }
 | |
|     for (int i = 0; OB_SUCC(ret) && i < 1; ++i) {
 | |
|       ObDagRetryDag *dag = NULL;
 | |
|       ObRetryDagInitParam param;
 | |
|       const int64_t str_len = 100;
 | |
|       char str[str_len];
 | |
|       param.id_ = i + 1;
 | |
|       snprintf(str, str_len, "Hello OceanBase_%d", i);
 | |
|       param.str_ = ObString(str);
 | |
|       if (OB_FAIL(MTL(ObTenantDagScheduler*)->create_dag(¶m, dag))) {
 | |
|         COMMON_LOG(WARN, "failed to create first task", K(ret));
 | |
|       } else {
 | |
|         dag->set_max_retry_times(3);
 | |
|         if (OB_FAIL(prepare_dag->add_child(*dag))) {
 | |
|           COMMON_LOG(WARN, "Fail to add child", K(ret), KPC(prepare_dag), KPC(finish_dag));
 | |
|         } else if (OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(dag))) {
 | |
|           COMMON_LOG(WARN, "Fail to add dag into dag_scheduler", K(ret));
 | |
|         }
 | |
|       }
 | |
|       EXPECT_EQ(OB_SUCCESS, ret);
 | |
|     }
 | |
| 
 | |
|     if (OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(prepare_dag))
 | |
|         || OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to add dag into dag_scheduler", K(ret));
 | |
|     } else {
 | |
|       // add all dags into dag_scheduler
 | |
|       COMMON_LOG(INFO, "success to add dag into dag_scheduler", K(ret));
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| };
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_basic_dag_net_with_one_retry_dag)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
| 
 | |
|   ObFatherWithRetryDagNet *dag_net = nullptr;
 | |
|   for (int i = 0; i < 1; ++i) {
 | |
|     EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag_net(nullptr, dag_net));
 | |
|   }
 | |
| 
 | |
|   wait_scheduler();
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * check task retry
 | |
|  * */
 | |
| static int64_t generate_cnt = 1;
 | |
| class ObGenerateFailedTask : public ObITask
 | |
| {
 | |
| public:
 | |
|   ObGenerateFailedTask() : ObITask(ObITaskType::TASK_TYPE_NORMAL_MINOR_MERGE), cnt_(0), seq_(0) {}
 | |
|   virtual ~ObGenerateFailedTask() {}
 | |
|   virtual int process()
 | |
|   {
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
|   void init(int64_t seq) { seq_ = seq; }
 | |
|   virtual int generate_next_task(ObITask *&next_task)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (seq_ >= MAX_SEQ) {
 | |
|       ret = OB_ITER_END;
 | |
|       COMMON_LOG(INFO, "generate task end", K_(seq));
 | |
|     } else if (generate_cnt++ < 2) {
 | |
|       ret = OB_ERR_UNEXPECTED;
 | |
|     } else {
 | |
|       ObIDag *dag = get_dag();
 | |
|       ObRetryTask *ntask = NULL;
 | |
|       if (NULL == dag) {
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|         COMMON_LOG(WARN, "dag is null", K(ret));
 | |
|       } else if (OB_FAIL(dag->alloc_task(ntask))) {
 | |
|         COMMON_LOG(WARN, "failed to alloc task", K(ret));
 | |
|       } else if (NULL == ntask) {
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|         COMMON_LOG(WARN, "task is null", K(ret));
 | |
|       } else {
 | |
|         ntask->init(seq_ + 1);
 | |
|         next_task = ntask;
 | |
|       }
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| private:
 | |
|   const int64_t MAX_SEQ = 2;
 | |
|   int cnt_;
 | |
|   int64_t seq_;
 | |
| };
 | |
| 
 | |
| class ObGenerateFailedDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObGenerateFailedTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     } else {
 | |
|       task->init(0);
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
|   int inner_reset_status_for_retry() { return OB_SUCCESS; }
 | |
|   INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(type), K_(id), K(task_list_.get_size()), K_(dag_ret));
 | |
| };
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_generage_task_failed)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
| 
 | |
|   ObGenerateFailedDag *dag = nullptr;
 | |
|   for (int i = 0; i < 1; ++i) {
 | |
|     EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag(nullptr, dag));
 | |
|     dag->set_max_retry_times(7);
 | |
|   }
 | |
| 
 | |
|   wait_scheduler();
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| //generate next dag
 | |
| 
 | |
| class ObGenerateNextDagCtx
 | |
| {
 | |
| public:
 | |
|   ObGenerateNextDagCtx()
 | |
|     : index_(0)
 | |
|   {
 | |
|   }
 | |
| 
 | |
|   virtual ~ObGenerateNextDagCtx() {}
 | |
| 
 | |
|   int get_next_index(int64_t &index)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     index = 0;
 | |
|     common::SpinWLockGuard guard(lock_);
 | |
|     if (index_ >= MAX_INDEX) {
 | |
|       ret = OB_ITER_END;
 | |
|     } else {
 | |
|       index = index_;
 | |
|       index_++;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   bool is_empty() const
 | |
|   {
 | |
|     common::SpinRLockGuard guard(lock_);
 | |
|     return index_ == MAX_INDEX;
 | |
|   }
 | |
| 
 | |
| private:
 | |
|   const int64_t MAX_INDEX = 10;
 | |
|   common::SpinRWLock lock_;
 | |
|   int64_t index_;
 | |
| };
 | |
| 
 | |
| class ObFinishGeneratNextDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObFinishGeneratNextDag() :
 | |
|     ObBasicDag(),
 | |
|     is_inited_(false),
 | |
|     ctx_()
 | |
|   {}
 | |
| 
 | |
|   int init()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (is_inited_) {
 | |
|       ret = OB_INIT_TWICE;
 | |
|       COMMON_LOG(WARN, "start generate next dag init twice", K(ret));
 | |
|     } else {
 | |
|       id_ = FINISH_DAG_ID;
 | |
|       is_inited_ = true;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObFakeTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
|   ObGenerateNextDagCtx *get_ctx() { return &ctx_; }
 | |
| 
 | |
| private:
 | |
|   const int64_t FINISH_DAG_ID = 1000001;
 | |
|   bool is_inited_;
 | |
|   ObGenerateNextDagCtx ctx_;
 | |
|   DISALLOW_COPY_AND_ASSIGN(ObFinishGeneratNextDag);
 | |
| };
 | |
| 
 | |
| class ObDagGenerateNextDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObDagGenerateNextDag() :
 | |
|     ObBasicDag(),
 | |
|     is_inited_(false),
 | |
|     ctx_(nullptr)
 | |
|   {}
 | |
| 
 | |
|   int init(const int64_t id, ObGenerateNextDagCtx *ctx)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (is_inited_) {
 | |
|       ret = OB_INIT_TWICE;
 | |
|       COMMON_LOG(WARN, "dag generate next dag init twice", K(ret));
 | |
|     } else if (id < 0) {
 | |
|       ret = OB_INVALID_ARGUMENT;
 | |
|       COMMON_LOG(WARN, "init dag generate next dag get invalid argument", K(ret), K(id));
 | |
|     } else {
 | |
|       id_ = id;
 | |
|       ctx_ = ctx;
 | |
|       is_inited_ = true;
 | |
|       COMMON_LOG(INFO, "succeed init next dag", K(id));
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   virtual int generate_next_dag(share::ObIDag *&dag)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     dag = nullptr;
 | |
|     ObTenantDagScheduler *scheduler = nullptr;
 | |
|     int64_t next_id = 0;
 | |
|     ObDagGenerateNextDag *next_dag = nullptr;
 | |
| 
 | |
|     if (!is_inited_) {
 | |
|       ret = OB_NOT_INIT;
 | |
|       COMMON_LOG(WARN, "generate next dag do not init", K(ret));
 | |
|     } else if (OB_FAIL(ctx_->get_next_index(next_id))) {
 | |
|       if (OB_ITER_END == ret) {
 | |
|         //do nothing
 | |
|       } else {
 | |
|         COMMON_LOG(WARN, "failed to get next index", K(ret));
 | |
|       }
 | |
|     } else if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) {
 | |
|       ret = OB_ERR_UNEXPECTED;
 | |
|       COMMON_LOG(WARN, "failed to get ObTenantDagScheduler from MTL", K(ret));
 | |
|     } else if (OB_FAIL(scheduler->alloc_dag(next_dag))) {
 | |
|       COMMON_LOG(WARN, "failed to alloc next_dag", K(ret));
 | |
|     } else if (OB_FAIL(next_dag->init(next_id, ctx_))) {
 | |
|       COMMON_LOG(WARN, "failed to init tablet migration dag", K(ret));
 | |
|     } else if (OB_FAIL(next_dag->create_first_task())) {
 | |
|       COMMON_LOG(WARN, "failed to create first task", K(ret));
 | |
|     } else {
 | |
|       dag = next_dag;
 | |
|       next_dag = nullptr;
 | |
|     }
 | |
| 
 | |
|     if (OB_NOT_NULL(next_dag)) {
 | |
|       scheduler->free_dag(*next_dag);
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObFakeTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
| 
 | |
|   INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(type), K_(id), K(task_list_.get_size()), K_(dag_ret));
 | |
| private:
 | |
|   bool is_inited_;
 | |
|   ObGenerateNextDagCtx *ctx_;
 | |
|   DISALLOW_COPY_AND_ASSIGN(ObDagGenerateNextDag);
 | |
| };
 | |
| 
 | |
| class ObStartGeneratNextDagTask : public ObITask
 | |
| {
 | |
| public:
 | |
|   ObStartGeneratNextDagTask() : ObITask(ObITaskType::TASK_TYPE_NORMAL_MINOR_MERGE), is_inited_(false), id_(0) {}
 | |
|   virtual ~ObStartGeneratNextDagTask() {}
 | |
|   virtual int process()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (!is_inited_) {
 | |
|       ret = OB_NOT_INIT;
 | |
|       COMMON_LOG(WARN, "generate next dag task do not init", K(ret));
 | |
|     } else {
 | |
|       ObDagGenerateNextDag *next_dag = nullptr;
 | |
|       ObFinishGeneratNextDag *finish_dag = nullptr;
 | |
|       ObGenerateNextDagCtx *ctx = nullptr;
 | |
|       ObTenantDagScheduler *scheduler = nullptr;
 | |
|       int64_t id = 0;
 | |
| 
 | |
|       if (!is_inited_) {
 | |
|         ret = OB_NOT_INIT;
 | |
|         COMMON_LOG(WARN, "start prepare migration task do not init", K(ret));
 | |
|       } else if (OB_ISNULL(scheduler = MTL(ObTenantDagScheduler*))) {
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|         COMMON_LOG(WARN, "failed to get ObTenantDagScheduler from MTL", K(ret));
 | |
|       } else if (OB_FAIL(scheduler->alloc_dag(finish_dag))) {
 | |
|         COMMON_LOG(WARN, "failed to alloc finish backfill tx migration dag ", K(ret));
 | |
|       } else if (OB_FAIL(finish_dag->init())) {
 | |
|         COMMON_LOG(WARN, "failed to init data tablets migration dag", K(ret));
 | |
|       } else if (OB_ISNULL(ctx = finish_dag->get_ctx())) {
 | |
|         ret = OB_ERR_UNEXPECTED;
 | |
|         COMMON_LOG(WARN, "backfill tx ctx should not be NULL", K(ret), KP(ctx));
 | |
|       } else if (ctx->is_empty()) {
 | |
|         if (OB_FAIL(this->get_dag()->add_child(*finish_dag))) {
 | |
|           COMMON_LOG(WARN, "failed to add finish_dag as chilid", K(ret));
 | |
|         }
 | |
|       } else {
 | |
|         if (OB_FAIL(ctx->get_next_index(id))) {
 | |
|           COMMON_LOG(WARN, "failed to get tablet id", K(ret));
 | |
|         } else if (OB_FAIL(scheduler->alloc_dag(next_dag))) {
 | |
|           COMMON_LOG(WARN, "failed to alloc next_dag", K(ret));
 | |
|         } else if (OB_FAIL(next_dag->init(id, ctx))) {
 | |
|           COMMON_LOG(WARN, "failed to init next_dag", K(ret));
 | |
|         } else if (OB_FAIL(this->get_dag()->add_child(*next_dag))) {
 | |
|           COMMON_LOG(WARN, "failed to add next_dag as chilid", K(ret));
 | |
|         } else if (OB_FAIL(next_dag->create_first_task())) {
 | |
|           COMMON_LOG(WARN, "failed to create first task", K(ret));
 | |
|         } else if (OB_FAIL(next_dag->add_child(*finish_dag))) {
 | |
|           COMMON_LOG(WARN, "failed to add child dag", K(ret));
 | |
|         } else if (OB_FAIL(scheduler->add_dag(next_dag))) {
 | |
|           COMMON_LOG(WARN, "failed to add tablet backfill tx dag", K(ret));
 | |
|           if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
 | |
|             COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|             ret = OB_EAGAIN;
 | |
|           }
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       if (OB_FAIL(ret)) {
 | |
|       } else if (OB_FAIL(finish_dag->create_first_task())) {
 | |
|         COMMON_LOG(WARN, "failed to create first task", K(ret));
 | |
|       } else if (OB_FAIL(scheduler->add_dag(finish_dag))) {
 | |
|         COMMON_LOG(WARN, "failed to add finish_dag", K(ret));
 | |
|         int tmp_ret = OB_SUCCESS;
 | |
|         if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
 | |
|           COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|           ret = OB_EAGAIN;
 | |
|         }
 | |
| 
 | |
|         if (OB_NOT_NULL(next_dag)) {
 | |
|           if (OB_SUCCESS != (tmp_ret = scheduler->cancel_dag(next_dag))) {
 | |
|             COMMON_LOG(WARN, "failed to cancel next_dag", K(ret));
 | |
|           }
 | |
|           next_dag = nullptr;
 | |
|         }
 | |
|       } else {
 | |
|         next_dag = nullptr;
 | |
|         finish_dag = nullptr;
 | |
|       }
 | |
| 
 | |
|       if (OB_FAIL(ret)) {
 | |
|         if (OB_NOT_NULL(next_dag)) {
 | |
|           scheduler->free_dag(*next_dag);
 | |
|         }
 | |
| 
 | |
|         if (OB_NOT_NULL(finish_dag)) {
 | |
|           scheduler->free_dag(*finish_dag);
 | |
|         }
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     return ret;
 | |
|   }
 | |
|   int init(const int64_t id)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (is_inited_) {
 | |
|       ret = OB_INIT_TWICE;
 | |
|       COMMON_LOG(WARN, "generate next dag task init twice", K(ret));
 | |
|     } else if (id < 0) {
 | |
|       ret = OB_INVALID_ARGUMENT;
 | |
|       COMMON_LOG(WARN, "init generate next dag task get invalid argument", K(ret), K(id));
 | |
|     } else {
 | |
|       id_ = id;
 | |
|       is_inited_ = true;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| private:
 | |
|   bool is_inited_;
 | |
|   int64_t id_;
 | |
| };
 | |
| 
 | |
| class ObStartGenerateNextDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObStartGenerateNextDag() :
 | |
|     ObBasicDag(),
 | |
|     is_inited_(false)
 | |
|   {}
 | |
| 
 | |
|   int init()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (is_inited_) {
 | |
|       ret = OB_INIT_TWICE;
 | |
|       COMMON_LOG(WARN, "start generate next dag init twice", K(ret));
 | |
|     } else {
 | |
|       id_ = START_DAG_ID;
 | |
|       is_inited_ = true;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObStartGeneratNextDagTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (OB_FAIL(task->init(id_))) {
 | |
|       COMMON_LOG(WARN, "failed to init task", K(ret));
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
| 
 | |
| private:
 | |
|   const int64_t START_DAG_ID = 1000000;
 | |
|   bool is_inited_;
 | |
|   DISALLOW_COPY_AND_ASSIGN(ObStartGenerateNextDag);
 | |
| };
 | |
| 
 | |
| TEST_F(TestDagScheduler, generate_next_dag)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
| 
 | |
|   ObStartGenerateNextDag *dag = nullptr;
 | |
|   EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag));
 | |
|   EXPECT_EQ(OB_SUCCESS, dag->init());
 | |
|   EXPECT_EQ(OB_SUCCESS, dag->create_first_task());
 | |
|   EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag));
 | |
| 
 | |
|   wait_scheduler();
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| class ObCreateChildTask : public ObITask
 | |
| {
 | |
| public:
 | |
|   ObCreateChildTask() : ObITask(ObITaskType::TASK_TYPE_UT), cnt_(0){}
 | |
|   virtual ~ObCreateChildTask() {}
 | |
|   virtual int process()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     int tmp_ret = OB_SUCCESS;
 | |
|     ObArray<ObIDag *> new_dag_array;
 | |
|     ObIDag *parent = this->get_dag();
 | |
|     ObIDagNet *dag_net = get_dag()->get_dag_net();
 | |
|     ObWaitDag *new_dag = nullptr;
 | |
|     for (int64_t i = 0; OB_SUCC(ret) && i < TestDagScheduler::MAX_DAG_CNT + 5; ++i) {
 | |
|       if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(new_dag))) {
 | |
|         COMMON_LOG(WARN, "failed to alloc tablet migration dag ", K(ret));
 | |
|       } else if (FALSE_IT(new_dag->init(i))) {
 | |
|       } else if (OB_FAIL(dag_net->add_dag_into_dag_net(*new_dag))) {
 | |
|         COMMON_LOG(WARN, "failed to add dag into dag net", K(ret), K(i), K(new_dag));
 | |
|       } else if (OB_FAIL(parent->add_child_without_inheritance(*new_dag))) {
 | |
|         COMMON_LOG(WARN, "failed to add child dag", K(ret), K(i), K(new_dag));
 | |
|       } else if (OB_FAIL(new_dag->create_first_task())) {
 | |
|         COMMON_LOG(WARN, "failed to create first task", K(ret), K(i), K(new_dag));
 | |
|       } else if (OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(new_dag))) {
 | |
|         COMMON_LOG(WARN, "failed to add tablet migration dag", K(ret), K(*new_dag));
 | |
|         if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
 | |
|           COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|           ret = OB_EAGAIN;
 | |
|         }
 | |
|       } else if (OB_FAIL(new_dag_array.push_back(new_dag))) {
 | |
|         COMMON_LOG(WARN, "failed to push tablet migration dag into array", K(ret), K(i), K(new_dag));
 | |
|       }
 | |
|     }
 | |
|     if (OB_FAIL(ret)) {
 | |
|       if (OB_NOT_NULL(new_dag)) {
 | |
|         new_dag->reset_children();
 | |
|         if (OB_SUCCESS != (tmp_ret = dag_net->erase_dag_from_dag_net(*new_dag))) {
 | |
|           COMMON_LOG(WARN, "failed to erase dag from dag net", K(tmp_ret), KPC(new_dag));
 | |
|         }
 | |
|         MTL(ObTenantDagScheduler*)->free_dag(*new_dag);
 | |
|         new_dag = nullptr;
 | |
|       }
 | |
| 
 | |
|       for (int64_t i = 0; i < new_dag_array.count(); ++i) {
 | |
|         ObIDag *dag = new_dag_array.at(i);
 | |
|         dag->reset_children();
 | |
|         if (OB_SUCCESS != (tmp_ret = dag_net->erase_dag_from_dag_net(*dag))) {
 | |
|           COMMON_LOG(WARN, "failed to erase dag from dag net", K(tmp_ret), KPC(dag));
 | |
|         }
 | |
| 
 | |
|         if (OB_SUCCESS != (tmp_ret = MTL(ObTenantDagScheduler*)->cancel_dag(dag))) {
 | |
|           COMMON_LOG(WARN, "failed to cancel dag", K(tmp_ret), K(*dag));
 | |
|           ob_abort();
 | |
|         } else {
 | |
|           COMMON_LOG(INFO, "success to cancel dag", K(tmp_ret), K(i));
 | |
|         }
 | |
|       }
 | |
|       new_dag_array.reset();
 | |
|       this->get_dag()->reset_children();
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| private:
 | |
|   const static int64_t FINISH_CNT = 5;
 | |
|   int cnt_;
 | |
| };
 | |
| 
 | |
| class ObCreateChildDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObCreateChildDag() :
 | |
|     ObBasicDag()
 | |
|   {}
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObCreateChildTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
| private:
 | |
|   const int64_t MAX_RETRY_TIMES = 20;
 | |
|   const int64_t MAX_CHECK_INTERVAL = 1000L * 100L; // 100ms
 | |
| 
 | |
|   DISALLOW_COPY_AND_ASSIGN(ObCreateChildDag);
 | |
| };
 | |
| 
 | |
| class ObCreateDagNet : public ObFatherDagNet
 | |
| {
 | |
| public:
 | |
|   virtual int start_running() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObCreateChildDag *prepare_dag = NULL;
 | |
|     ObFatherFinishDag *finish_dag = NULL;
 | |
| 
 | |
|     // create dag and connections
 | |
|     if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(prepare_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to create dag", K(ret));
 | |
|     } else if (OB_FAIL(prepare_dag->create_first_task())) {
 | |
|       COMMON_LOG(WARN, "Fail to create first task", K(ret));
 | |
|     } else if (OB_FAIL(add_dag_into_dag_net(*prepare_dag))) { // add first dag into this dag_net
 | |
|       COMMON_LOG(WARN, "Fail to add dag into dag_net", K(ret));
 | |
|     } else if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to create dag", K(ret));
 | |
|     } else if (FALSE_IT(finish_dag->init(op_))) {
 | |
|     } else if (OB_FAIL(finish_dag->create_first_task())) {
 | |
|       COMMON_LOG(WARN, "Fail to create first task", K(ret));
 | |
|     } else if (OB_FAIL(prepare_dag->add_child(*finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to add child", K(ret), KPC(prepare_dag), KPC(finish_dag));
 | |
|     }
 | |
|     EXPECT_EQ(OB_SUCCESS, ret);
 | |
| 
 | |
|     if (OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(prepare_dag))
 | |
|         || OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to add dag into dag_scheduler", K(ret));
 | |
|     } else {
 | |
|       // add all dags into dag_scheduler
 | |
|       COMMON_LOG(INFO, "success to add dag into dag_scheduler", K(ret));
 | |
|     }
 | |
| 
 | |
|     return ret;
 | |
|   }
 | |
| };
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_add_dag_failed_in_generate_dag_net)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
|   ObCreateDagNet *dag_net = nullptr;
 | |
|   EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag_net(nullptr, dag_net));
 | |
| 
 | |
|   wait_scheduler();
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| class ObFreeDagNet: public ObFatherDagNet
 | |
| {
 | |
| public:
 | |
|   virtual int start_running() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObCreateChildDag *prepare_dag = NULL;
 | |
|     ObFatherFinishDag *finish_dag = NULL;
 | |
| 
 | |
|     // create dag and connections
 | |
|     if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(prepare_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to create dag", K(ret));
 | |
|     } else if (OB_FAIL(prepare_dag->create_first_task())) {
 | |
|       COMMON_LOG(WARN, "Fail to create first task", K(ret));
 | |
|     } else if (OB_FAIL(add_dag_into_dag_net(*prepare_dag))) { // add first dag into this dag_net
 | |
|       COMMON_LOG(WARN, "Fail to add dag into dag_net", K(ret));
 | |
|     } else if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to create dag", K(ret));
 | |
|     } else if (FALSE_IT(finish_dag->init(op_))) {
 | |
|     } else if (OB_FAIL(finish_dag->create_first_task())) {
 | |
|       COMMON_LOG(WARN, "Fail to create first task", K(ret));
 | |
|     } else if (OB_FAIL(prepare_dag->add_child(*finish_dag))) {
 | |
|       COMMON_LOG(WARN, "Fail to add child", K(ret), KPC(prepare_dag), KPC(finish_dag));
 | |
|     }
 | |
|     EXPECT_EQ(OB_SUCCESS, ret);
 | |
|     MTL(ObTenantDagScheduler*)->free_dag(*finish_dag, prepare_dag);
 | |
|     COMMON_LOG(INFO, "free dag", K(ret), KPC(prepare_dag), KPC(this));
 | |
|     EXPECT_EQ(dag_record_map_.size(), 1);
 | |
|     EXPECT_EQ(0, prepare_dag->children_.count());
 | |
| 
 | |
|     MTL(ObTenantDagScheduler*)->free_dag(*prepare_dag);
 | |
|     COMMON_LOG(INFO, "free dag", K(ret), KPC(this));
 | |
|     EXPECT_EQ(dag_record_map_.size(), 0);
 | |
| 
 | |
|     return OB_ERR_UNEXPECTED;
 | |
|   }
 | |
| };
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_free_dag_func)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
|   ObFreeDagNet *dag_net = nullptr;
 | |
|   EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag_net(nullptr, dag_net));
 | |
| 
 | |
|   wait_scheduler();
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| class ObCancelDag : public ObBasicDag
 | |
| {
 | |
| public:
 | |
|   ObCancelDag() : can_schedule_(false) {}
 | |
|   virtual int create_first_task() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObWaitTask *task = NULL;
 | |
|     if (OB_FAIL(alloc_task(task))) {
 | |
|       COMMON_LOG(WARN, "Fail to alloc task", K(ret));
 | |
|     } else if (OB_FAIL(add_task(*task))) {
 | |
|       COMMON_LOG(WARN, "Fail to add task", K(ret));
 | |
|     }
 | |
|     return common::OB_SUCCESS;
 | |
|   }
 | |
|   virtual bool check_can_schedule() override
 | |
|   {
 | |
|     return can_schedule_;
 | |
|   }
 | |
| 
 | |
|   bool can_schedule_;
 | |
| };
 | |
| 
 | |
| class ObCancelDagNet: public ObFatherDagNet
 | |
| {
 | |
| public:
 | |
|   virtual int start_running() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObCancelDag *dag = NULL;
 | |
| 
 | |
|     for (int i = 0; OB_SUCC(ret) && i < 3; ++i) {
 | |
|       // create dag and connections
 | |
|       if (OB_FAIL(MTL(ObTenantDagScheduler*)->alloc_dag(dag))) {
 | |
|         COMMON_LOG(WARN, "Fail to create dag", K(ret));
 | |
|       } else if (OB_FAIL(dag->create_first_task())) {
 | |
|         COMMON_LOG(WARN, "Fail to create first task", K(ret));
 | |
|       } else if (OB_FAIL(add_dag_into_dag_net(*dag))) { // add first dag into this dag_net
 | |
|         COMMON_LOG(WARN, "Fail to add dag into dag_net", K(ret));
 | |
|       } else if (OB_FAIL(MTL(ObTenantDagScheduler*)->add_dag(dag))) {
 | |
|         COMMON_LOG(WARN, "failed to add dag", K(ret), K(dag));
 | |
|       }
 | |
|       EXPECT_EQ(OB_SUCCESS, ret);
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
|   void get_dag_list(ObIArray<ObIDag *> &dag_array) const
 | |
|   {
 | |
|     for (DagRecordMap::const_iterator iter = dag_record_map_.begin();
 | |
|         iter != dag_record_map_.end(); ++iter) {
 | |
|       ObDagRecord *dag_record = iter->second;
 | |
|       dag_array.push_back(dag_record->dag_ptr_);
 | |
|     }
 | |
|   }
 | |
| };
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_cancel_dag_func)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
|   ObCancelDagNet *dag_net = nullptr;
 | |
|   EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag_net(nullptr, dag_net));
 | |
| 
 | |
|   while (scheduler->get_cur_dag_cnt() < 3) {
 | |
|     usleep(100);
 | |
|   }
 | |
| 
 | |
|   ObArray<ObIDag *> dag_array;
 | |
|   dag_net->get_dag_list(dag_array);
 | |
| 
 | |
|   for (int i = 0; i < dag_array.count(); ++i) {
 | |
|     scheduler->cancel_dag(dag_array[i]);
 | |
|   }
 | |
| 
 | |
|   EXPECT_EQ(true, scheduler->is_empty());
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| 
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_cancel_dag_net_func)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
|   ObCancelDagNet *dag_net = nullptr;
 | |
|   EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag_net(nullptr, dag_net));
 | |
| 
 | |
|   while (scheduler->get_cur_dag_cnt() < 3) {
 | |
|     usleep(100);
 | |
|   }
 | |
| 
 | |
|   ObArray<ObIDag *> dag_array;
 | |
|   dag_net->get_dag_list(dag_array);
 | |
|   ret = scheduler->cancel_dag_net(dag_net->get_dag_id());
 | |
| 
 | |
|   for (int i = 0; i < dag_array.count(); ++i) {
 | |
|     ObCancelDag *dag = static_cast<ObCancelDag *>(dag_array[i]);
 | |
|     dag->can_schedule_ = true;
 | |
|   }
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS, ret);
 | |
|   ob_usleep(5000 * 1000);
 | |
| 
 | |
|   EXPECT_EQ(true, scheduler->is_empty());
 | |
|   EXPECT_EQ(0, ObDagWarningHistoryManager::get_instance().size());
 | |
| }
 | |
| 
 | |
| 
 | |
| TEST_F(TestDagScheduler, test_destroy_when_running)
 | |
| {
 | |
|   ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
 | |
|   ASSERT_TRUE(nullptr != scheduler);
 | |
| 
 | |
|   #ifndef BUILD_COVERAGE
 | |
|   // not participate in coverage compilation to fix hang problem
 | |
|   ObCancelDagNet *dag_net = nullptr;
 | |
|   EXPECT_EQ(OB_SUCCESS, scheduler->create_and_add_dag_net(nullptr, dag_net));
 | |
| 
 | |
|   while (scheduler->get_cur_dag_cnt() < 3) {
 | |
|     usleep(100);
 | |
|   }
 | |
|   #endif
 | |
| }
 | |
| 
 | |
| 
 | |
| }
 | |
| }
 | |
| 
 | |
| void parse_cmd_arg(int argc, char **argv)
 | |
| {
 | |
|   int opt = 0;
 | |
|   const char *opt_string = "p:s:l:";
 | |
| 
 | |
|   struct option longopts[] = {
 | |
|       {"dag cnt for performance test", 1, NULL, 'p'},
 | |
|       {"stress test time", 1, NULL, 's'},
 | |
|       {"log level", 1, NULL, 'l'},
 | |
|       {0,0,0,0} };
 | |
| 
 | |
|   while (-1 != (opt = getopt_long(argc, argv, opt_string, longopts, NULL))) {
 | |
|     switch(opt) {
 | |
|     case 'p':
 | |
|       dag_cnt = strtoll(optarg, NULL, 10);
 | |
|       break;
 | |
|     case 's':
 | |
|       stress_time = strtoll(optarg, NULL, 10);
 | |
|       break;
 | |
|     case 'l':
 | |
|       snprintf(log_level, 20, "%s", optarg);
 | |
|       break;
 | |
|     default:
 | |
|       break;
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| int main(int argc, char **argv)
 | |
| {
 | |
|   ::testing::InitGoogleTest(&argc, argv);
 | |
|   parse_cmd_arg(argc, argv);
 | |
|   OB_LOGGER.set_enable_async_log(false);
 | |
|   OB_LOGGER.set_log_level("DEBUG");
 | |
|   OB_LOGGER.set_max_file_size(256*1024*1024);
 | |
|   system("rm -f test_dag_net_in_dag_scheduler.log*");
 | |
|   OB_LOGGER.set_file_name("test_dag_net_in_dag_scheduler.log");
 | |
|   return RUN_ALL_TESTS();
 | |
| }
 | 
