/* -------------------------------------------------------------------------
 * ndpam.cpp
 *	  Routines to handle ndp page
 *
 * Portions Copyright (c) 2022 Huawei Technologies Co.,Ltd.
 *
 * IDENTIFICATION
 *	  contrib/ndpplugin/ndpam.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "access/csnlog.h"
#include "access/slru.h"
#include "executor/node/nodeAgg.h"
#include "component/rpc/rpc.h"
#include "storage/smgr/segment_internal.h"
#include "ddes/dms/ss_transaction.h"
#include "algorithm"
#include "storage/dss/fio_dss.h"
#include "storage/smgr/segment.h"
#include "ndpnodes.h"
#include "ndpam.h"

#define NDP_PAGE_QUEUE_SIZE (1u << 10)
#define NDP_NORMAL_QUEUE_SIZE (1u << 12)

#define ClogCtl(n) (&t_thrd.shemem_ptr_cxt.ClogCtl[CBufHashPartition(n)])
#define CsnlogCtl(n) (&t_thrd.shemem_ptr_cxt.CsnlogCtlPtr[CSNBufHashPartition(n)])
#define CSNLOG_XACTS_PER_PAGE (BLCKSZ / sizeof(CommitSeqNo))
#define CSN_LWLOCK_ACQUIRE(pageno, lockmode) ((void)LWLockAcquire(CSNBufMappingPartitionLock(pageno), lockmode))
#define CSN_LWLOCK_RELEASE(pageno) (LWLockRelease(CSNBufMappingPartitionLock(pageno)))
#define TransactionIdToCSNPage(xid) ((xid) / (TransactionId)CSNLOG_XACTS_PER_PAGE)

/* A channel / table manager is given up on after this many failed admin RPCs. */
constexpr int RPC_FAILED_LIMIT = 3;
/* IO requests for a table are refused after this many failed sends. */
constexpr int SEND_FAILED_LIMIT = 10;

/*
 * Translate a segment-logical page id into its offset inside the extent that
 * contains it.  The extent size class is selected by comparing logicId against
 * the cumulative EXT_SIZE_*_TOTAL_PAGES thresholds.
 */
inline static void SegLogicPageIdToExtentOffset(BlockNumber logicId, uint32* offset)
{
    if (logicId < EXT_SIZE_8_TOTAL_PAGES) {
        *offset = logicId % EXT_SIZE_8;
    } else if (logicId < EXT_SIZE_128_TOTAL_PAGES) {
        logicId -= EXT_SIZE_8_TOTAL_PAGES;
        *offset = logicId % EXT_SIZE_128;
    } else if (logicId < EXT_SIZE_1024_TOTAL_PAGES) {
        logicId -= EXT_SIZE_128_TOTAL_PAGES;
        *offset = logicId % EXT_SIZE_1024;
    } else {
        logicId -= EXT_SIZE_1024_TOTAL_PAGES;
        *offset = logicId % EXT_SIZE_8192;
    }
}

/*
 * Resolve a block of an md-managed relation to the file descriptor of the
 * segment file returned by _mdfd_getseg and the slice offset of the block.
 */
static void md_get_physical_info(Relation rel, ForkNumber forknum, BlockNumber blocknum, int *handle, off_t *offset)
{
    SMgrRelation reln = rel->rd_smgr;
    MdfdVec *v = NULL;

    v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL);
    Assert(v != NULL);
    *handle = FileFd(v->mdfd_vfd);
    *offset = DF_OFFSET_TO_SLICE_OFFSET(((off_t)blocknum) * BLCKSZ);
}

/*
 * get_pageinfo implementation for md files.
 *
 * Outputs:
 *   object / ip       - filled by dss_get_addr, only when GlobalCache is defined
 *   end               - exclusive end block of the slice starting at ndpScan->handledBlock
 *   phyStartBlockNum  - NDPMERGEBIT(handledBlock, 0)
 */
void md_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, char *ip,
                     BlockNumber& end, uint32& phyStartBlockNum)
{
    // step1:get object and ip.
    int handle;
    off_t offset;
    HeapScanDesc scan = (HeapScanDesc)ndpScan->scan;
    md_get_physical_info(scan->rs_base.rs_rd, 0, page, &handle, &offset);
#ifdef GlobalCache
    dss_get_addr(handle, offset, object->pool, object->image, ip, &object->objId, &object->objOffset);
#endif

    // step2:slice pages, get end and phyStartBlockNum.
    BlockNumber start = ndpScan->handledBlock;
#ifdef GlobalCache
    Assert(object->objOffset >= 0);
    end = Min((uint64_t)ndpScan->nBlock, (uint64_t)(start + PAGE_NUM_PER_AU - object->objOffset/BLCKSZ));
#else
    end = Min((uint64_t)ndpScan->nBlock, (uint64_t)(start + PAGE_NUM_PER_AU));
#endif
    phyStartBlockNum = NDPMERGEBIT(start, 0);
}

/*
 * Resolve a logical block of a segment-page relation to its physical location
 * (returned through loc), the fd of the physical file and the slice offset.
 * The segment head partition lock is held in shared mode while the extent
 * group file is looked up.
 */
static void seg_get_physical_info(Relation rel, ForkNumber forknum, BlockNumber blocknum, SegPageLocation &loc,
                                  int *handle, off_t *offset)
{
    SMgrRelation reln = rel->rd_smgr;

    loc = seg_get_physical_location(rel->rd_node, MAIN_FORKNUM, blocknum);
    SegmentCheck(loc.blocknum != InvalidBlockNumber);

    LockSegmentHeadPartition(reln->seg_space->spcNode, reln->seg_space->dbNode,
                             reln->seg_desc[forknum]->head_blocknum, LW_SHARED);

    SegSpace *spc = reln->seg_space;
    RelFileNode relNode = {.spcNode = spc->spcNode,
                           .dbNode = spc->dbNode,
                           .relNode = EXTENT_SIZE_TO_TYPE(loc.extent_size),
                           .bucketNode = SegmentBktId,
                           .opt = 0};
    int egid = EXTENT_TYPE_TO_GROUPID(relNode.relNode);
    SegExtentGroup *seg = &spc->extent_group[egid][forknum];

    off_t beginoff = ((off_t)loc.blocknum) * BLCKSZ;
    int sliceno = DF_OFFSET_TO_SLICENO(beginoff);
    SegPhysicalFile spf = df_get_physical_file(seg->segfile, sliceno, loc.blocknum);

    *handle = spf.fd;
    *offset = DF_OFFSET_TO_SLICE_OFFSET(beginoff);

    UnlockSegmentHeadPartition(reln->seg_space->spcNode, reln->seg_space->dbNode,
                               reln->seg_desc[forknum]->head_blocknum);
}

/*
 * get_pageinfo implementation for segment-page files.
 *
 * The slice starting at ndpScan->handledBlock is limited by the remainder of
 * the current extent, PAGE_NUM_PER_AU and the remaining blocks of the scan.
 * phyStartBlockNum merges the physical block number with the extent group id + 1.
 */
