支持分区表用表达式作为分区键

This commit is contained in:
WangXiuqiang
2022-11-21 17:25:33 +08:00
parent aca1831e9d
commit 76108778f1
19 changed files with 1825 additions and 117 deletions

View File

@ -1015,6 +1015,7 @@ static Oid ExecUpsert(ModifyTableState* state, TupleTableSlot* slot, TupleTableS
return newid;
}
extern Tuple ComputePartKeyExprTuple(Relation rel, EState *estate, TupleTableSlot *slot, Tuple oldtuple, Relation partRel);
/* ----------------------------------------------------------------
* ExecInsert
*
@ -1342,8 +1343,11 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
case PARTTYPE_PARTITIONED_RELATION: {
/* get partititon oid for insert the record */
partition_id =
heapTupleGetPartitionId(result_relation_desc, tuple, false, estate->es_plannedstmt->hasIgnore);
Tuple newtuple = ComputePartKeyExprTuple(result_relation_desc, estate, slot, tuple, NULL);
if (newtuple)
partition_id = heapTupleGetPartitionId(result_relation_desc, newtuple, false, estate->es_plannedstmt->hasIgnore);
else
partition_id = heapTupleGetPartitionId(result_relation_desc, tuple, false, estate->es_plannedstmt->hasIgnore);
/* if cannot find valid partition oid and sql has keyword ignore, return and don't insert */
if (estate->es_plannedstmt->hasIgnore && partition_id == InvalidOid) {
return NULL;
@ -1400,8 +1404,11 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
Partition subPart = NULL;
/* get partititon oid for insert the record */
partitionId = heapTupleGetPartitionId(result_relation_desc, tuple, false,
estate->es_plannedstmt->hasIgnore);
Tuple newtuple = ComputePartKeyExprTuple(result_relation_desc, estate, slot, tuple, NULL);
if (newtuple)
partitionId = heapTupleGetPartitionId(result_relation_desc, newtuple, false, estate->es_plannedstmt->hasIgnore);
else
partitionId = heapTupleGetPartitionId(result_relation_desc, tuple, false, estate->es_plannedstmt->hasIgnore);
if (estate->es_plannedstmt->hasIgnore && partitionId == InvalidOid) {
return NULL;
}
@ -1418,7 +1425,11 @@ TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, Tuple
}
/* get subpartititon oid for insert the record */
subPartitionId = heapTupleGetPartitionId(partRel, tuple, false, estate->es_plannedstmt->hasIgnore);
Tuple newsubtuple = ComputePartKeyExprTuple(result_relation_desc, estate, slot, tuple, partRel);
if (newsubtuple)
subPartitionId = heapTupleGetPartitionId(partRel, newsubtuple, false, estate->es_plannedstmt->hasIgnore);
else
subPartitionId = heapTupleGetPartitionId(partRel, tuple, false, estate->es_plannedstmt->hasIgnore);
if (estate->es_plannedstmt->hasIgnore && subPartitionId == InvalidOid) {
return NULL;
}
@ -2407,15 +2418,25 @@ lreplace:
row_movement = false;
new_partId = oldPartitionOid;
} else {
partitionRoutingForTuple(result_relation_desc, tuple, u_sess->exec_cxt.route, can_ignore);
Tuple newtuple = ComputePartKeyExprTuple(result_relation_desc, estate, slot, tuple, NULL);
if (newtuple) {
partitionRoutingForTuple(result_relation_desc, newtuple, u_sess->exec_cxt.route, can_ignore);
} else {
partitionRoutingForTuple(result_relation_desc, tuple, u_sess->exec_cxt.route, can_ignore);
}
if (u_sess->exec_cxt.route->fileExist) {
new_partId = u_sess->exec_cxt.route->partitionId;
if (RelationIsSubPartitioned(result_relation_desc)) {
Partition part = partitionOpen(result_relation_desc, new_partId, RowExclusiveLock);
Relation partRel = partitionGetRelation(result_relation_desc, part);
Tuple newsubtuple = ComputePartKeyExprTuple(result_relation_desc, estate, slot, tuple, partRel);
if (newsubtuple) {
partitionRoutingForTuple(partRel, newsubtuple, u_sess->exec_cxt.route, can_ignore);
} else {
partitionRoutingForTuple(partRel, tuple, u_sess->exec_cxt.route, can_ignore);
}
partitionRoutingForTuple(partRel, tuple, u_sess->exec_cxt.route, can_ignore);
if (u_sess->exec_cxt.route->fileExist) {
new_partId = u_sess->exec_cxt.route->partitionId;
} else {