add uncompacted tables in rs major checker

This commit is contained in:
yangqise7en 2023-11-27 07:56:14 +00:00 committed by ob-robot
parent 3b9b6a9716
commit b7ea58d6e6
11 changed files with 119 additions and 62 deletions

View File

@ -304,7 +304,8 @@ bool ObMajorFreezeService::is_paused() const
}
int ObMajorFreezeService::get_uncompacted_tablets(
ObArray<ObTabletReplica> &uncompacted_tablets) const
ObArray<ObTabletReplica> &uncompacted_tablets,
ObArray<uint64_t> &uncompacted_table_ids) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -314,7 +315,7 @@ int ObMajorFreezeService::get_uncompacted_tablets(
if (OB_ISNULL(tenant_major_freeze_)) {
ret = OB_LEADER_NOT_EXIST;
LOG_WARN("tenant_major_freeze is null", KR(ret), K_(tenant_id));
} else if (OB_FAIL(tenant_major_freeze_->get_uncompacted_tablets(uncompacted_tablets))) {
} else if (OB_FAIL(tenant_major_freeze_->get_uncompacted_tablets(uncompacted_tablets, uncompacted_table_ids))) {
LOG_WARN("fail to get uncompacted tablets", KR(ret), K_(tenant_id));
}
}

View File

@ -88,7 +88,9 @@ public:
void destroy();
bool is_paused() const;
int get_uncompacted_tablets(common::ObArray<share::ObTabletReplica> &uncompacted_tablets) const;
int get_uncompacted_tablets(
common::ObArray<share::ObTabletReplica> &uncompacted_tablets,
common::ObArray<uint64_t> &uncompacted_table_ids) const;
protected:
virtual ObMajorFreezeServiceType get_service_type() const

View File

