[UPGRADE] Make upgrade_post.py reentrant

This commit is contained in:
tino247
2023-02-09 14:10:14 +00:00
committed by ob-robot
parent 7be8e390eb
commit 079e8b9a01
4 changed files with 87 additions and 57 deletions

View File

@ -218,59 +218,70 @@ def wait_parameter_sync(cur, is_tenant_config, key, value, timeout):
time.sleep(5) time.sleep(5)
def do_begin_upgrade(cur, timeout): def do_begin_upgrade(cur, timeout):
action_sql = "alter system begin upgrade"
rollback_sql = "alter system end upgrade"
if not check_parameter(cur, False, "enable_upgrade_mode", "True"): if not check_parameter(cur, False, "enable_upgrade_mode", "True"):
action_sql = "alter system begin upgrade"
rollback_sql = "alter system end upgrade"
logging.info(action_sql) logging.info(action_sql)
cur.execute(action_sql) cur.execute(action_sql)
global g_succ_sql_list
g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
wait_parameter_sync(cur, False, "enable_upgrade_mode", "True", timeout) wait_parameter_sync(cur, False, "enable_upgrade_mode", "True", timeout)
global g_succ_sql_list
g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
def do_begin_rolling_upgrade(cur, timeout): def do_begin_rolling_upgrade(cur, timeout):
action_sql = "alter system begin rolling upgrade"
rollback_sql = "alter system end upgrade"
if not check_parameter(cur, False, "_upgrade_stage", "DBUPGRADE"): if not check_parameter(cur, False, "_upgrade_stage", "DBUPGRADE"):
action_sql = "alter system begin rolling upgrade"
rollback_sql = "alter system end upgrade"
logging.info(action_sql) logging.info(action_sql)
cur.execute(action_sql) cur.execute(action_sql)
global g_succ_sql_list
g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
wait_parameter_sync(cur, False, "_upgrade_stage", "DBUPGRADE", timeout) wait_parameter_sync(cur, False, "_upgrade_stage", "DBUPGRADE", timeout)
global g_succ_sql_list
g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
def do_end_rolling_upgrade(cur, timeout): def do_end_rolling_upgrade(cur, timeout):
# maybe in upgrade_post_check stage or never run begin upgrade
if check_parameter(cur, False, "enable_upgrade_mode", "False"):
return
current_cluster_version = get_current_cluster_version() current_cluster_version = get_current_cluster_version()
action_sql = "alter system end rolling upgrade"
rollback_sql = "alter system end upgrade"
if not check_parameter(cur, False, "_upgrade_stage", "POSTUPGRADE") or not check_parameter(cur, False, "min_observer_version", current_cluster_version): if not check_parameter(cur, False, "_upgrade_stage", "POSTUPGRADE") or not check_parameter(cur, False, "min_observer_version", current_cluster_version):
action_sql = "alter system end rolling upgrade"
rollback_sql = "alter system end upgrade"
logging.info(action_sql) logging.info(action_sql)
cur.execute(action_sql) cur.execute(action_sql)
global g_succ_sql_list
g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
wait_parameter_sync(cur, False, "min_observer_version", current_data_version, timeout) wait_parameter_sync(cur, False, "min_observer_version", current_data_version, timeout)
wait_parameter_sync(cur, False, "_upgrade_stage", "POSTUPGRADE", timeout) wait_parameter_sync(cur, False, "_upgrade_stage", "POSTUPGRADE", timeout)
global g_succ_sql_list
g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
def do_end_upgrade(cur, timeout): def do_end_upgrade(cur, timeout):
action_sql = "alter system end upgrade"
rollback_sql = ""
if not check_parameter(cur, False, "enable_upgrade_mode", "False"): if not check_parameter(cur, False, "enable_upgrade_mode", "False"):
action_sql = "alter system end upgrade"
rollback_sql = ""
logging.info(action_sql) logging.info(action_sql)
cur.execute(action_sql) cur.execute(action_sql)
global g_succ_sql_list
g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout) wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout)
global g_succ_sql_list
g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
class Cursor: class Cursor:
__cursor = None __cursor = None

View File

