Files
Mijamind cb3fa65c63 【资源池化】openGauss算子下推特性合入
1.opengauss内核适配
2.ndpplugin
2023-05-16 21:03:02 +08:00

616 lines
18 KiB
C++

/*
* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* rpc.cpp
*
* IDENTIFICATION
* src\common\component\rpc\rpc.cpp
*
* -------------------------------------------------------------------------
*/
#include <unistd.h>
#include "utils/dynloader.h"
#include "component/rpc/rpc.h"
#ifdef NDP_CLIENT
#include "utils/elog.h"
#include "knl/knl_session.h"
#else
#include "utils/log.h"
#include "utils/config.h"
#include "ndp/ndp.h"
#include "securec_check.h"
#endif
/*
 * Early-return helpers: leave the enclosing function with RPC_ERROR when a call did not succeed.
 *   CHECK_RPC_STATUS     - success value is STATUS_OK (used in this file around OpenDl/LoadSymbol).
 *   CHECK_NDP_RPC_STATUS - success value is RPC_OK (used in this file around RpcStatus-returning calls).
 * Each expansion is wrapped in do { } while (0) so that it forms exactly one statement; a bare
 * "if (...) return" would silently capture the "else" of an unbraced if/else at the call site.
 */
#define CHECK_RPC_STATUS(status) \
    do { \
        if ((status) != STATUS_OK) { \
            return RPC_ERROR; \
        } \
    } while (0)
#define CHECK_NDP_RPC_STATUS(status) \
    do { \
        if ((status) != RPC_OK) { \
            return RPC_ERROR; \
        } \
    } while (0)
#ifdef ENABLE_SSL
/* Bit of OckRpcCreateConfig::mask (bit 2) that must be set before the SSL callback fields are used. */
#define OCK_RPC_CONFIG_USE_SSL_CALLBACK (1ul << (2))
/* Integer-sized value returned by the context builder and handed back to the cleanup handler. */
typedef uintptr_t OckRpcServerContext;
/* User-defined handler that builds a server context for the given server. */
using OckRpcServerCtxBuilderHandler = OckRpcServerContext (*)(RpcServer server);
/* User-defined handler that cleans up a context previously built for the given server. */
using OckRpcServerCtxCleanupHandler = void (*)(RpcServer server, OckRpcServerContext ctx);
/** @brief TLS callback signatures stored in OckRpcCreateConfig */
/**
* @brief Keypass erase function
* @param keypass the memory address of the keypass buffer to erase
*/
using OckRpcTlsKeypassErase = void (*)(char *keypass);
/**
* @brief Get the private key file's path and the keypass
* @param priKeyPath [out] the path of the private key
* @param keypass [out] the keypass
* @param erase [out] the erase function for the returned keypass
*/
using OckRpcTlsGetPrivateKey = void (*)(const char **priKeyPath, char **keypass, OckRpcTlsKeypassErase *erase);
/**
* @brief Get the certificate file of the public key
* @param certPath [out] the path of the certificate
*/
using OckRpcTlsGetCert = void (*)(const char **certPath);
/**
* @brief The cert verify function
* @param x509 the X509_STORE_CTX object of the CA
* @param crlPath the crl file path
*
* @return -1 for failure, and 1 for success
*/
using OckRpcTlsCertVerify = int (*)(void *x509, const char *crlPath);
/**
* @brief Get the CA and the verify function
* @param caPath [out] the path of the CA file
* @param crlPath [out] the crl file path
* @param verify [out] the verify function
*/
using OckRpcTlsGetCAAndVerify = void (*)(const char **caPath, const char **crlPath, OckRpcTlsCertVerify *verify);
/*
 * Configuration handed to the "...WithCfg" create/connect entry points of the rpc library.
 * NOTE(review): presumably this must match the library's own definition field-for-field - confirm
 * against the library header before reordering or adding members.
 */
typedef struct {
/* Bit mask selecting which of the fields below are in use; set the matching bit before setting a field */
uint64_t mask;
/* Key-Value style configuration, only used when OCK_RPC_CONFIG_USE_RPC_CONFIGS is enabled in mask */
RpcConfigs configs;
/* User-defined server context build/cleanup handlers, only used when OCK_RPC_CONFIG_USE_SERVER_CTX_BUILD is enabled */
OckRpcServerCtxBuilderHandler serverCtxbuilder;
OckRpcServerCtxCleanupHandler serverCtxCleanup;
/**
* SSL handlers, only used when OCK_RPC_CONFIG_USE_SSL_CALLBACK is enabled in mask
*
* On the server side getCert and getPriKey can't be nullptr
* On the client side getCaAndVerify can't be nullptr
*/
OckRpcTlsGetCAAndVerify getCaAndVerify; /* get the CA path and verify callback. */
OckRpcTlsGetCert getCert; /* get the certificate file of public key */
OckRpcTlsGetPrivateKey getPriKey; /* get the private key and keypass */
} OckRpcCreateConfig;
#ifdef NDP_CLIENT
/* Signature of the client connect entry point that also takes an OckRpcCreateConfig
 * (resolved from the symbol "OckRpcClientConnectWithCfg" in RpcClientDlsym). */
