accelerate RS major merge
This commit is contained in:
parent
264d33917d
commit
e0796d7598
@ -285,7 +285,8 @@ int ObTabletChecksumValidator::check_all_table_verification_finished(
|
||||
}
|
||||
const int64_t cost_time_us = ObTimeUtil::current_time() - start_time_us;
|
||||
merge_time_statistics.update_merge_status_us_.tablet_validator_us_ = cost_time_us;
|
||||
|
||||
LOG_INFO("finish to check all table verification finished", KR(ret), K_(tenant_id), K(frozen_scn),
|
||||
K(expected_epoch), K(stop), K(cost_time_us));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -344,16 +345,16 @@ int ObTabletChecksumValidator::check_table_compaction_and_validate_checksum(
|
||||
LOG_WARN("fail to validate tablet replica checksum", KR(ret), K(frozen_scn), K(table_id));
|
||||
}
|
||||
}
|
||||
// set this table as COMPACTED/CAN_SKIP_VERIFYING if needed
|
||||
// set this table as COMPACTED/VERIFIED if needed
|
||||
if (OB_SUCC(ret) && need_update_map) {
|
||||
if (OB_FAIL(table_compaction_map.set_refactored(table_id, latest_compaction_info, true/*overwrite*/))) {
|
||||
LOG_WARN("fail to set refactored", KR(ret), K(table_id), K(latest_compaction_info));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else { // like VIEW, it does not have tablet, treat it as compaction finished and can skip verifying
|
||||
} else { // like VIEW, it does not have tablet, treat it as compaction finished and VERIFIED
|
||||
latest_compaction_info.tablet_cnt_ = 0;
|
||||
latest_compaction_info.set_can_skip_verifying();
|
||||
latest_compaction_info.set_verified();
|
||||
if (OB_FAIL(table_compaction_map.set_refactored(table_id, latest_compaction_info, true/*overwrite*/))) {
|
||||
LOG_WARN("fail to set refactored", KR(ret), K(table_id), K(latest_compaction_info));
|
||||
}
|
||||
@ -400,7 +401,8 @@ int ObTabletChecksumValidator::check_table_compaction_info(
|
||||
if (OB_SUCC(ret) && (idx == tablet_cnt)) {
|
||||
latest_compaction_info.tablet_cnt_ = tablet_ids.count();
|
||||
if (exist_skip_verifying_tablet) {
|
||||
latest_compaction_info.set_can_skip_verifying();
|
||||
// for a table that has any CAN_SKIP_VERIFYING tablet, directly mark this table as VERIFIED
|
||||
latest_compaction_info.set_verified();
|
||||
} else {
|
||||
latest_compaction_info.set_compacted();
|
||||
}
|
||||
@ -446,17 +448,17 @@ int ObTabletChecksumValidator::validate_tablet_replica_checksum(
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
ObCrossClusterTabletChecksumValidator::ObCrossClusterTabletChecksumValidator()
|
||||
: major_merge_start_us_(-1)
|
||||
: major_merge_start_us_(-1), is_all_tablet_checksum_exist_(false)
|
||||
{
|
||||
}
|
||||
|
||||
int ObCrossClusterTabletChecksumValidator::check_need_validate(
|
||||
int ObCrossClusterTabletChecksumValidator::check_and_set_validate(
|
||||
const bool is_primary_service,
|
||||
const SCN &frozen_scn,
|
||||
bool &need_validate) const
|
||||
const share::SCN &frozen_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_exist = false;
|
||||
is_all_tablet_checksum_exist_ = false; // reset is_all_tablet_checksum_exist_
|
||||
if (OB_UNLIKELY(!frozen_scn.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(is_primary_service), K(frozen_scn));
|
||||
@ -467,31 +469,15 @@ int ObCrossClusterTabletChecksumValidator::check_need_validate(
|
||||
LOG_WARN("fail to check is first tablet in first ls exist", KR(ret), K_(tenant_id));
|
||||
} else if (is_exist) {
|
||||
// need to check cross-cluster checksum on primary tenant when all tablet checksum exist
|
||||
need_validate = true;
|
||||
need_validate_ = true;
|
||||
is_all_tablet_checksum_exist_ = true;
|
||||
} else {
|
||||
// no need to check cross-cluster checksum on primary tenant when not all tablet checksum exist
|
||||
need_validate = false;
|
||||
need_validate_ = false;
|
||||
}
|
||||
} else {
|
||||
// need to check cross-cluster checksum on standby tenant
|
||||
need_validate = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCrossClusterTabletChecksumValidator::check_and_set_validate(
|
||||
const bool is_primary_service,
|
||||
const share::SCN &frozen_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool need_validate = false;
|
||||
if (OB_UNLIKELY(!frozen_scn.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(is_primary_service), K(frozen_scn));
|
||||
} else if (OB_FAIL(check_need_validate(is_primary_service, frozen_scn, need_validate))) {
|
||||
LOG_WARN("fail to check need validate", KR(ret), K_(tenant_id), K(is_primary_service), K(frozen_scn));
|
||||
} else {
|
||||
set_need_validate(need_validate);
|
||||
need_validate_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -559,14 +545,8 @@ int ObCrossClusterTabletChecksumValidator::check_all_table_verification_finished
|
||||
}
|
||||
}
|
||||
}
|
||||
} else { // like VIEW that has no tablet, update report_scn for this table and mark it as VERIFIED
|
||||
if (cur_compaction_info.is_index_ckm_verified()) {
|
||||
if (OB_FAIL(handle_table_verification_finished(stop, simple_schema, frozen_scn,
|
||||
table_compaction_map, merge_time_statistics, expected_epoch))) {
|
||||
LOG_WARN("fail to handle table verification finished", KR(ret), K_(tenant_id),
|
||||
K(frozen_scn), KPC(simple_schema));
|
||||
}
|
||||
}
|
||||
} else { // like VIEW that has no tablet, no need to validate cross-cluster checksum
|
||||
// do nothing. should have been marked as VERIFIED by ObTabletChecksumValidator
|
||||
}
|
||||
if (OB_CHECKSUM_ERROR == ret) {
|
||||
check_ret = ret;
|
||||
@ -593,7 +573,8 @@ int ObCrossClusterTabletChecksumValidator::check_all_table_verification_finished
|
||||
}
|
||||
const int64_t cost_time_us = ObTimeUtil::current_time() - start_time_us;
|
||||
merge_time_statistics.update_merge_status_us_.cross_cluster_validator_us_ = cost_time_us;
|
||||
|
||||
LOG_INFO("finish to check all table verification finished", KR(ret), K_(tenant_id), K(frozen_scn),
|
||||
K(expected_epoch), K(stop), K(cost_time_us));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -620,12 +601,10 @@ int ObCrossClusterTabletChecksumValidator::validate_cross_cluster_checksum(
|
||||
ObTimeUtil::current_time());
|
||||
}
|
||||
// check whether all tablet checksums already exist
|
||||
bool is_exist = false;
|
||||
FREEZE_TIME_GUARD;
|
||||
if (OB_FAIL(ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist(*sql_proxy_,
|
||||
tenant_id_, frozen_scn, is_exist))) {
|
||||
LOG_WARN("fail to check is first tablet in first ls exist", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||
} else if (is_exist || is_wait_tablet_checksum_timeout) { // all tablet checksum exist or timeout
|
||||
if (OB_FAIL(check_if_all_tablet_checksum_exist(frozen_scn))) {
|
||||
LOG_WARN("fail to check if all tablet checksum exist", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||
} else if (is_all_tablet_checksum_exist_ || is_wait_tablet_checksum_timeout) { // all tablet checksum exist or timeout
|
||||
if (OB_FAIL(check_cross_cluster_checksum(*simple_schema, frozen_scn))) {
|
||||
if (OB_ITEM_NOT_MATCH == ret) {
|
||||
if (OB_TMP_FAIL(handle_table_can_not_verify(simple_schema->get_table_id(), table_compaction_map))) {
|
||||
@ -772,6 +751,26 @@ bool ObCrossClusterTabletChecksumValidator::is_first_tablet_in_sys_ls(const ObTa
|
||||
return (item.ls_id_.is_sys_ls()) && (item.tablet_id_.id() == ObTabletID::MIN_VALID_TABLET_ID);
|
||||
}
|
||||
|
||||
// Refresh the cached member flag is_all_tablet_checksum_exist_ for @frozen_scn.
// Once the flag is true this function skips the existence query entirely.
// 1. is_all_tablet_checksum_exist_ = true: do nothing
|
||||
// 2. is_all_tablet_checksum_exist_ = false: check and update is_all_tablet_checksum_exist_
|
||||
// @param frozen_scn  SCN forwarded to ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist
// @return OB_SUCCESS when no query was needed or the query succeeded; otherwise the
//         failure reported through OB_FAIL (the flag is left unchanged in that case)
int ObCrossClusterTabletChecksumValidator::check_if_all_tablet_checksum_exist(
|
||||
const SCN &frozen_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_exist = false;
|
||||
if (is_all_tablet_checksum_exist_) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist(*sql_proxy_,
|
||||
tenant_id_, frozen_scn, is_exist))) {
|
||||
LOG_WARN("fail to check is first tablet in first ls exist", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||
} else {
|
||||
// update is_all_tablet_checksum_exist_ according to the result of
|
||||
// ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist
|
||||
is_all_tablet_checksum_exist_ = is_exist;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObCrossClusterTabletChecksumValidator::check_waiting_tablet_checksum_timeout() const
|
||||
{
|
||||
const int64_t MAX_TABLET_CHECKSUM_WAIT_TIME_US = 36 * 3600 * 1000 * 1000L; // 36 hours
|
||||
@ -893,7 +892,6 @@ int ObCrossClusterTabletChecksumValidator::write_tablet_checksum_at_table_level(
|
||||
const int64_t expected_epoch)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_exist = false;
|
||||
FREEZE_TIME_GUARD;
|
||||
if (OB_UNLIKELY(pairs.empty()
|
||||
|| (!table_compaction_info.is_index_ckm_verified() && (MAJOR_MERGE_SPECIAL_TABLE_ID != table_id))
|
||||
@ -905,14 +903,8 @@ int ObCrossClusterTabletChecksumValidator::write_tablet_checksum_at_table_level(
|
||||
ret = OB_CANCELED;
|
||||
LOG_WARN("already stop", KR(ret), K_(tenant_id));
|
||||
} else if (!is_primary_service_) { // only primary major_freeze_service need to write tablet checksum
|
||||
} else if (OB_FAIL(ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist(*sql_proxy_,
|
||||
tenant_id_, frozen_scn, is_exist))) {
|
||||
LOG_WARN("fail to check is first tablet in first ls exist", KR(ret), K_(tenant_id));
|
||||
} else if (!is_exist) {
|
||||
if (table_compaction_info.can_skip_verifying()) {
|
||||
// do not write tablet checksum items for tables that can skip verifying,
|
||||
// since tablet checksum items of these tables must have already been written
|
||||
} else if ((table_compaction_info.is_index_ckm_verified() && (MAJOR_MERGE_SPECIAL_TABLE_ID != table_id))
|
||||
} else if (!is_all_tablet_checksum_exist_) {
|
||||
if ((table_compaction_info.is_index_ckm_verified() && (MAJOR_MERGE_SPECIAL_TABLE_ID != table_id))
|
||||
|| (table_compaction_info.is_verified() && (MAJOR_MERGE_SPECIAL_TABLE_ID == table_id))) {
|
||||
const int64_t IMMEDIATE_RETRY_CNT = 5;
|
||||
int64_t fail_count = 0;
|
||||
@ -1122,11 +1114,15 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
|
||||
LOG_WARN("fail to get tenant table schemas", KR(ret), K_(tenant_id));
|
||||
} else {
|
||||
table_count = table_schemas.count();
|
||||
int64_t last_epoch_check_us = ObTimeUtil::fast_current_time();
|
||||
for (int64_t i = 0; (i < table_count) && OB_SUCC(ret) && !stop; ++i) {
|
||||
const ObSimpleTableSchemaV2 *simple_schema = table_schemas.at(i);
|
||||
if (OB_ISNULL(simple_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(ObMajorFreezeUtil::check_epoch_periodically(*sql_proxy_, tenant_id_,
|
||||
expected_epoch, last_epoch_check_us))) {
|
||||
LOG_WARN("fail to check freeze service epoch", KR(ret), K_(tenant_id), K(stop));
|
||||
} else {
|
||||
const uint64_t table_id = simple_schema->get_table_id();
|
||||
ObTableCompactionInfo cur_compaction_info;
|
||||
@ -1140,8 +1136,8 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
|
||||
if (simple_schema->can_read_index()) {
|
||||
// 1. for index table can read, may need to check column checksum
|
||||
if (OB_FAIL(handle_index_table(frozen_scn, cur_compaction_info, simple_schema,
|
||||
schema_guard, table_compaction_map, expected_epoch))) {
|
||||
LOG_WARN("fail to handle index table", KR(ret), K(frozen_scn), K(simple_schema), K(expected_epoch));
|
||||
schema_guard, table_compaction_map))) {
|
||||
LOG_WARN("fail to handle index table", KR(ret), K(frozen_scn), K(simple_schema));
|
||||
}
|
||||
} else { // !simple_schema->can_read_index()
|
||||
// 2. for index table can not read, directly mark it as VERIFIED
|
||||
@ -1188,7 +1184,8 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
|
||||
}
|
||||
const int64_t cost_time_us = ObTimeUtil::current_time() - start_time_us;
|
||||
merge_time_statistics.update_merge_status_us_.index_validator_us_ = cost_time_us;
|
||||
|
||||
LOG_INFO("finish to check all table verification finished", KR(ret), K_(tenant_id), K(frozen_scn),
|
||||
K(expected_epoch), K(stop), K(cost_time_us));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1201,7 +1198,7 @@ int ObIndexChecksumValidator::update_data_table_verified(
|
||||
int ret = OB_SUCCESS;
|
||||
if (data_table_compaction.is_index_ckm_verified()
|
||||
|| data_table_compaction.is_verified()) { // skip if already finished verification
|
||||
} else if (data_table_compaction.finish_compaction()) {
|
||||
} else if (data_table_compaction.is_compacted()) {
|
||||
if (OB_FAIL(handle_table_verification_finished(data_table_id, frozen_scn, table_compaction_map))) {
|
||||
LOG_WARN("fail to handle table compaction finished", KR(ret), K(data_table_id), K(frozen_scn));
|
||||
}
|
||||
@ -1226,7 +1223,7 @@ int ObIndexChecksumValidator::handle_table_verification_finished(
|
||||
LOG_WARN("fail to get refactored", KR(ret), K(table_id));
|
||||
} else if (cur_compaction_info.is_index_ckm_verified()
|
||||
|| cur_compaction_info.is_verified()) { // skip if finished verification
|
||||
} else if (!cur_compaction_info.finish_compaction()) {
|
||||
} else if (!cur_compaction_info.is_compacted()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("table must finish compaction when arriving here", KR(ret), K(table_id), K(cur_compaction_info));
|
||||
} else {
|
||||
@ -1291,7 +1288,7 @@ int ObIndexChecksumValidator::check_data_table(
|
||||
ObTableCompactionInfo table_compaction_info;
|
||||
if (OB_FAIL(table_compaction_map.get_refactored(table_id, table_compaction_info))) {
|
||||
LOG_WARN("fail to get refactored", KR(ret), K(table_id));
|
||||
} else if (table_compaction_info.finish_compaction()) {
|
||||
} else if (table_compaction_info.is_compacted()) {
|
||||
if (OB_FAIL(data_tables_to_update.push_back(table_id))) {
|
||||
LOG_WARN("fail to push back", KR(ret), K(table_id));
|
||||
}
|
||||
@ -1333,8 +1330,7 @@ int ObIndexChecksumValidator::handle_index_table(
|
||||
const ObTableCompactionInfo &index_compaction_info,
|
||||
const ObSimpleTableSchemaV2 *index_simple_schema,
|
||||
ObSchemaGetterGuard &schema_guard,
|
||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||
const int64_t expected_epoch)
|
||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -1361,13 +1357,13 @@ int ObIndexChecksumValidator::handle_index_table(
|
||||
LOG_INFO("index table is not verified while data table is already verified, skip"
|
||||
" verification for this index table", K(index_table_id), K(data_table_id),
|
||||
K(index_compaction_info), K(data_compaction_info));
|
||||
if (index_compaction_info.finish_compaction()) {
|
||||
if (index_compaction_info.is_compacted()) {
|
||||
if (OB_FAIL(handle_table_verification_finished(index_table_id, frozen_scn, table_compaction_map))) {
|
||||
LOG_WARN("fail to handle index table compaction finished", KR(ret), K(index_table_id), K(frozen_scn));
|
||||
}
|
||||
}
|
||||
} else if (index_simple_schema->has_tablet()) {
|
||||
if (!index_compaction_info.finish_compaction() || !data_compaction_info.finish_compaction()) {
|
||||
if (!index_compaction_info.is_compacted() || !data_compaction_info.is_compacted()) {
|
||||
} else if (index_compaction_info.is_compacted() && data_compaction_info.is_compacted()) {
|
||||
#ifdef ERRSIM
|
||||
ret = OB_E(EventTable::EN_MEDIUM_VERIFY_GROUP_SKIP_SET_VERIFY) OB_SUCCESS;
|
||||
@ -1386,7 +1382,7 @@ int ObIndexChecksumValidator::handle_index_table(
|
||||
if (need_validate()) {
|
||||
FREEZE_TIME_GUARD;
|
||||
if (FAILEDx(ObTabletReplicaChecksumOperator::check_column_checksum(tenant_id_,
|
||||
*data_simple_schema, *index_simple_schema, frozen_scn, *sql_proxy_, expected_epoch))) {
|
||||
*data_simple_schema, *index_simple_schema, frozen_scn, *sql_proxy_))) {
|
||||
if ((OB_ITEM_NOT_MATCH == ret) || (OB_TABLE_NOT_EXIST == ret)) {
|
||||
if (OB_TMP_FAIL(handle_table_can_not_verify(index_table_id, table_compaction_map))) {
|
||||
LOG_WARN("fail to handle table can not verify", KR(tmp_ret), K(index_table_id));
|
||||
@ -1409,19 +1405,9 @@ int ObIndexChecksumValidator::handle_index_table(
|
||||
LOG_WARN("fail to handle table verification finished", KR(ret), K(index_table_id), K(frozen_scn));
|
||||
}
|
||||
}
|
||||
} else if (index_compaction_info.can_skip_verifying() || data_compaction_info.can_skip_verifying()) {
|
||||
// if one of them can skip verifying, that means we don't need to execute index checksum verification.
|
||||
// Mark index table as INDEX_CKM_VERIFIED directly.
|
||||
if (OB_FAIL(handle_table_verification_finished(index_table_id, frozen_scn, table_compaction_map))) {
|
||||
LOG_WARN("fail to handle index table verification finished", KR(ret), K(index_table_id), K(frozen_scn));
|
||||
}
|
||||
}
|
||||
} else { // virtual index table has no tablet, no need to execute index checksum verification.
|
||||
if (index_compaction_info.finish_compaction() && data_compaction_info.finish_compaction()) {
|
||||
if (OB_FAIL(handle_table_verification_finished(index_table_id, frozen_scn, table_compaction_map))) {
|
||||
LOG_WARN("fail to handle index table verification finished", KR(ret), K(index_table_id), K(frozen_scn));
|
||||
}
|
||||
}
|
||||
// do nothing. should have been marked as VERIFIED by ObTabletChecksumValidator
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -187,15 +187,13 @@ private:
|
||||
const share::schema::ObSimpleTableSchemaV2 *simple_schema,
|
||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||
ObMergeTimeStatistics &merge_time_statistics);
|
||||
int check_need_validate(const bool is_primary_service,
|
||||
const share::SCN &frozen_scn,
|
||||
bool &need_validate) const;
|
||||
int check_cross_cluster_checksum(const share::schema::ObSimpleTableSchemaV2 &simple_schema,
|
||||
const share::SCN &frozen_scn);
|
||||
void sort_tablet_ids(ObArray<ObTabletID> &tablet_ids);
|
||||
int check_column_checksum(const ObArray<share::ObTabletReplicaChecksumItem> &tablet_replica_checksum_items,
|
||||
const ObArray<share::ObTabletChecksumItem> &tablet_checksum_items);
|
||||
bool is_first_tablet_in_sys_ls(const share::ObTabletReplicaChecksumItem &item) const;
|
||||
int check_if_all_tablet_checksum_exist(const share::SCN &frozen_scn);
|
||||
bool check_waiting_tablet_checksum_timeout() const;
|
||||
// handle the table, update its all tablets' status if needed. And update its compaction_info in @table_compaction_map
|
||||
int handle_table_verification_finished(const volatile bool &stop,
|
||||
@ -215,6 +213,7 @@ private:
|
||||
const static int64_t MAX_BATCH_INSERT_COUNT = 100;
|
||||
// record the time when starting to major merge, used for check_waiting_tablet_checksum_timeout
|
||||
int64_t major_merge_start_us_;
|
||||
bool is_all_tablet_checksum_exist_;
|
||||
};
|
||||
|
||||
// Mainly to verify checksum between (global and local) index table and main table
|
||||
@ -268,8 +267,7 @@ private:
|
||||
const share::ObTableCompactionInfo &index_compaction_info,
|
||||
const share::schema::ObSimpleTableSchemaV2 *index_simple_schema,
|
||||
share::schema::ObSchemaGetterGuard &schema_guard,
|
||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||
const int64_t expected_epoch);
|
||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map);
|
||||
// This function is specially designed to make it easier for troubleshooting. Moreover, this
|
||||
// function will not modify table_compaction_map, which ensures major compaction will not be
|
||||
// affected by this function.
|
||||
|
@ -429,8 +429,15 @@ int ObMajorMergeProgressChecker::check_tablet_compaction_scn(
|
||||
std::lower_bound(all_progress.begin(), all_progress.end(), ls_r->get_zone());
|
||||
if ((p != all_progress.end()) && (p->zone_ == ls_r->get_zone())) {
|
||||
SCN replica_snapshot_scn;
|
||||
SCN replica_report_scn;
|
||||
if (OB_FAIL(replica_snapshot_scn.convert_for_tx(r->get_snapshot_version()))) {
|
||||
LOG_WARN("fail to convert val to SCN", KR(ret), "snapshot_version", r->get_snapshot_version());
|
||||
} else if (replica_report_scn.convert_for_tx(r->get_report_scn())) {
|
||||
LOG_WARN("fail to convert val to SCN", KR(ret), "report_scn", r->get_report_scn());
|
||||
} else if (replica_report_scn > replica_snapshot_scn) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected report_scn and snapshot_scn", KR(ret), "report_scn",
|
||||
r->get_report_scn(), "snapshot_scn", r->get_snapshot_version());
|
||||
} else if ((REPLICA_TYPE_LOGONLY == ls_r->get_replica_type())
|
||||
|| (REPLICA_TYPE_ENCRYPTION_LOGONLY == ls_r->get_replica_type())) {
|
||||
// logonly replica no need check
|
||||
@ -440,17 +447,22 @@ int ObMajorMergeProgressChecker::check_tablet_compaction_scn(
|
||||
p->smallest_snapshot_scn_ = replica_snapshot_scn;
|
||||
}
|
||||
if (replica_snapshot_scn >= global_broadcast_scn) {
|
||||
if (replica_snapshot_scn > global_broadcast_scn) {
|
||||
tablet_need_verify = false; // this tablet doesn't need to execute checksum verification
|
||||
if (replica_snapshot_scn > global_broadcast_scn) { // launched another medium compaction
|
||||
tablet_need_verify = false; // this tablet does not need to execute checksum verification
|
||||
} else { // replica_snapshot_scn == global_broadcast_scn
|
||||
// check tablet replica status when replica_snapshot_scn = global_broadcast_scn,
|
||||
// so as to find out checksum error occurred before this round of major freeze.
|
||||
// not check tablet replica status when replica_snapshot_scn > global_broadcast_scn,
|
||||
// since the checksum error detected here may be caused by medium compaction after
|
||||
// this round of major freeze.
|
||||
if (ObTabletReplica::ScnStatus::SCN_STATUS_ERROR == r->get_status()) {
|
||||
ret = OB_CHECKSUM_ERROR;
|
||||
LOG_ERROR("ERROR! ERROR! ERROR! find error status tablet replica", KR(ret), K(tablet_info));
|
||||
if (replica_report_scn == global_broadcast_scn) { // finished verification on the old leader
|
||||
tablet_need_verify = false; // this tablet does not need to execute checksum verification
|
||||
} else { // replica_report_scn < global_broadcast_scn
|
||||
// check tablet replica status when replica_snapshot_scn = global_broadcast_scn
|
||||
// and replica_report_scn < global_broadcast_scn, so as to find out checksum error
|
||||
// occurred before this round of major freeze. do not check tablet replica status
|
||||
// when replica_snapshot_scn > global_broadcast_scn or replica_report_scn =
|
||||
// global_broadcast_scn, since the checksum error detected here may be caused by
|
||||
// medium compaction after this round of major freeze.
|
||||
if (ObTabletReplica::ScnStatus::SCN_STATUS_ERROR == r->get_status()) {
|
||||
ret = OB_CHECKSUM_ERROR;
|
||||
LOG_ERROR("ERROR! ERROR! ERROR! find error status tablet replica", KR(ret), K(tablet_info));
|
||||
}
|
||||
}
|
||||
}
|
||||
++(p->merged_tablet_cnt_);
|
||||
|
@ -1015,8 +1015,7 @@ int ObTabletReplicaChecksumOperator::check_column_checksum(
|
||||
const ObSimpleTableSchemaV2 &data_simple_schema,
|
||||
const ObSimpleTableSchemaV2 &index_simple_schema,
|
||||
const SCN &compaction_scn,
|
||||
ObMySQLProxy &sql_proxy,
|
||||
const int64_t expected_epoch)
|
||||
ObMySQLProxy &sql_proxy)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const uint64_t index_table_id = index_simple_schema.get_table_id();
|
||||
@ -1045,7 +1044,7 @@ int ObTabletReplicaChecksumOperator::check_column_checksum(
|
||||
const bool is_global_index = index_simple_schema.is_global_index_table();
|
||||
if (is_global_index) {
|
||||
if (OB_FAIL(check_global_index_column_checksum(tenant_id, *data_table_schema, *index_table_schema,
|
||||
compaction_scn, sql_proxy, expected_epoch))) {
|
||||
compaction_scn, sql_proxy))) {
|
||||
LOG_WARN("fail to check global index column checksum", KR(ret), K(tenant_id), K(compaction_scn));
|
||||
}
|
||||
} else if (OB_UNLIKELY(index_simple_schema.is_spatial_index())) {
|
||||
@ -1053,7 +1052,7 @@ int ObTabletReplicaChecksumOperator::check_column_checksum(
|
||||
// spatial index column is different from data table column
|
||||
} else {
|
||||
if (OB_FAIL(check_local_index_column_checksum(tenant_id, *data_table_schema, *index_table_schema,
|
||||
compaction_scn, sql_proxy, expected_epoch))) {
|
||||
compaction_scn, sql_proxy))) {
|
||||
LOG_WARN("fail to check local index column checksum", KR(ret), K(tenant_id), K(compaction_scn));
|
||||
}
|
||||
}
|
||||
@ -1066,8 +1065,7 @@ int ObTabletReplicaChecksumOperator::check_global_index_column_checksum(
|
||||
const ObTableSchema &data_table_schema,
|
||||
const ObTableSchema &index_table_schema,
|
||||
const SCN &compaction_scn,
|
||||
ObMySQLProxy &sql_proxy,
|
||||
const int64_t expected_epoch)
|
||||
ObMySQLProxy &sql_proxy)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -1122,16 +1120,6 @@ int ObTabletReplicaChecksumOperator::check_global_index_column_checksum(
|
||||
} else if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
|
||||
LOG_WARN("fail to get data table tablet checksum items", KR(ret), K(data_table_schema));
|
||||
}
|
||||
} else if (OB_FAIL(ObServiceEpochProxy::check_service_epoch(sql_proxy, tenant_id,
|
||||
ObServiceEpochProxy::FREEZE_SERVICE_EPOCH, expected_epoch, is_match))) {
|
||||
LOG_WARN("fail to check service epoch", KR(ret), K(tenant_id), K(compaction_scn), K(expected_epoch));
|
||||
} else if (!is_match) {
|
||||
// Do not compare column checksum in case of OB_FREEZE_SERVICE_EPOCH_MISMATCH, since
|
||||
// tablet replica checksum items may be incomplete now.
|
||||
//
|
||||
ret = OB_FREEZE_SERVICE_EPOCH_MISMATCH;
|
||||
LOG_WARN("no need to compare column checksum, cuz freeze_service_epoch mismatch",
|
||||
KR(ret), K(tenant_id), K(compaction_scn), K(expected_epoch));
|
||||
} else if (need_verify_checksum_(compaction_scn, index_table_schema, index_table_ckm_items,
|
||||
need_verify, index_ckm_tablet_cnt)) {
|
||||
LOG_WARN("fail to check need verfy checksum", KR(ret), K(compaction_scn), K(index_table_id), K(data_table_id));
|
||||
@ -1178,8 +1166,7 @@ int ObTabletReplicaChecksumOperator::check_local_index_column_checksum(
|
||||
const ObTableSchema &data_table_schema,
|
||||
const ObTableSchema &index_table_schema,
|
||||
const SCN &compaction_scn,
|
||||
ObMySQLProxy &sql_proxy,
|
||||
const int64_t expected_epoch)
|
||||
ObMySQLProxy &sql_proxy)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -1208,15 +1195,6 @@ int ObTabletReplicaChecksumOperator::check_local_index_column_checksum(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tablet count of local index table is not same with data table", KR(ret), "data_table_tablet_cnt",
|
||||
data_table_tablets.count(), "index_table_tablet_cnt", index_table_tablets.count());
|
||||
} else if (OB_FAIL(ObServiceEpochProxy::check_service_epoch(sql_proxy, tenant_id,
|
||||
ObServiceEpochProxy::FREEZE_SERVICE_EPOCH, expected_epoch, is_match))) {
|
||||
LOG_WARN("fail to check service epoch", KR(ret), K(tenant_id), K(compaction_scn), K(expected_epoch));
|
||||
} else if (!is_match) {
|
||||
// Do not compare column checksum in case of OB_FREEZE_SERVICE_EPOCH_MISMATCH, since
|
||||
// tablet replica checksum items may be incomplete now.
|
||||
ret = OB_FREEZE_SERVICE_EPOCH_MISMATCH;
|
||||
LOG_WARN("no need to compare column checksum, cuz freeze_service_epoch mismatch",
|
||||
KR(ret), K(tenant_id), K(compaction_scn), K(expected_epoch));
|
||||
} else if (need_verify_checksum_(compaction_scn, index_table_schema, index_table_ckm_items,
|
||||
need_verify, index_ckm_tablet_cnt)) {
|
||||
LOG_WARN("fail to check need verfy checksum", KR(ret), K(compaction_scn), K(index_table_id), K(data_table_id));
|
||||
|
@ -177,8 +177,7 @@ public:
|
||||
const schema::ObSimpleTableSchemaV2 &data_simple_schema,
|
||||
const schema::ObSimpleTableSchemaV2 &index_simple_schema,
|
||||
const SCN &compaction_scn,
|
||||
common::ObMySQLProxy &sql_proxy,
|
||||
const int64_t expected_epoch);
|
||||
common::ObMySQLProxy &sql_proxy);
|
||||
|
||||
static int set_column_meta_with_hex_str(
|
||||
const ObString &hex_str,
|
||||
@ -266,16 +265,14 @@ private:
|
||||
const schema::ObTableSchema &data_table_schema,
|
||||
const schema::ObTableSchema &index_table_schema,
|
||||
const SCN &compaction_scn,
|
||||
common::ObMySQLProxy &sql_proxy,
|
||||
const int64_t expected_epoch);
|
||||
common::ObMySQLProxy &sql_proxy);
|
||||
|
||||
static int check_local_index_column_checksum(
|
||||
const uint64_t tenant_id,
|
||||
const schema::ObTableSchema &data_table_schema,
|
||||
const schema::ObTableSchema &index_table_schema,
|
||||
const SCN &compaction_scn,
|
||||
common::ObMySQLProxy &sql_proxy,
|
||||
const int64_t expected_epoch);
|
||||
common::ObMySQLProxy &sql_proxy);
|
||||
|
||||
// get column checksum_sum from items and store result in map
|
||||
// KV of @column_ckm_sum_map is: <column_id, column_checksum_sum>
|
||||
|
@ -190,8 +190,14 @@ public:
|
||||
enum ObTabletCompactionStatus
|
||||
{
|
||||
INITIAL = 0,
|
||||
COMPACTED, // tablet finished compaction
|
||||
CAN_SKIP_VERIFYING, // tablet finished compaction and not need to verify
|
||||
// tablet finished compaction
|
||||
COMPACTED,
|
||||
// tablet finished compaction and no need to verify checksum
|
||||
// 1. compaction_scn of this tablet > frozen_scn of this round major compaction. i.e., already
|
||||
// launched another medium compaction for this tablet.
|
||||
// 2. report_scn of this tablet > frozen_scn of this round major compaction. i.e., already
|
||||
// finished verification on the old leader.
|
||||
CAN_SKIP_VERIFYING,
|
||||
STATUS_MAX
|
||||
};
|
||||
|
||||
@ -202,11 +208,6 @@ public:
|
||||
INITIAL = 0,
|
||||
// already finished compaction and verified tablet checksum
|
||||
COMPACTED,
|
||||
// already finished compaction and can skip verification due to the following two reasons:
|
||||
// 1. this table has no tablet.
|
||||
// 2. this table has tablets, but compaction_scn of tablets > frozen_scn of this round major compaction.
|
||||
// i.e., already launched another medium compaction for this table.
|
||||
CAN_SKIP_VERIFYING,
|
||||
// already verified index checksum
|
||||
INDEX_CKM_VERIFIED,
|
||||
// already verified all kinds of checksum (i.e., tablet checksum, index checksum, cross-cluster checksum)
|
||||
@ -231,13 +232,10 @@ public:
|
||||
bool is_uncompacted() const { return Status::INITIAL == status_; }
|
||||
void set_compacted() { status_ = Status::COMPACTED; }
|
||||
bool is_compacted() const { return Status::COMPACTED == status_; }
|
||||
void set_can_skip_verifying() { status_ = Status::CAN_SKIP_VERIFYING; }
|
||||
bool can_skip_verifying() const { return Status::CAN_SKIP_VERIFYING == status_; }
|
||||
void set_index_ckm_verified() { status_ = Status::INDEX_CKM_VERIFIED; }
|
||||
bool is_index_ckm_verified() const { return Status::INDEX_CKM_VERIFIED == status_; }
|
||||
void set_verified() { status_ = Status::VERIFIED; }
|
||||
bool is_verified() const { return Status::VERIFIED == status_; }
|
||||
bool finish_compaction() const { return (is_compacted() || can_skip_verifying()); }
|
||||
|
||||
TO_STRING_KV(K_(table_id), K_(tablet_cnt), K_(status));
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user