[CP] Avoid insert user recover job repeatedly when switch RS

This commit is contained in:
hamstersox 2023-11-06 06:09:48 +00:00 committed by ob-robot
parent 5d11cc123e
commit 788aefd3f0
3 changed files with 79 additions and 29 deletions

View File

@ -21,6 +21,8 @@
#include "share/location_cache/ob_location_service.h"
#include "share/restore/ob_physical_restore_table_operator.h"
#include "share/restore/ob_import_util.h"
#include "storage/tablelock/ob_lock_inner_connection_util.h"
#include "observer/ob_inner_sql_connection.h"
using namespace oceanbase;
using namespace rootserver;
@ -233,11 +235,19 @@ int ObRecoverTableJobScheduler::sys_prepare_(share::ObRecoverTableJob &job)
LOG_WARN("failed to check target tenant version", K(ret));
} else if (OB_FAIL(helper.init(job.get_target_tenant_id()))) {
LOG_WARN("failed to init recover table persist helper", K(ret));
} else if (OB_FAIL(helper.get_recover_table_job_by_initiator(*sql_proxy_, job, target_job))) {
}
ObMySQLTransaction trans;
const uint64_t meta_tenant_id = gen_meta_tenant_id(job.get_target_tenant_id());
if (FAILEDx(trans.start(sql_proxy_, meta_tenant_id))) {
LOG_WARN("failed to start trans", K(ret));
} else if (OB_FAIL(lock_recover_table_(meta_tenant_id, trans))) {
LOG_WARN("failed to lock recover table", K(ret));
} else if (OB_FAIL(helper.get_recover_table_job_by_initiator(trans, job, target_job))) {
if (OB_ENTRY_NOT_EXIST == ret) {
if (OB_FAIL(helper.get_recover_table_job_history_by_initiator(*sql_proxy_, job, target_job))) {
if (OB_FAIL(helper.get_recover_table_job_history_by_initiator(trans, job, target_job))) {
if (OB_ENTRY_NOT_EXIST == ret) {
if (OB_FAIL(insert_user_job_(job, helper))) {
if (OB_FAIL(insert_user_job_(job, trans, helper))) {
LOG_WARN("failed to insert user job", K(ret), K(job));
} else {
ROOTSERVICE_EVENT_ADD("recover_table", "insert_user_job",
@ -252,6 +262,15 @@ int ObRecoverTableJobScheduler::sys_prepare_(share::ObRecoverTableJob &job)
LOG_WARN("failed to get target tenant recover table job", K(ret), K(job));
}
}
if (trans.is_started()) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
ret = OB_SUCC(ret) ? tmp_ret : ret;
LOG_WARN("failed to end trans", K(ret));
}
}
#ifdef ERRSIM
ret = OB_E(EventTable::EN_INSERT_USER_RECOVER_JOB_FAILED) OB_SUCCESS;
if (OB_FAIL(ret)) {
@ -265,13 +284,33 @@ int ObRecoverTableJobScheduler::sys_prepare_(share::ObRecoverTableJob &job)
return ret;
}
int ObRecoverTableJobScheduler::lock_recover_table_(
const uint64_t tenant_id, ObMySQLTransaction &trans)
{
int ret = OB_SUCCESS;
ObLockTableRequest recover_job_arg;
recover_job_arg.table_id_ = OB_ALL_RECOVER_TABLE_JOB_TID;
recover_job_arg.lock_mode_ = EXCLUSIVE;
recover_job_arg.timeout_us_ = 0; // try lock
recover_job_arg.op_type_ = IN_TRANS_COMMON_LOCK; // unlock when trans end
observer::ObInnerSQLConnection *conn = nullptr;
if (OB_ISNULL(conn = dynamic_cast<observer::ObInnerSQLConnection *>(trans.get_connection()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("conn is NULL", KR(ret));
} else if (OB_FAIL(transaction::tablelock::ObInnerConnectionLockUtil::lock_table(tenant_id, recover_job_arg, conn))) {
LOG_WARN("failed to lock table", K(ret));
}
return ret;
}
int ObRecoverTableJobScheduler::insert_user_job_(
const share::ObRecoverTableJob &job, share::ObRecoverTablePersistHelper &helper)
const share::ObRecoverTableJob &job,
ObMySQLTransaction &trans,
share::ObRecoverTablePersistHelper &helper)
{
int ret = OB_SUCCESS;
ObRecoverTableJob target_job;
ObMySQLTransaction trans;
uint64_t meta_tenant_id = gen_meta_tenant_id(job.get_target_tenant_id());
if (OB_FAIL(target_job.assign(job))) {
LOG_WARN("failed to assign target job", K(ret));
} else {
@ -280,27 +319,12 @@ int ObRecoverTableJobScheduler::insert_user_job_(
target_job.set_initiator_job_id(job.get_job_id());
target_job.set_target_tenant_id(target_job.get_tenant_id());
}
if (FAILEDx(trans.start(sql_proxy_, meta_tenant_id))) {
LOG_WARN("failed to start trans", K(ret));
} else {
int64_t job_id = 0;
if (OB_FAIL(ObLSBackupInfoOperator::get_next_job_id(trans, target_job.get_tenant_id(), job_id))) {
LOG_WARN("failed to get next job id", K(ret), "tenant_id", target_job.get_tenant_id());
} else if (OB_FALSE_IT(target_job.set_job_id(job_id))) {
} else if (OB_FAIL(helper.insert_recover_table_job(trans, target_job))) {
LOG_WARN("failed to insert initial recover table job", K(ret));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(trans.end(true))) {
LOG_WARN("failed to commit trans", K(ret));
}
} else {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
LOG_WARN("failed to rollback", K(ret), K(tmp_ret));
}
}
int64_t job_id = 0;
if (FAILEDx(ObLSBackupInfoOperator::get_next_job_id(trans, job.get_target_tenant_id(), job_id))) {
LOG_WARN("failed to get next job id", K(ret), "tenant_id", job.get_target_tenant_id());
} else if (OB_FALSE_IT(target_job.set_job_id(job_id))) {
} else if (OB_FAIL(helper.insert_recover_table_job(trans, target_job))) {
LOG_WARN("failed to insert initial recover table job", K(ret));
}
return ret;
@ -576,6 +600,8 @@ int ObRecoverTableJobScheduler::active_aux_tenant_(share::ObRecoverTableJob &job
*sql_proxy_, job.get_initiator_job_id(), job.get_initiator_tenant_id(), restore_history_info))) {
LOG_WARN("failed to get restore job history", K(ret),
"initiator_job_id", job.get_job_id(), "initiator_tenant_id", job.get_tenant_id());
} else if (OB_FAIL(ban_multi_version_recycling_(job, restore_history_info.restore_tenant_id_))) {
LOG_WARN("failed to ban multi version cecycling", K(ret));
} else if (OB_FAIL(failover_to_primary_(job, restore_history_info.restore_tenant_id_))) {
LOG_WARN("failed to failover to primary", K(ret), K(restore_history_info));
}
@ -600,6 +626,23 @@ int ObRecoverTableJobScheduler::active_aux_tenant_(share::ObRecoverTableJob &job
return ret;
}
int ObRecoverTableJobScheduler::ban_multi_version_recycling_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id)
{
int ret = OB_SUCCESS;
const int64_t tenant_id = aux_tenant_id;
const int64_t MAX_UNDO_RETENTION = 31536000; // 1 year
int64_t affected_row = 0;
ObSqlString sql;
MTL_SWITCH(OB_SYS_TENANT_ID) {
if (OB_FAIL(sql.assign_fmt("alter system set undo_retention = %ld", MAX_UNDO_RETENTION))) {
LOG_WARN("failed to assign fmt", K(ret));
} else if (OB_FAIL(sql_proxy_->write(tenant_id, sql.ptr(), affected_row))) {
LOG_WARN("failed to set undo retention", K(ret));
}
}
return ret;
}
int ObRecoverTableJobScheduler::failover_to_primary_(
share::ObRecoverTableJob &job, const uint64_t aux_tenant_id)
{

View File

@ -63,7 +63,11 @@ private:
void sys_process_(share::ObRecoverTableJob &job);
int check_target_tenant_version_(share::ObRecoverTableJob &job);
int sys_prepare_(share::ObRecoverTableJob &job);
int insert_user_job_(const share::ObRecoverTableJob &job, share::ObRecoverTablePersistHelper &helper);
int lock_recover_table_(const uint64_t tenant_id, ObMySQLTransaction &trans);
int insert_user_job_(
const share::ObRecoverTableJob &job,
ObMySQLTransaction &trans,
share::ObRecoverTablePersistHelper &helper);
int recovering_(share::ObRecoverTableJob &job);
int sys_finish_(const share::ObRecoverTableJob &job);
int drop_aux_tenant_(const share::ObRecoverTableJob &job);
@ -73,6 +77,7 @@ private:
int restore_aux_tenant_(share::ObRecoverTableJob &job);
int check_aux_tenant_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id);
int active_aux_tenant_(share::ObRecoverTableJob &job);
int ban_multi_version_recycling_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id);
int failover_to_primary_(share::ObRecoverTableJob &job, const uint64_t aux_tenant_id);
int check_tenant_compatibility(
share::schema::ObSchemaGetterGuard &aux_tenant_guard,

View File

@ -35,7 +35,9 @@ public:
// please think carefully about circular dependencies before adding inner table into the white list
static bool in_inner_table_lock_white_list(const uint64_t inner_table_id)
{
return share::OB_ALL_BALANCE_JOB_TID == inner_table_id;
bool b_ret = share::OB_ALL_BALANCE_JOB_TID == inner_table_id
|| share::OB_ALL_RECOVER_TABLE_JOB_TID == inner_table_id;
return b_ret;
}
/*
* lock inner table in trans with internal_sql_execute_timeout