openGauss支持发布订阅

This commit is contained in:
xue_meng_en
2021-12-15 15:04:22 +08:00
committed by TotaJ
parent 4bf1feab89
commit eaaf873a9f
87 changed files with 5856 additions and 438 deletions

View File

@ -87,8 +87,11 @@
#include "catalog/pg_pltemplate.h"
#include "catalog/pg_proc.h"
#include "catalog/gs_package.h"
#include "catalog/pg_publication.h"
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_range.h"
#include "catalog/pg_recyclebin.h"
#include "catalog/pg_replication_origin.h"
#include "catalog/pg_resource_pool.h"
#include "catalog/pg_rewrite.h"
#include "catalog/pg_rlspolicy.h"
@ -98,6 +101,7 @@
#include "catalog/pg_shseclabel.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_statistic_ext.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_synonym.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_trigger.h"
@ -311,6 +315,13 @@ static const FormData_pg_attribute Desc_gs_opt_model[Natts_gs_opt_model] = {Sche
static const FormData_pg_attribute Desc_gs_model_warehouse[Natts_gs_model_warehouse] = {Schema_gs_model_warehouse};
static const FormData_pg_attribute Desc_gs_package[Natts_gs_package] = {Schema_gs_package};
static const FormData_pg_attribute Desc_pg_subscription[Natts_pg_subscription] = {Schema_pg_subscription};
static const FormData_pg_attribute Desc_pg_publication[Natts_pg_publication] = {Schema_pg_publication};
static const FormData_pg_attribute Desc_pg_publication_rel[Natts_pg_publication_rel] = {Schema_pg_publication_rel};
static const FormData_pg_attribute Desc_pg_replication_origin[Natts_pg_replication_origin] = {
Schema_pg_replication_origin
};
/* Please add to the array in ascending order of oid value */
static struct CatalogRelationBuildParam catalogBuildParam[CATALOG_NUM] = {{DefaultAclRelationId,
"pg_default_acl",
@ -1120,6 +1131,42 @@ static struct CatalogRelationBuildParam catalogBuildParam[CATALOG_NUM] = {{Defau
Desc_gs_opt_model,
false,
true},
{SubscriptionRelationId,
"pg_subscription",
SubscriptionRelation_Rowtype_Id,
true,
true,
Natts_pg_subscription,
Desc_pg_subscription,
false,
true},
{PublicationRelationId,
"pg_publication",
PublicationRelation_Rowtype_Id,
false,
true,
Natts_pg_publication,
Desc_pg_publication,
false,
true},
{PublicationRelRelationId,
"pg_publication_rel",
PublicationRelRelationId_Rowtype_Id,
false,
true,
Natts_pg_publication_rel,
Desc_pg_publication_rel,
false,
true},
{ReplicationOriginRelationId,
"pg_replication_origin",
ReplicationOriginRelationId_Rowtype_Id,
true,
false,
Natts_pg_replication_origin,
Desc_pg_replication_origin,
false,
true},
{PackageRelationId,
"gs_package",
PackageRelation_Rowtype_Id,
@ -3453,7 +3500,11 @@ static void RelationDestroyRelation(Relation relation, bool remember_tupdesc)
list_free_ext(relation->rd_indexlist);
bms_free_ext(relation->rd_indexattr);
bms_free_ext(relation->rd_keyattr);
bms_free_ext(relation->rd_pkattr);
bms_free_ext(relation->rd_idattr);
if (relation->rd_options) {
pfree_ext(relation->rd_pubactions);
}
FreeTriggerDesc(relation->trigdesc);
if (relation->rd_rlsdesc) {
MemoryContextDelete(relation->rd_rlsdesc->rlsCxt);
@ -4166,6 +4217,7 @@ void AtEOXact_RelationCache(bool isCommit)
list_free_ext(relation->rd_indexlist);
relation->rd_indexlist = NIL;
relation->rd_oidindex = InvalidOid;
relation->rd_pkindex = InvalidOid;
relation->rd_indexvalid = 0;
}
}
@ -4243,6 +4295,7 @@ void AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid, SubTrans
list_free_ext(relation->rd_indexlist);
relation->rd_indexlist = NIL;
relation->rd_oidindex = InvalidOid;
relation->rd_pkindex = InvalidOid;
relation->rd_indexvalid = 0;
}
}
@ -5656,6 +5709,7 @@ List* RelationGetIndexList(Relation relation, bool inc_unused)
/* Now save a copy of the completed list in the relcache entry. */
if (!inc_unused) {
SaveCopyList(relation, result, oidIndex);
relation->rd_pkindex = pkeyIndex;
}
return result;
@ -5857,6 +5911,11 @@ void RelationSetIndexList(Relation relation, List* indexIds, Oid oidIndex)
list_free_ext(relation->rd_indexlist);
relation->rd_indexlist = indexIds;
relation->rd_oidindex = oidIndex;
/*
* For the moment, assume the target rel hasn't got a pk or replica
* index. We'll load them on demand in the API that wraps access to them.
*/
relation->rd_pkindex = InvalidOid;
relation->rd_indexvalid = 2; /* mark list as forced */
/* must flag that we have a forced index list */
u_sess->relcache_cxt.need_eoxact_work = true;
@ -5888,6 +5947,25 @@ Oid RelationGetOidIndex(Relation relation)
return relation->rd_oidindex;
}
/*
* RelationGetPrimaryKeyIndex -- get OID of the relation's primary key index
*
* Returns InvalidOid if there is no such index.
*/
Oid RelationGetPrimaryKeyIndex(Relation relation)
{
List* ilist;
if (relation->rd_indexvalid == 0) {
/* RelationGetIndexList does the heavy lifting. */
ilist = RelationGetIndexList(relation);
list_free(ilist);
Assert(relation->rd_indexvalid != 0);
}
return relation->rd_pkindex;
}
/*
* RelationGetReplicaIndex -- get OID of the relation's replica identity index
* If the table is partition table, return the replica identity index of parent table.
@ -6173,7 +6251,9 @@ Bitmapset* RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind att
Bitmapset* uindexattrs = NULL;
List* indexoidlist = NULL;
ListCell* l = NULL;
Bitmapset *pkindexattrs; /* columns in the primary index */
Bitmapset* idindexattrs = NULL; /* columns in the the replica identity */
Oid relpkindex;
Oid relreplindex;
MemoryContext oldcxt;
@ -6185,6 +6265,8 @@ Bitmapset* RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind att
return bms_copy(relation->rd_indexattr);
case INDEX_ATTR_BITMAP_KEY:
return bms_copy(relation->rd_keyattr);
case INDEX_ATTR_BITMAP_PRIMARY_KEY:
return bms_copy(relation->rd_pkattr);
case INDEX_ATTR_BITMAP_IDENTITY_KEY:
return bms_copy(relation->rd_idattr);
default:
@ -6206,12 +6288,14 @@ Bitmapset* RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind att
return NULL;
/*
* Copy the rd_replidindex value computed by RelationGetIndexList before
* proceeding. This is needed because a relcache flush could occur inside
* index_open below, resetting the fields managed by RelationGetIndexList.
* (The values we're computing will still be valid, assuming that caller
* has a sufficient lock on the relation.)
*/
* Copy the rd_pkindex and rd_replidindex value computed by
* RelationGetIndexList before proceeding. This is needed because a
* relcache flush could occur inside index_open below, resetting the
* fields managed by RelationGetIndexList. (The values we're computing
* will still be valid, assuming that caller has a sufficient lock on
* the relation.)
*/
relpkindex = relation->rd_pkindex;
relreplindex = relation->rd_replidindex;
/*
@ -6226,6 +6310,7 @@ Bitmapset* RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind att
*/
indexattrs = NULL;
uindexattrs = NULL;
pkindexattrs = NULL;
idindexattrs = NULL;
foreach (l, indexoidlist) {
Oid indexOid = lfirst_oid(l);
@ -6233,6 +6318,7 @@ Bitmapset* RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind att
IndexInfo* indexInfo = NULL;
int i;
bool isKey = false; /* candidate key */
bool isPK = false; /* primary key */
bool isIDKey = false; /* replica identity index */
indexDesc = index_open(indexOid, AccessShareLock);
@ -6242,6 +6328,8 @@ Bitmapset* RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind att
/* Can this index be referenced by a foreign key? */
isKey = indexInfo->ii_Unique && indexInfo->ii_Expressions == NIL && indexInfo->ii_Predicate == NIL;
/* Is this a primary key? */
isPK = (indexOid == relpkindex);
/* Is this index the configured (or default) replica identity? */
isIDKey = (indexOid == relreplindex);
@ -6265,6 +6353,10 @@ Bitmapset* RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind att
}
if (isIDKey && i < indexInfo->ii_NumIndexKeyAttrs)
idindexattrs = bms_add_member(idindexattrs, attrnum - FirstLowInvalidHeapAttributeNumber);
if (isPK) {
pkindexattrs = bms_add_member(pkindexattrs, attrnum - FirstLowInvalidHeapAttributeNumber);
}
}
}
@ -6279,6 +6371,14 @@ Bitmapset* RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind att
list_free_ext(indexoidlist);
/* Don't leak the old values of these bitmaps, if any */
bms_free(relation->rd_indexattr);
relation->rd_indexattr = NULL;
bms_free(relation->rd_pkattr);
relation->rd_pkattr = NULL;
bms_free(relation->rd_idattr);
relation->rd_idattr = NULL;
/*
* Now save copies of the bitmaps in the relcache entry. We intentionally
* set rd_indexattr last, because that's the one that signals validity of
@ -6288,6 +6388,7 @@ Bitmapset* RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind att
*/
oldcxt = MemoryContextSwitchTo(u_sess->cache_mem_cxt);
relation->rd_keyattr = bms_copy(uindexattrs);
relation->rd_pkattr = bms_copy(pkindexattrs);
relation->rd_idattr = bms_copy(idindexattrs);
relation->rd_indexattr = bms_copy(indexattrs);
(void)MemoryContextSwitchTo(oldcxt);
@ -6298,6 +6399,8 @@ Bitmapset* RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind att
return indexattrs;
case INDEX_ATTR_BITMAP_KEY:
return uindexattrs;
case INDEX_ATTR_BITMAP_PRIMARY_KEY:
return bms_copy(relation->rd_pkattr);
case INDEX_ATTR_BITMAP_IDENTITY_KEY:
return idindexattrs;
default:
@ -6482,6 +6585,70 @@ void RelationGetExclusionInfo(Relation indexRelation, Oid** operators, Oid** pro
(void)MemoryContextSwitchTo(oldcxt);
}
/*
* Get publication actions for the given relation.
*/
struct PublicationActions* GetRelationPublicationActions(Relation relation)
{
List* puboids;
ListCell* lc;
MemoryContext oldcxt;
int rc;
PublicationActions* pubactions = (PublicationActions*)palloc0(sizeof(PublicationActions));
if (relation->rd_pubactions) {
errno_t rcs = memcpy_s(pubactions, sizeof(PublicationActions),
relation->rd_pubactions, sizeof(PublicationActions));
securec_check(rcs, "\0", "\0");
return pubactions;
}
/* Fetch the publication membership info. */
puboids = GetRelationPublications(RelationGetRelid(relation));
puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
foreach(lc, puboids)
{
Oid pubid = lfirst_oid(lc);
HeapTuple tup;
Form_pg_publication pubform;
tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for publication %u", pubid);
pubform = (Form_pg_publication) GETSTRUCT(tup);
pubactions->pubinsert |= pubform->pubinsert;
pubactions->pubupdate |= pubform->pubupdate;
pubactions->pubdelete |= pubform->pubdelete;
ReleaseSysCache(tup);
/*
* If we know everything is replicated, there is no point to check
* for other publications.
*/
if (pubactions->pubinsert && pubactions->pubupdate && pubactions->pubdelete)
break;
}
if (relation->rd_pubactions) {
pfree(relation->rd_pubactions);
relation->rd_pubactions = NULL;
}
/* Now save copy of the actions in the relcache entry. */
oldcxt = MemoryContextSwitchTo(u_sess->cache_mem_cxt);
relation->rd_pubactions = (PublicationActions*)palloc(sizeof(PublicationActions));
rc = memcpy_s(relation->rd_pubactions, sizeof(PublicationActions), pubactions, sizeof(PublicationActions));
securec_check(rc, "", "");
MemoryContextSwitchTo(oldcxt);
return pubactions;
}
/*
* load_relcache_init_file, write_relcache_init_file
*
@ -6854,9 +7021,12 @@ static bool load_relcache_init_file(bool shared)
rel->rd_indexvalid = 0;
rel->rd_indexlist = NIL;
rel->rd_oidindex = InvalidOid;
rel->rd_pkindex = InvalidOid;
rel->rd_indexattr = NULL;
rel->rd_keyattr = NULL;
rel->rd_pkattr = NULL;
rel->rd_idattr = NULL;
rel->rd_pubactions = NULL;
rel->rd_createSubid = InvalidSubTransactionId;
rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
rel->rd_amcache = NULL;
@ -7811,4 +7981,30 @@ static void SetupPageCompressForRelation(Relation relation, PageCompressOpts* co
compress_options->compressDiffConvert, preallocChunks,
symbol, compressLevel, algorithm, chunkSize);
}
}
}
char RelationGetRelReplident(Relation r)
{
bool isNull = false;
char relreplident;
HeapTuple tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(RelationGetRelid(r)));
if (!HeapTupleIsValid(tuple)) {
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("cache lookup failed for relation %u", RelationGetRelid(r))));
}
Relation classRel = heap_open(RelationRelationId, AccessShareLock);
Datum datum = heap_getattr(tuple, Anum_pg_class_relreplident, RelationGetDescr(classRel), &isNull);
if (isNull) {
relreplident = REPLICA_IDENTITY_NOTHING;
} else {
relreplident = CharGetDatum(datum);
}
heap_close(classRel, AccessShareLock);
ReleaseSysCache(tuple);
return relreplident;
}