!2309 MySQL特性--rename table语法bugfix

Merge pull request !2309 from 吕辉/rename_table_bugfix
This commit is contained in:
opengauss-bot
2022-11-30 10:47:15 +00:00
committed by Gitee
3 changed files with 183 additions and 49 deletions

View File

@ -5983,8 +5983,8 @@ static void RenameTableFeature(RenameStmt* stmt)
List* search_path = fetch_search_path(false);
Oid relnamespace = InvalidOid;
RangeVar* temp_name = NULL;
Oid relid = InvalidOid;
Relation rel_pg_class;
Oid relid = InvalidOid, relid_temp = InvalidOid;
Relation rel_pg_class, rel_pg_type;
HeapTuple tup;
HeapTuple newtup;
Form_pg_class relform;
@ -5992,12 +5992,15 @@ static void RenameTableFeature(RenameStmt* stmt)
Datum values[Natts_pg_class] = { 0 };
bool nulls[Natts_pg_class] = { false };
bool replaces[Natts_pg_class] = { false };
Datum type_values[Natts_pg_type] = { 0 };
bool type_nulls[Natts_pg_type] = { false };
bool type_replaces[Natts_pg_type] = { false };
if (stmt->renameTargetList == NULL) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Cannot get rename table name and modify name")));
}
RenameTableNameData storageTable[stmt->renameTargetList->length];
bool tempSchema[stmt->renameTargetList->length] = { false };
Relation lockRelation[stmt->renameTargetList->length];
int tableName_Count = 0;
ListCell* rename_Cell = NULL;
@ -6005,15 +6008,29 @@ static void RenameTableFeature(RenameStmt* stmt)
RenameCell* renameInfo = (RenameCell*)lfirst(rename_Cell);
temp_name = renameInfo->original_name;
orgiSchema = temp_name->schemaname;
/* orgitable NOT NULL */
Assert(temp_name->relname != NULL);
orgitable = temp_name->relname;
/* if schema name don't assign */
if (orgiSchema == NULL && search_path != NIL) {
relnamespace = linitial_oid(search_path);
if (!OidIsValid(relnamespace)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Cannot get current namespace on Rename Table.")));
ListCell* l = NULL;
foreach(l, search_path) {
relnamespace = lfirst_oid(l);
if (!OidIsValid(relnamespace)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Cannot get current namespace on Rename Table.")));
}
relid = get_relname_relid(orgitable, relnamespace);
/* Traversal the search_path until the correct schema of table is found */
if (OidIsValid(relid)) {
orgiSchema = get_namespace_name(relnamespace);
break;
}
}
orgiSchema = get_namespace_name(relnamespace);
}
orgitable = temp_name->relname;
if (orgiSchema == NULL) {
orgiSchema = get_namespace_name(PG_PUBLIC_NAMESPACE);
tempSchema[tableName_Count] = true;
}
storageTable[tableName_Count].schemaname = pstrdup(orgiSchema);
storageTable[tableName_Count].relname = pstrdup(orgitable);
tableName_Count++;
@ -6023,20 +6040,20 @@ static void RenameTableFeature(RenameStmt* stmt)
qsort((void*)storageTable, (size_t)stmt->renameTargetList->length, sizeof(RenameTableNameData), Compare_RenameTableNameData_func);
}
for (int num = 0; num < stmt->renameTargetList->length; num++) {
orgiNameSpace = get_namespace_oid(storageTable[num].schemaname, false);
relid = get_relname_relid(storageTable[num].relname, orgiNameSpace);
if (!OidIsValid(relid) && OidIsValid(u_sess->catalog_cxt.myTempNamespace)) {
relid = get_relname_relid(storageTable[num].relname, u_sess->catalog_cxt.myTempNamespace);
if (orgiSchema != NULL && !tempSchema[num]) {
orgiNameSpace = get_namespace_oid(storageTable[num].schemaname, false);
} else if (OidIsValid(u_sess->catalog_cxt.myTempNamespace)) {
orgiNameSpace = u_sess->catalog_cxt.myTempNamespace;
}
relid = get_relname_relid(storageTable[num].relname, orgiNameSpace);
if(!OidIsValid(relid)) {
lockRelation[num] = NULL;
orgiNameSpace = InvalidOid;
continue;
} else {
/* Don't support Tempporary Table */
if (IsTempTable(relid)) {
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("relation %s is temporary table, Rename table don't support.", get_rel_name(relid))));
if (IsTempTable(relid) || IsGlobalTempTable(relid)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("relation %s is temporary table, Rename table don't support.", get_rel_name(relid))));
}
lockRelation[num] = relation_open(relid, AccessExclusiveLock);
}
@ -6056,19 +6073,37 @@ static void RenameTableFeature(RenameStmt* stmt)
if (modfySchema != NULL) {
modfyNameSpace = get_namespace_oid(modfySchema, false);
}
/* modfytable NOT NULL */
Assert(temp_name->relname);
modfytable = temp_name->relname;
/* obtain search_path, get schema name */
if (orgiSchema == NULL && search_path != NIL) {
relnamespace = linitial_oid(search_path);
if (!OidIsValid(relnamespace)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Cannot get current namespace on Rename Table.")));
} else if (orgiSchema == NULL) {
orgiNameSpace = relnamespace;
ListCell* l = NULL;
foreach (l, search_path) {
relnamespace = lfirst_oid(l);
if (!OidIsValid(relnamespace)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Cannot get current namespace on Rename Table.")));
}
relid = get_relname_relid(orgitable, relnamespace);
/* Traversal the search_path until the correct schema of table is found */
if (OidIsValid(relid)) {
orgiSchema = get_namespace_name(relnamespace);
orgiNameSpace = relnamespace;
if (modfySchema == NULL) {
modfyNameSpace = relnamespace;
}
break;
}
}
} else if (search_path == NIL && (orgiSchema == NULL || modfySchema == NULL)) {
} else if (search_path == NIL && orgiSchema == NULL) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Rename Table search_path get NIL in error.")));
}
if (modfySchema == NULL && orgiSchema != NULL) {
/* if modfytable table has no schema specified,
* it's the same as orgiNameSpace */
modfyNameSpace = orgiNameSpace;
}
/* Check whether exist Synonym on old table name and new table name */
if (orgiSchema == NULL) {
@ -6084,52 +6119,110 @@ static void RenameTableFeature(RenameStmt* stmt)
SearchSysCacheExists2(SYNONYMNAMENSP, PointerGetDatum(modfytable), ObjectIdGetDatum(orgiNameSpace))) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Rename Table \"%s.%s\" exist Synonym, so Rename table can't execute.",
orgiSchema, modfytable)));
} else if((orgiSchema == NULL && modfySchema == NULL) &&
} else if ((orgiSchema == NULL && modfySchema == NULL) &&
SearchSysCacheExists2(SYNONYMNAMENSP, PointerGetDatum(modfytable), ObjectIdGetDatum(relnamespace))) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Rename Table \"%s.%s\" exist Synonym, so Rename table can't execute.",
get_namespace_name(relnamespace), modfytable)));
} else if (orgitable != NULL) {
Oid temp_namespace = InvalidOid;
if (orgiSchema != NULL) {
if (OidIsValid(orgiNameSpace)) {
temp_namespace = orgiNameSpace;
} else {
temp_namespace = relnamespace;
}
} else {
temp_namespace = relnamespace;
}
if (SearchSysCacheExists2(SYNONYMNAMENSP, PointerGetDatum(orgitable), ObjectIdGetDatum(temp_namespace))) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Rename Table \"%s.%s\" is Synonym, so Rename table can't support.",
get_namespace_name(temp_namespace), orgitable)));
}
}
/* check a user's access privileges to a namespace */
if (pg_namespace_aclcheck(orgiNameSpace, GetUserId(), ACL_CREATE) == ACLCHECK_NO_PRIV) {
if (pg_namespace_aclcheck(orgiNameSpace, GetUserId(), ACL_USAGE) != ACLCHECK_OK) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("User %s don't have creat privileges on Schema %s.",
GetUserNameFromId(GetUserId()), get_namespace_name(orgiNameSpace))));
}
if (pg_namespace_aclcheck(modfyNameSpace, GetUserId(), ACL_CREATE) == ACLCHECK_NO_PRIV) {
if (pg_namespace_aclcheck(modfyNameSpace, GetUserId(), ACL_USAGE) != ACLCHECK_OK) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("User %s don't have creat privileges on Schema %s.",
GetUserNameFromId(GetUserId()), get_namespace_name(modfyNameSpace))));
}
/* Do rename table work */
rel_pg_class = heap_open(RelationRelationId, RowExclusiveLock);
relid = get_relname_relid(orgitable, orgiNameSpace);
/* Support view but cannot span schemaes */
if (IsRelaionView(relid) && modfyNameSpace != orgiNameSpace) {
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("relation %s is view, Rename table don't support span schemaes.", get_rel_name(relid))));
} else if (!OidIsValid(relid)) {
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("relation \"%s.%s\" does not exist, skipping", get_namespace_name(orgiNameSpace), orgitable)));
if (!OidIsValid(relid)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("relation \"%s.%s\" does not exist, skipping", get_namespace_name(orgiNameSpace), orgitable)));
} else if (IsRelaionView(relid) && modfyNameSpace != orgiNameSpace) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("relation %s is view, Rename table don't support span schemaes.", get_rel_name(relid))));
} else if (orgiNameSpace == modfyNameSpace && pg_strcasecmp(orgitable, modfytable) == 0) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("relation \"%s.%s\" already exists", get_namespace_name(modfyNameSpace), modfytable)));
} else if (pg_class_aclcheck(relid, GetUserId(), ACL_ALTER) == ACLCHECK_NO_PRIV) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("permission denied for relation %s.%s", get_namespace_name(orgiNameSpace), orgitable)));
}
/* Rename regular table */
replaces[Anum_pg_class_relname - 1] = true;
values[Anum_pg_class_relname - 1] = DirectFunctionCall1(namein, CStringGetDatum(modfytable));
type_replaces[Anum_pg_type_typname - 1] = true;
type_values[Anum_pg_type_typname - 1] = DirectFunctionCall1(namein, CStringGetDatum(modfytable));
if (modfySchema != NULL) {
replaces[Anum_pg_class_relnamespace - 1] = true;
values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(modfyNameSpace);
type_replaces[Anum_pg_type_typnamespace - 1] = true;
type_values[Anum_pg_type_typnamespace - 1] = ObjectIdGetDatum(modfyNameSpace);
}
/* delete table privileges */
/* delete the table relacl. only superuser can operate the table */
nulls[Anum_pg_class_relacl - 1] = true;
replaces[Anum_pg_class_relacl - 1] = true;
tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tup)) {
ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("cache lookup failed for relation %s", get_rel_name(relid))));
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("cache lookup failed for relation %s", get_rel_name(relid))));
}
relform = (Form_pg_class)GETSTRUCT(tup);
if (relform->relkind == RELKIND_RELATION && relform->parttype == PARTTYPE_PARTITIONED_RELATION) {
renamePartitionedTable(relid, modfytable);
} else if (relform->relhastriggers && modfyNameSpace != orgiNameSpace) {
ScanKeyData key;
bool is_find = false;
HeapTuple tuple = NULL;
Relation tgrel = heap_open(TriggerRelationId, RowExclusiveLock);
ScanKeyInit(&key, Anum_pg_trigger_tgrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relid));
SysScanDesc scan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true, NULL, 1, &key);
while (HeapTupleIsValid(tuple = systable_getnext(scan))) {
Form_pg_trigger pg_trigger = (Form_pg_trigger)GETSTRUCT(tuple);
if (!pg_trigger->tgisinternal) {
is_find = true;
break;
}
}
systable_endscan(scan);
heap_close(tgrel, RowExclusiveLock);
if (is_find) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("Trigger in wrong schema on table %s", get_rel_name(relid))));
}
}
/* Fix other dependent stuff */
if (relform->relkind == RELKIND_RELATION || relform->relkind == RELKIND_MATVIEW) {
ObjectAddresses* objsMoved = NULL;
objsMoved = new_object_addresses();
Relation rel;
rel = relation_open(relid, NoLock);
// AlterIndexNamespaces(rel_pg_class, rel, orgiNameSpace, modfyNameSpace, objsMoved);
// AlterSeqNamespaces(rel_pg_class, rel, orgiNameSpace, modfyNameSpace, objsMoved, AccessExclusiveLock);
AlterConstraintNamespaces(RelationGetRelid(rel), orgiNameSpace, modfyNameSpace, false, objsMoved);
relation_close(rel, NoLock);
}
relid_temp = relid;
newtup = heap_modify_tuple(tup, RelationGetDescr(rel_pg_class), values, nulls, replaces);
simple_heap_update(rel_pg_class, &newtup->t_self, newtup);
CatalogUpdateIndexes(rel_pg_class, newtup);
@ -6138,21 +6231,25 @@ static void RenameTableFeature(RenameStmt* stmt)
heap_close(rel_pg_class, RowExclusiveLock);
CommandCounterIncrement();
/* revoke table privileges */
InternalGrant istmt;
istmt.is_grant = false;
istmt.objtype = ACL_OBJECT_RELATION;
istmt.objects = list_make1_oid(relid);
istmt.all_privs= true;
istmt.privileges = ACL_NO_RIGHTS;
istmt.ddl_privileges = ACL_NO_DDL_RIGHTS;
istmt.col_privs = NIL;
istmt.col_ddl_privs = NIL;
istmt.grantees = list_make1_oid(ACL_ID_PUBLIC);
istmt.grant_option = false;
istmt.behavior = DROP_CASCADE;
rel_pg_type = heap_open(TypeRelationId, RowExclusiveLock);
relid = get_typeoid(orgiNameSpace, orgitable);
if (!OidIsValid(relid)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("type \"%s.%s\" does not exist, skipping", get_namespace_name(orgiNameSpace), orgitable)));
}
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tup)) {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("cache lookup failed for type %s", get_rel_name(relid))));
}
newtup = heap_modify_tuple(tup, RelationGetDescr(rel_pg_type), type_values, type_nulls, type_replaces);
simple_heap_update(rel_pg_type, &newtup->t_self, newtup);
CatalogUpdateIndexes(rel_pg_type, newtup);
ReleaseSysCache(tup);
tableam_tops_free_tuple(newtup);
heap_close(rel_pg_type, RowExclusiveLock);
CommandCounterIncrement();
ExecGrant_Relation(&istmt);
/* update dependencies to point to the new schema */
(void)changeDependencyFor(RelationRelationId, relid_temp, NamespaceRelationId, orgiNameSpace, modfyNameSpace);
}
for (int num = stmt->renameTargetList->length - 1; num >= 0; num--) {
if (lockRelation[num] != NULL) {
@ -6163,6 +6260,7 @@ static void RenameTableFeature(RenameStmt* stmt)
pfree(storageTable[num].schemaname);
pfree(storageTable[num].relname);
}
list_free_ext(search_path);
}
/*

View File

@ -1,7 +1,8 @@
drop database if exists mysql;
create database mysql dbcompatibility 'B';
\c mysql
create schema test; create schema tbinfo;
create schema test;
create schema tbinfo;
create table test.t1(id int);
create table test.t2(c_id int not null primary key, name varchar) partition by range (c_id) (partition t2_p1 values less than(100), partition t2_p2 values less than(200), partition t2_p3 values less than(MAXVALUE));
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "t2_pkey" for table "t2"
@ -35,6 +36,23 @@ ERROR: relation t4 is view, Rename table don't support span schemaes.
create temp table t5(id int);
rename table t5 to tt;
ERROR: relation t5 is temporary table, Rename table don't support.
create schema ot;
create table ot.t1(id int,name varchar2(10));
create or replace synonym t1 for ot.t1;
rename table t1 to t2;
ERROR: Rename Table "public.t1" is Synonym, so Rename table can't support.
create table b(a int);
rename table b to b1;
create table b(a int);
drop table b;
create view v2 as select * from tbinfo.t1;
rename table v2 to v4;
create global temp table tang01(id int primary key, t text);
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "tang01_pkey" for table "tang01"
rename table tang01 to tang02;
ERROR: relation tang01 is temporary table, Rename table don't support.
rename table b1 to b1;
ERROR: relation "public.b1" already exists
\c regression
drop database mysql;
create schema test; create schema tbinfo;

View File

@ -3,7 +3,9 @@ drop database if exists mysql;
create database mysql dbcompatibility 'B';
\c mysql
create schema test; create schema tbinfo;
create schema test;
create schema tbinfo;
create table test.t1(id int);
create table test.t2(c_id int not null primary key, name varchar) partition by range (c_id) (partition t2_p1 values less than(100), partition t2_p2 values less than(200), partition t2_p3 values less than(MAXVALUE));
create view test.t3 as select * from test.t1;
@ -15,11 +17,27 @@ rename table test.t1 to tbinfo.t1, test.t2 to tbinfo.t2, test.t3 to test.t4;
\d test.t4;
rename table test.t4 to tbinfo.t3;
create temp table t5(id int);
rename table t5 to tt;
create schema ot;
create table ot.t1(id int,name varchar2(10));
create or replace synonym t1 for ot.t1;
rename table t1 to t2;
create table b(a int);
rename table b to b1;
create table b(a int);
drop table b;
create view v2 as select * from tbinfo.t1;
rename table v2 to v4;
create global temp table tang01(id int primary key, t text);
rename table tang01 to tang02;
rename table b1 to b1;
\c regression
drop database mysql;
create schema test; create schema tbinfo;