From 8adcff8a8d5ae475cc8c015100465a62f13b5353 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 14 Jul 2023 09:18:22 +0000 Subject: [PATCH] [4.2][Standby] Optimize some processes when set log restore source --- .../ob_log_restore_net_driver.h | 2 +- .../restoreservice/ob_remote_log_source.h | 1 + src/rootserver/ob_recovery_ls_service.h | 2 +- src/share/CMakeLists.txt | 2 + src/share/backup/ob_backup_config.cpp | 340 +------ src/share/backup/ob_backup_config.h | 44 +- src/share/backup/ob_backup_struct.cpp | 725 +-------------- src/share/backup/ob_backup_struct.h | 68 +- src/share/backup/ob_log_restore_config.cpp | 384 ++++++++ src/share/backup/ob_log_restore_config.h | 76 ++ src/share/backup/ob_log_restore_struct.cpp | 839 ++++++++++++++++++ src/share/backup/ob_log_restore_struct.h | 113 +++ .../restore/ob_log_restore_source_mgr.cpp | 19 +- .../test_net_standby_restore_source.cpp | 2 +- 14 files changed, 1471 insertions(+), 1146 deletions(-) create mode 100644 src/share/backup/ob_log_restore_config.cpp create mode 100644 src/share/backup/ob_log_restore_config.h create mode 100644 src/share/backup/ob_log_restore_struct.cpp create mode 100644 src/share/backup/ob_log_restore_struct.h diff --git a/src/logservice/restoreservice/ob_log_restore_net_driver.h b/src/logservice/restoreservice/ob_log_restore_net_driver.h index d16ae5ed9..1caaa2198 100644 --- a/src/logservice/restoreservice/ob_log_restore_net_driver.h +++ b/src/logservice/restoreservice/ob_log_restore_net_driver.h @@ -21,7 +21,7 @@ #include "logservice/logfetcher/ob_log_fetcher_ls_ctx_additional_info_factory.h" #include "logservice/logfetcher/ob_log_fetcher_err_handler.h" #include "logservice/logfetcher/ob_log_fetcher_ls_ctx_default_factory.h" -#include "share/backup/ob_backup_struct.h" // ObRestoreSourceServiceAttr +#include "share/backup/ob_log_restore_struct.h" // ObRestoreSourceServiceAttr #include "share/ob_log_restore_proxy.h" // ObLogRestoreProxyUtil #include "ob_log_restore_driver_base.h" #include "ob_restore_log_function.h" // ObRestoreLogFunction diff --git a/src/logservice/restoreservice/ob_remote_log_source.h b/src/logservice/restoreservice/ob_remote_log_source.h index 121b472c3..75872cd4c 100644 --- a/src/logservice/restoreservice/ob_remote_log_source.h +++ b/src/logservice/restoreservice/ob_remote_log_source.h @@ -22,6 +22,7 @@ #include "logservice/palf/lsn.h" #include "share/scn.h" #include "share/backup/ob_backup_struct.h" // ObBackupPathString +#include "share/backup/ob_log_restore_struct.h" #include "share/ob_define.h" #include "share/ob_ls_id.h" #include "share/restore/ob_log_restore_source.h" // ObLogRestoreSourceType diff --git a/src/rootserver/ob_recovery_ls_service.h b/src/rootserver/ob_recovery_ls_service.h index cf6f2dc53..81c605560 100755 --- a/src/rootserver/ob_recovery_ls_service.h +++ b/src/rootserver/ob_recovery_ls_service.h @@ -20,7 +20,7 @@ #include "lib/lock/ob_spin_lock.h" //ObSpinLock #include "storage/tx/ob_multi_data_source.h" //ObTxBufferNode #include "src/share/restore/ob_log_restore_source.h" //ObLogRestoreSourceItem -#include "src/share/backup/ob_backup_struct.h" //ObRestoreSourceServiceAttr +#include "src/share/backup/ob_log_restore_struct.h" //ObRestoreSourceServiceAttr #include "share/restore/ob_log_restore_source_mgr.h" //ObLogRestoreSourceMgr #include "share/ob_log_restore_proxy.h" // ObLogRestoreProxyUtil diff --git a/src/share/CMakeLists.txt b/src/share/CMakeLists.txt index 01f370428..877f1a89f 100755 --- a/src/share/CMakeLists.txt +++ b/src/share/CMakeLists.txt @@ -46,6 +46,8 @@ ob_set_subtarget(ob_share backup backup/ob_archive_checkpoint.cpp backup/ob_archive_path.cpp backup/ob_backup_config.cpp + backup/ob_log_restore_config.cpp + backup/ob_log_restore_struct.cpp ) ob_set_subtarget(ob_share cache diff --git a/src/share/backup/ob_backup_config.cpp b/src/share/backup/ob_backup_config.cpp index c86bdcccb..d5aa0a762 100644 --- a/src/share/backup/ob_backup_config.cpp +++ b/src/share/backup/ob_backup_config.cpp @@ -12,6 +12,7 @@ #define USING_LOG_PREFIX SHARE #include "ob_backup_config.h" +#include "ob_log_restore_config.h" #include "ob_backup_data_table_operator.h" #include "ob_backup_helper.h" #include "ob_backup_store.h" @@ -838,342 +839,3 @@ int ObLogArchiveDestStateConfigParser::check_before_update_inner_config(obrpc::O return ret; } -int ObLogRestoreSourceLocationConfigParser::update_inner_config_table(common::ObISQLClient &trans) -{ - int ret = OB_SUCCESS; - ObBackupDestMgr dest_mgr; - ObLogRestoreSourceMgr restore_source_mgr; - ObAllTenantInfo tenant_info; - - if (!type_.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid type", KR(ret), KPC(this)); - } else if (OB_FAIL(restore_source_mgr.init(tenant_id_, &trans))) { - LOG_WARN("failed to init restore_source_mgr", KR(ret), KPC(this)); - } else if (is_empty_) { - if (OB_FAIL(restore_source_mgr.delete_source())) { - LOG_WARN("failed to delete restore source", KR(ret), KPC(this)); - } - } else { - if (!archive_dest_.is_dest_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid restore source", KR(ret), KPC(this)); - LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source"); - } else if (OB_FAIL(archive_dest_.gen_path_config_items(config_items_))) { - LOG_WARN("fail to gen archive config items", KR(ret), KPC(this)); - } else { - ObString key_string = ObString::make_string(config_items_.at(0).key_.ptr()); - ObString path_string = ObString::make_string(OB_STR_PATH); - ObString value_string = ObString::make_string(config_items_.at(0).value_.ptr()); - if (1 != config_items_.count() - || path_string != key_string - || config_items_.at(0).value_.empty()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid archive source", KR(ret), KPC(this)); - LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set log_restore_source"); - } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id_, &trans, - true /* for update */, tenant_info))) { - LOG_WARN("failed to load tenant info", KR(ret), K_(tenant_id)); - } else if (OB_FAIL(restore_source_mgr.add_location_source(tenant_info.get_recovery_until_scn(), - value_string))) { - LOG_WARN("failed to add log restore source", KR(ret), K(tenant_info), K(value_string), KPC(this)); - } - } - } - - return ret; -} - -int ObLogRestoreSourceLocationConfigParser::check_before_update_inner_config( - obrpc::ObSrvRpcProxy &rpc_proxy, - common::ObISQLClient &trans) -{ - int ret = OB_SUCCESS; - - if (is_empty_) { - } else if (!type_.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid parser", KR(ret), KPC(this)); - } - //TODO (zbf271370) need supports format check, checks whether the directory exists, and is a log backup - - //TODO (wenjinyu.wjy) 4.3 need support access permission check - // - return ret; -} - -int ObLogRestoreSourceLocationConfigParser::do_parse_sub_config_(const common::ObString &config_str) -{ - int ret = OB_SUCCESS; - const char *target= nullptr; - if (config_str.empty()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("empty log restore source is not allowed", KR(ret), K(config_str)); - } else { - char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; - char *token = nullptr; - char *saveptr = nullptr; - char *p_end = nullptr; - if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", config_str.length(), config_str.ptr()))) { - LOG_WARN("fail to set config value", KR(ret), K(config_str)); - } else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to split config str", K(ret), KP(token)); - } else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) { - } else if (0 == STRCASECMP(token, OB_STR_LOCATION)) { - if (OB_FAIL(do_parse_log_archive_dest_(token, saveptr))) { - LOG_WARN("fail to do parse log archive dest", KR(ret), K(token), K(saveptr)); - } - } else { - ret = OB_NOT_SUPPORTED; - LOG_WARN("log restore source does not has this config", KR(ret), K(token)); - } - } - return ret; -} - -int ObLogRestoreSourceServiceConfigParser::parse_from(const common::ObSqlString &value) -{ - int ret = OB_SUCCESS; - char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; - char *token = nullptr; - char *saveptr = nullptr; - is_empty_ = false; - - if (value.empty()) { - is_empty_ = true; - } else if (value.length() > OB_MAX_BACKUP_DEST_LENGTH) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("config value is too long"); - } else if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", static_cast(value.length()), value.ptr()))) { - LOG_WARN("fail to set config value", K(value)); - } else { - token = tmp_str; - for (char *str = token; OB_SUCC(ret); str = nullptr) { - token = ::STRTOK_R(str, " ", &saveptr); - if (nullptr == token) { - break; - } else if (OB_FAIL(do_parse_sub_config_(token))) { - LOG_WARN("fail to do parse log restore source server sub config"); - } - } - } - return ret; -} - -int ObLogRestoreSourceServiceConfigParser::update_inner_config_table(common::ObISQLClient &trans) -{ - int ret = OB_SUCCESS; - ObLogRestoreSourceMgr restore_source_mgr; - ObAllTenantInfo tenant_info; - - if (!type_.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid type", KPC(this)); - } else if (OB_FAIL(restore_source_mgr.init(tenant_id_, &trans))) { - LOG_WARN("failed to init restore_source_mgr", KPC(this)); - } else if (is_empty_) { - if (OB_FAIL(restore_source_mgr.delete_source())) { - LOG_WARN("failed to delete restore source", KPC(this)); - } - } else if (!service_attr_.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid restore source", KPC(this)); - LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source"); - } else if (OB_FAIL(service_attr_.gen_config_items(config_items_))) { - LOG_WARN("fail to gen restore source service config items", KPC(this)); - } else { - /* - eg: 开源版本 "ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxxx(密码),TENANT_ID=1002,CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=false" - 非开源版本 "ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxxx(加密后密码),TENANT_ID=1002,CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=true" - */ - char value_string[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; - - if (config_items_.empty() || OB_MAX_RESTORE_SOURCE_SERVICE_CONFIG_LEN != config_items_.count()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid restore source", KPC(this)); - LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source"); - } else if (OB_FAIL(service_attr_.gen_service_attr_str(value_string, OB_MAX_BACKUP_DEST_LENGTH))) { - LOG_WARN("failed gen service attr str", K_(tenant_id)); - } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id_, &trans, - true /* for update */, tenant_info))) { - LOG_WARN("failed to load tenant info", K_(tenant_id)); - } else if (OB_FAIL(restore_source_mgr.add_service_source(tenant_info.get_recovery_until_scn(), - value_string))) { - LOG_WARN("failed to add log restore source", K(tenant_info), K(value_string), KPC(this)); - } - } - return ret; -} - -int ObLogRestoreSourceServiceConfigParser::check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans) -{ - int ret = OB_SUCCESS; - ObCompatibilityMode compat_mode = ObCompatibilityMode::OCEANBASE_MODE; - if (is_empty_) { - } else if (!type_.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid parser", KR(ret), KPC(this)); - } else if (OB_FAIL(check_before_update_inner_config(false /* for_verify */, compat_mode))) { - LOG_WARN("fail to check before update inner config"); - } - return ret; -} - -int ObLogRestoreSourceServiceConfigParser::check_before_update_inner_config( - const bool for_verify, - ObCompatibilityMode &compat_mode) -{ - int ret = OB_SUCCESS; - char passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 }; //unencrypted password - ObSqlString user_and_tenant; - compat_mode = ObCompatibilityMode::OCEANBASE_MODE; - - SMART_VAR(ObLogRestoreProxyUtil, proxy) { - if (is_empty_) { - } else if (!type_.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid parser", KPC(this)); - } else if (0 == STRLEN(service_attr_.encrypt_passwd_) - || 0 == STRLEN(service_attr_.user_.user_name_) - || 0 == STRLEN(service_attr_.user_.tenant_name_)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("fail to parse log restore source config, please check the config parameters"); - LOG_USER_ERROR(OB_INVALID_ARGUMENT, "parse log restore source config, please check the config parameters"); - } else if (OB_FAIL(service_attr_.get_password(passwd, sizeof(passwd)))) { - LOG_WARN("get servcie attr password failed"); - } else if (OB_FAIL(service_attr_.get_user_str_(user_and_tenant))) { - LOG_WARN("get user str failed", K(service_attr_.user_.user_name_), K(service_attr_.user_.tenant_name_)); - } else if (OB_FAIL(proxy.try_init(tenant_id_/*standby*/, service_attr_.addr_, user_and_tenant.ptr(), passwd))) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("proxy connect to primary db failed", K(service_attr_.addr_), K(user_and_tenant)); - } else if (OB_FAIL(proxy.get_tenant_id(service_attr_.user_.tenant_name_, service_attr_.user_.tenant_id_))) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("get primary tenant id failed", K(tenant_id_), K(service_attr_.user_)); - } else if (OB_FAIL(proxy.get_cluster_id(service_attr_.user_.tenant_id_, service_attr_.user_.cluster_id_))) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("get primary cluster id failed", K(tenant_id_), K(service_attr_.user_.tenant_id_)); - } else if (OB_FAIL(proxy.get_compatibility_mode(service_attr_.user_.tenant_id_, service_attr_.user_.mode_))) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("get primary compatibility mode failed", K(tenant_id_), K(service_attr_.user_.tenant_id_)); - } else if (for_verify && OB_FAIL(proxy.check_begin_lsn(service_attr_.user_.tenant_id_))) { - LOG_WARN("check_begin_lsn failed", K(tenant_id_), K(service_attr_.user_.tenant_id_)); - } else { - compat_mode = service_attr_.user_.mode_; - LOG_INFO("check_before_update_inner_config success", K(tenant_id_), K(service_attr_), K(compat_mode)); - } - } - return ret; -} - -int ObLogRestoreSourceServiceConfigParser::get_compatibility_mode(common::ObCompatibilityMode &compatibility_mode) -{ - int ret = OB_SUCCESS; - compatibility_mode = OCEANBASE_MODE; - if (is_empty_) { - } else if (!type_.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid parser", KPC(this)); - } else if (!service_attr_.user_.is_valid()) { - ret = OB_NOT_INIT; - LOG_WARN("not init", KPC(this)); - } else { - compatibility_mode = service_attr_.user_.mode_; - } - return ret; -} - -int ObLogRestoreSourceServiceConfigParser::do_parse_sub_config_(const common::ObString &config_str) -{ - int ret = OB_SUCCESS; - if (config_str.empty()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("empty log restore source sub config is not allowed", K(config_str)); - } else { - char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; - char *token = nullptr; - char *saveptr = nullptr; - if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", config_str.length(), config_str.ptr()))) { - LOG_WARN("fail to set config value", K(config_str)); - } else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to split config str", KP(token)); - } else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) { - } else if (0 == STRCASECMP(token, OB_STR_SERVICE)) { - if (OB_FAIL(do_parse_restore_service_host_(token, saveptr))) { - LOG_WARN("fail to do parse restore service host", K(token), K(saveptr)); - } - } else if (0 == STRCASECMP(token, OB_STR_USER)) { - if (OB_FAIL(do_parse_restore_service_user_(token, saveptr))) { - LOG_WARN("fail to do parse restore service user", K(token), K(saveptr)); - } - } else if (0 == STRCASECMP(token, OB_STR_PASSWORD)) { - if (OB_FAIL(do_parse_restore_service_passwd_(token, saveptr))) { - LOG_WARN("fail to do parse restore service passwd", K(token), K(saveptr)); - } - } else { - ret = OB_NOT_SUPPORTED; - LOG_WARN("log restore source does not has this config", K(token)); - } - } - - return ret; -} - -int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_host_(const common::ObString &name, const -common::ObString &value) -{ - int ret = OB_SUCCESS; - if (name.empty() || value.empty()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid log restore source service host config", K(name), K(value)); - } else if (OB_FAIL(service_attr_.parse_ip_port_from_str(value.ptr(), ";" /*delimiter*/))) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("parse restore source service host failed", K(name), K(value)); - } - if (OB_FAIL(ret)) { - LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set ip list config, please check the length and format of ip list"); - } - return ret; -} - -int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_user_(const common::ObString &name, const -common::ObString &value) -{ - int ret = OB_SUCCESS; - if (name.empty() || value.empty()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid log restore source service user config", K(name), K(value)); - } else { - char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; - char *user_token = nullptr; - char *tenant_token = nullptr; - if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", value.length(), value.ptr()))) { - LOG_WARN("fail to set user config value", K(value)); - } else if (OB_FAIL(service_attr_.set_service_user_config(tmp_str))) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("parse restore source service user failed", K(name), K(value)); - } - } - if (OB_FAIL(ret)) { - LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set user config, please check the length and format of username, tenant name"); - } - return ret; -} - -int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_passwd_(const common::ObString &name, const -common::ObString &value) -{ - int ret = OB_SUCCESS; - if (name.empty() || value.empty()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid log restore source service password config", K(name), K(value)); - } else if (OB_FAIL(service_attr_.set_service_passwd_to_encrypt(value.ptr()))) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("parse restore source service password failed", K(name), K(value)); - } - if (OB_FAIL(ret)) { - LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set password config, please check the length and format of password"); - } - return ret; -} diff --git a/src/share/backup/ob_backup_config.h b/src/share/backup/ob_backup_config.h index 182b19a01..7433827a5 100644 --- a/src/share/backup/ob_backup_config.h +++ b/src/share/backup/ob_backup_config.h @@ -21,6 +21,7 @@ #include "ob_backup_struct.h" #include "share/restore/ob_log_restore_source.h" #include "share/backup/ob_backup_store.h" +#include "ob_log_restore_struct.h" namespace oceanbase { @@ -216,49 +217,6 @@ private: DISALLOW_COPY_AND_ASSIGN(ObLogArchiveDestStateConfigParser); }; -class ObLogRestoreSourceLocationConfigParser : public ObLogArchiveDestConfigParser -{ -public: - ObLogRestoreSourceLocationConfigParser(const ObBackupConfigType::Type &type, const uint64_t tenant_id, const int64_t dest_no) - : ObLogArchiveDestConfigParser(type, tenant_id, dest_no) {} - virtual ~ObLogRestoreSourceLocationConfigParser() {} - virtual int update_inner_config_table(common::ObISQLClient &trans) override; - virtual int check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans) override; - -protected: - virtual int do_parse_sub_config_(const common::ObString &config_str) override; - -private: - DISALLOW_COPY_AND_ASSIGN(ObLogRestoreSourceLocationConfigParser); -}; - -class ObLogRestoreSourceServiceConfigParser : public ObIBackupConfigItemParser -{ -public: - ObLogRestoreSourceServiceConfigParser(const ObBackupConfigType::Type &type, const uint64_t tenant_id) - : ObIBackupConfigItemParser(type, tenant_id) {} - virtual ~ObLogRestoreSourceServiceConfigParser() {} - virtual int parse_from(const common::ObSqlString &value) override; - virtual int update_inner_config_table(common::ObISQLClient &trans) override; - virtual int check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans) override; - int check_before_update_inner_config( - const bool for_verify, - ObCompatibilityMode &compat_mode); - virtual int get_compatibility_mode(common::ObCompatibilityMode &compatibility_mode); -private: - int do_parse_sub_config_(const common::ObString &config_str); - int do_parse_restore_service_host_(const common::ObString &name, const common::ObString &value); - int do_parse_restore_service_user_(const common::ObString &name, const common::ObString &value); - int do_parse_restore_service_passwd_(const common::ObString &name, const common::ObString &value); - int check_doing_service_restore_(common::ObISQLClient &trans, bool &is_doing); - int check_source_service_connect_(); // todo - int update_data_backup_dest_config_(common::ObISQLClient &trans); -private: - ObRestoreSourceServiceAttr service_attr_; - bool is_empty_; - DISALLOW_COPY_AND_ASSIGN(ObLogRestoreSourceServiceConfigParser); -}; - } } diff --git a/src/share/backup/ob_backup_struct.cpp b/src/share/backup/ob_backup_struct.cpp index d147d9bba..1c9394060 100755 --- a/src/share/backup/ob_backup_struct.cpp +++ b/src/share/backup/ob_backup_struct.cpp @@ -1771,6 +1771,42 @@ int ObBackupDest::get_backup_dest_str(char *buf, const int64_t buf_size) const return ret; } +int ObBackupDest::get_backup_dest_str_with_primary_attr(char *buf, const int64_t buf_size) const +{ + int ret = OB_SUCCESS; + char storage_info_str[OB_MAX_BACKUP_STORAGE_INFO_LENGTH] = { 0 }; + share::ObBackupStore backup_store; + share::ObBackupFormatDesc desc; + ObSqlString persist_str; + if (!is_valid()) { + ret = OB_NOT_INIT; + LOG_WARN("backup dest is not init", K(ret), K(*this)); + } else if (OB_ISNULL(buf) || buf_size < share::OB_MAX_BACKUP_DEST_LENGTH) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), KP(buf), K(buf_size)); + } else if (OB_FAIL(databuff_printf(buf, buf_size, "%s", root_path_))) { + LOG_WARN("failed to get backup dest str", K(ret), K(root_path_), K(storage_info_)); + } else if (OB_FAIL(storage_info_->get_storage_info_str(storage_info_str, sizeof(storage_info_str), true/*no need encrypt*/))) { + OB_LOG(WARN, "fail to get storage info str!", K(ret), K(storage_info_)); + } else if (0 != strlen(storage_info_str) && OB_FAIL(databuff_printf(buf + strlen(buf), buf_size - strlen(buf), "?%s",storage_info_str))) { + LOG_WARN("failed to get backup dest str", K(ret), K(root_path_), K(storage_info_)); + } else if (OB_FAIL(backup_store.init(*this))) { + LOG_WARN("fail to init backup store", K(this)); + } else if (OB_FAIL(backup_store.read_format_file(desc))) { + LOG_WARN("backup store read format file failed", K(this)); + } else if (OB_UNLIKELY(! desc.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("backup store desc is invalid", K(desc)); + } else if (OB_FAIL(persist_str.assign_fmt("LOCATION=%s,TENANT_ID=%ld,CLUSTER_ID=%ld", + buf, desc.tenant_id_, desc.cluster_id_))) { + LOG_WARN("fail to assign persist str", K(this), K(desc)); + } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(persist_str.length()), persist_str.ptr()))) { + LOG_WARN("fail to print persist str", K(persist_str)); + } + + return ret; +} + int64_t ObBackupDest::to_string(char *buf, int64_t buf_len) const { int64_t pos = 0; @@ -4330,695 +4366,6 @@ int ObLogArchiveDestAtrr::assign(const ObLogArchiveDestAtrr& that) return ret; } -ObRestoreSourceServiceUser::ObRestoreSourceServiceUser() - : user_name_(), - tenant_name_(), - mode_(ObCompatibilityMode::OCEANBASE_MODE), - tenant_id_(OB_INVALID_TENANT_ID), - cluster_id_(OB_INVALID_CLUSTER_ID) -{ - user_name_[0] = '\0'; - tenant_name_[0] = '\0'; -} - -void ObRestoreSourceServiceUser::reset() -{ - user_name_[0] = '\0'; - tenant_name_[0] = '\0'; - mode_ = ObCompatibilityMode::OCEANBASE_MODE; - tenant_id_ = OB_INVALID_TENANT_ID; - cluster_id_ = OB_INVALID_CLUSTER_ID; -} - -bool ObRestoreSourceServiceUser::is_valid() const -{ - return STRLEN(user_name_) != 0 - && STRLEN(tenant_name_) != 0 - && (ObCompatibilityMode::OCEANBASE_MODE != mode_) - && (tenant_id_ != OB_INVALID_TENANT_ID) - && (cluster_id_ != OB_INVALID_CLUSTER_ID); -} - -bool ObRestoreSourceServiceUser::operator== (const ObRestoreSourceServiceUser &other) const -{ - return (STRLEN(this->user_name_) == STRLEN(other.user_name_)) - && (STRLEN(this->tenant_name_) == STRLEN(other.tenant_name_)) - && (0 == STRCMP(this->user_name_, other.user_name_)) - && (0 == STRCMP(this->tenant_name_, other.tenant_name_)) - && this->mode_ == other.mode_ - && this->tenant_id_ == other.tenant_id_ - && this->cluster_id_ == other.cluster_id_; -} - -int ObRestoreSourceServiceUser::assign(const ObRestoreSourceServiceUser &user) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!user.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(user)); - } else if (OB_FAIL(databuff_printf(user_name_, sizeof(user_name_), "%s", user.user_name_))) { - LOG_WARN("user_name_ assign failed", K(user)); - } else if (OB_FAIL(databuff_printf(tenant_name_, sizeof(tenant_name_), "%s", user.tenant_name_))) { - LOG_WARN("tenant_name_ assign failed", K(user)); - } else { - mode_ = user.mode_; - tenant_id_ = user.tenant_id_; - cluster_id_ = user.cluster_id_; - } - return ret; -} - -ObRestoreSourceServiceAttr::ObRestoreSourceServiceAttr() - : addr_(), - user_() -{ - encrypt_passwd_[0] = '\0'; -} - -void ObRestoreSourceServiceAttr::reset() -{ - addr_.reset(); - user_.reset(); - encrypt_passwd_[0] = '\0'; -} - -/* - parse service attr from string - eg: "ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxx,TENANT_ID=1002, - CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=true" -*/ -int ObRestoreSourceServiceAttr::parse_service_attr_from_str(ObSqlString &value) -{ - int ret = OB_SUCCESS; - char tmp_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 }; - char *token = nullptr; - char *saveptr = nullptr; - - if (OB_UNLIKELY(value.empty() || value.length() > OB_MAX_BACKUP_DEST_LENGTH)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("log restore source attr value is invalid"); - } else if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", static_cast(value.length()), value.ptr()))) { - LOG_WARN("fail to print attr value", K(value)); - } else { - token = tmp_str; - for (char *str = token; OB_SUCC(ret); str = nullptr) { - token = ::STRTOK_R(str, ",", &saveptr); - if (nullptr == token) { - break; - } else if (OB_FAIL(do_parse_sub_service_attr(token))) { - LOG_WARN("fail to parse service attr str", K(token)); - } - } - } - return ret; -} - -int ObRestoreSourceServiceAttr::do_parse_sub_service_attr(const char *sub_value) -{ - int ret = OB_SUCCESS; - - if (OB_ISNULL(sub_value) || OB_UNLIKELY(0 == STRLEN(sub_value) || STRLEN(sub_value) > OB_MAX_BACKUP_DEST_LENGTH)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("log restore source service attr sub value is invalid", K(sub_value)); - } else { - char tmp_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 }; - char *token = nullptr; - char *saveptr = nullptr; - if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", sub_value))) { - LOG_WARN("fail to print sub_value", K(sub_value)); - } else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("fail to split sub_value str", K(token), KP(tmp_str)); - } else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) { - } else if (0 == STRCASECMP(token, OB_STR_IP_LIST)) { - if (OB_FAIL(parse_ip_port_from_str(saveptr, ";"/*delimiter*/))) { - LOG_WARN("fail to parse ip list from str", K(token), K(saveptr)); - } - } else if (0 == STRCASECMP(token, OB_STR_USER)) { - if (OB_FAIL(set_service_user_config(saveptr))) { - LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr)); - } - } else if (0 == STRCASECMP(token, OB_STR_TENANT_ID)) { - if (OB_FAIL(set_service_tenant_id(saveptr))) { - LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr)); - } - } else if (0 == STRCASECMP(token, OB_STR_CLUSTER_ID)) { - if (OB_FAIL(set_service_cluster_id(saveptr))) { - LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr)); - } - } else if (0 == STRCASECMP(token, OB_COMPATIBILITY_MODE)) { - if (OB_FAIL(set_service_compatibility_mode(saveptr))) { - LOG_WARN("fail to set restore service compatibility mode", K(token), K(saveptr)); - } - } else if (0 == STRCASECMP(token, OB_STR_PASSWORD)) { - if (OB_FAIL(set_service_passwd_no_encrypt(saveptr))) { - LOG_WARN("fail to set restore service passwd", K(token), K(saveptr)); - } - } else if (0 == STRCASECMP(token, OB_STR_IS_ENCRYPTED)) { - } else { - ret = OB_NOT_SUPPORTED; - LOG_WARN("log restore source service do not have this config", K(token)); - } - } - return ret; -} - -int ObRestoreSourceServiceAttr::set_service_user_config(const char *user_tenant) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(user_tenant) || OB_UNLIKELY(0 == STRLEN(user_tenant) - || STRLEN(user_tenant) > OB_MAX_RESTORE_USER_AND_TENANT_LEN)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("log restore source service user is invalid"); - } else { - char *user_name = nullptr; - char *tenant_name = nullptr; - char tmp_str[OB_MAX_RESTORE_USER_AND_TENANT_LEN + 1] = { 0 }; - if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", user_tenant))) { - LOG_WARN("fail to print user_tenant", K(user_tenant)); - } else if (OB_ISNULL(user_name = ::STRTOK_R(tmp_str, "@", &tenant_name))) { - LOG_WARN("fail to split user_tenant", K(tmp_str)); - } else if (OB_FAIL(set_service_user(user_name, tenant_name))) { - LOG_WARN("fail to set service user", K(user_name), K(tenant_name)); - } - } - return ret; -} - -int ObRestoreSourceServiceAttr::set_service_user(const char *user, const char *tenant) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(user) || OB_ISNULL(tenant) || OB_UNLIKELY(0 == STRLEN(user) || 0 == STRLEN(tenant) - || STRLEN(user) > OB_MAX_USER_NAME_LENGTH || STRLEN(tenant) > OB_MAX_ORIGINAL_NANE_LENGTH)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("log restore source service user name or tenant name is invalid"); - } else if (OB_FAIL(databuff_printf(user_.user_name_, sizeof(user_.user_name_), "%s", user))) { - LOG_WARN("fail to print user name", K(user)); - } else if (OB_FAIL(databuff_printf(user_.tenant_name_, sizeof(user_.tenant_name_), "%s", tenant))) { - LOG_WARN("fail to print tenant name", K(tenant)); - } - LOG_DEBUG("set service user", K(user), K(tenant), K(user_)); - return ret; -} - -int ObRestoreSourceServiceAttr::set_service_tenant_id(const char *tenant_id_str) -{ - int ret = OB_SUCCESS; - char *p_end = nullptr; - if (OB_FAIL(ob_strtoull(tenant_id_str, p_end, user_.tenant_id_))) { - LOG_WARN("fail to set service tenant id from string", K(tenant_id_str)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::set_service_cluster_id(const char *cluster_id_str) -{ - int ret = OB_SUCCESS; - if (OB_FAIL(ob_atoll(cluster_id_str, user_.cluster_id_))) { - LOG_WARN("fail to set service cluster id from string", K(cluster_id_str)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::set_service_compatibility_mode(const char *compat_mode) -{ - int ret = OB_SUCCESS; - char token[OB_MAX_COMPAT_MODE_STR_LEN + 1] = { 0 }; - if (OB_ISNULL(compat_mode) || OB_UNLIKELY(0 == STRLEN(compat_mode) - || STRLEN(compat_mode) > OB_MAX_COMPAT_MODE_STR_LEN)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("log restore source service compat_mode is invalid"); - } else if (OB_FAIL(databuff_printf(token, sizeof(token), "%s", compat_mode))) { - LOG_WARN("fail to print compat mode", K(compat_mode)); - } else if (OB_FALSE_IT(str_toupper(token, STRLEN(token)))) { - } else if ((0 == STRCASECMP(token, "MYSQL"))) { - user_.mode_ = ObCompatibilityMode::MYSQL_MODE; - } else if ((0 == STRCASECMP(token, "ORACLE"))) { - user_.mode_ = ObCompatibilityMode::ORACLE_MODE; - } else { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid compatibility mode", K(compat_mode), K(ret)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::set_service_passwd_to_encrypt(const char *passwd) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(passwd) || OB_UNLIKELY(0 == STRLEN(passwd) || STRLEN(passwd) > OB_MAX_PASSWORD_LENGTH)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument"); - } else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(encrypt_passwd_), "%s", passwd))) { - LOG_WARN("fail to print encrypt password"); - } - LOG_INFO("set service password success"); - return ret; -} - -int ObRestoreSourceServiceAttr::set_service_passwd_no_encrypt(const char *passwd) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(passwd) || OB_UNLIKELY(0 == STRLEN(passwd) || STRLEN(passwd) > OB_MAX_PASSWORD_LENGTH)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument"); - } else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(encrypt_passwd_), "%s", passwd))) { - LOG_WARN("fail to print encrypt password"); - } - return ret; -} - -// 127.0.0.1:1000;127.0.0.1:1001;127.0.0.1:1002 ==> 127.0.0.1:1000 127.0.0.1:1001 127.0.0.1:1002 -int ObRestoreSourceServiceAttr::parse_ip_port_from_str(const char *ip_list, const char *delimiter) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(ip_list) || OB_ISNULL(delimiter) || OB_UNLIKELY(STRLEN(ip_list) > OB_MAX_RESTORE_SOURCE_IP_LIST_LEN)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("log restore source service ip list is invalid"); - } - - char tmp_str[OB_MAX_RESTORE_SOURCE_IP_LIST_LEN + 1] = { 0 }; - char *token = nullptr; - char *saveptr = nullptr; - if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", ip_list))) { - LOG_WARN("fail to get ip list", K(ip_list)); - } else { - token = tmp_str; - for (char *str = token; OB_SUCC(ret); str = nullptr) { - token = ::STRTOK_R(str, delimiter, &saveptr); - if (nullptr == token) { - break; - } else { - ObAddr addr; - if (OB_FAIL(addr.parse_from_string(ObString(token)))) { - LOG_WARN("fail to parse addr", K(addr), K(token)); - } else if (!addr.is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("service addr is invalid", K(addr), K(ip_list)); - } else if (OB_FAIL(addr_.push_back(addr))){ - LOG_WARN("fail to push addr", K(addr)); - } - } - } - } - return ret; -} - -bool ObRestoreSourceServiceAttr::is_valid() const -{ - return service_user_is_valid() - && service_host_is_valid() - && service_password_is_valid(); -} - -bool ObRestoreSourceServiceAttr::service_user_is_valid() const -{ - return user_.is_valid(); -} - -bool ObRestoreSourceServiceAttr::service_host_is_valid() const -{ - return !addr_.empty(); -} - -bool ObRestoreSourceServiceAttr::service_password_is_valid() const -{ - return strlen(encrypt_passwd_) != 0; -} - -int ObRestoreSourceServiceAttr::gen_config_items(common::ObIArray &items) const -{ - int ret = OB_SUCCESS; - BackupConfigItemPair config; - if (!is_valid()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguement", KPC(this)); - } - - ObSqlString tmp; - // gen ip list config - if (OB_FAIL(ret)) { - } else if (OB_FAIL(config.key_.assign(OB_STR_IP_LIST))) { - LOG_WARN("failed to assign ip list key"); - } else if (OB_FAIL(get_ip_list_str_(tmp))) { - LOG_WARN("failed to get ip list str"); - } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { - LOG_WARN("failed to assign ip list value"); - } else if(OB_FAIL(items.push_back(config))) { - LOG_WARN("failed to push service source attr config", K(config)); - } - - // gen user config - if (OB_FAIL(ret)) { - } else if (OB_FAIL(config.key_.assign(OB_STR_USER))) { - LOG_WARN("failed to assign user key"); - } else if (OB_FAIL(get_user_str_(tmp))) { - LOG_WARN("failed to get user str"); - } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { - LOG_WARN("failed to assign user value"); - } else if (OB_FAIL(items.push_back(config))) { - LOG_WARN("failed to push service source attr config", K(config)); - } - - // gen password config - if (OB_FAIL(ret)) { - } else if (OB_FAIL(config.key_.assign(OB_STR_PASSWORD))) { - LOG_WARN("failed to assign password key"); - } else if (OB_FAIL(get_password_str_(tmp))) { - LOG_WARN("failed to get password str"); - } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { - LOG_WARN("failed to assign password value"); - } else if (OB_FAIL(items.push_back(config))) { - LOG_WARN("failed to push service source attr config", K(config)); - } - - // gen tenant id - if (OB_FAIL(ret)) { - } else if (OB_FAIL(config.key_.assign(OB_STR_TENANT_ID))) { - LOG_WARN("failed to assign tenant id key"); - } else if (OB_FAIL(get_tenant_id_str_(tmp))) { - LOG_WARN("failed to get tenant id str"); - } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { - LOG_WARN("failed to assign tenant id value"); - } else if (OB_FAIL(items.push_back(config))) { - LOG_WARN("failed to push service source attr config", K(config)); - } - - // gen cluster id - if (OB_FAIL(ret)) { - } else if (OB_FAIL(config.key_.assign(OB_STR_CLUSTER_ID))) { - LOG_WARN("failed to assign cluster id key"); - } else if (OB_FAIL(get_cluster_id_str_(tmp))) { - LOG_WARN("failed to get cluster id str"); - } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { - LOG_WARN("failed to assign cluster id value"); - } else if (OB_FAIL(items.push_back(config))) { - LOG_WARN("failed to push service source attr config", K(config)); - } - - // gen compatibility mode - if (OB_FAIL(ret)) { - } else if (OB_FAIL(config.key_.assign(OB_COMPATIBILITY_MODE))) { - LOG_WARN("failed to assign compatibility mode"); - } else if (OB_FAIL(get_compatibility_mode_str_(tmp))) { - LOG_WARN("failed to get compatibility mode str"); - } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { - LOG_WARN("failed to assign compatibility mode value"); - } else if (OB_FAIL(items.push_back(config))) { - LOG_WARN("failed to push service source attr config", K(config)); - } - - // gen is encrypted - if (OB_FAIL(ret)) { - } else if (OB_FAIL(config.key_.assign(OB_STR_IS_ENCRYPTED))) { - LOG_WARN("failed to assign encrypted key"); - } else if (OB_FAIL(get_is_encrypted_str_(tmp))) { - LOG_WARN("failed to get is encrypted str"); - } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { - LOG_WARN("failed to assign encrypted value"); - } else if (OB_FAIL(items.push_back(config))) { - LOG_WARN("failed to push service source attr config", K(config)); - } - - return ret; -} - -int ObRestoreSourceServiceAttr::gen_service_attr_str(char *buf, const int64_t buf_size) const -{ - int ret = OB_SUCCESS; - ObSqlString str; - ObSqlString ip_str; - ObSqlString user_str; - ObSqlString passwd_str; - ObSqlString compat_str; - ObSqlString is_encrypted_str; - - if (OB_UNLIKELY(!is_valid() || OB_ISNULL(buf) || buf_size <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid service attr argument", KP(buf), K(buf_size)); - } else if (OB_FAIL(get_ip_list_str_(ip_str))) { - LOG_WARN("get ip list str failed"); - } else if (OB_FAIL(get_user_str_(user_str))) { - LOG_WARN("get user str failed"); - } else if (OB_FAIL(get_password_str_(passwd_str))) { - LOG_WARN("get password str failed"); - } else if (OB_FAIL(get_compatibility_mode_str_(compat_str))) { - LOG_WARN("get compatibility mode str failed"); - } else if (OB_FAIL(get_is_encrypted_str_(is_encrypted_str))) { - LOG_WARN("get encrypted str failed"); - } else if (OB_FAIL(str.assign_fmt("IP_LIST=%s,USER=%s,PASSWORD=%s,TENANT_ID=%ld,CLUSTER_ID=%ld,COMPATIBILITY_MODE=%s,IS_ENCRYPTED=%s", - ip_str.ptr(), user_str.ptr(), passwd_str.ptr(), user_.tenant_id_, user_.cluster_id_, compat_str.ptr(), is_encrypted_str.ptr()))) { - LOG_WARN("fail to assign str", K(compat_str), K(is_encrypted_str)); - } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(str.length()), str.ptr()))) { - LOG_WARN("fail to print str", K(str)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_ip_list_str_(char *buf, const int64_t buf_size) const -{ - int ret = OB_SUCCESS; - ObSqlString str; - if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid ip list argument", KP(buf), K(buf_size)); - } else if (OB_FAIL(get_ip_list_str_(str))) { - LOG_WARN("get ip list str failed"); - } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(str.length()), str.ptr()))) { - LOG_WARN("fail to print str", K(str)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_ip_list_str_(ObSqlString &str) const -{ - int ret = OB_SUCCESS; - - ARRAY_FOREACH_N(addr_, idx, cnt) { - char ip_str[MAX_IP_PORT_LENGTH] = { 0 }; - const ObAddr ip = addr_.at(idx); - if (OB_FAIL(ip.ip_port_to_string(ip_str, sizeof(ip_str)))) { - LOG_WARN("fail to convert ip port to string", K(ip), K(ip_str)); - } else { - if (0 == idx && (OB_FAIL(str.assign_fmt("%s", ip_str)))) { - LOG_WARN("fail to assign ip str", K(str) ,K(ip), K(ip_str)); - } else if ( 0 != idx && OB_FAIL(str.append_fmt(";%s", ip_str))) { - LOG_WARN("fail to append ip str", K(str), K(ip), K(ip_str)); - } - } - } - LOG_DEBUG("get ip list str", K(str)); - return ret; -} - -int ObRestoreSourceServiceAttr::get_user_str_(char *buf, const int64_t buf_size) const -{ - int ret = OB_SUCCESS; - ObSqlString res_str; - if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid user str argument", KP(buf), K(buf_size)); - } else if (OB_FAIL(get_user_str_(res_str))) { - LOG_WARN("fail to get user str"); - } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(res_str.length()), res_str.ptr()))) { - LOG_WARN("fail to print str"); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_user_str_(ObSqlString &user_str) const -{ - int ret = OB_SUCCESS; - if (OB_FAIL(user_str.assign(user_.user_name_))) { - LOG_WARN("fail to assign user name" ,K(user_.user_name_)); - } else if (OB_FAIL(user_str.append_fmt("@%s",user_.tenant_name_))) { - LOG_WARN("fail to assign tenant name", K(user_.user_name_)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_password_str_(char *buf, const int64_t buf_size) const -{ - int ret = OB_SUCCESS; - ObSqlString passwd_str; - if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid password str argument", KP(buf), K(buf_size)); - } else if (OB_FAIL(get_password_str_(passwd_str))) { - LOG_WARN("fail to get password str"); - } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(passwd_str.length()), passwd_str.ptr()))) { - LOG_WARN("fail to print str"); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_password_str_(ObSqlString &passwd_str) const -{ - int ret = OB_SUCCESS; - if (OB_FAIL(passwd_str.assign(encrypt_passwd_))) { - LOG_WARN("fail to assign password"); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_tenant_id_str_(char *buf, const int64_t buf_size) const -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant id str argument", KP(buf), K(buf_size)); - } else if (OB_FAIL(databuff_printf(buf, buf_size, "%ld", user_.tenant_id_))) { - LOG_WARN("fail to print tenant id str", K(user_.tenant_id_)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_tenant_id_str_(ObSqlString &tenant_str) const -{ - int ret = OB_SUCCESS; - if (OB_FAIL(tenant_str.assign_fmt("%ld", user_.tenant_id_))) { - LOG_WARN("fail to assign tenant id str", K(user_.tenant_id_)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_cluster_id_str_(char *buf, const int64_t buf_size) const -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid cluster id str argument", KP(buf), K(buf_size)); - } else if (OB_FAIL(databuff_printf(buf, buf_size, "%ld", user_.cluster_id_))) { - LOG_WARN("fail to print cluster id str", K(user_.cluster_id_)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_cluster_id_str_(ObSqlString &cluster_str) const -{ - int ret = OB_SUCCESS; - if (OB_FAIL(cluster_str.assign_fmt("%ld", user_.cluster_id_))) { - LOG_WARN("fail to assign cluster id str", K(user_.cluster_id_)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_compatibility_mode_str_(char *buf, const int64_t buf_size) const -{ - int ret = OB_SUCCESS; - ObSqlString tmp_str; - if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid compatibilidy mode str argument", KP(buf), K(buf_size)); - } else if (OB_FAIL(get_compatibility_mode_str_(tmp_str))) { - LOG_WARN("fail to get compatibility mode str"); - } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(tmp_str.length()), tmp_str.ptr()))) { - LOG_WARN("fail to print compatibility mode str", K(tmp_str)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_compatibility_mode_str_(ObSqlString &compatibility_str) const -{ - int ret = OB_SUCCESS; - const char *compat_mode = "INVALID"; - if (ObCompatibilityMode::MYSQL_MODE == user_.mode_) { - compat_mode = "MYSQL"; - } else if (ObCompatibilityMode::ORACLE_MODE == user_.mode_) { - compat_mode = "ORACLE"; - } else { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("compatibility mode is invalid", K(user_.mode_)); - } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(compatibility_str.assign_fmt("%s", compat_mode))) { - LOG_WARN("fail to assign compatibility mode", K(compat_mode)); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_is_encrypted_str_(char *buf, const int64_t buf_size) const -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid encrypted str argument", KP(buf), K(buf_size)); - } else if (OB_FAIL(databuff_printf(buf, buf_size, "%s", "false"))) { - LOG_WARN("fail to print encrypted str"); - } - return ret; -} - -int ObRestoreSourceServiceAttr::get_is_encrypted_str_(ObSqlString &encrypted_str) const -{ - int ret = OB_SUCCESS; - if (OB_FAIL(encrypted_str.assign("false"))) { - LOG_WARN("fail to print str"); - } - return ret; -} - - -int ObRestoreSourceServiceAttr::get_password(char *passwd, const int64_t buf_size) const -{ - int ret = OB_SUCCESS; - char tmp_passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 }; - if (OB_ISNULL(passwd) || OB_UNLIKELY((buf_size <= 0) || (buf_size < OB_MAX_PASSWORD_LENGTH))) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid parameter when get password", K(passwd), K(STRLEN(passwd))); - } else if (OB_FAIL(databuff_printf(passwd, buf_size, "%s", encrypt_passwd_))) { - LOG_WARN("failed to print encrypt_passwd_ key", K(encrypt_passwd_)); - } - return ret; -} - -bool ObRestoreSourceServiceAttr::compare_addr_(common::ObArray addr) const -{ - int bret = true; - int ret = OB_SUCCESS; - if (addr.size() != this->addr_.size()) { - bret = false; - } else { - ARRAY_FOREACH_N(addr, idx, cnt) { - bool tmp_cmp = false; - ARRAY_FOREACH_N(this->addr_, idx1, cnt1) { - if (addr.at(idx) == this->addr_.at(idx1)) { - tmp_cmp = true; - } - } - if (!tmp_cmp) { - bret = false; - break; - } - } - } - return bret; -} - -bool ObRestoreSourceServiceAttr::operator==(const ObRestoreSourceServiceAttr &other) const -{ - return (this->user_ == other.user_) - && (STRLEN(this->encrypt_passwd_) == STRLEN(other.encrypt_passwd_)) - && (0 == STRCMP(this->encrypt_passwd_, other.encrypt_passwd_)) - && compare_addr_(other.addr_); -} - -int ObRestoreSourceServiceAttr::assign(const ObRestoreSourceServiceAttr &attr) -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY(!attr.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(attr)); - } else if (FALSE_IT(addr_.reset())) { - } else if (OB_FAIL(addr_.assign(attr.addr_))) { - LOG_WARN("addr_ assign failed", K(attr)); - } else if (OB_FAIL(user_.assign(attr.user_))) { - LOG_WARN("user_ assign failed", K(attr)); - } else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(attr.encrypt_passwd_), "%s", attr.encrypt_passwd_))) { - LOG_WARN("passwd_ assign failed", K(attr)); - } - return ret; -} - int share::trim_right_backslash(ObBackupPathString &path) { int ret = OB_SUCCESS; diff --git a/src/share/backup/ob_backup_struct.h b/src/share/backup/ob_backup_struct.h index 8cb63f328..a5bdb47a6 100755 --- a/src/share/backup/ob_backup_struct.h +++ b/src/share/backup/ob_backup_struct.h @@ -910,6 +910,7 @@ public: bool is_root_path_equal(const ObBackupDest &backup_dest) const; int is_backup_path_equal(const ObBackupDest &backup_dest, bool &is_equal) const; int get_backup_dest_str(char *buf, const int64_t buf_size) const; + int get_backup_dest_str_with_primary_attr(char *buf, const int64_t buf_size) const; int get_backup_path_str(char *buf, const int64_t buf_size) const; common::ObString get_root_path() const { return root_path_;} share::ObBackupStorageInfo *get_storage_info() const { return storage_info_;} @@ -1705,73 +1706,6 @@ struct ObLogArchiveDestAtrr final ObLogArchiveDestState state_; }; -struct ObRestoreSourceServiceUser final -{ - ObRestoreSourceServiceUser(); - ~ObRestoreSourceServiceUser() {} - void reset(); - bool is_valid() const; - int assign(const ObRestoreSourceServiceUser &user); - char user_name_[OB_MAX_USER_NAME_LENGTH]; - char tenant_name_[OB_MAX_ORIGINAL_NANE_LENGTH]; - ObCompatibilityMode mode_; - uint64_t tenant_id_; - int64_t cluster_id_; - bool operator == (const ObRestoreSourceServiceUser &other) const; - TO_STRING_KV(K_(user_name), K_(tenant_name), K_(tenant_id), K_(cluster_id)); -}; - -struct ObRestoreSourceServiceAttr final -{ - ObRestoreSourceServiceAttr(); - ~ObRestoreSourceServiceAttr() {} - void reset(); - int parse_service_attr_from_str(ObSqlString &str); - int do_parse_sub_service_attr(const char *sub_value); - int set_service_user_config(const char *user_tenant); - int set_service_user(const char *user, const char *tenant); - int set_service_tenant_id(const char *tenant_id); - int set_service_cluster_id(const char *cluster_id); - int set_service_compatibility_mode(const char *compatibility_mode); - // It need to convert password to encrypted password when pasre from log_restore_source config. - int set_service_passwd_to_encrypt(const char *passwd); - // There's no need to convert password to encrypted password when parse from __all_log_restore_source record. - int set_service_passwd_no_encrypt(const char *passwd); - int parse_ip_port_from_str(const char *buf, const char *delimiter); - bool is_valid() const; - bool service_user_is_valid() const; - bool service_host_is_valid() const; - bool service_password_is_valid() const; - int gen_config_items(common::ObIArray &items) const; - int gen_service_attr_str(char *buf, const int64_t buf_size) const; - int gen_service_attr_str(ObSqlString &str) const; - int get_ip_list_str_(char *buf, const int64_t buf_size) const; - int get_ip_list_str_(ObSqlString &str) const; - int get_user_str_(char *buf, const int64_t buf_size) const; - int get_user_str_(ObSqlString &str) const; - int get_password_str_(char *buf, const int64_t buf_size) const; - int get_password_str_(ObSqlString &str) const; - int get_tenant_id_str_(char *buf ,const int64_t buf_size) const; - int get_tenant_id_str_(ObSqlString &str) const; - int get_cluster_id_str_(char *buf, const int64_t buf_size) const; - int get_cluster_id_str_(ObSqlString &str) const; - int get_compatibility_mode_str_(char *buf, const int64_t buf_size) const; - int get_compatibility_mode_str_(ObSqlString &str) const; - int get_is_encrypted_str_(char *buf, const int64_t buf_size) const; - int get_is_encrypted_str_(ObSqlString &str) const; - int set_encrypt_password_key_(const char *encrypt_key); - int get_decrypt_password_key_(char *unencrypt_key, const int64_t buf_size) const; - // return the origion password - int get_password(char *passwd, const int64_t buf_size) const; - bool compare_addr_(common::ObArray addr) const; - bool operator ==(const ObRestoreSourceServiceAttr &other) const; - int assign(const ObRestoreSourceServiceAttr &attr); - TO_STRING_KV(K_(addr), K_(user), K_(encrypt_passwd)); - common::ObArray addr_; - ObRestoreSourceServiceUser user_; - char encrypt_passwd_[OB_MAX_BACKUP_SERIALIZEKEY_LENGTH]; -}; - // trim '/' from right until encouter a non backslash charactor. int trim_right_backslash(ObBackupPathString &path); diff --git a/src/share/backup/ob_log_restore_config.cpp b/src/share/backup/ob_log_restore_config.cpp new file mode 100644 index 000000000..11c448fde --- /dev/null +++ b/src/share/backup/ob_log_restore_config.cpp @@ -0,0 +1,384 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SHARE +#include "ob_backup_config.h" +#include "ob_log_restore_config.h" +#include "share/restore/ob_log_restore_source_mgr.h" // ObLogRestoreSourceMgr +#include "share/ob_log_restore_proxy.h" // ObLogRestoreProxyUtil + +using namespace oceanbase; +using namespace share; +using namespace common; + +int ObLogRestoreSourceLocationConfigParser::update_inner_config_table(common::ObISQLClient &trans) +{ + int ret = OB_SUCCESS; + ObBackupDestMgr dest_mgr; + ObLogRestoreSourceMgr restore_source_mgr; + ObAllTenantInfo tenant_info; + + if (!type_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid type", KR(ret), KPC(this)); + } else if (OB_FAIL(restore_source_mgr.init(tenant_id_, &trans))) { + LOG_WARN("failed to init restore_source_mgr", KR(ret), KPC(this)); + } else if (is_empty_) { + if (OB_FAIL(restore_source_mgr.delete_source())) { + LOG_WARN("failed to delete restore source", KR(ret), KPC(this)); + } + } else { + if (!archive_dest_.is_dest_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid restore source", KR(ret), KPC(this)); + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source"); + } else if (OB_FAIL(archive_dest_.gen_path_config_items(config_items_))) { + LOG_WARN("fail to gen archive config items", KR(ret), KPC(this)); + } else { + ObString key_string = ObString::make_string(config_items_.at(0).key_.ptr()); + ObString path_string = ObString::make_string(OB_STR_PATH); + ObString value_string = ObString::make_string(config_items_.at(0).value_.ptr()); + + if (1 != config_items_.count() + || path_string != key_string + || config_items_.at(0).value_.empty()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid archive source", KR(ret), KPC(this)); + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set log_restore_source"); + } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id_, &trans, + true /* for update */, tenant_info))) { + LOG_WARN("failed to load tenant info", KR(ret), K_(tenant_id)); + } else if (OB_FAIL(restore_source_mgr.add_location_source(tenant_info.get_recovery_until_scn(), + value_string))) { + LOG_WARN("failed to add log restore source", KR(ret), K(tenant_info), K(value_string), KPC(this)); + } + } + } + + return ret; +} + +int ObLogRestoreSourceLocationConfigParser::check_before_update_inner_config( + obrpc::ObSrvRpcProxy &rpc_proxy, + common::ObISQLClient &trans) +{ + int ret = OB_SUCCESS; + + if (is_empty_) { + } else if (!type_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid parser", KR(ret), KPC(this)); + } else { + share::ObBackupStore backup_store; + share::ObBackupFormatDesc desc; + if (OB_FAIL(backup_store.init(archive_dest_.dest_))) { + LOG_WARN("backup store init failed", K_(archive_dest_.dest)); + } else if (OB_FAIL(backup_store.read_format_file(desc))) { + LOG_WARN("backup store read format file failed", K_(archive_dest_.dest)); + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "access the log restore source location"); + } else if (OB_UNLIKELY(! desc.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("backup store desc is invalid"); + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "access the log restore source location"); + } else if (GCONF.cluster_id == desc.cluster_id_ && tenant_id_ == desc.tenant_id_) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("set standby itself as log restore source is not allowed"); + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set standby itself as log restore source"); + } + } + //TODO (wenjinyu.wjy) need support access permission check + // + return ret; +} + +int ObLogRestoreSourceLocationConfigParser::do_parse_sub_config_(const common::ObString &config_str) +{ + int ret = OB_SUCCESS; + const char *target= nullptr; + if (config_str.empty()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("empty log restore source is not allowed", KR(ret), K(config_str)); + } else { + char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; + char *token = nullptr; + char *saveptr = nullptr; + char *p_end = nullptr; + if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", config_str.length(), config_str.ptr()))) { + LOG_WARN("fail to set config value", KR(ret), K(config_str)); + } else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to split config str", K(ret), KP(token)); + } else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) { + } else if (0 == STRCASECMP(token, OB_STR_LOCATION)) { + if (OB_FAIL(do_parse_log_archive_dest_(token, saveptr))) { + LOG_WARN("fail to do parse log archive dest", KR(ret), K(token), K(saveptr)); + } + } else { + ret = OB_NOT_SUPPORTED; + LOG_WARN("log restore source does not has this config", KR(ret), K(token)); + } + } + return ret; +} + +int ObLogRestoreSourceServiceConfigParser::parse_from(const common::ObSqlString &value) +{ + int ret = OB_SUCCESS; + char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; + char *token = nullptr; + char *saveptr = nullptr; + is_empty_ = false; + + if (value.empty()) { + is_empty_ = true; + } else if (value.length() > OB_MAX_BACKUP_DEST_LENGTH) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("config value is too long"); + } else if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", static_cast(value.length()), value.ptr()))) { + LOG_WARN("fail to set config value", K(value)); + } else { + token = tmp_str; + for (char *str = token; OB_SUCC(ret); str = nullptr) { + token = ::STRTOK_R(str, " ", &saveptr); + if (nullptr == token) { + break; + } else if (OB_FAIL(do_parse_sub_config_(token))) { + LOG_WARN("fail to do parse log restore source server sub config"); + } + } + } + return ret; +} + +int ObLogRestoreSourceServiceConfigParser::update_inner_config_table(common::ObISQLClient &trans) +{ + int ret = OB_SUCCESS; + ObLogRestoreSourceMgr restore_source_mgr; + ObAllTenantInfo tenant_info; + + if (!type_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid type", KPC(this)); + } else if (OB_FAIL(restore_source_mgr.init(tenant_id_, &trans))) { + LOG_WARN("failed to init restore_source_mgr", KPC(this)); + } else if (is_empty_) { + if (OB_FAIL(restore_source_mgr.delete_source())) { + LOG_WARN("failed to delete restore source", KPC(this)); + } + } else if (!service_attr_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid restore source", KPC(this)); + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source"); + } else if (OB_FAIL(service_attr_.gen_config_items(config_items_))) { + LOG_WARN("fail to gen restore source service config items", KPC(this)); + } else { + /* + eg: 开源版本 "ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxxx(密码),TENANT_ID=1002,CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=false" + 非开源版本 "ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxxx(加密后密码),TENANT_ID=1002,CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=true" + */ + char value_string[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; + + if (config_items_.empty() || OB_MAX_RESTORE_SOURCE_SERVICE_CONFIG_LEN != config_items_.count()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid restore source", KPC(this)); + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source"); + } else if (OB_FAIL(service_attr_.gen_service_attr_str(value_string, OB_MAX_BACKUP_DEST_LENGTH))) { + LOG_WARN("failed gen service attr str", K_(tenant_id)); + } else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id_, &trans, + true /* for update */, tenant_info))) { + LOG_WARN("failed to load tenant info", K_(tenant_id)); + } else if (OB_FAIL(restore_source_mgr.add_service_source(tenant_info.get_recovery_until_scn(), + value_string))) { + LOG_WARN("failed to add log restore source", K(tenant_info), K(value_string), KPC(this)); + } + } + return ret; +} + +int ObLogRestoreSourceServiceConfigParser::check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans) +{ + int ret = OB_SUCCESS; + ObCompatibilityMode compat_mode = ObCompatibilityMode::OCEANBASE_MODE; + if (is_empty_) { + } else if (!type_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid parser", KR(ret), KPC(this)); + } else if (OB_FAIL(check_before_update_inner_config(false /* for_verify */, compat_mode))) { + LOG_WARN("fail to check before update inner config"); + } + return ret; +} + +int ObLogRestoreSourceServiceConfigParser::check_before_update_inner_config( + const bool for_verify, + ObCompatibilityMode &compat_mode) +{ + int ret = OB_SUCCESS; + char passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 }; //unencrypted password + ObSqlString user_and_tenant; + compat_mode = ObCompatibilityMode::OCEANBASE_MODE; + bool source_is_self = false; + + SMART_VAR(ObLogRestoreProxyUtil, proxy) { + if (is_empty_) { + } else if (!type_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid parser", KPC(this)); + } else if (0 == STRLEN(service_attr_.encrypt_passwd_) + || 0 == STRLEN(service_attr_.user_.user_name_) + || 0 == STRLEN(service_attr_.user_.tenant_name_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("fail to parse log restore source config, please check the config parameters"); + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "parse log restore source config, please check the config parameters"); + } else if (OB_FAIL(service_attr_.get_password(passwd, sizeof(passwd)))) { + LOG_WARN("get servcie attr password failed"); + } else if (OB_FAIL(service_attr_.get_user_str_(user_and_tenant))) { + LOG_WARN("get user str failed", K(service_attr_.user_.user_name_), K(service_attr_.user_.tenant_name_)); + } else if (OB_FAIL(proxy.try_init(tenant_id_/*standby*/, service_attr_.addr_, user_and_tenant.ptr(), passwd))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("proxy connect to primary db failed", K(service_attr_.addr_), K(user_and_tenant)); + } else if (OB_FAIL(proxy.get_tenant_id(service_attr_.user_.tenant_name_, service_attr_.user_.tenant_id_))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get primary tenant id failed", K(tenant_id_), K(service_attr_.user_)); + } else if (OB_FAIL(proxy.get_cluster_id(service_attr_.user_.tenant_id_, service_attr_.user_.cluster_id_))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get primary cluster id failed", K(tenant_id_), K(service_attr_.user_.tenant_id_)); + } else if (!for_verify && OB_FAIL(service_attr_.check_restore_source_is_self_(source_is_self, tenant_id_))) { + LOG_WARN("check restore source is self failed"); + } else if (source_is_self) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("set standby itself as log restore source is not allowed"); + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set standby itself as log restore source"); + } else if (OB_FAIL(proxy.get_compatibility_mode(service_attr_.user_.tenant_id_, service_attr_.user_.mode_))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get primary compatibility mode failed", K(tenant_id_), K(service_attr_.user_.tenant_id_)); + } else if (for_verify && OB_FAIL(proxy.check_begin_lsn(service_attr_.user_.tenant_id_))) { + LOG_WARN("check_begin_lsn failed", K(tenant_id_), K(service_attr_.user_.tenant_id_)); + } else { + compat_mode = service_attr_.user_.mode_; + LOG_INFO("check_before_update_inner_config success", K(tenant_id_), K(service_attr_), K(compat_mode)); + } + } + return ret; +} + +int ObLogRestoreSourceServiceConfigParser::get_compatibility_mode(common::ObCompatibilityMode &compatibility_mode) +{ + int ret = OB_SUCCESS; + compatibility_mode = OCEANBASE_MODE; + if (is_empty_) { + } else if (!type_.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid parser", KPC(this)); + } else if (!service_attr_.user_.is_valid()) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KPC(this)); + } else { + compatibility_mode = service_attr_.user_.mode_; + } + return ret; +} + +int ObLogRestoreSourceServiceConfigParser::do_parse_sub_config_(const common::ObString &config_str) +{ + int ret = OB_SUCCESS; + if (config_str.empty()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("empty log restore source sub config is not allowed", K(config_str)); + } else { + char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; + char *token = nullptr; + char *saveptr = nullptr; + if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", config_str.length(), config_str.ptr()))) { + LOG_WARN("fail to set config value", K(config_str)); + } else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to split config str", KP(token)); + } else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) { + } else if (0 == STRCASECMP(token, OB_STR_SERVICE)) { + if (OB_FAIL(do_parse_restore_service_host_(token, saveptr))) { + LOG_WARN("fail to do parse restore service host", K(token), K(saveptr)); + } + } else if (0 == STRCASECMP(token, OB_STR_USER)) { + if (OB_FAIL(do_parse_restore_service_user_(token, saveptr))) { + LOG_WARN("fail to do parse restore service user", K(token), K(saveptr)); + } + } else if (0 == STRCASECMP(token, OB_STR_PASSWORD)) { + if (OB_FAIL(do_parse_restore_service_passwd_(token, saveptr))) { + LOG_WARN("fail to do parse restore service passwd", K(token), K(saveptr)); + } + } else { + ret = OB_NOT_SUPPORTED; + LOG_WARN("log restore source does not has this config", K(token)); + } + } + + return ret; +} + +int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_host_(const common::ObString &name, const +common::ObString &value) +{ + int ret = OB_SUCCESS; + if (name.empty() || value.empty()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid log restore source service host config", K(name), K(value)); + } else if (OB_FAIL(service_attr_.parse_ip_port_from_str(value.ptr(), ";" /*delimiter*/))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("parse restore source service host failed", K(name), K(value)); + } + if (OB_FAIL(ret)) { + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set ip list config, please check the length and format of ip list"); + } + return ret; +} + +int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_user_(const common::ObString &name, const +common::ObString &value) +{ + int ret = OB_SUCCESS; + if (name.empty() || value.empty()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid log restore source service user config", K(name), K(value)); + } else { + char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; + char *user_token = nullptr; + char *tenant_token = nullptr; + if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", value.length(), value.ptr()))) { + LOG_WARN("fail to set user config value", K(value)); + } else if (OB_FAIL(service_attr_.set_service_user_config(tmp_str))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("parse restore source service user failed", K(name), K(value)); + } + } + if (OB_FAIL(ret)) { + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set user config, please check the length and format of username, tenant name"); + } + return ret; +} + +int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_passwd_(const common::ObString &name, const +common::ObString &value) +{ + int ret = OB_SUCCESS; + if (name.empty() || value.empty()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid log restore source service password config", K(name), K(value)); + } else if (OB_FAIL(service_attr_.set_service_passwd_to_encrypt(value.ptr()))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("parse restore source service password failed", K(name), K(value)); + } + if (OB_FAIL(ret)) { + LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set password config, please check the length and format of password"); + } + return ret; +} diff --git a/src/share/backup/ob_log_restore_config.h b/src/share/backup/ob_log_restore_config.h new file mode 100644 index 000000000..cbb9e8c55 --- /dev/null +++ b/src/share/backup/ob_log_restore_config.h @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_CONFIG_H_ +#define OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_CONFIG_H_ + +#include +#include "lib/string/ob_sql_string.h" +#include "lib/mysqlclient/ob_mysql_proxy.h" +#include "ob_backup_struct.h" +#include "ob_backup_config.h" +#include "ob_log_restore_struct.h" + +namespace oceanbase +{ +namespace share +{ +class ObLogArchiveDestConfigParser; +class ObBackupConfigType; +class ObIBackupConfigItemParser; + +class ObLogRestoreSourceLocationConfigParser : public ObLogArchiveDestConfigParser +{ +public: + ObLogRestoreSourceLocationConfigParser(const ObBackupConfigType::Type &type, const uint64_t tenant_id, const int64_t dest_no) + : ObLogArchiveDestConfigParser(type, tenant_id, dest_no) {} + virtual ~ObLogRestoreSourceLocationConfigParser() {} + virtual int update_inner_config_table(common::ObISQLClient &trans) override; + virtual int check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans) override; + +protected: + virtual int do_parse_sub_config_(const common::ObString &config_str) override; + +private: + DISALLOW_COPY_AND_ASSIGN(ObLogRestoreSourceLocationConfigParser); +}; + +class ObLogRestoreSourceServiceConfigParser : public ObIBackupConfigItemParser +{ +public: + ObLogRestoreSourceServiceConfigParser(const ObBackupConfigType::Type &type, const uint64_t tenant_id) + : ObIBackupConfigItemParser(type, tenant_id) {} + virtual ~ObLogRestoreSourceServiceConfigParser() {} + virtual int parse_from(const common::ObSqlString &value) override; + virtual int update_inner_config_table(common::ObISQLClient &trans) override; + virtual int check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans) override; + int check_before_update_inner_config( + const bool for_verify, + ObCompatibilityMode &compat_mode); + virtual int get_compatibility_mode(common::ObCompatibilityMode &compatibility_mode); +private: + int do_parse_sub_config_(const common::ObString &config_str); + int do_parse_restore_service_host_(const common::ObString &name, const common::ObString &value); + int do_parse_restore_service_user_(const common::ObString &name, const common::ObString &value); + int do_parse_restore_service_passwd_(const common::ObString &name, const common::ObString &value); + int check_doing_service_restore_(common::ObISQLClient &trans, bool &is_doing); + int update_data_backup_dest_config_(common::ObISQLClient &trans); +private: + ObRestoreSourceServiceAttr service_attr_; + bool is_empty_; + DISALLOW_COPY_AND_ASSIGN(ObLogRestoreSourceServiceConfigParser); +}; + +}//share +}//oceanbase + +#endif /* OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_CONFIG_H_ */ \ No newline at end of file diff --git a/src/share/backup/ob_log_restore_struct.cpp b/src/share/backup/ob_log_restore_struct.cpp new file mode 100644 index 000000000..3fb11a787 --- /dev/null +++ b/src/share/backup/ob_log_restore_struct.cpp @@ -0,0 +1,839 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SHARE +#include "lib/utility/ob_defer.h" +#include "share/backup/ob_backup_config.h" +#include "ob_log_restore_struct.h" + + +using namespace oceanbase; +using namespace common; +using namespace share; + +ObRestoreSourceServiceUser::ObRestoreSourceServiceUser() + : user_name_(), + tenant_name_(), + mode_(ObCompatibilityMode::OCEANBASE_MODE), + tenant_id_(OB_INVALID_TENANT_ID), + cluster_id_(OB_INVALID_CLUSTER_ID) +{ + user_name_[0] = '\0'; + tenant_name_[0] = '\0'; +} + +void ObRestoreSourceServiceUser::reset() +{ + user_name_[0] = '\0'; + tenant_name_[0] = '\0'; + mode_ = ObCompatibilityMode::OCEANBASE_MODE; + tenant_id_ = OB_INVALID_TENANT_ID; + cluster_id_ = OB_INVALID_CLUSTER_ID; +} + +bool ObRestoreSourceServiceUser::is_valid() const +{ + return STRLEN(user_name_) != 0 + && STRLEN(tenant_name_) != 0 + && (ObCompatibilityMode::OCEANBASE_MODE != mode_) + && (tenant_id_ != OB_INVALID_TENANT_ID) + && (cluster_id_ != OB_INVALID_CLUSTER_ID); +} + +bool ObRestoreSourceServiceUser::operator== (const ObRestoreSourceServiceUser &other) const +{ + return (STRLEN(this->user_name_) == STRLEN(other.user_name_)) + && (STRLEN(this->tenant_name_) == STRLEN(other.tenant_name_)) + && (0 == STRCMP(this->user_name_, other.user_name_)) + && (0 == STRCMP(this->tenant_name_, other.tenant_name_)) + && this->mode_ == other.mode_ + && this->tenant_id_ == other.tenant_id_ + && this->cluster_id_ == other.cluster_id_; +} + +int ObRestoreSourceServiceUser::assign(const ObRestoreSourceServiceUser &user) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!user.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(user)); + } else if (OB_FAIL(databuff_printf(user_name_, sizeof(user_name_), "%s", user.user_name_))) { + LOG_WARN("user_name_ assign failed", K(user)); + } else if (OB_FAIL(databuff_printf(tenant_name_, sizeof(tenant_name_), "%s", user.tenant_name_))) { + LOG_WARN("tenant_name_ assign failed", K(user)); + } else { + mode_ = user.mode_; + tenant_id_ = user.tenant_id_; + cluster_id_ = user.cluster_id_; + } + return ret; +} + +ObRestoreSourceServiceAttr::ObRestoreSourceServiceAttr() + : addr_(), + user_() +{ + encrypt_passwd_[0] = '\0'; +} + +void ObRestoreSourceServiceAttr::reset() +{ + addr_.reset(); + user_.reset(); + encrypt_passwd_[0] = '\0'; +} + +/* + parse service attr from string + eg: "ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxx,TENANT_ID=1002, + CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=true" +*/ +int ObRestoreSourceServiceAttr::parse_service_attr_from_str(ObSqlString &value) +{ + int ret = OB_SUCCESS; + char tmp_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 }; + char *token = nullptr; + char *saveptr = nullptr; + + if (OB_UNLIKELY(value.empty() || value.length() > OB_MAX_BACKUP_DEST_LENGTH)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("log restore source attr value is invalid"); + } else if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", static_cast(value.length()), value.ptr()))) { + LOG_WARN("fail to print attr value", K(value)); + } else { + token = tmp_str; + for (char *str = token; OB_SUCC(ret); str = nullptr) { + token = ::STRTOK_R(str, ",", &saveptr); + if (nullptr == token) { + break; + } else if (OB_FAIL(do_parse_sub_service_attr(token))) { + LOG_WARN("fail to parse service attr str", K(token)); + } + } + } + return ret; +} + +int ObRestoreSourceServiceAttr::do_parse_sub_service_attr(const char *sub_value) +{ + int ret = OB_SUCCESS; + + if (OB_ISNULL(sub_value) || OB_UNLIKELY(0 == STRLEN(sub_value) || STRLEN(sub_value) > OB_MAX_BACKUP_DEST_LENGTH)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("log restore source service attr sub value is invalid", K(sub_value)); + } else { + char tmp_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 }; + char *token = nullptr; + char *saveptr = nullptr; + if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", sub_value))) { + LOG_WARN("fail to print sub_value", K(sub_value)); + } else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("fail to split sub_value str", K(token), KP(tmp_str)); + } else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) { + } else if (0 == STRCASECMP(token, OB_STR_IP_LIST)) { + if (OB_FAIL(parse_ip_port_from_str(saveptr, ";"/*delimiter*/))) { + LOG_WARN("fail to parse ip list from str", K(token), K(saveptr)); + } + } else if (0 == STRCASECMP(token, OB_STR_USER)) { + if (OB_FAIL(set_service_user_config(saveptr))) { + LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr)); + } + } else if (0 == STRCASECMP(token, OB_STR_TENANT_ID)) { + if (OB_FAIL(set_service_tenant_id(saveptr))) { + LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr)); + } + } else if (0 == STRCASECMP(token, OB_STR_CLUSTER_ID)) { + if (OB_FAIL(set_service_cluster_id(saveptr))) { + LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr)); + } + } else if (0 == STRCASECMP(token, OB_COMPATIBILITY_MODE)) { + if (OB_FAIL(set_service_compatibility_mode(saveptr))) { + LOG_WARN("fail to set restore service compatibility mode", K(token), K(saveptr)); + } + } else if (0 == STRCASECMP(token, OB_STR_PASSWORD)) { + if (OB_FAIL(set_service_passwd_no_encrypt(saveptr))) { + LOG_WARN("fail to set restore service passwd", K(token), K(saveptr)); + } + } else if (0 == STRCASECMP(token, OB_STR_IS_ENCRYPTED)) { + } else { + ret = OB_NOT_SUPPORTED; + LOG_WARN("log restore source service do not have this config", K(token)); + } + } + return ret; +} + +int ObRestoreSourceServiceAttr::set_service_user_config(const char *user_tenant) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(user_tenant) || OB_UNLIKELY(0 == STRLEN(user_tenant) + || STRLEN(user_tenant) > OB_MAX_RESTORE_USER_AND_TENANT_LEN)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("log restore source service user is invalid"); + } else { + char *user_name = nullptr; + char *tenant_name = nullptr; + char tmp_str[OB_MAX_RESTORE_USER_AND_TENANT_LEN + 1] = { 0 }; + if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", user_tenant))) { + LOG_WARN("fail to print user_tenant", K(user_tenant)); + } else if (OB_ISNULL(user_name = ::STRTOK_R(tmp_str, "@", &tenant_name))) { + LOG_WARN("fail to split user_tenant", K(tmp_str)); + } else if (OB_FAIL(set_service_user(user_name, tenant_name))) { + LOG_WARN("fail to set service user", K(user_name), K(tenant_name)); + } + } + return ret; +} + +int ObRestoreSourceServiceAttr::set_service_user(const char *user, const char *tenant) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(user) || OB_ISNULL(tenant) || OB_UNLIKELY(0 == STRLEN(user) || 0 == STRLEN(tenant) + || STRLEN(user) > OB_MAX_USER_NAME_LENGTH || STRLEN(tenant) > OB_MAX_ORIGINAL_NANE_LENGTH)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("log restore source service user name or tenant name is invalid"); + } else if (OB_FAIL(databuff_printf(user_.user_name_, sizeof(user_.user_name_), "%s", user))) { + LOG_WARN("fail to print user name", K(user)); + } else if (OB_FAIL(databuff_printf(user_.tenant_name_, sizeof(user_.tenant_name_), "%s", tenant))) { + LOG_WARN("fail to print tenant name", K(tenant)); + } + LOG_DEBUG("set service user", K(user), K(tenant), K(user_)); + return ret; +} + +int ObRestoreSourceServiceAttr::set_service_tenant_id(const char *tenant_id_str) +{ + int ret = OB_SUCCESS; + char *p_end = nullptr; + if (OB_FAIL(ob_strtoull(tenant_id_str, p_end, user_.tenant_id_))) { + LOG_WARN("fail to set service tenant id from string", K(tenant_id_str)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::set_service_cluster_id(const char *cluster_id_str) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ob_atoll(cluster_id_str, user_.cluster_id_))) { + LOG_WARN("fail to set service cluster id from string", K(cluster_id_str)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::set_service_compatibility_mode(const char *compat_mode) +{ + int ret = OB_SUCCESS; + char token[OB_MAX_COMPAT_MODE_STR_LEN + 1] = { 0 }; + if (OB_ISNULL(compat_mode) || OB_UNLIKELY(0 == STRLEN(compat_mode) + || STRLEN(compat_mode) > OB_MAX_COMPAT_MODE_STR_LEN)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("log restore source service compat_mode is invalid"); + } else if (OB_FAIL(databuff_printf(token, sizeof(token), "%s", compat_mode))) { + LOG_WARN("fail to print compat mode", K(compat_mode)); + } else if (OB_FALSE_IT(str_toupper(token, STRLEN(token)))) { + } else if ((0 == STRCASECMP(token, "MYSQL"))) { + user_.mode_ = ObCompatibilityMode::MYSQL_MODE; + } else if ((0 == STRCASECMP(token, "ORACLE"))) { + user_.mode_ = ObCompatibilityMode::ORACLE_MODE; + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid compatibility mode", K(compat_mode), K(ret)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::set_service_passwd_to_encrypt(const char *passwd) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(passwd) || OB_UNLIKELY(0 == STRLEN(passwd) || STRLEN(passwd) > OB_MAX_PASSWORD_LENGTH)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument"); + } else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(encrypt_passwd_), "%s", passwd))) { + LOG_WARN("fail to print encrypt password"); + } + LOG_INFO("set service password success"); + return ret; +} + +int ObRestoreSourceServiceAttr::set_service_passwd_no_encrypt(const char *passwd) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(passwd) || OB_UNLIKELY(0 == STRLEN(passwd) || STRLEN(passwd) > OB_MAX_PASSWORD_LENGTH)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument"); + } else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(encrypt_passwd_), "%s", passwd))) { + LOG_WARN("fail to print encrypt password"); + } + return ret; +} + +// 127.0.0.1:1000;127.0.0.1:1001;127.0.0.1:1002 ==> 127.0.0.1:1000 127.0.0.1:1001 127.0.0.1:1002 +int ObRestoreSourceServiceAttr::parse_ip_port_from_str(const char *ip_list, const char *delimiter) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(ip_list) || OB_ISNULL(delimiter) || OB_UNLIKELY(STRLEN(ip_list) > OB_MAX_RESTORE_SOURCE_IP_LIST_LEN)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("log restore source service ip list is invalid"); + } + + char tmp_str[OB_MAX_RESTORE_SOURCE_IP_LIST_LEN + 1] = { 0 }; + char *token = nullptr; + char *saveptr = nullptr; + if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", ip_list))) { + LOG_WARN("fail to get ip list", K(ip_list)); + } else { + token = tmp_str; + for (char *str = token; OB_SUCC(ret); str = nullptr) { + token = ::STRTOK_R(str, delimiter, &saveptr); + if (nullptr == token) { + break; + } else { + ObAddr addr; + if (OB_FAIL(addr.parse_from_string(ObString(token)))) { + LOG_WARN("fail to parse addr", K(addr), K(token)); + } else if (!addr.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("service addr is invalid", K(addr), K(ip_list)); + } else if (OB_FAIL(addr_.push_back(addr))){ + LOG_WARN("fail to push addr", K(addr)); + } + } + } + } + return ret; +} + +bool ObRestoreSourceServiceAttr::is_valid() const +{ + return service_user_is_valid() + && service_host_is_valid() + && service_password_is_valid(); +} + +bool ObRestoreSourceServiceAttr::service_user_is_valid() const +{ + return user_.is_valid(); +} + +bool ObRestoreSourceServiceAttr::service_host_is_valid() const +{ + return !addr_.empty(); +} + +bool ObRestoreSourceServiceAttr::service_password_is_valid() const +{ + return strlen(encrypt_passwd_) != 0; +} + +int ObRestoreSourceServiceAttr::gen_config_items(common::ObIArray &items) const +{ + int ret = OB_SUCCESS; + BackupConfigItemPair config; + if (!is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arguement", KPC(this)); + } + + ObSqlString tmp; + // gen ip list config + if (OB_FAIL(ret)) { + } else if (OB_FAIL(config.key_.assign(OB_STR_IP_LIST))) { + LOG_WARN("failed to assign ip list key"); + } else if (OB_FAIL(get_ip_list_str_(tmp))) { + LOG_WARN("failed to get ip list str"); + } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { + LOG_WARN("failed to assign ip list value"); + } else if(OB_FAIL(items.push_back(config))) { + LOG_WARN("failed to push service source attr config", K(config)); + } + + // gen user config + if (OB_FAIL(ret)) { + } else if (OB_FAIL(config.key_.assign(OB_STR_USER))) { + LOG_WARN("failed to assign user key"); + } else if (OB_FAIL(get_user_str_(tmp))) { + LOG_WARN("failed to get user str"); + } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { + LOG_WARN("failed to assign user value"); + } else if (OB_FAIL(items.push_back(config))) { + LOG_WARN("failed to push service source attr config", K(config)); + } + + // gen password config + if (OB_FAIL(ret)) { + } else if (OB_FAIL(config.key_.assign(OB_STR_PASSWORD))) { + LOG_WARN("failed to assign password key"); + } else if (OB_FAIL(get_password_str_(tmp))) { + LOG_WARN("failed to get password str"); + } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { + LOG_WARN("failed to assign password value"); + } else if (OB_FAIL(items.push_back(config))) { + LOG_WARN("failed to push service source attr config", K(config)); + } + + // gen tenant id + if (OB_FAIL(ret)) { + } else if (OB_FAIL(config.key_.assign(OB_STR_TENANT_ID))) { + LOG_WARN("failed to assign tenant id key"); + } else if (OB_FAIL(get_tenant_id_str_(tmp))) { + LOG_WARN("failed to get tenant id str"); + } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { + LOG_WARN("failed to assign tenant id value"); + } else if (OB_FAIL(items.push_back(config))) { + LOG_WARN("failed to push service source attr config", K(config)); + } + + // gen cluster id + if (OB_FAIL(ret)) { + } else if (OB_FAIL(config.key_.assign(OB_STR_CLUSTER_ID))) { + LOG_WARN("failed to assign cluster id key"); + } else if (OB_FAIL(get_cluster_id_str_(tmp))) { + LOG_WARN("failed to get cluster id str"); + } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { + LOG_WARN("failed to assign cluster id value"); + } else if (OB_FAIL(items.push_back(config))) { + LOG_WARN("failed to push service source attr config", K(config)); + } + + // gen compatibility mode + if (OB_FAIL(ret)) { + } else if (OB_FAIL(config.key_.assign(OB_COMPATIBILITY_MODE))) { + LOG_WARN("failed to assign compatibility mode"); + } else if (OB_FAIL(get_compatibility_mode_str_(tmp))) { + LOG_WARN("failed to get compatibility mode str"); + } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { + LOG_WARN("failed to assign compatibility mode value"); + } else if (OB_FAIL(items.push_back(config))) { + LOG_WARN("failed to push service source attr config", K(config)); + } + + // gen is encrypted + if (OB_FAIL(ret)) { + } else if (OB_FAIL(config.key_.assign(OB_STR_IS_ENCRYPTED))) { + LOG_WARN("failed to assign encrypted key"); + } else if (OB_FAIL(get_is_encrypted_str_(tmp))) { + LOG_WARN("failed to get is encrypted str"); + } else if (OB_FAIL(config.value_.assign(tmp.ptr()))) { + LOG_WARN("failed to assign encrypted value"); + } else if (OB_FAIL(items.push_back(config))) { + LOG_WARN("failed to push service source attr config", K(config)); + } + + return ret; +} + +int ObRestoreSourceServiceAttr::gen_service_attr_str(char *buf, const int64_t buf_size) const +{ + int ret = OB_SUCCESS; + ObSqlString str; + ObSqlString ip_str; + ObSqlString user_str; + ObSqlString passwd_str; + ObSqlString compat_str; + ObSqlString is_encrypted_str; + + if (OB_UNLIKELY(!is_valid() || OB_ISNULL(buf) || buf_size <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid service attr argument", KP(buf), K(buf_size)); + } else if (OB_FAIL(get_ip_list_str_(ip_str))) { + LOG_WARN("get ip list str failed"); + } else if (OB_FAIL(get_user_str_(user_str))) { + LOG_WARN("get user str failed"); + } else if (OB_FAIL(get_password_str_(passwd_str))) { + LOG_WARN("get password str failed"); + } else if (OB_FAIL(get_compatibility_mode_str_(compat_str))) { + LOG_WARN("get compatibility mode str failed"); + } else if (OB_FAIL(get_is_encrypted_str_(is_encrypted_str))) { + LOG_WARN("get encrypted str failed"); + } else if (OB_FAIL(str.assign_fmt("IP_LIST=%s,USER=%s,PASSWORD=%s,TENANT_ID=%ld,CLUSTER_ID=%ld,COMPATIBILITY_MODE=%s,IS_ENCRYPTED=%s", + ip_str.ptr(), user_str.ptr(), passwd_str.ptr(), user_.tenant_id_, user_.cluster_id_, compat_str.ptr(), is_encrypted_str.ptr()))) { + LOG_WARN("fail to assign str", K(compat_str), K(is_encrypted_str)); + } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(str.length()), str.ptr()))) { + LOG_WARN("fail to print str", K(str)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_ip_list_str_(char *buf, const int64_t buf_size) const +{ + int ret = OB_SUCCESS; + ObSqlString str; + if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid ip list argument", KP(buf), K(buf_size)); + } else if (OB_FAIL(get_ip_list_str_(str))) { + LOG_WARN("get ip list str failed"); + } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(str.length()), str.ptr()))) { + LOG_WARN("fail to print str", K(str)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_ip_list_str_(ObSqlString &str) const +{ + int ret = OB_SUCCESS; + + ARRAY_FOREACH_N(addr_, idx, cnt) { + char ip_str[MAX_IP_PORT_LENGTH] = { 0 }; + const ObAddr ip = addr_.at(idx); + if (OB_FAIL(ip.ip_port_to_string(ip_str, sizeof(ip_str)))) { + LOG_WARN("fail to convert ip port to string", K(ip), K(ip_str)); + } else { + if (0 == idx && (OB_FAIL(str.assign_fmt("%s", ip_str)))) { + LOG_WARN("fail to assign ip str", K(str) ,K(ip), K(ip_str)); + } else if ( 0 != idx && OB_FAIL(str.append_fmt(";%s", ip_str))) { + LOG_WARN("fail to append ip str", K(str), K(ip), K(ip_str)); + } + } + } + LOG_DEBUG("get ip list str", K(str)); + return ret; +} + +int ObRestoreSourceServiceAttr::get_user_str_(char *buf, const int64_t buf_size) const +{ + int ret = OB_SUCCESS; + ObSqlString res_str; + if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid user str argument", KP(buf), K(buf_size)); + } else if (OB_FAIL(get_user_str_(res_str))) { + LOG_WARN("fail to get user str"); + } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(res_str.length()), res_str.ptr()))) { + LOG_WARN("fail to print str"); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_user_str_(ObSqlString &user_str) const +{ + int ret = OB_SUCCESS; + if (OB_FAIL(user_str.assign(user_.user_name_))) { + LOG_WARN("fail to assign user name" ,K(user_.user_name_)); + } else if (OB_FAIL(user_str.append_fmt("@%s",user_.tenant_name_))) { + LOG_WARN("fail to assign tenant name", K(user_.user_name_)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_password_str_(char *buf, const int64_t buf_size) const +{ + int ret = OB_SUCCESS; + ObSqlString passwd_str; + if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid password str argument", KP(buf), K(buf_size)); + } else if (OB_FAIL(get_password_str_(passwd_str))) { + LOG_WARN("fail to get password str"); + } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(passwd_str.length()), passwd_str.ptr()))) { + LOG_WARN("fail to print str"); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_password_str_(ObSqlString &passwd_str) const +{ + int ret = OB_SUCCESS; + if (OB_FAIL(passwd_str.assign(encrypt_passwd_))) { + LOG_WARN("fail to assign password"); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_tenant_id_str_(char *buf, const int64_t buf_size) const +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant id str argument", KP(buf), K(buf_size)); + } else if (OB_FAIL(databuff_printf(buf, buf_size, "%ld", user_.tenant_id_))) { + LOG_WARN("fail to print tenant id str", K(user_.tenant_id_)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_tenant_id_str_(ObSqlString &tenant_str) const +{ + int ret = OB_SUCCESS; + if (OB_FAIL(tenant_str.assign_fmt("%ld", user_.tenant_id_))) { + LOG_WARN("fail to assign tenant id str", K(user_.tenant_id_)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_cluster_id_str_(char *buf, const int64_t buf_size) const +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid cluster id str argument", KP(buf), K(buf_size)); + } else if (OB_FAIL(databuff_printf(buf, buf_size, "%ld", user_.cluster_id_))) { + LOG_WARN("fail to print cluster id str", K(user_.cluster_id_)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_cluster_id_str_(ObSqlString &cluster_str) const +{ + int ret = OB_SUCCESS; + if (OB_FAIL(cluster_str.assign_fmt("%ld", user_.cluster_id_))) { + LOG_WARN("fail to assign cluster id str", K(user_.cluster_id_)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_compatibility_mode_str_(char *buf, const int64_t buf_size) const +{ + int ret = OB_SUCCESS; + ObSqlString tmp_str; + if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid compatibilidy mode str argument", KP(buf), K(buf_size)); + } else if (OB_FAIL(get_compatibility_mode_str_(tmp_str))) { + LOG_WARN("fail to get compatibility mode str"); + } else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast(tmp_str.length()), tmp_str.ptr()))) { + LOG_WARN("fail to print compatibility mode str", K(tmp_str)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_compatibility_mode_str_(ObSqlString &compatibility_str) const +{ + int ret = OB_SUCCESS; + const char *compat_mode = "INVALID"; + if (ObCompatibilityMode::MYSQL_MODE == user_.mode_) { + compat_mode = "MYSQL"; + } else if (ObCompatibilityMode::ORACLE_MODE == user_.mode_) { + compat_mode = "ORACLE"; + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("compatibility mode is invalid", K(user_.mode_)); + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(compatibility_str.assign_fmt("%s", compat_mode))) { + LOG_WARN("fail to assign compatibility mode", K(compat_mode)); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_is_encrypted_str_(char *buf, const int64_t buf_size) const +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid encrypted str argument", KP(buf), K(buf_size)); + } else if (OB_FAIL(databuff_printf(buf, buf_size, "%s", "false"))) { + LOG_WARN("fail to print encrypted str"); + } + return ret; +} + +int ObRestoreSourceServiceAttr::get_is_encrypted_str_(ObSqlString &encrypted_str) const +{ + int ret = OB_SUCCESS; + if (OB_FAIL(encrypted_str.assign("false"))) { + LOG_WARN("fail to print str"); + } + return ret; +} + + +int ObRestoreSourceServiceAttr::get_password(char *passwd, const int64_t buf_size) const +{ + int ret = OB_SUCCESS; + char tmp_passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 }; + if (OB_ISNULL(passwd) || OB_UNLIKELY((buf_size <= 0) || (buf_size < OB_MAX_PASSWORD_LENGTH))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid parameter when get password", K(passwd), K(STRLEN(passwd))); + } else if (OB_FAIL(databuff_printf(passwd, buf_size, "%s", encrypt_passwd_))) { + LOG_WARN("failed to print encrypt_passwd_ key", K(encrypt_passwd_)); + } + return ret; +} + +bool ObRestoreSourceServiceAttr::compare_addr_(common::ObArray addr) const +{ + int bret = true; + int ret = OB_SUCCESS; + if (addr.size() != this->addr_.size()) { + bret = false; + } else { + ARRAY_FOREACH_N(addr, idx, cnt) { + bool tmp_cmp = false; + ARRAY_FOREACH_N(this->addr_, idx1, cnt1) { + if (addr.at(idx) == this->addr_.at(idx1)) { + tmp_cmp = true; + } + } + if (!tmp_cmp) { + bret = false; + break; + } + } + } + return bret; +} + +int ObRestoreSourceServiceAttr::check_restore_source_is_self_(bool &is_self, uint64_t tenant_id) const +{ + int ret = OB_SUCCESS; + is_self = false; + int64_t curr_cluster_id = GCONF.cluster_id; + + // assume that cluster id is managed by + if (tenant_id == user_.tenant_id_ && curr_cluster_id == user_.cluster_id_) { + is_self = true; + LOG_WARN("set standby itself as log restore source is not allowed"); + } + LOG_INFO("check restore is self succ", K(tenant_id), K(user_.tenant_id_), K(curr_cluster_id), K(user_.cluster_id_)); + return ret; +} + +bool ObRestoreSourceServiceAttr::operator==(const ObRestoreSourceServiceAttr &other) const +{ + return (this->user_ == other.user_) + && (STRLEN(this->encrypt_passwd_) == STRLEN(other.encrypt_passwd_)) + && (0 == STRCMP(this->encrypt_passwd_, other.encrypt_passwd_)) + && compare_addr_(other.addr_); +} + +int ObRestoreSourceServiceAttr::assign(const ObRestoreSourceServiceAttr &attr) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!attr.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(attr)); + } else if (FALSE_IT(addr_.reset())) { + } else if (OB_FAIL(addr_.assign(attr.addr_))) { + LOG_WARN("addr_ assign failed", K(attr)); + } else if (OB_FAIL(user_.assign(attr.user_))) { + LOG_WARN("user_ assign failed", K(attr)); + } else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(attr.encrypt_passwd_), "%s", attr.encrypt_passwd_))) { + LOG_WARN("passwd_ assign failed", K(attr)); + } + return ret; +} + +ObRestoreSourceLocationPrimaryAttr::ObRestoreSourceLocationPrimaryAttr() + : tenant_id_(OB_INVALID_TENANT_ID), + cluster_id_(OB_INVALID_CLUSTER_ID) +{ + location_.reset(); +} + +int ObRestoreSourceLocationPrimaryAttr::parse_location_attr_from_str(const common::ObString &value) +{ + int ret = OB_SUCCESS; + char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 }; + char *token = nullptr; + char *saveptr = nullptr; + char *p_end = nullptr; + + if (value.empty()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("location log restore source is empty"); + } else if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", static_cast(value.length()), value.ptr()))) { + LOG_WARN("fail to set config value", K(ret), K(value)); + } else { + token = tmp_str; + for (char *str = token; OB_SUCC(ret); str = nullptr) { + token = ::STRTOK_R(str, ",", &saveptr); + if (nullptr == token) { + break; + } else if (OB_FAIL(do_parse_sub_config_(token))) { + LOG_WARN("fail to do parse location restore source sub config from string", K(token)); + } + } + } + return ret; +} + +int ObRestoreSourceLocationPrimaryAttr::do_parse_sub_config_(const char *sub_value) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(sub_value) || OB_UNLIKELY(0 == STRLEN(sub_value) || STRLEN(sub_value) > OB_MAX_BACKUP_DEST_LENGTH)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("location log restore source primary attr sub value is invalid", K(sub_value)); + } else { + char tmp_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 }; + char *token = nullptr; + char *saveptr = nullptr; + if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", sub_value))) { + LOG_WARN("fail to print sub_value", K(sub_value)); + } else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("fail to split sub_value str", K(token), KP(tmp_str)); + } else if (0 == STRCASECMP(token, OB_STR_LOCATION)) { + if (OB_FAIL(set_location_path_(saveptr))) { + LOG_WARN("fail to do parse location path", K(token), K(saveptr)); + } + } else if (0 == STRCASECMP(token, OB_STR_CLUSTER_ID)) { + if (OB_FAIL(set_location_cluster_id_(saveptr))) { + LOG_WARN("fail to do parse location primary cluster id", K(token), K(saveptr)); + } + } else if (0 == STRCASECMP(token, OB_STR_TENANT_ID)) { + if (OB_FAIL(set_location_tenant_id_(saveptr))) { + LOG_WARN("fail to do parse location primary tenant id", K(token), K(saveptr)); + } + } else { + ret = OB_NOT_SUPPORTED; + LOG_WARN("location log restore source does not support this sub config", K(token)); + } + } + return ret; +} + +int ObRestoreSourceLocationPrimaryAttr::set_location_path_(const char *path) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(path) || OB_UNLIKELY(0 == STRLEN(path) || STRLEN(path) > OB_MAX_BACKUP_DEST_LENGTH)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("location path is invalid", K(path)); + } else if (OB_FAIL(location_.assign(path))) { + LOG_WARN("fail to assign location path", K(path)); + } + return ret; +} + +int ObRestoreSourceLocationPrimaryAttr::set_location_cluster_id_(const char *cluster_id_str) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(cluster_id_str) + || OB_UNLIKELY(0 == STRLEN(cluster_id_str) || STRLEN(cluster_id_str) > OB_MAX_BACKUP_DEST_LENGTH)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("location cluster id str is invalid", K(cluster_id_str)); + } else if (OB_FAIL(ob_atoll(cluster_id_str, cluster_id_))) { + LOG_WARN("fail to set location primary cluster id from string", K(cluster_id_str)); + } + return ret; +} + +int ObRestoreSourceLocationPrimaryAttr::set_location_tenant_id_(const char *tenant_id_str) +{ + int ret = OB_SUCCESS; + char *p_end = nullptr; + if (OB_ISNULL(tenant_id_str) || OB_UNLIKELY(0 == STRLEN(tenant_id_str))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("location tenant id is invalid", K(tenant_id_str)); + } else if (OB_FAIL(ob_strtoull(tenant_id_str, p_end, tenant_id_))) { + LOG_WARN("fail to set location primary tenant id from string", K(tenant_id_str)); + } + return ret; +} + +bool ObRestoreSourceLocationPrimaryAttr::is_valid() +{ + return tenant_id_ != OB_INVALID_TENANT_ID + && cluster_id_ != OB_INVALID_CLUSTER_ID + && location_.is_valid(); +} \ No newline at end of file diff --git a/src/share/backup/ob_log_restore_struct.h b/src/share/backup/ob_log_restore_struct.h new file mode 100644 index 000000000..eafc8fa1a --- /dev/null +++ b/src/share/backup/ob_log_restore_struct.h @@ -0,0 +1,113 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_STRUCT_H_ +#define OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_STRUCT_H_ + +#include "lib/container/ob_array.h" +#include "share/scn.h" +#include "ob_backup_struct.h" + +namespace oceanbase +{ +namespace share +{ + +struct ObRestoreSourceServiceUser final +{ + ObRestoreSourceServiceUser(); + ~ObRestoreSourceServiceUser() {} + void reset(); + bool is_valid() const; + int assign(const ObRestoreSourceServiceUser &user); + char user_name_[OB_MAX_USER_NAME_LENGTH]; + char tenant_name_[OB_MAX_ORIGINAL_NANE_LENGTH]; + ObCompatibilityMode mode_; + uint64_t tenant_id_; + int64_t cluster_id_; + bool operator == (const ObRestoreSourceServiceUser &other) const; + TO_STRING_KV(K_(user_name), K_(tenant_name), K_(tenant_id), K_(cluster_id)); +}; + +struct ObRestoreSourceServiceAttr final +{ + ObRestoreSourceServiceAttr(); + ~ObRestoreSourceServiceAttr() {} + void reset(); + int parse_service_attr_from_str(ObSqlString &str); + int do_parse_sub_service_attr(const char *sub_value); + int set_service_user_config(const char *user_tenant); + int set_service_user(const char *user, const char *tenant); + int set_service_tenant_id(const char *tenant_id); + int set_service_cluster_id(const char *cluster_id); + int set_service_compatibility_mode(const char *compatibility_mode); + // It need to convert password to encrypted password when pasre from log_restore_source config. + int set_service_passwd_to_encrypt(const char *passwd); + // There's no need to convert password to encrypted password when parse from __all_log_restore_source record. + int set_service_passwd_no_encrypt(const char *passwd); + int parse_ip_port_from_str(const char *buf, const char *delimiter); + bool is_valid() const; + bool service_user_is_valid() const; + bool service_host_is_valid() const; + bool service_password_is_valid() const; + int gen_config_items(common::ObIArray &items) const; + int gen_service_attr_str(char *buf, const int64_t buf_size) const; + int gen_service_attr_str(ObSqlString &str) const; + int get_ip_list_str_(char *buf, const int64_t buf_size) const; + int get_ip_list_str_(ObSqlString &str) const; + int get_user_str_(char *buf, const int64_t buf_size) const; + int get_user_str_(ObSqlString &str) const; + int get_password_str_(char *buf, const int64_t buf_size) const; + int get_password_str_(ObSqlString &str) const; + int get_tenant_id_str_(char *buf ,const int64_t buf_size) const; + int get_tenant_id_str_(ObSqlString &str) const; + int get_cluster_id_str_(char *buf, const int64_t buf_size) const; + int get_cluster_id_str_(ObSqlString &str) const; + int get_compatibility_mode_str_(char *buf, const int64_t buf_size) const; + int get_compatibility_mode_str_(ObSqlString &str) const; + int get_is_encrypted_str_(char *buf, const int64_t buf_size) const; + int get_is_encrypted_str_(ObSqlString &str) const; + int set_encrypt_password_key_(const char *encrypt_key); + int get_decrypt_password_key_(char *unencrypt_key, const int64_t buf_size) const; + // return the origion password + int get_password(char *passwd, const int64_t buf_size) const; + bool compare_addr_(common::ObArray addr) const; + int check_restore_source_is_self_(bool &is_self, uint64_t tenant_id) const; + bool operator ==(const ObRestoreSourceServiceAttr &other) const; + int assign(const ObRestoreSourceServiceAttr &attr); + TO_STRING_KV(K_(addr), K_(user), K_(encrypt_passwd)); + common::ObArray addr_; + ObRestoreSourceServiceUser user_; + char encrypt_passwd_[OB_MAX_BACKUP_SERIALIZEKEY_LENGTH]; +}; + +struct ObRestoreSourceLocationPrimaryAttr final +{ + ObRestoreSourceLocationPrimaryAttr(); + ~ObRestoreSourceLocationPrimaryAttr() {} + void reset(); + uint64_t tenant_id_; + int64_t cluster_id_; + ObSqlString location_; + int parse_location_attr_from_str(const common::ObString &value); + int do_parse_sub_config_(const char *sub_value); + int set_location_path_(const char *path); + int set_location_cluster_id_(const char *cluster_id_str); + int set_location_tenant_id_(const char *tenant_id_str); + bool is_valid(); + TO_STRING_KV(K_(tenant_id), K_(cluster_id), K_(location)); +}; + +}//share +}//oceanbase + +#endif /* OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_STRUCT_H_ */ \ No newline at end of file diff --git a/src/share/restore/ob_log_restore_source_mgr.cpp b/src/share/restore/ob_log_restore_source_mgr.cpp index 660c4c1f5..db218cc20 100644 --- a/src/share/restore/ob_log_restore_source_mgr.cpp +++ b/src/share/restore/ob_log_restore_source_mgr.cpp @@ -14,12 +14,14 @@ #include "lib/restore/ob_storage.h" #include "lib/utility/ob_macro_utils.h" #include "share/backup/ob_backup_struct.h" +#include "share/backup/ob_log_restore_struct.h" #include "ob_log_restore_source_mgr.h" #include "lib/ob_define.h" #include "lib/ob_errno.h" #include "lib/net/ob_addr.h" #include "lib/oblog/ob_log_module.h" #include "lib/string/ob_string.h" + using namespace oceanbase::share; int ObLogRestoreSourceMgr::init(const uint64_t tenant_id, ObISQLClient *proxy) { @@ -105,7 +107,10 @@ int ObLogRestoreSourceMgr::add_location_source(const SCN &recovery_until_scn, { int ret = OB_SUCCESS; ObBackupDest dest; - char dest_buf[OB_MAX_BACKUP_DEST_LENGTH] = {0}; + const int64_t MAX_RESTORE_CLUSTER_STR = 10 + 16; // CLUSTER_ID=4294967295 + const int64_t MAX_RESTORE_TENANT_STR = 20 + 16; // TENANT_ID=18446744073709551615 + const int64_t MAX_RESTORE_LOCAION_STR = OB_MAX_BACKUP_DEST_LENGTH + 16; // LOCATION=file:///data/xxxx + char dest_buf[MAX_RESTORE_LOCAION_STR + MAX_RESTORE_CLUSTER_STR + MAX_RESTORE_TENANT_STR] = { 0 }; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObLogRestoreSourceMgr not init", K(ret), K(is_inited_)); @@ -115,8 +120,9 @@ int ObLogRestoreSourceMgr::add_location_source(const SCN &recovery_until_scn, } else if (OB_FAIL(dest.set(archive_dest.ptr()))) { // use backup dest to manage oss key LOG_WARN("set backup dest failed", K(ret), K(archive_dest)); - } else if (OB_FAIL(dest.get_backup_dest_str(dest_buf, OB_MAX_BACKUP_DEST_LENGTH))) { - LOG_WARN("get backup dest str failed", K(ret), K(dest)); + } else if (OB_FAIL(dest.get_backup_dest_str_with_primary_attr(dest_buf, sizeof(dest_buf)))) { + // store primary cluster id and tenant id in log restore source + LOG_WARN("get backup dest str with primary attr failed", K(ret), K(dest)); } else { ObLogRestoreSourceItem item(tenant_id_, OB_DEFAULT_LOG_RESTORE_SOURCE_ID, @@ -126,7 +132,7 @@ int ObLogRestoreSourceMgr::add_location_source(const SCN &recovery_until_scn, if (OB_FAIL(table_operator_.insert_source(item))) { LOG_WARN("table_operator_ insert_source failed", K(ret), K(item)); } else { - LOG_INFO("add location source succ", K(recovery_until_scn), K(archive_dest)); + LOG_INFO("add location source succ", K(recovery_until_scn), K(archive_dest), K(item)); } } return ret; @@ -174,10 +180,13 @@ int ObLogRestoreSourceMgr::get_source_for_update(ObLogRestoreSourceItem &item, c int ObLogRestoreSourceMgr::get_backup_dest(const ObLogRestoreSourceItem &item, ObBackupDest &dest) { int ret = OB_SUCCESS; + ObRestoreSourceLocationPrimaryAttr location_attr; if (OB_UNLIKELY(! item.is_valid() || ! is_location_log_source_type(item.type_))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(item)); - } else if OB_FAIL(dest.set(item.value_)) { + } else if (OB_FAIL(location_attr.parse_location_attr_from_str(item.value_))) { + LOG_WARN("parse location attr from string failed", K(item)); + } else if OB_FAIL(dest.set(location_attr.location_.ptr())) { LOG_WARN("backup dest set failed", K(ret), K(item)); } return ret; diff --git a/unittest/logservice/test_net_standby_restore_source.cpp b/unittest/logservice/test_net_standby_restore_source.cpp index 28498cfc6..bb19043ef 100644 --- a/unittest/logservice/test_net_standby_restore_source.cpp +++ b/unittest/logservice/test_net_standby_restore_source.cpp @@ -10,7 +10,7 @@ * See the Mulan PubL v2 for more details. */ -#include "src/share/backup/ob_backup_struct.h" +#include "src/share/backup/ob_log_restore_struct.h" #include namespace oceanbase