!3031 优化RowToVec算子在投影列与过滤列不重叠情况下的性能
Merge pull request !3031 from wanghao19920907/master_rowtvec
This commit is contained in:
@ -548,6 +548,7 @@ static void GetAccessedVarNumbers(ProjectionInfo* projInfo, List* targetList, Li
|
||||
List* vars = NIL;
|
||||
List* varattno_list = NIL;
|
||||
List* lateAccessVarNoList = NIL;
|
||||
List* projectVarNumbers = NIL;
|
||||
List* sysVarList = NIL;
|
||||
List* qualVarNoList = NIL;
|
||||
bool isConst = false;
|
||||
@ -579,6 +580,7 @@ static void GetAccessedVarNumbers(ProjectionInfo* projInfo, List* targetList, Li
|
||||
* Used for PackT optimization: PackTCopyVarsList records those columns what we need to move.
|
||||
*/
|
||||
List* PackTCopyVarsList = list_copy(varattno_list);
|
||||
projectVarNumbers = list_copy(varattno_list);
|
||||
|
||||
/* Now consider the quals */
|
||||
vars = pull_var_clause((Node*)qual, PVC_RECURSE_AGGREGATES, PVC_RECURSE_PLACEHOLDERS);
|
||||
@ -617,6 +619,7 @@ static void GetAccessedVarNumbers(ProjectionInfo* projInfo, List* targetList, Li
|
||||
projInfo->pi_PackTCopyVars = PackTCopyVarsList;
|
||||
projInfo->pi_acessedVarNumbers = varattno_list;
|
||||
projInfo->pi_lateAceessVarNumbers = lateAccessVarNoList;
|
||||
projInfo->pi_projectVarNumbers = projectVarNumbers;
|
||||
projInfo->pi_sysAttrList = sysVarList;
|
||||
projInfo->pi_const = isConst;
|
||||
projInfo->pi_PackLateAccessVarNumbers = PackLateAccessList;
|
||||
|
@ -725,8 +725,7 @@ static void InitRelationBatchScanEnv(SeqScanState *state)
|
||||
List *pColList = proj->pi_acessedVarNumbers;
|
||||
|
||||
batchstate->colNum = list_length(pColList);
|
||||
batchstate->lateRead = (bool *)palloc0(sizeof(bool) * batchstate->colNum);
|
||||
batchstate->colId = (int *)palloc(sizeof(int) * batchstate->colNum);
|
||||
batchstate->colAttr = (ScanBatchColAttr *)palloc0(sizeof(ScanBatchColAttr) * batchstate->colNum);
|
||||
|
||||
int i = 0;
|
||||
ListCell *cell = NULL;
|
||||
@ -734,17 +733,27 @@ static void InitRelationBatchScanEnv(SeqScanState *state)
|
||||
/* Initilize which columns should be accessed */
|
||||
foreach (cell, pColList) {
|
||||
Assert(lfirst_int(cell) > 0);
|
||||
batchstate->colId[i] = lfirst_int(cell) - 1;
|
||||
batchstate->lateRead[i] = false;
|
||||
batchstate->colAttr[i].colId = lfirst_int(cell) - 1;
|
||||
batchstate->colAttr[i].lateRead = false;
|
||||
i++;
|
||||
}
|
||||
|
||||
foreach (cell, proj->pi_projectVarNumbers) {
|
||||
int colId = lfirst_int(cell) - 1;
|
||||
for (i = 0; i < batchstate->colNum; ++i) {
|
||||
if (batchstate->colAttr[i].colId == colId) {
|
||||
batchstate->colAttr[i].isProject = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Intilize which columns will be late read */
|
||||
foreach (cell, proj->pi_lateAceessVarNumbers) {
|
||||
int colId = lfirst_int(cell) - 1;
|
||||
for (i = 0; i < batchstate->colNum; ++i) {
|
||||
if (batchstate->colId[i] == colId) {
|
||||
batchstate->lateRead[i] = true;
|
||||
if (batchstate->colAttr[i].colId == colId) {
|
||||
batchstate->colAttr[i].lateRead = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -806,8 +815,7 @@ static SeqScanState *ExecInitSeqScanBatchMode(SeqScan *node, SeqScanState* scans
|
||||
ExecAssignVectorForExprEval(scanstate->ps.ps_ExprContext);
|
||||
|
||||
scanstate->ps.targetlist = (List *)ExecInitVecExpr((Expr *)node->plan.targetlist, (PlanState *)scanstate);
|
||||
scanBatchState->pCurrentBatch = New(CurrentMemoryContext)
|
||||
VectorBatch(CurrentMemoryContext, scanstate->ps.ps_ResultTupleSlot->tts_tupleDescriptor);
|
||||
|
||||
scanBatchState->pScanBatch =
|
||||
New(CurrentMemoryContext)VectorBatch(CurrentMemoryContext, scanstate->ss_currentRelation->rd_att);
|
||||
|
||||
@ -821,7 +829,7 @@ static SeqScanState *ExecInitSeqScanBatchMode(SeqScan *node, SeqScanState* scans
|
||||
|
||||
InitRelationBatchScanEnv(scanstate);
|
||||
for (i = 0; i < scanBatchState->colNum; i++) {
|
||||
scanBatchState->maxcolId = Max(scanBatchState->maxcolId, scanBatchState->colId[i]);
|
||||
scanBatchState->maxcolId = Max(scanBatchState->maxcolId, scanBatchState->colAttr[i].colId);
|
||||
}
|
||||
scanBatchState->maxcolId++;
|
||||
|
||||
@ -831,7 +839,8 @@ static SeqScanState *ExecInitSeqScanBatchMode(SeqScan *node, SeqScanState* scans
|
||||
proj = scanstate->ps.ps_ProjInfo;
|
||||
|
||||
/* Check if it is simple without need to invoke projection code */
|
||||
fSimpleMap = proj->pi_directMap && (scanBatchState->pCurrentBatch->m_cols == proj->pi_numSimpleVars);
|
||||
fSimpleMap = proj->pi_directMap &&
|
||||
(scanstate->ps.ps_ResultTupleSlot->tts_tupleDescriptor->natts == proj->pi_numSimpleVars);
|
||||
scanstate->ps.ps_ProjInfo->pi_directMap = fSimpleMap;
|
||||
scanBatchState->scanfinished = false;
|
||||
}
|
||||
|
@ -142,84 +142,111 @@ inline struct varlena* DetoastDatumBatch(struct varlena* datum, ScalarVector* ar
|
||||
}
|
||||
|
||||
template<bool hasNull>
|
||||
static void FillVector(ScalarVector* pVector, int rows)
|
||||
static inline void FillVector(ScalarVector* ipVector, ScalarVector* opVector, int rows)
|
||||
{
|
||||
ScalarVector* pVector = opVector ? opVector : ipVector;
|
||||
|
||||
for (int i = 0; i < rows; i++) {
|
||||
if (hasNull && unlikely(IS_NULL(pVector->m_flag[i]))) {
|
||||
if (hasNull && unlikely(IS_NULL(ipVector->m_flag[i]))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pVector->AddVar(pVector->m_vals[i], i);
|
||||
pVector->AddVar(ipVector->m_vals[i], i);
|
||||
}
|
||||
}
|
||||
|
||||
template<bool hasNull>
|
||||
static void FillTidVector(ScalarVector* pVector, int rows)
|
||||
static inline void FillTidVector(ScalarVector* ipVector, ScalarVector* opVector, int rows)
|
||||
{
|
||||
ScalarVector* pVector = opVector ? opVector : ipVector;
|
||||
|
||||
for (int i = 0; i < rows; i++) {
|
||||
if (hasNull && unlikely(IS_NULL(pVector->m_flag[i]))) {
|
||||
if (hasNull && unlikely(IS_NULL(ipVector->m_flag[i]))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ItemPointer srcTid = (ItemPointer)DatumGetPointer(pVector->m_vals[i]);
|
||||
ItemPointer srcTid = (ItemPointer)DatumGetPointer(ipVector->m_vals[i]);
|
||||
ItemPointer destTid = (ItemPointer)(&pVector->m_vals[i]);
|
||||
*destTid = *srcTid;
|
||||
}
|
||||
}
|
||||
|
||||
template<bool hasNull>
|
||||
static void TransformScalarVector(Form_pg_attribute attr, ScalarVector* pVector, int rows)
|
||||
static inline void FillVarlenaVector(Form_pg_attribute attr, ScalarVector* ipVector, ScalarVector* opVector, int rows)
|
||||
{
|
||||
int i = 0;
|
||||
int typeLen = attr->attlen;
|
||||
ScalarVector* pVector;
|
||||
int i, off;
|
||||
Datum v, v0;
|
||||
switch (typeLen) {
|
||||
|
||||
if (opVector) {
|
||||
pVector = opVector;
|
||||
off = opVector->m_rows;
|
||||
}
|
||||
else {
|
||||
pVector = ipVector;
|
||||
off = 0;
|
||||
}
|
||||
|
||||
if (attr->atttypid == NUMERICOID) {
|
||||
for (i = 0; i < rows; i++) {
|
||||
if (hasNull && unlikely(IS_NULL(ipVector->m_flag[i]))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
v = ipVector->m_vals[i];
|
||||
/* if numeric cloumn, try to convert numeric to big integer */
|
||||
pVector->m_vals[i + off] = try_direct_convert_numeric_normal_to_fast(v, pVector);
|
||||
}
|
||||
}
|
||||
else {
|
||||
for (i = 0; i < rows; i++) {
|
||||
if (hasNull && unlikely(IS_NULL(ipVector->m_flag[i]))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
v0 = ipVector->m_vals[i];
|
||||
v = PointerGetDatum(DetoastDatumBatch((struct varlena *)DatumGetPointer(v0), pVector));
|
||||
if (v == v0) {
|
||||
pVector->AddVar(v0, i + off);
|
||||
} else {
|
||||
pVector->m_vals[i + off] = v;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<bool hasNull>
|
||||
static void TransformScalarVector(Form_pg_attribute attr, ScalarVector* ipVector, ScalarVector* opVector, int rows)
|
||||
{
|
||||
switch (attr->attlen) {
|
||||
case sizeof(char):
|
||||
case sizeof(int16):
|
||||
case sizeof(int32):
|
||||
case sizeof(Datum):
|
||||
/* nothing to do */
|
||||
if (opVector) {
|
||||
for (int i = 0; i < rows; i++) {
|
||||
if (hasNull && unlikely(IS_NULL(ipVector->m_flag[i]))) {
|
||||
continue;
|
||||
}
|
||||
opVector->m_vals[opVector->m_rows + i] = ipVector->m_vals[i];
|
||||
}
|
||||
}
|
||||
break;
|
||||
/* See ScalarVector::DatumToScalar to get the define */
|
||||
case 12: /* TIMETZOID, TINTERVALOID */
|
||||
case 16: /* INTERVALOID, UUIDOID */
|
||||
case 64: /* NAMEOID */
|
||||
case -2:
|
||||
FillVector<hasNull>(pVector, rows);
|
||||
FillVector<hasNull>(ipVector, opVector, rows);
|
||||
break;
|
||||
case -1:
|
||||
if (attr->atttypid == NUMERICOID) {
|
||||
for (i = 0; i < rows; i++) {
|
||||
if (hasNull && unlikely(IS_NULL(pVector->m_flag[i]))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
v = pVector->m_vals[i];
|
||||
/* if numeric cloumn, try to convert numeric to big integer */
|
||||
pVector->m_vals[i] = try_direct_convert_numeric_normal_to_fast(v, pVector);
|
||||
}
|
||||
}
|
||||
else {
|
||||
for (i = 0; i < rows; i++) {
|
||||
if (hasNull && unlikely(IS_NULL(pVector->m_flag[i]))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
v0 = pVector->m_vals[i];
|
||||
v = PointerGetDatum(DetoastDatumBatch((struct varlena *)DatumGetPointer(v0), pVector));
|
||||
if (v == v0) {
|
||||
pVector->AddVar(v0, i);
|
||||
} else {
|
||||
pVector->m_vals[i] = v;
|
||||
}
|
||||
}
|
||||
}
|
||||
FillVarlenaVector<hasNull>(attr, ipVector, opVector, rows);
|
||||
break;
|
||||
case 6:
|
||||
if (attr->atttypid == TIDOID && !attr->attbyval) {
|
||||
FillTidVector<hasNull>(pVector, rows);
|
||||
FillTidVector<hasNull>(ipVector, opVector, rows);
|
||||
} else {
|
||||
FillVector<hasNull>(pVector, rows);
|
||||
FillVector<hasNull>(ipVector, opVector, rows);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@ -229,35 +256,46 @@ static void TransformScalarVector(Form_pg_attribute attr, ScalarVector* pVector,
|
||||
}
|
||||
|
||||
template <bool lateRead>
|
||||
void VectorizeTupleBatchMode(VectorBatch *pBatch, TupleTableSlot **slots,
|
||||
ExprContext *econtext, ScanBatchState *scanstate, int rows)
|
||||
static void VectorizeTupleBatchMode(VectorBatch *ipBatch, VectorBatch *opBatch, FormData_pg_attribute* attrs,
|
||||
ExprContext *econtext, SeqScanState *node, int rows)
|
||||
{
|
||||
int i, j, colidx = 0;
|
||||
int i, colidx = 0;
|
||||
MemoryContext transformContext = econtext->ecxt_per_tuple_memory;
|
||||
ProjectionInfo *proj = node->ps.ps_ProjInfo;
|
||||
AttrNumber att;
|
||||
ScanBatchState *scanstate = node->scanBatchState;
|
||||
|
||||
/* Extract all the values of the old tuple */
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(transformContext);
|
||||
|
||||
/* for not late read, deform all the column into batch */
|
||||
if (!lateRead) {
|
||||
for (j = 0; j < rows; j++) {
|
||||
tableam_tslot_formbatch(slots[j], pBatch, j, scanstate->maxcolId);
|
||||
}
|
||||
|
||||
for (i = 0; i < scanstate->maxcolId; i++) {
|
||||
scanstate->nullflag[i] = pBatch->m_arr[i].m_const;
|
||||
pBatch->m_arr[i].m_const = false;
|
||||
if (opBatch) {
|
||||
Assert(lateRead);
|
||||
for (i = 0; i < scanstate->colNum; i++) {
|
||||
if (scanstate->colAttr[i].isProject) {
|
||||
att = proj->pi_varNumbers[i];
|
||||
/* not overlap project columns */
|
||||
if (scanstate->colAttr[i].lateRead) {
|
||||
colidx = scanstate->colAttr[i].colId;
|
||||
if (scanstate->nullflag[colidx]) {
|
||||
TransformScalarVector<true>(&attrs[colidx], &ipBatch->m_arr[att - 1], &opBatch->m_arr[i], rows);
|
||||
} else {
|
||||
TransformScalarVector<false>(&attrs[colidx], &ipBatch->m_arr[att - 1], &opBatch->m_arr[i], rows);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (i = 0; i < scanstate->colNum; i++) {
|
||||
if ((lateRead && scanstate->lateRead[i]) || (!lateRead && !scanstate->lateRead[i])) {
|
||||
colidx = scanstate->colId[i];
|
||||
Form_pg_attribute attr = &slots[0]->tts_tupleDescriptor->attrs[colidx];
|
||||
if (scanstate->nullflag[colidx]) {
|
||||
TransformScalarVector<true>(attr, &pBatch->m_arr[colidx], rows);
|
||||
} else {
|
||||
TransformScalarVector<false>(attr, &pBatch->m_arr[colidx], rows);
|
||||
else {
|
||||
Assert(!opBatch);
|
||||
for (i = 0; i < scanstate->colNum; i++) {
|
||||
/* overlap project columns or qual columns */
|
||||
if ((lateRead && scanstate->colAttr[i].lateRead) || (!lateRead && !scanstate->colAttr[i].lateRead)) {
|
||||
colidx = scanstate->colAttr[i].colId;
|
||||
if (scanstate->nullflag[colidx]) {
|
||||
TransformScalarVector<true>(&attrs[colidx], &ipBatch->m_arr[colidx], NULL, rows);
|
||||
} else {
|
||||
TransformScalarVector<false>(&attrs[colidx], &ipBatch->m_arr[colidx], NULL, rows);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -265,6 +303,27 @@ void VectorizeTupleBatchMode(VectorBatch *pBatch, TupleTableSlot **slots,
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
|
||||
static void CopyScalarVector(VectorBatch *ipBatch, VectorBatch *opBatch, SeqScanState *node, int rows)
|
||||
{
|
||||
ProjectionInfo *proj = node->ps.ps_ProjInfo;
|
||||
AttrNumber att;
|
||||
ScanBatchState *scanstate = node->scanBatchState;
|
||||
|
||||
for (int i = 0; i < scanstate->colNum; i++) {
|
||||
if (scanstate->colAttr[i].isProject) {
|
||||
att = proj->pi_varNumbers[i];
|
||||
/* The data has been stored directly to finalbatch */
|
||||
if (scanstate->colAttr[i].lateRead) {
|
||||
opBatch->m_arr[i].copyFlag(&(ipBatch->m_arr[att - 1]), 0, rows);
|
||||
}
|
||||
/* Process columns that have been transformed, if there is overlap */
|
||||
else {
|
||||
opBatch->m_arr[i].copyDeep(&(ipBatch->m_arr[att - 1]), 0, rows);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @Description: Vectorized Operator--Convert row data to vector batch.
|
||||
*
|
||||
@ -323,76 +382,94 @@ done:
|
||||
return batch;
|
||||
}
|
||||
|
||||
static VectorBatch *ApplyProjectionAndFilterBatch(VectorBatch *pScanBatch,
|
||||
SeqScanState *node, TupleTableSlot **outerslot)
|
||||
/*
|
||||
* @Description: Process Qual and Project, The vectorization of tuples
|
||||
* if fSimpleMap is true, vectorize tuple in pScanBatch, else output pFinalBatch.
|
||||
* @IN pScanBatch: Scan Batch
|
||||
* @IN pFinalBatch: Final Batch
|
||||
* @return: if pOutBatch = NULL : Batch filters all tuples
|
||||
* if pOutBatch = pFinalBatch : Has been output directly to pFinalBatch
|
||||
* if pOutBatch != pFinalBatch && pOutBatch != NULL : Output toprojInfo->pi_batch in ExecVecProject
|
||||
*/
|
||||
static VectorBatch *ApplyProjectionAndFilterBatch(VectorBatch *pScanBatch, VectorBatch *pFinalBatch, SeqScanState *node)
|
||||
{
|
||||
ExprContext *econtext = NULL;
|
||||
ExprContext *econtext = node->ps.ps_ExprContext;
|
||||
ProjectionInfo *proj = node->ps.ps_ProjInfo;
|
||||
TupleDesc tupedesc = node->ss_ScanTupleSlot->tts_tupleDescriptor;
|
||||
VectorBatch *pOutBatch = NULL;
|
||||
bool fSimpleMap = false;
|
||||
bool fSimpleMap = node->ps.ps_ProjInfo->pi_directMap;
|
||||
uint64 inputRows = pScanBatch->m_rows;
|
||||
uint64 outputRows = pFinalBatch->m_rows;
|
||||
List* qual = (List*)node->ps.qual;
|
||||
|
||||
econtext = node->ps.ps_ExprContext;
|
||||
pOutBatch = node->scanBatchState->pCurrentBatch;
|
||||
fSimpleMap = node->ps.ps_ProjInfo->pi_directMap;
|
||||
if (pScanBatch->m_rows != 0) {
|
||||
Assert(pFinalBatch);
|
||||
|
||||
if (pScanBatch->m_rows) {
|
||||
initEcontextBatch(pScanBatch, NULL, NULL, NULL);
|
||||
|
||||
/* Evaluate the qualification clause if any. */
|
||||
if (qual != NULL) {
|
||||
ScalarVector *pVector = NULL;
|
||||
pVector = ExecVecQual(qual, econtext, false);
|
||||
/* If no matched rows, fetch again. */
|
||||
if (pVector == NULL) {
|
||||
pOutBatch->m_rows = 0;
|
||||
/* collect information of removed rows */
|
||||
InstrCountFiltered1(node, inputRows - pOutBatch->m_rows);
|
||||
return pOutBatch;
|
||||
InstrCountFiltered1(node, inputRows);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Call optimized PackT function when batch mode is turned on. */
|
||||
if (econtext->ecxt_scanbatch->m_sel) {
|
||||
pScanBatch->Pack(econtext->ecxt_scanbatch->m_sel);
|
||||
if (pScanBatch->m_sel) {
|
||||
pScanBatch->Pack(pScanBatch->m_sel);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Late read these columns
|
||||
* reset m_rows to the value before VecQual
|
||||
*/
|
||||
VectorizeTupleBatchMode<true>(pScanBatch, outerslot, econtext, node->scanBatchState, pScanBatch->m_rows);
|
||||
if (fSimpleMap) {
|
||||
/* From pScanBatch stored directly to pFinalBatch */
|
||||
VectorizeTupleBatchMode<true>(pScanBatch, pFinalBatch, tupedesc->attrs, econtext, node, pScanBatch->m_rows);
|
||||
|
||||
/* Copy flag for lateread columns, Copy flag and data for not lateread columns */
|
||||
CopyScalarVector(pScanBatch, pFinalBatch, node, pScanBatch->m_rows);
|
||||
|
||||
/* Project the final result */
|
||||
if (!fSimpleMap) {
|
||||
if (!proj->pi_exprContext->have_vec_set_fun) {
|
||||
pFinalBatch->m_rows = outputRows + pScanBatch->m_rows;
|
||||
pFinalBatch->FixRowCount();
|
||||
}
|
||||
|
||||
/* collect information of removed rows */
|
||||
InstrCountFiltered1(node, inputRows - pScanBatch->m_rows);
|
||||
}
|
||||
else {
|
||||
/* In palce vectorize int pScanBatch for project */
|
||||
VectorizeTupleBatchMode<true>(pScanBatch, NULL, tupedesc->attrs, econtext, node, pScanBatch->m_rows);
|
||||
|
||||
/* Project the final result */
|
||||
pOutBatch = ExecVecProject(proj, true, NULL);
|
||||
} else {
|
||||
/*
|
||||
* Copy the result to output batch. Note the output batch has different column set than
|
||||
* the scan batch, so we have to remap them. Projection will handle all logics here, so
|
||||
* for non simpleMap case, we don't need to do anything.
|
||||
*/
|
||||
pOutBatch->m_rows += pScanBatch->m_rows;
|
||||
|
||||
/* Copy flag and data for project */
|
||||
for (int i = 0; i < pOutBatch->m_cols; i++) {
|
||||
AttrNumber att = proj->pi_varNumbers[i];
|
||||
Assert(att > 0 && att <= pScanBatch->m_cols);
|
||||
|
||||
errno_t rc = memcpy_s(&pOutBatch->m_arr[i], sizeof(ScalarVector), &pScanBatch->m_arr[att - 1],
|
||||
sizeof(ScalarVector));
|
||||
securec_check(rc, "\0", "\0");
|
||||
pFinalBatch->m_arr[i].copyDeep(&(pOutBatch->m_arr[i]), 0, pOutBatch->m_rows);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!proj->pi_exprContext->have_vec_set_fun) {
|
||||
pOutBatch->m_rows = Min(pOutBatch->m_rows, pScanBatch->m_rows);
|
||||
pOutBatch->FixRowCount();
|
||||
}
|
||||
if (!proj->pi_exprContext->have_vec_set_fun) {
|
||||
pFinalBatch->m_rows = outputRows + pOutBatch->m_rows;
|
||||
pFinalBatch->FixRowCount();
|
||||
}
|
||||
|
||||
/* collect information of removed rows */
|
||||
InstrCountFiltered1(node, inputRows - pOutBatch->m_rows);
|
||||
/* collect information of removed rows */
|
||||
InstrCountFiltered1(node, inputRows - pOutBatch->m_rows);
|
||||
}
|
||||
}
|
||||
|
||||
/* Check fullness of return batch and refill it does not contain enough? */
|
||||
return pOutBatch;
|
||||
return pFinalBatch;
|
||||
}
|
||||
|
||||
static inline void VectorizeDeformBatch(VectorBatch *pBatch, ScanBatchState *scanstate, ScanBatchResult* scanSlotBatch)
|
||||
{
|
||||
for (int i = 0; i < scanSlotBatch->rows; i++) {
|
||||
tableam_tslot_formbatch(scanSlotBatch->scanTupleSlotInBatch[i], pBatch, i, scanstate->maxcolId);
|
||||
}
|
||||
}
|
||||
|
||||
static VectorBatch *ExecRowToVecBatchMode(RowToVecState *state)
|
||||
@ -403,9 +480,10 @@ static VectorBatch *ExecRowToVecBatchMode(RowToVecState *state)
|
||||
ExprContext *econtext = state->ps.ps_ExprContext;
|
||||
VectorBatch *pBatch = scanBatchState->pScanBatch;
|
||||
seqScanState->ps.ps_ProjInfo->pi_exprContext->ecxt_scanbatch = pBatch;
|
||||
VectorBatch *pOutBatch = scanBatchState->pCurrentBatch;
|
||||
const int BatchModeMaxTuples = 900;
|
||||
TupleDesc tupedesc = seqScanState->ss_ScanTupleSlot->tts_tupleDescriptor;
|
||||
const int BatchModeMaxTuples = BatchMaxSize * 0.9;
|
||||
const int MaxLoopsForReset = 50;
|
||||
ScanBatchResult *scanSlotBatch;
|
||||
|
||||
pFinalBatch->Reset();
|
||||
ResetExprContext(seqScanState->ps.ps_ExprContext);
|
||||
@ -419,23 +497,24 @@ static VectorBatch *ExecRowToVecBatchMode(RowToVecState *state)
|
||||
}
|
||||
|
||||
pBatch->Reset();
|
||||
pOutBatch->Reset();
|
||||
scanBatchState->scanTupleSlotMaxNum = BatchMaxSize;
|
||||
|
||||
int loops = 0;
|
||||
while (true) {
|
||||
loops++;
|
||||
pFinalBatch = state->m_pCurrentBatch;
|
||||
|
||||
/* Reset MemoryContext to avoid using too much memory if scan times more than MAX_LOOPS_FOR_RESET */
|
||||
if (loops == MaxLoopsForReset) {
|
||||
if (pFinalBatch->m_rows != 0) {
|
||||
return pFinalBatch;
|
||||
}
|
||||
loops = 0;
|
||||
loops = 0;
|
||||
ResetExprContext(seqScanState->ps.ps_ExprContext);
|
||||
ResetExprContext(econtext);
|
||||
}
|
||||
ScanBatchResult *scanSlotBatch = (ScanBatchResult *)ExecProcNode((PlanState*)seqScanState);
|
||||
|
||||
scanSlotBatch = (ScanBatchResult *)ExecProcNode((PlanState*)seqScanState);
|
||||
|
||||
/* scanSlotBatch is NULL means early free */
|
||||
if (scanSlotBatch == NULL || scanBatchState->scanfinished) {
|
||||
@ -445,35 +524,40 @@ static VectorBatch *ExecRowToVecBatchMode(RowToVecState *state)
|
||||
return pFinalBatch;
|
||||
}
|
||||
|
||||
/* Deform tuples for filter columns */
|
||||
VectorizeDeformBatch(pBatch, scanBatchState, scanSlotBatch);
|
||||
|
||||
/* prepare scanstate for this time process */
|
||||
for (int i = 0; i < scanBatchState->maxcolId; i++) {
|
||||
scanBatchState->nullflag[i] = pBatch->m_arr[i].m_const;
|
||||
/* prepare pBatch for next time read */
|
||||
pBatch->m_arr[i].m_const = false;
|
||||
}
|
||||
|
||||
/* Vectorize tuples for filter columns. */
|
||||
VectorizeTupleBatchMode<false>(pBatch, scanSlotBatch->scanTupleSlotInBatch,
|
||||
econtext, scanBatchState, scanSlotBatch->rows);
|
||||
VectorizeTupleBatchMode<false>(pBatch, NULL, tupedesc->attrs, econtext, seqScanState, scanSlotBatch->rows);
|
||||
|
||||
pBatch->FixRowCount(scanSlotBatch->rows);
|
||||
|
||||
/* apply filter conditions and vectorize tuples for late read columns. */
|
||||
pOutBatch = ApplyProjectionAndFilterBatch(pBatch, seqScanState, scanSlotBatch->scanTupleSlotInBatch);
|
||||
pFinalBatch = ApplyProjectionAndFilterBatch(pBatch, pFinalBatch, seqScanState);
|
||||
if (BatchIsNull(pFinalBatch)) {
|
||||
if (!scanBatchState->scanfinished) {
|
||||
continue;
|
||||
}
|
||||
scanBatchState->scanfinished = false;
|
||||
state->m_fNoMoreRows = true;
|
||||
pFinalBatch = state->m_pCurrentBatch;
|
||||
|
||||
return pFinalBatch;
|
||||
}
|
||||
|
||||
/* prepare pBatch for next time read */
|
||||
for (int i = 0 ; i < pBatch->m_cols; i++) {
|
||||
scanBatchState->nullflag[i] = false;
|
||||
}
|
||||
pBatch->FixRowCount(0);
|
||||
|
||||
if (BatchIsNull(pOutBatch)) {
|
||||
if (!scanBatchState->scanfinished) {
|
||||
continue;
|
||||
}
|
||||
scanBatchState->scanfinished = false;
|
||||
state->m_fNoMoreRows = true;
|
||||
return pFinalBatch;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pOutBatch->m_cols; i++) {
|
||||
pFinalBatch->m_arr[i].copyDeep(&(pOutBatch->m_arr[i]), 0, pOutBatch->m_rows);
|
||||
}
|
||||
|
||||
pFinalBatch->m_rows += pOutBatch->m_rows;
|
||||
|
||||
scanBatchState->scanTupleSlotMaxNum = BatchMaxSize - pFinalBatch->m_rows;
|
||||
|
||||
/*
|
||||
|
@ -836,6 +836,17 @@ void ScalarVector::copyDeep(ScalarVector* vector, int start_idx, int end_idx)
|
||||
m_rows += copy_rows;
|
||||
}
|
||||
|
||||
void ScalarVector::copyFlag(ScalarVector* vector, int start_idx, int end_idx)
|
||||
{
|
||||
errno_t rc;
|
||||
Assert(vector != NULL);
|
||||
int copy_rows; /* the number of rows to be copyed */
|
||||
|
||||
copy_rows = end_idx - start_idx;
|
||||
rc = memcpy_s(&m_flag[m_rows], BatchMaxSize - m_rows, &vector->m_flag[start_idx], copy_rows);
|
||||
securec_check(rc, "\0", "\0");
|
||||
}
|
||||
|
||||
void ScalarVector::copyNth(ScalarVector* vector, int nth)
|
||||
{
|
||||
Assert(vector != NULL);
|
||||
|
@ -487,6 +487,7 @@ typedef struct ProjectionInfo {
|
||||
int pi_lastInnerVar;
|
||||
int pi_lastOuterVar;
|
||||
int pi_lastScanVar;
|
||||
List* pi_projectVarNumbers;
|
||||
List* pi_acessedVarNumbers;
|
||||
List* pi_sysAttrList;
|
||||
List* pi_lateAceessVarNumbers;
|
||||
@ -1736,15 +1737,19 @@ struct ScanBatchResult {
|
||||
TupleTableSlot** scanTupleSlotInBatch; /* array size of BatchMaxSize, stores tuples scanned in a page */
|
||||
};
|
||||
|
||||
struct ScanBatchColAttr {
|
||||
int colId; /* only save the used cols. */
|
||||
bool lateRead; /* for project */
|
||||
bool isProject; /* is project? */
|
||||
};
|
||||
|
||||
struct ScanBatchState {
|
||||
VectorBatch* pCurrentBatch; /* for output in batch */
|
||||
VectorBatch* pScanBatch; /* batch formed from tuples */
|
||||
int scanTupleSlotMaxNum; /* max row number of tuples can be scanned once */
|
||||
int colNum;
|
||||
int *colId; /* for qual and project, only save the used cols. */
|
||||
int maxcolId;
|
||||
ScanBatchColAttr* colAttr; /* for qual and project, save attributes. */
|
||||
bool *nullflag; /*indicate the batch has null value for performance */
|
||||
bool *lateRead; /* for project */
|
||||
bool scanfinished; /* last time return with rows, but pages of this partition is read out */
|
||||
ScanBatchResult scanBatch;
|
||||
};
|
||||
|
@ -293,6 +293,7 @@ public:
|
||||
void copy(ScalarVector* vector);
|
||||
|
||||
void copyDeep(ScalarVector* vector, int start_idx, int endIdx);
|
||||
void copyFlag(ScalarVector* vector, int start_idx, int end_idx);
|
||||
void copyNth(ScalarVector* vector, int Nth);
|
||||
|
||||
void copy(ScalarVector* vector, const bool* pSel);
|
||||
|
Reference in New Issue
Block a user