From 94cbff5e2716697963d864f5c842de5297097b7b Mon Sep 17 00:00:00 2001 From: liucc1997 <1192520566@qq.com> Date: Mon, 9 Dec 2024 07:45:09 +0000 Subject: [PATCH] [CP] Optimize the memory usage of RPC requests --- .../oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp | 54 ++++++++++++++----- deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h | 5 +- deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.cpp | 8 +-- deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.h | 7 ++- deps/oblib/src/rpc/pnio/interface/group.c | 38 ++++++++++--- deps/oblib/src/rpc/pnio/interface/group.h | 3 +- src/objit/src/ob_llvm_di_helper.cpp | 6 +-- 7 files changed, 91 insertions(+), 30 deletions(-) diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp index d333c1c9fe..22c538be7c 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp @@ -89,7 +89,7 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s } IGNORE_RETURN snprintf(rpc_timeguard_str, sizeof(rpc_timeguard_str), "sz=%ld,pcode=%x,id=%ld", sz, pcode, tenant_id); timeguard.click(rpc_timeguard_str); - ObRpcMemPool* pool = ObRpcMemPool::create(tenant_id, pcode_label, pool_size); + ObRpcMemPool* pool = ObRpcMemPool::create(tenant_id, pcode_label, pool_size, ObRpcMemPool::RPC_CACHE_SIZE); void *temp = NULL; #ifdef ERRSIM @@ -147,29 +147,59 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s return ret; } +void* ObPocServerHandleContext::alloc(int64_t sz) { + resp_ptr_ = pn_resp_pre_alloc(resp_id_, sz); + return resp_ptr_; +} + void ObPocServerHandleContext::resp(ObRpcPacket* pkt) { int ret = OB_SUCCESS; int sys_err = 0; char reserve_buf[2048]; // reserve stack memory for response packet buf - char* buf = reserve_buf; - int64_t sz = 0; + char* buff = NULL; + int64_t resp_hdr_size = 0; + int64_t resp_buf_size = 0; + int64_t rpc_header_size = ObRpcPacket::get_header_size(); + int pkt_hdr_size = sizeof(ObRpcPacket) + OB_NET_HEADER_LENGTH + rpc_header_size; + int64_t pos = 0; + char* pkt_ptr = reinterpret_cast(pkt); char rpc_timeguard_str[ObPocRpcServer::RPC_TIMEGUARD_STRING_SIZE] = {'\0'}; ObTimeGuard timeguard("rpc_resp", 10 * 1000); if (NULL == pkt) { // do nothing - } else if (OB_FAIL(rpc_encode_ob_packet(pool_, pkt, buf, sz, sizeof(reserve_buf)))) { - RPC_LOG(WARN, "rpc_encode_ob_packet fail", KP(pkt), K(sz)); - buf = NULL; - sz = 0; + } else if (OB_UNLIKELY(pkt_ptr != resp_ptr_)) { + // response error packet using temporary memory + int64_t sz = 0; + char* tmp_buf = reserve_buf; + if (OB_FAIL(rpc_encode_ob_packet(pool_, pkt, tmp_buf, sz, sizeof(reserve_buf)))) { + RPC_LOG(WARN, "rpc_encode_ob_packet fail", KP(pkt), K(sz)); + } else { + buff = tmp_buf; + } + resp_hdr_size = 0; + resp_buf_size = sz; + } else if (OB_FAIL(pkt->encode_header(pkt_ptr + sizeof(ObRpcPacket) + OB_NET_HEADER_LENGTH, rpc_header_size, pos))) { + RPC_LOG(WARN, "encode pkt header fail", K(*pkt), K(rpc_header_size), K(pos)); } else { + /* + * RPC response packet buffer format + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * | ObRpcPacket | easy header | RPC header | rcode | RPC response | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + */ + // ObRpcPacket is not used in pkt-nio, pn_resp will use this buff to allocate pn_resp_t struct + buff = pkt_ptr; + resp_hdr_size = sizeof(ObRpcPacket) + OB_NET_HEADER_LENGTH; + resp_buf_size = pkt->get_encoded_size(); IGNORE_RETURN snprintf(rpc_timeguard_str, sizeof(rpc_timeguard_str), "sz=%ld,pcode=%x,id=%ld", - sz, - pkt->get_pcode(), - pkt->get_tenant_id()); + resp_buf_size, + pkt->get_pcode(), + pkt->get_tenant_id()); } timeguard.click(rpc_timeguard_str); - if ((sys_err = pn_resp(resp_id_, buf, sz, resp_expired_abs_us_)) != 0) { + if ((sys_err = pn_resp(resp_id_, buff, resp_hdr_size, resp_buf_size, resp_expired_abs_us_)) != 0) { + ret = tranlate_to_ob_error(sys_err); RPC_LOG(WARN, "pn_resp fail", K(resp_id_), K(sys_err)); } } @@ -250,7 +280,7 @@ int serve_cb(int grp, const char* b, int64_t sz, uint64_t resp_id) if (OB_SUCCESS != tmp_ret) { if (OB_TMP_FAIL(ObPocServerHandleContext::resp_error(resp_id, tmp_ret, b, sz))) { int sys_err = 0; - if ((sys_err = pn_resp(resp_id, NULL, 0, OB_INVALID_TIMESTAMP)) != 0) { + if ((sys_err = pn_resp(resp_id, NULL, 0, 0, OB_INVALID_TIMESTAMP)) != 0) { RPC_LOG(WARN, "pn_resp fail", K(resp_id), K(sys_err)); } } diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h index 41321a5403..45b3de4e62 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h @@ -33,7 +33,7 @@ public: OBCG_ELECTION = 2 }; // same as src/share/resource_manager/ob_group_list.h ObPocServerHandleContext(ObRpcMemPool& pool, uint64_t resp_id, int64_t resp_expired_abs_us): - pool_(pool), resp_id_(resp_id), resp_expired_abs_us_(resp_expired_abs_us), peer_() + pool_(pool), resp_id_(resp_id), resp_expired_abs_us_(resp_expired_abs_us), peer_(), resp_ptr_(NULL) {} ~ObPocServerHandleContext() { destroy(); @@ -44,7 +44,7 @@ public: static int resp_error(uint64_t resp_id, int err_code, const char* b, const int64_t sz); ObAddr get_peer(); void set_peer_unsafe(); // This function can only be called from the pnio thread. - void* alloc(int64_t sz) { return pool_.alloc(sz); } + void* alloc(int64_t sz); void set_resp_expired_time(int64_t ts) { resp_expired_abs_us_ = ts; } int64_t get_resp_expired_time() { return resp_expired_abs_us_; } private: @@ -52,6 +52,7 @@ private: uint64_t resp_id_; int64_t resp_expired_abs_us_; ObAddr peer_; + void* resp_ptr_; }; diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.cpp index b988ddb1b2..3625354d30 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.cpp @@ -50,8 +50,8 @@ static void* rpc_mem_pool_direct_alloc(int64_t tenant_id, const char* label, int return common::ob_malloc(sz, attr); } static void rpc_mem_pool_direct_free(void* p) { common::ob_free(p); } -static ObRpcMemPool::Page* rpc_mem_pool_create_page(int64_t tenant_id, const char* label, int64_t sz) { - int64_t alloc_sz = std::max(sizeof(ObRpcMemPool::Page) + sz, (uint64_t)ObRpcMemPool::RPC_POOL_PAGE_SIZE); +static ObRpcMemPool::Page* rpc_mem_pool_create_page(int64_t tenant_id, const char* label, int64_t sz, int64_t cache_sz = ObRpcMemPool::RPC_POOL_PAGE_SIZE) { + int64_t alloc_sz = std::max(sizeof(ObRpcMemPool::Page) + sz, (uint64_t)cache_sz); ObRpcMemPool::Page* page = (typeof(page))rpc_mem_pool_direct_alloc(tenant_id, label, alloc_sz); if (OB_ISNULL(page)) { LOG_WARN_RET(common::OB_ALLOCATE_MEMORY_FAILED, "rpc memory pool alloc memory failed", K(sz), K(alloc_sz)); @@ -67,11 +67,11 @@ static void rpc_mem_pool_destroy_page(ObRpcMemPool::Page* page) { } } -ObRpcMemPool* ObRpcMemPool::create(int64_t tenant_id, const char* label, int64_t req_sz) +ObRpcMemPool* ObRpcMemPool::create(int64_t tenant_id, const char* label, int64_t req_sz, int64_t cache_sz) { Page* page = nullptr; ObRpcMemPool* pool = nullptr; - if (OB_NOT_NULL(page = rpc_mem_pool_create_page(tenant_id, label, req_sz + sizeof(ObRpcMemPool)))) { + if (OB_NOT_NULL(page = rpc_mem_pool_create_page(tenant_id, label, req_sz + sizeof(ObRpcMemPool), cache_sz))) { if (OB_NOT_NULL(pool = (typeof(pool))page->alloc(sizeof(ObRpcMemPool)))) { new(pool)ObRpcMemPool(tenant_id, label); // can not be null pool->add_page(page); diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.h b/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.h index d26e1a12f1..0074ea15d4 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.h @@ -21,12 +21,15 @@ namespace obrpc class ObRpcMemPool { public: - enum { RPC_POOL_PAGE_SIZE = (1<<14) - 128}; + enum { + RPC_POOL_PAGE_SIZE = (1<<14) - 128, + RPC_CACHE_SIZE = 3968 + }; struct Page; explicit ObRpcMemPool(): last_(NULL), tenant_id_(OB_INVALID_TENANT_ID), mem_label_("RpcDefault") {} explicit ObRpcMemPool(int64_t tenant_id, const char* label): last_(NULL), tenant_id_(tenant_id), mem_label_(label) {} ~ObRpcMemPool() { destroy(); } - static ObRpcMemPool* create(int64_t tenant_id, const char* label, int64_t req_sz); + static ObRpcMemPool* create(int64_t tenant_id, const char* label, int64_t req_sz, int64_t cache_sz = ObRpcMemPool::RPC_POOL_PAGE_SIZE); void* alloc(int64_t sz); void set_tenant_id(int64_t tenant_id) { tenant_id_ = tenant_id; } void reuse(); diff --git a/deps/oblib/src/rpc/pnio/interface/group.c b/deps/oblib/src/rpc/pnio/interface/group.c index 0b95df06f7..5677f277df 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.c +++ b/deps/oblib/src/rpc/pnio/interface/group.c @@ -512,6 +512,7 @@ typedef struct pn_resp_ctx_t void* req_handle; uint64_t sock_id; uint64_t pkt_id; + void* resp_ptr; char reserve[sizeof(pkts_req_t)]; } pn_resp_ctx_t; @@ -525,6 +526,7 @@ static pn_resp_ctx_t* create_resp_ctx(pn_t* pn, void* req_handle, uint64_t sock_ ctx->req_handle = req_handle; ctx->sock_id = sock_id; ctx->pkt_id = pkt_id; + ctx->resp_ptr = NULL; } return ctx; } @@ -557,8 +559,11 @@ static void pn_pkts_flush_cb_func(pkts_req_t* req) if ((uint64_t)resp->ctx->reserve == (uint64_t)resp) { fifo_free(resp->ctx); } else { + void* resp_ptr = resp->ctx->resp_ptr; fifo_free(resp->ctx); - cfifo_free(resp); + if (likely(resp_ptr)) { + cfifo_free(resp_ptr); + } } } @@ -567,15 +572,35 @@ static void pn_pkts_flush_cb_error_func(pkts_req_t* req) pn_resp_ctx_t* ctx = (typeof(ctx))structof(req, pn_resp_ctx_t, reserve); fifo_free(ctx); } +PN_API void* pn_resp_pre_alloc(uint64_t req_id, int64_t sz) +{ + void* p = NULL; + pn_resp_ctx_t* ctx = (typeof(ctx))req_id; + if (likely(ctx)) { + if (unlikely(ctx->resp_ptr != NULL)) { + int err = PNIO_ERROR; + rk_error("pn_resp_pre_alloc might has been executed, it is unexpected, ctx=%p, resp_ptr=%p", ctx, ctx->resp_ptr); + cfifo_free(ctx->resp_ptr); + } + p = cfifo_alloc(&ctx->pn->server_resp_alloc, sz); + ctx->resp_ptr = p; + } + return p; +} -PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us) +PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t hdr_sz, int64_t payload_sz, int64_t resp_expired_abs_us) { pn_resp_ctx_t* ctx = (typeof(ctx))req_id; pn_resp_t* resp = NULL; - if (sizeof(pn_resp_t) + sz <= sizeof(ctx->reserve)) { - resp = (typeof(resp))(ctx->reserve); + if (unlikely(0 == hdr_sz)) { // response null or response error + if (ctx->resp_ptr) { + cfifo_free(ctx->resp_ptr); + } + ctx->resp_ptr = cfifo_alloc(&ctx->pn->server_resp_alloc, sizeof(*resp) + payload_sz); + resp = (typeof(resp))ctx->resp_ptr; } else { - resp = (typeof(resp))cfifo_alloc(&ctx->pn->server_resp_alloc, sizeof(*resp) + sz); + assert(hdr_sz >= sizeof(*resp)); + resp = (typeof(resp))(buf + hdr_sz - sizeof(*resp)); } pkts_req_t* r = NULL; if (NULL != resp) { @@ -586,8 +611,9 @@ PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_ex r->sock_id = ctx->sock_id; r->categ_id = 0; r->expire_us = resp_expired_abs_us; - eh_copy_msg(&r->msg, ctx->pkt_id, buf, sz); + eh_copy_msg(&r->msg, ctx->pkt_id, buf + hdr_sz, payload_sz); } else { + rk_warn("allocate memory for pn_resp_t failed"); r = (typeof(r))(ctx->reserve); r->errcode = ENOMEM; r->flush_cb = pn_pkts_flush_cb_error_func; diff --git a/deps/oblib/src/rpc/pnio/interface/group.h b/deps/oblib/src/rpc/pnio/interface/group.h index 23c4f532ca..1323f56f75 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.h +++ b/deps/oblib/src/rpc/pnio/interface/group.h @@ -79,7 +79,8 @@ PN_API int pn_listen(int port, serve_cb_t cb); PN_API int pn_provision(int listen_id, int grp, int thread_count); // gid_tid = (gid<<8) | tid PN_API int pn_send(uint64_t gtid, struct sockaddr_storage* sock_addr, const pn_pkt_t* pkt, uint32_t* pkt_id_ret); -PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us); +PN_API void* pn_resp_pre_alloc(uint64_t req_id, int64_t sz); +PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t hdr_sz, int64_t payload_sz, int64_t resp_expired_abs_us); PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr); PN_API int pn_ratelimit(int grp_id, int64_t value); PN_API int64_t pn_get_ratelimit(int grp_id); diff --git a/src/objit/src/ob_llvm_di_helper.cpp b/src/objit/src/ob_llvm_di_helper.cpp index 5b9e38f84b..fec3762ef9 100644 --- a/src/objit/src/ob_llvm_di_helper.cpp +++ b/src/objit/src/ob_llvm_di_helper.cpp @@ -413,9 +413,9 @@ int ObLLVMDIHelper::get_current_scope(ObLLVMDIScope &scope) ret = OB_NOT_INIT; LOG_WARN("jc is NULL", K(ret)); } else if (NULL != jc_->sp_) { - scope.set_v(jc_->sp_); - } else { - scope.set_v(jc_->file_); + scope.set_v(jc_->sp_); + } else { + scope.set_v(jc_->file_); } return ret; }