From 1b4b0d1d3ca09ec164731333a40f18ea3b90db33 Mon Sep 17 00:00:00 2001
From: fkuner <784819644@qq.com>
Date: Mon, 1 Apr 2024 07:58:30 +0000
Subject: [PATCH] [CP] [CP] obcdc support hbase table

Let libobcdc recognise hbase-mode tables from data-dictionary metadata
(ObDictTableMeta) as well as from the online ObTableSchema.

* ObLogHbaseUtil::add_hbase_table_id becomes a header-defined template
  over the table schema type, and filter_hbase_mode_table_ gains an
  overload taking datadict::ObDictTableMeta.
* The per-column K/Q/T/V name matching and the final "four columns plus
  bigint T" decision are factored into the private templates
  match_column_name_ and judge_and_add_hbase_table_, shared by both
  filter_hbase_mode_table_ overloads.
* ObLogPartMgr calls try_add_hbase_table_ from add_table and from both
  add_user_table_info_ overloads when TCONF.enable_hbase_mode is set.
  Only tables whose name contains '$' are examined, and tables in the
  recyclebin are skipped.

---
Review notes (ignored by git am):
 * try_add_hbase_table_(table_id, ...) and both filter_hbase_mode_table_
   overloads tested the helper's return value without OB_FAIL, so a
   failure was logged with ret == OB_SUCCESS and never reached the
   caller. The calls are now wrapped in OB_FAIL.
 * The column loop over ObDictTableMeta uses an int64_t index to match
   the type of column_count.
 * Template parameter lists are spelled out on every template
   declaration and definition.
 All changes are same-line edits, so hunk sizes are unchanged.

 .../libobcdc/src/ob_log_hbase_mode.cpp        | 184 ++++++++++++------
 .../libobcdc/src/ob_log_hbase_mode.h          |  42 +++-
 .../libobcdc/src/ob_log_part_mgr.cpp          | 110 +++++++++++
 src/logservice/libobcdc/src/ob_log_part_mgr.h |   8 +
 4 files changed, 281 insertions(+), 63 deletions(-)

diff --git a/src/logservice/libobcdc/src/ob_log_hbase_mode.cpp b/src/logservice/libobcdc/src/ob_log_hbase_mode.cpp
index 0ddbb0d7ed..b27217737a 100644
--- a/src/logservice/libobcdc/src/ob_log_hbase_mode.cpp
+++ b/src/logservice/libobcdc/src/ob_log_hbase_mode.cpp
@@ -64,27 +64,6 @@ void ObLogHbaseUtil::destroy()
   column_id_map_.destroy();
 }
 
