diff --git a/CMakeLists.txt b/CMakeLists.txt index 94c3c133c5..bc7c46df5b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,6 +42,11 @@ if(ENABLE_LATCH_DIAGNOSE) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_LATCH_DIAGNOSE") endif() +if(ENABLE_500_FALLBACK) + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_500_FALLBACK") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_500_FALLBACK") +endif() + if(ENABLE_SMART_VAR_CHECK) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_SMART_VAR_CHECK") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_SMART_VAR_CHECK") diff --git a/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.cpp b/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.cpp index 7d996e8b51..98b446ea08 100644 --- a/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.cpp +++ b/deps/oblib/src/lib/alloc/ob_tenant_ctx_allocator.cpp @@ -371,7 +371,7 @@ int64_t ObTenantCtxAllocator::sync_wash(int64_t wash_size) int64_t washed_size = 0; auto stat = obj_mgr_.get_stat(); - const double min_utilization = 0.9; + const double min_utilization = 0.95; if (stat.payload_ * min_utilization > stat.used_) { washed_size = obj_mgr_.sync_wash(wash_size); } diff --git a/deps/oblib/src/lib/hash/ob_linear_hash_map.h b/deps/oblib/src/lib/hash/ob_linear_hash_map.h index ce4ff739ed..e2c7d6503f 100644 --- a/deps/oblib/src/lib/hash/ob_linear_hash_map.h +++ b/deps/oblib/src/lib/hash/ob_linear_hash_map.h @@ -638,7 +638,7 @@ ObLinearHashMap::HashMapMemMgrCore::HashMapMemMgrCore() // page_size = OB_MALLOC_BIG_BLOCK_SIZE; } else { - total_limit /= (lib::ObRunningModeConfig::MINI_MEM_UPPER / lib::ObRunningModeConfig::instance().memory_limit_); + total_limit *= lib::mini_mode_resource_ratio(); page_size = OB_MALLOC_MIDDLE_BLOCK_SIZE; } // Init dir alloc. diff --git a/deps/oblib/src/lib/ob_running_mode.cpp b/deps/oblib/src/lib/ob_running_mode.cpp index f2befd16f7..ee36d63339 100644 --- a/deps/oblib/src/lib/ob_running_mode.cpp +++ b/deps/oblib/src/lib/ob_running_mode.cpp @@ -19,5 +19,8 @@ const int64_t ObRunningModeConfig::MIN_MEM = 1L << 30; // The minimum value for const int64_t ObRunningModeConfig::MINI_MEM_LOWER = 4L << 30; const int64_t ObRunningModeConfig::MINI_MEM_UPPER = 12L << 30; const int64_t ObRunningModeConfig::MINI_CPU_UPPER = 8; + +bool __attribute__((weak)) mtl_is_mini_mode() { return false; } + } //end of namespace lib } //end of namespace oceanbase diff --git a/deps/oblib/src/lib/ob_running_mode.h b/deps/oblib/src/lib/ob_running_mode.h index 9b085645ce..289304f1d3 100644 --- a/deps/oblib/src/lib/ob_running_mode.h +++ b/deps/oblib/src/lib/ob_running_mode.h @@ -18,6 +18,8 @@ namespace oceanbase { namespace lib { +extern bool mtl_is_mini_mode(); + struct ObRunningModeConfig { static const int64_t MIN_MEM; @@ -40,7 +42,7 @@ inline ObRunningModeConfig &ObRunningModeConfig::instance() inline bool is_mini_mode() { - return ObRunningModeConfig::instance().mini_mode_; + return ObRunningModeConfig::instance().mini_mode_ || mtl_is_mini_mode(); } inline bool is_mini_cpu_mode() @@ -48,6 +50,19 @@ inline bool is_mini_cpu_mode() return ObRunningModeConfig::instance().mini_cpu_mode_; } +inline double mini_mode_resource_ratio() +{ + int64_t memory_limit = ObRunningModeConfig::instance().memory_limit_; + int64_t upper = ObRunningModeConfig::instance().MINI_MEM_UPPER; + double ratio = 1.0; + if (0 == memory_limit || memory_limit >= upper) { + ratio = 1.0; + } else { + ratio = (double)memory_limit / upper; + } + return ratio; +} + inline void update_mini_mode(int64_t memory_limit, int64_t cpu_cnt) { ObRunningModeConfig::instance().memory_limit_ = memory_limit; diff --git a/deps/oblib/src/lib/oblog/ob_base_log_writer.cpp b/deps/oblib/src/lib/oblog/ob_base_log_writer.cpp index 9be9a60844..d2bc6a7c2c 100644 --- a/deps/oblib/src/lib/oblog/ob_base_log_writer.cpp +++ b/deps/oblib/src/lib/oblog/ob_base_log_writer.cpp @@ -58,6 +58,7 @@ int ObBaseLogWriter::init( const uint64_t tenant_id) { int ret = OB_SUCCESS; + ObMemAttr attr(tenant_id, "BaseLogWriter"); if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_STDERR("The ObBaseLogWriter has been inited.\n"); @@ -69,10 +70,10 @@ int ObBaseLogWriter::init( LOG_STDERR("Fail to allocate memory, max_buffer_item_cnt=%lu.\n", log_cfg.max_buffer_item_cnt_); } else if (0 != pthread_mutex_init(&thread_mutex_, NULL)) { ret = OB_ERR_SYS; - } else if (OB_ISNULL(log_write_cond_ = OB_NEW(SimpleCond, "BaseLogWriter"))) { + } else if (OB_ISNULL(log_write_cond_ = OB_NEW(SimpleCond, attr))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_STDERR("Fail to allocate memory, max_buffer_item_cnt=%lu.\n", log_cfg.max_buffer_item_cnt_); - } else if (OB_ISNULL(log_flush_cond_ = OB_NEW(SimpleCond, "BaseLogWriter"))) { + } else if (OB_ISNULL(log_flush_cond_ = OB_NEW(SimpleCond, attr))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_STDERR("Fail to allocate memory, max_buffer_item_cnt=%lu.\n", log_cfg.max_buffer_item_cnt_); } else { diff --git a/deps/oblib/src/lib/oblog/ob_base_log_writer.h b/deps/oblib/src/lib/oblog/ob_base_log_writer.h index a402d0185e..53768286fd 100644 --- a/deps/oblib/src/lib/oblog/ob_base_log_writer.h +++ b/deps/oblib/src/lib/oblog/ob_base_log_writer.h @@ -17,6 +17,7 @@ #include #include +#include "lib/ob_define.h" #include "lib/utility/ob_macro_utils.h" namespace oceanbase @@ -62,7 +63,7 @@ struct ObBaseLogWriterCfg static const uint64_t DEFAULT_MAX_BUFFER_ITEM_CNT = 1024; uint64_t max_buffer_item_cnt_; uint64_t group_commit_max_wait_us_; - uint64_t group_commit_min_item_cnt_; + uint64_t group_commit_min_item_cnt_; uint64_t group_commit_max_item_cnt_; }; @@ -74,7 +75,7 @@ public: virtual int init( const ObBaseLogWriterCfg &log_cfg, const char *thread_name = "ALOG", - const uint64_t tenant_id = 0); + const uint64_t tenant_id = OB_SERVER_TENANT_ID); virtual int start(); virtual void stop(); virtual void wait(); diff --git a/deps/oblib/src/lib/queue/ob_dedup_queue.cpp b/deps/oblib/src/lib/queue/ob_dedup_queue.cpp index 7e32548141..9b4830b50b 100644 --- a/deps/oblib/src/lib/queue/ob_dedup_queue.cpp +++ b/deps/oblib/src/lib/queue/ob_dedup_queue.cpp @@ -45,6 +45,7 @@ ObDedupQueue::ObDedupQueue() : is_inited_(false), work_thread_num_(DEFAULT_THREAD_NUM), thread_dead_threshold_(DEFALT_THREAD_DEAD_THRESHOLD), hash_allocator_(allocator_), + bucket_allocator_(&allocator_), gc_queue_head_(NULL), gc_queue_tail_(NULL), thread_name_(nullptr) @@ -89,7 +90,7 @@ int ObDedupQueue::init(const int64_t thread_num /*= DEFAULT_THREAD_NUM*/, COMMON_LOG(WARN, "allocator init fail", K(page_size), K(label), K(tenant_id), K(total_mem_limit), K(ret)); } else if (OB_SUCCESS != (ret = task_map_.create(task_map_size, &hash_allocator_, - ObModIds::OB_HASH_BUCKET_TASK_MAP))) { + &bucket_allocator_))) { COMMON_LOG(WARN, "task_map create fail", K(ret)); } else if (OB_SUCCESS != (ret = task_queue_.init(queue_size, &allocator_))) { COMMON_LOG(WARN, "task_queue init fail", K(ret)); diff --git a/deps/oblib/src/lib/queue/ob_dedup_queue.h b/deps/oblib/src/lib/queue/ob_dedup_queue.h index 7472baae3b..33187398af 100644 --- a/deps/oblib/src/lib/queue/ob_dedup_queue.h +++ b/deps/oblib/src/lib/queue/ob_dedup_queue.h @@ -189,7 +189,9 @@ private: hash::MultiWriteDefendMode, hash::hash_func, hash::equal_to, - HashAllocator> TaskMap; + HashAllocator, + common::hash::NormalPointer, + common::ObWrapperAllocator> TaskMap; typedef hash::HashMapTypes::pair_type TaskMapKVPair; static const int32_t DEFAULT_THREAD_NUM = 4; static const int32_t MAX_THREAD_NUM = 64; @@ -301,6 +303,7 @@ private: int64_t thread_dead_threshold_; ObConcurrentFIFOAllocator allocator_; HashAllocator hash_allocator_; + common::ObWrapperAllocator bucket_allocator_; TaskMap task_map_; TaskQueue task_queue_; ObThreadCond task_queue_sync_; diff --git a/deps/oblib/src/lib/stat/ob_di_cache.cpp b/deps/oblib/src/lib/stat/ob_di_cache.cpp index 989f5ca9b7..806e191c86 100644 --- a/deps/oblib/src/lib/stat/ob_di_cache.cpp +++ b/deps/oblib/src/lib/stat/ob_di_cache.cpp @@ -233,7 +233,8 @@ int ObDIThreadTenantCache::get_node(uint64_t tenant_id, ObDITenantCollect *&tena int ret = OB_SUCCESS; if (OB_ISNULL(tenant_collect = tenant_cache_.get_node(tenant_id))) { if (nullptr == extend_tenant_cache_) { - extend_tenant_cache_ = OB_NEW(ObDIBaseTenantCache, "di_tenant_cache"); + extend_tenant_cache_ = OB_NEW(ObDIBaseTenantCache, + SET_USE_500("di_tenant_cache")); } if (nullptr != extend_tenant_cache_) { tenant_collect = extend_tenant_cache_->get_node(tenant_id, true /*replace*/); diff --git a/deps/oblib/src/lib/task/ob_timer.cpp b/deps/oblib/src/lib/task/ob_timer.cpp index caff17df6e..d22de8d605 100644 --- a/deps/oblib/src/lib/task/ob_timer.cpp +++ b/deps/oblib/src/lib/task/ob_timer.cpp @@ -25,13 +25,13 @@ using namespace obutil; using namespace lib; -int ObTimer::init(const char* thread_name) +int ObTimer::init(const char* thread_name, const ObMemAttr &attr) { int ret = OB_SUCCESS; if (is_inited_) { ret = OB_INIT_TWICE; } else { - tokens_ = reinterpret_cast(ob_malloc(sizeof(Token) * max_task_num_, "timer")); + tokens_ = reinterpret_cast(ob_malloc(sizeof(Token) * max_task_num_, attr)); if (nullptr == tokens_) { ret = OB_ALLOCATE_MEMORY_FAILED; OB_LOG(ERROR, "failed to alloc memory", K(ret)); diff --git a/deps/oblib/src/lib/task/ob_timer.h b/deps/oblib/src/lib/task/ob_timer.h index c84b6595ef..64d22ddb2c 100644 --- a/deps/oblib/src/lib/task/ob_timer.h +++ b/deps/oblib/src/lib/task/ob_timer.h @@ -70,7 +70,8 @@ public: is_destroyed_(false), tokens_(nullptr), has_running_task_(false), has_running_repeat_task_(false), thread_id_(-1), thread_name_(nullptr) {} ~ObTimer(); - int init(const char* thread_name = nullptr); + int init(const char* thread_name = nullptr, + const ObMemAttr &attr = ObMemAttr(OB_SERVER_TENANT_ID, "timer")); bool inited() const; int create(); // create new timer thread and start int start(); // only start diff --git a/deps/oblib/src/lib/thread/thread_mgr.h b/deps/oblib/src/lib/thread/thread_mgr.h index fba673e40a..e45825ca72 100644 --- a/deps/oblib/src/lib/thread/thread_mgr.h +++ b/deps/oblib/src/lib/thread/thread_mgr.h @@ -129,6 +129,8 @@ class ITG public: ITG() : tg_helper_(nullptr), tg_cgroup_(lib::ThreadCGroup::INVALID_CGROUP) {} virtual ~ITG() {} + int64_t get_tenant_id() const + { return NULL == tg_helper_ ? common::OB_SERVER_TENANT_ID : tg_helper_->id(); } virtual int thread_cnt() = 0; virtual int set_thread_cnt(int64_t) = 0; virtual int start() = 0; @@ -455,7 +457,7 @@ public: int set_handler(TGTaskHandler &handler) { int ret = common::OB_SUCCESS; - uint64_t tenant_id = NULL == tg_helper_ ? common::OB_SERVER_TENANT_ID : tg_helper_->id(); + uint64_t tenant_id = get_tenant_id(); if (qth_ != nullptr) { ret = common::OB_ERR_UNEXPECTED; } else { @@ -756,7 +758,8 @@ public: } else { timer_ = new (buf_) common::ObTimer(max_task_num_); timer_->set_run_wrapper(tg_helper_, tg_cgroup_); - if (OB_FAIL(timer_->init(attr_.name_))) { + if (OB_FAIL(timer_->init(attr_.name_, + ObMemAttr(get_tenant_id(), "TGTimer")))) { OB_LOG(WARN, "init failed", K(ret)); } } diff --git a/deps/oblib/src/rpc/frame/ob_req_queue_thread.cpp b/deps/oblib/src/rpc/frame/ob_req_queue_thread.cpp index b8e5cf37a9..823616ac90 100644 --- a/deps/oblib/src/rpc/frame/ob_req_queue_thread.cpp +++ b/deps/oblib/src/rpc/frame/ob_req_queue_thread.cpp @@ -31,14 +31,14 @@ using namespace oceanbase::rpc::frame; using namespace oceanbase::common; using namespace oceanbase::lib; -ObReqQueue::ObReqQueue(int queue_capacity) +ObReqQueue::ObReqQueue(int capacity) : wait_finish_(true), push_worker_count_(0), + capacity_(capacity), queue_(), qhandler_(NULL), host_() { - queue_.init(queue_capacity); } ObReqQueue::~ObReqQueue() @@ -47,6 +47,11 @@ ObReqQueue::~ObReqQueue() queue_.destroy(); } +int ObReqQueue::init(const int64_t tenant_id) +{ + return queue_.init(capacity_, "ReqQueue", tenant_id); +} + void ObReqQueue::set_qhandler(ObiReqQHandler *qhandler) { if (OB_ISNULL(qhandler)) { diff --git a/deps/oblib/src/rpc/frame/ob_req_queue_thread.h b/deps/oblib/src/rpc/frame/ob_req_queue_thread.h index 36912defff..eca7639bd4 100644 --- a/deps/oblib/src/rpc/frame/ob_req_queue_thread.h +++ b/deps/oblib/src/rpc/frame/ob_req_queue_thread.h @@ -38,6 +38,8 @@ public: virtual ~ObReqQueue(); + int init(const int64_t tenant_id = OB_SERVER_TENANT_ID); + void set_qhandler(ObiReqQHandler *handler); bool push(ObRequest *req, int max_queue_len, bool block = true); @@ -72,6 +74,7 @@ protected: bool wait_finish_; int push_worker_count_; + int64_t capacity_; common::ObLightyQueue queue_; ObiReqQHandler *qhandler_; diff --git a/src/logservice/leader_coordinator/ob_leader_coordinator.cpp b/src/logservice/leader_coordinator/ob_leader_coordinator.cpp index 0ee08ac071..c5b809df55 100644 --- a/src/logservice/leader_coordinator/ob_leader_coordinator.cpp +++ b/src/logservice/leader_coordinator/ob_leader_coordinator.cpp @@ -48,6 +48,7 @@ struct AllLsElectionReferenceInfoFactory COORDINATOR_LOG_RET(ERROR, OB_ALLOCATE_MEMORY_FAILED, "alloc memory failed"); } else { new(new_all_ls_election_reference_info) ObArray(); + new_all_ls_election_reference_info->set_attr(ObMemAttr(MTL_ID(), "LsElectRefInfo")); } return new_all_ls_election_reference_info; } diff --git a/src/logservice/palf/election/utils/election_member_list.cpp b/src/logservice/palf/election/utils/election_member_list.cpp index 99d8a263a3..23f3bc5852 100644 --- a/src/logservice/palf/election/utils/election_member_list.cpp +++ b/src/logservice/palf/election/utils/election_member_list.cpp @@ -23,7 +23,10 @@ namespace election using namespace common; -MemberList::MemberList() : replica_num_(0) {} +MemberList::MemberList() : replica_num_(0) +{ + addr_list_.set_attr(ObMemAttr(OB_SERVER_TENANT_ID, "AddrList")); +} bool MemberList::only_membership_version_different(const MemberList &rhs) const { @@ -132,4 +135,4 @@ const ObArray &MemberList::get_addr_list() const { return addr_list_; } } } -} \ No newline at end of file +} diff --git a/src/logservice/palf/election/utils/election_utils.cpp b/src/logservice/palf/election/utils/election_utils.cpp index 0901f1f466..59f2749987 100644 --- a/src/logservice/palf/election/utils/election_utils.cpp +++ b/src/logservice/palf/election/utils/election_utils.cpp @@ -31,7 +31,9 @@ using namespace share; OB_SERIALIZE_MEMBER(Lease, owner_, lease_end_ts_, ballot_number_); MemberListWithStates::MemberListWithStates() -:p_impl_(nullptr) {} +:p_impl_(nullptr) +{ +} int MemberListWithStates::init() { diff --git a/src/logservice/palf/election/utils/election_utils.h b/src/logservice/palf/election/utils/election_utils.h index 0b3a42c48a..6acc235a31 100644 --- a/src/logservice/palf/election/utils/election_utils.h +++ b/src/logservice/palf/election/utils/election_utils.h @@ -267,6 +267,15 @@ private: private: struct PImpl { + PImpl() + { + prepare_ok_.set_attr( + ObMemAttr(OB_SERVER_TENANT_ID, "PrepareOk")); + accept_ok_promise_not_vote_before_local_ts_.set_attr( + ObMemAttr(OB_SERVER_TENANT_ID, "acceptOK")); + follower_renew_lease_success_membership_version_.set_attr( + ObMemAttr(OB_SERVER_TENANT_ID, "followerRenew")); + } MemberList member_list_; common::ObArray prepare_ok_; common::ObArray accept_ok_promise_not_vote_before_local_ts_; @@ -408,4 +417,4 @@ private: }// namespace palf }// namesapce oceanbase -#endif \ No newline at end of file +#endif diff --git a/src/observer/dbms_job/ob_dbms_job_master.cpp b/src/observer/dbms_job/ob_dbms_job_master.cpp index 3a48d30b71..fba4ec46e1 100644 --- a/src/observer/dbms_job/ob_dbms_job_master.cpp +++ b/src/observer/dbms_job/ob_dbms_job_master.cpp @@ -242,7 +242,7 @@ int ObDBMSJobMaster::init(ObISQLClient *sql_client, int ret = OB_SUCCESS; uint64_t ready_queue_size = MAX_READY_JOBS_CAPACITY; if (is_mini_mode()) { - ready_queue_size /= (lib::ObRunningModeConfig::MINI_MEM_UPPER / lib::ObRunningModeConfig::instance().memory_limit_); + ready_queue_size *= lib::mini_mode_resource_ratio(); } if (inited_) { ret = OB_INIT_TWICE; @@ -503,7 +503,7 @@ int ObDBMSJobMaster::get_all_servers(int64_t tenant_id, ObString &pick_zone, ObI } } } - } + } } } return ret; @@ -725,4 +725,3 @@ int ObDBMSJobMaster::register_job( } // end for namespace dbms_job } // end for namespace oceanbase - diff --git a/src/observer/main.cpp b/src/observer/main.cpp index 17d6fe18ca..041cf2b3a5 100644 --- a/src/observer/main.cpp +++ b/src/observer/main.cpp @@ -439,6 +439,7 @@ int main(int argc, char *argv[]) snprintf(ob_get_tname(), OB_THREAD_NAME_BUF_LEN, "observer"); } ObStackHeaderGuard stack_header_guard; + // just take effect in observer #ifndef OB_USE_ASAN init_malloc_hook(); #endif diff --git a/src/observer/mysql/ob_mysql_request_manager.cpp b/src/observer/mysql/ob_mysql_request_manager.cpp index 9e25aa600f..ec4df8f59b 100644 --- a/src/observer/mysql/ob_mysql_request_manager.cpp +++ b/src/observer/mysql/ob_mysql_request_manager.cpp @@ -289,7 +289,7 @@ int ObMySQLRequestManager::get_mem_limit(uint64_t tenant_id, int ObMySQLRequestManager::mtl_new(ObMySQLRequestManager* &req_mgr) { int ret = OB_SUCCESS; - req_mgr = OB_NEW(ObMySQLRequestManager, ObModIds::OB_MYSQL_REQUEST_RECORD); + req_mgr = OB_NEW(ObMySQLRequestManager, ObMemAttr(MTL_ID(), ObModIds::OB_MYSQL_REQUEST_RECORD)); if (nullptr == req_mgr) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to alloc memory for ObMySQLRequestManager", K(ret)); diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index e7210fddb6..a37307b5b3 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -2035,7 +2035,7 @@ int ObServer::init_global_kvcache() int64_t bucket_num = ObKVGlobalCache::get_instance().get_suitable_bucket_num(); int64_t max_cache_size = ObKVGlobalCache::DEFAULT_MAX_CACHE_SIZE; if (is_mini_mode()) { - max_cache_size /= (lib::ObRunningModeConfig::MINI_MEM_UPPER / lib::ObRunningModeConfig::instance().memory_limit_); + max_cache_size *= lib::mini_mode_resource_ratio(); } if (OB_FAIL(ObKVGlobalCache::get_instance().init(&ObTenantMemLimitGetter::get_instance(), bucket_num, diff --git a/src/observer/ob_srv_deliver.cpp b/src/observer/ob_srv_deliver.cpp index 8033bf87e9..bcb55feb15 100644 --- a/src/observer/ob_srv_deliver.cpp +++ b/src/observer/ob_srv_deliver.cpp @@ -322,6 +322,8 @@ int ObSrvDeliver::create_queue_thread(int tg_id, const char *thread_name, QueueT qthread = OB_NEW(QueueThread, ObModIds::OB_RPC, thread_name); if (OB_ISNULL(qthread)) { ret = OB_ALLOCATE_MEMORY_FAILED; + } else if (OB_FAIL(qthread->init())) { + LOG_WARN("init qthread failed", K(ret)); } else { qthread->queue_.set_qhandler(&qhandler_); } diff --git a/src/observer/ob_srv_deliver.h b/src/observer/ob_srv_deliver.h index c69ae16803..6c43734f29 100644 --- a/src/observer/ob_srv_deliver.h +++ b/src/observer/ob_srv_deliver.h @@ -43,12 +43,14 @@ class QueueThread { public: QueueThread(const char *thread_name = nullptr, - uint64_t tenant_id = common::OB_INVALID_ID) + uint64_t tenant_id = OB_SERVER_TENANT_ID) : thread_(queue_, thread_name, tenant_id), tg_id_(0), tenant_id_(tenant_id), n_thread_(0) {} ~QueueThread() { destroy(); } + int init() { return queue_.init(tenant_id_); } + public: int set_thread_count(int thread_cnt) { int ret = OB_SUCCESS; diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index 4a761b04b3..b83613c3d3 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -306,10 +306,12 @@ static int start_mysql_queue(QueueThread *&qthread) int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) { - qthread = OB_NEW(QueueThread, ObModIds::OB_RPC, "MysqlQueueTh", tenant_id); + qthread = OB_NEW(QueueThread, ObMemAttr(tenant_id, ObModIds::OB_RPC), "MysqlQueueTh", tenant_id); if (OB_ISNULL(qthread)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new qthread", K(ret), K(tenant_id)); + } else if (OB_FAIL(qthread->init())) { + LOG_WARN("init qthread failed", K(tenant_id), K(ret)); } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::MysqlQueueTh, qthread->tg_id_))) { LOG_WARN("mysql queue init failed", K(ret), K(tenant_id), diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 3bf2eadb86..969330264f 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -80,7 +80,8 @@ int ObPxPools::init(uint64_t tenant_id) static int PX_POOL_COUNT = 128; // 128 groups, generally enough int ret = OB_SUCCESS; tenant_id_ = tenant_id; - if (OB_FAIL(pool_map_.create(PX_POOL_COUNT, "PxPoolBkt", "PxPoolNode"))) { + ObMemAttr attr(tenant_id, "PxPoolBkt"); + if (OB_FAIL(pool_map_.create(PX_POOL_COUNT, attr, attr))) { LOG_WARN("fail init pool map", K(ret)); } return ret; @@ -110,7 +111,7 @@ int ObPxPools::create_pool(int64_t group_id, ObPxPool *&pool) common::SpinWLockGuard g(lock_); if (OB_FAIL(pool_map_.get_refactored(group_id, pool))) { if (OB_HASH_NOT_EXIST == ret) { - pool = OB_NEW(ObPxPool, common::ObModIds::OMT_TENANT); + pool = OB_NEW(ObPxPool, ObMemAttr(tenant_id_, "PxPool")); if (OB_ISNULL(pool)) { ret = common::OB_ALLOCATE_MEMORY_FAILED; } else { @@ -198,7 +199,7 @@ int ObPxPool::submit(const RunFuncT &func) if (ATOMIC_LOAD(&active_threads_) < ATOMIC_LOAD(&concurrency_)) { ret = OB_SIZE_OVERFLOW; } else { - Task *t = OB_NEW(Task, ObModIds::OMT_TENANT, func); + Task *t = OB_NEW(Task, ObMemAttr(tenant_id_, "PxTask"), func); if (OB_ISNULL(t)) { ret = OB_ALLOCATE_MEMORY_FAILED; } else if (OB_FAIL(queue_.push(static_cast(t), 0))) { @@ -219,7 +220,7 @@ void ObPxPool::handle(ObLink *task) LOG_ERROR_RET(OB_INVALID_ARGUMENT, "px task is invalid"); } else { t->func_(); - OB_DELETE(Task, ObModIds::OMT_TENANT, t); + OB_DELETE(Task, "PxTask", t); } ATOMIC_DEC(&concurrency_); } @@ -455,7 +456,7 @@ int GroupMap::create_and_insert_group(int32_t group_id, ObTenant *tenant, ObCgro } else { const int64_t alloc_size = sizeof(ObResourceGroup); ObResourceGroup *buf = nullptr; - if (nullptr == (buf = (ObResourceGroup*)ob_malloc(alloc_size, ObModIds::OMT_TENANT))) { + if (nullptr == (buf = (ObResourceGroup*)ob_malloc(alloc_size, ObMemAttr(tenant->id(), "ResourceGroup")))) { ret = OB_ALLOCATE_MEMORY_FAILED; } else { group = new(buf)ObResourceGroup(group_id, tenant, cgroup_ctrl); @@ -630,11 +631,11 @@ int ObTenant::init(const ObTenantMeta &meta) if (OB_FAIL(ObTenantBase::init(&cgroup_ctrl_))) { LOG_WARN("fail to init tenant base", K(ret)); } else if (FALSE_IT(req_queue_.set_limit(GCONF.tenant_task_queue_size))) { - } else if (OB_ISNULL(multi_level_queue_ = OB_NEW(ObMultiLevelQueue, ObModIds::OMT_TENANT))) { + } else if (OB_ISNULL(multi_level_queue_ = OB_NEW(ObMultiLevelQueue, ObMemAttr(id_, "MulLevelQueue")))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc ObMultiLevelQueue failed", K(ret), K(*this)); } else if (FALSE_IT(multi_level_queue_->set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size))) { - } else if (OB_ISNULL(rpc_stat_info_ = OB_NEW(RpcStatInfo, ObModIds::OMT_TENANT, id_))) { + } else if (OB_ISNULL(rpc_stat_info_ = OB_NEW(RpcStatInfo, ObMemAttr(id_, "RpcStatInfo"), id_))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc RpcStatInfo failed", K(ret), K(*this)); } else if (OB_FAIL(construct_mtl_init_ctx(meta, mtl_init_ctx_))) { @@ -694,7 +695,7 @@ int ObTenant::init(const ObTenantMeta &meta) int ObTenant::construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantModuleInitCtx *&ctx) { int ret = OB_SUCCESS; - if (OB_ISNULL(ctx = OB_NEW(share::ObTenantModuleInitCtx, ObModIds::OMT_TENANT))) { + if (OB_ISNULL(ctx = OB_NEW(share::ObTenantModuleInitCtx, ObMemAttr(id_, "ModuleInitCtx")))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc ObTenantModuleInitCtx failed", K(ret)); } else if (OB_FAIL(OB_FILE_SYSTEM_ROUTER.get_tenant_clog_dir(id_, mtl_init_ctx_->tenant_clog_dir_))) { diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h index 9a287c9957..ef96421ad9 100644 --- a/src/observer/omt/ob_tenant.h +++ b/src/observer/omt/ob_tenant.h @@ -130,7 +130,7 @@ public: { int ret = common::OB_SUCCESS; uint64_t tenant_id = MTL_ID(); - pools = OB_NEW(ObPxPools, common::ObModIds::OMT_TENANT); + pools = OB_NEW(ObPxPools, ObMemAttr(tenant_id, "PxPools")); if (OB_ISNULL(pools)) { ret = common::OB_ALLOCATE_MEMORY_FAILED; } else if (OB_FAIL(pools->init(tenant_id))) { diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index 2e4b08393c..0516953a09 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -306,7 +306,7 @@ int ObTableLoadService::start() LOG_WARN("ObTableLoadService not init", KR(ret), KP(this)); } else { gc_timer_.set_run_wrapper(MTL_CTX()); - if (OB_FAIL(gc_timer_.init("TLD_GC"))) { + if (OB_FAIL(gc_timer_.init("TLD_GC", ObMemAttr(MTL_ID(), "GC_TIMER")))) { LOG_WARN("fail to init gc timer", KR(ret)); } else if (OB_FAIL(gc_timer_.schedule(gc_task_, GC_INTERVAL, true))) { LOG_WARN("fail to schedule gc task", KR(ret)); diff --git a/src/observer/virtual_table/ob_gv_sql_audit.cpp b/src/observer/virtual_table/ob_gv_sql_audit.cpp index 0be50c10f2..8ecb86dfbf 100644 --- a/src/observer/virtual_table/ob_gv_sql_audit.cpp +++ b/src/observer/virtual_table/ob_gv_sql_audit.cpp @@ -56,7 +56,6 @@ ObGvSqlAudit::~ObGvSqlAudit() { void ObGvSqlAudit::reset() { if (with_tenant_ctx_ != nullptr && allocator_ != nullptr) { - SERVER_LOG(INFO, "nijia debug deconstruct", KP(this)); if (cur_mysql_req_mgr_ != nullptr && ref_.idx_ != -1) { cur_mysql_req_mgr_->revert(&ref_); } diff --git a/src/observer/virtual_table/ob_virtual_sql_plan_monitor.cpp b/src/observer/virtual_table/ob_virtual_sql_plan_monitor.cpp index 6f157a3cfb..8853cd68bb 100644 --- a/src/observer/virtual_table/ob_virtual_sql_plan_monitor.cpp +++ b/src/observer/virtual_table/ob_virtual_sql_plan_monitor.cpp @@ -61,7 +61,6 @@ ObVirtualSqlPlanMonitor::~ObVirtualSqlPlanMonitor() void ObVirtualSqlPlanMonitor::reset() { if (with_tenant_ctx_ != nullptr && allocator_ != nullptr) { - SERVER_LOG(INFO, "nijia debug deconstruct", KP(this)); if (cur_mysql_req_mgr_ != nullptr && ref_.idx_ != -1) { cur_mysql_req_mgr_->revert(&ref_); } @@ -841,4 +840,3 @@ int ObVirtualSqlPlanMonitor::convert_node_to_row(ObMonitorNode &node, ObNewRow * } return ret; } - diff --git a/src/rootserver/freeze/ob_major_freeze_service.cpp b/src/rootserver/freeze/ob_major_freeze_service.cpp index 766078c3a1..5e07482303 100644 --- a/src/rootserver/freeze/ob_major_freeze_service.cpp +++ b/src/rootserver/freeze/ob_major_freeze_service.cpp @@ -36,7 +36,7 @@ int ObMajorFreezeService::init(const uint64_t tenant_id) return ret; } -ObMajorFreezeService::~ObMajorFreezeService() +ObMajorFreezeService::~ObMajorFreezeService() { SpinWLockGuard w_guard(rw_lock_); ob_delete(tenant_major_freeze_); @@ -53,7 +53,7 @@ int ObMajorFreezeService::switch_to_leader() if (OB_ISNULL(tenant_major_freeze_)) { SpinWLockGuard w_guard(rw_lock_); if (OB_FAIL(alloc_tenant_major_freeze())) { - LOG_WARN("fail to alloc tenant_major_freeze", KR(ret), K_(tenant_id)); + LOG_WARN("fail to alloc tenant_major_freeze", KR(ret), K_(tenant_id)); } } else { SpinRLockGuard r_guard(rw_lock_); @@ -119,7 +119,7 @@ int ObMajorFreezeService::alloc_tenant_major_freeze() } else if (OB_NOT_NULL(tenant_major_freeze_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tenant_major_freeze is not null", K_(tenant_id), KR(ret), KP_(tenant_major_freeze)); - } else if (nullptr == (buf = common::ob_malloc(len, "tenant_mf_mgr"))) { + } else if (nullptr == (buf = common::ob_malloc(len, ObMemAttr(tenant_id_, "tenant_mf_mgr")))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", KR(ret), K_(tenant_id), K(len)); } else if (FALSE_IT(tenant_major_freeze_ = new(buf) ObTenantMajorFreeze())) { @@ -130,7 +130,7 @@ int ObMajorFreezeService::alloc_tenant_major_freeze() } else if (OB_FAIL(tenant_major_freeze_->start())) { LOG_WARN("fail to start tenant_major_freeze", K_(tenant_id), KR(ret), K(is_primary_service)); } - + if (OB_SUCC(ret)) { LOG_INFO("succ to alloc tenant_major_freeze", K_(tenant_id), KP_(tenant_major_freeze), K(is_primary_service)); @@ -195,7 +195,7 @@ int ObMajorFreezeService::launch_major_freeze() } ATOMIC_STORE(&is_launched_, false); // set is as false no matter its previous value. } - + return ret; } @@ -247,7 +247,7 @@ int ObMajorFreezeService::check_inner_stat() if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret), K_(tenant_id)); - } + } return ret; } diff --git a/src/rootserver/freeze/ob_major_merge_progress_checker.cpp b/src/rootserver/freeze/ob_major_merge_progress_checker.cpp index c6a833c443..6d7e76430d 100644 --- a/src/rootserver/freeze/ob_major_merge_progress_checker.cpp +++ b/src/rootserver/freeze/ob_major_merge_progress_checker.cpp @@ -70,6 +70,7 @@ int ObMajorMergeProgressChecker::init( } else if (OB_FAIL(cross_cluster_validator_.init(tenant_id, is_primary_service, sql_proxy, zone_merge_mgr))) { LOG_WARN("fail to init cross cluster validator", KR(ret), K(tenant_id)); } else { + table_ids_.set_attr(ObMemAttr(tenant_id, "TableIds")); tenant_id_ = tenant_id; sql_proxy_ = &sql_proxy; schema_service_ = &schema_service; @@ -241,7 +242,7 @@ int ObMajorMergeProgressChecker::check_merge_progress( if (OB_FAIL(iter.init(*sql_proxy_, tenant_id_))) { LOG_WARN("fail to init tablet table iterator", KR(ret), K_(tenant_id)); } - // Keep set_filter_not_exist_server before setting all the other filters, + // Keep set_filter_not_exist_server before setting all the other filters, // otherwise the other filters may return OB_ENTRY_NOT_EXIST error code. else if (OB_FAIL(iter.get_filters().set_filter_not_exist_server(*server_trace_))) { LOG_WARN("fail to set not exist server filter", KR(ret), K_(tenant_id)); diff --git a/src/share/cache/ob_kvcache_map.cpp b/src/share/cache/ob_kvcache_map.cpp index d1b83209c6..0a30fc5d00 100644 --- a/src/share/cache/ob_kvcache_map.cpp +++ b/src/share/cache/ob_kvcache_map.cpp @@ -28,7 +28,7 @@ ObKVCacheMap::ObKVCacheMap() bucket_num_(0), bucket_size_(0), buckets_(NULL), - store_(NULL), + store_(NULL), global_hazard_version_() { bucket_allocator_.set_label("CACHE_MAP_BKT"); @@ -56,7 +56,7 @@ int ObKVCacheMap::init(const int64_t bucket_num, ObKVCacheStore *store) } else { bucket_size_ = DEFAULT_BUCKET_SIZE; if (is_mini_mode()) { - bucket_size_ /= (lib::ObRunningModeConfig::MINI_MEM_UPPER / lib::ObRunningModeConfig::instance().memory_limit_); + bucket_size_ *= lib::mini_mode_resource_ratio(); bucket_size_ = bucket_size_ > MIN_BUCKET_SIZE ? bucket_size_ : MIN_BUCKET_SIZE; } const int64_t bucket_cnt = bucket_num % bucket_size_ == 0 ? @@ -209,13 +209,13 @@ int ObKVCacheMap::put( // update mb_handle_ and inst if (NULL == iter) { - // put new node + // put new node (void) ATOMIC_AAF(&inst.status_.kv_cnt_, 1); } else { // overwrite (void) ATOMIC_SAF(&iter->mb_handle_->kv_cnt_, 1); (void) ATOMIC_SAF(&iter->mb_handle_->get_cnt_, iter->get_cnt_); - + } (void) ATOMIC_AAF(&mb_handle->kv_cnt_, 1); (void) ATOMIC_AAF(&mb_handle->get_cnt_, 1); @@ -664,7 +664,7 @@ int ObKVCacheMap::replace_fragment_node(int64_t &start_pos, int64_t &replace_nod COMMON_LOG(WARN, "Invalid argument, ", K(start_pos), K_(bucket_num), K(replace_num), K(ret)); } else { ObTimeGuard tg("replace_fragement_node", 100000); - // The variable 'replace_start_pos' do not need atomic operation because it is only used by replace thread + // The variable 'replace_start_pos' do not need atomic operation because it is only used by replace thread int64_t replace_start_pos = start_pos % bucket_num_; int64_t replace_end_pos = MIN(replace_num + replace_start_pos, bucket_num_); Node *iter = NULL; @@ -816,7 +816,7 @@ int ObKVCacheMap::internal_data_move(Node *&prev, Node *&old_iter, Node *&bucket // update inst and mb_handle (void) ATOMIC_SAF(&old_iter->mb_handle_->kv_cnt_, 1); (void) ATOMIC_SAF(&old_iter->mb_handle_->get_cnt_, old_iter->get_cnt_); - (void) ATOMIC_AAF(&new_mb_handle->kv_cnt_, 1); + (void) ATOMIC_AAF(&new_mb_handle->kv_cnt_, 1); (void) ATOMIC_AAF(&new_mb_handle->get_cnt_, old_iter->get_cnt_); ++new_mb_handle->recent_get_cnt_; diff --git a/src/share/diagnosis/ob_sql_plan_monitor_node_list.cpp b/src/share/diagnosis/ob_sql_plan_monitor_node_list.cpp index b4543994e0..b75ca1857f 100644 --- a/src/share/diagnosis/ob_sql_plan_monitor_node_list.cpp +++ b/src/share/diagnosis/ob_sql_plan_monitor_node_list.cpp @@ -48,6 +48,7 @@ int ObPlanMonitorNodeList::init(uint64_t tenant_id, const int64_t queue_size) { int ret = OB_SUCCESS; + ObMemAttr attr(tenant_id, "SqlPlanMonMap"); if (inited_) { ret = OB_INIT_TWICE; } else if (OB_FAIL(queue_.init(MOD_LABEL, queue_size, tenant_id))) { @@ -56,8 +57,8 @@ int ObPlanMonitorNodeList::init(uint64_t tenant_id, SERVER_LOG(WARN, "create failed", K(ret)); } else if (OB_FAIL(node_map_.create(!lib::is_mini_mode() ? DEFAULT_BUCKETS_COUNT : DEFAULT_BUCKETS_COUNT / 100, - "SqlPlanMonMap", - "SqlPlanMonMap"))) { + attr, + attr))) { LOG_WARN("failed to create hash map", K(ret)); } else if (OB_FAIL(TG_START(tg_id_))) { SERVER_LOG(WARN, "init timer fail", K(ret)); @@ -101,12 +102,12 @@ void ObPlanMonitorNodeList::destroy() int ObPlanMonitorNodeList::mtl_init(ObPlanMonitorNodeList* &node_list) { int ret = OB_SUCCESS; - node_list = OB_NEW(ObPlanMonitorNodeList, MOD_LABEL); + uint64_t tenant_id = lib::current_resource_owner_id(); + node_list = OB_NEW(ObPlanMonitorNodeList, ObMemAttr(tenant_id, MOD_LABEL)); if (nullptr == node_list) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to alloc memory for ObPlanMonitorNodeList", K(ret)); } else { - uint64_t tenant_id = lib::current_resource_owner_id(); int64_t mem_limit = get_tenant_memory_limit(tenant_id); if (OB_FAIL(node_list->init(tenant_id, mem_limit, MAX_QUEUE_SIZE))) { LOG_WARN("failed to init event list", K(ret)); @@ -221,4 +222,3 @@ int ObSqlPlanMonitorRecycleTask::init(ObPlanMonitorNodeList *node_list) } return ret; } - diff --git a/src/share/io/ob_io_manager.cpp b/src/share/io/ob_io_manager.cpp index 6a68db6f28..f1513eec38 100644 --- a/src/share/io/ob_io_manager.cpp +++ b/src/share/io/ob_io_manager.cpp @@ -661,7 +661,7 @@ int ObTenantIOManager::init(const uint64_t tenant_id, LOG_WARN("init io usage failed", K(ret), K(io_usage_)); } else if (OB_FAIL(io_clock_->init(io_config, &io_usage_))) { LOG_WARN("init io clock failed", K(ret), K(io_config)); - } else if (OB_FAIL(init_group_index_map(io_config))) { + } else if (OB_FAIL(init_group_index_map(tenant_id, io_config))) { LOG_WARN("init group map failed", K(ret)); } else if (OB_FAIL(io_config_.deep_copy(io_config))) { LOG_WARN("copy io config failed", K(ret), K(io_config_)); @@ -703,7 +703,8 @@ int ObTenantIOManager::start() LOG_WARN("not init", K(ret), K(is_inited_)); } else if (is_working()) { // do nothing - } else if (OB_FAIL(callback_mgr_.init(io_config_.callback_thread_count_, DEFAULT_QUEUE_DEPTH, &io_allocator_))) { + } else if (OB_FAIL(callback_mgr_.init(tenant_id_, io_config_.callback_thread_count_, + DEFAULT_QUEUE_DEPTH, &io_allocator_))) { LOG_WARN("init callback manager failed", K(ret), K(tenant_id_), K(io_config_.callback_thread_count_)); } else { is_working_ = true; @@ -894,10 +895,12 @@ int ObTenantIOManager::alloc_io_clock(ObIAllocator &allocator, ObTenantIOClock * return ret; } -int ObTenantIOManager::init_group_index_map(const ObTenantIOConfig &io_config) +int ObTenantIOManager::init_group_index_map(const int64_t tenant_id, + const ObTenantIOConfig &io_config) { int ret = OB_SUCCESS; - if (OB_FAIL(group_id_index_map_.create(7, "GROUP_INDEX_MAP"))) { + ObMemAttr attr(tenant_id, "GROUP_INDEX_MAP"); + if (OB_FAIL(group_id_index_map_.create(7, attr, attr))) { LOG_WARN("create group index map failed", K(ret)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < io_config.group_num_; ++i) { diff --git a/src/share/io/ob_io_manager.h b/src/share/io/ob_io_manager.h index 9744ac5f17..5d0a5c145e 100644 --- a/src/share/io/ob_io_manager.h +++ b/src/share/io/ob_io_manager.h @@ -47,13 +47,13 @@ public: int pread(ObIOInfo &info, int64_t &read_size); int pwrite(ObIOInfo &info, int64_t &write_size); - + int detect_read(const ObIOInfo &info, ObIOHandle &handle, const uint64_t timeout_ms); // config related, thread safe int set_io_config(const ObIOConfig &conf); const ObIOConfig &get_io_config() const; - + // device health management ObIOFaultDetector &get_device_health_detector(); int get_device_health_status(ObDeviceHealthStatus &dhs, int64_t &device_abnormal_time); @@ -120,7 +120,7 @@ public: int update_basic_io_config(const ObTenantIOConfig &io_config); int alloc_io_request(ObIAllocator &allocator,const int64_t callback_size, ObIORequest *&req); int alloc_io_clock(ObIAllocator &allocator, ObTenantIOClock *&io_clock); - int init_group_index_map(const ObTenantIOConfig &io_config); + int init_group_index_map(const int64_t tenant_id, const ObTenantIOConfig &io_config); int get_group_index(const int64_t group_id, uint64_t &index); int modify_group_io_config(const uint64_t index, const int64_t min_percent, diff --git a/src/share/io/ob_io_struct.cpp b/src/share/io/ob_io_struct.cpp index bce3d56db4..26077a183f 100644 --- a/src/share/io/ob_io_struct.cpp +++ b/src/share/io/ob_io_struct.cpp @@ -107,7 +107,7 @@ int ObIOMemoryPool::init(const int64_t block_count, ObIAllocator &allocato if (OB_FAIL(ret)) { // do nothing - } else if (OB_FAIL(pool_.init(capacity_))) { + } else if (OB_FAIL(pool_.init(capacity_, allocator_))) { LOG_WARN("fail to init memory pool", K(ret)); } else if (OB_ISNULL(begin_ptr_ = reinterpret_cast(allocator_->alloc(capacity_ * SIZE)))){ ret = OB_ALLOCATE_MEMORY_FAILED; @@ -2546,10 +2546,12 @@ ObIOCallbackManager::~ObIOCallbackManager() destroy(); } -int ObIOCallbackManager::init(const int64_t thread_count, const int32_t queue_depth, ObIOAllocator *io_allocator) +int ObIOCallbackManager::init(const int64_t tenant_id, const int64_t thread_count, + const int32_t queue_depth, ObIOAllocator *io_allocator) { int ret = OB_SUCCESS; void *buf = nullptr; + runners_.set_attr(ObMemAttr(tenant_id, "IORunners")); if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret), K(is_inited_)); diff --git a/src/share/io/ob_io_struct.h b/src/share/io/ob_io_struct.h index d96e5123d6..9ddaf8e219 100644 --- a/src/share/io/ob_io_struct.h +++ b/src/share/io/ob_io_struct.h @@ -23,7 +23,7 @@ #include "lib/container/ob_array_wrap.h" #include "lib/lock/ob_spin_lock.h" #include "share/io/ob_io_define.h" -#include "share/io/io_schedule/ob_io_mclock.h" +#include "share/io/io_schedule/ob_io_mclock.h" namespace oceanbase { @@ -437,7 +437,8 @@ class ObIOCallbackManager final public: ObIOCallbackManager(); ~ObIOCallbackManager(); - int init(const int64_t thread_count, const int32_t queue_depth, ObIOAllocator *io_allocator); + int init(const int64_t tenant_id, int64_t thread_count, + const int32_t queue_depth, ObIOAllocator *io_allocator); void destroy(); int enqueue_callback(ObIORequest &req); @@ -483,7 +484,7 @@ private: int record_write_failure(); void set_device_warning(); void set_device_error(); - + private: static const int64_t WRITE_FAILURE_DETECT_EVENT_COUNT = 100; bool is_inited_; diff --git a/src/share/ls/ob_ls_info.cpp b/src/share/ls/ob_ls_info.cpp index fc6622bfcf..7d7caff02d 100644 --- a/src/share/ls/ob_ls_info.cpp +++ b/src/share/ls/ob_ls_info.cpp @@ -542,6 +542,7 @@ ObLSInfo::ObLSInfo( ls_id_(ls_id), replicas_() { + replicas_.set_attr(ObMemAttr(tenant_id, "LSInfo")); } ObLSInfo::~ObLSInfo() diff --git a/src/share/ob_get_compat_mode.cpp b/src/share/ob_get_compat_mode.cpp index 294ef84fce..4248b7e617 100644 --- a/src/share/ob_get_compat_mode.cpp +++ b/src/share/ob_get_compat_mode.cpp @@ -140,11 +140,13 @@ int ObCompatModeGetter::init(common::ObMySQLProxy *proxy) { int ret = OB_SUCCESS; + ObMemAttr attr(OB_SERVER_TENANT_ID, + ObModIds::OB_HASH_BUCKET_TENANT_COMPAT_MODE); + SET_USE_500(attr); if (IS_INIT) { ret = OB_INIT_TWICE; } else if (OB_FAIL(id_mode_map_.create(bucket_num, - ObModIds::OB_HASH_BUCKET_TENANT_COMPAT_MODE, - ObModIds::OB_HASH_NODE_TENANT_COMPAT_MODE))) { + attr))) { LOG_WARN("create hash table failed", K(ret)); } else { sql_proxy_ = proxy; @@ -272,5 +274,3 @@ int ObCompatModeGetter::reset_compat_getter_map() } return ret; } - - diff --git a/src/share/ob_global_autoinc_service.cpp b/src/share/ob_global_autoinc_service.cpp index 01563e08a3..0d796a3f9a 100644 --- a/src/share/ob_global_autoinc_service.cpp +++ b/src/share/ob_global_autoinc_service.cpp @@ -116,14 +116,15 @@ int ObAutoIncCacheNode::update_sync_value(const uint64_t sync_value) int ObGlobalAutoIncService::init(const ObAddr &addr, ObMySQLProxy *mysql_proxy) { int ret = OB_SUCCESS; + ObMemAttr attr(MTL_ID(), ObModIds::OB_AUTOINCREMENT); if (OB_ISNULL(mysql_proxy)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), KP(mysql_proxy)); } else if (OB_FAIL(inner_table_proxy_.init(mysql_proxy))) { LOG_WARN("init inner table proxy failed", K(ret)); } else if (OB_FAIL(autoinc_map_.create(ObGlobalAutoIncService::INIT_HASHMAP_SIZE, - ObModIds::OB_AUTOINCREMENT, - ObModIds::OB_AUTOINCREMENT))) { + attr, + attr))) { LOG_WARN("init autoinc_map_ failed", K(ret)); } else { for (int64_t i = 0; i < MUTEX_NUM; ++i) { diff --git a/src/share/ob_tablet_autoincrement_param.cpp b/src/share/ob_tablet_autoincrement_param.cpp index 5d2494723f..9e867d8d90 100644 --- a/src/share/ob_tablet_autoincrement_param.cpp +++ b/src/share/ob_tablet_autoincrement_param.cpp @@ -67,6 +67,7 @@ OB_SERIALIZE_MEMBER(ObMigrateTabletAutoincSeqParam, src_tablet_id_, dest_tablet_ ObTabletAutoincSeq::ObTabletAutoincSeq() : intervals_() { + intervals_.set_attr(ObMemAttr(OB_SERVER_TENANT_ID, "TabletAutoInc")); } int ObTabletAutoincSeq::assign(const ObTabletAutoincSeq &other) diff --git a/src/share/ob_upgrade_utils.cpp b/src/share/ob_upgrade_utils.cpp index e9af2f4374..954b5c4306 100644 --- a/src/share/ob_upgrade_utils.cpp +++ b/src/share/ob_upgrade_utils.cpp @@ -548,7 +548,8 @@ int ObUpgradeUtils::filter_sys_stat( /* =========== upgrade processor ============= */ ObUpgradeProcesserSet::ObUpgradeProcesserSet() - : inited_(false), allocator_("UpgProcSet"), + : inited_(false), allocator_(ObMemAttr(MTL_CTX() ? MTL_ID() : OB_SERVER_TENANT_ID, + "UpgProcSet")), processor_list_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator_)) { diff --git a/src/share/rc/ob_tenant_base.cpp b/src/share/rc/ob_tenant_base.cpp index 43860523e7..6e191662a9 100644 --- a/src/share/rc/ob_tenant_base.cpp +++ b/src/share/rc/ob_tenant_base.cpp @@ -26,6 +26,10 @@ int64_t mtl_id() { return MTL_CTX() != nullptr && MTL_ID() != 0 ? MTL_ID() : OB_SERVER_TENANT_ID; } +bool mtl_is_mini_mode() +{ + return MTL_CTX() != nullptr && MTL_IS_MINI_MODE(); +} } namespace share @@ -94,12 +98,13 @@ int ObTenantBase::init(ObCgroupCtrl *cgroup) { int ret = OB_SUCCESS; + ObMemAttr attr(id_, "DynamicFactor"); if (inited_) { ret = OB_INIT_TWICE; LOG_WARN("init twice error", K(ret)); } else if (OB_FAIL(tg_set_.create(1024))) { LOG_WARN("fail to create tg set", K(ret)); - } else if (OB_FAIL(thread_dynamic_factor_map_.create(1024, "thread_factor", ObModIds::OB_HASH_NODE, id_))) { + } else if (OB_FAIL(thread_dynamic_factor_map_.create(1024, attr))) { LOG_WARN("fail to create thread dynamic_factor_map", K(ret)); } else { if (cgroup == nullptr) { diff --git a/src/share/resource_manager/ob_resource_col_mapping_rule_manager.cpp b/src/share/resource_manager/ob_resource_col_mapping_rule_manager.cpp index 639af0e855..2b46c06673 100644 --- a/src/share/resource_manager/ob_resource_col_mapping_rule_manager.cpp +++ b/src/share/resource_manager/ob_resource_col_mapping_rule_manager.cpp @@ -257,6 +257,7 @@ int ObResourceColMappingRuleManager::drop_tenant(uint64_t tenant_id) ret = OB_SUCCESS; } } + tenant_rule_infos_.purge(); LOG_INFO("drop resource column mapping rule info", K(ret), K(tenant_id)); return ret; } diff --git a/src/share/resource_manager/ob_resource_col_mapping_rule_manager.h b/src/share/resource_manager/ob_resource_col_mapping_rule_manager.h index 43b1cc0a2a..8d7353aa35 100644 --- a/src/share/resource_manager/ob_resource_col_mapping_rule_manager.h +++ b/src/share/resource_manager/ob_resource_col_mapping_rule_manager.h @@ -181,6 +181,7 @@ public: {} int init(uint64_t tenant_id); int refresh(sql::ObPlanCache *plan_cache, const common::ObString &plan); + uint64_t get_tenant_id() const { return tenant_id_; } int get_rule_id(uint64_t tenant_id, uint64_t database_id, const common::ObString &table_name, const common::ObString &column_name, common::ObNameCaseMode case_mode, uint64_t &rule_id); @@ -215,8 +216,7 @@ private: } static ObResColMapInfoNode *alloc_node(ObTenantResColMappingInfo *p) { - UNUSED(p); - return OB_NEW(ObResColMapInfoNode, "ResRuleInfoMap"); + return OB_NEW(ObResColMapInfoNode, ObMemAttr(p->get_tenant_id(), "ResRuleInfoMap")); } static void free_node(ObResColMapInfoNode *node) { diff --git a/src/sql/das/ob_das_id_cache.cpp b/src/sql/das/ob_das_id_cache.cpp index 280d59edfd..ce9b641194 100644 --- a/src/sql/das/ob_das_id_cache.cpp +++ b/src/sql/das/ob_das_id_cache.cpp @@ -26,6 +26,7 @@ int ObDASIDCache::init(const common::ObAddr &server, rpc::frame::ObReqTransport int ret = OB_SUCCESS; void *proxy_buf = nullptr; void *request_buf = nullptr; + alloc_.set_attr(ObMemAttr(MTL_ID(), "DASIDCache")); if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", KR(ret)); diff --git a/src/sql/dtl/ob_dtl_local_first_buffer_manager.cpp b/src/sql/dtl/ob_dtl_local_first_buffer_manager.cpp index 1539ed9321..25ad4bd69a 100644 --- a/src/sql/dtl/ob_dtl_local_first_buffer_manager.cpp +++ b/src/sql/dtl/ob_dtl_local_first_buffer_manager.cpp @@ -361,7 +361,7 @@ int ObDtlLocalFirstBufferCacheManager::init() if (OB_FAIL(allocator_.init( lib::ObMallocAllocator::get_instance(), OB_MALLOC_NORMAL_BLOCK_SIZE, - ObMemAttr(common::OB_SERVER_TENANT_ID, ObModIds::OB_SQL_DTL)))) { + ObMemAttr(tenant_id_, ObModIds::OB_SQL_DTL)))) { LOG_WARN("failed to init allocator", K(ret)); } else { if (OB_FAIL(hash_table_.init(BUCKET_NUM, CONCURRENT_CNT))) { diff --git a/src/sql/dtl/ob_dtl_tenant_mem_manager.cpp b/src/sql/dtl/ob_dtl_tenant_mem_manager.cpp index bc5c0b8428..aa8a57dcdd 100644 --- a/src/sql/dtl/ob_dtl_tenant_mem_manager.cpp +++ b/src/sql/dtl/ob_dtl_tenant_mem_manager.cpp @@ -32,7 +32,7 @@ int ObDtlTenantMemManager::init() int ret = OB_SUCCESS; char *buf = nullptr; hash_cnt_ = next_pow2(common::ObServerConfig::get_instance()._px_chunklist_count_ratio) * HASH_CNT; - ObMemAttr attr(OB_SERVER_TENANT_ID, "SqlDtlMgr"); + ObMemAttr attr(tenant_id_, "SqlDtlMgr"); buf = reinterpret_cast(ob_malloc(hash_cnt_ * sizeof(ObDtlChannelMemManager), attr)); if (nullptr == buf) { ret = OB_ALLOCATE_MEMORY_FAILED; diff --git a/src/sql/engine/ob_exec_context.cpp b/src/sql/engine/ob_exec_context.cpp index a428eef0dd..374a7603e4 100644 --- a/src/sql/engine/ob_exec_context.cpp +++ b/src/sql/engine/ob_exec_context.cpp @@ -334,7 +334,7 @@ int ObExecContext::get_temp_expr_eval_ctx(const ObTempExpr &temp_expr, int ret = OB_SUCCESS; if (use_temp_expr_ctx_cache_) { if (!temp_expr_ctx_map_.created()) { - OZ(temp_expr_ctx_map_.create(8, ObLabel("TempExprCtx"))); + OZ(temp_expr_ctx_map_.create(8, ObMemAttr(OB_SERVER_TENANT_ID, "TempExprCtx"))); } if (OB_SUCC(ret)) { int64_t ctx_ptr = 0; diff --git a/src/sql/engine/ob_physical_plan.cpp b/src/sql/engine/ob_physical_plan.cpp index 8c841d449b..d8c8672c37 100644 --- a/src/sql/engine/ob_physical_plan.cpp +++ b/src/sql/engine/ob_physical_plan.cpp @@ -1129,9 +1129,10 @@ const common::hash::ObHashMap& ObPhysicalPlan::get_minimal_work int ObPhysicalPlan::assign_worker_map(common::hash::ObHashMap &worker_map, const common::hash::ObHashMap &c) { int ret = OB_SUCCESS; + ObMemAttr attr(MTL_ID(), "WorkerMap"); if (worker_map.created()) { worker_map.clear(); - } else if (OB_FAIL(worker_map.create(common::hash::cal_next_prime(100), ObModIds::OB_SQL_PX, ObModIds::OB_SQL_PX))){ + } else if (OB_FAIL(worker_map.create(common::hash::cal_next_prime(100), attr, attr))){ LOG_WARN("create hash map failed", K(ret)); } if (OB_SUCC(ret)) { diff --git a/src/sql/engine/ob_tenant_sql_memory_manager.cpp b/src/sql/engine/ob_tenant_sql_memory_manager.cpp index 63646c3809..067c2d564d 100644 --- a/src/sql/engine/ob_tenant_sql_memory_manager.cpp +++ b/src/sql/engine/ob_tenant_sql_memory_manager.cpp @@ -331,14 +331,15 @@ int ObTenantSqlMemoryManager::mtl_init(ObTenantSqlMemoryManager *&sql_mem_mgr) sql_mem_mgr = nullptr; // 系统租户不创建 if (OB_MAX_RESERVED_TENANT_ID < tenant_id) { - sql_mem_mgr = OB_NEW(ObTenantSqlMemoryManager, "SqlMemMgr", tenant_id); + sql_mem_mgr = OB_NEW(ObTenantSqlMemoryManager, + ObMemAttr(tenant_id, "SqlMemMgr"), tenant_id); if (nullptr == sql_mem_mgr) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to alloc tenant sql memory manager", K(ret)); } else if (OB_FAIL(sql_mem_mgr->allocator_.init( lib::ObMallocAllocator::get_instance(), OB_MALLOC_NORMAL_BLOCK_SIZE, - ObMemAttr(common::OB_SERVER_TENANT_ID, "SqlMemMgr")))) { + ObMemAttr(tenant_id, "SqlMemMgr")))) { LOG_WARN("failed to init fifo allocator", K(ret)); } else { int64_t work_area_interval_size = sizeof(ObSqlWorkAreaInterval) * INTERVAL_NUM; diff --git a/src/sql/monitor/flt/ob_flt_span_mgr.cpp b/src/sql/monitor/flt/ob_flt_span_mgr.cpp index 78d6c417db..e19f5e32e6 100644 --- a/src/sql/monitor/flt/ob_flt_span_mgr.cpp +++ b/src/sql/monitor/flt/ob_flt_span_mgr.cpp @@ -75,12 +75,12 @@ namespace sql int ObFLTSpanMgr::mtl_init(ObFLTSpanMgr* &span_mgr) { int ret = OB_SUCCESS; - span_mgr = OB_NEW(ObFLTSpanMgr, "SqlFltSpanRec"); + uint64_t tenant_id = lib::current_resource_owner_id(); + span_mgr = OB_NEW(ObFLTSpanMgr, ObMemAttr(tenant_id, "SqlFltSpanRec")); if (nullptr == span_mgr) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to alloc memory for ObMySQLRequestManager", K(ret)); } else { - uint64_t tenant_id = lib::current_resource_owner_id(); int64_t mem_limit = lib::get_tenant_memory_limit(tenant_id); int64_t queue_size = MAX_QUEUE_SIZE; if (OB_FAIL(span_mgr->init(tenant_id, mem_limit, queue_size))) { diff --git a/src/sql/plan_cache/ob_plan_cache_value.cpp b/src/sql/plan_cache/ob_plan_cache_value.cpp index 5d34cf0944..e9249aec3a 100644 --- a/src/sql/plan_cache/ob_plan_cache_value.cpp +++ b/src/sql/plan_cache/ob_plan_cache_value.cpp @@ -155,6 +155,12 @@ ObPlanCacheValue::ObPlanCacheValue() stmt_type_(stmt::T_MAX) { MEMSET(sql_id_, 0, sizeof(sql_id_)); + not_param_index_.set_attr(ObMemAttr(MTL_ID(), "NotParamIdex")); + neg_param_index_.set_attr(ObMemAttr(MTL_ID(), "NegParamIdex")); + must_be_positive_idx_.set_attr(ObMemAttr(MTL_ID(), "MustBePosiIdx")); + not_param_info_.set_attr(ObMemAttr(MTL_ID(), "NotParamInfo")); + not_param_var_.set_attr(ObMemAttr(MTL_ID(), "NotParamVar")); + param_charset_type_.set_attr(ObMemAttr(MTL_ID(), "ParamCharsType")); } int ObPlanCacheValue::assign_udr_infos(ObPlanCacheCtx &pc_ctx) @@ -376,7 +382,7 @@ int ObPlanCacheValue::match_all_params_info(ObPlanSet *batch_plan_set, } else if (!is_same) { ret = OB_BATCHED_MULTI_STMT_ROLLBACK; LOG_TRACE("params is not same type", K(param_store), K(i)); - } + } } if (OB_SUCC(ret)) { diff --git a/src/sql/plan_cache/ob_plan_set.h b/src/sql/plan_cache/ob_plan_set.h index a37b5052fd..14d6db60de 100644 --- a/src/sql/plan_cache/ob_plan_set.h +++ b/src/sql/plan_cache/ob_plan_set.h @@ -261,7 +261,9 @@ public: //has_array_binding_(false), is_contain_virtual_table_(false), enable_inner_part_parallel_exec_(false) - {} + { + table_locations_.set_attr(ObMemAttr(OB_SERVER_TENANT_ID, "TableLocations")); + } virtual ~ObSqlPlanSet() {} public: @@ -320,10 +322,10 @@ private: int get_plan_special(ObPlanCacheCtx &pc_ctx, ObPhysicalPlan *&plan); int try_get_local_plan(ObPlanCacheCtx &pc_ctx, - ObPhysicalPlan *&plan, + ObPhysicalPlan *&plan, bool &get_next); int try_get_remote_plan(ObPlanCacheCtx &pc_ctx, - ObPhysicalPlan *&plan, + ObPhysicalPlan *&plan, bool &get_next); int try_get_dist_plan(ObPlanCacheCtx &pc_ctx, ObPhysicalPlan *&plan); diff --git a/src/sql/session/ob_sql_session_info.cpp b/src/sql/session/ob_sql_session_info.cpp index 4768d34778..bd6fae073c 100644 --- a/src/sql/session/ob_sql_session_info.cpp +++ b/src/sql/session/ob_sql_session_info.cpp @@ -203,18 +203,18 @@ int ObSQLSessionInfo::init(uint32_t sessid, uint64_t proxy_sessid, LOG_WARN("fail to init basic session info", K(ret)); } else if (!is_acquire_from_pool() && OB_FAIL(package_state_map_.create(hash::cal_next_prime(4), - ObModIds::OB_HASH_BUCKET, - ObModIds::OB_HASH_NODE))) { + "PackStateMap", + "PackStateMap"))) { LOG_WARN("create package state map failed", K(ret)); } else if (!is_acquire_from_pool() && OB_FAIL(sequence_currval_map_.create(hash::cal_next_prime(32), - ObModIds::OB_HASH_BUCKET, - ObModIds::OB_HASH_NODE))) { + "SequenceMap", + "SequenceMap"))) { LOG_WARN("create sequence current value map failed", K(ret)); } else if (!is_acquire_from_pool() && OB_FAIL(contexts_map_.create(hash::cal_next_prime(32), - ObModIds::OB_HASH_BUCKET, - ObModIds::OB_HASH_NODE))) { + "ContextsMap", + "ContextsMap"))) { LOG_WARN("create contexts map failed", K(ret)); } else { curr_session_context_size_ = 0; diff --git a/src/sql/udr/ob_udr_item_mgr.cpp b/src/sql/udr/ob_udr_item_mgr.cpp index fce932bcc4..675e5b4af1 100644 --- a/src/sql/udr/ob_udr_item_mgr.cpp +++ b/src/sql/udr/ob_udr_item_mgr.cpp @@ -195,10 +195,11 @@ int ObUDRItemMgr::init(uint64_t tenant_id, common::ObIAllocator &allocator) { int ret = OB_SUCCESS; int bucket_size = 40960; + ObMemAttr attr(tenant_id, "RewriteRuleMap"); if (OB_UNLIKELY(rule_key_node_map_.created())) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); - } else if (OB_FAIL(rule_key_node_map_.create(bucket_size, "RewriteRuleMap", "RewriteRuleNode"))) { + } else if (OB_FAIL(rule_key_node_map_.create(bucket_size, attr, attr))) { LOG_WARN("failed create rule map", K(ret)); } else { inited_ = true; diff --git a/src/storage/blocksstable/encoding/ob_micro_block_decoder.cpp b/src/storage/blocksstable/encoding/ob_micro_block_decoder.cpp index 29cfd2c67e..ad4b92b3bc 100644 --- a/src/storage/blocksstable/encoding/ob_micro_block_decoder.cpp +++ b/src/storage/blocksstable/encoding/ob_micro_block_decoder.cpp @@ -70,7 +70,10 @@ class ObDecoderCtxArray { public: typedef ObColumnDecoderCtx ObDecoderCtx; - ObDecoderCtxArray() {}; + ObDecoderCtxArray() + { + ctxs_.set_attr(SET_USE_500("DecoderCtxArray")); + } virtual ~ObDecoderCtxArray() { FOREACH(it, ctxs_) { diff --git a/src/storage/blocksstable/ob_macro_block_handle.cpp b/src/storage/blocksstable/ob_macro_block_handle.cpp index c09f32291e..7fd368f5f2 100644 --- a/src/storage/blocksstable/ob_macro_block_handle.cpp +++ b/src/storage/blocksstable/ob_macro_block_handle.cpp @@ -229,6 +229,7 @@ int ObMacroBlockHandle::set_macro_block_id(const MacroBlockId ¯o_block_id) ObMacroBlocksHandle::ObMacroBlocksHandle() : macro_id_list_() { + macro_id_list_.set_attr(ObMemAttr(OB_SERVER_TENANT_ID, "MacroIdList")); } ObMacroBlocksHandle::~ObMacroBlocksHandle() diff --git a/src/storage/blocksstable/ob_shared_macro_block_manager.cpp b/src/storage/blocksstable/ob_shared_macro_block_manager.cpp index 8c39f92788..7424da5c18 100644 --- a/src/storage/blocksstable/ob_shared_macro_block_manager.cpp +++ b/src/storage/blocksstable/ob_shared_macro_block_manager.cpp @@ -124,7 +124,8 @@ int ObSharedMacroBlockMgr::init() ret = OB_INIT_TWICE; LOG_WARN("shared macro block handle has been inited", K(ret)); } else if (FALSE_IT(common_header.set_attr(ObMacroBlockCommonHeader::MacroBlockType::SharedSSTableData))) { - } else if (OB_ISNULL(common_header_buf_ = reinterpret_cast(ob_malloc(header_size_, ObModIds::OB_MACRO_FILE)))) { + } else if (OB_ISNULL(common_header_buf_ = reinterpret_cast(ob_malloc(header_size_, + ObMemAttr(MTL_ID(), ObModIds::OB_MACRO_FILE))))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory for buffer that holds common header", K(ret), K(common_header)); } else if (FALSE_IT(MEMSET(common_header_buf_, 9, header_size_))) { diff --git a/src/storage/compaction/ob_server_compaction_event_history.h b/src/storage/compaction/ob_server_compaction_event_history.h index 322be94a0a..8e6ad01ee8 100644 --- a/src/storage/compaction/ob_server_compaction_event_history.h +++ b/src/storage/compaction/ob_server_compaction_event_history.h @@ -81,9 +81,9 @@ public: static const int64_t SERVER_EVENT_MAX_CNT = 500; ObServerCompactionEventHistory() - : ObInfoRingArray(allocator_), - allocator_("CompEventMgr") + : ObInfoRingArray(allocator_) { + allocator_.set_attr(SET_USE_500("CompEventMgr")); } ~ObServerCompactionEventHistory() {} static int mtl_init(ObServerCompactionEventHistory* &event_history); diff --git a/src/storage/compaction/ob_sstable_merge_info_mgr.cpp b/src/storage/compaction/ob_sstable_merge_info_mgr.cpp index 6db7726841..b194a68ec9 100644 --- a/src/storage/compaction/ob_sstable_merge_info_mgr.cpp +++ b/src/storage/compaction/ob_sstable_merge_info_mgr.cpp @@ -149,7 +149,7 @@ void ObSSTableMergeInfoIterator::reset() */ ObTenantSSTableMergeInfoMgr::ObTenantSSTableMergeInfoMgr() : is_inited_(false), - allocator_(ObModIds::OB_SSTABLE_MERGE_INFO, OB_MALLOC_BIG_BLOCK_SIZE), + allocator_(SET_USE_500(ObModIds::OB_SSTABLE_MERGE_INFO), OB_MALLOC_BIG_BLOCK_SIZE), major_merge_infos_(allocator_), minor_merge_infos_(allocator_) { diff --git a/src/storage/compaction/ob_storage_locality_cache.cpp b/src/storage/compaction/ob_storage_locality_cache.cpp index 3490048618..c98db8d349 100644 --- a/src/storage/compaction/ob_storage_locality_cache.cpp +++ b/src/storage/compaction/ob_storage_locality_cache.cpp @@ -60,8 +60,7 @@ ObStorageLocalityCache::ObStorageLocalityCache() tenant_id_(OB_INVALID_TENANT_ID), sql_proxy_(nullptr), alloc_buf_(nullptr), - allocator_("StoLocCache"), - ls_locality_array_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator_)) + ls_locality_array_(OB_MALLOC_NORMAL_BLOCK_SIZE, allocator_) {} ObStorageLocalityCache::~ObStorageLocalityCache() @@ -81,6 +80,9 @@ int ObStorageLocalityCache::init( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(tenant_id), KP(sql_proxy)); } else { + ObMemAttr attr(tenant_id, "StoLocCache"); + allocator_.set_attr(attr); + ls_locality_array_.set_attr(attr); tenant_id_ = tenant_id; sql_proxy_ = sql_proxy; is_inited_ = true; diff --git a/src/storage/compaction/ob_storage_locality_cache.h b/src/storage/compaction/ob_storage_locality_cache.h index b73526541f..a631918422 100644 --- a/src/storage/compaction/ob_storage_locality_cache.h +++ b/src/storage/compaction/ob_storage_locality_cache.h @@ -101,7 +101,7 @@ private: uint64_t tenant_id_; ObMySQLProxy *sql_proxy_; void *alloc_buf_; - common::DefaultPageAllocator allocator_; + common::ModulePageAllocator allocator_; common::ObArray ls_locality_array_; }; diff --git a/src/storage/compaction/ob_tenant_compaction_progress.h b/src/storage/compaction/ob_tenant_compaction_progress.h index 7db699e205..14dd9982d3 100644 --- a/src/storage/compaction/ob_tenant_compaction_progress.h +++ b/src/storage/compaction/ob_tenant_compaction_progress.h @@ -91,9 +91,9 @@ public: ObTenantCompactionProgressMgr() : ObInfoRingArray(allocator_), - allocator_("TenCompProgMgr"), sum_time_guard_() { + allocator_.set_attr(SET_USE_500("TenCompProgMgr")); } ~ObTenantCompactionProgressMgr() {} static int mtl_init(ObTenantCompactionProgressMgr* &progress_mgr); diff --git a/src/storage/high_availability/ob_storage_ha_service.cpp b/src/storage/high_availability/ob_storage_ha_service.cpp index 80f6a90800..f4915e5fcc 100644 --- a/src/storage/high_availability/ob_storage_ha_service.cpp +++ b/src/storage/high_availability/ob_storage_ha_service.cpp @@ -37,6 +37,7 @@ int ObStorageHAService::mtl_init(ObStorageHAService *&ha_service) int ret = OB_SUCCESS; ObLSService *ls_service = nullptr; + ha_service->ls_id_array_.set_attr(ObMemAttr(MTL_ID(), "ls_id")); if (OB_ISNULL(ls_service = (MTL(ObLSService *)))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ls service should not be NULL", K(ret), KP(ls_service)); @@ -231,4 +232,3 @@ int ObStorageHAService::do_ha_handler_(const share::ObLSID &ls_id) } } - diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index 01e68e5e4b..dca3d19b73 100644 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -140,7 +140,7 @@ int ObLS::init(const share::ObLSID &ls_id, LOG_WARN("init ls sync tablet seq handler failed", K(ret)); } else if (OB_FAIL(ls_ddl_log_handler_.init(this))) { LOG_WARN("init ls ddl log handler failed", K(ret)); - } else if (OB_FAIL(keep_alive_ls_handler_.init(ls_meta_.ls_id_, get_log_handler()))) { + } else if (OB_FAIL(keep_alive_ls_handler_.init(tenant_id, ls_meta_.ls_id_, get_log_handler()))) { LOG_WARN("init keep_alive_ls_handler failed", K(ret)); } else if (OB_FAIL(gc_handler_.init(this))) { LOG_WARN("init gc handler failed", K(ret)); @@ -158,7 +158,7 @@ int ObLS::init(const share::ObLSID &ls_id, LOG_WARN("failed to init ls rebuild cb impl", K(ret)); } else if (OB_FAIL(tablet_gc_handler_.init(this))) { LOG_WARN("init tablet gc handler", K(ret)); - } else if (OB_FAIL(reserved_snapshot_mgr_.init(this, &log_handler_))) { + } else if (OB_FAIL(reserved_snapshot_mgr_.init(tenant_id, this, &log_handler_))) { LOG_WARN("failed to init reserved snapshot mgr", K(ret), K(ls_id)); } else if (OB_FAIL(reserved_snapshot_clog_handler_.init(this))) { LOG_WARN("failed to init reserved snapshot clog handler", K(ret), K(ls_id)); diff --git a/src/storage/ls/ob_ls_reserved_snapshot_mgr.cpp b/src/storage/ls/ob_ls_reserved_snapshot_mgr.cpp index d3b783c712..0116b98368 100644 --- a/src/storage/ls/ob_ls_reserved_snapshot_mgr.cpp +++ b/src/storage/ls/ob_ls_reserved_snapshot_mgr.cpp @@ -42,9 +42,10 @@ ObLSReservedSnapshotMgr::~ObLSReservedSnapshotMgr() destroy(); } -int ObLSReservedSnapshotMgr::init(ObLS *ls, ObLogHandler *log_handler) +int ObLSReservedSnapshotMgr::init(const int64_t tenant_id, ObLS *ls, ObLogHandler *log_handler) { int ret = OB_SUCCESS; + ObMemAttr attr(tenant_id, "DepTabletSet"); if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObLSReservedSnapshotMgr is inited", K(ret), KP(ls)); @@ -53,7 +54,7 @@ int ObLSReservedSnapshotMgr::init(ObLS *ls, ObLogHandler *log_handler) LOG_WARN("invalid argument", K(ret), K(ls), K(log_handler)); } else if (OB_FAIL(ObIStorageClogRecorder::init(0/*max_saved_version*/, log_handler))) { LOG_WARN("failed to init", K(ret), KP(ls), K(log_handler)); - } else if (OB_FAIL(dependent_tablet_set_.create(HASH_BUCKET))) { + } else if (OB_FAIL(dependent_tablet_set_.create(HASH_BUCKET, attr, attr))) { LOG_WARN("failed to create hash set", K(ret), K(ls)); } else { ls_ = ls; diff --git a/src/storage/ls/ob_ls_reserved_snapshot_mgr.h b/src/storage/ls/ob_ls_reserved_snapshot_mgr.h index 0f56759bd1..72d927777f 100644 --- a/src/storage/ls/ob_ls_reserved_snapshot_mgr.h +++ b/src/storage/ls/ob_ls_reserved_snapshot_mgr.h @@ -38,7 +38,7 @@ public: ObLSReservedSnapshotMgr(); ~ObLSReservedSnapshotMgr(); - int init(storage::ObLS *ls, logservice::ObLogHandler *log_handler); + int init(const int64_t tenant_id, storage::ObLS *ls, logservice::ObLogHandler *log_handler); virtual void destroy() override; // for leader diff --git a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp index 0a087373db..a791c297d9 100644 --- a/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp +++ b/src/storage/meta_mem/ob_tenant_meta_mem_mgr.cpp @@ -123,7 +123,7 @@ int ObTenantMetaMemMgr::mtl_new(ObTenantMetaMemMgr *&meta_mem_mgr) int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); - meta_mem_mgr = OB_NEW(ObTenantMetaMemMgr, oceanbase::ObModIds::OMT_TENANT, tenant_id); + meta_mem_mgr = OB_NEW(ObTenantMetaMemMgr, ObMemAttr(tenant_id, "MetaMemMgr"), tenant_id); if (OB_ISNULL(meta_mem_mgr)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to alloc memory", K(ret), K(tenant_id)); @@ -149,12 +149,12 @@ int ObTenantMetaMemMgr::init() } else if (OB_FAIL(tablet_map_.init(bucket_num, tenant_id_, "TabletMap", TOTAL_LIMIT, HOLD_LIMIT, common::OB_MALLOC_NORMAL_BLOCK_SIZE))) { LOG_WARN("fail to initialize tablet map", K(ret), K(bucket_num)); - } else if (OB_FAIL(last_min_minor_sstable_set_.create(DEFAULT_MINOR_SSTABLE_SET_COUNT))) { + } else if (OB_FAIL(last_min_minor_sstable_set_.create(DEFAULT_MINOR_SSTABLE_SET_COUNT, mem_attr))) { LOG_WARN("fail to create last min minor sstable set", K(ret)); } else if (OB_FAIL(pin_set_lock_.init(pin_set_bucket_num, ObLatchIds::BLOCK_MANAGER_LOCK, "T3MPinLock", tenant_id_))) { LOG_WARN("fail to init pin set lock", K(ret)); - } else if (OB_FAIL(pinned_tablet_set_.create(pin_set_bucket_num))) { + } else if (OB_FAIL(pinned_tablet_set_.create(pin_set_bucket_num, mem_attr))) { LOG_WARN("fail to create pinned tablet set", K(ret)); } else if (OB_FAIL(gc_memtable_map_.create(10, "GCMemtableMap", "GCMemtableMap", tenant_id_))) { LOG_WARN("fail to initialize gc memtable map", K(ret)); @@ -1913,6 +1913,7 @@ ObT3mTabletMapIterator::ObT3mTabletMapIterator(ObTenantMetaMemMgr &t3m) tablet_items_(), idx_(0) { + tablet_items_.set_attr(SET_USE_500("TabletItems")); } ObT3mTabletMapIterator::ObT3mTabletMapIterator( @@ -1923,6 +1924,7 @@ ObT3mTabletMapIterator::ObT3mTabletMapIterator( tablet_items_(), idx_(0) { + tablet_items_.set_attr(SET_USE_500("TabletItems")); } ObT3mTabletMapIterator::~ObT3mTabletMapIterator() diff --git a/src/storage/ob_disk_usage_reporter.cpp b/src/storage/ob_disk_usage_reporter.cpp index c024d3f991..6268ab1d3d 100644 --- a/src/storage/ob_disk_usage_reporter.cpp +++ b/src/storage/ob_disk_usage_reporter.cpp @@ -52,7 +52,7 @@ int ObDiskUsageReportTask::init(ObMySQLProxy &sql_proxy) if (is_inited_) { ret = OB_INIT_TWICE; STORAGE_LOG(WARN, "init twice", K(ret)); - } else if (OB_FAIL(result_map_.create(OB_MAX_SERVER_TENANT_CNT * 5, lib::ObLabel("OB_DISK_REP")))) { + } else if (OB_FAIL(result_map_.create(OB_MAX_SERVER_TENANT_CNT * 5, SET_USE_500("OB_DISK_REP")))) { STORAGE_LOG(WARN, "Failed to create result_map_", K(ret)); } else if (OB_FAIL(disk_usage_table_operator_.init(sql_proxy))) { STORAGE_LOG(WARN, "failed to init disk_usage_table_operator_", K(ret)); @@ -572,4 +572,3 @@ int ObDiskUsageReportTask::delete_tenant_usage_stat(const uint64_t tenant_id) } // namespace storage } // namespace oceanbase - diff --git a/src/storage/ob_resource_map.h b/src/storage/ob_resource_map.h index 64bbb9fe6a..59e914f3de 100644 --- a/src/storage/ob_resource_map.h +++ b/src/storage/ob_resource_map.h @@ -222,7 +222,7 @@ int ObResourceMap::init( ret = common::OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid argument", K(ret), K(bucket_num), K(total_limit), K(hold_limit), K(page_size), K(tenant_id)); - } else if (OB_FAIL(bucket_lock_.init(bkt_num))) { + } else if (OB_FAIL(bucket_lock_.init(bkt_num, ObLatchIds::DEFAULT_BUCKET_LOCK, ObMemAttr(tenant_id, "ResourMapLock")))) { STORAGE_LOG(WARN, "fail to init bucket lock", K(ret), K(bkt_num)); } else if (OB_FAIL(map_.create(bkt_num, attr, attr))) { STORAGE_LOG(WARN, "fail to create map", K(ret)); diff --git a/src/storage/ob_tenant_tablet_stat_mgr.cpp b/src/storage/ob_tenant_tablet_stat_mgr.cpp index 746dbd933b..61c6a0b449 100644 --- a/src/storage/ob_tenant_tablet_stat_mgr.cpp +++ b/src/storage/ob_tenant_tablet_stat_mgr.cpp @@ -317,7 +317,7 @@ int ObTabletStream::get_all_tablet_stat(common::ObIArray &tablet_s /************************************* ObTabletStreamPool *************************************/ ObTabletStreamPool::ObTabletStreamPool() : dynamic_allocator_(MTL_ID()), - free_list_allocator_("FreeTbltStream"), + free_list_allocator_(ObMemAttr(MTL_ID(), "FreeTbltStream")), free_list_(), lru_list_(), max_free_list_num_(0), @@ -490,7 +490,7 @@ ObTenantTabletStatMgr::~ObTenantTabletStatMgr() destroy(); } -int ObTenantTabletStatMgr::init() +int ObTenantTabletStatMgr::init(const int64_t tenant_id) { int ret = OB_SUCCESS; const bool repeat = true; @@ -500,9 +500,10 @@ int ObTenantTabletStatMgr::init() LOG_WARN("ObTenantTabletStatMgr init twice", K(ret)); } else if (OB_FAIL(stream_pool_.init(DEFAULT_MAX_FREE_STREAM_CNT, DEFAULT_UP_LIMIT_STREAM_CNT))) { LOG_WARN("failed to init tablet stream pool", K(ret)); - } else if (OB_FAIL(stream_map_.create(DEFAULT_BUCKET_NUM, "TabletStats"))) { + } else if (OB_FAIL(stream_map_.create(DEFAULT_BUCKET_NUM, ObMemAttr(tenant_id, "TabletStats")))) { LOG_WARN("failed to create TabletStats", K(ret)); - } else if (OB_FAIL(bucket_lock_.init(DEFAULT_BUCKET_NUM))) { + } else if (OB_FAIL(bucket_lock_.init(DEFAULT_BUCKET_NUM, ObLatchIds::DEFAULT_BUCKET_LOCK, + ObMemAttr(tenant_id, "TabStatMgrLock")))) { LOG_WARN("failed to init bucket lock", K(ret)); } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::TabletStatRpt, report_tg_id_))) { LOG_WARN("failed to create TabletStatRpt thread", K(ret)); @@ -519,7 +520,7 @@ int ObTenantTabletStatMgr::init() int ObTenantTabletStatMgr::mtl_init(ObTenantTabletStatMgr* &tablet_stat_mgr) { int ret = OB_SUCCESS; - if (OB_FAIL(tablet_stat_mgr->init())) { + if (OB_FAIL(tablet_stat_mgr->init(MTL_ID()))) { LOG_WARN("failed to init tablet stat mgr", K(ret), K(MTL_ID())); } else { LOG_INFO("success to init ObTenantTabletStatMgr", K(MTL_ID())); diff --git a/src/storage/ob_tenant_tablet_stat_mgr.h b/src/storage/ob_tenant_tablet_stat_mgr.h index c45c9f96be..086ba8baed 100644 --- a/src/storage/ob_tenant_tablet_stat_mgr.h +++ b/src/storage/ob_tenant_tablet_stat_mgr.h @@ -284,7 +284,7 @@ public: static int mtl_init(ObTenantTabletStatMgr* &tablet_stat_mgr); ObTenantTabletStatMgr(); virtual ~ObTenantTabletStatMgr(); - int init(); + int init(const int64_t tenant_id); bool is_inited() const { return is_inited_; } // int start(); void wait(); diff --git a/src/storage/slog/ob_storage_log_nop_log.cpp b/src/storage/slog/ob_storage_log_nop_log.cpp index 397bee3919..0da5156dca 100644 --- a/src/storage/slog/ob_storage_log_nop_log.cpp +++ b/src/storage/slog/ob_storage_log_nop_log.cpp @@ -39,7 +39,7 @@ ObStorageLogNopLog::~ObStorageLogNopLog() destroy(); } -int ObStorageLogNopLog::init(const int64_t buffer_size) +int ObStorageLogNopLog::init(const int64_t tenant_id, const int64_t buffer_size) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { @@ -51,7 +51,7 @@ int ObStorageLogNopLog::init(const int64_t buffer_size) } else { buffer_ = static_cast(ob_malloc_align( ObLogConstants::LOG_FILE_ALIGN_SIZE, - buffer_size, "SlogNopLog")); + buffer_size, ObMemAttr(tenant_id, "SlogNopLog"))); if (OB_ISNULL(buffer_)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc nop log buffer", K(ret)); @@ -141,4 +141,4 @@ int64_t ObStorageLogNopLog::get_fixed_serialize_len(const int64_t used_len) return ret_len; } } // namespace blocksstable -} // namespace oceanbase \ No newline at end of file +} // namespace oceanbase diff --git a/src/storage/slog/ob_storage_log_nop_log.h b/src/storage/slog/ob_storage_log_nop_log.h index 1ad66c95a5..ea75d766a8 100644 --- a/src/storage/slog/ob_storage_log_nop_log.h +++ b/src/storage/slog/ob_storage_log_nop_log.h @@ -26,7 +26,7 @@ public: ObStorageLogNopLog(); virtual ~ObStorageLogNopLog(); - int init(const int64_t buffer_size); + int init(const int64_t tenant_id, const int64_t buffer_size); void destroy(); int set_needed_size(const int64_t size); diff --git a/src/storage/slog/ob_storage_log_write_buffer.cpp b/src/storage/slog/ob_storage_log_write_buffer.cpp index 9c262aa4e3..7cd64d33da 100644 --- a/src/storage/slog/ob_storage_log_write_buffer.cpp +++ b/src/storage/slog/ob_storage_log_write_buffer.cpp @@ -40,7 +40,8 @@ ObStorageLogWriteBuffer::~ObStorageLogWriteBuffer() int ObStorageLogWriteBuffer::init( const int64_t align_size, - const int64_t buf_size) + const int64_t buf_size, + const int64_t tenant_id) { int ret = OB_SUCCESS; @@ -51,7 +52,7 @@ int ObStorageLogWriteBuffer::init( ret = OB_INVALID_ARGUMENT; STORAGE_REDO_LOG(WARN, "Invalid arguments", K(ret), K(align_size), K(buf_size)); } else if (OB_ISNULL(buf_ = static_cast(ob_malloc_align(align_size, - buf_size, "SlogWriteBuffer")))) { + buf_size, ObMemAttr(tenant_id, "SlogWriteBuffer"))))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_REDO_LOG(WARN, "Fail to alloc write buffer", K(ret), KP_(buf), K(buf_size), K_(align_size)); diff --git a/src/storage/slog/ob_storage_log_write_buffer.h b/src/storage/slog/ob_storage_log_write_buffer.h index 955cf8033c..db793eeabb 100644 --- a/src/storage/slog/ob_storage_log_write_buffer.h +++ b/src/storage/slog/ob_storage_log_write_buffer.h @@ -30,7 +30,8 @@ public: int init( const int64_t align_size, - const int64_t buf_size); + const int64_t buf_size, + const int64_t tenant_id); void destroy(); const char *get_buf() const { return buf_; } int64_t get_write_len() const { return write_len_; } diff --git a/src/storage/slog/ob_storage_log_writer.cpp b/src/storage/slog/ob_storage_log_writer.cpp index 3f52ead228..08912370fc 100644 --- a/src/storage/slog/ob_storage_log_writer.cpp +++ b/src/storage/slog/ob_storage_log_writer.cpp @@ -79,7 +79,7 @@ int ObStorageLogWriter::init( ret = OB_INVALID_ARGUMENT; STORAGE_REDO_LOG(WARN, "Invalid arguments", K(ret), KP(log_dir), K(log_file_size), K(max_log_size)); - } else if (OB_FAIL(ObBaseLogWriter::init(log_cfg, thread_name, MTL_ID()))) { + } else if (OB_FAIL(ObBaseLogWriter::init(log_cfg, thread_name, tenant_id))) { STORAGE_REDO_LOG(WARN, "Fail to init ObBaseLogWriter", K(ret)); } else if (OB_FAIL(ObLogPolicyParser::parse_retry_write_policy(log_file_spec.retry_write_policy_, retry_write_policy_))) { @@ -89,9 +89,9 @@ int ObStorageLogWriter::init( log_write_policy_))) { ret = OB_INVALID_ARGUMENT; STORAGE_REDO_LOG(WARN, "Fail to parse log write policy", K(ret), K(log_file_spec)); - } else if (OB_FAIL(nop_log_.init(ObLogConstants::LOG_FILE_ALIGN_SIZE))) { + } else if (OB_FAIL(nop_log_.init(tenant_id, ObLogConstants::LOG_FILE_ALIGN_SIZE))) { STORAGE_REDO_LOG(WARN, "Fail to init nop log", K(ret)); - } else if (OB_FAIL(batch_write_buf_.init(ObLogConstants::LOG_FILE_ALIGN_SIZE, buf_size))) { + } else if (OB_FAIL(batch_write_buf_.init(ObLogConstants::LOG_FILE_ALIGN_SIZE, buf_size, tenant_id))) { STORAGE_REDO_LOG(WARN, "Fail to init batch write buf", K(ret), K(buf_size)); } else if (OB_FAIL(file_handler_.init(log_dir, log_file_size, tenant_id))) { STORAGE_REDO_LOG(WARN, "Fail to create file handler", K(ret), KP(log_dir)); diff --git a/src/storage/tx/ob_keep_alive_ls_handler.cpp b/src/storage/tx/ob_keep_alive_ls_handler.cpp index 2beb5cfb91..d194a50575 100644 --- a/src/storage/tx/ob_keep_alive_ls_handler.cpp +++ b/src/storage/tx/ob_keep_alive_ls_handler.cpp @@ -32,7 +32,8 @@ int64_t ObKeepAliveLogBody::get_max_serialize_size() return max_log_body.get_serialize_size(); } -int ObKeepAliveLSHandler::init(const ObLSID &ls_id, logservice::ObLogHandler *log_handler_ptr) +int ObKeepAliveLSHandler::init(const int64_t tenant_id, const ObLSID &ls_id, + logservice::ObLogHandler *log_handler_ptr) { int ret = OB_SUCCESS; logservice::ObLogBaseHeader base_header(ObLogBaseType::KEEP_ALIVE_LOG_BASE_TYPE, @@ -45,7 +46,7 @@ int ObKeepAliveLSHandler::init(const ObLSID &ls_id, logservice::ObLogHandler *lo } else if (OB_NOT_NULL(log_handler_ptr_)) { ret = OB_INIT_TWICE; } else if (OB_ISNULL(submit_buf_ = - static_cast(ob_malloc(submit_buf_len_, "KeepAliveBuf")))) { + static_cast(ob_malloc(submit_buf_len_, ObMemAttr(tenant_id, "KeepAliveBuf"))))) { ret = OB_ALLOCATE_MEMORY_FAILED; TRANS_LOG(WARN, "[Keep Alive] submit_buf alloc failed", K(ret), KP(submit_buf_), K(base_header)); diff --git a/src/storage/tx/ob_keep_alive_ls_handler.h b/src/storage/tx/ob_keep_alive_ls_handler.h index 6f75e87add..ca9d986df3 100644 --- a/src/storage/tx/ob_keep_alive_ls_handler.h +++ b/src/storage/tx/ob_keep_alive_ls_handler.h @@ -133,13 +133,13 @@ private: class ObKeepAliveLSHandler : public logservice::ObIReplaySubHandler, public logservice::ObICheckpointSubHandler, public logservice::ObIRoleChangeSubHandler, - public logservice::AppendCb + public logservice::AppendCb { public: const int64_t KEEP_ALIVE_GTS_INTERVAL = 100 * 1000; public: ObKeepAliveLSHandler() : submit_buf_(nullptr) { reset(); } - int init(const share::ObLSID &ls_id,logservice::ObLogHandler * log_handler_ptr); + int init(const int64_t tenant_id, const share::ObLSID &ls_id,logservice::ObLogHandler * log_handler_ptr); void stop(); // false - can not safe destroy @@ -147,7 +147,7 @@ public: void destroy(); void reset(); - + int try_submit_log(const share::SCN &min_start_scn, MinStartScnStatus status); void print_stat_info(); public: @@ -159,7 +159,7 @@ public: int replay(const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const share::SCN &scn); void switch_to_follower_forcedly() { - ATOMIC_STORE(&is_master_, false); + ATOMIC_STORE(&is_master_, false); } int switch_to_leader() { ATOMIC_STORE(&is_master_,true); return OB_SUCCESS;} int switch_to_follower_gracefully() { ATOMIC_STORE(&is_master_,false); return OB_SUCCESS;} @@ -171,7 +171,7 @@ public: private: bool check_gts_(); int serialize_keep_alive_log_(const share::SCN &min_start_scn, MinStartScnStatus status); -private : +private : SpinRWLock lock_; bool is_busy_; diff --git a/src/storage/tx/ob_trans_define.h b/src/storage/tx/ob_trans_define.h index f0d2754fbc..688e0c5444 100644 --- a/src/storage/tx/ob_trans_define.h +++ b/src/storage/tx/ob_trans_define.h @@ -164,7 +164,7 @@ private: class TransModulePageAllocator : public common::ModulePageAllocator { public: - TransModulePageAllocator(const lib::ObLabel &label = common::ObModIds::OB_MODULE_PAGE_ALLOCATOR, + TransModulePageAllocator(const lib::ObLabel &label = "TransModulePage", int64_t tenant_id = common::OB_SERVER_TENANT_ID, int64_t ctx_id = 0) : ModulePageAllocator(label, tenant_id, ctx_id) {} diff --git a/src/storage/tx/ob_trans_service.cpp b/src/storage/tx/ob_trans_service.cpp index 5840d3d093..f64e27ab1f 100644 --- a/src/storage/tx/ob_trans_service.cpp +++ b/src/storage/tx/ob_trans_service.cpp @@ -511,7 +511,7 @@ void ObTransService::handle(void *task) } else if (OB_FAIL(advance_ckpt_task->try_advance_ls_ckpt_ts())) { TRANS_LOG(WARN, "advance ls ckpt ts failed", K(ret)); } - + if (OB_NOT_NULL(advance_ckpt_task)) { mtl_free(advance_ckpt_task); advance_ckpt_task = nullptr; @@ -913,5 +913,3 @@ int ObTransService::get_max_commit_version(SCN &commit_version) const } } // transaction } // oceanbase - - diff --git a/src/storage/tx/ob_tx_ls_log_writer.cpp b/src/storage/tx/ob_tx_ls_log_writer.cpp index 0980dda374..d00f3f3d72 100644 --- a/src/storage/tx/ob_tx_ls_log_writer.cpp +++ b/src/storage/tx/ob_tx_ls_log_writer.cpp @@ -171,7 +171,7 @@ int ObTxLSLogWriter::init(const int64_t tenant_id, const ObLSID &ls_id, ObITxLogAdapter * adapter, ObLSTxCtxMgr *ctx_mgr) -{ + { int ret = OB_SUCCESS; if (OB_NOT_NULL(tx_log_adapter_) || OB_NOT_NULL(ctx_mgr_)) { @@ -180,6 +180,7 @@ int ObTxLSLogWriter::init(const int64_t tenant_id, ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "[TxLsLogWriter] invalid arguments", K(ls_id), KP(adapter), KP(ctx_mgr)); } else { + tenant_id_ = tenant_id; ls_id_ = ls_id; tenant_id_ = tenant_id; ctx_mgr_ = ctx_mgr; @@ -228,6 +229,7 @@ void ObTxLSLogWriter::reset() keep_alive_cbs_.reset(); start_working_cbs_.reset(); free_cbs_.reset(); + tenant_id_ = OB_SERVER_TENANT_ID; ls_id_.reset(); ctx_mgr_ = nullptr; tx_log_adapter_ = nullptr; @@ -362,7 +364,8 @@ int ObTxLSLogWriter::append_free_log_cb_() char *cb_buf = nullptr; ObTxLSLogCb *tmp_cb = nullptr; if (nullptr - == (cb_buf = (char *)ob_malloc(sizeof(ObTxLSLogCb) * APPEND_LOG_CB_CNT, "ObTxLSLogCb"))) { + == (cb_buf = (char *)ob_malloc(sizeof(ObTxLSLogCb) * APPEND_LOG_CB_CNT, + ObMemAttr(tenant_id_, "ObTxLSLogCb")))) { ret = OB_ALLOCATE_MEMORY_FAILED; } else { for (int i = 0; i < APPEND_LOG_CB_CNT; i++) { diff --git a/src/storage/tx/ob_tx_ls_log_writer.h b/src/storage/tx/ob_tx_ls_log_writer.h index a877920e4b..6cedba6e19 100644 --- a/src/storage/tx/ob_tx_ls_log_writer.h +++ b/src/storage/tx/ob_tx_ls_log_writer.h @@ -122,7 +122,7 @@ int ObTxLSLogCb::serialize_ls_log(T &ls_log, // K(block_header)); // } else if (OB_FAIL(ls_log.before_serialize())) { // TRANS_LOG(WARN, "[TxLsLogWriter] before serialize block header error", KR(ret), K(ls_log)); - // } else + // } else if (OB_FAIL(base_header.serialize(log_buf_, ObTxLSLogLimit::LOG_BUF_SIZE, pos_))) { TRANS_LOG(WARN, "[TxLsLogWriter] serialize base header error", KR(ret), KP(log_buf_), K(pos_)); @@ -200,8 +200,8 @@ private: common::ObDList keep_alive_cbs_; common::ObDList start_working_cbs_; - share::ObLSID ls_id_; int64_t tenant_id_; + share::ObLSID ls_id_; ObLSTxCtxMgr *ctx_mgr_; ObITxLogAdapter *tx_log_adapter_; }; diff --git a/src/storage/tx_storage/ob_checkpoint_service.cpp b/src/storage/tx_storage/ob_checkpoint_service.cpp index 25ce3a675d..3f506cf518 100644 --- a/src/storage/tx_storage/ob_checkpoint_service.cpp +++ b/src/storage/tx_storage/ob_checkpoint_service.cpp @@ -42,16 +42,16 @@ int64_t ObCheckPointService::TRAVERSAL_FLUSH_INTERVAL = 5000 * 1000L; int ObCheckPointService::mtl_init(ObCheckPointService* &m) { - return m->init(); + return m->init(MTL_ID()); } -int ObCheckPointService::init() +int ObCheckPointService::init(const int64_t tenant_id) { int ret = OB_SUCCESS; if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObCheckPointService init twice.", K(ret)); - } else if (OB_FAIL(freeze_thread_.init(lib::TGDefIDs::LSFreeze))) { + } else if (OB_FAIL(freeze_thread_.init(tenant_id, lib::TGDefIDs::LSFreeze))) { LOG_WARN("fail to initialize freeze thread", K(ret)); } else { is_inited_ = true; @@ -66,15 +66,15 @@ int ObCheckPointService::start() traversal_flush_timer_.set_run_wrapper(MTL_CTX()); check_clog_disk_usage_timer_.set_run_wrapper(MTL_CTX()); - if (OB_FAIL(checkpoint_timer_.init("TxCkpt"))) { + if (OB_FAIL(checkpoint_timer_.init("TxCkpt", ObMemAttr(MTL_ID(), "CheckPointTimer")))) { STORAGE_LOG(ERROR, "fail to init checkpoint_timer", K(ret)); } else if (OB_FAIL(checkpoint_timer_.schedule(checkpoint_task_, CHECKPOINT_INTERVAL, true))) { STORAGE_LOG(ERROR, "fail to schedule checkpoint task", K(ret)); - } else if (OB_FAIL(traversal_flush_timer_.init("Flush"))) { + } else if (OB_FAIL(traversal_flush_timer_.init("Flush", ObMemAttr(MTL_ID(), "FlushTimer")))) { STORAGE_LOG(ERROR, "fail to init traversal_timer", K(ret)); } else if (OB_FAIL(traversal_flush_timer_.schedule(traversal_flush_task_, TRAVERSAL_FLUSH_INTERVAL, true))) { STORAGE_LOG(ERROR, "fail to schedule traversal_flush task", K(ret)); - } else if (OB_FAIL(check_clog_disk_usage_timer_.init("CKClogDisk"))) { + } else if (OB_FAIL(check_clog_disk_usage_timer_.init("CKClogDisk", ObMemAttr(MTL_ID(), "DiskUsageTimer")))) { STORAGE_LOG(ERROR, "fail to init check_clog_disk_usage_timer", K(ret)); } else if (OB_FAIL(check_clog_disk_usage_timer_.schedule(check_clog_disk_usage_task_, CHECK_CLOG_USAGE_INTERVAL, true))) { STORAGE_LOG(ERROR, "fail to schedule check_clog_disk_usage task", K(ret)); diff --git a/src/storage/tx_storage/ob_checkpoint_service.h b/src/storage/tx_storage/ob_checkpoint_service.h index dd2db27029..3bc1a3e450 100644 --- a/src/storage/tx_storage/ob_checkpoint_service.h +++ b/src/storage/tx_storage/ob_checkpoint_service.h @@ -40,7 +40,7 @@ public: static const int64_t NEED_FLUSH_CLOG_DISK_PERCENT = 30; static int mtl_init(ObCheckPointService *&m); - int init(); + int init(const int64_t tenant_id); int start(); int stop(); void wait(); diff --git a/src/storage/tx_storage/ob_ls_freeze_thread.cpp b/src/storage/tx_storage/ob_ls_freeze_thread.cpp index ff44a06ee1..dc6ebb9149 100644 --- a/src/storage/tx_storage/ob_ls_freeze_thread.cpp +++ b/src/storage/tx_storage/ob_ls_freeze_thread.cpp @@ -77,7 +77,7 @@ void ObLSFreezeThread::destroy() } } -int ObLSFreezeThread::init(int tg_id) +int ObLSFreezeThread::init(const int64_t tenant_id, int tg_id) { int ret = OB_SUCCESS; if (inited_) { @@ -88,7 +88,7 @@ int ObLSFreezeThread::init(int tg_id) } else if (OB_FAIL(TG_SET_HANDLER_AND_START(tg_id_, *this))) { STORAGE_LOG(WARN, "ObSimpleThreadPool inited error.", K(ret)); } else { - ObMemAttr memattr(OB_SERVER_TENANT_ID, "FreezeTask"); + ObMemAttr memattr(tenant_id, "FreezeTask"); for (int64_t i = 0; OB_SUCC(ret) && i < MAX_FREE_TASK_NUM; i++) { ObLSFreezeTask *ptr = (ObLSFreezeTask *)ob_malloc(sizeof(ObLSFreezeTask), memattr); diff --git a/src/storage/tx_storage/ob_ls_freeze_thread.h b/src/storage/tx_storage/ob_ls_freeze_thread.h index 897a88664e..4c9ecb1b30 100644 --- a/src/storage/tx_storage/ob_ls_freeze_thread.h +++ b/src/storage/tx_storage/ob_ls_freeze_thread.h @@ -57,7 +57,7 @@ public: ObLSFreezeThread(); virtual ~ObLSFreezeThread(); - int init(int tg_id); + int init(int64_t tenant_id, int tg_id); void destroy(); int add_task(checkpoint::ObDataCheckpoint *data_checkpoint, share::SCN rec_scn); diff --git a/src/storage/tx_storage/ob_tablet_gc_service.cpp b/src/storage/tx_storage/ob_tablet_gc_service.cpp index 44733beb90..02ffb7bdf8 100644 --- a/src/storage/tx_storage/ob_tablet_gc_service.cpp +++ b/src/storage/tx_storage/ob_tablet_gc_service.cpp @@ -74,7 +74,7 @@ int ObTabletGCService::start() timer_for_tablet_gc_.set_run_wrapper(MTL_CTX()); if (OB_FAIL(timer_for_tablet_change_.init())) { STORAGE_LOG(ERROR, "fail to init timer", KR(ret)); - } else if (OB_FAIL(timer_for_tablet_gc_.init())) { + } else if (OB_FAIL(timer_for_tablet_gc_.init("TabletGcTimer", ObMemAttr(MTL_ID(), "TabletGcTimer")))) { STORAGE_LOG(ERROR, "fail to init timer", KR(ret)); } else if (OB_FAIL(timer_for_tablet_change_.schedule(tablet_change_task_, GC_CHECK_INTERVAL, true))) { STORAGE_LOG(ERROR, "fail to schedule task", KR(ret)); diff --git a/src/storage/tx_storage/ob_tenant_memory_printer.cpp b/src/storage/tx_storage/ob_tenant_memory_printer.cpp index 97ff6dd222..c907a90875 100644 --- a/src/storage/tx_storage/ob_tenant_memory_printer.cpp +++ b/src/storage/tx_storage/ob_tenant_memory_printer.cpp @@ -90,7 +90,6 @@ int ObTenantMemoryPrinter::print_tenant_usage() LOG_WARN("print mtl tenant usage failed", K(tmp_ret), K(tenant_id)); } } - uint64_t tenant_ids[128] = {0}; int tenant_cnt = 0; static uint64_t all_tenant_ids[OB_MAX_SERVER_TENANT_CNT] = {0}; common::get_tenant_ids(all_tenant_ids, OB_MAX_SERVER_TENANT_CNT, tenant_cnt); diff --git a/src/storage/tx_table/ob_tx_data_table.cpp b/src/storage/tx_table/ob_tx_data_table.cpp index 0fea6a564b..c9de65756c 100644 --- a/src/storage/tx_table/ob_tx_data_table.cpp +++ b/src/storage/tx_table/ob_tx_data_table.cpp @@ -66,6 +66,8 @@ int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table) } else if (OB_FAIL(init_tx_data_read_schema_())) { STORAGE_LOG(WARN, "init tx data read ctx failed.", KR(ret), K(tablet_id_)); } else { + calc_upper_trans_version_cache_.commit_versions_.array_.set_attr( + ObMemAttr(ls->get_tenant_id(), "CommitVersions")); slice_allocator_.set_nway(ObTxDataTable::TX_DATA_MAX_CONCURRENCY); TX_DATA_MEM_LEAK_DEBUG_CODE diff --git a/unittest/storage/slog/test_storage_logger_manager.cpp b/unittest/storage/slog/test_storage_logger_manager.cpp index 6a307da2b9..8cc6b59869 100644 --- a/unittest/storage/slog/test_storage_logger_manager.cpp +++ b/unittest/storage/slog/test_storage_logger_manager.cpp @@ -142,6 +142,7 @@ TEST_F(TestStorageLoggerManager, test_slogger_basic) ObTenantBase tenant_base(5); tenant_base.set(tmp_slogger); ObTenantEnv::set_tenant(&tenant_base); + DEFER(ObTenantEnv::set_tenant(nullptr)); ASSERT_EQ(OB_SUCCESS, tenant_base.init()); ObTenantSwitchGuard guard; @@ -183,19 +184,21 @@ TEST_F(TestStorageLoggerManager, test_slogger_basic) TEST_F (TestStorageLoggerManager, test_build_item) { int ret = OB_SUCCESS; + ObTenantBase tenant_base(5); + ObTenantEnv::set_tenant(&tenant_base); + ASSERT_EQ(OB_SUCCESS, tenant_base.init()); SLOGGERMGR.init(dir_, MAX_FILE_SIZE, log_file_spec_); ObStorageLogger *tmp_slogger = OB_NEW(ObStorageLogger, ObModIds::TEST); ASSERT_EQ(OB_SUCCESS, tmp_slogger->init(SLOGGERMGR, 500)); ASSERT_EQ(OB_SUCCESS, tmp_slogger->start()); - ObTenantBase tenant_base(5); tenant_base.set(tmp_slogger); - ObTenantEnv::set_tenant(&tenant_base); - ASSERT_EQ(OB_SUCCESS, tenant_base.init()); ObTenantSwitchGuard guard; guard.switch_to(5); + ObTenantEnv::set_tenant(&tenant_base); + DEFER(ObTenantEnv::set_tenant(nullptr)); ObStorageLogger *slogger = MTL(ObStorageLogger*); slogger->start_log(start_cursor_); diff --git a/unittest/storage/test_io_manager.cpp b/unittest/storage/test_io_manager.cpp index 020dc62876..c7bea48e9b 100644 --- a/unittest/storage/test_io_manager.cpp +++ b/unittest/storage/test_io_manager.cpp @@ -568,10 +568,10 @@ TEST_F(TestIOStruct, IOCallbackManager) // test init ObIOCallbackManager callback_mgr; ASSERT_FALSE(callback_mgr.is_inited_); - ASSERT_FAIL(callback_mgr.init(0, 1000, nullptr)); + ASSERT_FAIL(callback_mgr.init(TEST_TENANT_ID, 0, 1000, nullptr)); ObIOAllocator io_allocator; ASSERT_SUCC(io_allocator.init(TEST_TENANT_ID, IO_MEMORY_LIMIT)); - ASSERT_SUCC(callback_mgr.init(2, 1000, &io_allocator)); + ASSERT_SUCC(callback_mgr.init(TEST_TENANT_ID, 2, 1000, &io_allocator)); ASSERT_TRUE(callback_mgr.is_inited_); // test enqueue and dequeue @@ -2001,4 +2001,4 @@ int IOTracerSwitch::modify_tenant_io(IOPerfTenant &curr_tenant) LOG_WARN("refresh tenant io config failed", K(ret), K(curr_tenant.tenant_id_), K(curr_tenant.config_)); } return ret; -} \ No newline at end of file +} diff --git a/unittest/storage/test_tenant_tablet_stat_mgr.cpp b/unittest/storage/test_tenant_tablet_stat_mgr.cpp index 704553d8cf..62e0ca38ed 100644 --- a/unittest/storage/test_tenant_tablet_stat_mgr.cpp +++ b/unittest/storage/test_tenant_tablet_stat_mgr.cpp @@ -79,11 +79,12 @@ void TestTenantTabletStatMgr::SetUp() { int ret = OB_SUCCESS; + ObTenantEnv::set_tenant(&tenant_base_); stat_mgr_ = OB_NEW(ObTenantTabletStatMgr, ObModIds::TEST); - ret = stat_mgr_->init(); + ret = stat_mgr_->init(tenant_id_); ASSERT_EQ(OB_SUCCESS, ret); - tenant_base_.set(stat_mgr_); + ObTenantEnv::set_tenant(&tenant_base_); ASSERT_EQ(OB_SUCCESS, tenant_base_.init()); ASSERT_EQ(tenant_id_, MTL_ID()); @@ -425,4 +426,4 @@ int main(int argc, char **argv) OB_LOGGER.set_log_level("INFO"); OB_LOGGER.set_max_file_size(256*1024*1024); return RUN_ALL_TESTS(); -} \ No newline at end of file +} diff --git a/unittest/storage/tx/it/tx_node.cpp b/unittest/storage/tx/it/tx_node.cpp index 4d0c151a10..c1ae4fb238 100644 --- a/unittest/storage/tx/it/tx_node.cpp +++ b/unittest/storage/tx/it/tx_node.cpp @@ -261,6 +261,7 @@ ObTxNode::~ObTxNode() __attribute__((optnone)) { delete fake_tx_log_adapter_; } FAST_FAIL(); + ObTenantEnv::set_tenant(NULL); } int ObTxNode::create_memtable_(const int64_t tablet_id, memtable::ObMemtable *&mt) { diff --git a/unittest/storage/tx/test_ls_log_writer.cpp b/unittest/storage/tx/test_ls_log_writer.cpp index cf8fea149c..31a36d8b0e 100644 --- a/unittest/storage/tx/test_ls_log_writer.cpp +++ b/unittest/storage/tx/test_ls_log_writer.cpp @@ -72,7 +72,7 @@ const ObLSID TEST_LS_ID(735); TEST_F(TestLSLogWriter, submit_start_working_log) { - int64_t tmp_tenant_id = 1004; + int64_t tmp_tenant_id = 1; ObLSTxCtxMgr tmp_mgr; common::ObConcurrentFIFOAllocator tmp_allocator;