|
|
|
|
@ -875,8 +875,6 @@ int ObStartCompleteMigrationTask::process()
|
|
|
|
|
LOG_WARN("failed to wait transfer table replace", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(check_all_tablet_ready_())) {
|
|
|
|
|
LOG_WARN("failed to check all tablet ready", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(wait_trans_tablet_explain_data_())) {
|
|
|
|
|
LOG_WARN("failed to wait log replay sync", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(wait_log_replay_to_max_minor_end_scn_())) {
|
|
|
|
|
LOG_WARN("failed to wait log replay to max minor end scn", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(update_ls_migration_status_hold_())) {
|
|
|
|
|
@ -897,6 +895,17 @@ int ObStartCompleteMigrationTask::process()
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int ObStartCompleteMigrationTask::get_wait_timeout_(int64_t &timeout)
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
timeout = 10_min;
|
|
|
|
|
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
|
|
|
|
|
if (tenant_config.is_valid()) {
|
|
|
|
|
timeout = tenant_config->_ls_migration_wait_completing_timeout;
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int ObStartCompleteMigrationTask::wait_log_sync_()
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
@ -905,9 +914,9 @@ int ObStartCompleteMigrationTask::wait_log_sync_()
|
|
|
|
|
bool is_need_rebuild = false;
|
|
|
|
|
palf::LSN last_end_lsn(0);
|
|
|
|
|
palf::LSN current_end_lsn(0);
|
|
|
|
|
const int64_t OB_CHECK_LOG_SYNC_INTERVAL = 200 * 1000; // 200ms
|
|
|
|
|
const int64_t CLOG_IN_SYNC_DELAY_TIMEOUT = 30 * 60 * 1000 * 1000; // 30 min
|
|
|
|
|
bool need_wait = true;
|
|
|
|
|
ObTimeoutCtx timeout_ctx;
|
|
|
|
|
int64_t timeout = 10_min;
|
|
|
|
|
|
|
|
|
|
if (!is_inited_) {
|
|
|
|
|
ret = OB_NOT_INIT;
|
|
|
|
|
@ -919,6 +928,10 @@ int ObStartCompleteMigrationTask::wait_log_sync_()
|
|
|
|
|
LOG_WARN("failed to check need wait log sync", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (!need_wait) {
|
|
|
|
|
FLOG_INFO("no need wait log sync", KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(get_wait_timeout_(timeout))) {
|
|
|
|
|
LOG_WARN("failed to get wait timeout", K(ret));
|
|
|
|
|
} else if (OB_FAIL(init_timeout_ctx_(timeout, timeout_ctx))) {
|
|
|
|
|
LOG_WARN("failed to init timeout ctx", K(ret));
|
|
|
|
|
} else {
|
|
|
|
|
#ifdef ERRSIM
|
|
|
|
|
SERVER_EVENT_SYNC_ADD("storage_ha", "wait_log_sync",
|
|
|
|
|
@ -930,7 +943,10 @@ int ObStartCompleteMigrationTask::wait_log_sync_()
|
|
|
|
|
int64_t current_ts = 0;
|
|
|
|
|
int64_t last_wait_replay_ts = ObTimeUtility::current_time();
|
|
|
|
|
while (OB_SUCC(ret) && !is_log_sync) {
|
|
|
|
|
if (OB_FAIL(check_ls_and_task_status_(ls))) {
|
|
|
|
|
if (timeout_ctx.is_timeouted()) {
|
|
|
|
|
ret = OB_LOG_NOT_SYNC;
|
|
|
|
|
LOG_WARN("already timeout", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(check_ls_and_task_status_(ls))) {
|
|
|
|
|
LOG_WARN("failed to check ls and task status", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(ls->is_in_sync(is_log_sync, is_need_rebuild))) {
|
|
|
|
|
LOG_WARN("failed to check is in sync", K(ret), KPC(ctx_));
|
|
|
|
|
@ -958,7 +974,7 @@ int ObStartCompleteMigrationTask::wait_log_sync_()
|
|
|
|
|
|
|
|
|
|
if (current_end_lsn == last_end_lsn) {
|
|
|
|
|
const int64_t current_ts = ObTimeUtility::current_time();
|
|
|
|
|
if ((current_ts - last_wait_replay_ts) > CLOG_IN_SYNC_DELAY_TIMEOUT) {
|
|
|
|
|
if ((current_ts - last_wait_replay_ts) > timeout) {
|
|
|
|
|
is_timeout = true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -968,7 +984,7 @@ int ObStartCompleteMigrationTask::wait_log_sync_()
|
|
|
|
|
} else {
|
|
|
|
|
ret = OB_LOG_NOT_SYNC;
|
|
|
|
|
STORAGE_LOG(WARN, "failed to check log replay sync. timeout, stop migration task",
|
|
|
|
|
K(ret), K(*ctx_), K(CLOG_IN_SYNC_DELAY_TIMEOUT), K(wait_replay_start_ts),
|
|
|
|
|
K(ret), K(*ctx_), K(timeout), K(wait_replay_start_ts),
|
|
|
|
|
K(current_ts), K(current_end_lsn));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -981,7 +997,7 @@ int ObStartCompleteMigrationTask::wait_log_sync_()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
ob_usleep(OB_CHECK_LOG_SYNC_INTERVAL);
|
|
|
|
|
ob_usleep(CHECK_CONDITION_INTERVAL);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -1004,13 +1020,12 @@ int ObStartCompleteMigrationTask::wait_log_replay_sync_()
|
|
|
|
|
bool wait_log_replay_success = false;
|
|
|
|
|
SCN current_replay_scn;
|
|
|
|
|
SCN last_replay_scn;
|
|
|
|
|
const int64_t OB_CHECK_LOG_REPLAY_INTERVAL = 200 * 1000; // 200ms
|
|
|
|
|
const int64_t CLOG_IN_REPLAY_DELAY_TIMEOUT = 10 * 60 * 1000 * 1000L; // 10 min
|
|
|
|
|
//TODO(muwei.ym) MAKE THIS TIME PARAM as hide configuration iterms
|
|
|
|
|
bool need_wait = false;
|
|
|
|
|
bool is_done = false;
|
|
|
|
|
const bool is_primay_tenant = MTL_IS_PRIMARY_TENANT();
|
|
|
|
|
share::SCN readable_scn;
|
|
|
|
|
ObTimeoutCtx timeout_ctx;
|
|
|
|
|
int64_t timeout = 10_min;
|
|
|
|
|
|
|
|
|
|
if (!is_inited_) {
|
|
|
|
|
ret = OB_NOT_INIT;
|
|
|
|
|
@ -1033,6 +1048,10 @@ int ObStartCompleteMigrationTask::wait_log_replay_sync_()
|
|
|
|
|
FLOG_INFO("no need wait replay log sync", KPC(ctx_));
|
|
|
|
|
} else if (!is_primay_tenant && OB_FAIL(ObStorageHAUtils::get_readable_scn_with_retry(readable_scn))) {
|
|
|
|
|
LOG_WARN("failed to get readable scn", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(get_wait_timeout_(timeout))) {
|
|
|
|
|
LOG_WARN("failed to get wait timeout", K(ret));
|
|
|
|
|
} else if (OB_FAIL(init_timeout_ctx_(timeout, timeout_ctx))) {
|
|
|
|
|
LOG_WARN("failed to init timeout ctx", K(ret));
|
|
|
|
|
} else {
|
|
|
|
|
#ifdef ERRSIM
|
|
|
|
|
SERVER_EVENT_SYNC_ADD("storage_ha", "wait_log_replay_sync",
|
|
|
|
|
@ -1045,7 +1064,10 @@ int ObStartCompleteMigrationTask::wait_log_replay_sync_()
|
|
|
|
|
int64_t current_ts = 0;
|
|
|
|
|
bool need_rebuild = false;
|
|
|
|
|
while (OB_SUCC(ret) && !wait_log_replay_success) {
|
|
|
|
|
if (OB_FAIL(check_ls_and_task_status_(ls))) {
|
|
|
|
|
if (timeout_ctx.is_timeouted()) {
|
|
|
|
|
ret = OB_WAIT_REPLAY_TIMEOUT;
|
|
|
|
|
LOG_WARN("already timeout", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(check_ls_and_task_status_(ls))) {
|
|
|
|
|
LOG_WARN("failed to check ls and task status", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(rebuild_service->check_ls_need_rebuild(ls->get_ls_id(), need_rebuild))) {
|
|
|
|
|
LOG_WARN("failed to check ls need rebuild", K(ret), KPC(ls));
|
|
|
|
|
@ -1081,7 +1103,7 @@ int ObStartCompleteMigrationTask::wait_log_replay_sync_()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (current_replay_scn == last_replay_scn) {
|
|
|
|
|
if (current_ts - last_replay_ts > CLOG_IN_REPLAY_DELAY_TIMEOUT) {
|
|
|
|
|
if (current_ts - last_replay_ts > timeout) {
|
|
|
|
|
is_timeout = true;
|
|
|
|
|
}
|
|
|
|
|
if (is_timeout) {
|
|
|
|
|
@ -1090,7 +1112,7 @@ int ObStartCompleteMigrationTask::wait_log_replay_sync_()
|
|
|
|
|
} else {
|
|
|
|
|
ret = OB_WAIT_REPLAY_TIMEOUT;
|
|
|
|
|
STORAGE_LOG(WARN, "failed to check log replay sync. timeout, stop migration task",
|
|
|
|
|
K(ret), K(*ctx_), K(CLOG_IN_REPLAY_DELAY_TIMEOUT), K(wait_replay_start_ts),
|
|
|
|
|
K(ret), K(*ctx_), K(timeout), K(wait_replay_start_ts),
|
|
|
|
|
K(current_ts), K(current_replay_scn));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -1104,7 +1126,7 @@ int ObStartCompleteMigrationTask::wait_log_replay_sync_()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
ob_usleep(OB_CHECK_LOG_REPLAY_INTERVAL);
|
|
|
|
|
ob_usleep(CHECK_CONDITION_INTERVAL);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -1125,6 +1147,9 @@ int ObStartCompleteMigrationTask::wait_transfer_table_replace_()
|
|
|
|
|
const int64_t check_all_tablet_start_ts = ObTimeUtility::current_time();
|
|
|
|
|
const bool need_initial_state = false;
|
|
|
|
|
bool need_wait_transfer_table_replace = false;
|
|
|
|
|
ObTimeoutCtx timeout_ctx;
|
|
|
|
|
int64_t timeout = 10_min;
|
|
|
|
|
|
|
|
|
|
if (!is_inited_) {
|
|
|
|
|
ret = OB_NOT_INIT;
|
|
|
|
|
LOG_WARN("start complete migration task do not init", K(ret));
|
|
|
|
|
@ -1135,6 +1160,10 @@ int ObStartCompleteMigrationTask::wait_transfer_table_replace_()
|
|
|
|
|
LOG_WARN("failed to check need wait transfer table replace", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (!need_wait_transfer_table_replace) {
|
|
|
|
|
LOG_INFO("no need wait transfer table replace", KPC(ls));
|
|
|
|
|
} else if (OB_FAIL(get_wait_timeout_(timeout))) {
|
|
|
|
|
LOG_WARN("failed to get wait timeout", K(ret));
|
|
|
|
|
} else if (OB_FAIL(init_timeout_ctx_(timeout, timeout_ctx))) {
|
|
|
|
|
LOG_WARN("failed to init timeout ctx", K(ret));
|
|
|
|
|
} else {
|
|
|
|
|
SERVER_EVENT_ADD("storage_ha", "wait_transfer_table_replace",
|
|
|
|
|
"tenant_id", ctx_->tenant_id_,
|
|
|
|
|
@ -1145,14 +1174,17 @@ int ObStartCompleteMigrationTask::wait_transfer_table_replace_()
|
|
|
|
|
LOG_WARN("failed to build tablet iter", K(ret), KPC(ctx_));
|
|
|
|
|
} else {
|
|
|
|
|
while (OB_SUCC(ret)) {
|
|
|
|
|
if (OB_FAIL(iter.get_next_tablet_id(tablet_id))) {
|
|
|
|
|
if (timeout_ctx.is_timeouted()) {
|
|
|
|
|
ret = OB_WAIT_TABLET_READY_TIMEOUT;
|
|
|
|
|
LOG_WARN("already timeout", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(iter.get_next_tablet_id(tablet_id))) {
|
|
|
|
|
if (OB_ITER_END == ret) {
|
|
|
|
|
ret = OB_SUCCESS;
|
|
|
|
|
break;
|
|
|
|
|
} else {
|
|
|
|
|
LOG_WARN("failed to get tablet id", K(ret));
|
|
|
|
|
}
|
|
|
|
|
} else if (OB_FAIL(check_tablet_transfer_table_ready_(tablet_id, ls))) {
|
|
|
|
|
} else if (OB_FAIL(check_tablet_transfer_table_ready_(tablet_id, ls, timeout))) {
|
|
|
|
|
LOG_WARN("failed to check tablet ready", K(ret), K(tablet_id), KPC(ls));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -1163,24 +1195,6 @@ int ObStartCompleteMigrationTask::wait_transfer_table_replace_()
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int ObStartCompleteMigrationTask::wait_trans_tablet_explain_data_()
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
ObLS *ls = nullptr;
|
|
|
|
|
logservice::ObLogService *log_service = nullptr;
|
|
|
|
|
|
|
|
|
|
if (!is_inited_) {
|
|
|
|
|
ret = OB_NOT_INIT;
|
|
|
|
|
LOG_WARN("start prepare migration task do not init", K(ret));
|
|
|
|
|
} else if (OB_ISNULL(ls = ls_handle_.get_ls())) {
|
|
|
|
|
ret = OB_ERR_UNEXPECTED;
|
|
|
|
|
LOG_WARN("ls should not be NULL", K(ret), KPC(ctx_));
|
|
|
|
|
} else {
|
|
|
|
|
//TODO(muwei.ym) wait log replay to max tablet minor sstable log ts in 4.3
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int ObStartCompleteMigrationTask::change_member_list_with_retry_()
|
|
|
|
|
{
|
|
|
|
|
//change to HOLD status do not allow failed
|
|
|
|
|
@ -1523,6 +1537,8 @@ int ObStartCompleteMigrationTask::check_all_tablet_ready_()
|
|
|
|
|
ObLS *ls = nullptr;
|
|
|
|
|
const int64_t check_all_tablet_start_ts = ObTimeUtility::current_time();
|
|
|
|
|
const bool need_initial_state = false;
|
|
|
|
|
ObTimeoutCtx timeout_ctx;
|
|
|
|
|
int64_t timeout = 10_min;
|
|
|
|
|
|
|
|
|
|
if (!is_inited_) {
|
|
|
|
|
ret = OB_NOT_INIT;
|
|
|
|
|
@ -1530,6 +1546,10 @@ int ObStartCompleteMigrationTask::check_all_tablet_ready_()
|
|
|
|
|
} else if (OB_ISNULL(ls = ls_handle_.get_ls())) {
|
|
|
|
|
ret = OB_ERR_UNEXPECTED;
|
|
|
|
|
LOG_WARN("failed to change member list", K(ret), KP(ls));
|
|
|
|
|
} else if (OB_FAIL(get_wait_timeout_(timeout))) {
|
|
|
|
|
LOG_WARN("failed to get wait timeout", K(ret));
|
|
|
|
|
} else if (OB_FAIL(init_timeout_ctx_(timeout, timeout_ctx))) {
|
|
|
|
|
LOG_WARN("failed to init timeout ctx", K(ret));
|
|
|
|
|
} else {
|
|
|
|
|
ObHALSTabletIDIterator iter(ls->get_ls_id(), need_initial_state);
|
|
|
|
|
ObTabletID tablet_id;
|
|
|
|
|
@ -1537,14 +1557,17 @@ int ObStartCompleteMigrationTask::check_all_tablet_ready_()
|
|
|
|
|
LOG_WARN("failed to build tablet iter", K(ret), KPC(ctx_));
|
|
|
|
|
} else {
|
|
|
|
|
while (OB_SUCC(ret)) {
|
|
|
|
|
if (OB_FAIL(iter.get_next_tablet_id(tablet_id))) {
|
|
|
|
|
if (timeout_ctx.is_timeouted()) {
|
|
|
|
|
ret = OB_WAIT_TABLET_READY_TIMEOUT;
|
|
|
|
|
LOG_WARN("already timeout", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(iter.get_next_tablet_id(tablet_id))) {
|
|
|
|
|
if (OB_ITER_END == ret) {
|
|
|
|
|
ret = OB_SUCCESS;
|
|
|
|
|
break;
|
|
|
|
|
} else {
|
|
|
|
|
LOG_WARN("failed to get tablet id", K(ret));
|
|
|
|
|
}
|
|
|
|
|
} else if (OB_FAIL(check_tablet_ready_(tablet_id, ls))) {
|
|
|
|
|
} else if (OB_FAIL(check_tablet_ready_(tablet_id, ls, timeout))) {
|
|
|
|
|
LOG_WARN("failed to check tablet ready", K(ret), K(tablet_id), KPC(ls));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -1557,12 +1580,11 @@ int ObStartCompleteMigrationTask::check_all_tablet_ready_()
|
|
|
|
|
|
|
|
|
|
int ObStartCompleteMigrationTask::check_tablet_ready_(
|
|
|
|
|
const common::ObTabletID &tablet_id,
|
|
|
|
|
ObLS *ls)
|
|
|
|
|
ObLS *ls,
|
|
|
|
|
const int64_t timeout)
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
const ObMDSGetTabletMode read_mode = ObMDSGetTabletMode::READ_WITHOUT_CHECK;
|
|
|
|
|
const int64_t OB_CHECK_TABLET_READY_INTERVAL = 200_ms;
|
|
|
|
|
const int64_t OB_CHECK_TABLET_READY_TIMEOUT = 30_min;
|
|
|
|
|
|
|
|
|
|
if (!is_inited_) {
|
|
|
|
|
ret = OB_NOT_INIT;
|
|
|
|
|
@ -1617,7 +1639,7 @@ int ObStartCompleteMigrationTask::check_tablet_ready_(
|
|
|
|
|
"current_ts", current_ts);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (current_ts - wait_tablet_start_ts < OB_CHECK_TABLET_READY_TIMEOUT) {
|
|
|
|
|
if (current_ts - wait_tablet_start_ts < timeout) {
|
|
|
|
|
} else {
|
|
|
|
|
if (OB_FAIL(ctx_->set_result(OB_WAIT_TABLET_READY_TIMEOUT, true /*allow_retry*/,
|
|
|
|
|
this->get_dag()->get_type()))) {
|
|
|
|
|
@ -1631,7 +1653,7 @@ int ObStartCompleteMigrationTask::check_tablet_ready_(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
ob_usleep(OB_CHECK_TABLET_READY_INTERVAL);
|
|
|
|
|
ob_usleep(CHECK_CONDITION_INTERVAL);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -1641,11 +1663,10 @@ int ObStartCompleteMigrationTask::check_tablet_ready_(
|
|
|
|
|
|
|
|
|
|
int ObStartCompleteMigrationTask::check_tablet_transfer_table_ready_(
|
|
|
|
|
const common::ObTabletID &tablet_id,
|
|
|
|
|
ObLS *ls)
|
|
|
|
|
ObLS *ls,
|
|
|
|
|
const int64_t timeout)
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
const int64_t OB_CHECK_TABLET_READY_INTERVAL = 200_ms;
|
|
|
|
|
const int64_t OB_CHECK_TABLET_READY_TIMEOUT = 30_min;
|
|
|
|
|
ObTransferService *transfer_service = nullptr;
|
|
|
|
|
|
|
|
|
|
if (!is_inited_) {
|
|
|
|
|
@ -1672,7 +1693,7 @@ int ObStartCompleteMigrationTask::check_tablet_transfer_table_ready_(
|
|
|
|
|
LOG_INFO("tablet has transfer table", K(ret), K(tablet_id), "ls_id", ls->get_ls_id());
|
|
|
|
|
const int64_t current_ts = ObTimeUtility::current_time();
|
|
|
|
|
transfer_service->wakeup();
|
|
|
|
|
if (current_ts - wait_tablet_start_ts < OB_CHECK_TABLET_READY_TIMEOUT) {
|
|
|
|
|
if (current_ts - wait_tablet_start_ts < timeout) {
|
|
|
|
|
} else {
|
|
|
|
|
if (OB_FAIL(ctx_->set_result(OB_WAIT_TABLET_READY_TIMEOUT, true /*allow_retry*/,
|
|
|
|
|
this->get_dag()->get_type()))) {
|
|
|
|
|
@ -1685,7 +1706,7 @@ int ObStartCompleteMigrationTask::check_tablet_transfer_table_ready_(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
ob_usleep(OB_CHECK_TABLET_READY_INTERVAL);
|
|
|
|
|
ob_usleep(CHECK_CONDITION_INTERVAL);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -1780,8 +1801,8 @@ int ObStartCompleteMigrationTask::wait_log_replay_to_max_minor_end_scn_()
|
|
|
|
|
ObLS *ls = nullptr;
|
|
|
|
|
bool need_wait = true;
|
|
|
|
|
SCN current_replay_scn = share::SCN::min_scn();
|
|
|
|
|
const int64_t OB_WAIT_LOG_REPLAY_INTERVAL = 200 * 1000; // 200ms
|
|
|
|
|
const int64_t OB_WAIT_LOG_REPLAY_TIMEOUT = 30 * 60 * 1000 * 1000L; // 30 min
|
|
|
|
|
ObTimeoutCtx timeout_ctx;
|
|
|
|
|
int64_t timeout = 10_min;
|
|
|
|
|
|
|
|
|
|
if (!is_inited_) {
|
|
|
|
|
ret = OB_NOT_INIT;
|
|
|
|
|
@ -1795,10 +1816,18 @@ int ObStartCompleteMigrationTask::wait_log_replay_to_max_minor_end_scn_()
|
|
|
|
|
LOG_WARN("failed to check need replay to max minor end scn", K(ret), KPC(ls), KPC(ctx_));
|
|
|
|
|
} else if (!need_wait) {
|
|
|
|
|
LOG_INFO("no need to wait ls checkpoint ts push", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(get_wait_timeout_(timeout))) {
|
|
|
|
|
LOG_WARN("failed to get wait timeout", K(ret));
|
|
|
|
|
} else if (OB_FAIL(init_timeout_ctx_(timeout, timeout_ctx))) {
|
|
|
|
|
LOG_WARN("failed to init timeout ctx", K(ret));
|
|
|
|
|
} else {
|
|
|
|
|
const int64_t wait_replay_start_ts = ObTimeUtility::current_time();
|
|
|
|
|
while (OB_SUCC(ret)) {
|
|
|
|
|
if (OB_FAIL(check_ls_and_task_status_(ls))) {
|
|
|
|
|
if (timeout_ctx.is_timeouted()) {
|
|
|
|
|
ret = OB_WAIT_REPLAY_TIMEOUT;
|
|
|
|
|
LOG_WARN("already timeout", K(ret), KPC(ctx_));
|
|
|
|
|
break;
|
|
|
|
|
} else if (OB_FAIL(check_ls_and_task_status_(ls))) {
|
|
|
|
|
LOG_WARN("failed to check ls and task status", K(ret), KPC(ctx_));
|
|
|
|
|
} else if (OB_FAIL(ls->get_max_decided_scn(current_replay_scn))) {
|
|
|
|
|
LOG_WARN("failed to get current replay log ts", K(ret), KPC(ctx_));
|
|
|
|
|
@ -1815,7 +1844,7 @@ int ObStartCompleteMigrationTask::wait_log_replay_to_max_minor_end_scn_()
|
|
|
|
|
"current_ts", current_ts);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (current_ts - wait_replay_start_ts < OB_WAIT_LOG_REPLAY_TIMEOUT) {
|
|
|
|
|
if (current_ts - wait_replay_start_ts < timeout) {
|
|
|
|
|
} else {
|
|
|
|
|
if (OB_FAIL(ctx_->set_result(OB_WAIT_REPLAY_TIMEOUT, true /*allow_retry*/,
|
|
|
|
|
this->get_dag()->get_type()))) {
|
|
|
|
|
@ -1828,7 +1857,7 @@ int ObStartCompleteMigrationTask::wait_log_replay_to_max_minor_end_scn_()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (OB_SUCC(ret)) {
|
|
|
|
|
ob_usleep(OB_WAIT_LOG_REPLAY_INTERVAL);
|
|
|
|
|
ob_usleep(CHECK_CONDITION_INTERVAL);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -1885,6 +1914,20 @@ int ObStartCompleteMigrationTask::record_server_event_()
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int ObStartCompleteMigrationTask::init_timeout_ctx_(
|
|
|
|
|
const int64_t timeout,
|
|
|
|
|
ObTimeoutCtx &timeout_ctx)
|
|
|
|
|
{
|
|
|
|
|
int ret = OB_SUCCESS;
|
|
|
|
|
if (timeout <= 0) {
|
|
|
|
|
ret = OB_INVALID_ARGUMENT;
|
|
|
|
|
LOG_WARN("get invalid args", K(ret), K(timeout));
|
|
|
|
|
} else if (OB_FAIL(timeout_ctx.set_timeout(timeout))) {
|
|
|
|
|
LOG_WARN("failed to set timeout", K(ret), K(timeout));
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/******************ObFinishCompleteMigrationDag*********************/
|
|
|
|
|
ObFinishCompleteMigrationDag::ObFinishCompleteMigrationDag()
|
|
|
|
|
: ObCompleteMigrationDag(share::ObDagType::DAG_TYPE_FINISH_COMPLETE_MIGRATION),
|
|
|
|
|
|