openGauss资源池化支持多机并行

This commit is contained in:
quemingjian
2023-10-12 18:39:23 +08:00
parent c1aaeea5b2
commit ea7ff3627b
140 changed files with 10069 additions and 353 deletions

View File

@ -482,4 +482,7 @@ extern bool ResolveCminCmaxDuringDecoding(
struct HTAB* tuplecid_data, Snapshot snapshot, HeapTuple htup, Buffer buffer, CommandId* cmin, CommandId* cmax);
extern TableScanDesc heap_beginscan_internal(Relation relation, Snapshot snapshot, int nkeys, ScanKey key,
uint32 flags, ParallelHeapScanDesc parallel_scan, RangeScanInRedis rangeScanInRedis = {false, 0, 0});
#ifdef USE_SPQ
extern Relation try_table_open(Oid relationId, LOCKMODE lockmode);
#endif
#endif /* HEAPAM_H */

View File

@ -27,6 +27,10 @@
#include "access/relscan.h"
#include "nodes/execnodes.h"
#ifdef USE_SPQ
#include "access/spq_btbuild.h"
#endif
/* There's room for a 16-bit vacuum cycle ID in BTPageOpaqueData */
typedef uint16 BTCycleId;
@ -1104,6 +1108,9 @@ typedef struct BTWriteState {
BlockNumber btws_pages_alloced; /* # pages allocated */
BlockNumber btws_pages_written; /* # pages written out */
Page btws_zeropage; /* workspace for filling zeroes */
#ifdef USE_SPQ
SPQLeaderState *spqleader; /* spq btbuild leader */
#endif
} BTWriteState;
typedef struct BTOrderedIndexListElement {
@ -1242,6 +1249,10 @@ typedef struct {
* BTBuildState. Workers have their own spool and spool2, though.)
*/
BTLeader *btleader;
#ifdef USE_SPQ
/* spq btbuild leader */
SPQLeaderState *spqleader;
#endif
} BTBuildState;
/*
@ -1414,6 +1425,9 @@ extern void btree_check_third_page(Relation rel, Relation heap, bool need_heapti
extern int btree_num_keep_atts_fast(Relation rel, IndexTuple lastleft, IndexTuple firstright);
extern bool btree_allequalimage(Relation rel, bool debugmessage);
#ifdef USE_SPQ
extern void spq_load(BTWriteState wstate);
#endif
/*
* prototypes for functions in nbtxlog.c

View File

@ -0,0 +1,103 @@
/* -------------------------------------------------------------------------
*
* spq_btbuild.h
*
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/access/spq_btbuild.h
*
* -------------------------------------------------------------------------
*/
#ifdef USE_SPQ
#ifndef SPQ_BTBUILD_H
#define SPQ_BTBUILD_H
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "executor/spi.h"
#include "utils/portal.h"
/* Time unit conversion factors. */
#define NANOSECONDS_PER_MILLISECOND 1000000L
#define NANOSECONDS_PER_SECOND 1000000000L

/* Session-level GUC values controlling the SPQ btree build. */
#define SPQ_BATCH_SIZE (u_sess->attr.attr_spq.spq_batch_size)
#define SPQ_MEM_SIZE (u_sess->attr.attr_spq.spq_mem_size)
#define SPQ_QUEUE_SIZE (u_sess->attr.attr_spq.spq_queue_size)
#define ENABLE_SPQ (u_sess->attr.attr_spq.spq_enable_btbuild)

/*
 * Index of the slot following slot i, wrapping at SPQ_QUEUE_SIZE.
 * The argument is parenthesized so that expressions such as GET_IDX(a | b)
 * are incremented as a whole.
 */
#define GET_IDX(i) (((i) + 1) % SPQ_QUEUE_SIZE)

/*
 * Bytes occupied by one buffer: the IndexTupleBuffer header, SPQ_BATCH_SIZE
 * Size entries, and SPQ_MEM_SIZE * (INDEX_MAX_KEYS + 1) bytes of memory.
 */
#define SPQ_BUFFER_SIZE \
    (sizeof(IndexTupleBuffer) + sizeof(Size) * SPQ_BATCH_SIZE + sizeof(char) * SPQ_MEM_SIZE * (INDEX_MAX_KEYS + 1))

/*
 * Bytes occupied by all SPQ_QUEUE_SIZE buffers. The whole product is
 * parenthesized so the macro can be used safely inside larger expressions
 * (for example as a divisor).
 */
#define SPQ_SHARED_SIZE (SPQ_BUFFER_SIZE * SPQ_QUEUE_SIZE)

/* Address of buffer number INDEX inside SPQ_SHARED->addr. */
#define GET_BUFFER(SPQ_SHARED, INDEX) \
    ((IndexTupleBuffer *)((char *)(SPQ_SHARED)->addr + SPQ_BUFFER_SIZE * (INDEX)))

/* Start of the memory area of a buffer: the bytes after its SPQ_BATCH_SIZE Size entries. */
#define GET_BUFFER_MEM(ITUPLE) ((char *)(ITUPLE)->addr + sizeof(Size) * SPQ_BATCH_SIZE)
/*
 * State shared between the leader and the workers of an SPQ btree build.
 *
 * The structure is followed in memory by the index tuple buffers, which the
 * GET_BUFFER macro addresses through the trailing `addr` member.
 */
typedef struct SPQSharedContext {
    /*
     * These fields are not modified during the sort.  They primarily exist
     * for the benefit of worker processes that need to create BTSpool state
     * corresponding to that used by the leader.
     */
    Oid heaprelid;
    Oid indexrelid;
    bool isunique;
    bool isconcurrent;

    slock_t mutex;
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;

    volatile bool done;     /* flag if all tuples have been fetched */
    volatile int bufferidx; /* buffer index */
    volatile int consumer;  /* buffer consume */
    volatile int producer;  /* buffer produce */

    /*
     * dop is placed before snapshot so that the variable-length area below
     * starts right after a Snapshot member.
     * NOTE(review): assumes Snapshot is a pointer typedef, which keeps addr
     * pointer-aligned -- confirm.
     */
    int dop;
    Snapshot snapshot;

    /*
     * Variable-length buffer area.  It must be the LAST member: the buffers
     * are written starting at this address, so any member declared after it
     * would share storage with the first buffer and be overwritten.
     */
    char addr[0]; /* varlen */
} SPQSharedContext;
/*
 * Header of one index tuple buffer. GET_BUFFER locates these headers inside
 * SPQSharedContext's addr area, and GET_BUFFER_MEM treats the bytes after the
 * first SPQ_BATCH_SIZE entries of `addr` as the buffer's memory area.
 */
typedef struct IndexTupleBuffer {
volatile int queue_size; /* the number of index tuples in this buffer */
volatile int idx; /* current tuple index in this buffer */
volatile int offset; /* memory offset in this buffer */
Size addr[0]; /* varlen offset: ituple + mem */
} IndexTupleBuffer;
/*
 * Leader-side state of an SPQ btree build; this is the type consumed by
 * spq_consume().
 * NOTE(review): the per-field notes below are inferred from names and types
 * only -- confirm against the implementation.
 */
typedef struct SPQLeaderState {
Relation heap; /* presumably the table being indexed */
Relation index; /* presumably the index being built */
IndexTupleBuffer *buffer; /* presumably the buffer currently being read */
SPQSharedContext *shared; /* state shared with the workers */
double processed; /* presumably a count of tuples handled so far */
Snapshot snapshot;
} SPQLeaderState;
/*
 * Worker-side state of an SPQ btree build.
 * NOTE(review): plan/portal/sql are SPI types, so the worker presumably runs
 * an SPI query to fetch tuples -- confirm against the implementation.
 */
typedef struct SPQWorkerState {
SPIPlanPtr plan;
Portal portal;
StringInfo sql;
Relation heap;
Relation index;
SPQSharedContext *shared; /* state shared with the leader */
uint64 processed; /* location from the last produce buffer */
bool all_fetched; /* flag if worker has managed all tuples */
} SPQWorkerState;
/*
 * Public interface of the SPQ btree build. The implementations are not part
 * of this header; the notes below are inferred from the signatures only.
 */
/* Presumably the entry point that builds `index` over `heap` -- confirm. */
Datum spqbtbuild(Relation heap, Relation index, IndexInfo *indexInfo);
/* Returns an IndexTuple for the given leader state; presumably the next tuple produced by the workers -- confirm. */
IndexTuple spq_consume(SPQLeaderState *spqleader);
/* Presumably reports whether the SPQ btree build may be used for `rel` -- confirm. */
bool enable_spq_btbuild(Relation rel);
/* Variant of the check above; the "cic" suffix presumably means CREATE INDEX CONCURRENTLY -- confirm. */
bool enable_spq_btbuild_cic(Relation rel);
#endif // SPQ_BTBUILD_H
#endif

View File

@ -62,5 +62,9 @@ typedef FormData_pg_foreign_server *Form_pg_foreign_server;
#define Anum_pg_foreign_server_srvacl 6
#define Anum_pg_foreign_server_srvoptions 7
#ifdef USE_SPQ
#define GS_EXTTABLE_SERVER_NAME "gs_exttable_server"
#endif
#endif /* PG_FOREIGN_SERVER_H */

View File

@ -471,6 +471,11 @@ typedef FormData_pg_proc *Form_pg_proc;
#define PROARGMODE_VARIADIC 'v'
#define PROARGMODE_TABLE 't'
#ifdef USE_SPQ
#define PRODATAACCESS_NONE 'n'
#define PRODATAACCESS_ANY 'a'
#endif
#define PROC_LIB_PATH "$libdir/"
#define PORC_PLUGIN_LIB_PATH "$libdir/pg_plugin/"
#define PORC_SRC_LIB_PATH "$libdir/proc_srclib/"

View File