void seg_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, char *ip,
                      BlockNumber& end, uint32& phyStartBlockNum)
{
    // step1:get object and ip.
    int handle;
    off_t offset;
    SegPageLocation loc;
    HeapScanDesc scan = (HeapScanDesc)ndpScan->scan;
    seg_get_physical_info(scan->rs_base.rs_rd, 0, page, loc, &handle, &offset);
#ifdef GlobalCache
    dss_get_addr(handle, offset, object->pool, object->image, ip, &object->objId, &object->objOffset);
#endif

    // step2:slice pages, get end and phyStartBlockNum.
    uint32 extentOff;
    BlockNumber start = ndpScan->handledBlock;
    SegLogicPageIdToExtentOffset(ndpScan->handledBlock, &extentOff);
    uint64_t blks = Min(loc.extent_size - extentOff,
                        Min(uint32(PAGE_NUM_PER_AU), uint32(ndpScan->nBlock - ndpScan->handledBlock)));
#ifdef GlobalCache
    Assert(object->objOffset >= 0);
    end = start + Min(blks, (uint64_t)(PAGE_NUM_PER_AU - object->objOffset/BLCKSZ));
#else
    end = start + blks;
#endif
    phyStartBlockNum = NDPMERGEBIT(loc.blocknum, EXTENT_SIZE_TO_GROUPID(loc.extent_size) + 1);
}

/*
 * Dispatch to the md or segment get_pageinfo implementation according to the
 * relation's smgr_which, then, for parallel scans (dop > 1), clamp `end` to the
 * next PARALLEL_SCAN_GAP_AU_ALIGNED boundary after handledBlock.
 */
void pm_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, char *ip,
                     BlockNumber& end, uint32& phyStartBlockNum)
{
    HeapScanDesc scan = (HeapScanDesc)ndpScan->scan;
    int which = scan->rs_base.rs_rd->rd_smgr->smgr_which;
    FileType filetype = which == 0 ? MDFILE : (which == 2 ? SEGFILE : INVALIDFILE);
    // NOTE(review): INVALIDFILE is only guarded by an Assert before indexing PAGEMETHOD.
    Assert(filetype != INVALIDFILE);
    PAGEMETHOD[filetype].get_pageinfo(ndpScan, page, object, ip, end, phyStartBlockNum);

    uint64 curAUAligned = ndpScan->handledBlock / PARALLEL_SCAN_GAP_AU_ALIGNED;
    if (scan->dop > 1 && end > (curAUAligned + 1) * PARALLEL_SCAN_GAP_AU_ALIGNED) {
        end = (curAUAligned + 1) * PARALLEL_SCAN_GAP_AU_ALIGNED;
    }
}

/* Copy one CLOG page (BLCKSZ bytes) into pageBuffer. */
void CopyCLog(int64 pageno, char *pageBuffer)
{
    int slotno;
    errno_t rc = EOK;

    /* lock is acquired by SimpleLruReadPage_ReadOnly */
    slotno = SimpleLruReadPage_ReadOnly(ClogCtl(pageno), pageno, FirstNormalTransactionId);
    rc = memcpy_s(pageBuffer, BLCKSZ, ClogCtl(pageno)->shared->page_buffer[slotno], BLCKSZ);
    securec_check(rc, "", "");
    LWLockRelease(ClogCtl(pageno)->shared->control_lock);
}

/* Copy one CSN log page (BLCKSZ bytes) into pageBuffer under the partition lock in shared mode. */
void CopyCSNLog(int64 pageno, char *pageBuffer)
{
    int slotno;
    errno_t rc = EOK;

    CSN_LWLOCK_ACQUIRE(pageno, LW_SHARED);
    slotno = SimpleLruReadPage_ReadOnly_Locked(CsnlogCtl(pageno), pageno, FirstNormalTransactionId);
    rc = memcpy_s(pageBuffer, BLCKSZ, CsnlogCtl(pageno)->shared->page_buffer[slotno], BLCKSZ);
    securec_check(rc, "", "");
    CSN_LWLOCK_RELEASE(pageno);
}

/*
 * Fill the IO request of this slot for one AU.
 *
 * A bit is set in req.pageMap for every page of the AU that is NOT found in
 * the buffer pool.  Returns the number of bits set (in standby mode the count
 * may additionally be adjusted by SSIsPageHitDms).
 */
int NdpIoSlot::SetReq(RelFileNode& node, uint16 taskId, uint16 tableId, AuInfo& auinfo)
{
    int bitCount = 0;

    reqMsg.data = &req;
    reqMsg.len = sizeof(NdpIORequest);
    respMsg.data = nullptr;
    respMsg.len = 0;

    req.taskId = taskId;
    req.tableId = tableId;
    req.auInfos[0].phyStartBlockNum = auinfo.phyStartBlockNum;
    req.auInfos[0].pageNum = auinfo.pageNum;
#ifdef GlobalCache
    errno_t rc2 = memcpy_s(&req.auInfos[0].object, sizeof(CephObject), &auinfo.object, sizeof(CephObject));
    securec_check(rc2, "", "");
#endif
    errno_t rc = memset_s(req.pageMap, BITMAP_SIZE_PER_AU_BYTE, 0, BITMAP_SIZE_PER_AU_BYTE);
    securec_check(rc, "", "");

    for (uint32 i = startBlockNum, offset = 0; i != startBlockNum + auinfo.pageNum; ++i, ++offset) {
        bool cached = IsPageHitBufferPool(node, MAIN_FORKNUM, i);
        if (!cached) {
            NDPSETBIT(req.pageMap, offset);
            ++bitCount;
        }
    }

    if (SS_STANDBY_MODE) {
        SSIsPageHitDms(node, startBlockNum, auinfo.pageNum, req.pageMap, &bitCount);
    }
    return bitCount;
}

/*
 * Prepare the response buffer of this slot.
 * With ENABLE_SSL the response message is left empty; otherwise a buffer is
 * taken from g_ndp_instance.pageContext and ALLOC_RESPONSE_MEMORY_FAILED is
 * returned when none is available.
 */
NdpRetCode NdpIoSlot::SetResp(int pageNum)
{
#ifdef ENABLE_SSL
    respMsg.len = 0;
    respMsg.data = nullptr;
    return NdpRetCode::NDP_OK;
#else
    respMsg.len = DSS_DEFAULT_AU_SIZE;
    if (g_ndp_instance.pageContext->Dequeue(respMsg.data)) {
        resp = reinterpret_cast<NdpIOResponse*>(respMsg.data);
        return NdpRetCode::NDP_OK;
    }
    return NdpRetCode::ALLOC_RESPONSE_MEMORY_FAILED;
#endif
}

/*
 * Decode the response held by this slot.
 *
 * Outputs are always initialised (start = startBlockNum, map/pages = nullptr,
 * pageNum = 0) so callers can inspect them even when an error code is returned.
 * On success `map` points at the response page map and `pages` at the data
 * following the NdpIOResponse header when ndpPageNums is non-zero.
 */
