[OBCDC] Fix memory not controlled caused by redo dispatch not under controll

This commit is contained in:
SanmuWangZJU
2023-09-18 03:44:04 +00:00
committed by ob-robot
parent 0534888042
commit 2970475539
55 changed files with 1004 additions and 310 deletions

View File

@ -71,7 +71,7 @@ TEST(ObLogTransTaskPool, Init)
ObLogTransTaskPool<MockTransTask> pool;
int ret = pool.init(&fifo, part_trans_task_prealloc_count, page_size, true, prealloc_page_count);
int ret = pool.init(&fifo, part_trans_task_prealloc_count, true, prealloc_page_count);
EXPECT_EQ(OB_SUCCESS, ret);
}
@ -85,7 +85,7 @@ TEST(ObLogTransTaskPool, Function1)
ObLogTransTaskPool<MockTransTask> pool;
int ret = pool.init(&fifo, 1024 * 8, 1024, true, 1024);
int ret = pool.init(&fifo, 1024 * 8, true, 1024);
EXPECT_EQ(OB_SUCCESS, ret);
MockTransTask **tasks = new MockTransTask*[task_cnt];
@ -118,7 +118,7 @@ TEST(ObLogTransTaskPool, Function2)
ObLogTransTaskPool<MockTransTask> pool;
int ret = pool.init(&fifo, 1024 * 8, 1024, true, 1024);
int ret = pool.init(&fifo, 1024 * 8, true, 1024);
EXPECT_EQ(OB_SUCCESS, ret);
MockTransTask **tasks = new MockTransTask*[task_cnt];

View File

@ -63,7 +63,7 @@ using namespace logfetcher;
ObLogPartTransResolverFactory resolver_factory; \
ObLogTransTaskPool<PartTransTask> task_pool; \
EXPECT_EQ(OB_SUCCESS, fifo_allocator.init(16 * _G_, 16 * _M_, OB_MALLOC_NORMAL_BLOCK_SIZE)); \
EXPECT_EQ(OB_SUCCESS, task_pool.init(&fifo_allocator, PREALLOC_POOL_SIZE, TRANS_TASK_PAGE_SIZE, true, PREALLOC_PAGE_COUNT)); \
EXPECT_EQ(OB_SUCCESS, task_pool.init(&fifo_allocator, PREALLOC_POOL_SIZE, true, PREALLOC_PAGE_COUNT)); \
ObLogEntryTaskPool log_entry_task_pool; \
EXPECT_EQ(OB_SUCCESS, log_entry_task_pool.init(10/* fixed_log_entry_task_count */)); \
MockFetcherDispatcher fetcher_dispatcher; \
@ -132,7 +132,6 @@ namespace unittest
// Task Pool
static const int64_t PREALLOC_POOL_SIZE = 10 * 1024;
static const int64_t TRANS_TASK_PAGE_SIZE = 1024;
static const int64_t TRANS_TASK_BLOCK_SIZE = 4 * 1024 *1024;
static const int64_t PREALLOC_PAGE_COUNT = 1024;
@ -374,8 +373,9 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq2_miss)
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log));
EXPECT_EQ(0, new_miss_log.get_total_misslog_cnt());
missing_info.set_need_reconsume_commit_log_entry();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, missing_info, tsi, stop_flag));
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_reconsuming();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, reconsume_miss_info, tsi, stop_flag));
DESTROY_OBLOG_INSTANCE();
}
@ -425,7 +425,7 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq3_miss)
EXPECT_EQ(0, new_miss_log.get_total_misslog_cnt());
EXPECT_TRUE(missing_info.need_reconsume_commit_log_entry());
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_need_reconsume_commit_log_entry();
reconsume_miss_info.set_reconsuming();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, reconsume_miss_info, tsi, stop_flag));
EXPECT_EQ(0, reconsume_miss_info.get_total_misslog_cnt());
@ -483,7 +483,7 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq4_miss_1)
new_miss_log.set_resolving_miss_log();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log));
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_need_reconsume_commit_log_entry();
reconsume_miss_info.set_reconsuming();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, reconsume_miss_info, tsi, stop_flag));
missing_info.reset();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag));
@ -524,7 +524,7 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq4_miss_2)
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log_2));
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_need_reconsume_commit_log_entry();
reconsume_miss_info.set_reconsuming();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, reconsume_miss_info, tsi, stop_flag));
@ -582,9 +582,6 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq5_miss)
IObCDCPartTransResolver::MissingLogInfo new_miss_log;
new_miss_log.set_resolving_miss_log();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log));
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_need_reconsume_commit_log_entry();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry2, lsn2, reconsume_miss_info, tsi, stop_flag));
missing_info.reset();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag));
DESTROY_OBLOG_INSTANCE();
@ -651,7 +648,7 @@ TEST(ObCDCPartTransResolver, test_sp_tx_seq6_miss)
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry2, lsn2, tsi, new_miss_log));
IObCDCPartTransResolver::MissingLogInfo reconsume_miss_info;
reconsume_miss_info.set_need_reconsume_commit_log_entry();
reconsume_miss_info.set_reconsuming();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, reconsume_miss_info, tsi, stop_flag));
@ -746,7 +743,7 @@ TEST(ObCDCPartTransResolver, test_sp_tx_dist_miss2)
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_miss_tx_log(log_entry, lsn, tsi, new_miss_log));
EXPECT_EQ(0, new_miss_log.get_total_misslog_cnt());
missing_info.reset();
missing_info.set_need_reconsume_commit_log_entry();
missing_info.set_reconsuming();
EXPECT_EQ(OB_SUCCESS, ls_fetch_ctx->read_log(log_entry3, lsn3, missing_info, tsi, stop_flag));
DESTROY_OBLOG_INSTANCE();