[CP] [to #2024111500105132794] fix: make routine name case insensitive when revoking mysql PL privileges

This commit is contained in:
0xacc
2025-02-13 12:49:19 +00:00
committed by ob-robot
parent 916b495b5f
commit 797534f436
2 changed files with 114 additions and 4 deletions

View File

@ -276,6 +276,105 @@ int ObPrivSqlService::grant_table(
return ret;
}
int ObPrivSqlService::gen_delete_routine_priv_sql(
ObISQLClient &sql_client,
const uint64_t exec_tenant_id,
const ObRoutinePrivSortKey &routine_priv_key,
ObDMLSqlSplicer &dml)
{
int ret = OB_SUCCESS;
// why we need to SELECT before DELETE?
//
// following case will write two rows of `routine_name` in different cases to __all_routine_privilege:
//
// create function `DB`.`f_Test`() returns int return 1;
// grant execute on function `DB`.`f_test` to tester;
// __all_routine_privilege:
// +---------------+--------------+--------------+----------+---------+--------------+
// | database_name | routine_name | routine_type | all_priv | grantor | grantor_host |
// +---------------+--------------+--------------+----------+---------+--------------+
// | db | f_Test | 2 | 3 | admin | % |
// | db | f_test | 2 | 1 | admin | % |
// +---------------+--------------+--------------+----------+---------+--------------+
//
// however, `routine_name` of __all_routine_privilege is of VARBINARY type, which is case-sensitive.
// so DELETE statement will fail if condition `routine_name=routine_priv_key.routine_name_` is applied,
// which causes -4016 and makes the routine cannot be deleted.
//
// to address this issue, we use `CAST(routine_name AS CHAR)=routine_priv_key.routine_name_` in a SELECT statement to compare in a case-insensivtie manner,
// and find the original text written to __all_routine_privilege, then use it in DELETE statement.
//
// rowkey_columns = [
// ('tenant_id', 'int'),
// ('user_id', 'int'),
// ('database_name', 'varbinary:OB_MAX_DATABASE_NAME_BINARY_LENGTH'),
// ('routine_name', 'varbinary:OB_MAX_ROUTINE_NAME_BINARY_LENGTH'),
// ('routine_type', 'int'),
// ]
//
// pk columns and the fact that routine names are case-insensitive make sure at most one row is SELECTed.
// __all_routine_privilege is consistent with priv_mgr, so at least one row is SELECTed.
// as a result, we can expect exactly one row is SELECTed.
//
// in this way, we can always delete what was written, no matter `routine_name` in uppercase or lowercase.
//
ObSqlString sql;
if (OB_FAIL(sql.append_fmt("SELECT routine_name FROM %s WHERE tenant_id=%d AND user_id=%lu AND database_name=",
OB_ALL_ROUTINE_PRIVILEGE_TNAME,
0,
routine_priv_key.user_id_))) {
LOG_WARN("failed to append_fmt", K(ret), K(sql));
} else if (OB_FAIL(sql_append_hex_escape_str(routine_priv_key.db_, sql))) {
LOG_WARN("failed to sql_append_hex_escape_str", K(ret), K(routine_priv_key.db_), K(sql));
} else if (OB_FAIL(sql.append(" AND CAST(routine_name AS CHAR)="))) {
LOG_WARN("failed to append sql string", K(ret), K(sql));
} else if (OB_FAIL(sql_append_hex_escape_str(routine_priv_key.routine_, sql))) {
LOG_WARN("failed to sql_append_hex_escape_str", K(ret), K(routine_priv_key.routine_), K(sql));
} else if (OB_FAIL(sql.append_fmt(" AND routine_type=%ld", routine_priv_key.routine_type_))) {
LOG_WARN("failed to append_fmt", K(ret), K(routine_priv_key.routine_type_), K(sql));
} else {
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
ObString routine_name;
common::sqlclient::ObMySQLResult *res = nullptr;
if (OB_FAIL(sql_client.read(result, exec_tenant_id, sql.ptr()))) {
LOG_WARN("failed to execute sql", K(ret), K(sql));
} else if (OB_ISNULL(res = result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL result set", K(ret));
} else if (OB_FAIL(res->next())) {
LOG_WARN("failed to iter result set", K(ret));
} else {
EXTRACT_VARCHAR_FIELD_MYSQL(*res, "routine_name", routine_name);
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(dml.add_pk_column("routine_name", ObHexEscapeSqlStr(routine_name)))) {
LOG_WARN("failed to add_pk_column", K(ret), K(routine_name));
}
// only one row is expected
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(res->next())) {
if (OB_ITER_END != ret) {
LOG_WARN("failed to iter result set", K(ret), K(sql));
} else {
ret = OB_SUCCESS;
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected multiple rows", K(ret), K(sql));
}
}
}
return ret;
}
int ObPrivSqlService::grant_routine(
const ObRoutinePrivSortKey &routine_priv_key,
@ -311,11 +410,15 @@ int ObPrivSqlService::grant_routine(
// insert into __all_routine_privilege
if (OB_SUCC(ret)) {
if (is_deleted) {
if (OB_FAIL(exec.exec_delete(OB_ALL_ROUTINE_PRIVILEGE_TNAME, dml, affected_rows))) {
LOG_WARN("exec_delete failed", K(ret));
if (OB_FAIL(gen_delete_routine_priv_sql(sql_client, exec_tenant_id, routine_priv_key, dml))) {
LOG_WARN("failed to gen_delete_routine_sql", K(ret));
} else if (OB_FAIL(exec.exec_delete(OB_ALL_ROUTINE_PRIVILEGE_TNAME, dml, affected_rows))) {
LOG_WARN("failed to exec_delete", K(ret));
}
} else {
if (OB_FAIL(exec.exec_replace(OB_ALL_ROUTINE_PRIVILEGE_TNAME, dml, affected_rows))) {
if (OB_FAIL(dml.add_pk_column("routine_name", ObHexEscapeSqlStr(routine_priv_key.routine_)))) {
LOG_WARN("failed to add_pk_column", K(ret));
} else if (OB_FAIL(exec.exec_replace(OB_ALL_ROUTINE_PRIVILEGE_TNAME, dml, affected_rows))) {
LOG_WARN("exec_replace failed", K(ret));
}
}
@ -996,7 +1099,8 @@ int ObPrivSqlService::gen_routine_priv_dml(
if (OB_FAIL(dml.add_pk_column("tenant_id", 0))
|| OB_FAIL(dml.add_pk_column("user_id", routine_priv_key.user_id_))
|| OB_FAIL(dml.add_pk_column("database_name", ObHexEscapeSqlStr(routine_priv_key.db_)))
|| OB_FAIL(dml.add_pk_column("routine_name", ObHexEscapeSqlStr(routine_priv_key.routine_)))
// add routine_name outside
// || OB_FAIL(dml.add_pk_column("routine_name", ObHexEscapeSqlStr(routine_priv_key.routine_)))
|| OB_FAIL(dml.add_pk_column("routine_type", routine_priv_key.routine_type_))
|| OB_FAIL(dml.add_column("all_priv", all_priv))) {
LOG_WARN("add column failed", K(ret));

View File

@ -260,6 +260,12 @@ private:
ObDMLSqlSplicer &dml,
bool is_deleted);
static int gen_delete_routine_priv_sql(
ObISQLClient &sql_client,
const uint64_t exec_tenant_id,
const ObRoutinePrivSortKey &routine_priv_key,
ObDMLSqlSplicer &dml);
private:
DISALLOW_COPY_AND_ASSIGN(ObPrivSqlService);
};