fix progress_checker may loop after finish & use same locality in RS major check
This commit is contained in:
parent
0e2f1802c0
commit
4a3cf27c64
@ -75,7 +75,7 @@ int ObMajorMergeProgressChecker::init(
|
||||
LOG_WARN("fail to create table compaction info map", KR(ret), K_(tenant_id), K(TABLE_MAP_BUCKET_CNT));
|
||||
} else if (OB_FAIL(ckm_validator_.init(is_primary_service, sql_proxy))) {
|
||||
LOG_WARN("fail to init checksum validator", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(ls_locality_cache_.init(tenant_id_))) {
|
||||
} else if (OB_FAIL(ls_locality_cache_.init(tenant_id_, &merge_info_mgr))) {
|
||||
LOG_WARN("failed to init ls locality cache", K(ret));
|
||||
} else {
|
||||
idx_ckm_validate_array_.set_attr(ObMemAttr(tenant_id_, "RSCompCkmPair"));
|
||||
@ -401,6 +401,8 @@ int ObMajorMergeProgressChecker::check_index_and_rest_table()
|
||||
LOG_WARN("failed to validate index checksum", KR(ret), K_(compaction_scn));
|
||||
} else if (OB_FAIL(deal_with_rest_data_table())) {
|
||||
LOG_WARN("deal with rest data table", KR(ret), K_(compaction_scn));
|
||||
} else if (progress_.is_merge_finished()) {
|
||||
LOG_INFO("progress is check finished", KR(ret), K_(progress));
|
||||
} else if (progress_.only_remain_special_table_to_verified()) {
|
||||
bool finish_validate = false;
|
||||
#ifdef ERRSIM
|
||||
|
@ -25,6 +25,7 @@ namespace share
|
||||
ObCompactionLocalityCache::ObCompactionLocalityCache()
|
||||
: is_inited_(false),
|
||||
tenant_id_(OB_INVALID_TENANT_ID),
|
||||
merge_info_mgr_(nullptr),
|
||||
ls_infos_map_()
|
||||
{}
|
||||
|
||||
@ -33,7 +34,7 @@ ObCompactionLocalityCache::~ObCompactionLocalityCache()
|
||||
destroy();
|
||||
}
|
||||
|
||||
int ObCompactionLocalityCache::init(const uint64_t tenant_id)
|
||||
int ObCompactionLocalityCache::init(const uint64_t tenant_id, rootserver::ObMajorMergeInfoManager *merge_info_mgr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
@ -49,6 +50,7 @@ int ObCompactionLocalityCache::init(const uint64_t tenant_id)
|
||||
destroy();
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
merge_info_mgr_ = merge_info_mgr;
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
@ -57,6 +59,7 @@ int ObCompactionLocalityCache::init(const uint64_t tenant_id)
|
||||
void ObCompactionLocalityCache::destroy()
|
||||
{
|
||||
is_inited_ = false;
|
||||
merge_info_mgr_ = nullptr;
|
||||
if (ls_infos_map_.created()) {
|
||||
ls_infos_map_.destroy();
|
||||
}
|
||||
@ -83,7 +86,11 @@ int ObCompactionLocalityCache::inner_refresh_ls_locality()
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObStorageLocalityCache is not inited", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(get_zone_list(zone_list))) {
|
||||
} else if (nullptr != merge_info_mgr_) {
|
||||
if (OB_FAIL(merge_info_mgr_->get_zone_merge_mgr().get_zone(zone_list))) {
|
||||
LOG_WARN("failed to get zone list", KR(ret));
|
||||
}
|
||||
} else if (OB_FAIL(get_zone_list_from_inner_table(zone_list))) {
|
||||
LOG_WARN("failed to get zone list", K(ret), K_(tenant_id));
|
||||
} else if (zone_list.empty()) {
|
||||
LOG_INFO("zone list is empty, skip get ls locality", K(ret), K_(tenant_id));
|
||||
@ -92,7 +99,8 @@ int ObCompactionLocalityCache::inner_refresh_ls_locality()
|
||||
share::ObDiagnoseTabletType::TYPE_MEDIUM_MERGE))) {
|
||||
LOG_WARN("failed to add diagnose tablet for locality cache", K(ret));
|
||||
}
|
||||
} else {
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
// 1. clear ls_infos cached in memory
|
||||
ls_infos_map_.reuse();
|
||||
// 2. load ls_infos from __all_ls_meta_table
|
||||
@ -115,7 +123,7 @@ int ObCompactionLocalityCache::inner_refresh_ls_locality()
|
||||
}
|
||||
}
|
||||
cost_ts = ObTimeUtility::fast_current_time() - cost_ts;
|
||||
LOG_INFO("finish to refresh ls locality cache", KR(ret), K_(tenant_id), K(cost_ts));
|
||||
LOG_INFO("finish to refresh ls locality cache", KR(ret), K_(tenant_id), K(cost_ts), K(zone_list));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -163,7 +171,7 @@ int ObCompactionLocalityCache::refresh_by_zone(
|
||||
if (FAILEDx(ls_infos_map_.set_refactored(ls_id, tmp_ls_info, 1/*overwrite*/))) {
|
||||
LOG_WARN("fail to set refactored", KR(ret), K(ls_id), K(tmp_ls_info));
|
||||
} else {
|
||||
FLOG_INFO("success to refresh cached ls_info", K(ret), K(tmp_ls_info));
|
||||
FLOG_INFO("success to refresh cached ls_info", K(ret), K(tmp_ls_info), K(zone_list));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -185,7 +193,7 @@ bool ObCompactionLocalityCache::replica_in_zone_list(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCompactionLocalityCache::get_zone_list(ObIArray<ObZone> &zone_list)
|
||||
int ObCompactionLocalityCache::get_zone_list_from_inner_table(ObIArray<ObZone> &zone_list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
zone_list.reuse();
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include "share/ls/ob_ls_table_operator.h"
|
||||
#include "deps/oblib/src/lib/net/ob_addr.h"
|
||||
#include "deps/oblib/src/common/ob_zone.h"
|
||||
#include "rootserver/freeze/ob_major_merge_info_manager.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
@ -33,16 +34,16 @@ class ObCompactionLocalityCache
|
||||
public:
|
||||
ObCompactionLocalityCache();
|
||||
~ObCompactionLocalityCache();
|
||||
int init(const uint64_t tenant_id);
|
||||
int init(const uint64_t tenant_id, rootserver::ObMajorMergeInfoManager *merge_info_mgr = nullptr);
|
||||
void destroy();
|
||||
bool empty() const { return ls_infos_map_.empty(); }
|
||||
int refresh_ls_locality(const bool force_refresh = false);
|
||||
int refresh_ls_locality(const bool force_refresh);
|
||||
int get_ls_info(const share::ObLSID &ls_id, share::ObLSInfo &ls_info);
|
||||
TO_STRING_KV(K_(is_inited), K_(tenant_id));
|
||||
|
||||
private:
|
||||
const int64_t CHECK_LS_LOCALITY_INTERVAL = 5 * 60 * 1000 * 1000L; // 5 mins
|
||||
int get_zone_list(ObIArray<common::ObZone> &zone_list);
|
||||
int get_zone_list_from_inner_table(ObIArray<common::ObZone> &zone_list);
|
||||
int str2zone_list(
|
||||
const char *str,
|
||||
ObIArray<common::ObZone> &zone_list);
|
||||
@ -57,6 +58,7 @@ private:
|
||||
private:
|
||||
bool is_inited_;
|
||||
uint64_t tenant_id_;
|
||||
rootserver::ObMajorMergeInfoManager *merge_info_mgr_;
|
||||
common::hash::ObHashMap<share::ObLSID, share::ObLSInfo> ls_infos_map_;
|
||||
};
|
||||
|
||||
|
@ -27,6 +27,7 @@ ObCompactionScheduleIterator::ObCompactionScheduleIterator(
|
||||
is_major_(is_major),
|
||||
scan_finish_(false),
|
||||
merge_finish_(false),
|
||||
report_scn_flag_(false),
|
||||
ls_idx_(-1),
|
||||
tablet_idx_(0),
|
||||
schedule_tablet_cnt_(0),
|
||||
@ -150,7 +151,7 @@ int ObCompactionScheduleIterator::get_next_tablet(ObTabletHandle &tablet_handle)
|
||||
if (OB_TABLET_NOT_EXIST == ret) {
|
||||
tablet_idx_++;
|
||||
} else {
|
||||
LOG_WARN("fail to get tablet", K(ret), K(tablet_idx_), K(tablet_id), K_(timeout_us));
|
||||
LOG_WARN("fail to get tablet", K(ret), K(tablet_idx_), K(tablet_id));
|
||||
}
|
||||
} else {
|
||||
tablet_handle.set_wash_priority(WashTabletPriority::WTP_LOW);
|
||||
@ -220,7 +221,7 @@ int ObCompactionScheduleIterator::get_tablet_ids()
|
||||
int ObCompactionScheduleIterator::get_tablet_handle(
|
||||
const ObTabletID &tablet_id, ObTabletHandle &tablet_handle)
|
||||
{
|
||||
int ret = ls_tablet_svr_->get_tablet(tablet_id, tablet_handle, timeout_us_);
|
||||
int ret = ls_tablet_svr_->get_tablet(tablet_id, tablet_handle, 0/*timeout*/);
|
||||
#ifdef ERRSIM
|
||||
if (OB_SUCC(ret) && tablet_id.id() > ObTabletID::MIN_USER_TABLET_ID) {
|
||||
ret = OB_E(EventTable::EN_COMPACTION_ITER_TABLET_NOT_EXIST) ret;
|
||||
|
@ -78,7 +78,6 @@ private:
|
||||
bool scan_finish_;
|
||||
bool merge_finish_;
|
||||
bool report_scn_flag_;
|
||||
int64_t timeout_us_;
|
||||
int64_t ls_idx_;
|
||||
int64_t tablet_idx_;
|
||||
int64_t schedule_tablet_cnt_;
|
||||
|
@ -223,7 +223,7 @@ int ObTenantMediumChecker::check_medium_finish_schedule()
|
||||
LOG_WARN("ObTenantMediumChecker is not inited", K(ret));
|
||||
} else {
|
||||
// refresh ls locality cache
|
||||
if (OB_TMP_FAIL(ls_locality_cache_.refresh_ls_locality())) {
|
||||
if (OB_TMP_FAIL(ls_locality_cache_.refresh_ls_locality(false /*force_refresh*/))) {
|
||||
LOG_WARN("failed to refresh ls locality", K(tmp_ret));
|
||||
ADD_COMMON_SUSPECT_INFO(MEDIUM_MERGE, share::ObDiagnoseTabletType::TYPE_MEDIUM_MERGE,
|
||||
SUSPECT_FAILED_TO_REFRESH_LS_LOCALITY, tmp_ret);
|
||||
|
@ -1413,12 +1413,12 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (is_leader && could_major_merge && OB_TMP_FAIL(ls_start_schedule_medium(ls_id, ls_could_schedule_medium))) {
|
||||
if (OB_FAIL(ret) || !is_leader) {
|
||||
} else if (could_major_merge && OB_TMP_FAIL(ls_start_schedule_medium(ls_id, ls_could_schedule_medium))) {
|
||||
LOG_WARN("failed to set start schedule medium", K(ret), K(tmp_ret), K(ls_id));
|
||||
} else if (!ls_could_schedule_medium) { // not allow schedule medium
|
||||
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
|
||||
LOG_INFO("tenant is blocking schedule medium", KR(ret), K(MTL_ID()), K(ls_id));
|
||||
LOG_INFO("tenant is blocking schedule medium", KR(ret), K(MTL_ID()), K(ls_id), K(is_leader), K(could_major_merge));
|
||||
}
|
||||
}
|
||||
bool enable_adaptive_compaction = get_enable_adaptive_compaction();
|
||||
|
Loading…
x
Reference in New Issue
Block a user