[OBCDC] fix issues that cause cdc to get stuck
This commit is contained in:
		@ -273,8 +273,10 @@ public:
 | 
			
		||||
  // ------------------------------------------------------------------------
 | 
			
		||||
  //              configurations which support dynamic modification
 | 
			
		||||
  // ------------------------------------------------------------------------
 | 
			
		||||
  T_DEF_INT_INFT(mysql_connect_timeout_sec, OB_CLUSTER_PARAMETER, 40, 1, "mysql connection timeout in seconds");
 | 
			
		||||
  T_DEF_INT_INFT(mysql_query_timeout_sec, OB_CLUSTER_PARAMETER, 30, 1, "mysql query timeout in seconds");
 | 
			
		||||
  T_DEF_INT_INFT(rs_sql_connect_timeout_sec, OB_CLUSTER_PARAMETER, 40, 1, "rootservice mysql connection timeout in seconds");
 | 
			
		||||
  T_DEF_INT_INFT(rs_sql_query_timeout_sec, OB_CLUSTER_PARAMETER, 30, 1, "rootservice mysql query timeout in seconds");
 | 
			
		||||
  T_DEF_INT_INFT(tenant_sql_connect_timeout_sec, OB_CLUSTER_PARAMETER, 40, 1, "tenant mysql connection timeout in seconds");
 | 
			
		||||
  T_DEF_INT_INFT(tenant_sql_query_timeout_sec, OB_CLUSTER_PARAMETER, 30, 1, "tenant mysql query timeout in seconds");
 | 
			
		||||
  T_DEF_INT_INFT(start_lsn_locator_rpc_timeout_sec, OB_CLUSTER_PARAMETER, 60, 1,
 | 
			
		||||
      "start lsn locator rpc timeout in seconds");
 | 
			
		||||
  T_DEF_INT_INFT(start_lsn_locator_batch_count, OB_CLUSTER_PARAMETER, 5, 1, "start lsn locator batch count");
 | 
			
		||||
 | 
			
		||||
@ -732,8 +732,10 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
 | 
			
		||||
  const char *tg_white_list = TCONF.tablegroup_white_list.str();
 | 
			
		||||
  const char *tg_black_list = TCONF.tablegroup_black_list.str();
 | 
			
		||||
  int64_t max_cached_trans_ctx_count = MAX_CACHED_TRANS_CTX_COUNT;
 | 
			
		||||
  int64_t sql_conn_timeout_us = TCONF.mysql_connect_timeout_sec * _SEC_;
 | 
			
		||||
  int64_t sql_query_timeout_us = TCONF.mysql_query_timeout_sec * _SEC_;
 | 
			
		||||
  int64_t rs_sql_conn_timeout_us = TCONF.rs_sql_connect_timeout_sec * _SEC_;
 | 
			
		||||
  int64_t rs_sql_query_timeout_us = TCONF.rs_sql_query_timeout_sec * _SEC_;
 | 
			
		||||
  int64_t tenant_sql_conn_timeout_us = TCONF.tenant_sql_connect_timeout_sec * _SEC_;
 | 
			
		||||
  int64_t tenant_sql_query_timeout_us = TCONF.tenant_sql_query_timeout_sec * _SEC_;
 | 
			
		||||
  const char *ob_trace_id_ptr = TCONF.ob_trace_id.str();
 | 
			
		||||
  const char *drc_message_factory_binlog_record_type_str = TCONF.drc_message_factory_binlog_record_type.str();
 | 
			
		||||
  // The starting schema version of the SYS tenant
 | 
			
		||||