NdpRetCode NdpIoSlot::GetResp(NdpPageHeader& pages, int& pageNum, BlockNumber& start, uint64*& map)
{
    start = startBlockNum;
    map = nullptr;
    pages = nullptr;
    pageNum = 0;

    if (respRet != NdpRetCode::NDP_OK) {
        return respRet;
    }

    auto rpcResp = reinterpret_cast<NdpIOResponse*>(respMsg.data);
    if (rpcResp == nullptr) {
        return NdpRetCode::NDP_RETURN_FAILED;
    }
#ifdef ENABLE_SSL
    resp = reinterpret_cast<NdpIOResponse*>(respMsg.data);
#endif
    if (rpcResp->status != 0) {
        return NdpRetCode::NDP_RETURN_STATUS_ERROR;
    }

    map = rpcResp->pageMap;
    pageNum = rpcResp->ndpPageNums;
    if (pageNum) {
        pages = (NdpPageHeader)((char*)rpcResp + sizeof(NdpIOResponse));
    }
    return NdpRetCode::NDP_OK;
}

/*
 * Release the response buffer of this slot: free() with ENABLE_SSL, otherwise
 * hand it back to g_ndp_instance.pageContext.
 *
 * `resp` is set from respMsg.data in SetResp/GetResp, so when it is released
 * the aliasing respMsg.data is cleared as well; otherwise a second FreeResp
 * call would release the same buffer twice.
 */
void NdpIoSlot::FreeResp()
{
#ifdef ENABLE_SSL
    if (resp != nullptr) {
        if (static_cast<void*>(respMsg.data) == static_cast<void*>(resp)) {
            respMsg.data = nullptr;
        }
        free(resp);
        resp = nullptr;
        return;
    }
    if (respMsg.data) {
        free(respMsg.data);
        respMsg.data = nullptr;
        return;
    }
#else
    bool enqueued;
    Assert(g_ndp_instance.pageContext);
    if (resp) {
        if (static_cast<void*>(respMsg.data) == static_cast<void*>(resp)) {
            respMsg.data = nullptr;
        }
        enqueued = g_ndp_instance.pageContext->Enqueue(resp);
        if (!enqueued) {
            ereport(WARNING, (errmsg("try enqueue slot memory to queue failed")));
        }
        resp = nullptr;
        return;
    }
    if (respMsg.data) {
        enqueued = g_ndp_instance.pageContext->Enqueue(respMsg.data);
        if (!enqueued) {
            ereport(WARNING, (errmsg("try enqueue slot memory to queue failed")));
        }
        respMsg.data = nullptr;
        return;
    }
#endif
}

/*
 * Initialise the scan descriptor: response queues (async RPC builds only), a
 * private memory context, and the pointers to the pushdown condition, the
 * table scan and the scan state.  On failure the already-created members are
 * left in place for the destructor to release.
 */
NdpRetCode NdpScanDescData::Init(ScanState* sstate, TableScanDesc sscan)
{
    curIO = nullptr;  // necessary, because rescan may be before get next

    // free everything in destructor if alloc failed
#ifdef NDP_ASYNC_RPC
    pg_atomic_init_u32(&reqCount, 0);
    pg_atomic_init_u32(&respCount, 0);

    respIO = new MpmcBoundedQueue<NdpIoSlot*>(NDP_PAGE_QUEUE_SIZE);
    if (respIO == nullptr) {
        ereport(WARNING, (errmsg("Alloc NpdPage Queue failed, size %d", NDP_PAGE_QUEUE_SIZE)));
        return NdpRetCode::ALLOC_MQ_FAILED;
    }
    normalPagesId = new MpmcBoundedQueue<int>(NDP_NORMAL_QUEUE_SIZE);
    if (normalPagesId == nullptr) {
        ereport(WARNING, (errmsg("Alloc Normal Queue failed, size %d", NDP_NORMAL_QUEUE_SIZE)));
        return NdpRetCode::ALLOC_MQ_FAILED;
    }
#endif

    memCtx = AllocSetContextCreate(CurrentMemoryContext, "ThreadNdpScanContext", 0, (4u << 10), (4u << 10));
    if (!memCtx) {
        ereport(WARNING, (errmsg("Create ThreadNdpScanContext failed!")));
        return NdpRetCode::ALLOC_MC_FAILED;
    }
#ifdef FAULT_INJECT
    if ((rand() % PERCENTAGE_DIV) < PERCENTAGE) {
        ereport(WARNING, (errmsg("Fault inject -- Create ThreadNdpScanContext failed")));
        return NdpRetCode::ALLOC_MC_FAILED;
    }
#endif

    cond = (NdpScanCondition*)sstate->ps.plan->ndp_pushdown_condition;
    scan = sscan;
    scanState = sstate;
    aggState = NULL;
    aggSlot = NULL;

    return NdpRetCode::NDP_OK;
}

/*
 * Release the queues, any slots still queued in respIO, and the memory context.
 *
 * The wait for outstanding callbacks lives inside NDP_ASYNC_RPC (as in Reset):
 * reqCount/respCount are only initialised and updated in async builds, so
 * spinning on them in a synchronous build could never make progress.
 */
NdpScanDescData::~NdpScanDescData()
{
#ifdef NDP_ASYNC_RPC
    // wait all callback return
    while (pg_atomic_read_u32(&reqCount) != pg_atomic_read_u32(&respCount)) {
        pg_usleep(NDP_RPC_WAIT_USEC);
    }

    if (respIO) {
        NdpIoSlot* tmpIO = nullptr;
        while (respIO->Dequeue(tmpIO)) {
            delete tmpIO;
        }
        delete respIO;
    }
    if (normalPagesId) {
        delete normalPagesId;
    }
#endif
    if (memCtx) {
        MemoryContextDelete(memCtx);
    }
}

/*
 * Rewind the descriptor for a rescan: wait for in-flight responses (async
 * builds), drop the current slot, drain both queues and reset the memory context.
 */
void NdpScanDescData::Reset()
{
    handledBlock = 0;
    curLinesNum = 0;
    nextLineIndex = 0;

#ifdef NDP_ASYNC_RPC
    // add timestamp if we don't want to wait, and remember release all while deleting.
    while (pg_atomic_read_u32(&respCount) < pg_atomic_read_u32(&reqCount)) {
        pg_usleep(NDP_RPC_WAIT_USEC);
    }
#endif

    FreeCurSlot();

#ifdef NDP_ASYNC_RPC
    NdpIoSlot* tmpIO = nullptr;
    int tmpId;
    while (respIO->Dequeue(tmpIO)) {
        delete tmpIO;
    }
    // drain the normal-page queue; the dequeued ids are discarded
    while (normalPagesId->Dequeue(tmpId)) {
    }
#else
    normalPagesNum = 0;
#endif

    MemoryContextReset(memCtx);
}

/* Queue every block in [start, end), clamped to nBlock, for the normal (non-pushdown) path. */
void NdpScanDescData::AddToNormal(uint32 start, uint32 end)
{
    end = end > nBlock ? nBlock : end;
    start = start > nBlock ? nBlock : start;
    for (uint32 i = start; i < end; ++i) {
        AddToNormal(i);
    }
}

