[remote-udaf](optimize) Optimize RPC exception handling logic (#11680)
This commit is contained in:
@ -43,7 +43,10 @@
|
||||
#include "vec/io/io_helper.h"
|
||||
namespace doris::vectorized {
|
||||
|
||||
#define error_default_str "#$@"
|
||||
|
||||
constexpr int64_t max_buffered_rows = 4096;
|
||||
|
||||
struct AggregateRpcUdafData {
|
||||
private:
|
||||
std::string _update_fn;
|
||||
@ -54,6 +57,7 @@ private:
|
||||
std::shared_ptr<PFunctionService_Stub> _client;
|
||||
PFunctionCallResponse _res;
|
||||
std::vector<PFunctionCallRequest> _buffer_request;
|
||||
bool _error;
|
||||
|
||||
public:
|
||||
AggregateRpcUdafData() = default;
|
||||
@ -65,6 +69,10 @@ public:
|
||||
|
||||
~AggregateRpcUdafData() {}
|
||||
|
||||
void set_error(bool flag) { _error = flag; }
|
||||
|
||||
bool has_error() { return _error == true; }
|
||||
|
||||
Status merge(AggregateRpcUdafData& rhs) {
|
||||
send_buffer_to_rpc_server();
|
||||
if (has_last_result()) {
|
||||
@ -94,10 +102,12 @@ public:
|
||||
_merge_fn = fn.aggregate_fn.merge_fn_symbol;
|
||||
_server_addr = fn.hdfs_location;
|
||||
_finalize_fn = fn.aggregate_fn.finalize_fn_symbol;
|
||||
set_error(false);
|
||||
_client = ExecEnv::GetInstance()->brpc_function_client_cache()->get_client(_server_addr);
|
||||
if (_client == nullptr) {
|
||||
std::string err_msg = "init rpc error, addr:" + _server_addr;
|
||||
LOG(ERROR) << err_msg;
|
||||
set_error(true);
|
||||
return Status::InternalError(err_msg);
|
||||
}
|
||||
return Status::OK();
|
||||
@ -107,19 +117,33 @@ public:
|
||||
PFunctionCallResponse& response) {
|
||||
_client->fn_call(&cntl, &request, &response, nullptr);
|
||||
if (cntl.Failed()) {
|
||||
return Status::InternalError(fmt::format("call to rpc function {} failed: {}",
|
||||
request.function_name(), cntl.ErrorText())
|
||||
.c_str());
|
||||
set_error(true);
|
||||
std::stringstream err_msg;
|
||||
err_msg << " call rpc function failed";
|
||||
err_msg << " _server_addr:" << _server_addr;
|
||||
err_msg << " function_name:" << request.function_name();
|
||||
err_msg << " err:" << cntl.ErrorText();
|
||||
LOG(ERROR) << err_msg.str();
|
||||
return Status::InternalError(err_msg.str());
|
||||
}
|
||||
if (!response.has_status() || response.result_size() == 0) {
|
||||
return Status::InternalError(
|
||||
fmt::format("call rpc function {} failed: status or result is not set.",
|
||||
request.function_name()));
|
||||
set_error(true);
|
||||
std::stringstream err_msg;
|
||||
err_msg << " call rpc function failed, status or result is not set";
|
||||
err_msg << " _server_addr:" << _server_addr;
|
||||
err_msg << " function_name:" << request.function_name();
|
||||
LOG(ERROR) << err_msg.str();
|
||||
return Status::InternalError(err_msg.str());
|
||||
}
|
||||
if (response.status().status_code() != 0) {
|
||||
return Status::InternalError(fmt::format("call to rpc function {} failed: {}",
|
||||
request.function_name(),
|
||||
response.status().DebugString()));
|
||||
set_error(true);
|
||||
std::stringstream err_msg;
|
||||
err_msg << " call rpc function failed";
|
||||
err_msg << " _server_addr:" << _server_addr;
|
||||
err_msg << " function_name:" << request.function_name();
|
||||
err_msg << " err:" << response.status().DebugString();
|
||||
LOG(ERROR) << err_msg.str();
|
||||
return Status::InternalError(err_msg.str());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -224,7 +248,12 @@ public:
|
||||
|
||||
void serialize(BufferWritable& buf) {
|
||||
send_buffer_to_rpc_server();
|
||||
std::string serialize_data = _res.SerializeAsString();
|
||||
std::string serialize_data = error_default_str;
|
||||
if (!has_error()) {
|
||||
serialize_data = _res.SerializeAsString();
|
||||
} else {
|
||||
LOG(ERROR) << "serialize empty buf";
|
||||
}
|
||||
write_binary(serialize_data, buf);
|
||||
}
|
||||
|
||||
@ -232,11 +261,32 @@ public:
|
||||
send_buffer_to_rpc_server();
|
||||
std::string serialize_data;
|
||||
read_binary(serialize_data, buf);
|
||||
_res.ParseFromString(serialize_data);
|
||||
set_last_result(true);
|
||||
if (error_default_str != serialize_data) {
|
||||
_res.ParseFromString(serialize_data);
|
||||
set_last_result(true);
|
||||
} else {
|
||||
LOG(ERROR) << "deserialize empty buf";
|
||||
set_error(true);
|
||||
}
|
||||
}
|
||||
|
||||
#define GETDATA(LOCATTYPE, TYPEVALUE) \
|
||||
if (response.result_size() > 0 && response.result(0).TYPEVALUE##_##value_size() > 0) { \
|
||||
LOCATTYPE ret = response.result(0).TYPEVALUE##_##value(0); \
|
||||
to.insert_data((char*)&ret, 0); \
|
||||
} else { \
|
||||
LOG(ERROR) << "_server_addr:" << _server_addr << ",_finalize_fn:" << _finalize_fn \
|
||||
<< ",msg: failed to get final result cause return type need " << #TYPEVALUE \
|
||||
<< "but result is empty"; \
|
||||
to.insert_default(); \
|
||||
}
|
||||
|
||||
//if any unexpected error happen will return NULL
|
||||
Status get(IColumn& to, const DataTypePtr& return_type) {
|
||||
if (has_error()) {
|
||||
to.insert_default();
|
||||
return Status::OK();
|
||||
}
|
||||
send_buffer_to_rpc_server();
|
||||
PFunctionCallRequest request;
|
||||
PFunctionCallResponse response;
|
||||
@ -245,7 +295,10 @@ public:
|
||||
request.mutable_context()->mutable_function_context()->mutable_args_data()->CopyFrom(
|
||||
_res.result());
|
||||
send_rpc_request(cntl, request, response);
|
||||
|
||||
if (has_error()) {
|
||||
to.insert_default();
|
||||
return Status::OK();
|
||||
}
|
||||
DataTypePtr result_type = return_type;
|
||||
if (return_type->is_nullable()) {
|
||||
result_type =
|
||||
@ -253,36 +306,32 @@ public:
|
||||
}
|
||||
WhichDataType which(result_type);
|
||||
if (which.is_float32()) {
|
||||
float ret = response.result(0).float_value(0);
|
||||
to.insert_data((char*)&ret, 0);
|
||||
}
|
||||
if (which.is_float64()) {
|
||||
double ret = response.result(0).double_value(0);
|
||||
to.insert_data((char*)&ret, 0);
|
||||
}
|
||||
if (which.is_int32()) {
|
||||
int32_t ret = response.result(0).int32_value(0);
|
||||
to.insert_data((char*)&ret, 0);
|
||||
}
|
||||
if (which.is_uint32()) {
|
||||
uint32_t ret = response.result(0).uint32_value(0);
|
||||
to.insert_data((char*)&ret, 0);
|
||||
}
|
||||
if (which.is_int64()) {
|
||||
int64_t ret = response.result(0).int64_value(0);
|
||||
to.insert_data((char*)&ret, 0);
|
||||
}
|
||||
if (which.is_uint64()) {
|
||||
uint64_t ret = response.result(0).uint64_value(0);
|
||||
to.insert_data((char*)&ret, 0);
|
||||
}
|
||||
if (which.is_uint8()) {
|
||||
uint8_t ret = response.result(0).bool_value(0);
|
||||
to.insert_data((char*)&ret, 0);
|
||||
}
|
||||
if (which.is_string()) {
|
||||
std::string ret = response.result(0).string_value(0);
|
||||
to.insert_data(ret.c_str(), ret.size());
|
||||
GETDATA(float, float);
|
||||
} else if (which.is_float64()) {
|
||||
GETDATA(double, double);
|
||||
} else if (which.is_int32()) {
|
||||
GETDATA(int32_t, int32);
|
||||
} else if (which.is_uint32()) {
|
||||
GETDATA(uint32_t, uint32);
|
||||
} else if (which.is_int64()) {
|
||||
GETDATA(int64_t, int64);
|
||||
} else if (which.is_uint64()) {
|
||||
GETDATA(uint64_t, uint64);
|
||||
} else if (which.is_uint8()) {
|
||||
GETDATA(uint8_t, bool);
|
||||
} else if (which.is_string()) {
|
||||
if (response.result_size() > 0 && response.result(0).string_value_size() > 0) {
|
||||
std::string ret = response.result(0).string_value(0);
|
||||
to.insert_data(ret.c_str(), ret.size());
|
||||
} else {
|
||||
LOG(ERROR) << "_server_addr:" << _server_addr << ",_finalize_fn:" << _finalize_fn
|
||||
<< ",msg: failed to get final result cause return type need string but "
|
||||
"result is empty";
|
||||
to.insert_default();
|
||||
}
|
||||
} else {
|
||||
LOG(ERROR) << "failed to get result cause unkown return type";
|
||||
to.insert_default();
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user