[CP] fix sqc handler memory leak when drop tenant

This commit is contained in:
obdev
2024-02-08 09:47:28 +00:00
committed by ob-robot
parent 38c85c932c
commit 51a7041b17
5 changed files with 63 additions and 9 deletions

View File

@ -523,7 +523,7 @@ int ObMultiTenant::init(ObAddr myaddr,
MTL_BIND2(mtl_new_default, ObTenantCGReadInfoMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObTenantCGReadInfoMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObDecodeResourcePool::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObDecodeResourcePool::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND(ObPxPools::mtl_init, ObPxPools::mtl_destroy); MTL_BIND2(nullptr, ObPxPools::mtl_init, nullptr, ObPxPools::mtl_stop, nullptr, ObPxPools::mtl_destroy);
MTL_BIND(ObTenantDfc::mtl_init, ObTenantDfc::mtl_destroy); MTL_BIND(ObTenantDfc::mtl_init, ObTenantDfc::mtl_destroy);
MTL_BIND(init_compat_mode, nullptr); MTL_BIND(init_compat_mode, nullptr);
MTL_BIND2(ObMySQLRequestManager::mtl_new, ObMySQLRequestManager::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, ObMySQLRequestManager::mtl_destroy); MTL_BIND2(ObMySQLRequestManager::mtl_new, ObMySQLRequestManager::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, ObMySQLRequestManager::mtl_destroy);

View File

@ -162,7 +162,7 @@ int ObPxPools::ThreadRecyclePoolFunc::operator() (common::hash::HashMapPair<int6
return ret; return ret;
} }
int ObPxPools::DeletePoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv) int ObPxPools::StopPoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t &group_id = kv.first; int64_t &group_id = kv.first;
@ -172,15 +172,41 @@ int ObPxPools::DeletePoolFunc::operator() (common::hash::HashMapPair<int64_t, Ob
} else { } else {
pool->stop(); pool->stop();
LOG_INFO("DEL_POOL_STEP_1: mark px pool stop succ!", K(group_id)); LOG_INFO("DEL_POOL_STEP_1: mark px pool stop succ!", K(group_id));
}
return ret;
}
int ObPxPools::DeletePoolFunc::operator() (common::hash::HashMapPair<int64_t, ObPxPool*> &kv)
{
int ret = OB_SUCCESS;
int64_t &group_id = kv.first;
ObPxPool *pool = kv.second;
if (NULL == pool) {
LOG_WARN("pool is null", K(group_id));
} else {
pool->wait(); pool->wait();
LOG_INFO("DEL_POOL_STEP_2: wait pool empty succ!", K(group_id)); LOG_INFO("DEL_POOL_STEP_2: wait pool empty succ!", K(group_id));
pool->destroy(); pool->destroy();
LOG_INFO("DEL_POOL_STEP_3: pool destroy succ!", K(group_id)); LOG_INFO("DEL_POOL_STEP_3: pool destroy succ!", K(group_id), K(pool->get_queue_size()));
common::ob_delete(pool); common::ob_delete(pool);
} }
return ret; return ret;
} }
void ObPxPools::mtl_stop(ObPxPools *&pools)
{
int ret = OB_SUCCESS;
common::SpinWLockGuard g(pools->lock_);
if (OB_ISNULL(pools)) {
LOG_WARN("pools is null");
} else {
StopPoolFunc stop_pool_func;
if (OB_FAIL(pools->pool_map_.foreach_refactored(stop_pool_func))) {
LOG_WARN("failed to do foreach", K(ret));
}
}
}
void ObPxPools::destroy() void ObPxPools::destroy()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -226,7 +252,8 @@ void ObPxPool::handle(ObLink *task)
if (t == nullptr) { if (t == nullptr) {
LOG_ERROR_RET(OB_INVALID_ARGUMENT, "px task is invalid"); LOG_ERROR_RET(OB_INVALID_ARGUMENT, "px task is invalid");
} else { } else {
t->func_(); bool need_exec = true;
t->func_(need_exec);
OB_DELETE(Task, "PxTask", t); OB_DELETE(Task, "PxTask", t);
} }
ATOMIC_DEC(&concurrency_); ATOMIC_DEC(&concurrency_);
@ -314,6 +341,22 @@ void ObPxPool::try_recycle(int64_t idle_time)
} }
} }
void ObPxPool::stop()
{
int ret = OB_SUCCESS;
Threads::stop();
ObLink *task = nullptr;
bool need_exec = false;
while (OB_SUCC(queue_.pop(task, QUEUE_WAIT_TIME))) {
Task *t = static_cast<Task*>(task);
if (OB_NOT_NULL(t)) {
t->func_(need_exec);
OB_DELETE(Task, "PxTask", t);
}
ATOMIC_DEC(&concurrency_);
}
}
ObResourceGroup::ObResourceGroup(int32_t group_id, ObTenant* tenant, share::ObCgroupCtrl *cgroup_ctrl): ObResourceGroup::ObResourceGroup(int32_t group_id, ObTenant* tenant, share::ObCgroupCtrl *cgroup_ctrl):
ObResourceGroupNode(group_id), ObResourceGroupNode(group_id),
workers_lock_(tenant->workers_lock_), workers_lock_(tenant->workers_lock_),

