fix restore check merge_scn and optimise refresh_scn cache impl

This commit is contained in:
obdev 2025-01-02 08:44:47 +00:00 committed by ob-robot
parent 49bb8095f6
commit 45cb0cd546
7 changed files with 191 additions and 178 deletions

View File

@ -27,7 +27,9 @@ using namespace common;
* ObMViewMaintenanceService
*/
ObMViewMaintenanceService::ObMViewMaintenanceService() : is_inited_(false) {}
ObMViewMaintenanceService::ObMViewMaintenanceService() : is_inited_(false),
mview_refresh_info_timestamp_(0)
{}
ObMViewMaintenanceService::~ObMViewMaintenanceService() {}
@ -75,7 +77,7 @@ int ObMViewMaintenanceService::init()
LOG_WARN("fail to init mview clean snapshot task", KR(ret));
} else if (OB_FAIL(mview_update_cache_task_.init())) {
LOG_WARN("fail to init mview update cache task", KR(ret));
} else if (OB_FAIL(mview_refresh_info_map_.create(bucket_num, attr))) {
} else if (OB_FAIL(mview_refresh_info_cache_.create(bucket_num, attr))) {
LOG_WARN("fail to create mview refresh info map", KR(ret));
} else {
is_inited_ = true;
@ -90,6 +92,8 @@ int ObMViewMaintenanceService::start()
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObMViewMaintenanceService not init", KR(ret), KP(this));
} else if (!is_meta_tenant(MTL_ID()) && OB_FAIL(mview_update_cache_task_.start())) { // run on every tenant server
LOG_WARN("fail to start mview update cache task", KR(ret));
} else {
// do nothing
}
@ -97,6 +101,12 @@ int ObMViewMaintenanceService::start()
}
void ObMViewMaintenanceService::stop()
{
sys_ls_task_stop_();
mview_update_cache_task_.stop();
}
void ObMViewMaintenanceService::sys_ls_task_stop_()
{
mlog_maintenance_task_.stop();
mview_maintenance_task_.stop();
@ -133,7 +143,7 @@ void ObMViewMaintenanceService::destroy()
collect_mv_merge_info_task_.destroy();
mview_clean_snapshot_task_.destroy();
mview_update_cache_task_.destroy();
mview_refresh_info_map_.destroy();
mview_refresh_info_cache_.destroy();
}
int ObMViewMaintenanceService::inner_switch_to_leader()
@ -161,8 +171,6 @@ int ObMViewMaintenanceService::inner_switch_to_leader()
LOG_WARN("collect mv merge info task start failed", KR(ret));
} else if (OB_FAIL(mview_clean_snapshot_task_.start())) {
LOG_WARN("fail to start mview clean snapshot task", KR(ret));
} else if (OB_FAIL(mview_update_cache_task_.start())) {
LOG_WARN("fail to start mview update cache task", KR(ret));
}
}
const int64_t cost_us = ObTimeUtility::current_time() - start_time_us;
@ -178,12 +186,7 @@ int ObMViewMaintenanceService::inner_switch_to_follower()
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObMViewMaintenanceService not init", KR(ret), KP(this));
} else {
// start update cache task in follower
if (OB_FAIL(mview_update_cache_task_.start())) {
LOG_WARN("fail to start mview update cache task", KR(ret));
}
stop();
} else if (FALSE_IT(sys_ls_task_stop_())) {
}
const int64_t cost_us = ObTimeUtility::current_time() - start_time_us;
FLOG_INFO("mview_maintenance: switch_to_follower", KR(ret), K(tenant_id), K(cost_us));
@ -225,74 +228,71 @@ int ObMViewMaintenanceService::resume_leader()
return ret;
}
int ObMViewMaintenanceService::extract_sql_result(sqlclient::ObMySQLResult *mysql_result,
ObIArray<uint64_t> &mview_ids,
ObIArray<uint64_t> &last_refresh_scns,
ObIArray<uint64_t> &mview_refresh_modes)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(mysql_result)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mysql result is null", K(ret), KP(mysql_result));
} else {
ObSEArray<uint64_t, 2> res_ids;
ObSEArray<uint64_t, 2> res_scns;
ObSEArray<uint64_t, 2> refresh_modes;
const int64_t col_idx0 = 0;
const int64_t col_idx1 = 1;
const int64_t col_idx2 = 2;
while (OB_SUCC(ret) && OB_SUCC(mysql_result->next())) {
uint64_t mview_id = OB_INVALID_ID;
uint64_t last_refresh_scn = OB_INVALID_SCN_VAL;
uint64_t refresh_mode = (uint64_t)ObMVRefreshMode::MAX;
if (OB_FAIL(mysql_result->get_uint(col_idx0, mview_id))
|| OB_FAIL(mysql_result->get_uint(col_idx1, last_refresh_scn))
|| OB_FAIL(mysql_result->get_uint(col_idx2, refresh_mode))) {
LOG_WARN("fail to get int/uint value", K(ret));
} else if (OB_FAIL(res_ids.push_back(mview_id))
|| OB_FAIL(res_scns.push_back(last_refresh_scn))
|| OB_FAIL(refresh_modes.push_back(refresh_mode))) {
LOG_WARN("fail to push back array", K(ret));
}
}
if (OB_LIKELY(OB_SUCCESS == ret || OB_ITER_END == ret)) {
if((OB_FAIL(mview_ids.assign(res_ids)) ||
OB_FAIL(last_refresh_scns.assign(res_scns)) ||
OB_FAIL(mview_refresh_modes.assign(refresh_modes)))) {
LOG_WARN("fail to assign array", K(ret));
}
}
}
return ret;
}
int ObMViewMaintenanceService::update_mview_refresh_info_cache(
const ObIArray<uint64_t> &mview_ids,
const ObIArray<uint64_t> &mview_refresh_scns,
const ObIArray<uint64_t> &mview_refresh_modes,
ObMviewRefreshInfoMap &mview_refresh_info_map) {
const ObIArray<uint64_t> &mview_refresh_modes) {
int ret = OB_SUCCESS;
int update_cache_cnt = 0;
const int invalid_refresh_scn = 0;
int64_t now_ts = ObTimeUtility::fast_current_time();
ARRAY_FOREACH_X(mview_ids, idx, cnt, OB_SUCC(ret)) {
RefreshInfo new_refresh_info;
if (mview_refresh_scns.at(idx) == invalid_refresh_scn) {
// skip update invalid scn in cache
} else if (mview_refresh_modes.at(idx) == (uint64_t)ObMVRefreshMode::MAJOR_COMPACTION) {
new_refresh_info.refresh_scn_ = mview_refresh_scns.at(idx);
new_refresh_info.refresh_ts_ = now_ts;
new_refresh_info.expired_ts_ = new_refresh_info.refresh_ts_ +
ObMViewMaintenanceService::CacheValidInterval;
if (OB_FAIL(mview_refresh_info_map.set_refactored(mview_ids.at(idx), new_refresh_info, 1/*overwrite*/))) {
LOG_WARN("fail to set refresh info", KR(ret), K(idx), K(mview_ids.at(idx)));
int64_t start_ts = ObTimeUtility::current_time();
hash::ObHashSet<uint64_t> update_set;
ObSEArray<uint64_t, 1> del_mview_id;
if (OB_FAIL(update_set.create(10))) {
LOG_WARN("init update set failed", KR(ret));
} else {
ARRAY_FOREACH_X(mview_ids, idx, cnt, OB_SUCC(ret)) {
if (mview_refresh_scns.at(idx) > 0 && mview_refresh_modes.at(idx) == (uint64_t)ObMVRefreshMode::MAJOR_COMPACTION) {
MViewRefreshInfo cache_info;
bool need_update = true;
if (OB_FAIL(update_set.set_refactored(mview_ids.at(idx)))) {
LOG_WARN("fail to set mview_id", KR(ret), K(idx), K(mview_ids.at(idx)));
} else if (OB_FAIL(mview_refresh_info_cache_.get_refactored(mview_ids.at(idx), cache_info))) {
if (OB_HASH_NOT_EXIST) {
ret = OB_SUCCESS;
}
} else if (mview_refresh_scns.at(idx) == cache_info.refresh_scn_) {
need_update = false;
}
if (OB_SUCC(ret) && need_update) {
MViewRefreshInfo new_refresh_info;
new_refresh_info.refresh_scn_ = mview_refresh_scns.at(idx);
if (OB_FAIL(mview_refresh_info_cache_.set_refactored(mview_ids.at(idx), new_refresh_info, 1/*overwrite*/))) {
LOG_WARN("fail to set refresh info", KR(ret), K(idx), K(mview_ids.at(idx)));
} else {
update_cache_cnt++;
}
}
}
update_cache_cnt += 1;
// for debug
LOG_INFO("update mview refresh info", K(ret), K(mview_refresh_scns.at(idx)),
K(update_cache_cnt));
}
}
// remove deleted mview_id in cache
if (OB_SUCC(ret) && mview_refresh_info_cache_.size() != update_set.size()) {
for (MViewRefreshInfoCache::iterator it = mview_refresh_info_cache_.begin();OB_SUCC(ret) && it != mview_refresh_info_cache_.end(); it++) {
if (OB_FAIL(update_set.exist_refactored(it->first))) {
if (OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
} else if (OB_HASH_NOT_EXIST) {
if (OB_FAIL(del_mview_id.push_back(it->first))) {
LOG_WARN("del_mview_id push failed", KR(ret), K(it->first));
}
} else {
LOG_WARN("check mview_id failed", KR(ret), K(it->first));
}
}
}
for (int64_t idx = 0; idx < del_mview_id.count() && OB_SUCC(ret); idx++) {
if (OB_FAIL(mview_refresh_info_cache_.erase_refactored(del_mview_id.at(idx)))) {
LOG_WARN("erash mview failed", KR(ret), K(del_mview_id.at(idx)));
}
}
}
// update timestamp
if (OB_SUCC(ret)) {
mview_refresh_info_timestamp_ = start_ts;
}
int64_t end_ts = ObTimeUtility::current_time();
LOG_INFO("update mview refresh info", K(ret), K(mview_ids), K(mview_refresh_scns), K(update_cache_cnt), K(del_mview_id),
"cost", end_ts - start_ts);
return ret;
}
@ -323,10 +323,10 @@ int ObMViewMaintenanceService::
tenant_id,
sql.ptr()))) {
LOG_WARN("fail to execute sql", K(ret), K(sql), K(tenant_id));
} else if (OB_FAIL(extract_sql_result(res.get_result(),
mview_ids,
last_refresh_scns,
mview_refresh_modes))) {
} else if (OB_FAIL(mview_update_cache_task_.extract_sql_result(res.get_result(),
mview_ids,
last_refresh_scns,
mview_refresh_modes))) {
LOG_WARN("failt to extract sql result", K(ret), K(sql), K(tenant_id));
}
}
@ -339,32 +339,39 @@ int ObMViewMaintenanceService::fetch_mv_refresh_scns(
const share::SCN &read_snapshot,
ObIArray<uint64_t> &mview_ids,
ObIArray<uint64_t> &mview_refresh_scns,
uint64_t &not_hit_count)
bool &hit_cache)
{
int ret = OB_SUCCESS;
hit_cache = false;
if (src_mview_ids.empty()) {
// do nothing
} else if (!read_snapshot.is_valid()) {
not_hit_count += 1;
} else if (ObTimeUtil::current_time() - mview_refresh_info_timestamp_ > CacheValidInterval) {
// cache expired
} else {
set_last_request_ts(ObTimeUtility::fast_current_time());
ARRAY_FOREACH_X(src_mview_ids, idx, cnt, OB_SUCC(ret) && 0 == not_hit_count) {
RefreshInfo refresh_info;
if (OB_FAIL(mview_refresh_info_map_.get_refactored(src_mview_ids.at(idx), refresh_info))) {
int64_t succ_cnt = 0;
ARRAY_FOREACH_X(src_mview_ids, idx, cnt, OB_SUCC(ret)) {
MViewRefreshInfo refresh_info;
if (OB_FAIL(mview_refresh_info_cache_.get_refactored(src_mview_ids.at(idx), refresh_info))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
not_hit_count += 1;
break;
}
LOG_WARN("fail to get refresh info", KR(ret), K(idx), K(src_mview_ids.at(idx)));
} else {
if (refresh_info.hit_cache(read_snapshot)) {
if (read_snapshot.get_val_for_tx() >= refresh_info.refresh_scn_) {
if (OB_FAIL(mview_refresh_scns.push_back(refresh_info.refresh_scn_))) {
LOG_WARN("fail to push back refresh scns", KR(ret), K(idx), K(src_mview_ids.at(idx)));
} else {
succ_cnt++;
}
} else {
not_hit_count += 1;
break;
}
}
if (OB_SUCC(ret) && src_mview_ids.count() == succ_cnt) {
hit_cache = true;
}
}
}
return ret;
@ -377,14 +384,13 @@ int ObMViewMaintenanceService::get_mview_refresh_info(const ObIArray<uint64_t> &
ObIArray<uint64_t> &mview_refresh_scns)
{
int ret = OB_SUCCESS;
uint64_t not_hit_count = 0;
bool hit_cache = false;
const uint64_t tenant_id = MTL_ID();
ObSEArray<uint64_t, 2> refresh_modes;
ObSEArray<uint64_t, 2> refresh_scns;
if (!is_inited_ || !mview_refresh_info_map_.created()) {
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("ObMViewMaintenanceService not init", KR(ret),
K(is_inited_), K(mview_refresh_info_map_.created()));
LOG_WARN("ObMViewMaintenanceService not init", KR(ret), K(is_inited_));
} else if (OB_ISNULL(sql_proxy)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(sql_proxy));
@ -394,11 +400,11 @@ int ObMViewMaintenanceService::get_mview_refresh_info(const ObIArray<uint64_t> &
} else if (src_mview_ids.empty()) {
// do nothing
} else if (OB_FAIL(fetch_mv_refresh_scns(src_mview_ids, read_snapshot,
mview_ids, refresh_scns, not_hit_count))){
mview_ids, refresh_scns, hit_cache))){
LOG_WARN("fail to fetch mv refresh scns", KR(ret), K(tenant_id), K(src_mview_ids));
}
if (OB_FAIL(ret)) {
} else if (not_hit_count == 0) {
} else if (hit_cache) {
if (OB_FAIL(mview_ids.assign(src_mview_ids)) ||
OB_FAIL(mview_refresh_scns.assign(refresh_scns))) {
LOG_WARN("fail to assign mview ids or mview refresh scns", K(ret));
@ -413,19 +419,13 @@ int ObMViewMaintenanceService::get_mview_refresh_info(const ObIArray<uint64_t> &
mview_refresh_scns,
refresh_modes))) {
LOG_WARN("fail to get mview last refresh info", K(ret), K(src_mview_ids), K(tenant_id));
} else if (OB_FAIL(ObMViewMaintenanceService::
update_mview_refresh_info_cache(mview_ids,
mview_refresh_scns,
refresh_modes,
mview_refresh_info_map_))) {
LOG_WARN("fail to update mview refresh info cache", K(ret), K(tenant_id));
}
}
// for debug
LOG_INFO("use mview refresh info cache",
K(src_mview_ids), K(mview_ids),
K(tenant_id), K(not_hit_count),
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
LOG_INFO("get_mview_refresh_info", K(ret), K(src_mview_ids), K(mview_ids),
K(tenant_id), K(hit_cache),
K(mview_refresh_scns), K(read_snapshot));
}
return ret;
}
} // namespace rootserver

View File

@ -28,29 +28,20 @@ namespace oceanbase
{
namespace rootserver
{
class ObMviewUpdateCacheTask;
struct RefreshInfo
{
uint64_t refresh_scn_;
int64_t refresh_ts_;
int64_t expired_ts_;
bool hit_cache(const share::SCN &read_snapshot) {
return expired_ts_ >= read_snapshot.convert_to_ts() &&
read_snapshot.get_val_for_tx() >= refresh_scn_;
}
RefreshInfo() : refresh_scn_(0), refresh_ts_(0), expired_ts_(0)
{};
};
typedef common::hash::
ObHashMap<uint64_t, RefreshInfo>
ObMviewRefreshInfoMap;
class ObMViewMaintenanceService : public logservice::ObIReplaySubHandler,
public logservice::ObICheckpointSubHandler,
public logservice::ObIRoleChangeSubHandler
{
public:
static const int64_t CacheValidInterval = 30 * 1000 * 1000; //30s
struct MViewRefreshInfo
{
uint64_t refresh_scn_;
MViewRefreshInfo() : refresh_scn_(0)
{};
};
typedef hash::ObHashMap<uint64_t, MViewRefreshInfo> MViewRefreshInfoCache;
public:
ObMViewMaintenanceService();
virtual ~ObMViewMaintenanceService();
@ -101,27 +92,21 @@ public:
ObIArray<uint64_t> &mview_ids,
ObIArray<uint64_t> &last_refresh_scns,
ObIArray<uint64_t> &mview_refresh_modes);
static int extract_sql_result(sqlclient::ObMySQLResult *mysql_result,
ObIArray<uint64_t> &mview_ids,
ObIArray<uint64_t> &last_refresh_scns,
ObIArray<uint64_t> &mview_refresh_modes);
static int update_mview_refresh_info_cache(const ObIArray<uint64_t> &mview_ids,
const ObIArray<uint64_t> &mview_refresh_scns,
const ObIArray<uint64_t> &mview_refresh_modes,
ObMviewRefreshInfoMap &mview_refresh_info_map);
int update_mview_refresh_info_cache(const ObIArray<uint64_t> &mview_ids,
const ObIArray<uint64_t> &mview_refresh_scns,
const ObIArray<uint64_t> &mview_refresh_modes);
int fetch_mv_refresh_scns(const ObIArray<uint64_t> &src_mview_ids,
const share::SCN &read_snapshot,
ObIArray<uint64_t> &mview_ids,
ObIArray<uint64_t> &mview_refresh_scns,
uint64_t &not_hit_count);
int64_t get_last_request_ts() const { return ATOMIC_LOAD(&last_request_ts_); }
void set_last_request_ts(const int64_t last_request_ts) { ATOMIC_STORE(&last_request_ts_, last_request_ts); }
ObMviewRefreshInfoMap &get_mview_refresh_info_map() { return mview_refresh_info_map_; }
bool &hit_cache);
private:
int inner_switch_to_leader();
int inner_switch_to_follower();
void sys_ls_task_stop_();
private:
bool is_inited_;
ObMLogMaintenanceTask mlog_maintenance_task_;
ObMViewMaintenanceTask mview_maintenance_task_;
ObMViewRefreshStatsMaintenanceTask mvref_stats_maintenance_task_;
@ -131,9 +116,8 @@ private:
ObCollectMvMergeInfoTask collect_mv_merge_info_task_;
ObMViewCleanSnapshotTask mview_clean_snapshot_task_;
ObMviewUpdateCacheTask mview_update_cache_task_;
ObMviewRefreshInfoMap mview_refresh_info_map_;
int64_t last_request_ts_;
bool is_inited_;
MViewRefreshInfoCache mview_refresh_info_cache_;
int64_t mview_refresh_info_timestamp_;
};
} // namespace rootserver

View File

@ -23,17 +23,20 @@
#include "share/backup/ob_backup_data_table_operator.h"
#include "storage/mview/ob_mview_refresh_stats_purge.h"
#include "observer/ob_inner_sql_connection.h"
#include "share/ob_all_server_tracer.h"
namespace oceanbase {
namespace rootserver {
#define QUERY_MAJOR_MV_MERGE_SCN_SQL "select mview_id,t2.data_table_id,last_refresh_scn,t3.tablet_id, \
t4.svr_ip,t4.svr_port,t4.ls_id,t5.learner_list, t4.end_log_scn from %s t1 \
t4.svr_ip,t4.svr_port,t4.ls_id,t4.end_log_scn, \
locate(concat(t4.svr_ip,\":\", t4.svr_port), t5.paxos_member_list) > 0 is_member, \
locate(concat(t4.svr_ip,\":\", t4.svr_port), t5.learner_list) > 0 is_leaner from %s t1 \
left join %s t2 on t1.mview_id = t2.table_id \
left join %s t3 on t2.data_table_id = t3.table_id \
left join %s t4 on t3.tablet_id = t4.tablet_id and t4.table_type = 10 \
left join %s t5 on t4.svr_ip = t5.svr_ip and t4.svr_port = t5.svr_port and t4.ls_id = t5.ls_id \
where t1.refresh_mode = %ld and t1.last_refresh_scn > 0 order by 1,2,3,4,5,6,7,9"
where t1.refresh_mode = %ld and t1.last_refresh_scn > 0 order by 1,2,3,4,5,6,7,8"
ObMViewPushRefreshScnTask::ObMViewPushRefreshScnTask()
@ -113,7 +116,7 @@ void ObMViewPushRefreshScnTask::runTimerTask()
} else if (OB_FAIL(need_schedule_major_refresh_mv_task(tenant_id_, need_schedule))) {
LOG_WARN("fail to check need schedule major refresh mv task", KR(ret), K(tenant_id_));
} else if (!need_schedule) {
} else if (FALSE_IT(void(check_major_mv_refresh_scn_safety(tenant_id_)))) {
} else if (REACH_TIME_INTERVAL(300 * 1000 * 1000) && FALSE_IT(void(check_major_mv_refresh_scn_safety(tenant_id_)))) {
} else if (OB_UNLIKELY(OB_ISNULL(sql_proxy))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql proxy is null", KR(ret));
@ -192,16 +195,19 @@ int ObMViewPushRefreshScnTask::check_major_mv_refresh_scn_safety(const uint64_t
}
}
}
bool alive = true;
// ignore ret
SVR_TRACER.check_server_alive(merge_info.svr_addr_, alive);
if (find_dest_merge_scn) {
} else if (merge_info.has_learner_) {
LOG_WARN("major_mv_safety>>>>", K(merge_info));
} else if (!merge_info.is_member_ || merge_info.is_learner_) {
LOG_WARN("major_mv_safety>>>>", K(merge_info), K(alive));
} else {
LOG_ERROR("major_mv_safety>>>>", K(merge_info));
LOG_ERROR("major_mv_safety>>>>", K(merge_info), K(alive));
is_safety = false;
}
}
}
LOG_INFO("major_mv_safety>>>>", K(is_safety));
LOG_INFO("major_mv_safety<<<<<<<<<<<", K(is_safety));
}
return ret;
}
@ -275,27 +281,23 @@ int ObMViewPushRefreshScnTask::get_major_mv_merge_info_(const uint64_t tenant_id
char svr_ip[OB_IP_STR_BUFF] = "";
int64_t svr_port = 0;
int64_t tmp_real_str_len = 0;
ObString learner_list;
EXTRACT_INT_FIELD_MYSQL(*result, "mview_id", merge_info.mview_id_, int64_t);
EXTRACT_INT_FIELD_MYSQL(*result, "data_table_id", merge_info.data_table_id_, int64_t);
EXTRACT_UINT_FIELD_MYSQL(*result, "last_refresh_scn", merge_info.last_refresh_scn_, uint64_t);
EXTRACT_INT_FIELD_MYSQL(*result, "tablet_id", merge_info.tablet_id_, int64_t);
EXTRACT_STRBUF_FIELD_MYSQL(*result, "svr_ip", svr_ip, OB_IP_STR_BUFF, tmp_real_str_len);
EXTRACT_INT_FIELD_MYSQL(*result, "svr_port", svr_port, int64_t);
(void)merge_info.svr_addr_.set_ip_addr(svr_ip, static_cast<int32_t>(svr_port));
EXTRACT_INT_FIELD_MYSQL(*result, "ls_id", merge_info.ls_id_, int64_t);
EXTRACT_UINT_FIELD_MYSQL(*result, "end_log_scn", merge_info.end_log_scn_, uint64_t);
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*result, "learner_list", learner_list);
if (learner_list.length() > 0) {
merge_info.has_learner_ = true;
} else {
merge_info.has_learner_ = false;
}
EXTRACT_INT_FIELD_MYSQL(*result, "is_member", merge_info.is_member_, int64_t);
EXTRACT_INT_FIELD_MYSQL(*result, "is_leaner", merge_info.is_learner_, int64_t);
if (OB_FAIL(ret)) {
LOG_WARN("fail to extract field from result", KR(ret));
} else if (OB_FAIL(merge_info_array.push_back(merge_info))) {
LOG_WARN("fail to push merge_info to array", KR(ret));
} else {
(void)merge_info.svr_addr_.set_ip_addr(svr_ip, static_cast<int32_t>(svr_port));
if (OB_FAIL(merge_info_array.push_back(merge_info))) {
LOG_WARN("fail to push merge_info to array", KR(ret));
}
}
if (OB_FAIL(ret)) {
is_result_next_err = false;

View File

@ -38,12 +38,13 @@ public:
tablet_id_(0),
svr_addr_(),
ls_id_(),
has_learner_(false),
end_log_scn_(0)
end_log_scn_(0),
is_member_(false),
is_learner_(false)
{}
~ObMajorMVMergeInfo() {}
TO_STRING_KV(K_(mview_id), K_(data_table_id), K_(last_refresh_scn), K_(tablet_id),
K_(svr_addr), K_(ls_id), K_(has_learner), K_(end_log_scn));
K_(svr_addr), K_(ls_id), K_(end_log_scn), K_(is_member), K_(is_learner));
bool is_valid() {
return mview_id_ > 0 && data_table_id_ > 0 && last_refresh_scn_ > 0
&& svr_addr_.is_valid() && ls_id_.is_valid() && tablet_id_ > 0;
@ -60,8 +61,9 @@ public:
int64_t tablet_id_;
ObAddr svr_addr_;
share::ObLSID ls_id_;
bool has_learner_;
uint64_t end_log_scn_;
bool is_member_;
bool is_learner_;
};
class ObMViewPushRefreshScnTask : public ObMViewTimerTask

