diff --git a/src/logservice/archiveservice/ob_archive_fetcher.cpp b/src/logservice/archiveservice/ob_archive_fetcher.cpp index 921a947f9b..405d418fb5 100644 --- a/src/logservice/archiveservice/ob_archive_fetcher.cpp +++ b/src/logservice/archiveservice/ob_archive_fetcher.cpp @@ -299,6 +299,10 @@ void ObArchiveFetcher::do_thread_task_() ARCHIVE_LOG(WARN, "handle single task failed", K(ret)); } } + + if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) { + ARCHIVE_LOG(INFO, "ObArchiveFetcher is running", "thread_index", get_thread_idx()); + } } int ObArchiveFetcher::handle_single_task_() diff --git a/src/logservice/archiveservice/ob_archive_sender.cpp b/src/logservice/archiveservice/ob_archive_sender.cpp index c16aa71abf..30d346856d 100644 --- a/src/logservice/archiveservice/ob_archive_sender.cpp +++ b/src/logservice/archiveservice/ob_archive_sender.cpp @@ -250,36 +250,7 @@ void ObArchiveSender::do_thread_task_() { // try consume task { - int ret = OB_SUCCESS; - ObArchiveSendTask *task = NULL; - bool task_exist = false; - TaskConsumeStatus consume_status = TaskConsumeStatus::INVALID; - // As task issued flag is marked, no matter task is handled succ or fail - // the flag should be dealed. - if (OB_FAIL(get_send_task_(task, task_exist))) { - ARCHIVE_LOG(WARN, "get send task failed", K(ret)); - } else if (! task_exist) { - } else if (FALSE_IT(handle(*task, consume_status))) { - } else { - switch (consume_status) { - case TaskConsumeStatus::DONE: - break; - case TaskConsumeStatus::STALE_TASK: - task->mark_stale(); - break; - case TaskConsumeStatus::NEED_RETRY: - if (! task->retire_task_with_retry()) { - ret = OB_ERR_UNEXPECTED; - ARCHIVE_LOG(ERROR, "retire task with retry failed", K(ret), KPC(task)); - } - break; - default: - ret = OB_ERR_UNEXPECTED; - ARCHIVE_LOG(ERROR, "handle send_task status unexpected", K(consume_status), KPC(task)); - task->mark_stale(); - break; - } - } + (void)try_consume_send_task_(); } // try free send task @@ -289,6 +260,55 @@ void ObArchiveSender::do_thread_task_() ARCHIVE_LOG(WARN, "try free send task failed", K(ret)); } } + + if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) { + ARCHIVE_LOG(INFO, "ObArchiveSender is running", "thread_index", get_thread_idx()); + } +} + +int ObArchiveSender::try_consume_send_task_() +{ + int ret = OB_SUCCESS; + const int64_t counts = std::max(1L, task_queue_.size()); + for (int64_t i = 0; OB_SUCC(ret) && i < counts; i++) { + ret = do_consume_send_task_(); + } + return ret; +} + +int ObArchiveSender::do_consume_send_task_() +{ + int ret = OB_SUCCESS; + ObArchiveSendTask *task = NULL; + bool task_exist = false; + TaskConsumeStatus consume_status = TaskConsumeStatus::INVALID; + // As task issued flag is marked, no matter task is handled succ or fail + // the flag should be dealed. + if (OB_FAIL(get_send_task_(task, task_exist))) { + ARCHIVE_LOG(WARN, "get send task failed", K(ret)); + } else if (! task_exist) { + } else if (FALSE_IT(handle(*task, consume_status))) { + } else { + switch (consume_status) { + case TaskConsumeStatus::DONE: + break; + case TaskConsumeStatus::STALE_TASK: + task->mark_stale(); + break; + case TaskConsumeStatus::NEED_RETRY: + if (! task->retire_task_with_retry()) { + ret = OB_ERR_UNEXPECTED; + ARCHIVE_LOG(ERROR, "retire task with retry failed", K(ret), KPC(task)); + } + break; + default: + ret = OB_ERR_UNEXPECTED; + ARCHIVE_LOG(ERROR, "handle send_task status unexpected", K(consume_status), KPC(task)); + task->mark_stale(); + break; + } + } + return ret; } // only get task pointer, while task is still in task_status @@ -339,7 +359,7 @@ int ObArchiveSender::get_send_task_(ObArchiveSendTask *&task, bool &exist) int ObArchiveSender::try_free_send_task_() { int ret = OB_SUCCESS; - const int64_t counts = std::max(1L, task_queue_.size()) + get_thread_count(); + const int64_t counts = std::max(1L, task_queue_.size()); if (0 != get_thread_idx()) { // only 0 thread affirm and free send task } else { @@ -465,6 +485,11 @@ void ObArchiveSender::handle(ObArchiveSendTask &task, TaskConsumeStatus &consume handle_archive_ret_code_(id, station.get_round(), ret); + // if encounter fail, sleep 100ms + if (OB_FAIL(ret)) { + ob_usleep(100 * 1000L); + } + DEBUG_SYNC(ARCHIVE_SENDER_HANDLE_TASK_DONE); } diff --git a/src/logservice/archiveservice/ob_archive_sender.h b/src/logservice/archiveservice/ob_archive_sender.h index f30db0c3c3..ffee4f7282 100644 --- a/src/logservice/archiveservice/ob_archive_sender.h +++ b/src/logservice/archiveservice/ob_archive_sender.h @@ -92,6 +92,10 @@ private: void run1(); void do_thread_task_(); + int try_consume_send_task_(); + + int do_consume_send_task_(); + // 消费task status, 为日志流级别send_task队列, 目前为单线程消费单个日志流 int get_send_task_(ObArchiveSendTask *&task, bool &exist); diff --git a/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp b/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp index 378d8e9e2e..0f26550ac4 100644 --- a/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp +++ b/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp @@ -235,6 +235,10 @@ void ObRemoteFetchWorker::do_thread_task_() LOG_WARN("try_consume_data_ failed", K(ret)); } } + + if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) { + LOG_INFO("ObRemoteFetchWorker is running", "thread_index", get_thread_idx()); + } } int ObRemoteFetchWorker::handle_single_task_()