[CP] [CP] obcdc support hbase table

This commit is contained in:
fkuner
2024-04-01 07:58:30 +00:00
committed by ob-robot
parent 29c5f9b4a7
commit 1b4b0d1d3c
4 changed files with 281 additions and 63 deletions

View File

@ -64,27 +64,6 @@ void ObLogHbaseUtil::destroy()
column_id_map_.destroy();
}
int ObLogHbaseUtil::add_hbase_table_id(const ObTableSchema &table_schema)
{
int ret = OB_SUCCESS;
bool is_hbase_mode_table = false;
const uint64_t table_id = table_schema.get_table_id();
const char *table_name = table_schema.get_table_name();
if (OB_FAIL(filter_hbase_mode_table_(table_schema, is_hbase_mode_table))) {
LOG_ERROR("filter_hbase_mode_table_ fail", KR(ret), K(table_id), K(table_name), K(is_hbase_mode_table));
} else if (! is_hbase_mode_table) {
LOG_INFO("[IS_NOT_HBASE_TABLE]", K(table_name), K(table_id), K(is_hbase_mode_table));
} else if (OB_FAIL(table_id_set_.set_refactored(table_id))) {
LOG_ERROR("add_table_id into table_id_set_ fail", KR(ret), K(table_name), K(table_id));
} else {
LOG_INFO("[HBASE] add_table_id into table_id_set_ succ", K(table_name), K(table_id));
}
return ret;
}
int ObLogHbaseUtil::filter_hbase_mode_table_(const ObTableSchema &table_schema,
bool &is_hbase_mode_table)
{
@ -93,11 +72,11 @@ int ObLogHbaseUtil::filter_hbase_mode_table_(const ObTableSchema &table_schema,
is_hbase_mode_table = false;
// Marks the presence or absence of a specified column
int column_flag[HBASE_TABLE_COLUMN_COUNT];
memset(column_flag, '\0', sizeof(column_flag));
// Mark column T as bigint or not
bool is_T_column_bigint_type = false;
// Record T-column id
uint64_t column_id = OB_INVALID_ID;
memset(column_flag, '\0', sizeof(column_flag));
ObColumnIterByPrevNextID pre_next_id_iter(table_schema);
while (OB_SUCCESS == ret) {
@ -110,23 +89,9 @@ int ObLogHbaseUtil::filter_hbase_mode_table_(const ObTableSchema &table_schema,
} else if (OB_ISNULL(column_schema)) {
LOG_ERROR("column_schema is null", KPC(column_schema));
ret = OB_ERR_UNEXPECTED;
} else if (match_column_name_(*column_schema, HBASE_TABLE_COLUMN_COUNT, column_flag, is_T_column_bigint_type, column_id)) {
LOG_WARN("match column name failed", KR(ret), K(column_schema));
} else {
const char *column_name = column_schema->get_column_name();
if (0 == strcmp(column_name, K_COLUMN)) {
column_flag[0]++;
} else if (0 == strcmp(column_name, Q_COLUMN)) {
column_flag[1]++;
} else if (0 == strcmp(column_name, T_COLUMN)) {
column_flag[2]++;
if (ObIntType == column_schema->get_data_type()) {
is_T_column_bigint_type = true;
column_id = column_schema->get_column_id();
}
} else if (0 == strcmp(column_name, V_COLUMN)) {
column_flag[3]++;
}
}
} // while
@ -135,38 +100,54 @@ int ObLogHbaseUtil::filter_hbase_mode_table_(const ObTableSchema &table_schema,
ret = OB_SUCCESS;
}
int64_t hbase_table_column_cnt = 0;
// check contains four columns K, Q, T, V
for (int64_t idx=0; idx < HBASE_TABLE_COLUMN_COUNT && OB_SUCC(ret); ++idx) {
if (1 == column_flag[idx]) {
++hbase_table_column_cnt;
}
if (OB_SUCC(ret) && OB_FAIL(judge_and_add_hbase_table_(table_schema, is_T_column_bigint_type, column_id,
HBASE_TABLE_COLUMN_COUNT, column_flag, is_hbase_mode_table))) {
LOG_WARN("judge hbase table failed", KR(ret), K(table_schema), K(is_hbase_mode_table), K(is_T_column_bigint_type),
K(column_id), K(column_flag));
}
if (OB_SUCC(ret)) {
if ((HBASE_TABLE_COLUMN_COUNT == hbase_table_column_cnt)
&& is_T_column_bigint_type) {
is_hbase_mode_table = true;
return ret;
}
int ObLogHbaseUtil::filter_hbase_mode_table_(const ObDictTableMeta &table_meta,
bool &is_hbase_mode_table)
{
int ret = OB_SUCCESS;
TableID table_key(table_schema.get_table_id());
if (OB_UNLIKELY(OB_INVALID_ID == column_id)) {
LOG_ERROR("column_id is not valid", K(column_id));
ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(column_id_map_.insert(table_key, column_id))) {
LOG_ERROR("column_id_map_ insert fail", KR(ret), K(table_key), K(column_id));
} else {
// succ
}
} else {
is_hbase_mode_table = false;
// Marks the presence or absence of a specified column
int column_flag[HBASE_TABLE_COLUMN_COUNT];
memset(column_flag, '\0', sizeof(column_flag));
// Mark column T as bigint or not
bool is_T_column_bigint_type = false;
// Record T-column id
uint64_t column_id = OB_INVALID_ID;
const int64_t column_count = table_meta.get_column_count();
const datadict::ObDictColumnMeta *col_metas = table_meta.get_column_metas();
if (column_count <= 0) {
LOG_TRACE("table don't have columns, skip", K(table_meta));
} else if (OB_ISNULL(col_metas)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("col_metas is nullptr", KR(ret), K(col_metas), K(table_meta));
} else {
for (int idx = 0; OB_SUCC(ret) && idx < column_count; idx++) {
const ObDictColumnMeta *col_meta = col_metas + idx;
if (OB_ISNULL(col_meta)) {
ret = OB_INVALID_DATA;
LOG_WARN("unexpected invalid ObDictColumnMeta", KR(ret), K(col_meta), K(table_meta));
} else if (match_column_name_(*col_meta, HBASE_TABLE_COLUMN_COUNT, column_flag, is_T_column_bigint_type, column_id)) {
LOG_WARN("match column name failed", KR(ret), K(col_meta));
} else {
}
}
}
LOG_INFO("[HBASE] table info", "table_id", table_schema.get_table_id(),
"table_name", table_schema.get_table_name(),
K(hbase_table_column_cnt),
K(column_id), K(is_T_column_bigint_type),
K(is_hbase_mode_table));
if (OB_SUCC(ret) && OB_FAIL(judge_and_add_hbase_table_(table_meta, is_T_column_bigint_type, column_id,
HBASE_TABLE_COLUMN_COUNT, column_flag, is_hbase_mode_table))) {
LOG_WARN("judge hbase table failed", KR(ret), K(table_meta), K(is_T_column_bigint_type),
K(column_id), K(column_flag), K(is_hbase_mode_table));
}
return ret;
}
@ -228,5 +209,84 @@ int ObLogHbaseUtil::is_hbase_table(const uint64_t table_id,
return ret;
}
template <class COLUMN_SCHEMA>
int ObLogHbaseUtil::match_column_name_(const COLUMN_SCHEMA &col_schema,
const int column_flag_size,
int *column_flag,
bool &is_T_column_bigint_type,
uint64_t &column_id)
{
int ret = OB_SUCCESS;
const char *column_name = col_schema.get_column_name();
if (HBASE_TABLE_COLUMN_COUNT > column_flag_size) {
ret = OB_INVALID_DATA;
LOG_WARN("column_flag is invalid", KR(ret), K(column_flag));
} else if (0 == strcmp(column_name, K_COLUMN)) {
column_flag[0]++;
} else if (0 == strcmp(column_name, Q_COLUMN)) {
column_flag[1]++;
} else if (0 == strcmp(column_name, T_COLUMN)) {
column_flag[2]++;
if (ObIntType == col_schema.get_data_type()) {
is_T_column_bigint_type = true;
column_id = col_schema.get_column_id();
}
} else if (0 == strcmp(column_name, V_COLUMN)) {
column_flag[3]++;
}
return ret;
}
template<class TABLE_SCHEMA>
int ObLogHbaseUtil::judge_and_add_hbase_table_(const TABLE_SCHEMA &table_schema,
const bool is_T_column_bigint_type,
const uint64_t column_id,
const int column_flag_size,
const int *column_flag,
bool &is_hbase_mode_table)
{
int ret = OB_SUCCESS;
int64_t hbase_table_column_cnt = 0;
if (HBASE_TABLE_COLUMN_COUNT > column_flag_size) {
ret = OB_INVALID_DATA;
LOG_WARN("column_flag is invalid ", KR(ret), K(column_flag));
} else {
// check contains four columns K, Q, T, V
for (int64_t idx = 0; idx < column_flag_size; ++idx) {
if (1 == column_flag[idx]) {
++hbase_table_column_cnt;
}
}
if ((HBASE_TABLE_COLUMN_COUNT == hbase_table_column_cnt)
&& is_T_column_bigint_type) {
is_hbase_mode_table = true;
TableID table_key(table_schema.get_table_id());
if (OB_UNLIKELY(OB_INVALID_ID == column_id)) {
LOG_ERROR("column_id is not valid", K(column_id));
ret = OB_ERR_UNEXPECTED;
} else if (OB_FAIL(column_id_map_.insert(table_key, column_id))) {
LOG_ERROR("column_id_map_ insert fail", KR(ret), K(table_key), K(column_id));
} else {
// succ
}
} else {
is_hbase_mode_table = false;
}
LOG_INFO("[HBASE] table info", "table_id", table_schema.get_table_id(),
"table_name", table_schema.get_table_name(),
K(hbase_table_column_cnt),
K(column_id), K(is_T_column_bigint_type),
K(is_hbase_mode_table));
}
return ret;
}
}
}

View File

@ -20,6 +20,10 @@
namespace oceanbase
{
namespace datadict
{
class ObDictTableMeta;
}
namespace share
{
namespace schema
@ -46,7 +50,27 @@ public:
// 2. contains four columns K, Q, T, V
// 3. T is of type bigint
// Note: All of the above conditions are not necessarily met for an hbase table
int add_hbase_table_id(const oceanbase::share::schema::ObTableSchema &table_schema);
template<class TABLE_SCHEMA>
int add_hbase_table_id(const TABLE_SCHEMA &table_schema)
{
int ret = OB_SUCCESS;
bool is_hbase_mode_table = false;
const uint64_t table_id = table_schema.get_table_id();
const char *table_name = table_schema.get_table_name();
if (OB_FAIL(filter_hbase_mode_table_(table_schema, is_hbase_mode_table))) {
OBLOG_LOG(ERROR, "filter_hbase_mode_table_ fail", KR(ret), K(table_id), K(table_name), K(is_hbase_mode_table));
} else if (! is_hbase_mode_table) {
OBLOG_LOG(INFO, "[IS_NOT_HBASE_TABLE]", K(table_name), K(table_id), K(is_hbase_mode_table));
} else if (OB_FAIL(table_id_set_.set_refactored(table_id))) {
OBLOG_LOG(ERROR, "add_table_id into table_id_set_ fail", KR(ret), K(table_name), K(table_id));
} else {
OBLOG_LOG(INFO, "[HBASE] add_table_id into table_id_set_ succ", K(table_name), K(table_id));
}
return ret;
}
// Determine if conversion is required
// table exists and is a T column
@ -108,6 +132,22 @@ private:
private:
int filter_hbase_mode_table_(const oceanbase::share::schema::ObTableSchema &table_schema,
bool &is_hbase_mode_table);
int filter_hbase_mode_table_(const oceanbase::datadict::ObDictTableMeta &table_meta,
bool &is_hbase_mode_table);
template <class COLUMN_SCHEMA>
int match_column_name_(const COLUMN_SCHEMA &col_schema,
const int column_flag_size,
int *column_flag,
bool &is_T_column_bigint_type,
uint64_t &column_id);
template<class TABLE_SCHEMA>
int judge_and_add_hbase_table_(const TABLE_SCHEMA &table_schema,
const bool is_T_column_bigint_type,
const uint64_t column_id,
const int column_flag_size,
const int *column_flag,
bool &is_hbase_mode_table);
private:
bool inited_;

View File

@ -385,6 +385,17 @@ int ObLogPartMgr::add_table(const uint64_t table_id,
ISTAT("set tic update info success", K(new_schema_version), K(tic_update_info),
K(tenant_name), K(database_name), K(table_name));
}
if (OB_SUCC(ret) && TCONF.enable_hbase_mode) {
// add hbase table
if (OB_FAIL(try_add_hbase_table_(table_id, table_name, new_schema_version, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name), K(new_schema_version));
}
} else {
LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name), K(new_schema_version));
}
}
}
return ret;
}
@ -1607,6 +1618,22 @@ int ObLogPartMgr::add_user_table_info_(ObLogSchemaGuard &schema_guard,
ISTAT("insert table_id into cache success", K_(tenant_id), K(table_id),
K(database_id), K(tenant_name), K(database_name), K(table_name));
}
if (OB_SUCC(ret) && TCONF.enable_hbase_mode) {
// add hbase table
const ObTableSchema *full_table_schema = NULL;
if (OB_FAIL(get_full_table_schema_(table_id, timeout, schema_guard, full_table_schema))) {
if (OB_TIMEOUT != ret) {
LOG_ERROR("get full table_schema failed", KR(ret), "table_id", table_id);
}
} else if (OB_FAIL(try_add_hbase_table_(full_table_schema, table_name, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
}
} else {
LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name));
}
}
}
return ret;
}
@ -1644,6 +1671,17 @@ int ObLogPartMgr::add_user_table_info_(ObDictTenantInfo *tenant_info,
ISTAT("insert table_id into cache success", K_(tenant_id), K(table_id),
K(database_id), K(tenant_name), K(database_name), K(table_name));
}
if (OB_SUCC(ret) && TCONF.enable_hbase_mode) {
// add hbase table
if (OB_FAIL(try_add_hbase_table_(table_meta, table_name, timeout))) {
if (OB_TIMEOUT != ret) {
LOG_WARN("try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
}
} else {
LOG_INFO("try_add_hbase_table_ success", K(table_id), K(table_name));
}
}
}
return ret;
}
@ -2597,6 +2635,78 @@ int ObLogPartMgr::inner_get_table_info_of_table_meta_(ObDictTenantInfo *tenant_i
return ret;
}
int ObLogPartMgr::try_add_hbase_table_(const uint64_t table_id,
const char *table_name,
const int64_t schema_version,
const int64_t timeout)
{
int ret = OB_SUCCESS;
ObString tb_name_str(table_name);
// if table_name contains '$', it may be hbase table
if (NULL != tb_name_str.find('$')) {
if (is_online_refresh_mode(TCTX.refresh_mode_)) {
IObLogSchemaGetter *schema_getter = TCTX.schema_getter_;
ObLogSchemaGuard schema_guard;
const ObTableSchema *full_table_schema = NULL;
if (OB_ISNULL(schema_getter)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("schema_getter is NULL", KR(ret), K(schema_getter));
} else if (OB_FAIL(schema_getter->get_schema_guard_and_full_table_schema(
tenant_id_, table_id, schema_version, timeout, schema_guard, full_table_schema))) {
if (OB_TIMEOUT != ret) {
LOG_ERROR("get_schema_guard_and_full_table_schema failed", KR(ret), K(table_id), KPC(full_table_schema));
}
} else if (try_add_hbase_table_(full_table_schema, table_name, timeout)) {
LOG_ERROR("inner try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
} else {
// succ
}
} else {
ObDictTenantInfoGuard dict_tenant_info_guard;
ObDictTenantInfo *tenant_info = nullptr;
datadict::ObDictTableMeta *table_meta = nullptr;
if (OB_FAIL(GLOGMETADATASERVICE.get_tenant_info_guard(tenant_id_, dict_tenant_info_guard))) {
LOG_ERROR("get tenant_info_guard failed", KR(ret), K_(tenant_id));
} else if (OB_ISNULL(tenant_info = dict_tenant_info_guard.get_tenant_info())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tenant_info is nullptr", K_(tenant_id));
} else if (OB_FAIL(tenant_info->get_table_meta(table_id, table_meta))) {
LOG_ERROR("tenant_info get table_meta failed", KR(ret), K_(tenant_id));
} else if (try_add_hbase_table_(table_meta, table_name, timeout)) {
LOG_ERROR("inner try_add_hbase_table_ failed", KR(ret), K(table_id), K(table_name));
} else {
// succ
}
}
}
return ret;
}
template<class TABLE_SCHEMA>
int ObLogPartMgr::try_add_hbase_table_(const TABLE_SCHEMA *table_schema,
const char *table_name,
const int64_t timeout)
{
int ret = OB_SUCCESS;
ObString tb_name_str(table_name);
// if table_name contains '$', it may be hbase table
if (NULL != tb_name_str.find('$')) {
uint64_t table_id = OB_INVALID_ID;
if (OB_ISNULL(table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("table_schema is NULL", KR(ret), K(table_schema));
} else if (FALSE_IT(table_id = table_schema->get_table_id())) {
} else if (table_schema->is_in_recyclebin()) {
LOG_INFO("table is in recyclebin, no need to add", K(table_id), K(table_name));
} else if (OB_FAIL(TCTX.hbase_util_.add_hbase_table_id(*table_schema))) {
LOG_ERROR("hbase_util_ add_hbase_table_id", KR(ret), K(table_id), K(table_name));
} else {
// succ
}
}
return ret;
}
}
}
#undef _STAT

View File

@ -625,6 +625,14 @@ private:
const char *&table_name,
uint64_t &database_id,
bool &is_user_table);
int try_add_hbase_table_(const uint64_t table_id,
const char *table_name,
const int64_t schema_version,
const int64_t timeout);
template<class TABLE_SCHEMA>
int try_add_hbase_table_(const TABLE_SCHEMA *table_schema,
const char *table_name,
const int64_t timeout);
private:
ObLogTenant &host_;