From 3a6db0a542776a7595ea226836bc7d66e25ad629 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 16 Jul 2021 20:57:14 +0800 Subject: [PATCH] patch code to open source --- .../oblib/src/lib/restore/ob_storage_file.cpp | 7 +- src/rootserver/backup/ob_backup_data_mgr.cpp | 7 +- .../ob_tenant_backup_data_clean_mgr.cpp | 2 +- src/rootserver/ob_backup_data_clean.cpp | 2 +- src/rootserver/ob_partition_backup.h | 2 +- src/rootserver/ob_root_backup.cpp | 34 ++- src/rootserver/ob_root_backup.h | 1 + src/share/backup/ob_backup_operator.cpp | 30 +++ src/share/backup/ob_backup_operator.h | 2 + src/share/backup/ob_backup_scheduler.cpp | 56 +++-- src/share/backup/ob_backup_scheduler.h | 4 +- .../backup/ob_pg_backup_task_updater.cpp | 19 ++ src/share/backup/ob_pg_backup_task_updater.h | 1 + src/share/restore/ob_restore_base_reader.h | 13 +- src/storage/ob_partition_base_data_backup.cpp | 3 +- .../ob_partition_base_data_ob_reader.cpp | 17 +- .../ob_partition_base_data_ob_reader.h | 2 +- .../ob_partition_base_data_oss_reader.cpp | 11 +- src/storage/ob_partition_migrator.cpp | 195 ++++++++++-------- src/storage/ob_partition_migrator.h | 4 +- src/storage/ob_partition_store.cpp | 12 +- src/storage/ob_partition_store.h | 8 +- src/storage/ob_pg_storage.cpp | 7 +- src/storage/ob_storage_struct.cpp | 14 +- .../test_partition_migrator_table_key_mgr.cpp | 26 ++- 25 files changed, 329 insertions(+), 150 deletions(-) diff --git a/deps/oblib/src/lib/restore/ob_storage_file.cpp b/deps/oblib/src/lib/restore/ob_storage_file.cpp index 35c127c735..658907e3b7 100644 --- a/deps/oblib/src/lib/restore/ob_storage_file.cpp +++ b/deps/oblib/src/lib/restore/ob_storage_file.cpp @@ -554,8 +554,11 @@ int ObStorageFileUtil::get_partition_ids_from_dir(const char* dir_path, common:: ret = OB_INVALID_ARGUMENT; OB_LOG(WARN, "dir path is invalid", K(ret), KP(dir_path)); } else if (OB_ISNULL(open_dir = ::opendir(dir_path))) { - if (ENOENT != errno) { - ret = OB_FILE_NOT_OPENED; + if (ENOENT == errno) { + ret = OB_DIR_NOT_EXIST; + OB_LOG(WARN, "fail to open dir", K(ret), K(dir_path), K(strerror_r(errno, errno_buf, sizeof(errno_buf)))); + } else { + ret = OB_IO_ERROR; OB_LOG(WARN, "fail to open dir", K(ret), K(dir_path), K(strerror_r(errno, errno_buf, sizeof(errno_buf)))); } } diff --git a/src/rootserver/backup/ob_backup_data_mgr.cpp b/src/rootserver/backup/ob_backup_data_mgr.cpp index ec627cbb16..0c89406e9a 100644 --- a/src/rootserver/backup/ob_backup_data_mgr.cpp +++ b/src/rootserver/backup/ob_backup_data_mgr.cpp @@ -259,9 +259,12 @@ int ObBackupListDataMgr::get_clog_pkey_list(common::ObIArray& pk cluster_backup_dest_, tenant_id_, log_archive_round_, path))) { LOG_WARN("failed to get tenant clog data path", K(ret), K(cluster_backup_dest_)); } else if (OB_FAIL(util.get_pkeys_from_dir(path.get_ptr(), cluster_backup_dest_.get_storage_info(), pkey_list))) { - LOG_WARN("failed to get pkeys from dir", K(ret), K(path)); + if (OB_DIR_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to get pkeys from dir", K(ret), K(path)); + } } - LOG_INFO("get clog pkey list count", K(pkey_list.count())); return ret; } diff --git a/src/rootserver/backup/ob_tenant_backup_data_clean_mgr.cpp b/src/rootserver/backup/ob_tenant_backup_data_clean_mgr.cpp index 33897d1f15..576a7aca3c 100644 --- a/src/rootserver/backup/ob_tenant_backup_data_clean_mgr.cpp +++ b/src/rootserver/backup/ob_tenant_backup_data_clean_mgr.cpp @@ -579,7 +579,7 @@ int ObBackupDataCleanUtil::touch_backup_file( if (path.is_empty() || OB_ISNULL(storage_info)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("touch backup file get invalid argument", K(ret), K(path), KP(storage_info)); - } else if (OB_FAIL(check_can_delete(device_type, can_delete))) { + } else if (OB_FAIL(check_can_touch(device_type, can_delete))) { LOG_WARN("failed to check can delete", K(ret), K(device_type), K(path)); } else if (!can_delete) { // do nothing diff --git a/src/rootserver/ob_backup_data_clean.cpp b/src/rootserver/ob_backup_data_clean.cpp index 3b750680b5..1e1a063d39 100644 --- a/src/rootserver/ob_backup_data_clean.cpp +++ b/src/rootserver/ob_backup_data_clean.cpp @@ -3197,7 +3197,7 @@ int ObBackupDataClean::get_cluster_max_succeed_backup_set(int64_t& backup_set_id LOG_WARN("failed to init backup task history updater", K(ret)); } else if (OB_FAIL(updater.get_tenant_max_succeed_backup_task(OB_SYS_TENANT_ID, task_info))) { if (OB_INVALID_BACKUP_SET_ID == ret) { - backup_set_id = 0; + backup_set_id = INT64_MAX; ret = OB_SUCCESS; } else { LOG_WARN("failed to get tenant max succeed backup task", K(ret)); diff --git a/src/rootserver/ob_partition_backup.h b/src/rootserver/ob_partition_backup.h index dd6754c1b7..41c014ee63 100644 --- a/src/rootserver/ob_partition_backup.h +++ b/src/rootserver/ob_partition_backup.h @@ -111,7 +111,7 @@ public: void reset(); bool is_valid() const; TO_STRING_KV(K_(replica_element)); - common::ObSEArray replica_element_; + common::ObSEArray replica_element_; const ObBackupElement* choose_element_; DISALLOW_COPY_AND_ASSIGN(ObReplicaBackupElement); }; diff --git a/src/rootserver/ob_root_backup.cpp b/src/rootserver/ob_root_backup.cpp index 26536a42ed..c2053e7a2b 100644 --- a/src/rootserver/ob_root_backup.cpp +++ b/src/rootserver/ob_root_backup.cpp @@ -3555,9 +3555,18 @@ int ObTenantBackup::do_cancel(const share::ObTenantBackupTaskInfo& task_info, co } else if (OB_FAIL(get_finished_backup_task(task_info, pg_task_infos, trans))) { LOG_WARN("failed to get pg backup task result", K(ret), K(task_info)); } else if (FALSE_IT(is_all_task_finished = pg_task_infos.count() == task_info.pg_count_)) { - } else if (!is_all_task_finished && OB_FAIL(check_doing_pg_tasks(task_info, trans))) { - LOG_WARN("failed to check doing pg task", K(ret), K(task_info)); - // pg backup result is not ready, need wait + } else if (!is_all_task_finished) { + // 1.get finished backup need use all pg task inos instead, and use simpe pg task info to save memory + // 2.split all pg task into finish tasks and doing tasks and pending tasks + // 3.cancel pending pg tasks interface use pending tasks + // 4.check doing pg tasks interface use doing tasks + // 5.do_cancel interface trans should start in function + if (OB_FAIL(cancel_pending_pg_tasks(task_info, trans))) { + LOG_WARN("failed to cancel pending pg tasks", K(ret), K(task_info)); + } else if (OB_FAIL(check_doing_pg_tasks(task_info, trans))) { + LOG_WARN("faield to check doing pg tasks", K(ret), K(task_info)); + // pg backup result is not ready, need wait + } } if (OB_SUCC(ret)) { @@ -3648,6 +3657,25 @@ int ObTenantBackup::check_standalone_table_need_backup( return ret; } +int ObTenantBackup::cancel_pending_pg_tasks(const ObTenantBackupTaskInfo& task_info, common::ObISQLClient& trans) +{ + int ret = OB_SUCCESS; + share::ObPGBackupTaskUpdater pg_task_updater; + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("tenant backup do not init", K(ret)); + } else if (!task_info.is_valid() || ObTenantBackupTaskInfo::CANCEL != task_info.status_) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("cancel pending pg tasks get invalid argument", K(ret), K(task_info)); + } else if (OB_FAIL(pg_task_updater.init(trans))) { + LOG_WARN("failed to init pg task updater", K(ret), K(task_info)); + } else if (OB_FAIL(pg_task_updater.cancel_pending_tasks( + task_info.tenant_id_, task_info.incarnation_, task_info.backup_set_id_))) { + LOG_WARN("failed to cancel pending tasks", K(ret), K(task_info)); + } + return ret; +} + int ObTenantBackup::commit_trans(ObMySQLTransaction& trans) { int ret = OB_SUCCESS; diff --git a/src/rootserver/ob_root_backup.h b/src/rootserver/ob_root_backup.h index a1be207c2a..f4806ce92a 100644 --- a/src/rootserver/ob_root_backup.h +++ b/src/rootserver/ob_root_backup.h @@ -309,6 +309,7 @@ private: int check_standalone_table_need_backup(const share::schema::ObTableSchema* table_schema, bool& need_backup); int commit_trans(ObMySQLTransaction& trans); int start_trans(ObTimeoutCtx& timeout_ctx, ObMySQLTransaction& trans); + int cancel_pending_pg_tasks(const share::ObTenantBackupTaskInfo& task_info, common::ObISQLClient& trans); private: static const int64_t MAX_CHECK_INTERVAL = 10 * 1000 * 1000; // 10s diff --git a/src/share/backup/ob_backup_operator.cpp b/src/share/backup/ob_backup_operator.cpp index 5b830e284c..04e971435d 100644 --- a/src/share/backup/ob_backup_operator.cpp +++ b/src/share/backup/ob_backup_operator.cpp @@ -1162,6 +1162,36 @@ int ObPGBackupTaskOperator::update_result_and_status_and_statics( return ret; } +int ObPGBackupTaskOperator::cancel_pending_tasks(const uint64_t tenant_id, const int64_t incarnation, + const int64_t backup_set_id, const int64_t limit_num, common::ObISQLClient& sql_proxy) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + int64_t affected_rows = -1; + const int32_t result = OB_CANCELED; + + if (OB_INVALID_ID == tenant_id || incarnation < 0 || backup_set_id < 0 || limit_num <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("update addr get invalid argument", K(ret), K(tenant_id), K(incarnation), K(backup_set_id), K(limit_num)); + } else if (OB_FAIL(sql.append_fmt( + "UPDATE %s SET status = 'FINISH', result ='%d', end_time = now(6) " + "WHERE tenant_id = %lu AND incarnation = %ld and backup_set_id = %ld and status = 'PENDING' LIMIT %ld", + OB_ALL_TENANT_PG_BACKUP_TASK_TNAME, + result, + tenant_id, + incarnation, + backup_set_id, + limit_num))) { + LOG_WARN("failed to append sql", K(ret), K(sql)); + } else if (OB_FAIL(sql_proxy.write(tenant_id, sql.ptr(), affected_rows))) { + LOG_WARN("fail to execute sql", K(ret)); + } else if (affected_rows < 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("affected rows unexpected", K(ret), K(affected_rows)); + } + return ret; +} + template int ObTenantBackupInfoOperation::set_info_item(const char* name, const char* info_str, T& info) { diff --git a/src/share/backup/ob_backup_operator.h b/src/share/backup/ob_backup_operator.h index 93f97c36d2..5ed96ad13c 100644 --- a/src/share/backup/ob_backup_operator.h +++ b/src/share/backup/ob_backup_operator.h @@ -95,6 +95,8 @@ public: common::ObISQLClient& sql_proxy, common::ObIArray& pg_task_infos); static int update_result_and_status_and_statics( common::ObISQLClient& sql_proxy, const ObPGBackupTaskInfo& pg_task_info); + static int cancel_pending_tasks(const uint64_t tenant_id, const int64_t incarnation, const int64_t backup_set_id, + const int64_t limit_num, common::ObISQLClient& sql_proxy); private: static int fill_one_item(const ObPGBackupTaskItem& item, ObDMLSqlSplicer& dml); diff --git a/src/share/backup/ob_backup_scheduler.cpp b/src/share/backup/ob_backup_scheduler.cpp index 188e9d1683..6812a3f148 100644 --- a/src/share/backup/ob_backup_scheduler.cpp +++ b/src/share/backup/ob_backup_scheduler.cpp @@ -219,6 +219,7 @@ int ObBackupScheduler::get_tenant_ids(ObIArray& tenant_ids) int ret = OB_SUCCESS; ObSchemaGetterGuard guard; int64_t backup_schema_version = 0; + ObArray tmp_tenant_ids; if (!is_inited_) { ret = OB_NOT_INIT; @@ -230,8 +231,22 @@ int ObBackupScheduler::get_tenant_ids(ObIArray& tenant_ids) } else if (OB_FAIL(ObBackupUtils::retry_get_tenant_schema_guard( OB_SYS_TENANT_ID, *schema_service_, backup_schema_version, guard))) { LOG_WARN("failed to get tenant schema guard", K(ret), K(backup_schema_version)); - } else if (OB_FAIL(guard.get_tenant_ids(tenant_ids))) { + } else if (OB_FAIL(guard.get_tenant_ids(tmp_tenant_ids))) { LOG_WARN("failed to get tenant ids", K(ret), K(arg_)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < tmp_tenant_ids.count(); ++i) { + const uint64_t tenant_id = tmp_tenant_ids.at(i); + bool can_backup = false; + if (OB_FAIL(get_tenant_schema_version(tenant_id, backup_schema_version))) { + LOG_WARN("failed to get tenant schem version", K(ret), K(tenant_id)); + } else if (OB_FAIL(check_tenant_can_backup(tenant_id, backup_schema_version, can_backup))) { + LOG_WARN("failed to check tenant can backup", K(ret), K(tenant_id), K(backup_schema_version)); + } else if (!can_backup) { + // do nothing + } else if (OB_FAIL(tenant_ids.push_back(tenant_id))) { + LOG_WARN("failed to push tenant id into array", K(ret)); + } + } } } else { ret = OB_NOT_SUPPORTED; @@ -406,7 +421,7 @@ int ObBackupScheduler::schedule_tenant_backup(const int64_t backup_snapshot_vers LOG_WARN("tenant id should be sys tenant id", K(ret), K(tenant_id), K(backup_snapshot_version), K(backup_dest)); } else if (OB_FAIL(get_tenant_schema_version(tenant_id, backup_schema_version))) { LOG_WARN("failed to get tenant schema version", K(ret), K(tenant_id)); - } else if (OB_FAIL(check_tenant_can_backup(tenant_id, backup_schema_version, info_manager, can_backup))) { + } else if (OB_FAIL(check_tenant_backup_data_version(tenant_id, info_manager, can_backup))) { LOG_WARN("failed to check tenant can backup", K(ret), K(tenant_id)); } else if (!can_backup) { // do nothing @@ -731,33 +746,26 @@ int ObBackupScheduler::check_backup_task_infos_status(const ObBaseBackupInfoStru } int ObBackupScheduler::check_tenant_can_backup( - const uint64_t tenant_id, const int64_t backup_schema_version, ObBackupInfoManager& info_manager, bool& can_backup) + const uint64_t tenant_id, const int64_t backup_schema_version, bool& can_backup) { int ret = OB_SUCCESS; ObSchemaGetterGuard guard; const ObSimpleTenantSchema* tenant_schema = NULL; - int64_t base_backup_version = 0; - can_backup = true; + if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("backup scheduler do not init", K(ret)); } else if (OB_INVALID_ID == tenant_id || backup_schema_version <= 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("check tenant can backup get invalid argument", K(ret), K(tenant_id), K(backup_schema_version)); - } else if (OB_FAIL(info_manager.get_base_backup_version(tenant_id, *proxy_, base_backup_version))) { - LOG_WARN("failed to get base backup version", K(ret), K(tenant_id), K(base_backup_version)); } else if (OB_FAIL(schema_service_->get_tenant_schema_guard(tenant_id, guard, backup_schema_version))) { LOG_WARN("failed to get tenant schema guard", K(ret), K(backup_schema_version)); } else if (OB_FAIL(guard.get_tenant_info(tenant_id, tenant_schema))) { LOG_WARN("failed to get tenant info", K(ret), K(tenant_id)); - } else if (tenant_schema->is_dropping() || tenant_schema->is_restore() || - backup_data_version_ < base_backup_version) { + } else if (tenant_schema->is_dropping() || tenant_schema->is_restore()) { can_backup = false; - FLOG_INFO("tenant can no join in backup, skip backup", - K(*tenant_schema), - K(backup_data_version_), - K(base_backup_version)); + FLOG_INFO("tenant can no join in backup, skip backup", K(*tenant_schema)); } return ret; } @@ -835,5 +843,27 @@ int ObBackupScheduler::check_log_archive_status() return ret; } +int ObBackupScheduler::check_tenant_backup_data_version( + const uint64_t tenant_id, ObBackupInfoManager& info_manager, bool& can_backup) +{ + int ret = OB_SUCCESS; + int64_t base_backup_version = 0; + can_backup = true; + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("backup scheduler do not init", K(ret)); + } else if (OB_INVALID_ID == tenant_id) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("check tenant can backup get invalid argument", K(ret), K(tenant_id)); + } else if (OB_FAIL(info_manager.get_base_backup_version(tenant_id, *proxy_, base_backup_version))) { + LOG_WARN("failed to get base backup version", K(ret), K(tenant_id), K(base_backup_version)); + } else if (backup_data_version_ < base_backup_version) { + can_backup = false; + FLOG_INFO("tenant can no join in backup, skip backup", K(backup_data_version_), K(base_backup_version)); + } + return ret; +} + } // namespace share } // namespace oceanbase diff --git a/src/share/backup/ob_backup_scheduler.h b/src/share/backup/ob_backup_scheduler.h index dbe1f3088d..6ce6818078 100644 --- a/src/share/backup/ob_backup_scheduler.h +++ b/src/share/backup/ob_backup_scheduler.h @@ -52,14 +52,14 @@ private: int rollback_backup_infos(ObBackupInfoManager& info_manager); int rollback_backup_info(const ObBaseBackupInfoStruct& info, ObBackupInfoManager& info_manager); int check_backup_task_infos_status(const ObBaseBackupInfoStruct& info); - int check_tenant_can_backup(const uint64_t tenant_id, const int64_t backup_schema_version, - ObBackupInfoManager& info_manager, bool& can_backup); + int check_tenant_can_backup(const uint64_t tenant_id, const int64_t backup_schema_version, bool& can_backup); int check_gts_(const common::ObIArray& tenant_ids); int check_gts_(const uint64_t tenant_id); int init_frozen_schema_versions_(rootserver::ObFreezeInfoManager& freeze_info_manager, const int64_t frozen_version); int check_backup_schema_version_(const uint64_t tenant_id, const int64_t backup_schema_version); int create_backup_point(const uint64_t tenant_id); int check_log_archive_status(); + int check_tenant_backup_data_version(const uint64_t tenant_id, ObBackupInfoManager& info_manager, bool& can_backup); private: static const int64_t MAX_TENANT_BUCKET = 1024; diff --git a/src/share/backup/ob_pg_backup_task_updater.cpp b/src/share/backup/ob_pg_backup_task_updater.cpp index 2fd1a53668..6201e0acc6 100644 --- a/src/share/backup/ob_pg_backup_task_updater.cpp +++ b/src/share/backup/ob_pg_backup_task_updater.cpp @@ -435,3 +435,22 @@ int ObPGBackupTaskUpdater::update_status_and_result(const common::ObIArray& pg_task_info_array); + int cancel_pending_tasks(const uint64_t tenant_id, const int64_t incarnation, const int64_t backup_set_id); private: static const int64_t MAX_BATCH_COUNT = 1024; diff --git a/src/share/restore/ob_restore_base_reader.h b/src/share/restore/ob_restore_base_reader.h index 3ee480c80e..1ff75b50f4 100644 --- a/src/share/restore/ob_restore_base_reader.h +++ b/src/share/restore/ob_restore_base_reader.h @@ -34,18 +34,25 @@ public: int get_create_tenant_stmt(common::ObString& stmt); int get_create_tablegroup_stmts(common::ObIArray& stmts); int get_create_foreign_key_stmts(common::ObIArray& stmts); + int get_create_trigger_stmts(common::ObIArray& stmts); int get_create_database_stmts(common::ObIArray& stmts); int get_create_data_table_stmts(common::ObIArray& stmts); int get_create_user_stmts(common::ObIArray& stmts); int get_create_index_table_stmts( common::ObIArray& stmts, common::hash::ObHashSet& dropped_index_ids); + int get_routine_ids(common::ObIArray& routine_ids); + int get_create_routine_stmt(const common::ObString& routine_id_name, common::ObString& stmt); + int get_package_ids(common::ObIArray& package_ids); + int get_create_package_stmt(const common::ObString& package_id_name, common::ObString& stmt); + int get_udt_ids(common::ObIArray& udt_ids); + int get_create_udt_stmt(const common::ObString& udt_id_name, common::ObString& stmt); + int get_create_tablespace_stmts(common::ObIArray& stmts); int get_create_synonym_stmts(common::ObIArray& stmts); /* Commands that can be executed directly without any modification */ int get_direct_executable_stmts(const char* direct_executable_definitions, common::ObIArray& stmts); int get_recycle_objects(common::ObIArray& objects); + int get_security_audit_stmts(common::ObIArray& stmts); int get_create_all_timezone_stmts(common::ObIArray& stmts); - -private: int get_one_object_from_oss( const char* last_name, const bool allow_not_exist, common::ObIArray& stmts); int get_create_table_stmt( @@ -56,8 +63,6 @@ private: int read_one_file(const common::ObStoragePath& path, common::ObIAllocator& allocator, char*& buf, int64_t& read_size); private: - // oss path: "oss://runiu1/ob1.XX/3/1001" - // file path: "file:///mnt/test_nfs_runiu/ob1.XX/3/1001" common::ObStoragePath common_path_; ObRestoreArgs& args_; common::ObArenaAllocator allocator_; diff --git a/src/storage/ob_partition_base_data_backup.cpp b/src/storage/ob_partition_base_data_backup.cpp index bac3738dcb..88bd604bb6 100644 --- a/src/storage/ob_partition_base_data_backup.cpp +++ b/src/storage/ob_partition_base_data_backup.cpp @@ -1286,6 +1286,7 @@ int ObBackupFileAppender::open(common::ObInOutBandwidthThrottle& bandwidth_throt file_type_ = type; backup_arg_ = &backup_arg; bandwidth_throttle_ = &bandwidth_throttle; + file_offset_ = 0; switch (type) { case BACKUP_META: @@ -1933,7 +1934,6 @@ int ObBackupFileAppender::close() STORAGE_LOG(WARN, "close appender fail", K(ret), K(tmp_ret), K(storage_appender_)); } } - file_offset_ = 0; is_opened_ = false; STORAGE_LOG(INFO, "finish close bakcup file appender", K(ret), K(tmp_ret)); return ret; @@ -3797,6 +3797,7 @@ int ObBackupFinishTask::init(ObMigrateCtx& ctx) int ObBackupFinishTask::process() { int ret = OB_SUCCESS; + if (NULL != ctx_) { STORAGE_LOG(INFO, "start ObBackupFinishTask process", "pkey", ctx_->replica_op_arg_.key_); } diff --git a/src/storage/ob_partition_base_data_ob_reader.cpp b/src/storage/ob_partition_base_data_ob_reader.cpp index 44b3daf1b0..ddb7cd2e40 100644 --- a/src/storage/ob_partition_base_data_ob_reader.cpp +++ b/src/storage/ob_partition_base_data_ob_reader.cpp @@ -2481,23 +2481,30 @@ ObTailoredRowIterator::ObTailoredRowIterator() {} int ObTailoredRowIterator::init(const uint64_t index_id, const ObPartitionKey& pg_key, const int64_t schema_version, - const ObITable::TableKey& table_key, ObTablesHandle& handle) + const ObITable::TableKey& table_key, const int64_t restore_snapshot_version, ObTablesHandle& handle) { int ret = OB_SUCCESS; if (is_inited_) { ret = OB_INIT_TWICE; LOG_WARN("tailored row iterator init twice", K(ret)); - } else if (OB_INVALID_ID == index_id || handle.empty() || !pg_key.is_valid() || !table_key.is_valid()) { + } else if (OB_INVALID_ID == index_id || handle.empty() || !pg_key.is_valid() || !table_key.is_valid() || + restore_snapshot_version <= 0) { ret = OB_INVALID_ARGUMENT; - LOG_WARN( - "tailored row iter init get invalid argument", K(ret), K(index_id), K(pg_key), K(table_key), K(schema_version)); + LOG_WARN("tailored row iter init get invalid argument", + K(ret), + K(index_id), + K(pg_key), + K(table_key), + K(schema_version), + K(restore_snapshot_version)); } else { ObExtStoreRange key_range; key_range.get_range().set_whole_range(); ObVersionRange version_range; version_range.base_version_ = 0; version_range.multi_version_start_ = 0; - version_range.snapshot_version_ = table_key.trans_version_range_.snapshot_version_; + version_range.snapshot_version_ = + std::max(table_key.trans_version_range_.snapshot_version_, restore_snapshot_version); snapshot_version_ = table_key.trans_version_range_.snapshot_version_; int64_t save_schema_version = -1; memtable::ObIMemtableCtxFactory* mem_ctx_factory = ObPartitionService::get_instance().get_mem_ctx_factory(); diff --git a/src/storage/ob_partition_base_data_ob_reader.h b/src/storage/ob_partition_base_data_ob_reader.h index fe6814f161..b6f99ad8fb 100644 --- a/src/storage/ob_partition_base_data_ob_reader.h +++ b/src/storage/ob_partition_base_data_ob_reader.h @@ -524,7 +524,7 @@ public: virtual ~ObTailoredRowIterator() {} int init(const uint64_t index_id, const ObPartitionKey& pg_key, const int64_t schema_version, - const ObITable::TableKey& table_key, ObTablesHandle& handle); + const ObITable::TableKey& table_key, const int64_t restore_snapshot_version, ObTablesHandle& handle); virtual int get_next_row(const ObStoreRow*& store_row); virtual const obrpc::ObFetchLogicRowArg* get_fetch_logic_row_arg() { diff --git a/src/storage/ob_partition_base_data_oss_reader.cpp b/src/storage/ob_partition_base_data_oss_reader.cpp index 5579a46ec4..b2083cad09 100644 --- a/src/storage/ob_partition_base_data_oss_reader.cpp +++ b/src/storage/ob_partition_base_data_oss_reader.cpp @@ -316,17 +316,20 @@ int ObPartitionMetaStorageReader::read_table_keys() OB_LOG(WARN, "fail to deserialize table key size", K(ret), K(table_idx), K(table_count), K(read_size)); } else if (OB_FAIL(table_key.deserialize(read_buf, read_size, pos))) { OB_LOG(WARN, "fail to deserialize table key", K(ret), K(table_idx), K(table_count), K(read_size)); - } else if (serialize_size != table_key.get_serialize_size()) { - ret = OB_ERR_UNEXPECTED; - OB_LOG(WARN, "table keys serialize size is not match", K(ret), K(serialize_size), K(table_key)); } else if (OB_FAIL(table_keys_array_.push_back(table_key))) { OB_LOG(WARN, "fail to push backup table key", K(ret), K(table_key)); } else { OB_LOG(DEBUG, "succ to add table key meta", K(read_size), K(pos), K(table_key)); } } - } + if (OB_SUCC(ret)) { + if (pos != read_size) { + ret = OB_ERR_UNEXPECTED; + OB_LOG(WARN, "table keys serialize size is not match", K(ret), K(pos), K(read_size)); + } + } + } return ret; } diff --git a/src/storage/ob_partition_migrator.cpp b/src/storage/ob_partition_migrator.cpp index c1cfdd4709..60d91dfbe0 100644 --- a/src/storage/ob_partition_migrator.cpp +++ b/src/storage/ob_partition_migrator.cpp @@ -786,7 +786,7 @@ int ObPartitionMigrateCtx::add_sstable(ObSSTable& sstable) if (OB_ISNULL(ctx_)) { ret = OB_ERR_SYS; LOG_WARN("ctx must not null", K(ret), K(*this)); - } else if (!is_partition_exist_ || !ctx_->is_copy_index()) { + } else if (!ctx_->is_copy_index()) { common::SpinWLockGuard guard(lock_); if (OB_FAIL(handle_.add_table(&sstable))) { LOG_WARN("failed to add table", K(ret)); @@ -891,6 +891,7 @@ int ObMigratePrepareTask::create_new_partition(const ObAddr& src_server, ObRepli ObCreatePGParam param; common::ObReplicaProperty replica_property; // TODO: need using RS appoint replica property ObMigrateStatus migrate_status; + bool need_create_memtable = false; if (!is_inited_) { ret = OB_NOT_INIT; @@ -962,7 +963,9 @@ int ObMigratePrepareTask::create_new_partition(const ObAddr& src_server, ObRepli STORAGE_LOG(WARN, "fail to write add partition (group) log.", K(ret), K(subcmd), K(log_attr)); } else if (OB_FAIL(partition->create_partition_group(param))) { STORAGE_LOG(WARN, "fail to create partition group", K(ret)); - } else if (OB_FAIL(partition->create_memtable(in_slog_trans))) { + } else if (FALSE_IT(need_create_memtable = !(partition->get_pg_storage().is_restoring_base_data() || + partition->get_pg_storage().is_restoring_standby()))) { + } else if (need_create_memtable && OB_FAIL(partition->create_memtable(in_slog_trans))) { STORAGE_LOG(WARN, "fail to create memtable", K(ret)); } else { file_handle.reset(); @@ -2763,12 +2766,7 @@ int ObPartGroupMigrationTask::set_migrate_in(ObPGStorage& pg_storage) LOG_INFO("no need set migrate_status", K(task->arg_.type_), K(pkey)); } else if (OB_FAIL(ObMigrateStatusHelper::trans_replica_op(task->arg_.type_, new_status))) { LOG_WARN("failed to trans_replica_op_to_migrate_status", K(ret), "arg", task->arg_); - } else if (OB_MIGRATE_STATUS_ADD == old_status || OB_MIGRATE_STATUS_MIGRATE == old_status || - OB_MIGRATE_STATUS_REBUILD == old_status || OB_MIGRATE_STATUS_CHANGE == old_status || - OB_MIGRATE_STATUS_RESTORE == old_status || OB_MIGRATE_STATUS_COPY_GLOBAL_INDEX == old_status || - OB_MIGRATE_STATUS_COPY_LOCAL_INDEX == old_status || OB_MIGRATE_STATUS_HOLD == old_status || - OB_MIGRATE_STATUS_RESTORE_FOLLOWER == old_status || OB_MIGRATE_STATUS_RESTORE_STANDBY == old_status || - OB_MIGRATE_STATUS_LINK_MAJOR == old_status) { + } else if (OB_MIGRATE_STATUS_NONE != old_status) { ret = OB_STATE_NOT_MATCH; LOG_WARN("old migrate is not finish, cannot set new migrate status", K(ret), @@ -3265,8 +3263,6 @@ int ObPartGroupMigrationTask::try_change_member_list( const int64_t start_ts = ObTimeUtility::current_time(); ObMigrateSrcInfo leader_info; bool need_batch = false; - const int64_t local_switch_epoch = GCTX.get_switch_epoch2(); - int64_t remote_switch_epoch = -1; DEBUG_SYNC(BEFORE_CHANGE_MEMBER_LIST); @@ -3281,17 +3277,6 @@ int ObPartGroupMigrationTask::try_change_member_list( } else if (report_list.count() <= 0) { ret = OB_ERR_SYS; STORAGE_LOG(ERROR, "task list is empty", K(ret)); - } else if (FALSE_IT(remote_switch_epoch = report_list.at(0).arg_.switch_epoch_)) { - } else if (remote_switch_epoch != local_switch_epoch) { - ret = OB_ERR_SYS; - STORAGE_LOG(ERROR, - "switch epoch is not equal local switch epoch", - K(ret), - K(report_list.at(0)), - K(local_switch_epoch), - K(remote_switch_epoch)); - first_error_code_ = ret; - is_all_finish = true; } else if (NORMAL_CHANGE_MEMBER_LIST != change_member_option_) { if (MIGRATE_REPLICA_OP == type_) { if (OB_SUCCESS != (tmp_ret = batch_remove_src_replica(report_list))) { @@ -5927,15 +5912,12 @@ int ObMigratePrepareTask::try_hold_local_partition() LOG_WARN("failed to get leader", K(ret), "arg", ctx_->replica_op_arg_); } else if (OB_FAIL(partition->get_role(role))) { LOG_WARN("failed to get partition role", K(ret), "arg", ctx_->replica_op_arg_); - } else if (leader.is_valid() && - leader == MYADDR - // TODO - //&& is_strong_leader(role) - && (ADD_REPLICA_OP == ctx_->replica_op_arg_.type_ || MIGRATE_REPLICA_OP == ctx_->replica_op_arg_.type_ || - FAST_MIGRATE_REPLICA_OP == ctx_->replica_op_arg_.type_ || - REBUILD_REPLICA_OP == ctx_->replica_op_arg_.type_ || - CHANGE_REPLICA_OP == ctx_->replica_op_arg_.type_ || - LINK_SHARE_MAJOR_OP == ctx_->replica_op_arg_.type_)) { + } else if (leader.is_valid() && leader == MYADDR && is_strong_leader(role) && + (ADD_REPLICA_OP == ctx_->replica_op_arg_.type_ || MIGRATE_REPLICA_OP == ctx_->replica_op_arg_.type_ || + FAST_MIGRATE_REPLICA_OP == ctx_->replica_op_arg_.type_ || + REBUILD_REPLICA_OP == ctx_->replica_op_arg_.type_ || + CHANGE_REPLICA_OP == ctx_->replica_op_arg_.type_ || + LINK_SHARE_MAJOR_OP == ctx_->replica_op_arg_.type_)) { if (REBUILD_REPLICA_OP == ctx_->replica_op_arg_.type_) { if (OB_FAIL(MIGRATOR.get_partition_service()->turn_off_rebuild_flag(ctx_->replica_op_arg_))) { LOG_WARN("Failed to report_rebuild_replica off", K(ret), "arg", ctx_->replica_op_arg_); @@ -6723,7 +6705,7 @@ bool ObMigratePrepareTask::can_migrate_src_skip_log_sync(const obrpc::ObFetchPGI b_ret = false; } else if (ADD_REPLICA_OP == ctx.replica_op_arg_.type_ || MIGRATE_REPLICA_OP == ctx.replica_op_arg_.type_) { if (result.pg_meta_.is_restore_ > REPLICA_NOT_RESTORE && - result.pg_meta_.is_restore_ < REPLICA_RESTORE_DUMP_MEMTABLE) { + result.pg_meta_.is_restore_ <= REPLICA_RESTORE_MEMBER_LIST) { b_ret = true; } } @@ -6919,7 +6901,8 @@ int ObMigratePrepareTask::build_migrate_table_info(const uint64_t table_id, cons local_inc_sstables, remote_major_sstables, remote_inc_sstables, - info.major_sstables_))) { + info.major_sstables_, + part_ctx))) { LOG_WARN("failed to build migrate major sstable", K(ret)); } else if (OB_FAIL(build_migrate_minor_sstable(need_reuse_local_minor, local_inc_sstables, @@ -7112,7 +7095,7 @@ int ObMigratePrepareTask::build_remote_minor_sstables(const common::ObIArray& local_major_tables, ObIArray& local_inc_tables, ObIArray& remote_major_tables, ObIArray& remote_inc_tables, - ObIArray& copy_sstables) + ObIArray& copy_sstables, ObPartitionMigrateCtx& part_ctx) { int ret = OB_SUCCESS; @@ -7121,7 +7104,8 @@ int ObMigratePrepareTask::build_migrate_major_sstable(const bool need_reuse_loca local_inc_tables, remote_major_tables, remote_inc_tables, - copy_sstables))) { + copy_sstables, + part_ctx))) { LOG_WARN("failed to build_migrate_major_sstable", K(ret)); } return ret; @@ -7219,13 +7203,14 @@ int ObMigratePrepareTask::build_migrate_major_sstable_(ObIArray& local_major_tables, ObIArray& local_inc_tables, ObIArray& remote_major_tables, ObIArray& remote_inc_tables, - ObIArray& copy_sstables) + ObIArray& copy_sstables, ObPartitionMigrateCtx& part_ctx) { int ret = OB_SUCCESS; int64_t max_snapshot_version = 0; UNUSED(local_inc_tables); UNUSED(remote_inc_tables); UNUSED(need_reuse_local_minor); + bool need_add_local_major = false; // D type replica major need to consider compaction_interval. Do local compaction when greater than 0, and meanwhile // need local major const int64_t follower_replica_merge_level = GCONF._follower_replica_merge_level; @@ -7240,6 +7225,7 @@ int ObMigratePrepareTask::build_migrate_major_sstable_v2_(const bool need_reuse_ for (int64_t i = 0; OB_SUCC(ret) && i < remote_major_tables.count(); ++i) { const ObITable::TableKey& remote_major_table = remote_major_tables.at(i); if (remote_major_table.get_snapshot_version() <= max_snapshot_version && !remote_major_table.is_trans_sstable()) { + need_add_local_major = true; continue; } else { ObMigrateTableInfo::SSTableInfo info; @@ -7254,6 +7240,24 @@ int ObMigratePrepareTask::build_migrate_major_sstable_v2_(const bool need_reuse_ } } } + + if (OB_SUCC(ret) && need_add_local_major) { + ObTableHandle table_handle; + for (int64_t i = 0; OB_SUCC(ret) && i < local_major_tables.count(); ++i) { + const ObITable::TableKey& local_major_table = local_major_tables.at(i); + ObITable* table = NULL; + table_handle.reset(); + if (OB_FAIL(ObPartitionService::get_instance().acquire_sstable(local_major_table, table_handle))) { + LOG_WARN("failed to get complete sstable by key", K(ret), K(local_major_table)); + } else if (NULL == (table = table_handle.get_table())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("table should not be NULL", K(ret), KP(table)); + } else if (OB_FAIL(part_ctx.add_sstable(*reinterpret_cast(table)))) { + LOG_WARN("failed to add sstable", K(ret)); + } + } + } + if (OB_SUCC(ret)) { FLOG_INFO("build_migrate_major_sstable", K(local_major_tables), @@ -7959,7 +7963,14 @@ int ObMigrateUtil::merge_trans_table(ObMigrateCtx& ctx) ret = OB_ERR_SYS; LOG_WARN("partition group must not null", K(ret)); } else if (FALSE_IT(pg_key = pg->get_partition_key())) { - } else { + } else if (FALSE_IT(ctx.old_trans_table_seq_ = pg->get_pg_storage().get_trans_table_seq())) { + } else if (OB_FAIL( + ObMigrateUtil::add_trans_sstable_to_part_ctx(trans_sstable->get_key().pkey_, *trans_sstable, ctx))) { + LOG_WARN("failed to add trans sstable to part migrate ctx", K(ret), KPC(trans_sstable)); + } + // TODO() need reconsider sstable reuse, now skip trans sstable reuse + /* + else { ObTransTableMergeTask merge_task; ObMacroBlockWriter writer; ObTableHandle new_trans_sstable; @@ -7979,7 +7990,7 @@ int ObMigrateUtil::merge_trans_table(ObMigrateCtx& ctx) LOG_WARN("failed to add trans sstable to part migrate ctx", K(ret), KPC(new_sstable)); } } - + */ return ret; } @@ -8093,7 +8104,7 @@ int ObMigratePostPrepareTask::deal_with_rebuild_partition() LOG_WARN("failed to get leader", K(ret), "arg", ctx_->replica_op_arg_); } else if (OB_FAIL(partition->get_role(role))) { LOG_WARN("failed to get real role", K(ret), "arg", ctx_->replica_op_arg_); - } else if (leader.is_valid() && leader == MYADDR) { // TODO //&& is_strong_leader(role)) { + } else if (leader.is_valid() && leader == MYADDR && is_strong_leader(role)) { if (OB_FAIL(MIGRATOR.get_partition_service()->turn_off_rebuild_flag(ctx_->replica_op_arg_))) { LOG_WARN("Failed to report_rebuild_replica off", K(ret), "arg", ctx_->replica_op_arg_); } else { @@ -11417,6 +11428,11 @@ int ObMigrateFinishTask::process() LOG_WARN("failed to merge trans table", K(ret)); } else if (OB_FAIL(create_pg_partition_if_need())) { LOG_WARN("Failed to create pg partition if need", K(ret)); + } else if (ADD_REPLICA_OP == ctx_->replica_op_arg_.type_ && + ((ctx_->is_restore_ > REPLICA_NOT_RESTORE && ctx_->is_restore_ < REPLICA_RESTORE_MAX) || + REPLICA_RESTORE_STANDBY == ctx_->is_restore_)) { + is_add_replica_during_restore = true; + LOG_INFO("no need check read add replica during restore", K(ctx_->is_restore_), "pkey", ctx_->replica_op_arg_.key_); } else { if (ObReplicaTypeCheck::is_replica_with_ssstore(ctx_->replica_op_arg_.dst_.get_replica_type())) { if (OB_FAIL(check_pg_partition_ready_for_read())) { @@ -11429,8 +11445,10 @@ int ObMigrateFinishTask::process() LOG_WARN("failed to check_available_index_all_exist", K(ret)); } } + } - if (OB_SUCC(ret) && OB_FAIL(enable_replay())) { + if (OB_SUCC(ret)) { + if (OB_FAIL(enable_replay())) { LOG_WARN("failed to enable replay", K(ret), K(ctx_->replica_op_arg_)); } } @@ -11602,7 +11620,10 @@ int ObMigrateFinishTask::create_pg_partition_if_need() } else if (OB_ISNULL(pg = ctx_->partition_guard_.get_partition_group())) { ret = OB_ERR_SYS; LOG_WARN("pg should not be NULL", K(ret), KP(pg)); - } else if (!ctx_->is_copy_index()) { + } else if (ctx_->is_copy_index()) { + // no need to replace store map, just skip it. + LOG_INFO("replica op tyoe is no need batch replace sstable, skip it"); + } else { const ObSavedStorageInfoV2& saved_storage_info = ctx_->pg_meta_.storage_info_; if (OB_FAIL(pg->get_pg_storage().batch_replace_store_map(ctx_->part_ctx_array_, saved_storage_info.get_data_info().get_schema_version(), @@ -11610,45 +11631,6 @@ int ObMigrateFinishTask::create_pg_partition_if_need() ctx_->old_trans_table_seq_))) { LOG_WARN("failed to batch replace store map", K(ret)); } - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < ctx_->part_ctx_array_.count(); ++i) { - ObPartitionMigrateCtx& pctx = ctx_->part_ctx_array_.at(i); - const ObPGPartitionStoreMeta& meta = pctx.copy_info_.meta_; - if (pctx.is_partition_exist_) { - if (!pctx.handle_.empty()) { - ret = OB_ERR_SYS; - LOG_ERROR("handle must be empty when partition exist", K(ret), K(pctx)); - } - } else { - obrpc::ObCreatePartitionArg arg; - const ObSavedStorageInfoV2& saved_storage_info = ctx_->pg_meta_.storage_info_; - arg.schema_version_ = saved_storage_info.get_data_info().get_schema_version(); - arg.lease_start_ = meta.create_timestamp_; - - if (ctx_->replica_op_arg_.is_physical_restore_leader()) { - arg.restore_ = REPLICA_RESTORE_DATA; - } else if (ctx_->replica_op_arg_.is_standby_restore()) { - arg.restore_ = REPLICA_RESTORE_STANDBY; - } - - const bool is_replay = false; - const uint64_t unused = 0; - - if (OB_FAIL(pg->create_pg_partition(meta.pkey_, - meta.multi_version_start_, - meta.data_table_id_, - arg, - in_slog_trans, - is_replay, - unused, - pctx.handle_))) { - LOG_WARN("fail to create pg partition", K(ret), K(meta)); - } else { - FLOG_INFO("succeed to create pg partition during finish", "pkey", meta.pkey_, "handle", pctx.handle_); - pctx.handle_.reset(); - } - } - } } return ret; } @@ -12733,10 +12715,18 @@ int ObRestoreTailoredPrepareTask::update_restore_flag_cut_data() { int ret = OB_SUCCESS; const int16_t flag = REPLICA_RESTORE_CUT_DATA; + ObIPartitionGroup* partition_group = NULL; + int16_t restore_status = ObReplicaRestoreStatus::REPLICA_NOT_RESTORE; if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("restore failored prepare task do not init", K(ret)); + } else if (OB_ISNULL((partition_group = ctx_->partition_guard_.get_partition_group()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("partition group should not be NULL", K(ret), KP(partition_group)); + } else if (FALSE_IT(restore_status = partition_group->get_pg_storage().get_restore_state())) { + } else if (restore_status == flag) { + // do nothing } else { const ObPGKey& pg_key = ctx_->replica_op_arg_.key_; if (OB_FAIL(MIGRATOR.get_partition_service()->set_restore_flag(pg_key, flag))) { @@ -12753,6 +12743,7 @@ int ObRestoreTailoredPrepareTask::check_need_generate_task(bool& need_generate) ObIPartitionGroup* partition_group = NULL; ObTablesHandle tables_handle; int64_t max_upper_trans_version = 0; + transaction::ObTransService* trans_service = NULL; if (!is_inited_) { ret = OB_NOT_INIT; @@ -12762,18 +12753,46 @@ int ObRestoreTailoredPrepareTask::check_need_generate_task(bool& need_generate) LOG_WARN("partition group should not be NULL", K(ret), KP(partition_group)); } else if (OB_FAIL(partition_group->get_all_tables(tables_handle))) { LOG_WARN("failed to get all tables", K(ret), KP(partition_group)); + } else if (OB_ISNULL(trans_service = GCTX.par_ser_->get_trans_service())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("trans service should not be NULL", K(ret), KP(trans_service)); + } else if (RESTORE_STANDBY_OP == ctx_->replica_op_arg_.type_) { + need_generate = true; } else { const int64_t restore_snapshot_version = ctx_->replica_op_arg_.phy_restore_arg_.restore_info_.restore_snapshot_version_; for (int64_t i = 0; OB_SUCC(ret) && i < tables_handle.get_count(); ++i) { - const ObITable* table = tables_handle.get_table(i); + ObITable* table = tables_handle.get_table(i); + ObSSTable* sstable = NULL; if (OB_ISNULL(table)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("table should not be NULL", K(ret), KP(table)); } else if (table->is_major_sstable() || table->is_trans_sstable()) { // do nothing + } else if (OB_ISNULL(sstable = reinterpret_cast(table))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sstable should not be NULL", K(ret), KP(sstable)); } else { - max_upper_trans_version = std::max(table->get_upper_trans_version(), max_upper_trans_version); + int64_t max_trans_version = sstable->get_upper_trans_version(); + if (INT64_MAX == sstable->get_upper_trans_version()) { + bool is_all_rollback_trans = false; + // get_all_latest_minor_sstables return all minor except complement + // all normal minor sstable consist of data between [start_log_ts, end_log_ts] + if (OB_FAIL(trans_service->get_max_trans_version_before_given_log_ts( + sstable->get_partition_key(), sstable->get_end_log_ts(), max_trans_version, is_all_rollback_trans))) { + LOG_WARN("failed to get_max_trans_version_before_given_log_id", K(ret), KPC(sstable)); + } else if (0 == max_trans_version && !is_all_rollback_trans) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("max trans version should not be 0", + K(sstable->get_partition_key()), + "end_log_ts", + sstable->get_end_log_ts()); + } + } + + if (OB_SUCC(ret)) { + max_upper_trans_version = std::max(max_trans_version, max_upper_trans_version); + } } } @@ -12840,6 +12859,7 @@ int ObRestoreTailoredTask::process() ObMacroBlocksWriteCtx block_write_ctx; ObMacroBlocksWriteCtx lob_block_write_ctx; const int64_t schema_version = OB_INVALID_VERSION; + int64_t restore_snapshot = 0; LOG_INFO("start ObRestoreTailoredTask process", "table_id", index_id_, "partition_key", partition_key_); @@ -12849,9 +12869,12 @@ int ObRestoreTailoredTask::process() } else if (minor_tables_handle_.empty()) { // do nothing LOG_INFO("minor tables handle is empty, no need cut", K(partition_key_), K(index_id_)); + } else if (FALSE_IT( + restore_snapshot = ctx_->replica_op_arg_.phy_restore_arg_.restore_info_.restore_snapshot_version_)) { } else if (OB_FAIL(get_tailored_table_key(table_key))) { LOG_WARN("failed to get tailored table key", K(ret), K(index_id_), K(pg_key_)); - } else if (OB_FAIL(row_iter.init(index_id_, pg_key_, schema_version, table_key, minor_tables_handle_))) { + } else if (OB_FAIL(row_iter.init( + index_id_, pg_key_, schema_version, table_key, restore_snapshot, minor_tables_handle_))) { LOG_WARN("failed to init tailored row iter", K(ret), K(index_id_), K(pg_key_), K(partition_key_)); } else if (OB_FAIL(logic_row_writer.init(&row_iter, pg_key_, ctx_->get_partition()->get_storage_file_handle()))) { LOG_WARN("failed to init logic row writer", K(ret), K(pg_key_), K(partition_key_)); @@ -12882,6 +12905,7 @@ int ObRestoreTailoredTask::get_tailored_table_key(ObITable::TableKey& table_key) } else { int64_t base_version = INT64_MAX; int64_t multi_version_start = 0; + int64_t snapshot_version = 0; int64_t start_log_ts = INT64_MAX; int64_t end_log_ts = 0; int64_t max_log_ts = 0; @@ -12912,6 +12936,7 @@ int ObRestoreTailoredTask::get_tailored_table_key(ObITable::TableKey& table_key) start_log_ts = std::min(start_log_ts, table->get_start_log_ts()); end_log_ts = std::max(end_log_ts, table->get_end_log_ts()); max_log_ts = std::max(max_log_ts, table->get_max_log_ts()); + snapshot_version = std::max(snapshot_version, table->get_snapshot_version()); } } @@ -12922,7 +12947,7 @@ int ObRestoreTailoredTask::get_tailored_table_key(ObITable::TableKey& table_key) table_key.trans_version_range_.base_version_ = base_version; table_key.trans_version_range_.multi_version_start_ = multi_version_start; table_key.trans_version_range_.snapshot_version_ = - ctx_->replica_op_arg_.phy_restore_arg_.restore_info_.restore_snapshot_version_; + std::min(ctx_->replica_op_arg_.phy_restore_arg_.restore_info_.restore_snapshot_version_, snapshot_version); table_key.version_.version_ = ctx_->replica_op_arg_.phy_restore_arg_.restore_data_version_ + 1; // table_key.upper_trans_version // table_key.max_merged_version @@ -13129,7 +13154,6 @@ int ObRestoreTailoredFinishTask::process() } if (OB_FAIL(ret)) { - } else if (part_ctx_array_.empty()) { // do nothing } else if (OB_ISNULL(partition_group = ctx_->partition_guard_.get_partition_group())) { ret = OB_ERR_UNEXPECTED; @@ -13146,7 +13170,8 @@ int ObRestoreTailoredFinishTask::process() LOG_WARN("failed to get all saved info", K(ret)); } else { ObDataStorageInfo& storage_info = save_info.get_data_info(); - storage_info.set_publish_version(restore_snapshot_version); + const int64_t publish_version = std::min(storage_info.get_publish_version(), restore_snapshot_version); + storage_info.set_publish_version(publish_version); storage_info.set_schema_version(schema_version_); if (OB_FAIL(partition_group->set_storage_info(save_info))) { LOG_WARN("failed to set storage info", K(ret), K(save_info)); diff --git a/src/storage/ob_partition_migrator.h b/src/storage/ob_partition_migrator.h index 45c041ec6e..0431820ae9 100644 --- a/src/storage/ob_partition_migrator.h +++ b/src/storage/ob_partition_migrator.h @@ -763,7 +763,7 @@ protected: common::ObIArray& local_major_tables, common::ObIArray& local_inc_tables, common::ObIArray& remote_major_tables, common::ObIArray& remote_inc_tables, - common::ObIArray& copy_sstables); + common::ObIArray& copy_sstables, ObPartitionMigrateCtx& part_ctx); int build_migrate_major_sstable_(common::ObIArray& local_major_tables, common::ObIArray& local_inc_tables, common::ObIArray& remote_major_tables, common::ObIArray& remote_inc_tables, @@ -772,7 +772,7 @@ protected: common::ObIArray& local_major_tables, common::ObIArray& local_inc_tables, common::ObIArray& remote_major_tables, common::ObIArray& remote_inc_tables, - common::ObIArray& copy_sstables); + common::ObIArray& copy_sstables, ObPartitionMigrateCtx& part_ctx); int build_migrate_minor_sstable(const bool need_reuse_local_minor, common::ObIArray& local_tables, common::ObIArray& remote_inc_tables, ObIArray& remote_gc_inc_sstables, common::ObIArray& copy_sstables); diff --git a/src/storage/ob_partition_store.cpp b/src/storage/ob_partition_store.cpp index 54abd57713..949abe7b96 100644 --- a/src/storage/ob_partition_store.cpp +++ b/src/storage/ob_partition_store.cpp @@ -766,7 +766,7 @@ int ObPartitionStore::create_multi_version_store_( return ret; } -int ObPartitionStore::get_index_status(const int64_t schema_version, +int ObPartitionStore::get_index_status(const int64_t schema_version, const bool is_physical_restore, common::ObIArray& index_status, common::ObIArray& deleted_and_error_index_ids) { @@ -807,7 +807,8 @@ int ObPartitionStore::get_index_status(const int64_t schema_version, LOG_WARN("failed to get full tenant schema guard", K(ret), K(fetch_tenant_id), K(pkey_)); } else if (OB_FAIL(schema_guard.get_schema_version(fetch_tenant_id, latest_schema_version))) { LOG_WARN("failed to get schema version", K(ret), K(fetch_tenant_id), K(pkey_)); - } else if (latest_schema_version > save_schema_version) { + } else if (latest_schema_version > save_schema_version + || (is_physical_restore && latest_schema_version >= save_schema_version)) { // befor check the delete status of index, we should make sure the schema guard is refreshed for (int64_t i = 0; OB_SUCC(ret) && i < index_status.count(); ++i) { const share::schema::ObTableSchema* table_schema = NULL; @@ -3218,8 +3219,8 @@ int ObPartitionStore::get_kept_multi_version_start( return ret; } -int ObPartitionStore::check_all_merged( - memtable::ObMemtable& memtable, const int64_t schema_version, bool& is_all_merged, bool& can_release) +int ObPartitionStore::check_all_merged(memtable::ObMemtable& memtable, const int64_t schema_version, + const bool is_physical_restore, bool& is_all_merged, bool& can_release) { int ret = OB_SUCCESS; is_all_merged = false; @@ -3232,7 +3233,8 @@ int ObPartitionStore::check_all_merged( if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); - } else if (OB_FAIL(get_index_status(schema_version, index_status, deleted_and_error_index_ids))) { + } else if (OB_FAIL( + get_index_status(schema_version, is_physical_restore, index_status, deleted_and_error_index_ids))) { if (OB_EAGAIN != ret && OB_TABLE_IS_DELETED != ret) { LOG_WARN("failed to get index ids", K(ret)); } else if (OB_TABLE_IS_DELETED == ret) { diff --git a/src/storage/ob_partition_store.h b/src/storage/ob_partition_store.h index 2b6f223f18..45dc22d032 100644 --- a/src/storage/ob_partition_store.h +++ b/src/storage/ob_partition_store.h @@ -140,8 +140,8 @@ public: int check_ready_for_read(const uint64_t table_id, bool& is_raedy); int check_major_merge_finished(const common::ObVersion& version, bool& finished); int check_need_report(const common::ObVersion& version, bool& need_report); - int check_all_merged( - memtable::ObMemtable& memtable, const int64_t schema_version, bool& is_all_merged, bool& can_release); + int check_all_merged(memtable::ObMemtable& memtable, const int64_t schema_version, const bool is_physical_restore, + bool& is_all_merged, bool& can_release); int64_t get_multi_version_start(); int get_replay_tables(const uint64_t table_id, ObIArray& replay_tables); @@ -227,8 +227,8 @@ private: int write_report_status(const ObReportStatus& status, const uint64_t data_table_id, const bool write_slog); int remove_old_table_(const common::ObVersion& kept_min_version, const int64_t multi_version_start, const int64_t kept_major_num, const int64_t backup_snapshot_version, ObMultiVersionTableStore& table_store); - int get_index_status(const int64_t schema_version, common::ObIArray& index_status, - common::ObIArray& deleted_index_ids); + int get_index_status(const int64_t schema_version, const bool is_physical_restore, + common::ObIArray& index_status, common::ObIArray& deleted_index_ids); int release_head_memtable(memtable::ObMemtable* memtable); int check_table_store_exist_nolock(const uint64_t index_id, bool& exist, ObMultiVersionTableStore*& got_table_store); int check_table_store_exist_with_lock( diff --git a/src/storage/ob_pg_storage.cpp b/src/storage/ob_pg_storage.cpp index 4f20d87108..baa339c27c 100644 --- a/src/storage/ob_pg_storage.cpp +++ b/src/storage/ob_pg_storage.cpp @@ -2872,7 +2872,8 @@ int ObPGStorage::set_pg_storage_info(const ObSavedStorageInfoV2& info) STORAGE_LOG(ERROR, "cannot set storage info when memtable count not zero", K(ret), K(*this)); } else if (meta_->storage_info_.get_data_info().get_publish_version() > info.get_data_info().get_publish_version() && ObReplicaRestoreStatus::REPLICA_RESTORE_DATA != meta_->is_restore_ && - ObReplicaRestoreStatus::REPLICA_RESTORE_CUT_DATA != meta_->is_restore_) { + ObReplicaRestoreStatus::REPLICA_RESTORE_CUT_DATA != meta_->is_restore_ && + ObReplicaRestoreStatus::REPLICA_RESTORE_STANDBY != meta_->is_restore_) { ret = OB_STATE_NOT_MATCH; STORAGE_LOG(WARN, "new storage info's publish version should not smaller than local", K(ret), K(*meta_), K(info)); } else if (OB_FAIL(alloc_meta_(next_meta_ptr))) { @@ -3470,6 +3471,8 @@ int ObPGStorage::check_can_release_pg_memtable_(ObTablesHandle& memtable_merged, int64_t timestamp = 0; memtable_merged.reset(); memtable_to_release.reset(); + // outside hold the lock + const bool is_physical_restore = meta_->is_restore_ > REPLICA_NOT_RESTORE && meta_->is_restore_ < REPLICA_RESTORE_MAX; if (OB_FAIL(get_all_pg_partition_keys_(partitions))) { STORAGE_LOG(WARN, "failed to get all pg partition keys", K(ret)); @@ -3506,7 +3509,7 @@ int ObPGStorage::check_can_release_pg_memtable_(ObTablesHandle& memtable_merged, ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "partition storage is null", K(ret), K(pkey)); } else if (OB_FAIL(storage->get_partition_store().check_all_merged( - *memtable, schema_version, part_all_merged, part_can_release))) { + *memtable, schema_version, is_physical_restore, part_all_merged, part_can_release))) { STORAGE_LOG(WARN, "failed to check if partition merged", K(ret), K(pkey)); } else { pg_all_merged = pg_all_merged && part_all_merged; diff --git a/src/storage/ob_storage_struct.cpp b/src/storage/ob_storage_struct.cpp index d12765171c..138735e870 100644 --- a/src/storage/ob_storage_struct.cpp +++ b/src/storage/ob_storage_struct.cpp @@ -522,7 +522,8 @@ int ObMigrateStatusHelper::trans_fail_status(const ObMigrateStatus& cur_status, break; } case OB_MIGRATE_STATUS_RESTORE: { - fail_status = OB_MIGRATE_STATUS_RESTORE_FAIL; + // allow observer self reentry + fail_status = OB_MIGRATE_STATUS_NONE; break; } case OB_MIGRATE_STATUS_COPY_GLOBAL_INDEX: { @@ -538,11 +539,13 @@ int ObMigrateStatusHelper::trans_fail_status(const ObMigrateStatus& cur_status, break; } case OB_MIGRATE_STATUS_RESTORE_FOLLOWER: { - fail_status = OB_MIGRATE_STATUS_RESTORE_FAIL; + // allow observer self reentry + fail_status = OB_MIGRATE_STATUS_NONE; break; } case OB_MIGRATE_STATUS_RESTORE_STANDBY: { - fail_status = OB_MIGRATE_STATUS_RESTORE_FAIL; + // allow observer self reentry + fail_status = OB_MIGRATE_STATUS_NONE; break; } case OB_MIGRATE_STATUS_LINK_MAJOR: { @@ -592,7 +595,10 @@ int ObMigrateStatusHelper::trans_reboot_status(const ObMigrateStatus& cur_status } case OB_MIGRATE_STATUS_RESTORE: case OB_MIGRATE_STATUS_RESTORE_FOLLOWER: - case OB_MIGRATE_STATUS_RESTORE_STANDBY: + case OB_MIGRATE_STATUS_RESTORE_STANDBY: { + reboot_status = OB_MIGRATE_STATUS_NONE; + break; + } case OB_MIGRATE_STATUS_RESTORE_FAIL: { reboot_status = OB_MIGRATE_STATUS_RESTORE_FAIL; break; diff --git a/unittest/storage/test_partition_migrator_table_key_mgr.cpp b/unittest/storage/test_partition_migrator_table_key_mgr.cpp index 82eab704a2..7b63e7a5ea 100644 --- a/unittest/storage/test_partition_migrator_table_key_mgr.cpp +++ b/unittest/storage/test_partition_migrator_table_key_mgr.cpp @@ -2007,6 +2007,7 @@ TEST_F(TestPartitionMigrateTableKeyMgr, test_build_migrate_major_sstable_no_new_ int64_t need_reserve_major_snapshot = 0; bool need_reuse_local_minor = true; ObVersion version(2); + ObPartitionMigrateCtx part_ctx; // local_major_tables: [0, 200] // local_inc_tables: [(180, 200, 300), (10, 30)] @@ -2056,7 +2057,8 @@ TEST_F(TestPartitionMigrateTableKeyMgr, test_build_migrate_major_sstable_no_new_ local_inc_tables, remote_major_tables, remote_inc_tables, - copy_sstables)); + copy_sstables, + part_ctx)); ASSERT_EQ(0, copy_sstables.count()); ASSERT_EQ(0, need_reserve_major_snapshot); @@ -2114,7 +2116,8 @@ TEST_F(TestPartitionMigrateTableKeyMgr, test_build_migrate_major_sstable_no_new_ local_inc_tables, remote_major_tables, remote_inc_tables, - copy_sstables)); + copy_sstables, + part_ctx)); ASSERT_EQ(0, copy_sstables.count()); ASSERT_EQ(0, need_reserve_major_snapshot); } @@ -2135,6 +2138,7 @@ TEST_F(TestPartitionMigrateTableKeyMgr, test_build_migrate_major_sstable_with_ne int64_t need_reserve_major_snapshot = 0; bool need_reuse_local_minor = true; ObVersion version(2); + ObPartitionMigrateCtx part_ctx; // local_major_tables: [0, 200] // local_inc_tables: [(180, 200, 300), (10, 30)] @@ -2193,7 +2197,8 @@ TEST_F(TestPartitionMigrateTableKeyMgr, test_build_migrate_major_sstable_with_ne local_inc_tables, remote_major_tables, remote_inc_tables, - copy_sstables)); + copy_sstables, + part_ctx)); ASSERT_EQ(1, copy_sstables.count()); // ASSERT_EQ(300, need_reserve_major_snapshot); @@ -2268,7 +2273,8 @@ TEST_F(TestPartitionMigrateTableKeyMgr, test_build_migrate_major_sstable_with_ne local_inc_tables, remote_major_tables, remote_inc_tables, - copy_sstables)); + copy_sstables, + part_ctx)); ASSERT_EQ(1, copy_sstables.count()); ASSERT_EQ(remote_major_tables.at(1), copy_sstables.at(0).src_table_key_); @@ -2343,7 +2349,8 @@ TEST_F(TestPartitionMigrateTableKeyMgr, test_build_migrate_major_sstable_with_ne local_inc_tables, remote_major_tables, remote_inc_tables, - copy_sstables)); + copy_sstables, + part_ctx)); ASSERT_EQ(1, copy_sstables.count()); // ASSERT_EQ(300, need_reserve_major_snapshot); @@ -2410,7 +2417,8 @@ TEST_F(TestPartitionMigrateTableKeyMgr, test_build_migrate_major_sstable_with_ne local_inc_tables, remote_major_tables, remote_inc_tables, - copy_sstables)); + copy_sstables, + part_ctx)); ASSERT_EQ(1, copy_sstables.count()); ASSERT_EQ(remote_major_tables.at(1), copy_sstables.at(0).src_table_key_); ASSERT_EQ(0, need_reserve_major_snapshot); @@ -2477,7 +2485,8 @@ TEST_F(TestPartitionMigrateTableKeyMgr, test_build_migrate_major_sstable_with_ne local_inc_tables, remote_major_tables, remote_inc_tables, - copy_sstables)); + copy_sstables, + part_ctx)); ASSERT_EQ(1, copy_sstables.count()); ASSERT_EQ(remote_major_tables.at(1), copy_sstables.at(0).src_table_key_); ASSERT_EQ(0, need_reserve_major_snapshot); @@ -2545,7 +2554,8 @@ TEST_F(TestPartitionMigrateTableKeyMgr, test_build_migrate_major_sstable_with_ne local_inc_tables, remote_major_tables, remote_inc_tables, - copy_sstables)); + copy_sstables, + part_ctx)); ASSERT_EQ(2, copy_sstables.count()); ASSERT_EQ(remote_major_tables.at(0), copy_sstables.at(0).src_table_key_); ASSERT_EQ(remote_major_tables.at(1), copy_sstables.at(1).src_table_key_);