!5711 逻辑复制支持Alter table的DDL语句

Merge pull request !5711 from Hemny/repl_ddl_alter
This commit is contained in:
opengauss_bot
2024-08-09 02:54:05 +00:00
committed by Gitee
120 changed files with 25361 additions and 648 deletions

View File

@ -751,6 +751,8 @@ inline HeapTuple heaptup_alloc(Size size)
#define XLOG_HEAP3_NEW_CID 0x00
#define XLOG_HEAP3_REWRITE 0x10
#define XLOG_HEAP3_INVALID 0x20
/* XLOG_HEAP_TRUNCATE with 0x30 in heap in PG14 */
#define XLOG_HEAP3_TRUNCATE 0x30
/* we used to put all xl_heap_* together, which made us run out of opcodes (quickly)
* when trying to add a DELETE_IS_SUPER operation. Thus we split the codes carefully
@ -806,6 +808,26 @@ typedef struct xl_heap_delete {
#define SizeOfOldHeapDelete (offsetof(xl_heap_delete, flags) + sizeof(uint8))
#define SizeOfHeapDelete (offsetof(xl_heap_delete, infobits_set) + sizeof(uint8))
/*
* xl_heap_delete flag values, 8 bits are available.
*/
#define XLH_TRUNCATE_CASCADE (1<<0)
#define XLH_TRUNCATE_RESTART_SEQS (1<<1)
/*
* For truncate we list all truncated relids in an array, followed by all
* sequence relids that need to be restarted, if any.
* All rels are always within the same database, so we just list dbid once.
*/
typedef struct xl_heap_truncate {
Oid dbId;
uint32 nrelids;
uint8 flags;
Oid relids[FLEXIBLE_ARRAY_MEMBER];
} xl_heap_truncate;
#define SizeOfHeapTruncate (offsetof(xl_heap_truncate, relids))
/*
* We don't store the whole fixed part (HeapTupleHeaderData) of an inserted
* or updated tuple in WAL; we can save a few bytes by reconstructing the

View File

@ -118,30 +118,31 @@ extern Oid heap_create_with_catalog(const char *relname,
Oid relnamespace,
Oid reltablespace,
Oid relid,
Oid reltypeid,
Oid reloftypeid,
Oid ownerid,
TupleDesc tupdesc,
List *cooked_constraints,
char relkind,
char relpersistence,
bool shared_relation,
bool mapped_relation,
bool oidislocal,
int oidinhcount,
OnCommitAction oncommit,
Datum reloptions,
bool use_user_acl,
bool allow_system_table_mods,
PartitionState *partTableState,
int8 row_compress,
HashBucketInfo *bucketinfo,
bool record_dependce = true,
List* ceLst = NULL,
StorageType storage_type = HEAP_DISK,
LOCKMODE partLockMode = AccessExclusiveLock,
Oid reltypeid,
Oid reloftypeid,
Oid ownerid,
TupleDesc tupdesc,
List *cooked_constraints,
char relkind,
char relpersistence,
bool shared_relation,
bool mapped_relation,
bool oidislocal,
int oidinhcount,
OnCommitAction oncommit,
Datum reloptions,
bool use_user_acl,
bool allow_system_table_mods,
PartitionState *partTableState,
int8 row_compress,
HashBucketInfo *bucketinfo,
bool record_dependce = true,
List* ceLst = NULL,
StorageType storage_type = HEAP_DISK,
LOCKMODE partLockMode = AccessExclusiveLock,
ObjectAddress *typaddress= NULL,
List* depend_extend = NIL);
List* depend_extend = NIL,
Oid relrewrite = InvalidOid);
extern void heap_create_init_fork(Relation rel);

View File

@ -24,6 +24,7 @@
/* Publication trigger events */
#define PUB_TRIG_DDL_CMD_END "ddl_command_end"
#define PUB_TRIG_DDL_CMD_START "ddl_command_start"
#define PUB_TRIG_TBL_REWRITE "table_rewrite"
/* Publication event trigger prefix */
#define PUB_EVENT_TRIG_FORMAT "pg_deparse_trig_%s_%u"
@ -62,6 +63,9 @@ CATALOG(pg_publication,6130) BKI_ROWTYPE_OID(6141) BKI_SCHEMA_MACRO
bool pubdelete;
int8 pubddl;
/* true if truncates are published */
bool pubtruncate;
}
FormData_pg_publication;
#undef int8
@ -78,7 +82,7 @@ typedef FormData_pg_publication *Form_pg_publication;
* ----------------
*/
#define Natts_pg_publication 7
#define Natts_pg_publication 8
#define Anum_pg_publication_pubname 1
#define Anum_pg_publication_pubowner 2
#define Anum_pg_publication_puballtables 3
@ -86,11 +90,13 @@ typedef FormData_pg_publication *Form_pg_publication;
#define Anum_pg_publication_pubupdate 5
#define Anum_pg_publication_pubdelete 6
#define Anum_pg_publication_pubddl 7
#define Anum_pg_publication_pubtruncate 8
typedef struct PublicationActions {
bool pubinsert;
bool pubupdate;
bool pubdelete;
bool pubtruncate;
int64 pubddl;
} PublicationActions;

