diff --git a/src/logservice/logfetcher/ob_log_fetcher.cpp b/src/logservice/logfetcher/ob_log_fetcher.cpp index 2f1b2b97b..099103675 100755 --- a/src/logservice/logfetcher/ob_log_fetcher.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher.cpp @@ -73,6 +73,10 @@ ObLogFetcher::~ObLogFetcher() destroy(); } +#ifdef ERRSIM +ERRSIM_POINT_DEF(LOG_FETCHER_LOG_EXT_HANDLER_INIT_FAIL); +ERRSIM_POINT_DEF(LOG_FETCHER_STREAM_WORKER_INIT_FAIL); +#endif int ObLogFetcher::init( const LogFetcherUser &log_fetcher_user, const int64_t cluster_id, @@ -101,6 +105,8 @@ int ObLogFetcher::init( } else { cfg_ = &cfg; // set self_tenant_id before suggest_cached_rpc_res_count + log_fetcher_user_ = log_fetcher_user; + fetching_mode_ = fetching_mode; self_tenant_id_ = self_tenant_id; // Before the LogFetcher module is initialized, the following configuration items need to be loaded configure(cfg); @@ -112,7 +118,7 @@ int ObLogFetcher::init( } const common::ObRegion region(cfg.region.str()); - if (is_integrated_fetching_mode(fetching_mode) && OB_FAIL(log_route_service_.init( + if (is_integrated_fetching_mode(fetching_mode_) && OB_FAIL(log_route_service_.init( proxy, region, cluster_id, @@ -135,7 +141,11 @@ int ObLogFetcher::init( LOG_ERROR("init progress controller fail", KR(ret)); } else if (OB_FAIL(large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024))) { LOG_ERROR("init large buffer pool failed", KR(ret)); - } else if (is_direct_fetching_mode(fetching_mode) && OB_FAIL(log_ext_handler_.init())) { +#ifdef ERRSIM + } else if (is_direct_fetching_mode(fetching_mode_) && OB_FAIL(LOG_FETCHER_LOG_EXT_HANDLER_INIT_FAIL)) { + LOG_ERROR("ERRSIM: LOG_FETCHER_LOG_EXT_HANDLER_INIT_FAIL", KR(ret)); +#endif + } else if (is_direct_fetching_mode(fetching_mode_) && OB_FAIL(log_ext_handler_.init())) { LOG_ERROR("init failed", KR(ret)); } else if (OB_FAIL(ls_fetch_mgr_.init( progress_controller_, @@ -147,16 +157,16 @@ int ObLogFetcher::init( LOG_ERROR("init_self_addr_ fail", KR(ret)); } else if (OB_FAIL(rpc_.init(cluster_id, self_tenant_id, cfg.io_thread_num, cfg))) { LOG_ERROR("init rpc handler fail", KR(ret)); - } else if (is_cdc(log_fetcher_user) && OB_FAIL(start_lsn_locator_.init( + } else if (is_cdc(log_fetcher_user_) && OB_FAIL(start_lsn_locator_.init( cfg.start_lsn_locator_thread_num, cfg.start_lsn_locator_locate_count, - fetching_mode, + fetching_mode_, archive_dest, cfg, rpc_, *err_handler))) { LOG_ERROR("init start log id locator fail", KR(ret)); } else if (OB_FAIL(idle_pool_.init( - log_fetcher_user, + log_fetcher_user_, cfg.idle_pool_thread_num, cfg, static_cast(this), @@ -169,6 +179,10 @@ int ObLogFetcher::init( ls_fetch_mgr_, *err_handler))) { LOG_ERROR("init dead pool fail", KR(ret)); +#ifdef ERRSIM + } else if (OB_FAIL(LOG_FETCHER_STREAM_WORKER_INIT_FAIL)) { + LOG_ERROR("ERRSIM: LOG_FETCHER_STREAM_WORKER_INIT_FAIL"); +#endif } else if (OB_FAIL(stream_worker_.init(cfg.stream_worker_thread_num, cfg.timer_task_count_upper_limit, static_cast(this), @@ -187,11 +201,9 @@ int ObLogFetcher::init( progress_controller_, log_handler))) { } else { - log_fetcher_user_ = log_fetcher_user; cluster_id_ = cluster_id; source_tenant_id_ = source_tenant_id; is_loading_data_dict_baseline_data_ = is_loading_data_dict_baseline_data; - fetching_mode_ = fetching_mode; archive_dest_ = archive_dest; paused_ = false; @@ -292,26 +304,24 @@ int ObLogFetcher::start() void ObLogFetcher::stop() { - if (OB_LIKELY(is_inited_)) { - stop_flag_ = true; + stop_flag_ = true; - LOG_INFO("LogFetcher stop begin"); - stream_worker_.stop(); - dead_pool_.stop(); - idle_pool_.stop(); - if (is_cdc(log_fetcher_user_)) { - start_lsn_locator_.stop(); - } - - if (is_integrated_fetching_mode(fetching_mode_)) { - log_route_service_.stop(); - } - if (is_direct_fetching_mode(fetching_mode_)) { - log_ext_handler_.stop(); - } - - LOG_INFO("LogFetcher stop success"); + LOG_INFO("LogFetcher stop begin"); + stream_worker_.stop(); + dead_pool_.stop(); + idle_pool_.stop(); + if (is_cdc(log_fetcher_user_)) { + start_lsn_locator_.stop(); } + + if (is_integrated_fetching_mode(fetching_mode_)) { + log_route_service_.stop(); + } + if (is_direct_fetching_mode(fetching_mode_)) { + log_ext_handler_.stop(); + } + + LOG_INFO("LogFetcher stop success"); } void ObLogFetcher::pause() diff --git a/src/logservice/logfetcher/ob_log_fetcher_dead_pool.cpp b/src/logservice/logfetcher/ob_log_fetcher_dead_pool.cpp index 72913021f..2f161536a 100644 --- a/src/logservice/logfetcher/ob_log_fetcher_dead_pool.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher_dead_pool.cpp @@ -112,6 +112,9 @@ int ObLogFetcherDeadPool::push(LSFetchCtx *task) return ret; } +#ifdef ERRSIM +ERRSIM_POINT_DEF(LOG_FETCHER_DEAD_POOL_START_FAIL); +#endif int ObLogFetcherDeadPool::start() { int ret = OB_SUCCESS; @@ -119,6 +122,10 @@ int ObLogFetcherDeadPool::start() if (OB_UNLIKELY(! inited_)) { LOG_ERROR("not inited"); ret = OB_NOT_INIT; +#ifdef ERRSIM + } else if (OB_FAIL(LOG_FETCHER_DEAD_POOL_START_FAIL)) { + LOG_ERROR("ERRSIM: LOG_FETCHER_DEAD_POOL_START_FAIL"); +#endif } else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) { LOG_WARN("TG_SET_HANDLER_AND_START failed", KR(ret), K(tg_id_)); } else { diff --git a/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp b/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp index b27ab0b5d..473fd2dbc 100644 --- a/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp @@ -125,6 +125,9 @@ int ObLogFetcherIdlePool::push(LSFetchCtx *task) return ret; } +#ifdef ERRSIM +ERRSIM_POINT_DEF(LOG_FETCHER_IDLE_POOL_START_FAIL); +#endif int ObLogFetcherIdlePool::start() { int ret = OB_SUCCESS; @@ -132,6 +135,10 @@ int ObLogFetcherIdlePool::start() if (OB_UNLIKELY(! inited_)) { LOG_ERROR("not inited"); ret = OB_NOT_INIT; +#ifdef ERRSIM + } else if (OB_FAIL(LOG_FETCHER_IDLE_POOL_START_FAIL)) { + LOG_ERROR("ERRSIM: LOG_FETCHER_IDLE_POOL_START_FAIL"); +#endif } else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) { LOG_WARN("TG_SET_HANDLER_AND_START failed", KR(ret), K(tg_id_)); } else { diff --git a/src/logservice/logfetcher/ob_ls_worker.cpp b/src/logservice/logfetcher/ob_ls_worker.cpp index 0fe83a5f2..242fb46f9 100644 --- a/src/logservice/logfetcher/ob_ls_worker.cpp +++ b/src/logservice/logfetcher/ob_ls_worker.cpp @@ -121,6 +121,10 @@ void ObLSWorker::destroy() LOG_INFO("destroy stream worker succ"); } +#ifdef ERRSIM +ERRSIM_POINT_DEF(LOG_FETCHER_LSW_TIMER_START_FAIL); +ERRSIM_POINT_DEF(LOG_FETCHER_LSW_HANDLER_START_FAIL); +#endif int ObLSWorker::start() { int ret = OB_SUCCESS; @@ -128,8 +132,16 @@ int ObLSWorker::start() if (OB_UNLIKELY(! inited_)) { LOG_ERROR("not init", K(inited_)); ret = OB_NOT_INIT; +#ifdef ERRSIM + } else if (OB_FAIL(LOG_FETCHER_LSW_TIMER_START_FAIL)) { + LOG_ERROR("ERRSIM: LOG_FETCHER_LSW_TIMER_START_FAIL"); +#endif } else if (OB_FAIL(TG_START(timer_id_))) { LOG_ERROR("start timer thread fail", KR(ret)); +#ifdef ERRSIM + } else if (OB_FAIL(LOG_FETCHER_LSW_HANDLER_START_FAIL)) { + LOG_ERROR("ERRSIM: LOG_FETCHER_LSW_HANDLER_START_FAIL"); +#endif } else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) { LOG_WARN("TG_SET_HANDLER_AND_START failed", KR(ret), K(tg_id_)); } else { diff --git a/src/logservice/logrouteservice/ob_log_route_service.cpp b/src/logservice/logrouteservice/ob_log_route_service.cpp index fbaae36d4..1a60e59c4 100755 --- a/src/logservice/logrouteservice/ob_log_route_service.cpp +++ b/src/logservice/logrouteservice/ob_log_route_service.cpp @@ -17,6 +17,7 @@ #include "share/rc/ob_tenant_base.h" #include "share/ob_thread_mgr.h" // TG* #include "ob_ls_log_stat_info.h" // LogStatRecordArray +#include "lib/utility/ob_tracepoint.h" using namespace oceanbase::share; using namespace oceanbase::common; @@ -58,6 +59,11 @@ ObLogRouteService::~ObLogRouteService() destroy(); } +#ifdef ERRSIM +ERRSIM_POINT_DEF(LOG_ROUTE_TIMER_INIT_FAIL); +ERRSIM_POINT_DEF(LOG_ROUTE_HANDLER_INIT_FAIL); +ERRSIM_POINT_DEF(LOG_ROUTE_HANDLER_START_FAIL); +#endif int ObLogRouteService::init(ObISQLClient *proxy, const common::ObRegion &prefer_region, const int64_t cluster_id, @@ -108,10 +114,22 @@ int ObLogRouteService::init(ObISQLClient *proxy, all_server_cache_update_interval_sec, all_zone_cache_update_interval_sec))) { LOG_WARN("all_svr_cache_ init failed", KR(ret), K(is_tenant_mode), K(prefer_region), K(all_server_cache_update_interval_sec), K(all_zone_cache_update_interval_sec)); +#ifdef ERRSIM + } else if (OB_FAIL(LOG_ROUTE_TIMER_INIT_FAIL)) { + LOG_ERROR("ERRSIM: LOG_ROUTE_TIMER_INIT_FAIL"); +#endif } else if (OB_FAIL(timer_.init("LogRouter"))) { LOG_ERROR("fail to init itable gc timer", K(ret)); +#ifdef ERRSIM + } else if (OB_FAIL(LOG_ROUTE_HANDLER_INIT_FAIL)) { + LOG_ERROR("ERRSIM: LOG_ROUTE_HANDLER_INIT_FAIL"); +#endif } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::LogRouteService, tg_id_))) { LOG_ERROR("TG_CREATE failed", KR(ret)); +#ifdef ERRSIM + } else if (OB_FAIL(LOG_ROUTE_HANDLER_START_FAIL)) { + LOG_ERROR("ERRSIM: LOG_ROUTE_HANDLER_START_FAIL"); +#endif } else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) { LOG_WARN("TG_SET_HANDLER_AND_START failed", KR(ret), K(tg_id_)); } else { @@ -170,11 +188,11 @@ void ObLogRouteService::stop() void ObLogRouteService::wait() { - if (IS_INIT) { - LOG_INFO("ObLogRouteService wait begin"); - timer_.wait(); - int64_t num = 0; - int ret = OB_SUCCESS; + LOG_INFO("ObLogRouteService wait begin"); + + int ret = OB_SUCCESS; + int64_t num = 0; + if (-1 != tg_id_) { while (OB_SUCC(TG_GET_QUEUE_NUM(tg_id_, num)) && num > 0) { PAUSE(); } @@ -183,47 +201,47 @@ void ObLogRouteService::wait() } TG_STOP(tg_id_); TG_WAIT(tg_id_); - LOG_INFO("ObLogRouteService wait finish"); } + timer_.wait(); + + LOG_INFO("ObLogRouteService wait finish"); } void ObLogRouteService::destroy() { - if (IS_INIT) { - LOG_INFO("ObLogRouteService destroy begin"); - timer_.destroy(); - ls_route_timer_task_.destroy(); - timer_id_ = -1; - if (-1 != tg_id_) { - TG_DESTROY(tg_id_); - tg_id_ = -1; - } - free_mem_(); - ls_route_key_set_.destroy(); - ls_router_map_.destroy(); - systable_queryer_.destroy(); - all_svr_cache_.destroy(); - svr_blacklist_.destroy(); - - log_router_allocator_.destroy(); - asyn_task_allocator_.destroy(); - - err_handler_ = NULL; - - cluster_id_ = OB_INVALID_CLUSTER_ID; - self_tenant_id_ = OB_INVALID_TENANT_ID; - source_tenant_id_ = OB_INVALID_TENANT_ID; - background_refresh_time_sec_ = 0; - blacklist_survival_time_sec_ = 0; - blacklist_survival_time_upper_limit_min_ = 0; - blacklist_survival_time_penalty_period_min_ = 0; - blacklist_history_overdue_time_min_ = 0; - blacklist_history_clear_interval_min_ = 0; - - is_tenant_mode_ = false; - is_inited_ = false; - LOG_INFO("ObLogRouteService destroy finish"); + LOG_INFO("ObLogRouteService destroy begin"); + timer_.destroy(); + ls_route_timer_task_.destroy(); + timer_id_ = -1; + if (-1 != tg_id_) { + TG_DESTROY(tg_id_); + tg_id_ = -1; } + free_mem_(); + ls_route_key_set_.destroy(); + ls_router_map_.destroy(); + systable_queryer_.destroy(); + all_svr_cache_.destroy(); + svr_blacklist_.destroy(); + + log_router_allocator_.destroy(); + asyn_task_allocator_.destroy(); + + err_handler_ = NULL; + + cluster_id_ = OB_INVALID_CLUSTER_ID; + self_tenant_id_ = OB_INVALID_TENANT_ID; + source_tenant_id_ = OB_INVALID_TENANT_ID; + background_refresh_time_sec_ = 0; + blacklist_survival_time_sec_ = 0; + blacklist_survival_time_upper_limit_min_ = 0; + blacklist_survival_time_penalty_period_min_ = 0; + blacklist_history_overdue_time_min_ = 0; + blacklist_history_clear_interval_min_ = 0; + + is_tenant_mode_ = false; + is_inited_ = false; + LOG_INFO("ObLogRouteService destroy finish"); } void ObLogRouteService::free_mem_()