!3639 update场景的opfusion复用

Merge pull request !3639 from cc_db_dev/opfusion_update
This commit is contained in:
opengauss_bot
2023-06-30 06:43:52 +00:00
committed by Gitee
4 changed files with 100 additions and 48 deletions

View File

@ -344,8 +344,8 @@ MemoryContext opt_AllocSetContextCreate(MemoryContext parent, const char* name,
block->endptr = ((char*)context) + firstBlockSize;
block->allocSize = firstBlockSize;
context->totalSpace += firstBlockSize;
context->freeSpace += block->endptr - block->freeptr;
context->totalSpace = firstBlockSize;
context->freeSpace = block->endptr - block->freeptr;
block->prev = NULL;
block->next = NULL;

View File

@ -662,7 +662,7 @@ static void* TryReuseOpfusionObj(FusionType ftype, MemoryContext context, Cached
* we save the obj without FusionType check in FusionFactory
* so must check here
*/
if (INSERT_FUSION != ftype && DELETE_FUSION != ftype) {
if (INSERT_FUSION != ftype && DELETE_FUSION != ftype && UPDATE_FUSION != ftype) {
return NULL;
}

View File

@ -27,6 +27,7 @@
#include "access/tableam.h"
#include "commands/matview.h"
#include "executor/node/nodeModifyTable.h"
#include "opfusion/opfusion_indexscan.h"
#include "parser/parse_coerce.h"
UHeapTuple UpdateFusion::uheapModifyTuple(UHeapTuple tuple, Relation rel)
@ -105,56 +106,16 @@ UpdateFusion::UpdateFusion(MemoryContext context, CachedPlanSource* psrc, List*
MemoryContextSwitchTo(old_context);
}
void UpdateFusion::InitGlobals()
void UpdateFusion::InitBaseParam(List* targetList)
{
int hash_col_num = 0;
m_c_global = (UpdateFusionGlobalVariable*)palloc0(sizeof(UpdateFusionGlobalVariable));
ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree;
Plan *updatePlan = (Plan *)linitial(node->plans);
IndexScan* indexscan = (IndexScan *)JudgePlanIsPartIterator(updatePlan);
m_global->m_reloid = getrelid(linitial_int((List*)linitial(m_global->m_planstmt->resultRelations)),
m_global->m_planstmt->rtable);
Relation rel = heap_open(m_global->m_reloid, RowExclusiveLock);
m_global->m_table_type = RelationIsUstoreFormat(rel) ? TAM_USTORE : TAM_HEAP;
m_global->m_exec_func_ptr = (OpFusionExecfuncType)&UpdateFusion::ExecUpdate;
m_global->m_is_bucket_rel = RELATION_OWN_BUCKET(rel);
m_global->m_natts = RelationGetDescr(rel)->natts;
m_global->m_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel));
m_global->m_tupDesc->td_tam_ops = GetTableAmRoutine(m_global->m_table_type);
hash_col_num = rel->rd_isblockchain ? 1 : 0;
heap_close(rel, NoLock);
#ifdef USE_ASSERT_CHECKING
if (m_global->m_is_bucket_rel) {
/* ctid + tablebucketid */
Assert(m_global->m_natts + 2 == list_length(indexscan->scan.plan.targetlist) + hash_col_num);
} else {
/* ctid */
if (indexscan->scan.isPartTbl) {
Assert(m_global->m_natts + 2 <= list_length(indexscan->scan.plan.targetlist) + hash_col_num);
} else {
Assert(m_global->m_natts + 1 <= list_length(indexscan->scan.plan.targetlist) + hash_col_num);
}
}
#endif
/* init target, include param and const */
m_c_global->m_targetParamNum = 0;
m_c_global->m_targetParamLoc = (ParamLoc*)palloc0(m_global->m_natts * sizeof(ParamLoc));
m_c_global->m_targetConstLoc = (ConstLoc*)palloc0(m_global->m_natts * sizeof(ConstLoc));
m_c_global->m_targetVarLoc = (VarLoc*)palloc0(m_global->m_natts * sizeof(VarLoc));
m_c_global->m_targetFuncNum = 0;
m_c_global->m_targetFuncNodes = (FuncExprInfo*)palloc0(m_global->m_natts * sizeof(FuncExprInfo));
m_c_global->m_varNum = 0;
int i = 0;
ListCell* lc = NULL;
OpExpr* opexpr = NULL;
int i = 0;
FuncExpr* func = NULL;
TargetEntry* res = NULL;
Expr* expr = NULL;
OpExpr* opexpr = NULL;
foreach (lc, indexscan->scan.plan.targetlist) {
foreach (lc, targetList) {
/* ignore ctid + tablebucketid or ctid at last */
if (i >= m_global->m_natts) {
break;
@ -200,6 +161,52 @@ void UpdateFusion::InitGlobals()
m_c_global->m_targetConstNum = i;
}
void UpdateFusion::InitGlobals()
{
int hash_col_num = 0;
m_c_global = (UpdateFusionGlobalVariable*)palloc0(sizeof(UpdateFusionGlobalVariable));
ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree;
Plan *updatePlan = (Plan *)linitial(node->plans);
IndexScan* indexscan = (IndexScan *)JudgePlanIsPartIterator(updatePlan);
m_global->m_reloid = getrelid(linitial_int((List*)linitial(m_global->m_planstmt->resultRelations)),
m_global->m_planstmt->rtable);
Relation rel = heap_open(m_global->m_reloid, RowExclusiveLock);
m_global->m_table_type = RelationIsUstoreFormat(rel) ? TAM_USTORE : TAM_HEAP;
m_global->m_exec_func_ptr = (OpFusionExecfuncType)&UpdateFusion::ExecUpdate;
m_global->m_is_bucket_rel = RELATION_OWN_BUCKET(rel);
m_global->m_natts = RelationGetDescr(rel)->natts;
m_global->m_tupDesc = CreateTupleDescCopy(RelationGetDescr(rel));
m_global->m_tupDesc->td_tam_ops = GetTableAmRoutine(m_global->m_table_type);
hash_col_num = rel->rd_isblockchain ? 1 : 0;
heap_close(rel, NoLock);
#ifdef USE_ASSERT_CHECKING
if (m_global->m_is_bucket_rel) {
/* ctid + tablebucketid */
Assert(m_global->m_natts + 2 == list_length(indexscan->scan.plan.targetlist) + hash_col_num);
} else {
/* ctid */
if (indexscan->scan.isPartTbl) {
Assert(m_global->m_natts + 2 <= list_length(indexscan->scan.plan.targetlist) + hash_col_num);
} else {
Assert(m_global->m_natts + 1 <= list_length(indexscan->scan.plan.targetlist) + hash_col_num);
}
}
#endif
/* init target, include param and const */
m_c_global->m_targetParamNum = 0;
m_c_global->m_targetParamLoc = (ParamLoc*)palloc0(m_global->m_natts * sizeof(ParamLoc));
m_c_global->m_targetConstLoc = (ConstLoc*)palloc0(m_global->m_natts * sizeof(ConstLoc));
m_c_global->m_targetVarLoc = (VarLoc*)palloc0(m_global->m_natts * sizeof(VarLoc));
m_c_global->m_targetFuncNum = 0;
m_c_global->m_targetFuncNodes = (FuncExprInfo*)palloc0(m_global->m_natts * sizeof(FuncExprInfo));
m_c_global->m_varNum = 0;
InitBaseParam(indexscan->scan.plan.targetlist);
}
void UpdateFusion::InitLocals(ParamListInfo params)
{
m_local.m_tmpisnull = NULL;
@ -658,3 +665,43 @@ bool UpdateFusion::execute(long max_rows, char *completionTag)
u_sess->statement_cxt.last_row_count = u_sess->statement_cxt.current_row_count;
return success;
}
/*
* reset InsertFusion for reuse:
* such as
* insert into t values (1, 'a');
* insert into t values (2, 'b');
* only need to replace the planstmt
*/
bool UpdateFusion::ResetReuseFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params)
{
PlannedStmt *curr_plan = (PlannedStmt *)linitial(plantree_list);
m_global->m_planstmt = curr_plan;
ModifyTable* node = (ModifyTable*)m_global->m_planstmt->planTree;
Plan *updatePlan = (Plan *)linitial(node->plans);
IndexScan* indexscan = (IndexScan *)JudgePlanIsPartIterator(updatePlan);
List* targetList = indexscan->scan.plan.targetlist;
m_c_global->m_targetFuncNum = 0;
m_c_global->m_targetParamNum = 0;
m_c_global->m_varNum = 0;
InitBaseParam(targetList);
// init local
m_c_local.m_estate->es_range_table = m_global->m_planstmt->rtable;
m_c_local.m_estate->es_plannedstmt = m_global->m_planstmt;
initParams(params);
if(IsA(indexscan, IndexScan)
&& typeid(*(m_local.m_scan)) == typeid(IndexScanFusion)) {
((IndexScanFusion *)m_local.m_scan)->ResetIndexScanFusion(indexscan, m_global->m_planstmt,
m_local.m_outParams ? m_local.m_outParams : m_local.m_params);
} else {
m_local.m_scan = ScanFusion::getScanFusion((Node*)indexscan, m_global->m_planstmt,
m_local.m_outParams ? m_local.m_outParams : m_local.m_params);
}
return true;
}

View File

@ -40,6 +40,11 @@ public:
void InitGlobals();
void refreshTargetParameterIfNecessary();
void InitBaseParam(List* targetList);
bool ResetReuseFusion(MemoryContext context, CachedPlanSource* psrc, List* plantree_list, ParamListInfo params);
private:
HeapTuple heapModifyTuple(HeapTuple tuple);