fix clog_info membership info may be rewritten bug.
This commit is contained in:
parent
0b54fb4b94
commit
192f6aee4f
@ -143,8 +143,8 @@ int ObBaseStorageInfo::deep_copy(const ObBaseStorageInfo& base_storage_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!base_storage_info.is_valid()) {
|
||||
COMMON_LOG(WARN, "invalid arguments", K(base_storage_info));
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
COMMON_LOG(WARN, "invalid arguments", K(ret), K(base_storage_info));
|
||||
} else {
|
||||
version_ = base_storage_info.version_;
|
||||
epoch_id_ = base_storage_info.epoch_id_;
|
||||
@ -164,6 +164,28 @@ int ObBaseStorageInfo::deep_copy(const ObBaseStorageInfo& base_storage_info)
|
||||
OB_SERIALIZE_MEMBER(ObBaseStorageInfo, version_, epoch_id_, proposal_id_, last_replay_log_id_, last_submit_timestamp_,
|
||||
accumulate_checksum_, replica_num_, membership_timestamp_, membership_log_id_, curr_member_list_, ms_proposal_id_);
|
||||
|
||||
int ObBaseStorageInfo::try_update_member_list_info(const ObBaseStorageInfo& base_storage_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!base_storage_info.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
COMMON_LOG(WARN, "invalid arguments", K(ret), K(base_storage_info));
|
||||
} else if (OB_FAIL(try_update_member_list(base_storage_info.membership_log_id_,
|
||||
base_storage_info.membership_timestamp_,
|
||||
base_storage_info.replica_num_,
|
||||
base_storage_info.curr_member_list_,
|
||||
base_storage_info.ms_proposal_id_))) {
|
||||
if (OB_ENTRY_EXIST == ret) {
|
||||
// This ret code means no need update, so we rewrite it to OB_SUCCESS.
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
COMMON_LOG(WARN, "try_update_member_list failed", K(ret), K(base_storage_info));
|
||||
}
|
||||
} else {
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBaseStorageInfo::try_update_member_list(const uint64_t ms_log_id, const int64_t mc_timestamp,
|
||||
const int64_t replica_num, const ObMemberList& mlist, const common::ObProposalID& ms_proposal_id)
|
||||
{
|
||||
@ -171,18 +193,21 @@ int ObBaseStorageInfo::try_update_member_list(const uint64_t ms_log_id, const in
|
||||
if (mc_timestamp == membership_timestamp_) {
|
||||
ret = OB_ENTRY_EXIST;
|
||||
} else if (mc_timestamp > membership_timestamp_) {
|
||||
membership_log_id_ = ms_log_id;
|
||||
membership_timestamp_ = mc_timestamp;
|
||||
replica_num_ = replica_num;
|
||||
curr_member_list_ = mlist;
|
||||
ms_proposal_id_ = ms_proposal_id;
|
||||
COMMON_LOG(INFO,
|
||||
"ObBaseStorageInfo update member list success",
|
||||
K(ms_log_id),
|
||||
K(mc_timestamp),
|
||||
K(replica_num),
|
||||
K(mlist),
|
||||
K(ms_proposal_id));
|
||||
if (OB_FAIL(curr_member_list_.deep_copy(mlist))) {
|
||||
COMMON_LOG(WARN, "try_update_member_list failed", K(ret));
|
||||
} else {
|
||||
membership_log_id_ = ms_log_id;
|
||||
membership_timestamp_ = mc_timestamp;
|
||||
replica_num_ = replica_num;
|
||||
ms_proposal_id_ = ms_proposal_id;
|
||||
COMMON_LOG(INFO,
|
||||
"ObBaseStorageInfo update member list success",
|
||||
K(ms_log_id),
|
||||
K(mc_timestamp),
|
||||
K(replica_num),
|
||||
K(mlist),
|
||||
K(ms_proposal_id));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -96,6 +96,7 @@ public:
|
||||
{
|
||||
return curr_member_list_;
|
||||
}
|
||||
int try_update_member_list_info(const ObBaseStorageInfo& base_storage_info);
|
||||
int try_update_member_list(const uint64_t ms_log_id, const int64_t mc_timestamp, const int64_t replica_num,
|
||||
const common::ObMemberList& mlist, const common::ObProposalID& ms_proposal_id);
|
||||
int standby_force_update_member_list(const uint64_t ms_log_id, const int64_t mc_timestamp, const int64_t replica_num,
|
||||
|
@ -130,11 +130,14 @@ int ObPartGroupBackupTask::check_partition_validation()
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(ERROR, "not inited", K(ret));
|
||||
} else {
|
||||
const bool is_restore_point = false;
|
||||
ObRecoveryPointSchemaFilter backup_filter;
|
||||
const ObPhysicalBackupArg& backup_arg = task_list_[0].arg_.backup_arg_;
|
||||
if (OB_FAIL(backup_filter.init(backup_arg.tenant_id_,
|
||||
backup_arg.backup_schema_version_, /*backup_schema_version*/
|
||||
backup_arg.backup_schema_version_ /*current_schema_version*/))) {
|
||||
const ObPhysicalBackupArg &backup_arg = task_list_[0].arg_.backup_arg_;
|
||||
if (OB_FAIL(backup_filter.init(
|
||||
backup_arg.tenant_id_,
|
||||
is_restore_point,
|
||||
backup_arg.backup_schema_version_, /*backup_schema_version*/
|
||||
backup_arg.backup_schema_version_ /*current_schema_version*/))) {
|
||||
STORAGE_LOG(WARN, "backup schema filter init fail", K(ret));
|
||||
}
|
||||
|
||||
|
@ -12612,6 +12612,7 @@ int ObRestoreTailoredPrepareTask::schedule_restore_tailored_task(ObRestoreTailor
|
||||
ObIPartitionGroup* partition_group = NULL;
|
||||
ObPartitionArray pkeys;
|
||||
const bool include_trans_table = false;
|
||||
const bool is_restore_point = false;
|
||||
const int64_t restore_schema_version = OB_INVALID_VERSION;
|
||||
const int64_t current_schema_version = OB_INVALID_VERSION;
|
||||
ObRecoveryPointSchemaFilter schema_filter;
|
||||
@ -12626,7 +12627,7 @@ int ObRestoreTailoredPrepareTask::schedule_restore_tailored_task(ObRestoreTailor
|
||||
LOG_WARN("partition group should not be NULL", K(ret), KP(partition_group));
|
||||
} else if (OB_FAIL(partition_group->get_all_pg_partition_keys(pkeys, include_trans_table))) {
|
||||
LOG_WARN("failed to get all pg partition keys", K(ret), "pg key", ctx_->replica_op_arg_.key_);
|
||||
} else if (OB_FAIL(schema_filter.init(tenant_id, restore_schema_version, current_schema_version))) {
|
||||
} else if (OB_FAIL(schema_filter.init(tenant_id, is_restore_point, restore_schema_version, current_schema_version))) {
|
||||
LOG_WARN("failed to init schema filter", K(ret), K(tenant_id), K(restore_schema_version));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
|
||||
|
@ -7994,6 +7994,7 @@ int ObPGStorage::update_restore_points(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSEArray<int64_t, 1> snapshot_versions;
|
||||
const bool is_restore_point = true;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("pg storage is not inited", K(ret));
|
||||
@ -8014,7 +8015,7 @@ int ObPGStorage::update_restore_points(
|
||||
LOG_WARN("failed to check restore point exist", K(ret), K_(pkey), K(restore_points));
|
||||
} else if (!is_exist) {
|
||||
if (OB_FAIL(
|
||||
get_restore_point_tables_(snapshot_ts, schema_version, pg_meta, metas, handle, is_ready, is_need))) {
|
||||
get_restore_point_tables_(snapshot_ts, schema_version, is_restore_point, pg_meta, metas, handle, is_ready, is_need))) {
|
||||
LOG_WARN("failed to get restore point sstables", K(ret), K_(pkey), K(snapshot_ts));
|
||||
} else if (!is_need) {
|
||||
// do nothing
|
||||
@ -8046,6 +8047,7 @@ int ObPGStorage::update_backup_points(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSEArray<int64_t, 1> snapshot_versions;
|
||||
const bool is_restore_point = false;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < backup_points.count(); i++) {
|
||||
const int64_t snapshot_ts = backup_points.at(i);
|
||||
const int64_t schema_version = schema_versions.at(i);
|
||||
@ -8058,7 +8060,7 @@ int ObPGStorage::update_backup_points(
|
||||
if (OB_FAIL(recovery_point_data_mgr_.check_backup_point_exist(snapshot_ts, is_exist))) {
|
||||
LOG_WARN("failed to check restore point exist", K(ret), K_(pkey), K(backup_points));
|
||||
} else if (!is_exist) {
|
||||
if (OB_FAIL(get_restore_point_tables_(snapshot_ts, schema_version, pg_meta, metas, handle, is_ready, is_need))) {
|
||||
if (OB_FAIL(get_restore_point_tables_(snapshot_ts, schema_version, is_restore_point, pg_meta, metas, handle, is_ready, is_need))) {
|
||||
LOG_WARN("failed to get restore point sstables", K(ret), K_(pkey), K(snapshot_ts));
|
||||
} else if (!is_need) {
|
||||
// do nothing
|
||||
@ -8105,7 +8107,7 @@ int ObPGStorage::get_backup_partition_meta_data(const ObPartitionKey& pkey, cons
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPGStorage::get_restore_point_tables_(const int64_t snapshot_version, const int64_t schema_version,
|
||||
int ObPGStorage::get_restore_point_tables_(const int64_t snapshot_version, const int64_t schema_version, const bool is_restore_point,
|
||||
ObPartitionGroupMeta& pg_meta, ObIArray<ObPGPartitionStoreMeta>& partition_metas, ObTablesHandle& handle,
|
||||
bool& is_ready, bool& is_need)
|
||||
{
|
||||
@ -8151,7 +8153,7 @@ int ObPGStorage::get_restore_point_tables_(const int64_t snapshot_version, const
|
||||
} else if (FALSE_IT(minor_schema_version = minor_schema_version < max_sstable_schema_version
|
||||
? max_sstable_schema_version
|
||||
: minor_schema_version)) {
|
||||
} else if (OB_FAIL(schema_filter.init(tenant_id, real_schema_version, minor_schema_version))) {
|
||||
} else if (OB_FAIL(schema_filter.init(tenant_id, is_restore_point, real_schema_version, minor_schema_version))) {
|
||||
LOG_WARN("failed to init backup schema checker",
|
||||
K(ret),
|
||||
K(pkey_),
|
||||
|
@ -609,7 +609,7 @@ private:
|
||||
int compat_fill_log_ts(ObArray<ObSSTable*>& replay_tables);
|
||||
|
||||
int replay_for_compat_(ObSSTable* sstable, int64_t& fill_log_ts);
|
||||
int get_restore_point_tables_(const int64_t snapshot_version, const int64_t schema_version,
|
||||
int get_restore_point_tables_(const int64_t snapshot_version, const int64_t schema_version, const bool is_restore_point,
|
||||
ObPartitionGroupMeta& pg_meta, ObIArray<ObPGPartitionStoreMeta>& partition_metas, ObTablesHandle& handle,
|
||||
bool& is_ready, bool& is_need);
|
||||
int alloc_meta_(ObPartitionGroupMeta*& meta);
|
||||
|
@ -223,6 +223,11 @@ int ObSavedStorageInfoV2::update_last_replay_log_info_(const ObPartitionKey& pke
|
||||
STORAGE_LOG(WARN, "base storage info copy failed", K(ret));
|
||||
}
|
||||
}
|
||||
// Here we need update clog_info_'s member_list info, because member_list and last_replay_log_id
|
||||
// may be updated seprately.
|
||||
// fix issue #35065166
|
||||
} else if (OB_FAIL(clog_info_.try_update_member_list_info(old_clog_info))) {
|
||||
STORAGE_LOG(WARN, "clog_info_.try_update_member_list_info failed", K(ret), K(old_clog_info));
|
||||
} else if (OB_FAIL(query_log_info_with_log_id(
|
||||
pkey, clog_info_.get_last_replay_log_id(), timeout, accum_checksum, submit_timestamp, epoch_id))) {
|
||||
STORAGE_LOG(WARN, "failed to query accum checksum", K(ret), K(pkey), K(*this));
|
||||
|
@ -880,6 +880,7 @@ int ObCreatePartitionParam::replace_tenant_id(const uint64_t new_tenant_id)
|
||||
/**********************ObRecoveryPointSchemaFilter***********************/
|
||||
ObRecoveryPointSchemaFilter::ObRecoveryPointSchemaFilter()
|
||||
: is_inited_(false),
|
||||
is_restore_point_(false),
|
||||
tenant_id_(OB_INVALID_ID),
|
||||
tenant_recovery_point_schema_version_(OB_INVALID_VERSION),
|
||||
tenant_current_schema_version_(OB_INVALID_VERSION),
|
||||
@ -896,7 +897,7 @@ bool ObRecoveryPointSchemaFilter::is_inited() const
|
||||
return is_inited_;
|
||||
}
|
||||
|
||||
int ObRecoveryPointSchemaFilter::init(const int64_t tenant_id, const int64_t tenant_recovery_point_schema_version,
|
||||
int ObRecoveryPointSchemaFilter::init(const int64_t tenant_id, const bool is_restore_point, const int64_t tenant_recovery_point_schema_version,
|
||||
const int64_t tenant_current_schema_version)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -926,6 +927,7 @@ int ObRecoveryPointSchemaFilter::init(const int64_t tenant_id, const int64_t ten
|
||||
tenant_id, schema_service, tenant_current_schema_version, current_schema_guard_))) {
|
||||
STORAGE_LOG(WARN, "failed to get tenant current schema guard", K(ret), K(tenant_current_schema_version));
|
||||
} else {
|
||||
is_restore_point_ = is_restore_point;
|
||||
tenant_id_ = tenant_id;
|
||||
tenant_recovery_point_schema_version_ = tenant_recovery_point_schema_version;
|
||||
tenant_current_schema_version_ = tenant_current_schema_version;
|
||||
@ -1046,7 +1048,7 @@ int ObRecoveryPointSchemaFilter::check_table_exist_(
|
||||
} else if (OB_FAIL(schema_guard.get_table_schema(table_id, table_schema))) {
|
||||
STORAGE_LOG(WARN, "failed to get table schema", K(ret), K(table_id));
|
||||
} else if (OB_FAIL(
|
||||
ObBackupRestoreTableSchemaChecker::check_backup_restore_need_skip_table(table_schema, need_skip))) {
|
||||
ObBackupRestoreTableSchemaChecker::check_backup_restore_need_skip_table(table_schema, need_skip, is_restore_point_))) {
|
||||
LOG_WARN("failed to check backup restore need skip table", K(ret), K(table_id));
|
||||
} else if (!need_skip) {
|
||||
// do nothing
|
||||
@ -1246,7 +1248,7 @@ int ObRecoveryPointSchemaFilter::get_table_ids_in_pg_(const ObPartitionKey& pgke
|
||||
|
||||
/***********************ObBackupRestoreTableSchemaChecker***************************/
|
||||
int ObBackupRestoreTableSchemaChecker::check_backup_restore_need_skip_table(
|
||||
const share::schema::ObTableSchema* table_schema, bool& need_skip)
|
||||
const share::schema::ObTableSchema* table_schema, bool& need_skip, const bool is_restore_point)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObIndexStatus index_status;
|
||||
@ -1260,8 +1262,12 @@ int ObBackupRestoreTableSchemaChecker::check_backup_restore_need_skip_table(
|
||||
} else if (table_schema->is_dropped_schema()) {
|
||||
STORAGE_LOG(INFO, "table is dropped, skip it", K(table_id));
|
||||
} else if (FALSE_IT(index_status = table_schema->get_index_status())) {
|
||||
} else if (table_schema->is_index_table() && ObIndexStatus::INDEX_STATUS_AVAILABLE != index_status) {
|
||||
STORAGE_LOG(INFO, "restore table is not available index, skip it", K(index_status), K(*table_schema));
|
||||
} else if (table_schema->is_index_table()
|
||||
&& (is_restore_point ?
|
||||
!is_final_index_status(index_status, table_schema->is_dropped_schema()) :
|
||||
ObIndexStatus::INDEX_STATUS_AVAILABLE != index_status)) {
|
||||
STORAGE_LOG(INFO, "restore table index is not expected status, skip it",
|
||||
K(is_restore_point), K(index_status), K(*table_schema));
|
||||
} else {
|
||||
need_skip = false;
|
||||
}
|
||||
|
@ -610,7 +610,7 @@ public:
|
||||
ObRecoveryPointSchemaFilter();
|
||||
virtual ~ObRecoveryPointSchemaFilter();
|
||||
bool is_inited() const;
|
||||
int init(const int64_t tenant_id, const int64_t tenant_recovery_point_schema_version,
|
||||
int init(const int64_t tenant_id, const bool is_restore_point, const int64_t tenant_recovery_point_schema_version,
|
||||
const int64_t tenant_current_schema_version);
|
||||
// check pg/partition exist
|
||||
int check_partition_exist(const common::ObPartitionKey pkey, bool& is_exist);
|
||||
@ -634,6 +634,7 @@ private:
|
||||
|
||||
private:
|
||||
bool is_inited_;
|
||||
bool is_restore_point_;
|
||||
int64_t tenant_id_;
|
||||
int64_t tenant_recovery_point_schema_version_;
|
||||
int64_t tenant_current_schema_version_;
|
||||
@ -645,7 +646,7 @@ private:
|
||||
|
||||
class ObBackupRestoreTableSchemaChecker {
|
||||
public:
|
||||
static int check_backup_restore_need_skip_table(const share::schema::ObTableSchema* table_schema, bool& need_skip);
|
||||
static int check_backup_restore_need_skip_table(const share::schema::ObTableSchema* table_schema, bool& need_skip, const bool is_restore_point = false);
|
||||
};
|
||||
|
||||
class ObRebuildListener {
|
||||
|
Loading…
x
Reference in New Issue
Block a user