-int ObLogHbaseUtil::add_hbase_table_id(const ObTableSchema &table_schema)
-{
-  int ret = OB_SUCCESS;
-
-  bool is_hbase_mode_table = false;
-  const uint64_t table_id = table_schema.get_table_id();
-  const char *table_name = table_schema.get_table_name();
-
-  if (OB_FAIL(filter_hbase_mode_table_(table_schema, is_hbase_mode_table))) {
-    LOG_ERROR("filter_hbase_mode_table_ fail", KR(ret), K(table_id), K(table_name), K(is_hbase_mode_table));
-  } else if (! is_hbase_mode_table) {
-    LOG_INFO("[IS_NOT_HBASE_TABLE]", K(table_name), K(table_id), K(is_hbase_mode_table));
-  } else if (OB_FAIL(table_id_set_.set_refactored(table_id))) {
-    LOG_ERROR("add_table_id into table_id_set_ fail", KR(ret), K(table_name), K(table_id));
-  } else {
-    LOG_INFO("[HBASE] add_table_id into table_id_set_ succ", K(table_name), K(table_id));
-  }
-
-  return ret;
-}
-
 int ObLogHbaseUtil::filter_hbase_mode_table_(const ObTableSchema &table_schema,
     bool &is_hbase_mode_table)
 {
@@ -93,11 +72,11 @@ int ObLogHbaseUtil::filter_hbase_mode_table_(const ObTableSchema &table_schema,
   is_hbase_mode_table = false;
   // Marks the presence or absence of a specified column
   int column_flag[HBASE_TABLE_COLUMN_COUNT];
+  memset(column_flag, '\0', sizeof(column_flag));
   // Mark column T as bigint or not
   bool is_T_column_bigint_type = false;
   // Record T-column id
   uint64_t column_id = OB_INVALID_ID;
-  memset(column_flag, '\0', sizeof(column_flag));
   ObColumnIterByPrevNextID pre_next_id_iter(table_schema);
 
   while (OB_SUCCESS == ret) {
@@ -110,23 +89,9 @@ int ObLogHbaseUtil::filter_hbase_mode_table_(const ObTableSchema &table_schema,
     } else if (OB_ISNULL(column_schema)) {
       LOG_ERROR("column_schema is null", KPC(column_schema));
       ret = OB_ERR_UNEXPECTED;
+    } else if (OB_FAIL(match_column_name_(*column_schema, HBASE_TABLE_COLUMN_COUNT, column_flag, is_T_column_bigint_type, column_id))) {
+      LOG_WARN("match column name failed", KR(ret), K(column_schema));
     } else {
-      const char *column_name = column_schema->get_column_name();
-
-      if (0 == strcmp(column_name, K_COLUMN)) {
-        column_flag[0]++;
-      } else if (0 == strcmp(column_name, Q_COLUMN)) {
-        column_flag[1]++;
-      } else if (0 == strcmp(column_name, T_COLUMN)) {
-        column_flag[2]++;
-
-        if (ObIntType == column_schema->get_data_type()) {
-          is_T_column_bigint_type = true;
-          column_id = column_schema->get_column_id();
-        }
-      } else if (0 == strcmp(column_name, V_COLUMN)) {
-        column_flag[3]++;
-      }
     }
   } // while
 
@@ -135,38 +100,54 @@ int ObLogHbaseUtil::filter_hbase_mode_table_(const ObTableSchema &table_schema,
     ret = OB_SUCCESS;
   }
 
-  int64_t hbase_table_column_cnt = 0;
-  // check contains four columns K, Q, T, V
-  for (int64_t idx=0; idx < HBASE_TABLE_COLUMN_COUNT && OB_SUCC(ret); ++idx) {
-    if (1 == column_flag[idx]) {
-      ++hbase_table_column_cnt;
-    }
+  if (OB_SUCC(ret) && OB_FAIL(judge_and_add_hbase_table_(table_schema, is_T_column_bigint_type, column_id,
+      HBASE_TABLE_COLUMN_COUNT, column_flag, is_hbase_mode_table))) {
+    LOG_WARN("judge hbase table failed", KR(ret), K(table_schema), K(is_hbase_mode_table), K(is_T_column_bigint_type),
+        K(column_id), K(column_flag));
   }
 
-  if (OB_SUCC(ret)) {
-    if ((HBASE_TABLE_COLUMN_COUNT == hbase_table_column_cnt)
-        && is_T_column_bigint_type) {
-      is_hbase_mode_table = true;
+  return ret;
+}
 
-      TableID table_key(table_schema.get_table_id());
-      if (OB_UNLIKELY(OB_INVALID_ID == column_id)) {
-        LOG_ERROR("column_id is not valid", K(column_id));
-        ret = OB_ERR_UNEXPECTED;
-      } else if (OB_FAIL(column_id_map_.insert(table_key, column_id))) {
-        LOG_ERROR("column_id_map_ insert fail", KR(ret), K(table_key), K(column_id));
+int ObLogHbaseUtil::filter_hbase_mode_table_(const ObDictTableMeta &table_meta,
+    bool &is_hbase_mode_table)
+{
+  int ret = OB_SUCCESS;
+
+  is_hbase_mode_table = false;
+  // Marks the presence or absence of a specified column
+  int column_flag[HBASE_TABLE_COLUMN_COUNT];
+  memset(column_flag, '\0', sizeof(column_flag));
+  // Mark column T as bigint or not
+  bool is_T_column_bigint_type = false;
+  // Record T-column id
+  uint64_t column_id = OB_INVALID_ID;
+
+  const int64_t column_count = table_meta.get_column_count();
+  const datadict::ObDictColumnMeta *col_metas = table_meta.get_column_metas();
+  if (column_count <= 0) {
+    LOG_TRACE("table don't have columns, skip", K(table_meta));
+  } else if (OB_ISNULL(col_metas)) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("col_metas is nullptr", KR(ret), K(col_metas), K(table_meta));
+  } else {
+    for (int64_t idx = 0; OB_SUCC(ret) && idx < column_count; idx++) {
+      const ObDictColumnMeta *col_meta = col_metas + idx;
+      if (OB_ISNULL(col_meta)) {
+        ret = OB_INVALID_DATA;
+        LOG_WARN("unexpected invalid ObDictColumnMeta", KR(ret), K(col_meta), K(table_meta));
+      } else if (OB_FAIL(match_column_name_(*col_meta, HBASE_TABLE_COLUMN_COUNT, column_flag, is_T_column_bigint_type, column_id))) {
+        LOG_WARN("match column name failed", KR(ret), K(col_meta));
       } else {
-        // succ
       }
-    } else {
-      is_hbase_mode_table = false;
     }
   }
 
-  LOG_INFO("[HBASE] table info", "table_id", table_schema.get_table_id(),
-      "table_name", table_schema.get_table_name(),
-      K(hbase_table_column_cnt),
-      K(column_id), K(is_T_column_bigint_type),
-      K(is_hbase_mode_table));
+  if (OB_SUCC(ret) && OB_FAIL(judge_and_add_hbase_table_(table_meta, is_T_column_bigint_type, column_id,
+      HBASE_TABLE_COLUMN_COUNT, column_flag, is_hbase_mode_table))) {
+    LOG_WARN("judge hbase table failed", KR(ret), K(table_meta), K(is_T_column_bigint_type),
+        K(column_id), K(column_flag), K(is_hbase_mode_table));
+  }
 
   return ret;
 }
@@ -228,5 +209,84 @@ int ObLogHbaseUtil::is_hbase_table(const uint64_t table_id,
   return ret;
 }
 
+template<class COLUMN_SCHEMA>
+int ObLogHbaseUtil::match_column_name_(const COLUMN_SCHEMA &col_schema,
+    const int column_flag_size,
+    int *column_flag,
+    bool &is_T_column_bigint_type,
+    uint64_t &column_id)
+{
+  int ret = OB_SUCCESS;
+  const char *column_name = col_schema.get_column_name();
+
+  if (HBASE_TABLE_COLUMN_COUNT > column_flag_size) {
+    ret = OB_INVALID_DATA;
+    LOG_WARN("column_flag is invalid", KR(ret), K(column_flag));
+  } else if (0 == strcmp(column_name, K_COLUMN)) {
+    column_flag[0]++;
+  } else if (0 == strcmp(column_name, Q_COLUMN)) {
+    column_flag[1]++;
+  } else if (0 == strcmp(column_name, T_COLUMN)) {
+    column_flag[2]++;
+    if (ObIntType == col_schema.get_data_type()) {
+      is_T_column_bigint_type = true;
+      column_id = col_schema.get_column_id();
+    }
+  } else if (0 == strcmp(column_name, V_COLUMN)) {
+    column_flag[3]++;
+  }
+
+  return ret;
+}
+
+template<class TABLE_SCHEMA>
+int ObLogHbaseUtil::judge_and_add_hbase_table_(const TABLE_SCHEMA &table_schema,
+    const bool is_T_column_bigint_type,
+    const uint64_t column_id,
+    const int column_flag_size,
+    const int *column_flag,
+    bool &is_hbase_mode_table)
+{
+  int ret = OB_SUCCESS;
+  int64_t hbase_table_column_cnt = 0;
+
+  if (HBASE_TABLE_COLUMN_COUNT > column_flag_size) {
+    ret = OB_INVALID_DATA;
+    LOG_WARN("column_flag is invalid ", KR(ret), K(column_flag));
+  } else {
+    // check contains four columns K, Q, T, V
+    for (int64_t idx = 0; idx < column_flag_size; ++idx) {
+      if (1 == column_flag[idx]) {
+        ++hbase_table_column_cnt;
+      }
+    }
+
+    if ((HBASE_TABLE_COLUMN_COUNT == hbase_table_column_cnt)
+        && is_T_column_bigint_type) {
+      is_hbase_mode_table = true;
+
+      TableID table_key(table_schema.get_table_id());
+      if (OB_UNLIKELY(OB_INVALID_ID == column_id)) {
+        LOG_ERROR("column_id is not valid", K(column_id));
+        ret = OB_ERR_UNEXPECTED;
+      } else if (OB_FAIL(column_id_map_.insert(table_key, column_id))) {
+        LOG_ERROR("column_id_map_ insert fail", KR(ret), K(table_key), K(column_id));
+      } else {
+        // succ
+      }
+    } else {
+      is_hbase_mode_table = false;
+    }
+
+    LOG_INFO("[HBASE] table info", "table_id", table_schema.get_table_id(),
+        "table_name", table_schema.get_table_name(),
+        K(hbase_table_column_cnt),
+        K(column_id), K(is_T_column_bigint_type),
+        K(is_hbase_mode_table));
+  }
+
+  return ret;
+}
+
 }
 }
diff --git a/src/logservice/libobcdc/src/ob_log_hbase_mode.h b/src/logservice/libobcdc/src/ob_log_hbase_mode.h
index b6ab17f4bb..f65db51b5f 100644
--- a/src/logservice/libobcdc/src/ob_log_hbase_mode.h
+++ b/src/logservice/libobcdc/src/ob_log_hbase_mode.h
@@ -20,6 +20,10 @@
 
 namespace oceanbase
 {
+namespace datadict
+{
+class ObDictTableMeta;
+}
 namespace share
 {
 namespace schema
@@ -46,7 +50,27 @@ public:
   // 2. contains four columns K, Q, T, V
   // 3. T is of type bigint
   // Note: All of the above conditions are not necessarily met for an hbase table
-  int add_hbase_table_id(const oceanbase::share::schema::ObTableSchema &table_schema);
+  template<class TABLE_SCHEMA>
+  int add_hbase_table_id(const TABLE_SCHEMA &table_schema)
+  {
+    int ret = OB_SUCCESS;
+
+    bool is_hbase_mode_table = false;
+    const uint64_t table_id = table_schema.get_table_id();
+    const char *table_name = table_schema.get_table_name();
+
+    if (OB_FAIL(filter_hbase_mode_table_(table_schema, is_hbase_mode_table))) {
+      OBLOG_LOG(ERROR, "filter_hbase_mode_table_ fail", KR(ret), K(table_id), K(table_name), K(is_hbase_mode_table));
+    } else if (! is_hbase_mode_table) {
+      OBLOG_LOG(INFO, "[IS_NOT_HBASE_TABLE]", K(table_name), K(table_id), K(is_hbase_mode_table));
+    } else if (OB_FAIL(table_id_set_.set_refactored(table_id))) {
+      OBLOG_LOG(ERROR, "add_table_id into table_id_set_ fail", KR(ret), K(table_name), K(table_id));
+    } else {
+      OBLOG_LOG(INFO, "[HBASE] add_table_id into table_id_set_ succ", K(table_name), K(table_id));
+    }
+
+    return ret;
+  }
 
   // Determine if conversion is required
   // table exists and is a T column
@@ -108,6 +132,22 @@ private:
 private:
   int filter_hbase_mode_table_(const oceanbase::share::schema::ObTableSchema &table_schema,
       bool &is_hbase_mode_table);
+  int filter_hbase_mode_table_(const oceanbase::datadict::ObDictTableMeta &table_meta,
+      bool &is_hbase_mode_table);
+  template<class COLUMN_SCHEMA>
+  int match_column_name_(const COLUMN_SCHEMA &col_schema,
+      const int column_flag_size,
+      int *column_flag,
+      bool &is_T_column_bigint_type,
+      uint64_t &column_id);
+
+  template<class TABLE_SCHEMA>
+  int judge_and_add_hbase_table_(const TABLE_SCHEMA &table_schema,
+      const bool is_T_column_bigint_type,
+      const uint64_t column_id,
+      const int column_flag_size,
+      const int *column_flag,
+      bool &is_hbase_mode_table);
 
 private:
   bool        inited_;
diff --git a/src/logservice/libobcdc/src/ob_log_part_mgr.cpp b/src/logservice/libobcdc/src/ob_log_part_mgr.cpp
index c53fac77f3..002483d385 100644
--- a/src/logservice/libobcdc/src/ob_log_part_mgr.cpp
+++ b/src/logservice/libobcdc/src/ob_log_part_mgr.cpp
@@ -385,6 +385,17 @@ int ObLogPartMgr::add_table(const uint64_t table_id,
       ISTAT("set tic update info success", K(new_schema_version), K(tic_update_info),
           K(tenant_name), K(database_name), K(table_name));
     }
+
+    if (OB_SUCC(ret) && TCONF.enable_hbase_mode) {
+      // add hbase table
+      if (OB_FAIL(try_add_hbase_table_(table_id, table_name, new_schema_version, timeout))) {
+        if (OB_TIMEOUT != ret) {
+          LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name), K(new_schema_version));
+        }
+      } else {
+        LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name), K(new_schema_version));
+      }
+    }
   }
   return ret;
 }
