[LogFetcher] Fix the bug that fetchStream would never be scheduled if fallback retry process failed
This commit is contained in:
parent
1f253e6fd0
commit
e2bd3c55e5
@ -740,7 +740,7 @@ int FetchLogARpc::handle_rpc_response(RpcRequest &rpc_req,
|
||||
if (IDLE == state_) {
|
||||
host_.switch_state(FetchStream::State::IDLE);
|
||||
}
|
||||
if (OB_TMP_FAIL(stream_worker_->dispatch_stream_task(host_, "FailPostProcess"))) {
|
||||
if (OB_TMP_FAIL(stream_worker_->dispatch_stream_task(host_, "FailPostProcess", true))) {
|
||||
LOG_ERROR_RET(tmp_ret, "dispatch stream task fail", KR(ret));
|
||||
}
|
||||
}
|
||||
|
@ -157,6 +157,7 @@ void ObLogFetcherIdlePool::mark_stop_flag()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: consider retry on failure
|
||||
void ObLogFetcherIdlePool::handle(void *data, volatile bool &stop_flag)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -246,9 +246,23 @@ int FetchStream::handle(volatile bool &stop_flag)
|
||||
return ret;
|
||||
}
|
||||
|
||||
#ifdef ERRSIM
|
||||
ERRSIM_POINT_DEF(FAILED_TO_SCHEDULE_FETCH_STREAM);
|
||||
#endif
|
||||
|
||||
int FetchStream::schedule(int timer_id)
|
||||
{
|
||||
return TG_SCHEDULE(timer_id, *this, g_schedule_time, false);
|
||||
int ret = OB_SUCCESS;
|
||||
#ifdef ERRSIM
|
||||
if (OB_FAIL(FAILED_TO_SCHEDULE_FETCH_STREAM)) {
|
||||
LOG_ERROR("ERRSIM: failed to schedule fetch stream");
|
||||
} else {
|
||||
#endif
|
||||
ret = TG_SCHEDULE(timer_id, *this, g_schedule_time, false);
|
||||
#ifdef ERRSIM
|
||||
}
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
||||
// The purpose of a timed task is to assign itself to a worker thread
|
||||
@ -264,7 +278,8 @@ void FetchStream::runTimerTask()
|
||||
if (OB_ISNULL(stream_worker_)) {
|
||||
LOG_ERROR("invalid stream worker", K(stream_worker_));
|
||||
ret = OB_INVALID_ERROR;
|
||||
} else if (OB_FAIL(stream_worker_->dispatch_stream_task(*this, "TimerWakeUp"))) {
|
||||
// should never fail
|
||||
} else if (OB_FAIL(stream_worker_->dispatch_stream_task(*this, "TimerWakeUp", true))) {
|
||||
LOG_ERROR("dispatch stream task fail", KR(ret), K(this));
|
||||
} else {
|
||||
ATOMIC_STORE(&end_time, get_timestamp());
|
||||
|
@ -271,9 +271,10 @@ int ObLSWorker::dispatch_fetch_task(LSFetchCtx &task, const char *dispatch_reaso
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSWorker::dispatch_stream_task(FetchStream &task, const char *from_mod)
|
||||
int ObLSWorker::dispatch_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t RETRY_INTERVAL = 100L * 1000; // 100ms
|
||||
|
||||
if (OB_UNLIKELY(! inited_)) {
|
||||
LOG_ERROR("not init", K(inited_));
|
||||
@ -291,16 +292,22 @@ int ObLSWorker::dispatch_stream_task(FetchStream &task, const char *from_mod)
|
||||
}
|
||||
|
||||
// Rotating the task of fetching log streams to work threads
|
||||
if (OB_FAIL(TG_PUSH_TASK(tg_id_, &task, hash_val))) {
|
||||
LOG_ERROR("push stream task into thread queue fail", KR(ret));
|
||||
}
|
||||
do {
|
||||
if (OB_FAIL(TG_PUSH_TASK(tg_id_, &task, hash_val))) {
|
||||
LOG_ERROR("push stream task into thread queue fail", KR(ret));
|
||||
if (retry_until_succ) {
|
||||
ob_usleep(RETRY_INTERVAL);
|
||||
}
|
||||
}
|
||||
} while (OB_FAIL(ret) && retry_until_succ);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSWorker::hibernate_stream_task(FetchStream &task, const char *from_mod)
|
||||
int ObLSWorker::hibernate_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t RETRY_INTERVAL = 100L * 1000; // 100ms
|
||||
bool print_stream_dispatch_info = ATOMIC_LOAD(&g_print_stream_dispatch_info);
|
||||
|
||||
if (print_stream_dispatch_info) {
|
||||
@ -310,12 +317,16 @@ int ObLSWorker::hibernate_stream_task(FetchStream &task, const char *from_mod)
|
||||
LOG_TRACE("[STAT] [STREAM_WORKER] [HIBERNATE_STREAM_TASK]",
|
||||
"task", &task, K(from_mod), K(task));
|
||||
}
|
||||
|
||||
if (OB_FAIL(task.schedule(timer_id_))) {
|
||||
LOG_ERROR("schedule timer task fail", KR(ret));
|
||||
} else {
|
||||
// success
|
||||
}
|
||||
do {
|
||||
if (OB_FAIL(task.schedule(timer_id_))) {
|
||||
LOG_ERROR("schedule timer task fail", KR(ret));
|
||||
if (retry_until_succ) {
|
||||
ob_usleep(RETRY_INTERVAL);
|
||||
}
|
||||
} else {
|
||||
// success
|
||||
}
|
||||
} while (OB_FAIL(ret) && retry_until_succ);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -341,7 +352,7 @@ void ObLSWorker::handle(void *data, volatile bool &stop_flag)
|
||||
else if (OB_UNLIKELY(is_paused) && ! (task->is_sys_log_stream() || task->is_rpc_ready())) {
|
||||
LOG_TRACE("[STAT] [STREAM_WORKER] [HIBERNATE_STREAM_TASK_ON_PAUSE]", K(task));
|
||||
|
||||
if (OB_FAIL(hibernate_stream_task(*task, "PausedFetcher"))) {
|
||||
if (OB_FAIL(hibernate_stream_task(*task, "PausedFetcher", true))) {
|
||||
LOG_ERROR("hibernate_stream_task on pause fail", KR(ret), K(task), KPC(task));
|
||||
}
|
||||
} else if (OB_FAIL(task->handle(stop_flag))) {
|
||||
@ -362,7 +373,7 @@ void ObLSWorker::handle(void *data, volatile bool &stop_flag)
|
||||
// 4. the retry mechanism relys on that if LSWorker consume all the result and need to exit, the exit process
|
||||
// couldn't fail, otherwise the same FetchStream would be scheduled multiple times.
|
||||
task->switch_state(FetchStream::State::IDLE);
|
||||
if (OB_TMP_FAIL(hibernate_stream_task(*task, "HandleTaskErr"))) {
|
||||
if (OB_TMP_FAIL(hibernate_stream_task(*task, "HandleTaskErr", true))) {
|
||||
LOG_ERROR_RET(tmp_ret, "hibernate_stream_task on handle task failure", K(task), KPC(task));
|
||||
}
|
||||
} else {
|
||||
|
@ -48,10 +48,10 @@ public:
|
||||
virtual int dispatch_fetch_task(LSFetchCtx &task, const char *dispatch_reason) = 0;
|
||||
|
||||
// Putting the fetch log stream task into the work thread task pool
|
||||
virtual int dispatch_stream_task(FetchStream &task, const char *from_mod) = 0;
|
||||
virtual int dispatch_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ = false) = 0;
|
||||
|
||||
// Hibernate fetch log stream task
|
||||
virtual int hibernate_stream_task(FetchStream &task, const char *from_mod) = 0;
|
||||
virtual int hibernate_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ = false) = 0;
|
||||
};
|
||||
|
||||
//////////////////////////////////////////// ObLSWorker ////////////////////////////////////////////
|
||||
@ -93,8 +93,8 @@ public:
|
||||
int64_t get_fetcher_resume_tstamp();
|
||||
|
||||
int dispatch_fetch_task(LSFetchCtx &task, const char *dispatch_reason);
|
||||
int dispatch_stream_task(FetchStream &task, const char *from_mod);
|
||||
int hibernate_stream_task(FetchStream &task, const char *from_mod);
|
||||
int dispatch_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ = false);
|
||||
int hibernate_stream_task(FetchStream &task, const char *from_mod, const bool retry_until_succ = false);
|
||||
|
||||
public:
|
||||
virtual void handle(void *data) {}
|
||||
|
Loading…
x
Reference in New Issue
Block a user