!3761 【回合】修复merge into同时并发update/delete的问题

Merge pull request !3761 from pengjiong/cherry-pick-1689228628
This commit is contained in:
opengauss_bot
2023-07-13 07:30:08 +00:00
committed by Gitee
11 changed files with 429 additions and 33 deletions

View File

@ -394,6 +394,7 @@ static bool ExecMergeMatched(ModifyTableState* mtstate, EState* estate, TupleTab
slot = ExecMergeProjQual(mtstate, mergeMatchedActionStates, econtext, slot, slot, estate);
if (slot != NULL) {
TM_Result out_result;
(void)ExecUpdate(tupleid,
oldPartitionOid,
bucketid,
@ -404,7 +405,12 @@ static bool ExecMergeMatched(ModifyTableState* mtstate, EState* estate, TupleTab
mtstate,
mtstate->canSetTag,
partKeyUpdated,
&out_result,
partExprKeyStr);
/* the matched row has been delted or after updated, the row does not matched, change to insert. */
if (out_result == TM_Deleted || out_result == TM_Updated) {
return false;
}
}
if (action->commandType == CMD_UPDATE /* && tuple_updated*/)
InstrCountFiltered2(&mtstate->ps, 1);

View File

@ -719,9 +719,10 @@ checktest:
ExecProject(resultRelInfo->ri_updateProj, NULL);
/* Evaluate where qual if exists, add to count if filtered */
if (ExecQual(upsertState->us_updateWhere, econtext, false)) {
TM_Result out_result;
*returning = ExecUpdate(conflictTid, oldPartitionOid, bucketid, NULL,
upsertState->us_updateproj, planSlot, &mtstate->mt_epqstate,
mtstate, canSetTag, ((ModifyTable*)mtstate->ps.plan)->partKeyUpdated, partExprKeyStr);
mtstate, canSetTag, ((ModifyTable*)mtstate->ps.plan)->partKeyUpdated, &out_result, partExprKeyStr);
} else {
InstrCountFiltered1(&mtstate->ps, 1);
}
@ -2008,6 +2009,33 @@ end:;
return NULL;
}
/*
* check whether the tuple still match the merge condition after the tuple has been updated by other transaction.
* [in] node, executor node
* [in] epq_slot, the tuple slot after recheck on the updated tuple
* [in] mergeMatchedActionStates, update action state
* [in] fake_relation, heap table relation
* [in/out] slot, in: origin tuple slot, out: slot after merge projection
* [out] tuple, if the updated tuple still match the merge condition, return the new projected tuple
* [return value] true for match, false for not match.
*/
static bool MatchMergeCondition(ModifyTableState* node, TupleTableSlot* epq_slot, List* mergeMatchedActionStates,
Relation fake_relation, AttrNumber junkAttno, TupleTableSlot** slot, Tuple *tuple)
{
/* resultRelInfo->ri_mergeState is always not null */
*slot = ExecMergeProjQual(node, mergeMatchedActionStates, node->ps.ps_ExprContext, epq_slot, *slot, node->ps.state);
if (*slot != NULL) {
bool is_null = false;
/* Check after epq, whether the ctid is null. null means the updated row does not match */
(void)ExecGetJunkAttribute(epq_slot, junkAttno, &is_null);
if (!is_null) {
*tuple = tableam_tslot_get_tuple_from_slot(fake_relation, *slot);
return true;
}
}
return false;
}
/* ----------------------------------------------------------------
* ExecUpdate
*
@ -2031,7 +2059,7 @@ end:;
TupleTableSlot* ExecUpdate(ItemPointer tupleid,
Oid oldPartitionOid, /* when update a partitioned table , give a partitionOid to find the tuple */
int2 bucketid, HeapTupleHeader oldtuple, TupleTableSlot* slot, TupleTableSlot* planSlot, EPQState* epqstate,
ModifyTableState* node, bool canSetTag, bool partKeyUpdate, char* partExprKeyStr)
ModifyTableState* node, bool canSetTag, bool partKeyUpdate, TM_Result* out_result, char* partExprKeyStr)
{
EState* estate = node->ps.state;
Tuple tuple = NULL;
@ -2062,7 +2090,7 @@ TupleTableSlot* ExecUpdate(ItemPointer tupleid,
bool allow_update_self = (node->mt_upsert != NULL &&
node->mt_upsert->us_action != UPSERT_NONE) ? true : false;
*out_result = TM_Ok;
/*
* get information on the (current) result relation
*/
@ -2304,6 +2332,7 @@ lreplace:
estate->es_crosscheck_snapshot, estate->es_snapshot, true, // wait for commit
&oldslot, &tmfd, &update_indexes, &modifiedIdxAttrs, allow_update_self,
allowInplaceUpdate, &lockmode);
*out_result = result;
switch (result) {
case TM_SelfUpdated:
case TM_SelfModified:
@ -2343,7 +2372,6 @@ lreplace:
tupleid, node->delete_delta_rel);
}
}
break;
case TM_Updated: {
@ -2386,14 +2414,8 @@ lreplace:
* from plan slot.
*/
if (node->operation == CMD_MERGE) {
List* mergeMatchedActionStates = NIL;
/* resultRelInfo->ri_mergeState is always not null */
mergeMatchedActionStates = result_rel_info->ri_mergeState->matchedActionStates;
slot = ExecMergeProjQual(
node, mergeMatchedActionStates, node->ps.ps_ExprContext, epq_slot, slot, estate);
if (slot != NULL) {
tuple = tableam_tslot_get_tuple_from_slot(fake_relation, slot);
if (MatchMergeCondition(node, epq_slot, result_rel_info->ri_mergeState->matchedActionStates,
fake_relation, result_rel_info->ri_junkFilter->jf_junkAttNo, &slot, &tuple)) {
goto lreplace;
}
} else {
@ -2626,6 +2648,7 @@ lreplace:
allow_update_self,
allowInplaceUpdate,
&lockmode);
*out_result = result;
switch (result) {
case TM_SelfUpdated:
case TM_SelfModified:
@ -2696,14 +2719,8 @@ lreplace:
* projected from plan slot.
*/
if (node->operation == CMD_MERGE) {
List *mergeMatchedActionStates = NIL;
/* resultRelInfo->ri_mergeState is always not null */
mergeMatchedActionStates = result_rel_info->ri_mergeState->matchedActionStates;
slot = ExecMergeProjQual(node, mergeMatchedActionStates,
node->ps.ps_ExprContext, epq_slot, slot, estate);
if (slot != NULL) {
tuple = tableam_tslot_get_tuple_from_slot(fake_relation, slot);
if (MatchMergeCondition(node, epq_slot, result_rel_info->ri_mergeState->matchedActionStates,
fake_relation, result_rel_info->ri_junkFilter->jf_junkAttNo, &slot, &tuple)) {
goto lreplace;
}
} else {
@ -2839,7 +2856,7 @@ ldelete:
&oldslot,
&tmfd,
allow_update_self);
*out_result = result;
switch (result) {
case TM_SelfUpdated:
case TM_SelfModified:
@ -2938,15 +2955,8 @@ ldelete:
* needs slot to be projected from plan slot.
*/
if (node->operation == CMD_MERGE) {
List* mergeMatchedActionStates = NIL;
/* resultRelInfo->ri_mergeState is always not null */
mergeMatchedActionStates =
result_rel_info->ri_mergeState->matchedActionStates;
slot = ExecMergeProjQual(node, mergeMatchedActionStates,
node->ps.ps_ExprContext, epq_slot, slot, estate);
if (slot != NULL) {
tuple = tableam_tslot_get_tuple_from_slot(old_fake_relation, slot);
if (MatchMergeCondition(node, epq_slot, result_rel_info->ri_mergeState->matchedActionStates,
old_fake_relation, result_rel_info->ri_junkFilter->jf_junkAttNo, &slot, &tuple)) {
goto ldelete;
}
} else {
@ -3709,7 +3719,8 @@ static TupleTableSlot* ExecModifyTable(PlanState* state)
slot = ExecReplace(estate, node, slot, plan_slot, bucketid, hi_options, partition_list, partExprKeyStr);
}
break;
case CMD_UPDATE:
case CMD_UPDATE: {
TM_Result out_result;
slot = ExecUpdate(tuple_id,
old_partition_oid,
bucketid,
@ -3720,8 +3731,9 @@ static TupleTableSlot* ExecModifyTable(PlanState* state)
node,
node->canSetTag,
part_key_updated,
&out_result,
partExprKeyStr);
break;
} break;
case CMD_DELETE:
slot = ExecDelete(
tuple_id, old_partition_oid, bucketid, old_tuple, plan_slot, &node->mt_epqstate, node, node->canSetTag);

View File

@ -46,7 +46,7 @@ extern TupleTableSlot* ExecDelete(ItemPointer tupleid, Oid deletePartitionOid, i
extern TupleTableSlot* ExecUpdate(ItemPointer tupleid, Oid oldPartitionOid, int2 bucketid, HeapTupleHeader oldtuple,
TupleTableSlot* slot, TupleTableSlot* planSlot, EPQState* epqstate, ModifyTableState* node, bool canSetTag,
bool partKeyUpdate, char* partExprKeyStr = NULL);
bool partKeyUpdate, TM_Result* out_result, char* partExprKeyStr = NULL);
template <bool useHeapMultiInsert>
extern TupleTableSlot* ExecInsertT(ModifyTableState* state, TupleTableSlot* slot, TupleTableSlot* planSlot,

View File

@ -0,0 +1,73 @@
CREATE SCHEMA merge_concurrent_update_delete_1;
SET current_schema = merge_concurrent_update_delete_1;
-- test merge with concurrent update/delete
CREATE TABLE merge_1(a int, b int, c int);
CREATE TABLE merge_2(a int);
insert into merge_1 values(1,1,1);
insert into merge_2 values(1);
-- concurrent update on join condition row, after update, the row doesn't match, so the merge should go to not match condition
\parallel on 2
begin
update merge_1 set a=22 where a=1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- two rows(merge not matched)
a | b | c
----+---+---
8 | 8 | 8
22 | 1 | 1
(2 rows)
delete from merge_1;
insert into merge_1 values(1,1,1);
-- concurrent update on non-join condition row, after update, the row still match, so the merge should go to match condition
\parallel on 2
begin
update merge_1 set b=22 where a=1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one rows(merge matched)
a | b | c
---+----+---
1 | 66 | 1
(1 row)
delete from merge_1;
insert into merge_1 values(1,1,1);
-- concurrent delete on join condition row, after delete, the merge should go to not match condition
\parallel on 2
begin
delete from merge_1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one new rows(merge not matched)
a | b | c
---+---+---
8 | 8 | 8
(1 row)
drop schema merge_concurrent_update_delete_1 cascade;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table merge_1
drop cascades to table merge_2

View File

@ -0,0 +1,78 @@
CREATE SCHEMA merge_concurrent_update_delete_2;
SET current_schema = merge_concurrent_update_delete_2;
-- test merge with concurrent update/delete on partition table(now row-movement)
CREATE TABLE merge_1(a int, b int, c int) partition by range(a) (partition p1 values less than(10), partition p2 values less than(maxvalue)) ENABLE ROW MOVEMENT;
CREATE TABLE merge_2(a int);
insert into merge_1 values(1,1,1);
insert into merge_2 values(1);
-- concurrent update on join condition row, lead row movement
\parallel on 2
begin
update merge_1 set a=22 where a=1; -- row movement, will be delete and insert
perform pg_sleep(3);
end;
/
-- this transaction will failed cause first transaction doing a row movement update, which lead the row be deleted and insert to another partition
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
ERROR: partition table update conflict
DETAIL: disable row movement of table can avoid this conflict
CONTEXT: SQL statement "merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8)"
PL/pgSQL function inline_code_block line 3 at SQL statement
select * from merge_1 order by a; -- one rows(22,1,1)
a | b | c
----+---+---
22 | 1 | 1
(1 row)
delete from merge_1;
insert into merge_1 values(1,1,1);
-- concurrent update on join condition row, no row movement, after update, the row doesn't match, so the merge should go to not match condition
\parallel on 2
begin
update merge_1 set a=9 where a=1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- two rows(merge not matched)
a | b | c
---+---+---
8 | 8 | 8
9 | 1 | 1
(2 rows)
delete from merge_1;
insert into merge_1 values(1,1,1);
-- concurrent update on non-join condition row, no row movement, after update, the row still match, so the merge should go to match condition
\parallel on 2
begin
update merge_1 set b=88 where a=1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one rows(merge matched)
a | b | c
---+----+---
1 | 66 | 1
(1 row)
drop schema merge_concurrent_update_delete_2 cascade;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table merge_1
drop cascades to table merge_2

View File

@ -0,0 +1,52 @@
CREATE SCHEMA merge_concurrent_update_delete_3;
SET current_schema = merge_concurrent_update_delete_3;
-- test merge with concurrent update/delete
CREATE TABLE merge_1(a int, b int, c int);
CREATE TABLE merge_2(a int);
insert into merge_1 values(1,1,1);
insert into merge_2 values(1);
-- concurrent delete on join condition row, after delete, the row doesn't match, so the merge should go to not match condition
\parallel on 2
begin
delete from merge_1 where a=1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one new rows(merge not matched)
a | b | c
---+---+---
8 | 8 | 8
(1 row)
delete from merge_1;
insert into merge_1 values(1,1,1);
insert into merge_1 values(2,2,2);
-- concurrent delete on non-join condition row, after delete, the row still match, so the merge should go to match condition
\parallel on 2
begin
delete from merge_1 where a=2;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one rows(merge matched)
a | b | c
---+----+---
1 | 66 | 1
(1 row)
drop schema merge_concurrent_update_delete_3 cascade;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table merge_1
drop cascades to table merge_2

View File

@ -277,6 +277,7 @@ test: single_node_forbidden
test: single_node_mergeinto merge_subquery merge_subquery3 merge_1
test: merge_where_col
test: merge_concurrent_update_delete_1 merge_concurrent_update_delete_2 merge_concurrent_update_delete_3
# Trigger tests
test: single_node_triggers

View File

@ -269,6 +269,7 @@ test: single_node_forbidden
test: single_node_mergeinto merge_subquery merge_subquery3 merge_1
test: merge_where_col
test: merge_concurrent_update_delete_1 merge_concurrent_update_delete_2 merge_concurrent_update_delete_3
# Trigger tests
test: single_node_triggers

View File

@ -0,0 +1,62 @@
CREATE SCHEMA merge_concurrent_update_delete_1;
SET current_schema = merge_concurrent_update_delete_1;
-- test merge with concurrent update/delete
CREATE TABLE merge_1(a int, b int, c int);
CREATE TABLE merge_2(a int);
insert into merge_1 values(1,1,1);
insert into merge_2 values(1);
-- concurrent update on join condition row, after update, the row doesn't match, so the merge should go to not match condition
\parallel on 2
begin
update merge_1 set a=22 where a=1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- two rows(merge not matched)
delete from merge_1;
insert into merge_1 values(1,1,1);
-- concurrent update on non-join condition row, after update, the row still match, so the merge should go to match condition
\parallel on 2
begin
update merge_1 set b=22 where a=1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one rows(merge matched)
delete from merge_1;
insert into merge_1 values(1,1,1);
-- concurrent delete on join condition row, after delete, the merge should go to not match condition
\parallel on 2
begin
delete from merge_1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one new rows(merge not matched)
drop schema merge_concurrent_update_delete_1 cascade;

View File

@ -0,0 +1,65 @@
CREATE SCHEMA merge_concurrent_update_delete_2;
SET current_schema = merge_concurrent_update_delete_2;
-- test merge with concurrent update/delete on partition table(now row-movement)
CREATE TABLE merge_1(a int, b int, c int) partition by range(a) (partition p1 values less than(10), partition p2 values less than(maxvalue)) ENABLE ROW MOVEMENT;
CREATE TABLE merge_2(a int);
insert into merge_1 values(1,1,1);
insert into merge_2 values(1);
-- concurrent update on join condition row, lead row movement
\parallel on 2
begin
update merge_1 set a=22 where a=1; -- row movement, will be delete and insert
perform pg_sleep(3);
end;
/
-- this transaction will failed cause first transaction doing a row movement update, which lead the row be deleted and insert to another partition
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one rows(22,1,1)
delete from merge_1;
insert into merge_1 values(1,1,1);
-- concurrent update on join condition row, no row movement, after update, the row doesn't match, so the merge should go to not match condition
\parallel on 2
begin
update merge_1 set a=9 where a=1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- two rows(merge not matched)
delete from merge_1;
insert into merge_1 values(1,1,1);
-- concurrent update on non-join condition row, no row movement, after update, the row still match, so the merge should go to match condition
\parallel on 2
begin
update merge_1 set b=88 where a=1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one rows(merge matched)
drop schema merge_concurrent_update_delete_2 cascade;

View File

@ -0,0 +1,46 @@
CREATE SCHEMA merge_concurrent_update_delete_3;
SET current_schema = merge_concurrent_update_delete_3;
-- test merge with concurrent update/delete
CREATE TABLE merge_1(a int, b int, c int);
CREATE TABLE merge_2(a int);
insert into merge_1 values(1,1,1);
insert into merge_2 values(1);
-- concurrent delete on join condition row, after delete, the row doesn't match, so the merge should go to not match condition
\parallel on 2
begin
delete from merge_1 where a=1;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one new rows(merge not matched)
delete from merge_1;
insert into merge_1 values(1,1,1);
insert into merge_1 values(2,2,2);
-- concurrent delete on non-join condition row, after delete, the row still match, so the merge should go to match condition
\parallel on 2
begin
delete from merge_1 where a=2;
perform pg_sleep(3);
end;
/
begin
perform pg_sleep(1);
merge INTO merge_1 p1 using (select * from merge_2) p2 on (p1.a=p2.a) when matched then update set p1.b=66 when NOT MATCHED THEN INSERT (a,b,c) values(8,8,8);
end;
/
\parallel off
select * from merge_1 order by a; -- one rows(merge matched)
drop schema merge_concurrent_update_delete_3 cascade;