diff --git a/deps/easy/src/io/easy_baseth_pool.c b/deps/easy/src/io/easy_baseth_pool.c index 3baf3701c..52739d58f 100644 --- a/deps/easy/src/io/easy_baseth_pool.c +++ b/deps/easy/src/io/easy_baseth_pool.c @@ -29,6 +29,7 @@ int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); void ob_set_thread_name(const char* type); int64_t ob_update_loop_ts(); +void ob_usleep(const useconds_t v); /** * start */ @@ -253,7 +254,7 @@ static void *easy_baseth_pool_monitor_func(void *args) while(tp->stoped == 0) { ob_update_loop_ts(); - usleep(us); + ob_usleep(us); ev_tstamp now = ev_time(); easy_thread_pool_for_each(th, tp, 0) { ev_tstamp last = th->lastrun; diff --git a/deps/easy/src/io/easy_maccept.c b/deps/easy/src/io/easy_maccept.c index d414cc793..11c8da20c 100644 --- a/deps/easy/src/io/easy_maccept.c +++ b/deps/easy/src/io/easy_maccept.c @@ -41,6 +41,8 @@ easy_ma_t g_ma; int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); +int ob_epoll_wait(int __epfd, struct epoll_event *__events, + int __maxevents, int __timeout); void easy_ma_init(int port) { int i; @@ -170,7 +172,7 @@ void* easy_ma_thread_func(easy_ma_t* ma) while (!ma->stop) { const int maxevents = 64; struct epoll_event events[maxevents]; - int cnt = epoll_wait(efd, events, maxevents, 1000); + int cnt = ob_epoll_wait(efd, events, maxevents, 1000); for (i = 0; i < cnt; i++) { int cfd = events[i].data.fd; int emask = events[i].events; diff --git a/deps/easy/src/io/ev_epoll.c b/deps/easy/src/io/ev_epoll.c index e3112b2f6..ca12e52b4 100644 --- a/deps/easy/src/io/ev_epoll.c +++ b/deps/easy/src/io/ev_epoll.c @@ -64,6 +64,9 @@ #include +int ob_epoll_wait(int __epfd, struct epoll_event *__events, + int __maxevents, int __timeout); + static void epoll_modify (EV_P_ int fd, int oev, int nev) { @@ -133,7 +136,7 @@ epoll_poll (EV_P_ ev_tstamp timeout) /* epoll wait times cannot be larger than (LONG_MAX - 999UL) / HZ msecs, which is below */ /* the default libev max wait time, however. */ EV_RELEASE_CB; - eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, (int)ceil (timeout * 1000.)); + eventcnt = ob_epoll_wait (backend_fd, epoll_events, epoll_eventmax, (int)ceil (timeout * 1000.)); EV_ACQUIRE_CB; if (expect_false (eventcnt < 0)) { diff --git a/deps/oblib/src/lib/async/ev_epoll.c b/deps/oblib/src/lib/async/ev_epoll.c index df118a6fe..45b14a0ba 100644 --- a/deps/oblib/src/lib/async/ev_epoll.c +++ b/deps/oblib/src/lib/async/ev_epoll.c @@ -67,6 +67,9 @@ #define EV_EMASK_EPERM 0x80 +int ob_epoll_wait(int __epfd, struct epoll_event *__events, + int __maxevents, int __timeout); + static void epoll_modify (EV_P_ int fd, int oev, int nev) { @@ -150,7 +153,7 @@ epoll_poll (EV_P_ ev_tstamp timeout) /* epoll wait times cannot be larger than (LONG_MAX - 999UL) / HZ msecs, which is below */ /* the default libev max wait time, however. */ EV_RELEASE_CB; - eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, timeout * 1e3); + eventcnt = ob_epoll_wait (backend_fd, epoll_events, epoll_eventmax, timeout * 1e3); EV_ACQUIRE_CB; if (expect_false (eventcnt < 0)) diff --git a/deps/oblib/src/lib/signal/ob_signal_utils.cpp b/deps/oblib/src/lib/signal/ob_signal_utils.cpp index 27d2ee23d..588f5b8fb 100644 --- a/deps/oblib/src/lib/signal/ob_signal_utils.cpp +++ b/deps/oblib/src/lib/signal/ob_signal_utils.cpp @@ -20,6 +20,10 @@ #include "lib/charset/ob_mysql_global.h" #include "lib/signal/ob_libunwind.h" +extern "C" { +extern int ob_poll(struct pollfd *__fds, nfds_t __nfds, int __timeout); +}; + namespace oceanbase { namespace common @@ -170,7 +174,7 @@ int wait_readable(int fd, int64_t timeout) bzero(&pfd, sizeof(pfd)); pfd.fd = fd; pfd.events = POLLIN; - int n = poll(&pfd, 1, timeout); + int n = ob_poll(&pfd, 1, timeout); if (-1 == n) { ret = OB_ERR_SYS; DLOG(WARN, "poll failed, errno=%d", errno); diff --git a/deps/oblib/src/lib/stat/ob_diagnose_info.h b/deps/oblib/src/lib/stat/ob_diagnose_info.h index d3b40aa3c..f55b72426 100644 --- a/deps/oblib/src/lib/stat/ob_diagnose_info.h +++ b/deps/oblib/src/lib/stat/ob_diagnose_info.h @@ -188,13 +188,19 @@ class ObSleepEventGuard : public ObWaitEventGuard { public: explicit ObSleepEventGuard( - const int64_t event_no, - const uint64_t timeout_ms = 0, - const int64_t sleep_us = 0 + const int64_t event_no, + const uint64_t timeout_ms, + const int64_t sleep_us ) : ObWaitEventGuard(event_no, timeout_ms, sleep_us, 0, 0, false) { lib::Thread::sleep_us_ = sleep_us; } + explicit ObSleepEventGuard( + const int64_t sleep_us = 0 + ) : ObWaitEventGuard(ObWaitEventIds::DEFAULT_SLEEP, 0, sleep_us, 0, 0, false) + { + lib::Thread::sleep_us_ = sleep_us; + } ~ObSleepEventGuard() { lib::Thread::sleep_us_ = 0; @@ -322,13 +328,13 @@ private: #define SLEEP(time) \ do { \ - oceanbase::common::ObSleepEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, ((int64_t)time) * 1000 * 1000); \ + oceanbase::common::ObSleepEventGuard wait_guard(((int64_t)time) * 1000 * 1000); \ ::sleep(time); \ } while (0) #define USLEEP(time) \ do { \ - oceanbase::common::ObSleepEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, (int64_t)time); \ + oceanbase::common::ObSleepEventGuard wait_guard((int64_t)time); \ ::usleep(time); \ } while (0) diff --git a/deps/oblib/src/lib/thread/ob_tenant_hook.cpp b/deps/oblib/src/lib/thread/ob_tenant_hook.cpp index 989f724d1..59c85b397 100644 --- a/deps/oblib/src/lib/thread/ob_tenant_hook.cpp +++ b/deps/oblib/src/lib/thread/ob_tenant_hook.cpp @@ -5,20 +5,23 @@ #include "lib/thread/thread.h" #include "lib/thread/ob_thread_name.h" #include "lib/thread/protected_stack_allocator.h" +#include "lib/stat/ob_diagnose_info.h" #include +#include +#include -#define SYS_HOOK(func_name, ...) \ - ({ \ - int ret = 0; \ - if (!in_sys_hook++) { \ - oceanbase::lib::Thread::is_blocking_ = true; \ - ret = real_##func_name(__VA_ARGS__); \ - oceanbase::lib::Thread::is_blocking_ = false; \ - } else { \ - ret = real_##func_name(__VA_ARGS__); \ - } \ - in_sys_hook--; \ - ret; \ +#define SYS_HOOK(func_name, ...) \ + ({ \ + int ret = 0; \ + if (!in_sys_hook++) { \ + oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT; \ + ret = real_##func_name(__VA_ARGS__); \ + oceanbase::lib::Thread::is_blocking_ = 0; \ + } else { \ + ret = real_##func_name(__VA_ARGS__); \ + } \ + in_sys_hook--; \ + ret; \ }) namespace oceanbase { @@ -116,6 +119,28 @@ int pthread_rwlock_timedwrlock(pthread_rwlock_t *__restrict __rwlock, } #endif +int ob_epoll_wait(int __epfd, struct epoll_event *__events, + int __maxevents, int __timeout) +{ + static int (*real_epoll_wait)( + int __epfd, struct epoll_event *__events, + int __maxevents, int __timeout) = epoll_wait; + int ret = 0; + oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT; + ret = SYS_HOOK(epoll_wait, __epfd, __events, __maxevents, __timeout); + return ret; +} + +int ob_poll(struct pollfd *__fds, nfds_t __nfds, int __timeout) +{ + static int (*real_poll)( + struct pollfd *__fds, nfds_t __nfds, int __timeout) = poll; + int ret = 0; + oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT; + ret = SYS_HOOK(poll, __fds, __nfds, __timeout); + return ret; +} + int ob_pthread_cond_wait(pthread_cond_t *__restrict __cond, pthread_mutex_t *__restrict __mutex) { @@ -138,6 +163,12 @@ int ob_pthread_cond_timedwait(pthread_cond_t *__restrict __cond, return ret; } +void ob_usleep(const useconds_t v) +{ + oceanbase::common::ObSleepEventGuard wait_guard((int64_t)v); + ::usleep(v); +} + int futex_hook(uint32_t *uaddr, int futex_op, uint32_t val, const struct timespec* timeout) { static long int (*real_syscall)(long int __sysno, ...) = syscall; diff --git a/deps/oblib/src/lib/thread/thread.cpp b/deps/oblib/src/lib/thread/thread.cpp index 338e2e95a..f277fabb1 100644 --- a/deps/oblib/src/lib/thread/thread.cpp +++ b/deps/oblib/src/lib/thread/thread.cpp @@ -33,7 +33,7 @@ using namespace oceanbase::lib; thread_local int64_t Thread::loop_ts_ = 0; thread_local pthread_t Thread::thread_joined_ = 0; thread_local int64_t Thread::sleep_us_ = 0; -thread_local bool Thread::is_blocking_ = false; +thread_local uint8_t Thread::is_blocking_ = 0; thread_local char* Thread::rpc_dest_addr_ = nullptr; thread_local Thread* Thread::current_thread_ = nullptr; int64_t Thread::total_thread_count_ = 0; diff --git a/deps/oblib/src/lib/thread/thread.h b/deps/oblib/src/lib/thread/thread.h index aa738a29f..9695433cf 100644 --- a/deps/oblib/src/lib/thread/thread.h +++ b/deps/oblib/src/lib/thread/thread.h @@ -60,11 +60,14 @@ public: return update_loop_ts(common::ObTimeUtility::fast_current_time()); } public: + static constexpr uint8_t WAIT = (1 << 0); + static constexpr uint8_t WAIT_IN_TENANT_QUEUE = (1 << 1); + static constexpr uint8_t WAIT_FOR_IO_EVENT = (1 << 2); // for thread diagnose, maybe replace it with union later. static thread_local int64_t loop_ts_; static thread_local pthread_t thread_joined_; static thread_local int64_t sleep_us_; - static thread_local bool is_blocking_; + static thread_local uint8_t is_blocking_; static thread_local char* rpc_dest_addr_; private: static void* __th_start(void *th); diff --git a/deps/oblib/src/rpc/frame/ob_req_transport.cpp b/deps/oblib/src/rpc/frame/ob_req_transport.cpp index d1a8cc474..644888a24 100644 --- a/deps/oblib/src/rpc/frame/ob_req_transport.cpp +++ b/deps/oblib/src/rpc/frame/ob_req_transport.cpp @@ -435,7 +435,7 @@ ObPacket *ObReqTransport::send_session(easy_session_t *s) const // Synchronous rpc always needs to return packets s->unneed_response = false; s->r.client_start_time = common::ObTimeUtility::current_time(); - lib::Thread::update_loop_ts(s->r.client_start_time); + lib::Thread::loop_ts_ = s->r.client_start_time; // avoid clear_clock if (0 == s->addr.cidx) { s->addr.cidx = balance_assign(s); } diff --git a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp index 9698830ce..04033be9e 100644 --- a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp @@ -37,6 +37,11 @@ using namespace oceanbase::common; #define SO_REUSEPORT 15 #endif +extern "C" { +extern int ob_epoll_wait(int __epfd, struct epoll_event *__events, + int __maxevents, int __timeout); +}; + namespace oceanbase { namespace obmysql @@ -753,7 +758,7 @@ private: void handle_epoll_event() { const int maxevents = 512; struct epoll_event events[maxevents]; - int cnt = epoll_wait(epfd_, events, maxevents, 1000); + int cnt = ob_epoll_wait(epfd_, events, maxevents, 1000); for(int i = 0; i < cnt; i++) { ObSqlSock* s = (ObSqlSock*)events[i].data.ptr; if (OB_UNLIKELY(NULL == s)) { diff --git a/deps/oblib/src/rpc/obrpc/ob_listener.cpp b/deps/oblib/src/rpc/obrpc/ob_listener.cpp index 625c9e4f2..992140157 100644 --- a/deps/oblib/src/rpc/obrpc/ob_listener.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_listener.cpp @@ -313,7 +313,7 @@ void ObListener::do_work() struct epoll_event conn_ev; while(!has_set_stop()) { - int cnt = epoll_wait(epoll_fd, events, MAXEPOLLSIZE, 1000); + int cnt = ob_epoll_wait(epoll_fd, events, MAXEPOLLSIZE, 1000); for (int i = 0; i < cnt; i++) { int accept_fd = events[i].data.fd; uint64_t client_magic = 0; diff --git a/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp b/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp index 812b1ca44..c3f250e38 100644 --- a/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp @@ -34,6 +34,11 @@ #include "rpc/frame/ob_req_transport.h" #include "io/easy_negotiation.h" +extern "C" { +extern int ob_epoll_wait(int __epfd, struct epoll_event *__events, + int __maxevents, int __timeout); +}; + using namespace oceanbase::common; using namespace oceanbase::common::serialization; using namespace oceanbase::lib; @@ -442,7 +447,7 @@ void ObNetKeepAlive::do_server_loop() ob_abort(); } while (!has_set_stop()) { - int cnt = epoll_wait(epfd, events, sizeof events/sizeof events[0], 1000); + int cnt = ob_epoll_wait(epfd, events, sizeof events/sizeof events[0], 1000); for (int i = 0; i < cnt; i++) { struct server *s = (struct server *)events[i].data.ptr; int ev_fd = NULL == s? pipefd_ : s->fd_; @@ -539,7 +544,7 @@ void ObNetKeepAlive::do_client_loop() int64_t now = get_usec(); int64_t past = now - last_check_ts; if (past < KEEPALIVE_INTERVAL) { - usleep(KEEPALIVE_INTERVAL - past); + ob_usleep(KEEPALIVE_INTERVAL - past); } for (int i = 0; i < MAX_RS_COUNT; i++) { struct rpc_server *rs = ATOMIC_LOAD(&rss_[i]); @@ -586,7 +591,7 @@ void ObNetKeepAlive::do_client_loop() } if (OB_SUCC(ret)) { - int cnt = epoll_wait(epfd, events, sizeof events/sizeof events[0], KEEPALIVE_INTERVAL/2/1000); + int cnt = ob_epoll_wait(epfd, events, sizeof events/sizeof events[0], KEEPALIVE_INTERVAL/2/1000); for (int i = 0; i < cnt; i++) { client *c = (client *)events[i].data.ptr; int ev_fd = c->fd_; diff --git a/deps/oblib/src/rpc/pnio/io/eloop.c b/deps/oblib/src/rpc/pnio/io/eloop.c index 6b312e076..78fba659c 100644 --- a/deps/oblib/src/rpc/pnio/io/eloop.c +++ b/deps/oblib/src/rpc/pnio/io/eloop.c @@ -48,7 +48,7 @@ void eloop_fire(eloop_t* ep, sock_t* s) { static void eloop_refire(eloop_t* ep, int64_t timeout) { const int maxevents = 512; struct epoll_event events[maxevents]; - int cnt = epoll_wait(ep->fd, events, maxevents, timeout); + int cnt = ob_epoll_wait(ep->fd, events, maxevents, timeout); for(int i = 0; i < cnt; i++) { sock_t* s = (sock_t*)events[i].data.ptr; s->mask |= events[i].events; diff --git a/deps/ussl-hook/loop/ussl_eloop.c b/deps/ussl-hook/loop/ussl_eloop.c index c553021f7..c0b855ace 100644 --- a/deps/ussl-hook/loop/ussl_eloop.c +++ b/deps/ussl-hook/loop/ussl_eloop.c @@ -1,3 +1,5 @@ +int64_t ob_update_loop_ts(); + struct epoll_event *ussl_make_epoll_event(struct epoll_event *event, uint32_t event_flag, void *val) { event->events = event_flag; @@ -41,7 +43,7 @@ static void ussl_eloop_refire(ussl_eloop_t *ep) { const int maxevents = 512; struct epoll_event events[maxevents]; - int cnt = epoll_wait(ep->fd, events, maxevents, 1000); + int cnt = ob_epoll_wait(ep->fd, events, maxevents, 1000); for (int i = 0; i < cnt; i++) { ussl_sock_t *s = (ussl_sock_t *)events[i].data.ptr; s->mask |= events[i].events; @@ -79,6 +81,7 @@ static void ussl_eloop_handle_sock_event(ussl_sock_t *s) int ussl_eloop_run(ussl_eloop_t *ep) { while (1) { + ob_update_loop_ts(); ussl_eloop_refire(ep); ussl_dlink_for(&ep->ready_link, p) { ussl_eloop_handle_sock_event(ussl_structof(p, ussl_sock_t, ready_link)); } check_and_handle_timeout_event(); diff --git a/deps/ussl-hook/ussl-hook.h b/deps/ussl-hook/ussl-hook.h index c61c5338e..d6d1035f5 100644 --- a/deps/ussl-hook/ussl-hook.h +++ b/deps/ussl-hook/ussl-hook.h @@ -62,4 +62,7 @@ typedef struct ssl_config_item_t const char *ssl_invited_nodes; // the list of observers to enable SSL } ssl_config_item_t; +extern int ob_epoll_wait(int __epfd, struct epoll_event *__events, + int __maxevents, int __timeout); + #endif // USSL_HOOK_USSL_HOOK_ diff --git a/deps/ussl-hook/ussl-loop.c b/deps/ussl-hook/ussl-loop.c index 6fc8fdfc2..bf4af670c 100644 --- a/deps/ussl-hook/ussl-loop.c +++ b/deps/ussl-hook/ussl-loop.c @@ -20,6 +20,10 @@ static ussl_sf_t acceptfd_fty; static ussl_sf_t clientfd_fty; static pthread_t ussl_bg_thread_id; +int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr, + void *(*start_routine) (void *), void *arg); +void ob_set_thread_name(const char* type); + static int uloop_init(uloop_t *l) { int err = 0; @@ -63,7 +67,7 @@ static int uloop_add_listen(uloop_t *l, int listen_fd, int backlog) static void *bg_thread_func(void *arg) { - prctl(PR_SET_NAME, "ussl_loop"); + ob_set_thread_name("ussl_loop"); uloop_run(&global_ussl_loop_struct); return NULL; } @@ -86,7 +90,7 @@ int init_bg_thread() if (0 == ret) { if (0 != uloop_init(&global_ussl_loop_struct)) { ussl_log_error("initialize uloop failed.") - } else if (0 != pthread_create(&ussl_bg_thread_id, NULL, bg_thread_func, NULL)) { + } else if (0 != ob_pthread_create(&ussl_bg_thread_id, NULL, bg_thread_func, NULL)) { ret = EIO; ussl_log_error("create background thread failed, errno:%d", errno); } else { diff --git a/src/diagnose/lua/ob_lua_api.cpp b/src/diagnose/lua/ob_lua_api.cpp index 5e889701a..b86f5d7ef 100644 --- a/src/diagnose/lua/ob_lua_api.cpp +++ b/src/diagnose/lua/ob_lua_api.cpp @@ -1926,16 +1926,16 @@ int dump_thread_info(lua_State *L) gen.next_column(trace_id_buf); } // status + GET_OTHER_TSI_ADDR(uint8_t, is_blocking, &Thread::is_blocking_); { GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_); GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_); - GET_OTHER_TSI_ADDR(bool, is_blocking, &Thread::is_blocking_); const char* status_str = nullptr; if (0 != join_addr) { status_str = "Join"; } else if (0 != sleep_us) { status_str = "Sleep"; - } else if (is_blocking) { + } else if (0 != is_blocking) { status_str = "Wait"; } else { status_str = "Run"; @@ -1972,6 +1972,10 @@ int dump_thread_info(lua_State *L) do_with_crash_restore([&] { IGNORE_RETURN snprintf(wait_event, BUF_LEN, "rpc to %s", rpc_dest_addr); }, has_segv); + } else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) { + IGNORE_RETURN snprintf(wait_event, BUF_LEN, "tenant worker request"); + } else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) { + IGNORE_RETURN snprintf(wait_event, BUF_LEN, "IO events"); } gen.next_column(wait_event); } diff --git a/src/diagnose/lua/ob_lua_handler.cpp b/src/diagnose/lua/ob_lua_handler.cpp index 72b5e67ca..1b9b31e8e 100644 --- a/src/diagnose/lua/ob_lua_handler.cpp +++ b/src/diagnose/lua/ob_lua_handler.cpp @@ -34,6 +34,8 @@ extern "C" { #include #include #include +extern int ob_epoll_wait(int __epfd, struct epoll_event *__events, + int __maxevents, int __timeout); } using namespace oceanbase; @@ -178,7 +180,7 @@ int ObUnixDomainListener::run() int conn_fd = -1; int ret = OB_SUCCESS; lib::Thread::update_loop_ts(); - int64_t event_cnt = epoll_wait(epoll_fd, events, EPOLL_EVENT_BUFFER_SIZE, TIMEOUT); + int64_t event_cnt = ob_epoll_wait(epoll_fd, events, EPOLL_EVENT_BUFFER_SIZE, TIMEOUT); if (event_cnt < 0) { if (EINTR == errno) { // timeout, ignore diff --git a/src/logservice/archiveservice/ob_archive_timer.cpp b/src/logservice/archiveservice/ob_archive_timer.cpp index 525c9288f..dfcc7eb07 100644 --- a/src/logservice/archiveservice/ob_archive_timer.cpp +++ b/src/logservice/archiveservice/ob_archive_timer.cpp @@ -112,7 +112,7 @@ void ObArchiveTimer::run1() int64_t end_tstamp = ObTimeUtility::current_time(); int64_t wait_interval = THREAD_RUN_INTERVAL - (end_tstamp - begin_tstamp); if (wait_interval > 0) { - usleep(wait_interval); + ob_usleep(wait_interval); } } } diff --git a/src/observer/ob_signal_handle.cpp b/src/observer/ob_signal_handle.cpp index 133c46bd8..b0fdde665 100644 --- a/src/observer/ob_signal_handle.cpp +++ b/src/observer/ob_signal_handle.cpp @@ -51,7 +51,10 @@ void ObSignalHandle::run1() //to check _stop every second struct timespec timeout = {1, 0}; while (!has_set_stop()) {//need not to check ret - if ( -1 == (signum = sigtimedwait(&waitset, NULL, &timeout))) { + oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT; + signum = sigtimedwait(&waitset, NULL, &timeout); + oceanbase::lib::Thread::is_blocking_ = 0; + if (-1 == signum) { //do not log error, because timeout will also return -1. } else if (OB_FAIL(deal_signals(signum))) { LOG_WARN("Deal signal error", K(ret), K(signum)); diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 578874d8b..3bf2eadb8 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -375,7 +375,7 @@ void ObResourceGroup::check_worker_count() workers_.remove(wnode); destroy_worker(w); } else if (w->has_req_flag() - && Thread::is_blocking_ + && 0 != Thread::is_blocking_ && w->is_default_worker()) { ++token; } @@ -1018,6 +1018,7 @@ int ObTenant::get_new_request( ObLink* task = nullptr; req = nullptr; + Thread::is_blocking_ |= Thread::WAIT_IN_TENANT_QUEUE; if (w.is_group_worker()) { w.set_large_query(false); w.set_curr_request_level(0); @@ -1361,7 +1362,7 @@ void ObTenant::check_worker_count() workers_.remove(wnode); destroy_worker(w); } else if (w->has_req_flag() - && Thread::is_blocking_ + && 0 != Thread::is_blocking_ && w->is_default_worker()) { ++token; } diff --git a/src/observer/virtual_table/ob_all_virtual_thread.cpp b/src/observer/virtual_table/ob_all_virtual_thread.cpp index 367e10d2e..d843d69fd 100644 --- a/src/observer/virtual_table/ob_all_virtual_thread.cpp +++ b/src/observer/virtual_table/ob_all_virtual_thread.cpp @@ -57,15 +57,24 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) for (auto* header = *guard; OB_NOT_NULL(header); header = guard.next()) { auto* thread_base = (char*)(header->pth_); if (OB_NOT_NULL(thread_base)) { + GET_OTHER_TSI_ADDR(int64_t, tid, &get_tid_cache()); + { + char path[64]; + IGNORE_RETURN snprintf(path, 64, "/proc/self/task/%ld", tid); + if (-1 == access(path, F_OK)) { + // thread not exist, may have exited. + continue; + } + } GET_OTHER_TSI_ADDR(uint64_t, tenant_id, &ob_get_tenant_id()); if (!is_sys_tenant(effective_tenant_id_) && tenant_id != effective_tenant_id_) { continue; } - GET_OTHER_TSI_ADDR(int64_t, tid, &get_tid_cache()); GET_OTHER_TSI_ADDR(uint32_t*, wait_addr, &ObLatch::current_wait); GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_); GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_); + GET_OTHER_TSI_ADDR(uint8_t, is_blocking, &Thread::is_blocking_); for (int64_t i = 0; i < col_count && OB_SUCC(ret); ++i) { const uint64_t col_id = output_column_ids_.at(i); ObObj *cells = cur_row_.cells_; @@ -81,7 +90,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) break; } case TENANT_ID: { - cells[i].set_int(tenant_id); + cells[i].set_int(0 == tenant_id ? OB_SERVER_TENANT_ID : tenant_id); break; } case TID: { @@ -98,13 +107,12 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) break; } case STATUS: { - GET_OTHER_TSI_ADDR(bool, is_blocking, &Thread::is_blocking_); const char* status_str = nullptr; if (0 != join_addr) { status_str = "Join"; } else if (0 != sleep_us) { status_str = "Sleep"; - } else if (is_blocking) { + } else if (0 != is_blocking) { status_str = "Wait"; } else { status_str = "Run"; @@ -138,6 +146,10 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) do_with_crash_restore([&] { IGNORE_RETURN snprintf(wait_event_, 64, "rpc to %s", rpc_dest_addr); }, has_segv); + } else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) { + IGNORE_RETURN snprintf(wait_event_, 64, "tenant worker requests"); + } else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) { + IGNORE_RETURN snprintf(wait_event_, 64, "IO events"); } cells[i].set_varchar(wait_event_); cells[i].set_collation_type( diff --git a/src/share/ob_local_device.cpp b/src/share/ob_local_device.cpp index ca6b3ecd0..54061d868 100644 --- a/src/share/ob_local_device.cpp +++ b/src/share/ob_local_device.cpp @@ -1213,12 +1213,14 @@ int ObLocalDevice::io_getevents( SHARE_LOG(WARN, "Invalid io context pointer, ", K(ret), KP(io_context)); } else { int sys_ret = 0; + oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT; while ((sys_ret = ::io_getevents( local_io_context->io_context_, min_nr, local_io_events->max_event_cnt_, local_io_events->io_events_, timeout)) < 0 && -EINTR == sys_ret); // ignore EINTR + oceanbase::lib::Thread::is_blocking_ = 0; if (sys_ret < 0) { ret = OB_IO_ERROR; SHARE_LOG(WARN, "Fail to get io events, ", K(ret), K(sys_ret), KERRMSG); diff --git a/src/share/ob_task_define.cpp b/src/share/ob_task_define.cpp index 20f2e03e9..ab22c807f 100644 --- a/src/share/ob_task_define.cpp +++ b/src/share/ob_task_define.cpp @@ -86,7 +86,7 @@ private: { lib::set_thread_name("LogLimiterRefresh"); while (!has_set_stop()) { - usleep(100000); + ob_usleep(100000); limiter_.refresh(); } } diff --git a/src/sql/engine/expr/ob_expr_func_sleep.cpp b/src/sql/engine/expr/ob_expr_func_sleep.cpp index b69df3050..cfa4f4cec 100644 --- a/src/sql/engine/expr/ob_expr_func_sleep.cpp +++ b/src/sql/engine/expr/ob_expr_func_sleep.cpp @@ -58,7 +58,7 @@ int ObExprSleep::sleep(int64_t usec) useconds_t usec_req = static_cast(MIN(CHECK_INTERVAL_IN_US, usec_rem)); ObWaitEventGuard wait_guard(ObWaitEventIds::DEFAULT_SLEEP, 0, usec); while(usec_req > 0) { - (void)::usleep(usec_req); + ob_usleep(usec_req); if (OB_FAIL(THIS_WORKER.check_status())) { break; } else { diff --git a/src/storage/tx/ob_xa_inner_table_gc_worker.cpp b/src/storage/tx/ob_xa_inner_table_gc_worker.cpp index f4b2da6a8..35ad27304 100644 --- a/src/storage/tx/ob_xa_inner_table_gc_worker.cpp +++ b/src/storage/tx/ob_xa_inner_table_gc_worker.cpp @@ -100,7 +100,7 @@ void ObXAInnerTableGCWorker::run1() } } //sleep 20 secnd whether gc succ or not - sleep(20); + SLEEP(20); // try refresh gc_interval, // if gc falied, and not update last_scan_ts, update gc_interval can be effective // gc_interval ~ [20s, 24h]