// return true if set current io slot;
// Pages whose bit is clear in the response page map are sent back to the
// normal queue; if the response carries ndp pages the slot becomes curIO.
bool NdpScanDescData::HandleSlot(NdpIoSlot* slot)
{
    uint32 start;
    uint64* pageMap;
    NdpPageHeader pages;
    int pageNum;

    NdpRetCode ret = slot->GetResp(pages, pageNum, start, pageMap);
    if (ret != NdpRetCode::NDP_OK) {
        ++failedIoN;
    }

    if (start > nBlock) {
        ereport(ERROR, (errmsg("can not happen start %u is cross the border.", start)));
        return false;
    }

    uint32 end = start + slot->GetPushDownPageNum();
    if (pageMap == nullptr) {
        // slot is invalid, add to normal list
        AddToNormal(start, end);
        return false;
    }

    // put unhandled page to normal queue
    int count = 0;
    end = end > nBlock ? nBlock : end;
    start = start > nBlock ? nBlock : start;
    for (uint32 offset = start, i = 0; offset < end; ++i, ++offset) {
        int flag = NDPGETBIT(pageMap, i);
        if (!flag) {
            AddToNormal(offset);
            count++;
        }
    }
    sendBackPageN += count;
    if (scanState->ps.instrument) {
        scanState->ps.instrument->ndp_sendback_page += count;
    }

    if (pages) {
        curIO = slot;
        curNdpPages = pages;
        curNdpPagesNum = pageNum;
        nextNdpPageIndex = 0;
        if (pages->pd_flags == NDP_FILTERED_PAGE) {
            ndpPageScanN += pageNum;
        } else {
            ndpPageAggN += pageNum;
        }
        if (scanState->ps.instrument) {
            scanState->ps.instrument->ndp_handled += pageNum;
        }
        return true;
    } else {
        return false;
    }
}

#ifdef NDP_ASYNC_RPC
/*
 * Take one finished slot from respIO and process it.
 * Returns true when the slot became the current IO slot; a slot that yields
 * no ndp pages is deleted here.
 */
bool NdpScanDescData::GetNextSlot(void)
{
    NdpIoSlot* slot = nullptr;
    if (respIO->Dequeue(slot)) {
        if (HandleSlot(slot)) {
            return true;
        } else {
            delete slot;
            return false;
        }
    }
    return false;
}
#endif

#ifdef NDP_ASYNC_RPC
/*
 * RPC completion callback for an IO request.  Records a failed status on the
 * slot, pushes the slot onto the owning scan's respIO queue (retrying until it
 * fits) and only then bumps respCount.
 */
void NdpIoSlotCallDone(RpcStatus status, void *arg)
{
    NdpIoSlot* cbArg = reinterpret_cast<NdpIoSlot*>(arg);
    if (!cbArg) {
        return;
    }
    NdpScanDesc ndpScan = reinterpret_cast<NdpScanDesc>(cbArg->GetPriv());
    if (ndpScan == nullptr) {
        return;
    }
#ifdef FAULT_INJECT
    if ((rand() % PERCENTAGE_DIV) < PERCENTAGE) {
        status = RPC_ERROR;
    }
#endif
    if (status != RPC_OK) {
        cbArg->SetRespRet(NdpRetCode::RPC_IO_CALLBACK_ERROR);
        cbArg->SetRPCRet(status);
    }
    while (!ndpScan->respIO->Enqueue(cbArg)) {
        pg_usleep(NDP_RPC_WAIT_USEC);
    }
    pg_atomic_add_fetch_u32(&ndpScan->respCount, 1);
}
#endif

/*
 * Initialise a channel to one NDP server: remember id and ip, start in the
 * UNCONNECTED state and allocate one NdpTableMgr per table.
 * Returns false if the table manager array could not be allocated.
 */
bool NdpScanChannel::Init(uint32 id, char* ip, uint32 tableN)
{
    rpcId = id;
    errno_t rc = strcpy_s(rpcIp, NDP_RPC_IP_LEN, ip);
    securec_check(rc, "", "");
    status = NdpScanChannelStatus::UNCONNECTED;
    pthread_mutex_init(&mutex, nullptr);
    rpcClient = 0;
    queryId = 0;
    tableNum = tableN;
    tableMgr = New(CurrentMemoryContext) NdpTableMgr[tableNum]();
    if (tableMgr == nullptr) {
        return false;
    }
    connFailed = 0;
    cmdFailed = 0;
    return true;
}

/*
 * Send an IO request over this channel, first driving the channel through
 * UNCONNECTED -> CONNECTED -> QUERYSENT if necessary.
 *
 * The state transition is attempted only by the caller that wins
 * pthread_mutex_trylock; others get CONNECT_UNUSABLE.  The switch cases fall
 * through on purpose so that one call can perform all remaining steps.
 */
NdpRetCode NdpScanChannel::SendRequest(NdpIoSlot* req, NdpScanDesc ndpScan)
{
    if (status == NdpScanChannelStatus::CLOSED) {
        return NdpRetCode::CONNECT_UNUSABLE;
    }
    if (req->GetReqTableId() >= tableNum) {
        ereport(WARNING, (errmsg("table id %u should be littler then %u.", req->GetReqTableId(), tableNum)));
        return NdpRetCode::TABLE_ID_INVALID;
    }
    if (status == NdpScanChannelStatus::QUERYSENT) {
        return SendReq(req, ndpScan);
    }

    if (pthread_mutex_trylock(&mutex) != 0) {
        return NdpRetCode::CONNECT_UNUSABLE;
    }

    NdpRetCode ret = NdpRetCode::NDP_OK;
    switch (status) {
        case NdpScanChannelStatus::UNCONNECTED: {
            // free old connection
            DisconnectRpc();
            // call rpc connect
            RpcStatus connectRet = RpcClientConnect(rpcIp, u_sess->ndp_cxt.ndp_port, rpcClient);
            if (SECUREC_UNLIKELY(connectRet != RPC_OK)) {
                ++connFailed;
                DisconnectRpc();
                if (connFailed >= RPC_FAILED_LIMIT) {
                    status = NdpScanChannelStatus::CLOSED;
                }
                ereport(LOG, (errmsg("rpc connect (count:%d) failed, ip:port %s:%d. rpc status: %d",
                                     connFailed, rpcIp, u_sess->ndp_cxt.ndp_port, connectRet)));
                ret = NdpRetCode::CONNECT_FAILED;
                break;
            }
            status = NdpScanChannelStatus::CONNECTED;
        }
        /* fall through */
        case NdpScanChannelStatus::CONNECTED: {
            PG_TRY();
            {
                ret = SendQuery(ndpScan);
            }
            PG_CATCH();
            {
                ereport(WARNING, (errmsg("send query failed, it is possible a palloc failed.")));
                pthread_mutex_unlock(&mutex);
                PG_RE_THROW();
            }
            PG_END_TRY();
            if (SECUREC_UNLIKELY(ret != NdpRetCode::NDP_OK)) {
                ++cmdFailed;
                if (cmdFailed >= RPC_FAILED_LIMIT) {
                    status = NdpScanChannelStatus::CLOSED;
                }
                break;
            }
            status = NdpScanChannelStatus::QUERYSENT;
        }
        /* fall through */
        case NdpScanChannelStatus::QUERYSENT: {
            pthread_mutex_unlock(&mutex);
            return SendReq(req, ndpScan);
        }
        case NdpScanChannelStatus::CLOSED:
        default: {
            ret = NdpRetCode::CONNECT_UNUSABLE;
            break;
        }
    }
    pthread_mutex_unlock(&mutex);
    return ret;
}

