[DM] memory optmize

This commit is contained in:
obdev
2023-08-29 11:40:37 +00:00
committed by ob-robot
parent 81b8923b14
commit 23c56d9213
4 changed files with 250 additions and 123 deletions

View File

@ -22,7 +22,7 @@ namespace common {
const int64_t DM_INTERRUPT_MSG_MAX_LENGTH = 128;
ObIDetectCallback::ObIDetectCallback(uint64_t tenant_id, const ObIArray<ObPeerTaskState> &peer_states)
: ref_count_(0)
: ref_count_(0), d_node_()
{
int ret = OB_SUCCESS;
peer_states_.set_attr(ObMemAttr(tenant_id, "DmCbStArr"));

View File

@ -18,6 +18,7 @@
#include "sql/engine/px/ob_dfo.h"
#include "sql/engine/px/p2p_datahub/ob_p2p_dh_share_info.h"
#include "lib/container/ob_se_array.h"
#include "lib/list/ob_dlink_node.h"
namespace oceanbase {
@ -67,11 +68,21 @@ enum class DetectCallBackType
P2P_DATAHUB_DETECT_CB = 5,
};
// detectable id with activate time, used for delay detect
class ObDetectableIdDNode : public common::ObDLinkBase<ObDetectableIdDNode>
{
public:
ObDetectableIdDNode() : detectable_id_(), activate_tm_(0) {}
ObDetectableId detectable_id_;
int64_t activate_tm_;
TO_STRING_KV(K_(detectable_id), K_(activate_tm));
};
class ObIDetectCallback
{
public:
// constructor for pass peer_states from derived class
explicit ObIDetectCallback(uint64_t tenant_id, const ObIArray<ObPeerTaskState> &peer_states);
ObIDetectCallback(uint64_t tenant_id, const ObIArray<ObPeerTaskState> &peer_states);
virtual void destroy()
{
peer_states_.reset();
@ -105,6 +116,8 @@ protected:
common::ObAddr from_svr_addr_; // in which server the task is detected as finished
common::ObCurTraceId::TraceId trace_id_;
bool alloc_succ_;
public:
ObDetectableIdDNode d_node_; // used for delay detect
};
class ObQcDetectCB : public ObIDetectCallback

View File

@ -14,16 +14,11 @@
#include "share/detect/ob_detect_manager.h"
#include "lib/ob_running_mode.h"
#include "share/rc/ob_context.h"
#include "lib/lock/ob_spin_lock.h"
namespace oceanbase {
namespace common {
struct ObDetectableIdWrapper : public common::ObLink
{
ObDetectableId detectable_id_;
int64_t activate_tm_;
};
ObDetectableIdGen::ObDetectableIdGen()
{
detect_sequence_id_ = ObTimeUtil::current_time();
@ -56,6 +51,110 @@ int ObDetectableIdGen::generate_detectable_id(ObDetectableId &detectable_id, uin
return ret;
}
int ObDMMultiDlist::init(int64_t tenant_id, int64_t bucket_num)
{
int ret = OB_SUCCESS;
bucket_num_ = bucket_num;
allocator_.set_attr(ObMemAttr(tenant_id, "ObDMMDL"));
locks_.set_allocator(&allocator_);
buckets_.set_allocator(&allocator_);
if (OB_FAIL(locks_.init(bucket_num_))) {
LIB_LOG(WARN, "[DM] failed to init locks_");
} else if (OB_FAIL(buckets_.init(bucket_num_))) {
LIB_LOG(WARN, "[DM] failed to init buckets_");
}
for (int64_t i = 0; OB_SUCC(ret) && i < bucket_num_; ++i) {
ObLockWrapper *lock = OB_NEWx(ObLockWrapper, &allocator_);
ObDList<ObDetectableIdDNode> *dlist = OB_NEWx(ObDList<ObDetectableIdDNode>, &allocator_);
if (OB_ISNULL(lock) || OB_ISNULL(dlist)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LIB_LOG(WARN, "[DM] failed to allocate");
} else if (OB_FAIL(locks_.push_back(lock))) {
LIB_LOG(WARN, "[DM] failed to push_back lock");
} else if (OB_FAIL(buckets_.push_back(dlist))) {
LIB_LOG(WARN, "[DM] failed to push_back dlist");
}
}
return ret;
}
void ObDMMultiDlist::destroy() {
for (int64_t i = 0; i < locks_.count(); ++i) {
locks_.at(i)->~ObLockWrapper();
allocator_.free(locks_.at(i));
}
for (int64_t i = 0; i < buckets_.count(); ++i) {
buckets_.at(i)->~ObDList<ObDetectableIdDNode>();
allocator_.free(buckets_.at(i));
}
locks_.destroy();
buckets_.destroy();
LIB_LOG(INFO, "[DM] destroy ObDMMultiDlist");
}
void ObDMMultiDlist::add_list_node_tail(const ObDetectableId &detectable_id, ObDetectableIdDNode *node)
{
int64_t bucket_id = detectable_id.first_ % bucket_num_;
{
ObLockGuard<common::ObSpinLock> lock_guard(*locks_.at(bucket_id));
buckets_.at(bucket_id)->add_last(node);
}
}
void ObDMMultiDlist::remove_list_node(const ObDetectableId &detectable_id, ObDetectableIdDNode *node)
{
int64_t bucket_id = detectable_id.first_ % bucket_num_;
{
ObLockGuard<common::ObSpinLock> lock_guard(*locks_.at(bucket_id));
buckets_.at(bucket_id)->remove(node);
}
}
void ObDMMultiDlist::pop_active_node(
hash::ObHashSet<ObDetectableId, hash::NoPthreadDefendMode> &still_need_check_id)
{
int ret = OB_SUCCESS;
int64_t cur_time = ObTimeUtil::current_time();
for (int64_t i = 0; OB_SUCC(ret) && i < bucket_num_; ++i) {
bool end_bucket_traverse = false;
// pop MAX_LOCK_LENGTH nodes per lock time
while (!end_bucket_traverse && OB_SUCC(ret)) {
int64_t poped_cnt = 0;
ObLockGuard<common::ObSpinLock> lock_guard(*locks_.at(i));
if (buckets_.at(i)->is_empty()) {
break;
}
ObDetectableIdDNode *cur = buckets_.at(i)->get_first();
ObDetectableIdDNode *next = nullptr;
while (OB_SUCC(ret) && ++poped_cnt <= MAX_LOCK_LENGTH) {
if (cur->activate_tm_ > cur_time) {
// all remain nodes in this bucket are not active, stopping traverse this bucket.
end_bucket_traverse = true;
break;
} else {
if (OB_FAIL(still_need_check_id.set_refactored(cur->detectable_id_))) {
if (OB_HASH_EXIST != ret) {
end_bucket_traverse = true;
LIB_LOG(WARN, "[DM] failed to set_refactored", K(ret), K(cur->detectable_id_));
break;
} else {
ret = OB_SUCCESS;
}
}
// pop cur node
next = cur->get_next();
buckets_.at(i)->remove(cur);
cur = next;
if (buckets_.at(i)->is_empty()) {
end_bucket_traverse = true;
break;
}
}
}
}
}
}
int ObDetectManager::mtl_init(ObDetectManager *&dm)
{
int ret = OB_SUCCESS;
@ -88,18 +187,9 @@ void ObDetectManager::mtl_destroy(ObDetectManager *&dm)
void ObDetectManager::destroy()
{
int ret = OB_SUCCESS;
// destroy wrapper in fifo_que_
int64_t que_size = fifo_que_.size();
for (int64_t idx = 0; idx < que_size; ++idx) {
common::ObLink *p = nullptr;
IGNORE_RETURN fifo_que_.pop(p);
ObDetectableIdWrapper *wrapper = static_cast<ObDetectableIdWrapper *>(p);
if (OB_ISNULL(wrapper)) {
LIB_LOG(WARN, "[DM] wrapper is null");
continue;
}
mem_context_->get_malloc_allocator().free(wrapper);
}
// destroy dm_multi_list_
dm_multi_list_.destroy();
// destroy node and callback in all_check_items_
FOREACH(iter, all_check_items_) {
const ObDetectableId &detectable_id = iter->first;
@ -160,6 +250,9 @@ int ObDetectManager::init(const ObAddr &self, double mem_factor)
tenant_id_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LIB_LOG(WARN, "[DM] create hash set failed", K(ret));
} else if (OB_FAIL(dm_multi_list_.init(tenant_id_,
is_meta_tenant(tenant_id_) ? DM_MDLIST_BUCKETS_COUNT_FOR_META_TENANT : DM_MDLIST_BUCKETS_COUNT))) {
LIB_LOG(WARN, "[DM] failed to init ObDMMultiDlist", K(ret));
} else {
self_ = self;
is_inited_ = true;
@ -202,67 +295,46 @@ int ObDetectManager::unregister_detectable_id(const ObDetectableId &detectable_i
return ret;
}
int ObDetectManager::do_register_check_item(const ObDetectableId &detectable_id, ObIDetectCallback *cb,
int ObDetectManager::do_register_check_item(const ObDetectableId &detectable_id, ObDetectCallbackNode *cb_node,
uint64_t &node_sequence_id, bool need_ref)
{
int ret = OB_SUCCESS;
ObDetectCallbackNode *cb_node = nullptr;
if (OB_ISNULL(cb)) {
if (OB_ISNULL(cb_node)) {
ret = OB_INVALID_ARGUMENT;
LIB_LOG(WARN, "[DM] invaild cb pointer");
} else if (OB_FAIL(create_cb_node(cb, cb_node))) {
cb->destroy();
mem_context_->free(cb);
LIB_LOG(WARN, "[DM] fail to create cb node", K(ret));
LIB_LOG(WARN, "[DM] invaild cb node pointer");
} else {
ObDetectableIdWrapper *wrapper = OB_NEWx(ObDetectableIdWrapper, &mem_context_->get_malloc_allocator());
LIB_LOG(DEBUG, "[DM] dm new wrapper ", K(wrapper));
if (OB_ISNULL(wrapper)) {
delete_cb_node(cb_node);
ret = OB_ALLOCATE_MEMORY_FAILED;
LIB_LOG(WARN, "[DM] failed to create wrapper");
} else {
// if need_ref is true, which means that work thread may use cb, so add ref_count in case of deleting cb during ObDetectCallbackNodeExecuteCall
// typical scene: qc may change sqc's task state in callback upon receiving the report message from sqc, but dm may free callback already
if (need_ref) {
cb->inc_ref_count();
}
node_sequence_id = cb_node->sequence_id_;
// A slightly more complicated but safe inspection operation
// Since map does not provide the operation of "create or modify", nor does it provide the ability to hold bucket locks
// Therefore, add cyclic verification to prevent the occurrence of
// thread a failed to create --> thread b erased --> thread a failed to modify
// Such a situation
do {
if (OB_HASH_EXIST == (ret = all_check_items_.set_refactored(detectable_id, cb_node))) {
ObDetectCallbackNodeAddCall add_node_call(cb_node);
ret = all_check_items_.atomic_refactored(detectable_id, add_node_call);
// If it is an empty queue, it means that another thread wants to delete the node but unexpectedly got the lock by this thread
// So do not delete, try to put again according to HASH_NOT_EXIST
if (add_node_call.is_empty()) {
ret = OB_HASH_NOT_EXIST;
}
// if need_ref is true, which means that work thread may use cb, so add ref_count in case of deleting cb during ObDetectCallbackNodeExecuteCall
// typical scene: qc may change sqc's task state in callback upon receiving the report message from sqc, but dm may free callback already
if (need_ref) {
cb_node->cb_->inc_ref_count();
}
node_sequence_id = cb_node->sequence_id_;
// A slightly more complicated but safe inspection operation
// Since map does not provide the operation of "create or modify", nor does it provide the ability to hold bucket locks
// Therefore, add cyclic verification to prevent the occurrence of
// thread a failed to create --> thread b erased --> thread a failed to modify
// Such a situation
ObObDetectCallbackNodeSetCall set_node_call(this);
do {
if (OB_HASH_EXIST == (ret = all_check_items_.set_refactored(detectable_id, cb_node, 0, 0, 0, &set_node_call))) {
ObDetectCallbackNodeAddCall add_node_call(cb_node, this);
ret = all_check_items_.atomic_refactored(detectable_id, add_node_call);
// If it is an empty queue, it means that another thread wants to delete the node but unexpectedly got the lock by this thread
// So do not delete, try to put again according to HASH_NOT_EXIST
if (add_node_call.is_empty()) {
ret = OB_HASH_NOT_EXIST;
}
} while (ret == OB_HASH_NOT_EXIST);
if (OB_SUCC(ret)) {
// For short queries, we do not need to detect
// Use a fifo que to make detect delay, all queries existed more than ACTIVATE_DELAY_TIME will be activate and be detected by dm.
wrapper->detectable_id_ = detectable_id;
wrapper->activate_tm_ = ObTimeUtility::current_time() + ACTIVATE_DELAY_TIME;
// push is never fail
IGNORE_RETURN fifo_que_.push(wrapper);
}
// hashmap may set_refactored for alloc failed
if OB_FAIL(ret) {
delete_cb_node(cb_node);
}
} while (ret == OB_HASH_NOT_EXIST);
// hashmap may set_refactored for alloc failed
if OB_FAIL(ret) {
delete_cb_node(cb_node);
}
}
if (OB_SUCC(ret)) {
LIB_LOG(DEBUG, "[DM] register_check_item", K(ret), K(detectable_id),
K(cb->get_detect_callback_type()), K(node_sequence_id));
K(cb_node->cb_->get_detect_callback_type()), K(node_sequence_id));
}
return ret;
}
@ -299,33 +371,8 @@ int ObDetectManager::unregister_check_item(const ObDetectableId &detectable_id,
int ObDetectManager::gather_requests(REQUEST_MAP &req_map, lib::MemoryContext &req_map_context)
{
int ret = OB_SUCCESS;
int64_t que_size = fifo_que_.size();
int64_t cur_time = ObTimeUtil::current_time();
// don't break if failed
for (int64_t idx = 0; idx < que_size; ++idx) {
common::ObLink *p = nullptr;
IGNORE_RETURN fifo_que_.pop(p);
ObDetectableIdWrapper *wrapper = static_cast<ObDetectableIdWrapper *>(p);
if (OB_ISNULL(wrapper)) {
LIB_LOG(WARN, "[DM] wrapper is null");
continue;
} else if (wrapper->activate_tm_ > cur_time) {
// the check item is not activated, push to fifo que again
// push_front never failed, because wrapper is not null
IGNORE_RETURN fifo_que_.push_front(wrapper);
break;
}
// push all activated check item into still_need_check_id_
if (OB_FAIL(still_need_check_id_.set_refactored(wrapper->detectable_id_, 0/* flag */))) {
if (OB_HASH_EXIST != ret) {
LIB_LOG(WARN, "[DM] failed to set_refactored", K(ret), K(wrapper->detectable_id_));
} else {
ret = OB_SUCCESS;
}
}
mem_context_->get_malloc_allocator().free(wrapper);
LIB_LOG(DEBUG, "[DM] dm free wrapper ", K(wrapper));
}
IGNORE_RETURN dm_multi_list_.pop_active_node(still_need_check_id_);
ObSEArray<ObDetectableId, 32> remove_list;
FOREACH(iter, still_need_check_id_) {
const ObDetectableId &detectable_id = iter->first;
@ -384,32 +431,26 @@ void ObDetectManager::do_handle_one_result(const ObDetectableId &detectable_id,
}
}
int ObDetectManager::create_cb_node(ObIDetectCallback *cb, ObDetectCallbackNode *&cb_node)
{
int ret = OB_SUCCESS;
// each node has its own unique node_sequence_id for identify itself in the linked list
cb_node = OB_NEWx(ObDetectCallbackNode, &mem_context_->get_malloc_allocator(),
cb, ObDetectableIdGen::instance().get_callback_node_sequence_id());
if (OB_ISNULL(cb_node)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LIB_LOG(WARN, "[DM] failed to new cb_node");
}
LIB_LOG(DEBUG, "[DM] dm new cb node ", K(cb_node));
return ret;
}
void ObDetectManager::delete_cb_node(ObDetectCallbackNode *&cb_node)
{
int ret = OB_SUCCESS;
// cb and cb_node allocated simultaneously, just need to free cb_node
LIB_LOG(DEBUG, "[DM] dm free cb ", K(cb_node->cb_));
cb_node->cb_->destroy();
mem_context_->free(cb_node->cb_);
cb_node->cb_ = nullptr;
LIB_LOG(DEBUG, "[DM] dm free cbnode ", K(cb_node));
mem_context_->free(cb_node);
cb_node = nullptr;
}
int ObDetectManager::ObObDetectCallbackNodeSetCall::operator()(const hash::HashMapPair<ObDetectableId,
ObDetectCallbackNode *> &entry)
{
int ret = OB_SUCCESS;
(void) dm_->dm_multi_list_.add_list_node_tail(entry.first, &entry.second->cb_->d_node_);
return ret;
}
void ObDetectManager::ObDetectCallbackNodeAddCall::operator()(hash::HashMapPair<ObDetectableId,
ObDetectCallbackNode *> &entry)
{
@ -418,6 +459,7 @@ void ObDetectManager::ObDetectCallbackNodeAddCall::operator()(hash::HashMapPair<
cb_node_->next_ = entry.second;
entry.second->prev_ = cb_node_;
entry.second = cb_node_;
(void) dm_->dm_multi_list_.add_list_node_tail(entry.first, &cb_node_->cb_->d_node_);
is_empty_ = false;
} else {
is_empty_ = true;
@ -445,6 +487,7 @@ bool ObDetectManager::ObDetectCallbackNodeRemoveCall::operator()(hash::HashMapPa
// Prev is empty, which means that the current deleted element is the head of the linked list pointed to by map_value, and head is set to next
entry.second = node->next_;
}
(void) dm_->dm_multi_list_.remove_list_node(entry.first, &node->cb_->d_node_);
dm_->delete_cb_node(node);
node_cnt--;
break;
@ -496,6 +539,7 @@ bool ObDetectManager::ObDetectCallbackNodeExecuteCall::operator()(hash::HashMapP
// Prev is empty, which means that the current deleted element is the head of the linked list pointed to by map_value, and head is set to next
entry.second = node->next_;
}
(void) dm_->dm_multi_list_.remove_list_node(entry.first, &node->cb_->d_node_);
dm_->delete_cb_node(node);
node_cnt--;
}

View File

@ -16,7 +16,7 @@
#include "share/detect/ob_detect_rpc_proxy.h"
#include "lib/container/ob_array.h"
#include "lib/hash/ob_hashset.h"
#include "lib/queue/ob_link_queue.h"
#include "lib/list/ob_dlist.h"
#include "lib/thread/thread_mgr_interface.h"
namespace oceanbase {
@ -51,6 +51,40 @@ private:
volatile uint64_t callback_node_sequence_id_; // for mark specific node in linked list(value of CHECK_MAP)
};
// Detect Manager multi Double link list
// ObDMMultiDlist is used to record all the detect callback's detectable id in a fifo manner,
// the node of list is inline in detect callback.
// when add or remove a list node, the lock is applied to guarantee the atomicity.
class ObDMMultiDlist
{
private:
static const int64_t MAX_LOCK_LENGTH = 100;
// the reason why we need to derive is that ObFixedArray<T> needs T has member function to_string
class ObLockWrapper : public common::ObSpinLock
{
public:
inline int64_t to_string(char *buf, const int64_t len) const { return 0; }
} CACHE_ALIGNED;
public:
ObDMMultiDlist()
: bucket_num_(0), allocator_(), locks_(), buckets_() {}
~ObDMMultiDlist() {}
int init(int64_t tenant_id, int64_t bucket_num);
void destroy();
// remove a node in list, the atomicity is guaranteed by lock
void remove_list_node(const ObDetectableId &detectable_id, ObDetectableIdDNode *node);
// insert a node at the tail of list, the atomicity is guaranteed by lock
void add_list_node_tail(const ObDetectableId &detectable_id, ObDetectableIdDNode *node);
// pop all activate node from list, the atomicity is guaranteed by lock
void pop_active_node(hash::ObHashSet<ObDetectableId, hash::NoPthreadDefendMode> &still_need_check_id);
private:
int64_t bucket_num_;
ObArenaAllocator allocator_;
ObFixedArray<ObLockWrapper *, common::ObIAllocator> locks_;
ObFixedArray<ObDList<ObDetectableIdDNode> *, common::ObIAllocator> buckets_;
};
static const int64_t DEFAULT_REQUEST_MAP_BUCKETS_COUNT = 100; //100
typedef hash::ObHashMap<common::ObAddr, obrpc::ObTaskStateDetectReq *,
hash::SpinReadWriteDefendMode,
@ -70,6 +104,9 @@ private:
static const int64_t MINI_MODE_SET_BUCKETS_COUNT = 10000; //1w
// for still_need_check_id_
static const int64_t MIDDLE_SET_BUCKETS_COUNT = 100000; //10w
// for ObDMMultiDlist bucket
static const int64_t DM_MDLIST_BUCKETS_COUNT = 128;
static const int64_t DM_MDLIST_BUCKETS_COUNT_FOR_META_TENANT = 4;
static const uint64_t ACTIVATE_DELAY_TIME = 5 * 1000L * 1000L; // dm only detects checkitems that have been present for at least "ACTIVATE_DELAY_TIME" seconds
public:
@ -77,18 +114,43 @@ public:
static void mtl_destroy(ObDetectManager *&dm);
public:
/* tool classes */
/* The CHECK_MAP's construct:
key_id bucket node value
___
123 -> |___|->ObDetectCallbackNode*->ObDetectCallbackNode*->...->nullptr
|___|->ObDetectCallbackNode*->ObDetectCallbackNode*->...->nullptr
___
124 -> |___|->ObDetectCallbackNode*->ObDetectCallbackNode*->...->nullptr
|___|->ObDetectCallbackNode*->ObDetectCallbackNode*->...->nullptr
means the value of the CHECK_MAP is a linked list.
for same ObDetectableId, we may insert more than one nodes into the linked list.
when the linked list is empty, we use ObObDetectCallbackNodeSetCall to set the first node
otherwise, we use ObDetectCallbackNodeAddCall to insert a node into the linked list
*/
typedef hash::ObHashMap<ObDetectableId, ObDetectCallbackNode *,
hash::SpinReadWriteDefendMode, hash::hash_func<ObDetectableId>,
hash::equal_to<ObDetectableId>> CHECK_MAP;
/// Atomic insertion callback
// Atomic insert the first detect callback node in linklist
class ObObDetectCallbackNodeSetCall
{
public:
explicit ObObDetectCallbackNodeSetCall(ObDetectManager *dm) : dm_(dm) {};
int operator()(const hash::HashMapPair<ObDetectableId, ObDetectCallbackNode *> &entry);
private:
ObDetectManager *dm_;
};
// Atomic insertion a callback node into the linklist
class ObDetectCallbackNodeAddCall
{
public:
void operator()(hash::HashMapPair<ObDetectableId, ObDetectCallbackNode *> &entry);
explicit ObDetectCallbackNodeAddCall(ObDetectCallbackNode *cb_node) :
cb_node_(cb_node), is_empty_(false) {};
ObDetectCallbackNodeAddCall(ObDetectCallbackNode *cb_node, ObDetectManager *dm) :
cb_node_(cb_node), dm_(dm), is_empty_(false) {};
inline bool is_empty()
{
@ -96,10 +158,11 @@ public:
}
private:
ObDetectCallbackNode *cb_node_;
ObDetectManager *dm_;
bool is_empty_;
};
/// Atomic removal callback, lock the bucket to avoid reading and inserting operations during removal
// Atomic removal callback, lock the bucket to avoid reading and inserting operations during removal
class ObDetectCallbackNodeRemoveCall
{
public:
@ -178,19 +241,27 @@ public:
ret = OB_INVALID_ARGUMENT;
LIB_LOG(WARN, "[DM] invaild detectable_id", K(common::lbt()));
} else {
T* ptr = NULL;
T* ptr = nullptr;
ObDetectCallbackNode *cb_node = nullptr;
ObIAllocator &allocator = get_mem_context()->get_malloc_allocator();
void *buf = allocator.alloc(sizeof(T));
// Simultaneously allocate memory for both ObDetectCallbackNode and ObDetectCallback
// to reduce the number of allocations.
void *buf = allocator.alloc(sizeof(ObDetectCallbackNode) + sizeof(T));
if (OB_NOT_NULL(buf)) {
ptr = new(buf) T(detectable_id.tenant_id_, args...);
ptr = new((char *)(buf) + sizeof(ObDetectCallbackNode)) T(detectable_id.tenant_id_, args...);
if (!ptr->alloc_succ()) {
ret = OB_ALLOCATE_MEMORY_FAILED;
ptr->destroy();
allocator.free(buf);
LIB_LOG(WARN, "[DM] failed to new cb ", K(ptr));
} else {
cb_node = new(buf) ObDetectCallbackNode(ptr, ObDetectableIdGen::instance().get_callback_node_sequence_id());
ObCurTraceId::TraceId *cur_thread_id = ObCurTraceId::get_trace_id();
ptr->set_trace_id(*cur_thread_id);
ptr->d_node_.detectable_id_ = detectable_id;
ptr->d_node_.activate_tm_ = ObTimeUtility::current_time() + ACTIVATE_DELAY_TIME;
LIB_LOG(DEBUG, "[DM] dm new cb ", K(ptr));
if (OB_FAIL(do_register_check_item(detectable_id, ptr, node_sequence_id, need_ref))) {
if (OB_FAIL(do_register_check_item(detectable_id, cb_node, node_sequence_id, need_ref))) {
LIB_LOG(WARN, "[DM] failed to register_check_item", K(ptr));
} else {
cb = ptr;
@ -216,10 +287,9 @@ public:
lib::MemoryContext &get_mem_context() { return mem_context_; }
private:
int do_register_check_item(const ObDetectableId &detectable_id, ObIDetectCallback *cb,
int do_register_check_item(const ObDetectableId &detectable_id, ObDetectCallbackNode *cb_node,
uint64_t &node_sequence_id, bool need_ref = false);
int create_cb_node(ObIDetectCallback *cb, ObDetectCallbackNode *&cb_node);
void delete_cb_node(ObDetectCallbackNode *&cb_node);
int gather_requests(REQUEST_MAP &req_map, lib::MemoryContext &req_map_context);
void do_detect_local(const ObDetectableId &detectable_id);
@ -230,7 +300,7 @@ private:
friend class ObDetectManagerThread;
hash::ObHashSet<ObDetectableId, hash::SpinReadWriteDefendMode> detectable_ids_;
ObLinkQueue fifo_que_;
ObDMMultiDlist dm_multi_list_;
CHECK_MAP all_check_items_;
// still_need_check_id_ only operated by dm's detect thread, there is no data race.
hash::ObHashSet<ObDetectableId, hash::NoPthreadDefendMode> still_need_check_id_;