[OBCDC] fix the problem which hbase relies the white and black list

This commit is contained in:
fkuner 2024-11-12 12:16:13 +00:00 committed by ob-robot
parent 5fe3def070
commit 920dc70c0c
3 changed files with 133 additions and 51 deletions

View File

@ -359,7 +359,7 @@ int ObLogDDLProcessor::handle_tenant_ddl_task_(
mark_stmt_binlog_record_invalid_(*ddl_stmt);
} else {
// statements are not filtered, processing DDL statements
if (enable_white_black_list_ && need_update_tic && OB_FAIL(handle_ddl_stmt_update_tic_(tenant, task,
if ((enable_white_black_list_ || TCONF.enable_hbase_mode) && need_update_tic && OB_FAIL(handle_ddl_stmt_update_tic_(tenant, task,
*ddl_stmt, old_schema_version, new_schema_version, stop_flag))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("handle_ddl_stmt_update_tic_ fail", KR(ret), K(tenant), K(task), K(ddl_stmt),

View File

@ -171,7 +171,13 @@ int ObLogPartMgr::add_all_user_tablets_and_tables_info(const int64_t timeout)
if (OB_SUCC(ret) && enable_white_black_list_ && OB_FAIL(add_user_table_info_(schema_guard,
table_schema, timeout))) {
LOG_ERROR("add_user_table_info failed", KR(ret), K_(tenant_id), KPC(table_schema));
if (OB_TIMEOUT != ret) {
LOG_ERROR("add_user_table_info failed", KR(ret), K_(tenant_id), KPC(table_schema));
}
} else if (TCONF.enable_hbase_mode && OB_FAIL(add_hbase_table(schema_guard, table_schema, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_WARN("add_hbase_table fail", KR(ret));
}
}
}
@ -212,7 +218,13 @@ int ObLogPartMgr::add_all_user_tablets_and_tables_info(
if (OB_SUCC(ret) && enable_white_black_list_ && OB_FAIL(add_user_table_info_(tenant_info,
table_meta, timeout))) {
LOG_ERROR("add_user_table_info failed", KR(ret), K_(tenant_id), KPC(table_meta));
if (OB_TIMEOUT != ret) {
LOG_ERROR("add_user_table_info failed", KR(ret), K_(tenant_id), KPC(table_meta));
}
} else if (TCONF.enable_hbase_mode && OB_FAIL(add_hbase_table(table_meta, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_WARN("add_hbase_table fail", KR(ret), K(table_meta));
}
}
}
@ -402,6 +414,73 @@ int ObLogPartMgr::add_table(const uint64_t table_id,
return ret;
}
int ObLogPartMgr::add_hbase_table(const uint64_t table_id,
DdlStmtTask &ddl_stmt,
const int64_t new_schema_version,
const int64_t timeout)
{
int ret = OB_SUCCESS;
const char *tenant_name = nullptr;
const char *database_name = nullptr;
const char *table_name = nullptr;
if (OB_FAIL(get_schema_info_of_table_id_(table_id, new_schema_version, tenant_name,
database_name, table_name, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_ERROR("get_schema_info_of_table_id_ failed", KR(ret), K(table_id), K(new_schema_version));
}
} else if (OB_FAIL(try_add_hbase_table_(table_id, table_name, new_schema_version, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name), K(new_schema_version));
}
} else {
LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name), K(new_schema_version));
}
return ret;
}
int ObLogPartMgr::add_hbase_table(ObLogSchemaGuard &schema_guard,
const ObSimpleTableSchemaV2 *table_schema,
const int64_t timeout)
{
int ret = OB_SUCCESS;
const uint64_t table_id = table_schema->get_table_id();
const char *table_name = table_schema->get_table_name();
const ObTableSchema *full_table_schema = NULL;
if (OB_FAIL(get_full_table_schema_(table_id, timeout, schema_guard, full_table_schema))) {
if (OB_TIMEOUT != ret) {
LOG_ERROR("get full table_schema failed", KR(ret), K(table_id));
}
} else if (OB_FAIL(try_add_hbase_table_(full_table_schema, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
}
} else {
LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name));
}
return ret;
}
int ObLogPartMgr::add_hbase_table(const datadict::ObDictTableMeta *table_meta,
const int64_t timeout)
{
int ret = OB_SUCCESS;
const char *table_name = table_meta->get_table_name();
if (OB_FAIL(try_add_hbase_table_(table_meta, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_name));
}
} else {
LOG_INFO("try_add_hbase_table_ success", K(table_name));
}
return ret;
}
int ObLogPartMgr::try_get_offline_ddl_origin_table_schema_(const ObSimpleTableSchemaV2 &table_schema,
ObLogSchemaGuard &schema_guard,
const int64_t timeout,
@ -1658,22 +1737,6 @@ int ObLogPartMgr::add_user_table_info_(ObLogSchemaGuard &schema_guard,
ISTAT("insert table_id into cache success", K_(tenant_id), K(table_id),
K(database_id), K(tenant_name), K(database_name), K(table_name));
}
if (OB_SUCC(ret) && TCONF.enable_hbase_mode) {
// add hbase table
const ObTableSchema *full_table_schema = NULL;
if (OB_FAIL(get_full_table_schema_(table_id, timeout, schema_guard, full_table_schema))) {
if (OB_TIMEOUT != ret) {
LOG_ERROR("get full table_schema failed", KR(ret), "table_id", table_id);
}
} else if (OB_FAIL(try_add_hbase_table_(full_table_schema, table_name, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
}
} else {
LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name));
}
}
}
return ret;
}
@ -1711,17 +1774,6 @@ int ObLogPartMgr::add_user_table_info_(ObDictTenantInfo *tenant_info,
ISTAT("insert table_id into cache success", K_(tenant_id), K(table_id),
K(database_id), K(tenant_name), K(database_name), K(table_name));
}
if (OB_SUCC(ret) && TCONF.enable_hbase_mode) {
// add hbase table
if (OB_FAIL(try_add_hbase_table_(table_meta, table_name, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
}
} else {
LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name));
}
}
}
return ret;
}
@ -2721,7 +2773,9 @@ int ObLogPartMgr::try_add_hbase_table_(const uint64_t table_id,
int ret = OB_SUCCESS;
ObString tb_name_str(table_name);
// if table_name contains '$', it may be hbase table
if (NULL != tb_name_str.find('$')) {
if (NULL == tb_name_str.find('$')) {
// table_name_str doesn't contain '$' or maybe empty
} else {
if (is_online_refresh_mode(TCTX.refresh_mode_)) {
IObLogSchemaGetter *schema_getter = TCTX.schema_getter_;
ObLogSchemaGuard schema_guard;
@ -2734,10 +2788,10 @@ int ObLogPartMgr::try_add_hbase_table_(const uint64_t table_id,
if (OB_TIMEOUT != ret) {
LOG_ERROR("get_schema_guard_and_full_table_schema failed", KR(ret), K(table_id), KPC(full_table_schema));
}
} else if (OB_FAIL(try_add_hbase_table_(full_table_schema, table_name, timeout))) {
} else if (OB_FAIL(try_add_hbase_table_(full_table_schema, timeout))) {
LOG_ERROR("inner try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
} else {
// succ
LOG_INFO("inner try_add_hbase_table_ success", K(table_id), K(table_name));
}
} else {
ObDictTenantInfoGuard dict_tenant_info_guard;
@ -2750,38 +2804,46 @@ int ObLogPartMgr::try_add_hbase_table_(const uint64_t table_id,
LOG_ERROR("tenant_info is nullptr", K_(tenant_id));
} else if (OB_FAIL(tenant_info->get_table_meta(table_id, table_meta))) {
LOG_ERROR("tenant_info get table_meta failed", KR(ret), K_(tenant_id));
} else if (OB_FAIL(try_add_hbase_table_(table_meta, table_name, timeout))) {
} else if (OB_FAIL(try_add_hbase_table_(table_meta, timeout))) {
LOG_ERROR("inner try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
} else {
// succ
LOG_INFO("inner try_add_hbase_table_ success", K(table_id), K(table_name));
}
}
}
return ret;
}
template<class TABLE_SCHEMA>
int ObLogPartMgr::try_add_hbase_table_(const TABLE_SCHEMA *table_schema,
const char *table_name,
const int64_t timeout)
{
int ret = OB_SUCCESS;
ObString tb_name_str(table_name);
// if table_name contains '$', it may be hbase table
if (NULL != tb_name_str.find('$')) {
uint64_t table_id = OB_INVALID_ID;
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("table_schema is NULL", KR(ret), K(table_schema));
} else if (FALSE_IT(table_id = table_schema->get_table_id())) {
} else if (table_schema->is_in_recyclebin()) {
LOG_INFO("table is in recyclebin, no need to add", K(table_id), K(table_name));
} else if (OB_FAIL(TCTX.hbase_util_.add_hbase_table_id(*table_schema))) {
LOG_ERROR("hbase_util_ add_hbase_table_id", KR(ret), K(table_id), K(table_name));
} else {
// succ
const char *table_name = table_schema->get_table_name();
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("table_schema is nullptr", KR(ret));
} else {
ObString tb_name_str(table_name);
// if table_name contains '$', it may be hbase table
if (NULL != tb_name_str.find('$')) {
uint64_t table_id = OB_INVALID_ID;
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("table_schema is NULL", KR(ret), K(table_schema));
} else if (FALSE_IT(table_id = table_schema->get_table_id())) {
} else if (table_schema->is_in_recyclebin()) {
LOG_INFO("table is in recyclebin, no need to add", K(table_id), K(table_name));
} else if (OB_FAIL(TCTX.hbase_util_.add_hbase_table_id(*table_schema))) {
LOG_ERROR("hbase_util_ add_hbase_table_id", KR(ret), K(table_id), K(table_name));
} else {
LOG_INFO("inner try_add_hbase_table_ success", K(table_id), K(table_name));
}
}
}
return ret;
}

View File

@ -87,6 +87,18 @@ public:
const int64_t new_schema_version,
const int64_t timeout) = 0;
virtual int add_hbase_table(const uint64_t table_id,
DdlStmtTask &ddl_stmt,
const int64_t new_schema_version,
const int64_t timeout) = 0;
virtual int add_hbase_table(ObLogSchemaGuard &schema_guard,
const ObSimpleTableSchemaV2 *table_schema,
const int64_t timeout) = 0;
virtual int add_hbase_table(const datadict::ObDictTableMeta *table_meta,
const int64_t timeout) = 0;
/// Add a global unique index table, create index table scenario
/// @note must be called by a single thread in order by Schema version, not concurrently and in random order
////
@ -300,6 +312,15 @@ public:
DdlStmtTask &ddl_stmt,
const int64_t new_schema_version,
const int64_t timeout);
virtual int add_hbase_table(const uint64_t table_id,
DdlStmtTask &ddl_stmt,
const int64_t new_schema_version,
const int64_t timeout);
virtual int add_hbase_table(ObLogSchemaGuard &schema_guard,
const ObSimpleTableSchemaV2 *table_schema,
const int64_t timeout);
virtual int add_hbase_table(const datadict::ObDictTableMeta *table_meta,
const int64_t timeout);
virtual int alter_table(const uint64_t table_id,
const int64_t schema_version_before_alter,
const int64_t schema_version_after_alter,
@ -633,7 +654,6 @@ private:
const int64_t timeout);
template<class TABLE_SCHEMA>
int try_add_hbase_table_(const TABLE_SCHEMA *table_schema,
const char *table_name,
const int64_t timeout);
private: