fix memory managerment of dynamic node in TenantTabletStatMgr

This commit is contained in:
obdev
2023-01-04 11:42:06 +00:00
committed by ob-robot
parent 1c9259243e
commit d8000e3816
4 changed files with 58 additions and 44 deletions

View File

@ -10,8 +10,8 @@
* See the Mulan PubL v2 for more details. * See the Mulan PubL v2 for more details.
*/ */
#ifndef OCEANBASE_OBSERVER_VIRTUAL_TABLE_OB_INFORMATION_KVCACHE_TABLE_ #ifndef OCEANBASE_OBSERVER_VIRTUAL_TABLE_OB_INFORMATION_KVCACHE_TABLE_
#define OCEANBASE_OBSERVER_VIRTUAL_TABLE_OB_INFORMATION_KVCACHE_TABLE_ #define OCEANBASE_OBSERVER_VIRTUAL_TABLE_OB_INFORMATION_KVCACHE_TABLE_
#include "share/ob_virtual_table_scanner_iterator.h" #include "share/ob_virtual_table_scanner_iterator.h"
#include "share/cache/ob_kv_storecache.h" #include "share/cache/ob_kv_storecache.h"

View File

@ -266,6 +266,7 @@ ObTabletStreamPool::ObTabletStreamPool()
: dynamic_allocator_(MTL_ID()), : dynamic_allocator_(MTL_ID()),
free_list_allocator_("FreeTbltStream"), free_list_allocator_("FreeTbltStream"),
free_list_(), free_list_(),
lru_list_(),
max_free_list_num_(0), max_free_list_num_(0),
max_dynamic_node_num_(0), max_dynamic_node_num_(0),
allocated_dynamic_num_(0), allocated_dynamic_num_(0),
@ -283,12 +284,20 @@ void ObTabletStreamPool::destroy()
is_inited_ = false; is_inited_ = false;
ObTabletStreamNode *node = nullptr; ObTabletStreamNode *node = nullptr;
DLIST_REMOVE_ALL_NORET(node, lru_list_) {
lru_list_.remove(node);
node->~ObTabletStreamNode();
node = nullptr;
}
lru_list_.reset();
while (OB_SUCCESS == free_list_.pop(node)) { while (OB_SUCCESS == free_list_.pop(node)) {
if (OB_NOT_NULL(node)) { if (OB_NOT_NULL(node)) {
node->~ObTabletStreamNode(); node->~ObTabletStreamNode();
node = nullptr; node = nullptr;
} }
} }
dynamic_allocator_.reset(); dynamic_allocator_.reset();
free_list_.destroy(); free_list_.destroy();
free_list_allocator_.reset(); free_list_allocator_.reset();
@ -336,11 +345,13 @@ int ObTabletStreamPool::init(
return ret; return ret;
} }
int ObTabletStreamPool::alloc(ObTabletStreamNode *&free_node) int ObTabletStreamPool::alloc(ObTabletStreamNode *&free_node, bool &is_retired)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
is_retired = false;
void *buf = nullptr; void *buf = nullptr;
// 1. try to alloc node from free_list
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("ObTabletStreamPool not inited", K(ret)); LOG_WARN("ObTabletStreamPool not inited", K(ret));
@ -350,16 +361,14 @@ int ObTabletStreamPool::alloc(ObTabletStreamNode *&free_node)
} else if (OB_FAIL(free_list_.pop(free_node))) { } else if (OB_FAIL(free_list_.pop(free_node))) {
if (OB_ENTRY_NOT_EXIST != ret) { if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("failed to pop free node from free list", K(ret)); LOG_WARN("failed to pop free node from free list", K(ret));
} else {
ret = OB_SUCCESS;
} }
} }
if (OB_FAIL(ret)) { // 2. no free node in free_list, try to alloc node dynamically
} else if (NULL == free_node) { if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
if (allocated_dynamic_num_ >= max_dynamic_node_num_) { if (allocated_dynamic_num_ >= max_dynamic_node_num_) {
ret = OB_SIZE_OVERFLOW; ret = OB_SIZE_OVERFLOW;
LOG_WARN("the number of allocated dynamic node has reached MAX", K(ret), K(max_dynamic_node_num_), K(allocated_dynamic_num_));
} else if (OB_ISNULL(buf = dynamic_allocator_.alloc(sizeof(ObTabletStreamNode)))) { } else if (OB_ISNULL(buf = dynamic_allocator_.alloc(sizeof(ObTabletStreamNode)))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory for free node", K(ret)); LOG_WARN("failed to allocate memory for free node", K(ret));
@ -368,6 +377,18 @@ int ObTabletStreamPool::alloc(ObTabletStreamNode *&free_node)
++allocated_dynamic_num_; ++allocated_dynamic_num_;
} }
} }
// 3. dynamic node has reached the upper limit, try to retire the oldest node in lru_list
if (OB_SIZE_OVERFLOW == ret) {
ret = OB_SUCCESS;
if (lru_list_.is_empty()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("lru list is unexpected null", K(ret));
} else {
free_node = lru_list_.get_last();
is_retired = true;
}
}
return ret; return ret;
} }
@ -377,13 +398,14 @@ void ObTabletStreamPool::free(ObTabletStreamNode *node)
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
if (IS_NOT_INIT) { if (IS_NOT_INIT) {
tmp_ret = OB_NOT_INIT; tmp_ret = OB_NOT_INIT;
LOG_ERROR("[MEMORY LEAK] ObTabletStreamPool is not inited, cannot free this node!!!", K(tmp_ret), KPC(node)); LOG_ERROR("[MEMORY LEAK] ObTabletStreamPool is not inited, cannot free this node!!!",
K(tmp_ret), KPC(node));
} else if (DYNAMIC_ALLOC == node->flag_) { } else if (DYNAMIC_ALLOC == node->flag_) {
node->~ObTabletStreamNode(); node->~ObTabletStreamNode();
dynamic_allocator_.free(node); dynamic_allocator_.free(node);
--allocated_dynamic_num_; --allocated_dynamic_num_;
} else { } else {
node->reset(); node->~ObTabletStreamNode();
OB_ASSERT(OB_SUCCESS == free_list_.push(node)); OB_ASSERT(OB_SUCCESS == free_list_.push(node));
} }
} }
@ -395,7 +417,6 @@ ObTenantTabletStatMgr::ObTenantTabletStatMgr()
: report_stat_task_(*this), : report_stat_task_(*this),
stream_pool_(), stream_pool_(),
stream_map_(), stream_map_(),
lru_list_(),
bucket_lock_(), bucket_lock_(),
report_queue_(), report_queue_(),
report_cursor_(0), report_cursor_(0),
@ -465,11 +486,6 @@ void ObTenantTabletStatMgr::destroy()
{ {
ObBucketWLockAllGuard lock_guard(bucket_lock_); ObBucketWLockAllGuard lock_guard(bucket_lock_);
stream_map_.destroy(); stream_map_.destroy();
DLIST_REMOVE_ALL_NORET(node, lru_list_) {
lru_list_.remove(node);
stream_pool_.free(node);
}
lru_list_.reset();
stream_pool_.destroy(); stream_pool_.destroy();
report_cursor_ = 0; report_cursor_ = 0;
pending_cursor_ = 0; pending_cursor_ = 0;
@ -601,7 +617,7 @@ int ObTenantTabletStatMgr::update_tablet_stream(const ObTabletStat &report_stat)
if (OB_ISNULL(stream_node)) { if (OB_ISNULL(stream_node)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("stream node is unexpected null", K(ret), K(report_stat)); LOG_WARN("stream node is unexpected null", K(ret), K(report_stat));
} else if (!lru_list_.move_to_first(stream_node)) { } else if (OB_UNLIKELY(!stream_pool_.update_lru_list(stream_node))) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to add node to lru list", K(ret), K(stream_node)); LOG_WARN("failed to add node to lru list", K(ret), K(stream_node));
} else { } else {
@ -611,6 +627,7 @@ int ObTenantTabletStatMgr::update_tablet_stream(const ObTabletStat &report_stat)
} }
if (OB_FAIL(ret) && OB_NOT_NULL(stream_node)) { if (OB_FAIL(ret) && OB_NOT_NULL(stream_node)) {
stream_pool_.remove_lru_list(stream_node);
stream_pool_.free(stream_node); stream_pool_.free(stream_node);
stream_node = nullptr; stream_node = nullptr;
} }
@ -620,35 +637,27 @@ int ObTenantTabletStatMgr::update_tablet_stream(const ObTabletStat &report_stat)
int ObTenantTabletStatMgr::fetch_node(ObTabletStreamNode *&node) int ObTenantTabletStatMgr::fetch_node(ObTabletStreamNode *&node)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool is_retired = false;
node = nullptr; node = nullptr;
if (OB_FAIL(stream_pool_.alloc(node))) { if (OB_FAIL(stream_pool_.alloc(node, is_retired))) {
if (OB_SIZE_OVERFLOW == ret) { LOG_WARN("failed to alloc node", K(ret));
if (lru_list_.is_empty()) { } else if (is_retired) { // get node from lru_list, should retire the old stat
ret = OB_ERR_UNEXPECTED; ObTabletStatKey old_key = node->stream_.get_tablet_stat_key();
LOG_WARN("lru list is unexpected null", K(ret)); ObBucketHashWLockGuard lock_guard(bucket_lock_, old_key.hash());
} else { if (OB_FAIL(stream_map_.erase_refactored(old_key))) {
ret = OB_SUCCESS; LOG_WARN("failed to erase tablet stat stream", K(ret), K(old_key));
ObTabletStatKey old_key = lru_list_.get_last()->stream_.get_tablet_stat_key();
ObBucketHashWLockGuard lock_guard(bucket_lock_, old_key.hash());
if (OB_FAIL(stream_map_.erase_refactored(old_key))) {
LOG_WARN("failed to erase tablet stat stream", K(ret), K(old_key));
} else {
node = lru_list_.remove_last();
node->stream_.reset();
}
}
} else { } else {
LOG_WARN("failed to get free node from stream pool", K(ret)); node->reset();
} }
} } else if (OB_UNLIKELY(!stream_pool_.add_lru_list(node))) {
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(node)) {
} else if (!lru_list_.add_first(node)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to add node to lru list", K(ret), KPC(node)); LOG_WARN("failed to add node to lru list", K(ret), KPC(node));
stream_pool_.free(node); stream_pool_.free(node);
} }
if (OB_FAIL(ret) && OB_NOT_NULL(node)) {
node = nullptr;
}
return ret; return ret;
} }

