[CP] [OBCDC] Fix not check_switch_server

This commit is contained in:
SanmuWangZJU
2023-10-18 08:09:34 +00:00
committed by ob-robot
parent 1ff26def43
commit 1d15f2df0a
11 changed files with 38 additions and 101 deletions

View File

@ -327,11 +327,6 @@ public:
// LS fetch progress update timeout in seconds
// If the logs are not fetched after a certain period of time, the stream will be cut
T_DEF_INT_INFT(ls_fetch_progress_update_timeout_sec, OB_CLUSTER_PARAMETER, 15, 1, "logstream fetch progress update timeout in seconds");
// Timeout time for lagging replica logstreams
//
// If logs are not fetched for more than a certain period of time on a lagging copy, cut the stream
T_DEF_INT_INFT(ls_fetch_progress_update_timeout_sec_for_lagged_replica, OB_CLUSTER_PARAMETER, 3, 1,
"fetch progress update timeout for lagged replica in seconds");
T_DEF_INT_INFT(log_router_background_refresh_interval_sec, OB_CLUSTER_PARAMETER, 10, 1,
"log_route_service background_refresh_time in seconds");
@ -378,7 +373,7 @@ public:
T_DEF_INT_INFT(blacklist_history_clear_interval_min, OB_CLUSTER_PARAMETER, 2, 1, "blacklist history clear interval in minute");
// Check the need for active cut-off cycles, in minutes
T_DEF_INT_INFT(check_switch_server_interval_min, OB_CLUSTER_PARAMETER, 10, 1, "check switch server interval in minute");
T_DEF_INT_INFT(check_switch_server_interval_sec, OB_CLUSTER_PARAMETER, 60, 1, "check switch server interval in seconds");
// Print the number of LSs with the slowest progress of the Fetcher module
T_DEF_INT_INFT(print_fetcher_slowest_ls_num, OB_CLUSTER_PARAMETER, 10, 1, "print fetcher slowest ls num");

View File

