[TABLELOCK] move the process of recycling the DBMS_LOCK_ALLOCATED table to the background GC thread
This commit is contained in:
@ -292,6 +292,29 @@ public:
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
static int delete_row(const uint64_t tenant_id,
|
||||
const ObString &table,
|
||||
const ObString &value)
|
||||
{
|
||||
TIMEGUARD_INIT(OCCAM, 1_s, 60_s);
|
||||
#define PRINT_WRAPPER KR(ret), K(MTL_ID()), K(table), K(value), K(sql)
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
int64_t affected_rows = 0;
|
||||
if (CLICK_FAIL(sql.append_fmt("DELETE FROM %s WHERE %s", to_cstring(table), to_cstring(value)))) {
|
||||
} else if (OB_ISNULL(GCTX.sql_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
OB_LOG_(WARN, "GCTX.sql_proxy_ is nullptr");
|
||||
} else if (CLICK_FAIL(GCTX.sql_proxy_->write(tenant_id, sql.ptr(), affected_rows))) {
|
||||
OB_LOG_(WARN, "GCTX.sql_proxy_ insert row failed");
|
||||
} else {
|
||||
OB_LOG_(INFO, "GCTX.sql_proxy_ insert row success");
|
||||
}
|
||||
return ret;
|
||||
#undef PRINT_WRAPPER
|
||||
}
|
||||
|
||||
static int set_parameter(const ObString &value)
|
||||
{
|
||||
TIMEGUARD_INIT(OCCAM, 1_s, 60_s);
|
||||
|
@ -285,7 +285,7 @@ int ObLockFuncExecutor::check_lock_exist_(const uint64_t &lock_id)
|
||||
char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {0};
|
||||
bool is_existed = false;
|
||||
|
||||
OZ(databuff_printf(where_cond,
|
||||
OZ (databuff_printf(where_cond,
|
||||
WHERE_CONDITION_BUFFER_SIZE,
|
||||
"WHERE obj_id = %" PRIu64
|
||||
" and obj_type = %d",
|
||||
@ -617,6 +617,32 @@ void ObLockFuncExecutor::mark_lock_session_(sql::ObSQLSessionInfo *session, cons
|
||||
}
|
||||
}
|
||||
|
||||
int ObLockFuncExecutor::remove_expired_lock_id()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
char table_name[MAX_FULL_TABLE_NAME_LENGTH] = {0};
|
||||
char where_cond[WHERE_CONDITION_BUFFER_SIZE] = {0};
|
||||
const int64_t now = ObTimeUtility::current_time();
|
||||
// delete 10 rows each time, to avoid causing abnormal delays due to deleting too many rows
|
||||
const int delete_limit = 10;
|
||||
|
||||
OZ (databuff_printf(table_name, MAX_FULL_TABLE_NAME_LENGTH,
|
||||
"%s.%s", OB_SYS_DATABASE_NAME, OB_ALL_DBMS_LOCK_ALLOCATED_TNAME));
|
||||
OZ (databuff_printf(where_cond,
|
||||
WHERE_CONDITION_BUFFER_SIZE,
|
||||
"expiration <= usec_to_time(%" PRId64
|
||||
") AND lockid NOT IN (SELECT obj_id FROM %s.%s where obj_type = %d or obj_type = %d)"
|
||||
"LIMIT %d",
|
||||
now,
|
||||
OB_SYS_DATABASE_NAME,
|
||||
OB_ALL_DETECT_LOCK_INFO_TNAME,
|
||||
static_cast<int>(ObLockOBJType::OBJ_TYPE_MYSQL_LOCK_FUNC),
|
||||
static_cast<int>(ObLockOBJType::OBJ_TYPE_DBMS_LOCK),
|
||||
delete_limit));
|
||||
OZ (ObTableAccessHelper::delete_row(MTL_ID(), table_name, where_cond));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGetLockExecutor::execute(ObExecContext &ctx,
|
||||
const ObString &lock_name,
|
||||
const int64_t timeout_us)
|
||||
@ -789,17 +815,6 @@ int ObGetLockExecutor::write_lock_id_(ObLockFuncContext &ctx,
|
||||
OZ (ctx.execute_write(insert_sql, affected_rows));
|
||||
CK (OB_LIKELY(1 == affected_rows || 2 == affected_rows));
|
||||
|
||||
// clean lock_id which is expired and not locked
|
||||
OZ(delete_sql.assign_fmt("DELETE FROM %s WHERE expiration <= usec_to_time(%" PRId64
|
||||
") AND lockid NOT IN (SELECT obj_id FROM %s.%s where obj_type = %d)",
|
||||
table_name,
|
||||
now,
|
||||
OB_SYS_DATABASE_NAME,
|
||||
OB_ALL_DETECT_LOCK_INFO_TNAME,
|
||||
static_cast<int>(ObLockOBJType::OBJ_TYPE_MYSQL_LOCK_FUNC)));
|
||||
affected_rows = 0;
|
||||
OZ (ctx.execute_write(delete_sql, affected_rows));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -120,7 +120,7 @@ public:
|
||||
+ OB_MAX_DATABASE_NAME_LENGTH
|
||||
+ 1);
|
||||
static constexpr int64_t MAX_LOCK_HANDLE_LEGNTH = 65;
|
||||
static constexpr int64_t WHERE_CONDITION_BUFFER_SIZE = 100;
|
||||
static constexpr int64_t WHERE_CONDITION_BUFFER_SIZE = 512;
|
||||
static constexpr int64_t LOCK_ID_LENGTH = 10;
|
||||
static constexpr int64_t MIN_LOCK_HANDLE_ID = 0x40000000;
|
||||
static constexpr int64_t MAX_LOCK_HANDLE_ID = 1999999999;
|
||||
@ -165,6 +165,8 @@ public:
|
||||
uint64_t &lock_id);
|
||||
void mark_lock_session_(sql::ObSQLSessionInfo *session,
|
||||
const bool is_lock_session);
|
||||
int remove_expired_lock_id();
|
||||
int remove_expired_lock_id_(sql::ObExecContext &ctx);
|
||||
};
|
||||
|
||||
class ObGetLockExecutor : public ObLockFuncExecutor
|
||||
|
@ -464,6 +464,7 @@ int ObTableLockDetector::do_detect_and_clear()
|
||||
if (OB_FAIL(func1.call_function_directly())) {
|
||||
LOG_WARN("do session_alive detect failed", K(ret));
|
||||
}
|
||||
remove_expired_lock_id();
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -478,6 +479,16 @@ int ObTableLockDetector::remove_lock_by_owner_id(const int64_t raw_owner_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLockDetector::remove_expired_lock_id()
|
||||
{
|
||||
int ret =OB_SUCCESS;
|
||||
ObLockFuncExecutor executor;
|
||||
if (OB_FAIL(executor.remove_expired_lock_id())) {
|
||||
LOG_WARN("remove expired lock_id failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLockDetector::record_detect_info_to_inner_table_(observer::ObInnerSQLConnection *inner_conn,
|
||||
const char *table_name,
|
||||
const ObTableLockTaskType &task_type,
|
||||
|
@ -110,6 +110,7 @@ public:
|
||||
int64_t &cnt);
|
||||
static int do_detect_and_clear();
|
||||
static int remove_lock_by_owner_id(const int64_t raw_owner_id);
|
||||
static int remove_expired_lock_id();
|
||||
|
||||
private:
|
||||
static int record_detect_info_to_inner_table_(observer::ObInnerSQLConnection *inner_conn,
|
||||
|
Reference in New Issue
Block a user