[LogFetcher] Limit the memory hold by FetchLogARpcRes
This commit is contained in:
parent
1a09fe9ab3
commit
93b3db6042
@ -501,6 +501,7 @@ class FetchLogARpcResultPool : public IFetchLogARpcResultPool
|
||||
{
|
||||
typedef common::ObSmallObjPool<FetchLogARpcResult> ResultPool;
|
||||
// 16M
|
||||
public:
|
||||
static const int64_t DEFAULT_RESULT_POOL_BLOCK_SIZE = (1 << 24L) + (1 << 20L);
|
||||
|
||||
public:
|
||||
|
@ -98,6 +98,12 @@ int ObLogFetcher::init(
|
||||
cfg_ = &cfg;
|
||||
// Before the LogFetcher module is initialized, the following configuration items need to be loaded
|
||||
configure(cfg);
|
||||
int64_t cached_fetch_log_arpc_res_cnt = cfg.rpc_result_cached_count;
|
||||
const int64_t MIN_FETCH_LOG_ARPC_RES_CNT = 4;
|
||||
if (is_standby(log_fetcher_user)) {
|
||||
cached_fetch_log_arpc_res_cnt =
|
||||
suggest_cached_rpc_res_count_(MIN_FETCH_LOG_ARPC_RES_CNT, cached_fetch_log_arpc_res_cnt);
|
||||
}
|
||||
const common::ObRegion region(cfg.region.str());
|
||||
|
||||
if (is_integrated_fetching_mode(fetching_mode) && OB_FAIL(log_route_service_.init(
|
||||
@ -165,7 +171,7 @@ int ObLogFetcher::init(
|
||||
source_tenant_id,
|
||||
cfg.svr_stream_cached_count,
|
||||
cfg.fetch_stream_cached_count,
|
||||
cfg.rpc_result_cached_count,
|
||||
cached_fetch_log_arpc_res_cnt,
|
||||
rpc_,
|
||||
stream_worker_,
|
||||
progress_controller_,
|
||||
@ -763,6 +769,25 @@ void ObLogFetcher::print_stat()
|
||||
}
|
||||
}
|
||||
|
||||
int ObLogFetcher::suggest_cached_rpc_res_count_(const int64_t min_res_cnt,
|
||||
const int64_t max_res_cnt)
|
||||
{
|
||||
const int64_t memory_limit = get_tenant_memory_limit(MTL_ID());
|
||||
// the maximum memory hold by rpc_result should be 1/32 of the memory limit.
|
||||
const int64_t rpc_res_hold_max = (memory_limit >> 5);
|
||||
int64_t rpc_res_cnt = rpc_res_hold_max / FetchLogARpcResultPool::DEFAULT_RESULT_POOL_BLOCK_SIZE;
|
||||
if (rpc_res_cnt < min_res_cnt) {
|
||||
rpc_res_cnt = min_res_cnt;
|
||||
}
|
||||
if (rpc_res_cnt > max_res_cnt) {
|
||||
rpc_res_cnt = max_res_cnt;
|
||||
}
|
||||
|
||||
LOG_INFO("suggest fetchlog arpc cached rpc result count", K(memory_limit),
|
||||
K(min_res_cnt), K(max_res_cnt), K(rpc_res_cnt));
|
||||
return rpc_res_cnt;
|
||||
}
|
||||
|
||||
void ObLogFetcher::print_fetcher_stat_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -256,6 +256,8 @@ public:
|
||||
virtual void print_stat();
|
||||
|
||||
private:
|
||||
int suggest_cached_rpc_res_count_(const int64_t min_res_cnt,
|
||||
const int64_t max_res_cnt);
|
||||
int init_self_addr_();
|
||||
void print_fetcher_stat_();
|
||||
int print_delay();
|
||||
|
Loading…
x
Reference in New Issue
Block a user