using ClientConnectWithCfg = RpcStatus (*)(const char* ip, uint16_t port, RpcClient* client, OckRpcCreateConfig* cfg);
#else
/* Signature of the server create entry point that also takes an OckRpcCreateConfig
 * (resolved from the symbol "OckRpcServerCreateWithCfg" in RpcServerDlsym). */
using ServerCreateWithCfg = RpcStatus (*)(const char* ip, uint16_t port, RpcServer* server, OckRpcCreateConfig* cfg);
#endif
#endif
#ifdef NDP_CLIENT
/* Client-side entry points of the rpc library; each alias is the signature of one dlsym-resolved symbol. */
/* "OckRpcClientConnect": connect to ip:port and store the resulting handle in *client. */
using ClientConnect = RpcStatus (*)(const char *ip, uint16_t port, RpcClient *client);
/* "OckRpcClientDisconnect": release a client handle. */
using ClientDisconnect = void (*)(RpcClient client);
/* "OckRpcClientCall": send request under msgId and receive into response; done is passed as nullptr
 * by RpcSendAdminReq and forwarded from the caller by RpcSendIOReq. */
using ClientCall = RpcStatus (*)(RpcClient client, uint16_t msgId, RpcMessage *request, RpcMessage *response,
RpcCallDone *done);
/* "OckRpcClientSetTimeout": set the timeout of a client handle (called with REPLY_TIMEOUT after connect). */
using ClientSetTimeout = void (*)(RpcClient client, int64_t timeout);
/* Table of the client function pointers filled in by RpcClientDlsym. */
typedef struct RpcUcxFunc {
ClientConnect clientConnect;
#ifdef ENABLE_SSL
ClientConnectWithCfg clientConnectWithCfg;
#endif
ClientDisconnect clientDisconnect;
ClientCall clientCall;
ClientSetTimeout clientSetTimeout;
} RpcUcxFunc;
#else
/* Server-side entry points of the rpc library; each alias is the signature of one dlsym-resolved symbol. */
/* "OckRpcServerCreate": create a server on ip:port and store the handle in *server. */
using ServerCreate = RpcStatus (*)(const char *ip, uint16_t port, RpcServer *server);
/* "OckRpcServerAddService": register a service (message id + handler) on the server. */
using ServerAddService = RpcStatus (*)(RpcServer server, RpcService *service);
/* "OckRpcServerStart": start the server. */
using ServerStart = RpcStatus (*)(RpcServer server);
/* "OckRpcServerDestroy": destroy a server handle. */
using ServerDestroy = void (*)(RpcServer server);
/* "OckRpcServerReply": send reply for the request identified by ctx; done is always nullptr in this file. */
using ServerReply = RpcStatus (*)(RpcServerContext ctx, uint16_t msgId, RpcMessage *reply, RpcCallDone *done);
/* "OckRpcServerCleanupCtx": release a request context; called after every reply in this file. */
using ServerCleanupCtx = void (*)(RpcServerContext ctx);
/* Table of the server function pointers filled in by RpcServerDlsym. */
typedef struct RpcUcxFunc {
ServerCreate serverCreate;
#ifdef ENABLE_SSL
ServerCreateWithCfg serverCreateWithCfg;
#endif
ServerAddService serverAddService;
ServerStart serverStart;
ServerDestroy serverDestroy;
ServerReply serverReply;
ServerCleanupCtx serverCleanCtx;
} RpcUcxFunc;
#endif
/* Signature of the "ULOG_Init" symbol resolved in LoadUlog; the meaning of the arguments is not visible here. */
using ULOG_Init = void (*)(int x, int y, std::nullptr_t ptr, int z, int i);
/* Signature of the "SetOpensslDLopenLibPath" symbol resolved in InitSslDl. */
using SetOpensslDLopenLibPath = int (*)(const char *ssl, const char *crypto);
/* Timeout handed to clientSetTimeout after a successful connect - presumably milliseconds, TODO confirm. */
constexpr int64_t REPLY_TIMEOUT = 60000;
/* Handle of the loaded rpc library; nullptr while it is not loaded (see InitRpcDl). */
void *g_rpcUcxDl = nullptr;
/* Function pointers resolved from g_rpcUcxDl by RpcServerDlsym / RpcClientDlsym. */
RpcUcxFunc g_rpcUcxFunc;
#ifdef ENABLE_SSL
#ifdef NDP_CLIENT
/*
 * Certificate verify callback published through GetCAAndVerify.
 * The rpc library already performs its basic verification, so no additional checks are made here
 * and both arguments are ignored.
 *
 * @return always 1 (success)
 */
