fix core in get_next_ready_task
This commit is contained in:
@ -671,14 +671,14 @@ int ObIDag::basic_init(const int64_t total,
|
|||||||
const int64_t page_size)
|
const int64_t page_size)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const lib::ObMemAttr mem_attr(MTL_ID(), ObModIds::OB_SCHEDULER);
|
||||||
if (is_inited_) {
|
if (is_inited_) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
COMMON_LOG(WARN, "dag init twice", K(ret));
|
COMMON_LOG(WARN, "dag init twice", K(ret));
|
||||||
} else if (OB_FAIL(allocator_.init(total, hold, page_size))) {
|
} else if (OB_FAIL(allocator_.init(lib::ObMallocAllocator::get_instance(), page_size,
|
||||||
|
mem_attr, 0, hold, total))) {
|
||||||
COMMON_LOG(WARN, "failed to init allocator", K(ret), K(total), K(hold), K(page_size));
|
COMMON_LOG(WARN, "failed to init allocator", K(ret), K(total), K(hold), K(page_size));
|
||||||
} else {
|
} else {
|
||||||
allocator_.set_tenant_id(MTL_ID());
|
|
||||||
allocator_.set_label(ObModIds::OB_SCHEDULER);
|
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -1524,6 +1524,7 @@ int ObTenantDagScheduler::init(
|
|||||||
const int64_t page_size /*= PAGE_SIZE*/)
|
const int64_t page_size /*= PAGE_SIZE*/)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const lib::ObMemAttr mem_attr(tenant_id, ObModIds::OB_SCHEDULER);
|
||||||
if (IS_INIT) {
|
if (IS_INIT) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
COMMON_LOG(WARN, "scheduler init twice", K(ret));
|
COMMON_LOG(WARN, "scheduler init twice", K(ret));
|
||||||
@ -1534,7 +1535,8 @@ int ObTenantDagScheduler::init(
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
COMMON_LOG(WARN, "init ObTenantDagScheduler with invalid arguments", K(ret), K(tenant_id), K(dag_limit),
|
COMMON_LOG(WARN, "init ObTenantDagScheduler with invalid arguments", K(ret), K(tenant_id), K(dag_limit),
|
||||||
K(total_mem_limit), K(hold_mem_limit), K(page_size));
|
K(total_mem_limit), K(hold_mem_limit), K(page_size));
|
||||||
} else if (OB_FAIL(allocator_.init(total_mem_limit, hold_mem_limit, page_size))) {
|
} else if (OB_FAIL(allocator_.init(lib::ObMallocAllocator::get_instance(), page_size,
|
||||||
|
mem_attr, 0, hold_mem_limit, total_mem_limit))) {
|
||||||
COMMON_LOG(WARN, "failed to init allocator", K(ret), K(total_mem_limit), K(hold_mem_limit),
|
COMMON_LOG(WARN, "failed to init allocator", K(ret), K(total_mem_limit), K(hold_mem_limit),
|
||||||
K(page_size));
|
K(page_size));
|
||||||
} else if (OB_FAIL(dag_map_.create(dag_limit, "DagMap", "DagNode", tenant_id))) {
|
} else if (OB_FAIL(dag_map_.create(dag_limit, "DagMap", "DagNode", tenant_id))) {
|
||||||
@ -1549,8 +1551,6 @@ int ObTenantDagScheduler::init(
|
|||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
check_period_ = check_period;
|
check_period_ = check_period;
|
||||||
loop_waiting_dag_list_period_ = loop_waiting_list_period;
|
loop_waiting_dag_list_period_ = loop_waiting_list_period;
|
||||||
allocator_.set_tenant_id(tenant_id);
|
|
||||||
allocator_.set_label(ObModIds::OB_SCHEDULER);
|
|
||||||
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
|
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
|
||||||
MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_));
|
MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_));
|
||||||
MEMSET(running_task_cnts_, 0, sizeof(running_task_cnts_));
|
MEMSET(running_task_cnts_, 0, sizeof(running_task_cnts_));
|
||||||
@ -1626,7 +1626,7 @@ void ObTenantDagScheduler::destroy()
|
|||||||
dag_net_id_map_.destroy();
|
dag_net_id_map_.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
allocator_.destroy();
|
allocator_.reset();
|
||||||
scheduler_sync_.destroy();
|
scheduler_sync_.destroy();
|
||||||
dag_cnt_ = 0;
|
dag_cnt_ = 0;
|
||||||
dag_limit_ = 0;
|
dag_limit_ = 0;
|
||||||
|
|||||||
@ -391,7 +391,7 @@ private:
|
|||||||
void dec_running_task_cnt() { --running_task_cnt_; }
|
void dec_running_task_cnt() { --running_task_cnt_; }
|
||||||
int inner_add_child_without_inheritance(ObIDag &child);
|
int inner_add_child_without_inheritance(ObIDag &child);
|
||||||
private:
|
private:
|
||||||
common::ObConcurrentFIFOAllocator allocator_;
|
common::ObFIFOAllocator allocator_;
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
ObDagType::ObDagTypeEnum type_;
|
ObDagType::ObDagTypeEnum type_;
|
||||||
ObDagPrio::ObDagPrioEnum priority_;
|
ObDagPrio::ObDagPrioEnum priority_;
|
||||||
@ -929,7 +929,7 @@ private:
|
|||||||
int32_t up_limits_[ObDagPrio::DAG_PRIO_MAX]; // wait to delete
|
int32_t up_limits_[ObDagPrio::DAG_PRIO_MAX]; // wait to delete
|
||||||
int64_t dag_cnts_[ObDagType::DAG_TYPE_MAX];
|
int64_t dag_cnts_[ObDagType::DAG_TYPE_MAX];
|
||||||
int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX];
|
int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX];
|
||||||
common::ObConcurrentFIFOAllocator allocator_;
|
common::ObFIFOAllocator allocator_;
|
||||||
PriorityWorkerList waiting_workers_; // workers waiting for time slice to run
|
PriorityWorkerList waiting_workers_; // workers waiting for time slice to run
|
||||||
PriorityWorkerList running_workers_; // running workers
|
PriorityWorkerList running_workers_; // running workers
|
||||||
WorkerList free_workers_; // free workers who have not been assigned to any task
|
WorkerList free_workers_; // free workers who have not been assigned to any task
|
||||||
|
|||||||
Reference in New Issue
Block a user