pkt-nio set epoll handle timeout and add timegaurd
This commit is contained in:
parent
cacbb57260
commit
334557cd30
4
deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h
vendored
4
deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h
vendored
@ -176,6 +176,7 @@ public:
|
||||
const int64_t start_ts = common::ObTimeUtility::current_time();
|
||||
ObRpcMemPool* pool = NULL;
|
||||
uint64_t pnio_group_id = ObPocRpcServer::DEFAULT_PNIO_GROUP;
|
||||
ObTimeGuard timeguard("poc_rpc_post", 10 * 1000);
|
||||
// TODO:@fangwu.lcc map proxy.group_id_ to pnio_group_id
|
||||
if (OB_LS_FETCH_LOG2 == pcode) {
|
||||
pnio_group_id = ObPocRpcServer::RATELIMIT_PNIO_GROUP;
|
||||
@ -194,10 +195,12 @@ public:
|
||||
} else {
|
||||
char* req = NULL;
|
||||
int64_t req_sz = 0;
|
||||
timeguard.click();
|
||||
if (OB_FAIL(rpc_encode_req(proxy, *pool, pcode, args, opts, req, req_sz, NULL == ucb))) {
|
||||
RPC_LOG(WARN, "rpc encode req fail", K(ret));
|
||||
} else if(OB_FAIL(check_blacklist(addr))) {
|
||||
RPC_LOG(WARN, "check_blacklist failed", K(addr));
|
||||
} else if (FALSE_IT(timeguard.click())) {
|
||||
} else if (OB_FAIL(ObAsyncRespCallback::create(*pool, ucb, cb))) {
|
||||
RPC_LOG(WARN, "create ObAsyncRespCallback failed", K(ucb));
|
||||
} else if (OB_NOT_NULL(cb)) {
|
||||
@ -207,6 +210,7 @@ public:
|
||||
init_ucb(proxy, cb->get_ucb(), addr, start_ts, req_sz);
|
||||
}
|
||||
}
|
||||
timeguard.click();
|
||||
if (OB_SUCC(ret)) {
|
||||
sockaddr_in sock_addr;
|
||||
if (0 != (sys_err = pn_send(
|
||||
|
@ -49,6 +49,7 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s
|
||||
int ret = OB_SUCCESS;
|
||||
ObPocServerHandleContext* ctx = NULL;
|
||||
ObRpcPacket tmp_pkt;
|
||||
ObTimeGuard timeguard("rpc_request_create", 200 * 1000);
|
||||
const int64_t alloc_payload_sz = sz;
|
||||
if (OB_FAIL(tmp_pkt.decode(buf, sz))) {
|
||||
RPC_LOG(ERROR, "decode packet fail", K(ret));
|
||||
@ -62,6 +63,7 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s
|
||||
if (OB_UNLIKELY(tmp_pkt.get_group_id() == OBCG_ELECTION)) {
|
||||
tenant_id = OB_SERVER_TENANT_ID;
|
||||
}
|
||||
timeguard.click();
|
||||
ObRpcMemPool* pool = ObRpcMemPool::create(tenant_id, pcode_label, pool_size);
|
||||
void *temp = NULL;
|
||||
if (OB_ISNULL(pool)) {
|
||||
@ -75,6 +77,7 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s
|
||||
ctx = new(temp)ObPocServerHandleContext(*pool, resp_id, resp_expired_abs_us);
|
||||
ctx->set_peer_unsafe();
|
||||
req = new(ctx + 1)ObRequest(ObRequest::OB_RPC, ObRequest::TRANSPORT_PROTO_POC);
|
||||
timeguard.click();
|
||||
ObRpcPacket* pkt = (ObRpcPacket*)pool->alloc(sizeof(ObRpcPacket) + alloc_payload_sz);
|
||||
if (NULL == pkt) {
|
||||
RPC_LOG(WARN, "pool allocate rpc packet memory failed", K(tenant_id), K(pcode_label));
|
||||
@ -186,6 +189,7 @@ int serve_cb(int grp, const char* b, int64_t sz, uint64_t resp_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObTimeGuard timeguard("rpc_serve_cb", 200 * 1000);
|
||||
if (NULL == b || sz <= easy_head_size) {
|
||||
tmp_ret = OB_INVALID_DATA;
|
||||
RPC_LOG(WARN, "rpc request is invalid", K(tmp_ret), K(b), K(sz));
|
||||
@ -198,6 +202,7 @@ int serve_cb(int grp, const char* b, int64_t sz, uint64_t resp_id)
|
||||
if (OB_TMP_FAIL(ObPocServerHandleContext::create(resp_id, b, sz, req))) {
|
||||
RPC_LOG(WARN, "created req is null", K(tmp_ret), K(sz), K(resp_id));
|
||||
} else {
|
||||
timeguard.click();
|
||||
global_deliver->deliver(*req);
|
||||
}
|
||||
}
|
||||
|
2
deps/oblib/src/rpc/pnio/config.h
vendored
2
deps/oblib/src/rpc/pnio/config.h
vendored
@ -17,7 +17,7 @@
|
||||
#define FLUSH_DELAY_WARN_US 500000
|
||||
#define HANDLE_DELAY_WARN_US 500000
|
||||
#define ELOOP_WARN_US 500000
|
||||
#define EPOLL_HANDLE_TIME_LIMIT 0
|
||||
#define EPOLL_HANDLE_TIME_LIMIT 500000
|
||||
#define MAX_REQ_QUEUE_COUNT 4096
|
||||
#define MAX_WRITE_QUEUE_COUNT 4096
|
||||
#define MAX_CATEG_COUNT 1024
|
||||
|
2
deps/oblib/src/rpc/pnio/r0/debug.c
vendored
2
deps/oblib/src/rpc/pnio/r0/debug.c
vendored
@ -20,6 +20,6 @@ void stat_cleanup(void *s) {
|
||||
int64_t *time = ((struct stat_time_guard_t *)s)->time;
|
||||
*time += cost;
|
||||
if (cost > ELOOP_WARN_US) {
|
||||
rk_info("eloop handle events cost too much time: %ldus, procedure: %s", cost, ((struct stat_time_guard_t *)s)->procedure);
|
||||
rk_warn("[delay_warn] cost too much time: %ldus, procedure: %s", cost, ((struct stat_time_guard_t *)s)->procedure);
|
||||
}
|
||||
}
|
||||
|
4
deps/oblib/src/rpc/pnio/r0/debug.h
vendored
4
deps/oblib/src/rpc/pnio/r0/debug.h
vendored
@ -55,7 +55,7 @@ inline void eloop_delay_warn(int64_t start_us, int64_t warn_us) {
|
||||
if (warn_us > 0) {
|
||||
int64_t delay = rk_get_corse_us() - start_us;
|
||||
if (delay > warn_us) {
|
||||
rk_info("eloop handle events delay high: %ld, malloc=%ld/%ld write=%ld/%ld read=%ld/%ld server_process=%ld/%ld client_cb=%ld/%ld",
|
||||
rk_warn("[delay_warn] eloop handle events delay high: %ld, malloc=%ld/%ld write=%ld/%ld read=%ld/%ld server_process=%ld/%ld client_cb=%ld/%ld",
|
||||
delay, eloop_malloc_time, eloop_malloc_count, eloop_write_time, eloop_write_count, eloop_read_time, eloop_read_count,
|
||||
eloop_server_process_time, eloop_server_process_count, eloop_client_cb_time, eloop_client_cb_count);
|
||||
}
|
||||
@ -67,7 +67,7 @@ void delay_warn(const char* msg, int64_t start_us, int64_t warn_us)
|
||||
if (warn_us > 0) {
|
||||
int64_t delay = rk_get_corse_us() - start_us;
|
||||
if (delay > warn_us && PNIO_REACH_TIME_INTERVAL(500*1000)) {
|
||||
rk_info("%s delay high: %ld, start_us = %ld", msg, delay, start_us);
|
||||
rk_warn("[delay_warn] %s delay high: %ld, start_us=%ld", msg, delay, start_us);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user