[CP] Optimize the memory usage of RPC requests

liucc1997
2024-12-09 07:45:09 +00:00
committed by ob-robot
parent 6eb1ec1898
commit 94cbff5e27
7 changed files with 91 additions and 30 deletions

View File

@@ -89,7 +89,7 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s
}
IGNORE_RETURN snprintf(rpc_timeguard_str, sizeof(rpc_timeguard_str), "sz=%ld,pcode=%x,id=%ld", sz, pcode, tenant_id);
timeguard.click(rpc_timeguard_str);
- ObRpcMemPool* pool = ObRpcMemPool::create(tenant_id, pcode_label, pool_size);
+ ObRpcMemPool* pool = ObRpcMemPool::create(tenant_id, pcode_label, pool_size, ObRpcMemPool::RPC_CACHE_SIZE);
void *temp = NULL;
#ifdef ERRSIM
@@ -147,29 +147,59 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s
return ret;
}
+ void* ObPocServerHandleContext::alloc(int64_t sz) {
+ resp_ptr_ = pn_resp_pre_alloc(resp_id_, sz);
+ return resp_ptr_;
+ }
void ObPocServerHandleContext::resp(ObRpcPacket* pkt)
{
int ret = OB_SUCCESS;
int sys_err = 0;
char reserve_buf[2048]; // reserve stack memory for response packet buf
- char* buf = reserve_buf;
- int64_t sz = 0;
+ char* buff = NULL;
+ int64_t resp_hdr_size = 0;
+ int64_t resp_buf_size = 0;
+ int64_t rpc_header_size = ObRpcPacket::get_header_size();
+ int pkt_hdr_size = sizeof(ObRpcPacket) + OB_NET_HEADER_LENGTH + rpc_header_size;
+ int64_t pos = 0;
+ char* pkt_ptr = reinterpret_cast<char*>(pkt);
char rpc_timeguard_str[ObPocRpcServer::RPC_TIMEGUARD_STRING_SIZE] = {'\0'};
ObTimeGuard timeguard("rpc_resp", 10 * 1000);
if (NULL == pkt) {
// do nothing
- } else if (OB_FAIL(rpc_encode_ob_packet(pool_, pkt, buf, sz, sizeof(reserve_buf)))) {
- RPC_LOG(WARN, "rpc_encode_ob_packet fail", KP(pkt), K(sz));
- buf = NULL;
- sz = 0;
+ } else if (OB_UNLIKELY(pkt_ptr != resp_ptr_)) {
+ // response error packet using temporary memory
+ int64_t sz = 0;
+ char* tmp_buf = reserve_buf;
+ if (OB_FAIL(rpc_encode_ob_packet(pool_, pkt, tmp_buf, sz, sizeof(reserve_buf)))) {
+ RPC_LOG(WARN, "rpc_encode_ob_packet fail", KP(pkt), K(sz));
+ } else {
+ buff = tmp_buf;
+ }
+ resp_hdr_size = 0;
+ resp_buf_size = sz;
+ } else if (OB_FAIL(pkt->encode_header(pkt_ptr + sizeof(ObRpcPacket) + OB_NET_HEADER_LENGTH, rpc_header_size, pos))) {
+ RPC_LOG(WARN, "encode pkt header fail", K(*pkt), K(rpc_header_size), K(pos));
} else {
+ /*
+ * RPC response packet buffer format
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | ObRpcPacket | easy header | RPC header | rcode | RPC response |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ */
+ // ObRpcPacket is not used in pkt-nio, pn_resp will use this buff to allocate pn_resp_t struct
+ buff = pkt_ptr;
+ resp_hdr_size = sizeof(ObRpcPacket) + OB_NET_HEADER_LENGTH;
+ resp_buf_size = pkt->get_encoded_size();
IGNORE_RETURN snprintf(rpc_timeguard_str, sizeof(rpc_timeguard_str), "sz=%ld,pcode=%x,id=%ld",
- sz,
- pkt->get_pcode(),
- pkt->get_tenant_id());
+ resp_buf_size,
+ pkt->get_pcode(),
+ pkt->get_tenant_id());
}
timeguard.click(rpc_timeguard_str);
- if ((sys_err = pn_resp(resp_id_, buf, sz, resp_expired_abs_us_)) != 0) {
+ if ((sys_err = pn_resp(resp_id_, buff, resp_hdr_size, resp_buf_size, resp_expired_abs_us_)) != 0) {
ret = tranlate_to_ob_error(sys_err);
RPC_LOG(WARN, "pn_resp fail", K(resp_id_), K(sys_err));
}
}
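
The rewritten resp() above has three outcomes: a NULL packet does nothing; a packet that was not built in the pre-allocated buffer (pkt_ptr != resp_ptr_, i.e. an error response) is still serialized into the 2 KB stack reserve_buf and sent with resp_hdr_size = 0; and the common case encodes the RPC header directly into the buffer returned by pn_resp_pre_alloc, handing header and payload sizes to pn_resp separately so the old rpc_encode_ob_packet allocate-and-copy step disappears. A minimal sketch of the offset arithmetic, with stand-in types and sizes (MockRpcPacket, kNetHeaderLen and kRpcHeaderSize are illustrative, not the real constants):

    // Sketch only: one pre-allocated buffer laid out as the comment in resp()
    // describes: | ObRpcPacket | easy header | RPC header | rcode | RPC response |
    #include <cassert>
    #include <cstdint>
    #include <cstdio>

    struct MockRpcPacket { char opaque[96]; };  // stand-in for ObRpcPacket
    const int64_t kNetHeaderLen = 16;           // stand-in for OB_NET_HEADER_LENGTH
    const int64_t kRpcHeaderSize = 152;         // stand-in for ObRpcPacket::get_header_size()

    int main() {
      char buf[4096] = {0};
      // encode_header() writes the RPC header right after the easy header:
      char* rpc_header = buf + sizeof(MockRpcPacket) + kNetHeaderLen;
      int64_t resp_hdr_size = sizeof(MockRpcPacket) + kNetHeaderLen; // handed to pn_resp
      int64_t resp_buf_size = kRpcHeaderSize + 512;                  // pkt->get_encoded_size()
      // the wire bytes start exactly where the RPC header was encoded in place
      assert(rpc_header == buf + resp_hdr_size);
      printf("hdr=%ld payload=%ld\n", (long)resp_hdr_size, (long)resp_buf_size);
      return 0;
    }
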
@@ -250,7 +280,7 @@ int serve_cb(int grp, const char* b, int64_t sz, uint64_t resp_id)
if (OB_SUCCESS != tmp_ret) {
if (OB_TMP_FAIL(ObPocServerHandleContext::resp_error(resp_id, tmp_ret, b, sz))) {
int sys_err = 0;
- if ((sys_err = pn_resp(resp_id, NULL, 0, OB_INVALID_TIMESTAMP)) != 0) {
+ if ((sys_err = pn_resp(resp_id, NULL, 0, 0, OB_INVALID_TIMESTAMP)) != 0) {
RPC_LOG(WARN, "pn_resp fail", K(resp_id), K(sys_err));
}
}

View File

@@ -33,7 +33,7 @@ public:
OBCG_ELECTION = 2
}; // same as src/share/resource_manager/ob_group_list.h
ObPocServerHandleContext(ObRpcMemPool& pool, uint64_t resp_id, int64_t resp_expired_abs_us):
- pool_(pool), resp_id_(resp_id), resp_expired_abs_us_(resp_expired_abs_us), peer_()
+ pool_(pool), resp_id_(resp_id), resp_expired_abs_us_(resp_expired_abs_us), peer_(), resp_ptr_(NULL)
{}
~ObPocServerHandleContext() {
destroy();
@@ -44,7 +44,7 @@ public:
static int resp_error(uint64_t resp_id, int err_code, const char* b, const int64_t sz);
ObAddr get_peer();
void set_peer_unsafe(); // This function can only be called from the pnio thread.
- void* alloc(int64_t sz) { return pool_.alloc(sz); }
+ void* alloc(int64_t sz);
void set_resp_expired_time(int64_t ts) { resp_expired_abs_us_ = ts; }
int64_t get_resp_expired_time() { return resp_expired_abs_us_; }
private:
@@ -52,6 +52,7 @@ private:
uint64_t resp_id_;
int64_t resp_expired_abs_us_;
ObAddr peer_;
+ void* resp_ptr_;
};

View File

@@ -50,8 +50,8 @@ static void* rpc_mem_pool_direct_alloc(int64_t tenant_id, const char* label, int
return common::ob_malloc(sz, attr);
}
static void rpc_mem_pool_direct_free(void* p) { common::ob_free(p); }
- static ObRpcMemPool::Page* rpc_mem_pool_create_page(int64_t tenant_id, const char* label, int64_t sz) {
- int64_t alloc_sz = std::max(sizeof(ObRpcMemPool::Page) + sz, (uint64_t)ObRpcMemPool::RPC_POOL_PAGE_SIZE);
+ static ObRpcMemPool::Page* rpc_mem_pool_create_page(int64_t tenant_id, const char* label, int64_t sz, int64_t cache_sz = ObRpcMemPool::RPC_POOL_PAGE_SIZE) {
+ int64_t alloc_sz = std::max(sizeof(ObRpcMemPool::Page) + sz, (uint64_t)cache_sz);
ObRpcMemPool::Page* page = (typeof(page))rpc_mem_pool_direct_alloc(tenant_id, label, alloc_sz);
if (OB_ISNULL(page)) {
LOG_WARN_RET(common::OB_ALLOCATE_MEMORY_FAILED, "rpc memory pool alloc memory failed", K(sz), K(alloc_sz));
@@ -67,11 +67,11 @@ static void rpc_mem_pool_destroy_page(ObRpcMemPool::Page* page) {
}
}
- ObRpcMemPool* ObRpcMemPool::create(int64_t tenant_id, const char* label, int64_t req_sz)
+ ObRpcMemPool* ObRpcMemPool::create(int64_t tenant_id, const char* label, int64_t req_sz, int64_t cache_sz)
{
Page* page = nullptr;
ObRpcMemPool* pool = nullptr;
- if (OB_NOT_NULL(page = rpc_mem_pool_create_page(tenant_id, label, req_sz + sizeof(ObRpcMemPool)))) {
+ if (OB_NOT_NULL(page = rpc_mem_pool_create_page(tenant_id, label, req_sz + sizeof(ObRpcMemPool), cache_sz))) {
if (OB_NOT_NULL(pool = (typeof(pool))page->alloc(sizeof(ObRpcMemPool)))) {
new(pool)ObRpcMemPool(tenant_id, label); // can not be null
pool->add_page(page);

View File

@@ -21,12 +21,15 @@ namespace obrpc
class ObRpcMemPool
{
public:
- enum { RPC_POOL_PAGE_SIZE = (1<<14) - 128};
+ enum {
+ RPC_POOL_PAGE_SIZE = (1<<14) - 128,
+ RPC_CACHE_SIZE = 3968
+ };
struct Page;
explicit ObRpcMemPool(): last_(NULL), tenant_id_(OB_INVALID_TENANT_ID), mem_label_("RpcDefault") {}
explicit ObRpcMemPool(int64_t tenant_id, const char* label): last_(NULL), tenant_id_(tenant_id), mem_label_(label) {}
~ObRpcMemPool() { destroy(); }
- static ObRpcMemPool* create(int64_t tenant_id, const char* label, int64_t req_sz);
+ static ObRpcMemPool* create(int64_t tenant_id, const char* label, int64_t req_sz, int64_t cache_sz = ObRpcMemPool::RPC_POOL_PAGE_SIZE);
void* alloc(int64_t sz);
void set_tenant_id(int64_t tenant_id) { tenant_id_ = tenant_id; }
void reuse();
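
Both enum values sit 128 bytes under a power-of-two boundary, presumably leaving headroom for the allocator's block header so the whole page stays within a 16 KB or 4 KB size class; an illustrative check of that reading:

    // Illustrative only: the 128-byte headroom interpretation is an assumption,
    // not stated in the commit.
    static_assert((1 << 14) - 128 == 16256, "RPC_POOL_PAGE_SIZE, 16 KB class");
    static_assert((1 << 12) - 128 == 3968, "RPC_CACHE_SIZE, 4 KB class");
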

View File

@@ -512,6 +512,7 @@ typedef struct pn_resp_ctx_t
void* req_handle;
uint64_t sock_id;
uint64_t pkt_id;
+ void* resp_ptr;
char reserve[sizeof(pkts_req_t)];
} pn_resp_ctx_t;
@@ -525,6 +526,7 @@ static pn_resp_ctx_t* create_resp_ctx(pn_t* pn, void* req_handle, uint64_t sock_
ctx->req_handle = req_handle;
ctx->sock_id = sock_id;
ctx->pkt_id = pkt_id;
+ ctx->resp_ptr = NULL;
}
return ctx;
}
@@ -557,8 +559,11 @@ static void pn_pkts_flush_cb_func(pkts_req_t* req)
if ((uint64_t)resp->ctx->reserve == (uint64_t)resp) {
fifo_free(resp->ctx);
} else {
+ void* resp_ptr = resp->ctx->resp_ptr;
fifo_free(resp->ctx);
- cfifo_free(resp);
+ if (likely(resp_ptr)) {
+ cfifo_free(resp_ptr);
+ }
}
}
@@ -567,15 +572,35 @@ static void pn_pkts_flush_cb_error_func(pkts_req_t* req)
pn_resp_ctx_t* ctx = (typeof(ctx))structof(req, pn_resp_ctx_t, reserve);
fifo_free(ctx);
}
+ PN_API void* pn_resp_pre_alloc(uint64_t req_id, int64_t sz)
+ {
+ void* p = NULL;
+ pn_resp_ctx_t* ctx = (typeof(ctx))req_id;
+ if (likely(ctx)) {
+ if (unlikely(ctx->resp_ptr != NULL)) {
+ int err = PNIO_ERROR;
+ rk_error("pn_resp_pre_alloc might have been executed twice, which is unexpected, ctx=%p, resp_ptr=%p", ctx, ctx->resp_ptr);
+ cfifo_free(ctx->resp_ptr);
+ }
+ p = cfifo_alloc(&ctx->pn->server_resp_alloc, sz);
+ ctx->resp_ptr = p;
+ }
+ return p;
+ }
- PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us)
+ PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t hdr_sz, int64_t payload_sz, int64_t resp_expired_abs_us)
{
pn_resp_ctx_t* ctx = (typeof(ctx))req_id;
pn_resp_t* resp = NULL;
- if (sizeof(pn_resp_t) + sz <= sizeof(ctx->reserve)) {
- resp = (typeof(resp))(ctx->reserve);
+ if (unlikely(0 == hdr_sz)) { // response null or response error
+ if (ctx->resp_ptr) {
+ cfifo_free(ctx->resp_ptr);
+ }
+ ctx->resp_ptr = cfifo_alloc(&ctx->pn->server_resp_alloc, sizeof(*resp) + payload_sz);
+ resp = (typeof(resp))ctx->resp_ptr;
} else {
- resp = (typeof(resp))cfifo_alloc(&ctx->pn->server_resp_alloc, sizeof(*resp) + sz);
+ assert(hdr_sz >= sizeof(*resp));
+ resp = (typeof(resp))(buf + hdr_sz - sizeof(*resp));
}
pkts_req_t* r = NULL;
if (NULL != resp) {
@@ -586,8 +611,9 @@ PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_ex
r->sock_id = ctx->sock_id;
r->categ_id = 0;
r->expire_us = resp_expired_abs_us;
- eh_copy_msg(&r->msg, ctx->pkt_id, buf, sz);
+ eh_copy_msg(&r->msg, ctx->pkt_id, buf + hdr_sz, payload_sz);
} else {
+ rk_warn("allocate memory for pn_resp_t failed");
r = (typeof(r))(ctx->reserve);
r->errcode = ENOMEM;
r->flush_cb = pn_pkts_flush_cb_error_func;
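
The net effect of the two branches in the new pn_resp(): with hdr_sz == 0 (NULL or error responses) it allocates a fresh pn_resp_t plus payload, much as before, while a non-zero hdr_sz means the caller pre-allocated the whole buffer via pn_resp_pre_alloc, so the pn_resp_t descriptor is overlaid at the tail of the reserved header region, immediately in front of the wire bytes; pn_pkts_flush_cb_func later frees the single allocation through ctx->resp_ptr. A self-contained sketch of that overlay invariant (MockResp stands in for the real pn_resp_t; sizes are illustrative):

    #include <cassert>
    #include <cstdint>
    #include <cstdio>
    #include <cstring>

    struct MockResp { int64_t bytes; };  // stand-in for pn_resp_t bookkeeping

    int main() {
      char buf[512] = {0};
      const int64_t hdr_sz = 128;     // sizeof(ObRpcPacket) + OB_NET_HEADER_LENGTH in the real code
      const int64_t payload_sz = 64;
      assert(hdr_sz >= (int64_t)sizeof(MockResp));  // same assert as in pn_resp()
      // descriptor overlaid at the tail of the header region, no extra allocation:
      MockResp* resp = (MockResp*)(buf + hdr_sz - sizeof(MockResp));
      resp->bytes = payload_sz;
      // the bytes that go on the wire start right after the descriptor:
      char* wire = buf + hdr_sz;
      memcpy(wire, "payload", 7);
      printf("descriptor at %p, wire data at %p\n", (void*)resp, (void*)wire);
      return 0;
    }
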

View File

@@ -79,7 +79,8 @@ PN_API int pn_listen(int port, serve_cb_t cb);
PN_API int pn_provision(int listen_id, int grp, int thread_count);
// gid_tid = (gid<<8) | tid
PN_API int pn_send(uint64_t gtid, struct sockaddr_storage* sock_addr, const pn_pkt_t* pkt, uint32_t* pkt_id_ret);
- PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us);
+ PN_API void* pn_resp_pre_alloc(uint64_t req_id, int64_t sz);
+ PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t hdr_sz, int64_t payload_sz, int64_t resp_expired_abs_us);
PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr);
PN_API int pn_ratelimit(int grp_id, int64_t value);
PN_API int64_t pn_get_ratelimit(int grp_id);
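
Taken together, a server handler drives the revised API roughly as follows (a hedged sketch: the two signatures mirror the header above, but handle_request and the stub bodies are mocks invented here so the example is self-contained; in the real code the buffer is released by the flush callback, not by the caller):

    #include <cstdint>
    #include <cstdio>
    #include <cstdlib>

    static void* pn_resp_pre_alloc(uint64_t req_id, int64_t sz) {  // mock stub
      (void)req_id; return malloc((size_t)sz);
    }
    static int pn_resp(uint64_t req_id, const char* buf, int64_t hdr_sz,
                       int64_t payload_sz, int64_t expire_us) {    // mock stub
      (void)req_id; (void)buf; (void)expire_us;
      printf("send hdr=%ld payload=%ld\n", (long)hdr_sz, (long)payload_sz);
      return 0;
    }

    int handle_request(uint64_t resp_id, int64_t hdr_sz, int64_t payload_sz) {
      // one allocation covers the header room plus the payload, encoded in place
      char* buf = (char*)pn_resp_pre_alloc(resp_id, hdr_sz + payload_sz);
      if (buf == NULL) {
        // hdr_sz == 0 asks pn_resp to build the response descriptor itself;
        // this is also the error/NULL-response shape used by serve_cb
        return pn_resp(resp_id, NULL, 0, 0, -1);
      }
      // ... serialize headers into buf[0, hdr_sz) and the body after them ...
      return pn_resp(resp_id, buf, hdr_sz, payload_sz, -1); // freed by flush cb in real code
    }

    int main() { return handle_request(0, 128, 512); }
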

View File

@@ -413,9 +413,9 @@ int ObLLVMDIHelper::get_current_scope(ObLLVMDIScope &scope)
ret = OB_NOT_INIT;
LOG_WARN("jc is NULL", K(ret));
} else if (NULL != jc_->sp_) {
- scope.set_v(jc_->sp_);
- } else {
- scope.set_v(jc_->file_);
+ scope.set_v(jc_->sp_);
+ } else {
+ scope.set_v(jc_->file_);
}
return ret;
}