@ -54,7 +54,7 @@ ObMajorMergeProgressChecker::ObMajorMergeProgressChecker(
ckm_validator_(tenant_id, stop_, tablet_ls_pair_cache_, tablet_status_map_,
table_compaction_map_, idx_ckm_validate_array_, validator_statistics_,
finish_tablet_ls_pair_array_, finish_tablet_ckm_array_),
uncompacted_tablets_(),
uncompacted_tablets_(), uncompacted_table_ids_(),
diagnose_rw_lock_(ObLatchIds::MAJOR_FREEZE_DIAGNOSE_LOCK),
ls_locality_cache_(), total_time_guard_(), validator_statistics_(), batch_size_mgr_() {}
@ -161,15 +161,20 @@ int ObMajorMergeProgressChecker::clear_cached_info()
finish_tablet_ckm_array_.reset();
progress_.reset();
ckm_validator_.clear_cached_info();
uncompacted_tablets_.reset();
loop_cnt_ = 0;
tablet_ls_pair_cache_.reuse();
{
SpinWLockGuard w_guard(diagnose_rw_lock_);
uncompacted_tablets_.reset();
uncompacted_table_ids_.reset();
}
}
return ret;
}
int ObMajorMergeProgressChecker::get_uncompacted_tablets(
ObArray<ObTabletReplica> &uncompacted_tablets) const
ObArray<ObTabletReplica> &input_tablets,
common::ObArray<uint64_t> &input_table_ids) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -177,8 +182,10 @@ int ObMajorMergeProgressChecker::get_uncompacted_tablets(
LOG_WARN("not init", KR(ret), K_(tenant_id));
} else {
SpinRLockGuard r_guard(diagnose_rw_lock_);
if (OB_FAIL(uncompacted_tablets.assign(uncompacted_tablets_))) {
if (OB_FAIL(input_tablets.assign(uncompacted_tablets_))) {
LOG_WARN("fail to assign uncompacted_tablets", KR(ret), K_(tenant_id), K_(uncompacted_tablets));
} else if (OB_FAIL(input_table_ids.assign(uncompacted_table_ids_))) {
LOG_WARN("fail to assign uncompacted_tablets", KR(ret), K_(tenant_id), K_(uncompacted_table_ids));
}
}
return ret;
@ -224,6 +231,11 @@ int ObMajorMergeProgressChecker::check_verification(
if (OB_TMP_FAIL(unfinish_table_id_array.push_back(table_id))) {
LOG_WARN("failed to push table_id into finish_array", KR(tmp_ret), KPC(table_compaction_info_ptr));
}
} else if (progress_.table_cnt_[INITIAL]++ < DEBUG_INFO_CNT) { // add into uncompacted tablets array to show in diagnose
SpinWLockGuard w_guard(diagnose_rw_lock_);
if (OB_TMP_FAIL(uncompacted_table_ids_.push_back(table_id))) {
LOG_WARN("fail to push_back", KR(tmp_ret), K_(tenant_id), K_(compaction_scn), K(table_id));
}
}
if (0 >= index_cnt // data & index should be in same batch
&& (++table_cnt >= table_batch_size)) {
@ -243,6 +255,7 @@ void ObMajorMergeProgressChecker::reset_uncompacted_tablets()
{
SpinWLockGuard w_guard(diagnose_rw_lock_);
uncompacted_tablets_.reuse();
uncompacted_table_ids_.reuse();
}
bool ObMajorMergeProgressChecker::should_ignore_cur_table(const ObSimpleTableSchemaV2 *simple_schema)
@ -608,10 +621,13 @@ void ObMajorMergeProgressChecker::print_unfinish_info(const int64_t cost_us)
{
int ret = OB_SUCCESS;
ObSEArray<uint64_t, DEBUG_INFO_CNT> tmp_table_id_array;
ObSEArray<ObTabletReplica, DEBUG_INFO_CNT> tmp_replica_array;
ObSEArray<ObTabletReplica, DEBUG_INFO_CNT> uncompacted_replica_array;
ObSEArray<uint64_t, DEBUG_INFO_CNT> uncompacted_table_array;
{
SpinRLockGuard r_guard(diagnose_rw_lock_);
if (OB_FAIL(tmp_replica_array.assign(uncompacted_tablets_))) {
if (OB_FAIL(uncompacted_replica_array.assign(uncompacted_tablets_))) {
LOG_WARN("failed to assgin array", KR(ret));
} else if (OB_FAIL(uncompacted_table_array.assign(uncompacted_table_ids_))) {
LOG_WARN("failed to assgin array", KR(ret));
}
}
@ -635,7 +651,8 @@ void ObMajorMergeProgressChecker::print_unfinish_info(const int64_t cost_us)
LOG_INFO("succ to check merge progress", K_(tenant_id), K_(loop_cnt), K_(compaction_scn), K(cost_us),
K_(progress), "remain_table_id_count", table_ids_.count(),
"remain_table_ids", tmp_table_id_array,
"uncompacted_tablets", tmp_replica_array,
"uncompacted_tablets", uncompacted_replica_array,
"uncompacted_table_ids", uncompacted_table_array,
K_(total_time_guard), K_(validator_statistics));
}

View File

@ -65,7 +65,9 @@ public:
share::SCN global_broadcast_scn,
const int64_t expected_epoch); // For each round major_freeze, need invoke this once.
int clear_cached_info();
int get_uncompacted_tablets(common::ObArray<share::ObTabletReplica> &uncompacted_tablets) const;
int get_uncompacted_tablets(
common::ObArray<share::ObTabletReplica> &uncompacted_tablets,
common::ObArray<uint64_t> &uncompacted_table_ids) const;
void reset_uncompacted_tablets();
int check_progress(compaction::ObMergeProgress &progress);
const compaction::ObTabletLSPairCache &get_tablet_ls_pair_cache() const { return tablet_ls_pair_cache_; }
@ -146,6 +148,7 @@ private:
compaction::ObTableCompactionInfoMap table_compaction_map_; // <table_id, compaction_info>
ObChecksumValidator ckm_validator_;
common::ObSEArray<share::ObTabletReplica, DEBUG_INFO_CNT> uncompacted_tablets_; // record for diagnose
common::ObSEArray<uint64_t, DEBUG_INFO_CNT> uncompacted_table_ids_; // record for diagnose
common::SpinRWLock diagnose_rw_lock_;
// cache of ls_infos in __all_ls_meta_table
share::ObCompactionLocalityCache ls_locality_cache_;

View File

@ -227,14 +227,15 @@ int ObMajorMergeScheduler::try_idle(
}
int ObMajorMergeScheduler::get_uncompacted_tablets(
ObArray<ObTabletReplica> &uncompacted_tablets) const
ObArray<ObTabletReplica> &uncompacted_tablets,
ObArray<uint64_t> &uncompacted_table_ids) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K_(tenant_id));
} else {
if (OB_FAIL(progress_checker_.get_uncompacted_tablets(uncompacted_tablets))) {
if (OB_FAIL(progress_checker_.get_uncompacted_tablets(uncompacted_tablets, uncompacted_table_ids))) {
LOG_WARN("fail to get uncompacted tablets", KR(ret), K_(tenant_id));
}
}

View File

@ -80,7 +80,9 @@ public:
ObMajorMergeIdling &get_major_scheduler_idling() { return idling_; }
int try_update_epoch_and_reload();
int get_uncompacted_tablets(common::ObArray<share::ObTabletReplica> &uncompacted_tablets) const;
int get_uncompacted_tablets(
common::ObArray<share::ObTabletReplica> &uncompacted_tablets,
common::ObArray<uint64_t> &uncompacted_table_ids) const;
protected:
virtual int try_idle(const int64_t ori_idle_time_us,

View File

@ -288,14 +288,15 @@ int ObTenantMajorFreeze::clear_merge_error()
}
int ObTenantMajorFreeze::get_uncompacted_tablets(
ObArray<ObTabletReplica> &uncompacted_tablets) const
ObArray<ObTabletReplica> &uncompacted_tablets,
ObArray<uint64_t> &uncompacted_table_ids) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K_(tenant_id));
} else {
if (OB_FAIL(merge_scheduler_.get_uncompacted_tablets(uncompacted_tablets))) {
if (OB_FAIL(merge_scheduler_.get_uncompacted_tablets(uncompacted_tablets, uncompacted_table_ids))) {
LOG_WARN("fail to get uncompacted tablets", KR(ret), K_(tenant_id));
}
}

View File

@ -70,7 +70,9 @@ public:
int clear_merge_error();
int get_uncompacted_tablets(common::ObArray<share::ObTabletReplica> &uncompacted_tablets) const;
int get_uncompacted_tablets(
common::ObArray<share::ObTabletReplica> &uncompacted_tablets,
common::ObArray<uint64_t> &uncompacted_table_ids) const;
private:
// major merge one by one

View File

@ -217,12 +217,11 @@ int ObTenantTabletMetaIterator::init(
sql_proxy_ = &sql_proxy;
valid_tablet_ls_pairs_.reuse();
valid_tablet_ls_pairs_idx_ = 0;
is_inited_ = true;
if (OB_FAIL(prefetch())) { // need to prefetch a batch of tablet_info
if (OB_ITER_END != ret) {
LOG_WARN("fail to prefetch", KR(ret), K_(tenant_id), K_(prefetch_tablet_idx));
}
} else {
is_inited_ = true;
}
}
return ret;
@ -256,10 +255,7 @@ int ObTenantTabletMetaIterator::prefetch()
int ObTenantTabletMetaIterator::prefetch_valid_tablet_ids()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTenantTabletMetaIterator is not inited", KR(ret));
} else if (OB_UNLIKELY(prefetch_tablet_idx_ != prefetched_tablets_.count())) {
if (OB_UNLIKELY(prefetch_tablet_idx_ != prefetched_tablets_.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("prefetched valid tablet ids have not been iterated to end",
KR(ret), K_(valid_tablet_ls_pairs_idx), "tablet ls pair count",
@ -285,10 +281,7 @@ int ObTenantTabletMetaIterator::prefetch_valid_tablet_ids()
int ObTenantTabletMetaIterator::prefetch_tablets()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTenantTabletMetaIterator is not inited", KR(ret));
} else if (OB_UNLIKELY(prefetch_tablet_idx_ != prefetched_tablets_.count())) {
if (OB_UNLIKELY(prefetch_tablet_idx_ != prefetched_tablets_.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("prefetched tablet infos have not been iterated to end",
KR(ret), K_(prefetch_tablet_idx),
@ -432,9 +425,9 @@ int ObTenantTabletTableIterator::next(ObTabletInfo &tablet_info)
int ObTenantTabletTableIterator::prefetch_()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_) || OB_ISNULL(tt_operator_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret));
if (OB_ISNULL(tt_operator_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("operator is unexpected null", KR(ret), KP_(tt_operator));
} else {
ObTabletID last_tablet_id; // start with INVALID_TABLET_ID = 0
if (inner_tablet_infos_.count() > 0) {

View File

@ -1167,46 +1167,75 @@ int ObCompactionDiagnoseMgr::do_tenant_major_merge_diagnose(
} else {
(void) get_and_set_suspect_info(MAJOR_MERGE, UNKNOW_LS_ID, UNKNOW_TABLET_ID); // get RS schedule suspect info
ObTenantTabletScheduler *scheduler = MTL(ObTenantTabletScheduler*);
const int64_t frozen_scn = MAX(scheduler->get_frozen_version(), MTL(ObTenantFreezeInfoMgr*)->get_latest_frozen_version());
SMART_VAR(ObArray<ObTabletReplica>, uncompacted_tablets) {
if (OB_FAIL(major_freeze_service->get_uncompacted_tablets(uncompacted_tablets))) {
SMART_VARS_2((ObArray<ObTabletReplica>, uncompacted_tablets), (ObArray<uint64_t>, uncompacted_table_ids)) {
if (OB_FAIL(major_freeze_service->get_uncompacted_tablets(uncompacted_tablets, uncompacted_table_ids))) {
LOG_WARN("fail to get uncompacted tablets", KR(ret));
} else {
int64_t uncompacted_tablets_cnt = uncompacted_tablets.count();
LOG_INFO("finish get uncompacted tablets for diagnose", K(ret), K(uncompacted_tablets_cnt));
for (int64_t i = 0; OB_SUCC(ret) && i < uncompacted_tablets_cnt; ++i) {
if (can_add_diagnose_info()) {
const bool compaction_scn_not_valid = frozen_scn > uncompacted_tablets.at(i).get_snapshot_version();
const char *status =
ObTabletReplica::SCN_STATUS_ERROR == uncompacted_tablets.at(i).get_status()
? "CHECKSUM_ERROR"
: (compaction_scn_not_valid ? "compaction_scn_not_update" : "report_scn_not_update");
if (OB_FAIL(ADD_DIAGNOSE_INFO(
MAJOR_MERGE,
uncompacted_tablets.at(i).get_ls_id(),
uncompacted_tablets.at(i).get_tablet_id(),
ObCompactionDiagnoseInfo::DIA_STATUS_RS_UNCOMPACTED,
ObTimeUtility::fast_current_time(), "server",
uncompacted_tablets.at(i).get_server(), "status", status,
"frozen_scn", frozen_scn,
"compaction_scn", uncompacted_tablets.at(i).get_snapshot_version(),
"report_scn", uncompacted_tablets.at(i).get_report_scn()))) {
LOG_WARN("fail to set diagnose info", KR(ret), "uncompacted_tablet",
uncompacted_tablets.at(i));
ret = OB_SUCCESS; // ignore ret, and process next uncompacted_tablet
}
} else {
LOG_INFO("can not add diagnose info", K_(idx), K_(max_cnt), "uncompacted_tablet",
uncompacted_tablets.at(i));
}
}
(void) add_uncompacted_tablet_to_diagnose(uncompacted_tablets);
(void) add_uncompacted_table_ids_to_diagnose(uncompacted_table_ids);
}
}
}
return ret;
}
int ObCompactionDiagnoseMgr::add_uncompacted_tablet_to_diagnose(
const ObIArray<ObTabletReplica> &uncompacted_tablets)
{
int ret = OB_SUCCESS;
const int64_t frozen_scn = MAX(MTL(ObTenantTabletScheduler*)->get_frozen_version(), MTL(ObTenantFreezeInfoMgr*)->get_latest_frozen_version());
const int64_t uncompacted_tablets_cnt = uncompacted_tablets.count();
LOG_INFO("finish get uncompacted tablets for diagnose", K(ret), K(uncompacted_tablets_cnt));
for (int64_t i = 0; OB_SUCC(ret) && i < uncompacted_tablets_cnt; ++i) {
if (can_add_diagnose_info()) {
const bool compaction_scn_not_valid = frozen_scn > uncompacted_tablets.at(i).get_snapshot_version();
const char *status =
ObTabletReplica::SCN_STATUS_ERROR == uncompacted_tablets.at(i).get_status()
? "CHECKSUM_ERROR"
: (compaction_scn_not_valid ? "compaction_scn_not_update" : "report_scn_not_update");
if (OB_FAIL(ADD_DIAGNOSE_INFO(
MAJOR_MERGE, uncompacted_tablets.at(i).get_ls_id(),
uncompacted_tablets.at(i).get_tablet_id(),
ObCompactionDiagnoseInfo::DIA_STATUS_RS_UNCOMPACTED,
ObTimeUtility::fast_current_time(), "server",
uncompacted_tablets.at(i).get_server(), "status", status,
"frozen_scn", frozen_scn, "compaction_scn",
uncompacted_tablets.at(i).get_snapshot_version(), "report_scn",
uncompacted_tablets.at(i).get_report_scn()))) {
LOG_WARN("fail to set diagnose info", KR(ret), "uncompacted_tablet",
uncompacted_tablets.at(i));
ret = OB_SUCCESS; // ignore ret, and process next uncompacted_tablet
}
} else {
LOG_INFO("can not add diagnose info", K_(idx), K_(max_cnt),
"uncompacted_tablet", uncompacted_tablets.at(i));
}
}
return ret;
}
// Record one MAJOR_MERGE diagnose entry (status DIA_STATUS_RS_UNCOMPACTED) for each
// table id in `uncompacted_table_ids`, as long as can_add_diagnose_info() allows it.
// A failure to add an entry is logged and then cleared, so the loop visits every
// table id and the function returns OB_SUCCESS.
int ObCompactionDiagnoseMgr::add_uncompacted_table_ids_to_diagnose(const ObIArray<uint64_t> &uncompacted_table_ids)
{
  int ret = OB_SUCCESS;
  for (int64_t i = 0; OB_SUCC(ret) && i < uncompacted_table_ids.count(); ++i) {
    if (can_add_diagnose_info()) {
      if (OB_FAIL(ADD_COMMON_DIAGNOSE_INFO(
          MAJOR_MERGE,
          ObCompactionDiagnoseInfo::DIA_STATUS_RS_UNCOMPACTED,
          ObTimeUtility::fast_current_time(),
          "table_id", uncompacted_table_ids.at(i)))) {
        // The logged value is a table id, so label it as a table (not a tablet),
        // consistent with the key used in the branch below.
        LOG_WARN("fail to set diagnose info", KR(ret), "uncompacted_table",
            uncompacted_table_ids.at(i));
        ret = OB_SUCCESS; // ignore ret, and process next uncompacted table id
      }
    } else {
      // No room left for this entry: only leave a trace in the log.
      LOG_INFO("can not add diagnose info", K_(idx), K_(max_cnt),
          "uncompacted_table", uncompacted_table_ids.at(i));
    }
  }
  return ret;
}
int ObCompactionDiagnoseMgr::diagnose_tablet_mini_merge(
const ObLSID &ls_id,
ObTablet &tablet)

View File

@ -29,6 +29,10 @@ namespace rootserver
{
class ObMajorFreezeService;
}
namespace share
{
class ObTabletReplica;
}
using namespace storage;
using namespace share;
namespace compaction
@ -536,6 +540,8 @@ private:
int check_if_need_diagnose(rootserver::ObMajorFreezeService *&major_freeze_service,
bool &need_diagnose) const;
int do_tenant_major_merge_diagnose(rootserver::ObMajorFreezeService *major_freeze_service);
int add_uncompacted_tablet_to_diagnose(const ObIArray<share::ObTabletReplica> &uncompacted_tablets);
int add_uncompacted_table_ids_to_diagnose(const ObIArray<uint64_t> &uncompacted_table_ids);
public:
typedef common::hash::ObHashMap<ObLSID, ObLSCheckStatus> LSStatusMap;