From ae0b797794d1cfc903641d67cd02227e9c5909a9 Mon Sep 17 00:00:00 2001 From: nroskill Date: Fri, 26 May 2023 01:41:11 +0000 Subject: [PATCH] fix wrong addr in rpc wait --- deps/oblib/src/lib/thread/thread.h | 1 + src/diagnose/lua/ob_lua_api.cpp | 57 ++++++++++--------- .../virtual_table/ob_all_virtual_thread.cpp | 49 +++++++++------- 3 files changed, 59 insertions(+), 48 deletions(-) diff --git a/deps/oblib/src/lib/thread/thread.h b/deps/oblib/src/lib/thread/thread.h index a019d081e8..5e244013ed 100644 --- a/deps/oblib/src/lib/thread/thread.h +++ b/deps/oblib/src/lib/thread/thread.h @@ -64,6 +64,7 @@ public: static constexpr uint8_t WAIT = (1 << 0); static constexpr uint8_t WAIT_IN_TENANT_QUEUE = (1 << 1); static constexpr uint8_t WAIT_FOR_IO_EVENT = (1 << 2); + static constexpr uint8_t WAIT_FOR_TRANS_RETRY = (1 << 3); // for thread diagnose, maybe replace it with union later. static thread_local int64_t loop_ts_; static thread_local pthread_t thread_joined_; diff --git a/src/diagnose/lua/ob_lua_api.cpp b/src/diagnose/lua/ob_lua_api.cpp index e39dd39a2d..f79454f518 100644 --- a/src/diagnose/lua/ob_lua_api.cpp +++ b/src/diagnose/lua/ob_lua_api.cpp @@ -1846,9 +1846,10 @@ int select_schema_slot(lua_State *L) return 1; } -#define GET_OTHER_TSI_ADDR(type, var_name, addr) \ +#define GET_OTHER_TSI_ADDR(var_name, addr) \ const int64_t var_name##_offset = ((int64_t)addr - (int64_t)pthread_self()); \ -type var_name = *(type*)(thread_base + var_name##_offset); +decltype(*addr) var_name = *(decltype(addr))(thread_base + var_name##_offset); + // list{list, list...} = dump_threads_info() int dump_thread_info(lua_State *L) { @@ -1875,12 +1876,11 @@ int dump_thread_info(lua_State *L) // avoid SMART_CALL stack gen.next_row(); // tname - GET_OTHER_TSI_ADDR(char*, tname, ob_get_tname()); + GET_OTHER_TSI_ADDR(tname, &(ob_get_tname()[0])); // PAY ATTENTION HERE - tname = thread_base + tname_offset; - gen.next_column(tname); + gen.next_column(thread_base + tname_offset); // tid - GET_OTHER_TSI_ADDR(int64_t, tid, &get_tid_cache()); + GET_OTHER_TSI_ADDR(tid, &get_tid_cache()); gen.next_column(tid); // thread_base { @@ -1889,26 +1889,26 @@ int dump_thread_info(lua_State *L) gen.next_column(addr); } // loop_ts - GET_OTHER_TSI_ADDR(int64_t, loop_ts, &oceanbase::lib::Thread::loop_ts_); + GET_OTHER_TSI_ADDR(loop_ts, &oceanbase::lib::Thread::loop_ts_); gen.next_column(loop_ts); // latch_hold { char addrs[256]; - GET_OTHER_TSI_ADDR(uint32_t**, locks_addr, &ObLatch::current_locks); - GET_OTHER_TSI_ADDR(int8_t, slot_cnt, &ObLatch::max_lock_slot_idx) + GET_OTHER_TSI_ADDR(locks_addr, &(ObLatch::current_locks[0])); + GET_OTHER_TSI_ADDR(slot_cnt, &ObLatch::max_lock_slot_idx) const int64_t cnt = std::min(ARRAYSIZEOF(ObLatch::current_locks), (int64_t)slot_cnt); - locks_addr = (uint32_t**)(thread_base + locks_addr_offset); + decltype(&locks_addr) locks = (decltype(&locks_addr))(thread_base + locks_addr_offset); addrs[0] = 0; for (int64_t i = 0, j = 0; i < cnt; ++i) { int64_t idx = (slot_cnt + i) % ARRAYSIZEOF(ObLatch::current_locks); - if (OB_NOT_NULL(locks_addr[idx]) && j < 256) { + if (OB_NOT_NULL(locks[idx]) && j < 256) { bool has_segv = false; uint32_t val = 0; do_with_crash_restore([&] { - val = *locks_addr[idx]; + val = *locks[idx]; }, has_segv); if (!has_segv && 0 != val) { - j += snprintf(addrs + j, 256 - j, "%p ", locks_addr[idx]); + j += snprintf(addrs + j, 256 - j, "%p ", locks[idx]); } } } @@ -1919,7 +1919,7 @@ int dump_thread_info(lua_State *L) } } // latch_wait - GET_OTHER_TSI_ADDR(uint32_t*, wait_addr, &ObLatch::current_wait); + GET_OTHER_TSI_ADDR(wait_addr, &ObLatch::current_wait); if (OB_NOT_NULL(wait_addr)) { char addr[32]; snprintf(addr, 32, "%p", wait_addr); @@ -1929,16 +1929,16 @@ int dump_thread_info(lua_State *L) } // trace_id { - GET_OTHER_TSI_ADDR(ObCurTraceId::TraceId, trace_id, ObCurTraceId::get_trace_id()); + GET_OTHER_TSI_ADDR(trace_id, ObCurTraceId::get_trace_id()); char trace_id_buf[40]; IGNORE_RETURN trace_id.to_string(trace_id_buf, 40); gen.next_column(trace_id_buf); } // status - GET_OTHER_TSI_ADDR(uint8_t, is_blocking, &Thread::is_blocking_); + GET_OTHER_TSI_ADDR(is_blocking, &Thread::is_blocking_); { - GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_); - GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_); + GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_); + GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_); const char* status_str = nullptr; if (0 != join_addr) { status_str = "Join"; @@ -1953,17 +1953,15 @@ int dump_thread_info(lua_State *L) } // wait_event { - GET_OTHER_TSI_ADDR(uint32_t*, wait_addr, &ObLatch::current_wait); - GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_); - GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_); - GET_OTHER_TSI_ADDR(char*, rpc_dest_addr, &Thread::rpc_dest_addr_); + GET_OTHER_TSI_ADDR(wait_addr, &ObLatch::current_wait); + GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_); + GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_); + GET_OTHER_TSI_ADDR(rpc_dest_addr, &Thread::rpc_dest_addr_); constexpr int64_t BUF_LEN = 64; char wait_event[BUF_LEN]; wait_event[0] = '\0'; if (0 != join_addr) { IGNORE_RETURN snprintf(wait_event, BUF_LEN, "thread %u %ld", *(uint32_t*)(thread_base + tid_offset), tid_offset); - } else if (0 != sleep_us) { - IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%ld us", sleep_us); } else if (OB_NOT_NULL(wait_addr)) { bool has_segv = false; uint32_t val = 0; @@ -1976,15 +1974,22 @@ int dump_thread_info(lua_State *L) } else { IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%u rdlocks", val & 0x3fffffff); } - } else if (OB_NOT_NULL(rpc_dest_addr)) { + } else if (rpc_dest_addr.is_valid()) { bool has_segv = false; do_with_crash_restore([&] { - IGNORE_RETURN snprintf(wait_event, BUF_LEN, "rpc to %s", rpc_dest_addr); + int ret = snprintf(wait_event, BUF_LEN, "rpc to "); + if (ret > 0) { + IGNORE_RETURN rpc_dest_addr.to_string(wait_event + ret, BUF_LEN - ret); + } }, has_segv); } else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) { IGNORE_RETURN snprintf(wait_event, BUF_LEN, "tenant worker request"); } else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) { IGNORE_RETURN snprintf(wait_event, BUF_LEN, "IO events"); + } else if (0 != (is_blocking & Thread::WAIT_FOR_TRANS_RETRY)) { + IGNORE_RETURN snprintf(wait_event, 64, "trans retry"); + } else if (0 != sleep_us) { + IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%ld us", sleep_us); } gen.next_column(wait_event); } diff --git a/src/observer/virtual_table/ob_all_virtual_thread.cpp b/src/observer/virtual_table/ob_all_virtual_thread.cpp index f023edf4f9..2b79f2967c 100644 --- a/src/observer/virtual_table/ob_all_virtual_thread.cpp +++ b/src/observer/virtual_table/ob_all_virtual_thread.cpp @@ -14,9 +14,9 @@ #include "lib/signal/ob_signal_utils.h" #include "lib/thread/protected_stack_allocator.h" -#define GET_OTHER_TSI_ADDR(type, var_name, addr) \ +#define GET_OTHER_TSI_ADDR(var_name, addr) \ const int64_t var_name##_offset = ((int64_t)addr - (int64_t)pthread_self()); \ -type var_name = *(type*)(thread_base + var_name##_offset); +decltype(*addr) var_name = *(decltype(addr))(thread_base + var_name##_offset); namespace oceanbase { @@ -57,7 +57,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) for (auto* header = *guard; OB_NOT_NULL(header); header = guard.next()) { auto* thread_base = (char*)(header->pth_); if (OB_NOT_NULL(thread_base)) { - GET_OTHER_TSI_ADDR(int64_t, tid, &get_tid_cache()); + GET_OTHER_TSI_ADDR(tid, &get_tid_cache()); { char path[64]; IGNORE_RETURN snprintf(path, 64, "/proc/self/task/%ld", tid); @@ -66,15 +66,15 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) continue; } } - GET_OTHER_TSI_ADDR(uint64_t, tenant_id, &ob_get_tenant_id()); + GET_OTHER_TSI_ADDR(tenant_id, &ob_get_tenant_id()); if (!is_sys_tenant(effective_tenant_id_) && tenant_id != effective_tenant_id_) { continue; } - GET_OTHER_TSI_ADDR(uint32_t*, wait_addr, &ObLatch::current_wait); - GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_); - GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_); - GET_OTHER_TSI_ADDR(uint8_t, is_blocking, &Thread::is_blocking_); + GET_OTHER_TSI_ADDR(wait_addr, &ObLatch::current_wait); + GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_); + GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_); + GET_OTHER_TSI_ADDR(is_blocking, &Thread::is_blocking_); for (int64_t i = 0; i < col_count && OB_SUCC(ret); ++i) { const uint64_t col_id = output_column_ids_.at(i); ObObj *cells = cur_row_.cells_; @@ -98,7 +98,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) break; } case TNAME: { - GET_OTHER_TSI_ADDR(char*, tname, ob_get_tname()); + GET_OTHER_TSI_ADDR(tname, &(ob_get_tname()[0])); // PAY ATTENTION HERE MEMCPY(tname_, thread_base + tname_offset, sizeof(tname_)); cells[i].set_varchar(tname_); @@ -123,12 +123,10 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) break; } case WAIT_EVENT: { - GET_OTHER_TSI_ADDR(char*, rpc_dest_addr, &Thread::rpc_dest_addr_); + GET_OTHER_TSI_ADDR(rpc_dest_addr, &Thread::rpc_dest_addr_); wait_event_[0] = '\0'; if (0 != join_addr) { IGNORE_RETURN snprintf(wait_event_, 64, "thread %u", *(uint32_t*)(thread_base + tid_offset)); - } else if (0 != sleep_us) { - IGNORE_RETURN snprintf(wait_event_, 64, "%ld us", sleep_us); } else if (OB_NOT_NULL(wait_addr)) { bool has_segv = false; uint32_t val = 0; @@ -141,15 +139,22 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) } else { IGNORE_RETURN snprintf(wait_event_, 64, "%u rdlocks", val & 0x3fffffff); } - } else if (OB_NOT_NULL(rpc_dest_addr)) { + } else if (rpc_dest_addr.is_valid()) { bool has_segv = false; do_with_crash_restore([&] { - IGNORE_RETURN snprintf(wait_event_, 64, "rpc to %s", rpc_dest_addr); + int ret = snprintf(wait_event_, 64, "rpc to "); + if (ret > 0) { + IGNORE_RETURN rpc_dest_addr.to_string(wait_event_ + ret, 64 - ret); + } }, has_segv); } else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) { IGNORE_RETURN snprintf(wait_event_, 64, "tenant worker requests"); } else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) { IGNORE_RETURN snprintf(wait_event_, 64, "IO events"); + } else if (0 != (is_blocking & Thread::WAIT_FOR_TRANS_RETRY)) { + IGNORE_RETURN snprintf(wait_event_, 64, "trans retry"); + } else if (0 != sleep_us) { + IGNORE_RETURN snprintf(wait_event_, 64, "%ld us", sleep_us); } cells[i].set_varchar(wait_event_); cells[i].set_collation_type( @@ -168,21 +173,21 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) break; } case LATCH_HOLD: { - GET_OTHER_TSI_ADDR(uint32_t**, locks_addr, &ObLatch::current_locks); - GET_OTHER_TSI_ADDR(int8_t, slot_cnt, &ObLatch::max_lock_slot_idx) + GET_OTHER_TSI_ADDR(locks_addr, &(ObLatch::current_locks[0])); + GET_OTHER_TSI_ADDR(slot_cnt, &ObLatch::max_lock_slot_idx) const int64_t cnt = std::min(ARRAYSIZEOF(ObLatch::current_locks), (int64_t)slot_cnt); - locks_addr = (uint32_t**)(thread_base + locks_addr_offset); + decltype(&locks_addr) locks = (decltype(&locks_addr))(thread_base + locks_addr_offset); locks_addr_[0] = 0; for (int64_t i = 0, j = 0; i < cnt; ++i) { int64_t idx = (slot_cnt + i) % ARRAYSIZEOF(ObLatch::current_locks); - if (OB_NOT_NULL(locks_addr[idx]) && j < 256) { + if (OB_NOT_NULL(locks[idx]) && j < 256) { bool has_segv = false; uint32_t val = 0; do_with_crash_restore([&] { - val = *locks_addr[idx]; + val = *locks[idx]; }, has_segv); if (!has_segv && 0 != val) { - j += snprintf(locks_addr_ + j, 256 - j, "%p ", locks_addr[idx]); + j += snprintf(locks_addr_ + j, 256 - j, "%p ", locks[idx]); } } } @@ -192,7 +197,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) break; } case TRACE_ID: { - GET_OTHER_TSI_ADDR(ObCurTraceId::TraceId, trace_id, ObCurTraceId::get_trace_id()); + GET_OTHER_TSI_ADDR(trace_id, ObCurTraceId::get_trace_id()); IGNORE_RETURN trace_id.to_string(trace_id_buf_, sizeof(trace_id_buf_)); cells[i].set_varchar(trace_id_buf_); cells[i].set_collation_type( @@ -200,7 +205,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row) break; } case LOOP_TS: { - GET_OTHER_TSI_ADDR(int64_t, loop_ts, &oceanbase::lib::Thread::loop_ts_); + GET_OTHER_TSI_ADDR(loop_ts, &oceanbase::lib::Thread::loop_ts_); cells[i].set_timestamp(loop_ts); break; }