BUGFIX: ddl lock table using lock thread group

This commit is contained in:
obdev
2024-02-09 02:07:09 +00:00
committed by ob-robot
parent c246a053b2
commit f67db3c34f
14 changed files with 432 additions and 78 deletions

View File

@ -392,6 +392,7 @@ TEST_F(ObTableLockServiceTest, lock_part)
ObTableLockOwnerID OWNER_ONE(1);
ObTableLockOwnerID OWNER_TWO(2);
ObLockPartitionRequest lock_arg;
ObUnLockPartitionRequest unlock_arg;
tx_param.access_mode_ = ObTxAccessMode::RW;
tx_param.isolation_ = ObTxIsolationLevel::RC;
@ -460,16 +461,16 @@ TEST_F(ObTableLockServiceTest, lock_part)
get_table_part_ids(table_id, part_ids);
lock_mode = ROW_EXCLUSIVE;
lock_arg.owner_id_ = OWNER_ONE;
lock_arg.lock_mode_ = lock_mode;
lock_arg.op_type_ = OUT_TRANS_UNLOCK;
lock_arg.timeout_us_ = 0;
lock_arg.table_id_ = table_id;
lock_arg.part_object_id_ = part_ids[0];
unlock_arg.owner_id_ = OWNER_ONE;
unlock_arg.lock_mode_ = lock_mode;
unlock_arg.op_type_ = OUT_TRANS_UNLOCK;
unlock_arg.timeout_us_ = 0;
unlock_arg.table_id_ = table_id;
unlock_arg.part_object_id_ = part_ids[0];
ret = MTL(ObTableLockService*)->unlock_partition(*tx_desc,
tx_param,
lock_arg);
unlock_arg);
ASSERT_EQ(OB_SUCCESS, ret);
// commit
LOG_INFO("ObTableLockServiceTest::lock_part 4.2");

View File