int tlsCertVerify(void *x509, const char *crlPath)
{
    (void)x509;
    (void)crlPath;
    constexpr int verifyPassed = 1;
    return verifyPassed;
}
/*
 * Client-side TLS callback: reports the CA path and CRL path stored in the current session's
 * ndp context and installs tlsCertVerify as the verify function.
 *
 * @param caPath  [out] receives u_sess->ndp_cxt.ca_path
 * @param crlPath [out] receives u_sess->ndp_cxt.crl_path
 * @param verify  [out] receives tlsCertVerify
 */
void GetCAAndVerify(const char **caPath, const char **crlPath, OckRpcTlsCertVerify *verify)
{
    const auto &ndpCtx = u_sess->ndp_cxt;
    *caPath = ndpCtx.ca_path;
    *crlPath = ndpCtx.crl_path;
    *verify = tlsCertVerify;
}
#else
/*
 * Erase callback for the keypass buffer handed out by GetPrivateKey.
 * The buffer is overwritten with zeros before it is released so that the password does not
 * linger in freed heap memory. The buffer is expected to be NUL-terminated, which is how
 * GetPrivateKey fills it (it copies length + 1 bytes).
 *
 * @param keypass buffer allocated with malloc; nullptr is accepted and ignored
 */
void KeypassErase(char *keypass)
{
    if (keypass == nullptr) {
        return;
    }
    // volatile access keeps the compiler from discarding the wipe as a dead store ahead of free()
    for (volatile char *cursor = keypass; *cursor != '\0'; ++cursor) {
        *cursor = '\0';
    }
    free(keypass);
}
/*
 * Server-side TLS callback: reports the certificate path taken from the global configuration.
 *
 * @param certPath [out] receives a pointer into configSets->certPath; it stays valid only as long
 *                 as that string is neither modified nor destroyed
 */
void GetCert(const char **certPath)
{
    const auto &configuredCert = configSets->certPath;
    *certPath = configuredCert.c_str();
}
/*
 * Server-side TLS callback: reports the private key path and a heap copy of the keypass, both
 * taken from the global configuration, together with the function that releases that copy.
 *
 * @param priKeyPath [out] receives a pointer into configSets->priKeyPath
 * @param keypass    [out] receives a malloc'ed, NUL-terminated copy of configSets->keypass,
 *                   or nullptr when the allocation fails
 * @param erase      [out] receives KeypassErase, which accepts nullptr
 */
void GetPrivateKey(const char **priKeyPath, char **keypass, OckRpcTlsKeypassErase *erase)
{
    *priKeyPath = configSets->priKeyPath.c_str();
    *erase = KeypassErase;
    // length + 1 so that the terminating NUL is copied as well
    const size_t bufLen = configSets->keypass.length() + 1;
    *keypass = static_cast<char *>(malloc(bufLen));
    if (*keypass == nullptr) {
        LOG_ERROR << "malloc failed, keypass copy failed.";
        // nothing to copy into; do not hand a null destination to memcpy_s
        return;
    }
    // keypass need encrypt further
    errno_t rc = memcpy_s(*keypass, bufLen, configSets->keypass.c_str(), bufLen);
    securec_check(rc, "", "");
}
#endif
/**
 * Load the openssl dl helper library and tell it where the ssl and crypto libraries are.
 * Does nothing when the rpc library handle (g_rpcUcxDl) is already set.
 *
 * @param sslDlPath  path of the helper library exporting "SetOpensslDLopenLibPath"
 * @param sslPath    path of the ssl library, forwarded to SetOpensslDLopenLibPath
 * @param cryptoPath path of the crypto library, forwarded to SetOpensslDLopenLibPath
 * @return RPC_OK on success or when already initialised, RPC_ERROR on a null path or load failure
 */
