Fix archive livelock

This commit is contained in:
obdev
2023-02-24 14:08:07 +00:00
committed by ob-robot
parent 5babc03224
commit b6a4cddede
4 changed files with 68 additions and 31 deletions

View File

@ -299,6 +299,10 @@ void ObArchiveFetcher::do_thread_task_()
ARCHIVE_LOG(WARN, "handle single task failed", K(ret));
}
}
if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) {
ARCHIVE_LOG(INFO, "ObArchiveFetcher is running", "thread_index", get_thread_idx());
}
}
int ObArchiveFetcher::handle_single_task_()

View File

@ -250,36 +250,7 @@ void ObArchiveSender::do_thread_task_()
{
// try consume task
{
int ret = OB_SUCCESS;
ObArchiveSendTask *task = NULL;
bool task_exist = false;
TaskConsumeStatus consume_status = TaskConsumeStatus::INVALID;
// As task issued flag is marked, no matter task is handled succ or fail
// the flag should be dealed.
if (OB_FAIL(get_send_task_(task, task_exist))) {
ARCHIVE_LOG(WARN, "get send task failed", K(ret));
} else if (! task_exist) {
} else if (FALSE_IT(handle(*task, consume_status))) {
} else {
switch (consume_status) {
case TaskConsumeStatus::DONE:
break;
case TaskConsumeStatus::STALE_TASK:
task->mark_stale();
break;
case TaskConsumeStatus::NEED_RETRY:
if (! task->retire_task_with_retry()) {
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(ERROR, "retire task with retry failed", K(ret), KPC(task));
}
break;
default:
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(ERROR, "handle send_task status unexpected", K(consume_status), KPC(task));
task->mark_stale();
break;
}
}
(void)try_consume_send_task_();
}
// try free send task
@ -289,6 +260,55 @@ void ObArchiveSender::do_thread_task_()
ARCHIVE_LOG(WARN, "try free send task failed", K(ret));
}
}
if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) {
ARCHIVE_LOG(INFO, "ObArchiveSender is running", "thread_index", get_thread_idx());
}
}
int ObArchiveSender::try_consume_send_task_()
{
int ret = OB_SUCCESS;
const int64_t counts = std::max(1L, task_queue_.size());
for (int64_t i = 0; OB_SUCC(ret) && i < counts; i++) {
ret = do_consume_send_task_();
}
return ret;
}
int ObArchiveSender::do_consume_send_task_()
{
int ret = OB_SUCCESS;
ObArchiveSendTask *task = NULL;
bool task_exist = false;
TaskConsumeStatus consume_status = TaskConsumeStatus::INVALID;
// As task issued flag is marked, no matter task is handled succ or fail
// the flag should be dealed.
if (OB_FAIL(get_send_task_(task, task_exist))) {
ARCHIVE_LOG(WARN, "get send task failed", K(ret));
} else if (! task_exist) {
} else if (FALSE_IT(handle(*task, consume_status))) {
} else {
switch (consume_status) {
case TaskConsumeStatus::DONE:
break;
case TaskConsumeStatus::STALE_TASK:
task->mark_stale();
break;
case TaskConsumeStatus::NEED_RETRY:
if (! task->retire_task_with_retry()) {
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(ERROR, "retire task with retry failed", K(ret), KPC(task));
}
break;
default:
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(ERROR, "handle send_task status unexpected", K(consume_status), KPC(task));
task->mark_stale();
break;
}
}
return ret;
}
// only get task pointer, while task is still in task_status
@ -339,7 +359,7 @@ int ObArchiveSender::get_send_task_(ObArchiveSendTask *&task, bool &exist)
int ObArchiveSender::try_free_send_task_()
{
int ret = OB_SUCCESS;
const int64_t counts = std::max(1L, task_queue_.size()) + get_thread_count();
const int64_t counts = std::max(1L, task_queue_.size());
if (0 != get_thread_idx()) {
// only 0 thread affirm and free send task
} else {
@ -465,6 +485,11 @@ void ObArchiveSender::handle(ObArchiveSendTask &task, TaskConsumeStatus &consume
handle_archive_ret_code_(id, station.get_round(), ret);
// if encounter fail, sleep 100ms
if (OB_FAIL(ret)) {
ob_usleep(100 * 1000L);
}
DEBUG_SYNC(ARCHIVE_SENDER_HANDLE_TASK_DONE);
}

View File

@ -92,6 +92,10 @@ private:
void run1();
void do_thread_task_();
int try_consume_send_task_();
int do_consume_send_task_();
// 消费task status, 为日志流级别send_task队列, 目前为单线程消费单个日志流
int get_send_task_(ObArchiveSendTask *&task, bool &exist);

View File

@ -235,6 +235,10 @@ void ObRemoteFetchWorker::do_thread_task_()
LOG_WARN("try_consume_data_ failed", K(ret));
}
}
if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) {
LOG_INFO("ObRemoteFetchWorker is running", "thread_index", get_thread_idx());
}
}
int ObRemoteFetchWorker::handle_single_task_()