[Net Standby] Fix the Unit may be gc failed
This commit is contained in:
@ -141,6 +141,7 @@ int ObLogFetcher::init(
|
||||
log_fetcher_user,
|
||||
cfg.idle_pool_thread_num,
|
||||
cfg,
|
||||
static_cast<void *>(this),
|
||||
*err_handler,
|
||||
stream_worker_,
|
||||
start_lsn_locator_))) {
|
||||
|
@ -32,6 +32,8 @@ namespace logfetcher
|
||||
ObLogFetcherIdlePool::ObLogFetcherIdlePool() :
|
||||
inited_(false),
|
||||
tg_id_(-1),
|
||||
fetcher_host_(nullptr),
|
||||
log_fetcher_user_(LogFetcherUser::UNKNOWN),
|
||||
cfg_(nullptr),
|
||||
err_handler_(NULL),
|
||||
stream_worker_(NULL),
|
||||
@ -48,6 +50,7 @@ int ObLogFetcherIdlePool::init(
|
||||
const LogFetcherUser &log_fetcher_user,
|
||||
const int64_t thread_num,
|
||||
const ObLogFetcherConfig &cfg,
|
||||
void *fetcher_host,
|
||||
IObLogErrHandler &err_handler,
|
||||
IObLSWorker &stream_worker,
|
||||
IObLogStartLSNLocator &start_lsn_locator)
|
||||
@ -63,6 +66,7 @@ int ObLogFetcherIdlePool::init(
|
||||
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::LogFetcherIdlePool, tg_id_))) {
|
||||
LOG_ERROR("TG_CREATE_TENANT failed", KR(ret), K(thread_num));
|
||||
} else {
|
||||
fetcher_host_ = fetcher_host;
|
||||
log_fetcher_user_ = log_fetcher_user;
|
||||
cfg_ = &cfg;
|
||||
err_handler_ = &err_handler;
|
||||
@ -90,8 +94,10 @@ void ObLogFetcherIdlePool::destroy()
|
||||
err_handler_ = NULL;
|
||||
stream_worker_ = NULL;
|
||||
start_lsn_locator_ = NULL;
|
||||
fetcher_host_ = nullptr;
|
||||
log_fetcher_user_ = LogFetcherUser::UNKNOWN;
|
||||
|
||||
LOG_INFO("destroy fetcher idle pool succ");
|
||||
LOG_INFO("destroy fetcher idle pool success");
|
||||
}
|
||||
|
||||
int ObLogFetcherIdlePool::push(LSFetchCtx *task)
|
||||
|
@ -59,6 +59,7 @@ public:
|
||||
const LogFetcherUser &log_fetcher_user,
|
||||
const int64_t thread_num,
|
||||
const ObLogFetcherConfig &cfg,
|
||||
void *fetcher_host,
|
||||
IObLogErrHandler &err_handler,
|
||||
IObLSWorker &stream_worker,
|
||||
IObLogStartLSNLocator &start_lsn_locator);
|
||||
@ -86,6 +87,7 @@ private:
|
||||
private:
|
||||
bool inited_;
|
||||
int tg_id_;
|
||||
void *fetcher_host_;
|
||||
LogFetcherUser log_fetcher_user_;
|
||||
const ObLogFetcherConfig *cfg_;
|
||||
IObLogErrHandler *err_handler_;
|
||||
|
@ -851,7 +851,10 @@ int FetchStream::read_group_entry_(
|
||||
if (OB_NEED_RETRY == ret) {
|
||||
LOG_INFO("LogHander handle_group_entry failed, need_retry", KR(ret), K(tenant_id), K(ls_id), K(proposal_id),
|
||||
K(group_start_lsn), K(group_entry));
|
||||
} else if (OB_IN_STOP_STATE != ret) {
|
||||
} else if (OB_IN_STOP_STATE == ret) {
|
||||
LOG_INFO("LogHander handle_group_entry is stopped", KR(ret), K(tenant_id), K(ls_id), K(proposal_id),
|
||||
K(group_start_lsn), K(group_entry));
|
||||
} else {
|
||||
LOG_ERROR("LogHander handle_group_entry failed", KR(ret), K(tenant_id), K(ls_id), K(proposal_id),
|
||||
K(group_start_lsn), K(group_entry));
|
||||
if (OB_NOT_NULL(ls_fetch_ctx_)) {
|
||||
@ -1148,12 +1151,16 @@ int FetchStream::handle_fetch_log_result_(
|
||||
is_stream_valid = false;
|
||||
stream_invalid_reason = "LogNotSync";
|
||||
ret = OB_SUCCESS;
|
||||
} else if (OB_NEED_RETRY == ret) {
|
||||
} else if ((OB_NEED_RETRY == ret) || (OB_IN_STOP_STATE == ret)) {
|
||||
// 1. OB_NEED_RETRY: handle_group_entry may return
|
||||
// 2. OB_IN_STOP_STATE: handle_group_entry may return
|
||||
// ...
|
||||
is_stream_valid = false;
|
||||
stream_invalid_reason = "NeedRetry";
|
||||
|
||||
if (OB_UNLIKELY(ls_fetch_ctx_->is_discarded())) {
|
||||
kickout_info.kick_out_reason_ = DISCARDED;
|
||||
LOG_INFO("[STAT] [FETCH_STREAM] [RECYCLE_FETCH_TASK]", KPC(ls_fetch_ctx_));
|
||||
}
|
||||
ret = OB_SUCCESS;
|
||||
} else if (OB_SUCCESS == ret) {
|
||||
@ -1364,12 +1371,12 @@ int FetchStream::read_log_(
|
||||
decode_log_entry_time += (get_timestamp() - begin_time);
|
||||
|
||||
if (OB_FAIL(read_group_entry_(group_entry, group_start_lsn, buffer, kick_out_info, tsi, stop_flag))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
if (OB_NEED_RETRY != ret) {
|
||||
LOG_ERROR("read group entry failed", KR(ret), KPC_(ls_fetch_ctx));
|
||||
}
|
||||
ls_fetch_ctx_->reset_memory_storage();
|
||||
if (OB_IN_STOP_STATE != ret && OB_NEED_RETRY != ret) {
|
||||
LOG_ERROR("read group entry failed", KR(ret), KPC_(ls_fetch_ctx));
|
||||
}
|
||||
|
||||
// If failed, reset memory storage
|
||||
ls_fetch_ctx_->reset_memory_storage();
|
||||
}
|
||||
|
||||
// update log process
|
||||
|
@ -345,11 +345,15 @@ void ObLSWorker::handle(void *data, volatile bool &stop_flag)
|
||||
}
|
||||
} else if (OB_FAIL(task->handle(stop_flag))) {
|
||||
if (OB_IN_STOP_STATE != ret) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(hibernate_stream_task(*task, "HandleTaskErr"))) {
|
||||
LOG_ERROR_RET(tmp_ret, "hibernate_stream_task on handle task failure", K(task), KPC(task));
|
||||
}
|
||||
LOG_ERROR("handle fetch stream task fail", KR(ret), K(task));
|
||||
LOG_ERROR("handle fetch stream task failed", KR(ret), K(task));
|
||||
} else {
|
||||
LOG_INFO("handle fetch stream task is stopped", KR(ret), K(task));
|
||||
}
|
||||
|
||||
LOG_INFO("handle fetch stream task failed, need to reschedule", KR(ret), K(task));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(hibernate_stream_task(*task, "HandleTaskErr"))) {
|
||||
LOG_ERROR_RET(tmp_ret, "hibernate_stream_task on handle task failure", K(task), KPC(task));
|
||||
}
|
||||
} else {
|
||||
// Can no longer continue with the task
|
||||
|
Reference in New Issue
Block a user