!3186 对同事务中批量逐条INSERT场景进行opfusion对象复用的优化

Merge pull request !3186 from cc_db_dev/opfusion_master
This commit is contained in:
opengauss-bot
2023-06-01 09:11:12 +00:00
committed by Gitee
12 changed files with 249 additions and 32 deletions

View File

@ -1689,6 +1689,17 @@ static void InitSqlConfigureNamesBool()
NULL,
NULL,
NULL},
{{"enable_opfusion_reuse",
PGC_USERSET,
NODE_ALL,
QUERY_TUNING_METHOD,
gettext_noop("Enables reuse opfusion object."),
NULL},
&u_sess->attr.attr_sql.enable_opfusion_reuse,
false,
NULL,
NULL,
NULL},
#ifndef ENABLE_MULTIPLE_NODES
{{"plsql_show_all_error",
PGC_USERSET,

View File

@ -1091,6 +1091,13 @@ static void knl_u_dolphin_errdata_init(knl_u_dolphin_errdata_context *dolphin_er
dolphin_errdata_context->max_error_count = 64;
}
static void knl_u_opfusion_reuse_init(knl_u_opfusion_reuse_context* opfusion_reuse_ctx) {
Assert(opfusion_reuse_ctx != NULL);
opfusion_reuse_ctx->opfusionObj = NULL;
}
static void knl_u_user_login_init(knl_u_user_login_context* user_login_cxt)
{
Assert(user_login_cxt != NULL);
@ -1477,6 +1484,8 @@ void knl_session_init(knl_session_context* sess_cxt)
knl_u_clientConnTime_init(&sess_cxt->clientConnTime_cxt);
knl_u_opfusion_reuse_init(&sess_cxt->opfusion_reuse_ctx);
MemoryContextSeal(sess_cxt->top_mem_cxt);
}

View File

@ -194,6 +194,11 @@ void OpFusion::SaveInGPC(OpFusion *obj)
rc = memset_s((void *)&obj->m_local, sizeof(OpFusionLocaleVariable), 0, sizeof(OpFusionLocaleVariable));
securec_check(rc, "\0", "\0");
MemoryContextSeal(obj->m_global->m_context);
/* we can not reuse this obj for opfusion reuse */
if (u_sess->opfusion_reuse_ctx.opfusionObj == obj) {
u_sess->opfusion_reuse_ctx.opfusionObj = NULL;
}
}
void OpFusion::DropGlobalOpfusion(OpFusion *obj)
@ -646,6 +651,49 @@ void OpFusion::useOuterParameter(ParamListInfo params)
m_local.m_outParams = params;
}
static void* TryReuseOpfusionObj(FusionType ftype, MemoryContext context, CachedPlanSource *psrc,
List *plantree_list, ParamListInfo params)
{
if (!u_sess->attr.attr_sql.enable_opfusion_reuse){
return NULL;
}
/*
* we save the obj without FusionType check in FusionFactory
* so must check here
*/
if (INSERT_FUSION != ftype) {
return NULL;
}
OpFusion* checkOpfusionObj = (OpFusion *)u_sess->opfusion_reuse_ctx.opfusionObj;
if (psrc != NULL /* not support for cacheplan case */
|| checkOpfusionObj == NULL
|| checkOpfusionObj->m_local.m_optype != ftype) {
return NULL;
}
/*check the rel id*/
PlannedStmt *curr_plan = (PlannedStmt *)linitial(plantree_list);
int rtindex = linitial_int((List*)linitial(curr_plan->resultRelations));
Oid rel_oid = getrelid(rtindex, curr_plan->rtable);
if (rel_oid != checkOpfusionObj->m_global->m_reloid) {
return NULL;
}
/* check the resultdesc*/
Relation rel = heap_open(rel_oid, AccessShareLock);
TupleDesc rel_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel));
heap_close(rel, AccessShareLock);
if (!equalTupleDescs(rel_tupDesc, checkOpfusionObj->m_global->m_tupDesc)) {
return NULL;
}
/* call specific reset function here*/
if (!checkOpfusionObj->ResetReuseFusion(context, psrc, plantree_list, params)){
checkOpfusionObj = NULL;
}
return (void*)checkOpfusionObj;
}
void *OpFusion::FusionFactory(FusionType ftype, MemoryContext context, CachedPlanSource *psrc, List *plantree_list,
ParamListInfo params)
@ -657,7 +705,16 @@ void *OpFusion::FusionFactory(FusionType ftype, MemoryContext context, CachedPla
if (ftype > BYPASS_OK) {
return NULL;
}
void *opfusionObj = NULL;
void *opfusionObj = NULL;
/*
* try to reuse opfusion object
*/
opfusionObj = TryReuseOpfusionObj(ftype, context, psrc, plantree_list, params);
if (opfusionObj) {
return opfusionObj;
}
MemoryContext objCxt = NULL;
bool isShared = psrc && psrc->gpc.status.InShareTable();
if (isShared) {
@ -706,6 +763,12 @@ void *OpFusion::FusionFactory(FusionType ftype, MemoryContext context, CachedPla
}
if (opfusionObj != NULL && !((OpFusion *)opfusionObj)->m_global->m_is_global)
((OpFusion *)opfusionObj)->m_global->m_type = ftype;
if (u_sess->attr.attr_sql.enable_opfusion_reuse){
/* XXX: better to free previous obj */
u_sess->opfusion_reuse_ctx.opfusionObj = opfusionObj;
}
return opfusionObj;
}
@ -1394,3 +1457,18 @@ void FreeExecutorStateForOpfusion(EState* estate)
}
ResetOpfusionExecutorState(estate);
}
/*
* AtEOXact_OpfusionReuse
*
* This routine is called during transaction commit or abort (it doesn't
* particularly care which). reset the opfusion reuse context
*/
void AtEOXact_OpfusionReuse()
{
/*
* no need to delete the memory context
* the are the sub nodes of the top transaction ctx
*/
u_sess->opfusion_reuse_ctx.opfusionObj = NULL;
}