@@ -1607,6 +1618,22 @@ int ObLogPartMgr::add_user_table_info_(ObLogSchemaGuard &schema_guard,
       ISTAT("insert table_id into cache success", K_(tenant_id), K(table_id), K(database_id),
           K(tenant_name), K(database_name), K(table_name));
     }
+
+    if (OB_SUCC(ret) && TCONF.enable_hbase_mode) {
+      // add hbase table
+      const ObTableSchema *full_table_schema = NULL;
+      if (OB_FAIL(get_full_table_schema_(table_id, timeout, schema_guard, full_table_schema))) {
+        if (OB_TIMEOUT != ret) {
+          LOG_ERROR("get full table_schema failed", KR(ret), "table_id", table_id);
+        }
+      } else if (OB_FAIL(try_add_hbase_table_(full_table_schema, table_name, timeout))) {
+        if (OB_TIMEOUT != ret) {
+          LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
+        }
+      } else {
+        LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name));
+      }
+    }
   }
   return ret;
 }
@@ -1644,6 +1671,17 @@ int ObLogPartMgr::add_user_table_info_(ObDictTenantInfo *tenant_info,
       ISTAT("insert table_id into cache success", K_(tenant_id), K(table_id), K(database_id),
           K(tenant_name), K(database_name), K(table_name));
     }
