[CP] add new group list for storage

This commit is contained in:
godyangfight
2023-12-18 22:42:51 +00:00
committed by ob-robot
parent 1e07e4f406
commit 21d4366ea5
29 changed files with 236 additions and 161 deletions

View File

@ -83,23 +83,25 @@ PhysicalRestoreStatus ObPhysicalRestoreTableOperator::get_restore_status(
ObPhysicalRestoreTableOperator::ObPhysicalRestoreTableOperator()
: inited_(false),
sql_client_(NULL), tenant_id_(OB_INVALID_TENANT_ID)
sql_client_(NULL), tenant_id_(OB_INVALID_TENANT_ID), group_id_(0)
{
}
int ObPhysicalRestoreTableOperator::init(common::ObISQLClient *sql_client,
const uint64_t tenant_id)
const uint64_t tenant_id,
const int32_t group_id)
{
int ret = OB_SUCCESS;
if (inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("physical restore table operator init twice", K(ret));
} else if (OB_ISNULL(sql_client) || is_meta_tenant(tenant_id)) {
} else if (OB_ISNULL(sql_client) || is_meta_tenant(tenant_id) || group_id < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("sql client is null or tenant id is invalid", KR(ret), KP(sql_client), K(tenant_id));
LOG_WARN("sql client is null or tenant id is invalid", KR(ret), KP(sql_client), K(tenant_id), K(group_id));
} else {
sql_client_ = sql_client;
tenant_id_ = tenant_id;
group_id_ = group_id;
inited_ = true;
}
return ret;
@ -124,7 +126,7 @@ int ObPhysicalRestoreTableOperator::insert_job(const ObPhysicalRestoreJob &job_i
LOG_WARN("fail to fill dml splicer", KR(ret), K(tenant_id_), K(job_info));
} else if (OB_FAIL(dml.splice_batch_insert_sql(OB_ALL_RESTORE_JOB_TNAME, sql))) {
LOG_WARN("splice_insert_sql failed", KR(ret));
} else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), affected_rows))) {
} else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) {
LOG_WARN("execute sql failed", KR(ret), K(exec_tenant_id), K(sql));
} else if (affected_rows <= 0) {
ret = OB_ERR_UNEXPECTED;
@ -357,7 +359,7 @@ int ObPhysicalRestoreTableOperator::get_jobs(
} else if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s ORDER BY job_id, name",
OB_ALL_RESTORE_JOB_TNAME))) {
LOG_WARN("failed to assign sql", K(ret));
} else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr()))) {
} else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr(), group_id_))) {
LOG_WARN("execute sql failed", K(ret), K(exec_tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
@ -748,7 +750,7 @@ int ObPhysicalRestoreTableOperator::check_job_exist(
} else if (OB_FAIL(sql.assign_fmt("SELECT count(*) as count FROM %s WHERE job_id = %ld",
OB_ALL_RESTORE_JOB_TNAME, job_id))) {
LOG_WARN("failed to assign sql", K(ret));
} else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr()))) {
} else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr(), group_id_))) {
LOG_WARN("execute sql failed", K(ret), K(exec_tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
@ -785,7 +787,7 @@ int ObPhysicalRestoreTableOperator::get_job(
} else if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE job_id = %ld",
OB_ALL_RESTORE_JOB_TNAME, job_id))) {
LOG_WARN("failed to assign sql", K(ret));
} else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr()))) {
} else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr(), group_id_))) {
LOG_WARN("execute sql failed", K(ret), K(exec_tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
@ -853,7 +855,7 @@ int ObPhysicalRestoreTableOperator::update_job_error_info(
to_cstring(addr), to_cstring(trace_id),
job_id))) {
LOG_WARN("failed to set sql", K(ret), K(mod_str), K(return_ret), K(trace_id), K(addr));
} else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), affected_rows))) {
} else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) {
LOG_WARN("execute sql failed", K(sql), KR(ret), K(exec_tenant_id));
} else if (!is_single_row(affected_rows)
&& !is_zero_row(affected_rows)) {
@ -888,7 +890,7 @@ int ObPhysicalRestoreTableOperator::update_job_status(
"AND name = 'status' AND value != 'RESTORE_FAIL'",
OB_ALL_RESTORE_JOB_TNAME, status_str, job_id))) {
LOG_WARN("fail to assign fmt", K(ret), K(sql));
} else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), affected_rows))) {
} else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) {
LOG_WARN("execute sql failed", K(sql), K(ret), K(exec_tenant_id));
} else if (!is_single_row(affected_rows)
&& !is_zero_row(affected_rows)) {
@ -918,7 +920,7 @@ int ObPhysicalRestoreTableOperator::remove_job(
LOG_WARN("failed to add pk column", K(ret), K(job_id));
} else if (OB_FAIL(dml.splice_delete_sql(OB_ALL_RESTORE_JOB_TNAME, sql))) {
LOG_WARN("splice_delete_sql failed", K(ret));
} else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), affected_rows))) {
} else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) {
LOG_WARN("execute sql failed", K(sql), K(ret), K(exec_tenant_id));
} else {
// no need to check affected_rows
@ -1017,7 +1019,7 @@ int ObPhysicalRestoreTableOperator::get_restore_job_by_sql_(
} else {
SMART_VAR(common::ObMySQLProxy::MySQLResult, res) {
common::sqlclient::ObMySQLResult *result = NULL;
if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr()))) {
if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr(), group_id_))) {
LOG_WARN("execute sql failed", K(ret), K(exec_tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
@ -1082,7 +1084,7 @@ int ObPhysicalRestoreTableOperator::check_finish_restore_to_consistent_scn(
"left join %s as b on a.ls_id = b.ls_id",
OB_ALL_LS_STATUS_TNAME, OB_ALL_LS_META_TABLE_TNAME))) {
LOG_WARN("failed to assign sql", K(ret));
} else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr()))) {
} else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr(), group_id_))) {
LOG_WARN("execute sql failed", KR(ret), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;

View File

@ -41,7 +41,7 @@ public:
* @param[in] tenant_id of restore job, maybe sys or user tenant
* @param[in] sql client
*/
int init(common::ObISQLClient *sql_client, const uint64_t tenant_id);
int init(common::ObISQLClient *sql_client, const uint64_t tenant_id, const int32_t group_id);
/*
* description: insert into __all_restore_job
* @param[in] restore job
@ -160,6 +160,7 @@ private:
bool inited_;
common::ObISQLClient *sql_client_;
uint64_t tenant_id_;
int32_t group_id_;
DISALLOW_COPY_AND_ASSIGN(ObPhysicalRestoreTableOperator);
};
@ -188,7 +189,7 @@ int ObPhysicalRestoreTableOperator::update_restore_option(
SHARE_LOG(WARN, "fail to add column", KR(ret), K(option_value));
} else if (OB_FAIL(dml.splice_update_sql(OB_ALL_RESTORE_JOB_TNAME, sql))) {
SHARE_LOG(WARN, "splice_insert_sql failed", KR(ret));
} else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), affected_rows))) {
} else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) {
SHARE_LOG(WARN, "execute sql failed", K(sql), KR(ret), K(exec_tenant_id));
} else if (affected_rows <= 0) {
ret = OB_ERR_UNEXPECTED;

View File

@ -678,7 +678,7 @@ int ObHisRestoreJobPersistInfo::init_initiator_job_history(
* ------------------------------ObRestorePersistHelper---------------------
*/
ObRestorePersistHelper::ObRestorePersistHelper()
: is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID)
: is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), group_id_(0)
{
}
@ -688,7 +688,7 @@ uint64_t ObRestorePersistHelper::get_exec_tenant_id() const
return gen_meta_tenant_id(tenant_id_);
}
int ObRestorePersistHelper::init(const uint64_t tenant_id)
int ObRestorePersistHelper::init(const uint64_t tenant_id, const int32_t group_id)
{
int ret = OB_SUCCESS;
if(!is_sys_tenant(tenant_id) && !is_user_tenant(tenant_id)) {
@ -696,6 +696,7 @@ int ObRestorePersistHelper::init(const uint64_t tenant_id)
LOG_WARN("invalid tenant id", K(ret), K(tenant_id));
} else {
tenant_id_ = tenant_id;
group_id_ = group_id;
is_inited_ = true;
}
return ret;
@ -711,7 +712,7 @@ int ObRestorePersistHelper::insert_initial_restore_progress(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init restore progress table", K(ret));
} else if (OB_FAIL(restore_progress_table_operator.insert_or_update_row(proxy, persist_info, affected_rows))) {
LOG_WARN("failed to insert initial restore progress", K(ret));
@ -730,7 +731,7 @@ int ObRestorePersistHelper::get_restore_process(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init restore progress table", K(ret));
} else if (OB_FAIL(table_op.get_row(proxy, false, key, persist_info))) {
LOG_WARN("failed to get persist info", KR(ret), K(key));
@ -748,7 +749,7 @@ int ObRestorePersistHelper::insert_restore_job_history(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_JOB_HISTORY_TNAME, *this))) {
} else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_JOB_HISTORY_TNAME, *this, group_id_))) {
LOG_WARN("failed to init restore progress table", K(ret));
} else if (OB_FAIL(table_op.insert_row(proxy, persist_info, affected_rows))) {
LOG_WARN("failed to get persist info", KR(ret), K(persist_info));
@ -768,7 +769,7 @@ int ObRestorePersistHelper::get_restore_job_history(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_JOB_HISTORY_TNAME, *this))) {
} else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_JOB_HISTORY_TNAME, *this, group_id_))) {
LOG_WARN("failed to init restore progress table", K(ret));
} else if (OB_FAIL(table_op.get_row(proxy, false, restore_key, persist_info))) {
LOG_WARN("failed to get persist info", KR(ret), K(restore_key));
@ -786,7 +787,7 @@ int ObRestorePersistHelper::insert_initial_ls_restore_progress(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init ls restore progress table", K(ret));
} else if (OB_FAIL(ls_restore_progress_table_operator.insert_or_update_row(proxy, persist_info, affected_rows))) {
LOG_WARN("failed to insert initial ls restore progress", K(ret));
@ -805,7 +806,7 @@ int ObRestorePersistHelper::record_ls_his_restore_progress(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(ls_his_restore_table_operator.init(OB_ALL_LS_RESTORE_HISTORY_TNAME, *this))) {
} else if (OB_FAIL(ls_his_restore_table_operator.init(OB_ALL_LS_RESTORE_HISTORY_TNAME, *this, group_id_))) {
LOG_WARN("failed to init ls his restore progress table", K(ret));
} else if (OB_FAIL(ls_his_restore_table_operator.insert_row(proxy, persist_info, affected_rows))) {
LOG_WARN("failed to insert ls his restore progress", K(ret));
@ -824,7 +825,7 @@ int ObRestorePersistHelper::inc_need_restore_ls_count_by_one(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init restore progress table", K(ret));
} else if (OB_FAIL(restore_progress_table_operator.increase_column_by_one(trans, job_key, OB_STR_LS_COUNT, affected_rows))) {
LOG_WARN("failed to increase finished ls count in restore progress table", K(ret), K(job_key));
@ -854,11 +855,11 @@ int ObRestorePersistHelper::inc_finished_ls_count_by_one(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init restore progress table", K(ret));
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init ls restore progress table", K(ret));
} else if (OB_FAIL(ls_his_restore_table_operator.init(OB_ALL_LS_RESTORE_HISTORY_TNAME, *this))) {
} else if (OB_FAIL(ls_his_restore_table_operator.init(OB_ALL_LS_RESTORE_HISTORY_TNAME, *this, group_id_))) {
LOG_WARN("failed to init ls restore history table", K(ret));
} else if (OB_FAIL(ls_restore_progress_table_operator.get_row(trans, true/* need lock */, ls_key, progress_info))) {
LOG_WARN("failed to get ls restore progress", K(ret), K(ls_key));
@ -888,9 +889,9 @@ int ObRestorePersistHelper::inc_finished_tablet_count_by_one(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init restore progress table", K(ret));
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init ls restore progress table", K(ret));
} else if (OB_FAIL(restore_progress_table_operator.increase_column_by_one(trans, job_key, OB_STR_FINISH_TABLET_COUNT, affected_rows))) {
LOG_WARN("failed to increase finished tablet count in restore progress table", K(ret), K(job_key));
@ -915,9 +916,9 @@ int ObRestorePersistHelper::inc_finished_restored_block_bytes(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init restore progress table", K(ret));
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init ls restore progress table", K(ret));
} else if (OB_FAIL(restore_progress_table_operator.increase_column_by(trans, job_key, OB_STR_FINISH_BYTES, inc_finished_bytes, affected_rows))) {
LOG_WARN("failed to update finished bytes in restore progress table", K(ret), K(job_key), K(inc_finished_bytes));
@ -939,7 +940,7 @@ int ObRestorePersistHelper::update_log_restore_progress(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObRestorePersistHelper not init", K(ret));
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init ls restore progress table", K(ret));
} else if (OB_FAIL(ls_restore_progress_table_operator.update_uint_column(proxy, ls_key, OB_STR_LAST_REPLAY_SCN, last_replay_scn.get_val_for_inner_table_field(), affected_rows))) {
LOG_WARN("failed to update last replay scn in ls restore progress table", K(ret), K(ls_key), K(last_replay_scn));
@ -965,7 +966,7 @@ int ObRestorePersistHelper::update_ls_restore_status(
ret = OB_INVALID_ARGUMENT;
LOG_WARN("comment must not be null", K(ret), KP(comment));
} else if (OB_FALSE_IT(trace_id.to_string(trace_id_str, OB_MAX_TRACE_ID_BUFFER_SIZE))) {
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) {
} else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) {
LOG_WARN("failed to init ls restore progress table", K(ret));
} else if (OB_FAIL(ls_restore_progress_table_operator.update_int_column(proxy, ls_key, OB_STR_STATUS,
status, affected_rows))) {

View File

@ -407,7 +407,7 @@ public:
uint64_t get_exec_tenant_id() const override;
int init(const uint64_t tenant_id);
int init(const uint64_t tenant_id, const int32_t group_id);
int insert_initial_restore_progress(
common::ObISQLClient &proxy, const ObRestoreProgressPersistInfo &persist_info) const;
@ -474,6 +474,7 @@ private:
bool is_inited_;
uint64_t tenant_id_; // sys or user tenant id
int32_t group_id_;
};