diff --git a/src/logservice/archiveservice/ob_ls_meta_recorder.cpp b/src/logservice/archiveservice/ob_ls_meta_recorder.cpp index fcdebd307..e5557ab92 100644 --- a/src/logservice/archiveservice/ob_ls_meta_recorder.cpp +++ b/src/logservice/archiveservice/ob_ls_meta_recorder.cpp @@ -47,6 +47,7 @@ && common::OB_ENTRY_NOT_EXIST != ret) { \ ARCHIVE_LOG(WARN, "check and get record failed", K(ret)); \ } else if (common::OB_SUCCESS == ret \ + && record_context.last_record_round_ == key.round_ \ && common::ObTimeUtility::current_time_ns() - record_context.last_record_scn_.convert_to_ts()*1000L < interval) { \ } else if (OB_FAIL(t.get_ls_array(array))) { \ ARCHIVE_LOG(WARN, "get ls array failed", K(ret), K(task_type)); \ diff --git a/src/logservice/data_dictionary/ob_data_dict_meta_info.cpp b/src/logservice/data_dictionary/ob_data_dict_meta_info.cpp index 953ff2150..444c1a103 100644 --- a/src/logservice/data_dictionary/ob_data_dict_meta_info.cpp +++ b/src/logservice/data_dictionary/ob_data_dict_meta_info.cpp @@ -43,19 +43,23 @@ ObDataDictMetaInfoItem::~ObDataDictMetaInfoItem() void ObDataDictMetaInfoItem::reset() { + version_ = 0; snapshot_scn_ = share::OB_INVALID_SCN_VAL; start_lsn_ = palf::LOG_INVALID_LSN_VAL; end_lsn_ = palf::LOG_INVALID_LSN_VAL; + end_scn_ = share::OB_INVALID_SCN_VAL; } void ObDataDictMetaInfoItem::reset( const uint64_t snapshot_scn, const uint64_t start_lsn, - const uint64_t end_lsn) + const uint64_t end_lsn, + const int64_t end_scn) { snapshot_scn_ = snapshot_scn; start_lsn_ = start_lsn; end_lsn_ = end_lsn; + end_scn_ = end_scn; } DEFINE_SERIALIZE(ObDataDictMetaInfoItem) @@ -65,7 +69,8 @@ DEFINE_SERIALIZE(ObDataDictMetaInfoItem) LST_DO_CODE(OB_UNIS_ENCODE, snapshot_scn_, start_lsn_, - end_lsn_); + end_lsn_, + end_scn_); return ret; } @@ -74,10 +79,26 @@ DEFINE_DESERIALIZE(ObDataDictMetaInfoItem) { int ret = OB_SUCCESS; - LST_DO_CODE(OB_UNIS_DECODE, - snapshot_scn_, - start_lsn_, - end_lsn_); + if (OB_UNLIKELY(version_ < 1)) { + ret = OB_VERSION_NOT_MATCH; + DDLOG(WARN, "expect valid version while deserializing DictMetaInfo", KR(ret), K_(version)); + } else { + LST_DO_CODE(OB_UNIS_DECODE, + snapshot_scn_, + start_lsn_, + end_lsn_); + + if (OB_FAIL(ret)) { + } else { + // deserialize field added in version = 2 + if (version_ > 1) { + if (OB_FAIL(common::serialization::decode(buf, data_len, pos, end_scn_))) { + DDLOG(WARN, "deserialize end_scn field failed", KR(ret), K(data_len), K(pos), K(version_)); + } + } + // add deserialize logic here for newer version. + } + } return ret; } @@ -89,7 +110,8 @@ DEFINE_GET_SERIALIZE_SIZE(ObDataDictMetaInfoItem) LST_DO_CODE(OB_UNIS_ADD_LEN, snapshot_scn_, start_lsn_, - end_lsn_); + end_lsn_, + end_scn_); return len; } @@ -332,6 +354,8 @@ DEFINE_DESERIALIZE(ObDataDictMetaInfo) } for (int32_t i = 0; OB_SUCC(ret) && i < item_cnt && pos < data_len; i++) { ObDataDictMetaInfoItem item; + item.set_version(header_.get_meta_version()); + if (OB_FAIL(item.deserialize(buf, data_len, pos))) { DDLOG(WARN, "datadict metainfo item deserialize failed", K(ret), K(buf), K(data_len), K(pos)); @@ -346,7 +370,7 @@ DEFINE_DESERIALIZE(ObDataDictMetaInfo) /////////////////////////////////// MetaInfoQueryHelper /////////////////////////////////// const char *MetaInfoQueryHelper::QUERY_META_INFO_SQL_STR = - "SELECT snapshot_scn, start_lsn, end_lsn FROM %s ORDER BY snapshot_scn DESC;"; + "SELECT snapshot_scn, ora_rowscn as end_scn, start_lsn, end_lsn FROM %s ORDER BY snapshot_scn DESC;"; const char *MetaInfoQueryHelper::DATA_DICT_META_TABLE_NAME = share::OB_ALL_DATA_DICTIONARY_IN_LOG_TNAME; @@ -443,8 +467,8 @@ int MetaInfoQueryHelper::get_data_dict_meta_info_(const share::SCN &base_scn, Da SMART_VAR(ObISQLClient::ReadResult, result) { if (OB_FAIL(sql.assign_fmt(QUERY_META_INFO_SQL_STR, DATA_DICT_META_TABLE_NAME))) { - DDLOG(WARN, "assign format to sqlstring failed", K(ret), K(QUERY_META_INFO_SQL_STR), - K(DATA_DICT_META_TABLE_NAME)); + DDLOG(WARN, "assign format to sqlstring failed", KR(ret), KCSTRING(QUERY_META_INFO_SQL_STR), + KCSTRING(DATA_DICT_META_TABLE_NAME)); } else if (OB_FAIL(sql_proxy_.read(result, tenant_id_, sql.ptr()))) { DDLOG(WARN, "sql proxy failed to read result when querying datadict metainfo", K(ret), K(tenant_id_), "sql", sql.ptr()); @@ -484,10 +508,12 @@ int MetaInfoQueryHelper::parse_record_from_result_( // | Field | Type | // +--------------+---------------------+ // | snapshot_scn | bigint(20) unsigned | + // | end_scn | bigint(20) unsigned | // | start_lsn | bigint(20) unsigned | // | end_lsn | bigint(20) unsigned | // +--------------+---------------------+ EXTRACT_UINT_FIELD_MYSQL(result, "snapshot_scn", item.snapshot_scn_, uint64_t); + EXTRACT_INT_FIELD_MYSQL(result, "end_scn", item.end_scn_, int64_t); EXTRACT_UINT_FIELD_MYSQL(result, "start_lsn", item.start_lsn_, uint64_t); EXTRACT_UINT_FIELD_MYSQL(result, "end_lsn", item.end_lsn_, uint64_t); // only use record generated after base_scn. diff --git a/src/logservice/data_dictionary/ob_data_dict_meta_info.h b/src/logservice/data_dictionary/ob_data_dict_meta_info.h index da7da9403..ebcb792ef 100644 --- a/src/logservice/data_dictionary/ob_data_dict_meta_info.h +++ b/src/logservice/data_dictionary/ob_data_dict_meta_info.h @@ -36,21 +36,30 @@ public: void reset( const uint64_t snapshot_scn, const uint64_t start_lsn, - const uint64_t end_lsn); + const uint64_t end_lsn, + const int64_t end_scn); bool operator==(const ObDataDictMetaInfoItem &that) const { return snapshot_scn_ == that.snapshot_scn_ && start_lsn_ == that.start_lsn_ && - end_lsn_ == that.end_lsn_; + end_lsn_ == that.end_lsn_ && + end_scn_ == that.end_scn_; } + void set_version(const int16_t version) { version_ = version; } + NEED_SERIALIZE_AND_DESERIALIZE; - TO_STRING_KV(K_(snapshot_scn), K_(start_lsn), K_(end_lsn)); + TO_STRING_KV(K_(snapshot_scn), K_(end_scn), K_(start_lsn), K_(end_lsn)); public: + // Decide deserialize behavious; + // set while deserialize ObDataDictMetaInfo, value should be same with meta_version_ in ObDataDictMetaInfoHeader; + // NOTICE: SHOULD NOT SERIALIZE THIS FIELD. + int16_t version_; uint64_t snapshot_scn_; uint64_t start_lsn_; uint64_t end_lsn_; + int64_t end_scn_; // generate from ora_rowscn(int64_t), add at meta_version = 2; }; typedef ObSEArray DataDictMetaInfoItemArr; @@ -59,7 +68,7 @@ class ObDataDictMetaInfoHeader { public: static const int16_t DATADICT_METAINFO_HEADER_MAGIC = 0x4444; // hex of "DD" - static const int64_t DATADICT_METAINFO_META_VERSION = 0x0001; // version = 1 + static const int64_t DATADICT_METAINFO_META_VERSION = 0x0002; // version = 1 public: ObDataDictMetaInfoHeader(); ~ObDataDictMetaInfoHeader(); @@ -73,7 +82,7 @@ public: const int64_t data_size); uint64_t get_tenant_id() const { return tenant_id_; } - OB_INLINE int32_t get_meta_version() const { + OB_INLINE int16_t get_meta_version() const { return meta_version_; } OB_INLINE int32_t get_item_count() const { @@ -132,7 +141,7 @@ public: int64_t get_data_size() const { return header_.get_data_size(); } - int32_t get_meta_version() const { + int16_t get_meta_version() const { return header_.get_meta_version(); } diff --git a/src/logservice/libobcdc/src/ob_cdc_tenant_query.h b/src/logservice/libobcdc/src/ob_cdc_tenant_query.h index 68fc1b89d..aabe40916 100644 --- a/src/logservice/libobcdc/src/ob_cdc_tenant_query.h +++ b/src/logservice/libobcdc/src/ob_cdc_tenant_query.h @@ -28,6 +28,7 @@ public: virtual ~ObCDCQueryResult() { affect_rows_ = 0; } public: OB_INLINE void inc_affect_rows() { affect_rows_++; } + OB_INLINE int64_t get_affect_rows() const { return affect_rows_; } OB_INLINE bool has_data() const { return affect_rows_ > 0; } OB_INLINE bool is_empty() const { return ! has_data(); } OB_INLINE T& get_data() const { return data_; } @@ -81,7 +82,7 @@ int ObCDCTenantQuery::query(const uint64_t tenant_id, ObCDCQueryResult &qu } else { bool query_done = false; int retry_cnt = 0; - const int64_t retry_fail_sleep_time = 1 * _MSEC_; + const int64_t retry_fail_sleep_time = 100 * _MSEC_; const int64_t retry_warn_interval = 5 * _SEC_; const int64_t start_time = get_timestamp(); const int64_t end_time = start_time + retry_timeout; diff --git a/src/logservice/libobcdc/src/ob_log_config.h b/src/logservice/libobcdc/src/ob_log_config.h index 569800899..479fa4057 100644 --- a/src/logservice/libobcdc/src/ob_log_config.h +++ b/src/logservice/libobcdc/src/ob_log_config.h @@ -135,7 +135,7 @@ public: "part trans task prealloc page count"); // Log_level=INFO in the startup scenario, and then optimize the schema to WARN afterwards DEF_STR(init_log_level, OB_CLUSTER_PARAMETER, "ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:INFO", "log level: DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR"); - DEF_STR(log_level, OB_CLUSTER_PARAMETER, "ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:WARN", "log level: DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR"); + DEF_STR(log_level, OB_CLUSTER_PARAMETER, "ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:WARN;CLOG.*:WARN;STORAGE.*:WARN;ARCHIVE.*:WARN", "log level: DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR"); // root server info for oblog, seperated by `;` between multi rootserver, a root server info format as `ip:rpc_port:sql_port` DEF_STR(rootserver_list, OB_CLUSTER_PARAMETER, "|", "OB RootServer list"); DEF_STR(cluster_url, OB_CLUSTER_PARAMETER, "|", "OB configure url"); diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index c2cddd80c..b79c5fbdd 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -1278,6 +1278,10 @@ int ObLogInstance::config_tenant_mgr_(const int64_t start_tstamp_ns, GET_SCHEMA_TIMEOUT_ON_START_UP, add_tenant_succ))) { LOG_ERROR("add_tenant fail", KR(ret), K(start_tstamp_ns), K(sys_schema_version)); + } else if (! add_tenant_succ) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("[FATAL] [LAUNCH] ADD_TENANT WITH NO ALIVE SERVER MODE FAILED", KR(ret), + K(start_tstamp_ns), K_(refresh_mode), K_(fetching_mode)); } } else { if (OB_FAIL(tenant_mgr_->add_all_tenants( diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_queryer.cpp b/src/logservice/libobcdc/src/ob_log_meta_data_queryer.cpp index 30145a7bb..dd1faac35 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_queryer.cpp +++ b/src/logservice/libobcdc/src/ob_log_meta_data_queryer.cpp @@ -14,7 +14,7 @@ #include "ob_log_instance.h" // TCTX #include "ob_log_meta_data_queryer.h" -#include "lib/mysqlclient/ob_isql_client.h" // ObISQLClient +#include "lib/mysqlclient/ob_mysql_proxy.h" // ObISQLClient #include "lib/mysqlclient/ob_mysql_result.h" // ObMySQLResult #include "lib/string/ob_sql_string.h" // ObSqlString #include "share/inner_table/ob_inner_table_schema_constants.h" // OB_***_TNAME @@ -24,201 +24,64 @@ namespace oceanbase { namespace libobcdc { -ObLogMetaDataSQLQueryer::ObLogMetaDataSQLQueryer() : - is_inited_(false), - is_across_cluster_(false), - cluster_id_(OB_INVALID_CLUSTER_ID), - sql_proxy_(NULL) -{ -} +// TODO: Currently query ORA_ROWSCN from inner_table, however not support by oracle tenant in obcdc tenant_sync_mode. +// Wait observer support query ORA_ROWSCN by DBA VIEW +// and DBA_OB_DATA_DICTIONARY_IN_LOG VIEW will add column support query ORA_ROWSCN as END_SCN; +// TODO: to compatible with oracle mode in tenant_sync_mode, use REPORT_TIME as end_scn for temporary; +const char* ObLogMetaDataSQLQueryer::QUERY_SQL_FORMAT = "SELECT SNAPSHOT_SCN, %s AS END_SCN, START_LSN, END_LSN " + "FROM %s WHERE SNAPSHOT_SCN <= %lu ORDER BY SNAPSHOT_SCN DESC %s"; -ObLogMetaDataSQLQueryer::~ObLogMetaDataSQLQueryer() -{ - destroy(); -} - -int ObLogMetaDataSQLQueryer::init( - const int64_t cluster_id, - const bool is_across_cluster, - common::ObISQLClient &sql_proxy) +int ObLogMetaDataSQLQueryer::get_data_dict_in_log_info( + const uint64_t tenant_id, + const int64_t start_timestamp_ns, + logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info) { int ret = OB_SUCCESS; + const static int64_t QUERY_TIMEOUT = 1 * _MIN_; + ObCDCQueryResult query_result(data_dict_in_log_info); + const uint64_t query_tenant_id = TCTX.is_tenant_sync_mode() ? OB_SYS_TENANT_ID : tenant_id; - if (IS_INIT) { - ret = OB_INIT_TWICE; - LOG_ERROR("ObLogMetaDataSQLQueryer has inited", KR(ret)); + if (OB_FAIL(query(query_tenant_id, query_result, QUERY_TIMEOUT))) { + LOG_ERROR("query data dict meta info failed", KR(ret), K(tenant_id), K(query_tenant_id)); + } else if (query_result.is_empty()) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("can't find datadict snapshot_scn older than start_timestamp_ns", KR(ret), K(tenant_id), K(start_timestamp_ns)); } else { - is_across_cluster_ = is_across_cluster; - cluster_id_ = cluster_id; - sql_proxy_ = &sql_proxy; - is_inited_ = true; - - LOG_INFO("ObLogMetaDataSQLQueryer init success", K(cluster_id), K(is_across_cluster)); + LOG_INFO("get_data_dict_in_log_info succ", K(tenant_id), K(start_timestamp_ns), K(data_dict_in_log_info)); } return ret; } -void ObLogMetaDataSQLQueryer::destroy() -{ - is_inited_ = false; - is_across_cluster_ = false; - cluster_id_ = OB_INVALID_CLUSTER_ID; - sql_proxy_ = NULL; -} - -// Such as: -// select snapshot_scn, start_lsn, end_lsn from __all_virtual_data_dictionary_in_log where tenant_id=1004 and snapshot_scn <= 1669963370971342544 ORDER BY snapshot_scn DESC limit 1; -int ObLogMetaDataSQLQueryer::get_data_dict_in_log_info( - const uint64_t tenant_id, - const int64_t start_timstamp_ns, - int64_t &record_count, - logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info) +int ObLogMetaDataSQLQueryer::build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql) { int ret = OB_SUCCESS; const bool is_oracle_mode = TCTX.mysql_proxy_.is_oracle_mode(); - const char *select_fields = "SNAPSHOT_SCN, START_LSN, END_LSN"; - const char *limit_expr = is_oracle_mode ? "FETCH FIRST 1 ROWS ONLY" : "LIMIT 1"; - // __all_virtual_data_dictionary_in_log - const char *data_dictionary_in_log_name = share::OB_ALL_VIRTUAL_DATA_DICTIONARY_IN_LOG_TNAME; - uint64_t query_tenent_id = OB_INVALID_TENANT_ID; + const char* limit_expr = is_oracle_mode ? "FETCH FIRST 1 ROWS ONLY" : "LIMIT 1"; + const char* end_scn = is_oracle_mode ? "TIMESTAMP_TO_SCN(REPORT_TIME)" : "ORA_ROWSCN"; + const char* table_name = is_oracle_mode ? share::OB_DBA_OB_DATA_DICTIONARY_IN_LOG_TNAME : share::OB_ALL_DATA_DICTIONARY_IN_LOG_TNAME; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObLogMetaDataSQLQueryer is not initialized", KR(ret)); - } else if (OB_INVALID_TENANT_ID == tenant_id - || OB_INVALID_TIMESTAMP == start_timstamp_ns) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), KT(tenant_id), K(start_timstamp_ns)); - } else { - ObSqlString sql; - record_count = 0; - - if (TCTX.is_tenant_sync_mode()) { - if (OB_FAIL(sql.assign_fmt( - "SELECT %s FROM %s" - " WHERE SNAPSHOT_SCN <= %lu" - " ORDER BY SNAPSHOT_SCN DESC %s", - select_fields, share::OB_DBA_OB_DATA_DICTIONARY_IN_LOG_TNAME, - start_timstamp_ns, limit_expr))) { - LOG_WARN("assign sql string failed", KR(ret), K(tenant_id), K(is_oracle_mode)); - } else { - query_tenent_id = tenant_id; - } - } else { - // OB4.1 doesn't have CDB_OB_DATA_DICTIONARY_IN_LOG VIEW,should only query with virtual table - if (OB_FAIL(sql.assign_fmt( - "SELECT %s FROM %s" - " WHERE TENANT_ID = %lu AND SNAPSHOT_SCN <= %lu" - " ORDER BY SNAPSHOT_SCN DESC LIMIT 1", - select_fields, share::OB_ALL_VIRTUAL_DATA_DICTIONARY_IN_LOG_TNAME, - tenant_id, start_timstamp_ns))) { - LOG_WARN("assign sql string failed", KR(ret), K(tenant_id)); - } else { - // Use OB_SYS_TENANT_ID to query - query_tenent_id = OB_SYS_TENANT_ID; - } - } - - if (OB_SUCC(ret)) { - SMART_VAR(ObISQLClient::ReadResult, result) { - if (OB_FAIL(do_query_(OB_SYS_TENANT_ID, sql, result))) { - LOG_WARN("do_query_ failed", KR(ret), K(cluster_id_), K(tenant_id), "sql", sql.ptr()); - } else if (OB_ISNULL(result.get_result())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get mysql result failed", KR(ret)); - } else if (OB_FAIL(get_records_template_( - *result.get_result(), - data_dict_in_log_info, - "DataDictionaryInLogInfo", - record_count))) { - LOG_WARN("construct data_dict_in_log_info failed", KR(ret), K(data_dict_in_log_info)); - } - } // SMART_VAR - } + if (OB_FAIL(sql.assign_fmt(QUERY_SQL_FORMAT, end_scn, table_name, start_timstamp_ns_, limit_expr))) { + LOG_ERROR("format sql failed", KR(ret), K(tenant_id), K(is_oracle_mode), KCSTRING(limit_expr), KCSTRING(table_name)); } return ret; } -int ObLogMetaDataSQLQueryer::do_query_( - const uint64_t tenant_id, - ObSqlString &sql, - ObISQLClient::ReadResult &result) +int ObLogMetaDataSQLQueryer::parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult &result) { int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObLogMetaDataSQLQueryer not init", KR(ret)); - } else if (OB_ISNULL(sql_proxy_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("SqlProxy is NULL", KR(ret)); - } else if (is_across_cluster_) { - if (OB_FAIL(sql_proxy_->read(result, cluster_id_, tenant_id, sql.ptr()))) { - LOG_WARN("execute sql failed", KR(ret), K(cluster_id_), K(tenant_id), "sql", sql.ptr()); - } - } else { - if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { - LOG_WARN("execute sql failed", KR(ret), K(tenant_id), "sql", sql.ptr()); - } - } - - return ret; -} - -template -int ObLogMetaDataSQLQueryer::get_records_template_( - common::sqlclient::ObMySQLResult &res, - RecordsType &records, - const char *event, - int64_t &record_count) -{ - int ret = OB_SUCCESS; - - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - LOG_WARN("ObLogMetaDataSQLQueryer not init", KR(ret)); - } else { - record_count = 0; - - while (OB_SUCC(ret)) { - if (OB_FAIL(res.next())) { - if (OB_ITER_END == ret) { - // End of iteration - } else { - LOG_WARN("get next result failed", KR(ret), K(event)); - } - } else if (OB_FAIL(parse_record_from_row_(res, records))) { - LOG_WARN("parse_record_from_row_ failed", KR(ret), K(records)); - } else { - record_count++; - } - } // while - - if (OB_ITER_END == ret) { - ret = OB_SUCCESS; - } - } - - return ret; -} - -int ObLogMetaDataSQLQueryer::parse_record_from_row_( - common::sqlclient::ObMySQLResult &res, - logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info) -{ - int ret = OB_SUCCESS; int64_t snapshot_scn_int = 0; + int64_t end_scn_int = 0; int64_t start_lsn_int = 0; int64_t end_lsn_int = 0; - - (void)GET_COL_IGNORE_NULL(res.get_int, "SNAPSHOT_SCN", snapshot_scn_int); - (void)GET_COL_IGNORE_NULL(res.get_int, "START_LSN", start_lsn_int); - (void)GET_COL_IGNORE_NULL(res.get_int, "END_LSN", end_lsn_int); - - data_dict_in_log_info.reset(snapshot_scn_int, palf::LSN(start_lsn_int), palf::LSN(end_lsn_int)); + EXTRACT_UINT_FIELD_MYSQL(sql_result, "SNAPSHOT_SCN", snapshot_scn_int, int64_t); + EXTRACT_UINT_FIELD_MYSQL(sql_result, "END_SCN", end_scn_int, int64_t); + EXTRACT_UINT_FIELD_MYSQL(sql_result, "START_LSN", start_lsn_int, int64_t); + EXTRACT_UINT_FIELD_MYSQL(sql_result, "END_LSN", end_lsn_int, int64_t); + result.get_data().reset(snapshot_scn_int, end_scn_int, palf::LSN(start_lsn_int), palf::LSN(end_lsn_int)); return ret; } diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_queryer.h b/src/logservice/libobcdc/src/ob_log_meta_data_queryer.h index f9786867f..6955fb9a7 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_queryer.h +++ b/src/logservice/libobcdc/src/ob_log_meta_data_queryer.h @@ -13,13 +13,14 @@ #ifndef OCEANBASE_OB_LOG_META_SQL_QUERYER_H_ #define OCEANBASE_OB_LOG_META_SQL_QUERYER_H_ -#include "lib/mysqlclient/ob_isql_client.h" // ObISQLClient +#include "ob_cdc_tenant_query.h" #include "logservice/logfetcher/ob_log_data_dictionary_in_log_table.h" namespace oceanbase { namespace common { +class ObMySQLProxy; namespace sqlclient { class ObMySQLResult; @@ -28,50 +29,26 @@ class ObMySQLResult; namespace libobcdc { -class ObLogMetaDataSQLQueryer + +// query OB_SYS_TENANT_ID in tenant_sync_mode and query specific tenant_id in cluster_sync_mode +class ObLogMetaDataSQLQueryer : public ObCDCTenantQuery { public: - ObLogMetaDataSQLQueryer(); - virtual ~ObLogMetaDataSQLQueryer(); - int init( - const int64_t cluster_id, - const bool is_across_cluster, - common::ObISQLClient &sql_proxy); - bool is_inited() const { return is_inited_; } - void destroy(); - + ObLogMetaDataSQLQueryer(const int64_t start_timstamp_ns, common::ObMySQLProxy &sql_proxy) + : ObCDCTenantQuery(sql_proxy), start_timstamp_ns_(start_timstamp_ns) {} + ~ObLogMetaDataSQLQueryer() { start_timstamp_ns_ = OB_INVALID_TIMESTAMP; } public: int get_data_dict_in_log_info( const uint64_t tenant_id, - const int64_t start_timstamp_ns, - int64_t &record_count, + const int64_t start_timestamp_ns, logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info); - private: - int do_query_(const uint64_t tenant_id, - ObSqlString &sql, - ObISQLClient::ReadResult &result); - - template - int get_records_template_( - common::sqlclient::ObMySQLResult &res, - RecordsType &records, - const char *event, - int64_t &record_count); - - // logfetcher::DataDictionaryInLogInfo - // @param [in] res, result read from __all_virtual_data_dictionary_in_log - // @param [out] data_dict_in_log_info - int parse_record_from_row_( - common::sqlclient::ObMySQLResult &res, - logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info); - + int build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql) override; + int parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult &result) override; private: - bool is_inited_; // whether this class is inited - bool is_across_cluster_; // whether the SQL query across cluster - int64_t cluster_id_; // ClusterID - common::ObISQLClient *sql_proxy_; // sql_proxy to use - + static const char* QUERY_SQL_FORMAT; +private: + int64_t start_timstamp_ns_; DISALLOW_COPY_AND_ASSIGN(ObLogMetaDataSQLQueryer); }; diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_service.cpp b/src/logservice/libobcdc/src/ob_log_meta_data_service.cpp index 6efa7c18a..fa89cb4b4 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_service.cpp +++ b/src/logservice/libobcdc/src/ob_log_meta_data_service.cpp @@ -13,6 +13,7 @@ #include "ob_log_meta_data_service.h" #include "ob_log_instance.h" #include "ob_log_fetching_mode.h" +#include "ob_log_meta_data_queryer.h" #include "share/backup/ob_archive_struct.h" #include "logservice/restoreservice/ob_log_archive_piece_mgr.h" #include "logservice/data_dictionary/ob_data_dict_meta_info.h" @@ -39,7 +40,6 @@ ObLogMetaDataService &ObLogMetaDataService::get_instance() ObLogMetaDataService::ObLogMetaDataService() : is_inited_(false), fetcher_(), - sql_queryer_(), baseline_loader_(), incremental_replayer_(), fetcher_dispatcher_() @@ -56,7 +56,7 @@ int ObLogMetaDataService::init( const ClientFetchingMode fetching_mode, const share::ObBackupPathString &archive_dest, IObLogSysLsTaskHandler *sys_ls_handler, - ObISQLClient *proxy, + common::ObMySQLProxy *proxy, IObLogErrHandler *err_handler, const int64_t cluster_id, const ObLogConfig &cfg, @@ -67,8 +67,6 @@ int ObLogMetaDataService::init( if (IS_INIT) { ret = OB_INIT_TWICE; LOG_ERROR("init twice", KR(ret)); - } else if (OB_FAIL(sql_queryer_.init(cluster_id, false, *proxy))) { - LOG_ERROR("ObLogMetaDataSQLQueryer init fail", KR(ret)); } else if (OB_FAIL(baseline_loader_.init(cfg))) { LOG_ERROR("ObLogMetaDataBaselineLoader init fail", KR(ret)); } else if (OB_FAIL(incremental_replayer_.init())) { @@ -93,7 +91,6 @@ void ObLogMetaDataService::destroy() { if (IS_INIT) { fetcher_.destroy(); - sql_queryer_.destroy(); baseline_loader_.destroy(); incremental_replayer_.destroy(); fetcher_dispatcher_.destroy(); @@ -125,7 +122,7 @@ int ObLogMetaDataService::refresh_baseline_meta_data( LOG_ERROR("log_meta_data_service get_data_dict_in_log_info failed", KR(ret), K(tenant_id)); } else { ISTAT("get_data_dict_in_log_info success", K(tenant_id), K(start_timestamp_ns), K(data_dict_in_log_info)); - start_parameters.reset(data_dict_in_log_info.snapshot_scn_, start_timestamp_ns, data_dict_in_log_info); + start_parameters.reset(data_dict_in_log_info.snapshot_scn_, std::max(start_timestamp_ns, data_dict_in_log_info.end_scn_), data_dict_in_log_info); } if (OB_SUCC(ret)) { @@ -260,14 +257,18 @@ int ObLogMetaDataService::get_data_dict_in_log_info_in_archive_( if (target_index >= item_arr_size) { ret = OB_ENTRY_NOT_EXIST; - LOG_ERROR("not datadict metainfo item older than start_timestamp_ns", K(target_index), K(data_dict_meta_info), + LOG_ERROR("can't find datadict snapshot_scn older than start_timestamp_ns", K(target_index), K(data_dict_meta_info), K(start_timestamp_ns)); } else { datadict::ObDataDictMetaInfoItem data_dict_item; data_dict_item = item_arr.at(target_index); + const int64_t end_scn_val = (data_dict_item.end_scn_ == share::OB_INVALID_SCN_VAL) ? start_timestamp_ns : data_dict_item.end_scn_; + data_dict_in_log_info.reset(data_dict_item.snapshot_scn_, + end_scn_val, palf::LSN(data_dict_item.start_lsn_), palf::LSN(data_dict_item.end_lsn_)); + LOG_INFO("get_data_dict_in_log_info_in_archive_ succ", K(data_dict_meta_info), K(start_timestamp_ns), K(data_dict_in_log_info)); } } } @@ -329,7 +330,7 @@ int ObLogMetaDataService::read_meta_info_in_archive_log_( LOG_WARN("data dict info is not valid", KR(ret), K(data_dict_meta_info)); } else { const uint64_t tenant_id = data_dict_meta_info.get_tenant_id(); - LOG_INFO("read_meta_info_in_archive_log success", K(tenant_id), K(data_dict_meta_info)); + LOG_INFO("read_meta_info_in_archive_log success", K(tenant_id), K(start_timestamp_ns), K(data_dict_meta_info)); } } } @@ -343,7 +344,7 @@ int ObLogMetaDataService::read_meta_info_in_archive_log_( int ObLogMetaDataService::get_data_dict_in_log_info_( const uint64_t tenant_id, - const int64_t start_timstamp_ns, + const int64_t start_timestamp_ns, logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info) { int ret = OB_SUCCESS; @@ -355,39 +356,43 @@ int ObLogMetaDataService::get_data_dict_in_log_info_( ret = OB_NOT_INIT; LOG_ERROR("ObLogMetaDataService is not initialized", KR(ret)); } else { - if (is_direct_fetching_mode(fetching_mode)) { - while (OB_SUCC(ret) && ! done) { - if (OB_FAIL(get_data_dict_in_log_info_in_archive_(start_timstamp_ns, data_dict_in_log_info))) { - LOG_WARN("get_data_dict_in_log_info_in_archive_ failed", KR(ret), K(start_timstamp_ns), K(tenant_id)); + while (! done) { + if (is_direct_fetching_mode(fetching_mode)) { + if (OB_FAIL(get_data_dict_in_log_info_in_archive_(start_timestamp_ns, data_dict_in_log_info))) { + LOG_WARN("get_data_dict_in_log_info_in_archive_ failed", KR(ret), K(start_timestamp_ns), K(tenant_id), K(data_dict_in_log_info)); } else { done = true; } + } else if (is_integrated_fetching_mode(fetching_mode)) { + common::ObMySQLProxy& sql_proxy = TCTX.is_tenant_sync_mode() ? TCTX.mysql_proxy_.get_ob_mysql_proxy() : TCTX.tenant_sql_proxy_.get_ob_mysql_proxy(); + ObLogMetaDataSQLQueryer sql_queryer(start_timestamp_ns, sql_proxy); - if (OB_FAIL(ret)) { - ret = OB_SUCCESS; - ob_usleep(100L * 1000L); - } - } - } else if (is_integrated_fetching_mode(fetching_mode)) { - while (OB_SUCC(ret) && ! done) { - if (OB_FAIL(sql_queryer_.get_data_dict_in_log_info(tenant_id, start_timstamp_ns, record_count, data_dict_in_log_info))) { - LOG_WARN("sql_queryer_ get_data_dict_in_log_info fail", KR(ret), K(tenant_id), K(start_timstamp_ns), - K(record_count), K(data_dict_in_log_info)); - } else if (0 == record_count) { - ret = OB_NEED_RETRY; - LOG_WARN("No valid baseline data is currently available, need retry", KR(ret), K(tenant_id), K(start_timstamp_ns)); + if (OB_FAIL(sql_queryer.get_data_dict_in_log_info(tenant_id, start_timestamp_ns, data_dict_in_log_info))) { + LOG_WARN("sql_queryer get_data_dict_in_log_info fail", KR(ret), K(tenant_id), K(start_timestamp_ns), K(data_dict_in_log_info)); } else { done = true; } - - if (OB_FAIL(ret)) { - ret = OB_SUCCESS; - ob_usleep(100L * 1000L); - } + } else { + ret = OB_NOT_SUPPORTED; + done = true; + LOG_ERROR("[FATAL]fetching mode of TCTX is not valid", KR(ret), K(fetching_mode), K(tenant_id)); + } + + if (OB_SUCC(ret) || done) { + } else if (OB_ENTRY_NOT_EXIST == ret) { + done = true; + LOG_ERROR("[FATAL][DATA_DICT] Can't find suitable data_dict to launch OBCDC", KR(ret), K(start_timestamp_ns)); + } else { + const static int64_t RETRY_FUNC_PRINT_INTERVAL = 10 * _SEC_; + const int64_t sleep_usec_on_error = 100 * _MSEC_; + if (REACH_TIME_INTERVAL(RETRY_FUNC_PRINT_INTERVAL)) { + LOG_WARN("[DATA_DICT] retry get data_dict_in_log_info", KR(ret)); + } else { + LOG_TRACE("[DATA_DICT] retry get data_dict_in_log_info", KR(ret)); + } + ret = OB_SUCCESS; + ob_usleep(sleep_usec_on_error); } - } else { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("fetching mode of TCTX is not valid", KR(ret), K(fetching_mode), K(tenant_id)); } } diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_service.h b/src/logservice/libobcdc/src/ob_log_meta_data_service.h index 07eb6c491..f33dd65f2 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_service.h +++ b/src/logservice/libobcdc/src/ob_log_meta_data_service.h @@ -12,7 +12,6 @@ #define OCEANBASE_LOG_META_DATA_SERVICE_H_ #include "common/ob_region.h" -#include "lib/mysqlclient/ob_isql_client.h" // ObISQLClient #include "lib/allocator/ob_concurrent_fifo_allocator.h" // ObConcurrentFIFOAllocator #include "share/backup/ob_backup_struct.h" // ObBackupPathString #include "logservice/data_dictionary/ob_data_dict_meta_info.h" // ObDataDictMetaInfo @@ -21,7 +20,6 @@ #include "ob_log_task_pool.h" #include "ob_log_entry_task_pool.h" #include "logservice/logfetcher/ob_log_data_dictionary_in_log_table.h" -#include "ob_log_meta_data_queryer.h" #include "ob_log_meta_data_baseline_loader.h" #include "ob_log_meta_data_replayer.h" // ObLogMetaDataReplayer #include "ob_log_meta_data_fetcher.h" // ObLogMetaDataFetcher @@ -29,6 +27,10 @@ namespace oceanbase { +namespace common +{ +class ObMySQLProxy; +} namespace libobcdc { class IObLogSysLsTaskHandler; @@ -48,7 +50,7 @@ public: const ClientFetchingMode fetching_mode, const share::ObBackupPathString &archive_dest, IObLogSysLsTaskHandler *sys_ls_handler, - common::ObISQLClient *proxy, + common::ObMySQLProxy *proxy, IObLogErrHandler *err_handler, const int64_t cluster_id, const ObLogConfig &cfg, @@ -112,7 +114,6 @@ private: private: bool is_inited_; ObLogMetaDataFetcher fetcher_; - ObLogMetaDataSQLQueryer sql_queryer_; ObLogMetaDataBaselineLoader baseline_loader_; ObLogMetaDataReplayer incremental_replayer_; ObLogMetaDataFetcherDispatcher fetcher_dispatcher_; diff --git a/src/logservice/logfetcher/ob_log_data_dictionary_in_log_table.h b/src/logservice/logfetcher/ob_log_data_dictionary_in_log_table.h index b88b54002..508a31826 100644 --- a/src/logservice/logfetcher/ob_log_data_dictionary_in_log_table.h +++ b/src/logservice/logfetcher/ob_log_data_dictionary_in_log_table.h @@ -25,6 +25,7 @@ struct DataDictionaryInLogInfo bool is_valid() const { return common::OB_INVALID_TIMESTAMP != snapshot_scn_ + && common::OB_INVALID_TIMESTAMP != end_scn_ && start_lsn_.is_valid() && end_lsn_.is_valid(); } @@ -32,16 +33,19 @@ struct DataDictionaryInLogInfo void reset() { snapshot_scn_ = common::OB_INVALID_TIMESTAMP; + end_scn_ = common::OB_INVALID_TIMESTAMP; start_lsn_.reset(); end_lsn_.reset(); } void reset( const int64_t snapshot_scn, + const int64_t end_scn, const palf::LSN &start_lsn, const palf::LSN &end_lsn) { snapshot_scn_ = snapshot_scn; + end_scn_ = end_scn; start_lsn_ = start_lsn; end_lsn_ = end_lsn; } @@ -49,6 +53,7 @@ struct DataDictionaryInLogInfo DataDictionaryInLogInfo &operator=(const DataDictionaryInLogInfo &other) { snapshot_scn_ = other.snapshot_scn_; + end_scn_ = other.end_scn_; start_lsn_ = other.start_lsn_; end_lsn_ = other.end_lsn_; return *this; @@ -56,10 +61,12 @@ struct DataDictionaryInLogInfo TO_STRING_KV( K_(snapshot_scn), + K_(end_scn), K_(start_lsn), K_(end_lsn)); int64_t snapshot_scn_; + int64_t end_scn_; palf::LSN start_lsn_; palf::LSN end_lsn_; }; diff --git a/unittest/data_dictionary/test_data_dict_meta_info.cpp b/unittest/data_dictionary/test_data_dict_meta_info.cpp index 1baa533b4..28b7ae6f7 100644 --- a/unittest/data_dictionary/test_data_dict_meta_info.cpp +++ b/unittest/data_dictionary/test_data_dict_meta_info.cpp @@ -15,6 +15,7 @@ #include "gtest/gtest.h" #include "lib/oblog/ob_log.h" #include "logservice/palf/log_define.h" +#include "share/ob_errno.h" #define private public #include "logservice/data_dictionary/ob_data_dict_meta_info.h" @@ -36,12 +37,15 @@ TEST(ObDataDictMetaInfoItem, test_data_dict_meta_info_item) int64_t pos = 0; DataDictMetaInfoItemArr item_arr; for (int64_t i = 0; i < item_cnt; i++) { - item.reset(random.get(), random.get(), random.get()); + item.reset(random.get(), random.get(), random.get(), random.get()); EXPECT_EQ(OB_SUCCESS, item_arr.push_back(item)); EXPECT_EQ(OB_SUCCESS, item.serialize(buf, buf_size, pos)); } int64_t deserialize_pos = 0; + EXPECT_EQ(common::OB_VERSION_NOT_MATCH, item.deserialize(buf, buf_size, deserialize_pos)); + deserialize_pos = 0; for (int64_t i = 0; i < item_cnt; i++) { + item.set_version(datadict::ObDataDictMetaInfoHeader::DATADICT_METAINFO_META_VERSION); EXPECT_EQ(OB_SUCCESS, item.deserialize(buf, buf_size, deserialize_pos)); EXPECT_EQ(item, item_arr.at(i)); } @@ -90,7 +94,8 @@ TEST(ObDataDictMetaInfo, test_data_dict_meta_info) while (have_enough_buffer ) { item.reset(random.get(0, max_scn_val), random.get(0, palf::LOG_MAX_LSN_VAL), - random.get(0, palf::LOG_MAX_LSN_VAL)); + random.get(0, palf::LOG_MAX_LSN_VAL), + random.get(0, max_scn_val)); if (total_item_data_size + item.get_serialize_size() <= buffer_size - header_size) { item_count_expected++; total_item_data_size += item.get_serialize_size();