During failover, in the prepare phase, execute recover cancel

This commit is contained in:
obdev
2023-02-24 13:17:01 +00:00
committed by ob-robot
parent 5ef5dd6fae
commit c8bb7bbeb8
10 changed files with 87 additions and 50 deletions

View File

@ -55,7 +55,7 @@ int ObTenantInfoLoader::init()
} else if (OB_ISNULL(GCTX.sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql proxy is null", KR(ret));
} else if (OB_FAIL(create(thread_cnt, "TenantInfoLoader"))) {
} else if (OB_FAIL(create(thread_cnt, "TenantInf"))) {
LOG_WARN("failed to create tenant info loader thread", KR(ret), K(thread_cnt));
}
}

View File

@ -306,6 +306,9 @@ int ObTenantRecoveryReportor::update_ls_recovery(ObLS *ls, common::ObMySQLProxy
K(first_proposal_id), K(second_proposal_id),
K(ls_recovery_stat));
}
LOG_TRACE("tenant update ls recovery stat", KR(ret), K(role),
K(first_proposal_id), K(second_proposal_id),
K(ls_recovery_stat));
}
return ret;

View File

@ -10,7 +10,7 @@
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX RS
#define USING_LOG_PREFIX STANDBY
#include "ob_tenant_role_transition_service.h"
#include "logservice/palf/log_define.h"
#include "share/scn.h"
@ -25,6 +25,7 @@
#include "share/location_cache/ob_location_service.h"//get ls leader
#include "share/ob_schema_status_proxy.h"//set_schema_status
#include "storage/tx/ob_timestamp_service.h" // ObTimestampService
#include "share/ob_primary_standby_service.h" // ObPrimaryStandbyService
namespace oceanbase
{
@ -194,6 +195,8 @@ int ObTenantRoleTransitionService::do_prepare_flashback_(share::ObAllTenantInfo
{
int ret = OB_SUCCESS;
DEBUG_SYNC(BEFORE_PREPARE_FLASHBACK);
if (OB_FAIL(check_inner_stat())) {
LOG_WARN("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_), KP(rpc_proxy_));
} else if (OB_UNLIKELY(!(tenant_info.is_prepare_flashback_for_failover_to_primary_status()
@ -306,6 +309,14 @@ int ObTenantRoleTransitionService::do_prepare_flashback_for_failover_to_primary_
LOG_WARN("tenant switchover status not valid", KR(ret), K(tenant_info), K_(switchover_epoch));
} else if (OB_FAIL(update_tenant_stat_info_())) {
LOG_WARN("failed to update tenant stat info", KR(ret), K(tenant_info), K_(switchover_epoch));
} else if (OB_FAIL(OB_PRIMARY_STANDBY_SERVICE.do_recover_tenant(tenant_id_,
share::PREPARE_FLASHBACK_FOR_FAILOVER_TO_PRIMARY_SWITCHOVER_STATUS,
obrpc::ObRecoverTenantArg::RecoverType::CANCEL,
SCN::min_scn()))) {
LOG_WARN("failed to do_recover_tenant", KR(ret), K_(tenant_id));
// override the error code and USER_ERROR so that the recover-specific error message is not printed
ret = OB_ERR_UNEXPECTED;
LOG_USER_ERROR(OB_ERR_UNEXPECTED, "can not do recover cancel for tenant, failed to failover to primary");
} else if (OB_FAIL(ObAllTenantInfoProxy::update_tenant_switchover_status(
tenant_id_, sql_proxy_, tenant_info.get_switchover_epoch(),
tenant_info.get_switchover_status(), share::FLASHBACK_SWITCHOVER_STATUS))) {
@ -824,7 +835,7 @@ int ObTenantRoleTransitionService::switchover_update_tenant_status(
if (OB_FAIL(ret)) {
} else if (OB_FAIL(ObAllTenantInfoProxy::update_tenant_status(tenant_id,
&trans,
trans,
new_role,
old_status,
new_status,

View File

@ -396,6 +396,7 @@ class ObString;
ACT(BEFORE_DO_FLASHBACK,)\
ACT(PREPARE_FLASHBACK_FOR_SWITCH_TO_PRIMARY,)\
ACT(SWITCHING_TO_STANDBY,)\
ACT(BEFORE_PREPARE_FLASHBACK,)\
ACT(BEFORE_LS_RESTORE_SYS_TABLETS,)\
ACT(BEFORE_WAIT_RESTORE_SYS_TABLETS,)\
ACT(BEFORE_WAIT_RESTORE_TABLETS_META,)\

View File

@ -19,7 +19,6 @@
#include "rootserver/ob_rs_event_history_table_operator.h" // ROOTSERVICE_EVENT_ADD
#include "rootserver/ob_tenant_role_transition_service.h" // ObTenantRoleTransitionService
#include "rootserver/ob_primary_ls_service.h"//ObTenantLSInfo
#include "share/restore/ob_log_restore_source_mgr.h" // ObLogRestoreSourceMgr
#include "share/ls/ob_ls_recovery_stat_operator.h"// ObLSRecoveryStatOperator
#include "share/ls/ob_ls_life_manager.h" //ObLSLifeAgentManager
#include "share/ls/ob_ls_operator.h" //ObLSAttr
@ -243,7 +242,8 @@ int ObPrimaryStandbyService::recover_tenant(const obrpc::ObRecoverTenantArg &arg
LOG_WARN("invalid arg", K(arg), KR(ret));
} else if (OB_FAIL(get_target_tenant_id(arg.get_tenant_name(), arg.get_exec_tenant_id(), tenant_id))) {
LOG_WARN("failed to get_target_tenant_id", KR(ret), K(tenant_id), K(arg));
} else if (OB_FAIL(do_recover_tenant(arg, tenant_id))) {
} else if (OB_FAIL(do_recover_tenant(tenant_id, share::NORMAL_SWITCHOVER_STATUS, arg.get_type(),
arg.get_recovery_until_scn()))) {
LOG_WARN("failed to do_recover_tenant", KR(ret), K(tenant_id), K(arg));
}
@ -255,7 +255,11 @@ int ObPrimaryStandbyService::recover_tenant(const obrpc::ObRecoverTenantArg &arg
return ret;
}
int ObPrimaryStandbyService::do_recover_tenant(const obrpc::ObRecoverTenantArg &arg, const uint64_t tenant_id)
int ObPrimaryStandbyService::do_recover_tenant(
const uint64_t tenant_id,
const share::ObTenantSwitchoverStatus &working_sw_status,
const obrpc::ObRecoverTenantArg::RecoverType &recover_type,
const share::SCN &recovery_until_scn)
{
int ret = OB_SUCCESS;
ObAllTenantInfo tenant_info;
@ -267,9 +271,10 @@ int ObPrimaryStandbyService::do_recover_tenant(const obrpc::ObRecoverTenantArg &
ObLSRecoveryStat sys_ls_recovery;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("inner stat error", KR(ret), K_(inited));
} else if (!arg.is_valid()) {
} else if (!obrpc::ObRecoverTenantArg::is_valid(recover_type, recovery_until_scn)
|| !working_sw_status.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(arg), KR(ret));
LOG_WARN("invalid arg", K(recover_type), K(recovery_until_scn), KR(ret));
} else if (OB_ISNULL(GCTX.srv_rpc_proxy_) || OB_ISNULL(schema_service_) || OB_ISNULL(sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pointer is null", KR(ret), KP(GCTX.srv_rpc_proxy_), KP(schema_service_), KP(sql_proxy_));
@ -282,7 +287,7 @@ int ObPrimaryStandbyService::do_recover_tenant(const obrpc::ObRecoverTenantArg &
LOG_WARN("failed to get tenant info", KR(ret), K(tenant_id));
} else if (OB_ISNULL(tenant_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant_schema is null", KR(ret), K(tenant_id), K(arg));
LOG_WARN("tenant_schema is null", KR(ret), K(tenant_id), K(recover_type), K(recovery_until_scn));
} else if (OB_FAIL(trans.start(sql_proxy_, exec_tenant_id))) {
LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id), K(tenant_id));
} else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id, &trans, true, tenant_info))) {
@ -291,37 +296,34 @@ int ObPrimaryStandbyService::do_recover_tenant(const obrpc::ObRecoverTenantArg &
ret = OB_OP_NOT_ALLOW;
LOG_WARN("tenant role is not STANDBY", K(tenant_info));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "tenant role is not STANDBY, recover is");
} else if (!tenant_info.is_normal_status()) {
} else if (tenant_info.get_switchover_status() != working_sw_status) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("tenant switchover_status is not NORMAL", K(tenant_info));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "tenant switchover_status is not NORMAL, recover is");
LOG_WARN("unexpected tenant switchover status", KR(ret), K(working_sw_status), K(tenant_info));
} else if (OB_FAIL(ls_recovery_operator.get_ls_recovery_stat(tenant_id, share::SYS_LS,
true /*for_update*/, sys_ls_recovery, trans))) {
LOG_WARN("failed to get ls recovery stat", KR(ret), K(tenant_id));
} else if (obrpc::ObRecoverTenantArg::RecoverType::UNTIL == arg.get_type()
&& (arg.get_recovery_until_scn() < tenant_info.get_sync_scn()
|| arg.get_recovery_until_scn() < sys_ls_recovery.get_sync_scn())) {
} else if (obrpc::ObRecoverTenantArg::RecoverType::UNTIL == recover_type
&& (recovery_until_scn < tenant_info.get_sync_scn()
|| recovery_until_scn < sys_ls_recovery.get_sync_scn())) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("recover before tenant sync_scn or SYS LS sync_scn is not allow", KR(ret), K(tenant_info),
K(tenant_id), K(arg), K(sys_ls_recovery));
K(tenant_id), K(recover_type), K(recovery_until_scn), K(sys_ls_recovery));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "recover before tenant sync_scn or SYS LS sync_scn is");
} else if (tenant_schema->is_normal()) {
ObLogRestoreSourceMgr restore_source_mgr;
const SCN &recovery_until_scn = obrpc::ObRecoverTenantArg::RecoverType::UNTIL == arg.get_type() ?
arg.get_recovery_until_scn() : SCN::max(tenant_info.get_sync_scn(), sys_ls_recovery.get_sync_scn());
if (tenant_info.get_recovery_until_scn() == recovery_until_scn) {
LOG_WARN("recovery_until_scn is same with original", KR(ret), K(tenant_info), K(tenant_id), K(arg));
} else if (OB_FAIL(restore_source_mgr.init(tenant_id, &trans))) {
LOG_WARN("failed to init restore_source_mgr", KR(ret), K(tenant_id), K(arg));
} else if (OB_FAIL(restore_source_mgr.update_recovery_until_scn(recovery_until_scn))) {
LOG_WARN("failed to update_recovery_until_scn", KR(ret), K(tenant_id), K(arg));
const SCN &recovery_until_scn_to_set = obrpc::ObRecoverTenantArg::RecoverType::UNTIL == recover_type ?
recovery_until_scn : SCN::max(tenant_info.get_sync_scn(), sys_ls_recovery.get_sync_scn());
if (tenant_info.get_recovery_until_scn() == recovery_until_scn_to_set) {
LOG_WARN("recovery_until_scn is same with original", KR(ret), K(tenant_info), K(tenant_id),
K(recover_type), K(recovery_until_scn));
} else if (OB_FAIL(ObAllTenantInfoProxy::update_tenant_recovery_until_scn(
tenant_id, trans, tenant_info.get_switchover_epoch(), recovery_until_scn))) {
LOG_WARN("failed to update_tenant_recovery_until_scn", KR(ret), K(tenant_id), K(arg));
tenant_id, trans, tenant_info.get_switchover_epoch(), recovery_until_scn_to_set))) {
LOG_WARN("failed to update_tenant_recovery_until_scn", KR(ret), K(tenant_id), K(recover_type),
K(recovery_until_scn), K(recovery_until_scn_to_set));
}
} else {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("tenant status is not normal, recover is not allowed", KR(ret), K(tenant_id), K(arg), KPC(tenant_schema));
LOG_WARN("tenant status is not normal, recover is not allowed", KR(ret), K(tenant_id),
K(recover_type), K(recovery_until_scn), KPC(tenant_schema));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "tenant status is not normal, recover is");
}

View File

@ -66,6 +66,21 @@ public:
int write_upgrade_barrier_log(ObMySQLTransaction &trans, const uint64_t tenant_id, const uint64_t data_version);
/**
* @description:
* do recover tenant
* @param[in] tenant_id recover tenant_id
* @param[in] working_sw_status switchover status the tenant is expected to be in;
*            if the tenant's current switchover status differs, OB_OP_NOT_ALLOW is returned
* @param[in] recover_type recover type UNTIL/CANCEL
* @param[in] recovery_until_scn SCN to recover until; must pass
*            ObRecoverTenantArg::is_valid(recover_type, recovery_until_scn), i.e. be the
*            min SCN for CANCEL, otherwise OB_INVALID_ARGUMENT is returned.
*            For CANCEL the SCN actually set is the max of tenant sync_scn and SYS LS sync_scn
* @return return code
*/
int do_recover_tenant(
const uint64_t tenant_id,
const share::ObTenantSwitchoverStatus &working_sw_status,
const obrpc::ObRecoverTenantArg::RecoverType &recover_type,
const share::SCN &recovery_until_scn);
private:
int check_inner_stat_();
@ -78,15 +93,6 @@ private:
*/
int failover_to_primary(const uint64_t tenant_id, const obrpc::ObSwitchTenantArg::OpType &switch_optype);
/**
* @description:
* do recover tenant
* @param[in] arg recover switch arguments
* @param[in] tenant_id recover tenant_id
* @return return code
*/
int do_recover_tenant(const obrpc::ObRecoverTenantArg &arg, const uint64_t tenant_id);
/**
* @description:
* get target tenant_id from tenant_name to operate

View File

@ -5527,7 +5527,7 @@ int ObRecoverTenantArg::init(
const SCN &recovery_until_scn)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == exec_tenant_id || !is_valid_(type, recovery_until_scn))) {
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == exec_tenant_id || !is_valid(type, recovery_until_scn))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(exec_tenant_id), K(type), K(recovery_until_scn));
} else {

View File

@ -6354,18 +6354,19 @@ public:
const share::SCN &recovery_until_scn);
bool is_valid() const {
return OB_INVALID_TENANT_ID != exec_tenant_id_
&& is_valid_(type_, recovery_until_scn_);
&& is_valid(type_, recovery_until_scn_);
}
// Checks that a recover type and its SCN form a consistent pair:
//  - UNTIL  requires recovery_until_scn.is_valid_and_not_min()
//  - CANCEL requires recovery_until_scn.is_min()
// Any other combination returns false.
static bool is_valid(const RecoverType &type, const share::SCN &recovery_until_scn) {
return ((RecoverType::UNTIL == type && recovery_until_scn.is_valid_and_not_min())
|| (RecoverType::CANCEL == type && recovery_until_scn.is_min()));
}
int assign(const ObRecoverTenantArg &other);
void set_stmt_str(const ObString &stmt_str) { stmt_str_ = stmt_str; }
TO_STRING_KV(K_(exec_tenant_id), K_(tenant_name), K_(type), K_(recovery_until_scn), K_(stmt_str));
private:
bool is_valid_(const RecoverType type, const share::SCN &recovery_until_scn) const {
return ((RecoverType::UNTIL == type && recovery_until_scn.is_valid_and_not_min())
|| (RecoverType::CANCEL == type && recovery_until_scn.is_min()));
}
#define Property_declare_var(variable_type, variable_name)\
private:\

View File

@ -24,6 +24,7 @@
#include "common/ob_timeout_ctx.h"//ObTimeoutCtx
#include "rootserver/ob_root_utils.h"//ObRootUtils
#include "rootserver/ob_rs_event_history_table_operator.h" // ROOTSERVICE_EVENT_ADD
#include "share/restore/ob_log_restore_source_mgr.h" // ObLogRestoreSourceMgr
using namespace oceanbase;
using namespace oceanbase::common;
@ -552,6 +553,7 @@ int ObAllTenantInfoProxy::update_tenant_recovery_until_scn(
ObTimeoutCtx ctx;
ObLSRecoveryStatOperator ls_recovery_operator;
ObLSRecoveryStat sys_ls_recovery;
ObLogRestoreSourceMgr restore_source_mgr;
if (!is_user_tenant(tenant_id) || OB_INVALID_VERSION == switchover_epoch) {
ret = OB_INVALID_ARGUMENT;
@ -582,7 +584,13 @@ int ObAllTenantInfoProxy::update_tenant_recovery_until_scn(
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "state changed, check sync_scn and switchover status, recover is");
} else if (!is_single_row(affected_rows)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expect updating one row", KR(ret), K(affected_rows), K(sql));
LOG_WARN("expect updating one row", KR(ret), K(affected_rows),
K(switchover_epoch), K(recovery_until_scn), K(sql));
// update __all_log_restore_source
} else if (OB_FAIL(restore_source_mgr.init(tenant_id, &trans))) {
LOG_WARN("failed to init restore_source_mgr", KR(ret), K(tenant_id), K(recovery_until_scn));
} else if (OB_FAIL(restore_source_mgr.update_recovery_until_scn(recovery_until_scn))) {
LOG_WARN("failed to update_recovery_until_scn", KR(ret), K(tenant_id), K(recovery_until_scn));
}
int64_t cost = ObTimeUtility::current_time() - begin_time;
@ -595,7 +603,7 @@ int ObAllTenantInfoProxy::update_tenant_recovery_until_scn(
int ObAllTenantInfoProxy::update_tenant_status(
const uint64_t tenant_id,
ObISQLClient *proxy,
common::ObMySQLTransaction &trans,
const ObTenantRole new_role,
const ObTenantSwitchoverStatus &old_status,
const ObTenantSwitchoverStatus &new_status,
@ -612,9 +620,9 @@ int ObAllTenantInfoProxy::update_tenant_status(
int64_t affected_rows = 0;
ObTimeoutCtx ctx;
int64_t new_switchover_epoch = OB_INVALID_VERSION;
ObLogRestoreSourceMgr restore_source_mgr;
if (OB_UNLIKELY(OB_ISNULL(proxy)
|| !is_user_tenant(tenant_id)
if (OB_UNLIKELY(!is_user_tenant(tenant_id)
|| !new_role.is_valid()
|| !old_status.is_valid()
|| !new_status.is_valid()
@ -625,7 +633,7 @@ int ObAllTenantInfoProxy::update_tenant_status(
|| !recovery_until_scn.is_valid_and_not_min()
|| OB_INVALID_VERSION == old_switchover_epoch)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant_info is invalid", KR(ret), KP(proxy), K(tenant_id), K(new_role), K(old_status),
LOG_WARN("tenant_info is invalid", KR(ret), K(tenant_id), K(new_role), K(old_status),
K(new_status), K(sync_scn), K(replayable_scn), K(readable_scn), K(recovery_until_scn),
K(old_switchover_epoch));
} else if (OB_FAIL(get_new_switchover_epoch_(old_switchover_epoch, old_status, new_status,
@ -656,7 +664,7 @@ int ObAllTenantInfoProxy::update_tenant_status(
replayable_scn.get_val_for_inner_table_field(),
readable_scn.get_val_for_inner_table_field()))) {
LOG_WARN("failed to assign sql", KR(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(proxy->write(exec_tenant_id, sql.ptr(), affected_rows))) {
} else if (OB_FAIL(trans.write(exec_tenant_id, sql.ptr(), affected_rows))) {
LOG_WARN("failed to execute sql", KR(ret), K(exec_tenant_id), K(sql));
} else if (0 == affected_rows) {
ret = OB_NEED_RETRY;
@ -664,6 +672,11 @@ int ObAllTenantInfoProxy::update_tenant_status(
} else if (!is_single_row(affected_rows)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expect updating one row", KR(ret), K(affected_rows), K(sql));
// update __all_log_restore_source
} else if (OB_FAIL(restore_source_mgr.init(tenant_id, &trans))) {
LOG_WARN("failed to init restore_source_mgr", KR(ret), K(tenant_id), K(recovery_until_scn));
} else if (OB_FAIL(restore_source_mgr.update_recovery_until_scn(recovery_until_scn))) {
LOG_WARN("failed to update_recovery_until_scn", KR(ret), K(tenant_id), K(recovery_until_scn));
}
ObAllTenantInfo tenant_info;

View File

@ -211,7 +211,7 @@ public:
*/
static int update_tenant_status(
const uint64_t tenant_id,
ObISQLClient *proxy,
common::ObMySQLTransaction &trans,
const ObTenantRole new_role,
const ObTenantSwitchoverStatus &old_status,
const ObTenantSwitchoverStatus &new_status,