oceanbase/tools/upgrade/special_upgrade_action_pre.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
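"""Special upgrade actions for the "pre" stage, executed on the primary cluster: the
helpers below backfill system tables (__all_zone recovery_status / storage_type rows,
__all_user and __all_user_history privilege columns, __all_foreign_key_column position
values), rewrite stored trigger package sources, and initialize split_schema_version_v2
in __all_core_table on the primary and standby clusters."""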
from my_error import MyError
import time
import mysql.connector
from mysql.connector import errorcode
import logging
import re
import string
from random import Random
from actions import DMLCursor
from actions import QueryCursor
from actions import check_current_cluster_is_primary
import binascii
import my_utils
import actions
import sys
#def modify_schema_history(conn, cur, tenant_ids):
# try:
# conn.autocommit = True
#
# # disable ddl
# ori_enable_ddl = actions.get_ori_enable_ddl(cur)
# if ori_enable_ddl == 1:
# actions.set_parameter(cur, 'enable_ddl', 'False')
# log('tenant_ids: {0}'.format(tenant_ids))
# for tenant_id in tenant_ids:
# sql = """alter system change tenant tenant_id = {0}""".format(tenant_id)
# log(sql)
# cur.execute(sql)
# #####implement#####
#
#    # switch back to the default (sys) tenant
# sys_tenant_id = 1
# sql = """alter system change tenant tenant_id = {0}""".format(sys_tenant_id)
# log(sql)
# cur.execute(sql)
#
# # enable ddl
# if ori_enable_ddl == 1:
# actions.set_parameter(cur, 'enable_ddl', 'True')
# except Exception, e:
# logging.warn("exec modify trigger failed")
# raise e
# logging.info('exec modify trigger finish')
# Upgrade actions that need to be executed on the primary cluster
def do_special_upgrade(conn, cur, tenant_id_list, user, passwd):
# special upgrade action
  # Actions for upgrade statements must be written between the "actions begin" and
  # "actions end" lines below, because reset_upgrade_scripts.py is invoked when the base
  # version is updated and clears everything between those two lines; code placed outside
  # them will not be cleared.
  # Until the primary-cluster upgrade flow adds a rolling-upgrade step, or until DDL tests
  # cover the mixed-version deployment phase, disable DDL at the start of that phase.
actions.set_parameter(cur, 'enable_ddl', 'False')
####========******####======== actions begin ========####******========####
return
####========******####========= actions end =========####******========####
def do_add_recovery_status_to_all_zone(conn, cur):
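  """Insert a ('<zone>', 'recovery_status', 0, 'NORMAL') row into oceanbase.__all_zone for
  every zone that does not already have one, then verify the rows were written."""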
try:
logging.info('add recovery status row to __all_zone for each zone')
    zones = []
    recovery_status = []
# pre-check, may skip
check_updated_sql = "select * from oceanbase.__all_zone where zone !='' AND name='recovery_status'"
cur.execute(check_updated_sql)
recovery_status = cur.fetchall()
if 0 < len(recovery_status):
logging.info('[recovery_status] row already exists, no need to add')
# get zones
if 0 >= len(recovery_status):
all_zone_sql = "select distinct(zone) zone from oceanbase.__all_zone where zone !=''"
cur.execute(all_zone_sql)
zone_results = cur.fetchall()
for r in zone_results:
zones.append("('" + r[0] + "', 'recovery_status', 0, 'NORMAL')")
# add rows
if 0 < len(zones):
upgrade_sql = "insert into oceanbase.__all_zone(zone, name, value, info) values " + ','.join(zones)
logging.info(upgrade_sql)
cur.execute(upgrade_sql)
conn.commit()
# check result
if 0 < len(zones):
cur.execute(check_updated_sql)
check_results = cur.fetchall()
if len(check_results) != len(zones):
        raise MyError('failed to insert [recovery_status] rows into __all_zone')
  except Exception as e:
logging.exception('do_add_recovery_status_to_all_zone error')
raise e
def do_add_storage_type_to_all_zone(conn, cur):
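  """Insert a ('<zone>', 'storage_type', 0, 'LOCAL') row into oceanbase.__all_zone for
  every zone that does not already have one, then verify the rows were written."""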
try:
logging.info('add storage type row to __all_zone for each zone')
    zones = []
    storage_types = []
# pre-check, may skip
check_updated_sql = "select * from oceanbase.__all_zone where zone !='' AND name='storage_type'"
cur.execute(check_updated_sql)
storage_types = cur.fetchall()
if 0 < len(storage_types):
      logging.info('[storage_type] row already exists, no need to add')
# get zones
if 0 >= len(storage_types):
all_zone_sql = "select distinct(zone) zone from oceanbase.__all_zone where zone !=''"
cur.execute(all_zone_sql)
zone_results = cur.fetchall()
for r in zone_results:
zones.append("('" + r[0] + "', 'storage_type', 0, 'LOCAL')")
# add rows
if 0 < len(zones):
upgrade_sql = "insert into oceanbase.__all_zone(zone, name, value, info) values " + ','.join(zones)
logging.info(upgrade_sql)
cur.execute(upgrade_sql)
conn.commit()
# check result
if 0 < len(zones):
cur.execute(check_updated_sql)
check_results = cur.fetchall()
if len(check_results) != len(zones):
        raise MyError('failed to insert [storage_type] rows into __all_zone')
  except Exception as e:
logging.exception('do_add_storage_type_to_all_zone error')
raise e
def modify_trigger(conn, cur, tenant_ids):
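  """Patch the trigger package sources stored in __all_tenant_trigger and
  __all_tenant_trigger_history for each tenant in tenant_ids: UPDATING() gains a
  column_name parameter that is checked against the recorded update_columns_, and
  init_trigger() is changed to actually record the updated columns. DDL is disabled
  while the rows are rewritten and re-enabled afterwards."""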
try:
conn.autocommit = True
# disable ddl
ori_enable_ddl = actions.get_ori_enable_ddl(cur)
if ori_enable_ddl == 1:
actions.set_parameter(cur, 'enable_ddl', 'False')
log('tenant_ids: {0}'.format(tenant_ids))
for tenant_id in tenant_ids:
sql = """alter system change tenant tenant_id = {0}""".format(tenant_id)
log(sql)
cur.execute(sql)
#####implement#####
trigger_sql = """
update __all_tenant_trigger
set
package_spec_source = replace(
package_spec_source,
'FUNCTION UPDATING(column VARCHAR2 := NULL) RETURN BOOL\;',
'FUNCTION UPDATING(column_name VARCHAR2 := NULL) RETURN BOOL\;'
),
package_body_source = replace(replace(
package_body_source,
'
PROCEDURE init_trigger(update_columns IN STRINGARRAY) IS
BEGIN
NULL\;
END\;
',
'
PROCEDURE init_trigger(update_columns IN STRINGARRAY) IS
BEGIN
update_columns_ := STRINGARRAY()\;
update_columns_.EXTEND(update_columns.COUNT)\;
FOR i IN 1 .. update_columns.COUNT LOOP
update_columns_(i) := update_columns(i)\;
END LOOP\;
END\;
'),
'
FUNCTION UPDATING(column VARCHAR2 := NULL) RETURN BOOL IS
BEGIN
RETURN (dml_event_ = 4)\;
END\;
',
'
FUNCTION UPDATING(column_name VARCHAR2 := NULL) RETURN BOOL IS
is_updating BOOL\;
BEGIN
is_updating := (dml_event_ = 4)\;
IF (is_updating AND column_name IS NOT NULL) THEN
is_updating := FALSE\;
FOR i IN 1 .. update_columns_.COUNT LOOP
IF (UPPER(update_columns_(i)) = UPPER(column_name)) THEN is_updating := TRUE\; EXIT\; END IF\;
END LOOP\;
END IF\;
RETURN is_updating\;
END\;
'); """
log(trigger_sql)
cur.execute(trigger_sql)
log("update rows = " + str(cur.rowcount))
trigger_history_sql = """
update __all_tenant_trigger_history
set
package_spec_source = replace(
package_spec_source,
'FUNCTION UPDATING(column VARCHAR2 := NULL) RETURN BOOL\;',
'FUNCTION UPDATING(column_name VARCHAR2 := NULL) RETURN BOOL\;'
),
package_body_source = replace(replace(
package_body_source,
'
PROCEDURE init_trigger(update_columns IN STRINGARRAY) IS
BEGIN
NULL\;
END\;
',
'
PROCEDURE init_trigger(update_columns IN STRINGARRAY) IS
BEGIN
update_columns_ := STRINGARRAY()\;
update_columns_.EXTEND(update_columns.COUNT)\;
FOR i IN 1 .. update_columns.COUNT LOOP
update_columns_(i) := update_columns(i)\;
END LOOP\;
END\;
'),
'
FUNCTION UPDATING(column VARCHAR2 := NULL) RETURN BOOL IS
BEGIN
RETURN (dml_event_ = 4)\;
END\;
',
'
FUNCTION UPDATING(column_name VARCHAR2 := NULL) RETURN BOOL IS
is_updating BOOL\;
BEGIN
is_updating := (dml_event_ = 4)\;
IF (is_updating AND column_name IS NOT NULL) THEN
is_updating := FALSE\;
FOR i IN 1 .. update_columns_.COUNT LOOP
IF (UPPER(update_columns_(i)) = UPPER(column_name)) THEN is_updating := TRUE\; EXIT\; END IF\;
END LOOP\;
END IF\;
RETURN is_updating\;
END\;
')
where is_deleted = 0; """
log(trigger_history_sql)
cur.execute(trigger_history_sql)
log("update rows = " + str(cur.rowcount))
#####implement end#####
    # switch back to the default (sys) tenant
sys_tenant_id = 1
sql = """alter system change tenant tenant_id = {0}""".format(sys_tenant_id)
log(sql)
cur.execute(sql)
# enable ddl
if ori_enable_ddl == 1:
actions.set_parameter(cur, 'enable_ddl', 'True')
  except Exception as e:
logging.warn("exec modify trigger failed")
raise e
logging.info('exec modify trigger finish')
def fill_priv_file_column_for_all_user(conn, cur):
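  """Backfill the new privilege columns of __all_user and __all_user_history for every
  tenant, in batches of 1000 rows: users holding all of the listed basic privileges get
  priv_file = 1, and users with priv_super = 1 additionally get priv_alter_tenant,
  priv_alter_system, priv_create_resource_unit and priv_create_resource_pool set to 1."""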
try:
conn.autocommit = True
# disable ddl
ori_enable_ddl = actions.get_ori_enable_ddl(cur)
if ori_enable_ddl == 1:
actions.set_parameter(cur, 'enable_ddl', 'False')
tenant_ids = get_tenant_ids(cur)
log('tenant_ids: {0}'.format(tenant_ids))
for tenant_id in tenant_ids:
sql = """alter system change tenant tenant_id = {0}""".format(tenant_id)
log(sql)
cur.execute(sql)
tenant_id_in_sql = 0
if 1 == tenant_id:
tenant_id_in_sql = 1
begin_user_id = 0
begin_schema_version = 0
fetch_num = 1000
while (True):
query_limit = """
where tenant_id = {0} and (user_id, schema_version) > ({1}, {2})
order by tenant_id, user_id, schema_version
limit {3}""".format(tenant_id_in_sql, begin_user_id, begin_schema_version, fetch_num)
sql = """select /*+ QUERY_TIMEOUT(1500000000) */ user_id, schema_version
from oceanbase.__all_user_history""" + query_limit
log(sql)
result_rows = query(cur, sql)
log("select rows = " + str(cur.rowcount))
if len(result_rows) <= 0:
break
else:
last_schema_version = result_rows[-1][1]
last_user_id = result_rows[-1][0]
condition = """
where priv_alter = 1 and priv_create = 1 and priv_create_user = 1 and priv_delete = 1
and priv_drop = 1 and priv_insert = 1 and priv_update = 1 and priv_select = 1
and priv_index = 1 and priv_create_view = 1 and priv_show_view = 1 and priv_show_db = 1
and priv_super = 1 and priv_create_synonym = 1
and tenant_id = {0} and (user_id, schema_version) > ({1}, {2})
and (user_id, schema_version) <= ({3}, {4})
""".format(tenant_id_in_sql, begin_user_id, begin_schema_version, last_user_id, last_schema_version)
sql = """update /*+ QUERY_TIMEOUT(150000000) */ oceanbase.__all_user_history
set priv_file = 1""" + condition
log(sql)
cur.execute(sql)
log("update rows = " + str(cur.rowcount))
condition = """
where priv_super = 1 and tenant_id = {0} and (user_id, schema_version) > ({1}, {2})
and (user_id, schema_version) <= ({3}, {4})
""".format(tenant_id_in_sql, begin_user_id, begin_schema_version, last_user_id, last_schema_version)
sql = """update /*+ QUERY_TIMEOUT(150000000) */ oceanbase.__all_user_history
set priv_alter_tenant = 1,
priv_alter_system = 1,
priv_create_resource_unit = 1,
priv_create_resource_pool = 1 """ + condition
log(sql)
cur.execute(sql)
begin_schema_version = last_schema_version
begin_user_id = last_user_id
begin_user_id = 0
while (True):
query_limit = """
where tenant_id = {0} and user_id > {1}
order by tenant_id, user_id
limit {2}""".format(tenant_id_in_sql, begin_user_id, fetch_num)
sql = """select /*+ QUERY_TIMEOUT(1500000000) */ user_id
from oceanbase.__all_user""" + query_limit
log(sql)
result_rows = query(cur, sql)
log("select rows = " + str(cur.rowcount))
if len(result_rows) <= 0:
break
else:
end_user_id = result_rows[-1][0]
condition = """
where priv_alter = 1 and priv_create = 1 and priv_create_user = 1 and priv_delete = 1
and priv_drop = 1 and priv_insert = 1 and priv_update = 1 and priv_select = 1
and priv_index = 1 and priv_create_view = 1 and priv_show_view = 1 and priv_show_db = 1
and priv_super = 1 and priv_create_synonym = 1
and tenant_id = {0} and user_id > {1} and user_id <= {2}
""".format(tenant_id_in_sql, begin_user_id, end_user_id)
sql = """update /*+ QUERY_TIMEOUT(150000000) */ oceanbase.__all_user
set priv_file = 1 """ + condition
log(sql)
cur.execute(sql)
log("update rows = " + str(cur.rowcount))
condition = """
where priv_super = 1
and tenant_id = {0} and user_id > {1} and user_id <= {2}
""".format(tenant_id_in_sql, begin_user_id, end_user_id)
sql = """update /*+ QUERY_TIMEOUT(150000000) */ oceanbase.__all_user
set priv_alter_tenant = 1,
priv_alter_system = 1,
priv_create_resource_unit = 1,
priv_create_resource_pool = 1 """ + condition
log(sql)
cur.execute(sql)
begin_user_id = end_user_id
    # switch back to the default (sys) tenant
sys_tenant_id = 1
sql = """alter system change tenant tenant_id = {0}""".format(sys_tenant_id)
log(sql)
cur.execute(sql)
# enable ddl
if ori_enable_ddl == 1:
actions.set_parameter(cur, 'enable_ddl', 'True')
  except Exception as e:
logging.warn("exec fill priv_file to all_user failed")
raise e
logging.info('exec fill priv_file to all_user finish')
def insert_split_schema_version_v2(conn, cur, user, pwd):
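  """Initialize split_schema_version_v2 to '-1' in __all_core_table (the '__all_global_stat'
  row), first on the primary cluster and then on every standby cluster, re-checking that
  each remote cluster is still a standby before writing."""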
try:
query_cur = actions.QueryCursor(cur)
is_primary = actions.check_current_cluster_is_primary(query_cur)
if not is_primary:
logging.warn("should run in primary cluster")
      raise MyError('should run in primary cluster')
# primary cluster
dml_cur = actions.DMLCursor(cur)
sql = """replace into __all_core_table(table_name, row_id, column_name, column_value)
values ('__all_global_stat', 1, 'split_schema_version_v2', '-1');"""
rowcount = dml_cur.exec_update(sql)
if rowcount <= 0:
logging.warn("invalid rowcount : {0}".format(rowcount))
      raise MyError('invalid rowcount : {0}'.format(rowcount))
# standby cluster
standby_cluster_list = actions.fetch_standby_cluster_infos(conn, query_cur, user, pwd)
for standby_cluster in standby_cluster_list:
# connect
logging.info("create connection : cluster_id = {0}, ip = {1}, port = {2}"
.format(standby_cluster['cluster_id'],
standby_cluster['ip'],
standby_cluster['port']))
tmp_conn = mysql.connector.connect(user = standby_cluster['user'],
password = standby_cluster['pwd'],
host = standby_cluster['ip'],
port = standby_cluster['port'],
database = 'oceanbase')
tmp_cur = tmp_conn.cursor(buffered=True)
tmp_conn.autocommit = True
tmp_query_cur = actions.QueryCursor(tmp_cur)
      # check that this is really a standby cluster
is_primary = actions.check_current_cluster_is_primary(tmp_query_cur)
if is_primary:
logging.exception("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}"""
.format(standby_cluster['cluster_id'],
standby_cluster['ip'],
standby_cluster['port']))
        raise MyError('primary cluster changed : cluster_id = {0}'.format(standby_cluster['cluster_id']))
# replace
tmp_dml_cur = actions.DMLCursor(tmp_cur)
sql = """replace into __all_core_table(table_name, row_id, column_name, column_value)
values ('__all_global_stat', 1, 'split_schema_version_v2', '-1');"""
rowcount = tmp_dml_cur.exec_update(sql)
if rowcount <= 0:
logging.warn("invalid rowcount : {0}".format(rowcount))
        raise MyError('invalid rowcount : {0}'.format(rowcount))
# close
tmp_cur.close()
tmp_conn.close()
  except Exception as e:
logging.warn("init split_schema_version_v2 failed")
raise e
def query(cur, sql):
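  """Log the given sql, execute it on cur and return all fetched rows."""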
log(sql)
cur.execute(sql)
results = cur.fetchall()
return results
def log(msg):
logging.info(msg)
def get_oracle_tenant_ids(cur):
return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant where compatibility_mode = 1')]
def get_tenant_ids(cur):
return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')]
# Fix the data in the position column of __all_foreign_key_column and __all_foreign_key_column_history for old foreign keys carried over by the upgrade
def modify_foreign_key_column_position_info(conn, cur):
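  """For each tenant, find foreign key columns whose recorded position is still 0 and
  renumber them from 1 in gmt_create order, updating both
  __all_foreign_key_column_history and __all_foreign_key_column."""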
try:
conn.autocommit = True
# disable ddl
ori_enable_ddl = actions.get_ori_enable_ddl(cur)
if ori_enable_ddl == 1:
actions.set_parameter(cur, 'enable_ddl', 'False')
tenant_ids = get_tenant_ids(cur)
log('tenant_ids: {0}'.format(tenant_ids))
for tenant_id in tenant_ids:
sql = """alter system change tenant tenant_id = {0}""".format(tenant_id)
log(sql)
cur.execute(sql)
tenant_id_in_sql = 0
if 1 == tenant_id:
tenant_id_in_sql = 1
      # query all of the tenant's foreign keys that have not been deleted
sql = """select /*+ QUERY_TIMEOUT(1500000000) */ foreign_key_id from oceanbase.__all_foreign_key where tenant_id = {0}""".format(tenant_id_in_sql)
log(sql)
foreign_key_id_rows = query(cur, sql)
fk_num = len(foreign_key_id_rows)
cnt = 0
      # iterate over each foreign key and check whether the position recorded in oceanbase.__all_foreign_key_column is 0; if so, update it to the correct value
while cnt < fk_num:
foreign_key_id = foreign_key_id_rows[cnt][0]
sql = """select /*+ QUERY_TIMEOUT(1500000000) */ child_column_id, parent_column_id from oceanbase.__all_foreign_key_column where foreign_key_id = {0} and position = 0 and tenant_id = {1} order by gmt_create asc""".format(foreign_key_id, tenant_id_in_sql)
log(sql)
need_update_rows = query(cur, sql)
fk_col_num = len(need_update_rows)
if fk_col_num > 0:
position = 1
          # iterate over every column of this foreign key whose recorded position is 0
while position <= fk_col_num:
child_column_id = need_update_rows[position - 1][0]
parent_column_id = need_update_rows[position - 1][1]
            # update the position value in oceanbase.__all_foreign_key_column_history
sql = """update /*+ QUERY_TIMEOUT(150000000) */ oceanbase.__all_foreign_key_column_history set position = {0} where foreign_key_id = {1} and child_column_id = {2} and parent_column_id = {3} and tenant_id = {4}""".format(position, foreign_key_id, child_column_id, parent_column_id, tenant_id_in_sql)
log(sql)
cur.execute(sql)
if cur.rowcount == 0:
logging.warn("affected rows is 0 when update oceanbase.__all_foreign_key_column_history")
              raise MyError('affected rows is 0 when update oceanbase.__all_foreign_key_column_history')
            # update the position value in oceanbase.__all_foreign_key_column
sql = """update /*+ QUERY_TIMEOUT(150000000) */ oceanbase.__all_foreign_key_column set position = {0} where foreign_key_id = {1} and child_column_id = {2} and parent_column_id = {3} and tenant_id = {4}""".format(position, foreign_key_id, child_column_id, parent_column_id, tenant_id_in_sql)
log(sql)
cur.execute(sql)
if cur.rowcount != 1:
logging.warn("affected rows is not 1 when update oceanbase.__all_foreign_key_column")
              raise MyError('affected rows is not 1 when update oceanbase.__all_foreign_key_column')
position = position + 1
cnt = cnt + 1
    # switch back to the default (sys) tenant
sys_tenant_id = 1
sql = """alter system change tenant tenant_id = {0}""".format(sys_tenant_id)
log(sql)
cur.execute(sql)
# enable ddl
if ori_enable_ddl == 1:
actions.set_parameter(cur, 'enable_ddl', 'True')
  except Exception as e:
logging.warn("modify foreign key column position failed")
raise e
logging.info('modify foreign key column position finish')