View File

@ -220,6 +220,7 @@ class ObTabletStreamPool
{ {
public: public:
typedef common::ObFixedQueue<ObTabletStreamNode> FreeList; typedef common::ObFixedQueue<ObTabletStreamNode> FreeList;
typedef common::ObDList<ObTabletStreamNode> LruList;
enum NodeAllocType: int64_t { enum NodeAllocType: int64_t {
FIXED_ALLOC = 0, FIXED_ALLOC = 0,
DYNAMIC_ALLOC DYNAMIC_ALLOC
@ -230,8 +231,11 @@ public:
void destroy(); void destroy();
int init(const int64_t max_free_list_num, int init(const int64_t max_free_list_num,
const int64_t up_limit_node_num); const int64_t up_limit_node_num);
int alloc(ObTabletStreamNode *&node); int alloc(ObTabletStreamNode *&node, bool &is_retired);
void free(ObTabletStreamNode *node); void free(ObTabletStreamNode *node);
bool add_lru_list(ObTabletStreamNode *node) { return lru_list_.add_first(node); }
bool remove_lru_list(ObTabletStreamNode *node) { return lru_list_.remove(node); }
bool update_lru_list(ObTabletStreamNode *node) { return lru_list_.move_to_first(node); }
OB_INLINE int64_t get_free_num() const { return free_list_.get_total(); } OB_INLINE int64_t get_free_num() const { return free_list_.get_total(); }
OB_INLINE int64_t get_allocated_num() const { return (max_free_list_num_ - get_free_num()) + allocated_dynamic_num_; } OB_INLINE int64_t get_allocated_num() const { return (max_free_list_num_ - get_free_num()) + allocated_dynamic_num_; }
TO_STRING_KV(K_(max_free_list_num), K_(max_dynamic_node_num), K_(allocated_dynamic_num)); TO_STRING_KV(K_(max_free_list_num), K_(max_dynamic_node_num), K_(allocated_dynamic_num));
@ -240,6 +244,7 @@ private:
common::ObFIFOAllocator dynamic_allocator_; common::ObFIFOAllocator dynamic_allocator_;
common::ObArenaAllocator free_list_allocator_; common::ObArenaAllocator free_list_allocator_;
FreeList free_list_; FreeList free_list_;
LruList lru_list_;
int64_t max_free_list_num_; int64_t max_free_list_num_;
int64_t max_dynamic_node_num_; int64_t max_dynamic_node_num_;
int64_t allocated_dynamic_num_; int64_t allocated_dynamic_num_;
@ -304,7 +309,6 @@ private:
TabletStatUpdater report_stat_task_; TabletStatUpdater report_stat_task_;
ObTabletStreamPool stream_pool_; ObTabletStreamPool stream_pool_;
TabletStreamMap stream_map_; TabletStreamMap stream_map_;
common::ObDList<ObTabletStreamNode> lru_list_;
common::ObBucketLock bucket_lock_; common::ObBucketLock bucket_lock_;
ObTabletStat report_queue_[DEFAULT_MAX_PENDING_CNT]; ObTabletStat report_queue_[DEFAULT_MAX_PENDING_CNT];
uint64_t report_cursor_; uint64_t report_cursor_;

View File

@ -296,7 +296,8 @@ TEST_F(TestTenantTabletStatMgr, basic_stream_pool)
ASSERT_EQ(max_free_list_num, free_num); ASSERT_EQ(max_free_list_num, free_num);
ObTabletStreamNode *fixed_node = nullptr; ObTabletStreamNode *fixed_node = nullptr;
ret = pool.alloc(fixed_node); bool is_retired = false;
ret = pool.alloc(fixed_node, is_retired);
ASSERT_EQ(OB_SUCCESS, ret); ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_TRUE(NULL != fixed_node); ASSERT_TRUE(NULL != fixed_node);
ASSERT_EQ(storage::ObTabletStreamPool::NodeAllocType::FIXED_ALLOC, fixed_node->flag_); ASSERT_EQ(storage::ObTabletStreamPool::NodeAllocType::FIXED_ALLOC, fixed_node->flag_);
@ -308,7 +309,7 @@ TEST_F(TestTenantTabletStatMgr, basic_stream_pool)
common::ObSEArray<ObTabletStreamNode *, 500> free_nodes; common::ObSEArray<ObTabletStreamNode *, 500> free_nodes;
for (int64_t i = 0; i < max_free_list_num; ++i) { for (int64_t i = 0; i < max_free_list_num; ++i) {
ObTabletStreamNode *free_node = nullptr; ObTabletStreamNode *free_node = nullptr;
ret = pool.alloc(free_node); ret = pool.alloc(free_node, is_retired);
ASSERT_EQ(OB_SUCCESS, ret); ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_TRUE(NULL != free_node); ASSERT_TRUE(NULL != free_node);
ASSERT_EQ(storage::ObTabletStreamPool::NodeAllocType::FIXED_ALLOC, free_node->flag_); ASSERT_EQ(storage::ObTabletStreamPool::NodeAllocType::FIXED_ALLOC, free_node->flag_);
@ -318,7 +319,7 @@ TEST_F(TestTenantTabletStatMgr, basic_stream_pool)
ASSERT_EQ(0, pool.get_free_num()); ASSERT_EQ(0, pool.get_free_num());
ObTabletStreamNode *dynamic_node = nullptr; ObTabletStreamNode *dynamic_node = nullptr;
ret = pool.alloc(dynamic_node); ret = pool.alloc(dynamic_node, is_retired);
ASSERT_EQ(OB_SUCCESS, ret); ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_TRUE(NULL != dynamic_node); ASSERT_TRUE(NULL != dynamic_node);
ASSERT_EQ(storage::ObTabletStreamPool::NodeAllocType::DYNAMIC_ALLOC, dynamic_node->flag_); ASSERT_EQ(storage::ObTabletStreamPool::NodeAllocType::DYNAMIC_ALLOC, dynamic_node->flag_);