RpcStatus InitSslDl(char *sslDlPath, char* sslPath, char* cryptoPath)
{
    if (sslDlPath == nullptr || sslPath == nullptr || cryptoPath == nullptr) {
#ifdef NDP_CLIENT
        ereport(WARNING, (errmsg("InitSslDl failed, path is null")));
#else
        LOG_ERROR << "InitSslDl failed, path is null";
#endif
        return RPC_ERROR;
    }
    if (g_rpcUcxDl != nullptr) {
        return RPC_OK;
    }
    /* load the openssl dl helper library */
    void *sslDl = nullptr;
    if (OpenDl(&sslDl, sslDlPath) != STATUS_OK) {
        return RPC_ERROR;
    }
    /* resolve the path setter; release the library again if it is missing */
    SetOpensslDLopenLibPath setSSLDlPath = nullptr;
    if (LoadSymbol(sslDl, "SetOpensslDLopenLibPath", (void **)&setSSLDlPath) != STATUS_OK) {
        CloseDl(sslDl);
        return RPC_ERROR;
    }
    /* pass the caller-supplied paths, not string literals naming the parameters */
    // NOTE(review): the int result is ignored as before; its success value is not visible here.
    setSSLDlPath(sslPath, cryptoPath);
    // The handle stays open on success, as in the original; presumably the helper must remain loaded.
    return RPC_OK;
}
#endif
/**
 * Open the rpc library and remember its handle in g_rpcUcxDl.
 * A second call while the handle is already set is a no-op.
 *
 * @param path path of the rpc library
 * @return RPC_OK when the library is (already) loaded, RPC_ERROR on a null path or open failure
 */
RpcStatus InitRpcDl(char *path)
{
    if (path != nullptr) {
        const bool alreadyLoaded = (g_rpcUcxDl != nullptr);
        if (!alreadyLoaded && OpenDl(&g_rpcUcxDl, path) != STATUS_OK) {
            return RPC_ERROR;
        }
        return RPC_OK;
    }
#ifdef NDP_CLIENT
    ereport(WARNING, (errmsg("dlopen rpc_ucx path is nullptr")));
#else
    LOG_ERROR << "dlopen rpc_ucx path is nullptr";
#endif
    return RPC_ERROR;
}
/**
 * Load the ulog library, call its "ULOG_Init" once and close the library handle again.
 * Only needs to be used once, before InitRpcDl.
 *
 * @param ulogPath path of the ulog shared library
 * @return RPC_OK on success; RPC_ERROR on a null path, open failure or missing "ULOG_Init" symbol
 */
RpcStatus LoadUlog(char* ulogPath)
{
    if (ulogPath == nullptr) {
#ifdef NDP_CLIENT
        ereport(WARNING, (errmsg("dlopen ulog path is nullptr")));
#else
        LOG_ERROR << "dlopen ulog path is nullptr";
#endif
        return RPC_ERROR;
    }
    /* load ulog */
    void *ulog = nullptr;
    if (OpenDl(&ulog, ulogPath) != STATUS_OK) {
        return RPC_ERROR;
    }
    /* init ulog; the handle must be released on the failure path too, otherwise it leaks */
    ULOG_Init ulogInit = nullptr;
    if (LoadSymbol(ulog, "ULOG_Init", (void **)&ulogInit) != STATUS_OK) {
        CloseDl(ulog);
        return RPC_ERROR;
    }
    ulogInit(0, 3, nullptr, 0, 0);
    CloseDl(ulog);
    return RPC_OK;
}
/**
 * Prepare everything the rpc layer needs, in order: ulog, the ssl helper (ENABLE_SSL builds only)
 * and the rpc library itself. Stops at the first step that fails.
 *
 * @param paths locations of the dependent libraries
 * @return RPC_OK when every step succeeded, RPC_ERROR otherwise
 */
RpcStatus InitRpcEnv(DependencePath paths)
{
    if (LoadUlog(paths.ulogPath) != RPC_OK) {
        return RPC_ERROR;
    }
#ifdef ENABLE_SSL
    if (InitSslDl(paths.sslDLPath, paths.sslPath, paths.cryptoPath) != RPC_OK) {
        return RPC_ERROR;
    }
#endif
    return (InitRpcDl(paths.rpcPath) != RPC_OK) ? RPC_ERROR : RPC_OK;
}
#ifndef NDP_CLIENT
/**
 * Resolve every server-side entry point from g_rpcUcxDl into g_rpcUcxFunc.
 * Lookups run in declaration order and stop at the first symbol that cannot be resolved.
 *
 * @return RPC_OK when all symbols were found, RPC_ERROR otherwise
 */
