From 4dfd67d8fb96753c3b1d696bc65ea3a90e47a8f8 Mon Sep 17 00:00:00 2001 From: maosy <630014370@qq.com> Date: Fri, 6 Jan 2023 08:38:08 +0000 Subject: [PATCH] fix misuse of async rpc while create ls --- src/rootserver/ob_system_admin_util.cpp | 9 +++- .../ob_tenant_role_transition_service.cpp | 10 ++-- src/share/ls/ob_ls_creator.cpp | 52 +++++++++++-------- src/share/ls/ob_ls_creator.h | 2 + src/share/ob_debug_sync_point.h | 1 + 5 files changed, 48 insertions(+), 26 deletions(-) diff --git a/src/rootserver/ob_system_admin_util.cpp b/src/rootserver/ob_system_admin_util.cpp index d5b6b5824..4c6f8f351 100644 --- a/src/rootserver/ob_system_admin_util.cpp +++ b/src/rootserver/ob_system_admin_util.cpp @@ -654,11 +654,16 @@ int ObAdminRefreshSchema::call_server(const ObAddr &server) arg.schema_info_ = schema_info_; ObArray return_code_array; ObSwitchSchemaProxy proxy(*GCTX.srv_rpc_proxy_, &ObSrvRpcProxy::switch_schema); + int tmp_ret = OB_SUCCESS; const int64_t timeout_ts = ctx.get_timeout(0); if (OB_FAIL(proxy.call(server, timeout_ts, arg))) { LOG_WARN("notify switch schema failed", KR(ret), K(server), K_(schema_version), K_(schema_info)); - } else if (OB_FAIL(proxy.wait_all(return_code_array))) { - LOG_WARN("fail to wait all", KR(ret), K(server)); + } + + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { + ret = OB_SUCC(ret) ? tmp_ret : ret; + LOG_WARN("fail to wait all", KR(ret), KR(tmp_ret), K(server)); + } else if (OB_FAIL(ret)) { } else if (OB_UNLIKELY(return_code_array.empty())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("return_code_array is empty", KR(ret), K(server)); diff --git a/src/rootserver/ob_tenant_role_transition_service.cpp b/src/rootserver/ob_tenant_role_transition_service.cpp index 6d64cabaa..3a4e658b2 100644 --- a/src/rootserver/ob_tenant_role_transition_service.cpp +++ b/src/rootserver/ob_tenant_role_transition_service.cpp @@ -420,10 +420,10 @@ int ObTenantRoleTransitionService::get_ls_access_mode_(ObIArrayget_leader( GCONF.cluster_id, tenant_id_, info.ls_id_, false, leader))) { LOG_WARN("failed to get leader", KR(ret), K(tenant_id_), K(info)); - } else if (OB_FAIL(arg.init(tenant_id_, info.ls_id_))) { LOG_WARN("failed to init arg", KR(ret), K(tenant_id_), K(info)); } else if (OB_FAIL(proxy.call(leader, timeout, GCONF.cluster_id, tenant_id_, arg))) { + //can not ignore error of each ls LOG_WARN("failed to send rpc", KR(ret), K(leader), K(timeout), K(tenant_id_), K(arg)); } else { rpc_count++; @@ -514,14 +514,18 @@ int ObTenantRoleTransitionService::do_change_ls_access_mode_(const ObIArray return_code_array; + int tmp_ret = OB_SUCCESS; const int64_t rpc_count = ls_access_info.count(); - if (FAILEDx(proxy.wait_all(return_code_array))) { - LOG_WARN("wait all batch result failed", KR(ret)); + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { + ret = OB_SUCC(ret) ? tmp_ret : ret; + LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret)); + } else if (OB_FAIL(ret)) { } else if (rpc_count != return_code_array.count() || rpc_count != proxy.get_args().count() || rpc_count != proxy.get_results().count()) { diff --git a/src/share/ls/ob_ls_creator.cpp b/src/share/ls/ob_ls_creator.cpp index 0b3f987f7..8adf61d22 100644 --- a/src/share/ls/ob_ls_creator.cpp +++ b/src/share/ls/ob_ls_creator.cpp @@ -366,6 +366,8 @@ int ObLSCreator::create_ls_(const ObILSAddr &addrs, } else { obrpc::ObCreateLSArg arg; int64_t rpc_count = 0; + int tmp_ret = OB_SUCCESS; + ObArray return_code_array; lib::Worker::CompatMode new_compat_mode = compat_mode == ORACLE_MODE ? lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL; @@ -378,16 +380,21 @@ int ObLSCreator::create_ls_(const ObILSAddr &addrs, create_with_palf, palf_base_info))) { LOG_WARN("failed to init create log stream arg", KR(ret), K(addr), K(create_with_palf), K_(id), K_(tenant_id), K(tenant_info), K(create_scn), K(new_compat_mode), K(palf_base_info)); - } else if (OB_FAIL(create_ls_proxy_.call(addr.addr_, ctx.get_timeout(), + } else if (OB_TMP_FAIL(create_ls_proxy_.call(addr.addr_, ctx.get_timeout(), GCONF.cluster_id, tenant_id_, arg))) { - LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(ctx.get_timeout()), + LOG_WARN("failed to all async rpc", KR(tmp_ret), K(addr), K(ctx.get_timeout()), K(arg), K(tenant_id_)); } else { rpc_count++; } } - if (FAILEDx(check_create_ls_result_(rpc_count, paxos_replica_num, member_list))) { - LOG_WARN("failed to check ls result", KR(ret), K(rpc_count), K(paxos_replica_num)); + //wait all + if (OB_TMP_FAIL(create_ls_proxy_.wait_all(return_code_array))) { + ret = OB_SUCC(ret) ? tmp_ret : ret; + LOG_WARN("failed to wait all async rpc", KR(ret), KR(tmp_ret), K(rpc_count)); + } + if (FAILEDx(check_create_ls_result_(rpc_count, paxos_replica_num, return_code_array, member_list))) { + LOG_WARN("failed to check ls result", KR(ret), K(rpc_count), K(paxos_replica_num), K(return_code_array)); } } } @@ -396,18 +403,15 @@ int ObLSCreator::create_ls_(const ObILSAddr &addrs, int ObLSCreator::check_create_ls_result_(const int64_t rpc_count, const int64_t paxos_replica_num, + const ObIArray &return_code_array, common::ObMemberList &member_list) { int ret = OB_SUCCESS; - ObArray return_code_array; member_list.reset(); if (OB_UNLIKELY(!is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret)); - } else if (OB_FAIL(create_ls_proxy_.wait_all(return_code_array))) { - LOG_WARN("wait all batch result failed", KR(ret)); } else if (rpc_count != return_code_array.count() - || rpc_count != create_ls_proxy_.get_args().count() || rpc_count != create_ls_proxy_.get_results().count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("rpc count not equal to result count", KR(ret), K(rpc_count), @@ -425,12 +429,11 @@ int ObLSCreator::check_create_ls_result_(const int64_t rpc_count, } else if (OB_SUCCESS != result->get_result()) { LOG_WARN("rpc is failed", KR(ret), K(*result), K(i)); } else { - const obrpc::ObCreateLSArg &arg = create_ls_proxy_.get_args().at(i); const ObAddr &addr = create_ls_proxy_.get_dests().at(i); - if (common::ObReplicaTypeCheck::is_paxos_replica_V2(arg.get_replica_type())) { - if (OB_FAIL(member_list.add_member(ObMember(addr, timestamp)))) { - LOG_WARN("failed to add member", KR(ret), K(addr)); - } + //TODO other replica type + //can not get replica type from arg, arg and result is not match + if (OB_FAIL(member_list.add_member(ObMember(addr, timestamp)))) { + LOG_WARN("failed to add member", KR(ret), K(addr)); } } } @@ -470,6 +473,7 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list, const int64_t paxos_replica_num) { int ret = OB_SUCCESS; + DEBUG_SYNC(BEFORE_SET_LS_MEMBER_LIST); if (OB_UNLIKELY(!is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret)); @@ -484,6 +488,8 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list, } else { ObSetMemberListArgV2 arg; int64_t rpc_count = 0; + int tmp_ret = OB_SUCCESS; + ObArray return_code_array; if (OB_FAIL(arg.init(tenant_id_, id_, paxos_replica_num, member_list))) { LOG_WARN("failed to init set member list arg", KR(ret), K_(id), K_(tenant_id), K(paxos_replica_num), K(member_list)); @@ -492,16 +498,23 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list, ObAddr addr; if (OB_FAIL(member_list.get_server_by_index(i, addr))) { LOG_WARN("failed to get member by index", KR(ret), K(i), K(member_list)); - } else if (OB_FAIL(set_member_list_proxy_.call(addr, ctx.get_timeout(), + } else if (OB_TMP_FAIL(set_member_list_proxy_.call(addr, ctx.get_timeout(), GCONF.cluster_id, tenant_id_, arg))) { - LOG_WARN("failed to set member list", KR(ret), K(ctx.get_timeout()), K(arg), + LOG_WARN("failed to set member list", KR(tmp_ret), K(ctx.get_timeout()), K(arg), K(tenant_id_)); } else { rpc_count++; } } - if (FAILEDx(check_set_memberlist_result_(rpc_count, paxos_replica_num))) { - LOG_WARN("failed to check set member liset result", KR(ret), K(rpc_count), K(paxos_replica_num)); + + if (OB_TMP_FAIL(set_member_list_proxy_.wait_all(return_code_array))) { + ret = OB_SUCC(ret) ? tmp_ret : ret; + LOG_WARN("failed to wait all async rpc", KR(ret), KR(tmp_ret), K(rpc_count)); + + } + if (FAILEDx(check_set_memberlist_result_(rpc_count, return_code_array, paxos_replica_num))) { + LOG_WARN("failed to check set member liset result", KR(ret), K(rpc_count), + K(paxos_replica_num), K(return_code_array)); } } } @@ -510,17 +523,14 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list, } int ObLSCreator::check_set_memberlist_result_(const int64_t rpc_count, + const ObIArray &return_code_array, const int64_t paxos_replica_num) { int ret = OB_SUCCESS; - ObArray return_code_array; if (OB_UNLIKELY(!is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret)); - } else if (OB_FAIL(set_member_list_proxy_.wait_all(return_code_array))) { - LOG_WARN("wait all batch result failed", KR(ret)); } else if (rpc_count != return_code_array.count() - || rpc_count != set_member_list_proxy_.get_args().count() || rpc_count != set_member_list_proxy_.get_results().count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("rpc count not equal to result count", KR(ret), K(rpc_count), diff --git a/src/share/ls/ob_ls_creator.h b/src/share/ls/ob_ls_creator.h index 1d0de1467..9012c6d53 100644 --- a/src/share/ls/ob_ls_creator.h +++ b/src/share/ls/ob_ls_creator.h @@ -149,8 +149,10 @@ private: ObLSReplicaAddr &ls_replica_addr); int check_create_ls_result_(const int64_t rpc_count, const int64_t paxos_replica_num, + const ObIArray &return_code_array, common::ObMemberList &member_list); int check_set_memberlist_result_(const int64_t rpc_count, + const ObIArray &return_code_array, const int64_t paxos_replica_num); private: diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index 2277681bc..446835adf 100644 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -430,6 +430,7 @@ class ObString; ACT(DDL_CHECK_TABLET_MERGE_STATUS,)\ ACT(MODIFY_HIDDEN_TABLE_NOT_NULL_COLUMN_STATE_BEFORE_PUBLISH_SCHEMA,)\ ACT(AFTER_MIGRATION_FETCH_TABLET_INFO,)\ + ACT(BEFORE_SET_LS_MEMBER_LIST,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);