fix standby tenant get rootkey from primary tenant

This commit is contained in:
maosy 2023-08-11 14:18:41 +00:00 committed by ob-robot
parent ad113017d4
commit c9a7c0d8f2
10 changed files with 157 additions and 48 deletions

View File

@ -141,6 +141,7 @@ constexpr int OB_FROZEN_INFO_ALREADY_EXIST = -4744;
constexpr int OB_CREATE_STANDBY_TENANT_FAILED = -4765;
constexpr int OB_LS_WAITING_SAFE_DESTROY = -4766;
constexpr int OB_LS_LOCK_CONFLICT = -4768;
constexpr int OB_INVALID_ROOT_KEY = -4769;
constexpr int OB_ERR_PARSER_SYNTAX = -5006;
constexpr int OB_ERR_COLUMN_NOT_FOUND = -5031;
constexpr int OB_ERR_SYS_VARIABLE_UNKNOWN = -5044;

View File

@ -108,6 +108,7 @@
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/tablelock/ob_lock_inner_connection_util.h"
namespace oceanbase
{
using namespace common;

View File

@ -120,6 +120,7 @@ class ObDDLService
{
public:
typedef std::pair<share::ObLSID, common::ObTabletID> LSTabletID;
typedef ObFixedLengthString<OB_ROOT_KEY_LEN + 1> RootKeyValue;
public:
friend class ObTableGroupHelp;
friend class ObStandbyClusterSchemaProcessor;

View File

@ -223,35 +223,13 @@ int ObLogRestoreSourceServiceConfigParser::check_before_update_inner_config(
ObCompatibilityMode &compat_mode)
{
int ret = OB_SUCCESS;
char passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 }; //unencrypted password
ObSqlString user_and_tenant;
compat_mode = ObCompatibilityMode::OCEANBASE_MODE;
bool source_is_self = false;
SMART_VAR(ObLogRestoreProxyUtil, proxy) {
if (is_empty_) {
} else if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parser", KPC(this));
} else if (0 == STRLEN(service_attr_.encrypt_passwd_)
|| 0 == STRLEN(service_attr_.user_.user_name_)
|| 0 == STRLEN(service_attr_.user_.tenant_name_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to parse log restore source config, please check the config parameters");
LOG_USER_ERROR(OB_INVALID_ARGUMENT, "parse log restore source config, please check the config parameters");
} else if (OB_FAIL(service_attr_.get_password(passwd, sizeof(passwd)))) {
LOG_WARN("get servcie attr password failed");
} else if (OB_FAIL(service_attr_.get_user_str_(user_and_tenant))) {
LOG_WARN("get user str failed", K(service_attr_.user_.user_name_), K(service_attr_.user_.tenant_name_));
} else if (OB_FAIL(proxy.try_init(tenant_id_/*standby*/, service_attr_.addr_, user_and_tenant.ptr(), passwd))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("proxy connect to primary db failed", K(service_attr_.addr_), K(user_and_tenant));
} else if (OB_FAIL(proxy.get_tenant_id(service_attr_.user_.tenant_name_, service_attr_.user_.tenant_id_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get primary tenant id failed", K(tenant_id_), K(service_attr_.user_));
} else if (OB_FAIL(proxy.get_cluster_id(service_attr_.user_.tenant_id_, service_attr_.user_.cluster_id_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get primary cluster id failed", K(tenant_id_), K(service_attr_.user_.tenant_id_));
} else if (OB_FAIL(construct_restore_sql_proxy_(proxy))) {
LOG_WARN("failed to construct restore sql proxy", KR(ret));
} else if (!for_verify && OB_FAIL(service_attr_.check_restore_source_is_self_(source_is_self, tenant_id_))) {
LOG_WARN("check restore source is self failed");
} else if (source_is_self) {
@ -271,6 +249,48 @@ int ObLogRestoreSourceServiceConfigParser::check_before_update_inner_config(
return ret;
}
int ObLogRestoreSourceServiceConfigParser::construct_restore_sql_proxy_(ObLogRestoreProxyUtil &log_restore_proxy)
{
int ret = OB_SUCCESS;
char passwd[OB_MAX_PASSWORD_LENGTH + 1] = { 0 }; //unencrypted password
ObSqlString user_and_tenant;
if (is_empty_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret));
} else if (!type_.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid parser", KPC(this));
} else if (0 == STRLEN(service_attr_.encrypt_passwd_) ||
0 == STRLEN(service_attr_.user_.user_name_) ||
0 == STRLEN(service_attr_.user_.tenant_name_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("fail to parse log restore source config, please check the config parameters");
LOG_USER_ERROR(OB_INVALID_ARGUMENT,
"parse log restore source config, please check the config parameters");
} else if (OB_FAIL(service_attr_.get_password(passwd, sizeof(passwd)))) {
LOG_WARN("get servcie attr password failed");
} else if (OB_FAIL(service_attr_.get_user_str_(user_and_tenant))) {
LOG_WARN("get user str failed", K(service_attr_.user_.user_name_),
K(service_attr_.user_.tenant_name_));
} else if (OB_FAIL(log_restore_proxy.try_init(tenant_id_ /*standby*/, service_attr_.addr_,
user_and_tenant.ptr(), passwd))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("proxy connect to primary db failed", K(service_attr_.addr_),
K(user_and_tenant));
} else if (OB_FAIL(log_restore_proxy.get_tenant_id(service_attr_.user_.tenant_name_,
service_attr_.user_.tenant_id_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get primary tenant id failed", K(tenant_id_),
K(service_attr_.user_));
} else if (OB_FAIL(log_restore_proxy.get_cluster_id(service_attr_.user_.tenant_id_,
service_attr_.user_.cluster_id_))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get primary cluster id failed", K(tenant_id_),
K(service_attr_.user_.tenant_id_));
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::get_compatibility_mode(common::ObCompatibilityMode &compatibility_mode)
{
int ret = OB_SUCCESS;
@ -288,6 +308,36 @@ int ObLogRestoreSourceServiceConfigParser::get_compatibility_mode(common::ObComp
return ret;
}
int ObLogRestoreSourceServiceConfigParser::
get_primary_server_addr(const common::ObSqlString &value,
uint64_t &primary_tenant_id,
uint64_t &primary_cluster_id,
ObIArray<common::ObAddr> &addr_list) {
int ret = OB_SUCCESS;
if (OB_FAIL(value.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(value));
} else if (OB_FAIL(parse_from(value))) {
LOG_WARN("failed to parse from value", KR(ret), K(value));
} else {
SMART_VAR(ObLogRestoreProxyUtil, proxy) {
if (OB_FAIL(construct_restore_sql_proxy_(proxy))) {
LOG_WARN("failed to construct restore sql proxy", KR(ret));
} else if (OB_FAIL(proxy.get_server_addr(service_attr_.user_.tenant_id_, addr_list))) {
LOG_WARN("failed to get server addr", KR(ret), K(tenant_id_), K(addr_list), K(service_attr_));
} else if (OB_UNLIKELY(0 >= addr_list.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("addr list is empty", K(primary_tenant_id), K(value));
} else {
primary_tenant_id = service_attr_.user_.tenant_id_;
primary_cluster_id = service_attr_.user_.cluster_id_;
}
LOG_INFO("get primary server info", K(primary_tenant_id), K(primary_cluster_id), K(addr_list));
}
}
return ret;
}
int ObLogRestoreSourceServiceConfigParser::do_parse_sub_config_(const common::ObString &config_str)
{
int ret = OB_SUCCESS;

View File

@ -27,6 +27,7 @@ namespace share
class ObLogArchiveDestConfigParser;
class ObBackupConfigType;
class ObIBackupConfigItemParser;
class ObLogRestoreProxyUtil;
class ObLogRestoreSourceLocationConfigParser : public ObLogArchiveDestConfigParser
{
@ -57,6 +58,9 @@ public:
const bool for_verify,
ObCompatibilityMode &compat_mode);
virtual int get_compatibility_mode(common::ObCompatibilityMode &compatibility_mode);
int get_primary_server_addr(const common::ObSqlString &value,
uint64_t &primary_tenant_id, uint64_t &primary_cluster_id,
ObIArray<common::ObAddr> &addr_list);
private:
int do_parse_sub_config_(const common::ObString &config_str);
int do_parse_restore_service_host_(const common::ObString &name, const common::ObString &value);
@ -64,6 +68,7 @@ private:
int do_parse_restore_service_passwd_(const common::ObString &name, const common::ObString &value);
int check_doing_service_restore_(common::ObISQLClient &trans, bool &is_doing);
int update_data_backup_dest_config_(common::ObISQLClient &trans);
int construct_restore_sql_proxy_(ObLogRestoreProxyUtil &log_restore_proxy);
private:
ObRestoreSourceServiceAttr service_attr_;
bool is_empty_;
@ -73,4 +78,4 @@ private:
}//share
}//oceanbase
#endif /* OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_CONFIG_H_ */
#endif /* OCEANBASE_SHARE_BACKUP_OB_LOG_RESTORE_CONFIG_H_ */

File diff suppressed because one or more lines are too long

View File

@ -659,6 +659,7 @@ DEFINE_ERROR_EXT_DEP(OB_CREATE_STANDBY_TENANT_FAILED, -4765, -1, "HY000", "creat
DEFINE_ERROR_EXT_DEP(OB_LS_WAITING_SAFE_DESTROY, -4766, -1, "HY000", "ls waiting safe destory", "ls waiting safe destory, %s");
DEFINE_ERROR(OB_LS_NOT_LEADER, -4767, -1, "HY000", "log stream is not leader log stream");
DEFINE_ERROR_EXT_DEP(OB_LS_LOCK_CONFLICT, -4768, -1, "HY000", "ls lock conflict", "ls lock conflict, %s");
DEFINE_ERROR_EXT_DEP(OB_INVALID_ROOT_KEY, -4769, -1, "HY000", "invalid root key", "%s");
////////////////////////////////////////////////////////////////
// SQL & Schema specific error code, -5000 ~ -6000

View File

@ -2336,6 +2336,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_LS_WAITING_SAFE_DESTROY__USER_ERROR_MSG "ls waiting safe destory, %s"
#define OB_LS_NOT_LEADER__USER_ERROR_MSG "log stream is not leader log stream"
#define OB_LS_LOCK_CONFLICT__USER_ERROR_MSG "ls lock conflict, %s"
#define OB_INVALID_ROOT_KEY__USER_ERROR_MSG "%s"
#define OB_ERR_PARSER_INIT__USER_ERROR_MSG "Failed to init SQL parser"
#define OB_ERR_PARSE_SQL__USER_ERROR_MSG "%s near \'%.*s\' at line %d"
#define OB_ERR_RESOLVE_SQL__USER_ERROR_MSG "Resolve error"
@ -4450,6 +4451,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_LS_WAITING_SAFE_DESTROY__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4766, ls waiting safe destory, %s"
#define OB_LS_NOT_LEADER__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4767, log stream is not leader log stream"
#define OB_LS_LOCK_CONFLICT__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4768, ls lock conflict, %s"
#define OB_INVALID_ROOT_KEY__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4769, %s"
#define OB_ERR_PARSER_INIT__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5000, Failed to init SQL parser"
#define OB_ERR_PARSE_SQL__ORA_USER_ERROR_MSG "ORA-00900: %s near \'%.*s\' at line %d"
#define OB_ERR_RESOLVE_SQL__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -5002, Resolve error"
@ -6003,7 +6005,7 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219;
#define OB_ERR_DATA_TOO_LONG_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-12899: value too large for column %.*s (actual: %ld, maximum: %ld)"
#define OB_ERR_INVALID_DATE_MSG_FMT_V2__ORA_USER_ERROR_MSG "ORA-01861: Incorrect datetime value for column '%.*s' at row %ld"
extern int g_all_ob_errnos[2110];
extern int g_all_ob_errnos[2111];
const char *ob_error_name(const int oberr);
const char* ob_error_cause(const int oberr);

View File

@ -485,11 +485,11 @@ int ObLogRestoreProxyUtil::check_begin_lsn(const uint64_t tenant_id)
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
ObSqlString sql;
if (OB_FAIL(sql.assign_fmt("SELECT COUNT(*) AS CNT FROM %s OB_LS LEFT JOIN"
"(SELECT TENANT_ID, LS_ID, BEGIN_LSN FROM %s WHERE ROLE= 'LEADER') LOG_STAT "
"(SELECT TENANT_ID, LS_ID, BEGIN_LSN FROM %s WHERE ROLE= 'LEADER' AND TENANT_ID = %lu) LOG_STAT "
"ON OB_LS.LS_ID = LOG_STAT.LS_ID "
"WHERE LOG_STAT.TENANT_ID = %lu AND (BEGIN_LSN IS NULL OR BEGIN_LSN != 0)"
"AND OB_LS.STATUS NOT IN ('TENANT_DROPPING', 'CREATE_ABORT', 'PRE_TENANT_DROPPING')",
OB_DBA_OB_LS_TNAME, OB_GV_OB_LOG_STAT_ORA_TNAME, tenant_id))) {
"WHERE (BEGIN_LSN IS NULL OR BEGIN_LSN != 0)"
"AND OB_LS.STATUS NOT IN ('TENANT_DROPPING', 'CREATE_ABORT', 'PRE_TENANT_DROPPING')",
OB_DBA_OB_LS_ORA_TNAME, OB_GV_OB_LOG_STAT_ORA_TNAME, tenant_id))) {
LOG_WARN("fail to generate sql", KR(ret), K(tenant_id));
} else if (OB_FAIL(sql_proxy_.read(result, sql.ptr()))) {
LOG_WARN("check_begin_lsn failed", KR(ret), K(tenant_id), K(sql));
@ -510,6 +510,7 @@ int ObLogRestoreProxyUtil::check_begin_lsn(const uint64_t tenant_id)
LOG_WARN("primary tenant LS log may be recycled, create standby tenant is not allow", KR(ret), K(tenant_id), K(sql));
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "primary tenant LS log may be recycled, create standby tenant is");
}
LOG_INFO("check begion lsn", K(cnt), K(sql));
}
}
)
@ -520,18 +521,47 @@ int ObLogRestoreProxyUtil::check_begin_lsn(const uint64_t tenant_id)
int ObLogRestoreProxyUtil::get_server_ip_list(const uint64_t tenant_id, common::ObArray<common::ObAddr> &addrs)
{
int ret = OB_SUCCESS;
RESTORE_RETRY(
SMART_VAR(ObMySQLProxy::MySQLResult ,result) {
ObSqlString sql;
ObSqlString sql;
if (OB_FAIL(
sql.assign_fmt("SELECT SVR_IP, SQL_PORT AS SVR_PORT FROM %s WHERE TENANT_ID=%ld",
OB_DBA_OB_ACCESS_POINT_TNAME, tenant_id))) {
LOG_WARN("fail to generate sql");
} else if (OB_FAIL(construct_server_ip_list(sql, addrs))) {
LOG_WARN("failed to get server ip list", KR(ret), K(sql));
}
return ret;
}
int ObLogRestoreProxyUtil::get_server_addr(const uint64_t tenant_id, common::ObIArray<common::ObAddr> &addrs)
{
int ret = OB_SUCCESS;
ObSqlString sql;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("tenant id is invalid", KR(ret), K(tenant_id));
} else if (OB_FAIL(sql.assign_fmt("SELECT distinct(SVR_IP), SVR_PORT FROM %s WHERE TENANT_ID=%ld",
OB_GV_OB_LOG_STAT_TNAME, tenant_id))) {
LOG_WARN("fail to generate sql");
} else if (OB_FAIL(construct_server_ip_list(sql, addrs))) {
LOG_WARN("failed to get server ip list", KR(ret), K(sql));
}
return ret;
}
int ObLogRestoreProxyUtil::construct_server_ip_list(const common::ObSqlString &sql, common::ObIArray<common::ObAddr> &addrs)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(sql.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("sql is empty", KR(ret), K(sql));
} else {
RESTORE_RETRY(SMART_VAR(ObMySQLProxy::MySQLResult, result) {
ObMySQLResult *res = NULL;
if (OB_FAIL(sql.assign_fmt("SELECT SVR_IP, SQL_PORT FROM %s WHERE TENANT_ID=%ld",
OB_DBA_OB_ACCESS_POINT_TNAME, tenant_id))) {
LOG_WARN("fail to generate sql");
} else if (OB_FAIL(sql_proxy_.read(result, OB_INVALID_TENANT_ID, sql.ptr()))) {
LOG_WARN("read value from DBA_OB_ACCESS_POINT failed", K(tenant_id), K(sql));
if (OB_FAIL(sql_proxy_.read(result, OB_INVALID_TENANT_ID, sql.ptr()))) {
LOG_WARN("read value from DBA_OB_ACCESS_POINT failed", K(tenant_id_), K(sql));
} else if (OB_ISNULL(res = result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("query result is null", K(tenant_id), K(sql));
LOG_WARN("query result is null", K(tenant_id_), K(sql));
} else {
while (OB_SUCC(ret) && OB_SUCC(res->next())) {
ObString tmp_ip;
@ -539,31 +569,31 @@ int ObLogRestoreProxyUtil::get_server_ip_list(const uint64_t tenant_id, common::
ObAddr addr;
EXTRACT_VARCHAR_FIELD_MYSQL(*res, "SVR_IP", tmp_ip);
EXTRACT_INT_FIELD_MYSQL(*res, "SQL_PORT", tmp_port, int32_t);
EXTRACT_INT_FIELD_MYSQL(*res, "SVR_PORT", tmp_port, int32_t);
if (OB_FAIL(ret)) {
LOG_WARN("fail to get server ip and sql port", K(tmp_ip), K(tmp_port), K(tenant_id), K(sql));
LOG_WARN("fail to get server ip and sql port", K(tmp_ip), K(tmp_port), K(tenant_id_), K(sql));
} else if (!addr.set_ip_addr(tmp_ip, tmp_port)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to set addr", K(tmp_ip), K(tmp_port), K(tenant_id), K(sql));
LOG_WARN("fail to set addr", K(tmp_ip), K(tmp_port), K(tenant_id_), K(sql));
} else if (!addr.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("addr is invalid", K(addr), K(tenant_id), K(sql));
LOG_WARN("addr is invalid", K(addr), K(tenant_id_), K(sql));
} else if (OB_FAIL(addrs.push_back(addr))) {
LOG_WARN("fail to push back addr to addrs", K(addr), K(tenant_id), K(sql));
LOG_WARN("fail to push back addr to addrs", K(addr), K(tenant_id_), K(sql));
}
}
if (OB_ITER_END != ret) {
SERVER_LOG(WARN, "failed to get ip list", K(tenant_id));
SERVER_LOG(WARN, "failed to get ip list", K(tenant_id_));
} else {
ret = OB_SUCCESS;
}
}
}
)
)
}
return ret;
}
bool ObLogRestoreProxyUtil::is_user_changed_(const char *user_name, const char *user_password, const char *db_name)
{
return user_name_ != common::ObFixedLengthString<common::OB_MAX_USER_NAME_BUF_LENGTH>(user_name)

View File

@ -24,6 +24,7 @@
#include "lib/mysqlclient/ob_mysql_connection_pool.h"
#include "share/ob_ls_id.h"
#include "share/ob_tenant_role.h"
#include "share/ob_root_addr_agent.h"//ObRootAddr
#include "share/schema/ob_schema_struct.h"
#include "logservice/palf/palf_options.h"
#include <cstdint>
@ -139,6 +140,9 @@ public:
int get_compatibility_mode(const uint64_t tenant_id, ObCompatibilityMode &compat_mode);
// get log restore source tenant access point
int get_server_ip_list(const uint64_t tenant_id, common::ObArray<common::ObAddr> &addrs);
//get tenant server ip and prot
//param[in] tenant_id : primary tenant_id
int get_server_addr(const uint64_t tenant_id, common::ObIArray<common::ObAddr> &addrs);
int check_begin_lsn(const uint64_t tenant_id);
// get log restore source tenant info, includes tenant role and tennat status
int get_tenant_info(ObTenantRole &role, schema::ObTenantStatus &status);
@ -150,6 +154,7 @@ private:
bool is_user_changed_(const char *user_name, const char *user_password, const char *db_name);
void destroy_tg_();
private:
int construct_server_ip_list(const common::ObSqlString &sql, common::ObIArray<common::ObAddr> &addrs);
bool inited_;
uint64_t tenant_id_;
int tg_id_;