[Chore](brpc) make error messages more verbose when brpc pool offer failed (#22558)
This commit is contained in:
@ -172,17 +172,17 @@ template <typename T>
|
||||
concept CanCancel = requires(T* response) { response->mutable_status(); };
|
||||
|
||||
template <CanCancel T>
|
||||
void offer_failed(T* response, google::protobuf::Closure* done, const std::string& pool_name) {
|
||||
void offer_failed(T* response, google::protobuf::Closure* done, const PriorityThreadPool& pool) {
|
||||
brpc::ClosureGuard closure_guard(done);
|
||||
response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
|
||||
response->mutable_status()->add_error_msgs("fail to offer request to the work pool, pool=" +
|
||||
pool_name);
|
||||
pool.get_info());
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void offer_failed(T* response, google::protobuf::Closure* done, const std::string& pool_name) {
|
||||
void offer_failed(T* response, google::protobuf::Closure* done, const PriorityThreadPool& pool) {
|
||||
brpc::ClosureGuard closure_guard(done);
|
||||
LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool_name;
|
||||
LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info();
|
||||
}
|
||||
|
||||
PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
|
||||
@ -272,7 +272,7 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
|
||||
st.to_protobuf(response->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -284,7 +284,7 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
|
||||
_exec_plan_fragment_in_pthread(controller, request, response, done);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -320,7 +320,7 @@ void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcContr
|
||||
_exec_plan_fragment_in_pthread(controller, request, response, done);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -336,7 +336,7 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl
|
||||
st.to_protobuf(result->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(result, done, _light_work_pool.get_name());
|
||||
offer_failed(result, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -348,7 +348,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
|
||||
_tablet_writer_add_block(controller, request, response, done);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _heavy_work_pool.get_name());
|
||||
offer_failed(response, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -369,7 +369,7 @@ void PInternalServiceImpl::tablet_writer_add_block_by_http(
|
||||
}
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _heavy_work_pool.get_name());
|
||||
offer_failed(response, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -398,7 +398,7 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
|
||||
response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _heavy_work_pool.get_name());
|
||||
offer_failed(response, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -419,7 +419,7 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
|
||||
}
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -496,7 +496,7 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
|
||||
st.to_protobuf(result->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(result, done, _light_work_pool.get_name());
|
||||
offer_failed(result, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -509,7 +509,7 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controlle
|
||||
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(result, done, _heavy_work_pool.get_name());
|
||||
offer_failed(result, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -613,7 +613,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
|
||||
st.to_protobuf(result->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(result, done, _heavy_work_pool.get_name());
|
||||
offer_failed(result, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -640,7 +640,7 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co
|
||||
st.to_protobuf(response->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -652,7 +652,7 @@ void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcCon
|
||||
_get_column_ids_by_tablet_ids(controller, request, response, done);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -768,7 +768,7 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
|
||||
Status::OK().to_protobuf(response->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _heavy_work_pool.get_name());
|
||||
offer_failed(response, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -780,7 +780,7 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control
|
||||
_exec_env->result_cache()->update(request, response);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -792,7 +792,7 @@ void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controll
|
||||
_exec_env->result_cache()->fetch(request, result);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(result, done, _heavy_work_pool.get_name());
|
||||
offer_failed(result, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -804,7 +804,7 @@ void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controll
|
||||
_exec_env->result_cache()->clear(request, response);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -823,7 +823,7 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr
|
||||
st.to_protobuf(response->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -844,7 +844,7 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr
|
||||
st.to_protobuf(response->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -865,7 +865,7 @@ void PInternalServiceImpl::apply_filterv2(::google::protobuf::RpcController* con
|
||||
st.to_protobuf(response->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -898,7 +898,7 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller
|
||||
}
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _heavy_work_pool.get_name());
|
||||
offer_failed(response, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -921,7 +921,7 @@ void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
|
||||
}
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -943,7 +943,7 @@ void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller,
|
||||
}
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -961,7 +961,7 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* c
|
||||
st.to_protobuf(response->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -988,7 +988,7 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr
|
||||
_transmit_block(controller, request, response, done, Status::OK());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, pool.get_name());
|
||||
offer_failed(response, done, pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1006,7 +1006,7 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle
|
||||
_transmit_block(controller, new_request, response, new_done, st);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _heavy_work_pool.get_name());
|
||||
offer_failed(response, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1072,7 +1072,7 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
|
||||
}
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1110,7 +1110,7 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
|
||||
}
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1126,7 +1126,7 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controlle
|
||||
response->mutable_status()->set_status_code(0);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _light_work_pool.get_name());
|
||||
offer_failed(response, done, _light_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1344,7 +1344,7 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
|
||||
rowset_meta->tablet_id(), node_id, true);
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _heavy_work_pool.get_name());
|
||||
offer_failed(response, done, _heavy_work_pool);
|
||||
}
|
||||
Status::OK().to_protobuf(response->mutable_status());
|
||||
}
|
||||
@ -1415,7 +1415,7 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset(
|
||||
Status::OK().to_protobuf(response->mutable_status());
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _heavy_work_pool.get_name());
|
||||
offer_failed(response, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1588,7 +1588,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
|
||||
LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000;
|
||||
});
|
||||
if (!ret) {
|
||||
offer_failed(response, done, _heavy_work_pool.get_name());
|
||||
offer_failed(response, done, _heavy_work_pool);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -185,6 +185,8 @@ public:
|
||||
return _queue.size();
|
||||
}
|
||||
|
||||
uint32_t get_max_size() const { return _max_element; }
|
||||
|
||||
// Returns the total amount of time threads have blocked in blocking_get.
|
||||
uint64_t total_get_wait_time() const { return _total_get_wait_time; }
|
||||
|
||||
|
||||
@ -122,7 +122,14 @@ public:
|
||||
join();
|
||||
}
|
||||
|
||||
std::string get_name() const { return _name; }
|
||||
std::string get_info() const {
|
||||
return fmt::format(
|
||||
"PriorityThreadPool(name={}, queue_size={}/{}, active_thread={}/{}, "
|
||||
"total_get_wait_time={}, total_put_wait_time={})",
|
||||
_name, get_queue_size(), _work_queue.get_size(), _work_queue.get_max_size(),
|
||||
_active_threads, _threads.size(), _work_queue.total_get_wait_time(),
|
||||
_work_queue.total_put_wait_time());
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual bool is_shutdown() { return _shutdown; }
|
||||
|
||||
Reference in New Issue
Block a user