[UPGRADE] Support upgrading cluster with standby tenants

This commit is contained in:
tino247 2023-01-06 02:38:11 +00:00 committed by ob-robot
parent 63178d179b
commit 3d502c830a
12 changed files with 223 additions and 82 deletions

View File

@ -37,6 +37,7 @@
#include "share/schema/ob_schema_mgr.h"
#include "share/ob_schema_status_proxy.h"//ObSchemaStatusProxy
#include "share/ob_global_stat_proxy.h"//ObGlobalStatProxy
#include "share/ob_tenant_info_proxy.h" // ObAllTenantInfoProxy
namespace oceanbase
{
@ -814,6 +815,38 @@ int ObRootInspection::inspect(bool &passed, const char* &warning_info)
return ret;
}
// standby tenant may stay at lower data version,
// root_inspection won't check standby tenant's schema.
int ObRootInspection::construct_tenant_ids_(
common::ObIArray<uint64_t> &tenant_ids)
{
int ret = OB_SUCCESS;
tenant_ids.reset();
ObArray<uint64_t> standby_tenants;
ObArray<uint64_t> tmp_tenants;
if (!inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
} else if (OB_FAIL(check_cancel())) {
LOG_WARN("check_cancel failed", KR(ret));
} else if (OB_FAIL(ObAllTenantInfoProxy::get_standby_tenants(sql_proxy_, standby_tenants))) {
LOG_WARN("fail to get standby tenants", KR(ret));
} else if (OB_FAIL(ObTenantUtils::get_tenant_ids(schema_service_, tmp_tenants))) {
LOG_WARN("get_tenant_ids failed", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < tmp_tenants.count(); i++) {
const uint64_t tenant_id = tmp_tenants.at(i);
if (has_exist_in_array(standby_tenants, tenant_id)) {
// skip
} else if (OB_FAIL(tenant_ids.push_back(tenant_id))) {
LOG_WARN("fail to push back tenant_id", KR(ret), K(tenant_id));
}
} // end for
}
return ret;
}
int ObRootInspection::check_zone()
{
int ret = OB_SUCCESS;
@ -868,7 +901,7 @@ int ObRootInspection::check_sys_stat_()
LOG_WARN("schema_service is null", KR(ret));
} else if (OB_FAIL(check_cancel())) {
LOG_WARN("check_cancel failed", KR(ret));
} else if (OB_FAIL(ObTenantUtils::get_tenant_ids(schema_service_, tenant_ids))) {
} else if (OB_FAIL(construct_tenant_ids_(tenant_ids))) {
LOG_WARN("get_tenant_ids failed", KR(ret));
} else {
int backup_ret = OB_SUCCESS;
@ -930,7 +963,7 @@ int ObRootInspection::check_sys_param_()
LOG_WARN("schema_service is null", KR(ret));
} else if (OB_FAIL(check_cancel())) {
LOG_WARN("check_cancel failed", KR(ret));
} else if (OB_FAIL(ObTenantUtils::get_tenant_ids(schema_service_, tenant_ids))) {
} else if (OB_FAIL(construct_tenant_ids_(tenant_ids))) {
LOG_WARN("get_tenant_ids failed", KR(ret));
} else {
int backup_ret = OB_SUCCESS;
@ -1202,7 +1235,7 @@ int ObRootInspection::check_sys_table_schemas_()
} else if (OB_ISNULL(schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema_service is null", KR(ret));
} else if (OB_FAIL(ObTenantUtils::get_tenant_ids(schema_service_, tenant_ids))) {
} else if (OB_FAIL(construct_tenant_ids_(tenant_ids))) {
LOG_WARN("get_tenant_ids failed", KR(ret));
} else {
int backup_ret = OB_SUCCESS;
@ -1818,7 +1851,7 @@ int ObRootInspection::check_data_version_()
} else if (OB_ISNULL(schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema_service is null", KR(ret));
} else if (OB_FAIL(ObTenantUtils::get_tenant_ids(schema_service_, tenant_ids))) {
} else if (OB_FAIL(construct_tenant_ids_(tenant_ids))) {
LOG_WARN("get_tenant_ids failed", KR(ret));
} else {
int backup_ret = OB_SUCCESS;

View File

@ -187,6 +187,7 @@ public:
private:
static const int64_t NAME_BUF_LEN = 64;
typedef common::ObFixedLengthString<NAME_BUF_LEN> Name;
int construct_tenant_ids_(common::ObIArray<uint64_t> &tenant_ids);
int check_zone();
int check_sys_stat_();
int check_sys_stat_(const uint64_t tenant_id);

View File

@ -17,6 +17,7 @@
#include "share/ob_global_stat_proxy.h"
#include "share/ob_cluster_event_history_table_operator.h"//CLUSTER_EVENT_INSTANCE
#include "share/ob_primary_standby_service.h" // ObPrimaryStandbyService
#include "share/ob_tenant_info_proxy.h" //ObAllTenantInfoProxy
namespace oceanbase
{
@ -1147,15 +1148,36 @@ int ObUpgradeExecutor::construct_tenant_ids_(
common::ObIArray<uint64_t> &dst_tenant_ids)
{
int ret = OB_SUCCESS;
ObArray<uint64_t> standby_tenants;
if (OB_FAIL(check_inner_stat_())) {
LOG_WARN("fail to check inner stat", KR(ret));
} else if (OB_FAIL(ObAllTenantInfoProxy::get_standby_tenants(sql_proxy_, standby_tenants))) {
LOG_WARN("fail to get standby tenants", KR(ret));
} else if (src_tenant_ids.count() > 0) {
for (int64_t i = 0; OB_SUCC(ret) && i < src_tenant_ids.count(); i++) {
const uint64_t tenant_id = src_tenant_ids.at(i);
if (has_exist_in_array(standby_tenants, tenant_id)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support to upgrade a standby tenant", KR(ret), K(tenant_id));
}
} // end for
// tenant_list is specified
if (OB_FAIL(dst_tenant_ids.assign(src_tenant_ids))) {
if (FAILEDx(dst_tenant_ids.assign(src_tenant_ids))) {
LOG_WARN("fail to assign tenant_ids", KR(ret));
}
} else if (OB_FAIL(schema_service_->get_tenant_ids(dst_tenant_ids))) {
LOG_WARN("fail to get tenant_ids", KR(ret));
} else {
ObArray<uint64_t> tenant_ids;
if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) {
LOG_WARN("fail to get tenant_ids", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); i++) {
const uint64_t tenant_id = tenant_ids.at(i);
if (has_exist_in_array(standby_tenants, tenant_id)) {
// skip
} else if (OB_FAIL(dst_tenant_ids.push_back(tenant_id))) {
LOG_WARN("fail to push back tenant_id", KR(ret), K(tenant_id));
}
} // end for
}
return ret;
}

View File

@ -185,6 +185,72 @@ int ObAllTenantInfoProxy::init_tenant_info(
return ret;
}
// won't return sys/meta tenant
int ObAllTenantInfoProxy::load_all_tenant_infos(
ObISQLClient *proxy,
common::ObIArray<ObAllTenantInfo> &tenant_infos)
{
int ret = OB_SUCCESS;
tenant_infos.reset();
if (OB_ISNULL(proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("proxy is null", KR(ret), KP(proxy));
} else {
ObSqlString sql;
const uint64_t exec_tenant_id = OB_SYS_TENANT_ID;
if (OB_FAIL(sql.assign_fmt("select * from %s", OB_ALL_VIRTUAL_TENANT_INFO_TNAME))) {
LOG_WARN("failed to assign sql", KR(ret), K(sql));
} else {
HEAP_VAR(ObMySQLProxy::MySQLResult, res) {
common::sqlclient::ObMySQLResult *result = NULL;
if (OB_FAIL(proxy->read(res, exec_tenant_id, sql.ptr()))) {
LOG_WARN("failed to read", KR(ret), K(exec_tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get sql result", KR(ret));
} else {
ObAllTenantInfo tenant_info;
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
if (OB_FAIL(fill_cell(result, tenant_info))) {
LOG_WARN("failed to fill cell", KR(ret), K(sql));
} else if (OB_FAIL(tenant_infos.push_back(tenant_info))) {
LOG_WARN("fail to push back tenant info", KR(ret), K(tenant_info));
}
} // end while
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
} else {
ret = OB_SUCC(ret) ? OB_ERR_UNEXPECTED : ret;
LOG_WARN("fail to iterate tenant info", KR(ret));
}
}
}
}
}
return ret;
}
int ObAllTenantInfoProxy::get_standby_tenants(
ObISQLClient *proxy,
common::ObIArray<uint64_t> &tenant_ids)
{
int ret = OB_SUCCESS;
tenant_ids.reset();
ObArray<ObAllTenantInfo> tenants;
if (OB_FAIL(load_all_tenant_infos(proxy, tenants))){
LOG_WARN("fail to load all tenant infos", KR(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < tenants.count(); i++) {
const ObAllTenantInfo &tenant_info = tenants.at(i);
if (tenant_info.is_standby()
&& OB_FAIL(tenant_ids.push_back(tenant_info.get_tenant_id()))) {
LOG_WARN("fail to push back tenant_id", KR(ret), K(tenant_info));
}
} // end for
}
return ret;
}
int ObAllTenantInfoProxy::load_tenant_info(const uint64_t tenant_id,
ObISQLClient *proxy,
const bool for_update,

View File

@ -115,6 +115,22 @@ public:
* @param[in] proxy
*/
static int init_tenant_info(const ObAllTenantInfo &tenant_info, ObISQLClient *proxy);
/**
* @description: get all normal tenant's tenant_info from inner table
* @param[in] proxy
* @param[out] tenant_infos
*/
static int load_all_tenant_infos(
ObISQLClient *proxy,
common::ObIArray<ObAllTenantInfo> &tenant_infos);
/**
* @description: get all standby tenants from inner table
* @param[in] proxy
* @param[out] tenant_ids
*/
static int get_standby_tenants(
ObISQLClient *proxy,
common::ObIArray<uint64_t> &tenant_ids);
/**
* @description: get target tenant's tenant_info from inner table
* @param[in] tenant_id

View File

@ -61,12 +61,6 @@ def do_upgrade(my_host, my_port, my_user, my_passwd, timeout, my_module_set, upg
try:
query_cur = actions.QueryCursor(cur)
actions.check_server_version_by_cluster(cur)
# 获取租户id列表
tenant_id_list = actions.fetch_tenant_ids(query_cur)
if len(tenant_id_list) <= 0:
logging.error('distinct tenant id count is <= 0, tenant_id_count: %d', len(tenant_id_list))
raise MyError('no tenant id')
logging.info('there has %s distinct tenant ids: [%s]', len(tenant_id_list), ','.join(str(tenant_id) for tenant_id in tenant_id_list))
conn.commit()
if run_modules.MODULE_HEALTH_CHECK in my_module_set:
@ -85,7 +79,7 @@ def do_upgrade(my_host, my_port, my_user, my_passwd, timeout, my_module_set, upg
if run_modules.MODULE_TENANT_UPRADE in my_module_set:
logging.info('================begin to run tenant upgrade action ===============')
conn.autocommit = True
tenant_upgrade_action.do_upgrade(conn, cur, tenant_id_list, timeout, my_user, my_passwd)
tenant_upgrade_action.do_upgrade(conn, cur, timeout, my_user, my_passwd)
conn.autocommit = False
actions.refresh_commit_sql_list()
logging.info('================succeed to run tenant upgrade action ===============')

View File

@ -60,12 +60,6 @@ def do_upgrade(my_host, my_port, my_user, my_passwd, timeout, my_module_set, upg
try:
query_cur = actions.QueryCursor(cur)
actions.check_server_version_by_cluster(cur)
# 获取租户id列表
tenant_id_list = actions.fetch_tenant_ids(query_cur)
if len(tenant_id_list) <= 0:
logging.error('distinct tenant id count is <= 0, tenant_id_count: %d', len(tenant_id_list))
raise MyError('no tenant id')
logging.info('there has %s distinct tenant ids: [%s]', len(tenant_id_list), ','.join(str(tenant_id) for tenant_id in tenant_id_list))
if run_modules.MODULE_BEGIN_UPGRADE in my_module_set:
logging.info('================begin to run begin upgrade action===============')
@ -86,7 +80,7 @@ def do_upgrade(my_host, my_port, my_user, my_passwd, timeout, my_module_set, upg
if run_modules.MODULE_SPECIAL_ACTION in my_module_set:
logging.info('================begin to run special action===============')
conn.autocommit = True
special_upgrade_action_pre.do_special_upgrade(conn, cur, tenant_id_list, timeout, my_user, my_passwd)
special_upgrade_action_pre.do_special_upgrade(conn, cur, timeout, my_user, my_passwd)
conn.autocommit = False
actions.refresh_commit_sql_list()
logging.info('================succeed to run special action===============')

View File

@ -17,7 +17,7 @@ import actions
import sys
# 主库需要执行的升级动作
def do_special_upgrade(conn, cur, tenant_id_list, timeout, user, passwd):
def do_special_upgrade(conn, cur, timeout, user, passwd):
# special upgrade action
#升级语句对应的action要写在下面的actions begin和actions end这两行之间,
#因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end

View File

@ -10,7 +10,7 @@ import mysql.connector
from mysql.connector import errorcode
import actions
def do_upgrade(conn, cur, tenant_id_list, timeout, user, pwd):
def do_upgrade(conn, cur, timeout, user, pwd):
# upgrade action
#升级语句对应的action要写在下面的actions begin和actions end这两行之间,
#因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
@ -150,16 +150,16 @@ def check_can_run_upgrade_job(cur, job_name):
def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id):
try:
times = (timeout if timeout > 0 else 1800) / 10
times = (timeout if timeout > 0 else 3600) / 10
while (times >= 0):
sql = """select job_status, rs_svr_ip, rs_svr_port from oceanbase.__all_rootservice_job
sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job
where job_type = '{0}' and job_id > {1} order by job_id desc limit 1
""".format(job_name, max_used_job_id)
results = query(cur, sql)
if (len(results) == 0):
logging.info("upgrade job not created yet")
elif (len(results) != 1 or len(results[0]) != 3):
elif (len(results) != 1 or len(results[0]) != 4):
logging.warn("row cnt not match")
raise e
elif ("INPROGRESS" == results[0][0]):
@ -168,13 +168,23 @@ def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id):
if times % 10 == 0:
ip = results[0][1]
port = results[0][2]
gmt_create = results[0][3]
sql = """select count(*) from oceanbase.__all_virtual_core_meta_table where role = 1 and svr_ip = '{0}' and svr_port = {1}""".format(ip, port)
results = query(cur, sql)
if (len(results) != 1 or len(results[0]) != 1):
logging.warn("row/column cnt not match")
raise e
elif results[0][0] == 1:
logging.info("rs[{0}:{1}] still exist, keep waiting".format(ip, port))
sql = """select count(*) from oceanbase.__all_rootservice_event_history where gmt_create > '{0}' and event = 'full_rootservice'""".format(gmt_create)
results = query(cur, sql)
if (len(results) != 1 or len(results[0]) != 1):
logging.warn("row/column cnt not match")
raise e
elif results[0][0] > 0:
logging.warn("rs changed, should check if upgrade job is still running")
raise e
else:
logging.info("rs[{0}:{1}] still exist, keep waiting".format(ip, port))
else:
logging.warn("rs changed or not exist, should check if upgrade job is still running")
raise e

View File

@ -536,12 +536,6 @@
# try:
# query_cur = actions.QueryCursor(cur)
# actions.check_server_version_by_cluster(cur)
# # 获取租户id列表
# tenant_id_list = actions.fetch_tenant_ids(query_cur)
# if len(tenant_id_list) <= 0:
# logging.error('distinct tenant id count is <= 0, tenant_id_count: %d', len(tenant_id_list))
# raise MyError('no tenant id')
# logging.info('there has %s distinct tenant ids: [%s]', len(tenant_id_list), ','.join(str(tenant_id) for tenant_id in tenant_id_list))
# conn.commit()
#
# if run_modules.MODULE_HEALTH_CHECK in my_module_set:
@ -560,7 +554,7 @@
# if run_modules.MODULE_TENANT_UPRADE in my_module_set:
# logging.info('================begin to run tenant upgrade action ===============')
# conn.autocommit = True
# tenant_upgrade_action.do_upgrade(conn, cur, tenant_id_list, timeout, my_user, my_passwd)
# tenant_upgrade_action.do_upgrade(conn, cur, timeout, my_user, my_passwd)
# conn.autocommit = False
# actions.refresh_commit_sql_list()
# logging.info('================succeed to run tenant upgrade action ===============')
@ -706,12 +700,6 @@
# try:
# query_cur = actions.QueryCursor(cur)
# actions.check_server_version_by_cluster(cur)
# # 获取租户id列表
# tenant_id_list = actions.fetch_tenant_ids(query_cur)
# if len(tenant_id_list) <= 0:
# logging.error('distinct tenant id count is <= 0, tenant_id_count: %d', len(tenant_id_list))
# raise MyError('no tenant id')
# logging.info('there has %s distinct tenant ids: [%s]', len(tenant_id_list), ','.join(str(tenant_id) for tenant_id in tenant_id_list))
#
# if run_modules.MODULE_BEGIN_UPGRADE in my_module_set:
# logging.info('================begin to run begin upgrade action===============')
@ -732,7 +720,7 @@
# if run_modules.MODULE_SPECIAL_ACTION in my_module_set:
# logging.info('================begin to run special action===============')
# conn.autocommit = True
# special_upgrade_action_pre.do_special_upgrade(conn, cur, tenant_id_list, timeout, my_user, my_passwd)
# special_upgrade_action_pre.do_special_upgrade(conn, cur, timeout, my_user, my_passwd)
# conn.autocommit = False
# actions.refresh_commit_sql_list()
# logging.info('================succeed to run special action===============')
@ -1214,7 +1202,7 @@
#import sys
#
## 主库需要执行的升级动作
#def do_special_upgrade(conn, cur, tenant_id_list, timeout, user, passwd):
#def do_special_upgrade(conn, cur, timeout, user, passwd):
# # special upgrade action
##升级语句对应的action要写在下面的actions begin和actions end这两行之间,
##因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
@ -1258,7 +1246,7 @@
#from mysql.connector import errorcode
#import actions
#
#def do_upgrade(conn, cur, tenant_id_list, timeout, user, pwd):
#def do_upgrade(conn, cur, timeout, user, pwd):
# # upgrade action
##升级语句对应的action要写在下面的actions begin和actions end这两行之间,
##因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
@ -1398,16 +1386,16 @@
#
#def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id):
# try:
# times = (timeout if timeout > 0 else 1800) / 10
# times = (timeout if timeout > 0 else 3600) / 10
# while (times >= 0):
# sql = """select job_status, rs_svr_ip, rs_svr_port from oceanbase.__all_rootservice_job
# sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job
# where job_type = '{0}' and job_id > {1} order by job_id desc limit 1
# """.format(job_name, max_used_job_id)
# results = query(cur, sql)
#
# if (len(results) == 0):
# logging.info("upgrade job not created yet")
# elif (len(results) != 1 or len(results[0]) != 3):
# elif (len(results) != 1 or len(results[0]) != 4):
# logging.warn("row cnt not match")
# raise e
# elif ("INPROGRESS" == results[0][0]):
@ -1416,13 +1404,23 @@
# if times % 10 == 0:
# ip = results[0][1]
# port = results[0][2]
# gmt_create = results[0][3]
# sql = """select count(*) from oceanbase.__all_virtual_core_meta_table where role = 1 and svr_ip = '{0}' and svr_port = {1}""".format(ip, port)
# results = query(cur, sql)
# if (len(results) != 1 or len(results[0]) != 1):
# logging.warn("row/column cnt not match")
# raise e
# elif results[0][0] == 1:
# logging.info("rs[{0}:{1}] still exist, keep waiting".format(ip, port))
# sql = """select count(*) from oceanbase.__all_rootservice_event_history where gmt_create > '{0}' and event = 'full_rootservice'""".format(gmt_create)
# results = query(cur, sql)
# if (len(results) != 1 or len(results[0]) != 1):
# logging.warn("row/column cnt not match")
# raise e
# elif results[0][0] > 0:
# logging.warn("rs changed, should check if upgrade job is still running")
# raise e
# else:
# logging.info("rs[{0}:{1}] still exist, keep waiting".format(ip, port))
# else:
# logging.warn("rs changed or not exist, should check if upgrade job is still running")
# raise e
@ -2420,16 +2418,19 @@
# current_data_version = actions.get_current_data_version()
# actions.wait_parameter_sync(cur, "compatible", current_data_version, 10)
#
# # check target_data_version/current_data_version
# sql = "select count(*) from oceanbase.__all_tenant"
# # check target_data_version/current_data_version except standby tenant
# sql = "select tenant_id from oceanbase.__all_tenant except select tenant_id from oceanbase.__all_virtual_tenant_info where tenant_role = 'STANDBY'"
# (desc, results) = query_cur.exec_query(sql)
# if len(results) != 1 or len(results[0]) != 1:
# if len(results) == 0:
# logging.warn('result cnt not match')
# raise e
# tenant_count = results[0][0]
# tenant_count = len(results)
# tenant_ids_str = ''
# for index, row in enumerate(results):
# tenant_ids_str += """{0}{1}""".format((',' if index > 0 else ''), row[0])
#
# int_current_data_version = actions.get_version(current_data_version)
# sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(int_current_data_version)
# sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str)
# (desc, results) = query_cur.exec_query(sql)
# if len(results) != 1 or len(results[0]) != 1:
# logging.warn('result cnt not match')

View File

@ -22,16 +22,19 @@ def check_data_version(cur, query_cur, timeout):
current_data_version = actions.get_current_data_version()
actions.wait_parameter_sync(cur, "compatible", current_data_version, 10)
# check target_data_version/current_data_version
sql = "select count(*) from oceanbase.__all_tenant"
# check target_data_version/current_data_version except standby tenant
sql = "select tenant_id from oceanbase.__all_tenant except select tenant_id from oceanbase.__all_virtual_tenant_info where tenant_role = 'STANDBY'"
(desc, results) = query_cur.exec_query(sql)
if len(results) != 1 or len(results[0]) != 1:
if len(results) == 0:
logging.warn('result cnt not match')
raise e
tenant_count = results[0][0]
tenant_count = len(results)
tenant_ids_str = ''
for index, row in enumerate(results):
tenant_ids_str += """{0}{1}""".format((',' if index > 0 else ''), row[0])
int_current_data_version = actions.get_version(current_data_version)
sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(int_current_data_version)
sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str)
(desc, results) = query_cur.exec_query(sql)
if len(results) != 1 or len(results[0]) != 1:
logging.warn('result cnt not match')

View File

@ -536,12 +536,6 @@
# try:
# query_cur = actions.QueryCursor(cur)
# actions.check_server_version_by_cluster(cur)
# # 获取租户id列表
# tenant_id_list = actions.fetch_tenant_ids(query_cur)
# if len(tenant_id_list) <= 0:
# logging.error('distinct tenant id count is <= 0, tenant_id_count: %d', len(tenant_id_list))
# raise MyError('no tenant id')
# logging.info('there has %s distinct tenant ids: [%s]', len(tenant_id_list), ','.join(str(tenant_id) for tenant_id in tenant_id_list))
# conn.commit()
#
# if run_modules.MODULE_HEALTH_CHECK in my_module_set:
@ -560,7 +554,7 @@
# if run_modules.MODULE_TENANT_UPRADE in my_module_set:
# logging.info('================begin to run tenant upgrade action ===============')
# conn.autocommit = True
# tenant_upgrade_action.do_upgrade(conn, cur, tenant_id_list, timeout, my_user, my_passwd)
# tenant_upgrade_action.do_upgrade(conn, cur, timeout, my_user, my_passwd)
# conn.autocommit = False
# actions.refresh_commit_sql_list()
# logging.info('================succeed to run tenant upgrade action ===============')
@ -706,12 +700,6 @@
# try:
# query_cur = actions.QueryCursor(cur)
# actions.check_server_version_by_cluster(cur)
# # 获取租户id列表
# tenant_id_list = actions.fetch_tenant_ids(query_cur)
# if len(tenant_id_list) <= 0:
# logging.error('distinct tenant id count is <= 0, tenant_id_count: %d', len(tenant_id_list))
# raise MyError('no tenant id')
# logging.info('there has %s distinct tenant ids: [%s]', len(tenant_id_list), ','.join(str(tenant_id) for tenant_id in tenant_id_list))
#
# if run_modules.MODULE_BEGIN_UPGRADE in my_module_set:
# logging.info('================begin to run begin upgrade action===============')
@ -732,7 +720,7 @@
# if run_modules.MODULE_SPECIAL_ACTION in my_module_set:
# logging.info('================begin to run special action===============')
# conn.autocommit = True
# special_upgrade_action_pre.do_special_upgrade(conn, cur, tenant_id_list, timeout, my_user, my_passwd)
# special_upgrade_action_pre.do_special_upgrade(conn, cur, timeout, my_user, my_passwd)
# conn.autocommit = False
# actions.refresh_commit_sql_list()
# logging.info('================succeed to run special action===============')
@ -1214,7 +1202,7 @@
#import sys
#
## 主库需要执行的升级动作
#def do_special_upgrade(conn, cur, tenant_id_list, timeout, user, passwd):
#def do_special_upgrade(conn, cur, timeout, user, passwd):
# # special upgrade action
##升级语句对应的action要写在下面的actions begin和actions end这两行之间,
##因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
@ -1258,7 +1246,7 @@
#from mysql.connector import errorcode
#import actions
#
#def do_upgrade(conn, cur, tenant_id_list, timeout, user, pwd):
#def do_upgrade(conn, cur, timeout, user, pwd):
# # upgrade action
##升级语句对应的action要写在下面的actions begin和actions end这两行之间,
##因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
@ -1398,16 +1386,16 @@
#
#def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id):
# try:
# times = (timeout if timeout > 0 else 1800) / 10
# times = (timeout if timeout > 0 else 3600) / 10
# while (times >= 0):
# sql = """select job_status, rs_svr_ip, rs_svr_port from oceanbase.__all_rootservice_job
# sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job
# where job_type = '{0}' and job_id > {1} order by job_id desc limit 1
# """.format(job_name, max_used_job_id)
# results = query(cur, sql)
#
# if (len(results) == 0):
# logging.info("upgrade job not created yet")
# elif (len(results) != 1 or len(results[0]) != 3):
# elif (len(results) != 1 or len(results[0]) != 4):
# logging.warn("row cnt not match")
# raise e
# elif ("INPROGRESS" == results[0][0]):
@ -1416,13 +1404,23 @@
# if times % 10 == 0:
# ip = results[0][1]
# port = results[0][2]
# gmt_create = results[0][3]
# sql = """select count(*) from oceanbase.__all_virtual_core_meta_table where role = 1 and svr_ip = '{0}' and svr_port = {1}""".format(ip, port)
# results = query(cur, sql)
# if (len(results) != 1 or len(results[0]) != 1):
# logging.warn("row/column cnt not match")
# raise e
# elif results[0][0] == 1:
# logging.info("rs[{0}:{1}] still exist, keep waiting".format(ip, port))
# sql = """select count(*) from oceanbase.__all_rootservice_event_history where gmt_create > '{0}' and event = 'full_rootservice'""".format(gmt_create)
# results = query(cur, sql)
# if (len(results) != 1 or len(results[0]) != 1):
# logging.warn("row/column cnt not match")
# raise e
# elif results[0][0] > 0:
# logging.warn("rs changed, should check if upgrade job is still running")
# raise e
# else:
# logging.info("rs[{0}:{1}] still exist, keep waiting".format(ip, port))
# else:
# logging.warn("rs changed or not exist, should check if upgrade job is still running")
# raise e
@ -2420,16 +2418,19 @@
# current_data_version = actions.get_current_data_version()
# actions.wait_parameter_sync(cur, "compatible", current_data_version, 10)
#
# # check target_data_version/current_data_version
# sql = "select count(*) from oceanbase.__all_tenant"
# # check target_data_version/current_data_version except standby tenant
# sql = "select tenant_id from oceanbase.__all_tenant except select tenant_id from oceanbase.__all_virtual_tenant_info where tenant_role = 'STANDBY'"
# (desc, results) = query_cur.exec_query(sql)
# if len(results) != 1 or len(results[0]) != 1:
# if len(results) == 0:
# logging.warn('result cnt not match')
# raise e
# tenant_count = results[0][0]
# tenant_count = len(results)
# tenant_ids_str = ''
# for index, row in enumerate(results):
# tenant_ids_str += """{0}{1}""".format((',' if index > 0 else ''), row[0])
#
# int_current_data_version = actions.get_version(current_data_version)
# sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(int_current_data_version)
# sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str)
# (desc, results) = query_cur.exec_query(sql)
# if len(results) != 1 or len(results[0]) != 1:
# logging.warn('result cnt not match')