diff --git a/src/common/backend/utils/misc/guc/guc_sql.cpp b/src/common/backend/utils/misc/guc/guc_sql.cpp index cbb1b8ccc..c6b07c133 100755 --- a/src/common/backend/utils/misc/guc/guc_sql.cpp +++ b/src/common/backend/utils/misc/guc/guc_sql.cpp @@ -1689,6 +1689,17 @@ static void InitSqlConfigureNamesBool() NULL, NULL, NULL}, + {{"enable_opfusion_reuse", + PGC_USERSET, + NODE_ALL, + QUERY_TUNING_METHOD, + gettext_noop("Enables reuse opfusion object."), + NULL}, + &u_sess->attr.attr_sql.enable_opfusion_reuse, + false, + NULL, + NULL, + NULL}, #ifndef ENABLE_MULTIPLE_NODES {{"plsql_show_all_error", PGC_USERSET, diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index 57ac1b77c..903a22039 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -1091,6 +1091,13 @@ static void knl_u_dolphin_errdata_init(knl_u_dolphin_errdata_context *dolphin_er dolphin_errdata_context->max_error_count = 64; } +static void knl_u_opfusion_reuse_init(knl_u_opfusion_reuse_context* opfusion_reuse_ctx) { + + Assert(opfusion_reuse_ctx != NULL); + + opfusion_reuse_ctx->opfusionObj = NULL; +} + static void knl_u_user_login_init(knl_u_user_login_context* user_login_cxt) { Assert(user_login_cxt != NULL); @@ -1477,6 +1484,8 @@ void knl_session_init(knl_session_context* sess_cxt) knl_u_clientConnTime_init(&sess_cxt->clientConnTime_cxt); + knl_u_opfusion_reuse_init(&sess_cxt->opfusion_reuse_ctx); + MemoryContextSeal(sess_cxt->top_mem_cxt); } diff --git a/src/gausskernel/runtime/opfusion/opfusion.cpp b/src/gausskernel/runtime/opfusion/opfusion.cpp index cc0936b01..96b7189e0 100644 --- a/src/gausskernel/runtime/opfusion/opfusion.cpp +++ b/src/gausskernel/runtime/opfusion/opfusion.cpp @@ -194,6 +194,11 @@ void OpFusion::SaveInGPC(OpFusion *obj) rc = memset_s((void *)&obj->m_local, sizeof(OpFusionLocaleVariable), 0, sizeof(OpFusionLocaleVariable)); securec_check(rc, "\0", "\0"); MemoryContextSeal(obj->m_global->m_context); + /* we can not reuse this obj for opfusion reuse */ + if (u_sess->opfusion_reuse_ctx.opfusionObj == obj) { + u_sess->opfusion_reuse_ctx.opfusionObj = NULL; + } + } void OpFusion::DropGlobalOpfusion(OpFusion *obj) @@ -646,6 +651,49 @@ void OpFusion::useOuterParameter(ParamListInfo params) m_local.m_outParams = params; } +static void* TryReuseOpfusionObj(FusionType ftype, MemoryContext context, CachedPlanSource *psrc, + List *plantree_list, ParamListInfo params) +{ + if (!u_sess->attr.attr_sql.enable_opfusion_reuse){ + return NULL; + } + /* + * we save the obj without FusionType check in FusionFactory + * so must check here + */ + if (INSERT_FUSION != ftype) { + return NULL; + } + + OpFusion* checkOpfusionObj = (OpFusion *)u_sess->opfusion_reuse_ctx.opfusionObj; + if (psrc != NULL /* not support for cacheplan case */ + || checkOpfusionObj == NULL + || checkOpfusionObj->m_local.m_optype != ftype) { + return NULL; + } + + /*check the rel id*/ + PlannedStmt *curr_plan = (PlannedStmt *)linitial(plantree_list); + int rtindex = linitial_int((List*)linitial(curr_plan->resultRelations)); + Oid rel_oid = getrelid(rtindex, curr_plan->rtable); + if (rel_oid != checkOpfusionObj->m_global->m_reloid) { + return NULL; + } + + /* check the resultdesc*/ + Relation rel = heap_open(rel_oid, AccessShareLock); + TupleDesc rel_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel)); + heap_close(rel, AccessShareLock); + if (!equalTupleDescs(rel_tupDesc, checkOpfusionObj->m_global->m_tupDesc)) { + return NULL; + } + + /* call specific reset function here*/ + if (!checkOpfusionObj->ResetReuseFusion(context, psrc, plantree_list, params)){ + checkOpfusionObj = NULL; + } + return (void*)checkOpfusionObj; +} void *OpFusion::FusionFactory(FusionType ftype, MemoryContext context, CachedPlanSource *psrc, List *plantree_list, ParamListInfo params) @@ -657,7 +705,16 @@ void *OpFusion::FusionFactory(FusionType ftype, MemoryContext context, CachedPla if (ftype > BYPASS_OK) { return NULL; } - void *opfusionObj = NULL; + + void *opfusionObj = NULL; + /* + * try to reuse opfusion object + */ + opfusionObj = TryReuseOpfusionObj(ftype, context, psrc, plantree_list, params); + if (opfusionObj) { + return opfusionObj; + } + MemoryContext objCxt = NULL; bool isShared = psrc && psrc->gpc.status.InShareTable(); if (isShared) { @@ -706,6 +763,12 @@ void *OpFusion::FusionFactory(FusionType ftype, MemoryContext context, CachedPla } if (opfusionObj != NULL && !((OpFusion *)opfusionObj)->m_global->m_is_global) ((OpFusion *)opfusionObj)->m_global->m_type = ftype; + + if (u_sess->attr.attr_sql.enable_opfusion_reuse){ + /* XXX: better to free previous obj */ + u_sess->opfusion_reuse_ctx.opfusionObj = opfusionObj; + } + return opfusionObj; } @@ -1394,3 +1457,18 @@ void FreeExecutorStateForOpfusion(EState* estate) } ResetOpfusionExecutorState(estate); } + +/* + * AtEOXact_OpfusionReuse + * + * This routine is called during transaction commit or abort (it doesn't + * particularly care which). reset the opfusion reuse context + */ +void AtEOXact_OpfusionReuse() +{ + /* + * no need to delete the memory context + * the are the sub nodes of the top transaction ctx + */ + u_sess->opfusion_reuse_ctx.opfusionObj = NULL; +} diff --git a/src/gausskernel/runtime/opfusion/opfusion_insert.cpp b/src/gausskernel/runtime/opfusion/opfusion_insert.cpp index 45e3e0934..b560c0e27 100644 --- a/src/gausskernel/runtime/opfusion/opfusion_insert.cpp +++ b/src/gausskernel/runtime/opfusion/opfusion_insert.cpp @@ -32,35 +32,7 @@ #include "executor/node/nodeModifyTable.h" #include "parser/parse_coerce.h" -void InsertFusion::InitGlobals() -{ - m_c_global = (InsertFusionGlobalVariable*)palloc0(sizeof(InsertFusionGlobalVariable)); - - m_global->m_reloid = getrelid(linitial_int((List*)linitial(m_global->m_planstmt->resultRelations)), - m_global->m_planstmt->rtable); - ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree; - BaseResult* baseresult = (BaseResult*)linitial(node->plans); - List* targetList = baseresult->plan.targetlist; - - Relation rel = heap_open(m_global->m_reloid, AccessShareLock); - m_global->m_table_type = RelationIsUstoreFormat(rel) ? TAM_USTORE : TAM_HEAP; - m_global->m_exec_func_ptr = (OpFusionExecfuncType)&InsertFusion::ExecInsert; - - m_global->m_natts = RelationGetDescr(rel)->natts; - m_global->m_is_bucket_rel = RELATION_OWN_BUCKET(rel); - m_global->m_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel)); - m_global->m_tupDesc->td_tam_ops = GetTableAmRoutine(m_global->m_table_type); - heap_close(rel, AccessShareLock); - - /* init param func const */ - m_global->m_paramNum = 0; - m_global->m_paramLoc = (ParamLoc*)palloc0(m_global->m_natts * sizeof(ParamLoc)); - m_c_global->m_targetParamNum = 0; - m_c_global->m_targetFuncNum = 0; - m_c_global->m_targetFuncNodes = (FuncExprInfo*)palloc0(m_global->m_natts * sizeof(FuncExprInfo)); - m_c_global->m_targetConstNum = 0; - m_c_global->m_targetConstLoc = (ConstLoc*)palloc0(m_global->m_natts * sizeof(ConstLoc)); - +void InsertFusion::InitBaseParam(List* targetList) { ListCell* lc = NULL; int i = 0; FuncExpr* func = NULL; @@ -104,6 +76,38 @@ void InsertFusion::InitGlobals() i++; } m_c_global->m_targetConstNum = i; +} + +void InsertFusion::InitGlobals() +{ + m_c_global = (InsertFusionGlobalVariable*)palloc0(sizeof(InsertFusionGlobalVariable)); + + m_global->m_reloid = getrelid(linitial_int((List*)linitial(m_global->m_planstmt->resultRelations)), + m_global->m_planstmt->rtable); + ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree; + BaseResult* baseresult = (BaseResult*)linitial(node->plans); + List* targetList = baseresult->plan.targetlist; + + Relation rel = heap_open(m_global->m_reloid, AccessShareLock); + m_global->m_table_type = RelationIsUstoreFormat(rel) ? TAM_USTORE : TAM_HEAP; + m_global->m_exec_func_ptr = (OpFusionExecfuncType)&InsertFusion::ExecInsert; + + m_global->m_natts = RelationGetDescr(rel)->natts; + m_global->m_is_bucket_rel = RELATION_OWN_BUCKET(rel); + m_global->m_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel)); + m_global->m_tupDesc->td_tam_ops = GetTableAmRoutine(m_global->m_table_type); + heap_close(rel, AccessShareLock); + + /* init param func const */ + m_global->m_paramNum = 0; + m_global->m_paramLoc = (ParamLoc*)palloc0(m_global->m_natts * sizeof(ParamLoc)); + m_c_global->m_targetParamNum = 0; + m_c_global->m_targetFuncNum = 0; + m_c_global->m_targetFuncNodes = (FuncExprInfo*)palloc0(m_global->m_natts * sizeof(FuncExprInfo)); + m_c_global->m_targetConstNum = 0; + m_c_global->m_targetConstLoc = (ConstLoc*)palloc0(m_global->m_natts * sizeof(ConstLoc)); + + InitBaseParam(targetList); } void InsertFusion::InitLocals(ParamListInfo params) @@ -455,3 +459,33 @@ bool InsertFusion::execute(long max_rows, char* completionTag) u_sess->statement_cxt.last_row_count = u_sess->statement_cxt.current_row_count; return success; } + +/* + * reset InsertFusion for reuse: + * such as + * insert into t values (1, 'a'); + * insert into t values (2, 'b'); + * only need to replace the planstmt + */ +bool InsertFusion::ResetReuseFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params) +{ + PlannedStmt *curr_plan = (PlannedStmt *)linitial(plantree_list); + m_global->m_planstmt = curr_plan; + + ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree; + BaseResult* baseresult = (BaseResult*)linitial(node->plans); + List* targetList = baseresult->plan.targetlist; + + m_c_global->m_targetFuncNum = 0; + m_c_global->m_targetParamNum = 0; + + InitBaseParam(targetList); + + // local + m_c_local.m_estate->es_range_table = NIL; + m_c_local.m_estate->es_range_table = m_global->m_planstmt->rtable; + m_c_local.m_estate->es_plannedstmt = m_global->m_planstmt; + initParams(params); + + return true; +} \ No newline at end of file diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index 3a33d5db2..8d48f0c8e 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -3027,7 +3027,7 @@ static void CommitTransaction(bool STP_commit) AtEOXact_PartitionCache(true); AtEOXact_BucketCache(true); - + AtEOXact_OpfusionReuse(); /* * Make catalog changes visible to all backends. This has to happen after * relcache references are dropped (see comments for @@ -3480,6 +3480,9 @@ static void PrepareTransaction(bool STP_commit) AtEOXact_PartitionCache(true); AtEOXact_BucketCache(true); + + AtEOXact_OpfusionReuse(); + /* notify doesn't need a postprepare call */ PostPrepare_PgStat(); @@ -3937,6 +3940,7 @@ static void AbortTransaction(bool PerfectRollback, bool STP_rollback) AtEOXact_FreeTupleDesc(); AtEOXact_PartitionCache(false); AtEOXact_BucketCache(false); + AtEOXact_OpfusionReuse(); AtEOXact_Inval(false); smgrDoPendingDeletes(false); release_conn_to_compute_pool(); diff --git a/src/include/knl/knl_guc/knl_session_attr_sql.h b/src/include/knl/knl_guc/knl_session_attr_sql.h index f4b3af22d..85a7834ac 100644 --- a/src/include/knl/knl_guc/knl_session_attr_sql.h +++ b/src/include/knl/knl_guc/knl_session_attr_sql.h @@ -244,7 +244,7 @@ typedef struct knl_session_attr_sql { char* db4ai_snapshot_version_separator; int pldebugger_timeout; bool partition_page_estimation; - + bool enable_opfusion_reuse; #ifndef ENABLE_MULTIPLE_NODES bool uppercase_attribute_name; #endif diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 398f06377..36bc00b49 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -2413,6 +2413,10 @@ typedef struct knl_u_dolphin_errdata_context { int max_error_count; } knl_u_dolphin_errdata_context; +typedef struct knl_u_opfusion_reuse_context { + void *opfusionObj; /* Opfusion cache Object */ +} knl_u_opfusion_reuse_context; + typedef struct knl_u_catalog_context { bool nulls[4]; struct PartitionIdentifier* route; @@ -2907,6 +2911,8 @@ typedef struct knl_session_context { knl_u_rep_origin_context reporigin_cxt; knl_u_dolphin_errdata_context dolphin_errdata_ctx; + knl_u_opfusion_reuse_context opfusion_reuse_ctx; + /* * Initialize context which records time for client connection establish. * This time records start on incommining resuest arrives e.g. poll() invoked to accept() and diff --git a/src/include/opfusion/opfusion.h b/src/include/opfusion/opfusion.h index 1a3628fab..cd4e8e3f3 100644 --- a/src/include/opfusion/opfusion.h +++ b/src/include/opfusion/opfusion.h @@ -46,6 +46,7 @@ extern void ExecCheckXactReadOnly(PlannedStmt* plannedstmt); extern bool IsRightRefState(List* plantreeList); EState* CreateExecutorStateForOpfusion(MemoryContext saveCxt, MemoryContext tmpCxt); void FreeExecutorStateForOpfusion(EState* estate); +extern void AtEOXact_OpfusionReuse(); /* * The variables in OpFusion is always in two parts: global's variables and local's variables. @@ -162,6 +163,12 @@ public: !IsRightRefState(stmtList)); } + virtual bool ResetReuseFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params) + { + return false; + } + + public: struct ConstLoc { Datum constValue; diff --git a/src/include/opfusion/opfusion_insert.h b/src/include/opfusion/opfusion_insert.h index f369a655c..07d3c227a 100644 --- a/src/include/opfusion/opfusion_insert.h +++ b/src/include/opfusion/opfusion_insert.h @@ -40,10 +40,15 @@ public: void InitGlobals(); void refreshParameterIfNecessary(); + + virtual bool ResetReuseFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params); + private: unsigned long ExecInsert(Relation rel, ResultRelInfo* resultRelInfo); + void InitBaseParam(List* targetList); + struct InsertFusionGlobalVariable { /* for func/op expr calculation */ FuncExprInfo* m_targetFuncNodes; diff --git a/src/test/regress/expected/bypass_simplequery_support.out b/src/test/regress/expected/bypass_simplequery_support.out index 26ff51c3c..975bf6fc6 100644 --- a/src/test/regress/expected/bypass_simplequery_support.out +++ b/src/test/regress/expected/bypass_simplequery_support.out @@ -1921,6 +1921,41 @@ explain delete from v_test; reset explain_perf_mode; drop table test cascade; NOTICE: drop cascades to view v_test +--test enable_opfusion_reuse +set enable_opfusion_reuse=on; +create table test_reuse_t1(a int,b int,c text); +create table test_reuse_t2(a int,b int,c text,d text); +begin; +insert into test_reuse_t1 values(1,1,'2'); +insert into test_reuse_t1 values(2,3,'2'); +insert into test_reuse_t2 values(1,2,'3','4'); +insert into test_reuse_t1 values(1,3,'4'); +select * from test_reuse_t1; + a | b | c +---+---+--- + 1 | 1 | 2 + 2 | 3 | 2 + 1 | 3 | 4 +(3 rows) + +commit; +truncate test_reuse_t1; +truncate test_reuse_t2; +begin; +insert into test_reuse_t1 values(1,1,'2'); +alter table test_reuse_t1 add column colf int; +insert into test_reuse_t1 values(1,1,'2'); +select * from test_reuse_t1; + a | b | c | colf +---+---+---+------ + 1 | 1 | 2 | + 1 | 1 | 2 | +(2 rows) + +commit; +drop table test_reuse_t1; +drop table test_reuse_t2; +reset enable_opfusion_reuse; -- end reset track_activities; set track_sql_count = off; diff --git a/src/test/regress/output/recovery_2pc_tools.source b/src/test/regress/output/recovery_2pc_tools.source index 6bf0da51e..3c310fe61 100644 --- a/src/test/regress/output/recovery_2pc_tools.source +++ b/src/test/regress/output/recovery_2pc_tools.source @@ -293,6 +293,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c enable_nvm | bool | | | enable_online_ddl_waitlock | bool | | | enable_opfusion | bool | | | + enable_opfusion_reuse | bool | | | enable_orc_cache | bool | | | enable_page_lsn_check | bool | | | enable_partition_opfusion | bool | | | diff --git a/src/test/regress/sql/bypass_simplequery_support.sql b/src/test/regress/sql/bypass_simplequery_support.sql index ada3ec28a..5117b4242 100755 --- a/src/test/regress/sql/bypass_simplequery_support.sql +++ b/src/test/regress/sql/bypass_simplequery_support.sql @@ -430,6 +430,33 @@ reset explain_perf_mode; drop table test cascade; +--test enable_opfusion_reuse +set enable_opfusion_reuse=on; +create table test_reuse_t1(a int,b int,c text); +create table test_reuse_t2(a int,b int,c text,d text); + +begin; +insert into test_reuse_t1 values(1,1,'2'); +insert into test_reuse_t1 values(2,3,'2'); +insert into test_reuse_t2 values(1,2,'3','4'); +insert into test_reuse_t1 values(1,3,'4'); +select * from test_reuse_t1; +commit; + +truncate test_reuse_t1; +truncate test_reuse_t2; + +begin; +insert into test_reuse_t1 values(1,1,'2'); +alter table test_reuse_t1 add column colf int; +insert into test_reuse_t1 values(1,1,'2'); +select * from test_reuse_t1; +commit; + +drop table test_reuse_t1; +drop table test_reuse_t2; +reset enable_opfusion_reuse; + -- end reset track_activities; set track_sql_count = off;