diff --git a/src/observer/net/ob_ingress_bw_alloc_service.cpp b/src/observer/net/ob_ingress_bw_alloc_service.cpp index 852ed5c9fe..caf3b1737a 100644 --- a/src/observer/net/ob_ingress_bw_alloc_service.cpp +++ b/src/observer/net/ob_ingress_bw_alloc_service.cpp @@ -131,40 +131,40 @@ int ObNetEndpointIngressManager::collect_predict_bw(ObNetEndpointKVArray &update } ObArray return_code_array; - if (OB_FAIL(proxy_batch.wait_all(return_code_array))) { - LOG_WARN("wait batch result failed", KR(ret)); - } - if (OB_FAIL(ret)) { - } else if (return_code_array.count() != update_kvs.count() || - return_code_array.count() != proxy_batch.get_results().count()) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array))) { + LOG_WARN("wait batch result failed", KR(tmp_ret), K(ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } else if (OB_FAIL(ret)) { + } else if (return_code_array.count() != update_kvs.count()) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("cnt not match", - KR(ret), - "return_cnt", - return_code_array.count(), - "result_cnt", - proxy_batch.get_results().count(), - "server_cnt", - ingress_plan_map_.size()); - } - - for (int64_t i = 0; OB_SUCC(ret) && i < update_kvs.count(); i++) { - if (OB_FAIL(return_code_array.at(i))) { - const ObAddr &addr = proxy_batch.get_dests().at(i); - LOG_WARN("rpc execute failed", KR(ret), K(addr), K(update_kvs[i])); - ret = OB_SUCCESS; // ignore error - } else { - const obrpc::ObNetEndpointPredictIngressRes *result = proxy_batch.get_results().at(i); - if (OB_ISNULL(result)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result is null", KR(ret)); + LOG_WARN("cnt not match", KR(ret), + "return_cnt", return_code_array.count(), + "kv_cnt", update_kvs.count()); + } else if (OB_FAIL(proxy_batch.check_return_cnt(return_code_array.count()))) { + LOG_WARN("cnt not match", KR(ret), + "return_cnt", return_code_array.count(), + "server_cnt", ingress_plan_map_.size()); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < update_kvs.count(); i++) { + if (OB_FAIL(return_code_array.at(i))) { + const ObAddr &addr = proxy_batch.get_dests().at(i); + LOG_WARN("rpc execute failed", KR(ret), K(addr), K(update_kvs[i])); ret = OB_SUCCESS; // ignore error } else { - ObNetEndpointValue *endpoint_value = update_kvs[i].value_; - endpoint_value->predicted_bw_ = result->predicted_bw_; + const obrpc::ObNetEndpointPredictIngressRes *result = proxy_batch.get_results().at(i); + if (OB_ISNULL(result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result is null", KR(ret)); + ret = OB_SUCCESS; // ignore error + } else { + ObNetEndpointValue *endpoint_value = update_kvs[i].value_; + endpoint_value->predicted_bw_ = result->predicted_bw_; + } } } } + return ret; } @@ -258,29 +258,27 @@ int ObNetEndpointIngressManager::commit_bw_limit_plan(ObNetEndpointKVArray &upda } ObArray return_code_array; - if (OB_FAIL(proxy_batch.wait_all(return_code_array))) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array))) { LOG_WARN("wait batch result failed", KR(ret)); - } - - if (OB_FAIL(ret)) { - } else if (return_code_array.count() != update_kvs.count() || - return_code_array.count() != proxy_batch.get_results().count()) { + ret = OB_SUCC(ret) ? tmp_ret : ret; + } else if (OB_FAIL(ret)) { + } else if (return_code_array.count() != update_kvs.count()) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("cnt not match", - KR(ret), - "return_cnt", - return_code_array.count(), - "result_cnt", - proxy_batch.get_results().count(), - "server_cnt", - ingress_plan_map_.size()); - } - - for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) { - if (OB_FAIL(return_code_array.at(i))) { - const ObAddr &addr = proxy_batch.get_dests().at(i); - LOG_WARN("rpc execute failed", KR(ret), K(addr)); - ret = OB_SUCCESS; // ignore error + LOG_WARN("cnt not match", KR(ret), + "return_cnt", return_code_array.count(), + "kv_cnt", update_kvs.count()); + } else if (OB_FAIL(proxy_batch.check_return_cnt(return_code_array.count()))) { + LOG_WARN("cnt not match", KR(ret), + "return_cnt", return_code_array.count(), + "server_cnt", ingress_plan_map_.size()); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) { + if (OB_FAIL(return_code_array.at(i))) { + const ObAddr &addr = proxy_batch.get_dests().at(i); + LOG_WARN("rpc execute failed", KR(ret), K(addr)); + ret = OB_SUCCESS; // ignore error + } } } return ret; @@ -482,4 +480,4 @@ int ObIngressBWAllocService::resume_leader() } } // namespace rootserver -} // namespace oceanbase \ No newline at end of file +} // namespace oceanbase diff --git a/src/observer/ob_root_service_monitor.cpp b/src/observer/ob_root_service_monitor.cpp index 26137c44fb..4e9d3d6089 100644 --- a/src/observer/ob_root_service_monitor.cpp +++ b/src/observer/ob_root_service_monitor.cpp @@ -215,7 +215,6 @@ int ObRootServiceMonitor::try_start_root_service() FLOG_WARN("rs_list should not has no member", KR(ret), K(rs_list), "count", rs_list.count()); } else { int64_t timeout = GCONF.rpc_timeout; - int64_t count = 0; rootserver::ObGetRootserverRoleProxy proxy( *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::get_root_server_status); ObDetectMasterRsArg arg; @@ -223,35 +222,41 @@ int ObRootServiceMonitor::try_start_root_service() ObAddr &addr = rs_list.at(i); if (!addr.is_valid() || GCTX.self_addr() == addr) { // do nothing, no need to check self - } else if (OB_SUCCESS != (tmp_ret = arg.init(addr, cluster_id))) { + } else if (OB_TMP_FAIL(arg.init(addr, cluster_id))) { // cluster_id is useless, but we want to reuse ObDetectMasterRsArg here need_to_wait = true; FLOG_WARN("need to wait because fail to init arg", KR(tmp_ret), K(addr), K(cluster_id)); - } else if (FALSE_IT(count++)) { - // shall never be here - } else if (OB_SUCCESS != (tmp_ret = proxy.call(addr, timeout, OB_SYS_TENANT_ID, arg))) { + } else if (OB_TMP_FAIL(proxy.call(addr, timeout, OB_SYS_TENANT_ID, arg))) { need_to_wait = true; FLOG_WARN("need to wait because fail to send rpc", KR(tmp_ret), K(addr), K(timeout), K(arg)); } } ObArray return_ret_array; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_ret_array))) { // ignore ret need_to_wait = true; FLOG_WARN("need to wait because wait batch result failed", KR(tmp_ret)); - } else if (return_ret_array.count() != count) { - //ignore ret - need_to_wait = true; - FLOG_WARN("need to wait because send rpc count should match return rpc count", K(count), - "return_ret_array count", return_ret_array.count()); + } else if (OB_FAIL(ret) || need_to_wait) { + // skip } else { - for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count(); i++) { - int return_ret = return_ret_array.at(i); - const ObAddr &addr = proxy.get_dests().at(i); - if (OB_SUCCESS == return_ret) { + // don't use arg/dest here because call() may has failure. + // !use_invalid_addr means count of args_/dest_/results_/return_rets are matched. + const bool use_invalid_addr = (OB_SUCCESS != proxy.check_return_cnt(return_ret_array.count())); + if (use_invalid_addr) { + need_to_wait = true; + FLOG_WARN("need to wait because rpc cnt maybe not match"); + } + ObAddr invalid_addr; + // try best effort to detect if leader exist by results although need_to_wait is true. + for (int64_t i = 0; OB_SUCC(ret) && i < proxy.get_results().count(); i++) { + const ObGetRootserverRoleResult *result = proxy.get_results().at(i); + const ObAddr &addr = use_invalid_addr ? invalid_addr : proxy.get_dests().at(i); + if (!use_invalid_addr && OB_SUCCESS != return_ret_array.at(i)) { + need_to_wait = true; + FLOG_WARN("need to wait because rpc fail", "tmp_ret", return_ret_array.at(i), K(addr)); + } else { // need to check that server is leader or !status::init - const ObGetRootserverRoleResult *result = proxy.get_results().at(i); if (OB_ISNULL(result)) { //ignore ret need_to_wait = true; @@ -266,11 +271,8 @@ int ObRootServiceMonitor::try_start_root_service() FLOG_WARN("need to wait because another root server already exist", K(addr), "status", result->get_status(), "role", result->get_role()); } - } else { - need_to_wait = true; - FLOG_WARN("need to wait because failed to get result", KR(ret), K(addr)); } - } + } // end for } if (OB_FAIL(ret)) { diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp index f07faaf01f..5359f2e7c0 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp @@ -2561,19 +2561,21 @@ int ObSyncTabletAutoincSeqCtx::call_and_process_all_tablet_autoinc_seqs(P &proxy // wait rpc and process result int tmp_ret = OB_SUCCESS; common::ObArray tmp_ret_array; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(tmp_ret_array))) { - LOG_WARN("rpc proxy wait failed", K(tmp_ret)); + if (OB_TMP_FAIL(proxy.wait_all(tmp_ret_array))) { + LOG_WARN("rpc proxy wait failed", KR(ret), KR(tmp_ret)); ret = OB_SUCCESS == ret ? tmp_ret : ret; - } else if (OB_SUCC(ret)) { - const auto &result_array = proxy.get_results(); - if (tmp_ret_array.count() != ls_to_tablet_map.size() || result_array.count() != ls_to_tablet_map.size()) { + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(tmp_ret_array.count()))) { + LOG_WARN("return cnt not match", KR(ret), "return_cnt", tmp_ret_array.count()); + } else { + if (tmp_ret_array.count() != ls_to_tablet_map.size()) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("result count not match", K(ret), K(ls_to_tablet_map.size()), K(tmp_ret_array.count()), K(result_array.count())); + LOG_WARN("result count not match", KR(ret), K(ls_to_tablet_map.size()), K(tmp_ret_array.count())); } else { ObHashMap>::hashtable::iterator map_iter = ls_to_tablet_map.begin(); int64_t new_params_cnt = 0; - // check and reserve first + const auto &result_array = proxy.get_results(); for (int64_t i = 0; OB_SUCC(ret) && i < result_array.count(); ++i, ++map_iter) { const int rpc_ret_code = tmp_ret_array.at(i); const auto *cur_result = result_array.at(i); diff --git a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp index 97c9a0c199..d7059e75ba 100644 --- a/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp +++ b/src/rootserver/ddl_task/ob_ddl_single_replica_executor.cpp @@ -175,16 +175,20 @@ int ObDDLSingleReplicaExecutor::schedule_task() LOG_INFO("send build single replica request", K(arg), K(dest_leader_addr)); } } - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(ret_array))) { - LOG_WARN("rpc_proxy wait failed", K(ret), K(tmp_ret)); + if (OB_TMP_FAIL(proxy.wait_all(ret_array))) { + LOG_WARN("rpc_pRoxy wait failed", KR(ret), KR(tmp_ret)); ret = (OB_SUCCESS == ret) ? tmp_ret : ret; - } else if (OB_SUCC(ret)) { - const ObIArray &result_array = proxy.get_results(); - if (ret_array.count() != idxs.count() || result_array.count() != idxs.count()) { + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(ret_array.count()))) { + LOG_WARN("return cnt not match", KR(ret), "return_cnt", ret_array.count()); + } else { + if (ret_array.count() != idxs.count()) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("error unexpected, ret array count is not equal to request count", K(ret)); + LOG_WARN("error unexpected, ret array count is not equal to request count", + KR(ret), "return_cnt", ret_array.count(), "idxs_cnt", idxs.count()); } ObSpinLockGuard guard(lock_); + const ObIArray &result_array = proxy.get_results(); for (int64_t i = 0; OB_SUCC(ret) && i < ret_array.count(); ++i) { const int64_t idx = idxs.at(i); if (!build_infos.at(idx).need_schedule()) { // already handle respone rpc diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 6ad9e56c33..f840142d23 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -1810,10 +1810,13 @@ int check_trans_end(const ObArray &send_array, // collect result int tmp_ret = OB_SUCCESS; common::ObArray tmp_ret_array; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(tmp_ret_array))) { - LOG_WARN("rpc proxy wait failed", K(tmp_ret)); + if (OB_TMP_FAIL(proxy.wait_all(tmp_ret_array))) { + LOG_WARN("rpc proxy wait failed", KR(ret), KR(tmp_ret)); ret = OB_SUCCESS == ret ? tmp_ret : ret; - } else if (OB_SUCC(ret)) { + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(tmp_ret_array.count()))) { + LOG_WARN("return cnt not match", KR(ret), "return_cnt", tmp_ret_array.count()); + } else { const ObIArray &result_array = proxy.get_results(); const ObIArray &arg_array = proxy.get_args(); const ObIArray &dest_array = proxy.get_dests(); diff --git a/src/rootserver/ob_bootstrap.cpp b/src/rootserver/ob_bootstrap.cpp index 7e3b2ab708..7f45cda699 100644 --- a/src/rootserver/ob_bootstrap.cpp +++ b/src/rootserver/ob_bootstrap.cpp @@ -346,9 +346,11 @@ int ObPreBootstrap::notify_sys_tenant_server_unit_resource() } } int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) { + if (OB_TMP_FAIL(notify_proxy.wait())) { LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret)); ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } else if (OB_SUCC(ret)) { + // can use arg/dest/result here. } } @@ -924,18 +926,22 @@ int ObBootstrap::broadcast_sys_schema(const ObSArray &table_schem ObArray return_code_array; int tmp_ret = OB_SUCCESS; // always wait all - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_code_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("return cnt not match", KR(ret), "return_cnt", return_code_array.count()); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) { + int res_ret = return_code_array.at(i); + const ObAddr &addr = proxy.get_dests().at(i); + if (OB_SUCCESS != res_ret) { + ret = res_ret; + LOG_WARN("broadcast schema failed", KR(ret), K(addr)); + } + } // end for } - for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) { - int res_ret = return_code_array.at(i); - const ObAddr &addr = proxy.get_dests().at(i); - if (OB_SUCCESS != res_ret) { - ret = res_ret; - LOG_WARN("broadcast schema failed", KR(ret), K(addr)); - } - } // end for } BOOTSTRAP_CHECK_SUCCESS(); return ret; diff --git a/src/rootserver/ob_ddl_service.cpp b/src/rootserver/ob_ddl_service.cpp index c4ad2a3c66..6c6da1f9e1 100755 --- a/src/rootserver/ob_ddl_service.cpp +++ b/src/rootserver/ob_ddl_service.cpp @@ -22644,7 +22644,6 @@ int ObDDLService::notify_init_tenant_config( LOG_WARN("fail to set default timeout", KR(ret)); } else { ObArenaAllocator allocator("InitTenantConf"); - int64_t server_cnt = addrs.count(); // 1. construct arg obrpc::ObInitTenantConfigArg arg; for (int64_t i = 0; OB_SUCC(ret) && i < init_configs.count(); i++) { @@ -22685,34 +22684,33 @@ int ObDDLService::notify_init_tenant_config( if (OB_FAIL(ret) || call_rs) { } else if (OB_FAIL(proxy.call(rs_addr, timeout, arg))) { LOG_WARN("fail to call rs", KR(ret), K(rs_addr), K(timeout), K(arg)); - } else { - server_cnt++; } // 3. check result ObArray return_ret_array; int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) { // ignore ret + if (OB_TMP_FAIL(proxy.wait_all(return_ret_array))) { // ignore ret LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; - } else if (return_ret_array.count() != server_cnt) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result cnt not match", KR(ret), K(server_cnt), "res_cnt", return_ret_array.count()); + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(return_ret_array.count()))) { + LOG_WARN("return cnt not match", KR(ret), "return_cnt", return_ret_array.count()); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count(); i++) { + int return_ret = return_ret_array.at(i); + const ObAddr &addr = proxy.get_dests().at(i); + const ObInitTenantConfigRes *result = proxy.get_results().at(i); + if (OB_SUCCESS != return_ret) { + ret = return_ret; + LOG_WARN("rpc return error", KR(ret), K(addr), K(timeout)); + } else if (OB_ISNULL(result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get empty result", KR(ret), K(addr), K(timeout)); + } else if (OB_SUCCESS != result->get_ret()) { + ret = result->get_ret(); + LOG_WARN("persist tenant config failed", KR(ret), K(addr), K(timeout)); + } + } // end for } - for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count(); i++) { - int return_ret = return_ret_array.at(i); - const ObAddr &addr = proxy.get_dests().at(i); - const ObInitTenantConfigRes *result = proxy.get_results().at(i); - if (OB_SUCCESS != return_ret) { - ret = return_ret; - LOG_WARN("rpc return error", KR(ret), K(addr), K(timeout)); - } else if (OB_ISNULL(result)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("get empty result", KR(ret), K(addr), K(timeout)); - } else if (OB_SUCCESS != result->get_ret()) { - ret = result->get_ret(); - LOG_WARN("persist tenant config failed", KR(ret), K(addr), K(timeout)); - } - } // end for } return ret; } @@ -22878,23 +22876,29 @@ int ObDDLService::notify_root_key( } // 2. check result ObArray return_ret_array; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) { // ignore ret + if (OB_TMP_FAIL(proxy.wait_all(return_ret_array))) { // ignore ret LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; - } - for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count() && !has_failed; ++i) { - if (OB_TMP_FAIL(return_ret_array.at(i))) { - has_failed = true; - return_ret= tmp_ret; - LOG_WARN("rpc return error", KR(tmp_ret), K(i)); + } else if (OB_FAIL(ret) || has_failed) { + } else { + // don't use arg/dest here because call() may has failure. + for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count() && !has_failed; ++i) { + if (OB_TMP_FAIL(return_ret_array.at(i))) { + has_failed = true; + return_ret = tmp_ret; + LOG_WARN("rpc return error", KR(tmp_ret), K(i)); + } } } - if (OB_SUCC(ret) && arg.is_set_) { + if (OB_FAIL(ret)) { + } else if (arg.is_set_) { if (OB_UNLIKELY(has_failed)) { ret = return_ret; LOG_WARN("failed to set root key", KR(ret)); } } else { + // 1. don't use arg/dest here because call() may has failure. + // 2. result may be meanless when related return ret is not OB_SUCCESS obrpc::RootKeyType key_type = obrpc::RootKeyType::INVALID; ObString root_key; for (int64_t i = 0; OB_SUCC(ret) && i < proxy.get_results().count(); ++i) { @@ -23224,20 +23228,24 @@ int ObDDLService::broadcast_sys_table_schemas( ObArray return_code_array; int tmp_ret = OB_SUCCESS; // always wait all - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_code_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("return cnt not match", KR(ret), "return_cnt", return_code_array.count()); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) { + int res_ret = return_code_array.at(i); + const ObAddr &addr = proxy.get_dests().at(i); + if (OB_SUCCESS != res_ret + && (addr == leader->get_server() + || addr == GCONF.self_addr_)) { // leader and rs must succeed + ret = res_ret; + LOG_WARN("broadcast schema failed", KR(ret), K(addr), K(tenant_id)); + } + } // end for } - for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) { - int res_ret = return_code_array.at(i); - const ObAddr &addr = proxy.get_dests().at(i); - if (OB_SUCCESS != res_ret - && (addr == leader->get_server() - || addr == GCONF.self_addr_)) { // leader and rs must succeed - ret = res_ret; - LOG_WARN("broadcast schema failed", KR(ret), K(addr), K(tenant_id)); - } - } // end for } } LOG_INFO("[CREATE_TENANT] STEP 2.2. finish broadcast sys table schemas", KR(ret), K(tenant_id), @@ -26810,6 +26818,7 @@ int ObDDLService::notify_refresh_schema(const ObAddrIArray &addrs) LOG_INFO("try to notify refresh schema", K(is_async), K(schema_version), K(local_schema_info), K(schema_info)); const int64_t rpc_timeout = GCONF.rpc_timeout; int64_t timeout = 0; + bool report_failure = false; FOREACH_X(s, server_list, OB_SUCC(ret)) { if (OB_ISNULL(s)) { ret = OB_ERR_UNEXPECTED; @@ -26817,30 +26826,26 @@ int ObDDLService::notify_refresh_schema(const ObAddrIArray &addrs) } else if (rs_addr == *s) { continue; } else { - bool found = false; - for (int64_t i = 0; !found && i < addrs.count(); i++) { - if (addrs.at(i) == *s) { - found = true; - } - } - if (found) { + arg.force_refresh_ = has_exist_in_array(addrs, *s); + if (arg.force_refresh_) { // refresh schema sync and report error + if (!report_failure) { + report_failure = true; + } if (OB_FAIL(ObShareUtil::get_ctx_timeout(rpc_timeout, timeout))) { LOG_WARN("fail to get timeout", KR(ret)); } + arg.is_async_ = false; } else { // refresh schema async and ignore error timeout = std::min(THIS_WORKER.get_timeout_remain(), rpc_timeout); - } - arg.force_refresh_ = found; - if (!arg.force_refresh_) { arg.is_async_ = is_async; } // overwrite ret if (FAILEDx(proxy.call(*s, timeout, arg))) { LOG_WARN("send switch schema rpc failed", KR(ret), K(timeout), K(schema_version), K(schema_info), K(arg), "server", *s); - if (!found) { // ignore servers that are not in addrs + if (!arg.force_refresh_) { // ignore servers that are not in addrs ret = OB_SUCCESS; } } @@ -26848,19 +26853,27 @@ int ObDDLService::notify_refresh_schema(const ObAddrIArray &addrs) } ObArray return_code_array; int tmp_ret = OB_SUCCESS; // always wait all - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_code_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { LOG_WARN("wait result failed", KR(tmp_ret), KR(ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; - } - ARRAY_FOREACH_N(return_code_array, i, cnt) { - int res_ret = return_code_array.at(i); - const ObAddr &addr = proxy.get_dests().at(i); - const obrpc::ObSwitchSchemaArg &tmp_arg = proxy.get_args().at(i); - if (OB_SUCCESS != res_ret && tmp_arg.force_refresh_) { - ret = res_ret; - LOG_WARN("switch schema failed", KR(ret), K(addr)); + } else if (OB_FAIL(ret)) { + } else if (report_failure) { + // check result only when report_failure = true + if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("fail to check return count", + KR(ret), "return_cnt", return_code_array.count()); + } else { + ARRAY_FOREACH_N(return_code_array, i, cnt) { + int res_ret = return_code_array.at(i); + const ObAddr &addr = proxy.get_dests().at(i); + const obrpc::ObSwitchSchemaArg &tmp_arg = proxy.get_args().at(i); + if (OB_SUCCESS != res_ret && tmp_arg.force_refresh_) { + ret = res_ret; + LOG_WARN("switch schema failed", KR(ret), K(addr)); + } + } // end for } - } // end for + } } LOG_INFO("notify switch schema finished", KR(ret), K(schema_version), K(schema_info), K(arg), K(addrs)); diff --git a/src/rootserver/ob_heartbeat_service.cpp b/src/rootserver/ob_heartbeat_service.cpp index ded603d975..62025c6e4d 100644 --- a/src/rootserver/ob_heartbeat_service.cpp +++ b/src/rootserver/ob_heartbeat_service.cpp @@ -291,6 +291,7 @@ int ObHeartbeatService::set_hb_responses_(const int64_t whitelist_epoch_id, ObSe need_process_hb_responses_ = true; hb_responses_epoch_id_ = whitelist_epoch_id; hb_responses_.reset(); + // don't use arg/dest here because call() may has failue. ARRAY_FOREACH_X(proxy->get_results(), idx, cnt, OB_SUCC(ret)) { const ObHBResponse *hb_response = proxy->get_results().at(idx); if (OB_ISNULL(hb_response)) { diff --git a/src/rootserver/ob_ls_recovery_stat_handler.cpp b/src/rootserver/ob_ls_recovery_stat_handler.cpp index 69bb151393..39269f14e6 100755 --- a/src/rootserver/ob_ls_recovery_stat_handler.cpp +++ b/src/rootserver/ob_ls_recovery_stat_handler.cpp @@ -424,7 +424,7 @@ int ObLSRecoveryStatHandler::do_get_majority_readable_scn_( //get result ObArray return_code_array; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_code_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret)); ret = OB_SUCCESS == ret ? tmp_ret : ret; } else if (OB_FAIL(ret)) { @@ -433,10 +433,9 @@ int ObLSRecoveryStatHandler::do_get_majority_readable_scn_( majority_cnt, return_code_array, proxy, - rpc_count, majority_min_readable_scn))) { LOG_WARN("failed to calc_majority_min_readable_scn", KR(ret), K(leader_readable_scn), - K(ob_member_list), K(return_code_array), K(rpc_count)); + K(ob_member_list), K(return_code_array)); } } @@ -448,7 +447,6 @@ int ObLSRecoveryStatHandler::calc_majority_min_readable_scn_( const int64_t majority_cnt, const ObIArray &return_code_array, const ObGetLSReplayedScnProxy &proxy, - const int64_t rpc_count, SCN &majority_min_readable_scn) { int ret = OB_SUCCESS; @@ -456,22 +454,19 @@ int ObLSRecoveryStatHandler::calc_majority_min_readable_scn_( majority_min_readable_scn = SCN::max_scn(); if (OB_FAIL(check_inner_stat_())) { LOG_WARN("inner stat error", KR(ret), K_(is_inited)); - } else if (!leader_readable_scn.is_valid_and_not_min() || 0 >= majority_cnt || 0 >= rpc_count) { + } else if (!leader_readable_scn.is_valid_and_not_min() || 0 >= majority_cnt) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(leader_readable_scn), K(majority_cnt), K(rpc_count)); - } else if (rpc_count != return_code_array.count() || - rpc_count != proxy.get_results().count()) { + LOG_WARN("invalid argument", KR(ret), K(leader_readable_scn), K(majority_cnt)); + } else if (return_code_array.count() != proxy.get_results().count()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("rpc count not equal to result count", KR(ret), - K(rpc_count), K(return_code_array), "arg count", - proxy.get_args().count(), K(proxy.get_results().count())); + K(return_code_array), K(proxy.get_results().count())); } else if (OB_FAIL(readable_scn_list.push_back(leader_readable_scn))) { LOG_WARN("failed to push back", KR(ret), K(leader_readable_scn), K(readable_scn_list)); - } else if (OB_FAIL(ret)) { } else { ObGetLSReplayedScnRes res; int tmp_ret = OB_SUCCESS; - + // don't use arg/dest here because call() may has failure. for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) { tmp_ret = return_code_array.at(i); // skip error server diff --git a/src/rootserver/ob_ls_recovery_stat_handler.h b/src/rootserver/ob_ls_recovery_stat_handler.h index 86878b9ae2..ebf928d507 100644 --- a/src/rootserver/ob_ls_recovery_stat_handler.h +++ b/src/rootserver/ob_ls_recovery_stat_handler.h @@ -110,7 +110,6 @@ private: const int64_t majority_cnt, const ObIArray &return_code_array, const ObGetLSReplayedScnProxy &proxy, - const int64_t rpc_count, share::SCN &majority_min_readable_scn); int construct_new_member_list_( diff --git a/src/rootserver/ob_root_minor_freeze.cpp b/src/rootserver/ob_root_minor_freeze.cpp index bf5a0442e8..b8a119473c 100644 --- a/src/rootserver/ob_root_minor_freeze.cpp +++ b/src/rootserver/ob_root_minor_freeze.cpp @@ -200,31 +200,37 @@ int ObRootMinorFreeze::do_minor_freeze(const ParamsContainer ¶ms) const ObMinorFreezeProxy proxy(*rpc_proxy_, &ObSrvRpcProxy::minor_freeze); LOG_INFO("do minor freeze", K(params)); - for (int64_t i = 0; i < params.get_params().count() && OB_SUCC(check_cancel()); ++i) { + for (int64_t i = 0; OB_SUCC(ret) && i < params.get_params().count(); ++i) { const MinorFreezeParam ¶m = params.get_params().at(i); - - if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = proxy.call(param.server, - MINOR_FREEZE_TIMEOUT, param.arg)))) { - LOG_WARN("proxy call failed", K(tmp_ret), K(param.arg), + if (OB_FAIL(check_cancel())) { + LOG_WARN("fail to check cancel", KR(ret)); + } else if (OB_TMP_FAIL(proxy.call(param.server, MINOR_FREEZE_TIMEOUT, param.arg))) { + LOG_WARN("proxy call failed", KR(tmp_ret), K(param.arg), "dest addr", param.server); - failure_cnt ++; + failure_cnt++; } } - if (OB_FAIL(proxy.wait())) { - LOG_WARN("proxy wait failed", K(ret)); + ObArray return_code_array; + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { + LOG_WARN("proxy wait failed", KR(ret), KR(tmp_ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("return cnt not match", KR(ret), "return_cnt", return_code_array.count()); } else { for (int i = 0; i < proxy.get_results().count(); ++i) { - if (OB_SUCCESS != (tmp_ret = static_cast(*proxy.get_results().at(i)))) { + if (OB_TMP_FAIL(static_cast(*proxy.get_results().at(i)))) { LOG_WARN("fail to do minor freeze on target server, ", K(tmp_ret), "dest addr:", proxy.get_dests().at(i), "param:", proxy.get_args().at(i)); - failure_cnt ++; + failure_cnt++; } } } - if (0 != failure_cnt && OB_CANCELED != ret) { + if (OB_FAIL(ret)) { + } else if (0 != failure_cnt) { ret = OB_PARTIAL_FAILED; LOG_WARN("minor freeze partial failed", KR(ret), K(failure_cnt)); } diff --git a/src/rootserver/ob_root_utils.cpp b/src/rootserver/ob_root_utils.cpp index 1d6c9b8010..7daba6fd69 100644 --- a/src/rootserver/ob_root_utils.cpp +++ b/src/rootserver/ob_root_utils.cpp @@ -2200,6 +2200,8 @@ int ObRootUtils::notify_switch_leader( if (OB_TMP_FAIL(proxy.wait())) { ret = OB_SUCC(ret) ? tmp_ret : ret; LOG_WARN("failed to wait all result", KR(ret), KR(tmp_ret)); + } else if (OB_SUCC(ret)) { + // arg/dest/result can be used here. } } } diff --git a/src/rootserver/ob_schema_history_recycler.cpp b/src/rootserver/ob_schema_history_recycler.cpp index 14d40ec834..12ae880beb 100644 --- a/src/rootserver/ob_schema_history_recycler.cpp +++ b/src/rootserver/ob_schema_history_recycler.cpp @@ -375,46 +375,41 @@ int ObSchemaHistoryRecycler::get_recycle_schema_version_by_server( } ObArray return_code_array; int tmp_ret = OB_SUCCESS; // always wait all - if (OB_SUCCESS != (tmp_ret = proxy_batch.wait_all(return_code_array))) { - LOG_WARN("wait batch result failed", K(tmp_ret), KR(ret)); + if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array))) { + LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; - } - if (OB_FAIL(ret)) { - } else if (return_code_array.count() != server_list.count() - || return_code_array.count() != proxy_batch.get_results().count()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("cnt not match", KR(ret), - "return_cnt", return_code_array.count(), - "result_cnt", proxy_batch.get_results().count(), - "server_cnt", server_list.count()); - } - for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) { - int res_ret = return_code_array.at(i); - const ObAddr &addr = proxy_batch.get_dests().at(i); - if (OB_SUCCESS != res_ret) { - ret = res_ret; - LOG_WARN("rpc execute failed", KR(ret), K(addr)); - } else { - const obrpc::ObGetMinSSTableSchemaVersionRes *result = proxy_batch.get_results().at(i); - if (OB_ISNULL(result)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result is null", KR(ret)); - } else if (result->ret_list_.count() != arg.tenant_id_arg_list_.count()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("cnt not match", KR(ret), - "tenant_cnt", arg.tenant_id_arg_list_.count(), - "result_cnt", result->ret_list_.count()); + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy_batch.check_return_cnt(return_code_array.count()))) { + LOG_WARN("fail to check return cnt", KR(ret), "return_cnt", return_code_array.count()); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) { + int res_ret = return_code_array.at(i); + const ObAddr &addr = proxy_batch.get_dests().at(i); + if (OB_SUCCESS != res_ret) { + ret = res_ret; + LOG_WARN("rpc execute failed", KR(ret), K(addr)); } else { - for (int64_t j = 0; OB_SUCC(ret) && j < result->ret_list_.count(); j++) { - int64_t schema_version = result->ret_list_.at(j); - uint64_t tenant_id = arg.tenant_id_arg_list_.at(j); - if (FAILEDx(fill_recycle_schema_versions( - tenant_id, schema_version, recycle_schema_versions))) { - LOG_WARN("fail to fill recycle schema versions", - KR(ret), K(tenant_id), K(schema_version)); + const obrpc::ObGetMinSSTableSchemaVersionRes *result = proxy_batch.get_results().at(i); + if (OB_ISNULL(result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result is null", KR(ret)); + } else if (result->ret_list_.count() != arg.tenant_id_arg_list_.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("cnt not match", KR(ret), + "tenant_cnt", arg.tenant_id_arg_list_.count(), + "result_cnt", result->ret_list_.count()); + } else { + for (int64_t j = 0; OB_SUCC(ret) && j < result->ret_list_.count(); j++) { + int64_t schema_version = result->ret_list_.at(j); + uint64_t tenant_id = arg.tenant_id_arg_list_.at(j); + if (FAILEDx(fill_recycle_schema_versions( + tenant_id, schema_version, recycle_schema_versions))) { + LOG_WARN("fail to fill recycle schema versions", + KR(ret), K(tenant_id), K(schema_version)); + } + LOG_INFO("[SCHEMA_RECYCLE] get recycle schema version from observer", + KR(ret), K(addr), K(tenant_id), K(schema_version)); } - LOG_INFO("[SCHEMA_RECYCLE] get recycle schema version from observer", - KR(ret), K(addr), K(tenant_id), K(schema_version)); } } } diff --git a/src/rootserver/ob_system_admin_util.cpp b/src/rootserver/ob_system_admin_util.cpp index 1ceab975c2..eacc5713f6 100644 --- a/src/rootserver/ob_system_admin_util.cpp +++ b/src/rootserver/ob_system_admin_util.cpp @@ -714,14 +714,15 @@ int ObAdminRefreshSchema::call_server(const ObAddr &server) if (OB_FAIL(proxy.call(server, timeout_ts, arg))) { LOG_WARN("notify switch schema failed", KR(ret), K(server), K_(schema_version), K_(schema_info)); } - if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { ret = OB_SUCC(ret) ? tmp_ret : ret; LOG_WARN("fail to wait all", KR(ret), KR(tmp_ret), K(server)); } else if (OB_FAIL(ret)) { - } else if (OB_UNLIKELY(return_code_array.empty())) { + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("fail to check return cnt", KR(ret), K(server), "return_cnt", return_code_array.count()); + } else if (OB_UNLIKELY(1 != return_code_array.count())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("return_code_array is empty", KR(ret), K(server)); + LOG_WARN("return_code_array count shoud be 1", KR(ret), K(server), "return_cnt", return_code_array.count()); } else { ret = return_code_array.at(0); } diff --git a/src/rootserver/ob_tenant_info_loader.cpp b/src/rootserver/ob_tenant_info_loader.cpp index 3550e96296..1eb43df843 100644 --- a/src/rootserver/ob_tenant_info_loader.cpp +++ b/src/rootserver/ob_tenant_info_loader.cpp @@ -349,15 +349,12 @@ void ObTenantInfoLoader::broadcast_tenant_info_content_() } int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_code_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret)); ret = OB_SUCCESS == ret ? tmp_ret : ret; - } else if (proxy.get_results().count() != return_code_array.count()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result count not match", KR(ret), - K(rpc_count), K(return_code_array), "arg count", - proxy.get_args().count(), K(proxy.get_results().count())); } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("fail to check return cnt", KR(ret), "return_cnt", return_code_array.count()); } else { (void)ATOMIC_AAF(&broadcast_times_, 1); ObUpdateTenantInfoCacheRes res; diff --git a/src/rootserver/ob_tenant_role_transition_service.cpp b/src/rootserver/ob_tenant_role_transition_service.cpp index 6e63f00a0f..0694f40a03 100644 --- a/src/rootserver/ob_tenant_role_transition_service.cpp +++ b/src/rootserver/ob_tenant_role_transition_service.cpp @@ -783,7 +783,6 @@ int ObTenantRoleTransitionService::get_ls_access_mode_( ObGetLSAccessModeProxy proxy( *rpc_proxy_, &obrpc::ObSrvRpcProxy::get_ls_access_mode); obrpc::ObGetLSAccessModeInfoArg arg; - int64_t rpc_count = 0; ObArray return_code_array; int tmp_ret = OB_SUCCESS; const uint64_t group_id = share::OBCG_DBA_COMMAND; @@ -801,30 +800,28 @@ int ObTenantRoleTransitionService::get_ls_access_mode_( //can not ignore error of each ls LOG_WARN("failed to send rpc", KR(ret), K(leader), K(timeout), K(tenant_id_), K(arg), K(group_id)); - } else { - rpc_count++; } if (OB_FAIL(ret)) { int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS !=(tmp_ret = GCTX.location_service_->nonblock_renew( + if (OB_TMP_FAIL(GCTX.location_service_->nonblock_renew( GCONF.cluster_id, tenant_id_, info.ls_id_))) { - LOG_WARN("failed to renew location", KR(ret), K(tenant_id_), K(info)); + LOG_WARN("failed to renew location", KR(ret), KR(tmp_ret), K(tenant_id_), K(info)); } } }//end for //get result - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_code_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; } else if (OB_FAIL(ret)) { //no need to process return code - } else if (rpc_count != return_code_array.count() || - rpc_count != proxy.get_results().count()) { + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("fail to check return cnt", KR(ret), "return_cnt", return_code_array.count()); + } else if (status_info_array.count() != return_code_array.count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("rpc count not equal to result count", KR(ret), - K(rpc_count), K(return_code_array), "arg count", - proxy.get_args().count()); + K(return_code_array.count()), K(status_info_array.count())); } else { for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) { ret = return_code_array.at(i); @@ -842,12 +839,11 @@ int ObTenantRoleTransitionService::get_ls_access_mode_( } } LOG_INFO("[ROLE_TRANSITION] get ls access mode", KR(ret), K(arg)); - } if (OB_FAIL(ret)) { int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = do_nonblock_renew(status_info_array, success_ls_ids, tenant_id_))) { + if (OB_TMP_FAIL(do_nonblock_renew(status_info_array, success_ls_ids, tenant_id_))) { LOG_WARN("failed to renew location", KR(ret), KR(tmp_ret), K(tenant_id_), K(status_info_array), K(success_ls_ids)); } } @@ -909,11 +905,12 @@ int ObTenantRoleTransitionService::do_change_ls_access_mode_( ret = OB_SUCC(ret) ? tmp_ret : ret; LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret)); } else if (OB_FAIL(ret)) { - } else if (rpc_count != return_code_array.count() || - rpc_count != proxy.get_results().count()) { + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("fail to check return cnt", KR(ret), "return_cnt", return_code_array.count()); + } else if (rpc_count != return_code_array.count()) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("rpc count not equal to result count", KR(ret), K(rpc_count), - K(return_code_array), "arg count", proxy.get_args().count()); + LOG_WARN("rpc count not equal to result count", KR(ret), + K(rpc_count), K(return_code_array.count())); } else { for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) { ret = return_code_array.at(i); @@ -1288,7 +1285,6 @@ int ObTenantRoleTransitionService::get_checkpoints_by_rpc(const uint64_t tenant_ ObGetLSSyncScnProxy proxy( *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::get_ls_sync_scn); obrpc::ObGetLSSyncScnArg arg; - int64_t rpc_count = 0; const uint64_t group_id = share::OBCG_DBA_COMMAND; for (int64_t i = 0; OB_SUCC(ret) && i < status_info_array.count(); ++i) { const ObLSStatusInfo &info = status_info_array.at(i); @@ -1303,23 +1299,21 @@ int ObTenantRoleTransitionService::get_checkpoints_by_rpc(const uint64_t tenant_ } else if (OB_FAIL(proxy.call(leader, timeout_us, GCONF.cluster_id, tenant_id, group_id, arg))) { LOG_WARN("failed to send rpc", KR(ret), K(leader), K(timeout_us), K(tenant_id), K(arg), K(group_id)); - } else { - rpc_count++; } }//end for //get result ObArray return_code_array; int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_code_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret)); ret = OB_SUCCESS == ret ? tmp_ret : ret; - } else if (rpc_count != return_code_array.count() || - rpc_count != proxy.get_results().count()) { + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("fail to check return cnt", KR(ret), "return_cnt", return_code_array.count()); + } else if (status_info_array.count() != return_code_array.count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("rpc count not equal to result count", KR(ret), - K(rpc_count), K(return_code_array), "arg count", - proxy.get_args().count()); - } else if (OB_FAIL(ret)) { + K(return_code_array.count()), K(return_code_array.count())); } else { ObGetLSSyncScnRes res; for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) { @@ -1489,16 +1483,16 @@ void ObTenantRoleTransitionService::broadcast_tenant_info(const char* const log_ } int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_code_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret)); ret = OB_SUCCESS == ret ? tmp_ret : ret; - } else if (rpc_count != return_code_array.count() || - rpc_count != proxy.get_results().count()) { + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("fail to check return cnt", KR(ret), "return_cnt", return_code_array.count()); + } else if (rpc_count != return_code_array.count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("rpc count not equal to result count", KR(ret), - K(rpc_count), K(return_code_array), "arg count", - proxy.get_args().count(), K(proxy.get_results().count())); - } else if (OB_FAIL(ret)) { + K(rpc_count), K(return_code_array.count())); } else { ObRefreshTenantInfoRes res; for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) { diff --git a/src/rootserver/ob_unit_manager.cpp b/src/rootserver/ob_unit_manager.cpp index eec0a4038b..f9998fce4b 100644 --- a/src/rootserver/ob_unit_manager.cpp +++ b/src/rootserver/ob_unit_manager.cpp @@ -4768,15 +4768,18 @@ int ObUnitManager::rollback_persistent_units( if (OB_TMP_FAIL(notify_proxy.wait_all(return_ret_array))) { LOG_WARN("fail to wait notify resource", KR(ret), K(tmp_ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; - } - if (OB_SUCC(ret)) { + } else if (OB_FAIL(ret)) { + } else { + // don't use arg/dest/result here because call() may has failure. + ObAddr invalid_addr; for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count(); i++) { const int ret_i = return_ret_array.at(i); + const ObAddr &addr = return_ret_array.count() != notify_proxy.get_dests().count() ? + invalid_addr : notify_proxy.get_dests().at(i); // if (OB_SUCCESS != ret_i && OB_TENANT_NOT_IN_SERVER != ret_i) { if (OB_SUCCESS != ret_i && OB_TENANT_NOT_IN_SERVER != ret_i) { ret = ret_i; - LOG_WARN("fail to mark tenant removed", KR(ret), KR(ret_i), - K(notify_proxy.get_dests().at(i))); + LOG_WARN("fail to mark tenant removed", KR(ret), KR(ret_i), K(addr)); } } } @@ -4965,6 +4968,8 @@ int ObUnitManager::allocate_pool_units_( if (OB_TMP_FAIL(notify_proxy.wait())) { LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret)); ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } else if (OB_SUCC(ret)) { + // arg/dest/result can be used here. } if (is_valid_tenant_id(pool.tenant_id_)) { ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret; @@ -5585,18 +5590,18 @@ int ObUnitManager::get_servers_resource_info_via_rpc( if (OB_TMP_FAIL(proxy.wait())) { LOG_WARN("fail to wait all batch result", KR(ret), KR(tmp_ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; - } - tmp_report_servers_resource_info.reset(); - ARRAY_FOREACH_X(proxy.get_results(), idx, cnt, OB_SUCC(ret)) { - const obrpc::ObGetServerResourceInfoResult *rpc_result = proxy.get_results().at(idx); - if (OB_ISNULL(rpc_result)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("rpc_result is null", KR(ret), KP(rpc_result)); - } else if (OB_UNLIKELY(!rpc_result->is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("rpc_result is invalid", KR(ret), KPC(rpc_result)); - } else if (OB_FAIL(tmp_report_servers_resource_info.push_back(*rpc_result))) { - LOG_WARN("fail to push an element into tmp_report_servers_resource_info", KR(ret), KPC(rpc_result)); + } else if (OB_SUCC(ret)) { + ARRAY_FOREACH_X(proxy.get_results(), idx, cnt, OB_SUCC(ret)) { + const obrpc::ObGetServerResourceInfoResult *rpc_result = proxy.get_results().at(idx); + if (OB_ISNULL(rpc_result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rpc_result is null", KR(ret), KP(rpc_result)); + } else if (OB_UNLIKELY(!rpc_result->is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("rpc_result is invalid", KR(ret), KPC(rpc_result)); + } else if (OB_FAIL(tmp_report_servers_resource_info.push_back(*rpc_result))) { + LOG_WARN("fail to push an element into tmp_report_servers_resource_info", KR(ret), KPC(rpc_result)); + } } } } @@ -7859,9 +7864,11 @@ int ObUnitManager::do_grant_pools_( } } int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) { + if (OB_TMP_FAIL(notify_proxy.wait())) { LOG_WARN("fail to wait notify resource", KR(ret), K(tmp_ret)); ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } else if (OB_SUCC(ret)) { + // arg/dest/result can be used here. } ret = ERRSIM_UNIT_PERSISTENCE_ERROR ? : ret; if (OB_FAIL(ret) && pools.count() == all_pool_units.count()) { @@ -7960,9 +7967,11 @@ int ObUnitManager::do_revoke_pools_( LOG_WARN("failed to commit shrinking pools in revoking", KR(ret), K(tenant_id)); } int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) { + if (OB_TMP_FAIL(notify_proxy.wait())) { LOG_WARN("fail to wait notify resource", KR(ret), K(tmp_ret)); ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } else if (OB_SUCC(ret)) { + // arg/dest/result can be used here. } } return ret; @@ -8532,9 +8541,11 @@ int ObUnitManager::do_migrate_unit_notify_resource_(const share::ObResourcePool LOG_WARN("fail to try notify server unit resource", K(ret)); } int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) { + if (OB_TMP_FAIL(notify_proxy.wait())) { LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret)); ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } else if (OB_SUCC(ret)) { + // arg/dest/result can be used here. } // Rollback persistent unit if persistence failed @@ -8739,11 +8750,12 @@ int ObUnitManager::inner_try_delete_migrate_unit_resource( K(ret), K(rpc_timeout), "unit", *unit, "dest", migrate_from_server); } int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = notify_proxy.wait())) { + if (OB_TMP_FAIL(notify_proxy.wait())) { LOG_WARN("fail to wait notify resource", K(ret), K(tmp_ret)); ret = (OB_SUCCESS == ret) ? tmp_ret : ret; } if (OB_SUCC(ret)) { + // arg/dest/result can be used here. LOG_INFO("notify resource to server succeed", "unit", *unit, "dest", migrate_from_server); } } diff --git a/src/rootserver/restore/ob_restore_scheduler.cpp b/src/rootserver/restore/ob_restore_scheduler.cpp index 40b1465e6d..030a4660e2 100644 --- a/src/rootserver/restore/ob_restore_scheduler.cpp +++ b/src/rootserver/restore/ob_restore_scheduler.cpp @@ -605,15 +605,19 @@ int ObRestoreScheduler::restore_keystore(const share::ObPhysicalRestoreJob &job_ } int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_code_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; - } - for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) { - ret = return_code_array.at(i); - const ObAddr &addr = proxy.get_dests().at(i); - if (OB_FAIL(ret)) { - LOG_WARN("failed to restore key", KR(ret), K(addr)); + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) { + LOG_WARN("fail to check return cnt", KR(ret), "return_cnt", return_code_array.count()); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); i++) { + ret = return_code_array.at(i); + const ObAddr &addr = proxy.get_dests().at(i); + if (OB_FAIL(ret)) { + LOG_WARN("failed to restore key", KR(ret), K(addr)); + } } } } diff --git a/src/share/location_cache/ob_ls_location_service.cpp b/src/share/location_cache/ob_ls_location_service.cpp index a2c27a5a27..8fe67fe730 100644 --- a/src/share/location_cache/ob_ls_location_service.cpp +++ b/src/share/location_cache/ob_ls_location_service.cpp @@ -862,22 +862,23 @@ int ObLSLocationService::detect_ls_leaders_( if (OB_TMP_FAIL(proxy.wait_all(return_ret_array))) { // ignore ret LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; + } else if (OB_FAIL(ret)) { } else { + // don't use arg/dest here because call() may has failure. + // !use_invalid_addr means count of args_/dest_/results_/return_rets are matched. + const bool use_invalid_addr = (OB_SUCCESS != proxy.check_return_cnt(return_ret_array.count())); ObAddr invalid_addr; - for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count(); i++) { - int return_ret = return_ret_array.at(i); + for (int64_t i = 0; OB_SUCC(ret) && i < proxy.get_results().count(); i++) { const obrpc::ObGetLeaderLocationsResult *result = proxy.get_results().at(i); - if (OB_SUCCESS == return_ret) { - if (OB_NOT_NULL(result)) { - if (OB_FAIL(append(leaders, result->get_leader_replicas()))) { - LOG_WARN("fail to append array", KR(ret), KPC(result)); - } - } else { - LOG_TRACE("result is null", K(i), K(timeout)); - } + const ObAddr &addr = use_invalid_addr ? invalid_addr : dests.at(i); + if (!use_invalid_addr && OB_SUCCESS != return_ret_array.at(i)) { + LOG_WARN("fail to get result by rpc, just ignore", "tmp_ret", return_ret_array.at(i), K(addr)); + } else if (OB_ISNULL(result) || !result->is_valid()) { + // return fail + } else if (OB_FAIL(append(leaders, result->get_leader_replicas()))) { + LOG_WARN("fail to append array", KR(ret), KPC(result)); } else { - LOG_TRACE("fail to detect ls leader", "ret", return_ret, K(timeout), - "addr", OB_ISNULL(result) ? invalid_addr : result->get_addr()); + LOG_TRACE("result is null", K(i), K(timeout), K(addr)); } } // end for } diff --git a/src/share/ls/ob_ls_creator.cpp b/src/share/ls/ob_ls_creator.cpp index 9daf99d13a..5abd3e022a 100644 --- a/src/share/ls/ob_ls_creator.cpp +++ b/src/share/ls/ob_ls_creator.cpp @@ -399,7 +399,6 @@ int ObLSCreator::create_ls_(const ObILSAddr &addrs, LOG_WARN("fail to set timeout ctx", KR(ret)); } else { obrpc::ObCreateLSArg arg; - int64_t rpc_count = 0; int tmp_ret = OB_SUCCESS; ObArray return_code_array; lib::Worker::CompatMode new_compat_mode = compat_mode == ORACLE_MODE ? @@ -409,7 +408,6 @@ int ObLSCreator::create_ls_(const ObILSAddr &addrs, for (int64_t i = 0; OB_SUCC(ret) && i < addrs.count(); ++i) { arg.reset(); const ObLSReplicaAddr &addr = addrs.at(i); - rpc_count++; if (OB_FAIL(arg.init(tenant_id_, id_, addr.replica_type_, addr.replica_property_, tenant_info, create_scn, new_compat_mode, create_with_palf, palf_base_info))) { @@ -424,10 +422,10 @@ int ObLSCreator::create_ls_(const ObILSAddr &addrs, //wait all if (OB_TMP_FAIL(create_ls_proxy_.wait_all(return_code_array))) { ret = OB_SUCC(ret) ? tmp_ret : ret; - LOG_WARN("failed to wait all async rpc", KR(ret), KR(tmp_ret), K(rpc_count)); + LOG_WARN("failed to wait all async rpc", KR(ret), KR(tmp_ret)); } - if (FAILEDx(check_create_ls_result_(rpc_count, paxos_replica_num, return_code_array, member_list, learner_list))) { - LOG_WARN("failed to check ls result", KR(ret), K(rpc_count), K(paxos_replica_num), K(return_code_array), K(learner_list)); + if (FAILEDx(check_create_ls_result_(paxos_replica_num, return_code_array, member_list, learner_list))) { + LOG_WARN("failed to check ls result", KR(ret), K(paxos_replica_num), K(return_code_array), K(learner_list)); } #ifdef OB_BUILD_ARBITRATION @@ -555,11 +553,11 @@ int ObLSCreator::try_create_arbitration_service_replica_( } #endif -int ObLSCreator::check_create_ls_result_(const int64_t rpc_count, - const int64_t paxos_replica_num, - const ObIArray &return_code_array, - common::ObMemberList &member_list, - common::GlobalLearnerList &learner_list) +int ObLSCreator::check_create_ls_result_( + const int64_t paxos_replica_num, + const ObIArray &return_code_array, + common::ObMemberList &member_list, + common::GlobalLearnerList &learner_list) { int ret = OB_SUCCESS; member_list.reset(); @@ -567,13 +565,13 @@ int ObLSCreator::check_create_ls_result_(const int64_t rpc_count, if (OB_UNLIKELY(!is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret)); - } else if (rpc_count != return_code_array.count() - || rpc_count != create_ls_proxy_.get_results().count()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("rpc count not equal to result count", KR(ret), K(rpc_count), - K(return_code_array), "arg count", create_ls_proxy_.get_args().count()); + } else if (return_code_array.count() != create_ls_proxy_.get_results().count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rpc count not equal to result count", KR(ret), + K(return_code_array.count()), K(create_ls_proxy_.get_results().count())); } else { const int64_t timestamp = 1; + // don't use arg/dest here because call() may has failure. for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) { if (OB_SUCCESS != return_code_array.at(i)) { LOG_WARN("rpc is failed", KR(ret), K(return_code_array.at(i)), K(i)); @@ -607,11 +605,12 @@ int ObLSCreator::check_create_ls_result_(const int64_t rpc_count, LOG_WARN("failed to add member", KR(ret), K(addr)); } } - LOG_TRACE("create ls result", KR(ret), K(i), K(addr), KPC(result), K(rpc_count)); + LOG_TRACE("create ls result", KR(ret), K(i), K(addr), KPC(result)); } } } - if (rootserver::majority(paxos_replica_num) > member_list.get_member_number()) { + if (OB_FAIL(ret)) { + } else if (rootserver::majority(paxos_replica_num) > member_list.get_member_number()) { ret = OB_REPLICA_NUM_NOT_ENOUGH; LOG_WARN("success count less than majority", KR(ret), K(paxos_replica_num), K(member_list)); @@ -733,11 +732,9 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list, if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) { LOG_WARN("fail to set timeout ctx", KR(ret)); } else { - int64_t rpc_count = 0; ObArray return_code_array; for (int64_t i = 0; OB_SUCC(ret) && i < member_list.get_member_number(); ++i) { ObAddr addr; - rpc_count++; ObSetMemberListArgV2 arg; if (OB_FAIL(arg.init(tenant_id_, id_, paxos_replica_num, member_list, arbitration_service, learner_list))) { LOG_WARN("failed to init set member list arg", KR(ret), K_(id), K_(tenant_id), @@ -755,11 +752,11 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list, if (OB_TMP_FAIL(set_member_list_proxy_.wait_all(return_code_array))) { ret = OB_SUCC(ret) ? tmp_ret : ret; - LOG_WARN("failed to wait all async rpc", KR(ret), KR(tmp_ret), K(rpc_count)); + LOG_WARN("failed to wait all async rpc", KR(ret), KR(tmp_ret)); } - if (FAILEDx(check_set_memberlist_result_(rpc_count, return_code_array, paxos_replica_num))) { - LOG_WARN("failed to check set member liset result", KR(ret), K(rpc_count), + if (FAILEDx(check_set_memberlist_result_(return_code_array, paxos_replica_num))) { + LOG_WARN("failed to check set member liset result", KR(ret), K(paxos_replica_num), K(return_code_array)); } } @@ -777,21 +774,21 @@ int ObLSCreator::set_member_list_(const common::ObMemberList &member_list, return ret; } -int ObLSCreator::check_set_memberlist_result_(const int64_t rpc_count, - const ObIArray &return_code_array, - const int64_t paxos_replica_num) +int ObLSCreator::check_set_memberlist_result_( + const ObIArray &return_code_array, + const int64_t paxos_replica_num) { int ret = OB_SUCCESS; int64_t success_cnt = 0; if (OB_UNLIKELY(!is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret)); - } else if (rpc_count != return_code_array.count() - || rpc_count != set_member_list_proxy_.get_results().count()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("rpc count not equal to result count", KR(ret), K(rpc_count), - K(return_code_array), "arg count", set_member_list_proxy_.get_args().count()); + } else if (return_code_array.count() != set_member_list_proxy_.get_results().count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rpc count not equal to result count", KR(ret), + K(return_code_array.count()), K(set_member_list_proxy_.get_results().count())); } else { + // don't use arg/dest here because call() may has failure. for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) { if (OB_SUCCESS != return_code_array.at(i)) { LOG_WARN("rpc is failed", KR(ret), K(return_code_array.at(i)), K(i)); @@ -807,7 +804,8 @@ int ObLSCreator::check_set_memberlist_result_(const int64_t rpc_count, } } } - if (rootserver::majority(paxos_replica_num) > success_cnt) { + if (OB_FAIL(ret)) { + } else if (rootserver::majority(paxos_replica_num) > success_cnt) { ret = OB_REPLICA_NUM_NOT_ENOUGH; LOG_WARN("success count less than majority", KR(ret), K(success_cnt), K(paxos_replica_num)); diff --git a/src/share/ls/ob_ls_creator.h b/src/share/ls/ob_ls_creator.h index 4aaa378020..3e3d46b7ef 100644 --- a/src/share/ls/ob_ls_creator.h +++ b/src/share/ls/ob_ls_creator.h @@ -175,14 +175,12 @@ private: const ObTenantRole &tenant_role, const ObAddr &arbitration_service); #endif - int check_create_ls_result_(const int64_t rpc_count, - const int64_t paxos_replica_num, - const ObIArray &return_code_array, - common::ObMemberList &member_list, - common::GlobalLearnerList &learner_list); - int check_set_memberlist_result_(const int64_t rpc_count, - const ObIArray &return_code_array, - const int64_t paxos_replica_num); + int check_create_ls_result_(const int64_t paxos_replica_num, + const ObIArray &return_code_array, + common::ObMemberList &member_list, + common::GlobalLearnerList &learner_list); + int check_set_memberlist_result_(const ObIArray &return_code_array, + const int64_t paxos_replica_num); // alloc ls addr for duplicate log stream // @params[in] tenant_id, which tenant's log stream diff --git a/src/share/ls/ob_rpc_ls_table.cpp b/src/share/ls/ob_rpc_ls_table.cpp index 4db7db3069..e7d3fd7fe6 100644 --- a/src/share/ls/ob_rpc_ls_table.cpp +++ b/src/share/ls/ob_rpc_ls_table.cpp @@ -274,6 +274,7 @@ int ObRpcLSTable::do_detect_master_rs_ls_( LOG_WARN("start_idx/end_idx is invalid", KR(ret), K(start_idx), K(end_idx), "list_cnt", server_list.count()); } else { + ObArray tmp_addrs; ObTimeoutCtx ctx; int64_t timeout = GCONF.rpc_timeout; // default value is 2s int tmp_ret = share::ObShareUtil::set_default_timeout_ctx(ctx, timeout); @@ -286,6 +287,8 @@ int ObRpcLSTable::do_detect_master_rs_ls_( ObAddr &addr = server_list.at(i); if (!addr.is_valid()) { // TODO: @wanhong.wwh: need check when addr is not valid + } else if (OB_FAIL(tmp_addrs.push_back(addr))) { + LOG_WARN("fail to push back addr", KR(ret), K(addr)); } else if (OB_FAIL(arg.init(addr, cluster_id))) { LOG_WARN("fail to init arg", KR(ret), K(addr), K(cluster_id)); } else if (OB_TMP_FAIL(proxy.call(addr, timeout, cluster_id, @@ -298,30 +301,29 @@ int ObRpcLSTable::do_detect_master_rs_ls_( if (OB_TMP_FAIL(proxy.wait_all(return_ret_array))) { // ignore ret LOG_WARN("wait batch result failed", KR(tmp_ret), KR(ret)); ret = OB_SUCC(ret) ? tmp_ret : ret; - } else if (proxy.get_dests().count() != proxy.get_args().count() - || return_ret_array.count() != proxy.get_args().count() - || return_ret_array.count() != proxy.get_results().count()) { - ret = OB_STATE_NOT_MATCH; - LOG_WARN("args/dest/return_ret_array/results count not match, need retry", - KR(ret), "args_cnt", proxy.get_args().count(), - "dests_cnt", proxy.get_dests().count(), - "return_cnt", return_ret_array.count(), - "result_cnt", proxy.get_results().count()); - } - bool leader_exist = false; - for (int64_t i = 0; OB_SUCC(ret) && i < return_ret_array.count(); i++) { - int return_ret = return_ret_array.at(i); - if (OB_SUCCESS == return_ret) { - const ObAddr &addr = proxy.get_dests().at(i); + } else if (OB_FAIL(ret)) { + } else { + // don't use arg/dest here because call() may has failure. + // !use_invalid_addr means count of args_/dest_/results_/return_rets are matched. + bool leader_exist = false; + const bool use_invalid_addr = (OB_SUCCESS != proxy.check_return_cnt(return_ret_array.count())); + ObAddr invalid_addr; + for (int64_t i = 0; OB_SUCC(ret) && i < proxy.get_results().count(); i++) { + const ObAddr &addr = use_invalid_addr ? invalid_addr : proxy.get_dests().at(i); const ObDetectMasterRsLSResult *result = proxy.get_results().at(i); - if (OB_ISNULL(result)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("result is null", KR(ret)); + if (!use_invalid_addr && OB_SUCCESS != return_ret_array.at(i)) { + LOG_WARN("fail to get result by rpc, just ignore", "tmp_ret", return_ret_array.at(i), K(addr)); + } else if (OB_ISNULL(result) || !result->is_valid()) { + // return fail } else if (OB_FAIL(deal_with_result_ls_(*result, leader_exist, server_list, ls_info))) { LOG_WARN("fail to deal with result", KR(ret), K(addr), KPC(result)); } else { LOG_TRACE("detect master rs", KR(ret), K(addr), KPC(result)); } + } // end for + + if (use_invalid_addr || proxy.get_results().count() <= 0) { + LOG_WARN("Detect master rs may be failed", KR(ret), K(tmp_addrs)); } } } diff --git a/src/share/ob_ddl_common.cpp b/src/share/ob_ddl_common.cpp index 1f33a11624..bfd97603b8 100644 --- a/src/share/ob_ddl_common.cpp +++ b/src/share/ob_ddl_common.cpp @@ -1703,10 +1703,13 @@ int ObCheckTabletDataComplementOp::do_check_tablets_merge_status( // handle batch result int tmp_ret = OB_SUCCESS; common::ObArray return_ret_array; - if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) { + if (OB_TMP_FAIL(proxy.wait_all(return_ret_array))) { LOG_WARN("rpc proxy wait failed", K(tmp_ret)); ret = OB_SUCCESS == ret ? tmp_ret : ret; - } else if (OB_SUCC(ret)) { + } else if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.check_return_cnt(return_ret_array.count()))) { + LOG_WARN("return cnt not match", KR(ret), "return_cnt", return_ret_array.count()); + } else { if (return_ret_array.count() != ip_tablets_map.size()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("rpc proxy rsp size not equal to send size", K(ret), K(return_ret_array.count()), K(ip_tablets_map.size())); diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 5e723a4840..7cf2dec4d1 100755 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -8397,6 +8397,13 @@ void ObDetectMasterRsLSResult::reset() ls_info_.reset(); } +bool ObDetectMasterRsLSResult::is_valid() const +{ + return ObRole::INVALID_ROLE != role_ // sys ls replica is leader/follower + || master_rs_.is_valid(); // sys ls replica not exist + +} + void ObDetectMasterRsLSResult::set_role(const common::ObRole &role) { role_ = role; diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 4db92f7d7e..0df688e622 100755 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -8976,6 +8976,7 @@ public: int assign(const ObDetectMasterRsLSResult &other); void reset(); + bool is_valid() const; void set_role(const common::ObRole &role); void set_master_rs(const common::ObAddr &master_rs); int set_replica(const share::ObLSReplica &replica); diff --git a/src/share/rpc/ob_async_rpc_proxy.h b/src/share/rpc/ob_async_rpc_proxy.h index 838e6590b3..fdffeb047c 100644 --- a/src/share/rpc/ob_async_rpc_proxy.h +++ b/src/share/rpc/ob_async_rpc_proxy.h @@ -140,6 +140,8 @@ public: const common::ObIArray &get_dests() const { return dests_; } const common::ObIArray &get_results() const { return results_; } int receive_response(); + + int check_return_cnt(const int64_t return_cnt) const; private: int call_rpc(const common::ObAddr &server, const int64_t timeout, const int64_t cluster_id, const uint64_t tenant_id, const RpcArg &arg, ObAsyncCB *cb); @@ -185,6 +187,7 @@ template::reuse() { args_.reuse(); + dests_.reuse(); results_.reuse(); response_count_ = 0; ObAsyncCB *cb = cb_list_.get_first(); @@ -467,6 +470,24 @@ int ObAsyncRpcProxy::receive_response() return ret; } +template +int ObAsyncRpcProxy::check_return_cnt( + const int64_t return_cnt) const +{ + int ret = common::OB_SUCCESS; + if (return_cnt != args_.count() + || return_cnt != dests_.count() + || return_cnt != results_.count()) { + ret = common::OB_INVALID_ARGUMENT; + RPC_LOG(WARN, "return cnt not match", + KR(ret), K(return_cnt), + "arg_cnt", args_.count(), + "dest_cnt", dests_.count(), + "result_cnt", results_.count()); + } + return ret; +} + #define RPC_F(code, arg, result, name) \ typedef obrpc::ObAsyncRpcProxy *, const obrpc::ObRpcOpts &), obrpc::ObSrvRpcProxy> name diff --git a/src/share/schema/ob_ddl_trans_controller.cpp b/src/share/schema/ob_ddl_trans_controller.cpp index 2ab323d76d..38edef4dc0 100644 --- a/src/share/schema/ob_ddl_trans_controller.cpp +++ b/src/share/schema/ob_ddl_trans_controller.cpp @@ -171,7 +171,11 @@ int ObDDLTransController::broadcast_consensus_version(const int64_t tenant_id, int tmp_ret = OB_SUCCESS; ObArray return_code_array; if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { - LOG_WARN("wait result failed", KR(tmp_ret)); + LOG_WARN("wait result failed", KR(tmp_ret), K(ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } else if (OB_FAIL(ret)) { + } else { + // don't use arg/dest here beacause call() may has failure. } } LOG_INFO("broadcast consensus version finished", KR(ret), K(schema_version), K(arg), K(server_list)); diff --git a/src/storage/tablelock/ob_table_lock_service.cpp b/src/storage/tablelock/ob_table_lock_service.cpp index 7f4b66be23..1bb69a8707 100644 --- a/src/storage/tablelock/ob_table_lock_service.cpp +++ b/src/storage/tablelock/ob_table_lock_service.cpp @@ -1392,8 +1392,10 @@ int ObTableLockService::collect_rollback_info_(const ObArray &ls_ int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - // need wait rpcs that sent finish - // otherwise proxy reused or destructored will cause flying rpc core + // 1. need wait rpcs that sent finish + // otherwise proxy reused or destructored will cause flying rpc core + // 2. don't use arg/dest here because call() may has failure. + // 3. return_array/result can be used only when wait_all() is success. ObArray return_code_array; if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array))) { LOG_WARN("wait rpc failed", K(tmp_ret)); @@ -1422,9 +1424,10 @@ int ObTableLockService::handle_parallel_rpc_response_(RpcProxy &proxy_batch, // handle result ObArray return_code_array; if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array)) + || OB_TMP_FAIL(proxy_batch.check_return_cnt(return_code_array.count())) || retry_ctx.send_rpc_count_ != return_code_array.count()) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("rpc failed", K(tmp_ret), K(retry_ctx.send_rpc_count_), K(return_code_array.count())); + LOG_WARN("rpc failed", KR(ret), KR(tmp_ret), K(retry_ctx.send_rpc_count_), K(return_code_array.count())); // we need add the ls into touched to make rollback. can_retry = false; retry_ctx.need_retry_ = false;