/* ------------------------------------------------------------------------- * ndpplugin.cpp * Routines to support ndp executor smart pushdown * * Portions Copyright (c) 2022 Huawei Technologies Co.,Ltd. * * IDENTIFICATION * contrib/ndpplugin/ndpplugin.cpp * * ------------------------------------------------------------------------- */ #include "access/valid.h" #include "access/tableam.h" #include "executor/node/nodeAgg.h" #include "component/rpc/rpc.h" #include "storage/file/fio_device.h" #include "storage/smgr/segment_internal.h" #include "funcapi.h" #include "ndpplugin.h" #include "ndp_check.h" #include "ndpam.h" #include "storage/ipc.h" PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(ndpplugin_invoke); PG_FUNCTION_INFO_V1(pushdown_statistics); #define IS_AU_ALIGNED(start) (!((start) & ((unsigned)PAGE_NUM_PER_AU - 1))) #define NDP_SCAN_CHANNEL_DEFAULT_MAX 128 #define NDP_SCAN_CEPH_DEFAULT_MAX 128 #define NDP_SCAN_TABLE_DEFAULT_MAX 128 void TransitionFunction(AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, FunctionCallInfoData* fcinfo); static void NdpAggSlotAppend(AggState* state, AggStatePerGroup pergroup, TupleTableSlot* slot); static void knl_u_ndp_init(knl_u_ndp_context* ndp_cxt); static void NdpReInitContext(); THR_LOCAL ndp_pushdown_hook_type backup_ndp_pushdown_hook_type = NULL; THR_LOCAL TableAmNdpRoutine_hook_type backup_ndp_tableam = NULL; THR_LOCAL ExecutorStart_hook_type ndp_hook_ExecutorStart = NULL; THR_LOCAL ExecutorEnd_hook_type ndp_hook_ExecutorEnd = NULL; THR_LOCAL bool HOOK_INIT = false; constexpr int NDP_MAX_AWAIT_REQUEST = 64; constexpr int NDP_MEMORY_POOL_SIZE = 2048; NdpInstanceContext g_ndp_instance = { .mutex = PTHREAD_MUTEX_INITIALIZER, .status = UNINITIALIZED, .pageContext = new MpmcBoundedQueue(NDP_MEMORY_POOL_SIZE) }; void NdpSharedMemoryAlloc() { Assert(g_ndp_instance.pageContext); if (!g_ndp_instance.pageContext) { pthread_mutex_unlock(&g_ndp_instance.mutex); ereport(ERROR, (errmsg("memory pool haven't been init or already released."))); } MpmcBoundedQueue* pageContext = g_ndp_instance.pageContext; size_t blockSize = DSS_DEFAULT_AU_SIZE; void* ptr = malloc(blockSize * NDP_MEMORY_POOL_SIZE); if (!ptr) { pthread_mutex_unlock(&g_ndp_instance.mutex); ereport(ERROR, (errmsg("ndpplugin try alloc memory failed."))); } g_ndp_instance.pageContextPtr = ptr; uintptr_t ptrval = reinterpret_cast(ptr); for (int i = 0; i < NDP_MEMORY_POOL_SIZE; ++i) { pageContext->Enqueue(reinterpret_cast(ptrval + (i * blockSize))); } } /* * proc_exit callback to free g_ndp_instance */ static void NdpInstanceUninit(int status, Datum arg) { pthread_mutex_lock(&g_ndp_instance.mutex); if (g_ndp_instance.pageContextPtr) { free(g_ndp_instance.pageContextPtr); g_ndp_instance.pageContextPtr = nullptr; delete g_ndp_instance.pageContext; g_ndp_instance.pageContext = nullptr; } g_ndp_instance.status = UNINITIALIZED; pthread_mutex_unlock(&g_ndp_instance.mutex); } void NdpInstanceInit() { if (g_ndp_instance.status == INITIALIZED) { return; } #ifndef ENABLE_SSL // if not using ssl, use memory pool NdpSharedMemoryAlloc(); #endif g_ndp_instance.status = INITIALIZED; /* PostmasterMain(process_shared_preload_libraries) inits g_ndp_instance first */ on_proc_exit(NdpInstanceUninit, 0); } #define IndexGetBuffer(pages, i) ((char*)(pages) + i * BLCKSZ) #define NdpTupleOffset(t) ((t)->len + offsetof(NdpTupleHeaderData, tuple)) typedef struct NdpTupleHeaderData { uint64 len; HeapTupleHeaderData tuple; } NdpTupleHeaderData; typedef NdpTupleHeaderData* NdpTupleHeader; /* * return channel if rpc channel is ok, otherwise return null; */ NdpScanChannel* NdpScanGetChannel(NdpContext* ctx, char* connIp) { bool found = false; NdpScanChannel* channel; // no need to think about remove, do it when plan is over pthread_rwlock_rdlock(&ctx->ccLock); channel = (NdpScanChannel*)hash_search(ctx->channelCache, connIp, HASH_FIND, NULL); pthread_rwlock_unlock(&ctx->ccLock); if (!channel) { MemoryContext old = MemoryContextSwitchTo(ctx->ccMem); pthread_rwlock_wrlock(&ctx->ccLock); channel = (NdpScanChannel*)hash_search(ctx->channelCache, connIp, HASH_ENTER, &found); if (!found) { if (!channel->Init(ctx->rpcCount++, connIp, ctx->tableCount)) { hash_search(ctx->channelCache, connIp, HASH_REMOVE, NULL); channel = nullptr; } } pthread_rwlock_unlock(&ctx->ccLock); (void)MemoryContextSwitchTo(old); } return channel; } void NdpScanTryPushDownScan(HeapScanDesc scan, NdpScanDesc ndpScan) { BlockNumber start, end, phyStart; int bitCount = 0; NdpIoSlot* slot; NdpRetCode ret; AuInfo auinfo; char connIp[NDP_RPC_IP_LEN]; start = ndpScan->handledBlock; pm_get_pageinfo(ndpScan, start, &auinfo.object, connIp, end, phyStart); NdpContext* context = static_cast(ndpScan->cond->ctx); NdpScanChannel* channel = NdpScanGetChannel(context, connIp); if (!channel || channel->GetTableStatus(ndpScan->cond->tableId) == NdpTableStatus::CONSTRUCTFAIL) { ndpScan->AddToNormal(start, end); goto next; } slot = New(CurrentMemoryContext) NdpIoSlot(ndpScan); slot->SetStartBlockNum(start); auinfo.phyStartBlockNum = phyStart; auinfo.pageNum = end - start; bitCount = slot->SetReq(scan->rs_base.rs_rd->rd_smgr->smgr_rnode.node, 0, ndpScan->cond->tableId, auinfo); // set a threshold int the future if (bitCount == 0) { delete slot; ndpScan->AddToNormal(start, end); goto next; } /* Numbers of Ndp page may greater than pageNum, because of NdpTupleHeader has len. * And sometimes there are too much agg. Fix it in the future and SetResp; */ if (slot->SetResp(bitCount) != NdpRetCode::NDP_OK) { delete slot; ndpScan->AddToNormal(start, end); goto next; } ret = channel->SendRequest(slot, ndpScan); if (ret != NdpRetCode::NDP_OK) { delete slot; ndpScan->AddToNormal(start, end); ndpScan->sendFailedN++; ereport(DEBUG2, (errmsg("send request failed, error code %d.", static_cast(ret)))); } else { #ifndef NDP_ASYNC_RPC if (!ndpScan->HandleSlot(slot)) { delete slot; } #endif ndpScan->pushDownPageN += bitCount; if (ndpScan->scanState->ps.instrument) { ndpScan->scanState->ps.instrument->ndp_pushdown_page += bitCount; } } next: ndpScan->handledBlock = end; if (scan->dop > 1 && ((ndpScan->handledBlock - scan->rs_base.rs_startblock) % PARALLEL_SCAN_GAP_AU_ALIGNED == 0)) { ndpScan->handledBlock += (scan->dop - 1) * PARALLEL_SCAN_GAP_AU_ALIGNED; } } static bool NdpScanGetPageIO(NdpScanDesc ndpScan) { for (;;) { if (ndpScan->nextNdpPageIndex < ndpScan->curNdpPagesNum) { ndpScan->curNdpPage = (NdpPageHeader)IndexGetBuffer(ndpScan->curNdpPages, ndpScan->nextNdpPageIndex); Assert(((uintptr_t)ndpScan->curNdpPage & 0x7) == 0); ndpScan->nextNdpPageIndex++; ndpScan->curPageType = ndpScan->curNdpPage->pd_flags; ndpScan->curLinesNum = PageGetMaxOffsetNumber((Page)(ndpScan->curNdpPage)); ndpScan->nextLineIndex = 0; return true; } else { // all Ndp page has been handled, can be free ndpScan->FreeCurSlot(); // get next rpc page list #ifdef NDP_ASYNC_RPC if (!ndpScan->GetNextSlot()) { return false; } #else return false; #endif } } } static bool NdpScanGetPageLocal(NdpScanDesc ndpScan) { #ifdef NDP_ASYNC_RPC if (ndpScan->normalPagesId->Dequeue(ndpScan->curNormalPageId)) { #else if (ndpScan->normalPagesNum) { ndpScan->curNormalPageId = ndpScan->normalPagesId[ndpScan->normalPagesNum - 1]; ndpScan->normalPagesNum--; #endif ndpScan->curPageType = NORMAL_PAGE; heapgetpage(ndpScan->scan, ndpScan->curNormalPageId); ndpScan->curLinesNum = ndpScan->scan->rs_ntuples; ndpScan->nextLineIndex = 0; return true; } return false; } static bool NdpScanGetPageQueue(NdpScanDesc ndpScan) { /* * If get page from IO queue first, and most pages of IO was failed, * this pages will add to normal queue then will cause normal queue full quickly. * Get page from normal queue first. * If IO fail a lot but get page from normal first, plugin will not send IO request at a period of time. */ // from normal page // normalPagesId depends on NdpIO(resp) if (NdpScanGetPageLocal(ndpScan)) { return true; } // from Ndp page if (NdpScanGetPageIO(ndpScan)) { return true; } return false; } // return false if finished static bool NdpScanGetPage(NdpScanDesc ndpScan) { for (;;) { bool found = NdpScanGetPageQueue(ndpScan); if (found) { return true; } #ifdef NDP_ASYNC_RPC uint32 req = pg_atomic_read_u32(&ndpScan->reqCount); uint32 resp = pg_atomic_read_u32(&ndpScan->respCount); Assert(req >= resp); if (ndpScan->handledBlock < ndpScan->nBlock) { if ((req - resp) >= NDP_MAX_AWAIT_REQUEST) { pg_usleep(NDP_RPC_WAIT_USEC); } else { NdpScanTryPushDownScan((HeapScanDesc)ndpScan->scan, ndpScan); } continue; } // wait request if (resp < req) { pg_usleep(NDP_RPC_WAIT_USEC); // if normal page finish, io request failed, pages been added to normal queue, can't return directly. } else if (ndpScan->normalPagesId->Empty() && ndpScan->respIO->Empty()) { return false; } #else if (ndpScan->handledBlock < ndpScan->nBlock) { NdpScanTryPushDownScan((HeapScanDesc)ndpScan->scan, ndpScan); } else { return false; } #endif } } static void NdpScanHandleFilteredTuple(ScanState* scanState, HeapTuple tuple) { ProjectionInfo* proj_info = scanState->ps.ps_ProjInfo; if (proj_info) { heap_slot_store_heap_tuple(tuple, proj_info->pi_slot, InvalidBuffer, false, false); tableam_tslot_getsomeattrs(proj_info->pi_slot, proj_info->pi_slot->tts_tupleDescriptor->natts); proj_info->pi_slot->tts_flags &= ~TTS_FLAG_EMPTY; } } static void initialize_aggregate(AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate) { Plan* plan = aggstate->ss.ps.plan; int64 local_work_mem = SET_NODEMEM(plan->operatorMemKB[0], plan->dop); int64 max_mem = (plan->operatorMaxMem > 0) ? SET_NODEMEM(plan->operatorMaxMem, plan->dop) : 0; if (peraggstate->numSortCols > 0) { /* * In case of rescan, maybe there could be an uncompleted sort * operation? Clean it up if so. */ if (peraggstate->sortstates[aggstate->current_set]) tuplesort_end(peraggstate->sortstates[aggstate->current_set]); if (peraggstate->numInputs == 1) { peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_datum(peraggstate->evaldesc->attrs[0].atttypid, peraggstate->sortOperators[0], peraggstate->sortCollations[0], peraggstate->sortNullsFirst[0], local_work_mem, false); } else { peraggstate->sortstates[aggstate->current_set] = tuplesort_begin_heap(peraggstate->evaldesc, peraggstate->numSortCols, peraggstate->sortColIdx, peraggstate->sortOperators, peraggstate->sortCollations, peraggstate->sortNullsFirst, local_work_mem, false, max_mem, plan->plan_node_id, SET_DOP(plan->dop)); } } /* * (Re)set transValue to the initial value. * * Note that when the initial value is pass-by-ref, we must copy it * (into the aggcontext) since we will pfree the transValue later. */ if (peraggstate->initValueIsNull) { pergroupstate->transValue = peraggstate->initValue; } else { pergroupstate->transValue = datumCopy(peraggstate->initValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); } pergroupstate->transValueIsNull = peraggstate->initValueIsNull; pergroupstate->noTransValue = peraggstate->initValueIsNull; /* * (Re)set collectValue to the initial value. * * Note that when the initial value is pass-by-ref, we must copy it * (into the aggcontext) since we will pfree the collectValue later. * collection type is same as transition type. */ if (peraggstate->initCollectValueIsNull) { pergroupstate->collectValue = peraggstate->initCollectValue; } else { pergroupstate->collectValue = datumCopy(peraggstate->initCollectValue, peraggstate->transtypeByVal, peraggstate->transtypeLen); } pergroupstate->collectValueIsNull = peraggstate->initCollectValueIsNull; pergroupstate->noCollectValue = peraggstate->initCollectValueIsNull; } static void initialize_aggregates(AggState* aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroup) { int numReset = Max(aggstate->phase->numsets, 1); for (int aggno = 0; aggno < aggstate->numaggs; aggno++) { AggStatePerAgg peraggstate = &peragg[aggno]; for (int setno = 0; setno < numReset; setno++) { AggStatePerGroup pergroupstate = &pergroup[aggno + (setno * (aggstate->numaggs))]; aggstate->current_set = setno; initialize_aggregate(aggstate, peraggstate, pergroupstate); } } } /** * look for a hash entry * @param state agg state of current plan * @param slot slot load from backend * @return found hash entry */ static AggHashEntry LookForHashEntry(AggState* state, TupleTableSlot* slot) { TupleTableSlot* hashslot = state->hashslot; ListCell* l = NULL; AggHashEntry entry; bool isnew = false; AggWriteFileControl* TempFileControl = (AggWriteFileControl*)state->aggTempFileControl; if (hashslot->tts_tupleDescriptor == NULL) { ExecSetSlotDescriptor(hashslot, state->ss.ss_ScanTupleSlot->tts_tupleDescriptor); // Make sure all unused columns are NULLs ExecStoreAllNullTuple(hashslot); } // init hash slot tableam_tslot_getsomeattrs(slot, linitial_int(state->hash_needed)); int counter = slot->tts_nvalid - 1; foreach (l, state->hash_needed) { int varNumber = lfirst_int(l) - 1; hashslot->tts_values[varNumber] = slot->tts_values[counter]; hashslot->tts_isnull[varNumber] = slot->tts_isnull[counter]; counter--; } if (TempFileControl->spillToDisk == false || TempFileControl->finishwrite == true) { entry = (AggHashEntry)LookupTupleHashEntry(state->hashtable, hashslot, &isnew, true); } else { entry = (AggHashEntry)LookupTupleHashEntry(state->hashtable, hashslot, &isnew, false); } if (!isnew) { if (((Agg *)state->ss.ps.plan)->unique_check) { ereport(ERROR, (errcode(ERRCODE_UNEXPECTED_NODE_STATE), errmsg("find a duplicate plan"))); } return entry; } // is a new entry if (entry) { initialize_aggregates(state, state->peragg, entry->pergroup); agg_spill_to_disk(TempFileControl, state->hashtable, state->hashslot, ((Agg*)state->ss.ps.plan)->numGroups, true, state->ss.ps.plan->plan_node_id, SET_DOP(state->ss.ps.plan->dop), state->ss.ps.instrument); if (TempFileControl->filesource && state->ss.ps.instrument) { TempFileControl->filesource->m_spill_size = &state->ss.ps.instrument->sorthashinfo.spill_size; } } else { // find a new entry, but memory is not enough, write tuple to temp file Assert(TempFileControl->spillToDisk == true && TempFileControl->finishwrite == false); MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot); /* * Here need switch memorycontext to ecxt_per_tuple_memory, so memory which be applyed in function * ComputeHashValue is freed. */ MemoryContext oldContext = MemoryContextSwitchTo(state->tmpcontext->ecxt_per_tuple_memory); uint32 hashvalue = ComputeHashValue(state->hashtable); MemoryContextSwitchTo(oldContext); TempFileControl->filesource->writeTup(tuple, hashvalue & (TempFileControl->filenum - 1)); } return entry; } static void NdpHashAgg(AggState* state, TupleTableSlot* slot) { AggHashEntry entry = LookForHashEntry(state, slot); if (entry != NULL) { // accumulate slot to tuple NdpAggSlotAppend(state, entry->pergroup, slot); } } static void NdpAggSlotAppend(AggState* state, AggStatePerGroup pergroup, TupleTableSlot* slot) { int numGroupingSets = Max(state->phase->numsets, 1); int numAggs = state->numaggs; int aggno; int setno = 0; int counter = 0; for (aggno = 0; aggno < state->numaggs; aggno++) { AggStatePerAgg peraggstate = &state->peragg[aggno]; AggStatePerGroup pergroupstate = &pergroup[aggno]; if (pergroupstate->transValueIsNull) { if (((Agg*)(state->ss.ps.plan))->aggstrategy == AGG_PLAIN) { peraggstate->initValue = datumCopy(slot->tts_values[counter], peraggstate->transtypeByVal, peraggstate->transtypeLen); peraggstate->initValueIsNull = slot->tts_isnull[counter]; pergroupstate->transValue = peraggstate->initValue; pergroupstate->transValueIsNull = slot->tts_isnull[counter]; pergroupstate->noTransValue = false; counter++; continue; } } FunctionCallInfoData fcinfo; // init the number of arguments to a function. // fn_nargs = 2, since we only have two inputs, 0th is transvalue, 1th is resultvalue from backend InitFunctionCallInfoArgs(fcinfo, 2, 1); // add slot value to fcinfo fcinfo.arg[1] = slot->tts_values[counter]; fcinfo.argnull[1] = slot->tts_isnull[counter]; fcinfo.argTypes[1] = InvalidOid; counter++; // normally numGroupingSets = 1 for (setno = 0; setno < numGroupingSets; setno++) { AggStatePerGroup pergroupstate = &pergroup[aggno + (setno * numAggs)]; state->current_set = setno; TransitionFunction(state, peraggstate, pergroupstate, &fcinfo); } } } /* * check and fill transition value, then call the function * */ void TransitionFunction(AggState* aggstate, AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate, FunctionCallInfoData* fcinfo) { Datum newVal; if (peraggstate->transfn.fn_strict) { /* * For a strict transfn, nothing happens when there's a NULL input; we * just keep the prior transValue. */ for (int i = 1; i <= peraggstate->numTransInputs; i++) { if (fcinfo->argnull[i]) return; } if (pergroupstate->noTransValue) { /* * transValue has not been initialized. This is the first non-NULL * input value. We use it as the initial value for transValue. (We * already checked that the agg's input type is binary-compatible * with its transtype, so straight copy here is OK.) * * We must copy the datum into aggcontext if it is pass-by-ref. We * do not need to pfree the old transValue, since it's NULL. */ pergroupstate->transValue = datumCopy(fcinfo->arg[1], peraggstate->transtypeByVal, peraggstate->transtypeLen); pergroupstate->transValueIsNull = false; pergroupstate->noTransValue = false; return; } if (pergroupstate->transValueIsNull) { /* * Don't call a strict function with NULL inputs. Note it is * possible to get here despite the above tests, if the transfn is * strict *and* returned a NULL on a prior cycle. If that happens * we will propagate the NULL all the way to the end. */ return; } } // set up aggstate->curperagg to allow get aggref aggstate->curperagg = peraggstate; /* * OK to call the collection function * fn_nargs = 2, since we only have two inputs, 0th is transvalue, 1th is resultvalue from backend */ InitFunctionCallInfoData( *fcinfo, &(peraggstate->collectfn), 2, peraggstate->aggCollation, (Node*)aggstate, NULL); fcinfo->arg[0] = pergroupstate->transValue; fcinfo->argnull[0] = pergroupstate->transValueIsNull; fcinfo->argTypes[0] = InvalidOid; fcinfo->isnull = false; /* just in case transfn doesn't set it */ Node* origin_fcxt = fcinfo->context; if (peraggstate->is_avg) { Node* fcontext = (Node*)palloc0(sizeof(Node)); #ifdef FAULT_INJECT if ((rand() % PERCENTAGE_DIV) < PERCENTAGE) { ereport(ERROR, (errmsg("Fault inject -- palloc fail"))); } #endif fcontext->type = (NodeTag)(peraggstate->is_avg); fcinfo->context = fcontext; } newVal = FunctionCallInvoke(fcinfo); aggstate->curperagg = NULL; fcinfo->context = origin_fcxt; /* * If pass-by-ref datatype, must copy the new value into aggcontext and * pfree the prior transValue. But if transfn returned a pointer to its * first input, we don't need to do anything. */ if (!peraggstate->transtypeByVal && DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue)) { if (!fcinfo->isnull) { newVal = datumCopy(newVal, peraggstate->transtypeByVal, peraggstate->transtypeLen); } if (!pergroupstate->transValueIsNull) pfree(DatumGetPointer(pergroupstate->transValue)); } if (((Agg*)(aggstate->ss.ps.plan))->aggstrategy == AGG_PLAIN) { peraggstate->initValue = newVal; peraggstate->initValueIsNull = fcinfo->isnull; } pergroupstate->transValue = newVal; pergroupstate->transValueIsNull = fcinfo->isnull; pergroupstate->noTransValue = pergroupstate->transValueIsNull; } static TupleDesc NdpAggTupleDescCreate(AggState* aggState) { Assert(aggState->ss.ps.plan->type == T_Agg); Agg* agg = (Agg*)aggState->ss.ps.plan; int len = aggState->numaggs + agg->numCols; TupleDesc typeInfo = CreateTemplateTupleDesc(len, false, TableAmHeap); int curResno = 1; for (int aggno = 0; aggno < aggState->numaggs; ++aggno) { AggStatePerAgg perAgg = &aggState->peragg[aggno]; // we don't rely on Aggref::aggtrantype, which is defined in PGXC Oid aggTransType = ((FuncExpr*)perAgg->transfn.fn_expr)->funcresulttype; int32 typmod = -1; int attdim = 0; Oid collationid = 0; // get from pg_type HeapTuple tp; tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(aggTransType)); if (HeapTupleIsValid(tp)) { Form_pg_type typtup = (Form_pg_type)GETSTRUCT(tp); typmod = typtup->typtypmod; attdim = typtup->typndims; collationid = typtup->typcollation; ReleaseSysCache(tp); } TupleDescInitEntry(typeInfo, curResno, NULL, aggTransType, typmod, attdim); TupleDescInitEntryCollation(typeInfo, curResno, collationid); curResno++; } for (int i = 0; i < agg->numCols; ++i) { AttrNumber att = agg->grpColIdx[i]; Node* node = (Node*)list_nth(agg->plan.lefttree->targetlist, att - 1); Assert(node->type == T_TargetEntry); Node* expr = (Node*)(((TargetEntry*)node)->expr); TupleDescInitEntry(typeInfo, curResno, ((TargetEntry*)node)->resname, exprType(expr), exprTypmod(expr), 0); TupleDescInitEntryCollation(typeInfo, curResno, exprCollation(expr)); curResno++; } return typeInfo; } static void NdpScanHandleAggTuple(AggState* aggState, TupleTableSlot* slot, HeapTuple tuple) { if (aggState == NULL) { ereport(WARNING, (errmsg("Can't happen, ndp page flag is wrong!"))); return; } heap_slot_store_heap_tuple(tuple, slot, InvalidBuffer, false, false); tableam_tslot_getsomeattrs(slot, slot->tts_tupleDescriptor->natts); // read tuple if (((Agg*)aggState->ss.ps.plan)->aggstrategy == AGG_HASHED) { NdpHashAgg(aggState, slot); } else { NdpAggSlotAppend(aggState, aggState->pergroup, slot); } } static inline bool NdpScanCheckKey(HeapScanDesc scan) { HeapTuple tuple = &(scan->rs_ctup); int nkeys = scan->rs_base.rs_nkeys; ScanKey key = scan->rs_base.rs_key; if (key != NULL) { bool valid = false; HeapKeyTest(tuple, (scan->rs_tupdesc), nkeys, key, valid); if (valid) { return true; } } else { return true; } return false; } static bool NdpScanGetTupleFromStocked(HeapScanDesc scan, NdpScanDesc ndpScan) { HeapTuple tuple = &(scan->rs_ctup); while (ndpScan->nextLineIndex < ndpScan->curLinesNum) { int curLineIndex = ndpScan->nextLineIndex; ndpScan->nextLineIndex++; Assert(ndpScan->curPageType != INVALID_PAGE); if (ndpScan->curPageType == NORMAL_PAGE) { BlockNumber page = ndpScan->curNormalPageId; Page dp = (Page)BufferGetPage(scan->rs_base.rs_cbuf); OffsetNumber line_off = scan->rs_base.rs_vistuples[curLineIndex]; ItemId lpp = HeapPageGetItemId(dp, line_off); Assert(ItemIdIsNormal(lpp)); // set tuple tuple->t_data = (HeapTupleHeader)PageGetItem((Page)dp, lpp); tuple->t_len = ItemIdGetLength(lpp); ItemPointerSet(&(tuple->t_self), page, line_off); HeapTupleCopyBaseFromPage(tuple, dp); if (NdpScanCheckKey(scan)) { scan->rs_base.rs_cindex = curLineIndex; return true; } } else { ItemId lpp = PageGetItemId((Page)ndpScan->curNdpPage, curLineIndex + 1); HeapTupleHeader pushDownTuple = (HeapTupleHeader)((char*)ndpScan->curNdpPage + lpp->lp_off); tuple->t_data = pushDownTuple; tuple->t_len = (uint32)lpp->lp_len; if (ndpScan->curPageType == NDP_FILTERED_PAGE) { if (NdpScanCheckKey(scan)) { scan->rs_base.rs_cindex = curLineIndex; NdpScanHandleFilteredTuple(ndpScan->scanState, tuple); return true; } } else if (ndpScan->curPageType == NDP_AGG_PAGE) { NdpScanHandleAggTuple(ndpScan->aggState, ndpScan->aggSlot, tuple); } } } return false; } static void NdpScanGetCachedTuple(HeapScanDesc scan, NdpScanDesc ndpScan) { CHECK_FOR_INTERRUPTS(); for(;;) { // 1. scan stocked page if (NdpScanGetTupleFromStocked(scan, ndpScan)) { return; } // 2. get new page if (!NdpScanGetPage(ndpScan)) { // free buffer if (BufferIsValid(scan->rs_base.rs_cbuf)) { ReleaseBuffer(scan->rs_base.rs_cbuf); } ndpScan->FreeCurSlot(); scan->rs_base.rs_cbuf = InvalidBuffer; scan->rs_base.rs_cblock = InvalidBlockNumber; scan->rs_base.rs_inited = false; scan->rs_ctup.t_data = NULL; return; } } } Tuple NdpScanGetTuple(TableScanDesc sscan, ScanDirection dir, TupleTableSlot* slot) { HeapScanDesc scan = (HeapScanDesc)sscan; HeapTuple tuple = &(scan->rs_ctup); Assert(ScanDirectionIsForward(dir)); NdpScanDesc ndpScan = (NdpScanDesc)sscan->ndp_ctx; MemoryContext oldMct = MemoryContextSwitchTo(ndpScan->memCtx); if (!scan->rs_base.rs_inited) { if (scan->rs_base.rs_nblocks == 0) { Assert(!BufferIsValid(scan->rs_base.rs_cbuf)); tuple->t_data = NULL; goto out; } // doesn't support rs_parallel and rs_syncscan if (scan->rs_parallel != NULL || scan->rs_base.rs_syncscan) { ereport(WARNING, (errmsg("parallel not support %p, syncscan not support %d in NDP scene.", scan->rs_parallel, scan->rs_base.rs_syncscan))); } // init NdpScanDesc, rs_startblock must AU aligned in begin_scan Assert(IS_AU_ALIGNED(scan->rs_base.rs_startblock)); ndpScan->handledBlock = scan->rs_base.rs_startblock; ndpScan->nBlock = scan->rs_base.rs_nblocks; ndpScan->curPageType = INVALID_PAGE; ndpScan->curIO = nullptr; ndpScan->curLinesNum = 0; ndpScan->nextLineIndex = 0; ndpScan->curNdpPagesNum = 0; ndpScan->nextNdpPageIndex = 0; #ifndef NDP_ASYNC_RPC ndpScan->normalPagesNum = 0; #endif scan->rs_base.rs_inited = true; } PG_TRY(); { NdpScanGetCachedTuple(scan, ndpScan); } PG_CATCH(); { delete ndpScan; sscan->ndp_ctx = nullptr; PG_RE_THROW(); } PG_END_TRY(); out: (void)MemoryContextSwitchTo(oldMct); if (scan->rs_ctup.t_data == NULL) { ereport(DEBUG2, (errmsg("heap_getnext returning EOS"))); return NULL; // Upper doesn't judge t_data, so tuple must return NULL if t_data is NULL. } return tuple; } void NdpScanParallelInit(TableScanDesc sscan, int32 dop, ScanDirection dir) { HeapScanDesc scan = (HeapScanDesc) sscan; Assert(!ScanDirectionIsBackward(dir)); if (!scan || scan->rs_base.rs_nblocks == 0) { return; } if (dop <= 1) { return; } scan->dop = dop; uint32 paral_blocks = u_sess->stream_cxt.smp_id * PARALLEL_SCAN_GAP_AU_ALIGNED; /* If not enough pages to divide into every worker. */ if (scan->rs_base.rs_nblocks <= paral_blocks) { scan->rs_base.rs_startblock = 0; scan->rs_base.rs_nblocks = 0; return; } scan->rs_base.rs_startblock = paral_blocks; } // check state after planstate inited void CheckAndSetNdpScan(Relation relation, Snapshot snapshot, ScanState* sstate, TableScanDesc desc) { if (relation->rd_tam_ops != TableAmHeap) return; // only support astore currently if (IsSystemRelation(relation) || IsCatalogRelation(relation) || IsToastRelation(relation) || RelationIsToast(relation) || isAnyTempNamespace(RelationGetNamespace(relation)) || RELATION_IS_TEMP(relation) || RelationGetRelPersistence(relation) == RELPERSISTENCE_UNLOGGED) return; if (RowRelationIsCompressed(relation)) return; if (relation->is_compressed) return; // check TableScanDesc if (desc->rs_snapshot->satisfies != SNAPSHOT_MVCC) { return; } if (!desc->rs_pageatatime) return; if (desc->rs_nblocks < (unsigned int)u_sess->ndp_cxt.pushdown_min_blocks) return; HeapScanDesc scan = (HeapScanDesc)desc; if (scan->rs_parallel != nullptr) { ereport(NOTICE, (errmsg("parallel are not supported in NDP scene"))); return; } // recheck if (sstate->ps.plan->ndp_pushdown_condition == nullptr) { ereport(WARNING, (errmsg("Ndp condition should not be NULL"))); return; } NdpScanDesc ndpScanDesc = New(CurrentMemoryContext) NdpScanDescData; NdpRetCode ret = ndpScanDesc->Init(sstate, desc); if (ret != NdpRetCode::NDP_OK) { delete ndpScanDesc; ereport(ERROR, (errmsg("NdpScanDesc init failed, code %d", static_cast(ret)))); return; } desc->ndp_pushdown_optimized = true; desc->ndp_ctx = ndpScanDesc; desc->rs_syncscan = false; } TableScanDesc hook_ndp_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, ScanState* sstate, RangeScanInRedis rangeScanInRedis) { TableScanDesc scanDesc = tableam_scan_begin(relation, snapshot, nkeys, key, rangeScanInRedis); if (scanDesc) { CheckAndSetNdpScan(relation, snapshot, sstate, scanDesc); } return scanDesc; } void hook_ndp_init_parallel(TableScanDesc sscan, int32 dop, ScanDirection dir) { if (!ScanDirectionIsBackward(dir)) { return NdpScanParallelInit(sscan, dop, dir); } else { return tableam_scan_init_parallel_seqscan(sscan, dop, dir); } } void hook_ndp_rescan(TableScanDesc sscan, ScanKey key) { tableam_scan_rescan(sscan, key); sscan->rs_syncscan = false; sscan->ndp_pushdown_optimized = true; NdpScanDesc ndpScan = (NdpScanDesc)sscan->ndp_ctx; if (ndpScan) { ndpScan->Reset(); } } static void SendTerminate(NdpContext* context) { if (context == nullptr) { return; } HASH_SEQ_STATUS status; NdpScanChannel* channel; // notify rpc server to release query resource hash_seq_init(&status, context->channelCache); while ((channel = (NdpScanChannel*)hash_seq_search(&status)) != nullptr) { NdpRetCode retCode = channel->SendEnd(); if (retCode != NdpRetCode::NDP_OK) { ereport(DEBUG2, (errmsg("SendEnd %s fail code[%d].", channel->rpcIp, static_cast(retCode)))); } channel->DestroyChannel(); hash_search(context->channelCache, channel->rpcIp, HASH_REMOVE, NULL); } } void NdpDestroyContext(NdpContext* context) { if (context == nullptr) { return; } hash_destroy(context->channelCache); context->channelCache = nullptr; } static void NdpReInitContext() { if (u_sess->ndp_cxt.cxt == nullptr) { return; } NdpContext* context = (NdpContext*)u_sess->ndp_cxt.cxt; SendTerminate(context); NdpDestroyContext(context); MemoryContext oldContext = MemoryContextSwitchTo(u_sess->ndp_cxt.mem_cxt); context->rpcCount = 0; context->tableCount = 0; context->u_sess = u_sess; HASHCTL ctlConn; ctlConn.keysize = NDP_RPC_IP_LEN; ctlConn.entrysize = sizeof(NdpScanChannel); ctlConn.hash = string_hash; context->channelCache = hash_create("Ndp Connector to IPC Channel", NDP_SCAN_CHANNEL_DEFAULT_MAX, &ctlConn, HASH_ELEM | HASH_FUNCTION); if (context->channelCache == nullptr) { pfree(context); u_sess->ndp_cxt.cxt = nullptr; } MemoryContextSwitchTo(oldContext); } void hook_ndp_endscan(TableScanDesc sscan) { NdpScanDesc ndpScan = (NdpScanDesc)sscan->ndp_ctx; if (ndpScan == nullptr || !sscan->ndp_pushdown_optimized || ndpScan->cond == nullptr) { return tableam_scan_end(sscan); } NdpContext* context = static_cast(ndpScan->cond->ctx); if (context == nullptr || context->u_sess == nullptr) { delete ndpScan; return tableam_scan_end(sscan); } knl_session_context* sess = context->u_sess; __atomic_add_fetch(&sess->ndp_cxt.stats->sendFailed, ndpScan->sendFailedN, __ATOMIC_RELAXED); __atomic_add_fetch(&sess->ndp_cxt.stats->failedIO, ndpScan->failedIoN, __ATOMIC_RELAXED); __atomic_add_fetch(&sess->ndp_cxt.stats->pushDownPage, ndpScan->pushDownPageN, __ATOMIC_RELAXED); __atomic_add_fetch(&sess->ndp_cxt.stats->sendBackPage, ndpScan->sendBackPageN, __ATOMIC_RELAXED); __atomic_add_fetch(&sess->ndp_cxt.stats->ndpPageAgg, ndpScan->ndpPageAggN, __ATOMIC_RELAXED); __atomic_add_fetch(&sess->ndp_cxt.stats->ndpPageScan, ndpScan->ndpPageScanN, __ATOMIC_RELAXED); if (!StreamThreadAmI()) { __atomic_add_fetch(&sess->ndp_cxt.stats->queryCounter, 1, __ATOMIC_RELAXED); } delete ndpScan; sscan->ndp_ctx = NULL; return tableam_scan_end(sscan); } Tuple hook_ndp_getnexttuple(TableScanDesc sscan, ScanDirection direction, TupleTableSlot* slot) { if (ScanDirectionIsForward(direction)) { return NdpScanGetTuple(sscan, direction, slot); } else { return heap_getnext(sscan, direction); } } void hook_ndp_handle_hashaggtuple(AggState* aggstate, HeapTupleData *tts_minhdr) { TupleTableSlot *aggSlot = (TupleTableSlot *)aggstate->ndp_slot; if (aggSlot) { NdpScanHandleAggTuple(aggstate, aggSlot, tts_minhdr); } } // shared by multi thread static TableAmNdpRoutine_hook ndp_tableam_apply = { .scan_begin = hook_ndp_beginscan, .scan_init_parallel_seqscan = hook_ndp_init_parallel, .scan_rescan = hook_ndp_rescan, .scan_end = hook_ndp_endscan, .scan_getnexttuple = hook_ndp_getnexttuple, .handle_hashaggslot = hook_ndp_handle_hashaggtuple }; void ndpplugin_invoke(void) { ereport(DEBUG2, (errmsg("dummy function to let process load this library."))); return; } NdpScanCondition* NdpCreateScanCondition(Plan* node) { NdpScanCondition* cond = makeNode(NdpScanCondition); cond->plan = node; return cond; } void NdpDestroyScanCondition(NdpScanCondition* cond) { if (!cond) { return; } pfree((void*)cond); } NdpContext* NdpCreateContext() { NdpContext* context = (NdpContext*)palloc(sizeof(NdpContext)); #ifdef FAULT_INJECT if ((rand() % PERCENTAGE_DIV) < PERCENTAGE) { ereport(ERROR, (errmsg("Fault inject -- palloc fail"))); } #endif pthread_rwlock_init(&context->ccLock, NULL); context->ccMem = CurrentMemoryContext; HASHCTL ctlConn; ctlConn.keysize = NDP_RPC_IP_LEN; ctlConn.entrysize = sizeof(NdpScanChannel); ctlConn.hash = string_hash; context->channelCache = hash_create("Ndp Connector to IPC Channel", NDP_SCAN_CHANNEL_DEFAULT_MAX, &ctlConn, HASH_ELEM | HASH_FUNCTION); if (context->channelCache == NULL) { pfree(context); return NULL; } context->rpcCount = 0; context->tableCount = 0; context->u_sess = u_sess; DependencePath paths = { .ulogPath = LIB_ULOG, .rpcPath = LIB_RPC_UCX, .sslDLPath = LIB_OPENSSL_DL, .sslPath = LIB_SSL, .cryptoPath = LIB_CRYPTO }; RpcStatus status = RpcClientInit(paths); if (status != RPC_OK) { hash_destroy(context->channelCache); pfree(context); return NULL; } return context; } NdpContext* GetNdpContext() { if (u_sess->ndp_cxt.cxt == nullptr) { MemoryContext oldContext = nullptr; oldContext = MemoryContextSwitchTo(u_sess->ndp_cxt.mem_cxt); u_sess->ndp_cxt.cxt = NdpCreateContext(); MemoryContextSwitchTo(oldContext); } else { NdpReInitContext(); } return (NdpContext*)u_sess->ndp_cxt.cxt; } // check after create plan static void CheckAndSetNdpScanPlan(PlannedStmt* stmt, SeqScan* scan, Plan* parent, NdpContext** context) { Plan* pushDownPlan = CheckAndGetNdpPlan(stmt, scan, parent); if (pushDownPlan == NULL) { return; } NdpScanCondition* cond = NdpCreateScanCondition(pushDownPlan); if (cond == NULL) { scan->plan.ndp_pushdown_optimized = false; scan->plan.ndp_pushdown_condition = NULL; ereport(WARNING, (errmsg("NdpCreateScanCondition failed"))); } else { // store ndp context in ndp_pushdown_condition if (*context) { cond->ctx = *context; cond->tableId = ((*context)->tableCount)++; scan->plan.ndp_pushdown_optimized = true; scan->plan.ndp_pushdown_condition = (Node*)cond; } else { *context = GetNdpContext(); if (!*context) { NdpDestroyScanCondition(cond); scan->plan.ndp_pushdown_optimized = false; scan->plan.ndp_pushdown_condition = NULL; } else { cond->ctx = *context; cond->tableId = ((*context)->tableCount)++; scan->plan.ndp_pushdown_optimized = true; scan->plan.ndp_pushdown_condition = (Node*)cond; } } } } static void TraversePlan(PlannedStmt* stmt, Plan* plan, Plan* parent, NdpContext** context) { if (!plan) return; /* filter out the lefttree and righttree of T_MergeJoin which is T_Sort */ if (IsA(plan, MergeJoin)) { TraversePlan(stmt, outerPlan(outerPlan(plan)), plan, context); TraversePlan(stmt, outerPlan(innerPlan(plan)), plan, context); return; } else if (IsA(plan, SeqScan)) { CheckAndSetNdpScanPlan(stmt, castNode(SeqScan, plan), parent, context); } else if (IsA(plan, SubqueryScan)) { TraversePlan(stmt, castNode(SubqueryScan, plan)->subplan, plan, context); } else if (IsA(plan, Append)) { ListCell* lc = NULL; foreach (lc, castNode(Append, plan)->appendplans) { Plan* appendPlans = (Plan*)lfirst(lc); TraversePlan(stmt, appendPlans, plan, context); } } TraversePlan(stmt, outerPlan(plan), plan, context); TraversePlan(stmt, innerPlan(plan), plan, context); } static void CheckAndSetNdpPlan(Query* querytree, PlannedStmt* stmt) { knl_u_ndp_init(&u_sess->ndp_cxt); if (!CheckNdpSupport(querytree, stmt)) { return; } // travel plan to find scan node NdpContext* context = NULL; TraversePlan(stmt, stmt->planTree, NULL, &context); ListCell *l = NULL; foreach (l, stmt->subplans) { Plan *subplan = (Plan *)lfirst(l); TraversePlan(stmt, subplan, NULL, &context); } } static void NdpAggInitCollect(AggState* node) { for (int i = 0; i < node->numaggs; i++) { AggStatePerAgg peragg = &(node->peragg[i]); Aggref* aggref = peragg->aggref; Oid collectfn_oid; Expr* collectfnexpr = NULL; if (OidIsValid(peragg->collectfn_oid)) { continue; } /* Fetch the pg_aggregate row */ HeapTuple aggTuple = SearchSysCache1(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid)); if (!HeapTupleIsValid(aggTuple)) { ereport(ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmodule(MOD_EXECUTOR), errmsg("cache lookup failed for aggregate %u", aggref->aggfnoid))); } Form_pg_aggregate aggform = (Form_pg_aggregate)GETSTRUCT(aggTuple); /* Check permission to call aggregate function */ AclResult aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(), ACL_EXECUTE); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, ACL_KIND_PROC, get_func_name(aggref->aggfnoid)); peragg->collectfn_oid = collectfn_oid = aggform->aggcollectfn; Oid aggtranstype = aggform->aggtranstype; if (OidIsValid(collectfn_oid)) { /* we expect final function expression to be NULL in call to * build_aggregate_fnexprs below, since InvalidOid is passed for * finalfn_oid argument. Use a dummy expression to accept that. */ Expr* dummyexpr = NULL; /* * for XC, we need to setup the collection function expression as well. * Use build_aggregate_fnexpr() with invalid final function oid, and collection * function information instead of transition function information. * We should really be adding this step inside * build_aggregate_fnexprs() but this way it becomes easy to merge. */ build_aggregate_fnexprs(&aggtranstype, 1, aggtranstype, aggref->aggtype, aggref->inputcollid, collectfn_oid, InvalidOid, &collectfnexpr, &dummyexpr); Assert(!dummyexpr); } fmgr_info(collectfn_oid, &peragg->collectfn); peragg->collectfn.fn_expr = (Node*)collectfnexpr; ReleaseSysCache(aggTuple); } } static void NdpAggInit(AggState* node) { Agg* plan = reinterpret_cast(node->ss.ps.plan); if (plan->aggstrategy == AGG_PLAIN) { for (int i = 0; i < node->numaggs; i++) { AggStatePerGroup pergroup = &(node->pergroup[i]); AggStatePerAgg peragg = &(node->peragg[i]); pergroup->transValueIsNull = peragg->initValueIsNull; if (!peragg->initValueIsNull) { pergroup->transValue = datumCopy(peragg->initValue, peragg->transtypeByVal, peragg->transtypeLen); pergroup->noTransValue = false; } else { pergroup->noTransValue = true; } } } /* * we currently rely on collect function from gaussdb */ if (IS_STREAM_PLAN || StreamThreadAmI()) { NdpAggInitCollect(node); } } static void TraverseState(PlanState* state, PlanState* parent) { if (!state) return; if (IsA(state, SeqScanState)) { auto seq = reinterpret_cast(state); if (!seq->ss_currentScanDesc || !seq->ss_currentScanDesc->ndp_pushdown_optimized) { return; } auto ndpScan = reinterpret_cast(seq->ss_currentScanDesc->ndp_ctx); Assert(ndpScan); if (IsA(ndpScan->cond->plan, Agg)) { Assert(parent && IsA(parent, AggState)); ndpScan->aggState = reinterpret_cast(parent); TupleDesc desc = NdpAggTupleDescCreate(ndpScan->aggState); // should use ExecInitExtraTupleSlot to put in estate->es_tupleTable? TupleTableSlot* slot = MakeTupleTableSlot(false, TableAmHeap); ExecSetSlotDescriptor(slot, desc); ndpScan->aggSlot = slot; NdpAggInit(ndpScan->aggState); ndpScan->aggState->ndp_slot = slot; } } else if (IsA(state, SubqueryScanState)) { TraverseState(castNode(SubqueryScanState, state)->subplan, state); } else if (IsA(state, AppendState)) { AppendState* appState = reinterpret_cast(state); for (int i = 0; i < appState->as_nplans; i++) { TraverseState(*(appState->appendplans + i), state); } } TraverseState(outerPlanState(state), state); TraverseState(innerPlanState(state), state); } static void NdpExecutorStart(QueryDesc* queryDesc, int eflags) { knl_u_ndp_init(&u_sess->ndp_cxt); if (ndp_hook_ExecutorStart) ndp_hook_ExecutorStart(queryDesc, eflags); else standard_ExecutorStart(queryDesc, eflags); TraverseState(queryDesc->planstate, NULL); ListCell *l = NULL; foreach (l, queryDesc->estate->es_subplanstates) { PlanState* subplanstate = (PlanState*)lfirst(l); TraverseState(subplanstate, NULL); } } static void NdpExecutorEnd(QueryDesc* queryDesc) { if (ndp_hook_ExecutorEnd) ndp_hook_ExecutorEnd(queryDesc); else standard_ExecutorEnd(queryDesc); if (!StreamThreadAmI()) { NdpReInitContext(); } } static void InitializeNdpGUCOptions() { DefineCustomBoolVariable("ndpplugin.enable_ndp", "Enable NDP engine", NULL, &u_sess->ndp_cxt.enable_ndp, false, PGC_USERSET, 0, NULL, NULL, NULL); DefineCustomIntVariable("ndpplugin.pushdown_min_blocks", "Sets the lower limit of pushdown pages..", NULL, &u_sess->ndp_cxt.pushdown_min_blocks, 0, 0, INT_MAX / 1000, PGC_USERSET, GUC_CUSTOM_PLACEHOLDER, NULL, NULL, NULL); DefineCustomIntVariable("ndpplugin.ndp_port", "Sets the ndp_port of ndp", NULL, &u_sess->ndp_cxt.ndp_port, 8000, 0, 65535, PGC_USERSET, GUC_CUSTOM_PLACEHOLDER, NULL, NULL, NULL); #ifdef ENABLE_SSL DefineCustomStringVariable("ndpplugin.ca_path", "Client CA path", NULL, &u_sess->ndp_cxt.ca_path, "./", PGC_USERSET, GUC_LIST_INPUT, NULL, NULL, NULL); DefineCustomStringVariable("ndpplugin.crl_path", "Client crl path", NULL, &u_sess->ndp_cxt.crl_path, "./", PGC_USERSET, GUC_LIST_INPUT, NULL, NULL, NULL); #endif } static void knl_u_ndp_init(knl_u_ndp_context* ndp_cxt) { if (ndp_cxt->mem_cxt != nullptr) { return; } ndp_cxt->mem_cxt = AllocSetContextCreate(u_sess->top_mem_cxt, "NdpSelfMemoryContext", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); MemoryContext oldContext = MemoryContextSwitchTo(u_sess->ndp_cxt.mem_cxt); ndp_cxt->stats = (NdpStats*)palloc0(sizeof(NdpStats)); ndp_cxt->cxt = nullptr; InitializeNdpGUCOptions(); MemoryContextSwitchTo(oldContext); } /* * Entrypoint of this extension */ void _PG_init(void) { ereport(DEBUG2, (errmsg("init ndpplugin."))); if (!ENABLE_DSS) { ereport(DEBUG2, (errmsg("ndpplugin is not support while DMS and DSS disable."))); return; } pthread_mutex_lock(&g_ndp_instance.mutex); #ifdef GlobalCache long long au_size; const char *vg_name = g_instance.attr.attr_storage.dss_attr.ss_dss_data_vg_name + 1; int ret = dss_compare_size(vg_name, &au_size); if (ret != 0 || au_size != DSS_DEFAULT_AU_SIZE) { pthread_mutex_unlock(&g_ndp_instance.mutex); ereport(WARNING, (errmsg("init ndpplugin failed, inconsistency between dss_ausize and ndpplugin_ausize!"))); return; } #endif NdpInstanceInit(); pthread_mutex_unlock(&g_ndp_instance.mutex); if (HOOK_INIT == false) { backup_ndp_pushdown_hook_type = ndp_pushdown_hook; ndp_pushdown_hook = CheckAndSetNdpPlan; backup_ndp_tableam = ndp_tableam; ndp_tableam = &ndp_tableam_apply; ndp_hook_ExecutorStart = ExecutorStart_hook; ExecutorStart_hook = NdpExecutorStart; ndp_hook_ExecutorEnd = ExecutorEnd_hook; ExecutorEnd_hook = NdpExecutorEnd; } HOOK_INIT = true; knl_u_ndp_init(&u_sess->ndp_cxt); } void _PG_fini(void) { ndp_pushdown_hook = backup_ndp_pushdown_hook_type; ndp_tableam = backup_ndp_tableam; ExecutorStart_hook = ndp_hook_ExecutorStart; ExecutorEnd_hook = ndp_hook_ExecutorEnd; MemoryContextDelete(u_sess->ndp_cxt.mem_cxt); pthread_mutex_lock(&g_ndp_instance.mutex); if (g_ndp_instance.pageContextPtr) { free(g_ndp_instance.pageContextPtr); g_ndp_instance.pageContextPtr = nullptr; delete g_ndp_instance.pageContext; g_ndp_instance.pageContext = nullptr; } g_ndp_instance.status = UNINITIALIZED; pthread_mutex_unlock(&g_ndp_instance.mutex); } /* * For test ndpplugin push down functionality */ Datum pushdown_statistics(PG_FUNCTION_ARGS) { if (u_sess->ndp_cxt.stats == NULL) { ereport(WARNING, (errmsg("ndp init failed, the pushdown statistics can not be viewed"))); PG_RETURN_NULL(); } const int cols = 7; TupleDesc tupdesc = CreateTemplateTupleDesc(cols, true); TupleDescInitEntry(tupdesc, (AttrNumber)1, "query", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)2, "total_pushdown_page", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)3, "back_to_gauss", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)4, "received_scan", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)5, "received_agg", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)6, "failed_backend_handle", INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber)7, "failed_sendback", INT8OID, -1, 0); BlessTupleDesc(tupdesc); Datum values[cols]; values[0] = UInt64GetDatum(u_sess->ndp_cxt.stats->queryCounter); values[1] = UInt64GetDatum(u_sess->ndp_cxt.stats->pushDownPage); values[2] = UInt64GetDatum(u_sess->ndp_cxt.stats->sendBackPage); values[3] = UInt64GetDatum(u_sess->ndp_cxt.stats->ndpPageScan); values[4] = UInt64GetDatum(u_sess->ndp_cxt.stats->ndpPageAgg); values[5] = UInt64GetDatum(u_sess->ndp_cxt.stats->failedIO); values[6] = UInt64GetDatum(u_sess->ndp_cxt.stats->sendFailed); bool nulls[cols] = {false, false, false, false, false, false, false}; HeapTuple tuple = heap_form_tuple(tupdesc, values, nulls); HeapTupleHeader result = (HeapTupleHeader)palloc(tuple->t_len); int rc = memcpy_s(result, tuple->t_len, tuple->t_data, tuple->t_len); securec_check_ss(rc, "\0", "\0"); ReleaseTupleDesc(tupdesc); PG_RETURN_HEAPTUPLEHEADER(result); } /* test section end */