Change the ap_mini_task_mgr memory allocator from TCFactory to op_reclaim_alloc
This commit is contained in:
committed by
wangzelin.wzl
parent
28e32c861b
commit
4a67d3f351
@ -722,7 +722,7 @@ struct OPNum {
|
||||
|
||||
template <class Type, bool Cond = true>
|
||||
struct GetOPLabel {
|
||||
static const int64_t v = Type::OP_LABEL;
|
||||
static constexpr const char *v = Type::OP_LABEL;
|
||||
};
|
||||
|
||||
template <class Type>
|
||||
|
||||
@ -22,8 +22,6 @@
|
||||
namespace oceanbase {
|
||||
using namespace common;
|
||||
namespace sql {
|
||||
REGISTER_CREATOR(ObAPMiniTaskMgrGFactory, ObAPMiniTaskMgr, ObAPMiniTaskMgr, 0);
|
||||
|
||||
int ObAPMiniTaskMgr::save_task_result(
|
||||
const ObAddr& task_addr, int64_t task_id, int32_t ret_code, const ObMiniTaskResult& result)
|
||||
{
|
||||
@ -87,8 +85,7 @@ int ObAPMiniTaskMgr::init(ObSQLSessionInfo& session, ObExecutorRpcImpl* exec_rpc
|
||||
void ObAPMiniTaskMgr::reset()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObDLinkBase<ObAPMiniTaskMgr>::reset();
|
||||
void* p = NULL;
|
||||
void *p = NULL;
|
||||
while (OB_SUCC(finish_queue_.pop(p, 0))) {
|
||||
if (p != NULL) {
|
||||
ObMiniTaskEvent* task_event = static_cast<ObMiniTaskEvent*>(p);
|
||||
|
||||
@ -17,10 +17,13 @@
|
||||
#include "lib/allocator/ob_safe_arena.h"
|
||||
namespace oceanbase {
|
||||
namespace sql {
|
||||
class ObAPMiniTaskMgr : public common::ObDLinkBase<ObAPMiniTaskMgr> {
|
||||
class ObAPMiniTaskMgr {
|
||||
static const int64_t MAX_FINISH_QUEUE_CAPACITY = 512;
|
||||
|
||||
public:
|
||||
static const int64_t OP_LOCAL_NUM = 1;
|
||||
static constexpr const char *OP_LABEL = ObModIds::OB_SQL_EXECUTOR_MINI_TASK_MGR;
|
||||
|
||||
ObAPMiniTaskMgr()
|
||||
: ref_count_(0),
|
||||
mgr_rcode_(common::OB_SUCCESS),
|
||||
@ -30,11 +33,8 @@ public:
|
||||
lock_()
|
||||
{}
|
||||
virtual ~ObAPMiniTaskMgr()
|
||||
{}
|
||||
|
||||
int32_t get_type()
|
||||
{
|
||||
return 0;
|
||||
reset();
|
||||
}
|
||||
static ObAPMiniTaskMgr* alloc();
|
||||
static void free(ObAPMiniTaskMgr* item);
|
||||
@ -47,7 +47,7 @@ public:
|
||||
return ATOMIC_SAF((uint64_t*)&ref_count_, 1);
|
||||
}
|
||||
int init(ObSQLSessionInfo& session, ObExecutorRpcImpl* exec_rpc);
|
||||
virtual void reset() override;
|
||||
void reset();
|
||||
void set_mgr_rcode(int mgr_rcode)
|
||||
{
|
||||
mgr_rcode_ = mgr_rcode;
|
||||
@ -88,21 +88,9 @@ private:
|
||||
mutable common::ObSpinLock lock_;
|
||||
};
|
||||
|
||||
typedef common::ObGlobalFactory<ObAPMiniTaskMgr, 1, common::ObModIds::OB_SQL_EXECUTOR_MINI_TASK_MGR>
|
||||
ObAPMiniTaskMgrGFactory;
|
||||
typedef common::ObTCFactory<ObAPMiniTaskMgr, 1, common::ObModIds::OB_SQL_EXECUTOR_MINI_TASK_MGR>
|
||||
ObApMiniTaskMgrTCFactory;
|
||||
|
||||
inline ObAPMiniTaskMgr* ObAPMiniTaskMgr::alloc()
|
||||
inline ObAPMiniTaskMgr *ObAPMiniTaskMgr::alloc()
|
||||
{
|
||||
ObAPMiniTaskMgr* ap_mini_task_mgr = NULL;
|
||||
if (OB_ISNULL(ObApMiniTaskMgrTCFactory::get_instance())) {
|
||||
SQL_EXE_LOG(ERROR, "get ap mini task mgr factory instance failed");
|
||||
ap_mini_task_mgr = NULL;
|
||||
} else {
|
||||
ap_mini_task_mgr = ObApMiniTaskMgrTCFactory::get_instance()->get(0);
|
||||
}
|
||||
return ap_mini_task_mgr;
|
||||
return op_reclaim_alloc(ObAPMiniTaskMgr);
|
||||
}
|
||||
|
||||
inline void ObAPMiniTaskMgr::free(ObAPMiniTaskMgr* item)
|
||||
@ -111,13 +99,8 @@ inline void ObAPMiniTaskMgr::free(ObAPMiniTaskMgr* item)
|
||||
int64_t ref_count = item->def_ref_count();
|
||||
if (OB_LIKELY(0 == ref_count)) {
|
||||
// nobody reference this object, so free it
|
||||
if (OB_ISNULL(ObApMiniTaskMgrTCFactory::get_instance())) {
|
||||
SQL_EXE_LOG(ERROR, "get ap mini task mgr factory instance failed");
|
||||
} else {
|
||||
item->reset();
|
||||
ObApMiniTaskMgrTCFactory::get_instance()->put(item);
|
||||
item = NULL;
|
||||
}
|
||||
op_reclaim_free(item);
|
||||
item = nullptr;
|
||||
} else if (OB_UNLIKELY(ref_count < 0)) {
|
||||
SQL_EXE_LOG(ERROR, "ref_count is invalid", K(ref_count));
|
||||
}
|
||||
|
||||
@ -49,8 +49,7 @@ inline int init_sql_factories()
|
||||
// ATTENTION: don't delete this log, it's used to init thread local variable LogBufferMgr in ObLog.
|
||||
SQL_LOG(INFO, "init sql factories");
|
||||
int ret = common::OB_SUCCESS;
|
||||
if (OB_FAIL(ObAPMiniTaskMgrGFactory::get_instance()->init())) {
|
||||
} else if (OB_FAIL(ObPxSqcHandlerFactory::get_instance()->init())) {
|
||||
if (OB_FAIL(ObPxSqcHandlerFactory::get_instance()->init())) {
|
||||
SQL_LOG(ERROR, "failed to init sqc handler", K(ret));
|
||||
} else {
|
||||
ObExprOperatorFactory::register_expr_operators();
|
||||
|
||||
Reference in New Issue
Block a user