Fix fetch tablet meta rpc timeout bug.
This commit is contained in:
@ -1560,3 +1560,9 @@ DEF_BOOL(_optimizer_better_inlist_costing, OB_TENANT_PARAMETER, "False",
|
||||
DEF_TIME(_ls_migration_wait_completing_timeout, OB_TENANT_PARAMETER, "30m", "[60s,)",
|
||||
"the wait timeout in ls complete migration phase",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
// Tenant-level batch size for tablet info sent per storage HA rpc.
// The default "0" means "not configured": ObFetchTabletInfoP::process() keeps its
// built-in batch size (32) whenever this value is 0 and uses the configured
// value otherwise.
DEF_INT(_ha_tablet_info_batch_count, OB_TENANT_PARAMETER, "0", "[0,]",
        "the number of tablet replica info sent by one rpc for ha. Range: [0, +∞) in integer",
        ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
// Tenant-level override for the storage HA rpc timeout.
// ObStorageHAUtils::get_rpc_timeout() returns the larger of this value and
// ObStorageRpcProxy::STREAM_RPC_TIMEOUT, so the default "0" (and any value not
// above STREAM_RPC_TIMEOUT) leaves the built-in stream rpc timeout in effect.
DEF_TIME(_ha_rpc_timeout, OB_TENANT_PARAMETER, "0", "[0,120s]",
|
||||
"the rpc timeout for storage high availability. Range:[0, 120s]",
|
||||
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
|
||||
|
@ -206,6 +206,8 @@ int ObCopyMacroBlockObReader::init(
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", K(ret), K(param));
|
||||
} else {
|
||||
const int64_t rpc_timeout = ObStorageHAUtils::get_rpc_timeout();
|
||||
|
||||
SMART_VAR(ObCopyMacroBlockRangeArg, arg) {
|
||||
if (OB_FAIL(macro_block_mem_context_.init())) {
|
||||
LOG_WARN("failed to init macro block memory context", K(ret));
|
||||
@ -230,7 +232,7 @@ int ObCopyMacroBlockObReader::init(
|
||||
LOG_ERROR("rpc arg must not larger than packet size", K(ret), K(arg.get_serialize_size()));
|
||||
} else if (OB_FAIL(param.svr_rpc_proxy_->to(param.src_info_.src_addr_).by(OB_DATA_TENANT_ID).dst_cluster_id(param.src_info_.cluster_id_)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.timeout(ObStorageRpcProxy::STREAM_RPC_TIMEOUT)
|
||||
.timeout(rpc_timeout)
|
||||
.fetch_macro_block(arg, rpc_buffer_, handle_))) {
|
||||
LOG_WARN("failed to send fetch macro block rpc", K(param), K(ret));
|
||||
} else {
|
||||
@ -786,6 +788,7 @@ int ObCopyTabletInfoObReader::init(
|
||||
common::ObInOutBandwidthThrottle &bandwidth_throttle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t rpc_timeout = FETCH_TABLET_INFO_TIMEOUT;
|
||||
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
@ -796,7 +799,8 @@ int ObCopyTabletInfoObReader::init(
|
||||
LOG_WARN("invalid argument", K(ret), K(src_info), K(rpc_arg));
|
||||
} else if (OB_FAIL(rpc_reader_.init(bandwidth_throttle))) {
|
||||
LOG_WARN("fail to init tablet info rpc reader", K(ret));
|
||||
} else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(OB_DATA_TENANT_ID).timeout(FETCH_TABLET_INFO_TIMEOUT).dst_cluster_id(src_info.cluster_id_)
|
||||
} else if (FALSE_IT(rpc_timeout = ObStorageHAUtils::get_rpc_timeout())) {
|
||||
} else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(OB_DATA_TENANT_ID).timeout(rpc_timeout).dst_cluster_id(src_info.cluster_id_)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.fetch_tablet_info(rpc_arg, rpc_reader_.get_rpc_buffer(), rpc_reader_.get_handle()))) {
|
||||
LOG_WARN("failed to send fetch tablet info rpc", K(ret), K(src_info), K(rpc_arg));
|
||||
@ -1051,6 +1055,7 @@ int ObCopySSTableInfoObReader::init(
|
||||
common::ObInOutBandwidthThrottle &bandwidth_throttle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t rpc_timeout = FETCH_TABLET_SSTABLE_INFO_TIMEOUT;
|
||||
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
@ -1061,8 +1066,9 @@ int ObCopySSTableInfoObReader::init(
|
||||
LOG_WARN("invalid argument", K(ret), K(src_info), K(rpc_arg));
|
||||
} else if (OB_FAIL(rpc_reader_.init(bandwidth_throttle))) {
|
||||
LOG_WARN("fail to init tablet info rpc reader", K(ret));
|
||||
} else if (FALSE_IT(rpc_timeout = ObStorageHAUtils::get_rpc_timeout())) {
|
||||
} else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(OB_DATA_TENANT_ID)
|
||||
.timeout(FETCH_TABLET_SSTABLE_INFO_TIMEOUT).dst_cluster_id(src_info.cluster_id_)
|
||||
.timeout(rpc_timeout).dst_cluster_id(src_info.cluster_id_)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.fetch_tablet_sstable_info(rpc_arg, rpc_reader_.get_rpc_buffer(), rpc_reader_.get_handle()))) {
|
||||
LOG_WARN("failed to send fetch tablet info rpc", K(ret), K(src_info), K(rpc_arg));
|
||||
@ -1912,6 +1918,7 @@ int ObCopySSTableMacroObReader::init(
|
||||
common::ObInOutBandwidthThrottle &bandwidth_throttle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t rpc_timeout = FETCH_SSTABLE_MACRO_INFO_TIMEOUT;
|
||||
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
@ -1922,8 +1929,9 @@ int ObCopySSTableMacroObReader::init(
|
||||
LOG_WARN("invalid argument", K(ret), K(src_info), K(rpc_arg));
|
||||
} else if (OB_FAIL(rpc_reader_.init(bandwidth_throttle))) {
|
||||
LOG_WARN("fail to init tablet info rpc reader", K(ret));
|
||||
} else if (FALSE_IT(rpc_timeout = ObStorageHAUtils::get_rpc_timeout())) {
|
||||
} else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(OB_DATA_TENANT_ID)
|
||||
.timeout(FETCH_SSTABLE_MACRO_INFO_TIMEOUT).dst_cluster_id(src_info.cluster_id_)
|
||||
.timeout(rpc_timeout).dst_cluster_id(src_info.cluster_id_)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.fetch_sstable_macro_info(rpc_arg, rpc_reader_.get_rpc_buffer(), rpc_reader_.get_handle()))) {
|
||||
LOG_WARN("failed to send fetch tablet info rpc", K(ret), K(src_info), K(rpc_arg));
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include "storage/tx/ob_ts_mgr.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
#include "rootserver/ob_tenant_info_loader.h"
|
||||
#include "src/observer/omt/ob_tenant_config.h"
|
||||
|
||||
using namespace oceanbase::share;
|
||||
|
||||
@ -448,5 +449,18 @@ int ObTransferUtils::get_gts(const uint64_t tenant_id, SCN >s)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObStorageHAUtils::get_rpc_timeout()
|
||||
{
|
||||
int64_t rpc_timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
int64_t tmp_rpc_timeout = 0;
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
|
||||
if (tenant_config.is_valid()) {
|
||||
tmp_rpc_timeout = tenant_config->_ha_rpc_timeout;
|
||||
rpc_timeout = std::max(rpc_timeout, tmp_rpc_timeout);
|
||||
}
|
||||
return rpc_timeout;
|
||||
}
|
||||
|
||||
|
||||
} // end namespace storage
|
||||
} // end namespace oceanbase
|
||||
|
@ -43,6 +43,7 @@ public:
|
||||
const share::SCN replay_scn,
|
||||
bool &need_rebuild);
|
||||
static int get_readable_scn_with_retry(share::SCN &readable_scn);
|
||||
// Returns the rpc timeout for storage HA rpcs: the larger of
// ObStorageRpcProxy::STREAM_RPC_TIMEOUT and the current tenant's _ha_rpc_timeout
// config value; STREAM_RPC_TIMEOUT alone when the tenant config is not valid.
static int64_t get_rpc_timeout();
|
||||
|
||||
private:
|
||||
static int check_merge_error_(const uint64_t tenant_id, common::ObISQLClient &sql_client);
|
||||
|
@ -1413,8 +1413,17 @@ int ObFetchTabletInfoP::process()
|
||||
ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
|
||||
ObCopyTabletInfoObProducer producer;
|
||||
ObCopyTabletInfo tablet_info;
|
||||
const int64_t MAX_TABLET_NUM = 100;
|
||||
int64_t max_tablet_num = 32;
|
||||
int64_t tablet_count = 0;
|
||||
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
|
||||
if (tenant_config.is_valid()) {
|
||||
const int64_t tmp_max_tablet_num = tenant_config->_ha_tablet_info_batch_count;
|
||||
if (0 != tmp_max_tablet_num) {
|
||||
max_tablet_num = tmp_max_tablet_num;
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("start to fetch tablet info", K(arg_));
|
||||
|
||||
last_send_time_ = ObTimeUtility::current_time();
|
||||
@ -1464,7 +1473,7 @@ int ObFetchTabletInfoP::process()
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "failed to get next tablet meta info", K(ret));
|
||||
}
|
||||
} else if (tablet_count >= MAX_TABLET_NUM) {
|
||||
} else if (tablet_count >= max_tablet_num) {
|
||||
timeguard.click();
|
||||
if (this->result_.get_position() > 0 && OB_FAIL(flush_and_wait())) {
|
||||
LOG_WARN("failed to flush and wait", K(ret), K(tablet_info));
|
||||
|
@ -287,6 +287,8 @@ _force_hash_groupby_dump
|
||||
_force_hash_join_spill
|
||||
_force_skip_encoding_partition_id
|
||||
_hash_area_size
|
||||
_ha_rpc_timeout
|
||||
_ha_tablet_info_batch_count
|
||||
_hidden_sys_tenant_memory
|
||||
_ignore_system_memory_over_limit_error
|
||||
_io_callback_thread_count
|
||||
|
Reference in New Issue
Block a user