check thread stop in archive consumption

Co-authored-by: SanmuWangZJU <sanmuwang.ws@gmail.com>
This commit is contained in:
taoshuning 2023-11-16 12:44:57 +00:00 committed by ob-robot
parent 4ff2e7c873
commit 729902b847
5 changed files with 244 additions and 207 deletions

View File

@ -20,6 +20,7 @@
#include "logservice/restoreservice/ob_log_archive_piece_mgr.h"
#undef private
#include "fake_archive_piece_mgr.h"
#include "share/ob_thread_pool.h"
#include "share/scn.h"
namespace oceanbase
@ -27,208 +28,241 @@ namespace oceanbase
using namespace palf;
namespace unittest
{
TEST(FakeArchivePieceContext, get_piece)
volatile bool consume_finish = false;
class FakeWorker : public share::ObThreadPool
{
int ret = OB_SUCCESS;
const int64_t ONE_SECOND = 1000 * 1000 * 1000L;
const int64_t base_ts_1 = 1000 * ONE_SECOND;
share::SCN base_scn_1;
base_scn_1.convert_for_logservice(base_ts_1);
const int64_t piece_switch_interval = 10 * ONE_SECOND;
const int64_t ONE_MB = 1024 * 1024L;
FakeRounds rounds;
int64_t file_id = 1;
FakeArchiveComponent round1;
public:
int start()
{
round1.state_ = FakeRoundState::STOP;
round1.round_id_ = 2;
round1.start_scn_.convert_for_logservice(1005 * ONE_SECOND);
round1.end_scn_.convert_for_logservice(1051 * ONE_SECOND);
round1.base_piece_id_ = 1;
round1.piece_switch_interval_ = piece_switch_interval;
round1.base_piece_scn_ = base_scn_1;
round1.min_piece_id_ = 1;
round1.max_piece_id_ = 6;
for (int64_t i = round1.min_piece_id_; OB_SUCC(ret) && i <= round1.max_piece_id_; i++) {
FakePieceComponent piece;
{
if (i == 3) {
piece.state_ = FakePieceState::EMPTY;
piece.piece_id_ = i;
piece.min_lsn_ = LSN((file_id - 1) * 64 * ONE_MB + 10240);
piece.max_lsn_ = LSN((file_id - 1) * 64 * ONE_MB + 10240);
} else {
return share::ObThreadPool::start();
}
void stop()
{
share::ObThreadPool::stop();
}
void wait()
{
share::ObThreadPool::wait();
}
void run1()
{
int ret = OB_SUCCESS;
const int64_t ONE_SECOND = 1000 * 1000 * 1000L;
const int64_t base_ts_1 = 1000 * ONE_SECOND;
share::SCN base_scn_1;
base_scn_1.convert_for_logservice(base_ts_1);
const int64_t piece_switch_interval = 10 * ONE_SECOND;
const int64_t ONE_MB = 1024 * 1024L;
FakeRounds rounds;
int64_t file_id = 1;
FakeArchiveComponent round1;
{
round1.state_ = FakeRoundState::STOP;
round1.round_id_ = 2;
round1.start_scn_.convert_for_logservice(1005 * ONE_SECOND);
round1.end_scn_.convert_for_logservice(1051 * ONE_SECOND);
round1.base_piece_id_ = 1;
round1.piece_switch_interval_ = piece_switch_interval;
round1.base_piece_scn_ = base_scn_1;
round1.min_piece_id_ = 1;
round1.max_piece_id_ = 6;
for (int64_t i = round1.min_piece_id_; OB_SUCC(ret) && i <= round1.max_piece_id_; i++) {
FakePieceComponent piece;
{
if (i == 3) {
piece.state_ = FakePieceState::EMPTY;
piece.piece_id_ = i;
piece.min_lsn_ = LSN((file_id - 1) * 64 * ONE_MB + 10240);
piece.max_lsn_ = LSN((file_id - 1) * 64 * ONE_MB + 10240);
} else {
piece.state_ = FakePieceState::FRONZEN;
piece.piece_id_ = i;
piece.min_file_id_ = file_id;
file_id = file_id + 10;
piece.max_file_id_ = file_id;
piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p1(1, 11), p2(11, 21), p3(EMPTY), p4(21, 31), p5(31, 41), p6(41, 51)
piece.max_lsn_ = LSN((piece.max_file_id_ - 1) * 64 * ONE_MB + 10240);
}
}
ret = round1.array_.push_back(piece);
EXPECT_EQ(OB_SUCCESS, ret);
CLOG_LOG(INFO, "COME HERE print", K(piece), K(round1.round_id_));
}
}
ret = rounds.array_.push_back(round1);
EXPECT_EQ(OB_SUCCESS, ret);
FakeArchiveComponent round2;
{
round2.state_ = FakeRoundState::STOP;
round2.round_id_ = 4;
round2.start_scn_.convert_for_logservice(1051 * ONE_SECOND);
round2.end_scn_.convert_for_logservice(1095 * ONE_SECOND);
round2.base_piece_id_ = 2;
round2.piece_switch_interval_ = piece_switch_interval;
round2.base_piece_scn_ = base_scn_1;
round2.min_piece_id_ = 7;
round2.max_piece_id_ = 11;
for (int64_t i = round2.min_piece_id_ ; OB_SUCC(ret) && i <= round2.max_piece_id_; i++) {
FakePieceComponent piece;
{
piece.state_ = FakePieceState::FRONZEN;
piece.piece_id_ = i;
piece.min_file_id_ = file_id;
file_id = file_id + 10;
piece.max_file_id_ = file_id;
piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p1(1, 11), p2(11, 21), p3(EMPTY), p4(21, 31), p5(31, 41), p6(41, 51)
piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p7(51, 61), p8(61, 71), p9(71, 81), p9(81, 91), p10(91, 101), p11(101, 111)
piece.max_lsn_ = LSN((piece.max_file_id_ - 1) * 64 * ONE_MB + 10240);
}
ret = round2.array_.push_back(piece);
EXPECT_EQ(OB_SUCCESS, ret);
CLOG_LOG(INFO, "COME HERE print", K(piece), K(round2.round_id_));
}
ret = round1.array_.push_back(piece);
EXPECT_EQ(OB_SUCCESS, ret);
CLOG_LOG(INFO, "COME HERE print", K(piece), K(round1.round_id_));
}
}
ret = rounds.array_.push_back(round1);
EXPECT_EQ(OB_SUCCESS, ret);
ret = rounds.array_.push_back(round2);
EXPECT_EQ(OB_SUCCESS, ret);
FakeArchiveComponent round2;
{
round2.state_ = FakeRoundState::STOP;
round2.round_id_ = 4;
round2.start_scn_.convert_for_logservice(1051 * ONE_SECOND);
round2.end_scn_.convert_for_logservice(1095 * ONE_SECOND);
round2.base_piece_id_ = 2;
round2.piece_switch_interval_ = piece_switch_interval;
round2.base_piece_scn_ = base_scn_1;
round2.min_piece_id_ = 7;
round2.max_piece_id_ = 11;
for (int64_t i = round2.min_piece_id_ ; OB_SUCC(ret) && i <= round2.max_piece_id_; i++) {
FakeArchiveComponent round3;
{
round3.state_ = FakeRoundState::STOP;
round3.round_id_ = 5;
round3.start_scn_.convert_for_logservice(1200 * ONE_SECOND);
round3.end_scn_.convert_for_logservice(1295 * ONE_SECOND);
round3.base_piece_id_ = 2;
round3.piece_switch_interval_ = piece_switch_interval;
round3.base_piece_scn_ = base_scn_1;
round3.min_piece_id_ = 20;
round3.max_piece_id_ = 25;
for (int64_t i = round3.min_piece_id_; OB_SUCC(ret) && i < round3.max_piece_id_; i++) {
FakePieceComponent piece;
{
piece.state_ = FakePieceState::FRONZEN;
piece.piece_id_ = i;
piece.min_file_id_ = file_id;
file_id = file_id + 10;
piece.max_file_id_ = file_id;
piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p20(111, 121), p22(121, 131), p23(131, 141), p24(141, 151), p25(NO file)
piece.max_lsn_ = LSN((piece.max_file_id_ - 1) * 64 * ONE_MB + 10240);
}
ret = round3.array_.push_back(piece);
EXPECT_EQ(OB_SUCCESS, ret);
CLOG_LOG(INFO, "COME HERE print", K(piece), K(round3.round_id_));
}
FakePieceComponent piece;
{
piece.state_ = FakePieceState::FRONZEN;
piece.piece_id_ = i;
piece.min_file_id_ = file_id;
file_id = file_id + 10;
piece.max_file_id_ = file_id;
piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p7(51, 61), p8(61, 71), p9(71, 81), p9(81, 91), p10(91, 101), p11(101, 111)
piece.max_lsn_ = LSN((piece.max_file_id_ - 1) * 64 * ONE_MB + 10240);
}
ret = round2.array_.push_back(piece);
EXPECT_EQ(OB_SUCCESS, ret);
CLOG_LOG(INFO, "COME HERE print", K(piece), K(round2.round_id_));
}
}
ret = rounds.array_.push_back(round2);
EXPECT_EQ(OB_SUCCESS, ret);
FakeArchiveComponent round3;
{
round3.state_ = FakeRoundState::STOP;
round3.round_id_ = 5;
round3.start_scn_.convert_for_logservice(1200 * ONE_SECOND);
round3.end_scn_.convert_for_logservice(1295 * ONE_SECOND);
round3.base_piece_id_ = 2;
round3.piece_switch_interval_ = piece_switch_interval;
round3.base_piece_scn_ = base_scn_1;
round3.min_piece_id_ = 20;
round3.max_piece_id_ = 25;
for (int64_t i = round3.min_piece_id_; OB_SUCC(ret) && i < round3.max_piece_id_; i++) {
FakePieceComponent piece;
{
piece.state_ = FakePieceState::FRONZEN;
piece.piece_id_ = i;
piece.min_file_id_ = file_id;
file_id = file_id + 10;
piece.max_file_id_ = file_id;
piece.min_lsn_ = LSN((piece.min_file_id_ - 1) * 64 * ONE_MB + 10240); // p20(111, 121), p22(121, 131), p23(131, 141), p24(141, 151), p25(NO file)
piece.max_lsn_ = LSN((piece.max_file_id_ - 1) * 64 * ONE_MB + 10240);
piece.state_ = FakePieceState::ACTIVE;
piece.piece_id_ = round3.max_piece_id_;
piece.min_file_id_ = 0;
piece.max_file_id_ = 0;
}
ret = round3.array_.push_back(piece);
EXPECT_EQ(OB_SUCCESS, ret);
CLOG_LOG(INFO, "COME HERE print", K(piece), K(round3.round_id_));
}
FakePieceComponent piece;
{
piece.state_ = FakePieceState::ACTIVE;
piece.piece_id_ = round3.max_piece_id_;
piece.min_file_id_ = 0;
piece.max_file_id_ = 0;
}
ret = round3.array_.push_back(piece);
ret = rounds.array_.push_back(round3);
EXPECT_EQ(OB_SUCCESS, ret);
FakeArchivePieceContext archive_context;
FakeArchivePieceContext *context = &archive_context;
ret = archive_context.init(share::ObLSID(1001), &rounds);
EXPECT_EQ(OB_SUCCESS, ret);
int64_t log_ts = 1021 * ONE_SECOND;
share::SCN scn;
scn.convert_for_logservice(log_ts);
palf::LSN lsn(64 * ONE_MB + ONE_MB);
const int64_t ONE_PIECE_MB = 10 * 64 * ONE_MB;
int64_t dest_id = 0;
int64_t round_id = 0;
int64_t piece_id = 0;
int64_t cur_file_id = 0;
int64_t offset = 0;
bool to_newest = false;
palf::LSN max_lsn;
CLOG_LOG(INFO, "print get piece 1", K(lsn));
ret = context->get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_SUCCESS, ret);
EXPECT_EQ(round_id, 2);
EXPECT_EQ(piece_id, 1);
EXPECT_EQ(cur_file_id, 2);
EXPECT_EQ(offset, 0);
lsn = LSN(23 * 64 * ONE_MB);
CLOG_LOG(INFO, "print get piece 2", K(lsn));
scn.convert_for_logservice(log_ts + 3);
ret = context->get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_SUCCESS, ret);
EXPECT_EQ(round_id, 2);
EXPECT_EQ(piece_id, 4);
EXPECT_EQ(cur_file_id, 24);
EXPECT_EQ(offset, 0);
lsn = LSN(122 * 64 * ONE_MB);
CLOG_LOG(INFO, "print get piece 3", K(lsn));
scn.convert_for_logservice(log_ts+20);
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_ARCHIVE_ROUND_NOT_CONTINUOUS, ret);
archive_context.reset_locate_info();
log_ts = 1199 * ONE_SECOND;
CLOG_LOG(INFO, "print get piece 4", K(lsn));
scn.convert_for_logservice(log_ts);
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_SUCCESS, ret);
EXPECT_EQ(round_id, 5);
EXPECT_EQ(piece_id, 22);
EXPECT_EQ(cur_file_id, 123);
EXPECT_EQ(offset, 0);
int64_t fake_offset = 102400;
palf::LSN fake_lsn(102400);
ret = archive_context.update_file_info(dest_id, round_id, piece_id, cur_file_id, fake_offset, fake_lsn);
EXPECT_EQ(OB_SUCCESS, ret);
lsn = lsn + 102400;
CLOG_LOG(INFO, "print get piece 6", K(lsn));
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(round_id, 5);
EXPECT_EQ(piece_id, 22);
EXPECT_EQ(cur_file_id, 123);
EXPECT_EQ(offset, fake_offset);
EXPECT_EQ(max_lsn, fake_lsn);
lsn = LSN(151 * 64 * ONE_MB + ONE_MB);
CLOG_LOG(INFO, "print get piece 7", K(lsn));
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_ITER_END, ret);
// bad case, current piece hang, can not backward piece
lsn = LSN(10 * 64 * ONE_MB);
CLOG_LOG(INFO, "print get piece 8", K(lsn));
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, ret);
archive_context.reset_locate_info();
log_ts = 1010;
scn.convert_for_logservice(log_ts);
CLOG_LOG(INFO, "print get piece 9", K(lsn));
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_SUCCESS, ret);
consume_finish = true;
}
ret = rounds.array_.push_back(round3);
EXPECT_EQ(OB_SUCCESS, ret);
};
FakeArchivePieceContext archive_context;
FakeArchivePieceContext *context = &archive_context;
ret = archive_context.init(share::ObLSID(1001), &rounds);
EXPECT_EQ(OB_SUCCESS, ret);
int64_t log_ts = 1021 * ONE_SECOND;
share::SCN scn;
scn.convert_for_logservice(log_ts);
palf::LSN lsn(64 * ONE_MB + ONE_MB);
const int64_t ONE_PIECE_MB = 10 * 64 * ONE_MB;
int64_t dest_id = 0;
int64_t round_id = 0;
int64_t piece_id = 0;
int64_t cur_file_id = 0;
int64_t offset = 0;
bool to_newest = false;
palf::LSN max_lsn;
CLOG_LOG(INFO, "print get piece 1", K(lsn));
ret = context->get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_SUCCESS, ret);
EXPECT_EQ(round_id, 2);
EXPECT_EQ(piece_id, 1);
EXPECT_EQ(cur_file_id, 2);
EXPECT_EQ(offset, 0);
lsn = LSN(23 * 64 * ONE_MB);
CLOG_LOG(INFO, "print get piece 2", K(lsn));
scn.convert_for_logservice(log_ts + 3);
ret = context->get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_SUCCESS, ret);
EXPECT_EQ(round_id, 2);
EXPECT_EQ(piece_id, 4);
EXPECT_EQ(cur_file_id, 24);
EXPECT_EQ(offset, 0);
lsn = LSN(122 * 64 * ONE_MB);
CLOG_LOG(INFO, "print get piece 3", K(lsn));
scn.convert_for_logservice(log_ts+20);
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_ARCHIVE_ROUND_NOT_CONTINUOUS, ret);
archive_context.reset_locate_info();
log_ts = 1199 * ONE_SECOND;
CLOG_LOG(INFO, "print get piece 4", K(lsn));
scn.convert_for_logservice(log_ts);
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_SUCCESS, ret);
EXPECT_EQ(round_id, 5);
EXPECT_EQ(piece_id, 22);
EXPECT_EQ(cur_file_id, 123);
EXPECT_EQ(offset, 0);
int64_t fake_offset = 102400;
palf::LSN fake_lsn(102400);
ret = archive_context.update_file_info(dest_id, round_id, piece_id, cur_file_id, fake_offset, fake_lsn);
EXPECT_EQ(OB_SUCCESS, ret);
lsn = lsn + 102400;
CLOG_LOG(INFO, "print get piece 6", K(lsn));
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(round_id, 5);
EXPECT_EQ(piece_id, 22);
EXPECT_EQ(cur_file_id, 123);
EXPECT_EQ(offset, fake_offset);
EXPECT_EQ(max_lsn, fake_lsn);
lsn = LSN(151 * 64 * ONE_MB + ONE_MB);
CLOG_LOG(INFO, "print get piece 7", K(lsn));
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_ITER_END, ret);
// bad case, current piece hang, can not backward piece
lsn = LSN(10 * 64 * ONE_MB);
CLOG_LOG(INFO, "print get piece 8", K(lsn));
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, ret);
archive_context.reset_locate_info();
log_ts = 1010;
scn.convert_for_logservice(log_ts);
CLOG_LOG(INFO, "print get piece 9", K(lsn));
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
EXPECT_EQ(OB_SUCCESS, ret);
TEST(FakeArchivePieceContext, get_piece)
{
FakeWorker worker;
worker.start();
while (! consume_finish) {
sleep(1);
}
worker.stop();
worker.wait();
}
}
}