static RpcStatus RpcServerDlsym(void)
{
    // && short-circuits, so a failed lookup skips all remaining ones
    const bool allResolved =
        LoadSymbol(g_rpcUcxDl, "OckRpcServerCreate", (void **)&g_rpcUcxFunc.serverCreate) == STATUS_OK &&
#ifdef ENABLE_SSL
        LoadSymbol(g_rpcUcxDl, "OckRpcServerCreateWithCfg", (void **)&g_rpcUcxFunc.serverCreateWithCfg) == STATUS_OK &&
#endif
        LoadSymbol(g_rpcUcxDl, "OckRpcServerAddService", (void **)&g_rpcUcxFunc.serverAddService) == STATUS_OK &&
        LoadSymbol(g_rpcUcxDl, "OckRpcServerStart", (void **)&g_rpcUcxFunc.serverStart) == STATUS_OK &&
        LoadSymbol(g_rpcUcxDl, "OckRpcServerDestroy", (void **)&g_rpcUcxFunc.serverDestroy) == STATUS_OK &&
        LoadSymbol(g_rpcUcxDl, "OckRpcServerReply", (void **)&g_rpcUcxFunc.serverReply) == STATUS_OK &&
        LoadSymbol(g_rpcUcxDl, "OckRpcServerCleanupCtx", (void **)&g_rpcUcxFunc.serverCleanCtx) == STATUS_OK;
    return allResolved ? RPC_OK : RPC_ERROR;
}
/**
 * Hook for server-specific rpc configuration; currently there is nothing to configure.
 *
 * @return always RPC_OK
 */
RpcStatus InitRpcServerConfig()
{
    const RpcStatus nothingToConfigure = RPC_OK;
    return nothingToConfigure;
}
/**
 * Load the rpc environment, resolve the server entry points and create the rpc server for ctx.
 * An already existing server handle in ctx is destroyed first.
 *
 * @param ctx   rpc context; ip/port are read, serverHandle receives the new server
 * @param paths locations of the dependent libraries
 * @return RPC_OK on success; RPC_ERROR otherwise (after a symbol or create failure the rpc
 *         library is closed and g_rpcUcxDl reset)
 */
RpcStatus InitRpcServer(KnlRpcContext& ctx, DependencePath paths)
{
    // load dl
    CHECK_NDP_RPC_STATUS(InitRpcEnv(paths));
    // load server functions
    if (RpcServerDlsym() != RPC_OK) {
        LOG_ERROR << "dlsym rpc server func, path";
        CloseDl(g_rpcUcxDl);
        g_rpcUcxDl = nullptr;
        return RPC_ERROR;
    }
    if (ctx.serverHandle != 0) {
        g_rpcUcxFunc.serverDestroy(ctx.serverHandle);
        // do not keep a destroyed handle around in case the create below fails
        ctx.serverHandle = 0;
    }
    CHECK_NDP_RPC_STATUS(InitRpcServerConfig());
#ifdef ENABLE_SSL
    // value-initialise so that the fields not enabled in mask are zero instead of stack garbage
    OckRpcCreateConfig cfg = {};
    cfg.mask = OCK_RPC_CONFIG_USE_SSL_CALLBACK;
    cfg.getCaAndVerify = nullptr;
    cfg.getCert = GetCert;
    cfg.getPriKey = GetPrivateKey;
    RpcStatus status = g_rpcUcxFunc.serverCreateWithCfg(ctx.ip, ctx.port, &ctx.serverHandle, &cfg);
#else
    RpcStatus status = g_rpcUcxFunc.serverCreate(ctx.ip, ctx.port, &ctx.serverHandle);
#endif
    if (status != RPC_OK) {
        LOG_ERROR << "OckRpcServerCreate failed, ip " << ctx.ip << " port " << ctx.port;
        CloseDl(g_rpcUcxDl);
        g_rpcUcxDl = nullptr;
        return RPC_ERROR;
    }
    return RPC_OK;
}
/*
 * Service handler registered for RPC_ADMIN_REQ: runs NdpAdminProc on the received request and
 * always sends a NdpAdminResponse back, then releases the request context.
 *
 * @param handle request context, used for the reply and released at the end
 * @param msg    received message; msg.data is interpreted as a NdpAdminRequest
 */
static void RpcAdminProc(RpcServerContext handle, RpcMessage msg)
{
NdpAdminRequest *header = (NdpAdminRequest *)msg.data;
NdpAdminResponse resp;
// by default only the part of the response in front of queryId is sent back
// NOTE(review): size is handed to NdpAdminProc, which presumably may enlarge it - confirm
size_t size = offsetof(NdpAdminResponse, queryId); // just send ret default
resp.ret = NDP_ILLEGAL;
NDP_PG_TRY();
{
// a false result of NdpAdminProc is treated as success here
if (!NdpAdminProc(header, resp, size)) {
LOG_DEBUG << "rpc admin message is received successfully, "
<< "admin command is " << (int)(header->head.command);
resp.ret = NDP_OK;
}
}
NDP_PG_CATCH();
{
// an error raised while processing is reported to the peer as NDP_ERR
LOG_INFO << "rpc admin message is received failed, "
<< "admin command is " << (int)(header->head.command);
resp.ret = NDP_ERR;
}
NDP_PG_END_TRY();
RpcMessage reply = {.data = (void*)&resp, .len = size};
if (g_rpcUcxFunc.serverReply(handle, RPC_ADMIN_REQ, &reply, nullptr) != RPC_OK) {
LOG_ERROR << "send reply failed";
}
// the context is released whether or not the reply could be sent
g_rpcUcxFunc.serverCleanCtx(handle);
}
/**
 * Answer an IO task with an error-only NdpIOResponse, release its request context and delete
 * the task. The task must not be used by the caller afterwards.
 *
 * @param task  IO task to answer; ownership is taken and the task is deleted
 * @param error status code placed in the response
 * @return the status reported by serverReply
 */
