enable pkt-nio to separate RPC memory under tenants

This commit is contained in:
liucc1997 2023-05-11 10:19:11 +00:00 committed by ob-robot
parent caaed5132f
commit 4ead928e53
9 changed files with 51 additions and 16 deletions

View File

@ -817,7 +817,7 @@ int ObNetEasy::start()
}
if (OB_SUCC(ret) && rpc_port_ > 0) {
global_ob_listener = &rpc_listener_;
ATOMIC_STORE(&global_ob_listener, &rpc_listener_);
if (!global_poc_server.has_start()) {
if (OB_FAIL(rpc_listener_.listen_create(rpc_port_))) {
LOG_ERROR("create listen failed", K(ret));

View File

@ -18,7 +18,7 @@ namespace oceanbase
{
namespace obrpc
{
ObPocServerHandleContext* get_poc_handle_context(rpc::ObRequest* req)
ObPocServerHandleContext* get_poc_handle_context(const rpc::ObRequest* req)
{
return (ObPocServerHandleContext*)req->get_server_handle_context();
}
@ -37,9 +37,7 @@ void ObPocRpcRequestOperator::response_result(ObRequest* req, obrpc::ObRpcPacket
ObAddr ObPocRpcRequestOperator::get_peer(const ObRequest* req)
{
UNUSED(req);
ObAddr addr;
return addr;
return get_poc_handle_context(req)->get_peer();
}
}; // end namespace obrpc

View File

@ -124,6 +124,22 @@ void ObPocServerHandleContext::resp(ObRpcPacket* pkt)
}
}
ObAddr ObPocServerHandleContext::get_peer()
{
ObAddr addr;
struct sockaddr_storage sock_addr;
if (0 == pn_get_peer(resp_id_, &sock_addr)) {
if (AF_INET == sock_addr.ss_family) {
struct sockaddr_in *sin = reinterpret_cast<struct sockaddr_in *>(&sock_addr);
addr.set_ipv4_addr(ntohl(sin->sin_addr.s_addr), ntohs(sin->sin_port));
} else if (AF_INET6 == sock_addr.ss_family) {
struct sockaddr_in6 *sin6 = reinterpret_cast<struct sockaddr_in6 *>(&sock_addr);
addr.set_ipv6_addr(&sin6->sin6_addr.s6_addr, ntohs(sin6->sin6_port));
}
}
return addr;
}
int serve_cb(int grp, const char* b, int64_t sz, uint64_t resp_id)
{
int ret = OB_SUCCESS;
@ -155,18 +171,19 @@ int ObPocRpcServer::start(int port, int net_thread_count, frame::ObReqDeliver* d
int ret = OB_SUCCESS;
// init pkt-nio framework
int lfd = -1;
int rl_net_thread_count = max(1, net_thread_count/4);
if ((lfd = pn_listen(port, serve_cb)) == -1) {
ret = OB_SERVER_LISTEN_ERROR;
RPC_LOG(ERROR, "pn_listen failed", K(ret));
} else {
global_deliver = deliver;
ATOMIC_STORE(&global_deliver, deliver);
int count = 0;
if ((count = pn_provision(lfd, DEFAULT_PNIO_GROUP, net_thread_count)) != net_thread_count) {
ret = OB_ERR_SYS;
RPC_LOG(WARN, "pn_provision error", K(count), K(net_thread_count));
} else if((count = pn_provision(lfd, RATELIMIT_PNIO_GROUP, net_thread_count)) != net_thread_count) {
} else if((count = pn_provision(lfd, RATELIMIT_PNIO_GROUP, rl_net_thread_count)) != rl_net_thread_count) {
ret = OB_ERR_SYS;
RPC_LOG(WARN, "pn_provision for RATELIMIT_PNIO_GROUP error", K(count), K(net_thread_count));
RPC_LOG(WARN, "pn_provision for RATELIMIT_PNIO_GROUP error", K(count), K(rl_net_thread_count));
} else {
has_start_ = true;
}
@ -218,7 +235,7 @@ bool server_in_black(struct sockaddr* sa) {
}
int dispatch_to_ob_listener(int accept_fd) {
int ret = -1;
if (oceanbase::obrpc::global_ob_listener) {
if (OB_NOT_NULL(ATOMIC_LOAD(&oceanbase::obrpc::global_ob_listener))) {
ret = oceanbase::obrpc::global_ob_listener->do_one_event(accept_fd);
}
return ret;

View File

@ -37,6 +37,7 @@ public:
static int create(int64_t resp_id, const char* buf, int64_t sz, rpc::ObRequest*& req);
void destroy() { pool_.destroy(); }
void resp(ObRpcPacket* pkt);
ObAddr get_peer();
void* alloc(int64_t sz) { return pool_.alloc(sz); }
private:
ObRpcMemPool& pool_;

View File

@ -42,13 +42,10 @@ static void* rpc_mem_pool_direct_alloc(int64_t tenant_id, const char* label, int
tenant_id = OB_SERVER_TENANT_ID;
}
ObMemAttr attr(tenant_id, label, common::ObCtxIds::RPC_CTX_ID);
auto* ret = common::ob_malloc(sz, attr);
if (OB_ISNULL(ret)
&& OB_ISNULL(lib::ObMallocAllocator::get_instance()->get_tenant_ctx_allocator(tenant_id, common::ObCtxIds::RPC_CTX_ID))) {
if (OB_ISNULL(lib::ObMallocAllocator::get_instance()->get_tenant_ctx_allocator_without_tlcache(tenant_id, common::ObCtxIds::RPC_CTX_ID))) {
attr.tenant_id_ = OB_SERVER_TENANT_ID;
ret = common::ob_malloc(sz, attr);
}
return ret;
return common::ob_malloc(sz, attr);
}
static void rpc_mem_pool_direct_free(void* p) { common::ob_free(p); }
static ObRpcMemPool::Page* rpc_mem_pool_create_page(int64_t tenant_id, const char* label, int64_t sz) {

View File

@ -494,6 +494,27 @@ PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz)
return pkts_resp(pkts, r);
}
PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr) {
int err = 0;
pn_resp_ctx_t* ctx = (typeof(ctx))req_id;
if (unlikely(NULL == ctx || NULL == addr)) {
err = -EINVAL;
rk_warn("invalid arguments, req_id=%p", ctx);
} else {
pkts_t* pkts = &ctx->pn->pkts;
pkts_sk_t* sock = (typeof(sock))idm_get(&pkts->sk_map, ctx->sock_id);
socklen_t sa_len = sizeof(struct sockaddr_storage);
if (unlikely(NULL == sock)) {
err = -EINVAL;
rk_warn("idm_get sock failed, sock_id=%lx", ctx->sock_id);
} else if (0 != getpeername(sock->fd, (struct sockaddr*)addr, &sa_len)) {
err = -EIO;
rk_warn("getpeername failed, fd=%d, errno=%d", sock->fd, errno);
}
}
return err;
}
PN_API int pn_ratelimit(int grp_id, int64_t value) {
int err = 0;
pn_grp_t* pn_grp = locate_grp(grp_id);

View File

@ -57,6 +57,7 @@ PN_API int pn_provision(int listen_id, int grp, int thread_count);
// gid_tid = (gid<<8) | tid
PN_API int pn_send(uint64_t gid_tid, struct sockaddr_in* addr, const char* buf, int64_t sz, int16_t categ_id, int64_t expire_us, client_cb_t cb, void* arg);
PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz);
PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr);
PN_API int pn_ratelimit(int grp_id, int64_t value);
PN_API int64_t pn_get_ratelimit(int grp_id);
PN_API uint64_t pn_get_rxbytes(int grp_id);

View File

@ -393,7 +393,7 @@ OB_DEF_SERIALIZE_SIZE(ObServerConfig)
} // end of namespace common
namespace obrpc {
bool enable_pkt_nio() {
return GCONF._enable_pkt_nio && GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0;
return GCONF._enable_pkt_nio && GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0;
}
}
} // end of namespace oceanbase

View File

@ -712,7 +712,7 @@ DEF_TIME(_ob_get_gts_ahead_interval, OB_CLUSTER_PARAMETER, "0s", "[0s, 1s]",
DEF_TIME(rpc_timeout, OB_CLUSTER_PARAMETER, "2s",
"the time during which a RPC request is permitted to execute before it is terminated",
ObParameterAttr(Section::RPC, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_enable_pkt_nio, OB_CLUSTER_PARAMETER, "False",
DEF_BOOL(_enable_pkt_nio, OB_CLUSTER_PARAMETER, "True",
"enable pkt-nio, the new RPC framework"
"Value: True:turned on; False: turned off",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));