diff --git a/src/share/scheduler/ob_dag_scheduler.cpp b/src/share/scheduler/ob_dag_scheduler.cpp index 6885374870..f06cbc6f28 100755 --- a/src/share/scheduler/ob_dag_scheduler.cpp +++ b/src/share/scheduler/ob_dag_scheduler.cpp @@ -367,6 +367,7 @@ ObIDag::ObIDag(const ObDagType::ObDagTypeEnum type) add_time_(0), start_time_(0), consumer_group_id_(0), + allocator_(nullptr), is_inited_(false), type_(type), priority_(OB_DAG_TYPES[type].init_dag_prio_), @@ -408,7 +409,7 @@ void ObIDag::clear_task_list() while (NULL != cur && task_list_.get_header() != cur) { next = cur->get_next(); cur->~ObITask(); - allocator_.free(cur); + allocator_->free(cur); cur = next; } task_list_.reset(); @@ -436,7 +437,7 @@ void ObIDag::reset() is_stop_ = false; dag_net_ = nullptr; list_idx_ = DAG_LIST_MAX; - allocator_.reset(); + allocator_ = nullptr; } int ObIDag::add_task(ObITask &task) @@ -638,7 +639,7 @@ void ObIDag::free_task(ObITask &task) COMMON_LOG_RET(WARN, OB_NOT_INIT, "dag is not inited"); } else { task.~ObITask(); - allocator_.free(&task); + allocator_->free(&task); } } @@ -653,7 +654,7 @@ int ObIDag::remove_task(ObITask &task) COMMON_LOG(WARN, "failed to remove task from task_list", K_(id)); } else { task.~ObITask(); - allocator_.free(&task); + allocator_->free(&task); } return ret; } @@ -671,19 +672,14 @@ bool ObIDag::is_valid_type() const return type_ >= 0 && type_ < ObDagType::DAG_TYPE_MAX; } -int ObIDag::basic_init(const int64_t total, - const int64_t hold, - const int64_t page_size) +int ObIDag::basic_init(ObIAllocator &allocator) { int ret = OB_SUCCESS; - const lib::ObMemAttr mem_attr(MTL_ID(), "DagTask"); if (is_inited_) { ret = OB_INIT_TWICE; COMMON_LOG(WARN, "dag init twice", K(ret)); - } else if (OB_FAIL(allocator_.init(lib::ObMallocAllocator::get_instance(), page_size, - mem_attr, 0, hold, total))) { - COMMON_LOG(WARN, "failed to init allocator", K(ret), K(total), K(hold), K(page_size)); } else { + allocator_ = &allocator; is_inited_ = true; } return ret; @@ -863,7 +859,7 @@ ObIDagNet::ObIDagNet( const ObDagNetType::ObDagNetTypeEnum type) : is_stopped_(false), lock_(common::ObLatchIds::WORK_DAG_NET_LOCK), - allocator_("DagNet", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), + allocator_(nullptr), type_(type), add_time_(0), start_time_(0), @@ -882,7 +878,10 @@ int ObIDagNet::add_dag_into_dag_net(ObIDag &dag) WEAK_BARRIER(); const bool is_stop = is_stopped_; - if (OB_NOT_NULL(dag.get_dag_net())) { + if (!is_inited()) { + ret = OB_NOT_INIT; + COMMON_LOG(WARN, "dag net not basic init", K(ret), K(this)); + } else if (OB_NOT_NULL(dag.get_dag_net())) { ret = OB_INVALID_ARGUMENT; COMMON_LOG(WARN, "dag already belongs to a dag_net", K(ret), K(dag)); } else if (is_stop) { @@ -897,7 +896,7 @@ int ObIDagNet::add_dag_into_dag_net(ObIDag &dag) } else if (OB_HASH_NOT_EXIST != (hash_ret = dag_record_map_.get_refactored(&dag, dag_record))) { ret = OB_SUCCESS == hash_ret ? OB_ERR_UNEXPECTED : hash_ret; COMMON_LOG(WARN, "dag record maybe already exist in map", K(ret), K(hash_ret), K(dag)); - } else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObDagRecord)))) { + } else if (OB_ISNULL(buf = allocator_->alloc(sizeof(ObDagRecord)))) { ret = OB_ALLOCATE_MEMORY_FAILED; COMMON_LOG(WARN, "allocate memory failed", K(ret), K(buf)); } else { @@ -913,7 +912,7 @@ int ObIDagNet::add_dag_into_dag_net(ObIDag &dag) } if (OB_FAIL(ret) && OB_NOT_NULL(buf)) { - allocator_.free(buf); + allocator_->free(buf); buf = nullptr; } } @@ -931,9 +930,11 @@ void ObIDagNet::reset() ObDagRecord *dag_record = iter->second; if (OB_ISNULL(dag_record)) { LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag record should not be NULL", KPC(this), KPC(dag_record)); - } else if (ObIDag::DAG_STATUS_FINISH != dag_record->dag_status_ - && ObIDag::DAG_STATUS_ABORT != dag_record->dag_status_) { - LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag net should not be reset, dag in dag_net is not finish!!!", KPC(this), KPC(dag_record)); + } else { + if (!ObIDag::is_finish_status(dag_record->dag_status_)) { + LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag net should not be reset, dag in dag_net is not finish!!!", KPC(this), KPC(dag_record)); + } + allocator_->free(dag_record); } } dag_record_map_.destroy(); @@ -944,6 +945,18 @@ void ObIDagNet::reset() start_time_ = 0; } +int ObIDagNet::basic_init(ObIAllocator &allocator) +{ + int ret = OB_SUCCESS; + if (is_inited()) { + ret = OB_INIT_TWICE; + COMMON_LOG(WARN, "dag net is inited", K(ret), K(this)); + } else { + allocator_ = &allocator; + } + return ret; +} + bool ObIDagNet::check_finished_and_mark_stop() { ObMutexGuard guard(lock_); @@ -962,7 +975,10 @@ int ObIDagNet::update_dag_status(ObIDag &dag) ObMutexGuard guard(lock_); WEAK_BARRIER(); - if (OB_UNLIKELY(is_stopped_)) { + if (!is_inited()) { + ret = OB_NOT_INIT; + COMMON_LOG(WARN, "dag net not basic init", K(ret), K(this)); + } else if (OB_UNLIKELY(is_stopped_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("dag net is in stop state", K(ret), K(dag), KP(this)); } else if (OB_FAIL(dag_record_map_.get_refactored(&dag, dag_record))) { @@ -993,7 +1009,7 @@ void ObIDagNet::remove_dag_record_(ObDagRecord &dag_record) COMMON_LOG(WARN, "failed to remove dag record", K(dag_record)); ob_abort(); } - allocator_.free(&dag_record); + allocator_->free(&dag_record); } // called when ObIDag::add_child failed @@ -1035,7 +1051,8 @@ int64_t ObIDagNet::to_string(char* buf, const int64_t buf_len) const J_OBJ_START(); J_NAME("ObIDagNet"); J_COLON(); - J_KV(KP(this), K_(type), K_(dag_id), "dag_record_cnt", dag_record_map_.size()); + J_KV(KP(this), K_(type), K_(dag_id), "dag_record_cnt", dag_record_map_.size(), + K_(is_stopped), K_(is_cancel), KP_(allocator)); J_OBJ_END(); } return pos; @@ -1072,6 +1089,12 @@ bool ObIDagNet::is_cancel() return is_cancel_; } +bool ObIDagNet::is_inited() +{ + return OB_NOT_NULL(allocator_); +} + + void ObIDagNet::gene_dag_info(ObDagInfo &info, const char *list_info) { ObMutexGuard guard(lock_); @@ -1578,36 +1601,39 @@ void ObTenantDagScheduler::reload_config() } } +int ObTenantDagScheduler::init_allocator( + const uint64_t tenant_id, + const lib::ObLabel &label, + lib::MemoryContext &mem_context) +{ + int ret = OB_SUCCESS; + ContextParam param; + param.set_mem_attr(tenant_id, label, common::ObCtxIds::DEFAULT_CTX_ID) + .set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE) + .set_properties(ALLOC_THREAD_SAFE) + .set_parallel(8); + if (OB_FAIL(ROOT_CONTEXT->CREATE_CONTEXT(mem_context, param))) { + COMMON_LOG(WARN, "fail to create entity", K(ret)); + } else if (nullptr == mem_context) { + ret = OB_ERR_UNEXPECTED; + COMMON_LOG(WARN, "memory entity is null", K(ret)); + } + return ret; +} + int ObTenantDagScheduler::init( const uint64_t tenant_id, const int64_t check_period /* =DEFAULT_CHECK_PERIOD */, const int64_t loop_waiting_list_period /* = LOOP_WAITING_DAG_LIST_INTERVAL*/, - const int64_t dag_limit /*= DEFAULT_MAX_DAG_NUM*/, - const int64_t total_mem_limit /*= TOTAL_LIMIT*/, - const int64_t hold_mem_limit /*= HOLD_LIMIT*/, - const int64_t page_size /*= PAGE_SIZE*/) + const int64_t dag_limit /*= DEFAULT_MAX_DAG_NUM*/) { int ret = OB_SUCCESS; - const lib::ObMemAttr mem_attr(tenant_id, ObModIds::OB_SCHEDULER); - const lib::ObMemAttr ha_mem_attr(tenant_id, "HAScheduler"); if (IS_INIT) { ret = OB_INIT_TWICE; COMMON_LOG(WARN, "scheduler init twice", K(ret)); - } else if (OB_INVALID_ID == tenant_id - || 0 >= dag_limit - || 0 >= total_mem_limit || 0 >= hold_mem_limit - || hold_mem_limit > total_mem_limit || 0 >= page_size) { + } else if (OB_INVALID_ID == tenant_id || 0 >= dag_limit) { ret = OB_INVALID_ARGUMENT; - COMMON_LOG(WARN, "init ObTenantDagScheduler with invalid arguments", K(ret), K(tenant_id), K(dag_limit), - K(total_mem_limit), K(hold_mem_limit), K(page_size)); - } else if (OB_FAIL(allocator_.init(lib::ObMallocAllocator::get_instance(), page_size, - mem_attr, 0, hold_mem_limit, total_mem_limit))) { - COMMON_LOG(WARN, "failed to init allocator", K(ret), K(total_mem_limit), K(hold_mem_limit), - K(page_size)); - } else if (OB_FAIL(ha_allocator_.init(lib::ObMallocAllocator::get_instance(), page_size, - ha_mem_attr, 0, hold_mem_limit, total_mem_limit))) { - COMMON_LOG(WARN, "failed to init allocator", K(ret), K(total_mem_limit), K(hold_mem_limit), - K(page_size)); + COMMON_LOG(WARN, "init ObTenantDagScheduler with invalid arguments", K(ret), K(tenant_id), K(dag_limit)); } else if (OB_FAIL(dag_map_.create(dag_limit, "DagMap", "DagNode", tenant_id))) { COMMON_LOG(WARN, "failed to create dap map", K(ret), K(dag_limit)); } else if (OB_FAIL(dag_net_map_[RUNNING_DAG_NET_MAP].create(dag_limit, "DagNetMap", "DagNetNode", tenant_id))) { @@ -1616,6 +1642,10 @@ int ObTenantDagScheduler::init( COMMON_LOG(WARN, "failed to create dap net id map", K(ret), K(dag_limit)); } else if (OB_FAIL(scheduler_sync_.init(ObWaitEventIds::SCHEDULER_COND_WAIT))) { COMMON_LOG(WARN, "failed to init task queue sync", K(ret)); + } else if (OB_FAIL(init_allocator(tenant_id, ObModIds::OB_SCHEDULER, mem_context_))) { + COMMON_LOG(WARN, "failed to init scheduler allocator", K(ret)); + } else if (OB_FAIL(init_allocator(tenant_id, "HAScheduler", ha_mem_context_))) { + COMMON_LOG(WARN, "failed to init ha scheduler allocator", K(ret)); } if (OB_SUCC(ret)) { check_period_ = check_period; @@ -1707,22 +1737,25 @@ void ObTenantDagScheduler::reset() } if (dag_net_map_[RUNNING_DAG_NET_MAP].created()) { for (DagNetMap::iterator iter = dag_net_map_[RUNNING_DAG_NET_MAP].begin(); iter != dag_net_map_[RUNNING_DAG_NET_MAP].end(); ++iter) { - const bool ha_dag_net = iter->second->is_ha_dag_net(); + ObIAllocator &allocator = get_allocator(iter->second->is_ha_dag_net()); iter->second->~ObIDagNet(); - if (ha_dag_net) { - ha_allocator_.free(iter->second); - } else { - allocator_.free(iter->second); - } + allocator.free(iter->second); } // end of for dag_net_map_[RUNNING_DAG_NET_MAP].destroy(); } if (dag_net_id_map_.created()) { dag_net_id_map_.destroy(); } - COMMON_LOG(INFO, "ObTenantDagScheduler before allocator destroyed", K(abort_dag_cnt), K(allocator_.used()), K(ha_allocator_.used())); - allocator_.reset(); - ha_allocator_.reset(); + COMMON_LOG(INFO, "ObTenantDagScheduler before allocator destroyed", K(abort_dag_cnt)); + // there will be 'HAS UNFREE PTR' log with label when some ptrs haven't been free + if (NULL != mem_context_) { + DESTROY_CONTEXT(mem_context_); + mem_context_ = nullptr; + } + if (NULL != ha_mem_context_) { + DESTROY_CONTEXT(ha_mem_context_); + ha_mem_context_ = nullptr; + } scheduler_sync_.destroy(); dag_cnt_ = 0; dag_limit_ = 0; @@ -1775,13 +1808,9 @@ void ObTenantDagScheduler::inner_free_dag(ObIDag &dag) if (OB_UNLIKELY(nullptr != dag.prev_ || nullptr != dag.next_)) { LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag is in dag_list", K(dag), K(dag.prev_), K(dag.next_)); } - const bool ha_dag = dag.is_ha_dag(); + ObIAllocator &allocator = get_allocator(dag.is_ha_dag()); dag.~ObIDag(); - if (ha_dag) { - ha_allocator_.free(&dag); - } else { - allocator_.free(&dag); - } + allocator.free(&dag); } void ObTenantDagScheduler::get_default_config() @@ -2002,6 +2031,15 @@ int ObTenantDagScheduler::gene_basic_info( return ret; } +common::ObIAllocator &ObTenantDagScheduler::get_allocator(const bool is_ha) +{ + common::ObIAllocator *allocator = &mem_context_->get_malloc_allocator(); + if (is_ha) { + allocator = &ha_mem_context_->get_malloc_allocator(); + } + return *allocator; +} + int ObTenantDagScheduler::get_all_dag_scheduler_info( common::ObIAllocator &allocator, common::ObIArray &scheduler_infos) diff --git a/src/share/scheduler/ob_dag_scheduler.h b/src/share/scheduler/ob_dag_scheduler.h index 1ef8ac1ca6..b49f03436e 100644 --- a/src/share/scheduler/ob_dag_scheduler.h +++ b/src/share/scheduler/ob_dag_scheduler.h @@ -14,8 +14,6 @@ #define SRC_SHARE_SCHEDULER_OB_DAG_SCHEDULER_H_ #include "lib/ob_define.h" -#include "lib/allocator/ob_concurrent_fifo_allocator.h" -#include "lib/allocator/ob_fifo_allocator.h" #include "lib/container/ob_se_array.h" #include "lib/hash/ob_hashmap.h" #include "lib/list/ob_dlink_node.h" @@ -376,9 +374,6 @@ private: typedef common::ObDList TaskList; typedef lib::ObLockGuard ObDagGuard; static const int64_t DEFAULT_TASK_NUM = 32; - static const int64_t TOTAL_LIMIT = 1024L * 1024L * 1024L; - static const int64_t HOLD_LIMIT = 8 * 1024L * 1024L; - static const int64_t PAGE_SIZE = common::OB_MALLOC_NORMAL_BLOCK_SIZE; friend class ObTenantDagScheduler; friend class ObITask; friend class ObTenantDagWorker; @@ -388,9 +383,7 @@ private: int finish_task(ObITask &task); bool is_valid(); bool is_valid_type() const; - int basic_init(const int64_t total = TOTAL_LIMIT, - const int64_t hold = HOLD_LIMIT, - const int64_t page_size = PAGE_SIZE); + int basic_init(ObIAllocator &allocator); void reset_task_running_status(ObITask &task, ObITask::ObITaskStatus task_status); void reset(); void clear_task_list(); @@ -407,7 +400,7 @@ private: void dec_running_task_cnt() { --running_task_cnt_; } int inner_add_child_without_inheritance(ObIDag &child); private: - common::ObFIFOAllocator allocator_; + common::ObIAllocator *allocator_; bool is_inited_; ObDagType::ObDagTypeEnum type_; ObDagPrio::ObDagPrioEnum priority_; @@ -460,6 +453,7 @@ public: ObIDagNet::reset(); } void reset(); + int basic_init(ObIAllocator &allocator); ObDagNetType::ObDagNetTypeEnum get_type() const { return type_; } int add_dag_into_dag_net(ObIDag &dag); bool check_finished_and_mark_stop(); @@ -481,6 +475,7 @@ public: } void set_cancel(); bool is_cancel(); + bool is_inited(); virtual int deal_with_cancel() { return OB_SUCCESS; @@ -505,7 +500,7 @@ private: private: bool is_stopped_; lib::ObMutex lock_; - ObArenaAllocator allocator_; // use to alloc dag in dag_net later + common::ObIAllocator *allocator_; // use to alloc dag in dag_net later ObDagNetType::ObDagNetTypeEnum type_; int64_t add_time_; int64_t start_time_; @@ -772,10 +767,7 @@ public: int init(const uint64_t tenant_id, const int64_t check_period = DEFAULT_CHECK_PERIOD, const int64_t loop_waiting_list_period = LOOP_WAITING_DAG_LIST_INTERVAL, - const int64_t dag_limit = DEFAULT_MAX_DAG_NUM, - const int64_t total_mem_limit = TOTAL_LIMIT, - const int64_t hold_mem_limit = HOLD_LIMIT, - const int64_t page_size = PAGE_SIZE); + const int64_t dag_limit = DEFAULT_MAX_DAG_NUM); int add_dag(ObIDag *dag, const bool emergency = false, const bool check_size_overflow = true); int add_dag_net(ObIDagNet *dag_net); template @@ -861,9 +853,6 @@ private: static const int64_t TMP_WEIGHT = 5; static const int64_t SCHEDULER_WAIT_TIME_MS = 1000; // 1s - static const int64_t TOTAL_LIMIT = 1024L * 1024L * 1024L; - static const int64_t HOLD_LIMIT = 8 * 1024L * 1024L; - static const int64_t PAGE_SIZE = common::OB_MALLOC_NORMAL_BLOCK_SIZE; static const int64_t DAG_SIZE_LIMIT = 10 << 12; static const int64_t DEFAULT_MAX_DAG_NUM = 15000; static const int64_t DEFAULT_MAX_DAG_MAP_CNT = 150000; @@ -937,6 +926,8 @@ private: int try_move_child_to_ready_list(ObIDag &dag); void inner_free_dag(ObIDag &dag); OB_INLINE int64_t get_dag_limit(const ObDagPrio::ObDagPrioEnum dag_prio); + common::ObIAllocator &get_allocator(const bool is_ha); + int init_allocator(const uint64_t tenant_id, const lib::ObLabel &label, lib::MemoryContext &mem_context); private: bool is_inited_; @@ -962,8 +953,8 @@ private: int64_t scheduled_task_cnts_[ObDagType::DAG_TYPE_MAX]; // interval scheduled dag count int64_t dag_cnts_[ObDagType::DAG_TYPE_MAX]; int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX]; - common::ObFIFOAllocator allocator_; - common::ObFIFOAllocator ha_allocator_; + lib::MemoryContext mem_context_; + lib::MemoryContext ha_mem_context_; PriorityWorkerList waiting_workers_; // workers waiting for time slice to run PriorityWorkerList running_workers_; // running workers WorkerList free_workers_; // free workers who have not been assigned to any task @@ -982,7 +973,7 @@ int ObIDag::alloc_task(T *&task) if (IS_NOT_INIT) { ret = common::OB_NOT_INIT; COMMON_LOG(WARN, "dag is not inited", K(ret)); - } else if (NULL == (buf = allocator_.alloc(sizeof(T)))) { + } else if (NULL == (buf = allocator_->alloc(sizeof(T)))) { ret = common::OB_ALLOCATE_MEMORY_FAILED; task = NULL; COMMON_LOG(WARN, "failed to alloc task", K(ret)); @@ -1018,13 +1009,13 @@ int ObTenantDagScheduler::alloc_dag(T *&dag) COMMON_LOG(WARN, "Dag Object is too large", K(ret), K(sizeof(T))); } else { T tmp_dag; - common::ObFIFOAllocator *allocator = tmp_dag.is_ha_dag() ? &ha_allocator_ : &allocator_; - if (NULL == (buf = allocator->alloc(sizeof(T)))) { + ObIAllocator &allocator = get_allocator(tmp_dag.is_ha_dag()); + if (NULL == (buf = allocator.alloc(sizeof(T)))) { ret = common::OB_ALLOCATE_MEMORY_FAILED; COMMON_LOG(WARN, "failed to alloc dag", K(ret)); } else { ObIDag *new_dag = new (buf) T(); - if (OB_FAIL(new_dag->basic_init())) { + if (OB_FAIL(new_dag->basic_init(allocator))) { COMMON_LOG(WARN, "failed to init dag", K(ret)); // failed to init, free dag @@ -1042,13 +1033,9 @@ template void ObTenantDagScheduler::free_dag_net(T *&dag_net) { if (OB_NOT_NULL(dag_net)) { - const bool ha_dag_net = dag_net->is_ha_dag_net(); + ObIAllocator &allocator = get_allocator(dag_net->is_ha_dag_net()); dag_net->~T(); - if (ha_dag_net) { - ha_allocator_.free(dag_net); - } else { - allocator_.free(dag_net); - } + allocator.free(dag_net); dag_net = nullptr; } } @@ -1065,13 +1052,14 @@ int ObTenantDagScheduler::create_and_add_dag_net(const ObIDagInitParam *param) COMMON_LOG(WARN, "scheduler is not init", K(ret)); } else { T tmp_dag_net; - common::ObFIFOAllocator *allocator = tmp_dag_net.is_ha_dag_net() ? &ha_allocator_ : &allocator_; - if (NULL == (buf = allocator->alloc(sizeof(T)))) { + ObIAllocator &allocator = get_allocator(tmp_dag_net.is_ha_dag_net()); + if (NULL == (buf = allocator.alloc(sizeof(T)))) { ret = common::OB_ALLOCATE_MEMORY_FAILED; COMMON_LOG(WARN, "failed to alloc dag_net", K(ret)); } else if (FALSE_IT(dag_net = new (buf) T())) { } else if (OB_FAIL(dag_net->init_by_param(param))) { COMMON_LOG(WARN, "failed to init dag_net", K(ret), KPC(dag_net)); + } else if (FALSE_IT(dag_net->basic_init(allocator))) { } else if (FALSE_IT(dag_net->init_dag_id_())) { } else if (OB_FAIL(add_dag_net(dag_net))) { if (common::OB_HASH_EXIST == ret) { diff --git a/unittest/share/scheduler/test_dag_scheduler.cpp b/unittest/share/scheduler/test_dag_scheduler.cpp index 816e621f09..7fe1ea9a48 100644 --- a/unittest/share/scheduler/test_dag_scheduler.cpp +++ b/unittest/share/scheduler/test_dag_scheduler.cpp @@ -851,13 +851,6 @@ TEST_F(TestDagScheduler, test_init) // invalid thread cnt EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, -1)); - // invalid dag_limit - EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, 0, 0)); - // invalid total_mem_limit - EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, 0, 10, 0)); - // invalid hold_mem_limit - EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, 0, 10, 1024, 0)); - EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, 0, 10, 1024, 2048)); EXPECT_EQ(OB_SUCCESS, scheduler->init(MTL_ID(),time_slice, time_slice, 100)); EXPECT_EQ(OB_INIT_TWICE, scheduler->init(MTL_ID()));