[4.2][Standby] Optimize some processes when set log restore source

This commit is contained in:
obdev 2023-07-14 09:18:22 +00:00 committed by ob-robot
parent c0d2550b80
commit 8adcff8a8d
14 changed files with 1471 additions and 1146 deletions

View File

@ -21,7 +21,7 @@
#include "logservice/logfetcher/ob_log_fetcher_ls_ctx_additional_info_factory.h"
#include "logservice/logfetcher/ob_log_fetcher_err_handler.h"
#include "logservice/logfetcher/ob_log_fetcher_ls_ctx_default_factory.h"
#include "share/backup/ob_backup_struct.h" // ObRestoreSourceServiceAttr
#include "share/backup/ob_log_restore_struct.h" // ObRestoreSourceServiceAttr
#include "share/ob_log_restore_proxy.h" // ObLogRestoreProxyUtil
#include "ob_log_restore_driver_base.h"
#include "ob_restore_log_function.h" // ObRestoreLogFunction

View File

@ -22,6 +22,7 @@
#include "logservice/palf/lsn.h"
#include "share/scn.h"
#include "share/backup/ob_backup_struct.h" // ObBackupPathString
#include "share/backup/ob_log_restore_struct.h"
#include "share/ob_define.h"
#include "share/ob_ls_id.h"
#include "share/restore/ob_log_restore_source.h" // ObLogRestoreSourceType

View File

@ -20,7 +20,7 @@
#include "lib/lock/ob_spin_lock.h" //ObSpinLock
#include "storage/tx/ob_multi_data_source.h" //ObTxBufferNode
#include "src/share/restore/ob_log_restore_source.h" //ObLogRestoreSourceItem
#include "src/share/backup/ob_backup_struct.h" //ObRestoreSourceServiceAttr
#include "src/share/backup/ob_log_restore_struct.h" //ObRestoreSourceServiceAttr
#include "share/restore/ob_log_restore_source_mgr.h" //ObLogRestoreSourceMgr
#include "share/ob_log_restore_proxy.h" // ObLogRestoreProxyUtil

View File

