Disable direct load before upgrade

This commit is contained in:
ND501
2024-10-16 07:43:57 +00:00
committed by ob-robot
parent 752e8135ef
commit d6b955715c
4 changed files with 87 additions and 0 deletions

View File

@ -886,6 +886,29 @@ def is_x86_arch():
bret=True bret=True
return bret return bret
# 检查 direct_load 是否已经结束,开启升级之前需要确保没有 direct_load 任务,且升级期间尽量禁止 direct_load 任务
def disable_and_check_direct_load_task(cur, query_cur):
get_version_sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
(version_desc, version_results) = query_cur.exec_query(get_version_sql)
if len(version_results) != 1:
fail_list.append('min_observer_version is not sync')
elif len(version_results[0]) != 1:
fail_list.append('column cnt not match')
else:
min_cluster_version = get_version(version_results[0][0])
if min_cluster_version < get_version("4.3.3.0"):
# 通过配置项关闭 direct_load
set_parameter(cur, '_ob_enable_direct_load', 'False')
# 等待 5s,确保没有导入任务
time.sleep(5)
sql = """select count(1) from __all_virtual_load_data_stat"""
(desc, results) = query_cur.exec_query(sql)
if 0 != results[0][0]:
fail_list.append("There are direct load task in progress")
logging.info('check direct load task execut status success')
else:
logging.info('min cluster version is greater than 4.3.3, no need to disable and check direct load task')
# 检查cs_encoding格式是否兼容,对小于4.3.3版本的cpu不支持avx2指令集的集群,我们要求升级前schema上不存在cs_encoding的存储格式 # 检查cs_encoding格式是否兼容,对小于4.3.3版本的cpu不支持avx2指令集的集群,我们要求升级前schema上不存在cs_encoding的存储格式
# 注意:这里对混布集群 / schema上row_format进行了ddl变更的场景无法做到完全的防御 # 注意:这里对混布集群 / schema上row_format进行了ddl变更的场景无法做到完全的防御
def check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch): def check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch):
@ -997,6 +1020,7 @@ def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params, cpu_
check_disk_space_for_mds_sstable_compat(query_cur) check_disk_space_for_mds_sstable_compat(query_cur)
check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch) check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch)
# all check func should execute before check_fail_list # all check func should execute before check_fail_list
disable_and_check_direct_load_task(cur, query_cur)
check_fail_list() check_fail_list()
modify_server_permanent_offline_time(cur) modify_server_permanent_offline_time(cur)
except Exception as e: except Exception as e:

View File

@ -2539,6 +2539,29 @@
# bret=True # bret=True
# return bret # return bret
# #
## 检查 direct_load 是否已经结束,开启升级之前需要确保没有 direct_load 任务,且升级期间尽量禁止 direct_load 任务
#def disable_and_check_direct_load_task(cur, query_cur):
# get_version_sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
# (version_desc, version_results) = query_cur.exec_query(get_version_sql)
# if len(version_results) != 1:
# fail_list.append('min_observer_version is not sync')
# elif len(version_results[0]) != 1:
# fail_list.append('column cnt not match')
# else:
# min_cluster_version = get_version(version_results[0][0])
# if min_cluster_version < get_version("4.3.3.0"):
# # 通过配置项关闭 direct_load
# set_parameter(cur, '_ob_enable_direct_load', 'False')
# # 等待 5s,确保没有导入任务
# time.sleep(5)
# sql = """select count(1) from __all_virtual_load_data_stat"""
# (desc, results) = query_cur.exec_query(sql)
# if 0 != results[0][0]:
# fail_list.append("There are direct load task in progress")
# logging.info('check direct load task execut status success')
# else:
# logging.info('min cluster version is greater than 4.3.3, no need to disable and check direct load task')
#
## 检查cs_encoding格式是否兼容,对小于4.3.3版本的cpu不支持avx2指令集的集群,我们要求升级前schema上不存在cs_encoding的存储格式 ## 检查cs_encoding格式是否兼容,对小于4.3.3版本的cpu不支持avx2指令集的集群,我们要求升级前schema上不存在cs_encoding的存储格式
## 注意:这里对混布集群 / schema上row_format进行了ddl变更的场景无法做到完全的防御 ## 注意:这里对混布集群 / schema上row_format进行了ddl变更的场景无法做到完全的防御
#def check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch): #def check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch):
@ -2650,6 +2673,7 @@
# check_disk_space_for_mds_sstable_compat(query_cur) # check_disk_space_for_mds_sstable_compat(query_cur)
# check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch) # check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch)
# # all check func should execute before check_fail_list # # all check func should execute before check_fail_list
# disable_and_check_direct_load_task(cur, query_cur)
# check_fail_list() # check_fail_list()
# modify_server_permanent_offline_time(cur) # modify_server_permanent_offline_time(cur)
# except Exception as e: # except Exception as e:
@ -3275,6 +3299,10 @@
# actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout) # actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout)
# actions.do_resume_merge(cur, timeout) # actions.do_resume_merge(cur, timeout)
# #
## 8 打开 direct load
#def enable_direct_load(cur, timeout):
# actions.set_parameter(cur, '_ob_enable_direct_load', 'True', timeout)
#
## 开始升级后的检查 ## 开始升级后的检查
#def do_check(conn, cur, query_cur, timeout): #def do_check(conn, cur, query_cur, timeout):
# try: # try:
@ -3285,6 +3313,7 @@
# enable_rebalance(cur, timeout) # enable_rebalance(cur, timeout)
# enable_rereplication(cur, timeout) # enable_rereplication(cur, timeout)
# enable_major_freeze(cur, timeout) # enable_major_freeze(cur, timeout)
# enable_direct_load(cur, timeout)
# except Exception as e: # except Exception as e:
# logging.exception('run error') # logging.exception('run error')
# raise # raise

