[CP] fix mds memory leak bug after creating buffer ctx failed

This commit is contained in:
KyrielightWei 2023-12-07 14:42:56 +00:00 committed by ob-robot
parent 68dc85c53c
commit 08a76580f3
7 changed files with 283 additions and 49 deletions

View File

@ -733,7 +733,7 @@ void ObTxExecInfo::reset()
is_sub2pc_ = false;
}
void ObTxExecInfo::destroy()
void ObTxExecInfo::destroy(ObTxMDSCache &mds_cache)
{
if (!mds_buffer_ctx_array_.empty()) {
TRANS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "mds_buffer_ctx_array_ is valid when exec_info destroy",
@ -745,7 +745,8 @@ void ObTxExecInfo::destroy()
for (int64_t i = 0; i < multi_data_source_.count(); ++i) {
ObTxBufferNode &node = multi_data_source_.at(i);
if (nullptr != node.data_.ptr()) {
MultiTxDataFactory::free(node.data_.ptr());
mds_cache.free_mds_node(node.data_, node.get_register_no());
// share::mtl_free(node.data_.ptr());
node.buffer_ctx_node_.destroy_ctx();
}
}

View File

@ -1638,6 +1638,8 @@ enum class RetainCause : int16_t
MAX = 1
};
class ObTxMDSCache;
static const int64_t MAX_TABLET_MODIFY_RECORD_COUNT = 16;
// exec info need to be persisted by "trans context table"
struct ObTxExecInfo
@ -1656,7 +1658,7 @@ public:
void clear_buffer_ctx_in_multi_data_source();
void reset();
// can not destroy in tx_ctx_table
void destroy();
void destroy(ObTxMDSCache &mds_cache);
int assign(const ObTxExecInfo &exec_info);
private:

View File

@ -118,7 +118,9 @@ int ObPartTransCtx::init(const uint64_t tenant_id,
} else if (OB_FAIL(init_log_cbs_(ls_id, trans_id))) {
TRANS_LOG(WARN, "init log cbs failed", KR(ret), K(trans_id), K(ls_id));
} else if (OB_FAIL(ctx_tx_data_.init(ls_ctx_mgr, trans_id))) {
TRANS_LOG(WARN, "init ctx tx data failed",K(ret));
TRANS_LOG(WARN, "init ctx tx data failed",K(ret), K(trans_id), K(ls_id));
} else if (OB_FAIL(mds_cache_.init(tenant_id))) {
TRANS_LOG(WARN, "init mds cache failed", K(ret), K(trans_id), K(ls_id));
}
}
@ -255,19 +257,28 @@ void ObPartTransCtx::destroy()
}
(void)try_gc_retain_ctx_func_();
if (NULL != tlog_) {
print_trace_log_if_necessary_();
tlog_ = NULL;
exec_info_.destroy(mds_cache_);
mds_cache_.destroy();
if (mds_cache_.is_mem_leak()) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "mds memory leak!", K(trans_id_), K(ls_id_),
K(mds_cache_), K(exec_info_), K(ctx_tx_data_), K(start_replay_ts_),
K(start_recover_ts_), K(ctx_create_time_));
FORCE_PRINT_TRACE(tlog_, "[check mds mem leak] ");
}
ctx_tx_data_.destroy();
mds_cache_.destroy();
exec_info_.destroy();
big_segment_info_.reset();
reset_log_cbs_();
if (NULL != tlog_) {
print_trace_log_if_necessary_();
tlog_ = NULL;
}
timeout_task_.destroy();
trace_info_.reset();
block_frozen_memtable_ = nullptr;
@ -5357,8 +5368,7 @@ int ObPartTransCtx::replay_multi_data_source(const ObTxMultiDataSourceLog &log,
ObTxBufferNode &node = exec_info_.multi_data_source_.at(i);
if (nullptr != node.data_.ptr()) {
MultiTxDataFactory::free(node.data_.ptr());
node.data_.assign_ptr(nullptr, 0);
mds_cache_.free_mds_node(node.data_, node.get_register_no());
node.get_buffer_ctx_node().destroy_ctx();
}
}
@ -6076,11 +6086,11 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
ret = OB_ERR_UNDEFINED;
TRANS_LOG(ERROR, "unexpected mds type", KR(ret), K(*this));
} else if (old_node.get_data_source_type() <= ObTxDataSourceType::BEFORE_VERSION_4_1
&& ObTxDataSourceType::CREATE_TABLET_NEW_MDS != old_node.get_data_source_type()
&& ObTxDataSourceType::DELETE_TABLET_NEW_MDS != old_node.get_data_source_type()
&& ObTxDataSourceType::UNBIND_TABLET_NEW_MDS != old_node.get_data_source_type()) {
TRANS_LOG(INFO, "old mds type, no need process with buffer ctx",
K(old_node.get_data_source_type()), K(*this));
&& ObTxDataSourceType::CREATE_TABLET_NEW_MDS != old_node.get_data_source_type()
&& ObTxDataSourceType::DELETE_TABLET_NEW_MDS != old_node.get_data_source_type()
&& ObTxDataSourceType::UNBIND_TABLET_NEW_MDS != old_node.get_data_source_type()) {
TRANS_LOG(DEBUG, "old mds type, no need process with buffer ctx",
K(old_node.get_data_source_type()), K(*this));
} else {
if (OB_ISNULL(old_node.get_buffer_ctx_node().get_ctx())) { // this is replay path, create ctx
if (OB_FAIL(mds::MdsFactory::create_buffer_ctx(old_node.get_data_source_type(), trans_id_,
@ -6105,11 +6115,13 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
ObTxBufferNodeArray tmp_buf_arr;
void *ptr = nullptr;
// void *ptr = nullptr;
int64_t len = 0;
if (OB_FAIL(tmp_buf_arr.reserve(additional_count))) {
TRANS_LOG(WARN, "reserve array space failed", K(ret));
} else if(OB_FAIL(incremental_array.reserve(additional_count))) {
TRANS_LOG(WARN, "reserve incremental_array space failed", K(ret));
} else if (need_replace) {
ret = exec_info_.multi_data_source_.reserve(additional_count);
} else {
@ -6123,20 +6135,30 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
for (int64_t i = 0; OB_SUCC(ret) && i < mds_array.count(); ++i) {
const ObTxBufferNode &node = mds_array.at(i);
len = node.data_.length();
if (OB_ISNULL(ptr = MultiTxDataFactory::alloc(len, trans_id_, (uint64_t)this))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(*this), K(len));
ObString tmp_data;
if (OB_FAIL(mds_cache_.alloc_mds_node(this, node.data_.ptr(), len, tmp_data, node.get_register_no()))) {
TRANS_LOG(WARN, "alloc mds node from the mds_cache_ failed", K(ret), K(mds_cache_), KPC(this));
} else {
MEMCPY(ptr, node.data_.ptr(), len);
// if (OB_ISNULL(ptr = mtl_malloc(len, ""))) {
// ret = OB_ALLOCATE_MEMORY_FAILED;
// TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(*this), K(len));
// } else {
// MEMCPY(ptr, node.data_.ptr(), len);
ObTxBufferNode new_node;
ObString data;
data.assign_ptr(reinterpret_cast<char *>(ptr), len);
// ObString data;
// data.assign_ptr(reinterpret_cast<char *>(ptr), len);
mds::BufferCtx *new_ctx = nullptr;
if (OB_FAIL(process_with_buffer_ctx(node, new_ctx))) {
mds_cache_.free_mds_node(tmp_data, node.get_register_no());
// mtl_free(tmp_data.ptr());
if (OB_NOT_NULL(new_ctx)) {
MTL(mds::ObTenantMdsService*)->get_buffer_ctx_allocator().free(new_ctx);
new_ctx = nullptr;
}
TRANS_LOG(WARN, "process_with_buffer_ctx failed", KR(ret), K(*this));
} else if (OB_FAIL(new_node.init(node.get_data_source_type(), data, node.mds_base_scn_,
} else if (OB_FAIL(new_node.init(node.get_data_source_type(), tmp_data, node.mds_base_scn_,
new_ctx))) {
MultiTxDataFactory::free(data.ptr());
mds_cache_.free_mds_node(tmp_data, node.get_register_no());
if (OB_NOT_NULL(new_ctx)) {
MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx);
new_ctx = nullptr;
@ -6144,14 +6166,15 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
TRANS_LOG(WARN, "init new node failed", KR(ret), K(*this));
} else if (ObTxBufferNode::is_valid_register_no(node.get_register_no())
&& OB_FAIL(new_node.set_mds_register_no(node.get_register_no()))) {
mtl_free(data.ptr());
mds_cache_.free_mds_node(tmp_data, node.get_register_no());
// mtl_free(tmp_data.ptr());
if (OB_NOT_NULL(new_ctx)) {
MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx);
new_ctx = nullptr;
}
TRANS_LOG(WARN, "set mds register_no failed", KR(ret), K(*this));
} else if (OB_FAIL(tmp_buf_arr.push_back(new_node))) {
MultiTxDataFactory::free(data.ptr());
mds_cache_.free_mds_node(tmp_data, node.get_register_no());
if (OB_NOT_NULL(new_ctx)) {
MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(new_ctx);
new_ctx = nullptr;
@ -6163,7 +6186,7 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
if (OB_FAIL(ret)) {
for (int64_t i = 0; i < tmp_buf_arr.count(); ++i) {
MultiTxDataFactory::free(tmp_buf_arr[i].data_.ptr());
mds_cache_.free_mds_node(tmp_buf_arr[i].data_, tmp_buf_arr[i].get_register_no());
tmp_buf_arr[i].buffer_ctx_node_.destroy_ctx();
}
tmp_buf_arr.reset();
@ -6176,7 +6199,8 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
for (int64_t i = 0; i < exec_info_.multi_data_source_.count(); ++i) {
if (nullptr != exec_info_.multi_data_source_[i].data_.ptr()) {
MultiTxDataFactory::free(exec_info_.multi_data_source_[i].data_.ptr());
mds_cache_.free_mds_node(exec_info_.multi_data_source_[i].data_,
exec_info_.multi_data_source_[i].get_register_no());
}
exec_info_.multi_data_source_[i].buffer_ctx_node_.destroy_ctx();
}
@ -6213,8 +6237,8 @@ int ObPartTransCtx::deep_copy_mds_array_(const ObTxBufferNodeArray &mds_array,
}
if (tmp_buf_arr[i].get_register_no()
== exec_info_.multi_data_source_[ctx_array_start_index].get_register_no()) {
mtl_free(tmp_buf_arr[i].data_.ptr());
mds_cache_.free_mds_node(tmp_buf_arr[i].data_, tmp_buf_arr[i].get_register_no());
// mtl_free(tmp_buf_arr[i].data_.ptr());
tmp_buf_arr[i].buffer_ctx_node_.destroy_ctx();
if (OB_FAIL(incremental_array.push_back(
exec_info_.multi_data_source_[ctx_array_start_index]))) {
@ -6440,6 +6464,7 @@ int ObPartTransCtx::prepare_mul_data_source_tx_end_(bool is_commit)
#ifdef ERRSIM
ERRSIM_POINT_DEF(EN_DUP_TABLE_REDO_SYNC)
ERRSIM_POINT_DEF(EN_SUBMIT_TX_PREPARE_LOG)
ERRSIM_POINT_DEF(EN_NOTIFY_MDS)
#endif
int ObPartTransCtx::errism_dup_table_redo_sync_()
@ -6471,6 +6496,21 @@ OB_NOINLINE int ObPartTransCtx::errism_submit_prepare_log_()
return ret;
}
OB_NOINLINE int ObPartTransCtx::errsim_notify_mds_()
{
int ret = OB_SUCCESS;
#ifdef ERRSIM
ret = EN_NOTIFY_MDS;
#endif
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "errsim notify mds in test", K(ret));
}
return ret;
}
int ObPartTransCtx::notify_table_lock_(const SCN &log_ts,
const bool for_replay,
const ObTxBufferNodeArray &notify_array,
@ -6512,7 +6552,15 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type,
const bool is_force_kill)
{
int ret = OB_SUCCESS;
if (is_exiting_ && sub_state_.is_force_abort()) {
if (OB_FAIL(errsim_notify_mds_())) {
TRANS_LOG(WARN, "notify mds errsim", K(ret), K(ls_id_), K(trans_id_), K(notify_type), K(log_ts),
K(for_replay), K(notify_array), K(is_force_kill));
}
if (OB_FAIL(ret)) {
// do nothing
} else if (is_exiting_ && sub_state_.is_force_abort()) {
// do nothing
} else {
ObMulSourceDataNotifyArg arg;
@ -6530,8 +6578,8 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type,
TRANS_LOG(WARN, "notify data source failed", K(ret), K(arg));
}
if (notify_array.count() > 0) {
TRANS_LOG(INFO, "notify MDS", K(ret), K(trans_id_), K(ls_id_), K(notify_type), K(log_ts), K(notify_array.count()),
K(notify_array), K(total_time));
TRANS_LOG(INFO, "notify MDS", K(ret), K(trans_id_), K(ls_id_), K(notify_type), K(log_ts),
K(notify_array.count()), K(notify_array), K(total_time));
}
}
return ret;
@ -6547,7 +6595,7 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou
int tmp_ret = OB_SUCCESS;
ObTxBufferNode node;
ObString data;
void *ptr = nullptr;
// void *ptr = nullptr;
ObTxBufferNodeArray tmp_array;
bool need_lock = true;
@ -6589,11 +6637,10 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou
} else if (is_committing_()) {
ret = OB_TRANS_HAS_DECIDED;
TRANS_LOG(WARN, "can not register mds in committing part_ctx", K(ret), KPC(this));
} else if (OB_ISNULL(ptr = MultiTxDataFactory::alloc(len, trans_id_, (uint64_t)this))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(data_source_type), K(len));
} else if (FALSE_IT(MEMCPY(ptr, buf, len))) {
} else if (FALSE_IT(data.assign_ptr(reinterpret_cast<char *>(ptr), len))) {
} else if (OB_FAIL(mds_cache_.try_recover_max_register_no(exec_info_.multi_data_source_))) {
TRANS_LOG(WARN, "recover max register no failed", K(ret), K(mds_cache_), KPC(this));
} else if (OB_FAIL(mds_cache_.alloc_mds_node(this, buf, len, data))) {
TRANS_LOG(WARN, "alloc mds node from the mds_cache_ failed", K(ret), K(mds_cache_), KPC(this));
} else {
mds::BufferCtx *buffer_ctx = nullptr;
if (data_source_type > ObTxDataSourceType::BEFORE_VERSION_4_1
@ -6616,15 +6663,13 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou
ret = OB_LOG_TOO_LARGE;
TRANS_LOG(WARN, "too large mds buf node", K(ret), K(tmp_array.get_serialize_size()));
//#endif
} else if (OB_FAIL(mds_cache_.try_recover_max_register_no(exec_info_.multi_data_source_))) {
TRANS_LOG(WARN, "recover max register no failed", K(ret), K(mds_cache_), KPC(this));
} else if (OB_FAIL(mds_cache_.insert_mds_node(node))) {
TRANS_LOG(WARN, "register multi source data failed", KR(ret), K(data_source_type),
K(*this));
}
if (OB_FAIL(ret)) {
MultiTxDataFactory::free(ptr);
mds_cache_.free_mds_node(data, node.get_register_no());
if (OB_NOT_NULL(buffer_ctx)) {
MTL(mds::ObTenantMdsService *)->get_buffer_ctx_allocator().free(buffer_ctx);
}

View File

@ -565,6 +565,7 @@ private:
int errism_dup_table_redo_sync_();
int errism_submit_prepare_log_();
int errsim_notify_mds_();
protected:
virtual int get_gts_(share::SCN &gts);
virtual int wait_gts_elapse_commit_version_(bool &need_wait);

View File

@ -18,6 +18,18 @@ namespace oceanbase
namespace transaction
{
int ObTxMDSCache::init(int64_t tenant_id)
{
int ret = OB_SUCCESS;
ObMemAttr attr(tenant_id, "MdsMemHash");
if (OB_FAIL(mem_stat_hash_.create(16, attr, attr))) {
TRANS_LOG(WARN, "create mds mem stat failed", K(ret), K(tenant_id));
}
return ret;
}
void ObTxMDSCache::reset()
{
// allocator_.reset();
@ -26,6 +38,7 @@ void ObTxMDSCache::reset()
submitted_iterator_ = mds_list_.end(); // ObTxBufferNodeList::iterator();
need_retry_submit_mds_ = false;
max_register_no_ = 0;
mem_stat_hash_.destroy();
}
void ObTxMDSCache::destroy()
@ -35,10 +48,106 @@ void ObTxMDSCache::destroy()
while (!mds_list_.empty()) {
mds_list_.pop_front(tmp_node);
if (nullptr != tmp_node.data_.ptr()) {
MultiTxDataFactory::free(tmp_node.data_.ptr());
free_mds_node(tmp_node.data_, tmp_node.get_register_no());
}
tmp_node.get_buffer_ctx_node().destroy_ctx();
}
mem_stat_hash_.destroy();
}
int ObTxMDSCache::alloc_mds_node(const ObPartTransCtx *tx_ctx,
const char *buf,
const int64_t buf_len,
common::ObString &data,
uint64_t register_no)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
uint64_t cur_register_no = 0;
ObMDSMemStat tmp_mem_stat;
tmp_mem_stat.reset();
if (register_no <= 0) {
cur_register_no = max_register_no_ + 1;
} else {
cur_register_no = register_no;
}
void *ptr = nullptr;
if (OB_ISNULL(ptr =
MultiTxDataFactory::alloc(buf_len, tx_ctx->get_trans_id(), (uint64_t)tx_ctx))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
TRANS_LOG(WARN, "allocate memory failed", KR(ret), K(buf_len));
} else {
MEMCPY(ptr, buf, buf_len);
data.assign_ptr(reinterpret_cast<char *>(ptr), buf_len);
}
if (OB_TMP_FAIL(mem_stat_hash_.get_refactored(cur_register_no, tmp_mem_stat))) {
if (OB_HASH_NOT_EXIST != tmp_ret) {
TRANS_LOG(ERROR, "get tmp_mem_stat from mem_stat_hash failed", K(ret), K(tmp_ret),
K(cur_register_no), K(tmp_mem_stat), KPC(tx_ctx));
}
tmp_mem_stat.reset();
}
if (OB_SUCCESS == tmp_ret || OB_HASH_NOT_EXIST == tmp_ret) {
tmp_mem_stat.alloc_cnt_++;
if (OB_TMP_FAIL(mem_stat_hash_.set_refactored(cur_register_no, tmp_mem_stat, 1))) {
TRANS_LOG(ERROR, "insert mem_stat_ into hash table failed", K(ret), K(tmp_ret),
K(cur_register_no), K(tmp_mem_stat),KPC(tx_ctx) );
}
}
return ret;
}
void ObTxMDSCache::free_mds_node(common::ObString &data, uint64_t register_no)
{
int tmp_ret = OB_SUCCESS;
uint64_t cur_register_no = register_no;
ObMDSMemStat tmp_mem_stat;
tmp_mem_stat.reset();
if (register_no <= 0) {
cur_register_no = max_register_no_ + 1;
} else {
cur_register_no = register_no;
}
MultiTxDataFactory::free(data.ptr());
if (OB_TMP_FAIL(mem_stat_hash_.get_refactored(cur_register_no, tmp_mem_stat))) {
TRANS_LOG_RET(ERROR, tmp_ret, "get tmp_mem_stat from mem_stat_hash failed", K(ret), K(tmp_ret),
K(cur_register_no), K(tmp_mem_stat));
tmp_mem_stat.reset();
}
if (OB_SUCCESS == tmp_ret) {
tmp_mem_stat.free_cnt_++;
if (tmp_mem_stat.free_cnt_ >= tmp_mem_stat.alloc_cnt_) {
if (OB_TMP_FAIL(mem_stat_hash_.erase_refactored(cur_register_no))) {
TRANS_LOG_RET(ERROR, tmp_ret, "insert mem_stat_ into hash table failed", K(ret), K(tmp_ret),
K(cur_register_no), K(tmp_mem_stat));
}
}
}
}
bool ObTxMDSCache::is_mem_leak()
{
bool mem_leak = !mem_stat_hash_.empty();
if (mem_leak) {
for (ObTxMDSMemStatHash::iterator iter = mem_stat_hash_.begin(); iter != mem_stat_hash_.end();
iter++) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "mds node mem leak", K(iter->first), K(iter->second));
}
}
return mem_leak;
}
int ObTxMDSCache::try_recover_max_register_no(const ObTxBufferNodeArray &node_array)
@ -84,7 +193,7 @@ int ObTxMDSCache::rollback_last_mds_node()
TRANS_LOG(WARN, "pop back last node failed", K(ret));
} else {
TRANS_LOG(INFO, "rollback the last mds node", K(ret), K(buf_node), KPC(this));
MultiTxDataFactory::free(buf_node.get_ptr());
free_mds_node(buf_node.data_, buf_node.get_register_no());
buf_node.get_buffer_ctx_node().destroy_ctx();
}

View File

@ -26,13 +26,45 @@ typedef ObList<ObTxBufferNode, TransModulePageAllocator> ObTxBufferNodeList;
class ObTxMDSRange;
struct ObMDSMemStat
{
// uint64_t register_no_;
uint64_t alloc_cnt_;
uint64_t free_cnt_;
uint64_t mem_size_;
void reset()
{
alloc_cnt_ = 0;
free_cnt_ = 0;
mem_size_ = 0;
}
ObMDSMemStat() { reset(); }
TO_STRING_KV(K(alloc_cnt_), K(free_cnt_), K(mem_size_));
};
typedef common::hash::ObHashMap<uint64_t, ObMDSMemStat> ObTxMDSMemStatHash;
class ObTxMDSCache
{
public:
ObTxMDSCache(TransModulePageAllocator &allocator) : mds_list_(allocator) { reset(); }
int init(int64_t tenant_id);
void reset();
void destroy();
int alloc_mds_node(const ObPartTransCtx *tx_ctx,
const char *buf,
const int64_t buf_len,
common::ObString &data,
uint64_t register_no = 0);
void free_mds_node(common::ObString & data, uint64_t register_no = 0);
bool is_mem_leak();
int try_recover_max_register_no(const ObTxBufferNodeArray & node_array);
int insert_mds_node(ObTxBufferNode &buf_node);
int rollback_last_mds_node();
@ -51,10 +83,7 @@ public:
int64_t count() const { return mds_list_.size(); }
void update_submitted_iterator(ObTxBufferNodeArray &range_array);
void update_sync_failed_range(ObTxBufferNodeArray &range_array);
// {
// unsubmitted_size_ = unsubmitted_size_ - iter->get_serialize_size();
// submitted_iterator_ = iter;
// }
void clear_submitted_iterator() { submitted_iterator_ = mds_list_.end(); }
bool is_contain(const ObTxDataSourceType target_type) const;
@ -71,6 +100,8 @@ private:
int64_t unsubmitted_size_;
ObTxBufferNodeList mds_list_;
ObTxBufferNodeList::iterator submitted_iterator_;
ObTxMDSMemStatHash mem_stat_hash_;
};
class ObTxMDSRange

View File

@ -71,6 +71,24 @@ OB_NOINLINE int ObTransService::acquire_local_snapshot_(const share::ObLSID &ls_
acquire_from_follower = false;
return ret;
}
bool NOTIFY_MDS_ERRSIM = false;
OB_NOINLINE int ObPartTransCtx::errsim_notify_mds_()
{
int ret = OB_SUCCESS;
if (NOTIFY_MDS_ERRSIM) {
ret = OB_ERR_UNEXPECTED;
}
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "errsim notify mds", K(ret), K(NOTIFY_MDS_ERRSIM));
}
return ret;
}
class ObTestRegisterMDS : public ::testing::Test
{
public:
@ -162,6 +180,33 @@ TEST_F(ObTestRegisterMDS, basic_big_mds)
#endif
}
TEST_F(ObTestRegisterMDS, notify_mds_error)
{
START_TWO_TX_NODE_WITH_LSID(n1, n2, 2005);
PREPARE_TX(n1, tx);
PREPARE_TX_PARAM(tx_param);
const char *mds_str = "register mds basic";
ASSERT_EQ(OB_SUCCESS, n1->start_tx(tx, tx_param));
NOTIFY_MDS_ERRSIM = true;
ASSERT_EQ(OB_ERR_UNEXPECTED, n1->txs_.register_mds_into_tx(tx, n1->ls_id_, ObTxDataSourceType::DDL_TRANS,
mds_str, strlen(mds_str)));
NOTIFY_MDS_ERRSIM = false;
n2->wait_all_redolog_applied();
ASSERT_EQ(OB_SUCCESS, n1->commit_tx(tx, n1->ts_after_ms(500)));
n2->set_as_follower_replica(*n1);
ReplayLogEntryFunctor functor(n2);
ASSERT_EQ(OB_SUCCESS, n2->fake_tx_log_adapter_->replay_all(functor));
GC_MDS_RETAIN_CTX(n1)
ASSERT_EQ(OB_SUCCESS, n1->wait_all_tx_ctx_is_destoryed());
GC_MDS_RETAIN_CTX(n2)
ASSERT_EQ(OB_SUCCESS, n2->wait_all_tx_ctx_is_destoryed());
}
} // namespace oceanbase
int main(int argc, char **argv)