diff --git a/src/sql/engine/expr/ob_expr_lock_func.cpp b/src/sql/engine/expr/ob_expr_lock_func.cpp index 81c2675ce..2a95f6062 100644 --- a/src/sql/engine/expr/ob_expr_lock_func.cpp +++ b/src/sql/engine/expr/ob_expr_lock_func.cpp @@ -104,7 +104,14 @@ bool ObExprLockFunc::proxy_is_support(const ObExecContext &exec_ctx) if (OB_ISNULL(session)) { LOG_ERROR_RET(OB_INVALID_ARGUMENT, "session is null!"); } else { - is_support = session->is_feedback_proxy_info_support() || !session->is_obproxy_mode(); + is_support = (session->is_feedback_proxy_info_support() && session->is_client_sessid_support()) || !session->is_obproxy_mode(); + if (!is_support) { + LOG_WARN_RET(OB_NOT_SUPPORTED, + "proxy is not support this feature", + K(session->get_sessid()), + K(session->is_feedback_proxy_info_support()), + K(session->is_client_sessid_support())); + } } return is_support; } @@ -334,7 +341,7 @@ int ObExprIsUsedLock::is_used_lock(const ObExpr &expr, ObEvalCtx &ctx, ObDatum & int ret = OB_SUCCESS; // we can get session info use this const ObSQLSessionInfo *session = ctx.exec_ctx_.get_my_session(); - int64_t sess_id = 0; + uint32_t sess_id = 0; ObDatum *lock_name = NULL; if (!proxy_is_support(ctx.exec_ctx_)) { @@ -362,7 +369,7 @@ int ObExprIsUsedLock::is_used_lock(const ObExpr &expr, ObEvalCtx &ctx, ObDatum & // 1. get the lock session id if (OB_SUCC(ret)) { - result.set_int(sess_id); + result.set_uint32(sess_id); } else if (OB_EMPTY_RESULT == ret) { // 2. there is nobody hold the lock ret = OB_SUCCESS; diff --git a/src/storage/tablelock/ob_lock_func_executor.cpp b/src/storage/tablelock/ob_lock_func_executor.cpp index 4d7281182..f72ac1da9 100644 --- a/src/storage/tablelock/ob_lock_func_executor.cpp +++ b/src/storage/tablelock/ob_lock_func_executor.cpp @@ -16,6 +16,7 @@ #include "lib/mysqlclient/ob_mysql_proxy.h" #include "lib/mysqlclient/ob_mysql_result.h" #include "lib/utility/ob_fast_convert.h" +#include "lib/alloc/alloc_assist.h" #include "observer/ob_inner_sql_connection.h" #include "share/ob_table_access_helper.h" #include "sql/engine/ob_exec_context.h" @@ -275,6 +276,30 @@ int ObLockFuncContext::execute_read(const ObSqlString &sql, ObMySQLProxy::MySQLR } return ret; } +int ObLockFuncExecutor::check_lock_exist_(const uint64_t &lock_id) +{ + int ret = OB_SUCCESS; + uint64_t tenant_id = MTL_ID(); + ObStringHolder query_lock_handle; + char where_cond[WHERE_CONDITION_BUFFER_SIZE] = {0}; + char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {0}; + bool is_existed = false; + + OZ(databuff_printf(where_cond, + WHERE_CONDITION_BUFFER_SIZE, + "WHERE obj_id = %" PRIu64 + " and obj_type = %d", + lock_id, + static_cast(ObLockOBJType::OBJ_TYPE_MYSQL_LOCK_FUNC))); + OZ (databuff_printf(table_name, MAX_FULL_TABLE_NAME_LENGTH, + "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_DETECT_LOCK_INFO_TNAME)); + OZ (ObTableAccessHelper::read_single_row(tenant_id, + { "1" }, + table_name, + where_cond, + is_existed)); + return ret; +} int ObLockFuncExecutor::check_lock_exist_(ObLockFuncContext &ctx, ObSqlString &where_cond, @@ -330,9 +355,7 @@ int ObLockFuncExecutor::check_lock_exist_(ObLockFuncContext &ctx, return ret; } -int ObLockFuncExecutor::check_lock_exist_(ObLockFuncContext &ctx, - const uint64_t &lock_id, - bool &exist) +int ObLockFuncExecutor::check_lock_exist_(ObLockFuncContext &ctx, const uint64_t &lock_id, bool &exist) { int ret = OB_SUCCESS; ObSqlString where_cond; @@ -358,49 +381,73 @@ int ObLockFuncExecutor::check_lock_exist_(ObLockFuncContext &ctx, return ret; } -int ObLockFuncExecutor::remove_lock_id_(ObLockFuncContext &ctx, - const int64_t raw_owner_id, - const uint64_t &lock_id) +int ObLockFuncExecutor::check_client_ssid_(ObLockFuncContext &ctx, + const uint32_t client_session_id, + const uint64_t client_session_create_ts) { int ret = OB_SUCCESS; - bool lock_exist = false; - if (OB_FAIL(check_lock_exist_(ctx, - raw_owner_id, - lock_id, - lock_exist))) { - LOG_WARN("check lock exist failed", K(ret), K(raw_owner_id), K(lock_id)); - } else if (!lock_exist) { - if (OB_FAIL(remove_lock_id_(ctx, lock_id))) { - LOG_WARN("remove record from lock name table failed", K(ret), K(lock_id)); - } - } else { - // there is some other lock, the lock name-id record should no be removed. - } - return ret; -} - -int ObLockFuncExecutor::remove_lock_id_(ObLockFuncContext &ctx, - const int64_t lock_id) -{ - int ret = OB_SUCCESS; - ObSqlString delete_sql; - int64_t affected_rows = 0; - lib::CompatModeGuard guard(lib::Worker::CompatMode::MYSQL); char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {0}; - OZ (databuff_printf(table_name, MAX_FULL_TABLE_NAME_LENGTH, - "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_DBMS_LOCK_ALLOCATED_TNAME)); - - OZ (delete_sql.assign_fmt("DELETE FROM %s WHERE lockid = %ld", - table_name, - lock_id)); - OZ (ctx.execute_write(delete_sql, affected_rows)); - CK (OB_LIKELY(1 == affected_rows)); + int64_t record_client_session_create_ts = 0; + ObSqlString sql; + ObTableLockOwnerID owner_id; + common::sqlclient::ObMySQLResult *result = nullptr; + OZ (owner_id.convert_from_client_sessid(client_session_id, client_session_create_ts)); + OZ (databuff_printf( + table_name, MAX_FULL_TABLE_NAME_LENGTH, "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_CLIENT_TO_SERVER_SESSION_INFO_TNAME)); + OZ (sql.assign_fmt("SELECT time_to_usec(client_session_create_ts)" + " FROM %s WHERE client_session_id = %" PRIu32, + table_name, + client_session_id)); + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + OZ (ctx.execute_read(sql, res)); + OV (OB_NOT_NULL(result = res.get_result()), OB_ERR_UNEXPECTED, client_session_id); + OZ (result->next()); + // there's no record, means the client_sessid is not used before, or has been cleaned + if (OB_ITER_END == ret) { + ret = OB_EMPTY_RESULT; + } + OX (GET_COL_IGNORE_NULL(result->get_int, + "time_to_usec(client_session_create_ts)", + record_client_session_create_ts)); + } + OX( + if (OB_UNLIKELY(record_client_session_create_ts != client_session_create_ts)) { + ObTableLockOwnerID rec_owner_id; + ObTableLockOwnerID cur_owner_id; + OZ (rec_owner_id.convert_from_client_sessid(client_session_id, record_client_session_create_ts)); + OZ (cur_owner_id.convert_from_client_sessid(client_session_id, client_session_create_ts)); + if (rec_owner_id == cur_owner_id) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("meet client_session_id reuse, and has the same owner_id", K(rec_owner_id), K(cur_owner_id)); + } else if (record_client_session_create_ts > client_session_create_ts) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("there's a client_session with larger create_ts", + K(client_session_id), + K(record_client_session_create_ts), + K(client_session_create_ts)); + } else if (record_client_session_create_ts < client_session_create_ts) { + int tmp_ret = OB_SUCCESS; + LOG_INFO("meet reuse client_session_id, will recycle the eariler one", + K(client_session_id), + K(record_client_session_create_ts), + K(client_session_create_ts)); + // Although the client_session_id is consistent, there is a high probability that the owner_id will not be + // consistent. Therefore, the failure to recycle here will not affect the subsequent locking process + if (OB_TMP_FAIL(ObTableLockDetector::remove_lock_by_owner_id(rec_owner_id.raw_value()))) { + LOG_WARN("recycle old lock with the same client_session_id failed, keep locking process", + K(tmp_ret), + K(client_session_id), + K(record_client_session_create_ts), + K(client_session_create_ts)); + } + } + }); return ret; } int ObLockFuncExecutor::remove_session_record_(ObLockFuncContext &ctx, - const int64_t client_session_id) + const uint32_t client_session_id) { int ret = OB_SUCCESS; bool lock_exist = false; @@ -408,14 +455,11 @@ int ObLockFuncExecutor::remove_session_record_(ObLockFuncContext &ctx, ObTableLockOwnerID lock_owner; ObSqlString delete_sql; int64_t affected_rows = 0; - OZ (lock_owner.convert_from_value(ObLockOwnerType::SESS_ID_OWNER_TYPE, - client_session_id)); - OZ (check_lock_exist_(ctx, lock_owner.raw_value(), lock_exist)); if (OB_SUCC(ret) && !lock_exist) { lib::CompatModeGuard guard(lib::Worker::CompatMode::MYSQL); OZ (databuff_printf(table_name, MAX_FULL_TABLE_NAME_LENGTH, "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_CLIENT_TO_SERVER_SESSION_INFO_TNAME)); - OZ (delete_sql.assign_fmt("DELETE FROM %s WHERE client_session_id = %ld", + OZ (delete_sql.assign_fmt("DELETE FROM %s WHERE client_session_id = %" PRIu32, table_name, client_session_id)); OZ (ctx.execute_write(delete_sql, affected_rows)); @@ -442,7 +486,6 @@ int ObLockFuncExecutor::query_lock_id_(const ObString &lock_name, int ret = OB_SUCCESS; uint64_t tenant_id = MTL_ID(); ObStringHolder query_lock_handle; - char lock_handle_buf[MAX_LOCK_HANDLE_LEGNTH] = {0}; // 1. check if there's a lock with the same lock name char where_cond[WHERE_CONDITION_BUFFER_SIZE] = {0}; char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {0}; @@ -463,12 +506,48 @@ int ObLockFuncExecutor::query_lock_id_(const ObString &lock_name, if (OB_EMPTY_RESULT == ret) { // there is no lock name. } else if (OB_SUCC(ret)) { - query_lock_handle.to_string(lock_handle_buf, sizeof(lock_handle_buf)); OZ (extract_lock_id_(query_lock_handle.get_ob_string(), lock_id)); } return ret; } +int ObLockFuncExecutor::query_lock_id_and_lock_handle_(const ObString &lock_name, + uint64_t &lock_id, + char *lock_handle_buf) +{ + int ret = OB_SUCCESS; + uint64_t tenant_id = MTL_ID(); + ObStringHolder query_lock_handle; + // 1. check if there's a lock with the same lock name + char where_cond[WHERE_CONDITION_BUFFER_SIZE] = {0}; + char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {0}; + int64_t lock_handle_len = 0; + // generate corresponding lock handle for the lock name, + // and insert them into the inner table DBMS_LOCK_ALLOCATED + + lock_id = OB_INVALID_OBJECT_ID; + + OZ (databuff_printf(where_cond, WHERE_CONDITION_BUFFER_SIZE, + "WHERE name = '%s'", to_cstring(lock_name))); + OZ (databuff_printf(table_name, MAX_FULL_TABLE_NAME_LENGTH, + "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_DBMS_LOCK_ALLOCATED_TNAME)); + OZ (ObTableAccessHelper::read_single_row(tenant_id, + { "lockhandle" }, + table_name, + where_cond, + query_lock_handle)); + if (OB_EMPTY_RESULT == ret) { + // there is no lock name. + } else if (OB_SUCC(ret)) { + ObString lock_handle_string = query_lock_handle.get_ob_string(); + OZ (extract_lock_id_(lock_handle_string, lock_id)); + OV (lock_handle_string.length() < MAX_LOCK_HANDLE_LEGNTH, OB_INVALID_ARGUMENT, lock_handle_string); + OX (MEMCPY(lock_handle_buf, lock_handle_string.ptr(), lock_handle_string.length())); + lock_handle_buf[lock_handle_string.length()] = '\0'; + } + return ret; +} + int ObLockFuncExecutor::extract_lock_id_(const ObString &lock_handle, uint64_t &lock_id) { @@ -521,10 +600,10 @@ void ObLockFuncExecutor::mark_lock_session_(sql::ObSQLSessionInfo *session, cons session->set_is_lock_session(is_lock_session); session->set_need_send_feedback_proxy_info(true); } else { - LOG_INFO("the lock_session status on the session won't be changed, no need to mark again", - K(session->get_sessid()), - K(session->is_lock_session()), - K(session->is_need_send_feedback_proxy_info())); + LOG_DEBUG("the lock_session status on the session won't be changed, no need to mark again", + K(session->get_sessid()), + K(session->is_lock_session()), + K(session->is_need_send_feedback_proxy_info())); } } @@ -534,38 +613,59 @@ int ObGetLockExecutor::execute(ObExecContext &ctx, { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - int64_t client_session_id = 0; - int64_t server_session_id = 0; - uint64_t client_session_create_ts = 0; + ObSQLSessionInfo *sess = ctx.get_my_session(); + uint32_t client_session_id = sess->get_client_sessid(); + uint32_t server_session_id = sess->get_sessid(); + uint64_t client_session_create_ts = sess->get_client_create_time(); uint64_t lock_id = 0; bool is_rollback = false; - ObSQLSessionInfo *sess = ctx.get_my_session(); + ObTxParam tx_param; + OZ (ObLockFuncContext::valid_execute_context(ctx)); + // 1. generate lock_id and update DBMS_LOCK_ALLOCATED table + // + // 2. check client_session_id is valid + // 3. check whether need reroute (skip temporarily) + // 4. modify inner table + // 4.1 add session into CLIENT_TO_SERVER_SESSION_INFO table + // 4.2 add lock_obj into DETECT_LOCK_INFO table + // 5. lock obj + if (OB_SUCC(ret)) { - SMART_VAR(ObLockFuncContext, stack_ctx) { - client_session_id = sess->get_client_sessid(); - client_session_create_ts = sess->get_client_create_time(); - server_session_id = sess->get_sessid(); - OZ (stack_ctx.init(*sess, ctx, timeout_us)); - if (OB_SUCC(ret)) { - ObTxParam tx_param; - // 1. check whether need reroute - // 2. modify inner table - // 2.1 alloc lock identity - // 2.2 add to connection table - // 3. lock obj - OZ (check_need_reroute_(stack_ctx, sess, client_session_id, client_session_create_ts)); - OZ (generate_lock_id_(stack_ctx, lock_name, lock_id)); - OZ (update_session_table_(stack_ctx, - client_session_id, - client_session_create_ts, - server_session_id)); - OZ (ObInnerConnectionLockUtil::build_tx_param(sess, tx_param)); - OZ (lock_obj_(sess, tx_param, client_session_id, lock_id, timeout_us)); - OX (mark_lock_session_(sess, true)); - } + SMART_VAR(ObLockFuncContext, stack_ctx1) { + OZ (stack_ctx1.init(*sess, ctx, timeout_us)); + OZ (generate_lock_id_(stack_ctx1, lock_name, timeout_us, lock_id)); + is_rollback = (OB_SUCCESS != ret); - if (OB_TMP_FAIL(stack_ctx.destroy(ctx, *(ctx.get_my_session()), is_rollback))) { + if (OB_TMP_FAIL(stack_ctx1.destroy(ctx, *(ctx.get_my_session()), is_rollback))) { + LOG_WARN("stack ctx destroy failed", K(tmp_ret)); + COVER_SUCC(tmp_ret); + } + } + } + if (OB_SUCC(ret)) { + SMART_VAR(ObLockFuncContext, stack_ctx2) { + OZ (stack_ctx2.init(*sess, ctx, timeout_us)); + // only when connect by proxy should check client_session + if (sess->is_obproxy_mode()) { + OZ (check_client_ssid_(stack_ctx2, client_session_id, client_session_create_ts)); + if (OB_EMPTY_RESULT == ret) { + ret = OB_SUCCESS; // there're no same client_session_id records, continue + } else { + // TODO(yangyifei.yyf): some SQL can not redo, reroute may cause errors, so skip this step temporarily + // OZ (check_need_reroute_(stack_ctx1, sess, client_session_id, client_session_create_ts)); + } + } + OZ (update_session_table_(stack_ctx2, + client_session_id, + client_session_create_ts, + server_session_id)); + OZ (ObInnerConnectionLockUtil::build_tx_param(sess, tx_param)); + OZ (lock_obj_(sess, tx_param, client_session_id, client_session_create_ts, lock_id, timeout_us)); + OX (mark_lock_session_(sess, true)); + + is_rollback = (OB_SUCCESS != ret); + if (OB_TMP_FAIL(stack_ctx2.destroy(ctx, *(ctx.get_my_session()), is_rollback))) { LOG_WARN("stack ctx destroy failed", K(tmp_ret)); COVER_SUCC(tmp_ret); } @@ -576,7 +676,8 @@ int ObGetLockExecutor::execute(ObExecContext &ctx, int ObGetLockExecutor::lock_obj_(sql::ObSQLSessionInfo *session, const ObTxParam &tx_param, - const int64_t client_session_id, + const uint32_t client_session_id, + const uint64_t client_session_create_ts, const int64_t obj_id, const int64_t timeout_us) { @@ -595,12 +696,12 @@ int ObGetLockExecutor::lock_obj_(sql::ObSQLSessionInfo *session, if (OB_FAIL(lock_id.set(ObLockOBJType::OBJ_TYPE_MYSQL_LOCK_FUNC, obj_id))) { LOG_WARN("set lock_id failed", K(ret), K(obj_id)); - } else if (OB_FAIL(arg.owner_id_.convert_from_value(ObLockOwnerType::SESS_ID_OWNER_TYPE, client_session_id))) { + } else if (OB_FAIL(arg.owner_id_.convert_from_client_sessid(client_session_id, client_session_create_ts))) { LOG_WARN("convert client_session_id to owner_id failed", K(ret), K(client_session_id)); } else if (OB_FAIL(arg.objs_.push_back(lock_id))) { LOG_WARN("push_back lock_id to arg.objs_ failed", K(ret), K(arg), K(lock_id)); } else if (OB_FAIL(ObTableLockDetector::record_detect_info_to_inner_table( - session, LOCK_OBJECT, arg, need_record_to_lock_table))) { + session, LOCK_OBJECT, arg, /*for_dbms_lock*/ false, need_record_to_lock_table))) { LOG_WARN("record_detect_info_to_inner_table failed", K(ret), K(arg)); } else if (need_record_to_lock_table) { if (OB_FAIL(lock_service->lock_obj(*tx_desc, tx_param, arg))) { @@ -613,20 +714,18 @@ int ObGetLockExecutor::lock_obj_(sql::ObSQLSessionInfo *session, int ObGetLockExecutor::generate_lock_id_(ObLockFuncContext &ctx, const ObString &lock_name, + const int64_t timeout_us, uint64_t &lock_id) { int ret = OB_SUCCESS; char lock_handle_buf[MAX_LOCK_HANDLE_LEGNTH] = {0}; - if (OB_SUCC(query_lock_id_(lock_name, lock_id))) { - } else if (OB_EMPTY_RESULT == ret) { + OZ (query_lock_id_and_lock_handle_(lock_name, lock_id, lock_handle_buf)); + if (OB_EMPTY_RESULT == ret) { // there is no result, should create one ret = OB_SUCCESS; OZ (generate_lock_id_(lock_name, lock_id, lock_handle_buf)); - OZ (write_lock_id_(ctx, lock_name, lock_id, lock_handle_buf)); - } else { - // get lock id failed. - LOG_WARN("query lock id failed", K(ret)); } + OZ (write_lock_id_(ctx, lock_name, timeout_us, lock_id, lock_handle_buf)); return ret; } @@ -653,11 +752,13 @@ int ObGetLockExecutor::generate_lock_id_(const ObString &lock_name, int ObGetLockExecutor::write_lock_id_(ObLockFuncContext &ctx, const ObString &lock_name, + const int64_t timeout_us, const uint64_t &lock_id, const char *lock_handle_buf) { int ret = OB_SUCCESS; ObDMLSqlSplicer insert_dml; + ObSqlString delete_sql; ObSqlString insert_sql; int64_t affected_rows = 0; const int64_t now = ObTimeUtility::current_time(); @@ -669,22 +770,33 @@ int ObGetLockExecutor::write_lock_id_(ObLockFuncContext &ctx, OZ (insert_dml.add_gmt_create(now)); OZ (insert_dml.add_gmt_modified(now)); OZ (insert_dml.add_pk_column("name", lock_name)); - // this is not used at mysql, mysql never expired. - OZ (insert_dml.add_time_column("expiration", now)); + // make sure lock_obj will be timeout or success before lock_id is expired + OZ (insert_dml.add_time_column("expiration", now + timeout_us + DEFAULT_EXPIRATION_US)); OZ (insert_dml.add_column("lockid", lock_id)); OZ (insert_dml.add_column("lockhandle", lock_handle_buf)); OZ (insert_dml.splice_insert_update_sql(table_name, insert_sql)); OZ (ctx.execute_write(insert_sql, affected_rows)); - CK (OB_LIKELY(1 == affected_rows)); + CK (OB_LIKELY(1 == affected_rows || 2 == affected_rows)); + + // clean lock_id which is expired and not locked + OZ(delete_sql.assign_fmt("DELETE FROM %s WHERE expiration <= usec_to_time(%" PRId64 + ") AND lockid NOT IN (SELECT obj_id FROM %s.%s where obj_type = %d)", + table_name, + now, + OB_SYS_DATABASE_NAME, + OB_ALL_DETECT_LOCK_INFO_TNAME, + static_cast(ObLockOBJType::OBJ_TYPE_MYSQL_LOCK_FUNC))); + affected_rows = 0; + OZ (ctx.execute_write(delete_sql, affected_rows)); return ret; } int ObGetLockExecutor::update_session_table_(ObLockFuncContext &ctx, - const int64_t client_session_id, + const uint32_t client_session_id, const uint64_t client_session_create_ts, - const int64_t server_session_id) + const uint32_t server_session_id) { int ret = OB_SUCCESS; ObDMLSqlSplicer insert_dml; @@ -716,13 +828,13 @@ int ObGetLockExecutor::update_session_table_(ObLockFuncContext &ctx, int ObGetLockExecutor::check_need_reroute_(ObLockFuncContext &ctx, sql::ObSQLSessionInfo *session, - const int64_t client_session_id, + const uint32_t client_session_id, const uint64_t client_session_create_ts) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; ObAddr lock_session_addr; - int64_t lock_session_id = 0; + uint32_t lock_session_id = 0; int32_t sql_port = 0; ObSqlCtx *sql_ctx = ctx.my_exec_ctx_->get_sql_ctx(); @@ -758,10 +870,10 @@ int ObGetLockExecutor::check_need_reroute_(ObLockFuncContext &ctx, } int ObGetLockExecutor::get_lock_session_(ObLockFuncContext &ctx, - const int64_t client_session_id, + const uint32_t client_session_id, const uint64_t client_session_create_ts, ObAddr &lock_session_addr, - int64_t &lock_session_id) + uint32_t &lock_session_id) { int ret = OB_SUCCESS; char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {'\0'}; @@ -773,9 +885,10 @@ int ObGetLockExecutor::get_lock_session_(ObLockFuncContext &ctx, ObSqlString sql; common::sqlclient::ObMySQLResult *result = nullptr; OZ (sql.assign_fmt("SELECT svr_ip, svr_port, server_session_id" - " FROM %s WHERE client_session_id = %ld", - table_name, - client_session_id)); + " FROM %s WHERE client_session_id = %" PRIu32 " AND client_session_create_ts = %" PRIu64, + table_name, + client_session_id, + client_session_create_ts)); OZ (ctx.execute_read(sql, res)); OV (OB_NOT_NULL(result = res.get_result()), OB_ERR_UNEXPECTED, client_session_id); OZ (get_first_session_info_(*result, lock_session_addr, lock_session_id)); @@ -786,11 +899,12 @@ int ObGetLockExecutor::get_lock_session_(ObLockFuncContext &ctx, int ObGetLockExecutor::get_first_session_info_(common::sqlclient::ObMySQLResult &res, ObAddr &session_addr, - int64_t &server_session_id) + uint32_t &server_session_id) { int ret = OB_SUCCESS; ObString svr_ip; int64_t svr_port; + uint64_t tmp_session_id = 0; OZ (res.next()); if (OB_ITER_END == ret) { @@ -798,7 +912,8 @@ int ObGetLockExecutor::get_first_session_info_(common::sqlclient::ObMySQLResult } OX (GET_COL_IGNORE_NULL(res.get_varchar, "svr_ip", svr_ip)); OX (GET_COL_IGNORE_NULL(res.get_int, "svr_port", svr_port)); - OX (GET_COL_IGNORE_NULL(res.get_int, "server_session_id", server_session_id)); + OX (GET_COL_IGNORE_NULL(res.get_uint, "server_session_id", tmp_session_id)); + OX (server_session_id = static_cast(tmp_session_id)); OX (session_addr.reset()); OV (session_addr.set_ip_addr(to_cstring(svr_ip), svr_port), OB_ERR_UNEXPECTED, svr_ip, svr_port); @@ -816,7 +931,7 @@ int ObGetLockExecutor::get_sql_port_(ObLockFuncContext &ctx, int64_t tmp_sql_port = 0; OZ (databuff_printf( - table_name, MAX_FULL_TABLE_NAME_LENGTH, "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_VIRTUAL_SERVER_TNAME)); + table_name, MAX_FULL_TABLE_NAME_LENGTH, "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_VIRTUAL_LS_META_TABLE_TNAME)); OV (svr_addr.ip_to_string(svr_ip, MAX_IP_ADDR_LENGTH), OB_ERR_UNEXPECTED, svr_addr); OX ( SMART_VAR(ObMySQLProxy::MySQLResult, res) @@ -824,7 +939,7 @@ int ObGetLockExecutor::get_sql_port_(ObLockFuncContext &ctx, ObSqlString sql; common::sqlclient::ObMySQLResult *result = nullptr; OZ (sql.assign_fmt("SELECT sql_port FROM %s" - " WHERE svr_ip = '%s' AND svr_port = %" PRId32, + " WHERE svr_ip = '%s' AND svr_port = %" PRId32 " LIMIT 1", table_name, svr_ip, svr_port)); OZ (ctx.execute_read(sql, res)); OV (OB_NOT_NULL(result = res.get_result()), OB_ERR_UNEXPECTED, svr_addr, svr_ip, svr_port); @@ -841,8 +956,8 @@ int ObReleaseLockExecutor::execute(ObExecContext &ctx, { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - int64_t client_session_id = 0; - int64_t server_session_id = 0; + uint32_t client_session_id = 0; + uint64_t client_session_create_ts = 0; uint64_t lock_id = 0; bool is_rollback = false; bool need_remove_from_lock_table = true; @@ -853,6 +968,7 @@ int ObReleaseLockExecutor::execute(ObExecContext &ctx, if (OB_SUCC(ret)) { SMART_VAR(ObLockFuncContext, stack_ctx) { client_session_id = ctx.get_my_session()->get_client_sessid(); + client_session_create_ts = ctx.get_my_session()->get_client_create_time(); OZ (stack_ctx.init(*(ctx.get_my_session()), ctx)); if (OB_SUCC(ret)) { ObSQLSessionInfo *session = GET_MY_SESSION(ctx); @@ -860,17 +976,20 @@ int ObReleaseLockExecutor::execute(ObExecContext &ctx, ObTxParam tx_param; ObUnLockObjsRequest arg; ObTableLockOwnerID lock_owner; - // 1. get lock id from inner table - // 2. unlock obj - // 3. modify inner table - // 3.1 lock table: dec cnt if the cnt is greater than 1, else remove the record. + // 1. check client_session_id is valid + // 2. get lock id from inner table + // 3. unlock obj + // 4. modify inner table + // 4.1 lock table: dec cnt if the cnt is greater than 1, else remove the record. // (this is processed internal) - // 3.2 lock name-id table: check the lock table if there is no lock of the same + // 4.2 lock name-id table: check the lock table if there is no lock of the same // lock id, remove the record at lock name-id table. - // 3.3 session table: check the lock table if there is no lock of the same + // 4.3 session table: check the lock table if there is no lock of the same // client session, remove the record of the same client session id. - OZ (lock_owner.convert_from_value(ObLockOwnerType::SESS_ID_OWNER_TYPE, - client_session_id)); + if (ctx.get_my_session()->is_obproxy_mode()) { + OZ (check_client_ssid_(stack_ctx, client_session_id, client_session_create_ts)); + } + OZ (lock_owner.convert_from_client_sessid(client_session_id, client_session_create_ts)); OZ (query_lock_id_(lock_name, lock_id)); OZ (ObInnerConnectionLockUtil::build_tx_param(session, tx_param)); @@ -886,7 +1005,6 @@ int ObReleaseLockExecutor::execute(ObExecContext &ctx, session, LOCK_OBJECT, arg, need_remove_from_lock_table)); if (OB_SUCC(ret) && need_remove_from_lock_table) { OZ (unlock_obj_(tx_desc, tx_param, arg)); - OZ (remove_lock_id_(stack_ctx, lock_owner.raw_value(), lock_id)); OZ (remove_session_record_(stack_ctx, client_session_id)); } else if (OB_EMPTY_RESULT == ret) { if (OB_TMP_FAIL(check_lock_exist_(stack_ctx, lock_id, lock_id_existed))) { @@ -911,19 +1029,23 @@ int ObReleaseAllLockExecutor::execute(ObExecContext &ctx, { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - int64_t client_session_id = 0; + uint32_t client_session_id = 0; + uint64_t client_session_create_ts = 0; bool is_rollback = false; OZ (ObLockFuncContext::valid_execute_context(ctx)); OX (client_session_id = ctx.get_my_session()->get_client_sessid()); - OZ (execute_(ctx, client_session_id, release_cnt)); + OX (client_session_create_ts = ctx.get_my_session()->get_client_create_time()); + OZ (execute_(ctx, client_session_id, client_session_create_ts, release_cnt)); return ret; } -int ObReleaseAllLockExecutor::execute(const int64_t client_session_id) +int ObReleaseAllLockExecutor::execute(const int64_t raw_owner_id) { int ret = OB_SUCCESS; int64_t release_cnt = 0; ObArenaAllocator allocator(ObModIds::OB_SQL_EXPR); + ObTableLockOwnerID owner_id; + OX (owner_id.convert_from_value(raw_owner_id)); SMART_VAR(sql::ObSQLSessionInfo, session) { SMART_VAR(sql::ObExecContext, exec_ctx, allocator) { ObSqlCtx sql_ctx; @@ -946,7 +1068,7 @@ int ObReleaseAllLockExecutor::execute(const int64_t client_session_id) OX (exec_ctx.set_physical_plan_ctx(&phy_plan_ctx)); OZ (ObLockFuncContext::valid_execute_context(exec_ctx)); - OZ (execute_(exec_ctx, client_session_id, release_cnt)); + OZ (execute_(exec_ctx, owner_id, release_cnt)); OX (exec_ctx.set_physical_plan_ctx(nullptr)); // avoid core during release exec_ctx } } @@ -955,12 +1077,54 @@ int ObReleaseAllLockExecutor::execute(const int64_t client_session_id) } int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx, - const int64_t client_session_id, + const uint32_t client_session_id, + const uint64_t client_session_create_ts, int64_t &release_cnt) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; bool is_rollback = false; + ObTableLockOwnerID owner_id; + OZ (ObLockFuncContext::valid_execute_context(ctx)); + if (OB_SUCC(ret)) { + SMART_VAR(ObLockFuncContext, stack_ctx) { + OZ (stack_ctx.init(*(ctx.get_my_session()), ctx)); + if (OB_SUCC(ret)) { + ObSQLSessionInfo *session = GET_MY_SESSION(ctx); + ObTxDesc *tx_desc = session->get_tx_desc(); + ObTxParam tx_param; + if (ctx.get_my_session()->is_obproxy_mode()) { + OZ (check_client_ssid_(stack_ctx, client_session_id, client_session_create_ts)); + } + OZ (ObInnerConnectionLockUtil::build_tx_param(session, tx_param)); + OZ (owner_id.convert_from_client_sessid(client_session_id, client_session_create_ts)); + OZ (release_all_locks_(stack_ctx, + session, + tx_param, + owner_id, + release_cnt)); + OZ (remove_session_record_(stack_ctx, client_session_id)); + } + is_rollback = (OB_SUCCESS != ret); + if (OB_TMP_FAIL(stack_ctx.destroy(ctx, *(ctx.get_my_session()), is_rollback))) { + LOG_WARN("stack ctx destroy failed", K(tmp_ret)); + COVER_SUCC(tmp_ret); + } + } + } + return ret; +} + +int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx, + const ObTableLockOwnerID &owner_id, + int64_t &release_cnt) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + bool is_rollback = false; + uint32_t client_session_id = 0; + + OZ (owner_id.convert_to_sessid(client_session_id)); OZ (ObLockFuncContext::valid_execute_context(ctx)); if (OB_SUCC(ret)) { SMART_VAR(ObLockFuncContext, stack_ctx) { @@ -970,11 +1134,7 @@ int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx, ObTxDesc *tx_desc = session->get_tx_desc(); ObTxParam tx_param; OZ (ObInnerConnectionLockUtil::build_tx_param(session, tx_param)); - OZ (release_all_locks_(stack_ctx, - session, - tx_param, - client_session_id, - release_cnt)); + OZ (release_all_locks_(stack_ctx, session, tx_param, owner_id, release_cnt)); OZ (remove_session_record_(stack_ctx, client_session_id)); } is_rollback = (OB_SUCCESS != ret); @@ -990,7 +1150,7 @@ int ObReleaseAllLockExecutor::execute_(ObExecContext &ctx, int ObReleaseAllLockExecutor::release_all_locks_(ObLockFuncContext &ctx, ObSQLSessionInfo *session, const ObTxParam &tx_param, - const int64_t client_session_id, + const ObTableLockOwnerID &owner_id, int64_t &release_cnt) { int ret = OB_SUCCESS; @@ -1005,11 +1165,10 @@ int ObReleaseAllLockExecutor::release_all_locks_(ObLockFuncContext &ctx, { ObSqlString sql; common::sqlclient::ObMySQLResult *result = NULL; - if (OB_FAIL(lock_owner.convert_from_value(ObLockOwnerType::SESS_ID_OWNER_TYPE, client_session_id))) { - } else if (OB_FAIL(sql.assign_fmt("SELECT obj_type, obj_id, lock_mode, owner_id, cnt" + if (OB_FAIL(sql.assign_fmt("SELECT obj_type, obj_id, lock_mode, owner_id, cnt" " FROM %s WHERE owner_id = %ld", table_name, - lock_owner.raw_value()))) { + owner_id.raw_value()))) { LOG_WARN("fail to assign fmt", KR(ret)); } else if (OB_FAIL(ctx.execute_read(sql, res))) { LOG_WARN("execute sql failed", KR(ret), K(sql)); @@ -1070,9 +1229,6 @@ int ObReleaseAllLockExecutor::release_all_locks_(ObLockFuncContext &ctx, } else if (arg.objs_.count() != 1) { ret = OB_NOT_SUPPORTED; LOG_WARN("do not support batch unlock right now", K(arg)); - } else if (ObLockOBJType::OBJ_TYPE_MYSQL_LOCK_FUNC == arg.objs_[0].obj_type_ - && OB_FAIL(remove_lock_id_(ctx, arg.owner_id_.raw_value(), arg.objs_[0].obj_id_))) { - LOG_WARN("remove lock id failed", K(ret), K(arg)); } } @@ -1114,7 +1270,6 @@ int ObReleaseAllLockExecutor::parse_lock_request_(common::sqlclient::ObMySQLResu arg.op_type_ = ObTableLockOpType::OUT_TRANS_LOCK; arg.timeout_us_ = 0; arg.is_from_sql_ = true; - arg.owner_id_.convert_from_value(owner_id); } return ret; @@ -1146,13 +1301,14 @@ int ObISFreeLockExecutor::execute(ObExecContext &ctx, UNUSED(ctx); int ret = OB_SUCCESS; uint64_t lock_id = 0; - OZ (query_lock_id_(lock_name, lock_id)); + OZ(query_lock_id_(lock_name, lock_id)); + OZ(check_lock_exist_(lock_id)); return ret; } int ObISUsedLockExecutor::execute(ObExecContext &ctx, const ObString &lock_name, - int64_t &sess_id) + uint32_t &sess_id) { UNUSED(ctx); int ret = OB_SUCCESS; diff --git a/src/storage/tablelock/ob_lock_func_executor.h b/src/storage/tablelock/ob_lock_func_executor.h index ac61bdf7d..258ca89ad 100644 --- a/src/storage/tablelock/ob_lock_func_executor.h +++ b/src/storage/tablelock/ob_lock_func_executor.h @@ -124,8 +124,10 @@ public: static constexpr int64_t LOCK_ID_LENGTH = 10; static constexpr int64_t MIN_LOCK_HANDLE_ID = 0x40000000; static constexpr int64_t MAX_LOCK_HANDLE_ID = 1999999999; + static constexpr int64_t DEFAULT_EXPIRATION_US = 60 * 1000 * 1000L; // 1min public: + int check_lock_exist_(const uint64_t &lock_id); int check_lock_exist_(ObLockFuncContext &ctx, const int64_t raw_owner_id, const uint64_t &lock_id, @@ -139,18 +141,22 @@ public: int check_lock_exist_(ObLockFuncContext &ctx, ObSqlString &where_cond, bool &exist); - int remove_lock_id_(ObLockFuncContext &ctx, - const int64_t raw_owner_id, - const uint64_t &lock_id); + int check_client_ssid_(ObLockFuncContext &ctx, + const uint32_t client_session_id, + const uint64_t client_session_create_ts); + int remove_lock_id_(ObLockFuncContext &ctx, const int64_t raw_owner_id, const uint64_t &lock_id); int remove_lock_id_(ObLockFuncContext &ctx, const int64_t lock_id); int remove_session_record_(ObLockFuncContext &ctx, - const int64_t client_session_id); + const uint32_t client_session_id); int unlock_obj_(transaction::ObTxDesc *tx_desc, const transaction::ObTxParam &tx_param, const ObUnLockObjsRequest &arg); int query_lock_id_(const ObString &lock_name, uint64_t &lock_id); + int query_lock_id_and_lock_handle_(const ObString &lock_name, + uint64_t &lock_id, + char *lock_handle_buf); int query_lock_owner_(const uint64_t &lock_id, int64_t &owner_id); int extract_lock_id_(const ObString &lock_handle, @@ -169,25 +175,27 @@ public: private: int generate_lock_id_(ObLockFuncContext &ctx, const ObString &lock_name, + const int64_t timeout_us, uint64_t &lock_id); int update_session_table_(ObLockFuncContext &ctx, - const int64_t client_session_id, + const uint32_t client_session_id, const uint64_t client_session_create_ts, - const int64_t server_session_id); + const uint32_t server_session_id); int check_need_reroute_(ObLockFuncContext &ctx, sql::ObSQLSessionInfo *session, - const int64_t client_session_id, + const uint32_t client_session_id, const uint64_t client_session_create_ts); int get_lock_session_(ObLockFuncContext &ctx, - const int64_t client_session_id, + const uint32_t client_session_id, const uint64_t client_session_create_ts, ObAddr &lock_session_addr, - int64_t &lock_session_id); - int get_first_session_info_(common::sqlclient::ObMySQLResult &res, ObAddr &session_addr, int64_t &server_session_id); + uint32_t &lock_session_id); + int get_first_session_info_(common::sqlclient::ObMySQLResult &res, ObAddr &session_addr, uint32_t &server_session_id); int get_sql_port_(ObLockFuncContext &ctx, const ObAddr &svr_addr, int32_t &sql_port); int lock_obj_(sql::ObSQLSessionInfo *session, const transaction::ObTxParam &tx_param, - const int64_t client_session_id, + const uint32_t client_session_id, + const uint64_t client_session_create_ts, const int64_t obj_id, const int64_t timeout_us); int generate_lock_id_(const ObString &lock_name, @@ -195,6 +203,7 @@ private: char *lock_handle); int write_lock_id_(ObLockFuncContext &ctx, const ObString &lock_name, + const int64_t timeout_us, const uint64_t &lock_id, const char *lock_handle_buf); }; @@ -212,15 +221,19 @@ public: int execute(sql::ObExecContext &ctx, int64_t &release_cnt); // used internal, release all the lock that required by the session. - int execute(const int64_t client_session_id); + int execute(const int64_t raw_owner_id); private: int execute_(sql::ObExecContext &ctx, - const int64_t client_session_id, + const uint32_t client_session_id, + const uint64_t client_session_create_ts, + int64_t &release_cnt); + int execute_(sql::ObExecContext &ctx, + const ObTableLockOwnerID &owner_id, int64_t &release_cnt); int release_all_locks_(ObLockFuncContext &ctx, sql::ObSQLSessionInfo *session, const transaction::ObTxParam &tx_param, - const int64_t client_session_id, + const ObTableLockOwnerID &owner_id, int64_t &release_cnt); int release_all_locks_(ObLockFuncContext &ctx, const ObIArray &arg_list, @@ -248,7 +261,7 @@ class ObISUsedLockExecutor : public ObLockFuncExecutor public: int execute(sql::ObExecContext &ctx, const ObString &lock_name, - int64_t &sess_id); + uint32_t &sess_id); }; } // tablelock diff --git a/src/storage/tablelock/ob_table_lock_common.cpp b/src/storage/tablelock/ob_table_lock_common.cpp index b2a11fafe..9ea5e60c3 100644 --- a/src/storage/tablelock/ob_table_lock_common.cpp +++ b/src/storage/tablelock/ob_table_lock_common.cpp @@ -190,14 +190,24 @@ int ObTableLockOwnerID::convert_from_value(const ObLockOwnerType owner_type, return ret; } -int ObTableLockOwnerID::convert_to_sessid(int64_t &sessid) const +int ObTableLockOwnerID::convert_from_client_sessid(const uint32_t client_sessid, const uint64_t client_sess_create_ts) +{ + int ret = OB_SUCCESS; + pack_ = 0; + type_ = static_cast(ObLockOwnerType::SESS_ID_OWNER_TYPE); + int64_t client_unique_id = client_sess_create_ts & CLIENT_SESS_CREATE_TS_MASK; + id_ = (static_cast(client_sessid)) | (client_unique_id << CLIENT_SESS_ID_BIT); + return ret; +} + +int ObTableLockOwnerID::convert_to_sessid(uint32_t &sessid) const { int ret = OB_SUCCESS; if (type_ != static_cast(ObLockOwnerType::SESS_ID_OWNER_TYPE)) { ret = OB_NOT_SUPPORTED; LOG_WARN("this lock owner id cannot be converted to session id", K(ret), K_(type)); } else { - sessid = id_; + sessid = static_cast(id_ & CLIENT_SESS_ID_MASK); } return ret; } diff --git a/src/storage/tablelock/ob_table_lock_common.h b/src/storage/tablelock/ob_table_lock_common.h index dbcfb853a..930e99277 100644 --- a/src/storage/tablelock/ob_table_lock_common.h +++ b/src/storage/tablelock/ob_table_lock_common.h @@ -436,6 +436,10 @@ class ObTableLockOwnerID public: static const int64_t INVALID_ID = -1; static const int64_t INVALID_RAW_OWNER_ID = ((1L << 54) - 1); + static const int64_t CLIENT_SESS_CREATE_TS_BIT = 22; + static const int64_t CLIENT_SESS_CREATE_TS_MASK = (1L << CLIENT_SESS_CREATE_TS_BIT) - 1; + static const int64_t CLIENT_SESS_ID_BIT = 32; + static const int64_t CLIENT_SESS_ID_MASK = (1L << CLIENT_SESS_ID_BIT) - 1; static const int64_t MAX_VALID_RAW_OWNER_ID = INVALID_RAW_OWNER_ID - 1; ObTableLockOwnerID() : pack_(INVALID_ID) {} @@ -462,7 +466,8 @@ public: int convert_from_value(const int64_t packed_id); // check valid. int convert_from_value(const ObLockOwnerType owner_type, const int64_t raw_owner_id); - int convert_to_sessid(int64_t &sessid) const; + int convert_from_client_sessid(const uint32_t client_sessid, const uint64_t client_sess_create_ts); + int convert_to_sessid(uint32_t &sessid) const; // assignment ObTableLockOwnerID &operator=(const ObTableLockOwnerID &other) { pack_ = other.pack_; return *this; } diff --git a/src/storage/tablelock/ob_table_lock_live_detector.cpp b/src/storage/tablelock/ob_table_lock_live_detector.cpp index 298e9079c..1e731fa94 100644 --- a/src/storage/tablelock/ob_table_lock_live_detector.cpp +++ b/src/storage/tablelock/ob_table_lock_live_detector.cpp @@ -84,7 +84,7 @@ int ObTableLockDetectFuncList::do_session_alive_detect() ObArray> owner_ids; bool client_session_alive = true; ObTableLockOwnerID owner_id; - int64_t client_session_id = sql::ObSQLSessionInfo::INVALID_SESSID; + uint32_t client_session_id = sql::ObSQLSessionInfo::INVALID_SESSID; char full_table_name[OB_MAX_TABLE_NAME_BUF_LENGTH]; lib::CompatModeGuard compat_guard(lib::Worker::CompatMode::MYSQL); if (OB_FAIL(databuff_printf( @@ -112,8 +112,7 @@ int ObTableLockDetectFuncList::do_session_alive_detect() } else if (!client_session_alive) { LOG_INFO( "find client session is not alive, we will clean all recodrs of it later", K(ret), K(client_session_id)); - ObReleaseAllLockExecutor executor; - executor.execute(client_session_id); + ObTableLockDetector::remove_lock_by_owner_id(owner_ids[i].element<0>()); } } } @@ -251,18 +250,19 @@ int ObTableLockDetectFuncList::check_server_is_online_(const ObString &svr_ip, c is_online = false; char full_table_name[OB_MAX_TABLE_NAME_BUF_LENGTH]; - if (OB_FAIL(databuff_printf( - where_cond, - 512, - "WHERE (svr_ip, svr_port) IN (SELECT u.svr_ip, u.svr_port FROM %s.%s AS u JOIN %s.%s AS r on " - "r.resource_pool_id = u.resource_pool_id WHERE tenant_id = %ld) and svr_ip = '%s' and svr_port = %ld", - OB_SYS_DATABASE_NAME, - OB_ALL_UNIT_TNAME, - OB_SYS_DATABASE_NAME, - OB_ALL_RESOURCE_POOL_TNAME, - MTL_ID(), - to_cstring(svr_ip), - svr_port))) { + if (OB_FAIL( + databuff_printf(where_cond, + 512, + "WHERE (svr_ip, svr_port) IN (SELECT u.svr_ip, u.svr_port FROM %s.%s AS u JOIN %s.%s AS r on " + "r.resource_pool_id = u.resource_pool_id WHERE tenant_id = %ld) and svr_ip = '%s' and svr_port " + "= %ld and status = 'active'", + OB_SYS_DATABASE_NAME, + OB_ALL_UNIT_TNAME, + OB_SYS_DATABASE_NAME, + OB_ALL_RESOURCE_POOL_TNAME, + MTL_ID(), + to_cstring(svr_ip), + svr_port))) { LOG_WARN("generate where condition for select sql failed", K(svr_ip), K(svr_port)); } else if (OB_FAIL(databuff_printf( full_table_name, OB_MAX_TABLE_NAME_BUF_LENGTH, "%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_SERVER_TNAME))) { @@ -289,15 +289,14 @@ const char *ObTableLockDetector::detect_columns[8] = { int ObTableLockDetector::record_detect_info_to_inner_table(sql::ObSQLSessionInfo *session_info, const ObTableLockTaskType &task_type, const ObLockRequest &lock_req, + const bool for_dbms_lock, bool &need_record_to_lock_table) { int ret = OB_SUCCESS; observer::ObInnerSQLConnection *inner_conn = nullptr; char full_table_name[OB_MAX_TABLE_NAME_BUF_LENGTH]; - share::ObDMLSqlSplicer dml; - ObSqlString insert_sql; - int64_t affected_rows = 0; bool need_release_conn = false; + bool is_existed = false; need_record_to_lock_table = true; lib::CompatModeGuard guard(lib::Worker::CompatMode::MYSQL); @@ -321,21 +320,16 @@ int ObTableLockDetector::record_detect_info_to_inner_table(sql::ObSQLSessionInfo OB_SYS_DATABASE_NAME, OB_ALL_DETECT_LOCK_INFO_TNAME))) { LOG_WARN("generate full table_name failed", K(OB_SYS_DATABASE_NAME), K(OB_ALL_DETECT_LOCK_INFO_TNAME)); - } else if (OB_FAIL(generate_insert_dml_(task_type, lock_req, dml))) { - LOG_WARN("generate insert dml failed", K(ret)); - } else if (OB_FAIL(dml.splice_insert_sql(full_table_name, insert_sql))) { - LOG_WARN("generate insert sql failed", K(ret), K(full_table_name)); - } else if (OB_FAIL(insert_sql.append(" ON DUPLICATE KEY UPDATE cnt = cnt + 1"))) { - LOG_WARN("append 'cnt = cnt + 1' to the insert_sql failed", K(ret), K(insert_sql)); - } else if (OB_FAIL(ObInnerConnectionLockUtil::execute_write_sql(inner_conn, insert_sql, affected_rows))) { - LOG_WARN("execute insert sql failed", K(ret), K(insert_sql)); - } else if (affected_rows == 2) { + } else if (for_dbms_lock && OB_FAIL(check_dbms_lock_record_exist_(inner_conn, full_table_name, task_type, lock_req, is_existed))) { + LOG_WARN("check dbms_lock record exist failed", K(ret), K(task_type), K(lock_req)); + } + + if (OB_FAIL(ret)) { + } else if (for_dbms_lock && is_existed) { need_record_to_lock_table = false; - LOG_INFO("there's the same lock in __all_detect_lock_info table, no need to record it to the lock table", - K(lock_req)); - } else if (affected_rows != 1) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("only can affetct 1 row due to insert, or 2 rows due to insert on duplicate key", K(affected_rows)); + } else if (OB_FAIL(record_detect_info_to_inner_table_( + inner_conn, full_table_name, task_type, lock_req, need_record_to_lock_table))) { + LOG_WARN("record_detect_info_to_inner_table_ failed", K(ret), K(task_type), K(lock_req)); } if (need_release_conn && OB_NOT_NULL(inner_conn)) { @@ -474,6 +468,102 @@ int ObTableLockDetector::do_detect_and_clear() return ret; } +int ObTableLockDetector::remove_lock_by_owner_id(const int64_t raw_owner_id) +{ + int ret = OB_SUCCESS; + ObReleaseAllLockExecutor executor; + if (OB_FAIL(executor.execute(raw_owner_id))) { + LOG_WARN("remove lock by owner_id failed", K(raw_owner_id)); + } + return ret; +} + +int ObTableLockDetector::record_detect_info_to_inner_table_(observer::ObInnerSQLConnection *inner_conn, + const char *table_name, + const ObTableLockTaskType &task_type, + const ObLockRequest &lock_req, + bool &need_record_to_lock_table) +{ + int ret = OB_SUCCESS; + share::ObDMLSqlSplicer dml; + ObSqlString insert_sql; + int64_t affected_rows = 0; + + if (OB_FAIL(generate_insert_dml_(task_type, lock_req, dml))) { + LOG_WARN("generate insert dml failed", K(ret)); + } else if (OB_FAIL(dml.splice_insert_sql(table_name, insert_sql))) { + LOG_WARN("generate insert sql failed", K(ret), K(table_name)); + } else if (OB_FAIL(insert_sql.append(" ON DUPLICATE KEY UPDATE cnt = cnt + 1"))) { + LOG_WARN("append 'cnt = cnt + 1' to the insert_sql failed", K(ret), K(insert_sql)); + } else if (OB_FAIL(ObInnerConnectionLockUtil::execute_write_sql(inner_conn, insert_sql, affected_rows))) { + LOG_WARN("execute insert sql failed", K(ret), K(insert_sql)); + } else if (affected_rows == 2) { + need_record_to_lock_table = false; + LOG_INFO("there's the same lock in __all_detect_lock_info table, no need to record it to the lock table", + K(lock_req)); + } else if (affected_rows != 1) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("only can affetct 1 row due to insert, or 2 rows due to insert on duplicate key", K(affected_rows)); + } + return ret; +} + +int ObTableLockDetector::check_dbms_lock_record_exist_(observer::ObInnerSQLConnection *inner_conn, + const char *table_name, + const ObTableLockTaskType &task_type, + const ObLockRequest &lock_req, + bool &is_existed) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + uint64_t obj_type = static_cast(ObLockOBJType::OBJ_TYPE_INVALID); + uint64_t obj_id = OB_INVALID_ID; + + if (LOCK_OBJECT != task_type) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("the task_type for DBMS_LOCK should be LOCK_OBJECT", K(ret), K(task_type), K(lock_req)); + } else { + const ObLockObjsRequest &arg = static_cast(lock_req); + if (arg.objs_.count() > 1) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("do not support detect batch lock obj request right now", K(arg)); + } else { + obj_type = static_cast(arg.objs_[0].obj_type_); + obj_id = arg.objs_[0].obj_id_; + } + } + + if (OB_FAIL(ret)) { + } else if (OB_FAIL(sql.append_fmt("SELECT 1 FROM %s WHERE task_type = %d AND obj_type = %" PRIu64 " AND obj_id = %" PRIu64 + " AND owner_id = %" PRId64, + table_name, + static_cast(task_type), + obj_type, + obj_id, + lock_req.owner_id_.raw_value()))) { + LOG_WARN("make select sql failed", K(ret), K(task_type), K(lock_req)); + } else { + SMART_VAR(ObMySQLProxy::MySQLResult, res) + { + common::sqlclient::ObMySQLResult *result = nullptr; + if (OB_FAIL(ObInnerConnectionLockUtil::execute_read_sql(inner_conn, sql, res))) { + LOG_WARN("execute_read_sql for check_dbms_lock_record_exist_ failed", KR(ret), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get result", KR(ret), K(sql)); + } else if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + is_existed = false; + } + } else { + is_existed = true; + } + } + } + return ret; +} + int ObTableLockDetector::generate_insert_dml_(const ObTableLockTaskType &task_type, const ObLockRequest &lock_req, share::ObDMLSqlSplicer &dml) @@ -502,7 +592,7 @@ int ObTableLockDetector::add_pk_column_to_dml_(const ObTableLockTaskType &task_t share::ObDMLSqlSplicer &dml) { int ret = OB_SUCCESS; - uint64_t obj_type = static_cast(ObTableLockTaskType::INVALID_LOCK_TASK_TYPE); + uint64_t obj_type = static_cast(ObLockOBJType::OBJ_TYPE_INVALID); uint64_t obj_id = OB_INVALID_ID; switch (task_type) { diff --git a/src/storage/tablelock/ob_table_lock_live_detector.h b/src/storage/tablelock/ob_table_lock_live_detector.h index 90bc78efd..230925643 100644 --- a/src/storage/tablelock/ob_table_lock_live_detector.h +++ b/src/storage/tablelock/ob_table_lock_live_detector.h @@ -98,6 +98,7 @@ public: static int record_detect_info_to_inner_table(sql::ObSQLSessionInfo *session_info, const ObTableLockTaskType &task_type, const ObLockRequest &lock_req, + const bool for_dbms_lock, bool &need_record_to_lock_table); static int remove_detect_info_from_inner_table(sql::ObSQLSessionInfo *session_info, const ObTableLockTaskType &task_type, @@ -108,8 +109,19 @@ public: const ObLockRequest &lock_req, int64_t &cnt); static int do_detect_and_clear(); + static int remove_lock_by_owner_id(const int64_t raw_owner_id); private: + static int record_detect_info_to_inner_table_(observer::ObInnerSQLConnection *inner_conn, + const char *table_name, + const ObTableLockTaskType &task_type, + const ObLockRequest &lock_req, + bool &need_record_to_lock_table); + static int check_dbms_lock_record_exist_(observer::ObInnerSQLConnection *inner_conn, + const char *table_name, + const ObTableLockTaskType &task_type, + const ObLockRequest &lock_req, + bool &is_existed); static int generate_insert_dml_(const ObTableLockTaskType &task_type, const ObLockRequest &lock_req, share::ObDMLSqlSplicer &dml);