|
|
|
|
@ -582,17 +582,6 @@ void UnPopulateImcs(Relation rel)
|
|
|
|
|
PG_END_TRY();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void GetColNamesForStandBy(Relation rel, int2vector* imcsAttsNum, int imcsNatts, List* &colList, int* nameLength)
|
|
|
|
|
{
|
|
|
|
|
Assert(imcsAttsNum != NULL && imcsNatts > 0);
|
|
|
|
|
FormData_pg_attribute *rel_atts = rel->rd_att->attrs;
|
|
|
|
|
for (int i = 0; i < imcsNatts; i++) {
|
|
|
|
|
AttrNumber attnumber = imcsAttsNum->values[i];
|
|
|
|
|
colList = lappend(colList, makeString(NameStr(rel_atts[attnumber - 1].attname)));
|
|
|
|
|
*nameLength += (strlen(NameStr(rel_atts[attnumber - 1].attname)) + 1);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PopulateImcsOnStandby(Oid relOid, StringInfo inputMsg)
|
|
|
|
|
{
|
|
|
|
|
int imcsNatts = 0;
|
|
|
|
|
@ -742,12 +731,9 @@ void ParsePopulateImcsParam(
|
|
|
|
|
rc = memcpy_s(&count, sizeof(int), pq_getmsgbytes(inputMsg, sizeof(int)), sizeof(int));
|
|
|
|
|
securec_check(rc, "\0", "\0");
|
|
|
|
|
|
|
|
|
|
char **colList = NULL;
|
|
|
|
|
if (count > 0) {
|
|
|
|
|
colList = (char **)palloc(count * sizeof(char *));
|
|
|
|
|
for (int i = 0; i < count; i++)
|
|
|
|
|
colList[i] = (char *)pq_getmsgstring(inputMsg);
|
|
|
|
|
}
|
|
|
|
|
int2 *attsNums = (int2*)palloc(sizeof(int2) * count);
|
|
|
|
|
rc = memcpy_s(attsNums, sizeof(int2) * count, pq_getmsgbytes(inputMsg, sizeof(int2) * count), sizeof(int2) * count);
|
|
|
|
|
securec_check(rc, "\0", "\0");
|
|
|
|
|
|
|
|
|
|
/* get current lsn from primary */
|
|
|
|
|
rc = memcpy_s(currentLsn, sizeof(XLogRecPtr), pq_getmsgbytes(inputMsg, sizeof(XLogRecPtr)),
|
|
|
|
|
@ -756,14 +742,9 @@ void ParsePopulateImcsParam(
|
|
|
|
|
ereport(DEBUG1, (errmsg("Received lsn for HTAP population.")));
|
|
|
|
|
pq_getmsgend(inputMsg);
|
|
|
|
|
|
|
|
|
|
int j = 0;
|
|
|
|
|
imcsAttsNum = buildint2vector(NULL, count);
|
|
|
|
|
for (int cnt = 0; cnt < count; cnt++) {
|
|
|
|
|
AttrNumber attnum = get_attnum(relOid, colList[cnt]);
|
|
|
|
|
imcsAttsNum->values[j] = attnum;
|
|
|
|
|
j++;
|
|
|
|
|
}
|
|
|
|
|
imcsAttsNum = buildint2vector(attsNums, count);
|
|
|
|
|
*imcsNatts = count;
|
|
|
|
|
pfree(attsNums);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -975,7 +956,8 @@ static void HandlePgxcReceive(int connCount, PGXCNodeHandle **tempConnections)
|
|
|
|
|
pfree_ext(tempConnections);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void PackBasicImcstoredRequest(PGXCNodeHandle *temp_connection, SendPopulateParams* populateParams)
|
|
|
|
|
static void PackBasicImcstoredRequest(
|
|
|
|
|
PGXCNodeHandle *temp_connection, SendPopulateParams* populateParams, int imcsNatts)
|
|
|
|
|
{
|
|
|
|
|
errno_t ss_rc = EOK;
|
|
|
|
|
int msglen = populateParams->msglen;
|
|
|
|
|
@ -1004,21 +986,15 @@ static void PackBasicImcstoredRequest(PGXCNodeHandle *temp_connection, SendPopul
|
|
|
|
|
securec_check(ss_rc, "\0", "\0");
|
|
|
|
|
temp_connection->outEnd += sizeof(Oid);
|
|
|
|
|
|
|
|
|
|
int colCount = list_length(populateParams->columnlist);
|
|
|
|
|
if (colCount > 0) {
|
|
|
|
|
if (imcsNatts > 0) {
|
|
|
|
|
ss_rc = memcpy_s(temp_connection->outBuffer + temp_connection->outEnd,
|
|
|
|
|
temp_connection->outSize - temp_connection->outEnd, &colCount, sizeof(int));
|
|
|
|
|
temp_connection->outSize - temp_connection->outEnd, &imcsNatts, sizeof(int));
|
|
|
|
|
securec_check(ss_rc, "\0", "\0");
|
|
|
|
|
temp_connection->outEnd += sizeof(int);
|
|
|
|
|
ListCell *cell;
|
|
|
|
|
foreach (cell, populateParams->columnlist) {
|
|
|
|
|
char *col_name = strVal(lfirst(cell));
|
|
|
|
|
int name_len = strlen(col_name) + 1;
|
|
|
|
|
ss_rc = memcpy_s(temp_connection->outBuffer + temp_connection->outEnd,
|
|
|
|
|
temp_connection->outSize - temp_connection->outEnd, col_name, name_len);
|
|
|
|
|
securec_check(ss_rc, "\0", "\0");
|
|
|
|
|
temp_connection->outEnd += name_len;
|
|
|
|
|
}
|
|
|
|
|
ss_rc = memcpy_s(temp_connection->outBuffer + temp_connection->outEnd,
|
|
|
|
|
temp_connection->outSize - temp_connection->outEnd, populateParams->attsNums, sizeof(int2) * imcsNatts);
|
|
|
|
|
securec_check(ss_rc, "\0", "\0");
|
|
|
|
|
temp_connection->outEnd += sizeof(int2) * imcsNatts;
|
|
|
|
|
XLogRecPtr lsn = t_thrd.shemem_ptr_cxt.XLogCtl->LogwrtRqst.Write;
|
|
|
|
|
ss_rc = memcpy_s(temp_connection->outBuffer + temp_connection->outEnd,
|
|
|
|
|
temp_connection->outSize - temp_connection->outEnd, &lsn, sizeof(XLogRecPtr));
|
|
|
|
|
@ -1027,7 +1003,7 @@ static void PackBasicImcstoredRequest(PGXCNodeHandle *temp_connection, SendPopul
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void SendImcstoredRequest(Oid relOid, Oid specifyPartOid, List *columnlist, int length, int type)
|
|
|
|
|
void SendImcstoredRequest(Oid relOid, Oid specifyPartOid, int2* attsNums, int imcsNatts, int type)
|
|
|
|
|
{
|
|
|
|
|
int connCount = 0;
|
|
|
|
|
PGXCNodeHandle** connections = NULL;
|
|
|
|
|
@ -1036,10 +1012,11 @@ void SendImcstoredRequest(Oid relOid, Oid specifyPartOid, List *columnlist, int
|
|
|
|
|
/* init send populate params */
|
|
|
|
|
populateParams.relOid = relOid;
|
|
|
|
|
populateParams.partOid = specifyPartOid;
|
|
|
|
|
populateParams.columnlist = columnlist;
|
|
|
|
|
populateParams.attsNums = attsNums;
|
|
|
|
|
populateParams.imcstoreType = type;
|
|
|
|
|
populateParams.msglen =
|
|
|
|
|
sizeof(int) + sizeof(int) + sizeof(Oid) + sizeof(Oid) + sizeof(int) + length + sizeof(XLogRecPtr);
|
|
|
|
|
sizeof(int) + sizeof(int) + sizeof(Oid) + sizeof(Oid) + sizeof(int) +
|
|
|
|
|
imcsNatts * sizeof(int2) + sizeof(XLogRecPtr);
|
|
|
|
|
|
|
|
|
|
connections = GetStandbyConnections(&connCount);
|
|
|
|
|
PGXCNodeHandle **temp_connections = NULL;
|
|
|
|
|
@ -1058,7 +1035,7 @@ void SendImcstoredRequest(Oid relOid, Oid specifyPartOid, List *columnlist, int
|
|
|
|
|
temp_connections[i]->remoteNodeName, temp_connections[i]->gsock.idx, temp_connections[i]->gsock.sid,
|
|
|
|
|
temp_connections[i]->state);
|
|
|
|
|
|
|
|
|
|
PackBasicImcstoredRequest(temp_connections[i], &populateParams);
|
|
|
|
|
PackBasicImcstoredRequest(temp_connections[i], &populateParams, imcsNatts);
|
|
|
|
|
|
|
|
|
|
if (pgxc_node_flush(temp_connections[i]) != 0) {
|
|
|
|
|
temp_connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
|
|
|
|
|
@ -1081,7 +1058,7 @@ void SendUnImcstoredRequest(Oid relOid, Oid specifyPartOid, int type)
|
|
|
|
|
/* init send populate params */
|
|
|
|
|
populateParams.relOid = relOid;
|
|
|
|
|
populateParams.partOid = specifyPartOid;
|
|
|
|
|
populateParams.columnlist = NULL;
|
|
|
|
|
populateParams.attsNums = NULL;
|
|
|
|
|
populateParams.imcstoreType = type;
|
|
|
|
|
populateParams.msglen =
|
|
|
|
|
sizeof(int) + sizeof(int) + sizeof(Oid) + sizeof(Oid);
|
|
|
|
|
@ -1102,7 +1079,7 @@ void SendUnImcstoredRequest(Oid relOid, Oid specifyPartOid, int type)
|
|
|
|
|
temp_connections[i]->remoteNodeName, temp_connections[i]->gsock.idx, temp_connections[i]->gsock.sid,
|
|
|
|
|
temp_connections[i]->state);
|
|
|
|
|
|
|
|
|
|
PackBasicImcstoredRequest(temp_connections[i], &populateParams);
|
|
|
|
|
PackBasicImcstoredRequest(temp_connections[i], &populateParams, 0);
|
|
|
|
|
if (pgxc_node_flush(temp_connections[i]) != 0) {
|
|
|
|
|
temp_connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
|
|
|
|
|
ereport(ERROR, (errcode(ERRCODE_CONNECTION_EXCEPTION),
|
|
|
|
|
|