[FEAT MERGE][CP] patch 423 sql exec feature to master branch

Co-authored-by: qingsuijiu <642782632@qq.com>
Co-authored-by: hwx65 <1780011298@qq.com>
This commit is contained in:
hezuojiao
2024-06-18 00:40:23 +00:00
committed by ob-robot
parent 77c6c17cb6
commit 6e705b1f1f
82 changed files with 2614 additions and 589 deletions

View File

@ -27,7 +27,7 @@ int ObSqlMemMgrProcessor::init(
int64_t cache_size,
const ObPhyOperatorType op_type,
const uint64_t op_id,
ObExecContext *exec_ctx)
ObSqlProfileExecInfo exec_info)
{
int ret = OB_SUCCESS;
bool tmp_enable_auto_mem_mgr = false;
@ -37,26 +37,18 @@ int ObSqlMemMgrProcessor::init(
tenant_id_ = tenant_id;
profile_.set_operator_type(op_type);
profile_.set_operator_id(op_id);
profile_.set_exec_ctx(exec_ctx);
profile_.set_exec_info(exec_info);
const int64_t DEFAULT_CACHE_SIZE = 2 * 1024 * 1024;
if (cache_size < 0) {
LOG_WARN("unexpected cache size got", K(lbt()), K(cache_size), K(op_id), K(op_type));
cache_size = DEFAULT_CACHE_SIZE;
}
if (OB_ISNULL(exec_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get exec ctx", K(ret));
} else if (OB_ISNULL(allocator)) {
if (OB_ISNULL(allocator)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get allocator", K(ret));
} else if (OB_FAIL(profile_.set_exec_info(*exec_ctx))) {
LOG_WARN("failed to set exec info", K(ret));
} else if (OB_FAIL(alloc_dir_id(dir_id_))) {
} else if (OB_NOT_NULL(sql_mem_mgr)) {
if (OB_NOT_NULL(exec_ctx->get_physical_plan_ctx())
&& OB_NOT_NULL(exec_ctx->get_physical_plan_ctx()->get_phy_plan())) {
profile_.disable_auto_mem_mgr_ = exec_ctx->get_physical_plan_ctx()->get_phy_plan()->is_disable_auto_memory_mgr();
}
profile_.disable_auto_mem_mgr_ = exec_info.get_disable_auto_mem_mgr();
if (sql_mem_mgr->enable_auto_memory_mgr()) {
tmp_enable_auto_mem_mgr = true;
if (profile_.get_auto_policy()) {
@ -74,7 +66,7 @@ int ObSqlMemMgrProcessor::init(
} else {
// first time to register
profile_.init(cache_size, OB_MALLOC_MIDDLE_BLOCK_SIZE);
LOG_DEBUG("trace register work area profile", K(profile_.get_cache_size()));
LOG_TRACE("trace register work area profile", K(profile_.get_cache_size()));
}
// 每次初始化都认为是未注册
profile_.set_expect_size(OB_INVALID_ID);
@ -104,7 +96,7 @@ int ObSqlMemMgrProcessor::init(
int64_t max_mem_size = MAX_SQL_MEM_SIZE;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObSqlWorkareaUtil::get_workarea_size(
profile_.get_work_area_type(), tenant_id_, exec_ctx, max_mem_size))) {
profile_.get_work_area_type(), tenant_id_, exec_info, max_mem_size))) {
LOG_WARN("failed to get workarea size", K(ret), K(tenant_id_), K(max_mem_size));
}
if (!profile_.get_auto_policy()) {
@ -191,6 +183,9 @@ int ObSqlMemMgrProcessor::update_used_mem_size(int64_t used_size)
{
int ret = OB_SUCCESS;
int64_t delta_size = used_size - profile_.mem_used_;
if (OB_NOT_NULL(op_monitor_info_)) {
op_monitor_info_->update_memory(delta_size);
}
if (delta_size > 0) {
if (OB_NOT_NULL(sql_mem_mgr_) && OB_NOT_NULL(mem_callback_)) {
mem_callback_->alloc(delta_size);
@ -224,7 +219,7 @@ int ObSqlMemMgrProcessor::try_upgrade_auto_mgr(ObIAllocator *allocator, int64_t
} else if (OB_FAIL(init(allocator, tenant_id_,
max_area_size * (EXTEND_RATIO + 100) /100,
profile_.get_operator_type(),
profile_.get_operator_id(), profile_.get_exec_ctx()))) {
profile_.get_operator_id(), profile_.get_exec_info()))) {
LOG_WARN("failed to upgrade sql memory manager", K(ret));
} else if (is_auto_mgr()) {
// 由于之前未注册,所以对内存统计不会反应到sql mem manager中,但现在注册了,则需要重新更新下内存值
@ -335,20 +330,42 @@ int ObSqlWorkareaUtil::get_workarea_size(const ObSqlWorkAreaType wa_type,
int64_t &value)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(exec_ctx)) {
if (OB_FAIL(get_workarea_size(wa_type, tenant_id, value, nullptr))) {
LOG_WARN("Fail to get workarea size", K(ret));
}
} else if (OB_FAIL(get_workarea_size(wa_type, tenant_id, value, exec_ctx->get_my_session()))) {
LOG_WARN("Fail to get workarea size", K(ret));
}
return ret;
}
if (NULL != exec_ctx) {
if (OB_ISNULL(exec_ctx->get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected exec_ctx or session", K(ret), K(wa_type), K(tenant_id), KP(exec_ctx));
int ObSqlWorkareaUtil::get_workarea_size(const ObSqlWorkAreaType wa_type,
const int64_t tenant_id,
ObSqlProfileExecInfo &exec_info,
int64_t &value)
{
int ret = OB_SUCCESS;
if (OB_FAIL(get_workarea_size(wa_type, tenant_id, value, exec_info.get_my_session()))) {
LOG_WARN("Fail to get workarea size", K(ret));
}
return ret;
}
int ObSqlWorkareaUtil::get_workarea_size(const ObSqlWorkAreaType wa_type,
const int64_t tenant_id,
int64_t &value,
ObSQLSessionInfo *session)
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(session)) {
if (HASH_WORK_AREA == wa_type) {
value = session->get_tenant_hash_area_size();
} else if (SORT_WORK_AREA == wa_type) {
value = session->get_tenant_sort_area_size();
} else {
if (HASH_WORK_AREA == wa_type) {
value = exec_ctx->get_my_session()->get_tenant_hash_area_size();
} else if (SORT_WORK_AREA == wa_type) {
value = exec_ctx->get_my_session()->get_tenant_sort_area_size();
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected status: workarea type", K(wa_type), K(tenant_id));
}
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected status: workarea type", K(wa_type), K(tenant_id));
}
} else {
ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
@ -361,15 +378,12 @@ int ObSqlWorkareaUtil::get_workarea_size(const ObSqlWorkAreaType wa_type,
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected status: workarea type", K(wa_type), K(tenant_id));
}
LOG_DEBUG("debug workarea size", K(value), K(tenant_id), K(lbt()));
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to init tenant config", K(tenant_id), K(ret));
}
}
LOG_DEBUG("debug workarea size", K(value), K(tenant_id), K(lbt()));
return ret;
}