From cf36ee8bbfdd41731ba9a35c65671e8e8f037f7d Mon Sep 17 00:00:00 2001 From: nroskill Date: Thu, 15 Jun 2023 10:12:32 +0000 Subject: [PATCH] fix dynamic worker working in performance testing --- deps/oblib/src/lib/thread/ob_tenant_hook.cpp | 30 +++++---- deps/oblib/src/lib/thread/thread.cpp | 3 +- deps/oblib/src/lib/thread/thread.h | 61 ++++++++++++++++++- deps/oblib/src/rpc/frame/ob_req_transport.cpp | 7 ++- deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h | 49 +++++++-------- src/diagnose/lua/ob_lua_api.cpp | 13 ++-- src/observer/ob_signal_handle.cpp | 7 ++- src/observer/omt/ob_tenant.cpp | 22 +++---- src/observer/omt/ob_th_worker.cpp | 4 +- src/observer/omt/ob_th_worker.h | 4 +- .../virtual_table/ob_all_virtual_thread.cpp | 13 ++-- src/share/ob_local_device.cpp | 17 +++--- 12 files changed, 146 insertions(+), 84 deletions(-) diff --git a/deps/oblib/src/lib/thread/ob_tenant_hook.cpp b/deps/oblib/src/lib/thread/ob_tenant_hook.cpp index 59c85b397..20484b300 100644 --- a/deps/oblib/src/lib/thread/ob_tenant_hook.cpp +++ b/deps/oblib/src/lib/thread/ob_tenant_hook.cpp @@ -10,18 +10,17 @@ #include #include -#define SYS_HOOK(func_name, ...) \ - ({ \ - int ret = 0; \ - if (!in_sys_hook++) { \ - oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT; \ - ret = real_##func_name(__VA_ARGS__); \ - oceanbase::lib::Thread::is_blocking_ = 0; \ - } else { \ - ret = real_##func_name(__VA_ARGS__); \ - } \ - in_sys_hook--; \ - ret; \ +#define SYS_HOOK(func_name, ...) 
\ + ({ \ + int ret = 0; \ + if (!in_sys_hook++) { \ + oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT); \ + ret = real_##func_name(__VA_ARGS__); \ + } else { \ + ret = real_##func_name(__VA_ARGS__); \ + } \ + in_sys_hook--; \ + ret; \ }) namespace oceanbase { @@ -98,9 +97,8 @@ int pthread_join(pthread_t _thread, void **__retval) static int (*real_pthread_join)(pthread_t _thread, void **__retval) = (typeof(real_pthread_join))dlsym(RTLD_NEXT, "pthread_join"); int ret = 0; - ::oceanbase::lib::Thread::thread_joined_ = _thread; + ::oceanbase::lib::Thread::JoinGuard guard(_thread); ret = SYS_HOOK(pthread_join, _thread, __retval); - ::oceanbase::lib::Thread::thread_joined_ = 0; return ret; } @@ -126,7 +124,7 @@ int ob_epoll_wait(int __epfd, struct epoll_event *__events, int __epfd, struct epoll_event *__events, int __maxevents, int __timeout) = epoll_wait; int ret = 0; - oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT; + oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_IO_EVENT); ret = SYS_HOOK(epoll_wait, __epfd, __events, __maxevents, __timeout); return ret; } @@ -136,7 +134,7 @@ int ob_poll(struct pollfd *__fds, nfds_t __nfds, int __timeout) static int (*real_poll)( struct pollfd *__fds, nfds_t __nfds, int __timeout) = poll; int ret = 0; - oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT; + oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_IO_EVENT); ret = SYS_HOOK(poll, __fds, __nfds, __timeout); return ret; } diff --git a/deps/oblib/src/lib/thread/thread.cpp b/deps/oblib/src/lib/thread/thread.cpp index 9ae1ca8f9..a18bf8937 100644 --- a/deps/oblib/src/lib/thread/thread.cpp +++ b/deps/oblib/src/lib/thread/thread.cpp @@ -33,8 +33,9 @@ using namespace oceanbase::lib; thread_local int64_t Thread::loop_ts_ = 0; thread_local pthread_t Thread::thread_joined_ = 0; thread_local int64_t Thread::sleep_us_ = 0; -thread_local uint8_t 
Thread::is_blocking_ = 0; +thread_local int64_t Thread::blocking_ts_ = 0; thread_local ObAddr Thread::rpc_dest_addr_; +thread_local uint8_t Thread::wait_event_ = 0; thread_local Thread* Thread::current_thread_ = nullptr; int64_t Thread::total_thread_count_ = 0; diff --git a/deps/oblib/src/lib/thread/thread.h b/deps/oblib/src/lib/thread/thread.h index b63c0522b..8a0106aaf 100644 --- a/deps/oblib/src/lib/thread/thread.h +++ b/deps/oblib/src/lib/thread/thread.h @@ -63,16 +63,73 @@ public: return update_loop_ts(common::ObTimeUtility::fast_current_time()); } public: + class BaseWaitGuard + { + public: + OB_INLINE explicit BaseWaitGuard() : last_ts_(blocking_ts_) + { + blocking_ts_ = common::ObTimeUtility::fast_current_time(); + } + ~BaseWaitGuard() + { + blocking_ts_ = last_ts_; + } + private: + int64_t last_ts_; + }; + class WaitGuard : public BaseWaitGuard + { + public: + OB_INLINE explicit WaitGuard(uint8_t type) : type_(type) + { + wait_event_ |= type; + } + ~WaitGuard() + { + wait_event_ &= ~type_; + } + private: + uint8_t type_; + }; + class JoinGuard : public BaseWaitGuard + { + public: + OB_INLINE explicit JoinGuard(pthread_t thread) + { + thread_joined_ = thread; + } + ~JoinGuard() + { + thread_joined_ = 0; + } + }; + class RpcGuard : public BaseWaitGuard + { + public: + OB_INLINE explicit RpcGuard(const easy_addr_t& addr) + { + IGNORE_RETURN new (&rpc_dest_addr_) ObAddr(addr); + } + OB_INLINE explicit RpcGuard(const ObAddr& addr) + { + IGNORE_RETURN new (&rpc_dest_addr_) ObAddr(addr); + } + ~RpcGuard() + { + rpc_dest_addr_.reset(); + } + }; + static constexpr uint8_t WAIT = (1 << 0); static constexpr uint8_t WAIT_IN_TENANT_QUEUE = (1 << 1); static constexpr uint8_t WAIT_FOR_IO_EVENT = (1 << 2); - static constexpr uint8_t WAIT_FOR_TRANS_RETRY = (1 << 3); // for thread diagnose, maybe replace it with union later. 
static thread_local int64_t loop_ts_; static thread_local pthread_t thread_joined_; static thread_local int64_t sleep_us_; - static thread_local uint8_t is_blocking_; + static thread_local int64_t blocking_ts_; static thread_local ObAddr rpc_dest_addr_; + static thread_local uint8_t wait_event_; private: static void* __th_start(void *th); void destroy_stack(); diff --git a/deps/oblib/src/rpc/frame/ob_req_transport.cpp b/deps/oblib/src/rpc/frame/ob_req_transport.cpp index 391f8b742..5870fe3ca 100644 --- a/deps/oblib/src/rpc/frame/ob_req_transport.cpp +++ b/deps/oblib/src/rpc/frame/ob_req_transport.cpp @@ -440,9 +440,10 @@ ObPacket *ObReqTransport::send_session(easy_session_t *s) const s->addr.cidx = balance_assign(s); } - IGNORE_RETURN new (&lib::Thread::rpc_dest_addr_) ObAddr(s->addr); - pkt = reinterpret_cast(easy_client_send(eio_, s->addr, s)); - lib::Thread::rpc_dest_addr_.reset(); + { + lib::Thread::RpcGuard guard(s->addr); + pkt = reinterpret_cast(easy_client_send(eio_, s->addr, s)); + } if (NULL == pkt) { char buff[OB_SERVER_ADDR_STR_LEN] = {'\0'}; easy_inet_addr_to_str(&s->addr, buff, OB_SERVER_ADDR_STR_LEN); diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h index 6098b9e84..7ba6163db 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h @@ -131,31 +131,32 @@ public: if (OB_LS_FETCH_LOG2 == pcode) { pnio_group_id = ObPocRpcServer::RATELIMIT_PNIO_GROUP; } - IGNORE_RETURN new (&lib::Thread::rpc_dest_addr_) ObAddr(addr); - if (OB_FAIL(rpc_encode_req(proxy, pool, pcode, args, opts, req, req_sz, false))) { - RPC_LOG(WARN, "rpc encode req fail", K(ret)); - } else if(OB_FAIL(check_blacklist(addr))) { - RPC_LOG(WARN, "check_blacklist failed", K(ret)); - } else if (0 != (sys_err = pn_send( - (pnio_group_id<<32) + thread_id, - obaddr2sockaddr(&sock_addr, addr), - req, - req_sz, - static_cast(set.idx_of_pcode(pcode)), - start_ts + get_proxy_timeout(proxy), - 
ObSyncRespCallback::client_cb, - &cb))) { - ret = translate_io_error(sys_err); - RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode)); - } else if (OB_FAIL(cb.wait())) { - RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode)); - } else if (NULL == (resp = cb.get_resp(resp_sz))) { - ret = common::OB_ERR_UNEXPECTED; - RPC_LOG(WARN, "sync rpc execute success but resp is null", K(ret), K(addr), K(pcode)); - } else if (OB_FAIL(rpc_decode_resp(resp, resp_sz, out, resp_pkt, rcode))) { - RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret)); + { + lib::Thread::RpcGuard guard(addr); + if (OB_FAIL(rpc_encode_req(proxy, pool, pcode, args, opts, req, req_sz, false))) { + RPC_LOG(WARN, "rpc encode req fail", K(ret)); + } else if(OB_FAIL(check_blacklist(addr))) { + RPC_LOG(WARN, "check_blacklist failed", K(ret)); + } else if (0 != (sys_err = pn_send( + (pnio_group_id<<32) + thread_id, + obaddr2sockaddr(&sock_addr, addr), + req, + req_sz, + static_cast(set.idx_of_pcode(pcode)), + start_ts + get_proxy_timeout(proxy), + ObSyncRespCallback::client_cb, + &cb))) { + ret = translate_io_error(sys_err); + RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode)); + } else if (OB_FAIL(cb.wait())) { + RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode)); + } else if (NULL == (resp = cb.get_resp(resp_sz))) { + ret = common::OB_ERR_UNEXPECTED; + RPC_LOG(WARN, "sync rpc execute success but resp is null", K(ret), K(addr), K(pcode)); + } else if (OB_FAIL(rpc_decode_resp(resp, resp_sz, out, resp_pkt, rcode))) { + RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret)); + } } - lib::Thread::rpc_dest_addr_.reset(); if (rcode.rcode_ != OB_DESERIALIZE_ERROR) { int wb_ret = OB_SUCCESS; if (common::OB_SUCCESS != (wb_ret = log_user_error_and_warn(rcode))) { diff --git a/src/diagnose/lua/ob_lua_api.cpp b/src/diagnose/lua/ob_lua_api.cpp index afeb9d78a..b3354aedc 100644 --- a/src/diagnose/lua/ob_lua_api.cpp +++ b/src/diagnose/lua/ob_lua_api.cpp @@ -1937,7 
+1937,7 @@ int dump_thread_info(lua_State *L) gen.next_column(trace_id_buf); } // status - GET_OTHER_TSI_ADDR(is_blocking, &Thread::is_blocking_); + GET_OTHER_TSI_ADDR(blocking_ts, &Thread::blocking_ts_); { GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_); GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_); @@ -1946,7 +1946,7 @@ int dump_thread_info(lua_State *L) status_str = "Join"; } else if (0 != sleep_us) { status_str = "Sleep"; - } else if (0 != is_blocking) { + } else if (0 != blocking_ts) { status_str = "Wait"; } else { status_str = "Run"; @@ -1959,6 +1959,7 @@ int dump_thread_info(lua_State *L) GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_); GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_); GET_OTHER_TSI_ADDR(rpc_dest_addr, &Thread::rpc_dest_addr_); + GET_OTHER_TSI_ADDR(event, &Thread::wait_event_); constexpr int64_t BUF_LEN = 64; char wait_event[BUF_LEN]; ObAddr addr; @@ -1984,14 +1985,14 @@ int dump_thread_info(lua_State *L) if ((ret = snprintf(wait_event, BUF_LEN, "rpc to ")) > 0) { IGNORE_RETURN addr.to_string(wait_event + ret, BUF_LEN - ret); } - } else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) { + } else if (0 != blocking_ts && (0 != (Thread::WAIT_IN_TENANT_QUEUE & event))) { IGNORE_RETURN snprintf(wait_event, BUF_LEN, "tenant worker request"); - } else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) { + } else if (0 != blocking_ts && (0 != (Thread::WAIT_FOR_IO_EVENT & event))) { IGNORE_RETURN snprintf(wait_event, BUF_LEN, "IO events"); - } else if (0 != (is_blocking & Thread::WAIT_FOR_TRANS_RETRY)) { - IGNORE_RETURN snprintf(wait_event, 64, "trans retry"); } else if (0 != sleep_us) { IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%ld us", sleep_us); + } else if (0 != blocking_ts) { + IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%ld us", common::ObTimeUtility::fast_current_time() - blocking_ts); } gen.next_column(wait_event); } diff --git a/src/observer/ob_signal_handle.cpp b/src/observer/ob_signal_handle.cpp index 
74444255c..ca2c5c04a 100644 --- a/src/observer/ob_signal_handle.cpp +++ b/src/observer/ob_signal_handle.cpp @@ -51,9 +51,10 @@ void ObSignalHandle::run1() //to check _stop every second struct timespec timeout = {1, 0}; while (!has_set_stop()) {//need not to check ret - oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT; - signum = sigtimedwait(&waitset, NULL, &timeout); - oceanbase::lib::Thread::is_blocking_ = 0; + { + oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT); + signum = sigtimedwait(&waitset, NULL, &timeout); + } if (-1 == signum) { //do not log error, because timeout will also return -1. } else if (OB_FAIL(deal_signals(signum))) { diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index db3859e19..2a64cc1dc 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -374,6 +374,7 @@ void ObResourceGroup::check_worker_count() int ret = OB_SUCCESS; if (OB_SUCC(workers_lock_.trylock())) { int64_t token = 1; + int64_t now = ObTimeUtility::current_time(); bool enable_dynamic_worker = true; { ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_->id())); @@ -385,31 +386,30 @@ void ObResourceGroup::check_worker_count() workers_.remove(wnode); destroy_worker(w); } else if (w->has_req_flag() - && w->is_blocking() + && 0 != w->blocking_ts() /* blocking_ts is 0 while the worker is not blocked */ && now - w->blocking_ts() >= EXPAND_INTERVAL && enable_dynamic_worker) { ++token; } } int64_t succ_num = 0L; - int64_t now = 0; token = std::max(token, min_worker_cnt()); token = std::min(token, max_worker_cnt()); if (OB_UNLIKELY(workers_.get_size() < min_worker_cnt())) { const auto diff = min_worker_cnt() - workers_.get_size(); - token_change_ts_ = ObTimeUtility::current_time(); + token_change_ts_ = now; ATOMIC_STORE(&shrink_, false); acquire_more_worker(diff, succ_num); LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token)); } else if (OB_UNLIKELY(token > workers_.get_size()) &&
OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) > ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05)) { ATOMIC_STORE(&shrink_, false); - if (OB_LIKELY((now = ObTimeUtility::current_time()) - token_change_ts_ >= EXPAND_INTERVAL)) { + if (OB_LIKELY(now - token_change_ts_ >= EXPAND_INTERVAL)) { token_change_ts_ = now; acquire_more_worker(1, succ_num); LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token)); } } else if (OB_UNLIKELY(token < workers_.get_size()) - && OB_LIKELY((now = ObTimeUtility::current_time()) - token_change_ts_ >= SHRINK_INTERVAL)) { + && OB_LIKELY(now - token_change_ts_ >= SHRINK_INTERVAL)) { token_change_ts_ = now; ATOMIC_STORE(&shrink_, true); LOG_INFO("worker thread began to shrink", K(tenant_->id()), K(group_id_), K(token)); @@ -1034,7 +1034,7 @@ int ObTenant::get_new_request( ObLink* task = nullptr; req = nullptr; - Thread::is_blocking_ |= Thread::WAIT_IN_TENANT_QUEUE; + Thread::WaitGuard guard(Thread::WAIT_IN_TENANT_QUEUE); if (w.is_group_worker()) { w.set_large_query(false); w.set_curr_request_level(0); @@ -1367,6 +1367,7 @@ void ObTenant::check_worker_count() int ret = OB_SUCCESS; if (OB_SUCC(workers_lock_.trylock())) { int64_t token = 3; + int64_t now = ObTimeUtility::current_time(); bool enable_dynamic_worker = true; { ObTenantConfigGuard tenant_config(TENANT_CONF(id_)); @@ -1379,32 +1380,31 @@ void ObTenant::check_worker_count() workers_.remove(wnode); destroy_worker(w); } else if (w->has_req_flag() - && w->is_blocking() + && 0 != w->blocking_ts() /* blocking_ts is 0 while the worker is not blocked */ && now - w->blocking_ts() >= EXPAND_INTERVAL && w->is_default_worker() && enable_dynamic_worker) { ++token; } } int64_t succ_num = 0L; - int64_t now = 0; token = std::max(token, min_worker_cnt()); token = std::min(token, max_worker_cnt()); if (OB_UNLIKELY(workers_.get_size() < min_worker_cnt())) { const auto diff = min_worker_cnt() - workers_.get_size(); - token_change_ts_ = ObTimeUtility::current_time(); + token_change_ts_ = now;
ATOMIC_STORE(&shrink_, false); acquire_more_worker(diff, succ_num); LOG_INFO("worker thread created", K(id_), K(token)); } else if (OB_UNLIKELY(token > workers_.get_size()) && OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(id_) > ObMallocAllocator::get_instance()->get_tenant_limit(id_) * 0.05)) { ATOMIC_STORE(&shrink_, false); - if (OB_LIKELY((now = ObTimeUtility::current_time()) - token_change_ts_ >= EXPAND_INTERVAL)) { + if (OB_LIKELY(now - token_change_ts_ >= EXPAND_INTERVAL)) { token_change_ts_ = now; acquire_more_worker(1, succ_num); LOG_INFO("worker thread created", K(id_), K(token)); } } else if (OB_UNLIKELY(token < workers_.get_size()) - && OB_LIKELY((now = ObTimeUtility::current_time()) - token_change_ts_ >= SHRINK_INTERVAL)) { + && OB_LIKELY(now - token_change_ts_ >= SHRINK_INTERVAL)) { token_change_ts_ = now; ATOMIC_STORE(&shrink_, true); LOG_INFO("worker thread began to shrink", K(id_), K(token)); diff --git a/src/observer/omt/ob_th_worker.cpp b/src/observer/omt/ob_th_worker.cpp index 85174e405..b87eab403 100644 --- a/src/observer/omt/ob_th_worker.cpp +++ b/src/observer/omt/ob_th_worker.cpp @@ -107,7 +107,7 @@ ObThWorker::ObThWorker() priority_limit_(RQ_LOW), is_lq_yield_(false), query_start_time_(0), last_check_time_(0), can_retry_(true), need_retry_(false), - has_add_to_cgroup_(false), last_wakeup_ts_(0), is_blocking_(nullptr), ru_cputime_(0), + has_add_to_cgroup_(false), last_wakeup_ts_(0), blocking_ts_(nullptr), ru_cputime_(0), idle_us_(0) { } @@ -318,7 +318,7 @@ void ObThWorker::worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t int64_t wait_start_time = 0; int64_t wait_end_time = 0; procor_.th_created(); - is_blocking_ = &Thread::is_blocking_; + blocking_ts_ = &Thread::blocking_ts_; ObTLTaGuard ta_guard(tenant_->id()); // Avoid adding and deleting entities from the root node for every request, the parameters are meaningless diff --git a/src/observer/omt/ob_th_worker.h b/src/observer/omt/ob_th_worker.h index 
d6869e4d2..1c9700499 100644 --- a/src/observer/omt/ob_th_worker.h +++ b/src/observer/omt/ob_th_worker.h @@ -101,7 +101,7 @@ public: OB_INLINE void set_lq_yield(bool v=true) { is_lq_yield_ = v; } OB_INLINE int64_t get_last_wakeup_ts() { return last_wakeup_ts_; } OB_INLINE void set_last_wakeup_ts(int64_t last_wakeup_ts) { last_wakeup_ts_ = last_wakeup_ts; } - OB_INLINE bool is_blocking() const { return OB_NOT_NULL(is_blocking_) && (0 != *is_blocking_); } + OB_INLINE int64_t blocking_ts() const { return OB_NOT_NULL(blocking_ts_) ? (*blocking_ts_) : 0; } private: void set_th_worker_thread_name(); @@ -134,7 +134,7 @@ private: bool has_add_to_cgroup_; int64_t last_wakeup_ts_; - uint8_t* is_blocking_; + int64_t* blocking_ts_; int64_t ru_cputime_; int64_t idle_us_; private: diff --git a/src/observer/virtual_table/ob_all_virtual_thread.cpp b/src/observer/virtual_table/ob_all_virtual_thread.cpp index 0d3a2640f..02dd40af3 100644 --- a/src/observer/virtual_table/ob_all_virtual_thread.cpp +++ b/src/observer/virtual_table/ob_all_virtual_thread.cpp @@ -74,7 +74,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) GET_OTHER_TSI_ADDR(wait_addr, &ObLatch::current_wait); GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_); GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_); - GET_OTHER_TSI_ADDR(is_blocking, &Thread::is_blocking_); + GET_OTHER_TSI_ADDR(blocking_ts, &Thread::blocking_ts_); for (int64_t i = 0; i < col_count && OB_SUCC(ret); ++i) { const uint64_t col_id = output_column_ids_.at(i); ObObj *cells = cur_row_.cells_; @@ -112,7 +112,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) status_str = "Join"; } else if (0 != sleep_us) { status_str = "Sleep"; - } else if (0 != is_blocking) { + } else if (0 != blocking_ts) { status_str = "Wait"; } else { status_str = "Run"; @@ -124,6 +124,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) } case WAIT_EVENT: { GET_OTHER_TSI_ADDR(rpc_dest_addr, &Thread::rpc_dest_addr_); + 
GET_OTHER_TSI_ADDR(event, &Thread::wait_event_); ObAddr addr; struct iovec local_iov = {&addr, sizeof(ObAddr)}; struct iovec remote_iov = {thread_base + rpc_dest_addr_offset, sizeof(ObAddr)}; @@ -147,14 +148,14 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) if ((ret = snprintf(wait_event_, 64, "rpc to ")) > 0) { IGNORE_RETURN addr.to_string(wait_event_ + ret, 64 - ret); } - } else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) { + } else if (0 != blocking_ts && (0 != (Thread::WAIT_IN_TENANT_QUEUE & event))) { IGNORE_RETURN snprintf(wait_event_, 64, "tenant worker requests"); - } else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) { + } else if (0 != blocking_ts && (0 != (Thread::WAIT_FOR_IO_EVENT & event))) { IGNORE_RETURN snprintf(wait_event_, 64, "IO events"); - } else if (0 != (is_blocking & Thread::WAIT_FOR_TRANS_RETRY)) { - IGNORE_RETURN snprintf(wait_event_, 64, "trans retry"); } else if (0 != sleep_us) { IGNORE_RETURN snprintf(wait_event_, 64, "%ld us", sleep_us); + } else if (0 != blocking_ts) { + IGNORE_RETURN snprintf(wait_event_, 64, "%ld us", common::ObTimeUtility::fast_current_time() - blocking_ts); } cells[i].set_varchar(wait_event_); cells[i].set_collation_type( diff --git a/src/share/ob_local_device.cpp b/src/share/ob_local_device.cpp index 54061d868..fdeed1e84 100644 --- a/src/share/ob_local_device.cpp +++ b/src/share/ob_local_device.cpp @@ -1213,14 +1213,15 @@ int ObLocalDevice::io_getevents( SHARE_LOG(WARN, "Invalid io context pointer, ", K(ret), KP(io_context)); } else { int sys_ret = 0; - oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT; - while ((sys_ret = ::io_getevents( - local_io_context->io_context_, - min_nr, - local_io_events->max_event_cnt_, - local_io_events->io_events_, - timeout)) < 0 && -EINTR == sys_ret); // ignore EINTR - oceanbase::lib::Thread::is_blocking_ = 0; + { + oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_IO_EVENT); + while 
((sys_ret = ::io_getevents( + local_io_context->io_context_, + min_nr, + local_io_events->max_event_cnt_, + local_io_events->io_events_, + timeout)) < 0 && -EINTR == sys_ret); // ignore EINTR + } if (sys_ret < 0) { ret = OB_IO_ERROR; SHARE_LOG(WARN, "Fail to get io events, ", K(ret), K(sys_ret), KERRMSG);