fix memory lead of opfusion, and fix performance deterioration problem.

This commit is contained in:
xiong_xjun
2020-12-12 16:11:37 +08:00
parent 4898995c3a
commit d66a0a3697
3 changed files with 73 additions and 29 deletions

View File

@ -613,15 +613,15 @@ void OpFusion::setPreparedDestReceiver(DestReceiver* preparedDest)
}
/* evaluate datum from node, simple node types such as Const, Param, Var excludes FuncExpr and OpExpr */
Datum OpFusion::EvalSimpleArg(Node* arg, bool* is_null)
Datum OpFusion::EvalSimpleArg(Node* arg, bool* is_null, Datum* values, bool* isNulls)
{
switch (nodeTag(arg)) {
case T_Const:
*is_null = ((Const*)arg)->constisnull;
return ((Const*)arg)->constvalue;
case T_Var:
*is_null = m_isnull[(((Var*)arg)->varattno - 1)];
return m_values[(((Var*)arg)->varattno - 1)];
*is_null = isNulls[(((Var*)arg)->varattno - 1)];
return values[(((Var*)arg)->varattno - 1)];
case T_Param: {
ParamListInfo param_list = m_outParams != NULL ? m_outParams : m_params;
@ -633,13 +633,13 @@ Datum OpFusion::EvalSimpleArg(Node* arg, bool* is_null)
case T_RelabelType: {
if (IsA(((RelabelType*)arg)->arg, FuncExpr)) {
FuncExpr* expr = (FuncExpr*)(((RelabelType*)arg)->arg);
return CalFuncNodeVal(expr->funcid, expr->args, is_null);
return CalFuncNodeVal(expr->funcid, expr->args, is_null, values, isNulls);
} else if (IsA(((RelabelType*)arg)->arg, OpExpr)) {
OpExpr* expr = (OpExpr*)(((RelabelType*)arg)->arg);
return CalFuncNodeVal(expr->opfuncid, expr->args, is_null);
return CalFuncNodeVal(expr->opfuncid, expr->args, is_null, values, isNulls);
}
return EvalSimpleArg((Node*)((RelabelType*)arg)->arg, is_null);
return EvalSimpleArg((Node*)((RelabelType*)arg)->arg, is_null, values, isNulls);
}
default:
Assert(0);
@ -653,7 +653,7 @@ Datum OpFusion::EvalSimpleArg(Node* arg, bool* is_null)
return 0;
}
Datum OpFusion::CalFuncNodeVal(Oid functionId, List* args, bool* is_null)
Datum OpFusion::CalFuncNodeVal(Oid functionId, List* args, bool* is_null, Datum* values, bool* isNulls)
{
if (*is_null) {
return 0;
@ -666,11 +666,13 @@ Datum OpFusion::CalFuncNodeVal(Oid functionId, List* args, bool* is_null)
/* for now, we assuming FuncExpr and OpExpr only appear in the first arg */
if (IsA(first_arg_node, FuncExpr)) {
arg[0] = CalFuncNodeVal(((FuncExpr*)first_arg_node)->funcid, ((FuncExpr*)first_arg_node)->args, &is_nulls[0]);
arg[0] = CalFuncNodeVal(((FuncExpr*)first_arg_node)->funcid, ((FuncExpr*)first_arg_node)->args,
&is_nulls[0], values, isNulls);
} else if (IsA(first_arg_node, OpExpr)) {
arg[0] = CalFuncNodeVal(((OpExpr*)first_arg_node)->opfuncid, ((OpExpr*)first_arg_node)->args, &is_nulls[0]);
arg[0] = CalFuncNodeVal(((OpExpr*)first_arg_node)->opfuncid, ((OpExpr*)first_arg_node)->args,
&is_nulls[0], values, isNulls);
} else {
arg[0] = EvalSimpleArg(first_arg_node, &is_nulls[0]);
arg[0] = EvalSimpleArg(first_arg_node, &is_nulls[0], values, isNulls);
}
int length = list_length(args);
@ -683,7 +685,7 @@ Datum OpFusion::CalFuncNodeVal(Oid functionId, List* args, bool* is_null)
ListCell* tmp_arg = list_head(args);
for (int i = 1; i < length; i++) {
tmp_arg = lnext(tmp_arg);
arg[i] = EvalSimpleArg((Node*)lfirst(tmp_arg), &is_nulls[i]);
arg[i] = EvalSimpleArg((Node*)lfirst(tmp_arg), &is_nulls[i], values, isNulls);
*is_null = *is_null || is_nulls[i];
}
if (*is_null) {
@ -994,6 +996,8 @@ InsertFusion::InsertFusion(MemoryContext context, CachedPlanSource* psrc, List*
m_reslot = MakeSingleTupleTableSlot(m_tupDesc);
m_values = (Datum*)palloc0(RelationGetDescr(rel)->natts * sizeof(Datum));
m_isnull = (bool*)palloc0(RelationGetDescr(rel)->natts * sizeof(bool));
m_curVarValue = (Datum*)palloc0(RelationGetDescr(rel)->natts * sizeof(Datum));
m_curVarIsnull = (bool*)palloc0(RelationGetDescr(rel)->natts * sizeof(bool));
m_is_bucket_rel = RELATION_OWN_BUCKET(rel);
heap_close(rel, AccessShareLock);
@ -1060,12 +1064,17 @@ InsertFusion::InsertFusion(MemoryContext context, CachedPlanSource* psrc, List*
void InsertFusion::refreshParameterIfNecessary()
{
ParamListInfo parms = m_outParams != NULL ? m_outParams : m_params;
/* save cur var value */
for (int i = 0; i < m_tupDesc->natts; i++) {
m_curVarValue[i] = m_values[i];
m_curVarIsnull[i] = m_isnull[i];
}
/* calculate func result */
for (int i = 0; i < m_targetFuncNum; ++i) {
if (m_targetFuncNodes[i].funcid != InvalidOid) {
bool func_isnull = false;
m_values[m_targetFuncNodes[i].resno - 1] = CalFuncNodeVal(
m_targetFuncNodes[i].funcid, m_targetFuncNodes[i].args, &func_isnull);
m_targetFuncNodes[i].funcid, m_targetFuncNodes[i].args, &func_isnull, m_curVarValue, m_curVarIsnull);
m_isnull[m_targetFuncNodes[i].resno - 1] = func_isnull;
}
}
@ -1295,9 +1304,13 @@ UpdateFusion::UpdateFusion(MemoryContext context, CachedPlanSource* psrc, List*
m_targetIsnull = (bool*)palloc0(RelationGetDescr(rel)->natts * sizeof(bool));
m_targetParamLoc = (ParamLoc*)palloc0(RelationGetDescr(rel)->natts * sizeof(ParamLoc));
m_targetConstLoc = (int*)palloc0(RelationGetDescr(rel)->natts * sizeof(int));
m_curVarValue = (Datum*)palloc0(RelationGetDescr(rel)->natts * sizeof(Datum));
m_curVarIsnull = (bool*)palloc0(RelationGetDescr(rel)->natts * sizeof(bool));
m_targetVarLoc = (VarLoc*)palloc0(RelationGetDescr(rel)->natts * sizeof(VarLoc));
m_targetFuncNum = 0;
m_targetFuncNodes = (FuncExprInfo*)palloc0(RelationGetDescr(rel)->natts * sizeof(FuncExprInfo));
m_varNum = 0;
int i = 0;
ListCell* lc = NULL;
@ -1326,6 +1339,10 @@ UpdateFusion::UpdateFusion(MemoryContext context, CachedPlanSource* psrc, List*
m_targetIsnull[i] = ((Const*)expr)->constisnull;
m_targetValues[i] = ((Const*)expr)->constvalue;
m_targetConstLoc[i] = i;
} else if (IsA(expr, Var)) {
Var* var = (Var*)expr;
m_targetVarLoc[m_varNum].varNo = var->varattno;
m_targetVarLoc[m_varNum++].scanKeyIndx = i;
} else if (IsA(expr, OpExpr)) {
opexpr = (OpExpr*)expr;
@ -1360,6 +1377,17 @@ UpdateFusion::UpdateFusion(MemoryContext context, CachedPlanSource* psrc, List*
void UpdateFusion::refreshTargetParameterIfNecessary()
{
ParamListInfo parms = m_outParams != NULL ? m_outParams : m_params;
/* save cur var value */
for (int i = 0; i < m_tupDesc->natts; i++) {
m_curVarValue[i] = m_values[i];
m_curVarIsnull[i] = m_isnull[i];
}
if (m_varNum > 0) {
for (int i = 0; i < m_varNum; i++) {
m_values[m_targetVarLoc[i].scanKeyIndx] = m_curVarValue[m_targetVarLoc[i].varNo - 1];
m_isnull[m_targetVarLoc[i].scanKeyIndx] = m_curVarIsnull[m_targetVarLoc[i].varNo - 1];
}
}
/* mapping value for update from target */
if (m_targetParamNum > 0) {
Assert(m_targetParamNum > 0);
@ -1380,7 +1408,8 @@ void UpdateFusion::refreshTargetParameterIfNecessary()
if (m_targetFuncNodes[i].funcid != InvalidOid) {
bool func_isnull = false;
m_values[m_targetFuncNodes[i].resno - 1] =
CalFuncNodeVal(m_targetFuncNodes[i].funcid, m_targetFuncNodes[i].args, &func_isnull);
CalFuncNodeVal(m_targetFuncNodes[i].funcid, m_targetFuncNodes[i].args, &func_isnull,
m_curVarValue, m_curVarIsnull);
m_isnull[m_targetFuncNodes[i].resno - 1] = func_isnull;
}
}
@ -1878,6 +1907,7 @@ bool SelectForUpdateFusion::execute(long max_rows, char* completionTag)
ReleaseBuffer(buffer);
if (result == HeapTupleSelfUpdated) {
heap_freetuple_ext(tmptup);
continue;
}
@ -1887,6 +1917,7 @@ bool SelectForUpdateFusion::execute(long max_rows, char* completionTag)
errmsg("attempted to lock invisible tuple")));
break;
case HeapTupleSelfUpdated:
heap_freetuple_ext(tmptup);
/* already deleted by self; nothing to do */
break;
@ -1894,6 +1925,7 @@ bool SelectForUpdateFusion::execute(long max_rows, char* completionTag)
/* done successfully */
nprocessed++;
(*m_receiver->receiveSlot)(m_reslot, m_receiver);
heap_freetuple_ext(tmptup);
break;
case HeapTupleUpdated:
@ -1929,6 +1961,7 @@ bool SelectForUpdateFusion::execute(long max_rows, char* completionTag)
m_tmpvals[i] = valuesfornew[m_attrno[i] - 1];
m_tmpisnull[i] = isnullfornew[m_attrno[i] - 1];
}
heap_freetuple_ext(tmptup);
tmptup = heap_form_tuple(m_tupDesc, m_tmpvals, m_tmpisnull);
Assert(tmptup != NULL);
@ -1943,6 +1976,7 @@ bool SelectForUpdateFusion::execute(long max_rows, char* completionTag)
(*m_receiver->receiveSlot)(m_reslot, m_receiver);
tuple->t_self = update_ctid;
tuple = copyTuple;
heap_freetuple_ext(tmptup);
pfree(valuesfornew);
pfree(isnullfornew);
}
@ -1954,8 +1988,6 @@ bool SelectForUpdateFusion::execute(long max_rows, char* completionTag)
}
}
heap_freetuple_ext(tmptup);
if (!ScanDirectionIsNoMovement(*(m_scan->m_direction))) {
if (max_rows == 0 || nprocessed < (unsigned long)max_rows) {
m_isCompleted = true;
@ -2117,10 +2149,10 @@ bool AggFusion::execute(long max_rows, char *completionTag)
HeapTuple tmptup = heap_form_tuple(m_tupDesc, values, isnull);
(void)ExecStoreTuple(tmptup, reslot, InvalidBuffer, false);
heap_freetuple_ext(tmptup);
slot_getsomeattrs(reslot, m_tupDesc->natts);
(*m_receiver->receiveSlot)(reslot, m_receiver);
heap_freetuple_ext(tmptup);
success = true;
@ -2389,8 +2421,8 @@ bool SortFusion::execute(long max_rows, char *completionTag)
HeapTuple tmptup = heap_form_tuple(m_tupDesc, values, isnull);
(void)ExecStoreTuple(tmptup, reslot, InvalidBuffer, false);
heap_freetuple_ext(tmptup);
(*m_receiver->receiveSlot)(reslot, m_receiver);
heap_freetuple_ext(tmptup);
CHECK_FOR_INTERRUPTS();
nprocessed++;

View File

@ -277,9 +277,8 @@ static bool checkFlinfo(Node *node)
return true;
}
static bool checkExpr(Node *node, bool is_first, int resno)
static bool checkExpr(Node *node, bool is_first)
{
const int noVarInSubtree = -1;
NodeTag tag = nodeTag(node);
switch (tag) {
case T_Const:
@ -287,10 +286,6 @@ static bool checkExpr(Node *node, bool is_first, int resno)
return true;
}
case T_Var: {
/* bypass currently does not support updating with var */
if ((resno != ((Var*)node)->varattno) || (resno == noVarInSubtree)) {
return false;
}
return true;
}
@ -316,7 +311,7 @@ static bool checkExpr(Node *node, bool is_first, int resno)
bool result = true;
ListCell *lc = NULL;
foreach (lc, args) {
result = result && checkExpr((Node *)lfirst(lc), is_first, noVarInSubtree);
result = result && checkExpr((Node *)lfirst(lc), is_first);
is_first = false;
}
return result;
@ -339,14 +334,14 @@ static bool checkExpr(Node *node, bool is_first, int resno)
bool result = true;
ListCell *lc = NULL;
foreach (lc, args) {
result = result && checkExpr((Node *)lfirst(lc), is_first, noVarInSubtree);
result = result && checkExpr((Node *)lfirst(lc), is_first);
is_first = false;
}
return result;
}
case T_RelabelType: {
return checkExpr((Node *)((RelabelType *)node)->arg, is_first, noVarInSubtree);
return checkExpr((Node *)((RelabelType *)node)->arg, is_first);
}
default: {
@ -731,7 +726,7 @@ FusionType checkTargetlist(List *targetList, FusionType ftype)
if (target->resjunk && nodeTag((Node *)target->expr) == T_Var) {
continue;
}
if (!checkExpr((Node *)target->expr, true, target->resno)) {
if (!checkExpr((Node *)target->expr, true)) {
return NOBYPASS_EXP_NOT_SUPPORT;
}
}

View File

@ -82,9 +82,9 @@ public:
void setPreparedDestReceiver(DestReceiver* preparedDest);
Datum CalFuncNodeVal(Oid functionId, List* args, bool* is_null);
Datum CalFuncNodeVal(Oid functionId, List* args, bool* is_null, Datum* values, bool* isNulls);
Datum EvalSimpleArg(Node* arg, bool* is_null);
Datum EvalSimpleArg(Node* arg, bool* is_null, Datum* values, bool* isNulls);
static void tearDown(OpFusion* opfusion);
@ -201,6 +201,10 @@ private:
int m_targetParamNum;
Datum* m_curVarValue;
bool* m_curVarIsnull;
bool m_is_bucket_rel;
};
@ -230,6 +234,19 @@ private:
bool* m_targetIsnull;
Datum* m_curVarValue;
struct VarLoc {
int varNo;
int scanKeyIndx;
};
VarLoc* m_targetVarLoc;
int m_varNum;
bool* m_curVarIsnull;
int* m_targetConstLoc;
ParamLoc* m_targetParamLoc;