[CP] [CP] fix handling index table error and ignore table matching when enable_white_black_list is false
This commit is contained in:
		@ -50,7 +50,8 @@ namespace libobcdc
 | 
			
		||||
ObLogDDLProcessor::ObLogDDLProcessor() :
 | 
			
		||||
    is_inited_(false),
 | 
			
		||||
    schema_getter_(NULL),
 | 
			
		||||
    skip_reversed_schema_version_(false)
 | 
			
		||||
    skip_reversed_schema_version_(false),
 | 
			
		||||
    enable_white_black_list_(false)
 | 
			
		||||
{}
 | 
			
		||||
 | 
			
		||||
ObLogDDLProcessor::~ObLogDDLProcessor()
 | 
			
		||||
@ -60,7 +61,8 @@ ObLogDDLProcessor::~ObLogDDLProcessor()
 | 
			
		||||
 | 
			
		||||
int ObLogDDLProcessor::init(
 | 
			
		||||
    IObLogSchemaGetter *schema_getter,
 | 
			
		||||
    const bool skip_reversed_schema_version)
 | 
			
		||||
    const bool skip_reversed_schema_version,
 | 
			
		||||
    const bool enable_white_black_list)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
 | 
			
		||||
@ -72,6 +74,7 @@ int ObLogDDLProcessor::init(
 | 
			
		||||
    LOG_ERROR("invalid argument", KR(ret), K(schema_getter));
 | 
			
		||||
  } else {
 | 
			
		||||
    skip_reversed_schema_version_ = skip_reversed_schema_version;
 | 
			
		||||
    enable_white_black_list_ = enable_white_black_list;
 | 
			
		||||
    is_inited_ = true;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -83,6 +86,7 @@ void ObLogDDLProcessor::destroy()
 | 
			
		||||
  is_inited_ = false;
 | 
			
		||||
  schema_getter_ = NULL;
 | 
			
		||||
  skip_reversed_schema_version_ = false;
 | 
			
		||||
  enable_white_black_list_ = false;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObLogDDLProcessor::handle_ddl_trans(
 | 
			
		||||
@ -355,8 +359,8 @@ int ObLogDDLProcessor::handle_tenant_ddl_task_(
 | 
			
		||||
        mark_stmt_binlog_record_invalid_(*ddl_stmt);
 | 
			
		||||
      } else {
 | 
			
		||||
        // statements are not filtered, processing DDL statements
 | 
			
		||||
        if (need_update_tic && OB_FAIL(handle_ddl_stmt_update_tic_(tenant, task, *ddl_stmt, old_schema_version,
 | 
			
		||||
            new_schema_version, stop_flag))) {
 | 
			
		||||
        if (enable_white_black_list_ && need_update_tic && OB_FAIL(handle_ddl_stmt_update_tic_(tenant, task,
 | 
			
		||||
            *ddl_stmt, old_schema_version, new_schema_version, stop_flag))) {
 | 
			
		||||
          if (OB_IN_STOP_STATE != ret) {
 | 
			
		||||
            LOG_ERROR("handle_ddl_stmt_update_tic_ fail", KR(ret), K(tenant), K(task), K(ddl_stmt),
 | 
			
		||||
                K(old_schema_version), K(new_schema_version));
 | 
			
		||||
 | 
			
		||||
@ -53,7 +53,8 @@ public:
 | 
			
		||||
public:
 | 
			
		||||
  int init(
 | 
			
		||||
      IObLogSchemaGetter *schema_getter,
 | 
			
		||||
      const bool skip_reversed_schema_version);
 | 
			
		||||
      const bool skip_reversed_schema_version,
 | 
			
		||||
      const bool enable_white_black_list);
 | 
			
		||||
  void destroy();
 | 
			
		||||
  int handle_ddl_trans(
 | 
			
		||||
      PartTransTask &task,
 | 
			
		||||
@ -326,6 +327,7 @@ private:
 | 
			
		||||
  bool                is_inited_;
 | 
			
		||||
  IObLogSchemaGetter  *schema_getter_;
 | 
			
		||||
  bool                skip_reversed_schema_version_;
 | 
			
		||||
  bool                enable_white_black_list_;
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
  DISALLOW_COPY_AND_ASSIGN(ObLogDDLProcessor);
 | 
			
		||||
 | 
			
		||||
@ -967,7 +967,8 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
 | 
			
		||||
 | 
			
		||||
  INIT(trans_redo_dispatcher_, ObLogTransRedoDispatcher, redo_dispatcher_mem_limit, enable_sort_by_seq_no, *trans_stat_mgr_);
 | 
			
		||||
 | 
			
		||||
  INIT(ddl_processor_, ObLogDDLProcessor, schema_getter_, TCONF.skip_reversed_schema_verison);
 | 
			
		||||
  INIT(ddl_processor_, ObLogDDLProcessor, schema_getter_, TCONF.skip_reversed_schema_verison,
 | 
			
		||||
      TCONF.enable_white_black_list);
 | 
			
		||||
 | 
			
		||||
  INIT(sequencer_, ObLogSequencer, TCONF.sequencer_thread_num, CDC_CFG_MGR.get_sequencer_queue_length(),
 | 
			
		||||
      *trans_ctx_mgr_, *trans_stat_mgr_, *committer_, *trans_redo_dispatcher_, *trans_msg_sorter_, *err_handler);
 | 
			
		||||
 | 
			
		||||
@ -169,7 +169,8 @@ int ObLogPartMgr::add_all_user_tablets_and_tables_info(const int64_t timeout)
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (OB_SUCC(ret) && OB_FAIL(add_user_table_info_(schema_guard, table_schema, timeout))) {
 | 
			
		||||
      if (OB_SUCC(ret) && enable_white_black_list_ && OB_FAIL(add_user_table_info_(schema_guard,
 | 
			
		||||
          table_schema, timeout))) {
 | 
			
		||||
        LOG_ERROR("add_user_table_info failed", KR(ret), K_(tenant_id), KPC(table_schema));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
@ -209,7 +210,8 @@ int ObLogPartMgr::add_all_user_tablets_and_tables_info(
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      if (OB_SUCC(ret) && OB_FAIL(add_user_table_info_(tenant_info, table_meta, timeout))) {
 | 
			
		||||
      if (OB_SUCC(ret) && enable_white_black_list_ && OB_FAIL(add_user_table_info_(tenant_info,
 | 
			
		||||
          table_meta, timeout))) {
 | 
			
		||||
        LOG_ERROR("add_user_table_info failed", KR(ret), K_(tenant_id), KPC(table_meta));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
@ -2406,10 +2408,14 @@ int ObLogPartMgr::get_table_info_of_table_schema_(ObLogSchemaGuard &schema_guard
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  uint64_t table_id = OB_INVALID_ID;
 | 
			
		||||
  const ObSimpleTableSchemaV2 *final_table_schema = NULL;
 | 
			
		||||
  bool is_index_table = false;
 | 
			
		||||
  if (OB_ISNULL(table_schema)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_ERROR("table_schema is NULL", KR(ret), K(table_schema));
 | 
			
		||||
  } else if (FALSE_IT(table_id = table_schema->get_table_id())) {
 | 
			
		||||
  } else if (table_schema->is_index_table()) {
 | 
			
		||||
    is_index_table = true;
 | 
			
		||||
    LOG_INFO("table is index table, ignore it", K(table_id), KPC(table_schema));
 | 
			
		||||
  } else if (table_schema->is_user_hidden_table()) {
 | 
			
		||||
    const ObSimpleTableSchemaV2 *origin_table_schema = nullptr;
 | 
			
		||||
    if (OB_FAIL(try_get_offline_ddl_origin_table_schema_(*table_schema, schema_guard,
 | 
			
		||||
@ -2474,15 +2480,19 @@ int ObLogPartMgr::get_table_info_of_table_schema_(ObLogSchemaGuard &schema_guard
 | 
			
		||||
    final_table_schema = table_schema;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (OB_SUCC(ret) && OB_FAIL(inner_get_table_info_of_table_schema_(schema_guard, final_table_schema, tenant_name,
 | 
			
		||||
      database_name, table_name, database_id, is_user_table, timeout))) {
 | 
			
		||||
    if (OB_TIMEOUT != ret) {
 | 
			
		||||
      LOG_ERROR("inner get table info failed", KR(ret), KPC(final_table_schema));
 | 
			
		||||
  if (OB_SUCC(ret)) {
 | 
			
		||||
    if (is_index_table) {
 | 
			
		||||
      is_user_table = false;
 | 
			
		||||
    } else if (OB_FAIL(inner_get_table_info_of_table_schema_(schema_guard, final_table_schema, tenant_name,
 | 
			
		||||
        database_name, table_name, database_id, is_user_table, timeout))) {
 | 
			
		||||
      if (OB_TIMEOUT != ret) {
 | 
			
		||||
        LOG_ERROR("inner get table info failed", KR(ret), KPC(final_table_schema));
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      LOG_INFO("get table info of table_schema finished", KR(ret), K(table_id), K(tenant_name),
 | 
			
		||||
          K(database_name), K(table_name), K(database_id), K(is_user_table));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  LOG_INFO("get table info of table_schema finished", KR(ret), K(table_id), K(tenant_name),
 | 
			
		||||
      K(database_name), K(table_name), K(database_id), K(is_user_table));
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -2498,6 +2508,7 @@ int ObLogPartMgr::get_table_info_of_table_meta_(ObDictTenantInfo *tenant_info,
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  uint64_t table_id = OB_INVALID_ID;
 | 
			
		||||
  const datadict::ObDictTableMeta *final_table_meta = NULL;
 | 
			
		||||
  bool is_index_table = false;
 | 
			
		||||
  if (OB_ISNULL(tenant_info)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_ERROR("tenant_info is NULL", KR(ret), K_(tenant_id));
 | 
			
		||||
@ -2505,6 +2516,9 @@ int ObLogPartMgr::get_table_info_of_table_meta_(ObDictTenantInfo *tenant_info,
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_ERROR("table_meta is NULL", KR(ret), K(table_meta));
 | 
			
		||||
  } else if (FALSE_IT(table_id = table_meta->get_table_id())) {
 | 
			
		||||
  } else if (table_meta->is_index_table()) {
 | 
			
		||||
    is_index_table = true;
 | 
			
		||||
    LOG_INFO("table is index table, ignore it", K(table_id), KPC(table_meta));
 | 
			
		||||
  } else if (table_meta->is_user_hidden_table()) {
 | 
			
		||||
    datadict::ObDictTableMeta *origin_table_meta = nullptr;
 | 
			
		||||
    if (OB_FAIL(try_get_offline_ddl_origin_table_meta_(*table_meta, tenant_info,
 | 
			
		||||
@ -2558,13 +2572,19 @@ int ObLogPartMgr::get_table_info_of_table_meta_(ObDictTenantInfo *tenant_info,
 | 
			
		||||
    final_table_meta = table_meta;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (OB_SUCC(ret) && OB_FAIL(inner_get_table_info_of_table_meta_(tenant_info, final_table_meta,
 | 
			
		||||
      tenant_name, database_name, table_name, database_id, is_user_table))) {
 | 
			
		||||
    LOG_ERROR("inner get table info failed", KR(ret), KPC(final_table_meta));
 | 
			
		||||
  if (OB_SUCC(ret)) {
 | 
			
		||||
    if (is_index_table) {
 | 
			
		||||
      is_user_table = false;
 | 
			
		||||
    } else if (OB_FAIL(inner_get_table_info_of_table_meta_(tenant_info, final_table_meta,
 | 
			
		||||
        tenant_name, database_name, table_name, database_id, is_user_table))) {
 | 
			
		||||
      if (OB_TIMEOUT != ret) {
 | 
			
		||||
        LOG_ERROR("inner get table info failed", KR(ret), KPC(final_table_meta));
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      LOG_INFO("get table info of table_meta success", K(table_id), K(tenant_name),
 | 
			
		||||
          K(database_name), K(table_name), K(database_id), K(is_user_table));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  LOG_INFO("get table info of table_meta success", K(table_id), K(tenant_name),
 | 
			
		||||
      K(database_name), K(table_name), K(database_id), K(is_user_table));
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user