diff --git a/src/storage/lob/ob_lob_manager.cpp b/src/storage/lob/ob_lob_manager.cpp index 20076c5c44..69e161e51b 100644 --- a/src/storage/lob/ob_lob_manager.cpp +++ b/src/storage/lob/ob_lob_manager.cpp @@ -462,48 +462,79 @@ bool ObLobManager::is_remote_ret_can_retry(int ret) return (ret == OB_NOT_MASTER); } -int ObLobManager::lob_remote_query_with_retry( - ObLobAccessParam ¶m, - common::ObAddr& dst_addr, - ObLobQueryArg& arg, - int64_t timeout, - common::ObDataBuffer& rpc_buffer, - obrpc::ObStorageRpcProxy::SSHandle& handle) +int ObLobManager::lob_query_with_retry(ObLobAccessParam ¶m, ObAddr &dst_addr, + bool &remote_bret, ObLobMetaScanIter& iter, + ObLobQueryArg::QueryType qtype, void *&ctx) { int ret = OB_SUCCESS; - ObLSService *ls_service = (MTL(ObLSService *)); - obrpc::ObStorageRpcProxy *svr_rpc_proxy = ls_service->get_storage_rpc_proxy(); - int64_t retry_max = REMOTE_LOB_QUERY_RETRY_MAX; int64_t retry_cnt = 0; bool is_continue = true; + ObLSService *ls_service = (MTL(ObLSService *)); + obrpc::ObStorageRpcProxy *svr_rpc_proxy = ls_service->get_storage_rpc_proxy(); + oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY); do { - ret = svr_rpc_proxy->to(dst_addr).by(arg.tenant_id_) - .dst_cluster_id(GCONF.cluster_id) - .ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW) - .timeout(timeout) - .lob_query(arg, rpc_buffer, handle); + if (remote_bret) { + // first try to init remote ctx + if (OB_FAIL(lob_remote_query_init_ctx(param, qtype, ctx))) { + LOG_WARN("fail to init remote query ctx", K(ret)); + } else { + ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast(ctx); + int64_t timeout = param.timeout_ - ObTimeUtility::current_time(); + if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) { + timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT; + } + ret = svr_rpc_proxy->to(dst_addr).by(remote_ctx->query_arg_.tenant_id_) + .dst_cluster_id(GCONF.cluster_id) + .ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW) + .timeout(timeout) + .lob_query(remote_ctx->query_arg_, remote_ctx->rpc_buffer_, remote_ctx->handle_); + if (OB_FAIL(ret)) { + LOG_WARN("failed to do remote lob query", K(ret), K(remote_ctx->query_arg_), K(dst_addr), K(timeout)); + } + } + } else { + if (OB_FAIL(lob_ctx_.lob_meta_mngr_->scan(param, iter))) { + LOG_WARN("failed to do local lob query and show retry cnt and mem usage", K(ret), K(param), + K(dst_addr), K(retry_cnt), K(param.allocator_->total()), K(param.allocator_->used())); + // reset iter for maybe has done alloc for iter + iter.reset(); + } + } if (OB_FAIL(ret)) { - LOG_WARN("failed to do remote query", K(ret), K(arg), K(dst_addr), K(timeout)); - if (is_remote_ret_can_retry(ret)) { + // check timeout + if (param.from_rpc_) { // from rpc should not do retry, just return ret + is_continue = false; + } else if (is_remote_ret_can_retry(ret)) { retry_cnt++; - if (retry_cnt > retry_max) { + if (retry_cnt >= 100 && retry_cnt % 50L == 0) { + LOG_INFO("[LOB RETRY] The LOB query has been retried multiple times without success, " + "and the execution may be blocked by a specific exception", KR(ret), + "continuous_retry_cnt", retry_cnt, K(param), K(remote_bret), K(dst_addr)); + } + if (ObTimeUtility::current_time() > param.timeout_) { is_continue = false; - LOG_INFO("retry cnt is reach retry_max_cnt, return error code", K(ret), K(retry_cnt), K(retry_max)); + ret = OB_TIMEOUT; + int64_t cur_time = ObTimeUtility::current_time(); + LOG_WARN("[LOB RETRY] query timeout", K(cur_time), K(param.timeout_), K(ret)); + } else if (IS_INTERRUPTED()) { // for worker interrupted + is_continue = false; + LOG_INFO("[LOB RETRY] Retry is interrupted by worker interrupt signal", KR(ret)); + } else if (lib::Worker::WS_OUT_OF_THROTTLE == THIS_WORKER.check_wait()) { + is_continue = false; + ret = OB_KILLED_BY_THROTTLING; + LOG_INFO("[LOB RETRY] Retry is interrupted by worker check wait", KR(ret)); } else { switch (ret) { case OB_NOT_MASTER: { - bool remote_bret = false; - // refresh leader + remote_bret = false; + // refresh location if (OB_FAIL(is_remote(param, remote_bret, dst_addr))) { - LOG_WARN("fail to refresh leader addr", K(ret), K(param)); - is_continue = false; - } else { - LOG_INFO("refresh leader location", K(retry_cnt), K(retry_max), K(remote_bret), K(dst_addr), K(param)); + LOG_WARN("fail to do check is remote", K(ret)); } break; } default: { - LOG_INFO("do nothing, just retry", K(ret), K(retry_cnt), K(retry_max)); + LOG_INFO("do nothing, just retry", K(ret), K(retry_cnt)); } } } @@ -517,63 +548,78 @@ int ObLobManager::lob_remote_query_with_retry( return ret; } -int ObLobManager::query_remote(ObLobAccessParam& param, common::ObAddr& dst_addr, ObString& data) +int ObLobManager::lob_remote_query_init_ctx( + ObLobAccessParam ¶m, + ObLobQueryArg::QueryType qtype, + void *&ctx) +{ + int ret = OB_SUCCESS; + if (ctx != nullptr) { + // do nothing, has been init + } else { + void *buff = param.allocator_->alloc(sizeof(ObLobRemoteQueryCtx)); + if (OB_ISNULL(buff)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc lob remote query ctx", K(ret)); + } else { + ObLobRemoteQueryCtx *remote_ctx = new(buff)ObLobRemoteQueryCtx(); + if (OB_FAIL(remote_ctx->remote_reader_.open(param, remote_ctx->rpc_buffer_))) { + LOG_WARN("fail to open lob remote reader", K(ret)); + } else { + // build arg + remote_ctx->query_arg_.tenant_id_ = param.tenant_id_; + remote_ctx->query_arg_.offset_ = param.offset_; + remote_ctx->query_arg_.len_ = param.len_; + remote_ctx->query_arg_.cs_type_ = param.coll_type_; + remote_ctx->query_arg_.scan_backward_ = param.scan_backward_; + remote_ctx->query_arg_.qtype_ = qtype; + remote_ctx->query_arg_.lob_locator_.ptr_ = param.lob_locator_->ptr_; + remote_ctx->query_arg_.lob_locator_.size_ = param.lob_locator_->size_; + remote_ctx->query_arg_.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_; + //set ctx ptr + ctx = buff; + } + } + } + return ret; +} + +int ObLobManager::query_remote(ObLobAccessParam& param, ObString& data) { int ret = OB_SUCCESS; ObLobLocatorV2 *lob_locator = param.lob_locator_; - obrpc::ObStorageRpcProxy::SSHandle handle; - common::ObDataBuffer rpc_buffer; - ObLobQueryRemoteReader reader; if (OB_ISNULL(lob_locator)) { ret = OB_ERR_NULL_VALUE; LOG_WARN("lob locator is null.", K(ret), K(param)); - } else if (OB_FAIL(reader.open(param, rpc_buffer))) { - LOG_WARN("fail to open lob remote reader", K(ret)); + } else if (OB_ISNULL(param.remote_query_ctx_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get remote query ctx nullptr", K(ret), K(param)); } else { - SMART_VAR(ObLobQueryArg, arg) { - // build arg - arg.tenant_id_ = param.tenant_id_; - arg.offset_ = param.offset_; - arg.len_ = param.len_; - arg.cs_type_ = param.coll_type_; - arg.scan_backward_ = param.scan_backward_; - arg.qtype_ = ObLobQueryArg::QueryType::READ; - arg.lob_locator_.ptr_ = param.lob_locator_->ptr_; - arg.lob_locator_.size_ = param.lob_locator_->size_; - arg.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_; - int64_t timeout = param.timeout_ - ObTimeUtility::current_time(); - if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) { - timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT; - } - if (OB_FAIL(lob_remote_query_with_retry(param, dst_addr, arg, timeout, rpc_buffer, handle))) { - LOG_WARN("failed to do remote query", K(ret), K(arg)); + ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast(param.remote_query_ctx_); + ObLobQueryBlock block; + ObString block_data; + while (OB_SUCC(ret)) { + if (OB_FAIL(remote_ctx->remote_reader_.get_next_block(param, remote_ctx->rpc_buffer_, remote_ctx->handle_, block, block_data))) { + if (ret != OB_ITER_END) { + LOG_WARN("failed to get next lob query block", K(ret)); + } } else { - ObLobQueryBlock block; - ObString block_data; - while (OB_SUCC(ret)) { - if (OB_FAIL(reader.get_next_block(param, rpc_buffer, handle, block, block_data))) { - if (ret != OB_ITER_END) { - LOG_WARN("failed to get next lob query block", K(ret)); - } - } else { - if (param.scan_backward_) { - if (data.write_front(block_data.ptr(), block_data.length()) != block_data.length()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to write data buffer", K(ret), K(data.remain()), K(block_data.length())); - } - } else { - if (data.write(block_data.ptr(), block_data.length()) != block_data.length()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to write data buffer", K(ret), K(data.remain()), K(block_data.length())); - } - } + if (param.scan_backward_) { + if (data.write_front(block_data.ptr(), block_data.length()) != block_data.length()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to write data buffer", K(ret), K(data.remain()), K(block_data.length())); + } + } else { + if (data.write(block_data.ptr(), block_data.length()) != block_data.length()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to write data buffer", K(ret), K(data.remain()), K(block_data.length())); } } - if (ret == OB_ITER_END) { - ret = OB_SUCCESS; - } } } + if (ret == OB_ITER_END) { + ret = OB_SUCCESS; + } } return ret; } @@ -666,53 +712,52 @@ int ObLobManager::query( } else { bool is_remote_lob = false; common::ObAddr dst_addr; + ObLobMetaScanIter meta_iter; + param.lob_data_ = reinterpret_cast(lob_common->buffer_); if (OB_FAIL(is_remote(param, is_remote_lob, dst_addr))) { LOG_WARN("check is remote failed.", K(ret), K(param)); + } else if (OB_FAIL(lob_query_with_retry(param, dst_addr, is_remote_lob, meta_iter, + ObLobQueryArg::QueryType::READ, param.remote_query_ctx_))) { + LOG_WARN("fail to do lob query with retry", K(ret), K(is_remote_lob), K(dst_addr)); } else if (is_remote_lob) { - if (OB_FAIL(query_remote(param, dst_addr, output_data))) { + if (OB_FAIL(query_remote(param, output_data))) { LOG_WARN("do remote query failed.", K(ret), K(param), K(dst_addr)); } } else { - ObLobMetaScanIter meta_iter; - ObLobCtx lob_ctx = lob_ctx_; if (!lob_common->is_init_) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid lob common header for out row.", K(ret), KPC(lob_common)); + } else if (param.is_full_read()) { + if (OB_FAIL(read_all(param, meta_iter, output_data))) { + LOG_WARN("read_all fail", K(ret), K(param)); + } } else { - param.lob_data_ = reinterpret_cast(lob_common->buffer_); - if (OB_FAIL(lob_ctx.lob_meta_mngr_->scan(param, meta_iter))) { - LOG_WARN("do lob meta scan failed.", K(ret), K(param)); - } else if (param.is_full_read()) { - if (OB_FAIL(read_all(param, meta_iter, output_data))) { - LOG_WARN("read_all fail", K(ret), K(param)); - } - } else { - ObLobQueryResult result; - while (OB_SUCC(ret)) { - ret = meta_iter.get_next_row(result.meta_result_); - if (OB_FAIL(ret)) { - if (ret == OB_ITER_END) { - } else { - LOG_WARN("failed to get next row.", K(ret)); - } - } else if (ObTimeUtility::current_time() > param.timeout_) { - ret = OB_TIMEOUT; - int64_t cur_time = ObTimeUtility::current_time(); - LOG_WARN("query timeout", K(cur_time), K(param.timeout_), K(ret)); - /* TODO: weiyouchao.wyc should set param.asscess_ptable_ as false 2022.4.7 */ - } else if (param.asscess_ptable_ /* not operate piece table currently */ && - OB_FAIL(lob_ctx.lob_piece_mngr_->get(param, result.meta_result_.info_.piece_id_, result.piece_info_))) { - LOG_WARN("get lob piece failed.", K(ret), K(result)); - } else if (OB_FAIL(get_real_data(param, result, output_data))) { - LOG_WARN("failed to write data to output buf.", K(ret), K(result), K(output_data)); + ObLobQueryResult result; + while (OB_SUCC(ret)) { + ret = meta_iter.get_next_row(result.meta_result_); + if (OB_FAIL(ret)) { + if (ret == OB_ITER_END) { + } else { + LOG_WARN("failed to get next row.", K(ret)); } - } - if (ret == OB_ITER_END) { - ret = OB_SUCCESS; + } else if (ObTimeUtility::current_time() > param.timeout_) { + ret = OB_TIMEOUT; + int64_t cur_time = ObTimeUtility::current_time(); + LOG_WARN("query timeout", K(cur_time), K(param.timeout_), K(ret)); + } else if (OB_FAIL(get_real_data(param, result, output_data))) { + LOG_WARN("failed to write data to output buf.", K(ret), K(result), K(output_data)); } } + if (ret == OB_ITER_END) { + ret = OB_SUCCESS; + } } } + // finish query, release resource + if (OB_NOT_NULL(param.remote_query_ctx_)) { + ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast(param.remote_query_ctx_); + remote_ctx->~ObLobRemoteQueryCtx(); + } } } return ret; @@ -816,14 +861,8 @@ int ObLobManager::query( LOG_WARN("alloc lob meta scan iterator fail", K(ret)); } else if (OB_FAIL(is_remote(param, is_remote_lob, dst_addr))) { LOG_WARN("check is remote failed.", K(ret), K(param)); - } else if (is_remote_lob) { - if (OB_FAIL(iter->open(param, dst_addr))) { - LOG_WARN("open remote iter query failed.", K(ret), K(param), K(dst_addr)); - } - } else { - if (OB_FAIL(iter->open(param, lob_ctx))) { - LOG_WARN("open local meta scan iter failed", K(ret), K(param)); - } + } else if (OB_FAIL(iter->open(param, lob_ctx, dst_addr, is_remote_lob))) { + LOG_WARN("open local meta scan iter failed", K(ret), K(param), K(dst_addr), K(is_remote_lob)); } if (OB_SUCC(ret)) { result = iter; @@ -2199,74 +2238,50 @@ int ObLobManager::getlength_remote(ObLobAccessParam& param, common::ObAddr& dst_ { int ret = OB_SUCCESS; ObLobLocatorV2 *lob_locator = param.lob_locator_; - obrpc::ObStorageRpcProxy::SSHandle handle; - common::ObDataBuffer rpc_buffer; ObLobQueryBlock header; char *buf = nullptr; int64_t buffer_len = header.get_serialize_size(); if (OB_ISNULL(lob_locator)) { ret = OB_ERR_NULL_VALUE; LOG_WARN("lob locator is null.", K(ret), K(param)); - } else if (OB_ISNULL(buf = static_cast(param.allocator_->alloc(buffer_len)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed alloc buffer.", K(ret), K(buffer_len)); - } else if (!rpc_buffer.set_data(buf, buffer_len)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to set rpc buffer", K(ret), K(buffer_len)); + } else if (OB_ISNULL(param.remote_query_ctx_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get remote query ctx nullptr", K(ret), K(param)); } else { - SMART_VAR(ObLobQueryArg, arg) { - // build arg - arg.tenant_id_ = param.tenant_id_; - arg.offset_ = param.offset_; - arg.len_ = param.len_; - arg.cs_type_ = param.coll_type_; - arg.scan_backward_ = param.scan_backward_; - arg.qtype_ = ObLobQueryArg::QueryType::GET_LENGTH; - arg.lob_locator_.ptr_ = param.lob_locator_->ptr_; - arg.lob_locator_.size_ = param.lob_locator_->size_; - arg.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_; - int64_t timeout = param.timeout_ - ObTimeUtility::current_time(); - if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) { - timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT; - } - if (OB_FAIL(lob_remote_query_with_retry(param, dst_addr, arg, timeout, rpc_buffer, handle))) { - LOG_WARN("failed to do remote query", K(ret), K(arg)); + ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast(param.remote_query_ctx_); + int64_t cur_position = remote_ctx->rpc_buffer_.get_position(); + while (OB_SUCC(ret) && remote_ctx->handle_.has_more()) { + cur_position = remote_ctx->rpc_buffer_.get_position(); + if (OB_FAIL(remote_ctx->handle_.get_more(remote_ctx->rpc_buffer_))) { + ret = OB_DATA_SOURCE_TIMEOUT; + } else if (remote_ctx->rpc_buffer_.get_position() < 0) { + ret = OB_ERR_SYS; + } else if (cur_position == remote_ctx->rpc_buffer_.get_position()) { + if (!remote_ctx->handle_.has_more()) { + ret = OB_ITER_END; + LOG_DEBUG("empty rpc buffer, no more data", K(remote_ctx->rpc_buffer_)); + } else { + ret = OB_ERR_SYS; + LOG_ERROR("rpc buffer has no data", K(ret), K(remote_ctx->rpc_buffer_)); + } } else { - int64_t cur_position = rpc_buffer.get_position(); - while (OB_SUCC(ret) && handle.has_more()) { - cur_position = rpc_buffer.get_position(); - if (OB_FAIL(handle.get_more(rpc_buffer))) { - ret = OB_DATA_SOURCE_TIMEOUT; - } else if (rpc_buffer.get_position() < 0) { - ret = OB_ERR_SYS; - } else if (cur_position == rpc_buffer.get_position()) { - if (!handle.has_more()) { - ret = OB_ITER_END; - LOG_DEBUG("empty rpc buffer, no more data", K(rpc_buffer)); - } else { - ret = OB_ERR_SYS; - LOG_ERROR("rpc buffer has no data", K(ret), K(rpc_buffer)); - } - } else { - LOG_DEBUG("get more data", K(rpc_buffer)); - } - } - if (ret == OB_ITER_END) { - ret = OB_SUCCESS; - } - // do header decode - if (OB_SUCC(ret)) { - int64_t rpc_buffer_pos = 0; - if (OB_FAIL(serialization::decode(rpc_buffer.get_data(), - rpc_buffer.get_position(), rpc_buffer_pos, header))) { - LOG_WARN("failed to decode lob query block", K(ret), K(rpc_buffer)); - } else if (!header.is_valid()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid header", K(ret), K(header)); - } else { - len = static_cast(header.size_); - } - } + LOG_DEBUG("get more data", K(remote_ctx->rpc_buffer_)); + } + } + if (ret == OB_ITER_END) { + ret = OB_SUCCESS; + } + // do header decode + if (OB_SUCC(ret)) { + int64_t rpc_buffer_pos = 0; + if (OB_FAIL(serialization::decode(remote_ctx->rpc_buffer_.get_data(), + remote_ctx->rpc_buffer_.get_position(), rpc_buffer_pos, header))) { + LOG_WARN("failed to decode lob query block", K(ret), K(remote_ctx->rpc_buffer_)); + } else if (!header.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid header", K(ret), K(header)); + } else { + len = static_cast(header.size_); } } } @@ -2352,48 +2367,51 @@ int ObLobManager::getlength(ObLobAccessParam& param, uint64_t &len) } else { // do meta scan bool is_remote_lob = false; common::ObAddr dst_addr; + ObLobMetaScanIter meta_iter; + param.lob_data_ = reinterpret_cast(lob_common->buffer_); + // mock do full scan + param.offset_ = 0; + param.len_ = UINT64_MAX; if (OB_FAIL(is_remote(param, is_remote_lob, dst_addr))) { LOG_WARN("check is remote failed.", K(ret), K(param)); + } else if (OB_FAIL(lob_query_with_retry(param, dst_addr, is_remote_lob, meta_iter, + ObLobQueryArg::QueryType::GET_LENGTH, param.remote_query_ctx_))) { + LOG_WARN("fail to do lob query with retry", K(ret), K(is_remote_lob), K(dst_addr)); } else if (is_remote_lob) { if (OB_FAIL(getlength_remote(param, dst_addr, len))) { LOG_WARN("fail to get length remote", K(ret)); } } else { - ObLobMetaScanIter meta_iter; - ObLobCtx lob_ctx = lob_ctx_; if (!lob_common->is_init_) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid lob common header for out row.", K(ret), KPC(lob_common)); } else { - param.lob_data_ = reinterpret_cast(lob_common->buffer_); - // mock do full scan - param.offset_ = 0; - param.len_ = UINT64_MAX; - if (OB_FAIL(lob_ctx.lob_meta_mngr_->scan(param, meta_iter))) { - LOG_WARN("do lob meta scan failed.", K(ret), K(param)); - } else { - ObLobQueryResult result; - while (OB_SUCC(ret)) { - ret = meta_iter.get_next_row(result.meta_result_); - if (OB_FAIL(ret)) { - if (ret == OB_ITER_END) { - } else { - LOG_WARN("failed to get next row.", K(ret)); - } - } else if (ObTimeUtility::current_time() > param.timeout_) { - ret = OB_TIMEOUT; - int64_t cur_time = ObTimeUtility::current_time(); - LOG_WARN("query timeout", K(cur_time), K(param.timeout_), K(ret)); + ObLobQueryResult result; + while (OB_SUCC(ret)) { + ret = meta_iter.get_next_row(result.meta_result_); + if (OB_FAIL(ret)) { + if (ret == OB_ITER_END) { } else { - len += result.meta_result_.info_.char_len_; + LOG_WARN("failed to get next row.", K(ret)); } - } - if (ret == OB_ITER_END) { - ret = OB_SUCCESS; + } else if (ObTimeUtility::current_time() > param.timeout_) { + ret = OB_TIMEOUT; + int64_t cur_time = ObTimeUtility::current_time(); + LOG_WARN("query timeout", K(cur_time), K(param.timeout_), K(ret)); + } else { + len += result.meta_result_.info_.char_len_; } } + if (ret == OB_ITER_END) { + ret = OB_SUCCESS; + } } } + // release remote query resource + if (OB_NOT_NULL(param.remote_query_ctx_)) { + ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast(param.remote_query_ctx_); + remote_ctx->~ObLobRemoteQueryCtx(); + } } } return ret; @@ -3761,16 +3779,32 @@ int ObLobManager::build_lob_param(ObLobAccessParam& param, /*************ObLobQueryIter*****************/ -int ObLobQueryIter::open(ObLobAccessParam ¶m, ObLobCtx& lob_ctx) +int ObLobQueryIter::open(ObString &data, uint32_t byte_offset, uint32_t byte_len, ObCollationType cs, bool is_reverse) { int ret = OB_SUCCESS; - if (OB_ISNULL(lob_ctx.lob_meta_mngr_) || - OB_ISNULL(lob_ctx.lob_piece_mngr_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid lob ctx.", K(ret), K(lob_ctx)); - } else if (OB_FAIL(lob_ctx.lob_meta_mngr_->scan(param, meta_iter_))) { - LOG_WARN("open meta iter failed."); - } else { + cur_pos_ = 0; + inner_data_.assign_ptr(data.ptr() + byte_offset, byte_len); + is_inited_ = true; + is_in_row_ = true; + is_reverse_ = is_reverse; + cs_type_ = cs; + return ret; +} + +int ObLobQueryIter::open(ObLobAccessParam ¶m, ObLobCtx& lob_ctx, common::ObAddr &dst_addr, bool &is_remote) +{ + int ret = OB_SUCCESS; + ObLobManager *lob_manager = MTL(ObLobManager*); + if (OB_FAIL(lob_manager->lob_query_with_retry(param, dst_addr, is_remote, meta_iter_, + ObLobQueryArg::QueryType::READ, remote_query_ctx_))) { + LOG_WARN("fail to do lob query with retry", K(ret), K(is_remote), K(dst_addr)); + } else if (is_remote) { // init remote scan + param_ = param; + is_reverse_ = param.scan_backward_; + cs_type_ = param.coll_type_; + is_inited_ = true; + is_remote_ = true; + } else { // init local scan last_data_buf_len_ = OB_MIN(ObLobMetaUtil::LOB_OPER_PIECE_DATA_SIZE, param.byte_size_); last_data_ptr_ = reinterpret_cast(param.allocator_->alloc(last_data_buf_len_)); if (OB_ISNULL(last_data_ptr_)) { @@ -3789,56 +3823,6 @@ int ObLobQueryIter::open(ObLobAccessParam ¶m, ObLobCtx& lob_ctx) return ret; } -int ObLobQueryIter::open(ObString &data, uint32_t byte_offset, uint32_t byte_len, ObCollationType cs, bool is_reverse) -{ - int ret = OB_SUCCESS; - cur_pos_ = 0; - inner_data_.assign_ptr(data.ptr() + byte_offset, byte_len); - is_inited_ = true; - is_in_row_ = true; - is_reverse_ = is_reverse; - cs_type_ = cs; - return ret; -} - -int ObLobQueryIter::open(ObLobAccessParam ¶m, common::ObAddr dst_addr) -{ - int ret = OB_SUCCESS; - ObLobLocatorV2 *lob_locator = param.lob_locator_; - if (OB_ISNULL(lob_locator)) { - ret = OB_ERR_NULL_VALUE; - LOG_WARN("lob locator is null.", K(ret), K(param)); - } else if (OB_FAIL(remote_reader_.open(param, rpc_buffer_))) { - LOG_WARN("failed to open remote reader", K(ret)); - } else { - ObLobManager *lob_manager = MTL(ObLobManager*); - // build arg - query_arg_.tenant_id_ = param.tenant_id_; - query_arg_.offset_ = param.offset_; - query_arg_.len_ = param.len_; - query_arg_.cs_type_ = param.coll_type_; - query_arg_.qtype_ = ObLobQueryArg::QueryType::READ; - query_arg_.scan_backward_ = param.scan_backward_; - query_arg_.lob_locator_.ptr_ = param.lob_locator_->ptr_; - query_arg_.lob_locator_.size_ = param.lob_locator_->size_; - query_arg_.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_; - int64_t timeout = param.timeout_ - ObTimeUtility::current_time(); - if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) { - timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT; - } - if (OB_FAIL(lob_manager->lob_remote_query_with_retry(param, dst_addr, query_arg_, timeout, rpc_buffer_, handle_))) { - LOG_WARN("failed to do remote query", K(ret), K(query_arg_)); - } else { - param_ = param; - is_reverse_ = param.scan_backward_; - cs_type_ = param.coll_type_; - is_inited_ = true; - is_remote_ = true; - } - } - return ret; -} - int ObLobQueryIter::get_next_row(ObLobQueryResult &result) { int ret = OB_SUCCESS; @@ -3973,12 +3957,14 @@ int ObLobQueryIter::get_next_row(ObString& data) uint64_t st_len = data.length(); ObLobQueryBlock block; ObString cur_buffer; + ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast(remote_query_ctx_); while (OB_SUCC(ret) && !has_fill_full) { // first try fill buffer remain data to output has_fill_full = fill_buffer_to_data(data); if (has_fill_full) { // data has been filled full, do nothing - } else if (OB_FAIL(remote_reader_.get_next_block(param_, rpc_buffer_, handle_, block, last_data_))) { + } else if (OB_FAIL(remote_ctx->remote_reader_.get_next_block(param_, + remote_ctx->rpc_buffer_, remote_ctx->handle_, block, last_data_))) { if (ret != OB_ITER_END) { LOG_WARN("fail to get block from remote reader", K(ret)); } @@ -4039,6 +4025,11 @@ void ObLobQueryIter::reset() param_.allocator_->free(last_data_ptr_); last_data_ptr_ = nullptr; } + // release remote query resource + if (OB_NOT_NULL(remote_query_ctx_)) { + ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast(remote_query_ctx_); + remote_ctx->~ObLobRemoteQueryCtx(); + } } diff --git a/src/storage/lob/ob_lob_manager.h b/src/storage/lob/ob_lob_manager.h index 9e5ca1b40c..1cc64742e0 100644 --- a/src/storage/lob/ob_lob_manager.h +++ b/src/storage/lob/ob_lob_manager.h @@ -90,16 +90,24 @@ private: ObString data_buffer_; }; +struct ObLobRemoteQueryCtx +{ + ObLobRemoteQueryCtx() : handle_(), rpc_buffer_(), query_arg_(), remote_reader_() {} + obrpc::ObStorageRpcProxy::SSHandle handle_; + common::ObDataBuffer rpc_buffer_; + ObLobQueryArg query_arg_; + ObLobQueryRemoteReader remote_reader_; +}; + class ObLobQueryIter { public: ObLobQueryIter() : is_reverse_(false), cs_type_(CS_TYPE_BINARY), is_end_(false), meta_iter_(), lob_ctx_(), param_(), last_data_(), last_data_ptr_(nullptr), last_data_buf_len_(0), inner_data_(), cur_pos_(0), is_in_row_(false), is_inited_(false), - is_remote_(false), handle_(), rpc_buffer_(), query_arg_(), remote_reader_() {} - int open(ObLobAccessParam ¶m, ObLobCtx& lob_ctx); // outrow open + is_remote_(false), remote_query_ctx_(nullptr) {} int open(ObString &data, uint32_t byte_offset, uint32_t byte_len, ObCollationType cs, bool is_reverse = false); // inrow open - int open(ObLobAccessParam ¶m, common::ObAddr dst_addr); // remote open + int open(ObLobAccessParam ¶m, ObLobCtx& lob_ctx, common::ObAddr& dst_addr, bool &is_remote); // open with retry inner int get_next_row(ObString& data); uint64_t get_cur_pos() { return meta_iter_.get_cur_pos(); } void reset(); @@ -126,10 +134,7 @@ private: bool is_inited_; // remote ctx bool is_remote_; - obrpc::ObStorageRpcProxy::SSHandle handle_; - common::ObDataBuffer rpc_buffer_; - ObLobQueryArg query_arg_; - ObLobQueryRemoteReader remote_reader_; + void* remote_query_ctx_; }; class ObLobManager @@ -184,6 +189,15 @@ public: const ObString &data, common::ObCollationType coll_type, ObLobLocatorV2 &out); + int lob_query_with_retry(ObLobAccessParam ¶m, + ObAddr &dst_addr, + bool &remote_bret, + ObLobMetaScanIter& iter, + ObLobQueryArg::QueryType qtype, + void *&ctx); + int lob_remote_query_init_ctx(ObLobAccessParam ¶m, + ObLobQueryArg::QueryType qtype, + void *&ctx); int lob_remote_query_with_retry( ObLobAccessParam ¶m, common::ObAddr& dst_addr, @@ -308,7 +322,7 @@ private: int get_inrow_data(ObLobAccessParam& param, ObString& data); int get_ls_leader(ObLobAccessParam& param, const uint64_t tenant_id, const share::ObLSID &ls_id, common::ObAddr &leader); int is_remote(ObLobAccessParam& param, bool& is_remote, common::ObAddr& dst_addr); - int query_remote(ObLobAccessParam& param, common::ObAddr& dst_addr, ObString& data); + int query_remote(ObLobAccessParam& param, ObString& data); int getlength_remote(ObLobAccessParam& param, common::ObAddr& dst_addr, uint64_t &len); int do_delete_one_piece(ObLobAccessParam& param, ObLobQueryResult &result, ObString &tmp_buff); int prepare_erase_buffer(ObLobAccessParam& param, ObString &tmp_buff); diff --git a/src/storage/lob/ob_lob_util.h b/src/storage/lob/ob_lob_util.h index 75a9447c0e..2440694777 100644 --- a/src/storage/lob/ob_lob_util.h +++ b/src/storage/lob/ob_lob_util.h @@ -56,7 +56,8 @@ struct ObLobAccessParam { scan_backward_(false), asscess_ptable_(false), offset_(0), len_(0), parent_seq_no_(), seq_no_st_(), used_seq_cnt_(0), total_seq_cnt_(0), checksum_(0), update_len_(0), op_type_(ObLobDataOutRowCtx::OpType::SQL), is_fill_zero_(false), from_rpc_(false), - inrow_read_nocopy_(false), inrow_threshold_(OB_DEFAULT_LOB_INROW_THRESHOLD), spec_lob_id_() + inrow_read_nocopy_(false), inrow_threshold_(OB_DEFAULT_LOB_INROW_THRESHOLD), spec_lob_id_(), + remote_query_ctx_(nullptr) {} ~ObLobAccessParam() { if (OB_NOT_NULL(dml_base_param_)) { @@ -123,6 +124,8 @@ public: bool inrow_read_nocopy_; int64_t inrow_threshold_; ObLobId spec_lob_id_; + // remote query ctx + void *remote_query_ctx_; }; struct ObLobMetaInfo {