diff --git a/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp b/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp index 90c42c9f7d..9bb8caf2d5 100644 --- a/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp +++ b/src/logservice/logfetcher/ob_log_fetch_log_rpc.cpp @@ -734,6 +734,12 @@ int FetchLogARpc::handle_rpc_response(RpcRequest &rpc_req, int tmp_ret = OB_SUCCESS; rpc_req.mark_flying_state(false); if (need_dispatch_stream_task || IDLE == state_) { + // if need_dispatch_stream_task is true, generate_rpc_result_ must succeed, then state_ should be READY + // which means there is a comsumable result. In this scenario, launch_async_rpc_ may fail. + // So FetchStream only switch state to IDLE iff IDLE == state_ + if (IDLE == state_) { + host_.switch_state(FetchStream::State::IDLE); + } if (OB_TMP_FAIL(stream_worker_->dispatch_stream_task(host_, "FailPostProcess"))) { LOG_ERROR_RET(tmp_ret, "dispatch stream task fail", KR(ret)); } diff --git a/src/logservice/logfetcher/ob_ls_worker.cpp b/src/logservice/logfetcher/ob_ls_worker.cpp index e54a36cda0..3f87da20d2 100644 --- a/src/logservice/logfetcher/ob_ls_worker.cpp +++ b/src/logservice/logfetcher/ob_ls_worker.cpp @@ -353,6 +353,15 @@ void ObLSWorker::handle(void *data, volatile bool &stop_flag) LOG_INFO("handle fetch stream task failed, need to reschedule", KR(ret), K(task)); int tmp_ret = OB_SUCCESS; + // Switch the state of FetchStream to IDLE unconditionally, which means rpc request and rpc results would + // be discarded. + // 1. discard_request and handle_rpc_response are mutually exclusive. + // 2. if discard_request executes before handle_rpc_response, then handle_rpc_response would abort. + // 3. if handle_rpc_response executes discard_request, even if handle_rpc_response fails, it wouldn't + // retry in handle_rpc_response, because LSWorker fails here and the state of FetchLogArpc couldn't be IDLE. + // 4. the retry mechanism relys on that if LSWorker consume all the result and need to exit, the exit process + // couldn't fail, otherwise the same FetchStream would be scheduled multiple times. + task->switch_state(FetchStream::State::IDLE); if (OB_TMP_FAIL(hibernate_stream_task(*task, "HandleTaskErr"))) { LOG_ERROR_RET(tmp_ret, "hibernate_stream_task on handle task failure", K(task), KPC(task)); }