/*
 * Send NDP_TERMINATE for this channel's query and mark the channel CLOSED on
 * success.  Channels that never reached QUERYSENT return CONNECT_UNUSABLE.
 */
NdpRetCode NdpScanChannel::SendEnd()
{
    NdpRetCode retCode = NdpRetCode::NDP_OK;

    if (status != NdpScanChannelStatus::QUERYSENT) {
        /* If all pages are cached in BufferPool, the channel might be UNCONNECTED */
        ereport(LOG, (errmsg("channel %s is not QUERY_SENT.", rpcIp)));
        return NdpRetCode::CONNECT_UNUSABLE;
    }

    pthread_mutex_lock(&mutex);
    // re-check under the lock: the state may have changed since the test above
    if (status == NdpScanChannelStatus::QUERYSENT) {
        NdpAdminRequest req;
        NdpAdminResponse resp;
        req.head.command = NDP_TERMINATE;
        req.head.size = sizeof(NdpAdminRequest);
        req.taskId = queryId;
        req.tableId = 0;

        NdpRetCode ret = SendAdminReq(&req, &resp, sizeof(resp.ret));
        if (ret != NdpRetCode::NDP_OK) {
            retCode = NdpRetCode::RPC_ADMIN_SEND_TERMINATE_FAILED;
        } else {
            status = NdpScanChannelStatus::CLOSED;
        }
    }
    pthread_mutex_unlock(&mutex);

    return retCode;
}

/*
 * Send an IO request for a table, first sending plan and plan state for that
 * table if they have not been sent yet.  Only the caller that wins the table
 * manager's trylock performs the admin exchange.
 */
NdpRetCode NdpScanChannel::SendReq(NdpIoSlot* req, NdpScanDesc ndpScan)
{
    NdpTableMgr* mgr = &tableMgr[req->GetReqTableId()];
    if (mgr->status == NdpTableStatus::CONSTRUCTFAIL) {
        return NdpRetCode::NDP_CONSTRUCT_FAILED;
    }
    if (mgr->status == NdpTableStatus::STATESENT) {
        return SendIo(req, ndpScan);
    }
    if (mgr->cmdNdpFailed >= RPC_FAILED_LIMIT) {
        return NdpRetCode::RPC_ADMIN_SEND_FAIL;
    }

    if (pthread_mutex_trylock(&mgr->mutex) != 0) {
        return NdpRetCode::TABLE_MGR_UNUSABLE;
    }
    NdpRetCode ret;
    PG_TRY();
    {
        ret = SendAdmin(mgr, req, ndpScan);
    }
    PG_CATCH();
    {
        // NOTE(review): the error is swallowed here without FlushErrorState();
        // confirm whether the error stack needs to be cleared on this path.
        ereport(WARNING, (errmsg("send failed, it is possible a palloc failed.")));
        ret = NdpRetCode::NDP_ERROR;
    }
    PG_END_TRY();
    pthread_mutex_unlock(&mgr->mutex);

    if (mgr->status == NdpTableStatus::STATESENT) {
        return SendIo(req, ndpScan);
    }
    return ret;
}

/*
 * Drive a table manager through INITIAL -> PLANSENT -> STATESENT.
 * The cases fall through on purpose.  A construct failure moves the manager
 * to CONSTRUCTFAIL; any other failure leaves the state unchanged and bumps
 * cmdNdpFailed.
 */
NdpRetCode NdpScanChannel::SendAdmin(NdpTableMgr* mgr, NdpIoSlot* req, NdpScanDesc ndpScan)
{
    NdpRetCode ret = NdpRetCode::NDP_OK;
    switch (mgr->status) {
        case NdpTableStatus::INITIAL: {
            ret = SendPlan(ndpScan);
            if (ret != NdpRetCode::NDP_OK) {
                mgr->status = (ret == NdpRetCode::NDP_CONSTRUCT_FAILED) ? NdpTableStatus::CONSTRUCTFAIL
                                                                        : mgr->status;
                break;
            }
            mgr->status = NdpTableStatus::PLANSENT;
        }
        /* fall through */
        case NdpTableStatus::PLANSENT: {
            ret = SendState(ndpScan);
            if (ret != NdpRetCode::NDP_OK) {
                mgr->status = (ret == NdpRetCode::NDP_CONSTRUCT_FAILED) ? NdpTableStatus::CONSTRUCTFAIL
                                                                        : mgr->status;
                break;
            }
            mgr->status = NdpTableStatus::STATESENT;
        }
        /* fall through */
        case NdpTableStatus::STATESENT: {
            ret = NdpRetCode::NDP_OK;
            break;
        }
        default: {
            ret = NdpRetCode::NDP_ERROR;
            break;
        }
    }
    if (ret != NdpRetCode::NDP_OK) {
        ++mgr->cmdNdpFailed;
    }
    return ret;
}

/*
 * Send the version request followed by the query request.  A failed version
 * exchange disconnects and closes the channel.  On success queryId is taken
 * from the query response.
 */
NdpRetCode NdpScanChannel::SendQuery(NdpScanDesc ndpScan)
{
    NdpAdminRequest *v = ConstructVersion();
    NdpAdminResponse resp;
    NdpRetCode versionRet = SendAdminReq(v, &resp, sizeof(NdpAdminResponse));
    pfree(v);
    if (versionRet != NdpRetCode::NDP_OK) {
        DisconnectRpc();
        status = NdpScanChannelStatus::CLOSED;
        ereport(LOG, (errmsg("send version admin (count:%d) request failed, ip:port %s:%d.",
                             cmdFailed, rpcIp, u_sess->ndp_cxt.ndp_port)));
        return NdpRetCode::RPC_ADMIN_SEND_VERSION_FAILED;
    }

    NdpAdminRequest *query;
    resp = {};
    query = ConstructQuery(ndpScan);
    NdpRetCode queryRet = SendAdminReq(query, &resp, sizeof(NdpAdminResponse));
    pfree(query);
    if (queryRet != NdpRetCode::NDP_OK) {
        ereport(LOG, (errmsg("send admin (count:%d) request failed, ip:port %s:%d.",
                             cmdFailed, rpcIp, u_sess->ndp_cxt.ndp_port)));
        return NdpRetCode::RPC_ADMIN_SEND_CTX_FAILED;
    }
    queryId = (uint16)resp.queryId;
    return NdpRetCode::NDP_OK;
}

