[BUGFIX] fix lob remote query timeout
This commit is contained in:
@ -55,7 +55,7 @@ struct ObLobTextIterCtx
|
||||
|
||||
ObLobTextIterCtx(ObLobLocatorV2 &locator, const sql::ObBasicSessionInfo *session,
|
||||
ObIAllocator *allocator = NULL, uint32_t buffer_len = OB_LOB_ITER_DEFAULT_BUFFER_LEN) :
|
||||
alloc_(allocator), session_(NULL), buff_(NULL), buff_byte_len_(buffer_len), start_offset_(0),
|
||||
alloc_(allocator), session_(session), buff_(NULL), buff_byte_len_(buffer_len), start_offset_(0),
|
||||
total_access_len_(0), total_byte_len_(0), content_byte_len_(0), content_len_(0),
|
||||
reserved_byte_len_(0), reserved_len_(0), accessed_byte_len_(0), accessed_len_(0),
|
||||
last_accessed_byte_len_(0), last_accessed_len_(0), iter_count_(0), is_cloned_temporary_(false),
|
||||
|
||||
@ -453,10 +453,14 @@ int ObLobManager::query_remote(ObLobAccessParam& param, common::ObAddr& dst_addr
|
||||
arg.lob_locator_.ptr_ = param.lob_locator_->ptr_;
|
||||
arg.lob_locator_.size_ = param.lob_locator_->size_;
|
||||
arg.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_;
|
||||
int64_t timeout = param.timeout_ - ObTimeUtility::current_time();
|
||||
if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) {
|
||||
timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
}
|
||||
ret = svr_rpc_proxy->to(dst_addr).by(arg.tenant_id_)
|
||||
.dst_cluster_id(GCONF.cluster_id)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.timeout(ObStorageRpcProxy::STREAM_RPC_TIMEOUT)
|
||||
.timeout(timeout)
|
||||
.lob_query(arg, rpc_buffer, handle);
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("failed to do remote query", K(ret));
|
||||
@ -1808,10 +1812,14 @@ int ObLobManager::getlength_remote(ObLobAccessParam& param, common::ObAddr& dst_
|
||||
arg.lob_locator_.ptr_ = param.lob_locator_->ptr_;
|
||||
arg.lob_locator_.size_ = param.lob_locator_->size_;
|
||||
arg.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_;
|
||||
int64_t timeout = param.timeout_ - ObTimeUtility::current_time();
|
||||
if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) {
|
||||
timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
}
|
||||
ret = svr_rpc_proxy->to(dst_addr).by(arg.tenant_id_)
|
||||
.dst_cluster_id(GCONF.cluster_id)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.timeout(ObStorageRpcProxy::STREAM_RPC_TIMEOUT)
|
||||
.timeout(timeout)
|
||||
.lob_query(arg, rpc_buffer, handle);
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("failed to do remote query", K(ret));
|
||||
|
||||
@ -1706,8 +1706,9 @@ int ObLobQueryP::process_read()
|
||||
param.scan_backward_ = arg_.scan_backward_;
|
||||
param.from_rpc_ = true;
|
||||
ObLobQueryIter *iter = nullptr;
|
||||
int64_t timeout = rpc_pkt_->get_timeout() + get_send_timestamp();
|
||||
if (OB_FAIL(lob_mngr->build_lob_param(param, allocator_, arg_.cs_type_, arg_.offset_,
|
||||
arg_.len_, ObStorageRpcProxy::STREAM_RPC_TIMEOUT, arg_.lob_locator_))) {
|
||||
arg_.len_, timeout, arg_.lob_locator_))) {
|
||||
LOG_WARN("failed to build lob param", K(ret));
|
||||
} else if (OB_FAIL(lob_mngr->query(param, iter))) {
|
||||
LOG_WARN("failed to query lob.", K(ret), K(param));
|
||||
@ -1752,7 +1753,7 @@ int ObLobQueryP::process_getlength()
|
||||
param.from_rpc_ = true;
|
||||
header.reset();
|
||||
uint64_t len = 0;
|
||||
int64_t timeout = ObTimeUtility::current_time() + ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
int64_t timeout = rpc_pkt_->get_timeout() + get_send_timestamp();
|
||||
if (OB_FAIL(lob_mngr->build_lob_param(param, allocator_, arg_.cs_type_, arg_.offset_,
|
||||
arg_.len_, timeout, arg_.lob_locator_))) {
|
||||
LOG_WARN("failed to build lob param", K(ret));
|
||||
|
||||
Reference in New Issue
Block a user