From 93b3db604296e11db2fb5abef732db38e4baa346 Mon Sep 17 00:00:00 2001 From: zxlzxlzxlzxlzxl Date: Wed, 21 Jun 2023 20:12:25 +0000 Subject: [PATCH] [LogFetcher] Limit the memory hold by FetchLogARpcRes --- .../logfetcher/ob_log_fetch_log_rpc.h | 1 + src/logservice/logfetcher/ob_log_fetcher.cpp | 27 ++++++++++++++++++- src/logservice/logfetcher/ob_log_fetcher.h | 2 ++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/logservice/logfetcher/ob_log_fetch_log_rpc.h b/src/logservice/logfetcher/ob_log_fetch_log_rpc.h index dbb1b0d83..67a1bd905 100644 --- a/src/logservice/logfetcher/ob_log_fetch_log_rpc.h +++ b/src/logservice/logfetcher/ob_log_fetch_log_rpc.h @@ -501,6 +501,7 @@ class FetchLogARpcResultPool : public IFetchLogARpcResultPool { typedef common::ObSmallObjPool ResultPool; // 16M +public: static const int64_t DEFAULT_RESULT_POOL_BLOCK_SIZE = (1 << 24L) + (1 << 20L); public: diff --git a/src/logservice/logfetcher/ob_log_fetcher.cpp b/src/logservice/logfetcher/ob_log_fetcher.cpp index d0eb8987c..09865da91 100755 --- a/src/logservice/logfetcher/ob_log_fetcher.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher.cpp @@ -98,6 +98,12 @@ int ObLogFetcher::init( cfg_ = &cfg; // Before the LogFetcher module is initialized, the following configuration items need to be loaded configure(cfg); + int64_t cached_fetch_log_arpc_res_cnt = cfg.rpc_result_cached_count; + const int64_t MIN_FETCH_LOG_ARPC_RES_CNT = 4; + if (is_standby(log_fetcher_user)) { + cached_fetch_log_arpc_res_cnt = + suggest_cached_rpc_res_count_(MIN_FETCH_LOG_ARPC_RES_CNT, cached_fetch_log_arpc_res_cnt); + } const common::ObRegion region(cfg.region.str()); if (is_integrated_fetching_mode(fetching_mode) && OB_FAIL(log_route_service_.init( @@ -165,7 +171,7 @@ int ObLogFetcher::init( source_tenant_id, cfg.svr_stream_cached_count, cfg.fetch_stream_cached_count, - cfg.rpc_result_cached_count, + cached_fetch_log_arpc_res_cnt, rpc_, stream_worker_, progress_controller_, @@ -763,6 +769,25 @@ void ObLogFetcher::print_stat() } } +int ObLogFetcher::suggest_cached_rpc_res_count_(const int64_t min_res_cnt, + const int64_t max_res_cnt) +{ + const int64_t memory_limit = get_tenant_memory_limit(MTL_ID()); + // the maximum memory hold by rpc_result should be 1/32 of the memory limit. + const int64_t rpc_res_hold_max = (memory_limit >> 5); + int64_t rpc_res_cnt = rpc_res_hold_max / FetchLogARpcResultPool::DEFAULT_RESULT_POOL_BLOCK_SIZE; + if (rpc_res_cnt < min_res_cnt) { + rpc_res_cnt = min_res_cnt; + } + if (rpc_res_cnt > max_res_cnt) { + rpc_res_cnt = max_res_cnt; + } + + LOG_INFO("suggest fetchlog arpc cached rpc result count", K(memory_limit), + K(min_res_cnt), K(max_res_cnt), K(rpc_res_cnt)); + return rpc_res_cnt; +} + void ObLogFetcher::print_fetcher_stat_() { int ret = OB_SUCCESS; diff --git a/src/logservice/logfetcher/ob_log_fetcher.h b/src/logservice/logfetcher/ob_log_fetcher.h index 7379c29ab..35db806b4 100644 --- a/src/logservice/logfetcher/ob_log_fetcher.h +++ b/src/logservice/logfetcher/ob_log_fetcher.h @@ -256,6 +256,8 @@ public: virtual void print_stat(); private: + int suggest_cached_rpc_res_count_(const int64_t min_res_cnt, + const int64_t max_res_cnt); int init_self_addr_(); void print_fetcher_stat_(); int print_delay();