/* Serialise and send the pushdown plan; NDP_CONSTRUCT_FAILED if it cannot be built. */
NdpRetCode NdpScanChannel::SendPlan(NdpScanDesc ndpScan)
{
    NdpAdminRequest* planReq = ConstructPlanReq(ndpScan);
    if (planReq == nullptr) {
        return NdpRetCode::NDP_CONSTRUCT_FAILED;
    }
    NdpAdminResponse resp;
    NdpRetCode ret = SendAdminReq(planReq, &resp, sizeof(resp.ret));
    pfree(planReq);
    if (ret != NdpRetCode::NDP_OK) {
        ret = NdpRetCode::RPC_ADMIN_SEND_PLAN_FAILED;
    }
    return ret;
}

/*
 * Send one admin request and wait for its response.
 * Returns RPC_ADMIN_SEND_FAIL when the RPC layer fails and NDP_ERROR when the
 * response carries a non-OK ret.
 */
NdpRetCode NdpScanChannel::SendAdminReq(NdpAdminRequest* req, NdpAdminResponse* resp, size_t size)
{
    RpcStatus status = RpcSendAdminReq(req, resp, size, rpcClient);
    if (status != RPC_OK) {
        ereport(LOG, (errmsg("RpcSendAdminReq failed. CMD code:%d, Rpc code: %d", req->head.command, status)));
        return NdpRetCode::RPC_ADMIN_SEND_FAIL;
    } else if (resp->ret != NDP_OK) {
        ereport(LOG, (errmsg("AdminReq handle failed.")));
        return NdpRetCode::NDP_ERROR;
    }
    return NdpRetCode::NDP_OK;
}

/* Serialise and send the plan state; NDP_CONSTRUCT_FAILED if it cannot be built. */
NdpRetCode NdpScanChannel::SendState(NdpScanDesc ndpScan)
{
    NdpAdminRequest* state = ConstructPlanState(ndpScan);
    if (state == nullptr) {
        return NdpRetCode::NDP_CONSTRUCT_FAILED;
    }
    NdpAdminResponse resp;
    NdpRetCode ret = SendAdminReq(state, &resp, sizeof(resp.ret));
    pfree(state);
    if (ret != NdpRetCode::NDP_OK) {
        ret = NdpRetCode::RPC_ADMIN_SEND_STATE_FAILED;
    }
    return ret;
}

/*
 * Hand an IO request to the RPC layer.  In async builds reqCount is bumped
 * before the send and rolled back if the send fails, so the counters stay
 * balanced against NdpIoSlotCallDone.
 */
NdpRetCode NdpScanChannel::SendIo(NdpIoSlot* req, NdpScanDesc ndpScan)
{
    req->SetReq(queryId);
    NdpTableMgr* mgr = &tableMgr[req->GetReqTableId()];
    if (mgr->ioFailed >= SEND_FAILED_LIMIT) {
        return NdpRetCode::RPC_IO_SEND_FAILED;
    }
#ifdef NDP_ASYNC_RPC
    RpcCallDone callDone = {.cb = &NdpIoSlotCallDone, .arg = (void*)req };
    pg_atomic_add_fetch_u32(&ndpScan->reqCount, 1);  // before send
    RpcStatus ret = RpcSendIOReq(req->GetReqMsg(), req->GetRespMsg(), &callDone, rpcClient);
    if (ret != RPC_OK) {
        pg_atomic_sub_fetch_u32(&ndpScan->reqCount, 1);  // undo the count taken before send
        mgr->ioFailed++;
        return NdpRetCode::RPC_IO_SEND_FAILED;
    }
#else
    RpcStatus ret = RpcSendIOReq(req->GetReqMsg(), req->GetRespMsg(), NULL, rpcClient);
    if (ret != RPC_OK) {
        mgr->ioFailed++;
        return NdpRetCode::RPC_IO_SEND_FAILED;
    }
#endif
    return NdpRetCode::NDP_OK;
}

/*
 * Build an NDP_PLAN admin request carrying nodeToString(plan), including the
 * terminating NUL.  For an Agg plan without grp_collations the array is filled
 * with InvalidOid first.  Returns nullptr when the serialised plan is empty;
 * otherwise a palloc'd request owned by the caller.
 */
NdpAdminRequest* NdpScanChannel::ConstructPlanReq(NdpScanDesc ndpScan)
{
    if (IsA(ndpScan->cond->plan, Agg)) {
        Agg* agg = (Agg*)ndpScan->cond->plan;
        if (agg->grp_collations == NULL && agg->numCols > 0) {
            agg->grp_collations = (unsigned int*)palloc(sizeof(unsigned int) * agg->numCols);
            for (int i = 0; i < agg->numCols; i++) {
                agg->grp_collations[i] = InvalidOid;
            }
        }
    }

    char* str = nodeToString(ndpScan->cond->plan);
    int len = strlen(str) + 1;
    if (len == 1) {
        pfree(str);
        return nullptr;
    }

    NdpAdminRequest* req = reinterpret_cast<NdpAdminRequest*>(palloc(sizeof(NdpAdminRequest) + len));
    req->head.command = NDP_PLAN;
    req->head.size = sizeof(NdpAdminRequest) + len;
    req->taskId = queryId;
    req->tableId = ndpScan->cond->tableId;
    errno_t rc = memcpy_s(reinterpret_cast<char*>(req + 1), len, str, len);
    securec_check(rc, "", "");
    // the serialised plan has been copied into the request; release the temporary
    pfree(str);
    return req;
}

/*
 * Copy the attribute layout fields and the type identity of a TupleDesc into
 * an NdpTupleDesc.  td->attrs is palloc'd (NULL when natts is 0).
 */
bool NdpScanChannel::ExtractTupleDesc(TupleDesc desc, NdpTupleDesc* td)
{
    td->natts = desc->natts;
    if (td->natts == 0) {
        td->attrs = NULL;
        return true;
    }

    td->attrs = (NdpPGAttr *)palloc(sizeof(NdpPGAttr) * td->natts);
    for (int i = 0; i < desc->natts; ++i) {
        td->attrs[i].attlen = desc->attrs[i].attlen;
        td->attrs[i].attbyval = desc->attrs[i].attbyval;
        td->attrs[i].attcacheoff = desc->attrs[i].attcacheoff;
        td->attrs[i].attalign = desc->attrs[i].attalign;
        td->attrs[i].attndims = desc->attrs[i].attndims;
        td->attrs[i].attstorage = desc->attrs[i].attstorage;
    }
    td->tdhasoid = desc->tdhasoid;
    td->tdhasuids = desc->tdhasuids;
    td->tdtypeid = desc->tdtypeid;
    td->tdtypmod = desc->tdtypmod;
    return true;
}

/* Copy the scanned relation's RelFileNode and tuple descriptor. */
bool NdpScanChannel::ExtractRelation(TableScanDesc scan, NdpRelation* rel)
{
    rel->node.spcNode = scan->rs_rd->rd_node.spcNode;
    rel->node.dbNode = scan->rs_rd->rd_node.dbNode;
    rel->node.relNode = scan->rs_rd->rd_node.relNode;
    rel->node.bucketNode = scan->rs_rd->rd_node.bucketNode;
    rel->node.opt = scan->rs_rd->rd_node.opt;

    return ExtractTupleDesc(scan->rs_rd->rd_att, &rel->att);
}

