From 08a76580f3841e2626cc4246f798076eebe1a80a Mon Sep 17 00:00:00 2001 From: KyrielightWei Date: Thu, 7 Dec 2023 14:42:56 +0000 Subject: [PATCH] [CP] fix mds memory leak bug after creating buffer ctx failed --- src/storage/tx/ob_trans_define.cpp | 5 +- src/storage/tx/ob_trans_define.h | 4 +- src/storage/tx/ob_trans_part_ctx.cpp | 125 +++++++++++++------ src/storage/tx/ob_trans_part_ctx.h | 1 + src/storage/tx/ob_tx_ctx_mds.cpp | 113 ++++++++++++++++- src/storage/tx/ob_tx_ctx_mds.h | 39 +++++- unittest/storage/tx/it/test_register_mds.cpp | 45 +++++++ 7 files changed, 283 insertions(+), 49 deletions(-) diff --git a/src/storage/tx/ob_trans_define.cpp b/src/storage/tx/ob_trans_define.cpp index 63c1f17df..edbaccf5a 100644 --- a/src/storage/tx/ob_trans_define.cpp +++ b/src/storage/tx/ob_trans_define.cpp @@ -733,7 +733,7 @@ void ObTxExecInfo::reset() is_sub2pc_ = false; } -void ObTxExecInfo::destroy() +void ObTxExecInfo::destroy(ObTxMDSCache &mds_cache) { if (!mds_buffer_ctx_array_.empty()) { TRANS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "mds_buffer_ctx_array_ is valid when exec_info destroy", @@ -745,7 +745,8 @@ void ObTxExecInfo::destroy() for (int64_t i = 0; i < multi_data_source_.count(); ++i) { ObTxBufferNode &node = multi_data_source_.at(i); if (nullptr != node.data_.ptr()) { - MultiTxDataFactory::free(node.data_.ptr()); + mds_cache.free_mds_node(node.data_, node.get_register_no()); + // share::mtl_free(node.data_.ptr()); node.buffer_ctx_node_.destroy_ctx(); } } diff --git a/src/storage/tx/ob_trans_define.h b/src/storage/tx/ob_trans_define.h index 9234f89e7..bbbe8ce8e 100644 --- a/src/storage/tx/ob_trans_define.h +++ b/src/storage/tx/ob_trans_define.h @@ -1638,6 +1638,8 @@ enum class RetainCause : int16_t MAX = 1 }; +class ObTxMDSCache; + static const int64_t MAX_TABLET_MODIFY_RECORD_COUNT = 16; // exec info need to be persisted by "trans context table" struct ObTxExecInfo @@ -1656,7 +1658,7 @@ public: void clear_buffer_ctx_in_multi_data_source(); void reset(); // can not 
destroy in tx_ctx_table - void destroy(); + void destroy(ObTxMDSCache &mds_cache); int assign(const ObTxExecInfo &exec_info); private: diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index be0938498..bf6e6f70b 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -118,7 +118,9 @@ int ObPartTransCtx::init(const uint64_t tenant_id, } else if (OB_FAIL(init_log_cbs_(ls_id, trans_id))) { TRANS_LOG(WARN, "init log cbs failed", KR(ret), K(trans_id), K(ls_id)); } else if (OB_FAIL(ctx_tx_data_.init(ls_ctx_mgr, trans_id))) { - TRANS_LOG(WARN, "init ctx tx data failed",K(ret)); + TRANS_LOG(WARN, "init ctx tx data failed",K(ret), K(trans_id), K(ls_id)); + } else if (OB_FAIL(mds_cache_.init(tenant_id))) { + TRANS_LOG(WARN, "init mds cache failed", K(ret), K(trans_id), K(ls_id)); } } @@ -255,19 +257,28 @@ void ObPartTransCtx::destroy() } (void)try_gc_retain_ctx_func_(); - if (NULL != tlog_) { - print_trace_log_if_necessary_(); - tlog_ = NULL; + exec_info_.destroy(mds_cache_); + + mds_cache_.destroy(); + + if (mds_cache_.is_mem_leak()) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "mds memory leak!", K(trans_id_), K(ls_id_), + K(mds_cache_), K(exec_info_), K(ctx_tx_data_), K(start_replay_ts_), + K(start_recover_ts_), K(ctx_create_time_)); + FORCE_PRINT_TRACE(tlog_, "[check mds mem leak] "); } ctx_tx_data_.destroy(); - mds_cache_.destroy(); - exec_info_.destroy(); big_segment_info_.reset(); reset_log_cbs_(); + if (NULL != tlog_) { + print_trace_log_if_necessary_(); + tlog_ = NULL; + } + timeout_task_.destroy(); trace_info_.reset(); block_frozen_memtable_ = nullptr; @@ -5357,8 +5368,7 @@ int ObPartTransCtx::replay_multi_data_source(const ObTxMultiDataSourceLog &log, ObTxBufferNode &node = exec_info_.multi_data_source_.at(i); if (nullptr != node.data_.ptr()) { - MultiTxDataFactory::free(node.data_.ptr()); - node.data_.assign_ptr(nullptr, 0); + mds_cache_.free_mds_node(node.data_, 
node.get_register_no()); node.get_buffer_ctx_node().destroy_ctx(); } } @@ -6076,11 +6086,11 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, ret = OB_ERR_UNDEFINED; TRANS_LOG(ERROR, "unexpected mds type", KR(ret), K(*this)); } else if (old_node.get_data_source_type() <= ObTxDataSourceType::BEFORE_VERSION_4_1 - && ObTxDataSourceType::CREATE_TABLET_NEW_MDS != old_node.get_data_source_type() - && ObTxDataSourceType::DELETE_TABLET_NEW_MDS != old_node.get_data_source_type() - && ObTxDataSourceType::UNBIND_TABLET_NEW_MDS != old_node.get_data_source_type()) { - TRANS_LOG(INFO, "old mds type, no need process with buffer ctx", - K(old_node.get_data_source_type()), K(*this)); + && ObTxDataSourceType::CREATE_TABLET_NEW_MDS != old_node.get_data_source_type() + && ObTxDataSourceType::DELETE_TABLET_NEW_MDS != old_node.get_data_source_type() + && ObTxDataSourceType::UNBIND_TABLET_NEW_MDS != old_node.get_data_source_type()) { + TRANS_LOG(DEBUG, "old mds type, no need process with buffer ctx", + K(old_node.get_data_source_type()), K(*this)); } else { if (OB_ISNULL(old_node.get_buffer_ctx_node().get_ctx())) { // this is replay path, create ctx if (OB_FAIL(mds::MdsFactory::create_buffer_ctx(old_node.get_data_source_type(), trans_id_, @@ -6105,11 +6115,13 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, ObTxBufferNodeArray tmp_buf_arr; - void *ptr = nullptr; + // void *ptr = nullptr; int64_t len = 0; if (OB_FAIL(tmp_buf_arr.reserve(additional_count))) { TRANS_LOG(WARN, "reserve array space failed", K(ret)); + } else if(OB_FAIL(incremental_array.reserve(additional_count))) { + TRANS_LOG(WARN, "reserve incremental_array space failed", K(ret)); } else if (need_replace) { ret = exec_info_.multi_data_source_.reserve(additional_count); } else { @@ -6123,20 +6135,30 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, for (int64_t i = 0; OB_SUCC(ret) && i < mds_array.count(); ++i) { const 
ObTxBufferNode &node = mds_array.at(i); len = node.data_.length(); - if (OB_ISNULL(ptr = MultiTxDataFactory::alloc(len, trans_id_, (uint64_t)this))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(*this), K(len)); + ObString tmp_data; + if (OB_FAIL(mds_cache_.alloc_mds_node(this, node.data_.ptr(), len, tmp_data, node.get_register_no()))) { + TRANS_LOG(WARN, "alloc mds node from the mds_cache_ failed", K(ret), K(mds_cache_), KPC(this)); } else { - MEMCPY(ptr, node.data_.ptr(), len); + // if (OB_ISNULL(ptr = mtl_malloc(len, ""))) { + // ret = OB_ALLOCATE_MEMORY_FAILED; + // TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(*this), K(len)); + // } else { + // MEMCPY(ptr, node.data_.ptr(), len); ObTxBufferNode new_node; - ObString data; - data.assign_ptr(reinterpret_cast(ptr), len); + // ObString data; + // data.assign_ptr(reinterpret_cast(ptr), len); mds::BufferCtx *new_ctx = nullptr; if (OB_FAIL(process_with_buffer_ctx(node, new_ctx))) { + mds_cache_.free_mds_node(tmp_data, node.get_register_no()); + // mtl_free(tmp_data.ptr()); + if (OB_NOT_NULL(new_ctx)) { + MTL(mds::ObTenantMdsService*)->get_buffer_ctx_allocator().free(new_ctx); + new_ctx = nullptr; + } TRANS_LOG(WARN, "process_with_buffer_ctx failed", KR(ret), K(*this)); - } else if (OB_FAIL(new_node.init(node.get_data_source_type(), data, node.mds_base_scn_, + } else if (OB_FAIL(new_node.init(node.get_data_source_type(), tmp_data, node.mds_base_scn_, new_ctx))) { - MultiTxDataFactory::free(data.ptr()); + mds_cache_.free_mds_node(tmp_data, node.get_register_no()); if (OB_NOT_NULL(new_ctx)) { MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx); new_ctx = nullptr; @@ -6144,14 +6166,15 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, TRANS_LOG(WARN, "init new node failed", KR(ret), K(*this)); } else if (ObTxBufferNode::is_valid_register_no(node.get_register_no()) && 
OB_FAIL(new_node.set_mds_register_no(node.get_register_no()))) { - mtl_free(data.ptr()); + mds_cache_.free_mds_node(tmp_data, node.get_register_no()); + // mtl_free(tmp_data.ptr()); if (OB_NOT_NULL(new_ctx)) { MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx); new_ctx = nullptr; } TRANS_LOG(WARN, "set mds register_no failed", KR(ret), K(*this)); } else if (OB_FAIL(tmp_buf_arr.push_back(new_node))) { - MultiTxDataFactory::free(data.ptr()); + mds_cache_.free_mds_node(tmp_data, node.get_register_no()); if (OB_NOT_NULL(new_ctx)) { MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx); new_ctx = nullptr; @@ -6163,7 +6186,7 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, if (OB_FAIL(ret)) { for (int64_t i = 0; i < tmp_buf_arr.count(); ++i) { - MultiTxDataFactory::free(tmp_buf_arr[i].data_.ptr()); + mds_cache_.free_mds_node(tmp_buf_arr[i].data_, tmp_buf_arr[i].get_register_no()); tmp_buf_arr[i].buffer_ctx_node_.destroy_ctx(); } tmp_buf_arr.reset(); @@ -6176,7 +6199,8 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, for (int64_t i = 0; i < exec_info_.multi_data_source_.count(); ++i) { if (nullptr != exec_info_.multi_data_source_[i].data_.ptr()) { - MultiTxDataFactory::free(exec_info_.multi_data_source_[i].data_.ptr()); + mds_cache_.free_mds_node(exec_info_.multi_data_source_[i].data_, + exec_info_.multi_data_source_[i].get_register_no()); } exec_info_.multi_data_source_[i].buffer_ctx_node_.destroy_ctx(); } @@ -6213,8 +6237,8 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array, } if (tmp_buf_arr[i].get_register_no() == exec_info_.multi_data_source_[ctx_array_start_index].get_register_no()) { - - mtl_free(tmp_buf_arr[i].data_.ptr()); + mds_cache_.free_mds_node(tmp_buf_arr[i].data_, tmp_buf_arr[i].get_register_no()); + // mtl_free(tmp_buf_arr[i].data_.ptr()); tmp_buf_arr[i].buffer_ctx_node_.destroy_ctx(); if 
(OB_FAIL(incremental_array.push_back( exec_info_.multi_data_source_[ctx_array_start_index]))) { @@ -6440,6 +6464,7 @@ int ObPartTransCtx::prepare_mul_data_source_tx_end_(bool is_commit) #ifdef ERRSIM ERRSIM_POINT_DEF(EN_DUP_TABLE_REDO_SYNC) ERRSIM_POINT_DEF(EN_SUBMIT_TX_PREPARE_LOG) +ERRSIM_POINT_DEF(EN_NOTIFY_MDS) #endif int ObPartTransCtx::errism_dup_table_redo_sync_() @@ -6471,6 +6496,21 @@ OB_NOINLINE int ObPartTransCtx::errism_submit_prepare_log_() return ret; } +OB_NOINLINE int ObPartTransCtx::errsim_notify_mds_() +{ + int ret = OB_SUCCESS; + +#ifdef ERRSIM + ret = EN_NOTIFY_MDS; +#endif + + if (OB_FAIL(ret)) { + TRANS_LOG(WARN, "errsim notify mds in test", K(ret)); + } + + return ret; +} + int ObPartTransCtx::notify_table_lock_(const SCN &log_ts, const bool for_replay, const ObTxBufferNodeArray &notify_array, @@ -6512,7 +6552,15 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type, const bool is_force_kill) { int ret = OB_SUCCESS; - if (is_exiting_ && sub_state_.is_force_abort()) { + + if (OB_FAIL(errsim_notify_mds_())) { + TRANS_LOG(WARN, "notify mds errsim", K(ret), K(ls_id_), K(trans_id_), K(notify_type), K(log_ts), + K(for_replay), K(notify_array), K(is_force_kill)); + } + + if (OB_FAIL(ret)) { + // do nothing + } else if (is_exiting_ && sub_state_.is_force_abort()) { // do nothing } else { ObMulSourceDataNotifyArg arg; @@ -6530,8 +6578,8 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type, TRANS_LOG(WARN, "notify data source failed", K(ret), K(arg)); } if (notify_array.count() > 0) { - TRANS_LOG(INFO, "notify MDS", K(ret), K(trans_id_), K(ls_id_), K(notify_type), K(log_ts), K(notify_array.count()), - K(notify_array), K(total_time)); + TRANS_LOG(INFO, "notify MDS", K(ret), K(trans_id_), K(ls_id_), K(notify_type), K(log_ts), + K(notify_array.count()), K(notify_array), K(total_time)); } } return ret; @@ -6547,7 +6595,7 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou int tmp_ret =
OB_SUCCESS; ObTxBufferNode node; ObString data; - void *ptr = nullptr; + // void *ptr = nullptr; ObTxBufferNodeArray tmp_array; bool need_lock = true; @@ -6589,11 +6637,10 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou } else if (is_committing_()) { ret = OB_TRANS_HAS_DECIDED; TRANS_LOG(WARN, "can not register mds in committing part_ctx", K(ret), KPC(this)); - } else if (OB_ISNULL(ptr = MultiTxDataFactory::alloc(len, trans_id_, (uint64_t)this))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(data_source_type), K(len)); - } else if (FALSE_IT(MEMCPY(ptr, buf, len))) { - } else if (FALSE_IT(data.assign_ptr(reinterpret_cast<char *>(ptr), len))) { + } else if (OB_FAIL(mds_cache_.try_recover_max_register_no(exec_info_.multi_data_source_))) { + TRANS_LOG(WARN, "recover max register no failed", K(ret), K(mds_cache_), KPC(this)); + } else if (OB_FAIL(mds_cache_.alloc_mds_node(this, buf, len, data))) { + TRANS_LOG(WARN, "alloc mds node from the mds_cache_ failed", K(ret), K(mds_cache_), KPC(this)); } else { mds::BufferCtx *buffer_ctx = nullptr; if (data_source_type > ObTxDataSourceType::BEFORE_VERSION_4_1 @@ -6616,15 +6663,13 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou ret = OB_LOG_TOO_LARGE; TRANS_LOG(WARN, "too large mds buf node", K(ret), K(tmp_array.get_serialize_size())); //#endif - } else if (OB_FAIL(mds_cache_.try_recover_max_register_no(exec_info_.multi_data_source_))) { - TRANS_LOG(WARN, "recover max register no failed", K(ret), K(mds_cache_), KPC(this)); } else if (OB_FAIL(mds_cache_.insert_mds_node(node))) { TRANS_LOG(WARN, "register multi source data failed", KR(ret), K(data_source_type), K(*this)); } if (OB_FAIL(ret)) { - MultiTxDataFactory::free(ptr); + mds_cache_.free_mds_node(data, node.get_register_no()); if (OB_NOT_NULL(buffer_ctx)) { MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(buffer_ctx); } diff --git
a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 2466bf28d..b09ac8ac8 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -565,6 +565,7 @@ private: int errism_dup_table_redo_sync_(); int errism_submit_prepare_log_(); + int errsim_notify_mds_(); protected: virtual int get_gts_(share::SCN &gts); virtual int wait_gts_elapse_commit_version_(bool &need_wait); diff --git a/src/storage/tx/ob_tx_ctx_mds.cpp b/src/storage/tx/ob_tx_ctx_mds.cpp index a8a10c245..495f16ff7 100644 --- a/src/storage/tx/ob_tx_ctx_mds.cpp +++ b/src/storage/tx/ob_tx_ctx_mds.cpp @@ -18,6 +18,18 @@ namespace oceanbase namespace transaction { +int ObTxMDSCache::init(int64_t tenant_id) +{ + int ret = OB_SUCCESS; + + ObMemAttr attr(tenant_id, "MdsMemHash"); + if (OB_FAIL(mem_stat_hash_.create(16, attr, attr))) { + TRANS_LOG(WARN, "create mds mem stat failed", K(ret), K(tenant_id)); + } + + return ret; +} + void ObTxMDSCache::reset() { // allocator_.reset(); @@ -26,6 +38,7 @@ void ObTxMDSCache::reset() submitted_iterator_ = mds_list_.end(); // ObTxBufferNodeList::iterator(); need_retry_submit_mds_ = false; max_register_no_ = 0; + mem_stat_hash_.destroy(); } void ObTxMDSCache::destroy() @@ -35,10 +48,106 @@ void ObTxMDSCache::destroy() while (!mds_list_.empty()) { mds_list_.pop_front(tmp_node); if (nullptr != tmp_node.data_.ptr()) { - MultiTxDataFactory::free(tmp_node.data_.ptr()); + free_mds_node(tmp_node.data_, tmp_node.get_register_no()); } tmp_node.get_buffer_ctx_node().destroy_ctx(); } + mem_stat_hash_.destroy(); +} + +int ObTxMDSCache::alloc_mds_node(const ObPartTransCtx *tx_ctx, + const char *buf, + const int64_t buf_len, + common::ObString &data, + uint64_t register_no) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + uint64_t cur_register_no = 0; + + ObMDSMemStat tmp_mem_stat; + tmp_mem_stat.reset(); + + if (register_no <= 0) { + cur_register_no = max_register_no_ + 1; + } else { + cur_register_no = register_no; + } + +
void *ptr = nullptr; + if (OB_ISNULL(ptr = + MultiTxDataFactory::alloc(buf_len, tx_ctx->get_trans_id(), (uint64_t)tx_ctx))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(buf_len)); + } else { + MEMCPY(ptr, buf, buf_len); + data.assign_ptr(reinterpret_cast<char *>(ptr), buf_len); + } + + if (OB_TMP_FAIL(mem_stat_hash_.get_refactored(cur_register_no, tmp_mem_stat))) { + if (OB_HASH_NOT_EXIST != tmp_ret) { + TRANS_LOG(ERROR, "get tmp_mem_stat from mem_stat_hash failed", K(ret), K(tmp_ret), + K(cur_register_no), K(tmp_mem_stat), KPC(tx_ctx)); + } + tmp_mem_stat.reset(); + } + + if (OB_SUCCESS == tmp_ret || OB_HASH_NOT_EXIST == tmp_ret) { + tmp_mem_stat.alloc_cnt_++; + if (OB_TMP_FAIL(mem_stat_hash_.set_refactored(cur_register_no, tmp_mem_stat, 1))) { + TRANS_LOG(ERROR, "insert mem_stat_ into hash table failed", K(ret), K(tmp_ret), + K(cur_register_no), K(tmp_mem_stat),KPC(tx_ctx) ); + } + + } + + return ret; +} + +void ObTxMDSCache::free_mds_node(common::ObString &data, uint64_t register_no) +{ + int tmp_ret = OB_SUCCESS; + uint64_t cur_register_no = register_no; + ObMDSMemStat tmp_mem_stat; + tmp_mem_stat.reset(); + + if (register_no <= 0) { + cur_register_no = max_register_no_ + 1; + } else { + cur_register_no = register_no; + } + + MultiTxDataFactory::free(data.ptr()); + + if (OB_TMP_FAIL(mem_stat_hash_.get_refactored(cur_register_no, tmp_mem_stat))) { + TRANS_LOG_RET(ERROR, tmp_ret, "get tmp_mem_stat from mem_stat_hash failed", K(ret), K(tmp_ret), + K(cur_register_no), K(tmp_mem_stat)); + tmp_mem_stat.reset(); + } + + if (OB_SUCCESS == tmp_ret) { + tmp_mem_stat.free_cnt_++; + if (tmp_mem_stat.free_cnt_ >= tmp_mem_stat.alloc_cnt_) { + if (OB_TMP_FAIL(mem_stat_hash_.erase_refactored(cur_register_no))) { + TRANS_LOG_RET(ERROR, tmp_ret, "insert mem_stat_ into hash table failed", K(ret), K(tmp_ret), + K(cur_register_no), K(tmp_mem_stat)); + } + } + } +} + +bool ObTxMDSCache::is_mem_leak() +{ + bool mem_leak =
!mem_stat_hash_.empty(); + + if (mem_leak) { + for (ObTxMDSMemStatHash::iterator iter = mem_stat_hash_.begin(); iter != mem_stat_hash_.end(); + iter++) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "mds node mem leak", K(iter->first), K(iter->second)); + } + } + + return mem_leak; } int ObTxMDSCache::try_recover_max_register_no(const ObTxBufferNodeArray &node_array) @@ -84,7 +193,7 @@ int ObTxMDSCache::rollback_last_mds_node() TRANS_LOG(WARN, "pop back last node failed", K(ret)); } else { TRANS_LOG(INFO, "rollback the last mds node", K(ret), K(buf_node), KPC(this)); - MultiTxDataFactory::free(buf_node.get_ptr()); + free_mds_node(buf_node.data_, buf_node.get_register_no()); buf_node.get_buffer_ctx_node().destroy_ctx(); } diff --git a/src/storage/tx/ob_tx_ctx_mds.h b/src/storage/tx/ob_tx_ctx_mds.h index d8a26d541..feadec885 100644 --- a/src/storage/tx/ob_tx_ctx_mds.h +++ b/src/storage/tx/ob_tx_ctx_mds.h @@ -26,13 +26,45 @@ typedef ObList<ObTxBufferNode, TransModulePageAllocator> ObTxBufferNodeList; class ObTxMDSRange; +struct ObMDSMemStat +{ + // uint64_t register_no_; + uint64_t alloc_cnt_; + uint64_t free_cnt_; + uint64_t mem_size_; + + void reset() + { + alloc_cnt_ = 0; + free_cnt_ = 0; + mem_size_ = 0; + } + + ObMDSMemStat() { reset(); } + + TO_STRING_KV(K(alloc_cnt_), K(free_cnt_), K(mem_size_)); +}; + +typedef common::hash::ObHashMap<uint64_t, ObMDSMemStat> ObTxMDSMemStatHash; + class ObTxMDSCache { public: ObTxMDSCache(TransModulePageAllocator &allocator) : mds_list_(allocator) { reset(); } + int init(int64_t tenant_id); void reset(); void destroy(); + int alloc_mds_node(const ObPartTransCtx *tx_ctx, + const char *buf, + const int64_t buf_len, + common::ObString &data, + uint64_t register_no = 0); + void free_mds_node(common::ObString & data, uint64_t register_no = 0); + + bool is_mem_leak(); + + int try_recover_max_register_no(const ObTxBufferNodeArray & node_array); int insert_mds_node(ObTxBufferNode &buf_node); int rollback_last_mds_node(); @@ -51,10 +83,7 @@ public: int64_t count() const { return mds_list_.size(); } void
update_submitted_iterator(ObTxBufferNodeArray &range_array); void update_sync_failed_range(ObTxBufferNodeArray &range_array); - // { - // unsubmitted_size_ = unsubmitted_size_ - iter->get_serialize_size(); - // submitted_iterator_ = iter; - // } + void clear_submitted_iterator() { submitted_iterator_ = mds_list_.end(); } bool is_contain(const ObTxDataSourceType target_type) const; @@ -71,6 +100,8 @@ private: int64_t unsubmitted_size_; ObTxBufferNodeList mds_list_; ObTxBufferNodeList::iterator submitted_iterator_; + + ObTxMDSMemStatHash mem_stat_hash_; }; class ObTxMDSRange diff --git a/unittest/storage/tx/it/test_register_mds.cpp b/unittest/storage/tx/it/test_register_mds.cpp index e57a4b689..4047782a2 100644 --- a/unittest/storage/tx/it/test_register_mds.cpp +++ b/unittest/storage/tx/it/test_register_mds.cpp @@ -71,6 +71,24 @@ OB_NOINLINE int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_ acquire_from_follower = false; return ret; } + +bool NOTIFY_MDS_ERRSIM = false; + +OB_NOINLINE int ObPartTransCtx::errsim_notify_mds_() +{ + int ret = OB_SUCCESS; + + if (NOTIFY_MDS_ERRSIM) { + ret = OB_ERR_UNEXPECTED; + } + + if (OB_FAIL(ret)) { + TRANS_LOG(WARN, "errsim notify mds", K(ret), K(NOTIFY_MDS_ERRSIM)); + } + + return ret; +} + class ObTestRegisterMDS : public ::testing::Test { public: @@ -162,6 +180,33 @@ TEST_F(ObTestRegisterMDS, basic_big_mds) #endif } +TEST_F(ObTestRegisterMDS, notify_mds_error) +{ + START_TWO_TX_NODE_WITH_LSID(n1, n2, 2005); + PREPARE_TX(n1, tx); + PREPARE_TX_PARAM(tx_param); + const char *mds_str = "register mds basic"; + + ASSERT_EQ(OB_SUCCESS, n1->start_tx(tx, tx_param)); + + NOTIFY_MDS_ERRSIM = true; + ASSERT_EQ(OB_ERR_UNEXPECTED, n1->txs_.register_mds_into_tx(tx, n1->ls_id_, ObTxDataSourceType::DDL_TRANS, + mds_str, strlen(mds_str))); + NOTIFY_MDS_ERRSIM = false; + + n2->wait_all_redolog_applied(); + ASSERT_EQ(OB_SUCCESS, n1->commit_tx(tx, n1->ts_after_ms(500))); + + n2->set_as_follower_replica(*n1); + ReplayLogEntryFunctor 
functor(n2); + ASSERT_EQ(OB_SUCCESS, n2->fake_tx_log_adapter_->replay_all(functor)); + + GC_MDS_RETAIN_CTX(n1) + ASSERT_EQ(OB_SUCCESS, n1->wait_all_tx_ctx_is_destoryed()); + + GC_MDS_RETAIN_CTX(n2) + ASSERT_EQ(OB_SUCCESS, n2->wait_all_tx_ctx_is_destoryed()); +} } // namespace oceanbase int main(int argc, char **argv)