@ -46,6 +46,8 @@ ob_set_subtarget(ob_share backup
backup/ob_archive_checkpoint.cpp
backup/ob_archive_path.cpp
backup/ob_backup_config.cpp
backup/ob_log_restore_config.cpp
backup/ob_log_restore_struct.cpp
)
ob_set_subtarget(ob_share cache

View File

@ -12,6 +12,7 @@
#define USING_LOG_PREFIX SHARE
#include "ob_backup_config.h"
#include "ob_log_restore_config.h"
#include "ob_backup_data_table_operator.h"
#include "ob_backup_helper.h"
#include "ob_backup_store.h"
@ -838,342 +839,3 @@ int ObLogArchiveDestStateConfigParser::check_before_update_inner_config(obrpc::O
return ret;
}
int ObLogRestoreSourceLocationConfigParser::update_inner_config_table(common::ObISQLClient &trans)
{
int ret = OB_SUCCESS;
ObBackupDestMgr dest_mgr;
ObLogRestoreSourceMgr restore_source_mgr;
ObAllTenantInfo tenant_info;
if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid type", KR(ret), KPC(this));
} else if (OB_FAIL(restore_source_mgr.init(tenant_id_, &trans))) {
LOG_WARN("failed to init restore_source_mgr", KR(ret), KPC(this));
} else if (is_empty_) {
if (OB_FAIL(restore_source_mgr.delete_source())) {
LOG_WARN("failed to delete restore source", KR(ret), KPC(this));
}
} else {
if (!archive_dest_.is_dest_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid restore source", KR(ret), KPC(this));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source");
} else if (OB_FAIL(archive_dest_.gen_path_config_items(config_items_))) {
LOG_WARN("fail to gen archive config items", KR(ret), KPC(this));
} else {
ObString key_string = ObString::make_string(config_items_.at(0).key_.ptr());
ObString path_string = ObString::make_string(OB_STR_PATH);
ObString value_string = ObString::make_string(config_items_.at(0).value_.ptr());
if (1 != config_items_.count()
|| path_string != key_string
|| config_items_.at(0).value_.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid archive source", KR(ret), KPC(this));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set log_restore_source");
} else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id_, &trans,
true /* for update */, tenant_info))) {
LOG_WARN("failed to load tenant info", KR(ret), K_(tenant_id));
} else if (OB_FAIL(restore_source_mgr.add_location_source(tenant_info.get_recovery_until_scn(),
value_string))) {
LOG_WARN("failed to add log restore source", KR(ret), K(tenant_info), K(value_string), KPC(this));
}
}
}
return ret;
}
int ObLogRestoreSourceLocationConfigParser::check_before_update_inner_config(
obrpc::ObSrvRpcProxy &rpc_proxy,
common::ObISQLClient &trans)
{
int ret = OB_SUCCESS;
if (is_empty_) {
} else if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parser", KR(ret), KPC(this));
}
//TODO (zbf271370) need supports format check, checks whether the directory exists, and is a log backup
//TODO (wenjinyu.wjy) 4.3 need support access permission check
//
return ret;
}
int ObLogRestoreSourceLocationConfigParser::do_parse_sub_config_(const common::ObString &config_str)
{
int ret = OB_SUCCESS;
const char *target= nullptr;
if (config_str.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("empty log restore source is not allowed", KR(ret), K(config_str));
} else {
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
char *p_end = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", config_str.length(), config_str.ptr()))) {
LOG_WARN("fail to set config value", KR(ret), K(config_str));
} else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to split config str", K(ret), KP(token));
} else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) {
} else if (0 == STRCASECMP(token, OB_STR_LOCATION)) {
if (OB_FAIL(do_parse_log_archive_dest_(token, saveptr))) {
LOG_WARN("fail to do parse log archive dest", KR(ret), K(token), K(saveptr));
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("log restore source does not has this config", KR(ret), K(token));
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::parse_from(const common::ObSqlString &value)
{
int ret = OB_SUCCESS;
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
is_empty_ = false;
if (value.empty()) {
is_empty_ = true;
} else if (value.length() > OB_MAX_BACKUP_DEST_LENGTH) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("config value is too long");
} else if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", static_cast<int>(value.length()), value.ptr()))) {
LOG_WARN("fail to set config value", K(value));
} else {
token = tmp_str;
for (char *str = token; OB_SUCC(ret); str = nullptr) {
token = ::STRTOK_R(str, " ", &saveptr);
if (nullptr == token) {
break;
} else if (OB_FAIL(do_parse_sub_config_(token))) {
LOG_WARN("fail to do parse log restore source server sub config");
}
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::update_inner_config_table(common::ObISQLClient &trans)
{
int ret = OB_SUCCESS;
ObLogRestoreSourceMgr restore_source_mgr;
ObAllTenantInfo tenant_info;
if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid type", KPC(this));
} else if (OB_FAIL(restore_source_mgr.init(tenant_id_, &trans))) {
LOG_WARN("failed to init restore_source_mgr", KPC(this));
} else if (is_empty_) {
if (OB_FAIL(restore_source_mgr.delete_source())) {
LOG_WARN("failed to delete restore source", KPC(this));
}
} else if (!service_attr_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid restore source", KPC(this));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source");
} else if (OB_FAIL(service_attr_.gen_config_items(config_items_))) {
LOG_WARN("fail to gen restore source service config items", KPC(this));
} else {
/*
eg: "ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxxx(密码),TENANT_ID=1002,CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=false"
"ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxxx(加密后密码),TENANT_ID=1002,CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=true"
*/
char value_string[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
if (config_items_.empty() || OB_MAX_RESTORE_SOURCE_SERVICE_CONFIG_LEN != config_items_.count()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid restore source", KPC(this));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source");
} else if (OB_FAIL(service_attr_.gen_service_attr_str(value_string, OB_MAX_BACKUP_DEST_LENGTH))) {
LOG_WARN("failed gen service attr str", K_(tenant_id));
} else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id_, &trans,
true /* for update */, tenant_info))) {
LOG_WARN("failed to load tenant info", K_(tenant_id));
} else if (OB_FAIL(restore_source_mgr.add_service_source(tenant_info.get_recovery_until_scn(),
value_string))) {
LOG_WARN("failed to add log restore source", K(tenant_info), K(value_string), KPC(this));
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans)
{
int ret = OB_SUCCESS;
ObCompatibilityMode compat_mode = ObCompatibilityMode::OCEANBASE_MODE;
if (is_empty_) {
} else if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parser", KR(ret), KPC(this));
} else if (OB_FAIL(check_before_update_inner_config(false /* for_verify */, compat_mode))) {
LOG_WARN("fail to check before update inner config");
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::check_before_update_inner_config(
const bool for_verify,
ObCompatibilityMode &compat_mode)
{
int ret = OB_SUCCESS;
char passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 }; //unencrypted password
ObSqlString user_and_tenant;
compat_mode = ObCompatibilityMode::OCEANBASE_MODE;
SMART_VAR(ObLogRestoreProxyUtil, proxy) {
if (is_empty_) {
} else if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parser", KPC(this));
} else if (0 == STRLEN(service_attr_.encrypt_passwd_)
|| 0 == STRLEN(service_attr_.user_.user_name_)
|| 0 == STRLEN(service_attr_.user_.tenant_name_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to parse log restore source config, please check the config parameters");
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "parse log restore source config, please check the config parameters");
} else if (OB_FAIL(service_attr_.get_password(passwd, sizeof(passwd)))) {
LOG_WARN("get servcie attr password failed");
} else if (OB_FAIL(service_attr_.get_user_str_(user_and_tenant))) {
LOG_WARN("get user str failed", K(service_attr_.user_.user_name_), K(service_attr_.user_.tenant_name_));
} else if (OB_FAIL(proxy.try_init(tenant_id_/*standby*/, service_attr_.addr_, user_and_tenant.ptr(), passwd))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("proxy connect to primary db failed", K(service_attr_.addr_), K(user_and_tenant));
} else if (OB_FAIL(proxy.get_tenant_id(service_attr_.user_.tenant_name_, service_attr_.user_.tenant_id_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get primary tenant id failed", K(tenant_id_), K(service_attr_.user_));
} else if (OB_FAIL(proxy.get_cluster_id(service_attr_.user_.tenant_id_, service_attr_.user_.cluster_id_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get primary cluster id failed", K(tenant_id_), K(service_attr_.user_.tenant_id_));
} else if (OB_FAIL(proxy.get_compatibility_mode(service_attr_.user_.tenant_id_, service_attr_.user_.mode_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get primary compatibility mode failed", K(tenant_id_), K(service_attr_.user_.tenant_id_));
} else if (for_verify && OB_FAIL(proxy.check_begin_lsn(service_attr_.user_.tenant_id_))) {
LOG_WARN("check_begin_lsn failed", K(tenant_id_), K(service_attr_.user_.tenant_id_));
} else {
compat_mode = service_attr_.user_.mode_;
LOG_INFO("check_before_update_inner_config success", K(tenant_id_), K(service_attr_), K(compat_mode));
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::get_compatibility_mode(common::ObCompatibilityMode &compatibility_mode)
{
int ret = OB_SUCCESS;
compatibility_mode = OCEANBASE_MODE;
if (is_empty_) {
} else if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parser", KPC(this));
} else if (!service_attr_.user_.is_valid()) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KPC(this));
} else {
compatibility_mode = service_attr_.user_.mode_;
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::do_parse_sub_config_(const common::ObString &config_str)
{
int ret = OB_SUCCESS;
if (config_str.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("empty log restore source sub config is not allowed", K(config_str));
} else {
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", config_str.length(), config_str.ptr()))) {
LOG_WARN("fail to set config value", K(config_str));
} else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to split config str", KP(token));
} else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) {
} else if (0 == STRCASECMP(token, OB_STR_SERVICE)) {
if (OB_FAIL(do_parse_restore_service_host_(token, saveptr))) {
LOG_WARN("fail to do parse restore service host", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_USER)) {
if (OB_FAIL(do_parse_restore_service_user_(token, saveptr))) {
LOG_WARN("fail to do parse restore service user", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_PASSWORD)) {
if (OB_FAIL(do_parse_restore_service_passwd_(token, saveptr))) {
LOG_WARN("fail to do parse restore service passwd", K(token), K(saveptr));
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("log restore source does not has this config", K(token));
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_host_(const common::ObString &name, const
common::ObString &value)
{
int ret = OB_SUCCESS;
if (name.empty() || value.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid log restore source service host config", K(name), K(value));
} else if (OB_FAIL(service_attr_.parse_ip_port_from_str(value.ptr(), ";" /*delimiter*/))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("parse restore source service host failed", K(name), K(value));
}
if (OB_FAIL(ret)) {
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set ip list config, please check the length and format of ip list");
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_user_(const common::ObString &name, const
common::ObString &value)
{
int ret = OB_SUCCESS;
if (name.empty() || value.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid log restore source service user config", K(name), K(value));
} else {
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
char *user_token = nullptr;
char *tenant_token = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", value.length(), value.ptr()))) {
LOG_WARN("fail to set user config value", K(value));
} else if (OB_FAIL(service_attr_.set_service_user_config(tmp_str))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("parse restore source service user failed", K(name), K(value));
}
}
if (OB_FAIL(ret)) {
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set user config, please check the length and format of username, tenant name");
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_passwd_(const common::ObString &name, const
common::ObString &value)
{
int ret = OB_SUCCESS;
if (name.empty() || value.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid log restore source service password config", K(name), K(value));
} else if (OB_FAIL(service_attr_.set_service_passwd_to_encrypt(value.ptr()))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("parse restore source service password failed", K(name), K(value));
}
if (OB_FAIL(ret)) {
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set password config, please check the length and format of password");
}
return ret;
}

View File

@ -21,6 +21,7 @@
#include "ob_backup_struct.h"
#include "share/restore/ob_log_restore_source.h"
#include "share/backup/ob_backup_store.h"
#include "ob_log_restore_struct.h"
namespace oceanbase
{
@ -216,49 +217,6 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObLogArchiveDestStateConfigParser);
};
class ObLogRestoreSourceLocationConfigParser : public ObLogArchiveDestConfigParser
{
public:
ObLogRestoreSourceLocationConfigParser(const ObBackupConfigType::Type &type, const uint64_t tenant_id, const int64_t dest_no)
: ObLogArchiveDestConfigParser(type, tenant_id, dest_no) {}
virtual ~ObLogRestoreSourceLocationConfigParser() {}
virtual int update_inner_config_table(common::ObISQLClient &trans) override;
virtual int check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans) override;
protected:
virtual int do_parse_sub_config_(const common::ObString &config_str) override;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogRestoreSourceLocationConfigParser);
};
class ObLogRestoreSourceServiceConfigParser : public ObIBackupConfigItemParser
{
public:
ObLogRestoreSourceServiceConfigParser(const ObBackupConfigType::Type &type, const uint64_t tenant_id)
: ObIBackupConfigItemParser(type, tenant_id) {}
virtual ~ObLogRestoreSourceServiceConfigParser() {}
virtual int parse_from(const common::ObSqlString &value) override;
virtual int update_inner_config_table(common::ObISQLClient &trans) override;
virtual int check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans) override;
int check_before_update_inner_config(
const bool for_verify,
ObCompatibilityMode &compat_mode);
virtual int get_compatibility_mode(common::ObCompatibilityMode &compatibility_mode);
private:
int do_parse_sub_config_(const common::ObString &config_str);
int do_parse_restore_service_host_(const common::ObString &name, const common::ObString &value);
int do_parse_restore_service_user_(const common::ObString &name, const common::ObString &value);
int do_parse_restore_service_passwd_(const common::ObString &name, const common::ObString &value);
int check_doing_service_restore_(common::ObISQLClient &trans, bool &is_doing);
int check_source_service_connect_(); // todo
int update_data_backup_dest_config_(common::ObISQLClient &trans);
private:
ObRestoreSourceServiceAttr service_attr_;
bool is_empty_;
DISALLOW_COPY_AND_ASSIGN(ObLogRestoreSourceServiceConfigParser);
};
}
}

View File

@ -1771,6 +1771,42 @@ int ObBackupDest::get_backup_dest_str(char *buf, const int64_t buf_size) const
return ret;
}
int ObBackupDest::get_backup_dest_str_with_primary_attr(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
char storage_info_str[OB_MAX_BACKUP_STORAGE_INFO_LENGTH] = { 0 };
share::ObBackupStore backup_store;
share::ObBackupFormatDesc desc;
ObSqlString persist_str;
if (!is_valid()) {
ret = OB_NOT_INIT;
LOG_WARN("backup dest is not init", K(ret), K(*this));
} else if (OB_ISNULL(buf) || buf_size < share::OB_MAX_BACKUP_DEST_LENGTH) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), KP(buf), K(buf_size));
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%s", root_path_))) {
LOG_WARN("failed to get backup dest str", K(ret), K(root_path_), K(storage_info_));
} else if (OB_FAIL(storage_info_->get_storage_info_str(storage_info_str, sizeof(storage_info_str), true/*no need encrypt*/))) {
OB_LOG(WARN, "fail to get storage info str!", K(ret), K(storage_info_));
} else if (0 != strlen(storage_info_str) && OB_FAIL(databuff_printf(buf + strlen(buf), buf_size - strlen(buf), "?%s",storage_info_str))) {
LOG_WARN("failed to get backup dest str", K(ret), K(root_path_), K(storage_info_));
} else if (OB_FAIL(backup_store.init(*this))) {
LOG_WARN("fail to init backup store", K(this));
} else if (OB_FAIL(backup_store.read_format_file(desc))) {
LOG_WARN("backup store read format file failed", K(this));
} else if (OB_UNLIKELY(! desc.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("backup store desc is invalid", K(desc));
} else if (OB_FAIL(persist_str.assign_fmt("LOCATION=%s,TENANT_ID=%ld,CLUSTER_ID=%ld",
buf, desc.tenant_id_, desc.cluster_id_))) {
LOG_WARN("fail to assign persist str", K(this), K(desc));
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(persist_str.length()), persist_str.ptr()))) {
LOG_WARN("fail to print persist str", K(persist_str));
}
return ret;
}
int64_t ObBackupDest::to_string(char *buf, int64_t buf_len) const
{
int64_t pos = 0;
@ -4330,695 +4366,6 @@ int ObLogArchiveDestAtrr::assign(const ObLogArchiveDestAtrr& that)
return ret;
}
ObRestoreSourceServiceUser::ObRestoreSourceServiceUser()
: user_name_(),
tenant_name_(),
mode_(ObCompatibilityMode::OCEANBASE_MODE),
tenant_id_(OB_INVALID_TENANT_ID),
cluster_id_(OB_INVALID_CLUSTER_ID)
{
user_name_[0] = '\0';
tenant_name_[0] = '\0';
}
void ObRestoreSourceServiceUser::reset()
{
user_name_[0] = '\0';
tenant_name_[0] = '\0';
mode_ = ObCompatibilityMode::OCEANBASE_MODE;
tenant_id_ = OB_INVALID_TENANT_ID;
cluster_id_ = OB_INVALID_CLUSTER_ID;
}
bool ObRestoreSourceServiceUser::is_valid() const
{
return STRLEN(user_name_) != 0
&& STRLEN(tenant_name_) != 0
&& (ObCompatibilityMode::OCEANBASE_MODE != mode_)
&& (tenant_id_ != OB_INVALID_TENANT_ID)
&& (cluster_id_ != OB_INVALID_CLUSTER_ID);
}
bool ObRestoreSourceServiceUser::operator== (const ObRestoreSourceServiceUser &other) const
{
return (STRLEN(this->user_name_) == STRLEN(other.user_name_))
&& (STRLEN(this->tenant_name_) == STRLEN(other.tenant_name_))
&& (0 == STRCMP(this->user_name_, other.user_name_))
&& (0 == STRCMP(this->tenant_name_, other.tenant_name_))
&& this->mode_ == other.mode_
&& this->tenant_id_ == other.tenant_id_
&& this->cluster_id_ == other.cluster_id_;
}
int ObRestoreSourceServiceUser::assign(const ObRestoreSourceServiceUser &user)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!user.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(user));
} else if (OB_FAIL(databuff_printf(user_name_, sizeof(user_name_), "%s", user.user_name_))) {
LOG_WARN("user_name_ assign failed", K(user));
} else if (OB_FAIL(databuff_printf(tenant_name_, sizeof(tenant_name_), "%s", user.tenant_name_))) {
LOG_WARN("tenant_name_ assign failed", K(user));
} else {
mode_ = user.mode_;
tenant_id_ = user.tenant_id_;
cluster_id_ = user.cluster_id_;
}
return ret;
}
ObRestoreSourceServiceAttr::ObRestoreSourceServiceAttr()
: addr_(),
user_()
{
encrypt_passwd_[0] = '\0';
}
void ObRestoreSourceServiceAttr::reset()
{
addr_.reset();
user_.reset();
encrypt_passwd_[0] = '\0';
}
/*
parse service attr from string
eg: "ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxx,TENANT_ID=1002,
CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=true"
*/
int ObRestoreSourceServiceAttr::parse_service_attr_from_str(ObSqlString &value)
{
int ret = OB_SUCCESS;
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
if (OB_UNLIKELY(value.empty() || value.length() > OB_MAX_BACKUP_DEST_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source attr value is invalid");
} else if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", static_cast<int>(value.length()), value.ptr()))) {
LOG_WARN("fail to print attr value", K(value));
} else {
token = tmp_str;
for (char *str = token; OB_SUCC(ret); str = nullptr) {
token = ::STRTOK_R(str, ",", &saveptr);
if (nullptr == token) {
break;
} else if (OB_FAIL(do_parse_sub_service_attr(token))) {
LOG_WARN("fail to parse service attr str", K(token));
}
}
}
return ret;
}
int ObRestoreSourceServiceAttr::do_parse_sub_service_attr(const char *sub_value)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(sub_value) || OB_UNLIKELY(0 == STRLEN(sub_value) || STRLEN(sub_value) > OB_MAX_BACKUP_DEST_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source service attr sub value is invalid", K(sub_value));
} else {
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", sub_value))) {
LOG_WARN("fail to print sub_value", K(sub_value));
} else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to split sub_value str", K(token), KP(tmp_str));
} else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) {
} else if (0 == STRCASECMP(token, OB_STR_IP_LIST)) {
if (OB_FAIL(parse_ip_port_from_str(saveptr, ";"/*delimiter*/))) {
LOG_WARN("fail to parse ip list from str", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_USER)) {
if (OB_FAIL(set_service_user_config(saveptr))) {
LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_TENANT_ID)) {
if (OB_FAIL(set_service_tenant_id(saveptr))) {
LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_CLUSTER_ID)) {
if (OB_FAIL(set_service_cluster_id(saveptr))) {
LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_COMPATIBILITY_MODE)) {
if (OB_FAIL(set_service_compatibility_mode(saveptr))) {
LOG_WARN("fail to set restore service compatibility mode", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_PASSWORD)) {
if (OB_FAIL(set_service_passwd_no_encrypt(saveptr))) {
LOG_WARN("fail to set restore service passwd", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_IS_ENCRYPTED)) {
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("log restore source service do not have this config", K(token));
}
}
return ret;
}
int ObRestoreSourceServiceAttr::set_service_user_config(const char *user_tenant)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(user_tenant) || OB_UNLIKELY(0 == STRLEN(user_tenant)
|| STRLEN(user_tenant) > OB_MAX_RESTORE_USER_AND_TENANT_LEN)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source service user is invalid");
} else {
char *user_name = nullptr;
char *tenant_name = nullptr;
char tmp_str[OB_MAX_RESTORE_USER_AND_TENANT_LEN + 1] = { 0 };
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", user_tenant))) {
LOG_WARN("fail to print user_tenant", K(user_tenant));
} else if (OB_ISNULL(user_name = ::STRTOK_R(tmp_str, "@", &tenant_name))) {
LOG_WARN("fail to split user_tenant", K(tmp_str));
} else if (OB_FAIL(set_service_user(user_name, tenant_name))) {
LOG_WARN("fail to set service user", K(user_name), K(tenant_name));
}
}
return ret;
}
int ObRestoreSourceServiceAttr::set_service_user(const char *user, const char *tenant)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(user) || OB_ISNULL(tenant) || OB_UNLIKELY(0 == STRLEN(user) || 0 == STRLEN(tenant)
|| STRLEN(user) > OB_MAX_USER_NAME_LENGTH || STRLEN(tenant) > OB_MAX_ORIGINAL_NANE_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source service user name or tenant name is invalid");
} else if (OB_FAIL(databuff_printf(user_.user_name_, sizeof(user_.user_name_), "%s", user))) {
LOG_WARN("fail to print user name", K(user));
} else if (OB_FAIL(databuff_printf(user_.tenant_name_, sizeof(user_.tenant_name_), "%s", tenant))) {
LOG_WARN("fail to print tenant name", K(tenant));
}
LOG_DEBUG("set service user", K(user), K(tenant), K(user_));
return ret;
}
int ObRestoreSourceServiceAttr::set_service_tenant_id(const char *tenant_id_str)
{
int ret = OB_SUCCESS;
char *p_end = nullptr;
if (OB_FAIL(ob_strtoull(tenant_id_str, p_end, user_.tenant_id_))) {
LOG_WARN("fail to set service tenant id from string", K(tenant_id_str));
}
return ret;
}
int ObRestoreSourceServiceAttr::set_service_cluster_id(const char *cluster_id_str)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ob_atoll(cluster_id_str, user_.cluster_id_))) {
LOG_WARN("fail to set service cluster id from string", K(cluster_id_str));
}
return ret;
}
int ObRestoreSourceServiceAttr::set_service_compatibility_mode(const char *compat_mode)
{
int ret = OB_SUCCESS;
char token[OB_MAX_COMPAT_MODE_STR_LEN + 1] = { 0 };
if (OB_ISNULL(compat_mode) || OB_UNLIKELY(0 == STRLEN(compat_mode)
|| STRLEN(compat_mode) > OB_MAX_COMPAT_MODE_STR_LEN)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source service compat_mode is invalid");
} else if (OB_FAIL(databuff_printf(token, sizeof(token), "%s", compat_mode))) {
LOG_WARN("fail to print compat mode", K(compat_mode));
} else if (OB_FALSE_IT(str_toupper(token, STRLEN(token)))) {
} else if ((0 == STRCASECMP(token, "MYSQL"))) {
user_.mode_ = ObCompatibilityMode::MYSQL_MODE;
} else if ((0 == STRCASECMP(token, "ORACLE"))) {
user_.mode_ = ObCompatibilityMode::ORACLE_MODE;
} else {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid compatibility mode", K(compat_mode), K(ret));
}
return ret;
}
int ObRestoreSourceServiceAttr::set_service_passwd_to_encrypt(const char *passwd)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(passwd) || OB_UNLIKELY(0 == STRLEN(passwd) || STRLEN(passwd) > OB_MAX_PASSWORD_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument");
} else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(encrypt_passwd_), "%s", passwd))) {
LOG_WARN("fail to print encrypt password");
}
LOG_INFO("set service password success");
return ret;
}
int ObRestoreSourceServiceAttr::set_service_passwd_no_encrypt(const char *passwd)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(passwd) || OB_UNLIKELY(0 == STRLEN(passwd) || STRLEN(passwd) > OB_MAX_PASSWORD_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument");
} else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(encrypt_passwd_), "%s", passwd))) {
LOG_WARN("fail to print encrypt password");
}
return ret;
}
// 127.0.0.1:1000;127.0.0.1:1001;127.0.0.1:1002 ==> 127.0.0.1:1000 127.0.0.1:1001 127.0.0.1:1002
int ObRestoreSourceServiceAttr::parse_ip_port_from_str(const char *ip_list, const char *delimiter)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ip_list) || OB_ISNULL(delimiter) || OB_UNLIKELY(STRLEN(ip_list) > OB_MAX_RESTORE_SOURCE_IP_LIST_LEN)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source service ip list is invalid");
}
char tmp_str[OB_MAX_RESTORE_SOURCE_IP_LIST_LEN + 1] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", ip_list))) {
LOG_WARN("fail to get ip list", K(ip_list));
} else {
token = tmp_str;
for (char *str = token; OB_SUCC(ret); str = nullptr) {
token = ::STRTOK_R(str, delimiter, &saveptr);
if (nullptr == token) {
break;
} else {
ObAddr addr;
if (OB_FAIL(addr.parse_from_string(ObString(token)))) {
LOG_WARN("fail to parse addr", K(addr), K(token));
} else if (!addr.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("service addr is invalid", K(addr), K(ip_list));
} else if (OB_FAIL(addr_.push_back(addr))){
LOG_WARN("fail to push addr", K(addr));
}
}
}
}
return ret;
}
bool ObRestoreSourceServiceAttr::is_valid() const
{
return service_user_is_valid()
&& service_host_is_valid()
&& service_password_is_valid();
}
bool ObRestoreSourceServiceAttr::service_user_is_valid() const
{
return user_.is_valid();
}
bool ObRestoreSourceServiceAttr::service_host_is_valid() const
{
return !addr_.empty();
}
bool ObRestoreSourceServiceAttr::service_password_is_valid() const
{
return strlen(encrypt_passwd_) != 0;
}
int ObRestoreSourceServiceAttr::gen_config_items(common::ObIArray<BackupConfigItemPair> &items) const
{
int ret = OB_SUCCESS;
BackupConfigItemPair config;
if (!is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguement", KPC(this));
}
ObSqlString tmp;
// gen ip list config
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_IP_LIST))) {
LOG_WARN("failed to assign ip list key");
} else if (OB_FAIL(get_ip_list_str_(tmp))) {
LOG_WARN("failed to get ip list str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign ip list value");
} else if(OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen user config
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_USER))) {
LOG_WARN("failed to assign user key");
} else if (OB_FAIL(get_user_str_(tmp))) {
LOG_WARN("failed to get user str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign user value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen password config
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_PASSWORD))) {
LOG_WARN("failed to assign password key");
} else if (OB_FAIL(get_password_str_(tmp))) {
LOG_WARN("failed to get password str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign password value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen tenant id
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_TENANT_ID))) {
LOG_WARN("failed to assign tenant id key");
} else if (OB_FAIL(get_tenant_id_str_(tmp))) {
LOG_WARN("failed to get tenant id str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign tenant id value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen cluster id
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_CLUSTER_ID))) {
LOG_WARN("failed to assign cluster id key");
} else if (OB_FAIL(get_cluster_id_str_(tmp))) {
LOG_WARN("failed to get cluster id str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign cluster id value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen compatibility mode
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_COMPATIBILITY_MODE))) {
LOG_WARN("failed to assign compatibility mode");
} else if (OB_FAIL(get_compatibility_mode_str_(tmp))) {
LOG_WARN("failed to get compatibility mode str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign compatibility mode value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen is encrypted
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_IS_ENCRYPTED))) {
LOG_WARN("failed to assign encrypted key");
} else if (OB_FAIL(get_is_encrypted_str_(tmp))) {
LOG_WARN("failed to get is encrypted str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign encrypted value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
return ret;
}
int ObRestoreSourceServiceAttr::gen_service_attr_str(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
ObSqlString str;
ObSqlString ip_str;
ObSqlString user_str;
ObSqlString passwd_str;
ObSqlString compat_str;
ObSqlString is_encrypted_str;
if (OB_UNLIKELY(!is_valid() || OB_ISNULL(buf) || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid service attr argument", KP(buf), K(buf_size));
} else if (OB_FAIL(get_ip_list_str_(ip_str))) {
LOG_WARN("get ip list str failed");
} else if (OB_FAIL(get_user_str_(user_str))) {
LOG_WARN("get user str failed");
} else if (OB_FAIL(get_password_str_(passwd_str))) {
LOG_WARN("get password str failed");
} else if (OB_FAIL(get_compatibility_mode_str_(compat_str))) {
LOG_WARN("get compatibility mode str failed");
} else if (OB_FAIL(get_is_encrypted_str_(is_encrypted_str))) {
LOG_WARN("get encrypted str failed");
} else if (OB_FAIL(str.assign_fmt("IP_LIST=%s,USER=%s,PASSWORD=%s,TENANT_ID=%ld,CLUSTER_ID=%ld,COMPATIBILITY_MODE=%s,IS_ENCRYPTED=%s",
ip_str.ptr(), user_str.ptr(), passwd_str.ptr(), user_.tenant_id_, user_.cluster_id_, compat_str.ptr(), is_encrypted_str.ptr()))) {
LOG_WARN("fail to assign str", K(compat_str), K(is_encrypted_str));
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(str.length()), str.ptr()))) {
LOG_WARN("fail to print str", K(str));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_ip_list_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
ObSqlString str;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid ip list argument", KP(buf), K(buf_size));
} else if (OB_FAIL(get_ip_list_str_(str))) {
LOG_WARN("get ip list str failed");
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(str.length()), str.ptr()))) {
LOG_WARN("fail to print str", K(str));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_ip_list_str_(ObSqlString &str) const
{
int ret = OB_SUCCESS;
ARRAY_FOREACH_N(addr_, idx, cnt) {
char ip_str[MAX_IP_PORT_LENGTH] = { 0 };
const ObAddr ip = addr_.at(idx);
if (OB_FAIL(ip.ip_port_to_string(ip_str, sizeof(ip_str)))) {
LOG_WARN("fail to convert ip port to string", K(ip), K(ip_str));
} else {
if (0 == idx && (OB_FAIL(str.assign_fmt("%s", ip_str)))) {
LOG_WARN("fail to assign ip str", K(str) ,K(ip), K(ip_str));
} else if ( 0 != idx && OB_FAIL(str.append_fmt(";%s", ip_str))) {
LOG_WARN("fail to append ip str", K(str), K(ip), K(ip_str));
}
}
}
LOG_DEBUG("get ip list str", K(str));
return ret;
}
int ObRestoreSourceServiceAttr::get_user_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
ObSqlString res_str;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid user str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(get_user_str_(res_str))) {
LOG_WARN("fail to get user str");
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(res_str.length()), res_str.ptr()))) {
LOG_WARN("fail to print str");
}
return ret;
}
int ObRestoreSourceServiceAttr::get_user_str_(ObSqlString &user_str) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(user_str.assign(user_.user_name_))) {
LOG_WARN("fail to assign user name" ,K(user_.user_name_));
} else if (OB_FAIL(user_str.append_fmt("@%s",user_.tenant_name_))) {
LOG_WARN("fail to assign tenant name", K(user_.user_name_));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_password_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
ObSqlString passwd_str;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid password str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(get_password_str_(passwd_str))) {
LOG_WARN("fail to get password str");
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(passwd_str.length()), passwd_str.ptr()))) {
LOG_WARN("fail to print str");
}
return ret;
}
int ObRestoreSourceServiceAttr::get_password_str_(ObSqlString &passwd_str) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(passwd_str.assign(encrypt_passwd_))) {
LOG_WARN("fail to assign password");
}
return ret;
}
int ObRestoreSourceServiceAttr::get_tenant_id_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant id str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%ld", user_.tenant_id_))) {
LOG_WARN("fail to print tenant id str", K(user_.tenant_id_));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_tenant_id_str_(ObSqlString &tenant_str) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(tenant_str.assign_fmt("%ld", user_.tenant_id_))) {
LOG_WARN("fail to assign tenant id str", K(user_.tenant_id_));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_cluster_id_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid cluster id str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%ld", user_.cluster_id_))) {
LOG_WARN("fail to print cluster id str", K(user_.cluster_id_));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_cluster_id_str_(ObSqlString &cluster_str) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(cluster_str.assign_fmt("%ld", user_.cluster_id_))) {
LOG_WARN("fail to assign cluster id str", K(user_.cluster_id_));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_compatibility_mode_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
ObSqlString tmp_str;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid compatibilidy mode str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(get_compatibility_mode_str_(tmp_str))) {
LOG_WARN("fail to get compatibility mode str");
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(tmp_str.length()), tmp_str.ptr()))) {
LOG_WARN("fail to print compatibility mode str", K(tmp_str));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_compatibility_mode_str_(ObSqlString &compatibility_str) const
{
int ret = OB_SUCCESS;
const char *compat_mode = "INVALID";
if (ObCompatibilityMode::MYSQL_MODE == user_.mode_) {
compat_mode = "MYSQL";
} else if (ObCompatibilityMode::ORACLE_MODE == user_.mode_) {
compat_mode = "ORACLE";
} else {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("compatibility mode is invalid", K(user_.mode_));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(compatibility_str.assign_fmt("%s", compat_mode))) {
LOG_WARN("fail to assign compatibility mode", K(compat_mode));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_is_encrypted_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid encrypted str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%s", "false"))) {
LOG_WARN("fail to print encrypted str");
}
return ret;
}
int ObRestoreSourceServiceAttr::get_is_encrypted_str_(ObSqlString &encrypted_str) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(encrypted_str.assign("false"))) {
LOG_WARN("fail to print str");
}
return ret;
}
int ObRestoreSourceServiceAttr::get_password(char *passwd, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
char tmp_passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 };
if (OB_ISNULL(passwd) || OB_UNLIKELY((buf_size <= 0) || (buf_size < OB_MAX_PASSWORD_LENGTH))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parameter when get password", K(passwd), K(STRLEN(passwd)));
} else if (OB_FAIL(databuff_printf(passwd, buf_size, "%s", encrypt_passwd_))) {
LOG_WARN("failed to print encrypt_passwd_ key", K(encrypt_passwd_));
}
return ret;
}
bool ObRestoreSourceServiceAttr::compare_addr_(common::ObArray<common::ObAddr> addr) const
{
int bret = true;
int ret = OB_SUCCESS;
if (addr.size() != this->addr_.size()) {
bret = false;
} else {
ARRAY_FOREACH_N(addr, idx, cnt) {
bool tmp_cmp = false;
ARRAY_FOREACH_N(this->addr_, idx1, cnt1) {
if (addr.at(idx) == this->addr_.at(idx1)) {
tmp_cmp = true;
}
}
if (!tmp_cmp) {
bret = false;
break;
}
}
}
return bret;
}
bool ObRestoreSourceServiceAttr::operator==(const ObRestoreSourceServiceAttr &other) const
{
return (this->user_ == other.user_)
&& (STRLEN(this->encrypt_passwd_) == STRLEN(other.encrypt_passwd_))
&& (0 == STRCMP(this->encrypt_passwd_, other.encrypt_passwd_))
&& compare_addr_(other.addr_);
}
int ObRestoreSourceServiceAttr::assign(const ObRestoreSourceServiceAttr &attr)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!attr.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(attr));
} else if (FALSE_IT(addr_.reset())) {
} else if (OB_FAIL(addr_.assign(attr.addr_))) {
LOG_WARN("addr_ assign failed", K(attr));
} else if (OB_FAIL(user_.assign(attr.user_))) {
LOG_WARN("user_ assign failed", K(attr));
} else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(attr.encrypt_passwd_), "%s", attr.encrypt_passwd_))) {
LOG_WARN("passwd_ assign failed", K(attr));
}
return ret;
}
int share::trim_right_backslash(ObBackupPathString &path)
{
int ret = OB_SUCCESS;

View File

@ -910,6 +910,7 @@ public:
bool is_root_path_equal(const ObBackupDest &backup_dest) const;
int is_backup_path_equal(const ObBackupDest &backup_dest, bool &is_equal) const;
int get_backup_dest_str(char *buf, const int64_t buf_size) const;
int get_backup_dest_str_with_primary_attr(char *buf, const int64_t buf_size) const;
int get_backup_path_str(char *buf, const int64_t buf_size) const;
common::ObString get_root_path() const { return root_path_;}
share::ObBackupStorageInfo *get_storage_info() const { return storage_info_;}
@ -1705,73 +1706,6 @@ struct ObLogArchiveDestAtrr final
ObLogArchiveDestState state_;
};
struct ObRestoreSourceServiceUser final
{
ObRestoreSourceServiceUser();
~ObRestoreSourceServiceUser() {}
void reset();
bool is_valid() const;
int assign(const ObRestoreSourceServiceUser &user);
char user_name_[OB_MAX_USER_NAME_LENGTH];
char tenant_name_[OB_MAX_ORIGINAL_NANE_LENGTH];
ObCompatibilityMode mode_;
uint64_t tenant_id_;
int64_t cluster_id_;
bool operator == (const ObRestoreSourceServiceUser &other) const;
TO_STRING_KV(K_(user_name), K_(tenant_name), K_(tenant_id), K_(cluster_id));
};
struct ObRestoreSourceServiceAttr final
{
ObRestoreSourceServiceAttr();
~ObRestoreSourceServiceAttr() {}
void reset();
int parse_service_attr_from_str(ObSqlString &str);
int do_parse_sub_service_attr(const char *sub_value);
int set_service_user_config(const char *user_tenant);
int set_service_user(const char *user, const char *tenant);
int set_service_tenant_id(const char *tenant_id);
int set_service_cluster_id(const char *cluster_id);
int set_service_compatibility_mode(const char *compatibility_mode);
// It need to convert password to encrypted password when pasre from log_restore_source config.
int set_service_passwd_to_encrypt(const char *passwd);
// There's no need to convert password to encrypted password when parse from __all_log_restore_source record.
int set_service_passwd_no_encrypt(const char *passwd);
int parse_ip_port_from_str(const char *buf, const char *delimiter);
bool is_valid() const;
bool service_user_is_valid() const;
bool service_host_is_valid() const;
bool service_password_is_valid() const;
int gen_config_items(common::ObIArray<BackupConfigItemPair> &items) const;
int gen_service_attr_str(char *buf, const int64_t buf_size) const;
int gen_service_attr_str(ObSqlString &str) const;
int get_ip_list_str_(char *buf, const int64_t buf_size) const;
int get_ip_list_str_(ObSqlString &str) const;
int get_user_str_(char *buf, const int64_t buf_size) const;
int get_user_str_(ObSqlString &str) const;
int get_password_str_(char *buf, const int64_t buf_size) const;
int get_password_str_(ObSqlString &str) const;
int get_tenant_id_str_(char *buf ,const int64_t buf_size) const;
int get_tenant_id_str_(ObSqlString &str) const;
int get_cluster_id_str_(char *buf, const int64_t buf_size) const;
int get_cluster_id_str_(ObSqlString &str) const;
int get_compatibility_mode_str_(char *buf, const int64_t buf_size) const;
int get_compatibility_mode_str_(ObSqlString &str) const;
int get_is_encrypted_str_(char *buf, const int64_t buf_size) const;
int get_is_encrypted_str_(ObSqlString &str) const;
int set_encrypt_password_key_(const char *encrypt_key);
int get_decrypt_password_key_(char *unencrypt_key, const int64_t buf_size) const;
// return the origion password
int get_password(char *passwd, const int64_t buf_size) const;
bool compare_addr_(common::ObArray<common::ObAddr> addr) const;
bool operator ==(const ObRestoreSourceServiceAttr &other) const;
int assign(const ObRestoreSourceServiceAttr &attr);
TO_STRING_KV(K_(addr), K_(user), K_(encrypt_passwd));
common::ObArray<common::ObAddr> addr_;
ObRestoreSourceServiceUser user_;
char encrypt_passwd_[OB_MAX_BACKUP_SERIALIZEKEY_LENGTH];
};
// trim '/' from right until encouter a non backslash charactor.
int trim_right_backslash(ObBackupPathString &path);

