sync all inner code

This commit is contained in:
openGaussDev
2022-10-27 20:21:03 +08:00
committed by yanghao
parent a10741b11e
commit f7d23913d6
520 changed files with 63136 additions and 9761 deletions

View File

@ -226,8 +226,8 @@ static bool pg_decode_filter(LogicalDecodingContext* ctx, RepOriginId origin_id)
}
/* print the tuple 'tuple' into the StringInfo s */
static void TupleToJsoninfo(
cJSON* cols_name, cJSON* cols_type, cJSON* cols_val, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
static void TupleToJsoninfo(Relation relation, cJSON* cols_name, cJSON* cols_type, cJSON* cols_val, TupleDesc tupdesc,
HeapTuple tuple, bool isOld)
{
if ((tuple->tupTableType == HEAP_TUPLE) && (HEAP_TUPLE_IS_COMPRESSED(tuple->t_data) ||
(int)HeapTupleHeaderGetNatts(tuple->t_data, tupdesc) > tupdesc->natts)) {
@ -254,7 +254,7 @@ static void TupleToJsoninfo(
* Don't print system columns, oid will already have been printed if
* present.
*/
if (attr->attnum < 0)
if (attr->attnum < 0 || (isOld && !IsRelationReplidentKey(relation, attr->attnum)))
continue;
Oid typid = attr->atttypid; /* type of current attribute */
@ -265,8 +265,6 @@ static void TupleToJsoninfo(
} else {
origval = uheap_getattr((UHeapTuple)tuple, natt + 1, tupdesc, &isnull);
}
if (isnull && skip_nulls)
continue;
/* print attribute name */
@ -355,53 +353,53 @@ static void pg_decode_change(
case REORDER_BUFFER_CHANGE_INSERT:
op_type = cJSON_CreateString("INSERT");
if (change->data.tp.newtuple != NULL) {
TupleToJsoninfo(
columns_name, columns_type, columns_val, tupdesc, &change->data.tp.newtuple->tuple, false);
TupleToJsoninfo(relation, columns_name, columns_type, columns_val, tupdesc,
&change->data.tp.newtuple->tuple, false);
}
break;
case REORDER_BUFFER_CHANGE_UPDATE:
op_type = cJSON_CreateString("UPDATE");
if (change->data.tp.oldtuple != NULL) {
TupleToJsoninfo(
old_keys_name, old_keys_type, old_keys_val, tupdesc, &change->data.tp.oldtuple->tuple, true);
TupleToJsoninfo(relation, old_keys_name, old_keys_type, old_keys_val, tupdesc,
&change->data.tp.oldtuple->tuple, true);
}
if (change->data.tp.newtuple != NULL) {
TupleToJsoninfo(
columns_name, columns_type, columns_val, tupdesc, &change->data.tp.newtuple->tuple, false);
TupleToJsoninfo(relation, columns_name, columns_type, columns_val, tupdesc,
&change->data.tp.newtuple->tuple, false);
}
break;
case REORDER_BUFFER_CHANGE_DELETE:
op_type = cJSON_CreateString("DELETE");
if (change->data.tp.oldtuple != NULL) {
TupleToJsoninfo(
old_keys_name, old_keys_type, old_keys_val, tupdesc, &change->data.tp.oldtuple->tuple, true);
TupleToJsoninfo(relation, old_keys_name, old_keys_type, old_keys_val, tupdesc,
&change->data.tp.oldtuple->tuple, true);
}
/* if there was no PK, we only know that a delete happened */
break;
case REORDER_BUFFER_CHANGE_UINSERT:
op_type = cJSON_CreateString("INSERT");
if (change->data.utp.newtuple != NULL) {
TupleToJsoninfo(columns_name, columns_type, columns_val, tupdesc,
TupleToJsoninfo(relation, columns_name, columns_type, columns_val, tupdesc,
(HeapTuple)(&change->data.utp.newtuple->tuple), false);
}
break;
case REORDER_BUFFER_CHANGE_UDELETE:
op_type = cJSON_CreateString("DELETE");
if (change->data.utp.oldtuple != NULL) {
TupleToJsoninfo(old_keys_name, old_keys_type, old_keys_val, tupdesc,
TupleToJsoninfo(relation, old_keys_name, old_keys_type, old_keys_val, tupdesc,
(HeapTuple)(&change->data.utp.oldtuple->tuple), true);
}
break;
case REORDER_BUFFER_CHANGE_UUPDATE:
op_type = cJSON_CreateString("UPDATE");
if (change->data.utp.oldtuple != NULL) {
TupleToJsoninfo(old_keys_name, old_keys_type, old_keys_val, tupdesc,
TupleToJsoninfo(relation, old_keys_name, old_keys_type, old_keys_val, tupdesc,
(HeapTuple)(&change->data.utp.oldtuple->tuple), true);
}
if (change->data.utp.newtuple != NULL) {
TupleToJsoninfo(columns_name, columns_type, columns_val, tupdesc,
TupleToJsoninfo(relation, columns_name, columns_type, columns_val, tupdesc,
(HeapTuple)&change->data.utp.newtuple->tuple, false);
}
break;