/*
 * Capture the transaction information sent with the query: snapshot, current
 * transaction id, combo cids, and copies of the CLOG / CSN log pages.
 * All buffers are palloc'd and owned by the caller.
 */
bool NdpScanChannel::ExtractXact(TableScanDesc scan, NdpXact* xact)
{
    xact->comboCids = NULL;
    xact->CLogPageBuffer = NULL;
    xact->CSNLogPageBuffer = NULL;

    /* snapshot */
    NdpSnapshot* snapshot = &xact->snapshot;
    snapshot->satisfies = scan->rs_snapshot->satisfies;
    snapshot->xmin = scan->rs_snapshot->xmin; /* all XID < xmin are visible to me */
    snapshot->xmax = scan->rs_snapshot->xmax; /* all XID >= xmax are invisible to me */
    snapshot->snapshotcsn = scan->rs_snapshot->snapshotcsn;
    snapshot->curcid = scan->rs_snapshot->curcid;

    /* TransactionState */
    xact->transactionId = GetCurrentTransactionIdIfAny();

    /* comboCids */
    xact->usedComboCids = u_sess->utils_cxt.usedComboCids;
    if (xact->usedComboCids > 0) {
        // each entry is copied as a pair of uint32 values
        xact->comboCids = (uint32 *)palloc(xact->usedComboCids * 2 * sizeof(uint32));
        uint32 *comboCids = (uint32 *)u_sess->utils_cxt.comboCids;
        for (int i = 0; i < xact->usedComboCids; i++) {
            xact->comboCids[i * 2] = comboCids[i * 2];
            xact->comboCids[i * 2 + 1] = comboCids[i * 2 + 1];
        }
    } else {
        xact->comboCids = NULL;
    }

    /* clog & csnlog */
    /* It's okay to read latestCompletedXid without acquiring ProcArrayLock shared lock
     * because we don't care if we get a slightly stale value */
    xact->latestCompletedXid = t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid;

    int64 pagenoStart, pagenoEnd, pageno;
    pagenoStart = TransactionIdToPage(FirstNormalTransactionId);
    pagenoEnd = TransactionIdToPage(xact->latestCompletedXid);
    xact->CLogLen = (pagenoEnd - pagenoStart + 1) * BLCKSZ;
    xact->CLogPageBuffer = (char *)palloc(xact->CLogLen);
    for (pageno = pagenoStart; pageno <= pagenoEnd; pageno++) {
        CopyCLog(pageno, xact->CLogPageBuffer + (pageno - pagenoStart) * BLCKSZ);
    }

    // NOTE(review): start and end are both derived from latestCompletedXid, so
    // exactly one CSN log page is copied - confirm this is intended.
    pagenoStart = TransactionIdToCSNPage(xact->latestCompletedXid);
    pagenoEnd = TransactionIdToCSNPage(xact->latestCompletedXid);
    xact->CSNLogLen = (pagenoEnd - pagenoStart + 1) * BLCKSZ;
    xact->CSNLogPageBuffer = (char *)palloc(xact->CSNLogLen);
    for (pageno = pagenoStart; pageno <= pagenoEnd; pageno++) {
        CopyCSNLog(pageno, xact->CSNLogPageBuffer + (pageno - pagenoStart) * BLCKSZ);
    }

    return true;
}

/*
 * Fill the aggregation part of the plan state.  Every field is reset first, so
 * the structure is safe to pass to DestroyPlanState even when the scan has no
 * aggState.  Hash/equality function oids are captured only for AGG_HASHED
 * with grouping columns.
 */
bool NdpScanChannel::ExtractAggState(NdpScanDesc ndpScan, NdpAggState* aggS)
{
    aggS->aggTd.natts = 0;
    aggS->aggTd.attrs = nullptr;
    aggS->aggNum = 0;
    aggS->perAggTd = nullptr;
    aggS->numCols = 0;
    aggS->eqFuncOid = nullptr;
    aggS->hashFuncOid = nullptr;

    if (ndpScan->aggState) {
        Assert(ndpScan->aggSlot != nullptr);
        ExtractTupleDesc(ndpScan->aggSlot->tts_tupleDescriptor, &aggS->aggTd);

        aggS->aggNum = ndpScan->aggState->numaggs;
        aggS->perAggTd = (NdpTupleDesc*)palloc(aggS->aggNum * sizeof(NdpTupleDesc));
        for (int aggNo = 0; aggNo < aggS->aggNum; ++aggNo) {
            ExtractTupleDesc(ndpScan->aggState->evaldesc, &aggS->perAggTd[aggNo]);
        }

        Agg* agg = (Agg*)ndpScan->aggState->ss.ps.plan;
        if ((agg->aggstrategy == AGG_HASHED) && (agg->numCols > 0)) {
            aggS->numCols = agg->numCols;
            aggS->eqFuncOid = (unsigned int*)palloc(aggS->numCols * sizeof(unsigned int));
            aggS->hashFuncOid = (unsigned int*)palloc(aggS->numCols * sizeof(unsigned int));
            for (int colNo = 0; colNo < aggS->numCols; ++colNo) {
                aggS->eqFuncOid[colNo] = ndpScan->aggState->phases[0].eqfunctions[colNo].fn_oid;
                aggS->hashFuncOid[colNo] = ndpScan->aggState->hashfunctions[colNo].fn_oid;
            }
        }
    }
    return true;
}

/*
 * Copy the external parameters of an expression context into pList.
 * Returns false when the parameters cannot be shipped (a paramFetch hook is
 * installed, or a nested-table parameter is seen while plpgsql_estate is set).
 *
 * pList is put into a well-defined empty state before any early return and
 * after the array is released: the caller hands the enclosing plan state to
 * DestroyPlanState on failure, which frees pList->params when it is non-null.
 */
bool CheckExprContext(ExprContext* econtext, NdpParamList* pList)
{
    ParamListInfo paramInfo = econtext->ecxt_param_list_info;

    pList->numParams = 0;
    pList->params = nullptr;
    if (paramInfo->paramFetch != NULL) {
        return false;
    }

    pList->numParams = paramInfo->numParams;
    pList->params = (NdpParamData*)palloc(pList->numParams * sizeof(NdpParamData));
    for (int i = 0; i < pList->numParams; i++) {
        ParamExternData* from = &paramInfo->params[i];
        NdpParamData* to = &pList->params[i];
        if (from->tabInfo && from->tabInfo->isnestedtable && plpgsql_estate) {
            pfree(pList->params);
            pList->params = nullptr;  // avoid a second pfree in DestroyPlanState
            pList->numParams = 0;
            return false;
        }
        to->isnull = from->isnull;
        to->ptype = from->ptype;
        to->value = from->value;
        get_typlenbyval(to->ptype, &to->typlen, &to->typbyval);
        to->value = datumCopy(to->value, to->typbyval, to->typlen);
    }
    return true;
}

/*
 * Extract the scan's external parameters.  A scan without an expression
 * context, or without parameters, yields an empty list and succeeds.
 */