@ -1184,15 +1184,17 @@ int ObInnerSQLConnection::register_multi_data_source(const uint64_t &tenant_id,
int ObInnerSQLConnection::forward_request(const uint64_t tenant_id,
const int64_t op_type,
const ObString &sql,
ObInnerSQLResult &res)
ObInnerSQLResult &res,
const int32_t group_id)
{
return forward_request_(tenant_id, op_type, sql, res);
return forward_request_(tenant_id, op_type, sql, res, group_id);
}
int ObInnerSQLConnection::forward_request_(const uint64_t tenant_id,
const int64_t op_type,
const ObString &sql,
ObInnerSQLResult &res)
ObInnerSQLResult &res,
const int32_t group_id)
{
int ret = OB_SUCCESS;
@ -1204,6 +1206,10 @@ int ObInnerSQLConnection::forward_request_(const uint64_t tenant_id,
ObSQLMode sql_mode = 0;
const ObSessionDDLInfo &ddl_info = get_session().get_ddl_info();
bool is_load_data_exec = get_session().is_load_data_exec_session();
int32_t real_group_id = group_id_;
if (0 != group_id) {
real_group_id = group_id;
}
if (is_resource_conn()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("resource_conn of resource_svr still doesn't has the tenant resource", K(ret),
@ -1233,7 +1239,7 @@ int ObInnerSQLConnection::forward_request_(const uint64_t tenant_id,
} else if (OB_FAIL(GCTX.inner_sql_rpc_proxy_->to(get_resource_svr())
.by(tenant_id)
.timeout(query_timeout)
.group_id(group_id_)
.group_id(real_group_id)
.inner_sql_sync_transmit(arg,
*(handler->get_result()),
handler->get_handle()))) {

View File

@ -62,10 +62,10 @@ class ObLockObjRequest;
class ObLockTableRequest;
class ObLockTabletRequest;
class ObLockPartitionRequest;
using ObUnLockObjRequest = ObLockObjRequest;
using ObUnLockTableRequest = ObLockTableRequest;
using ObUnLockPartitionRequest = ObLockPartitionRequest;
using ObUnLockTabletRequest = ObLockTabletRequest;
class ObUnLockObjRequest;
class ObUnLockTableRequest;
class ObUnLockPartitionRequest;
class ObUnLockTabletRequest;
}
}
namespace observer
@ -254,7 +254,8 @@ public:
int forward_request(const uint64_t tenant_id,
const int64_t op_type,
const ObString &sql,
ObInnerSQLResult &res);
ObInnerSQLResult &res,
const int32_t group_id = 0);
public:
// nested session and sql execute for foreign key.
@ -366,7 +367,8 @@ private:
int forward_request_(const uint64_t tenant_id,
const int64_t op_type,
const ObString &sql,
ObInnerSQLResult &res);
ObInnerSQLResult &res,
const int32_t group_id = 0);
int get_session_timeout_for_rpc(int64_t &query_timeout, int64_t &trx_timeout);
private:
bool inited_;

View File

@ -172,6 +172,7 @@ cal_version(const uint64_t major, const uint64_t minor, const uint64_t major_pat
#define CLUSTER_VERSION_4_2_1_0 (oceanbase::common::cal_version(4, 2, 1, 0))
#define CLUSTER_VERSION_4_2_1_1 (oceanbase::common::cal_version(4, 2, 1, 1))
#define CLUSTER_VERSION_4_2_1_2 (oceanbase::common::cal_version(4, 2, 1, 2))
#define CLUSTER_VERSION_4_2_1_3 (oceanbase::common::cal_version(4, 2, 1, 3))
#define CLUSTER_VERSION_4_2_2_0 (oceanbase::common::cal_version(4, 2, 2, 0))
#define CLUSTER_VERSION_4_3_0_0 (oceanbase::common::cal_version(4, 3, 0, 0))
//!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

View File

@ -828,14 +828,13 @@ int ObTransferLockUtil::unlock_tablet_on_src_ls_for_table_lock(
return ret;
}
template<typename LockArg>
int ObTransferLockUtil::process_table_lock_on_tablets_(
ObMySQLTransaction &trans,
const uint64_t tenant_id,
const ObLSID &ls_id,
const transaction::tablelock::ObTableLockOwnerID &lock_owner_id,
const ObDisplayTabletList &table_lock_tablet_list,
LockArg &lock_arg)
ObLockAloneTabletRequest &lock_arg)
{
int ret = OB_SUCCESS;
lock_arg.tablet_ids_.reset();
@ -872,7 +871,7 @@ int ObTransferLockUtil::process_table_lock_on_tablets_(
LOG_WARN("lock tablet failed", KR(ret), K(tenant_id), K(lock_arg));
}
} else if (OUT_TRANS_UNLOCK == lock_arg.op_type_) {
if (OB_FAIL(ObInnerConnectionLockUtil::unlock_tablet(tenant_id, lock_arg, conn))) {
if (OB_FAIL(ObInnerConnectionLockUtil::unlock_tablet(tenant_id, static_cast<ObUnLockAloneTabletRequest &>(lock_arg), conn))) {
LOG_WARN("unock tablet failed", KR(ret), K(tenant_id), K(lock_arg));
}
} else {

View File

@ -28,6 +28,14 @@
namespace oceanbase
{
namespace transaction
{
namespace tablelock
{
class ObLockAloneTabletRequest;
}
}
namespace share
{
namespace schema
@ -455,14 +463,13 @@ public:
const transaction::tablelock::ObTableLockOwnerID &lock_owner_id,
const ObDisplayTabletList &table_lock_tablet_list);
private:
template<typename LockArg>
static int process_table_lock_on_tablets_(
ObMySQLTransaction &trans,
const uint64_t tenant_id,
const ObLSID &ls_id,
const transaction::tablelock::ObTableLockOwnerID &lock_owner_id,
const ObDisplayTabletList &table_lock_tablet_list,
LockArg &lock_arg);
transaction::tablelock::ObLockAloneTabletRequest &lock_arg);
};
} // end namespace share

View File

@ -395,17 +395,23 @@ int ObDDLLock::do_table_lock(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid conn", K(ret));
} else {
ObLockTableRequest arg;
arg.table_id_ = table_id;
arg.owner_id_ = lock_owner;
arg.lock_mode_ = lock_mode;
arg.op_type_ = op_type;
arg.timeout_us_ = timeout_us;
if (is_lock) {
ObLockTableRequest arg;
arg.table_id_ = table_id;
arg.owner_id_ = lock_owner;
arg.lock_mode_ = lock_mode;
arg.op_type_ = op_type;
arg.timeout_us_ = timeout_us;
if (OB_FAIL(ObInnerConnectionLockUtil::lock_table(tenant_id, arg, iconn))) {
LOG_WARN("failed to lock table", K(ret));
}
} else {
ObUnLockTableRequest arg;
arg.table_id_ = table_id;
arg.owner_id_ = lock_owner;
arg.lock_mode_ = lock_mode;
arg.op_type_ = op_type;
arg.timeout_us_ = timeout_us;
if (OB_FAIL(ObInnerConnectionLockUtil::unlock_table(tenant_id, arg, iconn))) {
if (OB_OBJ_LOCK_NOT_EXIST == ret) {
ret = OB_SUCCESS;
@ -450,7 +456,9 @@ int ObDDLLock::do_table_lock(
if (OB_SUCC(ret)) {
ObArray<ObLSID> ls_ids;
ObArray<ObLockAloneTabletRequest> args;
ObArray<ObLockAloneTabletRequest> lock_args;
ObArray<ObUnLockAloneTabletRequest> unlock_args;
int64_t arg_count = 0;
if (OB_FAIL(share::ObTabletToLSTableOperator::batch_get_ls(trans, tenant_id, tablet_ids, ls_ids))) {
LOG_WARN("failed to get tablet ls", K(ret));
} else if (OB_UNLIKELY(ls_ids.count() != tablet_ids.count())) {
@ -460,39 +468,44 @@ int ObDDLLock::do_table_lock(
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) {
const ObLSID &ls_id = ls_ids[i];
int64_t j = 0;
for (; j < args.count(); j++) {
if (args[j].ls_id_ == ls_id) {
for (; j < (is_lock ? lock_args.count() : unlock_args.count()); j++) {
const ObLSID &tmp_ls_id = is_lock ? lock_args[j].ls_id_ : unlock_args[j].ls_id_;
if (tmp_ls_id == ls_id) {
break;
}
}
if (j == args.count()) {
ObLockAloneTabletRequest arg;
arg_count = is_lock ? lock_args.count() : unlock_args.count();
if (j == arg_count) {
ObLockAloneTabletRequest lock_arg;
ObUnLockAloneTabletRequest unlock_arg;
ObLockAloneTabletRequest &arg = is_lock ? lock_arg : unlock_arg;
arg.owner_id_ = lock_owner;
arg.lock_mode_ = lock_mode;
arg.op_type_ = op_type;
arg.timeout_us_ = timeout_us;
arg.ls_id_ = ls_id;
if (OB_FAIL(args.push_back(arg))) {
if (OB_FAIL(is_lock ? lock_args.push_back(lock_arg) : unlock_args.push_back(unlock_arg))) {
LOG_WARN("failed to push back modify arg", K(ret));
}
}
if (OB_SUCC(ret)) {
ObLockAloneTabletRequest &arg = args.at(j);
ObLockAloneTabletRequest &arg = is_lock ? lock_args.at(j) : unlock_args.at(j);
if (OB_FAIL(arg.tablet_ids_.push_back(tablet_ids.at(i)))) {
LOG_WARN("failed to push back", K(ret));
}
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < args.count(); i++) {
arg_count = is_lock ? lock_args.count() : unlock_args.count();
for (int64_t i = 0; OB_SUCC(ret) && i < arg_count; i++) {
if (is_lock) {
if (OB_FAIL(ObInnerConnectionLockUtil::lock_tablet(tenant_id, args[i], iconn))) {
LOG_WARN("failed to lock tablet", K(ret));
if (OB_FAIL(ObInnerConnectionLockUtil::lock_tablet(tenant_id, lock_args[i], iconn))) {
LOG_WARN("failed to lock tablet", K(ret), K(lock_args[i]));
}
} else {
if (OB_FAIL(ObInnerConnectionLockUtil::unlock_tablet(tenant_id, args[i], iconn))) {
if (OB_FAIL(ObInnerConnectionLockUtil::unlock_tablet(tenant_id, unlock_args[i], iconn))) {
if (OB_OBJ_LOCK_NOT_EXIST == ret) {
ret = OB_SUCCESS;
LOG_INFO("table lock already unlocked", K(ret), K(args[i]));
LOG_INFO("table lock already unlocked", K(ret), K(unlock_args[i]));
} else {
LOG_WARN("failed to unlock tablet", K(ret));
}
@ -727,7 +740,7 @@ int ObOnlineDDLLock::unlock_table(
{
int ret = OB_SUCCESS;
ObInnerSQLConnection *iconn = nullptr;
ObLockObjRequest arg;
ObUnLockObjRequest arg;
arg.obj_type_ = ObLockOBJType::OBJ_TYPE_ONLINE_DDL_TABLE;
arg.obj_id_ = table_id;
arg.timeout_us_ = timeout_us;

View File

@ -666,8 +666,19 @@ int ObInnerConnectionLockUtil::request_lock_(
} else if (OB_FAIL(arg.serialize(tmp_str, arg.get_serialize_size(), pos))) {
LOG_WARN("serialize lock table arg failed", K(ret), K(arg));
} else {
int32_t group_id = 0;
const int64_t min_cluster_version = GET_MIN_CLUSTER_VERSION();
if ((min_cluster_version > CLUSTER_VERSION_4_2_1_3 && min_cluster_version < CLUSTER_VERSION_4_2_2_0)
|| (min_cluster_version > CLUSTER_VERSION_4_2_2_0 && min_cluster_version < CLUSTER_VERSION_4_3_0_0)
|| (min_cluster_version >= CLUSTER_VERSION_4_3_0_0)) {
if (arg.is_unlock_request()) {
group_id = share::OBCG_UNLOCK;
} else {
group_id = share::OBCG_LOCK;
}
}
sql.assign_ptr(tmp_str, arg.get_serialize_size());
ret = conn->forward_request(tenant_id, operation_type, sql, res);
ret = conn->forward_request(tenant_id, operation_type, sql, res, group_id);
}
if (OB_NOT_NULL(tmp_str)) {

View File

@ -45,12 +45,12 @@ class ObLockTableRequest;
class ObLockTabletRequest;
class ObLockPartitionRequest;
class ObLockAloneTabletRequest;
using ObUnLockObjRequest = ObLockObjRequest;
using ObUnLockObjsRequest = ObLockObjsRequest;
using ObUnLockTableRequest = ObLockTableRequest;
using ObUnLockPartitionRequest = ObLockPartitionRequest;
using ObUnLockTabletRequest = ObLockTabletRequest;
using ObUnLockAloneTabletRequest = ObLockAloneTabletRequest;
class ObUnLockObjRequest;
class ObUnLockObjsRequest;
class ObUnLockTableRequest;
class ObUnLockPartitionRequest;
class ObUnLockTabletRequest;
class ObUnLockAloneTabletRequest;
class ObInnerConnectionLockUtil
{

View File

@ -39,6 +39,7 @@ using namespace share;
using namespace storage;
using namespace memtable;
using namespace common;
using namespace oceanbase::lib;
namespace transaction
{
@ -146,6 +147,7 @@ int ObLockMemtable::lock_(
// 1. record lock myself(check conflict).
// 2. record lock at memtable ctx.
// 3. create lock callback and list it on the callback list of memtable ctx.
do {
// retry if there is lock conflict at part trans ctx.
need_retry = false;
@ -328,6 +330,7 @@ int ObLockMemtable::unlock_(
// 1. record unlock op myself(check conflict).
// 2. record unlock op at memtable ctx.
// 3. create unlock callback and list it on the callback list of memtable ctx.
Thread::WaitGuard guard(Thread::WAIT);
do {
// retry if there is lock conflict at part trans ctx.
need_retry = false;
@ -549,6 +552,7 @@ int ObLockMemtable::lock(
{
int ret = OB_SUCCESS;
LOG_DEBUG("ObLockMemtable::lock ", K(lock_op));
Thread::WaitGuard guard(Thread::WAIT);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLockMemtable not inited.", K(ret));
@ -575,6 +579,7 @@ int ObLockMemtable::unlock(
// only has OUT_TRANS_UNLOCK
int ret = OB_SUCCESS;
LOG_DEBUG("ObLockMemtable::unlock ", K(unlock_op));
Thread::WaitGuard guard(Thread::WAIT);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLockMemtable not inited.", K(ret));
@ -726,6 +731,7 @@ int ObLockMemtable::get_lock_op_iter(const ObLockID &lock_id,
int ObLockMemtable::check_and_clear_obj_lock(const bool force_compact)
{
int ret = OB_SUCCESS;
Thread::WaitGuard guard(Thread::WAIT);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
TABLELOCK_LOG(WARN, "ObLockMemtable not inited.", K(ret));

View File

@ -173,6 +173,19 @@ OB_DEF_DESERIALIZE(ObLockTaskBatchRequest)
return ret;
}
bool is_unlock_request(const ObTableLockTaskType type)
{
return (UNLOCK_TABLE == type ||
UNLOCK_TABLET == type ||
UNLOCK_PARTITION == type ||
UNLOCK_SUBPARTITION == type ||
UNLOCK_OBJECT == type ||
UNLOCK_DDL_TABLE == type ||
UNLOCK_DDL_TABLET == type ||
UNLOCK_ALONE_TABLET == type);
}
void ObLockParam::reset()
{
lock_id_.reset();
@ -248,6 +261,14 @@ bool ObLockRequest::is_valid() const
is_op_type_valid(op_type_));
}
bool ObLockRequest::is_lock_thread_enabled() const
{
const int64_t min_cluster_version = GET_MIN_CLUSTER_VERSION();
return ((min_cluster_version > CLUSTER_VERSION_4_2_1_3 && min_cluster_version < CLUSTER_VERSION_4_2_2_0)
|| (min_cluster_version > CLUSTER_VERSION_4_2_2_0 && min_cluster_version < CLUSTER_VERSION_4_3_0_0)
|| (min_cluster_version >= CLUSTER_VERSION_4_3_0_0));
}
void ObLockObjRequest::reset()
{
ObLockRequest::reset();
@ -263,6 +284,27 @@ bool ObLockObjRequest::is_valid() const
is_valid_id(obj_id_));
}
ObUnLockObjRequest::ObUnLockObjRequest() : ObLockObjRequest()
{
if (is_lock_thread_enabled()) {
type_ = ObLockMsgType::LOCK_OBJ_REQ;
} else {
type_ = ObLockMsgType::UNLOCK_OBJ_REQ;
}
}
bool ObUnLockObjRequest::is_valid() const
{
bool valid = true;
valid = (ObLockRequest::is_valid() &&
is_lock_obj_type_valid(obj_type_) &&
is_valid_id(obj_id_));
valid = valid && (ObLockMsgType::LOCK_OBJ_REQ == type_ ||
ObLockMsgType::UNLOCK_OBJ_REQ == type_);
return valid;
}
void ObLockObjsRequest::reset()
{
ObLockRequest::reset();
@ -280,6 +322,28 @@ bool ObLockObjsRequest::is_valid() const
return is_valid;
}
ObUnLockObjsRequest::ObUnLockObjsRequest()
: ObLockObjsRequest()
{
if (is_lock_thread_enabled()) {
type_ = ObLockMsgType::LOCK_OBJ_REQ;
} else {
type_ = ObLockMsgType::UNLOCK_OBJ_REQ;
}
}
bool ObUnLockObjsRequest::is_valid() const
{
bool is_valid = true;
is_valid = (ObLockRequest::is_valid());
for (int64_t i = 0; i < objs_.count() && is_valid; i++) {
is_valid = is_valid && objs_.at(i).is_valid();
}
is_valid = is_valid && (ObLockMsgType::LOCK_OBJ_REQ == type_ ||
ObLockMsgType::UNLOCK_OBJ_REQ == type_);
return is_valid;
}
void ObLockTableRequest::reset()
{
ObLockRequest::reset();
@ -293,6 +357,25 @@ bool ObLockTableRequest::is_valid() const
is_valid_id(table_id_));
}
ObUnLockTableRequest::ObUnLockTableRequest() : ObLockTableRequest()
{
if (is_lock_thread_enabled()) {
type_ = ObLockMsgType::LOCK_TABLE_REQ;
} else {
type_ = ObLockMsgType::UNLOCK_TABLE_REQ;
}
}
bool ObUnLockTableRequest::is_valid() const
{
bool is_valid = true;
is_valid = (ObLockRequest::is_valid() &&
is_valid_id(table_id_) &&
(ObLockMsgType::LOCK_TABLE_REQ == type_ ||
ObLockMsgType::UNLOCK_TABLE_REQ == type_));
return is_valid;
}
void ObLockPartitionRequest::reset()
{
ObLockTableRequest::reset();
@ -307,6 +390,25 @@ bool ObLockPartitionRequest::is_valid() const
is_valid_id(part_object_id_));
}
ObUnLockPartitionRequest::ObUnLockPartitionRequest()
: ObLockPartitionRequest()
{
if (is_lock_thread_enabled()) {
type_ = ObLockMsgType::LOCK_PARTITION_REQ;
} else {
type_ = ObLockMsgType::UNLOCK_PARTITION_REQ;
}
}
bool ObUnLockPartitionRequest::is_valid() const
{
return ((ObLockMsgType::LOCK_PARTITION_REQ == type_ ||
ObLockMsgType::UNLOCK_PARTITION_REQ == type_) &&
ObLockRequest::is_valid() &&
is_valid_id(table_id_) &&
is_valid_id(part_object_id_));
}
void ObLockTabletRequest::reset()
{
ObLockTableRequest::reset();
@ -321,6 +423,24 @@ bool ObLockTabletRequest::is_valid() const
tablet_id_.is_valid());
}
ObUnLockTabletRequest::ObUnLockTabletRequest() : ObLockTabletRequest()
{
if (is_lock_thread_enabled()) {
type_ = ObLockMsgType::LOCK_TABLET_REQ;
} else {
type_ = ObLockMsgType::UNLOCK_TABLET_REQ;
}
}
bool ObUnLockTabletRequest::is_valid() const
{
return ((ObLockMsgType::LOCK_TABLET_REQ == type_ ||
ObLockMsgType::UNLOCK_TABLET_REQ == type_) &&
ObLockRequest::is_valid() &&
is_valid_id(table_id_) &&
tablet_id_.is_valid());
}
void ObLockTabletsRequest::reset()
{
ObLockTableRequest::reset();
@ -339,6 +459,28 @@ bool ObLockTabletsRequest::is_valid() const
return is_valid;
}
ObUnLockTabletsRequest::ObUnLockTabletsRequest() : ObLockTabletsRequest()
{
if (is_lock_thread_enabled()) {
type_ = ObLockMsgType::LOCK_TABLET_REQ;
} else {
type_ = ObLockMsgType::UNLOCK_TABLET_REQ;
}
}
bool ObUnLockTabletsRequest::is_valid() const
{
bool is_valid = true;
is_valid = ((ObLockMsgType::LOCK_TABLET_REQ == type_ ||
ObLockMsgType::UNLOCK_TABLET_REQ == type_) &&
ObLockRequest::is_valid() &&
is_valid_id(table_id_));
for (int64_t i = 0; i < tablet_ids_.count() && is_valid; i++) {
is_valid = is_valid && tablet_ids_.at(i).is_valid();
}
return is_valid;
}
void ObLockAloneTabletRequest::reset()
{
ObLockTabletsRequest::reset();
@ -357,6 +499,28 @@ bool ObLockAloneTabletRequest::is_valid() const
return is_valid;
}
ObUnLockAloneTabletRequest::ObUnLockAloneTabletRequest() : ObLockAloneTabletRequest()
{
if (is_lock_thread_enabled()) {
type_ = ObLockMsgType::LOCK_ALONE_TABLET_REQ;
} else {
type_ = ObLockMsgType::UNLOCK_ALONE_TABLET_REQ;
}
}
bool ObUnLockAloneTabletRequest::is_valid() const
{
bool is_valid = true;
is_valid = ((ObLockMsgType::LOCK_ALONE_TABLET_REQ == type_ ||
ObLockMsgType::UNLOCK_ALONE_TABLET_REQ == type_) &&
ObLockRequest::is_valid() &&
ls_id_.is_valid());
for (int64_t i = 0; i < tablet_ids_.count() && is_valid; i++) {
is_valid = is_valid && tablet_ids_.at(i).is_valid();
}
return is_valid;
}
int ObTableLockTaskRequest::set(
const ObTableLockTaskType task_type,
const share::ObLSID &lsid,

View File

@ -58,6 +58,8 @@ enum ObTableLockTaskType
MAX_TASK_TYPE,
};
bool is_unlock_request(const ObTableLockTaskType type);
struct ObLockParam
{
OB_UNIS_VERSION_V(1);
@ -117,6 +119,11 @@ public:
LOCK_PARTITION_REQ = 5,
LOCK_TABLET_REQ = 6,
LOCK_ALONE_TABLET_REQ = 7,
UNLOCK_OBJ_REQ = 8,
UNLOCK_TABLE_REQ = 9,
UNLOCK_PARTITION_REQ = 10,
UNLOCK_TABLET_REQ = 11,
UNLOCK_ALONE_TABLET_REQ = 12,
};
public:
ObLockRequest() :
@ -129,6 +136,19 @@ public:
virtual ~ObLockRequest() { reset(); }
virtual void reset();
virtual bool is_valid() const;
bool is_lock_thread_enabled() const;
bool is_unlock_request() const
{
return (ObLockMsgType::UNLOCK_OBJ_REQ == type_ ||
ObLockMsgType::UNLOCK_TABLE_REQ == type_ ||
ObLockMsgType::UNLOCK_PARTITION_REQ == type_ ||
ObLockMsgType::UNLOCK_TABLET_REQ == type_ ||
ObLockMsgType::UNLOCK_ALONE_TABLET_REQ == type_);
}
bool is_lock_request() const
{
return !is_unlock_request();
}
VIRTUAL_TO_STRING_KV(K_(owner_id), K_(lock_mode), K_(op_type), K_(timeout_us));
public:
ObLockMsgType type_;
@ -157,7 +177,14 @@ public:
ObLockOBJType obj_type_;
uint64_t obj_id_;
};
using ObUnLockObjRequest = ObLockObjRequest;
struct ObUnLockObjRequest : ObLockObjRequest
{
public:
ObUnLockObjRequest();
virtual ~ObUnLockObjRequest() { reset(); }
virtual bool is_valid() const;
};
struct ObLockObjsRequest : public ObLockRequest
{
@ -174,8 +201,16 @@ public:
public:
// which objects should we lock
common::ObSEArray<ObLockID, 2> objs_;
};
using ObUnLockObjsRequest = ObLockObjsRequest;
};
struct ObUnLockObjsRequest : public ObLockObjsRequest
{
public:
ObUnLockObjsRequest();
virtual ~ObUnLockObjsRequest() { reset(); }
virtual bool is_valid() const;
};
struct ObLockTableRequest : public ObLockRequest
{
@ -191,7 +226,14 @@ public:
// which table should we lock
uint64_t table_id_;
};
using ObUnLockTableRequest = ObLockTableRequest;
struct ObUnLockTableRequest : public ObLockTableRequest
{
public:
ObUnLockTableRequest();
virtual ~ObUnLockTableRequest() { reset(); }
virtual bool is_valid() const;
};
struct ObLockPartitionRequest : public ObLockTableRequest
{
@ -206,7 +248,14 @@ public:
public:
uint64_t part_object_id_;
};
using ObUnLockPartitionRequest = ObLockPartitionRequest;
struct ObUnLockPartitionRequest : public ObLockPartitionRequest
{
public:
ObUnLockPartitionRequest();
virtual ~ObUnLockPartitionRequest() { reset(); }
virtual bool is_valid() const;
};
struct ObLockTabletRequest : public ObLockTableRequest
{
@ -221,7 +270,14 @@ public:
public:
common::ObTabletID tablet_id_;
};
using ObUnLockTabletRequest = ObLockTabletRequest;
struct ObUnLockTabletRequest : public ObLockTabletRequest
{
public:
ObUnLockTabletRequest();
virtual ~ObUnLockTabletRequest() { reset(); }
virtual bool is_valid() const;
};
struct ObLockTabletsRequest : public ObLockTableRequest
{
@ -236,7 +292,14 @@ public:
public:
common::ObTabletIDArray tablet_ids_;
};
using ObUnLockTabletsRequest = ObLockTabletsRequest;
struct ObUnLockTabletsRequest : public ObLockTabletsRequest
{
public:
ObUnLockTabletsRequest();
virtual ~ObUnLockTabletsRequest() { reset(); }
virtual bool is_valid() const;
};
struct ObLockAloneTabletRequest : public ObLockTabletsRequest
{
@ -251,7 +314,14 @@ public:
public:
share::ObLSID ls_id_;
};
using ObUnLockAloneTabletRequest = ObLockAloneTabletRequest;
struct ObUnLockAloneTabletRequest : public ObLockAloneTabletRequest
{
public:
ObUnLockAloneTabletRequest();
virtual ~ObUnLockAloneTabletRequest() { reset(); }
virtual bool is_valid() const;
};
class ObTableLockTaskRequest final
{
@ -281,7 +351,14 @@ public:
&& OB_NOT_NULL(tx_desc_)
&& tx_desc_->is_valid());
}
bool is_unlock_request() const
{
return ::oceanbase::transaction::tablelock::is_unlock_request(task_type_);
}
bool is_lock_request() const
{
return !is_unlock_request();
}
bool is_timeout() const;
TO_STRING_KV(K(task_type_), K(lsid_), K(param_), KP(tx_desc_));
@ -313,6 +390,14 @@ public:
bool is_inited() const;
bool is_valid() const;
int assign(const ObLockTaskBatchRequest &arg);
bool is_unlock_request() const
{
return ::oceanbase::transaction::tablelock::is_unlock_request(task_type_);
}
bool is_lock_request() const
{
return !is_unlock_request();
}
TO_STRING_KV(K(task_type_), K(lsid_), K(params_), KPC(tx_desc_));
public:
@ -476,9 +561,9 @@ public:
share::SCN commit_scn_;
};
}
}
}
} // namespace tablelock
} // namespace transaction
} // namespace oceanbase
#endif /* OCEANBASE_STORAGE_TABLELOCK_OB_TABLE_LOCK_RPC_STRUCT_H_ */

View File

@ -442,6 +442,7 @@ int ObTableLockService::lock_table(const uint64_t table_id,
int64_t abs_timeout_ts = (0 == timeout_us)
? ObTimeUtility::current_time() + DEFAULT_TIMEOUT_US
: ObTimeUtility::current_time() + timeout_us;
Thread::WaitGuard guard(Thread::WAIT);
do {
if (timeout_us != 0) {
retry_timeout_us = abs_timeout_ts - ObTimeUtility::current_time();
@ -479,6 +480,7 @@ int ObTableLockService::unlock_table(const uint64_t table_id,
int64_t abs_timeout_ts = (0 == timeout_us)
? ObTimeUtility::current_time() + DEFAULT_TIMEOUT_US
: ObTimeUtility::current_time() + timeout_us;
Thread::WaitGuard guard(Thread::WAIT);
do {
if (timeout_us != 0) {
retry_timeout_us = abs_timeout_ts - ObTimeUtility::current_time();
@ -518,6 +520,7 @@ int ObTableLockService::lock_tablet(const uint64_t table_id,
int64_t abs_timeout_ts = (0 == timeout_us)
? ObTimeUtility::current_time() + DEFAULT_TIMEOUT_US
: ObTimeUtility::current_time() + timeout_us;
Thread::WaitGuard guard(Thread::WAIT);
do {
if (timeout_us != 0) {
retry_timeout_us = abs_timeout_ts - ObTimeUtility::current_time();
@ -560,6 +563,7 @@ int ObTableLockService::unlock_tablet(const uint64_t table_id,
int64_t abs_timeout_ts = (0 == timeout_us)
? ObTimeUtility::current_time() + DEFAULT_TIMEOUT_US
: ObTimeUtility::current_time() + timeout_us;
Thread::WaitGuard guard(Thread::WAIT);
do {
if (timeout_us != 0) {
retry_timeout_us = abs_timeout_ts - ObTimeUtility::current_time();
@ -599,6 +603,7 @@ int ObTableLockService::lock_table(ObTxDesc &tx_desc,
// is set by user in the 'WAIT n' option.
// Furthermore, if timeout_us_ is 0, this lock will be judged as a try
// lock semantics. It meets the actual semantics of 'NOWAIT' option.
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(LOCK_TABLE, arg.table_id_, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
@ -628,6 +633,7 @@ int ObTableLockService::unlock_table(ObTxDesc &tx_desc,
LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()),
K(tx_param.is_valid()), K(arg.is_valid()));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(UNLOCK_TABLE, arg.table_id_, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
@ -655,6 +661,7 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc,
LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()),
K(tx_param.is_valid()), K(arg.is_valid()));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(LOCK_TABLET, arg.table_id_,
arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
@ -689,6 +696,7 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) {
LOG_WARN("data version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(LOCK_TABLET, arg.table_id_,
arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
@ -722,6 +730,7 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc,
LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()),
K(tx_param.is_valid()), K(arg.is_valid()));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(UNLOCK_TABLET, arg.table_id_,
arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
@ -757,6 +766,7 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) {
LOG_WARN("data version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(UNLOCK_TABLET, arg.table_id_, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
@ -790,6 +800,7 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) {
LOG_WARN("cluster version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(LOCK_ALONE_TABLET, arg.table_id_,
arg.ls_id_, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
@ -825,6 +836,7 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) {
LOG_WARN("cluster version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(UNLOCK_ALONE_TABLET, arg.table_id_,
arg.ls_id_, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
@ -854,6 +866,7 @@ int ObTableLockService::lock_partition_or_subpartition(ObTxDesc &tx_desc,
} else if (OB_FAIL(get_table_partition_level_(arg.table_id_, part_level))) {
LOG_WARN("can not get table partition level", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
switch (part_level) {
case PARTITION_LEVEL_ONE: {
if (OB_FAIL(lock_partition(tx_desc, tx_param, arg))) {
@ -894,6 +907,7 @@ int ObTableLockService::lock_partition(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_1_0_0))) {
LOG_WARN("cluster version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(LOCK_PARTITION, arg.table_id_, arg.part_object_id_,
arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
@ -909,7 +923,7 @@ int ObTableLockService::lock_partition(ObTxDesc &tx_desc,
int ObTableLockService::unlock_partition(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockPartitionRequest &arg)
const ObUnLockPartitionRequest &arg)
{
int ret = OB_SUCCESS;
@ -926,6 +940,7 @@ int ObTableLockService::unlock_partition(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_1_0_0))) {
LOG_WARN("cluster version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(UNLOCK_PARTITION, arg.table_id_, arg.part_object_id_,
arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
@ -956,6 +971,7 @@ int ObTableLockService::lock_subpartition(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_1_0_0))) {
LOG_WARN("cluster version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(LOCK_SUBPARTITION, arg.table_id_, arg.part_object_id_,
arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
@ -971,7 +987,7 @@ int ObTableLockService::lock_subpartition(ObTxDesc &tx_desc,
int ObTableLockService::unlock_subpartition(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockPartitionRequest &arg)
const ObUnLockPartitionRequest &arg)
{
int ret = OB_SUCCESS;
@ -988,6 +1004,7 @@ int ObTableLockService::unlock_subpartition(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_1_0_0))) {
LOG_WARN("cluster version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(UNLOCK_SUBPARTITION, arg.table_id_, arg.part_object_id_,
arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
@ -1018,6 +1035,7 @@ int ObTableLockService::lock_obj(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_data_version_after_(DATA_VERSION_4_1_0_0))) {
LOG_WARN("data version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(LOCK_OBJECT, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
@ -1052,6 +1070,7 @@ int ObTableLockService::unlock_obj(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_data_version_after_(DATA_VERSION_4_1_0_0))) {
LOG_WARN("data version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(UNLOCK_OBJECT, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
@ -1087,6 +1106,7 @@ int ObTableLockService::lock_obj(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) {
LOG_WARN("cluster version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(LOCK_OBJECT, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
@ -1123,6 +1143,7 @@ int ObTableLockService::unlock_obj(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) {
LOG_WARN("cluster version check failed", K(ret), K(arg));
} else {
Thread::WaitGuard guard(Thread::WAIT);
ObTableLockCtx ctx(UNLOCK_OBJECT, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
@ -1745,6 +1766,41 @@ int ObTableLockService::batch_pre_check_lock_(ObTableLockCtx &ctx,
return ret;
}
template<class RpcProxy, class LockRequest>
int ObTableLockService::rpc_call_(RpcProxy &proxy_batch,
const ObAddr &addr,
const int64_t timeout_us,
const LockRequest &request)
{
int ret = OB_SUCCESS;
int32_t group_id = 0;
const int64_t min_cluster_version = GET_MIN_CLUSTER_VERSION();
if ((min_cluster_version > CLUSTER_VERSION_4_2_1_3 && min_cluster_version < CLUSTER_VERSION_4_2_2_0)
|| (min_cluster_version > CLUSTER_VERSION_4_2_2_0 && min_cluster_version < CLUSTER_VERSION_4_3_0_0)
|| (min_cluster_version >= CLUSTER_VERSION_4_3_0_0)) {
group_id = share::OBCG_LOCK;
if (request.is_unlock_request()) {
group_id = share::OBCG_UNLOCK;
}
if (OB_FAIL(proxy_batch.call(addr,
timeout_us,
GCONF.cluster_id,
MTL_ID(),
group_id,
request))) {
LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(timeout_us), K(request));
}
} else {
if (OB_FAIL(proxy_batch.call(addr,
timeout_us,
MTL_ID(),
request))) {
LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(timeout_us), K(request));
}
}
return ret;
}
int ObTableLockService::pre_check_lock_old_version_(ObTableLockCtx &ctx,
const ObTableLockMode lock_mode,
const ObTableLockOwnerID lock_owner,
@ -1793,12 +1849,11 @@ int ObTableLockService::pre_check_lock_old_version_(ObTableLockCtx &ctx,
} else if (ctx.is_timeout()) {
ret = (last_ret == OB_TRY_LOCK_ROW_CONFLICT) ? OB_ERR_EXCLUSIVE_LOCK_CONFLICT : OB_TIMEOUT;
LOG_WARN("process obj lock timeout", K(ret), K(ctx));
} else if (OB_FAIL(proxy_batch.call(addr,
timeout_us,
ctx.tx_desc_->get_tenant_id(),
request))) {
LOG_WARN("failed to all async rpc", KR(ret), K(addr),
K(ctx.abs_timeout_ts_), K(request));
} else if (OB_FAIL(rpc_call_(proxy_batch,
addr,
timeout_us,
request))) {
LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(request));
} else {
retry_ctx.send_rpc_count_++;
ALLOW_NEXT_LOG();
@ -2041,10 +2096,10 @@ int ObTableLockService::send_rpc_task_(RpcProxy &proxy_batch,
} else if (ctx.is_timeout()) {
ret = OB_TIMEOUT;
LOG_WARN("process obj lock timeout", K(ret), K(ctx));
} else if (OB_FAIL(proxy_batch.call(addr,
ctx.get_rpc_timeoutus(),
ctx.tx_desc_->get_tenant_id(),
request))) {
} else if (OB_FAIL(rpc_call_(proxy_batch,
addr,
ctx.get_rpc_timeoutus(),
request))) {
LOG_WARN("failed to call async rpc", KR(ret), K(addr),
K(ctx.abs_timeout_ts_), K(request));
} else {
@ -2178,12 +2233,11 @@ int ObTableLockService::parallel_rpc_handle_(RpcProxy &proxy_batch,
} else if (ctx.is_timeout()) {
ret = OB_TIMEOUT;
LOG_WARN("process obj lock timeout", K(ret), K(ctx));
} else if (OB_FAIL(proxy_batch.call(addr,
timeout_us,
ctx.tx_desc_->get_tenant_id(),
request))) {
LOG_WARN("failed to all async rpc", KR(ret), K(addr),
K(ctx.abs_timeout_ts_), K(request));
} else if (OB_FAIL(rpc_call_(proxy_batch,
addr,
timeout_us,
request))) {
LOG_WARN("failed to all async rpc", KR(ret), K(addr), K(timeout_us), K(request));
} else {
retry_ctx.send_rpc_count_++;
ALLOW_NEXT_LOG();

View File

@ -479,6 +479,11 @@ private:
const ObTableLockMode lock_mode,
const ObTableLockOwnerID lock_owner,
ObRetryCtx &retry_ctx);
template<class RpcProxy, class LockRequest>
int rpc_call_(RpcProxy &proxy_batch,
const ObAddr &addr,
const int64_t timeout_us,
const LockRequest &request);
int get_retry_lock_ids_(const ObLockIDArray &lock_ids,
const int64_t start_pos,
ObLockIDArray &retry_lock_ids);