@ -95,7 +95,6 @@ def do_upgrade(my_host, my_port, my_user, my_passwd, timeout, my_module_set, upg
if run_modules.MODULE_POST_CHECK in my_module_set: if run_modules.MODULE_POST_CHECK in my_module_set:
logging.info('================begin to run post check action ===============') logging.info('================begin to run post check action ===============')
conn.autocommit = True conn.autocommit = True
actions.do_end_upgrade(cur, timeout)
upgrade_post_checker.do_check(conn, cur, query_cur, timeout) upgrade_post_checker.do_check(conn, cur, query_cur, timeout)
conn.autocommit = False conn.autocommit = False
actions.refresh_commit_sql_list() actions.refresh_commit_sql_list()

View File

@ -226,59 +226,70 @@
# time.sleep(5) # time.sleep(5)
# #
#def do_begin_upgrade(cur, timeout): #def do_begin_upgrade(cur, timeout):
# action_sql = "alter system begin upgrade"
# rollback_sql = "alter system end upgrade"
# #
# if not check_parameter(cur, False, "enable_upgrade_mode", "True"): # if not check_parameter(cur, False, "enable_upgrade_mode", "True"):
# action_sql = "alter system begin upgrade"
# rollback_sql = "alter system end upgrade"
# logging.info(action_sql) # logging.info(action_sql)
#
# cur.execute(action_sql) # cur.execute(action_sql)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
# wait_parameter_sync(cur, False, "enable_upgrade_mode", "True", timeout) # wait_parameter_sync(cur, False, "enable_upgrade_mode", "True", timeout)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
# #
#def do_begin_rolling_upgrade(cur, timeout): #def do_begin_rolling_upgrade(cur, timeout):
# action_sql = "alter system begin rolling upgrade"
# rollback_sql = "alter system end upgrade"
# #
# if not check_parameter(cur, False, "_upgrade_stage", "DBUPGRADE"): # if not check_parameter(cur, False, "_upgrade_stage", "DBUPGRADE"):
# action_sql = "alter system begin rolling upgrade"
# rollback_sql = "alter system end upgrade"
#
# logging.info(action_sql) # logging.info(action_sql)
# cur.execute(action_sql) # cur.execute(action_sql)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
# wait_parameter_sync(cur, False, "_upgrade_stage", "DBUPGRADE", timeout) # wait_parameter_sync(cur, False, "_upgrade_stage", "DBUPGRADE", timeout)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
# #
#def do_end_rolling_upgrade(cur, timeout): #def do_end_rolling_upgrade(cur, timeout):
#
# # maybe in upgrade_post_check stage or never run begin upgrade
# if check_parameter(cur, False, "enable_upgrade_mode", "False"):
# return
#
# current_cluster_version = get_current_cluster_version() # current_cluster_version = get_current_cluster_version()
#
# action_sql = "alter system end rolling upgrade"
# rollback_sql = "alter system end upgrade"
#
# if not check_parameter(cur, False, "_upgrade_stage", "POSTUPGRADE") or not check_parameter(cur, False, "min_observer_version", current_cluster_version): # if not check_parameter(cur, False, "_upgrade_stage", "POSTUPGRADE") or not check_parameter(cur, False, "min_observer_version", current_cluster_version):
# action_sql = "alter system end rolling upgrade"
# rollback_sql = "alter system end upgrade"
#
# logging.info(action_sql) # logging.info(action_sql)
# cur.execute(action_sql) # cur.execute(action_sql)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
# wait_parameter_sync(cur, False, "min_observer_version", current_data_version, timeout) # wait_parameter_sync(cur, False, "min_observer_version", current_data_version, timeout)
# wait_parameter_sync(cur, False, "_upgrade_stage", "POSTUPGRADE", timeout) # wait_parameter_sync(cur, False, "_upgrade_stage", "POSTUPGRADE", timeout)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
# #
#def do_end_upgrade(cur, timeout): #def do_end_upgrade(cur, timeout):
# action_sql = "alter system end upgrade"
# rollback_sql = ""
# #
# if not check_parameter(cur, False, "enable_upgrade_mode", "False"): # if not check_parameter(cur, False, "enable_upgrade_mode", "False"):
# action_sql = "alter system end upgrade"
# rollback_sql = ""
#
# logging.info(action_sql) # logging.info(action_sql)
# cur.execute(action_sql) # cur.execute(action_sql)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
# wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout) # wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
# #
#class Cursor: #class Cursor:
# __cursor = None # __cursor = None
@ -572,7 +583,6 @@
# if run_modules.MODULE_POST_CHECK in my_module_set: # if run_modules.MODULE_POST_CHECK in my_module_set:
# logging.info('================begin to run post check action ===============') # logging.info('================begin to run post check action ===============')
# conn.autocommit = True # conn.autocommit = True
# actions.do_end_upgrade(cur, timeout)
# upgrade_post_checker.do_check(conn, cur, query_cur, timeout) # upgrade_post_checker.do_check(conn, cur, query_cur, timeout)
# conn.autocommit = False # conn.autocommit = False
# actions.refresh_commit_sql_list() # actions.refresh_commit_sql_list()

View File

@ -226,59 +226,70 @@
# time.sleep(5) # time.sleep(5)
# #
#def do_begin_upgrade(cur, timeout): #def do_begin_upgrade(cur, timeout):
# action_sql = "alter system begin upgrade"
# rollback_sql = "alter system end upgrade"
# #
# if not check_parameter(cur, False, "enable_upgrade_mode", "True"): # if not check_parameter(cur, False, "enable_upgrade_mode", "True"):
# action_sql = "alter system begin upgrade"
# rollback_sql = "alter system end upgrade"
# logging.info(action_sql) # logging.info(action_sql)
#
# cur.execute(action_sql) # cur.execute(action_sql)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
# wait_parameter_sync(cur, False, "enable_upgrade_mode", "True", timeout) # wait_parameter_sync(cur, False, "enable_upgrade_mode", "True", timeout)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
# #
#def do_begin_rolling_upgrade(cur, timeout): #def do_begin_rolling_upgrade(cur, timeout):
# action_sql = "alter system begin rolling upgrade"
# rollback_sql = "alter system end upgrade"
# #
# if not check_parameter(cur, False, "_upgrade_stage", "DBUPGRADE"): # if not check_parameter(cur, False, "_upgrade_stage", "DBUPGRADE"):
# action_sql = "alter system begin rolling upgrade"
# rollback_sql = "alter system end upgrade"
#
# logging.info(action_sql) # logging.info(action_sql)
# cur.execute(action_sql) # cur.execute(action_sql)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
# wait_parameter_sync(cur, False, "_upgrade_stage", "DBUPGRADE", timeout) # wait_parameter_sync(cur, False, "_upgrade_stage", "DBUPGRADE", timeout)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
# #
#def do_end_rolling_upgrade(cur, timeout): #def do_end_rolling_upgrade(cur, timeout):
#
# # maybe in upgrade_post_check stage or never run begin upgrade
# if check_parameter(cur, False, "enable_upgrade_mode", "False"):
# return
#
# current_cluster_version = get_current_cluster_version() # current_cluster_version = get_current_cluster_version()
#
# action_sql = "alter system end rolling upgrade"
# rollback_sql = "alter system end upgrade"
#
# if not check_parameter(cur, False, "_upgrade_stage", "POSTUPGRADE") or not check_parameter(cur, False, "min_observer_version", current_cluster_version): # if not check_parameter(cur, False, "_upgrade_stage", "POSTUPGRADE") or not check_parameter(cur, False, "min_observer_version", current_cluster_version):
# action_sql = "alter system end rolling upgrade"
# rollback_sql = "alter system end upgrade"
#
# logging.info(action_sql) # logging.info(action_sql)
# cur.execute(action_sql) # cur.execute(action_sql)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
# wait_parameter_sync(cur, False, "min_observer_version", current_data_version, timeout) # wait_parameter_sync(cur, False, "min_observer_version", current_data_version, timeout)
# wait_parameter_sync(cur, False, "_upgrade_stage", "POSTUPGRADE", timeout) # wait_parameter_sync(cur, False, "_upgrade_stage", "POSTUPGRADE", timeout)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
# #
#def do_end_upgrade(cur, timeout): #def do_end_upgrade(cur, timeout):
# action_sql = "alter system end upgrade"
# rollback_sql = ""
# #
# if not check_parameter(cur, False, "enable_upgrade_mode", "False"): # if not check_parameter(cur, False, "enable_upgrade_mode", "False"):
# action_sql = "alter system end upgrade"
# rollback_sql = ""
#
# logging.info(action_sql) # logging.info(action_sql)
# cur.execute(action_sql) # cur.execute(action_sql)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
# wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout) # wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout)
# #
# global g_succ_sql_list
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
# #
#class Cursor: #class Cursor:
# __cursor = None # __cursor = None
@ -572,7 +583,6 @@
# if run_modules.MODULE_POST_CHECK in my_module_set: # if run_modules.MODULE_POST_CHECK in my_module_set:
# logging.info('================begin to run post check action ===============') # logging.info('================begin to run post check action ===============')
# conn.autocommit = True # conn.autocommit = True
# actions.do_end_upgrade(cur, timeout)
# upgrade_post_checker.do_check(conn, cur, query_cur, timeout) # upgrade_post_checker.do_check(conn, cur, query_cur, timeout)
# conn.autocommit = False # conn.autocommit = False
# actions.refresh_commit_sql_list() # actions.refresh_commit_sql_list()