From 51a7041b170f0873d14902ec6d31aac6531eef5d Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 8 Feb 2024 09:47:28 +0000 Subject: [PATCH] [CP] fix sqc handler memory leak when drop tenant --- src/observer/omt/ob_multi_tenant.cpp | 2 +- src/observer/omt/ob_tenant.cpp | 49 ++++++++++++++++++++++++++-- src/observer/omt/ob_tenant.h | 13 ++++++-- src/sql/engine/px/ob_px_worker.cpp | 6 ++-- src/sql/engine/px/ob_px_worker.h | 2 +- 5 files changed, 63 insertions(+), 9 deletions(-) diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index c4e17a7524..17f5f4f1fc 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -523,7 +523,7 @@ int ObMultiTenant::init(ObAddr myaddr, MTL_BIND2(mtl_new_default, ObTenantCGReadInfoMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObDecodeResourcePool::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); - MTL_BIND(ObPxPools::mtl_init, ObPxPools::mtl_destroy); + MTL_BIND2(nullptr, ObPxPools::mtl_init, nullptr, ObPxPools::mtl_stop, nullptr, ObPxPools::mtl_destroy); MTL_BIND(ObTenantDfc::mtl_init, ObTenantDfc::mtl_destroy); MTL_BIND(init_compat_mode, nullptr); MTL_BIND2(ObMySQLRequestManager::mtl_new, ObMySQLRequestManager::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, ObMySQLRequestManager::mtl_destroy); diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 7241f5e07d..525fecdb4a 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -162,7 +162,7 @@ int ObPxPools::ThreadRecyclePoolFunc::operator() (common::hash::HashMapPair &kv) +int ObPxPools::StopPoolFunc::operator() (common::hash::HashMapPair &kv) { int ret = OB_SUCCESS; int64_t &group_id = kv.first; @@ -172,15 +172,41 @@ int ObPxPools::DeletePoolFunc::operator() (common::hash::HashMapPairstop(); LOG_INFO("DEL_POOL_STEP_1: mark px pool stop succ!", K(group_id)); + } + return ret; +} + +int ObPxPools::DeletePoolFunc::operator() (common::hash::HashMapPair &kv) +{ + int ret = OB_SUCCESS; + int64_t &group_id = kv.first; + ObPxPool *pool = kv.second; + if (NULL == pool) { + LOG_WARN("pool is null", K(group_id)); + } else { pool->wait(); LOG_INFO("DEL_POOL_STEP_2: wait pool empty succ!", K(group_id)); pool->destroy(); - LOG_INFO("DEL_POOL_STEP_3: pool destroy succ!", K(group_id)); + LOG_INFO("DEL_POOL_STEP_3: pool destroy succ!", K(group_id), K(pool->get_queue_size())); common::ob_delete(pool); } return ret; } +void ObPxPools::mtl_stop(ObPxPools *&pools) +{ + int ret = OB_SUCCESS; + common::SpinWLockGuard g(pools->lock_); + if (OB_ISNULL(pools)) { + LOG_WARN("pools is null"); + } else { + StopPoolFunc stop_pool_func; + if (OB_FAIL(pools->pool_map_.foreach_refactored(stop_pool_func))) { + LOG_WARN("failed to do foreach", K(ret)); + } + } +} + void ObPxPools::destroy() { int ret = OB_SUCCESS; @@ -226,7 +252,8 @@ void ObPxPool::handle(ObLink *task) if (t == nullptr) { LOG_ERROR_RET(OB_INVALID_ARGUMENT, "px task is invalid"); } else { - t->func_(); + bool need_exec = true; + t->func_(need_exec); OB_DELETE(Task, "PxTask", t); } ATOMIC_DEC(&concurrency_); @@ -314,6 +341,22 @@ void ObPxPool::try_recycle(int64_t idle_time) } } +void ObPxPool::stop() +{ + int ret = OB_SUCCESS; + Threads::stop(); + ObLink *task = nullptr; + bool need_exec = false; + while (OB_SUCC(queue_.pop(task, QUEUE_WAIT_TIME))) { + Task *t = static_cast(task); + if (OB_NOT_NULL(t)) { + t->func_(need_exec); + OB_DELETE(Task, "PxTask", t); + } + ATOMIC_DEC(&concurrency_); + } +} + ObResourceGroup::ObResourceGroup(int32_t group_id, ObTenant* tenant, share::ObCgroupCtrl *cgroup_ctrl): ObResourceGroupNode(group_id), workers_lock_(tenant->workers_lock_), diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h index b563f2f22b..03aed0ff5f 100644 --- a/src/observer/omt/ob_tenant.h +++ b/src/observer/omt/ob_tenant.h @@ -53,7 +53,7 @@ namespace omt class ObPxPool : public share::ObThreadPool { - using RunFuncT = std::function; + using RunFuncT = std::function; void run(int64_t idx) final; void run1() final; static const int64_t QUEUE_WAIT_TIME = 100 * 1000; @@ -68,12 +68,14 @@ public: concurrency_(0), active_threads_(0) {} + virtual void stop(); void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; } void set_group_id(uint64_t group_id) { group_id_ = group_id; } void set_cgroup_ctrl(share::ObCgroupCtrl *cgroup_ctrl) { cgroup_ctrl_ = cgroup_ctrl; } int64_t get_pool_size() const { return get_thread_count(); } int submit(const RunFuncT &func); void set_px_thread_name(); + int64_t get_queue_size() const { return queue_.size(); } private: void handle(common::ObLink *task); void try_recycle(int64_t idle_time); @@ -110,6 +112,13 @@ public: class ObPxPools { public: + class StopPoolFunc + { + public: + StopPoolFunc() {} + virtual ~StopPoolFunc() = default; + int operator()(common::hash::HashMapPair &kv); + }; class DeletePoolFunc { public: @@ -117,7 +126,6 @@ public: virtual ~DeletePoolFunc() = default; int operator()(common::hash::HashMapPair &kv); }; - class ThreadRecyclePoolFunc { public: @@ -137,6 +145,7 @@ public: } return ret; } + static void mtl_stop(ObPxPools *&pools); static void mtl_destroy(ObPxPools *&pools) { common::ob_delete(pools); diff --git a/src/sql/engine/px/ob_px_worker.cpp b/src/sql/engine/px/ob_px_worker.cpp index fc8b490762..1615902711 100644 --- a/src/sql/engine/px/ob_px_worker.cpp +++ b/src/sql/engine/px/ob_px_worker.cpp @@ -148,7 +148,7 @@ private: ObPxSqcHandler *sqc_handler_; }; -void PxWorkerFunctor::operator ()() +void PxWorkerFunctor::operator ()(bool need_exec) { int ret = OB_SUCCESS; ObCurTraceId::set(env_arg_.get_trace_id()); @@ -163,7 +163,9 @@ void PxWorkerFunctor::operator ()() const bool enable_trace_log = lib::is_trace_log_enabled(); //ensure PX worker skip updating timeout_ts_ by ntp offset THIS_WORKER.set_ntp_offset(0); - if (OB_FAIL(px_int_guard.get_interrupt_reg_ret())) { + if (!need_exec) { + LOG_INFO("px pool already stopped, do not execute the task."); + } else if (OB_FAIL(px_int_guard.get_interrupt_reg_ret())) { LOG_WARN("px worker failed to SET_INTERRUPTABLE"); } else if (OB_NOT_NULL(sqc_handler) && OB_LIKELY(!sqc_handler->has_interrupted())) { THIS_WORKER.set_worker_level(sqc_handler->get_rpc_level()); diff --git a/src/sql/engine/px/ob_px_worker.h b/src/sql/engine/px/ob_px_worker.h index f252467d40..f11a8bd3e5 100644 --- a/src/sql/engine/px/ob_px_worker.h +++ b/src/sql/engine/px/ob_px_worker.h @@ -198,7 +198,7 @@ public: ~PxWorkerFunctor() = default; // px thread will invoke this function. - void operator ()(); + void operator ()(bool need_exec); PxWorkerFunctor &operator = (const PxWorkerFunctor &other) { if (&other != this) {