[OBCDC] filter tenant in advance when working in data_dict refresh mode

This commit is contained in:
obdev 2023-03-02 18:54:17 +00:00 committed by ob-robot
parent 4bcd321454
commit ca825173cf
4 changed files with 90 additions and 17 deletions

View File

@ -494,7 +494,7 @@ int FetchStream::check_need_fetch_log_with_upper_limit_(bool &need_fetch_log)
{
int ret = OB_SUCCESS;
if (OB_FAIL(get_upper_limit(upper_limit_))) {
LOG_ERROR("get upper limit failed", KR(ret));
LOG_ERROR("get upper limit failed", KR(ret), "tls_id", ls_fetch_ctx_->get_tls_id());
} else if (OB_FAIL(check_need_fetch_log_(upper_limit_, need_fetch_log))) {
LOG_ERROR("check need fetch log failed", KR(ret), K(upper_limit_));
}
@ -920,14 +920,15 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
} else if (! is_direct_fetching_mode(ls_fetch_ctx_->get_fetching_mode())) {
const ClientFetchingMode mode = ls_fetch_ctx_->get_fetching_mode();
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("fetching mode of ls fetch ctx doesn't match", KR(ret), K(mode));
LOG_ERROR("fetching mode of ls fetch ctx doesn't match", KR(ret), K(mode), "tls_id", ls_fetch_ctx_->get_tls_id());
} else if (OB_FAIL(check_need_fetch_log_with_upper_limit_(need_fetch_log))) {
LOG_ERROR("get upper limit failed", KR(ret), KPC(this), K(need_fetch_log));
LOG_ERROR("get upper limit failed", KR(ret), KPC(this), K(need_fetch_log), "tls_id", ls_fetch_ctx_->get_tls_id());
} else if (! need_fetch_log && OB_FAIL(hibernate_())) {
LOG_ERROR("hibernate_ failed", KR(ret), KPC(this));
} else {
KickOutInfo kick_out_info;
TransStatInfo tsi;
const TenantLSID &tls_id = ls_fetch_ctx_->get_tls_id();
int64_t fetched_group_entry_cnt = 0;
int64_t fetched_group_entry_size = 0;
int64_t start_handle_timestamp = get_timestamp();
@ -969,12 +970,12 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
LOG_ERROR("read group entry failed when handling fetch archive task", KR(ret), K(log_group_entry),
K(lsn), K(kick_out_info), K(ls_fetch_ctx_));
} else if (OB_NEED_RETRY == ret) {
LOG_WARN("read_group_entry failed, retry", KR(ret), K(log_group_entry), K(lsn));
LOG_WARN("read_group_entry failed, retry", KR(ret), K(log_group_entry), K(lsn), K(tls_id));
need_fetch_log = false;
ret = OB_SUCCESS;
}
} else if (OB_FAIL(ls_fetch_ctx_->update_progress(log_group_entry, lsn))) {
LOG_ERROR("ls fetch ctx update progress failed", KR(ret), K(log_group_entry), K(lsn));
LOG_ERROR("ls fetch ctx update progress failed", KR(ret), K(log_group_entry), K(lsn), K(tls_id));
}
if (OB_SUCC(ret)) {
const int64_t submit_ts = log_group_entry.get_scn().get_val_for_logservice();
@ -988,7 +989,7 @@ int FetchStream::handle_fetch_archive_task_(volatile bool &stop_flag)
const int64_t read_log_time = get_timestamp() - start_handle_timestamp;
if (OB_FAIL(update_fetch_task_state_(kick_out_info, stop_flag, flush_time))) {
LOG_ERROR("update fetch task state failed", KR(ret), K(kick_out_info));
LOG_ERROR("update fetch task state failed", KR(ret), K(kick_out_info), K(tls_id));
} else {
update_fetch_stat_info_(fetched_group_entry_cnt, fetched_group_entry_size,
read_log_time, fetch_remote_time, flush_time, tsi);

View File

@ -178,7 +178,7 @@ int QueryAllTenantStrategy::build_sql_statement(
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", KR(ret), K(sql_buf), K(mul_statement_buf_len));
} else if (OB_FAIL(databuff_printf(sql_buf, mul_statement_buf_len, pos,
"SELECT TENANT_ID FROM %s WHERE TENANT_TYPE != 'META'", OB_DBA_OB_TENANTS_TNAME))) {
"SELECT TENANT_ID, TENANT_NAME FROM %s WHERE TENANT_TYPE != 'META'", OB_DBA_OB_TENANTS_TNAME))) {
LOG_ERROR("build_sql_statement failed for query all_tenant_info", KR(ret), K(pos), KCSTRING(sql_buf));
}
@ -585,26 +585,29 @@ int IObLogSysTableHelper::BatchSQLQuery::get_records(
return get_records_tpl_(records, "QueryAllServerInfo", record_count);
}
int IObLogSysTableHelper::BatchSQLQuery::parse_record_from_row_(common::ObIArray<uint64_t> &records)
int IObLogSysTableHelper::BatchSQLQuery::parse_record_from_row_(common::ObIArray<TenantInfo> &records)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = OB_INVALID_TENANT_ID;
int64_t index = -1;
ObString tenant_name;
index++;
GET_DATA(uint, index, tenant_id, "TENANT_ID");
index++;
GET_DATA(varchar, index, tenant_name, "TENANT_NAME");
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
ret = OB_INVALID_DATA;
LOG_ERROR("invalid tenant_id query from server", KR(ret), K(tenant_id));
} else if (OB_FAIL(records.push_back(tenant_id))) {
} else if (OB_FAIL(records.push_back(TenantInfo(tenant_id, tenant_name)))) {
LOG_ERROR("push_back tenant_id into tenant_id_list failed", KR(ret), K(tenant_id), K(records));
}
return ret;
}
int IObLogSysTableHelper::BatchSQLQuery::get_records(common::ObIArray<uint64_t> &records)
int IObLogSysTableHelper::BatchSQLQuery::get_records(common::ObIArray<TenantInfo> &records)
{
int64_t record_count = 0;
return get_records_tpl_(records, "QueryAllTenantInfo", record_count);
@ -1077,12 +1080,12 @@ int ObLogSysTableHelper::query_sql_server_list(
return ret;
}
int ObLogSysTableHelper::query_tenant_id_list(common::ObIArray<uint64_t> &tenant_id_list)
int ObLogSysTableHelper::query_tenant_info_list(common::ObIArray<TenantInfo> &tenant_info_list)
{
int ret = OB_SUCCESS;
BatchSQLQuery query;
QueryAllTenantStrategy query_all_tenant_strategy;
tenant_id_list.reset();
tenant_info_list.reset();
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
@ -1099,7 +1102,7 @@ int ObLogSysTableHelper::query_tenant_id_list(common::ObIArray<uint64_t> &tenant
"mysql_error_code", query.get_mysql_err_code(),
"mysql_error_msg", query.get_mysql_err_msg());
}
} else if (OB_FAIL(query.get_records(tenant_id_list))) {
} else if (OB_FAIL(query.get_records(tenant_info_list))) {
if (OB_NEED_RETRY == ret) {
LOG_WARN("get_records fail while query_all_tenant_info, need retry", KR(ret),
"mysql_error_code", query.get_mysql_err_code(),
@ -1111,6 +1114,36 @@ int ObLogSysTableHelper::query_tenant_id_list(common::ObIArray<uint64_t> &tenant
}
}
LOG_DEBUG("query_all_tenant_info", KR(ret), K(tenant_info_list));
return ret;
}
int ObLogSysTableHelper::query_tenant_id_list(common::ObIArray<uint64_t> &tenant_id_list)
{
int ret = OB_SUCCESS;
BatchSQLQuery query;
QueryAllTenantStrategy query_all_tenant_strategy;
common::ObSEArray<TenantInfo, 16> tenant_info_list;
tenant_id_list.reset();
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
LOG_ERROR("systable_helper not init", KR(ret));
} else if (OB_FAIL(query_tenant_info_list(tenant_info_list))) {
LOG_ERROR("query tenant_id, tenant_name list failed", KR(ret), K(tenant_info_list));
} else {
const int64_t tenant_list_size = tenant_info_list.count();
ARRAY_FOREACH_N(tenant_info_list, i, tenant_list_size) {
const TenantInfo &tenant_id_name = tenant_info_list.at(i);
if (OB_FAIL(tenant_id_list.push_back(tenant_id_name.tenant_id))) {
LOG_ERROR("push tenant_id into tenant_id_list failed", K(tenant_id_name), K(tenant_id_list),
K(tenant_info_list));
}
}
}
LOG_DEBUG("query_all_tenant_info", KR(ret), K(tenant_id_list));
return ret;

View File

@ -181,6 +181,7 @@ public:
static const int64_t ALL_SERVER_DEFAULT_RECORDS_NUM = 32;
struct ClusterInfo;
struct TenantInfo;
struct ObServerVersionInfo;
typedef common::ObSEArray<ObServerVersionInfo, DEFAULT_RECORDS_NUM> ObServerVersionInfoArray;
@ -217,6 +218,8 @@ public:
const ObLogSvrBlacklist &server_blacklist,
common::ObIArray<common::ObAddr> &sql_server_list) = 0;
virtual int query_tenant_info_list(common::ObIArray<TenantInfo> &tenant_info_list) = 0;
// query tenant_id list
virtual int query_tenant_id_list(common::ObIArray<uint64_t> &tenant_id_list) = 0;
@ -310,7 +313,7 @@ public:
* Error codes
* - OB_NEED_RETRY: Connection error encountered
*/
int get_records(common::ObIArray<uint64_t> &record);
int get_records(common::ObIArray<TenantInfo> &record);
/*
* Error codes
@ -336,7 +339,7 @@ public:
int parse_record_from_row_(
const ObLogSvrBlacklist &server_blacklist,
common::ObIArray<common::ObAddr> &records);
int parse_record_from_row_(common::ObIArray<uint64_t> &records);
int parse_record_from_row_(common::ObIArray<TenantInfo> &records);
int parse_record_from_row_(common::ObIArray<common::ObAddr> &records);
int parse_record_from_row_(share::schema::TenantStatus &records);
@ -375,6 +378,17 @@ public:
TO_STRING_KV(K_(cluster_id));
};
struct TenantInfo {
public:
TenantInfo(): tenant_id(OB_INVALID_TENANT_ID), tenant_name() {}
TenantInfo(const uint64_t id, const ObString &name): tenant_id(id), tenant_name(name) {}
TO_STRING_KV(K(tenant_id), K(tenant_name));
uint64_t tenant_id;
ObFixedLengthString<OB_MAX_TENANT_NAME_LENGTH + 1> tenant_name;
};
struct ObServerVersionInfo
{
ObServerVersionInfo() { reset(); }
@ -451,6 +465,7 @@ public:
const ObLogSvrBlacklist &server_blacklist,
common::ObIArray<ObAddr> &sql_server_list);
virtual int query_tenant_id_list(common::ObIArray<uint64_t> &tenant_id_list);
virtual int query_tenant_info_list(common::ObIArray<TenantInfo> &tenant_info_list);
virtual int query_tenant_sql_server_list(
const uint64_t tenant_id,
common::ObIArray<common::ObAddr> &tenant_server_list);

View File

@ -12,6 +12,7 @@
* Tenant Manager for OBCDC(ObLogTenantMgr)
*/
#define USING_LOG_PREFIX OBLOG
#include <algorithm> // std::min
@ -24,6 +25,7 @@
#include "ob_log_config.h" // TCONF
#include "ob_log_store_service.h"
#include "ob_cdc_tenant_sql_server_provider.h"
#include "lib/utility/ob_macro_utils.h"
#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[STAT] [TenantMgr] " fmt, ##args)
#define STAT(level, fmt, args...) OBLOG_LOG(level, "[STAT] [TenantMgr] " fmt, ##args)
@ -1312,17 +1314,39 @@ int ObLogTenantMgr::get_tenant_ids_(
}
} else if (is_data_dict_refresh_mode(refresh_mode_)) {
IObLogSysTableHelper *systable_helper = TCTX.systable_helper_;
ObSEArray<IObLogSysTableHelper::TenantInfo, 16> tenant_info_list;
bool done = false;
tenant_id_list.reset();
if (OB_ISNULL(systable_helper)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("systable_helper is NULL", KR(ret));
} else {
while (! done && OB_SUCCESS == ret) {
if (OB_FAIL(systable_helper->query_tenant_id_list(tenant_id_list))) {
if (OB_FAIL(systable_helper->query_tenant_info_list(tenant_info_list))) {
LOG_WARN("systable_helper query_tenant_ls_info fail", KR(ret), K(tenant_id_list));
} else {
done = true;
const int64_t tenant_info_list_cnt = tenant_info_list.count();
ARRAY_FOREACH_N(tenant_info_list, idx, tenant_info_list_cnt) {
const IObLogSysTableHelper::TenantInfo &tenant_info = tenant_info_list.at(idx);
bool chosen = true;
if (OB_FAIL(filter_tenant(tenant_info.tenant_name.ptr(), chosen))) {
LOG_ERROR("filter_tenant failed", K(tenant_info), K(chosen), K(start_tstamp_ns),
K(sys_schema_version));
} else if (! chosen && OB_SYS_TENANT_ID != tenant_info.tenant_id) {
// tenant have been filtered
LOG_INFO("tenant has been filtered in advance", K(chosen), K(tenant_info), K(tenant_id_list));
} else if (OB_FAIL(tenant_id_list.push_back(tenant_info.tenant_id))) {
LOG_ERROR("push back tenant_id_list failed", K(tenant_info), K(tenant_id_list),
K(tenant_info_list), K(chosen));
} else {
LOG_INFO("tenant hasn't been filtered in advance", K(chosen), K(tenant_info), K(tenant_id_list));
}
}
if (OB_SUCC(ret)) {
done = true;
}
}
if (OB_NEED_RETRY == ret) {