@ -158,6 +158,7 @@ typedef struct PlanTableEntry {
#define OPTIONSLEN 256
#define OBJECTLEN 31
#define PROJECTIONLEN 4001
#define SPQNODENAMELEN 256
/* plan_table_data column definition. */
typedef struct PlanTableData {
@ -473,6 +474,9 @@ typedef struct ExplainState {
bool is_explain_gplan;
char* opt_model_name;
ExplainFRSqlState es_frs; /* explain state for remote sql of foreign scan. */
#ifdef USE_SPQ
int current_id;
#endif
} ExplainState;
/* Hook for plugins to get control in explain_get_index_name() */

View File

@ -236,4 +236,7 @@ extern void SetPartionIndexType(IndexStmt* stmt, Relation rel, bool is_alter_tab
extern bool ConstraintSatisfyAutoIncrement(HeapTuple tuple, TupleDesc desc, AttrNumber attrnum, char contype);
extern void CheckRelAutoIncrementIndex(Oid relid, LOCKMODE lockmode);
extern void RebuildDependViewForProc(Oid proc_oid);
#ifdef USE_SPQ
extern void spq_btbuild_update_pg_class(Relation heap, Relation index);
#endif
#endif /* TABLECMDS_H */

View File

@ -74,6 +74,12 @@ public:
/* Get nodeIdx of producer by nodename. */
int getNodeIdx(const char* nodename);
#ifdef USE_SPQ
/* Get expectProducer nodeName */
char* getExpectProducerNodeName();
void setPstmt(PlannedStmt* p_stmt);
#endif
/* Get shared context for local stream. */
inline StreamSharedContext* getSharedContext()
{
@ -96,6 +102,9 @@ private:
void updateTransportInfo(StreamValue* val);
private:
#ifdef USE_SPQ
PlannedStmt* m_plan;
#endif
/* Current producer number. */
int m_currentProducerNum;

View File

@ -106,6 +106,11 @@ public:
/* Send tuple with Roundrobin. */
void roundRobinStream(TupleTableSlot* tuple, DestReceiver* self);
#ifdef USE_SPQ
/* Send batch with Roundrobin. */
void roundRobinStream(VectorBatch* batch);
#endif
/* Local roundrobin the tuple through memory. */
void localRoundRobinStream(TupleTableSlot* tuple);
@ -271,6 +276,9 @@ public:
m_threadInit = flag;
}
/* save expr context to producer. */
void setEcontext(ExprContext* econtext);
void setUniqueSQLKey(uint64 unique_sql_id, Oid unique_user_id, uint32 unique_cn_id);
void setGlobalSessionId(GlobalSessionId* globalSessionId);
void getGlobalSessionId(GlobalSessionId* globalSessionId);
@ -352,6 +360,9 @@ private:
template<int keyNum, int distrType>
void redistributeTupleChannel(TupleTableSlot* tuple);
template<int distrType>
void redistributeTupleChannelWithExpr(TupleTableSlot* tuple);
/* Choose which channel to send by hash value. */
template<int distrType>
inline int ChannelLocalizer(ScalarValue hashValue, int Dop, int nodeSize);
@ -508,6 +519,10 @@ private:
/* global session id */
GlobalSessionId m_globalSessionId;
bool m_hasExprKey;
List* m_exprkeystate;
ExprContext* m_econtext;
};
extern THR_LOCAL StreamProducer* streamProducer;

View File

@ -115,6 +115,9 @@ typedef struct StreamState {
int64* spill_size;
void* sortstate; /* merge sort for stream */
bool receive_message; /* The stream consumer has receive message from then producer */
#ifdef USE_SPQ
bool skip_direct_distribute_result;
#endif
} StreamState;
extern StreamState* ExecInitStream(Stream* node, EState* estate, int eflags);

View File

@ -841,4 +841,7 @@ private:
bool m_smpEnabled;
};
#ifdef USE_SPQ
extern bool IsJoinExprNull(List *joinExpr, ExprContext *econtext);
#endif
#endif /* EXECUTOR_H */

View File

@ -724,11 +724,15 @@ public:
/* get ThreadInstrumentation */
ThreadInstrumentation *getThreadInstrumentation(int idx, int planNodeId, int smpId)
{
ThreadInstrumentation *threadInstr =
#ifdef ENABLE_MULTIPLE_NODES
getThreadInstrumentationCN(idx, planNodeId, smpId);
ThreadInstrumentation *threadInstr = NULL;
#if defined(ENABLE_MULTIPLE_NODES) || defined(USE_SPQ)
if (t_thrd.spq_ctx.spq_role != ROLE_UTILITY) {
threadInstr = getThreadInstrumentationCN(idx, planNodeId, smpId);
} else {
threadInstr = getThreadInstrumentationDN(planNodeId, smpId);
}
#else
getThreadInstrumentationDN(planNodeId, smpId);
threadInstr = getThreadInstrumentationDN(planNodeId, smpId);
#endif /* ENABLE_MULTIPLE_NODES */
return threadInstr;
}

View File

@ -0,0 +1,28 @@
/* -------------------------------------------------------------------------
*
* nodeAssertOp.h
*
*
* Portions Copyright (c) 2023 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/executor/nodeAssertOp.h
*
* -------------------------------------------------------------------------
*/
#ifndef NODEASSERTOP_H
#define NODEASSERTOP_H
#ifdef USE_SPQ
#include "nodes/execnodes.h"
/*
 * Executor interface of the AssertOp plan node (only declared when USE_SPQ is
 * defined). The function bodies are not visible in this header; the
 * Init / Exec / End / ReScan naming presumably mirrors the other executor
 * nodes -- confirm against the implementation.
 */
extern void ExecAssertOpExplainEnd(PlanState *planstate, struct StringInfoData *buf);
extern TupleTableSlot* ExecAssertOp(PlanState *node);
extern AssertOpState* ExecInitAssertOp(AssertOp *node, EState *estate, int eflags);
extern void ExecEndAssertOp(AssertOpState *node);
extern void ExecReScanAssertOp(AssertOpState *node);
#endif /* USE_SPQ */
#endif /* NODEASSERTOP_H */

View File

@ -29,6 +29,10 @@ extern HashJoinTable ExecHashTableCreate(Hash* node, List* hashOperators, bool k
extern void ExecHashTableDestroy(HashJoinTable hashtable);
extern void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot* slot, uint32 hashvalue, int planid, int dop,
Instrumentation* instrument = NULL);
#ifdef USE_SPQ
extern bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext* econtext, List* hashkeys, bool outer_tuple,
bool keep_nulls, uint32* hashvalue, bool *hashkeys_null);
#endif
extern bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext* econtext, List* hashkeys, bool outer_tuple,
bool keep_nulls, uint32* hashvalue);
extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int* bucketno, int* batchno);

View File

@ -0,0 +1,29 @@
/*-------------------------------------------------------------------------
*
* nodeSequence.h
* header file for nodeSequence.cpp.
*
* Portions Copyright (c) 2023 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 2012 - 2022, EMC/Greenplum
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
*
*
* IDENTIFICATION
* src/include/executor/node/nodeSequence.h
*
*-------------------------------------------------------------------------
*/
#ifndef NODESEQUENCE_H
#define NODESEQUENCE_H
#ifdef USE_SPQ
#include "executor/tuptable.h"
#include "nodes/execnodes.h"
/*
 * Executor interface of the Sequence plan node, implemented in
 * nodeSequence.cpp (only declared when USE_SPQ is defined). Note that
 * ExecSequence takes a generic PlanState pointer, while the other three
 * functions take or return SequenceState.
 */
extern SequenceState *ExecInitSequence(Sequence *node, EState *estate, int eflags);
extern TupleTableSlot *ExecSequence(PlanState *pstate);
extern void ExecReScanSequence(SequenceState *node);
extern void ExecEndSequence(SequenceState *node);
#endif /* USE_SPQ */
#endif

View File

@ -0,0 +1,38 @@
/*-------------------------------------------------------------------------
*
* nodeShareInputScan.h
*
* Portions Copyright (c) 2023 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 2012-2021 VMware, Inc. or its affiliates.
* Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/include/executor/node/nodeShareInputScan.h
*
*-------------------------------------------------------------------------
*/
#ifndef NODESHAREINPUTSCAN_H
#define NODESHAREINPUTSCAN_H
#ifdef USE_SPQ
#include "nodes/execnodes.h"
#include "storage/sharedfileset.h"
/* Executor interface of the ShareInputScan plan node (only declared when USE_SPQ is defined). */
extern ShareInputScanState *ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags);
extern void ExecEndShareInputScan(ShareInputScanState *node);
extern void ExecReScanShareInputScan(ShareInputScanState *node);
extern TupleTableSlot *ExecShareInputScan(PlanState *pstate);
/* Shared-memory sizing and initialisation hooks for ShareInputScan; presumably called during shared memory setup -- confirm. */
extern Size ShareInputShmemSize(void);
extern void ShareInputShmemInit(void);
/* Accessor for the SharedFileSet used by ShareInputScan. */
extern SharedFileSet *get_shareinput_fileset(void);
/*
 * Tuplestore helpers that operate on a SharedFileSet plus a file name.
 * NOTE(review): these are tuplestore_* functions declared in this node header
 * rather than in the tuplestore header -- confirm that is intentional.
 */
extern void tuplestore_make_shared(Tuplestorestate *state, SharedFileSet *fileset, const char *filename);
extern void tuplestore_freeze(Tuplestorestate *state);
extern Tuplestorestate *tuplestore_open_shared(SharedFileSet *fileset, const char *filename);
#endif /* USE_SPQ */
#endif /* NODESHAREINPUTSCAN_H */

View File

@ -0,0 +1,32 @@
/* -------------------------------------------------------------------------
*
* nodeSpqSeqscan.h
*
* Portions Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* src/include/executor/node/nodeSpqSeqscan.h
*
* -------------------------------------------------------------------------
*/
#ifdef USE_SPQ
#ifndef NODESPQSEQSCAN_H
#define NODESPQSEQSCAN_H
#include "nodes/execnodes.h"
/*
 * Function pointer types for the four SpqSeqScan executor operations
 * (init, exec, end, rescan).
 */
typedef SpqSeqScanState* (*init_spqscan_hook_type)(SpqSeqScan* node, EState* estate, int eflags);
typedef TupleTableSlot* (*exec_spqscan_hook_type)(PlanState* node);
typedef void (*end_spqscan_hook_type)(SpqSeqScanState* node);
typedef void (*spqscan_rescan_hook_type)(SpqSeqScanState* node);
/*
 * Hook variables of the types above, declared THR_LOCAL; they are defined
 * elsewhere. Presumably installed by the SPQ plugin at load time -- confirm.
 */
extern THR_LOCAL init_spqscan_hook_type init_spqscan_hook;
extern THR_LOCAL exec_spqscan_hook_type exec_spqscan_hook;
extern THR_LOCAL end_spqscan_hook_type end_spqscan_hook;
extern THR_LOCAL spqscan_rescan_hook_type spqscan_rescan_hook;
// Functions that are unchanged compared with the plain seqscan implementation
extern void ExecSpqSeqMarkPos(SpqSeqScanState* node);
extern void ExecSpqSeqRestrPos(SpqSeqScanState* node);
#endif // NODESPQSEQSCAN_H
#endif

View File