RpcStatus SendIOTaskErrReply(NdpIOTask* task, NDP_ERRNO error)
{
    // zero the response so that no uninitialised stack bytes are sent to the peer
    NdpIOResponse res = {};
    res.status = error;
    // the length must cover the response header, same as the error path of RpcIOTaskHandler
    RpcMessage reply = {.data = &res, .len = sizeof(NdpIOResponse)};
    RpcStatus status = g_rpcUcxFunc.serverReply(task->handle, RPC_IO_REQ, &reply, nullptr);
    if (status != RPC_OK) {
        LOG_WARN << "send reply failed";
    }
    g_rpcUcxFunc.serverCleanCtx(task->handle);
    delete task;
    return status;
}
#ifdef FAULT_INJECT
/*
 * Fault injection for IO tasks: counts the IO for the task's plan when one is registered in
 * injectPlanVarMap, randomly delays, and then answers the task with ERR_AIO_FAILED.
 * The task is consumed by SendIOTaskErrReply.
 */
static void IOInject(NdpIOTask* &task)
{
    const auto planIt = injectPlanVarMap.find(task->header->taskId);
    if (planIt != injectPlanVarMap.end()) {
        planIt->second->ioCount.fetch_add(1, std::memory_order_relaxed);
    }
    // timeout inject
    const bool injectDelay = (rand() % PERCENTAGE_DIV) < PERCENTAGE;
    if (injectDelay) {
        sleep((rand() % PERCENTAGE_DIV));
    }
    SendIOTaskErrReply(task, ERR_AIO_FAILED);
}
#endif
/*
 * Service handler registered for RPC_IO_REQ: wraps the request in a new NdpIOTask and hands it
 * on - to the async read path in NDP_ASYNC_CEPH builds, otherwise to the global worker manager.
 *
 * @param handle request context stored in the task
 * @param msg    received message; msg.data is interpreted as a NdpIORequest
 */
static void RpcIOProc(RpcServerContext handle, RpcMessage msg)
{
    NdpIORequest *request = reinterpret_cast<NdpIORequest*>(msg.data);
    NdpIOTask* ioTask = new NdpIOTask(handle, request);
#ifdef NDP_ASYNC_CEPH
    if (!SubmitAioReadData(ioTask)) {
        LOG_DEBUG << "rpc IO message is received successfully.";
        return;
    }
    // submission failed: answer with an error, which also deletes the task
    SendIOTaskErrReply(ioTask, ERR_AIO_FAILED);
#else
    globalWorkerManager->AddTask(ioTask);
#endif
}
/**
 * Process one IO task: run NdpIOProc on its request, send the reply, release the request
 * context and delete the task.
 *
 * @param task IO task to process; it is deleted before this function returns
 * @return the status of serverReply; in FAULT_INJECT builds RPC_ERROR when a fault was injected
 */
RpcStatus RpcIOTaskHandler(NdpIOTask* task)
{
#ifdef FAULT_INJECT
// randomly divert the task to the fault path; IOInject answers and deletes it
if ((rand() % PERCENTAGE_DIV) < PERCENTAGE) {
IOInject(task);
return RPC_ERROR;
}
#endif
RpcServerContext handle = task->handle;
NdpIORequest *header = task->header;
#ifdef NDP_ASYNC_CEPH
// publish the buffer filled by the async read to the worker context
t_thrd.ndpWorkerCtx->scanPages = task->aioDesc->readBuf;
#endif
// fallback response, only used when NdpIOProc fails without producing reply data
NdpIOResponse res;
res.status = NDP_ILLEGAL;
Status ioStatus;
RpcMessage reply = {.data = nullptr, .len = 0};
NDP_PG_TRY();
{
ioStatus = NdpIOProc(header, &reply);
if (reply.data) {
LOG_DEBUG << "ndpworker " << pthread_self() << " successful handle "
<< reinterpret_cast<NdpIOResponse *>(reply.data)->ndpPageNums << " ndppages.";
reinterpret_cast<NdpIOResponse *>(reply.data)->status = NDP_OK;
} else {
LOG_DEBUG << "ndpworker " << pthread_self() << " handle 0 pages";
}
}
NDP_PG_CATCH();
{
// an error raised inside NdpIOProc is handled like a failed status
ioStatus = STATUS_ERROR;
}
NDP_PG_END_TRY();
if (ioStatus != STATUS_OK) {
// on failure only the response header is sent, carrying NDP_ERR
reply.len = sizeof(NdpIOResponse);
if (reply.data == nullptr) {
res.status = NDP_ERR;
reply.data = &res;
} else {
reinterpret_cast<NdpIOResponse *>(reply.data)->status = NDP_ERR;
}
}
RpcStatus status = g_rpcUcxFunc.serverReply(handle, RPC_IO_REQ, &reply, nullptr);
if (status != RPC_OK) {
LOG_WARN << "send reply failed";
}
// NOTE(review): reply.data set by NdpIOProc is not released here - confirm who owns that buffer
g_rpcUcxFunc.serverCleanCtx(handle);
delete task;
return status;
}
/**
 * Register the admin and IO service handlers on the server stored in ndp_instance.
 * The admin service is added first; the IO service is only added when that succeeded.
 *
 * @return RPC_OK when both services were added; RPC_ERROR when there is no server handle or a
 *         registration failed
 */
