[BUGFIX] support lob remote query retry for 4038
This commit is contained in:
@ -430,12 +430,65 @@ int ObLobManager::is_remote(ObLobAccessParam& param, bool& is_remote, common::Ob
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObLobManager::is_remote_ret_can_retry(int ret)
|
||||
{
|
||||
return (ret == OB_NOT_MASTER);
|
||||
}
|
||||
|
||||
int ObLobManager::lob_remote_query_with_retry(
|
||||
ObLobAccessParam ¶m,
|
||||
common::ObAddr& dst_addr,
|
||||
ObLobQueryArg& arg,
|
||||
int64_t timeout,
|
||||
common::ObDataBuffer& rpc_buffer,
|
||||
obrpc::ObStorageRpcProxy::SSHandle<obrpc::OB_LOB_QUERY>& handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSService *ls_service = (MTL(ObLSService *));
|
||||
obrpc::ObStorageRpcProxy *svr_rpc_proxy = ls_service->get_storage_rpc_proxy();
|
||||
int64_t retry_max = REMOTE_LOB_QUERY_RETRY_MAX;
|
||||
int64_t retry_cnt = 0;
|
||||
bool is_continue = true;
|
||||
do {
|
||||
ret = svr_rpc_proxy->to(dst_addr).by(arg.tenant_id_)
|
||||
.dst_cluster_id(GCONF.cluster_id)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.timeout(timeout)
|
||||
.lob_query(arg, rpc_buffer, handle);
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("failed to do remote query", K(ret), K(arg), K(dst_addr), K(timeout));
|
||||
if (is_remote_ret_can_retry(ret)) {
|
||||
retry_cnt++;
|
||||
switch (ret) {
|
||||
case OB_NOT_MASTER: {
|
||||
bool remote_bret = false;
|
||||
// refresh leader
|
||||
if (OB_FAIL(is_remote(param, remote_bret, dst_addr))) {
|
||||
LOG_WARN("fail to refresh leader addr", K(ret), K(param));
|
||||
is_continue = false;
|
||||
} else {
|
||||
LOG_INFO("refresh leader location", K(retry_cnt), K(retry_max), K(remote_bret), K(dst_addr), K(param));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
LOG_INFO("do nothing, just retry", K(ret), K(retry_cnt), K(retry_max));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
is_continue = false;
|
||||
}
|
||||
} else {
|
||||
is_continue = false;
|
||||
}
|
||||
} while (is_continue && retry_cnt <= retry_max);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLobManager::query_remote(ObLobAccessParam& param, common::ObAddr& dst_addr, ObString& data)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLobLocatorV2 *lob_locator = param.lob_locator_;
|
||||
ObLSService *ls_service = (MTL(ObLSService *));
|
||||
obrpc::ObStorageRpcProxy *svr_rpc_proxy = ls_service->get_storage_rpc_proxy();
|
||||
obrpc::ObStorageRpcProxy::SSHandle<obrpc::OB_LOB_QUERY> handle;
|
||||
common::ObDataBuffer rpc_buffer;
|
||||
ObLobQueryRemoteReader reader;
|
||||
@ -460,12 +513,7 @@ int ObLobManager::query_remote(ObLobAccessParam& param, common::ObAddr& dst_addr
|
||||
if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) {
|
||||
timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
}
|
||||
ret = svr_rpc_proxy->to(dst_addr).by(arg.tenant_id_)
|
||||
.dst_cluster_id(GCONF.cluster_id)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.timeout(timeout)
|
||||
.lob_query(arg, rpc_buffer, handle);
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_FAIL(lob_remote_query_with_retry(param, dst_addr, arg, timeout, rpc_buffer, handle))) {
|
||||
LOG_WARN("failed to do remote query", K(ret), K(arg));
|
||||
} else {
|
||||
ObLobQueryBlock block;
|
||||
@ -1815,8 +1863,6 @@ int ObLobManager::getlength_remote(ObLobAccessParam& param, common::ObAddr& dst_
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLobLocatorV2 *lob_locator = param.lob_locator_;
|
||||
ObLSService *ls_service = (MTL(ObLSService *));
|
||||
obrpc::ObStorageRpcProxy *svr_rpc_proxy = ls_service->get_storage_rpc_proxy();
|
||||
obrpc::ObStorageRpcProxy::SSHandle<obrpc::OB_LOB_QUERY> handle;
|
||||
common::ObDataBuffer rpc_buffer;
|
||||
ObLobQueryBlock header;
|
||||
@ -1847,13 +1893,8 @@ int ObLobManager::getlength_remote(ObLobAccessParam& param, common::ObAddr& dst_
|
||||
if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) {
|
||||
timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
}
|
||||
ret = svr_rpc_proxy->to(dst_addr).by(arg.tenant_id_)
|
||||
.dst_cluster_id(GCONF.cluster_id)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.timeout(timeout)
|
||||
.lob_query(arg, rpc_buffer, handle);
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("failed to do remote query", K(ret));
|
||||
if (OB_FAIL(lob_remote_query_with_retry(param, dst_addr, arg, timeout, rpc_buffer, handle))) {
|
||||
LOG_WARN("failed to do remote query", K(ret), K(arg));
|
||||
} else {
|
||||
int64_t cur_position = rpc_buffer.get_position();
|
||||
while (OB_SUCC(ret) && handle.has_more()) {
|
||||
@ -3348,9 +3389,7 @@ int ObLobQueryIter::open(ObLobAccessParam ¶m, common::ObAddr dst_addr)
|
||||
} else if (OB_FAIL(remote_reader_.open(param, rpc_buffer_))) {
|
||||
LOG_WARN("failed to open remote reader", K(ret));
|
||||
} else {
|
||||
ObLSService *ls_service = (MTL(ObLSService *));
|
||||
obrpc::ObStorageRpcProxy *svr_rpc_proxy = ls_service->get_storage_rpc_proxy();
|
||||
const int64_t cluster_id = GCONF.cluster_id;
|
||||
ObLobManager *lob_manager = MTL(ObLobManager*);
|
||||
// build arg
|
||||
query_arg_.tenant_id_ = param.tenant_id_;
|
||||
query_arg_.offset_ = param.offset_;
|
||||
@ -3365,12 +3404,7 @@ int ObLobQueryIter::open(ObLobAccessParam ¶m, common::ObAddr dst_addr)
|
||||
if (timeout < ObStorageRpcProxy::STREAM_RPC_TIMEOUT) {
|
||||
timeout = ObStorageRpcProxy::STREAM_RPC_TIMEOUT;
|
||||
}
|
||||
ret = svr_rpc_proxy->to(dst_addr).by(query_arg_.tenant_id_)
|
||||
.dst_cluster_id(cluster_id)
|
||||
.ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW)
|
||||
.timeout(timeout)
|
||||
.lob_query(query_arg_, rpc_buffer_, handle_);
|
||||
if (OB_FAIL(ret)) {
|
||||
if (OB_FAIL(lob_manager->lob_remote_query_with_retry(param, dst_addr, query_arg_, timeout, rpc_buffer_, handle_))) {
|
||||
LOG_WARN("failed to do remote query", K(ret), K(query_arg_));
|
||||
} else {
|
||||
param_ = param;
|
||||
|
||||
Reference in New Issue
Block a user