View File

@ -53,7 +53,7 @@ namespace omt
class ObPxPool class ObPxPool
: public share::ObThreadPool : public share::ObThreadPool
{ {
using RunFuncT = std::function<void ()>; using RunFuncT = std::function<void (bool)>;
void run(int64_t idx) final; void run(int64_t idx) final;
void run1() final; void run1() final;
static const int64_t QUEUE_WAIT_TIME = 100 * 1000; static const int64_t QUEUE_WAIT_TIME = 100 * 1000;
@ -68,12 +68,14 @@ public:
concurrency_(0), concurrency_(0),
active_threads_(0) active_threads_(0)
{} {}
virtual void stop();
void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; } void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; }
void set_group_id(uint64_t group_id) { group_id_ = group_id; } void set_group_id(uint64_t group_id) { group_id_ = group_id; }
void set_cgroup_ctrl(share::ObCgroupCtrl *cgroup_ctrl) { cgroup_ctrl_ = cgroup_ctrl; } void set_cgroup_ctrl(share::ObCgroupCtrl *cgroup_ctrl) { cgroup_ctrl_ = cgroup_ctrl; }
int64_t get_pool_size() const { return get_thread_count(); } int64_t get_pool_size() const { return get_thread_count(); }
int submit(const RunFuncT &func); int submit(const RunFuncT &func);
void set_px_thread_name(); void set_px_thread_name();
int64_t get_queue_size() const { return queue_.size(); }
private: private:
void handle(common::ObLink *task); void handle(common::ObLink *task);
void try_recycle(int64_t idle_time); void try_recycle(int64_t idle_time);
@ -110,6 +112,13 @@ public:
class ObPxPools class ObPxPools
{ {
public: public:
class StopPoolFunc
{
public:
StopPoolFunc() {}
virtual ~StopPoolFunc() = default;
int operator()(common::hash::HashMapPair<int64_t, ObPxPool*> &kv);
};
class DeletePoolFunc class DeletePoolFunc
{ {
public: public:
@ -117,7 +126,6 @@ public:
virtual ~DeletePoolFunc() = default; virtual ~DeletePoolFunc() = default;
int operator()(common::hash::HashMapPair<int64_t, ObPxPool*> &kv); int operator()(common::hash::HashMapPair<int64_t, ObPxPool*> &kv);
}; };
class ThreadRecyclePoolFunc class ThreadRecyclePoolFunc
{ {
public: public:
@ -137,6 +145,7 @@ public:
} }
return ret; return ret;
} }
static void mtl_stop(ObPxPools *&pools);
static void mtl_destroy(ObPxPools *&pools) static void mtl_destroy(ObPxPools *&pools)
{ {
common::ob_delete(pools); common::ob_delete(pools);

View File

@ -148,7 +148,7 @@ private:
ObPxSqcHandler *sqc_handler_; ObPxSqcHandler *sqc_handler_;
}; };
void PxWorkerFunctor::operator ()() void PxWorkerFunctor::operator ()(bool need_exec)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObCurTraceId::set(env_arg_.get_trace_id()); ObCurTraceId::set(env_arg_.get_trace_id());
@ -163,7 +163,9 @@ void PxWorkerFunctor::operator ()()
const bool enable_trace_log = lib::is_trace_log_enabled(); const bool enable_trace_log = lib::is_trace_log_enabled();
//ensure PX worker skip updating timeout_ts_ by ntp offset //ensure PX worker skip updating timeout_ts_ by ntp offset
THIS_WORKER.set_ntp_offset(0); THIS_WORKER.set_ntp_offset(0);
if (OB_FAIL(px_int_guard.get_interrupt_reg_ret())) { if (!need_exec) {
LOG_INFO("px pool already stopped, do not execute the task.");
} else if (OB_FAIL(px_int_guard.get_interrupt_reg_ret())) {
LOG_WARN("px worker failed to SET_INTERRUPTABLE"); LOG_WARN("px worker failed to SET_INTERRUPTABLE");
} else if (OB_NOT_NULL(sqc_handler) && OB_LIKELY(!sqc_handler->has_interrupted())) { } else if (OB_NOT_NULL(sqc_handler) && OB_LIKELY(!sqc_handler->has_interrupted())) {
THIS_WORKER.set_worker_level(sqc_handler->get_rpc_level()); THIS_WORKER.set_worker_level(sqc_handler->get_rpc_level());

View File

@ -198,7 +198,7 @@ public:
~PxWorkerFunctor() = default; ~PxWorkerFunctor() = default;
// px thread will invoke this function. // px thread will invoke this function.
void operator ()(); void operator ()(bool need_exec);
PxWorkerFunctor &operator = (const PxWorkerFunctor &other) { PxWorkerFunctor &operator = (const PxWorkerFunctor &other) {
if (&other != this) { if (&other != this) {