View File

@ -32,35 +32,7 @@
#include "executor/node/nodeModifyTable.h"
#include "parser/parse_coerce.h"
void InsertFusion::InitGlobals()
{
m_c_global = (InsertFusionGlobalVariable*)palloc0(sizeof(InsertFusionGlobalVariable));
m_global->m_reloid = getrelid(linitial_int((List*)linitial(m_global->m_planstmt->resultRelations)),
m_global->m_planstmt->rtable);
ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree;
BaseResult* baseresult = (BaseResult*)linitial(node->plans);
List* targetList = baseresult->plan.targetlist;
Relation rel = heap_open(m_global->m_reloid, AccessShareLock);
m_global->m_table_type = RelationIsUstoreFormat(rel) ? TAM_USTORE : TAM_HEAP;
m_global->m_exec_func_ptr = (OpFusionExecfuncType)&InsertFusion::ExecInsert;
m_global->m_natts = RelationGetDescr(rel)->natts;
m_global->m_is_bucket_rel = RELATION_OWN_BUCKET(rel);
m_global->m_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel));
m_global->m_tupDesc->td_tam_ops = GetTableAmRoutine(m_global->m_table_type);
heap_close(rel, AccessShareLock);
/* init param func const */
m_global->m_paramNum = 0;
m_global->m_paramLoc = (ParamLoc*)palloc0(m_global->m_natts * sizeof(ParamLoc));
m_c_global->m_targetParamNum = 0;
m_c_global->m_targetFuncNum = 0;
m_c_global->m_targetFuncNodes = (FuncExprInfo*)palloc0(m_global->m_natts * sizeof(FuncExprInfo));
m_c_global->m_targetConstNum = 0;
m_c_global->m_targetConstLoc = (ConstLoc*)palloc0(m_global->m_natts * sizeof(ConstLoc));
void InsertFusion::InitBaseParam(List* targetList) {
ListCell* lc = NULL;
int i = 0;
FuncExpr* func = NULL;
@ -104,6 +76,38 @@ void InsertFusion::InitGlobals()
i++;
}
m_c_global->m_targetConstNum = i;
}
void InsertFusion::InitGlobals()
{
m_c_global = (InsertFusionGlobalVariable*)palloc0(sizeof(InsertFusionGlobalVariable));
m_global->m_reloid = getrelid(linitial_int((List*)linitial(m_global->m_planstmt->resultRelations)),
m_global->m_planstmt->rtable);
ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree;
BaseResult* baseresult = (BaseResult*)linitial(node->plans);
List* targetList = baseresult->plan.targetlist;
Relation rel = heap_open(m_global->m_reloid, AccessShareLock);
m_global->m_table_type = RelationIsUstoreFormat(rel) ? TAM_USTORE : TAM_HEAP;
m_global->m_exec_func_ptr = (OpFusionExecfuncType)&InsertFusion::ExecInsert;
m_global->m_natts = RelationGetDescr(rel)->natts;
m_global->m_is_bucket_rel = RELATION_OWN_BUCKET(rel);
m_global->m_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel));
m_global->m_tupDesc->td_tam_ops = GetTableAmRoutine(m_global->m_table_type);
heap_close(rel, AccessShareLock);
/* init param func const */
m_global->m_paramNum = 0;
m_global->m_paramLoc = (ParamLoc*)palloc0(m_global->m_natts * sizeof(ParamLoc));
m_c_global->m_targetParamNum = 0;
m_c_global->m_targetFuncNum = 0;
m_c_global->m_targetFuncNodes = (FuncExprInfo*)palloc0(m_global->m_natts * sizeof(FuncExprInfo));
m_c_global->m_targetConstNum = 0;
m_c_global->m_targetConstLoc = (ConstLoc*)palloc0(m_global->m_natts * sizeof(ConstLoc));
InitBaseParam(targetList);
}
void InsertFusion::InitLocals(ParamListInfo params)
@ -455,3 +459,33 @@ bool InsertFusion::execute(long max_rows, char* completionTag)
u_sess->statement_cxt.last_row_count = u_sess->statement_cxt.current_row_count;
return success;
}
/*
* reset InsertFusion for reuse:
* such as
* insert into t values (1, 'a');
* insert into t values (2, 'b');
* only need to replace the planstmt
*/
bool InsertFusion::ResetReuseFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params)
{
PlannedStmt *curr_plan = (PlannedStmt *)linitial(plantree_list);
m_global->m_planstmt = curr_plan;
ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree;
BaseResult* baseresult = (BaseResult*)linitial(node->plans);
List* targetList = baseresult->plan.targetlist;
m_c_global->m_targetFuncNum = 0;
m_c_global->m_targetParamNum = 0;
InitBaseParam(targetList);
// local
m_c_local.m_estate->es_range_table = NIL;
m_c_local.m_estate->es_range_table = m_global->m_planstmt->rtable;
m_c_local.m_estate->es_plannedstmt = m_global->m_planstmt;
initParams(params);
return true;
}

