fix index_checksum_validator about added index
This commit is contained in:
@ -104,6 +104,7 @@ int ObChecksumValidatorBase::validate_checksum(
|
|||||||
const hash::ObHashMap<ObTabletLSPair, ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<ObTabletLSPair, ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
int64_t &table_count,
|
int64_t &table_count,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const ObIArray<uint64_t> &ori_table_ids,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
const int64_t expected_epoch)
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
@ -118,7 +119,7 @@ int ObChecksumValidatorBase::validate_checksum(
|
|||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K_(tenant_id), K(frozen_scn));
|
LOG_WARN("invalid argument", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||||
} else if (OB_FAIL(check_all_table_verification_finished(stop, frozen_scn, tablet_compaction_map,
|
} else if (OB_FAIL(check_all_table_verification_finished(stop, frozen_scn, tablet_compaction_map,
|
||||||
table_count, table_compaction_map, merge_time_statistics, expected_epoch))) {
|
table_count, table_compaction_map, ori_table_ids, merge_time_statistics, expected_epoch))) {
|
||||||
LOG_WARN("fail to check all table verification finished", KR(ret), K_(tenant_id), K(frozen_scn));
|
LOG_WARN("fail to check all table verification finished", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -221,11 +222,13 @@ int ObTabletChecksumValidator::check_all_table_verification_finished(
|
|||||||
const hash::ObHashMap<ObTabletLSPair, ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<ObTabletLSPair, ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
int64_t &table_count,
|
int64_t &table_count,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const ObIArray<uint64_t> &ori_table_ids,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
const int64_t expected_epoch)
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int check_ret = OB_SUCCESS;
|
int check_ret = OB_SUCCESS;
|
||||||
|
UNUSED(ori_table_ids);
|
||||||
UNUSED(expected_epoch);
|
UNUSED(expected_epoch);
|
||||||
|
|
||||||
const int64_t start_time_us = ObTimeUtil::current_time();
|
const int64_t start_time_us = ObTimeUtil::current_time();
|
||||||
@ -498,11 +501,13 @@ int ObCrossClusterTabletChecksumValidator::check_all_table_verification_finished
|
|||||||
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
int64_t &table_count,
|
int64_t &table_count,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const ObIArray<uint64_t> &ori_table_ids,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
const int64_t expected_epoch)
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int check_ret = OB_SUCCESS;
|
int check_ret = OB_SUCCESS;
|
||||||
|
UNUSED(ori_table_ids);
|
||||||
|
|
||||||
const int64_t start_time_us = ObTimeUtil::current_time();
|
const int64_t start_time_us = ObTimeUtil::current_time();
|
||||||
if (OB_UNLIKELY(!frozen_scn.is_valid() || tablet_compaction_map.empty())) {
|
if (OB_UNLIKELY(!frozen_scn.is_valid() || tablet_compaction_map.empty())) {
|
||||||
@ -1092,6 +1097,7 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
|
|||||||
const hash::ObHashMap<ObTabletLSPair, ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<ObTabletLSPair, ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
int64_t &table_count,
|
int64_t &table_count,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const ObIArray<uint64_t> &ori_table_ids,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
const int64_t expected_epoch)
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
@ -1181,7 +1187,8 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
|
|||||||
if (OB_SUCC(ret) && (OB_SUCCESS == check_ret)) {
|
if (OB_SUCC(ret) && (OB_SUCCESS == check_ret)) {
|
||||||
// for data table with index, if all its index tables finished verification,
|
// for data table with index, if all its index tables finished verification,
|
||||||
// then mark it as INDEX_CKM_VERIFIED.
|
// then mark it as INDEX_CKM_VERIFIED.
|
||||||
if (OB_FAIL(handle_data_table_with_index(stop, frozen_scn, table_ids, table_schemas, table_compaction_map))) {
|
if (OB_FAIL(handle_data_table_with_index(stop, frozen_scn, table_ids, table_schemas,
|
||||||
|
ori_table_ids, table_compaction_map))) {
|
||||||
LOG_WARN("fail to handle data table with index", KR(ret), K_(tenant_id), K(stop), K(frozen_scn));
|
LOG_WARN("fail to handle data table with index", KR(ret), K_(tenant_id), K(stop), K(frozen_scn));
|
||||||
} else if (OB_FAIL(remove_not_exist_table(table_ids, table_compaction_map))) {
|
} else if (OB_FAIL(remove_not_exist_table(table_ids, table_compaction_map))) {
|
||||||
LOG_WARN("fail to remove not exist table", KR(ret), K_(tenant_id), K(frozen_scn));
|
LOG_WARN("fail to remove not exist table", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||||
@ -1251,12 +1258,14 @@ int ObIndexChecksumValidator::handle_data_table_with_index(
|
|||||||
const SCN &frozen_scn,
|
const SCN &frozen_scn,
|
||||||
const ObIArray<uint64_t> &table_ids,
|
const ObIArray<uint64_t> &table_ids,
|
||||||
const ObIArray<const ObSimpleTableSchemaV2 *> &table_schemas,
|
const ObIArray<const ObSimpleTableSchemaV2 *> &table_schemas,
|
||||||
|
const ObIArray<uint64_t> &ori_table_ids,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map)
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
SMART_VAR(ObArray<uint64_t>, data_tables_to_update) {
|
SMART_VAR(ObArray<uint64_t>, data_tables_to_update) {
|
||||||
// check data tables with index, return those need to be marked as INDEX_CKM_VERIFIED
|
// check data tables with index, return those need to be marked as INDEX_CKM_VERIFIED
|
||||||
if (OB_FAIL(check_data_table_with_index(table_schemas, table_compaction_map, data_tables_to_update))) {
|
if (OB_FAIL(check_data_table_with_index(table_schemas, table_compaction_map, ori_table_ids,
|
||||||
|
data_tables_to_update))) {
|
||||||
LOG_WARN("fail to check data table with index", KR(ret), K_(tenant_id), K(frozen_scn));
|
LOG_WARN("fail to check data table with index", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||||
}
|
}
|
||||||
// mark data tables whose all index tables finished verification as INDEX_CKM_VERIFIED
|
// mark data tables whose all index tables finished verification as INDEX_CKM_VERIFIED
|
||||||
@ -1280,6 +1289,7 @@ int ObIndexChecksumValidator::handle_data_table_with_index(
|
|||||||
int ObIndexChecksumValidator::check_data_table_with_index(
|
int ObIndexChecksumValidator::check_data_table_with_index(
|
||||||
const ObIArray<const ObSimpleTableSchemaV2 *> &table_schemas,
|
const ObIArray<const ObSimpleTableSchemaV2 *> &table_schemas,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const ObIArray<uint64_t> &ori_table_ids,
|
||||||
ObIArray<uint64_t> &data_tables_to_update)
|
ObIArray<uint64_t> &data_tables_to_update)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -1307,9 +1317,9 @@ int ObIndexChecksumValidator::check_data_table_with_index(
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
||||||
} else if (simple_schema->is_index_table()) {
|
} else if (simple_schema->is_index_table()) {
|
||||||
if (simple_schema->can_read_index()) {
|
const uint64_t index_table_id = simple_schema->get_table_id();
|
||||||
const uint64_t index_table_id = simple_schema->get_table_id();
|
const uint64_t data_table_id = simple_schema->get_data_table_id();
|
||||||
const uint64_t data_table_id = simple_schema->get_data_table_id();
|
if (simple_schema->can_read_index() && has_exist_in_array(ori_table_ids, index_table_id)) {
|
||||||
ObTableCompactionInfo index_table_compaction_info;
|
ObTableCompactionInfo index_table_compaction_info;
|
||||||
if (OB_FAIL(table_compaction_map.get_refactored(index_table_id, index_table_compaction_info))) {
|
if (OB_FAIL(table_compaction_map.get_refactored(index_table_id, index_table_compaction_info))) {
|
||||||
LOG_WARN("fail to get refactored", KR(ret), K(index_table_id));
|
LOG_WARN("fail to get refactored", KR(ret), K(index_table_id));
|
||||||
@ -1322,8 +1332,8 @@ int ObIndexChecksumValidator::check_data_table_with_index(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { // !simple_schema->can_read_index()
|
} else { // !simple_schema->can_read_index() || !has_exist_in_array(ori_table_ids, index_table_id)
|
||||||
// ignore index table can not read
|
// ignore index table that can not read or does not exist in ori_table_ids
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -70,6 +70,7 @@ public:
|
|||||||
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
int64_t &table_count,
|
int64_t &table_count,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const common::ObIArray<uint64_t> &ori_table_ids,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
const int64_t expected_epoch);
|
const int64_t expected_epoch);
|
||||||
|
|
||||||
@ -98,6 +99,7 @@ private:
|
|||||||
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
int64_t &table_count,
|
int64_t &table_count,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const common::ObIArray<uint64_t> &ori_table_ids,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
const int64_t expected_epoch) = 0;
|
const int64_t expected_epoch) = 0;
|
||||||
|
|
||||||
@ -126,6 +128,7 @@ private:
|
|||||||
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
int64_t &table_count,
|
int64_t &table_count,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const common::ObIArray<uint64_t> &ori_table_ids,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
const int64_t expected_epoch) override;
|
const int64_t expected_epoch) override;
|
||||||
// check whether all tablets of this table finished compaction or not,
|
// check whether all tablets of this table finished compaction or not,
|
||||||
@ -173,6 +176,7 @@ private:
|
|||||||
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
int64_t &table_count,
|
int64_t &table_count,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const common::ObIArray<uint64_t> &ori_table_ids,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
const int64_t expected_epoch) override;
|
const int64_t expected_epoch) override;
|
||||||
int validate_cross_cluster_checksum(const volatile bool &stop,
|
int validate_cross_cluster_checksum(const volatile bool &stop,
|
||||||
@ -227,6 +231,7 @@ private:
|
|||||||
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
int64_t &table_count,
|
int64_t &table_count,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const common::ObIArray<uint64_t> &ori_table_ids,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
const int64_t expected_epoch) override;
|
const int64_t expected_epoch) override;
|
||||||
void check_need_validate(const bool is_primary_service, bool &need_validate) const;
|
void check_need_validate(const bool is_primary_service, bool &need_validate) const;
|
||||||
@ -244,11 +249,13 @@ private:
|
|||||||
int handle_data_table_with_index(const volatile bool &stop,
|
int handle_data_table_with_index(const volatile bool &stop,
|
||||||
const share::SCN &frozen_scn,
|
const share::SCN &frozen_scn,
|
||||||
const common::ObIArray<uint64_t> &table_ids,
|
const common::ObIArray<uint64_t> &table_ids,
|
||||||
const ObIArray<const share::schema::ObSimpleTableSchemaV2 *> &table_schemas,
|
const common::ObIArray<const share::schema::ObSimpleTableSchemaV2 *> &table_schemas,
|
||||||
|
const common::ObIArray<uint64_t> &ori_table_ids,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map);
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map);
|
||||||
// check data tables with index, return those need to be marked as INDEX_CKM_VERIFIED
|
// check data tables with index, return those need to be marked as INDEX_CKM_VERIFIED
|
||||||
int check_data_table_with_index(const common::ObIArray<const share::schema::ObSimpleTableSchemaV2 *> &table_schemas,
|
int check_data_table_with_index(const common::ObIArray<const share::schema::ObSimpleTableSchemaV2 *> &table_schemas,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
|
const common::ObIArray<uint64_t> &ori_table_ids,
|
||||||
common::ObIArray<uint64_t> &data_tables_to_update);
|
common::ObIArray<uint64_t> &data_tables_to_update);
|
||||||
// handle index tables. validate column checksum if needed, and mark index tables as INDEX_CKM_VERIFIED
|
// handle index tables. validate column checksum if needed, and mark index tables as INDEX_CKM_VERIFIED
|
||||||
int handle_index_table(const share::SCN &frozen_scn,
|
int handle_index_table(const share::SCN &frozen_scn,
|
||||||
|
@ -503,14 +503,14 @@ int ObMajorMergeProgressChecker::check_verification(
|
|||||||
} else if (OB_FAIL(cross_cluster_validator_.check_and_set_validate(is_primary_service, global_broadcast_scn))) {
|
} else if (OB_FAIL(cross_cluster_validator_.check_and_set_validate(is_primary_service, global_broadcast_scn))) {
|
||||||
LOG_WARN("fail to check and set validate for cross_cluster_validator", KR(ret), K(global_broadcast_scn));
|
LOG_WARN("fail to check and set validate for cross_cluster_validator", KR(ret), K(global_broadcast_scn));
|
||||||
} else if (OB_FAIL(tablet_validator_.validate_checksum(stop, global_broadcast_scn, tablet_compaction_map_,
|
} else if (OB_FAIL(tablet_validator_.validate_checksum(stop, global_broadcast_scn, tablet_compaction_map_,
|
||||||
table_count_, table_compaction_map_, merge_time_statistics_, expected_epoch))) {
|
table_count_, table_compaction_map_, table_ids_, merge_time_statistics_, expected_epoch))) {
|
||||||
LOG_WARN("fail to validate checksum of tablet validator", KR(ret), K(global_broadcast_scn));
|
LOG_WARN("fail to validate checksum of tablet validator", KR(ret), K(global_broadcast_scn));
|
||||||
} else if (!tablet_compaction_map_.empty()) {
|
} else if (!tablet_compaction_map_.empty()) {
|
||||||
if (OB_FAIL(index_validator_.validate_checksum(stop, global_broadcast_scn,
|
if (OB_FAIL(index_validator_.validate_checksum(stop, global_broadcast_scn,
|
||||||
tablet_compaction_map_, table_count_, table_compaction_map_, merge_time_statistics_, expected_epoch))) {
|
tablet_compaction_map_, table_count_, table_compaction_map_, table_ids_, merge_time_statistics_, expected_epoch))) {
|
||||||
LOG_WARN("fail to validate checksum of index validator", KR(ret), K(global_broadcast_scn));
|
LOG_WARN("fail to validate checksum of index validator", KR(ret), K(global_broadcast_scn));
|
||||||
} else if (OB_FAIL(cross_cluster_validator_.validate_checksum(stop, global_broadcast_scn,
|
} else if (OB_FAIL(cross_cluster_validator_.validate_checksum(stop, global_broadcast_scn,
|
||||||
tablet_compaction_map_, table_count_, table_compaction_map_, merge_time_statistics_, expected_epoch))) {
|
tablet_compaction_map_, table_count_, table_compaction_map_, table_ids_, merge_time_statistics_, expected_epoch))) {
|
||||||
LOG_WARN("fail to validate checksum of cross cluster validator", KR(ret), K(global_broadcast_scn));
|
LOG_WARN("fail to validate checksum of cross cluster validator", KR(ret), K(global_broadcast_scn));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
Reference in New Issue
Block a user