From e2bd3c55e57aacddc6a0524b6b0226ee3f4ec25a Mon Sep 17 00:00:00 2001 From: zxlzxlzxlzxlzxl Date: Tue, 19 Mar 2024 02:15:42 +0000 Subject: [PATCH] [LogFetcher] Fix the bug that fetchStream would never be scheduled if fallback retry process failed --- .../logfetcher/ob_log_fetch_log_rpc.cpp | 2 +- .../logfetcher/ob_log_fetcher_idle_pool.cpp | 1 + .../logfetcher/ob_log_ls_fetch_stream.cpp | 19 +++++++++- src/logservice/logfetcher/ob_ls_worker.cpp | 37 ++++++++++++------- src/logservice/logfetcher/ob_ls_worker.h | 8 ++-- 5 files changed, 47 insertions(+), 20 deletions(-) diff --git a/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp b/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp index f9f98b805..ac8caafaf 100644 --- a/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp +++ b/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp @@ -740,7 +740,7 @@ int FetchLogARpc::handle_rpc_response(RpcRequest &rpc_req, if (IDLE == state_) { host_.switch_state(FetchStream::State::IDLE); } - if (OB_TMP_FAIL(stream_worker_->dispatch_stream_task(host_, "FailPostProcess"))) { + if (OB_TMP_FAIL(stream_worker_->dispatch_stream_task(host_, "FailPostProcess", true))) { LOG_ERROR_RET(tmp_ret, "dispatch stream task fail", KR(ret)); } } diff --git a/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp b/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp index 7f3e35eb1..b27ab0b5d 100644 --- a/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp @@ -157,6 +157,7 @@ void ObLogFetcherIdlePool::mark_stop_flag() } } +// TODO: consider retry on failure void ObLogFetcherIdlePool::handle(void *data, volatile bool &stop_flag) { int ret = OB_SUCCESS; diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp index f6281793f..9d9a0ae25 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp @@ -246,9 +246,23 @@ int FetchStream::handle(volatile bool &stop_flag) return ret; } +#ifdef ERRSIM +ERRSIM_POINT_DEF(FAILED_TO_SCHEDULE_FETCH_STREAM); +#endif + int FetchStream::schedule(int timer_id) { - return TG_SCHEDULE(timer_id, *this, g_schedule_time, false); + int ret = OB_SUCCESS; +#ifdef ERRSIM + if (OB_FAIL(FAILED_TO_SCHEDULE_FETCH_STREAM)) { + LOG_ERROR("ERRSIM: failed to schedule fetch stream"); + } else { +#endif + ret = TG_SCHEDULE(timer_id, *this, g_schedule_time, false); +#ifdef ERRSIM + } +#endif + return ret; } // The purpose of a timed task is to assign itself to a worker thread @@ -264,7 +278,8 @@ void FetchStream::runTimerTask() if (OB_ISNULL(stream_worker_)) { LOG_ERROR("invalid stream worker", K(stream_worker_)); ret = OB_INVALID_ERROR; - } else if (OB_FAIL(stream_worker_->dispatch_stream_task(*this, "TimerWakeUp"))) { + // should never fail + } else if (OB_FAIL(stream_worker_->dispatch_stream_task(*this, "TimerWakeUp", true))) { LOG_ERROR("dispatch stream task fail", KR(ret), K(this)); } else { ATOMIC_STORE(&end_time, get_timestamp()); diff --git a/src/logservice/logfetcher/ob_ls_worker.cpp b/src/logservice/logfetcher/ob_ls_worker.cpp index 3f87da20d..0fe83a5f2 100644 --- a/src/logservice/logfetcher/ob_ls_worker.cpp +++ b/src/logservice/logfetcher/ob_ls_worker.cpp @@ -271,9 +271,10 @@ int ObLSWorker::dispatch_fetch_task(LSFetchCtx &task, const char *dispatch_reaso return ret; } -int ObLSWorker::dispatch_stream_task(FetchStream &task, const char *from_mod) +int ObLSWorker::dispatch_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ) { int ret = OB_SUCCESS; + const int64_t RETRY_INTERVAL = 100L * 1000; // 100ms if (OB_UNLIKELY(! inited_)) { LOG_ERROR("not init", K(inited_)); @@ -291,16 +292,22 @@ int ObLSWorker::dispatch_stream_task(FetchStream &task, const char *from_mod) } // Rotating the task of fetching log streams to work threads - if (OB_FAIL(TG_PUSH_TASK(tg_id_, &task, hash_val))) { - LOG_ERROR("push stream task into thread queue fail", KR(ret)); - } + do { + if (OB_FAIL(TG_PUSH_TASK(tg_id_, &task, hash_val))) { + LOG_ERROR("push stream task into thread queue fail", KR(ret)); + if (retry_until_succ) { + ob_usleep(RETRY_INTERVAL); + } + } + } while (OB_FAIL(ret) && retry_until_succ); } return ret; } -int ObLSWorker::hibernate_stream_task(FetchStream &task, const char *from_mod) +int ObLSWorker::hibernate_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ) { int ret = OB_SUCCESS; + const int64_t RETRY_INTERVAL = 100L * 1000; // 100ms bool print_stream_dispatch_info = ATOMIC_LOAD(&g_print_stream_dispatch_info); if (print_stream_dispatch_info) { @@ -310,12 +317,16 @@ int ObLSWorker::hibernate_stream_task(FetchStream &task, const char *from_mod) LOG_TRACE("[STAT] [STREAM_WORKER] [HIBERNATE_STREAM_TASK]", "task", &task, K(from_mod), K(task)); } - - if (OB_FAIL(task.schedule(timer_id_))) { - LOG_ERROR("schedule timer task fail", KR(ret)); - } else { - // success - } + do { + if (OB_FAIL(task.schedule(timer_id_))) { + LOG_ERROR("schedule timer task fail", KR(ret)); + if (retry_until_succ) { + ob_usleep(RETRY_INTERVAL); + } + } else { + // success + } + } while (OB_FAIL(ret) && retry_until_succ); return ret; } @@ -341,7 +352,7 @@ void ObLSWorker::handle(void *data, volatile bool &stop_flag) else if (OB_UNLIKELY(is_paused) && ! (task->is_sys_log_stream() || task->is_rpc_ready())) { LOG_TRACE("[STAT] [STREAM_WORKER] [HIBERNATE_STREAM_TASK_ON_PAUSE]", K(task)); - if (OB_FAIL(hibernate_stream_task(*task, "PausedFetcher"))) { + if (OB_FAIL(hibernate_stream_task(*task, "PausedFetcher", true))) { LOG_ERROR("hibernate_stream_task on pause fail", KR(ret), K(task), KPC(task)); } } else if (OB_FAIL(task->handle(stop_flag))) { @@ -362,7 +373,7 @@ void ObLSWorker::handle(void *data, volatile bool &stop_flag) // 4. the retry mechanism relys on that if LSWorker consume all the result and need to exit, the exit process // couldn't fail, otherwise the same FetchStream would be scheduled multiple times. task->switch_state(FetchStream::State::IDLE); - if (OB_TMP_FAIL(hibernate_stream_task(*task, "HandleTaskErr"))) { + if (OB_TMP_FAIL(hibernate_stream_task(*task, "HandleTaskErr", true))) { LOG_ERROR_RET(tmp_ret, "hibernate_stream_task on handle task failure", K(task), KPC(task)); } } else { diff --git a/src/logservice/logfetcher/ob_ls_worker.h b/src/logservice/logfetcher/ob_ls_worker.h index 495e65d0c..9243744b8 100644 --- a/src/logservice/logfetcher/ob_ls_worker.h +++ b/src/logservice/logfetcher/ob_ls_worker.h @@ -48,10 +48,10 @@ public: virtual int dispatch_fetch_task(LSFetchCtx &task, const char *dispatch_reason) = 0; // Putting the fetch log stream task into the work thread task pool - virtual int dispatch_stream_task(FetchStream &task, const char *from_mod) = 0; + virtual int dispatch_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ = false) = 0; // Hibernate fetch log stream task - virtual int hibernate_stream_task(FetchStream &task, const char *from_mod) = 0; + virtual int hibernate_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ = false) = 0; }; //////////////////////////////////////////// ObLSWorker //////////////////////////////////////////// @@ -93,8 +93,8 @@ public: int64_t get_fetcher_resume_tstamp(); int dispatch_fetch_task(LSFetchCtx &task, const char *dispatch_reason); - int dispatch_stream_task(FetchStream &task, const char *from_mod); - int hibernate_stream_task(FetchStream &task, const char *from_mod); + int dispatch_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ = false); + int hibernate_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ = false); public: virtual void handle(void *data) {}