fix wrong flag in standby rs major checker
This commit is contained in:
@ -134,8 +134,12 @@ int ObChecksumValidator::set_need_validate()
|
||||
if (is_primary_service_) {
|
||||
// need to check index checksum on primary tenant
|
||||
need_validate_index_ckm_ = true;
|
||||
if (OB_FAIL(check_tablet_checksum_sync_finish())) {
|
||||
if (OB_FAIL(check_tablet_checksum_sync_finish(true /*force_check*/))) {
|
||||
LOG_WARN("failed to check tablet checksum sync finish", K(ret), K_(is_primary_service));
|
||||
} else {
|
||||
// for primary service, if cross cluster ckm sync finish, need to validate cur round checksum & inner_table
|
||||
// else: write ckm into inner table
|
||||
need_validate_cross_cluster_ckm_ = cross_cluster_ckm_sync_finish_;
|
||||
}
|
||||
} else { // standby tenant
|
||||
need_validate_index_ckm_ = false;
|
||||
@ -173,7 +177,7 @@ int ObChecksumValidator::check_inner_status()
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid compaction_scn/expected_epoch/schema_guard_", KR(ret), K_(tenant_id),
|
||||
K_(compaction_scn), K_(expected_epoch), KP_(schema_guard));
|
||||
} else if (OB_FAIL(check_tablet_checksum_sync_finish())) {
|
||||
} else if (OB_FAIL(check_tablet_checksum_sync_finish(false /*force_check*/))) {
|
||||
LOG_WARN("failed to set need_validate", K(ret), K_(tenant_id));
|
||||
}
|
||||
return ret;
|
||||
@ -334,6 +338,7 @@ int ObChecksumValidator::update_table_compaction_info_by_tablet()
|
||||
if (OB_HASH_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
table_compaction_info_.set_uncompacted();
|
||||
(void) uncompact_info_.add_tablet(tenant_id_, cur_tablet_ls_pair_array_.at(idx).get_ls_id(), tablet_id);
|
||||
LOG_TRACE("tablet not exist in tablet status map", KR(ret), K(tablet_id),
|
||||
K_(cur_tablet_ls_pair_array), K_(table_compaction_info));
|
||||
#ifdef ERRSIM
|
||||
@ -522,24 +527,25 @@ int ObChecksumValidator::batch_update_report_scn()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObChecksumValidator::check_tablet_checksum_sync_finish()
|
||||
int ObChecksumValidator::check_tablet_checksum_sync_finish(const bool force_check)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_exist = false;
|
||||
if (cross_cluster_ckm_sync_finish_) {
|
||||
// need check inner table:
|
||||
// 1) force check when first init
|
||||
// 2) ckm not sync finish in standby service
|
||||
if (!force_check && (is_primary_service_ || cross_cluster_ckm_sync_finish_)) {
|
||||
} else if (OB_FAIL(ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist(*sql_proxy_,
|
||||
tenant_id_, compaction_scn_, is_exist))) {
|
||||
LOG_WARN("fail to check is first tablet in first ls exist", KR(ret), K_(tenant_id), K_(compaction_scn));
|
||||
} else if (is_exist) {
|
||||
cross_cluster_ckm_sync_finish_ = true;
|
||||
need_validate_cross_cluster_ckm_ = true;
|
||||
} else {
|
||||
need_validate_cross_cluster_ckm_ = false;
|
||||
cross_cluster_ckm_sync_finish_ = check_waiting_tablet_checksum_timeout();
|
||||
if (!is_primary_service_ && TC_REACH_TIME_INTERVAL(PRINT_CROSS_CLUSTER_LOG_INVERVAL)) {
|
||||
LOG_WARN("can not check cross-cluster checksum now, please wait until first tablet"
|
||||
"in sys ls exists", K_(tenant_id), K_(compaction_scn), K_(major_merge_start_us),
|
||||
"fast_current_time_us", ObTimeUtil::fast_current_time());
|
||||
"fast_current_time_us", ObTimeUtil::fast_current_time(), K(is_exist), K_(is_primary_service));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -63,7 +63,8 @@ public:
|
||||
compaction::ObIndexCkmValidatePairArray &idx_ckm_validate_array,
|
||||
compaction::ObCkmValidatorStatistics &statistics,
|
||||
ObArray<share::ObTabletLSPair> &finish_tablet_ls_pair_array,
|
||||
ObArray<share::ObTabletChecksumItem> &finish_tablet_ckm_array)
|
||||
ObArray<share::ObTabletChecksumItem> &finish_tablet_ckm_array,
|
||||
compaction::ObUncompactInfo &uncompact_info)
|
||||
: is_inited_(false),
|
||||
is_primary_service_(false),
|
||||
need_validate_index_ckm_(false),
|
||||
@ -83,6 +84,7 @@ public:
|
||||
idx_ckm_validate_array_(idx_ckm_validate_array),
|
||||
finish_tablet_ls_pair_array_(finish_tablet_ls_pair_array),
|
||||
finish_tablet_ckm_array_(finish_tablet_ckm_array),
|
||||
uncompact_info_(uncompact_info),
|
||||
schema_guard_(nullptr),
|
||||
simple_schema_(nullptr),
|
||||
table_compaction_info_(),
|
||||
@ -141,7 +143,7 @@ private:
|
||||
|
||||
/* Cross Cluster Checksum Section */
|
||||
int validate_cross_cluster_checksum();
|
||||
int check_tablet_checksum_sync_finish();
|
||||
int check_tablet_checksum_sync_finish(const bool force_check);
|
||||
int validate_replica_and_tablet_checksum();
|
||||
int check_column_checksum(
|
||||
const ObArray<share::ObTabletReplicaChecksumItem> &tablet_replica_checksum_items,
|
||||
@ -171,6 +173,7 @@ private:
|
||||
compaction::ObIndexCkmValidatePairArray &idx_ckm_validate_array_;
|
||||
ObArray<share::ObTabletLSPair> &finish_tablet_ls_pair_array_;
|
||||
ObArray<share::ObTabletChecksumItem> &finish_tablet_ckm_array_;
|
||||
compaction::ObUncompactInfo &uncompact_info_;
|
||||
|
||||
/* different for every table */
|
||||
share::schema::ObSchemaGetterGuard *schema_guard_;
|
||||
|
@ -53,10 +53,8 @@ ObMajorMergeProgressChecker::ObMajorMergeProgressChecker(
|
||||
tablet_status_map_(), table_compaction_map_(),
|
||||
ckm_validator_(tenant_id, stop_, tablet_ls_pair_cache_, tablet_status_map_,
|
||||
table_compaction_map_, idx_ckm_validate_array_, validator_statistics_,
|
||||
finish_tablet_ls_pair_array_, finish_tablet_ckm_array_),
|
||||
uncompacted_tablets_(), uncompacted_table_ids_(),
|
||||
diagnose_rw_lock_(ObLatchIds::MAJOR_FREEZE_DIAGNOSE_LOCK),
|
||||
ls_locality_cache_(), total_time_guard_(), validator_statistics_(), batch_size_mgr_() {}
|
||||
finish_tablet_ls_pair_array_, finish_tablet_ckm_array_, uncompact_info_),
|
||||
uncompact_info_(), ls_locality_cache_(), total_time_guard_(), validator_statistics_(), batch_size_mgr_() {}
|
||||
|
||||
int ObMajorMergeProgressChecker::init(
|
||||
const bool is_primary_service,
|
||||
@ -163,11 +161,7 @@ int ObMajorMergeProgressChecker::clear_cached_info()
|
||||
ckm_validator_.clear_cached_info();
|
||||
loop_cnt_ = 0;
|
||||
tablet_ls_pair_cache_.reuse();
|
||||
{
|
||||
SpinWLockGuard w_guard(diagnose_rw_lock_);
|
||||
uncompacted_tablets_.reset();
|
||||
uncompacted_table_ids_.reset();
|
||||
}
|
||||
reset_uncompacted_tablets();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -180,13 +174,8 @@ int ObMajorMergeProgressChecker::get_uncompacted_tablets(
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", KR(ret), K_(tenant_id));
|
||||
} else {
|
||||
SpinRLockGuard r_guard(diagnose_rw_lock_);
|
||||
if (OB_FAIL(input_tablets.assign(uncompacted_tablets_))) {
|
||||
LOG_WARN("fail to assign uncompacted_tablets", KR(ret), K_(tenant_id), K_(uncompacted_tablets));
|
||||
} else if (OB_FAIL(input_table_ids.assign(uncompacted_table_ids_))) {
|
||||
LOG_WARN("fail to assign uncompacted_tablets", KR(ret), K_(tenant_id), K_(uncompacted_table_ids));
|
||||
}
|
||||
} else if (OB_FAIL(uncompact_info_.get_uncompact_info(input_tablets, input_table_ids))) {
|
||||
LOG_WARN("fail to get uncompacted info", KR(ret), K_(tenant_id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -231,11 +220,7 @@ int ObMajorMergeProgressChecker::check_verification(
|
||||
if (OB_TMP_FAIL(unfinish_table_id_array.push_back(table_id))) {
|
||||
LOG_WARN("failed to push table_id into finish_array", KR(tmp_ret), KPC(table_compaction_info_ptr));
|
||||
}
|
||||
} else if (progress_.table_cnt_[INITIAL]++ < DEBUG_INFO_CNT) { // add into uncompacted tablets array to show in diagnose
|
||||
SpinWLockGuard w_guard(diagnose_rw_lock_);
|
||||
if (OB_TMP_FAIL(uncompacted_table_ids_.push_back(table_id))) {
|
||||
LOG_WARN("fail to push_back", KR(tmp_ret), K_(tenant_id), K_(compaction_scn), K(table_id));
|
||||
}
|
||||
(void) uncompact_info_.add_table(table_id);
|
||||
}
|
||||
if (0 >= index_cnt // data & index should be in same batch
|
||||
&& (++table_cnt >= table_batch_size)) {
|
||||
@ -251,13 +236,6 @@ int ObMajorMergeProgressChecker::check_verification(
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObMajorMergeProgressChecker::reset_uncompacted_tablets()
|
||||
{
|
||||
SpinWLockGuard w_guard(diagnose_rw_lock_);
|
||||
uncompacted_tablets_.reuse();
|
||||
uncompacted_table_ids_.reuse();
|
||||
}
|
||||
|
||||
bool ObMajorMergeProgressChecker::should_ignore_cur_table(const ObSimpleTableSchemaV2 *simple_schema)
|
||||
{
|
||||
bool bret = true;
|
||||
@ -620,19 +598,13 @@ int ObMajorMergeProgressChecker::check_progress(
|
||||
void ObMajorMergeProgressChecker::print_unfinish_info(const int64_t cost_us)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSEArray<uint64_t, DEBUG_INFO_CNT> tmp_table_id_array;
|
||||
ObSEArray<ObTabletReplica, DEBUG_INFO_CNT> uncompacted_replica_array;
|
||||
ObSEArray<uint64_t, DEBUG_INFO_CNT> uncompacted_table_array;
|
||||
{
|
||||
SpinRLockGuard r_guard(diagnose_rw_lock_);
|
||||
if (OB_FAIL(uncompacted_replica_array.assign(uncompacted_tablets_))) {
|
||||
LOG_WARN("failed to assgin array", KR(ret));
|
||||
} else if (OB_FAIL(uncompacted_table_array.assign(uncompacted_table_ids_))) {
|
||||
LOG_WARN("failed to assgin array", KR(ret));
|
||||
}
|
||||
}
|
||||
const int64_t array_cnt = ObUncompactInfo::DEBUG_INFO_CNT;
|
||||
ObSEArray<uint64_t, array_cnt> tmp_table_id_array;
|
||||
ObSEArray<ObTabletReplica, array_cnt> uncompacted_replica_array;
|
||||
ObSEArray<uint64_t, array_cnt> uncompacted_table_array;
|
||||
(void) uncompact_info_.get_uncompact_info(uncompacted_replica_array, uncompacted_table_array);
|
||||
if (table_ids_.count() > 0) {
|
||||
const int64_t table_id_cnt = MIN(DEBUG_INFO_CNT, table_ids_.count());
|
||||
const int64_t table_id_cnt = MIN(array_cnt, table_ids_.count());
|
||||
for (int64_t idx = 0; OB_SUCC(ret) && idx < table_id_cnt; ++idx) {
|
||||
if (OB_FAIL(tmp_table_id_array.push_back(table_ids_.at(idx)))) {
|
||||
LOG_WARN("failed to push array", KR(ret));
|
||||
@ -957,12 +929,7 @@ int ObMajorMergeProgressChecker::generate_tablet_status_map()
|
||||
LOG_ERROR("ERROR! ERROR! ERROR! find error status tablet replica", KR(ret), K(tablet_info));
|
||||
} else if (replica_snapshot_scn < compaction_scn_) {
|
||||
status = ObTabletCompactionStatus::INITIAL;
|
||||
if (progress_.unmerged_tablet_cnt_++ < DEBUG_INFO_CNT) { // add into uncompacted tablets array to show in diagnose
|
||||
SpinWLockGuard w_guard(diagnose_rw_lock_);
|
||||
if (OB_TMP_FAIL(uncompacted_tablets_.push_back(*replica))) {
|
||||
LOG_WARN("fail to push_back", KR(tmp_ret), K_(tenant_id), K_(compaction_scn), KPC(replica));
|
||||
}
|
||||
}
|
||||
(void) uncompact_info_.add_tablet(*replica);
|
||||
LOG_TRACE("unfinish tablet", KR(ret), K(replica_snapshot_scn), K_(compaction_scn));
|
||||
break;
|
||||
} else if (OB_FAIL(report_scn.convert_for_tx(replica->get_report_scn()))) { // check report_scn
|
||||
|
@ -68,7 +68,7 @@ public:
|
||||
int get_uncompacted_tablets(
|
||||
common::ObArray<share::ObTabletReplica> &uncompacted_tablets,
|
||||
common::ObArray<uint64_t> &uncompacted_table_ids) const;
|
||||
void reset_uncompacted_tablets();
|
||||
OB_INLINE void reset_uncompacted_tablets() { uncompact_info_.reset(); }
|
||||
int check_progress(compaction::ObMergeProgress &progress);
|
||||
const compaction::ObTabletLSPairCache &get_tablet_ls_pair_cache() const { return tablet_ls_pair_cache_; }
|
||||
private:
|
||||
@ -123,7 +123,6 @@ private:
|
||||
private:
|
||||
static const int64_t ADD_RS_EVENT_INTERVAL = 10L * 60 * 1000 * 1000; // 10m
|
||||
static const int64_t PRINT_LOG_INTERVAL = 2 * 60 * 1000 * 1000; // 2m
|
||||
static const int64_t DEBUG_INFO_CNT = 3;
|
||||
static const int64_t DEAL_REST_TABLE_CNT_THRESHOLD = 100;
|
||||
static const int64_t DEAL_REST_TABLE_INTERVAL = 10 * 60 * 1000 * 1000L; // 10m
|
||||
private:
|
||||
@ -147,9 +146,7 @@ private:
|
||||
// record each table compaction/verify status
|
||||
compaction::ObTableCompactionInfoMap table_compaction_map_; // <table_id, compaction_info>
|
||||
ObChecksumValidator ckm_validator_;
|
||||
common::ObSEArray<share::ObTabletReplica, DEBUG_INFO_CNT> uncompacted_tablets_; // record for diagnose
|
||||
common::ObSEArray<uint64_t, DEBUG_INFO_CNT> uncompacted_table_ids_; // record for diagnose
|
||||
common::SpinRWLock diagnose_rw_lock_;
|
||||
compaction::ObUncompactInfo uncompact_info_;
|
||||
// cache of ls_infos in __all_ls_meta_table
|
||||
share::ObCompactionLocalityCache ls_locality_cache_;
|
||||
// statistics section
|
||||
|
@ -295,5 +295,75 @@ int ObTabletLSPairCache::get_tablet_ls_pairs(
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* -------------------------------------------------------------------ObUncompactInfo-------------------------------------------------------------------
|
||||
*/
|
||||
ObUncompactInfo::ObUncompactInfo()
|
||||
: diagnose_rw_lock_(ObLatchIds::MAJOR_FREEZE_DIAGNOSE_LOCK),
|
||||
tablets_(),
|
||||
table_ids_()
|
||||
{}
|
||||
|
||||
ObUncompactInfo::~ObUncompactInfo()
|
||||
{
|
||||
reset();
|
||||
}
|
||||
|
||||
void ObUncompactInfo::reset()
|
||||
{
|
||||
SpinWLockGuard w_guard(diagnose_rw_lock_);
|
||||
tablets_.reuse();
|
||||
table_ids_.reuse();
|
||||
}
|
||||
|
||||
void ObUncompactInfo::add_table(const uint64_t table_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SpinWLockGuard w_guard(diagnose_rw_lock_);
|
||||
if (table_ids_.count() < DEBUG_INFO_CNT
|
||||
&& OB_FAIL(table_ids_.push_back(table_id))) {
|
||||
LOG_WARN("fail to push_back", KR(ret), K(table_id));
|
||||
}
|
||||
}
|
||||
|
||||
void ObUncompactInfo::add_tablet(const share::ObTabletReplica &replica)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SpinWLockGuard w_guard(diagnose_rw_lock_);
|
||||
if (tablets_.count() < DEBUG_INFO_CNT
|
||||
&& OB_FAIL(tablets_.push_back(replica))) {
|
||||
LOG_WARN("fail to push_back", KR(ret), K(replica));
|
||||
}
|
||||
}
|
||||
|
||||
void ObUncompactInfo::add_tablet(
|
||||
const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const common::ObTabletID &tablet_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTabletReplica fake_replica;
|
||||
fake_replica.fake_for_diagnose(tenant_id, ls_id, tablet_id);
|
||||
SpinWLockGuard w_guard(diagnose_rw_lock_);
|
||||
if (tablets_.count() < DEBUG_INFO_CNT
|
||||
&& OB_FAIL(tablets_.push_back(fake_replica))) {
|
||||
LOG_WARN("fail to push_back", KR(ret), K(fake_replica));
|
||||
}
|
||||
}
|
||||
|
||||
int ObUncompactInfo::get_uncompact_info(
|
||||
ObIArray<ObTabletReplica> &input_tablets,
|
||||
ObIArray<uint64_t> &input_table_ids) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SpinRLockGuard r_guard(diagnose_rw_lock_);
|
||||
if (OB_FAIL(input_tablets.assign(tablets_))) {
|
||||
LOG_WARN("fail to assign uncompacted_tablets", KR(ret), K_(tablets));
|
||||
} else if (OB_FAIL(input_table_ids.assign(table_ids_))) {
|
||||
LOG_WARN("fail to assign uncompacted_tablets", KR(ret), K_(table_ids));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // namespace compaction
|
||||
} // namespace oceanbase
|
||||
|
@ -15,6 +15,10 @@
|
||||
#include "share/ob_balance_define.h"
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace share
|
||||
{
|
||||
class ObTabletReplica;
|
||||
}
|
||||
namespace compaction
|
||||
{
|
||||
|
||||
@ -268,6 +272,27 @@ private:
|
||||
hash::ObHashMap<common::ObTabletID, share::ObLSID> map_;
|
||||
};
|
||||
|
||||
struct ObUncompactInfo
|
||||
{
|
||||
public:
|
||||
ObUncompactInfo();
|
||||
~ObUncompactInfo();
|
||||
void reset();
|
||||
void add_table(const uint64_t table_id);
|
||||
void add_tablet(const share::ObTabletReplica &replica);
|
||||
void add_tablet(
|
||||
const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const common::ObTabletID &tablet_id);
|
||||
int get_uncompact_info(
|
||||
common::ObIArray<share::ObTabletReplica> &input_tablets,
|
||||
common::ObIArray<uint64_t> &input_table_ids) const;
|
||||
static const int64_t DEBUG_INFO_CNT = 3;
|
||||
common::SpinRWLock diagnose_rw_lock_;
|
||||
common::ObSEArray<share::ObTabletReplica, DEBUG_INFO_CNT> tablets_; // record for diagnose
|
||||
common::ObSEArray<uint64_t, DEBUG_INFO_CNT> table_ids_; // record for diagnose
|
||||
};
|
||||
|
||||
} // namespace compaction
|
||||
} // namespace oceanbase
|
||||
|
||||
|
@ -3685,7 +3685,8 @@ int ObDagNetScheduler::loop_running_dag_net_map()
|
||||
int ObDagNetScheduler::loop_blocking_dag_net_list()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(OB_ISNULL(scheduler_))) {
|
||||
if (OB_ISNULL(scheduler_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
COMMON_LOG(WARN, "scheduler is null", KP(scheduler_));
|
||||
} else {
|
||||
ObMutexGuard guard(dag_net_map_lock_);
|
||||
|
@ -124,6 +124,16 @@ bool ObTabletReplica::is_equal_for_report(const ObTabletReplica &other) const
|
||||
return is_equal;
|
||||
}
|
||||
|
||||
void ObTabletReplica::fake_for_diagnose(const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const common::ObTabletID &tablet_id)
|
||||
{
|
||||
reset();
|
||||
tenant_id_ = tenant_id;
|
||||
ls_id_ = ls_id;
|
||||
tablet_id_ = tablet_id;
|
||||
}
|
||||
|
||||
ObTabletInfo::ObTabletInfo()
|
||||
: tenant_id_(OB_INVALID_TENANT_ID),
|
||||
tablet_id_(),
|
||||
|
@ -74,6 +74,10 @@ public:
|
||||
const int64_t required_size,
|
||||
const int64_t report_scn,
|
||||
const ScnStatus status);
|
||||
void fake_for_diagnose(
|
||||
const uint64_t tenant_id,
|
||||
const share::ObLSID &ls_id,
|
||||
const common::ObTabletID &tablet_id);
|
||||
bool is_equal_for_report(const ObTabletReplica &other) const;
|
||||
static bool is_status_valid(const ScnStatus status)
|
||||
{
|
||||
|
Reference in New Issue
Block a user