From 29a3065d02887fc471a8a12cef7501b4ade0bf5f Mon Sep 17 00:00:00 2001 From: liucc1997 <1192520566@qq.com> Date: Thu, 18 Jan 2024 07:42:13 +0000 Subject: [PATCH] add diagnostic log to show the time cost and traffic of each rpc step --- deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h | 4 +- .../oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp | 15 ++++++- deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h | 1 + deps/oblib/src/rpc/pnio/interface/group.c | 41 +++++++++++++++++++ deps/oblib/src/rpc/pnio/interface/group.h | 1 + deps/oblib/src/rpc/pnio/io/eloop.c | 24 +++++++---- deps/oblib/src/rpc/pnio/nio/decode.t.h | 28 ++++++++++++- deps/oblib/src/rpc/pnio/nio/handle_io.t.h | 9 ++-- deps/oblib/src/rpc/pnio/nio/packet_client.c | 1 + deps/oblib/src/rpc/pnio/nio/packet_client.h | 5 ++- deps/oblib/src/rpc/pnio/nio/packet_server.c | 13 +++++- deps/oblib/src/rpc/pnio/nio/packet_server.h | 6 ++- deps/oblib/src/rpc/pnio/nio/pktc_post.h | 20 +++++++-- deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h | 3 ++ deps/oblib/src/rpc/pnio/nio/pkts_post.h | 16 +++++++- deps/oblib/src/rpc/pnio/nio/pkts_sk_factory.h | 4 ++ deps/oblib/src/rpc/pnio/nio/write_queue.t.h | 4 ++ deps/oblib/src/rpc/pnio/pkt-nio.h | 2 +- deps/oblib/src/rpc/pnio/r0/debug.h | 22 ++++++++++ 19 files changed, 193 insertions(+), 26 deletions(-) diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h index c7016a98cf..27dcff1603 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h @@ -203,6 +203,7 @@ public: const int64_t start_ts = common::ObTimeUtility::current_time(); ObRpcMemPool* pool = NULL; uint64_t pnio_group_id = ObPocRpcServer::DEFAULT_PNIO_GROUP; + char rpc_timeguard_str[ObPocRpcServer::RPC_TIMEGUARD_STRING_SIZE] = {'\0'}; ObTimeGuard timeguard("poc_rpc_post", 10 * 1000); // TODO:@fangwu.lcc map proxy.group_id_ to pnio_group_id if (OB_LS_FETCH_LOG2 == pcode) { @@ -240,7 +241,8 @@ public: cb->gtid_ = (pnio_group_id<<32) + 
thread_id; pkt_id_ptr = &cb->pkt_id_; } - timeguard.click(); + IGNORE_RETURN snprintf(rpc_timeguard_str, sizeof(rpc_timeguard_str), "sz=%ld,pcode=%x,id=%ld", req_sz, pcode, src_tenant_id); + timeguard.click(rpc_timeguard_str); if (OB_SUCC(ret)) { sockaddr_in sock_addr; const pn_pkt_t pkt = { diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp index aa623b7b9e..cb82ff7b17 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp @@ -52,6 +52,7 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s int ret = OB_SUCCESS; ObPocServerHandleContext* ctx = NULL; ObRpcPacket tmp_pkt; + char rpc_timeguard_str[ObPocRpcServer::RPC_TIMEGUARD_STRING_SIZE] = {'\0'}; ObTimeGuard timeguard("rpc_request_create", 200 * 1000); const int64_t alloc_payload_sz = sz; if (OB_FAIL(tmp_pkt.decode(buf, sz))) { @@ -70,7 +71,8 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s if (OB_UNLIKELY(tmp_pkt.get_group_id() == OBCG_ELECTION)) { tenant_id = OB_SERVER_TENANT_ID; } - timeguard.click(); + IGNORE_RETURN snprintf(rpc_timeguard_str, sizeof(rpc_timeguard_str), "sz=%ld,pcode=%x,id=%ld", sz, pcode, tenant_id); + timeguard.click(rpc_timeguard_str); ObRpcMemPool* pool = ObRpcMemPool::create(tenant_id, pcode_label, pool_size); void *temp = NULL; @@ -114,8 +116,9 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s const int64_t fly_ts = receive_ts - pkt->get_timestamp(); if (fly_ts > oceanbase::common::OB_MAX_PACKET_FLY_TS && TC_REACH_TIME_INTERVAL(100 * 1000)) { + ObAddr peer = ctx->get_peer(); RPC_LOG(WARN, "PNIO packet wait too much time between proxy and server_cb", "pcode", pkt->get_pcode(), - "fly_ts", fly_ts, "send_timestamp", pkt->get_timestamp()); + "fly_ts", fly_ts, "send_timestamp", pkt->get_timestamp(), K(peer), K(sz)); } } } @@ -131,13 +134,21 @@ void 
ObPocServerHandleContext::resp(ObRpcPacket* pkt) char reserve_buf[2048]; // reserve stack memory for response packet buf char* buf = reserve_buf; int64_t sz = 0; + char rpc_timeguard_str[ObPocRpcServer::RPC_TIMEGUARD_STRING_SIZE] = {'\0'}; + ObTimeGuard timeguard("rpc_resp", 10 * 1000); if (NULL == pkt) { // do nothing } else if (OB_FAIL(rpc_encode_ob_packet(pool_, pkt, buf, sz, sizeof(reserve_buf)))) { RPC_LOG(WARN, "rpc_encode_ob_packet fail", KP(pkt), K(sz)); buf = NULL; sz = 0; + } else { + IGNORE_RETURN snprintf(rpc_timeguard_str, sizeof(rpc_timeguard_str), "sz=%ld,pcode=%x,id=%ld", + sz, + pkt->get_pcode(), + pkt->get_tenant_id()); } + timeguard.click(rpc_timeguard_str); if ((sys_err = pn_resp(resp_id_, buf, sz, resp_expired_abs_us_)) != 0) { RPC_LOG(WARN, "pn_resp fail", K(resp_id_), K(sys_err)); } diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h index 2ad14aa981..41a1059a93 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h @@ -60,6 +60,7 @@ public: RATELIMIT_PNIO_GROUP = 2, END_GROUP }; + enum { RPC_TIMEGUARD_STRING_SIZE = 64}; ObPocRpcServer() : has_start_(false), start_as_client_(false){} ~ObPocRpcServer() {} int start(int port, int net_thread_count, rpc::frame::ObReqDeliver* deliver); diff --git a/deps/oblib/src/rpc/pnio/interface/group.c b/deps/oblib/src/rpc/pnio/interface/group.c index 309343969b..595e690235 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.c +++ b/deps/oblib/src/rpc/pnio/interface/group.c @@ -303,6 +303,10 @@ static void pn_pktc_resp_cb(pktc_cb_t* cb, const char* resp, int64_t sz) if (req) { req->resp_cb = NULL; } + if (cb->sk) { + cb->sk->sk_diag_info.doing_cnt --; + cb->sk->sk_diag_info.done_cnt ++; + } PNIO_DELAY_WARN(STAT_TIME_GUARD(eloop_client_cb_count, eloop_client_cb_time)); pn_cb->client_cb(pn_cb->arg, cb->errcode, resp, sz); cfifo_free(pn_cb); @@ -330,11 +334,13 @@ static pktc_req_t* pn_create_pktc_req(pn_t* 
pn, uint64_t pkt_id, addr_t dest, co cb->resp_cb = pn_pktc_resp_cb; cb->errcode = PNIO_OK; cb->req = r; + cb->sk = NULL; r->pkt_type = PN_NORMAL_PKT; r->flush_cb = pn_pktc_flush_cb; r->resp_cb = cb; r->dest = dest; r->categ_id = pkt->categ_id; + r->sk = NULL; dlink_init(&r->link); eh_copy_msg(&r->msg, cb->id, req, req_sz); return r; @@ -648,3 +654,38 @@ PN_API int pn_get_fd(uint64_t req_id) } return fd; } + +void pn_print_diag_info(pn_comm_t* pn_comm) { + pn_t* pn = (pn_t*)pn_comm; + int64_t client_cnt = 0; + int64_t server_cnt = 0; + // print socket diag info + dlink_for(&pn->pktc.sk_list, p) { + pktc_sk_t* s = structof(p, pktc_sk_t, list_link); + rk_info("client:%p_%s_%s_%d_%ld_%d, write_queue=%lu/%lu, write=%lu/%lu, read=%lu/%lu, doing=%lu, done=%lu, write_time=%lu, read_time=%lu, process_time=%lu", + s, T2S(addr, s->sk_diag_info.local_addr), T2S(addr, s->dest), s->fd, s->sk_diag_info.establish_time, s->conn_ok, + s->wq.cnt, s->wq.sz, + s->sk_diag_info.write_cnt, s->sk_diag_info.write_size, + s->sk_diag_info.read_cnt, s->sk_diag_info.read_size, + s->sk_diag_info.doing_cnt, s->sk_diag_info.done_cnt, + s->sk_diag_info.write_wait_time, s->sk_diag_info.read_time, s->sk_diag_info.read_process_time); + client_cnt++; + } + if (pn->pkts.sk_list.next != NULL) { + dlink_for(&pn->pkts.sk_list, p) { + pkts_sk_t* s = structof(p, pkts_sk_t, list_link); + rk_info("server:%p_%s_%d_%ld, write_queue=%lu/%lu, write=%lu/%lu, read=%lu/%lu, doing=%lu, done=%lu, write_time=%lu, read_time=%lu, process_time=%lu", + s, T2S(addr, s->peer), s->fd, s->sk_diag_info.establish_time, + s->wq.cnt, s->wq.sz, + s->sk_diag_info.write_cnt, s->sk_diag_info.write_size, + s->sk_diag_info.read_cnt, s->sk_diag_info.read_size, + s->sk_diag_info.doing_cnt, s->sk_diag_info.done_cnt, + s->sk_diag_info.write_wait_time, s->sk_diag_info.read_time, s->sk_diag_info.read_process_time); + server_cnt++; + } + } + // print pnio diag info + rk_info("client_send:%lu/%lu, client_queue_time=%lu, cnt=%ld, 
server_send:%lu/%lu, server_queue_time=%lu, cnt=%ld", + pn->pktc.diag_info.send_cnt, pn->pktc.diag_info.send_size, pn->pktc.diag_info.sc_queue_time, client_cnt, + pn->pkts.diag_info.send_cnt, pn->pkts.diag_info.send_size, pn->pkts.diag_info.sc_queue_time, server_cnt); +} diff --git a/deps/oblib/src/rpc/pnio/interface/group.h b/deps/oblib/src/rpc/pnio/interface/group.h index 039470d918..cee719f5c0 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.h +++ b/deps/oblib/src/rpc/pnio/interface/group.h @@ -93,6 +93,7 @@ PN_API int pn_terminate_pkt(uint64_t gtid, uint32_t pkt_id); extern int64_t pnio_keepalive_timeout; pn_comm_t* get_current_pnio(); void pn_release(pn_comm_t* pn_comm); +void pn_print_diag_info(pn_comm_t* pn_comm); #define PNIO_OK 0 #define PNIO_ERROR (-1) diff --git a/deps/oblib/src/rpc/pnio/io/eloop.c b/deps/oblib/src/rpc/pnio/io/eloop.c index 6943288a94..73035beea7 100644 --- a/deps/oblib/src/rpc/pnio/io/eloop.c +++ b/deps/oblib/src/rpc/pnio/io/eloop.c @@ -129,16 +129,22 @@ int eloop_run(eloop_t* ep) { } PNIO_DELAY_WARN(eloop_delay_warn(start_us, ELOOP_WARN_US)); - if (unlikely(NULL != pn && 0 == pn->tid && PNIO_REACH_TIME_INTERVAL(1000000))) { - static __thread uint64_t last_rx_bytes = 0; - static __thread uint64_t last_time = 0; - uint64_t rx_bytes = pn_get_rxbytes(pn->gid); + if (unlikely(PNIO_REACH_TIME_INTERVAL(1000000))) { int64_t cur_time_us = rk_get_us(); - uint64_t bytes = rx_bytes >= last_rx_bytes? 
rx_bytes - last_rx_bytes : 0xffffffff - last_rx_bytes + rx_bytes; - double bw = ((double)(bytes)) / (cur_time_us - last_time) * 0.95367431640625; - rk_info("[ratelimit] time: %8ld, bytes: %ld, bw: %8lf MB/s, add_ts: %ld, add_bytes: %ld\n", cur_time_us, rx_bytes, bw, cur_time_us - last_time, rx_bytes - last_rx_bytes); - last_rx_bytes = rx_bytes; - last_time = cur_time_us; + if (NULL != pn && 0 == pn->tid) { + static __thread uint64_t last_rx_bytes = 0; + static __thread uint64_t last_time = 0; + uint64_t rx_bytes = pn_get_rxbytes(pn->gid); + uint64_t bytes = rx_bytes >= last_rx_bytes? rx_bytes - last_rx_bytes : 0xffffffff - last_rx_bytes + rx_bytes; + double bw = ((double)(bytes)) / (cur_time_us - last_time) * 0.95367431640625; + rk_info("[ratelimit] time: %8ld, bytes: %ld, bw: %8lf MB/s, add_ts: %ld, add_bytes: %ld\n", cur_time_us, rx_bytes, bw, cur_time_us - last_time, rx_bytes - last_rx_bytes); + last_rx_bytes = rx_bytes; + last_time = cur_time_us; + } + // print diag info when the wall-clock second is a multiple of 60; pn may be NULL here, so check it before pn_print_diag_info dereferences it + if (NULL != pn && 0 == cur_time_us/1000000%60) { + pn_print_diag_info(pn); + } } } pn_release(pn); diff --git a/deps/oblib/src/rpc/pnio/nio/decode.t.h b/deps/oblib/src/rpc/pnio/nio/decode.t.h index c46c6de084..eb4715762b 100644 --- a/deps/oblib/src/rpc/pnio/nio/decode.t.h +++ b/deps/oblib/src/rpc/pnio/nio/decode.t.h @@ -18,12 +18,38 @@ static int my_sk_do_decode(my_sk_t* s, my_msg_t* msg, int64_t* avail_bytes) { while(0 == (err = my_sk_read(&b, s, sz, avail_bytes)) && NULL != b && (req_sz = my_decode((char*)b, sz)) > 0 && req_sz > sz) { sz = req_sz; + if (0 == s->sk_diag_info.last_read_time) { + s->sk_diag_info.last_read_time = rk_get_us(); + } } if (req_sz <= 0) { err = EINVAL; } if (0 == err) { - *msg = (my_msg_t) { .sz = req_sz, .payload = (char*)b }; + *msg = (my_msg_t) { .sz = req_sz, .payload = (char*)b, .ctime_us = s->sk_diag_info.last_read_time}; + if (NULL != b) { + int64_t read_time = 0; + int64_t cur_time = rk_get_us(); + if (0 == s->sk_diag_info.last_read_time) { + 
s->sk_diag_info.last_read_time = cur_time; + msg->ctime_us = cur_time; + } else { + read_time = cur_time - s->sk_diag_info.last_read_time; + } + if (read_time > 100*1000) { + uint64_t pcode = 0; + uint64_t tenant_id = 0; + if (req_sz > sizeof(easy_head_t) + 24) { + pcode = *(uint64_t*)((char*)b + sizeof(easy_head_t)); + tenant_id = *(uint64_t*)((char*)b + sizeof(easy_head_t) + 16); + } + rk_warn("read pkt cost too much time, read_time=%ld, pkt_size=%ld, conn=%p, pcode=0x%lx, tenant_id=%ld", read_time, req_sz, s, pcode, tenant_id); + } + s->sk_diag_info.read_cnt ++; + s->sk_diag_info.read_size += req_sz; + s->sk_diag_info.read_time += read_time; + s->sk_diag_info.last_read_time = 0; + } } return err; } diff --git a/deps/oblib/src/rpc/pnio/nio/handle_io.t.h b/deps/oblib/src/rpc/pnio/nio/handle_io.t.h index 2d69157558..b6ad1aa3ac 100644 --- a/deps/oblib/src/rpc/pnio/nio/handle_io.t.h +++ b/deps/oblib/src/rpc/pnio/nio/handle_io.t.h @@ -42,7 +42,7 @@ static int my_sk_flush(my_sk_t* s, int64_t time_limit) { int my_sk_consume(my_sk_t* s, int64_t time_limit, int64_t* avail_bytes) { int err = 0; - my_msg_t msg = (my_msg_t) { .sz = 0, .payload = NULL }; + my_msg_t msg = (my_msg_t) { .sz = 0, .payload = NULL, .ctime_us = 0}; pn_comm_t* pn = get_current_pnio(); if (avail_bytes == NULL && skt(s, IN) && LOAD(&pn->pn_grp->rx_bw) != RATE_UNLIMITED) { // push socket to ratelimit list @@ -58,8 +58,11 @@ int my_sk_consume(my_sk_t* s, int64_t time_limit, int64_t* avail_bytes) { } } else if (NULL == msg.payload) { // not read a complete package yet - } else if (0 != (err = my_sk_handle_msg(s, &msg))) { - rk_info("handle msg fail: %d", err); + } else { + s->sk_diag_info.read_process_time += (rk_get_us() - msg.ctime_us); + if (0 != (err = my_sk_handle_msg(s, &msg))) { + rk_info("handle msg fail: %d", err); + } } } return err; diff --git a/deps/oblib/src/rpc/pnio/nio/packet_client.c b/deps/oblib/src/rpc/pnio/nio/packet_client.c index d3dd4ac554..a568c7d420 100644 --- 
a/deps/oblib/src/rpc/pnio/nio/packet_client.c +++ b/deps/oblib/src/rpc/pnio/nio/packet_client.c @@ -13,6 +13,7 @@ typedef struct pktc_msg_t { int64_t sz; char* payload; + int64_t ctime_us; } pktc_msg_t; static int64_t pktc_decode(char* b, int64_t s) { return eh_decode(b, s); } static uint64_t pktc_get_id(pktc_msg_t* m) { return eh_packet_id(m->payload); } diff --git a/deps/oblib/src/rpc/pnio/nio/packet_client.h b/deps/oblib/src/rpc/pnio/nio/packet_client.h index 0259a84d67..f541fcdaf2 100644 --- a/deps/oblib/src/rpc/pnio/nio/packet_client.h +++ b/deps/oblib/src/rpc/pnio/nio/packet_client.h @@ -24,13 +24,14 @@ struct pktc_cb_t { int64_t expire_us; pktc_resp_cb_func_t resp_cb; pktc_req_t* req; + struct pktc_sk_t* sk; int errcode; }; struct pktc_req_t { int64_t pkt_type; struct pktc_sk_t* sk; - PNIO_DELAY_WARN(int64_t ctime_us); + int64_t ctime_us; pktc_flush_cb_func_t flush_cb; pktc_cb_t* resp_cb; addr_t dest; @@ -60,6 +61,7 @@ typedef struct pktc_sk_t { ibuffer_t ib; dlink_t cb_head; int64_t user_keepalive_timeout; + socket_diag_info_t sk_diag_info; } pktc_sk_t; typedef struct pktc_sf_t { @@ -79,4 +81,5 @@ typedef struct pktc_t { link_t sk_table[1024]; hash_t cb_map; link_t cb_table[1<<16]; + diag_info_t diag_info; } pktc_t; diff --git a/deps/oblib/src/rpc/pnio/nio/packet_server.c b/deps/oblib/src/rpc/pnio/nio/packet_server.c index ee39523719..ac2142f841 100644 --- a/deps/oblib/src/rpc/pnio/nio/packet_server.c +++ b/deps/oblib/src/rpc/pnio/nio/packet_server.c @@ -13,13 +13,19 @@ typedef struct pkts_msg_t { int64_t sz; char* payload; + int64_t ctime_us; } pkts_msg_t; static int64_t pkts_decode(char* b, int64_t s) { return eh_decode(b, s);} void pkts_flush_cb(pkts_t* io, pkts_req_t* req) { + pkts_sk_t* sk = (NULL != io) ? (typeof(sk))idm_get(&io->sk_map, req->sock_id) : NULL; /* io was allowed to be NULL before this change; skip the socket lookup (and the counters below) in that case */ PNIO_DELAY_WARN(delay_warn("pkts_flush_cb", req->ctime_us, FLUSH_DELAY_WARN_US)); req->flush_cb(req); + if (sk) { + sk->sk_diag_info.doing_cnt --; + sk->sk_diag_info.done_cnt ++; + } } static int pkts_sk_read(void** 
b, pkts_sk_t* s, int64_t sz, int64_t* avail_bytes) { @@ -28,6 +34,7 @@ static int pkts_sk_read(void** b, pkts_sk_t* s, int64_t sz, int64_t* avail_bytes static int pkts_sk_handle_msg(pkts_sk_t* s, pkts_msg_t* msg) { pkts_t* pkts = structof(s->fty, pkts_t, sf); + s->sk_diag_info.doing_cnt ++; int ret = pkts->on_req(pkts, s->ib.b, msg->payload, msg->sz, s->id); ib_consumed(&s->ib, msg->sz); return ret; @@ -35,14 +42,15 @@ static int pkts_sk_handle_msg(pkts_sk_t* s, pkts_msg_t* msg) { static int pkts_wq_flush(sock_t* s, write_queue_t* wq, dlink_t** old_head) { // delete response req that has reached expired time - if (PNIO_REACH_TIME_INTERVAL(10*1000)) { + if (PNIO_REACH_TIME_INTERVAL(100*1000)) { int64_t cur_time = rk_get_us(); + pkts_t* io = structof(s->fty, pkts_t, sf); dlink_for(&wq->queue.head, p) { pkts_req_t* req = structof(p, pkts_req_t, link); if (req->expire_us > 0 && cur_time >= req->expire_us) { if (PNIO_OK == wq_delete(wq, p)) { rk_warn("rpc resp is expired, expire_us=%ld, sock_id=%ld", req->expire_us, req->sock_id); - pkts_flush_cb(NULL, req); + pkts_flush_cb(io, req); } } } @@ -72,6 +80,7 @@ int pkts_init(pkts_t* io, eloop_t* ep, pkts_cfg_t* cfg) { rk_info("pkts listen at %s", T2S(addr, cfg->addr)); idm_init(&io->sk_map, arrlen(io->sk_table)); io->on_req = cfg->handle_func; + dlink_init(&io->sk_list); el(); return err; } diff --git a/deps/oblib/src/rpc/pnio/nio/packet_server.h b/deps/oblib/src/rpc/pnio/nio/packet_server.h index 50a99a4df2..fc40052b84 100644 --- a/deps/oblib/src/rpc/pnio/nio/packet_server.h +++ b/deps/oblib/src/rpc/pnio/nio/packet_server.h @@ -23,7 +23,7 @@ typedef struct pkts_cfg_t { } pkts_cfg_t; typedef struct pkts_req_t { - PNIO_DELAY_WARN(int64_t ctime_us); + int64_t ctime_us; int errcode; pkts_flush_cb_func_t flush_cb; uint64_t sock_id; @@ -38,9 +38,11 @@ extern int pkts_resp(pkts_t* pkts, pkts_req_t* req); typedef struct pkts_sk_t { SOCK_COMMON; + dlink_t list_link; uint64_t id; write_queue_t wq; ibuffer_t ib; + socket_diag_info_t 
sk_diag_info; } pkts_sk_t; typedef struct pkts_sf_t { @@ -56,4 +58,6 @@ typedef struct pkts_t { sc_queue_t req_queue; idm_t sk_map; idm_item_t sk_table[1<<16]; + dlink_t sk_list; + diag_info_t diag_info; } pkts_t; diff --git a/deps/oblib/src/rpc/pnio/nio/pktc_post.h b/deps/oblib/src/rpc/pnio/nio/pktc_post.h index 9a3d16edb2..09bb62ec47 100644 --- a/deps/oblib/src/rpc/pnio/nio/pktc_post.h +++ b/deps/oblib/src/rpc/pnio/nio/pktc_post.h @@ -66,6 +66,7 @@ static int pktc_do_post(pktc_t* io, pktc_sk_t* sk, pktc_req_t* r) { // drop req } else { if (cb) { + sk->sk_diag_info.doing_cnt ++; dlink_insert(&sk->cb_head, &cb->sk_dlink); ihash_insert(&io->cb_map, &cb->hash_link); tw_regist(&io->cb_tw, &cb->timer_dlink); @@ -79,6 +80,7 @@ static void pktc_post_io(pktc_t* io, pktc_req_t* r) { int err = PNIO_OK; pktc_sk_t* sk = pktc_try_connect(io, r->dest); r->sk = sk; + if (NULL != r->resp_cb) { r->resp_cb->sk = sk; } /* resp_cb can be NULL: pn_pktc_resp_cb clears req->resp_cb */ if (NULL == sk) { err = PNIO_CONNECT_FAIL; } else if (PNIO_OK != (err = pktc_do_post(io, sk, r))) { @@ -95,7 +97,7 @@ static void pktc_post_io(pktc_t* io, pktc_req_t* r) { } int pktc_post(pktc_t* io, pktc_req_t* req) { - PNIO_DELAY_WARN(req->ctime_us = rk_get_corse_us()); + req->ctime_us = rk_get_corse_us(); if (req->msg.s < (int64_t)sizeof(req->msg)) { return -EINVAL; } @@ -115,6 +117,8 @@ int pktc_post(pktc_t* io, pktc_req_t* req) { static int pktc_handle_req_queue(pktc_t* io) { link_t* l = NULL; int cnt = 0; + int64_t sz = 0; + int64_t sc_queue_time = 0; while(cnt < 128 && (l = sc_queue_pop(&io->req_queue))) { pktc_req_t* req = structof(l, pktc_req_t, link); if (unlikely(PN_NORMAL_PKT != req->pkt_type)) { @@ -127,11 +131,21 @@ static int pktc_handle_req_queue(pktc_t* io) { } cfifo_free(cmd_req); } else { - PNIO_DELAY_WARN(delay_warn("pktc_handle_req_queue", req->ctime_us, HANDLE_DELAY_WARN_US)); - pktc_post_io(io, req); + int64_t cur_time = rk_get_corse_us(); + int64_t delay_time = cur_time - req->ctime_us; + if (delay_time > HANDLE_DELAY_WARN_US && PNIO_REACH_TIME_INTERVAL(500*1000)) { + 
rk_warn("[delay_warn] delay high: %ld", delay_time); + } cnt++; + sz += req->msg.s; + sc_queue_time += delay_time; + req->ctime_us = cur_time; + pktc_post_io(io, req); } } + io->diag_info.send_cnt += cnt; + io->diag_info.send_size += sz; + io->diag_info.sc_queue_time += sc_queue_time; return cnt == 0? EAGAIN: 0; } diff --git a/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h b/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h index 217df32951..43871a4cea 100644 --- a/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h +++ b/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h @@ -21,6 +21,7 @@ static int pktc_sk_check_connect(pktc_sk_t* s) { err = EIO; } else { s->conn_ok = 1; + s->sk_diag_info.local_addr = get_local_addr(s->fd); rk_info("sock connect OK: %p %s", s, T2S(sock_fd, s->fd)); //send handshake by ussl back-ground thread /* @@ -67,9 +68,11 @@ static void pktc_sk_destroy(pktc_sf_t* sf, pktc_sk_t* s) { static pktc_sk_t* pktc_sk_new(pktc_sf_t* sf) { pktc_sk_t* s = (pktc_sk_t*)pktc_sk_alloc(sizeof(*s)); if (s) { + memset(s, 0, sizeof(*s)); s->fty = (sf_t*)sf; s->ep_fd = -1; s->handle_event = (handle_event_t)pktc_sk_handle_event; + s->sk_diag_info.establish_time = rk_get_us(); pktc_sk_init(sf, s); } rk_info("sk_new: s=%p", s); diff --git a/deps/oblib/src/rpc/pnio/nio/pkts_post.h b/deps/oblib/src/rpc/pnio/nio/pkts_post.h index e975a996ae..858b81f63f 100644 --- a/deps/oblib/src/rpc/pnio/nio/pkts_post.h +++ b/deps/oblib/src/rpc/pnio/nio/pkts_post.h @@ -41,12 +41,24 @@ int pkts_resp(pkts_t* io, pkts_req_t* req) { static int pkts_handle_req_queue(pkts_t* io) { link_t* l = NULL; int cnt = 0; + int64_t sz = 0; + int64_t sc_queue_time = 0; while(cnt < 128 && (l = sc_queue_pop(&io->req_queue))) { pkts_req_t* req = structof(l, pkts_req_t, link); - PNIO_DELAY_WARN(delay_warn("pkts_handle_req_queue", req->ctime_us, HANDLE_DELAY_WARN_US)); - pkts_post_io(io, req); + int64_t cur_time = rk_get_corse_us(); + int64_t delay_time = cur_time - req->ctime_us; + if (delay_time > HANDLE_DELAY_WARN_US 
&& PNIO_REACH_TIME_INTERVAL(500*1000)) { + rk_warn("[delay_warn] delay high: %ld", delay_time); + } cnt++; + sz += req->msg.s; + sc_queue_time += delay_time; + req->ctime_us = cur_time; + pkts_post_io(io, req); } + io->diag_info.send_cnt += cnt; + io->diag_info.send_size += sz; + io->diag_info.sc_queue_time += sc_queue_time; return cnt == 0? EAGAIN: 0; } diff --git a/deps/oblib/src/rpc/pnio/nio/pkts_sk_factory.h b/deps/oblib/src/rpc/pnio/nio/pkts_sk_factory.h index 702eff4ded..831cb14e11 100644 --- a/deps/oblib/src/rpc/pnio/nio/pkts_sk_factory.h +++ b/deps/oblib/src/rpc/pnio/nio/pkts_sk_factory.h @@ -19,6 +19,7 @@ static int pkts_sk_init(pkts_sf_t* sf, pkts_sk_t* s) { ib_init(&s->ib, MOD_PKTS_INBUF); s->rl_ready_link.next = NULL; s->id = idm_set(&pkts->sk_map, s); + dlink_insert(&pkts->sk_list, &s->list_link); rk_info("set pkts_sk_t sock_id s=%p, s->id=%ld", s, s->id); return 0; } @@ -26,6 +27,7 @@ static int pkts_sk_init(pkts_sf_t* sf, pkts_sk_t* s) { static void pkts_sk_destroy(pkts_sf_t* sf, pkts_sk_t* s) { pkts_t* pkts = structof(sf, pkts_t, sf); idm_del(&pkts->sk_map, s->id); + dlink_delete(&s->list_link); } int pkts_sk_handle_event(pkts_sk_t* s) { @@ -39,9 +41,11 @@ static int pkts_sk_rl_handle_event(pkts_sk_t* s, int64_t* read_bytes) { static pkts_sk_t* pkts_sk_new(pkts_sf_t* sf) { pkts_sk_t* s = (pkts_sk_t*)pkts_sk_alloc(sizeof(*s)); if (s) { + memset(s, 0, sizeof(*s)); s->fty = (sf_t*)sf; s->ep_fd = -1; s->handle_event = (handle_event_t)pkts_sk_handle_event; + s->sk_diag_info.establish_time = rk_get_us(); pkts_sk_init(sf, s); } rk_info("sk_new: s=%p", s); diff --git a/deps/oblib/src/rpc/pnio/nio/write_queue.t.h b/deps/oblib/src/rpc/pnio/nio/write_queue.t.h index eb96d41bab..3da9665f42 100644 --- a/deps/oblib/src/rpc/pnio/nio/write_queue.t.h +++ b/deps/oblib/src/rpc/pnio/nio/write_queue.t.h @@ -33,11 +33,15 @@ static int my_sk_do_flush(my_sk_t* s, int64_t* remain) { dlink_t* h = NULL; int err = my_wq_flush((sock_t*)s, &s->wq, &h); my_t* io = 
structof(s->fty, my_t, sf); + int64_t flushed_time_us = rk_get_us(); if (0 == err && NULL != h) { dlink_t* stop = dqueue_top(&s->wq.queue); while(h != stop) { my_req_t* req = structof(h, my_req_t, link); h = h->next; + s->sk_diag_info.write_cnt ++; + s->sk_diag_info.write_size += req->msg.s; + s->sk_diag_info.write_wait_time += (flushed_time_us - req->ctime_us); my_flush_cb_after_flush(io, req); } } diff --git a/deps/oblib/src/rpc/pnio/pkt-nio.h b/deps/oblib/src/rpc/pnio/pkt-nio.h index d7cd2e59d8..c8ec0cde2c 100644 --- a/deps/oblib/src/rpc/pnio/pkt-nio.h +++ b/deps/oblib/src/rpc/pnio/pkt-nio.h @@ -18,8 +18,8 @@ #include "r0/format.h" #include "r0/log.h" #include "r0/futex.h" -#include "r0/debug.h" #include "nio/addr.h" +#include "r0/debug.h" #include "ds/link.h" #include "ds/dlink.h" diff --git a/deps/oblib/src/rpc/pnio/r0/debug.h b/deps/oblib/src/rpc/pnio/r0/debug.h index b178cbbe03..faa3c2062c 100644 --- a/deps/oblib/src/rpc/pnio/r0/debug.h +++ b/deps/oblib/src/rpc/pnio/r0/debug.h @@ -16,6 +16,28 @@ struct stat_time_guard_t { int64_t *time; const char *procedure; }; +typedef struct diag_info_t +{ + uint64_t send_cnt; + uint64_t send_size; + uint64_t sc_queue_time; +} diag_info_t; +typedef struct socket_diag_info_t +{ + int64_t establish_time; + int64_t last_read_time; + uint64_t write_cnt; + uint64_t write_size; + uint64_t write_wait_time; + uint64_t read_cnt; + uint64_t read_size; + uint64_t read_time; + uint64_t read_process_time; + uint64_t doing_cnt; + uint64_t done_cnt; + addr_t local_addr; +} socket_diag_info_t; + extern __thread int64_t eloop_malloc_count; extern __thread int64_t eloop_malloc_time;