diff --git a/tools/upgrade/upgrade_checker.py b/tools/upgrade/upgrade_checker.py index 83394020a..76207a22b 100755 --- a/tools/upgrade/upgrade_checker.py +++ b/tools/upgrade/upgrade_checker.py @@ -285,16 +285,16 @@ def config_logging_module(log_filenamme): #### ---------------end---------------------- - +fail_list=[] #### START #### # 1. 检查前置版本 def check_observer_version(query_cur, upgrade_params): (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERS where name='min_observer_version'""") if len(results) != 1: - raise MyError('query results count is not 1') + fail_list.append('query results count is not 1') elif cmp(results[0][0], upgrade_params.old_version) < 0 : - raise MyError('old observer version is expected equal or higher then: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0])) + fail_list.append('old observer version is expected equal or higher then: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0])) logging.info('check observer version success, version = {0}'.format(results[0][0])) # 2. 检查paxos副本是否同步, paxos副本是否缺失 @@ -302,7 +302,7 @@ def check_paxos_replica(query_cur): # 2.1 检查paxos副本是否同步 (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from GV$OB_LOG_STAT where in_sync = 'NO'""") if results[0][0] > 0 : - raise MyError('{0} replicas unsync, please check'.format(results[0][0])) + fail_list.append('{0} replicas unsync, please check'.format(results[0][0])) # 2.2 检查paxos副本是否有缺失 TODO logging.info('check paxos replica success') @@ -311,11 +311,11 @@ def check_rebalance_task(query_cur): # 3.1 检查是否有做locality变更 (desc, results) = query_cur.exec_query("""select count(1) as cnt from DBA_OB_TENANT_JOBS where job_status='INPROGRESS' and result_code is null""") if results[0][0] > 0 : - raise MyError('{0} locality tasks is doing, please check'.format(results[0][0])) + fail_list.append('{0} locality tasks is doing, please check'.format(results[0][0])) # 3.2 检查是否有做balance (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from CDB_OB_LS_REPLICA_TASKS""") if results[0][0] > 0 : - raise MyError('{0} rebalance tasks is doing, please check'.format(results[0][0])) + fail_list.append('{0} rebalance tasks is doing, please check'.format(results[0][0])) logging.info('check rebalance task success') # 4. 检查集群状态 @@ -323,33 +323,65 @@ def check_cluster_status(query_cur): # 4.1 检查是否非合并状态 (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where STATUS != 'IDLE'""") if results[0][0] > 0 : - raise MyError('{0} tenant is merging, please check'.format(results[0][0])) + fail_list.append('{0} tenant is merging, please check'.format(results[0][0])) logging.info('check cluster status success') - -# 16. 修改永久下线的时间,避免升级过程中缺副本 -def modify_server_permanent_offline_time(cur): - set_parameter(cur, 'server_permanent_offline_time', '72h') - -# 23. 检查是否有异常租户(creating,延迟删除,恢复中) + +# 5. 检查是否有异常租户(creating,延迟删除,恢复中) def check_tenant_status(query_cur): (desc, results) = query_cur.exec_query("""select count(*) as count from DBA_OB_TENANTS where status != 'NORMAL'""") if len(results) != 1 or len(results[0]) != 1: - raise MyError('results len not match') + fail_list.append('results len not match') elif 0 != results[0][0]: - raise MyError('has abnormal tenant, should stop') + fail_list.append('has abnormal tenant, should stop') else: logging.info('check tenant status success') -# 27. 检查无恢复任务 +# 6. 检查无恢复任务 def check_restore_job_exist(query_cur): (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_RESTORE_PROGRESS""") if len(results) != 1 or len(results[0]) != 1: - raise MyError('failed to restore job cnt') + fail_list.append('failed to restore job cnt') elif results[0][0] != 0: - raise MyError("""still has restore job, upgrade is not allowed temporarily""") + fail_list.append("""still has restore job, upgrade is not allowed temporarily""") logging.info('check restore job success') +def check_is_primary_zone_distributed(primary_zone_str): + semicolon_pos = len(primary_zone_str) + for i in range(len(primary_zone_str)): + if primary_zone_str[i] == ';': + semicolon_pos = i + break + comma_pos = len(primary_zone_str) + for j in range(len(primary_zone_str)): + if primary_zone_str[j] == ',': + comma_pos = j + break + if comma_pos < semicolon_pos: + return True + else: + return False + +# 7. 升级前需要primary zone只有一个 +def check_tenant_primary_zone(query_cur): + (desc, results) = query_cur.exec_query("""select tenant_name,primary_zone from DBA_OB_TENANTS where tenant_id != 1"""); + for item in results: + if cmp(item[1], "RANDOM") == 0: + fail_list.append('{0} tenant primary zone random before update not allowed'.format(item[0])) + elif check_is_primary_zone_distributed(item[1]): + fail_list.append('{0} tenant primary zone distributed before update not allowed'.format(item[0])) + logging.info('check tenant primary zone success') + +# 8. 修改永久下线的时间,避免升级过程中缺副本 +def modify_server_permanent_offline_time(cur): + set_parameter(cur, 'server_permanent_offline_time', '72h') + +# last check of do_check, make sure no function execute after check_fail_list +def check_fail_list(): + if len(fail_list) != 0 : + error_msg ="upgrade checker failed with " + str(len(fail_list)) + " reasons: " + ", ".join(['['+x+"] " for x in fail_list]) + raise MyError(error_msg) + # 开始升级前的检查 def do_check(my_host, my_port, my_user, my_passwd, upgrade_params): try: @@ -368,8 +400,11 @@ def do_check(my_host, my_port, my_user, my_passwd, upgrade_params): check_rebalance_task(query_cur) check_cluster_status(query_cur) check_tenant_status(query_cur) - modify_server_permanent_offline_time(cur) check_restore_job_exist(query_cur) + check_tenant_primary_zone(query_cur) + # all check func should execute before check_fail_list + check_fail_list() + #modify_server_permanent_offline_time(cur) except Exception, e: logging.exception('run error') raise e diff --git a/tools/upgrade/upgrade_post.py b/tools/upgrade/upgrade_post.py index 1929271c3..b57b9b49d 100755 --- a/tools/upgrade/upgrade_post.py +++ b/tools/upgrade/upgrade_post.py @@ -13682,7 +13682,7 @@ # #class UpgradeParams: # log_filename = 'upgrade_checker.log' -# old_version = '4.0.0' +# old_version = '4.0.0.0' ##### --------------start : my_error.py -------------- #class MyError(Exception): # def __init__(self, value): @@ -13956,286 +13956,102 @@ ##### ---------------end---------------------- # # -# +#fail_list=[] # ##### START #### ## 1. 检查前置版本 #def check_observer_version(query_cur, upgrade_params): -# (desc, results) = query_cur.exec_query("""select distinct value from __all_virtual_sys_parameter_stat where name='min_observer_version'""") +# (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERS where name='min_observer_version'""") # if len(results) != 1: -# raise MyError('query results count is not 1') +# fail_list.append('query results count is not 1') # elif cmp(results[0][0], upgrade_params.old_version) < 0 : -# raise MyError('old observer version is expected equal or higher then: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0])) +# fail_list.append('old observer version is expected equal or higher then: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0])) # logging.info('check observer version success, version = {0}'.format(results[0][0])) # ## 2. 检查paxos副本是否同步, paxos副本是否缺失 #def check_paxos_replica(query_cur): # # 2.1 检查paxos副本是否同步 -# (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from __all_virtual_clog_stat where is_in_sync = 0 and is_offline = 0 and replica_type != 16""") +# (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from GV$OB_LOG_STAT where in_sync = 'NO'""") # if results[0][0] > 0 : -# raise MyError('{0} replicas unsync, please check'.format(results[0][0])) +# fail_list.append('{0} replicas unsync, please check'.format(results[0][0])) # # 2.2 检查paxos副本是否有缺失 TODO # logging.info('check paxos replica success') # ## 3. 检查是否有做balance, locality变更 #def check_rebalance_task(query_cur): # # 3.1 检查是否有做locality变更 -# (desc, results) = query_cur.exec_query("""select count(1) as cnt from __all_rootservice_job where job_status='INPROGRESS' and return_code is null""") +# (desc, results) = query_cur.exec_query("""select count(1) as cnt from DBA_OB_TENANT_JOBS where job_status='INPROGRESS' and result_code is null""") # if results[0][0] > 0 : -# raise MyError('{0} locality tasks is doing, please check'.format(results[0][0])) +# fail_list.append('{0} locality tasks is doing, please check'.format(results[0][0])) # # 3.2 检查是否有做balance -# (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from __all_virtual_rebalance_task_stat""") +# (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from CDB_OB_LS_REPLICA_TASKS""") # if results[0][0] > 0 : -# raise MyError('{0} rebalance tasks is doing, please check'.format(results[0][0])) +# fail_list.append('{0} rebalance tasks is doing, please check'.format(results[0][0])) # logging.info('check rebalance task success') # ## 4. 检查集群状态 #def check_cluster_status(query_cur): # # 4.1 检查是否非合并状态 -# (desc, results) = query_cur.exec_query("""select info from __all_zone where zone='' and name='merge_status'""") -# if cmp(results[0][0], 'IDLE') != 0 : -# raise MyError('global status expected = {0}, actual = {1}'.format('IDLE', results[0][0])) -# logging.info('check cluster status success') -# # 4.2 检查合并版本是否>=3 -# (desc, results) = query_cur.exec_query("""select cast(value as unsigned) value from __all_zone where zone='' and name='last_merged_version'""") -# if results[0][0] < 2 : -# raise MyError('global last_merged_version expected >= 2 actual = {0}'.format(results[0][0])) -# logging.info('check global last_merged_version success') -# -## 5. 检查没有打开enable_separate_sys_clog -#def check_disable_separate_sys_clog(query_cur): -# (desc, results) = query_cur.exec_query("""select count(1) from __all_sys_parameter where name like 'enable_separate_sys_clog'""") +# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where STATUS != 'IDLE'""") # if results[0][0] > 0 : -# raise MyError('enable_separate_sys_clog is true, unexpected') -# logging.info('check separate_sys_clog success') -# -## 6. 检查配置宏块的data_seq -#def check_macro_block_data_seq(query_cur): -# query_cur.exec_sql("""set ob_query_timeout=1800000000""") -# row_count = query_cur.exec_sql("""select * from __all_virtual_partition_sstable_macro_info where data_seq < 0 limit 1""") -# if row_count != 0: -# raise MyError('check_macro_block_data_seq failed, too old macro block needs full merge') -# logging.info('check_macro_block_data_seq success') -# -## 8. 检查租户的resource_pool内存规格, 要求F类型unit最小内存大于5G,L类型unit最小内存大于2G. -#def check_tenant_resource_pool(query_cur): -# (desc, results) = query_cur.exec_query("""select 1 from v$sysstat where name = 'is mini mode' and value = '1' and con_id = 1 limit 1""") -# if len(results) > 0: -# # mini部署的集群,租户规格可以很小,这里跳过检查 -# pass -# else: -# (desc, results) = query_cur.exec_query("""select count(*) from oceanbase.__all_resource_pool a, oceanbase.__all_unit_config b where a.unit_config_id = b.unit_config_id and b.unit_config_id != 100 and a.replica_type=0 and b.min_memory < 5368709120""") -# if results[0][0] > 0 : -# raise MyError('{0} tenant resource pool unit config is less than 5G, please check'.format(results[0][0])) -# (desc, results) = query_cur.exec_query("""select count(*) from oceanbase.__all_resource_pool a, oceanbase.__all_unit_config b where a.unit_config_id = b.unit_config_id and b.unit_config_id != 100 and a.replica_type=5 and b.min_memory < 2147483648""") -# if results[0][0] > 0 : -# raise MyError('{0} tenant logonly resource pool unit config is less than 2G, please check'.format(results[0][0])) -# -## 9. 检查是否有日志型副本分布在Full类型unit中 -## 2020-12-31 根据外部使用L副本且L型unit功能不成熟的需求,将这个检查去掉. -# -## 10. 检查租户分区数是否超出内存限制 -#def check_tenant_part_num(query_cur): -# # 统计每个租户在各个server上的分区数量 -# (desc, res_part_num) = query_cur.exec_query("""select svr_ip, svr_port, table_id >> 40 as tenant_id, count(*) as part_num from __all_virtual_clog_stat group by 1,2,3 order by 1,2,3""") -# # 计算每个租户在每个server上的max_memory -# (desc, res_unit_memory) = query_cur.exec_query("""select u.svr_ip, u.svr_port, t.tenant_id, uc.max_memory, p.replica_type from __all_unit u, __All_resource_pool p, __all_tenant t, __all_unit_config uc where p.resource_pool_id = u.resource_pool_id and t.tenant_id = p.tenant_id and p.unit_config_id = uc.unit_config_id""") -# # 查询每个server的memstore_limit_percentage -# (desc, res_svr_memstore_percent) = query_cur.exec_query("""select svr_ip, svr_port, name, value from __all_virtual_sys_parameter_stat where name = 'memstore_limit_percentage'""") -# part_static_cost = 128 * 1024 -# part_dynamic_cost = 400 * 1024 -# # 考虑到升级过程中可能有建表的需求,因此预留512个分区 -# part_num_reserved = 512 -# for line in res_part_num: -# svr_ip = line[0] -# svr_port = line[1] -# tenant_id = line[2] -# part_num = line[3] -# for uline in res_unit_memory: -# uip = uline[0] -# uport = uline[1] -# utid = uline[2] -# umem = uline[3] -# utype = uline[4] -# if svr_ip == uip and svr_port == uport and tenant_id == utid: -# for mpline in res_svr_memstore_percent: -# mpip = mpline[0] -# mpport = mpline[1] -# if mpip == uip and mpport == uport: -# mspercent = int(mpline[3]) -# mem_limit = umem -# if 0 == utype: -# # full类型的unit需要为memstore预留内存 -# mem_limit = umem * (100 - mspercent) / 100 -# part_num_limit = mem_limit / (part_static_cost + part_dynamic_cost / 10); -# if part_num_limit <= 1000: -# part_num_limit = mem_limit / (part_static_cost + part_dynamic_cost) -# if part_num >= (part_num_limit - part_num_reserved): -# raise MyError('{0} {1} {2} exceed tenant partition num limit, please check'.format(line, uline, mpline)) -# break -# logging.info('check tenant partition num success') -# -## 11. 检查存在租户partition,但是不存在unit的observer -#def check_tenant_resource(query_cur): -# (desc, res_unit) = query_cur.exec_query("""select tenant_id, svr_ip, svr_port from __all_virtual_partition_info where (tenant_id, svr_ip, svr_port) not in (select tenant_id, svr_ip, svr_port from __all_unit, __all_resource_pool where __all_unit.resource_pool_id = __all_resource_pool.resource_pool_id group by tenant_id, svr_ip, svr_port) group by tenant_id, svr_ip, svr_port""") -# for line in res_unit: -# raise MyError('{0} tenant unit not exist but partition exist'.format(line)) -# logging.info("check tenant resource success") -# -## 12. 检查系统表(__all_table_history)索引生效情况 -#def check_sys_index_status(query_cur): -# (desc, results) = query_cur.exec_query("""select count(*) as cnt from __all_table where data_table_id = 1099511627890 and table_id = 1099511637775 and index_type = 1 and index_status = 2""") -# if len(results) != 1 or results[0][0] != 1: -# raise MyError("""__all_table_history's index status not valid""") -# logging.info("""check __all_table_history's index status success""") -# -## 14. 检查升级前是否有只读zone -#def check_readonly_zone(query_cur): -# (desc, results) = query_cur.exec_query("""select count(*) from __all_zone where name='zone_type' and info='ReadOnly'""") -# if results[0][0] != 0: -# raise MyError("""check_readonly_zone failed, ob2.2 not support readonly_zone""") -# logging.info("""check_readonly_zone success""") -# -## 16. 修改永久下线的时间,避免升级过程中缺副本 -#def modify_server_permanent_offline_time(cur): -# set_parameter(cur, 'server_permanent_offline_time', '72h') -# -## 17. 修改安全删除副本时间 -#def modify_replica_safe_remove_time(cur): -# set_parameter(cur, 'replica_safe_remove_time', '72h') -# -## 18. 检查progressive_merge_round都升到1 -# -## 19. 从小于224的版本升级上来时,需要确认high_priority_net_thread_count配置项值为0 (224版本开始该值默认为1) -#def check_high_priority_net_thread_count_before_224(query_cur): -# # 获取最小版本 -# (desc, results) = query_cur.exec_query("""select distinct value from __all_virtual_sys_parameter_stat where name='min_observer_version'""") -# if len(results) != 1: -# raise MyError('distinct observer version exist') -# elif cmp(results[0][0], "2.2.40") >= 0 : -# # 最小版本大于等于2.2.40,忽略检查 -# logging.info('cluster version ({0}) is greate than or equal to 2.2.40, need not check high_priority_net_thread_count'.format(results[0][0])) -# else: -# # 低于224版本的需要确认配置项值为0 -# logging.info('cluster version is ({0}), need check high_priority_net_thread_count'.format(results[0][0])) -# (desc, results) = query_cur.exec_query("""select count(*) from __all_sys_parameter where name like 'high_priority_net_thread_count' and value not like '0'""") -# if results[0][0] > 0: -# raise MyError('high_priority_net_thread_count is greater than 0, unexpected') -# logging.info('check high_priority_net_thread_count finished') -# -## 20. 从小于226的版本升级上来时,要求不能有备库存在 -#def check_standby_cluster(query_cur): -# # 获取最小版本 -# (desc, results) = query_cur.exec_query("""select distinct value from __all_virtual_sys_parameter_stat where name='min_observer_version'""") -# if len(results) != 1: -# raise MyError('distinct observer version exist') -# elif cmp(results[0][0], "2.2.60") >= 0 : -# # 最小版本大于等于2.2.60,忽略检查 -# logging.info('cluster version ({0}) is greate than or equal to 2.2.60, need not check standby cluster'.format(results[0][0])) -# else: -# logging.info('cluster version is ({0}), need check standby cluster'.format(results[0][0])) -# (desc, results) = query_cur.exec_query("""select count(*) as cnt from __all_table where table_name = '__all_cluster'""") -# if results[0][0] == 0: -# logging.info('cluster ({0}) has no __all_cluster table, no standby cluster'.format(results[0][0])) -# else: -# (desc, results) = query_cur.exec_query("""select count(*) as cnt from __all_cluster""") -# if results[0][0] > 1: -# raise MyError("""multiple cluster exist in __all_cluster, maybe standby clusters added, not supported""") -# logging.info('check standby cluster from __all_cluster success') -# -## 21. 3.0是barrier版本,要求列column_id修正的升级任务做完 -#def check_schema_split_v2_finish(query_cur): -# # 获取最小版本 -# sql = """select cast(column_value as signed) as version from __all_core_table -# where table_name='__all_global_stat' and column_name = 'split_schema_version_v2'""" -# (desc, results) = query_cur.exec_query(sql) -# if len(results) != 1 or len(results[0]) != 1: -# raise MyError('row or column cnt not match') -# elif results[0][0] < 0: -# raise MyError('schema split v2 not finished yet') -# else: -# logging.info('check schema split v2 finish success') -# -# -# -## 23. 检查是否有异常租户(creating,延迟删除,恢复中) +# fail_list.append('{0} tenant is merging, please check'.format(results[0][0])) +# logging.info('check cluster status success') +# +## 5. 检查是否有异常租户(creating,延迟删除,恢复中) #def check_tenant_status(query_cur): -# (desc, results) = query_cur.exec_query("""select count(*) as count from __all_tenant where status != 'NORMAL'""") +# (desc, results) = query_cur.exec_query("""select count(*) as count from DBA_OB_TENANTS where status != 'NORMAL'""") # if len(results) != 1 or len(results[0]) != 1: -# raise MyError('results len not match') +# fail_list.append('results len not match') # elif 0 != results[0][0]: -# raise MyError('has abnormal tenant, should stop') +# fail_list.append('has abnormal tenant, should stop') # else: # logging.info('check tenant status success') # -## 24. 所有版本升级都要检查micro_block_merge_verify_level -#def check_micro_block_verify_level(query_cur): -# (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_sys_parameter_stat where name='micro_block_merge_verify_level' and value < 2""") -# if results[0][0] != 0: -# raise MyError("""unexpected micro_block_merge_verify_level detected, upgrade is not allowed temporarily""") -# logging.info('check micro_block_merge_verify_level success') -# -##25. 需要使用最大性能模式升级,227版本修改了模式切换相关的内部表 -#def check_cluster_protection_mode(query_cur): -# (desc, results) = query_cur.exec_query("""select count(*) from __all_core_table where table_name = '__all_cluster' and column_name = 'protection_mode'"""); -# if len(results) != 1: -# raise MyError('failed to get protection mode') -# elif results[0][0] == 0: -# logging.info('no need to check protection mode') -# else: -# (desc, results) = query_cur.exec_query("""select column_value from __all_core_table where table_name = '__all_cluster' and column_name = 'protection_mode'"""); -# if len(results) != 1: -# raise MyError('failed to get protection mode') -# elif cmp(results[0][0], '0') != 0: -# raise MyError('cluster not maximum performance protection mode before update not allowed, protecion_mode={0}'.format(results[0][0])) -# else: -# logging.info('cluster protection mode legal before update!') -# -## 27. 检查无恢复任务 +## 6. 检查无恢复任务 #def check_restore_job_exist(query_cur): -# (desc, results) = query_cur.exec_query("""select count(1) from __all_restore_job""") +# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_RESTORE_PROGRESS""") # if len(results) != 1 or len(results[0]) != 1: -# raise MyError('failed to restore job cnt') +# fail_list.append('failed to restore job cnt') # elif results[0][0] != 0: -# raise MyError("""still has restore job, upgrade is not allowed temporarily""") +# fail_list.append("""still has restore job, upgrade is not allowed temporarily""") # logging.info('check restore job success') # -## 28. 检查系统租户系统表leader是否打散 -#def check_sys_table_leader(query_cur): -# (desc, results) = query_cur.exec_query("""select svr_ip, svr_port from oceanbase.__all_virtual_core_meta_table where role = 1""") -# if len(results) != 1 or len(results[0]) != 2: -# raise MyError('failed to rs leader') +# +#def check_is_primary_zone_distributed(primary_zone_str): +# semicolon_pos = len(primary_zone_str) +# for i in range(len(primary_zone_str)): +# if primary_zone_str[i] == ';': +# semicolon_pos = i +# break +# comma_pos = len(primary_zone_str) +# for j in range(len(primary_zone_str)): +# if primary_zone_str[j] == ',': +# comma_pos = j +# break +# if comma_pos < semicolon_pos: +# return True # else: -# svr_ip = results[0][0] -# svr_port = results[0][1] -# # check __all_root_table's leader -# (desc, results) = query_cur.exec_query("""select count(1) from oceanbase.__all_virtual_core_root_table -# where role = 1 and svr_ip = '{0}' and svr_port = '{1}'""".format(svr_ip, svr_port)) -# if len(results) != 1 or len(results[0]) != 1: -# raise MyError('failed to __all_root_table leader') -# elif results[0][0] != 1: -# raise MyError("""__all_root_table should be {0}:{1}""".format(svr_ip, svr_port)) +# return False # -# # check sys tables' leader -# (desc, results) = query_cur.exec_query("""select count(1) from oceanbase.__all_virtual_core_root_table -# where tenant_id = 1 and role = 1 and (svr_ip != '{0}' or svr_port != '{1}')""" .format(svr_ip, svr_port)) -# if len(results) != 1 or len(results[0]) != 1: -# raise MyError('failed to __all_root_table leader') -# elif results[0][0] != 0: -# raise MyError("""sys tables'leader should be {0}:{1}""".format(svr_ip, svr_port)) +## 7. 升级前需要primary zone只有一个 +#def check_tenant_primary_zone(query_cur): +# (desc, results) = query_cur.exec_query("""select tenant_name,primary_zone from DBA_OB_TENANTS where tenant_id != 1"""); +# for item in results: +# if cmp(item[1], "RANDOM") == 0: +# fail_list.append('{0} tenant primary zone random before update not allowed'.format(item[0])) +# elif check_is_primary_zone_distributed(item[1]): +# fail_list.append('{0} tenant primary zone distributed before update not allowed'.format(item[0])) +# logging.info('check tenant primary zone success') # -## 30. check duplicate index name in mysql -## https://work.aone.alibaba-inc.com/issue/36047465 -#def check_duplicate_index_name_in_mysql(query_cur, cur): -# (desc, results) = query_cur.exec_query( -# """ -# select /*+ OB_QUERY_TIMEOUT(100000000) */ -# a.database_id, a.data_table_id, lower(substr(table_name, length(substring_index(a.table_name, "_", 4)) + 2)) as index_name -# from oceanbase.__all_virtual_table as a join oceanbase.__all_tenant as b on a.tenant_id = b.tenant_id -# where a.table_type = 5 and b.compatibility_mode = 0 and lower(table_name) like "__idx%" group by 1,2,3 having count(*) > 1 -# """) -# if (len(results) != 0) : -# raise MyError("Duplicate index name exist in mysql tenant") +## 8. 修改永久下线的时间,避免升级过程中缺副本 +#def modify_server_permanent_offline_time(cur): +# set_parameter(cur, 'server_permanent_offline_time', '72h') +# +## last check of do_check, make sure no function execute after check_fail_list +#def check_fail_list(): +# if len(fail_list) != 0 : +# error_msg ="upgrade checker failed with " + str(len(fail_list)) + " reasons: " + ", ".join(['['+x+"] " for x in fail_list]) +# raise MyError(error_msg) # ## 开始升级前的检查 #def do_check(my_host, my_port, my_user, my_passwd, upgrade_params): @@ -14254,22 +14070,12 @@ # check_paxos_replica(query_cur) # check_rebalance_task(query_cur) # check_cluster_status(query_cur) -# check_disable_separate_sys_clog(query_cur) -# check_macro_block_data_seq(query_cur) -# check_tenant_resource_pool(query_cur) # check_tenant_status(query_cur) -# check_tenant_part_num(query_cur) -# check_tenant_resource(query_cur) -# check_readonly_zone(query_cur) -# modify_server_permanent_offline_time(cur) -# modify_replica_safe_remove_time(cur) -# check_high_priority_net_thread_count_before_224(query_cur) -# check_standby_cluster(query_cur) -# #check_schema_split_v2_finish(query_cur) -# check_micro_block_verify_level(query_cur) # check_restore_job_exist(query_cur) -# check_sys_table_leader(query_cur) -# check_duplicate_index_name_in_mysql(query_cur, cur) +# check_tenant_primary_zone(query_cur) +# # all check func should execute before check_fail_list +# check_fail_list() +# #modify_server_permanent_offline_time(cur) # except Exception, e: # logging.exception('run error') # raise e @@ -15068,7 +14874,7 @@ #def check_cluster_version(query_cur): # # 一方面配置项生效是个异步生效任务,另一方面是2.2.0之后新增租户级配置项刷新,和系统级配置项刷新复用同一个timer,这里暂且等一下。 # times = 30 -# sql="select distinct value = '{0}' from oceanbase.__all_virtual_sys_parameter_stat where name='min_observer_version'".format(upgrade_params.new_version) +# sql="select distinct value = '{0}' from oceanbase.GV$OB_PARAMETERS where name='min_observer_version'".format(upgrade_params.new_version) # while times > 0 : # (desc, results) = query_cur.exec_query(sql) # if len(results) == 1 and results[0][0] == 1: @@ -15081,38 +14887,6 @@ # else: # logging.info("check_cluster_version success") # -#def check_storage_format_version(query_cur): -# # Specified expected version each time want to upgrade (see OB_STORAGE_FORMAT_VERSION_MAX) -# expect_version = 4; -# sql = "select value from oceanbase.__all_zone where zone = '' and name = 'storage_format_version'" -# times = 180 -# while times > 0 : -# (desc, results) = query_cur.exec_query(sql) -# if len(results) == 1 and results[0][0] == expect_version: -# break -# time.sleep(10) -# times -= 1 -# if times == 0: -# logging.warn("check storage format version timeout! Expected version {0}".format(expect_version)) -# raise e -# else: -# logging.info("check expected storage format version '{0}' success".format(expect_version)) -# -#def upgrade_storage_format_version(conn, cur): -# try: -# # enable_ddl -# set_parameter(cur, 'enable_ddl', 'True') -# -# # run job -# sql = "alter system run job 'UPGRADE_STORAGE_FORMAT_VERSION';" -# logging.info(sql) -# cur.execute(sql) -# -# except Exception, e: -# logging.warn("upgrade storage format version failed") -# raise e -# logging.info("upgrade storage format version finish") -# ## 2 检查内部表自检是否成功 #def check_root_inspection(query_cur): # sql = "select count(*) from oceanbase.__all_virtual_upgrade_inspection where info != 'succeed'" @@ -15128,20 +14902,6 @@ # raise e # logging.info('check root inspection success') # -## 3 check standby cluster -#def check_standby_cluster(conn, query_cur, my_user, my_passwd): -# try: -# is_primary = check_current_cluster_is_primary(query_cur) -# if not is_primary: -# logging.info("""current cluster is standby cluster, just skip""") -# else: -# tenant_id_list = fetch_tenant_ids(query_cur) -# standby_cluster_infos = fetch_standby_cluster_infos(conn, query_cur, my_user, my_passwd) -# check_ddl_and_dml_sync(conn, query_cur, standby_cluster_infos, tenant_id_list) -# except Exception, e: -# logging.exception('fail to fetch standby cluster info') -# raise e -# ## 4 开ddl #def enable_ddl(cur): # set_parameter(cur, 'enable_ddl', 'True') @@ -15169,82 +14929,6 @@ # logging.exception('fail to fetch distinct tenant ids') # raise e # -#def check_current_cluster_is_primary(query_cur): -# try: -# sql = """SELECT * FROM v$ob_cluster -# WHERE cluster_role = "PRIMARY" -# AND cluster_status = "VALID" -# AND (switchover_status = "NOT ALLOWED" OR switchover_status = "TO STANDBY") """ -# (desc, results) = query_cur.exec_query(sql) -# is_primary = len(results) > 0 -# return is_primary -# except Exception, e: -# logging.exception("""fail to check current is primary""") -# raise e -# -#def fetch_standby_cluster_infos(conn, query_cur, user, pwd): -# try: -# is_primary = check_current_cluster_is_primary(query_cur) -# if not is_primary: -# logging.exception("""should be primary cluster""") -# raise e -# -# standby_cluster_infos = [] -# sql = """SELECT cluster_id, rootservice_list from v$ob_standby_status""" -# (desc, results) = query_cur.exec_query(sql) -# -# for r in results: -# standby_cluster_info = {} -# if 2 != len(r): -# logging.exception("length not match") -# raise e -# standby_cluster_info['cluster_id'] = r[0] -# standby_cluster_info['user'] = user -# standby_cluster_info['pwd'] = pwd -# # construct ip/port -# address = r[1].split(";")[0] # choose first address in rs_list -# standby_cluster_info['ip'] = str(address.split(":")[0]) -# standby_cluster_info['port'] = address.split(":")[2] -# # append -# standby_cluster_infos.append(standby_cluster_info) -# logging.info("""cluster_info : cluster_id = {0}, ip = {1}, port = {2}""" -# .format(standby_cluster_info['cluster_id'], -# standby_cluster_info['ip'], -# standby_cluster_info['port'])) -# conn.commit() -# # check standby cluster -# for standby_cluster_info in standby_cluster_infos: -# # connect -# logging.info("""create connection : cluster_id = {0}, ip = {1}, port = {2}""" -# .format(standby_cluster_info['cluster_id'], -# standby_cluster_info['ip'], -# standby_cluster_info['port'])) -# -# tmp_conn = mysql.connector.connect(user = standby_cluster_info['user'], -# password = standby_cluster_info['pwd'], -# host = standby_cluster_info['ip'], -# port = standby_cluster_info['port'], -# database = 'oceanbase') -# -# tmp_cur = tmp_conn.cursor(buffered=True) -# tmp_conn.autocommit = True -# tmp_query_cur = Cursor(tmp_cur) -# is_primary = check_current_cluster_is_primary(tmp_query_cur) -# if is_primary: -# logging.exception("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}""" -# .format(standby_cluster_info['cluster_id'], -# standby_cluster_info['ip'], -# standby_cluster_info['port'])) -# raise e -# # close -# tmp_cur.close() -# tmp_conn.close() -# -# return standby_cluster_infos -# except Exception, e: -# logging.exception('fail to fetch standby cluster info') -# raise e -# #def check_ddl_and_dml_sync(conn, query_cur, standby_cluster_infos, tenant_ids): # try: # conn.commit() @@ -15394,10 +15078,7 @@ # query_cur = Cursor(cur) # try: # check_cluster_version(query_cur) -# #upgrade_storage_format_version(conn, cur) -# #check_storage_format_version(query_cur) # check_root_inspection(query_cur) -# check_standby_cluster(conn, query_cur, my_user, my_passwd) # enable_ddl(cur) # enable_rebalance(cur) # enable_rereplication(cur) diff --git a/tools/upgrade/upgrade_pre.py b/tools/upgrade/upgrade_pre.py index 4d8fe9d3e..44f1c00ab 100755 --- a/tools/upgrade/upgrade_pre.py +++ b/tools/upgrade/upgrade_pre.py @@ -13682,7 +13682,7 @@ # #class UpgradeParams: # log_filename = 'upgrade_checker.log' -# old_version = '4.0.0' +# old_version = '4.0.0.0' ##### --------------start : my_error.py -------------- #class MyError(Exception): # def __init__(self, value): @@ -13956,286 +13956,102 @@ ##### ---------------end---------------------- # # -# +#fail_list=[] # ##### START #### ## 1. 检查前置版本 #def check_observer_version(query_cur, upgrade_params): -# (desc, results) = query_cur.exec_query("""select distinct value from __all_virtual_sys_parameter_stat where name='min_observer_version'""") +# (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERS where name='min_observer_version'""") # if len(results) != 1: -# raise MyError('query results count is not 1') +# fail_list.append('query results count is not 1') # elif cmp(results[0][0], upgrade_params.old_version) < 0 : -# raise MyError('old observer version is expected equal or higher then: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0])) +# fail_list.append('old observer version is expected equal or higher then: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0])) # logging.info('check observer version success, version = {0}'.format(results[0][0])) # ## 2. 检查paxos副本是否同步, paxos副本是否缺失 #def check_paxos_replica(query_cur): # # 2.1 检查paxos副本是否同步 -# (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from __all_virtual_clog_stat where is_in_sync = 0 and is_offline = 0 and replica_type != 16""") +# (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from GV$OB_LOG_STAT where in_sync = 'NO'""") # if results[0][0] > 0 : -# raise MyError('{0} replicas unsync, please check'.format(results[0][0])) +# fail_list.append('{0} replicas unsync, please check'.format(results[0][0])) # # 2.2 检查paxos副本是否有缺失 TODO # logging.info('check paxos replica success') # ## 3. 检查是否有做balance, locality变更 #def check_rebalance_task(query_cur): # # 3.1 检查是否有做locality变更 -# (desc, results) = query_cur.exec_query("""select count(1) as cnt from __all_rootservice_job where job_status='INPROGRESS' and return_code is null""") +# (desc, results) = query_cur.exec_query("""select count(1) as cnt from DBA_OB_TENANT_JOBS where job_status='INPROGRESS' and result_code is null""") # if results[0][0] > 0 : -# raise MyError('{0} locality tasks is doing, please check'.format(results[0][0])) +# fail_list.append('{0} locality tasks is doing, please check'.format(results[0][0])) # # 3.2 检查是否有做balance -# (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from __all_virtual_rebalance_task_stat""") +# (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from CDB_OB_LS_REPLICA_TASKS""") # if results[0][0] > 0 : -# raise MyError('{0} rebalance tasks is doing, please check'.format(results[0][0])) +# fail_list.append('{0} rebalance tasks is doing, please check'.format(results[0][0])) # logging.info('check rebalance task success') # ## 4. 检查集群状态 #def check_cluster_status(query_cur): # # 4.1 检查是否非合并状态 -# (desc, results) = query_cur.exec_query("""select info from __all_zone where zone='' and name='merge_status'""") -# if cmp(results[0][0], 'IDLE') != 0 : -# raise MyError('global status expected = {0}, actual = {1}'.format('IDLE', results[0][0])) -# logging.info('check cluster status success') -# # 4.2 检查合并版本是否>=3 -# (desc, results) = query_cur.exec_query("""select cast(value as unsigned) value from __all_zone where zone='' and name='last_merged_version'""") -# if results[0][0] < 2 : -# raise MyError('global last_merged_version expected >= 2 actual = {0}'.format(results[0][0])) -# logging.info('check global last_merged_version success') -# -## 5. 检查没有打开enable_separate_sys_clog -#def check_disable_separate_sys_clog(query_cur): -# (desc, results) = query_cur.exec_query("""select count(1) from __all_sys_parameter where name like 'enable_separate_sys_clog'""") +# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where STATUS != 'IDLE'""") # if results[0][0] > 0 : -# raise MyError('enable_separate_sys_clog is true, unexpected') -# logging.info('check separate_sys_clog success') -# -## 6. 检查配置宏块的data_seq -#def check_macro_block_data_seq(query_cur): -# query_cur.exec_sql("""set ob_query_timeout=1800000000""") -# row_count = query_cur.exec_sql("""select * from __all_virtual_partition_sstable_macro_info where data_seq < 0 limit 1""") -# if row_count != 0: -# raise MyError('check_macro_block_data_seq failed, too old macro block needs full merge') -# logging.info('check_macro_block_data_seq success') -# -## 8. 检查租户的resource_pool内存规格, 要求F类型unit最小内存大于5G,L类型unit最小内存大于2G. -#def check_tenant_resource_pool(query_cur): -# (desc, results) = query_cur.exec_query("""select 1 from v$sysstat where name = 'is mini mode' and value = '1' and con_id = 1 limit 1""") -# if len(results) > 0: -# # mini部署的集群,租户规格可以很小,这里跳过检查 -# pass -# else: -# (desc, results) = query_cur.exec_query("""select count(*) from oceanbase.__all_resource_pool a, oceanbase.__all_unit_config b where a.unit_config_id = b.unit_config_id and b.unit_config_id != 100 and a.replica_type=0 and b.min_memory < 5368709120""") -# if results[0][0] > 0 : -# raise MyError('{0} tenant resource pool unit config is less than 5G, please check'.format(results[0][0])) -# (desc, results) = query_cur.exec_query("""select count(*) from oceanbase.__all_resource_pool a, oceanbase.__all_unit_config b where a.unit_config_id = b.unit_config_id and b.unit_config_id != 100 and a.replica_type=5 and b.min_memory < 2147483648""") -# if results[0][0] > 0 : -# raise MyError('{0} tenant logonly resource pool unit config is less than 2G, please check'.format(results[0][0])) -# -## 9. 检查是否有日志型副本分布在Full类型unit中 -## 2020-12-31 根据外部使用L副本且L型unit功能不成熟的需求,将这个检查去掉. -# -## 10. 检查租户分区数是否超出内存限制 -#def check_tenant_part_num(query_cur): -# # 统计每个租户在各个server上的分区数量 -# (desc, res_part_num) = query_cur.exec_query("""select svr_ip, svr_port, table_id >> 40 as tenant_id, count(*) as part_num from __all_virtual_clog_stat group by 1,2,3 order by 1,2,3""") -# # 计算每个租户在每个server上的max_memory -# (desc, res_unit_memory) = query_cur.exec_query("""select u.svr_ip, u.svr_port, t.tenant_id, uc.max_memory, p.replica_type from __all_unit u, __All_resource_pool p, __all_tenant t, __all_unit_config uc where p.resource_pool_id = u.resource_pool_id and t.tenant_id = p.tenant_id and p.unit_config_id = uc.unit_config_id""") -# # 查询每个server的memstore_limit_percentage -# (desc, res_svr_memstore_percent) = query_cur.exec_query("""select svr_ip, svr_port, name, value from __all_virtual_sys_parameter_stat where name = 'memstore_limit_percentage'""") -# part_static_cost = 128 * 1024 -# part_dynamic_cost = 400 * 1024 -# # 考虑到升级过程中可能有建表的需求,因此预留512个分区 -# part_num_reserved = 512 -# for line in res_part_num: -# svr_ip = line[0] -# svr_port = line[1] -# tenant_id = line[2] -# part_num = line[3] -# for uline in res_unit_memory: -# uip = uline[0] -# uport = uline[1] -# utid = uline[2] -# umem = uline[3] -# utype = uline[4] -# if svr_ip == uip and svr_port == uport and tenant_id == utid: -# for mpline in res_svr_memstore_percent: -# mpip = mpline[0] -# mpport = mpline[1] -# if mpip == uip and mpport == uport: -# mspercent = int(mpline[3]) -# mem_limit = umem -# if 0 == utype: -# # full类型的unit需要为memstore预留内存 -# mem_limit = umem * (100 - mspercent) / 100 -# part_num_limit = mem_limit / (part_static_cost + part_dynamic_cost / 10); -# if part_num_limit <= 1000: -# part_num_limit = mem_limit / (part_static_cost + part_dynamic_cost) -# if part_num >= (part_num_limit - part_num_reserved): -# raise MyError('{0} {1} {2} exceed tenant partition num limit, please check'.format(line, uline, mpline)) -# break -# logging.info('check tenant partition num success') -# -## 11. 检查存在租户partition,但是不存在unit的observer -#def check_tenant_resource(query_cur): -# (desc, res_unit) = query_cur.exec_query("""select tenant_id, svr_ip, svr_port from __all_virtual_partition_info where (tenant_id, svr_ip, svr_port) not in (select tenant_id, svr_ip, svr_port from __all_unit, __all_resource_pool where __all_unit.resource_pool_id = __all_resource_pool.resource_pool_id group by tenant_id, svr_ip, svr_port) group by tenant_id, svr_ip, svr_port""") -# for line in res_unit: -# raise MyError('{0} tenant unit not exist but partition exist'.format(line)) -# logging.info("check tenant resource success") -# -## 12. 检查系统表(__all_table_history)索引生效情况 -#def check_sys_index_status(query_cur): -# (desc, results) = query_cur.exec_query("""select count(*) as cnt from __all_table where data_table_id = 1099511627890 and table_id = 1099511637775 and index_type = 1 and index_status = 2""") -# if len(results) != 1 or results[0][0] != 1: -# raise MyError("""__all_table_history's index status not valid""") -# logging.info("""check __all_table_history's index status success""") -# -## 14. 检查升级前是否有只读zone -#def check_readonly_zone(query_cur): -# (desc, results) = query_cur.exec_query("""select count(*) from __all_zone where name='zone_type' and info='ReadOnly'""") -# if results[0][0] != 0: -# raise MyError("""check_readonly_zone failed, ob2.2 not support readonly_zone""") -# logging.info("""check_readonly_zone success""") -# -## 16. 修改永久下线的时间,避免升级过程中缺副本 -#def modify_server_permanent_offline_time(cur): -# set_parameter(cur, 'server_permanent_offline_time', '72h') -# -## 17. 修改安全删除副本时间 -#def modify_replica_safe_remove_time(cur): -# set_parameter(cur, 'replica_safe_remove_time', '72h') -# -## 18. 检查progressive_merge_round都升到1 -# -## 19. 从小于224的版本升级上来时,需要确认high_priority_net_thread_count配置项值为0 (224版本开始该值默认为1) -#def check_high_priority_net_thread_count_before_224(query_cur): -# # 获取最小版本 -# (desc, results) = query_cur.exec_query("""select distinct value from __all_virtual_sys_parameter_stat where name='min_observer_version'""") -# if len(results) != 1: -# raise MyError('distinct observer version exist') -# elif cmp(results[0][0], "2.2.40") >= 0 : -# # 最小版本大于等于2.2.40,忽略检查 -# logging.info('cluster version ({0}) is greate than or equal to 2.2.40, need not check high_priority_net_thread_count'.format(results[0][0])) -# else: -# # 低于224版本的需要确认配置项值为0 -# logging.info('cluster version is ({0}), need check high_priority_net_thread_count'.format(results[0][0])) -# (desc, results) = query_cur.exec_query("""select count(*) from __all_sys_parameter where name like 'high_priority_net_thread_count' and value not like '0'""") -# if results[0][0] > 0: -# raise MyError('high_priority_net_thread_count is greater than 0, unexpected') -# logging.info('check high_priority_net_thread_count finished') -# -## 20. 从小于226的版本升级上来时,要求不能有备库存在 -#def check_standby_cluster(query_cur): -# # 获取最小版本 -# (desc, results) = query_cur.exec_query("""select distinct value from __all_virtual_sys_parameter_stat where name='min_observer_version'""") -# if len(results) != 1: -# raise MyError('distinct observer version exist') -# elif cmp(results[0][0], "2.2.60") >= 0 : -# # 最小版本大于等于2.2.60,忽略检查 -# logging.info('cluster version ({0}) is greate than or equal to 2.2.60, need not check standby cluster'.format(results[0][0])) -# else: -# logging.info('cluster version is ({0}), need check standby cluster'.format(results[0][0])) -# (desc, results) = query_cur.exec_query("""select count(*) as cnt from __all_table where table_name = '__all_cluster'""") -# if results[0][0] == 0: -# logging.info('cluster ({0}) has no __all_cluster table, no standby cluster'.format(results[0][0])) -# else: -# (desc, results) = query_cur.exec_query("""select count(*) as cnt from __all_cluster""") -# if results[0][0] > 1: -# raise MyError("""multiple cluster exist in __all_cluster, maybe standby clusters added, not supported""") -# logging.info('check standby cluster from __all_cluster success') -# -## 21. 3.0是barrier版本,要求列column_id修正的升级任务做完 -#def check_schema_split_v2_finish(query_cur): -# # 获取最小版本 -# sql = """select cast(column_value as signed) as version from __all_core_table -# where table_name='__all_global_stat' and column_name = 'split_schema_version_v2'""" -# (desc, results) = query_cur.exec_query(sql) -# if len(results) != 1 or len(results[0]) != 1: -# raise MyError('row or column cnt not match') -# elif results[0][0] < 0: -# raise MyError('schema split v2 not finished yet') -# else: -# logging.info('check schema split v2 finish success') -# -# -# -## 23. 检查是否有异常租户(creating,延迟删除,恢复中) +# fail_list.append('{0} tenant is merging, please check'.format(results[0][0])) +# logging.info('check cluster status success') +# +## 5. 检查是否有异常租户(creating,延迟删除,恢复中) #def check_tenant_status(query_cur): -# (desc, results) = query_cur.exec_query("""select count(*) as count from __all_tenant where status != 'NORMAL'""") +# (desc, results) = query_cur.exec_query("""select count(*) as count from DBA_OB_TENANTS where status != 'NORMAL'""") # if len(results) != 1 or len(results[0]) != 1: -# raise MyError('results len not match') +# fail_list.append('results len not match') # elif 0 != results[0][0]: -# raise MyError('has abnormal tenant, should stop') +# fail_list.append('has abnormal tenant, should stop') # else: # logging.info('check tenant status success') # -## 24. 所有版本升级都要检查micro_block_merge_verify_level -#def check_micro_block_verify_level(query_cur): -# (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_sys_parameter_stat where name='micro_block_merge_verify_level' and value < 2""") -# if results[0][0] != 0: -# raise MyError("""unexpected micro_block_merge_verify_level detected, upgrade is not allowed temporarily""") -# logging.info('check micro_block_merge_verify_level success') -# -##25. 需要使用最大性能模式升级,227版本修改了模式切换相关的内部表 -#def check_cluster_protection_mode(query_cur): -# (desc, results) = query_cur.exec_query("""select count(*) from __all_core_table where table_name = '__all_cluster' and column_name = 'protection_mode'"""); -# if len(results) != 1: -# raise MyError('failed to get protection mode') -# elif results[0][0] == 0: -# logging.info('no need to check protection mode') -# else: -# (desc, results) = query_cur.exec_query("""select column_value from __all_core_table where table_name = '__all_cluster' and column_name = 'protection_mode'"""); -# if len(results) != 1: -# raise MyError('failed to get protection mode') -# elif cmp(results[0][0], '0') != 0: -# raise MyError('cluster not maximum performance protection mode before update not allowed, protecion_mode={0}'.format(results[0][0])) -# else: -# logging.info('cluster protection mode legal before update!') -# -## 27. 检查无恢复任务 +## 6. 检查无恢复任务 #def check_restore_job_exist(query_cur): -# (desc, results) = query_cur.exec_query("""select count(1) from __all_restore_job""") +# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_RESTORE_PROGRESS""") # if len(results) != 1 or len(results[0]) != 1: -# raise MyError('failed to restore job cnt') +# fail_list.append('failed to restore job cnt') # elif results[0][0] != 0: -# raise MyError("""still has restore job, upgrade is not allowed temporarily""") +# fail_list.append("""still has restore job, upgrade is not allowed temporarily""") # logging.info('check restore job success') # -## 28. 检查系统租户系统表leader是否打散 -#def check_sys_table_leader(query_cur): -# (desc, results) = query_cur.exec_query("""select svr_ip, svr_port from oceanbase.__all_virtual_core_meta_table where role = 1""") -# if len(results) != 1 or len(results[0]) != 2: -# raise MyError('failed to rs leader') +# +#def check_is_primary_zone_distributed(primary_zone_str): +# semicolon_pos = len(primary_zone_str) +# for i in range(len(primary_zone_str)): +# if primary_zone_str[i] == ';': +# semicolon_pos = i +# break +# comma_pos = len(primary_zone_str) +# for j in range(len(primary_zone_str)): +# if primary_zone_str[j] == ',': +# comma_pos = j +# break +# if comma_pos < semicolon_pos: +# return True # else: -# svr_ip = results[0][0] -# svr_port = results[0][1] -# # check __all_root_table's leader -# (desc, results) = query_cur.exec_query("""select count(1) from oceanbase.__all_virtual_core_root_table -# where role = 1 and svr_ip = '{0}' and svr_port = '{1}'""".format(svr_ip, svr_port)) -# if len(results) != 1 or len(results[0]) != 1: -# raise MyError('failed to __all_root_table leader') -# elif results[0][0] != 1: -# raise MyError("""__all_root_table should be {0}:{1}""".format(svr_ip, svr_port)) +# return False # -# # check sys tables' leader -# (desc, results) = query_cur.exec_query("""select count(1) from oceanbase.__all_virtual_core_root_table -# where tenant_id = 1 and role = 1 and (svr_ip != '{0}' or svr_port != '{1}')""" .format(svr_ip, svr_port)) -# if len(results) != 1 or len(results[0]) != 1: -# raise MyError('failed to __all_root_table leader') -# elif results[0][0] != 0: -# raise MyError("""sys tables'leader should be {0}:{1}""".format(svr_ip, svr_port)) +## 7. 升级前需要primary zone只有一个 +#def check_tenant_primary_zone(query_cur): +# (desc, results) = query_cur.exec_query("""select tenant_name,primary_zone from DBA_OB_TENANTS where tenant_id != 1"""); +# for item in results: +# if cmp(item[1], "RANDOM") == 0: +# fail_list.append('{0} tenant primary zone random before update not allowed'.format(item[0])) +# elif check_is_primary_zone_distributed(item[1]): +# fail_list.append('{0} tenant primary zone distributed before update not allowed'.format(item[0])) +# logging.info('check tenant primary zone success') # -## 30. check duplicate index name in mysql -## https://work.aone.alibaba-inc.com/issue/36047465 -#def check_duplicate_index_name_in_mysql(query_cur, cur): -# (desc, results) = query_cur.exec_query( -# """ -# select /*+ OB_QUERY_TIMEOUT(100000000) */ -# a.database_id, a.data_table_id, lower(substr(table_name, length(substring_index(a.table_name, "_", 4)) + 2)) as index_name -# from oceanbase.__all_virtual_table as a join oceanbase.__all_tenant as b on a.tenant_id = b.tenant_id -# where a.table_type = 5 and b.compatibility_mode = 0 and lower(table_name) like "__idx%" group by 1,2,3 having count(*) > 1 -# """) -# if (len(results) != 0) : -# raise MyError("Duplicate index name exist in mysql tenant") +## 8. 修改永久下线的时间,避免升级过程中缺副本 +#def modify_server_permanent_offline_time(cur): +# set_parameter(cur, 'server_permanent_offline_time', '72h') +# +## last check of do_check, make sure no function execute after check_fail_list +#def check_fail_list(): +# if len(fail_list) != 0 : +# error_msg ="upgrade checker failed with " + str(len(fail_list)) + " reasons: " + ", ".join(['['+x+"] " for x in fail_list]) +# raise MyError(error_msg) # ## 开始升级前的检查 #def do_check(my_host, my_port, my_user, my_passwd, upgrade_params): @@ -14254,22 +14070,12 @@ # check_paxos_replica(query_cur) # check_rebalance_task(query_cur) # check_cluster_status(query_cur) -# check_disable_separate_sys_clog(query_cur) -# check_macro_block_data_seq(query_cur) -# check_tenant_resource_pool(query_cur) # check_tenant_status(query_cur) -# check_tenant_part_num(query_cur) -# check_tenant_resource(query_cur) -# check_readonly_zone(query_cur) -# modify_server_permanent_offline_time(cur) -# modify_replica_safe_remove_time(cur) -# check_high_priority_net_thread_count_before_224(query_cur) -# check_standby_cluster(query_cur) -# #check_schema_split_v2_finish(query_cur) -# check_micro_block_verify_level(query_cur) # check_restore_job_exist(query_cur) -# check_sys_table_leader(query_cur) -# check_duplicate_index_name_in_mysql(query_cur, cur) +# check_tenant_primary_zone(query_cur) +# # all check func should execute before check_fail_list +# check_fail_list() +# #modify_server_permanent_offline_time(cur) # except Exception, e: # logging.exception('run error') # raise e @@ -15068,7 +14874,7 @@ #def check_cluster_version(query_cur): # # 一方面配置项生效是个异步生效任务,另一方面是2.2.0之后新增租户级配置项刷新,和系统级配置项刷新复用同一个timer,这里暂且等一下。 # times = 30 -# sql="select distinct value = '{0}' from oceanbase.__all_virtual_sys_parameter_stat where name='min_observer_version'".format(upgrade_params.new_version) +# sql="select distinct value = '{0}' from oceanbase.GV$OB_PARAMETERS where name='min_observer_version'".format(upgrade_params.new_version) # while times > 0 : # (desc, results) = query_cur.exec_query(sql) # if len(results) == 1 and results[0][0] == 1: @@ -15081,38 +14887,6 @@ # else: # logging.info("check_cluster_version success") # -#def check_storage_format_version(query_cur): -# # Specified expected version each time want to upgrade (see OB_STORAGE_FORMAT_VERSION_MAX) -# expect_version = 4; -# sql = "select value from oceanbase.__all_zone where zone = '' and name = 'storage_format_version'" -# times = 180 -# while times > 0 : -# (desc, results) = query_cur.exec_query(sql) -# if len(results) == 1 and results[0][0] == expect_version: -# break -# time.sleep(10) -# times -= 1 -# if times == 0: -# logging.warn("check storage format version timeout! Expected version {0}".format(expect_version)) -# raise e -# else: -# logging.info("check expected storage format version '{0}' success".format(expect_version)) -# -#def upgrade_storage_format_version(conn, cur): -# try: -# # enable_ddl -# set_parameter(cur, 'enable_ddl', 'True') -# -# # run job -# sql = "alter system run job 'UPGRADE_STORAGE_FORMAT_VERSION';" -# logging.info(sql) -# cur.execute(sql) -# -# except Exception, e: -# logging.warn("upgrade storage format version failed") -# raise e -# logging.info("upgrade storage format version finish") -# ## 2 检查内部表自检是否成功 #def check_root_inspection(query_cur): # sql = "select count(*) from oceanbase.__all_virtual_upgrade_inspection where info != 'succeed'" @@ -15128,20 +14902,6 @@ # raise e # logging.info('check root inspection success') # -## 3 check standby cluster -#def check_standby_cluster(conn, query_cur, my_user, my_passwd): -# try: -# is_primary = check_current_cluster_is_primary(query_cur) -# if not is_primary: -# logging.info("""current cluster is standby cluster, just skip""") -# else: -# tenant_id_list = fetch_tenant_ids(query_cur) -# standby_cluster_infos = fetch_standby_cluster_infos(conn, query_cur, my_user, my_passwd) -# check_ddl_and_dml_sync(conn, query_cur, standby_cluster_infos, tenant_id_list) -# except Exception, e: -# logging.exception('fail to fetch standby cluster info') -# raise e -# ## 4 开ddl #def enable_ddl(cur): # set_parameter(cur, 'enable_ddl', 'True') @@ -15169,82 +14929,6 @@ # logging.exception('fail to fetch distinct tenant ids') # raise e # -#def check_current_cluster_is_primary(query_cur): -# try: -# sql = """SELECT * FROM v$ob_cluster -# WHERE cluster_role = "PRIMARY" -# AND cluster_status = "VALID" -# AND (switchover_status = "NOT ALLOWED" OR switchover_status = "TO STANDBY") """ -# (desc, results) = query_cur.exec_query(sql) -# is_primary = len(results) > 0 -# return is_primary -# except Exception, e: -# logging.exception("""fail to check current is primary""") -# raise e -# -#def fetch_standby_cluster_infos(conn, query_cur, user, pwd): -# try: -# is_primary = check_current_cluster_is_primary(query_cur) -# if not is_primary: -# logging.exception("""should be primary cluster""") -# raise e -# -# standby_cluster_infos = [] -# sql = """SELECT cluster_id, rootservice_list from v$ob_standby_status""" -# (desc, results) = query_cur.exec_query(sql) -# -# for r in results: -# standby_cluster_info = {} -# if 2 != len(r): -# logging.exception("length not match") -# raise e -# standby_cluster_info['cluster_id'] = r[0] -# standby_cluster_info['user'] = user -# standby_cluster_info['pwd'] = pwd -# # construct ip/port -# address = r[1].split(";")[0] # choose first address in rs_list -# standby_cluster_info['ip'] = str(address.split(":")[0]) -# standby_cluster_info['port'] = address.split(":")[2] -# # append -# standby_cluster_infos.append(standby_cluster_info) -# logging.info("""cluster_info : cluster_id = {0}, ip = {1}, port = {2}""" -# .format(standby_cluster_info['cluster_id'], -# standby_cluster_info['ip'], -# standby_cluster_info['port'])) -# conn.commit() -# # check standby cluster -# for standby_cluster_info in standby_cluster_infos: -# # connect -# logging.info("""create connection : cluster_id = {0}, ip = {1}, port = {2}""" -# .format(standby_cluster_info['cluster_id'], -# standby_cluster_info['ip'], -# standby_cluster_info['port'])) -# -# tmp_conn = mysql.connector.connect(user = standby_cluster_info['user'], -# password = standby_cluster_info['pwd'], -# host = standby_cluster_info['ip'], -# port = standby_cluster_info['port'], -# database = 'oceanbase') -# -# tmp_cur = tmp_conn.cursor(buffered=True) -# tmp_conn.autocommit = True -# tmp_query_cur = Cursor(tmp_cur) -# is_primary = check_current_cluster_is_primary(tmp_query_cur) -# if is_primary: -# logging.exception("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}""" -# .format(standby_cluster_info['cluster_id'], -# standby_cluster_info['ip'], -# standby_cluster_info['port'])) -# raise e -# # close -# tmp_cur.close() -# tmp_conn.close() -# -# return standby_cluster_infos -# except Exception, e: -# logging.exception('fail to fetch standby cluster info') -# raise e -# #def check_ddl_and_dml_sync(conn, query_cur, standby_cluster_infos, tenant_ids): # try: # conn.commit() @@ -15394,10 +15078,7 @@ # query_cur = Cursor(cur) # try: # check_cluster_version(query_cur) -# #upgrade_storage_format_version(conn, cur) -# #check_storage_format_version(query_cur) # check_root_inspection(query_cur) -# check_standby_cluster(conn, query_cur, my_user, my_passwd) # enable_ddl(cur) # enable_rebalance(cur) # enable_rereplication(cur)