View File

@ -3027,7 +3027,7 @@ static void CommitTransaction(bool STP_commit)
AtEOXact_PartitionCache(true);
AtEOXact_BucketCache(true);
AtEOXact_OpfusionReuse();
/*
* Make catalog changes visible to all backends. This has to happen after
* relcache references are dropped (see comments for
@ -3480,6 +3480,9 @@ static void PrepareTransaction(bool STP_commit)
AtEOXact_PartitionCache(true);
AtEOXact_BucketCache(true);
AtEOXact_OpfusionReuse();
/* notify doesn't need a postprepare call */
PostPrepare_PgStat();
@ -3937,6 +3940,7 @@ static void AbortTransaction(bool PerfectRollback, bool STP_rollback)
AtEOXact_FreeTupleDesc();
AtEOXact_PartitionCache(false);
AtEOXact_BucketCache(false);
AtEOXact_OpfusionReuse();
AtEOXact_Inval(false);
smgrDoPendingDeletes(false);
release_conn_to_compute_pool();

View File

@ -244,7 +244,7 @@ typedef struct knl_session_attr_sql {
char* db4ai_snapshot_version_separator;
int pldebugger_timeout;
bool partition_page_estimation;
bool enable_opfusion_reuse;
#ifndef ENABLE_MULTIPLE_NODES
bool uppercase_attribute_name;
#endif

View File

@ -2413,6 +2413,10 @@ typedef struct knl_u_dolphin_errdata_context {
int max_error_count;
} knl_u_dolphin_errdata_context;
typedef struct knl_u_opfusion_reuse_context {
void *opfusionObj; /* Opfusion cache Object */
} knl_u_opfusion_reuse_context;
typedef struct knl_u_catalog_context {
bool nulls[4];
struct PartitionIdentifier* route;
@ -2907,6 +2911,8 @@ typedef struct knl_session_context {
knl_u_rep_origin_context reporigin_cxt;
knl_u_dolphin_errdata_context dolphin_errdata_ctx;
knl_u_opfusion_reuse_context opfusion_reuse_ctx;
/*
* Initialize context which records time for client connection establish.
* This time records start on incommining resuest arrives e.g. poll() invoked to accept() and

View File

@ -46,6 +46,7 @@ extern void ExecCheckXactReadOnly(PlannedStmt* plannedstmt);
extern bool IsRightRefState(List* plantreeList);
EState* CreateExecutorStateForOpfusion(MemoryContext saveCxt, MemoryContext tmpCxt);
void FreeExecutorStateForOpfusion(EState* estate);
extern void AtEOXact_OpfusionReuse();
/*
* The variables in OpFusion is always in two parts: global's variables and local's variables.
@ -162,6 +163,12 @@ public:
!IsRightRefState(stmtList));
}
virtual bool ResetReuseFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params)
{
return false;
}
public:
struct ConstLoc {
Datum constValue;

View File

@ -40,10 +40,15 @@ public:
void InitGlobals();
void refreshParameterIfNecessary();
virtual bool ResetReuseFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params);
private:
unsigned long ExecInsert(Relation rel, ResultRelInfo* resultRelInfo);
void InitBaseParam(List* targetList);
struct InsertFusionGlobalVariable {
/* for func/op expr calculation */
FuncExprInfo* m_targetFuncNodes;

View File

@ -1921,6 +1921,41 @@ explain delete from v_test;
reset explain_perf_mode;
drop table test cascade;
NOTICE: drop cascades to view v_test
--test enable_opfusion_reuse
set enable_opfusion_reuse=on;
create table test_reuse_t1(a int,b int,c text);
create table test_reuse_t2(a int,b int,c text,d text);
begin;
insert into test_reuse_t1 values(1,1,'2');
insert into test_reuse_t1 values(2,3,'2');
insert into test_reuse_t2 values(1,2,'3','4');
insert into test_reuse_t1 values(1,3,'4');
select * from test_reuse_t1;
a | b | c
---+---+---
1 | 1 | 2
2 | 3 | 2
1 | 3 | 4
(3 rows)
commit;
truncate test_reuse_t1;
truncate test_reuse_t2;
begin;
insert into test_reuse_t1 values(1,1,'2');
alter table test_reuse_t1 add column colf int;
insert into test_reuse_t1 values(1,1,'2');
select * from test_reuse_t1;
a | b | c | colf
---+---+---+------
1 | 1 | 2 |
1 | 1 | 2 |
(2 rows)
commit;
drop table test_reuse_t1;
drop table test_reuse_t2;
reset enable_opfusion_reuse;
-- end
reset track_activities;
set track_sql_count = off;

View File

@ -293,6 +293,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c
enable_nvm | bool | | |
enable_online_ddl_waitlock | bool | | |
enable_opfusion | bool | | |
enable_opfusion_reuse | bool | | |
enable_orc_cache | bool | | |
enable_page_lsn_check | bool | | |
enable_partition_opfusion | bool | | |

View File

@ -430,6 +430,33 @@ reset explain_perf_mode;
drop table test cascade;
--test enable_opfusion_reuse
set enable_opfusion_reuse=on;
create table test_reuse_t1(a int,b int,c text);
create table test_reuse_t2(a int,b int,c text,d text);
begin;
insert into test_reuse_t1 values(1,1,'2');
insert into test_reuse_t1 values(2,3,'2');
insert into test_reuse_t2 values(1,2,'3','4');
insert into test_reuse_t1 values(1,3,'4');
select * from test_reuse_t1;
commit;
truncate test_reuse_t1;
truncate test_reuse_t2;
begin;
insert into test_reuse_t1 values(1,1,'2');
alter table test_reuse_t1 add column colf int;
insert into test_reuse_t1 values(1,1,'2');
select * from test_reuse_t1;
commit;
drop table test_reuse_t1;
drop table test_reuse_t2;
reset enable_opfusion_reuse;
-- end
reset track_activities;
set track_sql_count = off;