static RpcStatus RegisterRpcProcFunc(void)
{
    RpcServer rpcServer = ndp_instance.rpcContext.serverHandle;
    if (rpcServer == 0) {
        LOG_ERROR << "register rpc proc func failed, server handler:" << rpcServer;
        return RPC_ERROR;
    }

    RpcService adminSvc = {.id = RPC_ADMIN_REQ, .handler = RpcAdminProc};
    RpcService ioSvc = {.id = RPC_IO_REQ, .handler = RpcIOProc};

    RpcStatus addResult = g_rpcUcxFunc.serverAddService(rpcServer, &adminSvc);
    if (addResult != RPC_OK) {
        LOG_ERROR << "add service RPC_ADMIN_REQ failed, status = " << addResult;
        return RPC_ERROR;
    }

    addResult = g_rpcUcxFunc.serverAddService(rpcServer, &ioSvc);
    if (addResult != RPC_OK) {
        LOG_ERROR << "add service RPC_IO_REQ failed, status = " << addResult;
        return RPC_ERROR;
    }
    return RPC_OK;
}
/**
 * Bring up the rpc server of this process: reset ndp_instance, take ip/port from the global
 * configuration, create the server, register the service handlers and start the server.
 *
 * @return RPC_OK when the server was started; RPC_ERROR (or the status of serverStart) otherwise
 */
RpcStatus RpcServerInit(void)
{
    memset(&ndp_instance, 0, sizeof(ndp_instance));
    DependencePath paths;
    paths.ulogPath = LIB_ULOG;
    paths.rpcPath = LIB_RPC_UCX;
    paths.sslDLPath = LIB_OPENSSL_DL;
    paths.sslPath = LIB_SSL;
    paths.cryptoPath = LIB_CRYPTO;
    // The configured address is external input: refuse it when it does not fit (including the
    // terminating NUL) instead of overflowing the context buffer.
    // NOTE(review): relies on rpcContext.ip being a fixed-size char array, so that sizeof yields
    // its capacity - confirm against the KnlRpcContext definition.
    if (configSets->ip.length() >= sizeof(ndp_instance.rpcContext.ip)) {
        LOG_ERROR << "rpc server ip is too long: " << configSets->ip.c_str();
        return RPC_ERROR;
    }
    strcpy(ndp_instance.rpcContext.ip, configSets->ip.c_str());
    ndp_instance.rpcContext.port = configSets->port;
    CHECK_NDP_RPC_STATUS(InitRpcServer(ndp_instance.rpcContext, paths));
    CHECK_NDP_RPC_STATUS(RegisterRpcProcFunc());
    RpcStatus rpcStatus = g_rpcUcxFunc.serverStart(ndp_instance.rpcContext.serverHandle);
    if (rpcStatus != RPC_OK) {
        LOG_ERROR << "RpcServerStart failed";
    }
    return rpcStatus;
}
#else
/**
 * Resolve every client-side entry point from g_rpcUcxDl into g_rpcUcxFunc.
 * Lookups run in declaration order and stop at the first symbol that cannot be resolved.
 *
 * @return RPC_OK when all symbols were found, RPC_ERROR otherwise
 */
