/* -------------------------------------------------------------------------
 * ndpam.h
 *        prototypes for functions in contrib/ndpplugin/ndpam.cpp
 *
 * Portions Copyright (c) 2022 Huawei Technologies Co.,Ltd.
 *
 * IDENTIFICATION
 *        contrib/ndpplugin/ndpam.h
 *
 * -------------------------------------------------------------------------
 */

#ifndef NDPAM_H
#define NDPAM_H

#include "component/thread/mpmcqueue.h"
#include "ndpplugin.h" // for global instance config

#define PARALLEL_SCAN_GAP_AU_ALIGNED ((unsigned)PAGE_NUM_PER_AU << 1)

enum class NdpRetCode {
    // ok
    NDP_OK = 0,
    // for desc
    ALLOC_MC_FAILED,
    ALLOC_MQ_FAILED,
    // for channel
    CONNECT_FAILED,
    CONNECT_UNUSABLE,
    RPC_ADMIN_SEND_FAIL,
    TABLE_ID_INVALID,
    TABLE_MGR_UNUSABLE,
    RPC_ADMIN_SEND_CTX_FAILED,
    RPC_ADMIN_SEND_PLAN_FAILED,
    RPC_ADMIN_SEND_STATE_FAILED,
    RPC_ADMIN_SEND_TERMINATE_FAILED,
    RPC_ADMIN_SEND_VERSION_FAILED,
    RPC_IO_SEND_FAILED,
    RPC_IO_CALLBACK_ERROR,
    // for memory alloc
    ALLOC_RESPONSE_MEMORY_FAILED,
    NDP_RETURN_FAILED,
    NDP_RETURN_STATUS_ERROR,
    NDP_ERROR,
    NDP_CONSTRUCT_FAILED
};

typedef PageHeaderData NdpPageHeaderData;
typedef PageHeaderData* NdpPageHeader;

class NdpIoSlot : public BaseObject {
public:
    explicit NdpIoSlot(void* privateData)
        : priv(privateData), resp(nullptr), respRet(NdpRetCode::NDP_OK), startBlockNum(0), rpcStatus(RPC_OK)
    {
        reqMsg.data = respMsg.data = nullptr;
        reqMsg.len = respMsg.len = 0;
    }
    ~NdpIoSlot() { FreeResp(); }

    int SetReq(RelFileNode& node, uint16 taskId, uint16 tableId, AuInfo& auinfo);
    void SetReq(uint16 taskId) { req.taskId = taskId; }
    NdpRetCode SetResp(int pageNum);
    void SetRespRet(NdpRetCode code) { respRet = code; }
    void SetStartBlockNum(uint32 start) { startBlockNum = start; }
    void* GetPriv(void) { return priv; }
    uint16 GetReqTableId(void) { return req.tableId; }
    NdpRetCode GetResp(NdpPageHeader& pages, int& pageNum, BlockNumber& start, uint64*& map);
    RpcMessage* GetReqMsg(void) { return &reqMsg; }
    RpcMessage* GetRespMsg(void) { return &respMsg; }
    uint32 GetStartBlockNum(void) { return startBlockNum; }
    int GetPushDownPageNum(void) { return req.auInfos[0].pageNum; }
    void SetRPCRet(RpcStatus rpccode) { rpcStatus = rpccode; }

private:
    void FreeResp(void);

    void* priv;
    RpcMessage reqMsg;
    RpcMessage respMsg;
    NdpIORequest req;
    NdpIOResponse* resp;
    NdpRetCode respRet;
    uint32 startBlockNum;
    RpcStatus rpcStatus;
};
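
/*
 * Lifecycle sketch (an assumption for illustration; the authoritative usage is
 * in ndpam.cpp): one slot is created per pushdown IO, filled in with SetReq(),
 * handed to NdpScanChannel::SendRequest(), and once the RPC completes the
 * pushed-down pages are retrieved with GetResp(); the response buffer is
 * released by FreeResp() from the destructor.
 */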

/*
 * plan <-> NdpContext
 *   |----ScanNode <-> NdpScanCondition
 *   |       |----Scan producer <-> NdpScanDescData
 *   |       |----Scan producer <-> NdpScanDescData
 *   |----ScanNode <-> NdpScanCondition
 */
typedef struct NdpContext { // for each plan tree
    MemoryContext ccMem;
    pthread_rwlock_t ccLock;
    struct HTAB* channelCache;   // for connector <=> rpc
    uint32 rpcCount;
    uint32 tableCount;           // for oid <=> tableId, could also be done on the server
    knl_session_context* u_sess; // for statistics without ENABLE_THREAD_POOL
} NdpContext;

class NdpScanDescData : public BaseObject { // for each scan thread
public:
    TableScanDesc scan;
    int curPageType;
    int curLinesNum;

    NdpIoSlot* curIO; // kept here so it can be freed later
    NdpPageHeader curNdpPages;
    int curNdpPagesNum;
    int nextNdpPageIndex;
    NdpPageHeader curNdpPage; // can be unified into an id if pushed-down pages are later stored in global shared memory
    int curNormalPageId;

    int nextTupleOffset; // for accessing tuples in a pushed-down page
    int nextLineIndex;

#ifdef NDP_ASYNC_RPC
    pg_atomic_uint32 reqCount;
    pg_atomic_uint32 respCount;
    MpmcBoundedQueue<NdpIoSlot*>* respIO{nullptr};
    MpmcBoundedQueue<int>* normalPagesId{nullptr};
#else
    int normalPagesNum;
    int normalPagesId[PAGE_NUM_PER_AU];
#endif

    BlockNumber handledBlock; // number of blocks handled so far
    BlockNumber nBlock;       // total number of blocks in the scanned relation

    // for statistics
    int sendFailedN{0};
    int failedIoN{0};
    int normalPageN{0};
    int pushDownPageN{0};
    int sendBackPageN{0};
    int ndpPageAggN{0};
    int ndpPageScanN{0};
    int rev{0};

    NdpScanCondition* cond; // for Plan
    ScanState* scanState;
    AggState* aggState;
    TupleTableSlot* aggSlot;
    MemoryContext memCtx{nullptr};

    NdpScanDescData() = default;
    ~NdpScanDescData();

    NdpRetCode Init(ScanState* sstate, TableScanDesc sscan);
    void Reset(void);
    void AddToNormal(uint32 start, uint32 end);
    void AddToNormal(uint32 block)
    {
#ifdef NDP_ASYNC_RPC
        if (!normalPagesId->Enqueue(block)) {
            ereport(ERROR, (errmsg("normal page exceed limit.")));
        }
#else
        normalPagesId[normalPagesNum] = block;
        normalPagesNum++;
#endif
        normalPageN++;
    }
    bool HandleSlot(NdpIoSlot* slot);
#ifdef NDP_ASYNC_RPC
    bool GetNextSlot(void);
#endif
    void FreeCurSlot(void)
    {
        if (curIO) {
            delete curIO;
            curIO = nullptr;
        }
    }
};
typedef NdpScanDescData* NdpScanDesc;
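
/*
 * A minimal consumption sketch (an assumption for illustration only; the real
 * scan driver lives in ndpam.cpp and is the authoritative reference):
 *
 *   // given: NdpScanDesc ndpScan owned by the scan state
 *   if (ndpScan->Init(scanState, scanDesc) == NdpRetCode::NDP_OK) {
 * #ifdef NDP_ASYNC_RPC
 *       while (ndpScan->GetNextSlot()) {    // next completed IO slot -> curIO
 *           (void)ndpScan->HandleSlot(ndpScan->curIO);
 *           ndpScan->FreeCurSlot();
 *       }
 * #endif
 *       ndpScan->Reset();
 *   }
 */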

enum class NdpScanChannelStatus {
    UNCONNECTED = 0,
    CONNECTED,
    QUERYSENT,
    CLOSED
};

enum class NdpTableStatus {
    INITIAL = 0,
    PLANSENT,
    STATESENT,
    CONSTRUCTFAIL
};

struct NdpTableMgr : public BaseObject {
    volatile NdpTableStatus status = NdpTableStatus::INITIAL;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // protect status
    uint16 ioFailed = 0;
    uint16 cmdNdpFailed = 0;
};

// connector <=> channel
struct NdpScanChannel {
    char rpcIp[NDP_RPC_IP_LEN];
    uint32 rpcId;                         // handle of the rpc, supports multi-threaded use
    volatile NdpScanChannelStatus status; // accessed atomically, called from multiple threads
    pthread_mutex_t mutex;
    RpcClient rpcClient;
    uint16 queryId;
    uint32 tableNum;
    NdpTableMgr* tableMgr;                // per-table status, records whether the condition has been sent
    uint32 connFailed;
    uint16 cmdFailed;

    NdpTableStatus GetTableStatus(uint16 tableId)
    {
        if (tableId >= tableNum) {
            return NdpTableStatus::CONSTRUCTFAIL;
        }
        return tableMgr[tableId].status;
    }
    void DestroyChannel()
    {
        DisconnectRpc();
        status = NdpScanChannelStatus::CLOSED;
        if (tableMgr) {
            delete[] tableMgr;
        }
        tableNum = 0;
        pthread_mutex_destroy(&mutex);
    }
    // initialization is done in Init() instead of the constructor, because instances are allocated by hash insert
    bool Init(uint32 id, char* ip, uint32 tableN);
    NdpRetCode SendRequest(NdpIoSlot* req, NdpScanDesc ndpScan); // must support multi-threaded callers
    NdpRetCode SendEnd();
    NdpRetCode SendAdminReq(NdpAdminRequest* req, NdpAdminResponse* resp, size_t size);
    // may only be called with the mutex held; status must be written atomically
    void DisconnectRpc()
    {
        if (rpcClient) {
            RpcClientDisconnect(rpcClient);
            rpcClient = 0;
            status = NdpScanChannelStatus::UNCONNECTED;
        }
    }
    NdpRetCode SendReq(NdpIoSlot* req, NdpScanDesc ndpScan);
    NdpRetCode SendAdmin(NdpTableMgr* mgr, NdpIoSlot* req, NdpScanDesc ndpScan);
    NdpRetCode SendIo(NdpIoSlot* req, NdpScanDesc ndpScan);
    NdpRetCode SendQuery(NdpScanDesc ndpScan);
    NdpRetCode SendPlan(NdpScanDesc ndpScan);
    NdpRetCode SendState(NdpScanDesc ndpScan);
    NdpAdminRequest* ConstructPlanReq(NdpScanDesc ndpScan);
    bool ExtractTupleDesc(TupleDesc desc, NdpTupleDesc* td);
    bool ExtractRelation(TableScanDesc scan, NdpRelation* rel);
    bool ExtractXact(TableScanDesc scan, NdpXact* xact);
    bool ExtractAggState(NdpScanDesc ndpScan, NdpAggState* aggS);
    NdpPlanState* CreatePlanState(NdpScanDesc ndpScan);
    void DestroyPlanState(NdpPlanState* state);
    NdpAdminRequest* ConstructPlanState(NdpScanDesc ndpScan);
    NdpAdminRequest* ConstructQuery(NdpScanDesc ndpScan);
    NdpAdminRequest* ConstructVersion();
};
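
/*
 * Presumed request flow through a channel (inferred from the declarations
 * above; the authoritative logic is in ndpam.cpp): SendRequest() first makes
 * sure the channel is connected and that the table's plan/state have been sent
 * (NdpTableMgr.status: INITIAL -> PLANSENT -> STATESENT, via SendAdmin()),
 * and then pushes the page IO request itself via SendIo().
 */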

struct NdpPageMethod {
    void (*get_pageinfo)(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, char *ip,
                         BlockNumber& end, uint32& phyStartBlockNum);
};

void pm_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, char *ip,
                     BlockNumber& end, uint32& phyStartBlockNum);
void md_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, char *ip,
                     BlockNumber& end, uint32& phyStartBlockNum);
void seg_get_pageinfo(NdpScanDesc ndpScan, BlockNumber page, CephObject *object, char *ip,
                      BlockNumber& end, uint32& phyStartBlockNum);

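// Dispatch table for storage-specific page lookup; the index presumably selects
// between regular (md) and segment-page (seg) storage. Note that pm_get_pageinfo
// is declared above but is not registered in this table.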
static const NdpPageMethod PAGEMETHOD[] {
    {
        md_get_pageinfo,
    },
    {
        seg_get_pageinfo,
    }
};

#endif // NDPAM_H