[CP] Add retry interval when transfer task failed

This commit is contained in:
ZhenNan0
2024-03-04 08:44:13 +00:00
committed by ob-robot
parent c6e5d0a116
commit 8ef6b21d51
10 changed files with 189 additions and 12 deletions

View File

@ -240,13 +240,38 @@ TEST_F(TestTenantTransferService, test_service)
ObString finished_part_list_str;
ASSERT_EQ(OB_SUCCESS, finished_part_list.to_display_str(allocator, finished_part_list_str));
LOG_WARN("finished_part_list", K(finished_part_list_str));
ASSERT_TRUE(0 == finished_part_list_str.case_compare("500002:500003,500002:500004,500016:500014,500016:500015"));
ASSERT_TRUE(0 == finished_part_list_str.case_compare("500002:500005,500002:500006,500016:500014,500016:500015"));
ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task, 0/*group_id*/));
create_time = OB_INVALID_TIMESTAMP;
finish_time = OB_INVALID_TIMESTAMP;
ObTransferTask history_task;
ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_history_task(inner_sql_proxy, g_tenant_id, task_id, history_task, create_time, finish_time));
ASSERT_TRUE(history_task.get_status().is_completed_status());
// test retry task with interval
sql.reset();
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("alter system set _transfer_task_retry_interval = '1h'"));
ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(g_tenant_id, sql.ptr(), affected_rows));
sql.reset();
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("update oceanbase.__all_transfer_task_history set status = 'FAILED' where task_id = %ld", init_task.get_task_id().id()));
ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(g_tenant_id, sql.ptr(), affected_rows));
ObTransferTask retry_task;
ObTransferTaskID retry_task_id(444);
ASSERT_EQ(OB_SUCCESS, gen_mock_data(retry_task_id, ObTransferStatus(ObTransferStatus::INIT), retry_task));
ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::insert(inner_sql_proxy, g_tenant_id, retry_task));
ASSERT_EQ(OB_NEED_RETRY, tenant_transfer->process_init_task_(retry_task_id));
ObTransferTask retry_task_after_process;
ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, retry_task_id, false, retry_task_after_process, 0/*group_id*/));
ASSERT_TRUE(WAIT_DUE_TO_LAST_FAILURE == retry_task_after_process.get_comment() && retry_task_after_process.get_status().is_init_status());
sleep(10); // 10s, still far less than the 1h retry interval set above
ASSERT_EQ(OB_NEED_RETRY, tenant_transfer->process_init_task_(retry_task_id)); // _transfer_task_retry_interval effect
retry_task_after_process.reset();
ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, retry_task_id, false, retry_task_after_process, 0/*group_id*/));
ASSERT_TRUE(WAIT_DUE_TO_LAST_FAILURE == retry_task_after_process.get_comment() && retry_task_after_process.get_status().is_init_status());
sql.reset();
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("alter system set _transfer_task_retry_interval = '0'"));
ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(g_tenant_id, sql.ptr(), affected_rows));
ASSERT_TRUE(OB_NEED_RETRY != tenant_transfer->process_init_task_(retry_task_id));
}
TEST_F(TestTenantTransferService, test_batch_part_list)

View File

@ -223,6 +223,11 @@ TEST_F(TestTransferTaskOperator, test_operator)
ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_max_task_id_from_history(sql_proxy, tenant_id_, max_task_id));
ASSERT_TRUE(!max_task_id.is_valid());
// get_last_task_by_balance_task_id when history is empty
ObTransferTask last_task;
int64_t finish_time = OB_INVALID_TIMESTAMP;
ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get_last_task_by_balance_task_id(sql_proxy, tenant_id_, ObBalanceTaskID(1), last_task, finish_time));
// insert
ObTransferTask other_task;
ObTransferTaskID other_task_id(222);
@ -250,7 +255,7 @@ TEST_F(TestTransferTaskOperator, test_operator)
// get_task_with_time
int64_t create_time = OB_INVALID_TIMESTAMP;
int64_t finish_time = OB_INVALID_TIMESTAMP;
finish_time = OB_INVALID_TIMESTAMP;
task.reset();
ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_task_with_time(sql_proxy, tenant_id_, task_id_, true, task, create_time, finish_time));
ASSERT_TRUE(OB_INVALID_TIMESTAMP != create_time);
@ -384,6 +389,12 @@ TEST_F(TestTransferTaskOperator, test_operator)
max_task_id.reset();
ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_max_task_id_from_history(sql_proxy, tenant_id_, max_task_id));
ASSERT_TRUE(max_task_id == task_id_);
last_task.reset();
finish_time = OB_INVALID_TIMESTAMP;
ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_last_task_by_balance_task_id(sql_proxy, tenant_id_, task_.get_balance_task_id(), last_task, finish_time));
ASSERT_TRUE(last_task.is_valid() && last_task.get_task_id() == task_id_);
ASSERT_TRUE(finish_time > 0);
}
} // namespace share

