[CP] [LogFetcher] Fix coredump caused by destroying LogRouteService before stopping its threads

This commit is contained in:
zxlzxlzxlzxlzxl 2024-04-29 11:43:31 +00:00 committed by ob-robot
parent efa62c5e5c
commit e6e35752b3
5 changed files with 119 additions and 65 deletions

View File

@ -73,6 +73,10 @@ ObLogFetcher::~ObLogFetcher()
destroy();
}
#ifdef ERRSIM
// Error-injection points for ObLogFetcher::init(); present only in ERRSIM builds.
// init() evaluates each one with OB_FAIL(...) in its else-if chain immediately
// before the real step it stands in for, so a failing point makes init() take
// the error branch without running that step:
//   - LOG_FETCHER_LOG_EXT_HANDLER_INIT_FAIL: checked (direct fetching mode only)
//     just before log_ext_handler_.init().
//   - LOG_FETCHER_STREAM_WORKER_INIT_FAIL: checked just before stream_worker_.init().
// NOTE(review): how a point is armed is defined by ERRSIM_POINT_DEF, which is
// declared outside this file — confirm against the tracepoint header.
ERRSIM_POINT_DEF(LOG_FETCHER_LOG_EXT_HANDLER_INIT_FAIL);
ERRSIM_POINT_DEF(LOG_FETCHER_STREAM_WORKER_INIT_FAIL);
#endif
int ObLogFetcher::init(
const LogFetcherUser &log_fetcher_user,
const int64_t cluster_id,
@ -101,6 +105,8 @@ int ObLogFetcher::init(
} else {
cfg_ = &cfg;
// set self_tenant_id before suggest_cached_rpc_res_count
log_fetcher_user_ = log_fetcher_user;
fetching_mode_ = fetching_mode;
self_tenant_id_ = self_tenant_id;
// Before the LogFetcher module is initialized, the following configuration items need to be loaded
configure(cfg);
@ -112,7 +118,7 @@ int ObLogFetcher::init(
}
const common::ObRegion region(cfg.region.str());
if (is_integrated_fetching_mode(fetching_mode) && OB_FAIL(log_route_service_.init(
if (is_integrated_fetching_mode(fetching_mode_) && OB_FAIL(log_route_service_.init(
proxy,
region,
cluster_id,
@ -135,7 +141,11 @@ int ObLogFetcher::init(
LOG_ERROR("init progress controller fail", KR(ret));
} else if (OB_FAIL(large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024))) {
LOG_ERROR("init large buffer pool failed", KR(ret));
} else if (is_direct_fetching_mode(fetching_mode) && OB_FAIL(log_ext_handler_.init())) {
#ifdef ERRSIM
} else if (is_direct_fetching_mode(fetching_mode_) && OB_FAIL(LOG_FETCHER_LOG_EXT_HANDLER_INIT_FAIL)) {
LOG_ERROR("ERRSIM: LOG_FETCHER_LOG_EXT_HANDLER_INIT_FAIL", KR(ret));
#endif
} else if (is_direct_fetching_mode(fetching_mode_) && OB_FAIL(log_ext_handler_.init())) {
LOG_ERROR("init failed", KR(ret));
} else if (OB_FAIL(ls_fetch_mgr_.init(
progress_controller_,
@ -147,16 +157,16 @@ int ObLogFetcher::init(
LOG_ERROR("init_self_addr_ fail", KR(ret));
} else if (OB_FAIL(rpc_.init(cluster_id, self_tenant_id, cfg.io_thread_num, cfg))) {
LOG_ERROR("init rpc handler fail", KR(ret));
} else if (is_cdc(log_fetcher_user) && OB_FAIL(start_lsn_locator_.init(
} else if (is_cdc(log_fetcher_user_) && OB_FAIL(start_lsn_locator_.init(
cfg.start_lsn_locator_thread_num,
cfg.start_lsn_locator_locate_count,
fetching_mode,
fetching_mode_,
archive_dest,
cfg,
rpc_, *err_handler))) {
LOG_ERROR("init start log id locator fail", KR(ret));
} else if (OB_FAIL(idle_pool_.init(
log_fetcher_user,
log_fetcher_user_,
cfg.idle_pool_thread_num,
cfg,
static_cast<void *>(this),
@ -169,6 +179,10 @@ int ObLogFetcher::init(
ls_fetch_mgr_,
*err_handler))) {
LOG_ERROR("init dead pool fail", KR(ret));
#ifdef ERRSIM
} else if (OB_FAIL(LOG_FETCHER_STREAM_WORKER_INIT_FAIL)) {
LOG_ERROR("ERRSIM: LOG_FETCHER_STREAM_WORKER_INIT_FAIL");
#endif
} else if (OB_FAIL(stream_worker_.init(cfg.stream_worker_thread_num,
cfg.timer_task_count_upper_limit,
static_cast<void *>(this),
@ -187,11 +201,9 @@ int ObLogFetcher::init(
progress_controller_,
log_handler))) {
} else {
log_fetcher_user_ = log_fetcher_user;
cluster_id_ = cluster_id;
source_tenant_id_ = source_tenant_id;
is_loading_data_dict_baseline_data_ = is_loading_data_dict_baseline_data;
fetching_mode_ = fetching_mode;
archive_dest_ = archive_dest;
paused_ = false;
@ -292,26 +304,24 @@ int ObLogFetcher::start()
void ObLogFetcher::stop()
{
if (OB_LIKELY(is_inited_)) {
stop_flag_ = true;
stop_flag_ = true;
LOG_INFO("LogFetcher stop begin");
stream_worker_.stop();
dead_pool_.stop();
idle_pool_.stop();
if (is_cdc(log_fetcher_user_)) {
start_lsn_locator_.stop();
}
if (is_integrated_fetching_mode(fetching_mode_)) {
log_route_service_.stop();
}
if (is_direct_fetching_mode(fetching_mode_)) {
log_ext_handler_.stop();
}
LOG_INFO("LogFetcher stop success");
LOG_INFO("LogFetcher stop begin");
stream_worker_.stop();
dead_pool_.stop();
idle_pool_.stop();
if (is_cdc(log_fetcher_user_)) {
start_lsn_locator_.stop();
}
if (is_integrated_fetching_mode(fetching_mode_)) {
log_route_service_.stop();
}
if (is_direct_fetching_mode(fetching_mode_)) {
log_ext_handler_.stop();
}
LOG_INFO("LogFetcher stop success");
}
void ObLogFetcher::pause()

View File

@ -112,6 +112,9 @@ int ObLogFetcherDeadPool::push(LSFetchCtx *task)
return ret;
}
#ifdef ERRSIM
// Error-injection point for ObLogFetcherDeadPool::start(); present only in
// ERRSIM builds. start() evaluates it with OB_FAIL(...) after the inited_ check
// and before TG_SET_HANDLER_AND_START(tg_id_, *this), so a failing point makes
// start() return an error without starting the thread group.
ERRSIM_POINT_DEF(LOG_FETCHER_DEAD_POOL_START_FAIL);
#endif
int ObLogFetcherDeadPool::start()
{
int ret = OB_SUCCESS;
@ -119,6 +122,10 @@ int ObLogFetcherDeadPool::start()
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("not inited");
ret = OB_NOT_INIT;
#ifdef ERRSIM
} else if (OB_FAIL(LOG_FETCHER_DEAD_POOL_START_FAIL)) {
LOG_ERROR("ERRSIM: LOG_FETCHER_DEAD_POOL_START_FAIL");
#endif
} else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) {
LOG_WARN("TG_SET_HANDLER_AND_START failed", KR(ret), K(tg_id_));
} else {

View File

@ -125,6 +125,9 @@ int ObLogFetcherIdlePool::push(LSFetchCtx *task)
return ret;
}
#ifdef ERRSIM
// Error-injection point for ObLogFetcherIdlePool::start(); present only in
// ERRSIM builds. start() evaluates it with OB_FAIL(...) after the inited_ check
// and before TG_SET_HANDLER_AND_START(tg_id_, *this), so a failing point makes
// start() return an error without starting the thread group.
ERRSIM_POINT_DEF(LOG_FETCHER_IDLE_POOL_START_FAIL);
#endif
int ObLogFetcherIdlePool::start()
{
int ret = OB_SUCCESS;
@ -132,6 +135,10 @@ int ObLogFetcherIdlePool::start()
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("not inited");
ret = OB_NOT_INIT;
#ifdef ERRSIM
} else if (OB_FAIL(LOG_FETCHER_IDLE_POOL_START_FAIL)) {
LOG_ERROR("ERRSIM: LOG_FETCHER_IDLE_POOL_START_FAIL");
#endif
} else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) {
LOG_WARN("TG_SET_HANDLER_AND_START failed", KR(ret), K(tg_id_));
} else {

View File

@ -121,6 +121,10 @@ void ObLSWorker::destroy()
LOG_INFO("destroy stream worker succ");
}
#ifdef ERRSIM
// Error-injection points for ObLSWorker::start(); present only in ERRSIM builds.
// start() evaluates each one with OB_FAIL(...) right before the step it stands
// in for:
//   - LOG_FETCHER_LSW_TIMER_START_FAIL: checked before TG_START(timer_id_).
//   - LOG_FETCHER_LSW_HANDLER_START_FAIL: checked after the timer has been
//     started and before TG_SET_HANDLER_AND_START(tg_id_, *this), i.e. it
//     simulates a failure with the timer thread already running.
ERRSIM_POINT_DEF(LOG_FETCHER_LSW_TIMER_START_FAIL);
ERRSIM_POINT_DEF(LOG_FETCHER_LSW_HANDLER_START_FAIL);
#endif
int ObLSWorker::start()
{
int ret = OB_SUCCESS;
@ -128,8 +132,16 @@ int ObLSWorker::start()
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR("not init", K(inited_));
ret = OB_NOT_INIT;
#ifdef ERRSIM
} else if (OB_FAIL(LOG_FETCHER_LSW_TIMER_START_FAIL)) {
LOG_ERROR("ERRSIM: LOG_FETCHER_LSW_TIMER_START_FAIL");
#endif
} else if (OB_FAIL(TG_START(timer_id_))) {
LOG_ERROR("start timer thread fail", KR(ret));
#ifdef ERRSIM
} else if (OB_FAIL(LOG_FETCHER_LSW_HANDLER_START_FAIL)) {
LOG_ERROR("ERRSIM: LOG_FETCHER_LSW_HANDLER_START_FAIL");
#endif
} else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) {
LOG_WARN("TG_SET_HANDLER_AND_START failed", KR(ret), K(tg_id_));
} else {

View File

@ -17,6 +17,7 @@
#include "share/rc/ob_tenant_base.h"
#include "share/ob_thread_mgr.h" // TG*
#include "ob_ls_log_stat_info.h" // LogStatRecordArray
#include "lib/utility/ob_tracepoint.h"
using namespace oceanbase::share;
using namespace oceanbase::common;
@ -58,6 +59,11 @@ ObLogRouteService::~ObLogRouteService()
destroy();
}
#ifdef ERRSIM
// Error-injection points for ObLogRouteService::init(); present only in ERRSIM
// builds. init() evaluates each one with OB_FAIL(...) right before the step it
// stands in for:
//   - LOG_ROUTE_TIMER_INIT_FAIL: checked before timer_.init("LogRouter").
//   - LOG_ROUTE_HANDLER_INIT_FAIL: checked before
//     TG_CREATE_TENANT(lib::TGDefIDs::LogRouteService, tg_id_).
//   - LOG_ROUTE_HANDLER_START_FAIL: checked before
//     TG_SET_HANDLER_AND_START(tg_id_, *this), i.e. after the thread group has
//     been created but before it is started.
ERRSIM_POINT_DEF(LOG_ROUTE_TIMER_INIT_FAIL);
ERRSIM_POINT_DEF(LOG_ROUTE_HANDLER_INIT_FAIL);
ERRSIM_POINT_DEF(LOG_ROUTE_HANDLER_START_FAIL);
#endif
int ObLogRouteService::init(ObISQLClient *proxy,
const common::ObRegion &prefer_region,
const int64_t cluster_id,
@ -108,10 +114,22 @@ int ObLogRouteService::init(ObISQLClient *proxy,
all_server_cache_update_interval_sec, all_zone_cache_update_interval_sec))) {
LOG_WARN("all_svr_cache_ init failed", KR(ret), K(is_tenant_mode), K(prefer_region),
K(all_server_cache_update_interval_sec), K(all_zone_cache_update_interval_sec));
#ifdef ERRSIM
} else if (OB_FAIL(LOG_ROUTE_TIMER_INIT_FAIL)) {
LOG_ERROR("ERRSIM: LOG_ROUTE_TIMER_INIT_FAIL");
#endif
} else if (OB_FAIL(timer_.init("LogRouter"))) {
LOG_ERROR("fail to init itable gc timer", K(ret));
#ifdef ERRSIM
} else if (OB_FAIL(LOG_ROUTE_HANDLER_INIT_FAIL)) {
LOG_ERROR("ERRSIM: LOG_ROUTE_HANDLER_INIT_FAIL");
#endif
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::LogRouteService, tg_id_))) {
LOG_ERROR("TG_CREATE failed", KR(ret));
#ifdef ERRSIM
} else if (OB_FAIL(LOG_ROUTE_HANDLER_START_FAIL)) {
LOG_ERROR("ERRSIM: LOG_ROUTE_HANDLER_START_FAIL");
#endif
} else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) {
LOG_WARN("TG_SET_HANDLER_AND_START failed", KR(ret), K(tg_id_));
} else {
@ -170,11 +188,11 @@ void ObLogRouteService::stop()
void ObLogRouteService::wait()
{
if (IS_INIT) {
LOG_INFO("ObLogRouteService wait begin");
timer_.wait();
int64_t num = 0;
int ret = OB_SUCCESS;
LOG_INFO("ObLogRouteService wait begin");
int ret = OB_SUCCESS;
int64_t num = 0;
if (-1 != tg_id_) {
while (OB_SUCC(TG_GET_QUEUE_NUM(tg_id_, num)) && num > 0) {
PAUSE();
}
@ -183,47 +201,47 @@ void ObLogRouteService::wait()
}
TG_STOP(tg_id_);
TG_WAIT(tg_id_);
LOG_INFO("ObLogRouteService wait finish");
}
timer_.wait();
LOG_INFO("ObLogRouteService wait finish");
}
void ObLogRouteService::destroy()
{
if (IS_INIT) {
LOG_INFO("ObLogRouteService destroy begin");
timer_.destroy();
ls_route_timer_task_.destroy();
timer_id_ = -1;
if (-1 != tg_id_) {
TG_DESTROY(tg_id_);
tg_id_ = -1;
}
free_mem_();
ls_route_key_set_.destroy();
ls_router_map_.destroy();
systable_queryer_.destroy();
all_svr_cache_.destroy();
svr_blacklist_.destroy();
log_router_allocator_.destroy();
asyn_task_allocator_.destroy();
err_handler_ = NULL;
cluster_id_ = OB_INVALID_CLUSTER_ID;
self_tenant_id_ = OB_INVALID_TENANT_ID;
source_tenant_id_ = OB_INVALID_TENANT_ID;
background_refresh_time_sec_ = 0;
blacklist_survival_time_sec_ = 0;
blacklist_survival_time_upper_limit_min_ = 0;
blacklist_survival_time_penalty_period_min_ = 0;
blacklist_history_overdue_time_min_ = 0;
blacklist_history_clear_interval_min_ = 0;
is_tenant_mode_ = false;
is_inited_ = false;
LOG_INFO("ObLogRouteService destroy finish");
LOG_INFO("ObLogRouteService destroy begin");
timer_.destroy();
ls_route_timer_task_.destroy();
timer_id_ = -1;
if (-1 != tg_id_) {
TG_DESTROY(tg_id_);
tg_id_ = -1;
}
free_mem_();
ls_route_key_set_.destroy();
ls_router_map_.destroy();
systable_queryer_.destroy();
all_svr_cache_.destroy();
svr_blacklist_.destroy();
log_router_allocator_.destroy();
asyn_task_allocator_.destroy();
err_handler_ = NULL;
cluster_id_ = OB_INVALID_CLUSTER_ID;
self_tenant_id_ = OB_INVALID_TENANT_ID;
source_tenant_id_ = OB_INVALID_TENANT_ID;
background_refresh_time_sec_ = 0;
blacklist_survival_time_sec_ = 0;
blacklist_survival_time_upper_limit_min_ = 0;
blacklist_survival_time_penalty_period_min_ = 0;
blacklist_history_overdue_time_min_ = 0;
blacklist_history_clear_interval_min_ = 0;
is_tenant_mode_ = false;
is_inited_ = false;
LOG_INFO("ObLogRouteService destroy finish");
}
void ObLogRouteService::free_mem_()