[FEAT MERGE] del_tenant_memleak branch

Co-authored-by: HaHaJeff <jeffzhouhhh@gmail.com>
This commit is contained in:
obdev
2023-01-28 19:29:23 +08:00
committed by ob-robot
parent e3b89cd741
commit ba19ba90e0
179 changed files with 3235 additions and 2291 deletions

View File

@ -23,6 +23,7 @@
#include "share/ob_rpc_struct.h"
#include "share/ob_truncated_string.h"
#include "share/schema/ob_schema_getter_guard.h"
#include "lib/rc/ob_rc.h"
#include "observer/ob_server_struct.h"
#include "sql/plan_cache/ob_ps_cache_callback.h"
#include "sql/plan_cache/ob_ps_sql_utils.h"
@ -36,7 +37,6 @@
#include "pl/ob_pl.h"
#include "pl/ob_pl_package.h"
#include "observer/ob_req_time_service.h"
#include "sql/plan_cache/ob_plan_cache_manager.h"
#include "pl/pl_cache/ob_pl_cache_mgr.h"
using namespace oceanbase::common;
@ -90,7 +90,7 @@ struct ObGetPlanIdBySqlIdOp
struct ObGetKVEntryByNsOp : public ObKVEntryTraverseOp
{
explicit ObGetKVEntryByNsOp(const ObLibCacheNameSpace ns,
explicit ObGetKVEntryByNsOp(const ObLibCacheNameSpace ns,
LCKeyValueArray *key_val_list,
const CacheRefHandleID ref_handle)
: ObKVEntryTraverseOp(key_val_list, ref_handle),
@ -301,7 +301,6 @@ ObPlanCache::ObPlanCache()
mem_used_(0),
bucket_num_(0),
inner_allocator_(),
ref_count_(0),
ref_handle_mgr_(),
pcm_(NULL),
destroy_(0)
@ -317,6 +316,7 @@ void ObPlanCache::destroy()
{
observer::ObReqTimeGuard req_timeinfo_guard;
if (inited_) {
TG_DESTROY(tg_id_);
if (OB_SUCCESS != (cache_evict_all_obj())) {
SQL_PC_LOG(WARN, "fail to evict all lib cache cache");
}
@ -324,12 +324,11 @@ void ObPlanCache::destroy()
}
}
int ObPlanCache::init(int64_t hash_bucket, common::ObAddr addr,
uint64_t tenant_id,
PlanCacheMap* pcm)
int ObPlanCache::init(int64_t hash_bucket, uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (!inited_) {
ObPCMemPctConf default_conf;
if (OB_FAIL(co_mgr_.init(hash_bucket, tenant_id))) {
SQL_PC_LOG(WARN, "failed to init lib cache manager", K(ret));
} else if (OB_FAIL(cache_key_node_map_.create(hash::cal_next_prime(hash_bucket),
@ -337,17 +336,25 @@ int ObPlanCache::init(int64_t hash_bucket, common::ObAddr addr,
ObModIds::OB_HASH_NODE_PLAN_CACHE,
tenant_id))) {
SQL_PC_LOG(WARN, "failed to init PlanCache", K(ret));
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::PlanCacheEvict, tg_id_))) {
LOG_WARN("failed to create tg", K(ret));
} else if (OB_FAIL(TG_START(tg_id_))) {
LOG_WARN("failed to start tg", K(ret));
} else if (OB_FAIL(TG_SCHEDULE(tg_id_, evict_task_, GCONF.plan_cache_evict_interval, true))) {
LOG_WARN("failed to schedule refresh task", K(ret));
} else if (OB_FAIL(set_mem_conf(default_conf))) {
LOG_WARN("fail to set plan cache memory conf", K(ret));
} else {
evict_task_.plan_cache_ = this;
cn_factory_.set_lib_cache(this);
ObMemAttr attr = get_mem_attr();
attr.tenant_id_ = tenant_id;
inner_allocator_.set_attr(attr);
set_host(addr);
set_host(const_cast<ObAddr &>(GCTX.self_addr()));
bucket_num_ = hash::cal_next_prime(hash_bucket);
tenant_id_ = tenant_id;
ref_handle_mgr_.set_tenant_id(tenant_id_);
inited_ = true;
pcm_ = pcm;
}
}
return ret;
@ -360,7 +367,7 @@ int ObPlanCache::check_after_get_plan(int tmp_ret,
int ret = tmp_ret;
ObPhysicalPlan *plan = NULL;
bool need_late_compilation = false;
ObJITEnableMode jit_mode = OFF;
ObJITEnableMode jit_mode = ObJITEnableMode::OFF;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
ObPlanCacheCtx &pc_ctx = static_cast<ObPlanCacheCtx&>(ctx);
if (cache_obj != NULL && ObLibCacheNameSpace::NS_CRSR == cache_obj->get_ns()) {
@ -399,7 +406,7 @@ int ObPlanCache::check_after_get_plan(int tmp_ret,
}
}
if (OB_SUCC(ret)) {
if (AUTO == jit_mode && // only use late compilation when jit_mode is auto
if (ObJITEnableMode::AUTO == jit_mode && // only use late compilation when jit_mode is auto
OB_FAIL(need_late_compile(plan, need_late_compilation))) {
LOG_WARN("failed to check for late compilation", K(ret));
} else {
@ -433,7 +440,7 @@ int ObPlanCache::get_plan(common::ObIAllocator &allocator,
ObCacheObjGuard& guard)
{
int ret = OB_SUCCESS;
FLTSpanGuard(pc_get_plan);
ObGlobalReqTimeService::check_req_timeinfo();
pc_ctx.handle_id_ = guard.ref_handle_;
@ -684,6 +691,9 @@ int ObPlanCache::add_cache_obj(ObILibCacheCtx &ctx,
if (OB_ISNULL(key) || OB_ISNULL(cache_obj)) {
ret = OB_INVALID_ARGUMENT;
SQL_PC_LOG(WARN, "invalid null argument", K(ret), K(key), K(cache_obj));
} else if (get_tenant_id() != cache_obj->get_tenant_id()) {
ret = OB_ERR_UNEXPECTED;
SQL_PC_LOG(ERROR, "unmatched tenant_id", K(ret), K(get_tenant_id()), K(cache_obj->get_tenant_id()));
} else if (OB_FAIL(get_value(key, cache_node, w_ref_lock /*write locked*/))) {
ret = OB_ERR_UNEXPECTED;
SQL_PC_LOG(DEBUG, "failed to get cache node from lib cache by key", K(ret));
@ -961,9 +971,10 @@ int ObPlanCache::cache_evict_plan_by_sql_id(uint64_t db_id, common::ObString sql
}
int ObPlanCache::evict_plan_by_table_name(uint64_t tenant_id, uint64_t database_id, ObString tab_name)
int ObPlanCache::evict_plan_by_table_name(uint64_t database_id, ObString tab_name)
{
int ret = OB_SUCCESS;
observer::ObReqTimeGuard req_timeinfo_guard;
ObGlobalReqTimeService::check_req_timeinfo();
SQL_PC_LOG(DEBUG, "cache evict plan by table name start");
LCKeyValueArray to_evict_keys;
@ -1169,37 +1180,6 @@ int ObPlanCache::remove_cache_node(ObILibCacheKey *key)
return ret;
}
int64_t ObPlanCache::inc_ref_count()
{
int64_t ret = 0;
ret = ATOMIC_AAF((uint64_t *)&ref_count_, 1);
return ret;
}
void ObPlanCache::dec_ref_count()
{
int64_t ref_count = ATOMIC_SAF((uint64_t *)&ref_count_, 1);
int64_t destroy = ATOMIC_LOAD((uint64_t *)&destroy_);
// when plan cache is destorying, we needs to clean all its plan and, at this time,
// have to refers the plan cache that is destorying. Therefore, we needs to record
// its status; Otherwise, the plan cache will be deconstructed many times, which
// will result in some memory leak.
if (ref_count > 0) {
} else if (0 == destroy && 0 == ref_count) {
//delete
ATOMIC_INC(&destroy_);
this->~ObPlanCache();
if (OB_ISNULL(pcm_)) {
// do nothing
} else {
pcm_->erase_refactored(tenant_id_);
}
} else if (ref_count < 0) {
BACKTRACE(ERROR, true, "Plan Cache %p ref count < 0, ref_count = %ld", this, ref_count);
}
}
int ObPlanCache::ref_cache_obj(const ObCacheObjID obj_id, ObCacheObjGuard& guard)
{
int ret = OB_SUCCESS;
@ -1877,6 +1857,249 @@ template int ObPlanCache::dump_deleted_objs<DUMP_SQL>(ObIArray<AllocCacheObjInfo
const int64_t) const;
template int ObPlanCache::dump_deleted_objs<DUMP_ALL>(ObIArray<AllocCacheObjInfo> &,
const int64_t) const;
int ObPlanCache::mtl_init(ObPlanCache* &plan_cache)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = lib::current_resource_owner_id();
int64_t mem_limit = lib::get_tenant_memory_limit(tenant_id);
if (OB_FAIL(plan_cache->init(common::OB_PLAN_CACHE_BUCKET_NUMBER, tenant_id))) {
LOG_WARN("failed to init request manager", K(ret));
} else {
// do nothing
}
return ret;
}
void ObPlanCache::mtl_stop(ObPlanCache * &plan_cache)
{
if (OB_LIKELY(nullptr != plan_cache)) {
TG_CANCEL(plan_cache->tg_id_, plan_cache->evict_task_);
TG_STOP(plan_cache->tg_id_);
}
}
int ObPlanCache::flush_plan_cache()
{
int ret = OB_SUCCESS;
observer::ObReqTimeGuard req_timeinfo_guard;
if (OB_FAIL(cache_evict_all_plan())) {
SQL_PC_LOG(ERROR, "Plan cache evict failed, please check", K(ret));
}
ObArray<AllocCacheObjInfo> deleted_objs;
int64_t safe_timestamp = INT64_MAX;
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(observer::ObGlobalReqTimeService::get_instance().get_global_safe_timestamp(safe_timestamp))) {
SQL_PC_LOG(ERROR, "failed to get global safe timestamp", K(ret));
} else if (OB_FAIL(dump_deleted_objs<DUMP_SQL>(deleted_objs, safe_timestamp))) {
SQL_PC_LOG(WARN, "failed to get deleted sql objs", K(ret));
} else {
LOG_INFO("Deleted Cache Objs", K(deleted_objs));
for (int64_t i = 0; i < deleted_objs.count(); i++) { // ignore error code and continue
if (OB_FAIL(ObCacheObjectFactory::destroy_cache_obj(true,
deleted_objs.at(i).obj_id_,
this))) {
LOG_WARN("failed to destroy cache obj", K(ret));
}
}
}
return ret;
}
int ObPlanCache::flush_plan_cache_by_sql_id(uint64_t db_id, common::ObString sql_id)
{
int ret = OB_SUCCESS;
observer::ObReqTimeGuard req_timeinfo_guard;
if (OB_FAIL(cache_evict_plan_by_sql_id(db_id, sql_id))) {
SQL_PC_LOG(ERROR, "Plan cache evict failed, please check", K(ret));
}
ObArray<AllocCacheObjInfo> deleted_objs;
int64_t safe_timestamp = INT64_MAX;
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(observer::ObGlobalReqTimeService::get_instance()
.get_global_safe_timestamp(safe_timestamp))) {
SQL_PC_LOG(ERROR, "failed to get global safe timestamp", K(ret));
} else if (OB_FAIL(dump_deleted_objs<DUMP_SQL>(deleted_objs, safe_timestamp))) {
SQL_PC_LOG(WARN, "failed to get deleted sql objs", K(ret));
} else {
LOG_INFO("Deleted Cache Objs", K(deleted_objs));
for (int64_t i = 0; i < deleted_objs.count(); i++) { // ignore error code and continue
if (OB_FAIL(ObCacheObjectFactory::destroy_cache_obj(true,
deleted_objs.at(i).obj_id_,
this))) {
LOG_WARN("failed to destroy cache obj", K(ret));
}
}
}
return ret;
}
int ObPlanCache::flush_lib_cache()
{
int ret = OB_SUCCESS;
observer::ObReqTimeGuard req_timeinfo_guard;
if (OB_FAIL(cache_evict_all_obj())) {
SQL_PC_LOG(ERROR, "lib cache evict failed, please check", K(ret));
}
ObArray<AllocCacheObjInfo> deleted_objs;
int64_t safe_timestamp = INT64_MAX;
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(observer::ObGlobalReqTimeService::get_instance().get_global_safe_timestamp(safe_timestamp))) {
SQL_PC_LOG(ERROR, "failed to get global safe timestamp", K(ret));
} else if (OB_FAIL(dump_deleted_objs<DUMP_ALL>(deleted_objs, safe_timestamp))) {
SQL_PC_LOG(WARN, "failed to get deleted sql objs", K(ret));
} else {
LOG_INFO("Deleted Cache Objs", K(deleted_objs));
for (int64_t i = 0; i < deleted_objs.count(); i++) { // ignore error code and continue
if (OB_FAIL(ObCacheObjectFactory::destroy_cache_obj(true,
deleted_objs.at(i).obj_id_,
this))) {
LOG_WARN("failed to destroy cache obj", K(ret));
}
}
}
return ret;
}
int ObPlanCache::flush_lib_cache_by_ns(const ObLibCacheNameSpace ns)
{
int ret = OB_SUCCESS;
observer::ObReqTimeGuard req_timeinfo_guard;
int64_t safe_timestamp = INT64_MAX;
ObArray<AllocCacheObjInfo> deleted_objs;
if (OB_FAIL(cache_evict_by_ns(ns))) {
SQL_PC_LOG(ERROR, "cache evict by ns failed, please check", K(ret));
} else if (OB_FAIL(observer::ObGlobalReqTimeService::get_instance()
.get_global_safe_timestamp(safe_timestamp))) {
SQL_PC_LOG(ERROR, "failed to get global safe timestamp", K(ret));
} else if (OB_FAIL(dump_deleted_objs_by_ns(deleted_objs, safe_timestamp, ns))) {
SQL_PC_LOG(ERROR, "failed to dump deleted objs by ns", K(ret));
} else {
LOG_INFO("Deleted Cache Objs", K(deleted_objs));
for (int i = 0; i < deleted_objs.count(); i++) { // ignore error code and continue
if (OB_FAIL(ObCacheObjectFactory::destroy_cache_obj(true,
deleted_objs.at(i).obj_id_,
this))) {
LOG_WARN("failed to destroy cache obj", K(ret));
}
}
}
return ret;
}
int ObPlanCache::flush_pl_cache()
{
int ret = OB_SUCCESS;
observer::ObReqTimeGuard req_timeinfo_guard;
int64_t safe_timestamp = INT64_MAX;
ObArray<AllocCacheObjInfo> deleted_objs;
if (OB_FAIL(ObPLCacheMgr::cache_evict_all_pl(this))) {
SQL_PC_LOG(ERROR, "PL cache evict failed, please check", K(ret));
} else if (OB_FAIL(observer::ObGlobalReqTimeService::get_instance()
.get_global_safe_timestamp(safe_timestamp))) {
SQL_PC_LOG(ERROR, "failed to get global safe timestamp", K(ret));
} else if (OB_FAIL(dump_deleted_objs<DUMP_PL>(deleted_objs, safe_timestamp))) {
SQL_PC_LOG(ERROR, "failed to dump deleted pl objs", K(ret));
} else {
LOG_INFO("Deleted Cache Objs", K(deleted_objs));
for (int i = 0; i < deleted_objs.count(); i++) { // ignore error code and continue
if (OB_FAIL(ObCacheObjectFactory::destroy_cache_obj(true,
deleted_objs.at(i).obj_id_,
this))) {
LOG_WARN("failed to destroy cache obj", K(ret));
}
}
}
return ret;
}
const char *plan_cache_gc_confs[3] = { "OFF", "REPORT", "AUTO" };
int ObPlanCache::get_plan_cache_gc_strategy()
{
PlanCacheGCStrategy strategy = INVALID;
for (int i = 0; i < ARRAYSIZEOF(plan_cache_gc_confs) && strategy == INVALID; i++) {
if (0 == ObString::make_string(plan_cache_gc_confs[i])
.case_compare(GCONF._ob_plan_cache_gc_strategy)) {
strategy = static_cast<PlanCacheGCStrategy>(i);
}
}
return strategy;
}
void ObPlanCacheEliminationTask::runTimerTask()
{
int ret = OB_SUCCESS;
++run_task_counter_;
const int64_t auto_flush_pc_interval = (int64_t)(GCONF._ob_plan_cache_auto_flush_interval) / (1000 * 1000L); // second
{
// 在调用plan cache接口前引用plan资源前必须定义guard
observer::ObReqTimeGuard req_timeinfo_guard;
run_plan_cache_task();
if (0 != auto_flush_pc_interval
&& 0 == run_task_counter_ % auto_flush_pc_interval) {
IGNORE_RETURN plan_cache_->flush_plan_cache();
}
SQL_PC_LOG(INFO, "schedule next cache evict task",
"evict_interval", (int64_t)(GCONF.plan_cache_evict_interval));
}
// free cache obj in deleted map
if (plan_cache_->get_plan_cache_gc_strategy() > 0) {
observer::ObReqTimeGuard req_timeinfo_guard;
run_free_cache_obj_task();
}
SQL_PC_LOG(INFO, "schedule next cache evict task",
"evict_interval", (int64_t)(GCONF.plan_cache_evict_interval));
}
void ObPlanCacheEliminationTask::run_plan_cache_task()
{
int ret = OB_SUCCESS;
if (OB_FAIL(plan_cache_->update_memory_conf())) { //如果失败, 则不更新设置, 也不影响其他流程
SQL_PC_LOG(WARN, "fail to update plan cache memory sys val", K(ret));
}
if (OB_FAIL(plan_cache_->cache_evict())) {
SQL_PC_LOG(ERROR, "Plan cache evict failed, please check", K(ret));
}
if (OB_FAIL(plan_cache_->asyn_update_baseline())) {
SQL_PC_LOG(ERROR, "asyn replace plan baseline failed", K(ret));
}
}
void ObPlanCacheEliminationTask::run_free_cache_obj_task()
{
int ret = OB_SUCCESS;
ObArray<AllocCacheObjInfo> deleted_objs;
int64_t safe_timestamp = INT64_MAX;
if (observer::ObGlobalReqTimeService::get_instance()
.get_global_safe_timestamp(safe_timestamp)) {
SQL_PC_LOG(ERROR, "failed to get global safe timestamp", K(ret));
} else if (OB_FAIL(plan_cache_->dump_deleted_objs<DUMP_ALL>(deleted_objs, safe_timestamp))) {
SQL_PC_LOG(WARN, "failed to traverse hashmap", K(ret));
} else {
int64_t tot_mem_used = 0;
for (int k = 0; k < deleted_objs.count(); k++) {
tot_mem_used += deleted_objs.at(k).mem_used_;
} // end for
if (tot_mem_used >= ((plan_cache_->get_mem_limit() / 100) * 30)) {
LOG_ERROR("Cache Object Memory Leaked Much!!!", K(tot_mem_used),
K(plan_cache_->get_mem_limit()), K(deleted_objs), K(safe_timestamp));
} else if (deleted_objs.count() > 0) {
LOG_WARN("Cache Object Memory Leaked Much!!!", K(deleted_objs),
K(safe_timestamp), K(plan_cache_->get_mem_limit()));
}
}
if (OB_SUCC(ret) && OB_FAIL(plan_cache_->dump_all_objs())) {
LOG_WARN("failed to dump deleted map", K(ret));
}
}
} // end of namespace sql
} // end of namespace oceanbase