[CP] use memory_context in dag_scheduler
This commit is contained in:
@ -367,6 +367,7 @@ ObIDag::ObIDag(const ObDagType::ObDagTypeEnum type)
|
|||||||
add_time_(0),
|
add_time_(0),
|
||||||
start_time_(0),
|
start_time_(0),
|
||||||
consumer_group_id_(0),
|
consumer_group_id_(0),
|
||||||
|
allocator_(nullptr),
|
||||||
is_inited_(false),
|
is_inited_(false),
|
||||||
type_(type),
|
type_(type),
|
||||||
priority_(OB_DAG_TYPES[type].init_dag_prio_),
|
priority_(OB_DAG_TYPES[type].init_dag_prio_),
|
||||||
@ -408,7 +409,7 @@ void ObIDag::clear_task_list()
|
|||||||
while (NULL != cur && task_list_.get_header() != cur) {
|
while (NULL != cur && task_list_.get_header() != cur) {
|
||||||
next = cur->get_next();
|
next = cur->get_next();
|
||||||
cur->~ObITask();
|
cur->~ObITask();
|
||||||
allocator_.free(cur);
|
allocator_->free(cur);
|
||||||
cur = next;
|
cur = next;
|
||||||
}
|
}
|
||||||
task_list_.reset();
|
task_list_.reset();
|
||||||
@ -436,7 +437,7 @@ void ObIDag::reset()
|
|||||||
is_stop_ = false;
|
is_stop_ = false;
|
||||||
dag_net_ = nullptr;
|
dag_net_ = nullptr;
|
||||||
list_idx_ = DAG_LIST_MAX;
|
list_idx_ = DAG_LIST_MAX;
|
||||||
allocator_.reset();
|
allocator_ = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObIDag::add_task(ObITask &task)
|
int ObIDag::add_task(ObITask &task)
|
||||||
@ -638,7 +639,7 @@ void ObIDag::free_task(ObITask &task)
|
|||||||
COMMON_LOG_RET(WARN, OB_NOT_INIT, "dag is not inited");
|
COMMON_LOG_RET(WARN, OB_NOT_INIT, "dag is not inited");
|
||||||
} else {
|
} else {
|
||||||
task.~ObITask();
|
task.~ObITask();
|
||||||
allocator_.free(&task);
|
allocator_->free(&task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -653,7 +654,7 @@ int ObIDag::remove_task(ObITask &task)
|
|||||||
COMMON_LOG(WARN, "failed to remove task from task_list", K_(id));
|
COMMON_LOG(WARN, "failed to remove task from task_list", K_(id));
|
||||||
} else {
|
} else {
|
||||||
task.~ObITask();
|
task.~ObITask();
|
||||||
allocator_.free(&task);
|
allocator_->free(&task);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -671,19 +672,14 @@ bool ObIDag::is_valid_type() const
|
|||||||
return type_ >= 0 && type_ < ObDagType::DAG_TYPE_MAX;
|
return type_ >= 0 && type_ < ObDagType::DAG_TYPE_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObIDag::basic_init(const int64_t total,
|
int ObIDag::basic_init(ObIAllocator &allocator)
|
||||||
const int64_t hold,
|
|
||||||
const int64_t page_size)
|
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const lib::ObMemAttr mem_attr(MTL_ID(), "DagTask");
|
|
||||||
if (is_inited_) {
|
if (is_inited_) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
COMMON_LOG(WARN, "dag init twice", K(ret));
|
COMMON_LOG(WARN, "dag init twice", K(ret));
|
||||||
} else if (OB_FAIL(allocator_.init(lib::ObMallocAllocator::get_instance(), page_size,
|
|
||||||
mem_attr, 0, hold, total))) {
|
|
||||||
COMMON_LOG(WARN, "failed to init allocator", K(ret), K(total), K(hold), K(page_size));
|
|
||||||
} else {
|
} else {
|
||||||
|
allocator_ = &allocator;
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -863,7 +859,7 @@ ObIDagNet::ObIDagNet(
|
|||||||
const ObDagNetType::ObDagNetTypeEnum type)
|
const ObDagNetType::ObDagNetTypeEnum type)
|
||||||
: is_stopped_(false),
|
: is_stopped_(false),
|
||||||
lock_(common::ObLatchIds::WORK_DAG_NET_LOCK),
|
lock_(common::ObLatchIds::WORK_DAG_NET_LOCK),
|
||||||
allocator_("DagNet", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
|
allocator_(nullptr),
|
||||||
type_(type),
|
type_(type),
|
||||||
add_time_(0),
|
add_time_(0),
|
||||||
start_time_(0),
|
start_time_(0),
|
||||||
@ -882,7 +878,10 @@ int ObIDagNet::add_dag_into_dag_net(ObIDag &dag)
|
|||||||
WEAK_BARRIER();
|
WEAK_BARRIER();
|
||||||
const bool is_stop = is_stopped_;
|
const bool is_stop = is_stopped_;
|
||||||
|
|
||||||
if (OB_NOT_NULL(dag.get_dag_net())) {
|
if (!is_inited()) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
COMMON_LOG(WARN, "dag net not basic init", K(ret), K(this));
|
||||||
|
} else if (OB_NOT_NULL(dag.get_dag_net())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
COMMON_LOG(WARN, "dag already belongs to a dag_net", K(ret), K(dag));
|
COMMON_LOG(WARN, "dag already belongs to a dag_net", K(ret), K(dag));
|
||||||
} else if (is_stop) {
|
} else if (is_stop) {
|
||||||
@ -897,7 +896,7 @@ int ObIDagNet::add_dag_into_dag_net(ObIDag &dag)
|
|||||||
} else if (OB_HASH_NOT_EXIST != (hash_ret = dag_record_map_.get_refactored(&dag, dag_record))) {
|
} else if (OB_HASH_NOT_EXIST != (hash_ret = dag_record_map_.get_refactored(&dag, dag_record))) {
|
||||||
ret = OB_SUCCESS == hash_ret ? OB_ERR_UNEXPECTED : hash_ret;
|
ret = OB_SUCCESS == hash_ret ? OB_ERR_UNEXPECTED : hash_ret;
|
||||||
COMMON_LOG(WARN, "dag record maybe already exist in map", K(ret), K(hash_ret), K(dag));
|
COMMON_LOG(WARN, "dag record maybe already exist in map", K(ret), K(hash_ret), K(dag));
|
||||||
} else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObDagRecord)))) {
|
} else if (OB_ISNULL(buf = allocator_->alloc(sizeof(ObDagRecord)))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
COMMON_LOG(WARN, "allocate memory failed", K(ret), K(buf));
|
COMMON_LOG(WARN, "allocate memory failed", K(ret), K(buf));
|
||||||
} else {
|
} else {
|
||||||
@ -913,7 +912,7 @@ int ObIDagNet::add_dag_into_dag_net(ObIDag &dag)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ret) && OB_NOT_NULL(buf)) {
|
if (OB_FAIL(ret) && OB_NOT_NULL(buf)) {
|
||||||
allocator_.free(buf);
|
allocator_->free(buf);
|
||||||
buf = nullptr;
|
buf = nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -931,9 +930,11 @@ void ObIDagNet::reset()
|
|||||||
ObDagRecord *dag_record = iter->second;
|
ObDagRecord *dag_record = iter->second;
|
||||||
if (OB_ISNULL(dag_record)) {
|
if (OB_ISNULL(dag_record)) {
|
||||||
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag record should not be NULL", KPC(this), KPC(dag_record));
|
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag record should not be NULL", KPC(this), KPC(dag_record));
|
||||||
} else if (ObIDag::DAG_STATUS_FINISH != dag_record->dag_status_
|
} else {
|
||||||
&& ObIDag::DAG_STATUS_ABORT != dag_record->dag_status_) {
|
if (!ObIDag::is_finish_status(dag_record->dag_status_)) {
|
||||||
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag net should not be reset, dag in dag_net is not finish!!!", KPC(this), KPC(dag_record));
|
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag net should not be reset, dag in dag_net is not finish!!!", KPC(this), KPC(dag_record));
|
||||||
|
}
|
||||||
|
allocator_->free(dag_record);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
dag_record_map_.destroy();
|
dag_record_map_.destroy();
|
||||||
@ -944,6 +945,18 @@ void ObIDagNet::reset()
|
|||||||
start_time_ = 0;
|
start_time_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObIDagNet::basic_init(ObIAllocator &allocator)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (is_inited()) {
|
||||||
|
ret = OB_INIT_TWICE;
|
||||||
|
COMMON_LOG(WARN, "dag net is inited", K(ret), K(this));
|
||||||
|
} else {
|
||||||
|
allocator_ = &allocator;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
bool ObIDagNet::check_finished_and_mark_stop()
|
bool ObIDagNet::check_finished_and_mark_stop()
|
||||||
{
|
{
|
||||||
ObMutexGuard guard(lock_);
|
ObMutexGuard guard(lock_);
|
||||||
@ -962,7 +975,10 @@ int ObIDagNet::update_dag_status(ObIDag &dag)
|
|||||||
|
|
||||||
ObMutexGuard guard(lock_);
|
ObMutexGuard guard(lock_);
|
||||||
WEAK_BARRIER();
|
WEAK_BARRIER();
|
||||||
if (OB_UNLIKELY(is_stopped_)) {
|
if (!is_inited()) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
COMMON_LOG(WARN, "dag net not basic init", K(ret), K(this));
|
||||||
|
} else if (OB_UNLIKELY(is_stopped_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("dag net is in stop state", K(ret), K(dag), KP(this));
|
LOG_WARN("dag net is in stop state", K(ret), K(dag), KP(this));
|
||||||
} else if (OB_FAIL(dag_record_map_.get_refactored(&dag, dag_record))) {
|
} else if (OB_FAIL(dag_record_map_.get_refactored(&dag, dag_record))) {
|
||||||
@ -993,7 +1009,7 @@ void ObIDagNet::remove_dag_record_(ObDagRecord &dag_record)
|
|||||||
COMMON_LOG(WARN, "failed to remove dag record", K(dag_record));
|
COMMON_LOG(WARN, "failed to remove dag record", K(dag_record));
|
||||||
ob_abort();
|
ob_abort();
|
||||||
}
|
}
|
||||||
allocator_.free(&dag_record);
|
allocator_->free(&dag_record);
|
||||||
}
|
}
|
||||||
|
|
||||||
// called when ObIDag::add_child failed
|
// called when ObIDag::add_child failed
|
||||||
@ -1035,7 +1051,8 @@ int64_t ObIDagNet::to_string(char* buf, const int64_t buf_len) const
|
|||||||
J_OBJ_START();
|
J_OBJ_START();
|
||||||
J_NAME("ObIDagNet");
|
J_NAME("ObIDagNet");
|
||||||
J_COLON();
|
J_COLON();
|
||||||
J_KV(KP(this), K_(type), K_(dag_id), "dag_record_cnt", dag_record_map_.size());
|
J_KV(KP(this), K_(type), K_(dag_id), "dag_record_cnt", dag_record_map_.size(),
|
||||||
|
K_(is_stopped), K_(is_cancel), KP_(allocator));
|
||||||
J_OBJ_END();
|
J_OBJ_END();
|
||||||
}
|
}
|
||||||
return pos;
|
return pos;
|
||||||
@ -1072,6 +1089,12 @@ bool ObIDagNet::is_cancel()
|
|||||||
return is_cancel_;
|
return is_cancel_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ObIDagNet::is_inited()
|
||||||
|
{
|
||||||
|
return OB_NOT_NULL(allocator_);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void ObIDagNet::gene_dag_info(ObDagInfo &info, const char *list_info)
|
void ObIDagNet::gene_dag_info(ObDagInfo &info, const char *list_info)
|
||||||
{
|
{
|
||||||
ObMutexGuard guard(lock_);
|
ObMutexGuard guard(lock_);
|
||||||
@ -1578,36 +1601,39 @@ void ObTenantDagScheduler::reload_config()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObTenantDagScheduler::init_allocator(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const lib::ObLabel &label,
|
||||||
|
lib::MemoryContext &mem_context)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ContextParam param;
|
||||||
|
param.set_mem_attr(tenant_id, label, common::ObCtxIds::DEFAULT_CTX_ID)
|
||||||
|
.set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE)
|
||||||
|
.set_properties(ALLOC_THREAD_SAFE)
|
||||||
|
.set_parallel(8);
|
||||||
|
if (OB_FAIL(ROOT_CONTEXT->CREATE_CONTEXT(mem_context, param))) {
|
||||||
|
COMMON_LOG(WARN, "fail to create entity", K(ret));
|
||||||
|
} else if (nullptr == mem_context) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
COMMON_LOG(WARN, "memory entity is null", K(ret));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObTenantDagScheduler::init(
|
int ObTenantDagScheduler::init(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const int64_t check_period /* =DEFAULT_CHECK_PERIOD */,
|
const int64_t check_period /* =DEFAULT_CHECK_PERIOD */,
|
||||||
const int64_t loop_waiting_list_period /* = LOOP_WAITING_DAG_LIST_INTERVAL*/,
|
const int64_t loop_waiting_list_period /* = LOOP_WAITING_DAG_LIST_INTERVAL*/,
|
||||||
const int64_t dag_limit /*= DEFAULT_MAX_DAG_NUM*/,
|
const int64_t dag_limit /*= DEFAULT_MAX_DAG_NUM*/)
|
||||||
const int64_t total_mem_limit /*= TOTAL_LIMIT*/,
|
|
||||||
const int64_t hold_mem_limit /*= HOLD_LIMIT*/,
|
|
||||||
const int64_t page_size /*= PAGE_SIZE*/)
|
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const lib::ObMemAttr mem_attr(tenant_id, ObModIds::OB_SCHEDULER);
|
|
||||||
const lib::ObMemAttr ha_mem_attr(tenant_id, "HAScheduler");
|
|
||||||
if (IS_INIT) {
|
if (IS_INIT) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
COMMON_LOG(WARN, "scheduler init twice", K(ret));
|
COMMON_LOG(WARN, "scheduler init twice", K(ret));
|
||||||
} else if (OB_INVALID_ID == tenant_id
|
} else if (OB_INVALID_ID == tenant_id || 0 >= dag_limit) {
|
||||||
|| 0 >= dag_limit
|
|
||||||
|| 0 >= total_mem_limit || 0 >= hold_mem_limit
|
|
||||||
|| hold_mem_limit > total_mem_limit || 0 >= page_size) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
COMMON_LOG(WARN, "init ObTenantDagScheduler with invalid arguments", K(ret), K(tenant_id), K(dag_limit),
|
COMMON_LOG(WARN, "init ObTenantDagScheduler with invalid arguments", K(ret), K(tenant_id), K(dag_limit));
|
||||||
K(total_mem_limit), K(hold_mem_limit), K(page_size));
|
|
||||||
} else if (OB_FAIL(allocator_.init(lib::ObMallocAllocator::get_instance(), page_size,
|
|
||||||
mem_attr, 0, hold_mem_limit, total_mem_limit))) {
|
|
||||||
COMMON_LOG(WARN, "failed to init allocator", K(ret), K(total_mem_limit), K(hold_mem_limit),
|
|
||||||
K(page_size));
|
|
||||||
} else if (OB_FAIL(ha_allocator_.init(lib::ObMallocAllocator::get_instance(), page_size,
|
|
||||||
ha_mem_attr, 0, hold_mem_limit, total_mem_limit))) {
|
|
||||||
COMMON_LOG(WARN, "failed to init allocator", K(ret), K(total_mem_limit), K(hold_mem_limit),
|
|
||||||
K(page_size));
|
|
||||||
} else if (OB_FAIL(dag_map_.create(dag_limit, "DagMap", "DagNode", tenant_id))) {
|
} else if (OB_FAIL(dag_map_.create(dag_limit, "DagMap", "DagNode", tenant_id))) {
|
||||||
COMMON_LOG(WARN, "failed to create dap map", K(ret), K(dag_limit));
|
COMMON_LOG(WARN, "failed to create dap map", K(ret), K(dag_limit));
|
||||||
} else if (OB_FAIL(dag_net_map_[RUNNING_DAG_NET_MAP].create(dag_limit, "DagNetMap", "DagNetNode", tenant_id))) {
|
} else if (OB_FAIL(dag_net_map_[RUNNING_DAG_NET_MAP].create(dag_limit, "DagNetMap", "DagNetNode", tenant_id))) {
|
||||||
@ -1616,6 +1642,10 @@ int ObTenantDagScheduler::init(
|
|||||||
COMMON_LOG(WARN, "failed to create dap net id map", K(ret), K(dag_limit));
|
COMMON_LOG(WARN, "failed to create dap net id map", K(ret), K(dag_limit));
|
||||||
} else if (OB_FAIL(scheduler_sync_.init(ObWaitEventIds::SCHEDULER_COND_WAIT))) {
|
} else if (OB_FAIL(scheduler_sync_.init(ObWaitEventIds::SCHEDULER_COND_WAIT))) {
|
||||||
COMMON_LOG(WARN, "failed to init task queue sync", K(ret));
|
COMMON_LOG(WARN, "failed to init task queue sync", K(ret));
|
||||||
|
} else if (OB_FAIL(init_allocator(tenant_id, ObModIds::OB_SCHEDULER, mem_context_))) {
|
||||||
|
COMMON_LOG(WARN, "failed to init scheduler allocator", K(ret));
|
||||||
|
} else if (OB_FAIL(init_allocator(tenant_id, "HAScheduler", ha_mem_context_))) {
|
||||||
|
COMMON_LOG(WARN, "failed to init ha scheduler allocator", K(ret));
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
check_period_ = check_period;
|
check_period_ = check_period;
|
||||||
@ -1707,22 +1737,25 @@ void ObTenantDagScheduler::reset()
|
|||||||
}
|
}
|
||||||
if (dag_net_map_[RUNNING_DAG_NET_MAP].created()) {
|
if (dag_net_map_[RUNNING_DAG_NET_MAP].created()) {
|
||||||
for (DagNetMap::iterator iter = dag_net_map_[RUNNING_DAG_NET_MAP].begin(); iter != dag_net_map_[RUNNING_DAG_NET_MAP].end(); ++iter) {
|
for (DagNetMap::iterator iter = dag_net_map_[RUNNING_DAG_NET_MAP].begin(); iter != dag_net_map_[RUNNING_DAG_NET_MAP].end(); ++iter) {
|
||||||
const bool ha_dag_net = iter->second->is_ha_dag_net();
|
ObIAllocator &allocator = get_allocator(iter->second->is_ha_dag_net());
|
||||||
iter->second->~ObIDagNet();
|
iter->second->~ObIDagNet();
|
||||||
if (ha_dag_net) {
|
allocator.free(iter->second);
|
||||||
ha_allocator_.free(iter->second);
|
|
||||||
} else {
|
|
||||||
allocator_.free(iter->second);
|
|
||||||
}
|
|
||||||
} // end of for
|
} // end of for
|
||||||
dag_net_map_[RUNNING_DAG_NET_MAP].destroy();
|
dag_net_map_[RUNNING_DAG_NET_MAP].destroy();
|
||||||
}
|
}
|
||||||
if (dag_net_id_map_.created()) {
|
if (dag_net_id_map_.created()) {
|
||||||
dag_net_id_map_.destroy();
|
dag_net_id_map_.destroy();
|
||||||
}
|
}
|
||||||
COMMON_LOG(INFO, "ObTenantDagScheduler before allocator destroyed", K(abort_dag_cnt), K(allocator_.used()), K(ha_allocator_.used()));
|
COMMON_LOG(INFO, "ObTenantDagScheduler before allocator destroyed", K(abort_dag_cnt));
|
||||||
allocator_.reset();
|
// there will be 'HAS UNFREE PTR' log with label when some ptrs haven't been free
|
||||||
ha_allocator_.reset();
|
if (NULL != mem_context_) {
|
||||||
|
DESTROY_CONTEXT(mem_context_);
|
||||||
|
mem_context_ = nullptr;
|
||||||
|
}
|
||||||
|
if (NULL != ha_mem_context_) {
|
||||||
|
DESTROY_CONTEXT(ha_mem_context_);
|
||||||
|
ha_mem_context_ = nullptr;
|
||||||
|
}
|
||||||
scheduler_sync_.destroy();
|
scheduler_sync_.destroy();
|
||||||
dag_cnt_ = 0;
|
dag_cnt_ = 0;
|
||||||
dag_limit_ = 0;
|
dag_limit_ = 0;
|
||||||
@ -1775,13 +1808,9 @@ void ObTenantDagScheduler::inner_free_dag(ObIDag &dag)
|
|||||||
if (OB_UNLIKELY(nullptr != dag.prev_ || nullptr != dag.next_)) {
|
if (OB_UNLIKELY(nullptr != dag.prev_ || nullptr != dag.next_)) {
|
||||||
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag is in dag_list", K(dag), K(dag.prev_), K(dag.next_));
|
LOG_ERROR_RET(OB_ERR_UNEXPECTED, "dag is in dag_list", K(dag), K(dag.prev_), K(dag.next_));
|
||||||
}
|
}
|
||||||
const bool ha_dag = dag.is_ha_dag();
|
ObIAllocator &allocator = get_allocator(dag.is_ha_dag());
|
||||||
dag.~ObIDag();
|
dag.~ObIDag();
|
||||||
if (ha_dag) {
|
allocator.free(&dag);
|
||||||
ha_allocator_.free(&dag);
|
|
||||||
} else {
|
|
||||||
allocator_.free(&dag);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObTenantDagScheduler::get_default_config()
|
void ObTenantDagScheduler::get_default_config()
|
||||||
@ -2002,6 +2031,15 @@ int ObTenantDagScheduler::gene_basic_info(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
common::ObIAllocator &ObTenantDagScheduler::get_allocator(const bool is_ha)
|
||||||
|
{
|
||||||
|
common::ObIAllocator *allocator = &mem_context_->get_malloc_allocator();
|
||||||
|
if (is_ha) {
|
||||||
|
allocator = &ha_mem_context_->get_malloc_allocator();
|
||||||
|
}
|
||||||
|
return *allocator;
|
||||||
|
}
|
||||||
|
|
||||||
int ObTenantDagScheduler::get_all_dag_scheduler_info(
|
int ObTenantDagScheduler::get_all_dag_scheduler_info(
|
||||||
common::ObIAllocator &allocator,
|
common::ObIAllocator &allocator,
|
||||||
common::ObIArray<void *> &scheduler_infos)
|
common::ObIArray<void *> &scheduler_infos)
|
||||||
|
@ -14,8 +14,6 @@
|
|||||||
#define SRC_SHARE_SCHEDULER_OB_DAG_SCHEDULER_H_
|
#define SRC_SHARE_SCHEDULER_OB_DAG_SCHEDULER_H_
|
||||||
|
|
||||||
#include "lib/ob_define.h"
|
#include "lib/ob_define.h"
|
||||||
#include "lib/allocator/ob_concurrent_fifo_allocator.h"
|
|
||||||
#include "lib/allocator/ob_fifo_allocator.h"
|
|
||||||
#include "lib/container/ob_se_array.h"
|
#include "lib/container/ob_se_array.h"
|
||||||
#include "lib/hash/ob_hashmap.h"
|
#include "lib/hash/ob_hashmap.h"
|
||||||
#include "lib/list/ob_dlink_node.h"
|
#include "lib/list/ob_dlink_node.h"
|
||||||
@ -376,9 +374,6 @@ private:
|
|||||||
typedef common::ObDList<ObITask> TaskList;
|
typedef common::ObDList<ObITask> TaskList;
|
||||||
typedef lib::ObLockGuard<ObIDag> ObDagGuard;
|
typedef lib::ObLockGuard<ObIDag> ObDagGuard;
|
||||||
static const int64_t DEFAULT_TASK_NUM = 32;
|
static const int64_t DEFAULT_TASK_NUM = 32;
|
||||||
static const int64_t TOTAL_LIMIT = 1024L * 1024L * 1024L;
|
|
||||||
static const int64_t HOLD_LIMIT = 8 * 1024L * 1024L;
|
|
||||||
static const int64_t PAGE_SIZE = common::OB_MALLOC_NORMAL_BLOCK_SIZE;
|
|
||||||
friend class ObTenantDagScheduler;
|
friend class ObTenantDagScheduler;
|
||||||
friend class ObITask;
|
friend class ObITask;
|
||||||
friend class ObTenantDagWorker;
|
friend class ObTenantDagWorker;
|
||||||
@ -388,9 +383,7 @@ private:
|
|||||||
int finish_task(ObITask &task);
|
int finish_task(ObITask &task);
|
||||||
bool is_valid();
|
bool is_valid();
|
||||||
bool is_valid_type() const;
|
bool is_valid_type() const;
|
||||||
int basic_init(const int64_t total = TOTAL_LIMIT,
|
int basic_init(ObIAllocator &allocator);
|
||||||
const int64_t hold = HOLD_LIMIT,
|
|
||||||
const int64_t page_size = PAGE_SIZE);
|
|
||||||
void reset_task_running_status(ObITask &task, ObITask::ObITaskStatus task_status);
|
void reset_task_running_status(ObITask &task, ObITask::ObITaskStatus task_status);
|
||||||
void reset();
|
void reset();
|
||||||
void clear_task_list();
|
void clear_task_list();
|
||||||
@ -407,7 +400,7 @@ private:
|
|||||||
void dec_running_task_cnt() { --running_task_cnt_; }
|
void dec_running_task_cnt() { --running_task_cnt_; }
|
||||||
int inner_add_child_without_inheritance(ObIDag &child);
|
int inner_add_child_without_inheritance(ObIDag &child);
|
||||||
private:
|
private:
|
||||||
common::ObFIFOAllocator allocator_;
|
common::ObIAllocator *allocator_;
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
ObDagType::ObDagTypeEnum type_;
|
ObDagType::ObDagTypeEnum type_;
|
||||||
ObDagPrio::ObDagPrioEnum priority_;
|
ObDagPrio::ObDagPrioEnum priority_;
|
||||||
@ -460,6 +453,7 @@ public:
|
|||||||
ObIDagNet::reset();
|
ObIDagNet::reset();
|
||||||
}
|
}
|
||||||
void reset();
|
void reset();
|
||||||
|
int basic_init(ObIAllocator &allocator);
|
||||||
ObDagNetType::ObDagNetTypeEnum get_type() const { return type_; }
|
ObDagNetType::ObDagNetTypeEnum get_type() const { return type_; }
|
||||||
int add_dag_into_dag_net(ObIDag &dag);
|
int add_dag_into_dag_net(ObIDag &dag);
|
||||||
bool check_finished_and_mark_stop();
|
bool check_finished_and_mark_stop();
|
||||||
@ -481,6 +475,7 @@ public:
|
|||||||
}
|
}
|
||||||
void set_cancel();
|
void set_cancel();
|
||||||
bool is_cancel();
|
bool is_cancel();
|
||||||
|
bool is_inited();
|
||||||
virtual int deal_with_cancel()
|
virtual int deal_with_cancel()
|
||||||
{
|
{
|
||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
@ -505,7 +500,7 @@ private:
|
|||||||
private:
|
private:
|
||||||
bool is_stopped_;
|
bool is_stopped_;
|
||||||
lib::ObMutex lock_;
|
lib::ObMutex lock_;
|
||||||
ObArenaAllocator allocator_; // use to alloc dag in dag_net later
|
common::ObIAllocator *allocator_; // use to alloc dag in dag_net later
|
||||||
ObDagNetType::ObDagNetTypeEnum type_;
|
ObDagNetType::ObDagNetTypeEnum type_;
|
||||||
int64_t add_time_;
|
int64_t add_time_;
|
||||||
int64_t start_time_;
|
int64_t start_time_;
|
||||||
@ -772,10 +767,7 @@ public:
|
|||||||
int init(const uint64_t tenant_id,
|
int init(const uint64_t tenant_id,
|
||||||
const int64_t check_period = DEFAULT_CHECK_PERIOD,
|
const int64_t check_period = DEFAULT_CHECK_PERIOD,
|
||||||
const int64_t loop_waiting_list_period = LOOP_WAITING_DAG_LIST_INTERVAL,
|
const int64_t loop_waiting_list_period = LOOP_WAITING_DAG_LIST_INTERVAL,
|
||||||
const int64_t dag_limit = DEFAULT_MAX_DAG_NUM,
|
const int64_t dag_limit = DEFAULT_MAX_DAG_NUM);
|
||||||
const int64_t total_mem_limit = TOTAL_LIMIT,
|
|
||||||
const int64_t hold_mem_limit = HOLD_LIMIT,
|
|
||||||
const int64_t page_size = PAGE_SIZE);
|
|
||||||
int add_dag(ObIDag *dag, const bool emergency = false, const bool check_size_overflow = true);
|
int add_dag(ObIDag *dag, const bool emergency = false, const bool check_size_overflow = true);
|
||||||
int add_dag_net(ObIDagNet *dag_net);
|
int add_dag_net(ObIDagNet *dag_net);
|
||||||
template<typename T>
|
template<typename T>
|
||||||
@ -861,9 +853,6 @@ private:
|
|||||||
|
|
||||||
static const int64_t TMP_WEIGHT = 5;
|
static const int64_t TMP_WEIGHT = 5;
|
||||||
static const int64_t SCHEDULER_WAIT_TIME_MS = 1000; // 1s
|
static const int64_t SCHEDULER_WAIT_TIME_MS = 1000; // 1s
|
||||||
static const int64_t TOTAL_LIMIT = 1024L * 1024L * 1024L;
|
|
||||||
static const int64_t HOLD_LIMIT = 8 * 1024L * 1024L;
|
|
||||||
static const int64_t PAGE_SIZE = common::OB_MALLOC_NORMAL_BLOCK_SIZE;
|
|
||||||
static const int64_t DAG_SIZE_LIMIT = 10 << 12;
|
static const int64_t DAG_SIZE_LIMIT = 10 << 12;
|
||||||
static const int64_t DEFAULT_MAX_DAG_NUM = 15000;
|
static const int64_t DEFAULT_MAX_DAG_NUM = 15000;
|
||||||
static const int64_t DEFAULT_MAX_DAG_MAP_CNT = 150000;
|
static const int64_t DEFAULT_MAX_DAG_MAP_CNT = 150000;
|
||||||
@ -937,6 +926,8 @@ private:
|
|||||||
int try_move_child_to_ready_list(ObIDag &dag);
|
int try_move_child_to_ready_list(ObIDag &dag);
|
||||||
void inner_free_dag(ObIDag &dag);
|
void inner_free_dag(ObIDag &dag);
|
||||||
OB_INLINE int64_t get_dag_limit(const ObDagPrio::ObDagPrioEnum dag_prio);
|
OB_INLINE int64_t get_dag_limit(const ObDagPrio::ObDagPrioEnum dag_prio);
|
||||||
|
common::ObIAllocator &get_allocator(const bool is_ha);
|
||||||
|
int init_allocator(const uint64_t tenant_id, const lib::ObLabel &label, lib::MemoryContext &mem_context);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
@ -962,8 +953,8 @@ private:
|
|||||||
int64_t scheduled_task_cnts_[ObDagType::DAG_TYPE_MAX]; // interval scheduled dag count
|
int64_t scheduled_task_cnts_[ObDagType::DAG_TYPE_MAX]; // interval scheduled dag count
|
||||||
int64_t dag_cnts_[ObDagType::DAG_TYPE_MAX];
|
int64_t dag_cnts_[ObDagType::DAG_TYPE_MAX];
|
||||||
int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX];
|
int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX];
|
||||||
common::ObFIFOAllocator allocator_;
|
lib::MemoryContext mem_context_;
|
||||||
common::ObFIFOAllocator ha_allocator_;
|
lib::MemoryContext ha_mem_context_;
|
||||||
PriorityWorkerList waiting_workers_; // workers waiting for time slice to run
|
PriorityWorkerList waiting_workers_; // workers waiting for time slice to run
|
||||||
PriorityWorkerList running_workers_; // running workers
|
PriorityWorkerList running_workers_; // running workers
|
||||||
WorkerList free_workers_; // free workers who have not been assigned to any task
|
WorkerList free_workers_; // free workers who have not been assigned to any task
|
||||||
@ -982,7 +973,7 @@ int ObIDag::alloc_task(T *&task)
|
|||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = common::OB_NOT_INIT;
|
ret = common::OB_NOT_INIT;
|
||||||
COMMON_LOG(WARN, "dag is not inited", K(ret));
|
COMMON_LOG(WARN, "dag is not inited", K(ret));
|
||||||
} else if (NULL == (buf = allocator_.alloc(sizeof(T)))) {
|
} else if (NULL == (buf = allocator_->alloc(sizeof(T)))) {
|
||||||
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
||||||
task = NULL;
|
task = NULL;
|
||||||
COMMON_LOG(WARN, "failed to alloc task", K(ret));
|
COMMON_LOG(WARN, "failed to alloc task", K(ret));
|
||||||
@ -1018,13 +1009,13 @@ int ObTenantDagScheduler::alloc_dag(T *&dag)
|
|||||||
COMMON_LOG(WARN, "Dag Object is too large", K(ret), K(sizeof(T)));
|
COMMON_LOG(WARN, "Dag Object is too large", K(ret), K(sizeof(T)));
|
||||||
} else {
|
} else {
|
||||||
T tmp_dag;
|
T tmp_dag;
|
||||||
common::ObFIFOAllocator *allocator = tmp_dag.is_ha_dag() ? &ha_allocator_ : &allocator_;
|
ObIAllocator &allocator = get_allocator(tmp_dag.is_ha_dag());
|
||||||
if (NULL == (buf = allocator->alloc(sizeof(T)))) {
|
if (NULL == (buf = allocator.alloc(sizeof(T)))) {
|
||||||
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
||||||
COMMON_LOG(WARN, "failed to alloc dag", K(ret));
|
COMMON_LOG(WARN, "failed to alloc dag", K(ret));
|
||||||
} else {
|
} else {
|
||||||
ObIDag *new_dag = new (buf) T();
|
ObIDag *new_dag = new (buf) T();
|
||||||
if (OB_FAIL(new_dag->basic_init())) {
|
if (OB_FAIL(new_dag->basic_init(allocator))) {
|
||||||
COMMON_LOG(WARN, "failed to init dag", K(ret));
|
COMMON_LOG(WARN, "failed to init dag", K(ret));
|
||||||
|
|
||||||
// failed to init, free dag
|
// failed to init, free dag
|
||||||
@ -1042,13 +1033,9 @@ template<typename T>
|
|||||||
void ObTenantDagScheduler::free_dag_net(T *&dag_net)
|
void ObTenantDagScheduler::free_dag_net(T *&dag_net)
|
||||||
{
|
{
|
||||||
if (OB_NOT_NULL(dag_net)) {
|
if (OB_NOT_NULL(dag_net)) {
|
||||||
const bool ha_dag_net = dag_net->is_ha_dag_net();
|
ObIAllocator &allocator = get_allocator(dag_net->is_ha_dag_net());
|
||||||
dag_net->~T();
|
dag_net->~T();
|
||||||
if (ha_dag_net) {
|
allocator.free(dag_net);
|
||||||
ha_allocator_.free(dag_net);
|
|
||||||
} else {
|
|
||||||
allocator_.free(dag_net);
|
|
||||||
}
|
|
||||||
dag_net = nullptr;
|
dag_net = nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1065,13 +1052,14 @@ int ObTenantDagScheduler::create_and_add_dag_net(const ObIDagInitParam *param)
|
|||||||
COMMON_LOG(WARN, "scheduler is not init", K(ret));
|
COMMON_LOG(WARN, "scheduler is not init", K(ret));
|
||||||
} else {
|
} else {
|
||||||
T tmp_dag_net;
|
T tmp_dag_net;
|
||||||
common::ObFIFOAllocator *allocator = tmp_dag_net.is_ha_dag_net() ? &ha_allocator_ : &allocator_;
|
ObIAllocator &allocator = get_allocator(tmp_dag_net.is_ha_dag_net());
|
||||||
if (NULL == (buf = allocator->alloc(sizeof(T)))) {
|
if (NULL == (buf = allocator.alloc(sizeof(T)))) {
|
||||||
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
||||||
COMMON_LOG(WARN, "failed to alloc dag_net", K(ret));
|
COMMON_LOG(WARN, "failed to alloc dag_net", K(ret));
|
||||||
} else if (FALSE_IT(dag_net = new (buf) T())) {
|
} else if (FALSE_IT(dag_net = new (buf) T())) {
|
||||||
} else if (OB_FAIL(dag_net->init_by_param(param))) {
|
} else if (OB_FAIL(dag_net->init_by_param(param))) {
|
||||||
COMMON_LOG(WARN, "failed to init dag_net", K(ret), KPC(dag_net));
|
COMMON_LOG(WARN, "failed to init dag_net", K(ret), KPC(dag_net));
|
||||||
|
} else if (FALSE_IT(dag_net->basic_init(allocator))) {
|
||||||
} else if (FALSE_IT(dag_net->init_dag_id_())) {
|
} else if (FALSE_IT(dag_net->init_dag_id_())) {
|
||||||
} else if (OB_FAIL(add_dag_net(dag_net))) {
|
} else if (OB_FAIL(add_dag_net(dag_net))) {
|
||||||
if (common::OB_HASH_EXIST == ret) {
|
if (common::OB_HASH_EXIST == ret) {
|
||||||
|
@ -851,13 +851,6 @@ TEST_F(TestDagScheduler, test_init)
|
|||||||
|
|
||||||
// invalid thread cnt
|
// invalid thread cnt
|
||||||
EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, -1));
|
EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, -1));
|
||||||
// invalid dag_limit
|
|
||||||
EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, 0, 0));
|
|
||||||
// invalid total_mem_limit
|
|
||||||
EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, 0, 10, 0));
|
|
||||||
// invalid hold_mem_limit
|
|
||||||
EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, 0, 10, 1024, 0));
|
|
||||||
EXPECT_EQ(OB_INVALID_ARGUMENT, scheduler->init(MTL_ID(), time_slice, time_slice, 0, 10, 1024, 2048));
|
|
||||||
|
|
||||||
EXPECT_EQ(OB_SUCCESS, scheduler->init(MTL_ID(),time_slice, time_slice, 100));
|
EXPECT_EQ(OB_SUCCESS, scheduler->init(MTL_ID(),time_slice, time_slice, 100));
|
||||||
EXPECT_EQ(OB_INIT_TWICE, scheduler->init(MTL_ID()));
|
EXPECT_EQ(OB_INIT_TWICE, scheduler->init(MTL_ID()));
|
||||||
|
Reference in New Issue
Block a user