[tablelock] support batch lock tablets and objects for ddl lock

This commit is contained in:
YangEfei
2023-08-09 04:12:42 +00:00
committed by ob-robot
parent 82aceac81e
commit 7d8c1d5a7f
10 changed files with 545 additions and 108 deletions

View File

@ -412,7 +412,10 @@ int ObInnerSqlRpcP::process()
case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_SUBPART:
case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_SUBPART:
case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_ALONE_TABLET:
case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_ALONE_TABLET: {
case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_ALONE_TABLET:
case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_OBJS:
case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_OBJS: {
if (OB_FAIL(ObInnerConnectionLockUtil::process_lock_rpc(transmit_arg, conn))) {
LOG_WARN("process lock rpc failed", K(ret), K(transmit_arg.get_operation_type()));
}

View File

@ -50,6 +50,8 @@ public:
OPERATION_TYPE_UNLOCK_SUBPART = 16,
OPERATION_TYPE_LOCK_ALONE_TABLET = 17,
OPERATION_TYPE_UNLOCK_ALONE_TABLET = 18,
OPERATION_TYPE_LOCK_OBJS = 19,
OPERATION_TYPE_UNLOCK_OBJS = 20,
OPERATION_TYPE_MAX = 100
};

View File

@ -4839,13 +4839,24 @@ int ObDDLService::lock_tablets(ObMySQLTransaction &trans,
LOG_WARN("conn_ is NULL", KR(ret));
} else {
LOG_INFO("lock tablet", KR(ret), K(tablet_ids), K(table_id), K(tenant_id), KPC(conn));
for (int i = 0; i < tablet_ids.count() && OB_SUCC(ret); i++) {
if (OB_FAIL(ObInnerConnectionLockUtil::lock_tablet(tenant_id,
table_id,
tablet_ids.at(i),
EXCLUSIVE,
timeout,
conn))) {
if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_0_0) {
for (int i = 0; i < tablet_ids.count() && OB_SUCC(ret); i++) {
if (OB_FAIL(ObInnerConnectionLockUtil::lock_tablet(tenant_id,
table_id,
tablet_ids.at(i),
EXCLUSIVE,
timeout,
conn))) {
LOG_WARN("lock dest table failed", KR(ret), K(table_id), K(tenant_id));
}
}
} else {
if (OB_FAIL(ObInnerConnectionLockUtil::lock_tablet(tenant_id,
table_id,
tablet_ids,
EXCLUSIVE,
timeout,
conn))) {
LOG_WARN("lock dest table failed", KR(ret), K(table_id), K(tenant_id));
ret = ObDDLUtil::is_table_lock_retry_ret_code(ret) ? OB_EAGAIN : ret;
}

View File

@ -362,9 +362,17 @@ int ObDDLLock::lock_table_lock_in_trans(
LOG_WARN("failed to lock table", K(ret));
}
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) {
if (OB_FAIL(ObInnerConnectionLockUtil::lock_tablet(tenant_id, table_id, tablet_ids.at(i), lock_mode, timeout_us, iconn))) {
LOG_WARN("failed to lock tablet", K(ret));
if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_0_0) {
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) {
if (OB_FAIL(ObInnerConnectionLockUtil::lock_tablet(
tenant_id, table_id, tablet_ids.at(i), lock_mode, timeout_us, iconn))) {
LOG_WARN("failed to lock tablet", K(ret), K(table_id), K(tablet_ids), K(lock_mode), K(timeout_us));
}
}
} else {
if (OB_FAIL(
ObInnerConnectionLockUtil::lock_tablet(tenant_id, table_id, tablet_ids, lock_mode, timeout_us, iconn))) {
LOG_WARN("failed to lock tablets", K(ret), K(table_id), K(tablet_ids), K(lock_mode), K(timeout_us));
}
}
}
@ -608,7 +616,7 @@ int ObOnlineDDLLock::lock_table_in_trans(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid conn", K(ret));
} else if (OB_FAIL(ObInnerConnectionLockUtil::lock_obj(tenant_id, arg, iconn))) {
LOG_WARN("failed to lock online ddl table in trans", K(ret));
LOG_WARN("failed to lock online ddl table in trans", K(ret), K(arg));
}
return ret;
}
@ -623,20 +631,25 @@ int ObOnlineDDLLock::lock_tablets_in_trans(
{
int ret = OB_SUCCESS;
ObInnerSQLConnection *iconn = nullptr;
ObLockObjRequest arg;
arg.obj_type_ = ObLockOBJType::OBJ_TYPE_ONLINE_DDL_TABLET;
arg.timeout_us_ = timeout_us;
arg.op_type_ = ObTableLockOpType::IN_TRANS_COMMON_LOCK;
arg.lock_mode_ = lock_mode;
arg.owner_id_ = 0;
if (OB_ISNULL(iconn = static_cast<ObInnerSQLConnection *>(trans.get_connection()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid conn", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) {
arg.obj_id_ = tablet_ids.at(i).id();
if (OB_FAIL(ObInnerConnectionLockUtil::lock_obj(tenant_id, arg, iconn))) {
LOG_WARN("failed to lock online ddl tablet in trans", K(ret));
} else {
ObLockObjsRequest arg;
ObLockID lock_id;
arg.timeout_us_ = timeout_us;
arg.op_type_ = ObTableLockOpType::IN_TRANS_COMMON_LOCK;
arg.lock_mode_ = lock_mode;
arg.owner_id_ = 0;
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) {
if (OB_FAIL(lock_id.set(ObLockOBJType::OBJ_TYPE_ONLINE_DDL_TABLET, tablet_ids.at(i).id()))) {
LOG_WARN("set lock id failed", K(ret), K(i), K(tablet_ids));
} else if (OB_FAIL(arg.objs_.push_back(lock_id))) {
LOG_WARN("add lock id failed", K(ret), K(lock_id));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObInnerConnectionLockUtil::lock_obj(tenant_id, arg, iconn))) {
LOG_WARN("failed to lock online ddl tablets in trans", K(ret), K(arg));
}
}
return ret;
@ -679,20 +692,25 @@ int ObOnlineDDLLock::lock_tablets(
{
int ret = OB_SUCCESS;
ObInnerSQLConnection *iconn = nullptr;
ObLockObjRequest arg;
arg.obj_type_ = ObLockOBJType::OBJ_TYPE_ONLINE_DDL_TABLET;
arg.timeout_us_ = timeout_us;
arg.op_type_ = ObTableLockOpType::OUT_TRANS_LOCK;
arg.lock_mode_ = lock_mode;
arg.owner_id_ = lock_owner;
if (OB_ISNULL(iconn = static_cast<ObInnerSQLConnection *>(trans.get_connection()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid conn", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) {
arg.obj_id_ = tablet_ids.at(i).id();
if (OB_FAIL(ObInnerConnectionLockUtil::lock_obj(tenant_id, arg, iconn))) {
LOG_WARN("failed to lock online ddl tablet", K(ret));
} else {
ObLockObjsRequest arg;
ObLockID lock_id;
arg.timeout_us_ = timeout_us;
arg.op_type_ = ObTableLockOpType::OUT_TRANS_LOCK;
arg.lock_mode_ = lock_mode;
arg.owner_id_ = lock_owner;
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) {
if (OB_FAIL(lock_id.set(ObLockOBJType::OBJ_TYPE_ONLINE_DDL_TABLET, tablet_ids.at(i).id()))) {
LOG_WARN("set lock id failed", K(ret), K(i), K(tablet_ids));
} else if (OB_FAIL(arg.objs_.push_back(lock_id))) {
LOG_WARN("add lock id failed", K(ret), K(lock_id));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObInnerConnectionLockUtil::lock_obj(tenant_id, arg, iconn))) {
LOG_WARN("failed to lock online ddl tablets", K(ret), K(arg));
}
}
return ret;
@ -743,30 +761,38 @@ int ObOnlineDDLLock::unlock_tablets(
{
int ret = OB_SUCCESS;
ObInnerSQLConnection *iconn = nullptr;
ObLockObjRequest arg;
arg.obj_type_ = ObLockOBJType::OBJ_TYPE_ONLINE_DDL_TABLET;
arg.timeout_us_ = timeout_us;
arg.op_type_ = ObTableLockOpType::OUT_TRANS_UNLOCK;
arg.lock_mode_ = lock_mode;
arg.owner_id_ = lock_owner;
if (OB_ISNULL(iconn = static_cast<ObInnerSQLConnection *>(trans.get_connection()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid conn", K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) {
arg.obj_id_ = tablet_ids.at(i).id();
if (OB_FAIL(ObInnerConnectionLockUtil::unlock_obj(tenant_id, arg, iconn))) {
} else {
ObUnLockObjsRequest arg;
ObLockID lock_id;
arg.timeout_us_ = timeout_us;
arg.op_type_ = ObTableLockOpType::OUT_TRANS_UNLOCK;
arg.lock_mode_ = lock_mode;
arg.owner_id_ = lock_owner;
for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); i++) {
if (OB_FAIL(lock_id.set(ObLockOBJType::OBJ_TYPE_ONLINE_DDL_TABLET, tablet_ids.at(i).id()))) {
LOG_WARN("set lock id failed", K(ret), K(i), K(tablet_ids));
} else if (OB_FAIL(arg.objs_.push_back(lock_id))) {
LOG_WARN("add lock id failed", K(ret), K(lock_id));
}
}
if (OB_SUCC(ret) && OB_FAIL(ObInnerConnectionLockUtil::unlock_obj(tenant_id, arg, iconn))) {
LOG_WARN("meet fail during unlock online ddl tablets", K(ret), K(arg));
if (OB_OBJ_LOCK_NOT_EXIST == ret) {
ret = OB_SUCCESS;
some_lock_not_exist = true;
LOG_WARN("online ddl tablet already unlocked", K(ret), K(arg));
LOG_WARN("online ddl tablet already unlocked", K(ret));
} else {
LOG_WARN("failed to unlock online ddl tablet", K(ret), K(arg));
LOG_WARN("failed to unlock online ddl tablet", K(ret));
}
}
}
return ret;
}
}
} // namespace storage
}

View File

@ -99,6 +99,14 @@ int ObInnerConnectionLockUtil::process_lock_rpc(
REQUEST_LOCK_4_1(ObUnLockObjRequest, operation_type, arg, inner_conn);
break;
}
case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_OBJS: {
REQUEST_LOCK_4_2(ObLockObjsRequest, operation_type, arg, inner_conn);
break;
}
case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_OBJS: {
REQUEST_LOCK_4_2(ObUnLockObjsRequest, operation_type, arg, inner_conn);
break;
}
case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_PART: {
REQUEST_LOCK_4_1(ObLockPartitionRequest, operation_type, arg, inner_conn);
break;
@ -189,8 +197,10 @@ int ObInnerConnectionLockUtil::process_lock_tablet_(
LOG_WARN("lock tablet failed", K(ret), K(arg.get_tenant_id()), K(lock_arg));
}
}
} else {
} else if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_0_0) {
REQUEST_LOCK_4_1(ObLockTabletRequest, operation_type, arg, conn);
} else {
REQUEST_LOCK_4_2(ObLockTabletsRequest, operation_type, arg, conn);
}
return ret;
@ -299,7 +309,19 @@ int ObInnerConnectionLockUtil::lock_tablet(
observer::ObInnerSQLConnection *conn)
{
int ret = OB_SUCCESS;
if (GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_4_0_0_0) {
if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0) {
ObLockTabletsRequest lock_arg;
lock_arg.owner_id_ = 0;
lock_arg.lock_mode_ = lock_mode;
lock_arg.op_type_ = IN_TRANS_COMMON_LOCK;
lock_arg.timeout_us_ = timeout_us;
lock_arg.table_id_ = table_id;
if (OB_FAIL(lock_arg.tablet_ids_.push_back(tablet_id))) {
LOG_WARN("add tablet id failed", K(ret), K(tablet_id));
} else if (OB_FAIL(request_lock_(tenant_id, lock_arg, ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_TABLET, conn))) {
LOG_WARN("request lock for tablet failed", K(ret), K(lock_arg));
}
} else if (GET_MIN_CLUSTER_VERSION() > CLUSTER_VERSION_4_0_0_0) {
ObLockTabletRequest lock_arg;
lock_arg.owner_id_ = 0;
lock_arg.lock_mode_ = lock_mode;
@ -317,6 +339,30 @@ int ObInnerConnectionLockUtil::lock_tablet(
return ret;
}
int ObInnerConnectionLockUtil::lock_tablet(
const uint64_t tenant_id,
const uint64_t table_id,
const ObIArray<ObTabletID> &tablet_ids,
const ObTableLockMode lock_mode,
const int64_t timeout_us,
observer::ObInnerSQLConnection *conn)
{
int ret = OB_SUCCESS;
ObLockTabletsRequest lock_arg;
lock_arg.owner_id_ = 0;
lock_arg.lock_mode_ = lock_mode;
lock_arg.op_type_ = IN_TRANS_COMMON_LOCK;
lock_arg.timeout_us_ = timeout_us;
lock_arg.table_id_ = table_id;
if (OB_FAIL(lock_arg.tablet_ids_.assign(tablet_ids))) {
LOG_WARN("assign tablet id failed", K(ret), K(tablet_ids));
} else if (OB_FAIL(request_lock_(tenant_id, lock_arg, ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_TABLET, conn))) {
LOG_WARN("request lock for tablets failed", K(ret), K(lock_arg));
}
return ret;
}
int ObInnerConnectionLockUtil::lock_tablet(
const uint64_t tenant_id,
const ObLockTabletRequest &arg,
@ -386,6 +432,29 @@ int ObInnerConnectionLockUtil::unlock_obj(
return ret;
}
int ObInnerConnectionLockUtil::lock_obj(
const uint64_t tenant_id,
const ObLockObjsRequest &arg,
observer::ObInnerSQLConnection *conn)
{
return request_lock_(tenant_id, arg, ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_OBJS, conn);
}
int ObInnerConnectionLockUtil::unlock_obj(
const uint64_t tenant_id,
const ObUnLockObjsRequest &arg,
observer::ObInnerSQLConnection *conn)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(ObTableLockOpType::OUT_TRANS_UNLOCK != arg.op_type_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("only OUT_TRANS_LOCK should unlock.", K(ret), K(arg));
} else {
ret = request_lock_(tenant_id, arg, ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_OBJS, conn);
}
return ret;
}
int ObInnerConnectionLockUtil::do_obj_lock_(
const uint64_t tenant_id,
const ObLockRequest &arg,
@ -431,20 +500,30 @@ int ObInnerConnectionLockUtil::do_obj_lock_(
break;
}
case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_TABLET: {
const ObLockTabletRequest &lock_arg = static_cast<const ObLockTabletRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService*)->lock_tablet(*tx_desc,
tx_param,
lock_arg))) {
LOG_WARN("lock tablet failed", K(ret), K(tenant_id), K(lock_arg));
if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_0_0) {
const ObLockTabletRequest &lock_arg = static_cast<const ObLockTabletRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService *)->lock_tablet(*tx_desc, tx_param, lock_arg))) {
LOG_WARN("lock tablet failed", K(ret), K(tenant_id), K(lock_arg));
}
} else {
const ObLockTabletsRequest &lock_arg = static_cast<const ObLockTabletsRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService *)->lock_tablet(*tx_desc, tx_param, lock_arg))) {
LOG_WARN("lock tablets failed", K(ret), K(tenant_id), K(lock_arg));
}
}
break;
}
case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_TABLET: {
const ObUnLockTabletRequest &lock_arg = static_cast<const ObUnLockTabletRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService*)->unlock_tablet(*tx_desc,
tx_param,
lock_arg))) {
LOG_WARN("unlock tablet failed", K(ret), K(tenant_id), K(lock_arg));
if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_0_0) {
const ObUnLockTabletRequest &lock_arg = static_cast<const ObUnLockTabletRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService *)->unlock_tablet(*tx_desc, tx_param, lock_arg))) {
LOG_WARN("unlock tablet failed", K(ret), K(tenant_id), K(lock_arg));
}
} else {
const ObUnLockTabletsRequest &lock_arg = static_cast<const ObUnLockTabletsRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService *)->unlock_tablet(*tx_desc, tx_param, lock_arg))) {
LOG_WARN("unlock tablets failed", K(ret), K(tenant_id), K(lock_arg));
}
}
break;
}
@ -468,22 +547,32 @@ int ObInnerConnectionLockUtil::do_obj_lock_(
}
case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_OBJ: {
const ObLockObjRequest &lock_arg = static_cast<const ObLockObjRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService*)->lock_obj(*tx_desc,
tx_param,
lock_arg))) {
if (OB_FAIL(MTL(ObTableLockService *)->lock_obj(*tx_desc, tx_param, lock_arg))) {
LOG_WARN("lock object failed", K(ret), K(tenant_id), K(lock_arg));
}
break;
}
case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_OBJS: {
const ObLockObjsRequest &lock_arg = static_cast<const ObLockObjsRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService *)->lock_obj(*tx_desc, tx_param, lock_arg))) {
LOG_WARN("lock objects failed", K(ret), K(tenant_id), K(lock_arg));
}
break;
}
case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_OBJ: {
const ObUnLockObjRequest &lock_arg = static_cast<const ObUnLockObjRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService*)->unlock_obj(*tx_desc,
tx_param,
lock_arg))) {
if (OB_FAIL(MTL(ObTableLockService *)->unlock_obj(*tx_desc, tx_param, lock_arg))) {
LOG_WARN("unlock object failed", K(ret), K(tenant_id), K(lock_arg));
}
break;
}
case ObInnerSQLTransmitArg::OPERATION_TYPE_UNLOCK_OBJS: {
const ObUnLockObjsRequest &lock_arg = static_cast<const ObUnLockObjsRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService *)->unlock_obj(*tx_desc, tx_param, lock_arg))) {
LOG_WARN("unlock objects failed", K(ret), K(tenant_id), K(lock_arg));
}
break;
}
case ObInnerSQLTransmitArg::OPERATION_TYPE_LOCK_SUBPART: {
const ObLockPartitionRequest &lock_arg = static_cast<const ObLockPartitionRequest &>(arg);
if (OB_FAIL(MTL(ObTableLockService*)->lock_subpartition(*tx_desc,

View File

@ -40,11 +40,13 @@ namespace tablelock
{
class ObLockRequest;
class ObLockObjRequest;
class ObLockObjsRequest;
class ObLockTableRequest;
class ObLockTabletRequest;
class ObLockPartitionRequest;
class ObLockAloneTabletRequest;
using ObUnLockObjRequest = ObLockObjRequest;
using ObUnLockObjsRequest = ObLockObjsRequest;
using ObUnLockTableRequest = ObLockTableRequest;
using ObUnLockPartitionRequest = ObLockPartitionRequest;
using ObUnLockTabletRequest = ObLockTabletRequest;
@ -105,6 +107,13 @@ public:
const ObTableLockMode lock_mode,
const int64_t timeout_us,
observer::ObInnerSQLConnection *conn);
static int lock_tablet(
const uint64_t tenant_id,
const uint64_t table_id,
const ObIArray<ObTabletID> &tablet_ids,
const ObTableLockMode lock_mode,
const int64_t timeout_us,
observer::ObInnerSQLConnection *conn);
static int lock_tablet(
const uint64_t tenant_id,
const ObLockTabletRequest &arg,
@ -129,6 +138,14 @@ public:
const uint64_t tenant_id,
const ObUnLockObjRequest &arg,
observer::ObInnerSQLConnection *conn);
static int lock_obj(
const uint64_t tenant_id,
const ObLockObjsRequest &arg,
observer::ObInnerSQLConnection *conn);
static int unlock_obj(
const uint64_t tenant_id,
const ObUnLockObjsRequest &arg,
observer::ObInnerSQLConnection *conn);
private:
static int do_obj_lock_(
const uint64_t tenant_id,

View File

@ -44,6 +44,9 @@ OB_SERIALIZE_MEMBER_INHERIT(ObLockObjRequest, ObLockRequest,
obj_type_,
obj_id_);
OB_SERIALIZE_MEMBER_INHERIT(ObLockObjsRequest, ObLockRequest,
objs_);
OB_SERIALIZE_MEMBER_INHERIT(ObLockTableRequest, ObLockRequest,
table_id_);
@ -256,6 +259,23 @@ bool ObLockObjRequest::is_valid() const
is_valid_id(obj_id_));
}
void ObLockObjsRequest::reset()
{
ObLockRequest::reset();
objs_.reset();
}
bool ObLockObjsRequest::is_valid() const
{
bool is_valid = true;
is_valid = (ObLockMsgType::LOCK_OBJ_REQ == type_ &&
ObLockRequest::is_valid());
for (int64_t i = 0; i < objs_.count() && is_valid; i++) {
is_valid = is_valid && objs_.at(i).is_valid();
}
return is_valid;
}
void ObLockTableRequest::reset()
{
ObLockRequest::reset();

View File

@ -159,6 +159,24 @@ public:
};
using ObUnLockObjRequest = ObLockObjRequest;
struct ObLockObjsRequest : public ObLockRequest
{
OB_UNIS_VERSION_V(1);
public:
ObLockObjsRequest() :
ObLockRequest(),
objs_()
{ type_ = ObLockMsgType::LOCK_OBJ_REQ; }
virtual ~ObLockObjsRequest() { reset(); }
virtual void reset();
virtual bool is_valid() const;
INHERIT_TO_STRING_KV("ObLockRequest", ObLockRequest, K_(objs));
public:
// which objects should we lock
common::ObSEArray<ObLockID, 2> objs_;
};
using ObUnLockObjsRequest = ObLockObjsRequest;
struct ObLockTableRequest : public ObLockRequest
{
OB_UNIS_VERSION_V(1);
@ -218,6 +236,7 @@ public:
public:
common::ObTabletIDArray tablet_ids_;
};
using ObUnLockTabletsRequest = ObLockTabletsRequest;
struct ObLockAloneTabletRequest : public ObLockTabletsRequest
{

View File

@ -39,14 +39,12 @@ namespace transaction
namespace tablelock
{
ObTableLockService::ObTableLockCtx::ObTableLockCtx(const ObTableLockTaskType task_type,
const uint64_t table_id,
const int64_t origin_timeout_us,
const int64_t timeout_us)
: task_type_(task_type),
is_in_trans_(false),
table_id_(table_id),
table_id_(OB_INVALID_ID),
origin_timeout_us_(origin_timeout_us),
timeout_us_(timeout_us),
abs_timeout_ts_(),
@ -64,6 +62,15 @@ ObTableLockService::ObTableLockCtx::ObTableLockCtx(const ObTableLockTaskType tas
: ObTimeUtility::current_time() + timeout_us;
}
ObTableLockService::ObTableLockCtx::ObTableLockCtx(const ObTableLockTaskType task_type,
const uint64_t table_id,
const int64_t origin_timeout_us,
const int64_t timeout_us)
: ObTableLockCtx(task_type, origin_timeout_us, timeout_us)
{
table_id_ = table_id;
}
ObTableLockService::ObTableLockCtx::ObTableLockCtx(const ObTableLockTaskType task_type,
const uint64_t table_id,
const uint64_t partition_id,
@ -84,17 +91,6 @@ ObTableLockService::ObTableLockCtx::ObTableLockCtx(const ObTableLockTaskType tas
ls_id_ = ls_id;
}
ObTableLockService::ObTableLockCtx::ObTableLockCtx(const ObTableLockTaskType task_type,
const ObLockOBJType obj_type,
const uint64_t obj_id,
const int64_t origin_timeout_us,
const int64_t timeout_us)
: ObTableLockCtx(task_type, 0, origin_timeout_us, timeout_us)
{
obj_type_ = obj_type;
obj_id_ = obj_id;
}
void ObTableLockService::ObRetryCtx::reuse()
{
need_retry_ = false;
@ -259,6 +255,41 @@ int ObTableLockService::ObTableLockCtx::set_tablet_id(const common::ObTabletID &
return ret;
}
int ObTableLockService::ObTableLockCtx::set_lock_id(const common::ObIArray<ObLockID> &lock_ids)
{
int ret = OB_SUCCESS;
obj_list_.reuse();
for (int64_t i = 0; OB_SUCC(ret) && i < lock_ids.count(); i++) {
if (OB_FAIL(obj_list_.push_back(lock_ids.at(i)))) {
LOG_WARN("set lock id failed", K(ret), K(i), K(lock_ids));
}
}
return ret;
}
int ObTableLockService::ObTableLockCtx::set_lock_id(const ObLockID &lock_id)
{
int ret = OB_SUCCESS;
obj_list_.reuse();
if (OB_FAIL(obj_list_.push_back(lock_id))) {
LOG_WARN("set lock id failed", K(ret), K(lock_id));
}
return ret;
}
int ObTableLockService::ObTableLockCtx::set_lock_id(const ObLockOBJType &obj_type, const uint64_t obj_id)
{
int ret = OB_SUCCESS;
ObLockID lock_id;
obj_list_.reuse();
if (OB_FAIL(lock_id.set(obj_type, obj_id))) {
LOG_WARN("set lock id failed", K(ret), K(obj_type), K(obj_id));
} else if (OB_FAIL(obj_list_.push_back(lock_id))) {
LOG_WARN("set lock id failed", K(ret), K(lock_id));
}
return ret;
}
bool ObTableLockService::ObTableLockCtx::is_timeout() const
{
return ObTimeUtility::current_time() >= abs_timeout_ts_;
@ -611,7 +642,6 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc,
const ObLockTabletRequest &arg)
{
int ret = OB_SUCCESS;
const ObTableLockOwnerID lock_owner(0);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -638,12 +668,44 @@ int ObTableLockService::lock_tablet(ObTxDesc &tx_desc,
return ret;
}
int ObTableLockService::lock_tablet(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockTabletsRequest &arg)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("lock service is not inited", K(ret));
} else if (OB_UNLIKELY(!tx_desc.is_valid()) ||
OB_UNLIKELY(!tx_param.is_valid()) ||
OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()),
K(tx_param.is_valid()), K(arg.is_valid()));
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) {
LOG_WARN("data version check failed", K(ret), K(arg));
} else {
ObTableLockCtx ctx(LOCK_TABLET, arg.table_id_,
arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
if (OB_FAIL(ctx.set_tablet_id(arg.tablet_ids_))) {
LOG_WARN("set tablet id failed", K(ret), K(arg));
} else if (OB_FAIL(process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_))) {
LOG_WARN("process lock task failed", K(ret), K(arg));
}
}
return ret;
}
int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObUnLockTabletRequest &arg)
{
int ret = OB_SUCCESS;
const ObTableLockOwnerID lock_owner(0);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -671,6 +733,39 @@ int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc,
return ret;
}
int ObTableLockService::unlock_tablet(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObUnLockTabletsRequest &arg)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("lock service is not inited", K(ret));
} else if (OB_UNLIKELY(!tx_desc.is_valid()) ||
OB_UNLIKELY(!tx_param.is_valid()) ||
OB_UNLIKELY(!arg.is_valid()) ||
OB_UNLIKELY(ObTableLockOpType::OUT_TRANS_UNLOCK != arg.op_type_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()),
K(tx_param.is_valid()), K(arg.is_valid()));
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) {
LOG_WARN("data version check failed", K(ret), K(arg));
} else {
ObTableLockCtx ctx(UNLOCK_TABLET, arg.table_id_, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
if (OB_FAIL(ctx.set_tablet_id(arg.tablet_ids_))) {
LOG_WARN("set tablet id failed", K(ret), K(arg));
} else if (OB_FAIL(process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_))) {
LOG_WARN("process lock task failed", K(ret), K(arg));
}
}
return ret;
}
int ObTableLockService::lock_tablet(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockAloneTabletRequest &arg)
@ -911,12 +1006,16 @@ int ObTableLockService::lock_obj(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_data_version_after_(DATA_VERSION_4_1_0_0))) {
LOG_WARN("data version check failed", K(ret), K(arg));
} else {
ObTableLockCtx ctx(LOCK_OBJECT, arg.obj_type_, arg.obj_id_, arg.timeout_us_, arg.timeout_us_);
ObTableLockCtx ctx(LOCK_OBJECT, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
if (OB_FAIL(ctx.set_lock_id(arg.obj_type_, arg.obj_id_))) {
LOG_WARN("set lock id failed", K(ret), K(arg));
} else {
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
}
}
return ret;
}
@ -940,12 +1039,85 @@ int ObTableLockService::unlock_obj(ObTxDesc &tx_desc,
} else if (OB_FAIL(check_data_version_after_(DATA_VERSION_4_1_0_0))) {
LOG_WARN("data version check failed", K(ret), K(arg));
} else {
ObTableLockCtx ctx(UNLOCK_OBJECT, arg.obj_type_, arg.obj_id_, arg.timeout_us_, arg.timeout_us_);
ObTableLockCtx ctx(UNLOCK_OBJECT, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
if (OB_FAIL(ctx.set_lock_id(arg.obj_type_, arg.obj_id_))) {
LOG_WARN("set lock id failed", K(ret), K(arg));
} else {
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
}
}
return ret;
}
int ObTableLockService::lock_obj(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockObjsRequest &arg)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("lock service is not inited", K(ret));
} else if (OB_UNLIKELY(!tx_desc.is_valid()) ||
OB_UNLIKELY(!tx_param.is_valid()) ||
OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()),
K(tx_param.is_valid()), K(arg.is_valid()));
} else if (OB_FAIL(check_data_version_after_(DATA_VERSION_4_1_0_0))) {
LOG_WARN("data version check failed", K(ret), K(arg));
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) {
LOG_WARN("cluster version check failed", K(ret), K(arg));
} else {
ObTableLockCtx ctx(LOCK_OBJECT, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
if (OB_FAIL(ctx.set_lock_id(arg.objs_))) {
LOG_WARN("set lock id failed", K(ret), K(arg));
} else {
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
}
}
return ret;
}
int ObTableLockService::unlock_obj(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObUnLockObjsRequest &arg)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("lock service is not inited", K(ret));
} else if (OB_UNLIKELY(!tx_desc.is_valid()) ||
OB_UNLIKELY(!tx_param.is_valid()) ||
OB_UNLIKELY(!arg.is_valid()) ||
OB_UNLIKELY(ObTableLockOpType::OUT_TRANS_UNLOCK != arg.op_type_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tx_desc), K(arg), K(tx_desc.is_valid()),
K(tx_param.is_valid()), K(arg.is_valid()));
} else if (OB_FAIL(check_data_version_after_(DATA_VERSION_4_1_0_0))) {
LOG_WARN("data version check failed", K(ret), K(arg));
} else if (OB_FAIL(check_cluster_version_after_(CLUSTER_VERSION_4_2_0_0))) {
LOG_WARN("cluster version check failed", K(ret), K(arg));
} else {
ObTableLockCtx ctx(UNLOCK_OBJECT, arg.timeout_us_, arg.timeout_us_);
ctx.is_in_trans_ = true;
ctx.tx_desc_ = &tx_desc;
ctx.tx_param_ = tx_param;
ctx.lock_op_type_ = arg.op_type_;
if (OB_FAIL(ctx.set_lock_id(arg.objs_))) {
LOG_WARN("set lock id failed", K(ret), K(arg));
} else {
ret = process_lock_task_(ctx, arg.lock_mode_, arg.owner_id_);
}
}
return ret;
}
@ -1028,16 +1200,23 @@ int ObTableLockService::process_obj_lock_task_(ObTableLockCtx &ctx,
const ObTableLockOwnerID lock_owner)
{
int ret = OB_SUCCESS;
ObLockID lock_id;
if (OB_FAIL(get_lock_id(ctx.obj_type_, ctx.obj_id_, lock_id))) {
LOG_WARN("failed to get_obj_lock_id", K(ret), K(ctx));
} else if (OB_FAIL(process_obj_lock_(ctx,
LOCK_SERVICE_LS,
lock_id,
lock_mode,
lock_owner))) {
LOG_WARN("lock obj failed", K(ret), K(ctx), K(LOCK_SERVICE_LS),
K(lock_id), K(ctx.task_type_), K(lock_mode));
if (ctx.obj_list_.empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("obj list is empty when lock obj", K(ret), K(ctx), K(lock_mode), K(lock_owner));
} else {
if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_2_0_0) {
ObLockID lock_id;
for (int64_t i = 0; OB_SUCC(ret) && i < ctx.obj_list_.count(); ++i) {
lock_id = ctx.obj_list_.at(i);
if (OB_FAIL(process_obj_lock_(ctx, LOCK_SERVICE_LS, lock_id, lock_mode, lock_owner))) {
LOG_WARN("lock obj failed", K(ret), K(ctx), K(LOCK_SERVICE_LS), K(lock_id), K(ctx.task_type_), K(lock_mode));
}
}
} else {
if (OB_FAIL(process_obj_lock_(ctx, LOCK_SERVICE_LS, ctx.obj_list_, lock_mode, lock_owner))) {
LOG_WARN("lock obj failed", K(ret), K(ctx), K(LOCK_SERVICE_LS), K(ctx.obj_list_), K(ctx.task_type_), K(lock_mode));
}
}
}
return ret;
}
@ -2119,6 +2298,63 @@ int ObTableLockService::process_obj_lock_(ObTableLockCtx &ctx,
return ret;
}
int ObTableLockService::process_obj_lock_(ObTableLockCtx &ctx,
const ObLSID &ls_id,
const common::ObIArray<ObLockID> &lock_ids,
const ObTableLockMode lock_mode,
const ObTableLockOwnerID lock_owner)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool need_retry = false;
ObLSLockMap ls_lock_map;
ObLockIDArray lock_array;
ObLockIDArray *p = nullptr;
if (OB_FAIL(ls_lock_map.create(10, lib::ObLabel("LSLockMap")))) {
LOG_WARN("ls_lock_map create failed");
} else if (OB_FAIL(ls_lock_map.set_refactored(ls_id, lock_array)) &&
OB_ENTRY_EXIST != ret && OB_HASH_EXIST != ret) {
LOG_WARN("fail to set tablet_map", K(ret), K(ls_id));
} else if (OB_ISNULL(p = const_cast<ObLockIDArray *>(ls_lock_map.get(ls_id)))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the ls not exist at tablet map", K(ret), K(ls_id));
} else if (OB_FAIL(p->assign(lock_ids))) {
LOG_WARN("push_back tablet_id failed", K(ret), K(ls_id), K(lock_ids));
} else {
do {
need_retry = false;
if (ctx.is_timeout()) {
ret = OB_TIMEOUT;
LOG_WARN("lock table timeout", K(ret), K(ctx));
} else if (OB_FAIL(start_sub_tx_(ctx))) {
LOG_WARN("failed to start sub tx", K(ret), K(ctx));
} else {
if (OB_FAIL(inner_process_obj_lock_batch_(ctx, ls_lock_map, lock_mode, lock_owner))) {
LOG_WARN("process object locks failed", K(ret), K(lock_ids), K(ls_id));
}
if (OB_SUCC(ret)) {
if (OB_FAIL(end_sub_tx_(ctx, false/*not rollback*/))) {
LOG_WARN("failed to end sub tx", K(ret), K(ctx));
}
} else {
// rollback the sub tx.
need_retry = need_retry_single_task_(ctx, ret);
// overwrite the ret code.
if (need_retry &&
OB_FAIL(end_sub_tx_(ctx, true/*rollback*/))) {
LOG_WARN("failed to rollback sub tx", K(ret), K(ctx));
}
}
}
} while (need_retry && OB_SUCC(ret));
}
LOG_DEBUG("ObTableLockService::process_obj_lock_", K(ret), K(ctx), K(ls_id),
K(lock_ids), K(ctx.task_type_), K(lock_mode), K(lock_owner));
return ret;
}
int ObTableLockService::process_table_lock_(ObTableLockCtx &ctx,
const ObTableLockMode lock_mode,
const ObTableLockOwnerID lock_owner)

