[TABLELOCK] fix several bugs about mysql lock functions and DBMS_LOCK

This commit is contained in:
YangEfei 2024-04-27 17:53:39 +00:00 committed by ob-robot
parent dc5fc328ac
commit 1442b06e77
7 changed files with 478 additions and 185 deletions

View File

@ -104,7 +104,14 @@ bool ObExprLockFunc::proxy_is_support(const ObExecContext &exec_ctx)
if (OB_ISNULL(session)) {
LOG_ERROR_RET(OB_INVALID_ARGUMENT, "session is null!");
} else {
is_support = session->is_feedback_proxy_info_support() || !session->is_obproxy_mode();
is_support = (session->is_feedback_proxy_info_support() && session->is_client_sessid_support()) || !session->is_obproxy_mode();
if (!is_support) {
LOG_WARN_RET(OB_NOT_SUPPORTED,
"proxy is not support this feature",
K(session->get_sessid()),
K(session->is_feedback_proxy_info_support()),
K(session->is_client_sessid_support()));
}
}
return is_support;
}
@ -334,7 +341,7 @@ int ObExprIsUsedLock::is_used_lock(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &
int ret = OB_SUCCESS;
// we can get session info use this
const ObSQLSessionInfo *session = ctx.exec_ctx_.get_my_session();
int64_t sess_id = 0;
uint32_t sess_id = 0;
ObDatum *lock_name = NULL;
if (!proxy_is_support(ctx.exec_ctx_)) {
@ -362,7 +369,7 @@ int ObExprIsUsedLock::is_used_lock(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &
// 1. get the lock session id
if (OB_SUCC(ret)) {
result.set_int(sess_id);
result.set_uint32(sess_id);
} else if (OB_EMPTY_RESULT == ret) {
// 2. there is nobody hold the lock
ret = OB_SUCCESS;

View File

@ -16,6 +16,7 @@
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "lib/utility/ob_fast_convert.h"
#include "lib/alloc/alloc_assist.h"
#include "observer/ob_inner_sql_connection.h"
#include "share/ob_table_access_helper.h"
#include "sql/engine/ob_exec_context.h"
@ -275,6 +276,30 @@ int ObLockFuncContext::execute_read(const ObSqlString &sql, ObMySQLProxy::MySQLR
}
return ret;
}
int ObLockFuncExecutor::check_lock_exist_(const uint64_t &lock_id)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = MTL_ID();
ObStringHolder query_lock_handle;
char where_cond[WHERE_CONDITION_BUFFER_SIZE] = {0};
char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {0};
bool is_existed = false;
OZ(databuff_printf(where_cond,
WHERE_CONDITION_BUFFER_SIZE,
"WHERE obj_id = %" PRIu64
" and obj_type = %d",
lock_id,
static_cast<int>(ObLockOBJType::OBJ_TYPE_MYSQL_LOCK_FUNC)));
OZ (databuff_printf(table_name, MAX_FULL_TABLE_NAME_LENGTH,
"%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_DETECT_LOCK_INFO_TNAME));
OZ (ObTableAccessHelper::read_single_row(tenant_id,
{ "1" },
table_name,
where_cond,
is_existed));
return ret;
}
int ObLockFuncExecutor::check_lock_exist_(ObLockFuncContext &ctx,
ObSqlString &where_cond,
@ -330,9 +355,7 @@ int ObLockFuncExecutor::check_lock_exist_(ObLockFuncContext &ctx,
return ret;
}
int ObLockFuncExecutor::check_lock_exist_(ObLockFuncContext &ctx,
const uint64_t &lock_id,
bool &exist)
int ObLockFuncExecutor::check_lock_exist_(ObLockFuncContext &ctx, const uint64_t &lock_id, bool &exist)
{
int ret = OB_SUCCESS;
ObSqlString where_cond;
@ -358,49 +381,73 @@ int ObLockFuncExecutor::check_lock_exist_(ObLockFuncContext &ctx,
return ret;
}
int ObLockFuncExecutor::remove_lock_id_(ObLockFuncContext &ctx,
const int64_t raw_owner_id,
const uint64_t &lock_id)
int ObLockFuncExecutor::check_client_ssid_(ObLockFuncContext &ctx,
const uint32_t client_session_id,
const uint64_t client_session_create_ts)
{
int ret = OB_SUCCESS;
bool lock_exist = false;
if (OB_FAIL(check_lock_exist_(ctx,
raw_owner_id,
lock_id,
lock_exist))) {
LOG_WARN("check lock exist failed", K(ret), K(raw_owner_id), K(lock_id));
} else if (!lock_exist) {
if (OB_FAIL(remove_lock_id_(ctx, lock_id))) {
LOG_WARN("remove record from lock name table failed", K(ret), K(lock_id));
}
} else {
// there is some other lock, the lock name-id record should no be removed.
}
return ret;
}
int ObLockFuncExecutor::remove_lock_id_(ObLockFuncContext &ctx,
const int64_t lock_id)
{
int ret = OB_SUCCESS;
ObSqlString delete_sql;
int64_t affected_rows = 0;
lib::CompatModeGuard guard(lib::Worker::CompatMode::MYSQL);
char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {0};
OZ (databuff_printf(table_name, MAX_FULL_TABLE_NAME_LENGTH,
"%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_DBMS_LOCK_ALLOCATED_TNAME));
OZ (delete_sql.assign_fmt("DELETE FROM %s WHERE lockid = %ld",
table_name,
lock_id));
OZ (ctx.execute_write(delete_sql, affected_rows));
CK (OB_LIKELY(1 == affected_rows));
int64_t record_client_session_create_ts = 0;
ObSqlString sql;
ObTableLockOwnerID owner_id;
common::sqlclient::ObMySQLResult *result = nullptr;
OZ (owner_id.convert_from_client_sessid(client_session_id, client_session_create_ts));
OZ (databuff_printf(
table_name, MAX_FULL_TABLE_NAME_LENGTH, "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_CLIENT_TO_SERVER_SESSION_INFO_TNAME));
OZ (sql.assign_fmt("SELECT time_to_usec(client_session_create_ts)"
" FROM %s WHERE client_session_id = %" PRIu32,
table_name,
client_session_id));
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
OZ (ctx.execute_read(sql, res));
OV (OB_NOT_NULL(result = res.get_result()), OB_ERR_UNEXPECTED, client_session_id);
OZ (result->next());
// there's no record, means the client_sessid is not used before, or has been cleaned
if (OB_ITER_END == ret) {
ret = OB_EMPTY_RESULT;
}
OX (GET_COL_IGNORE_NULL(result->get_int,
"time_to_usec(client_session_create_ts)",
record_client_session_create_ts));
}
OX(
if (OB_UNLIKELY(record_client_session_create_ts != client_session_create_ts)) {
ObTableLockOwnerID rec_owner_id;
ObTableLockOwnerID cur_owner_id;
OZ (rec_owner_id.convert_from_client_sessid(client_session_id, record_client_session_create_ts));
OZ (cur_owner_id.convert_from_client_sessid(client_session_id, client_session_create_ts));
if (rec_owner_id == cur_owner_id) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("meet client_session_id reuse, and has the same owner_id", K(rec_owner_id), K(cur_owner_id));
} else if (record_client_session_create_ts > client_session_create_ts) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("there's a client_session with larger create_ts",
K(client_session_id),
K(record_client_session_create_ts),
K(client_session_create_ts));
} else if (record_client_session_create_ts < client_session_create_ts) {
int tmp_ret = OB_SUCCESS;
LOG_INFO("meet reuse client_session_id, will recycle the eariler one",
K(client_session_id),
K(record_client_session_create_ts),
K(client_session_create_ts));
// Although the client_session_id is consistent, there is a high probability that the owner_id will not be
// consistent. Therefore, the failure to recycle here will not affect the subsequent locking process
if (OB_TMP_FAIL(ObTableLockDetector::remove_lock_by_owner_id(rec_owner_id.raw_value()))) {
LOG_WARN("recycle old lock with the same client_session_id failed, keep locking process",
K(tmp_ret),
K(client_session_id),
K(record_client_session_create_ts),
K(client_session_create_ts));
}
}
});
return ret;
}
int ObLockFuncExecutor::remove_session_record_(ObLockFuncContext &ctx,
const int64_t client_session_id)
const uint32_t client_session_id)
{
int ret = OB_SUCCESS;
bool lock_exist = false;
@ -408,14 +455,11 @@ int ObLockFuncExecutor::remove_session_record_(ObLockFuncContext &ctx,
ObTableLockOwnerID lock_owner;
ObSqlString delete_sql;
int64_t affected_rows = 0;
OZ (lock_owner.convert_from_value(ObLockOwnerType::SESS_ID_OWNER_TYPE,
client_session_id));
OZ (check_lock_exist_(ctx, lock_owner.raw_value(), lock_exist));
if (OB_SUCC(ret) && !lock_exist) {
lib::CompatModeGuard guard(lib::Worker::CompatMode::MYSQL);
OZ (databuff_printf(table_name, MAX_FULL_TABLE_NAME_LENGTH,
"%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_CLIENT_TO_SERVER_SESSION_INFO_TNAME));
OZ (delete_sql.assign_fmt("DELETE FROM %s WHERE client_session_id = %ld",
OZ (delete_sql.assign_fmt("DELETE FROM %s WHERE client_session_id = %" PRIu32,
table_name,
client_session_id));
OZ (ctx.execute_write(delete_sql, affected_rows));
@ -442,7 +486,6 @@ int ObLockFuncExecutor::query_lock_id_(const ObString &lock_name,
int ret = OB_SUCCESS;
uint64_t tenant_id = MTL_ID();
ObStringHolder query_lock_handle;
char lock_handle_buf[MAX_LOCK_HANDLE_LEGNTH] = {0};
// 1. check if there's a lock with the same lock name
char where_cond[WHERE_CONDITION_BUFFER_SIZE] = {0};
char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {0};
@ -463,12 +506,48 @@ int ObLockFuncExecutor::query_lock_id_(const ObString &lock_name,
if (OB_EMPTY_RESULT == ret) {
// there is no lock name.
} else if (OB_SUCC(ret)) {
query_lock_handle.to_string(lock_handle_buf, sizeof(lock_handle_buf));
OZ (extract_lock_id_(query_lock_handle.get_ob_string(), lock_id));
}
return ret;
}
int ObLockFuncExecutor::query_lock_id_and_lock_handle_(const ObString &lock_name,
uint64_t &lock_id,
char *lock_handle_buf)
{
int ret = OB_SUCCESS;
uint64_t tenant_id = MTL_ID();
ObStringHolder query_lock_handle;
// 1. check if there's a lock with the same lock name
char where_cond[WHERE_CONDITION_BUFFER_SIZE] = {0};
char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {0};
int64_t lock_handle_len = 0;
// generate corresponding lock handle for the lock name,
// and insert them into the inner table DBMS_LOCK_ALLOCATED
lock_id = OB_INVALID_OBJECT_ID;
OZ (databuff_printf(where_cond, WHERE_CONDITION_BUFFER_SIZE,
"WHERE name = '%s'", to_cstring(lock_name)));
OZ (databuff_printf(table_name, MAX_FULL_TABLE_NAME_LENGTH,
"%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_DBMS_LOCK_ALLOCATED_TNAME));
OZ (ObTableAccessHelper::read_single_row(tenant_id,
{ "lockhandle" },
table_name,
where_cond,
query_lock_handle));
if (OB_EMPTY_RESULT == ret) {
// there is no lock name.
} else if (OB_SUCC(ret)) {
ObString lock_handle_string = query_lock_handle.get_ob_string();
OZ (extract_lock_id_(lock_handle_string, lock_id));
OV (lock_handle_string.length() < MAX_LOCK_HANDLE_LEGNTH, OB_INVALID_ARGUMENT, lock_handle_string);
OX (MEMCPY(lock_handle_buf, lock_handle_string.ptr(), lock_handle_string.length()));
lock_handle_buf[lock_handle_string.length()] = '\0';
}
return ret;
}
int ObLockFuncExecutor::extract_lock_id_(const ObString &lock_handle,
uint64_t &lock_id)
{
@ -521,10 +600,10 @@ void ObLockFuncExecutor::mark_lock_session_(sql::ObSQLSessionInfo *session, cons
session->set_is_lock_session(is_lock_session);
session->set_need_send_feedback_proxy_info(true);
} else {
LOG_INFO("the lock_session status on the session won't be changed, no need to mark again",
K(session->get_sessid()),
K(session->is_lock_session()),
K(session->is_need_send_feedback_proxy_info()));
LOG_DEBUG("the lock_session status on the session won't be changed, no need to mark again",
K(session->get_sessid()),
K(session->is_lock_session()),
K(session->is_need_send_feedback_proxy_info()));
}
}
@ -534,38 +613,59 @@ int ObGetLockExecutor::execute(ObExecContext &ctx,
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t client_session_id = 0;
int64_t server_session_id = 0;
uint64_t client_session_create_ts = 0;
ObSQLSessionInfo *sess = ctx.get_my_session();
uint32_t client_session_id = sess->get_client_sessid();
uint32_t server_session_id = sess->get_sessid();
uint64_t client_session_create_ts = sess->get_client_create_time();
uint64_t lock_id = 0;
bool is_rollback = false;
ObSQLSessionInfo *sess = ctx.get_my_session();
ObTxParam tx_param;
OZ (ObLockFuncContext::valid_execute_context(ctx));
// 1. generate lock_id and update DBMS_LOCK_ALLOCATED table
//
// 2. check client_session_id is valid
// 3. check whether need reroute (skip temporarily)
// 4. modify inner table
// 4.1 add session into CLIENT_TO_SERVER_SESSION_INFO table
// 4.2 add lock_obj into DETECT_LOCK_INFO table
// 5. lock obj
if (OB_SUCC(ret)) {
SMART_VAR(ObLockFuncContext, stack_ctx) {
client_session_id = sess->get_client_sessid();
client_session_create_ts = sess->get_client_create_time();
server_session_id = sess->get_sessid();
OZ (stack_ctx.init(*sess, ctx, timeout_us));
if (OB_SUCC(ret)) {
ObTxParam tx_param;
// 1. check whether need reroute
// 2. modify inner table
// 2.1 alloc lock identity
// 2.2 add to connection table
// 3. lock obj
OZ (check_need_reroute_(stack_ctx, sess, client_session_id, client_session_create_ts));
OZ (generate_lock_id_(stack_ctx, lock_name, lock_id));
OZ (update_session_table_(stack_ctx,
client_session_id,
client_session_create_ts,
server_session_id));
OZ (ObInnerConnectionLockUtil::build_tx_param(sess, tx_param));
OZ (lock_obj_(sess, tx_param, client_session_id, lock_id, timeout_us));
OX (mark_lock_session_(sess, true));
}
SMART_VAR(ObLockFuncContext, stack_ctx1) {
OZ (stack_ctx1.init(*sess, ctx, timeout_us));
OZ (generate_lock_id_(stack_ctx1, lock_name, timeout_us, lock_id));
is_rollback = (OB_SUCCESS != ret);
if (OB_TMP_FAIL(stack_ctx.destroy(ctx, *(ctx.get_my_session()), is_rollback))) {
if (OB_TMP_FAIL(stack_ctx1.destroy(ctx, *(ctx.get_my_session()), is_rollback))) {
LOG_WARN("stack ctx destroy failed", K(tmp_ret));
COVER_SUCC(tmp_ret);
}
}
}
if (OB_SUCC(ret)) {
SMART_VAR(ObLockFuncContext, stack_ctx2) {
OZ (stack_ctx2.init(*sess, ctx, timeout_us));
// only when connect by proxy should check client_session
if (sess->is_obproxy_mode()) {
OZ (check_client_ssid_(stack_ctx2, client_session_id, client_session_create_ts));
if (OB_EMPTY_RESULT == ret) {
ret = OB_SUCCESS; // there're no same client_session_id records, continue
} else {
// TODO(yangyifei.yyf): some SQL can not redo, reroute may cause errors, so skip this step temporarily
// OZ (check_need_reroute_(stack_ctx1, sess, client_session_id, client_session_create_ts));
}
}
OZ (update_session_table_(stack_ctx2,
client_session_id,
client_session_create_ts,
server_session_id));
OZ (ObInnerConnectionLockUtil::build_tx_param(sess, tx_param));
OZ (lock_obj_(sess, tx_param, client_session_id, client_session_create_ts, lock_id, timeout_us));
OX (mark_lock_session_(sess, true));
is_rollback = (OB_SUCCESS != ret);
if (OB_TMP_FAIL(stack_ctx2.destroy(ctx, *(ctx.get_my_session()), is_rollback))) {
LOG_WARN("stack ctx destroy failed", K(tmp_ret));
COVER_SUCC(tmp_ret);
}
@ -576,7 +676,8 @@ int ObGetLockExecutor::execute(ObExecContext &ctx,
int ObGetLockExecutor::lock_obj_(sql::ObSQLSessionInfo *session,
const ObTxParam &tx_param,
const int64_t client_session_id,
const uint32_t client_session_id,
const uint64_t client_session_create_ts,
const int64_t obj_id,
const int64_t timeout_us)
{
@ -595,12 +696,12 @@ int ObGetLockExecutor::lock_obj_(sql::ObSQLSessionInfo *session,
if (OB_FAIL(lock_id.set(ObLockOBJType::OBJ_TYPE_MYSQL_LOCK_FUNC, obj_id))) {
LOG_WARN("set lock_id failed", K(ret), K(obj_id));
} else if (OB_FAIL(arg.owner_id_.convert_from_value(ObLockOwnerType::SESS_ID_OWNER_TYPE, client_session_id))) {
} else if (OB_FAIL(arg.owner_id_.convert_from_client_sessid(client_session_id, client_session_create_ts))) {
LOG_WARN("convert client_session_id to owner_id failed", K(ret), K(client_session_id));
} else if (OB_FAIL(arg.objs_.push_back(lock_id))) {
LOG_WARN("push_back lock_id to arg.objs_ failed", K(ret), K(arg), K(lock_id));
} else if (OB_FAIL(ObTableLockDetector::record_detect_info_to_inner_table(
session, LOCK_OBJECT, arg, need_record_to_lock_table))) {
session, LOCK_OBJECT, arg, /*for_dbms_lock*/ false, need_record_to_lock_table))) {
LOG_WARN("record_detect_info_to_inner_table failed", K(ret), K(arg));
} else if (need_record_to_lock_table) {
if (OB_FAIL(lock_service->lock_obj(*tx_desc, tx_param, arg))) {
@ -613,20 +714,18 @@ int ObGetLockExecutor::lock_obj_(sql::ObSQLSessionInfo *session,
int ObGetLockExecutor::generate_lock_id_(ObLockFuncContext &ctx,
const ObString &lock_name,
const int64_t timeout_us,
uint64_t &lock_id)
{
int ret = OB_SUCCESS;
char lock_handle_buf[MAX_LOCK_HANDLE_LEGNTH] = {0};
if (OB_SUCC(query_lock_id_(lock_name, lock_id))) {
} else if (OB_EMPTY_RESULT == ret) {
OZ (query_lock_id_and_lock_handle_(lock_name, lock_id, lock_handle_buf));
if (OB_EMPTY_RESULT == ret) {
// there is no result, should create one
ret = OB_SUCCESS;
OZ (generate_lock_id_(lock_name, lock_id, lock_handle_buf));
OZ (write_lock_id_(ctx, lock_name, lock_id, lock_handle_buf));
} else {
// get lock id failed.
LOG_WARN("query lock id failed", K(ret));
}
OZ (write_lock_id_(ctx, lock_name, timeout_us, lock_id, lock_handle_buf));
return ret;
}
@ -653,11 +752,13 @@ int ObGetLockExecutor::generate_lock_id_(const ObString &lock_name,
int ObGetLockExecutor::write_lock_id_(ObLockFuncContext &ctx,
const ObString &lock_name,
const int64_t timeout_us,
const uint64_t &lock_id,
const char *lock_handle_buf)
{
int ret = OB_SUCCESS;
ObDMLSqlSplicer insert_dml;
ObSqlString delete_sql;
ObSqlString insert_sql;
int64_t affected_rows = 0;
const int64_t now = ObTimeUtility::current_time();
@ -669,22 +770,33 @@ int ObGetLockExecutor::write_lock_id_(ObLockFuncContext &ctx,
OZ (insert_dml.add_gmt_create(now));
OZ (insert_dml.add_gmt_modified(now));
OZ (insert_dml.add_pk_column("name", lock_name));
// this is not used at mysql, mysql never expired.
OZ (insert_dml.add_time_column("expiration", now));
// make sure lock_obj will be timeout or success before lock_id is expired
OZ (insert_dml.add_time_column("expiration", now + timeout_us + DEFAULT_EXPIRATION_US));
OZ (insert_dml.add_column("lockid", lock_id));
OZ (insert_dml.add_column("lockhandle", lock_handle_buf));
OZ (insert_dml.splice_insert_update_sql(table_name,
insert_sql));
OZ (ctx.execute_write(insert_sql, affected_rows));
CK (OB_LIKELY(1 == affected_rows));
CK (OB_LIKELY(1 == affected_rows || 2 == affected_rows));
// clean lock_id which is expired and not locked
OZ(delete_sql.assign_fmt("DELETE FROM %s WHERE expiration <= usec_to_time(%" PRId64
") AND lockid NOT IN (SELECT obj_id FROM %s.%s where obj_type = %d)",
table_name,
now,
OB_SYS_DATABASE_NAME,
OB_ALL_DETECT_LOCK_INFO_TNAME,
static_cast<int>(ObLockOBJType::OBJ_TYPE_MYSQL_LOCK_FUNC)));
affected_rows = 0;
OZ (ctx.execute_write(delete_sql, affected_rows));
return ret;
}
int ObGetLockExecutor::update_session_table_(ObLockFuncContext &ctx,
const int64_t client_session_id,
const uint32_t client_session_id,
const uint64_t client_session_create_ts,
const int64_t server_session_id)
const uint32_t server_session_id)
{
int ret = OB_SUCCESS;
ObDMLSqlSplicer insert_dml;
@ -716,13 +828,13 @@ int ObGetLockExecutor::update_session_table_(ObLockFuncContext &ctx,
int ObGetLockExecutor::check_need_reroute_(ObLockFuncContext &ctx,
sql::ObSQLSessionInfo *session,
const int64_t client_session_id,
const uint32_t client_session_id,
const uint64_t client_session_create_ts)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObAddr lock_session_addr;
int64_t lock_session_id = 0;
uint32_t lock_session_id = 0;
int32_t sql_port = 0;
ObSqlCtx *sql_ctx = ctx.my_exec_ctx_->get_sql_ctx();
@ -758,10 +870,10 @@ int ObGetLockExecutor::check_need_reroute_(ObLockFuncContext &ctx,
}
int ObGetLockExecutor::get_lock_session_(ObLockFuncContext &ctx,
const int64_t client_session_id,
const uint32_t client_session_id,
const uint64_t client_session_create_ts,
ObAddr &lock_session_addr,
int64_t &lock_session_id)
uint32_t &lock_session_id)
{
int ret = OB_SUCCESS;
char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {'\0'};
@ -773,9 +885,10 @@ int ObGetLockExecutor::get_lock_session_(ObLockFuncContext &ctx,
ObSqlString sql;
common::sqlclient::ObMySQLResult *result = nullptr;
OZ (sql.assign_fmt("SELECT svr_ip, svr_port, server_session_id"
" FROM %s WHERE client_session_id = %ld",
table_name,
client_session_id));
" FROM %s WHERE client_session_id = %" PRIu32 " AND client_session_create_ts = %" PRIu64,
table_name,
client_session_id,
client_session_create_ts));
OZ (ctx.execute_read(sql, res));
OV (OB_NOT_NULL(result = res.get_result()), OB_ERR_UNEXPECTED, client_session_id);
OZ (get_first_session_info_(*result, lock_session_addr, lock_session_id));
@ -786,11 +899,12 @@ int ObGetLockExecutor::get_lock_session_(ObLockFuncContext &ctx,
int ObGetLockExecutor::get_first_session_info_(common::sqlclient::ObMySQLResult &res,
ObAddr &session_addr,
int64_t &server_session_id)
uint32_t &server_session_id)
{
int ret = OB_SUCCESS;
ObString svr_ip;
int64_t svr_port;
uint64_t tmp_session_id = 0;
OZ (res.next());
if (OB_ITER_END == ret) {
@ -798,7 +912,8 @@ int ObGetLockExecutor::get_first_session_info_(common::sqlclient::ObMySQLResult
}
OX (GET_COL_IGNORE_NULL(res.get_varchar, "svr_ip", svr_ip));
OX (GET_COL_IGNORE_NULL(res.get_int, "svr_port", svr_port));
OX (GET_COL_IGNORE_NULL(res.get_int, "server_session_id", server_session_id));
OX (GET_COL_IGNORE_NULL(res.get_uint, "server_session_id", tmp_session_id));
OX (server_session_id = static_cast<uint32_t>(tmp_session_id));
OX (session_addr.reset());
OV (session_addr.set_ip_addr(to_cstring(svr_ip), svr_port), OB_ERR_UNEXPECTED, svr_ip, svr_port);
@ -816,7 +931,7 @@ int ObGetLockExecutor::get_sql_port_(ObLockFuncContext &ctx,
int64_t tmp_sql_port = 0;
OZ (databuff_printf(
table_name, MAX_FULL_TABLE_NAME_LENGTH, "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_VIRTUAL_SERVER_TNAME));
table_name, MAX_FULL_TABLE_NAME_LENGTH, "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_VIRTUAL_LS_META_TABLE_TNAME));
OV (svr_addr.ip_to_string(svr_ip, MAX_IP_ADDR_LENGTH), OB_ERR_UNEXPECTED, svr_addr);
OX (
SMART_VAR(ObMySQLProxy::MySQLResult, res)
@ -824,7 +939,7 @@ int ObGetLockExecutor::get_sql_port_(ObLockFuncContext &ctx,
ObSqlString sql;
common::sqlclient::ObMySQLResult *result = nullptr;
OZ (sql.assign_fmt("SELECT sql_port FROM %s"
" WHERE svr_ip = '%s' AND svr_port = %" PRId32,
" WHERE svr_ip = '%s' AND svr_port = %" PRId32 " LIMIT 1",
table_name, svr_ip, svr_port));
OZ (ctx.execute_read(sql, res));
OV (OB_NOT_NULL(result = res.get_result()), OB_ERR_UNEXPECTED, svr_addr, svr_ip, svr_port);
@ -841,8 +956,8 @@ int ObReleaseLockExecutor::execute(ObExecContext &ctx,
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t client_session_id = 0;
int64_t server_session_id = 0;
uint32_t client_session_id = 0;
uint64_t client_session_create_ts = 0;
uint64_t lock_id = 0;
bool is_rollback = false;
bool need_remove_from_lock_table = true;
@ -853,6 +968,7 @@ int ObReleaseLockExecutor::execute(ObExecContext &ctx,
if (OB_SUCC(ret)) {
SMART_VAR(ObLockFuncContext, stack_ctx) {
client_session_id = ctx.get_my_session()->get_client_sessid();
client_session_create_ts = ctx.get_my_session()->get_client_create_time();
OZ (stack_ctx.init(*(ctx.get_my_session()), ctx));
if (OB_SUCC(ret)) {
ObSQLSessionInfo *session = GET_MY_SESSION(ctx);
@ -860,17 +976,20 @@ int ObReleaseLockExecutor::execute(ObExecContext &ctx,
ObTxParam tx_param;
ObUnLockObjsRequest arg;
ObTableLockOwnerID lock_owner;
// 1. get lock id from inner table
// 2. unlock obj
// 3. modify inner table
// 3.1 lock table: dec cnt if the cnt is greater than 1, else remove the record.
// 1. check client_session_id is valid
// 2. get lock id from inner table
// 3. unlock obj
// 4. modify inner table
// 4.1 lock table: dec cnt if the cnt is greater than 1, else remove the record.
// (this is processed internal)
// 3.2 lock name-id table: check the lock table if there is no lock of the same
// 4.2 lock name-id table: check the lock table if there is no lock of the same
// lock id, remove the record at lock name-id table.
// 3.3 session table: check the lock table if there is no lock of the same
// 4.3 session table: check the lock table if there is no lock of the same
// client session, remove the record of the same client session id.
OZ (lock_owner.convert_from_value(ObLockOwnerType::SESS_ID_OWNER_TYPE,
client_session_id));
if (ctx.get_my_session()->is_obproxy_mode()) {
OZ (check_client_ssid_(stack_ctx, client_session_id, client_session_create_ts));
}
OZ (lock_owner.convert_from_client_sessid(client_session_id, client_session_create_ts));
OZ (query_lock_id_(lock_name, lock_id));
OZ (ObInnerConnectionLockUtil::build_tx_param(session, tx_param));
@ -886,7 +1005,6 @@ int ObReleaseLockExecutor::execute(ObExecContext &ctx,
session, LOCK_OBJECT, arg, need_remove_from_lock_table));
if (OB_SUCC(ret) && need_remove_from_lock_table) {
OZ (unlock_obj_(tx_desc, tx_param, arg));
OZ (remove_lock_id_(stack_ctx, lock_owner.raw_value(), lock_id));
OZ (remove_session_record_(stack_ctx, client_session_id));
} else if (OB_EMPTY_RESULT == ret) {
if (OB_TMP_FAIL(check_lock_exist_(stack_ctx, lock_id, lock_id_existed))) {
@ -911,19 +1029,23 @@ int ObReleaseAllLockExecutor::execute(ObExecContext &ctx,
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t client_session_id = 0;
uint32_t client_session_id = 0;
uint64_t client_session_create_ts = 0;
bool is_rollback = false;
OZ (ObLockFuncContext::valid_execute_context(ctx));
OX (client_session_id = ctx.get_my_session()->get_client_sessid());
OZ (execute_(ctx, client_session_id, release_cnt));
OX (client_session_create_ts = ctx.get_my_session()->get_client_create_time());
OZ (execute_(ctx, client_session_id, client_session_create_ts, release_cnt));
return ret;
}
int ObReleaseAllLockExecutor::execute(const int64_t client_session_id)
int ObReleaseAllLockExecutor::execute(const int64_t raw_owner_id)
{
int ret = OB_SUCCESS;
int64_t release_cnt = 0;
ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR);
ObTableLockOwnerID owner_id;
OX (owner_id.convert_from_value(raw_owner_id));
SMART_VAR(sql::ObSQLSessionInfo, session) {
SMART_VAR(sql::ObExecContext, exec_ctx, allocator) {
ObSqlCtx sql_ctx;
@ -946,7 +1068,7 @@ int ObReleaseAllLockExecutor::execute(const int64_t client_session_id)
OX (exec_ctx.set_physical_plan_ctx(&phy_plan_ctx));
OZ (ObLockFuncContext::valid_execute_context(exec_ctx));
OZ (execute_(exec_ctx, client_session_id, release_cnt));
OZ (execute_(exec_ctx, owner_id, release_cnt));
OX (exec_ctx.set_physical_plan_ctx(nullptr)); // avoid core during release exec_ctx
}
}
@ -955,12 +1077,54 @@ int ObReleaseAllLockExecutor::execute(const int64_t client_session_id)
}
int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx,
const int64_t client_session_id,
const uint32_t client_session_id,
const uint64_t client_session_create_ts,
int64_t &release_cnt)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool is_rollback = false;
ObTableLockOwnerID owner_id;
OZ (ObLockFuncContext::valid_execute_context(ctx));
if (OB_SUCC(ret)) {
SMART_VAR(ObLockFuncContext, stack_ctx) {
OZ (stack_ctx.init(*(ctx.get_my_session()), ctx));
if (OB_SUCC(ret)) {
ObSQLSessionInfo *session = GET_MY_SESSION(ctx);
ObTxDesc *tx_desc = session->get_tx_desc();
ObTxParam tx_param;
if (ctx.get_my_session()->is_obproxy_mode()) {
OZ (check_client_ssid_(stack_ctx, client_session_id, client_session_create_ts));
}
OZ (ObInnerConnectionLockUtil::build_tx_param(session, tx_param));
OZ (owner_id.convert_from_client_sessid(client_session_id, client_session_create_ts));
OZ (release_all_locks_(stack_ctx,
session,
tx_param,
owner_id,
release_cnt));
OZ (remove_session_record_(stack_ctx, client_session_id));
}
is_rollback = (OB_SUCCESS != ret);
if (OB_TMP_FAIL(stack_ctx.destroy(ctx, *(ctx.get_my_session()), is_rollback))) {
LOG_WARN("stack ctx destroy failed", K(tmp_ret));
COVER_SUCC(tmp_ret);
}
}
}
return ret;
}
int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx,
const ObTableLockOwnerID &owner_id,
int64_t &release_cnt)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool is_rollback = false;
uint32_t client_session_id = 0;
OZ (owner_id.convert_to_sessid(client_session_id));
OZ (ObLockFuncContext::valid_execute_context(ctx));
if (OB_SUCC(ret)) {
SMART_VAR(ObLockFuncContext, stack_ctx) {
@ -970,11 +1134,7 @@ int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx,
ObTxDesc *tx_desc = session->get_tx_desc();
ObTxParam tx_param;
OZ (ObInnerConnectionLockUtil::build_tx_param(session, tx_param));
OZ (release_all_locks_(stack_ctx,
session,
tx_param,
client_session_id,
release_cnt));
OZ (release_all_locks_(stack_ctx, session, tx_param, owner_id, release_cnt));
OZ (remove_session_record_(stack_ctx, client_session_id));
}
is_rollback = (OB_SUCCESS != ret);
@ -990,7 +1150,7 @@ int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx,
int ObReleaseAllLockExecutor::release_all_locks_(ObLockFuncContext &ctx,
ObSQLSessionInfo *session,
const ObTxParam &tx_param,
const int64_t client_session_id,
const ObTableLockOwnerID &owner_id,
int64_t &release_cnt)
{
int ret = OB_SUCCESS;
@ -1005,11 +1165,10 @@ int ObReleaseAllLockExecutor::release_all_locks_(ObLockFuncContext &ctx,
{
ObSqlString sql;
common::sqlclient::ObMySQLResult *result = NULL;
if (OB_FAIL(lock_owner.convert_from_value(ObLockOwnerType::SESS_ID_OWNER_TYPE, client_session_id))) {
} else if (OB_FAIL(sql.assign_fmt("SELECT obj_type, obj_id, lock_mode, owner_id, cnt"
if (OB_FAIL(sql.assign_fmt("SELECT obj_type, obj_id, lock_mode, owner_id, cnt"
" FROM %s WHERE owner_id = %ld",
table_name,
lock_owner.raw_value()))) {
owner_id.raw_value()))) {
LOG_WARN("fail to assign fmt", KR(ret));
} else if (OB_FAIL(ctx.execute_read(sql, res))) {
LOG_WARN("execute sql failed", KR(ret), K(sql));
@ -1070,9 +1229,6 @@ int ObReleaseAllLockExecutor::release_all_locks_(ObLockFuncContext &ctx,
} else if (arg.objs_.count() != 1) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("do not support batch unlock right now", K(arg));
} else if (ObLockOBJType::OBJ_TYPE_MYSQL_LOCK_FUNC == arg.objs_[0].obj_type_
&& OB_FAIL(remove_lock_id_(ctx, arg.owner_id_.raw_value(), arg.objs_[0].obj_id_))) {
LOG_WARN("remove lock id failed", K(ret), K(arg));
}
}
@ -1114,7 +1270,6 @@ int ObReleaseAllLockExecutor::parse_lock_request_(common::sqlclient::ObMySQLResu
arg.op_type_ = ObTableLockOpType::OUT_TRANS_LOCK;
arg.timeout_us_ = 0;
arg.is_from_sql_ = true;
arg.owner_id_.convert_from_value(owner_id);
}
return ret;
@ -1146,13 +1301,14 @@ int ObISFreeLockExecutor::execute(ObExecContext &ctx,
UNUSED(ctx);
int ret = OB_SUCCESS;
uint64_t lock_id = 0;
OZ (query_lock_id_(lock_name, lock_id));
OZ(query_lock_id_(lock_name, lock_id));
OZ(check_lock_exist_(lock_id));
return ret;
}
int ObISUsedLockExecutor::execute(ObExecContext &ctx,
const ObString &lock_name,
int64_t &sess_id)
uint32_t &sess_id)
{
UNUSED(ctx);
int ret = OB_SUCCESS;

View File

@ -124,8 +124,10 @@ public:
static constexpr int64_t LOCK_ID_LENGTH = 10;
static constexpr int64_t MIN_LOCK_HANDLE_ID = 0x40000000;
static constexpr int64_t MAX_LOCK_HANDLE_ID = 1999999999;
static constexpr int64_t DEFAULT_EXPIRATION_US = 60 * 1000 * 1000L; // 1min
public:
int check_lock_exist_(const uint64_t &lock_id);
int check_lock_exist_(ObLockFuncContext &ctx,
const int64_t raw_owner_id,
const uint64_t &lock_id,
@ -139,18 +141,22 @@ public:
int check_lock_exist_(ObLockFuncContext &ctx,
ObSqlString &where_cond,
bool &exist);
int remove_lock_id_(ObLockFuncContext &ctx,
const int64_t raw_owner_id,
const uint64_t &lock_id);
int check_client_ssid_(ObLockFuncContext &ctx,
const uint32_t client_session_id,
const uint64_t client_session_create_ts);
int remove_lock_id_(ObLockFuncContext &ctx, const int64_t raw_owner_id, const uint64_t &lock_id);
int remove_lock_id_(ObLockFuncContext &ctx,
const int64_t lock_id);
int remove_session_record_(ObLockFuncContext &ctx,
const int64_t client_session_id);
const uint32_t client_session_id);
int unlock_obj_(transaction::ObTxDesc *tx_desc,
const transaction::ObTxParam &tx_param,
const ObUnLockObjsRequest &arg);
int query_lock_id_(const ObString &lock_name,
uint64_t &lock_id);
int query_lock_id_and_lock_handle_(const ObString &lock_name,
uint64_t &lock_id,
char *lock_handle_buf);
int query_lock_owner_(const uint64_t &lock_id,
int64_t &owner_id);
int extract_lock_id_(const ObString &lock_handle,
@ -169,25 +175,27 @@ public:
private:
int generate_lock_id_(ObLockFuncContext &ctx,
const ObString &lock_name,
const int64_t timeout_us,
uint64_t &lock_id);
int update_session_table_(ObLockFuncContext &ctx,
const int64_t client_session_id,
const uint32_t client_session_id,
const uint64_t client_session_create_ts,
const int64_t server_session_id);
const uint32_t server_session_id);
int check_need_reroute_(ObLockFuncContext &ctx,
sql::ObSQLSessionInfo *session,
const int64_t client_session_id,
const uint32_t client_session_id,
const uint64_t client_session_create_ts);
int get_lock_session_(ObLockFuncContext &ctx,
const int64_t client_session_id,
const uint32_t client_session_id,
const uint64_t client_session_create_ts,
ObAddr &lock_session_addr,
int64_t &lock_session_id);
int get_first_session_info_(common::sqlclient::ObMySQLResult &res, ObAddr &session_addr, int64_t &server_session_id);
uint32_t &lock_session_id);
int get_first_session_info_(common::sqlclient::ObMySQLResult &res, ObAddr &session_addr, uint32_t &server_session_id);
int get_sql_port_(ObLockFuncContext &ctx, const ObAddr &svr_addr, int32_t &sql_port);
int lock_obj_(sql::ObSQLSessionInfo *session,
const transaction::ObTxParam &tx_param,
const int64_t client_session_id,
const uint32_t client_session_id,
const uint64_t client_session_create_ts,
const int64_t obj_id,
const int64_t timeout_us);
int generate_lock_id_(const ObString &lock_name,
@ -195,6 +203,7 @@ private:
char *lock_handle);
int write_lock_id_(ObLockFuncContext &ctx,
const ObString &lock_name,
const int64_t timeout_us,
const uint64_t &lock_id,
const char *lock_handle_buf);
};
@ -212,15 +221,19 @@ public:
int execute(sql::ObExecContext &ctx,
int64_t &release_cnt);
// used internal, release all the lock that required by the session.
int execute(const int64_t client_session_id);
int execute(const int64_t raw_owner_id);
private:
int execute_(sql::ObExecContext &ctx,
const int64_t client_session_id,
const uint32_t client_session_id,
const uint64_t client_session_create_ts,
int64_t &release_cnt);
int execute_(sql::ObExecContext &ctx,
const ObTableLockOwnerID &owner_id,
int64_t &release_cnt);
int release_all_locks_(ObLockFuncContext &ctx,
sql::ObSQLSessionInfo *session,
const transaction::ObTxParam &tx_param,
const int64_t client_session_id,
const ObTableLockOwnerID &owner_id,
int64_t &release_cnt);
int release_all_locks_(ObLockFuncContext &ctx,
const ObIArray<ObUnLockObjsRequest> &arg_list,
@ -248,7 +261,7 @@ class ObISUsedLockExecutor : public ObLockFuncExecutor
public:
int execute(sql::ObExecContext &ctx,
const ObString &lock_name,
int64_t &sess_id);
uint32_t &sess_id);
};
} // tablelock

View File

@ -190,14 +190,24 @@ int ObTableLockOwnerID::convert_from_value(const ObLockOwnerType owner_type,
return ret;
}
int ObTableLockOwnerID::convert_to_sessid(int64_t &sessid) const
int ObTableLockOwnerID::convert_from_client_sessid(const uint32_t client_sessid, const uint64_t client_sess_create_ts)
{
int ret = OB_SUCCESS;
pack_ = 0;
type_ = static_cast<unsigned char>(ObLockOwnerType::SESS_ID_OWNER_TYPE);
int64_t client_unique_id = client_sess_create_ts & CLIENT_SESS_CREATE_TS_MASK;
id_ = (static_cast<int64_t>(client_sessid)) | (client_unique_id << CLIENT_SESS_ID_BIT);
return ret;
}
int ObTableLockOwnerID::convert_to_sessid(uint32_t &sessid) const
{
int ret = OB_SUCCESS;
if (type_ != static_cast<int64_t>(ObLockOwnerType::SESS_ID_OWNER_TYPE)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("this lock owner id cannot be converted to session id", K(ret), K_(type));
} else {
sessid = id_;
sessid = static_cast<uint32_t>(id_ & CLIENT_SESS_ID_MASK);
}
return ret;
}

View File

@ -436,6 +436,10 @@ class ObTableLockOwnerID
public:
static const int64_t INVALID_ID = -1;
static const int64_t INVALID_RAW_OWNER_ID = ((1L << 54) - 1);
static const int64_t CLIENT_SESS_CREATE_TS_BIT = 22;
static const int64_t CLIENT_SESS_CREATE_TS_MASK = (1L << CLIENT_SESS_CREATE_TS_BIT) - 1;
static const int64_t CLIENT_SESS_ID_BIT = 32;
static const int64_t CLIENT_SESS_ID_MASK = (1L << CLIENT_SESS_ID_BIT) - 1;
static const int64_t MAX_VALID_RAW_OWNER_ID = INVALID_RAW_OWNER_ID - 1;
ObTableLockOwnerID() : pack_(INVALID_ID) {}
@ -462,7 +466,8 @@ public:
int convert_from_value(const int64_t packed_id);
// check valid.
int convert_from_value(const ObLockOwnerType owner_type, const int64_t raw_owner_id);
int convert_to_sessid(int64_t &sessid) const;
int convert_from_client_sessid(const uint32_t client_sessid, const uint64_t client_sess_create_ts);
int convert_to_sessid(uint32_t &sessid) const;
// assignment
ObTableLockOwnerID &operator=(const ObTableLockOwnerID &other)
{ pack_ = other.pack_; return *this; }

View File

@ -84,7 +84,7 @@ int ObTableLockDetectFuncList::do_session_alive_detect()
ObArray<ObTuple<int64_t>> owner_ids;
bool client_session_alive = true;
ObTableLockOwnerID owner_id;
int64_t client_session_id = sql::ObSQLSessionInfo::INVALID_SESSID;
uint32_t client_session_id = sql::ObSQLSessionInfo::INVALID_SESSID;
char full_table_name[OB_MAX_TABLE_NAME_BUF_LENGTH];
lib::CompatModeGuard compat_guard(lib::Worker::CompatMode::MYSQL);
if (OB_FAIL(databuff_printf(
@ -112,8 +112,7 @@ int ObTableLockDetectFuncList::do_session_alive_detect()
} else if (!client_session_alive) {
LOG_INFO(
"find client session is not alive, we will clean all recodrs of it later", K(ret), K(client_session_id));
ObReleaseAllLockExecutor executor;
executor.execute(client_session_id);
ObTableLockDetector::remove_lock_by_owner_id(owner_ids[i].element<0>());
}
}
}
@ -251,18 +250,19 @@ int ObTableLockDetectFuncList::check_server_is_online_(const ObString &svr_ip, c
is_online = false;
char full_table_name[OB_MAX_TABLE_NAME_BUF_LENGTH];
if (OB_FAIL(databuff_printf(
where_cond,
512,
"WHERE (svr_ip, svr_port) IN (SELECT u.svr_ip, u.svr_port FROM %s.%s AS u JOIN %s.%s AS r on "
"r.resource_pool_id = u.resource_pool_id WHERE tenant_id = %ld) and svr_ip = '%s' and svr_port = %ld",
OB_SYS_DATABASE_NAME,
OB_ALL_UNIT_TNAME,
OB_SYS_DATABASE_NAME,
OB_ALL_RESOURCE_POOL_TNAME,
MTL_ID(),
to_cstring(svr_ip),
svr_port))) {
if (OB_FAIL(
databuff_printf(where_cond,
512,
"WHERE (svr_ip, svr_port) IN (SELECT u.svr_ip, u.svr_port FROM %s.%s AS u JOIN %s.%s AS r on "
"r.resource_pool_id = u.resource_pool_id WHERE tenant_id = %ld) and svr_ip = '%s' and svr_port "
"= %ld and status = 'active'",
OB_SYS_DATABASE_NAME,
OB_ALL_UNIT_TNAME,
OB_SYS_DATABASE_NAME,
OB_ALL_RESOURCE_POOL_TNAME,
MTL_ID(),
to_cstring(svr_ip),
svr_port))) {
LOG_WARN("generate where condition for select sql failed", K(svr_ip), K(svr_port));
} else if (OB_FAIL(databuff_printf(
full_table_name, OB_MAX_TABLE_NAME_BUF_LENGTH, "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_SERVER_TNAME))) {
@ -289,15 +289,14 @@ const char *ObTableLockDetector::detect_columns[8] = {
int ObTableLockDetector::record_detect_info_to_inner_table(sql::ObSQLSessionInfo *session_info,
const ObTableLockTaskType &task_type,
const ObLockRequest &lock_req,
const bool for_dbms_lock,
bool &need_record_to_lock_table)
{
int ret = OB_SUCCESS;
observer::ObInnerSQLConnection *inner_conn = nullptr;
char full_table_name[OB_MAX_TABLE_NAME_BUF_LENGTH];
share::ObDMLSqlSplicer dml;
ObSqlString insert_sql;
int64_t affected_rows = 0;
bool need_release_conn = false;
bool is_existed = false;
need_record_to_lock_table = true;
lib::CompatModeGuard guard(lib::Worker::CompatMode::MYSQL);
@ -321,21 +320,16 @@ int ObTableLockDetector::record_detect_info_to_inner_table(sql::ObSQLSessionInfo
OB_SYS_DATABASE_NAME,
OB_ALL_DETECT_LOCK_INFO_TNAME))) {
LOG_WARN("generate full table_name failed", K(OB_SYS_DATABASE_NAME), K(OB_ALL_DETECT_LOCK_INFO_TNAME));
} else if (OB_FAIL(generate_insert_dml_(task_type, lock_req, dml))) {
LOG_WARN("generate insert dml failed", K(ret));
} else if (OB_FAIL(dml.splice_insert_sql(full_table_name, insert_sql))) {
LOG_WARN("generate insert sql failed", K(ret), K(full_table_name));
} else if (OB_FAIL(insert_sql.append(" ON DUPLICATE KEY UPDATE cnt = cnt + 1"))) {
LOG_WARN("append 'cnt = cnt + 1' to the insert_sql failed", K(ret), K(insert_sql));
} else if (OB_FAIL(ObInnerConnectionLockUtil::execute_write_sql(inner_conn, insert_sql, affected_rows))) {
LOG_WARN("execute insert sql failed", K(ret), K(insert_sql));
} else if (affected_rows == 2) {
} else if (for_dbms_lock && OB_FAIL(check_dbms_lock_record_exist_(inner_conn, full_table_name, task_type, lock_req, is_existed))) {
LOG_WARN("check dbms_lock record exist failed", K(ret), K(task_type), K(lock_req));
}
if (OB_FAIL(ret)) {
} else if (for_dbms_lock && is_existed) {
need_record_to_lock_table = false;
LOG_INFO("there's the same lock in __all_detect_lock_info table, no need to record it to the lock table",
K(lock_req));
} else if (affected_rows != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("only can affetct 1 row due to insert, or 2 rows due to insert on duplicate key", K(affected_rows));
} else if (OB_FAIL(record_detect_info_to_inner_table_(
inner_conn, full_table_name, task_type, lock_req, need_record_to_lock_table))) {
LOG_WARN("record_detect_info_to_inner_table_ failed", K(ret), K(task_type), K(lock_req));
}
if (need_release_conn && OB_NOT_NULL(inner_conn)) {
@ -474,6 +468,102 @@ int ObTableLockDetector::do_detect_and_clear()
return ret;
}
int ObTableLockDetector::remove_lock_by_owner_id(const int64_t raw_owner_id)
{
int ret = OB_SUCCESS;
ObReleaseAllLockExecutor executor;
if (OB_FAIL(executor.execute(raw_owner_id))) {
LOG_WARN("remove lock by owner_id failed", K(raw_owner_id));
}
return ret;
}
int ObTableLockDetector::record_detect_info_to_inner_table_(observer::ObInnerSQLConnection *inner_conn,
const char *table_name,
const ObTableLockTaskType &task_type,
const ObLockRequest &lock_req,
bool &need_record_to_lock_table)
{
int ret = OB_SUCCESS;
share::ObDMLSqlSplicer dml;
ObSqlString insert_sql;
int64_t affected_rows = 0;
if (OB_FAIL(generate_insert_dml_(task_type, lock_req, dml))) {
LOG_WARN("generate insert dml failed", K(ret));
} else if (OB_FAIL(dml.splice_insert_sql(table_name, insert_sql))) {
LOG_WARN("generate insert sql failed", K(ret), K(table_name));
} else if (OB_FAIL(insert_sql.append(" ON DUPLICATE KEY UPDATE cnt = cnt + 1"))) {
LOG_WARN("append 'cnt = cnt + 1' to the insert_sql failed", K(ret), K(insert_sql));
} else if (OB_FAIL(ObInnerConnectionLockUtil::execute_write_sql(inner_conn, insert_sql, affected_rows))) {
LOG_WARN("execute insert sql failed", K(ret), K(insert_sql));
} else if (affected_rows == 2) {
need_record_to_lock_table = false;
LOG_INFO("there's the same lock in __all_detect_lock_info table, no need to record it to the lock table",
K(lock_req));
} else if (affected_rows != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("only can affetct 1 row due to insert, or 2 rows due to insert on duplicate key", K(affected_rows));
}
return ret;
}
int ObTableLockDetector::check_dbms_lock_record_exist_(observer::ObInnerSQLConnection *inner_conn,
const char *table_name,
const ObTableLockTaskType &task_type,
const ObLockRequest &lock_req,
bool &is_existed)
{
int ret = OB_SUCCESS;
ObSqlString sql;
uint64_t obj_type = static_cast<uint64_t>(ObLockOBJType::OBJ_TYPE_INVALID);
uint64_t obj_id = OB_INVALID_ID;
if (LOCK_OBJECT != task_type) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the task_type for DBMS_LOCK should be LOCK_OBJECT", K(ret), K(task_type), K(lock_req));
} else {
const ObLockObjsRequest &arg = static_cast<const ObLockObjsRequest &>(lock_req);
if (arg.objs_.count() > 1) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("do not support detect batch lock obj request right now", K(arg));
} else {
obj_type = static_cast<uint64_t>(arg.objs_[0].obj_type_);
obj_id = arg.objs_[0].obj_id_;
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(sql.append_fmt("SELECT 1 FROM %s WHERE task_type = %d AND obj_type = %" PRIu64 " AND obj_id = %" PRIu64
" AND owner_id = %" PRId64,
table_name,
static_cast<int>(task_type),
obj_type,
obj_id,
lock_req.owner_id_.raw_value()))) {
LOG_WARN("make select sql failed", K(ret), K(task_type), K(lock_req));
} else {
SMART_VAR(ObMySQLProxy::MySQLResult, res)
{
common::sqlclient::ObMySQLResult *result = nullptr;
if (OB_FAIL(ObInnerConnectionLockUtil::execute_read_sql(inner_conn, sql, res))) {
LOG_WARN("execute_read_sql for check_dbms_lock_record_exist_ failed", KR(ret), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get result", KR(ret), K(sql));
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
is_existed = false;
}
} else {
is_existed = true;
}
}
}
return ret;
}
int ObTableLockDetector::generate_insert_dml_(const ObTableLockTaskType &task_type,
const ObLockRequest &lock_req,
share::ObDMLSqlSplicer &dml)
@ -502,7 +592,7 @@ int ObTableLockDetector::add_pk_column_to_dml_(const ObTableLockTaskType &task_t
share::ObDMLSqlSplicer &dml)
{
int ret = OB_SUCCESS;
uint64_t obj_type = static_cast<uint64_t>(ObTableLockTaskType::INVALID_LOCK_TASK_TYPE);
uint64_t obj_type = static_cast<uint64_t>(ObLockOBJType::OBJ_TYPE_INVALID);
uint64_t obj_id = OB_INVALID_ID;
switch (task_type) {

View File

@ -98,6 +98,7 @@ public:
static int record_detect_info_to_inner_table(sql::ObSQLSessionInfo *session_info,
const ObTableLockTaskType &task_type,
const ObLockRequest &lock_req,
const bool for_dbms_lock,
bool &need_record_to_lock_table);
static int remove_detect_info_from_inner_table(sql::ObSQLSessionInfo *session_info,
const ObTableLockTaskType &task_type,
@ -108,8 +109,19 @@ public:
const ObLockRequest &lock_req,
int64_t &cnt);
static int do_detect_and_clear();
static int remove_lock_by_owner_id(const int64_t raw_owner_id);
private:
static int record_detect_info_to_inner_table_(observer::ObInnerSQLConnection *inner_conn,
const char *table_name,
const ObTableLockTaskType &task_type,
const ObLockRequest &lock_req,
bool &need_record_to_lock_table);
static int check_dbms_lock_record_exist_(observer::ObInnerSQLConnection *inner_conn,
const char *table_name,
const ObTableLockTaskType &task_type,
const ObLockRequest &lock_req,
bool &is_existed);
static int generate_insert_dml_(const ObTableLockTaskType &task_type,
const ObLockRequest &lock_req,
share::ObDMLSqlSplicer &dml);