fixed memory leak of ObLogExternalStorageHandler
This commit is contained in:
@ -54,9 +54,12 @@ int ObLogExternalStorageHandler::init()
|
|||||||
} else if (NULL == (handle_adapter_ = MTL_NEW(ObLogExternalStorageIOTaskHandleAdapter, "ObLogEXTHandler"))) {
|
} else if (NULL == (handle_adapter_ = MTL_NEW(ObLogExternalStorageIOTaskHandleAdapter, "ObLogEXTHandler"))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
CLOG_LOG(WARN, "allocate memory failed");
|
CLOG_LOG(WARN, "allocate memory failed");
|
||||||
|
} else if (FALSE_IT(share::ObThreadPool::set_run_wrapper(MTL_CTX()))) {
|
||||||
|
} else if (OB_FAIL(ObSimpleThreadPool::init(1, CAPACITY_COEFFICIENT * 1, "ObLogEXTTP", MTL_ID()))) {
|
||||||
|
CLOG_LOG(WARN, "invalid argument", KPC(this));
|
||||||
} else {
|
} else {
|
||||||
concurrency_ = 0;
|
concurrency_ = 1;
|
||||||
capacity_ = 0;
|
capacity_ = CAPACITY_COEFFICIENT;
|
||||||
is_running_ = false;
|
is_running_ = false;
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
CLOG_LOG(INFO, "ObLogExternalStorageHandler inits successfully", KPC(this));
|
CLOG_LOG(INFO, "ObLogExternalStorageHandler inits successfully", KPC(this));
|
||||||
@ -79,14 +82,9 @@ int ObLogExternalStorageHandler::start(const int64_t concurrency)
|
|||||||
} else if (!is_valid_concurrency_(concurrency)) {
|
} else if (!is_valid_concurrency_(concurrency)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this));
|
CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this));
|
||||||
} else if (0 != concurrency
|
} else if (OB_FAIL(resize_(concurrency))) {
|
||||||
&& !FALSE_IT(share::ObThreadPool::set_run_wrapper(MTL_CTX()))
|
CLOG_LOG(WARN, "resize_ failed", K(concurrency), KPC(this));
|
||||||
&& OB_FAIL(ObSimpleThreadPool::init(
|
|
||||||
concurrency, CAPACITY_COEFFICIENT * concurrency, "ObLogEXTTP", MTL_ID()))) {
|
|
||||||
CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this));
|
|
||||||
} else {
|
} else {
|
||||||
concurrency_ = concurrency;
|
|
||||||
capacity_ = CAPACITY_COEFFICIENT * concurrency;
|
|
||||||
is_running_ = true;
|
is_running_ = true;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -112,9 +110,12 @@ void ObLogExternalStorageHandler::destroy()
|
|||||||
stop();
|
stop();
|
||||||
wait();
|
wait();
|
||||||
ObSimpleThreadPool::destroy();
|
ObSimpleThreadPool::destroy();
|
||||||
|
lib::ThreadPool::destroy();
|
||||||
concurrency_ = -1;
|
concurrency_ = -1;
|
||||||
capacity_ = -1;
|
capacity_ = -1;
|
||||||
|
if (OB_NOT_NULL(handle_adapter_)) {
|
||||||
MTL_DELETE(ObLogExternalStorageIOTaskHandleIAdapter, "ObLogEXTHandler", handle_adapter_);
|
MTL_DELETE(ObLogExternalStorageIOTaskHandleIAdapter, "ObLogEXTHandler", handle_adapter_);
|
||||||
|
}
|
||||||
handle_adapter_ = NULL;
|
handle_adapter_ = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -143,7 +144,12 @@ int ObLogExternalStorageHandler::resize(const int64_t new_concurrency,
|
|||||||
} else if (new_concurrency == concurrency_) {
|
} else if (new_concurrency == concurrency_) {
|
||||||
CLOG_LOG(TRACE, "no need resize", KPC(this), K(new_concurrency));
|
CLOG_LOG(TRACE, "no need resize", KPC(this), K(new_concurrency));
|
||||||
} else {
|
} else {
|
||||||
destroy_and_init_new_thread_pool_(new_concurrency);
|
do {
|
||||||
|
ret = resize_(new_concurrency);
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
usleep(DEFAULT_RETRY_INTERVAL);
|
||||||
|
}
|
||||||
|
} while (OB_FAIL(ret));
|
||||||
time_guard.click("after create new thread pool");
|
time_guard.click("after create new thread pool");
|
||||||
concurrency_ = new_concurrency;
|
concurrency_ = new_concurrency;
|
||||||
capacity_ = CAPACITY_COEFFICIENT * new_concurrency;
|
capacity_ = CAPACITY_COEFFICIENT * new_concurrency;
|
||||||
@ -304,6 +310,8 @@ int ObLogExternalStorageHandler::construct_async_tasks_and_push_them_into_thread
|
|||||||
// defense code, last_io_task must not be NULL.
|
// defense code, last_io_task must not be NULL.
|
||||||
if (NULL != last_io_task) {
|
if (NULL != last_io_task) {
|
||||||
(void)last_io_task->do_task();
|
(void)last_io_task->do_task();
|
||||||
|
MTL_DELETE(ObLogExternalStorageIOTask, "ObLogEXTHandler", last_io_task);
|
||||||
|
last_io_task = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -388,35 +396,20 @@ void ObLogExternalStorageHandler::push_async_task_into_thread_pool_(
|
|||||||
} while (OB_FAIL(ret));
|
} while (OB_FAIL(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObLogExternalStorageHandler::destroy_and_init_new_thread_pool_(
|
int ObLogExternalStorageHandler::resize_(const int64_t new_concurrency)
|
||||||
const int64_t new_concurrency)
|
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTimeGuard time_guard("resize impl", 10 * 1000);
|
ObTimeGuard time_guard("resize impl", 10 * 1000);
|
||||||
// destroy currenty thread pool
|
|
||||||
ObSimpleThreadPool::stop();
|
|
||||||
time_guard.click("stop old thread pool");
|
|
||||||
ObSimpleThreadPool::wait();
|
|
||||||
time_guard.click("wait old thread pool");
|
|
||||||
ObSimpleThreadPool::destroy();
|
|
||||||
time_guard.click("destroy old thread pool");
|
|
||||||
|
|
||||||
do {
|
if (OB_FAIL(ObSimpleThreadPool::set_thread_count(new_concurrency))) {
|
||||||
if (0 != new_concurrency
|
CLOG_LOG(WARN, "set_thread_count failed", K(new_concurrency), KPC(this));
|
||||||
&& !FALSE_IT(share::ObThreadPool::set_run_wrapper(MTL_CTX()))
|
|
||||||
&& OB_FAIL(ObSimpleThreadPool::init(
|
|
||||||
new_concurrency, CAPACITY_COEFFICIENT * new_concurrency, "ObLogEXTTP", MTL_ID()))) {
|
|
||||||
CLOG_LOG(WARN, "init ObSimpleThreadPool failed", K(new_concurrency), KPC(this));
|
|
||||||
} else {
|
} else {
|
||||||
|
CLOG_LOG_RET(INFO, OB_SUCCESS, "resize_ success", K(time_guard), KPC(this), K(new_concurrency));
|
||||||
concurrency_ = new_concurrency;
|
concurrency_ = new_concurrency;
|
||||||
capacity_ = CAPACITY_COEFFICIENT * new_concurrency;
|
capacity_ = CAPACITY_COEFFICIENT * new_concurrency;
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
time_guard.click("set thread count");
|
||||||
usleep(DEFAULT_RETRY_INTERVAL);
|
return ret;
|
||||||
}
|
|
||||||
} while (OB_FAIL(ret));
|
|
||||||
time_guard.click("creat enew thread pool");
|
|
||||||
CLOG_LOG_RET(WARN, OB_SUCCESS, "destroy_and_init_new_thread_pool_ success", K(time_guard), KPC(this));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObLogExternalStorageHandler::check_need_resize_(const int64_t new_concurrency) const
|
bool ObLogExternalStorageHandler::check_need_resize_(const int64_t new_concurrency) const
|
||||||
|
|||||||
@ -156,7 +156,7 @@ private:
|
|||||||
|
|
||||||
void push_async_task_into_thread_pool_(ObLogExternalStorageIOTask *io_task);
|
void push_async_task_into_thread_pool_(ObLogExternalStorageIOTask *io_task);
|
||||||
|
|
||||||
void destroy_and_init_new_thread_pool_(const int64_t concurrency);
|
int resize_(const int64_t concurrency);
|
||||||
|
|
||||||
bool check_need_resize_(const int64_t concurrency) const;
|
bool check_need_resize_(const int64_t concurrency) const;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user