bool ExtractParamList(NdpScanDesc ndpScan, NdpParamList* pList)
{
    ExprContext* econtext = ndpScan->scanState->ps.ps_ExprContext;
    if (econtext == nullptr || econtext->ecxt_param_list_info == nullptr ||
        econtext->ecxt_param_list_info->numParams == 0) {
        pList->numParams = 0;
        pList->params = nullptr;
        return true;
    }

    return CheckExprContext(econtext, pList);
}

/* Capture the session settings shipped with the plan state; false when there is no session. */
bool ExtractKnlSessionContext(NdpSessionContext* sess)
{
    if (u_sess != nullptr) {
        sess->sql_compatibility = u_sess->attr.attr_sql.sql_compatibility;
        sess->behavior_compat_flags = u_sess->utils_cxt.behavior_compat_flags;
        sess->encoding = u_sess->mb_cxt.DatabaseEncoding->encoding;
        return true;
    }
    return false;
}

/*
 * Assemble the plan state for one scan: relation, scan tuple descriptor (the
 * projection slot's when a projection exists, otherwise the scan slot's),
 * aggregation state, parameters and session context.
 * Returns nullptr, after releasing the partial state, if any part fails.
 */
NdpPlanState* NdpScanChannel::CreatePlanState(NdpScanDesc ndpScan)
{
    NdpPlanState* state = (NdpPlanState*)palloc(sizeof(NdpPlanState));
    TableScanDesc scan = ndpScan->scan;

    ExtractRelation(scan, &state->rel);

    ProjectionInfo* proj_info = ndpScan->scanState->ps.ps_ProjInfo;
    if (proj_info != NULL) {
        ExtractTupleDesc(proj_info->pi_slot->tts_tupleDescriptor, &state->scanTd);
    } else {
        ExtractTupleDesc(ndpScan->scanState->ss_ScanTupleSlot->tts_tupleDescriptor, &state->scanTd);
    }

    if (!ExtractAggState(ndpScan, &state->aggState)) {
        DestroyPlanState(state);
        return nullptr;
    }
    if (!ExtractParamList(ndpScan, &state->paramList)) {
        DestroyPlanState(state);
        return nullptr;
    }
    if (!ExtractKnlSessionContext(&state->sess)) {
        DestroyPlanState(state);
        return nullptr;
    }
    return state;
}

/* Free a plan state built by CreatePlanState together with every array it owns. */
void NdpScanChannel::DestroyPlanState(NdpPlanState* state)
{
    if (state == nullptr) {
        return;
    }

    if (state->rel.att.attrs) {
        pfree(state->rel.att.attrs);
    }
    if (state->scanTd.attrs) {
        pfree(state->scanTd.attrs);
    }
    if (state->paramList.params != nullptr) {
        pfree(state->paramList.params);
    }

    NdpAggState* aggS = &state->aggState;
    if (aggS->aggTd.attrs) {
        pfree(aggS->aggTd.attrs);
    }
    if (aggS->perAggTd) {
        for (int i = 0; i < aggS->aggNum; ++i) {
            if (aggS->perAggTd[i].attrs) {
                pfree(aggS->perAggTd[i].attrs);
            }
        }
        pfree(aggS->perAggTd);
    }
    if (aggS->eqFuncOid) {
        pfree(aggS->eqFuncOid);
    }
    if (aggS->hashFuncOid) {
        pfree(aggS->hashFuncOid);
    }

    pfree(state);
}

/*
 * Build an NDP_PLANSTATE admin request: a request header followed by the
 * serialised plan state.  Returns nullptr when the plan state cannot be
 * created; otherwise the StringInfo buffer, owned by the caller.
 */
NdpAdminRequest* NdpScanChannel::ConstructPlanState(NdpScanDesc ndpScan)
{
    NdpPlanState* state = CreatePlanState(ndpScan);
    if (state == nullptr) {
        return nullptr;
    }

    StringInfoData str;
    initStringInfo(&str);

    // reserve room for the header; zero it so no uninitialised stack bytes go on the wire
    NdpAdminRequest head;
    errno_t rc = memset_s(&head, sizeof(head), 0, sizeof(head));
    securec_check(rc, "", "");
    appendBinaryStringInfo(&str, (const char*)&head, (int)sizeof(head));

    stateToString(state, &str);

    // patch the header in place now that the total length is known
    NdpAdminRequest* ptr = (NdpAdminRequest*)str.data;
    ptr->head.command = NDP_PLANSTATE;
    ptr->head.size = str.len;
    ptr->taskId = queryId;
    ptr->tableId = ndpScan->cond->tableId;

    DestroyPlanState(state);
    return ptr;
}

/*
 * Build an NDP_QUERY admin request: a request header followed by the
 * serialised query (table count and transaction information).  The temporary
 * query and its buffers are freed before returning; the result is the
 * StringInfo buffer, owned by the caller.
 */
NdpAdminRequest* NdpScanChannel::ConstructQuery(NdpScanDesc ndpScan)
{
    NdpQuery* query = (NdpQuery*)palloc(sizeof(NdpQuery));
    NdpContext* context = static_cast<NdpContext*>(ndpScan->cond->ctx);
    query->tableNum = context->tableCount;
    ExtractXact(ndpScan->scan, &query->xact);

    StringInfoData str;
    initStringInfo(&str);

    // taskId/tableId are not assigned for a query request, so the header must
    // be zeroed rather than copied from an uninitialised stack object
    NdpAdminRequest head;
    errno_t rc = memset_s(&head, sizeof(head), 0, sizeof(head));
    securec_check(rc, "", "");
    appendBinaryStringInfo(&str, (const char*)&head, (int)sizeof(head));

    queryToString(query, &str);

    NdpAdminRequest* ptr = (NdpAdminRequest*)str.data;
    ptr->head.command = NDP_QUERY;
    ptr->head.size = str.len;

    if (query->xact.comboCids) {
        pfree(query->xact.comboCids);
    }
    if (query->xact.CLogPageBuffer) {
        pfree(query->xact.CLogPageBuffer);
    }
    if (query->xact.CSNLogPageBuffer) {
        pfree(query->xact.CSNLogPageBuffer);
    }
    pfree(query);
    return ptr;
}

/*
 * Build an NDP_VERSION admin request whose payload is a uint64 with
 * GRAND_VERSION_NUM in the high 32 bits and NDP_LOCAL_VERSION_NUM in the low
 * bits.  The result is palloc'd and owned by the caller.
 */
NdpAdminRequest* NdpScanChannel::ConstructVersion()
{
    int len = sizeof(uint64);
    uint64 version = (((uint64)GRAND_VERSION_NUM) << 32) | NDP_LOCAL_VERSION_NUM;

    NdpAdminRequest* req = reinterpret_cast<NdpAdminRequest*>(palloc(sizeof(NdpAdminRequest) + len));
    req->head.command = NDP_VERSION;
    req->head.size = sizeof(NdpAdminRequest) + len;
    req->taskId = -1;
    req->tableId = -1;
    errno_t rc = memcpy_s(reinterpret_cast<char*>(req + 1), len, &version, len);
    securec_check(rc, "", "");
    return req;
}