From 4ead928e534c43f1b329c447df50f8bd6d92caed Mon Sep 17 00:00:00 2001 From: liucc1997 <1192520566@qq.com> Date: Thu, 11 May 2023 10:19:11 +0000 Subject: [PATCH] enable pkt-nio to separate RPC memory under tenants --- deps/oblib/src/rpc/frame/ob_net_easy.cpp | 2 +- .../rpc/obrpc/ob_poc_rpc_request_operator.cpp | 6 ++--- .../oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp | 25 ++++++++++++++++--- deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h | 1 + deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.cpp | 7 ++---- deps/oblib/src/rpc/pnio/interface/group.c | 21 ++++++++++++++++ deps/oblib/src/rpc/pnio/interface/group.h | 1 + src/share/config/ob_server_config.cpp | 2 +- src/share/parameter/ob_parameter_seed.ipp | 2 +- 9 files changed, 51 insertions(+), 16 deletions(-) diff --git a/deps/oblib/src/rpc/frame/ob_net_easy.cpp b/deps/oblib/src/rpc/frame/ob_net_easy.cpp index 09e886f49..1df455abb 100644 --- a/deps/oblib/src/rpc/frame/ob_net_easy.cpp +++ b/deps/oblib/src/rpc/frame/ob_net_easy.cpp @@ -817,7 +817,7 @@ int ObNetEasy::start() } if (OB_SUCC(ret) && rpc_port_ > 0) { - global_ob_listener = &rpc_listener_; + ATOMIC_STORE(&global_ob_listener, &rpc_listener_); if (!global_poc_server.has_start()) { if (OB_FAIL(rpc_listener_.listen_create(rpc_port_))) { LOG_ERROR("create listen failed", K(ret)); diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_request_operator.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_request_operator.cpp index 0a32a3deb..783b6c5b0 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_request_operator.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_request_operator.cpp @@ -18,7 +18,7 @@ namespace oceanbase { namespace obrpc { -ObPocServerHandleContext* get_poc_handle_context(rpc::ObRequest* req) +ObPocServerHandleContext* get_poc_handle_context(const rpc::ObRequest* req) { return (ObPocServerHandleContext*)req->get_server_handle_context(); } @@ -37,9 +37,7 @@ void ObPocRpcRequestOperator::response_result(ObRequest* req, obrpc::ObRpcPacket ObAddr ObPocRpcRequestOperator::get_peer(const ObRequest* req) { - UNUSED(req); - ObAddr addr; - return addr; + return get_poc_handle_context(req)->get_peer(); } }; // end namespace obrpc diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp index fb0d808a0..c9d8e5a64 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp @@ -124,6 +124,22 @@ void ObPocServerHandleContext::resp(ObRpcPacket* pkt) } } +ObAddr ObPocServerHandleContext::get_peer() +{ + ObAddr addr; + struct sockaddr_storage sock_addr; + if (0 == pn_get_peer(resp_id_, &sock_addr)) { + if (AF_INET == sock_addr.ss_family) { + struct sockaddr_in *sin = reinterpret_cast(&sock_addr); + addr.set_ipv4_addr(ntohl(sin->sin_addr.s_addr), ntohs(sin->sin_port)); + } else if (AF_INET6 == sock_addr.ss_family) { + struct sockaddr_in6 *sin6 = reinterpret_cast(&sock_addr); + addr.set_ipv6_addr(&sin6->sin6_addr.s6_addr, ntohs(sin6->sin6_port)); + } + } + return addr; +} + int serve_cb(int grp, const char* b, int64_t sz, uint64_t resp_id) { int ret = OB_SUCCESS; @@ -155,18 +171,19 @@ int ObPocRpcServer::start(int port, int net_thread_count, frame::ObReqDeliver* d int ret = OB_SUCCESS; // init pkt-nio framework int lfd = -1; + int rl_net_thread_count = max(1, net_thread_count/4); if ((lfd = pn_listen(port, serve_cb)) == -1) { ret = OB_SERVER_LISTEN_ERROR; RPC_LOG(ERROR, "pn_listen failed", K(ret)); } else { - global_deliver = deliver; + ATOMIC_STORE(&global_deliver, deliver); int count = 0; if ((count = pn_provision(lfd, DEFAULT_PNIO_GROUP, net_thread_count)) != net_thread_count) { ret = OB_ERR_SYS; RPC_LOG(WARN, "pn_provision error", K(count), K(net_thread_count)); - } else if((count = pn_provision(lfd, RATELIMIT_PNIO_GROUP, net_thread_count)) != net_thread_count) { + } else if((count = pn_provision(lfd, RATELIMIT_PNIO_GROUP, rl_net_thread_count)) != rl_net_thread_count) { ret = OB_ERR_SYS; - RPC_LOG(WARN, "pn_provision for RATELIMIT_PNIO_GROUP error", K(count), K(net_thread_count)); + RPC_LOG(WARN, "pn_provision for RATELIMIT_PNIO_GROUP error", K(count), K(rl_net_thread_count)); } else { has_start_ = true; } @@ -218,7 +235,7 @@ bool server_in_black(struct sockaddr* sa) { } int dispatch_to_ob_listener(int accept_fd) { int ret = -1; - if (oceanbase::obrpc::global_ob_listener) { + if (OB_NOT_NULL(ATOMIC_LOAD(&oceanbase::obrpc::global_ob_listener))) { ret = oceanbase::obrpc::global_ob_listener->do_one_event(accept_fd); } return ret; diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h index 457022c1a..eecf8dba6 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h @@ -37,6 +37,7 @@ public: static int create(int64_t resp_id, const char* buf, int64_t sz, rpc::ObRequest*& req); void destroy() { pool_.destroy(); } void resp(ObRpcPacket* pkt); + ObAddr get_peer(); void* alloc(int64_t sz) { return pool_.alloc(sz); } private: ObRpcMemPool& pool_; diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.cpp index 3ed98ba6c..5ae6ebe92 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_mem_pool.cpp @@ -42,13 +42,10 @@ static void* rpc_mem_pool_direct_alloc(int64_t tenant_id, const char* label, int tenant_id = OB_SERVER_TENANT_ID; } ObMemAttr attr(tenant_id, label, common::ObCtxIds::RPC_CTX_ID); - auto* ret = common::ob_malloc(sz, attr); - if (OB_ISNULL(ret) - && OB_ISNULL(lib::ObMallocAllocator::get_instance()->get_tenant_ctx_allocator(tenant_id, common::ObCtxIds::RPC_CTX_ID))) { + if (OB_ISNULL(lib::ObMallocAllocator::get_instance()->get_tenant_ctx_allocator_without_tlcache(tenant_id, common::ObCtxIds::RPC_CTX_ID))) { attr.tenant_id_ = OB_SERVER_TENANT_ID; - ret = common::ob_malloc(sz, attr); } - return ret; + return common::ob_malloc(sz, attr); } static void rpc_mem_pool_direct_free(void* p) { common::ob_free(p); } static ObRpcMemPool::Page* rpc_mem_pool_create_page(int64_t tenant_id, const char* label, int64_t sz) { diff --git a/deps/oblib/src/rpc/pnio/interface/group.c b/deps/oblib/src/rpc/pnio/interface/group.c index 3a081592a..f458a2a2f 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.c +++ b/deps/oblib/src/rpc/pnio/interface/group.c @@ -494,6 +494,27 @@ PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz) return pkts_resp(pkts, r); } +PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr) { + int err = 0; + pn_resp_ctx_t* ctx = (typeof(ctx))req_id; + if (unlikely(NULL == ctx || NULL == addr)) { + err = -EINVAL; + rk_warn("invalid arguments, req_id=%p", ctx); + } else { + pkts_t* pkts = &ctx->pn->pkts; + pkts_sk_t* sock = (typeof(sock))idm_get(&pkts->sk_map, ctx->sock_id); + socklen_t sa_len = sizeof(struct sockaddr_storage); + if (unlikely(NULL == sock)) { + err = -EINVAL; + rk_warn("idm_get sock failed, sock_id=%lx", ctx->sock_id); + } else if (0 != getpeername(sock->fd, (struct sockaddr*)addr, &sa_len)) { + err = -EIO; + rk_warn("getpeername failed, fd=%d, errno=%d", sock->fd, errno); + } + } + return err; +} + PN_API int pn_ratelimit(int grp_id, int64_t value) { int err = 0; pn_grp_t* pn_grp = locate_grp(grp_id); diff --git a/deps/oblib/src/rpc/pnio/interface/group.h b/deps/oblib/src/rpc/pnio/interface/group.h index d8ea23bdf..f2ab9c9fb 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.h +++ b/deps/oblib/src/rpc/pnio/interface/group.h @@ -57,6 +57,7 @@ PN_API int pn_provision(int listen_id, int grp, int thread_count); // gid_tid = (gid<<8) | tid PN_API int pn_send(uint64_t gid_tid, struct sockaddr_in* addr, const char* buf, int64_t sz, int16_t categ_id, int64_t expire_us, client_cb_t cb, void* arg); PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz); +PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr); PN_API int pn_ratelimit(int grp_id, int64_t value); PN_API int64_t pn_get_ratelimit(int grp_id); PN_API uint64_t pn_get_rxbytes(int grp_id); diff --git a/src/share/config/ob_server_config.cpp b/src/share/config/ob_server_config.cpp index ea740eaf4..af459f1c2 100644 --- a/src/share/config/ob_server_config.cpp +++ b/src/share/config/ob_server_config.cpp @@ -393,7 +393,7 @@ OB_DEF_SERIALIZE_SIZE(ObServerConfig) } // end of namespace common namespace obrpc { bool enable_pkt_nio() { - return GCONF._enable_pkt_nio && GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0; + return GCONF._enable_pkt_nio && GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0; } } } // end of namespace oceanbase diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 4b2a13036..d88628eb7 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -712,7 +712,7 @@ DEF_TIME(_ob_get_gts_ahead_interval, OB_CLUSTER_PARAMETER, "0s", "[0s, 1s]", DEF_TIME(rpc_timeout, OB_CLUSTER_PARAMETER, "2s", "the time during which a RPC request is permitted to execute before it is terminated", ObParameterAttr(Section::RPC, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); -DEF_BOOL(_enable_pkt_nio, OB_CLUSTER_PARAMETER, "False", +DEF_BOOL(_enable_pkt_nio, OB_CLUSTER_PARAMETER, "True", "enable pkt-nio, the new RPC framework" "Value: True:turned on; False: turned off", ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));