From 920dc70c0ca2036f54475f948849bf2e86d5cdc6 Mon Sep 17 00:00:00 2001 From: fkuner <784819644@qq.com> Date: Tue, 12 Nov 2024 12:16:13 +0000 Subject: [PATCH] [OBCDC] fix the problem which hbase relies the white and black list --- .../libobcdc/src/ob_log_ddl_processor.cpp | 2 +- .../libobcdc/src/ob_log_part_mgr.cpp | 160 ++++++++++++------ src/logservice/libobcdc/src/ob_log_part_mgr.h | 22 ++- 3 files changed, 133 insertions(+), 51 deletions(-) diff --git a/src/logservice/libobcdc/src/ob_log_ddl_processor.cpp b/src/logservice/libobcdc/src/ob_log_ddl_processor.cpp index d83df1968..bd6473059 100644 --- a/src/logservice/libobcdc/src/ob_log_ddl_processor.cpp +++ b/src/logservice/libobcdc/src/ob_log_ddl_processor.cpp @@ -359,7 +359,7 @@ int ObLogDDLProcessor::handle_tenant_ddl_task_( mark_stmt_binlog_record_invalid_(*ddl_stmt); } else { // statements are not filtered, processing DDL statements - if (enable_white_black_list_ && need_update_tic && OB_FAIL(handle_ddl_stmt_update_tic_(tenant, task, + if ((enable_white_black_list_ || TCONF.enable_hbase_mode) && need_update_tic && OB_FAIL(handle_ddl_stmt_update_tic_(tenant, task, *ddl_stmt, old_schema_version, new_schema_version, stop_flag))) { if (OB_IN_STOP_STATE != ret) { LOG_ERROR("handle_ddl_stmt_update_tic_ fail", KR(ret), K(tenant), K(task), K(ddl_stmt), diff --git a/src/logservice/libobcdc/src/ob_log_part_mgr.cpp b/src/logservice/libobcdc/src/ob_log_part_mgr.cpp index 9b110fd25..2542f0ba4 100644 --- a/src/logservice/libobcdc/src/ob_log_part_mgr.cpp +++ b/src/logservice/libobcdc/src/ob_log_part_mgr.cpp @@ -171,7 +171,13 @@ int ObLogPartMgr::add_all_user_tablets_and_tables_info(const int64_t timeout) if (OB_SUCC(ret) && enable_white_black_list_ && OB_FAIL(add_user_table_info_(schema_guard, table_schema, timeout))) { - LOG_ERROR("add_user_table_info failed", KR(ret), K_(tenant_id), KPC(table_schema)); + if (OB_TIMEOUT != ret) { + LOG_ERROR("add_user_table_info failed", KR(ret), K_(tenant_id), KPC(table_schema)); + } + } else if (TCONF.enable_hbase_mode && OB_FAIL(add_hbase_table(schema_guard, table_schema, timeout))) { + if (OB_TIMEOUT != ret) { + LOG_WARN("add_hbase_table fail", KR(ret)); + } } } @@ -212,7 +218,13 @@ int ObLogPartMgr::add_all_user_tablets_and_tables_info( if (OB_SUCC(ret) && enable_white_black_list_ && OB_FAIL(add_user_table_info_(tenant_info, table_meta, timeout))) { - LOG_ERROR("add_user_table_info failed", KR(ret), K_(tenant_id), KPC(table_meta)); + if (OB_TIMEOUT != ret) { + LOG_ERROR("add_user_table_info failed", KR(ret), K_(tenant_id), KPC(table_meta)); + } + } else if (TCONF.enable_hbase_mode && OB_FAIL(add_hbase_table(table_meta, timeout))) { + if (OB_TIMEOUT != ret) { + LOG_WARN("add_hbase_table fail", KR(ret), K(table_meta)); + } } } @@ -402,6 +414,73 @@ int ObLogPartMgr::add_table(const uint64_t table_id, return ret; } +int ObLogPartMgr::add_hbase_table(const uint64_t table_id, + DdlStmtTask &ddl_stmt, + const int64_t new_schema_version, + const int64_t timeout) +{ + int ret = OB_SUCCESS; + const char *tenant_name = nullptr; + const char *database_name = nullptr; + const char *table_name = nullptr; + + if (OB_FAIL(get_schema_info_of_table_id_(table_id, new_schema_version, tenant_name, + database_name, table_name, timeout))) { + if (OB_TIMEOUT != ret) { + LOG_ERROR("get_schema_info_of_table_id_ failed", KR(ret), K(table_id), K(new_schema_version)); + } + } else if (OB_FAIL(try_add_hbase_table_(table_id, table_name, new_schema_version, timeout))) { + if (OB_TIMEOUT != ret) { + LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name), K(new_schema_version)); + } + } else { + LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name), K(new_schema_version)); + } + + return ret; +} + +int ObLogPartMgr::add_hbase_table(ObLogSchemaGuard &schema_guard, + const ObSimpleTableSchemaV2 *table_schema, + const int64_t timeout) +{ + int ret = OB_SUCCESS; + const uint64_t table_id = table_schema->get_table_id(); + const char *table_name = table_schema->get_table_name(); + const ObTableSchema *full_table_schema = NULL; + + if (OB_FAIL(get_full_table_schema_(table_id, timeout, schema_guard, full_table_schema))) { + if (OB_TIMEOUT != ret) { + LOG_ERROR("get full table_schema failed", KR(ret), K(table_id)); + } + } else if (OB_FAIL(try_add_hbase_table_(full_table_schema, timeout))) { + if (OB_TIMEOUT != ret) { + LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name)); + } + } else { + LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name)); + } + + return ret; +} + +int ObLogPartMgr::add_hbase_table(const datadict::ObDictTableMeta *table_meta, + const int64_t timeout) +{ + int ret = OB_SUCCESS; + const char *table_name = table_meta->get_table_name(); + + if (OB_FAIL(try_add_hbase_table_(table_meta, timeout))) { + if (OB_TIMEOUT != ret) { + LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_name)); + } + } else { + LOG_INFO("try_add_hbase_table_ success", K(table_name)); + } + + return ret; +} + int ObLogPartMgr::try_get_offline_ddl_origin_table_schema_(const ObSimpleTableSchemaV2 &table_schema, ObLogSchemaGuard &schema_guard, const int64_t timeout, @@ -1658,22 +1737,6 @@ int ObLogPartMgr::add_user_table_info_(ObLogSchemaGuard &schema_guard, ISTAT("insert table_id into cache success", K_(tenant_id), K(table_id), K(database_id), K(tenant_name), K(database_name), K(table_name)); } - - if (OB_SUCC(ret) && TCONF.enable_hbase_mode) { - // add hbase table - const ObTableSchema *full_table_schema = NULL; - if (OB_FAIL(get_full_table_schema_(table_id, timeout, schema_guard, full_table_schema))) { - if (OB_TIMEOUT != ret) { - LOG_ERROR("get full table_schema failed", KR(ret), "table_id", table_id); - } - } else if (OB_FAIL(try_add_hbase_table_(full_table_schema, table_name, timeout))) { - if (OB_TIMEOUT != ret) { - LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name)); - } - } else { - LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name)); - } - } } return ret; } @@ -1711,17 +1774,6 @@ int ObLogPartMgr::add_user_table_info_(ObDictTenantInfo *tenant_info, ISTAT("insert table_id into cache success", K_(tenant_id), K(table_id), K(database_id), K(tenant_name), K(database_name), K(table_name)); } - - if (OB_SUCC(ret) && TCONF.enable_hbase_mode) { - // add hbase table - if (OB_FAIL(try_add_hbase_table_(table_meta, table_name, timeout))) { - if (OB_TIMEOUT != ret) { - LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name)); - } - } else { - LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name)); - } - } } return ret; } @@ -2721,7 +2773,9 @@ int ObLogPartMgr::try_add_hbase_table_(const uint64_t table_id, int ret = OB_SUCCESS; ObString tb_name_str(table_name); // if table_name contains '$', it may be hbase table - if (NULL != tb_name_str.find('$')) { + if (NULL == tb_name_str.find('$')) { + // table_name_str doesn't contain '$' or maybe empty + } else { if (is_online_refresh_mode(TCTX.refresh_mode_)) { IObLogSchemaGetter *schema_getter = TCTX.schema_getter_; ObLogSchemaGuard schema_guard; @@ -2734,10 +2788,10 @@ int ObLogPartMgr::try_add_hbase_table_(const uint64_t table_id, if (OB_TIMEOUT != ret) { LOG_ERROR("get_schema_guard_and_full_table_schema failed", KR(ret), K(table_id), KPC(full_table_schema)); } - } else if (OB_FAIL(try_add_hbase_table_(full_table_schema, table_name, timeout))) { + } else if (OB_FAIL(try_add_hbase_table_(full_table_schema, timeout))) { LOG_ERROR("inner try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name)); } else { - // succ + LOG_INFO("inner try_add_hbase_table_ success", K(table_id), K(table_name)); } } else { ObDictTenantInfoGuard dict_tenant_info_guard; @@ -2750,38 +2804,46 @@ int ObLogPartMgr::try_add_hbase_table_(const uint64_t table_id, LOG_ERROR("tenant_info is nullptr", K_(tenant_id)); } else if (OB_FAIL(tenant_info->get_table_meta(table_id, table_meta))) { LOG_ERROR("tenant_info get table_meta failed", KR(ret), K_(tenant_id)); - } else if (OB_FAIL(try_add_hbase_table_(table_meta, table_name, timeout))) { + } else if (OB_FAIL(try_add_hbase_table_(table_meta, timeout))) { LOG_ERROR("inner try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name)); } else { - // succ + LOG_INFO("inner try_add_hbase_table_ success", K(table_id), K(table_name)); } } } + return ret; } template int ObLogPartMgr::try_add_hbase_table_(const TABLE_SCHEMA *table_schema, - const char *table_name, const int64_t timeout) { int ret = OB_SUCCESS; - ObString tb_name_str(table_name); - // if table_name contains '$', it may be hbase table - if (NULL != tb_name_str.find('$')) { - uint64_t table_id = OB_INVALID_ID; - if (OB_ISNULL(table_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("table_schema is NULL", KR(ret), K(table_schema)); - } else if (FALSE_IT(table_id = table_schema->get_table_id())) { - } else if (table_schema->is_in_recyclebin()) { - LOG_INFO("table is in recyclebin, no need to add", K(table_id), K(table_name)); - } else if (OB_FAIL(TCTX.hbase_util_.add_hbase_table_id(*table_schema))) { - LOG_ERROR("hbase_util_ add_hbase_table_id", KR(ret), K(table_id), K(table_name)); - } else { - // succ + const char *table_name = table_schema->get_table_name(); + + if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("table_schema is nullptr", KR(ret)); + } else { + ObString tb_name_str(table_name); + // if table_name contains '$', it may be hbase table + if (NULL != tb_name_str.find('$')) { + uint64_t table_id = OB_INVALID_ID; + if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("table_schema is NULL", KR(ret), K(table_schema)); + } else if (FALSE_IT(table_id = table_schema->get_table_id())) { + } else if (table_schema->is_in_recyclebin()) { + LOG_INFO("table is in recyclebin, no need to add", K(table_id), K(table_name)); + } else if (OB_FAIL(TCTX.hbase_util_.add_hbase_table_id(*table_schema))) { + LOG_ERROR("hbase_util_ add_hbase_table_id", KR(ret), K(table_id), K(table_name)); + } else { + LOG_INFO("inner try_add_hbase_table_ success", K(table_id), K(table_name)); + } } } + return ret; } diff --git a/src/logservice/libobcdc/src/ob_log_part_mgr.h b/src/logservice/libobcdc/src/ob_log_part_mgr.h index 9d38d48d6..2208953f7 100644 --- a/src/logservice/libobcdc/src/ob_log_part_mgr.h +++ b/src/logservice/libobcdc/src/ob_log_part_mgr.h @@ -87,6 +87,18 @@ public: const int64_t new_schema_version, const int64_t timeout) = 0; + virtual int add_hbase_table(const uint64_t table_id, + DdlStmtTask &ddl_stmt, + const int64_t new_schema_version, + const int64_t timeout) = 0; + + virtual int add_hbase_table(ObLogSchemaGuard &schema_guard, + const ObSimpleTableSchemaV2 *table_schema, + const int64_t timeout) = 0; + + virtual int add_hbase_table(const datadict::ObDictTableMeta *table_meta, + const int64_t timeout) = 0; + /// Add a global unique index table, create index table scenario /// @note must be called by a single thread in order by Schema version, not concurrently and in random order //// @@ -300,6 +312,15 @@ public: DdlStmtTask &ddl_stmt, const int64_t new_schema_version, const int64_t timeout); + virtual int add_hbase_table(const uint64_t table_id, + DdlStmtTask &ddl_stmt, + const int64_t new_schema_version, + const int64_t timeout); + virtual int add_hbase_table(ObLogSchemaGuard &schema_guard, + const ObSimpleTableSchemaV2 *table_schema, + const int64_t timeout); + virtual int add_hbase_table(const datadict::ObDictTableMeta *table_meta, + const int64_t timeout); virtual int alter_table(const uint64_t table_id, const int64_t schema_version_before_alter, const int64_t schema_version_after_alter, @@ -633,7 +654,6 @@ private: const int64_t timeout); template int try_add_hbase_table_(const TABLE_SCHEMA *table_schema, - const char *table_name, const int64_t timeout); private: