diff --git a/src/rootserver/mview/ob_mview_maintenance_service.cpp b/src/rootserver/mview/ob_mview_maintenance_service.cpp index 636c33711..cee1bd0ee 100644 --- a/src/rootserver/mview/ob_mview_maintenance_service.cpp +++ b/src/rootserver/mview/ob_mview_maintenance_service.cpp @@ -27,7 +27,9 @@ using namespace common; * ObMViewMaintenanceService */ -ObMViewMaintenanceService::ObMViewMaintenanceService() : is_inited_(false) {} +ObMViewMaintenanceService::ObMViewMaintenanceService() : is_inited_(false), + mview_refresh_info_timestamp_(0) + {} ObMViewMaintenanceService::~ObMViewMaintenanceService() {} @@ -75,7 +77,7 @@ int ObMViewMaintenanceService::init() LOG_WARN("fail to init mview clean snapshot task", KR(ret)); } else if (OB_FAIL(mview_update_cache_task_.init())) { LOG_WARN("fail to init mview update cache task", KR(ret)); - } else if (OB_FAIL(mview_refresh_info_map_.create(bucket_num, attr))) { + } else if (OB_FAIL(mview_refresh_info_cache_.create(bucket_num, attr))) { LOG_WARN("fail to create mview refresh info map", KR(ret)); } else { is_inited_ = true; @@ -90,6 +92,8 @@ int ObMViewMaintenanceService::start() if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObMViewMaintenanceService not init", KR(ret), KP(this)); + } else if (!is_meta_tenant(MTL_ID()) && OB_FAIL(mview_update_cache_task_.start())) { // run on every tenant server + LOG_WARN("fail to start mview update cache task", KR(ret)); } else { // do nothing } @@ -97,6 +101,12 @@ int ObMViewMaintenanceService::start() } void ObMViewMaintenanceService::stop() +{ + sys_ls_task_stop_(); + mview_update_cache_task_.stop(); +} + +void ObMViewMaintenanceService::sys_ls_task_stop_() { mlog_maintenance_task_.stop(); mview_maintenance_task_.stop(); @@ -133,7 +143,7 @@ void ObMViewMaintenanceService::destroy() collect_mv_merge_info_task_.destroy(); mview_clean_snapshot_task_.destroy(); mview_update_cache_task_.destroy(); - mview_refresh_info_map_.destroy(); + mview_refresh_info_cache_.destroy(); } int ObMViewMaintenanceService::inner_switch_to_leader() @@ -161,8 +171,6 @@ int ObMViewMaintenanceService::inner_switch_to_leader() LOG_WARN("collect mv merge info task start failed", KR(ret)); } else if (OB_FAIL(mview_clean_snapshot_task_.start())) { LOG_WARN("fail to start mview clean snapshot task", KR(ret)); - } else if (OB_FAIL(mview_update_cache_task_.start())) { - LOG_WARN("fail to start mview update cache task", KR(ret)); } } const int64_t cost_us = ObTimeUtility::current_time() - start_time_us; @@ -178,12 +186,7 @@ int ObMViewMaintenanceService::inner_switch_to_follower() if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObMViewMaintenanceService not init", KR(ret), KP(this)); - } else { - // start update cache task in follower - if (OB_FAIL(mview_update_cache_task_.start())) { - LOG_WARN("fail to start mview update cache task", KR(ret)); - } - stop(); + } else if (FALSE_IT(sys_ls_task_stop_())) { } const int64_t cost_us = ObTimeUtility::current_time() - start_time_us; FLOG_INFO("mview_maintenance: switch_to_follower", KR(ret), K(tenant_id), K(cost_us)); @@ -225,74 +228,71 @@ int ObMViewMaintenanceService::resume_leader() return ret; } -int ObMViewMaintenanceService::extract_sql_result(sqlclient::ObMySQLResult *mysql_result, - ObIArray &mview_ids, - ObIArray &last_refresh_scns, - ObIArray &mview_refresh_modes) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(mysql_result)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("mysql result is null", K(ret), KP(mysql_result)); - } else { - ObSEArray res_ids; - ObSEArray res_scns; - ObSEArray refresh_modes; - const int64_t col_idx0 = 0; - const int64_t col_idx1 = 1; - const int64_t col_idx2 = 2; - while (OB_SUCC(ret) && OB_SUCC(mysql_result->next())) { - uint64_t mview_id = OB_INVALID_ID; - uint64_t last_refresh_scn = OB_INVALID_SCN_VAL; - uint64_t refresh_mode = (uint64_t)ObMVRefreshMode::MAX; - if (OB_FAIL(mysql_result->get_uint(col_idx0, mview_id)) - || OB_FAIL(mysql_result->get_uint(col_idx1, last_refresh_scn)) - || OB_FAIL(mysql_result->get_uint(col_idx2, refresh_mode))) { - LOG_WARN("fail to get int/uint value", K(ret)); - } else if (OB_FAIL(res_ids.push_back(mview_id)) - || OB_FAIL(res_scns.push_back(last_refresh_scn)) - || OB_FAIL(refresh_modes.push_back(refresh_mode))) { - LOG_WARN("fail to push back array", K(ret)); - } - } - if (OB_LIKELY(OB_SUCCESS == ret || OB_ITER_END == ret)) { - if((OB_FAIL(mview_ids.assign(res_ids)) || - OB_FAIL(last_refresh_scns.assign(res_scns)) || - OB_FAIL(mview_refresh_modes.assign(refresh_modes)))) { - LOG_WARN("fail to assign array", K(ret)); - } - } - } - return ret; -} - int ObMViewMaintenanceService::update_mview_refresh_info_cache( const ObIArray &mview_ids, const ObIArray &mview_refresh_scns, - const ObIArray &mview_refresh_modes, - ObMviewRefreshInfoMap &mview_refresh_info_map) { + const ObIArray &mview_refresh_modes) { int ret = OB_SUCCESS; int update_cache_cnt = 0; - const int invalid_refresh_scn = 0; - int64_t now_ts = ObTimeUtility::fast_current_time(); - ARRAY_FOREACH_X(mview_ids, idx, cnt, OB_SUCC(ret)) { - RefreshInfo new_refresh_info; - if (mview_refresh_scns.at(idx) == invalid_refresh_scn) { - // skip update invalid scn in cache - } else if (mview_refresh_modes.at(idx) == (uint64_t)ObMVRefreshMode::MAJOR_COMPACTION) { - new_refresh_info.refresh_scn_ = mview_refresh_scns.at(idx); - new_refresh_info.refresh_ts_ = now_ts; - new_refresh_info.expired_ts_ = new_refresh_info.refresh_ts_ + - ObMViewMaintenanceService::CacheValidInterval; - if (OB_FAIL(mview_refresh_info_map.set_refactored(mview_ids.at(idx), new_refresh_info, 1/*overwrite*/))) { - LOG_WARN("fail to set refresh info", KR(ret), K(idx), K(mview_ids.at(idx))); + int64_t start_ts = ObTimeUtility::current_time(); + hash::ObHashSet update_set; + ObSEArray del_mview_id; + if (OB_FAIL(update_set.create(10))) { + LOG_WARN("init update set failed", KR(ret)); + } else { + ARRAY_FOREACH_X(mview_ids, idx, cnt, OB_SUCC(ret)) { + if (mview_refresh_scns.at(idx) > 0 && mview_refresh_modes.at(idx) == (uint64_t)ObMVRefreshMode::MAJOR_COMPACTION) { + MViewRefreshInfo cache_info; + bool need_update = true; + if (OB_FAIL(update_set.set_refactored(mview_ids.at(idx)))) { + LOG_WARN("fail to set mview_id", KR(ret), K(idx), K(mview_ids.at(idx))); + } else if (OB_FAIL(mview_refresh_info_cache_.get_refactored(mview_ids.at(idx), cache_info))) { + if (OB_HASH_NOT_EXIST) { + ret = OB_SUCCESS; + } + } else if (mview_refresh_scns.at(idx) == cache_info.refresh_scn_) { + need_update = false; + } + if (OB_SUCC(ret) && need_update) { + MViewRefreshInfo new_refresh_info; + new_refresh_info.refresh_scn_ = mview_refresh_scns.at(idx); + if (OB_FAIL(mview_refresh_info_cache_.set_refactored(mview_ids.at(idx), new_refresh_info, 1/*overwrite*/))) { + LOG_WARN("fail to set refresh info", KR(ret), K(idx), K(mview_ids.at(idx))); + } else { + update_cache_cnt++; + } + } } - update_cache_cnt += 1; - // for debug - LOG_INFO("update mview refresh info", K(ret), K(mview_refresh_scns.at(idx)), - K(update_cache_cnt)); } } + // remove deleted mview_id in cache + if (OB_SUCC(ret) && mview_refresh_info_cache_.size() != update_set.size()) { + for (MViewRefreshInfoCache::iterator it = mview_refresh_info_cache_.begin();OB_SUCC(ret) && it != mview_refresh_info_cache_.end(); it++) { + if (OB_FAIL(update_set.exist_refactored(it->first))) { + if (OB_HASH_EXIST == ret) { + ret = OB_SUCCESS; + } else if (OB_HASH_NOT_EXIST) { + if (OB_FAIL(del_mview_id.push_back(it->first))) { + LOG_WARN("del_mview_id push failed", KR(ret), K(it->first)); + } + } else { + LOG_WARN("check mview_id failed", KR(ret), K(it->first)); + } + } + } + for (int64_t idx = 0; idx < del_mview_id.count() && OB_SUCC(ret); idx++) { + if (OB_FAIL(mview_refresh_info_cache_.erase_refactored(del_mview_id.at(idx)))) { + LOG_WARN("erash mview failed", KR(ret), K(del_mview_id.at(idx))); + } + } + } + // update timestamp + if (OB_SUCC(ret)) { + mview_refresh_info_timestamp_ = start_ts; + } + int64_t end_ts = ObTimeUtility::current_time(); + LOG_INFO("update mview refresh info", K(ret), K(mview_ids), K(mview_refresh_scns), K(update_cache_cnt), K(del_mview_id), + "cost", end_ts - start_ts); return ret; } @@ -323,10 +323,10 @@ int ObMViewMaintenanceService:: tenant_id, sql.ptr()))) { LOG_WARN("fail to execute sql", K(ret), K(sql), K(tenant_id)); - } else if (OB_FAIL(extract_sql_result(res.get_result(), - mview_ids, - last_refresh_scns, - mview_refresh_modes))) { + } else if (OB_FAIL(mview_update_cache_task_.extract_sql_result(res.get_result(), + mview_ids, + last_refresh_scns, + mview_refresh_modes))) { LOG_WARN("failt to extract sql result", K(ret), K(sql), K(tenant_id)); } } @@ -339,32 +339,39 @@ int ObMViewMaintenanceService::fetch_mv_refresh_scns( const share::SCN &read_snapshot, ObIArray &mview_ids, ObIArray &mview_refresh_scns, - uint64_t ¬_hit_count) + bool &hit_cache) { int ret = OB_SUCCESS; + hit_cache = false; if (src_mview_ids.empty()) { // do nothing } else if (!read_snapshot.is_valid()) { - not_hit_count += 1; + } else if (ObTimeUtil::current_time() - mview_refresh_info_timestamp_ > CacheValidInterval) { + // cache expired } else { - set_last_request_ts(ObTimeUtility::fast_current_time()); - ARRAY_FOREACH_X(src_mview_ids, idx, cnt, OB_SUCC(ret) && 0 == not_hit_count) { - RefreshInfo refresh_info; - if (OB_FAIL(mview_refresh_info_map_.get_refactored(src_mview_ids.at(idx), refresh_info))) { + int64_t succ_cnt = 0; + ARRAY_FOREACH_X(src_mview_ids, idx, cnt, OB_SUCC(ret)) { + MViewRefreshInfo refresh_info; + if (OB_FAIL(mview_refresh_info_cache_.get_refactored(src_mview_ids.at(idx), refresh_info))) { if (OB_HASH_NOT_EXIST == ret) { ret = OB_SUCCESS; - not_hit_count += 1; + break; } LOG_WARN("fail to get refresh info", KR(ret), K(idx), K(src_mview_ids.at(idx))); } else { - if (refresh_info.hit_cache(read_snapshot)) { + if (read_snapshot.get_val_for_tx() >= refresh_info.refresh_scn_) { if (OB_FAIL(mview_refresh_scns.push_back(refresh_info.refresh_scn_))) { LOG_WARN("fail to push back refresh scns", KR(ret), K(idx), K(src_mview_ids.at(idx))); + } else { + succ_cnt++; } } else { - not_hit_count += 1; + break; } } + if (OB_SUCC(ret) && src_mview_ids.count() == succ_cnt) { + hit_cache = true; + } } } return ret; @@ -377,14 +384,13 @@ int ObMViewMaintenanceService::get_mview_refresh_info(const ObIArray & ObIArray &mview_refresh_scns) { int ret = OB_SUCCESS; - uint64_t not_hit_count = 0; + bool hit_cache = false; const uint64_t tenant_id = MTL_ID(); ObSEArray refresh_modes; ObSEArray refresh_scns; - if (!is_inited_ || !mview_refresh_info_map_.created()) { + if (!is_inited_) { ret = OB_NOT_INIT; - LOG_WARN("ObMViewMaintenanceService not init", KR(ret), - K(is_inited_), K(mview_refresh_info_map_.created())); + LOG_WARN("ObMViewMaintenanceService not init", KR(ret), K(is_inited_)); } else if (OB_ISNULL(sql_proxy)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(sql_proxy)); @@ -394,11 +400,11 @@ int ObMViewMaintenanceService::get_mview_refresh_info(const ObIArray & } else if (src_mview_ids.empty()) { // do nothing } else if (OB_FAIL(fetch_mv_refresh_scns(src_mview_ids, read_snapshot, - mview_ids, refresh_scns, not_hit_count))){ + mview_ids, refresh_scns, hit_cache))){ LOG_WARN("fail to fetch mv refresh scns", KR(ret), K(tenant_id), K(src_mview_ids)); } if (OB_FAIL(ret)) { - } else if (not_hit_count == 0) { + } else if (hit_cache) { if (OB_FAIL(mview_ids.assign(src_mview_ids)) || OB_FAIL(mview_refresh_scns.assign(refresh_scns))) { LOG_WARN("fail to assign mview ids or mview refresh scns", K(ret)); @@ -413,19 +419,13 @@ int ObMViewMaintenanceService::get_mview_refresh_info(const ObIArray & mview_refresh_scns, refresh_modes))) { LOG_WARN("fail to get mview last refresh info", K(ret), K(src_mview_ids), K(tenant_id)); - } else if (OB_FAIL(ObMViewMaintenanceService:: - update_mview_refresh_info_cache(mview_ids, - mview_refresh_scns, - refresh_modes, - mview_refresh_info_map_))) { - LOG_WARN("fail to update mview refresh info cache", K(ret), K(tenant_id)); } } - // for debug - LOG_INFO("use mview refresh info cache", - K(src_mview_ids), K(mview_ids), - K(tenant_id), K(not_hit_count), + if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) { + LOG_INFO("get_mview_refresh_info", K(ret), K(src_mview_ids), K(mview_ids), + K(tenant_id), K(hit_cache), K(mview_refresh_scns), K(read_snapshot)); + } return ret; } } // namespace rootserver diff --git a/src/rootserver/mview/ob_mview_maintenance_service.h b/src/rootserver/mview/ob_mview_maintenance_service.h index da25ca007..1a057ec14 100644 --- a/src/rootserver/mview/ob_mview_maintenance_service.h +++ b/src/rootserver/mview/ob_mview_maintenance_service.h @@ -28,29 +28,20 @@ namespace oceanbase { namespace rootserver { -class ObMviewUpdateCacheTask; -struct RefreshInfo -{ - uint64_t refresh_scn_; - int64_t refresh_ts_; - int64_t expired_ts_; - bool hit_cache(const share::SCN &read_snapshot) { - return expired_ts_ >= read_snapshot.convert_to_ts() && - read_snapshot.get_val_for_tx() >= refresh_scn_; - } - RefreshInfo() : refresh_scn_(0), refresh_ts_(0), expired_ts_(0) - {}; -}; -typedef common::hash:: - ObHashMap - ObMviewRefreshInfoMap; class ObMViewMaintenanceService : public logservice::ObIReplaySubHandler, public logservice::ObICheckpointSubHandler, public logservice::ObIRoleChangeSubHandler { public: static const int64_t CacheValidInterval = 30 * 1000 * 1000; //30s + struct MViewRefreshInfo + { + uint64_t refresh_scn_; + MViewRefreshInfo() : refresh_scn_(0) + {}; + }; + typedef hash::ObHashMap MViewRefreshInfoCache; public: ObMViewMaintenanceService(); virtual ~ObMViewMaintenanceService(); @@ -101,27 +92,21 @@ public: ObIArray &mview_ids, ObIArray &last_refresh_scns, ObIArray &mview_refresh_modes); - static int extract_sql_result(sqlclient::ObMySQLResult *mysql_result, - ObIArray &mview_ids, - ObIArray &last_refresh_scns, - ObIArray &mview_refresh_modes); - static int update_mview_refresh_info_cache(const ObIArray &mview_ids, - const ObIArray &mview_refresh_scns, - const ObIArray &mview_refresh_modes, - ObMviewRefreshInfoMap &mview_refresh_info_map); + int update_mview_refresh_info_cache(const ObIArray &mview_ids, + const ObIArray &mview_refresh_scns, + const ObIArray &mview_refresh_modes); int fetch_mv_refresh_scns(const ObIArray &src_mview_ids, const share::SCN &read_snapshot, ObIArray &mview_ids, ObIArray &mview_refresh_scns, - uint64_t ¬_hit_count); - int64_t get_last_request_ts() const { return ATOMIC_LOAD(&last_request_ts_); } - void set_last_request_ts(const int64_t last_request_ts) { ATOMIC_STORE(&last_request_ts_, last_request_ts); } - ObMviewRefreshInfoMap &get_mview_refresh_info_map() { return mview_refresh_info_map_; } + bool &hit_cache); private: int inner_switch_to_leader(); int inner_switch_to_follower(); + void sys_ls_task_stop_(); private: + bool is_inited_; ObMLogMaintenanceTask mlog_maintenance_task_; ObMViewMaintenanceTask mview_maintenance_task_; ObMViewRefreshStatsMaintenanceTask mvref_stats_maintenance_task_; @@ -131,9 +116,8 @@ private: ObCollectMvMergeInfoTask collect_mv_merge_info_task_; ObMViewCleanSnapshotTask mview_clean_snapshot_task_; ObMviewUpdateCacheTask mview_update_cache_task_; - ObMviewRefreshInfoMap mview_refresh_info_map_; - int64_t last_request_ts_; - bool is_inited_; + MViewRefreshInfoCache mview_refresh_info_cache_; + int64_t mview_refresh_info_timestamp_; }; } // namespace rootserver diff --git a/src/rootserver/mview/ob_mview_push_refresh_scn_task.cpp b/src/rootserver/mview/ob_mview_push_refresh_scn_task.cpp index 25c5c32e7..120af7bc1 100644 --- a/src/rootserver/mview/ob_mview_push_refresh_scn_task.cpp +++ b/src/rootserver/mview/ob_mview_push_refresh_scn_task.cpp @@ -23,17 +23,20 @@ #include "share/backup/ob_backup_data_table_operator.h" #include "storage/mview/ob_mview_refresh_stats_purge.h" #include "observer/ob_inner_sql_connection.h" +#include "share/ob_all_server_tracer.h" namespace oceanbase { namespace rootserver { #define QUERY_MAJOR_MV_MERGE_SCN_SQL "select mview_id,t2.data_table_id,last_refresh_scn,t3.tablet_id, \ - t4.svr_ip,t4.svr_port,t4.ls_id,t5.learner_list, t4.end_log_scn from %s t1 \ + t4.svr_ip,t4.svr_port,t4.ls_id,t4.end_log_scn, \ + locate(concat(t4.svr_ip,\":\", t4.svr_port), t5.paxos_member_list) > 0 is_member, \ + locate(concat(t4.svr_ip,\":\", t4.svr_port), t5.learner_list) > 0 is_leaner from %s t1 \ left join %s t2 on t1.mview_id = t2.table_id \ left join %s t3 on t2.data_table_id = t3.table_id \ left join %s t4 on t3.tablet_id = t4.tablet_id and t4.table_type = 10 \ left join %s t5 on t4.svr_ip = t5.svr_ip and t4.svr_port = t5.svr_port and t4.ls_id = t5.ls_id \ - where t1.refresh_mode = %ld and t1.last_refresh_scn > 0 order by 1,2,3,4,5,6,7,9" + where t1.refresh_mode = %ld and t1.last_refresh_scn > 0 order by 1,2,3,4,5,6,7,8" ObMViewPushRefreshScnTask::ObMViewPushRefreshScnTask() @@ -113,7 +116,7 @@ void ObMViewPushRefreshScnTask::runTimerTask() } else if (OB_FAIL(need_schedule_major_refresh_mv_task(tenant_id_, need_schedule))) { LOG_WARN("fail to check need schedule major refresh mv task", KR(ret), K(tenant_id_)); } else if (!need_schedule) { - } else if (FALSE_IT(void(check_major_mv_refresh_scn_safety(tenant_id_)))) { + } else if (REACH_TIME_INTERVAL(300 * 1000 * 1000) && FALSE_IT(void(check_major_mv_refresh_scn_safety(tenant_id_)))) { } else if (OB_UNLIKELY(OB_ISNULL(sql_proxy))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sql proxy is null", KR(ret)); @@ -192,16 +195,19 @@ int ObMViewPushRefreshScnTask::check_major_mv_refresh_scn_safety(const uint64_t } } } + bool alive = true; + // ignore ret + SVR_TRACER.check_server_alive(merge_info.svr_addr_, alive); if (find_dest_merge_scn) { - } else if (merge_info.has_learner_) { - LOG_WARN("major_mv_safety>>>>", K(merge_info)); + } else if (!merge_info.is_member_ || merge_info.is_learner_) { + LOG_WARN("major_mv_safety>>>>", K(merge_info), K(alive)); } else { - LOG_ERROR("major_mv_safety>>>>", K(merge_info)); + LOG_ERROR("major_mv_safety>>>>", K(merge_info), K(alive)); is_safety = false; } } } - LOG_INFO("major_mv_safety>>>>", K(is_safety)); + LOG_INFO("major_mv_safety<<<<<<<<<<<", K(is_safety)); } return ret; } @@ -275,27 +281,23 @@ int ObMViewPushRefreshScnTask::get_major_mv_merge_info_(const uint64_t tenant_id char svr_ip[OB_IP_STR_BUFF] = ""; int64_t svr_port = 0; int64_t tmp_real_str_len = 0; - ObString learner_list; EXTRACT_INT_FIELD_MYSQL(*result, "mview_id", merge_info.mview_id_, int64_t); EXTRACT_INT_FIELD_MYSQL(*result, "data_table_id", merge_info.data_table_id_, int64_t); EXTRACT_UINT_FIELD_MYSQL(*result, "last_refresh_scn", merge_info.last_refresh_scn_, uint64_t); EXTRACT_INT_FIELD_MYSQL(*result, "tablet_id", merge_info.tablet_id_, int64_t); EXTRACT_STRBUF_FIELD_MYSQL(*result, "svr_ip", svr_ip, OB_IP_STR_BUFF, tmp_real_str_len); EXTRACT_INT_FIELD_MYSQL(*result, "svr_port", svr_port, int64_t); - (void)merge_info.svr_addr_.set_ip_addr(svr_ip, static_cast(svr_port)); EXTRACT_INT_FIELD_MYSQL(*result, "ls_id", merge_info.ls_id_, int64_t); EXTRACT_UINT_FIELD_MYSQL(*result, "end_log_scn", merge_info.end_log_scn_, uint64_t); - EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(*result, "learner_list", learner_list); - if (learner_list.length() > 0) { - merge_info.has_learner_ = true; - } else { - merge_info.has_learner_ = false; - } - + EXTRACT_INT_FIELD_MYSQL(*result, "is_member", merge_info.is_member_, int64_t); + EXTRACT_INT_FIELD_MYSQL(*result, "is_leaner", merge_info.is_learner_, int64_t); if (OB_FAIL(ret)) { LOG_WARN("fail to extract field from result", KR(ret)); - } else if (OB_FAIL(merge_info_array.push_back(merge_info))) { - LOG_WARN("fail to push merge_info to array", KR(ret)); + } else { + (void)merge_info.svr_addr_.set_ip_addr(svr_ip, static_cast(svr_port)); + if (OB_FAIL(merge_info_array.push_back(merge_info))) { + LOG_WARN("fail to push merge_info to array", KR(ret)); + } } if (OB_FAIL(ret)) { is_result_next_err = false; diff --git a/src/rootserver/mview/ob_mview_push_refresh_scn_task.h b/src/rootserver/mview/ob_mview_push_refresh_scn_task.h index ac521e6f7..1376bd61b 100644 --- a/src/rootserver/mview/ob_mview_push_refresh_scn_task.h +++ b/src/rootserver/mview/ob_mview_push_refresh_scn_task.h @@ -38,12 +38,13 @@ public: tablet_id_(0), svr_addr_(), ls_id_(), - has_learner_(false), - end_log_scn_(0) + end_log_scn_(0), + is_member_(false), + is_learner_(false) {} ~ObMajorMVMergeInfo() {} TO_STRING_KV(K_(mview_id), K_(data_table_id), K_(last_refresh_scn), K_(tablet_id), - K_(svr_addr), K_(ls_id), K_(has_learner), K_(end_log_scn)); + K_(svr_addr), K_(ls_id), K_(end_log_scn), K_(is_member), K_(is_learner)); bool is_valid() { return mview_id_ > 0 && data_table_id_ > 0 && last_refresh_scn_ > 0 && svr_addr_.is_valid() && ls_id_.is_valid() && tablet_id_ > 0; @@ -60,8 +61,9 @@ public: int64_t tablet_id_; ObAddr svr_addr_; share::ObLSID ls_id_; - bool has_learner_; uint64_t end_log_scn_; + bool is_member_; + bool is_learner_; }; class ObMViewPushRefreshScnTask : public ObMViewTimerTask diff --git a/src/rootserver/mview/ob_mview_update_cache_task.cpp b/src/rootserver/mview/ob_mview_update_cache_task.cpp index 0a9b8da0d..53b0051b2 100644 --- a/src/rootserver/mview/ob_mview_update_cache_task.cpp +++ b/src/rootserver/mview/ob_mview_update_cache_task.cpp @@ -110,10 +110,6 @@ void ObMviewUpdateCacheTask::runTimerTask() ObMySQLProxy *sql_proxy = GCTX.sql_proxy_; rootserver::ObMViewMaintenanceService *mview_maintenance_service = MTL(rootserver::ObMViewMaintenanceService*); - int64_t current_ts = ObTimeUtility::fast_current_time(); - int64_t last_request_ts; - const int64_t NeedUpdateCacheInterval = 10 * 60 * 1000 * 1000; // 10min - // check request time; if (OB_ISNULL(sql_proxy) || OB_ISNULL(mview_maintenance_service)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sql proxy is null or ObMViewMaintenanceService is null", @@ -121,16 +117,8 @@ void ObMviewUpdateCacheTask::runTimerTask() } else if (!is_valid_tenant_id(tenant_id)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tenant id is invalid", KR(ret), K(tenant_id)); - } else if (OB_FALSE_IT(last_request_ts = mview_maintenance_service->get_last_request_ts())) { - } else if (last_request_ts < current_ts && - current_ts - last_request_ts > NeedUpdateCacheInterval) { - // clear cache - if (!mview_maintenance_service->get_mview_refresh_info_map().empty()) { - mview_maintenance_service->get_mview_refresh_info_map().clear(); - } } else { const int64_t refresh_mode = (int64_t)ObMVRefreshMode::MAJOR_COMPACTION; - ObMviewRefreshInfoMap &mview_refresh_info_map = mview_maintenance_service->get_mview_refresh_info_map(); ObSEArray mview_ids; ObSEArray mview_refresh_scns; ObSEArray mview_refresh_modes; @@ -143,8 +131,7 @@ void ObMviewUpdateCacheTask::runTimerTask() tenant_id, sql.ptr()))) { LOG_WARN("fail to execute sql", K(ret), K(sql), K(tenant_id)); - } else if (OB_FAIL(ObMViewMaintenanceService:: - extract_sql_result(res.get_result(), + } else if (OB_FAIL(extract_sql_result(res.get_result(), mview_ids, mview_refresh_scns, mview_refresh_modes))) { @@ -152,19 +139,56 @@ void ObMviewUpdateCacheTask::runTimerTask() } } if (OB_FAIL(ret)) { - //do nothing - } else if (mview_ids.empty()) { // do nothing - } else if (OB_FAIL(ObMViewMaintenanceService:: - update_mview_refresh_info_cache(mview_ids, - mview_refresh_scns, - mview_refresh_modes, - mview_refresh_info_map))){ + } else if (OB_FAIL(mview_maintenance_service->update_mview_refresh_info_cache(mview_ids, + mview_refresh_scns, + mview_refresh_modes))){ LOG_WARN("fail to update mview refresh info cache", K(ret), K(mview_ids), K(mview_refresh_scns), K(mview_refresh_modes), K(tenant_id)); } } } +int ObMviewUpdateCacheTask::extract_sql_result(sqlclient::ObMySQLResult *mysql_result, + ObIArray &mview_ids, + ObIArray &last_refresh_scns, + ObIArray &mview_refresh_modes) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(mysql_result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("mysql result is null", K(ret), KP(mysql_result)); + } else { + ObSEArray res_ids; + ObSEArray res_scns; + ObSEArray refresh_modes; + const int64_t col_idx0 = 0; + const int64_t col_idx1 = 1; + const int64_t col_idx2 = 2; + while (OB_SUCC(ret) && OB_SUCC(mysql_result->next())) { + uint64_t mview_id = OB_INVALID_ID; + uint64_t last_refresh_scn = OB_INVALID_SCN_VAL; + uint64_t refresh_mode = (uint64_t)ObMVRefreshMode::MAX; + if (OB_FAIL(mysql_result->get_uint(col_idx0, mview_id)) + || OB_FAIL(mysql_result->get_uint(col_idx1, last_refresh_scn)) + || OB_FAIL(mysql_result->get_uint(col_idx2, refresh_mode))) { + LOG_WARN("fail to get int/uint value", K(ret)); + } else if (OB_FAIL(res_ids.push_back(mview_id)) + || OB_FAIL(res_scns.push_back(last_refresh_scn)) + || OB_FAIL(refresh_modes.push_back(refresh_mode))) { + LOG_WARN("fail to push back array", K(ret)); + } + } + if (OB_LIKELY(OB_SUCCESS == ret || OB_ITER_END == ret)) { + if((OB_FAIL(mview_ids.assign(res_ids)) || + OB_FAIL(last_refresh_scns.assign(res_scns)) || + OB_FAIL(mview_refresh_modes.assign(refresh_modes)))) { + LOG_WARN("fail to assign array", K(ret)); + } + } + } + return ret; +} + +} } -} \ No newline at end of file diff --git a/src/rootserver/mview/ob_mview_update_cache_task.h b/src/rootserver/mview/ob_mview_update_cache_task.h index d364f2a69..1cc64b108 100644 --- a/src/rootserver/mview/ob_mview_update_cache_task.h +++ b/src/rootserver/mview/ob_mview_update_cache_task.h @@ -43,6 +43,10 @@ public: void clean_up(); void runTimerTask() override; DISABLE_COPY_ASSIGN(ObMviewUpdateCacheTask); + int extract_sql_result(sqlclient::ObMySQLResult *mysql_result, + ObIArray &mview_ids, + ObIArray &last_refresh_scns, + ObIArray &mview_refresh_modes); private: bool is_inited_; bool is_stop_; diff --git a/src/rootserver/restore/ob_restore_scheduler.cpp b/src/rootserver/restore/ob_restore_scheduler.cpp index 8ee146c36..ebb6cfe42 100644 --- a/src/rootserver/restore/ob_restore_scheduler.cpp +++ b/src/rootserver/restore/ob_restore_scheduler.cpp @@ -1928,9 +1928,6 @@ int ObRestoreScheduler::wait_restore_safe_mview_merge_info_() } else if (OB_FAIL(global_proxy.get_major_refresh_mv_merge_scn(false, /*for update*/ major_mv_merge_scn))) { LOG_WARN("fail to get major_refresh_mv_merge_scn", K(ret), K(tenant_id_)); - } else if (OB_FAIL(ObMViewTimerTask::check_mview_last_refresh_scn(tenant_id_, - major_mv_merge_scn))) { - LOG_WARN("fail to check mview last refesh scn", K(ret), K(tenant_id_), K(major_mv_merge_scn)); } if (OB_SUCC(ret)) { if (mv_lastest_merge_scn < major_mv_merge_scn) {