diff --git a/mittest/simple_server/test_tenant_transfer_service.cpp b/mittest/simple_server/test_tenant_transfer_service.cpp index 0bdd557e4d..872bd6ca3e 100644 --- a/mittest/simple_server/test_tenant_transfer_service.cpp +++ b/mittest/simple_server/test_tenant_transfer_service.cpp @@ -240,13 +240,38 @@ TEST_F(TestTenantTransferService, test_service) ObString finished_part_list_str; ASSERT_EQ(OB_SUCCESS, finished_part_list.to_display_str(allocator, finished_part_list_str)); LOG_WARN("finished_part_list", K(finished_part_list_str)); - ASSERT_TRUE(0 == finished_part_list_str.case_compare("500002:500003,500002:500004,500016:500014,500016:500015")); + ASSERT_TRUE(0 == finished_part_list_str.case_compare("500002:500005,500002:500006,500016:500014,500016:500015")); ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task, 0/*group_id*/)); create_time = OB_INVALID_TIMESTAMP; finish_time = OB_INVALID_TIMESTAMP; ObTransferTask history_task; ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_history_task(inner_sql_proxy, g_tenant_id, task_id, history_task, create_time, finish_time)); ASSERT_TRUE(history_task.get_status().is_completed_status()); + + // test retry task with interval + sql.reset(); + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("alter system set _transfer_task_retry_interval = '1h'")); + ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(g_tenant_id, sql.ptr(), affected_rows)); + sql.reset(); + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("update oceanbase.__all_transfer_task_history set status = 'FAILED' where task_id = %ld", init_task.get_task_id().id())); + ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(g_tenant_id, sql.ptr(), affected_rows)); + ObTransferTask retry_task; + ObTransferTaskID retry_task_id(444); + ASSERT_EQ(OB_SUCCESS, gen_mock_data(retry_task_id, ObTransferStatus(ObTransferStatus::INIT), retry_task)); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::insert(inner_sql_proxy, g_tenant_id, retry_task)); + ASSERT_EQ(OB_NEED_RETRY, tenant_transfer->process_init_task_(retry_task_id)); + ObTransferTask retry_task_after_process; + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, retry_task_id, false, retry_task_after_process, 0/*group_id*/)); + ASSERT_TRUE(WAIT_DUE_TO_LAST_FAILURE == retry_task_after_process.get_comment() && retry_task_after_process.get_status().is_init_status()); + sleep(10); // 20s + ASSERT_EQ(OB_NEED_RETRY, tenant_transfer->process_init_task_(retry_task_id)); // _transfer_task_retry_interval effect + retry_task_after_process.reset(); + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, retry_task_id, false, retry_task_after_process, 0/*group_id*/)); + ASSERT_TRUE(WAIT_DUE_TO_LAST_FAILURE == retry_task_after_process.get_comment() && retry_task_after_process.get_status().is_init_status()); + sql.reset(); + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("alter system set _transfer_task_retry_interval = '0'")); + ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(g_tenant_id, sql.ptr(), affected_rows)); + ASSERT_TRUE(OB_NEED_RETRY != tenant_transfer->process_init_task_(retry_task_id)); } TEST_F(TestTenantTransferService, test_batch_part_list) diff --git a/mittest/simple_server/test_transfer_task_operator.cpp b/mittest/simple_server/test_transfer_task_operator.cpp index 21cd6bee0c..3d467fe205 100644 --- a/mittest/simple_server/test_transfer_task_operator.cpp +++ b/mittest/simple_server/test_transfer_task_operator.cpp @@ -223,6 +223,11 @@ TEST_F(TestTransferTaskOperator, test_operator) ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_max_task_id_from_history(sql_proxy, tenant_id_, max_task_id)); ASSERT_TRUE(!max_task_id.is_valid()); + // getget_last_task_by_balance_task_id when history is empty + ObTransferTask last_task; + int64_t finish_time = OB_INVALID_TIMESTAMP; + ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get_last_task_by_balance_task_id(sql_proxy, tenant_id_, ObBalanceTaskID(1), last_task, finish_time)); + // insert ObTransferTask other_task; ObTransferTaskID other_task_id(222); @@ -250,7 +255,7 @@ TEST_F(TestTransferTaskOperator, test_operator) // get_task_with_time int64_t create_time = OB_INVALID_TIMESTAMP; - int64_t finish_time = OB_INVALID_TIMESTAMP; + finish_time = OB_INVALID_TIMESTAMP; task.reset(); ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_task_with_time(sql_proxy, tenant_id_, task_id_, true, task, create_time, finish_time)); ASSERT_TRUE(OB_INVALID_TIMESTAMP != create_time); @@ -384,6 +389,12 @@ TEST_F(TestTransferTaskOperator, test_operator) max_task_id.reset(); ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_max_task_id_from_history(sql_proxy, tenant_id_, max_task_id)); ASSERT_TRUE(max_task_id == task_id_); + + last_task.reset(); + finish_time = OB_INVALID_TIMESTAMP; + ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_last_task_by_balance_task_id(sql_proxy, tenant_id_, task_.get_balance_task_id(), last_task, finish_time)); + ASSERT_TRUE(last_task.is_valid() && last_task.get_task_id() == task_id_); + ASSERT_TRUE(finish_time > 0); } } // namespace share diff --git a/src/rootserver/ob_tenant_transfer_service.cpp b/src/rootserver/ob_tenant_transfer_service.cpp index 4f416bfc34..a51898cd11 100644 --- a/src/rootserver/ob_tenant_transfer_service.cpp +++ b/src/rootserver/ob_tenant_transfer_service.cpp @@ -187,6 +187,7 @@ int ObTenantTransferService::process_init_task_(const ObTransferTaskID task_id) ObTransferPartList lock_conflict_part_list; ObDisplayTabletList table_lock_tablet_list; ObTimeoutCtx ctx; + bool need_wait = false; const int64_t start_time = ObTimeUtil::current_time(); TTS_INFO("start to process init task", K(task_id), K(start_time)); @@ -215,6 +216,13 @@ int ObTenantTransferService::process_init_task_(const ObTransferTaskID task_id) ret = OB_STATE_NOT_MATCH; LOG_WARN("task status is not init", KR(ret), K(task)); } else if (FALSE_IT(ObCurTraceId::set(task.get_trace_id()))) { + } else if (OB_FAIL(check_if_need_wait_due_to_last_failure_(*sql_proxy_, task, need_wait))) { + LOG_WARN("check if need wait due to last failure failed", KR(ret), K(task), K(need_wait)); + } else if (need_wait) { + result_comment = WAIT_DUE_TO_LAST_FAILURE; + ret = OB_NEED_RETRY; + TTS_INFO("last task failed, need to process task later", + KR(ret), K_(tenant_id), K(task), "result_comment", transfer_task_comment_to_str(result_comment)); } else if (OB_FAIL(check_ls_member_list_( *sql_proxy_, task.get_src_ls(), @@ -305,6 +313,58 @@ int ObTenantTransferService::process_init_task_(const ObTransferTaskID task_id) return ret; } +int ObTenantTransferService::check_if_need_wait_due_to_last_failure_( + common::ObISQLClient &sql_proxy, + const ObTransferTask &task, + bool &need_wait) +{ + int ret = OB_SUCCESS; + need_wait = false; + ObTransferTask last_task; + int64_t finish_time = OB_INVALID_TIMESTAMP; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (OB_UNLIKELY(!task.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid task", KR(ret), K(task)); + } else { + int64_t wait_interval = 60 * 1000 * 1000L; // default 1m + { + omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_)); + if (tenant_config.is_valid()) { + wait_interval = tenant_config->_transfer_task_retry_interval; + } + } // release guard + if (OB_FAIL(ret)) { + } else if (0 == wait_interval) { + need_wait = false; + } else if (OB_FAIL(ObTransferTaskOperator::get_last_task_by_balance_task_id( + sql_proxy, + tenant_id_, + task.get_balance_task_id(), + last_task, + finish_time))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + need_wait = false; + } else { + LOG_WARN("get last task by balance task id failed", + KR(ret), K(tenant_id_), K(task), K(last_task)); + } + } else if (OB_UNLIKELY(!last_task.is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("last task should be valid", KR(ret), K(task), K(last_task)); + } else if (last_task.get_status().is_failed_status() + && ObTimeUtil::current_time() - finish_time < wait_interval) { + need_wait = true; + LOG_TRACE("last task failed, need wait", KR(ret), + K(task), K(last_task), K(finish_time), K(wait_interval)); + } + } + return ret; +} + ERRSIM_POINT_DEF(EN_TENANT_TRANSFER_CHECK_LS_MEMBER_LIST_NOT_SAME); // 1.check leader member_lists of src_ls and dest_ls are same @@ -1625,7 +1685,7 @@ int ObTenantTransferService::update_comment_for_expected_errors_( } else if (OB_TRANS_TIMEOUT == err || OB_TIMEOUT == err) { actual_comment = TRANSACTION_TIMEOUT; } else if (OB_NEED_RETRY == err) { - if (WAIT_FOR_MEMBER_LIST != result_comment && INACTIVE_SERVER_IN_MEMBER_LIST != result_comment) { + if (result_comment < EMPTY_COMMENT || result_comment >= MAX_COMMENT) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected comment with err", KR(ret), K(err), K(result_comment)); } else { diff --git a/src/rootserver/ob_tenant_transfer_service.h b/src/rootserver/ob_tenant_transfer_service.h index 472039010e..c08fed8d3f 100644 --- a/src/rootserver/ob_tenant_transfer_service.h +++ b/src/rootserver/ob_tenant_transfer_service.h @@ -231,6 +231,10 @@ private: int construct_ls_member_list_( common::sqlclient::ObMySQLResult &res, share::ObLSReplica::MemberList &ls_member_list); + int check_if_need_wait_due_to_last_failure_( + common::ObISQLClient &sql_proxy, + const share::ObTransferTask &task, + bool &need_wait); private: static const int64_t IDLE_TIME_US = 10 * 1000 * 1000L; // 10s static const int64_t BUSY_IDLE_TIME_US = 100 * 1000L; // 100ms diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 80866c2e0e..7a08719552 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -1556,22 +1556,28 @@ DEF_INT(_transfer_start_retry_count, OB_TENANT_PARAMETER, "3", "[0,64]", DEF_TIME(_transfer_service_wakeup_interval, OB_TENANT_PARAMETER, "5m", "[1s,5m]", - "transfer service wakeup interval in errsim mode" + "transfer service wakeup interval in errsim mode. " "Range: [1s, 5m]", ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_TIME(_transfer_process_lock_tx_timeout, OB_TENANT_PARAMETER, "100s", "[30s,)", - "transaction timeout for locking and unlocking transfer task" + "transaction timeout for locking and unlocking transfer task. " "Range: [30s, +∞)", ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_INT(_transfer_task_tablet_count_threshold, OB_TENANT_PARAMETER, "100", "(0,100]", - "Threshold for the count of tablets that can be processed by a transfer task" + "Threshold for the count of tablets that can be processed by a transfer task. " "Range: (0, 100]", ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_BOOL(_enable_active_txn_transfer, OB_TENANT_PARAMETER, "True", "Specifies whether support transfer active tx", ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); + +DEF_TIME(_transfer_task_retry_interval, OB_TENANT_PARAMETER, "1m", "[0s,)", + "Retry interval after transfer task failure. " + "Range: [0s, +∞). Default: 1m", + ObParameterAttr(Section::ROOT_SERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); + // end of transfer DEF_TIME(dump_data_dictionary_to_log_interval, OB_TENANT_PARAMETER, "24h", "(0s,]", diff --git a/src/share/transfer/ob_transfer_info.cpp b/src/share/transfer/ob_transfer_info.cpp index 115a38d799..b30f1870c5 100644 --- a/src/share/transfer/ob_transfer_info.cpp +++ b/src/share/transfer/ob_transfer_info.cpp @@ -423,6 +423,7 @@ static const char* TRANSFER_TASK_COMMENT_ARRAY[] = "Task canceled", "Unable to process task due to transaction timeout", "Unable to process task due to inactive server in member list", + "Wait to retry due to the last failure", "Unknow"/*MAX_COMMENT*/ }; diff --git a/src/share/transfer/ob_transfer_info.h b/src/share/transfer/ob_transfer_info.h index fabea0164c..160c78b0ba 100644 --- a/src/share/transfer/ob_transfer_info.h +++ b/src/share/transfer/ob_transfer_info.h @@ -281,6 +281,7 @@ enum ObTransferTaskComment TASK_CANCELED = 3, TRANSACTION_TIMEOUT = 4, INACTIVE_SERVER_IN_MEMBER_LIST = 5, + WAIT_DUE_TO_LAST_FAILURE = 6, MAX_COMMENT }; diff --git a/src/share/transfer/ob_transfer_task_operator.cpp b/src/share/transfer/ob_transfer_task_operator.cpp index 81213839bb..125d8fced4 100644 --- a/src/share/transfer/ob_transfer_task_operator.cpp +++ b/src/share/transfer/ob_transfer_task_operator.cpp @@ -76,8 +76,8 @@ int ObTransferTaskOperator::get_task_with_time( ObSqlString sql; common::sqlclient::ObMySQLResult *res = NULL; const bool with_time = true; - if (OB_FAIL(sql.assign_fmt("SELECT time_to_usec(gmt_create) AS create_time, " - "time_to_usec(gmt_modified) AS finish_time, * FROM %s WHERE task_id = %ld%s", + if (OB_FAIL(sql.assign_fmt("SELECT time_to_usec(gmt_create) AS create_time_int64, " + "time_to_usec(gmt_modified) AS finish_time_int64, * FROM %s WHERE task_id = %ld%s", OB_ALL_TRANSFER_TASK_TNAME, task_id.id(), for_update ? " FOR UPDATE" : ""))) { LOG_WARN("fail to assign sql", KR(ret), K(task_id), K(for_update)); } else if (OB_FAIL(sql_proxy.read(result, tenant_id, sql.ptr()))) { @@ -830,8 +830,8 @@ int ObTransferTaskOperator::parse_sql_result_( common::ObCurTraceId::TraceId trace_id; if (with_time) { - (void)GET_COL_IGNORE_NULL(res.get_int, "create_time", create_time); - (void)GET_COL_IGNORE_NULL(res.get_int, "finish_time", finish_time); + (void)GET_COL_IGNORE_NULL(res.get_int, "create_time_int64", create_time); + (void)GET_COL_IGNORE_NULL(res.get_int, "finish_time_int64", finish_time); } (void)GET_COL_IGNORE_NULL(res.get_int, "task_id", task_id); (void)GET_COL_IGNORE_NULL(res.get_int, "src_ls", src_ls); @@ -981,8 +981,8 @@ int ObTransferTaskOperator::get_history_task( ObSqlString sql; bool with_time = true; common::sqlclient::ObMySQLResult *res = NULL; - if (OB_FAIL(sql.assign_fmt("SELECT time_to_usec(gmt_create) AS create_time, " - "time_to_usec(gmt_modified) AS finish_time, * FROM %s WHERE task_id = %ld", + if (OB_FAIL(sql.assign_fmt("SELECT time_to_usec(create_time) AS create_time_int64, " + "time_to_usec(finish_time) AS finish_time_int64, * FROM %s WHERE task_id = %ld", OB_ALL_TRANSFER_TASK_HISTORY_TNAME, task_id.id()))) { LOG_WARN("fail to assign sql", KR(ret), K(task_id)); } else if (OB_FAIL(sql_proxy.read(result, tenant_id, sql.ptr()))) { @@ -1049,6 +1049,54 @@ int ObTransferTaskOperator::get_max_task_id_from_history( return ret; } +int ObTransferTaskOperator::get_last_task_by_balance_task_id( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + const ObBalanceTaskID &balance_task_id, + ObTransferTask &last_task, + int64_t &finish_time) +{ + int ret = OB_SUCCESS; + last_task.reset(); + finish_time = OB_INVALID_TIMESTAMP; + int64_t create_time = OB_INVALID_TIMESTAMP; + const bool with_time = true; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !balance_task_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(balance_task_id)); + } else { + SMART_VAR(ObISQLClient::ReadResult, result) { + ObSqlString sql; + common::sqlclient::ObMySQLResult *res = NULL; + if (OB_FAIL(sql.assign_fmt("SELECT time_to_usec(create_time) AS create_time_int64, " + "time_to_usec(finish_time) AS finish_time_int64, * FROM %s " + "WHERE balance_task_id = %ld order by task_id desc limit 1", + OB_ALL_TRANSFER_TASK_HISTORY_TNAME, + balance_task_id.id()))) { + LOG_WARN("fail to assign sql", KR(ret), K(balance_task_id)); + } else if (OB_FAIL(sql_proxy.read(result, tenant_id, sql.ptr()))) { + LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql)); + } else if (OB_ISNULL(res = result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get mysql result failed", KR(ret), K(tenant_id), K(sql)); + } else if (OB_FAIL(res->next())) { + if (OB_ITER_END == ret) { + ret = OB_ENTRY_NOT_EXIST; + LOG_TRACE("task not found", KR(ret), K(tenant_id), K(balance_task_id)); + } else { + LOG_WARN("next failed", KR(ret), K(sql)); + } + } else if (OB_FAIL(parse_sql_result_(*res, with_time, last_task, create_time, finish_time))) { + LOG_WARN("parse sql result failed", KR(ret), K(with_time), K(sql)); + } else { + LOG_TRACE("get last task by balance task id from history", KR(ret), + K(tenant_id), K(balance_task_id), K(last_task), K(create_time), K(finish_time)); + } + } + } + return ret; +} + int ObTransferTaskOperator::update_comment( common::ObISQLClient &sql_proxy, const uint64_t tenant_id, diff --git a/src/share/transfer/ob_transfer_task_operator.h b/src/share/transfer/ob_transfer_task_operator.h index 09ae3735dd..86548cafcd 100644 --- a/src/share/transfer/ob_transfer_task_operator.h +++ b/src/share/transfer/ob_transfer_task_operator.h @@ -407,6 +407,26 @@ public: const uint64_t tenant_id, ObTransferTaskID &max_task_id); + /* + * get last transfer task from __all_transfer_task_history by balance_task_id + * + * @param [in] sql_proxy: sql client + * @param [in] tenant_id: target tenant_id + * @param [in] balance_task_id: target balance task id + * @param [out] last_task: transfer task with max transfer_id in history + * @param [out] finish_time: finish time of the last task + * @return + * - OB_SUCCESS: successful + * - OB_ENTRY_NOT_EXIST: no history + * - other: failed + */ + static int get_last_task_by_balance_task_id( + common::ObISQLClient &sql_proxy, + const uint64_t tenant_id, + const ObBalanceTaskID &balance_task_id, + ObTransferTask &last_task, + int64_t &finish_time); + /* * update comment in __all_transfer_task according to task_id * diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index 45a2b03230..81af566a5d 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -424,6 +424,7 @@ _transfer_service_wakeup_interval _transfer_start_retry_count _transfer_start_rpc_timeout _transfer_start_trans_timeout +_transfer_task_retry_interval _transfer_task_tablet_count_threshold _tx_data_memory_limit_percentage _tx_result_retention