From f67db3c34f6f8e288e5fe104807dc8c3a4ae7eda Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 9 Feb 2024 02:07:09 +0000 Subject: [PATCH] BUGFIX: ddl lock table using lock thread group --- .../test_ob_table_lock_service.cpp | 15 +- src/observer/ob_inner_sql_connection.cpp | 14 +- src/observer/ob_inner_sql_connection.h | 14 +- src/share/ob_cluster_version.h | 1 + src/share/transfer/ob_transfer_info.cpp | 5 +- src/share/transfer/ob_transfer_info.h | 11 +- src/storage/ddl/ob_ddl_lock.cpp | 51 ++++-- .../ob_lock_inner_connection_util.cpp | 13 +- .../tablelock/ob_lock_inner_connection_util.h | 12 +- src/storage/tablelock/ob_lock_memtable.cpp | 6 + .../tablelock/ob_table_lock_rpc_struct.cpp | 164 ++++++++++++++++++ .../tablelock/ob_table_lock_rpc_struct.h | 109 ++++++++++-- .../tablelock/ob_table_lock_service.cpp | 90 ++++++++-- src/storage/tablelock/ob_table_lock_service.h | 5 + 14 files changed, 432 insertions(+), 78 deletions(-) diff --git a/mittest/simple_server/test_ob_table_lock_service.cpp b/mittest/simple_server/test_ob_table_lock_service.cpp index 44a02f7ed1..dd4e2e3dca 100644 --- a/mittest/simple_server/test_ob_table_lock_service.cpp +++ b/mittest/simple_server/test_ob_table_lock_service.cpp @@ -392,6 +392,7 @@ TEST_F(ObTableLockServiceTest, lock_part) ObTableLockOwnerID OWNER_ONE(1); ObTableLockOwnerID OWNER_TWO(2); ObLockPartitionRequest lock_arg; + ObUnLockPartitionRequest unlock_arg; tx_param.access_mode_ = ObTxAccessMode::RW; tx_param.isolation_ = ObTxIsolationLevel::RC; @@ -460,16 +461,16 @@ TEST_F(ObTableLockServiceTest, lock_part) get_table_part_ids(table_id, part_ids); lock_mode = ROW_EXCLUSIVE; - lock_arg.owner_id_ = OWNER_ONE; - lock_arg.lock_mode_ = lock_mode; - lock_arg.op_type_ = OUT_TRANS_UNLOCK; - lock_arg.timeout_us_ = 0; - lock_arg.table_id_ = table_id; - lock_arg.part_object_id_ = part_ids[0]; + unlock_arg.owner_id_ = OWNER_ONE; + unlock_arg.lock_mode_ = lock_mode; + unlock_arg.op_type_ = OUT_TRANS_UNLOCK; + unlock_arg.timeout_us_ = 0; + unlock_arg.table_id_ = table_id; + unlock_arg.part_object_id_ = part_ids[0]; ret = MTL(ObTableLockService*)->unlock_partition(*tx_desc, tx_param, - lock_arg); + unlock_arg); ASSERT_EQ(OB_SUCCESS, ret); // commit LOG_INFO("ObTableLockServiceTest::lock_part 4.2"); diff --git a/src/observer/ob_inner_sql_connection.cpp b/src/observer/ob_inner_sql_connection.cpp index fd32321a15..e4b37b4c2b 100644 --- a/src/observer/ob_inner_sql_connection.cpp +++ b/src/observer/ob_inner_sql_connection.cpp @@ -1184,15 +1184,17 @@ int ObInnerSQLConnection::register_multi_data_source(const uint64_t &tenant_id, int ObInnerSQLConnection::forward_request(const uint64_t tenant_id, const int64_t op_type, const ObString &sql, - ObInnerSQLResult &res) + ObInnerSQLResult &res, + const int32_t group_id) { - return forward_request_(tenant_id, op_type, sql, res); + return forward_request_(tenant_id, op_type, sql, res, group_id); } int ObInnerSQLConnection::forward_request_(const uint64_t tenant_id, const int64_t op_type, const ObString &sql, - ObInnerSQLResult &res) + ObInnerSQLResult &res, + const int32_t group_id) { int ret = OB_SUCCESS; @@ -1204,6 +1206,10 @@ int ObInnerSQLConnection::forward_request_(const uint64_t tenant_id, ObSQLMode sql_mode = 0; const ObSessionDDLInfo &ddl_info = get_session().get_ddl_info(); bool is_load_data_exec = get_session().is_load_data_exec_session(); + int32_t real_group_id = group_id_; + if (0 != group_id) { + real_group_id = group_id; + } if (is_resource_conn()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("resource_conn of resource_svr still doesn't has the tenant resource", K(ret), @@ -1233,7 +1239,7 @@ int ObInnerSQLConnection::forward_request_(const uint64_t tenant_id, } else if (OB_FAIL(GCTX.inner_sql_rpc_proxy_->to(get_resource_svr()) .by(tenant_id) .timeout(query_timeout) - .group_id(group_id_) + .group_id(real_group_id) .inner_sql_sync_transmit(arg, *(handler->get_result()), handler->get_handle()))) { diff --git a/src/observer/ob_inner_sql_connection.h b/src/observer/ob_inner_sql_connection.h index ddfabf9bdc..a5317b6d25 100644 --- a/src/observer/ob_inner_sql_connection.h +++ b/src/observer/ob_inner_sql_connection.h @@ -62,10 +62,10 @@ class ObLockObjRequest; class ObLockTableRequest; class ObLockTabletRequest; class ObLockPartitionRequest; -using ObUnLockObjRequest = ObLockObjRequest; -using ObUnLockTableRequest = ObLockTableRequest; -using ObUnLockPartitionRequest = ObLockPartitionRequest; -using ObUnLockTabletRequest = ObLockTabletRequest; +class ObUnLockObjRequest; +class ObUnLockTableRequest; +class ObUnLockPartitionRequest; +class ObUnLockTabletRequest; } } namespace observer @@ -254,7 +254,8 @@ public: int forward_request(const uint64_t tenant_id, const int64_t op_type, const ObString &sql, - ObInnerSQLResult &res); + ObInnerSQLResult &res, + const int32_t group_id = 0); public: // nested session and sql execute for foreign key. @@ -366,7 +367,8 @@ private: int forward_request_(const uint64_t tenant_id, const int64_t op_type, const ObString &sql, - ObInnerSQLResult &res); + ObInnerSQLResult &res, + const int32_t group_id = 0); int get_session_timeout_for_rpc(int64_t &query_timeout, int64_t &trx_timeout); private: bool inited_; diff --git a/src/share/ob_cluster_version.h b/src/share/ob_cluster_version.h index fd644c469e..c93addd14c 100644 --- a/src/share/ob_cluster_version.h +++ b/src/share/ob_cluster_version.h @@ -172,6 +172,7 @@ cal_version(const uint64_t major, const uint64_t minor, const uint64_t major_pat #define CLUSTER_VERSION_4_2_1_0 (oceanbase::common::cal_version(4, 2, 1, 0)) #define CLUSTER_VERSION_4_2_1_1 (oceanbase::common::cal_version(4, 2, 1, 1)) #define CLUSTER_VERSION_4_2_1_2 (oceanbase::common::cal_version(4, 2, 1, 2)) +#define CLUSTER_VERSION_4_2_1_3 (oceanbase::common::cal_version(4, 2, 1, 3)) #define CLUSTER_VERSION_4_2_2_0 (oceanbase::common::cal_version(4, 2, 2, 0)) #define CLUSTER_VERSION_4_3_0_0 (oceanbase::common::cal_version(4, 3, 0, 0)) //!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! diff --git a/src/share/transfer/ob_transfer_info.cpp b/src/share/transfer/ob_transfer_info.cpp index 44c4959387..115a38d799 100644 --- a/src/share/transfer/ob_transfer_info.cpp +++ b/src/share/transfer/ob_transfer_info.cpp @@ -828,14 +828,13 @@ int ObTransferLockUtil::unlock_tablet_on_src_ls_for_table_lock( return ret; } -template int ObTransferLockUtil::process_table_lock_on_tablets_( ObMySQLTransaction &trans, const uint64_t tenant_id, const ObLSID &ls_id, const transaction::tablelock::ObTableLockOwnerID &lock_owner_id, const ObDisplayTabletList &table_lock_tablet_list, - LockArg &lock_arg) + ObLockAloneTabletRequest &lock_arg) { int ret = OB_SUCCESS; lock_arg.tablet_ids_.reset(); @@ -872,7 +871,7 @@ int ObTransferLockUtil::process_table_lock_on_tablets_( LOG_WARN("lock tablet failed", KR(ret), K(tenant_id), K(lock_arg)); } } else if (OUT_TRANS_UNLOCK == lock_arg.op_type_) { - if (OB_FAIL(ObInnerConnectionLockUtil::unlock_tablet(tenant_id, lock_arg, conn))) { + if (OB_FAIL(ObInnerConnectionLockUtil::unlock_tablet(tenant_id, static_cast(lock_arg), conn))) { LOG_WARN("unock tablet failed", KR(ret), K(tenant_id), K(lock_arg)); } } else { diff --git a/src/share/transfer/ob_transfer_info.h b/src/share/transfer/ob_transfer_info.h index a9bab01aa7..fabea0164c 100644 --- a/src/share/transfer/ob_transfer_info.h +++ b/src/share/transfer/ob_transfer_info.h @@ -28,6 +28,14 @@ namespace oceanbase { +namespace transaction +{ +namespace tablelock +{ +class ObLockAloneTabletRequest; +} +} + namespace share { namespace schema @@ -455,14 +463,13 @@ public: const transaction::tablelock::ObTableLockOwnerID &lock_owner_id, const ObDisplayTabletList &table_lock_tablet_list); private: - template static int process_table_lock_on_tablets_( ObMySQLTransaction &trans, const uint64_t tenant_id, const ObLSID &ls_id, const transaction::tablelock::ObTableLockOwnerID &lock_owner_id, const ObDisplayTabletList &table_lock_tablet_list, - LockArg &lock_arg); + transaction::tablelock::ObLockAloneTabletRequest &lock_arg); }; } // end namespace share diff --git a/src/storage/ddl/ob_ddl_lock.cpp b/src/storage/ddl/ob_ddl_lock.cpp index 43fdff0ec7..dc8ee193bd 100644 --- a/src/storage/ddl/ob_ddl_lock.cpp +++ b/src/storage/ddl/ob_ddl_lock.cpp @@ -395,17 +395,23 @@ int ObDDLLock::do_table_lock( ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid conn", K(ret)); } else { - ObLockTableRequest arg; - arg.table_id_ = table_id; - arg.owner_id_ = lock_owner; - arg.lock_mode_ = lock_mode; - arg.op_type_ = op_type; - arg.timeout_us_ = timeout_us; if (is_lock) { + ObLockTableRequest arg; + arg.table_id_ = table_id; + arg.owner_id_ = lock_owner; + arg.lock_mode_ = lock_mode; + arg.op_type_ = op_type; + arg.timeout_us_ = timeout_us; if (OB_FAIL(ObInnerConnectionLockUtil::lock_table(tenant_id, arg, iconn))) { LOG_WARN("failed to lock table", K(ret)); } } else { + ObUnLockTableRequest arg; + arg.table_id_ = table_id; + arg.owner_id_ = lock_owner; + arg.lock_mode_ = lock_mode; + arg.op_type_ = op_type; + arg.timeout_us_ = timeout_us; if (OB_FAIL(ObInnerConnectionLockUtil::unlock_table(tenant_id, arg, iconn))) { if (OB_OBJ_LOCK_NOT_EXIST == ret) { ret = OB_SUCCESS; @@ -450,7 +456,9 @@ int ObDDLLock::do_table_lock( if (OB_SUCC(ret)) { ObArray ls_ids; - ObArray args; + ObArray lock_args; + ObArray unlock_args; + int64_t arg_count = 0; if (OB_FAIL(share::ObTabletToLSTableOperator::batch_get_ls(trans, tenant_id, tablet_ids, ls_ids))) { LOG_WARN("failed to get tablet ls", K(ret)); } else if (OB_UNLIKELY(ls_ids.count() != tablet_ids.count())) { @@ -460,39 +468,44 @@ int ObDDLLock::do_table_lock( for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) { const ObLSID &ls_id = ls_ids[i]; int64_t j = 0; - for (; j < args.count(); j++) { - if (args[j].ls_id_ == ls_id) { + for (; j < (is_lock ? lock_args.count() : unlock_args.count()); j++) { + const ObLSID &tmp_ls_id = is_lock ? lock_args[j].ls_id_ : unlock_args[j].ls_id_; + if (tmp_ls_id == ls_id) { break; } } - if (j == args.count()) { - ObLockAloneTabletRequest arg; + arg_count = is_lock ? lock_args.count() : unlock_args.count(); + if (j == arg_count) { + ObLockAloneTabletRequest lock_arg; + ObUnLockAloneTabletRequest unlock_arg; + ObLockAloneTabletRequest &arg = is_lock ? lock_arg : unlock_arg; arg.owner_id_ = lock_owner; arg.lock_mode_ = lock_mode; arg.op_type_ = op_type; arg.timeout_us_ = timeout_us; arg.ls_id_ = ls_id; - if (OB_FAIL(args.push_back(arg))) { + if (OB_FAIL(is_lock ? lock_args.push_back(lock_arg) : unlock_args.push_back(unlock_arg))) { LOG_WARN("failed to push back modify arg", K(ret)); } } if (OB_SUCC(ret)) { - ObLockAloneTabletRequest &arg = args.at(j); + ObLockAloneTabletRequest &arg = is_lock ? lock_args.at(j) : unlock_args.at(j); if (OB_FAIL(arg.tablet_ids_.push_back(tablet_ids.at(i)))) { LOG_WARN("failed to push back", K(ret)); } } } - for (int64_t i = 0; OB_SUCC(ret) && i < args.count(); i++) { + arg_count = is_lock ? lock_args.count() : unlock_args.count(); + for (int64_t i = 0; OB_SUCC(ret) && i < arg_count; i++) { if (is_lock) { - if (OB_FAIL(ObInnerConnectionLockUtil::lock_tablet(tenant_id, args[i], iconn))) { - LOG_WARN("failed to lock tablet", K(ret)); + if (OB_FAIL(ObInnerConnectionLockUtil::lock_tablet(tenant_id, lock_args[i], iconn))) { + LOG_WARN("failed to lock tablet", K(ret), K(lock_args[i])); } } else { - if (OB_FAIL(ObInnerConnectionLockUtil::unlock_tablet(tenant_id, args[i], iconn))) { + if (OB_FAIL(ObInnerConnectionLockUtil::unlock_tablet(tenant_id, unlock_args[i], iconn))) { if (OB_OBJ_LOCK_NOT_EXIST == ret) { ret = OB_SUCCESS; - LOG_INFO("table lock already unlocked", K(ret), K(args[i])); + LOG_INFO("table lock already unlocked", K(ret), K(unlock_args[i])); } else { LOG_WARN("failed to unlock tablet", K(ret)); } @@ -727,7 +740,7 @@ int ObOnlineDDLLock::unlock_table( { int ret = OB_SUCCESS; ObInnerSQLConnection *iconn = nullptr; - ObLockObjRequest arg; + ObUnLockObjRequest arg; arg.obj_type_ = ObLockOBJType::OBJ_TYPE_ONLINE_DDL_TABLE; arg.obj_id_ = table_id; arg.timeout_us_ = timeout_us; diff --git a/src/storage/tablelock/ob_lock_inner_connection_util.cpp b/src/storage/tablelock/ob_lock_inner_connection_util.cpp index 49037c822a..f7a5b23275 100644 --- a/src/storage/tablelock/ob_lock_inner_connection_util.cpp +++ b/src/storage/tablelock/ob_lock_inner_connection_util.cpp @@ -666,8 +666,19 @@ int ObInnerConnectionLockUtil::request_lock_( } else if (OB_FAIL(arg.serialize(tmp_str, arg.get_serialize_size(), pos))) { LOG_WARN("serialize lock table arg failed", K(ret), K(arg)); } else { + int32_t group_id = 0; + const int64_t min_cluster_version = GET_MIN_CLUSTER_VERSION(); + if ((min_cluster_version > CLUSTER_VERSION_4_2_1_3 && min_cluster_version < CLUSTER_VERSION_4_2_2_0) + || (min_cluster_version > CLUSTER_VERSION_4_2_2_0 && min_cluster_version < CLUSTER_VERSION_4_3_0_0) + || (min_cluster_version >= CLUSTER_VERSION_4_3_0_0)) { + if (arg.is_unlock_request()) { + group_id = share::OBCG_UNLOCK; + } else { + group_id = share::OBCG_LOCK; + } + } sql.assign_ptr(tmp_str, arg.get_serialize_size()); - ret = conn->forward_request(tenant_id, operation_type, sql, res); + ret = conn->forward_request(tenant_id, operation_type, sql, res, group_id); } if (OB_NOT_NULL(tmp_str)) { diff --git a/src/storage/tablelock/ob_lock_inner_connection_util.h b/src/storage/tablelock/ob_lock_inner_connection_util.h index 4b92aee98d..f1d39a8fbc 100644 --- a/src/storage/tablelock/ob_lock_inner_connection_util.h +++ b/src/storage/tablelock/ob_lock_inner_connection_util.h @@ -45,12 +45,12 @@ class ObLockTableRequest; class ObLockTabletRequest; class ObLockPartitionRequest; class ObLockAloneTabletRequest; -using ObUnLockObjRequest = ObLockObjRequest; -using ObUnLockObjsRequest = ObLockObjsRequest; -using ObUnLockTableRequest = ObLockTableRequest; -using ObUnLockPartitionRequest = ObLockPartitionRequest; -using ObUnLockTabletRequest = ObLockTabletRequest; -using ObUnLockAloneTabletRequest = ObLockAloneTabletRequest; +class ObUnLockObjRequest; +class ObUnLockObjsRequest; +class ObUnLockTableRequest; +class ObUnLockPartitionRequest; +class ObUnLockTabletRequest; +class ObUnLockAloneTabletRequest; class ObInnerConnectionLockUtil { diff --git a/src/storage/tablelock/ob_lock_memtable.cpp b/src/storage/tablelock/ob_lock_memtable.cpp index eae49566a1..7fd87e5e20 100644 --- a/src/storage/tablelock/ob_lock_memtable.cpp +++ b/src/storage/tablelock/ob_lock_memtable.cpp @@ -39,6 +39,7 @@ using namespace share; using namespace storage; using namespace memtable; using namespace common; +using namespace oceanbase::lib; namespace transaction { @@ -146,6 +147,7 @@ int ObLockMemtable::lock_( // 1. record lock myself(check conflict). // 2. record lock at memtable ctx. // 3. create lock callback and list it on the callback list of memtable ctx. + do { // retry if there is lock conflict at part trans ctx. need_retry = false; @@ -328,6 +330,7 @@ int ObLockMemtable::unlock_( // 1. record unlock op myself(check conflict). // 2. record unlock op at memtable ctx. // 3. create unlock callback and list it on the callback list of memtable ctx. + Thread::WaitGuard guard(Thread::WAIT); do { // retry if there is lock conflict at part trans ctx. need_retry = false; @@ -549,6 +552,7 @@ int ObLockMemtable::lock( { int ret = OB_SUCCESS; LOG_DEBUG("ObLockMemtable::lock ", K(lock_op)); + Thread::WaitGuard guard(Thread::WAIT); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObLockMemtable not inited.", K(ret)); @@ -575,6 +579,7 @@ int ObLockMemtable::unlock( // only has OUT_TRANS_UNLOCK int ret = OB_SUCCESS; LOG_DEBUG("ObLockMemtable::unlock ", K(unlock_op)); + Thread::WaitGuard guard(Thread::WAIT); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObLockMemtable not inited.", K(ret)); @@ -726,6 +731,7 @@ int ObLockMemtable::get_lock_op_iter(const ObLockID &lock_id, int ObLockMemtable::check_and_clear_obj_lock(const bool force_compact) { int ret = OB_SUCCESS; + Thread::WaitGuard guard(Thread::WAIT); if (IS_NOT_INIT) { ret = OB_NOT_INIT; TABLELOCK_LOG(WARN, "ObLockMemtable not inited.", K(ret)); diff --git a/src/storage/tablelock/ob_table_lock_rpc_struct.cpp b/src/storage/tablelock/ob_table_lock_rpc_struct.cpp index 127f88e651..5ac81ee823 100644 --- a/src/storage/tablelock/ob_table_lock_rpc_struct.cpp +++ b/src/storage/tablelock/ob_table_lock_rpc_struct.cpp @@ -173,6 +173,19 @@ OB_DEF_DESERIALIZE(ObLockTaskBatchRequest) return ret; } +bool is_unlock_request(const ObTableLockTaskType type) +{ + return (UNLOCK_TABLE == type || + UNLOCK_TABLET == type || + UNLOCK_PARTITION == type || + UNLOCK_SUBPARTITION == type || + UNLOCK_OBJECT == type || + UNLOCK_DDL_TABLE == type || + UNLOCK_DDL_TABLET == type || + UNLOCK_ALONE_TABLET == type); +} + + void ObLockParam::reset() { lock_id_.reset(); @@ -248,6 +261,14 @@ bool ObLockRequest::is_valid() const is_op_type_valid(op_type_)); } +bool ObLockRequest::is_lock_thread_enabled() const +{ + const int64_t min_cluster_version = GET_MIN_CLUSTER_VERSION(); + return ((min_cluster_version > CLUSTER_VERSION_4_2_1_3 && min_cluster_version < CLUSTER_VERSION_4_2_2_0) + || (min_cluster_version > CLUSTER_VERSION_4_2_2_0 && min_cluster_version < CLUSTER_VERSION_4_3_0_0) + || (min_cluster_version >= CLUSTER_VERSION_4_3_0_0)); +} + void ObLockObjRequest::reset() { ObLockRequest::reset(); @@ -263,6 +284,27 @@ bool ObLockObjRequest::is_valid() const is_valid_id(obj_id_)); } +ObUnLockObjRequest::ObUnLockObjRequest() : ObLockObjRequest() +{ + if (is_lock_thread_enabled()) { + type_ = ObLockMsgType::LOCK_OBJ_REQ; + } else { + type_ = ObLockMsgType::UNLOCK_OBJ_REQ; + } +} + +bool ObUnLockObjRequest::is_valid() const +{ + bool valid = true; + valid = (ObLockRequest::is_valid() && + is_lock_obj_type_valid(obj_type_) && + is_valid_id(obj_id_)); + + valid = valid && (ObLockMsgType::LOCK_OBJ_REQ == type_ || + ObLockMsgType::UNLOCK_OBJ_REQ == type_); + return valid; +} + void ObLockObjsRequest::reset() { ObLockRequest::reset(); @@ -280,6 +322,28 @@ bool ObLockObjsRequest::is_valid() const return is_valid; } +ObUnLockObjsRequest::ObUnLockObjsRequest() + : ObLockObjsRequest() +{ + if (is_lock_thread_enabled()) { + type_ = ObLockMsgType::LOCK_OBJ_REQ; + } else { + type_ = ObLockMsgType::UNLOCK_OBJ_REQ; + } +} + +bool ObUnLockObjsRequest::is_valid() const +{ + bool is_valid = true; + is_valid = (ObLockRequest::is_valid()); + for (int64_t i = 0; i < objs_.count() && is_valid; i++) { + is_valid = is_valid && objs_.at(i).is_valid(); + } + is_valid = is_valid && (ObLockMsgType::LOCK_OBJ_REQ == type_ || + ObLockMsgType::UNLOCK_OBJ_REQ == type_); + return is_valid; +} + void ObLockTableRequest::reset() { ObLockRequest::reset(); @@ -293,6 +357,25 @@ bool ObLockTableRequest::is_valid() const is_valid_id(table_id_)); } +ObUnLockTableRequest::ObUnLockTableRequest() : ObLockTableRequest() +{ + if (is_lock_thread_enabled()) { + type_ = ObLockMsgType::LOCK_TABLE_REQ; + } else { + type_ = ObLockMsgType::UNLOCK_TABLE_REQ; + } +} + +bool ObUnLockTableRequest::is_valid() const +{ + bool is_valid = true; + is_valid = (ObLockRequest::is_valid() && + is_valid_id(table_id_) && + (ObLockMsgType::LOCK_TABLE_REQ == type_ || + ObLockMsgType::UNLOCK_TABLE_REQ == type_)); + return is_valid; +} + void ObLockPartitionRequest::reset() { ObLockTableRequest::reset(); @@ -307,6 +390,25 @@ bool ObLockPartitionRequest::is_valid() const is_valid_id(part_object_id_)); } +ObUnLockPartitionRequest::ObUnLockPartitionRequest() + : ObLockPartitionRequest() +{ + if (is_lock_thread_enabled()) { + type_ = ObLockMsgType::LOCK_PARTITION_REQ; + } else { + type_ = ObLockMsgType::UNLOCK_PARTITION_REQ; + } +} + +bool ObUnLockPartitionRequest::is_valid() const +{ + return ((ObLockMsgType::LOCK_PARTITION_REQ == type_ || + ObLockMsgType::UNLOCK_PARTITION_REQ == type_) && + ObLockRequest::is_valid() && + is_valid_id(table_id_) && + is_valid_id(part_object_id_)); +} + void ObLockTabletRequest::reset() { ObLockTableRequest::reset(); @@ -321,6 +423,24 @@ bool ObLockTabletRequest::is_valid() const tablet_id_.is_valid()); } +ObUnLockTabletRequest::ObUnLockTabletRequest() : ObLockTabletRequest() +{ + if (is_lock_thread_enabled()) { + type_ = ObLockMsgType::LOCK_TABLET_REQ; + } else { + type_ = ObLockMsgType::UNLOCK_TABLET_REQ; + } +} + +bool ObUnLockTabletRequest::is_valid() const +{ + return ((ObLockMsgType::LOCK_TABLET_REQ == type_ || + ObLockMsgType::UNLOCK_TABLET_REQ == type_) && + ObLockRequest::is_valid() && + is_valid_id(table_id_) && + tablet_id_.is_valid()); +} + void ObLockTabletsRequest::reset() { ObLockTableRequest::reset(); @@ -339,6 +459,28 @@ bool ObLockTabletsRequest::is_valid() const return is_valid; } +ObUnLockTabletsRequest::ObUnLockTabletsRequest() : ObLockTabletsRequest() +{ + if (is_lock_thread_enabled()) { + type_ = ObLockMsgType::LOCK_TABLET_REQ; + } else { + type_ = ObLockMsgType::UNLOCK_TABLET_REQ; + } +} + +bool ObUnLockTabletsRequest::is_valid() const +{ + bool is_valid = true; + is_valid = ((ObLockMsgType::LOCK_TABLET_REQ == type_ || + ObLockMsgType::UNLOCK_TABLET_REQ == type_) && + ObLockRequest::is_valid() && + is_valid_id(table_id_)); + for (int64_t i = 0; i < tablet_ids_.count() && is_valid; i++) { + is_valid = is_valid && tablet_ids_.at(i).is_valid(); + } + return is_valid; +} + void ObLockAloneTabletRequest::reset() { ObLockTabletsRequest::reset(); @@ -357,6 +499,28 @@ bool ObLockAloneTabletRequest::is_valid() const return is_valid; } +ObUnLockAloneTabletRequest::ObUnLockAloneTabletRequest() : ObLockAloneTabletRequest() +{ + if (is_lock_thread_enabled()) { + type_ = ObLockMsgType::LOCK_ALONE_TABLET_REQ; + } else { + type_ = ObLockMsgType::UNLOCK_ALONE_TABLET_REQ; + } +} + +bool ObUnLockAloneTabletRequest::is_valid() const +{ + bool is_valid = true; + is_valid = ((ObLockMsgType::LOCK_ALONE_TABLET_REQ == type_ || + ObLockMsgType::UNLOCK_ALONE_TABLET_REQ == type_) && + ObLockRequest::is_valid() && + ls_id_.is_valid()); + for (int64_t i = 0; i < tablet_ids_.count() && is_valid; i++) { + is_valid = is_valid && tablet_ids_.at(i).is_valid(); + } + return is_valid; +} + int ObTableLockTaskRequest::set( const ObTableLockTaskType task_type, const share::ObLSID &lsid, diff --git a/src/storage/tablelock/ob_table_lock_rpc_struct.h b/src/storage/tablelock/ob_table_lock_rpc_struct.h index 59f4f59e10..a8f46ccd47 100644 --- a/src/storage/tablelock/ob_table_lock_rpc_struct.h +++ b/src/storage/tablelock/ob_table_lock_rpc_struct.h @@ -58,6 +58,8 @@ enum ObTableLockTaskType MAX_TASK_TYPE, }; +bool is_unlock_request(const ObTableLockTaskType type); + struct ObLockParam { OB_UNIS_VERSION_V(1); @@ -117,6 +119,11 @@ public: LOCK_PARTITION_REQ = 5, LOCK_TABLET_REQ = 6, LOCK_ALONE_TABLET_REQ = 7, + UNLOCK_OBJ_REQ = 8, + UNLOCK_TABLE_REQ = 9, + UNLOCK_PARTITION_REQ = 10, + UNLOCK_TABLET_REQ = 11, + UNLOCK_ALONE_TABLET_REQ = 12, }; public: ObLockRequest() : @@ -129,6 +136,19 @@ public: virtual ~ObLockRequest() { reset(); } virtual void reset(); virtual bool is_valid() const; + bool is_lock_thread_enabled() const; + bool is_unlock_request() const + { + return (ObLockMsgType::UNLOCK_OBJ_REQ == type_ || + ObLockMsgType::UNLOCK_TABLE_REQ == type_ || + ObLockMsgType::UNLOCK_PARTITION_REQ == type_ || + ObLockMsgType::UNLOCK_TABLET_REQ == type_ || + ObLockMsgType::UNLOCK_ALONE_TABLET_REQ == type_); + } + bool is_lock_request() const + { + return !is_unlock_request(); + } VIRTUAL_TO_STRING_KV(K_(owner_id), K_(lock_mode), K_(op_type), K_(timeout_us)); public: ObLockMsgType type_; @@ -157,7 +177,14 @@ public: ObLockOBJType obj_type_; uint64_t obj_id_; }; -using ObUnLockObjRequest = ObLockObjRequest; + +struct ObUnLockObjRequest : ObLockObjRequest +{ +public: + ObUnLockObjRequest(); + virtual ~ObUnLockObjRequest() { reset(); } + virtual bool is_valid() const; +}; struct ObLockObjsRequest : public ObLockRequest { @@ -174,8 +201,16 @@ public: public: // which objects should we lock common::ObSEArray objs_; - }; -using ObUnLockObjsRequest = ObLockObjsRequest; +}; + +struct ObUnLockObjsRequest : public ObLockObjsRequest +{ +public: + ObUnLockObjsRequest(); + virtual ~ObUnLockObjsRequest() { reset(); } + virtual bool is_valid() const; +}; + struct ObLockTableRequest : public ObLockRequest { @@ -191,7 +226,14 @@ public: // which table should we lock uint64_t table_id_; }; -using ObUnLockTableRequest = ObLockTableRequest; + +struct ObUnLockTableRequest : public ObLockTableRequest +{ +public: + ObUnLockTableRequest(); + virtual ~ObUnLockTableRequest() { reset(); } + virtual bool is_valid() const; +}; struct ObLockPartitionRequest : public ObLockTableRequest { @@ -206,7 +248,14 @@ public: public: uint64_t part_object_id_; }; -using ObUnLockPartitionRequest = ObLockPartitionRequest; + +struct ObUnLockPartitionRequest : public ObLockPartitionRequest +{ +public: + ObUnLockPartitionRequest(); + virtual ~ObUnLockPartitionRequest() { reset(); } + virtual bool is_valid() const; +}; struct ObLockTabletRequest : public ObLockTableRequest { @@ -221,7 +270,14 @@ public: public: common::ObTabletID tablet_id_; }; -using ObUnLockTabletRequest = ObLockTabletRequest; + +struct ObUnLockTabletRequest : public ObLockTabletRequest +{ +public: + ObUnLockTabletRequest(); + virtual ~ObUnLockTabletRequest() { reset(); } + virtual bool is_valid() const; +}; struct ObLockTabletsRequest : public ObLockTableRequest { @@ -236,7 +292,14 @@ public: public: common::ObTabletIDArray tablet_ids_; }; -using ObUnLockTabletsRequest = ObLockTabletsRequest; + +struct ObUnLockTabletsRequest : public ObLockTabletsRequest +{ +public: + ObUnLockTabletsRequest(); + virtual ~ObUnLockTabletsRequest() { reset(); } + virtual bool is_valid() const; +}; struct ObLockAloneTabletRequest : public ObLockTabletsRequest { @@ -251,7 +314,14 @@ public: public: share::ObLSID ls_id_; }; -using ObUnLockAloneTabletRequest = ObLockAloneTabletRequest; + +struct ObUnLockAloneTabletRequest : public ObLockAloneTabletRequest +{ +public: + ObUnLockAloneTabletRequest(); + virtual ~ObUnLockAloneTabletRequest() { reset(); } + virtual bool is_valid() const; +}; class ObTableLockTaskRequest final { @@ -281,7 +351,14 @@ public: && OB_NOT_NULL(tx_desc_) && tx_desc_->is_valid()); } - + bool is_unlock_request() const + { + return ::oceanbase::transaction::tablelock::is_unlock_request(task_type_); + } + bool is_lock_request() const + { + return !is_unlock_request(); + } bool is_timeout() const; TO_STRING_KV(K(task_type_), K(lsid_), K(param_), KP(tx_desc_)); @@ -313,6 +390,14 @@ public: bool is_inited() const; bool is_valid() const; int assign(const ObLockTaskBatchRequest &arg); + bool is_unlock_request() const + { + return ::oceanbase::transaction::tablelock::is_unlock_request(task_type_); + } + bool is_lock_request() const + { + return !is_unlock_request(); + } TO_STRING_KV(K(task_type_), K(lsid_), K(params_), KPC(tx_desc_)); public: @@ -476,9 +561,9 @@ public: share::SCN commit_scn_; }; -} -} -} +} // namespace tablelock +} // namespace transaction +} // namespace oceanbase #endif /* OCEANBASE_STORAGE_TABLELOCK_OB_TABLE_LOCK_RPC_STRUCT_H_ */ diff --git a/src/storage/tablelock/ob_table_lock_service.cpp b/src/storage/tablelock/ob_table_lock_service.cpp index e3d50c379f..7a88a5c580 100644 --- a/src/storage/tablelock/ob_table_lock_service.cpp +++ b/src/storage/tablelock/ob_table_lock_service.cpp @@ -442,6 +442,7 @@ int ObTableLockService::lock_table(const uint64_t table_id, int64_t abs_timeout_ts = (0 == timeout_us) ? ObTimeUtility::current_time() + DEFAULT_TIMEOUT_US : ObTimeUtility::current_time() + timeout_us; + Thread::WaitGuard guard(Thread::WAIT); do { if (timeout_us != 0) { retry_timeout_us = abs_timeout_ts - ObTimeUtility::current_time(); @@ -479,6 +480,7 @@ int ObTableLockService::unlock_table(const uint64_t table_id, int64_t abs_timeout_ts = (0 == timeout_us) ? ObTimeUtility::current_time() + DEFAULT_TIMEOUT_US : ObTimeUtility::current_time() + timeout_us; + Thread::WaitGuard guard(Thread::WAIT); do { if (timeout_us != 0) { retry_timeout_us = abs_timeout_ts - ObTimeUtility::current_time(); @@ -518,6 +520,7 @@ int ObTableLockService::lock_tablet(const uint64_t table_id, int64_t abs_timeout_ts = (0 == timeout_us) ? ObTimeUtility::current_time() + DEFAULT_TIMEOUT_US : ObTimeUtility::current_time() + timeout_us; + Thread::WaitGuard guard(Thread::WAIT); do { if (timeout_us != 0) { retry_timeout_us = abs_timeout_ts - ObTimeUtility::current_time(); @@ -560,6 +563,7 @@ int ObTableLockService::unlock_tablet(const uint64_t table_id, int64_t abs_timeout_ts = (0 == timeout_us) ? ObTimeUtility::current_time() + DEFAULT_TIMEOUT_US : ObTimeUtility::current_time() + timeout_us; + Thread::WaitGuard guard(Thread::WAIT); do { if (timeout_us != 0) { retry_timeout_us = abs_timeout_ts - ObTimeUtility::current_time(); @@ -599,6 +603,7 @@ int ObTableLockService::lock_table(ObTxDesc &tx_desc, // is set by user in the 'WAIT n' option. // Furthermore, if timeout_us_ is 0, this lock will be judged as a try // lock semantics. It meets the actual semantics of 'NOWAIT' option. + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(LOCK_TABLE, arg.table_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; ctx.tx_desc_ = &tx_desc; @@ -628,6 +633,7 @@ int ObTableLockService::unlock_table(ObTxDesc &tx_desc, LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()), K(tx_param.is_valid()), K(arg.is_valid())); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(UNLOCK_TABLE, arg.table_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; ctx.tx_desc_ = &tx_desc; @@ -655,6 +661,7 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc, LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()), K(tx_param.is_valid()), K(arg.is_valid())); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(LOCK_TABLET, arg.table_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; @@ -689,6 +696,7 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc, } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) { LOG_WARN("data version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(LOCK_TABLET, arg.table_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; @@ -722,6 +730,7 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc, LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()), K(tx_param.is_valid()), K(arg.is_valid())); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(UNLOCK_TABLET, arg.table_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; @@ -757,6 +766,7 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc, } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) { LOG_WARN("data version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(UNLOCK_TABLET, arg.table_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; ctx.tx_desc_ = &tx_desc; @@ -790,6 +800,7 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc, } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) { LOG_WARN("cluster version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(LOCK_ALONE_TABLET, arg.table_id_, arg.ls_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; @@ -825,6 +836,7 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc, } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) { LOG_WARN("cluster version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(UNLOCK_ALONE_TABLET, arg.table_id_, arg.ls_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; @@ -854,6 +866,7 @@ int ObTableLockService::lock_partition_or_subpartition(ObTxDesc &tx_desc, } else if (OB_FAIL(get_table_partition_level_(arg.table_id_, part_level))) { LOG_WARN("can not get table partition level", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); switch (part_level) { case PARTITION_LEVEL_ONE: { if (OB_FAIL(lock_partition(tx_desc, tx_param, arg))) { @@ -894,6 +907,7 @@ int ObTableLockService::lock_partition(ObTxDesc &tx_desc, } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_1_0_0))) { LOG_WARN("cluster version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(LOCK_PARTITION, arg.table_id_, arg.part_object_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; @@ -909,7 +923,7 @@ int ObTableLockService::lock_partition(ObTxDesc &tx_desc, int ObTableLockService::unlock_partition(ObTxDesc &tx_desc, const ObTxParam &tx_param, - const ObLockPartitionRequest &arg) + const ObUnLockPartitionRequest &arg) { int ret = OB_SUCCESS; @@ -926,6 +940,7 @@ int ObTableLockService::unlock_partition(ObTxDesc &tx_desc, } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_1_0_0))) { LOG_WARN("cluster version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(UNLOCK_PARTITION, arg.table_id_, arg.part_object_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; @@ -956,6 +971,7 @@ int ObTableLockService::lock_subpartition(ObTxDesc &tx_desc, } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_1_0_0))) { LOG_WARN("cluster version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(LOCK_SUBPARTITION, arg.table_id_, arg.part_object_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; @@ -971,7 +987,7 @@ int ObTableLockService::lock_subpartition(ObTxDesc &tx_desc, int ObTableLockService::unlock_subpartition(ObTxDesc &tx_desc, const ObTxParam &tx_param, - const ObLockPartitionRequest &arg) + const ObUnLockPartitionRequest &arg) { int ret = OB_SUCCESS; @@ -988,6 +1004,7 @@ int ObTableLockService::unlock_subpartition(ObTxDesc &tx_desc, } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_1_0_0))) { LOG_WARN("cluster version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(UNLOCK_SUBPARTITION, arg.table_id_, arg.part_object_id_, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; @@ -1018,6 +1035,7 @@ int ObTableLockService::lock_obj(ObTxDesc &tx_desc, } else if (OB_FAIL(check_data_version_after_(DATA_VERSION_4_1_0_0))) { LOG_WARN("data version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(LOCK_OBJECT, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; ctx.tx_desc_ = &tx_desc; @@ -1052,6 +1070,7 @@ int ObTableLockService::unlock_obj(ObTxDesc &tx_desc, } else if (OB_FAIL(check_data_version_after_(DATA_VERSION_4_1_0_0))) { LOG_WARN("data version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(UNLOCK_OBJECT, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; ctx.tx_desc_ = &tx_desc; @@ -1087,6 +1106,7 @@ int ObTableLockService::lock_obj(ObTxDesc &tx_desc, } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) { LOG_WARN("cluster version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(LOCK_OBJECT, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; ctx.tx_desc_ = &tx_desc; @@ -1123,6 +1143,7 @@ int ObTableLockService::unlock_obj(ObTxDesc &tx_desc, } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) { LOG_WARN("cluster version check failed", K(ret), K(arg)); } else { + Thread::WaitGuard guard(Thread::WAIT); ObTableLockCtx ctx(UNLOCK_OBJECT, arg.timeout_us_, arg.timeout_us_); ctx.is_in_trans_ = true; ctx.tx_desc_ = &tx_desc; @@ -1745,6 +1766,41 @@ int ObTableLockService::batch_pre_check_lock_(ObTableLockCtx &ctx, return ret; } +template +int ObTableLockService::rpc_call_(RpcProxy &proxy_batch, + const ObAddr &addr, + const int64_t timeout_us, + const LockRequest &request) +{ + int ret = OB_SUCCESS; + int32_t group_id = 0; + const int64_t min_cluster_version = GET_MIN_CLUSTER_VERSION(); + if ((min_cluster_version > CLUSTER_VERSION_4_2_1_3 && min_cluster_version < CLUSTER_VERSION_4_2_2_0) + || (min_cluster_version > CLUSTER_VERSION_4_2_2_0 && min_cluster_version < CLUSTER_VERSION_4_3_0_0) + || (min_cluster_version >= CLUSTER_VERSION_4_3_0_0)) { + group_id = share::OBCG_LOCK; + if (request.is_unlock_request()) { + group_id = share::OBCG_UNLOCK; + } + if (OB_FAIL(proxy_batch.call(addr, + timeout_us, + GCONF.cluster_id, + MTL_ID(), + group_id, + request))) { + LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(timeout_us), K(request)); + } + } else { + if (OB_FAIL(proxy_batch.call(addr, + timeout_us, + MTL_ID(), + request))) { + LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(timeout_us), K(request)); + } + } + return ret; +} + int ObTableLockService::pre_check_lock_old_version_(ObTableLockCtx &ctx, const ObTableLockMode lock_mode, const ObTableLockOwnerID lock_owner, @@ -1793,12 +1849,11 @@ int ObTableLockService::pre_check_lock_old_version_(ObTableLockCtx &ctx, } else if (ctx.is_timeout()) { ret = (last_ret == OB_TRY_LOCK_ROW_CONFLICT) ? OB_ERR_EXCLUSIVE_LOCK_CONFLICT : OB_TIMEOUT; LOG_WARN("process obj lock timeout", K(ret), K(ctx)); - } else if (OB_FAIL(proxy_batch.call(addr, - timeout_us, - ctx.tx_desc_->get_tenant_id(), - request))) { - LOG_WARN("failed to all async rpc", KR(ret), K(addr), - K(ctx.abs_timeout_ts_), K(request)); + } else if (OB_FAIL(rpc_call_(proxy_batch, + addr, + timeout_us, + request))) { + LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(request)); } else { retry_ctx.send_rpc_count_++; ALLOW_NEXT_LOG(); @@ -2041,10 +2096,10 @@ int ObTableLockService::send_rpc_task_(RpcProxy &proxy_batch, } else if (ctx.is_timeout()) { ret = OB_TIMEOUT; LOG_WARN("process obj lock timeout", K(ret), K(ctx)); - } else if (OB_FAIL(proxy_batch.call(addr, - ctx.get_rpc_timeoutus(), - ctx.tx_desc_->get_tenant_id(), - request))) { + } else if (OB_FAIL(rpc_call_(proxy_batch, + addr, + ctx.get_rpc_timeoutus(), + request))) { LOG_WARN("failed to call async rpc", KR(ret), K(addr), K(ctx.abs_timeout_ts_), K(request)); } else { @@ -2178,12 +2233,11 @@ int ObTableLockService::parallel_rpc_handle_(RpcProxy &proxy_batch, } else if (ctx.is_timeout()) { ret = OB_TIMEOUT; LOG_WARN("process obj lock timeout", K(ret), K(ctx)); - } else if (OB_FAIL(proxy_batch.call(addr, - timeout_us, - ctx.tx_desc_->get_tenant_id(), - request))) { - LOG_WARN("failed to all async rpc", KR(ret), K(addr), - K(ctx.abs_timeout_ts_), K(request)); + } else if (OB_FAIL(rpc_call_(proxy_batch, + addr, + timeout_us, + request))) { + LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(timeout_us), K(request)); } else { retry_ctx.send_rpc_count_++; ALLOW_NEXT_LOG(); diff --git a/src/storage/tablelock/ob_table_lock_service.h b/src/storage/tablelock/ob_table_lock_service.h index 36de52e4f4..e701d046b4 100644 --- a/src/storage/tablelock/ob_table_lock_service.h +++ b/src/storage/tablelock/ob_table_lock_service.h @@ -479,6 +479,11 @@ private: const ObTableLockMode lock_mode, const ObTableLockOwnerID lock_owner, ObRetryCtx &retry_ctx); + template + int rpc_call_(RpcProxy &proxy_batch, + const ObAddr &addr, + const int64_t timeout_us, + const LockRequest &request); int get_retry_lock_ids_(const ObLockIDArray &lock_ids, const int64_t start_pos, ObLockIDArray &retry_lock_ids);