[CP] [PHYSICAL RESTORE] Remove dependence of schema fallback

This commit is contained in:
obdev
2022-09-19 08:03:00 +00:00
committed by wangzelin.wzl
parent fbabe9d0f1
commit 4507903009
4 changed files with 323 additions and 62 deletions

View File

@ -455,10 +455,10 @@ int ObRestoreScheduler::assign_pool_list(const char *str, common::ObIArray<ObStr
}
int ObRestoreScheduler::fill_restore_backup_info_param(
share::ObPhysicalRestoreJob &job, share::ObRestoreBackupInfoUtil::GetRestoreBackupInfoParam &param)
const share::ObPhysicalRestoreJob &job, share::ObRestoreBackupInfoUtil::GetRestoreBackupInfoParam &param)
{
int ret = OB_SUCCESS;
ObPhysicalRestoreBackupDestList &dest_list = job.multi_restore_path_list_;
const ObPhysicalRestoreBackupDestList &dest_list = job.multi_restore_path_list_;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", KR(ret));
@ -541,6 +541,7 @@ int ObRestoreScheduler::fill_pkeys_for_physical_restore_log(const ObPhysicalRest
int hash_ret = restore_pure_ids.exist_refactored(tid);
if (OB_HASH_EXIST == hash_ret) {
// skip
LOG_TRACE("table exist in base version, just skip", KR(ret), K(tid));
} else if (OB_HASH_NOT_EXIST == hash_ret) {
// check if partition has archived log
const uint64_t table_id = combine_id(job.backup_tenant_id_, tid);
@ -889,7 +890,6 @@ int ObRestoreScheduler::set_member_list(const ObPhysicalRestoreJob &job_info, co
{
int ret = OB_SUCCESS;
uint64_t tenant_id = job_info.tenant_id_;
const int64_t bucket_num = 1024;
common::hash::ObHashMap<ObPartitionKey, ObPartitionReplica::MemberList> member_list_map;
DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_SET_MEMBER_LIST);
if (!inited_) {
@ -897,8 +897,9 @@ int ObRestoreScheduler::set_member_list(const ObPhysicalRestoreJob &job_info, co
LOG_WARN("not inited", K(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("restore scheduler stopped", K(ret));
} else if (OB_FAIL(member_list_map.create(
bucket_num, ObModIds::OB_RESTORE_SET_MEMBER_LIST, ObModIds::OB_RESTORE_SET_MEMBER_LIST))) {
} else if (OB_FAIL(member_list_map.create(hash::cal_next_prime(BUCKET_NUM),
ObModIds::OB_RESTORE_SET_MEMBER_LIST,
ObModIds::OB_RESTORE_SET_MEMBER_LIST))) {
LOG_WARN("fail to create hashmap", K(ret), K(tenant_id));
} else if (OB_FAIL(build_member_list_map(tenant_id, member_list_map))) {
LOG_WARN("fail to build member_list_map", K(ret), K(tenant_id));
@ -1723,10 +1724,9 @@ int ObRestoreScheduler::filter_schema(const ObPhysicalRestoreJob &job_info)
} else {
common::hash::ObHashSet<uint64_t> table_white_list;
common::hash::ObHashSet<uint64_t> tablegroup_white_list;
const int64_t BUCKET_NUM = 1024;
if (OB_FAIL(table_white_list.create(BUCKET_NUM))) {
if (OB_FAIL(table_white_list.create(hash::cal_next_prime(BUCKET_NUM)))) {
LOG_WARN("fail to init table_white_list", KR(ret));
} else if (OB_FAIL(tablegroup_white_list.create(BUCKET_NUM))) {
} else if (OB_FAIL(tablegroup_white_list.create(hash::cal_next_prime(BUCKET_NUM)))) {
LOG_WARN("fail to init tablegroup_white_list", KR(ret));
} else if (OB_FAIL(gen_white_list(job_info, table_items, table_white_list, tablegroup_white_list))) {
LOG_WARN("fail to gen white list", KR(ret), K(table_items));
@ -2450,11 +2450,10 @@ int ObRestoreScheduler::convert_table_options(const uint64_t tenant_id)
int ObRestoreScheduler::convert_index_status(const ObPhysicalRestoreJob &job_info)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard base_guard; // schema_guard with schema_version using by data backup
ObSchemaGetterGuard schema_guard; // schema_guard with local latest schema version
ObArray<uint64_t> error_index_ids;
ObArray<uint64_t> avaliable_index_ids;
ObArray<uint64_t> unavaliable_index_ids;
ObMultiVersionSchemaService::RefreshSchemaMode mode = ObMultiVersionSchemaService::FORCE_FALLBACK;
uint64_t tenant_id = job_info.tenant_id_;
if (!inited_) {
ret = OB_NOT_INIT;
@ -2464,9 +2463,6 @@ int ObRestoreScheduler::convert_index_status(const ObPhysicalRestoreJob &job_inf
LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("restore scheduler stopped", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(
tenant_id, base_guard, job_info.schema_version_, OB_INVALID_VERSION /*latest*/, mode))) {
LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id), "schema_version", job_info.schema_version_);
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id));
} else {
@ -2479,37 +2475,29 @@ int ObRestoreScheduler::convert_index_status(const ObPhysicalRestoreJob &job_inf
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error unexpected, table schema is NULL", KR(ret));
} else if (is_inner_table(table_schema->get_table_id())) {
// inner_table's index won't rebuild, just skip
} else if (table_schema->is_index_table()) {
const uint64_t index_id = table_schema->get_table_id();
const ObIndexStatus index_status = table_schema->get_index_status();
if (INDEX_STATUS_UNAVAILABLE == index_status || INDEX_STATUS_RESTORE_INDEX_ERROR == index_status) { // case 1
if (INDEX_STATUS_UNAVAILABLE == index_status || INDEX_STATUS_RESTORE_INDEX_ERROR == index_status) {
// case 1
if (OB_FAIL(error_index_ids.push_back(index_id))) {
LOG_WARN("fail to push back index id", KR(ret), K(index_id));
}
} else if (INDEX_STATUS_INDEX_ERROR == index_status || INDEX_STATUS_UNUSABLE == index_status) {
// case 2, just skip
} else if (INDEX_STATUS_AVAILABLE == index_status) {
const ObSimpleTableSchemaV2 *index_schema = NULL;
if (OB_FAIL(base_guard.get_table_schema(index_id, index_schema))) {
LOG_WARN("fail to get index schema", KR(ret), K(index_id));
} else if (OB_ISNULL(index_schema)) {
// case 3.2 index is created when clog backup.
if (OB_FAIL(unavaliable_index_ids.push_back(index_id))) {
LOG_WARN("fail to push back index id", KR(ret), K(index_id));
}
} else if (INDEX_STATUS_AVAILABLE != index_schema->get_index_status()) {
// case 3.2 index is avaliable when clog backup.
if (OB_FAIL(unavaliable_index_ids.push_back(index_id))) {
LOG_WARN("fail to push back index id", KR(ret), K(index_id));
}
} else {
// case 3.1, just skip
if (OB_FAIL(avaliable_index_ids.push_back(index_id))) {
LOG_WARN("fail to push back index id", KR(ret), K(index_id));
}
}
}
}
}
if (FAILEDx(update_index_status(error_index_ids, INDEX_STATUS_INDEX_ERROR))) {
if (FAILEDx(generate_unavaliable_index_ids_(job_info, avaliable_index_ids, unavaliable_index_ids))) { // case 3
LOG_WARN("fail to generate unavaliable_index_ids", KR(ret), K(tenant_id));
} else if (OB_FAIL(update_index_status(error_index_ids, INDEX_STATUS_INDEX_ERROR))) {
LOG_WARN("fail to update index status", KR(ret), K(tenant_id));
} else if (OB_FAIL(update_index_status(unavaliable_index_ids, INDEX_STATUS_UNAVAILABLE))) {
LOG_WARN("fail to update index status", KR(ret), K(tenant_id));
@ -2518,6 +2506,123 @@ int ObRestoreScheduler::convert_index_status(const ObPhysicalRestoreJob &job_inf
return ret;
}
int ObRestoreScheduler::generate_unavaliable_index_ids_(const ObPhysicalRestoreJob &job_info,
const ObIArray<uint64_t> &avaliable_index_ids, ObIArray<uint64_t> &unavaliable_index_ids)
{
int ret = OB_SUCCESS;
const int64_t index_cnt = avaliable_index_ids.count();
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", KR(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("restore scheduler stopped", KR(ret));
} else if (index_cnt <= 0) {
// skip
} else {
const uint64_t tenant_id = job_info.tenant_id_;
const int64_t schema_version = job_info.schema_version_;
common::hash::ObHashSet<uint64_t> base_avaliable_index_ids;
if (OB_FAIL(base_avaliable_index_ids.create(hash::cal_next_prime(BUCKET_NUM), "BaseIdxes", "BaseIdxes"))) {
LOG_WARN("failed to create base_avaliable_index_ids", KR(ret));
}
const int64_t BATCH_FETCH_CNT = 1000;
int64_t start_idx = 0;
int64_t end_idx = min(start_idx + BATCH_FETCH_CNT, index_cnt);
while (OB_SUCC(ret) && end_idx <= index_cnt && start_idx < end_idx) {
if (OB_FAIL(check_stop())) {
LOG_WARN("restore scheduler stopped", KR(ret));
} else if (OB_FAIL(batch_fetch_base_avaliable_index_ids_(
tenant_id, schema_version, avaliable_index_ids, start_idx, end_idx, base_avaliable_index_ids))) {
LOG_WARN("fail to fetch base avaliable index tids", KR(ret), K(start_idx), K(end_idx));
} else {
start_idx = end_idx;
end_idx = min(start_idx + BATCH_FETCH_CNT, index_cnt);
}
} // end while
for (int64_t i = 0; OB_SUCC(ret) && i < index_cnt; i++) {
const uint64_t &index_id = avaliable_index_ids.at(i);
int64_t hash_ret = base_avaliable_index_ids.exist_refactored(index_id);
if (OB_HASH_EXIST == hash_ret) {
// index is avaliable in base schema version, skip rebuild
} else if (OB_HASH_NOT_EXIST == hash_ret) {
// index not exist or not avaliable in base schema version
if (OB_FAIL(unavaliable_index_ids.push_back(index_id))) {
LOG_WARN("fail to push back index_id", KR(ret), K(index_id));
}
} else {
ret = OB_SUCCESS == hash_ret ? OB_ERR_UNEXPECTED : hash_ret;
LOG_WARN("fail to check index_id exist", KR(ret), K(hash_ret), K(index_id));
}
} // end for
}
return ret;
}
// [start_idx, end_idx)
int ObRestoreScheduler::batch_fetch_base_avaliable_index_ids_(const uint64_t tenant_id, const int64_t schema_version,
const ObIArray<uint64_t> &avaliable_index_ids, const int64_t start_idx, const int64_t end_idx,
common::hash::ObHashSet<uint64_t> &base_avaliable_index_ids)
{
int ret = OB_SUCCESS;
const int64_t index_cnt = avaliable_index_ids.count();
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || OB_SYS_TENANT_ID == tenant_id || schema_version <= 0 ||
start_idx >= end_idx || end_idx > index_cnt)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(schema_version), K(start_idx), K(end_idx), K(index_cnt));
} else {
ObSqlString sql;
HEAP_VAR(ObMySQLProxy::MySQLResult, res)
{
common::sqlclient::ObMySQLResult *result = NULL;
if (OB_FAIL(sql.assign_fmt("SELECT table_id FROM ( "
"SELECT table_id, is_deleted, index_status, "
"ROW_NUMBER() OVER (PARTITION BY table_id ORDER BY schema_version DESC) AS RN "
"FROM %s WHERE tenant_id = 0 AND schema_version <= %ld and table_id in (",
OB_ALL_TABLE_V2_HISTORY_TNAME,
schema_version))) {
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(schema_version));
}
for (int64_t i = start_idx; OB_SUCC(ret) && i < end_idx; i++) {
if (OB_FAIL(sql.append_fmt("%s %ld",
i == start_idx ? "" : ",",
ObSchemaUtils::get_extract_schema_id(tenant_id, avaliable_index_ids.at(i))))) {
LOG_WARN("fail to append sql", KR(ret), K(i), "index_id", avaliable_index_ids.at(i));
}
} // end for
if (FAILEDx(sql.append_fmt(")) AS A WHERE A.rn = 1 AND is_deleted = 0 AND index_status = %ld",
static_cast<int64_t>(INDEX_STATUS_AVAILABLE)))) {
LOG_WARN("fail to append sql", KR(ret));
} else if (OB_FAIL(sql_proxy_->read(res, tenant_id, sql.ptr()))) {
LOG_WARN("fail to execute read", KR(ret), K(tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get result", KR(ret), K(tenant_id), K(sql));
} else {
uint64_t index_id = OB_INVALID_ID;
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
EXTRACT_INT_FIELD_MYSQL_WITH_TENANT_ID(*result, "table_id", index_id, tenant_id);
if (FAILEDx(base_avaliable_index_ids.set_refactored(index_id))) { // overwrite
LOG_WARN("fail to set index_id", KR(ret), K(index_id));
} else {
LOG_TRACE("get base avaliable index id", KR(ret), K(index_id));
}
} // end while
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
} else {
ret = OB_SUCC(ret) ? OB_ERR_UNEXPECTED : ret;
LOG_WARN("iter failed", KR(ret));
}
}
} // end HEAP_VAR
}
return ret;
}
int ObRestoreScheduler::update_index_status(const common::ObIArray<uint64_t> &index_ids, ObIndexStatus index_status)
{
int ret = OB_SUCCESS;
@ -2635,11 +2740,10 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i
int ret = OB_SUCCESS;
uint64_t tenant_id = job_info.tenant_id_;
ObSchemaGetterGuard latest_guard;
ObSchemaGetterGuard base_guard;
int64_t local_schema_version = OB_INVALID_VERSION;
ObMultiVersionSchemaService::RefreshSchemaMode mode = ObMultiVersionSchemaService::FORCE_FALLBACK;
ObArray<const ObSimpleTableSchemaV2 *> tables;
ObArray<const ObTablegroupSchema *> tablegroups;
common::hash::ObHashSet<ObPGKey> pg_key_set;
DEBUG_SYNC(BEFORE_PHYSICAL_RESTORE_USER_PARTITIONS);
if (!inited_) {
ret = OB_NOT_INIT;
@ -2652,6 +2756,8 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i
} else if (job_info.schema_version_ <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("schema version is invalid", KR(ret), K(job_info));
} else if (OB_FAIL(pg_key_set.create(hash::cal_next_prime(BUCKET_NUM), "ResDataPGKeys", "ResDataPGKeys"))) {
LOG_WARN("failed to create pg_key set", KR(ret));
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, latest_guard))) {
LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id));
} else if (OB_FAIL(latest_guard.get_schema_version(tenant_id, local_schema_version))) {
@ -2659,9 +2765,8 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i
} else if (job_info.schema_version_ > local_schema_version) {
ret = OB_EAGAIN;
LOG_WARN("local schema is old, try again", KR(ret), K(local_schema_version), K(job_info));
} else if (OB_FAIL(schema_service_->get_tenant_schema_guard(
tenant_id, base_guard, job_info.schema_version_, OB_INVALID_VERSION /*latest*/, mode))) {
LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id), "schema_version", job_info.schema_version_);
} else if (OB_FAIL(get_pg_keys_for_physical_restore_data_(job_info, pg_key_set))) {
LOG_WARN("fail to get pg_key set", KR(ret), K(job_info));
} else if (OB_FAIL(latest_guard.get_user_table_schemas_in_tenant(tenant_id, tables))) {
LOG_WARN("get tenant table schemas failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(latest_guard.get_tablegroup_schemas_in_tenant(tenant_id, tablegroups))) {
@ -2674,7 +2779,6 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i
const int64_t PARTITION_CNT_PER_RPC = 5;
for (int64_t i = 0; i < tablegroups.count() && OB_SUCC(ret); ++i) {
const ObTablegroupSchema *tablegroup = tablegroups.at(i);
const ObTablegroupSchema *base_tablegroup = NULL;
uint64_t tablegroup_id = OB_INVALID_ID;
ObRestorePartitionsArg arg;
if (OB_FAIL(check_stop())) {
@ -2685,9 +2789,7 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i
} else if (FALSE_IT(tablegroup_id = tablegroup->get_tablegroup_id())) {
} else if (!tablegroup->has_self_partition()) {
// bypass
} else if (OB_FAIL(base_guard.get_tablegroup_schema(tablegroup_id, base_tablegroup))) {
LOG_WARN("fail to get base tablegroup schema", KR(ret), K(tablegroup_id));
} else if (OB_FAIL(fill_restore_partition_arg(tablegroup_id, base_tablegroup, arg))) {
} else if (OB_FAIL(fill_restore_partition_arg_(*tablegroup, pg_key_set, arg))) {
LOG_WARN("fail to fill restore partition arg", KR(ret), K(tablegroup_id));
} else {
int64_t timeout = (tablegroup->get_all_part_num() / PARTITION_CNT_PER_RPC) * TIMEOUT_PER_RPC;
@ -2701,7 +2803,6 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i
}
for (int64_t i = 0; i < tables.count() && OB_SUCC(ret); ++i) {
const ObSimpleTableSchemaV2 *table = tables.at(i);
const ObSimpleTableSchemaV2 *base_table = NULL;
uint64_t table_id = OB_INVALID_ID;
ObRestorePartitionsArg arg;
if (OB_FAIL(check_stop())) {
@ -2715,9 +2816,7 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i
} else if (is_inner_table(table_id)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("should not be inner table", KR(ret), K(table_id));
} else if (OB_FAIL(base_guard.get_table_schema(table_id, base_table))) {
LOG_WARN("fail to get base table schema", KR(ret), K(table_id));
} else if (OB_FAIL(fill_restore_partition_arg(table_id, base_table, arg))) {
} else if (OB_FAIL(fill_restore_partition_arg_(*table, pg_key_set, arg))) {
LOG_WARN("fail to fill restore partition arg", KR(ret), K(table_id));
} else {
int64_t timeout = (table->get_all_part_num() / PARTITION_CNT_PER_RPC) * TIMEOUT_PER_RPC;
@ -2738,32 +2837,71 @@ int ObRestoreScheduler::create_user_partitions(const ObPhysicalRestoreJob &job_i
return ret;
}
int ObRestoreScheduler::fill_restore_partition_arg(
const uint64_t schema_id, const ObPartitionSchema *schema, obrpc::ObRestorePartitionsArg &arg)
int ObRestoreScheduler::get_pg_keys_for_physical_restore_data_(
const ObPhysicalRestoreJob &job_info, common::hash::ObHashSet<common::ObPGKey> &pg_key_set)
{
int ret = OB_SUCCESS;
arg.schema_id_ = schema_id;
ObRestoreBackupInfoUtil::GetRestoreBackupInfoParam param;
ObArray<ObPGKey> normal_pg_keys;
const uint64_t tenant_id = job_info.tenant_id_;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", KR(ret));
} else if (OB_FAIL(check_stop())) {
LOG_WARN("restore scheduler stopped", KR(ret));
} else if (OB_FAIL(fill_restore_backup_info_param(job_info, param))) {
LOG_WARN("fail to fill restore backup info param", KR(ret), K(job_info));
} else if (OB_FAIL(ObRestoreBackupInfoUtil::get_restore_normal_pg_keys(param, normal_pg_keys))) {
LOG_WARN("fail to get restore normal pg keys", KR(ret), K(job_info));
} else {
ObPGKey new_pg_key;
for (int64_t i = 0; OB_SUCC(ret) && i < normal_pg_keys.count(); i++) {
const ObPGKey &pg_key = normal_pg_keys.at(i);
// convert table_id with new tenant_id
if (OB_FAIL(new_pg_key.init(
combine_id(tenant_id, pg_key.get_table_id()), pg_key.get_partition_id(), pg_key.get_partition_cnt()))) {
LOG_WARN("fail to init new pg_key", KR(ret), K(tenant_id), K(pg_key));
} else if (OB_FAIL(pg_key_set.set_refactored(new_pg_key))) { // overwrite
LOG_WARN("fail to set pg_key", KR(ret), K(new_pg_key), K(pg_key));
} else {
LOG_TRACE("get base restore pg_key", KR(ret), K(new_pg_key), K(pg_key));
}
} // end for
}
return ret;
}
int ObRestoreScheduler::fill_restore_partition_arg_(const ObPartitionSchema &schema,
const common::hash::ObHashSet<common::ObPGKey> &pg_key_set, obrpc::ObRestorePartitionsArg &arg)
{
int ret = OB_SUCCESS;
arg.schema_id_ = schema.get_table_id();
arg.mode_ = OB_CREATE_TABLE_MODE_PHYSICAL_RESTORE;
bool skip = false;
if (OB_ISNULL(schema)) {
// schema doesn't exist for data restore, and log restore is needed.
skip = true;
} else if (!is_new_tablegroup_id(arg.schema_id_)) {
// unavaliable index for data restore
const ObTableSchema *table = static_cast<const ObTableSchema *>(schema);
if (table->has_self_partition() && table->is_global_index_table() &&
(table->is_dropped_schema() || INDEX_STATUS_AVAILABLE != table->get_index_status())) {
if (!is_new_tablegroup_id(arg.schema_id_)) {
const ObTableSchema &table = static_cast<const ObTableSchema &>(schema);
if (table.has_self_partition() && table.is_global_index_table() &&
(table.is_dropped_schema() || INDEX_STATUS_AVAILABLE != table.get_index_status())) {
skip = true;
}
}
if (OB_SUCC(ret) && !skip) {
bool check_dropped_schema = false;
ObPartitionKeyIter iter(arg.schema_id_, *schema, check_dropped_schema);
int64_t partition_id = OB_INVALID_ID;
while (OB_SUCC(ret) && OB_SUCC(iter.next_partition_id_v2(partition_id))) {
if (OB_FAIL(arg.partition_ids_.push_back(partition_id))) {
LOG_WARN("fail to push back partition_id", K(ret), K(schema_id), K(partition_id));
ObPartitionKeyIter iter(arg.schema_id_, schema, check_dropped_schema);
ObPGKey pg_key;
while (OB_SUCC(ret) && OB_SUCC(iter.next_partition_key_v2(pg_key))) {
int hash_ret = pg_key_set.exist_refactored(pg_key);
if (OB_HASH_NOT_EXIST == hash_ret) {
// skip
LOG_TRACE("pg_key not exist in base version, just skip", KR(ret), K(pg_key));
} else if (OB_HASH_EXIST == hash_ret) {
if (OB_FAIL(arg.partition_ids_.push_back(pg_key.get_partition_id()))) {
LOG_WARN("fail to push back partition_id", K(ret), K(pg_key));
}
} else {
ret = OB_SUCCESS == hash_ret ? OB_ERR_UNEXPECTED : hash_ret;
LOG_WARN("fail to check pg key exist", KR(ret), K(hash_ret), K(pg_key));
}
}
if (OB_ITER_END == ret) {

View File

@ -95,7 +95,7 @@ private:
/* restore tenant related */
int fill_job_info(share::ObPhysicalRestoreJob& job, obrpc::ObCreateTenantArg& arg);
int fill_restore_backup_info_param(
share::ObPhysicalRestoreJob& job, share::ObRestoreBackupInfoUtil::GetRestoreBackupInfoParam& param);
const share::ObPhysicalRestoreJob& job, share::ObRestoreBackupInfoUtil::GetRestoreBackupInfoParam& param);
int fill_backup_info(share::ObPhysicalRestoreJob& job, obrpc::ObCreateTenantArg& arg);
int fill_pkeys_for_physical_restore_log(const share::ObPhysicalRestoreJob& job, obrpc::ObCreateTenantArg& arg);
int fill_rs_info(share::ObPhysicalRestoreJob& job);
@ -119,8 +119,20 @@ private:
int convert_parameters(const share::ObPhysicalRestoreJob& job_info);
int log_nop_operation(const share::ObPhysicalRestoreJob& job_info);
int convert_column_statistic(const uint64_t tenant_id);
int generate_unavaliable_index_ids_(const share::ObPhysicalRestoreJob &job_info,
const common::ObIArray<uint64_t> &avaliable_index_ids, common::ObIArray<uint64_t> &unavaliable_index_ids);
int batch_fetch_base_avaliable_index_ids_(const uint64_t tenant_id, const int64_t schema_version,
const common::ObIArray<uint64_t> &avaliable_index_ids, const int64_t start_idx, const int64_t end_idx,
common::hash::ObHashSet<uint64_t> &base_avaliable_index_ids);
/*------------------------*/
/*---create user partitions---*/
int get_pg_keys_for_physical_restore_data_(
const share::ObPhysicalRestoreJob &job_info, common::hash::ObHashSet<common::ObPGKey> &pg_key_set);
int fill_restore_partition_arg_(const share::schema::ObPartitionSchema &schema,
const common::hash::ObHashSet<common::ObPGKey> &pg_key_set, obrpc::ObRestorePartitionsArg &arg);
/*----------------------------*/
/* filter schema */
int gen_white_list(const share::ObPhysicalRestoreJob& job_info,
const common::ObIArray<obrpc::ObTableItem>& table_items, common::hash::ObHashSet<uint64_t>& table_white_list,
@ -170,8 +182,6 @@ private:
const PhysicalRestorePartition& partition, const ObPhysicalRestoreStat& stat, share::ObDMLSqlSplicer& dml);
int clear_member_list_table(const uint64_t tenant_id);
int fill_restore_partition_arg(
const uint64_t schema_id, const share::schema::ObPartitionSchema* schema, obrpc::ObRestorePartitionsArg& arg);
/*------------------------*/
/* upgrade related */
@ -191,6 +201,9 @@ private:
private:
int drop_tenant_force_if_necessary(const share::ObPhysicalRestoreJob& job_info);
private:
static const int64_t BUCKET_NUM = 1024;
bool inited_;
mutable ObRestoreIdling idling_;
share::schema::ObMultiVersionSchemaService* schema_service_;

View File

@ -1293,6 +1293,109 @@ int ObRestoreBackupInfoUtil::get_restore_sys_table_ids(
return OB_SUCCESS;
}
int ObRestoreBackupInfoUtil::get_restore_normal_pg_keys(
const GetRestoreBackupInfoParam &param, common::ObIArray<common::ObPartitionKey> &pkey_list)
{
int ret = OB_SUCCESS;
bool is_cluster_level = param.backup_set_path_list_.empty();
if (is_cluster_level) {
if (OB_FAIL(get_restore_normal_pg_keys_v1_(param, pkey_list))) {
LOG_WARN("failed to get restore normal pg keys v1", KR(ret));
}
} else {
if (OB_FAIL(get_restore_normal_pg_keys_v2_(param, pkey_list))) {
LOG_WARN("failed to get restore normal pg keys v2", KR(ret));
}
}
return ret;
}
int ObRestoreBackupInfoUtil::get_restore_normal_pg_keys_v1_(
const GetRestoreBackupInfoParam &param, common::ObIArray<common::ObPartitionKey> &pkey_list)
{
int ret = OB_SUCCESS;
ObClusterBackupDest dest;
ObExternBackupInfoMgr backup_info_mgr;
ObExternTenantInfoMgr tenant_info_mgr;
ObExternTenantInfo tenant_info;
ObExternBackupInfo backup_info;
const int64_t cluster_version = ObClusterVersion::get_instance().get_cluster_version();
ObFakeBackupLeaseService fake_backup_lease;
ObTenantNameSimpleMgr tenant_name_mgr;
ObExternPGListMgr pg_list_mgr;
uint64_t backup_tenant_id = 0;
const char *backup_dest = param.backup_dest_;
const char *backup_cluster_name = param.backup_cluster_name_;
const int64_t cluster_id = param.cluster_id_;
const int64_t incarnation = param.incarnation_;
const char *backup_tenant_name = param.backup_tenant_name_;
const int64_t restore_timestamp = param.restore_timestamp_;
const char *passwd_array = param.passwd_array_;
if (OB_ISNULL(backup_dest) || OB_ISNULL(backup_cluster_name) || cluster_id <= 0 || incarnation < 0 ||
OB_ISNULL(backup_tenant_name) || restore_timestamp <= 0 || OB_ISNULL(passwd_array)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get restore backup info get invalid argument",
K(ret),
KP(backup_dest),
KP(backup_cluster_name),
K(cluster_id),
KP(backup_tenant_name),
K(restore_timestamp),
KP(passwd_array));
} else if (strlen(backup_dest) >= share::OB_MAX_BACKUP_DEST_LENGTH ||
strlen(backup_cluster_name) >= OB_MAX_CLUSTER_NAME_LENGTH) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("backup dest or backup cluster name size over flow", K(ret), K(backup_dest), K(backup_cluster_name));
} else if (OB_FAIL(dest.set(backup_dest, backup_cluster_name, cluster_id, incarnation))) {
LOG_WARN(
"failed to set backup dest", K(ret), K(backup_dest), K(backup_cluster_name), K(cluster_id), K(incarnation));
} else if (OB_FAIL(tenant_name_mgr.init())) {
LOG_WARN("faiuiled to init tenant_name mgr", K(ret));
} else if (OB_FAIL(tenant_name_mgr.read_backup_file(dest))) {
LOG_WARN("failed to read backup tenant name mgr", K(ret), K(dest));
} else if (OB_FAIL(tenant_name_mgr.get_tenant_id(backup_tenant_name, restore_timestamp, backup_tenant_id))) {
LOG_WARN("failed to backup tenant id", K(ret), K(backup_tenant_name), K(restore_timestamp));
} else if (OB_FAIL(tenant_info_mgr.init(dest, fake_backup_lease))) {
LOG_WARN("failed to init tenant info mgr", K(ret), K(dest));
} else if (OB_FAIL(tenant_info_mgr.find_tenant_info(backup_tenant_id, tenant_info))) {
LOG_WARN("failed to find tenant info", K(ret), K(backup_tenant_id));
} else if (OB_FAIL(backup_info_mgr.init(tenant_info.tenant_id_, dest, fake_backup_lease))) {
LOG_WARN("failed to init backup info mgr", K(ret), K(dest), K(tenant_info));
} else if (OB_FAIL(backup_info_mgr.find_backup_info(restore_timestamp, passwd_array, backup_info))) {
LOG_WARN("failed to find backup info", K(ret), K(restore_timestamp), K(tenant_info));
} else if (OB_FAIL(pg_list_mgr.init(backup_tenant_id,
backup_info.full_backup_set_id_,
backup_info.inc_backup_set_id_,
dest,
backup_info.date_,
backup_info.compatible_,
fake_backup_lease))) {
LOG_WARN("failed to init pg list mgr", K(ret), K(backup_info), K(dest));
} else if (OB_FAIL(pg_list_mgr.get_normal_pg_list(pkey_list))) {
LOG_WARN("failed to get sys pg list", K(ret), K(backup_info), K(dest));
}
return ret;
}
int ObRestoreBackupInfoUtil::get_restore_normal_pg_keys_v2_(
const GetRestoreBackupInfoParam &param, common::ObIArray<common::ObPartitionKey> &pkey_list)
{
int ret = OB_SUCCESS;
ObFakeBackupLeaseService fake_backup_lease;
ObExternPGListMgr pg_list_mgr;
ObSimpleBackupSetPath simple_set_path; // largest backup set path
if (OB_FAIL(param.get_largest_backup_set_path(simple_set_path))) {
LOG_WARN("failed to get smallest largest backup set path", K(ret));
} else if (OB_FAIL(pg_list_mgr.init(simple_set_path, fake_backup_lease))) {
LOG_WARN("failed to get sys pg list", K(ret), K(simple_set_path));
} else if (OB_FAIL(pg_list_mgr.get_normal_pg_list(pkey_list))) {
LOG_WARN("failed to get sys pg list", K(ret), K(simple_set_path));
}
return ret;
}
int ObRestoreBackupInfoUtil::check_is_snapshot_restore(const int64_t backup_snapshot, const int64_t restore_timestamp,
const uint64_t cluster_version, bool &is_snapshot_restore)
{

View File

@ -166,6 +166,9 @@ public:
static int get_restore_sys_table_ids(
const ObPhysicalRestoreInfo &info, common::ObIArray<common::ObPartitionKey> &pkey_list);
static int get_restore_normal_pg_keys(
const GetRestoreBackupInfoParam &param, common::ObIArray<common::ObPartitionKey> &pkey_list);
static int check_is_snapshot_restore(const int64_t backup_snapshot, const int64_t restore_timestamp,
const uint64_t cluster_version, bool &is_snapshot_restore);
@ -174,6 +177,10 @@ private:
static int get_restore_backup_info_v1_(const GetRestoreBackupInfoParam &param, ObRestoreBackupInfo &info);
// get info from simple path level
static int get_restore_backup_info_v2_(const GetRestoreBackupInfoParam &param, ObRestoreBackupInfo &info);
static int get_restore_normal_pg_keys_v1_(
const GetRestoreBackupInfoParam &param, common::ObIArray<common::ObPartitionKey> &pkey_list);
static int get_restore_normal_pg_keys_v2_(
const GetRestoreBackupInfoParam &param, common::ObIArray<common::ObPartitionKey> &pkey_list);
static int inner_get_restore_backup_set_info_(const GetRestoreBackupInfoParam &param,
ObBackupSetFileInfo &backup_set_info, ObExternTenantLocalityInfo &tenant_locality_info,
common::ObIArray<common::ObPGKey> &sys_pg_keys);