View File

@ -0,0 +1,384 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SHARE
#include "ob_backup_config.h"
#include "ob_log_restore_config.h"
#include "share/restore/ob_log_restore_source_mgr.h" // ObLogRestoreSourceMgr
#include "share/ob_log_restore_proxy.h" // ObLogRestoreProxyUtil
using namespace oceanbase;
using namespace share;
using namespace common;
int ObLogRestoreSourceLocationConfigParser::update_inner_config_table(common::ObISQLClient &trans)
{
int ret = OB_SUCCESS;
ObBackupDestMgr dest_mgr;
ObLogRestoreSourceMgr restore_source_mgr;
ObAllTenantInfo tenant_info;
if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid type", KR(ret), KPC(this));
} else if (OB_FAIL(restore_source_mgr.init(tenant_id_, &trans))) {
LOG_WARN("failed to init restore_source_mgr", KR(ret), KPC(this));
} else if (is_empty_) {
if (OB_FAIL(restore_source_mgr.delete_source())) {
LOG_WARN("failed to delete restore source", KR(ret), KPC(this));
}
} else {
if (!archive_dest_.is_dest_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid restore source", KR(ret), KPC(this));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source");
} else if (OB_FAIL(archive_dest_.gen_path_config_items(config_items_))) {
LOG_WARN("fail to gen archive config items", KR(ret), KPC(this));
} else {
ObString key_string = ObString::make_string(config_items_.at(0).key_.ptr());
ObString path_string = ObString::make_string(OB_STR_PATH);
ObString value_string = ObString::make_string(config_items_.at(0).value_.ptr());
if (1 != config_items_.count()
|| path_string != key_string
|| config_items_.at(0).value_.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid archive source", KR(ret), KPC(this));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set log_restore_source");
} else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id_, &trans,
true /* for update */, tenant_info))) {
LOG_WARN("failed to load tenant info", KR(ret), K_(tenant_id));
} else if (OB_FAIL(restore_source_mgr.add_location_source(tenant_info.get_recovery_until_scn(),
value_string))) {
LOG_WARN("failed to add log restore source", KR(ret), K(tenant_info), K(value_string), KPC(this));
}
}
}
return ret;
}
int ObLogRestoreSourceLocationConfigParser::check_before_update_inner_config(
obrpc::ObSrvRpcProxy &rpc_proxy,
common::ObISQLClient &trans)
{
int ret = OB_SUCCESS;
if (is_empty_) {
} else if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parser", KR(ret), KPC(this));
} else {
share::ObBackupStore backup_store;
share::ObBackupFormatDesc desc;
if (OB_FAIL(backup_store.init(archive_dest_.dest_))) {
LOG_WARN("backup store init failed", K_(archive_dest_.dest));
} else if (OB_FAIL(backup_store.read_format_file(desc))) {
LOG_WARN("backup store read format file failed", K_(archive_dest_.dest));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "access the log restore source location");
} else if (OB_UNLIKELY(! desc.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("backup store desc is invalid");
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "access the log restore source location");
} else if (GCONF.cluster_id == desc.cluster_id_ && tenant_id_ == desc.tenant_id_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("set standby itself as log restore source is not allowed");
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set standby itself as log restore source");
}
}
//TODO (wenjinyu.wjy) need support access permission check
//
return ret;
}
int ObLogRestoreSourceLocationConfigParser::do_parse_sub_config_(const common::ObString &config_str)
{
int ret = OB_SUCCESS;
const char *target= nullptr;
if (config_str.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("empty log restore source is not allowed", KR(ret), K(config_str));
} else {
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
char *p_end = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", config_str.length(), config_str.ptr()))) {
LOG_WARN("fail to set config value", KR(ret), K(config_str));
} else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to split config str", K(ret), KP(token));
} else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) {
} else if (0 == STRCASECMP(token, OB_STR_LOCATION)) {
if (OB_FAIL(do_parse_log_archive_dest_(token, saveptr))) {
LOG_WARN("fail to do parse log archive dest", KR(ret), K(token), K(saveptr));
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("log restore source does not has this config", KR(ret), K(token));
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::parse_from(const common::ObSqlString &value)
{
int ret = OB_SUCCESS;
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
is_empty_ = false;
if (value.empty()) {
is_empty_ = true;
} else if (value.length() > OB_MAX_BACKUP_DEST_LENGTH) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("config value is too long");
} else if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", static_cast<int>(value.length()), value.ptr()))) {
LOG_WARN("fail to set config value", K(value));
} else {
token = tmp_str;
for (char *str = token; OB_SUCC(ret); str = nullptr) {
token = ::STRTOK_R(str, " ", &saveptr);
if (nullptr == token) {
break;
} else if (OB_FAIL(do_parse_sub_config_(token))) {
LOG_WARN("fail to do parse log restore source server sub config");
}
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::update_inner_config_table(common::ObISQLClient &trans)
{
int ret = OB_SUCCESS;
ObLogRestoreSourceMgr restore_source_mgr;
ObAllTenantInfo tenant_info;
if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid type", KPC(this));
} else if (OB_FAIL(restore_source_mgr.init(tenant_id_, &trans))) {
LOG_WARN("failed to init restore_source_mgr", KPC(this));
} else if (is_empty_) {
if (OB_FAIL(restore_source_mgr.delete_source())) {
LOG_WARN("failed to delete restore source", KPC(this));
}
} else if (!service_attr_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid restore source", KPC(this));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source");
} else if (OB_FAIL(service_attr_.gen_config_items(config_items_))) {
LOG_WARN("fail to gen restore source service config items", KPC(this));
} else {
/*
eg: "ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxxx(密码),TENANT_ID=1002,CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=false"
"ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxxx(加密后密码),TENANT_ID=1002,CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=true"
*/
char value_string[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
if (config_items_.empty() || OB_MAX_RESTORE_SOURCE_SERVICE_CONFIG_LEN != config_items_.count()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid restore source", KPC(this));
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "log_restore_source");
} else if (OB_FAIL(service_attr_.gen_service_attr_str(value_string, OB_MAX_BACKUP_DEST_LENGTH))) {
LOG_WARN("failed gen service attr str", K_(tenant_id));
} else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(tenant_id_, &trans,
true /* for update */, tenant_info))) {
LOG_WARN("failed to load tenant info", K_(tenant_id));
} else if (OB_FAIL(restore_source_mgr.add_service_source(tenant_info.get_recovery_until_scn(),
value_string))) {
LOG_WARN("failed to add log restore source", K(tenant_info), K(value_string), KPC(this));
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans)
{
int ret = OB_SUCCESS;
ObCompatibilityMode compat_mode = ObCompatibilityMode::OCEANBASE_MODE;
if (is_empty_) {
} else if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parser", KR(ret), KPC(this));
} else if (OB_FAIL(check_before_update_inner_config(false /* for_verify */, compat_mode))) {
LOG_WARN("fail to check before update inner config");
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::check_before_update_inner_config(
const bool for_verify,
ObCompatibilityMode &compat_mode)
{
int ret = OB_SUCCESS;
char passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 }; //unencrypted password
ObSqlString user_and_tenant;
compat_mode = ObCompatibilityMode::OCEANBASE_MODE;
bool source_is_self = false;
SMART_VAR(ObLogRestoreProxyUtil, proxy) {
if (is_empty_) {
} else if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parser", KPC(this));
} else if (0 == STRLEN(service_attr_.encrypt_passwd_)
|| 0 == STRLEN(service_attr_.user_.user_name_)
|| 0 == STRLEN(service_attr_.user_.tenant_name_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to parse log restore source config, please check the config parameters");
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "parse log restore source config, please check the config parameters");
} else if (OB_FAIL(service_attr_.get_password(passwd, sizeof(passwd)))) {
LOG_WARN("get servcie attr password failed");
} else if (OB_FAIL(service_attr_.get_user_str_(user_and_tenant))) {
LOG_WARN("get user str failed", K(service_attr_.user_.user_name_), K(service_attr_.user_.tenant_name_));
} else if (OB_FAIL(proxy.try_init(tenant_id_/*standby*/, service_attr_.addr_, user_and_tenant.ptr(), passwd))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("proxy connect to primary db failed", K(service_attr_.addr_), K(user_and_tenant));
} else if (OB_FAIL(proxy.get_tenant_id(service_attr_.user_.tenant_name_, service_attr_.user_.tenant_id_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get primary tenant id failed", K(tenant_id_), K(service_attr_.user_));
} else if (OB_FAIL(proxy.get_cluster_id(service_attr_.user_.tenant_id_, service_attr_.user_.cluster_id_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get primary cluster id failed", K(tenant_id_), K(service_attr_.user_.tenant_id_));
} else if (!for_verify && OB_FAIL(service_attr_.check_restore_source_is_self_(source_is_self, tenant_id_))) {
LOG_WARN("check restore source is self failed");
} else if (source_is_self) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("set standby itself as log restore source is not allowed");
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set standby itself as log restore source");
} else if (OB_FAIL(proxy.get_compatibility_mode(service_attr_.user_.tenant_id_, service_attr_.user_.mode_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get primary compatibility mode failed", K(tenant_id_), K(service_attr_.user_.tenant_id_));
} else if (for_verify && OB_FAIL(proxy.check_begin_lsn(service_attr_.user_.tenant_id_))) {
LOG_WARN("check_begin_lsn failed", K(tenant_id_), K(service_attr_.user_.tenant_id_));
} else {
compat_mode = service_attr_.user_.mode_;
LOG_INFO("check_before_update_inner_config success", K(tenant_id_), K(service_attr_), K(compat_mode));
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::get_compatibility_mode(common::ObCompatibilityMode &compatibility_mode)
{
int ret = OB_SUCCESS;
compatibility_mode = OCEANBASE_MODE;
if (is_empty_) {
} else if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parser", KPC(this));
} else if (!service_attr_.user_.is_valid()) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KPC(this));
} else {
compatibility_mode = service_attr_.user_.mode_;
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::do_parse_sub_config_(const common::ObString &config_str)
{
int ret = OB_SUCCESS;
if (config_str.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("empty log restore source sub config is not allowed", K(config_str));
} else {
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", config_str.length(), config_str.ptr()))) {
LOG_WARN("fail to set config value", K(config_str));
} else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to split config str", KP(token));
} else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) {
} else if (0 == STRCASECMP(token, OB_STR_SERVICE)) {
if (OB_FAIL(do_parse_restore_service_host_(token, saveptr))) {
LOG_WARN("fail to do parse restore service host", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_USER)) {
if (OB_FAIL(do_parse_restore_service_user_(token, saveptr))) {
LOG_WARN("fail to do parse restore service user", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_PASSWORD)) {
if (OB_FAIL(do_parse_restore_service_passwd_(token, saveptr))) {
LOG_WARN("fail to do parse restore service passwd", K(token), K(saveptr));
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("log restore source does not has this config", K(token));
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_host_(const common::ObString &name, const
common::ObString &value)
{
int ret = OB_SUCCESS;
if (name.empty() || value.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid log restore source service host config", K(name), K(value));
} else if (OB_FAIL(service_attr_.parse_ip_port_from_str(value.ptr(), ";" /*delimiter*/))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("parse restore source service host failed", K(name), K(value));
}
if (OB_FAIL(ret)) {
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set ip list config, please check the length and format of ip list");
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_user_(const common::ObString &name, const
common::ObString &value)
{
int ret = OB_SUCCESS;
if (name.empty() || value.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid log restore source service user config", K(name), K(value));
} else {
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
char *user_token = nullptr;
char *tenant_token = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", value.length(), value.ptr()))) {
LOG_WARN("fail to set user config value", K(value));
} else if (OB_FAIL(service_attr_.set_service_user_config(tmp_str))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("parse restore source service user failed", K(name), K(value));
}
}
if (OB_FAIL(ret)) {
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set user config, please check the length and format of username, tenant name");
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::do_parse_restore_service_passwd_(const common::ObString &name, const
common::ObString &value)
{
int ret = OB_SUCCESS;
if (name.empty() || value.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid log restore source service password config", K(name), K(value));
} else if (OB_FAIL(service_attr_.set_service_passwd_to_encrypt(value.ptr()))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("parse restore source service password failed", K(name), K(value));
}
if (OB_FAIL(ret)) {
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "set password config, please check the length and format of password");
}
return ret;
}

View File

@ -0,0 +1,76 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_CONFIG_H_
#define OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_CONFIG_H_
#include <utility>
#include "lib/string/ob_sql_string.h"
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "ob_backup_struct.h"
#include "ob_backup_config.h"
#include "ob_log_restore_struct.h"
namespace oceanbase
{
namespace share
{
class ObLogArchiveDestConfigParser;
class ObBackupConfigType;
class ObIBackupConfigItemParser;
class ObLogRestoreSourceLocationConfigParser : public ObLogArchiveDestConfigParser
{
public:
ObLogRestoreSourceLocationConfigParser(const ObBackupConfigType::Type &type, const uint64_t tenant_id, const int64_t dest_no)
: ObLogArchiveDestConfigParser(type, tenant_id, dest_no) {}
virtual ~ObLogRestoreSourceLocationConfigParser() {}
virtual int update_inner_config_table(common::ObISQLClient &trans) override;
virtual int check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans) override;
protected:
virtual int do_parse_sub_config_(const common::ObString &config_str) override;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogRestoreSourceLocationConfigParser);
};
class ObLogRestoreSourceServiceConfigParser : public ObIBackupConfigItemParser
{
public:
ObLogRestoreSourceServiceConfigParser(const ObBackupConfigType::Type &type, const uint64_t tenant_id)
: ObIBackupConfigItemParser(type, tenant_id) {}
virtual ~ObLogRestoreSourceServiceConfigParser() {}
virtual int parse_from(const common::ObSqlString &value) override;
virtual int update_inner_config_table(common::ObISQLClient &trans) override;
virtual int check_before_update_inner_config(obrpc::ObSrvRpcProxy &rpc_proxy, common::ObISQLClient &trans) override;
int check_before_update_inner_config(
const bool for_verify,
ObCompatibilityMode &compat_mode);
virtual int get_compatibility_mode(common::ObCompatibilityMode &compatibility_mode);
private:
int do_parse_sub_config_(const common::ObString &config_str);
int do_parse_restore_service_host_(const common::ObString &name, const common::ObString &value);
int do_parse_restore_service_user_(const common::ObString &name, const common::ObString &value);
int do_parse_restore_service_passwd_(const common::ObString &name, const common::ObString &value);
int check_doing_service_restore_(common::ObISQLClient &trans, bool &is_doing);
int update_data_backup_dest_config_(common::ObISQLClient &trans);
private:
ObRestoreSourceServiceAttr service_attr_;
bool is_empty_;
DISALLOW_COPY_AND_ASSIGN(ObLogRestoreSourceServiceConfigParser);
};
}//share
}//oceanbase
#endif /* OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_CONFIG_H_ */

View File

@ -0,0 +1,839 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SHARE
#include "lib/utility/ob_defer.h"
#include "share/backup/ob_backup_config.h"
#include "ob_log_restore_struct.h"
using namespace oceanbase;
using namespace common;
using namespace share;
ObRestoreSourceServiceUser::ObRestoreSourceServiceUser()
: user_name_(),
tenant_name_(),
mode_(ObCompatibilityMode::OCEANBASE_MODE),
tenant_id_(OB_INVALID_TENANT_ID),
cluster_id_(OB_INVALID_CLUSTER_ID)
{
user_name_[0] = '\0';
tenant_name_[0] = '\0';
}
void ObRestoreSourceServiceUser::reset()
{
user_name_[0] = '\0';
tenant_name_[0] = '\0';
mode_ = ObCompatibilityMode::OCEANBASE_MODE;
tenant_id_ = OB_INVALID_TENANT_ID;
cluster_id_ = OB_INVALID_CLUSTER_ID;
}
bool ObRestoreSourceServiceUser::is_valid() const
{
return STRLEN(user_name_) != 0
&& STRLEN(tenant_name_) != 0
&& (ObCompatibilityMode::OCEANBASE_MODE != mode_)
&& (tenant_id_ != OB_INVALID_TENANT_ID)
&& (cluster_id_ != OB_INVALID_CLUSTER_ID);
}
bool ObRestoreSourceServiceUser::operator== (const ObRestoreSourceServiceUser &other) const
{
return (STRLEN(this->user_name_) == STRLEN(other.user_name_))
&& (STRLEN(this->tenant_name_) == STRLEN(other.tenant_name_))
&& (0 == STRCMP(this->user_name_, other.user_name_))
&& (0 == STRCMP(this->tenant_name_, other.tenant_name_))
&& this->mode_ == other.mode_
&& this->tenant_id_ == other.tenant_id_
&& this->cluster_id_ == other.cluster_id_;
}
int ObRestoreSourceServiceUser::assign(const ObRestoreSourceServiceUser &user)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!user.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(user));
} else if (OB_FAIL(databuff_printf(user_name_, sizeof(user_name_), "%s", user.user_name_))) {
LOG_WARN("user_name_ assign failed", K(user));
} else if (OB_FAIL(databuff_printf(tenant_name_, sizeof(tenant_name_), "%s", user.tenant_name_))) {
LOG_WARN("tenant_name_ assign failed", K(user));
} else {
mode_ = user.mode_;
tenant_id_ = user.tenant_id_;
cluster_id_ = user.cluster_id_;
}
return ret;
}
ObRestoreSourceServiceAttr::ObRestoreSourceServiceAttr()
: addr_(),
user_()
{
encrypt_passwd_[0] = '\0';
}
void ObRestoreSourceServiceAttr::reset()
{
addr_.reset();
user_.reset();
encrypt_passwd_[0] = '\0';
}
/*
parse service attr from string
eg: "ip_list=127.0.0.1:1001;127.0.0.1:1002,USER=restore_user@primary_tenant,PASSWORD=xxxxxx,TENANT_ID=1002,
CLUSTER_ID=10001,COMPATIBILITY_MODE=MYSQL,IS_ENCRYPTED=true"
*/
int ObRestoreSourceServiceAttr::parse_service_attr_from_str(ObSqlString &value)
{
int ret = OB_SUCCESS;
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
if (OB_UNLIKELY(value.empty() || value.length() > OB_MAX_BACKUP_DEST_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source attr value is invalid");
} else if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", static_cast<int>(value.length()), value.ptr()))) {
LOG_WARN("fail to print attr value", K(value));
} else {
token = tmp_str;
for (char *str = token; OB_SUCC(ret); str = nullptr) {
token = ::STRTOK_R(str, ",", &saveptr);
if (nullptr == token) {
break;
} else if (OB_FAIL(do_parse_sub_service_attr(token))) {
LOG_WARN("fail to parse service attr str", K(token));
}
}
}
return ret;
}
int ObRestoreSourceServiceAttr::do_parse_sub_service_attr(const char *sub_value)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(sub_value) || OB_UNLIKELY(0 == STRLEN(sub_value) || STRLEN(sub_value) > OB_MAX_BACKUP_DEST_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source service attr sub value is invalid", K(sub_value));
} else {
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", sub_value))) {
LOG_WARN("fail to print sub_value", K(sub_value));
} else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to split sub_value str", K(token), KP(tmp_str));
} else if (OB_FALSE_IT(str_tolower(token, strlen(token)))) {
} else if (0 == STRCASECMP(token, OB_STR_IP_LIST)) {
if (OB_FAIL(parse_ip_port_from_str(saveptr, ";"/*delimiter*/))) {
LOG_WARN("fail to parse ip list from str", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_USER)) {
if (OB_FAIL(set_service_user_config(saveptr))) {
LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_TENANT_ID)) {
if (OB_FAIL(set_service_tenant_id(saveptr))) {
LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_CLUSTER_ID)) {
if (OB_FAIL(set_service_cluster_id(saveptr))) {
LOG_WARN("fail to set restore service user and tenant", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_COMPATIBILITY_MODE)) {
if (OB_FAIL(set_service_compatibility_mode(saveptr))) {
LOG_WARN("fail to set restore service compatibility mode", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_PASSWORD)) {
if (OB_FAIL(set_service_passwd_no_encrypt(saveptr))) {
LOG_WARN("fail to set restore service passwd", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_IS_ENCRYPTED)) {
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("log restore source service do not have this config", K(token));
}
}
return ret;
}
int ObRestoreSourceServiceAttr::set_service_user_config(const char *user_tenant)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(user_tenant) || OB_UNLIKELY(0 == STRLEN(user_tenant)
|| STRLEN(user_tenant) > OB_MAX_RESTORE_USER_AND_TENANT_LEN)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source service user is invalid");
} else {
char *user_name = nullptr;
char *tenant_name = nullptr;
char tmp_str[OB_MAX_RESTORE_USER_AND_TENANT_LEN + 1] = { 0 };
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", user_tenant))) {
LOG_WARN("fail to print user_tenant", K(user_tenant));
} else if (OB_ISNULL(user_name = ::STRTOK_R(tmp_str, "@", &tenant_name))) {
LOG_WARN("fail to split user_tenant", K(tmp_str));
} else if (OB_FAIL(set_service_user(user_name, tenant_name))) {
LOG_WARN("fail to set service user", K(user_name), K(tenant_name));
}
}
return ret;
}
int ObRestoreSourceServiceAttr::set_service_user(const char *user, const char *tenant)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(user) || OB_ISNULL(tenant) || OB_UNLIKELY(0 == STRLEN(user) || 0 == STRLEN(tenant)
|| STRLEN(user) > OB_MAX_USER_NAME_LENGTH || STRLEN(tenant) > OB_MAX_ORIGINAL_NANE_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source service user name or tenant name is invalid");
} else if (OB_FAIL(databuff_printf(user_.user_name_, sizeof(user_.user_name_), "%s", user))) {
LOG_WARN("fail to print user name", K(user));
} else if (OB_FAIL(databuff_printf(user_.tenant_name_, sizeof(user_.tenant_name_), "%s", tenant))) {
LOG_WARN("fail to print tenant name", K(tenant));
}
LOG_DEBUG("set service user", K(user), K(tenant), K(user_));
return ret;
}
int ObRestoreSourceServiceAttr::set_service_tenant_id(const char *tenant_id_str)
{
int ret = OB_SUCCESS;
char *p_end = nullptr;
if (OB_FAIL(ob_strtoull(tenant_id_str, p_end, user_.tenant_id_))) {
LOG_WARN("fail to set service tenant id from string", K(tenant_id_str));
}
return ret;
}
int ObRestoreSourceServiceAttr::set_service_cluster_id(const char *cluster_id_str)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ob_atoll(cluster_id_str, user_.cluster_id_))) {
LOG_WARN("fail to set service cluster id from string", K(cluster_id_str));
}
return ret;
}
int ObRestoreSourceServiceAttr::set_service_compatibility_mode(const char *compat_mode)
{
int ret = OB_SUCCESS;
char token[OB_MAX_COMPAT_MODE_STR_LEN + 1] = { 0 };
if (OB_ISNULL(compat_mode) || OB_UNLIKELY(0 == STRLEN(compat_mode)
|| STRLEN(compat_mode) > OB_MAX_COMPAT_MODE_STR_LEN)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source service compat_mode is invalid");
} else if (OB_FAIL(databuff_printf(token, sizeof(token), "%s", compat_mode))) {
LOG_WARN("fail to print compat mode", K(compat_mode));
} else if (OB_FALSE_IT(str_toupper(token, STRLEN(token)))) {
} else if ((0 == STRCASECMP(token, "MYSQL"))) {
user_.mode_ = ObCompatibilityMode::MYSQL_MODE;
} else if ((0 == STRCASECMP(token, "ORACLE"))) {
user_.mode_ = ObCompatibilityMode::ORACLE_MODE;
} else {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid compatibility mode", K(compat_mode), K(ret));
}
return ret;
}
int ObRestoreSourceServiceAttr::set_service_passwd_to_encrypt(const char *passwd)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(passwd) || OB_UNLIKELY(0 == STRLEN(passwd) || STRLEN(passwd) > OB_MAX_PASSWORD_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument");
} else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(encrypt_passwd_), "%s", passwd))) {
LOG_WARN("fail to print encrypt password");
}
LOG_INFO("set service password success");
return ret;
}
int ObRestoreSourceServiceAttr::set_service_passwd_no_encrypt(const char *passwd)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(passwd) || OB_UNLIKELY(0 == STRLEN(passwd) || STRLEN(passwd) > OB_MAX_PASSWORD_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument");
} else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(encrypt_passwd_), "%s", passwd))) {
LOG_WARN("fail to print encrypt password");
}
return ret;
}
// 127.0.0.1:1000;127.0.0.1:1001;127.0.0.1:1002 ==> 127.0.0.1:1000 127.0.0.1:1001 127.0.0.1:1002
int ObRestoreSourceServiceAttr::parse_ip_port_from_str(const char *ip_list, const char *delimiter)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ip_list) || OB_ISNULL(delimiter) || OB_UNLIKELY(STRLEN(ip_list) > OB_MAX_RESTORE_SOURCE_IP_LIST_LEN)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log restore source service ip list is invalid");
}
char tmp_str[OB_MAX_RESTORE_SOURCE_IP_LIST_LEN + 1] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", ip_list))) {
LOG_WARN("fail to get ip list", K(ip_list));
} else {
token = tmp_str;
for (char *str = token; OB_SUCC(ret); str = nullptr) {
token = ::STRTOK_R(str, delimiter, &saveptr);
if (nullptr == token) {
break;
} else {
ObAddr addr;
if (OB_FAIL(addr.parse_from_string(ObString(token)))) {
LOG_WARN("fail to parse addr", K(addr), K(token));
} else if (!addr.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("service addr is invalid", K(addr), K(ip_list));
} else if (OB_FAIL(addr_.push_back(addr))){
LOG_WARN("fail to push addr", K(addr));
}
}
}
}
return ret;
}
bool ObRestoreSourceServiceAttr::is_valid() const
{
return service_user_is_valid()
&& service_host_is_valid()
&& service_password_is_valid();
}
bool ObRestoreSourceServiceAttr::service_user_is_valid() const
{
return user_.is_valid();
}
bool ObRestoreSourceServiceAttr::service_host_is_valid() const
{
return !addr_.empty();
}
bool ObRestoreSourceServiceAttr::service_password_is_valid() const
{
return strlen(encrypt_passwd_) != 0;
}
int ObRestoreSourceServiceAttr::gen_config_items(common::ObIArray<BackupConfigItemPair> &items) const
{
int ret = OB_SUCCESS;
BackupConfigItemPair config;
if (!is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguement", KPC(this));
}
ObSqlString tmp;
// gen ip list config
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_IP_LIST))) {
LOG_WARN("failed to assign ip list key");
} else if (OB_FAIL(get_ip_list_str_(tmp))) {
LOG_WARN("failed to get ip list str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign ip list value");
} else if(OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen user config
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_USER))) {
LOG_WARN("failed to assign user key");
} else if (OB_FAIL(get_user_str_(tmp))) {
LOG_WARN("failed to get user str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign user value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen password config
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_PASSWORD))) {
LOG_WARN("failed to assign password key");
} else if (OB_FAIL(get_password_str_(tmp))) {
LOG_WARN("failed to get password str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign password value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen tenant id
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_TENANT_ID))) {
LOG_WARN("failed to assign tenant id key");
} else if (OB_FAIL(get_tenant_id_str_(tmp))) {
LOG_WARN("failed to get tenant id str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign tenant id value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen cluster id
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_CLUSTER_ID))) {
LOG_WARN("failed to assign cluster id key");
} else if (OB_FAIL(get_cluster_id_str_(tmp))) {
LOG_WARN("failed to get cluster id str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign cluster id value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen compatibility mode
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_COMPATIBILITY_MODE))) {
LOG_WARN("failed to assign compatibility mode");
} else if (OB_FAIL(get_compatibility_mode_str_(tmp))) {
LOG_WARN("failed to get compatibility mode str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign compatibility mode value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
// gen is encrypted
if (OB_FAIL(ret)) {
} else if (OB_FAIL(config.key_.assign(OB_STR_IS_ENCRYPTED))) {
LOG_WARN("failed to assign encrypted key");
} else if (OB_FAIL(get_is_encrypted_str_(tmp))) {
LOG_WARN("failed to get is encrypted str");
} else if (OB_FAIL(config.value_.assign(tmp.ptr()))) {
LOG_WARN("failed to assign encrypted value");
} else if (OB_FAIL(items.push_back(config))) {
LOG_WARN("failed to push service source attr config", K(config));
}
return ret;
}
int ObRestoreSourceServiceAttr::gen_service_attr_str(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
ObSqlString str;
ObSqlString ip_str;
ObSqlString user_str;
ObSqlString passwd_str;
ObSqlString compat_str;
ObSqlString is_encrypted_str;
if (OB_UNLIKELY(!is_valid() || OB_ISNULL(buf) || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid service attr argument", KP(buf), K(buf_size));
} else if (OB_FAIL(get_ip_list_str_(ip_str))) {
LOG_WARN("get ip list str failed");
} else if (OB_FAIL(get_user_str_(user_str))) {
LOG_WARN("get user str failed");
} else if (OB_FAIL(get_password_str_(passwd_str))) {
LOG_WARN("get password str failed");
} else if (OB_FAIL(get_compatibility_mode_str_(compat_str))) {
LOG_WARN("get compatibility mode str failed");
} else if (OB_FAIL(get_is_encrypted_str_(is_encrypted_str))) {
LOG_WARN("get encrypted str failed");
} else if (OB_FAIL(str.assign_fmt("IP_LIST=%s,USER=%s,PASSWORD=%s,TENANT_ID=%ld,CLUSTER_ID=%ld,COMPATIBILITY_MODE=%s,IS_ENCRYPTED=%s",
ip_str.ptr(), user_str.ptr(), passwd_str.ptr(), user_.tenant_id_, user_.cluster_id_, compat_str.ptr(), is_encrypted_str.ptr()))) {
LOG_WARN("fail to assign str", K(compat_str), K(is_encrypted_str));
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(str.length()), str.ptr()))) {
LOG_WARN("fail to print str", K(str));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_ip_list_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
ObSqlString str;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid ip list argument", KP(buf), K(buf_size));
} else if (OB_FAIL(get_ip_list_str_(str))) {
LOG_WARN("get ip list str failed");
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(str.length()), str.ptr()))) {
LOG_WARN("fail to print str", K(str));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_ip_list_str_(ObSqlString &str) const
{
int ret = OB_SUCCESS;
ARRAY_FOREACH_N(addr_, idx, cnt) {
char ip_str[MAX_IP_PORT_LENGTH] = { 0 };
const ObAddr ip = addr_.at(idx);
if (OB_FAIL(ip.ip_port_to_string(ip_str, sizeof(ip_str)))) {
LOG_WARN("fail to convert ip port to string", K(ip), K(ip_str));
} else {
if (0 == idx && (OB_FAIL(str.assign_fmt("%s", ip_str)))) {
LOG_WARN("fail to assign ip str", K(str) ,K(ip), K(ip_str));
} else if ( 0 != idx && OB_FAIL(str.append_fmt(";%s", ip_str))) {
LOG_WARN("fail to append ip str", K(str), K(ip), K(ip_str));
}
}
}
LOG_DEBUG("get ip list str", K(str));
return ret;
}
int ObRestoreSourceServiceAttr::get_user_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
ObSqlString res_str;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid user str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(get_user_str_(res_str))) {
LOG_WARN("fail to get user str");
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(res_str.length()), res_str.ptr()))) {
LOG_WARN("fail to print str");
}
return ret;
}
int ObRestoreSourceServiceAttr::get_user_str_(ObSqlString &user_str) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(user_str.assign(user_.user_name_))) {
LOG_WARN("fail to assign user name" ,K(user_.user_name_));
} else if (OB_FAIL(user_str.append_fmt("@%s",user_.tenant_name_))) {
LOG_WARN("fail to assign tenant name", K(user_.user_name_));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_password_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
ObSqlString passwd_str;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid password str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(get_password_str_(passwd_str))) {
LOG_WARN("fail to get password str");
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(passwd_str.length()), passwd_str.ptr()))) {
LOG_WARN("fail to print str");
}
return ret;
}
int ObRestoreSourceServiceAttr::get_password_str_(ObSqlString &passwd_str) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(passwd_str.assign(encrypt_passwd_))) {
LOG_WARN("fail to assign password");
}
return ret;
}
int ObRestoreSourceServiceAttr::get_tenant_id_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant id str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%ld", user_.tenant_id_))) {
LOG_WARN("fail to print tenant id str", K(user_.tenant_id_));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_tenant_id_str_(ObSqlString &tenant_str) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(tenant_str.assign_fmt("%ld", user_.tenant_id_))) {
LOG_WARN("fail to assign tenant id str", K(user_.tenant_id_));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_cluster_id_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid cluster id str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%ld", user_.cluster_id_))) {
LOG_WARN("fail to print cluster id str", K(user_.cluster_id_));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_cluster_id_str_(ObSqlString &cluster_str) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(cluster_str.assign_fmt("%ld", user_.cluster_id_))) {
LOG_WARN("fail to assign cluster id str", K(user_.cluster_id_));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_compatibility_mode_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
ObSqlString tmp_str;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid compatibilidy mode str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(get_compatibility_mode_str_(tmp_str))) {
LOG_WARN("fail to get compatibility mode str");
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%.*s", static_cast<int>(tmp_str.length()), tmp_str.ptr()))) {
LOG_WARN("fail to print compatibility mode str", K(tmp_str));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_compatibility_mode_str_(ObSqlString &compatibility_str) const
{
int ret = OB_SUCCESS;
const char *compat_mode = "INVALID";
if (ObCompatibilityMode::MYSQL_MODE == user_.mode_) {
compat_mode = "MYSQL";
} else if (ObCompatibilityMode::ORACLE_MODE == user_.mode_) {
compat_mode = "ORACLE";
} else {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("compatibility mode is invalid", K(user_.mode_));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(compatibility_str.assign_fmt("%s", compat_mode))) {
LOG_WARN("fail to assign compatibility mode", K(compat_mode));
}
return ret;
}
int ObRestoreSourceServiceAttr::get_is_encrypted_str_(char *buf, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid encrypted str argument", KP(buf), K(buf_size));
} else if (OB_FAIL(databuff_printf(buf, buf_size, "%s", "false"))) {
LOG_WARN("fail to print encrypted str");
}
return ret;
}
int ObRestoreSourceServiceAttr::get_is_encrypted_str_(ObSqlString &encrypted_str) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(encrypted_str.assign("false"))) {
LOG_WARN("fail to print str");
}
return ret;
}
int ObRestoreSourceServiceAttr::get_password(char *passwd, const int64_t buf_size) const
{
int ret = OB_SUCCESS;
char tmp_passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 };
if (OB_ISNULL(passwd) || OB_UNLIKELY((buf_size <= 0) || (buf_size < OB_MAX_PASSWORD_LENGTH))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parameter when get password", K(passwd), K(STRLEN(passwd)));
} else if (OB_FAIL(databuff_printf(passwd, buf_size, "%s", encrypt_passwd_))) {
LOG_WARN("failed to print encrypt_passwd_ key", K(encrypt_passwd_));
}
return ret;
}
bool ObRestoreSourceServiceAttr::compare_addr_(common::ObArray<common::ObAddr> addr) const
{
int bret = true;
int ret = OB_SUCCESS;
if (addr.size() != this->addr_.size()) {
bret = false;
} else {
ARRAY_FOREACH_N(addr, idx, cnt) {
bool tmp_cmp = false;
ARRAY_FOREACH_N(this->addr_, idx1, cnt1) {
if (addr.at(idx) == this->addr_.at(idx1)) {
tmp_cmp = true;
}
}
if (!tmp_cmp) {
bret = false;
break;
}
}
}
return bret;
}
int ObRestoreSourceServiceAttr::check_restore_source_is_self_(bool &is_self, uint64_t tenant_id) const
{
int ret = OB_SUCCESS;
is_self = false;
int64_t curr_cluster_id = GCONF.cluster_id;
// assume that cluster id is managed by
if (tenant_id == user_.tenant_id_ && curr_cluster_id == user_.cluster_id_) {
is_self = true;
LOG_WARN("set standby itself as log restore source is not allowed");
}
LOG_INFO("check restore is self succ", K(tenant_id), K(user_.tenant_id_), K(curr_cluster_id), K(user_.cluster_id_));
return ret;
}
bool ObRestoreSourceServiceAttr::operator==(const ObRestoreSourceServiceAttr &other) const
{
return (this->user_ == other.user_)
&& (STRLEN(this->encrypt_passwd_) == STRLEN(other.encrypt_passwd_))
&& (0 == STRCMP(this->encrypt_passwd_, other.encrypt_passwd_))
&& compare_addr_(other.addr_);
}
int ObRestoreSourceServiceAttr::assign(const ObRestoreSourceServiceAttr &attr)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!attr.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(attr));
} else if (FALSE_IT(addr_.reset())) {
} else if (OB_FAIL(addr_.assign(attr.addr_))) {
LOG_WARN("addr_ assign failed", K(attr));
} else if (OB_FAIL(user_.assign(attr.user_))) {
LOG_WARN("user_ assign failed", K(attr));
} else if (OB_FAIL(databuff_printf(encrypt_passwd_, sizeof(attr.encrypt_passwd_), "%s", attr.encrypt_passwd_))) {
LOG_WARN("passwd_ assign failed", K(attr));
}
return ret;
}
ObRestoreSourceLocationPrimaryAttr::ObRestoreSourceLocationPrimaryAttr()
: tenant_id_(OB_INVALID_TENANT_ID),
cluster_id_(OB_INVALID_CLUSTER_ID)
{
location_.reset();
}
int ObRestoreSourceLocationPrimaryAttr::parse_location_attr_from_str(const common::ObString &value)
{
int ret = OB_SUCCESS;
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
char *p_end = nullptr;
if (value.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("location log restore source is empty");
} else if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%.*s", static_cast<int>(value.length()), value.ptr()))) {
LOG_WARN("fail to set config value", K(ret), K(value));
} else {
token = tmp_str;
for (char *str = token; OB_SUCC(ret); str = nullptr) {
token = ::STRTOK_R(str, ",", &saveptr);
if (nullptr == token) {
break;
} else if (OB_FAIL(do_parse_sub_config_(token))) {
LOG_WARN("fail to do parse location restore source sub config from string", K(token));
}
}
}
return ret;
}
int ObRestoreSourceLocationPrimaryAttr::do_parse_sub_config_(const char *sub_value)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(sub_value) || OB_UNLIKELY(0 == STRLEN(sub_value) || STRLEN(sub_value) > OB_MAX_BACKUP_DEST_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("location log restore source primary attr sub value is invalid", K(sub_value));
} else {
char tmp_str[OB_MAX_BACKUP_DEST_LENGTH + 1] = { 0 };
char *token = nullptr;
char *saveptr = nullptr;
if (OB_FAIL(databuff_printf(tmp_str, sizeof(tmp_str), "%s", sub_value))) {
LOG_WARN("fail to print sub_value", K(sub_value));
} else if (OB_ISNULL(token = ::STRTOK_R(tmp_str, "=", &saveptr))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to split sub_value str", K(token), KP(tmp_str));
} else if (0 == STRCASECMP(token, OB_STR_LOCATION)) {
if (OB_FAIL(set_location_path_(saveptr))) {
LOG_WARN("fail to do parse location path", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_CLUSTER_ID)) {
if (OB_FAIL(set_location_cluster_id_(saveptr))) {
LOG_WARN("fail to do parse location primary cluster id", K(token), K(saveptr));
}
} else if (0 == STRCASECMP(token, OB_STR_TENANT_ID)) {
if (OB_FAIL(set_location_tenant_id_(saveptr))) {
LOG_WARN("fail to do parse location primary tenant id", K(token), K(saveptr));
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("location log restore source does not support this sub config", K(token));
}
}
return ret;
}
int ObRestoreSourceLocationPrimaryAttr::set_location_path_(const char *path)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(path) || OB_UNLIKELY(0 == STRLEN(path) || STRLEN(path) > OB_MAX_BACKUP_DEST_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("location path is invalid", K(path));
} else if (OB_FAIL(location_.assign(path))) {
LOG_WARN("fail to assign location path", K(path));
}
return ret;
}
int ObRestoreSourceLocationPrimaryAttr::set_location_cluster_id_(const char *cluster_id_str)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(cluster_id_str)
|| OB_UNLIKELY(0 == STRLEN(cluster_id_str) || STRLEN(cluster_id_str) > OB_MAX_BACKUP_DEST_LENGTH)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("location cluster id str is invalid", K(cluster_id_str));
} else if (OB_FAIL(ob_atoll(cluster_id_str, cluster_id_))) {
LOG_WARN("fail to set location primary cluster id from string", K(cluster_id_str));
}
return ret;
}
int ObRestoreSourceLocationPrimaryAttr::set_location_tenant_id_(const char *tenant_id_str)
{
int ret = OB_SUCCESS;
char *p_end = nullptr;
if (OB_ISNULL(tenant_id_str) || OB_UNLIKELY(0 == STRLEN(tenant_id_str))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("location tenant id is invalid", K(tenant_id_str));
} else if (OB_FAIL(ob_strtoull(tenant_id_str, p_end, tenant_id_))) {
LOG_WARN("fail to set location primary tenant id from string", K(tenant_id_str));
}
return ret;
}
bool ObRestoreSourceLocationPrimaryAttr::is_valid()
{
return tenant_id_ != OB_INVALID_TENANT_ID
&& cluster_id_ != OB_INVALID_CLUSTER_ID
&& location_.is_valid();
}

