Files

1131 lines
36 KiB
C++

/* -------------------------------------------------------------------------
* ndpam.cpp
* Routines to handle ndp page
*
* Portions Copyright (c) 2022 Huawei Technologies Co.,Ltd.
*
* IDENTIFICATION
* contrib/ndpplugin/ndpam.cpp
*
* -------------------------------------------------------------------------
*/
#include "access/csnlog.h"
#include "access/slru.h"
#include "executor/node/nodeAgg.h"
#include "component/rpc/rpc.h"
#include "storage/smgr/segment_internal.h"
#include "ddes/dms/ss_transaction.h"
#include "algorithm"
#include "storage/dss/fio_dss.h"
#include "storage/smgr/segment.h"
#include "ndpnodes.h"
#include "ndpam.h"
#define NDP_PAGE_QUEUE_SIZE (1u << 10)
#define NDP_NORMAL_QUEUE_SIZE (1u << 12)
#define ClogCtl(n) (&t_thrd.shemem_ptr_cxt.ClogCtl[CBufHashPartition(n)])
#define CsnlogCtl(n) (&t_thrd.shemem_ptr_cxt.CsnlogCtlPtr[CSNBufHashPartition(n)])
#define CSNLOG_XACTS_PER_PAGE (BLCKSZ / sizeof(CommitSeqNo))
#define CSN_LWLOCK_ACQUIRE(pageno, lockmode) ((void)LWLockAcquire(CSNBufMappingPartitionLock(pageno), lockmode))
#define CSN_LWLOCK_RELEASE(pageno) (LWLockRelease(CSNBufMappingPartitionLock(pageno)))
#define TransactionIdToCSNPage(xid) ((xid) / (TransactionId)CSNLOG_XACTS_PER_PAGE)
constexpr int RPC_FAILED_LIMIT = 3;
constexpr int SEND_FAILED_LIMIT = 10;
inline static void SegLogicPageIdToExtentOffset(BlockNumber logicId, uint32* offset)
{
if (logicId < EXT_SIZE_8_TOTAL_PAGES) {
*offset = logicId % EXT_SIZE_8;
} else if (logicId < EXT_SIZE_128_TOTAL_PAGES) {
logicId -= EXT_SIZE_8_TOTAL_PAGES;
*offset = logicId % EXT_SIZE_128;
} else if (logicId < EXT_SIZE_1024_TOTAL_PAGES) {
logicId -= EXT_SIZE_128_TOTAL_PAGES;
*offset = logicId % EXT_SIZE_1024;
} else {
logicId -= EXT_SIZE_1024_TOTAL_PAGES;
*offset = logicId % EXT_SIZE_8192;
}
}
static void md_get_physical_info(Relation rel, ForkNumber forknum, BlockNumber blocknum,
int *handle, off_t *offset)
{
SMgrRelation reln = rel->rd_smgr;
MdfdVec *v = NULL;
v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL);
Assert(v != NULL);
*handle = FileFd(v->mdfd_vfd);
*offset = DF_OFFSET_TO_SLICE_OFFSET(((off_t)blocknum) * BLCKSZ);
}
void md_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, char *ip,
BlockNumber& end, uint32& phyStartBlockNum)
{
// step1:get object and ip.
int handle;
off_t offset;
HeapScanDesc scan = (HeapScanDesc)ndpScan->scan;
md_get_physical_info(scan->rs_base.rs_rd, 0, page, &handle, &offset);
#ifdef GlobalCache
dss_get_addr(handle, offset, object->pool, object->image, ip, &object->objId, &object->objOffset);
#endif
// step2:slice pages, get end and phyStartBlockNum.
BlockNumber start = ndpScan->handledBlock;
#ifdef GlobalCache
Assert(object->objOffset >= 0);
end = Min((uint64_t)ndpScan->nBlock, (uint64_t)(start + PAGE_NUM_PER_AU - object->objOffset/BLCKSZ));
#else
end = Min((uint64_t)ndpScan->nBlock, (uint64_t)(start + PAGE_NUM_PER_AU));
#endif
phyStartBlockNum = NDPMERGEBIT(start, 0);
}
static void seg_get_physical_info(Relation rel, ForkNumber forknum, BlockNumber blocknum,
SegPageLocation &loc, int *handle, off_t *offset)
{
SMgrRelation reln = rel->rd_smgr;
loc = seg_get_physical_location(rel->rd_node, MAIN_FORKNUM, blocknum);
SegmentCheck(loc.blocknum != InvalidBlockNumber);
LockSegmentHeadPartition(reln->seg_space->spcNode, reln->seg_space->dbNode,
reln->seg_desc[forknum]->head_blocknum, LW_SHARED);
SegSpace *spc = reln->seg_space;
RelFileNode relNode = {.spcNode = spc->spcNode,
.dbNode = spc->dbNode,
.relNode = EXTENT_SIZE_TO_TYPE(loc.extent_size),
.bucketNode = SegmentBktId,
.opt = 0};
int egid = EXTENT_TYPE_TO_GROUPID(relNode.relNode);
SegExtentGroup *seg = &spc->extent_group[egid][forknum];
off_t beginoff = ((off_t)loc.blocknum) * BLCKSZ;
int sliceno = DF_OFFSET_TO_SLICENO(beginoff);
SegPhysicalFile spf = df_get_physical_file(seg->segfile, sliceno, loc.blocknum);
*handle = spf.fd;
*offset = DF_OFFSET_TO_SLICE_OFFSET(beginoff);
UnlockSegmentHeadPartition(reln->seg_space->spcNode, reln->seg_space->dbNode,
reln->seg_desc[forknum]->head_blocknum);
}
void seg_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, char *ip,
BlockNumber& end, uint32& phyStartBlockNum)
{
// step1:get object and ip.
int handle;
off_t offset;
SegPageLocation loc;
HeapScanDesc scan = (HeapScanDesc)ndpScan->scan;
seg_get_physical_info(scan->rs_base.rs_rd, 0, page, loc, &handle, &offset);
#ifdef GlobalCache
dss_get_addr(handle, offset, object->pool, object->image, ip, &object->objId, &object->objOffset);
#endif
// step2:slice pages, get end and phyStartBlockNum.
uint32 extentOff;
BlockNumber start = ndpScan->handledBlock;
SegLogicPageIdToExtentOffset(ndpScan->handledBlock, &extentOff);
uint64_t blks = Min(loc.extent_size - extentOff,
Min(uint32(PAGE_NUM_PER_AU), uint32(ndpScan->nBlock - ndpScan->handledBlock)));
#ifdef GlobalCache
Assert(object->objOffset >= 0);
end = start + Min(blks, (uint64_t)(PAGE_NUM_PER_AU - object->objOffset/BLCKSZ));
#else
end = start + blks;
#endif
phyStartBlockNum = NDPMERGEBIT(loc.blocknum, EXTENT_SIZE_TO_GROUPID(loc.extent_size) + 1);
}
void pm_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, char *ip,
BlockNumber& end, uint32& phyStartBlockNum)
{
HeapScanDesc scan = (HeapScanDesc)ndpScan->scan;
int which = scan->rs_base.rs_rd->rd_smgr->smgr_which;
FileType filetype = which == 0 ? MDFILE : (which == 2 ? SEGFILE : INVALIDFILE);
Assert(filetype != INVALIDFILE);
PAGEMETHOD[filetype].get_pageinfo(ndpScan, page, object, ip, end, phyStartBlockNum);
uint64 curAUAligned = ndpScan->handledBlock / PARALLEL_SCAN_GAP_AU_ALIGNED;
if (scan->dop > 1 && end > (curAUAligned + 1) * PARALLEL_SCAN_GAP_AU_ALIGNED) {
end = (curAUAligned + 1) * PARALLEL_SCAN_GAP_AU_ALIGNED;
}
}
void CopyCLog(int64 pageno, char *pageBuffer)
{
int slotno;
errno_t rc = EOK;
/* lock is acquired by SimpleLruReadPage_ReadOnly */
slotno = SimpleLruReadPage_ReadOnly(ClogCtl(pageno), pageno, FirstNormalTransactionId);
rc = memcpy_s(pageBuffer, BLCKSZ, ClogCtl(pageno)->shared->page_buffer[slotno], BLCKSZ);
securec_check(rc, "", "");
LWLockRelease(ClogCtl(pageno)->shared->control_lock);
}
void CopyCSNLog(int64 pageno, char *pageBuffer)
{
int slotno;
errno_t rc = EOK;
CSN_LWLOCK_ACQUIRE(pageno, LW_SHARED);
slotno = SimpleLruReadPage_ReadOnly_Locked(CsnlogCtl(pageno), pageno, FirstNormalTransactionId);
rc = memcpy_s(pageBuffer, BLCKSZ, CsnlogCtl(pageno)->shared->page_buffer[slotno], BLCKSZ);
securec_check(rc, "", "");
CSN_LWLOCK_RELEASE(pageno);
}
int NdpIoSlot::SetReq(RelFileNode& node, uint16 taskId, uint16 tableId, AuInfo& auinfo)
{
int bitCount = 0;
reqMsg.data = &req;
reqMsg.len = sizeof(NdpIORequest);
respMsg.data = nullptr;
respMsg.len = 0;
req.taskId = taskId;
req.tableId = tableId;
req.auInfos[0].phyStartBlockNum = auinfo.phyStartBlockNum;
req.auInfos[0].pageNum = auinfo.pageNum;
#ifdef GlobalCache
errno_t rc2 = memcpy_s(&req.auInfos[0].object, sizeof(CephObject), &auinfo.object, sizeof(CephObject));
securec_check(rc2, "", "");
#endif
errno_t rc = memset_s(req.pageMap, BITMAP_SIZE_PER_AU_BYTE, 0, BITMAP_SIZE_PER_AU_BYTE);
securec_check(rc, "", "");
for (uint32 i = startBlockNum, offset = 0; i != startBlockNum + auinfo.pageNum; ++i, ++offset) {
bool cached = IsPageHitBufferPool(node, MAIN_FORKNUM, i);
if (!cached) {
NDPSETBIT(req.pageMap, offset);
++bitCount;
}
}
if (SS_STANDBY_MODE) {
SSIsPageHitDms(node, startBlockNum, auinfo.pageNum, req.pageMap, &bitCount);
}
return bitCount;
}
NdpRetCode NdpIoSlot::SetResp(int pageNum)
{
#ifdef ENABLE_SSL
respMsg.len = 0;
respMsg.data = nullptr;
return NdpRetCode::NDP_OK;
#else
respMsg.len = DSS_DEFAULT_AU_SIZE;
if (g_ndp_instance.pageContext->Dequeue(respMsg.data)) {
resp = reinterpret_cast<NdpIOResponse*>(respMsg.data);
return NdpRetCode::NDP_OK;
}
return NdpRetCode::ALLOC_RESPONSE_MEMORY_FAILED;
#endif
}
NdpRetCode NdpIoSlot::GetResp(NdpPageHeader& pages, int& pageNum, BlockNumber& start, uint64*& map)
{
start = startBlockNum;
map = nullptr;
pages = nullptr;
pageNum = 0;
if (respRet != NdpRetCode::NDP_OK) {
return respRet;
}
auto rpcResp = reinterpret_cast<NdpIOResponse*>(respMsg.data);
if (rpcResp == nullptr) {
return NdpRetCode::NDP_RETURN_FAILED;
}
#ifdef ENABLE_SSL
resp = reinterpret_cast<NdpIOResponse*>(respMsg.data);
#endif
if (rpcResp->status != 0) {
return NdpRetCode::NDP_RETURN_STATUS_ERROR;
}
map = rpcResp->pageMap;
pageNum = rpcResp->ndpPageNums;
if (pageNum) {
pages = (NdpPageHeader)((char*)rpcResp + sizeof(NdpIOResponse));
}
return NdpRetCode::NDP_OK;
}
void NdpIoSlot::FreeResp()
{
#ifdef ENABLE_SSL
if (resp != nullptr) {
free(resp);
resp = nullptr;
return;
}
if (respMsg.data) {
free(respMsg.data);
respMsg.data = nullptr;
return;
}
#else
bool enqueued;
Assert(g_ndp_instance.pageContext);
if (resp) {
enqueued = g_ndp_instance.pageContext->Enqueue(resp);
if (!enqueued) {
ereport(WARNING, (errmsg("try enqueue slot memory to queue failed")));
}
resp = nullptr;
return;
}
if (respMsg.data) {
enqueued = g_ndp_instance.pageContext->Enqueue(respMsg.data);
if (!enqueued) {
ereport(WARNING, (errmsg("try enqueue slot memory to queue failed")));
}
respMsg.data = nullptr;
return;
}
#endif
}
NdpRetCode NdpScanDescData::Init(ScanState* sstate, TableScanDesc sscan)
{
curIO = nullptr; // necessary, because rescan may be before get next
// free everything in destructor if alloc failed
#ifdef NDP_ASYNC_RPC
pg_atomic_init_u32(&reqCount, 0);
pg_atomic_init_u32(&respCount, 0);
respIO = new MpmcBoundedQueue<NdpIoSlot*>(NDP_PAGE_QUEUE_SIZE);
if (respIO == nullptr) {
ereport(WARNING, (errmsg("Alloc NpdPage Queue failed, size %d", NDP_PAGE_QUEUE_SIZE)));
return NdpRetCode::ALLOC_MQ_FAILED;
}
normalPagesId = new MpmcBoundedQueue<int>(NDP_NORMAL_QUEUE_SIZE);
if (normalPagesId == nullptr) {
ereport(WARNING, (errmsg("Alloc Normal Queue failed, size %d", NDP_NORMAL_QUEUE_SIZE)));
return NdpRetCode::ALLOC_MQ_FAILED;
}
#endif
memCtx = AllocSetContextCreate(CurrentMemoryContext, "ThreadNdpScanContext", 0, (4u << 10), (4u << 10));
if (!memCtx) {
ereport(WARNING, (errmsg("Create ThreadNdpScanContext failed!")));
return NdpRetCode::ALLOC_MC_FAILED;
}
#ifdef FAULT_INJECT
if ((rand() % PERCENTAGE_DIV) < PERCENTAGE) {
ereport(WARNING, (errmsg("Fault inject -- Create ThreadNdpScanContext failed")));
return NdpRetCode::ALLOC_MC_FAILED;
}
#endif
cond = (NdpScanCondition*)sstate->ps.plan->ndp_pushdown_condition;
scan = sscan;
scanState = sstate;
aggState = NULL;
aggSlot = NULL;
return NdpRetCode::NDP_OK;
}
NdpScanDescData::~NdpScanDescData()
{
// wait all callback return
while (pg_atomic_read_u32(&reqCount) != pg_atomic_read_u32(&respCount)) {
pg_usleep(NDP_RPC_WAIT_USEC);
}
#ifdef NDP_ASYNC_RPC
if (respIO) {
NdpIoSlot* tmpIO = nullptr;
while (respIO->Dequeue(tmpIO)) {
delete tmpIO;
}
delete respIO;
}
if (normalPagesId) {
delete normalPagesId;
}
#endif
if (memCtx) {
MemoryContextDelete(memCtx);
}
}
void NdpScanDescData::Reset()
{
handledBlock = 0;
curLinesNum = 0;
nextLineIndex = 0;
#ifdef NDP_ASYNC_RPC
// add timestamp if we don't want to wait, and remember release all while deleting.
while (pg_atomic_read_u32(&respCount) < pg_atomic_read_u32(&reqCount)) {
pg_usleep(NDP_RPC_WAIT_USEC);
}
#endif
FreeCurSlot();
#ifdef NDP_ASYNC_RPC
NdpIoSlot* tmpIO = nullptr;
int tmpId;
while (respIO->Dequeue(tmpIO)) {
delete tmpIO;
}
while (normalPagesId->Dequeue(tmpId));
#else
normalPagesNum = 0;
#endif
MemoryContextReset(memCtx);
}
void NdpScanDescData::AddToNormal(uint32 start, uint32 end)
{
end = end > nBlock ? nBlock : end;
start = start > nBlock ? nBlock : start;
for (uint32 i = start; i < end; ++i) {
AddToNormal(i);
}
}
// return true if set current io slot;
bool NdpScanDescData::HandleSlot(NdpIoSlot* slot)
{
uint32 start;
uint64* pageMap;
NdpPageHeader pages;
int pageNum;
NdpRetCode ret = slot->GetResp(pages, pageNum, start, pageMap);
if (ret != NdpRetCode::NDP_OK) {
++failedIoN;
}
if (start > nBlock) {
ereport(ERROR, (errmsg("can not happen start %u is cross the border.", start)));
return false;
}
uint32 end = start + slot->GetPushDownPageNum();
if (pageMap == nullptr) { // slot is invalid, add to normal list
AddToNormal(start, end);
return false;
}
// put unhandled page to normal queue
int count = 0;
end = end > nBlock ? nBlock : end;
start = start > nBlock ? nBlock : start;
for (uint32 offset = start, i = 0; offset < end; ++i, ++offset) {
int flag = NDPGETBIT(pageMap, i);
if (!flag) {
AddToNormal(offset);
count++;
}
}
sendBackPageN += count;
if (scanState->ps.instrument) {
scanState->ps.instrument->ndp_sendback_page += count;
}
if (pages) {
curIO = slot;
curNdpPages = pages;
curNdpPagesNum = pageNum;
nextNdpPageIndex = 0;
if (pages->pd_flags == NDP_FILTERED_PAGE) {
ndpPageScanN += pageNum;
} else {
ndpPageAggN += pageNum;
}
if (scanState->ps.instrument) {
scanState->ps.instrument->ndp_handled += pageNum;
}
return true;
} else {
return false;
}
}
#ifdef NDP_ASYNC_RPC
bool NdpScanDescData::GetNextSlot(void)
{
NdpIoSlot* slot = nullptr;
if (respIO->Dequeue(slot)) {
if (HandleSlot(slot)) {
return true;
} else {
delete slot;
return false;
}
}
return false;
}
#endif
#ifdef NDP_ASYNC_RPC
void NdpIoSlotCallDone(RpcStatus status, void *arg)
{
NdpIoSlot* cbArg = reinterpret_cast<NdpIoSlot*>(arg);
if (!cbArg) {
return;
}
NdpScanDesc ndpScan = reinterpret_cast<NdpScanDesc>(cbArg->GetPriv());
if (ndpScan == nullptr) {
return;
}
#ifdef FAULT_INJECT
if ((rand() % PERCENTAGE_DIV) < PERCENTAGE) {
status = RPC_ERROR;
}
#endif
if (status != RPC_OK) {
cbArg->SetRespRet(NdpRetCode::RPC_IO_CALLBACK_ERROR);
cbArg->SetRPCRet(status);
}
while (!ndpScan->respIO->Enqueue(cbArg)) {
pg_usleep(NDP_RPC_WAIT_USEC);
}
pg_atomic_add_fetch_u32(&ndpScan->respCount, 1);
}
#endif
bool NdpScanChannel::Init(uint32 id, char* ip, uint32 tableN)
{
rpcId = id;
errno_t rc = strcpy_s(rpcIp, NDP_RPC_IP_LEN, ip);
securec_check(rc, "", "");
status = NdpScanChannelStatus::UNCONNECTED;
pthread_mutex_init(&mutex, nullptr);
rpcClient = 0;
queryId = 0;
tableNum = tableN;
tableMgr = New(CurrentMemoryContext) NdpTableMgr[tableNum]();
if (tableMgr == nullptr) {
return false;
}
connFailed = 0;
cmdFailed = 0;
return true;
}
NdpRetCode NdpScanChannel::SendRequest(NdpIoSlot* req, NdpScanDesc ndpScan)
{
if (status == NdpScanChannelStatus::CLOSED) {
return NdpRetCode::CONNECT_UNUSABLE;
}
if (req->GetReqTableId() >= tableNum) {
ereport(WARNING, (errmsg("table id %u should be littler then %u.", req->GetReqTableId(), tableNum)));
return NdpRetCode::TABLE_ID_INVALID;
}
if (status == NdpScanChannelStatus::QUERYSENT) {
return SendReq(req, ndpScan);
}
if (pthread_mutex_trylock(&mutex) != 0) {
return NdpRetCode::CONNECT_UNUSABLE;
}
NdpRetCode ret = NdpRetCode::NDP_OK;
switch (status) {
case NdpScanChannelStatus::UNCONNECTED: {
// free old connection
DisconnectRpc();
// call rpc connect
RpcStatus connectRet = RpcClientConnect(rpcIp, u_sess->ndp_cxt.ndp_port, rpcClient);
if (SECUREC_UNLIKELY(connectRet != RPC_OK)) {
++connFailed;
DisconnectRpc();
if (connFailed >= RPC_FAILED_LIMIT) {
status = NdpScanChannelStatus::CLOSED;
}
ereport(LOG, (errmsg("rpc connect (count:%d) failed, ip:port %s:%d. rpc status: %d",
connFailed, rpcIp, u_sess->ndp_cxt.ndp_port, connectRet)));
ret = NdpRetCode::CONNECT_FAILED;
break;
}
status = NdpScanChannelStatus::CONNECTED;
}
case NdpScanChannelStatus::CONNECTED: {
PG_TRY();
{
ret = SendQuery(ndpScan);
}
PG_CATCH();
{
ereport(WARNING, (errmsg("send query failed, it is possible a palloc failed.")));
pthread_mutex_unlock(&mutex);
PG_RE_THROW();
}
PG_END_TRY();
if (SECUREC_UNLIKELY(ret != NdpRetCode::NDP_OK)) {
++cmdFailed;
if (cmdFailed >= RPC_FAILED_LIMIT) {
status = NdpScanChannelStatus::CLOSED;
}
break;
}
status = NdpScanChannelStatus::QUERYSENT;
}
case NdpScanChannelStatus::QUERYSENT: {
pthread_mutex_unlock(&mutex);
return SendReq(req, ndpScan);
}
case NdpScanChannelStatus::CLOSED:
default: {
ret = NdpRetCode::CONNECT_UNUSABLE;
break;
}
}
pthread_mutex_unlock(&mutex);
return ret;
}
NdpRetCode NdpScanChannel::SendEnd()
{
NdpRetCode retCode = NdpRetCode::NDP_OK;
if (status != NdpScanChannelStatus::QUERYSENT) {
/* If all pages are cached in BufferPool, the channel might be UNCONNECTED */
ereport(LOG, (errmsg("channel %s is not QUERY_SENT.", rpcIp)));
return NdpRetCode::CONNECT_UNUSABLE;
}
pthread_mutex_lock(&mutex);
if (status == NdpScanChannelStatus::QUERYSENT) {
NdpAdminRequest req;
NdpAdminResponse resp;
req.head.command = NDP_TERMINATE;
req.head.size = sizeof(NdpAdminRequest);
req.taskId = queryId;
req.tableId = 0;
NdpRetCode ret = SendAdminReq(&req, &resp, sizeof(resp.ret));
if (ret != NdpRetCode::NDP_OK) {
retCode = NdpRetCode::RPC_ADMIN_SEND_TERMINATE_FAILED;
} else {
status = NdpScanChannelStatus::CLOSED;
}
}
pthread_mutex_unlock(&mutex);
return retCode;
}
NdpRetCode NdpScanChannel::SendReq(NdpIoSlot* req, NdpScanDesc ndpScan)
{
NdpTableMgr* mgr = &tableMgr[req->GetReqTableId()];
if (mgr->status == NdpTableStatus::CONSTRUCTFAIL) {
return NdpRetCode::NDP_CONSTRUCT_FAILED;
}
if (mgr->status == NdpTableStatus::STATESENT) {
return SendIo(req, ndpScan);
}
if (mgr->cmdNdpFailed >= RPC_FAILED_LIMIT) {
return NdpRetCode::RPC_ADMIN_SEND_FAIL;
}
if (pthread_mutex_trylock(&mgr->mutex) != 0) {
return NdpRetCode::TABLE_MGR_UNUSABLE;
}
NdpRetCode ret;
PG_TRY();
{
ret = SendAdmin(mgr, req, ndpScan);
}
PG_CATCH();
{
ereport(WARNING, (errmsg("send failed, it is possible a palloc failed.")));
ret = NdpRetCode::NDP_ERROR;
}
PG_END_TRY();
pthread_mutex_unlock(&mgr->mutex);
if (mgr->status == NdpTableStatus::STATESENT) {
return SendIo(req, ndpScan);
}
return ret;
}
NdpRetCode NdpScanChannel::SendAdmin(NdpTableMgr* mgr, NdpIoSlot* req, NdpScanDesc ndpScan)
{
NdpRetCode ret = NdpRetCode::NDP_OK;
switch (mgr->status) {
case NdpTableStatus::INITIAL: {
ret = SendPlan(ndpScan);
if (ret != NdpRetCode::NDP_OK) {
mgr->status = (ret == NdpRetCode::NDP_CONSTRUCT_FAILED) ?
NdpTableStatus::CONSTRUCTFAIL : mgr->status;
break;
}
mgr->status = NdpTableStatus::PLANSENT;
}
case NdpTableStatus::PLANSENT: {
ret = SendState(ndpScan);
if (ret != NdpRetCode::NDP_OK) {
mgr->status = (ret == NdpRetCode::NDP_CONSTRUCT_FAILED) ?
NdpTableStatus::CONSTRUCTFAIL : mgr->status;
break;
}
mgr->status = NdpTableStatus::STATESENT;
}
case NdpTableStatus::STATESENT: {
ret = NdpRetCode::NDP_OK;
break;
}
default: {
ret = NdpRetCode::NDP_ERROR;
break;
}
}
if (ret != NdpRetCode::NDP_OK) {
++mgr->cmdNdpFailed;
}
return ret;
}
NdpRetCode NdpScanChannel::SendQuery(NdpScanDesc ndpScan)
{
NdpAdminRequest *v = ConstructVersion();
NdpAdminResponse resp;
NdpRetCode versionRet = SendAdminReq(v, &resp, sizeof(NdpAdminResponse));
pfree(v);
if (versionRet != NdpRetCode::NDP_OK) {
DisconnectRpc();
status = NdpScanChannelStatus::CLOSED;
ereport(LOG, (errmsg("send version admin (count:%d) request failed, ip:port %s:%d.",
cmdFailed, rpcIp, u_sess->ndp_cxt.ndp_port)));
return NdpRetCode::RPC_ADMIN_SEND_VERSION_FAILED;
}
NdpAdminRequest *query;
resp = {};
query = ConstructQuery(ndpScan);
NdpRetCode queryRet = SendAdminReq(query, &resp, sizeof(NdpAdminResponse));
pfree(query);
if (queryRet != NdpRetCode::NDP_OK) {
ereport(LOG, (errmsg("send admin (count:%d) request failed, ip:port %s:%d.",
cmdFailed, rpcIp, u_sess->ndp_cxt.ndp_port)));
return NdpRetCode::RPC_ADMIN_SEND_CTX_FAILED;
}
queryId = (uint16)resp.queryId;
return NdpRetCode::NDP_OK;
}
NdpRetCode NdpScanChannel::SendPlan(NdpScanDesc ndpScan)
{
NdpAdminRequest* planReq = ConstructPlanReq(ndpScan);
if (planReq == nullptr) {
return NdpRetCode::NDP_CONSTRUCT_FAILED;
}
NdpAdminResponse resp;
NdpRetCode ret = SendAdminReq(planReq, &resp, sizeof(resp.ret));
pfree(planReq);
if (ret != NdpRetCode::NDP_OK) {
ret = NdpRetCode::RPC_ADMIN_SEND_PLAN_FAILED;
}
return ret;
}
NdpRetCode NdpScanChannel::SendAdminReq(NdpAdminRequest* req, NdpAdminResponse* resp, size_t size)
{
RpcStatus status = RpcSendAdminReq(req, resp, size, rpcClient);
if (status != RPC_OK) {
ereport(LOG, (errmsg("RpcSendAdminReq failed. CMD code:%d, Rpc code: %d", req->head.command, status)));
return NdpRetCode::RPC_ADMIN_SEND_FAIL;
} else if (resp->ret != NDP_OK) {
ereport(LOG, (errmsg("AdminReq handle failed.")));
return NdpRetCode::NDP_ERROR;
}
return NdpRetCode::NDP_OK;
}
NdpRetCode NdpScanChannel::SendState(NdpScanDesc ndpScan)
{
NdpAdminRequest* state = ConstructPlanState(ndpScan);
if (state == nullptr) {
return NdpRetCode::NDP_CONSTRUCT_FAILED;
}
NdpAdminResponse resp;
NdpRetCode ret = SendAdminReq(state, &resp, sizeof(resp.ret));
pfree(state);
if (ret != NdpRetCode::NDP_OK) {
ret = NdpRetCode::RPC_ADMIN_SEND_STATE_FAILED;
}
return ret;
}
NdpRetCode NdpScanChannel::SendIo(NdpIoSlot* req, NdpScanDesc ndpScan)
{
req->SetReq(queryId);
NdpTableMgr* mgr = &tableMgr[req->GetReqTableId()];
if (mgr->ioFailed >= SEND_FAILED_LIMIT) {
return NdpRetCode::RPC_IO_SEND_FAILED;
}
#ifdef NDP_ASYNC_RPC
RpcCallDone callDone = {.cb = &NdpIoSlotCallDone, .arg = (void*)req };
pg_atomic_add_fetch_u32(&ndpScan->reqCount, 1); // before send
RpcStatus ret = RpcSendIOReq(req->GetReqMsg(), req->GetRespMsg(), &callDone, rpcClient);
if (ret != RPC_OK) {
pg_atomic_sub_fetch_u32(&ndpScan->reqCount, 1); // before send
mgr->ioFailed++;
return NdpRetCode::RPC_IO_SEND_FAILED;
}
#else
RpcStatus ret = RpcSendIOReq(req->GetReqMsg(), req->GetRespMsg(), NULL, rpcClient);
if (ret != RPC_OK) {
mgr->ioFailed++;
return NdpRetCode::RPC_IO_SEND_FAILED;
}
#endif
return NdpRetCode::NDP_OK;
}
NdpAdminRequest* NdpScanChannel::ConstructPlanReq(NdpScanDesc ndpScan)
{
if (IsA(ndpScan->cond->plan, Agg)) {
Agg* agg = (Agg*)ndpScan->cond->plan;
if (agg->grp_collations == NULL && agg->numCols > 0) {
agg->grp_collations = (unsigned int*)palloc(sizeof(unsigned int) * agg->numCols);
for (int i = 0; i < agg->numCols; i++) {
agg->grp_collations[i] = InvalidOid;
}
}
}
char* str = nodeToString(ndpScan->cond->plan);
int len = strlen(str) + 1;
if (len == 1) {
return nullptr;
}
NdpAdminRequest* req = reinterpret_cast<NdpAdminRequest*>(palloc(sizeof(NdpAdminRequest) + len));
req->head.command = NDP_PLAN;
req->head.size = sizeof(NdpAdminRequest) + len;
req->taskId = queryId;
req->tableId = ndpScan->cond->tableId;
errno_t rc = memcpy_s(reinterpret_cast<void*>(req + 1), len, str, len);
securec_check(rc, "", "");
return req;
}
bool NdpScanChannel::ExtractTupleDesc(TupleDesc desc, NdpTupleDesc* td)
{
td->natts = desc->natts;
if (td->natts == 0) {
td->attrs = NULL;
return true;
}
td->attrs = (NdpPGAttr *)palloc(sizeof(NdpPGAttr) * td->natts);
for (int i = 0; i < desc->natts; ++i) {
td->attrs[i].attlen = desc->attrs[i].attlen;
td->attrs[i].attbyval = desc->attrs[i].attbyval;
td->attrs[i].attcacheoff = desc->attrs[i].attcacheoff;
td->attrs[i].attalign = desc->attrs[i].attalign;
td->attrs[i].attndims = desc->attrs[i].attndims;
td->attrs[i].attstorage = desc->attrs[i].attstorage;
}
td->tdhasoid = desc->tdhasoid;
td->tdhasuids = desc->tdhasuids;
td->tdtypeid = desc->tdtypeid;
td->tdtypmod = desc->tdtypmod;
return true;
}
bool NdpScanChannel::ExtractRelation(TableScanDesc scan, NdpRelation* rel)
{
rel->node.spcNode = scan->rs_rd->rd_node.spcNode;
rel->node.dbNode = scan->rs_rd->rd_node.dbNode;
rel->node.relNode = scan->rs_rd->rd_node.relNode;
rel->node.bucketNode = scan->rs_rd->rd_node.bucketNode;
rel->node.opt = scan->rs_rd->rd_node.opt;
return ExtractTupleDesc(scan->rs_rd->rd_att, &rel->att);
}
bool NdpScanChannel::ExtractXact(TableScanDesc scan, NdpXact* xact)
{
xact->comboCids = NULL;
xact->CLogPageBuffer = NULL;
xact->CSNLogPageBuffer = NULL;
/* snapshot */
NdpSnapshot* snapshot = &xact->snapshot;
snapshot->satisfies = scan->rs_snapshot->satisfies;
snapshot->xmin = scan->rs_snapshot->xmin; /* all XID < xmin are visible to me */
snapshot->xmax = scan->rs_snapshot->xmax; /* all XID >= xmax are invisible to me */
snapshot->snapshotcsn = scan->rs_snapshot->snapshotcsn;
snapshot->curcid = scan->rs_snapshot->curcid;
/* TransactionState */
xact->transactionId = GetCurrentTransactionIdIfAny();
/* comboCids */
xact->usedComboCids = u_sess->utils_cxt.usedComboCids;
if (xact->usedComboCids > 0) {
xact->comboCids = (uint32 *)palloc(xact->usedComboCids * 2 * sizeof(uint32));
uint32 *comboCids = (uint32 *)u_sess->utils_cxt.comboCids;
for (int i = 0; i < xact->usedComboCids; i++) {
xact->comboCids[i * 2] = comboCids[i * 2];
xact->comboCids[i * 2 + 1] = comboCids[i * 2 + 1];
}
} else {
xact->comboCids = NULL;
}
/* clog & csnlog */
/* It's okay to read latestCompletedXid without acquiring ProcArrayLock shared lock
* because we dont' care if we get a slightly stale value
*/
xact->latestCompletedXid = t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid;
int64 pagenoStart, pagenoEnd, pageno;
pagenoStart = TransactionIdToPage(FirstNormalTransactionId);
pagenoEnd = TransactionIdToPage(xact->latestCompletedXid);
xact->CLogLen = (pagenoEnd - pagenoStart + 1) * BLCKSZ;
xact->CLogPageBuffer = (char *)palloc(xact->CLogLen);
for (pageno = pagenoStart; pageno <= pagenoEnd; pageno++) {
CopyCLog(pageno, xact->CLogPageBuffer + (pageno - pagenoStart) * BLCKSZ);
}
pagenoStart = TransactionIdToCSNPage(xact->latestCompletedXid);
pagenoEnd = TransactionIdToCSNPage(xact->latestCompletedXid);
xact->CSNLogLen = (pagenoEnd - pagenoStart + 1) * BLCKSZ;
xact->CSNLogPageBuffer = (char *)palloc(xact->CSNLogLen);
for (pageno = pagenoStart; pageno <= pagenoEnd; pageno++) {
CopyCSNLog(pageno, xact->CSNLogPageBuffer + (pageno - pagenoStart) * BLCKSZ);
}
return true;
}
bool NdpScanChannel::ExtractAggState(NdpScanDesc ndpScan, NdpAggState* aggS)
{
aggS->aggTd.natts = 0;
aggS->aggTd.attrs = nullptr;
aggS->aggNum = 0;
aggS->perAggTd = nullptr;
aggS->numCols = 0;
aggS->eqFuncOid = nullptr;
aggS->hashFuncOid = nullptr;
if (ndpScan->aggState) {
Assert(ndpScan->aggSlot != nullptr);
ExtractTupleDesc(ndpScan->aggSlot->tts_tupleDescriptor, &aggS->aggTd);
aggS->aggNum = ndpScan->aggState->numaggs;
aggS->perAggTd = (NdpTupleDesc*)palloc(aggS->aggNum * sizeof(NdpTupleDesc));
for (int aggNo = 0; aggNo < aggS->aggNum; ++aggNo) {
ExtractTupleDesc(ndpScan->aggState->evaldesc, &aggS->perAggTd[aggNo]);
}
Agg* agg = (Agg*)ndpScan->aggState->ss.ps.plan;
if ((agg->aggstrategy == AGG_HASHED) && (agg->numCols > 0)) {
aggS->numCols = agg->numCols;
aggS->eqFuncOid = (unsigned int*)palloc(aggS->numCols * sizeof(unsigned int));
aggS->hashFuncOid = (unsigned int*)palloc(aggS->numCols * sizeof(unsigned int));
for (int colNo = 0; colNo < aggS->numCols; ++colNo) {
aggS->eqFuncOid[colNo] = ndpScan->aggState->phases[0].eqfunctions[colNo].fn_oid;
aggS->hashFuncOid[colNo] = ndpScan->aggState->hashfunctions[colNo].fn_oid;
}
}
}
return true;
}
bool CheckExprContext(ExprContext* econtext, NdpParamList* pList)
{
ParamListInfo paramInfo = econtext->ecxt_param_list_info;
if (paramInfo->paramFetch != NULL) {
return false;
}
pList->numParams = paramInfo->numParams;
pList->params = (NdpParamData*)palloc(pList->numParams * sizeof(NdpParamData));
for (int i = 0; i < pList->numParams; i++) {
ParamExternData* from = &paramInfo->params[i];
NdpParamData* to = &pList->params[i];
if (from->tabInfo && from->tabInfo->isnestedtable && plpgsql_estate) {
pfree(pList->params);
pList->numParams = 0;
return false;
}
to->isnull = from->isnull;
to->ptype = from->ptype;
to->value = from->value;
get_typlenbyval(to->ptype, &to->typlen, &to->typbyval);
to->value = datumCopy(to->value, to->typbyval, to->typlen);
}
return true;
}
bool ExtractParamList(NdpScanDesc ndpScan, NdpParamList* pList)
{
ExprContext* econtext = ndpScan->scanState->ps.ps_ExprContext;
if (econtext == nullptr || econtext->ecxt_param_list_info == nullptr ||
econtext->ecxt_param_list_info->numParams == 0) {
pList->numParams = 0;
pList->params = nullptr;
return true;
}
return CheckExprContext(econtext, pList);
}
bool ExtractKnlSessionContext(NdpSessionContext* sess)
{
if (u_sess != nullptr) {
sess->sql_compatibility = u_sess->attr.attr_sql.sql_compatibility;
sess->behavior_compat_flags = u_sess->utils_cxt.behavior_compat_flags;
sess->encoding = u_sess->mb_cxt.DatabaseEncoding->encoding;
return true;
}
return false;
}
NdpPlanState* NdpScanChannel::CreatePlanState(NdpScanDesc ndpScan)
{
NdpPlanState* state = (NdpPlanState*)palloc(sizeof(NdpPlanState));
TableScanDesc scan = ndpScan->scan;
ExtractRelation(scan, &state->rel);
ProjectionInfo* proj_info = ndpScan->scanState->ps.ps_ProjInfo;
if (proj_info != NULL) {
ExtractTupleDesc(proj_info->pi_slot->tts_tupleDescriptor, &state->scanTd);
} else {
ExtractTupleDesc(ndpScan->scanState->ss_ScanTupleSlot->tts_tupleDescriptor, &state->scanTd);
}
if (!ExtractAggState(ndpScan, &state->aggState)) {
DestroyPlanState(state);
return nullptr;
}
if (!ExtractParamList(ndpScan, &state->paramList)) {
DestroyPlanState(state);
return nullptr;
}
if (!ExtractKnlSessionContext(&state->sess)) {
DestroyPlanState(state);
return nullptr;
}
return state;
}
void NdpScanChannel::DestroyPlanState(NdpPlanState* state)
{
if (state == nullptr) return;
if (state->rel.att.attrs) pfree(state->rel.att.attrs);
if (state->scanTd.attrs) pfree(state->scanTd.attrs);
if (state->paramList.params != nullptr) {
pfree(state->paramList.params);
}
NdpAggState* aggS = &state->aggState;
if (aggS->aggTd.attrs) pfree(aggS->aggTd.attrs);
if (aggS->perAggTd) {
for (int i = 0; i < aggS->aggNum; ++i) {
if (aggS->perAggTd[i].attrs) pfree(aggS->perAggTd[i].attrs);
}
pfree(aggS->perAggTd);
}
if (aggS->eqFuncOid) pfree(aggS->eqFuncOid);
if (aggS->hashFuncOid) pfree(aggS->hashFuncOid);
pfree(state);
}
NdpAdminRequest* NdpScanChannel::ConstructPlanState(NdpScanDesc ndpScan)
{
NdpPlanState* state = CreatePlanState(ndpScan);
if (state == nullptr) {
return nullptr;
}
StringInfoData str;
initStringInfo(&str);
NdpAdminRequest head;
appendBinaryStringInfo(&str, (const char*)&head, (int)sizeof(head));
stateToString(state, &str);
NdpAdminRequest* ptr = (NdpAdminRequest*)str.data;
ptr->head.command = NDP_PLANSTATE;
ptr->head.size = str.len;
ptr->taskId = queryId;
ptr->tableId = ndpScan->cond->tableId;
DestroyPlanState(state);
return ptr;
}
NdpAdminRequest* NdpScanChannel::ConstructQuery(NdpScanDesc ndpScan)
{
NdpQuery* query = (NdpQuery*)palloc(sizeof(NdpQuery));
NdpContext* context = static_cast<NdpContext*>(ndpScan->cond->ctx);
query->tableNum = context->tableCount;
ExtractXact(ndpScan->scan, &query->xact);
StringInfoData str;
initStringInfo(&str);
NdpAdminRequest head;
appendBinaryStringInfo(&str, (const char*)&head, (int)sizeof(head));
queryToString(query, &str);
NdpAdminRequest* ptr = (NdpAdminRequest*)str.data;
ptr->head.command = NDP_QUERY;
ptr->head.size = str.len;
if (query->xact.comboCids) pfree(query->xact.comboCids);
if (query->xact.CLogPageBuffer) pfree(query->xact.CLogPageBuffer);
if (query->xact.CSNLogPageBuffer) pfree(query->xact.CSNLogPageBuffer);
pfree(query);
return ptr;
}
NdpAdminRequest* NdpScanChannel::ConstructVersion()
{
int len = sizeof(uint64);
uint64 version = (((uint64)GRAND_VERSION_NUM) << 32) | NDP_LOCAL_VERSION_NUM;
NdpAdminRequest* req = reinterpret_cast<NdpAdminRequest*>(palloc(sizeof(NdpAdminRequest) + len));
req->head.command = NDP_VERSION;
req->head.size = sizeof(NdpAdminRequest) + len;
req->taskId = -1;
req->tableId = -1;
errno_t rc = memcpy_s(reinterpret_cast<void*>(req + 1), len, &version, len);
securec_check(rc, "", "");
return req;
}