149 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			149 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python
 | |
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| from my_error import MyError
 | |
| import mysql.connector
 | |
| from mysql.connector import errorcode
 | |
| from actions import BaseDMLAction
 | |
| from actions import reflect_action_cls_list
 | |
| from actions import fetch_observer_version
 | |
| from actions import QueryCursor
 | |
| from actions import check_current_cluster_is_primary
 | |
| import logging
 | |
| import my_utils
 | |
| 
 | |
| '''
 | |
| 添加一条normal dml的方法:
 | |
| 
 | |
| 在本文件中,添加一个类名以"NormalDMLActionPre"开头并且继承自BaseDMLAction的类,
 | |
| 然后在这个类中实现以下成员函数,并且每个函数执行出错都要抛错:
 | |
| (1)@staticmethod get_seq_num():
 | |
| 返回一个代表着执行顺序的序列号,该序列号在本文件中不允许重复,若有重复则会报错。
 | |
| (2)dump_before_do_action(self):
 | |
| 执行action sql之前把一些相关数据dump到日志中。
 | |
| (3)check_before_do_action(self):
 | |
| 执行action sql之前的检查。
 | |
| (4)@staticmethod get_action_dml():
 | |
| 返回action sql,并且该sql必须为dml。
 | |
| (5)@staticmethod get_rollback_sql():
 | |
| 返回回滚该action的sql。
 | |
| (6)dump_after_do_action(self):
 | |
| 执行action sql之后把一些相关数据dump到日志中。
 | |
| (7)check_after_do_action(self):
 | |
| 执行action sql之后的检查。
 | |
| (8)skip_action(self):
 | |
| check if check_before_do_action() and do_action() can be skipped
 | |
| 
 | |
| 举例:
 | |
| class NormalDMLActionPre1(BaseDMLAction):
 | |
|   @staticmethod
 | |
|   def get_seq_num():
 | |
|     return 0
 | |
|   def dump_before_do_action(self):
 | |
|     my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test""")
 | |
|   def skip_action(self):
 | |
|     (desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
 | |
|     return (len(results) > 0)
 | |
|   def check_before_do_action(self):
 | |
|     (desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
 | |
|     if len(results) > 0:
 | |
|       raise MyError('some row in table test.for_test whose primary key is 9 already exists')
 | |
|   @staticmethod
 | |
|   def get_action_dml():
 | |
|     return """insert into test.for_test values (9, 'haha', 99)"""
 | |
|   @staticmethod
 | |
|   def get_rollback_sql():
 | |
|     return """delete from test.for_test where pk = 9"""
 | |
|   def dump_after_do_action(self):
 | |
|     my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test""")
 | |
|   def check_after_do_action(self):
 | |
|     (desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
 | |
|     if len(results) != 1:
 | |
|       raise MyError('there should be only one row whose primary key is 9 in table test.for_test, but there has {0} rows like that'.format(len(results)))
 | |
|     elif results[0][0] != 9 or results[0][1] != 'haha' or results[0][2] != 99:
 | |
|       raise MyError('the row that has been inserted is not expected, it is: [{0}]'.format(','.join(str(r) for r in results[0])))
 | |
| '''
 | |
| 
 | |
| #升级语句对应的action要写在下面的actions begin和actions end这两行之间,
 | |
| #因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
 | |
| #这两行之间的这些action,如果不写在这两行之间的话会导致清空不掉相应的action。
 | |
| 
 | |
| ####========******####======== actions begin ========####******========####
 | |
| ####========******####========= actions end =========####******========####
 | |
| 
 | |
| 
 | |
| def do_normal_dml_actions(cur):
 | |
|   import normal_dml_actions_pre
 | |
|   cls_list = reflect_action_cls_list(normal_dml_actions_pre, 'NormalDMLActionPre')
 | |
| 
 | |
|   # check if pre upgrade script can run reentrantly
 | |
|   query_cur = QueryCursor(cur)
 | |
|   version = fetch_observer_version(query_cur)
 | |
|   can_skip = False
 | |
|   if (cmp(version, "2.2.77") >= 0 and cmp(version, "3.0.0") < 0):
 | |
|     can_skip = True
 | |
|   elif (cmp(version, "3.1.1") >= 0):
 | |
|     can_skip = True
 | |
|   else:
 | |
|     can_skip = False
 | |
| 
 | |
|   for cls in cls_list:
 | |
|     logging.info('do normal dml acion, seq_num: %d', cls.get_seq_num())
 | |
|     action = cls(cur)
 | |
|     action.dump_before_do_action()
 | |
|     if False == can_skip or False == action.skip_action():
 | |
|       action.check_before_do_action()
 | |
|       action.do_action()
 | |
|     else:
 | |
|       logging.info("skip dml action, seq_num: %d", cls.get_seq_num())
 | |
|     action.dump_after_do_action()
 | |
|     action.check_after_do_action()
 | |
| 
 | |
| def do_normal_dml_actions_by_standby_cluster(standby_cluster_infos):
 | |
|   try:
 | |
|     for standby_cluster_info in standby_cluster_infos:
 | |
|       logging.info("do_normal_dml_actions_by_standby_cluster: cluster_id = {0}, ip = {1}, port = {2}"
 | |
|                    .format(standby_cluster_info['cluster_id'],
 | |
|                            standby_cluster_info['ip'],
 | |
|                            standby_cluster_info['port']))
 | |
|       logging.info("create connection : cluster_id = {0}, ip = {1}, port = {2}"
 | |
|                    .format(standby_cluster_info['cluster_id'],
 | |
|                            standby_cluster_info['ip'],
 | |
|                            standby_cluster_info['port']))
 | |
|       conn = mysql.connector.connect(user     =  standby_cluster_info['user'],
 | |
|                                      password =  standby_cluster_info['pwd'],
 | |
|                                      host     =  standby_cluster_info['ip'],
 | |
|                                      port     =  standby_cluster_info['port'],
 | |
|                                      database =  'oceanbase',
 | |
|                                      raise_on_warnings = True)
 | |
| 
 | |
|       cur = conn.cursor(buffered=True)
 | |
|       conn.autocommit = True
 | |
|       query_cur = QueryCursor(cur)
 | |
|       is_primary = check_current_cluster_is_primary(query_cur)
 | |
|       if is_primary:
 | |
|         logging.exception("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}"""
 | |
|                           .format(standby_cluster_info['cluster_id'],
 | |
|                                   standby_cluster_info['ip'],
 | |
|                                   standby_cluster_info['port']))
 | |
|         raise e
 | |
| 
 | |
|       ## process
 | |
|       do_normal_dml_actions(cur)
 | |
| 
 | |
|       cur.close()
 | |
|       conn.close()
 | |
|   except Exception, e:
 | |
|     logging.exception("""do_normal_dml_actions_by_standby_cluster failed""")
 | |
|     raise e
 | |
| 
 | |
| def get_normal_dml_actions_sqls_str():
 | |
|   import normal_dml_actions_pre
 | |
|   ret_str = ''
 | |
|   cls_list = reflect_action_cls_list(normal_dml_actions_pre, 'NormalDMLActionPre')
 | |
|   for i in range(0, len(cls_list)):
 | |
|     if i > 0:
 | |
|       ret_str += '\n'
 | |
|     ret_str += cls_list[i].get_action_dml() + ';'
 | |
|   return ret_str
 | 
