/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
#define USING_LOG_PREFIX TEST
#include <getopt.h>
#include <unistd.h>
#include <gtest/gtest.h>
#define protected public
#define private public
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
#include "share/scheduler/ob_tenant_dag_scheduler.h"
#include "share/scheduler/ob_sys_task_stat.h"
#include "lib/atomic/ob_atomic.h"
#include "observer/omt/ob_tenant_node_balancer.h"
#include "share/scheduler/ob_dag_warning_history_mgr.h"
#include "storage/compaction/ob_tenant_compaction_progress.h"

int64_t dag_cnt = 1;
int64_t stress_time = 1; // 100ms
char log_level[20] = "INFO";
uint32_t time_slice = 1000;

namespace oceanbase
{
using namespace common;
using namespace lib;
using namespace share;
using namespace omt;
namespace unittest
{

class AtomicOperator
{
public:
  AtomicOperator() : v_(0) {}
  AtomicOperator(int64_t v) : v_(v) {}
  void inc()
  {
    lib::ObMutexGuard guard(lock_);
    ++v_;
  }
  void mul(int64_t m)
  {
    lib::ObMutexGuard guard(lock_);
    v_ *= m;
  }
  int64_t value()
  {
    return v_;
  }
  void reset()
  {
    v_ = 0;
  }
private:
  lib::ObMutex lock_;
  int64_t v_;
};

static const int64_t SLEEP_SLICE = 100;
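// AtomicMulTask and AtomicIncTask below lean on the scheduler's task-chaining
// hook: generate_next_task() is expected to be invoked after a task is
// dispatched, so a task initialized with init(1, N, op) expands into a chain of
// N tasks, each multiplying (mul) or incrementing (inc) the shared
// AtomicOperator exactly once. error_seq_ marks the position in the chain whose
// process() deliberately returns OB_ERR_UNEXPECTED, which is how the
// error-handling test at the bottom of this file injects failures.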
class AtomicMulTask : public ObITask
{
public:
  AtomicMulTask() :
    ObITask(ObITask::TASK_TYPE_UT), seq_(0), cnt_(0), error_seq_(-1), op_(NULL), sleep_us_(0)
  {}
  int init(int64_t seq, int64_t cnt, AtomicOperator &op, int sleep_us = 0, int64_t error_seq = -1)
  {
    seq_ = seq;
    cnt_ = cnt;
    error_seq_ = error_seq;
    op_ = &op;
    sleep_us_ = sleep_us;
    return OB_SUCCESS;
  }
  virtual int generate_next_task(ObITask *&task)
  {
    int ret = OB_SUCCESS;
    ObIDag *dag = NULL;
    AtomicMulTask *ntask = NULL;
    if (seq_ >= cnt_) {
      return OB_ITER_END;
    } else if (OB_ISNULL(dag = get_dag())) {
      ret = OB_ERR_UNEXPECTED;
      COMMON_LOG(WARN, "dag is NULL", K(ret));
    } else if (OB_FAIL(dag->alloc_task(ntask))) {
      COMMON_LOG(WARN, "failed to alloc task", K(ret));
    } else if (OB_ISNULL(ntask)) {
      ret = OB_ERR_UNEXPECTED;
      COMMON_LOG(WARN, "ntask is NULL", K(ret));
    } else {
      COMMON_LOG(INFO, "a task is generated", K(seq_));
      ntask->init(seq_ + 1, cnt_, *op_, sleep_us_, error_seq_);
      task = ntask;
    }
    return ret;
  }
  virtual int process()
  {
    int ret = OB_SUCCESS;
    int64_t cnt = sleep_us_ / SLEEP_SLICE;
    for (int64_t i = 0; i < cnt; ++i) {
      if (OB_FAIL(dag_yield())) {
        if (OB_CANCELED == ret) {
          COMMON_LOG(INFO, "Cancel this task since the whole dag is canceled", K(ret));
          break;
        } else {
          COMMON_LOG(WARN, "Invalid return value for dag_yield", K(ret));
        }
      }
      ::usleep(SLEEP_SLICE);
    }
    if (seq_ == error_seq_) {
      return OB_ERR_UNEXPECTED;
    } else {
      op_->mul(2);
    }
    return ret;
  }
  INHERIT_TO_STRING_KV("ObITask", ObITask, "type", "AtomicMul", K(*dag_), K_(seq), K_(cnt),
      KP_(op), K_(error_seq), K_(sleep_us));
private:
  int64_t seq_;
  int64_t cnt_;
  // the seq whose process() returns OB_ERR_UNEXPECTED, used to test error handling
  int64_t error_seq_;
  AtomicOperator *op_;
  int sleep_us_;
};

class AtomicIncTask : public ObITask
{
public:
  AtomicIncTask() :
    ObITask(ObITask::TASK_TYPE_UT), seq_(0), cnt_(0), error_seq_(-1), op_(NULL), sleep_us_(0)
  {}
  int init(int64_t seq, int64_t cnt, AtomicOperator &op, int sleep_us = 0, int64_t error_seq = -1)
  {
    seq_ = seq;
    cnt_ = cnt;
    error_seq_ = error_seq;
    op_ = &op;
    sleep_us_ = sleep_us;
    return OB_SUCCESS;
  }
  virtual int generate_next_task(ObITask *&task)
  {
    int ret = OB_SUCCESS;
    ObIDag *dag = NULL;
    AtomicIncTask *ntask = NULL;
    if (seq_ >= cnt_) {
      return OB_ITER_END;
    } else if (OB_ISNULL(dag = get_dag())) {
      ret = OB_ERR_UNEXPECTED;
      COMMON_LOG(WARN, "dag is NULL", K(ret));
    } else if (OB_FAIL(dag->alloc_task(ntask))) {
      COMMON_LOG(WARN, "failed to alloc task", K(ret));
    } else if (OB_ISNULL(ntask)) {
      ret = OB_ERR_UNEXPECTED;
      COMMON_LOG(WARN, "ntask is NULL", K(ret));
    } else {
      COMMON_LOG(INFO, "a task is generated", K(seq_));
      ntask->init(seq_ + 1, cnt_, *op_, sleep_us_, error_seq_);
      task = ntask;
    }
    return ret;
  }
  virtual int process()
  {
    int ret = OB_SUCCESS;
    int64_t cnt = sleep_us_ / SLEEP_SLICE;
    for (int64_t i = 0; i < cnt; ++i) {
      if (OB_FAIL(dag_yield())) {
        if (OB_CANCELED == ret) {
          COMMON_LOG(INFO, "Cancel this task since the whole dag is canceled", K(ret));
          break;
        } else {
          COMMON_LOG(WARN, "Invalid return value for dag_yield", K(ret));
        }
      }
      ::usleep(SLEEP_SLICE);
    }
    if (seq_ == error_seq_) {
      return OB_ERR_UNEXPECTED;
    } else {
      op_->inc();
    }
    return ret;
  }
  INHERIT_TO_STRING_KV("ObITask", ObITask, "type", "AtomicInc", K(*dag_), K_(seq), K_(cnt),
      KP_(op), K_(error_seq), K_(sleep_us));
private:
  int64_t seq_;
  int64_t cnt_;
  int64_t error_seq_;
  AtomicOperator *op_;
  int sleep_us_;
};

template <typename T>
int alloc_task(ObIDag &dag, T *&task)
{
  int ret = OB_SUCCESS;
  task = NULL;
  if (OB_FAIL(dag.alloc_task(task))) {
    COMMON_LOG(WARN, "failed to alloc task", K(ret));
  } else if (OB_ISNULL(task)) {
    ret = OB_ERR_UNEXPECTED;
    COMMON_LOG(WARN, "task is NULL", K(ret));
  }
  return ret;
}

void wait_scheduler()
{
  ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
  ASSERT_TRUE(nullptr != scheduler);
  while (!scheduler->is_empty()) {
    ::usleep(100000);
  }
  ObIAllocator &basic_allocator = scheduler->get_allocator(false /*is_ha*/);
  ObIAllocator &basic_root_allocator = static_cast(&basic_allocator)->root_allocator_;
  while ((basic_allocator.used() - basic_root_allocator.used()) != 0) {
    ::usleep(100000);
  }
  while ((basic_allocator.total() - basic_root_allocator.total()) != 0) {
    ::usleep(100000);
  }
}
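// wait_scheduler() above spins until the scheduler reports an empty queue and
// until the basic dag allocator appears to have handed all of its memory back
// to its root allocator, so every EXPECT_* placed after a wait in the test
// below observes a dag that has fully finished (or failed), not one in flight.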
class TestDag : public ObIDag
{
public:
  TestDag() :
    ObIDag(ObDagType::DAG_TYPE_MERGE_EXECUTE), id_(0), expect_(-1), expect_ret_(0),
    running_(false), tester_(NULL)
  {}
  explicit TestDag(const ObDagType::ObDagTypeEnum type) :
    ObIDag(type), id_(0), expect_(-1), expect_ret_(0), running_(false), tester_(NULL)
  {}
  virtual ~TestDag()
  {
    if (get_dag_status() == ObIDag::DAG_STATUS_FINISH
        || get_dag_status() == ObIDag::DAG_STATUS_NODE_FAILED) {
      if (running_ && -1 != expect_) {
        if (op_.value() != expect_ || get_dag_ret() != expect_ret_) {
          if (OB_ALLOCATE_MEMORY_FAILED != get_dag_ret()) {
            if (NULL != tester_) {
              tester_->stop();
            }
            COMMON_LOG_RET(ERROR, OB_ERROR, "FATAL ERROR!!!", K_(expect), K(op_.value()),
                K_(expect_ret), K(get_dag_ret()), K_(id));
            common::right_to_die_or_duty_to_live();
          }
        }
      }
    }
  }
  int init(int64_t id, int expect_ret = 0, int64_t expect = -1, lib::ThreadPool *tester = NULL)
  {
    id_ = id;
    expect_ret_ = expect_ret;
    expect_ = expect;
    tester_ = tester;
    ObAddr addr(1683068975, 9999);
    if (OB_SUCCESS != (ObSysTaskStatMgr::get_instance().set_self_addr(addr))) {
      COMMON_LOG_RET(WARN, OB_ERROR, "failed to add sys task", K(addr));
    }
    return OB_SUCCESS;
  }
  virtual int64_t hash() const { return murmurhash(&id_, sizeof(id_), 0); }
  virtual bool operator==(const ObIDag &other) const
  {
    bool bret = false;
    if (get_type() == other.get_type()) {
      const TestDag &dag = static_cast<const TestDag &>(other);
      bret = dag.id_ == id_;
    }
    return bret;
  }
  void set_id(int64_t id) { id_ = id; }
  AtomicOperator &get_op() { return op_; }
  void set_running() { running_ = true; }
  virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override
  {
    int ret = OB_SUCCESS;
    if (!is_inited_) {
      ret = OB_NOT_INIT;
    } else if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, allocator, get_type(), 1, 1, "table_id", 10))) {
      COMMON_LOG(WARN, "fail to add dag warning info param", K(ret));
    }
    return ret;
  }
  virtual int fill_dag_key(char *buf, const int64_t buf_len) const override
  {
    UNUSEDx(buf, buf_len);
    return OB_SUCCESS;
  }
  virtual lib::Worker::CompatMode get_compat_mode() const override
  {
    return lib::Worker::CompatMode::MYSQL;
  }
  virtual uint64_t get_consumer_group_id() const override
  {
    return consumer_group_id_;
  }
  virtual bool is_ha_dag() const override { return false; }
  INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(type), K_(id), K(task_list_.get_size()));
protected:
  int64_t id_;
  // expected final value of op_
  int64_t expect_;
  // expected dag ret code for this dag
  int expect_ret_;
  AtomicOperator op_;
  bool running_;
  lib::ThreadPool *tester_;
private:
  DISALLOW_COPY_AND_ASSIGN(TestDag);
};
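// ~TestDag() acts as the assertion of record for each dag: once the dag reaches
// DAG_STATUS_FINISH or DAG_STATUS_NODE_FAILED and an expected value was supplied
// to init(), the destructor compares the operator value and the dag ret code
// against those expectations, stops the registered tester thread pool if any,
// and aborts the run via right_to_die_or_duty_to_live() on a mismatch.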
class TestPrepareTask : public ObITask
{
  static const int64_t inc_task_cnt = 8;
  static const int64_t mul_task_cnt = 6;
public:
  TestPrepareTask() :
    ObITask(ObITask::TASK_TYPE_UT), dag_id_(0), is_error_(false), sleep_us_(0), op_(NULL)
  {}
  int init(int64_t dag_id, AtomicOperator *op = NULL, bool is_error = false, int sleep_us = 0)
  {
    int ret = OB_SUCCESS;
    dag_id_ = dag_id;
    is_error_ = is_error;
    sleep_us_ = sleep_us;
    if (NULL != op) {
      op_ = op;
    } else {
      TestDag *dag = static_cast<TestDag *>(get_dag());
      if (OB_ISNULL(dag)) {
        ret = OB_ERR_UNEXPECTED;
        COMMON_LOG(WARN, "dag is null", K(ret));
      } else {
        op_ = &dag->get_op();
      }
    }
    return ret;
  }
  int process()
  {
    int ret = OB_SUCCESS;
    TestDag *dag = static_cast<TestDag *>(get_dag());
    AtomicIncTask *inc_task = NULL;
    AtomicMulTask *mul_task = NULL;
    AtomicMulTask *mul_task1 = NULL;
    if (OB_ISNULL(dag)) {
      ret = OB_ERR_UNEXPECTED;
      COMMON_LOG(WARN, "dag is null", K(ret));
    } else if (OB_FAIL(alloc_task(*dag, inc_task))) {
      COMMON_LOG(WARN, "failed to alloc inc_task", K(ret));
    } else if (OB_FAIL(inc_task->init(1, inc_task_cnt, *op_))) {
    } else if (OB_FAIL(alloc_task(*dag, mul_task))) {
      COMMON_LOG(WARN, "failed to alloc mul task", K(ret));
    } else if (OB_FAIL(mul_task->init(1, mul_task_cnt, *op_, 0,
        is_error_ ? 1 + (dag_id_ % mul_task_cnt) : -1))) {
    } else if (OB_FAIL(alloc_task(*dag, mul_task1))) {
      COMMON_LOG(WARN, "failed to alloc mul task", K(ret));
    } else if (OB_FAIL(mul_task1->init(1, mul_task_cnt, *op_))) {
    } else if (OB_FAIL(mul_task->add_child(*inc_task))) {
      COMMON_LOG(WARN, "failed to add child", K(ret));
    } else if (OB_FAIL(mul_task1->add_child(*inc_task))) {
      COMMON_LOG(WARN, "failed to add child", K(ret));
    } else if (OB_FAIL(add_child(*mul_task))) {
      COMMON_LOG(WARN, "failed to add child to self", K(ret));
    } else if (OB_FAIL(add_child(*mul_task1))) {
      COMMON_LOG(WARN, "failed to add child to self", K(ret));
    } else if (OB_FAIL(dag->add_task(*inc_task))) {
      COMMON_LOG(WARN, "failed to add_task", K(ret));
    } else if (OB_FAIL(dag->add_task(*mul_task1))) {
      COMMON_LOG(WARN, "failed to add_task", K(ret));
    } else if (OB_FAIL(dag->add_task(*mul_task))) {
      COMMON_LOG(WARN, "failed to add_task", K(ret));
    } else {
      dag->set_running();
    }
    if (sleep_us_ > 0) {
      ::usleep(sleep_us_);
      if (is_error_) {
        ret = OB_ERR_UNEXPECTED;
      }
    }
    return ret;
  }
private:
  int64_t dag_id_;
  bool is_error_;
  int sleep_us_;
  AtomicOperator *op_;
};

class TestDagScheduler : public ::testing::Test
{
public:
  TestDagScheduler() :
    tenant_id_(1001),
    tablet_scheduler_(nullptr),
    scheduler_(nullptr),
    dag_history_mgr_(nullptr),
    tenant_base_(1001),
    allocator_("DagScheduler"),
    inited_(false)
  {}
  ~TestDagScheduler() {}
  void SetUp()
  {
    if (!inited_) {
      ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(1001);
      inited_ = true;
    }
    tablet_scheduler_ = OB_NEW(compaction::ObTenantTabletScheduler, ObModIds::TEST);
    tenant_base_.set(tablet_scheduler_);
    scheduler_ = OB_NEW(ObTenantDagScheduler, ObModIds::TEST);
    tenant_base_.set(scheduler_);
    dag_history_mgr_ = OB_NEW(ObDagWarningHistoryManager, ObModIds::TEST);
    tenant_base_.set(dag_history_mgr_);
    ObTenantEnv::set_tenant(&tenant_base_);
    ASSERT_EQ(OB_SUCCESS, tenant_base_.init());
    ObMallocAllocator *ma = ObMallocAllocator::get_instance();
    ASSERT_EQ(OB_SUCCESS, ma->set_tenant_limit(tenant_id_, 1LL << 30));
  }
  void TearDown()
  {
    tablet_scheduler_->destroy();
    tablet_scheduler_ = nullptr;
    scheduler_->destroy();
    scheduler_ = nullptr;
    dag_history_mgr_->~ObDagWarningHistoryManager();
    dag_history_mgr_ = nullptr;
    tenant_base_.destroy();
    ObTenantEnv::set_tenant(nullptr);
  }
private:
  const uint64_t tenant_id_;
  compaction::ObTenantTabletScheduler *tablet_scheduler_;
  ObTenantDagScheduler *scheduler_;
  ObDagWarningHistoryManager *dag_history_mgr_;
  ObTenantBase tenant_base_;
  ObArenaAllocator allocator_;
  bool inited_;
  DISALLOW_COPY_AND_ASSIGN(TestDagScheduler);
};
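// test_error_handling drives four dags through the scheduler and, after each
// wait_scheduler(), checks the value the shared AtomicOperator should have
// reached:
//   1. a mul chain that fails at seq 8, so its child inc chain is cancelled and
//      op stays 0;
//   2. an inc chain that completes (op reaches 10) before a failing mul task
//      cancels the second inc chain;
//   3. a TestPrepareTask that builds a sub-dag whose mul chain is seeded with an
//      error, leaving op at 0;
//   4. the same setup, except the prepare task itself also fails after sleeping
//      for about one second.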
TEST_F(TestDagScheduler, test_error_handling)
{
  ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*);
  ASSERT_TRUE(nullptr != scheduler);
  ASSERT_EQ(OB_SUCCESS, scheduler->init(MTL_ID(), time_slice));

  AtomicOperator op(0);
  TestDag *dag = NULL;
  AtomicMulTask *mul_task = NULL;
  AtomicIncTask *inc_task = NULL;
  EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag));
  EXPECT_EQ(OB_SUCCESS, dag->init(1));
  EXPECT_EQ(OB_SUCCESS, alloc_task(*dag, mul_task));
  EXPECT_EQ(OB_SUCCESS, mul_task->init(1, 10, op, 0, 8));
  EXPECT_EQ(OB_SUCCESS, dag->add_task(*mul_task));
  EXPECT_EQ(OB_SUCCESS, alloc_task(*dag, inc_task));
  EXPECT_EQ(OB_SUCCESS, inc_task->init(1, 10, op));
  EXPECT_EQ(OB_SUCCESS, mul_task->add_child(*inc_task));
  EXPECT_EQ(OB_SUCCESS, dag->add_task(*inc_task));
  EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag));
  wait_scheduler();
  EXPECT_EQ(0, op.value());

  EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag));
  EXPECT_EQ(OB_SUCCESS, dag->init(1));
  EXPECT_EQ(OB_SUCCESS, alloc_task(*dag, mul_task));
  EXPECT_EQ(OB_SUCCESS, mul_task->init(1, 1, op, 0, 1));
  EXPECT_EQ(OB_SUCCESS, dag->add_task(*mul_task));
  EXPECT_EQ(OB_SUCCESS, alloc_task(*dag, inc_task));
  EXPECT_EQ(OB_SUCCESS, inc_task->init(1, 10, op));
  EXPECT_EQ(OB_SUCCESS, inc_task->add_child(*mul_task));
  EXPECT_EQ(OB_SUCCESS, dag->add_task(*inc_task));
  EXPECT_EQ(OB_SUCCESS, alloc_task(*dag, inc_task));
  EXPECT_EQ(OB_SUCCESS, inc_task->init(1, 10, op));
  EXPECT_EQ(OB_SUCCESS, mul_task->add_child(*inc_task));
  EXPECT_EQ(OB_SUCCESS, dag->add_task(*inc_task));
  EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag));
  wait_scheduler();
  EXPECT_EQ(10, op.value());

  TestDag *dag1 = NULL;
  TestPrepareTask *prepare_task = NULL;
  op.reset();
  EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag1));
  EXPECT_EQ(OB_SUCCESS, dag1->init(1));
  EXPECT_EQ(OB_SUCCESS, alloc_task(*dag1, prepare_task));
  EXPECT_EQ(OB_SUCCESS, prepare_task->init(1, &op, true));
  EXPECT_EQ(OB_SUCCESS, dag1->add_task(*prepare_task));
  EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag1));
  wait_scheduler();
  EXPECT_EQ(0, op.value());

  op.reset();
  EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag1));
  EXPECT_EQ(OB_SUCCESS, dag1->init(1));
  EXPECT_EQ(OB_SUCCESS, alloc_task(*dag1, prepare_task));
  EXPECT_EQ(OB_SUCCESS, prepare_task->init(1, &op, true, 1000 * 1000));
  EXPECT_EQ(OB_SUCCESS, dag1->add_task(*prepare_task));
  EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag1));
  wait_scheduler();
  EXPECT_EQ(0, op.value());
}

} // namespace unittest
} // namespace oceanbase

void parse_cmd_arg(int argc, char **argv)
{
  int opt = 0;
  const char *opt_string = "p:s:l:";
  struct option longopts[] = {
      {"dag cnt for performance test", 1, NULL, 'p'},
      {"stress test time", 1, NULL, 's'},
      {"log level", 1, NULL, 'l'},
      {0, 0, 0, 0}};
  while (-1 != (opt = getopt_long(argc, argv, opt_string, longopts, NULL))) {
    switch (opt) {
    case 'p':
      dag_cnt = strtoll(optarg, NULL, 10);
      break;
    case 's':
      stress_time = strtoll(optarg, NULL, 10);
      break;
    case 'l':
      snprintf(log_level, 20, "%s", optarg);
      break;
    default:
      break;
    }
  }
}

int main(int argc, char **argv)
{
  ::testing::InitGoogleTest(&argc, argv);
  parse_cmd_arg(argc, argv);
  OB_LOGGER.set_log_level(log_level);
  OB_LOGGER.set_max_file_size(256 * 1024 * 1024);
  system("rm -f test_dag_scheduler_error_handling.log*");
  OB_LOGGER.set_file_name("test_dag_scheduler_error_handling.log");
  return RUN_ALL_TESTS();
}
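// Example invocation (binary name assumed; the flags are the ones handled in
// parse_cmd_arg above):
//   ./test_dag_scheduler_error_handling -p 1 -s 1 -l INFO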