static RpcStatus RpcClientDlsym(void)
{
    // && short-circuits, so a failed lookup skips all remaining ones
    const bool allResolved =
        LoadSymbol(g_rpcUcxDl, "OckRpcClientConnect", (void **)&g_rpcUcxFunc.clientConnect) == STATUS_OK &&
#ifdef ENABLE_SSL
        LoadSymbol(g_rpcUcxDl, "OckRpcClientConnectWithCfg", (void **)&g_rpcUcxFunc.clientConnectWithCfg) == STATUS_OK &&
#endif
        LoadSymbol(g_rpcUcxDl, "OckRpcClientDisconnect", (void **)&g_rpcUcxFunc.clientDisconnect) == STATUS_OK &&
        LoadSymbol(g_rpcUcxDl, "OckRpcClientCall", (void **)&g_rpcUcxFunc.clientCall) == STATUS_OK &&
        LoadSymbol(g_rpcUcxDl, "OckRpcClientSetTimeout", (void **)&g_rpcUcxFunc.clientSetTimeout) == STATUS_OK;
    return allResolved ? RPC_OK : RPC_ERROR;
}
/**
 * Load the rpc environment and resolve the client entry points.
 * When a symbol is missing the rpc library is closed again and g_rpcUcxDl reset.
 *
 * @param paths locations of the dependent libraries
 * @return RPC_OK on success, RPC_ERROR otherwise
 */
RpcStatus RpcClientInit(DependencePath& paths)
{
    // load dl
    if (InitRpcEnv(paths) != RPC_OK) {
        return RPC_ERROR;
    }
    if (RpcClientDlsym() == RPC_OK) {
        return RPC_OK;
    }
    CloseDl(g_rpcUcxDl);
    g_rpcUcxDl = nullptr;
    return RPC_ERROR;
}
/**
 * Connect to an rpc server and set the reply timeout of the new client handle.
 * In ENABLE_SSL builds the connection is created with the CA/verify callback installed.
 *
 * @param ip           server address
 * @param port         server port
 * @param clientHandle receives the client handle on success
 * @return RPC_OK on success, otherwise the status returned by the connect call
 */
RpcStatus RpcClientConnect(char *ip, uint16_t port, RpcClient& clientHandle)
{
#ifdef ENABLE_SSL
    // value-initialise so that the fields not enabled in mask are zero instead of stack garbage
    OckRpcCreateConfig cfg = {};
    cfg.mask = OCK_RPC_CONFIG_USE_SSL_CALLBACK;
    cfg.getCaAndVerify = GetCAAndVerify;
    cfg.getCert = nullptr;
    cfg.getPriKey = nullptr;
    RpcStatus rpcStatus = g_rpcUcxFunc.clientConnectWithCfg(ip, port, &clientHandle, &cfg);
#else
    RpcStatus rpcStatus = g_rpcUcxFunc.clientConnect(ip, port, &clientHandle);
#endif
    if (rpcStatus != RPC_OK) {
        ereport(LOG, (errmsg("RpcClientConnect failed, ip: %s, port: %d", ip, port)));
        return rpcStatus;
    }
    g_rpcUcxFunc.clientSetTimeout(clientHandle, REPLY_TIMEOUT);
    return RPC_OK;
}
/**
 * Disconnect a client handle and log that the disconnect has completed.
 *
 * @param clientHandle handle obtained from RpcClientConnect
 */
void RpcClientDisconnect(RpcClient clientHandle)
{
    ClientDisconnect disconnectFn = g_rpcUcxFunc.clientDisconnect;
    disconnectFn(clientHandle);
    ereport(LOG, (errmsg("RpcClientDisconnect complete.")));
}
/**
 * Send an admin request and receive the answer into resp.
 * resp->ret is preset to NDP_ILLEGAL before the call is made.
 *
 * @param req          request to send; req->head.size bytes are transmitted
 * @param resp         buffer for the response
 * @param size         number of bytes available in resp (allows for an expanded NdpAdminResponse)
 * @param clientHandle connected client handle
 * @return the status of the rpc call
 */
RpcStatus RpcSendAdminReq(NdpAdminRequest* req, NdpAdminResponse* resp, size_t size, RpcClient clientHandle)
{
    RpcMessage outbound = {.data = (void*)req, .len = req->head.size};
    RpcMessage inbound = {.data = (void*)resp, .len = size};
    resp->ret = NDP_ILLEGAL;
    return g_rpcUcxFunc.clientCall(clientHandle, RPC_ADMIN_REQ, &outbound, &inbound, nullptr);
}
/**
 * Send an IO request; a failed call is reported with a WARNING.
 *
 * @param request      message to send
 * @param response     message receiving the answer
 * @param done         completion descriptor forwarded unchanged to the rpc call
 * @param clientHandle connected client handle
 * @return the status of the rpc call
 */
RpcStatus RpcSendIOReq(RpcMessage* request, RpcMessage* response, RpcCallDone* done, RpcClient clientHandle)
{
    const RpcStatus callResult = g_rpcUcxFunc.clientCall(clientHandle, RPC_IO_REQ, request, response, done);
    if (callResult == RPC_OK) {
        return callResult;
    }
    ereport(WARNING, (errmsg("RpcSendIOReq failed. Error code: %d", callResult)));
    return callResult;
}
#endif