From 7a505cf040e76caec176dd1ce86829faa809e8e6 Mon Sep 17 00:00:00 2001
From: chenlinzhong <490103404@qq.com>
Date: Fri, 19 Aug 2022 10:25:01 +0800
Subject: [PATCH] [remote-udaf](optimize) Optimize RPC exception handling logic (#11680)

---
Review notes (placed after the three-dash marker, so they are not part of the
commit message):

* `_error` now carries a default member initializer. The struct's constructor
  is `= default`, so without it has_error() reads an indeterminate value if it
  is called before init() has run.
* The GETDATA log message gained the missing space before "but result is
  empty", so it reads like the hand-written string branch in get().
* The one-line-per-diff-line layout was restored. Every hunk keeps the line
  counts announced in its header (92 insertions, 43 deletions overall).
* Context lines are kept as received. `std::shared_ptr _client;` and
  `std::vector _buffer_request;` look like they lost their template arguments
  in transit -- confirm against the tree before applying.

 .../aggregate_function_rpc.h                  | 135 ++++++++++++------
 1 file changed, 92 insertions(+), 43 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function_rpc.h b/be/src/vec/aggregate_functions/aggregate_function_rpc.h
index 6a9a3894c8..d7ac8f59e3 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_rpc.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_rpc.h
@@ -43,7 +43,10 @@
 #include "vec/io/io_helper.h"
 namespace doris::vectorized {
 
+#define error_default_str "#$@"
+
 constexpr int64_t max_buffered_rows = 4096;
+
 struct AggregateRpcUdafData {
 private:
     std::string _update_fn;
@@ -54,6 +57,7 @@ private:
     std::shared_ptr _client;
     PFunctionCallResponse _res;
     std::vector _buffer_request;
+    bool _error = false; // true after a failed init/RPC or a deserialized error marker
 
 public:
     AggregateRpcUdafData() = default;
@@ -65,6 +69,10 @@ public:
 
     ~AggregateRpcUdafData() {}
 
+    void set_error(bool flag) { _error = flag; }
+
+    bool has_error() { return _error == true; }
+
     Status merge(AggregateRpcUdafData& rhs) {
         send_buffer_to_rpc_server();
         if (has_last_result()) {
@@ -94,10 +102,12 @@ public:
         _merge_fn = fn.aggregate_fn.merge_fn_symbol;
         _server_addr = fn.hdfs_location;
         _finalize_fn = fn.aggregate_fn.finalize_fn_symbol;
+        set_error(false);
         _client = ExecEnv::GetInstance()->brpc_function_client_cache()->get_client(_server_addr);
         if (_client == nullptr) {
             std::string err_msg = "init rpc error, addr:" + _server_addr;
             LOG(ERROR) << err_msg;
+            set_error(true);
             return Status::InternalError(err_msg);
         }
         return Status::OK();
@@ -107,19 +117,33 @@ public:
                             PFunctionCallResponse& response) {
         _client->fn_call(&cntl, &request, &response, nullptr);
         if (cntl.Failed()) {
-            return Status::InternalError(fmt::format("call to rpc function {} failed: {}",
-                                                     request.function_name(), cntl.ErrorText())
-                                                 .c_str());
+            set_error(true);
+            std::stringstream err_msg;
+            err_msg << " call rpc function failed";
+            err_msg << " _server_addr:" << _server_addr;
+            err_msg << " function_name:" << request.function_name();
+            err_msg << " err:" << cntl.ErrorText();
+            LOG(ERROR) << err_msg.str();
+            return Status::InternalError(err_msg.str());
         }
         if (!response.has_status() || response.result_size() == 0) {
-            return Status::InternalError(
-                    fmt::format("call rpc function {} failed: status or result is not set.",
-                                request.function_name()));
+            set_error(true);
+            std::stringstream err_msg;
+            err_msg << " call rpc function failed, status or result is not set";
+            err_msg << " _server_addr:" << _server_addr;
+            err_msg << " function_name:" << request.function_name();
+            LOG(ERROR) << err_msg.str();
+            return Status::InternalError(err_msg.str());
         }
         if (response.status().status_code() != 0) {
-            return Status::InternalError(fmt::format("call to rpc function {} failed: {}",
-                                                     request.function_name(),
-                                                     response.status().DebugString()));
+            set_error(true);
+            std::stringstream err_msg;
+            err_msg << " call rpc function failed";
+            err_msg << " _server_addr:" << _server_addr;
+            err_msg << " function_name:" << request.function_name();
+            err_msg << " err:" << response.status().DebugString();
+            LOG(ERROR) << err_msg.str();
+            return Status::InternalError(err_msg.str());
         }
         return Status::OK();
     }
@@ -224,7 +248,12 @@ public:
 
     void serialize(BufferWritable& buf) {
         send_buffer_to_rpc_server();
-        std::string serialize_data = _res.SerializeAsString();
+        std::string serialize_data = error_default_str;
+        if (!has_error()) {
+            serialize_data = _res.SerializeAsString();
+        } else {
+            LOG(ERROR) << "serialize empty buf";
+        }
         write_binary(serialize_data, buf);
     }
 
@@ -232,11 +261,32 @@ public:
         send_buffer_to_rpc_server();
         std::string serialize_data;
         read_binary(serialize_data, buf);
-        _res.ParseFromString(serialize_data);
-        set_last_result(true);
+        if (error_default_str != serialize_data) {
+            _res.ParseFromString(serialize_data);
+            set_last_result(true);
+        } else {
+            LOG(ERROR) << "deserialize empty buf";
+            set_error(true);
+        }
     }
 
+#define GETDATA(LOCATTYPE, TYPEVALUE)                                                          \
+    if (response.result_size() > 0 && response.result(0).TYPEVALUE##_##value_size() > 0) {     \
+        LOCATTYPE ret = response.result(0).TYPEVALUE##_##value(0);                             \
+        to.insert_data((char*)&ret, 0);                                                        \
+    } else {                                                                                   \
+        LOG(ERROR) << "_server_addr:" << _server_addr << ",_finalize_fn:" << _finalize_fn      \
+                   << ",msg: failed to get final result cause return type need " << #TYPEVALUE \
+                   << " but result is empty";                                                  \
+        to.insert_default();                                                                   \
+    }
+
+    // On any unexpected error a default value is inserted into `to` and OK is returned.
     Status get(IColumn& to, const DataTypePtr& return_type) {
+        if (has_error()) {
+            to.insert_default();
+            return Status::OK();
+        }
         send_buffer_to_rpc_server();
         PFunctionCallRequest request;
         PFunctionCallResponse response;
@@ -245,7 +295,10 @@ public:
         request.mutable_context()->mutable_function_context()->mutable_args_data()->CopyFrom(
                 _res.result());
         send_rpc_request(cntl, request, response);
-
+        if (has_error()) {
+            to.insert_default();
+            return Status::OK();
+        }
         DataTypePtr result_type = return_type;
         if (return_type->is_nullable()) {
             result_type =
@@ -253,36 +306,32 @@ public:
         }
         WhichDataType which(result_type);
         if (which.is_float32()) {
-            float ret = response.result(0).float_value(0);
-            to.insert_data((char*)&ret, 0);
-        }
-        if (which.is_float64()) {
-            double ret = response.result(0).double_value(0);
-            to.insert_data((char*)&ret, 0);
-        }
-        if (which.is_int32()) {
-            int32_t ret = response.result(0).int32_value(0);
-            to.insert_data((char*)&ret, 0);
-        }
-        if (which.is_uint32()) {
-            uint32_t ret = response.result(0).uint32_value(0);
-            to.insert_data((char*)&ret, 0);
-        }
-        if (which.is_int64()) {
-            int64_t ret = response.result(0).int64_value(0);
-            to.insert_data((char*)&ret, 0);
-        }
-        if (which.is_uint64()) {
-            uint64_t ret = response.result(0).uint64_value(0);
-            to.insert_data((char*)&ret, 0);
-        }
-        if (which.is_uint8()) {
-            uint8_t ret = response.result(0).bool_value(0);
-            to.insert_data((char*)&ret, 0);
-        }
-        if (which.is_string()) {
-            std::string ret = response.result(0).string_value(0);
-            to.insert_data(ret.c_str(), ret.size());
+            GETDATA(float, float);
+        } else if (which.is_float64()) {
+            GETDATA(double, double);
+        } else if (which.is_int32()) {
+            GETDATA(int32_t, int32);
+        } else if (which.is_uint32()) {
+            GETDATA(uint32_t, uint32);
+        } else if (which.is_int64()) {
+            GETDATA(int64_t, int64);
+        } else if (which.is_uint64()) {
+            GETDATA(uint64_t, uint64);
+        } else if (which.is_uint8()) {
+            GETDATA(uint8_t, bool);
+        } else if (which.is_string()) {
+            if (response.result_size() > 0 && response.result(0).string_value_size() > 0) {
+                std::string ret = response.result(0).string_value(0);
+                to.insert_data(ret.c_str(), ret.size());
+            } else {
+                LOG(ERROR) << "_server_addr:" << _server_addr << ",_finalize_fn:" << _finalize_fn
+                           << ",msg: failed to get final result cause return type need string but "
+                              "result is empty";
+                to.insert_default();
+            }
+        } else {
+            LOG(ERROR) << "failed to get result cause unkown return type";
+            to.insert_default();
         }
         return Status::OK();
     }