diff --git a/src/logservice/logfetcher/ob_log_fetcher.cpp b/src/logservice/logfetcher/ob_log_fetcher.cpp index bd3ed598ea..b06a884469 100644 --- a/src/logservice/logfetcher/ob_log_fetcher.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher.cpp @@ -141,6 +141,7 @@ int ObLogFetcher::init( log_fetcher_user, cfg.idle_pool_thread_num, cfg, + static_cast<void *>(this), *err_handler, stream_worker_, start_lsn_locator_))) { diff --git a/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp b/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp index cb81c6bbbf..7f3e35eb1f 100644 --- a/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher_idle_pool.cpp @@ -32,6 +32,8 @@ namespace logfetcher ObLogFetcherIdlePool::ObLogFetcherIdlePool() : inited_(false), tg_id_(-1), + fetcher_host_(nullptr), + log_fetcher_user_(LogFetcherUser::UNKNOWN), cfg_(nullptr), err_handler_(NULL), stream_worker_(NULL), @@ -48,6 +50,7 @@ int ObLogFetcherIdlePool::init( const LogFetcherUser &log_fetcher_user, const int64_t thread_num, const ObLogFetcherConfig &cfg, + void *fetcher_host, IObLogErrHandler &err_handler, IObLSWorker &stream_worker, IObLogStartLSNLocator &start_lsn_locator) @@ -63,6 +66,7 @@ int ObLogFetcherIdlePool::init( } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::LogFetcherIdlePool, tg_id_))) { LOG_ERROR("TG_CREATE_TENANT failed", KR(ret), K(thread_num)); } else { + fetcher_host_ = fetcher_host; log_fetcher_user_ = log_fetcher_user; cfg_ = &cfg; err_handler_ = &err_handler; @@ -90,8 +94,10 @@ void ObLogFetcherIdlePool::destroy() err_handler_ = NULL; stream_worker_ = NULL; start_lsn_locator_ = NULL; + fetcher_host_ = nullptr; + log_fetcher_user_ = LogFetcherUser::UNKNOWN; - LOG_INFO("destroy fetcher idle pool succ"); + LOG_INFO("destroy fetcher idle pool success"); } int ObLogFetcherIdlePool::push(LSFetchCtx *task) diff --git a/src/logservice/logfetcher/ob_log_fetcher_idle_pool.h b/src/logservice/logfetcher/ob_log_fetcher_idle_pool.h index 
e034043921..caffbcb809 100644 --- a/src/logservice/logfetcher/ob_log_fetcher_idle_pool.h +++ b/src/logservice/logfetcher/ob_log_fetcher_idle_pool.h @@ -59,6 +59,7 @@ public: const LogFetcherUser &log_fetcher_user, const int64_t thread_num, const ObLogFetcherConfig &cfg, + void *fetcher_host, IObLogErrHandler &err_handler, IObLSWorker &stream_worker, IObLogStartLSNLocator &start_lsn_locator); @@ -86,6 +87,7 @@ private: private: bool inited_; int tg_id_; + void *fetcher_host_; LogFetcherUser log_fetcher_user_; const ObLogFetcherConfig *cfg_; IObLogErrHandler *err_handler_; diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp index 54b2e8d983..5b64ef3ff9 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp @@ -851,7 +851,10 @@ int FetchStream::read_group_entry_( if (OB_NEED_RETRY == ret) { LOG_INFO("LogHander handle_group_entry failed, need_retry", KR(ret), K(tenant_id), K(ls_id), K(proposal_id), K(group_start_lsn), K(group_entry)); - } else if (OB_IN_STOP_STATE != ret) { + } else if (OB_IN_STOP_STATE == ret) { + LOG_INFO("LogHander handle_group_entry is stopped", KR(ret), K(tenant_id), K(ls_id), K(proposal_id), + K(group_start_lsn), K(group_entry)); + } else { LOG_ERROR("LogHander handle_group_entry failed", KR(ret), K(tenant_id), K(ls_id), K(proposal_id), K(group_start_lsn), K(group_entry)); if (OB_NOT_NULL(ls_fetch_ctx_)) { @@ -1148,12 +1151,16 @@ int FetchStream::handle_fetch_log_result_( is_stream_valid = false; stream_invalid_reason = "LogNotSync"; ret = OB_SUCCESS; - } else if (OB_NEED_RETRY == ret) { + } else if ((OB_NEED_RETRY == ret) || (OB_IN_STOP_STATE == ret)) { + // 1. OB_NEED_RETRY: handle_group_entry may return + // 2. OB_IN_STOP_STATE: handle_group_entry may return + // ... 
is_stream_valid = false; stream_invalid_reason = "NeedRetry"; if (OB_UNLIKELY(ls_fetch_ctx_->is_discarded())) { kickout_info.kick_out_reason_ = DISCARDED; + LOG_INFO("[STAT] [FETCH_STREAM] [RECYCLE_FETCH_TASK]", KPC(ls_fetch_ctx_)); } ret = OB_SUCCESS; } else if (OB_SUCCESS == ret) { @@ -1364,12 +1371,12 @@ int FetchStream::read_log_( decode_log_entry_time += (get_timestamp() - begin_time); if (OB_FAIL(read_group_entry_(group_entry, group_start_lsn, buffer, kick_out_info, tsi, stop_flag))) { - if (OB_IN_STOP_STATE != ret) { - if (OB_NEED_RETRY != ret) { - LOG_ERROR("read group entry failed", KR(ret), KPC_(ls_fetch_ctx)); - } - ls_fetch_ctx_->reset_memory_storage(); + if (OB_IN_STOP_STATE != ret && OB_NEED_RETRY != ret) { + LOG_ERROR("read group entry failed", KR(ret), KPC_(ls_fetch_ctx)); } + + // If failed, reset memory storage + ls_fetch_ctx_->reset_memory_storage(); } // update log process diff --git a/src/logservice/logfetcher/ob_ls_worker.cpp b/src/logservice/logfetcher/ob_ls_worker.cpp index a977b16a02..ff791aebe1 100644 --- a/src/logservice/logfetcher/ob_ls_worker.cpp +++ b/src/logservice/logfetcher/ob_ls_worker.cpp @@ -345,11 +345,15 @@ void ObLSWorker::handle(void *data, volatile bool &stop_flag) } } else if (OB_FAIL(task->handle(stop_flag))) { if (OB_IN_STOP_STATE != ret) { - int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(hibernate_stream_task(*task, "HandleTaskErr"))) { - LOG_ERROR_RET(tmp_ret, "hibernate_stream_task on handle task failure", K(task), KPC(task)); - } - LOG_ERROR("handle fetch stream task fail", KR(ret), K(task)); + LOG_ERROR("handle fetch stream task failed", KR(ret), K(task)); + } else { + LOG_INFO("handle fetch stream task is stopped", KR(ret), K(task)); + } + + LOG_INFO("handle fetch stream task failed, need to reschedule", KR(ret), K(task)); + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(hibernate_stream_task(*task, "HandleTaskErr"))) { + LOG_ERROR_RET(tmp_ret, "hibernate_stream_task on handle task failure", K(task), KPC(task)); } } else 
{ // Can no longer continue with the task