@ -1002,19 +1002,15 @@ uint64_t LSFetchCtx::hash() const
int LSFetchCtx::check_fetch_timeout(const common::ObAddr &svr,
const int64_t upper_limit,
const int64_t fetcher_resume_tstamp,
bool &is_fetch_timeout, // Is the log fetch timeout
bool &is_fetch_timeout_on_lagged_replica) // Is the log fetch timeout on a lagged replica
bool &is_fetch_timeout) // Is the log fetch timeout
{
int ret = OB_SUCCESS;
int64_t cur_time = get_timestamp();
int64_t svr_start_fetch_tstamp = OB_INVALID_TIMESTAMP;
// Partition timeout, after which time progress is not updated, it is considered to be a log fetch timeout
const int64_t ls_fetch_progress_update_timeout = TCONF.ls_fetch_progress_update_timeout_sec * _SEC_;
// Timeout period for partitions on lagging replica, compared to normal timeout period
const int64_t ls_fetch_progress_update_timeout_for_lagged_replica = TCONF.ls_fetch_progress_update_timeout_sec_for_lagged_replica * _SEC_;
is_fetch_timeout = false;
is_fetch_timeout_on_lagged_replica = false;
// Get the starting log time on the current server
if (OB_FAIL(fetch_info_.get_cur_svr_start_fetch_tstamp(svr, svr_start_fetch_tstamp))) {
@ -1029,7 +1025,6 @@ int LSFetchCtx::check_fetch_timeout(const common::ObAddr &svr,
if (OB_INVALID_TIMESTAMP != cur_progress && cur_progress >= upper_limit) {
is_fetch_timeout = false;
is_fetch_timeout_on_lagged_replica = false;
} else {
// Consider the time at which logs start to be fetched on the server as a lower bound for progress updates
// Ensure that the ls stays on a server for a certain period of time
@ -1052,24 +1047,11 @@ int LSFetchCtx::check_fetch_timeout(const common::ObAddr &svr,
// long periods of non-updating progress and progress timeouts, where it is no longer necessary to determine if the machine is behind in its backup
if (progress_update_interval >= ls_fetch_progress_update_timeout) {
is_fetch_timeout = true;
is_fetch_timeout_on_lagged_replica = false;
} else {
// Before the progress timeout, verify that the server fetching the logs is a lagged replica, and if the logs are not fetched on the lagged replica for some time, then it is also considered a progress timeout
// Generally, the timeout for a lagged copy is less than the progress timeout
// ls_fetch_progress_update_timeout_for_lagged_replica < ls_fetch_progress_update_timeout
//
// How to define a long timeout for fetching logs on a lagged replica?
// 1. lagged replica: the next log does exist, but this server can't fetch it, indicating that this server is most likely behind the replica
// 2. When to start counting the timeout: from the time libobcdc confirms the existence of the next log
if (progress_update_interval >= ls_fetch_progress_update_timeout_for_lagged_replica) { // Progress update time over lagging replica configuration item
is_fetch_timeout = true;
is_fetch_timeout_on_lagged_replica = true;
}
}
if (is_fetch_timeout) {
LOG_INFO("[CHECK_PROGRESS_TIMEOUT]", K_(tls_id), K(svr),
K(is_fetch_timeout), K(is_fetch_timeout_on_lagged_replica),
K(is_fetch_timeout),
K(progress_update_interval),
K(progress_),
"update_limit", NTS_TO_STR(upper_limit),
@ -1077,7 +1059,7 @@ int LSFetchCtx::check_fetch_timeout(const common::ObAddr &svr,
"svr_start_fetch_tstamp", TS_TO_STR(svr_start_fetch_tstamp));
} else {
LOG_DEBUG("[CHECK_PROGRESS_TIMEOUT]", K_(tls_id), K(svr),
K(is_fetch_timeout), K(is_fetch_timeout_on_lagged_replica),
K(is_fetch_timeout),
K(progress_update_interval),
K(progress_),
"update_limit", NTS_TO_STR(upper_limit),
@ -1200,11 +1182,13 @@ bool LSFetchCtx::need_switch_server(const common::ObAddr &cur_svr)
if (OB_FAIL(get_log_route_service_(log_route_service))) {
LOG_ERROR("get_log_route_service_ failed", KR(ret));
} else if (OB_FAIL(log_route_service->need_switch_server(tls_id_.get_tenant_id(), tls_id_.get_ls_id(),
next_lsn, cur_svr))) {
LOG_ERROR("ObLogRouteService need_switch_server failed", KR(ret), K(tls_id_), K(next_lsn),
K(cur_svr));
} else {}
} else {
bool_ret = log_route_service->need_switch_server(
tls_id_.get_tenant_id(),
tls_id_.get_ls_id(),
next_lsn,
cur_svr);
}
return bool_ret;
}

View File

@ -230,8 +230,7 @@ public:
int check_fetch_timeout(const common::ObAddr &svr,
const int64_t upper_limit,
const int64_t fetcher_resume_tstamp,
bool &is_fetch_timeout, // Is the log fetch timeout
bool &is_fetch_timeout_on_lagged_replica); // Is the log fetch timeout on a lagged replica
bool &is_fetch_timeout); // Is the log fetch timeout
// Get the progress of a transaction
// 1. When there is a transaction ready to be sent, the timestamp of the transaction to be sent - 1 is taken as the progress of the sending

View File

@ -37,7 +37,7 @@ int64_t FetchStream::g_rpc_timeout = ObLogConfig::default_fetch_log_rpc_timeout_
int64_t FetchStream::g_dml_progress_limit = ObLogConfig::default_progress_limit_sec_for_dml * _SEC_;
int64_t FetchStream::g_ddl_progress_limit = ObLogConfig::default_progress_limit_sec_for_ddl * _SEC_;
int64_t FetchStream::g_blacklist_survival_time = ObLogConfig::default_blacklist_survival_time_sec * _SEC_;
int64_t FetchStream::g_check_switch_server_interval = ObLogConfig::default_check_switch_server_interval_min * _MIN_;
int64_t FetchStream::g_check_switch_server_interval = ObLogConfig::default_check_switch_server_interval_sec * _SEC_;
bool FetchStream::g_print_rpc_handle_info = ObLogConfig::default_print_rpc_handle_info;
bool FetchStream::g_print_stream_dispatch_info = ObLogConfig::default_print_stream_dispatch_info;
@ -256,7 +256,7 @@ void FetchStream::configure(const ObLogConfig &config)
int64_t dml_progress_limit_sec = config.progress_limit_sec_for_dml;
int64_t ddl_progress_limit_sec = config.progress_limit_sec_for_ddl;
int64_t blacklist_survival_time_sec = config.blacklist_survival_time_sec;
int64_t check_switch_server_interval_min = config.check_switch_server_interval_min;
int64_t check_switch_server_interval_sec = config.check_switch_server_interval_sec;
bool print_rpc_handle_info = config.print_rpc_handle_info;
bool print_stream_dispatch_info = config.print_stream_dispatch_info;
@ -268,8 +268,8 @@ void FetchStream::configure(const ObLogConfig &config)
LOG_INFO("[CONFIG]", K(ddl_progress_limit_sec));
ATOMIC_STORE(&g_blacklist_survival_time, blacklist_survival_time_sec * _SEC_);
LOG_INFO("[CONFIG]", K(blacklist_survival_time_sec));
ATOMIC_STORE(&g_check_switch_server_interval, check_switch_server_interval_min * _MIN_);
LOG_INFO("[CONFIG]", K(check_switch_server_interval_min));
ATOMIC_STORE(&g_check_switch_server_interval, check_switch_server_interval_sec * _SEC_);
LOG_INFO("[CONFIG]", K(check_switch_server_interval_sec));
ATOMIC_STORE(&g_print_rpc_handle_info, print_rpc_handle_info);
LOG_INFO("[CONFIG]", K(print_rpc_handle_info));
ATOMIC_STORE(&g_print_stream_dispatch_info, print_stream_dispatch_info);
@ -1323,10 +1323,6 @@ const char *FetchStream::print_kick_out_reason_(const KickOutReason reason)
str = "PROGRESS_TIMEOUT";
break;
case PROGRESS_TIMEOUT_ON_LAGGED_REPLICA:
str = "PROGRESS_TIMEOUT_ON_LAGGED_REPLICA";
break;
case NEED_SWITCH_SERVER:
str = "NeedSwitchServer";
break;
@ -2147,7 +2143,6 @@ int FetchStream::check_fetch_timeout_(LSFetchCtx &task, KickOutInfo &kick_out_in
{
int ret = OB_SUCCESS;
bool is_fetch_timeout = false;
bool is_fetch_timeout_on_lagged_replica = false;
// For lagging replica, the timeout of partition
int64_t fetcher_resume_tstamp = OB_INVALID_TIMESTAMP;
@ -2157,12 +2152,11 @@ int FetchStream::check_fetch_timeout_(LSFetchCtx &task, KickOutInfo &kick_out_in
} else {
fetcher_resume_tstamp = stream_worker_->get_fetcher_resume_tstamp();
if (OB_FAIL(task.check_fetch_timeout(svr_, upper_limit_, fetcher_resume_tstamp,
is_fetch_timeout, is_fetch_timeout_on_lagged_replica))) {
if (OB_FAIL(task.check_fetch_timeout(svr_, upper_limit_, fetcher_resume_tstamp, is_fetch_timeout))) {
LOG_ERROR("check fetch timeout fail", KR(ret), K_(svr),
K(upper_limit_), "fetcher_resume_tstamp", TS_TO_STR(fetcher_resume_tstamp), K(task));
} else if (is_fetch_timeout) {
KickOutReason reason = is_fetch_timeout_on_lagged_replica ? PROGRESS_TIMEOUT_ON_LAGGED_REPLICA : PROGRESS_TIMEOUT;
KickOutReason reason = PROGRESS_TIMEOUT;
// If the partition fetch log times out, add it to the kick out collection
if (OB_FAIL(set_(kick_out_info, task.get_tls_id(), reason))) {
if (OB_ENTRY_EXIST == ret) {
@ -2183,11 +2177,11 @@ int FetchStream::check_switch_server_(LSFetchCtx &task, KickOutInfo &kick_out_in
{
int ret = OB_SUCCESS;
if (exist_(kick_out_info, task.get_tls_id())) {
if (exist_(kick_out_info, task.get_tls_id()) && kick_out_info.need_kick_out()) {
// Do not check for LS already located in kick_out_info
} else if (task.need_switch_server(svr_)) {
LOG_DEBUG("exist higher priority server, need switch server", KR(ret), "key", task.get_tls_id(),
K_(svr));
LOG_INFO("exist higher priority server, need switch server", KR(ret), "key", task.get_tls_id(),
K_(svr));
// If need to switch the stream, add it to the kick out collection
if (OB_FAIL(set_(kick_out_info, task.get_tls_id(), NEED_SWITCH_SERVER))) {
if (OB_ENTRY_EXIST == ret) {

View File

@ -149,14 +149,12 @@ private:
// Progress timeout, long time to fetch logs
PROGRESS_TIMEOUT = 8, // Partition fetch log timeout
// Progress timeout and detection of lagging replica
PROGRESS_TIMEOUT_ON_LAGGED_REPLICA = 9, // Partition fetch log timeout on lagging replica
NEED_SWITCH_SERVER = 10, // There is a higher priority server that actively switch
DISCARDED = 11, // Partition is discard
NEED_SWITCH_SERVER = 9, // There is a higher priority server that actively switch
DISCARDED = 10, // Partition is discard
// Feedback
ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF = 12, //same as ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF
ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF = 11, //same as ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF
};
static const char *print_kick_out_reason_(const KickOutReason reason);
// Determine if the server needs to be blacklisted,

View File

@ -149,11 +149,6 @@ public:
// LS fetch progress update timeout in seconds
// If the logs are not fetched after a certain period of time, the stream will be cut
T_DEF_INT_INFT(ls_fetch_progress_update_timeout_sec, OB_CLUSTER_PARAMETER, 15, 1, "logstream fetch progress update timeout in seconds");
// Timeout time for lagging replica logstreams
//
// If logs are not fetched for more than a certain period of time on a lagging copy, cut the stream
T_DEF_INT_INFT(ls_fetch_progress_update_timeout_sec_for_lagged_replica, OB_CLUSTER_PARAMETER, 3, 1,
"fetch progress update timeout for lagged replica in seconds");
T_DEF_INT_INFT(log_router_background_refresh_interval_sec, OB_CLUSTER_PARAMETER, 10, 1,
"log_route_service background_refresh_time in seconds");
@ -196,7 +191,7 @@ public:
T_DEF_INT_INFT(blacklist_history_clear_interval_min, OB_CLUSTER_PARAMETER, 20, 10, "blacklist history clear interval in minute");
// Check the need for active cut-off cycles, in minutes
T_DEF_INT_INFT(check_switch_server_interval_min, OB_CLUSTER_PARAMETER, 10, 1, "check switch server interval in minute");
T_DEF_INT_INFT(check_switch_server_interval_sec, OB_CLUSTER_PARAMETER, 60, 1, "check switch server interval in seconds");
// Print the number of LSs with the slowest progress of the Fetcher module
T_DEF_INT_INFT(print_fetcher_slowest_ls_num, OB_CLUSTER_PARAMETER, 10, 1, "print fetcher slowest ls num");

View File

@ -48,10 +48,6 @@ const char *print_switch_reason(const KickOutReason reason)
str = "PROGRESS_TIMEOUT";
break;
case PROGRESS_TIMEOUT_ON_LAGGED_REPLICA:
str = "PROGRESS_TIMEOUT_ON_LAGGED_REPLICA";
break;
case NEED_SWITCH_SERVER:
str = "NeedSwitchServer";
break;

View File

@ -37,14 +37,12 @@ enum KickOutReason
// Progress timeout, long time to fetch logs
PROGRESS_TIMEOUT = 8, // Partition fetch log timeout
// Progress timeout and detection of lagging replica
PROGRESS_TIMEOUT_ON_LAGGED_REPLICA = 9, // Partition fetch log timeout on lagging replica
NEED_SWITCH_SERVER = 10, // There is a higher priority server that actively switch
DISCARDED = 11, // Partition is discard
NEED_SWITCH_SERVER = 9, // There is a higher priority server that actively switch
DISCARDED = 10, // Partition is discard
// Feedback
ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF = 12, //same as ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF
ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF = 11, //same as ARCHIVE_ITER_END_BUT_LS_NOT_EXIST_IN_PALF
};
const char *print_switch_reason(const KickOutReason reason);

View File

@ -780,14 +780,12 @@ uint64_t LSFetchCtx::hash() const
int LSFetchCtx::check_fetch_timeout(const common::ObAddr &svr,
const int64_t upper_limit,
const int64_t fetcher_resume_tstamp,
bool &is_fetch_timeout, // Is the log fetch timeout
bool &is_fetch_timeout_on_lagged_replica) // Is the log fetch timeout on a lagged replica
bool &is_fetch_timeout) // Is the log fetch timeout
{
int ret = OB_SUCCESS;
int64_t cur_time = get_timestamp();
int64_t svr_start_fetch_tstamp = OB_INVALID_TIMESTAMP;
is_fetch_timeout = false;
is_fetch_timeout_on_lagged_replica = false;
const ObLogFetcherConfig *cfg = nullptr;
if (OB_FAIL(get_fetcher_config(cfg))) {
@ -795,8 +793,6 @@ int LSFetchCtx::check_fetch_timeout(const common::ObAddr &svr,
} else {
// Partition timeout, after which time progress is not updated, it is considered to be a log fetch timeout
const int64_t ls_fetch_progress_update_timeout = cfg->ls_fetch_progress_update_timeout_sec * _SEC_;
// Timeout period for partitions on lagging replica, compared to normal timeout period
const int64_t ls_fetch_progress_update_timeout_for_lagged_replica = cfg->ls_fetch_progress_update_timeout_sec_for_lagged_replica * _SEC_;
// Get the starting log time on the current server
@ -812,7 +808,6 @@ int LSFetchCtx::check_fetch_timeout(const common::ObAddr &svr,
if (OB_INVALID_TIMESTAMP != cur_progress && cur_progress >= upper_limit) {
is_fetch_timeout = false;
is_fetch_timeout_on_lagged_replica = false;
} else {
// Consider the time at which logs start to be fetched on the server as a lower bound for progress updates
// Ensure that the ls stays on a server for a certain period of time
@ -835,24 +830,11 @@ int LSFetchCtx::check_fetch_timeout(const common::ObAddr &svr,
// long periods of non-updating progress and progress timeouts, where it is no longer necessary to determine if the machine is behind in its backup
if (progress_update_interval >= ls_fetch_progress_update_timeout) {
is_fetch_timeout = true;
is_fetch_timeout_on_lagged_replica = false;
} else {
// Before the progress timeout, verify that the server fetching the logs is a lagged replica, and if the logs are not fetched on the lagged replica for some time, then it is also considered a progress timeout
// Generally, the timeout for a lagged copy is less than the progress timeout
// ls_fetch_progress_update_timeout_for_lagged_replica < ls_fetch_progress_update_timeout
//
// How to define a long timeout for fetching logs on a lagged replica?
// 1. lagged replica: the next log does exist, but this server can't fetch it, indicating that this server is most likely behind the replica
// 2. When to start counting the timeout: from the time libobcdc confirms the existence of the next log
if (progress_update_interval >= ls_fetch_progress_update_timeout_for_lagged_replica) { // Progress update time over lagging replica configuration item
is_fetch_timeout = true;
is_fetch_timeout_on_lagged_replica = true;
}
}
if (is_fetch_timeout) {
LOG_INFO("[CHECK_PROGRESS_TIMEOUT]", K_(tls_id), K(svr),
K(is_fetch_timeout), K(is_fetch_timeout_on_lagged_replica),
K(is_fetch_timeout),
K(progress_update_interval),
K(progress_),
"update_limit", NTS_TO_STR(upper_limit),
@ -860,7 +842,7 @@ int LSFetchCtx::check_fetch_timeout(const common::ObAddr &svr,
"svr_start_fetch_tstamp", TS_TO_STR(svr_start_fetch_tstamp));
} else {
LOG_TRACE("[CHECK_PROGRESS_TIMEOUT]", K_(tls_id), K(svr),
K(is_fetch_timeout), K(is_fetch_timeout_on_lagged_replica),
K(is_fetch_timeout),
K(progress_update_interval),
K(progress_),
"update_limit", NTS_TO_STR(upper_limit),

View File

@ -214,8 +214,7 @@ public:
int check_fetch_timeout(const common::ObAddr &svr,
const int64_t upper_limit,
const int64_t fetcher_resume_tstamp,
bool &is_fetch_timeout, // Is the log fetch timeout
bool &is_fetch_timeout_on_lagged_replica); // Is the log fetch timeout on a lagged replica
bool &is_fetch_timeout); // Is the log fetch timeout
// Get the progress of a transaction
// 1. When there is a transaction ready to be sent, the timestamp of the transaction to be sent - 1 is taken as the progress of the sending

View File

@ -44,7 +44,7 @@ int64_t FetchStream::g_rpc_timeout = ObLogFetcherConfig::default_fetch_log_rpc_t
int64_t FetchStream::g_dml_progress_limit = ObLogFetcherConfig::default_progress_limit_sec_for_dml * _SEC_;
int64_t FetchStream::g_ddl_progress_limit = ObLogFetcherConfig::default_progress_limit_sec_for_ddl * _SEC_;
int64_t FetchStream::g_blacklist_survival_time = ObLogFetcherConfig::default_blacklist_survival_time_sec * _SEC_;
int64_t FetchStream::g_check_switch_server_interval = ObLogFetcherConfig::default_check_switch_server_interval_min * _MIN_;
int64_t FetchStream::g_check_switch_server_interval = ObLogFetcherConfig::default_check_switch_server_interval_sec * _SEC_;
bool FetchStream::g_print_rpc_handle_info = ObLogFetcherConfig::default_print_rpc_handle_info;
bool FetchStream::g_print_stream_dispatch_info = ObLogFetcherConfig::default_print_stream_dispatch_info;
int64_t FetchStream::g_schedule_time = ObLogFetcherConfig::default_timer_task_wait_time_msec * _MSEC_;
@ -282,7 +282,7 @@ void FetchStream::configure(const ObLogFetcherConfig &config)
int64_t dml_progress_limit_sec = config.progress_limit_sec_for_dml;
int64_t ddl_progress_limit_sec = config.progress_limit_sec_for_ddl;
int64_t blacklist_survival_time_sec = config.blacklist_survival_time_sec;
int64_t check_switch_server_interval_min = config.check_switch_server_interval_min;
int64_t check_switch_server_interval_sec = config.check_switch_server_interval_sec;
bool print_rpc_handle_info = config.print_rpc_handle_info;
bool print_stream_dispatch_info = config.print_stream_dispatch_info;
int64_t timer_task_wait_time_msec = config.timer_task_wait_time_msec;
@ -295,8 +295,8 @@ void FetchStream::configure(const ObLogFetcherConfig &config)
LOG_INFO("[CONFIG]", K(ddl_progress_limit_sec));
ATOMIC_STORE(&g_blacklist_survival_time, blacklist_survival_time_sec * _SEC_);
LOG_INFO("[CONFIG]", K(blacklist_survival_time_sec));
ATOMIC_STORE(&g_check_switch_server_interval, check_switch_server_interval_min * _MIN_);
LOG_INFO("[CONFIG]", K(check_switch_server_interval_min));
ATOMIC_STORE(&g_check_switch_server_interval, check_switch_server_interval_sec * _SEC_);
LOG_INFO("[CONFIG]", K(check_switch_server_interval_sec));
ATOMIC_STORE(&g_print_rpc_handle_info, print_rpc_handle_info);
LOG_INFO("[CONFIG]", K(print_rpc_handle_info));
ATOMIC_STORE(&g_print_stream_dispatch_info, print_stream_dispatch_info);
@ -1627,8 +1627,6 @@ int FetchStream::check_fetch_timeout_(LSFetchCtx &task, KickOutInfo &kick_out_in
{
int ret = OB_SUCCESS;
bool is_fetch_timeout = false;
bool is_fetch_timeout_on_lagged_replica = false;
// For lagging replica, the timeout of partition
int64_t fetcher_resume_tstamp = OB_INVALID_TIMESTAMP;
if (OB_ISNULL(stream_worker_)) {
@ -1637,12 +1635,11 @@ int FetchStream::check_fetch_timeout_(LSFetchCtx &task, KickOutInfo &kick_out_in
} else {
fetcher_resume_tstamp = stream_worker_->get_fetcher_resume_tstamp();
if (OB_FAIL(task.check_fetch_timeout(svr_, upper_limit_, fetcher_resume_tstamp,
is_fetch_timeout, is_fetch_timeout_on_lagged_replica))) {
if (OB_FAIL(task.check_fetch_timeout(svr_, upper_limit_, fetcher_resume_tstamp, is_fetch_timeout))) {
LOG_ERROR("check fetch timeout fail", KR(ret), K_(svr),
K(upper_limit_), "fetcher_resume_tstamp", TS_TO_STR(fetcher_resume_tstamp), K(task));
} else if (is_fetch_timeout) {
KickOutReason reason = is_fetch_timeout_on_lagged_replica ? PROGRESS_TIMEOUT_ON_LAGGED_REPLICA : PROGRESS_TIMEOUT;
KickOutReason reason = PROGRESS_TIMEOUT;
// If the partition fetch log times out, add it to the kick out collection
if (OB_FAIL(set_(kick_out_info, task.get_tls_id(), reason))) {
if (OB_ENTRY_EXIST == ret) {