support lob retry with local
This commit is contained in:
@ -462,48 +462,79 @@ bool ObLobManager::is_remote_ret_can_retry(int ret)
|
||||
return (ret == OB_NOT_MASTER);
|
||||
}
|
||||
|
||||
int ObLobManager::lob_remote_query_with_retry(
|
||||
ObLobAccessParam ¶m,
|
||||
common::ObAddr& dst_addr,
|
||||
ObLobQueryArg& arg,
|
||||
int64_t timeout,
|
||||
common::ObDataBuffer& rpc_buffer,
|
||||
obrpc::ObStorageRpcProxy::SSHandle<obrpc::OB_LOB_QUERY>& handle)
|
||||
int ObLobManager::lob_query_with_retry(ObLobAccessParam ¶m, ObAddr &dst_addr,
|
||||
bool &remote_bret, ObLobMetaScanIter& iter,
|
||||
ObLobQueryArg::QueryType qtype, void *&ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSService *ls_service = (MTL(ObLSService *));
|
||||
obrpc::ObStorageRpcProxy *svr_rpc_proxy = ls_service->get_storage_rpc_proxy();
|
||||
int64_t retry_max = REMOTE_LOB_QUERY_RETRY_MAX;
|
||||
int64_t retry_cnt = 0;
|
||||
bool is_continue = true;
|
||||
ObLSService *ls_service = (MTL(ObLSService *));
|
||||
obrpc::ObStorageRpcProxy *svr_rpc_proxy = ls_service->get_storage_rpc_proxy();
|
||||
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_LOCAL_RETRY);
|
||||
do {
|
||||
ret = svr_rpc_proxy->to(dst_addr).by(arg.tenant_id_)
|
||||
.dst_cluster_id(GCONF.cluster_id)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.timeout(timeout)
|
||||
.lob_query(arg, rpc_buffer, handle);
|
||||
if (remote_bret) {
|
||||
// first try to init remote ctx
|
||||
if (OB_FAIL(lob_remote_query_init_ctx(param, qtype, ctx))) {
|
||||
LOG_WARN("fail to init remote query ctx", K(ret));
|
||||
} else {
|
||||
ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast<ObLobRemoteQueryCtx*>(ctx);
|
||||
int64_t timeout = param.timeout_ - ObTimeUtility::current_time();
|
||||
if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) {
|
||||
timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
}
|
||||
ret = svr_rpc_proxy->to(dst_addr).by(remote_ctx->query_arg_.tenant_id_)
|
||||
.dst_cluster_id(GCONF.cluster_id)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.timeout(timeout)
|
||||
.lob_query(remote_ctx->query_arg_, remote_ctx->rpc_buffer_, remote_ctx->handle_);
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("failed to do remote lob query", K(ret), K(remote_ctx->query_arg_), K(dst_addr), K(timeout));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(lob_ctx_.lob_meta_mngr_->scan(param, iter))) {
|
||||
LOG_WARN("failed to do local lob query and show retry cnt and mem usage", K(ret), K(param),
|
||||
K(dst_addr), K(retry_cnt), K(param.allocator_->total()), K(param.allocator_->used()));
|
||||
// reset iter for maybe has done alloc for iter
|
||||
iter.reset();
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("failed to do remote query", K(ret), K(arg), K(dst_addr), K(timeout));
|
||||
if (is_remote_ret_can_retry(ret)) {
|
||||
// check timeout
|
||||
if (param.from_rpc_) { // from rpc should not do retry, just return ret
|
||||
is_continue = false;
|
||||
} else if (is_remote_ret_can_retry(ret)) {
|
||||
retry_cnt++;
|
||||
if (retry_cnt > retry_max) {
|
||||
if (retry_cnt >= 100 && retry_cnt % 50L == 0) {
|
||||
LOG_INFO("[LOB RETRY] The LOB query has been retried multiple times without success, "
|
||||
"and the execution may be blocked by a specific exception", KR(ret),
|
||||
"continuous_retry_cnt", retry_cnt, K(param), K(remote_bret), K(dst_addr));
|
||||
}
|
||||
if (ObTimeUtility::current_time() > param.timeout_) {
|
||||
is_continue = false;
|
||||
LOG_INFO("retry cnt is reach retry_max_cnt, return error code", K(ret), K(retry_cnt), K(retry_max));
|
||||
ret = OB_TIMEOUT;
|
||||
int64_t cur_time = ObTimeUtility::current_time();
|
||||
LOG_WARN("[LOB RETRY] query timeout", K(cur_time), K(param.timeout_), K(ret));
|
||||
} else if (IS_INTERRUPTED()) { // for worker interrupted
|
||||
is_continue = false;
|
||||
LOG_INFO("[LOB RETRY] Retry is interrupted by worker interrupt signal", KR(ret));
|
||||
} else if (lib::Worker::WS_OUT_OF_THROTTLE == THIS_WORKER.check_wait()) {
|
||||
is_continue = false;
|
||||
ret = OB_KILLED_BY_THROTTLING;
|
||||
LOG_INFO("[LOB RETRY] Retry is interrupted by worker check wait", KR(ret));
|
||||
} else {
|
||||
switch (ret) {
|
||||
case OB_NOT_MASTER: {
|
||||
bool remote_bret = false;
|
||||
// refresh leader
|
||||
remote_bret = false;
|
||||
// refresh location
|
||||
if (OB_FAIL(is_remote(param, remote_bret, dst_addr))) {
|
||||
LOG_WARN("fail to refresh leader addr", K(ret), K(param));
|
||||
is_continue = false;
|
||||
} else {
|
||||
LOG_INFO("refresh leader location", K(retry_cnt), K(retry_max), K(remote_bret), K(dst_addr), K(param));
|
||||
LOG_WARN("fail to do check is remote", K(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
LOG_INFO("do nothing, just retry", K(ret), K(retry_cnt), K(retry_max));
|
||||
LOG_INFO("do nothing, just retry", K(ret), K(retry_cnt));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -517,63 +548,78 @@ int ObLobManager::lob_remote_query_with_retry(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobManager::query_remote(ObLobAccessParam& param, common::ObAddr& dst_addr, ObString& data)
|
||||
int ObLobManager::lob_remote_query_init_ctx(
|
||||
ObLobAccessParam ¶m,
|
||||
ObLobQueryArg::QueryType qtype,
|
||||
void *&ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (ctx != nullptr) {
|
||||
// do nothing, has been init
|
||||
} else {
|
||||
void *buff = param.allocator_->alloc(sizeof(ObLobRemoteQueryCtx));
|
||||
if (OB_ISNULL(buff)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc lob remote query ctx", K(ret));
|
||||
} else {
|
||||
ObLobRemoteQueryCtx *remote_ctx = new(buff)ObLobRemoteQueryCtx();
|
||||
if (OB_FAIL(remote_ctx->remote_reader_.open(param, remote_ctx->rpc_buffer_))) {
|
||||
LOG_WARN("fail to open lob remote reader", K(ret));
|
||||
} else {
|
||||
// build arg
|
||||
remote_ctx->query_arg_.tenant_id_ = param.tenant_id_;
|
||||
remote_ctx->query_arg_.offset_ = param.offset_;
|
||||
remote_ctx->query_arg_.len_ = param.len_;
|
||||
remote_ctx->query_arg_.cs_type_ = param.coll_type_;
|
||||
remote_ctx->query_arg_.scan_backward_ = param.scan_backward_;
|
||||
remote_ctx->query_arg_.qtype_ = qtype;
|
||||
remote_ctx->query_arg_.lob_locator_.ptr_ = param.lob_locator_->ptr_;
|
||||
remote_ctx->query_arg_.lob_locator_.size_ = param.lob_locator_->size_;
|
||||
remote_ctx->query_arg_.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_;
|
||||
//set ctx ptr
|
||||
ctx = buff;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobManager::query_remote(ObLobAccessParam& param, ObString& data)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLobLocatorV2 *lob_locator = param.lob_locator_;
|
||||
obrpc::ObStorageRpcProxy::SSHandle<obrpc::OB_LOB_QUERY> handle;
|
||||
common::ObDataBuffer rpc_buffer;
|
||||
ObLobQueryRemoteReader reader;
|
||||
if (OB_ISNULL(lob_locator)) {
|
||||
ret = OB_ERR_NULL_VALUE;
|
||||
LOG_WARN("lob locator is null.", K(ret), K(param));
|
||||
} else if (OB_FAIL(reader.open(param, rpc_buffer))) {
|
||||
LOG_WARN("fail to open lob remote reader", K(ret));
|
||||
} else if (OB_ISNULL(param.remote_query_ctx_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get remote query ctx nullptr", K(ret), K(param));
|
||||
} else {
|
||||
SMART_VAR(ObLobQueryArg, arg) {
|
||||
// build arg
|
||||
arg.tenant_id_ = param.tenant_id_;
|
||||
arg.offset_ = param.offset_;
|
||||
arg.len_ = param.len_;
|
||||
arg.cs_type_ = param.coll_type_;
|
||||
arg.scan_backward_ = param.scan_backward_;
|
||||
arg.qtype_ = ObLobQueryArg::QueryType::READ;
|
||||
arg.lob_locator_.ptr_ = param.lob_locator_->ptr_;
|
||||
arg.lob_locator_.size_ = param.lob_locator_->size_;
|
||||
arg.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_;
|
||||
int64_t timeout = param.timeout_ - ObTimeUtility::current_time();
|
||||
if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) {
|
||||
timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
}
|
||||
if (OB_FAIL(lob_remote_query_with_retry(param, dst_addr, arg, timeout, rpc_buffer, handle))) {
|
||||
LOG_WARN("failed to do remote query", K(ret), K(arg));
|
||||
ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast<ObLobRemoteQueryCtx*>(param.remote_query_ctx_);
|
||||
ObLobQueryBlock block;
|
||||
ObString block_data;
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(remote_ctx->remote_reader_.get_next_block(param, remote_ctx->rpc_buffer_, remote_ctx->handle_, block, block_data))) {
|
||||
if (ret != OB_ITER_END) {
|
||||
LOG_WARN("failed to get next lob query block", K(ret));
|
||||
}
|
||||
} else {
|
||||
ObLobQueryBlock block;
|
||||
ObString block_data;
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(reader.get_next_block(param, rpc_buffer, handle, block, block_data))) {
|
||||
if (ret != OB_ITER_END) {
|
||||
LOG_WARN("failed to get next lob query block", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (param.scan_backward_) {
|
||||
if (data.write_front(block_data.ptr(), block_data.length()) != block_data.length()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to write data buffer", K(ret), K(data.remain()), K(block_data.length()));
|
||||
}
|
||||
} else {
|
||||
if (data.write(block_data.ptr(), block_data.length()) != block_data.length()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to write data buffer", K(ret), K(data.remain()), K(block_data.length()));
|
||||
}
|
||||
}
|
||||
if (param.scan_backward_) {
|
||||
if (data.write_front(block_data.ptr(), block_data.length()) != block_data.length()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to write data buffer", K(ret), K(data.remain()), K(block_data.length()));
|
||||
}
|
||||
} else {
|
||||
if (data.write(block_data.ptr(), block_data.length()) != block_data.length()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to write data buffer", K(ret), K(data.remain()), K(block_data.length()));
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -666,53 +712,52 @@ int ObLobManager::query(
|
||||
} else {
|
||||
bool is_remote_lob = false;
|
||||
common::ObAddr dst_addr;
|
||||
ObLobMetaScanIter meta_iter;
|
||||
param.lob_data_ = reinterpret_cast<ObLobData*>(lob_common->buffer_);
|
||||
if (OB_FAIL(is_remote(param, is_remote_lob, dst_addr))) {
|
||||
LOG_WARN("check is remote failed.", K(ret), K(param));
|
||||
} else if (OB_FAIL(lob_query_with_retry(param, dst_addr, is_remote_lob, meta_iter,
|
||||
ObLobQueryArg::QueryType::READ, param.remote_query_ctx_))) {
|
||||
LOG_WARN("fail to do lob query with retry", K(ret), K(is_remote_lob), K(dst_addr));
|
||||
} else if (is_remote_lob) {
|
||||
if (OB_FAIL(query_remote(param, dst_addr, output_data))) {
|
||||
if (OB_FAIL(query_remote(param, output_data))) {
|
||||
LOG_WARN("do remote query failed.", K(ret), K(param), K(dst_addr));
|
||||
}
|
||||
} else {
|
||||
ObLobMetaScanIter meta_iter;
|
||||
ObLobCtx lob_ctx = lob_ctx_;
|
||||
if (!lob_common->is_init_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid lob common header for out row.", K(ret), KPC(lob_common));
|
||||
} else if (param.is_full_read()) {
|
||||
if (OB_FAIL(read_all(param, meta_iter, output_data))) {
|
||||
LOG_WARN("read_all fail", K(ret), K(param));
|
||||
}
|
||||
} else {
|
||||
param.lob_data_ = reinterpret_cast<ObLobData*>(lob_common->buffer_);
|
||||
if (OB_FAIL(lob_ctx.lob_meta_mngr_->scan(param, meta_iter))) {
|
||||
LOG_WARN("do lob meta scan failed.", K(ret), K(param));
|
||||
} else if (param.is_full_read()) {
|
||||
if (OB_FAIL(read_all(param, meta_iter, output_data))) {
|
||||
LOG_WARN("read_all fail", K(ret), K(param));
|
||||
}
|
||||
} else {
|
||||
ObLobQueryResult result;
|
||||
while (OB_SUCC(ret)) {
|
||||
ret = meta_iter.get_next_row(result.meta_result_);
|
||||
if (OB_FAIL(ret)) {
|
||||
if (ret == OB_ITER_END) {
|
||||
} else {
|
||||
LOG_WARN("failed to get next row.", K(ret));
|
||||
}
|
||||
} else if (ObTimeUtility::current_time() > param.timeout_) {
|
||||
ret = OB_TIMEOUT;
|
||||
int64_t cur_time = ObTimeUtility::current_time();
|
||||
LOG_WARN("query timeout", K(cur_time), K(param.timeout_), K(ret));
|
||||
/* TODO: weiyouchao.wyc should set param.asscess_ptable_ as false 2022.4.7 */
|
||||
} else if (param.asscess_ptable_ /* not operate piece table currently */ &&
|
||||
OB_FAIL(lob_ctx.lob_piece_mngr_->get(param, result.meta_result_.info_.piece_id_, result.piece_info_))) {
|
||||
LOG_WARN("get lob piece failed.", K(ret), K(result));
|
||||
} else if (OB_FAIL(get_real_data(param, result, output_data))) {
|
||||
LOG_WARN("failed to write data to output buf.", K(ret), K(result), K(output_data));
|
||||
ObLobQueryResult result;
|
||||
while (OB_SUCC(ret)) {
|
||||
ret = meta_iter.get_next_row(result.meta_result_);
|
||||
if (OB_FAIL(ret)) {
|
||||
if (ret == OB_ITER_END) {
|
||||
} else {
|
||||
LOG_WARN("failed to get next row.", K(ret));
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
} else if (ObTimeUtility::current_time() > param.timeout_) {
|
||||
ret = OB_TIMEOUT;
|
||||
int64_t cur_time = ObTimeUtility::current_time();
|
||||
LOG_WARN("query timeout", K(cur_time), K(param.timeout_), K(ret));
|
||||
} else if (OB_FAIL(get_real_data(param, result, output_data))) {
|
||||
LOG_WARN("failed to write data to output buf.", K(ret), K(result), K(output_data));
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
// finish query, release resource
|
||||
if (OB_NOT_NULL(param.remote_query_ctx_)) {
|
||||
ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast<ObLobRemoteQueryCtx*>(param.remote_query_ctx_);
|
||||
remote_ctx->~ObLobRemoteQueryCtx();
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -816,14 +861,8 @@ int ObLobManager::query(
|
||||
LOG_WARN("alloc lob meta scan iterator fail", K(ret));
|
||||
} else if (OB_FAIL(is_remote(param, is_remote_lob, dst_addr))) {
|
||||
LOG_WARN("check is remote failed.", K(ret), K(param));
|
||||
} else if (is_remote_lob) {
|
||||
if (OB_FAIL(iter->open(param, dst_addr))) {
|
||||
LOG_WARN("open remote iter query failed.", K(ret), K(param), K(dst_addr));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(iter->open(param, lob_ctx))) {
|
||||
LOG_WARN("open local meta scan iter failed", K(ret), K(param));
|
||||
}
|
||||
} else if (OB_FAIL(iter->open(param, lob_ctx, dst_addr, is_remote_lob))) {
|
||||
LOG_WARN("open local meta scan iter failed", K(ret), K(param), K(dst_addr), K(is_remote_lob));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
result = iter;
|
||||
@ -2199,74 +2238,50 @@ int ObLobManager::getlength_remote(ObLobAccessParam& param, common::ObAddr& dst_
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLobLocatorV2 *lob_locator = param.lob_locator_;
|
||||
obrpc::ObStorageRpcProxy::SSHandle<obrpc::OB_LOB_QUERY> handle;
|
||||
common::ObDataBuffer rpc_buffer;
|
||||
ObLobQueryBlock header;
|
||||
char *buf = nullptr;
|
||||
int64_t buffer_len = header.get_serialize_size();
|
||||
if (OB_ISNULL(lob_locator)) {
|
||||
ret = OB_ERR_NULL_VALUE;
|
||||
LOG_WARN("lob locator is null.", K(ret), K(param));
|
||||
} else if (OB_ISNULL(buf = static_cast<char*>(param.allocator_->alloc(buffer_len)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed alloc buffer.", K(ret), K(buffer_len));
|
||||
} else if (!rpc_buffer.set_data(buf, buffer_len)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to set rpc buffer", K(ret), K(buffer_len));
|
||||
} else if (OB_ISNULL(param.remote_query_ctx_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get remote query ctx nullptr", K(ret), K(param));
|
||||
} else {
|
||||
SMART_VAR(ObLobQueryArg, arg) {
|
||||
// build arg
|
||||
arg.tenant_id_ = param.tenant_id_;
|
||||
arg.offset_ = param.offset_;
|
||||
arg.len_ = param.len_;
|
||||
arg.cs_type_ = param.coll_type_;
|
||||
arg.scan_backward_ = param.scan_backward_;
|
||||
arg.qtype_ = ObLobQueryArg::QueryType::GET_LENGTH;
|
||||
arg.lob_locator_.ptr_ = param.lob_locator_->ptr_;
|
||||
arg.lob_locator_.size_ = param.lob_locator_->size_;
|
||||
arg.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_;
|
||||
int64_t timeout = param.timeout_ - ObTimeUtility::current_time();
|
||||
if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) {
|
||||
timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
}
|
||||
if (OB_FAIL(lob_remote_query_with_retry(param, dst_addr, arg, timeout, rpc_buffer, handle))) {
|
||||
LOG_WARN("failed to do remote query", K(ret), K(arg));
|
||||
ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast<ObLobRemoteQueryCtx*>(param.remote_query_ctx_);
|
||||
int64_t cur_position = remote_ctx->rpc_buffer_.get_position();
|
||||
while (OB_SUCC(ret) && remote_ctx->handle_.has_more()) {
|
||||
cur_position = remote_ctx->rpc_buffer_.get_position();
|
||||
if (OB_FAIL(remote_ctx->handle_.get_more(remote_ctx->rpc_buffer_))) {
|
||||
ret = OB_DATA_SOURCE_TIMEOUT;
|
||||
} else if (remote_ctx->rpc_buffer_.get_position() < 0) {
|
||||
ret = OB_ERR_SYS;
|
||||
} else if (cur_position == remote_ctx->rpc_buffer_.get_position()) {
|
||||
if (!remote_ctx->handle_.has_more()) {
|
||||
ret = OB_ITER_END;
|
||||
LOG_DEBUG("empty rpc buffer, no more data", K(remote_ctx->rpc_buffer_));
|
||||
} else {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_ERROR("rpc buffer has no data", K(ret), K(remote_ctx->rpc_buffer_));
|
||||
}
|
||||
} else {
|
||||
int64_t cur_position = rpc_buffer.get_position();
|
||||
while (OB_SUCC(ret) && handle.has_more()) {
|
||||
cur_position = rpc_buffer.get_position();
|
||||
if (OB_FAIL(handle.get_more(rpc_buffer))) {
|
||||
ret = OB_DATA_SOURCE_TIMEOUT;
|
||||
} else if (rpc_buffer.get_position() < 0) {
|
||||
ret = OB_ERR_SYS;
|
||||
} else if (cur_position == rpc_buffer.get_position()) {
|
||||
if (!handle.has_more()) {
|
||||
ret = OB_ITER_END;
|
||||
LOG_DEBUG("empty rpc buffer, no more data", K(rpc_buffer));
|
||||
} else {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_ERROR("rpc buffer has no data", K(ret), K(rpc_buffer));
|
||||
}
|
||||
} else {
|
||||
LOG_DEBUG("get more data", K(rpc_buffer));
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
// do header decode
|
||||
if (OB_SUCC(ret)) {
|
||||
int64_t rpc_buffer_pos = 0;
|
||||
if (OB_FAIL(serialization::decode(rpc_buffer.get_data(),
|
||||
rpc_buffer.get_position(), rpc_buffer_pos, header))) {
|
||||
LOG_WARN("failed to decode lob query block", K(ret), K(rpc_buffer));
|
||||
} else if (!header.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid header", K(ret), K(header));
|
||||
} else {
|
||||
len = static_cast<uint64_t>(header.size_);
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("get more data", K(remote_ctx->rpc_buffer_));
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
// do header decode
|
||||
if (OB_SUCC(ret)) {
|
||||
int64_t rpc_buffer_pos = 0;
|
||||
if (OB_FAIL(serialization::decode(remote_ctx->rpc_buffer_.get_data(),
|
||||
remote_ctx->rpc_buffer_.get_position(), rpc_buffer_pos, header))) {
|
||||
LOG_WARN("failed to decode lob query block", K(ret), K(remote_ctx->rpc_buffer_));
|
||||
} else if (!header.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid header", K(ret), K(header));
|
||||
} else {
|
||||
len = static_cast<uint64_t>(header.size_);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2352,48 +2367,51 @@ int ObLobManager::getlength(ObLobAccessParam& param, uint64_t &len)
|
||||
} else { // do meta scan
|
||||
bool is_remote_lob = false;
|
||||
common::ObAddr dst_addr;
|
||||
ObLobMetaScanIter meta_iter;
|
||||
param.lob_data_ = reinterpret_cast<ObLobData*>(lob_common->buffer_);
|
||||
// mock do full scan
|
||||
param.offset_ = 0;
|
||||
param.len_ = UINT64_MAX;
|
||||
if (OB_FAIL(is_remote(param, is_remote_lob, dst_addr))) {
|
||||
LOG_WARN("check is remote failed.", K(ret), K(param));
|
||||
} else if (OB_FAIL(lob_query_with_retry(param, dst_addr, is_remote_lob, meta_iter,
|
||||
ObLobQueryArg::QueryType::GET_LENGTH, param.remote_query_ctx_))) {
|
||||
LOG_WARN("fail to do lob query with retry", K(ret), K(is_remote_lob), K(dst_addr));
|
||||
} else if (is_remote_lob) {
|
||||
if (OB_FAIL(getlength_remote(param, dst_addr, len))) {
|
||||
LOG_WARN("fail to get length remote", K(ret));
|
||||
}
|
||||
} else {
|
||||
ObLobMetaScanIter meta_iter;
|
||||
ObLobCtx lob_ctx = lob_ctx_;
|
||||
if (!lob_common->is_init_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid lob common header for out row.", K(ret), KPC(lob_common));
|
||||
} else {
|
||||
param.lob_data_ = reinterpret_cast<ObLobData*>(lob_common->buffer_);
|
||||
// mock do full scan
|
||||
param.offset_ = 0;
|
||||
param.len_ = UINT64_MAX;
|
||||
if (OB_FAIL(lob_ctx.lob_meta_mngr_->scan(param, meta_iter))) {
|
||||
LOG_WARN("do lob meta scan failed.", K(ret), K(param));
|
||||
} else {
|
||||
ObLobQueryResult result;
|
||||
while (OB_SUCC(ret)) {
|
||||
ret = meta_iter.get_next_row(result.meta_result_);
|
||||
if (OB_FAIL(ret)) {
|
||||
if (ret == OB_ITER_END) {
|
||||
} else {
|
||||
LOG_WARN("failed to get next row.", K(ret));
|
||||
}
|
||||
} else if (ObTimeUtility::current_time() > param.timeout_) {
|
||||
ret = OB_TIMEOUT;
|
||||
int64_t cur_time = ObTimeUtility::current_time();
|
||||
LOG_WARN("query timeout", K(cur_time), K(param.timeout_), K(ret));
|
||||
ObLobQueryResult result;
|
||||
while (OB_SUCC(ret)) {
|
||||
ret = meta_iter.get_next_row(result.meta_result_);
|
||||
if (OB_FAIL(ret)) {
|
||||
if (ret == OB_ITER_END) {
|
||||
} else {
|
||||
len += result.meta_result_.info_.char_len_;
|
||||
LOG_WARN("failed to get next row.", K(ret));
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
} else if (ObTimeUtility::current_time() > param.timeout_) {
|
||||
ret = OB_TIMEOUT;
|
||||
int64_t cur_time = ObTimeUtility::current_time();
|
||||
LOG_WARN("query timeout", K(cur_time), K(param.timeout_), K(ret));
|
||||
} else {
|
||||
len += result.meta_result_.info_.char_len_;
|
||||
}
|
||||
}
|
||||
if (ret == OB_ITER_END) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
// release remote query resource
|
||||
if (OB_NOT_NULL(param.remote_query_ctx_)) {
|
||||
ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast<ObLobRemoteQueryCtx*>(param.remote_query_ctx_);
|
||||
remote_ctx->~ObLobRemoteQueryCtx();
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -3761,16 +3779,32 @@ int ObLobManager::build_lob_param(ObLobAccessParam& param,
|
||||
|
||||
|
||||
/*************ObLobQueryIter*****************/
|
||||
int ObLobQueryIter::open(ObLobAccessParam ¶m, ObLobCtx& lob_ctx)
|
||||
int ObLobQueryIter::open(ObString &data, uint32_t byte_offset, uint32_t byte_len, ObCollationType cs, bool is_reverse)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(lob_ctx.lob_meta_mngr_) ||
|
||||
OB_ISNULL(lob_ctx.lob_piece_mngr_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid lob ctx.", K(ret), K(lob_ctx));
|
||||
} else if (OB_FAIL(lob_ctx.lob_meta_mngr_->scan(param, meta_iter_))) {
|
||||
LOG_WARN("open meta iter failed.");
|
||||
} else {
|
||||
cur_pos_ = 0;
|
||||
inner_data_.assign_ptr(data.ptr() + byte_offset, byte_len);
|
||||
is_inited_ = true;
|
||||
is_in_row_ = true;
|
||||
is_reverse_ = is_reverse;
|
||||
cs_type_ = cs;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobQueryIter::open(ObLobAccessParam ¶m, ObLobCtx& lob_ctx, common::ObAddr &dst_addr, bool &is_remote)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLobManager *lob_manager = MTL(ObLobManager*);
|
||||
if (OB_FAIL(lob_manager->lob_query_with_retry(param, dst_addr, is_remote, meta_iter_,
|
||||
ObLobQueryArg::QueryType::READ, remote_query_ctx_))) {
|
||||
LOG_WARN("fail to do lob query with retry", K(ret), K(is_remote), K(dst_addr));
|
||||
} else if (is_remote) { // init remote scan
|
||||
param_ = param;
|
||||
is_reverse_ = param.scan_backward_;
|
||||
cs_type_ = param.coll_type_;
|
||||
is_inited_ = true;
|
||||
is_remote_ = true;
|
||||
} else { // init local scan
|
||||
last_data_buf_len_ = OB_MIN(ObLobMetaUtil::LOB_OPER_PIECE_DATA_SIZE, param.byte_size_);
|
||||
last_data_ptr_ = reinterpret_cast<char*>(param.allocator_->alloc(last_data_buf_len_));
|
||||
if (OB_ISNULL(last_data_ptr_)) {
|
||||
@ -3789,56 +3823,6 @@ int ObLobQueryIter::open(ObLobAccessParam ¶m, ObLobCtx& lob_ctx)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobQueryIter::open(ObString &data, uint32_t byte_offset, uint32_t byte_len, ObCollationType cs, bool is_reverse)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
cur_pos_ = 0;
|
||||
inner_data_.assign_ptr(data.ptr() + byte_offset, byte_len);
|
||||
is_inited_ = true;
|
||||
is_in_row_ = true;
|
||||
is_reverse_ = is_reverse;
|
||||
cs_type_ = cs;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobQueryIter::open(ObLobAccessParam ¶m, common::ObAddr dst_addr)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLobLocatorV2 *lob_locator = param.lob_locator_;
|
||||
if (OB_ISNULL(lob_locator)) {
|
||||
ret = OB_ERR_NULL_VALUE;
|
||||
LOG_WARN("lob locator is null.", K(ret), K(param));
|
||||
} else if (OB_FAIL(remote_reader_.open(param, rpc_buffer_))) {
|
||||
LOG_WARN("failed to open remote reader", K(ret));
|
||||
} else {
|
||||
ObLobManager *lob_manager = MTL(ObLobManager*);
|
||||
// build arg
|
||||
query_arg_.tenant_id_ = param.tenant_id_;
|
||||
query_arg_.offset_ = param.offset_;
|
||||
query_arg_.len_ = param.len_;
|
||||
query_arg_.cs_type_ = param.coll_type_;
|
||||
query_arg_.qtype_ = ObLobQueryArg::QueryType::READ;
|
||||
query_arg_.scan_backward_ = param.scan_backward_;
|
||||
query_arg_.lob_locator_.ptr_ = param.lob_locator_->ptr_;
|
||||
query_arg_.lob_locator_.size_ = param.lob_locator_->size_;
|
||||
query_arg_.lob_locator_.has_lob_header_ = param.lob_locator_->has_lob_header_;
|
||||
int64_t timeout = param.timeout_ - ObTimeUtility::current_time();
|
||||
if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) {
|
||||
timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
}
|
||||
if (OB_FAIL(lob_manager->lob_remote_query_with_retry(param, dst_addr, query_arg_, timeout, rpc_buffer_, handle_))) {
|
||||
LOG_WARN("failed to do remote query", K(ret), K(query_arg_));
|
||||
} else {
|
||||
param_ = param;
|
||||
is_reverse_ = param.scan_backward_;
|
||||
cs_type_ = param.coll_type_;
|
||||
is_inited_ = true;
|
||||
is_remote_ = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobQueryIter::get_next_row(ObLobQueryResult &result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -3973,12 +3957,14 @@ int ObLobQueryIter::get_next_row(ObString& data)
|
||||
uint64_t st_len = data.length();
|
||||
ObLobQueryBlock block;
|
||||
ObString cur_buffer;
|
||||
ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast<ObLobRemoteQueryCtx*>(remote_query_ctx_);
|
||||
while (OB_SUCC(ret) && !has_fill_full) {
|
||||
// first try fill buffer remain data to output
|
||||
has_fill_full = fill_buffer_to_data(data);
|
||||
if (has_fill_full) {
|
||||
// data has been filled full, do nothing
|
||||
} else if (OB_FAIL(remote_reader_.get_next_block(param_, rpc_buffer_, handle_, block, last_data_))) {
|
||||
} else if (OB_FAIL(remote_ctx->remote_reader_.get_next_block(param_,
|
||||
remote_ctx->rpc_buffer_, remote_ctx->handle_, block, last_data_))) {
|
||||
if (ret != OB_ITER_END) {
|
||||
LOG_WARN("fail to get block from remote reader", K(ret));
|
||||
}
|
||||
@ -4039,6 +4025,11 @@ void ObLobQueryIter::reset()
|
||||
param_.allocator_->free(last_data_ptr_);
|
||||
last_data_ptr_ = nullptr;
|
||||
}
|
||||
// release remote query resource
|
||||
if (OB_NOT_NULL(remote_query_ctx_)) {
|
||||
ObLobRemoteQueryCtx *remote_ctx = reinterpret_cast<ObLobRemoteQueryCtx*>(remote_query_ctx_);
|
||||
remote_ctx->~ObLobRemoteQueryCtx();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -90,16 +90,24 @@ private:
|
||||
ObString data_buffer_;
|
||||
};
|
||||
|
||||
struct ObLobRemoteQueryCtx
|
||||
{
|
||||
ObLobRemoteQueryCtx() : handle_(), rpc_buffer_(), query_arg_(), remote_reader_() {}
|
||||
obrpc::ObStorageRpcProxy::SSHandle<obrpc::OB_LOB_QUERY> handle_;
|
||||
common::ObDataBuffer rpc_buffer_;
|
||||
ObLobQueryArg query_arg_;
|
||||
ObLobQueryRemoteReader remote_reader_;
|
||||
};
|
||||
|
||||
class ObLobQueryIter
|
||||
{
|
||||
public:
|
||||
ObLobQueryIter() : is_reverse_(false), cs_type_(CS_TYPE_BINARY), is_end_(false),
|
||||
meta_iter_(), lob_ctx_(), param_(), last_data_(), last_data_ptr_(nullptr), last_data_buf_len_(0),
|
||||
inner_data_(), cur_pos_(0), is_in_row_(false), is_inited_(false),
|
||||
is_remote_(false), handle_(), rpc_buffer_(), query_arg_(), remote_reader_() {}
|
||||
int open(ObLobAccessParam ¶m, ObLobCtx& lob_ctx); // outrow open
|
||||
is_remote_(false), remote_query_ctx_(nullptr) {}
|
||||
int open(ObString &data, uint32_t byte_offset, uint32_t byte_len, ObCollationType cs, bool is_reverse = false); // inrow open
|
||||
int open(ObLobAccessParam ¶m, common::ObAddr dst_addr); // remote open
|
||||
int open(ObLobAccessParam ¶m, ObLobCtx& lob_ctx, common::ObAddr& dst_addr, bool &is_remote); // open with retry inner
|
||||
int get_next_row(ObString& data);
|
||||
uint64_t get_cur_pos() { return meta_iter_.get_cur_pos(); }
|
||||
void reset();
|
||||
@ -126,10 +134,7 @@ private:
|
||||
bool is_inited_;
|
||||
// remote ctx
|
||||
bool is_remote_;
|
||||
obrpc::ObStorageRpcProxy::SSHandle<obrpc::OB_LOB_QUERY> handle_;
|
||||
common::ObDataBuffer rpc_buffer_;
|
||||
ObLobQueryArg query_arg_;
|
||||
ObLobQueryRemoteReader remote_reader_;
|
||||
void* remote_query_ctx_;
|
||||
};
|
||||
|
||||
class ObLobManager
|
||||
@ -184,6 +189,15 @@ public:
|
||||
const ObString &data,
|
||||
common::ObCollationType coll_type,
|
||||
ObLobLocatorV2 &out);
|
||||
int lob_query_with_retry(ObLobAccessParam ¶m,
|
||||
ObAddr &dst_addr,
|
||||
bool &remote_bret,
|
||||
ObLobMetaScanIter& iter,
|
||||
ObLobQueryArg::QueryType qtype,
|
||||
void *&ctx);
|
||||
int lob_remote_query_init_ctx(ObLobAccessParam ¶m,
|
||||
ObLobQueryArg::QueryType qtype,
|
||||
void *&ctx);
|
||||
int lob_remote_query_with_retry(
|
||||
ObLobAccessParam ¶m,
|
||||
common::ObAddr& dst_addr,
|
||||
@ -308,7 +322,7 @@ private:
|
||||
int get_inrow_data(ObLobAccessParam& param, ObString& data);
|
||||
int get_ls_leader(ObLobAccessParam& param, const uint64_t tenant_id, const share::ObLSID &ls_id, common::ObAddr &leader);
|
||||
int is_remote(ObLobAccessParam& param, bool& is_remote, common::ObAddr& dst_addr);
|
||||
int query_remote(ObLobAccessParam& param, common::ObAddr& dst_addr, ObString& data);
|
||||
int query_remote(ObLobAccessParam& param, ObString& data);
|
||||
int getlength_remote(ObLobAccessParam& param, common::ObAddr& dst_addr, uint64_t &len);
|
||||
int do_delete_one_piece(ObLobAccessParam& param, ObLobQueryResult &result, ObString &tmp_buff);
|
||||
int prepare_erase_buffer(ObLobAccessParam& param, ObString &tmp_buff);
|
||||
|
||||
@ -56,7 +56,8 @@ struct ObLobAccessParam {
|
||||
scan_backward_(false), asscess_ptable_(false), offset_(0), len_(0),
|
||||
parent_seq_no_(), seq_no_st_(), used_seq_cnt_(0), total_seq_cnt_(0), checksum_(0), update_len_(0),
|
||||
op_type_(ObLobDataOutRowCtx::OpType::SQL), is_fill_zero_(false), from_rpc_(false),
|
||||
inrow_read_nocopy_(false), inrow_threshold_(OB_DEFAULT_LOB_INROW_THRESHOLD), spec_lob_id_()
|
||||
inrow_read_nocopy_(false), inrow_threshold_(OB_DEFAULT_LOB_INROW_THRESHOLD), spec_lob_id_(),
|
||||
remote_query_ctx_(nullptr)
|
||||
{}
|
||||
~ObLobAccessParam() {
|
||||
if (OB_NOT_NULL(dml_base_param_)) {
|
||||
@ -123,6 +124,8 @@ public:
|
||||
bool inrow_read_nocopy_;
|
||||
int64_t inrow_threshold_;
|
||||
ObLobId spec_lob_id_;
|
||||
// remote query ctx
|
||||
void *remote_query_ctx_;
|
||||
};
|
||||
|
||||
struct ObLobMetaInfo {
|
||||
|
||||
Reference in New Issue
Block a user