[CP] [OBCDC] Fix can't launch in data_dict mode if start_ts is during generation of data_dictionary

This commit is contained in:
SanmuWangZJU 2023-12-14 11:17:47 +00:00 committed by ob-robot
parent de9cd80b7c
commit 1325c496a7
12 changed files with 164 additions and 265 deletions

View File

@ -47,6 +47,7 @@
&& common::OB_ENTRY_NOT_EXIST != ret) { \
ARCHIVE_LOG(WARN, "check and get record failed", K(ret)); \
} else if (common::OB_SUCCESS == ret \
&& record_context.last_record_round_ == key.round_ \
&& common::ObTimeUtility::current_time_ns() - record_context.last_record_scn_.convert_to_ts()*1000L < interval) { \
} else if (OB_FAIL(t.get_ls_array(array))) { \
ARCHIVE_LOG(WARN, "get ls array failed", K(ret), K(task_type)); \

View File

@ -43,19 +43,23 @@ ObDataDictMetaInfoItem::~ObDataDictMetaInfoItem()
void ObDataDictMetaInfoItem::reset()
{
version_ = 0;
snapshot_scn_ = share::OB_INVALID_SCN_VAL;
start_lsn_ = palf::LOG_INVALID_LSN_VAL;
end_lsn_ = palf::LOG_INVALID_LSN_VAL;
end_scn_ = share::OB_INVALID_SCN_VAL;
}
void ObDataDictMetaInfoItem::reset(
const uint64_t snapshot_scn,
const uint64_t start_lsn,
const uint64_t end_lsn)
const uint64_t end_lsn,
const int64_t end_scn)
{
snapshot_scn_ = snapshot_scn;
start_lsn_ = start_lsn;
end_lsn_ = end_lsn;
end_scn_ = end_scn;
}
DEFINE_SERIALIZE(ObDataDictMetaInfoItem)
@ -65,7 +69,8 @@ DEFINE_SERIALIZE(ObDataDictMetaInfoItem)
LST_DO_CODE(OB_UNIS_ENCODE,
snapshot_scn_,
start_lsn_,
end_lsn_);
end_lsn_,
end_scn_);
return ret;
}
@ -74,10 +79,26 @@ DEFINE_DESERIALIZE(ObDataDictMetaInfoItem)
{
int ret = OB_SUCCESS;
LST_DO_CODE(OB_UNIS_DECODE,
snapshot_scn_,
start_lsn_,
end_lsn_);
if (OB_UNLIKELY(version_ < 1)) {
ret = OB_VERSION_NOT_MATCH;
DDLOG(WARN, "expect valid version while deserializing DictMetaInfo", KR(ret), K_(version));
} else {
LST_DO_CODE(OB_UNIS_DECODE,
snapshot_scn_,
start_lsn_,
end_lsn_);
if (OB_FAIL(ret)) {
} else {
// deserialize field added in version = 2
if (version_ > 1) {
if (OB_FAIL(common::serialization::decode(buf, data_len, pos, end_scn_))) {
DDLOG(WARN, "deserialize end_scn field failed", KR(ret), K(data_len), K(pos), K(version_));
}
}
// add deserialize logic here for newer version.
}
}
return ret;
}
@ -89,7 +110,8 @@ DEFINE_GET_SERIALIZE_SIZE(ObDataDictMetaInfoItem)
LST_DO_CODE(OB_UNIS_ADD_LEN,
snapshot_scn_,
start_lsn_,
end_lsn_);
end_lsn_,
end_scn_);
return len;
}
@ -332,6 +354,8 @@ DEFINE_DESERIALIZE(ObDataDictMetaInfo)
}
for (int32_t i = 0; OB_SUCC(ret) && i < item_cnt && pos < data_len; i++) {
ObDataDictMetaInfoItem item;
item.set_version(header_.get_meta_version());
if (OB_FAIL(item.deserialize(buf, data_len, pos))) {
DDLOG(WARN, "datadict metainfo item deserialize failed", K(ret),
K(buf), K(data_len), K(pos));
@ -346,7 +370,7 @@ DEFINE_DESERIALIZE(ObDataDictMetaInfo)
/////////////////////////////////// MetaInfoQueryHelper ///////////////////////////////////
const char *MetaInfoQueryHelper::QUERY_META_INFO_SQL_STR =
"SELECT snapshot_scn, start_lsn, end_lsn FROM %s ORDER BY snapshot_scn DESC;";
"SELECT snapshot_scn, ora_rowscn as end_scn, start_lsn, end_lsn FROM %s ORDER BY snapshot_scn DESC;";
const char *MetaInfoQueryHelper::DATA_DICT_META_TABLE_NAME =
share::OB_ALL_DATA_DICTIONARY_IN_LOG_TNAME;
@ -443,8 +467,8 @@ int MetaInfoQueryHelper::get_data_dict_meta_info_(const share::SCN &base_scn, Da
SMART_VAR(ObISQLClient::ReadResult, result) {
if (OB_FAIL(sql.assign_fmt(QUERY_META_INFO_SQL_STR, DATA_DICT_META_TABLE_NAME))) {
DDLOG(WARN, "assign format to sqlstring failed", K(ret), K(QUERY_META_INFO_SQL_STR),
K(DATA_DICT_META_TABLE_NAME));
DDLOG(WARN, "assign format to sqlstring failed", KR(ret), KCSTRING(QUERY_META_INFO_SQL_STR),
KCSTRING(DATA_DICT_META_TABLE_NAME));
} else if (OB_FAIL(sql_proxy_.read(result, tenant_id_, sql.ptr()))) {
DDLOG(WARN, "sql proxy failed to read result when querying datadict metainfo", K(ret),
K(tenant_id_), "sql", sql.ptr());
@ -484,10 +508,12 @@ int MetaInfoQueryHelper::parse_record_from_result_(
// | Field | Type |
// +--------------+---------------------+
// | snapshot_scn | bigint(20) unsigned |
// | end_scn | bigint(20) unsigned |
// | start_lsn | bigint(20) unsigned |
// | end_lsn | bigint(20) unsigned |
// +--------------+---------------------+
EXTRACT_UINT_FIELD_MYSQL(result, "snapshot_scn", item.snapshot_scn_, uint64_t);
EXTRACT_INT_FIELD_MYSQL(result, "end_scn", item.end_scn_, int64_t);
EXTRACT_UINT_FIELD_MYSQL(result, "start_lsn", item.start_lsn_, uint64_t);
EXTRACT_UINT_FIELD_MYSQL(result, "end_lsn", item.end_lsn_, uint64_t);
// only use record generated after base_scn.

View File

@ -36,21 +36,30 @@ public:
void reset(
const uint64_t snapshot_scn,
const uint64_t start_lsn,
const uint64_t end_lsn);
const uint64_t end_lsn,
const int64_t end_scn);
bool operator==(const ObDataDictMetaInfoItem &that) const {
return snapshot_scn_ == that.snapshot_scn_ &&
start_lsn_ == that.start_lsn_ &&
end_lsn_ == that.end_lsn_;
end_lsn_ == that.end_lsn_ &&
end_scn_ == that.end_scn_;
}
void set_version(const int16_t version) { version_ = version; }
NEED_SERIALIZE_AND_DESERIALIZE;
TO_STRING_KV(K_(snapshot_scn), K_(start_lsn), K_(end_lsn));
TO_STRING_KV(K_(snapshot_scn), K_(end_scn), K_(start_lsn), K_(end_lsn));
public:
// Decide deserialize behavious;
// set while deserialize ObDataDictMetaInfo, value should be same with meta_version_ in ObDataDictMetaInfoHeader;
// NOTICE: SHOULD NOT SERIALIZE THIS FIELD.
int16_t version_;
uint64_t snapshot_scn_;
uint64_t start_lsn_;
uint64_t end_lsn_;
int64_t end_scn_; // generate from ora_rowscn(int64_t), add at meta_version = 2;
};
typedef ObSEArray<ObDataDictMetaInfoItem, 16> DataDictMetaInfoItemArr;
@ -59,7 +68,7 @@ class ObDataDictMetaInfoHeader
{
public:
static const int16_t DATADICT_METAINFO_HEADER_MAGIC = 0x4444; // hex of "DD"
static const int64_t DATADICT_METAINFO_META_VERSION = 0x0001; // version = 1
static const int64_t DATADICT_METAINFO_META_VERSION = 0x0002; // version = 1
public:
ObDataDictMetaInfoHeader();
~ObDataDictMetaInfoHeader();
@ -73,7 +82,7 @@ public:
const int64_t data_size);
uint64_t get_tenant_id() const { return tenant_id_; }
OB_INLINE int32_t get_meta_version() const {
OB_INLINE int16_t get_meta_version() const {
return meta_version_;
}
OB_INLINE int32_t get_item_count() const {
@ -132,7 +141,7 @@ public:
int64_t get_data_size() const {
return header_.get_data_size();
}
int32_t get_meta_version() const {
int16_t get_meta_version() const {
return header_.get_meta_version();
}

View File

@ -28,6 +28,7 @@ public:
virtual ~ObCDCQueryResult() { affect_rows_ = 0; }
public:
OB_INLINE void inc_affect_rows() { affect_rows_++; }
OB_INLINE int64_t get_affect_rows() const { return affect_rows_; }
OB_INLINE bool has_data() const { return affect_rows_ > 0; }
OB_INLINE bool is_empty() const { return ! has_data(); }
OB_INLINE T& get_data() const { return data_; }
@ -81,7 +82,7 @@ int ObCDCTenantQuery<T>::query(const uint64_t tenant_id, ObCDCQueryResult<T> &qu
} else {
bool query_done = false;
int retry_cnt = 0;
const int64_t retry_fail_sleep_time = 1 * _MSEC_;
const int64_t retry_fail_sleep_time = 100 * _MSEC_;
const int64_t retry_warn_interval = 5 * _SEC_;
const int64_t start_time = get_timestamp();
const int64_t end_time = start_time + retry_timeout;

View File

@ -135,7 +135,7 @@ public:
"part trans task prealloc page count");
// Log_level=INFO in the startup scenario, and then optimize the schema to WARN afterwards
DEF_STR(init_log_level, OB_CLUSTER_PARAMETER, "ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:INFO", "log level: DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR");
DEF_STR(log_level, OB_CLUSTER_PARAMETER, "ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:WARN", "log level: DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR");
DEF_STR(log_level, OB_CLUSTER_PARAMETER, "ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:WARN;CLOG.*:WARN;STORAGE.*:WARN;ARCHIVE.*:WARN", "log level: DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR");
// root server info for oblog, seperated by `;` between multi rootserver, a root server info format as `ip:rpc_port:sql_port`
DEF_STR(rootserver_list, OB_CLUSTER_PARAMETER, "|", "OB RootServer list");
DEF_STR(cluster_url, OB_CLUSTER_PARAMETER, "|", "OB configure url");

View File

@ -1278,6 +1278,10 @@ int ObLogInstance::config_tenant_mgr_(const int64_t start_tstamp_ns,
GET_SCHEMA_TIMEOUT_ON_START_UP,
add_tenant_succ))) {
LOG_ERROR("add_tenant fail", KR(ret), K(start_tstamp_ns), K(sys_schema_version));
} else if (! add_tenant_succ) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("[FATAL] [LAUNCH] ADD_TENANT WITH NO ALIVE SERVER MODE FAILED", KR(ret),
K(start_tstamp_ns), K_(refresh_mode), K_(fetching_mode));
}
} else {
if (OB_FAIL(tenant_mgr_->add_all_tenants(

View File

@ -14,7 +14,7 @@
#include "ob_log_instance.h" // TCTX
#include "ob_log_meta_data_queryer.h"
#include "lib/mysqlclient/ob_isql_client.h" // ObISQLClient
#include "lib/mysqlclient/ob_mysql_proxy.h" // ObISQLClient
#include "lib/mysqlclient/ob_mysql_result.h" // ObMySQLResult
#include "lib/string/ob_sql_string.h" // ObSqlString
#include "share/inner_table/ob_inner_table_schema_constants.h" // OB_***_TNAME
@ -24,201 +24,64 @@ namespace oceanbase
{
namespace libobcdc
{
ObLogMetaDataSQLQueryer::ObLogMetaDataSQLQueryer() :
is_inited_(false),
is_across_cluster_(false),
cluster_id_(OB_INVALID_CLUSTER_ID),
sql_proxy_(NULL)
{
}
// TODO: Currently query ORA_ROWSCN from inner_table, however not support by oracle tenant in obcdc tenant_sync_mode.
// Wait observer support query ORA_ROWSCN by DBA VIEW
// and DBA_OB_DATA_DICTIONARY_IN_LOG VIEW will add column support query ORA_ROWSCN as END_SCN;
// TODO: to compatible with oracle mode in tenant_sync_mode, use REPORT_TIME as end_scn for temporary;
const char* ObLogMetaDataSQLQueryer::QUERY_SQL_FORMAT = "SELECT SNAPSHOT_SCN, %s AS END_SCN, START_LSN, END_LSN "
"FROM %s WHERE SNAPSHOT_SCN <= %lu ORDER BY SNAPSHOT_SCN DESC %s";
ObLogMetaDataSQLQueryer::~ObLogMetaDataSQLQueryer()
{
destroy();
}
int ObLogMetaDataSQLQueryer::init(
const int64_t cluster_id,
const bool is_across_cluster,
common::ObISQLClient &sql_proxy)
int ObLogMetaDataSQLQueryer::get_data_dict_in_log_info(
const uint64_t tenant_id,
const int64_t start_timestamp_ns,
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info)
{
int ret = OB_SUCCESS;
const static int64_t QUERY_TIMEOUT = 1 * _MIN_;
ObCDCQueryResult<logfetcher::DataDictionaryInLogInfo> query_result(data_dict_in_log_info);
const uint64_t query_tenant_id = TCTX.is_tenant_sync_mode() ? OB_SYS_TENANT_ID : tenant_id;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_ERROR("ObLogMetaDataSQLQueryer has inited", KR(ret));
if (OB_FAIL(query(query_tenant_id, query_result, QUERY_TIMEOUT))) {
LOG_ERROR("query data dict meta info failed", KR(ret), K(tenant_id), K(query_tenant_id));
} else if (query_result.is_empty()) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("can't find datadict snapshot_scn older than start_timestamp_ns", KR(ret), K(tenant_id), K(start_timestamp_ns));
} else {
is_across_cluster_ = is_across_cluster;
cluster_id_ = cluster_id;
sql_proxy_ = &sql_proxy;
is_inited_ = true;
LOG_INFO("ObLogMetaDataSQLQueryer init success", K(cluster_id), K(is_across_cluster));
LOG_INFO("get_data_dict_in_log_info succ", K(tenant_id), K(start_timestamp_ns), K(data_dict_in_log_info));
}
return ret;
}
void ObLogMetaDataSQLQueryer::destroy()
{
is_inited_ = false;
is_across_cluster_ = false;
cluster_id_ = OB_INVALID_CLUSTER_ID;
sql_proxy_ = NULL;
}
// Such as:
// select snapshot_scn, start_lsn, end_lsn from __all_virtual_data_dictionary_in_log where tenant_id=1004 and snapshot_scn <= 1669963370971342544 ORDER BY snapshot_scn DESC limit 1;
int ObLogMetaDataSQLQueryer::get_data_dict_in_log_info(
const uint64_t tenant_id,
const int64_t start_timstamp_ns,
int64_t &record_count,
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info)
int ObLogMetaDataSQLQueryer::build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql)
{
int ret = OB_SUCCESS;
const bool is_oracle_mode = TCTX.mysql_proxy_.is_oracle_mode();
const char *select_fields = "SNAPSHOT_SCN, START_LSN, END_LSN";
const char *limit_expr = is_oracle_mode ? "FETCH FIRST 1 ROWS ONLY" : "LIMIT 1";
// __all_virtual_data_dictionary_in_log
const char *data_dictionary_in_log_name = share::OB_ALL_VIRTUAL_DATA_DICTIONARY_IN_LOG_TNAME;
uint64_t query_tenent_id = OB_INVALID_TENANT_ID;
const char* limit_expr = is_oracle_mode ? "FETCH FIRST 1 ROWS ONLY" : "LIMIT 1";
const char* end_scn = is_oracle_mode ? "TIMESTAMP_TO_SCN(REPORT_TIME)" : "ORA_ROWSCN";
const char* table_name = is_oracle_mode ? share::OB_DBA_OB_DATA_DICTIONARY_IN_LOG_TNAME : share::OB_ALL_DATA_DICTIONARY_IN_LOG_TNAME;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLogMetaDataSQLQueryer is not initialized", KR(ret));
} else if (OB_INVALID_TENANT_ID == tenant_id
|| OB_INVALID_TIMESTAMP == start_timstamp_ns) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), KT(tenant_id), K(start_timstamp_ns));
} else {
ObSqlString sql;
record_count = 0;
if (TCTX.is_tenant_sync_mode()) {
if (OB_FAIL(sql.assign_fmt(
"SELECT %s FROM %s"
" WHERE SNAPSHOT_SCN <= %lu"
" ORDER BY SNAPSHOT_SCN DESC %s",
select_fields, share::OB_DBA_OB_DATA_DICTIONARY_IN_LOG_TNAME,
start_timstamp_ns, limit_expr))) {
LOG_WARN("assign sql string failed", KR(ret), K(tenant_id), K(is_oracle_mode));
} else {
query_tenent_id = tenant_id;
}
} else {
// OB4.1 doesn't have CDB_OB_DATA_DICTIONARY_IN_LOG VIEW,should only query with virtual table
if (OB_FAIL(sql.assign_fmt(
"SELECT %s FROM %s"
" WHERE TENANT_ID = %lu AND SNAPSHOT_SCN <= %lu"
" ORDER BY SNAPSHOT_SCN DESC LIMIT 1",
select_fields, share::OB_ALL_VIRTUAL_DATA_DICTIONARY_IN_LOG_TNAME,
tenant_id, start_timstamp_ns))) {
LOG_WARN("assign sql string failed", KR(ret), K(tenant_id));
} else {
// Use OB_SYS_TENANT_ID to query
query_tenent_id = OB_SYS_TENANT_ID;
}
}
if (OB_SUCC(ret)) {
SMART_VAR(ObISQLClient::ReadResult, result) {
if (OB_FAIL(do_query_(OB_SYS_TENANT_ID, sql, result))) {
LOG_WARN("do_query_ failed", KR(ret), K(cluster_id_), K(tenant_id), "sql", sql.ptr());
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get mysql result failed", KR(ret));
} else if (OB_FAIL(get_records_template_(
*result.get_result(),
data_dict_in_log_info,
"DataDictionaryInLogInfo",
record_count))) {
LOG_WARN("construct data_dict_in_log_info failed", KR(ret), K(data_dict_in_log_info));
}
} // SMART_VAR
}
if (OB_FAIL(sql.assign_fmt(QUERY_SQL_FORMAT, end_scn, table_name, start_timstamp_ns_, limit_expr))) {
LOG_ERROR("format sql failed", KR(ret), K(tenant_id), K(is_oracle_mode), KCSTRING(limit_expr), KCSTRING(table_name));
}
return ret;
}
int ObLogMetaDataSQLQueryer::do_query_(
const uint64_t tenant_id,
ObSqlString &sql,
ObISQLClient::ReadResult &result)
int ObLogMetaDataSQLQueryer::parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult<logfetcher::DataDictionaryInLogInfo> &result)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLogMetaDataSQLQueryer not init", KR(ret));
} else if (OB_ISNULL(sql_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("SqlProxy is NULL", KR(ret));
} else if (is_across_cluster_) {
if (OB_FAIL(sql_proxy_->read(result, cluster_id_, tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", KR(ret), K(cluster_id_), K(tenant_id), "sql", sql.ptr());
}
} else {
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
LOG_WARN("execute sql failed", KR(ret), K(tenant_id), "sql", sql.ptr());
}
}
return ret;
}
template <typename RecordsType>
int ObLogMetaDataSQLQueryer::get_records_template_(
common::sqlclient::ObMySQLResult &res,
RecordsType &records,
const char *event,
int64_t &record_count)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLogMetaDataSQLQueryer not init", KR(ret));
} else {
record_count = 0;
while (OB_SUCC(ret)) {
if (OB_FAIL(res.next())) {
if (OB_ITER_END == ret) {
// End of iteration
} else {
LOG_WARN("get next result failed", KR(ret), K(event));
}
} else if (OB_FAIL(parse_record_from_row_(res, records))) {
LOG_WARN("parse_record_from_row_ failed", KR(ret), K(records));
} else {
record_count++;
}
} // while
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
}
return ret;
}
int ObLogMetaDataSQLQueryer::parse_record_from_row_(
common::sqlclient::ObMySQLResult &res,
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info)
{
int ret = OB_SUCCESS;
int64_t snapshot_scn_int = 0;
int64_t end_scn_int = 0;
int64_t start_lsn_int = 0;
int64_t end_lsn_int = 0;
(void)GET_COL_IGNORE_NULL(res.get_int, "SNAPSHOT_SCN", snapshot_scn_int);
(void)GET_COL_IGNORE_NULL(res.get_int, "START_LSN", start_lsn_int);
(void)GET_COL_IGNORE_NULL(res.get_int, "END_LSN", end_lsn_int);
data_dict_in_log_info.reset(snapshot_scn_int, palf::LSN(start_lsn_int), palf::LSN(end_lsn_int));
EXTRACT_UINT_FIELD_MYSQL(sql_result, "SNAPSHOT_SCN", snapshot_scn_int, int64_t);
EXTRACT_UINT_FIELD_MYSQL(sql_result, "END_SCN", end_scn_int, int64_t);
EXTRACT_UINT_FIELD_MYSQL(sql_result, "START_LSN", start_lsn_int, int64_t);
EXTRACT_UINT_FIELD_MYSQL(sql_result, "END_LSN", end_lsn_int, int64_t);
result.get_data().reset(snapshot_scn_int, end_scn_int, palf::LSN(start_lsn_int), palf::LSN(end_lsn_int));
return ret;
}

View File

@ -13,13 +13,14 @@
#ifndef OCEANBASE_OB_LOG_META_SQL_QUERYER_H_
#define OCEANBASE_OB_LOG_META_SQL_QUERYER_H_
#include "lib/mysqlclient/ob_isql_client.h" // ObISQLClient
#include "ob_cdc_tenant_query.h"
#include "logservice/logfetcher/ob_log_data_dictionary_in_log_table.h"
namespace oceanbase
{
namespace common
{
class ObMySQLProxy;
namespace sqlclient
{
class ObMySQLResult;
@ -28,50 +29,26 @@ class ObMySQLResult;
namespace libobcdc
{
class ObLogMetaDataSQLQueryer
// query OB_SYS_TENANT_ID in tenant_sync_mode and query specific tenant_id in cluster_sync_mode
class ObLogMetaDataSQLQueryer : public ObCDCTenantQuery<logfetcher::DataDictionaryInLogInfo>
{
public:
ObLogMetaDataSQLQueryer();
virtual ~ObLogMetaDataSQLQueryer();
int init(
const int64_t cluster_id,
const bool is_across_cluster,
common::ObISQLClient &sql_proxy);
bool is_inited() const { return is_inited_; }
void destroy();
ObLogMetaDataSQLQueryer(const int64_t start_timstamp_ns, common::ObMySQLProxy &sql_proxy)
: ObCDCTenantQuery(sql_proxy), start_timstamp_ns_(start_timstamp_ns) {}
~ObLogMetaDataSQLQueryer() { start_timstamp_ns_ = OB_INVALID_TIMESTAMP; }
public:
int get_data_dict_in_log_info(
const uint64_t tenant_id,
const int64_t start_timstamp_ns,
int64_t &record_count,
const int64_t start_timestamp_ns,
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info);
private:
int do_query_(const uint64_t tenant_id,
ObSqlString &sql,
ObISQLClient::ReadResult &result);
template <typename RecordsType>
int get_records_template_(
common::sqlclient::ObMySQLResult &res,
RecordsType &records,
const char *event,
int64_t &record_count);
// logfetcher::DataDictionaryInLogInfo
// @param [in] res, result read from __all_virtual_data_dictionary_in_log
// @param [out] data_dict_in_log_info
int parse_record_from_row_(
common::sqlclient::ObMySQLResult &res,
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info);
int build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql) override;
int parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult<logfetcher::DataDictionaryInLogInfo> &result) override;
private:
bool is_inited_; // whether this class is inited
bool is_across_cluster_; // whether the SQL query across cluster
int64_t cluster_id_; // ClusterID
common::ObISQLClient *sql_proxy_; // sql_proxy to use
static const char* QUERY_SQL_FORMAT;
private:
int64_t start_timstamp_ns_;
DISALLOW_COPY_AND_ASSIGN(ObLogMetaDataSQLQueryer);
};

View File

@ -13,6 +13,7 @@
#include "ob_log_meta_data_service.h"
#include "ob_log_instance.h"
#include "ob_log_fetching_mode.h"
#include "ob_log_meta_data_queryer.h"
#include "share/backup/ob_archive_struct.h"
#include "logservice/restoreservice/ob_log_archive_piece_mgr.h"
#include "logservice/data_dictionary/ob_data_dict_meta_info.h"
@ -39,7 +40,6 @@ ObLogMetaDataService &ObLogMetaDataService::get_instance()
ObLogMetaDataService::ObLogMetaDataService() :
is_inited_(false),
fetcher_(),
sql_queryer_(),
baseline_loader_(),
incremental_replayer_(),
fetcher_dispatcher_()
@ -56,7 +56,7 @@ int ObLogMetaDataService::init(
const ClientFetchingMode fetching_mode,
const share::ObBackupPathString &archive_dest,
IObLogSysLsTaskHandler *sys_ls_handler,
ObISQLClient *proxy,
common::ObMySQLProxy *proxy,
IObLogErrHandler *err_handler,
const int64_t cluster_id,
const ObLogConfig &cfg,
@ -67,8 +67,6 @@ int ObLogMetaDataService::init(
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_ERROR("init twice", KR(ret));
} else if (OB_FAIL(sql_queryer_.init(cluster_id, false, *proxy))) {
LOG_ERROR("ObLogMetaDataSQLQueryer init fail", KR(ret));
} else if (OB_FAIL(baseline_loader_.init(cfg))) {
LOG_ERROR("ObLogMetaDataBaselineLoader init fail", KR(ret));
} else if (OB_FAIL(incremental_replayer_.init())) {
@ -93,7 +91,6 @@ void ObLogMetaDataService::destroy()
{
if (IS_INIT) {
fetcher_.destroy();
sql_queryer_.destroy();
baseline_loader_.destroy();
incremental_replayer_.destroy();
fetcher_dispatcher_.destroy();
@ -125,7 +122,7 @@ int ObLogMetaDataService::refresh_baseline_meta_data(
LOG_ERROR("log_meta_data_service get_data_dict_in_log_info failed", KR(ret), K(tenant_id));
} else {
ISTAT("get_data_dict_in_log_info success", K(tenant_id), K(start_timestamp_ns), K(data_dict_in_log_info));
start_parameters.reset(data_dict_in_log_info.snapshot_scn_, start_timestamp_ns, data_dict_in_log_info);
start_parameters.reset(data_dict_in_log_info.snapshot_scn_, std::max(start_timestamp_ns, data_dict_in_log_info.end_scn_), data_dict_in_log_info);
}
if (OB_SUCC(ret)) {
@ -260,14 +257,18 @@ int ObLogMetaDataService::get_data_dict_in_log_info_in_archive_(
if (target_index >= item_arr_size) {
ret = OB_ENTRY_NOT_EXIST;
LOG_ERROR("not datadict metainfo item older than start_timestamp_ns", K(target_index), K(data_dict_meta_info),
LOG_ERROR("can't find datadict snapshot_scn older than start_timestamp_ns", K(target_index), K(data_dict_meta_info),
K(start_timestamp_ns));
} else {
datadict::ObDataDictMetaInfoItem data_dict_item;
data_dict_item = item_arr.at(target_index);
const int64_t end_scn_val = (data_dict_item.end_scn_ == share::OB_INVALID_SCN_VAL) ? start_timestamp_ns : data_dict_item.end_scn_;
data_dict_in_log_info.reset(data_dict_item.snapshot_scn_,
end_scn_val,
palf::LSN(data_dict_item.start_lsn_),
palf::LSN(data_dict_item.end_lsn_));
LOG_INFO("get_data_dict_in_log_info_in_archive_ succ", K(data_dict_meta_info), K(start_timestamp_ns), K(data_dict_in_log_info));
}
}
}
@ -329,7 +330,7 @@ int ObLogMetaDataService::read_meta_info_in_archive_log_(
LOG_WARN("data dict info is not valid", KR(ret), K(data_dict_meta_info));
} else {
const uint64_t tenant_id = data_dict_meta_info.get_tenant_id();
LOG_INFO("read_meta_info_in_archive_log success", K(tenant_id), K(data_dict_meta_info));
LOG_INFO("read_meta_info_in_archive_log success", K(tenant_id), K(start_timestamp_ns), K(data_dict_meta_info));
}
}
}
@ -343,7 +344,7 @@ int ObLogMetaDataService::read_meta_info_in_archive_log_(
int ObLogMetaDataService::get_data_dict_in_log_info_(
const uint64_t tenant_id,
const int64_t start_timstamp_ns,
const int64_t start_timestamp_ns,
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info)
{
int ret = OB_SUCCESS;
@ -355,39 +356,43 @@ int ObLogMetaDataService::get_data_dict_in_log_info_(
ret = OB_NOT_INIT;
LOG_ERROR("ObLogMetaDataService is not initialized", KR(ret));
} else {
if (is_direct_fetching_mode(fetching_mode)) {
while (OB_SUCC(ret) && ! done) {
if (OB_FAIL(get_data_dict_in_log_info_in_archive_(start_timstamp_ns, data_dict_in_log_info))) {
LOG_WARN("get_data_dict_in_log_info_in_archive_ failed", KR(ret), K(start_timstamp_ns), K(tenant_id));
while (! done) {
if (is_direct_fetching_mode(fetching_mode)) {
if (OB_FAIL(get_data_dict_in_log_info_in_archive_(start_timestamp_ns, data_dict_in_log_info))) {
LOG_WARN("get_data_dict_in_log_info_in_archive_ failed", KR(ret), K(start_timestamp_ns), K(tenant_id), K(data_dict_in_log_info));
} else {
done = true;
}
} else if (is_integrated_fetching_mode(fetching_mode)) {
common::ObMySQLProxy& sql_proxy = TCTX.is_tenant_sync_mode() ? TCTX.mysql_proxy_.get_ob_mysql_proxy() : TCTX.tenant_sql_proxy_.get_ob_mysql_proxy();
ObLogMetaDataSQLQueryer sql_queryer(start_timestamp_ns, sql_proxy);
if (OB_FAIL(ret)) {
ret = OB_SUCCESS;
ob_usleep(100L * 1000L);
}
}
} else if (is_integrated_fetching_mode(fetching_mode)) {
while (OB_SUCC(ret) && ! done) {
if (OB_FAIL(sql_queryer_.get_data_dict_in_log_info(tenant_id, start_timstamp_ns, record_count, data_dict_in_log_info))) {
LOG_WARN("sql_queryer_ get_data_dict_in_log_info fail", KR(ret), K(tenant_id), K(start_timstamp_ns),
K(record_count), K(data_dict_in_log_info));
} else if (0 == record_count) {
ret = OB_NEED_RETRY;
LOG_WARN("No valid baseline data is currently available, need retry", KR(ret), K(tenant_id), K(start_timstamp_ns));
if (OB_FAIL(sql_queryer.get_data_dict_in_log_info(tenant_id, start_timestamp_ns, data_dict_in_log_info))) {
LOG_WARN("sql_queryer get_data_dict_in_log_info fail", KR(ret), K(tenant_id), K(start_timestamp_ns), K(data_dict_in_log_info));
} else {
done = true;
}
if (OB_FAIL(ret)) {
ret = OB_SUCCESS;
ob_usleep(100L * 1000L);
}
} else {
ret = OB_NOT_SUPPORTED;
done = true;
LOG_ERROR("[FATAL]fetching mode of TCTX is not valid", KR(ret), K(fetching_mode), K(tenant_id));
}
if (OB_SUCC(ret) || done) {
} else if (OB_ENTRY_NOT_EXIST == ret) {
done = true;
LOG_ERROR("[FATAL][DATA_DICT] Can't find suitable data_dict to launch OBCDC", KR(ret), K(start_timestamp_ns));
} else {
const static int64_t RETRY_FUNC_PRINT_INTERVAL = 10 * _SEC_;
const int64_t sleep_usec_on_error = 100 * _MSEC_;
if (REACH_TIME_INTERVAL(RETRY_FUNC_PRINT_INTERVAL)) {
LOG_WARN("[DATA_DICT] retry get data_dict_in_log_info", KR(ret));
} else {
LOG_TRACE("[DATA_DICT] retry get data_dict_in_log_info", KR(ret));
}
ret = OB_SUCCESS;
ob_usleep(sleep_usec_on_error);
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("fetching mode of TCTX is not valid", KR(ret), K(fetching_mode), K(tenant_id));
}
}

View File

@ -12,7 +12,6 @@
#define OCEANBASE_LOG_META_DATA_SERVICE_H_
#include "common/ob_region.h"
#include "lib/mysqlclient/ob_isql_client.h" // ObISQLClient
#include "lib/allocator/ob_concurrent_fifo_allocator.h" // ObConcurrentFIFOAllocator
#include "share/backup/ob_backup_struct.h" // ObBackupPathString
#include "logservice/data_dictionary/ob_data_dict_meta_info.h" // ObDataDictMetaInfo
@ -21,7 +20,6 @@
#include "ob_log_task_pool.h"
#include "ob_log_entry_task_pool.h"
#include "logservice/logfetcher/ob_log_data_dictionary_in_log_table.h"
#include "ob_log_meta_data_queryer.h"
#include "ob_log_meta_data_baseline_loader.h"
#include "ob_log_meta_data_replayer.h" // ObLogMetaDataReplayer
#include "ob_log_meta_data_fetcher.h" // ObLogMetaDataFetcher
@ -29,6 +27,10 @@
namespace oceanbase
{
namespace common
{
class ObMySQLProxy;
}
namespace libobcdc
{
class IObLogSysLsTaskHandler;
@ -48,7 +50,7 @@ public:
const ClientFetchingMode fetching_mode,
const share::ObBackupPathString &archive_dest,
IObLogSysLsTaskHandler *sys_ls_handler,
common::ObISQLClient *proxy,
common::ObMySQLProxy *proxy,
IObLogErrHandler *err_handler,
const int64_t cluster_id,
const ObLogConfig &cfg,
@ -112,7 +114,6 @@ private:
private:
bool is_inited_;
ObLogMetaDataFetcher fetcher_;
ObLogMetaDataSQLQueryer sql_queryer_;
ObLogMetaDataBaselineLoader baseline_loader_;
ObLogMetaDataReplayer incremental_replayer_;
ObLogMetaDataFetcherDispatcher fetcher_dispatcher_;

View File

@ -25,6 +25,7 @@ struct DataDictionaryInLogInfo
bool is_valid() const
{
return common::OB_INVALID_TIMESTAMP != snapshot_scn_
&& common::OB_INVALID_TIMESTAMP != end_scn_
&& start_lsn_.is_valid()
&& end_lsn_.is_valid();
}
@ -32,16 +33,19 @@ struct DataDictionaryInLogInfo
void reset()
{
snapshot_scn_ = common::OB_INVALID_TIMESTAMP;
end_scn_ = common::OB_INVALID_TIMESTAMP;
start_lsn_.reset();
end_lsn_.reset();
}
void reset(
const int64_t snapshot_scn,
const int64_t end_scn,
const palf::LSN &start_lsn,
const palf::LSN &end_lsn)
{
snapshot_scn_ = snapshot_scn;
end_scn_ = end_scn;
start_lsn_ = start_lsn;
end_lsn_ = end_lsn;
}
@ -49,6 +53,7 @@ struct DataDictionaryInLogInfo
DataDictionaryInLogInfo &operator=(const DataDictionaryInLogInfo &other)
{
snapshot_scn_ = other.snapshot_scn_;
end_scn_ = other.end_scn_;
start_lsn_ = other.start_lsn_;
end_lsn_ = other.end_lsn_;
return *this;
@ -56,10 +61,12 @@ struct DataDictionaryInLogInfo
TO_STRING_KV(
K_(snapshot_scn),
K_(end_scn),
K_(start_lsn),
K_(end_lsn));
int64_t snapshot_scn_;
int64_t end_scn_;
palf::LSN start_lsn_;
palf::LSN end_lsn_;
};

View File

@ -15,6 +15,7 @@
#include "gtest/gtest.h"
#include "lib/oblog/ob_log.h"
#include "logservice/palf/log_define.h"
#include "share/ob_errno.h"
#define private public
#include "logservice/data_dictionary/ob_data_dict_meta_info.h"
@ -36,12 +37,15 @@ TEST(ObDataDictMetaInfoItem, test_data_dict_meta_info_item)
int64_t pos = 0;
DataDictMetaInfoItemArr item_arr;
for (int64_t i = 0; i < item_cnt; i++) {
item.reset(random.get(), random.get(), random.get());
item.reset(random.get(), random.get(), random.get(), random.get());
EXPECT_EQ(OB_SUCCESS, item_arr.push_back(item));
EXPECT_EQ(OB_SUCCESS, item.serialize(buf, buf_size, pos));
}
int64_t deserialize_pos = 0;
EXPECT_EQ(common::OB_VERSION_NOT_MATCH, item.deserialize(buf, buf_size, deserialize_pos));
deserialize_pos = 0;
for (int64_t i = 0; i < item_cnt; i++) {
item.set_version(datadict::ObDataDictMetaInfoHeader::DATADICT_METAINFO_META_VERSION);
EXPECT_EQ(OB_SUCCESS, item.deserialize(buf, buf_size, deserialize_pos));
EXPECT_EQ(item, item_arr.at(i));
}
@ -90,7 +94,8 @@ TEST(ObDataDictMetaInfo, test_data_dict_meta_info)
while (have_enough_buffer ) {
item.reset(random.get(0, max_scn_val),
random.get(0, palf::LOG_MAX_LSN_VAL),
random.get(0, palf::LOG_MAX_LSN_VAL));
random.get(0, palf::LOG_MAX_LSN_VAL),
random.get(0, max_scn_val));
if (total_item_data_size + item.get_serialize_size() <= buffer_size - header_size) {
item_count_expected++;
total_item_data_size += item.get_serialize_size();