add _stream_rpc_max_wait_timeout to avoid tenant worker hung for waiting next request
This commit is contained in:
@ -45,6 +45,12 @@ int __attribute__((weak)) check_arb_white_list(int64_t cluster_id, bool& is_arb)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t __attribute__((weak)) get_stream_rpc_max_wait_timeout(int64_t tenant_id)
|
||||
{
|
||||
//do nothing
|
||||
UNUSED(tenant_id);
|
||||
return ObRpcProcessorBase::DEFAULT_WAIT_NEXT_PACKET_TIMEOUT;
|
||||
}
|
||||
void ObRpcProcessorBase::reuse()
|
||||
{
|
||||
rpc_pkt_ = NULL;
|
||||
@ -542,6 +548,11 @@ int ObRpcProcessorBase::flush(int64_t wait_timeout)
|
||||
rpc::ObRequest *req = NULL;
|
||||
UNIS_VERSION_GUARD(unis_version_);
|
||||
|
||||
const int64_t stream_rpc_max_wait_timeout = get_stream_rpc_max_wait_timeout(tenant_id_);
|
||||
if (0 == wait_timeout || wait_timeout > stream_rpc_max_wait_timeout) {
|
||||
wait_timeout = stream_rpc_max_wait_timeout;
|
||||
}
|
||||
|
||||
if (nullptr == sc_) {
|
||||
sc_ = OB_NEWx(ObRpcStreamCond, (&lib::this_worker().get_sql_arena_allocator()), *sh_);
|
||||
if (nullptr == sc_) {
|
||||
|
@ -108,7 +108,7 @@ protected:
|
||||
virtual int deserialize();
|
||||
virtual int serialize();
|
||||
virtual int response(const int retcode) { return part_response(retcode, true); }
|
||||
virtual int flush(int64_t wait_timeout = DEFAULT_WAIT_NEXT_PACKET_TIMEOUT);
|
||||
virtual int flush(int64_t wait_timeout = 0);
|
||||
|
||||
void set_preserve_recv_data() { preserve_recv_data_ = true; }
|
||||
void set_result_compress_type(common::ObCompressorType t) { result_compress_type_ = t; }
|
||||
|
@ -25,7 +25,7 @@ using namespace oceanbase::obrpc;
|
||||
|
||||
ObRpcSessionHandler::ObRpcSessionHandler()
|
||||
{
|
||||
sessid_ = 0;
|
||||
sessid_ = ObTimeUtility::current_time();
|
||||
ObMemAttr attr(OB_SERVER_TENANT_ID, ObModIds::OB_HASH_NODE_NEXT_WAIT_MAP);
|
||||
SET_USE_500(attr);
|
||||
next_wait_map_.create(MAX_COND_COUNT, attr, attr);
|
||||
|
@ -568,6 +568,16 @@ int64_t get_max_rpc_packet_size()
|
||||
{
|
||||
return GCONF._max_rpc_packet_size;
|
||||
}
|
||||
|
||||
int64_t get_stream_rpc_max_wait_timeout(int64_t tenant_id)
|
||||
{
|
||||
int64_t stream_rpc_max_wait_timeout = ObRpcProcessorBase::DEFAULT_WAIT_NEXT_PACKET_TIMEOUT;
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
||||
if (OB_LIKELY(tenant_config.is_valid())) {
|
||||
stream_rpc_max_wait_timeout = tenant_config->_stream_rpc_max_wait_timeout;
|
||||
}
|
||||
return stream_rpc_max_wait_timeout;
|
||||
}
|
||||
} // end of namespace obrpc
|
||||
} // end of namespace oceanbase
|
||||
|
||||
|
@ -768,6 +768,9 @@ DEF_TIME(_ob_get_gts_ahead_interval, OB_CLUSTER_PARAMETER, "0s", "[0s, 1s]",
|
||||
DEF_TIME(rpc_timeout, OB_CLUSTER_PARAMETER, "2s",
|
||||
"the time during which a RPC request is permitted to execute before it is terminated",
|
||||
ObParameterAttr(Section::RPC, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_TIME(_stream_rpc_max_wait_timeout, OB_TENANT_PARAMETER, "30s", "[1s,)",
|
||||
"the maximum timeout for a tenant worker thread to wait for the next request while processing streaming RPC",
|
||||
ObParameterAttr(Section::RPC, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
DEF_BOOL(_enable_pkt_nio, OB_CLUSTER_PARAMETER, "True",
|
||||
"enable pkt-nio, the new RPC framework"
|
||||
"Value: True:turned on; False: turned off",
|
||||
|
@ -393,6 +393,7 @@ _sql_insert_multi_values_split_opt
|
||||
_stall_threshold_for_dynamic_worker
|
||||
_storage_leak_check_mod
|
||||
_storage_meta_memory_limit_percentage
|
||||
_stream_rpc_max_wait_timeout
|
||||
_system_tenant_limit_mode
|
||||
_temporary_file_io_area_size
|
||||
_temporary_file_meta_memory_limit_percentage
|
||||
|
Reference in New Issue
Block a user