restoring replica do not resp reconfirm fetch log req.

This commit is contained in:
hf0
2021-07-22 00:01:05 +08:00
committed by wangzelin.wzl
parent 7fb21bbbe7
commit 69fa727d91
44 changed files with 990 additions and 334 deletions

View File

@ -2274,6 +2274,37 @@ void ObPartitionService::submit_pt_update_task_(const ObPartitionKey& pkey, cons
UNUSED(ret);
}
int ObPartitionService::submit_pg_pt_update_task_(const ObPartitionKey& pkey)
{
int ret = OB_SUCCESS;
ObPartitionArray pkeys;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
} else if (!pkey.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(pkey));
} else if (pkey.is_pg()) {
ObIPartitionGroupGuard guard;
ObIPartitionGroup* partition = NULL;
if (OB_FAIL(get_partition(pkey, guard))) {
STORAGE_LOG(WARN, "get partition failed, ", K(ret), K(pkey));
} else if (OB_UNLIKELY(NULL == (partition = guard.get_partition_group()))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected error, the partition is NULL, ", K(pkey));
} else if (OB_FAIL(partition->get_all_pg_partition_keys(pkeys))) {
STORAGE_LOG(WARN, "get all pg partition keys error", K(ret), K(pkey));
}
rs_cb_->submit_pg_pt_update_task(pkeys);
} else {
if (OB_FAIL(pkeys.push_back(pkey))) {
STORAGE_LOG(WARN, "pkeys push back error", K(ret), K(pkey));
}
rs_cb_->submit_pg_pt_update_task(pkeys);
}
return ret;
}
/**
create partition group and create partitions
(1) create pg
@ -4462,29 +4493,6 @@ int ObPartitionService::get_role(const common::ObPartitionKey& pkey, common::ObR
return ret;
}
int ObPartitionService::get_role_for_partition_table(const common::ObPartitionKey& pkey, common::ObRole& role) const
{
int ret = OB_SUCCESS;
ObIPartitionGroupGuard guard;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "partition service is not initiated", K(ret));
} else if (OB_UNLIKELY(!pkey.is_valid())) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(DEBUG, "invalid argument", K(ret));
} else if (OB_FAIL(get_partition(pkey, guard))) {
STORAGE_LOG(DEBUG, "get partition failed", K(pkey), K(ret));
} else if (OB_ISNULL(guard.get_partition_group())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret));
} else if (OB_FAIL(guard.get_partition_group()->get_role_for_partition_table(role))) {
STORAGE_LOG(WARN, "get_role_for_partition_table failed", K(pkey), K(ret));
} else {
// do nothing
}
return ret;
}
int ObPartitionService::get_role_unsafe(const common::ObPartitionKey& pkey, common::ObRole& role) const
{
class GetRoleFunctor {
@ -7908,6 +7916,7 @@ int ObPartitionService::process_ms_info_task(ObMsInfoTask& task)
{
int ret = OB_SUCCESS;
ObTimeGuard timeguard("process_ms_info_task", 50L * 1000L);
const int64_t now = ObTimeUtility::current_time();
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
@ -7915,6 +7924,10 @@ int ObPartitionService::process_ms_info_task(ObMsInfoTask& task)
} else if (!task.is_valid()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "invalid argument", K(ret), K(task));
} else if (now - task.get_gen_ts() >= ObSlogWriterQueueThread::SLOG_FLUSH_TASK_TIMEOUT_THRESHOLD) {
if (REACH_TIME_INTERVAL(100 * 1000)) {
STORAGE_LOG(WARN, "task has been timeout, drop it", K(ret), K(task));
}
} else if (OB_FAIL(on_member_change_success(task.get_pkey(),
task.get_log_type(),
task.get_ms_log_id(),
@ -9942,7 +9955,8 @@ int ObPartitionService::submit_ms_info_task(const common::ObPartitionKey& pkey,
K(prev_member_list),
K(curr_member_list));
} else {
ObMsInfoTask task(pkey, server, cluster_id, log_type, ms_log_id, mc_timestamp, replica_num, ms_proposal_id);
const int64_t now = ObTimeUtility::current_time();
ObMsInfoTask task(pkey, server, cluster_id, log_type, ms_log_id, mc_timestamp, replica_num, ms_proposal_id, now);
if (OB_FAIL(task.update_prev_member_list(prev_member_list))) {
STORAGE_LOG(WARN, "update_prev_member_list failed", K(task), K(ret));
} else if (OB_FAIL(task.update_curr_member_list(curr_member_list))) {
@ -10248,6 +10262,8 @@ int ObPartitionService::internal_leader_active(const ObCbTask& active_task)
STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret));
} else if (OB_FAIL(rs_cb_->submit_pt_update_role_task(pkey))) {
STORAGE_LOG(WARN, "internal_leader_active callback failed", K(pkey), K(ret));
} else if (OB_FAIL(submit_pg_pt_update_task_(pkey))) {
STORAGE_LOG(WARN, "submit_pg_pt_update_task_ failed", K(pkey), K(ret));
} else if (LEADER != active_task.role_) {
// only LEADER need exec
ret = OB_ERR_UNEXPECTED;
@ -11941,9 +11957,8 @@ bool ObReplicaOpArg::is_physical_restore_follower() const
bool ObReplicaOpArg::is_FtoL() const
{
return CHANGE_REPLICA_OP == type_
&& ObReplicaType::REPLICA_TYPE_FULL == src_.get_replica_type()
&& ObReplicaType::REPLICA_TYPE_LOGONLY == dst_.get_replica_type();
return CHANGE_REPLICA_OP == type_ && ObReplicaType::REPLICA_TYPE_FULL == src_.get_replica_type() &&
ObReplicaType::REPLICA_TYPE_LOGONLY == dst_.get_replica_type();
}
bool ObReplicaOpArg::is_standby_restore() const