BUGFIX: retry lock if tenant does not exist
@@ -170,6 +170,18 @@ OB_DEF_DESERIALIZE(ObLockTaskBatchRequest)
   return ret;
 }
 
+void ObLockParam::reset()
+{
+  lock_id_.reset();
+  lock_mode_ = NO_LOCK;
+  owner_id_.reset();
+  op_type_ = UNKNOWN_TYPE;
+  is_deadlock_avoid_enabled_ = false;
+  is_try_lock_ = true;
+  expired_time_ = 0;
+  schema_version_ = -1;
+}
+
 int ObLockParam::set(
     const ObLockID &lock_id,
     const ObTableLockMode lock_mode,
@@ -354,6 +366,11 @@ int ObTableLockTaskRequest::assign(const ObTableLockTaskRequest &arg)
 }
 
 ObTableLockTaskRequest::~ObTableLockTaskRequest()
 {
+  reset();
+}
+
+void ObTableLockTaskRequest::reset()
+{
   auto txs = MTL(transaction::ObTransService*);
   if (OB_NOT_NULL(tx_desc_)) {
@@ -364,6 +381,7 @@ ObTableLockTaskRequest::~ObTableLockTaskRequest()
   }
   task_type_ = INVALID_LOCK_TASK_TYPE;
   lsid_.reset();
+  param_.reset();
   tx_desc_ = nullptr;
   need_release_tx_ = false;
 }
 
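Editor's note (not part of the commit): the two hunks above move the teardown logic out of the destructor into a reset() that the destructor then calls, so the same object can also be returned to its initial state explicitly and reused. The sketch below illustrates that pattern with simplified stand-in types, not the OceanBase classes.

// Minimal sketch of the destructor-delegates-to-reset() pattern.
#include <cstdint>

struct LockParam {
  int64_t lock_id = -1;
  int64_t expired_time = 0;
  int64_t schema_version = -1;

  void reset() {
    // single source of truth for the "empty" state
    lock_id = -1;
    expired_time = 0;
    schema_version = -1;
  }
  ~LockParam() { reset(); }
};

int main() {
  LockParam p;
  p.lock_id = 42;
  p.reset();  // explicit reuse, same code path as destruction
  return p.lock_id == -1 ? 0 : 1;
}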
@@ -72,7 +72,8 @@ public:
       expired_time_(0),
       schema_version_(-1)
   {}
-  virtual ~ObLockParam() {}
+  virtual ~ObLockParam() { reset(); }
+  void reset();
   int set(
       const ObLockID &lock_id,
       const ObTableLockMode lock_mode,
@@ -245,6 +246,7 @@ public:
       need_release_tx_(false)
   {}
   ~ObTableLockTaskRequest();
+  void reset();
   int set(
       const ObTableLockTaskType task_type,
       const share::ObLSID &lsid,
@@ -95,6 +95,14 @@ ObTableLockService::ObTableLockCtx::ObTableLockCtx(const ObTableLockTaskType tas
   obj_id_ = obj_id;
 }
 
+void ObTableLockService::ObRetryCtx::reuse()
+{
+  need_retry_ = false;
+  send_rpc_count_ = 0;
+  rpc_ls_array_.reuse();
+  retry_lock_ids_.reuse();
+}
+
 int64_t ObTableLockService::ObOBJLockGarbageCollector::GARBAGE_COLLECT_PRECISION = 100_ms;
 int64_t ObTableLockService::ObOBJLockGarbageCollector::GARBAGE_COLLECT_EXEC_INTERVAL = 10_s;
 int64_t ObTableLockService::ObOBJLockGarbageCollector::GARBAGE_COLLECT_TIMEOUT = 10_min;
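Editor's note (not part of the commit): ObRetryCtx is the new per-round bookkeeping object — which log streams were actually sent an RPC, how many RPCs went out, and which lock ids still need another attempt. The sketch below shows the intended lifecycle with simplified stand-in types: cleared at the start of each send round, filled while sending, consumed when deciding what to retry.

// Minimal sketch of reusing a retry context across send rounds.
#include <vector>
#include <cstdint>

struct RetryCtx {
  bool need_retry = false;
  int64_t send_rpc_count = 0;
  std::vector<int64_t> rpc_ls_array;    // LS ids we actually sent to
  std::vector<int64_t> retry_lock_ids;  // lock ids to try again

  void reuse() {
    need_retry = false;
    send_rpc_count = 0;
    rpc_ls_array.clear();
    retry_lock_ids.clear();
  }
};

int main() {
  RetryCtx ctx;
  for (int round = 0; round < 3; ++round) {
    ctx.reuse();  // each round starts from a clean context
    ctx.rpc_ls_array.push_back(round);
    ctx.send_rpc_count++;
  }
  return ctx.send_rpc_count == 1 ? 0 : 1;
}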
@@ -1143,145 +1151,169 @@ int ObTableLockService::get_table_lock_mode_(const ObTableLockTaskType task_type
   return ret;
 }
 
-int ObTableLockService::get_retry_tablet_list_(const ObLockIDArray &lock_ids,
-                                               common::ObTabletIDArray &retry_tablets)
+int ObTableLockService::get_retry_lock_ids_(const ObLockIDArray &lock_ids,
+                                            const int64_t start_pos,
+                                            ObLockIDArray &retry_lock_ids)
 {
   int ret = OB_SUCCESS;
-  common::ObTabletID tablet_id;
-  for (int64_t i = 0; i < lock_ids.count() && OB_SUCC(ret); ++i) {
-    if (OB_FAIL(lock_ids.at(i).convert_to(tablet_id))) {
-      LOG_WARN("lock id failed to convert to tablet id", K(ret));
-    } else if (OB_FAIL(retry_tablets.push_back(tablet_id))) {
-      LOG_WARN("get retry tablet failed", K(ret), K(tablet_id));
+  for (int64_t i = start_pos; i < lock_ids.count() && OB_SUCC(ret); ++i) {
+    if (OB_FAIL(retry_lock_ids.push_back(lock_ids.at(i)))) {
+      LOG_WARN("get retry tablet failed", K(ret), K(lock_ids.at(i)));
     }
   }
   return ret;
 }
 
+int ObTableLockService::get_retry_lock_ids_(const ObLSID &ls_id,
+                                            const ObLSLockMap &ls_lock_map,
+                                            const int64_t start_pos,
+                                            ObLockIDArray &retry_lock_ids)
+{
+  int ret = OB_SUCCESS;
+  const ObLockIDArray *lock_ids = nullptr;
+  // get the retry tablet list
+  if (OB_ISNULL(lock_ids = ls_lock_map.get(ls_id))) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("the ls not exist at tablet map", K(ret), K(ls_id));
+  } else if (OB_FAIL(get_retry_lock_ids_(*lock_ids, start_pos, retry_lock_ids))) {
+    // get the lock ids
+    LOG_WARN("get retry lock id list failed", K(ret));
+  }
+  return ret;
+}
+
+int ObTableLockService::collect_rollback_info_(const share::ObLSID &ls_id,
+                                               ObTableLockCtx &ctx)
+{
+  int ret = OB_SUCCESS;
+  if (OB_FAIL(ctx.add_touched_ls(ls_id))) {
+    LOG_ERROR("add touched ls failed.", K(ret), K(ls_id));
+  }
+  return ret;
+}
+
+int ObTableLockService::collect_rollback_info_(const ObArray<share::ObLSID> &ls_array,
+                                               ObTableLockCtx &ctx)
+{
+  int ret = OB_SUCCESS;
+  // all rpcs treated as failed
+  for (int i = 0; i < ls_array.count() && OB_SUCC(ret); i++) {
+    if (OB_FAIL(ctx.add_touched_ls(ls_array.at(i)))) {
+      LOG_ERROR("add touched ls failed.", K(ret), K(ls_array.at(i)));
+    }
+  }
+  return ret;
+}
+
 template<class RpcProxy>
-int ObTableLockService::get_retry_tablet_list_(const ObLSLockMap &ls_lock_map,
-                                               const RpcProxy &proxy_batch,
-                                               const ObArray<share::ObLSID> &ls_array,
-                                               common::ObTabletIDArray &retry_tablets)
+int ObTableLockService::collect_rollback_info_(const ObArray<share::ObLSID> &ls_array,
+                                               RpcProxy &proxy_batch,
+                                               ObTableLockCtx &ctx)
 {
   int ret = OB_SUCCESS;
   int tmp_ret = OB_SUCCESS;
-  ObLockIDArray *lock_ids = nullptr;
-  common::ObTabletID tablet_id;
-  ObLSID ls_id;
-  for (int64_t i = 0; i < proxy_batch.get_results().count() && OB_SUCC(ret); ++i) {
-    const ObTableLockTaskResult *result = proxy_batch.get_results().at(i);
-    ls_id = ls_array.at(i);
-    if (OB_ISNULL(result)) {
-      ret = OB_ERR_UNEXPECTED;
-      LOG_WARN("result is null", KR(ret), K(i), K(ls_id));
-    } else if (FALSE_IT(tmp_ret = result->get_ret_code())) {
-    } else if (OB_TMP_FAIL(tmp_ret) && need_retry_rpc_task_(tmp_ret, result)) {
-      // get the retry tablet list
-      if (OB_ISNULL(lock_ids = const_cast<ObLockIDArray *>(ls_lock_map.get(ls_id)))) {
-        ret = OB_ERR_UNEXPECTED;
-        LOG_WARN("the ls not exist at tablet map", K(ret), K(ls_id));
-      } else {
-        // get the tablet ids
-        for (int64_t i = result->get_success_pos() + 1; i < lock_ids->count() && OB_SUCC(ret); ++i) {
-          if (OB_FAIL(lock_ids->at(i).convert_to(tablet_id))) {
-            LOG_WARN("lock id failed to convert to tablet id", K(ret));
-          } else if (OB_FAIL(retry_tablets.push_back(tablet_id))) {
-            LOG_WARN("get retry tablet failed", K(ret), K(tablet_id));
-          }
-        }
-      }
-    }
-  }
 
+  // need wait rpcs that sent finish
+  // otherwise proxy reused or destructored will cause flying rpc core
+  ObArray<int> return_code_array;
+  if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array))) {
+    LOG_WARN("wait rpc failed", K(tmp_ret));
+  }
+
+  if (OB_FAIL(collect_rollback_info_(ls_array, ctx))) {
+    LOG_WARN("collect rollback info failed", K(ret));
+  }
   return ret;
 }
 
 template<class RpcProxy>
-int ObTableLockService::handle_parallel_rpc_response_(int rpc_call_ret,
-                                                      int64_t rpc_count,
-                                                      RpcProxy &proxy_batch,
+int ObTableLockService::handle_parallel_rpc_response_(RpcProxy &proxy_batch,
                                                       ObTableLockCtx &ctx,
-                                                      ObArray<share::ObLSID> &ls_array,
-                                                      bool &can_retry)
+                                                      const ObLSLockMap &ls_lock_map,
+                                                      bool &can_retry,
+                                                      ObRetryCtx &retry_ctx)
 {
-  int ret = rpc_call_ret;
+  int ret = OB_SUCCESS;
   int tmp_ret = OB_SUCCESS;
   ObTransService *txs = MTL(ObTransService*);
+  ObLSID ls_id;
 
   can_retry = true;
-  if (OB_FAIL(ret)) {
-    // need wait rpcs that sent finish
-    // otherwise proxy reused or destructored will cause flying rpc core
-    ObArray<int> return_code_array;
-    proxy_batch.wait_all(return_code_array);
-
-    // all rpcs treated as failed
-    for (int i = 0; i < ls_array.count(); i++) {
-      if (OB_SUCCESS != (tmp_ret = ctx.add_touched_ls(ls_array.at(i)))) {
-        LOG_ERROR("add touched ls failed.", K(tmp_ret), K(ls_array.at(i)));
-      }
-    }
+  retry_ctx.need_retry_ = true;
+  // handle result
+  ObArray<int> return_code_array;
+  if (OB_TMP_FAIL(proxy_batch.wait_all(return_code_array))
+      || retry_ctx.send_rpc_count_ != return_code_array.count()) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("rpc failed", K(tmp_ret), K(retry_ctx.send_rpc_count_), K(return_code_array.count()));
+    // we need add the ls into touched to make rollback.
+    can_retry = false;
+    retry_ctx.need_retry_ = false;
+    (void) collect_rollback_info_(retry_ctx.rpc_ls_array_, ctx);
   } else {
-    // handle result
-    ObArray<int> return_code_array;
-    if (OB_SUCCESS != (tmp_ret = proxy_batch.wait_all(return_code_array))
-        || rpc_count != return_code_array.count()) {
-      ret = OB_ERR_UNEXPECTED;
-      LOG_WARN("rpc failed", KR(ret), K(tmp_ret),
-               K(rpc_count), K(return_code_array.count()));
-      // we need add the ls into touched to make rollback.
-      for (int i = 0; i < ls_array.count(); i++) {
-        if (OB_SUCCESS != (tmp_ret = ctx.add_touched_ls(ls_array.at(i)))) {
-          LOG_ERROR("add touched ls failed.", K(tmp_ret), K(ls_array.at(i)));
-        }
-      }
-      can_retry = false;
-    } else {
-      //check each ret of every rpc
-      for (int64_t i = 0; i < return_code_array.count(); ++i) {
-        const ObTableLockTaskResult *result = proxy_batch.get_results().at(i);
-        if (OB_SUCCESS != (tmp_ret = return_code_array.at(i))) {
-          LOG_WARN("lock rpc failed", KR(tmp_ret), K(i), K(ls_array.at(i)));
-        } else if (OB_ISNULL(result)) {
-          tmp_ret = OB_ERR_UNEXPECTED;
-          LOG_WARN("result is null", KR(tmp_ret), K(i), K(ls_array.at(i)));
-        } else if (OB_SUCCESS != (tmp_ret = result->get_tx_result_code())) {
-          LOG_WARN("get tx exec result failed", KR(tmp_ret), K(i), K(ls_array.at(i)));
-        }
-
-        // rpc failed or we get tx exec result failed,
-        // we need add the ls into touched to make rollback.
-        if (OB_SUCCESS != tmp_ret) {
-          ret = tmp_ret;
-          can_retry = false;
-          share::ObLSID ls_id = ls_array.at(i);
-          if (OB_SUCCESS != (tmp_ret = ctx.add_touched_ls(ls_id))) {
-            LOG_ERROR("add touched ls failed.", K(tmp_ret), K(ls_id));
-          }
-        } else if (OB_SUCCESS != (tmp_ret = (txs->add_tx_exec_result(*ctx.tx_desc_,
-                                             proxy_batch.get_results().at(i)->tx_result_)))) {
-          ret = tmp_ret;
-          can_retry = false;
-          // must rollback the whole tx.
-          LOG_WARN("failed to add exec result", K(tmp_ret), K(ctx),
-                   K(proxy_batch.get_results().at(i)->tx_result_));
-          if (OB_SUCCESS != (tmp_ret = ctx.add_touched_ls(ls_array.at(i)))) {
-            LOG_ERROR("add touched ls failed.", K(tmp_ret), K(ls_array.at(i)));
-          }
-        } else {
-          // if error codes are only OB_TRY_LOCK_ROW_CONFLICT, will retry
-          tmp_ret = result->get_ret_code();
-          if (OB_TRANS_KILLED == tmp_ret) {
-            // the trans need kill.
-            ctx.tx_is_killed_ = true;
-            can_retry = false;
-          } else if (OB_SUCCESS != tmp_ret) {
-            can_retry = can_retry && need_retry_rpc_task_(tmp_ret, result);
-          }
-          if (OB_FAIL(tmp_ret)) {
-            LOG_WARN("lock rpc wrong", K(tmp_ret), K(ls_array.at(i)));
-            if (OB_SUCC(ret) || ret != OB_TRY_LOCK_ROW_CONFLICT) {
-              ret = tmp_ret;
-            }
-          }
+    //check each ret of every rpc
+    const ObTableLockTaskResult *result = nullptr;
+    for (int64_t i = 0; i < return_code_array.count(); ++i) {
+      result = nullptr;
+      tmp_ret = return_code_array.at(i);
+      ls_id = retry_ctx.rpc_ls_array_.at(i);
+      if (need_retry_whole_rpc_task_(tmp_ret)) {
+        // rpc failed, but we need retry the whole rpc task.
+        LOG_WARN("lock rpc failed, but we need retry", KR(tmp_ret), K(i), K(ls_id));
+        if (OB_TMP_FAIL(get_retry_lock_ids_(ls_id,
+                                            ls_lock_map,
+                                            0,
+                                            retry_ctx.retry_lock_ids_))) {
+          can_retry = false;
+          retry_ctx.need_retry_ = false;
+          ret = tmp_ret;
+          LOG_WARN("get retry tablet list failed", KR(ret), K(ls_id));
+        }
+      } else {
+        if (OB_TMP_FAIL(tmp_ret)) {
+          LOG_WARN("lock rpc failed", KR(tmp_ret), K(i), K(ls_id));
+        } else if (OB_ISNULL(result = proxy_batch.get_results().at(i))) {
+          tmp_ret = OB_ERR_UNEXPECTED;
+          LOG_WARN("result is null", KR(tmp_ret), K(i), K(ls_id));
+        } else if (OB_TMP_FAIL(result->get_tx_result_code())) {
+          LOG_WARN("get tx exec result failed", KR(tmp_ret), K(i), K(ls_id));
+        } else if (OB_TMP_FAIL(txs->add_tx_exec_result(*ctx.tx_desc_,
+                                                       result->tx_result_))) {
+          LOG_WARN("failed to add exec result", K(tmp_ret), K(ctx), K(result->tx_result_));
+        }
+
+        // rpc failed or we get tx exec result failed,
+        // we need add the ls into touched to make rollback.
+        if (OB_TMP_FAIL(tmp_ret)) {
+          ret = tmp_ret;
+          can_retry = false;
+          retry_ctx.need_retry_ = false;
+          (void) collect_rollback_info_(ls_id, ctx);
+        } else {
+          // if error codes are only OB_TRY_LOCK_ROW_CONFLICT, will retry
+          tmp_ret = result->get_ret_code();
+          if (need_retry_part_rpc_task_(tmp_ret, result)) {
+            LOG_WARN("lock rpc failed, but we need retry", KR(tmp_ret), K(i), K(ls_id));
+            if (OB_TMP_FAIL(get_retry_lock_ids_(ls_id,
+                                                ls_lock_map,
+                                                result->get_success_pos() + 1,
+                                                retry_ctx.retry_lock_ids_))) {
+              can_retry = false;
+              retry_ctx.need_retry_ = false;
+              ret = tmp_ret;
+              LOG_WARN("get retry tablet list failed", KR(ret), K(ls_id));
+            }
+          } else if (OB_TRANS_KILLED == tmp_ret) {
+            // the trans need kill.
+            ctx.tx_is_killed_ = true;
+            can_retry = false;
+          } else if (OB_TMP_FAIL(tmp_ret)) {
+            can_retry = false;
+            retry_ctx.need_retry_ = false;
+          }
+          if (OB_TMP_FAIL(tmp_ret)) {
+            LOG_WARN("lock rpc wrong", K(tmp_ret), K(ls_id));
+            if (OB_SUCC(ret) || ret == OB_TRY_LOCK_ROW_CONFLICT) {
+              ret = tmp_ret;
+            }
+          }
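Editor's note (not part of the commit): the rewritten response handler distinguishes two retry shapes. If the whole RPC failed in a retriable way (e.g. the tenant is not present on the target server yet), every lock id of that LS is re-queued from position 0; if the LS executed only a prefix of the batch before failing, re-queueing starts after the last successful position (success_pos). The sketch below shows that split with simplified stand-in types.

// Minimal sketch of whole-task vs. partial retry collection.
#include <vector>
#include <cstdint>

// re-queue lock_ids[start_pos..] for another round
static void collect_retry_ids(const std::vector<int64_t> &lock_ids,
                              int64_t start_pos,
                              std::vector<int64_t> &retry_ids) {
  for (size_t i = static_cast<size_t>(start_pos); i < lock_ids.size(); ++i) {
    retry_ids.push_back(lock_ids[i]);
  }
}

int main() {
  std::vector<int64_t> lock_ids = {101, 102, 103, 104};
  std::vector<int64_t> retry_ids;

  // whole-task retry: nothing ran on the peer, retry everything
  collect_retry_ids(lock_ids, 0, retry_ids);

  // partial retry: positions 0..success_pos succeeded, retry the rest
  int64_t success_pos = 1;
  retry_ids.clear();
  collect_retry_ids(lock_ids, success_pos + 1, retry_ids);  // {103, 104}
  return retry_ids.size() == 2 ? 0 : 1;
}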
@@ -1302,7 +1334,7 @@ int ObTableLockService::pre_check_lock_(ObTableLockCtx &ctx,
   if (GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_4_0_0_0) {
     ret = batch_pre_check_lock_(ctx, lock_mode, lock_owner, ls_lock_map);
   } else {
-    ret = pre_check_lock_old_version_(ctx, lock_mode, lock_owner);
+    ret = pre_check_lock_old_version_(ctx, lock_mode, lock_owner, ls_lock_map);
   }
   return ret;
 }
@@ -1335,27 +1367,76 @@ int ObTableLockService::parallel_batch_rpc_handle_(RpcProxy &proxy_batch,
     do {
       in_map = retry_map;
       retry_map = &maps[retry_times % MAP_NUM];
-      if (OB_FAIL(parallel_batch_rpc_handle_(proxy_batch,
-                                             ctx,
-                                             lock_task_type,
-                                             *in_map,
-                                             lock_mode,
-                                             lock_owner,
-                                             can_retry,
-                                             *retry_map))) {
-        LOG_WARN("process rpc failed", KR(ret), K(ctx), K(retry_times));
+      if (OB_FAIL(retry_map->reuse())) {
+        LOG_WARN("reuse retry map failed", K(ret));
+      } else if (OB_FAIL(parallel_batch_rpc_handle_(proxy_batch,
+                                                    ctx,
+                                                    lock_task_type,
+                                                    *in_map,
+                                                    lock_mode,
+                                                    lock_owner,
+                                                    can_retry,
+                                                    *retry_map))) {
+        LOG_WARN("process rpc failed", KR(ret), K(can_retry), K(ctx), K(retry_times));
       }
       if (can_retry && !retry_map->empty()) {
         retry_times++;
       }
-      if (retry_times % 10) {
-        LOG_WARN("retry too many times", K(retry_times), K(ctx));
+      if (retry_times % 10 == 0) {
+        LOG_WARN("retry too many times", K(retry_times), K(can_retry), K(ctx));
+        FOREACH(data, ls_lock_map) {
+          const share::ObLSID &ls_id = data->first;
+          const ObLockIDArray &lock_ids = data->second;
+          LOG_WARN("retry data", K(ls_id), K(lock_ids));
+        }
       }
     } while (can_retry && !retry_map->empty());
   }
   return ret;
 }
 
+template<class RpcProxy>
+int ObTableLockService::parallel_send_rpc_task_(RpcProxy &proxy_batch,
+                                                ObTableLockCtx &ctx,
+                                                const ObTableLockTaskType lock_task_type,
+                                                const ObLSLockMap &ls_lock_map,
+                                                const ObTableLockMode lock_mode,
+                                                const ObTableLockOwnerID lock_owner,
+                                                ObRetryCtx &retry_ctx)
+{
+  int ret = OB_SUCCESS;
+  int tmp_ret = OB_SUCCESS;
+  bool has_retry = false;
+
+  retry_ctx.send_rpc_count_ = 0;
+  // send async rpc parallel
+  FOREACH_X(data, ls_lock_map, OB_SUCC(ret)) {
+    const share::ObLSID &ls_id = data->first;
+    const ObLockIDArray &lock_ids = data->second;
+    if (!has_retry) {
+      if (OB_FAIL(send_rpc_task_(proxy_batch,
+                                 ctx,
+                                 ls_id,
+                                 lock_ids,
+                                 lock_mode,
+                                 lock_owner,
+                                 retry_ctx))) {
+        LOG_WARN("send rpc task failed", K(ret));
+      }
+    }
+    if (retry_ctx.need_retry_ || has_retry) {
+      has_retry = true;
+      if (OB_TMP_FAIL(get_retry_lock_ids_(lock_ids,
+                                          0,
+                                          retry_ctx.retry_lock_ids_))) {
+        LOG_WARN("get retry tablet failed", KR(ret));
+        ret = tmp_ret;
+      };
+    }
+  }
+  return ret;
+}
+
 template<class RpcProxy>
 int ObTableLockService::parallel_batch_rpc_handle_(RpcProxy &proxy_batch,
                                                    ObTableLockCtx &ctx,
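Editor's note (not part of the commit): the driver above alternates between two lock maps — the map filled during round N becomes the input of round N+1, and the other buffer is cleared (retry_map->reuse()) and refilled; retry_times % MAP_NUM selects the roles. The sketch below reproduces that ping-pong loop with simplified stand-in types.

// Minimal sketch of the two-buffer ("ping-pong") retry loop.
#include <vector>
#include <cstdint>
#include <cstdio>

int main() {
  std::vector<int64_t> maps[2];
  maps[0] = {1, 2, 3};  // initial work: lock ids for round 0
  int64_t retry_times = 0;
  std::vector<int64_t> *in = nullptr;
  std::vector<int64_t> *retry = &maps[0];

  do {
    in = retry;
    retry = &maps[(retry_times + 1) % 2];
    retry->clear();  // mirrors retry_map->reuse() in the patch
    // "process" the input; pretend the last element must be retried once
    if (retry_times == 0 && !in->empty()) {
      retry->push_back(in->back());
    }
    if (!retry->empty()) {
      retry_times++;
    }
    if (retry_times > 0 && retry_times % 10 == 0) {
      std::printf("retry too many times: %lld\n", (long long)retry_times);
    }
  } while (!retry->empty());
  return retry_times == 1 ? 0 : 1;
}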
@@ -1368,91 +1449,34 @@ int ObTableLockService::parallel_batch_rpc_handle_(RpcProxy &proxy_batch,
 {
   int ret = OB_SUCCESS;
-  int tmp_ret = OB_SUCCESS;
-  int64_t rpc_count = 0;
-  int64_t timeout_us = 0;
-  ObArray<share::ObLSID> ls_array;
-  common::ObTabletIDArray retry_tablets;
-  bool tmp_can_retry = false; // whether a single rpc can retry.
-  bool has_retry = false;
+  ObRetryCtx retry_ctx;
   proxy_batch.reuse();
 
-  // send async rpc parallel
-  FOREACH_X(data, ls_lock_map, OB_SUCC(ret) || can_retry) {
-    tmp_can_retry = false;
-    share::ObLSID ls_id = data->first;
-    const ObLockIDArray &lock_ids = data->second;
-    ObLockTaskBatchRequest request;
-    ObAddr addr;
-    if (!has_retry) {
-      if (OB_FAIL(ls_array.push_back(ls_id))) {
-        LOG_WARN("push_back lsid failed", K(ret), K(ls_id));
-      } else if (OB_FAIL(pack_batch_request_(ctx,
-                                             lock_task_type,
-                                             lock_mode,
-                                             lock_owner,
-                                             ls_id,
-                                             lock_ids,
-                                             request))) {
-        LOG_WARN("pack_request_ failed", K(ret), K(ls_id));
-      } else if (OB_FAIL(get_ls_leader_(ctx.tx_desc_->get_cluster_id(),
-                                        ctx.tx_desc_->get_tenant_id(),
-                                        ls_id,
-                                        ctx.abs_timeout_ts_,
-                                        addr))) {
-        if (need_renew_location_(ret)) {
-          tmp_can_retry = true;
-          ret = OB_SUCCESS;
-        }
-        LOG_WARN("failed to get ls leader", K(ret), K(ctx), K(ls_id));
-      } else if (FALSE_IT(timeout_us = ctx.get_rpc_timeoutus())) {
-      } else if (ctx.is_timeout()) {
-        ret = OB_TIMEOUT;
-        LOG_WARN("process obj lock timeout", K(ret), K(ctx));
-      } else if (OB_FAIL(proxy_batch.call(addr,
-                                          timeout_us,
-                                          ctx.tx_desc_->get_tenant_id(),
-                                          request))) {
-        LOG_WARN("failed to all async rpc", KR(ret), K(addr),
-                 K(ctx.abs_timeout_ts_), K(request));
-      } else {
-        rpc_count++;
-        ALLOW_NEXT_LOG();
-        LOG_INFO("send table lock rpc", KR(ret), K(addr), "request", request);
-      }
-    }
-    if (tmp_can_retry || has_retry) {
-      has_retry = true;
-      if (OB_TMP_FAIL(get_retry_tablet_list_(lock_ids,
-                                             retry_tablets))) {
-        LOG_WARN("get retry tablet failed", KR(ret));
-        ret = tmp_ret;
-        can_retry = false;
-      };
-    }
-  }
+  if (OB_FAIL(parallel_send_rpc_task_(proxy_batch,
+                                      ctx,
+                                      lock_task_type,
+                                      ls_lock_map,
+                                      lock_mode,
+                                      lock_owner,
+                                      retry_ctx))) {
+    can_retry = false;
+    (void)collect_rollback_info_(retry_ctx.rpc_ls_array_, proxy_batch, ctx);
+    LOG_WARN("send rpc task failed", KR(ret));
+  } else {
+    // process rpc response
+    ret = handle_parallel_rpc_response_(proxy_batch,
+                                        ctx,
+                                        ls_lock_map,
+                                        can_retry,
+                                        retry_ctx);
+  }
 
-  ret = handle_parallel_rpc_response_(ret,
-                                      rpc_count,
-                                      proxy_batch,
-                                      ctx,
-                                      ls_array,
-                                      tmp_can_retry);
-  can_retry = can_retry && tmp_can_retry;
-  // does not need retry if it is success.
-  if (OB_FAIL(ret) && can_retry) {
-    if (OB_TMP_FAIL(get_retry_tablet_list_(ls_lock_map,
-                                           proxy_batch,
-                                           ls_array,
-                                           retry_tablets))) {
-      LOG_WARN("get retry tablet failed", KR(tmp_ret));
-      ret = tmp_ret;
-      can_retry = false;
-    };
-  }
   // get the retry map
-  if (can_retry and retry_tablets.count() != 0) {
+  if (can_retry && retry_ctx.retry_lock_ids_.count() != 0) {
+    LOG_WARN("lock rpc failed, but we need retry", K(ret), K(can_retry), K(retry_ctx));
     if (OB_FAIL(fill_ls_lock_map_(ctx,
-                                  retry_tablets,
+                                  retry_ctx.retry_lock_ids_,
                                   retry_ls_lock_map,
                                   true /* force refresh location */))) {
       LOG_WARN("refill ls lock map failed", KP(ret), K(ctx));
@@ -1525,39 +1549,43 @@ int ObTableLockService::batch_pre_check_lock_(ObTableLockCtx &ctx,
 
 int ObTableLockService::pre_check_lock_old_version_(ObTableLockCtx &ctx,
                                                     const ObTableLockMode lock_mode,
-                                                    const ObTableLockOwnerID lock_owner)
+                                                    const ObTableLockOwnerID lock_owner,
+                                                    const ObLSLockMap &ls_lock_map)
 {
   int ret = OB_SUCCESS;
   int last_ret = OB_SUCCESS;
   int64_t USLEEP_TIME = 100; // 0.1 ms
   bool need_retry = false;
+  int64_t timeout_us = 0;
   bool unused = false;
+  share::ObLSID ls_id;
+  ObLockID lock_id;
+  ObRetryCtx retry_ctx;
+  ObAddr addr;
+  ObTableLockTaskRequest request;
+  ObTableLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::lock_table);
   // only used in LOCK_TABLE/LOCK_PARTITION
   if (LOCK_TABLE == ctx.task_type_ ||
       LOCK_PARTITION == ctx.task_type_) {
     do {
       need_retry = false;
-      ObArray<share::ObLSID> ls_array;
-      int64_t rpc_count = 0;
-      ObTableLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::lock_table);
-      int64_t timeout_us = 0;
+      timeout_us = 0;
       proxy_batch.reuse();
 
+      retry_ctx.reuse();
       // send async rpc parallel
       for (int64_t i = 0; i < ctx.get_tablet_cnt() && OB_SUCC(ret); ++i) {
-        share::ObLSID ls_id;
-        ObLockID lock_id;
-        ObTabletID &tablet_id = ctx.tablet_list_.at(i);
+        const ObTabletID &tablet_id = ctx.tablet_list_.at(i);
         if (OB_FAIL(get_tablet_ls_(ctx, tablet_id, ls_id))) {
           LOG_WARN("failed to get tablet ls", K(ret), K(tablet_id));
         } else if (OB_FAIL(get_lock_id(tablet_id, lock_id))) {
           LOG_WARN("get lock id failed", K(ret), K(ctx));
         } else {
-          ObAddr addr;
-          ObTableLockTaskRequest request;
+          addr.reset();
+          request.reset();
           // can not reused because of allocator reset
           ObTableLockTaskResult result;
 
-          if (OB_FAIL(ls_array.push_back(ls_id))) {
+          if (OB_FAIL(retry_ctx.rpc_ls_array_.push_back(ls_id))) {
             LOG_WARN("push_back lsid failed", K(ret), K(ls_id));
           } else if (OB_FAIL(pack_request_(ctx, PRE_CHECK_TABLET, lock_mode, lock_owner,
                                            lock_id, ls_id, addr, request))) {
@@ -1574,13 +1602,17 @@ int ObTableLockService::pre_check_lock_old_version_(ObTableLockCtx &ctx,
             LOG_WARN("failed to all async rpc", KR(ret), K(addr),
                      K(ctx.abs_timeout_ts_), K(request));
           } else {
-            rpc_count++;
+            retry_ctx.send_rpc_count_++;
             ALLOW_NEXT_LOG();
             LOG_INFO("send table pre_check rpc", KR(ret), K(addr), "request", request);
           }
         }
       }
-      ret = handle_parallel_rpc_response_(ret, rpc_count, proxy_batch, ctx, ls_array, unused);
+      if (OB_FAIL(ret)) {
+        (void)collect_rollback_info_(retry_ctx.rpc_ls_array_, proxy_batch, ctx);
+      } else {
+        ret = handle_parallel_rpc_response_(proxy_batch, ctx, ls_lock_map, unused, retry_ctx);
+      }
 
       if (is_timeout_ret_code_(ret)) {
         ret = (last_ret == OB_TRY_LOCK_ROW_CONFLICT) ? OB_ERR_EXCLUSIVE_LOCK_CONFLICT : OB_TIMEOUT;
@@ -1667,11 +1699,12 @@ int ObTableLockService::pack_batch_request_(ObTableLockCtx &ctx,
                                             ObLockTaskBatchRequest &request)
 {
   int ret = OB_SUCCESS;
+  ObLockParam lock_param;
   if (OB_FAIL(request.init(task_type, ls_id, ctx.tx_desc_))) {
     LOG_WARN("request init failed", K(ret), K(task_type), K(ls_id), KP(ctx.tx_desc_));
   } else {
     for (int i = 0; i < lock_ids.count() && OB_SUCC(ret); ++i) {
-      ObLockParam lock_param;
+      lock_param.reset();
       if (OB_FAIL(lock_param.set(lock_ids[i],
                                  lock_mode,
                                  lock_owner,
@@ -1751,19 +1784,21 @@ int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch,
     do {
       in_map = retry_map;
       retry_map = &maps[retry_times % MAP_NUM];
-      if (OB_FAIL(batch_rpc_handle_(proxy_batch,
-                                    ctx,
-                                    *in_map,
-                                    lock_mode,
-                                    lock_owner,
-                                    can_retry,
-                                    *retry_map))) {
+      if (OB_FAIL(retry_map->reuse())) {
+        LOG_WARN("reuse retry map failed", K(ret));
+      } else if (OB_FAIL(batch_rpc_handle_(proxy_batch,
+                                           ctx,
+                                           *in_map,
+                                           lock_mode,
+                                           lock_owner,
+                                           can_retry,
+                                           *retry_map))) {
         LOG_WARN("process rpc failed", KR(ret), K(ctx), K(retry_times));
       }
       if (can_retry && !retry_map->empty()) {
         retry_times++;
       }
-      if (retry_times % 10) {
+      if (retry_times % 10 == 0) {
         LOG_WARN("retry too many times", K(retry_times), K(ctx), K(retry_map->size()));
       }
     } while (can_retry && !retry_map->empty());
@@ -1771,6 +1806,91 @@ int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch,
   return ret;
 }
 
+template<class RpcProxy>
+int ObTableLockService::send_rpc_task_(RpcProxy &proxy_batch,
+                                       ObTableLockCtx &ctx,
+                                       const share::ObLSID &ls_id,
+                                       const ObLockIDArray &lock_ids,
+                                       const ObTableLockMode lock_mode,
+                                       const ObTableLockOwnerID lock_owner,
+                                       ObRetryCtx &retry_ctx)
+{
+  int ret = OB_SUCCESS;
+  ObLockTaskBatchRequest request;
+  ObAddr addr;
+
+  retry_ctx.need_retry_ = false;
+  if (OB_FAIL(retry_ctx.rpc_ls_array_.push_back(ls_id))) {
+    LOG_WARN("push_back lsid failed", K(ret), K(ls_id));
+  } else if (OB_FAIL(pack_batch_request_(ctx,
+                                         ctx.task_type_,
+                                         lock_mode,
+                                         lock_owner,
+                                         ls_id,
+                                         lock_ids,
+                                         request))) {
+    LOG_WARN("pack_request_ failed", K(ret), K(ls_id));
+  } else if (OB_FAIL(get_ls_leader_(ctx.tx_desc_->get_cluster_id(),
+                                    ctx.tx_desc_->get_tenant_id(),
+                                    ls_id,
+                                    ctx.abs_timeout_ts_,
+                                    addr))) {
+    if (need_renew_location_(ret)) {
+      retry_ctx.need_retry_ = true;
+      ret = OB_SUCCESS;
+    }
+    LOG_WARN("failed to get ls leader", K(ret), K(ctx), K(ls_id));
+  } else if (ctx.is_timeout()) {
+    ret = OB_TIMEOUT;
+    LOG_WARN("process obj lock timeout", K(ret), K(ctx));
+  } else if (OB_FAIL(proxy_batch.call(addr,
+                                      ctx.get_rpc_timeoutus(),
+                                      ctx.tx_desc_->get_tenant_id(),
+                                      request))) {
+    LOG_WARN("failed to call async rpc", KR(ret), K(addr),
+             K(ctx.abs_timeout_ts_), K(request));
+  } else {
+    retry_ctx.send_rpc_count_++;
+    ALLOW_NEXT_LOG();
+    LOG_INFO("send table lock rpc", KR(ret), K(retry_ctx.send_rpc_count_), K(addr), "request", request);
+  }
+  if (OB_FAIL(ret)) {
+    retry_ctx.need_retry_ = false;
+  }
+  return ret;
+}
+
+template<class RpcProxy>
+int ObTableLockService::send_one_rpc_task_(RpcProxy &proxy_batch,
+                                           ObTableLockCtx &ctx,
+                                           const share::ObLSID &ls_id,
+                                           const ObLockIDArray &lock_ids,
+                                           const ObTableLockMode lock_mode,
+                                           const ObTableLockOwnerID lock_owner,
+                                           ObRetryCtx &retry_ctx)
+{
+  int ret = OB_SUCCESS;
+
+  retry_ctx.send_rpc_count_ = 0;
+  if (OB_FAIL(send_rpc_task_(proxy_batch,
+                             ctx,
+                             ls_id,
+                             lock_ids,
+                             lock_mode,
+                             lock_owner,
+                             retry_ctx))) {
+    LOG_WARN("send rpc task failed", K(ret), K(ls_id));
+  } else if (retry_ctx.need_retry_) {
+    if (OB_FAIL(get_retry_lock_ids_(lock_ids,
+                                    0,
+                                    retry_ctx.retry_lock_ids_))) {
+      retry_ctx.need_retry_ = false;
+      LOG_WARN("get retry tablet failed", KR(ret));
+    };
+  }
+  return ret;
+}
+
 template<class RpcProxy>
 int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch,
                                           ObTableLockCtx &ctx,
@@ -1781,81 +1901,45 @@ int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch,
                                           ObLSLockMap &retry_ls_lock_map)
 {
   int ret = OB_SUCCESS;
-  int tmp_ret = OB_SUCCESS;
-  int64_t timeout_us = 0;
-  common::ObTabletIDArray retry_tablets;
-  bool tmp_can_retry = false; // whether a single rpc can retry.
+  ObLockIDArray retry_lock_ids;
+  ObRetryCtx retry_ctx;
   FOREACH_X(data, ls_lock_map, OB_SUCC(ret) || can_retry) {
-    tmp_can_retry = false;
     proxy_batch.reuse();
-    ObArray<share::ObLSID> ls_array;
-    int64_t rpc_count = 0;
-    share::ObLSID ls_id = data->first;
+    retry_ctx.reuse();
+    const share::ObLSID &ls_id = data->first;
     const ObLockIDArray &lock_ids = data->second;
-    ObLockTaskBatchRequest request;
-    ObAddr addr;
 
-    if (OB_FAIL(ls_array.push_back(ls_id))) {
-      LOG_WARN("push_back lsid failed", K(ret), K(ls_id));
-    } else if (OB_FAIL(pack_batch_request_(ctx, ctx.task_type_, lock_mode, lock_owner,
-                                           ls_id, lock_ids, request))) {
-      LOG_WARN("pack_request_ failed", K(ret), K(ls_id));
-    } else if (OB_FAIL(get_ls_leader_(ctx.tx_desc_->get_cluster_id(),
-                                      ctx.tx_desc_->get_tenant_id(),
-                                      ls_id,
-                                      ctx.abs_timeout_ts_,
-                                      addr))) {
-      if (need_renew_location_(ret)) {
-        tmp_can_retry = true;
-        ret = OB_SUCCESS;
-      }
-      LOG_WARN("failed to get ls leader", K(ret), K(ctx), K(ls_id));
-    } else if (FALSE_IT(timeout_us = ctx.get_rpc_timeoutus())) {
-    } else if (ctx.is_timeout()) {
-      ret = OB_TIMEOUT;
-      LOG_WARN("process obj lock timeout", K(ret), K(ctx));
-    } else if (OB_FAIL(proxy_batch.call(addr,
-                                        timeout_us,
-                                        ctx.tx_desc_->get_tenant_id(),
-                                        request))) {
-      LOG_WARN("failed to call async rpc", KR(ret), K(addr),
-               K(ctx.abs_timeout_ts_), K(request));
+    if (OB_FAIL(send_one_rpc_task_(proxy_batch,
+                                   ctx,
+                                   ls_id,
+                                   lock_ids,
+                                   lock_mode,
+                                   lock_owner,
+                                   retry_ctx))) {
+      can_retry = false;
+      (void)collect_rollback_info_(retry_ctx.rpc_ls_array_, proxy_batch, ctx);
+      LOG_WARN("send rpc task failed", KR(ret));
     } else {
-      rpc_count++;
-      ALLOW_NEXT_LOG();
-      LOG_INFO("send table lock rpc", KR(ret), K(rpc_count), K(addr), "request", request);
-    }
-    if (tmp_can_retry) {
-      if (OB_FAIL(get_retry_tablet_list_(lock_ids,
-                                         retry_tablets))) {
-        LOG_WARN("get retry tablet failed", KR(ret));
-        can_retry = false;
-      };
-    } else {
-      ret = handle_parallel_rpc_response_(ret,
-                                          rpc_count,
-                                          proxy_batch,
+      ret = handle_parallel_rpc_response_(proxy_batch,
                                           ctx,
-                                          ls_array,
-                                          tmp_can_retry);
+                                          ls_lock_map,
+                                          can_retry,
+                                          retry_ctx);
     }
-    can_retry = can_retry && tmp_can_retry;
-    // does not need retry if it is success.
-    if (OB_FAIL(ret) && can_retry) {
-      if (OB_TMP_FAIL(get_retry_tablet_list_(ls_lock_map,
-                                             proxy_batch,
-                                             ls_array,
-                                             retry_tablets))) {
-        LOG_WARN("get retry tablet failed", KR(tmp_ret));
-        ret = tmp_ret;
+    // collect one rpc's retry tablets.
+    if (can_retry) {
+      if (OB_FAIL(get_retry_lock_ids_(retry_ctx.retry_lock_ids_,
+                                      0,
+                                      retry_lock_ids))) {
         can_retry = false;
+        LOG_WARN("get retry tablet list failed", K(ret));
       };
     }
   }
   // get the retry map
-  if (can_retry and retry_tablets.count() != 0) {
+  if (can_retry && retry_lock_ids.count() != 0) {
     if (OB_FAIL(fill_ls_lock_map_(ctx,
-                                  retry_tablets,
+                                  retry_lock_ids,
                                   retry_ls_lock_map,
                                   true /* force refresh location */))) {
       LOG_WARN("refill ls lock map failed", KP(ret), K(ctx));
@@ -1869,22 +1953,25 @@ template<class RpcProxy>
 int ObTableLockService::parallel_rpc_handle_(RpcProxy &proxy_batch,
                                              ObTableLockCtx &ctx,
                                              const LockMap &lock_map,
+                                             const ObLSLockMap &ls_lock_map,
                                              const ObTableLockMode lock_mode,
                                              const ObTableLockOwnerID lock_owner)
 {
   int ret = OB_SUCCESS;
   int64_t timeout_us = 0;
+  bool unused = false;
+  ObRetryCtx retry_ctx;
+  ObAddr addr;
+  ObTableLockTaskRequest request;
   FOREACH_X(lock, lock_map, OB_SUCC(ret)) {
     proxy_batch.reuse();
-    ObArray<share::ObLSID> ls_array;
-    int64_t rpc_count = 0;
-    ObLockID lock_id = lock->first;
-    share::ObLSID ls_id = lock->second;
-    ObAddr addr;
-    ObTableLockTaskRequest request;
+    retry_ctx.reuse();
+    addr.reset();
+    request.reset();
+    const ObLockID &lock_id = lock->first;
+    const share::ObLSID &ls_id = lock->second;
 
-    if (OB_FAIL(ls_array.push_back(ls_id))) {
+    if (OB_FAIL(retry_ctx.rpc_ls_array_.push_back(ls_id))) {
       LOG_WARN("push_back lsid failed", K(ret), K(ls_id));
     } else if (OB_FAIL(pack_request_(ctx, ctx.task_type_, lock_mode, lock_owner,
                                      lock_id, ls_id, addr, request))) {
@@ -1900,11 +1987,16 @@ int ObTableLockService::parallel_rpc_handle_(RpcProxy &proxy_batch,
       LOG_WARN("failed to all async rpc", KR(ret), K(addr),
                K(ctx.abs_timeout_ts_), K(request));
     } else {
-      rpc_count++;
+      retry_ctx.send_rpc_count_++;
       ALLOW_NEXT_LOG();
-      LOG_INFO("send table lock rpc", KR(ret), K(rpc_count), K(addr), "request", request);
+      LOG_INFO("send table lock rpc", KR(ret), K(retry_ctx.send_rpc_count_),
+               K(addr), "request", request);
     }
+    if (OB_FAIL(ret)) {
+      (void)collect_rollback_info_(retry_ctx.rpc_ls_array_, proxy_batch, ctx);
+    } else {
+      ret = handle_parallel_rpc_response_(proxy_batch, ctx, ls_lock_map, unused, retry_ctx);
+    }
-    ret = handle_parallel_rpc_response_(ret, rpc_count, proxy_batch, ctx, ls_array, unused);
   }
   return ret;
 }
@@ -1928,6 +2020,7 @@ int ObTableLockService::inner_process_obj_lock_batch_(ObTableLockCtx &ctx,
 
 int ObTableLockService::inner_process_obj_lock_old_version_(ObTableLockCtx &ctx,
                                                             const LockMap &lock_map,
+                                                            const ObLSLockMap &ls_lock_map,
                                                             const ObTableLockMode lock_mode,
                                                             const ObTableLockOwnerID lock_owner)
 {
@@ -1935,10 +2028,10 @@ int ObTableLockService::inner_process_obj_lock_old_version_(ObTableLockCtx &ctx,
   // TODO: yanyuan.cxf we process the rpc one by one and do parallel later.
   if (ctx.is_unlock_task()) {
     ObHighPriorityTableLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::unlock_table);
-    ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, lock_mode, lock_owner);
+    ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, ls_lock_map, lock_mode, lock_owner);
   } else {
     ObTableLockProxy proxy_batch(*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::lock_table);
-    ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, lock_mode, lock_owner);
+    ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, ls_lock_map, lock_mode, lock_owner);
   }
 
   return ret;
@@ -1954,7 +2047,7 @@ int ObTableLockService::inner_process_obj_lock_(ObTableLockCtx &ctx,
   if (GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_4_0_0_0) {
     ret = inner_process_obj_lock_batch_(ctx, ls_lock_map, lock_mode, lock_owner);
   } else {
-    ret = inner_process_obj_lock_old_version_(ctx, lock_map, lock_mode, lock_owner);
+    ret = inner_process_obj_lock_old_version_(ctx, lock_map, ls_lock_map, lock_mode, lock_owner);
   }
 
   return ret;
@@ -2126,6 +2219,26 @@ int ObTableLockService::check_op_allowed_(const uint64_t table_id,
   return ret;
 }
 
+int ObTableLockService::get_lock_id_ls_(
+    const ObTableLockCtx &ctx,
+    const ObLockID &lock_id,
+    ObLSID &ls_id,
+    bool force_refresh)
+{
+  int ret = OB_SUCCESS;
+  if (lock_id.is_tablet_lock()) {
+    ObTabletID tablet_id;
+    if (OB_FAIL(lock_id.convert_to(tablet_id))) {
+      LOG_WARN("convert tablet id failed", K(ret), K(lock_id));
+    } else if (OB_FAIL(get_tablet_ls_(ctx, tablet_id, ls_id, force_refresh))) {
+      LOG_WARN("get tablet ls failed", K(ret), K(tablet_id), K(force_refresh));
+    }
+  } else {
+    ls_id = LOCK_SERVICE_LS;
+  }
+  return ret;
+}
+
 int ObTableLockService::get_tablet_ls_(
     const ObTableLockCtx &ctx,
     const ObTabletID &tablet_id,
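Editor's note (not part of the commit): get_lock_id_ls_ centralizes the routing rule that the retry path relies on — a tablet lock is routed to whichever log stream currently hosts the tablet (optionally force-refreshing the location cache), while every other lock kind lives on the dedicated lock-service LS. A minimal sketch of that rule, with stand-in types:

// Minimal sketch of lock-id -> log-stream routing.
#include <cstdint>

enum class LockKind { TABLET, TABLE, OTHER };
static const int64_t LOCK_SERVICE_LS = 1;  // stand-in id

// resolve which log stream should execute a lock request
static int64_t route_lock(LockKind kind, int64_t tablet_ls) {
  if (kind == LockKind::TABLET) {
    return tablet_ls;        // found via tablet -> LS location lookup
  }
  return LOCK_SERVICE_LS;    // table/object locks: fixed LS
}

int main() {
  return (route_lock(LockKind::TABLET, 1001) == 1001 &&
          route_lock(LockKind::TABLE, 0) == LOCK_SERVICE_LS) ? 0 : 1;
}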
@@ -2218,24 +2331,24 @@ int ObTableLockService::get_process_tablets_(const ObTableLockMode lock_mode,
 }
 
 int ObTableLockService::fill_ls_lock_map_(ObTableLockCtx &ctx,
-                                          const common::ObTabletIDArray &tablets,
+                                          const ObLockIDArray &lock_ids,
                                           ObLSLockMap &ls_lock_map,
                                           bool force_refresh_location)
 {
   int ret = OB_SUCCESS;
+  share::ObLSID ls_id;
+  ObLockIDArray lock_array;
+  ObLockIDArray *p = nullptr;
   if (OB_FAIL(ls_lock_map.reuse())) {
     LOG_WARN("fail to reuse ls_lock_map", KR(ret));
   } else {
-    for (int64_t i = 0; i < tablets.count() && OB_SUCC(ret); ++i) {
-      share::ObLSID ls_id;
-      ObLockIDArray lock_array;
-      ObLockIDArray *p = nullptr;
-      const ObTabletID &tablet_id = tablets.at(i);
-      ObLockID lock_id;
-      if (OB_FAIL(get_tablet_ls_(ctx, tablet_id, ls_id, force_refresh_location))) {
-        LOG_WARN("failed to get tablet ls", K(ret), K(tablet_id));
-      } else if (OB_FAIL(get_lock_id(tablet_id, lock_id))) {
-        LOG_WARN("get lock id failed", K(ret), K(ctx));
+    for (int64_t i = 0; i < lock_ids.count() && OB_SUCC(ret); ++i) {
+      ls_id.reset();
+      lock_array.reuse();
+      p = nullptr;
+      const ObLockID &lock_id = lock_ids.at(i);
+      if (OB_FAIL(get_lock_id_ls_(ctx, lock_id, ls_id, force_refresh_location))) {
+        LOG_WARN("failed to get lock ls", K(ret), K(lock_id));
       } else if (OB_FAIL(ls_lock_map.set_refactored(ls_id, lock_array)) &&
                  OB_ENTRY_EXIST != ret && OB_HASH_EXIST != ret) {
         LOG_WARN("fail to set ls_lock_map", K(ret), K(ls_id));
@@ -2245,7 +2358,7 @@ int ObTableLockService::fill_ls_lock_map_(ObTableLockCtx &ctx,
       } else if (OB_FAIL(p->push_back(lock_id))) {
         LOG_WARN("push_back lock_id failed", K(ret), K(ls_id), K(lock_id));
       }
-      LOG_DEBUG("tablet add to lock map", K(lock_id), K(tablet_id), K(i));
+      LOG_DEBUG("lock add to lock map", K(lock_id), K(i));
     }
   }
 
@@ -2258,17 +2371,21 @@ int ObTableLockService::fill_ls_lock_map_(ObTableLockCtx &ctx,
                                           ObLSLockMap &ls_lock_map)
 {
   int ret = OB_SUCCESS;
+  share::ObLSID ls_id;
+  ObLockIDArray lock_array;
+  ObLockIDArray *p = nullptr;
+  ObLockID lock_id;
   if (OB_FAIL(lock_map.reuse())) {
     LOG_WARN("fail to reuse lock_map", KR(ret));
   } else if (OB_FAIL(ls_lock_map.reuse())) {
     LOG_WARN("fail to reuse ls_lock_map", KR(ret));
   } else {
     for (int64_t i = 0; i < tablets.count() && OB_SUCC(ret); ++i) {
-      share::ObLSID ls_id;
-      ObLockIDArray lock_array;
-      ObLockIDArray *p = nullptr;
+      ls_id.reset();
+      lock_array.reuse();
+      p = nullptr;
+      lock_id.reset();
       const ObTabletID &tablet_id = tablets.at(i);
-      ObLockID lock_id;
       if (OB_FAIL(get_tablet_ls_(ctx, tablet_id, ls_id))) {
         LOG_WARN("failed to get tablet ls", K(ret), K(tablet_id));
       } else if (OB_FAIL(get_lock_id(tablet_id,
@@ -2368,8 +2485,13 @@ bool ObTableLockService::need_retry_single_task_(const ObTableLockCtx &ctx,
   return need_retry;
 }
 
-bool ObTableLockService::need_retry_rpc_task_(const int64_t ret,
-                                              const ObTableLockTaskResult *result) const
+bool ObTableLockService::need_retry_whole_rpc_task_(const int ret)
+{
+  return (OB_TENANT_NOT_IN_SERVER == ret);
+}
+
+bool ObTableLockService::need_retry_part_rpc_task_(const int ret,
+                                                   const ObTableLockTaskResult *result) const
 {
   bool need_retry = false;
   need_retry = (OB_LS_NOT_EXIST == ret ||
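Editor's note (not part of the commit): this hunk is the heart of the fix named in the title. OB_TENANT_NOT_IN_SERVER — the tenant has no unit on the target server, e.g. right after a replica migration — means nothing ran on the peer, so the whole RPC task is retried; errors such as OB_LS_NOT_EXIST mean the peer may have applied a prefix of the batch, so only the unfinished part is retried. A minimal sketch of the split (error-code values here are illustrative stand-ins, not the real OceanBase constants):

// Minimal sketch of the whole-task vs. part-task error split.
static const int OB_TENANT_NOT_IN_SERVER = -1;  // stand-in value
static const int OB_LS_NOT_EXIST = -2;          // stand-in value

static bool need_retry_whole_rpc_task(int ret) {
  // nothing ran on the peer: resend every lock id of this task
  return ret == OB_TENANT_NOT_IN_SERVER;
}

static bool need_retry_part_rpc_task(int ret) {
  // the peer may have applied a prefix: resend only what remains
  return ret == OB_LS_NOT_EXIST;
}

int main() {
  return (need_retry_whole_rpc_task(OB_TENANT_NOT_IN_SERVER) &&
          need_retry_part_rpc_task(OB_LS_NOT_EXIST)) ? 0 : 1;
}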
@@ -160,6 +160,25 @@ private:
                  K(tablet_list_), K(schema_version_), K(tx_is_killed_),
                  K(is_from_sql_), K(stmt_savepoint_));
   };
+  class ObRetryCtx
+  {
+  public:
+    ObRetryCtx() : need_retry_(false),
+                   send_rpc_count_(0),
+                   rpc_ls_array_(),
+                   retry_lock_ids_()
+    {}
+    ~ObRetryCtx()
+    { reuse(); }
+    void reuse();
+  public:
+    TO_STRING_KV(K_(need_retry), K_(send_rpc_count), K_(rpc_ls_array),
+                 K_(retry_lock_ids));
+    bool need_retry_;
+    int64_t send_rpc_count_; // how many rpc we have send.
+    ObArray<share::ObLSID> rpc_ls_array_;
+    ObLockIDArray retry_lock_ids_; // the lock id need to be retry.
+  };
 public:
   class ObOBJLockGarbageCollector
   {
@@ -313,8 +332,9 @@ private:
                                 const int64_t ret) const;
   bool need_retry_single_task_(const ObTableLockCtx &ctx,
                                const int64_t ret) const;
-  bool need_retry_rpc_task_(const int64_t ret,
-                            const ObTableLockTaskResult *result) const;
+  bool need_retry_whole_rpc_task_(const int ret);
+  bool need_retry_part_rpc_task_(const int ret,
+                                 const ObTableLockTaskResult *result) const;
   bool need_renew_location_(const int64_t ret) const;
   int rewrite_return_code_(const int ret) const;
   int is_timeout_ret_code_(const int ret) const;
@@ -348,7 +368,7 @@ private:
                         LockMap &lock_map,
                         ObLSLockMap &ls_lock_map);
   int fill_ls_lock_map_(ObTableLockCtx &ctx,
-                        const common::ObTabletIDArray &tablets,
+                        const ObLockIDArray &lock_ids,
                         ObLSLockMap &ls_lock_map,
                         bool force_refresh_location);
   int fill_ls_lock_map_(ObTableLockCtx &ctx,
@@ -359,6 +379,10 @@ private:
                      const ObTabletID &tablet_id,
                      share::ObLSID &ls_id,
                      bool force_refresh = false);
+  int get_lock_id_ls_(const ObTableLockCtx &ctx,
+                      const ObLockID &lock_id,
+                      share::ObLSID &ls_id,
+                      bool force_refresh = false);
   int get_ls_leader_(const int64_t cluster_id,
                      const uint64_t tenant_id,
                      const share::ObLSID &ls_id,
@@ -383,6 +407,7 @@ private:
   int parallel_rpc_handle_(RpcProxy &proxy_batch,
                            ObTableLockCtx &ctx,
                            const LockMap &lock_map,
+                           const ObLSLockMap &ls_lock_map,
                            const ObTableLockMode lock_mode,
                            const ObTableLockOwnerID lock_owner);
   template<class RpcProxy>
@@ -400,12 +425,11 @@ private:
                          bool &can_retry,
                          ObLSLockMap &retry_ls_lock_map);
   template<class RpcProxy>
-  int handle_parallel_rpc_response_(int rpc_call_ret,
-                                    int64_t rpc_count,
-                                    RpcProxy &proxy_batch,
+  int handle_parallel_rpc_response_(RpcProxy &proxy_batch,
                                     ObTableLockCtx &ctx,
-                                    ObArray<share::ObLSID> &ls_array,
-                                    bool &can_retry);
+                                    const ObLSLockMap &ls_lock_map,
+                                    bool &can_retry,
+                                    ObRetryCtx &retry_ctx);
   template<class RpcProxy>
   int parallel_batch_rpc_handle_(RpcProxy &proxy_batch,
                                  ObTableLockCtx &ctx,
@@ -422,13 +446,45 @@ private:
                                  const ObTableLockOwnerID lock_owner,
                                  bool &can_retry,
                                  ObLSLockMap &retry_ls_lock_map);
-  int get_retry_tablet_list_(const ObLockIDArray &lock_ids,
-                             common::ObTabletIDArray &retry_tablets);
   template<class RpcProxy>
-  int get_retry_tablet_list_(const ObLSLockMap &ls_lock_map,
-                             const RpcProxy &proxy_batch,
-                             const ObArray<share::ObLSID> &ls_array,
-                             common::ObTabletIDArray &retry_tablets);
+  int parallel_send_rpc_task_(RpcProxy &proxy_batch,
+                              ObTableLockCtx &ctx,
+                              const ObTableLockTaskType lock_task_type,
+                              const ObLSLockMap &ls_lock_map,
+                              const ObTableLockMode lock_mode,
+                              const ObTableLockOwnerID lock_owner,
+                              ObRetryCtx &retry_ctx);
+  template<class RpcProxy>
+  int send_one_rpc_task_(RpcProxy &proxy_batch,
+                         ObTableLockCtx &ctx,
+                         const share::ObLSID &ls_id,
+                         const ObLockIDArray &lock_ids,
+                         const ObTableLockMode lock_mode,
+                         const ObTableLockOwnerID lock_owner,
+                         ObRetryCtx &retry_ctx);
+  template<class RpcProxy>
+  int send_rpc_task_(RpcProxy &proxy_batch,
+                     ObTableLockCtx &ctx,
+                     const share::ObLSID &ls_id,
+                     const ObLockIDArray &lock_ids,
+                     const ObTableLockMode lock_mode,
+                     const ObTableLockOwnerID lock_owner,
+                     ObRetryCtx &retry_ctx);
+  int get_retry_lock_ids_(const ObLockIDArray &lock_ids,
+                          const int64_t start_pos,
+                          ObLockIDArray &retry_lock_ids);
+  int get_retry_lock_ids_(const share::ObLSID &ls_id,
+                          const ObLSLockMap &ls_lock_map,
+                          const int64_t start_pos,
+                          ObLockIDArray &retry_lock_ids);
+  int collect_rollback_info_(const share::ObLSID &ls_id,
+                             ObTableLockCtx &ctx);
+  int collect_rollback_info_(const ObArray<share::ObLSID> &ls_array,
+                             ObTableLockCtx &ctx);
+  template<class RpcProxy>
+  int collect_rollback_info_(const ObArray<share::ObLSID> &ls_array,
+                             RpcProxy &proxy_batch,
+                             ObTableLockCtx &ctx);
   int inner_process_obj_lock_(ObTableLockCtx &ctx,
                               const LockMap &lock_map,
                               const ObLSLockMap &ls_lock_map,
@@ -436,6 +492,7 @@ private:
                               const ObTableLockOwnerID lock_owner);
   int inner_process_obj_lock_old_version_(ObTableLockCtx &ctx,
                                           const LockMap &lock_map,
+                                          const ObLSLockMap &ls_lock_map,
                                           const ObTableLockMode lock_mode,
                                           const ObTableLockOwnerID lock_owner);
   int inner_process_obj_lock_batch_(ObTableLockCtx &ctx,
@@ -470,7 +527,8 @@ private:
                             const ObLSLockMap &ls_lock_map);
   int pre_check_lock_old_version_(ObTableLockCtx &ctx,
                                   const ObTableLockMode lock_mode,
-                                  const ObTableLockOwnerID lock_owner);
+                                  const ObTableLockOwnerID lock_owner,
+                                  const ObLSLockMap &ls_lock_map);
   // used by deadlock detector.
   int deal_with_deadlock_(ObTableLockCtx &ctx);
   int get_table_partition_level_(const ObTableID table_id, ObPartitionLevel &part_level);