[CP] [OBCDC] Fix can't launch in data_dict mode if start_ts is during generation of data_dictionary
This commit is contained in:
parent
a97235802c
commit
1bbd4ddafe
@ -47,6 +47,7 @@
|
||||
&& common::OB_ENTRY_NOT_EXIST != ret) { \
|
||||
ARCHIVE_LOG(WARN, "check and get record failed", K(ret)); \
|
||||
} else if (common::OB_SUCCESS == ret \
|
||||
&& record_context.last_record_round_ == key.round_ \
|
||||
&& common::ObTimeUtility::current_time_ns() - record_context.last_record_scn_.convert_to_ts()*1000L < interval) { \
|
||||
} else if (OB_FAIL(t.get_ls_array(array))) { \
|
||||
ARCHIVE_LOG(WARN, "get ls array failed", K(ret), K(task_type)); \
|
||||
|
@ -43,19 +43,23 @@ ObDataDictMetaInfoItem::~ObDataDictMetaInfoItem()
|
||||
|
||||
void ObDataDictMetaInfoItem::reset()
|
||||
{
|
||||
version_ = 0;
|
||||
snapshot_scn_ = share::OB_INVALID_SCN_VAL;
|
||||
start_lsn_ = palf::LOG_INVALID_LSN_VAL;
|
||||
end_lsn_ = palf::LOG_INVALID_LSN_VAL;
|
||||
end_scn_ = share::OB_INVALID_SCN_VAL;
|
||||
}
|
||||
|
||||
void ObDataDictMetaInfoItem::reset(
|
||||
const uint64_t snapshot_scn,
|
||||
const uint64_t start_lsn,
|
||||
const uint64_t end_lsn)
|
||||
const uint64_t end_lsn,
|
||||
const int64_t end_scn)
|
||||
{
|
||||
snapshot_scn_ = snapshot_scn;
|
||||
start_lsn_ = start_lsn;
|
||||
end_lsn_ = end_lsn;
|
||||
end_scn_ = end_scn;
|
||||
}
|
||||
|
||||
DEFINE_SERIALIZE(ObDataDictMetaInfoItem)
|
||||
@ -65,7 +69,8 @@ DEFINE_SERIALIZE(ObDataDictMetaInfoItem)
|
||||
LST_DO_CODE(OB_UNIS_ENCODE,
|
||||
snapshot_scn_,
|
||||
start_lsn_,
|
||||
end_lsn_);
|
||||
end_lsn_,
|
||||
end_scn_);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -74,10 +79,26 @@ DEFINE_DESERIALIZE(ObDataDictMetaInfoItem)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
LST_DO_CODE(OB_UNIS_DECODE,
|
||||
snapshot_scn_,
|
||||
start_lsn_,
|
||||
end_lsn_);
|
||||
if (OB_UNLIKELY(version_ < 1)) {
|
||||
ret = OB_VERSION_NOT_MATCH;
|
||||
DDLOG(WARN, "expect valid version while deserializing DictMetaInfo", KR(ret), K_(version));
|
||||
} else {
|
||||
LST_DO_CODE(OB_UNIS_DECODE,
|
||||
snapshot_scn_,
|
||||
start_lsn_,
|
||||
end_lsn_);
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else {
|
||||
// deserialize field added in version = 2
|
||||
if (version_ > 1) {
|
||||
if (OB_FAIL(common::serialization::decode(buf, data_len, pos, end_scn_))) {
|
||||
DDLOG(WARN, "deserialize end_scn field failed", KR(ret), K(data_len), K(pos), K(version_));
|
||||
}
|
||||
}
|
||||
// add deserialize logic here for newer version.
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -89,7 +110,8 @@ DEFINE_GET_SERIALIZE_SIZE(ObDataDictMetaInfoItem)
|
||||
LST_DO_CODE(OB_UNIS_ADD_LEN,
|
||||
snapshot_scn_,
|
||||
start_lsn_,
|
||||
end_lsn_);
|
||||
end_lsn_,
|
||||
end_scn_);
|
||||
|
||||
return len;
|
||||
}
|
||||
@ -332,6 +354,8 @@ DEFINE_DESERIALIZE(ObDataDictMetaInfo)
|
||||
}
|
||||
for (int32_t i = 0; OB_SUCC(ret) && i < item_cnt && pos < data_len; i++) {
|
||||
ObDataDictMetaInfoItem item;
|
||||
item.set_version(header_.get_meta_version());
|
||||
|
||||
if (OB_FAIL(item.deserialize(buf, data_len, pos))) {
|
||||
DDLOG(WARN, "datadict metainfo item deserialize failed", K(ret),
|
||||
K(buf), K(data_len), K(pos));
|
||||
@ -346,7 +370,7 @@ DEFINE_DESERIALIZE(ObDataDictMetaInfo)
|
||||
/////////////////////////////////// MetaInfoQueryHelper ///////////////////////////////////
|
||||
|
||||
const char *MetaInfoQueryHelper::QUERY_META_INFO_SQL_STR =
|
||||
"SELECT snapshot_scn, start_lsn, end_lsn FROM %s ORDER BY snapshot_scn DESC;";
|
||||
"SELECT snapshot_scn, ora_rowscn as end_scn, start_lsn, end_lsn FROM %s ORDER BY snapshot_scn DESC;";
|
||||
const char *MetaInfoQueryHelper::DATA_DICT_META_TABLE_NAME =
|
||||
share::OB_ALL_DATA_DICTIONARY_IN_LOG_TNAME;
|
||||
|
||||
@ -443,8 +467,8 @@ int MetaInfoQueryHelper::get_data_dict_meta_info_(const share::SCN &base_scn, Da
|
||||
|
||||
SMART_VAR(ObISQLClient::ReadResult, result) {
|
||||
if (OB_FAIL(sql.assign_fmt(QUERY_META_INFO_SQL_STR, DATA_DICT_META_TABLE_NAME))) {
|
||||
DDLOG(WARN, "assign format to sqlstring failed", K(ret), K(QUERY_META_INFO_SQL_STR),
|
||||
K(DATA_DICT_META_TABLE_NAME));
|
||||
DDLOG(WARN, "assign format to sqlstring failed", KR(ret), KCSTRING(QUERY_META_INFO_SQL_STR),
|
||||
KCSTRING(DATA_DICT_META_TABLE_NAME));
|
||||
} else if (OB_FAIL(sql_proxy_.read(result, tenant_id_, sql.ptr()))) {
|
||||
DDLOG(WARN, "sql proxy failed to read result when querying datadict metainfo", K(ret),
|
||||
K(tenant_id_), "sql", sql.ptr());
|
||||
@ -484,10 +508,12 @@ int MetaInfoQueryHelper::parse_record_from_result_(
|
||||
// | Field | Type |
|
||||
// +--------------+---------------------+
|
||||
// | snapshot_scn | bigint(20) unsigned |
|
||||
// | end_scn | bigint(20) unsigned |
|
||||
// | start_lsn | bigint(20) unsigned |
|
||||
// | end_lsn | bigint(20) unsigned |
|
||||
// +--------------+---------------------+
|
||||
EXTRACT_UINT_FIELD_MYSQL(result, "snapshot_scn", item.snapshot_scn_, uint64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(result, "end_scn", item.end_scn_, int64_t);
|
||||
EXTRACT_UINT_FIELD_MYSQL(result, "start_lsn", item.start_lsn_, uint64_t);
|
||||
EXTRACT_UINT_FIELD_MYSQL(result, "end_lsn", item.end_lsn_, uint64_t);
|
||||
// only use record generated after base_scn.
|
||||
|
@ -36,21 +36,30 @@ public:
|
||||
void reset(
|
||||
const uint64_t snapshot_scn,
|
||||
const uint64_t start_lsn,
|
||||
const uint64_t end_lsn);
|
||||
const uint64_t end_lsn,
|
||||
const int64_t end_scn);
|
||||
|
||||
bool operator==(const ObDataDictMetaInfoItem &that) const {
|
||||
return snapshot_scn_ == that.snapshot_scn_ &&
|
||||
start_lsn_ == that.start_lsn_ &&
|
||||
end_lsn_ == that.end_lsn_;
|
||||
end_lsn_ == that.end_lsn_ &&
|
||||
end_scn_ == that.end_scn_;
|
||||
}
|
||||
|
||||
void set_version(const int16_t version) { version_ = version; }
|
||||
|
||||
NEED_SERIALIZE_AND_DESERIALIZE;
|
||||
|
||||
TO_STRING_KV(K_(snapshot_scn), K_(start_lsn), K_(end_lsn));
|
||||
TO_STRING_KV(K_(snapshot_scn), K_(end_scn), K_(start_lsn), K_(end_lsn));
|
||||
public:
|
||||
// Decide deserialize behavious;
|
||||
// set while deserialize ObDataDictMetaInfo, value should be same with meta_version_ in ObDataDictMetaInfoHeader;
|
||||
// NOTICE: SHOULD NOT SERIALIZE THIS FIELD.
|
||||
int16_t version_;
|
||||
uint64_t snapshot_scn_;
|
||||
uint64_t start_lsn_;
|
||||
uint64_t end_lsn_;
|
||||
int64_t end_scn_; // generate from ora_rowscn(int64_t), add at meta_version = 2;
|
||||
};
|
||||
|
||||
typedef ObSEArray<ObDataDictMetaInfoItem, 16> DataDictMetaInfoItemArr;
|
||||
@ -59,7 +68,7 @@ class ObDataDictMetaInfoHeader
|
||||
{
|
||||
public:
|
||||
static const int16_t DATADICT_METAINFO_HEADER_MAGIC = 0x4444; // hex of "DD"
|
||||
static const int64_t DATADICT_METAINFO_META_VERSION = 0x0001; // version = 1
|
||||
static const int64_t DATADICT_METAINFO_META_VERSION = 0x0002; // version = 1
|
||||
public:
|
||||
ObDataDictMetaInfoHeader();
|
||||
~ObDataDictMetaInfoHeader();
|
||||
@ -73,7 +82,7 @@ public:
|
||||
const int64_t data_size);
|
||||
|
||||
uint64_t get_tenant_id() const { return tenant_id_; }
|
||||
OB_INLINE int32_t get_meta_version() const {
|
||||
OB_INLINE int16_t get_meta_version() const {
|
||||
return meta_version_;
|
||||
}
|
||||
OB_INLINE int32_t get_item_count() const {
|
||||
@ -132,7 +141,7 @@ public:
|
||||
int64_t get_data_size() const {
|
||||
return header_.get_data_size();
|
||||
}
|
||||
int32_t get_meta_version() const {
|
||||
int16_t get_meta_version() const {
|
||||
return header_.get_meta_version();
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,7 @@ public:
|
||||
virtual ~ObCDCQueryResult() { affect_rows_ = 0; }
|
||||
public:
|
||||
OB_INLINE void inc_affect_rows() { affect_rows_++; }
|
||||
OB_INLINE int64_t get_affect_rows() const { return affect_rows_; }
|
||||
OB_INLINE bool has_data() const { return affect_rows_ > 0; }
|
||||
OB_INLINE bool is_empty() const { return ! has_data(); }
|
||||
OB_INLINE T& get_data() const { return data_; }
|
||||
@ -81,7 +82,7 @@ int ObCDCTenantQuery<T>::query(const uint64_t tenant_id, ObCDCQueryResult<T> &qu
|
||||
} else {
|
||||
bool query_done = false;
|
||||
int retry_cnt = 0;
|
||||
const int64_t retry_fail_sleep_time = 1 * _MSEC_;
|
||||
const int64_t retry_fail_sleep_time = 100 * _MSEC_;
|
||||
const int64_t retry_warn_interval = 5 * _SEC_;
|
||||
const int64_t start_time = get_timestamp();
|
||||
const int64_t end_time = start_time + retry_timeout;
|
||||
|
@ -135,7 +135,7 @@ public:
|
||||
"part trans task prealloc page count");
|
||||
// Log_level=INFO in the startup scenario, and then optimize the schema to WARN afterwards
|
||||
DEF_STR(init_log_level, OB_CLUSTER_PARAMETER, "ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:INFO", "log level: DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR");
|
||||
DEF_STR(log_level, OB_CLUSTER_PARAMETER, "ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:WARN", "log level: DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR");
|
||||
DEF_STR(log_level, OB_CLUSTER_PARAMETER, "ALL.*:INFO;PALF.*:WARN;SHARE.SCHEMA:WARN;CLOG.*:WARN;STORAGE.*:WARN;ARCHIVE.*:WARN", "log level: DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR");
|
||||
// root server info for oblog, seperated by `;` between multi rootserver, a root server info format as `ip:rpc_port:sql_port`
|
||||
DEF_STR(rootserver_list, OB_CLUSTER_PARAMETER, "|", "OB RootServer list");
|
||||
DEF_STR(cluster_url, OB_CLUSTER_PARAMETER, "|", "OB configure url");
|
||||
|
@ -1278,6 +1278,10 @@ int ObLogInstance::config_tenant_mgr_(const int64_t start_tstamp_ns,
|
||||
GET_SCHEMA_TIMEOUT_ON_START_UP,
|
||||
add_tenant_succ))) {
|
||||
LOG_ERROR("add_tenant fail", KR(ret), K(start_tstamp_ns), K(sys_schema_version));
|
||||
} else if (! add_tenant_succ) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("[FATAL] [LAUNCH] ADD_TENANT WITH NO ALIVE SERVER MODE FAILED", KR(ret),
|
||||
K(start_tstamp_ns), K_(refresh_mode), K_(fetching_mode));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(tenant_mgr_->add_all_tenants(
|
||||
|
@ -14,7 +14,7 @@
|
||||
|
||||
#include "ob_log_instance.h" // TCTX
|
||||
#include "ob_log_meta_data_queryer.h"
|
||||
#include "lib/mysqlclient/ob_isql_client.h" // ObISQLClient
|
||||
#include "lib/mysqlclient/ob_mysql_proxy.h" // ObISQLClient
|
||||
#include "lib/mysqlclient/ob_mysql_result.h" // ObMySQLResult
|
||||
#include "lib/string/ob_sql_string.h" // ObSqlString
|
||||
#include "share/inner_table/ob_inner_table_schema_constants.h" // OB_***_TNAME
|
||||
@ -24,201 +24,64 @@ namespace oceanbase
|
||||
{
|
||||
namespace libobcdc
|
||||
{
|
||||
ObLogMetaDataSQLQueryer::ObLogMetaDataSQLQueryer() :
|
||||
is_inited_(false),
|
||||
is_across_cluster_(false),
|
||||
cluster_id_(OB_INVALID_CLUSTER_ID),
|
||||
sql_proxy_(NULL)
|
||||
{
|
||||
|
||||
}
|
||||
// TODO: Currently query ORA_ROWSCN from inner_table, however not support by oracle tenant in obcdc tenant_sync_mode.
|
||||
// Wait observer support query ORA_ROWSCN by DBA VIEW
|
||||
// and DBA_OB_DATA_DICTIONARY_IN_LOG VIEW will add column support query ORA_ROWSCN as END_SCN;
|
||||
// TODO: to compatible with oracle mode in tenant_sync_mode, use REPORT_TIME as end_scn for temporary;
|
||||
const char* ObLogMetaDataSQLQueryer::QUERY_SQL_FORMAT = "SELECT SNAPSHOT_SCN, %s AS END_SCN, START_LSN, END_LSN "
|
||||
"FROM %s WHERE SNAPSHOT_SCN <= %lu ORDER BY SNAPSHOT_SCN DESC %s";
|
||||
|
||||
ObLogMetaDataSQLQueryer::~ObLogMetaDataSQLQueryer()
|
||||
{
|
||||
destroy();
|
||||
}
|
||||
|
||||
int ObLogMetaDataSQLQueryer::init(
|
||||
const int64_t cluster_id,
|
||||
const bool is_across_cluster,
|
||||
common::ObISQLClient &sql_proxy)
|
||||
int ObLogMetaDataSQLQueryer::get_data_dict_in_log_info(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t start_timestamp_ns,
|
||||
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const static int64_t QUERY_TIMEOUT = 1 * _MIN_;
|
||||
ObCDCQueryResult<logfetcher::DataDictionaryInLogInfo> query_result(data_dict_in_log_info);
|
||||
const uint64_t query_tenant_id = TCTX.is_tenant_sync_mode() ? OB_SYS_TENANT_ID : tenant_id;
|
||||
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_ERROR("ObLogMetaDataSQLQueryer has inited", KR(ret));
|
||||
if (OB_FAIL(query(query_tenant_id, query_result, QUERY_TIMEOUT))) {
|
||||
LOG_ERROR("query data dict meta info failed", KR(ret), K(tenant_id), K(query_tenant_id));
|
||||
} else if (query_result.is_empty()) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
LOG_WARN("can't find datadict snapshot_scn older than start_timestamp_ns", KR(ret), K(tenant_id), K(start_timestamp_ns));
|
||||
} else {
|
||||
is_across_cluster_ = is_across_cluster;
|
||||
cluster_id_ = cluster_id;
|
||||
sql_proxy_ = &sql_proxy;
|
||||
is_inited_ = true;
|
||||
|
||||
LOG_INFO("ObLogMetaDataSQLQueryer init success", K(cluster_id), K(is_across_cluster));
|
||||
LOG_INFO("get_data_dict_in_log_info succ", K(tenant_id), K(start_timestamp_ns), K(data_dict_in_log_info));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObLogMetaDataSQLQueryer::destroy()
|
||||
{
|
||||
is_inited_ = false;
|
||||
is_across_cluster_ = false;
|
||||
cluster_id_ = OB_INVALID_CLUSTER_ID;
|
||||
sql_proxy_ = NULL;
|
||||
}
|
||||
|
||||
// Such as:
|
||||
// select snapshot_scn, start_lsn, end_lsn from __all_virtual_data_dictionary_in_log where tenant_id=1004 and snapshot_scn <= 1669963370971342544 ORDER BY snapshot_scn DESC limit 1;
|
||||
int ObLogMetaDataSQLQueryer::get_data_dict_in_log_info(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t start_timstamp_ns,
|
||||
int64_t &record_count,
|
||||
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info)
|
||||
int ObLogMetaDataSQLQueryer::build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const bool is_oracle_mode = TCTX.mysql_proxy_.is_oracle_mode();
|
||||
const char *select_fields = "SNAPSHOT_SCN, START_LSN, END_LSN";
|
||||
const char *limit_expr = is_oracle_mode ? "FETCH FIRST 1 ROWS ONLY" : "LIMIT 1";
|
||||
// __all_virtual_data_dictionary_in_log
|
||||
const char *data_dictionary_in_log_name = share::OB_ALL_VIRTUAL_DATA_DICTIONARY_IN_LOG_TNAME;
|
||||
uint64_t query_tenent_id = OB_INVALID_TENANT_ID;
|
||||
const char* limit_expr = is_oracle_mode ? "FETCH FIRST 1 ROWS ONLY" : "LIMIT 1";
|
||||
const char* end_scn = is_oracle_mode ? "TIMESTAMP_TO_SCN(REPORT_TIME)" : "ORA_ROWSCN";
|
||||
const char* table_name = is_oracle_mode ? share::OB_DBA_OB_DATA_DICTIONARY_IN_LOG_TNAME : share::OB_ALL_DATA_DICTIONARY_IN_LOG_TNAME;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObLogMetaDataSQLQueryer is not initialized", KR(ret));
|
||||
} else if (OB_INVALID_TENANT_ID == tenant_id
|
||||
|| OB_INVALID_TIMESTAMP == start_timstamp_ns) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), KT(tenant_id), K(start_timstamp_ns));
|
||||
} else {
|
||||
ObSqlString sql;
|
||||
record_count = 0;
|
||||
|
||||
if (TCTX.is_tenant_sync_mode()) {
|
||||
if (OB_FAIL(sql.assign_fmt(
|
||||
"SELECT %s FROM %s"
|
||||
" WHERE SNAPSHOT_SCN <= %lu"
|
||||
" ORDER BY SNAPSHOT_SCN DESC %s",
|
||||
select_fields, share::OB_DBA_OB_DATA_DICTIONARY_IN_LOG_TNAME,
|
||||
start_timstamp_ns, limit_expr))) {
|
||||
LOG_WARN("assign sql string failed", KR(ret), K(tenant_id), K(is_oracle_mode));
|
||||
} else {
|
||||
query_tenent_id = tenant_id;
|
||||
}
|
||||
} else {
|
||||
// OB4.1 doesn't have CDB_OB_DATA_DICTIONARY_IN_LOG VIEW,should only query with virtual table
|
||||
if (OB_FAIL(sql.assign_fmt(
|
||||
"SELECT %s FROM %s"
|
||||
" WHERE TENANT_ID = %lu AND SNAPSHOT_SCN <= %lu"
|
||||
" ORDER BY SNAPSHOT_SCN DESC LIMIT 1",
|
||||
select_fields, share::OB_ALL_VIRTUAL_DATA_DICTIONARY_IN_LOG_TNAME,
|
||||
tenant_id, start_timstamp_ns))) {
|
||||
LOG_WARN("assign sql string failed", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
// Use OB_SYS_TENANT_ID to query
|
||||
query_tenent_id = OB_SYS_TENANT_ID;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
SMART_VAR(ObISQLClient::ReadResult, result) {
|
||||
if (OB_FAIL(do_query_(OB_SYS_TENANT_ID, sql, result))) {
|
||||
LOG_WARN("do_query_ failed", KR(ret), K(cluster_id_), K(tenant_id), "sql", sql.ptr());
|
||||
} else if (OB_ISNULL(result.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get mysql result failed", KR(ret));
|
||||
} else if (OB_FAIL(get_records_template_(
|
||||
*result.get_result(),
|
||||
data_dict_in_log_info,
|
||||
"DataDictionaryInLogInfo",
|
||||
record_count))) {
|
||||
LOG_WARN("construct data_dict_in_log_info failed", KR(ret), K(data_dict_in_log_info));
|
||||
}
|
||||
} // SMART_VAR
|
||||
}
|
||||
if (OB_FAIL(sql.assign_fmt(QUERY_SQL_FORMAT, end_scn, table_name, start_timstamp_ns_, limit_expr))) {
|
||||
LOG_ERROR("format sql failed", KR(ret), K(tenant_id), K(is_oracle_mode), KCSTRING(limit_expr), KCSTRING(table_name));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogMetaDataSQLQueryer::do_query_(
|
||||
const uint64_t tenant_id,
|
||||
ObSqlString &sql,
|
||||
ObISQLClient::ReadResult &result)
|
||||
int ObLogMetaDataSQLQueryer::parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult<logfetcher::DataDictionaryInLogInfo> &result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObLogMetaDataSQLQueryer not init", KR(ret));
|
||||
} else if (OB_ISNULL(sql_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("SqlProxy is NULL", KR(ret));
|
||||
} else if (is_across_cluster_) {
|
||||
if (OB_FAIL(sql_proxy_->read(result, cluster_id_, tenant_id, sql.ptr()))) {
|
||||
LOG_WARN("execute sql failed", KR(ret), K(cluster_id_), K(tenant_id), "sql", sql.ptr());
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
|
||||
LOG_WARN("execute sql failed", KR(ret), K(tenant_id), "sql", sql.ptr());
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename RecordsType>
|
||||
int ObLogMetaDataSQLQueryer::get_records_template_(
|
||||
common::sqlclient::ObMySQLResult &res,
|
||||
RecordsType &records,
|
||||
const char *event,
|
||||
int64_t &record_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObLogMetaDataSQLQueryer not init", KR(ret));
|
||||
} else {
|
||||
record_count = 0;
|
||||
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(res.next())) {
|
||||
if (OB_ITER_END == ret) {
|
||||
// End of iteration
|
||||
} else {
|
||||
LOG_WARN("get next result failed", KR(ret), K(event));
|
||||
}
|
||||
} else if (OB_FAIL(parse_record_from_row_(res, records))) {
|
||||
LOG_WARN("parse_record_from_row_ failed", KR(ret), K(records));
|
||||
} else {
|
||||
record_count++;
|
||||
}
|
||||
} // while
|
||||
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogMetaDataSQLQueryer::parse_record_from_row_(
|
||||
common::sqlclient::ObMySQLResult &res,
|
||||
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t snapshot_scn_int = 0;
|
||||
int64_t end_scn_int = 0;
|
||||
int64_t start_lsn_int = 0;
|
||||
int64_t end_lsn_int = 0;
|
||||
|
||||
(void)GET_COL_IGNORE_NULL(res.get_int, "SNAPSHOT_SCN", snapshot_scn_int);
|
||||
(void)GET_COL_IGNORE_NULL(res.get_int, "START_LSN", start_lsn_int);
|
||||
(void)GET_COL_IGNORE_NULL(res.get_int, "END_LSN", end_lsn_int);
|
||||
|
||||
data_dict_in_log_info.reset(snapshot_scn_int, palf::LSN(start_lsn_int), palf::LSN(end_lsn_int));
|
||||
EXTRACT_UINT_FIELD_MYSQL(sql_result, "SNAPSHOT_SCN", snapshot_scn_int, int64_t);
|
||||
EXTRACT_UINT_FIELD_MYSQL(sql_result, "END_SCN", end_scn_int, int64_t);
|
||||
EXTRACT_UINT_FIELD_MYSQL(sql_result, "START_LSN", start_lsn_int, int64_t);
|
||||
EXTRACT_UINT_FIELD_MYSQL(sql_result, "END_LSN", end_lsn_int, int64_t);
|
||||
result.get_data().reset(snapshot_scn_int, end_scn_int, palf::LSN(start_lsn_int), palf::LSN(end_lsn_int));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -13,13 +13,14 @@
|
||||
#ifndef OCEANBASE_OB_LOG_META_SQL_QUERYER_H_
|
||||
#define OCEANBASE_OB_LOG_META_SQL_QUERYER_H_
|
||||
|
||||
#include "lib/mysqlclient/ob_isql_client.h" // ObISQLClient
|
||||
#include "ob_cdc_tenant_query.h"
|
||||
#include "logservice/logfetcher/ob_log_data_dictionary_in_log_table.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
{
|
||||
class ObMySQLProxy;
|
||||
namespace sqlclient
|
||||
{
|
||||
class ObMySQLResult;
|
||||
@ -28,50 +29,26 @@ class ObMySQLResult;
|
||||
|
||||
namespace libobcdc
|
||||
{
|
||||
class ObLogMetaDataSQLQueryer
|
||||
|
||||
// query OB_SYS_TENANT_ID in tenant_sync_mode and query specific tenant_id in cluster_sync_mode
|
||||
class ObLogMetaDataSQLQueryer : public ObCDCTenantQuery<logfetcher::DataDictionaryInLogInfo>
|
||||
{
|
||||
public:
|
||||
ObLogMetaDataSQLQueryer();
|
||||
virtual ~ObLogMetaDataSQLQueryer();
|
||||
int init(
|
||||
const int64_t cluster_id,
|
||||
const bool is_across_cluster,
|
||||
common::ObISQLClient &sql_proxy);
|
||||
bool is_inited() const { return is_inited_; }
|
||||
void destroy();
|
||||
|
||||
ObLogMetaDataSQLQueryer(const int64_t start_timstamp_ns, common::ObMySQLProxy &sql_proxy)
|
||||
: ObCDCTenantQuery(sql_proxy), start_timstamp_ns_(start_timstamp_ns) {}
|
||||
~ObLogMetaDataSQLQueryer() { start_timstamp_ns_ = OB_INVALID_TIMESTAMP; }
|
||||
public:
|
||||
int get_data_dict_in_log_info(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t start_timstamp_ns,
|
||||
int64_t &record_count,
|
||||
const int64_t start_timestamp_ns,
|
||||
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info);
|
||||
|
||||
private:
|
||||
int do_query_(const uint64_t tenant_id,
|
||||
ObSqlString &sql,
|
||||
ObISQLClient::ReadResult &result);
|
||||
|
||||
template <typename RecordsType>
|
||||
int get_records_template_(
|
||||
common::sqlclient::ObMySQLResult &res,
|
||||
RecordsType &records,
|
||||
const char *event,
|
||||
int64_t &record_count);
|
||||
|
||||
// logfetcher::DataDictionaryInLogInfo
|
||||
// @param [in] res, result read from __all_virtual_data_dictionary_in_log
|
||||
// @param [out] data_dict_in_log_info
|
||||
int parse_record_from_row_(
|
||||
common::sqlclient::ObMySQLResult &res,
|
||||
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info);
|
||||
|
||||
int build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql) override;
|
||||
int parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult<logfetcher::DataDictionaryInLogInfo> &result) override;
|
||||
private:
|
||||
bool is_inited_; // whether this class is inited
|
||||
bool is_across_cluster_; // whether the SQL query across cluster
|
||||
int64_t cluster_id_; // ClusterID
|
||||
common::ObISQLClient *sql_proxy_; // sql_proxy to use
|
||||
|
||||
static const char* QUERY_SQL_FORMAT;
|
||||
private:
|
||||
int64_t start_timstamp_ns_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObLogMetaDataSQLQueryer);
|
||||
};
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include "ob_log_meta_data_service.h"
|
||||
#include "ob_log_instance.h"
|
||||
#include "ob_log_fetching_mode.h"
|
||||
#include "ob_log_meta_data_queryer.h"
|
||||
#include "share/backup/ob_archive_struct.h"
|
||||
#include "logservice/restoreservice/ob_log_archive_piece_mgr.h"
|
||||
#include "logservice/data_dictionary/ob_data_dict_meta_info.h"
|
||||
@ -39,7 +40,6 @@ ObLogMetaDataService &ObLogMetaDataService::get_instance()
|
||||
ObLogMetaDataService::ObLogMetaDataService() :
|
||||
is_inited_(false),
|
||||
fetcher_(),
|
||||
sql_queryer_(),
|
||||
baseline_loader_(),
|
||||
incremental_replayer_(),
|
||||
fetcher_dispatcher_()
|
||||
@ -56,7 +56,7 @@ int ObLogMetaDataService::init(
|
||||
const ClientFetchingMode fetching_mode,
|
||||
const share::ObBackupPathString &archive_dest,
|
||||
IObLogSysLsTaskHandler *sys_ls_handler,
|
||||
ObISQLClient *proxy,
|
||||
common::ObMySQLProxy *proxy,
|
||||
IObLogErrHandler *err_handler,
|
||||
const int64_t cluster_id,
|
||||
const ObLogConfig &cfg,
|
||||
@ -67,8 +67,6 @@ int ObLogMetaDataService::init(
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_ERROR("init twice", KR(ret));
|
||||
} else if (OB_FAIL(sql_queryer_.init(cluster_id, false, *proxy))) {
|
||||
LOG_ERROR("ObLogMetaDataSQLQueryer init fail", KR(ret));
|
||||
} else if (OB_FAIL(baseline_loader_.init(cfg))) {
|
||||
LOG_ERROR("ObLogMetaDataBaselineLoader init fail", KR(ret));
|
||||
} else if (OB_FAIL(incremental_replayer_.init())) {
|
||||
@ -93,7 +91,6 @@ void ObLogMetaDataService::destroy()
|
||||
{
|
||||
if (IS_INIT) {
|
||||
fetcher_.destroy();
|
||||
sql_queryer_.destroy();
|
||||
baseline_loader_.destroy();
|
||||
incremental_replayer_.destroy();
|
||||
fetcher_dispatcher_.destroy();
|
||||
@ -125,7 +122,7 @@ int ObLogMetaDataService::refresh_baseline_meta_data(
|
||||
LOG_ERROR("log_meta_data_service get_data_dict_in_log_info failed", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
ISTAT("get_data_dict_in_log_info success", K(tenant_id), K(start_timestamp_ns), K(data_dict_in_log_info));
|
||||
start_parameters.reset(data_dict_in_log_info.snapshot_scn_, start_timestamp_ns, data_dict_in_log_info);
|
||||
start_parameters.reset(data_dict_in_log_info.snapshot_scn_, std::max(start_timestamp_ns, data_dict_in_log_info.end_scn_), data_dict_in_log_info);
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -260,14 +257,18 @@ int ObLogMetaDataService::get_data_dict_in_log_info_in_archive_(
|
||||
|
||||
if (target_index >= item_arr_size) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
LOG_ERROR("not datadict metainfo item older than start_timestamp_ns", K(target_index), K(data_dict_meta_info),
|
||||
LOG_ERROR("can't find datadict snapshot_scn older than start_timestamp_ns", K(target_index), K(data_dict_meta_info),
|
||||
K(start_timestamp_ns));
|
||||
} else {
|
||||
datadict::ObDataDictMetaInfoItem data_dict_item;
|
||||
data_dict_item = item_arr.at(target_index);
|
||||
const int64_t end_scn_val = (data_dict_item.end_scn_ == share::OB_INVALID_SCN_VAL) ? start_timestamp_ns : data_dict_item.end_scn_;
|
||||
|
||||
data_dict_in_log_info.reset(data_dict_item.snapshot_scn_,
|
||||
end_scn_val,
|
||||
palf::LSN(data_dict_item.start_lsn_),
|
||||
palf::LSN(data_dict_item.end_lsn_));
|
||||
LOG_INFO("get_data_dict_in_log_info_in_archive_ succ", K(data_dict_meta_info), K(start_timestamp_ns), K(data_dict_in_log_info));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -329,7 +330,7 @@ int ObLogMetaDataService::read_meta_info_in_archive_log_(
|
||||
LOG_WARN("data dict info is not valid", KR(ret), K(data_dict_meta_info));
|
||||
} else {
|
||||
const uint64_t tenant_id = data_dict_meta_info.get_tenant_id();
|
||||
LOG_INFO("read_meta_info_in_archive_log success", K(tenant_id), K(data_dict_meta_info));
|
||||
LOG_INFO("read_meta_info_in_archive_log success", K(tenant_id), K(start_timestamp_ns), K(data_dict_meta_info));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -343,7 +344,7 @@ int ObLogMetaDataService::read_meta_info_in_archive_log_(
|
||||
|
||||
int ObLogMetaDataService::get_data_dict_in_log_info_(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t start_timstamp_ns,
|
||||
const int64_t start_timestamp_ns,
|
||||
logfetcher::DataDictionaryInLogInfo &data_dict_in_log_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -355,39 +356,43 @@ int ObLogMetaDataService::get_data_dict_in_log_info_(
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_ERROR("ObLogMetaDataService is not initialized", KR(ret));
|
||||
} else {
|
||||
if (is_direct_fetching_mode(fetching_mode)) {
|
||||
while (OB_SUCC(ret) && ! done) {
|
||||
if (OB_FAIL(get_data_dict_in_log_info_in_archive_(start_timstamp_ns, data_dict_in_log_info))) {
|
||||
LOG_WARN("get_data_dict_in_log_info_in_archive_ failed", KR(ret), K(start_timstamp_ns), K(tenant_id));
|
||||
while (! done) {
|
||||
if (is_direct_fetching_mode(fetching_mode)) {
|
||||
if (OB_FAIL(get_data_dict_in_log_info_in_archive_(start_timestamp_ns, data_dict_in_log_info))) {
|
||||
LOG_WARN("get_data_dict_in_log_info_in_archive_ failed", KR(ret), K(start_timestamp_ns), K(tenant_id), K(data_dict_in_log_info));
|
||||
} else {
|
||||
done = true;
|
||||
}
|
||||
} else if (is_integrated_fetching_mode(fetching_mode)) {
|
||||
common::ObMySQLProxy& sql_proxy = TCTX.is_tenant_sync_mode() ? TCTX.mysql_proxy_.get_ob_mysql_proxy() : TCTX.tenant_sql_proxy_.get_ob_mysql_proxy();
|
||||
ObLogMetaDataSQLQueryer sql_queryer(start_timestamp_ns, sql_proxy);
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
ret = OB_SUCCESS;
|
||||
ob_usleep(100L * 1000L);
|
||||
}
|
||||
}
|
||||
} else if (is_integrated_fetching_mode(fetching_mode)) {
|
||||
while (OB_SUCC(ret) && ! done) {
|
||||
if (OB_FAIL(sql_queryer_.get_data_dict_in_log_info(tenant_id, start_timstamp_ns, record_count, data_dict_in_log_info))) {
|
||||
LOG_WARN("sql_queryer_ get_data_dict_in_log_info fail", KR(ret), K(tenant_id), K(start_timstamp_ns),
|
||||
K(record_count), K(data_dict_in_log_info));
|
||||
} else if (0 == record_count) {
|
||||
ret = OB_NEED_RETRY;
|
||||
LOG_WARN("No valid baseline data is currently available, need retry", KR(ret), K(tenant_id), K(start_timstamp_ns));
|
||||
if (OB_FAIL(sql_queryer.get_data_dict_in_log_info(tenant_id, start_timestamp_ns, data_dict_in_log_info))) {
|
||||
LOG_WARN("sql_queryer get_data_dict_in_log_info fail", KR(ret), K(tenant_id), K(start_timestamp_ns), K(data_dict_in_log_info));
|
||||
} else {
|
||||
done = true;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
ret = OB_SUCCESS;
|
||||
ob_usleep(100L * 1000L);
|
||||
}
|
||||
} else {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
done = true;
|
||||
LOG_ERROR("[FATAL]fetching mode of TCTX is not valid", KR(ret), K(fetching_mode), K(tenant_id));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) || done) {
|
||||
} else if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
done = true;
|
||||
LOG_ERROR("[FATAL][DATA_DICT] Can't find suitable data_dict to launch OBCDC", KR(ret), K(start_timestamp_ns));
|
||||
} else {
|
||||
const static int64_t RETRY_FUNC_PRINT_INTERVAL = 10 * _SEC_;
|
||||
const int64_t sleep_usec_on_error = 100 * _MSEC_;
|
||||
if (REACH_TIME_INTERVAL(RETRY_FUNC_PRINT_INTERVAL)) {
|
||||
LOG_WARN("[DATA_DICT] retry get data_dict_in_log_info", KR(ret));
|
||||
} else {
|
||||
LOG_TRACE("[DATA_DICT] retry get data_dict_in_log_info", KR(ret));
|
||||
}
|
||||
ret = OB_SUCCESS;
|
||||
ob_usleep(sleep_usec_on_error);
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("fetching mode of TCTX is not valid", KR(ret), K(fetching_mode), K(tenant_id));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12,7 +12,6 @@
|
||||
#define OCEANBASE_LOG_META_DATA_SERVICE_H_
|
||||
|
||||
#include "common/ob_region.h"
|
||||
#include "lib/mysqlclient/ob_isql_client.h" // ObISQLClient
|
||||
#include "lib/allocator/ob_concurrent_fifo_allocator.h" // ObConcurrentFIFOAllocator
|
||||
#include "share/backup/ob_backup_struct.h" // ObBackupPathString
|
||||
#include "logservice/data_dictionary/ob_data_dict_meta_info.h" // ObDataDictMetaInfo
|
||||
@ -21,7 +20,6 @@
|
||||
#include "ob_log_task_pool.h"
|
||||
#include "ob_log_entry_task_pool.h"
|
||||
#include "logservice/logfetcher/ob_log_data_dictionary_in_log_table.h"
|
||||
#include "ob_log_meta_data_queryer.h"
|
||||
#include "ob_log_meta_data_baseline_loader.h"
|
||||
#include "ob_log_meta_data_replayer.h" // ObLogMetaDataReplayer
|
||||
#include "ob_log_meta_data_fetcher.h" // ObLogMetaDataFetcher
|
||||
@ -29,6 +27,10 @@
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
{
|
||||
class ObMySQLProxy;
|
||||
}
|
||||
namespace libobcdc
|
||||
{
|
||||
class IObLogSysLsTaskHandler;
|
||||
@ -48,7 +50,7 @@ public:
|
||||
const ClientFetchingMode fetching_mode,
|
||||
const share::ObBackupPathString &archive_dest,
|
||||
IObLogSysLsTaskHandler *sys_ls_handler,
|
||||
common::ObISQLClient *proxy,
|
||||
common::ObMySQLProxy *proxy,
|
||||
IObLogErrHandler *err_handler,
|
||||
const int64_t cluster_id,
|
||||
const ObLogConfig &cfg,
|
||||
@ -112,7 +114,6 @@ private:
|
||||
private:
|
||||
bool is_inited_;
|
||||
ObLogMetaDataFetcher fetcher_;
|
||||
ObLogMetaDataSQLQueryer sql_queryer_;
|
||||
ObLogMetaDataBaselineLoader baseline_loader_;
|
||||
ObLogMetaDataReplayer incremental_replayer_;
|
||||
ObLogMetaDataFetcherDispatcher fetcher_dispatcher_;
|
||||
|
@ -25,6 +25,7 @@ struct DataDictionaryInLogInfo
|
||||
bool is_valid() const
|
||||
{
|
||||
return common::OB_INVALID_TIMESTAMP != snapshot_scn_
|
||||
&& common::OB_INVALID_TIMESTAMP != end_scn_
|
||||
&& start_lsn_.is_valid()
|
||||
&& end_lsn_.is_valid();
|
||||
}
|
||||
@ -32,16 +33,19 @@ struct DataDictionaryInLogInfo
|
||||
void reset()
|
||||
{
|
||||
snapshot_scn_ = common::OB_INVALID_TIMESTAMP;
|
||||
end_scn_ = common::OB_INVALID_TIMESTAMP;
|
||||
start_lsn_.reset();
|
||||
end_lsn_.reset();
|
||||
}
|
||||
|
||||
void reset(
|
||||
const int64_t snapshot_scn,
|
||||
const int64_t end_scn,
|
||||
const palf::LSN &start_lsn,
|
||||
const palf::LSN &end_lsn)
|
||||
{
|
||||
snapshot_scn_ = snapshot_scn;
|
||||
end_scn_ = end_scn;
|
||||
start_lsn_ = start_lsn;
|
||||
end_lsn_ = end_lsn;
|
||||
}
|
||||
@ -49,6 +53,7 @@ struct DataDictionaryInLogInfo
|
||||
DataDictionaryInLogInfo &operator=(const DataDictionaryInLogInfo &other)
|
||||
{
|
||||
snapshot_scn_ = other.snapshot_scn_;
|
||||
end_scn_ = other.end_scn_;
|
||||
start_lsn_ = other.start_lsn_;
|
||||
end_lsn_ = other.end_lsn_;
|
||||
return *this;
|
||||
@ -56,10 +61,12 @@ struct DataDictionaryInLogInfo
|
||||
|
||||
TO_STRING_KV(
|
||||
K_(snapshot_scn),
|
||||
K_(end_scn),
|
||||
K_(start_lsn),
|
||||
K_(end_lsn));
|
||||
|
||||
int64_t snapshot_scn_;
|
||||
int64_t end_scn_;
|
||||
palf::LSN start_lsn_;
|
||||
palf::LSN end_lsn_;
|
||||
};
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "gtest/gtest.h"
|
||||
#include "lib/oblog/ob_log.h"
|
||||
#include "logservice/palf/log_define.h"
|
||||
#include "share/ob_errno.h"
|
||||
|
||||
#define private public
|
||||
#include "logservice/data_dictionary/ob_data_dict_meta_info.h"
|
||||
@ -36,12 +37,15 @@ TEST(ObDataDictMetaInfoItem, test_data_dict_meta_info_item)
|
||||
int64_t pos = 0;
|
||||
DataDictMetaInfoItemArr item_arr;
|
||||
for (int64_t i = 0; i < item_cnt; i++) {
|
||||
item.reset(random.get(), random.get(), random.get());
|
||||
item.reset(random.get(), random.get(), random.get(), random.get());
|
||||
EXPECT_EQ(OB_SUCCESS, item_arr.push_back(item));
|
||||
EXPECT_EQ(OB_SUCCESS, item.serialize(buf, buf_size, pos));
|
||||
}
|
||||
int64_t deserialize_pos = 0;
|
||||
EXPECT_EQ(common::OB_VERSION_NOT_MATCH, item.deserialize(buf, buf_size, deserialize_pos));
|
||||
deserialize_pos = 0;
|
||||
for (int64_t i = 0; i < item_cnt; i++) {
|
||||
item.set_version(datadict::ObDataDictMetaInfoHeader::DATADICT_METAINFO_META_VERSION);
|
||||
EXPECT_EQ(OB_SUCCESS, item.deserialize(buf, buf_size, deserialize_pos));
|
||||
EXPECT_EQ(item, item_arr.at(i));
|
||||
}
|
||||
@ -90,7 +94,8 @@ TEST(ObDataDictMetaInfo, test_data_dict_meta_info)
|
||||
while (have_enough_buffer ) {
|
||||
item.reset(random.get(0, max_scn_val),
|
||||
random.get(0, palf::LOG_MAX_LSN_VAL),
|
||||
random.get(0, palf::LOG_MAX_LSN_VAL));
|
||||
random.get(0, palf::LOG_MAX_LSN_VAL),
|
||||
random.get(0, max_scn_val));
|
||||
if (total_item_data_size + item.get_serialize_size() <= buffer_size - header_size) {
|
||||
item_count_expected++;
|
||||
total_item_data_size += item.get_serialize_size();
|
||||
|
Loading…
x
Reference in New Issue
Block a user