!2035 MySQL特性--rename table语法

Merge pull request !2035 from 吕辉/rename
This commit is contained in:
opengauss-bot
2022-09-21 03:00:25 +00:00
committed by Gitee
10 changed files with 423 additions and 52 deletions

View File

@ -101,19 +101,6 @@
* represented in col_privs and col_ddl_privs (this is a list of untransformed AccessPriv nodes).
* Column privileges are only valid for objtype ACL_OBJECT_RELATION.
*/
typedef struct {
bool is_grant;
GrantObjectType objtype;
List* objects;
bool all_privs;
AclMode privileges;
AclMode ddl_privileges;
List* col_privs;
List* col_ddl_privs;
List* grantees;
bool grant_option;
DropBehavior behavior;
} InternalGrant;
/*
* Internal format used by ALTER DEFAULT PRIVILEGES.
@ -224,7 +211,7 @@ const struct AclClassId {
};
static void ExecGrantStmt_oids(InternalGrant* istmt);
static void ExecGrant_Relation(InternalGrant* grantStmt);
void ExecGrant_Relation(InternalGrant* grantStmt);
static void ExecGrant_Database(InternalGrant* grantStmt);
static void ExecGrant_Fdw(InternalGrant* grantStmt);
static void ExecGrant_ForeignServer(InternalGrant* grantStmt);
@ -2103,7 +2090,7 @@ static void ExecGrantRelationPrivilegesCheck(Form_pg_class tuple, AclMode* privi
/*
* This processes both sequences and non-sequences.
*/
static void ExecGrant_Relation(InternalGrant* istmt)
void ExecGrant_Relation(InternalGrant* istmt)
{
Relation relation = NULL;
Relation attRelation = NULL;

View File

@ -487,7 +487,7 @@ static int errstate;
name_list from_clause from_list opt_array_bounds
qualified_name_list any_name any_name_list
any_operator expr_list attrs callfunc_args
target_list insert_column_list set_target_list
target_list insert_column_list set_target_list rename_clause_list rename_clause
set_clause_list set_clause multiple_set_clause
ctext_expr_list ctext_row def_list tsconf_def_list indirection opt_indirection
reloption_list tblspc_option_list cfoption_list group_clause TriggerFuncArgs select_limit
@ -15614,8 +15614,54 @@ RenameStmt: ALTER AGGREGATE func_name aggr_args RENAME TO name
n->missing_ok = false;
$$ = (Node *)n;
}
| RENAME TABLE rename_clause_list
{
#ifndef ENABLE_MULTIPLE_NODES
if (u_sess->attr.attr_sql.sql_compatibility == B_FORMAT) {
RenameStmt *n = makeNode(RenameStmt);
n->renameType = OBJECT_TABLE;
n->renameTargetList = $3;
n->renameTableflag = true;
n->missing_ok = false;
$$ = (Node *)n;
} else {
const char* message = "rename table syntax is supported on dbcompatibility B.";
InsertErrorMessage(message, u_sess->plsql_cxt.plpgsql_yylloc);
ereport(errstate,
(errmodule(MOD_PARSER),
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("rename table syntax is supported on dbcompatibility B."),
parser_errposition(@1)));
$$ = NULL;
}
#else
const char* message = "rename table syntax don't supported on distributed database.";
InsertErrorMessage(message, u_sess->plsql_cxt.plpgsql_yylloc);
ereport(errstate,
(errmodule(MOD_PARSER),
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("rename table syntax don't supported on distributed database."),
parser_errposition(@1)));
$$ = NULL;
#endif
}
;
rename_clause_list:
rename_clause { $$ = $1; }
| rename_clause_list ',' rename_clause { $$ = list_concat($1, $3); }
;
rename_clause:
qualified_name TO qualified_name
{
RenameCell* n = makeNode(RenameCell);
n->original_name = $1;
n->modify_name = $3;
$$ = list_make1(n);
}
;
opt_column: COLUMN { $$ = COLUMN; }
| /*EMPTY*/ { $$ = 0; }
;

View File

