restore 租户受控回放
This commit is contained in:
@ -447,47 +447,53 @@ int ObAllTenantInfoProxy::update_tenant_recovery_status_in_trans(
|
|||||||
LOG_WARN("meta tenant no need init tenant info", KR(ret), K(tenant_id));
|
LOG_WARN("meta tenant no need init tenant info", KR(ret), K(tenant_id));
|
||||||
} else {
|
} else {
|
||||||
SCN new_sync_scn = gen_new_sync_scn(old_tenant_info.get_sync_scn(), sync_scn, old_tenant_info.get_recovery_until_scn());
|
SCN new_sync_scn = gen_new_sync_scn(old_tenant_info.get_sync_scn(), sync_scn, old_tenant_info.get_recovery_until_scn());
|
||||||
SCN new_replay_scn = gen_new_replayable_scn(old_tenant_info.get_replayable_scn(), replay_scn, new_sync_scn);
|
SCN new_replayable_scn = gen_new_replayable_scn(old_tenant_info.get_replayable_scn(), replay_scn, new_sync_scn);
|
||||||
SCN new_scn = gen_new_standby_scn(old_tenant_info.get_standby_scn(), readable_scn, new_replay_scn);
|
SCN new_readable_scn = gen_new_standby_scn(old_tenant_info.get_standby_scn(), readable_scn, new_replayable_scn);
|
||||||
|
|
||||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
||||||
if (OB_UNLIKELY(!tenant_config.is_valid())) {
|
if (OB_UNLIKELY(!tenant_config.is_valid())) {
|
||||||
LOG_WARN_RET(OB_ERR_UNEXPECTED, "tenant config is invalid", K(tenant_id));
|
LOG_WARN_RET(OB_ERR_UNEXPECTED, "tenant config is invalid", K(tenant_id));
|
||||||
} else {
|
} else {
|
||||||
const int64_t MAX_GAP = tenant_config->_standby_max_replay_gap_time * 1000;
|
const int64_t MAX_GAP = tenant_config->_standby_max_replay_gap_time * 1000;
|
||||||
SCN new_standby_scn_plus_gap = SCN::plus(new_scn, MAX_GAP);
|
SCN new_readable_scn_plus_gap = SCN::plus(new_readable_scn, MAX_GAP);
|
||||||
if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) { // 10s
|
if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) { // 10s
|
||||||
const int64_t REAL_GAP = new_replay_scn.get_val_for_gts() - new_scn.get_val_for_gts();
|
const int64_t REAL_GAP = new_replayable_scn.get_val_for_gts() - new_readable_scn.get_val_for_gts();
|
||||||
const bool IS_MAX_GAP_REACHED = REAL_GAP > MAX_GAP ? true : false;
|
const bool IS_MAX_GAP_REACHED = REAL_GAP > MAX_GAP ? true : false;
|
||||||
LOG_INFO("tenant scn gap info", K(IS_MAX_GAP_REACHED), K(REAL_GAP), K(MAX_GAP), K(new_sync_scn),
|
LOG_INFO("tenant scn gap info", K(IS_MAX_GAP_REACHED), K(REAL_GAP), K(MAX_GAP), K(new_sync_scn),
|
||||||
K(new_replay_scn), K(new_scn), K(old_tenant_info));
|
K(new_replayable_scn), K(new_readable_scn), K(old_tenant_info));
|
||||||
}
|
}
|
||||||
if (old_tenant_info.is_standby()
|
if (!old_tenant_info.is_primary()
|
||||||
&& new_replay_scn.is_valid()
|
&& !old_tenant_info.get_max_ls_id().is_sys_ls()
|
||||||
&& new_standby_scn_plus_gap.is_valid()
|
&& new_replayable_scn.is_valid()
|
||||||
&& new_replay_scn > new_standby_scn_plus_gap) {
|
&& new_readable_scn_plus_gap.is_valid()
|
||||||
if (new_standby_scn_plus_gap >= old_tenant_info.get_replayable_scn()
|
&& new_replayable_scn > new_readable_scn_plus_gap
|
||||||
|
&& new_readable_scn_plus_gap >= old_tenant_info.get_replayable_scn()
|
||||||
&& old_tenant_info.get_standby_scn() > SCN::base_scn()) {
|
&& old_tenant_info.get_standby_scn() > SCN::base_scn()) {
|
||||||
// sys ls's readable_scn starts from base_scn
|
// condition: !old_tenant_info.get_max_ls_id().is_sys_ls()
|
||||||
// replayable_scn cannot start from base_scn, it's too slow when restore tenant
|
// If max_ls_id is sys ls, this logic is not needed.
|
||||||
// at the beginning time, replayable_scn should be sync_scn
|
// The goal of this logic is to minimize the difference of readable_scn among multiple ls
|
||||||
new_replay_scn = new_standby_scn_plus_gap;
|
|
||||||
}
|
// condition: old_tenant_info.get_standby_scn() > SCN::base_scn()
|
||||||
|
// This condition is for restore tenant
|
||||||
|
// sys ls's readable_scn/standby_scn starts from base_scn
|
||||||
|
// replayable_scn cannot start from base_scn, it's too slow when we restore tenant
|
||||||
|
// At the beginning time, replayable_scn should be sync_scn
|
||||||
|
new_replayable_scn = new_readable_scn_plus_gap;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (old_tenant_info.get_sync_scn() == new_sync_scn
|
if (old_tenant_info.get_sync_scn() == new_sync_scn
|
||||||
&& old_tenant_info.get_replayable_scn() == new_replay_scn
|
&& old_tenant_info.get_replayable_scn() == new_replayable_scn
|
||||||
&& old_tenant_info.get_standby_scn() == new_scn) {
|
&& old_tenant_info.get_standby_scn() == new_readable_scn) {
|
||||||
LOG_DEBUG("no need update", K(old_tenant_info), K(new_sync_scn), K(new_replay_scn), K(new_scn));
|
LOG_DEBUG("no need update", K(old_tenant_info), K(new_sync_scn), K(new_replayable_scn), K(new_readable_scn));
|
||||||
} else if (OB_FAIL(sql.assign_fmt(
|
} else if (OB_FAIL(sql.assign_fmt(
|
||||||
"update %s set sync_scn = %ld, replayable_scn = %ld, "
|
"update %s set sync_scn = %ld, replayable_scn = %ld, "
|
||||||
"readable_scn = %ld where tenant_id = %lu "
|
"readable_scn = %ld where tenant_id = %lu "
|
||||||
"and readable_scn <= replayable_scn and "
|
"and readable_scn <= replayable_scn and "
|
||||||
"replayable_scn <= sync_scn and sync_scn <= recovery_until_scn", OB_ALL_TENANT_INFO_TNAME,
|
"replayable_scn <= sync_scn and sync_scn <= recovery_until_scn", OB_ALL_TENANT_INFO_TNAME,
|
||||||
new_sync_scn.get_val_for_inner_table_field(),
|
new_sync_scn.get_val_for_inner_table_field(),
|
||||||
new_replay_scn.get_val_for_inner_table_field(),
|
new_replayable_scn.get_val_for_inner_table_field(),
|
||||||
new_scn.get_val_for_inner_table_field(),
|
new_readable_scn.get_val_for_inner_table_field(),
|
||||||
tenant_id))) {
|
tenant_id))) {
|
||||||
LOG_WARN("failed to assign sql", KR(ret), K(tenant_id), K(sql));
|
LOG_WARN("failed to assign sql", KR(ret), K(tenant_id), K(sql));
|
||||||
} else if (OB_FAIL(trans.write(exec_tenant_id, sql.ptr(), affected_rows))) {
|
} else if (OB_FAIL(trans.write(exec_tenant_id, sql.ptr(), affected_rows))) {
|
||||||
@ -498,7 +504,7 @@ int ObAllTenantInfoProxy::update_tenant_recovery_status_in_trans(
|
|||||||
}
|
}
|
||||||
|
|
||||||
LOG_TRACE("update_tenant_recovery_status", KR(ret), K(tenant_id), K(affected_rows),
|
LOG_TRACE("update_tenant_recovery_status", KR(ret), K(tenant_id), K(affected_rows),
|
||||||
K(sql), K(old_tenant_info), K(new_sync_scn), K(new_replay_scn), K(new_scn),
|
K(sql), K(old_tenant_info), K(new_sync_scn), K(new_replayable_scn), K(new_readable_scn),
|
||||||
K(sync_scn), K(replay_scn), K(readable_scn));
|
K(sync_scn), K(replay_scn), K(readable_scn));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
Reference in New Issue
Block a user