diff --git a/deps/easy/src/io/easy_baseth_pool.c b/deps/easy/src/io/easy_baseth_pool.c index ed4b0327c..3baf3701c 100644 --- a/deps/easy/src/io/easy_baseth_pool.c +++ b/deps/easy/src/io/easy_baseth_pool.c @@ -28,6 +28,7 @@ static const int64_t easy_monitor_signal = 34; int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); void ob_set_thread_name(const char* type); +int64_t ob_update_loop_ts(); /** * start */ @@ -251,6 +252,7 @@ static void *easy_baseth_pool_monitor_func(void *args) easy_info_log("monitor us :%ld sec :%f", us, sec); while(tp->stoped == 0) { + ob_update_loop_ts(); usleep(us); ev_tstamp now = ev_time(); easy_thread_pool_for_each(th, tp, 0) { diff --git a/deps/easy/src/io/ev.c b/deps/easy/src/io/ev.c index 50f901b4d..93a7b4a59 100644 --- a/deps/easy/src/io/ev.c +++ b/deps/easy/src/io/ev.c @@ -611,6 +611,7 @@ static inline int64_t current_time(); #endif static void ( *syserr_cb)(const char *msg); + int64_t ob_update_loop_ts(); void ev_set_syserr_cb (void (*cb)(const char *msg)) @@ -2374,6 +2375,7 @@ static inline int64_t current_time(); EV_INVOKE_PENDING; /* in case we recurse, ensure ordering stays nice and clean */ do { + ob_update_loop_ts(); #if EV_VERIFY >= 2 ev_verify (EV_A); #endif diff --git a/deps/oblib/src/common/ob_clock_generator.cpp b/deps/oblib/src/common/ob_clock_generator.cpp index 17e17af3f..124be9fd2 100644 --- a/deps/oblib/src/common/ob_clock_generator.cpp +++ b/deps/oblib/src/common/ob_clock_generator.cpp @@ -17,6 +17,7 @@ #include "lib/lock/mutex.h" #include "lib/time/ob_time_utility.h" #include "lib/thread/ob_thread_name.h" +#include "lib/utility/utility.h" using namespace oceanbase::lib; @@ -73,12 +74,13 @@ void ObClockGenerator::run1() lib::set_thread_name("ClockGenerator"); while (!ready_) { - ::usleep(SLEEP_US); + ob_usleep(SLEEP_US); } while (inited_) { int64_t retry = 0; int64_t cur_ts = 0; int64_t delta = 0; + IGNORE_RETURN lib::Thread::update_loop_ts(); while (retry++ < MAX_RETRY) { cur_ts = get_us(); delta = cur_ts - ATOMIC_LOAD(&cur_ts_); @@ -88,7 +90,7 @@ void ObClockGenerator::run1() if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL_US)) { TRANS_LOG_RET(WARN, OB_ERR_SYS, "clock out of order", K(cur_ts), K(cur_ts_), K(delta)); } - ::usleep(SLEEP_US); + ob_usleep(SLEEP_US); } } if (delta < 0) { @@ -97,7 +99,7 @@ void ObClockGenerator::run1() } else { ATOMIC_STORE(&cur_ts_, cur_ts); } - ::usleep(SLEEP_US); + ob_usleep(SLEEP_US); } } diff --git a/deps/oblib/src/lib/stat/ob_diagnose_info.h b/deps/oblib/src/lib/stat/ob_diagnose_info.h index b0f386a88..d3b40aa3c 100644 --- a/deps/oblib/src/lib/stat/ob_diagnose_info.h +++ b/deps/oblib/src/lib/stat/ob_diagnose_info.h @@ -322,13 +322,13 @@ private: #define SLEEP(time) \ do { \ - oceanbase::common::ObWaitEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, 0, 0); \ + oceanbase::common::ObSleepEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, ((int64_t)time) * 1000 * 1000); \ ::sleep(time); \ } while (0) #define USLEEP(time) \ do { \ - oceanbase::common::ObWaitEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, 0, 0); \ + oceanbase::common::ObSleepEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, (int64_t)time); \ ::usleep(time); \ } while (0) diff --git a/deps/oblib/src/lib/thread/ob_simple_thread_pool.h b/deps/oblib/src/lib/thread/ob_simple_thread_pool.h index 7508800cd..8b7d81e98 100644 --- a/deps/oblib/src/lib/thread/ob_simple_thread_pool.h +++ b/deps/oblib/src/lib/thread/ob_simple_thread_pool.h @@ -70,7 +70,7 @@ public: ObSimpleThreadPool(); virtual ~ObSimpleThreadPool(); - int init(const int64_t thread_num, const int64_t task_num_limit, const char *name = "unknonw", const uint64_t tenant_id = OB_SERVER_TENANT_ID); + int init(const int64_t thread_num, const int64_t task_num_limit, const char *name = "unknown", const uint64_t tenant_id = OB_SERVER_TENANT_ID); void destroy(); int push(void *task); int64_t get_queue_num() const { return queue_.size(); } diff --git a/deps/oblib/src/lib/thread/ob_tenant_hook.cpp b/deps/oblib/src/lib/thread/ob_tenant_hook.cpp index b3d9922ab..989f724d1 100644 --- a/deps/oblib/src/lib/thread/ob_tenant_hook.cpp +++ b/deps/oblib/src/lib/thread/ob_tenant_hook.cpp @@ -189,6 +189,11 @@ void ob_set_thread_name(const char* type) ::oceanbase::lib::set_thread_name(type); } +int64_t ob_update_loop_ts() +{ + return ::oceanbase::lib::Thread::update_loop_ts(); +} + } /* extern "C" */ #endif /* _OCEABASE_TENANT_PRELOAD_H_ */ diff --git a/deps/oblib/src/lib/thread/thread.cpp b/deps/oblib/src/lib/thread/thread.cpp index 781929de9..338e2e95a 100644 --- a/deps/oblib/src/lib/thread/thread.cpp +++ b/deps/oblib/src/lib/thread/thread.cpp @@ -34,6 +34,7 @@ thread_local int64_t Thread::loop_ts_ = 0; thread_local pthread_t Thread::thread_joined_ = 0; thread_local int64_t Thread::sleep_us_ = 0; thread_local bool Thread::is_blocking_ = false; +thread_local char* Thread::rpc_dest_addr_ = nullptr; thread_local Thread* Thread::current_thread_ = nullptr; int64_t Thread::total_thread_count_ = 0; diff --git a/deps/oblib/src/lib/thread/thread.h b/deps/oblib/src/lib/thread/thread.h index 9f0f72a52..aa738a29f 100644 --- a/deps/oblib/src/lib/thread/thread.h +++ b/deps/oblib/src/lib/thread/thread.h @@ -60,10 +60,12 @@ public: return update_loop_ts(common::ObTimeUtility::fast_current_time()); } public: + // for thread diagnose, maybe replace it with union later. static thread_local int64_t loop_ts_; static thread_local pthread_t thread_joined_; static thread_local int64_t sleep_us_; static thread_local bool is_blocking_; + static thread_local char* rpc_dest_addr_; private: static void* __th_start(void *th); void destroy_stack(); diff --git a/deps/oblib/src/rpc/frame/ob_req_transport.cpp b/deps/oblib/src/rpc/frame/ob_req_transport.cpp index ed1d4ac04..d1a8cc474 100644 --- a/deps/oblib/src/rpc/frame/ob_req_transport.cpp +++ b/deps/oblib/src/rpc/frame/ob_req_transport.cpp @@ -435,12 +435,15 @@ ObPacket *ObReqTransport::send_session(easy_session_t *s) const // Synchronous rpc always needs to return packets s->unneed_response = false; s->r.client_start_time = common::ObTimeUtility::current_time(); + lib::Thread::update_loop_ts(s->r.client_start_time); if (0 == s->addr.cidx) { s->addr.cidx = balance_assign(s); } easy_inet_addr_to_str(&s->addr, buff, OB_SERVER_ADDR_STR_LEN); + lib::Thread::rpc_dest_addr_ = buff; pkt = reinterpret_cast(easy_client_send(eio_, s->addr, s)); + lib::Thread::rpc_dest_addr_ = nullptr; if (NULL == pkt) { SERVER_LOG(WARN, "send packet fail", "dst", buff, KP(s)); } else { diff --git a/deps/oblib/src/rpc/pnio/io/eloop.c b/deps/oblib/src/rpc/pnio/io/eloop.c index 4509f7f44..780c63153 100644 --- a/deps/oblib/src/rpc/pnio/io/eloop.c +++ b/deps/oblib/src/rpc/pnio/io/eloop.c @@ -104,6 +104,7 @@ int eloop_thread_run(eloop_t** udata) { int eloop_run(eloop_t* ep) { while(true) { int64_t epoll_timeout = 1000; + ob_update_loop_ts(); if (ep->ready_link.next != &ep->ready_link) { epoll_timeout = 0; // make sure all events handled when progarm is blocked in epoll_ctl } diff --git a/deps/oblib/src/rpc/pnio/r0/log.h b/deps/oblib/src/rpc/pnio/r0/log.h index 293986b50..9ade0281c 100644 --- a/deps/oblib/src/rpc/pnio/r0/log.h +++ b/deps/oblib/src/rpc/pnio/r0/log.h @@ -9,6 +9,7 @@ extern void do_log(int level, const char* file, int line, const char* func, cons extern void ob_set_thread_name(const char* type); extern int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); +extern int64_t ob_update_loop_ts(); extern log_func_t g_log_func; extern int g_log_level; enum { LOG_LEVEL_ERROR = 0, LOG_LEVEL_USER_LEVEL = 1, LOG_LEVEL_WARN = 2, LOG_LEVEL_INFO = 3, LOG_LEVEL_TRACE = 4, LOG_LEVEL_DEBUG = 5 }; @@ -26,6 +27,10 @@ int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr, { return pthread_create(thread, attr, start_routine, arg); } +int64_t ob_update_loop_ts() +{ + return 0; +} #endif #define do_rk_log_macro(...) { format_reset(&g_log_fbuf); rk_log_macro(__VA_ARGS__); } #define rk_error(...) do_rk_log_macro(ERROR, oceanbase::common::OB_ERR_SYS, ##__VA_ARGS__) diff --git a/src/diagnose/lua/ob_lua_api.cpp b/src/diagnose/lua/ob_lua_api.cpp index cfd6cf043..5e889701a 100644 --- a/src/diagnose/lua/ob_lua_api.cpp +++ b/src/diagnose/lua/ob_lua_api.cpp @@ -1947,12 +1947,14 @@ int dump_thread_info(lua_State *L) GET_OTHER_TSI_ADDR(uint32_t*, wait_addr, &ObLatch::current_wait); GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_); GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_); - char wait_event[32]; + GET_OTHER_TSI_ADDR(char*, rpc_dest_addr, &Thread::rpc_dest_addr_); + constexpr int64_t BUF_LEN = 64; + char wait_event[BUF_LEN]; wait_event[0] = '\0'; if (0 != join_addr) { - IGNORE_RETURN snprintf(wait_event, 32, "thread %u", *(uint32_t*)(thread_base + tid_offset)); + IGNORE_RETURN snprintf(wait_event, BUF_LEN, "thread %u %ld", *(uint32_t*)(thread_base + tid_offset), tid_offset); } else if (0 != sleep_us) { - IGNORE_RETURN snprintf(wait_event, 32, "%ld us", sleep_us); + IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%ld us", sleep_us); } else if (OB_NOT_NULL(wait_addr)) { bool has_segv = false; uint32_t val = 0; @@ -1961,10 +1963,15 @@ int dump_thread_info(lua_State *L) }, has_segv); if (has_segv) { } else if (0 != (val & (1<<30))) { - IGNORE_RETURN snprintf(wait_event, 32, "wrlock on %u", val & 0x3fffffff); + IGNORE_RETURN snprintf(wait_event, BUF_LEN, "wrlock on %u", val & 0x3fffffff); } else { - IGNORE_RETURN snprintf(wait_event, 32, "%u rdlocks", val & 0x3fffffff); + IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%u rdlocks", val & 0x3fffffff); } + } else if (OB_NOT_NULL(rpc_dest_addr)) { + bool has_segv = false; + do_with_crash_restore([&] { + IGNORE_RETURN snprintf(wait_event, BUF_LEN, "rpc to %s", rpc_dest_addr); + }, has_segv); } gen.next_column(wait_event); } diff --git a/src/observer/virtual_table/ob_all_virtual_thread.cpp b/src/observer/virtual_table/ob_all_virtual_thread.cpp index 6ee3081bc..367e10d2e 100644 --- a/src/observer/virtual_table/ob_all_virtual_thread.cpp +++ b/src/observer/virtual_table/ob_all_virtual_thread.cpp @@ -115,6 +115,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) break; } case WAIT_EVENT: { + GET_OTHER_TSI_ADDR(char*, rpc_dest_addr, &Thread::rpc_dest_addr_); wait_event_[0] = '\0'; if (0 != join_addr) { IGNORE_RETURN snprintf(wait_event_, 64, "thread %u", *(uint32_t*)(thread_base + tid_offset)); @@ -132,6 +133,11 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) } else { IGNORE_RETURN snprintf(wait_event_, 64, "%u rdlocks", val & 0x3fffffff); } + } else if (OB_NOT_NULL(rpc_dest_addr)) { + bool has_segv = false; + do_with_crash_restore([&] { + IGNORE_RETURN snprintf(wait_event_, 64, "rpc to %s", rpc_dest_addr); + }, has_segv); } cells[i].set_varchar(wait_event_); cells[i].set_collation_type( diff --git a/src/share/ob_occam_time_guard.h b/src/share/ob_occam_time_guard.h index 5005f2df4..903f59646 100644 --- a/src/share/ob_occam_time_guard.h +++ b/src/share/ob_occam_time_guard.h @@ -81,6 +81,7 @@ private: } if (OB_FAIL(back_thread_->init_and_start([this]() { while(true) { + IGNORE_RETURN lib::Thread::update_loop_ts(); if (!back_thread_->is_stopped()) { for (int64_t idx = 0; idx < MAX_THREAD_NUM; ++idx) { ClickPoint point = points_[idx].atomic_copy();