View File

@ -122,6 +122,10 @@ def enable_major_freeze(cur, timeout):
actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout) actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout)
actions.do_resume_merge(cur, timeout) actions.do_resume_merge(cur, timeout)
# 8 打开 direct load
def enable_direct_load(cur, timeout):
actions.set_parameter(cur, '_ob_enable_direct_load', 'True', timeout)
# 开始升级后的检查 # 开始升级后的检查
def do_check(conn, cur, query_cur, timeout): def do_check(conn, cur, query_cur, timeout):
try: try:
@ -132,6 +136,7 @@ def do_check(conn, cur, query_cur, timeout):
enable_rebalance(cur, timeout) enable_rebalance(cur, timeout)
enable_rereplication(cur, timeout) enable_rereplication(cur, timeout)
enable_major_freeze(cur, timeout) enable_major_freeze(cur, timeout)
enable_direct_load(cur, timeout)
except Exception as e: except Exception as e:
logging.exception('run error') logging.exception('run error')
raise raise

View File

@ -2539,6 +2539,29 @@
# bret=True # bret=True
# return bret # return bret
# #
## 检查 direct_load 是否已经结束,开启升级之前需要确保没有 direct_load 任务,且升级期间尽量禁止 direct_load 任务
#def disable_and_check_direct_load_task(cur, query_cur):
# get_version_sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
# (version_desc, version_results) = query_cur.exec_query(get_version_sql)
# if len(version_results) != 1:
# fail_list.append('min_observer_version is not sync')
# elif len(version_results[0]) != 1:
# fail_list.append('column cnt not match')
# else:
# min_cluster_version = get_version(version_results[0][0])
# if min_cluster_version < get_version("4.3.3.0"):
# # 通过配置项关闭 direct_load
# set_parameter(cur, '_ob_enable_direct_load', 'False')
# # 等待 5s,确保没有导入任务
# time.sleep(5)
# sql = """select count(1) from __all_virtual_load_data_stat"""
# (desc, results) = query_cur.exec_query(sql)
# if 0 != results[0][0]:
# fail_list.append("There are direct load task in progress")
# logging.info('check direct load task execut status success')
# else:
# logging.info('min cluster version is greater than 4.3.3, no need to disable and check direct load task')
#
## 检查cs_encoding格式是否兼容,对小于4.3.3版本的cpu不支持avx2指令集的集群,我们要求升级前schema上不存在cs_encoding的存储格式 ## 检查cs_encoding格式是否兼容,对小于4.3.3版本的cpu不支持avx2指令集的集群,我们要求升级前schema上不存在cs_encoding的存储格式
## 注意:这里对混布集群 / schema上row_format进行了ddl变更的场景无法做到完全的防御 ## 注意:这里对混布集群 / schema上row_format进行了ddl变更的场景无法做到完全的防御
#def check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch): #def check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch):
@ -2650,6 +2673,7 @@
# check_disk_space_for_mds_sstable_compat(query_cur) # check_disk_space_for_mds_sstable_compat(query_cur)
# check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch) # check_cs_encoding_arch_dependency_compatiblity(query_cur, cpu_arch)
# # all check func should execute before check_fail_list # # all check func should execute before check_fail_list
# disable_and_check_direct_load_task(cur, query_cur)
# check_fail_list() # check_fail_list()
# modify_server_permanent_offline_time(cur) # modify_server_permanent_offline_time(cur)
# except Exception as e: # except Exception as e:
@ -3275,6 +3299,10 @@
# actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout) # actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout)
# actions.do_resume_merge(cur, timeout) # actions.do_resume_merge(cur, timeout)
# #
## 8 打开 direct load
#def enable_direct_load(cur, timeout):
# actions.set_parameter(cur, '_ob_enable_direct_load', 'True', timeout)
#
## 开始升级后的检查 ## 开始升级后的检查
#def do_check(conn, cur, query_cur, timeout): #def do_check(conn, cur, query_cur, timeout):
# try: # try:
@ -3285,6 +3313,7 @@
# enable_rebalance(cur, timeout) # enable_rebalance(cur, timeout)
# enable_rereplication(cur, timeout) # enable_rereplication(cur, timeout)
# enable_major_freeze(cur, timeout) # enable_major_freeze(cur, timeout)
# enable_direct_load(cur, timeout)
# except Exception as e: # except Exception as e:
# logging.exception('run error') # logging.exception('run error')
# raise # raise