diff --git a/src/logservice/libobcdc/src/ob_cdc_tenant_query.h b/src/logservice/libobcdc/src/ob_cdc_tenant_query.h new file mode 100644 index 0000000000..68fc1b89de --- /dev/null +++ b/src/logservice/libobcdc/src/ob_cdc_tenant_query.h @@ -0,0 +1,162 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_LIBOBCDC_TENANT_QUERYER_H_ +#define OCEANBASE_LIBOBCDC_TENANT_QUERYER_H_ + +#include "lib/mysqlclient/ob_mysql_proxy.h" +#include "ob_log_utils.h" +namespace oceanbase +{ +namespace libobcdc +{ +// ResultType = T +template +class ObCDCQueryResult +{ +public: + ObCDCQueryResult(T &result) : affect_rows_(0), data_(result) {} + virtual ~ObCDCQueryResult() { affect_rows_ = 0; } +public: + OB_INLINE void inc_affect_rows() { affect_rows_++; } + OB_INLINE bool has_data() const { return affect_rows_ > 0; } + OB_INLINE bool is_empty() const { return ! has_data(); } + OB_INLINE T& get_data() const { return data_; } + +TO_STRING_KV(K_(affect_rows)); + +private: + int64_t affect_rows_; + T &data_; +}; + +// ResultType = T +template +class ObCDCTenantQuery +{ +public: + ObCDCTenantQuery(common::ObMySQLProxy &sql_proxy) : sql_proxy_(&sql_proxy) {} + virtual ~ObCDCTenantQuery() { sql_proxy_ = nullptr; } +public: + int query(const uint64_t tenant_id, ObCDCQueryResult &query_result, const int64_t retry_timeout); + int query(ObCDCQueryResult &result, const int64_t retry_timeout) + { return query(OB_SYS_TENANT_ID, result, retry_timeout); } +protected: + virtual int build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql) = 0; + // convert query rusult from sql_result to result and deepcopy is required if necessory + virtual int parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult &result) = 0; +private: + +private: + int build_sql_statement_(ObSqlString &sql) { return build_sql_statement_(OB_SYS_TENANT_ID, sql); } + int parse_sql_result_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult &result); + +private: + static const int64_t DEFAULT_RETRY_TIMEOUT = 1000; + +private: + common::ObMySQLProxy *sql_proxy_; +}; + +template +int ObCDCTenantQuery::query(const uint64_t tenant_id, ObCDCQueryResult &query_result, const int64_t retry_timeout) +{ + int ret = OB_SUCCESS; + common::ObSqlString sql; + + if (OB_ISNULL(sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + OBLOG_LOG(ERROR, "invalid sql proxy", KR(ret)); + } else if (OB_FAIL(build_sql_statement_(tenant_id, sql))) { + OBLOG_LOG(ERROR, "build_sql_statement_ failed", KR(ret), K(tenant_id), K(sql)); + } else { + bool query_done = false; + int retry_cnt = 0; + const int64_t retry_fail_sleep_time = 1 * _MSEC_; + const int64_t retry_warn_interval = 5 * _SEC_; + const int64_t start_time = get_timestamp(); + const int64_t end_time = start_time + retry_timeout; + + while (! query_done) { + common::sqlclient::ObMySQLResult *sql_result = nullptr; + + SMART_VAR(ObISQLClient::ReadResult, result) { + if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) { + OBLOG_LOG(ERROR, "sql read failed from sql_proxy", KR(ret), K(tenant_id), K(sql)); + } else if (OB_ISNULL(sql_result = result.get_result())) { + ret = OB_ERR_UNEXPECTED; + OBLOG_LOG(ERROR, "invalid sql_result", KR(ret), K(tenant_id), K(sql)); + } else { + query_done = true; + + if (OB_FAIL(parse_sql_result_(*sql_result, query_result))) { + OBLOG_LOG(ERROR, "parse_sql_result_ failed", KR(ret), K(tenant_id)); + } + } + } + + if (! query_done) { + int64_t cur_time = get_timestamp(); + + if (cur_time < end_time) { + retry_cnt ++; + if (TC_REACH_TIME_INTERVAL(retry_warn_interval)) { + OBLOG_LOG(INFO, "tenant query retring", KR(ret), + K(tenant_id), K(retry_cnt), "remain_retry_time", end_time - cur_time, K(sql)); + } + ob_usleep(retry_fail_sleep_time); + } else { + // query_done but failed. + query_done = true; + OBLOG_LOG(WARN, "tenant query failed after retry", KR(ret), K(tenant_id), K(sql), K(retry_cnt)); + } + } + } + } + + return ret; +} + +template +int ObCDCTenantQuery::parse_sql_result_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult &result) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(result.has_data())) { + ret = OB_STATE_NOT_MATCH; + OBLOG_LOG(ERROR, "expect empty query_result before query begin", KR(ret), K(result)); + } else { + while (OB_SUCC(ret)) { + if (OB_FAIL(sql_result.next())) { + if (OB_ITER_END != ret) { + OBLOG_LOG(ERROR, "iterate sql result failed", KR(ret), K(result)); + } + } else { + result.inc_affect_rows(); + + if (OB_FAIL(parse_row_(sql_result, result))) { + OBLOG_LOG(ERROR, "parse_row_ failed", KR(ret), K(result)); + } + } + } + } + + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } + + return ret; +} + +} // namespace libobcdc +} // namespace oceanbase +#endif \ No newline at end of file diff --git a/src/logservice/libobcdc/src/ob_log_instance.cpp b/src/logservice/libobcdc/src/ob_log_instance.cpp index f2128cda20..c2cddd80c9 100644 --- a/src/logservice/libobcdc/src/ob_log_instance.cpp +++ b/src/logservice/libobcdc/src/ob_log_instance.cpp @@ -557,8 +557,11 @@ int ObLogInstance::init_common_(uint64_t start_tstamp_ns, ERROR_CALLBACK err_cb) // 2. Change the schema to WARN after the startup is complete OB_LOGGER.set_mod_log_levels(TCONF.init_log_level.str()); + if (OB_FAIL(common::ObClockGenerator::init())) { + LOG_ERROR("failed to init ob clock generator", KR(ret)); + } // 校验配置项是否满足期望 - if (OB_FAIL(TCONF.check_all())) { + else if (OB_FAIL(TCONF.check_all())) { LOG_ERROR("check config fail", KR(ret)); } else if (OB_FAIL(dump_config_())) { LOG_ERROR("dump_config_ fail", KR(ret)); @@ -750,13 +753,6 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns) LOG_INFO("set working mode", K(working_mode_str), K(working_mode_), "working_mode", print_working_mode(working_mode_)); } - // init ObClockGenerator - if (OB_SUCC(ret)) { - if (OB_FAIL(common::ObClockGenerator::init())) { - LOG_ERROR("failed to init ob clock generator", KR(ret)); - } - } - if (OB_SUCC(ret)) { if (OB_UNLIKELY(! is_refresh_mode_valid(refresh_mode))) { ret = OB_INVALID_CONFIG; diff --git a/src/logservice/libobcdc/src/ob_log_ls_getter.cpp b/src/logservice/libobcdc/src/ob_log_ls_getter.cpp index 33e462b919..a65990f9c3 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_getter.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_getter.cpp @@ -20,6 +20,53 @@ namespace oceanbase { namespace libobcdc { + +const char* TenantLSQueryer::QUERY_LS_INFO_SQL_FORMAT = + "SELECT LS_ID FROM " + "(SELECT LS_ID, STATUS, CREATE_SCN, CASE WHEN STATUS = 'DROPPED' THEN ora_rowscn ELSE 1 END AS DROP_SCN FROM %s) " + "WHERE CREATE_SCN <= %lu AND (DROP_SCN > %lu OR DROP_SCN = 1) AND STATUS NOT IN ('CREATE_ABORT', 'CREATING');"; + +int TenantLSQueryer::build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql) +{ + int ret = OB_SUCCESS; + share::SCN snapshot_scn; + + if (OB_FAIL(snapshot_scn.convert_for_gts(snapshot_ts_ns_))) { + LOG_ERROR("convert_for_gts failed", KR(ret), K(tenant_id), K_(snapshot_ts_ns)); + } else if (OB_UNLIKELY(! snapshot_scn.is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("snapshot_scn is not valid", KR(ret), K(tenant_id), K(snapshot_scn), K_(snapshot_ts_ns)); + } else { + uint64_t snapshto_scn_val = snapshot_scn.get_val_for_inner_table_field(); + + if (OB_FAIL(sql.assign_fmt(QUERY_LS_INFO_SQL_FORMAT, "__ALL_LS", snapshto_scn_val, snapshto_scn_val))) { + LOG_ERROR("assign_fmt for TenantLSQuerySQL failed", KR(ret), + K_(snapshot_ts_ns), K(snapshot_scn), K(snapshto_scn_val), K(sql)); + } + } + + return ret; +} + +int TenantLSQueryer::parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult &result) +{ + int ret = OB_SUCCESS; + int64_t ls_id = share::ObLSID::INVALID_LS_ID; + EXTRACT_UINT_FIELD_MYSQL(sql_result, "LS_ID", ls_id, int64_t); + share::ObLSID ob_ls_id(ls_id); + + if (OB_FAIL(ret)) { + LOG_ERROR("extract ls_id field from sql_result failed", KR(ret), K_(snapshot_ts_ns), K(ob_ls_id)); + } else if (OB_UNLIKELY(! ob_ls_id.is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid ls_id", KR(ret), K_(snapshot_ts_ns), K(ob_ls_id)); + } else if (OB_FAIL(result.get_data().push_back(ob_ls_id))) { + LOG_ERROR("push_back ls_id into query_result failed", KR(ret), K_(snapshot_ts_ns), K(ob_ls_id), K(result)); + } + + return ret; +} + ObLogLsGetter::ObLogLsGetter() : is_inited_(false), tenant_ls_ids_cache_() @@ -31,7 +78,7 @@ ObLogLsGetter::~ObLogLsGetter() destroy(); } -int ObLogLsGetter::init(const common::ObIArray &tenant_ids) +int ObLogLsGetter::init(const common::ObIArray &tenant_ids, const int64_t start_tstamp_ns) { int ret = OB_SUCCESS; @@ -47,8 +94,8 @@ int ObLogLsGetter::init(const common::ObIArray &tenant_ids) if (OB_SYS_TENANT_ID == tenant_id || is_meta_tenant(tenant_id)) { // do nothing - } else if (OB_FAIL(query_and_set_tenant_ls_info_(tenant_id))) { - LOG_ERROR("query_and_set_tenant_ls_info_ failed", KR(ret), K(tenant_id)); + } else if (OB_FAIL(query_and_set_tenant_ls_info_(tenant_id, start_tstamp_ns))) { + LOG_ERROR("query_and_set_tenant_ls_info_ failed", KR(ret), K(tenant_id), K(start_tstamp_ns)); } } @@ -70,6 +117,7 @@ void ObLogLsGetter::destroy() int ObLogLsGetter::get_ls_ids( const uint64_t tenant_id, + const int64_t snapshot_ts, common::ObIArray &ls_id_array) { int ret = OB_SUCCESS; @@ -84,7 +132,7 @@ int ObLogLsGetter::get_ls_ids( } else { ret = OB_SUCCESS; // query and set - if (OB_FAIL(query_and_set_tenant_ls_info_(tenant_id))) { + if (OB_FAIL(query_and_set_tenant_ls_info_(tenant_id, snapshot_ts))) { LOG_ERROR("query_and_set_tenant_ls_info_ failed", KR(ret), K(tenant_id)); } } @@ -100,28 +148,18 @@ int ObLogLsGetter::get_ls_ids( } int ObLogLsGetter::query_and_set_tenant_ls_info_( - const uint64_t tenant_id) + const uint64_t tenant_id, + const int64_t snapshot_ts) { int ret = OB_SUCCESS; - ObLogSysTableHelper::TenantLSIDs tenant_ls_ids; LSIDArray ls_ids; - if (OB_FAIL(query_tenant_ls_info_(tenant_id, tenant_ls_ids))) { - LOG_ERROR("query_tenant_ls_info_ failed", KR(ret), K(tenant_id)); + if (OB_FAIL(query_tenant_ls_info_(tenant_id, snapshot_ts, ls_ids))) { + LOG_ERROR("query_tenant_ls_info failed", KR(ret), K(tenant_id), K(snapshot_ts)); + } else if (OB_FAIL(tenant_ls_ids_cache_.insert(tenant_id, ls_ids))) { + LOG_ERROR("tenant_ls_ids_cache_ insert data failed", KR(ret), K(tenant_id), K(ls_ids)); } else { - ARRAY_FOREACH_N(tenant_ls_ids, ls_ids_idx, ls_ids_count) { - if (OB_FAIL(ls_ids.push_back(tenant_ls_ids.at(ls_ids_idx)))) { - LOG_ERROR("ls_ids push_back failed", KR(ret), K(ls_ids), K(tenant_ls_ids)); - } - } - - if (OB_SUCC(ret)) { - if (OB_FAIL(tenant_ls_ids_cache_.insert(tenant_id, ls_ids))) { - LOG_ERROR("tenant_ls_ids_cache_ insert failed", KR(ret), K(tenant_id), K(ls_ids)); - } else { - LOG_INFO("tenant_ls_ids_cache_ insert succ", K(tenant_id), K(ls_ids)); - } - } // OB_SUCC + LOG_INFO("tenant_ls_ids_cache_ insert success", K(tenant_id), K(snapshot_ts), K(ls_ids)); } return ret; @@ -129,33 +167,28 @@ int ObLogLsGetter::query_and_set_tenant_ls_info_( int ObLogLsGetter::query_tenant_ls_info_( const uint64_t tenant_id, - ObLogSysTableHelper::TenantLSIDs &tenant_ls_ids) + const int64_t snapshot_ts, + LSIDArray &ls_array) { int ret = OB_SUCCESS; - IObLogSysTableHelper *systable_helper = TCTX.systable_helper_; - tenant_ls_ids.reset(); - bool done = false; + const static int64_t QUERY_TIMEOUT = 10 * _MIN_; + ls_array.reset(); + ObCDCQueryResult query_result(ls_array); + TenantLSQueryer queryer(snapshot_ts, TCTX.tenant_sql_proxy_.get_ob_mysql_proxy()); - if (OB_ISNULL(systable_helper)) { + if (OB_UNLIKELY(! is_user_tenant(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_ERROR("expected user_tenant for TenantLSGetter", KR(ret), K(tenant_id)); + } else if (OB_FAIL(queryer.query(tenant_id, query_result, QUERY_TIMEOUT))) { + LOG_ERROR("query tenant ls_id failed", KR(ret), K(tenant_id), K(snapshot_ts), K(ls_array)); + } else if (OB_UNLIKELY(query_result.is_empty())) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("systable_helper is NULL", KR(ret)); - } else { - while (! done && OB_SUCCESS == ret) { - if (OB_FAIL(systable_helper->query_tenant_ls_info(tenant_id, tenant_ls_ids))) { - LOG_WARN("systable_helper query_tenant_ls_info fail", KR(ret), K(tenant_id), K(tenant_ls_ids)); - } else { - done = true; - } - - if (OB_NEED_RETRY == ret) { - ret = OB_SUCCESS; - ob_usleep(100L * 1000L); - } - } + LOG_ERROR("tenant ls_id query result empty", KR(ret), K(tenant_id), K(snapshot_ts), K(query_result), K(ls_array)); } return ret; } + } // namespace libobcdc } // namespace oceanbase diff --git a/src/logservice/libobcdc/src/ob_log_ls_getter.h b/src/logservice/libobcdc/src/ob_log_ls_getter.h index c9cd38501c..d682e89830 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_getter.h +++ b/src/logservice/libobcdc/src/ob_log_ls_getter.h @@ -16,33 +16,54 @@ #include "lib/hash/ob_linear_hash_map.h" // ObLinearHashMap #include "ob_log_systable_helper.h" // ObLogSysTableHelper #include "ob_log_tenant.h" +#include "ob_cdc_tenant_query.h" namespace oceanbase { namespace libobcdc { + +typedef ObArray LSIDArray; + +class TenantLSQueryer : public ObCDCTenantQuery +{ +public: + TenantLSQueryer(const int64_t snapshot_ts_ns, common::ObMySQLProxy &sql_proxy) + : ObCDCTenantQuery(sql_proxy), snapshot_ts_ns_(snapshot_ts_ns) {} + ~TenantLSQueryer() { snapshot_ts_ns_ = OB_INVALID_TIMESTAMP; } +private: + int build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql) override; + int parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult &result) override; +private: + static const char* QUERY_LS_INFO_SQL_FORMAT; +private: + int64_t snapshot_ts_ns_; +}; + class ObLogLsGetter { public: ObLogLsGetter(); ~ObLogLsGetter(); - int init(const common::ObIArray &tenant_ids); + int init(const common::ObIArray &tenant_ids, const int64_t start_tstamp_ns); void destroy(); int get_ls_ids( const uint64_t tenant_id, + const int64_t snapshot_ts, common::ObIArray &ls_id_array); private: int query_and_set_tenant_ls_info_( - const uint64_t tenant_id); + const uint64_t tenant_id, + const int64_t snapshot_ts); int query_tenant_ls_info_( const uint64_t tenant_id, - ObLogSysTableHelper::TenantLSIDs &tenant_ls_ids); + const int64_t snapshot_ts, + LSIDArray &ls_array); private: - typedef ObArray LSIDArray; typedef common::ObLinearHashMap TenantLSIDsCache; bool is_inited_; diff --git a/src/logservice/libobcdc/src/ob_log_ls_op_processor.cpp b/src/logservice/libobcdc/src/ob_log_ls_op_processor.cpp index 1b2153ac59..415d1b41ad 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_op_processor.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_op_processor.cpp @@ -34,7 +34,7 @@ int ObLogLSOpProcessor::process_ls_op( ret = OB_INVALID_ARGUMENT; LOG_ERROR("ls attr is invalid", KR(ret), K(ls_attr)); } else if (OB_FAIL(TCTX.get_tenant_guard(tenant_id, guard))) { - LOG_ERROR("get_tenant fail", KR(ret), K(tenant_id), K(guard)); + LOG_WARN("get_tenant fail", KR(ret), K(tenant_id), K(guard)); } else if (OB_ISNULL(tenant = guard.get_tenant())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("get tenant fail, tenant is NULL", KR(ret), K(tenant_id)); diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_replayer.cpp b/src/logservice/libobcdc/src/ob_log_meta_data_replayer.cpp index aabe988104..7c7ec67414 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_replayer.cpp +++ b/src/logservice/libobcdc/src/ob_log_meta_data_replayer.cpp @@ -14,8 +14,8 @@ #include "ob_log_meta_data_replayer.h" #include "ob_log_part_trans_task.h" -#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[LOG_META_DATA] [REPALYER] " fmt, ##args) -#define STAT(level, fmt, args...) OBLOG_LOG(level, "[LOG_META_DATA] [REPALYER] " fmt, ##args) +#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[LOG_META_DATA] [REPLAYER] " fmt, ##args) +#define STAT(level, fmt, args...) OBLOG_LOG(level, "[LOG_META_DATA] [REPLAYER] " fmt, ##args) #define _ISTAT(fmt, args...) _STAT(INFO, fmt, ##args) #define ISTAT(fmt, args...) STAT(INFO, fmt, ##args) #define _DSTAT(fmt, args...) _STAT(DEBUG, fmt, ##args) @@ -139,7 +139,7 @@ int ObLogMetaDataReplayer::replay( "TRANS_COUNT(TOTAL=%ld DDL_TRANS=%ld/%ld LS_OP=%ld)", tenant_id, start_timestamp_ns, NTS_TO_STR(start_timestamp_ns), replay_info_stat.total_part_trans_task_count_, - replay_info_stat.ddl_part_trans_task_repalyed_count_, + replay_info_stat.ddl_part_trans_task_replayed_count_, replay_info_stat.ddl_part_trans_task_toal_count_, replay_info_stat.ls_op_part_trans_task_count_); } @@ -165,7 +165,7 @@ int ObLogMetaDataReplayer::handle_ddl_trans_( // Only DDL transactions less than or equal to the start timestamp need to be replayed if (trans_commit_version <= start_timestamp_ns) { DSTAT("handle DDL_TRANS to be replayed", K(part_trans_task)); - replay_info_stat.ddl_part_trans_task_repalyed_count_++; + replay_info_stat.ddl_part_trans_task_replayed_count_++; if (OB_FAIL(part_trans_task.parse_multi_data_source_data_for_ddl("ObLogMetaDataReplayer"))) { LOG_ERROR("parse_multi_data_source_data_for_ddl failed", KR(ret), K(part_trans_task)); diff --git a/src/logservice/libobcdc/src/ob_log_meta_data_replayer.h b/src/logservice/libobcdc/src/ob_log_meta_data_replayer.h index a6092ddbea..22921d1c3f 100644 --- a/src/logservice/libobcdc/src/ob_log_meta_data_replayer.h +++ b/src/logservice/libobcdc/src/ob_log_meta_data_replayer.h @@ -63,13 +63,13 @@ private: { total_part_trans_task_count_ = 0; ddl_part_trans_task_toal_count_ = 0; - ddl_part_trans_task_repalyed_count_ = 0; + ddl_part_trans_task_replayed_count_ = 0; ls_op_part_trans_task_count_ = 0; } int64_t total_part_trans_task_count_; int64_t ddl_part_trans_task_toal_count_; - int64_t ddl_part_trans_task_repalyed_count_; + int64_t ddl_part_trans_task_replayed_count_; int64_t ls_op_part_trans_task_count_; }; diff --git a/src/logservice/libobcdc/src/ob_log_schema_incremental_replay.cpp b/src/logservice/libobcdc/src/ob_log_schema_incremental_replay.cpp index e3c1fe8198..e1ef309d45 100644 --- a/src/logservice/libobcdc/src/ob_log_schema_incremental_replay.cpp +++ b/src/logservice/libobcdc/src/ob_log_schema_incremental_replay.cpp @@ -13,8 +13,8 @@ #include "ob_log_schema_incremental_replay.h" -#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[LOG_META_DATA] [REPALYER] [SCHMEA] " fmt, ##args) -#define STAT(level, fmt, args...) OBLOG_LOG(level, "[LOG_META_DATA] [REPALYER] [SCHMEA] " fmt, ##args) +#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[LOG_META_DATA] [REPLAYER] [SCHMEA] " fmt, ##args) +#define STAT(level, fmt, args...) OBLOG_LOG(level, "[LOG_META_DATA] [REPLAYER] [SCHMEA] " fmt, ##args) #define _ISTAT(fmt, args...) _STAT(INFO, fmt, ##args) #define ISTAT(fmt, args...) STAT(INFO, fmt, ##args) #define _DSTAT(fmt, args...) _STAT(DEBUG, fmt, ##args) diff --git a/src/logservice/libobcdc/src/ob_log_systable_helper.cpp b/src/logservice/libobcdc/src/ob_log_systable_helper.cpp index 37f59dc6f3..59fcb65fff 100644 --- a/src/logservice/libobcdc/src/ob_log_systable_helper.cpp +++ b/src/logservice/libobcdc/src/ob_log_systable_helper.cpp @@ -20,7 +20,7 @@ #include "common/ob_role.h" // LEADER #include "share/inner_table/ob_inner_table_schema_constants.h" // OB_***_TNAME -#include "share/schema/ob_schema_struct.h" // TenantStatus +#include "share/schema/ob_schema_struct.h" // TenantStatus, ObTenantStatus #include "ob_log_instance.h" // TCTX #include "ob_log_config.h" // ObLogConfig, TCONF #include "ob_log_utils.h" diff --git a/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp b/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp index 7af240ad67..73a31b1901 100644 --- a/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp +++ b/src/logservice/libobcdc/src/ob_log_tenant_mgr.cpp @@ -432,8 +432,8 @@ int ObLogTenantMgr::start_tenant_service_( } else if (OB_SYS_TENANT_ID == tenant_id) { // sys tenant, do nothing } else if (! is_normal_new_created_tenant) { - if (OB_FAIL(ls_getter_.get_ls_ids(tenant_id, ls_id_array))) { - LOG_ERROR("ls_getter_ get_ls_ids failed", KR(ret), K(tenant_id), K(ls_id_array)); + if (OB_FAIL(ls_getter_.get_ls_ids(tenant_id, start_tstamp_ns, ls_id_array))) { + LOG_ERROR("ls_getter_ get_ls_ids failed", KR(ret), K(tenant_id), K(ls_id_array), K(start_tstamp_ns)); } } } else if (is_data_dict_refresh_mode(refresh_mode_)) { @@ -1333,8 +1333,8 @@ int ObLogTenantMgr::get_tenant_ids_( // get available tenant id list else if (OB_FAIL(sys_schema_guard.get_available_tenant_ids(tenant_id_list, timeout))) { LOG_ERROR("get_available_tenant_ids fail", KR(ret), K(tenant_id_list), K(timeout)); - } else if (OB_FAIL(ls_getter_.init(tenant_id_list))) { - LOG_ERROR("ObLogLsGetter init fail", KR(ret), K(tenant_id_list)); + } else if (OB_FAIL(ls_getter_.init(tenant_id_list, start_tstamp_ns))) { + LOG_ERROR("ObLogLsGetter init fail", KR(ret), K(tenant_id_list), K(start_tstamp_ns)); } } else if (is_data_dict_refresh_mode(refresh_mode_)) { IObLogSysTableHelper *systable_helper = TCTX.systable_helper_; diff --git a/src/logservice/libobcdc/src/ob_ls_worker.h b/src/logservice/libobcdc/src/ob_ls_worker.h index 1e5d9c11ba..3b6e39df37 100644 --- a/src/logservice/libobcdc/src/ob_ls_worker.h +++ b/src/logservice/libobcdc/src/ob_ls_worker.h @@ -100,7 +100,6 @@ public: public: // Overloading thread handling functions - // virtual int handle(void *data, const int64_t thread_index, volatile bool &stop_flag); virtual void handle(void *data, volatile bool &stop_flag) override; public: