fix backup delete skip tablet timeout
This commit is contained in:
@ -1184,15 +1184,9 @@ int ObUserTenantBackupJobMgr::move_to_history_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
LOG_INFO("start to move backup job to history", KPC(job_attr_));
|
||||
ObMySQLTransaction trans;
|
||||
ObBackupSetTaskMgr set_task_mgr;
|
||||
ObTimeoutCtx timeout_ctx;
|
||||
if (is_sys_tenant(job_attr_->initiator_tenant_id_) && OB_FAIL(report_failed_to_initiator_())) {
|
||||
LOG_WARN("fail to report job finish to initiator tenant id", K(ret), KPC(job_attr_));
|
||||
} else if (OB_FAIL(trans.start(sql_proxy_, tenant_id_))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to start trans", K(ret));
|
||||
} else if (OB_FAIL(set_query_timeout_and_trx_timeout_(timeout_ctx))) {
|
||||
LOG_WARN("failed to set query timeout and trx timeout", K(ret));
|
||||
} else {
|
||||
if (OB_FAIL(set_task_mgr.init(tenant_id_, *job_attr_, *sql_proxy_,
|
||||
*rpc_proxy_, *task_scheduler_, *schema_service_, *backup_service_))) {
|
||||
@ -1201,43 +1195,30 @@ int ObUserTenantBackupJobMgr::move_to_history_()
|
||||
} else {
|
||||
LOG_WARN("[DATA_BACKUP]failed to init set task mgr", K(ret), KPC(job_attr_));
|
||||
}
|
||||
} else if (OB_FAIL(set_task_mgr.do_clean_up(trans))) {
|
||||
} else if (OB_FAIL(set_task_mgr.do_clean_up())) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to do clean up", K(ret), K(set_task_mgr));
|
||||
}
|
||||
|
||||
ObMySQLTransaction trans;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(job_attr_->tenant_id_)))) {
|
||||
LOG_WARN("failed to start trans", K(ret));
|
||||
} else if (OB_FAIL(ObBackupJobOperator::move_job_to_his(trans, job_attr_->tenant_id_, job_attr_->job_id_))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to move job to history", K(ret), KPC(job_attr_));
|
||||
}
|
||||
|
||||
if (trans.is_started()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
|
||||
LOG_WARN("failed to end trans", K(ret), K(tmp_ret));
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(trans.end(true))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to commit trans", K(ret));
|
||||
} else {
|
||||
LOG_INFO("succeed to move job to history. backup job finish", "tenant_id", job_attr_->tenant_id_, "job_id", job_attr_->job_id_);
|
||||
backup_service_->wakeup();
|
||||
}
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCCESS != (tmp_ret = trans.end(false))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to roll back status", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObUserTenantBackupJobMgr::set_query_timeout_and_trx_timeout_(ObTimeoutCtx &timeout_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t ob_query_timeout = 600 * 1000 * 1000; // 600s
|
||||
const int64_t ob_trx_timeout = 600 * 1000 * 1000; // 600s
|
||||
const int64_t abs_timeout = ObTimeUtility::current_time() + ob_query_timeout;
|
||||
if (OB_FAIL(timeout_ctx.set_trx_timeout_us(ob_trx_timeout))) {
|
||||
LOG_WARN("failed to set trx timeout us", K(ret), K(ob_trx_timeout));
|
||||
} else if (OB_FAIL(timeout_ctx.set_abs_timeout(abs_timeout))) {
|
||||
LOG_WARN("failed to set abs timeout", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -162,7 +162,6 @@ private:
|
||||
int check_dest_validity_();
|
||||
int cancel_();
|
||||
int update_set_task_to_canceling_();
|
||||
int set_query_timeout_and_trx_timeout_(ObTimeoutCtx &timeout_ctx);
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObUserTenantBackupJobMgr);
|
||||
};
|
||||
|
||||
@ -1707,24 +1707,34 @@ int ObBackupSetTaskMgr::do_backup_completing_log_(ObArray<ObBackupLSTaskAttr> &l
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBackupSetTaskMgr::do_clean_up(ObMySQLTransaction &trans)
|
||||
int ObBackupSetTaskMgr::do_clean_up()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMySQLTransaction trans;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("[DATA_BACKUP]not init", K(ret));
|
||||
} else if (OB_FAIL(backup_service_->check_leader())) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to check lease", K(ret));
|
||||
} else if (OB_FAIL(ObBackupSkippedTabletOperator::batch_move_skip_tablet(*sql_proxy_, set_task_attr_.tenant_id_, set_task_attr_.task_id_))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to move skip tablet", K(ret));
|
||||
} else if (OB_FAIL(trans.start(sql_proxy_, gen_meta_tenant_id(set_task_attr_.tenant_id_)))) {
|
||||
LOG_WARN("failed to start trans", K(ret));
|
||||
} else if (OB_FAIL(ObBackupLSTaskInfoOperator::move_ls_task_info_to_his(trans, set_task_attr_.task_id_,
|
||||
set_task_attr_.tenant_id_))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to move task to history", K(ret), K(set_task_attr_));
|
||||
} else if (OB_FAIL(ObBackupSkippedTabletOperator::move_skip_tablet_to_his(trans, set_task_attr_.tenant_id_, set_task_attr_.task_id_))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to move skip tablet to history", K(ret));
|
||||
} else if (OB_FAIL(ObBackupLSTaskOperator::move_ls_to_his(trans, set_task_attr_.tenant_id_, set_task_attr_.job_id_))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to move ls to history", K(ret), K(set_task_attr_));
|
||||
} else if (OB_FAIL(ObBackupTaskOperator::move_task_to_his(trans, set_task_attr_.tenant_id_, set_task_attr_.job_id_))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to move task to history", K(ret), K(set_task_attr_));
|
||||
}
|
||||
if (trans.is_started()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
|
||||
LOG_WARN("failed to end trans", K(ret), K(tmp_ret));
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -43,7 +43,7 @@ public:
|
||||
ObMultiVersionSchemaService &schema_service,
|
||||
ObBackupDataService &backup_service);
|
||||
int process();
|
||||
int do_clean_up(ObMySQLTransaction &trans);
|
||||
int do_clean_up();
|
||||
int deal_failed_set_task(ObMySQLTransaction &trans);
|
||||
share::ObBackupStatus::Status get_status() const { return set_task_attr_.status_.status_; }
|
||||
TO_STRING_KV(K_(meta_tenant_id), K_(set_task_attr));
|
||||
|
||||
@ -2215,6 +2215,50 @@ int ObBackupLSTaskOperator::update_max_tablet_checkpoint_scn(
|
||||
*--------------------------__all_backup_skipped_tablet------------------------------
|
||||
*/
|
||||
|
||||
int ObBackupSkippedTabletOperator::batch_move_skip_tablet(
|
||||
common::ObMySQLProxy &proxy, const uint64_t tenant_id, const int64_t task_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!is_user_tenant(tenant_id) || task_id <= 0) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(tenant_id), K(task_id));
|
||||
} else {
|
||||
ObSqlString sql;
|
||||
const int64_t DELETE_BATCH_NUM = 1024;
|
||||
int64_t affected_rows = 0;
|
||||
ObMySQLTransaction trans;
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(trans.start(&proxy, get_exec_tenant_id(tenant_id)))) {
|
||||
LOG_WARN("failed to start trans", K(ret));
|
||||
} else if (OB_FAIL(sql.assign_fmt(
|
||||
"insert into %s select * from %s where %s=%lu and %s=%lu order by turn_id, retry_id, tablet_id limit %ld",
|
||||
OB_ALL_BACKUP_SKIPPED_TABLET_HISTORY_TNAME, OB_ALL_BACKUP_SKIPPED_TABLET_TNAME,
|
||||
OB_STR_TENANT_ID, tenant_id, OB_STR_TASK_ID, task_id, DELETE_BATCH_NUM))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to init sql", K(ret));
|
||||
} else if (OB_FAIL(trans.write(get_exec_tenant_id(tenant_id), sql.ptr(), affected_rows))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to exec sql", K(ret), K(sql));
|
||||
} else if (0 == affected_rows) {
|
||||
break;
|
||||
} else if (OB_FALSE_IT(sql.reset())) {
|
||||
} else if (OB_FAIL(sql.assign_fmt(
|
||||
"delete from %s where %s=%lu and %s=%lu order by turn_id, retry_id, tablet_id limit %ld",
|
||||
OB_ALL_BACKUP_SKIPPED_TABLET_TNAME, OB_STR_TENANT_ID, tenant_id, OB_STR_TASK_ID, task_id, DELETE_BATCH_NUM))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to init sql", K(ret));
|
||||
} else if (OB_FAIL(trans.write(get_exec_tenant_id(tenant_id), sql.ptr(), affected_rows))) {
|
||||
LOG_WARN("[DATA_BACKUP]failed to exec sql", K(ret), K(sql));
|
||||
}
|
||||
if (trans.is_started()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(trans.end(OB_SUCC(ret)))) {
|
||||
LOG_WARN("failed to end trans", K(ret), K(tmp_ret));
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBackupSkippedTabletOperator::get_skip_tablet(
|
||||
common::ObISQLClient &proxy,
|
||||
const bool need_lock,
|
||||
|
||||
@ -47,6 +47,7 @@ private:
|
||||
class ObBackupSkippedTabletOperator : public ObBackupBaseTableOperator
|
||||
{
|
||||
public:
|
||||
static int batch_move_skip_tablet(common::ObMySQLProxy &proxy, const uint64_t tenant_id, const int64_t task_id);
|
||||
static int get_skip_tablet(common::ObISQLClient &proxy, const bool need_lock, const uint64_t tenant_id,
|
||||
const int64_t task_id, const share::ObBackupSkippedType skipped_type,
|
||||
common::hash::ObHashSet<ObBackupSkipTabletAttr> &skip_tablets);
|
||||
|
||||
Reference in New Issue
Block a user