patch 3.1.x modification to open source branch

This commit is contained in:
wd0 2021-07-27 19:42:15 +08:00 committed by wangzelin.wzl
parent 0f38ab7395
commit 91852f9b58
10 changed files with 49 additions and 20 deletions

View File

@ -1802,7 +1802,7 @@ int ObLogStateMgr::on_leader_takeover_()
// only LEADER need call on_leader_takeover
role_change_time_ = ObTimeUtility::current_time();
if (OB_FAIL(partition_service_->submit_pt_update_role_task(partition_key_))) {
CLOG_LOG(WARN, "ps_cb->submit_pt_update_task failed", K_(partition_key), K(ret));
CLOG_LOG(WARN, "ps_cb submit pt update role task failed", K_(partition_key), K(ret));
}
} else {
int64_t before_takeover = ObTimeUtility::current_time();
@ -1872,7 +1872,7 @@ int ObLogStateMgr::on_leader_revoke_()
// only LEADER need call on_leader_revoke
role_change_time_ = ObTimeUtility::current_time();
if (OB_FAIL(partition_service_->submit_pt_update_role_task(partition_key_))) {
CLOG_LOG(WARN, "ps_cb->submit_pt_update_task failed", K_(partition_key), K(ret));
CLOG_LOG(WARN, "ps_cb submit pt update role task failed", K_(partition_key), K(ret));
}
} else {
int64_t before_revoke = ObTimeUtility::current_time();

View File

@ -916,7 +916,7 @@ int ObService::submit_pt_update_role_task(const ObPartitionKey& pkey)
}
int ObService::submit_pt_update_task(
const ObPartitionKey& part_key, const bool need_report_checksum, const bool with_role)
const ObPartitionKey& part_key, const bool need_report_checksum)
{
int ret = OB_SUCCESS;
const bool is_remove = false;
@ -927,7 +927,7 @@ int ObService::submit_pt_update_task(
} else if (!part_key.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(part_key), K(ret));
} else if (OB_FAIL(partition_table_updater_.async_update(part_key, with_role))) {
} else if (OB_FAIL(partition_table_updater_.async_update(part_key, false/*with_role*/))) {
LOG_WARN("async_update failed", K(part_key), K(ret));
} else if (need_report_checksum) {
if (part_key.is_pg()) {
@ -2050,7 +2050,6 @@ int ObService::report_replica(const obrpc::ObReportSingleReplicaArg& arg)
int ret = OB_SUCCESS;
LOG_INFO("receive report replica request", K(arg.partition_key_));
const bool need_report_checksum = false;
const bool with_role_report = true;
ObPartitionArray pkeys;
if (!inited_) {
ret = OB_NOT_INIT;
@ -2079,8 +2078,10 @@ int ObService::report_replica(const obrpc::ObReportSingleReplicaArg& arg)
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(submit_pt_update_task(arg.partition_key_, need_report_checksum, with_role_report))) {
if (OB_FAIL(submit_pt_update_task(arg.partition_key_, need_report_checksum))) {
LOG_WARN("async_update failed", K(arg.partition_key_), K(ret));
} else if (OB_FAIL(submit_pt_update_role_task(arg.partition_key_))) {
LOG_WARN("fail to submit pt update role task", K(ret), "pkey", arg.partition_key_);
}
// update partition meta table, ignore failed
submit_pg_pt_update_task(pkeys);
@ -2904,7 +2905,7 @@ int ObService::report_replica()
ret = OB_SUCCESS;
}
} else if (OB_FAIL(submit_pt_update_task(
partition->get_partition_key(), true /*need report checksum*/, true /*with role report*/))) {
partition->get_partition_key(), true /*need report checksum*/))) {
if (OB_PARTITION_NOT_EXIST == ret) {
// The GC thread is already working,
// and deleted during traversal, the replica has been deleted needs to be avoided blocking the start process
@ -2914,6 +2915,10 @@ int ObService::report_replica()
LOG_WARN(
"submit partition table update task failed", K(ret), "partition_key", partition->get_partition_key());
}
} else if (OB_FAIL(submit_pt_update_role_task(
partition->get_partition_key()))) {
LOG_WARN("fail to submit pt update role task", K(ret),
"pkey", partition->get_partition_key());
} else {
// Update partition meta table without concern for error codes
submit_pg_pt_update_task(pkeys);

View File

@ -101,8 +101,8 @@ public:
////////////////////////////////////////////////////////////////
// ObIPartitionReport interface
virtual int submit_pt_update_task(const common::ObPartitionKey& part_key, const bool need_report_checksum = true,
const bool with_role = false) override;
virtual int submit_pt_update_task(const common::ObPartitionKey& part_key,
const bool need_report_checksum = true) override;
virtual int submit_pt_update_role_task(const common::ObPartitionKey& part_key) override;
virtual void submit_pg_pt_update_task(const common::ObPartitionArray& pg_partitions) override;
virtual int submit_checksum_update_task(const common::ObPartitionKey& part_key, const uint64_t sstable_id,

View File

@ -921,13 +921,13 @@ int ObGlobalIndexBuilder::update_partition_leader_array(common::ObIArray<Partiti
// transaction on the partition not finish, wait and retry
} else if (OB_NOT_MASTER == ret_code || OB_PARTITION_NOT_EXIST == ret_code) {
int64_t part_array_idx = invalid_snapshot_id_array.at(i);
if (part_array_idx >= invalid_snapshot_id_array.count()) {
if (part_array_idx >= partition_leader_array.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("part array id unexpected",
K(ret),
K(part_array_idx),
"invalid snapshot id array count",
invalid_snapshot_id_array.count());
partition_leader_array.count());
} else {
const ObPartitionKey& pkey = partition_leader_array.at(part_array_idx).pkey_;
ObArenaAllocator allocator(ObModIds::OB_RS_PARTITION_TABLE_TEMP);

View File

@ -6255,6 +6255,11 @@ int ObRemoveMemberTask::add_remove_member_task_info(const ObRemoveMemberTaskInfo
return ret;
}
void ObRemoveMemberTask::clear_task_info()
{
task_infos_.reset();
}
// the code try to invoke this func has the responsbility to guarantee
// there is only one task info for each partition
int ObModifyQuorumTask::add_modify_quorum_task_info(const ObModifyQuorumTaskInfo& task_info)

View File

@ -2577,7 +2577,7 @@ public:
int build_remove_member_rpc_arg(obrpc::ObMemberChangeBatchArg& arg) const;
int add_remove_member_task_info(const ObRemoveMemberTaskInfo& task_info);
int assign(const ObRemoveMemberTask& that);
void clear_task_info();
private:
common::ObArray<ObRemoveMemberTaskInfo> task_infos_;
bool admin_force_;

View File

@ -207,10 +207,14 @@ int ObRebalanceTaskUtil::build_batch_remove_member_task(ObRebalanceTaskMgr& task
} else { // cannot accumulate any more, execute the previous task the reset
if (OB_FAIL(task_mgr.add_task(task))) {
LOG_WARN("fail to add task", K(ret));
} else if (OB_FAIL(task_info_array.push_back(task_info))) {
LOG_WARN("fail to add task_info", K(ret), K(task_info));
} else if (OB_FAIL(task.build(task_info_array, leader, comment, check_leader))) {
LOG_WARN("fail to build remove member task", K(ret));
}
task.clear_task_info(); // reset this task whatever
if (OB_SUCC(ret)) {
if (OB_FAIL(task_info_array.push_back(task_info))) {
LOG_WARN("fail to add task_info", K(ret), K(task_info));
} else if (OB_FAIL(task.build(task_info_array, leader, comment, check_leader))) {
LOG_WARN("fail to build remove member task", K(ret));
}
}
}
break;

View File

@ -776,6 +776,7 @@ int ObServerManager::receive_hb(
const ObLeaseRequest& lease_request, uint64_t& server_id, bool& to_alive, bool& update_delay_time_flag)
{
int ret = OB_SUCCESS;
bool zone_exist = true;
to_alive = false;
if (!inited_) {
ret = OB_NOT_INIT;
@ -783,6 +784,15 @@ int ObServerManager::receive_hb(
} else if (!lease_request.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid lease_request", K(lease_request), K(ret));
} else if (OB_UNLIKELY(nullptr == GCTX.root_service_ || nullptr == zone_mgr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rootservice ptr is null", KR(ret));
} else if (GCTX.root_service_->is_full_service()
&& OB_FAIL(zone_mgr_->check_zone_exist(lease_request.zone_, zone_exist))) {
LOG_WARN("fail to check zone exist", KR(ret));
} else if (!zone_exist) {
ret = OB_ZONE_INFO_NOT_EXIST;
LOG_WARN("zone info not exist", KR(ret), K(lease_request));
} else {
SpinWLockGuard guard(server_status_rwlock_);
const bool with_rootserver = rs_addr_ == lease_request.server_;
@ -1024,8 +1034,13 @@ int ObServerManager::set_server_status(const ObLeaseRequest& lease_request, cons
} else if (!lease_request.is_valid() || hb_timestamp <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid lease_request or invalid hb_timestamp", K(lease_request), K(hb_timestamp), K(ret));
} else if (OB_UNLIKELY(nullptr == GCTX.root_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rootservice ptr is null", KR(ret));
} else {
server_status.zone_ = lease_request.zone_;
if (GCTX.root_service_->is_full_service()) {
server_status.zone_ = lease_request.zone_;
}
MEMCPY(server_status.build_version_, lease_request.build_version_, OB_SERVER_VERSION_LENGTH);
server_status.server_ = lease_request.server_;
server_status.sql_port_ = lease_request.inner_port_;

View File

@ -36,7 +36,7 @@ public:
virtual ~ObIPartitionReport()
{}
virtual int submit_pt_update_task(
const common::ObPartitionKey& part_key, const bool need_report_checksum = true, const bool with_role = false) = 0;
const common::ObPartitionKey& part_key, const bool need_report_checksum = true) = 0;
virtual int submit_pt_update_role_task(const common::ObPartitionKey& pkey) = 0;
virtual void submit_pg_pt_update_task(const common::ObPartitionArray& pg_partitions) = 0;
virtual int submit_checksum_update_task(const common::ObPartitionKey& pkey, const uint64_t sstable_id,

View File

@ -29,8 +29,8 @@ public:
{}
virtual ~MockObIPartitionReport()
{}
MOCK_METHOD3(submit_pt_update_task,
int(const common::ObPartitionKey& part_key, const bool need_report_checksum, const bool with_role));
MOCK_METHOD2(submit_pt_update_task,
int(const common::ObPartitionKey& part_key, const bool need_report_checksum));
MOCK_METHOD1(submit_pt_update_role_task, int(const common::ObPartitionKey& part_key));
MOCK_METHOD1(submit_pg_pt_update_task, void(const common::ObPartitionArray& pg_partitions));
MOCK_METHOD1(pt_sync_update, int(const common::ObPartitionKey& part_key));