+
+    if (OB_SUCC(ret) && TCONF.enable_hbase_mode) {
+      // add hbase table
+      if (OB_FAIL(try_add_hbase_table_(table_meta, table_name, timeout))) {
+        if (OB_TIMEOUT != ret) {
+          LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
+        }
+      } else {
+        LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name));
+      }
+    }
   }
   return ret;
 }
@@ -2597,6 +2635,78 @@ int ObLogPartMgr::inner_get_table_info_of_table_meta_(ObDictTenantInfo *tenant_i
   return ret;
 }
 
+int ObLogPartMgr::try_add_hbase_table_(const uint64_t table_id,
+    const char *table_name,
+    const int64_t schema_version,
+    const int64_t timeout)
+{
+  int ret = OB_SUCCESS;
+  ObString tb_name_str(table_name);
+  // if table_name contains '$', it may be hbase table
+  if (NULL != tb_name_str.find('$')) {
+    if (is_online_refresh_mode(TCTX.refresh_mode_)) {
+      IObLogSchemaGetter *schema_getter = TCTX.schema_getter_;
+      ObLogSchemaGuard schema_guard;
+      const ObTableSchema *full_table_schema = NULL;
+      if (OB_ISNULL(schema_getter)) {
+        ret = OB_ERR_UNEXPECTED;
+        LOG_ERROR("schema_getter is NULL", KR(ret), K(schema_getter));
+      } else if (OB_FAIL(schema_getter->get_schema_guard_and_full_table_schema(
+          tenant_id_, table_id, schema_version, timeout, schema_guard, full_table_schema))) {
+        if (OB_TIMEOUT != ret) {
+          LOG_ERROR("get_schema_guard_and_full_table_schema failed", KR(ret), K(table_id), KPC(full_table_schema));
+        }
+      } else if (OB_FAIL(try_add_hbase_table_(full_table_schema, table_name, timeout))) {
+        LOG_ERROR("inner try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
+      } else {
+        // succ
+      }
+    } else {
+      ObDictTenantInfoGuard dict_tenant_info_guard;
+      ObDictTenantInfo *tenant_info = nullptr;
+      datadict::ObDictTableMeta *table_meta = nullptr;
+      if (OB_FAIL(GLOGMETADATASERVICE.get_tenant_info_guard(tenant_id_, dict_tenant_info_guard))) {
+        LOG_ERROR("get tenant_info_guard failed", KR(ret), K_(tenant_id));
+      } else if (OB_ISNULL(tenant_info = dict_tenant_info_guard.get_tenant_info())) {
+        ret = OB_ERR_UNEXPECTED;
+        LOG_ERROR("tenant_info is nullptr", K_(tenant_id));
+      } else if (OB_FAIL(tenant_info->get_table_meta(table_id, table_meta))) {
+        LOG_ERROR("tenant_info get table_meta failed", KR(ret), K_(tenant_id));
+      } else if (OB_FAIL(try_add_hbase_table_(table_meta, table_name, timeout))) {
+        LOG_ERROR("inner try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
+      } else {
+        // succ
+      }
+    }
+  }
+  return ret;
+}
+
+template<class TABLE_SCHEMA>
+int ObLogPartMgr::try_add_hbase_table_(const TABLE_SCHEMA *table_schema,
+    const char *table_name,
+    const int64_t timeout)
+{
+  int ret = OB_SUCCESS;
+  ObString tb_name_str(table_name);
+  // if table_name contains '$', it may be hbase table
+  if (NULL != tb_name_str.find('$')) {
+    uint64_t table_id = OB_INVALID_ID;
+    if (OB_ISNULL(table_schema)) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_ERROR("table_schema is NULL", KR(ret), K(table_schema));
+    } else if (FALSE_IT(table_id = table_schema->get_table_id())) {
+    } else if (table_schema->is_in_recyclebin()) {
+      LOG_INFO("table is in recyclebin, no need to add", K(table_id), K(table_name));
+    } else if (OB_FAIL(TCTX.hbase_util_.add_hbase_table_id(*table_schema))) {
+      LOG_ERROR("hbase_util_ add_hbase_table_id", KR(ret), K(table_id), K(table_name));
+    } else {
+      // succ
+    }
+  }
+  return ret;
+}
+
 }
 }
 #undef _STAT
diff --git a/src/logservice/libobcdc/src/ob_log_part_mgr.h b/src/logservice/libobcdc/src/ob_log_part_mgr.h
index 53b2e95578..ec4ed278e5 100644
--- a/src/logservice/libobcdc/src/ob_log_part_mgr.h
+++ b/src/logservice/libobcdc/src/ob_log_part_mgr.h
@@ -625,6 +625,14 @@ private:
       const char *&table_name,
       uint64_t &database_id,
       bool &is_user_table);
+  int try_add_hbase_table_(const uint64_t table_id,
+      const char *table_name,
+      const int64_t schema_version,
+      const int64_t timeout);
+  template<class TABLE_SCHEMA>
+  int try_add_hbase_table_(const TABLE_SCHEMA *table_schema,
+      const char *table_name,
+      const int64_t timeout);
 
 private:
   ObLogTenant &host_;