[LogFetcher] Fix Invalid State Transition of LogFetcher
This commit is contained in:
committed by
ob-robot
parent
fcc68dce0c
commit
b5dcad8e1d
@ -734,6 +734,12 @@ int FetchLogARpc::handle_rpc_response(RpcRequest &rpc_req,
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
rpc_req.mark_flying_state(false);
|
||||
if (need_dispatch_stream_task || IDLE == state_) {
|
||||
// if need_dispatch_stream_task is true, generate_rpc_result_ must succeed, then state_ should be READY
|
||||
// which means there is a comsumable result. In this scenario, launch_async_rpc_ may fail.
|
||||
// So FetchStream only switch state to IDLE iff IDLE == state_
|
||||
if (IDLE == state_) {
|
||||
host_.switch_state(FetchStream::State::IDLE);
|
||||
}
|
||||
if (OB_TMP_FAIL(stream_worker_->dispatch_stream_task(host_, "FailPostProcess"))) {
|
||||
LOG_ERROR_RET(tmp_ret, "dispatch stream task fail", KR(ret));
|
||||
}
|
||||
|
||||
@ -353,6 +353,15 @@ void ObLSWorker::handle(void *data, volatile bool &stop_flag)
|
||||
|
||||
LOG_INFO("handle fetch stream task failed, need to reschedule", KR(ret), K(task));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
// Switch the state of FetchStream to IDLE unconditionally, which means rpc request and rpc results would
|
||||
// be discarded.
|
||||
// 1. discard_request and handle_rpc_response are mutually exclusive.
|
||||
// 2. if discard_request executes before handle_rpc_response, then handle_rpc_response would abort.
|
||||
// 3. if handle_rpc_response executes discard_request, even if handle_rpc_response fails, it wouldn't
|
||||
// retry in handle_rpc_response, because LSWorker fails here and the state of FetchLogArpc couldn't be IDLE.
|
||||
// 4. the retry mechanism relys on that if LSWorker consume all the result and need to exit, the exit process
|
||||
// couldn't fail, otherwise the same FetchStream would be scheduled multiple times.
|
||||
task->switch_state(FetchStream::State::IDLE);
|
||||
if (OB_TMP_FAIL(hibernate_stream_task(*task, "HandleTaskErr"))) {
|
||||
LOG_ERROR_RET(tmp_ret, "hibernate_stream_task on handle task failure", K(task), KPC(task));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user