View File

@ -0,0 +1 @@
DROP FUNCTION IF EXISTS pg_catalog.publication_deparse_table_rewrite() CASCADE;

View File

@ -0,0 +1 @@
DROP FUNCTION IF EXISTS pg_catalog.publication_deparse_table_rewrite() CASCADE;

View File

@ -0,0 +1,6 @@
DROP FUNCTION IF EXISTS pg_catalog.publication_deparse_table_rewrite() CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 4644;
CREATE FUNCTION pg_catalog.publication_deparse_table_rewrite ()
RETURNS event_trigger
LANGUAGE INTERNAL VOLATILE STRICT NOT FENCED
AS 'publication_deparse_table_rewrite';

View File

@ -0,0 +1,6 @@
DROP FUNCTION IF EXISTS pg_catalog.publication_deparse_table_rewrite() CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids = IUO_PROC, 4644;
CREATE FUNCTION pg_catalog.publication_deparse_table_rewrite ()
RETURNS event_trigger
LANGUAGE INTERNAL VOLATILE STRICT NOT FENCED
AS 'publication_deparse_table_rewrite';

View File

@ -210,10 +210,12 @@ extern Oid GetFunctionNodeGroupByFuncid(Oid funcid);
extern Oid GetFunctionNodeGroup(AlterFunctionStmt* stmt);
/* commands/eventcmds.c */
extern void CreateEventCommand(CreateEventStmt* stmt);
extern void AlterEventCommand(AlterEventStmt* stmt);
extern ObjectAddress CreateEventCommand(CreateEventStmt* stmt);
extern ObjectAddress AlterEventCommand(AlterEventStmt* stmt);
extern void DropEventCommand(DropEventStmt* stmt);
extern char* parseIntervalExprString(Node *intervalNode);
extern char* parseTimeExprString(Node* timeExpr);
#endif /* !FRONTEND_PARSER */
extern DefElem* defWithOids(bool value);
#endif /* DEFREM_H */

View File