View File

@ -187,6 +187,7 @@ int ObTenantTransferService::process_init_task_(const ObTransferTaskID task_id)
ObTransferPartList lock_conflict_part_list;
ObDisplayTabletList table_lock_tablet_list;
ObTimeoutCtx ctx;
bool need_wait = false;
const int64_t start_time = ObTimeUtil::current_time();
TTS_INFO("start to process init task", K(task_id), K(start_time));
@ -215,6 +216,13 @@ int ObTenantTransferService::process_init_task_(const ObTransferTaskID task_id)
ret = OB_STATE_NOT_MATCH;
LOG_WARN("task status is not init", KR(ret), K(task));
} else if (FALSE_IT(ObCurTraceId::set(task.get_trace_id()))) {
} else if (OB_FAIL(check_if_need_wait_due_to_last_failure_(*sql_proxy_, task, need_wait))) {
LOG_WARN("check if need wait due to last failure failed", KR(ret), K(task), K(need_wait));
} else if (need_wait) {
result_comment = WAIT_DUE_TO_LAST_FAILURE;
ret = OB_NEED_RETRY;
TTS_INFO("last task failed, need to process task later",
KR(ret), K_(tenant_id), K(task), "result_comment", transfer_task_comment_to_str(result_comment));
} else if (OB_FAIL(check_ls_member_list_(
*sql_proxy_,
task.get_src_ls(),
@ -305,6 +313,58 @@ int ObTenantTransferService::process_init_task_(const ObTransferTaskID task_id)
return ret;
}
// Decides whether processing of a transfer task must be postponed because the
// newest history record of the same balance task is a failed transfer task that
// finished less than _transfer_task_retry_interval ago.
//
// @param [in] sql_proxy: sql client used to read the transfer task history
// @param [in] task: transfer task that is about to be processed
// @param [out] need_wait: true if the task has to be processed later
// @return
// - OB_SUCCESS: need_wait has been decided
// - OB_NOT_INIT: the service is not inited
// - OB_INVALID_ARGUMENT: task is invalid
// - OB_ERR_UNEXPECTED: the history lookup succeeded but produced an invalid task
// - other: error returned by the history lookup
int ObTenantTransferService::check_if_need_wait_due_to_last_failure_(
    common::ObISQLClient &sql_proxy,
    const ObTransferTask &task,
    bool &need_wait)
{
  int ret = OB_SUCCESS;
  need_wait = false;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
    return ret;
  }
  if (OB_UNLIKELY(!task.is_valid())) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid task", KR(ret), K(task));
    return ret;
  }

  // One minute is used when the tenant config is not available.
  int64_t wait_interval = 60 * 1000 * 1000L;
  {
    // The guard lives only while the parameter is being read.
    omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_));
    if (tenant_config.is_valid()) {
      wait_interval = tenant_config->_transfer_task_retry_interval;
    }
  }
  if (0 == wait_interval) {
    // A zero interval turns the waiting off, so the history is not queried at all.
    need_wait = false;
    return ret;
  }

  ObTransferTask last_task;
  int64_t finish_time = OB_INVALID_TIMESTAMP;
  if (OB_FAIL(ObTransferTaskOperator::get_last_task_by_balance_task_id(
      sql_proxy,
      tenant_id_,
      task.get_balance_task_id(),
      last_task,
      finish_time))) {
    if (OB_ENTRY_NOT_EXIST != ret) {
      LOG_WARN("get last task by balance task id failed",
          KR(ret), K(tenant_id_), K(task), K(last_task));
    } else {
      // No history for this balance task means there is no failure to wait for.
      ret = OB_SUCCESS;
      need_wait = false;
    }
    return ret;
  }
  if (OB_UNLIKELY(!last_task.is_valid())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("last task should be valid", KR(ret), K(task), K(last_task));
    return ret;
  }

  if (last_task.get_status().is_failed_status()) {
    // The clock is read only when the last task actually failed.
    const int64_t elapsed_since_finish = ObTimeUtil::current_time() - finish_time;
    if (elapsed_since_finish < wait_interval) {
      need_wait = true;
      LOG_TRACE("last task failed, need wait", KR(ret),
          K(task), K(last_task), K(finish_time), K(wait_interval));
    }
  }
  return ret;
}
ERRSIM_POINT_DEF(EN_TENANT_TRANSFER_CHECK_LS_MEMBER_LIST_NOT_SAME);
// 1.check leader member_lists of src_ls and dest_ls are same
@ -1625,7 +1685,7 @@ int ObTenantTransferService::update_comment_for_expected_errors_(
} else if (OB_TRANS_TIMEOUT == err || OB_TIMEOUT == err) {
actual_comment = TRANSACTION_TIMEOUT;
} else if (OB_NEED_RETRY == err) {
if (WAIT_FOR_MEMBER_LIST != result_comment && INACTIVE_SERVER_IN_MEMBER_LIST != result_comment) {
if (result_comment < EMPTY_COMMENT || result_comment >= MAX_COMMENT) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected comment with err", KR(ret), K(err), K(result_comment));
} else {

View File

@ -231,6 +231,10 @@ private:
int construct_ls_member_list_(
common::sqlclient::ObMySQLResult &res,
share::ObLSReplica::MemberList &ls_member_list);
// check whether the task has to wait because the last transfer task in history
// with the same balance_task_id failed within _transfer_task_retry_interval
//
// @param [in] sql_proxy: sql client
// @param [in] task: transfer task to be processed
// @param [out] need_wait: true if the task should be processed later
int check_if_need_wait_due_to_last_failure_(
common::ObISQLClient &sql_proxy,
const share::ObTransferTask &task,
bool &need_wait);
private:
static const int64_t IDLE_TIME_US = 10 * 1000 * 1000L; // 10s
static const int64_t BUSY_IDLE_TIME_US = 100 * 1000L; // 100ms

View File

@ -1556,22 +1556,28 @@ DEF_INT(_transfer_start_retry_count, OB_TENANT_PARAMETER, "3", "[0,64]",
DEF_TIME(_transfer_service_wakeup_interval, OB_TENANT_PARAMETER, "5m", "[1s,5m]",
"transfer service wakeup interval in errsim mode"
"transfer service wakeup interval in errsim mode. "
"Range: [1s, 5m]",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(_transfer_process_lock_tx_timeout, OB_TENANT_PARAMETER, "100s", "[30s,)",
"transaction timeout for locking and unlocking transfer task"
"transaction timeout for locking and unlocking transfer task. "
"Range: [30s, +∞)",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(_transfer_task_tablet_count_threshold, OB_TENANT_PARAMETER, "100", "(0,100]",
"Threshold for the count of tablets that can be processed by a transfer task"
"Threshold for the count of tablets that can be processed by a transfer task. "
"Range: (0, 100]",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_enable_active_txn_transfer, OB_TENANT_PARAMETER, "True",
"Specifies whether support transfer active tx",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// When the last transfer task in history of a balance task is failed, an init task of
// the same balance task is not processed until this interval has passed since the
// failed task finished. Setting it to 0 disables the wait.
DEF_TIME(_transfer_task_retry_interval, OB_TENANT_PARAMETER, "1m", "[0s,)",
"Retry interval after transfer task failure. "
"Range: [0s, +∞). Default: 1m",
ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// end of transfer
DEF_TIME(dump_data_dictionary_to_log_interval, OB_TENANT_PARAMETER, "24h", "(0s,]",

View File

@ -423,6 +423,7 @@ static const char* TRANSFER_TASK_COMMENT_ARRAY[] =
"Task canceled",
"Unable to process task due to transaction timeout",
"Unable to process task due to inactive server in member list",
"Wait to retry due to the last failure",
"Unknow"/*MAX_COMMENT*/
};

View File

@ -281,6 +281,7 @@ enum ObTransferTaskComment
TASK_CANCELED = 3,
TRANSACTION_TIMEOUT = 4,
INACTIVE_SERVER_IN_MEMBER_LIST = 5,
WAIT_DUE_TO_LAST_FAILURE = 6,
MAX_COMMENT
};

View File

@ -76,8 +76,8 @@ int ObTransferTaskOperator::get_task_with_time(
ObSqlString sql;
common::sqlclient::ObMySQLResult *res = NULL;
const bool with_time = true;
if (OB_FAIL(sql.assign_fmt("SELECT time_to_usec(gmt_create) AS create_time, "
"time_to_usec(gmt_modified) AS finish_time, * FROM %s WHERE task_id = %ld%s",
if (OB_FAIL(sql.assign_fmt("SELECT time_to_usec(gmt_create) AS create_time_int64, "
"time_to_usec(gmt_modified) AS finish_time_int64, * FROM %s WHERE task_id = %ld%s",
OB_ALL_TRANSFER_TASK_TNAME, task_id.id(), for_update ? " FOR UPDATE" : ""))) {
LOG_WARN("fail to assign sql", KR(ret), K(task_id), K(for_update));
} else if (OB_FAIL(sql_proxy.read(result, tenant_id, sql.ptr()))) {
@ -830,8 +830,8 @@ int ObTransferTaskOperator::parse_sql_result_(
common::ObCurTraceId::TraceId trace_id;
if (with_time) {
(void)GET_COL_IGNORE_NULL(res.get_int, "create_time", create_time);
(void)GET_COL_IGNORE_NULL(res.get_int, "finish_time", finish_time);
(void)GET_COL_IGNORE_NULL(res.get_int, "create_time_int64", create_time);
(void)GET_COL_IGNORE_NULL(res.get_int, "finish_time_int64", finish_time);
}
(void)GET_COL_IGNORE_NULL(res.get_int, "task_id", task_id);
(void)GET_COL_IGNORE_NULL(res.get_int, "src_ls", src_ls);
@ -981,8 +981,8 @@ int ObTransferTaskOperator::get_history_task(
ObSqlString sql;
bool with_time = true;
common::sqlclient::ObMySQLResult *res = NULL;
if (OB_FAIL(sql.assign_fmt("SELECT time_to_usec(gmt_create) AS create_time, "
"time_to_usec(gmt_modified) AS finish_time, * FROM %s WHERE task_id = %ld",
if (OB_FAIL(sql.assign_fmt("SELECT time_to_usec(create_time) AS create_time_int64, "
"time_to_usec(finish_time) AS finish_time_int64, * FROM %s WHERE task_id = %ld",
OB_ALL_TRANSFER_TASK_HISTORY_TNAME, task_id.id()))) {
LOG_WARN("fail to assign sql", KR(ret), K(task_id));
} else if (OB_FAIL(sql_proxy.read(result, tenant_id, sql.ptr()))) {
@ -1049,6 +1049,54 @@ int ObTransferTaskOperator::get_max_task_id_from_history(
return ret;
}
// Read the newest transfer task of a balance task from the history table.
//
// Rows of OB_ALL_TRANSFER_TASK_HISTORY_TNAME with the given balance_task_id are
// ordered by task_id descending and only the first row is parsed, so last_task
// is the history record with the largest task_id.
//
// @param [in] sql_proxy: sql client used to run the query
// @param [in] tenant_id: tenant whose history table is read
// @param [in] balance_task_id: balance task the transfer tasks belong to
// @param [out] last_task: history record with the largest task_id (reset on entry)
// @param [out] finish_time: time_to_usec(finish_time) of that record
//                           (set to OB_INVALID_TIMESTAMP on entry)
// @return OB_SUCCESS on success, OB_INVALID_ARGUMENT for an invalid tenant_id or
//         balance_task_id, OB_ENTRY_NOT_EXIST when no row matches, other codes
//         when the query or the parsing fails
int ObTransferTaskOperator::get_last_task_by_balance_task_id(
common::ObISQLClient &sql_proxy,
const uint64_t tenant_id,
const ObBalanceTaskID &balance_task_id,
ObTransferTask &last_task,
int64_t &finish_time)
{
int ret = OB_SUCCESS;
last_task.reset();
finish_time = OB_INVALID_TIMESTAMP;
// create_time is filled by parse_sql_result_ but is not handed back to the caller
int64_t create_time = OB_INVALID_TIMESTAMP;
const bool with_time = true;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !balance_task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(balance_task_id));
} else {
SMART_VAR(ObISQLClient::ReadResult, result) {
ObSqlString sql;
common::sqlclient::ObMySQLResult *res = NULL;
// the time columns are selected under *_int64 aliases next to "*"
if (OB_FAIL(sql.assign_fmt("SELECT time_to_usec(create_time) AS create_time_int64, "
"time_to_usec(finish_time) AS finish_time_int64, * FROM %s "
"WHERE balance_task_id = %ld order by task_id desc limit 1",
OB_ALL_TRANSFER_TASK_HISTORY_TNAME,
balance_task_id.id()))) {
LOG_WARN("fail to assign sql", KR(ret), K(balance_task_id));
} else if (OB_FAIL(sql_proxy.read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql));
} else if (OB_ISNULL(res = result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get mysql result failed", KR(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(res->next())) {
if (OB_ITER_END == ret) {
// empty result set: reported to the caller as a missing entry
ret = OB_ENTRY_NOT_EXIST;
LOG_TRACE("task not found", KR(ret), K(tenant_id), K(balance_task_id));
} else {
LOG_WARN("next failed", KR(ret), K(sql));
}
} else if (OB_FAIL(parse_sql_result_(*res, with_time, last_task, create_time, finish_time))) {
LOG_WARN("parse sql result failed", KR(ret), K(with_time), K(sql));
} else {
LOG_TRACE("get last task by balance task id from history", KR(ret),
K(tenant_id), K(balance_task_id), K(last_task), K(create_time), K(finish_time));
}
}
}
return ret;
}
int ObTransferTaskOperator::update_comment(
common::ObISQLClient &sql_proxy,
const uint64_t tenant_id,

View File

@ -407,6 +407,26 @@ public:
const uint64_t tenant_id,
ObTransferTaskID &max_task_id);
/*
* get last transfer task from __all_transfer_task_history by balance_task_id
*
* @param [in] sql_proxy: sql client
* @param [in] tenant_id: target tenant_id
* @param [in] balance_task_id: target balance task id
* @param [out] last_task: transfer task with max task_id in history for the balance task
* @param [out] finish_time: finish time of the last task, i.e. time_to_usec(finish_time)
* @return
* - OB_SUCCESS: successful
* - OB_INVALID_ARGUMENT: invalid tenant_id or balance_task_id
* - OB_ENTRY_NOT_EXIST: no history
* - other: failed
*/
static int get_last_task_by_balance_task_id(
common::ObISQLClient &sql_proxy,
const uint64_t tenant_id,
const ObBalanceTaskID &balance_task_id,
ObTransferTask &last_task,
int64_t &finish_time);
/*
* update comment in __all_transfer_task according to task_id
*

View File

@ -424,6 +424,7 @@ _transfer_service_wakeup_interval
_transfer_start_retry_count
_transfer_start_rpc_timeout
_transfer_start_trans_timeout
_transfer_task_retry_interval
_transfer_task_tablet_count_threshold
_tx_data_memory_limit_percentage
_tx_result_retention