fix misuse of async rpc while create ls
This commit is contained in:
parent
cb953e3049
commit
4dfd67d8fb
@ -654,11 +654,16 @@ int ObAdminRefreshSchema::call_server(const ObAddr &server)
|
||||
arg.schema_info_ = schema_info_;
|
||||
ObArray<int> return_code_array;
|
||||
ObSwitchSchemaProxy proxy(*GCTX.srv_rpc_proxy_, &ObSrvRpcProxy::switch_schema);
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const int64_t timeout_ts = ctx.get_timeout(0);
|
||||
if (OB_FAIL(proxy.call(server, timeout_ts, arg))) {
|
||||
LOG_WARN("notify switch schema failed", KR(ret), K(server), K_(schema_version), K_(schema_info));
|
||||
} else if (OB_FAIL(proxy.wait_all(return_code_array))) {
|
||||
LOG_WARN("fail to wait all", KR(ret), K(server));
|
||||
}
|
||||
|
||||
if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("fail to wait all", KR(ret), KR(tmp_ret), K(server));
|
||||
} else if (OB_FAIL(ret)) {
|
||||
} else if (OB_UNLIKELY(return_code_array.empty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("return_code_array is empty", KR(ret), K(server));
|
||||
|
@ -420,10 +420,10 @@ int ObTenantRoleTransitionService::get_ls_access_mode_(ObIArray<LSAccessModeInfo
|
||||
} else if (OB_FAIL(GCTX.location_service_->get_leader(
|
||||
GCONF.cluster_id, tenant_id_, info.ls_id_, false, leader))) {
|
||||
LOG_WARN("failed to get leader", KR(ret), K(tenant_id_), K(info));
|
||||
|
||||
} else if (OB_FAIL(arg.init(tenant_id_, info.ls_id_))) {
|
||||
LOG_WARN("failed to init arg", KR(ret), K(tenant_id_), K(info));
|
||||
} else if (OB_FAIL(proxy.call(leader, timeout, GCONF.cluster_id, tenant_id_, arg))) {
|
||||
//can not ignore error of each ls
|
||||
LOG_WARN("failed to send rpc", KR(ret), K(leader), K(timeout), K(tenant_id_), K(arg));
|
||||
} else {
|
||||
rpc_count++;
|
||||
@ -514,14 +514,18 @@ int ObTenantRoleTransitionService::do_change_ls_access_mode_(const ObIArray<LSAc
|
||||
if (OB_FAIL(arg.init(tenant_id_, info.ls_id_, info.mode_version_, target_access_mode, ref_scn))) {
|
||||
LOG_WARN("failed to init arg", KR(ret), K(info), K(target_access_mode), K(ref_scn));
|
||||
} else if (OB_FAIL(proxy.call(info.leader_addr_, timeout, GCONF.cluster_id, tenant_id_, arg))) {
|
||||
//can not ignore of each ls
|
||||
LOG_WARN("failed to send rpc", KR(ret), K(info), K(timeout), K(tenant_id_), K(arg));
|
||||
}
|
||||
}//end for
|
||||
//result
|
||||
ObArray<int> return_code_array;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const int64_t rpc_count = ls_access_info.count();
|
||||
if (FAILEDx(proxy.wait_all(return_code_array))) {
|
||||
LOG_WARN("wait all batch result failed", KR(ret));
|
||||
if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret));
|
||||
} else if (OB_FAIL(ret)) {
|
||||
} else if (rpc_count != return_code_array.count() ||
|
||||
rpc_count != proxy.get_args().count() ||
|
||||
rpc_count != proxy.get_results().count()) {
|
||||
|
@ -366,6 +366,8 @@ int ObLSCreator::create_ls_(const ObILSAddr &addrs,
|
||||
} else {
|
||||
obrpc::ObCreateLSArg arg;
|
||||
int64_t rpc_count = 0;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObArray<int> return_code_array;
|
||||
lib::Worker::CompatMode new_compat_mode = compat_mode == ORACLE_MODE ?
|
||||
lib::Worker::CompatMode::ORACLE :
|
||||
lib::Worker::CompatMode::MYSQL;
|
||||
@ -378,16 +380,21 @@ int ObLSCreator::create_ls_(const ObILSAddr &addrs,
|
||||
create_with_palf, palf_base_info))) {
|
||||
LOG_WARN("failed to init create log stream arg", KR(ret), K(addr), K(create_with_palf),
|
||||
K_(id), K_(tenant_id), K(tenant_info), K(create_scn), K(new_compat_mode), K(palf_base_info));
|
||||
} else if (OB_FAIL(create_ls_proxy_.call(addr.addr_, ctx.get_timeout(),
|
||||
} else if (OB_TMP_FAIL(create_ls_proxy_.call(addr.addr_, ctx.get_timeout(),
|
||||
GCONF.cluster_id, tenant_id_, arg))) {
|
||||
LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(ctx.get_timeout()),
|
||||
LOG_WARN("failed to all async rpc", KR(tmp_ret), K(addr), K(ctx.get_timeout()),
|
||||
K(arg), K(tenant_id_));
|
||||
} else {
|
||||
rpc_count++;
|
||||
}
|
||||
}
|
||||
if (FAILEDx(check_create_ls_result_(rpc_count, paxos_replica_num, member_list))) {
|
||||
LOG_WARN("failed to check ls result", KR(ret), K(rpc_count), K(paxos_replica_num));
|
||||
//wait all
|
||||
if (OB_TMP_FAIL(create_ls_proxy_.wait_all(return_code_array))) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("failed to wait all async rpc", KR(ret), KR(tmp_ret), K(rpc_count));
|
||||
}
|
||||
if (FAILEDx(check_create_ls_result_(rpc_count, paxos_replica_num, return_code_array, member_list))) {
|
||||
LOG_WARN("failed to check ls result", KR(ret), K(rpc_count), K(paxos_replica_num), K(return_code_array));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -396,18 +403,15 @@ int ObLSCreator::create_ls_(const ObILSAddr &addrs,
|
||||
|
||||
int ObLSCreator::check_create_ls_result_(const int64_t rpc_count,
|
||||
const int64_t paxos_replica_num,
|
||||
const ObIArray<int> &return_code_array,
|
||||
common::ObMemberList &member_list)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArray<int> return_code_array;
|
||||
member_list.reset();
|
||||
if (OB_UNLIKELY(!is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret));
|
||||
} else if (OB_FAIL(create_ls_proxy_.wait_all(return_code_array))) {
|
||||
LOG_WARN("wait all batch result failed", KR(ret));
|
||||
} else if (rpc_count != return_code_array.count()
|
||||
|| rpc_count != create_ls_proxy_.get_args().count()
|
||||
|| rpc_count != create_ls_proxy_.get_results().count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("rpc count not equal to result count", KR(ret), K(rpc_count),
|
||||
@ -425,12 +429,11 @@ int ObLSCreator::check_create_ls_result_(const int64_t rpc_count,
|
||||
} else if (OB_SUCCESS != result->get_result()) {
|
||||
LOG_WARN("rpc is failed", KR(ret), K(*result), K(i));
|
||||
} else {
|
||||
const obrpc::ObCreateLSArg &arg = create_ls_proxy_.get_args().at(i);
|
||||
const ObAddr &addr = create_ls_proxy_.get_dests().at(i);
|
||||
if (common::ObReplicaTypeCheck::is_paxos_replica_V2(arg.get_replica_type())) {
|
||||
if (OB_FAIL(member_list.add_member(ObMember(addr, timestamp)))) {
|
||||
LOG_WARN("failed to add member", KR(ret), K(addr));
|
||||
}
|
||||
//TODO other replica type
|
||||
//can not get replica type from arg, arg and result is not match
|
||||
if (OB_FAIL(member_list.add_member(ObMember(addr, timestamp)))) {
|
||||
LOG_WARN("failed to add member", KR(ret), K(addr));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -470,6 +473,7 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list,
|
||||
const int64_t paxos_replica_num)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
DEBUG_SYNC(BEFORE_SET_LS_MEMBER_LIST);
|
||||
if (OB_UNLIKELY(!is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret));
|
||||
@ -484,6 +488,8 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list,
|
||||
} else {
|
||||
ObSetMemberListArgV2 arg;
|
||||
int64_t rpc_count = 0;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObArray<int> return_code_array;
|
||||
if (OB_FAIL(arg.init(tenant_id_, id_, paxos_replica_num, member_list))) {
|
||||
LOG_WARN("failed to init set member list arg", KR(ret), K_(id), K_(tenant_id),
|
||||
K(paxos_replica_num), K(member_list));
|
||||
@ -492,16 +498,23 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list,
|
||||
ObAddr addr;
|
||||
if (OB_FAIL(member_list.get_server_by_index(i, addr))) {
|
||||
LOG_WARN("failed to get member by index", KR(ret), K(i), K(member_list));
|
||||
} else if (OB_FAIL(set_member_list_proxy_.call(addr, ctx.get_timeout(),
|
||||
} else if (OB_TMP_FAIL(set_member_list_proxy_.call(addr, ctx.get_timeout(),
|
||||
GCONF.cluster_id, tenant_id_, arg))) {
|
||||
LOG_WARN("failed to set member list", KR(ret), K(ctx.get_timeout()), K(arg),
|
||||
LOG_WARN("failed to set member list", KR(tmp_ret), K(ctx.get_timeout()), K(arg),
|
||||
K(tenant_id_));
|
||||
} else {
|
||||
rpc_count++;
|
||||
}
|
||||
}
|
||||
if (FAILEDx(check_set_memberlist_result_(rpc_count, paxos_replica_num))) {
|
||||
LOG_WARN("failed to check set member liset result", KR(ret), K(rpc_count), K(paxos_replica_num));
|
||||
|
||||
if (OB_TMP_FAIL(set_member_list_proxy_.wait_all(return_code_array))) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("failed to wait all async rpc", KR(ret), KR(tmp_ret), K(rpc_count));
|
||||
|
||||
}
|
||||
if (FAILEDx(check_set_memberlist_result_(rpc_count, return_code_array, paxos_replica_num))) {
|
||||
LOG_WARN("failed to check set member liset result", KR(ret), K(rpc_count),
|
||||
K(paxos_replica_num), K(return_code_array));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -510,17 +523,14 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list,
|
||||
}
|
||||
|
||||
int ObLSCreator::check_set_memberlist_result_(const int64_t rpc_count,
|
||||
const ObIArray<int> &return_code_array,
|
||||
const int64_t paxos_replica_num)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArray<int> return_code_array;
|
||||
if (OB_UNLIKELY(!is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret));
|
||||
} else if (OB_FAIL(set_member_list_proxy_.wait_all(return_code_array))) {
|
||||
LOG_WARN("wait all batch result failed", KR(ret));
|
||||
} else if (rpc_count != return_code_array.count()
|
||||
|| rpc_count != set_member_list_proxy_.get_args().count()
|
||||
|| rpc_count != set_member_list_proxy_.get_results().count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("rpc count not equal to result count", KR(ret), K(rpc_count),
|
||||
|
@ -149,8 +149,10 @@ private:
|
||||
ObLSReplicaAddr &ls_replica_addr);
|
||||
int check_create_ls_result_(const int64_t rpc_count,
|
||||
const int64_t paxos_replica_num,
|
||||
const ObIArray<int> &return_code_array,
|
||||
common::ObMemberList &member_list);
|
||||
int check_set_memberlist_result_(const int64_t rpc_count,
|
||||
const ObIArray<int> &return_code_array,
|
||||
const int64_t paxos_replica_num);
|
||||
|
||||
private:
|
||||
|
@ -430,6 +430,7 @@ class ObString;
|
||||
ACT(DDL_CHECK_TABLET_MERGE_STATUS,)\
|
||||
ACT(MODIFY_HIDDEN_TABLE_NOT_NULL_COLUMN_STATE_BEFORE_PUBLISH_SCHEMA,)\
|
||||
ACT(AFTER_MIGRATION_FETCH_TABLET_INFO,)\
|
||||
ACT(BEFORE_SET_LS_MEMBER_LIST,)\
|
||||
ACT(MAX_DEBUG_SYNC_POINT,)
|
||||
|
||||
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
||||
|
Loading…
x
Reference in New Issue
Block a user