[CP] [UPGRADE] Calculate default timeout according to tenant count
This commit is contained in:
		| @ -127,16 +127,45 @@ def set_parameter(cur, parameter, value, timeout = 0): | ||||
|   cur.execute(sql) | ||||
|   wait_parameter_sync(cur, False, parameter, value, timeout) | ||||
|  | ||||
| def set_session_timeout(cur, seconds): | ||||
|   sql = "set @@session.ob_query_timeout = {0}".format(seconds * 1000 * 1000) | ||||
|   logging.info(sql) | ||||
|   cur.execute(sql) | ||||
|  | ||||
| def set_default_timeout_by_tenant(cur, timeout, timeout_per_tenant, min_timeout): | ||||
|   if timeout > 0: | ||||
|     logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) | ||||
|   else: | ||||
|     query_cur = QueryCursor(cur) | ||||
|     tenant_id_list = fetch_tenant_ids(query_cur) | ||||
|     cal_timeout = len(tenant_id_list) * timeout_per_tenant | ||||
|     timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) | ||||
|     logging.info("use default timeout caculated by tenants, " | ||||
|                  "timeout(s):{0}, tenant_count:{1}, " | ||||
|                  "timeout_per_tenant(s):{2}, min_timeout(s):{3}" | ||||
|                  .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) | ||||
|  | ||||
|   return timeout | ||||
|  | ||||
| def set_tenant_parameter(cur, parameter, value, timeout = 0): | ||||
|  | ||||
|   tenants_list = [] | ||||
|   if get_min_cluster_version(cur) < get_version("4.2.1.0"): | ||||
|     tenants_list = ['all'] | ||||
|   else: | ||||
|     tenants_list = ['sys', 'all_user', 'all_meta'] | ||||
|  | ||||
|   query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
|  | ||||
|   set_session_timeout(cur, query_timeout) | ||||
|  | ||||
|   for tenants in tenants_list: | ||||
|     sql = """alter system set {0} = '{1}' tenant = '{2}'""".format(parameter, value, tenants) | ||||
|     logging.info(sql) | ||||
|     cur.execute(sql) | ||||
|  | ||||
|   set_session_timeout(cur, 10) | ||||
|  | ||||
|   wait_parameter_sync(cur, True, parameter, value, timeout) | ||||
|  | ||||
| def get_ori_enable_ddl(cur, timeout): | ||||
| @ -225,7 +254,20 @@ def wait_parameter_sync(cur, is_tenant_config, key, value, timeout): | ||||
|   table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info" | ||||
|   sql = """select count(*) as cnt from oceanbase.{0} | ||||
|            where name = '{1}' and value != '{2}'""".format(table_name, key, value) | ||||
|   times = (timeout if timeout > 0 else 60) / 5 | ||||
|  | ||||
|   wait_timeout = 0 | ||||
|   query_timeout = 0 | ||||
|   if not is_tenant_config or timeout > 0: | ||||
|     wait_timeout = (timeout if timeout > 0 else 60) | ||||
|     query_timeout = wait_timeout | ||||
|   else: | ||||
|     # is_tenant_config & timeout not set | ||||
|     wait_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
|     query_timeout = set_default_timeout_by_tenant(cur, timeout, 2, 60) | ||||
|  | ||||
|   set_session_timeout(cur, query_timeout) | ||||
|  | ||||
|   times = wait_timeout / 5 | ||||
|   while times >= 0: | ||||
|     logging.info(sql) | ||||
|     cur.execute(sql) | ||||
| @ -245,6 +287,8 @@ def wait_parameter_sync(cur, is_tenant_config, key, value, timeout): | ||||
|       raise e | ||||
|     time.sleep(5) | ||||
|  | ||||
|   set_session_timeout(cur, 10) | ||||
|  | ||||
| def do_begin_upgrade(cur, timeout): | ||||
|  | ||||
|   if not check_parameter(cur, False, "enable_upgrade_mode", "True"): | ||||
| @ -316,24 +360,38 @@ def do_suspend_merge(cur, timeout): | ||||
|     tenants_list = ['all'] | ||||
|   else: | ||||
|     tenants_list = ['sys', 'all_user', 'all_meta'] | ||||
|  | ||||
|   query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
|  | ||||
|   set_session_timeout(cur, query_timeout) | ||||
|  | ||||
|   for tenants in tenants_list: | ||||
|     action_sql = "alter system suspend merge tenant = {0}".format(tenants) | ||||
|     rollback_sql = "alter system resume merge tenant = {0}".format(tenants) | ||||
|     logging.info(action_sql) | ||||
|     cur.execute(action_sql) | ||||
|  | ||||
|   set_session_timeout(cur, 10) | ||||
|  | ||||
| def do_resume_merge(cur, timeout): | ||||
|   tenants_list = [] | ||||
|   if get_min_cluster_version(cur) < get_version("4.2.1.0"): | ||||
|     tenants_list = ['all'] | ||||
|   else: | ||||
|     tenants_list = ['sys', 'all_user', 'all_meta'] | ||||
|  | ||||
|   query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
|  | ||||
|   set_session_timeout(cur, query_timeout) | ||||
|  | ||||
|   for tenants in tenants_list: | ||||
|     action_sql = "alter system resume merge tenant = {0}".format(tenants) | ||||
|     rollback_sql = "alter system suspend merge tenant = {0}".format(tenants) | ||||
|     logging.info(action_sql) | ||||
|     cur.execute(action_sql) | ||||
|  | ||||
|   set_session_timeout(cur, 10) | ||||
|  | ||||
| class Cursor: | ||||
|   __cursor = None | ||||
|   def __init__(self, cursor): | ||||
|  | ||||
| @ -21,17 +21,7 @@ def do_upgrade(conn, cur, timeout, user, pwd): | ||||
|   else: | ||||
|     run_upgrade_job(conn, cur, "UPGRADE_VIRTUAL_SCHEMA", timeout) | ||||
|  | ||||
|   # just to make __all_virtual_upgrade_inspection avaliable | ||||
|   timeout_ts = (timeout if timeout > 0 else 600) * 1000 * 1000 | ||||
|   sql = "set @@session.ob_query_timeout = {0}".format(timeout_ts) | ||||
|   logging.info(sql) | ||||
|   cur.execute(sql) | ||||
|   sql = "alter system run job 'root_inspection'" | ||||
|   logging.info(sql) | ||||
|   cur.execute(sql) | ||||
|   sql = "set @@session.ob_query_timeout = 10000000" | ||||
|   logging.info(sql) | ||||
|   cur.execute(sql) | ||||
|   run_root_inspection(cur, timeout) | ||||
| ####========******####======== actions begin ========####******========#### | ||||
|   upgrade_syslog_level(conn, cur) | ||||
|   return | ||||
| @ -47,7 +37,6 @@ def upgrade_syslog_level(conn, cur): | ||||
|     info_cnt = result[0][0] | ||||
|     if info_cnt > 0: | ||||
|       actions.set_parameter(cur, "syslog_level", "WDIAG") | ||||
|  | ||||
|   except Exception, e: | ||||
|     logging.warn("upgrade syslog level failed!") | ||||
|     raise e | ||||
| @ -62,6 +51,18 @@ def query(cur, sql): | ||||
| def get_tenant_ids(cur): | ||||
|   return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')] | ||||
|  | ||||
| def run_root_inspection(cur, timeout): | ||||
|  | ||||
|   query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) | ||||
|  | ||||
|   actions.set_session_timeout(cur, query_timeout) | ||||
|  | ||||
|   sql = "alter system run job 'root_inspection'" | ||||
|   logging.info(sql) | ||||
|   cur.execute(sql) | ||||
|  | ||||
|   actions.set_session_timeout(cur, 10) | ||||
|  | ||||
| def upgrade_across_version(cur): | ||||
|   current_data_version = actions.get_current_data_version() | ||||
|   int_current_data_version = actions.get_version(current_data_version) | ||||
| @ -167,7 +168,9 @@ def check_can_run_upgrade_job(cur, job_name): | ||||
|  | ||||
| def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id): | ||||
|   try: | ||||
|     times = (timeout if timeout > 0 else 3600) / 10 | ||||
|     wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 100, 3600) | ||||
|  | ||||
|     times = wait_timeout / 10 | ||||
|     while (times >= 0): | ||||
|       sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job | ||||
|                where job_type = '{0}' and job_id > {1} order by job_id desc limit 1 | ||||
|  | ||||
| @ -73,7 +73,7 @@ sys.argv[0] + """ [OPTIONS]""" +\ | ||||
| '                    that all modules should be run. They are splitted by ",".\n' +\ | ||||
| '                    For example: -m all, or --module=ddl,normal_dml,special_action\n' +\ | ||||
| '-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\ | ||||
| '-t, --timeout=name  check timeout, default: 600(s).\n' + \ | ||||
| '-t, --timeout=name  check timeout.\n' + \ | ||||
| '-z, --zone=name     If zone is not specified, check all servers status in cluster. \n' +\ | ||||
| '                    Otherwise, only check servers status in specified zone. \n' + \ | ||||
| '\n\n' +\ | ||||
| @ -135,8 +135,7 @@ Option('p', 'password', True, False, ''),\ | ||||
| Option('m', 'module', True, False, 'all'),\ | ||||
| # 日志文件路径,不同脚本的main函数中中会改成不同的默认值 | ||||
| Option('l', 'log-file', True, False),\ | ||||
| # 一些检查的超时时间,默认是600s | ||||
| Option('t', 'timeout', True, False, '600'),\ | ||||
| Option('t', 'timeout', True, False, 0),\ | ||||
| Option('z', 'zone', True, False, ''),\ | ||||
| ]\ | ||||
|  | ||||
| @ -288,13 +287,38 @@ def check_zone_valid(query_cur, zone): | ||||
|   else: | ||||
|     logging.info("zone is empty, check all servers in cluster") | ||||
|  | ||||
| def fetch_tenant_ids(query_cur): | ||||
|   try: | ||||
|     tenant_id_list = [] | ||||
|     (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""") | ||||
|     for r in results: | ||||
|       tenant_id_list.append(r[0]) | ||||
|     return tenant_id_list | ||||
|   except Exception, e: | ||||
|     logging.exception('fail to fetch distinct tenant ids') | ||||
|     raise e | ||||
|  | ||||
| def set_default_timeout_by_tenant(query_cur, timeout, timeout_per_tenant, min_timeout): | ||||
|   if timeout > 0: | ||||
|     logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) | ||||
|   else: | ||||
|     tenant_id_list = fetch_tenant_ids(query_cur) | ||||
|     cal_timeout = len(tenant_id_list) * timeout_per_tenant | ||||
|     timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) | ||||
|     logging.info("use default timeout caculated by tenants, " | ||||
|                  "timeout(s):{0}, tenant_count:{1}, " | ||||
|                  "timeout_per_tenant(s):{2}, min_timeout(s):{3}" | ||||
|                  .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) | ||||
|  | ||||
|   return timeout | ||||
|  | ||||
| #### START #### | ||||
| # 0. 检查server版本是否严格一致 | ||||
| def check_server_version_by_zone(query_cur, zone): | ||||
|   if zone == '': | ||||
|     logging.info("skip check server version by cluster") | ||||
|   else: | ||||
|     sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server where zone = '{0}'""".format(zone); | ||||
|     sql = """select distinct(substring_index(build_version, '_', 1)) from oceanbase.__all_server where zone = '{0}'""".format(zone); | ||||
|     (desc, results) = query_cur.exec_query(sql); | ||||
|     if len(results) != 1: | ||||
|       raise MyError("servers build_version not match") | ||||
| @ -304,8 +328,9 @@ def check_server_version_by_zone(query_cur, zone): | ||||
| # 1. 检查paxos副本是否同步, paxos副本是否缺失 | ||||
| def check_paxos_replica(query_cur, timeout): | ||||
|   # 1.1 检查paxos副本是否同步 | ||||
|   sql = """select count(*) from GV$OB_LOG_STAT where in_sync = 'NO'""" | ||||
|   check_until_timeout(query_cur, sql, 0, timeout) | ||||
|   sql = """select count(*) from oceanbase.GV$OB_LOG_STAT where in_sync = 'NO'""" | ||||
|   wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) | ||||
|   check_until_timeout(query_cur, sql, 0, wait_timeout) | ||||
|  | ||||
|   # 1.2 检查paxos副本是否有缺失 TODO | ||||
|   logging.info('check paxos replica success') | ||||
| @ -315,26 +340,29 @@ def check_observer_status(query_cur, zone, timeout): | ||||
|   sql = """select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status='inactive')""" | ||||
|   if zone != '': | ||||
|     sql += """ and zone = '{0}'""".format(zone) | ||||
|   check_until_timeout(query_cur, sql, 0, timeout) | ||||
|   wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) | ||||
|   check_until_timeout(query_cur, sql, 0, wait_timeout) | ||||
|  | ||||
| # 3. 检查schema是否刷新成功 | ||||
| def check_schema_status(query_cur, timeout): | ||||
|   sql = """select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""" | ||||
|   check_until_timeout(query_cur, sql, 1, timeout) | ||||
|   wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) | ||||
|   check_until_timeout(query_cur, sql, 1, wait_timeout) | ||||
|  | ||||
| # 4. check major finish | ||||
| def check_major_merge(query_cur, timeout): | ||||
|   need_check = 0 | ||||
|   (desc, results) = query_cur.exec_query("""select distinct value from  GV$OB_PARAMETERs where name = 'enable_major_freeze';""") | ||||
|   (desc, results) = query_cur.exec_query("""select distinct value from oceanbase.GV$OB_PARAMETERS where name = 'enable_major_freeze';""") | ||||
|   if len(results) != 1: | ||||
|     need_check = 1 | ||||
|   elif results[0][0] != 'True': | ||||
|     need_check = 1 | ||||
|   if need_check == 1: | ||||
|     sql = """select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" | ||||
|     check_until_timeout(query_cur, sql, 0, timeout) | ||||
|     sql2 = """select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" | ||||
|     check_until_timeout(query_cur, sql2, 0, timeout) | ||||
|     wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) | ||||
|     sql = """select count(1) from oceanbase.CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" | ||||
|     check_until_timeout(query_cur, sql, 0, wait_timeout) | ||||
|     sql2 = """select /*+ query_timeout(1000000000) */ count(1) from oceanbase.__all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" | ||||
|     check_until_timeout(query_cur, sql2, 0, wait_timeout) | ||||
|  | ||||
| def check_until_timeout(query_cur, sql, value, timeout): | ||||
|   times = timeout / 10 | ||||
| @ -366,7 +394,6 @@ def do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout, need | ||||
|                                    raise_on_warnings = True) | ||||
|     conn.autocommit = True | ||||
|     cur = conn.cursor(buffered=True) | ||||
|     timeout = timeout if timeout > 0 else 600 | ||||
|     try: | ||||
|       query_cur = QueryCursor(cur) | ||||
|       check_zone_valid(query_cur, zone) | ||||
|  | ||||
| @ -135,16 +135,45 @@ | ||||
| #  cur.execute(sql) | ||||
| #  wait_parameter_sync(cur, False, parameter, value, timeout) | ||||
| # | ||||
| #def set_session_timeout(cur, seconds): | ||||
| #  sql = "set @@session.ob_query_timeout = {0}".format(seconds * 1000 * 1000) | ||||
| #  logging.info(sql) | ||||
| #  cur.execute(sql) | ||||
| # | ||||
| #def set_default_timeout_by_tenant(cur, timeout, timeout_per_tenant, min_timeout): | ||||
| #  if timeout > 0: | ||||
| #    logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) | ||||
| #  else: | ||||
| #    query_cur = QueryCursor(cur) | ||||
| #    tenant_id_list = fetch_tenant_ids(query_cur) | ||||
| #    cal_timeout = len(tenant_id_list) * timeout_per_tenant | ||||
| #    timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) | ||||
| #    logging.info("use default timeout caculated by tenants, " | ||||
| #                 "timeout(s):{0}, tenant_count:{1}, " | ||||
| #                 "timeout_per_tenant(s):{2}, min_timeout(s):{3}" | ||||
| #                 .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) | ||||
| # | ||||
| #  return timeout | ||||
| # | ||||
| #def set_tenant_parameter(cur, parameter, value, timeout = 0): | ||||
| # | ||||
| #  tenants_list = [] | ||||
| #  if get_min_cluster_version(cur) < get_version("4.2.1.0"): | ||||
| #    tenants_list = ['all'] | ||||
| #  else: | ||||
| #    tenants_list = ['sys', 'all_user', 'all_meta'] | ||||
| # | ||||
| #  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
| # | ||||
| #  set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  for tenants in tenants_list: | ||||
| #    sql = """alter system set {0} = '{1}' tenant = '{2}'""".format(parameter, value, tenants) | ||||
| #    logging.info(sql) | ||||
| #    cur.execute(sql) | ||||
| # | ||||
| #  set_session_timeout(cur, 10) | ||||
| # | ||||
| #  wait_parameter_sync(cur, True, parameter, value, timeout) | ||||
| # | ||||
| #def get_ori_enable_ddl(cur, timeout): | ||||
| @ -233,7 +262,20 @@ | ||||
| #  table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info" | ||||
| #  sql = """select count(*) as cnt from oceanbase.{0} | ||||
| #           where name = '{1}' and value != '{2}'""".format(table_name, key, value) | ||||
| #  times = (timeout if timeout > 0 else 60) / 5 | ||||
| # | ||||
| #  wait_timeout = 0 | ||||
| #  query_timeout = 0 | ||||
| #  if not is_tenant_config or timeout > 0: | ||||
| #    wait_timeout = (timeout if timeout > 0 else 60) | ||||
| #    query_timeout = wait_timeout | ||||
| #  else: | ||||
| #    # is_tenant_config & timeout not set | ||||
| #    wait_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
| #    query_timeout = set_default_timeout_by_tenant(cur, timeout, 2, 60) | ||||
| # | ||||
| #  set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  times = wait_timeout / 5 | ||||
| #  while times >= 0: | ||||
| #    logging.info(sql) | ||||
| #    cur.execute(sql) | ||||
| @ -253,6 +295,8 @@ | ||||
| #      raise e | ||||
| #    time.sleep(5) | ||||
| # | ||||
| #  set_session_timeout(cur, 10) | ||||
| # | ||||
| #def do_begin_upgrade(cur, timeout): | ||||
| # | ||||
| #  if not check_parameter(cur, False, "enable_upgrade_mode", "True"): | ||||
| @ -324,24 +368,38 @@ | ||||
| #    tenants_list = ['all'] | ||||
| #  else: | ||||
| #    tenants_list = ['sys', 'all_user', 'all_meta'] | ||||
| # | ||||
| #  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
| # | ||||
| #  set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  for tenants in tenants_list: | ||||
| #    action_sql = "alter system suspend merge tenant = {0}".format(tenants) | ||||
| #    rollback_sql = "alter system resume merge tenant = {0}".format(tenants) | ||||
| #    logging.info(action_sql) | ||||
| #    cur.execute(action_sql) | ||||
| # | ||||
| #  set_session_timeout(cur, 10) | ||||
| # | ||||
| #def do_resume_merge(cur, timeout): | ||||
| #  tenants_list = [] | ||||
| #  if get_min_cluster_version(cur) < get_version("4.2.1.0"): | ||||
| #    tenants_list = ['all'] | ||||
| #  else: | ||||
| #    tenants_list = ['sys', 'all_user', 'all_meta'] | ||||
| # | ||||
| #  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
| # | ||||
| #  set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  for tenants in tenants_list: | ||||
| #    action_sql = "alter system resume merge tenant = {0}".format(tenants) | ||||
| #    rollback_sql = "alter system suspend merge tenant = {0}".format(tenants) | ||||
| #    logging.info(action_sql) | ||||
| #    cur.execute(action_sql) | ||||
| # | ||||
| #  set_session_timeout(cur, 10) | ||||
| # | ||||
| #class Cursor: | ||||
| #  __cursor = None | ||||
| #  def __init__(self, cursor): | ||||
| @ -1334,17 +1392,7 @@ | ||||
| #  else: | ||||
| #    run_upgrade_job(conn, cur, "UPGRADE_VIRTUAL_SCHEMA", timeout) | ||||
| # | ||||
| #  # just to make __all_virtual_upgrade_inspection avaliable | ||||
| #  timeout_ts = (timeout if timeout > 0 else 600) * 1000 * 1000 | ||||
| #  sql = "set @@session.ob_query_timeout = {0}".format(timeout_ts) | ||||
| #  logging.info(sql) | ||||
| #  cur.execute(sql) | ||||
| #  sql = "alter system run job 'root_inspection'" | ||||
| #  logging.info(sql) | ||||
| #  cur.execute(sql) | ||||
| #  sql = "set @@session.ob_query_timeout = 10000000" | ||||
| #  logging.info(sql) | ||||
| #  cur.execute(sql) | ||||
| #  run_root_inspection(cur, timeout) | ||||
| #####========******####======== actions begin ========####******========#### | ||||
| #  upgrade_syslog_level(conn, cur) | ||||
| #  return | ||||
| @ -1360,7 +1408,6 @@ | ||||
| #    info_cnt = result[0][0] | ||||
| #    if info_cnt > 0: | ||||
| #      actions.set_parameter(cur, "syslog_level", "WDIAG") | ||||
| # | ||||
| #  except Exception, e: | ||||
| #    logging.warn("upgrade syslog level failed!") | ||||
| #    raise e | ||||
| @ -1375,6 +1422,18 @@ | ||||
| #def get_tenant_ids(cur): | ||||
| #  return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')] | ||||
| # | ||||
| #def run_root_inspection(cur, timeout): | ||||
| # | ||||
| #  query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) | ||||
| # | ||||
| #  actions.set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  sql = "alter system run job 'root_inspection'" | ||||
| #  logging.info(sql) | ||||
| #  cur.execute(sql) | ||||
| # | ||||
| #  actions.set_session_timeout(cur, 10) | ||||
| # | ||||
| #def upgrade_across_version(cur): | ||||
| #  current_data_version = actions.get_current_data_version() | ||||
| #  int_current_data_version = actions.get_version(current_data_version) | ||||
| @ -1480,7 +1539,9 @@ | ||||
| # | ||||
| #def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id): | ||||
| #  try: | ||||
| #    times = (timeout if timeout > 0 else 3600) / 10 | ||||
| #    wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 100, 3600) | ||||
| # | ||||
| #    times = wait_timeout / 10 | ||||
| #    while (times >= 0): | ||||
| #      sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job | ||||
| #               where job_type = '{0}' and job_id > {1} order by job_id desc limit 1 | ||||
| @ -2217,7 +2278,8 @@ | ||||
| #        fail_list.append("""still has tenant snapshot task, upgrade is not allowed temporarily""") | ||||
| #      else: | ||||
| #        logging.info('check tenant snapshot task success') | ||||
| ## 19. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL | ||||
| # | ||||
| ## 17. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL | ||||
| #def check_variable_binlog_row_image(query_cur): | ||||
| ## 4.3.0.0之前的版本,MINIMAL模式生成的日志CDC无法正常消费(DELETE日志). | ||||
| ## 4.3.0版本开始,MINIMAL模式做了改进,支持CDC消费,需要在升级到4.3.0.0之后再打开. | ||||
| @ -2405,7 +2467,7 @@ | ||||
| #'                    that all modules should be run. They are splitted by ",".\n' +\ | ||||
| #'                    For example: -m all, or --module=ddl,normal_dml,special_action\n' +\ | ||||
| #'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\ | ||||
| #'-t, --timeout=name  check timeout, default: 600(s).\n' + \ | ||||
| #'-t, --timeout=name  check timeout.\n' + \ | ||||
| #'-z, --zone=name     If zone is not specified, check all servers status in cluster. \n' +\ | ||||
| #'                    Otherwise, only check servers status in specified zone. \n' + \ | ||||
| #'\n\n' +\ | ||||
| @ -2467,8 +2529,7 @@ | ||||
| #Option('m', 'module', True, False, 'all'),\ | ||||
| ## 日志文件路径,不同脚本的main函数中中会改成不同的默认值 | ||||
| #Option('l', 'log-file', True, False),\ | ||||
| ## 一些检查的超时时间,默认是600s | ||||
| #Option('t', 'timeout', True, False, '600'),\ | ||||
| #Option('t', 'timeout', True, False, 0),\ | ||||
| #Option('z', 'zone', True, False, ''),\ | ||||
| #]\ | ||||
| # | ||||
| @ -2620,13 +2681,38 @@ | ||||
| #  else: | ||||
| #    logging.info("zone is empty, check all servers in cluster") | ||||
| # | ||||
| #def fetch_tenant_ids(query_cur): | ||||
| #  try: | ||||
| #    tenant_id_list = [] | ||||
| #    (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""") | ||||
| #    for r in results: | ||||
| #      tenant_id_list.append(r[0]) | ||||
| #    return tenant_id_list | ||||
| #  except Exception, e: | ||||
| #    logging.exception('fail to fetch distinct tenant ids') | ||||
| #    raise e | ||||
| # | ||||
| #def set_default_timeout_by_tenant(query_cur, timeout, timeout_per_tenant, min_timeout): | ||||
| #  if timeout > 0: | ||||
| #    logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) | ||||
| #  else: | ||||
| #    tenant_id_list = fetch_tenant_ids(query_cur) | ||||
| #    cal_timeout = len(tenant_id_list) * timeout_per_tenant | ||||
| #    timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) | ||||
| #    logging.info("use default timeout caculated by tenants, " | ||||
| #                 "timeout(s):{0}, tenant_count:{1}, " | ||||
| #                 "timeout_per_tenant(s):{2}, min_timeout(s):{3}" | ||||
| #                 .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) | ||||
| # | ||||
| #  return timeout | ||||
| # | ||||
| ##### START #### | ||||
| ## 0. 检查server版本是否严格一致 | ||||
| #def check_server_version_by_zone(query_cur, zone): | ||||
| #  if zone == '': | ||||
| #    logging.info("skip check server version by cluster") | ||||
| #  else: | ||||
| #    sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server where zone = '{0}'""".format(zone); | ||||
| #    sql = """select distinct(substring_index(build_version, '_', 1)) from oceanbase.__all_server where zone = '{0}'""".format(zone); | ||||
| #    (desc, results) = query_cur.exec_query(sql); | ||||
| #    if len(results) != 1: | ||||
| #      raise MyError("servers build_version not match") | ||||
| @ -2636,8 +2722,9 @@ | ||||
| ## 1. 检查paxos副本是否同步, paxos副本是否缺失 | ||||
| #def check_paxos_replica(query_cur, timeout): | ||||
| #  # 1.1 检查paxos副本是否同步 | ||||
| #  sql = """select count(*) from GV$OB_LOG_STAT where in_sync = 'NO'""" | ||||
| #  check_until_timeout(query_cur, sql, 0, timeout) | ||||
| #  sql = """select count(*) from oceanbase.GV$OB_LOG_STAT where in_sync = 'NO'""" | ||||
| #  wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) | ||||
| #  check_until_timeout(query_cur, sql, 0, wait_timeout) | ||||
| # | ||||
| #  # 1.2 检查paxos副本是否有缺失 TODO | ||||
| #  logging.info('check paxos replica success') | ||||
| @ -2647,26 +2734,29 @@ | ||||
| #  sql = """select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status='inactive')""" | ||||
| #  if zone != '': | ||||
| #    sql += """ and zone = '{0}'""".format(zone) | ||||
| #  check_until_timeout(query_cur, sql, 0, timeout) | ||||
| #  wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) | ||||
| #  check_until_timeout(query_cur, sql, 0, wait_timeout) | ||||
| # | ||||
| ## 3. 检查schema是否刷新成功 | ||||
| #def check_schema_status(query_cur, timeout): | ||||
| #  sql = """select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""" | ||||
| #  check_until_timeout(query_cur, sql, 1, timeout) | ||||
| #  wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) | ||||
| #  check_until_timeout(query_cur, sql, 1, wait_timeout) | ||||
| # | ||||
| ## 4. check major finish | ||||
| #def check_major_merge(query_cur, timeout): | ||||
| #  need_check = 0 | ||||
| #  (desc, results) = query_cur.exec_query("""select distinct value from  GV$OB_PARAMETERs where name = 'enable_major_freeze';""") | ||||
| #  (desc, results) = query_cur.exec_query("""select distinct value from oceanbase.GV$OB_PARAMETERS where name = 'enable_major_freeze';""") | ||||
| #  if len(results) != 1: | ||||
| #    need_check = 1 | ||||
| #  elif results[0][0] != 'True': | ||||
| #    need_check = 1 | ||||
| #  if need_check == 1: | ||||
| #    sql = """select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" | ||||
| #    check_until_timeout(query_cur, sql, 0, timeout) | ||||
| #    sql2 = """select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" | ||||
| #    check_until_timeout(query_cur, sql2, 0, timeout) | ||||
| #    wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) | ||||
| #    sql = """select count(1) from oceanbase.CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" | ||||
| #    check_until_timeout(query_cur, sql, 0, wait_timeout) | ||||
| #    sql2 = """select /*+ query_timeout(1000000000) */ count(1) from oceanbase.__all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" | ||||
| #    check_until_timeout(query_cur, sql2, 0, wait_timeout) | ||||
| # | ||||
| #def check_until_timeout(query_cur, sql, value, timeout): | ||||
| #  times = timeout / 10 | ||||
| @ -2698,7 +2788,6 @@ | ||||
| #                                   raise_on_warnings = True) | ||||
| #    conn.autocommit = True | ||||
| #    cur = conn.cursor(buffered=True) | ||||
| #    timeout = timeout if timeout > 0 else 600 | ||||
| #    try: | ||||
| #      query_cur = QueryCursor(cur) | ||||
| #      check_zone_valid(query_cur, zone) | ||||
| @ -2795,8 +2884,14 @@ | ||||
| #  # check compatible sync | ||||
| #  parameter_count = int(server_count) * int(tenant_count) | ||||
| #  current_data_version = actions.get_current_data_version() | ||||
| # | ||||
| #  query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 2, 60) | ||||
| #  actions.set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  sql = """select count(*) as cnt from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value = '{0}' and tenant_id in ({1})""".format(current_data_version, tenant_ids_str) | ||||
| #  times = (timeout if timeout > 0 else 60) / 5 | ||||
| # | ||||
| #  wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
| #  times = wait_timeout / 5 | ||||
| #  while times >= 0: | ||||
| #    logging.info(sql) | ||||
| #    cur.execute(sql) | ||||
| @ -2816,6 +2911,8 @@ | ||||
| #      raise e | ||||
| #    time.sleep(5) | ||||
| # | ||||
| #  actions.set_session_timeout(cur, 10) | ||||
| # | ||||
| #  # check target_data_version/current_data_version from __all_core_table | ||||
| #  int_current_data_version = actions.get_version(current_data_version) | ||||
| #  sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str) | ||||
| @ -2830,16 +2927,20 @@ | ||||
| #    logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version)) | ||||
| # | ||||
| ## 3 检查内部表自检是否成功 | ||||
| #def check_root_inspection(query_cur, timeout): | ||||
| #def check_root_inspection(cur, query_cur, timeout): | ||||
| #  sql = "select count(*) from oceanbase.__all_virtual_upgrade_inspection where info != 'succeed'" | ||||
| #  times = timeout if timeout > 0 else 180 | ||||
| #  while times > 0 : | ||||
| # | ||||
| #  wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) | ||||
| # | ||||
| #  times = wait_timeout / 10 | ||||
| #  while times >= 0 : | ||||
| #    (desc, results) = query_cur.exec_query(sql) | ||||
| #    if results[0][0] == 0: | ||||
| #      break | ||||
| #    time.sleep(10) | ||||
| #    times -= 1 | ||||
| #  if times == 0: | ||||
| # | ||||
| #  if times == -1: | ||||
| #    logging.warn('check root inspection failed!') | ||||
| #    raise e | ||||
| #  logging.info('check root inspection success') | ||||
| @ -2867,7 +2968,7 @@ | ||||
| #  try: | ||||
| #    check_cluster_version(cur, timeout) | ||||
| #    check_data_version(cur, query_cur, timeout) | ||||
| #    check_root_inspection(query_cur, timeout) | ||||
| #    check_root_inspection(cur, query_cur, timeout) | ||||
| #    enable_ddl(cur, timeout) | ||||
| #    enable_rebalance(cur, timeout) | ||||
| #    enable_rereplication(cur, timeout) | ||||
|  | ||||
| @ -41,8 +41,14 @@ def check_data_version(cur, query_cur, timeout): | ||||
|   # check compatible sync | ||||
|   parameter_count = int(server_count) * int(tenant_count) | ||||
|   current_data_version = actions.get_current_data_version() | ||||
|  | ||||
|   query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 2, 60) | ||||
|   actions.set_session_timeout(cur, query_timeout) | ||||
|  | ||||
|   sql = """select count(*) as cnt from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value = '{0}' and tenant_id in ({1})""".format(current_data_version, tenant_ids_str) | ||||
|   times = (timeout if timeout > 0 else 60) / 5 | ||||
|  | ||||
|   wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
|   times = wait_timeout / 5 | ||||
|   while times >= 0: | ||||
|     logging.info(sql) | ||||
|     cur.execute(sql) | ||||
| @ -62,6 +68,8 @@ def check_data_version(cur, query_cur, timeout): | ||||
|       raise e | ||||
|     time.sleep(5) | ||||
|  | ||||
|   actions.set_session_timeout(cur, 10) | ||||
|  | ||||
|   # check target_data_version/current_data_version from __all_core_table | ||||
|   int_current_data_version = actions.get_version(current_data_version) | ||||
|   sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str) | ||||
| @ -76,16 +84,20 @@ def check_data_version(cur, query_cur, timeout): | ||||
|     logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version)) | ||||
|  | ||||
| # 3 检查内部表自检是否成功 | ||||
| def check_root_inspection(query_cur, timeout): | ||||
| def check_root_inspection(cur, query_cur, timeout): | ||||
|   sql = "select count(*) from oceanbase.__all_virtual_upgrade_inspection where info != 'succeed'" | ||||
|   times = timeout if timeout > 0 else 180 | ||||
|   while times > 0 : | ||||
|  | ||||
|   wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) | ||||
|  | ||||
|   times = wait_timeout / 10 | ||||
|   while times >= 0 : | ||||
|     (desc, results) = query_cur.exec_query(sql) | ||||
|     if results[0][0] == 0: | ||||
|       break | ||||
|     time.sleep(10) | ||||
|     times -= 1 | ||||
|   if times == 0: | ||||
|  | ||||
|   if times == -1: | ||||
|     logging.warn('check root inspection failed!') | ||||
|     raise e | ||||
|   logging.info('check root inspection success') | ||||
| @ -113,7 +125,7 @@ def do_check(conn, cur, query_cur, timeout): | ||||
|   try: | ||||
|     check_cluster_version(cur, timeout) | ||||
|     check_data_version(cur, query_cur, timeout) | ||||
|     check_root_inspection(query_cur, timeout) | ||||
|     check_root_inspection(cur, query_cur, timeout) | ||||
|     enable_ddl(cur, timeout) | ||||
|     enable_rebalance(cur, timeout) | ||||
|     enable_rereplication(cur, timeout) | ||||
|  | ||||
| @ -135,16 +135,45 @@ | ||||
| #  cur.execute(sql) | ||||
| #  wait_parameter_sync(cur, False, parameter, value, timeout) | ||||
| # | ||||
| #def set_session_timeout(cur, seconds): | ||||
| #  sql = "set @@session.ob_query_timeout = {0}".format(seconds * 1000 * 1000) | ||||
| #  logging.info(sql) | ||||
| #  cur.execute(sql) | ||||
| # | ||||
| #def set_default_timeout_by_tenant(cur, timeout, timeout_per_tenant, min_timeout): | ||||
| #  if timeout > 0: | ||||
| #    logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) | ||||
| #  else: | ||||
| #    query_cur = QueryCursor(cur) | ||||
| #    tenant_id_list = fetch_tenant_ids(query_cur) | ||||
| #    cal_timeout = len(tenant_id_list) * timeout_per_tenant | ||||
| #    timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) | ||||
| #    logging.info("use default timeout caculated by tenants, " | ||||
| #                 "timeout(s):{0}, tenant_count:{1}, " | ||||
| #                 "timeout_per_tenant(s):{2}, min_timeout(s):{3}" | ||||
| #                 .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) | ||||
| # | ||||
| #  return timeout | ||||
| # | ||||
| #def set_tenant_parameter(cur, parameter, value, timeout = 0): | ||||
| # | ||||
| #  tenants_list = [] | ||||
| #  if get_min_cluster_version(cur) < get_version("4.2.1.0"): | ||||
| #    tenants_list = ['all'] | ||||
| #  else: | ||||
| #    tenants_list = ['sys', 'all_user', 'all_meta'] | ||||
| # | ||||
| #  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
| # | ||||
| #  set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  for tenants in tenants_list: | ||||
| #    sql = """alter system set {0} = '{1}' tenant = '{2}'""".format(parameter, value, tenants) | ||||
| #    logging.info(sql) | ||||
| #    cur.execute(sql) | ||||
| # | ||||
| #  set_session_timeout(cur, 10) | ||||
| # | ||||
| #  wait_parameter_sync(cur, True, parameter, value, timeout) | ||||
| # | ||||
| #def get_ori_enable_ddl(cur, timeout): | ||||
| @ -233,7 +262,20 @@ | ||||
| #  table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info" | ||||
| #  sql = """select count(*) as cnt from oceanbase.{0} | ||||
| #           where name = '{1}' and value != '{2}'""".format(table_name, key, value) | ||||
| #  times = (timeout if timeout > 0 else 60) / 5 | ||||
| # | ||||
| #  wait_timeout = 0 | ||||
| #  query_timeout = 0 | ||||
| #  if not is_tenant_config or timeout > 0: | ||||
| #    wait_timeout = (timeout if timeout > 0 else 60) | ||||
| #    query_timeout = wait_timeout | ||||
| #  else: | ||||
| #    # is_tenant_config & timeout not set | ||||
| #    wait_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
| #    query_timeout = set_default_timeout_by_tenant(cur, timeout, 2, 60) | ||||
| # | ||||
| #  set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  times = wait_timeout / 5 | ||||
| #  while times >= 0: | ||||
| #    logging.info(sql) | ||||
| #    cur.execute(sql) | ||||
| @ -253,6 +295,8 @@ | ||||
| #      raise e | ||||
| #    time.sleep(5) | ||||
| # | ||||
| #  set_session_timeout(cur, 10) | ||||
| # | ||||
| #def do_begin_upgrade(cur, timeout): | ||||
| # | ||||
| #  if not check_parameter(cur, False, "enable_upgrade_mode", "True"): | ||||
| @ -324,24 +368,38 @@ | ||||
| #    tenants_list = ['all'] | ||||
| #  else: | ||||
| #    tenants_list = ['sys', 'all_user', 'all_meta'] | ||||
| # | ||||
| #  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
| # | ||||
| #  set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  for tenants in tenants_list: | ||||
| #    action_sql = "alter system suspend merge tenant = {0}".format(tenants) | ||||
| #    rollback_sql = "alter system resume merge tenant = {0}".format(tenants) | ||||
| #    logging.info(action_sql) | ||||
| #    cur.execute(action_sql) | ||||
| # | ||||
| #  set_session_timeout(cur, 10) | ||||
| # | ||||
| #def do_resume_merge(cur, timeout): | ||||
| #  tenants_list = [] | ||||
| #  if get_min_cluster_version(cur) < get_version("4.2.1.0"): | ||||
| #    tenants_list = ['all'] | ||||
| #  else: | ||||
| #    tenants_list = ['sys', 'all_user', 'all_meta'] | ||||
| # | ||||
| #  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
| # | ||||
| #  set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  for tenants in tenants_list: | ||||
| #    action_sql = "alter system resume merge tenant = {0}".format(tenants) | ||||
| #    rollback_sql = "alter system suspend merge tenant = {0}".format(tenants) | ||||
| #    logging.info(action_sql) | ||||
| #    cur.execute(action_sql) | ||||
| # | ||||
| #  set_session_timeout(cur, 10) | ||||
| # | ||||
| #class Cursor: | ||||
| #  __cursor = None | ||||
| #  def __init__(self, cursor): | ||||
| @ -1334,17 +1392,7 @@ | ||||
| #  else: | ||||
| #    run_upgrade_job(conn, cur, "UPGRADE_VIRTUAL_SCHEMA", timeout) | ||||
| # | ||||
| #  # just to make __all_virtual_upgrade_inspection avaliable | ||||
| #  timeout_ts = (timeout if timeout > 0 else 600) * 1000 * 1000 | ||||
| #  sql = "set @@session.ob_query_timeout = {0}".format(timeout_ts) | ||||
| #  logging.info(sql) | ||||
| #  cur.execute(sql) | ||||
| #  sql = "alter system run job 'root_inspection'" | ||||
| #  logging.info(sql) | ||||
| #  cur.execute(sql) | ||||
| #  sql = "set @@session.ob_query_timeout = 10000000" | ||||
| #  logging.info(sql) | ||||
| #  cur.execute(sql) | ||||
| #  run_root_inspection(cur, timeout) | ||||
| #####========******####======== actions begin ========####******========#### | ||||
| #  upgrade_syslog_level(conn, cur) | ||||
| #  return | ||||
| @ -1360,7 +1408,6 @@ | ||||
| #    info_cnt = result[0][0] | ||||
| #    if info_cnt > 0: | ||||
| #      actions.set_parameter(cur, "syslog_level", "WDIAG") | ||||
| # | ||||
| #  except Exception, e: | ||||
| #    logging.warn("upgrade syslog level failed!") | ||||
| #    raise e | ||||
| @ -1375,6 +1422,18 @@ | ||||
| #def get_tenant_ids(cur): | ||||
| #  return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')] | ||||
| # | ||||
| #def run_root_inspection(cur, timeout): | ||||
| # | ||||
| #  query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) | ||||
| # | ||||
| #  actions.set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  sql = "alter system run job 'root_inspection'" | ||||
| #  logging.info(sql) | ||||
| #  cur.execute(sql) | ||||
| # | ||||
| #  actions.set_session_timeout(cur, 10) | ||||
| # | ||||
| #def upgrade_across_version(cur): | ||||
| #  current_data_version = actions.get_current_data_version() | ||||
| #  int_current_data_version = actions.get_version(current_data_version) | ||||
| @ -1480,7 +1539,9 @@ | ||||
| # | ||||
| #def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id): | ||||
| #  try: | ||||
| #    times = (timeout if timeout > 0 else 3600) / 10 | ||||
| #    wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 100, 3600) | ||||
| # | ||||
| #    times = wait_timeout / 10 | ||||
| #    while (times >= 0): | ||||
| #      sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job | ||||
| #               where job_type = '{0}' and job_id > {1} order by job_id desc limit 1 | ||||
| @ -2217,7 +2278,8 @@ | ||||
| #        fail_list.append("""still has tenant snapshot task, upgrade is not allowed temporarily""") | ||||
| #      else: | ||||
| #        logging.info('check tenant snapshot task success') | ||||
| ## 19. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL | ||||
| # | ||||
| ## 17. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL | ||||
| #def check_variable_binlog_row_image(query_cur): | ||||
| ## 4.3.0.0之前的版本,MINIMAL模式生成的日志CDC无法正常消费(DELETE日志). | ||||
| ## 4.3.0版本开始,MINIMAL模式做了改进,支持CDC消费,需要在升级到4.3.0.0之后再打开. | ||||
| @ -2405,7 +2467,7 @@ | ||||
| #'                    that all modules should be run. They are splitted by ",".\n' +\ | ||||
| #'                    For example: -m all, or --module=ddl,normal_dml,special_action\n' +\ | ||||
| #'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\ | ||||
| #'-t, --timeout=name  check timeout, default: 600(s).\n' + \ | ||||
| #'-t, --timeout=name  check timeout.\n' + \ | ||||
| #'-z, --zone=name     If zone is not specified, check all servers status in cluster. \n' +\ | ||||
| #'                    Otherwise, only check servers status in specified zone. \n' + \ | ||||
| #'\n\n' +\ | ||||
| @ -2467,8 +2529,7 @@ | ||||
| #Option('m', 'module', True, False, 'all'),\ | ||||
| ## 日志文件路径,不同脚本的main函数中中会改成不同的默认值 | ||||
| #Option('l', 'log-file', True, False),\ | ||||
| ## 一些检查的超时时间,默认是600s | ||||
| #Option('t', 'timeout', True, False, '600'),\ | ||||
| #Option('t', 'timeout', True, False, 0),\ | ||||
| #Option('z', 'zone', True, False, ''),\ | ||||
| #]\ | ||||
| # | ||||
| @ -2620,13 +2681,38 @@ | ||||
| #  else: | ||||
| #    logging.info("zone is empty, check all servers in cluster") | ||||
| # | ||||
| #def fetch_tenant_ids(query_cur): | ||||
| #  try: | ||||
| #    tenant_id_list = [] | ||||
| #    (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""") | ||||
| #    for r in results: | ||||
| #      tenant_id_list.append(r[0]) | ||||
| #    return tenant_id_list | ||||
| #  except Exception, e: | ||||
| #    logging.exception('fail to fetch distinct tenant ids') | ||||
| #    raise e | ||||
| # | ||||
| #def set_default_timeout_by_tenant(query_cur, timeout, timeout_per_tenant, min_timeout): | ||||
| #  if timeout > 0: | ||||
| #    logging.info("use timeout from opt, timeout(s):{0}".format(timeout)) | ||||
| #  else: | ||||
| #    tenant_id_list = fetch_tenant_ids(query_cur) | ||||
| #    cal_timeout = len(tenant_id_list) * timeout_per_tenant | ||||
| #    timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout) | ||||
| #    logging.info("use default timeout caculated by tenants, " | ||||
| #                 "timeout(s):{0}, tenant_count:{1}, " | ||||
| #                 "timeout_per_tenant(s):{2}, min_timeout(s):{3}" | ||||
| #                 .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout)) | ||||
| # | ||||
| #  return timeout | ||||
| # | ||||
| ##### START #### | ||||
| ## 0. 检查server版本是否严格一致 | ||||
| #def check_server_version_by_zone(query_cur, zone): | ||||
| #  if zone == '': | ||||
| #    logging.info("skip check server version by cluster") | ||||
| #  else: | ||||
| #    sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server where zone = '{0}'""".format(zone); | ||||
| #    sql = """select distinct(substring_index(build_version, '_', 1)) from oceanbase.__all_server where zone = '{0}'""".format(zone); | ||||
| #    (desc, results) = query_cur.exec_query(sql); | ||||
| #    if len(results) != 1: | ||||
| #      raise MyError("servers build_version not match") | ||||
| @ -2636,8 +2722,9 @@ | ||||
| ## 1. 检查paxos副本是否同步, paxos副本是否缺失 | ||||
| #def check_paxos_replica(query_cur, timeout): | ||||
| #  # 1.1 检查paxos副本是否同步 | ||||
| #  sql = """select count(*) from GV$OB_LOG_STAT where in_sync = 'NO'""" | ||||
| #  check_until_timeout(query_cur, sql, 0, timeout) | ||||
| #  sql = """select count(*) from oceanbase.GV$OB_LOG_STAT where in_sync = 'NO'""" | ||||
| #  wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) | ||||
| #  check_until_timeout(query_cur, sql, 0, wait_timeout) | ||||
| # | ||||
| #  # 1.2 检查paxos副本是否有缺失 TODO | ||||
| #  logging.info('check paxos replica success') | ||||
| @ -2647,26 +2734,29 @@ | ||||
| #  sql = """select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status='inactive')""" | ||||
| #  if zone != '': | ||||
| #    sql += """ and zone = '{0}'""".format(zone) | ||||
| #  check_until_timeout(query_cur, sql, 0, timeout) | ||||
| #  wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600) | ||||
| #  check_until_timeout(query_cur, sql, 0, wait_timeout) | ||||
| # | ||||
| ## 3. 检查schema是否刷新成功 | ||||
| #def check_schema_status(query_cur, timeout): | ||||
| #  sql = """select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""" | ||||
| #  check_until_timeout(query_cur, sql, 1, timeout) | ||||
| #  wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) | ||||
| #  check_until_timeout(query_cur, sql, 1, wait_timeout) | ||||
| # | ||||
| ## 4. check major finish | ||||
| #def check_major_merge(query_cur, timeout): | ||||
| #  need_check = 0 | ||||
| #  (desc, results) = query_cur.exec_query("""select distinct value from  GV$OB_PARAMETERs where name = 'enable_major_freeze';""") | ||||
| #  (desc, results) = query_cur.exec_query("""select distinct value from oceanbase.GV$OB_PARAMETERS where name = 'enable_major_freeze';""") | ||||
| #  if len(results) != 1: | ||||
| #    need_check = 1 | ||||
| #  elif results[0][0] != 'True': | ||||
| #    need_check = 1 | ||||
| #  if need_check == 1: | ||||
| #    sql = """select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" | ||||
| #    check_until_timeout(query_cur, sql, 0, timeout) | ||||
| #    sql2 = """select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" | ||||
| #    check_until_timeout(query_cur, sql2, 0, timeout) | ||||
| #    wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600) | ||||
| #    sql = """select count(1) from oceanbase.CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""" | ||||
| #    check_until_timeout(query_cur, sql, 0, wait_timeout) | ||||
| #    sql2 = """select /*+ query_timeout(1000000000) */ count(1) from oceanbase.__all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""" | ||||
| #    check_until_timeout(query_cur, sql2, 0, wait_timeout) | ||||
| # | ||||
| #def check_until_timeout(query_cur, sql, value, timeout): | ||||
| #  times = timeout / 10 | ||||
| @ -2698,7 +2788,6 @@ | ||||
| #                                   raise_on_warnings = True) | ||||
| #    conn.autocommit = True | ||||
| #    cur = conn.cursor(buffered=True) | ||||
| #    timeout = timeout if timeout > 0 else 600 | ||||
| #    try: | ||||
| #      query_cur = QueryCursor(cur) | ||||
| #      check_zone_valid(query_cur, zone) | ||||
| @ -2795,8 +2884,14 @@ | ||||
| #  # check compatible sync | ||||
| #  parameter_count = int(server_count) * int(tenant_count) | ||||
| #  current_data_version = actions.get_current_data_version() | ||||
| # | ||||
| #  query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 2, 60) | ||||
| #  actions.set_session_timeout(cur, query_timeout) | ||||
| # | ||||
| #  sql = """select count(*) as cnt from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value = '{0}' and tenant_id in ({1})""".format(current_data_version, tenant_ids_str) | ||||
| #  times = (timeout if timeout > 0 else 60) / 5 | ||||
| # | ||||
| #  wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 60) | ||||
| #  times = wait_timeout / 5 | ||||
| #  while times >= 0: | ||||
| #    logging.info(sql) | ||||
| #    cur.execute(sql) | ||||
| @ -2816,6 +2911,8 @@ | ||||
| #      raise e | ||||
| #    time.sleep(5) | ||||
| # | ||||
| #  actions.set_session_timeout(cur, 10) | ||||
| # | ||||
| #  # check target_data_version/current_data_version from __all_core_table | ||||
| #  int_current_data_version = actions.get_version(current_data_version) | ||||
| #  sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str) | ||||
| @ -2830,16 +2927,20 @@ | ||||
| #    logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version)) | ||||
| # | ||||
| ## 3 检查内部表自检是否成功 | ||||
| #def check_root_inspection(query_cur, timeout): | ||||
| #def check_root_inspection(cur, query_cur, timeout): | ||||
| #  sql = "select count(*) from oceanbase.__all_virtual_upgrade_inspection where info != 'succeed'" | ||||
| #  times = timeout if timeout > 0 else 180 | ||||
| #  while times > 0 : | ||||
| # | ||||
| #  wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600) | ||||
| # | ||||
| #  times = wait_timeout / 10 | ||||
| #  while times >= 0 : | ||||
| #    (desc, results) = query_cur.exec_query(sql) | ||||
| #    if results[0][0] == 0: | ||||
| #      break | ||||
| #    time.sleep(10) | ||||
| #    times -= 1 | ||||
| #  if times == 0: | ||||
| # | ||||
| #  if times == -1: | ||||
| #    logging.warn('check root inspection failed!') | ||||
| #    raise e | ||||
| #  logging.info('check root inspection success') | ||||
| @ -2867,7 +2968,7 @@ | ||||
| #  try: | ||||
| #    check_cluster_version(cur, timeout) | ||||
| #    check_data_version(cur, query_cur, timeout) | ||||
| #    check_root_inspection(query_cur, timeout) | ||||
| #    check_root_inspection(cur, query_cur, timeout) | ||||
| #    enable_ddl(cur, timeout) | ||||
| #    enable_rebalance(cur, timeout) | ||||
| #    enable_rereplication(cur, timeout) | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 tino247
					tino247