!353 并行查询-并行worker不执行CallXactCallBacks
Merge pull request !353 from TotaJ/bugfix/parallel_xactcall
This commit is contained in:
@ -1056,10 +1056,12 @@ static void set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, RangeT
|
||||
* temporary buffers could be expensive, though, and we don't have
|
||||
* the rest of the necessary infrastructure right now anyway. So
|
||||
* for now, bail out if we see a temporary table.
|
||||
*
|
||||
* Don't support parallel query for foreign table.
|
||||
*/
|
||||
if (get_rel_persistence(rte->relid) == RELPERSISTENCE_TEMP)
|
||||
if (get_rel_persistence(rte->relid) == RELPERSISTENCE_TEMP || rte->relkind == RELKIND_FOREIGN_TABLE) {
|
||||
return;
|
||||
|
||||
}
|
||||
/* Don't support parallel for partitioned table. */
|
||||
if (rte->ispartrel) {
|
||||
return;
|
||||
|
||||
@ -84,6 +84,7 @@
|
||||
#include "access/clog.h"
|
||||
#include "access/csnlog.h"
|
||||
#include "access/htup.h"
|
||||
#include "access/parallel.h"
|
||||
#include "access/subtrans.h"
|
||||
#include "access/transam.h"
|
||||
#include "access/twophase.h"
|
||||
@ -2063,10 +2064,12 @@ void FinishPreparedTransaction(const char* gid, bool isCommit)
|
||||
MOTProcessRecoveredTransaction(xid, isCommit);
|
||||
}
|
||||
|
||||
if (isCommit) {
|
||||
CallXactCallbacks(XACT_EVENT_COMMIT_PREPARED);
|
||||
} else {
|
||||
CallXactCallbacks(XACT_EVENT_ROLLBACK_PREPARED);
|
||||
if (!IsParallelWorker()) {
|
||||
if (isCommit) {
|
||||
CallXactCallbacks(XACT_EVENT_COMMIT_PREPARED);
|
||||
} else {
|
||||
CallXactCallbacks(XACT_EVENT_ROLLBACK_PREPARED);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -2099,8 +2102,10 @@ void FinishPreparedTransaction(const char* gid, bool isCommit)
|
||||
commitLibraryLen,
|
||||
hdr->initfileinval);
|
||||
|
||||
/* Release MOT locks */
|
||||
CallXactCallbacks(XACT_EVENT_END_TRANSACTION);
|
||||
if (!IsParallelWorker()) {
|
||||
/* Release MOT locks */
|
||||
CallXactCallbacks(XACT_EVENT_END_TRANSACTION);
|
||||
}
|
||||
} else {
|
||||
RecordTransactionAbortPrepared(xid,
|
||||
hdr->nsubxacts,
|
||||
|
||||
@ -2424,7 +2424,9 @@ static void StartTransaction(bool begin_on_gtm)
|
||||
/* done with start processing, set current transaction state to "in progress" */
|
||||
s->state = TRANS_INPROGRESS;
|
||||
|
||||
CallXactCallbacks(XACT_EVENT_START);
|
||||
if (!IsParallelWorker()) {
|
||||
CallXactCallbacks(XACT_EVENT_START);
|
||||
}
|
||||
|
||||
if (module_logging_is_on(MOD_TRANS_XACT)) {
|
||||
ereport(LOG,
|
||||
@ -2781,19 +2783,18 @@ static void CommitTransaction(bool stpCommit)
|
||||
Assert(BCMArrayIsEmpty());
|
||||
}
|
||||
|
||||
/*
|
||||
* For MOT, CallXactCallbacks should be called be called before RecordTransactionCommit.
|
||||
*
|
||||
* Commit MOT Engine - do this as late as possible to allow
|
||||
* atomic cross transaction between PG tables and MOT tables
|
||||
* in the future.
|
||||
*/
|
||||
CallXactCallbacks(XACT_EVENT_COMMIT);
|
||||
|
||||
/*
|
||||
* Here is where we really truly local commit.
|
||||
*/
|
||||
if (!is_parallel_worker) {
|
||||
/*
|
||||
* For MOT, CallXactCallbacks should be called be called before RecordTransactionCommit.
|
||||
*
|
||||
* Commit MOT Engine - do this as late as possible to allow
|
||||
* atomic cross transaction between PG tables and MOT tables
|
||||
* in the future.
|
||||
*/
|
||||
CallXactCallbacks(XACT_EVENT_COMMIT);
|
||||
latestXid = RecordTransactionCommit();
|
||||
} else {
|
||||
/*
|
||||
@ -2894,8 +2895,10 @@ static void CommitTransaction(bool stpCommit)
|
||||
|
||||
TRACE_POSTGRESQL_TRANSACTION_COMMIT(t_thrd.proc->lxid);
|
||||
|
||||
/* Release MOT locks */
|
||||
CallXactCallbacks(XACT_EVENT_END_TRANSACTION);
|
||||
if (!is_parallel_worker) {
|
||||
/* Release MOT locks */
|
||||
CallXactCallbacks(XACT_EVENT_END_TRANSACTION);
|
||||
}
|
||||
|
||||
/*
|
||||
* Let others know about no transaction in progress by me. Note that this
|
||||
@ -3875,7 +3878,9 @@ static void AbortTransaction(bool PerfectRollback, bool stpRollback)
|
||||
*/
|
||||
AfterTriggerEndXact(false); /* 'false' means it's abort */
|
||||
|
||||
CallXactCallbacks(XACT_EVENT_PREROLLBACK_CLEANUP);
|
||||
if (!is_parallel_worker) {
|
||||
CallXactCallbacks(XACT_EVENT_PREROLLBACK_CLEANUP);
|
||||
}
|
||||
|
||||
AtAbort_Portals(stpRollback);
|
||||
AtEOXact_LargeObject(false);
|
||||
@ -3926,7 +3931,9 @@ static void AbortTransaction(bool PerfectRollback, bool stpRollback)
|
||||
if (t_thrd.utils_cxt.TopTransactionResourceOwner != NULL) {
|
||||
bool change_user_name = false;
|
||||
instr_report_workload_xact_info(false);
|
||||
CallXactCallbacks(XACT_EVENT_ABORT);
|
||||
if (!is_parallel_worker) {
|
||||
CallXactCallbacks(XACT_EVENT_ABORT);
|
||||
}
|
||||
|
||||
ResourceOwnerRelease(t_thrd.utils_cxt.TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, true);
|
||||
AtEOXact_Buffers(false);
|
||||
|
||||
@ -268,6 +268,20 @@ EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = postgres_fdw_a
|
||||
EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 === t1.c2;
|
||||
EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = abs(t1.c2);
|
||||
EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = t1.c2;
|
||||
-- don't do parallel query with foreign table
|
||||
set force_parallel_mode=on;
|
||||
set parallel_setup_cost=0;
|
||||
set parallel_tuple_cost=0.000005;
|
||||
set max_parallel_workers_per_gather=2;
|
||||
set min_parallel_table_scan_size=0;
|
||||
set parallel_leader_participation=on;
|
||||
EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1;
|
||||
reset force_parallel_mode;
|
||||
reset parallel_setup_cost;
|
||||
reset parallel_tuple_cost;
|
||||
reset max_parallel_workers_per_gather;
|
||||
reset min_parallel_table_scan_size;
|
||||
reset parallel_leader_participation;
|
||||
|
||||
-- ===================================================================
|
||||
-- WHERE with remotely-executable conditions
|
||||
|
||||
@ -484,6 +484,27 @@ EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1 t1 WHERE t1.c1 = t1.c2;
|
||||
Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" = c2))
|
||||
(3 rows)
|
||||
|
||||
-- don't do parallel query with foreign table
|
||||
set force_parallel_mode=on;
|
||||
set parallel_setup_cost=0;
|
||||
set parallel_tuple_cost=0.000005;
|
||||
set max_parallel_workers_per_gather=2;
|
||||
set min_parallel_table_scan_size=0;
|
||||
set parallel_leader_participation=on;
|
||||
EXPLAIN (VERBOSE, COSTS false) SELECT * FROM ft1;
|
||||
QUERY PLAN
|
||||
-------------------------------------------------------------------------
|
||||
Foreign Scan on public.ft1
|
||||
Output: c1, c2, c3, c4, c5, c6, c7, c8
|
||||
Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1"
|
||||
(3 rows)
|
||||
|
||||
reset force_parallel_mode;
|
||||
reset parallel_setup_cost;
|
||||
reset parallel_tuple_cost;
|
||||
reset max_parallel_workers_per_gather;
|
||||
reset min_parallel_table_scan_size;
|
||||
reset parallel_leader_participation;
|
||||
-- ===================================================================
|
||||
-- WHERE with remotely-executable conditions
|
||||
-- ===================================================================
|
||||
|
||||
Reference in New Issue
Block a user