fix handle_table_with_first_tablet_in_sys_ls about special_table_id = OB_INVALID_ID
This commit is contained in:
@ -446,7 +446,7 @@ int ObTabletChecksumValidator::validate_tablet_replica_checksum(
|
|||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
ObCrossClusterTabletChecksumValidator::ObCrossClusterTabletChecksumValidator()
|
ObCrossClusterTabletChecksumValidator::ObCrossClusterTabletChecksumValidator()
|
||||||
: major_merge_start_us_(-1), special_table_id_(OB_INVALID_ID)
|
: major_merge_start_us_(-1)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -832,9 +832,13 @@ int ObCrossClusterTabletChecksumValidator::handle_table_verification_finished(
|
|||||||
LOG_WARN("fail to check if contains first tablet in sys ls", KR(ret), K_(tenant_id), K(pairs));
|
LOG_WARN("fail to check if contains first tablet in sys ls", KR(ret), K_(tenant_id), K(pairs));
|
||||||
} else if (is_containing) {
|
} else if (is_containing) {
|
||||||
// do not write tablet checksum and update report_scn of this table here.
|
// do not write tablet checksum and update report_scn of this table here.
|
||||||
// instead, just record the table_id of this table here. write tablet checksum
|
// instead, write tablet checksum and update report_scn of this table in the end
|
||||||
// and update report_scn of this table in the end of this round of major freeze.
|
// of this round of major freeze.
|
||||||
special_table_id_ = table_id;
|
if (MAJOR_MERGE_SPECIAL_TABLE_ID != table_id) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("table_id of the table containing first tablet in sys ls does not equal"
|
||||||
|
" to 1", KR(ret), K_(tenant_id), K(table_id));
|
||||||
|
}
|
||||||
need_update_report_scn = false;
|
need_update_report_scn = false;
|
||||||
LOG_INFO("this table contains first tablet in sys ls, write tablet checksum and update"
|
LOG_INFO("this table contains first tablet in sys ls, write tablet checksum and update"
|
||||||
" report_scn of this table later", K(table_id), K(pairs));
|
" report_scn of this table later", K(table_id), K(pairs));
|
||||||
@ -892,12 +896,11 @@ int ObCrossClusterTabletChecksumValidator::write_tablet_checksum_at_table_level(
|
|||||||
bool is_exist = false;
|
bool is_exist = false;
|
||||||
FREEZE_TIME_GUARD;
|
FREEZE_TIME_GUARD;
|
||||||
if (OB_UNLIKELY(pairs.empty()
|
if (OB_UNLIKELY(pairs.empty()
|
||||||
|| (!table_compaction_info.is_index_ckm_verified() && (table_id != special_table_id_))
|
|| (!table_compaction_info.is_index_ckm_verified() && (MAJOR_MERGE_SPECIAL_TABLE_ID != table_id))
|
||||||
|| (!table_compaction_info.is_verified() && (table_id == special_table_id_)
|
|| (!table_compaction_info.is_verified() && (MAJOR_MERGE_SPECIAL_TABLE_ID == table_id)))) {
|
||||||
&& (OB_INVALID_ID != special_table_id_)))) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(pairs), K(frozen_scn), K(table_compaction_info),
|
LOG_WARN("invalid argument", KR(ret), K(pairs), K(frozen_scn), K(table_compaction_info),
|
||||||
K(table_id), K_(special_table_id));
|
K(table_id));
|
||||||
} else if (stop) {
|
} else if (stop) {
|
||||||
ret = OB_CANCELED;
|
ret = OB_CANCELED;
|
||||||
LOG_WARN("already stop", KR(ret), K_(tenant_id));
|
LOG_WARN("already stop", KR(ret), K_(tenant_id));
|
||||||
@ -909,9 +912,8 @@ int ObCrossClusterTabletChecksumValidator::write_tablet_checksum_at_table_level(
|
|||||||
if (table_compaction_info.can_skip_verifying()) {
|
if (table_compaction_info.can_skip_verifying()) {
|
||||||
// do not write tablet checksum items for tables that can skip verifying,
|
// do not write tablet checksum items for tables that can skip verifying,
|
||||||
// since tablet checksum items of these tables must have already been written
|
// since tablet checksum items of these tables must have already been written
|
||||||
} else if ((table_compaction_info.is_index_ckm_verified() && (table_id != special_table_id_))
|
} else if ((table_compaction_info.is_index_ckm_verified() && (MAJOR_MERGE_SPECIAL_TABLE_ID != table_id))
|
||||||
|| (table_compaction_info.is_verified() && (table_id == special_table_id_)
|
|| (table_compaction_info.is_verified() && (MAJOR_MERGE_SPECIAL_TABLE_ID == table_id))) {
|
||||||
&& (OB_INVALID_ID != special_table_id_))) {
|
|
||||||
const int64_t IMMEDIATE_RETRY_CNT = 5;
|
const int64_t IMMEDIATE_RETRY_CNT = 5;
|
||||||
int64_t fail_count = 0;
|
int64_t fail_count = 0;
|
||||||
int64_t sleep_time_us = 200 * 1000; // 200 ms
|
int64_t sleep_time_us = 200 * 1000; // 200 ms
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
#include "share/ob_tablet_replica_checksum_iterator.h"
|
#include "share/ob_tablet_replica_checksum_iterator.h"
|
||||||
#include "share/ob_freeze_info_proxy.h"
|
#include "share/ob_freeze_info_proxy.h"
|
||||||
#include "share/ob_zone_merge_info.h"
|
#include "share/ob_zone_merge_info.h"
|
||||||
|
#include "share/inner_table/ob_inner_table_schema_constants.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -161,7 +162,6 @@ public:
|
|||||||
{
|
{
|
||||||
major_merge_start_us_ = major_merge_start_us;
|
major_merge_start_us_ = major_merge_start_us;
|
||||||
}
|
}
|
||||||
uint64_t get_special_table_id() const { return special_table_id_; }
|
|
||||||
// sync data from __all_tablet_replica_checksum to __all_tablet_checksum at table granularity
|
// sync data from __all_tablet_replica_checksum to __all_tablet_checksum at table granularity
|
||||||
int write_tablet_checksum_at_table_level(const volatile bool &stop,
|
int write_tablet_checksum_at_table_level(const volatile bool &stop,
|
||||||
const ObArray<share::ObTabletLSPair> &pairs,
|
const ObArray<share::ObTabletLSPair> &pairs,
|
||||||
@ -169,6 +169,8 @@ public:
|
|||||||
const share::ObTableCompactionInfo &table_compaction_info,
|
const share::ObTableCompactionInfo &table_compaction_info,
|
||||||
const uint64_t table_id,
|
const uint64_t table_id,
|
||||||
const int64_t expected_epoch);
|
const int64_t expected_epoch);
|
||||||
|
// table_id of the table containing first tablet in sys ls
|
||||||
|
static const uint64_t MAJOR_MERGE_SPECIAL_TABLE_ID = share::OB_ALL_CORE_TABLE_TID;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
virtual int check_all_table_verification_finished(const volatile bool &stop,
|
virtual int check_all_table_verification_finished(const volatile bool &stop,
|
||||||
@ -213,8 +215,6 @@ private:
|
|||||||
const static int64_t MAX_BATCH_INSERT_COUNT = 100;
|
const static int64_t MAX_BATCH_INSERT_COUNT = 100;
|
||||||
// record the time when starting to major merge, used for check_waiting_tablet_checksum_timeout
|
// record the time when starting to major merge, used for check_waiting_tablet_checksum_timeout
|
||||||
int64_t major_merge_start_us_;
|
int64_t major_merge_start_us_;
|
||||||
// record the table_id of the table which contains first tablet in sys ls
|
|
||||||
uint64_t special_table_id_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Mainly to verify checksum between (global and local) index table and main table
|
// Mainly to verify checksum between (global and local) index table and main table
|
||||||
|
@ -157,33 +157,31 @@ int ObMajorMergeProgressChecker::handle_table_with_first_tablet_in_sys_ls(
|
|||||||
ObSchemaGetterGuard schema_guard;
|
ObSchemaGetterGuard schema_guard;
|
||||||
const ObSimpleTableSchemaV2 *simple_schema = nullptr;
|
const ObSimpleTableSchemaV2 *simple_schema = nullptr;
|
||||||
ObTableCompactionInfo cur_compaction_info;
|
ObTableCompactionInfo cur_compaction_info;
|
||||||
const uint64_t special_table_id = cross_cluster_validator_.get_special_table_id();
|
// table_id of the table containing first tablet in sys ls
|
||||||
|
const uint64_t major_merge_special_table_id = ObCrossClusterTabletChecksumValidator::MAJOR_MERGE_SPECIAL_TABLE_ID;
|
||||||
// only primary major_freeze_service need to handle table with frist tablet in sys ls here
|
// only primary major_freeze_service need to handle table with frist tablet in sys ls here
|
||||||
if (!is_primary_service) {
|
if (!is_primary_service) {
|
||||||
} else if (OB_UNLIKELY(OB_INVALID_ID == special_table_id)) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
|
||||||
LOG_WARN("invalid special table id", KR(ret), K_(tenant_id), K(special_table_id));
|
|
||||||
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(tenant_id_, schema_guard))) {
|
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_full_schema_guard(tenant_id_, schema_guard))) {
|
||||||
LOG_WARN("fail to get tenant schema guard", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to get tenant schema guard", KR(ret), K_(tenant_id));
|
||||||
} else if (OB_FAIL(schema_guard.get_simple_table_schema(tenant_id_, special_table_id, simple_schema))) {
|
} else if (OB_FAIL(schema_guard.get_simple_table_schema(tenant_id_, major_merge_special_table_id, simple_schema))) {
|
||||||
LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K(special_table_id));
|
LOG_WARN("fail to get table schema", KR(ret), K_(tenant_id), K(major_merge_special_table_id));
|
||||||
} else if (OB_ISNULL(simple_schema)) {
|
} else if (OB_ISNULL(simple_schema)) {
|
||||||
ret = OB_TABLE_NOT_EXIST;
|
ret = OB_TABLE_NOT_EXIST;
|
||||||
LOG_WARN("table schema is null", KR(ret), K_(tenant_id), K(special_table_id));
|
LOG_WARN("table schema is null", KR(ret), K_(tenant_id), K(major_merge_special_table_id));
|
||||||
} else {
|
} else {
|
||||||
SMART_VAR(ObArray<ObTabletLSPair>, pairs) {
|
SMART_VAR(ObArray<ObTabletLSPair>, pairs) {
|
||||||
FREEZE_TIME_GUARD;
|
FREEZE_TIME_GUARD;
|
||||||
if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_, *simple_schema,
|
if (OB_FAIL(ObTabletReplicaChecksumOperator::get_tablet_ls_pairs(tenant_id_, *simple_schema,
|
||||||
*sql_proxy_, pairs))) {
|
*sql_proxy_, pairs))) {
|
||||||
LOG_WARN("fail to get tablet_ls pairs", KR(ret), K_(tenant_id), K(special_table_id));
|
LOG_WARN("fail to get tablet_ls pairs", KR(ret), K_(tenant_id), K(major_merge_special_table_id));
|
||||||
} else if (OB_UNLIKELY(pairs.count() < 1)) {
|
} else if (OB_UNLIKELY(pairs.count() < 1)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("fail to get tablet_ls pairs of current table schema", KR(ret), K_(tenant_id),
|
LOG_WARN("fail to get tablet_ls pairs of current table schema", KR(ret), K_(tenant_id),
|
||||||
K(special_table_id));
|
K(major_merge_special_table_id));
|
||||||
} else if (OB_FAIL(table_compaction_map_.get_refactored(special_table_id, cur_compaction_info))) {
|
} else if (OB_FAIL(table_compaction_map_.get_refactored(major_merge_special_table_id, cur_compaction_info))) {
|
||||||
LOG_WARN("fail to get refactored", KR(ret), K(special_table_id));
|
LOG_WARN("fail to get refactored", KR(ret), K(major_merge_special_table_id));
|
||||||
} else if (OB_FAIL(cross_cluster_validator_.write_tablet_checksum_at_table_level(stop, pairs,
|
} else if (OB_FAIL(cross_cluster_validator_.write_tablet_checksum_at_table_level(stop, pairs,
|
||||||
global_broadcast_scn, cur_compaction_info, special_table_id, expected_epoch))) {
|
global_broadcast_scn, cur_compaction_info, major_merge_special_table_id, expected_epoch))) {
|
||||||
LOG_WARN("fail to write tablet checksum at table level", KR(ret), K_(tenant_id), K(pairs));
|
LOG_WARN("fail to write tablet checksum at table level", KR(ret), K_(tenant_id), K(pairs));
|
||||||
} else if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
} else if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
||||||
tenant_id_, global_broadcast_scn.get_val_for_tx(),
|
tenant_id_, global_broadcast_scn.get_val_for_tx(),
|
||||||
|
Reference in New Issue
Block a user