[TABLELOCK] add replace_all_locks interface for recovery of auto_split
This commit is contained in:
parent
32ad2dce19
commit
9051e00cd0
@ -1893,6 +1893,178 @@ TEST_F(ObTableLockServiceTest, replace_lock_and_unlock_concurrency)
|
||||
owner_one);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
}
|
||||
|
||||
TEST_F(ObTableLockServiceTest, replace_all_locks)
|
||||
{
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks");
|
||||
int ret = OB_SUCCESS;
|
||||
ObTxParam tx_param;
|
||||
share::ObTenantSwitchGuard tenant_guard;
|
||||
ObTxDesc *tx_desc1 = nullptr;
|
||||
ObTxDesc *tx_desc2= nullptr;
|
||||
ObTxDesc *tx_desc3= nullptr;
|
||||
ObTxDesc *tx_desc4= nullptr;
|
||||
ObTransService *txs = nullptr;
|
||||
uint64_t table_id = 0;
|
||||
ObTableLockMode check_lock_mode = EXCLUSIVE;
|
||||
ObTableLockMode new_lock_mode = SHARE;
|
||||
ObTableLockOwnerID owner_one(ObTableLockOwnerID::get_owner_by_value(1));
|
||||
ObTableLockOwnerID owner_two(ObTableLockOwnerID::get_owner_by_value(2));
|
||||
ObTableLockOwnerID owner_three(ObTableLockOwnerID::get_owner_by_value(3));
|
||||
ObTableLockMode lock_mode_one = ROW_SHARE;
|
||||
ObTableLockMode lock_mode_two = ROW_EXCLUSIVE;
|
||||
ObLockTableRequest lock_arg;
|
||||
ObLockTableRequest new_lock_arg;
|
||||
ObUnLockTableRequest unlock_arg1;
|
||||
ObUnLockTableRequest unlock_arg2;
|
||||
int64_t stmt_timeout_ts = -1;
|
||||
|
||||
tx_param.access_mode_ = ObTxAccessMode::RW;
|
||||
tx_param.isolation_ = ObTxIsolationLevel::RC;
|
||||
tx_param.timeout_us_ = 6000 * 1000L;
|
||||
tx_param.lock_timeout_us_ = -1;
|
||||
tx_param.cluster_id_ = GCONF.cluster_id;
|
||||
|
||||
ret = tenant_guard.switch_to(OB_SYS_TENANT_ID);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_NE(nullptr, MTL(ObTableLockService*));
|
||||
|
||||
txs = MTL(ObTransService*);
|
||||
ASSERT_NE(nullptr, txs);
|
||||
ASSERT_EQ(OB_SUCCESS, txs->acquire_tx(tx_desc1));
|
||||
|
||||
// 1. lock out_trans
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 1");
|
||||
get_table_id("t_one_part", table_id);
|
||||
lock_arg.table_id_ = table_id;
|
||||
lock_arg.owner_id_ = owner_one;
|
||||
lock_arg.lock_mode_ = lock_mode_one;
|
||||
lock_arg.op_type_ = OUT_TRANS_LOCK;
|
||||
lock_arg.timeout_us_ = 0;
|
||||
|
||||
unlock_arg1.table_id_ = table_id;
|
||||
unlock_arg1.owner_id_ = owner_one;
|
||||
unlock_arg1.lock_mode_ = lock_mode_one;
|
||||
unlock_arg1.op_type_ = OUT_TRANS_UNLOCK;
|
||||
unlock_arg1.timeout_us_ = 0;
|
||||
|
||||
ret = MTL(ObTableLockService*)->lock(*tx_desc1,
|
||||
tx_param,
|
||||
lock_arg);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
// 2. check lock
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 2");
|
||||
ret = MTL(ObTableLockService*)->lock_table(table_id,
|
||||
check_lock_mode,
|
||||
owner_two);
|
||||
ASSERT_EQ(OB_EAGAIN, ret);
|
||||
|
||||
// 3. commit lock
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 3");
|
||||
stmt_timeout_ts = ObTimeUtility::current_time() + 1000 * 1000;
|
||||
ret = txs->commit_tx(*tx_desc1, stmt_timeout_ts);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ret = txs->release_tx(*tx_desc1);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// 4. lock table and commit with another owenr and mode
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 4");
|
||||
ASSERT_EQ(OB_SUCCESS, txs->acquire_tx(tx_desc2));
|
||||
lock_arg.owner_id_ = owner_two;
|
||||
lock_arg.lock_mode_ = lock_mode_two;
|
||||
|
||||
unlock_arg2.table_id_ = table_id;
|
||||
unlock_arg2.owner_id_ = owner_two;
|
||||
unlock_arg2.lock_mode_ = lock_mode_two;
|
||||
unlock_arg2.op_type_ = OUT_TRANS_UNLOCK;
|
||||
unlock_arg2.timeout_us_ = 0;
|
||||
|
||||
ret = MTL(ObTableLockService*)->lock(*tx_desc2,
|
||||
tx_param,
|
||||
lock_arg);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
ret = txs->commit_tx(*tx_desc2, stmt_timeout_ts);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ret = txs->release_tx(*tx_desc2);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// 5. replace lock
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 5");
|
||||
ObArenaAllocator allocator;
|
||||
ObReplaceAllLocksRequest replace_lock_arg(allocator);
|
||||
new_lock_arg.table_id_ = table_id;
|
||||
new_lock_arg.owner_id_ = owner_three;
|
||||
new_lock_arg.lock_mode_ = EXCLUSIVE;
|
||||
new_lock_arg.op_type_ = OUT_TRANS_LOCK;
|
||||
new_lock_arg.timeout_us_ = 0;
|
||||
|
||||
replace_lock_arg.lock_req_ = &new_lock_arg;
|
||||
replace_lock_arg.unlock_req_list_.push_back(&unlock_arg1);
|
||||
replace_lock_arg.unlock_req_list_.push_back(&unlock_arg2);
|
||||
ASSERT_EQ(OB_SUCCESS, txs->acquire_tx(tx_desc3));
|
||||
|
||||
ret = MTL(ObTableLockService*)->replace_lock(*tx_desc3,
|
||||
tx_param,
|
||||
replace_lock_arg);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
// 6. commit rplace lock
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 6");
|
||||
stmt_timeout_ts = ObTimeUtility::current_time() + 1000 * 1000;
|
||||
ret = txs->commit_tx(*tx_desc3, stmt_timeout_ts);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ret = txs->release_tx(*tx_desc3);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// 7. try to lock by origin owner
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 7");
|
||||
ret = MTL(ObTableLockService*)->lock_table(table_id,
|
||||
ROW_SHARE,
|
||||
owner_one);
|
||||
ASSERT_EQ(OB_EAGAIN, ret);
|
||||
|
||||
// 8. check new owner
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 8");
|
||||
ret = MTL(ObTableLockService*)->lock_table(table_id,
|
||||
EXCLUSIVE,
|
||||
owner_three);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// 9. unlock
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 9");
|
||||
ASSERT_EQ(OB_SUCCESS, txs->acquire_tx(tx_desc4));
|
||||
unlock_arg2.owner_id_ = owner_three;
|
||||
unlock_arg2.lock_mode_ = EXCLUSIVE;
|
||||
ret = MTL(ObTableLockService *)->unlock(*tx_desc4, tx_param, unlock_arg2);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
stmt_timeout_ts = ObTimeUtility::current_time() + 1000 * 1000;
|
||||
ret = txs->commit_tx(*tx_desc4, stmt_timeout_ts);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ret = txs->release_tx(*tx_desc4);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// 10. try to lock by origin owner
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 10");
|
||||
ret = MTL(ObTableLockService*)->lock_table(table_id,
|
||||
check_lock_mode,
|
||||
owner_one);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
// 11. check new owner
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 11");
|
||||
ret = MTL(ObTableLockService*)->lock_table(table_id,
|
||||
check_lock_mode,
|
||||
owner_three);
|
||||
ASSERT_EQ(OB_EAGAIN, ret);
|
||||
|
||||
// 12. unlock by origin owner
|
||||
LOG_INFO("ObTableLockServiceTest::replace_all_locks 12");
|
||||
ret = MTL(ObTableLockService*)->unlock_table(table_id,
|
||||
check_lock_mode,
|
||||
owner_one);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
}
|
||||
}// end unittest
|
||||
} // end oceanbase
|
||||
|
||||
@ -1900,7 +2072,7 @@ TEST_F(ObTableLockServiceTest, replace_lock_and_unlock_concurrency)
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
oceanbase::unittest::init_log_and_gtest(argc, argv);
|
||||
OB_LOGGER.set_log_level(OB_LOG_LEVEL_ERROR);
|
||||
OB_LOGGER.set_log_level(OB_LOG_LEVEL_INFO);
|
||||
OB_LOGGER.set_mod_log_levels("STORAGE.TABLELOCK:DEBUG");
|
||||
OB_LOGGER.set_enable_async_log(false);
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
|
@ -420,7 +420,8 @@ int ObInnerSqlRpcP::process()
|
||||
case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_ALONE_TABLET:
|
||||
case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_OBJS:
|
||||
case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_OBJS:
|
||||
case ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCK: {
|
||||
case ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCK:
|
||||
case ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCKS: {
|
||||
if (OB_FAIL(ObInnerConnectionLockUtil::process_lock_rpc(transmit_arg, conn))) {
|
||||
LOG_WARN("process lock rpc failed", K(ret), K(transmit_arg.get_operation_type()));
|
||||
}
|
||||
|
@ -53,6 +53,7 @@ public:
|
||||
OPERATION_TYPE_LOCK_OBJS = 19,
|
||||
OPERATION_TYPE_UNLOCK_OBJS = 20,
|
||||
OPERATION_TYPE_REPLACE_LOCK = 21,
|
||||
OPERATION_TYPE_REPLACE_LOCKS = 22,
|
||||
OPERATION_TYPE_MAX = 100
|
||||
};
|
||||
|
||||
|
@ -161,7 +161,13 @@ int ObInnerConnectionLockUtil::process_lock_rpc(
|
||||
}
|
||||
case ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCK: {
|
||||
if (OB_FAIL(process_replace_lock_(arg, inner_conn))) {
|
||||
LOG_WARN("process lock table failed", K(ret));
|
||||
LOG_WARN("process replace lock failed", K(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCKS: {
|
||||
if (OB_FAIL(process_replace_all_locks_(arg, inner_conn))) {
|
||||
LOG_WARN("process replace all locks failed", K(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -288,6 +294,28 @@ int ObInnerConnectionLockUtil::process_replace_lock_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObInnerConnectionLockUtil::process_replace_all_locks_(
|
||||
const ObInnerSQLTransmitArg &arg,
|
||||
observer::ObInnerSQLConnection *conn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObArenaAllocator allocator;
|
||||
ObReplaceAllLocksRequest replace_req(allocator);
|
||||
const char *buf = arg.get_inner_sql().ptr();
|
||||
const int64_t data_len = arg.get_inner_sql().length();
|
||||
int64_t pos = 0;
|
||||
if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_3_4_0) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("cluster version check faild", KR(ret), K(GET_MIN_CLUSTER_VERSION()));
|
||||
} else if (OB_FAIL(replace_req.deserialize(buf, data_len, pos))) {
|
||||
LOG_WARN("deserialize and check header of ObReplaceLockRequest failed", K(ret), K(arg), K(pos));
|
||||
} else if (OB_FAIL(replace_lock(arg.get_tenant_id(), replace_req, conn))) {
|
||||
LOG_WARN("replace all locks failed", K(ret), K(replace_req));
|
||||
}
|
||||
replace_req.reset();
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObInnerConnectionLockUtil::lock_table(
|
||||
const uint64_t tenant_id,
|
||||
const uint64_t table_id,
|
||||
@ -594,6 +622,61 @@ int ObInnerConnectionLockUtil::replace_lock(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObInnerConnectionLockUtil::replace_lock(const uint64_t tenant_id,
|
||||
const ObReplaceAllLocksRequest &req,
|
||||
observer::ObInnerSQLConnection *conn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
observer::ObReqTimeGuard req_timeinfo_guard;
|
||||
|
||||
const bool local_execute = conn->is_local_execute(GCONF.cluster_id, tenant_id);
|
||||
|
||||
SMART_VAR(ObInnerSQLResult, res, conn->get_session())
|
||||
{
|
||||
if (OB_INVALID_ID == tenant_id) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id));
|
||||
} else if (local_execute) {
|
||||
if (OB_FAIL(conn->switch_tenant(tenant_id))) {
|
||||
LOG_WARN("set system tenant id failed", K(ret), K(tenant_id));
|
||||
}
|
||||
} else {
|
||||
LOG_DEBUG("tenant not in server", K(ret), K(tenant_id));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (!conn->is_in_trans()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("inner conn must be already in trans", K(ret));
|
||||
} else if (OB_FAIL(res.init(local_execute))) {
|
||||
LOG_WARN("init result set", K(ret), K(local_execute));
|
||||
} else if (local_execute) {
|
||||
if (OB_FAIL(replace_lock_(tenant_id, req, conn, res))) {
|
||||
LOG_WARN("replace lock failed", KR(ret), K(req));
|
||||
}
|
||||
} else {
|
||||
char *tmp_str = nullptr;
|
||||
int64_t pos = 0;
|
||||
ObString sql;
|
||||
if (OB_ISNULL(tmp_str = static_cast<char *>(ob_malloc(req.get_serialize_size(), "InnerLock")))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("alloc memory for sql_str failed", K(ret), K(req.get_serialize_size()));
|
||||
} else if (OB_FAIL(req.serialize(tmp_str, req.get_serialize_size(), pos))) {
|
||||
LOG_WARN("serialize replace lock table arg failed", K(ret), K(req));
|
||||
} else {
|
||||
sql.assign_ptr(tmp_str, pos);
|
||||
ret = conn->forward_request(tenant_id, obrpc::ObInnerSQLTransmitArg::OPERATION_TYPE_REPLACE_LOCKS, sql, res);
|
||||
}
|
||||
|
||||
if (OB_NOT_NULL(tmp_str)) {
|
||||
ob_free(tmp_str);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObInnerConnectionLockUtil::replace_lock_(
|
||||
const uint64_t tenant_id,
|
||||
const ObReplaceLockRequest &req,
|
||||
@ -628,6 +711,40 @@ int ObInnerConnectionLockUtil::replace_lock_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObInnerConnectionLockUtil::replace_lock_(
|
||||
const uint64_t tenant_id,
|
||||
const ObReplaceAllLocksRequest &req,
|
||||
observer::ObInnerSQLConnection *conn,
|
||||
observer::ObInnerSQLResult &res)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
transaction::ObTxDesc *tx_desc = nullptr;
|
||||
|
||||
if (OB_ISNULL(conn)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("Invalid conn", KR(ret));
|
||||
} else if (OB_ISNULL(tx_desc = conn->get_session().get_tx_desc())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("Invalid tx_desc");
|
||||
} else {
|
||||
transaction::ObTxParam tx_param;
|
||||
tx_param.access_mode_ = transaction::ObTxAccessMode::RW;
|
||||
tx_param.isolation_ = conn->get_session().get_tx_isolation();
|
||||
tx_param.cluster_id_ = GCONF.cluster_id;
|
||||
conn->get_session().get_tx_timeout(tx_param.timeout_us_);
|
||||
tx_param.lock_timeout_us_ = conn->get_session().get_trx_lock_timeout();
|
||||
|
||||
MTL_SWITCH(tenant_id) {
|
||||
if (OB_FAIL(MTL(ObTableLockService *)->replace_lock(*tx_desc, tx_param, req))) {
|
||||
LOG_WARN("replace lock failed", K(ret), K(tenant_id), K(req));
|
||||
} else if (OB_FAIL(res.close())) {
|
||||
LOG_WARN("close result set failed", K(ret), K(tenant_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObInnerConnectionLockUtil::create_inner_conn(sql::ObSQLSessionInfo *session_info,
|
||||
common::ObMySQLProxy *sql_proxy,
|
||||
observer::ObInnerSQLConnection *&inner_conn)
|
||||
|
@ -68,9 +68,9 @@ private:
|
||||
const obrpc::ObInnerSQLTransmitArg::InnerSQLOperationType operation_type,
|
||||
const obrpc::ObInnerSQLTransmitArg &arg,
|
||||
observer::ObInnerSQLConnection *conn);
|
||||
static int process_replace_lock_(const obrpc::ObInnerSQLTransmitArg &arg,
|
||||
observer::ObInnerSQLConnection *conn);
|
||||
// --------------------- interface for inner connection client -----------------------
|
||||
static int process_replace_lock_(const obrpc::ObInnerSQLTransmitArg &arg, observer::ObInnerSQLConnection *conn);
|
||||
static int process_replace_all_locks_(const obrpc::ObInnerSQLTransmitArg &arg, observer::ObInnerSQLConnection *conn);
|
||||
// --------------------- interface for inner connection client -----------------------
|
||||
public:
|
||||
static int lock_table(
|
||||
const uint64_t tenant_id,
|
||||
@ -152,6 +152,10 @@ public:
|
||||
const uint64_t tenant_id,
|
||||
const ObReplaceLockRequest &req,
|
||||
observer::ObInnerSQLConnection *conn);
|
||||
static int replace_lock(
|
||||
const uint64_t tenant_id,
|
||||
const ObReplaceAllLocksRequest &req,
|
||||
observer::ObInnerSQLConnection *conn);
|
||||
static int create_inner_conn(sql::ObSQLSessionInfo *session_info,
|
||||
common::ObMySQLProxy *sql_proxy,
|
||||
observer::ObInnerSQLConnection *&inner_conn);
|
||||
@ -167,6 +171,11 @@ private:
|
||||
const ObReplaceLockRequest &req,
|
||||
observer::ObInnerSQLConnection *conn,
|
||||
observer::ObInnerSQLResult &res);
|
||||
static int replace_lock_(
|
||||
const uint64_t tenant_id,
|
||||
const ObReplaceAllLocksRequest &req,
|
||||
observer::ObInnerSQLConnection *conn,
|
||||
observer::ObInnerSQLResult &res);
|
||||
static int do_obj_lock_(
|
||||
const uint64_t tenant_id,
|
||||
const ObLockRequest &arg,
|
||||
|
@ -21,6 +21,14 @@ namespace transaction
|
||||
{
|
||||
namespace tablelock
|
||||
{
|
||||
#define GET_REAL_TYPE_LOCK_REQ(T, allocator, lock_req) \
|
||||
void *ptr = static_cast<T *>(allocator.alloc(sizeof(T))); \
|
||||
if (OB_ISNULL(ptr)) { \
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED; \
|
||||
LOG_WARN("failed to allocate lock_req", KR(ret)); \
|
||||
} else { \
|
||||
lock_req = new (ptr) T(); \
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObLockParam,
|
||||
lock_id_,
|
||||
@ -138,6 +146,150 @@ OB_DEF_DESERIALIZE(ObTableLockTaskRequest)
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_DEF_SERIALIZE_SIZE(ObReplaceAllLocksRequest)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t len = 0;
|
||||
int64_t count = 0;
|
||||
if (OB_ISNULL(lock_req_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("lock_req should not be null", K(ret), KP(lock_req_));
|
||||
} else {
|
||||
OB_UNIS_ADD_LEN(*lock_req_);
|
||||
count = unlock_req_list_.count();
|
||||
OB_UNIS_ADD_LEN(count);
|
||||
for (int64_t i = 0; i < count && OB_SUCC(ret); i++) {
|
||||
if (OB_ISNULL(unlock_req_list_.at(i))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unlock_req should not be null", K(ret));
|
||||
} else {
|
||||
OB_UNIS_ADD_LEN(*unlock_req_list_.at(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
OB_DEF_SERIALIZE(ObReplaceAllLocksRequest)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t count = 0;
|
||||
if (OB_ISNULL(lock_req_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("lock_req should not be null", K(ret), KP(lock_req_));
|
||||
} else {
|
||||
OB_UNIS_ENCODE(*lock_req_);
|
||||
count = unlock_req_list_.count();
|
||||
OB_UNIS_ENCODE(count);
|
||||
for (int64_t i = 0; i < count && OB_SUCC(ret); i++) {
|
||||
if (OB_ISNULL(unlock_req_list_.at(i))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unlock_req should not be null", K(ret));
|
||||
} else {
|
||||
OB_UNIS_ENCODE(*unlock_req_list_.at(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_DEF_DESERIALIZE(ObReplaceAllLocksRequest)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLockRequest lock_req;
|
||||
int64_t cnt = 0;
|
||||
int64_t tmp_pos = 0;
|
||||
void *ptr = nullptr;
|
||||
ObUnLockRequest unlock_req;
|
||||
|
||||
if (OB_FAIL(common::serialization::decode(buf, data_len, tmp_pos, lock_req))) {
|
||||
LOG_WARN("deserialize lock_req failed", K(ret), K(data_len), K(tmp_pos), KPHEX(buf, data_len));
|
||||
} else {
|
||||
switch(lock_req.type_) {
|
||||
case ObLockRequest::ObLockMsgType::LOCK_OBJ_REQ: {
|
||||
GET_REAL_TYPE_LOCK_REQ(ObLockObjsRequest, allocator_, lock_req_);
|
||||
break;
|
||||
}
|
||||
case ObLockRequest::ObLockMsgType::LOCK_TABLE_REQ: {
|
||||
GET_REAL_TYPE_LOCK_REQ(ObLockTableRequest, allocator_, lock_req_);
|
||||
break;
|
||||
}
|
||||
case ObLockRequest::ObLockMsgType::LOCK_PARTITION_REQ: {
|
||||
GET_REAL_TYPE_LOCK_REQ(ObLockPartitionRequest, allocator_, lock_req_);
|
||||
break;
|
||||
}
|
||||
case ObLockRequest::ObLockMsgType::LOCK_TABLET_REQ: {
|
||||
GET_REAL_TYPE_LOCK_REQ(ObLockTabletsRequest, allocator_, lock_req_);
|
||||
break;
|
||||
}
|
||||
case ObLockRequest::ObLockMsgType::LOCK_ALONE_TABLET_REQ: {
|
||||
GET_REAL_TYPE_LOCK_REQ(ObLockAloneTabletRequest, allocator_, lock_req_);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("meet unexpected type of lock_req in ObReplaceAllLocksRequest", K(lock_req));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(lock_req_->deserialize(buf, data_len, pos))) {
|
||||
LOG_WARN("deserialize for lock_req failed", K(ret));
|
||||
allocator_.free(lock_req_);
|
||||
lock_req_ = nullptr;
|
||||
} else {
|
||||
OB_UNIS_DECODE(cnt);
|
||||
for (int64_t i = 0; i < cnt && OB_SUCC(ret); i++) {
|
||||
unlock_req.reset();
|
||||
tmp_pos = pos;
|
||||
ObLockRequest *unlock_req_ptr = nullptr;
|
||||
if (OB_FAIL(common::serialization::decode(buf, data_len, tmp_pos, unlock_req))) {
|
||||
LOG_WARN("deserialize lock_req failed", K(ret));
|
||||
} else {
|
||||
switch (unlock_req.type_) {
|
||||
case ObLockRequest::ObLockMsgType::UNLOCK_OBJ_REQ: {
|
||||
GET_REAL_TYPE_LOCK_REQ(ObUnLockObjsRequest, allocator_, unlock_req_ptr);
|
||||
break;
|
||||
}
|
||||
case ObLockRequest::ObLockMsgType::UNLOCK_TABLE_REQ: {
|
||||
GET_REAL_TYPE_LOCK_REQ(ObUnLockTableRequest, allocator_, unlock_req_ptr);
|
||||
break;
|
||||
}
|
||||
case ObLockRequest::ObLockMsgType::UNLOCK_PARTITION_REQ: {
|
||||
GET_REAL_TYPE_LOCK_REQ(ObLockPartitionRequest, allocator_, unlock_req_ptr);
|
||||
break;
|
||||
}
|
||||
case ObLockRequest::ObLockMsgType::UNLOCK_TABLET_REQ: {
|
||||
GET_REAL_TYPE_LOCK_REQ(ObLockTabletsRequest, allocator_, unlock_req_ptr);
|
||||
break;
|
||||
}
|
||||
case ObLockRequest::ObLockMsgType::UNLOCK_ALONE_TABLET_REQ: {
|
||||
GET_REAL_TYPE_LOCK_REQ(ObLockAloneTabletRequest, allocator_, unlock_req_ptr);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("meet unexpected type of unlock_req in ObReplaceAllLocksRequest", K(unlock_req));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(unlock_req_ptr->deserialize(buf, data_len, pos))) {
|
||||
LOG_WARN("deserialize for unlock_req failed", K(ret));
|
||||
allocator_.free(unlock_req_ptr);
|
||||
} else if (OB_FAIL(unlock_req_list_.push_back(unlock_req_ptr))) {
|
||||
LOG_WARN("push unlock_req into list failed", K(ret));
|
||||
allocator_.free(unlock_req_ptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
reset();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int TxDescHelper::deserialize_tx_desc(DESERIAL_PARAMS, ObTxDesc *&tx_desc)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -187,7 +339,8 @@ int ObLockParam::set(
|
||||
const int64_t schema_version,
|
||||
const bool is_deadlock_avoid_enabled,
|
||||
const bool is_try_lock,
|
||||
const int64_t expired_time)
|
||||
const int64_t expired_time,
|
||||
const bool is_for_replace)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!lock_id.is_valid()) ||
|
||||
@ -205,6 +358,7 @@ int ObLockParam::set(
|
||||
is_try_lock_ = is_try_lock;
|
||||
expired_time_ = expired_time;
|
||||
schema_version_ = schema_version;
|
||||
is_for_replace_ = is_for_replace;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -619,6 +773,46 @@ int ObReplaceLockRequest::deserialize_new_lock_mode_and_owner(DESERIAL_PARAMS)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObReplaceAllLocksRequest::reset()
|
||||
{
|
||||
if (OB_NOT_NULL(lock_req_)) {
|
||||
allocator_.free(lock_req_);
|
||||
lock_req_ = nullptr;
|
||||
}
|
||||
for (int64_t i = 0; i < unlock_req_list_.count(); i++) {
|
||||
ObLockRequest *unlock_req_ptr = unlock_req_list_.at(i);
|
||||
if (OB_NOT_NULL(unlock_req_ptr)) {
|
||||
allocator_.free(unlock_req_ptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool ObReplaceAllLocksRequest::is_valid() const
|
||||
{
|
||||
bool is_valid = false;
|
||||
if (OB_NOT_NULL(lock_req_)) {
|
||||
is_valid = lock_req_->is_valid();
|
||||
} else {
|
||||
is_valid = false;
|
||||
}
|
||||
|
||||
if (is_valid) {
|
||||
if (unlock_req_list_.count() == 0) {
|
||||
is_valid = false;
|
||||
} else {
|
||||
for (int64_t i = 0; i < unlock_req_list_.count() && is_valid; i++) {
|
||||
ObLockRequest *unlock_req_ptr = unlock_req_list_.at(i);
|
||||
if (OB_NOT_NULL(unlock_req_ptr)) {
|
||||
is_valid = is_valid && unlock_req_ptr->is_valid();
|
||||
} else {
|
||||
is_valid = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return is_valid;
|
||||
}
|
||||
|
||||
int ObTableLockTaskRequest::set(
|
||||
const ObTableLockTaskType task_type,
|
||||
const share::ObLSID &lsid,
|
||||
|
@ -63,6 +63,7 @@ enum ObTableLockTaskType
|
||||
REPLACE_LOCK_SUBPARTITION = 21,
|
||||
REPLACE_LOCK_OBJECTS = 22,
|
||||
REPLACE_LOCK_ALONE_TABLET = 23,
|
||||
REPLACE_ALL_LOCKS = 24,
|
||||
MAX_TASK_TYPE,
|
||||
};
|
||||
|
||||
@ -111,15 +112,15 @@ public:
|
||||
{}
|
||||
virtual ~ObLockParam() { reset(); }
|
||||
void reset();
|
||||
int set(
|
||||
const ObLockID &lock_id,
|
||||
const ObTableLockMode lock_mode,
|
||||
const ObTableLockOwnerID &owner_id,
|
||||
const ObTableLockOpType op_type,
|
||||
const int64_t schema_version,
|
||||
const bool is_deadlock_avoid_enabled = false,
|
||||
const bool is_try_lock = true,
|
||||
const int64_t expired_time = 0);
|
||||
int set(const ObLockID &lock_id,
|
||||
const ObTableLockMode lock_mode,
|
||||
const ObTableLockOwnerID &owner_id,
|
||||
const ObTableLockOpType op_type,
|
||||
const int64_t schema_version,
|
||||
const bool is_deadlock_avoid_enabled = false,
|
||||
const bool is_try_lock = true,
|
||||
const int64_t expired_time = 0,
|
||||
const bool is_for_replace = false);
|
||||
bool is_valid() const;
|
||||
TO_STRING_KV(K_(lock_id),
|
||||
K_(lock_mode),
|
||||
@ -429,7 +430,6 @@ struct ObReplaceLockRequest
|
||||
{
|
||||
OB_UNIS_VERSION_V(1);
|
||||
public:
|
||||
public:
|
||||
ObReplaceLockRequest() :
|
||||
new_lock_mode_(MAX_LOCK_MODE), new_lock_owner_(), unlock_req_(nullptr)
|
||||
{}
|
||||
@ -446,6 +446,26 @@ public:
|
||||
ObLockRequest *unlock_req_;
|
||||
};
|
||||
|
||||
struct ObReplaceAllLocksRequest
|
||||
{
|
||||
OB_UNIS_VERSION_V(1);
|
||||
public:
|
||||
ObReplaceAllLocksRequest(common::ObIAllocator &allocator) :
|
||||
lock_req_(nullptr), unlock_req_list_(), allocator_(allocator)
|
||||
{}
|
||||
~ObReplaceAllLocksRequest() { reset(); }
|
||||
void reset();
|
||||
bool is_valid() const;
|
||||
int deserialize_and_check_header(DESERIAL_PARAMS);
|
||||
int deserialize_new_lock_mode_and_owner(DESERIAL_PARAMS);
|
||||
VIRTUAL_TO_STRING_KV(K_(lock_req), K_(unlock_req_list));
|
||||
|
||||
public:
|
||||
ObLockRequest *lock_req_;
|
||||
ObSArray<ObLockRequest *> unlock_req_list_;
|
||||
common::ObIAllocator &allocator_;
|
||||
};
|
||||
|
||||
class ObTableLockTaskRequest final
|
||||
{
|
||||
OB_UNIS_VERSION(1);
|
||||
|
@ -60,7 +60,8 @@ ObTableLockService::ObTableLockCtx::ObTableLockCtx() :
|
||||
lock_owner_(),
|
||||
schema_version_(-1),
|
||||
tx_is_killed_(false),
|
||||
is_from_sql_(false)
|
||||
is_from_sql_(false),
|
||||
is_for_replace_(false)
|
||||
{}
|
||||
|
||||
void ObTableLockService::ObRetryCtx::reuse()
|
||||
@ -798,7 +799,8 @@ int ObTableLockService::lock_partition_or_subpartition(ObTxDesc &tx_desc,
|
||||
|
||||
int ObTableLockService::lock(ObTxDesc &tx_desc,
|
||||
const ObTxParam &tx_param,
|
||||
const ObLockRequest &arg)
|
||||
const ObLockRequest &arg,
|
||||
const bool is_for_replace)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -816,6 +818,9 @@ int ObTableLockService::lock(ObTxDesc &tx_desc,
|
||||
if (OB_FAIL(ctx.set_by_lock_req(arg))) {
|
||||
LOG_WARN("set ObTableLockCtx failed", K(ret), K(arg));
|
||||
} else {
|
||||
if (is_for_replace) {
|
||||
ctx.is_for_replace_ = true;
|
||||
}
|
||||
ctx.is_in_trans_ = true;
|
||||
ctx.tx_desc_ = &tx_desc;
|
||||
ctx.tx_param_ = tx_param;
|
||||
@ -857,8 +862,7 @@ int ObTableLockService::replace_lock(ObTxDesc &tx_desc,
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tx_desc), K(replace_req), K(tx_desc.is_valid()),
|
||||
K(tx_param.is_valid()), K(replace_req.is_valid()));
|
||||
// FIXME: change to 431 later
|
||||
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_3_0_0))) {
|
||||
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_3_4_0))) {
|
||||
LOG_WARN("cluster version check failed", K(ret), K(replace_req));
|
||||
} else {
|
||||
ObReplaceTableLockCtx ctx;
|
||||
@ -879,6 +883,37 @@ int ObTableLockService::replace_lock(ObTxDesc &tx_desc,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLockService::replace_lock(ObTxDesc &tx_desc,
|
||||
const ObTxParam &tx_param,
|
||||
const ObReplaceAllLocksRequest &replace_req)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("lock service is not inited", K(ret));
|
||||
} else if (OB_UNLIKELY(!tx_desc.is_valid()) ||
|
||||
OB_UNLIKELY(!tx_param.is_valid()) ||
|
||||
OB_UNLIKELY(!replace_req.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(tx_desc), K(replace_req), K(tx_desc.is_valid()),
|
||||
K(tx_param.is_valid()), K(replace_req.is_valid()));
|
||||
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_3_4_0))) {
|
||||
LOG_WARN("cluster version check failed", K(ret), K(replace_req));
|
||||
} else {
|
||||
for (int64_t i = 0; i < replace_req.unlock_req_list_.count() && OB_SUCC(ret); i++) {
|
||||
if (OB_FAIL(unlock(tx_desc, tx_param, *replace_req.unlock_req_list_.at(i)))) {
|
||||
LOG_WARN("unlock in replace failed", K(ret), K(replace_req));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(lock(tx_desc, tx_param, *replace_req.lock_req_, true))){
|
||||
LOG_WARN("lock in replace failed", K(ret), K(replace_req));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLockService::garbage_collect_right_now()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1637,9 +1672,6 @@ int ObTableLockService::pack_batch_request_(ObTableLockCtx &ctx,
|
||||
ObLockParam lock_param;
|
||||
if (OB_FAIL(request.init(task_type, ls_id, ctx.tx_desc_))) {
|
||||
LOG_WARN("request init failed", K(ret), K(ctx), K(ls_id), KP(ctx.tx_desc_), K(lock_ids), K(task_type));
|
||||
} else if (ctx.is_replace_task()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("lock_param is not compatible with request", K(ret), K(ctx), K(task_type));
|
||||
} else {
|
||||
for (int i = 0; i < lock_ids.count() && OB_SUCC(ret); ++i) {
|
||||
lock_param.reset();
|
||||
@ -1650,7 +1682,8 @@ int ObTableLockService::pack_batch_request_(ObTableLockCtx &ctx,
|
||||
ctx.schema_version_,
|
||||
ctx.is_deadlock_avoid_enabled(),
|
||||
ctx.is_try_lock(),
|
||||
ctx.abs_timeout_ts_))) {
|
||||
ctx.abs_timeout_ts_,
|
||||
ctx.is_for_replace_))) {
|
||||
LOG_WARN("get lock param failed", K(ret));
|
||||
} else if (OB_FAIL(request.params_.push_back(lock_param))) {
|
||||
LOG_WARN("get lock request failed", K(ret), K(lock_param));
|
||||
@ -1809,7 +1842,7 @@ int ObTableLockService::pack_and_call_rpc_(RpcProxy &proxy_batch,
|
||||
int ret = OB_SUCCESS;
|
||||
ObLockTaskBatchRequest<ObLockParam> request;
|
||||
if (OB_FAIL(pack_batch_request_(ctx, ctx.task_type_, ls_id, lock_ids, request))) {
|
||||
LOG_WARN("pack_request_ failed", K(ret), K(ls_id), K(ctx), K(lock_ids));
|
||||
LOG_WARN("pack_batch_request_ failed", K(ret), K(ls_id), K(ctx), K(lock_ids));
|
||||
} else if (OB_FAIL(rpc_call_(proxy_batch, addr, ctx.get_rpc_timeoutus(), request))) {
|
||||
LOG_WARN("failed to call async rpc", KR(ret), K(addr), K(ctx.abs_timeout_ts_), K(request));
|
||||
} else {
|
||||
@ -1831,7 +1864,7 @@ int ObTableLockService::pack_and_call_rpc_(obrpc::ObBatchReplaceLockProxy &proxy
|
||||
int ret = OB_SUCCESS;
|
||||
ObLockTaskBatchRequest<ObReplaceLockParam> request;
|
||||
if (OB_FAIL(pack_batch_request_(ctx, ctx.task_type_, ls_id, lock_ids, request))) {
|
||||
LOG_WARN("pack_request_ failed", K(ret), K(ls_id), K(ctx), K(lock_ids));
|
||||
LOG_WARN("pack_batch_request_ failed", K(ret), K(ls_id), K(ctx), K(lock_ids));
|
||||
} else if (OB_FAIL(rpc_call_(proxy_batch, addr, ctx.get_rpc_timeoutus(), request))) {
|
||||
LOG_WARN("failed to call async rpc", KR(ret), K(addr), K(ctx.abs_timeout_ts_), K(request));
|
||||
} else {
|
||||
@ -1923,54 +1956,6 @@ int ObTableLockService::batch_rpc_handle_(RpcProxy &proxy_batch,
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <class RpcProxy>
|
||||
int ObTableLockService::parallel_rpc_handle_(RpcProxy &proxy_batch,
|
||||
ObTableLockCtx &ctx,
|
||||
const LockMap &lock_map,
|
||||
const ObLSLockMap &ls_lock_map)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t timeout_us = 0;
|
||||
bool unused = false;
|
||||
ObRetryCtx retry_ctx;
|
||||
ObAddr addr;
|
||||
ObTableLockTaskRequest request;
|
||||
FOREACH_X(lock, lock_map, OB_SUCC(ret)) {
|
||||
proxy_batch.reuse();
|
||||
retry_ctx.reuse();
|
||||
addr.reset();
|
||||
request.reset();
|
||||
const ObLockID &lock_id = lock->first;
|
||||
const share::ObLSID &ls_id = lock->second;
|
||||
|
||||
if (OB_FAIL(retry_ctx.rpc_ls_array_.push_back(ls_id))) {
|
||||
LOG_WARN("push_back lsid failed", K(ret), K(ls_id));
|
||||
} else if (OB_FAIL(pack_request_(ctx, ctx.task_type_, lock_id, ls_id, addr, request))) {
|
||||
LOG_WARN("pack_request_ failed", K(ret), K(ls_id), K(lock_id));
|
||||
} else if (FALSE_IT(timeout_us = ctx.get_rpc_timeoutus())) {
|
||||
} else if (ctx.is_timeout()) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("process obj lock timeout", K(ret), K(ctx));
|
||||
} else if (OB_FAIL(rpc_call_(proxy_batch,
|
||||
addr,
|
||||
timeout_us,
|
||||
request))) {
|
||||
LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(timeout_us), K(request));
|
||||
} else {
|
||||
retry_ctx.send_rpc_count_++;
|
||||
ALLOW_NEXT_LOG();
|
||||
LOG_INFO("send table lock rpc", KR(ret), K(retry_ctx.send_rpc_count_),
|
||||
K(addr), "request", request);
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
(void)collect_rollback_info_(retry_ctx.rpc_ls_array_, proxy_batch, ctx);
|
||||
} else {
|
||||
ret = handle_parallel_rpc_response_(proxy_batch, ctx, ls_lock_map, unused, retry_ctx);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLockService::inner_process_obj_lock_batch_(ObTableLockCtx &ctx,
|
||||
const ObLSLockMap &lock_map)
|
||||
{
|
||||
@ -1991,36 +1976,12 @@ int ObTableLockService::inner_process_obj_lock_batch_(ObTableLockCtx &ctx,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLockService::inner_process_obj_lock_old_version_(ObTableLockCtx &ctx,
|
||||
const LockMap &lock_map,
|
||||
const ObLSLockMap &ls_lock_map)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
obrpc::ObSrvRpcProxy rpc_proxy(*GCTX.srv_rpc_proxy_);
|
||||
rpc_proxy.set_detect_session_killed(true);
|
||||
// TODO: yanyuan.cxf we process the rpc one by one and do parallel later.
|
||||
if (ctx.is_unlock_task()) {
|
||||
ObHighPriorityTableLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::unlock_table);
|
||||
ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, ls_lock_map);
|
||||
} else {
|
||||
ObTableLockProxy proxy_batch(rpc_proxy, &obrpc::ObSrvRpcProxy::lock_table);
|
||||
ret = parallel_rpc_handle_(proxy_batch, ctx, lock_map, ls_lock_map);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLockService::inner_process_obj_lock_(ObTableLockCtx &ctx,
|
||||
const LockMap &lock_map,
|
||||
const ObLSLockMap &ls_lock_map)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_4_0_0_0) {
|
||||
ret = inner_process_obj_lock_batch_(ctx, ls_lock_map);
|
||||
} else {
|
||||
ret = inner_process_obj_lock_old_version_(ctx, lock_map, ls_lock_map);
|
||||
}
|
||||
|
||||
ret = inner_process_obj_lock_batch_(ctx, ls_lock_map);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -122,6 +122,7 @@ private:
|
||||
|
||||
// use to kill the whole lock table stmt.
|
||||
transaction::ObTxSEQ stmt_savepoint_;
|
||||
bool is_for_replace_;
|
||||
|
||||
TO_STRING_KV(K(task_type_), K(is_in_trans_), K(table_id_), K(partition_id_),
|
||||
K(tablet_list_), K(obj_list_), K(lock_op_type_),
|
||||
@ -130,7 +131,8 @@ private:
|
||||
K(current_savepoint_), K(need_rollback_ls_),
|
||||
K(lock_mode_), K(lock_owner_),
|
||||
K(schema_version_), K(tx_is_killed_),
|
||||
K(is_from_sql_), K(ret_code_before_end_stmt_or_tx_), K(stmt_savepoint_));
|
||||
K(is_from_sql_), K(ret_code_before_end_stmt_or_tx_),
|
||||
K(stmt_savepoint_), K_(is_for_replace));
|
||||
};
|
||||
|
||||
class ObReplaceTableLockCtx : public ObTableLockCtx
|
||||
@ -260,7 +262,8 @@ public:
|
||||
ObLockPartitionRequest &arg);
|
||||
int lock(ObTxDesc &tx_desc,
|
||||
const ObTxParam &tx_param,
|
||||
const ObLockRequest &arg);
|
||||
const ObLockRequest &arg,
|
||||
const bool is_for_replace = false);
|
||||
int unlock(ObTxDesc &tx_desc,
|
||||
const ObTxParam &tx_param,
|
||||
const ObUnLockRequest &arg);
|
||||
@ -268,6 +271,9 @@ public:
|
||||
int replace_lock(ObTxDesc &tx_desc,
|
||||
const ObTxParam &tx_param,
|
||||
const ObReplaceLockRequest &replace_req);
|
||||
int replace_lock(ObTxDesc &tx_desc,
|
||||
const ObTxParam &tx_param,
|
||||
const ObReplaceAllLocksRequest &replace_req);
|
||||
int garbage_collect_right_now();
|
||||
int get_obj_lock_garbage_collector(ObOBJLockGarbageCollector *&obj_lock_garbage_collector);
|
||||
|
||||
@ -344,11 +350,6 @@ private:
|
||||
const ObLockIDArray &lock_ids,
|
||||
ObLockTaskBatchRequest<ObReplaceLockParam> &request);
|
||||
template<class RpcProxy>
|
||||
int parallel_rpc_handle_(RpcProxy &proxy_batch,
|
||||
ObTableLockCtx &ctx,
|
||||
const LockMap &lock_map,
|
||||
const ObLSLockMap &ls_lock_map);
|
||||
template<class RpcProxy>
|
||||
int batch_rpc_handle_(RpcProxy &proxy_batch,
|
||||
ObTableLockCtx &ctx,
|
||||
const ObLSLockMap &lock_map);
|
||||
@ -428,9 +429,6 @@ private:
|
||||
int inner_process_obj_lock_(ObTableLockCtx &ctx,
|
||||
const LockMap &lock_map,
|
||||
const ObLSLockMap &ls_lock_map);
|
||||
int inner_process_obj_lock_old_version_(ObTableLockCtx &ctx,
|
||||
const LockMap &lock_map,
|
||||
const ObLSLockMap &ls_lock_map);
|
||||
int inner_process_obj_lock_batch_(ObTableLockCtx &ctx,
|
||||
const ObLSLockMap &ls_lock_map);
|
||||
int process_obj_lock_(ObTableLockCtx &ctx,
|
||||
|
@ -80,14 +80,16 @@ bool unlock_req_is_equal(const ObLockRequest &req1, const ObLockRequest &req2)
|
||||
&& req1.op_type_ == req2.op_type_ && req1.timeout_us_ == req2.timeout_us_;
|
||||
if (is_equal) {
|
||||
switch (req1.type_) {
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_TABLE_REQ: {
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_TABLE_REQ:
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::UNLOCK_TABLE_REQ: {
|
||||
const ObUnLockTableRequest &lock_req1 = static_cast<const ObUnLockTableRequest &>(req1);
|
||||
const ObUnLockTableRequest &lock_req2 = static_cast<const ObUnLockTableRequest &>(req2);
|
||||
is_equal = lock_req1.table_id_ == lock_req2.table_id_;
|
||||
TABLELOCK_LOG(INFO, "compare unlock request", K(lock_req1), K(lock_req2), K(is_equal));
|
||||
break;
|
||||
}
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_TABLET_REQ: {
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_TABLET_REQ:
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::UNLOCK_TABLET_REQ: {
|
||||
const ObUnLockTabletsRequest &lock_req1 = static_cast<const ObUnLockTabletsRequest &>(req1);
|
||||
const ObUnLockTabletsRequest &lock_req2 = static_cast<const ObUnLockTabletsRequest &>(req2);
|
||||
is_equal =
|
||||
@ -95,21 +97,24 @@ bool unlock_req_is_equal(const ObLockRequest &req1, const ObLockRequest &req2)
|
||||
TABLELOCK_LOG(INFO, "compare unlock request", K(lock_req1), K(lock_req2), K(is_equal));
|
||||
break;
|
||||
}
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_PARTITION_REQ: {
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_PARTITION_REQ:
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::UNLOCK_PARTITION_REQ: {
|
||||
const ObUnLockPartitionRequest &lock_req1 = static_cast<const ObUnLockPartitionRequest &>(req1);
|
||||
const ObUnLockPartitionRequest &lock_req2 = static_cast<const ObUnLockPartitionRequest &>(req2);
|
||||
is_equal = lock_req1.table_id_ == lock_req2.table_id_ && lock_req1.part_object_id_ == lock_req2.part_object_id_;
|
||||
TABLELOCK_LOG(INFO, "compare unlock request", K(lock_req1), K(lock_req2), K(is_equal));
|
||||
break;
|
||||
}
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_OBJ_REQ: {
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_OBJ_REQ:
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::UNLOCK_OBJ_REQ: {
|
||||
const ObUnLockObjsRequest &lock_req1 = static_cast<const ObUnLockObjsRequest &>(req1);
|
||||
const ObUnLockObjsRequest &lock_req2 = static_cast<const ObUnLockObjsRequest &>(req2);
|
||||
is_equal = list_is_equal(lock_req1.objs_, lock_req1.objs_);
|
||||
TABLELOCK_LOG(INFO, "compare unlock request", K(lock_req1), K(lock_req2), K(is_equal));
|
||||
break;
|
||||
}
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_ALONE_TABLET_REQ: {
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::LOCK_ALONE_TABLET_REQ:
|
||||
case transaction::tablelock::ObLockRequest::ObLockMsgType::UNLOCK_ALONE_TABLET_REQ: {
|
||||
const ObUnLockAloneTabletRequest &lock_req1 = static_cast<const ObUnLockAloneTabletRequest &>(req1);
|
||||
const ObUnLockAloneTabletRequest &lock_req2 = static_cast<const ObUnLockAloneTabletRequest &>(req2);
|
||||
is_equal = lock_req1.table_id_ == lock_req2.table_id_ && lock_req1.ls_id_ == lock_req2.ls_id_
|
||||
@ -211,6 +216,81 @@ TEST(ObReplaceLockRequest, test_replace_objs)
|
||||
replace_req.unlock_req_ = &unlock_req;
|
||||
CHECK_SERIALIZE_AND_DESERIALIZE(ObUnLockTableRequest);
|
||||
}
|
||||
|
||||
TEST(ObReplaceLockRequest, test_replace_all_locks)
|
||||
{
|
||||
INIT_SUCC(ret);
|
||||
ObArenaAllocator allocator;
|
||||
ObReplaceAllLocksRequest replace_req(allocator);
|
||||
ObReplaceAllLocksRequest des_replace_req(allocator);
|
||||
ObUnLockTableRequest unlock_req1;
|
||||
ObUnLockTableRequest unlock_req2;
|
||||
ObLockTableRequest lock_req;
|
||||
const int LEN = 100;
|
||||
char buf[LEN];
|
||||
int64_t pos = 0;
|
||||
char tmp_buf[LEN];
|
||||
int64_t tmp_pos = 0;
|
||||
bool is_equal = false;
|
||||
|
||||
unlock_req1.type_ = ObLockRequest::ObLockMsgType::UNLOCK_TABLE_REQ;
|
||||
unlock_req1.owner_id_.convert_from_value(1001);
|
||||
unlock_req1.lock_mode_ = ROW_SHARE;
|
||||
unlock_req1.op_type_ = OUT_TRANS_UNLOCK;
|
||||
unlock_req1.timeout_us_ = 1000;
|
||||
unlock_req1.table_id_ = 998;
|
||||
|
||||
unlock_req2.type_ = ObLockRequest::ObLockMsgType::UNLOCK_TABLE_REQ;
|
||||
unlock_req2.owner_id_.convert_from_value(1002);
|
||||
unlock_req2.lock_mode_ = ROW_EXCLUSIVE;
|
||||
unlock_req2.op_type_ = OUT_TRANS_UNLOCK;
|
||||
unlock_req2.timeout_us_ = 1000;
|
||||
unlock_req2.table_id_ = 998;
|
||||
|
||||
lock_req.owner_id_.convert_from_value(1003);
|
||||
lock_req.lock_mode_ = EXCLUSIVE;
|
||||
lock_req.op_type_ = OUT_TRANS_LOCK;
|
||||
lock_req.timeout_us_ = 1000;
|
||||
lock_req.table_id_ = 998;
|
||||
|
||||
replace_req.unlock_req_list_.push_back(&unlock_req1);
|
||||
replace_req.unlock_req_list_.push_back(&unlock_req2);
|
||||
replace_req.lock_req_ = &lock_req;
|
||||
|
||||
ret = replace_req.serialize(buf, LEN, pos);
|
||||
TABLELOCK_LOG(INFO,
|
||||
"1. serailize replace_req",
|
||||
K(ret),
|
||||
K(replace_req),
|
||||
K(unlock_req1),
|
||||
K(unlock_req2),
|
||||
K(lock_req),
|
||||
K(pos),
|
||||
KPHEX(buf, pos));
|
||||
ASSERT_EQ(ret, OB_SUCCESS);
|
||||
|
||||
pos = 0;
|
||||
ret = des_replace_req.deserialize(buf, LEN, pos);
|
||||
|
||||
TABLELOCK_LOG(INFO,
|
||||
"2. deserailize replace_req",
|
||||
K(ret),
|
||||
K(replace_req),
|
||||
K(des_replace_req),
|
||||
K(unlock_req1),
|
||||
K(unlock_req2),
|
||||
K(lock_req),
|
||||
K(pos),
|
||||
KPHEX(buf, pos));
|
||||
ASSERT_EQ(ret, OB_SUCCESS);
|
||||
|
||||
is_equal = unlock_req_is_equal(*des_replace_req.lock_req_, *replace_req.lock_req_);
|
||||
ASSERT_TRUE(is_equal);
|
||||
for (int64_t i = 0; i < des_replace_req.unlock_req_list_.count() && is_equal; i++) {
|
||||
is_equal = unlock_req_is_equal(*des_replace_req.unlock_req_list_.at(i), *replace_req.unlock_req_list_.at(i));
|
||||
}
|
||||
ASSERT_TRUE(is_equal);
|
||||
}
|
||||
} // namespace unittest
|
||||
} // namespace oceanbase
|
||||
int main(int argc, char **argv)
|
||||
|
Loading…
x
Reference in New Issue
Block a user