fix traverse full_schema from one schema_guard
This commit is contained in:
@ -139,7 +139,7 @@ bool ObChecksumValidatorBase::exist_in_table_array(
|
|||||||
}
|
}
|
||||||
|
|
||||||
int ObChecksumValidatorBase::get_table_compaction_info(
|
int ObChecksumValidatorBase::get_table_compaction_info(
|
||||||
const ObTableSchema &table_schema,
|
const ObSimpleTableSchemaV2 &simple_schema,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||||
ObTableCompactionInfo &table_compaction_info)
|
ObTableCompactionInfo &table_compaction_info)
|
||||||
{
|
{
|
||||||
@ -148,10 +148,10 @@ int ObChecksumValidatorBase::get_table_compaction_info(
|
|||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("not init", KR(ret), K_(tenant_id));
|
LOG_WARN("not init", KR(ret), K_(tenant_id));
|
||||||
} else if (!table_schema.is_valid()) {
|
} else if (!simple_schema.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(table_schema));
|
LOG_WARN("invalid argument", KR(ret), K(simple_schema));
|
||||||
} else if (FALSE_IT(table_id = table_schema.get_table_id())) {
|
} else if (FALSE_IT(table_id = simple_schema.get_table_id())) {
|
||||||
} else if (OB_FAIL(table_compaction_map.get_refactored(table_id, table_compaction_info))) {
|
} else if (OB_FAIL(table_compaction_map.get_refactored(table_id, table_compaction_info))) {
|
||||||
if (OB_HASH_NOT_EXIST == ret) { // first initialization
|
if (OB_HASH_NOT_EXIST == ret) { // first initialization
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
@ -256,18 +256,12 @@ int ObTabletChecksumValidator::check_all_table_verification_finished(
|
|||||||
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
||||||
} else {
|
} else {
|
||||||
const uint64_t table_id = simple_schema->get_table_id();
|
const uint64_t table_id = simple_schema->get_table_id();
|
||||||
const ObTableSchema *table_schema = nullptr;
|
|
||||||
if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, table_id, table_schema))) {
|
|
||||||
LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K(table_id));
|
|
||||||
} else if (OB_ISNULL(table_schema)) {
|
|
||||||
LOG_WARN("table_schema is null", K_(tenant_id), K(table_id), KPC(simple_schema));
|
|
||||||
}
|
|
||||||
// check whether all tablets of this table finished compaction or not, and
|
// check whether all tablets of this table finished compaction or not, and
|
||||||
// execute tablet replica checksum verification if this table has tablet.
|
// execute tablet replica checksum verification if this table has tablet.
|
||||||
else if (OB_FAIL(check_table_compaction_and_validate_checksum(*table_schema, frozen_scn,
|
if (OB_FAIL(check_table_compaction_and_validate_checksum(*simple_schema, frozen_scn,
|
||||||
tablet_compaction_map, table_compaction_map))) {
|
tablet_compaction_map, table_compaction_map))) {
|
||||||
LOG_WARN("fail to check table compaction finished", KR(ret), K(frozen_scn),
|
LOG_WARN("fail to check table compaction finished", KR(ret), K(frozen_scn),
|
||||||
KPC(table_schema));
|
KPC(simple_schema));
|
||||||
}
|
}
|
||||||
if (OB_CHECKSUM_ERROR == ret) {
|
if (OB_CHECKSUM_ERROR == ret) {
|
||||||
check_ret = ret;
|
check_ret = ret;
|
||||||
@ -292,7 +286,7 @@ int ObTabletChecksumValidator::check_all_table_verification_finished(
|
|||||||
// note that, when one table finished compaction, we need to execute tablet_replica
|
// note that, when one table finished compaction, we need to execute tablet_replica
|
||||||
// checksum verification if this table has tablet.
|
// checksum verification if this table has tablet.
|
||||||
int ObTabletChecksumValidator::check_table_compaction_and_validate_checksum(
|
int ObTabletChecksumValidator::check_table_compaction_and_validate_checksum(
|
||||||
const ObTableSchema &table_schema,
|
const ObSimpleTableSchemaV2 &simple_schema,
|
||||||
const SCN &frozen_scn,
|
const SCN &frozen_scn,
|
||||||
const hash::ObHashMap<ObTabletLSPair, ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<ObTabletLSPair, ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map)
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map)
|
||||||
@ -301,19 +295,19 @@ int ObTabletChecksumValidator::check_table_compaction_and_validate_checksum(
|
|||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
uint64_t table_id = UINT64_MAX;
|
uint64_t table_id = UINT64_MAX;
|
||||||
ObTableCompactionInfo latest_compaction_info;
|
ObTableCompactionInfo latest_compaction_info;
|
||||||
if (OB_FAIL(get_table_compaction_info(table_schema, table_compaction_map, latest_compaction_info))) {
|
if (OB_FAIL(get_table_compaction_info(simple_schema, table_compaction_map, latest_compaction_info))) {
|
||||||
LOG_WARN("fail to get table compaction info", KR(ret), K(table_schema));
|
LOG_WARN("fail to get table compaction info", KR(ret), K(simple_schema));
|
||||||
} else if (FALSE_IT(table_id = latest_compaction_info.table_id_)) {
|
} else if (FALSE_IT(table_id = latest_compaction_info.table_id_)) {
|
||||||
} else if (latest_compaction_info.is_uncompacted()) {
|
} else if (latest_compaction_info.is_uncompacted()) {
|
||||||
SMART_VAR(ObArray<ObTabletID>, tablet_ids) {
|
SMART_VAR(ObArray<ObTabletID>, tablet_ids) {
|
||||||
SMART_VAR(ObArray<ObTabletLSPair>, pairs) {
|
SMART_VAR(ObArray<ObTabletLSPair>, pairs) {
|
||||||
if (table_schema.has_tablet()) {
|
if (simple_schema.has_tablet()) {
|
||||||
FREEZE_TIME_GUARD;
|
FREEZE_TIME_GUARD;
|
||||||
if (OB_FAIL(table_schema.get_tablet_ids(tablet_ids))) {
|
if (OB_FAIL(simple_schema.get_tablet_ids(tablet_ids))) {
|
||||||
LOG_WARN("fail to get tablet_ids from table schema", KR(ret), K(table_schema));
|
LOG_WARN("fail to get tablet_ids from table schema", KR(ret), K(simple_schema));
|
||||||
} else if (OB_UNLIKELY(tablet_ids.count() < 1)) {
|
} else if (OB_UNLIKELY(tablet_ids.count() < 1)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("fail to get tablet_ids of current table schema", KR(ret), K_(tenant_id), K(table_schema));
|
LOG_WARN("fail to get tablet_ids of current table schema", KR(ret), K_(tenant_id), K(simple_schema));
|
||||||
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_,
|
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_,
|
||||||
table_id, *sql_proxy_, tablet_ids, pairs))) {
|
table_id, *sql_proxy_, tablet_ids, pairs))) {
|
||||||
if (OB_LIKELY(OB_ITEM_NOT_MATCH == ret)) {
|
if (OB_LIKELY(OB_ITEM_NOT_MATCH == ret)) {
|
||||||
@ -536,39 +530,34 @@ int ObCrossClusterTabletChecksumValidator::check_all_table_verification_finished
|
|||||||
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
||||||
} else {
|
} else {
|
||||||
const uint64_t table_id = simple_schema->get_table_id();
|
const uint64_t table_id = simple_schema->get_table_id();
|
||||||
const ObTableSchema *table_schema = nullptr;
|
|
||||||
ObTableCompactionInfo cur_compaction_info;
|
ObTableCompactionInfo cur_compaction_info;
|
||||||
if (OB_FAIL(table_ids.push_back(table_id))) {
|
if (OB_FAIL(table_ids.push_back(table_id))) {
|
||||||
LOG_WARN("fail to push back", KR(ret), K(table_id));
|
LOG_WARN("fail to push back", KR(ret), K(table_id));
|
||||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, table_id, table_schema))) {
|
} else if (OB_FAIL(get_table_compaction_info(*simple_schema, table_compaction_map, cur_compaction_info))) {
|
||||||
LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K(table_id));
|
LOG_WARN("fail to get table compaction info", KR(ret), K(frozen_scn), KPC(simple_schema));
|
||||||
} else if (OB_ISNULL(table_schema)) {
|
|
||||||
LOG_WARN("table_schema is null", K_(tenant_id), K(table_id), KPC(simple_schema));
|
|
||||||
} else if (OB_FAIL(get_table_compaction_info(*table_schema, table_compaction_map, cur_compaction_info))) {
|
|
||||||
LOG_WARN("fail to get table compaction info", KR(ret), K(frozen_scn), KPC(table_schema));
|
|
||||||
} else if (cur_compaction_info.is_verified()) { // already finished verification, skip it!
|
} else if (cur_compaction_info.is_verified()) { // already finished verification, skip it!
|
||||||
} else if (table_schema->has_tablet()) {
|
} else if (simple_schema->has_tablet()) {
|
||||||
if (cur_compaction_info.is_index_ckm_verified()) {
|
if (cur_compaction_info.is_index_ckm_verified()) {
|
||||||
if (need_validate()) { // need to validate cross-cluster checksum
|
if (need_validate()) { // need to validate cross-cluster checksum
|
||||||
if (OB_FAIL(validate_cross_cluster_checksum(stop, frozen_scn, expected_epoch,
|
if (OB_FAIL(validate_cross_cluster_checksum(stop, frozen_scn, expected_epoch,
|
||||||
table_schema, table_compaction_map, merge_time_statistics))) {
|
simple_schema, table_compaction_map, merge_time_statistics))) {
|
||||||
LOG_WARN("fail to validate cross-cluster checksum", KR(ret), K(stop),
|
LOG_WARN("fail to validate cross-cluster checksum", KR(ret), K(stop),
|
||||||
K(frozen_scn), K(expected_epoch), K(table_id));
|
K(frozen_scn), K(expected_epoch), K(table_id));
|
||||||
}
|
}
|
||||||
} else { // no need to validate cross-cluster checksum
|
} else { // no need to validate cross-cluster checksum
|
||||||
if (OB_FAIL(handle_table_verification_finished(stop, table_schema, frozen_scn,
|
if (OB_FAIL(handle_table_verification_finished(stop, simple_schema, frozen_scn,
|
||||||
table_compaction_map, merge_time_statistics, expected_epoch))) {
|
table_compaction_map, merge_time_statistics, expected_epoch))) {
|
||||||
LOG_WARN("fail to handle table verification finished", KR(ret), K_(tenant_id),
|
LOG_WARN("fail to handle table verification finished", KR(ret), K_(tenant_id),
|
||||||
K(frozen_scn), KPC(table_schema));
|
K(frozen_scn), KPC(simple_schema));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { // like VIEW that has no tablet, update report_scn for this table and mark it as VERIFIED
|
} else { // like VIEW that has no tablet, update report_scn for this table and mark it as VERIFIED
|
||||||
if (cur_compaction_info.is_index_ckm_verified()) {
|
if (cur_compaction_info.is_index_ckm_verified()) {
|
||||||
if (OB_FAIL(handle_table_verification_finished(stop, table_schema, frozen_scn,
|
if (OB_FAIL(handle_table_verification_finished(stop, simple_schema, frozen_scn,
|
||||||
table_compaction_map, merge_time_statistics, expected_epoch))) {
|
table_compaction_map, merge_time_statistics, expected_epoch))) {
|
||||||
LOG_WARN("fail to handle table verification finished", KR(ret), K_(tenant_id),
|
LOG_WARN("fail to handle table verification finished", KR(ret), K_(tenant_id),
|
||||||
K(frozen_scn), KPC(table_schema));
|
K(frozen_scn), KPC(simple_schema));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -605,15 +594,15 @@ int ObCrossClusterTabletChecksumValidator::validate_cross_cluster_checksum(
|
|||||||
const volatile bool &stop,
|
const volatile bool &stop,
|
||||||
const SCN &frozen_scn,
|
const SCN &frozen_scn,
|
||||||
const int64_t expected_epoch,
|
const int64_t expected_epoch,
|
||||||
const ObTableSchema *table_schema,
|
const ObSimpleTableSchemaV2 *simple_schema,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||||
ObMergeTimeStatistics &merge_time_statistics)
|
ObMergeTimeStatistics &merge_time_statistics)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
if (OB_UNLIKELY(!frozen_scn.is_valid() || expected_epoch < 0 || OB_ISNULL(table_schema))) {
|
if (OB_UNLIKELY(!frozen_scn.is_valid() || expected_epoch < 0 || OB_ISNULL(simple_schema))) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(frozen_scn), K(expected_epoch), KP(table_schema));
|
LOG_WARN("invalid argument", KR(ret), K(frozen_scn), K(expected_epoch), KP(simple_schema));
|
||||||
} else {
|
} else {
|
||||||
// check whether waiting all tablet checksum has timed out
|
// check whether waiting all tablet checksum has timed out
|
||||||
bool is_wait_tablet_checksum_timeout = check_waiting_tablet_checksum_timeout();
|
bool is_wait_tablet_checksum_timeout = check_waiting_tablet_checksum_timeout();
|
||||||
@ -630,28 +619,28 @@ int ObCrossClusterTabletChecksumValidator::validate_cross_cluster_checksum(
|
|||||||
tenant_id_, frozen_scn, is_exist))) {
|
tenant_id_, frozen_scn, is_exist))) {
|
||||||
LOG_WARN("fail to check is first tablet in first ls exist", KR(ret), K_(tenant_id), K(frozen_scn));
|
LOG_WARN("fail to check is first tablet in first ls exist", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||||
} else if (is_exist || is_wait_tablet_checksum_timeout) { // all tablet checksum exist or timeout
|
} else if (is_exist || is_wait_tablet_checksum_timeout) { // all tablet checksum exist or timeout
|
||||||
if (OB_FAIL(check_cross_cluster_checksum(*table_schema, frozen_scn))) {
|
if (OB_FAIL(check_cross_cluster_checksum(*simple_schema, frozen_scn))) {
|
||||||
if (OB_ITEM_NOT_MATCH == ret) {
|
if (OB_ITEM_NOT_MATCH == ret) {
|
||||||
if (OB_TMP_FAIL(handle_table_can_not_verify(table_schema->get_table_id(), table_compaction_map))) {
|
if (OB_TMP_FAIL(handle_table_can_not_verify(simple_schema->get_table_id(), table_compaction_map))) {
|
||||||
LOG_WARN("fail to handle table can not verify", KR(ret), "table_id", table_schema->get_table_id());
|
LOG_WARN("fail to handle table can not verify", KR(ret), "table_id", simple_schema->get_table_id());
|
||||||
} else {
|
} else {
|
||||||
ret = OB_SUCCESS; // ignore ret
|
ret = OB_SUCCESS; // ignore ret
|
||||||
}
|
}
|
||||||
} else if (OB_CHECKSUM_ERROR == ret) {
|
} else if (OB_CHECKSUM_ERROR == ret) {
|
||||||
LOG_DBA_ERROR(OB_CHECKSUM_ERROR, "msg", "ERROR! ERROR! ERROR! checksum error in "
|
LOG_DBA_ERROR(OB_CHECKSUM_ERROR, "msg", "ERROR! ERROR! ERROR! checksum error in "
|
||||||
"cross-cluster checksum", KR(ret), K_(tenant_id), K(frozen_scn), KPC(table_schema));
|
"cross-cluster checksum", KR(ret), K_(tenant_id), K(frozen_scn), KPC(simple_schema));
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("fail to check cross-cluster checksum", KR(ret), K_(tenant_id),
|
LOG_WARN("fail to check cross-cluster checksum", KR(ret), K_(tenant_id),
|
||||||
K(frozen_scn), KPC(table_schema));
|
K(frozen_scn), KPC(simple_schema));
|
||||||
}
|
}
|
||||||
} else if (OB_FAIL(handle_table_verification_finished(stop, table_schema, frozen_scn,
|
} else if (OB_FAIL(handle_table_verification_finished(stop, simple_schema, frozen_scn,
|
||||||
table_compaction_map, merge_time_statistics, expected_epoch))) {
|
table_compaction_map, merge_time_statistics, expected_epoch))) {
|
||||||
LOG_WARN("fail to handle table verification finished", KR(ret), K_(tenant_id),
|
LOG_WARN("fail to handle table verification finished", KR(ret), K_(tenant_id),
|
||||||
K(frozen_scn), KPC(table_schema));
|
K(frozen_scn), KPC(simple_schema));
|
||||||
}
|
}
|
||||||
} else if (TC_REACH_TIME_INTERVAL(10 * 60 * 1000 * 1000)) { // 10 min
|
} else if (TC_REACH_TIME_INTERVAL(10 * 60 * 1000 * 1000)) { // 10 min
|
||||||
LOG_WARN("can not check cross-cluster checksum now, please wait until first tablet "
|
LOG_WARN("can not check cross-cluster checksum now, please wait until first tablet "
|
||||||
"in sys ls exists", K_(tenant_id), K(frozen_scn), KPC(table_schema),
|
"in sys ls exists", K_(tenant_id), K(frozen_scn), KPC(simple_schema),
|
||||||
K_(major_merge_start_us), "current_time_us", ObTimeUtil::current_time());
|
K_(major_merge_start_us), "current_time_us", ObTimeUtil::current_time());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -659,30 +648,30 @@ int ObCrossClusterTabletChecksumValidator::validate_cross_cluster_checksum(
|
|||||||
}
|
}
|
||||||
|
|
||||||
int ObCrossClusterTabletChecksumValidator::check_cross_cluster_checksum(
|
int ObCrossClusterTabletChecksumValidator::check_cross_cluster_checksum(
|
||||||
const ObTableSchema &table_schema,
|
const ObSimpleTableSchemaV2 &simple_schema,
|
||||||
const SCN &frozen_scn)
|
const SCN &frozen_scn)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_UNLIKELY(!table_schema.has_tablet() || !frozen_scn.is_valid() || frozen_scn < SCN::min_scn())) {
|
if (OB_UNLIKELY(!simple_schema.has_tablet() || !frozen_scn.is_valid() || frozen_scn < SCN::min_scn())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(table_schema), K(frozen_scn));
|
LOG_WARN("invalid argument", KR(ret), K(simple_schema), K(frozen_scn));
|
||||||
} else {
|
} else {
|
||||||
SMART_VARS_2((ObArray<ObTabletID>, tablet_ids), (ObArray<ObTabletLSPair>, pairs)) {
|
SMART_VARS_2((ObArray<ObTabletID>, tablet_ids), (ObArray<ObTabletLSPair>, pairs)) {
|
||||||
FREEZE_TIME_GUARD;
|
FREEZE_TIME_GUARD;
|
||||||
if (OB_FAIL(table_schema.get_tablet_ids(tablet_ids))) {
|
if (OB_FAIL(simple_schema.get_tablet_ids(tablet_ids))) {
|
||||||
LOG_WARN("fail to get tablet_ids from table schema", KR(ret), K(table_schema));
|
LOG_WARN("fail to get tablet_ids from table schema", KR(ret), K(simple_schema));
|
||||||
} else if (OB_UNLIKELY(tablet_ids.count() < 1)) {
|
} else if (OB_UNLIKELY(tablet_ids.count() < 1)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("fail to get tablet_ids of current table schema", KR(ret), K_(tenant_id), K(table_schema));
|
LOG_WARN("fail to get tablet_ids of current table schema", KR(ret), K_(tenant_id), K(simple_schema));
|
||||||
} else if (FALSE_IT(sort_tablet_ids(tablet_ids))) { // tablet_ids should be in order
|
} else if (FALSE_IT(sort_tablet_ids(tablet_ids))) { // tablet_ids should be in order
|
||||||
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_,
|
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_,
|
||||||
table_schema.get_table_id(), *sql_proxy_, tablet_ids, pairs))) {
|
simple_schema.get_table_id(), *sql_proxy_, tablet_ids, pairs))) {
|
||||||
LOG_WARN("fail to get tablet_ls pairs", KR(ret), K_(tenant_id), "table_id",
|
LOG_WARN("fail to get tablet_ls pairs", KR(ret), K_(tenant_id), "table_id",
|
||||||
table_schema.get_table_id());
|
simple_schema.get_table_id());
|
||||||
} else if (OB_UNLIKELY(pairs.count() < 1)) {
|
} else if (OB_UNLIKELY(pairs.count() < 1)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("fail to get tablet_ls pairs of current table schema", KR(ret),
|
LOG_WARN("fail to get tablet_ls pairs of current table schema", KR(ret),
|
||||||
K_(tenant_id), "table_id", table_schema.get_table_id(), K(tablet_ids));
|
K_(tenant_id), "table_id", simple_schema.get_table_id(), K(tablet_ids));
|
||||||
} else {
|
} else {
|
||||||
SMART_VARS_2((ObArray<ObTabletReplicaChecksumItem>, tablet_replica_checksum_items),
|
SMART_VARS_2((ObArray<ObTabletReplicaChecksumItem>, tablet_replica_checksum_items),
|
||||||
(ObArray<ObTabletChecksumItem>, tablet_checksum_items)) {
|
(ObArray<ObTabletChecksumItem>, tablet_checksum_items)) {
|
||||||
@ -701,10 +690,10 @@ int ObCrossClusterTabletChecksumValidator::check_cross_cluster_checksum(
|
|||||||
} else if (OB_FAIL(check_column_checksum(tablet_replica_checksum_items, tablet_checksum_items))) {
|
} else if (OB_FAIL(check_column_checksum(tablet_replica_checksum_items, tablet_checksum_items))) {
|
||||||
if (OB_CHECKSUM_ERROR == ret) {
|
if (OB_CHECKSUM_ERROR == ret) {
|
||||||
LOG_ERROR("ERROR! ERROR! ERROR! checksum error in cross-cluster checksum", KR(ret),
|
LOG_ERROR("ERROR! ERROR! ERROR! checksum error in cross-cluster checksum", KR(ret),
|
||||||
K_(tenant_id), K(frozen_scn), K(table_schema));
|
K_(tenant_id), K(frozen_scn), K(simple_schema));
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("fail to check cross-cluster checksum", KR(ret), K_(tenant_id),
|
LOG_WARN("fail to check cross-cluster checksum", KR(ret), K_(tenant_id),
|
||||||
K(frozen_scn), K(table_schema));
|
K(frozen_scn), K(simple_schema));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -786,7 +775,7 @@ bool ObCrossClusterTabletChecksumValidator::check_waiting_tablet_checksum_timeou
|
|||||||
// If one table finished cross-cluster checksum verification, update report_scn and then mark it as VERIFIED
|
// If one table finished cross-cluster checksum verification, update report_scn and then mark it as VERIFIED
|
||||||
int ObCrossClusterTabletChecksumValidator::handle_table_verification_finished(
|
int ObCrossClusterTabletChecksumValidator::handle_table_verification_finished(
|
||||||
const volatile bool &stop,
|
const volatile bool &stop,
|
||||||
const ObTableSchema *table_schema,
|
const ObSimpleTableSchemaV2 *simple_schema,
|
||||||
const SCN &frozen_scn,
|
const SCN &frozen_scn,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
@ -795,11 +784,11 @@ int ObCrossClusterTabletChecksumValidator::handle_table_verification_finished(
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
bool is_containing = false;
|
bool is_containing = false;
|
||||||
if (OB_ISNULL(table_schema)) {
|
if (OB_ISNULL(simple_schema)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret));
|
LOG_WARN("invalid argument", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
const uint64_t table_id = table_schema->get_table_id();
|
const uint64_t table_id = simple_schema->get_table_id();
|
||||||
ObTableCompactionInfo cur_compaction_info;
|
ObTableCompactionInfo cur_compaction_info;
|
||||||
if (OB_FAIL(table_compaction_map.get_refactored(table_id, cur_compaction_info))) {
|
if (OB_FAIL(table_compaction_map.get_refactored(table_id, cur_compaction_info))) {
|
||||||
LOG_WARN("fail to get refactored", KR(ret), K(table_id));
|
LOG_WARN("fail to get refactored", KR(ret), K(table_id));
|
||||||
@ -811,10 +800,10 @@ int ObCrossClusterTabletChecksumValidator::handle_table_verification_finished(
|
|||||||
} else {
|
} else {
|
||||||
// set it to false, if succ to handle_table_can_not_verify
|
// set it to false, if succ to handle_table_can_not_verify
|
||||||
bool need_update_map = true;
|
bool need_update_map = true;
|
||||||
if (table_schema->has_tablet()) {
|
if (simple_schema->has_tablet()) {
|
||||||
SMART_VAR(ObArray<ObTabletLSPair>, pairs) {
|
SMART_VAR(ObArray<ObTabletLSPair>, pairs) {
|
||||||
FREEZE_TIME_GUARD;
|
FREEZE_TIME_GUARD;
|
||||||
if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_, *table_schema, *sql_proxy_, pairs))) {
|
if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_, *simple_schema, *sql_proxy_, pairs))) {
|
||||||
if (OB_LIKELY(OB_ITEM_NOT_MATCH == ret)) {
|
if (OB_LIKELY(OB_ITEM_NOT_MATCH == ret)) {
|
||||||
if (OB_TMP_FAIL(handle_table_can_not_verify(table_id, table_compaction_map))) {
|
if (OB_TMP_FAIL(handle_table_can_not_verify(table_id, table_compaction_map))) {
|
||||||
LOG_WARN("fail to handle table can not verify", KR(tmp_ret), K(table_id));
|
LOG_WARN("fail to handle table can not verify", KR(tmp_ret), K(table_id));
|
||||||
@ -1131,23 +1120,18 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
|
|||||||
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
||||||
} else {
|
} else {
|
||||||
const uint64_t table_id = simple_schema->get_table_id();
|
const uint64_t table_id = simple_schema->get_table_id();
|
||||||
const ObTableSchema *table_schema = nullptr;
|
|
||||||
ObTableCompactionInfo cur_compaction_info;
|
ObTableCompactionInfo cur_compaction_info;
|
||||||
if (OB_FAIL(table_ids.push_back(table_id))) {
|
if (OB_FAIL(table_ids.push_back(table_id))) {
|
||||||
LOG_WARN("fail to push back", KR(ret), K(table_id));
|
LOG_WARN("fail to push back", KR(ret), K(table_id));
|
||||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, table_id, table_schema))) {
|
} else if (OB_FAIL(get_table_compaction_info(*simple_schema, table_compaction_map, cur_compaction_info))) {
|
||||||
LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K(table_id));
|
LOG_WARN("fail to get table compaction info", KR(ret), K(frozen_scn), KPC(simple_schema));
|
||||||
} else if (OB_ISNULL(table_schema)) {
|
|
||||||
LOG_WARN("table_schema is null", K_(tenant_id), K(table_id), KPC(simple_schema));
|
|
||||||
} else if (OB_FAIL(get_table_compaction_info(*table_schema, table_compaction_map, cur_compaction_info))) {
|
|
||||||
LOG_WARN("fail to get table compaction info", KR(ret), K(frozen_scn), KPC(table_schema));
|
|
||||||
} else if (cur_compaction_info.is_index_ckm_verified()
|
} else if (cur_compaction_info.is_index_ckm_verified()
|
||||||
|| cur_compaction_info.is_verified()) { // already finished verification, skip it!
|
|| cur_compaction_info.is_verified()) { // already finished verification, skip it!
|
||||||
} else if (simple_schema->is_index_table()) {
|
} else if (simple_schema->is_index_table()) {
|
||||||
if (simple_schema->can_read_index()) {
|
if (simple_schema->can_read_index()) {
|
||||||
// 1. for index table can read, may need to check column checksum
|
// 1. for index table can read, may need to check column checksum
|
||||||
if (OB_FAIL(handle_index_table(frozen_scn, cur_compaction_info, table_schema,
|
if (OB_FAIL(handle_index_table(frozen_scn, cur_compaction_info, simple_schema,
|
||||||
simple_schema, schema_guard, table_compaction_map, expected_epoch))) {
|
schema_guard, table_compaction_map, expected_epoch))) {
|
||||||
LOG_WARN("fail to handle index table", KR(ret), K(frozen_scn), K(simple_schema), K(expected_epoch));
|
LOG_WARN("fail to handle index table", KR(ret), K(frozen_scn), K(simple_schema), K(expected_epoch));
|
||||||
}
|
}
|
||||||
} else { // !simple_schema->can_read_index()
|
} else { // !simple_schema->can_read_index()
|
||||||
@ -1160,17 +1144,9 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
|
|||||||
LOG_WARN("fail to handle table can not verify", KR(ret));
|
LOG_WARN("fail to handle table can not verify", KR(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
if (table_schema->get_index_tid_count() < 1) { // handle data table, meanwhile not have relative index table
|
|
||||||
if (cur_compaction_info.finish_compaction()) {
|
|
||||||
if (OB_FAIL(handle_table_verification_finished(table_id, frozen_scn, table_compaction_map))) {
|
|
||||||
LOG_WARN("fail to handle table verification finished", KR(ret), K(table_id), K(frozen_scn));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
if (OB_TMP_FAIL(try_print_first_unverified_info(simple_schema, table_schema, table_compaction_map, already_print))) {
|
if (OB_TMP_FAIL(try_print_first_unverified_info(simple_schema, table_schemas, table_compaction_map, already_print))) {
|
||||||
LOG_WARN("fail to try print first unverified info", KR(tmp_ret), K(table_id));
|
LOG_WARN("fail to try print first unverified info", KR(tmp_ret), K(table_id));
|
||||||
}
|
}
|
||||||
if (OB_CHECKSUM_ERROR == ret) {
|
if (OB_CHECKSUM_ERROR == ret) {
|
||||||
@ -1185,10 +1161,10 @@ int ObIndexChecksumValidator::check_all_table_verification_finished(
|
|||||||
} // end for loop
|
} // end for loop
|
||||||
|
|
||||||
if (OB_SUCC(ret) && (OB_SUCCESS == check_ret)) {
|
if (OB_SUCC(ret) && (OB_SUCCESS == check_ret)) {
|
||||||
// for data table with index, if all its index tables finished verification,
|
// 1. data table with index: if all its index tables finished verification, mark it as INDEX_CKM_VERIFIED.
|
||||||
// then mark it as INDEX_CKM_VERIFIED.
|
// 2. data table without index: if it finished compaction, directly mark it as INDEX_CKM_VERIFIED.
|
||||||
if (OB_FAIL(handle_data_table_with_index(stop, frozen_scn, table_ids, table_schemas,
|
// 3. other types of tables that are not index: if it finished compaction, directly mark it as INDEX_CKM_VERIFIED.
|
||||||
ori_table_ids, table_compaction_map))) {
|
if (OB_FAIL(handle_data_table(stop, frozen_scn, table_ids, table_schemas, ori_table_ids, table_compaction_map))) {
|
||||||
LOG_WARN("fail to handle data table with index", KR(ret), K_(tenant_id), K(stop), K(frozen_scn));
|
LOG_WARN("fail to handle data table with index", KR(ret), K_(tenant_id), K(stop), K(frozen_scn));
|
||||||
} else if (OB_FAIL(remove_not_exist_table(table_ids, table_compaction_map))) {
|
} else if (OB_FAIL(remove_not_exist_table(table_ids, table_compaction_map))) {
|
||||||
LOG_WARN("fail to remove not exist table", KR(ret), K_(tenant_id), K(frozen_scn));
|
LOG_WARN("fail to remove not exist table", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||||
@ -1253,7 +1229,7 @@ int ObIndexChecksumValidator::handle_table_verification_finished(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObIndexChecksumValidator::handle_data_table_with_index(
|
int ObIndexChecksumValidator::handle_data_table(
|
||||||
const volatile bool &stop,
|
const volatile bool &stop,
|
||||||
const SCN &frozen_scn,
|
const SCN &frozen_scn,
|
||||||
const ObIArray<uint64_t> &table_ids,
|
const ObIArray<uint64_t> &table_ids,
|
||||||
@ -1263,12 +1239,11 @@ int ObIndexChecksumValidator::handle_data_table_with_index(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
SMART_VAR(ObArray<uint64_t>, data_tables_to_update) {
|
SMART_VAR(ObArray<uint64_t>, data_tables_to_update) {
|
||||||
// check data tables with index, return those need to be marked as INDEX_CKM_VERIFIED
|
// check tables that are not index table, return those need to be marked as INDEX_CKM_VERIFIED
|
||||||
if (OB_FAIL(check_data_table_with_index(table_schemas, table_compaction_map, ori_table_ids,
|
if (OB_FAIL(check_data_table(table_schemas, table_compaction_map, ori_table_ids, data_tables_to_update))) {
|
||||||
data_tables_to_update))) {
|
|
||||||
LOG_WARN("fail to check data table with index", KR(ret), K_(tenant_id), K(frozen_scn));
|
LOG_WARN("fail to check data table with index", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||||
}
|
}
|
||||||
// mark data tables whose all index tables finished verification as INDEX_CKM_VERIFIED
|
// mark tables exist in @data_tables_to_update as INDEX_CKM_VERIFIED
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo>::iterator iter = table_compaction_map.begin();
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo>::iterator iter = table_compaction_map.begin();
|
||||||
for (; !stop && OB_SUCC(ret) && (iter != table_compaction_map.end()); ++iter) {
|
for (; !stop && OB_SUCC(ret) && (iter != table_compaction_map.end()); ++iter) {
|
||||||
const uint64_t cur_table_id = iter->first;
|
const uint64_t cur_table_id = iter->first;
|
||||||
@ -1286,7 +1261,7 @@ int ObIndexChecksumValidator::handle_data_table_with_index(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObIndexChecksumValidator::check_data_table_with_index(
|
int ObIndexChecksumValidator::check_data_table(
|
||||||
const ObIArray<const ObSimpleTableSchemaV2 *> &table_schemas,
|
const ObIArray<const ObSimpleTableSchemaV2 *> &table_schemas,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||||
const ObIArray<uint64_t> &ori_table_ids,
|
const ObIArray<uint64_t> &ori_table_ids,
|
||||||
@ -1295,22 +1270,26 @@ int ObIndexChecksumValidator::check_data_table_with_index(
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
data_tables_to_update.reset();
|
data_tables_to_update.reset();
|
||||||
int64_t table_count = table_schemas.count();
|
int64_t table_count = table_schemas.count();
|
||||||
// push_back data tables with index
|
// push_back data tables with index, data tables without index, and other types of non-index
|
||||||
|
// tables that finished compaction into @data_tables_to_update
|
||||||
for (int64_t i = 0; (i < table_count) && OB_SUCC(ret); ++i) {
|
for (int64_t i = 0; (i < table_count) && OB_SUCC(ret); ++i) {
|
||||||
const ObSimpleTableSchemaV2 *simple_schema = table_schemas.at(i);
|
const ObSimpleTableSchemaV2 *simple_schema = table_schemas.at(i);
|
||||||
if (OB_ISNULL(simple_schema)) {
|
if (OB_ISNULL(simple_schema)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
||||||
} else if (simple_schema->is_index_table()) {
|
} else if (!simple_schema->is_index_table()) { // data table
|
||||||
const uint64_t data_table_id = simple_schema->get_data_table_id();
|
const uint64_t table_id = simple_schema->get_table_id();
|
||||||
if (!has_exist_in_array(data_tables_to_update, data_table_id)) {
|
ObTableCompactionInfo table_compaction_info;
|
||||||
if (OB_FAIL(data_tables_to_update.push_back(data_table_id))) {
|
if (OB_FAIL(table_compaction_map.get_refactored(table_id, table_compaction_info))) {
|
||||||
LOG_WARN("fail to push back", KR(ret), K(data_table_id));
|
LOG_WARN("fail to get refactored", KR(ret), K(table_id));
|
||||||
|
} else if (table_compaction_info.finish_compaction()) {
|
||||||
|
if (OB_FAIL(data_tables_to_update.push_back(table_id))) {
|
||||||
|
LOG_WARN("fail to push back", KR(ret), K(table_id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// remove data tables whose index tables do not finish verification
|
// remove data tables whose index tables do not finish verification from @data_tables_to_update
|
||||||
for (int64_t i = 0; (i < table_count) && OB_SUCC(ret); ++i) {
|
for (int64_t i = 0; (i < table_count) && OB_SUCC(ret); ++i) {
|
||||||
const ObSimpleTableSchemaV2 *simple_schema = table_schemas.at(i);
|
const ObSimpleTableSchemaV2 *simple_schema = table_schemas.at(i);
|
||||||
if (OB_ISNULL(simple_schema)) {
|
if (OB_ISNULL(simple_schema)) {
|
||||||
@ -1343,30 +1322,29 @@ int ObIndexChecksumValidator::check_data_table_with_index(
|
|||||||
int ObIndexChecksumValidator::handle_index_table(
|
int ObIndexChecksumValidator::handle_index_table(
|
||||||
const SCN &frozen_scn,
|
const SCN &frozen_scn,
|
||||||
const ObTableCompactionInfo &index_compaction_info,
|
const ObTableCompactionInfo &index_compaction_info,
|
||||||
const ObTableSchema *index_table_schema,
|
const ObSimpleTableSchemaV2 *index_simple_schema,
|
||||||
const ObSimpleTableSchemaV2 *simple_schema,
|
|
||||||
ObSchemaGetterGuard &schema_guard,
|
ObSchemaGetterGuard &schema_guard,
|
||||||
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||||
const int64_t expected_epoch)
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
if (OB_UNLIKELY(!frozen_scn.is_valid() || OB_ISNULL(index_table_schema) || OB_ISNULL(simple_schema))) {
|
if (OB_UNLIKELY(!frozen_scn.is_valid() || OB_ISNULL(index_simple_schema))) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K_(tenant_id), K(frozen_scn), KP(index_table_schema), KP(simple_schema));
|
LOG_WARN("invalid argument", KR(ret), K_(tenant_id), K(frozen_scn), KP(index_simple_schema));
|
||||||
} else {
|
} else {
|
||||||
const uint64_t index_table_id = simple_schema->get_table_id();
|
const uint64_t index_table_id = index_simple_schema->get_table_id();
|
||||||
const uint64_t data_table_id = simple_schema->get_data_table_id();
|
const uint64_t data_table_id = index_simple_schema->get_data_table_id();
|
||||||
const ObTableSchema *data_table_schema = nullptr;
|
const ObSimpleTableSchemaV2 *data_simple_schema = nullptr;
|
||||||
if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, data_table_id, data_table_schema))) {
|
if (OB_FAIL(schema_guard.get_simple_table_schema(tenant_id_, data_table_id, data_simple_schema))) {
|
||||||
LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K(data_table_id));
|
LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K(data_table_id));
|
||||||
} else if (OB_ISNULL(data_table_schema)) {
|
} else if (OB_ISNULL(data_simple_schema)) {
|
||||||
ret = OB_TABLE_NOT_EXIST;
|
ret = OB_TABLE_NOT_EXIST;
|
||||||
LOG_WARN("fail to get data table schema", KR(ret), K_(tenant_id), K(index_table_id), K(data_table_id));
|
LOG_WARN("fail to get data table schema", KR(ret), K_(tenant_id), K(index_table_id), K(data_table_id));
|
||||||
} else {
|
} else {
|
||||||
ObTableCompactionInfo data_compaction_info;
|
ObTableCompactionInfo data_compaction_info;
|
||||||
if (OB_FAIL(get_table_compaction_info(*data_table_schema, table_compaction_map, data_compaction_info))) {
|
if (OB_FAIL(get_table_compaction_info(*data_simple_schema, table_compaction_map, data_compaction_info))) {
|
||||||
LOG_WARN("fail to get table compaction info", KR(ret), K(frozen_scn), KPC(data_table_schema));
|
LOG_WARN("fail to get table compaction info", KR(ret), K(frozen_scn), KPC(data_simple_schema));
|
||||||
} else if (data_compaction_info.is_index_ckm_verified() || data_compaction_info.is_verified()) {
|
} else if (data_compaction_info.is_index_ckm_verified() || data_compaction_info.is_verified()) {
|
||||||
// if a data table finished verification, then create index on this data table.
|
// if a data table finished verification, then create index on this data table.
|
||||||
// we should skip verification for this index table, cuz the data table may already
|
// we should skip verification for this index table, cuz the data table may already
|
||||||
@ -1379,7 +1357,7 @@ int ObIndexChecksumValidator::handle_index_table(
|
|||||||
LOG_WARN("fail to handle index table compaction finished", KR(ret), K(index_table_id), K(frozen_scn));
|
LOG_WARN("fail to handle index table compaction finished", KR(ret), K(index_table_id), K(frozen_scn));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (index_table_schema->has_tablet()) {
|
} else if (index_simple_schema->has_tablet()) {
|
||||||
if (!index_compaction_info.finish_compaction() || !data_compaction_info.finish_compaction()) {
|
if (!index_compaction_info.finish_compaction() || !data_compaction_info.finish_compaction()) {
|
||||||
} else if (index_compaction_info.is_compacted() && data_compaction_info.is_compacted()) {
|
} else if (index_compaction_info.is_compacted() && data_compaction_info.is_compacted()) {
|
||||||
#ifdef ERRSIM
|
#ifdef ERRSIM
|
||||||
@ -1399,8 +1377,8 @@ int ObIndexChecksumValidator::handle_index_table(
|
|||||||
if (need_validate()) {
|
if (need_validate()) {
|
||||||
FREEZE_TIME_GUARD;
|
FREEZE_TIME_GUARD;
|
||||||
if (FAILEDx(ObTabletReplicaChecksumOperator::check_column_checksum(tenant_id_,
|
if (FAILEDx(ObTabletReplicaChecksumOperator::check_column_checksum(tenant_id_,
|
||||||
*data_table_schema, *index_table_schema, frozen_scn, *sql_proxy_, expected_epoch))) {
|
*data_simple_schema, *index_simple_schema, frozen_scn, *sql_proxy_, expected_epoch))) {
|
||||||
if (OB_ITEM_NOT_MATCH == ret) {
|
if ((OB_ITEM_NOT_MATCH == ret) || (OB_TABLE_NOT_EXIST == ret)) {
|
||||||
if (OB_TMP_FAIL(handle_table_can_not_verify(index_table_id, table_compaction_map))) {
|
if (OB_TMP_FAIL(handle_table_can_not_verify(index_table_id, table_compaction_map))) {
|
||||||
LOG_WARN("fail to handle table can not verify", KR(tmp_ret), K(index_table_id));
|
LOG_WARN("fail to handle table can not verify", KR(tmp_ret), K(index_table_id));
|
||||||
} else {
|
} else {
|
||||||
@ -1409,10 +1387,10 @@ int ObIndexChecksumValidator::handle_index_table(
|
|||||||
}
|
}
|
||||||
} else if (OB_CHECKSUM_ERROR == ret) {
|
} else if (OB_CHECKSUM_ERROR == ret) {
|
||||||
LOG_DBA_ERROR(OB_CHECKSUM_ERROR, "msg", "ERROR! ERROR! ERROR! checksum error in index checksum",
|
LOG_DBA_ERROR(OB_CHECKSUM_ERROR, "msg", "ERROR! ERROR! ERROR! checksum error in index checksum",
|
||||||
KR(ret), KPC(data_table_schema), K_(tenant_id), K(frozen_scn), KPC(index_table_schema));
|
KR(ret), KPC(data_simple_schema), K_(tenant_id), K(frozen_scn), KPC(index_simple_schema));
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("fail to check index column checksum", KR(ret), K_(tenant_id), KPC(data_table_schema),
|
LOG_WARN("fail to check index column checksum", KR(ret), K_(tenant_id), KPC(data_simple_schema),
|
||||||
KPC(index_table_schema));
|
KPC(index_simple_schema));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1443,14 +1421,14 @@ int ObIndexChecksumValidator::handle_index_table(
|
|||||||
|
|
||||||
int ObIndexChecksumValidator::try_print_first_unverified_info(
|
int ObIndexChecksumValidator::try_print_first_unverified_info(
|
||||||
const ObSimpleTableSchemaV2 *simple_schema,
|
const ObSimpleTableSchemaV2 *simple_schema,
|
||||||
const ObTableSchema *table_schema,
|
const ObArray<const share::schema::ObSimpleTableSchemaV2 *> &table_schemas,
|
||||||
const hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
const hash::ObHashMap<uint64_t, ObTableCompactionInfo> &table_compaction_map,
|
||||||
bool &already_print)
|
bool &already_print)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_ISNULL(simple_schema) || OB_ISNULL(table_schema)) {
|
if (OB_ISNULL(simple_schema)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), KP(simple_schema), KP(table_schema));
|
LOG_WARN("invalid argument", KR(ret), KP(simple_schema));
|
||||||
} else if (already_print) { // do nothing. only print infos about the first unverified table
|
} else if (already_print) { // do nothing. only print infos about the first unverified table
|
||||||
} else {
|
} else {
|
||||||
const uint64_t table_id = simple_schema->get_table_id();
|
const uint64_t table_id = simple_schema->get_table_id();
|
||||||
@ -1471,31 +1449,30 @@ int ObIndexChecksumValidator::try_print_first_unverified_info(
|
|||||||
LOG_INFO("index table is unverified", K(cur_compaction_info), K(data_compaction_info));
|
LOG_INFO("index table is unverified", K(cur_compaction_info), K(data_compaction_info));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (table_schema->get_index_tid_count() < 1) {
|
// 2.2 data table, print data table compaction info and its index table compaction infos
|
||||||
// 2.2 data table without index, print data table compaction info
|
SMART_VAR(ObArray<ObTableCompactionInfo>, index_table_compaction_infos) {
|
||||||
LOG_INFO("data table without index is unverified", K(cur_compaction_info));
|
const int64_t table_count = table_schemas.count();
|
||||||
} else if (table_schema->get_index_tid_count() >= 1) {
|
// traverse table_schemas to find all indexs of this data table
|
||||||
// 2.3 data table with index, print data table compaction info and index table compaction infos
|
for (int64_t i = 0; OB_SUCC(ret) && (i < table_count); ++i) {
|
||||||
SMART_VARS_2((ObArray<ObAuxTableMetaInfo>, simple_index_infos_array),
|
const ObSimpleTableSchemaV2 *cur_simple_schema = table_schemas.at(i);
|
||||||
(ObArray<ObTableCompactionInfo>, index_table_compaction_infos)) {
|
if (OB_ISNULL(cur_simple_schema)) {
|
||||||
if (OB_FAIL(table_schema->get_simple_index_infos(simple_index_infos_array, true))) {
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("fail to get simple index infos array", KR(ret), KPC(table_schema));
|
LOG_WARN("unexpected error, simple schema is null", KR(ret), K_(tenant_id));
|
||||||
} else {
|
} else if (cur_simple_schema->is_index_table()) {
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && (i < simple_index_infos_array.count()); ++i) {
|
const uint64_t data_table_id = cur_simple_schema->get_data_table_id();
|
||||||
|
if (data_table_id == table_id) {
|
||||||
|
const uint64_t index_table_id = cur_simple_schema->get_table_id();
|
||||||
ObTableCompactionInfo index_compaction_info;
|
ObTableCompactionInfo index_compaction_info;
|
||||||
const uint64_t index_table_id = simple_index_infos_array.at(i).table_id_;
|
|
||||||
if (OB_FAIL(table_compaction_map.get_refactored(index_table_id, index_compaction_info))) {
|
if (OB_FAIL(table_compaction_map.get_refactored(index_table_id, index_compaction_info))) {
|
||||||
LOG_WARN("fail to get table_compaction_info from table_compaction_map", KR(ret), K(index_table_id));
|
LOG_WARN("fail to get table_compaction_info from table_compaction_map", KR(ret), K(index_table_id));
|
||||||
} else {
|
} else if (OB_FAIL(index_table_compaction_infos.push_back(index_compaction_info))) {
|
||||||
if (OB_FAIL(index_table_compaction_infos.push_back(index_compaction_info))) {
|
|
||||||
LOG_WARN("fail to push back", KR(ret), K(index_compaction_info));
|
LOG_WARN("fail to push back", KR(ret), K(index_compaction_info));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
LOG_INFO("data table with index is unverified", K(cur_compaction_info), K(index_table_compaction_infos));
|
LOG_INFO("data table is unverified", K(cur_compaction_info), K(index_table_compaction_infos));
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,7 +79,7 @@ public:
|
|||||||
protected:
|
protected:
|
||||||
bool exist_in_table_array(const uint64_t table_id,
|
bool exist_in_table_array(const uint64_t table_id,
|
||||||
const common::ObIArray<uint64_t> &table_ids) const;
|
const common::ObIArray<uint64_t> &table_ids) const;
|
||||||
int get_table_compaction_info(const share::schema::ObTableSchema &table_schema,
|
int get_table_compaction_info(const share::schema::ObSimpleTableSchemaV2 &simple_schema,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
share::ObTableCompactionInfo &table_compaction_info);
|
share::ObTableCompactionInfo &table_compaction_info);
|
||||||
// compare 'table_compaction_map' with 'table_ids', and then remove those
|
// compare 'table_compaction_map' with 'table_ids', and then remove those
|
||||||
@ -133,7 +133,7 @@ private:
|
|||||||
const int64_t expected_epoch) override;
|
const int64_t expected_epoch) override;
|
||||||
// check whether all tablets of this table finished compaction or not,
|
// check whether all tablets of this table finished compaction or not,
|
||||||
// and execute tablet replica checksum verification if this table has tablet.
|
// and execute tablet replica checksum verification if this table has tablet.
|
||||||
int check_table_compaction_and_validate_checksum(const share::schema::ObTableSchema &table_schema,
|
int check_table_compaction_and_validate_checksum(const share::schema::ObSimpleTableSchemaV2 &simple_schema,
|
||||||
const share::SCN &frozen_scn,
|
const share::SCN &frozen_scn,
|
||||||
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
const hash::ObHashMap<share::ObTabletLSPair, share::ObTabletCompactionStatus> &tablet_compaction_map,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map);
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map);
|
||||||
@ -182,13 +182,14 @@ private:
|
|||||||
int validate_cross_cluster_checksum(const volatile bool &stop,
|
int validate_cross_cluster_checksum(const volatile bool &stop,
|
||||||
const share::SCN &frozen_scn,
|
const share::SCN &frozen_scn,
|
||||||
const int64_t expected_epoch,
|
const int64_t expected_epoch,
|
||||||
const share::schema::ObTableSchema *table_schema,
|
const share::schema::ObSimpleTableSchemaV2 *simple_schema,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
ObMergeTimeStatistics &merge_time_statistics);
|
ObMergeTimeStatistics &merge_time_statistics);
|
||||||
int check_need_validate(const bool is_primary_service,
|
int check_need_validate(const bool is_primary_service,
|
||||||
const share::SCN &frozen_scn,
|
const share::SCN &frozen_scn,
|
||||||
bool &need_validate) const;
|
bool &need_validate) const;
|
||||||
int check_cross_cluster_checksum(const share::schema::ObTableSchema &table_schema, const share::SCN &frozen_scn);
|
int check_cross_cluster_checksum(const share::schema::ObSimpleTableSchemaV2 &simple_schema,
|
||||||
|
const share::SCN &frozen_scn);
|
||||||
void sort_tablet_ids(ObArray<ObTabletID> &tablet_ids);
|
void sort_tablet_ids(ObArray<ObTabletID> &tablet_ids);
|
||||||
int check_column_checksum(const ObArray<share::ObTabletReplicaChecksumItem> &tablet_replica_checksum_items,
|
int check_column_checksum(const ObArray<share::ObTabletReplicaChecksumItem> &tablet_replica_checksum_items,
|
||||||
const ObArray<share::ObTabletChecksumItem> &tablet_checksum_items);
|
const ObArray<share::ObTabletChecksumItem> &tablet_checksum_items);
|
||||||
@ -196,7 +197,7 @@ private:
|
|||||||
bool check_waiting_tablet_checksum_timeout() const;
|
bool check_waiting_tablet_checksum_timeout() const;
|
||||||
// handle the table, update its all tablets' status if needed. And update its compaction_info in @table_compaction_map
|
// handle the table, update its all tablets' status if needed. And update its compaction_info in @table_compaction_map
|
||||||
int handle_table_verification_finished(const volatile bool &stop,
|
int handle_table_verification_finished(const volatile bool &stop,
|
||||||
const share::schema::ObTableSchema *table_schema,
|
const share::schema::ObSimpleTableSchemaV2 *simple_schema,
|
||||||
const share::SCN &frozen_scn,
|
const share::SCN &frozen_scn,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
ObMergeTimeStatistics &merge_time_statistics,
|
ObMergeTimeStatistics &merge_time_statistics,
|
||||||
@ -245,23 +246,27 @@ private:
|
|||||||
int handle_table_verification_finished(const uint64_t table_id,
|
int handle_table_verification_finished(const uint64_t table_id,
|
||||||
const share::SCN &frozen_scn,
|
const share::SCN &frozen_scn,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map);
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map);
|
||||||
// handle data tables with index, mark data tables whose all index tables finished verification as INDEX_CKM_VERIFIED
|
// handle tables that are not index_table, including:
|
||||||
int handle_data_table_with_index(const volatile bool &stop,
|
// 1. data table with index: if all its index tables finished verification, mark it as INDEX_CKM_VERIFIED
|
||||||
|
// 2. data table without index: if it finished compaction, mark it as INDEX_CKM_VERIFIED
|
||||||
|
// 3. there may be other types of tables that are not index_table: if it finished compaction, mark it as INDEX_CKM_VERIFIED
|
||||||
|
int handle_data_table(const volatile bool &stop,
|
||||||
const share::SCN &frozen_scn,
|
const share::SCN &frozen_scn,
|
||||||
const common::ObIArray<uint64_t> &table_ids,
|
const common::ObIArray<uint64_t> &table_ids,
|
||||||
const common::ObIArray<const share::schema::ObSimpleTableSchemaV2 *> &table_schemas,
|
const ObIArray<const share::schema::ObSimpleTableSchemaV2 *> &table_schemas,
|
||||||
const common::ObIArray<uint64_t> &ori_table_ids,
|
const common::ObIArray<uint64_t> &ori_table_ids,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map);
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map);
|
||||||
// check data tables with index, return those need to be marked as INDEX_CKM_VERIFIED
|
// check data tables, return those need to be marked as INDEX_CKM_VERIFIED:
|
||||||
int check_data_table_with_index(const common::ObIArray<const share::schema::ObSimpleTableSchemaV2 *> &table_schemas,
|
// 1. data table with index: if all its index tables finished verification, it need to be marked as INDEX_CKM_VERIFIED
|
||||||
|
// 2. data table without index: if it finished compaction, it need to be marked as INDEX_CKM_VERIFIED
|
||||||
|
int check_data_table(const common::ObIArray<const share::schema::ObSimpleTableSchemaV2 *> &table_schemas,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
const common::ObIArray<uint64_t> &ori_table_ids,
|
const common::ObIArray<uint64_t> &ori_table_ids,
|
||||||
common::ObIArray<uint64_t> &data_tables_to_update);
|
common::ObIArray<uint64_t> &data_tables_to_update);
|
||||||
// handle index tables. validate column checksum if needed, and mark index tables as INDEX_CKM_VERIFIED
|
// handle index tables. validate column checksum if needed, and mark index tables as INDEX_CKM_VERIFIED
|
||||||
int handle_index_table(const share::SCN &frozen_scn,
|
int handle_index_table(const share::SCN &frozen_scn,
|
||||||
const share::ObTableCompactionInfo &index_compaction_info,
|
const share::ObTableCompactionInfo &index_compaction_info,
|
||||||
const share::schema::ObTableSchema *index_table_schema,
|
const share::schema::ObSimpleTableSchemaV2 *index_simple_schema,
|
||||||
const share::schema::ObSimpleTableSchemaV2 *simple_schema,
|
|
||||||
share::schema::ObSchemaGetterGuard &schema_guard,
|
share::schema::ObSchemaGetterGuard &schema_guard,
|
||||||
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
const int64_t expected_epoch);
|
const int64_t expected_epoch);
|
||||||
@ -269,7 +274,7 @@ private:
|
|||||||
// function will not modify table_compaction_map, which ensures major compaction will not be
|
// function will not modify table_compaction_map, which ensures major compaction will not be
|
||||||
// affected by this function.
|
// affected by this function.
|
||||||
int try_print_first_unverified_info(const share::schema::ObSimpleTableSchemaV2 *simple_schema,
|
int try_print_first_unverified_info(const share::schema::ObSimpleTableSchemaV2 *simple_schema,
|
||||||
const share::schema::ObTableSchema *table_schema,
|
const ObArray<const share::schema::ObSimpleTableSchemaV2 *> &table_schemas,
|
||||||
const hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
const hash::ObHashMap<uint64_t, share::ObTableCompactionInfo> &table_compaction_map,
|
||||||
bool &already_print);
|
bool &already_print);
|
||||||
};
|
};
|
||||||
|
@ -152,7 +152,7 @@ int ObMajorMergeProgressChecker::handle_table_with_first_tablet_in_sys_ls(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObSchemaGetterGuard schema_guard;
|
ObSchemaGetterGuard schema_guard;
|
||||||
const ObTableSchema *table_schema = nullptr;
|
const ObSimpleTableSchemaV2 *simple_schema = nullptr;
|
||||||
ObTableCompactionInfo cur_compaction_info;
|
ObTableCompactionInfo cur_compaction_info;
|
||||||
const uint64_t special_table_id = cross_cluster_validator_.get_special_table_id();
|
const uint64_t special_table_id = cross_cluster_validator_.get_special_table_id();
|
||||||
// only primary major_freeze_service need to handle table with frist tablet in sys ls here
|
// only primary major_freeze_service need to handle table with frist tablet in sys ls here
|
||||||
@ -162,15 +162,15 @@ int ObMajorMergeProgressChecker::handle_table_with_first_tablet_in_sys_ls(
|
|||||||
LOG_WARN("invalid special table id", KR(ret), K_(tenant_id), K(special_table_id));
|
LOG_WARN("invalid special table id", KR(ret), K_(tenant_id), K(special_table_id));
|
||||||
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(tenant_id_, schema_guard))) {
|
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(tenant_id_, schema_guard))) {
|
||||||
LOG_WARN("fail to get tenant schema guard", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to get tenant schema guard", KR(ret), K_(tenant_id));
|
||||||
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, special_table_id, table_schema))) {
|
} else if (OB_FAIL(schema_guard.get_simple_table_schema(tenant_id_, special_table_id, simple_schema))) {
|
||||||
LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K(special_table_id));
|
LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K(special_table_id));
|
||||||
} else if (OB_ISNULL(table_schema)) {
|
} else if (OB_ISNULL(simple_schema)) {
|
||||||
ret = OB_TABLE_NOT_EXIST;
|
ret = OB_TABLE_NOT_EXIST;
|
||||||
LOG_WARN("table schema is null", KR(ret), K_(tenant_id), K(special_table_id));
|
LOG_WARN("table schema is null", KR(ret), K_(tenant_id), K(special_table_id));
|
||||||
} else {
|
} else {
|
||||||
SMART_VAR(ObArray<ObTabletLSPair>, pairs) {
|
SMART_VAR(ObArray<ObTabletLSPair>, pairs) {
|
||||||
FREEZE_TIME_GUARD;
|
FREEZE_TIME_GUARD;
|
||||||
if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_, *table_schema,
|
if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_, *simple_schema,
|
||||||
*sql_proxy_, pairs))) {
|
*sql_proxy_, pairs))) {
|
||||||
LOG_WARN("fail to get tablet_ls pairs", KR(ret), K_(tenant_id), K(special_table_id));
|
LOG_WARN("fail to get tablet_ls pairs", KR(ret), K_(tenant_id), K(special_table_id));
|
||||||
} else if (OB_UNLIKELY(pairs.count() < 1)) {
|
} else if (OB_UNLIKELY(pairs.count() < 1)) {
|
||||||
|
@ -171,7 +171,7 @@ int ObMajorMergeScheduler::try_idle(
|
|||||||
int64_t merger_check_interval = idling_.get_idle_interval_us();
|
int64_t merger_check_interval = idling_.get_idle_interval_us();
|
||||||
const int64_t start_time_us = ObTimeUtil::current_time();
|
const int64_t start_time_us = ObTimeUtil::current_time();
|
||||||
|
|
||||||
if (OB_SUCC(work_ret)) {
|
if (OB_SUCCESS == work_ret) {
|
||||||
fail_count_ = 0;
|
fail_count_ = 0;
|
||||||
} else {
|
} else {
|
||||||
++fail_count_;
|
++fail_count_;
|
||||||
@ -189,7 +189,7 @@ int ObMajorMergeScheduler::try_idle(
|
|||||||
} else {
|
} else {
|
||||||
idle_time_us = merger_check_interval;
|
idle_time_us = merger_check_interval;
|
||||||
LOG_WARN("major merge failed more than immediate cnt, turn to idle status",
|
LOG_WARN("major merge failed more than immediate cnt, turn to idle status",
|
||||||
K_(fail_count), LITERAL_K(IMMEDIATE_RETRY_CNT));
|
K_(fail_count), LITERAL_K(IMMEDIATE_RETRY_CNT), K(idle_time_us));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (is_paused()) {
|
if (is_paused()) {
|
||||||
@ -197,11 +197,11 @@ int ObMajorMergeScheduler::try_idle(
|
|||||||
LOG_INFO("major_merge_scheduler is paused", K_(tenant_id), K(idle_time_us),
|
LOG_INFO("major_merge_scheduler is paused", K_(tenant_id), K(idle_time_us),
|
||||||
"epoch", get_epoch());
|
"epoch", get_epoch());
|
||||||
if (OB_FAIL(idling_.idle(idle_time_us))) {
|
if (OB_FAIL(idling_.idle(idle_time_us))) {
|
||||||
LOG_WARN("fail to idle", KR(ret));
|
LOG_WARN("fail to idle", KR(ret), K(idle_time_us));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG_INFO("major_merge_scheduler is not paused", K_(tenant_id), K(idle_time_us),
|
LOG_INFO("major_merge_scheduler is not idling", KR(ret), K_(tenant_id), K(idle_time_us),
|
||||||
"epoch", get_epoch());
|
K_(stop), "epoch", get_epoch());
|
||||||
} else if (OB_FAIL(idling_.idle(idle_time_us))) {
|
} else if (OB_FAIL(idling_.idle(idle_time_us))) {
|
||||||
LOG_WARN("fail to idle", KR(ret), K(idle_time_us));
|
LOG_WARN("fail to idle", KR(ret), K(idle_time_us));
|
||||||
}
|
}
|
||||||
|
@ -989,32 +989,70 @@ int ObTabletReplicaChecksumOperator::innner_verify_tablet_replica_checksum(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObTabletReplicaChecksumOperator::get_index_and_data_table_schema(
|
||||||
|
ObSchemaGetterGuard &schema_guard,
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const uint64_t index_table_id,
|
||||||
|
const uint64_t data_table_id,
|
||||||
|
const ObTableSchema *&index_table_schema,
|
||||||
|
const ObTableSchema *&data_table_schema)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_FAIL(schema_guard.get_table_schema(tenant_id, index_table_id, index_table_schema))) {
|
||||||
|
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(index_table_id));
|
||||||
|
} else if (OB_ISNULL(index_table_schema)) {
|
||||||
|
LOG_WARN("index_table_schema is null", K(tenant_id), K(index_table_id));
|
||||||
|
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, data_table_id, data_table_schema))) {
|
||||||
|
LOG_WARN("fail to get table schema", KR(ret), K(tenant_id), K(data_table_id));
|
||||||
|
} else if (OB_ISNULL(data_table_schema)) {
|
||||||
|
LOG_WARN("data_table_schema is null", K(tenant_id), K(data_table_id));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObTabletReplicaChecksumOperator::check_column_checksum(
|
int ObTabletReplicaChecksumOperator::check_column_checksum(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const ObTableSchema &data_table_schema,
|
const ObSimpleTableSchemaV2 &data_simple_schema,
|
||||||
const ObTableSchema &index_table_schema,
|
const ObSimpleTableSchemaV2 &index_simple_schema,
|
||||||
const SCN &compaction_scn,
|
const SCN &compaction_scn,
|
||||||
ObMySQLProxy &sql_proxy,
|
ObMySQLProxy &sql_proxy,
|
||||||
const int64_t expected_epoch)
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const uint64_t index_table_id = index_simple_schema.get_table_id();
|
||||||
|
const uint64_t data_table_id = data_simple_schema.get_table_id();
|
||||||
|
const ObTableSchema *index_table_schema = nullptr;
|
||||||
|
const ObTableSchema *data_table_schema = nullptr;
|
||||||
|
// destruct the schema_guard after validating index column checksum to free memory
|
||||||
|
ObSchemaGetterGuard schema_guard;
|
||||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)
|
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)
|
||||||
|| !data_table_schema.is_valid()
|
|| !data_simple_schema.is_valid()
|
||||||
|| !index_table_schema.is_valid())) {
|
|| !index_simple_schema.is_valid())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid arguments", KR(ret), K(tenant_id), K(data_table_schema), K(index_table_schema));
|
LOG_WARN("invalid arguments", KR(ret), K(tenant_id), K(data_simple_schema), K(index_simple_schema));
|
||||||
|
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(tenant_id, schema_guard))) {
|
||||||
|
LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id));
|
||||||
|
} else if (OB_FAIL(get_index_and_data_table_schema(schema_guard, tenant_id, index_table_id,
|
||||||
|
data_table_id, index_table_schema, data_table_schema))) {
|
||||||
|
LOG_WARN("fail to get index and data table schema", KR(ret), K(index_simple_schema), K(data_simple_schema));
|
||||||
|
} else if (OB_ISNULL(index_table_schema) || OB_ISNULL(data_table_schema)) {
|
||||||
|
// table schemas are changed, and index_table or data_table does not exist in new table schemas.
|
||||||
|
// no need to check index column checksum.
|
||||||
|
ret = OB_TABLE_NOT_EXIST;
|
||||||
|
LOG_WARN("table schema is null", KR(ret), KP(index_table_schema), KP(data_table_schema),
|
||||||
|
K(index_table_id), K(data_table_id));
|
||||||
} else {
|
} else {
|
||||||
const bool is_global_index = index_table_schema.is_global_index_table();
|
const bool is_global_index = index_simple_schema.is_global_index_table();
|
||||||
if (is_global_index) {
|
if (is_global_index) {
|
||||||
if (OB_FAIL(check_global_index_column_checksum(tenant_id, data_table_schema, index_table_schema,
|
if (OB_FAIL(check_global_index_column_checksum(tenant_id, *data_table_schema, *index_table_schema,
|
||||||
compaction_scn, sql_proxy, expected_epoch))) {
|
compaction_scn, sql_proxy, expected_epoch))) {
|
||||||
LOG_WARN("fail to check global index column checksum", KR(ret), K(tenant_id), K(compaction_scn));
|
LOG_WARN("fail to check global index column checksum", KR(ret), K(tenant_id), K(compaction_scn));
|
||||||
}
|
}
|
||||||
} else if (OB_UNLIKELY(index_table_schema.is_spatial_index())) {
|
} else if (OB_UNLIKELY(index_simple_schema.is_spatial_index())) {
|
||||||
// do nothing
|
// do nothing
|
||||||
// spatial index column is different from data table column
|
// spatial index column is different from data table column
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(check_local_index_column_checksum(tenant_id, data_table_schema, index_table_schema,
|
if (OB_FAIL(check_local_index_column_checksum(tenant_id, *data_table_schema, *index_table_schema,
|
||||||
compaction_scn, sql_proxy, expected_epoch))) {
|
compaction_scn, sql_proxy, expected_epoch))) {
|
||||||
LOG_WARN("fail to check local index column checksum", KR(ret), K(tenant_id), K(compaction_scn));
|
LOG_WARN("fail to check local index column checksum", KR(ret), K(tenant_id), K(compaction_scn));
|
||||||
}
|
}
|
||||||
@ -1399,20 +1437,20 @@ int ObTabletReplicaChecksumOperator::find_checksum_item_(
|
|||||||
|
|
||||||
int ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(
|
int ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const ObTableSchema &table_schema,
|
const ObSimpleTableSchemaV2 &simple_schema,
|
||||||
ObMySQLProxy &sql_proxy,
|
ObMySQLProxy &sql_proxy,
|
||||||
ObIArray<ObTabletLSPair> &pairs)
|
ObIArray<ObTabletLSPair> &pairs)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if ((!is_valid_tenant_id(tenant_id)) || (!table_schema.is_valid())) {
|
if ((!is_valid_tenant_id(tenant_id)) || (!simple_schema.is_valid())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||||
} else {
|
} else {
|
||||||
SMART_VAR(ObArray<ObTabletID>, tablet_ids) {
|
SMART_VAR(ObArray<ObTabletID>, tablet_ids) {
|
||||||
if (OB_FAIL(get_table_all_tablet_ids_(table_schema, tablet_ids))) {
|
if (OB_FAIL(get_table_all_tablet_ids_(simple_schema, tablet_ids))) {
|
||||||
LOG_WARN("fail to get table all tablet ids", KR(ret), K(table_schema));
|
LOG_WARN("fail to get table all tablet ids", KR(ret), K(simple_schema));
|
||||||
} else if (tablet_ids.count() > 0) {
|
} else if (tablet_ids.count() > 0) {
|
||||||
const uint64_t table_id = table_schema.get_table_id();
|
const uint64_t table_id = simple_schema.get_table_id();
|
||||||
if (OB_FAIL(get_tablet_ls_pairs(tenant_id, table_id, sql_proxy, tablet_ids, pairs))) {
|
if (OB_FAIL(get_tablet_ls_pairs(tenant_id, table_id, sql_proxy, tablet_ids, pairs))) {
|
||||||
LOG_WARN("fail to get tablet_ls_pairs", KR(ret), K(tenant_id), K(table_id));
|
LOG_WARN("fail to get tablet_ls_pairs", KR(ret), K(tenant_id), K(table_id));
|
||||||
}
|
}
|
||||||
@ -1477,18 +1515,18 @@ int ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(
|
|||||||
int ObTabletReplicaChecksumOperator::get_tablet_replica_checksum_items_(
|
int ObTabletReplicaChecksumOperator::get_tablet_replica_checksum_items_(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
ObMySQLProxy &sql_proxy,
|
ObMySQLProxy &sql_proxy,
|
||||||
const ObTableSchema &table_schema,
|
const ObSimpleTableSchemaV2 &simple_schema,
|
||||||
const SCN &compaction_scn,
|
const SCN &compaction_scn,
|
||||||
ObIArray<ObTabletLSPair> &tablet_pairs,
|
ObIArray<ObTabletLSPair> &tablet_pairs,
|
||||||
ObIArray<ObTabletReplicaChecksumItem> &items)
|
ObIArray<ObTabletReplicaChecksumItem> &items)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if ((!is_valid_tenant_id(tenant_id)) || (!table_schema.is_valid())) {
|
if ((!is_valid_tenant_id(tenant_id)) || (!simple_schema.is_valid())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||||
} else {
|
} else {
|
||||||
const uint64_t table_id = table_schema.get_table_id();
|
const uint64_t table_id = simple_schema.get_table_id();
|
||||||
if (OB_FAIL(get_tablet_ls_pairs(tenant_id, table_schema, sql_proxy, tablet_pairs))) {
|
if (OB_FAIL(get_tablet_ls_pairs(tenant_id, simple_schema, sql_proxy, tablet_pairs))) {
|
||||||
LOG_WARN("fail to get tablet_ls_pairs", KR(ret), K(tenant_id), K(table_id));
|
LOG_WARN("fail to get tablet_ls_pairs", KR(ret), K(tenant_id), K(table_id));
|
||||||
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_get(tenant_id, tablet_pairs, compaction_scn,
|
} else if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_get(tenant_id, tablet_pairs, compaction_scn,
|
||||||
sql_proxy, items, true/*include_larger_than*/))) {
|
sql_proxy, items, true/*include_larger_than*/))) {
|
||||||
@ -1503,17 +1541,17 @@ int ObTabletReplicaChecksumOperator::get_tablet_replica_checksum_items_(
|
|||||||
}
|
}
|
||||||
|
|
||||||
int ObTabletReplicaChecksumOperator::get_table_all_tablet_ids_(
|
int ObTabletReplicaChecksumOperator::get_table_all_tablet_ids_(
|
||||||
const ObTableSchema &table_schema,
|
const ObSimpleTableSchemaV2 &simple_schema,
|
||||||
ObIArray<ObTabletID> &schema_tablet_ids)
|
ObIArray<ObTabletID> &schema_tablet_ids)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_UNLIKELY(!table_schema.is_valid())) {
|
if (OB_UNLIKELY(!simple_schema.is_valid())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid arguments", KR(ret), K(table_schema));
|
LOG_WARN("invalid arguments", KR(ret), K(simple_schema));
|
||||||
} else {
|
} else {
|
||||||
if (table_schema.has_tablet()) {
|
if (simple_schema.has_tablet()) {
|
||||||
if (OB_FAIL(table_schema.get_tablet_ids(schema_tablet_ids))) {
|
if (OB_FAIL(simple_schema.get_tablet_ids(schema_tablet_ids))) {
|
||||||
LOG_WARN("fail to get tablet_ids from table schema", KR(ret), K(table_schema));
|
LOG_WARN("fail to get tablet_ids from table schema", KR(ret), K(simple_schema));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1550,7 +1588,7 @@ int ObTabletReplicaChecksumOperator::check_table_all_tablets_ckm_status_(
|
|||||||
|
|
||||||
int ObTabletReplicaChecksumOperator::need_verify_checksum_(
|
int ObTabletReplicaChecksumOperator::need_verify_checksum_(
|
||||||
const SCN &compaction_scn,
|
const SCN &compaction_scn,
|
||||||
const ObTableSchema &table_schema,
|
const ObSimpleTableSchemaV2 &simple_schema,
|
||||||
const ObIArray<ObTabletReplicaChecksumItem> &items,
|
const ObIArray<ObTabletReplicaChecksumItem> &items,
|
||||||
bool &need_verify,
|
bool &need_verify,
|
||||||
int64_t &ckm_tablet_cnt)
|
int64_t &ckm_tablet_cnt)
|
||||||
@ -1563,8 +1601,8 @@ int ObTabletReplicaChecksumOperator::need_verify_checksum_(
|
|||||||
if (item_cnt <= 0) {
|
if (item_cnt <= 0) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(item_cnt));
|
LOG_WARN("invalid argument", KR(ret), K(item_cnt));
|
||||||
} else if (OB_FAIL(get_table_all_tablet_ids_(table_schema, schema_tablet_ids))) {
|
} else if (OB_FAIL(get_table_all_tablet_ids_(simple_schema, schema_tablet_ids))) {
|
||||||
LOG_WARN("fail to get table all tablet ids", KR(ret), K(table_schema));
|
LOG_WARN("fail to get table all tablet ids", KR(ret), K(simple_schema));
|
||||||
} else if (OB_FAIL(tablet_id_set.create(item_cnt))) {
|
} else if (OB_FAIL(tablet_id_set.create(item_cnt))) {
|
||||||
LOG_WARN("fail to create tablet_id set", KR(ret), K(item_cnt));
|
LOG_WARN("fail to create tablet_id set", KR(ret), K(item_cnt));
|
||||||
} else {
|
} else {
|
||||||
@ -1607,7 +1645,7 @@ int ObTabletReplicaChecksumOperator::need_verify_checksum_(
|
|||||||
need_verify = false;
|
need_verify = false;
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
LOG_INFO("no need to verify checksum, cuz tablet in table_schema has no checksum_item",
|
LOG_INFO("no need to verify checksum, cuz tablet in table_schema has no checksum_item",
|
||||||
"table_id", table_schema.get_table_id(), "tablet_id", tablet_id->id(), K(items));
|
"table_id", simple_schema.get_table_id(), "tablet_id", tablet_id->id(), K(items));
|
||||||
} else if (OB_HASH_EXIST == ret) {
|
} else if (OB_HASH_EXIST == ret) {
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
|
@ -155,7 +155,7 @@ public:
|
|||||||
|
|
||||||
static int get_tablet_ls_pairs(
|
static int get_tablet_ls_pairs(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const schema::ObTableSchema &table_schema,
|
const schema::ObSimpleTableSchemaV2 &simple_schema,
|
||||||
common::ObMySQLProxy &sql_proxy,
|
common::ObMySQLProxy &sql_proxy,
|
||||||
common::ObIArray<ObTabletLSPair> &tablet_ls_pairs);
|
common::ObIArray<ObTabletLSPair> &tablet_ls_pairs);
|
||||||
|
|
||||||
@ -174,8 +174,8 @@ public:
|
|||||||
|
|
||||||
static int check_column_checksum(
|
static int check_column_checksum(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const schema::ObTableSchema &data_table_schema,
|
const schema::ObSimpleTableSchemaV2 &data_simple_schema,
|
||||||
const schema::ObTableSchema &index_table_schema,
|
const schema::ObSimpleTableSchemaV2 &index_simple_schema,
|
||||||
const SCN &compaction_scn,
|
const SCN &compaction_scn,
|
||||||
common::ObMySQLProxy &sql_proxy,
|
common::ObMySQLProxy &sql_proxy,
|
||||||
const int64_t expected_epoch);
|
const int64_t expected_epoch);
|
||||||
@ -253,6 +253,14 @@ private:
|
|||||||
static int innner_verify_tablet_replica_checksum(
|
static int innner_verify_tablet_replica_checksum(
|
||||||
const common::ObIArray<ObTabletReplicaChecksumItem> &ckm_items);
|
const common::ObIArray<ObTabletReplicaChecksumItem> &ckm_items);
|
||||||
|
|
||||||
|
static int get_index_and_data_table_schema(
|
||||||
|
schema::ObSchemaGetterGuard &schema_guard,
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const uint64_t index_table_id,
|
||||||
|
const uint64_t data_table_id,
|
||||||
|
const schema::ObTableSchema *&index_table_schema,
|
||||||
|
const schema::ObTableSchema *&data_table_schema);
|
||||||
|
|
||||||
static int check_global_index_column_checksum(
|
static int check_global_index_column_checksum(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const schema::ObTableSchema &data_table_schema,
|
const schema::ObTableSchema &data_table_schema,
|
||||||
@ -288,13 +296,13 @@ private:
|
|||||||
static int get_tablet_replica_checksum_items_(
|
static int get_tablet_replica_checksum_items_(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
common::ObMySQLProxy &mysql_proxy,
|
common::ObMySQLProxy &mysql_proxy,
|
||||||
const schema::ObTableSchema &table_schema,
|
const schema::ObSimpleTableSchemaV2 &simple_schema,
|
||||||
const SCN &compaction_scn,
|
const SCN &compaction_scn,
|
||||||
common::ObIArray<ObTabletLSPair> &tablet_pairs,
|
common::ObIArray<ObTabletLSPair> &tablet_pairs,
|
||||||
common::ObIArray<ObTabletReplicaChecksumItem> &items);
|
common::ObIArray<ObTabletReplicaChecksumItem> &items);
|
||||||
|
|
||||||
static int get_table_all_tablet_ids_(
|
static int get_table_all_tablet_ids_(
|
||||||
const schema::ObTableSchema &table_schema,
|
const schema::ObSimpleTableSchemaV2 &simple_schema,
|
||||||
common::ObIArray<common::ObTabletID> &schema_tablet_ids);
|
common::ObIArray<common::ObTabletID> &schema_tablet_ids);
|
||||||
|
|
||||||
static int find_checksum_item_(
|
static int find_checksum_item_(
|
||||||
@ -310,7 +318,7 @@ private:
|
|||||||
|
|
||||||
static int need_verify_checksum_(
|
static int need_verify_checksum_(
|
||||||
const SCN &compaction_scn,
|
const SCN &compaction_scn,
|
||||||
const schema::ObTableSchema &table_schema,
|
const schema::ObSimpleTableSchemaV2 &simple_schema,
|
||||||
const common::ObIArray<ObTabletReplicaChecksumItem> &items,
|
const common::ObIArray<ObTabletReplicaChecksumItem> &items,
|
||||||
bool &need_verify,
|
bool &need_verify,
|
||||||
int64_t &ckm_tablet_cnt);
|
int64_t &ckm_tablet_cnt);
|
||||||
|
Reference in New Issue
Block a user