diff --git a/src/share/ob_lob_access_utils.h b/src/share/ob_lob_access_utils.h index 4da9eed8e5..cb00c7c512 100644 --- a/src/share/ob_lob_access_utils.h +++ b/src/share/ob_lob_access_utils.h @@ -55,7 +55,7 @@ struct ObLobTextIterCtx ObLobTextIterCtx(ObLobLocatorV2 &locator, const sql::ObBasicSessionInfo *session, ObIAllocator *allocator = NULL, uint32_t buffer_len = OB_LOB_ITER_DEFAULT_BUFFER_LEN) : - alloc_(allocator), session_(NULL), buff_(NULL), buff_byte_len_(buffer_len), start_offset_(0), + alloc_(allocator), session_(session), buff_(NULL), buff_byte_len_(buffer_len), start_offset_(0), total_access_len_(0), total_byte_len_(0), content_byte_len_(0), content_len_(0), reserved_byte_len_(0), reserved_len_(0), accessed_byte_len_(0), accessed_len_(0), last_accessed_byte_len_(0), last_accessed_len_(0), iter_count_(0), is_cloned_temporary_(false), diff --git a/src/storage/lob/ob_lob_manager.cpp b/src/storage/lob/ob_lob_manager.cpp index 0215623040..13a647cda5 100644 --- a/src/storage/lob/ob_lob_manager.cpp +++ b/src/storage/lob/ob_lob_manager.cpp @@ -453,10 +453,14 @@ int ObLobManager::query_remote(ObLobAccessParam& param, common::ObAddr& dst_addr arg.lob_locator_.ptr_ = param.lob_locator_->ptr_; arg.lob_locator_.size_ = param.lob_locator_->size_; arg.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_; + int64_t timeout = param.timeout_ - ObTimeUtility::current_time(); + if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) { + timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT; + } ret = svr_rpc_proxy->to(dst_addr).by(arg.tenant_id_) .dst_cluster_id(GCONF.cluster_id) .ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW) - .timeout(ObStorageRpcProxy::STREAM_RPC_TIMEOUT) + .timeout(timeout) .lob_query(arg, rpc_buffer, handle); if (OB_FAIL(ret)) { LOG_WARN("failed to do remote query", K(ret)); @@ -1808,10 +1812,14 @@ int ObLobManager::getlength_remote(ObLobAccessParam& param, common::ObAddr& dst_ arg.lob_locator_.ptr_ = param.lob_locator_->ptr_; arg.lob_locator_.size_ = param.lob_locator_->size_; arg.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_; + int64_t timeout = param.timeout_ - ObTimeUtility::current_time(); + if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) { + timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT; + } ret = svr_rpc_proxy->to(dst_addr).by(arg.tenant_id_) .dst_cluster_id(GCONF.cluster_id) .ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW) - .timeout(ObStorageRpcProxy::STREAM_RPC_TIMEOUT) + .timeout(timeout) .lob_query(arg, rpc_buffer, handle); if (OB_FAIL(ret)) { LOG_WARN("failed to do remote query", K(ret)); diff --git a/src/storage/ob_storage_rpc.cpp b/src/storage/ob_storage_rpc.cpp index 815591b2ad..b62728681e 100644 --- a/src/storage/ob_storage_rpc.cpp +++ b/src/storage/ob_storage_rpc.cpp @@ -1706,8 +1706,9 @@ int ObLobQueryP::process_read() param.scan_backward_ = arg_.scan_backward_; param.from_rpc_ = true; ObLobQueryIter *iter = nullptr; + int64_t timeout = rpc_pkt_->get_timeout() + get_send_timestamp(); if (OB_FAIL(lob_mngr->build_lob_param(param, allocator_, arg_.cs_type_, arg_.offset_, - arg_.len_, ObStorageRpcProxy::STREAM_RPC_TIMEOUT, arg_.lob_locator_))) { + arg_.len_, timeout, arg_.lob_locator_))) { LOG_WARN("failed to build lob param", K(ret)); } else if (OB_FAIL(lob_mngr->query(param, iter))) { LOG_WARN("failed to query lob.", K(ret), K(param)); @@ -1752,7 +1753,7 @@ int ObLobQueryP::process_getlength() param.from_rpc_ = true; header.reset(); uint64_t len = 0; - int64_t timeout = ObTimeUtility::current_time() + ObStorageRpcProxy::STREAM_RPC_TIMEOUT; + int64_t timeout = rpc_pkt_->get_timeout() + get_send_timestamp(); if (OB_FAIL(lob_mngr->build_lob_param(param, allocator_, arg_.cs_type_, arg_.offset_, arg_.len_, timeout, arg_.lob_locator_))) { LOG_WARN("failed to build lob param", K(ret));