fix compaction diagnose info and scn_revert defence, and optimize the mechanism of fetching ls_info
This commit is contained in:
@ -38,7 +38,8 @@ ObMajorMergeProgressChecker::ObMajorMergeProgressChecker()
|
|||||||
schema_service_(nullptr), zone_merge_mgr_(nullptr), lst_operator_(nullptr),
|
schema_service_(nullptr), zone_merge_mgr_(nullptr), lst_operator_(nullptr),
|
||||||
server_trace_(nullptr), tablet_compaction_map_(), table_count_(0), table_ids_(),
|
server_trace_(nullptr), tablet_compaction_map_(), table_count_(0), table_ids_(),
|
||||||
table_compaction_map_(), tablet_validator_(), index_validator_(), cross_cluster_validator_(),
|
table_compaction_map_(), tablet_validator_(), index_validator_(), cross_cluster_validator_(),
|
||||||
uncompacted_tablets_(), diagnose_rw_lock_(ObLatchIds::MAJOR_FREEZE_DIAGNOSE_LOCK)
|
uncompacted_tablets_(), diagnose_rw_lock_(ObLatchIds::MAJOR_FREEZE_DIAGNOSE_LOCK),
|
||||||
|
ls_infos_map_()
|
||||||
{}
|
{}
|
||||||
|
|
||||||
int ObMajorMergeProgressChecker::init(
|
int ObMajorMergeProgressChecker::init(
|
||||||
@ -60,6 +61,8 @@ int ObMajorMergeProgressChecker::init(
|
|||||||
LOG_WARN("fail to create tablet compaction status map", KR(ret), K(tenant_id), K(DEFAULT_MAP_BUCKET_CNT));
|
LOG_WARN("fail to create tablet compaction status map", KR(ret), K(tenant_id), K(DEFAULT_MAP_BUCKET_CNT));
|
||||||
} else if (OB_FAIL(table_compaction_map_.create(DEFAULT_MAP_BUCKET_CNT, "MFTbCompMap", "MFTbCompMap", tenant_id))) {
|
} else if (OB_FAIL(table_compaction_map_.create(DEFAULT_MAP_BUCKET_CNT, "MFTbCompMap", "MFTbCompMap", tenant_id))) {
|
||||||
LOG_WARN("fail to create table compaction status map", KR(ret), K(tenant_id), K(DEFAULT_MAP_BUCKET_CNT));
|
LOG_WARN("fail to create table compaction status map", KR(ret), K(tenant_id), K(DEFAULT_MAP_BUCKET_CNT));
|
||||||
|
} else if (OB_FAIL(ls_infos_map_.create(300, "MFLsInfoMap", "MFLsInfoMap", tenant_id))) {
|
||||||
|
LOG_WARN("fail to create table compaction status map", KR(ret), K(tenant_id));
|
||||||
} else if (OB_FAIL(tablet_validator_.init(tenant_id, is_primary_service, sql_proxy, zone_merge_mgr))) {
|
} else if (OB_FAIL(tablet_validator_.init(tenant_id, is_primary_service, sql_proxy, zone_merge_mgr))) {
|
||||||
LOG_WARN("fail to init tablet validator", KR(ret), K(tenant_id));
|
LOG_WARN("fail to init tablet validator", KR(ret), K(tenant_id));
|
||||||
} else if (OB_FAIL(index_validator_.init(tenant_id, is_primary_service, sql_proxy, zone_merge_mgr))) {
|
} else if (OB_FAIL(index_validator_.init(tenant_id, is_primary_service, sql_proxy, zone_merge_mgr))) {
|
||||||
@ -253,6 +256,8 @@ int ObMajorMergeProgressChecker::check_merge_progress(
|
|||||||
LOG_WARN("fail to generate tablet table map", K_(tenant_id), KR(ret));
|
LOG_WARN("fail to generate tablet table map", K_(tenant_id), KR(ret));
|
||||||
} else if (OB_FAIL(schema_guard.get_table_ids_in_tenant(tenant_id_, table_ids_))) {
|
} else if (OB_FAIL(schema_guard.get_table_ids_in_tenant(tenant_id_, table_ids_))) {
|
||||||
LOG_WARN("fail to get table ids in tenant", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to get table ids in tenant", KR(ret), K_(tenant_id));
|
||||||
|
} else if (OB_FAIL(refresh_ls_infos())) {
|
||||||
|
LOG_WARN("fail to refresh ls infos", KR(ret), K_(tenant_id));
|
||||||
} else {
|
} else {
|
||||||
ObTabletInfo tablet_info;
|
ObTabletInfo tablet_info;
|
||||||
while (!stop && OB_SUCC(ret)) {
|
while (!stop && OB_SUCC(ret)) {
|
||||||
@ -371,15 +376,18 @@ int ObMajorMergeProgressChecker::check_tablet(
|
|||||||
ObLSInfo ls_info;
|
ObLSInfo ls_info;
|
||||||
int64_t cluster_id = GCONF.cluster_id;
|
int64_t cluster_id = GCONF.cluster_id;
|
||||||
const ObLSID &ls_id = tablet_info.get_ls_id();
|
const ObLSID &ls_id = tablet_info.get_ls_id();
|
||||||
{
|
if (OB_FAIL(ls_infos_map_.get_refactored(ls_id, ls_info))) {
|
||||||
FREEZE_TIME_GUARD;
|
if (OB_HASH_NOT_EXIST == ret) {
|
||||||
if (OB_FAIL(lst_operator_->get(cluster_id, tenant_id_,
|
// ls_info does not exist, ignore this tablet
|
||||||
ls_id, share::ObLSTable::DEFAULT_MODE, ls_info))) {
|
ret = OB_SUCCESS;
|
||||||
LOG_WARN("fail to get ls info", KR(ret), K_(tenant_id), K(ls_id));
|
if (TC_REACH_TIME_INTERVAL(30 * 1000 * 1000)) { // 30s
|
||||||
|
LOG_WARN("ls_info does not exist", K_(tenant_id), K(ls_id), K(tablet_info));
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
LOG_WARN("fail to get ls_info from ls_info_map", KR(ret), K(ls_id), K_(tenant_id));
|
||||||
}
|
}
|
||||||
if (FAILEDx(check_tablet_compaction_scn(all_progress, global_broadcast_scn, tablet_info, ls_info))) {
|
} else if (OB_FAIL(check_tablet_compaction_scn(all_progress, global_broadcast_scn, tablet_info, ls_info))) {
|
||||||
LOG_WARN("fail to check data version", KR(ret), K(tablet_info), K(ls_info));
|
LOG_WARN("fail to check tablet compaction_scn", KR(ret), K(tablet_info), K(ls_info));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -544,5 +552,40 @@ void ObMajorMergeProgressChecker::reset_uncompacted_tablets()
|
|||||||
uncompacted_tablets_.reset();
|
uncompacted_tablets_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Rebuilds the in-memory ls_infos_map_ cache for tenant_id_.
//
// Steps: clear the map with reuse(), fetch the tenant's ls_infos through
// lst_operator_->get_by_tenant(), then insert each entry keyed by its ls_id
// (overwriting any existing entry).
//
// Returns:
//   OB_NOT_INIT        if the IS_NOT_INIT check fails;
//   OB_ERR_UNEXPECTED  if lst_operator_ is null;
//   the error code of get_by_tenant() or set_refactored() if either fails;
//   OB_SUCCESS         otherwise.
//
// Note: the map is cleared before the fetch, so on a failed refresh the map is
// left empty (fetch failed) or partially filled (an insert failed).
int ObMajorMergeProgressChecker::refresh_ls_infos()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (IS_NOT_INIT) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
LOG_WARN("not init", KR(ret), K_(tenant_id));
|
||||||
|
} else {
|
||||||
|
// NOTE(review): presumably a scoped guard that times this refresh - confirm
// against the FREEZE_TIME_GUARD macro definition.
FREEZE_TIME_GUARD;
|
||||||
|
// 1. clear ls_infos cached in memory
|
||||||
|
ls_infos_map_.reuse();
|
||||||
|
SMART_VAR(ObArray<ObLSInfo>, ls_infos) {
|
||||||
|
// 2. load ls_infos from __all_ls_meta_table
|
||||||
|
const bool inner_table_only = false;
|
||||||
|
if (OB_ISNULL(lst_operator_)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("lst_operator is null", KR(ret), K_(tenant_id));
|
||||||
|
} else if (OB_FAIL(lst_operator_->get_by_tenant(tenant_id_, inner_table_only, ls_infos))) {
|
||||||
|
LOG_WARN("fail to get ls infos", KR(ret), K_(tenant_id));
|
||||||
|
} else {
|
||||||
|
// 3. update ls_infos cached in memory
|
||||||
|
const int64_t ls_infos_cnt = ls_infos.count();
|
||||||
|
// The OB_SUCC(ret) term stops the loop at the first failed insert.
for (int64_t i = 0; (i < ls_infos_cnt) && OB_SUCC(ret); ++i) {
|
||||||
|
const ObLSID &ls_id = ls_infos.at(i).get_ls_id();
|
||||||
|
const ObLSInfo &ls_info = ls_infos.at(i);
|
||||||
|
if (OB_FAIL(ls_infos_map_.set_refactored(ls_id, ls_info, true/*overwrite*/))) {
|
||||||
|
LOG_WARN("fail to set refactored", KR(ret), K(ls_id), K(ls_info));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Logged on every path through the SMART_VAR body, success or failure; ret is
// included so the outcome is visible in the log line.
LOG_INFO("finish to refresh ls infos", KR(ret), K(ls_infos));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace rootserver
|
} // namespace rootserver
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|||||||
@ -25,6 +25,7 @@ namespace share
|
|||||||
{
|
{
|
||||||
class ObTabletTableOperator;
|
class ObTabletTableOperator;
|
||||||
class ObLSInfo;
|
class ObLSInfo;
|
||||||
|
class ObLSID;
|
||||||
class ObLSTableOperator;
|
class ObLSTableOperator;
|
||||||
class ObIServerTrace;
|
class ObIServerTrace;
|
||||||
struct ObTabletInfo;
|
struct ObTabletInfo;
|
||||||
@ -155,6 +156,7 @@ public:
|
|||||||
|
|
||||||
void set_major_merge_start_time(const int64_t major_merge_start_us);
|
void set_major_merge_start_time(const int64_t major_merge_start_us);
|
||||||
int get_uncompacted_tablets(common::ObArray<share::ObTabletReplica> &uncompacted_tablets) const;
|
int get_uncompacted_tablets(common::ObArray<share::ObTabletReplica> &uncompacted_tablets) const;
|
||||||
|
void reset_uncompacted_tablets();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ObMergeTimeStatistics merge_time_statistics_;
|
ObMergeTimeStatistics merge_time_statistics_;
|
||||||
@ -170,7 +172,7 @@ private:
|
|||||||
const share::ObTabletInfo &tablet,
|
const share::ObTabletInfo &tablet,
|
||||||
const share::ObLSInfo &ls_info);
|
const share::ObLSInfo &ls_info);
|
||||||
int mark_uncompacted_tables_as_verified(const common::ObIArray<share::ObTableCompactionInfo> &uncompacted_tables);
|
int mark_uncompacted_tables_as_verified(const common::ObIArray<share::ObTableCompactionInfo> &uncompacted_tables);
|
||||||
void reset_uncompacted_tablets();
|
int refresh_ls_infos();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
@ -192,6 +194,8 @@ private:
|
|||||||
ObCrossClusterTabletChecksumValidator cross_cluster_validator_;
|
ObCrossClusterTabletChecksumValidator cross_cluster_validator_;
|
||||||
common::ObArray<share::ObTabletReplica> uncompacted_tablets_; // record for diagnose
|
common::ObArray<share::ObTabletReplica> uncompacted_tablets_; // record for diagnose
|
||||||
common::SpinRWLock diagnose_rw_lock_;
|
common::SpinRWLock diagnose_rw_lock_;
|
||||||
|
// cache of ls_infos in __all_ls_meta_table
|
||||||
|
common::hash::ObHashMap<share::ObLSID, share::ObLSInfo> ls_infos_map_;
|
||||||
|
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObMajorMergeProgressChecker);
|
DISALLOW_COPY_AND_ASSIGN(ObMajorMergeProgressChecker);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -149,6 +149,9 @@ void ObMajorMergeScheduler::run3()
|
|||||||
} else if (OB_FAIL(do_work())) {
|
} else if (OB_FAIL(do_work())) {
|
||||||
LOG_WARN("fail to do major scheduler work", KR(ret), K_(tenant_id), "cur_epoch", get_epoch());
|
LOG_WARN("fail to do major scheduler work", KR(ret), K_(tenant_id), "cur_epoch", get_epoch());
|
||||||
}
|
}
|
||||||
|
// out of do_work, there must be no major merge on this server. therefore, here, clear
|
||||||
|
// compaction diagnose infos that are stored in memory of this server.
|
||||||
|
progress_checker_.reset_uncompacted_tablets();
|
||||||
|
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
if (OB_TMP_FAIL(try_idle(DEFAULT_IDLE_US, ret))) {
|
if (OB_TMP_FAIL(try_idle(DEFAULT_IDLE_US, ret))) {
|
||||||
|
|||||||
@ -221,18 +221,18 @@ int ObGlobalMergeTableOperator::check_scn_revert(
|
|||||||
if (it->need_update_ && it->is_scn_) {
|
if (it->need_update_ && it->is_scn_) {
|
||||||
if (0 == STRCMP(it->name_, "frozen_scn")) {
|
if (0 == STRCMP(it->name_, "frozen_scn")) {
|
||||||
if (it->get_scn() < global_merge_info.frozen_scn_.get_scn()) {
|
if (it->get_scn() < global_merge_info.frozen_scn_.get_scn()) {
|
||||||
LOG_ERROR("frozen_scn revert", K(tenant_id), "origin_frozen_scn", it->get_scn(),
|
LOG_WARN("frozen_scn revert", K(tenant_id), "new_frozen_scn", it->get_scn(),
|
||||||
"new_frozen_scn", global_merge_info.frozen_scn_.get_scn());
|
"origin_frozen_scn", global_merge_info.frozen_scn_.get_scn());
|
||||||
}
|
}
|
||||||
} else if (0 == STRCMP(it->name_, "global_broadcast_scn")) {
|
} else if (0 == STRCMP(it->name_, "global_broadcast_scn")) {
|
||||||
if (it->get_scn() < global_merge_info.global_broadcast_scn_.get_scn()) {
|
if (it->get_scn() < global_merge_info.global_broadcast_scn_.get_scn()) {
|
||||||
LOG_ERROR("global_broadcast_scn revert", K(tenant_id), "origin_global_broadcast_scn",
|
LOG_WARN("global_broadcast_scn revert", K(tenant_id), "new_global_broadcast_scn",
|
||||||
it->get_scn(), "new_global_broadcast_scn", global_merge_info.global_broadcast_scn_.get_scn());
|
it->get_scn(), "origin_global_broadcast_scn", global_merge_info.global_broadcast_scn_.get_scn());
|
||||||
}
|
}
|
||||||
} else if (0 == STRCMP(it->name_, "last_merged_scn")) {
|
} else if (0 == STRCMP(it->name_, "last_merged_scn")) {
|
||||||
if (it->get_scn() < global_merge_info.last_merged_scn_.get_scn()) {
|
if (it->get_scn() < global_merge_info.last_merged_scn_.get_scn()) {
|
||||||
LOG_ERROR("last_merged_scn revert", K(tenant_id), "origin_last_merged_scn",
|
LOG_WARN("last_merged_scn revert", K(tenant_id), "new_last_merged_scn",
|
||||||
it->get_scn(), "new_last_merged_scn", global_merge_info.last_merged_scn_.get_scn());
|
it->get_scn(), "origin_last_merged_scn", global_merge_info.last_merged_scn_.get_scn());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -492,23 +492,23 @@ int ObZoneMergeTableOperator::check_scn_revert(
|
|||||||
if (it->need_update_ && it->is_scn_) {
|
if (it->need_update_ && it->is_scn_) {
|
||||||
if (0 == STRCMP(it->name_, "frozen_scn")) {
|
if (0 == STRCMP(it->name_, "frozen_scn")) {
|
||||||
if (it->get_scn() < zone_merge_info.frozen_scn_.get_scn()) {
|
if (it->get_scn() < zone_merge_info.frozen_scn_.get_scn()) {
|
||||||
LOG_ERROR("frozen_scn revert", K(tenant_id), "origin_frozen_scn", it->get_scn(),
|
LOG_WARN("frozen_scn revert", K(tenant_id), "new_frozen_scn", it->get_scn(),
|
||||||
"new_frozen_scn", zone_merge_info.frozen_scn_.get_scn());
|
"origin_frozen_scn", zone_merge_info.frozen_scn_.get_scn());
|
||||||
}
|
}
|
||||||
} else if (0 == STRCMP(it->name_, "broadcast_scn")) {
|
} else if (0 == STRCMP(it->name_, "broadcast_scn")) {
|
||||||
if (it->get_scn() < zone_merge_info.broadcast_scn_.get_scn()) {
|
if (it->get_scn() < zone_merge_info.broadcast_scn_.get_scn()) {
|
||||||
LOG_ERROR("broadcast_scn revert", K(tenant_id), "origin_broadcast_scn",
|
LOG_WARN("broadcast_scn revert", K(tenant_id), "new_broadcast_scn",
|
||||||
it->get_scn(), "new_broadcast_scn", zone_merge_info.broadcast_scn_.get_scn());
|
it->get_scn(), "origin_broadcast_scn", zone_merge_info.broadcast_scn_.get_scn());
|
||||||
}
|
}
|
||||||
} else if (0 == STRCMP(it->name_, "last_merged_scn")) {
|
} else if (0 == STRCMP(it->name_, "last_merged_scn")) {
|
||||||
if (it->get_scn() < zone_merge_info.last_merged_scn_.get_scn()) {
|
if (it->get_scn() < zone_merge_info.last_merged_scn_.get_scn()) {
|
||||||
LOG_ERROR("last_merged_scn revert", K(tenant_id), "origin_last_merged_scn",
|
LOG_WARN("last_merged_scn revert", K(tenant_id), "new_last_merged_scn",
|
||||||
it->get_scn(), "new_last_merged_scn", zone_merge_info.last_merged_scn_.get_scn());
|
it->get_scn(), "origin_last_merged_scn", zone_merge_info.last_merged_scn_.get_scn());
|
||||||
}
|
}
|
||||||
} else if (0 == STRCMP(it->name_, "all_merged_scn")) {
|
} else if (0 == STRCMP(it->name_, "all_merged_scn")) {
|
||||||
if (it->get_scn() < zone_merge_info.all_merged_scn_.get_scn()) {
|
if (it->get_scn() < zone_merge_info.all_merged_scn_.get_scn()) {
|
||||||
LOG_ERROR("all_merged_scn revert", K(tenant_id), "origin_all_merged_scn",
|
LOG_WARN("all_merged_scn revert", K(tenant_id), "new_all_merged_scn",
|
||||||
it->get_scn(), "new_all_merged_scn", zone_merge_info.all_merged_scn_.get_scn());
|
it->get_scn(), "origin_all_merged_scn", zone_merge_info.all_merged_scn_.get_scn());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user