View File

@ -0,0 +1,113 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_STRUCT_H_
#define OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_STRUCT_H_
#include "lib/container/ob_array.h"
#include "share/scn.h"
#include "ob_backup_struct.h"
namespace oceanbase
{
namespace share
{
struct ObRestoreSourceServiceUser final
{
ObRestoreSourceServiceUser();
~ObRestoreSourceServiceUser() {}
void reset();
bool is_valid() const;
int assign(const ObRestoreSourceServiceUser &user);
char user_name_[OB_MAX_USER_NAME_LENGTH];
char tenant_name_[OB_MAX_ORIGINAL_NANE_LENGTH];
ObCompatibilityMode mode_;
uint64_t tenant_id_;
int64_t cluster_id_;
bool operator == (const ObRestoreSourceServiceUser &other) const;
TO_STRING_KV(K_(user_name), K_(tenant_name), K_(tenant_id), K_(cluster_id));
};
struct ObRestoreSourceServiceAttr final
{
ObRestoreSourceServiceAttr();
~ObRestoreSourceServiceAttr() {}
void reset();
int parse_service_attr_from_str(ObSqlString &str);
int do_parse_sub_service_attr(const char *sub_value);
int set_service_user_config(const char *user_tenant);
int set_service_user(const char *user, const char *tenant);
int set_service_tenant_id(const char *tenant_id);
int set_service_cluster_id(const char *cluster_id);
int set_service_compatibility_mode(const char *compatibility_mode);
// It need to convert password to encrypted password when pasre from log_restore_source config.
int set_service_passwd_to_encrypt(const char *passwd);
// There's no need to convert password to encrypted password when parse from __all_log_restore_source record.
int set_service_passwd_no_encrypt(const char *passwd);
int parse_ip_port_from_str(const char *buf, const char *delimiter);
bool is_valid() const;
bool service_user_is_valid() const;
bool service_host_is_valid() const;
bool service_password_is_valid() const;
int gen_config_items(common::ObIArray<BackupConfigItemPair> &items) const;
int gen_service_attr_str(char *buf, const int64_t buf_size) const;
int gen_service_attr_str(ObSqlString &str) const;
int get_ip_list_str_(char *buf, const int64_t buf_size) const;
int get_ip_list_str_(ObSqlString &str) const;
int get_user_str_(char *buf, const int64_t buf_size) const;
int get_user_str_(ObSqlString &str) const;
int get_password_str_(char *buf, const int64_t buf_size) const;
int get_password_str_(ObSqlString &str) const;
int get_tenant_id_str_(char *buf ,const int64_t buf_size) const;
int get_tenant_id_str_(ObSqlString &str) const;
int get_cluster_id_str_(char *buf, const int64_t buf_size) const;
int get_cluster_id_str_(ObSqlString &str) const;
int get_compatibility_mode_str_(char *buf, const int64_t buf_size) const;
int get_compatibility_mode_str_(ObSqlString &str) const;
int get_is_encrypted_str_(char *buf, const int64_t buf_size) const;
int get_is_encrypted_str_(ObSqlString &str) const;
int set_encrypt_password_key_(const char *encrypt_key);
int get_decrypt_password_key_(char *unencrypt_key, const int64_t buf_size) const;
// return the origion password
int get_password(char *passwd, const int64_t buf_size) const;
bool compare_addr_(common::ObArray<common::ObAddr> addr) const;
int check_restore_source_is_self_(bool &is_self, uint64_t tenant_id) const;
bool operator ==(const ObRestoreSourceServiceAttr &other) const;
int assign(const ObRestoreSourceServiceAttr &attr);
TO_STRING_KV(K_(addr), K_(user), K_(encrypt_passwd));
common::ObArray<common::ObAddr> addr_;
ObRestoreSourceServiceUser user_;
char encrypt_passwd_[OB_MAX_BACKUP_SERIALIZEKEY_LENGTH];
};
struct ObRestoreSourceLocationPrimaryAttr final
{
ObRestoreSourceLocationPrimaryAttr();
~ObRestoreSourceLocationPrimaryAttr() {}
void reset();
uint64_t tenant_id_;
int64_t cluster_id_;
ObSqlString location_;
int parse_location_attr_from_str(const common::ObString &value);
int do_parse_sub_config_(const char *sub_value);
int set_location_path_(const char *path);
int set_location_cluster_id_(const char *cluster_id_str);
int set_location_tenant_id_(const char *tenant_id_str);
bool is_valid();
TO_STRING_KV(K_(tenant_id), K_(cluster_id), K_(location));
};
}//share
}//oceanbase
#endif /* OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_STRUCT_H_ */