@ -58,6 +58,7 @@
#include "catalog/pg_partition_fn.h"
#include "catalog/pg_hashbucket.h"
#include "catalog/pg_hashbucket_fn.h"
#include "catalog/pg_synonym.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
@ -411,6 +412,11 @@ struct OldToNewChunkIdMappingData {
Oid newChunkId;
};
struct RenameTableNameData {
char* schemaname;
char* relname;
};
typedef OldToNewChunkIdMappingData* OldToNewChunkIdMapping;
/* Alter table target-type flags for ATSimplePermissions */
@ -5919,50 +5925,273 @@ void RenameConstraint(RenameStmt* stmt)
0 /* expected inhcount */);
}
static bool FindSynonymExist(char* relname, char* relnamespace)
{
HeapTuple htup = NULL;
bool isnull = false;
bool result = false;
Relation rel_synonym = heap_open(PgSynonymRelationId, RowExclusiveLock);
SysScanDesc adscan = systable_beginscan(rel_synonym, InvalidOid, false, NULL, 0, NULL);
while (HeapTupleIsValid(htup = systable_getnext(adscan))) {
Datum val = heap_getattr(htup, Anum_pg_synonym_synobjschema, rel_synonym->rd_att, &isnull);
if (val && pg_strcasecmp(DatumGetCString(val), relnamespace) == 0) {
val = heap_getattr(htup, Anum_pg_synonym_synobjname, rel_synonym->rd_att, &isnull);
if (val && pg_strcasecmp(DatumGetCString(val), relname) == 0) {
result = true;
}
}
}
systable_endscan(adscan);
heap_close(rel_synonym, RowExclusiveLock);
return result;
}
static int Compare_RenameTableNameData_func(const void* a, const void* b)
{
if (strcmp(((const RenameTableNameData*)a)->schemaname, ((const RenameTableNameData*)b)->schemaname) == 0) {
return strcmp(((const RenameTableNameData*)a)->relname, ((const RenameTableNameData*)b)->relname);
} else {
return strcmp(((const RenameTableNameData*)a)->schemaname, ((const RenameTableNameData*)b)->schemaname);
}
}
static void RenameTableFeature(RenameStmt* stmt)
{
char *orgiSchema = NULL, *orgitable = NULL, *modfySchema = NULL, *modfytable = NULL;
Oid orgiNameSpace = InvalidOid, modfyNameSpace = InvalidOid;
List* search_path = fetch_search_path(false);
Oid relnamespace = InvalidOid;
RangeVar* temp_name = NULL;
Oid relid = InvalidOid;
Relation rel_pg_class;
HeapTuple tup;
HeapTuple newtup;
Form_pg_class relform;
Datum values[Natts_pg_class] = { 0 };
bool nulls[Natts_pg_class] = { false };
bool replaces[Natts_pg_class] = { false };
if (stmt->renameTargetList == NULL) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Cannot get rename table name and modify name")));
}
RenameTableNameData storageTable[stmt->renameTargetList->length];
Relation lockRelation[stmt->renameTargetList->length];
int tableName_Count = 0;
ListCell* rename_Cell = NULL;
foreach(rename_Cell, stmt->renameTargetList) {
RenameCell* renameInfo = (RenameCell*)lfirst(rename_Cell);
temp_name = renameInfo->original_name;
orgiSchema = temp_name->schemaname;
/* if schema name don't assign */
if (orgiSchema == NULL && search_path != NIL) {
relnamespace = linitial_oid(search_path);
if (!OidIsValid(relnamespace)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Cannot get current namespace on Rename Table.")));
}
orgiSchema = get_namespace_name(relnamespace);
}
orgitable = temp_name->relname;
storageTable[tableName_Count].schemaname = pstrdup(orgiSchema);
storageTable[tableName_Count].relname = pstrdup(orgitable);
tableName_Count++;
}
if (stmt->renameTargetList->length >= 2) {
qsort((void*)storageTable, (size_t)stmt->renameTargetList->length, sizeof(RenameTableNameData), Compare_RenameTableNameData_func);
}
for (int num = 0; num < stmt->renameTargetList->length; num++) {
orgiNameSpace = get_namespace_oid(storageTable[num].schemaname, false);
relid = get_relname_relid(storageTable[num].relname, orgiNameSpace);
if (!OidIsValid(relid) && OidIsValid(u_sess->catalog_cxt.myTempNamespace)) {
relid = get_relname_relid(storageTable[num].relname, u_sess->catalog_cxt.myTempNamespace);
}
if(!OidIsValid(relid)) {
lockRelation[num] = NULL;
continue;
} else {
/* Don't support Tempporary Table */
if (IsTempTable(relid)) {
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("relation %s is temporary table, Rename table don't support.", get_rel_name(relid))));
}
lockRelation[num] = relation_open(relid, AccessExclusiveLock);
}
}
foreach(rename_Cell, stmt->renameTargetList) {
/* acquire the schema and table name in renameTargetList */
RenameCell* renameInfo = (RenameCell*)lfirst(rename_Cell);
temp_name = renameInfo->original_name;
orgiSchema = temp_name->schemaname;
if (orgiSchema != NULL) {
orgiNameSpace = get_namespace_oid(orgiSchema, false);
}
orgitable = temp_name->relname;
temp_name = renameInfo->modify_name;
modfySchema = temp_name->schemaname;
if (modfySchema != NULL) {
modfyNameSpace = get_namespace_oid(modfySchema, false);
}
modfytable = temp_name->relname;
/* obtain search_path, get schema name */
if (orgiSchema == NULL && search_path != NIL) {
relnamespace = linitial_oid(search_path);
if (!OidIsValid(relnamespace)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Cannot get current namespace on Rename Table.")));
} else if (orgiSchema == NULL) {
orgiNameSpace = relnamespace;
}
} else if (search_path == NIL && (orgiSchema == NULL || modfySchema == NULL)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Rename Table search_path get NIL in error.")));
}
/* Check whether exist Synonym on old table name and new table name */
if (orgiSchema == NULL) {
orgiSchema = get_namespace_name(relnamespace);
}
if (FindSynonymExist(orgitable, orgiSchema)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Rename Table \"%s.%s\" exist Synonym, so Rename table can't execute.",
orgiSchema, orgitable)));
} else if (modfySchema != NULL && SearchSysCacheExists2(SYNONYMNAMENSP, PointerGetDatum(modfytable), ObjectIdGetDatum(modfyNameSpace))) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Rename Table \"%s.%s\" exist Synonym, so Rename table can't execute.",
modfySchema, modfytable)));
} else if ((orgiSchema != NULL && modfySchema == NULL) &&
SearchSysCacheExists2(SYNONYMNAMENSP, PointerGetDatum(modfytable), ObjectIdGetDatum(orgiNameSpace))) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Rename Table \"%s.%s\" exist Synonym, so Rename table can't execute.",
orgiSchema, modfytable)));
} else if((orgiSchema == NULL && modfySchema == NULL) &&
SearchSysCacheExists2(SYNONYMNAMENSP, PointerGetDatum(modfytable), ObjectIdGetDatum(relnamespace))) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Rename Table \"%s.%s\" exist Synonym, so Rename table can't execute.",
get_namespace_name(relnamespace), modfytable)));
}
/* check a user's access privileges to a namespace */
if (pg_namespace_aclcheck(orgiNameSpace, GetUserId(), ACL_CREATE) == ACLCHECK_NO_PRIV) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("User %s don't have creat privileges on Schema %s.",
GetUserNameFromId(GetUserId()), get_namespace_name(orgiNameSpace))));
}
if (pg_namespace_aclcheck(modfyNameSpace, GetUserId(), ACL_CREATE) == ACLCHECK_NO_PRIV) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("User %s don't have creat privileges on Schema %s.",
GetUserNameFromId(GetUserId()), get_namespace_name(modfyNameSpace))));
}
/* Do rename table work */
rel_pg_class = heap_open(RelationRelationId, RowExclusiveLock);
relid = get_relname_relid(orgitable, orgiNameSpace);
/* Support view but cannot span schemaes */
if (IsRelaionView(relid) && modfyNameSpace != orgiNameSpace) {
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("relation %s is view, Rename table don't support span schemaes.", get_rel_name(relid))));
} else if (!OidIsValid(relid)) {
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("relation \"%s.%s\" does not exist, skipping", get_namespace_name(orgiNameSpace), orgitable)));
}
/* Rename regular table */
replaces[Anum_pg_class_relname - 1] = true;
values[Anum_pg_class_relname - 1] = CStringGetDatum(modfytable);
if (modfySchema != NULL) {
replaces[Anum_pg_class_relnamespace - 1] = true;
values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(modfyNameSpace);
}
tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tup)) {
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("cache lookup failed for relation %s", get_rel_name(relid))));
}
relform = (Form_pg_class)GETSTRUCT(tup);
if (relform->relkind == RELKIND_RELATION && relform->parttype == PARTTYPE_PARTITIONED_RELATION) {
renamePartitionedTable(relid, modfytable);
}
newtup = heap_modify_tuple(tup, RelationGetDescr(rel_pg_class), values, nulls, replaces);
simple_heap_update(rel_pg_class, &newtup->t_self, newtup);
CatalogUpdateIndexes(rel_pg_class, newtup);
ReleaseSysCache(tup);
tableam_tops_free_tuple(newtup);
heap_close(rel_pg_class, RowExclusiveLock);
CommandCounterIncrement();
/* revoke table privileges */
InternalGrant istmt;
istmt.is_grant = false;
istmt.objtype = ACL_OBJECT_RELATION;
istmt.objects = list_make1_oid(relid);
istmt.all_privs= true;
istmt.privileges = ACL_NO_RIGHTS;
istmt.ddl_privileges = ACL_NO_DDL_RIGHTS;
istmt.col_privs = NIL;
istmt.col_ddl_privs = NIL;
istmt.grantees = list_make1_oid(ACL_ID_PUBLIC);
istmt.grant_option = false;
istmt.behavior = DROP_CASCADE;
ExecGrant_Relation(&istmt);
}
for (int num = stmt->renameTargetList->length - 1; num >= 0; num--) {
if (lockRelation[num] != NULL) {
relation_close(lockRelation[num], AccessExclusiveLock);
}
}
for (int num = 0; num < stmt->renameTargetList->length; num++) {
pfree(storageTable[num].schemaname);
pfree(storageTable[num].relname);
}
}
/*
* Execute ALTER TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE RENAME
*/
void RenameRelation(RenameStmt* stmt)
{
Oid relid;
if (stmt->renameTableflag) {
RenameTableFeature(stmt);
} else {
Oid relid;
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*
* Lock level used here should match RenameRelationInternal, to avoid lock
* escalation.
*/
relid = RangeVarGetRelidExtended(stmt->relation,
AccessExclusiveLock,
stmt->missing_ok,
false,
false,
false,
RangeVarCallbackForAlterRelation,
(void*)stmt);
/*
* Grab an exclusive lock on the target table, index, sequence or view,
* which we will NOT release until end of transaction.
*
* Lock level used here should match RenameRelationInternal, to avoid lock
* escalation.
*/
relid = RangeVarGetRelidExtended(stmt->relation,
AccessExclusiveLock,
stmt->missing_ok,
false,
false,
false,
RangeVarCallbackForAlterRelation,
(void*)stmt);
if (!OidIsValid(relid)) {
ereport(NOTICE, (errmsg("relation \"%s\" does not exist, skipping", stmt->relation->relname)));
return;
}
if (!OidIsValid(relid)) {
ereport(NOTICE, (errmsg("relation \"%s\" does not exist, skipping", stmt->relation->relname)));
return;
}
TrForbidAccessRbObject(RelationRelationId, relid, stmt->relation->relname);
/* If table has history table, we need rename corresponding history table */
if (is_ledger_usertable(relid)) {
rename_hist_by_usertable(relid, stmt->newname);
}
TrForbidAccessRbObject(RelationRelationId, relid, stmt->relation->relname);
/* If table has history table, we need rename corresponding history table */
if (is_ledger_usertable(relid)) {
rename_hist_by_usertable(relid, stmt->newname);
}
/* Do the work */
RenameRelationInternal(relid, stmt->newname);
/*
* Record the changecsn of the table that defines the index
*/
if (stmt->renameType == OBJECT_INDEX) {
Oid relOid = IndexGetRelation(relid, false);
Relation userRelaiton = RelationIdGetRelation(relOid);
UpdatePgObjectChangecsn(relOid, userRelaiton->rd_rel->relkind);
RelationClose(userRelaiton);
/* Do the work */
RenameRelationInternal(relid, stmt->newname);
/*
* Record the changecsn of the table that defines the index
*/
if (stmt->renameType == OBJECT_INDEX) {
Oid relOid = IndexGetRelation(relid, false);
Relation userRelaiton = RelationIdGetRelation(relOid);
UpdatePgObjectChangecsn(relOid, userRelaiton->rd_rel->relkind);
RelationClose(userRelaiton);
}
}
}

View File

@ -1269,7 +1269,12 @@ static void pgaudit_process_rename_object(Node* node, const char* querystring)
case OBJECT_INDEX:
case OBJECT_FOREIGN_TABLE:
case OBJECT_STREAM:
objectname = stmt->relation->relname;
if (stmt->renameTableflag && PointerIsValid(stmt->renameTargetList) &&
PointerIsValid(stmt->renameTargetList->head)) {
objectname = ((RenameCell*)stmt->renameTargetList->head->data.ptr_value)->original_name->relname;
} else {
objectname = stmt->relation->relname;
}
break;
default:
break;

View File

@ -581,6 +581,7 @@ typedef enum NodeTag {
T_SqlLoadConsInfo,
T_SqlLoadColExpr,
T_AutoIncrement,
T_RenameCell,
/*
* TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
*/

View File

@ -2167,6 +2167,11 @@ typedef struct CreatePackageBodyStmt {
bool pkgsecdef;
} CreatePackageBodyStmt;
typedef struct RenameCell {
RangeVar* original_name;
RangeVar* modify_name;
} RenameCell;
/* ----------------------
* Alter Object Rename Statement
* ----------------------
@ -2183,6 +2188,8 @@ typedef struct RenameStmt {
char* newname; /* the new name */
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
bool missing_ok; /* skip error if missing? */
List* renameTargetList = NULL;
bool renameTableflag = false;
} RenameStmt;
/* ----------------------

View File

@ -248,6 +248,20 @@ typedef enum AclObjectKind {
MAX_ACL_KIND /* MUST BE LAST */
} AclObjectKind;
typedef struct {
bool is_grant;
GrantObjectType objtype;
List* objects;
bool all_privs;
AclMode privileges;
AclMode ddl_privileges;
List* col_privs;
List* col_ddl_privs;
List* grantees;
bool grant_option;
DropBehavior behavior;
} InternalGrant;
/*
* routines used internally
*/
@ -388,5 +402,6 @@ extern bool is_trust_language(Oid lang_oid);
extern Acl* allocacl(int n);
extern bool pg_publication_ownercheck(Oid pub_oid, Oid roleid);
extern bool pg_subscription_ownercheck(Oid sub_oid, Oid roleid);
extern void ExecGrant_Relation(InternalGrant* grantStmt);
#endif /* ACL_H */