View File

@ -110,10 +110,6 @@ void ObMviewUpdateCacheTask::runTimerTask()
ObMySQLProxy *sql_proxy = GCTX.sql_proxy_;
rootserver::ObMViewMaintenanceService *mview_maintenance_service =
MTL(rootserver::ObMViewMaintenanceService*);
int64_t current_ts = ObTimeUtility::fast_current_time();
int64_t last_request_ts;
const int64_t NeedUpdateCacheInterval = 10 * 60 * 1000 * 1000; // 10min
// check request time;
if (OB_ISNULL(sql_proxy) || OB_ISNULL(mview_maintenance_service)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql proxy is null or ObMViewMaintenanceService is null",
@ -121,16 +117,8 @@ void ObMviewUpdateCacheTask::runTimerTask()
} else if (!is_valid_tenant_id(tenant_id)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant id is invalid", KR(ret), K(tenant_id));
} else if (OB_FALSE_IT(last_request_ts = mview_maintenance_service->get_last_request_ts())) {
} else if (last_request_ts < current_ts &&
current_ts - last_request_ts > NeedUpdateCacheInterval) {
// clear cache
if (!mview_maintenance_service->get_mview_refresh_info_map().empty()) {
mview_maintenance_service->get_mview_refresh_info_map().clear();
}
} else {
const int64_t refresh_mode = (int64_t)ObMVRefreshMode::MAJOR_COMPACTION;
ObMviewRefreshInfoMap &mview_refresh_info_map = mview_maintenance_service->get_mview_refresh_info_map();
ObSEArray<uint64_t, 2> mview_ids;
ObSEArray<uint64_t, 2> mview_refresh_scns;
ObSEArray<uint64_t, 2> mview_refresh_modes;
@ -143,8 +131,7 @@ void ObMviewUpdateCacheTask::runTimerTask()
tenant_id,
sql.ptr()))) {
LOG_WARN("fail to execute sql", K(ret), K(sql), K(tenant_id));
} else if (OB_FAIL(ObMViewMaintenanceService::
extract_sql_result(res.get_result(),
} else if (OB_FAIL(extract_sql_result(res.get_result(),
mview_ids,
mview_refresh_scns,
mview_refresh_modes))) {
@ -152,19 +139,56 @@ void ObMviewUpdateCacheTask::runTimerTask()
}
}
if (OB_FAIL(ret)) {
//do nothing
} else if (mview_ids.empty()) {
// do nothing
} else if (OB_FAIL(ObMViewMaintenanceService::
update_mview_refresh_info_cache(mview_ids,
mview_refresh_scns,
mview_refresh_modes,
mview_refresh_info_map))){
} else if (OB_FAIL(mview_maintenance_service->update_mview_refresh_info_cache(mview_ids,
mview_refresh_scns,
mview_refresh_modes))){
LOG_WARN("fail to update mview refresh info cache", K(ret),
K(mview_ids), K(mview_refresh_scns), K(mview_refresh_modes), K(tenant_id));
}
}
}
int ObMviewUpdateCacheTask::extract_sql_result(sqlclient::ObMySQLResult *mysql_result,
ObIArray<uint64_t> &mview_ids,
ObIArray<uint64_t> &last_refresh_scns,
ObIArray<uint64_t> &mview_refresh_modes)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(mysql_result)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mysql result is null", K(ret), KP(mysql_result));
} else {
ObSEArray<uint64_t, 2> res_ids;
ObSEArray<uint64_t, 2> res_scns;
ObSEArray<uint64_t, 2> refresh_modes;
const int64_t col_idx0 = 0;
const int64_t col_idx1 = 1;
const int64_t col_idx2 = 2;
while (OB_SUCC(ret) && OB_SUCC(mysql_result->next())) {
uint64_t mview_id = OB_INVALID_ID;
uint64_t last_refresh_scn = OB_INVALID_SCN_VAL;
uint64_t refresh_mode = (uint64_t)ObMVRefreshMode::MAX;
if (OB_FAIL(mysql_result->get_uint(col_idx0, mview_id))
|| OB_FAIL(mysql_result->get_uint(col_idx1, last_refresh_scn))
|| OB_FAIL(mysql_result->get_uint(col_idx2, refresh_mode))) {
LOG_WARN("fail to get int/uint value", K(ret));
} else if (OB_FAIL(res_ids.push_back(mview_id))
|| OB_FAIL(res_scns.push_back(last_refresh_scn))
|| OB_FAIL(refresh_modes.push_back(refresh_mode))) {
LOG_WARN("fail to push back array", K(ret));
}
}
if (OB_LIKELY(OB_SUCCESS == ret || OB_ITER_END == ret)) {
if((OB_FAIL(mview_ids.assign(res_ids)) ||
OB_FAIL(last_refresh_scns.assign(res_scns)) ||
OB_FAIL(mview_refresh_modes.assign(refresh_modes)))) {
LOG_WARN("fail to assign array", K(ret));
}
}
}
return ret;
}
}
}
}

View File

@ -43,6 +43,10 @@ public:
void clean_up();
void runTimerTask() override;
DISABLE_COPY_ASSIGN(ObMviewUpdateCacheTask);
int extract_sql_result(sqlclient::ObMySQLResult *mysql_result,
ObIArray<uint64_t> &mview_ids,
ObIArray<uint64_t> &last_refresh_scns,
ObIArray<uint64_t> &mview_refresh_modes);
private:
bool is_inited_;
bool is_stop_;

View File

@ -1928,9 +1928,6 @@ int ObRestoreScheduler::wait_restore_safe_mview_merge_info_()
} else if (OB_FAIL(global_proxy.get_major_refresh_mv_merge_scn(false, /*for update*/
major_mv_merge_scn))) {
LOG_WARN("fail to get major_refresh_mv_merge_scn", K(ret), K(tenant_id_));
} else if (OB_FAIL(ObMViewTimerTask::check_mview_last_refresh_scn(tenant_id_,
major_mv_merge_scn))) {
LOG_WARN("fail to check mview last refesh scn", K(ret), K(tenant_id_), K(major_mv_merge_scn));
}
if (OB_SUCC(ret)) {
if (mv_lastest_merge_scn < major_mv_merge_scn) {