View File

@ -73,7 +73,7 @@ int ObLSWorker::init(const int64_t worker_thread_num,
LOG_ERROR("init timer fail", KR(ret), K(max_timer_task_count));
}
// Initializing the thread pool
else if (OB_FAIL(StreamWorkerThread::init(worker_thread_num,
else if (OB_FAIL(ObMapQueueThreadPool::init(OB_SERVER_TENANT_ID, worker_thread_num,
ObModIds::OB_LS_WORKER_THREAD))) {
LOG_ERROR("init worker thread fail", KR(ret), K(worker_thread_num));
} else {
@ -103,7 +103,7 @@ void ObLSWorker::destroy()
inited_ = false;
stream_paused_ = false;
fetcher_resume_time_ = OB_INVALID_TIMESTAMP;
StreamWorkerThread::destroy();
ObMapQueueThreadPool::destroy();
timer_.destroy();
fetcher_host_ = nullptr;
idle_pool_ = NULL;
@ -123,7 +123,7 @@ int ObLSWorker::start()
ret = OB_NOT_INIT;
} else if (OB_FAIL(timer_.start())) {
LOG_ERROR("start timer thread fail", KR(ret));
} else if (OB_FAIL(StreamWorkerThread::start())) {
} else if (OB_FAIL(ObMapQueueThreadPool::start())) {
LOG_ERROR("start stream worker fail", KR(ret));
} else {
LOG_INFO("start stream worker succ");
@ -137,7 +137,7 @@ void ObLSWorker::stop()
LOG_INFO("stop stream worker begin");
mark_stop_flag();
timer_.stop();
StreamWorkerThread::stop();
ObMapQueueThreadPool::stop();
LOG_INFO("stop stream worker succ");
}
}
@ -146,7 +146,7 @@ void ObLSWorker::mark_stop_flag()
{
LOG_INFO("stream worker mark_stop_flag begin");
timer_.mark_stop_flag();
StreamWorkerThread::mark_stop_flag();
ObMapQueueThreadPool::has_set_stop() = true;
LOG_INFO("stream worker mark_stop_flag end");
}
@ -289,7 +289,7 @@ int ObLSWorker::dispatch_stream_task(FetchStream &task, const char *from_mod)
}
// Rotating the task of fetching log streams to work threads
if (OB_FAIL(StreamWorkerThread::push(&task, hash_val))) {
if (OB_FAIL(ObMapQueueThreadPool::push(&task, hash_val))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("push stream task into thread queue fail", KR(ret));
}
@ -320,8 +320,7 @@ int ObLSWorker::hibernate_stream_task(FetchStream &task, const char *from_mod)
}
// hendle function for thread pool
int ObLSWorker::handle(void *data,
const int64_t thread_index,
void ObLSWorker::handle(void *data,
volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
@ -329,11 +328,11 @@ int ObLSWorker::handle(void *data,
FetchStream *task = static_cast<FetchStream *>(data);
ObLogTraceIdGuard trace_guard;
LOG_DEBUG("[STAT] [STREAM_WORKER] [HANDLE_STREAM_TASK]", K_(stream_paused), K(thread_index),
LOG_DEBUG("[STAT] [STREAM_WORKER] [HANDLE_STREAM_TASK]", K_(stream_paused), "thread_index", get_thread_idx(),
K(task), KPC(task));
if (OB_ISNULL(task)) {
LOG_ERROR("invalid task", K(task), K(thread_index));
LOG_ERROR("invalid task", K(task), "thread_index", get_thread_idx());
ret = OB_INVALID_ARGUMENT;
}
// If the stream task is currently suspended, the task is put to sleep
@ -353,7 +352,7 @@ int ObLSWorker::handle(void *data,
// Can no longer continue with the task
}
if (0 == thread_index) {
if (0 == get_thread_idx()) {
if (REACH_TIME_INTERVAL(STAT_INTERVAL)) {
print_stat_();
}
@ -361,9 +360,9 @@ int ObLSWorker::handle(void *data,
if (OB_SUCCESS != ret && OB_IN_STOP_STATE != ret && OB_NOT_NULL(err_handler_)) {
err_handler_->handle_error(ret, "stream worker exits on error, err=%d, thread_index=%ld",
ret, thread_index);
ret, get_thread_idx());
ObMapQueueThreadPool::has_set_stop() = true; // mark thread pool stop;
}
return ret;
}
void ObLSWorker::configure(const ObLogConfig & config)

View File

@ -20,12 +20,13 @@
#include "lib/lock/ob_spin_lock.h" // ObSpinLock
#include "ob_log_config.h" // ObLogConfig
#include "ob_map_queue_thread.h" // ObMapQueueThread
#include "lib/thread/ob_map_queue_thread_pool.h"// ObMapQueueThreadPool
#include "ob_log_timer.h" // ObLogFixedTimer
#include "ob_log_ls_fetch_stream.h" // FetchStream
namespace oceanbase
{
using namespace common;
namespace libobcdc
{
@ -62,9 +63,7 @@ class IObLogFetcherDeadPool;
class IObLogSvrFinder;
class IObLogErrHandler;
typedef common::ObMapQueueThread<IObLSWorker::MAX_THREAD_NUM> StreamWorkerThread;
class ObLSWorker : public IObLSWorker, public StreamWorkerThread
class ObLSWorker : public IObLSWorker, public ObMapQueueThreadPool
{
static const int64_t STAT_INTERVAL = 5 * _SEC_;
@ -101,7 +100,8 @@ public:
public:
// Overloading thread handling functions
virtual int handle(void *data, const int64_t thread_index, volatile bool &stop_flag);
// virtual int handle(void *data, const int64_t thread_index, volatile bool &stop_flag);
virtual void handle(void *data, volatile bool &stop_flag) override;
public:
static void configure(const ObLogConfig & config);

View File

@ -497,9 +497,7 @@ int ObLogArchivePieceContext::get_piece_(const SCN &scn,
{
int ret = OB_SUCCESS;
bool done = false;
int64_t times = 0;
const int64_t MAX_RETRY_TIMES = 100;
while (OB_SUCC(ret) && ! done && times < MAX_RETRY_TIMES) {
while (OB_SUCC(ret) && ! done) {
if (OB_FAIL(switch_round_if_need_(scn, lsn))) {
CLOG_LOG(WARN, "switch round if need failed", K(ret), KPC(this));
} else if (OB_FAIL(switch_piece_if_need_(file_id, scn, lsn))) {
@ -517,12 +515,11 @@ int ObLogArchivePieceContext::get_piece_(const SCN &scn,
CLOG_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "get piece cost too much time", K(scn), K(lsn), KPC(this));
}
times++;
}
if (OB_SUCC(ret) && ! done && times == MAX_RETRY_TIMES) {
ret = OB_ERR_TOO_MUCH_TIME;
CLOG_LOG(ERROR, "retry too much times", K(times), KPC(this));
// threads consume archive may increase or decrease, if threads stop, just retry
if (! done && OB_SUCC(ret) && OB_NOT_NULL(&lib::Thread::current()) ? lib::Thread::current().has_set_stop() : false) {
ret = OB_EAGAIN;
CLOG_LOG(INFO, "thread stop, try again", K(id_), K(scn), K(lsn));
}
}
return ret;

View File

@ -267,6 +267,13 @@ int ObRemoteLogIterator<LogEntryType>::next_entry_(LogEntryType &entry, LSN &lsn
}
}
}
// threads consume archive may increase or decrease, if threads stop, just retry
if (! done && OB_SUCC(ret) && OB_NOT_NULL(&lib::Thread::current()) ? lib::Thread::current().has_set_stop() : false) {
ret = OB_EAGAIN;
CLOG_LOG(INFO, "thread stop, try again", K(id_), K(lsn));
}
} while (OB_SUCCESS == ret && ! done);
if (OB_NEED_RETRY == ret) {