[CP] [OBCDC] Support multi-version ls while using online schema
This commit is contained in:
162
src/logservice/libobcdc/src/ob_cdc_tenant_query.h
Normal file
162
src/logservice/libobcdc/src/ob_cdc_tenant_query.h
Normal file
@ -0,0 +1,162 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#ifndef OCEANBASE_LIBOBCDC_TENANT_QUERYER_H_
|
||||
#define OCEANBASE_LIBOBCDC_TENANT_QUERYER_H_
|
||||
|
||||
#include "lib/mysqlclient/ob_mysql_proxy.h"
|
||||
#include "ob_log_utils.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace libobcdc
|
||||
{
|
||||
// ResultType = T
|
||||
template<typename T>
|
||||
class ObCDCQueryResult
|
||||
{
|
||||
public:
|
||||
ObCDCQueryResult(T &result) : affect_rows_(0), data_(result) {}
|
||||
virtual ~ObCDCQueryResult() { affect_rows_ = 0; }
|
||||
public:
|
||||
OB_INLINE void inc_affect_rows() { affect_rows_++; }
|
||||
OB_INLINE bool has_data() const { return affect_rows_ > 0; }
|
||||
OB_INLINE bool is_empty() const { return ! has_data(); }
|
||||
OB_INLINE T& get_data() const { return data_; }
|
||||
|
||||
TO_STRING_KV(K_(affect_rows));
|
||||
|
||||
private:
|
||||
int64_t affect_rows_;
|
||||
T &data_;
|
||||
};
|
||||
|
||||
// ResultType = T
|
||||
template<typename T>
|
||||
class ObCDCTenantQuery
|
||||
{
|
||||
public:
|
||||
ObCDCTenantQuery(common::ObMySQLProxy &sql_proxy) : sql_proxy_(&sql_proxy) {}
|
||||
virtual ~ObCDCTenantQuery() { sql_proxy_ = nullptr; }
|
||||
public:
|
||||
int query(const uint64_t tenant_id, ObCDCQueryResult<T> &query_result, const int64_t retry_timeout);
|
||||
int query(ObCDCQueryResult<T> &result, const int64_t retry_timeout)
|
||||
{ return query(OB_SYS_TENANT_ID, result, retry_timeout); }
|
||||
protected:
|
||||
virtual int build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql) = 0;
|
||||
// convert query rusult from sql_result to result and deepcopy is required if necessory
|
||||
virtual int parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult<T> &result) = 0;
|
||||
private:
|
||||
|
||||
private:
|
||||
int build_sql_statement_(ObSqlString &sql) { return build_sql_statement_(OB_SYS_TENANT_ID, sql); }
|
||||
int parse_sql_result_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult<T> &result);
|
||||
|
||||
private:
|
||||
static const int64_t DEFAULT_RETRY_TIMEOUT = 1000;
|
||||
|
||||
private:
|
||||
common::ObMySQLProxy *sql_proxy_;
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
int ObCDCTenantQuery<T>::query(const uint64_t tenant_id, ObCDCQueryResult<T> &query_result, const int64_t retry_timeout)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObSqlString sql;
|
||||
|
||||
if (OB_ISNULL(sql_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OBLOG_LOG(ERROR, "invalid sql proxy", KR(ret));
|
||||
} else if (OB_FAIL(build_sql_statement_(tenant_id, sql))) {
|
||||
OBLOG_LOG(ERROR, "build_sql_statement_ failed", KR(ret), K(tenant_id), K(sql));
|
||||
} else {
|
||||
bool query_done = false;
|
||||
int retry_cnt = 0;
|
||||
const int64_t retry_fail_sleep_time = 1 * _MSEC_;
|
||||
const int64_t retry_warn_interval = 5 * _SEC_;
|
||||
const int64_t start_time = get_timestamp();
|
||||
const int64_t end_time = start_time + retry_timeout;
|
||||
|
||||
while (! query_done) {
|
||||
common::sqlclient::ObMySQLResult *sql_result = nullptr;
|
||||
|
||||
SMART_VAR(ObISQLClient::ReadResult, result) {
|
||||
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
|
||||
OBLOG_LOG(ERROR, "sql read failed from sql_proxy", KR(ret), K(tenant_id), K(sql));
|
||||
} else if (OB_ISNULL(sql_result = result.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OBLOG_LOG(ERROR, "invalid sql_result", KR(ret), K(tenant_id), K(sql));
|
||||
} else {
|
||||
query_done = true;
|
||||
|
||||
if (OB_FAIL(parse_sql_result_(*sql_result, query_result))) {
|
||||
OBLOG_LOG(ERROR, "parse_sql_result_ failed", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (! query_done) {
|
||||
int64_t cur_time = get_timestamp();
|
||||
|
||||
if (cur_time < end_time) {
|
||||
retry_cnt ++;
|
||||
if (TC_REACH_TIME_INTERVAL(retry_warn_interval)) {
|
||||
OBLOG_LOG(INFO, "tenant query retring", KR(ret),
|
||||
K(tenant_id), K(retry_cnt), "remain_retry_time", end_time - cur_time, K(sql));
|
||||
}
|
||||
ob_usleep(retry_fail_sleep_time);
|
||||
} else {
|
||||
// query_done but failed.
|
||||
query_done = true;
|
||||
OBLOG_LOG(WARN, "tenant query failed after retry", KR(ret), K(tenant_id), K(sql), K(retry_cnt));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
int ObCDCTenantQuery<T>::parse_sql_result_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult<T> &result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_UNLIKELY(result.has_data())) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
OBLOG_LOG(ERROR, "expect empty query_result before query begin", KR(ret), K(result));
|
||||
} else {
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(sql_result.next())) {
|
||||
if (OB_ITER_END != ret) {
|
||||
OBLOG_LOG(ERROR, "iterate sql result failed", KR(ret), K(result));
|
||||
}
|
||||
} else {
|
||||
result.inc_affect_rows();
|
||||
|
||||
if (OB_FAIL(parse_row_(sql_result, result))) {
|
||||
OBLOG_LOG(ERROR, "parse_row_ failed", KR(ret), K(result));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace libobcdc
|
||||
} // namespace oceanbase
|
||||
#endif
|
||||
@ -557,8 +557,11 @@ int ObLogInstance::init_common_(uint64_t start_tstamp_ns, ERROR_CALLBACK err_cb)
|
||||
// 2. Change the schema to WARN after the startup is complete
|
||||
OB_LOGGER.set_mod_log_levels(TCONF.init_log_level.str());
|
||||
|
||||
if (OB_FAIL(common::ObClockGenerator::init())) {
|
||||
LOG_ERROR("failed to init ob clock generator", KR(ret));
|
||||
}
|
||||
// 校验配置项是否满足期望
|
||||
if (OB_FAIL(TCONF.check_all())) {
|
||||
else if (OB_FAIL(TCONF.check_all())) {
|
||||
LOG_ERROR("check config fail", KR(ret));
|
||||
} else if (OB_FAIL(dump_config_())) {
|
||||
LOG_ERROR("dump_config_ fail", KR(ret));
|
||||
@ -750,13 +753,6 @@ int ObLogInstance::init_components_(const uint64_t start_tstamp_ns)
|
||||
LOG_INFO("set working mode", K(working_mode_str), K(working_mode_), "working_mode", print_working_mode(working_mode_));
|
||||
}
|
||||
|
||||
// init ObClockGenerator
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(common::ObClockGenerator::init())) {
|
||||
LOG_ERROR("failed to init ob clock generator", KR(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_UNLIKELY(! is_refresh_mode_valid(refresh_mode))) {
|
||||
ret = OB_INVALID_CONFIG;
|
||||
|
||||
@ -20,6 +20,53 @@ namespace oceanbase
|
||||
{
|
||||
namespace libobcdc
|
||||
{
|
||||
|
||||
const char* TenantLSQueryer::QUERY_LS_INFO_SQL_FORMAT =
|
||||
"SELECT LS_ID FROM "
|
||||
"(SELECT LS_ID, STATUS, CREATE_SCN, CASE WHEN STATUS = 'DROPPED' THEN ora_rowscn ELSE 1 END AS DROP_SCN FROM %s) "
|
||||
"WHERE CREATE_SCN <= %lu AND (DROP_SCN > %lu OR DROP_SCN = 1) AND STATUS NOT IN ('CREATE_ABORT', 'CREATING');";
|
||||
|
||||
int TenantLSQueryer::build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
share::SCN snapshot_scn;
|
||||
|
||||
if (OB_FAIL(snapshot_scn.convert_for_gts(snapshot_ts_ns_))) {
|
||||
LOG_ERROR("convert_for_gts failed", KR(ret), K(tenant_id), K_(snapshot_ts_ns));
|
||||
} else if (OB_UNLIKELY(! snapshot_scn.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("snapshot_scn is not valid", KR(ret), K(tenant_id), K(snapshot_scn), K_(snapshot_ts_ns));
|
||||
} else {
|
||||
uint64_t snapshto_scn_val = snapshot_scn.get_val_for_inner_table_field();
|
||||
|
||||
if (OB_FAIL(sql.assign_fmt(QUERY_LS_INFO_SQL_FORMAT, "__ALL_LS", snapshto_scn_val, snapshto_scn_val))) {
|
||||
LOG_ERROR("assign_fmt for TenantLSQuerySQL failed", KR(ret),
|
||||
K_(snapshot_ts_ns), K(snapshot_scn), K(snapshto_scn_val), K(sql));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int TenantLSQueryer::parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult<LSIDArray> &result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t ls_id = share::ObLSID::INVALID_LS_ID;
|
||||
EXTRACT_UINT_FIELD_MYSQL(sql_result, "LS_ID", ls_id, int64_t);
|
||||
share::ObLSID ob_ls_id(ls_id);
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_ERROR("extract ls_id field from sql_result failed", KR(ret), K_(snapshot_ts_ns), K(ob_ls_id));
|
||||
} else if (OB_UNLIKELY(! ob_ls_id.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("invalid ls_id", KR(ret), K_(snapshot_ts_ns), K(ob_ls_id));
|
||||
} else if (OB_FAIL(result.get_data().push_back(ob_ls_id))) {
|
||||
LOG_ERROR("push_back ls_id into query_result failed", KR(ret), K_(snapshot_ts_ns), K(ob_ls_id), K(result));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObLogLsGetter::ObLogLsGetter() :
|
||||
is_inited_(false),
|
||||
tenant_ls_ids_cache_()
|
||||
@ -31,7 +78,7 @@ ObLogLsGetter::~ObLogLsGetter()
|
||||
destroy();
|
||||
}
|
||||
|
||||
int ObLogLsGetter::init(const common::ObIArray<uint64_t> &tenant_ids)
|
||||
int ObLogLsGetter::init(const common::ObIArray<uint64_t> &tenant_ids, const int64_t start_tstamp_ns)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -47,8 +94,8 @@ int ObLogLsGetter::init(const common::ObIArray<uint64_t> &tenant_ids)
|
||||
if (OB_SYS_TENANT_ID == tenant_id
|
||||
|| is_meta_tenant(tenant_id)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(query_and_set_tenant_ls_info_(tenant_id))) {
|
||||
LOG_ERROR("query_and_set_tenant_ls_info_ failed", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(query_and_set_tenant_ls_info_(tenant_id, start_tstamp_ns))) {
|
||||
LOG_ERROR("query_and_set_tenant_ls_info_ failed", KR(ret), K(tenant_id), K(start_tstamp_ns));
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,6 +117,7 @@ void ObLogLsGetter::destroy()
|
||||
|
||||
int ObLogLsGetter::get_ls_ids(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t snapshot_ts,
|
||||
common::ObIArray<share::ObLSID> &ls_id_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -84,7 +132,7 @@ int ObLogLsGetter::get_ls_ids(
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
// query and set
|
||||
if (OB_FAIL(query_and_set_tenant_ls_info_(tenant_id))) {
|
||||
if (OB_FAIL(query_and_set_tenant_ls_info_(tenant_id, snapshot_ts))) {
|
||||
LOG_ERROR("query_and_set_tenant_ls_info_ failed", KR(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
@ -100,28 +148,18 @@ int ObLogLsGetter::get_ls_ids(
|
||||
}
|
||||
|
||||
int ObLogLsGetter::query_and_set_tenant_ls_info_(
|
||||
const uint64_t tenant_id)
|
||||
const uint64_t tenant_id,
|
||||
const int64_t snapshot_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLogSysTableHelper::TenantLSIDs tenant_ls_ids;
|
||||
LSIDArray ls_ids;
|
||||
|
||||
if (OB_FAIL(query_tenant_ls_info_(tenant_id, tenant_ls_ids))) {
|
||||
LOG_ERROR("query_tenant_ls_info_ failed", KR(ret), K(tenant_id));
|
||||
if (OB_FAIL(query_tenant_ls_info_(tenant_id, snapshot_ts, ls_ids))) {
|
||||
LOG_ERROR("query_tenant_ls_info failed", KR(ret), K(tenant_id), K(snapshot_ts));
|
||||
} else if (OB_FAIL(tenant_ls_ids_cache_.insert(tenant_id, ls_ids))) {
|
||||
LOG_ERROR("tenant_ls_ids_cache_ insert data failed", KR(ret), K(tenant_id), K(ls_ids));
|
||||
} else {
|
||||
ARRAY_FOREACH_N(tenant_ls_ids, ls_ids_idx, ls_ids_count) {
|
||||
if (OB_FAIL(ls_ids.push_back(tenant_ls_ids.at(ls_ids_idx)))) {
|
||||
LOG_ERROR("ls_ids push_back failed", KR(ret), K(ls_ids), K(tenant_ls_ids));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(tenant_ls_ids_cache_.insert(tenant_id, ls_ids))) {
|
||||
LOG_ERROR("tenant_ls_ids_cache_ insert failed", KR(ret), K(tenant_id), K(ls_ids));
|
||||
} else {
|
||||
LOG_INFO("tenant_ls_ids_cache_ insert succ", K(tenant_id), K(ls_ids));
|
||||
}
|
||||
} // OB_SUCC
|
||||
LOG_INFO("tenant_ls_ids_cache_ insert success", K(tenant_id), K(snapshot_ts), K(ls_ids));
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -129,33 +167,28 @@ int ObLogLsGetter::query_and_set_tenant_ls_info_(
|
||||
|
||||
int ObLogLsGetter::query_tenant_ls_info_(
|
||||
const uint64_t tenant_id,
|
||||
ObLogSysTableHelper::TenantLSIDs &tenant_ls_ids)
|
||||
const int64_t snapshot_ts,
|
||||
LSIDArray &ls_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
IObLogSysTableHelper *systable_helper = TCTX.systable_helper_;
|
||||
tenant_ls_ids.reset();
|
||||
bool done = false;
|
||||
const static int64_t QUERY_TIMEOUT = 10 * _MIN_;
|
||||
ls_array.reset();
|
||||
ObCDCQueryResult<LSIDArray> query_result(ls_array);
|
||||
TenantLSQueryer queryer(snapshot_ts, TCTX.tenant_sql_proxy_.get_ob_mysql_proxy());
|
||||
|
||||
if (OB_ISNULL(systable_helper)) {
|
||||
if (OB_UNLIKELY(! is_user_tenant(tenant_id))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("expected user_tenant for TenantLSGetter", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(queryer.query(tenant_id, query_result, QUERY_TIMEOUT))) {
|
||||
LOG_ERROR("query tenant ls_id failed", KR(ret), K(tenant_id), K(snapshot_ts), K(ls_array));
|
||||
} else if (OB_UNLIKELY(query_result.is_empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("systable_helper is NULL", KR(ret));
|
||||
} else {
|
||||
while (! done && OB_SUCCESS == ret) {
|
||||
if (OB_FAIL(systable_helper->query_tenant_ls_info(tenant_id, tenant_ls_ids))) {
|
||||
LOG_WARN("systable_helper query_tenant_ls_info fail", KR(ret), K(tenant_id), K(tenant_ls_ids));
|
||||
} else {
|
||||
done = true;
|
||||
}
|
||||
|
||||
if (OB_NEED_RETRY == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
ob_usleep(100L * 1000L);
|
||||
}
|
||||
}
|
||||
LOG_ERROR("tenant ls_id query result empty", KR(ret), K(tenant_id), K(snapshot_ts), K(query_result), K(ls_array));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
} // namespace libobcdc
|
||||
} // namespace oceanbase
|
||||
|
||||
@ -16,33 +16,54 @@
|
||||
#include "lib/hash/ob_linear_hash_map.h" // ObLinearHashMap
|
||||
#include "ob_log_systable_helper.h" // ObLogSysTableHelper
|
||||
#include "ob_log_tenant.h"
|
||||
#include "ob_cdc_tenant_query.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace libobcdc
|
||||
{
|
||||
|
||||
typedef ObArray<share::ObLSID> LSIDArray;
|
||||
|
||||
class TenantLSQueryer : public ObCDCTenantQuery<LSIDArray>
|
||||
{
|
||||
public:
|
||||
TenantLSQueryer(const int64_t snapshot_ts_ns, common::ObMySQLProxy &sql_proxy)
|
||||
: ObCDCTenantQuery(sql_proxy), snapshot_ts_ns_(snapshot_ts_ns) {}
|
||||
~TenantLSQueryer() { snapshot_ts_ns_ = OB_INVALID_TIMESTAMP; }
|
||||
private:
|
||||
int build_sql_statement_(const uint64_t tenant_id, ObSqlString &sql) override;
|
||||
int parse_row_(common::sqlclient::ObMySQLResult &sql_result, ObCDCQueryResult<LSIDArray> &result) override;
|
||||
private:
|
||||
static const char* QUERY_LS_INFO_SQL_FORMAT;
|
||||
private:
|
||||
int64_t snapshot_ts_ns_;
|
||||
};
|
||||
|
||||
class ObLogLsGetter
|
||||
{
|
||||
public:
|
||||
ObLogLsGetter();
|
||||
~ObLogLsGetter();
|
||||
int init(const common::ObIArray<uint64_t> &tenant_ids);
|
||||
int init(const common::ObIArray<uint64_t> &tenant_ids, const int64_t start_tstamp_ns);
|
||||
void destroy();
|
||||
|
||||
int get_ls_ids(
|
||||
const uint64_t tenant_id,
|
||||
const int64_t snapshot_ts,
|
||||
common::ObIArray<share::ObLSID> &ls_id_array);
|
||||
|
||||
private:
|
||||
int query_and_set_tenant_ls_info_(
|
||||
const uint64_t tenant_id);
|
||||
const uint64_t tenant_id,
|
||||
const int64_t snapshot_ts);
|
||||
|
||||
int query_tenant_ls_info_(
|
||||
const uint64_t tenant_id,
|
||||
ObLogSysTableHelper::TenantLSIDs &tenant_ls_ids);
|
||||
const int64_t snapshot_ts,
|
||||
LSIDArray &ls_array);
|
||||
|
||||
private:
|
||||
typedef ObArray<share::ObLSID> LSIDArray;
|
||||
typedef common::ObLinearHashMap<TenantID, LSIDArray> TenantLSIDsCache;
|
||||
|
||||
bool is_inited_;
|
||||
|
||||
@ -34,7 +34,7 @@ int ObLogLSOpProcessor::process_ls_op(
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("ls attr is invalid", KR(ret), K(ls_attr));
|
||||
} else if (OB_FAIL(TCTX.get_tenant_guard(tenant_id, guard))) {
|
||||
LOG_ERROR("get_tenant fail", KR(ret), K(tenant_id), K(guard));
|
||||
LOG_WARN("get_tenant fail", KR(ret), K(tenant_id), K(guard));
|
||||
} else if (OB_ISNULL(tenant = guard.get_tenant())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("get tenant fail, tenant is NULL", KR(ret), K(tenant_id));
|
||||
|
||||
@ -14,8 +14,8 @@
|
||||
#include "ob_log_meta_data_replayer.h"
|
||||
#include "ob_log_part_trans_task.h"
|
||||
|
||||
#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[LOG_META_DATA] [REPALYER] " fmt, ##args)
|
||||
#define STAT(level, fmt, args...) OBLOG_LOG(level, "[LOG_META_DATA] [REPALYER] " fmt, ##args)
|
||||
#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[LOG_META_DATA] [REPLAYER] " fmt, ##args)
|
||||
#define STAT(level, fmt, args...) OBLOG_LOG(level, "[LOG_META_DATA] [REPLAYER] " fmt, ##args)
|
||||
#define _ISTAT(fmt, args...) _STAT(INFO, fmt, ##args)
|
||||
#define ISTAT(fmt, args...) STAT(INFO, fmt, ##args)
|
||||
#define _DSTAT(fmt, args...) _STAT(DEBUG, fmt, ##args)
|
||||
@ -139,7 +139,7 @@ int ObLogMetaDataReplayer::replay(
|
||||
"TRANS_COUNT(TOTAL=%ld DDL_TRANS=%ld/%ld LS_OP=%ld)",
|
||||
tenant_id, start_timestamp_ns, NTS_TO_STR(start_timestamp_ns),
|
||||
replay_info_stat.total_part_trans_task_count_,
|
||||
replay_info_stat.ddl_part_trans_task_repalyed_count_,
|
||||
replay_info_stat.ddl_part_trans_task_replayed_count_,
|
||||
replay_info_stat.ddl_part_trans_task_toal_count_,
|
||||
replay_info_stat.ls_op_part_trans_task_count_);
|
||||
}
|
||||
@ -165,7 +165,7 @@ int ObLogMetaDataReplayer::handle_ddl_trans_(
|
||||
// Only DDL transactions less than or equal to the start timestamp need to be replayed
|
||||
if (trans_commit_version <= start_timestamp_ns) {
|
||||
DSTAT("handle DDL_TRANS to be replayed", K(part_trans_task));
|
||||
replay_info_stat.ddl_part_trans_task_repalyed_count_++;
|
||||
replay_info_stat.ddl_part_trans_task_replayed_count_++;
|
||||
|
||||
if (OB_FAIL(part_trans_task.parse_multi_data_source_data_for_ddl("ObLogMetaDataReplayer"))) {
|
||||
LOG_ERROR("parse_multi_data_source_data_for_ddl failed", KR(ret), K(part_trans_task));
|
||||
|
||||
@ -63,13 +63,13 @@ private:
|
||||
{
|
||||
total_part_trans_task_count_ = 0;
|
||||
ddl_part_trans_task_toal_count_ = 0;
|
||||
ddl_part_trans_task_repalyed_count_ = 0;
|
||||
ddl_part_trans_task_replayed_count_ = 0;
|
||||
ls_op_part_trans_task_count_ = 0;
|
||||
}
|
||||
|
||||
int64_t total_part_trans_task_count_;
|
||||
int64_t ddl_part_trans_task_toal_count_;
|
||||
int64_t ddl_part_trans_task_repalyed_count_;
|
||||
int64_t ddl_part_trans_task_replayed_count_;
|
||||
int64_t ls_op_part_trans_task_count_;
|
||||
};
|
||||
|
||||
|
||||
@ -13,8 +13,8 @@
|
||||
|
||||
#include "ob_log_schema_incremental_replay.h"
|
||||
|
||||
#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[LOG_META_DATA] [REPALYER] [SCHMEA] " fmt, ##args)
|
||||
#define STAT(level, fmt, args...) OBLOG_LOG(level, "[LOG_META_DATA] [REPALYER] [SCHMEA] " fmt, ##args)
|
||||
#define _STAT(level, fmt, args...) _OBLOG_LOG(level, "[LOG_META_DATA] [REPLAYER] [SCHMEA] " fmt, ##args)
|
||||
#define STAT(level, fmt, args...) OBLOG_LOG(level, "[LOG_META_DATA] [REPLAYER] [SCHMEA] " fmt, ##args)
|
||||
#define _ISTAT(fmt, args...) _STAT(INFO, fmt, ##args)
|
||||
#define ISTAT(fmt, args...) STAT(INFO, fmt, ##args)
|
||||
#define _DSTAT(fmt, args...) _STAT(DEBUG, fmt, ##args)
|
||||
|
||||
@ -20,7 +20,7 @@
|
||||
#include "common/ob_role.h" // LEADER
|
||||
|
||||
#include "share/inner_table/ob_inner_table_schema_constants.h" // OB_***_TNAME
|
||||
#include "share/schema/ob_schema_struct.h" // TenantStatus
|
||||
#include "share/schema/ob_schema_struct.h" // TenantStatus, ObTenantStatus
|
||||
#include "ob_log_instance.h" // TCTX
|
||||
#include "ob_log_config.h" // ObLogConfig, TCONF
|
||||
#include "ob_log_utils.h"
|
||||
|
||||
@ -432,8 +432,8 @@ int ObLogTenantMgr::start_tenant_service_(
|
||||
} else if (OB_SYS_TENANT_ID == tenant_id) {
|
||||
// sys tenant, do nothing
|
||||
} else if (! is_normal_new_created_tenant) {
|
||||
if (OB_FAIL(ls_getter_.get_ls_ids(tenant_id, ls_id_array))) {
|
||||
LOG_ERROR("ls_getter_ get_ls_ids failed", KR(ret), K(tenant_id), K(ls_id_array));
|
||||
if (OB_FAIL(ls_getter_.get_ls_ids(tenant_id, start_tstamp_ns, ls_id_array))) {
|
||||
LOG_ERROR("ls_getter_ get_ls_ids failed", KR(ret), K(tenant_id), K(ls_id_array), K(start_tstamp_ns));
|
||||
}
|
||||
}
|
||||
} else if (is_data_dict_refresh_mode(refresh_mode_)) {
|
||||
@ -1333,8 +1333,8 @@ int ObLogTenantMgr::get_tenant_ids_(
|
||||
// get available tenant id list
|
||||
else if (OB_FAIL(sys_schema_guard.get_available_tenant_ids(tenant_id_list, timeout))) {
|
||||
LOG_ERROR("get_available_tenant_ids fail", KR(ret), K(tenant_id_list), K(timeout));
|
||||
} else if (OB_FAIL(ls_getter_.init(tenant_id_list))) {
|
||||
LOG_ERROR("ObLogLsGetter init fail", KR(ret), K(tenant_id_list));
|
||||
} else if (OB_FAIL(ls_getter_.init(tenant_id_list, start_tstamp_ns))) {
|
||||
LOG_ERROR("ObLogLsGetter init fail", KR(ret), K(tenant_id_list), K(start_tstamp_ns));
|
||||
}
|
||||
} else if (is_data_dict_refresh_mode(refresh_mode_)) {
|
||||
IObLogSysTableHelper *systable_helper = TCTX.systable_helper_;
|
||||
|
||||
@ -100,7 +100,6 @@ public:
|
||||
|
||||
public:
|
||||
// Overloading thread handling functions
|
||||
// virtual int handle(void *data, const int64_t thread_index, volatile bool &stop_flag);
|
||||
virtual void handle(void *data, volatile bool &stop_flag) override;
|
||||
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user