parallel index scans

This commit is contained in:
chenxiaobin
2020-11-05 16:39:20 +08:00
parent 957587f4ea
commit c45feed3e4
52 changed files with 1530 additions and 298 deletions

View File

@ -21,6 +21,9 @@
* ExecEndIndexScan releases all storage.
* ExecIndexMarkPos marks scan position.
* ExecIndexRestrPos restores scan position.
* ExecIndexScanEstimate estimates DSM space needed for parallel index scan
* ExecIndexScanInitializeDSM initialize DSM for parallel indexscan
* ExecIndexScanInitializeWorker attach to DSM info in parallel worker
*/
#include "postgres.h"
#include "knl/knl_variable.h"
@ -179,6 +182,19 @@ TupleTableSlot* ExecIndexScan(IndexScanState* node)
*/
void ExecReScanIndexScan(IndexScanState* node)
{
bool reset_parallel_scan = true;
/*
* If we are here to just update the scan keys, then don't reset parallel
* scan. We don't want each of the participating process in the parallel
* scan to update the shared parallel scan state at the start of the scan.
* It is quite possible that one of the participants has already begun
* scanning the index when another has yet to start it.
*/
if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady) {
reset_parallel_scan = false;
}
/*
* For recursive-stream rescan, if number of RuntimeKeys not euqal zero,
* just return without rescan.
@ -231,9 +247,20 @@ void ExecReScanIndexScan(IndexScanState* node)
}
}
/* reset index scan */
abs_idx_rescan(
node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, node->iss_NumOrderByKeys);
/*
* Reset (parallel) index scan. For parallel-aware nodes, the scan
* descriptor is initialized during actual execution of node and we can
* reach here before that (ex. during execution of nest loop join). So,
* avoid updating the scan descriptor at that time.
*/
if (node->iss_ScanDesc) {
abs_idx_rescan(node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys,
node->iss_NumOrderByKeys);
if (reset_parallel_scan && GetIndexScanDesc(node->iss_ScanDesc)->parallel_scan) {
index_parallelrescan(GetIndexScanDesc(node->iss_ScanDesc));
}
}
ExecScanReScan(&node->ss);
}
@ -678,28 +705,36 @@ IndexScanState* ExecInitIndexScan(IndexScan* node, EState* estate, int eflags)
}
} else {
/*
* Initialize scan descriptor.
* for parallel-aware node, we initialize the scan descriptor after
* initializing the shared memory for parallel execution.
*/
index_state->iss_ScanDesc = abs_idx_beginscan(current_relation,
index_state->iss_RelationDesc,
estate->es_snapshot,
index_state->iss_NumScanKeys,
index_state->iss_NumOrderByKeys,
(ScanState*)index_state);
if (!node->scan.plan.parallel_aware) {
/*
* Initialize scan descriptor.
*/
index_state->iss_ScanDesc = abs_idx_beginscan(current_relation,
index_state->iss_RelationDesc,
estate->es_snapshot,
index_state->iss_NumScanKeys,
index_state->iss_NumOrderByKeys,
(ScanState*)index_state);
}
}
/*
* If no run-time keys to calculate, go ahead and pass the scankeys to the
* index AM.
*/
if (index_state->iss_ScanDesc == NULL) {
index_state->ss.ps.stubType = PST_Scan;
} else if (index_state->iss_NumRuntimeKeys == 0) {
abs_idx_rescan_local(index_state->iss_ScanDesc,
index_state->iss_ScanKeys,
index_state->iss_NumScanKeys,
index_state->iss_OrderByKeys,
index_state->iss_NumOrderByKeys);
if (!node->scan.plan.parallel_aware) {
/*
* If no run-time keys to calculate, go ahead and pass the scankeys to the
* index AM.
*/
if (index_state->iss_ScanDesc == NULL) {
index_state->ss.ps.stubType = PST_Scan;
} else if (index_state->iss_NumRuntimeKeys == 0) {
abs_idx_rescan_local(index_state->iss_ScanDesc,
index_state->iss_ScanKeys,
index_state->iss_NumScanKeys,
index_state->iss_OrderByKeys,
index_state->iss_NumOrderByKeys);
}
}
/*
@ -1337,3 +1372,85 @@ void ExecInitPartitionForIndexScan(IndexScanState* index_state, EState* estate)
}
}
}
/* ----------------------------------------------------------------
* Parallel Scan Support
* ----------------------------------------------------------------
*/
/* ----------------------------------------------------------------
* ExecIndexScanEstimate
*
* estimates the space required to serialize indexscan node.
* ----------------------------------------------------------------
*/
void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
node->iss_PscanLen = index_parallelscan_estimate(node->iss_RelationDesc, estate->es_snapshot);
}
/* ----------------------------------------------------------------
* ExecIndexScanInitializeDSM
*
* Set up a parallel index scan descriptor.
* ----------------------------------------------------------------
*/
void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt, int nodeid)
{
EState *estate = node->ss.ps.state;
knl_u_parallel_context *cxt = (knl_u_parallel_context *)pcxt->seg;
/* Here we can't use palloc, cause we have switch to old memctx in ExecInitParallelPlan */
cxt->pwCtx->queryInfo.piscan[nodeid] =
(ParallelIndexScanDesc)MemoryContextAllocZero(cxt->memCtx, node->iss_PscanLen);
index_parallelscan_initialize(node->ss.ss_currentRelation, node->iss_PscanLen, node->iss_RelationDesc,
estate->es_snapshot, cxt->pwCtx->queryInfo.piscan[nodeid]);
cxt->pwCtx->queryInfo.piscan[nodeid]->plan_node_id = node->ss.ps.plan->plan_node_id;
node->iss_ScanDesc = (AbsTblScanDesc)index_beginscan_parallel(node->ss.ss_currentRelation, node->iss_RelationDesc,
node->iss_NumScanKeys, node->iss_NumOrderByKeys, cxt->pwCtx->queryInfo.piscan[nodeid]);
/*
* If no run-time keys to calculate, go ahead and pass the scankeys to the
* index AM.
*/
if (node->iss_NumRuntimeKeys == 0) {
abs_idx_rescan(node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys,
node->iss_NumOrderByKeys);
}
}
/* ----------------------------------------------------------------
* ExecIndexScanInitializeWorker
*
* Copy relevant information from TOC into planstate.
* ----------------------------------------------------------------
*/
void ExecIndexScanInitializeWorker(IndexScanState *node, void *context)
{
ParallelIndexScanDesc piscan = NULL;
knl_u_parallel_context *cxt = (knl_u_parallel_context *)context;
for (int i = 0; i < cxt->pwCtx->queryInfo.piscan_num; i++) {
if (node->ss.ps.plan->plan_node_id == cxt->pwCtx->queryInfo.piscan[i]->plan_node_id) {
piscan = cxt->pwCtx->queryInfo.piscan[i];
break;
}
}
if (piscan == NULL) {
ereport(ERROR, (errmsg("could not find plan info, plan node id:%d", node->ss.ps.plan->plan_node_id)));
}
node->iss_ScanDesc = (AbsTblScanDesc)index_beginscan_parallel(node->ss.ss_currentRelation, node->iss_RelationDesc,
node->iss_NumScanKeys, node->iss_NumOrderByKeys, piscan);
/*
* If no run-time keys to calculate, go ahead and pass the scankeys to the
* index AM.
*/
if (node->iss_NumRuntimeKeys == 0) {
abs_idx_rescan(node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys,
node->iss_NumOrderByKeys);
}
}