@ -110,8 +110,7 @@ extern void EventTriggerCollectSimpleCommand(ObjectAddress address,
extern void EventTriggerAlterTableStart(Node *parsetree);
extern void EventTriggerAlterTableRelid(Oid objectId);
extern void EventTriggerCollectAlterTableSubcmd(Node *subcmd,
ObjectAddress address);
extern void EventTriggerCollectAlterTableSubcmd(Node *subcmd, ObjectAddress address, bool rewrite);
extern void EventTriggerAlterTableEnd(void);
extern void EventTriggerCollectGrant(InternalGrant *istmt);
@ -124,6 +123,8 @@ extern void EventTriggerCollectCreateOpClass(CreateOpClassStmt *stmt,
extern void EventTriggerCollectAlterTSConfig(AlterTSConfigurationStmt *stmt,
Oid cfgId, Oid *dictIds, int ndicts);
extern void EventTriggerCollectAlterDefPrivs(AlterDefaultPrivilegesStmt *stmt);
extern void EventTriggerAlterTypeStart(AlterTableCmd *subcmd, Relation rel);
extern void EventTriggerAlterTypeEnd(Node *subcmd, ObjectAddress address, int rewrite);
extern void EventTriggerAlterTypeUpdate(ObjectAddress address, AttrNumber old_attnum);
#endif /* EVENT_TRIGGER_H */

View File

@ -159,6 +159,10 @@ extern void ExecuteTruncate(TruncateStmt* stmt, const char* sql_statement);
extern void ExecuteTruncate(TruncateStmt* stmt);
#endif
extern void ExecuteTruncateGuts(
List *explicit_rels, List *relids, List *relids_logged, List *rels_in_redis,
DropBehavior behavior, bool restart_seqs, TruncateStmt* stmt);
extern void SetRelationHasSubclass(Oid relationId, bool relhassubclass);
extern ObjectAddress renameatt(RenameStmt* stmt);
@ -169,6 +173,7 @@ extern ObjectAddress RenameRelation(RenameStmt* stmt);
extern void RenameRelationInternal(Oid myrelid, const char* newrelname, char* newschema = NULL);
extern void ResetRelRewrite(Oid myrelid);
extern void find_composite_type_dependencies(Oid typeOid, Relation origRelation, const char* origTypeName);
extern void check_of_type(HeapTuple typetuple);

View File

@ -2956,6 +2956,7 @@ typedef struct knl_u_hook_context {
void *nullsMinimalPolicyHook;
void *getIgnoreKeywordTokenHook;
void *modifyTypeForPartitionKeyHook;
void *deparseCollectedCommandHook;
} knl_u_hook_context;
typedef struct knl_u_libsw_context {

View File

@ -37,6 +37,7 @@
/*****************************************************************************
* Backend version and inplace upgrade staffs
*****************************************************************************/
extern const uint32 PUBLICATION_DDL_AT_VERSION_NUM;
extern const uint32 PIPELINED_FUNCTION_VERSION_NUM;
extern const uint32 DISABLE_CONSTRAINT_VERSION_NUM;
extern const uint32 SUPPORT_GS_DEPENDENCY_VERSION_NUM;

View File

@ -1409,6 +1409,7 @@ typedef struct AlterFunctionStmt {
NodeTag type;
FuncWithArgs* func; /* name and args of function */
List* actions; /* list of DefElem */
bool isProcedure = false;
} AlterFunctionStmt;
enum CompileEntry {

View File

@ -1002,6 +1002,7 @@ typedef struct AlterTableCmd { /* one subcommand of an ALTER TABLE */
bool alterGPI; /* check whether is global partition index alter statement */
bool is_first; /* a flag of ALTER TABLE ... ADD ... FIRST */
char *after_name; /* column name of ALTER TABLE ... ADD ... AFTER column_name */
bool recursing;
} AlterTableCmd;
typedef struct AddTableIntoCBIState {
@ -1166,6 +1167,7 @@ typedef struct ColumnDef {
Form_pg_attribute dropped_attr; /* strcuture for dropped attribute during create table like OE */
char generatedCol; /* generated column setting */
Node *update_default;
char *initdefval;
} ColumnDef;
/*
@ -2473,6 +2475,7 @@ typedef struct RenameStmt {
bool missing_ok; /* skip error if missing? */
List* renameTargetList = NULL;
bool renameTableflag = false;
bool is_modifycolumn = false;
} RenameStmt;
/* ----------------------

View File

@ -21,11 +21,14 @@
*/
typedef enum DeparsedCommandType
{
DCT_ObjectCreate,
DCT_ObjectDrop,
DCT_SimpleCmd,
DCT_TableDropEnd,
DCT_TableDropStart
DCT_TableDropStart,
DCT_TableAlter,
DCT_ObjectCreate,
DCT_ObjectDrop,
DCT_TypeDropStart,
DCT_TypeDropEnd
} DeparsedCommandType;
/*

View File

@ -103,6 +103,10 @@ extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple,
LogicalRepTupleData *newtup);
extern void logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup);
extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
bool cascade, bool restart_seqs);
extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs);
extern void logicalrep_write_rel(StringInfo out, Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, Oid typoid);

View File

@ -30,6 +30,7 @@ typedef enum OutputPluginOutputType {
*/
typedef struct OutputPluginOptions {
OutputPluginOutputType output_type;
bool receive_rewrites;
} OutputPluginOptions;
/*
@ -75,6 +76,15 @@ typedef void (*LogicalDecodeChangeCB)(
typedef void (*ParallelLogicalDecodeChangeCB)(
struct ParallelLogicalDecodingContext* ctx, ReorderBufferTXN* txn, Relation relation, ParallelReorderBufferChange* change);
/*
* Callback for every TRUNCATE in a successful transaction.
*/
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations,
Relation relations[],
ReorderBufferChange *change);
/*
* Called for every (explicit or implicit) COMMIT of a successful transaction.
*/
@ -107,6 +117,7 @@ typedef struct OutputPluginCallbacks {
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeAbortCB abort_cb;
LogicalDecodePrepareCB prepare_cb;
@ -119,6 +130,7 @@ typedef struct ParallelOutputPluginCallbacks {
LogicalDecodeStartupCB startup_cb;
LogicalDecodeBeginCB begin_cb;
ParallelLogicalDecodeChangeCB change_cb;
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;

View File

@ -73,6 +73,7 @@ enum ReorderBufferChangeType {
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
REORDER_BUFFER_CHANGE_DDL,
REORDER_BUFFER_CHANGE_TRUNCATE,
REORDER_BUFFER_CHANGE_UINSERT,
REORDER_BUFFER_CHANGE_UUPDATE,
REORDER_BUFFER_CHANGE_UDELETE
@ -115,6 +116,17 @@ typedef struct ReorderBufferChange {
CommitSeqNo snapshotcsn;
} tp;
/*
* Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing
* one set of relations to be truncated.
*/
struct {
Size nrelids;
bool cascade;
bool restart_seqs;
Oid *relids;
} truncate;
/* Old, new utuples when action == UHEAP_INSERT|UPDATE|DELETE */
struct {
/* relation that has been changed */
@ -213,8 +225,8 @@ typedef struct ReorderBufferTXN {
XLogRecPtr restart_decoding_lsn;
/* origin of the change that caused this transaction */
RepOriginId origin_id;
XLogRecPtr origin_lsn;
RepOriginId origin_id;
XLogRecPtr origin_lsn;
/* The csn of the transaction */
CommitSeqNo csn;
@ -317,7 +329,9 @@ typedef struct ReorderBuffer ReorderBuffer;
/* change callback signature */
typedef void (*ReorderBufferApplyChangeCB)(
ReorderBuffer* rb, ReorderBufferTXN* txn, Relation relation, ReorderBufferChange* change);
/* truncate callback signature */
typedef void (*ReorderBufferApplyTruncateCB) (
ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
/* begin callback signature */
typedef void (*ReorderBufferBeginCB)(ReorderBuffer* rb, ReorderBufferTXN* txn);
@ -374,6 +388,7 @@ struct ReorderBuffer {
*/
ReorderBufferBeginCB begin;
ReorderBufferApplyChangeCB apply_change;
ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferAbortCB abort;
ReorderBufferPrepareCB prepare;
@ -384,6 +399,11 @@ struct ReorderBuffer {
*/
void* private_data;
/*
* Saved output plugin option
*/
bool output_rewrites;
/*
* Private memory context.
*/

View File

@ -16,6 +16,7 @@
/* Context info needed for deparsing ddl command */
typedef struct
{
bool verbose_mode;
/*
* include_owner indicates if the owner/role of the command should be
* included in the deparsed Json output. It is set to false for any commands
@ -29,9 +30,73 @@ typedef struct
char max_volatility;
} ddl_deparse_context;
extern Relation table_open(Oid relationId, LOCKMODE lockmode);
extern void table_close(Relation relation, LOCKMODE lockmode);
extern char *deparse_utility_command(CollectedCommand *cmd,
ddl_deparse_context * context);
extern char *deparse_ddl_json_to_string(char *jsonb, char** owner);
extern char *deparse_drop_command(const char *objidentity, const char *objecttype, Node *parsetree);
extern List *deparse_altertable_end(CollectedCommand *cmd);
extern bool relation_support_ddl_replication(Oid relid, bool rewrite = false);
/*
* Before they are turned into JSONB representation, each command is
* represented as an object tree, using the structs below.
*/
typedef enum
{
ObjTypeNull,
ObjTypeBool,
ObjTypeString,
ObjTypeArray,
ObjTypeInteger,
ObjTypeFloat,
ObjTypeObject
} ObjType;
/*
* Represent the command as an object tree.
*/
typedef struct ObjTree
{
slist_head params; /* Object tree parameters */
int numParams; /* Number of parameters in the object tree */
StringInfo fmtinfo; /* Format string of the ObjTree */
bool present; /* Indicates if boolean value should be stored */
} ObjTree;
/*
* An element of an object tree (ObjTree).
*/
typedef struct ObjElem
{
char *name; /* Name of object element */
ObjType objtype; /* Object type */
union {
bool boolean;
char *string;
int64 integer;
float8 flt;
ObjTree *object;
List *array;
} value; /* Store the object value based on the object
* type */
slist_node node; /* Used in converting back to ObjElem
* structure */
} ObjElem;
ObjTree *new_objtree_VA(const char *fmt, int numobjs, ...);
ObjElem *new_string_object(char *value);
void append_format_string(ObjTree *tree, char *sub_fmt);
void append_object_object(ObjTree *tree, char *sub_fmt, ObjTree *value);
void append_string_object(ObjTree *tree, char *sub_fmt, char *name,
const char *value);
void append_array_object(ObjTree *tree, char *sub_fmt, List *array);
typedef enum {
DEPARSE_SIMPLE_COMMAND,
ALTER_RELATION_SUBCMD
} collectCmdHookType;
typedef void *(*deparseCollectedCommand)(int type, CollectedCommand *cmd, CollectedATSubcmd *sub,
ddl_deparse_context *context);
#endif /* DDL_DEPARSE_H */

View File

@ -38,6 +38,7 @@ typedef struct CollectedATSubcmd
{
ObjectAddress address; /* affected column, constraint, index, ... */
Node *parsetree;
char *usingexpr;
} CollectedATSubcmd;
typedef struct CollectedCommand
@ -61,6 +62,7 @@ typedef struct CollectedCommand
{
Oid objectId;
Oid classId;
bool rewrite;
List *subcmds;
} alterTable;

View File

@ -416,6 +416,7 @@ typedef struct StdRdOptions {
/* SPQ OPTIONS */
int spq_bt_build_offset;
#endif
Oid relrewrite;
} StdRdOptions;
#define HEAP_MIN_FILLFACTOR 10
@ -543,6 +544,8 @@ typedef struct StdRdOptions {
((relation)->rd_options ? \
((StdRdOptions *) (relation)->rd_options)->parallel_workers : (defaultpw))
#define RelationGetRelrewriteOption(relation) \
((relation)->rd_options ? ((StdRdOptions*)(relation)->rd_options)->relrewrite : InvalidOid)
/*
* RelationIsValid
* True iff relation descriptor is valid.