From 9051e00cd07201a76377807b815b71014c8628e1 Mon Sep 17 00:00:00 2001 From: YangEfei Date: Fri, 22 Nov 2024 03:45:21 +0000 Subject: [PATCH] [TABLELOCK] add replace_all_locks interface for recovery of auto_split --- .../test_ob_table_lock_service.cpp | 174 +++++++++++++++- src/observer/ob_inner_sql_rpc_processor.cpp | 3 +- src/observer/ob_inner_sql_rpc_proxy.h | 1 + .../ob_lock_inner_connection_util.cpp | 119 ++++++++++- .../tablelock/ob_lock_inner_connection_util.h | 15 +- .../tablelock/ob_table_lock_rpc_struct.cpp | 196 +++++++++++++++++- .../tablelock/ob_table_lock_rpc_struct.h | 40 +++- .../tablelock/ob_table_lock_service.cpp | 127 ++++-------- src/storage/tablelock/ob_table_lock_service.h | 18 +- .../test_ob_replace_lock_request.cpp | 90 +++++++- 10 files changed, 668 insertions(+), 115 deletions(-) diff --git a/mittest/simple_server/test_ob_table_lock_service.cpp b/mittest/simple_server/test_ob_table_lock_service.cpp index b5c5e0366..b1e43babf 100644 --- a/mittest/simple_server/test_ob_table_lock_service.cpp +++ b/mittest/simple_server/test_ob_table_lock_service.cpp @@ -1893,6 +1893,178 @@ TEST_F(ObTableLockServiceTest, replace_lock_and_unlock_concurrency) owner_one); ASSERT_EQ(OB_SUCCESS, ret); } + +TEST_F(ObTableLockServiceTest, replace_all_locks) +{ + LOG_INFO("ObTableLockServiceTest::replace_all_locks"); + int ret = OB_SUCCESS; + ObTxParam tx_param; + share::ObTenantSwitchGuard tenant_guard; + ObTxDesc *tx_desc1 = nullptr; + ObTxDesc *tx_desc2= nullptr; + ObTxDesc *tx_desc3= nullptr; + ObTxDesc *tx_desc4= nullptr; + ObTransService *txs = nullptr; + uint64_t table_id = 0; + ObTableLockMode check_lock_mode = EXCLUSIVE; + ObTableLockMode new_lock_mode = SHARE; + ObTableLockOwnerID owner_one(ObTableLockOwnerID::get_owner_by_value(1)); + ObTableLockOwnerID owner_two(ObTableLockOwnerID::get_owner_by_value(2)); + ObTableLockOwnerID owner_three(ObTableLockOwnerID::get_owner_by_value(3)); + ObTableLockMode lock_mode_one = ROW_SHARE; + ObTableLockMode lock_mode_two = ROW_EXCLUSIVE; + ObLockTableRequest lock_arg; + ObLockTableRequest new_lock_arg; + ObUnLockTableRequest unlock_arg1; + ObUnLockTableRequest unlock_arg2; + int64_t stmt_timeout_ts = -1; + + tx_param.access_mode_ = ObTxAccessMode::RW; + tx_param.isolation_ = ObTxIsolationLevel::RC; + tx_param.timeout_us_ = 6000 * 1000L; + tx_param.lock_timeout_us_ = -1; + tx_param.cluster_id_ = GCONF.cluster_id; + + ret = tenant_guard.switch_to(OB_SYS_TENANT_ID); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_NE(nullptr, MTL(ObTableLockService*)); + + txs = MTL(ObTransService*); + ASSERT_NE(nullptr, txs); + ASSERT_EQ(OB_SUCCESS, txs->acquire_tx(tx_desc1)); + + // 1. lock out_trans + LOG_INFO("ObTableLockServiceTest::replace_all_locks 1"); + get_table_id("t_one_part", table_id); + lock_arg.table_id_ = table_id; + lock_arg.owner_id_ = owner_one; + lock_arg.lock_mode_ = lock_mode_one; + lock_arg.op_type_ = OUT_TRANS_LOCK; + lock_arg.timeout_us_ = 0; + + unlock_arg1.table_id_ = table_id; + unlock_arg1.owner_id_ = owner_one; + unlock_arg1.lock_mode_ = lock_mode_one; + unlock_arg1.op_type_ = OUT_TRANS_UNLOCK; + unlock_arg1.timeout_us_ = 0; + + ret = MTL(ObTableLockService*)->lock(*tx_desc1, + tx_param, + lock_arg); + ASSERT_EQ(OB_SUCCESS, ret); + // 2. check lock + LOG_INFO("ObTableLockServiceTest::replace_all_locks 2"); + ret = MTL(ObTableLockService*)->lock_table(table_id, + check_lock_mode, + owner_two); + ASSERT_EQ(OB_EAGAIN, ret); + + // 3. commit lock + LOG_INFO("ObTableLockServiceTest::replace_all_locks 3"); + stmt_timeout_ts = ObTimeUtility::current_time() + 1000 * 1000; + ret = txs->commit_tx(*tx_desc1, stmt_timeout_ts); + ASSERT_EQ(OB_SUCCESS, ret); + ret = txs->release_tx(*tx_desc1); + ASSERT_EQ(OB_SUCCESS, ret); + + // 4. lock table and commit with another owenr and mode + LOG_INFO("ObTableLockServiceTest::replace_all_locks 4"); + ASSERT_EQ(OB_SUCCESS, txs->acquire_tx(tx_desc2)); + lock_arg.owner_id_ = owner_two; + lock_arg.lock_mode_ = lock_mode_two; + + unlock_arg2.table_id_ = table_id; + unlock_arg2.owner_id_ = owner_two; + unlock_arg2.lock_mode_ = lock_mode_two; + unlock_arg2.op_type_ = OUT_TRANS_UNLOCK; + unlock_arg2.timeout_us_ = 0; + + ret = MTL(ObTableLockService*)->lock(*tx_desc2, + tx_param, + lock_arg); + ASSERT_EQ(OB_SUCCESS, ret); + + ret = txs->commit_tx(*tx_desc2, stmt_timeout_ts); + ASSERT_EQ(OB_SUCCESS, ret); + ret = txs->release_tx(*tx_desc2); + ASSERT_EQ(OB_SUCCESS, ret); + + // 5. replace lock + LOG_INFO("ObTableLockServiceTest::replace_all_locks 5"); + ObArenaAllocator allocator; + ObReplaceAllLocksRequest replace_lock_arg(allocator); + new_lock_arg.table_id_ = table_id; + new_lock_arg.owner_id_ = owner_three; + new_lock_arg.lock_mode_ = EXCLUSIVE; + new_lock_arg.op_type_ = OUT_TRANS_LOCK; + new_lock_arg.timeout_us_ = 0; + + replace_lock_arg.lock_req_ = &new_lock_arg; + replace_lock_arg.unlock_req_list_.push_back(&unlock_arg1); + replace_lock_arg.unlock_req_list_.push_back(&unlock_arg2); + ASSERT_EQ(OB_SUCCESS, txs->acquire_tx(tx_desc3)); + + ret = MTL(ObTableLockService*)->replace_lock(*tx_desc3, + tx_param, + replace_lock_arg); + ASSERT_EQ(OB_SUCCESS, ret); + // 6. commit rplace lock + LOG_INFO("ObTableLockServiceTest::replace_all_locks 6"); + stmt_timeout_ts = ObTimeUtility::current_time() + 1000 * 1000; + ret = txs->commit_tx(*tx_desc3, stmt_timeout_ts); + ASSERT_EQ(OB_SUCCESS, ret); + ret = txs->release_tx(*tx_desc3); + ASSERT_EQ(OB_SUCCESS, ret); + + // 7. try to lock by origin owner + LOG_INFO("ObTableLockServiceTest::replace_all_locks 7"); + ret = MTL(ObTableLockService*)->lock_table(table_id, + ROW_SHARE, + owner_one); + ASSERT_EQ(OB_EAGAIN, ret); + + // 8. check new owner + LOG_INFO("ObTableLockServiceTest::replace_all_locks 8"); + ret = MTL(ObTableLockService*)->lock_table(table_id, + EXCLUSIVE, + owner_three); + ASSERT_EQ(OB_SUCCESS, ret); + + // 9. unlock + LOG_INFO("ObTableLockServiceTest::replace_all_locks 9"); + ASSERT_EQ(OB_SUCCESS, txs->acquire_tx(tx_desc4)); + unlock_arg2.owner_id_ = owner_three; + unlock_arg2.lock_mode_ = EXCLUSIVE; + ret = MTL(ObTableLockService *)->unlock(*tx_desc4, tx_param, unlock_arg2); + ASSERT_EQ(OB_SUCCESS, ret); + + stmt_timeout_ts = ObTimeUtility::current_time() + 1000 * 1000; + ret = txs->commit_tx(*tx_desc4, stmt_timeout_ts); + ASSERT_EQ(OB_SUCCESS, ret); + ret = txs->release_tx(*tx_desc4); + ASSERT_EQ(OB_SUCCESS, ret); + + // 10. try to lock by origin owner + LOG_INFO("ObTableLockServiceTest::replace_all_locks 10"); + ret = MTL(ObTableLockService*)->lock_table(table_id, + check_lock_mode, + owner_one); + ASSERT_EQ(OB_SUCCESS, ret); + + // 11. check new owner + LOG_INFO("ObTableLockServiceTest::replace_all_locks 11"); + ret = MTL(ObTableLockService*)->lock_table(table_id, + check_lock_mode, + owner_three); + ASSERT_EQ(OB_EAGAIN, ret); + + // 12. unlock by origin owner + LOG_INFO("ObTableLockServiceTest::replace_all_locks 12"); + ret = MTL(ObTableLockService*)->unlock_table(table_id, + check_lock_mode, + owner_one); + ASSERT_EQ(OB_SUCCESS, ret); +} }// end unittest } // end oceanbase @@ -1900,7 +2072,7 @@ TEST_F(ObTableLockServiceTest, replace_lock_and_unlock_concurrency) int main(int argc, char **argv) { oceanbase::unittest::init_log_and_gtest(argc, argv); - OB_LOGGER.set_log_level(OB_LOG_LEVEL_ERROR); + OB_LOGGER.set_log_level(OB_LOG_LEVEL_INFO); OB_LOGGER.set_mod_log_levels("STORAGE.TABLELOCK:DEBUG"); OB_LOGGER.set_enable_async_log(false); ::testing::InitGoogleTest(&argc, argv); diff --git a/src/observer/ob_inner_sql_rpc_processor.cpp b/src/observer/ob_inner_sql_rpc_processor.cpp index e2873efee..1cb8775da 100644 --- a/src/observer/ob_inner_sql_rpc_processor.cpp +++ b/src/observer/ob_inner_sql_rpc_processor.cpp @@ -420,7 +420,8 @@ int ObInnerSqlRpcP::process() case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_ALONE_TABLET: case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_OBJS: case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_OBJS: - case ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCK: { + case ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCK: + case ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCKS: { if (OB_FAIL(ObInnerConnectionLockUtil::process_lock_rpc(transmit_arg, conn))) { LOG_WARN("process lock rpc failed", K(ret), K(transmit_arg.get_operation_type())); } diff --git a/src/observer/ob_inner_sql_rpc_proxy.h b/src/observer/ob_inner_sql_rpc_proxy.h index 23cdc0ed4..ad0723fb1 100644 --- a/src/observer/ob_inner_sql_rpc_proxy.h +++ b/src/observer/ob_inner_sql_rpc_proxy.h @@ -53,6 +53,7 @@ public: OPERATION_TYPE_LOCK_OBJS = 19, OPERATION_TYPE_UNLOCK_OBJS = 20, OPERATION_TYPE_REPLACE_LOCK = 21, + OPERATION_TYPE_REPLACE_LOCKS = 22, OPERATION_TYPE_MAX = 100 }; diff --git a/src/storage/tablelock/ob_lock_inner_connection_util.cpp b/src/storage/tablelock/ob_lock_inner_connection_util.cpp index 12bec3aa5..13be69cf8 100644 --- a/src/storage/tablelock/ob_lock_inner_connection_util.cpp +++ b/src/storage/tablelock/ob_lock_inner_connection_util.cpp @@ -161,7 +161,13 @@ int ObInnerConnectionLockUtil::process_lock_rpc( } case ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCK: { if (OB_FAIL(process_replace_lock_(arg, inner_conn))) { - LOG_WARN("process lock table failed", K(ret)); + LOG_WARN("process replace lock failed", K(ret)); + } + break; + } + case ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCKS: { + if (OB_FAIL(process_replace_all_locks_(arg, inner_conn))) { + LOG_WARN("process replace all locks failed", K(ret)); } break; } @@ -288,6 +294,28 @@ int ObInnerConnectionLockUtil::process_replace_lock_( return ret; } +int ObInnerConnectionLockUtil::process_replace_all_locks_( + const ObInnerSQLTransmitArg &arg, + observer::ObInnerSQLConnection *conn) +{ + int ret = OB_SUCCESS; + ObArenaAllocator allocator; + ObReplaceAllLocksRequest replace_req(allocator); + const char *buf = arg.get_inner_sql().ptr(); + const int64_t data_len = arg.get_inner_sql().length(); + int64_t pos = 0; + if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_3_4_0) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("cluster version check faild", KR(ret), K(GET_MIN_CLUSTER_VERSION())); + } else if (OB_FAIL(replace_req.deserialize(buf, data_len, pos))) { + LOG_WARN("deserialize and check header of ObReplaceLockRequest failed", K(ret), K(arg), K(pos)); + } else if (OB_FAIL(replace_lock(arg.get_tenant_id(), replace_req, conn))) { + LOG_WARN("replace all locks failed", K(ret), K(replace_req)); + } + replace_req.reset(); + return ret; +} + int ObInnerConnectionLockUtil::lock_table( const uint64_t tenant_id, const uint64_t table_id, @@ -594,6 +622,61 @@ int ObInnerConnectionLockUtil::replace_lock( return ret; } +int ObInnerConnectionLockUtil::replace_lock(const uint64_t tenant_id, + const ObReplaceAllLocksRequest &req, + observer::ObInnerSQLConnection *conn) +{ + int ret = OB_SUCCESS; + observer::ObReqTimeGuard req_timeinfo_guard; + + const bool local_execute = conn->is_local_execute(GCONF.cluster_id, tenant_id); + + SMART_VAR(ObInnerSQLResult, res, conn->get_session()) + { + if (OB_INVALID_ID == tenant_id) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(tenant_id)); + } else if (local_execute) { + if (OB_FAIL(conn->switch_tenant(tenant_id))) { + LOG_WARN("set system tenant id failed", K(ret), K(tenant_id)); + } + } else { + LOG_DEBUG("tenant not in server", K(ret), K(tenant_id)); + } + + if (OB_SUCC(ret)) { + if (!conn->is_in_trans()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("inner conn must be already in trans", K(ret)); + } else if (OB_FAIL(res.init(local_execute))) { + LOG_WARN("init result set", K(ret), K(local_execute)); + } else if (local_execute) { + if (OB_FAIL(replace_lock_(tenant_id, req, conn, res))) { + LOG_WARN("replace lock failed", KR(ret), K(req)); + } + } else { + char *tmp_str = nullptr; + int64_t pos = 0; + ObString sql; + if (OB_ISNULL(tmp_str = static_cast(ob_malloc(req.get_serialize_size(), "InnerLock")))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc memory for sql_str failed", K(ret), K(req.get_serialize_size())); + } else if (OB_FAIL(req.serialize(tmp_str, req.get_serialize_size(), pos))) { + LOG_WARN("serialize replace lock table arg failed", K(ret), K(req)); + } else { + sql.assign_ptr(tmp_str, pos); + ret = conn->forward_request(tenant_id, obrpc::ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCKS, sql, res); + } + + if (OB_NOT_NULL(tmp_str)) { + ob_free(tmp_str); + } + } + } + } + return ret; +} + int ObInnerConnectionLockUtil::replace_lock_( const uint64_t tenant_id, const ObReplaceLockRequest &req, @@ -628,6 +711,40 @@ int ObInnerConnectionLockUtil::replace_lock_( return ret; } +int ObInnerConnectionLockUtil::replace_lock_( + const uint64_t tenant_id, + const ObReplaceAllLocksRequest &req, + observer::ObInnerSQLConnection *conn, + observer::ObInnerSQLResult &res) +{ + int ret = OB_SUCCESS; + transaction::ObTxDesc *tx_desc = nullptr; + + if (OB_ISNULL(conn)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("Invalid conn", KR(ret)); + } else if (OB_ISNULL(tx_desc = conn->get_session().get_tx_desc())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("Invalid tx_desc"); + } else { + transaction::ObTxParam tx_param; + tx_param.access_mode_ = transaction::ObTxAccessMode::RW; + tx_param.isolation_ = conn->get_session().get_tx_isolation(); + tx_param.cluster_id_ = GCONF.cluster_id; + conn->get_session().get_tx_timeout(tx_param.timeout_us_); + tx_param.lock_timeout_us_ = conn->get_session().get_trx_lock_timeout(); + + MTL_SWITCH(tenant_id) { + if (OB_FAIL(MTL(ObTableLockService *)->replace_lock(*tx_desc, tx_param, req))) { + LOG_WARN("replace lock failed", K(ret), K(tenant_id), K(req)); + } else if (OB_FAIL(res.close())) { + LOG_WARN("close result set failed", K(ret), K(tenant_id)); + } + } + } + return ret; +} + int ObInnerConnectionLockUtil::create_inner_conn(sql::ObSQLSessionInfo *session_info, common::ObMySQLProxy *sql_proxy, observer::ObInnerSQLConnection *&inner_conn) diff --git a/src/storage/tablelock/ob_lock_inner_connection_util.h b/src/storage/tablelock/ob_lock_inner_connection_util.h index 8a813768c..9fb372509 100644 --- a/src/storage/tablelock/ob_lock_inner_connection_util.h +++ b/src/storage/tablelock/ob_lock_inner_connection_util.h @@ -68,9 +68,9 @@ private: const obrpc::ObInnerSQLTransmitArg::InnerSQLOperationType operation_type, const obrpc::ObInnerSQLTransmitArg &arg, observer::ObInnerSQLConnection *conn); - static int process_replace_lock_(const obrpc::ObInnerSQLTransmitArg &arg, - observer::ObInnerSQLConnection *conn); -// --------------------- interface for inner connection client ----------------------- + static int process_replace_lock_(const obrpc::ObInnerSQLTransmitArg &arg, observer::ObInnerSQLConnection *conn); + static int process_replace_all_locks_(const obrpc::ObInnerSQLTransmitArg &arg, observer::ObInnerSQLConnection *conn); + // --------------------- interface for inner connection client ----------------------- public: static int lock_table( const uint64_t tenant_id, @@ -152,6 +152,10 @@ public: const uint64_t tenant_id, const ObReplaceLockRequest &req, observer::ObInnerSQLConnection *conn); + static int replace_lock( + const uint64_t tenant_id, + const ObReplaceAllLocksRequest &req, + observer::ObInnerSQLConnection *conn); static int create_inner_conn(sql::ObSQLSessionInfo *session_info, common::ObMySQLProxy *sql_proxy, observer::ObInnerSQLConnection *&inner_conn); @@ -167,6 +171,11 @@ private: const ObReplaceLockRequest &req, observer::ObInnerSQLConnection *conn, observer::ObInnerSQLResult &res); + static int replace_lock_( + const uint64_t tenant_id, + const ObReplaceAllLocksRequest &req, + observer::ObInnerSQLConnection *conn, + observer::ObInnerSQLResult &res); static int do_obj_lock_( const uint64_t tenant_id, const ObLockRequest &arg, diff --git a/src/storage/tablelock/ob_table_lock_rpc_struct.cpp b/src/storage/tablelock/ob_table_lock_rpc_struct.cpp index 787cb4da5..6ec4632b5 100644 --- a/src/storage/tablelock/ob_table_lock_rpc_struct.cpp +++ b/src/storage/tablelock/ob_table_lock_rpc_struct.cpp @@ -21,6 +21,14 @@ namespace transaction { namespace tablelock { +#define GET_REAL_TYPE_LOCK_REQ(T, allocator, lock_req) \ + void *ptr = static_cast(allocator.alloc(sizeof(T))); \ + if (OB_ISNULL(ptr)) { \ + ret = OB_ALLOCATE_MEMORY_FAILED; \ + LOG_WARN("failed to allocate lock_req", KR(ret)); \ + } else { \ + lock_req = new (ptr) T(); \ + } OB_SERIALIZE_MEMBER(ObLockParam, lock_id_, @@ -138,6 +146,150 @@ OB_DEF_DESERIALIZE(ObTableLockTaskRequest) return ret; } +OB_DEF_SERIALIZE_SIZE(ObReplaceAllLocksRequest) +{ + int ret = OB_SUCCESS; + int64_t len = 0; + int64_t count = 0; + if (OB_ISNULL(lock_req_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("lock_req should not be null", K(ret), KP(lock_req_)); + } else { + OB_UNIS_ADD_LEN(*lock_req_); + count = unlock_req_list_.count(); + OB_UNIS_ADD_LEN(count); + for (int64_t i = 0; i < count && OB_SUCC(ret); i++) { + if (OB_ISNULL(unlock_req_list_.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unlock_req should not be null", K(ret)); + } else { + OB_UNIS_ADD_LEN(*unlock_req_list_.at(i)); + } + } + } + return len; +} + +OB_DEF_SERIALIZE(ObReplaceAllLocksRequest) +{ + int ret = OB_SUCCESS; + int64_t count = 0; + if (OB_ISNULL(lock_req_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("lock_req should not be null", K(ret), KP(lock_req_)); + } else { + OB_UNIS_ENCODE(*lock_req_); + count = unlock_req_list_.count(); + OB_UNIS_ENCODE(count); + for (int64_t i = 0; i < count && OB_SUCC(ret); i++) { + if (OB_ISNULL(unlock_req_list_.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unlock_req should not be null", K(ret)); + } else { + OB_UNIS_ENCODE(*unlock_req_list_.at(i)); + } + } + } + return ret; +} + +OB_DEF_DESERIALIZE(ObReplaceAllLocksRequest) +{ + int ret = OB_SUCCESS; + ObLockRequest lock_req; + int64_t cnt = 0; + int64_t tmp_pos = 0; + void *ptr = nullptr; + ObUnLockRequest unlock_req; + + if (OB_FAIL(common::serialization::decode(buf, data_len, tmp_pos, lock_req))) { + LOG_WARN("deserialize lock_req failed", K(ret), K(data_len), K(tmp_pos), KPHEX(buf, data_len)); + } else { + switch(lock_req.type_) { + case ObLockRequest::ObLockMsgType::LOCK_OBJ_REQ: { + GET_REAL_TYPE_LOCK_REQ(ObLockObjsRequest, allocator_, lock_req_); + break; + } + case ObLockRequest::ObLockMsgType::LOCK_TABLE_REQ: { + GET_REAL_TYPE_LOCK_REQ(ObLockTableRequest, allocator_, lock_req_); + break; + } + case ObLockRequest::ObLockMsgType::LOCK_PARTITION_REQ: { + GET_REAL_TYPE_LOCK_REQ(ObLockPartitionRequest, allocator_, lock_req_); + break; + } + case ObLockRequest::ObLockMsgType::LOCK_TABLET_REQ: { + GET_REAL_TYPE_LOCK_REQ(ObLockTabletsRequest, allocator_, lock_req_); + break; + } + case ObLockRequest::ObLockMsgType::LOCK_ALONE_TABLET_REQ: { + GET_REAL_TYPE_LOCK_REQ(ObLockAloneTabletRequest, allocator_, lock_req_); + break; + } + default: { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("meet unexpected type of lock_req in ObReplaceAllLocksRequest", K(lock_req)); + } + } + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(lock_req_->deserialize(buf, data_len, pos))) { + LOG_WARN("deserialize for lock_req failed", K(ret)); + allocator_.free(lock_req_); + lock_req_ = nullptr; + } else { + OB_UNIS_DECODE(cnt); + for (int64_t i = 0; i < cnt && OB_SUCC(ret); i++) { + unlock_req.reset(); + tmp_pos = pos; + ObLockRequest *unlock_req_ptr = nullptr; + if (OB_FAIL(common::serialization::decode(buf, data_len, tmp_pos, unlock_req))) { + LOG_WARN("deserialize lock_req failed", K(ret)); + } else { + switch (unlock_req.type_) { + case ObLockRequest::ObLockMsgType::UNLOCK_OBJ_REQ: { + GET_REAL_TYPE_LOCK_REQ(ObUnLockObjsRequest, allocator_, unlock_req_ptr); + break; + } + case ObLockRequest::ObLockMsgType::UNLOCK_TABLE_REQ: { + GET_REAL_TYPE_LOCK_REQ(ObUnLockTableRequest, allocator_, unlock_req_ptr); + break; + } + case ObLockRequest::ObLockMsgType::UNLOCK_PARTITION_REQ: { + GET_REAL_TYPE_LOCK_REQ(ObLockPartitionRequest, allocator_, unlock_req_ptr); + break; + } + case ObLockRequest::ObLockMsgType::UNLOCK_TABLET_REQ: { + GET_REAL_TYPE_LOCK_REQ(ObLockTabletsRequest, allocator_, unlock_req_ptr); + break; + } + case ObLockRequest::ObLockMsgType::UNLOCK_ALONE_TABLET_REQ: { + GET_REAL_TYPE_LOCK_REQ(ObLockAloneTabletRequest, allocator_, unlock_req_ptr); + break; + } + default: { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("meet unexpected type of unlock_req in ObReplaceAllLocksRequest", K(unlock_req)); + } + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(unlock_req_ptr->deserialize(buf, data_len, pos))) { + LOG_WARN("deserialize for unlock_req failed", K(ret)); + allocator_.free(unlock_req_ptr); + } else if (OB_FAIL(unlock_req_list_.push_back(unlock_req_ptr))) { + LOG_WARN("push unlock_req into list failed", K(ret)); + allocator_.free(unlock_req_ptr); + } + } + } + if (OB_FAIL(ret)) { + reset(); + } + return ret; +} + int TxDescHelper::deserialize_tx_desc(DESERIAL_PARAMS, ObTxDesc *&tx_desc) { int ret = OB_SUCCESS; @@ -187,7 +339,8 @@ int ObLockParam::set( const int64_t schema_version, const bool is_deadlock_avoid_enabled, const bool is_try_lock, - const int64_t expired_time) + const int64_t expired_time, + const bool is_for_replace) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!lock_id.is_valid()) || @@ -205,6 +358,7 @@ int ObLockParam::set( is_try_lock_ = is_try_lock; expired_time_ = expired_time; schema_version_ = schema_version; + is_for_replace_ = is_for_replace; } return ret; } @@ -619,6 +773,46 @@ int ObReplaceLockRequest::deserialize_new_lock_mode_and_owner(DESERIAL_PARAMS) return ret; } +void ObReplaceAllLocksRequest::reset() +{ + if (OB_NOT_NULL(lock_req_)) { + allocator_.free(lock_req_); + lock_req_ = nullptr; + } + for (int64_t i = 0; i < unlock_req_list_.count(); i++) { + ObLockRequest *unlock_req_ptr = unlock_req_list_.at(i); + if (OB_NOT_NULL(unlock_req_ptr)) { + allocator_.free(unlock_req_ptr); + } + } +} + +bool ObReplaceAllLocksRequest::is_valid() const +{ + bool is_valid = false; + if (OB_NOT_NULL(lock_req_)) { + is_valid = lock_req_->is_valid(); + } else { + is_valid = false; + } + + if (is_valid) { + if (unlock_req_list_.count() == 0) { + is_valid = false; + } else { + for (int64_t i = 0; i < unlock_req_list_.count() && is_valid; i++) { + ObLockRequest *unlock_req_ptr = unlock_req_list_.at(i); + if (OB_NOT_NULL(unlock_req_ptr)) { + is_valid = is_valid && unlock_req_ptr->is_valid(); + } else { + is_valid = false; + } + } + } + } + return is_valid; +} + int ObTableLockTaskRequest::set( const ObTableLockTaskType task_type, const share::ObLSID &lsid, diff --git a/src/storage/tablelock/ob_table_lock_rpc_struct.h b/src/storage/tablelock/ob_table_lock_rpc_struct.h index 9c3e67eac..18eb288b3 100644 --- a/src/storage/tablelock/ob_table_lock_rpc_struct.h +++ b/src/storage/tablelock/ob_table_lock_rpc_struct.h @@ -63,6 +63,7 @@ enum ObTableLockTaskType REPLACE_LOCK_SUBPARTITION = 21, REPLACE_LOCK_OBJECTS = 22, REPLACE_LOCK_ALONE_TABLET = 23, + REPLACE_ALL_LOCKS = 24, MAX_TASK_TYPE, }; @@ -111,15 +112,15 @@ public: {} virtual ~ObLockParam() { reset(); } void reset(); - int set( - const ObLockID &lock_id, - const ObTableLockMode lock_mode, - const ObTableLockOwnerID &owner_id, - const ObTableLockOpType op_type, - const int64_t schema_version, - const bool is_deadlock_avoid_enabled = false, - const bool is_try_lock = true, - const int64_t expired_time = 0); + int set(const ObLockID &lock_id, + const ObTableLockMode lock_mode, + const ObTableLockOwnerID &owner_id, + const ObTableLockOpType op_type, + const int64_t schema_version, + const bool is_deadlock_avoid_enabled = false, + const bool is_try_lock = true, + const int64_t expired_time = 0, + const bool is_for_replace = false); bool is_valid() const; TO_STRING_KV(K_(lock_id), K_(lock_mode), @@ -429,7 +430,6 @@ struct ObReplaceLockRequest { OB_UNIS_VERSION_V(1); public: - public: ObReplaceLockRequest() : new_lock_mode_(MAX_LOCK_MODE), new_lock_owner_(), unlock_req_(nullptr) {} @@ -446,6 +446,26 @@ public: ObLockRequest *unlock_req_; }; +struct ObReplaceAllLocksRequest +{ + OB_UNIS_VERSION_V(1); +public: + ObReplaceAllLocksRequest(common::ObIAllocator &allocator) : + lock_req_(nullptr), unlock_req_list_(), allocator_(allocator) + {} + ~ObReplaceAllLocksRequest() { reset(); } + void reset(); + bool is_valid() const; + int deserialize_and_check_header(DESERIAL_PARAMS); + int deserialize_new_lock_mode_and_owner(DESERIAL_PARAMS); + VIRTUAL_TO_STRING_KV(K_(lock_req), K_(unlock_req_list)); + +public: + ObLockRequest *lock_req_; + ObSArray unlock_req_list_; + common::ObIAllocator &allocator_; +}; + class ObTableLockTaskRequest final { OB_UNIS_VERSION(1); diff --git a/src/storage/tablelock/ob_table_lock_service.cpp b/src/storage/tablelock/ob_table_lock_service.cpp index 088b764b5..bf6d6268c 100644 --- a/src/storage/tablelock/ob_table_lock_service.cpp +++ b/src/storage/tablelock/ob_table_lock_service.cpp @@ -60,7 +60,8 @@ ObTableLockService::ObTableLockCtx::ObTableLockCtx() : lock_owner_(), schema_version_(-1), tx_is_killed_(false), - is_from_sql_(false) + is_from_sql_(false), + is_for_replace_(false) {} void ObTableLockService::ObRetryCtx::reuse() @@ -798,7 +799,8 @@ int ObTableLockService::lock_partition_or_subpartition(ObTxDesc &tx_desc, int ObTableLockService::lock(ObTxDesc &tx_desc, const ObTxParam &tx_param, - const ObLockRequest &arg) + const ObLockRequest &arg, + const bool is_for_replace) { int ret = OB_SUCCESS; @@ -816,6 +818,9 @@ int ObTableLockService::lock(ObTxDesc &tx_desc, if (OB_FAIL(ctx.set_by_lock_req(arg))) { LOG_WARN("set ObTableLockCtx failed", K(ret), K(arg)); } else { + if (is_for_replace) { + ctx.is_for_replace_ = true; + } ctx.is_in_trans_ = true; ctx.tx_desc_ = &tx_desc; ctx.tx_param_ = tx_param; @@ -857,8 +862,7 @@ int ObTableLockService::replace_lock(ObTxDesc &tx_desc, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(tx_desc), K(replace_req), K(tx_desc.is_valid()), K(tx_param.is_valid()), K(replace_req.is_valid())); - // FIXME: change to 431 later - } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_3_0_0))) { + } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_3_4_0))) { LOG_WARN("cluster version check failed", K(ret), K(replace_req)); } else { ObReplaceTableLockCtx ctx; @@ -879,6 +883,37 @@ int ObTableLockService::replace_lock(ObTxDesc &tx_desc, return ret; } +int ObTableLockService::replace_lock(ObTxDesc &tx_desc, + const ObTxParam &tx_param, + const ObReplaceAllLocksRequest &replace_req) +{ + int ret = OB_SUCCESS; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("lock service is not inited", K(ret)); + } else if (OB_UNLIKELY(!tx_desc.is_valid()) || + OB_UNLIKELY(!tx_param.is_valid()) || + OB_UNLIKELY(!replace_req.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(tx_desc), K(replace_req), K(tx_desc.is_valid()), + K(tx_param.is_valid()), K(replace_req.is_valid())); + } else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_3_4_0))) { + LOG_WARN("cluster version check failed", K(ret), K(replace_req)); + } else { + for (int64_t i = 0; i < replace_req.unlock_req_list_.count() && OB_SUCC(ret); i++) { + if (OB_FAIL(unlock(tx_desc, tx_param, *replace_req.unlock_req_list_.at(i)))) { + LOG_WARN("unlock in replace failed", K(ret), K(replace_req)); + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(lock(tx_desc, tx_param, *replace_req.lock_req_, true))){ + LOG_WARN("lock in replace failed", K(ret), K(replace_req)); + } + } + return ret; +} + int ObTableLockService::garbage_collect_right_now() { int ret = OB_SUCCESS; @@ -1637,9 +1672,6 @@ int ObTableLockService::pack_batch_request_(ObTableLockCtx &ctx, ObLockParam lock_param; if (OB_FAIL(request.init(task_type, ls_id, ctx.tx_desc_))) { LOG_WARN("request init failed", K(ret), K(ctx), K(ls_id), KP(ctx.tx_desc_), K(lock_ids), K(task_type)); - } else if (ctx.is_replace_task()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("lock_param is not compatible with request", K(ret), K(ctx), K(task_type)); } else { for (int i = 0; i < lock_ids.count() && OB_SUCC(ret); ++i) { lock_param.reset(); @@ -1650,7 +1682,8 @@ int ObTableLockService::pack_batch_request_(ObTableLockCtx &ctx, ctx.schema_version_, ctx.is_deadlock_avoid_enabled(), ctx.is_try_lock(), - ctx.abs_timeout_ts_))) { + ctx.abs_timeout_ts_, + ctx.is_for_replace_))) { LOG_WARN("get lock param failed", K(ret)); } else if (OB_FAIL(request.params_.push_back(lock_param))) { LOG_WARN("get lock request failed", K(ret), K(lock_param)); @@ -1809,7 +1842,7 @@ int ObTableLockService::pack_and_call_rpc_(RpcProxy &proxy_batch, int ret = OB_SUCCESS; ObLockTaskBatchRequest request; if (OB_FAIL(pack_batch_request_(ctx, ctx.task_type_, ls_id, lock_ids, request))) { - LOG_WARN("pack_request_ failed", K(ret), K(ls_id), K(ctx), K(lock_ids)); + LOG_WARN("pack_batch_request_ failed", K(ret), K(ls_id), K(ctx), K(lock_ids)); } else if (OB_FAIL(rpc_call_(proxy_batch, addr, ctx.get_rpc_timeoutus(), request))) { LOG_WARN("failed to call async rpc", KR(ret), K(addr), K(ctx.abs_timeout_ts_), K(request)); } else { @@ -1831,7 +1864,7 @@ int ObTableLockService::pack_and_call_rpc_(obrpc::ObBatchReplaceLockProxy &proxy int ret = OB_SUCCESS; ObLockTaskBatchRequest request; if (OB_FAIL(pack_batch_request_(ctx, ctx.task_type_, ls_id, lock_ids, request))) { - LOG_WARN("pack_request_ failed", K(ret), K(ls_id), K(ctx), K(lock_ids)); + LOG_WARN("pack_batch_request_ failed", K(ret), K(ls_id), K(ctx), K(lock_ids)); } else if (OB_FAIL(rpc_call_(proxy_batch, addr, ctx.get_rpc_timeoutus(), request))) { LOG_WARN("failed to call async rpc", KR(ret), K(addr), K(ctx.abs_timeout_ts_), K(request)); } else { @@ -1923,54 +1956,6 @@ int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch, return ret; } -template -int ObTableLockService::parallel_rpc_handle_(RpcProxy &proxy_batch, - ObTableLockCtx &ctx, - const LockMap &lock_map, - const ObLSLockMap &ls_lock_map) -{ - int ret = OB_SUCCESS; - int64_t timeout_us = 0; - bool unused = false; - ObRetryCtx retry_ctx; - ObAddr addr; - ObTableLockTaskRequest request; - FOREACH_X(lock, lock_map, OB_SUCC(ret)) { - proxy_batch.reuse(); - retry_ctx.reuse(); - addr.reset(); - request.reset(); - const ObLockID &lock_id = lock->first; - const share::ObLSID &ls_id = lock->second; - - if (OB_FAIL(retry_ctx.rpc_ls_array_.push_back(ls_id))) { - LOG_WARN("push_back lsid failed", K(ret), K(ls_id)); - } else if (OB_FAIL(pack_request_(ctx, ctx.task_type_, lock_id, ls_id, addr, request))) { - LOG_WARN("pack_request_ failed", K(ret), K(ls_id), K(lock_id)); - } else if (FALSE_IT(timeout_us = ctx.get_rpc_timeoutus())) { - } else if (ctx.is_timeout()) { - ret = OB_TIMEOUT; - LOG_WARN("process obj lock timeout", K(ret), K(ctx)); - } else if (OB_FAIL(rpc_call_(proxy_batch, - addr, - timeout_us, - request))) { - LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(timeout_us), K(request)); - } else { - retry_ctx.send_rpc_count_++; - ALLOW_NEXT_LOG(); - LOG_INFO("send table lock rpc", KR(ret), K(retry_ctx.send_rpc_count_), - K(addr), "request", request); - } - if (OB_FAIL(ret)) { - (void)collect_rollback_info_(retry_ctx.rpc_ls_array_, proxy_batch, ctx); - } else { - ret = handle_parallel_rpc_response_(proxy_batch, ctx, ls_lock_map, unused, retry_ctx); - } - } - return ret; -} - int ObTableLockService::inner_process_obj_lock_batch_(ObTableLockCtx &ctx, const ObLSLockMap &lock_map) { @@ -1991,36 +1976,12 @@ int ObTableLockService::inner_process_obj_lock_batch_(ObTableLockCtx &ctx, return ret; } -int ObTableLockService::inner_process_obj_lock_old_version_(ObTableLockCtx &ctx, - const LockMap &lock_map, - const ObLSLockMap &ls_lock_map) -{ - int ret = OB_SUCCESS; - obrpc::ObSrvRpcProxy rpc_proxy(*GCTX.srv_rpc_proxy_); - rpc_proxy.set_detect_session_killed(true); - // TODO: yanyuan.cxf we process the rpc one by one and do parallel later. - if (ctx.is_unlock_task()) { - ObHighPriorityTableLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::unlock_table); - ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, ls_lock_map); - } else { - ObTableLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::lock_table); - ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, ls_lock_map); - } - - return ret; -} - int ObTableLockService::inner_process_obj_lock_(ObTableLockCtx &ctx, const LockMap &lock_map, const ObLSLockMap &ls_lock_map) { int ret = OB_SUCCESS; - if (GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_4_0_0_0) { - ret = inner_process_obj_lock_batch_(ctx, ls_lock_map); - } else { - ret = inner_process_obj_lock_old_version_(ctx, lock_map, ls_lock_map); - } - + ret = inner_process_obj_lock_batch_(ctx, ls_lock_map); return ret; } diff --git a/src/storage/tablelock/ob_table_lock_service.h b/src/storage/tablelock/ob_table_lock_service.h index 1e136c95e..4fe5e1ef3 100644 --- a/src/storage/tablelock/ob_table_lock_service.h +++ b/src/storage/tablelock/ob_table_lock_service.h @@ -122,6 +122,7 @@ private: // use to kill the whole lock table stmt. transaction::ObTxSEQ stmt_savepoint_; + bool is_for_replace_; TO_STRING_KV(K(task_type_), K(is_in_trans_), K(table_id_), K(partition_id_), K(tablet_list_), K(obj_list_), K(lock_op_type_), @@ -130,7 +131,8 @@ private: K(current_savepoint_), K(need_rollback_ls_), K(lock_mode_), K(lock_owner_), K(schema_version_), K(tx_is_killed_), - K(is_from_sql_), K(ret_code_before_end_stmt_or_tx_), K(stmt_savepoint_)); + K(is_from_sql_), K(ret_code_before_end_stmt_or_tx_), + K(stmt_savepoint_), K_(is_for_replace)); }; class ObReplaceTableLockCtx : public ObTableLockCtx @@ -260,7 +262,8 @@ public: ObLockPartitionRequest &arg); int lock(ObTxDesc &tx_desc, const ObTxParam &tx_param, - const ObLockRequest &arg); + const ObLockRequest &arg, + const bool is_for_replace = false); int unlock(ObTxDesc &tx_desc, const ObTxParam &tx_param, const ObUnLockRequest &arg); @@ -268,6 +271,9 @@ public: int replace_lock(ObTxDesc &tx_desc, const ObTxParam &tx_param, const ObReplaceLockRequest &replace_req); + int replace_lock(ObTxDesc &tx_desc, + const ObTxParam &tx_param, + const ObReplaceAllLocksRequest &replace_req); int garbage_collect_right_now(); int get_obj_lock_garbage_collector(ObOBJLockGarbageCollector *&obj_lock_garbage_collector); @@ -344,11 +350,6 @@ private: const ObLockIDArray &lock_ids, ObLockTaskBatchRequest &request); template - int parallel_rpc_handle_(RpcProxy &proxy_batch, - ObTableLockCtx &ctx, - const LockMap &lock_map, - const ObLSLockMap &ls_lock_map); - template int batch_rpc_handle_(RpcProxy &proxy_batch, ObTableLockCtx &ctx, const ObLSLockMap &lock_map); @@ -428,9 +429,6 @@ private: int inner_process_obj_lock_(ObTableLockCtx &ctx, const LockMap &lock_map, const ObLSLockMap &ls_lock_map); - int inner_process_obj_lock_old_version_(ObTableLockCtx &ctx, - const LockMap &lock_map, - const ObLSLockMap &ls_lock_map); int inner_process_obj_lock_batch_(ObTableLockCtx &ctx, const ObLSLockMap &ls_lock_map); int process_obj_lock_(ObTableLockCtx &ctx, diff --git a/unittest/storage/transaction/tablelock/test_ob_replace_lock_request.cpp b/unittest/storage/transaction/tablelock/test_ob_replace_lock_request.cpp index 36796f712..9972b5372 100644 --- a/unittest/storage/transaction/tablelock/test_ob_replace_lock_request.cpp +++ b/unittest/storage/transaction/tablelock/test_ob_replace_lock_request.cpp @@ -80,14 +80,16 @@ bool unlock_req_is_equal(const ObLockRequest &req1, const ObLockRequest &req2) && req1.op_type_ == req2.op_type_ && req1.timeout_us_ == req2.timeout_us_; if (is_equal) { switch (req1.type_) { - case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_TABLE_REQ: { + case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_TABLE_REQ: + case transaction::tablelock::ObLockRequest::ObLockMsgType::UNLOCK_TABLE_REQ: { const ObUnLockTableRequest &lock_req1 = static_cast(req1); const ObUnLockTableRequest &lock_req2 = static_cast(req2); is_equal = lock_req1.table_id_ == lock_req2.table_id_; TABLELOCK_LOG(INFO, "compare unlock request", K(lock_req1), K(lock_req2), K(is_equal)); break; } - case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_TABLET_REQ: { + case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_TABLET_REQ: + case transaction::tablelock::ObLockRequest::ObLockMsgType::UNLOCK_TABLET_REQ: { const ObUnLockTabletsRequest &lock_req1 = static_cast(req1); const ObUnLockTabletsRequest &lock_req2 = static_cast(req2); is_equal = @@ -95,21 +97,24 @@ bool unlock_req_is_equal(const ObLockRequest &req1, const ObLockRequest &req2) TABLELOCK_LOG(INFO, "compare unlock request", K(lock_req1), K(lock_req2), K(is_equal)); break; } - case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_PARTITION_REQ: { + case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_PARTITION_REQ: + case transaction::tablelock::ObLockRequest::ObLockMsgType::UNLOCK_PARTITION_REQ: { const ObUnLockPartitionRequest &lock_req1 = static_cast(req1); const ObUnLockPartitionRequest &lock_req2 = static_cast(req2); is_equal = lock_req1.table_id_ == lock_req2.table_id_ && lock_req1.part_object_id_ == lock_req2.part_object_id_; TABLELOCK_LOG(INFO, "compare unlock request", K(lock_req1), K(lock_req2), K(is_equal)); break; } - case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_OBJ_REQ: { + case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_OBJ_REQ: + case transaction::tablelock::ObLockRequest::ObLockMsgType::UNLOCK_OBJ_REQ: { const ObUnLockObjsRequest &lock_req1 = static_cast(req1); const ObUnLockObjsRequest &lock_req2 = static_cast(req2); is_equal = list_is_equal(lock_req1.objs_, lock_req1.objs_); TABLELOCK_LOG(INFO, "compare unlock request", K(lock_req1), K(lock_req2), K(is_equal)); break; } - case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_ALONE_TABLET_REQ: { + case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_ALONE_TABLET_REQ: + case transaction::tablelock::ObLockRequest::ObLockMsgType::UNLOCK_ALONE_TABLET_REQ: { const ObUnLockAloneTabletRequest &lock_req1 = static_cast(req1); const ObUnLockAloneTabletRequest &lock_req2 = static_cast(req2); is_equal = lock_req1.table_id_ == lock_req2.table_id_ && lock_req1.ls_id_ == lock_req2.ls_id_ @@ -211,6 +216,81 @@ TEST(ObReplaceLockRequest, test_replace_objs) replace_req.unlock_req_ = &unlock_req; CHECK_SERIALIZE_AND_DESERIALIZE(ObUnLockTableRequest); } + +TEST(ObReplaceLockRequest, test_replace_all_locks) +{ + INIT_SUCC(ret); + ObArenaAllocator allocator; + ObReplaceAllLocksRequest replace_req(allocator); + ObReplaceAllLocksRequest des_replace_req(allocator); + ObUnLockTableRequest unlock_req1; + ObUnLockTableRequest unlock_req2; + ObLockTableRequest lock_req; + const int LEN = 100; + char buf[LEN]; + int64_t pos = 0; + char tmp_buf[LEN]; + int64_t tmp_pos = 0; + bool is_equal = false; + + unlock_req1.type_ = ObLockRequest::ObLockMsgType::UNLOCK_TABLE_REQ; + unlock_req1.owner_id_.convert_from_value(1001); + unlock_req1.lock_mode_ = ROW_SHARE; + unlock_req1.op_type_ = OUT_TRANS_UNLOCK; + unlock_req1.timeout_us_ = 1000; + unlock_req1.table_id_ = 998; + + unlock_req2.type_ = ObLockRequest::ObLockMsgType::UNLOCK_TABLE_REQ; + unlock_req2.owner_id_.convert_from_value(1002); + unlock_req2.lock_mode_ = ROW_EXCLUSIVE; + unlock_req2.op_type_ = OUT_TRANS_UNLOCK; + unlock_req2.timeout_us_ = 1000; + unlock_req2.table_id_ = 998; + + lock_req.owner_id_.convert_from_value(1003); + lock_req.lock_mode_ = EXCLUSIVE; + lock_req.op_type_ = OUT_TRANS_LOCK; + lock_req.timeout_us_ = 1000; + lock_req.table_id_ = 998; + + replace_req.unlock_req_list_.push_back(&unlock_req1); + replace_req.unlock_req_list_.push_back(&unlock_req2); + replace_req.lock_req_ = &lock_req; + + ret = replace_req.serialize(buf, LEN, pos); + TABLELOCK_LOG(INFO, + "1. serailize replace_req", + K(ret), + K(replace_req), + K(unlock_req1), + K(unlock_req2), + K(lock_req), + K(pos), + KPHEX(buf, pos)); + ASSERT_EQ(ret, OB_SUCCESS); + + pos = 0; + ret = des_replace_req.deserialize(buf, LEN, pos); + + TABLELOCK_LOG(INFO, + "2. deserailize replace_req", + K(ret), + K(replace_req), + K(des_replace_req), + K(unlock_req1), + K(unlock_req2), + K(lock_req), + K(pos), + KPHEX(buf, pos)); + ASSERT_EQ(ret, OB_SUCCESS); + + is_equal = unlock_req_is_equal(*des_replace_req.lock_req_, *replace_req.lock_req_); + ASSERT_TRUE(is_equal); + for (int64_t i = 0; i < des_replace_req.unlock_req_list_.count() && is_equal; i++) { + is_equal = unlock_req_is_equal(*des_replace_req.unlock_req_list_.at(i), *replace_req.unlock_req_list_.at(i)); + } + ASSERT_TRUE(is_equal); +} } // namespace unittest } // namespace oceanbase int main(int argc, char **argv)