diff --git a/tools/upgrade/actions.py b/tools/upgrade/actions.py index 5a58c7fb3..727b097b9 100755 --- a/tools/upgrade/actions.py +++ b/tools/upgrade/actions.py @@ -127,16 +127,45 @@ def set_parameter(cur, parameter, value, timeout = 0): cur.execute(sql) wait_parameter_sync(cur, False, parameter, value, timeout) +def set_session_timeout(cur, seconds): + sql = "set @@session.ob_query_timeout = {0}".format(seconds * 1000 * 1000) + logging.info(sql) + cur.execute(sql) + +def set_default_timeout_by_tenant(cur, timeout, timeout_per_tenant, min_timeout): + if timeout > 0: + logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) + else: + query_cur = QueryCursor(cur) + tenant_id_list = fetch_tenant_ids(query_cur) + cal_timeout = len(tenant_id_list) * timeout_per_tenant + timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) + logging.info("use default timeout caculated by tenants, " + "timeout(s):{0}, tenant_count:{1}, " + "timeout_per_tenant(s):{2}, min_timeout(s):{3}" + .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) + + return timeout + def set_tenant_parameter(cur, parameter, value, timeout = 0): + tenants_list = [] if get_min_cluster_version(cur) < get_version("4.2.1.0"): tenants_list = ['all'] else: tenants_list = ['sys', 'all_user', 'all_meta'] + + query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) + + set_session_timeout(cur, query_timeout) + for tenants in tenants_list: sql = """alter system set {0} = '{1}' tenant = '{2}'""".format(parameter, value, tenants) logging.info(sql) cur.execute(sql) + + set_session_timeout(cur, 10) + wait_parameter_sync(cur, True, parameter, value, timeout) def get_ori_enable_ddl(cur, timeout): @@ -225,7 +254,20 @@ def wait_parameter_sync(cur, is_tenant_config, key, value, timeout): table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info" sql = """select count(*) as cnt from oceanbase.{0} where name = '{1}' and value != '{2}'""".format(table_name, key, value) - times = (timeout if timeout > 0 else 60) / 5 + + wait_timeout = 0 + query_timeout = 0 + if not is_tenant_config or timeout > 0: + wait_timeout = (timeout if timeout > 0 else 60) + query_timeout = wait_timeout + else: + # is_tenant_config & timeout not set + wait_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) + query_timeout = set_default_timeout_by_tenant(cur, timeout, 2, 60) + + set_session_timeout(cur, query_timeout) + + times = wait_timeout / 5 while times >= 0: logging.info(sql) cur.execute(sql) @@ -245,6 +287,8 @@ def wait_parameter_sync(cur, is_tenant_config, key, value, timeout): raise e time.sleep(5) + set_session_timeout(cur, 10) + def do_begin_upgrade(cur, timeout): if not check_parameter(cur, False, "enable_upgrade_mode", "True"): @@ -316,24 +360,38 @@ def do_suspend_merge(cur, timeout): tenants_list = ['all'] else: tenants_list = ['sys', 'all_user', 'all_meta'] + + query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) + + set_session_timeout(cur, query_timeout) + for tenants in tenants_list: action_sql = "alter system suspend merge tenant = {0}".format(tenants) rollback_sql = "alter system resume merge tenant = {0}".format(tenants) logging.info(action_sql) cur.execute(action_sql) + set_session_timeout(cur, 10) + def do_resume_merge(cur, timeout): tenants_list = [] if get_min_cluster_version(cur) < get_version("4.2.1.0"): tenants_list = ['all'] else: tenants_list = ['sys', 'all_user', 'all_meta'] + + query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) + + set_session_timeout(cur, query_timeout) + for tenants in tenants_list: action_sql = "alter system resume merge tenant = {0}".format(tenants) rollback_sql = "alter system suspend merge tenant = {0}".format(tenants) logging.info(action_sql) cur.execute(action_sql) + set_session_timeout(cur, 10) + class Cursor: __cursor = None def __init__(self, cursor): diff --git a/tools/upgrade/tenant_upgrade_action.py b/tools/upgrade/tenant_upgrade_action.py index fac3d7d28..77b53ccd8 100755 --- a/tools/upgrade/tenant_upgrade_action.py +++ b/tools/upgrade/tenant_upgrade_action.py @@ -21,17 +21,7 @@ def do_upgrade(conn, cur, timeout, user, pwd): else: run_upgrade_job(conn, cur, "UPGRADE_VIRTUAL_SCHEMA", timeout) - # just to make __all_virtual_upgrade_inspection avaliable - timeout_ts = (timeout if timeout > 0 else 600) * 1000 * 1000 - sql = "set @@session.ob_query_timeout = {0}".format(timeout_ts) - logging.info(sql) - cur.execute(sql) - sql = "alter system run job 'root_inspection'" - logging.info(sql) - cur.execute(sql) - sql = "set @@session.ob_query_timeout = 10000000" - logging.info(sql) - cur.execute(sql) + run_root_inspection(cur, timeout) ####========******####======== actions begin ========####******========#### upgrade_syslog_level(conn, cur) return @@ -47,7 +37,6 @@ def upgrade_syslog_level(conn, cur): info_cnt = result[0][0] if info_cnt > 0: actions.set_parameter(cur, "syslog_level", "WDIAG") - except Exception, e: logging.warn("upgrade syslog level failed!") raise e @@ -62,6 +51,18 @@ def query(cur, sql): def get_tenant_ids(cur): return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')] +def run_root_inspection(cur, timeout): + + query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) + + actions.set_session_timeout(cur, query_timeout) + + sql = "alter system run job 'root_inspection'" + logging.info(sql) + cur.execute(sql) + + actions.set_session_timeout(cur, 10) + def upgrade_across_version(cur): current_data_version = actions.get_current_data_version() int_current_data_version = actions.get_version(current_data_version) @@ -167,7 +168,9 @@ def check_can_run_upgrade_job(cur, job_name): def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id): try: - times = (timeout if timeout > 0 else 3600) / 10 + wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 100, 3600) + + times = wait_timeout / 10 while (times >= 0): sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job where job_type = '{0}' and job_id > {1} order by job_id desc limit 1 diff --git a/tools/upgrade/upgrade_health_checker.py b/tools/upgrade/upgrade_health_checker.py index 5f8eeb47e..255ed1dcf 100755 --- a/tools/upgrade/upgrade_health_checker.py +++ b/tools/upgrade/upgrade_health_checker.py @@ -73,7 +73,7 @@ sys.argv[0] + """ [OPTIONS]""" +\ ' that all modules should be run. They are splitted by ",".\n' +\ ' For example: -m all, or --module=ddl,normal_dml,special_action\n' +\ '-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\ -'-t, --timeout=name check timeout, default: 600(s).\n' + \ +'-t, --timeout=name check timeout.\n' + \ '-z, --zone=name If zone is not specified, check all servers status in cluster. \n' +\ ' Otherwise, only check servers status in specified zone. \n' + \ '\n\n' +\ @@ -135,8 +135,7 @@ Option('p', 'password', True, False, ''),\ Option('m', 'module', True, False, 'all'),\ # 日志文件路径,不同脚本的main函数中中会改成不同的默认值 Option('l', 'log-file', True, False),\ -# 一些检查的超时时间,默认是600s -Option('t', 'timeout', True, False, '600'),\ +Option('t', 'timeout', True, False, 0),\ Option('z', 'zone', True, False, ''),\ ]\ @@ -288,13 +287,38 @@ def check_zone_valid(query_cur, zone): else: logging.info("zone is empty, check all servers in cluster") +def fetch_tenant_ids(query_cur): + try: + tenant_id_list = [] + (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""") + for r in results: + tenant_id_list.append(r[0]) + return tenant_id_list + except Exception, e: + logging.exception('fail to fetch distinct tenant ids') + raise e + +def set_default_timeout_by_tenant(query_cur, timeout, timeout_per_tenant, min_timeout): + if timeout > 0: + logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) + else: + tenant_id_list = fetch_tenant_ids(query_cur) + cal_timeout = len(tenant_id_list) * timeout_per_tenant + timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) + logging.info("use default timeout caculated by tenants, " + "timeout(s):{0}, tenant_count:{1}, " + "timeout_per_tenant(s):{2}, min_timeout(s):{3}" + .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) + + return timeout + #### START #### # 0. 检查server版本是否严格一致 def check_server_version_by_zone(query_cur, zone): if zone == '': logging.info("skip check server version by cluster") else: - sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server where zone = '{0}'""".format(zone); + sql = """select distinct(substring_index(build_version, '_', 1)) from oceanbase.__all_server where zone = '{0}'""".format(zone); (desc, results) = query_cur.exec_query(sql); if len(results) != 1: raise MyError("servers build_version not match") @@ -304,8 +328,9 @@ def check_server_version_by_zone(query_cur, zone): # 1. 检查paxos副本是否同步, paxos副本是否缺失 def check_paxos_replica(query_cur, timeout): # 1.1 检查paxos副本是否同步 - sql = """select count(*) from GV$OB_LOG_STAT where in_sync = 'NO'""" - check_until_timeout(query_cur, sql, 0, timeout) + sql = """select count(*) from oceanbase.GV$OB_LOG_STAT where in_sync = 'NO'""" + wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) + check_until_timeout(query_cur, sql, 0, wait_timeout) # 1.2 检查paxos副本是否有缺失 TODO logging.info('check paxos replica success') @@ -315,26 +340,29 @@ def check_observer_status(query_cur, zone, timeout): sql = """select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status='inactive')""" if zone != '': sql += """ and zone = '{0}'""".format(zone) - check_until_timeout(query_cur, sql, 0, timeout) + wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) + check_until_timeout(query_cur, sql, 0, wait_timeout) # 3. 检查schema是否刷新成功 def check_schema_status(query_cur, timeout): sql = """select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""" - check_until_timeout(query_cur, sql, 1, timeout) + wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) + check_until_timeout(query_cur, sql, 1, wait_timeout) # 4. check major finish def check_major_merge(query_cur, timeout): need_check = 0 - (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERs where name = 'enable_major_freeze';""") + (desc, results) = query_cur.exec_query("""select distinct value from oceanbase.GV$OB_PARAMETERS where name = 'enable_major_freeze';""") if len(results) != 1: need_check = 1 elif results[0][0] != 'True': need_check = 1 if need_check == 1: - sql = """select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" - check_until_timeout(query_cur, sql, 0, timeout) - sql2 = """select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" - check_until_timeout(query_cur, sql2, 0, timeout) + wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) + sql = """select count(1) from oceanbase.CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" + check_until_timeout(query_cur, sql, 0, wait_timeout) + sql2 = """select /*+ query_timeout(1000000000) */ count(1) from oceanbase.__all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" + check_until_timeout(query_cur, sql2, 0, wait_timeout) def check_until_timeout(query_cur, sql, value, timeout): times = timeout / 10 @@ -366,7 +394,6 @@ def do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout, need raise_on_warnings = True) conn.autocommit = True cur = conn.cursor(buffered=True) - timeout = timeout if timeout > 0 else 600 try: query_cur = QueryCursor(cur) check_zone_valid(query_cur, zone) diff --git a/tools/upgrade/upgrade_post.py b/tools/upgrade/upgrade_post.py index ca38333d3..cc1093113 100755 --- a/tools/upgrade/upgrade_post.py +++ b/tools/upgrade/upgrade_post.py @@ -135,16 +135,45 @@ # cur.execute(sql) # wait_parameter_sync(cur, False, parameter, value, timeout) # +#def set_session_timeout(cur, seconds): +# sql = "set @@session.ob_query_timeout = {0}".format(seconds * 1000 * 1000) +# logging.info(sql) +# cur.execute(sql) +# +#def set_default_timeout_by_tenant(cur, timeout, timeout_per_tenant, min_timeout): +# if timeout > 0: +# logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) +# else: +# query_cur = QueryCursor(cur) +# tenant_id_list = fetch_tenant_ids(query_cur) +# cal_timeout = len(tenant_id_list) * timeout_per_tenant +# timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) +# logging.info("use default timeout caculated by tenants, " +# "timeout(s):{0}, tenant_count:{1}, " +# "timeout_per_tenant(s):{2}, min_timeout(s):{3}" +# .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) +# +# return timeout +# #def set_tenant_parameter(cur, parameter, value, timeout = 0): +# # tenants_list = [] # if get_min_cluster_version(cur) < get_version("4.2.1.0"): # tenants_list = ['all'] # else: # tenants_list = ['sys', 'all_user', 'all_meta'] +# +# query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) +# +# set_session_timeout(cur, query_timeout) +# # for tenants in tenants_list: # sql = """alter system set {0} = '{1}' tenant = '{2}'""".format(parameter, value, tenants) # logging.info(sql) # cur.execute(sql) +# +# set_session_timeout(cur, 10) +# # wait_parameter_sync(cur, True, parameter, value, timeout) # #def get_ori_enable_ddl(cur, timeout): @@ -233,7 +262,20 @@ # table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info" # sql = """select count(*) as cnt from oceanbase.{0} # where name = '{1}' and value != '{2}'""".format(table_name, key, value) -# times = (timeout if timeout > 0 else 60) / 5 +# +# wait_timeout = 0 +# query_timeout = 0 +# if not is_tenant_config or timeout > 0: +# wait_timeout = (timeout if timeout > 0 else 60) +# query_timeout = wait_timeout +# else: +# # is_tenant_config & timeout not set +# wait_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) +# query_timeout = set_default_timeout_by_tenant(cur, timeout, 2, 60) +# +# set_session_timeout(cur, query_timeout) +# +# times = wait_timeout / 5 # while times >= 0: # logging.info(sql) # cur.execute(sql) @@ -253,6 +295,8 @@ # raise e # time.sleep(5) # +# set_session_timeout(cur, 10) +# #def do_begin_upgrade(cur, timeout): # # if not check_parameter(cur, False, "enable_upgrade_mode", "True"): @@ -324,24 +368,38 @@ # tenants_list = ['all'] # else: # tenants_list = ['sys', 'all_user', 'all_meta'] +# +# query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) +# +# set_session_timeout(cur, query_timeout) +# # for tenants in tenants_list: # action_sql = "alter system suspend merge tenant = {0}".format(tenants) # rollback_sql = "alter system resume merge tenant = {0}".format(tenants) # logging.info(action_sql) # cur.execute(action_sql) # +# set_session_timeout(cur, 10) +# #def do_resume_merge(cur, timeout): # tenants_list = [] # if get_min_cluster_version(cur) < get_version("4.2.1.0"): # tenants_list = ['all'] # else: # tenants_list = ['sys', 'all_user', 'all_meta'] +# +# query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) +# +# set_session_timeout(cur, query_timeout) +# # for tenants in tenants_list: # action_sql = "alter system resume merge tenant = {0}".format(tenants) # rollback_sql = "alter system suspend merge tenant = {0}".format(tenants) # logging.info(action_sql) # cur.execute(action_sql) # +# set_session_timeout(cur, 10) +# #class Cursor: # __cursor = None # def __init__(self, cursor): @@ -1334,17 +1392,7 @@ # else: # run_upgrade_job(conn, cur, "UPGRADE_VIRTUAL_SCHEMA", timeout) # -# # just to make __all_virtual_upgrade_inspection avaliable -# timeout_ts = (timeout if timeout > 0 else 600) * 1000 * 1000 -# sql = "set @@session.ob_query_timeout = {0}".format(timeout_ts) -# logging.info(sql) -# cur.execute(sql) -# sql = "alter system run job 'root_inspection'" -# logging.info(sql) -# cur.execute(sql) -# sql = "set @@session.ob_query_timeout = 10000000" -# logging.info(sql) -# cur.execute(sql) +# run_root_inspection(cur, timeout) #####========******####======== actions begin ========####******========#### # upgrade_syslog_level(conn, cur) # return @@ -1360,7 +1408,6 @@ # info_cnt = result[0][0] # if info_cnt > 0: # actions.set_parameter(cur, "syslog_level", "WDIAG") -# # except Exception, e: # logging.warn("upgrade syslog level failed!") # raise e @@ -1375,6 +1422,18 @@ #def get_tenant_ids(cur): # return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')] # +#def run_root_inspection(cur, timeout): +# +# query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) +# +# actions.set_session_timeout(cur, query_timeout) +# +# sql = "alter system run job 'root_inspection'" +# logging.info(sql) +# cur.execute(sql) +# +# actions.set_session_timeout(cur, 10) +# #def upgrade_across_version(cur): # current_data_version = actions.get_current_data_version() # int_current_data_version = actions.get_version(current_data_version) @@ -1480,7 +1539,9 @@ # #def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id): # try: -# times = (timeout if timeout > 0 else 3600) / 10 +# wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 100, 3600) +# +# times = wait_timeout / 10 # while (times >= 0): # sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job # where job_type = '{0}' and job_id > {1} order by job_id desc limit 1 @@ -2217,7 +2278,8 @@ # fail_list.append("""still has tenant snapshot task, upgrade is not allowed temporarily""") # else: # logging.info('check tenant snapshot task success') -## 19. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL +# +## 17. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL #def check_variable_binlog_row_image(query_cur): ## 4.3.0.0之前的版本,MINIMAL模式生成的日志CDC无法正常消费(DELETE日志). ## 4.3.0版本开始,MINIMAL模式做了改进,支持CDC消费,需要在升级到4.3.0.0之后再打开. @@ -2405,7 +2467,7 @@ #' that all modules should be run. They are splitted by ",".\n' +\ #' For example: -m all, or --module=ddl,normal_dml,special_action\n' +\ #'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\ -#'-t, --timeout=name check timeout, default: 600(s).\n' + \ +#'-t, --timeout=name check timeout.\n' + \ #'-z, --zone=name If zone is not specified, check all servers status in cluster. \n' +\ #' Otherwise, only check servers status in specified zone. \n' + \ #'\n\n' +\ @@ -2467,8 +2529,7 @@ #Option('m', 'module', True, False, 'all'),\ ## 日志文件路径,不同脚本的main函数中中会改成不同的默认值 #Option('l', 'log-file', True, False),\ -## 一些检查的超时时间,默认是600s -#Option('t', 'timeout', True, False, '600'),\ +#Option('t', 'timeout', True, False, 0),\ #Option('z', 'zone', True, False, ''),\ #]\ # @@ -2620,13 +2681,38 @@ # else: # logging.info("zone is empty, check all servers in cluster") # +#def fetch_tenant_ids(query_cur): +# try: +# tenant_id_list = [] +# (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""") +# for r in results: +# tenant_id_list.append(r[0]) +# return tenant_id_list +# except Exception, e: +# logging.exception('fail to fetch distinct tenant ids') +# raise e +# +#def set_default_timeout_by_tenant(query_cur, timeout, timeout_per_tenant, min_timeout): +# if timeout > 0: +# logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) +# else: +# tenant_id_list = fetch_tenant_ids(query_cur) +# cal_timeout = len(tenant_id_list) * timeout_per_tenant +# timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) +# logging.info("use default timeout caculated by tenants, " +# "timeout(s):{0}, tenant_count:{1}, " +# "timeout_per_tenant(s):{2}, min_timeout(s):{3}" +# .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) +# +# return timeout +# ##### START #### ## 0. 检查server版本是否严格一致 #def check_server_version_by_zone(query_cur, zone): # if zone == '': # logging.info("skip check server version by cluster") # else: -# sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server where zone = '{0}'""".format(zone); +# sql = """select distinct(substring_index(build_version, '_', 1)) from oceanbase.__all_server where zone = '{0}'""".format(zone); # (desc, results) = query_cur.exec_query(sql); # if len(results) != 1: # raise MyError("servers build_version not match") @@ -2636,8 +2722,9 @@ ## 1. 检查paxos副本是否同步, paxos副本是否缺失 #def check_paxos_replica(query_cur, timeout): # # 1.1 检查paxos副本是否同步 -# sql = """select count(*) from GV$OB_LOG_STAT where in_sync = 'NO'""" -# check_until_timeout(query_cur, sql, 0, timeout) +# sql = """select count(*) from oceanbase.GV$OB_LOG_STAT where in_sync = 'NO'""" +# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) +# check_until_timeout(query_cur, sql, 0, wait_timeout) # # # 1.2 检查paxos副本是否有缺失 TODO # logging.info('check paxos replica success') @@ -2647,26 +2734,29 @@ # sql = """select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status='inactive')""" # if zone != '': # sql += """ and zone = '{0}'""".format(zone) -# check_until_timeout(query_cur, sql, 0, timeout) +# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) +# check_until_timeout(query_cur, sql, 0, wait_timeout) # ## 3. 检查schema是否刷新成功 #def check_schema_status(query_cur, timeout): # sql = """select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""" -# check_until_timeout(query_cur, sql, 1, timeout) +# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) +# check_until_timeout(query_cur, sql, 1, wait_timeout) # ## 4. check major finish #def check_major_merge(query_cur, timeout): # need_check = 0 -# (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERs where name = 'enable_major_freeze';""") +# (desc, results) = query_cur.exec_query("""select distinct value from oceanbase.GV$OB_PARAMETERS where name = 'enable_major_freeze';""") # if len(results) != 1: # need_check = 1 # elif results[0][0] != 'True': # need_check = 1 # if need_check == 1: -# sql = """select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" -# check_until_timeout(query_cur, sql, 0, timeout) -# sql2 = """select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" -# check_until_timeout(query_cur, sql2, 0, timeout) +# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) +# sql = """select count(1) from oceanbase.CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" +# check_until_timeout(query_cur, sql, 0, wait_timeout) +# sql2 = """select /*+ query_timeout(1000000000) */ count(1) from oceanbase.__all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" +# check_until_timeout(query_cur, sql2, 0, wait_timeout) # #def check_until_timeout(query_cur, sql, value, timeout): # times = timeout / 10 @@ -2698,7 +2788,6 @@ # raise_on_warnings = True) # conn.autocommit = True # cur = conn.cursor(buffered=True) -# timeout = timeout if timeout > 0 else 600 # try: # query_cur = QueryCursor(cur) # check_zone_valid(query_cur, zone) @@ -2795,8 +2884,14 @@ # # check compatible sync # parameter_count = int(server_count) * int(tenant_count) # current_data_version = actions.get_current_data_version() +# +# query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 2, 60) +# actions.set_session_timeout(cur, query_timeout) +# # sql = """select count(*) as cnt from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value = '{0}' and tenant_id in ({1})""".format(current_data_version, tenant_ids_str) -# times = (timeout if timeout > 0 else 60) / 5 +# +# wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 60) +# times = wait_timeout / 5 # while times >= 0: # logging.info(sql) # cur.execute(sql) @@ -2816,6 +2911,8 @@ # raise e # time.sleep(5) # +# actions.set_session_timeout(cur, 10) +# # # check target_data_version/current_data_version from __all_core_table # int_current_data_version = actions.get_version(current_data_version) # sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str) @@ -2830,16 +2927,20 @@ # logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version)) # ## 3 检查内部表自检是否成功 -#def check_root_inspection(query_cur, timeout): +#def check_root_inspection(cur, query_cur, timeout): # sql = "select count(*) from oceanbase.__all_virtual_upgrade_inspection where info != 'succeed'" -# times = timeout if timeout > 0 else 180 -# while times > 0 : +# +# wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) +# +# times = wait_timeout / 10 +# while times >= 0 : # (desc, results) = query_cur.exec_query(sql) # if results[0][0] == 0: # break # time.sleep(10) # times -= 1 -# if times == 0: +# +# if times == -1: # logging.warn('check root inspection failed!') # raise e # logging.info('check root inspection success') @@ -2867,7 +2968,7 @@ # try: # check_cluster_version(cur, timeout) # check_data_version(cur, query_cur, timeout) -# check_root_inspection(query_cur, timeout) +# check_root_inspection(cur, query_cur, timeout) # enable_ddl(cur, timeout) # enable_rebalance(cur, timeout) # enable_rereplication(cur, timeout) diff --git a/tools/upgrade/upgrade_post_checker.py b/tools/upgrade/upgrade_post_checker.py index e49ddbb5e..2a6e56b87 100755 --- a/tools/upgrade/upgrade_post_checker.py +++ b/tools/upgrade/upgrade_post_checker.py @@ -41,8 +41,14 @@ def check_data_version(cur, query_cur, timeout): # check compatible sync parameter_count = int(server_count) * int(tenant_count) current_data_version = actions.get_current_data_version() + + query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 2, 60) + actions.set_session_timeout(cur, query_timeout) + sql = """select count(*) as cnt from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value = '{0}' and tenant_id in ({1})""".format(current_data_version, tenant_ids_str) - times = (timeout if timeout > 0 else 60) / 5 + + wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 60) + times = wait_timeout / 5 while times >= 0: logging.info(sql) cur.execute(sql) @@ -62,6 +68,8 @@ def check_data_version(cur, query_cur, timeout): raise e time.sleep(5) + actions.set_session_timeout(cur, 10) + # check target_data_version/current_data_version from __all_core_table int_current_data_version = actions.get_version(current_data_version) sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str) @@ -76,16 +84,20 @@ def check_data_version(cur, query_cur, timeout): logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version)) # 3 检查内部表自检是否成功 -def check_root_inspection(query_cur, timeout): +def check_root_inspection(cur, query_cur, timeout): sql = "select count(*) from oceanbase.__all_virtual_upgrade_inspection where info != 'succeed'" - times = timeout if timeout > 0 else 180 - while times > 0 : + + wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) + + times = wait_timeout / 10 + while times >= 0 : (desc, results) = query_cur.exec_query(sql) if results[0][0] == 0: break time.sleep(10) times -= 1 - if times == 0: + + if times == -1: logging.warn('check root inspection failed!') raise e logging.info('check root inspection success') @@ -113,7 +125,7 @@ def do_check(conn, cur, query_cur, timeout): try: check_cluster_version(cur, timeout) check_data_version(cur, query_cur, timeout) - check_root_inspection(query_cur, timeout) + check_root_inspection(cur, query_cur, timeout) enable_ddl(cur, timeout) enable_rebalance(cur, timeout) enable_rereplication(cur, timeout) diff --git a/tools/upgrade/upgrade_pre.py b/tools/upgrade/upgrade_pre.py index 6ce6108cd..181d3e197 100755 --- a/tools/upgrade/upgrade_pre.py +++ b/tools/upgrade/upgrade_pre.py @@ -135,16 +135,45 @@ # cur.execute(sql) # wait_parameter_sync(cur, False, parameter, value, timeout) # +#def set_session_timeout(cur, seconds): +# sql = "set @@session.ob_query_timeout = {0}".format(seconds * 1000 * 1000) +# logging.info(sql) +# cur.execute(sql) +# +#def set_default_timeout_by_tenant(cur, timeout, timeout_per_tenant, min_timeout): +# if timeout > 0: +# logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) +# else: +# query_cur = QueryCursor(cur) +# tenant_id_list = fetch_tenant_ids(query_cur) +# cal_timeout = len(tenant_id_list) * timeout_per_tenant +# timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) +# logging.info("use default timeout caculated by tenants, " +# "timeout(s):{0}, tenant_count:{1}, " +# "timeout_per_tenant(s):{2}, min_timeout(s):{3}" +# .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) +# +# return timeout +# #def set_tenant_parameter(cur, parameter, value, timeout = 0): +# # tenants_list = [] # if get_min_cluster_version(cur) < get_version("4.2.1.0"): # tenants_list = ['all'] # else: # tenants_list = ['sys', 'all_user', 'all_meta'] +# +# query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) +# +# set_session_timeout(cur, query_timeout) +# # for tenants in tenants_list: # sql = """alter system set {0} = '{1}' tenant = '{2}'""".format(parameter, value, tenants) # logging.info(sql) # cur.execute(sql) +# +# set_session_timeout(cur, 10) +# # wait_parameter_sync(cur, True, parameter, value, timeout) # #def get_ori_enable_ddl(cur, timeout): @@ -233,7 +262,20 @@ # table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info" # sql = """select count(*) as cnt from oceanbase.{0} # where name = '{1}' and value != '{2}'""".format(table_name, key, value) -# times = (timeout if timeout > 0 else 60) / 5 +# +# wait_timeout = 0 +# query_timeout = 0 +# if not is_tenant_config or timeout > 0: +# wait_timeout = (timeout if timeout > 0 else 60) +# query_timeout = wait_timeout +# else: +# # is_tenant_config & timeout not set +# wait_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) +# query_timeout = set_default_timeout_by_tenant(cur, timeout, 2, 60) +# +# set_session_timeout(cur, query_timeout) +# +# times = wait_timeout / 5 # while times >= 0: # logging.info(sql) # cur.execute(sql) @@ -253,6 +295,8 @@ # raise e # time.sleep(5) # +# set_session_timeout(cur, 10) +# #def do_begin_upgrade(cur, timeout): # # if not check_parameter(cur, False, "enable_upgrade_mode", "True"): @@ -324,24 +368,38 @@ # tenants_list = ['all'] # else: # tenants_list = ['sys', 'all_user', 'all_meta'] +# +# query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) +# +# set_session_timeout(cur, query_timeout) +# # for tenants in tenants_list: # action_sql = "alter system suspend merge tenant = {0}".format(tenants) # rollback_sql = "alter system resume merge tenant = {0}".format(tenants) # logging.info(action_sql) # cur.execute(action_sql) # +# set_session_timeout(cur, 10) +# #def do_resume_merge(cur, timeout): # tenants_list = [] # if get_min_cluster_version(cur) < get_version("4.2.1.0"): # tenants_list = ['all'] # else: # tenants_list = ['sys', 'all_user', 'all_meta'] +# +# query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) +# +# set_session_timeout(cur, query_timeout) +# # for tenants in tenants_list: # action_sql = "alter system resume merge tenant = {0}".format(tenants) # rollback_sql = "alter system suspend merge tenant = {0}".format(tenants) # logging.info(action_sql) # cur.execute(action_sql) # +# set_session_timeout(cur, 10) +# #class Cursor: # __cursor = None # def __init__(self, cursor): @@ -1334,17 +1392,7 @@ # else: # run_upgrade_job(conn, cur, "UPGRADE_VIRTUAL_SCHEMA", timeout) # -# # just to make __all_virtual_upgrade_inspection avaliable -# timeout_ts = (timeout if timeout > 0 else 600) * 1000 * 1000 -# sql = "set @@session.ob_query_timeout = {0}".format(timeout_ts) -# logging.info(sql) -# cur.execute(sql) -# sql = "alter system run job 'root_inspection'" -# logging.info(sql) -# cur.execute(sql) -# sql = "set @@session.ob_query_timeout = 10000000" -# logging.info(sql) -# cur.execute(sql) +# run_root_inspection(cur, timeout) #####========******####======== actions begin ========####******========#### # upgrade_syslog_level(conn, cur) # return @@ -1360,7 +1408,6 @@ # info_cnt = result[0][0] # if info_cnt > 0: # actions.set_parameter(cur, "syslog_level", "WDIAG") -# # except Exception, e: # logging.warn("upgrade syslog level failed!") # raise e @@ -1375,6 +1422,18 @@ #def get_tenant_ids(cur): # return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')] # +#def run_root_inspection(cur, timeout): +# +# query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) +# +# actions.set_session_timeout(cur, query_timeout) +# +# sql = "alter system run job 'root_inspection'" +# logging.info(sql) +# cur.execute(sql) +# +# actions.set_session_timeout(cur, 10) +# #def upgrade_across_version(cur): # current_data_version = actions.get_current_data_version() # int_current_data_version = actions.get_version(current_data_version) @@ -1480,7 +1539,9 @@ # #def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id): # try: -# times = (timeout if timeout > 0 else 3600) / 10 +# wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 100, 3600) +# +# times = wait_timeout / 10 # while (times >= 0): # sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job # where job_type = '{0}' and job_id > {1} order by job_id desc limit 1 @@ -2217,7 +2278,8 @@ # fail_list.append("""still has tenant snapshot task, upgrade is not allowed temporarily""") # else: # logging.info('check tenant snapshot task success') -## 19. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL +# +## 17. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL #def check_variable_binlog_row_image(query_cur): ## 4.3.0.0之前的版本,MINIMAL模式生成的日志CDC无法正常消费(DELETE日志). ## 4.3.0版本开始,MINIMAL模式做了改进,支持CDC消费,需要在升级到4.3.0.0之后再打开. @@ -2405,7 +2467,7 @@ #' that all modules should be run. They are splitted by ",".\n' +\ #' For example: -m all, or --module=ddl,normal_dml,special_action\n' +\ #'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\ -#'-t, --timeout=name check timeout, default: 600(s).\n' + \ +#'-t, --timeout=name check timeout.\n' + \ #'-z, --zone=name If zone is not specified, check all servers status in cluster. \n' +\ #' Otherwise, only check servers status in specified zone. \n' + \ #'\n\n' +\ @@ -2467,8 +2529,7 @@ #Option('m', 'module', True, False, 'all'),\ ## 日志文件路径,不同脚本的main函数中中会改成不同的默认值 #Option('l', 'log-file', True, False),\ -## 一些检查的超时时间,默认是600s -#Option('t', 'timeout', True, False, '600'),\ +#Option('t', 'timeout', True, False, 0),\ #Option('z', 'zone', True, False, ''),\ #]\ # @@ -2620,13 +2681,38 @@ # else: # logging.info("zone is empty, check all servers in cluster") # +#def fetch_tenant_ids(query_cur): +# try: +# tenant_id_list = [] +# (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""") +# for r in results: +# tenant_id_list.append(r[0]) +# return tenant_id_list +# except Exception, e: +# logging.exception('fail to fetch distinct tenant ids') +# raise e +# +#def set_default_timeout_by_tenant(query_cur, timeout, timeout_per_tenant, min_timeout): +# if timeout > 0: +# logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) +# else: +# tenant_id_list = fetch_tenant_ids(query_cur) +# cal_timeout = len(tenant_id_list) * timeout_per_tenant +# timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) +# logging.info("use default timeout caculated by tenants, " +# "timeout(s):{0}, tenant_count:{1}, " +# "timeout_per_tenant(s):{2}, min_timeout(s):{3}" +# .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) +# +# return timeout +# ##### START #### ## 0. 检查server版本是否严格一致 #def check_server_version_by_zone(query_cur, zone): # if zone == '': # logging.info("skip check server version by cluster") # else: -# sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server where zone = '{0}'""".format(zone); +# sql = """select distinct(substring_index(build_version, '_', 1)) from oceanbase.__all_server where zone = '{0}'""".format(zone); # (desc, results) = query_cur.exec_query(sql); # if len(results) != 1: # raise MyError("servers build_version not match") @@ -2636,8 +2722,9 @@ ## 1. 检查paxos副本是否同步, paxos副本是否缺失 #def check_paxos_replica(query_cur, timeout): # # 1.1 检查paxos副本是否同步 -# sql = """select count(*) from GV$OB_LOG_STAT where in_sync = 'NO'""" -# check_until_timeout(query_cur, sql, 0, timeout) +# sql = """select count(*) from oceanbase.GV$OB_LOG_STAT where in_sync = 'NO'""" +# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) +# check_until_timeout(query_cur, sql, 0, wait_timeout) # # # 1.2 检查paxos副本是否有缺失 TODO # logging.info('check paxos replica success') @@ -2647,26 +2734,29 @@ # sql = """select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status='inactive')""" # if zone != '': # sql += """ and zone = '{0}'""".format(zone) -# check_until_timeout(query_cur, sql, 0, timeout) +# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) +# check_until_timeout(query_cur, sql, 0, wait_timeout) # ## 3. 检查schema是否刷新成功 #def check_schema_status(query_cur, timeout): # sql = """select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""" -# check_until_timeout(query_cur, sql, 1, timeout) +# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) +# check_until_timeout(query_cur, sql, 1, wait_timeout) # ## 4. check major finish #def check_major_merge(query_cur, timeout): # need_check = 0 -# (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERs where name = 'enable_major_freeze';""") +# (desc, results) = query_cur.exec_query("""select distinct value from oceanbase.GV$OB_PARAMETERS where name = 'enable_major_freeze';""") # if len(results) != 1: # need_check = 1 # elif results[0][0] != 'True': # need_check = 1 # if need_check == 1: -# sql = """select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" -# check_until_timeout(query_cur, sql, 0, timeout) -# sql2 = """select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" -# check_until_timeout(query_cur, sql2, 0, timeout) +# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) +# sql = """select count(1) from oceanbase.CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" +# check_until_timeout(query_cur, sql, 0, wait_timeout) +# sql2 = """select /*+ query_timeout(1000000000) */ count(1) from oceanbase.__all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" +# check_until_timeout(query_cur, sql2, 0, wait_timeout) # #def check_until_timeout(query_cur, sql, value, timeout): # times = timeout / 10 @@ -2698,7 +2788,6 @@ # raise_on_warnings = True) # conn.autocommit = True # cur = conn.cursor(buffered=True) -# timeout = timeout if timeout > 0 else 600 # try: # query_cur = QueryCursor(cur) # check_zone_valid(query_cur, zone) @@ -2795,8 +2884,14 @@ # # check compatible sync # parameter_count = int(server_count) * int(tenant_count) # current_data_version = actions.get_current_data_version() +# +# query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 2, 60) +# actions.set_session_timeout(cur, query_timeout) +# # sql = """select count(*) as cnt from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value = '{0}' and tenant_id in ({1})""".format(current_data_version, tenant_ids_str) -# times = (timeout if timeout > 0 else 60) / 5 +# +# wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 60) +# times = wait_timeout / 5 # while times >= 0: # logging.info(sql) # cur.execute(sql) @@ -2816,6 +2911,8 @@ # raise e # time.sleep(5) # +# actions.set_session_timeout(cur, 10) +# # # check target_data_version/current_data_version from __all_core_table # int_current_data_version = actions.get_version(current_data_version) # sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str) @@ -2830,16 +2927,20 @@ # logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version)) # ## 3 检查内部表自检是否成功 -#def check_root_inspection(query_cur, timeout): +#def check_root_inspection(cur, query_cur, timeout): # sql = "select count(*) from oceanbase.__all_virtual_upgrade_inspection where info != 'succeed'" -# times = timeout if timeout > 0 else 180 -# while times > 0 : +# +# wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) +# +# times = wait_timeout / 10 +# while times >= 0 : # (desc, results) = query_cur.exec_query(sql) # if results[0][0] == 0: # break # time.sleep(10) # times -= 1 -# if times == 0: +# +# if times == -1: # logging.warn('check root inspection failed!') # raise e # logging.info('check root inspection success') @@ -2867,7 +2968,7 @@ # try: # check_cluster_version(cur, timeout) # check_data_version(cur, query_cur, timeout) -# check_root_inspection(query_cur, timeout) +# check_root_inspection(cur, query_cur, timeout) # enable_ddl(cur, timeout) # enable_rebalance(cur, timeout) # enable_rereplication(cur, timeout)