From 729902b8474e69cd17a5b682f3d9135e937a7534 Mon Sep 17 00:00:00 2001 From: taoshuning <616811991@qq.com> Date: Thu, 16 Nov 2023 12:44:57 +0000 Subject: [PATCH] check thread stop in archive consumption Co-authored-by: SanmuWangZJU --- .../test_restore_archive_log.cpp | 394 ++++++++++-------- src/logservice/libobcdc/src/ob_ls_worker.cpp | 25 +- src/logservice/libobcdc/src/ob_ls_worker.h | 10 +- .../ob_log_archive_piece_mgr.cpp | 15 +- .../restoreservice/ob_remote_log_iterator.ipp | 7 + 5 files changed, 244 insertions(+), 207 deletions(-) diff --git a/mittest/logservice/archiveservice/test_restore_archive_log.cpp b/mittest/logservice/archiveservice/test_restore_archive_log.cpp index 20a84b173..bbfe0791e 100644 --- a/mittest/logservice/archiveservice/test_restore_archive_log.cpp +++ b/mittest/logservice/archiveservice/test_restore_archive_log.cpp @@ -20,6 +20,7 @@ #include "logservice/restoreservice/ob_log_archive_piece_mgr.h" #undef private #include "fake_archive_piece_mgr.h" +#include "share/ob_thread_pool.h" #include "share/scn.h" namespace oceanbase @@ -27,208 +28,241 @@ namespace oceanbase using namespace palf; namespace unittest { -TEST(FakeArchivePieceContext, get_piece) + +volatile bool consume_finish = false; +class FakeWorker : public share::ObThreadPool { - int ret = OB_SUCCESS; - const int64_t ONE_SECOND = 1000 * 1000 * 1000L; - const int64_t base_ts_1 = 1000 * ONE_SECOND; - share::SCN base_scn_1; - base_scn_1.convert_for_logservice(base_ts_1); - const int64_t piece_switch_interval = 10 * ONE_SECOND; - const int64_t ONE_MB = 1024 * 1024L; - FakeRounds rounds; - int64_t file_id = 1; - FakeArchiveComponent round1; +public: + int start() { - round1.state_ = FakeRoundState::STOP; - round1.round_id_ = 2; - round1.start_scn_.convert_for_logservice(1005 * ONE_SECOND); - round1.end_scn_.convert_for_logservice(1051 * ONE_SECOND); - round1.base_piece_id_ = 1; - round1.piece_switch_interval_ = piece_switch_interval; - round1.base_piece_scn_ = base_scn_1; - round1.min_piece_id_ = 1; - round1.max_piece_id_ = 6; - for (int64_t i = round1.min_piece_id_; OB_SUCC(ret) && i <= round1.max_piece_id_; i++) { - FakePieceComponent piece; - { - if (i == 3) { - piece.state_ = FakePieceState::EMPTY; - piece.piece_id_ = i; - piece.min_lsn_ = LSN((file_id - 1) * 64 * ONE_MB + 10240); - piece.max_lsn_ = LSN((file_id - 1) * 64 * ONE_MB + 10240); - } else { + return share::ObThreadPool::start(); + } + + void stop() + { + share::ObThreadPool::stop(); + } + + void wait() + { + share::ObThreadPool::wait(); + } + + void run1() + { + int ret = OB_SUCCESS; + const int64_t ONE_SECOND = 1000 * 1000 * 1000L; + const int64_t base_ts_1 = 1000 * ONE_SECOND; + share::SCN base_scn_1; + base_scn_1.convert_for_logservice(base_ts_1); + const int64_t piece_switch_interval = 10 * ONE_SECOND; + const int64_t ONE_MB = 1024 * 1024L; + FakeRounds rounds; + int64_t file_id = 1; + FakeArchiveComponent round1; + { + round1.state_ = FakeRoundState::STOP; + round1.round_id_ = 2; + round1.start_scn_.convert_for_logservice(1005 * ONE_SECOND); + round1.end_scn_.convert_for_logservice(1051 * ONE_SECOND); + round1.base_piece_id_ = 1; + round1.piece_switch_interval_ = piece_switch_interval; + round1.base_piece_scn_ = base_scn_1; + round1.min_piece_id_ = 1; + round1.max_piece_id_ = 6; + for (int64_t i = round1.min_piece_id_; OB_SUCC(ret) && i <= round1.max_piece_id_; i++) { + FakePieceComponent piece; + { + if (i == 3) { + piece.state_ = FakePieceState::EMPTY; + piece.piece_id_ = i; + piece.min_lsn_ = LSN((file_id - 1) * 64 * ONE_MB + 10240); + piece.max_lsn_ = LSN((file_id - 1) * 64 * ONE_MB + 10240); + } else { + piece.state_ = FakePieceState::FRONZEN; + piece.piece_id_ = i; + piece.min_file_id_ = file_id; + file_id = file_id + 10; + piece.max_file_id_ = file_id; + piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p1(1, 11), p2(11, 21), p3(EMPTY), p4(21, 31), p5(31, 41), p6(41, 51) + piece.max_lsn_ = LSN((piece.max_file_id_ - 1) * 64 * ONE_MB + 10240); + } + } + ret = round1.array_.push_back(piece); + EXPECT_EQ(OB_SUCCESS, ret); + CLOG_LOG(INFO, "COME HERE print", K(piece), K(round1.round_id_)); + } + } + ret = rounds.array_.push_back(round1); + EXPECT_EQ(OB_SUCCESS, ret); + + FakeArchiveComponent round2; + { + round2.state_ = FakeRoundState::STOP; + round2.round_id_ = 4; + round2.start_scn_.convert_for_logservice(1051 * ONE_SECOND); + round2.end_scn_.convert_for_logservice(1095 * ONE_SECOND); + round2.base_piece_id_ = 2; + round2.piece_switch_interval_ = piece_switch_interval; + round2.base_piece_scn_ = base_scn_1; + round2.min_piece_id_ = 7; + round2.max_piece_id_ = 11; + for (int64_t i = round2.min_piece_id_ ; OB_SUCC(ret) && i <= round2.max_piece_id_; i++) { + FakePieceComponent piece; + { piece.state_ = FakePieceState::FRONZEN; piece.piece_id_ = i; piece.min_file_id_ = file_id; file_id = file_id + 10; piece.max_file_id_ = file_id; - piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p1(1, 11), p2(11, 21), p3(EMPTY), p4(21, 31), p5(31, 41), p6(41, 51) + piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p7(51, 61), p8(61, 71), p9(71, 81), p9(81, 91), p10(91, 101), p11(101, 111) piece.max_lsn_ = LSN((piece.max_file_id_ - 1) * 64 * ONE_MB + 10240); } + ret = round2.array_.push_back(piece); + EXPECT_EQ(OB_SUCCESS, ret); + CLOG_LOG(INFO, "COME HERE print", K(piece), K(round2.round_id_)); } - ret = round1.array_.push_back(piece); - EXPECT_EQ(OB_SUCCESS, ret); - CLOG_LOG(INFO, "COME HERE print", K(piece), K(round1.round_id_)); } - } - ret = rounds.array_.push_back(round1); - EXPECT_EQ(OB_SUCCESS, ret); + ret = rounds.array_.push_back(round2); + EXPECT_EQ(OB_SUCCESS, ret); - FakeArchiveComponent round2; - { - round2.state_ = FakeRoundState::STOP; - round2.round_id_ = 4; - round2.start_scn_.convert_for_logservice(1051 * ONE_SECOND); - round2.end_scn_.convert_for_logservice(1095 * ONE_SECOND); - round2.base_piece_id_ = 2; - round2.piece_switch_interval_ = piece_switch_interval; - round2.base_piece_scn_ = base_scn_1; - round2.min_piece_id_ = 7; - round2.max_piece_id_ = 11; - for (int64_t i = round2.min_piece_id_ ; OB_SUCC(ret) && i <= round2.max_piece_id_; i++) { + + FakeArchiveComponent round3; + { + round3.state_ = FakeRoundState::STOP; + round3.round_id_ = 5; + round3.start_scn_.convert_for_logservice(1200 * ONE_SECOND); + round3.end_scn_.convert_for_logservice(1295 * ONE_SECOND); + round3.base_piece_id_ = 2; + round3.piece_switch_interval_ = piece_switch_interval; + round3.base_piece_scn_ = base_scn_1; + round3.min_piece_id_ = 20; + round3.max_piece_id_ = 25; + for (int64_t i = round3.min_piece_id_; OB_SUCC(ret) && i < round3.max_piece_id_; i++) { + FakePieceComponent piece; + { + piece.state_ = FakePieceState::FRONZEN; + piece.piece_id_ = i; + piece.min_file_id_ = file_id; + file_id = file_id + 10; + piece.max_file_id_ = file_id; + piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p20(111, 121), p22(121, 131), p23(131, 141), p24(141, 151), p25(NO file) + piece.max_lsn_ = LSN((piece.max_file_id_ - 1) * 64 * ONE_MB + 10240); + } + ret = round3.array_.push_back(piece); + EXPECT_EQ(OB_SUCCESS, ret); + CLOG_LOG(INFO, "COME HERE print", K(piece), K(round3.round_id_)); + } FakePieceComponent piece; { - piece.state_ = FakePieceState::FRONZEN; - piece.piece_id_ = i; - piece.min_file_id_ = file_id; - file_id = file_id + 10; - piece.max_file_id_ = file_id; - piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p7(51, 61), p8(61, 71), p9(71, 81), p9(81, 91), p10(91, 101), p11(101, 111) - piece.max_lsn_ = LSN((piece.max_file_id_ - 1) * 64 * ONE_MB + 10240); - } - ret = round2.array_.push_back(piece); - EXPECT_EQ(OB_SUCCESS, ret); - CLOG_LOG(INFO, "COME HERE print", K(piece), K(round2.round_id_)); - } - } - ret = rounds.array_.push_back(round2); - EXPECT_EQ(OB_SUCCESS, ret); - - - FakeArchiveComponent round3; - { - round3.state_ = FakeRoundState::STOP; - round3.round_id_ = 5; - round3.start_scn_.convert_for_logservice(1200 * ONE_SECOND); - round3.end_scn_.convert_for_logservice(1295 * ONE_SECOND); - round3.base_piece_id_ = 2; - round3.piece_switch_interval_ = piece_switch_interval; - round3.base_piece_scn_ = base_scn_1; - round3.min_piece_id_ = 20; - round3.max_piece_id_ = 25; - for (int64_t i = round3.min_piece_id_; OB_SUCC(ret) && i < round3.max_piece_id_; i++) { - FakePieceComponent piece; - { - piece.state_ = FakePieceState::FRONZEN; - piece.piece_id_ = i; - piece.min_file_id_ = file_id; - file_id = file_id + 10; - piece.max_file_id_ = file_id; - piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p20(111, 121), p22(121, 131), p23(131, 141), p24(141, 151), p25(NO file) - piece.max_lsn_ = LSN((piece.max_file_id_ - 1) * 64 * ONE_MB + 10240); + piece.state_ = FakePieceState::ACTIVE; + piece.piece_id_ = round3.max_piece_id_; + piece.min_file_id_ = 0; + piece.max_file_id_ = 0; } ret = round3.array_.push_back(piece); EXPECT_EQ(OB_SUCCESS, ret); - CLOG_LOG(INFO, "COME HERE print", K(piece), K(round3.round_id_)); } - FakePieceComponent piece; - { - piece.state_ = FakePieceState::ACTIVE; - piece.piece_id_ = round3.max_piece_id_; - piece.min_file_id_ = 0; - piece.max_file_id_ = 0; - } - ret = round3.array_.push_back(piece); + ret = rounds.array_.push_back(round3); EXPECT_EQ(OB_SUCCESS, ret); + + + FakeArchivePieceContext archive_context; + FakeArchivePieceContext *context = &archive_context; + ret = archive_context.init(share::ObLSID(1001), &rounds); + EXPECT_EQ(OB_SUCCESS, ret); + + int64_t log_ts = 1021 * ONE_SECOND; + share::SCN scn; + scn.convert_for_logservice(log_ts); + palf::LSN lsn(64 * ONE_MB + ONE_MB); + const int64_t ONE_PIECE_MB = 10 * 64 * ONE_MB; + int64_t dest_id = 0; + int64_t round_id = 0; + int64_t piece_id = 0; + int64_t cur_file_id = 0; + int64_t offset = 0; + bool to_newest = false; + palf::LSN max_lsn; + CLOG_LOG(INFO, "print get piece 1", K(lsn)); + ret = context->get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); + EXPECT_EQ(OB_SUCCESS, ret); + EXPECT_EQ(round_id, 2); + EXPECT_EQ(piece_id, 1); + EXPECT_EQ(cur_file_id, 2); + EXPECT_EQ(offset, 0); + + lsn = LSN(23 * 64 * ONE_MB); + CLOG_LOG(INFO, "print get piece 2", K(lsn)); + scn.convert_for_logservice(log_ts + 3); + ret = context->get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); + EXPECT_EQ(OB_SUCCESS, ret); + EXPECT_EQ(round_id, 2); + EXPECT_EQ(piece_id, 4); + EXPECT_EQ(cur_file_id, 24); + EXPECT_EQ(offset, 0); + + lsn = LSN(122 * 64 * ONE_MB); + CLOG_LOG(INFO, "print get piece 3", K(lsn)); + scn.convert_for_logservice(log_ts+20); + ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); + EXPECT_EQ(OB_ARCHIVE_ROUND_NOT_CONTINUOUS, ret); + + archive_context.reset_locate_info(); + log_ts = 1199 * ONE_SECOND; + CLOG_LOG(INFO, "print get piece 4", K(lsn)); + scn.convert_for_logservice(log_ts); + ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); + EXPECT_EQ(OB_SUCCESS, ret); + EXPECT_EQ(round_id, 5); + EXPECT_EQ(piece_id, 22); + EXPECT_EQ(cur_file_id, 123); + EXPECT_EQ(offset, 0); + int64_t fake_offset = 102400; + palf::LSN fake_lsn(102400); + ret = archive_context.update_file_info(dest_id, round_id, piece_id, cur_file_id, fake_offset, fake_lsn); + EXPECT_EQ(OB_SUCCESS, ret); + lsn = lsn + 102400; + CLOG_LOG(INFO, "print get piece 6", K(lsn)); + ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); + EXPECT_EQ(round_id, 5); + EXPECT_EQ(piece_id, 22); + EXPECT_EQ(cur_file_id, 123); + EXPECT_EQ(offset, fake_offset); + EXPECT_EQ(max_lsn, fake_lsn); + + lsn = LSN(151 * 64 * ONE_MB + ONE_MB); + CLOG_LOG(INFO, "print get piece 7", K(lsn)); + ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); + EXPECT_EQ(OB_ITER_END, ret); + + // bad case, current piece hang, can not backward piece + lsn = LSN(10 * 64 * ONE_MB); + CLOG_LOG(INFO, "print get piece 8", K(lsn)); + ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); + EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, ret); + + archive_context.reset_locate_info(); + log_ts = 1010; + scn.convert_for_logservice(log_ts); + CLOG_LOG(INFO, "print get piece 9", K(lsn)); + ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); + EXPECT_EQ(OB_SUCCESS, ret); + + consume_finish = true; } - ret = rounds.array_.push_back(round3); - EXPECT_EQ(OB_SUCCESS, ret); +}; - - FakeArchivePieceContext archive_context; - FakeArchivePieceContext *context = &archive_context; - ret = archive_context.init(share::ObLSID(1001), &rounds); - EXPECT_EQ(OB_SUCCESS, ret); - - int64_t log_ts = 1021 * ONE_SECOND; - share::SCN scn; - scn.convert_for_logservice(log_ts); - palf::LSN lsn(64 * ONE_MB + ONE_MB); - const int64_t ONE_PIECE_MB = 10 * 64 * ONE_MB; - int64_t dest_id = 0; - int64_t round_id = 0; - int64_t piece_id = 0; - int64_t cur_file_id = 0; - int64_t offset = 0; - bool to_newest = false; - palf::LSN max_lsn; - CLOG_LOG(INFO, "print get piece 1", K(lsn)); - ret = context->get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); - EXPECT_EQ(OB_SUCCESS, ret); - EXPECT_EQ(round_id, 2); - EXPECT_EQ(piece_id, 1); - EXPECT_EQ(cur_file_id, 2); - EXPECT_EQ(offset, 0); - - lsn = LSN(23 * 64 * ONE_MB); - CLOG_LOG(INFO, "print get piece 2", K(lsn)); - scn.convert_for_logservice(log_ts + 3); - ret = context->get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); - EXPECT_EQ(OB_SUCCESS, ret); - EXPECT_EQ(round_id, 2); - EXPECT_EQ(piece_id, 4); - EXPECT_EQ(cur_file_id, 24); - EXPECT_EQ(offset, 0); - - lsn = LSN(122 * 64 * ONE_MB); - CLOG_LOG(INFO, "print get piece 3", K(lsn)); - scn.convert_for_logservice(log_ts+20); - ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); - EXPECT_EQ(OB_ARCHIVE_ROUND_NOT_CONTINUOUS, ret); - - archive_context.reset_locate_info(); - log_ts = 1199 * ONE_SECOND; - CLOG_LOG(INFO, "print get piece 4", K(lsn)); - scn.convert_for_logservice(log_ts); - ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); - EXPECT_EQ(OB_SUCCESS, ret); - EXPECT_EQ(round_id, 5); - EXPECT_EQ(piece_id, 22); - EXPECT_EQ(cur_file_id, 123); - EXPECT_EQ(offset, 0); - int64_t fake_offset = 102400; - palf::LSN fake_lsn(102400); - ret = archive_context.update_file_info(dest_id, round_id, piece_id, cur_file_id, fake_offset, fake_lsn); - EXPECT_EQ(OB_SUCCESS, ret); - lsn = lsn + 102400; - CLOG_LOG(INFO, "print get piece 6", K(lsn)); - ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); - EXPECT_EQ(round_id, 5); - EXPECT_EQ(piece_id, 22); - EXPECT_EQ(cur_file_id, 123); - EXPECT_EQ(offset, fake_offset); - EXPECT_EQ(max_lsn, fake_lsn); - - lsn = LSN(151 * 64 * ONE_MB + ONE_MB); - CLOG_LOG(INFO, "print get piece 7", K(lsn)); - ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); - EXPECT_EQ(OB_ITER_END, ret); - - // bad case, current piece hang, can not backward piece - lsn = LSN(10 * 64 * ONE_MB); - CLOG_LOG(INFO, "print get piece 8", K(lsn)); - ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); - EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, ret); - - archive_context.reset_locate_info(); - log_ts = 1010; - scn.convert_for_logservice(log_ts); - CLOG_LOG(INFO, "print get piece 9", K(lsn)); - ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest); - EXPECT_EQ(OB_SUCCESS, ret); +TEST(FakeArchivePieceContext, get_piece) +{ + FakeWorker worker; + worker.start(); + while (! consume_finish) { + sleep(1); + } + worker.stop(); + worker.wait(); } - } } diff --git a/src/logservice/libobcdc/src/ob_ls_worker.cpp b/src/logservice/libobcdc/src/ob_ls_worker.cpp index 9dd5e9221..847d9a98a 100644 --- a/src/logservice/libobcdc/src/ob_ls_worker.cpp +++ b/src/logservice/libobcdc/src/ob_ls_worker.cpp @@ -73,7 +73,7 @@ int ObLSWorker::init(const int64_t worker_thread_num, LOG_ERROR("init timer fail", KR(ret), K(max_timer_task_count)); } // Initializing the thread pool - else if (OB_FAIL(StreamWorkerThread::init(worker_thread_num, + else if (OB_FAIL(ObMapQueueThreadPool::init(OB_SERVER_TENANT_ID, worker_thread_num, ObModIds::OB_LS_WORKER_THREAD))) { LOG_ERROR("init worker thread fail", KR(ret), K(worker_thread_num)); } else { @@ -103,7 +103,7 @@ void ObLSWorker::destroy() inited_ = false; stream_paused_ = false; fetcher_resume_time_ = OB_INVALID_TIMESTAMP; - StreamWorkerThread::destroy(); + ObMapQueueThreadPool::destroy(); timer_.destroy(); fetcher_host_ = nullptr; idle_pool_ = NULL; @@ -123,7 +123,7 @@ int ObLSWorker::start() ret = OB_NOT_INIT; } else if (OB_FAIL(timer_.start())) { LOG_ERROR("start timer thread fail", KR(ret)); - } else if (OB_FAIL(StreamWorkerThread::start())) { + } else if (OB_FAIL(ObMapQueueThreadPool::start())) { LOG_ERROR("start stream worker fail", KR(ret)); } else { LOG_INFO("start stream worker succ"); @@ -137,7 +137,7 @@ void ObLSWorker::stop() LOG_INFO("stop stream worker begin"); mark_stop_flag(); timer_.stop(); - StreamWorkerThread::stop(); + ObMapQueueThreadPool::stop(); LOG_INFO("stop stream worker succ"); } } @@ -146,7 +146,7 @@ void ObLSWorker::mark_stop_flag() { LOG_INFO("stream worker mark_stop_flag begin"); timer_.mark_stop_flag(); - StreamWorkerThread::mark_stop_flag(); + ObMapQueueThreadPool::has_set_stop() = true; LOG_INFO("stream worker mark_stop_flag end"); } @@ -289,7 +289,7 @@ int ObLSWorker::dispatch_stream_task(FetchStream &task, const char *from_mod) } // Rotating the task of fetching log streams to work threads - if (OB_FAIL(StreamWorkerThread::push(&task, hash_val))) { + if (OB_FAIL(ObMapQueueThreadPool::push(&task, hash_val))) { if (OB_IN_STOP_STATE != ret) { LOG_ERROR("push stream task into thread queue fail", KR(ret)); } @@ -320,8 +320,7 @@ int ObLSWorker::hibernate_stream_task(FetchStream &task, const char *from_mod) } // hendle function for thread pool -int ObLSWorker::handle(void *data, - const int64_t thread_index, +void ObLSWorker::handle(void *data, volatile bool &stop_flag) { int ret = OB_SUCCESS; @@ -329,11 +328,11 @@ int ObLSWorker::handle(void *data, FetchStream *task = static_cast(data); ObLogTraceIdGuard trace_guard; - LOG_DEBUG("[STAT] [STREAM_WORKER] [HANDLE_STREAM_TASK]", K_(stream_paused), K(thread_index), + LOG_DEBUG("[STAT] [STREAM_WORKER] [HANDLE_STREAM_TASK]", K_(stream_paused), "thread_index", get_thread_idx(), K(task), KPC(task)); if (OB_ISNULL(task)) { - LOG_ERROR("invalid task", K(task), K(thread_index)); + LOG_ERROR("invalid task", K(task), "thread_index", get_thread_idx()); ret = OB_INVALID_ARGUMENT; } // If the stream task is currently suspended, the task is put to sleep @@ -353,7 +352,7 @@ int ObLSWorker::handle(void *data, // Can no longer continue with the task } - if (0 == thread_index) { + if (0 == get_thread_idx()) { if (REACH_TIME_INTERVAL(STAT_INTERVAL)) { print_stat_(); } @@ -361,9 +360,9 @@ int ObLSWorker::handle(void *data, if (OB_SUCCESS != ret && OB_IN_STOP_STATE != ret && OB_NOT_NULL(err_handler_)) { err_handler_->handle_error(ret, "stream worker exits on error, err=%d, thread_index=%ld", - ret, thread_index); + ret, get_thread_idx()); + ObMapQueueThreadPool::has_set_stop() = true; // mark thread pool stop; } - return ret; } void ObLSWorker::configure(const ObLogConfig & config) diff --git a/src/logservice/libobcdc/src/ob_ls_worker.h b/src/logservice/libobcdc/src/ob_ls_worker.h index 6ed469dd7..1e5d9c11b 100644 --- a/src/logservice/libobcdc/src/ob_ls_worker.h +++ b/src/logservice/libobcdc/src/ob_ls_worker.h @@ -20,12 +20,13 @@ #include "lib/lock/ob_spin_lock.h" // ObSpinLock #include "ob_log_config.h" // ObLogConfig -#include "ob_map_queue_thread.h" // ObMapQueueThread +#include "lib/thread/ob_map_queue_thread_pool.h"// ObMapQueueThreadPool #include "ob_log_timer.h" // ObLogFixedTimer #include "ob_log_ls_fetch_stream.h" // FetchStream namespace oceanbase { +using namespace common; namespace libobcdc { @@ -62,9 +63,7 @@ class IObLogFetcherDeadPool; class IObLogSvrFinder; class IObLogErrHandler; -typedef common::ObMapQueueThread StreamWorkerThread; - -class ObLSWorker : public IObLSWorker, public StreamWorkerThread +class ObLSWorker : public IObLSWorker, public ObMapQueueThreadPool { static const int64_t STAT_INTERVAL = 5 * _SEC_; @@ -101,7 +100,8 @@ public: public: // Overloading thread handling functions - virtual int handle(void *data, const int64_t thread_index, volatile bool &stop_flag); + // virtual int handle(void *data, const int64_t thread_index, volatile bool &stop_flag); + virtual void handle(void *data, volatile bool &stop_flag) override; public: static void configure(const ObLogConfig & config); diff --git a/src/logservice/restoreservice/ob_log_archive_piece_mgr.cpp b/src/logservice/restoreservice/ob_log_archive_piece_mgr.cpp index 3a5b60ef8..a97aece12 100644 --- a/src/logservice/restoreservice/ob_log_archive_piece_mgr.cpp +++ b/src/logservice/restoreservice/ob_log_archive_piece_mgr.cpp @@ -497,9 +497,7 @@ int ObLogArchivePieceContext::get_piece_(const SCN &scn, { int ret = OB_SUCCESS; bool done = false; - int64_t times = 0; - const int64_t MAX_RETRY_TIMES = 100; - while (OB_SUCC(ret) && ! done && times < MAX_RETRY_TIMES) { + while (OB_SUCC(ret) && ! done) { if (OB_FAIL(switch_round_if_need_(scn, lsn))) { CLOG_LOG(WARN, "switch round if need failed", K(ret), KPC(this)); } else if (OB_FAIL(switch_piece_if_need_(file_id, scn, lsn))) { @@ -517,12 +515,11 @@ int ObLogArchivePieceContext::get_piece_(const SCN &scn, CLOG_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "get piece cost too much time", K(scn), K(lsn), KPC(this)); } - times++; - } - - if (OB_SUCC(ret) && ! done && times == MAX_RETRY_TIMES) { - ret = OB_ERR_TOO_MUCH_TIME; - CLOG_LOG(ERROR, "retry too much times", K(times), KPC(this)); + // threads consume archive may increase or decrease, if threads stop, just retry + if (! done && OB_SUCC(ret) && OB_NOT_NULL(&lib::Thread::current()) ? lib::Thread::current().has_set_stop() : false) { + ret = OB_EAGAIN; + CLOG_LOG(INFO, "thread stop, try again", K(id_), K(scn), K(lsn)); + } } return ret; diff --git a/src/logservice/restoreservice/ob_remote_log_iterator.ipp b/src/logservice/restoreservice/ob_remote_log_iterator.ipp index b82f173a5..7fb851c24 100644 --- a/src/logservice/restoreservice/ob_remote_log_iterator.ipp +++ b/src/logservice/restoreservice/ob_remote_log_iterator.ipp @@ -267,6 +267,13 @@ int ObRemoteLogIterator::next_entry_(LogEntryType &entry, LSN &lsn } } } + + // threads consume archive may increase or decrease, if threads stop, just retry + if (! done && OB_SUCC(ret) && OB_NOT_NULL(&lib::Thread::current()) ? lib::Thread::current().has_set_stop() : false) { + ret = OB_EAGAIN; + CLOG_LOG(INFO, "thread stop, try again", K(id_), K(lsn)); + } + } while (OB_SUCCESS == ret && ! done); if (OB_NEED_RETRY == ret) {