@ -249,5 +249,12 @@ extern void heap_slot_getsomeattrs(TupleTableSlot* slot, int attnum);
extern bool heap_slot_attisnull(TupleTableSlot* slot, int attnum);
extern void heap_slot_formbatch(TupleTableSlot* slot, struct VectorBatch* batch, int cur_rows, int attnum);
#ifdef USE_SPQ
extern Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull);
extern void slot_getsomeattrs(TupleTableSlot *slot, int attnum);
extern void slot_getallattrs(TupleTableSlot *slot);
extern Datum heap_copy_tuple_as_datum(HeapTuple tuple, TupleDesc tupleDesc);
#endif
#endif /* !FRONTEND_PARSER */
#endif /* TUPTABLE_H */

View File

@ -444,4 +444,7 @@ extern void AdvanceFDWUpperPlan(FDWUpperRelCxt* ufdwCxt, UpperRelationKind stage
(strncmp(passwd, ENCRYPT_STR_PREFIX, strlen(ENCRYPT_STR_PREFIX)) == 0 && \
strlen(passwd) >= MIN_ENCRYPTED_PASSWORD_LENGTH)
#ifdef USE_SPQ
bool rel_is_external_table(Oid relid);
#endif
#endif /* FOREIGN_H */

View File

@ -154,6 +154,15 @@ typedef enum knl_thread_role {
SW_SENDER
} knl_thread_role;
#ifdef USE_SPQ
/*
 * Role a thread plays in SPQ execution. Stored in knl_thread_arg.spq_role and
 * in knl_t_spq_context.spq_role.
 * NOTE(review): ROLE_QUERY_COORDINTOR is misspelled ("COORDINATOR"); the
 * identifier is left as-is because other code refers to it by this name.
 */
typedef enum {
ROLE_UTILITY = 0, /* Operating as a simple database engine */
ROLE_QUERY_COORDINTOR, /* Operating as the parallel query dispatcher */
ROLE_QUERY_EXECUTOR, /* Operating as a parallel query executor */
ROLE_UNDEFINED /* Should never see this role in use */
} SpqRole;
#endif
/*
* It is an 64bit identifier in Linux x64 system. There are many legacy
* code assumes the original pid is 32 bit where we replace with threadId.
@ -167,6 +176,9 @@ typedef struct knl_thread_arg {
char* save_para;
void* payload;
void* t_thrd;
#ifdef USE_SPQ
SpqRole spq_role;
#endif
union {
struct syslog_thread {
int syslog_handle;

View File

@ -51,5 +51,8 @@
#include "knl_guc/knl_instance_attr_resource.h"
#include "knl_guc/knl_session_attr_common.h"
#include "knl_guc/knl_instance_attr_common.h"
#ifdef USE_SPQ
#include "knl_guc/knl_session_attr_spq.h"
#endif
#endif /* SRC_INCLUDE_KNL_KNL_GUC_H_ */

View File

@ -0,0 +1,211 @@
/*
* Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* knl_session_attr_spq.h
* Data struct to store all knl_session_attr_spq GUC variables.
*
* When anyone try to added variable in this file, which means add a guc
* variable, there are several rules needed to obey:
*
* add variable to struct 'knl_@level@_attr_@group@'
*
* @level@:
* 1. instance: the level of guc variable is PGC_POSTMASTER.
* 2. session: the other level of guc variable.
*
* @group@: sql, storage, security, network, memory, resource, common, spq
* select the group according to the type of guc variable.
*
*
* IDENTIFICATION
* src/include/knl/knl_guc/knl_session_attr_spq.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef SRC_INCLUDE_KNL_KNL_SESSION_ATTR_MPP_H_
#define SRC_INCLUDE_KNL_KNL_SESSION_ATTR_MPP_H_
#include "knl/knl_guc/knl_guc_common.h"
#ifdef PGXC
#include "pgxc/nodemgr.h"
#endif
struct NodeDefinition;
/*
 * Session-level storage for the SPQ GUC variables. It is embedded in
 * knl_session_attr as the member `attr_spq` when USE_SPQ is defined.
 * Each member holds the current value of one GUC; the section comments below
 * group them by purpose.
 */
typedef struct knl_session_attr_spq {
/* Optimizer related gucs */
bool gauss_enable_spq;
bool spq_optimizer_log;
int spq_optimizer_minidump;
int spq_optimizer_cost_model;
bool spq_optimizer_metadata_caching;
int spq_optimizer_mdcache_size;
bool spq_optimizer_use_gauss_allocators;
/* Optimizer debugging GUCs */
bool spq_optimizer_print_query;
bool spq_optimizer_print_plan;
bool spq_optimizer_print_xform;
bool spq_optimizer_print_xform_results;
bool spq_optimizer_print_memo_after_exploration;
bool spq_optimizer_print_memo_after_implementation;
bool spq_optimizer_print_memo_after_optimization;
bool spq_optimizer_print_job_scheduler;
bool spq_optimizer_print_expression_properties;
bool spq_optimizer_print_group_properties;
bool spq_optimizer_print_optimization_context;
bool spq_optimizer_print_optimization_stats;
bool spq_optimizer_print_optimization_cost;
/* array of xforms disable flags */
#define OPTIMIZER_XFORMS_COUNT 400 /* number of transformation rules */
bool spq_optimizer_xforms[OPTIMIZER_XFORMS_COUNT];
/* GUCs to tell Optimizer to enable a physical operator */
bool spq_optimizer_enable_nljoin;
bool spq_optimizer_enable_indexjoin;
bool spq_optimizer_enable_motions_masteronly_queries;
bool spq_optimizer_enable_motions;
bool spq_optimizer_enable_motion_broadcast;
bool spq_optimizer_enable_motion_gather;
bool spq_optimizer_enable_motion_redistribute;
bool spq_optimizer_discard_redistribute_hashjoin;
bool spq_optimizer_enable_sort;
bool spq_optimizer_enable_materialize;
bool spq_optimizer_enable_partition_propagation;
bool spq_optimizer_enable_partition_selection;
bool spq_optimizer_enable_outerjoin_rewrite;
bool spq_optimizer_enable_multiple_distinct_aggs;
bool spq_optimizer_enable_direct_dispatch;
bool spq_optimizer_enable_hashjoin_redistribute_broadcast_children;
bool spq_optimizer_enable_broadcast_nestloop_outer_child;
bool spq_optimizer_enable_streaming_material;
bool spq_optimizer_enable_gather_on_segment_for_dml;
bool spq_optimizer_enable_assert_maxonerow;
bool spq_optimizer_enable_constant_expression_evaluation;
bool spq_optimizer_enable_bitmapscan;
bool spq_optimizer_enable_outerjoin_to_unionall_rewrite;
bool spq_optimizer_enable_ctas;
bool spq_optimizer_enable_partial_index;
bool spq_optimizer_enable_dml;
bool spq_optimizer_enable_dml_triggers;
bool spq_optimizer_enable_dml_constraints;
bool spq_optimizer_enable_master_only_queries;
bool spq_optimizer_enable_hashjoin;
bool spq_optimizer_enable_dynamictablescan;
bool spq_optimizer_enable_indexscan;
bool spq_optimizer_enable_indexonlyscan;
bool spq_optimizer_enable_tablescan;
bool spq_optimizer_enable_seqsharescan;
bool spq_optimizer_enable_shareindexscan;
bool spq_optimizer_enable_hashagg;
bool spq_optimizer_enable_groupagg;
bool spq_optimizer_expand_fulljoin;
bool spq_optimizer_enable_mergejoin;
bool spq_optimizer_prune_unused_columns;
bool spq_optimizer_enable_redistribute_nestloop_loj_inner_child;
bool spq_optimizer_force_comprehensive_join_implementation;
bool spq_optimizer_enable_replicated_table;
/* Optimizer plan enumeration related GUCs */
bool spq_optimizer_enumerate_plans;
bool spq_optimizer_sample_plans;
int spq_optimizer_plan_id;
int spq_optimizer_samples_number;
/* Cardinality estimation related GUCs used by the Optimizer */
bool spq_optimizer_extract_dxl_stats;
bool spq_optimizer_extract_dxl_stats_all_nodes;
bool spq_optimizer_print_missing_stats;
double spq_optimizer_damping_factor_filter;
double spq_optimizer_damping_factor_join;
double spq_optimizer_damping_factor_groupby;
bool spq_optimizer_dpe_stats;
bool spq_optimizer_enable_derive_stats_all_groups;
/* Costing related GUCs used by the Optimizer */
int spq_optimizer_segments;
int spq_optimizer_penalize_broadcast_threshold;
double spq_optimizer_cost_threshold;
double spq_optimizer_nestloop_factor;
double spq_optimizer_sort_factor;
double spq_optimizer_share_tablescan_factor;
double spq_optimizer_share_indexscan_factor;
double spq_optimizer_hashjoin_spilling_mem_threshold;
double spq_optimizer_hashjoin_inner_cost_factor;
/* Optimizer hints */
int spq_optimizer_join_arity_for_associativity_commutativity;
int spq_optimizer_array_expansion_threshold;
int spq_optimizer_join_order_threshold;
int spq_optimizer_join_order;
int spq_optimizer_cte_inlining_bound;
int spq_optimizer_push_group_by_below_setop_threshold;
int spq_optimizer_xform_bind_threshold;
int spq_optimizer_skew_factor;
bool spq_optimizer_force_multistage_agg;
bool spq_optimizer_force_three_stage_scalar_dqa;
bool spq_optimizer_force_expanded_distinct_aggs;
bool spq_optimizer_force_agg_skew_avoidance;
bool spq_optimizer_penalize_skew;
bool spq_optimizer_prune_computed_columns;
bool spq_optimizer_push_requirements_from_consumer_to_producer;
bool spq_optimizer_enforce_subplans;
bool spq_optimizer_use_external_constant_expression_evaluation_for_ints;
bool spq_optimizer_apply_left_outer_to_union_all_disregarding_stats;
bool spq_optimizer_remove_superfluous_order;
bool spq_optimizer_remove_order_below_dml;
bool spq_optimizer_multilevel_partitioning;
bool spq_optimizer_parallel_union;
bool spq_optimizer_array_constraints;
bool spq_optimizer_cte_inlining;
bool spq_optimizer_enable_space_pruning;
bool spq_optimizer_enable_associativity;
bool spq_optimizer_enable_eageragg;
bool spq_optimizer_enable_orderedagg;
bool spq_optimizer_enable_range_predicate_dpe;
/* NOTE(review): the remaining members have no section comment in the original; grouping below is inferred from names. */
bool spq_enable_pre_optimizer_check;
bool spq_enable_result_hash_filter;
bool spq_debug_print_full_dtm;
bool spq_debug_cancel_print;
bool spq_print_direct_dispatch_info;
bool spq_log_dispatch_stats;
int spq_scan_unit_size;
int spq_scan_unit_bit;
char *gauss_cluster_map;
/* enable spq btbuild */
/* spq_enable_btbuild is read through the ENABLE_SPQ macro in access/spq_btbuild.h */
bool spq_enable_btbuild;
bool spq_enable_btbuild_cic;
/* read through SPQ_BATCH_SIZE / SPQ_MEM_SIZE / SPQ_QUEUE_SIZE in access/spq_btbuild.h */
int spq_batch_size;
int spq_mem_size;
int spq_queue_size;
} knl_session_attr_spq;
/*
 * Per-thread SPQ state; embedded in knl_thrd_context as `spq_ctx` when
 * USE_SPQ is defined.
 * TODO (carried over from the original comment): SPQ thread role.
 * NOTE(review): SpqRole is declared under #ifdef USE_SPQ in another header and
 * is not brought in by this file's own includes -- this header relies on
 * being included only in USE_SPQ builds, after that declaration.
 */
typedef struct knl_t_spq_context {
SpqRole spq_role; /* role of this thread, see SpqRole */
uint64 spq_session_id;
int current_id;
bool skip_direct_distribute_result;
int num_nodes; /* presumably the number of entries in nodesDefinition -- confirm */
NodeDefinition* nodesDefinition;
} knl_t_spq_context;
#endif /* SRC_INCLUDE_KNL_KNL_SESSION_ATTR_MPP_H_ */

View File

@ -82,6 +82,9 @@ typedef struct knl_session_attr {
knl_session_attr_memory attr_memory;
knl_session_attr_resource attr_resource;
knl_session_attr_common attr_common;
#ifdef USE_SPQ
knl_session_attr_spq attr_spq;
#endif
} knl_session_attr;
typedef struct knl_u_stream_context {
@ -2660,6 +2663,54 @@ typedef struct knl_u_mot_context {
} knl_u_mot_context;
#endif
#ifdef USE_SPQ
/*
 * Forward declarations of the SPQ optimizer classes referenced by pointer in
 * knl_u_spq_context below, so that this header does not need the optimizer's
 * own headers.
 */
namespace spqdxl {
class CDXLMemoryManager;
class CDXLTokens;
}
namespace spqos {
class CMemoryPool;
class CMemoryPoolManager;
class CWorkerPoolManager;
template <class T, class K>class CCache;
}
namespace spqmd {
class IMDCacheObject;
}
namespace spqopt {
class CMDKey;
}
/*
 * Session-level SPQ context; embedded in knl_session_context as `spq_cxt`
 * when USE_SPQ is defined. Most members are opaque pointers into the SPQ
 * optimizer's runtime (memory pools, managers, metadata cache).
 */
typedef struct knl_u_spq_context {
/* dxl information */
spqdxl::CDXLMemoryManager* dxl_memory_manager;
spqos::CMemoryPool* pmpXerces;
spqos::CMemoryPool* pmpDXL;
uintptr_t m_ulpInitDXL;
uintptr_t m_ulpShutdownDXL;
void *m_pstrmap;
void *m_pxmlszmap;
spqos::CMemoryPool* m_mp;
spqdxl::CDXLMemoryManager* m_dxl_memory_manager;
/* memory pool manager */
spqos::CMemoryPoolManager* m_memory_pool_mgr;
/* worker pool manager */
spqos::CWorkerPoolManager* m_worker_pool_manager;
/* mdcache */
spqos::CCache<spqmd::IMDCacheObject *, spqopt::CMDKey *> *m_pcache;
uint64 m_ullCacheQuota;
int spq_node_all_configs_size;
int spq_node_configs_size;
MemoryContext spq_worker_context;
MemoryContext s_tupSerMemCtxt;
int32 spq_max_tuple_chunk_size;
} knl_u_spq_context;
#endif
typedef struct knl_u_gtt_context {
bool gtt_cleaner_exit_registered;
HTAB* gtt_storage_local_hash;
@ -2910,6 +2961,10 @@ typedef struct knl_session_context {
knl_u_mot_context mot_cxt;
#endif
#ifdef USE_SPQ
knl_u_spq_context spq_cxt;
#endif
/* instrumentation */
knl_u_unique_sql_context unique_sql_cxt;
knl_u_user_login_context user_login_cxt;

View File

@ -939,6 +939,10 @@ typedef struct knl_t_shemem_ptr_context {
struct HTAB* undoGroupLinkMap;
/* Maintain an image of DCF paxos index file */
struct DCFData *dcfData;
#ifdef USE_SPQ
/* shared memory hash table holding 'shareinput_Xslice_state' entries */
HTAB *shareinput_Xslice_hash;
#endif
} knl_t_shemem_ptr_context;
typedef struct knl_t_cstore_context {
@ -3559,6 +3563,9 @@ typedef struct knl_thrd_context {
knl_t_dms_context dms_cxt;
knl_t_ondemand_xlog_copy_context ondemand_xlog_copy_cxt;
knl_t_rc_context rc_cxt;
#ifdef USE_SPQ
knl_t_spq_context spq_ctx;
#endif
knl_t_dms_auxiliary_context dms_aux_cxt;
} knl_thrd_context;

View File

@ -136,6 +136,7 @@ extern const uint32 INDEX_HINT_VERSION_NUM;
extern const uint32 CREATE_TABLE_AS_VERSION_NUM;
extern const uint32 GB18030_2022_VERSION_NUM;
extern const uint32 PARTITION_ACCESS_EXCLUSIVE_LOCK_UPGRADE_VERSION;
extern const uint32 SPQ_VERSION_NUM;
extern void register_backend_version(uint32 backend_version);
extern bool contain_backend_version(uint32 version_number);

View File

@ -722,6 +722,9 @@ typedef struct EState {
bool have_current_xact_date; /* Check whether dirty reads exist in the cursor rollback scenario. */
int128 first_autoinc; /* autoinc has increased during this execution */
int result_rel_index; /* which result_rel_info to be excuted when multiple-relation modified. */
#ifdef USE_SPQ
List *es_sharenode;
#endif
} EState;
/*
@ -1834,6 +1837,43 @@ typedef struct ScanState {
*/
typedef ScanState SeqScanState;
#ifdef USE_SPQ
/*
 * SpqSeqScanState
 *
 * Executor state of the SpqSeqScan node: a plain SeqScanState followed by two
 * opaque pointers. The concrete types behind pageManager and blockManager are
 * not visible here.
 */
typedef struct SpqSeqScanState {
SeqScanState ss; /* must stay first so the node can be treated as a SeqScanState -- presumably relied on by casts, confirm */
void* pageManager;
void* blockManager;
} SpqSeqScanState;
/* Executor state of the AssertOp node; it carries nothing beyond the generic PlanState. */
typedef struct AssertOpState {
PlanState ps;
} AssertOpState;
/* ----------------
 * State of each scanner of the ShareInput node
 *
 * NOTE(review): the field notes below are inferred from names and types;
 * shareinput_local_state and shareinput_Xslice_reference are only
 * forward-referenced here.
 * ----------------
 */
typedef struct ShareInputScanState {
ScanState ss;
Tuplestorestate *ts_state; /* tuplestore used by this scanner */
int ts_pos; /* presumably this scanner's read position in ts_state -- confirm */
struct shareinput_local_state *local_state;
struct shareinput_Xslice_reference *ref;
bool isready;
} ShareInputScanState;
/* Executor state of the Sequence node: an array of subplan states plus a first-run flag. */
typedef struct SequenceState {
PlanState ps;
PlanState **subplans; /* array of subplan states */
int numSubplans; /* presumably the length of subplans -- confirm */
/*
* True if no subplan has been executed.
*/
bool initState;
} SequenceState;
#endif
/*
* These structs store information about index quals that don't have simple
* constant right-hand sides. See comments for ExecIndexBuildScanKeys()
@ -2204,6 +2244,12 @@ typedef struct NestLoopState {
bool nl_MatchedOuter;
bool nl_MaterialAll;
TupleTableSlot* nl_NullInnerTupleSlot;
#ifdef USE_SPQ
List *nl_InnerJoinKeys; /* list of ExprState nodes */
List *nl_OuterJoinKeys; /* list of ExprState nodes */
bool nl_innerSideScanned; /* set to true once we've scanned all inner tuples the first time */
bool prefetch_inner;
#endif
} NestLoopState;
/* ----------------
@ -2381,6 +2427,12 @@ typedef struct HashJoinState {
bool hj_streamBothSides;
bool hj_rebuildHashtable;
List* hj_hashCollations; /* list of collations OIDs */
#ifdef USE_SPQ
bool hj_nonequijoin; /* set true if force hash table to keep nulls */
bool hj_InnerEmpty; /* set to true if inner side is empty */
bool prefetch_inner;
bool is_set_op_join;
#endif
} HashJoinState;
/* ----------------------------------------------------------------
@ -2480,6 +2532,9 @@ typedef struct AggState {
AggStatePerAgg peragg; /* per-Aggref information */
MemoryContext* aggcontexts; /* memory context for long-lived data */
ExprContext* tmpcontext; /* econtext for input expressions */
#ifdef USE_SPQ
AggSplit aggsplittype; /* agg-splitting mode, see nodes.h */
#endif
AggStatePerAgg curperagg; /* identifies currently active aggregate */
bool input_done; /* indicates end of input */
bool agg_done; /* indicates completion of Agg scan */
@ -2628,7 +2683,11 @@ typedef struct HashState {
List* hashkeys; /* list of ExprState nodes */
int32 local_work_mem; /* work_mem local for this hash join */
int64 spill_size;
#ifdef USE_SPQ
bool hs_keepnull; /* Keep nulls */
bool hs_quit_if_hashkeys_null; /* quit building hash table if hashkeys are all null */
bool hs_hashkeys_null; /* found an instance wherein hashkeys are all null */
#endif
/* hashkeys is same as parent's hj_InnerHashKeys */
} HashState;

View File

@ -45,6 +45,10 @@ typedef enum NodeTag {
* TAGS FOR PLAN NODES (plannodes.h)
*/
T_Plan = 100,
#ifdef USE_SPQ
T_Plan_Start,
T_Result,
#endif
T_BaseResult,
T_ProjectSet,
T_ModifyTable,
@ -57,6 +61,9 @@ typedef enum NodeTag {
T_BitmapOr,
T_Scan,
T_SeqScan,
#ifdef USE_SPQ
T_SpqSeqScan,
#endif
T_IndexScan,
T_IndexOnlyScan,
T_BitmapIndexScan,
@ -114,6 +121,18 @@ typedef enum NodeTag {
T_CreateAppWorkloadGroupMappingStmt,
T_AlterAppWorkloadGroupMappingStmt,
T_DropAppWorkloadGroupMappingStmt,
#endif
#ifdef USE_SPQ
T_Sequence,
T_DynamicSeqScan,
T_DynamicBitmapHeapScan,
T_Motion,
T_ShareInputScan,
T_SplitUpdate,
T_AssertOp,
T_PartitionSelector,
T_PartitionPruneInfo,
T_Plan_End,
#endif
/* these aren't subclasses of Plan: */
T_NestLoopParam,
@ -159,6 +178,12 @@ typedef enum NodeTag {
T_BitmapOrState,
T_ScanState,
T_SeqScanState,
#ifdef USE_SPQ
T_SpqSeqScanState,
T_AssertOpState,
T_ShareInputScanState,
T_SequenceState,
#endif
T_IndexScanState,
T_IndexOnlyScanState,
T_BitmapIndexScanState,
@ -572,6 +597,9 @@ typedef enum NodeTag {
T_Constraint,
T_DefElem,
T_RangeTblEntry,
#ifdef USE_SPQ
T_RangeTblFunction,
#endif
T_WithCheckOption,
T_TableSampleClause,
T_TimeCapsuleClause,
@ -829,6 +857,9 @@ typedef enum NodeTag {
T_CondInterval,
T_IndexCI,
T_RelCI,
#ifdef USE_SPQ
T_GpPolicy,
#endif
T_CentroidPoint,
T_UserSetElem,
T_UserVar,
@ -1066,8 +1097,12 @@ typedef enum JoinType {
JOIN_RIGHT_ANTI, /* Right Anti join */
JOIN_LEFT_ANTI_FULL, /* unmatched LHS tuples */
JOIN_RIGHT_ANTI_FULL /* unmatched RHS tuples */
JOIN_RIGHT_ANTI_FULL, /* unmatched RHS tuples */
#ifdef USE_SPQ
JOIN_LASJ_NOTIN, /* Left Anti Semi Join with Not-In semantics: */
/* If any NULL values are produced by inner side, */
/* return no join results. Otherwise, same as LASJ */
#endif
/*
* We might need additional join types someday.
*/
@ -1118,5 +1153,38 @@ struct TdigestData {
double valuetoc;
CentroidPoint nodes[0];
};
#ifdef USE_SPQ
/*
 * Primitive aggregate-splitting options. Each one is a distinct bit so that
 * they can be OR-ed together into an AggSplit value.
 */
#define AGGSPLITOP_COMBINE (1 << 0)      /* substitute combinefn for transfn */
#define AGGSPLITOP_SKIPFINAL (1 << 1)    /* skip finalfn, return state as-is */
#define AGGSPLITOP_SERIALIZE (1 << 2)    /* apply serializefn to output */
#define AGGSPLITOP_DESERIALIZE (1 << 3)  /* apply deserializefn to input */
#define AGGSPLITOP_DEDUPLICATED (1 << 8)

/* Supported operating modes, i.e. the useful combinations of the bits above. */
typedef enum AggSplit {
    /* Basic, non-split aggregation. */
    AGGSTAGE_NORMAL = 0,

    /* Initial phase of partial aggregation: no finalfn, serialized output. */
    AGGSTAGE_PARTIAL = AGGSPLITOP_SERIALIZE | AGGSPLITOP_SKIPFINAL,

    /* Final phase of partial aggregation: deserialized input, combinefn. */
    AGGSTAGE_FINAL = AGGSPLITOP_DESERIALIZE | AGGSPLITOP_COMBINE,

    /*
     * The inputs have already been deduplicated for DISTINCT.
     * This is internal to the planner, it is never set on Aggrefs, and is
     * stripped away from Aggs in setrefs.c.
     */
    AGGSTAGE_DEDUPLICATED = AGGSPLITOP_DEDUPLICATED,

    /* All four split options at once: combine + deserialize on input, skip final + serialize on output. */
    AGGSTAGE_INTERMEDIATE = AGGSPLITOP_COMBINE | AGGSPLITOP_DESERIALIZE | AGGSPLITOP_SKIPFINAL | AGGSPLITOP_SERIALIZE,
} AggSplit;

/* Predicates testing whether an AggSplit value selects a given primitive option. */
#define DO_AGGSPLIT_COMBINE(as) (0 != ((as) & AGGSPLITOP_COMBINE))
#define DO_AGGSPLIT_SKIPFINAL(as) (0 != ((as) & AGGSPLITOP_SKIPFINAL))
#define DO_AGGSPLIT_SERIALIZE(as) (0 != ((as) & AGGSPLITOP_SERIALIZE))
#define DO_AGGSPLIT_DESERIALIZE(as) (0 != ((as) & AGGSPLITOP_DESERIALIZE))
#endif
#endif /* NODES_H */

View File

@ -207,9 +207,15 @@ typedef enum RTEKind {
#ifdef PGXC
RTE_REMOTE_DUMMY, /* RTEs created by remote plan reduction */
#endif /* PGXC */
RTE_RESULT /* RTE represents an empty FROM clause; such
RTE_RESULT, /* RTE represents an empty FROM clause; such
* RTEs are added by the planner, they're not
* present during parsing or rewriting */
#ifdef USE_SPQ
RTE_NAMEDTUPLESTORE,
RTE_TABLEFUNC, /* TableFunc(.., column list) */
RTE_VOID, /* CDB: deleted RTE */
RTE_TABLEFUNCTION /* CDB: Functions over multiset input */
#endif
} RTEKind;
typedef struct RangeTblEntry {
@ -374,6 +380,9 @@ typedef struct RangeTblEntry {
* Select * from table_name subpartition (subpartition_name);
* or delete from table_name partition (partition_name, ...)
*/
#ifdef USE_SPQ
bool forceDistRandom;
#endif
} RangeTblEntry;
/*
@ -2387,6 +2396,24 @@ typedef struct GetDiagStmt {
List *condNum;
} GetDiagStmt;
#ifdef USE_SPQ
/*
 * RangeTblFunction -
 *    Node describing one function call and the columns it contributes to a
 *    range table entry.
 */
typedef struct RangeTblFunction {
NodeTag type;
Node *funcexpr; /* expression tree for func call */
int funccolcount; /* number of columns it contributes to RTE */
/* These fields record the contents of a column definition list, if any: */
List *funccolnames; /* column names (list of String) */
List *funccoltypes; /* OID list of column type OIDs */
List *funccoltypmods; /* integer list of column typmods */
List *funccolcollations; /* OID list of column collation OIDs */
bytea *funcuserdata; /* function user data; assumed to be a bytea */
/* This is set during planning for use by the executor: */
Bitmapset *funcparams; /* PARAM_EXEC Param IDs affecting this func */
} RangeTblFunction;
#endif
extern inline NodeTag transform_node_tag(Node* raw_parse_tree)
{
if (!raw_parse_tree) {

View File

@ -1957,7 +1957,13 @@ typedef struct RightRefState {
/* ****************************************************************************
* Query Tree
* *************************************************************************** */
#ifdef USE_SPQ
/*
 * ParentStmtType -
 *    One-byte code with the PARENTSTMTTYPE_* values below; the Query struct
 *    carries it in its parentStmtType field.
 *    NOTE(review): the names suggest it records which kind of statement
 *    wraps the query (none, CREATE TABLE AS, COPY, REFRESH MATERIALIZED
 *    VIEW) -- confirm against the code that sets it.
 */
typedef uint8 ParentStmtType;
#define PARENTSTMTTYPE_NONE 0
#define PARENTSTMTTYPE_CTAS 1
#define PARENTSTMTTYPE_COPY 2
#define PARENTSTMTTYPE_REFRESH_MATVIEW 3
#endif
/*
* Query -
* Parse analysis turns all statements into a Query tree
@ -2090,6 +2096,12 @@ typedef struct Query {
RightRefState* rightRefState;
List* withCheckOptions; /* a list of WithCheckOption's */
List* indexhintList; /* a list of b mode index hint members */
#ifdef USE_SPQ
void* intoPolicy;
ParentStmtType parentStmtType;
bool is_support_spq;
#endif
} Query;
/* ----------------------

View File

@ -194,6 +194,11 @@ typedef struct PlannedStmt {
uint64 uniqueSQLId;
uint32 cause_type; /* Possible Slow SQL Risks in the Plan. */
#ifdef USE_SPQ
uint64 spq_session_id;
int current_id;
bool is_spq_optmized;
#endif
} PlannedStmt;
typedef struct NodeGroupInfoContext {
@ -635,6 +640,18 @@ typedef struct Scan {
*/
typedef Scan SeqScan;
#ifdef USE_SPQ
/* ----------------
 * Spq scan node
 *
 * Embeds a SeqScan as its first member and adds three boolean flags.
 * NOTE(review): the flag meanings below are inferred from their names only;
 * confirm against the SPQ planner/executor before relying on them.
 * ----------------
 */
typedef struct SpqSeqScan {
SeqScan scan;
/* presumably: the scan covers the whole table -- TODO confirm */
bool isFullTableScan;
/* presumably: adaptive scanning is enabled for this node -- TODO confirm */
bool isAdaptiveScan;
/* presumably: a direct-read path is used -- TODO confirm */
bool isDirectRead;
} SpqSeqScan;
#endif
/*
* ==========
* Column Store Scan nodes
@ -1059,6 +1076,10 @@ typedef struct Join {
List* nulleqqual;
uint32 skewoptimize;
#ifdef USE_SPQ
bool prefetch_inner; /* to avoid deadlock in spq */
bool is_set_op_join;
#endif
} Join;
/* ----------------
@ -1107,6 +1128,9 @@ typedef struct MergeJoin {
Oid* mergeCollations; /* per-clause OIDs of collations */
int* mergeStrategies; /* per-clause ordering (ASC or DESC) */
bool* mergeNullsFirst; /* per-clause nulls ordering */
#ifdef USE_SPQ
bool unique_outer; /*CDB-OLAP true => outer is unique in merge key */
#endif
} MergeJoin;
typedef struct VecMergeJoin : public MergeJoin {
@ -1125,6 +1149,9 @@ typedef struct HashJoin {
OpMemInfo mem_info; /* Memory info for inner hash table */
double joinRows;
List* hash_collations;
#ifdef USE_SPQ
List *hashqualclauses;
#endif
} HashJoin;
/* ----------------
@ -1135,6 +1162,10 @@ typedef struct Material {
Plan plan;
bool materialize_all; /* if all data should be materialized at the first time */
OpMemInfo mem_info; /* Memory info for material */
#ifdef USE_SPQ
bool spq_strict;
bool spq_shield_child_from_rescans;
#endif
} Material;
typedef struct VecMaterial : public Material {
@ -1233,6 +1264,9 @@ typedef enum SAggMethod {
typedef struct Agg {
Plan plan;
AggStrategy aggstrategy;
#ifdef USE_SPQ
AggSplit aggsplittype; /* agg-splitting mode, see nodes.h */
#endif
int numCols; /* number of grouping columns */
AttrNumber* grpColIdx; /* their indexes in the target list */
Oid* grpOperators; /* equality operators to compare with */
@ -1535,5 +1569,201 @@ typedef struct TrainModel {
MemoryContext cxt; // to store models
} TrainModel;
#ifdef USE_SPQ
/* ----------------
 * Result node -
 *    If no outer plan, evaluate a variable-free targetlist.
 *    If outer plan, return tuples from outer plan (after a level of
 *    projection as shown by targetlist).
 *
 * If resconstantqual isn't NULL, it represents a one-time qualification
 * test (i.e., one that doesn't depend on any variables from the outer plan,
 * so needs to be evaluated only once).
 *
 * If numHashFilterCols is non-zero, we compute a mpphash value based
 * on the columns listed in hashFilterColIdx for each input row. If the
 * target segment based on the hash doesn't match the current execution
 * segment, the row is discarded.
 * ----------------
 */
typedef struct Result {
Plan plan;
/* one-time qualification test, or NULL if there is none */
Node *resconstantqual;
/* number of hash filter columns; zero means no hash filtering */
int numHashFilterCols;
/* columns the hash value is computed from */
AttrNumber *hashFilterColIdx;
/* presumably one hash function OID per hash filter column -- TODO confirm */
Oid *hashFilterFuncs;
} Result;
/* -------------------------
* motion node structs
* -------------------------
*/
/*
 * MotionType: how a Motion node routes tuples between senders and receivers.
 * The numeric values are spelled out; they match the implicit numbering of
 * the declaration order.
 */
typedef enum MotionType {
    /* Send tuples from N senders to one receiver */
    MOTIONTYPE_GATHER = 0,
    /* Execute subplan on N nodes, but only send the tuples from one */
    MOTIONTYPE_GATHER_SINGLE = 1,
    /* Use hashing to select a worker_idx destination */
    MOTIONTYPE_HASH = 2,
    /* Send tuples from one sender to a fixed set of worker_idxes */
    MOTIONTYPE_BROADCAST = 3,
    /* Send tuples to the segment explicitly specified in their segid column */
    MOTIONTYPE_EXPLICIT = 4,
    /* Gather or Broadcast to outer query's slice, don't know which one yet */
    MOTIONTYPE_OUTER_QUERY = 5
} MotionType;
/*
 * Motion Node
 *
 * Plan node carrying a MotionType plus the per-type parameters: hash
 * expressions/functions for hash motions, a segid column for explicit
 * motions, and sort keys used when sendSorted is true.
 */
typedef struct Motion {
Plan plan;
MotionType motionType;
bool sendSorted; /* if true, output should be sorted */
int motionID; /* required by AMS */
/* For Hash */
List *hashExprs; /* list of hash expressions */
Oid *hashFuncs; /* corresponding hash functions */
/* NOTE(review): "module number" presumably means the modulus, i.e. the number of segments hashed over -- confirm */
int numHashSegments; /* the module number of the hash function */
/* For Explicit */
AttrNumber segidColIdx; /* index of the segid column in the target list */
/* The following fields are only used when sendSorted == true */
int numSortCols; /* number of sort-key columns */
AttrNumber *sortColIdx; /* their indexes in the target list */
Oid *sortOperators; /* OIDs of operators to sort them by */
Oid *collations; /* OIDs of collations */
bool *nullsFirst; /* NULLS FIRST/LAST directions */
/* sender slice info: the member below is commented out, so it is not part of the struct */
//PlanSlice *senderSliceInfo;
} Motion;
/*
 * Sequence node
 *    Execute a list of subplans in the order of left-to-right, and return
 *    the results of the last subplan.
 */
typedef struct Sequence {
Plan plan;
List *subplans; /* the subplans, in execution (left-to-right) order */
} Sequence;
/*
 * PartitionPruneInfo - Details required to allow the executor to prune
 * partitions.
 *
 * Here we store mapping details to allow translation of a partitioned table's
 * index as returned by the partition pruning code into subplan indexes for
 * plan types which support arbitrary numbers of subplans, such as Append.
 * We also store various details to tell the executor when it should be
 * performing partition pruning.
 *
 * Each PartitionedRelPruneInfo describes the partitioning rules for a single
 * partitioned table (a/k/a level of partitioning).  Since a partitioning
 * hierarchy could contain multiple levels, we represent it by a List of
 * PartitionedRelPruneInfos, where the first entry represents the topmost
 * partitioned table and additional entries represent non-leaf child
 * partitions, ordered such that parents appear before their children.
 * Then, since an Append-type node could have multiple partitioning
 * hierarchies among its children, we have an unordered List of those Lists.
 *
 * prune_infos      List of Lists containing PartitionedRelPruneInfo nodes,
 *                  one sublist per run-time-prunable partition hierarchy
 *                  appearing in the parent plan node's subplans.
 * other_subplans   Indexes of any subplans that are not accounted for
 *                  by any of the PartitionedRelPruneInfo nodes in
 *                  "prune_infos".  These subplans must not be pruned.
 */
typedef struct PartitionPruneInfo {
NodeTag type;
List *prune_infos; /* list of lists, one sublist per prunable hierarchy */
Bitmapset *other_subplans; /* subplan indexes that must not be pruned */
} PartitionPruneInfo;
/* ----------------
 * PartitionSelector node
 *
 * PartitionSelector performs partition pruning based on rows seen on
 * the "other" side of a join.  It performs partition pruning similar to
 * run-time partition pruning in an Append node, but it is performed based
 * on the rows seen, instead of executor params.  The set of surviving
 * partitions is made available to the Append node, by storing it in a
 * special executor param, identified by 'paramid' field.
 * ----------------
 */
typedef struct PartitionSelector {
Plan plan;
/* pruning details; see PartitionPruneInfo */
struct PartitionPruneInfo *part_prune_info;
int32 paramid; /* result is stored here */
} PartitionSelector;
/* ----------------
 * shareinputscan node
 *
 * Scan node that reads a tuplestore shared between a producer slice and
 * some number of consumer slices (see the field comments below).
 * ----------------
 */
typedef struct ShareInputScan {
Scan scan;
/* presumably: the share is read from a slice other than the producer's -- TODO confirm */
bool cross_slice;
/* presumably: identifier of the share this node belongs to -- TODO confirm */
int share_id;
/*
 * Slice that produces the tuplestore for this shared scan.
 *
 * As a special case, in a plan that has only one slice, this may be left
 * to -1.  The executor node ignores this when there is only one slice.
 */
int producer_slice_id;
/*
 * Slice id that this ShareInputScan node runs in.  If it's
 * different from current slice ID, this ShareInputScan is "alien"
 * to the current slice and doesn't need to be executed at all (in
 * this slice).  It is used to skip IPC in alien nodes.
 *
 * Like producer_slice_id, this can be left to -1 if there is only one
 * slice in the plan tree.
 */
int this_slice_id;
/* Number of consumer slices participating, not including the producer. */
int nconsumers;
/* Discard the scan output? True for ORCA CTE producer, false otherwise. */
bool discard_output;
/* presumably: true when this node is the producer of the share -- TODO confirm */
bool is_producer;
} ShareInputScan;
/*
 * SplitUpdate Node
 *
 * Plan node holding target-list column indexes for an action column, a tuple
 * oid column and a ctid column, the INSERT and DELETE column lists, and the
 * hash parameters used to compute a target segment id.
 */
typedef struct SplitUpdate {
Plan plan;
AttrNumber actionColIdx; /* index of action column into the target list */
AttrNumber tupleoidColIdx; /* index of tuple oid column into the target list */
/* presumably: index of the ctid column into the target list -- TODO confirm */
AttrNumber ctidColIdx;
List *insertColIdx; /* list of columns to INSERT into the target list */
List *deleteColIdx; /* list of columns to DELETE into the target list */
/*
 * Fields for calculating the target segment id.
 *
 * If the targetlist contains a 'gp_segment_id' field, these fields are
 * used to compute the target segment id, for INSERT-action rows.
 */
int numHashAttrs;
AttrNumber *hashAttnos;
Oid *hashFuncs; /* corresponding hash functions */
int numHashSegments; /* # of segs to use in hash computation */
} SplitUpdate;
/*
 * AssertOp node
 *    Plan node that carries an SQL error code and a list of error messages.
 *    NOTE(review): presumably the executor reports these when an assertion
 *    on the input rows fails -- confirm against the AssertOp executor code.
 */
typedef struct AssertOp {
Plan plan;
int errcode; /* SQL error code */
List *errmessage; /* error message */
} AssertOp;
#endif /* USE_SPQ */
#endif /* PLANNODES_H */

View File

@ -285,6 +285,9 @@ typedef struct Aggref {
bool agghas_collectfn; /* is collection function available */
int8 aggstage; /* in which stage this aggref is in */
#endif /* PGXC */
#ifdef USE_SPQ
AggSplit aggsplittype; /* expected agg-splitting mode of parent Agg */
#endif
List* aggdirectargs; /* direct arguments, if an ordered-set agg */
List* args; /* arguments and sort expressions */
List* aggorder; /* ORDER BY (list of SortGroupClause) */
@ -352,6 +355,9 @@ typedef struct WindowFunc {
bool winstar; /* TRUE if argument list was really '*' */
bool winagg; /* is function a simple aggregate? */
int location; /* token location, or -1 if unknown */
#ifdef USE_SPQ
bool windistinct; /* TRUE if it's agg(DISTINCT ...) */
#endif
} WindowFunc;
/*
@ -600,6 +606,9 @@ typedef enum SubLinkType {
ROWCOMPARE_SUBLINK,
EXPR_SUBLINK,
ARRAY_SUBLINK,
#ifdef USE_SPQ
NOT_EXISTS_SUBLINK, /* spq uses NOT_EXIST_SUBLINK to implement correlated left anti semijoin. */
#endif
CTE_SUBLINK /* for SubPlans only */
} SubLinkType;
@ -678,6 +687,10 @@ typedef struct SubPlan {
/* Estimated execution costs: */
Cost startup_cost; /* one-time setup cost */
Cost per_call_cost; /* cost for each subplan evaluation */
#ifdef USE_SPQ
bool is_initplan; /* SPQ: Is the subplan implemented as an initplan? */
bool is_multirow; /* SPQ: May the subplan return more than one row? */
#endif
} SubPlan;
/*

View File

@ -25,6 +25,47 @@
#include "optimizer/bucketinfo.h"
#ifdef USE_SPQ
/*
 * ApplyShareInputContext is used in different stages of ShareInputScan
 * processing.  This is mostly used as working area during the stages, but
 * some information is also carried through multiple stages.
 *
 * ApplyShareInputContextPerShare is the per-share element type of
 * ApplyShareInputContext.shared_inputs (one entry for each share).
 */
typedef struct ApplyShareInputContextPerShare {
/* presumably: slice that produces this share -- TODO confirm */
int producer_slice_id;
/* presumably: set of slice ids taking part in this share -- TODO confirm */
Bitmapset *participant_slices;
} ApplyShareInputContextPerShare;
/* Forward declarations: only pointers to these types are used below. */
struct PlanSlice;
struct Plan;
/*
 * ApplyShareInputContext -
 *    Working state for ShareInputScan processing; different groups of
 *    fields serve different stages, as noted on each group.
 */
typedef struct ApplyShareInputContext {
/* curr_rtable is used by all stages when traversing into subqueries */
List *curr_rtable;
/*
 * Populated in dag_to_tree() (or collect_shareinput_producers() for ORCA),
 * used in replace_shareinput_targetlists()
 */
Plan **shared_plans;
int shared_input_count;
/*
 * State for replace_shareinput_targetlists()
 */
int *share_refcounts;
int share_refcounts_sz; /* allocated size of 'share_refcounts' */
/*
 * State for apply_sharinput_xslice() walkers.
 */
PlanSlice *slices; /* root->glob->slices */
List *motStack; /* stack of motionIds leading to current node */
ApplyShareInputContextPerShare *shared_inputs; /* one for each share */
Bitmapset *qdShares; /* share_ids that are referenced from QD slices */
} ApplyShareInputContext;
#endif
/*
* Determines if query has to be launched
* on Coordinators only (SEQUENCE DDL),
@ -219,6 +260,9 @@ typedef struct PlannerGlobal {
/* There is a counter attempt to get name for sublinks */
int sublink_counter;
#ifdef USE_SPQ
ApplyShareInputContext share; /* workspace for GPDB plan sharing */
#endif
} PlannerGlobal;
/* macro for fetching the Plan associated with a SubPlan node */

View File

@ -50,6 +50,11 @@ typedef struct {
List* active_fns;
Node* case_val;
bool estimate;
#ifdef USE_SPQ
bool recurse_queries; /* recurse into query structures */
bool recurse_sublink_testexpr; /* recurse into sublink test expressions */
Size max_size; /* max constant binary size in bytes, 0: no restrictions */
#endif
} eval_const_expressions_context;
typedef enum { UNIQUE_CONSTRAINT, NOT_NULL_CONSTRAINT } constraintType;
@ -157,5 +162,10 @@ extern List* get_quals_lists(Node *jtnode);
extern bool isTableofType(Oid typeOid, Oid* base_oid, Oid* indexbyType);
extern Expr* simplify_function(Oid funcid, Oid result_type, int32 result_typmod, Oid result_collid, Oid input_collid,
List** args_p, bool process_args, bool allow_non_const, eval_const_expressions_context* context);
#ifdef USE_SPQ
extern Query *fold_constants(PlannerInfo *root, Query *q, ParamListInfo boundParams, Size max_size);
extern Query *flatten_join_alias_var_optimizer(Query *query, int queryLevel);
extern Expr *transform_array_Const_to_ArrayExpr(Const *c);
#endif
#endif /* CLAUSES_H */

View File

@ -144,6 +144,9 @@ typedef struct {
* in some scenarios, e.g. assignment of relationOids in fix_expr_common.
*/
List* relationOids; /* contain OIDs of relations the plan depends on */
#ifdef USE_SPQ
int streamID; /* required by AMS */
#endif
} RemoteQuery;
extern Plan* create_remote_mergeinto_plan(PlannerInfo* root, Plan* topplan, CmdType cmdtyp, MergeAction* action);

View File

@ -55,4 +55,8 @@ extern bool HasStoredGeneratedColumns(const PlannerInfo *root, Index rti);
extern PlannerInfo *get_cte_root(PlannerInfo *root, int levelsup, char *ctename);
#ifdef USE_SPQ
extern double spq_estimate_partitioned_numtuples(Relation rel);
#endif
#endif /* PLANCAT_H */

View File

@ -210,4 +210,9 @@ extern List* find_all_internal_tableOids(Oid parentOid);
extern bool check_agg_optimizable(Aggref* aggref, int16* strategy);
extern void check_hashjoinable(RestrictInfo* restrictinfo);
#ifdef USE_SPQ
extern void spq_extract_plan_dependencies(PlannerInfo *root, Plan *plan);
extern List* spq_make_null_eq_clause(List* joinqual, List** otherqual, List* nullinfo);
#endif
#endif /* PLANMAIN_H */

View File

@ -173,4 +173,29 @@ extern bool plan_tree_walker(Node* node, MethodWalker walker, void* context);
extern bool IsBlockedJoinNode(Plan* node);
#ifdef USE_SPQ
extern void plan_tree_base_subplan_put_plan(plan_tree_base_prefix *base, SubPlan *subplan, Plan *plan);
/*
 * planner_subplan_put_plan
 *    During planning, overwrite the Plan stored for 'subplan' in
 *    root->glob->subplans with 'plan'.
 *
 * subplan->plan_id is used as a 1-based position, so the list cell at
 * index plan_id - 1 is the one rewritten.
 */
static inline void planner_subplan_put_plan(struct PlannerInfo *root, SubPlan *subplan, Plan *plan)
{
    int slot = subplan->plan_id - 1;

    list_nth_cell(root->glob->subplans, slot)->data.ptr_value = plan;
}
/*
 * exec_subplan_put_plan
 *    In a completed PlannedStmt, overwrite the Plan stored for 'subplan' in
 *    plannedstmt->subplans with 'plan'.
 *
 * subplan->plan_id is used as a 1-based position, so the list cell at
 * index plan_id - 1 is the one rewritten.
 */
static inline void exec_subplan_put_plan(struct PlannedStmt *plannedstmt, SubPlan *subplan, Plan *plan)
{
    int slot = subplan->plan_id - 1;

    list_nth_cell(plannedstmt->subplans, slot)->data.ptr_value = plan;
}
extern List *extract_nodes_plan(Plan *pl, int nodeTag, bool descendIntoSubqueries);
extern List *extract_nodes_expression(Node *node, int nodeTag, bool descendIntoSubqueries);
extern int find_nodes(Node *node, List *nodeTags);
extern int check_collation(Node *node);
#endif
#endif /* PLANWALKER_H */

View File

@ -56,6 +56,10 @@ extern PlannedStmt* standard_planner(Query* parse, int cursorOptions, ParamListI
typedef void (*planner_hook_type) (Query* parse, int cursorOptions, ParamListInfo boundParams);
typedef void (*ndp_pushdown_hook_type) (Query* querytree, PlannedStmt *stmt);
extern THR_LOCAL PGDLLIMPORT ndp_pushdown_hook_type ndp_pushdown_hook;
#ifdef USE_SPQ
/*
 * Hook for an SPQ planner entry point.  The function takes a Query, cursor
 * options and bound parameters, and returns a PlannedStmt pointer.
 * NOTE(review): when and by whom the hook variable is set or called is not
 * visible in this header -- confirm at the call site.
 */
typedef PlannedStmt *(*spq_planner_hook_type) (Query* parse, int cursorOptions, ParamListInfo boundParams);
extern THR_LOCAL PGDLLIMPORT spq_planner_hook_type spq_planner_hook;
#endif
extern Plan* subquery_planner(PlannerGlobal* glob, Query* parse, PlannerInfo* parent_root, bool hasRecursion,
double tuple_fraction, PlannerInfo** subroot, int options = SUBQUERY_NORMAL, ItstDisKey* diskeys = NULL,

View File

@ -41,6 +41,9 @@ typedef enum {
REMOTE_BROADCAST, /* Broadcast data to all nodes. */
REMOTE_SPLIT_BROADCAST, /* Broadcast data to all parallel threads all nodes. */
REMOTE_HYBRID, /* Hybrid send data. */
#ifdef USE_SPQ
REMOTE_ROUNDROBIN,
#endif
LOCAL_DISTRIBUTE, /* Distribute data to all threads at local node. */
LOCAL_BROADCAST, /* Broadcast data to all threads at local node. */
LOCAL_ROUNDROBIN /* Roundrobin data to all threads at local node. */
@ -82,6 +85,9 @@ typedef struct Stream {
* used for recursive sql execution that under recursive-union operator. */
ExecNodes* origin_consumer_nodes;
bool is_recursive_local; /* LOCAL GATHER for recursive */
#ifdef USE_SPQ
int streamID;
#endif
} Stream;
extern void compute_stream_cost(StreamType type, char locator_type, double subrows, double subgblrows,
@ -95,4 +101,4 @@ extern List* get_max_cost_distkey_for_nulldistkey(
extern void parallel_stream_info_print(ParallelDesc* smpDesc, StreamType type);
extern List* make_distkey_for_append(PlannerInfo* root, Plan* subPlan);
#endif /* STREAM_COST_H */
#endif /* STREAM_COST_H */

View File

@ -83,4 +83,8 @@ extern Plan* update_plan_refs(PlannerInfo* root, Plan* plan, Index* fromRTI, Ind
extern void set_node_ref_subplan_walker(Plan* result_plan, set_node_ref_subplan_context* context);
extern void StreamPlanWalker(PlannedStmt *pstmt, Plan *plan, bool *need);
extern void mark_distribute_setop_remotequery(PlannerInfo* root, Node* node, Plan* plan, List* subPlans);
#endif /* STREAM_UTIL_H */
#ifdef USE_SPQ
extern void SpqSerializePlan(Plan* node, PlannedStmt* planned_stmt, StringInfoData* str,
int num_stream, int num_gather, bool push_subplan, uint64 queryId);
#endif
#endif /* STREAM_UTIL_H */

View File

@ -66,4 +66,11 @@ extern bool split_pathtarget_at_srfs(PlannerInfo *root, PathTarget *target, Path
/* Convenience macro to get a PathTarget with valid cost/width fields */
#define create_pathtarget(root, tlist) \
set_pathtarget_cost_width(root, make_pathtarget_from_tlist(tlist))
#ifdef USE_SPQ
extern List* tlist_members(Node* node, List* targetlist);
extern void get_sortgroupclauses_tles(List *clauses, List *targetList, List **tles, List **sortops, List **eqops);
extern Index maxSortGroupRef(List *targetlist, bool include_orderedagg);
#endif
#endif /* TLIST_H */

View File

@ -70,4 +70,14 @@ typedef enum {
#define XC_LOCK_FOR_BACKUP_KEY_1 0xFFFF
#define XC_LOCK_FOR_BACKUP_KEY_2 0xFFFF
/*
 * SPQ role tests, based on t_thrd.spq_ctx.spq_role:
 *   IS_SPQ_RUNNING      role is anything other than ROLE_UTILITY
 *   IS_SPQ_COORDINATOR  role is ROLE_QUERY_COORDINTOR
 *   IS_SPQ_EXECUTOR     role is ROLE_QUERY_EXECUTOR
 * Without USE_SPQ all three expand to the constant false, so callers can
 * use them unconditionally.
 */
#ifdef USE_SPQ
#define IS_SPQ_RUNNING (t_thrd.spq_ctx.spq_role != ROLE_UTILITY)
#define IS_SPQ_COORDINATOR (t_thrd.spq_ctx.spq_role == ROLE_QUERY_COORDINTOR)
#define IS_SPQ_EXECUTOR (t_thrd.spq_ctx.spq_role == ROLE_QUERY_EXECUTOR)
#else
#define IS_SPQ_RUNNING (false)
#define IS_SPQ_COORDINATOR (false)
#define IS_SPQ_EXECUTOR (false)
#endif
#endif /* PGXC_H */

View File

@ -315,7 +315,7 @@ extern Datum pgxc_execute_on_nodes(int numnodes, Oid* nodelist, char* query);
extern bool pgxc_node_receive(const int conn_count, PGXCNodeHandle** connections,
struct timeval* timeout, bool ignoreTimeoutWarning = false);
extern bool datanode_receive_from_physic_conn(
const int conn_count, PGXCNodeHandle** connections, struct timeval* timeout, bool ignoreTimeoutWarning = false);
const int conn_count, PGXCNodeHandle** connections, struct timeval* timeout);
extern bool datanode_receive_from_logic_conn(
const int conn_count, PGXCNodeHandle** connections, StreamNetCtl* ctl, int time_out);
extern bool pgxc_node_validate(PGXCNodeHandle *conn);

View File

@ -170,6 +170,11 @@ typedef struct RemoteQueryState {
char* serializedPlan; /* the serialized plan tree */
ParallelFunctionState* parallel_function_state;
bool has_stream_for_loop; /* has stream node in for loop sql which may cause hang. */
#ifdef USE_SPQ
uint64 queryId;
PGXCNodeHandle** spq_connections_info;
pg_conn **nodeCons;
#endif
} RemoteQueryState;
extern RemoteQueryState* CreateResponseCombiner(int node_count, CombineType combine_type);

View File

@ -215,7 +215,12 @@ extern void set_disable_conn_mode(void);
#define IsConnPortFromCoord(port) \
((port)->cmdline_options != NULL && strstr((port)->cmdline_options, "remotetype=coordinator") != NULL)
#else
#ifdef USE_SPQ
#define IsConnPortFromCoord(port) \
((port)->cmdline_options != NULL && strstr((port)->cmdline_options, "remotetype=coordinator") != NULL)
#else
#define IsConnPortFromCoord(port) false
#endif
extern bool get_addr_from_socket(int sock, struct sockaddr *saddr);
extern int get_ip_port_from_addr(char* sock_ip, int* port, struct sockaddr saddr);
#endif

View File

@ -39,6 +39,10 @@ extern void ChangeVarNodes(Node* node, int old_varno, int new_varno, int subleve
extern void IncrementVarSublevelsUp(Node* node, int delta_sublevels_up, int min_sublevels_up);
extern void IncrementVarSublevelsUp_rtable(List* rtable, int delta_sublevels_up, int min_sublevels_up);
#ifdef USE_SPQ
extern void SpqIncrementVarSublevelsUpInTransformGroupedWindows(Node *node, int delta_sublevels_up, int min_sublevels_up);
#endif
extern bool rangeTableEntry_used(Node* node, int rt_index, int sublevels_up);
extern bool attribute_used(Node* node, int rt_index, int attno, int sublevels_up);

View File

@ -102,6 +102,11 @@ typedef enum {
DestTupleHybrid,
#ifdef USE_SPQ
DestTupleRoundRobin,
DestBatchRoundRobin,
#endif
DestBatchBroadCast, /* results send to consumer thread in a broadcast way */
DestBatchLocalBroadCast, /* results send to consumer thread in a local broadcast way */
DestBatchRedistribute, /* results send to consumer thread in a redistribute way */

View File

@ -19,6 +19,9 @@
#include "catalog/pg_workload_group.h"
#include "catalog/pg_app_workloadgroup_mapping.h"
#include "catalog/pgxc_node.h"
#ifdef USE_SPQ
#include "parser/parse_coerce.h"
#endif
/* Result list element for get_op_btree_interpretation */
typedef struct OpBtreeInterpretation {
@ -220,6 +223,75 @@ extern bool get_func_iswindow(Oid funcid);
extern char get_func_prokind(Oid funcid);
extern char get_typecategory(Oid typid);
#ifdef USE_SPQ
/*
 * CmpType: classification of a comparison operator.  The numeric values are
 * spelled out; they match the implicit numbering of the declaration order.
 */
typedef enum CmpType {
    CmptEq = 0,   /* equality */
    CmptNEq = 1,  /* inequality */
    CmptLT = 2,   /* less than */
    CmptLEq = 3,  /* less or equal to */
    CmptGT = 4,   /* greater than */
    CmptGEq = 5,  /* greater or equal to */
    CmptOther = 6 /* other operator */
} CmpType;
/*
 * Request flags: each one selects a group of AttStatsSlot fields to fill
 * (see the "Filled if ..." comments in the struct).  They are distinct bits.
 */
#define ATTSTATSSLOT_VALUES 0x01
#define ATTSTATSSLOT_NUMBERS 0x02
/* Result struct for get_attstatsslot */
typedef struct AttStatsSlot {
/* Always filled: */
Oid staop; /* Actual staop for the found slot */
Oid stacoll; /* Actual collation for the found slot */
/* Filled if ATTSTATSSLOT_VALUES is specified: */
Oid valuetype; /* Actual datatype of the values */
Datum *values; /* slot's "values" array, or NULL if none */
int nvalues; /* length of values[], or 0 */
/* Filled if ATTSTATSSLOT_NUMBERS is specified: */
float4 *numbers; /* slot's "numbers" array, or NULL if none */
int nnumbers; /* length of numbers[], or 0 */
/* Remaining fields are private to get_attstatsslot/free_attstatsslot */
void *values_arr; /* palloc'd values array, if any */
void *numbers_arr; /* palloc'd numbers array, if any */
} AttStatsSlot;
/*
 * Prototypes for helpers that are declared only in USE_SPQ builds.  Only the
 * signatures are visible here; the definitions live outside this header.
 */
extern Oid get_compatible_hash_opfamily(Oid opno);
extern Oid get_compatible_legacy_hash_opfamily(Oid opno);
/* NOTE(review): memory-context routine declared among catalog lookups -- confirm this header is the intended home */
extern void MemoryContextDeclareAccountingRoot(MemoryContext context);
extern Oid get_agg_transtype(Oid aggid);
extern bool is_agg_ordered(Oid aggid);
extern bool is_agg_partial_capable(Oid aggid);
extern List *get_func_output_arg_types(Oid funcid);
extern List *get_func_arg_types(Oid funcid);
extern char func_data_access(Oid funcid);
extern char func_exec_location(Oid funcid);
extern bool index_exists(Oid oid);
extern bool aggregate_exists(Oid oid);
extern Oid get_aggregate(const char *aggname, Oid oidType);
extern bool function_exists(Oid oid);
/* CoercionPathType is why this header includes parser/parse_coerce.h under USE_SPQ */
extern bool get_cast_func(Oid oidSrc, Oid oidDest, bool *is_binary_coercible, Oid *oidCastFunc, CoercionPathType *pathtype);
extern bool check_constraint_exists(Oid oidCheckconstraint);
extern char *get_check_constraint_name(Oid oidCheckconstraint);
extern Oid get_check_constraint_relid(Oid oidCheckconstraint);
extern List *get_check_constraint_oids(Oid oidRel);
extern Node *get_check_constraint_expr_tree(Oid oidCheckconstraint);
extern bool operator_exists(Oid oid);
extern bool relation_exists(Oid oid);
extern bool type_exists(Oid oid);
extern CmpType get_comparison_type(Oid oidOp);
extern Oid get_comparison_operator(Oid oidLeft, Oid oidRight, CmpType cmpt);
extern bool has_subclass_slow(Oid relationId);
extern List *get_operator_opfamilies(Oid opno);
extern List *get_index_opfamilies(Oid oidIndex);
extern bool relation_is_partitioned(Oid oid);
extern bool index_is_partitioned(Oid oid);
extern bool has_update_triggers(Oid relid);
/* AttStatsSlot accessors; 'flags' presumably takes ATTSTATSSLOT_* bits -- TODO confirm */
extern bool spq_get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, int reqkind, Oid reqop, int flags);
extern void spq_free_attstatsslot(AttStatsSlot *sslot);
extern char * get_type_name(Oid typid);
extern int32 get_trigger_type(Oid triggerid);
extern HeapTuple get_att_stats(Oid relid, AttrNumber attrnum);
#endif
#define type_is_array(typid) (get_element_type(typid) != InvalidOid)
/* type_is_array_domain accepts both plain arrays and domains over arrays */
#define type_is_array_domain(typid) (get_base_element_type(typid) != InvalidOid)

View File

@ -352,6 +352,7 @@ extern bool numericvar_to_int64(const NumericVar* var, int64* result, bool can_i
extern void int64_to_numericvar(int64 val, NumericVar *var);
extern void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
extern char *numeric_normalize(Numeric num);
extern double numeric_to_double_no_overflow(Numeric num);
bool numeric_agg_trans_initvalisnull(Oid transfn_oid, bool initvalisnull);
void numeric_transfn_info_change(Oid aggfn_oid, Oid *transfn_oid, Oid *transtype);

View File

@ -411,6 +411,10 @@ typedef struct StdRdOptions {
int check_option_offset; /* for views */
int view_security_option_offset; /* for views */
Oid collate; /* table's default collation in b format. */
#ifdef USE_SPQ
/* SPQ OPTIONS */
int spq_bt_build_offset;
#endif
} StdRdOptions;
#define HEAP_MIN_FILLFACTOR 10

View File

@ -288,4 +288,5 @@ extern void set_varratio_after_calc_selectivity(
extern double get_windowagg_selectivity(PlannerInfo* root, WindowClause* wc, WindowFunc* wfunc, List* partitionExprs,
int32 constval, double tuples, unsigned int num_datanodes);
extern bool contain_single_col_stat(List* stat_list);
extern double convert_timevalue_to_scalar(Datum value, Oid typid);
#endif /* SELFUNCS_H */