View File

@ -61,6 +61,9 @@ private:
class ObTableLockCtx
{
public:
ObTableLockCtx(const ObTableLockTaskType task_type,
const int64_t origin_timeout_us,
const int64_t timeout_us);
ObTableLockCtx(const ObTableLockTaskType task_type,
const uint64_t table_id,
const int64_t origin_timeout_us,
@ -75,14 +78,12 @@ private:
const share::ObLSID &ls_id,
const int64_t origin_timeout_us,
const int64_t timeout_us);
ObTableLockCtx(const ObTableLockTaskType task_type,
const ObLockOBJType obj_type,
const uint64_t obj_id,
const int64_t origin_timeout_us,
const int64_t timeout_us);
~ObTableLockCtx() {}
int set_tablet_id(const common::ObIArray<common::ObTabletID> &tablet_ids);
int set_tablet_id(const common::ObTabletID &tablet_id);
int set_lock_id(const common::ObIArray<ObLockID> &lock_ids);
int set_lock_id(const ObLockID &lock_id);
int set_lock_id(const ObLockOBJType &obj_type, const uint64_t obj_id);
bool is_try_lock() const { return 0 == timeout_us_; }
bool is_deadlock_avoid_enabled() const;
bool is_timeout() const;
@ -118,17 +119,12 @@ private:
ObTableLockTaskType task_type_; // current lock request type
bool is_in_trans_;
union {
// used for table/partition/tablet
// used for table/partition
struct {
uint64_t table_id_;
uint64_t partition_id_; // set when lock or unlock specified partition
share::ObLSID ls_id_; // used for alone tablet lock and unlock
};
// used for lock object
struct {
ObLockOBJType obj_type_;
uint64_t obj_id_;
};
};
ObTableLockOpType lock_op_type_; // specify the lock op type
@ -143,6 +139,7 @@ private:
share::ObLSArray need_rollback_ls_; // which ls has been modified after
// the current_savepoint_ created.
common::ObTabletIDArray tablet_list_; // all the tablets need to be locked/unlocked
ObLockIDArray obj_list_;
// TODO: yanyuan.cxf we need better performance.
// share::ObLSArray ls_list_; // related ls list
int64_t schema_version_; // the schema version of the table to be locked
@ -153,11 +150,11 @@ private:
int64_t stmt_savepoint_;
TO_STRING_KV(K(is_in_trans_), K(table_id_), K(partition_id_),
K(tablet_list_), K(obj_type_), K(obj_id_), K(lock_op_type_),
K(tablet_list_), K(obj_list_), K(lock_op_type_),
K(origin_timeout_us_), K(timeout_us_),
K(abs_timeout_ts_), KPC(tx_desc_), K(tx_param_),
K(current_savepoint_), K(need_rollback_ls_),
K(tablet_list_), K(schema_version_), K(tx_is_killed_),
K(schema_version_), K(tx_is_killed_),
K(is_from_sql_), K(stmt_savepoint_));
};
class ObRetryCtx
@ -289,11 +286,17 @@ public:
int lock_tablet(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockTabletRequest &arg);
int lock_tablet(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockTabletsRequest &arg);
// arg.op_type_ should be OUT_TRANS_UNLOCK:
// unlock the table level lock and the tablet level lock.
int unlock_tablet(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObUnLockTabletRequest &arg);
int unlock_tablet(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObUnLockTabletsRequest &arg);
int lock_tablet(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockAloneTabletRequest &arg);
@ -323,6 +326,12 @@ public:
int unlock_obj(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObUnLockObjRequest &arg);
int lock_obj(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObLockObjsRequest &arg);
int unlock_obj(ObTxDesc &tx_desc,
const ObTxParam &tx_param,
const ObUnLockObjsRequest &arg);
int garbage_collect_right_now();
int get_obj_lock_garbage_collector(ObOBJLockGarbageCollector *&obj_lock_garbage_collector);
private:
@ -504,6 +513,11 @@ private:
const ObLockID &lock_id,
const ObTableLockMode lock_mode,
const ObTableLockOwnerID lock_owner);
int process_obj_lock_(ObTableLockCtx &ctx,
const share::ObLSID &ls_id,
const common::ObIArray<ObLockID> &lock_ids,
const ObTableLockMode lock_mode,
const ObTableLockOwnerID lock_owner);
static bool is_part_table_lock_(const ObTableLockTaskType task_type);
int get_table_lock_mode_(const ObTableLockTaskType task_type,
const ObTableLockMode part_lock_mode,