diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp index fe0d625536..24bf55b814 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp @@ -45,6 +45,12 @@ int __attribute__((weak)) check_arb_white_list(int64_t cluster_id, bool& is_arb) return ret; } +int64_t __attribute__((weak)) get_stream_rpc_max_wait_timeout(int64_t tenant_id) +{ + //do nothing + UNUSED(tenant_id); + return ObRpcProcessorBase::DEFAULT_WAIT_NEXT_PACKET_TIMEOUT; +} void ObRpcProcessorBase::reuse() { rpc_pkt_ = NULL; @@ -542,6 +548,11 @@ int ObRpcProcessorBase::flush(int64_t wait_timeout) rpc::ObRequest *req = NULL; UNIS_VERSION_GUARD(unis_version_); + const int64_t stream_rpc_max_wait_timeout = get_stream_rpc_max_wait_timeout(tenant_id_); + if (0 == wait_timeout || wait_timeout > stream_rpc_max_wait_timeout) { + wait_timeout = stream_rpc_max_wait_timeout; + } + if (nullptr == sc_) { sc_ = OB_NEWx(ObRpcStreamCond, (&lib::this_worker().get_sql_arena_allocator()), *sh_); if (nullptr == sc_) { diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h index ce30c3eec6..2db38c8db9 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h @@ -108,7 +108,7 @@ protected: virtual int deserialize(); virtual int serialize(); virtual int response(const int retcode) { return part_response(retcode, true); } - virtual int flush(int64_t wait_timeout = DEFAULT_WAIT_NEXT_PACKET_TIMEOUT); + virtual int flush(int64_t wait_timeout = 0); void set_preserve_recv_data() { preserve_recv_data_ = true; } void set_result_compress_type(common::ObCompressorType t) { result_compress_type_ = t; } diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp index 8619c6eb97..1f5c0fad15 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp @@ -25,7 +25,7 @@ using namespace oceanbase::obrpc; ObRpcSessionHandler::ObRpcSessionHandler() { - sessid_ = 0; + sessid_ = ObTimeUtility::current_time(); ObMemAttr attr(OB_SERVER_TENANT_ID, ObModIds::OB_HASH_NODE_NEXT_WAIT_MAP); SET_USE_500(attr); next_wait_map_.create(MAX_COND_COUNT, attr, attr); diff --git a/src/share/config/ob_server_config.cpp b/src/share/config/ob_server_config.cpp index f122b636fe..2087a09449 100644 --- a/src/share/config/ob_server_config.cpp +++ b/src/share/config/ob_server_config.cpp @@ -568,6 +568,16 @@ int64_t get_max_rpc_packet_size() { return GCONF._max_rpc_packet_size; } + +int64_t get_stream_rpc_max_wait_timeout(int64_t tenant_id) +{ + int64_t stream_rpc_max_wait_timeout = ObRpcProcessorBase::DEFAULT_WAIT_NEXT_PACKET_TIMEOUT; + omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); + if (OB_LIKELY(tenant_config.is_valid())) { + stream_rpc_max_wait_timeout = tenant_config->_stream_rpc_max_wait_timeout; + } + return stream_rpc_max_wait_timeout; +} } // end of namespace obrpc } // end of namespace oceanbase diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index bf68b53230..fbaf355e56 100755 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -768,6 +768,9 @@ DEF_TIME(_ob_get_gts_ahead_interval, OB_CLUSTER_PARAMETER, "0s", "[0s, 1s]", DEF_TIME(rpc_timeout, OB_CLUSTER_PARAMETER, "2s", "the time during which a RPC request is permitted to execute before it is terminated", ObParameterAttr(Section::RPC, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_TIME(_stream_rpc_max_wait_timeout, OB_TENANT_PARAMETER, "30s", "[1s,)", + "the maximum timeout for a tenant worker thread to wait for the next request while processing streaming RPC", + ObParameterAttr(Section::RPC, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_BOOL(_enable_pkt_nio, OB_CLUSTER_PARAMETER, "True", "enable pkt-nio, the new RPC framework" "Value: True:turned on; False: turned off", diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index 3a8395d7f2..70fd118b9e 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -393,6 +393,7 @@ _sql_insert_multi_values_split_opt _stall_threshold_for_dynamic_worker _storage_leak_check_mod _storage_meta_memory_limit_percentage +_stream_rpc_max_wait_timeout _system_tenant_limit_mode _temporary_file_io_area_size _temporary_file_meta_memory_limit_percentage