[Physical Standby] Modify the log level and Fixed an issue with high CPU usage when removed the primary tenant

This commit is contained in:
obdev
2023-05-22 10:17:12 +00:00
committed by ob-robot
parent 190faa5a4f
commit 4e0d79e6c6
11 changed files with 101 additions and 78 deletions

View File

@ -387,7 +387,7 @@ int FetchLogARpc::prepare_request(const share::ObLSID &ls_id,
"flying_rpc", flying_req_list_.count_,
K(rpc_timeout));
} else {
LOG_DEBUG("[STAT] [FETCH_LOG_ARPC] prepare rpc request", K(fetch_stream),
LOG_TRACE("[STAT] [FETCH_LOG_ARPC] prepare rpc request", K(fetch_stream),
"ready_result", res_queue_.count(),
"flying_rpc", flying_req_list_.count_,
K(rpc_timeout));
@ -431,7 +431,7 @@ void FetchLogARpc::discard_request(const char *discard_reason, const bool is_nor
"ready_result", res_queue_.count(),
"flying_rpc", flying_req_list_.count_);
} else {
LOG_DEBUG("[STAT] [FETCH_LOG_ARPC] discard rpc request", K(discard_reason),
LOG_TRACE("[STAT] [FETCH_LOG_ARPC] discard rpc request", K(discard_reason),
K(fetch_stream), K_(cur_req), KPC_(cur_req),
"ready_result", res_queue_.count(),
"flying_rpc", flying_req_list_.count_);
@ -862,7 +862,7 @@ void FetchLogARpc::print_handle_info_(RpcRequest &rpc_req,
"delta", next_upper_limit - req_upper_limit,
K(rpc_time), KPC(resp));
} else {
LOG_DEBUG("handle rpc result by rpc callback",
LOG_TRACE("handle rpc result by rpc callback",
K(fetch_stream),
K(need_stop_rpc),
"stop_reason", print_rpc_stop_reason(rpc_stop_reason),
@ -897,7 +897,7 @@ int FetchLogARpc::launch_async_rpc_(RpcRequest &rpc_req,
// Use the trace id of the request
ObLogTraceIdGuard guard(rpc_req.get_trace_id());
_LOG_DEBUG("launch async fetch log rpc by %s, request=%s",
_LOG_TRACE("launch async fetch log rpc by %s, request=%s",
launch_by_cb ? "callback" : "fetch stream", to_cstring(rpc_req));
// The default setting is flyin before sending an asynchronous request

View File

@ -100,7 +100,7 @@ int ObLogFetcherDeadPool::push(LSFetchCtx *task)
} else {
task->dispatch_in_dead_pool();
LOG_DEBUG("[STAT] [DEAD_POOL] [DISPATCH_IN]", K(task), KPC(task));
LOG_TRACE("[STAT] [DEAD_POOL] [DISPATCH_IN]", K(task), KPC(task));
if (OB_FAIL(TG_PUSH_TASK(tg_id_, task, task->hash()))) {
LOG_ERROR("push task into thread queue fail", KR(ret), K(task));
@ -218,7 +218,7 @@ int ObLogFetcherDeadPool::handle_task_list_(
// sleep
ob_usleep(IDLE_WAIT_TIME);
// The mission is still in use
LOG_DEBUG("[STAT] [DEAD_POOL] [TASK_IN_USE]", K(ls_fetch_ctx));
LOG_TRACE("[STAT] [DEAD_POOL] [TASK_IN_USE]", K(ls_fetch_ctx));
}
} // while
}

View File

@ -107,7 +107,7 @@ int ObLogFetcherIdlePool::push(LSFetchCtx *task)
} else {
task->dispatch_in_idle_pool();
LOG_DEBUG("[STAT] [IDLE_POOL] [DISPATCH_IN]", K(task), KPC(task));
LOG_TRACE("[STAT] [IDLE_POOL] [DISPATCH_IN]", K(task), KPC(task));
if (OB_FAIL(TG_PUSH_TASK(tg_id_, task, task->hash()))) {
LOG_ERROR("push task into thread queue fail", KR(ret), K(task));
@ -167,7 +167,7 @@ void ObLogFetcherIdlePool::handle(void *data, volatile bool &stop_flag)
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls_fetch_ctx is nullptt", KR(ret), K(thread_index), K(get_thread_cnt()));
} else {
LOG_INFO("fetcher idle pool thread start", K(thread_index), "tls_id", ls_fetch_ctx->get_tls_id());
LOG_TRACE("fetcher idle pool thread start", K(thread_index), "tls_id", ls_fetch_ctx->get_tls_id());
if (OB_FAIL(do_request_(thread_index, *ls_fetch_ctx))) {
if (OB_IN_STOP_STATE != ret) {
@ -205,8 +205,7 @@ int ObLogFetcherIdlePool::do_request_(const int64_t thread_index, LSFetchCtx &ls
if (OB_FAIL(handle_task_(&ls_fetch_ctx, need_dispatch))) {
LOG_ERROR("handle task fail", KR(ret), K(ls_fetch_ctx));
} else if (need_dispatch) {
LOG_DEBUG("[STAT] [IDLE_POOL] [DISPATCH_OUT]", K(ls_fetch_ctx), K(thread_index),
K(ls_fetch_ctx));
LOG_TRACE("[STAT] [IDLE_POOL] [DISPATCH_OUT]", K(thread_index), K(ls_fetch_ctx));
const char *dispatch_reason = "SvrListReady";
if (OB_FAIL(stream_worker_->dispatch_fetch_task(ls_fetch_ctx, dispatch_reason))) {
@ -254,6 +253,11 @@ int ObLogFetcherIdlePool::handle_task_(LSFetchCtx *task, bool &need_dispatch)
} else {
need_dispatch = false;
const bool enable_continue_use_cache_server_list = (1 == cfg_->enable_continue_use_cache_server_list);
const int64_t dispatched_count_from_idle_to_idle = task->get_dispatched_count_from_idle_to_idle();
if (1 == (dispatched_count_from_idle_to_idle % IDLE_HANDLE_COUNT)) {
ob_usleep(IDLE_WAIT_TIME);
}
if (OB_SUCCESS == ret) {
// Update the server list
@ -262,10 +266,12 @@ int ObLogFetcherIdlePool::handle_task_(LSFetchCtx *task, bool &need_dispatch)
if (OB_FAIL(task->update_svr_list())) {
LOG_ERROR("update server list fail", KR(ret), KPC(task));
}
}
} else if (is_standby(log_fetcher_user_)) {
need_dispatch = true;
} else if (is_cdc(log_fetcher_user_)) {
// locate the start LSN
// Requires a successful location to leave the idle pool
else if (task->need_locate_start_lsn()) {
if (task->need_locate_start_lsn()) {
if (is_standby(log_fetcher_user_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("LS need to specifies start_lsn in the Physical standby", KR(ret), KPC(task));
@ -283,11 +289,15 @@ int ObLogFetcherIdlePool::handle_task_(LSFetchCtx *task, bool &need_dispatch)
// After all the above conditions are met, allow distribution to the fetch log stream
need_dispatch = true;
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_ERROR("not support", KR(ret), K(log_fetcher_user_), KPC(task));
}
}
if (enable_continue_use_cache_server_list) {
need_dispatch = true;
LOG_DEBUG("enable_continue_use_cache_server_list", KPC(task), K(need_dispatch));
LOG_TRACE("enable_continue_use_cache_server_list", KPC(task), K(need_dispatch));
}
}

View File

@ -79,6 +79,7 @@ public:
private:
static const int64_t IDLE_WAIT_TIME = 100 * 1000;
static const int64_t IDLE_HANDLE_COUNT = 10;
int do_request_(const int64_t thread_index, LSFetchCtx &ls_fetch_ctx);
int handle_task_(LSFetchCtx *task, bool &need_dispatch);

View File

@ -111,8 +111,6 @@ void LSFetchCtx::reset()
progress_.reset();
start_parameters_.reset();
fetch_info_.reset();
//svr_list_need_update_ = true;
//TODO tmp test
svr_list_need_update_ = false;
start_lsn_locate_req_.reset();
end_lsn_locate_req_.reset();
@ -271,7 +269,7 @@ int LSFetchCtx::append_log(const char *buf, const int64_t buf_len)
} else if (OB_FAIL(group_iterator_.reuse(start_lsn))) {
LOG_ERROR("MemPalfBufferIterator resuse failed", KR(ret), K_(tls_id), K(start_lsn));
} else {
LOG_DEBUG("mem_storage_ init and MemPalfBufferIterator resuse succ", K_(tls_id), K(start_lsn));
LOG_TRACE("mem_storage_ init and MemPalfBufferIterator resuse succ", K_(tls_id), K(start_lsn));
}
}
@ -281,7 +279,7 @@ int LSFetchCtx::append_log(const char *buf, const int64_t buf_len)
K_(mem_storage), K_(group_iterator));
} else {
ATOMIC_AAF(&fetched_log_size_, buf_len);
LOG_DEBUG("append_log succ", K(buf_len), KPC(this));
LOG_TRACE("append_log succ", K(buf_len), KPC(this));
}
}
@ -377,7 +375,7 @@ int LSFetchCtx::update_progress(
K(group_entry), K(progress_));
}
LOG_DEBUG("read log and update progress success", K_(tls_id), K(group_entry), K_(progress));
LOG_TRACE("read log and update progress success", K_(tls_id), K(group_entry), K_(progress));
}
}
@ -451,13 +449,14 @@ bool LSFetchCtx::need_update_svr_list()
{
int ret = OB_SUCCESS;
bool bool_ret = false;
int64_t cur_time = get_timestamp();
const int64_t cur_time = get_timestamp();
int64_t avail_svr_count = 0;
logservice::ObLogRouteService *log_route_service = nullptr;
if (is_direct_fetching_mode(fetching_mode_)) {
bool_ret = false;
} else if(is_integrated_fetching_mode(fetching_mode_)) {
if (REACH_TIME_INTERVAL_THREAD_LOCAL(SERVER_LIST_UPDATE_INTERVAL_SEC)) {
if (OB_FAIL(get_log_route_service_(log_route_service))) {
LOG_ERROR("get_log_route_service_ failed", KR(ret));
} else if (OB_FAIL(log_route_service->get_server_count(tls_id_.get_tenant_id(), tls_id_.get_ls_id(),
@ -469,17 +468,17 @@ bool LSFetchCtx::need_update_svr_list()
}
} else {
// If no server is available, or if a proactive update is requested, an update is required
// if (avail_svr_count <= 0 || svr_list_need_update_) {
if (avail_svr_count <= 0) {
if (avail_svr_count <= 0 || svr_list_need_update_) {
bool_ret = true;
}
}
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ls_fetch_ctx has invalid fetching mode", KR(ret), KPC(this), K(tls_id_), K_(fetching_mode));
}
LOG_DEBUG("need_update_svr_list", K(bool_ret), KR(ret), K(tls_id_),
LOG_TRACE("need_update_svr_list", KR(ret), K(bool_ret), K(tls_id_),
K(svr_list_need_update_), K(avail_svr_count));
return bool_ret;
@ -491,7 +490,7 @@ bool LSFetchCtx::need_locate_start_lsn() const
bool_ret = ! (progress_.get_next_lsn().is_valid());
LOG_DEBUG("need_locate_start_lsn", K(tls_id_), K(bool_ret), K(progress_));
LOG_TRACE("need_locate_start_lsn", K(tls_id_), K(bool_ret), K(progress_));
return bool_ret;
}
@ -504,7 +503,7 @@ bool LSFetchCtx::need_locate_end_lsn() const
bool_ret = (OB_INVALID_TIMESTAMP != start_parameters_.get_end_tstamp_ns())
&& ! start_parameters_.get_end_lsn().is_valid();
LOG_DEBUG("need_locate_end_lsn", K(tls_id_), K(bool_ret), K(start_parameters_));
LOG_TRACE("need_locate_end_lsn", K(tls_id_), K(bool_ret), K(start_parameters_));
return bool_ret;
}
@ -524,7 +523,7 @@ int LSFetchCtx::update_svr_list(const bool need_print_info)
LOG_ERROR("ObLogRouteService async_server_query_req failed", KR(ret), K(tls_id_));
}
} else {
LOG_DEBUG("async_server_query_req succ", K_(tls_id));
LOG_TRACE("async_server_query_req succ", K_(tls_id));
}
return ret;
@ -825,7 +824,7 @@ int LSFetchCtx::check_fetch_timeout(const common::ObAddr &svr,
"last_update_tstamp", TS_TO_STR(last_update_tstamp),
"svr_start_fetch_tstamp", TS_TO_STR(svr_start_fetch_tstamp));
} else {
LOG_DEBUG("[CHECK_PROGRESS_TIMEOUT]", K_(tls_id), K(svr),
LOG_TRACE("[CHECK_PROGRESS_TIMEOUT]", K_(tls_id), K(svr),
K(is_fetch_timeout), K(is_fetch_timeout_on_lagged_replica),
K(progress_update_interval),
K(progress_),
@ -1118,6 +1117,7 @@ void LSFetchCtx::FetchInfo::reset()
cur_mod_.reset();
out_mod_.reset();
out_reason_ = "NONE";
dispatched_count_from_idle_to_idle_ = 0;
}
void LSFetchCtx::FetchInfo::dispatch_in_idle_pool()
@ -1138,6 +1138,10 @@ void LSFetchCtx::FetchInfo::dispatch_in_dead_pool()
void LSFetchCtx::FetchInfo::dispatch_out(const char *reason)
{
out_mod_ = cur_mod_;
if (is_from_idle_to_idle()) {
++dispatched_count_from_idle_to_idle_;
}
cur_mod_.reset();
out_reason_ = reason;
}

View File

@ -245,6 +245,10 @@ public:
void dispatch_in_idle_pool();
void dispatch_in_fetch_stream(const common::ObAddr &svr, FetchStream &fs);
void dispatch_in_dead_pool();
int64_t get_dispatched_count_from_idle_to_idle() const
{
return fetch_info_.get_dispatched_count_from_idle_to_idle();
}
bool is_in_fetching_log() const { return FETCHING_LOG == ATOMIC_LOAD(&state_); }
void set_not_in_fetching_log() { ATOMIC_SET(&state_, NOT_FETCHING_LOG); }
@ -286,6 +290,8 @@ public:
// Internal member functions
private:
static const int64_t SERVER_LIST_UPDATE_INTERVAL_SEC = 5 * _SEC_;
int init_group_iterator_(const palf::LSN &start_lsn);
int init_archive_dest_(const ObBackupPathString &archve_dest_str,
@ -391,6 +397,7 @@ public:
FetchModule out_mod_; // module that dispatch out from
const char *out_reason_; // reason for dispatch out
int64_t dispatched_count_from_idle_to_idle_;
FetchInfo() { reset(); }
@ -405,6 +412,7 @@ public:
return (FetchModule::ModuleName::FETCH_MODULE_IDLE_POOL == cur_mod_.module_)
&& (FetchModule::ModuleName::FETCH_MODULE_IDLE_POOL == out_mod_.module_);
}
int64_t get_dispatched_count_from_idle_to_idle() const { return dispatched_count_from_idle_to_idle_; }
// Get the start fetch time of the log on the current server
// Requires FETCH_STREAM for the fetch log module; requiring the server to match

View File

@ -186,7 +186,7 @@ int FetchStream::handle(volatile bool &stop_flag)
LOG_INFO("[STAT] [FETCH_STREAM] begin handle", "fetch_stream", this,
"fetch_stream", *this, "LS_CTX", *ls_fetch_ctx_);
} else {
LOG_DEBUG("[STAT] [FETCH_STREAM] begin handle", "fetch_stream", this,
LOG_TRACE("[STAT] [FETCH_STREAM] begin handle", "fetch_stream", this,
"fetch_stream", *this, "LS_CTX", *ls_fetch_ctx_);
}
if (OB_ISNULL(ls_fetch_ctx_)) {
@ -249,7 +249,7 @@ void FetchStream::runTimerTask()
int64_t start_time = get_timestamp();
int64_t end_time = 0;
LOG_DEBUG("[STAT] [WAKE_UP_STREAM_TASK]", "task", this, "task", *this);
LOG_TRACE("[STAT] [WAKE_UP_STREAM_TASK]", "task", this, "task", *this);
if (OB_ISNULL(stream_worker_)) {
LOG_ERROR("invalid stream worker", K(stream_worker_));
@ -309,7 +309,7 @@ void FetchStream::do_stat()
last_stat_info_ = cur_stat_info_;
} else if (delta_second <= 0) {
// Statistics are too frequent, ignore the statistics here, otherwise the following will lead to divide by zero error
LOG_DEBUG("fetch stream stat too frequently", K(delta_time), K(delta_second),
LOG_TRACE("fetch stream stat too frequently", K(delta_time), K(delta_second),
K(last_stat_time_), K(this));
} else {
FetchStatInfoPrinter fsi_printer(cur_stat_info_, last_stat_info_, delta_second);
@ -339,7 +339,7 @@ void FetchStream::handle_when_leave_(const char *leave_reason) const
// No data members can be accessed in when print log, only the address is printed
LOG_INFO("[STAT] [FETCH_STREAM] leave stream", "fetch_stream", this, K(leave_reason));
} else {
LOG_DEBUG("[STAT] [FETCH_STREAM] leave stream", "fetch_stream", this, K(leave_reason));
LOG_TRACE("[STAT] [FETCH_STREAM] leave stream", "fetch_stream", this, K(leave_reason));
}
}
@ -485,7 +485,7 @@ int FetchStream::get_upper_limit(int64_t &upper_limit_us)
global_upper_limit = progress_controller_->get_global_upper_limit();
if (OB_INVALID_TIMESTAMP != global_upper_limit) {
int64_t log_progress = ls_fetch_ctx_->get_progress();
const int64_t log_progress = ls_fetch_ctx_->get_progress();
if (log_progress < global_upper_limit) {
upper_limit_us = INT64_MAX - 1;
} else {
@ -620,7 +620,7 @@ void FetchStream::print_handle_info_(
"resp", result.resp_, K(handle_rpc_time), K(read_log_time), K(decode_log_entry_time),
K(tsi), K(min_progress), K(min_tls_id));
} else {
LOG_DEBUG("handle rpc result by fetch stream",
LOG_TRACE("handle rpc result by fetch stream",
"fetch_stream", this,
"upper_limit", NTS_TO_STR(upper_limit_),
K(need_stop_request),
@ -885,7 +885,7 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
int ret = OB_SUCCESS;
static const int64_t UPDATE_FETCH_STATE_INTERVAL = 100;
bool need_fetch_log = true;
LOG_DEBUG("handle_fetch_archive_task_ begin", K(svr_), "tls_id", ls_fetch_ctx_->get_tls_id());
LOG_TRACE("handle_fetch_archive_task_ begin", K(svr_), "tls_id", ls_fetch_ctx_->get_tls_id());
if (OB_ISNULL(ls_fetch_ctx_)) {
ret = OB_ERR_UNEXPECTED;
@ -1297,7 +1297,7 @@ int FetchStream::read_log_(
read_log_time = 0;
decode_log_entry_time = 0;
// TODO for Debug remove
LOG_DEBUG("redo_log_debug", K(resp), "tls_id", ls_fetch_ctx_->get_tls_id());
LOG_TRACE("redo_log_debug", K(resp), "tls_id", ls_fetch_ctx_->get_tls_id());
if (OB_ISNULL(buf)) {
LOG_ERROR("invalid response log buf", K(buf), K(resp));
@ -1307,7 +1307,7 @@ int FetchStream::read_log_(
LOG_ERROR("invalid ls_fetch_ctx", KR(ret), K(ls_fetch_ctx_));
} else if (0 == log_cnt) {
// Ignore 0 logs
LOG_DEBUG("fetch 0 log", K_(svr), "fetch_status", resp.get_fetch_status());
LOG_TRACE("fetch 0 log", K_(svr), "fetch_status", resp.get_fetch_status());
} else if (OB_FAIL(ls_fetch_ctx_->append_log(buf, len))) {
LOG_ERROR("append log to LSFetchCtx failed", KR(ret), KPC(ls_fetch_ctx_), K(resp));
} else {
@ -1603,7 +1603,7 @@ int FetchStream::check_switch_server_(LSFetchCtx &task, KickOutInfo &kick_out_in
if (exist_(kick_out_info, task.get_tls_id())) {
// Do not check for LS already located in kick_out_info
} else if (task.need_switch_server(svr_)) {
LOG_DEBUG("exist higher priority server, need switch server", KR(ret), "key", task.get_tls_id(),
LOG_TRACE("exist higher priority server, need switch server", KR(ret), "key", task.get_tls_id(),
K_(svr));
// If need to switch the stream, add it to the kick out collection
if (OB_FAIL(set_(kick_out_info, task.get_tls_id(), NEED_SWITCH_SERVER))) {

View File

@ -205,7 +205,7 @@ int PartProgressController::update_progress(const int64_t progress_id, const int
Item &item = *(progress_list_ + progress_id);
item.update(progress);
_LOG_DEBUG("[STAT] [PROGRESS_CONTROLLER] [UPDATE] progress_id=%ld progress=%ld(%s) "
_LOG_TRACE("[STAT] [PROGRESS_CONTROLLER] [UPDATE] progress_id=%ld progress=%ld(%s) "
"delay=%s progress_cnt=(total=%ld,valid=%ld,recycled=%ld,max=%ld)",
progress_id, progress, NTS_TO_STR(progress), NTS_TO_DELAY(progress),
progress_cnt_, valid_progress_cnt_, recycled_indices_.count(), max_progress_cnt_);
@ -242,7 +242,7 @@ int PartProgressController::get_min_progress(int64_t &progress)
}
}
LOG_DEBUG("[FETCHER] [GET_MIN_PROGRESS] ", K(progress), K_(progress_cnt), K(min_progress_id));
LOG_TRACE("[FETCHER] [GET_MIN_PROGRESS] ", K(progress), K_(progress_cnt), K(min_progress_id));
execution_time = get_timestamp() - execution_time ;
// Update execution time, print execution time periodically
update_execution_time_(execution_time);

View File

@ -121,7 +121,7 @@ int ObLogRpc::async_stream_fetch_log(const uint64_t tenant_id,
req.set_flag(ObCdcRpcTestFlag::OBCDC_RPC_TEST_SWITCH_MODE);
}
SEND_RPC(async_stream_fetch_log, tenant_id, svr, timeout, req, &cb);
LOG_DEBUG("rpc: async fetch stream log", KR(ret), K(svr), K(timeout), K(req));
LOG_TRACE("rpc: async fetch stream log", KR(ret), K(svr), K(timeout), K(req));
return ret;
}
@ -137,7 +137,7 @@ int ObLogRpc::async_stream_fetch_missing_log(const uint64_t tenant_id,
req.set_flag(ObCdcRpcTestFlag::OBCDC_RPC_FETCH_ARCHIVE);
}
SEND_RPC(async_stream_fetch_miss_log, tenant_id, svr, timeout, req, &cb);
LOG_DEBUG("rpc: async fetch stream missing_log", KR(ret), K(svr), K(timeout), K(req));
LOG_TRACE("rpc: async fetch stream missing_log", KR(ret), K(svr), K(timeout), K(req));
return ret;
}

View File

@ -193,7 +193,7 @@ int ObLogStartLSNLocator::dispatch_worker_(StartLSNLocateReq *req)
if (req->is_request_ended(locate_count_)) {
// If the request ends, set to DONE
// NOTE: After setting to DONE, no further access is possible
LOG_DEBUG("start lsn locate request ended", KPC(req));
LOG_TRACE("start lsn locate request ended", KPC(req));
req->set_state_done();
} else if (OB_FAIL(req->next_svr_item(item)) || OB_ISNULL(item)) {
LOG_ERROR("get next server item fail", KR(ret), KPC(req), K(item));
@ -209,7 +209,7 @@ int ObLogStartLSNLocator::dispatch_worker_(StartLSNLocateReq *req)
hash_val = common::murmurhash(&tenant_id, sizeof(tenant_id), hash_val);
hash_val = common::murmurhash(&svr_hash, sizeof(svr_hash), hash_val);
LOG_DEBUG("dispatch start lsn locate request",
LOG_TRACE("dispatch start lsn locate request",
"worker_idx", hash_val % worker_cnt_,
K(request_svr),
KPC(req));

View File

@ -208,7 +208,7 @@ int ObLSWorker::dispatch_fetch_task(LSFetchCtx &task, const char *dispatch_reaso
request_svr.reset();
bool found_valid_svr = false;
LOG_DEBUG("[STAT] [STREAM_WORKER] [DISPATCH_FETCH_TASK] begin to dispatch",
LOG_TRACE("[STAT] [STREAM_WORKER] [DISPATCH_FETCH_TASK] begin to dispatch",
"task", &task, K(task), K(dispatch_reason));
// Get the next valid server for the service log
@ -224,7 +224,7 @@ int ObLSWorker::dispatch_fetch_task(LSFetchCtx &task, const char *dispatch_reaso
"svr_service_time", TVAL_TO_STR(svr_service_time),
"survival_time", TVAL_TO_STR(survival_time));
} else {
LOG_DEBUG("not-avail server, task add into blacklist succ", KR(ret), K(task), K(request_svr),
LOG_TRACE("not-avail server, task add into blacklist succ", KR(ret), K(task), K(request_svr),
"svr_service_time", TVAL_TO_STR(svr_service_time),
"survival_time", TVAL_TO_STR(survival_time));
}
@ -242,14 +242,14 @@ int ObLSWorker::dispatch_fetch_task(LSFetchCtx &task, const char *dispatch_reaso
if (OB_SUCCESS == ret) {
// No server available, put it into idle pool
if (! found_valid_svr) {
LOG_DEBUG("[STAT] [STREAM_WORKER] [DISPATCH_FETCH_TASK] server list is used up, "
LOG_TRACE("[STAT] [STREAM_WORKER] [DISPATCH_FETCH_TASK] server list is used up, "
"dispatch to idle pool", "task", &task, K(task));
if (OB_FAIL(idle_pool_->push(&task))) {
LOG_ERROR("push into idle pool fail", KR(ret), K(task));
}
} else {
LOG_DEBUG("[STAT] [STREAM_WORKER] [DISPATCH_FETCH_TASK] dispatch to next server",
LOG_TRACE("[STAT] [STREAM_WORKER] [DISPATCH_FETCH_TASK] dispatch to next server",
K(request_svr), "task", &task, K(task));
// Assigning tasks to the server
@ -286,7 +286,7 @@ int ObLSWorker::dispatch_stream_task(FetchStream &task, const char *from_mod)
LOG_INFO("[STAT] [STREAM_WORKER] [DISPATCH_STREAM_TASK]",
"fetch_stream", &task, K(from_mod), K(hash_val), K(task));
} else {
LOG_DEBUG("[STAT] [STREAM_WORKER] [DISPATCH_STREAM_TASK]",
LOG_TRACE("[STAT] [STREAM_WORKER] [DISPATCH_STREAM_TASK]",
"fetch_stream", &task, K(from_mod), K(hash_val), K(task));
}
@ -307,7 +307,7 @@ int ObLSWorker::hibernate_stream_task(FetchStream &task, const char *from_mod)
LOG_INFO("[STAT] [STREAM_WORKER] [HIBERNATE_STREAM_TASK]",
"task", &task, K(from_mod), K(task));
} else {
LOG_DEBUG("[STAT] [STREAM_WORKER] [HIBERNATE_STREAM_TASK]",
LOG_TRACE("[STAT] [STREAM_WORKER] [HIBERNATE_STREAM_TASK]",
"task", &task, K(from_mod), K(task));
}
@ -328,7 +328,7 @@ void ObLSWorker::handle(void *data, volatile bool &stop_flag)
bool is_paused = ATOMIC_LOAD(&stream_paused_);
FetchStream *task = static_cast<FetchStream *>(data);
LOG_DEBUG("[STAT] [STREAM_WORKER] [HANDLE_STREAM_TASK]", K_(stream_paused), K(thread_index),
LOG_TRACE("[STAT] [STREAM_WORKER] [HANDLE_STREAM_TASK]", K_(stream_paused), K(thread_index),
K(task), KPC(task));
if (OB_ISNULL(task)) {
@ -338,7 +338,7 @@ void ObLSWorker::handle(void *data, volatile bool &stop_flag)
// If the stream task is currently suspended, the task is put to sleep
// DDL tasks are exempt from suspend and require always processing
else if (OB_UNLIKELY(is_paused) && ! task->is_sys_log_stream()) {
LOG_DEBUG("[STAT] [STREAM_WORKER] [HIBERNATE_STREAM_TASK_ON_PAUSE]", K(task));
LOG_TRACE("[STAT] [STREAM_WORKER] [HIBERNATE_STREAM_TASK_ON_PAUSE]", K(task));
if (OB_FAIL(hibernate_stream_task(*task, "PausedFetcher"))) {
LOG_ERROR("hibernate_stream_task on pause fail", KR(ret), K(task), KPC(task));
@ -395,7 +395,7 @@ int ObLSWorker::dispatch_fetch_task_to_svr_(LSFetchCtx &task, const common::ObAd
} else if (OB_FAIL(fs_container_mgr->get_fsc(tls_id, fsc))) {
LOG_ERROR("FetchStreamContainerMgr get_fsc fail", KR(ret));
} else {
LOG_DEBUG("[STAT] [STREAM_WORKER] [DISPATCH_FETCH_TASK] dispatch to svr",
LOG_TRACE("[STAT] [STREAM_WORKER] [DISPATCH_FETCH_TASK] dispatch to svr",
"task", &task, K(task), K(request_svr));
if (OB_FAIL(fsc->dispatch(task, request_svr))) {