View File

@ -14,12 +14,14 @@
#include "lib/restore/ob_storage.h"
#include "lib/utility/ob_macro_utils.h"
#include "share/backup/ob_backup_struct.h"
#include "share/backup/ob_log_restore_struct.h"
#include "ob_log_restore_source_mgr.h"
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include "lib/net/ob_addr.h"
#include "lib/oblog/ob_log_module.h"
#include "lib/string/ob_string.h"
using namespace oceanbase::share;
int ObLogRestoreSourceMgr::init(const uint64_t tenant_id, ObISQLClient *proxy)
{
@ -105,7 +107,10 @@ int ObLogRestoreSourceMgr::add_location_source(const SCN &recovery_until_scn,
{
int ret = OB_SUCCESS;
ObBackupDest dest;
char dest_buf[OB_MAX_BACKUP_DEST_LENGTH] = {0};
const int64_t MAX_RESTORE_CLUSTER_STR = 10 + 16; // CLUSTER_ID=4294967295
const int64_t MAX_RESTORE_TENANT_STR = 20 + 16; // TENANT_ID=18446744073709551615
const int64_t MAX_RESTORE_LOCAION_STR = OB_MAX_BACKUP_DEST_LENGTH + 16; // LOCATION=file:///data/xxxx
char dest_buf[MAX_RESTORE_LOCAION_STR + MAX_RESTORE_CLUSTER_STR + MAX_RESTORE_TENANT_STR] = { 0 };
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLogRestoreSourceMgr not init", K(ret), K(is_inited_));
@ -115,8 +120,9 @@ int ObLogRestoreSourceMgr::add_location_source(const SCN &recovery_until_scn,
} else if (OB_FAIL(dest.set(archive_dest.ptr()))) {
// use backup dest to manage oss key
LOG_WARN("set backup dest failed", K(ret), K(archive_dest));
} else if (OB_FAIL(dest.get_backup_dest_str(dest_buf, OB_MAX_BACKUP_DEST_LENGTH))) {
LOG_WARN("get backup dest str failed", K(ret), K(dest));
} else if (OB_FAIL(dest.get_backup_dest_str_with_primary_attr(dest_buf, sizeof(dest_buf)))) {
// store primary cluster id and tenant id in log restore source
LOG_WARN("get backup dest str with primary attr failed", K(ret), K(dest));
} else {
ObLogRestoreSourceItem item(tenant_id_,
OB_DEFAULT_LOG_RESTORE_SOURCE_ID,
@ -126,7 +132,7 @@ int ObLogRestoreSourceMgr::add_location_source(const SCN &recovery_until_scn,
if (OB_FAIL(table_operator_.insert_source(item))) {
LOG_WARN("table_operator_ insert_source failed", K(ret), K(item));
} else {
LOG_INFO("add location source succ", K(recovery_until_scn), K(archive_dest));
LOG_INFO("add location source succ", K(recovery_until_scn), K(archive_dest), K(item));
}
}
return ret;
@ -174,10 +180,13 @@ int ObLogRestoreSourceMgr::get_source_for_update(ObLogRestoreSourceItem &item, c
int ObLogRestoreSourceMgr::get_backup_dest(const ObLogRestoreSourceItem &item, ObBackupDest &dest)
{
int ret = OB_SUCCESS;
ObRestoreSourceLocationPrimaryAttr location_attr;
if (OB_UNLIKELY(! item.is_valid() || ! is_location_log_source_type(item.type_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(item));
} else if OB_FAIL(dest.set(item.value_)) {
} else if (OB_FAIL(location_attr.parse_location_attr_from_str(item.value_))) {
LOG_WARN("parse location attr from string failed", K(item));
} else if OB_FAIL(dest.set(location_attr.location_.ptr())) {
LOG_WARN("backup dest set failed", K(ret), K(item));
}
return ret;

View File

@ -10,7 +10,7 @@
* See the Mulan PubL v2 for more details.
*/
#include "src/share/backup/ob_backup_struct.h"
#include "src/share/backup/ob_log_restore_struct.h"
#include <gtest/gtest.h>
namespace oceanbase