@ -794,10 +796,10 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
 | 
			
		||||
  // init ObLogMysqlProxy
 | 
			
		||||
  if (OB_SUCC(ret)) {
 | 
			
		||||
    if (OB_FAIL(mysql_proxy_.init(cluster_user, cluster_password, cluster_db_name,
 | 
			
		||||
        sql_conn_timeout_us, sql_query_timeout_us, enable_ssl_client_authentication, rs_server_provider_))) {
 | 
			
		||||
        rs_sql_conn_timeout_us, rs_sql_query_timeout_us, enable_ssl_client_authentication, rs_server_provider_))) {
 | 
			
		||||
      LOG_ERROR("mysql_proxy_ init fail", KR(ret), K(rs_server_provider_),
 | 
			
		||||
          K(cluster_user), K(cluster_password), K(cluster_db_name), K(sql_conn_timeout_us),
 | 
			
		||||
          K(sql_query_timeout_us), K(enable_ssl_client_authentication));
 | 
			
		||||
          K(cluster_user), K(cluster_password), K(cluster_db_name), K(rs_sql_conn_timeout_us),
 | 
			
		||||
          K(rs_sql_query_timeout_us), K(enable_ssl_client_authentication));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -832,11 +834,11 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
 | 
			
		||||
  // init ObLogTenantSQLProxy
 | 
			
		||||
  if (OB_SUCC(ret)) {
 | 
			
		||||
    if (OB_FAIL(tenant_sql_proxy_.init(cluster_user, cluster_password, cluster_db_name,
 | 
			
		||||
        sql_conn_timeout_us, sql_query_timeout_us, enable_ssl_client_authentication,
 | 
			
		||||
        tenant_sql_conn_timeout_us, tenant_sql_query_timeout_us, enable_ssl_client_authentication,
 | 
			
		||||
        tenant_server_provider_, true/*is_tenant_server_provider*/))) {
 | 
			
		||||
      LOG_ERROR("tenant_sql_proxy_ init fail", KR(ret), K(tenant_server_provider_),
 | 
			
		||||
          K(cluster_user), K(cluster_password), K(cluster_db_name), K(sql_conn_timeout_us),
 | 
			
		||||
          K(sql_query_timeout_us), K(enable_ssl_client_authentication));
 | 
			
		||||
          K(cluster_user), K(cluster_password), K(cluster_db_name), K(tenant_sql_conn_timeout_us),
 | 
			
		||||
          K(tenant_sql_query_timeout_us), K(enable_ssl_client_authentication));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1395,8 +1395,8 @@ int ObLogSysTableHelper::change_to_next_server_(const int64_t svr_idx, ObLogMySQ
 | 
			
		||||
  // update connection
 | 
			
		||||
  ObAddr svr;
 | 
			
		||||
  MySQLConnConfig conn_config;
 | 
			
		||||
  int mysql_connect_timeout_sec = TCONF.mysql_connect_timeout_sec;
 | 
			
		||||
  int mysql_query_timeout_sec = TCONF.mysql_query_timeout_sec;
 | 
			
		||||
  int mysql_connect_timeout_sec = TCONF.rs_sql_connect_timeout_sec;
 | 
			
		||||
  int mysql_query_timeout_sec = TCONF.rs_sql_query_timeout_sec;
 | 
			
		||||
  const bool enable_ssl_client_authentication = (1 == TCONF.ssl_client_authentication);
 | 
			
		||||
 | 
			
		||||
  if (OB_ISNULL(svr_provider_)) {
 | 
			
		||||
 | 
			
		||||
@ -352,10 +352,6 @@ int ObLogTenantMgr::do_add_tenant_(const uint64_t tenant_id,
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  // del tenant_start_ddl_info if exists regardless of ret(in case of tenant already dropped etc.)
 | 
			
		||||
  try_del_tenant_start_ddl_info_(tenant_id);
 | 
			
		||||
 | 
			
		||||
  if (OB_SUCCESS == ret) {
 | 
			
		||||
    ISTAT("[ADD_TENANT]", K(tenant_id), K(tenant_name), K(is_new_created_tenant), K(is_new_tenant_by_restore),
 | 
			
		||||
        K(is_tenant_served), K(start_tstamp_ns), K(tenant_start_serve_ts_ns), K(sys_schema_version),
 | 
			
		||||
@ -522,13 +518,17 @@ int ObLogTenantMgr::add_tenant(const uint64_t tenant_id,
 | 
			
		||||
            K(sys_schema_version), K(timeout));
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      // NOTE: currently add_tenant is serialize executed, thus reset all info in add_tenant_start_ddl_info_map_ is safe.
 | 
			
		||||
      add_tenant_start_ddl_info_map_.reset();
 | 
			
		||||
      // add tenant success
 | 
			
		||||
      add_tenant_succ = true;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // NOTE: add_tenant is currently NOT executed serially, thus resetting all info in add_tenant_start_ddl_info_map_ is NOT safe.
 | 
			
		||||
  // (meta_tenant add_tenant_start -> user_tenant add_tenant_start -> meta_tenant add_tenant_end -> user_tenant add_tenant_end.)
 | 
			
		||||
  // reset tenant_start_ddl_info for the specified tenant_id regardless of ret (in case the tenant is already dropped or not served
 | 
			
		||||
  // and other unexpected cases, otherwise global_heartbeat will get stuck.)
 | 
			
		||||
  try_del_tenant_start_ddl_info_(tenant_id);
 | 
			
		||||
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -558,8 +558,13 @@ int ObLogTenantMgr::get_first_schema_version_of_tenant_(const uint64_t tenant_id
 | 
			
		||||
      timeout))) {
 | 
			
		||||
    // OB_TENANT_HAS_BEEN_DROPPED is returned to the caller
 | 
			
		||||
    if (OB_TIMEOUT != ret) {
 | 
			
		||||
      LOG_ERROR("get_first_trans_end_schema_version fail", KR(ret), K(tenant_id),
 | 
			
		||||
          K(first_schema_version));
 | 
			
		||||
      if (OB_TENANT_HAS_BEEN_DROPPED == ret) {
 | 
			
		||||
        LOG_WARN("get_first_trans_end_schema_version fail cause tenant dropped", KR(ret), K(tenant_id),
 | 
			
		||||
            K(first_schema_version));
 | 
			
		||||
      } else {
 | 
			
		||||
        LOG_ERROR("get_first_trans_end_schema_version fail", KR(ret), K(tenant_id),
 | 
			
		||||
            K(first_schema_version));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  } else if (OB_UNLIKELY(first_schema_version <= 0)) {
 | 
			
		||||
    LOG_ERROR("tenant first schema versioin is invalid", K(tenant_id), K(first_schema_version));
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user