Adjust the decode resource pool to init based on tenant memory.
This commit is contained in:
@ -1184,6 +1184,8 @@ int ObMultiTenant::update_tenant_memory(const ObUnitInfoGetter::ObTenantConfig &
|
||||
LOG_WARN("fail to update tenant memory", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(update_tenant_freezer_mem_limit(tenant_id, memory_size, allowed_mem_limit))) {
|
||||
LOG_WARN("fail to update_tenant_freezer_mem_limit", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(update_tenant_decode_resource(tenant_id))) {
|
||||
LOG_WARN("fail to update_tenant_decode_resource", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(update_throttle_config_(tenant_id))) {
|
||||
LOG_WARN("update throttle config failed", K(ret), K(tenant_id));
|
||||
} else if (FALSE_IT(tenant->set_unit_memory_size(allowed_mem_limit))) {
|
||||
@ -1447,6 +1449,21 @@ int ObMultiTenant::update_tenant_freezer_mem_limit(const uint64_t tenant_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMultiTenant::update_tenant_decode_resource(const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
ObDecodeResourcePool * decode_resource_pool = nullptr;
|
||||
if (tenant_id != MTL_ID() && OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("switch tenant failed", K(ret), K(tenant_id));
|
||||
} else if (FALSE_IT(decode_resource_pool = MTL(ObDecodeResourcePool *))) {
|
||||
} else if (OB_ISNULL(decode_resource_pool)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else if (OB_FAIL(decode_resource_pool->reload_config())) {
|
||||
LOG_WARN("fail to update tenant decode resource", K(ret), K(tenant_id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMultiTenant::get_tenant_unit(const uint64_t tenant_id, ObUnitInfoGetter::ObTenantConfig &unit)
|
||||
{
|
||||
|
@ -127,6 +127,7 @@ public:
|
||||
int update_tenant_freezer_mem_limit(const uint64_t tenant_id,
|
||||
const int64_t tenant_min_mem,
|
||||
const int64_t tenant_max_mem);
|
||||
int update_tenant_decode_resource(const uint64_t tenant_id);
|
||||
|
||||
inline TenantList &get_tenant_list();
|
||||
int for_each(std::function<int(ObTenant &)> func);
|
||||
|
@ -667,6 +667,8 @@ int ObTenantNodeBalancer::update_tenant_memory(const obrpc::ObTenantMemoryArg &t
|
||||
LOG_WARN("failed to update tenant memory", K(ret), K(tenant_id), K(memory_size));
|
||||
} else if (OB_FAIL(omt_->update_tenant_freezer_mem_limit(tenant_id, unit.config_.memory_size(), allowed_mem_limit))) {
|
||||
LOG_WARN("set_tenant_freezer_mem_limit failed", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(omt_->update_tenant_decode_resource(tenant_id))) {
|
||||
LOG_WARN("update_tenant_decode_resource failed", K(ret), K(tenant_id));
|
||||
} else {
|
||||
refresh_interval_ = refresh_interval * 1000L * 1000L;
|
||||
LOG_INFO("succ to admin update tenant memory", K(tenant_id), K(memory_size));
|
||||
|
@ -63,23 +63,24 @@ int ObDecodeResourcePool::init() {
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_inited_) {
|
||||
uint64_t tenant_id = MTL_ID();
|
||||
uint64_t adaptive_factor = get_adaptive_factor(tenant_id);
|
||||
//empiric value (raw, dict, const):(rle, int_diff):else = 4:2:1
|
||||
if(OB_FAIL(raw_pool_.init(MAX_DECODER_CNT, "RawPl", tenant_id))
|
||||
|| OB_FAIL(dict_pool_.init(MAX_DECODER_CNT, "DictPl", tenant_id))
|
||||
|| OB_FAIL(rle_pool_.init(MID_DECODER_CNT, "RlePl", tenant_id))
|
||||
|| OB_FAIL(const_pool_.init(MAX_DECODER_CNT, "ConstPl", tenant_id))
|
||||
|| OB_FAIL(int_diff_pool_.init(MID_DECODER_CNT, "IntDiffPl", tenant_id))
|
||||
|| OB_FAIL(str_diff_pool_.init(MIN_DECODER_CNT, "StrDiffPl", tenant_id))
|
||||
|| OB_FAIL(hex_str_pool_.init(MIN_DECODER_CNT, "HexStrPl", tenant_id))
|
||||
|| OB_FAIL(str_prefix_pool_.init(MIN_DECODER_CNT, "StrPrefixPl", tenant_id))
|
||||
|| OB_FAIL(column_equal_pool_.init(MIN_DECODER_CNT, "ColEqualPl", tenant_id))
|
||||
|| OB_FAIL(column_substr_pool_.init(MIN_DECODER_CNT, "ColSubStrPl", tenant_id))
|
||||
|| OB_FAIL(ctx_block_pool_.init(MAX_CTX_BLOCK_CNT, "CtxBlockPl", tenant_id)
|
||||
|| OB_FAIL(cs_integer_pool_.init(MAX_CS_DECODER_CNT, "CsIntPl", tenant_id))
|
||||
|| OB_FAIL(cs_string_pool_.init(MAX_CS_DECODER_CNT, "CsStrPl", tenant_id))
|
||||
|| OB_FAIL(cs_int_dict_pool_.init(MAX_CS_DECODER_CNT, "CsDictPl", tenant_id))
|
||||
|| OB_FAIL(cs_str_dict_pool_.init(MAX_CS_DECODER_CNT, "CsDictPl", tenant_id))
|
||||
|| OB_FAIL(cs_ctx_block_pool_.init(MAX_CS_CTX_BLOCK_CNT, "CsCtxBlockPl", tenant_id))
|
||||
if(OB_FAIL(raw_pool_.init(MAX_DECODER_CNT * adaptive_factor, "RawPl", tenant_id))
|
||||
|| OB_FAIL(dict_pool_.init(MAX_DECODER_CNT * adaptive_factor, "DictPl", tenant_id))
|
||||
|| OB_FAIL(rle_pool_.init(MID_DECODER_CNT * adaptive_factor, "RlePl", tenant_id))
|
||||
|| OB_FAIL(const_pool_.init(MAX_DECODER_CNT * adaptive_factor, "ConstPl", tenant_id))
|
||||
|| OB_FAIL(int_diff_pool_.init(MID_DECODER_CNT * adaptive_factor, "IntDiffPl", tenant_id))
|
||||
|| OB_FAIL(str_diff_pool_.init(MIN_DECODER_CNT * adaptive_factor, "StrDiffPl", tenant_id))
|
||||
|| OB_FAIL(hex_str_pool_.init(MIN_DECODER_CNT * adaptive_factor, "HexStrPl", tenant_id))
|
||||
|| OB_FAIL(str_prefix_pool_.init(MIN_DECODER_CNT * adaptive_factor, "StrPrefixPl", tenant_id))
|
||||
|| OB_FAIL(column_equal_pool_.init(MIN_DECODER_CNT * adaptive_factor, "ColEqualPl", tenant_id))
|
||||
|| OB_FAIL(column_substr_pool_.init(MIN_DECODER_CNT * adaptive_factor, "ColSubStrPl", tenant_id))
|
||||
|| OB_FAIL(ctx_block_pool_.init(MAX_CTX_BLOCK_CNT * adaptive_factor, "CtxBlockPl", tenant_id)
|
||||
|| OB_FAIL(cs_integer_pool_.init(MAX_CS_DECODER_CNT * adaptive_factor, "CsIntPl", tenant_id))
|
||||
|| OB_FAIL(cs_string_pool_.init(MAX_CS_DECODER_CNT * adaptive_factor, "CsStrPl", tenant_id))
|
||||
|| OB_FAIL(cs_int_dict_pool_.init(MAX_CS_DECODER_CNT * adaptive_factor, "CsDictPl", tenant_id))
|
||||
|| OB_FAIL(cs_str_dict_pool_.init(MAX_CS_DECODER_CNT * adaptive_factor, "CsDictPl", tenant_id))
|
||||
|| OB_FAIL(cs_ctx_block_pool_.init(MAX_CS_CTX_BLOCK_CNT * adaptive_factor, "CsCtxBlockPl", tenant_id))
|
||||
)) {
|
||||
STORAGE_LOG(WARN, "failed to init decode resource pool", K(ret));
|
||||
} else {
|
||||
@ -89,6 +90,55 @@ int ObDecodeResourcePool::init() {
|
||||
return ret;
|
||||
}
|
||||
|
||||
uint64_t ObDecodeResourcePool::get_adaptive_factor(const uint64_t tenant_id) const
|
||||
{
|
||||
uint64_t adaptive_factor = 0;
|
||||
const int64_t tenant_mem_limit = lib::get_tenant_memory_limit(tenant_id);
|
||||
if (is_sys_tenant(tenant_id) || is_server_tenant(tenant_id) || is_virtual_tenant_id(tenant_id)) {
|
||||
adaptive_factor = MIN_FACTOR;
|
||||
} else {
|
||||
// Each 1G corresponds to 5.5M, and the minimum(5.5M) and maximum(275.5M) limits are set.
|
||||
adaptive_factor = tenant_mem_limit / (1024 * 1024 * 1024);
|
||||
adaptive_factor = MAX(MIN_FACTOR, adaptive_factor);
|
||||
adaptive_factor = MIN(MAX_FACTOR, adaptive_factor);
|
||||
}
|
||||
return adaptive_factor;
|
||||
}
|
||||
|
||||
int ObDecodeResourcePool::reload_config()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t tenant_id = MTL_ID();
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "not init", KR(ret), K_(is_inited), K(tenant_id));
|
||||
} else {
|
||||
const uint64_t adaptive_factor = get_adaptive_factor(tenant_id);
|
||||
if (raw_pool_.get_fixed_count() == MAX_DECODER_CNT * adaptive_factor) {
|
||||
// not change do nothing
|
||||
} else {
|
||||
raw_pool_.set_fixed_count(MAX_DECODER_CNT * adaptive_factor);
|
||||
dict_pool_.set_fixed_count(MAX_DECODER_CNT * adaptive_factor);
|
||||
rle_pool_.set_fixed_count(MID_DECODER_CNT * adaptive_factor);
|
||||
const_pool_.set_fixed_count(MAX_DECODER_CNT * adaptive_factor);
|
||||
int_diff_pool_.set_fixed_count(MID_DECODER_CNT * adaptive_factor);
|
||||
str_diff_pool_.set_fixed_count(MIN_DECODER_CNT * adaptive_factor);
|
||||
hex_str_pool_.set_fixed_count(MIN_DECODER_CNT * adaptive_factor);
|
||||
str_prefix_pool_.set_fixed_count(MIN_DECODER_CNT * adaptive_factor);
|
||||
column_equal_pool_.set_fixed_count(MIN_DECODER_CNT * adaptive_factor);
|
||||
column_substr_pool_.set_fixed_count(MIN_DECODER_CNT * adaptive_factor);
|
||||
ctx_block_pool_.set_fixed_count(MAX_CTX_BLOCK_CNT * adaptive_factor);
|
||||
//cs
|
||||
cs_integer_pool_.set_fixed_count(MAX_CS_DECODER_CNT * adaptive_factor);
|
||||
cs_string_pool_.set_fixed_count(MAX_CS_DECODER_CNT * adaptive_factor);
|
||||
cs_int_dict_pool_.set_fixed_count(MAX_CS_DECODER_CNT * adaptive_factor);
|
||||
cs_str_dict_pool_.set_fixed_count(MAX_CS_DECODER_CNT * adaptive_factor);
|
||||
cs_ctx_block_pool_.set_fixed_count(MAX_CS_CTX_BLOCK_CNT * adaptive_factor);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template<>
|
||||
ObSmallObjPool<ObRawDecoder>& ObDecodeResourcePool::get_pool()
|
||||
{
|
||||
|
@ -51,15 +51,18 @@ public:
|
||||
static int mtl_init(ObDecodeResourcePool *&ctx_array_pool);
|
||||
void destroy();
|
||||
int init();
|
||||
int reload_config();
|
||||
template <typename T>
|
||||
int alloc(T *&item);
|
||||
template <typename T>
|
||||
int free(T *item);
|
||||
|
||||
private:
|
||||
uint64_t get_adaptive_factor(const uint64_t tenant_id) const;
|
||||
template<typename T>
|
||||
ObSmallObjPool<T> &get_pool();
|
||||
private:
|
||||
static const uint64_t MIN_FACTOR = 1; // 5.5M
|
||||
static const uint64_t MAX_FACTOR = 50; // 275.5M
|
||||
static const int64_t MAX_DECODER_CNT = 4096;
|
||||
static const int64_t MID_DECODER_CNT = MAX_DECODER_CNT / 2;
|
||||
static const int64_t MIN_DECODER_CNT = MAX_DECODER_CNT / 4;
|
||||
|
Reference in New Issue
Block a user