View File

@ -0,0 +1,49 @@
drop database if exists mysql;
create database mysql dbcompatibility 'B';
\c mysql
create schema test; create schema tbinfo;
create table test.t1(id int);
create table test.t2(c_id int not null primary key, name varchar) partition by range (c_id) (partition t2_p1 values less than(100), partition t2_p2 values less than(200), partition t2_p3 values less than(MAXVALUE));
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "t2_pkey" for table "t2"
create view test.t3 as select * from test.t1;
rename table test.t1 to tbinfo.t1, test.t2 to tbinfo.t2, test.t3 to test.t4;
\d tbinfo.t1;
Table "tbinfo.t1"
Column | Type | Modifiers
--------+---------+-----------
id | integer |
\d tbinfo.t2;
Table "tbinfo.t2"
Column | Type | Modifiers
--------+-------------------+-----------
c_id | integer | not null
name | character varying |
Indexes:
"t2_pkey" PRIMARY KEY, btree (c_id) LOCAL TABLESPACE pg_default
Partition By RANGE(c_id)
Number of partitions: 3 (View pg_partition to check each partition range.)
\d test.t4;
View "test.t4"
Column | Type | Modifiers
--------+---------+-----------
id | integer |
rename table test.t4 to tbinfo.t3;
ERROR: relation t4 is view, Rename table don't support span schemaes.
create temp table t5(id int);
rename table t5 to tt;
ERROR: relation t5 is temporary table, Rename table don't support.
\c regression
drop database mysql;
create schema test; create schema tbinfo;
create table test.t1(id int);
rename table test.t1 to tbinfo.t1;
ERROR: rename table syntax is supported on dbcompatibility B.
LINE 1: rename table test.t1 to tbinfo.t1;
^
alter table test.t1 rename to t2;
drop table test.t2;
drop schema test cascade;
drop schema tbinfo cascade;

View File

@ -1004,3 +1004,4 @@ test: sytcomp_del_upt4orderby
test: aioptimizer
test: aioptimizer_small
test: pgfincore
test: rename_table

View File

@ -0,0 +1,31 @@
drop database if exists mysql;
create database mysql dbcompatibility 'B';
\c mysql
create schema test; create schema tbinfo;
create table test.t1(id int);
create table test.t2(c_id int not null primary key, name varchar) partition by range (c_id) (partition t2_p1 values less than(100), partition t2_p2 values less than(200), partition t2_p3 values less than(MAXVALUE));
create view test.t3 as select * from test.t1;
rename table test.t1 to tbinfo.t1, test.t2 to tbinfo.t2, test.t3 to test.t4;
\d tbinfo.t1;
\d tbinfo.t2;
\d test.t4;
rename table test.t4 to tbinfo.t3;
create temp table t5(id int);
rename table t5 to tt;
\c regression
drop database mysql;
create schema test; create schema tbinfo;
create table test.t1(id int);
rename table test.t1 to tbinfo.t1;
alter table test.t1 rename to t2;
drop table test.t2;
drop schema test cascade;
drop schema tbinfo cascade;