fix wrong addr in rpc wait

This commit is contained in:
nroskill
2023-05-26 01:41:11 +00:00
committed by ob-robot
parent fa6703b8dd
commit ae0b797794
3 changed files with 59 additions and 48 deletions

View File

@ -64,6 +64,7 @@ public:
static constexpr uint8_t WAIT = (1 << 0);
static constexpr uint8_t WAIT_IN_TENANT_QUEUE = (1 << 1);
static constexpr uint8_t WAIT_FOR_IO_EVENT = (1 << 2);
static constexpr uint8_t WAIT_FOR_TRANS_RETRY = (1 << 3);
// for thread diagnose, maybe replace it with union later.
static thread_local int64_t loop_ts_;
static thread_local pthread_t thread_joined_;

View File

@ -1846,9 +1846,10 @@ int select_schema_slot(lua_State *L)
return 1;
}
#define GET_OTHER_TSI_ADDR(type, var_name, addr) \
#define GET_OTHER_TSI_ADDR(var_name, addr) \
const int64_t var_name##_offset = ((int64_t)addr - (int64_t)pthread_self()); \
type var_name = *(type*)(thread_base + var_name##_offset);
decltype(*addr) var_name = *(decltype(addr))(thread_base + var_name##_offset);
// list{list, list...} = dump_threads_info()
int dump_thread_info(lua_State *L)
{
@ -1875,12 +1876,11 @@ int dump_thread_info(lua_State *L)
// avoid SMART_CALL stack
gen.next_row();
// tname
GET_OTHER_TSI_ADDR(char*, tname, ob_get_tname());
GET_OTHER_TSI_ADDR(tname, &(ob_get_tname()[0]));
// PAY ATTENTION HERE
tname = thread_base + tname_offset;
gen.next_column(tname);
gen.next_column(thread_base + tname_offset);
// tid
GET_OTHER_TSI_ADDR(int64_t, tid, &get_tid_cache());
GET_OTHER_TSI_ADDR(tid, &get_tid_cache());
gen.next_column(tid);
// thread_base
{
@ -1889,26 +1889,26 @@ int dump_thread_info(lua_State *L)
gen.next_column(addr);
}
// loop_ts
GET_OTHER_TSI_ADDR(int64_t, loop_ts, &oceanbase::lib::Thread::loop_ts_);
GET_OTHER_TSI_ADDR(loop_ts, &oceanbase::lib::Thread::loop_ts_);
gen.next_column(loop_ts);
// latch_hold
{
char addrs[256];
GET_OTHER_TSI_ADDR(uint32_t**, locks_addr, &ObLatch::current_locks);
GET_OTHER_TSI_ADDR(int8_t, slot_cnt, &ObLatch::max_lock_slot_idx)
GET_OTHER_TSI_ADDR(locks_addr, &(ObLatch::current_locks[0]));
GET_OTHER_TSI_ADDR(slot_cnt, &ObLatch::max_lock_slot_idx)
const int64_t cnt = std::min(ARRAYSIZEOF(ObLatch::current_locks), (int64_t)slot_cnt);
locks_addr = (uint32_t**)(thread_base + locks_addr_offset);
decltype(&locks_addr) locks = (decltype(&locks_addr))(thread_base + locks_addr_offset);
addrs[0] = 0;
for (int64_t i = 0, j = 0; i < cnt; ++i) {
int64_t idx = (slot_cnt + i) % ARRAYSIZEOF(ObLatch::current_locks);
if (OB_NOT_NULL(locks_addr[idx]) && j < 256) {
if (OB_NOT_NULL(locks[idx]) && j < 256) {
bool has_segv = false;
uint32_t val = 0;
do_with_crash_restore([&] {
val = *locks_addr[idx];
val = *locks[idx];
}, has_segv);
if (!has_segv && 0 != val) {
j += snprintf(addrs + j, 256 - j, "%p ", locks_addr[idx]);
j += snprintf(addrs + j, 256 - j, "%p ", locks[idx]);
}
}
}
@ -1919,7 +1919,7 @@ int dump_thread_info(lua_State *L)
}
}
// latch_wait
GET_OTHER_TSI_ADDR(uint32_t*, wait_addr, &ObLatch::current_wait);
GET_OTHER_TSI_ADDR(wait_addr, &ObLatch::current_wait);
if (OB_NOT_NULL(wait_addr)) {
char addr[32];
snprintf(addr, 32, "%p", wait_addr);
@ -1929,16 +1929,16 @@ int dump_thread_info(lua_State *L)
}
// trace_id
{
GET_OTHER_TSI_ADDR(ObCurTraceId::TraceId, trace_id, ObCurTraceId::get_trace_id());
GET_OTHER_TSI_ADDR(trace_id, ObCurTraceId::get_trace_id());
char trace_id_buf[40];
IGNORE_RETURN trace_id.to_string(trace_id_buf, 40);
gen.next_column(trace_id_buf);
}
// status
GET_OTHER_TSI_ADDR(uint8_t, is_blocking, &Thread::is_blocking_);
GET_OTHER_TSI_ADDR(is_blocking, &Thread::is_blocking_);
{
GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_);
GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_);
GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_);
GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_);
const char* status_str = nullptr;
if (0 != join_addr) {
status_str = "Join";
@ -1953,17 +1953,15 @@ int dump_thread_info(lua_State *L)
}
// wait_event
{
GET_OTHER_TSI_ADDR(uint32_t*, wait_addr, &ObLatch::current_wait);
GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_);
GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_);
GET_OTHER_TSI_ADDR(char*, rpc_dest_addr, &Thread::rpc_dest_addr_);
GET_OTHER_TSI_ADDR(wait_addr, &ObLatch::current_wait);
GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_);
GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_);
GET_OTHER_TSI_ADDR(rpc_dest_addr, &Thread::rpc_dest_addr_);
constexpr int64_t BUF_LEN = 64;
char wait_event[BUF_LEN];
wait_event[0] = '\0';
if (0 != join_addr) {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "thread %u %ld", *(uint32_t*)(thread_base + tid_offset), tid_offset);
} else if (0 != sleep_us) {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%ld us", sleep_us);
} else if (OB_NOT_NULL(wait_addr)) {
bool has_segv = false;
uint32_t val = 0;
@ -1976,15 +1974,22 @@ int dump_thread_info(lua_State *L)
} else {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%u rdlocks", val & 0x3fffffff);
}
} else if (OB_NOT_NULL(rpc_dest_addr)) {
} else if (rpc_dest_addr.is_valid()) {
bool has_segv = false;
do_with_crash_restore([&] {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "rpc to %s", rpc_dest_addr);
int ret = snprintf(wait_event, BUF_LEN, "rpc to ");
if (ret > 0) {
IGNORE_RETURN rpc_dest_addr.to_string(wait_event + ret, BUF_LEN - ret);
}
}, has_segv);
} else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "tenant worker request");
} else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "IO events");
} else if (0 != (is_blocking & Thread::WAIT_FOR_TRANS_RETRY)) {
IGNORE_RETURN snprintf(wait_event, 64, "trans retry");
} else if (0 != sleep_us) {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%ld us", sleep_us);
}
gen.next_column(wait_event);
}

View File

@ -14,9 +14,9 @@
#include "lib/signal/ob_signal_utils.h"
#include "lib/thread/protected_stack_allocator.h"
#define GET_OTHER_TSI_ADDR(type, var_name, addr) \
#define GET_OTHER_TSI_ADDR(var_name, addr) \
const int64_t var_name##_offset = ((int64_t)addr - (int64_t)pthread_self()); \
type var_name = *(type*)(thread_base + var_name##_offset);
decltype(*addr) var_name = *(decltype(addr))(thread_base + var_name##_offset);
namespace oceanbase
{
@ -57,7 +57,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
for (auto* header = *guard; OB_NOT_NULL(header); header = guard.next()) {
auto* thread_base = (char*)(header->pth_);
if (OB_NOT_NULL(thread_base)) {
GET_OTHER_TSI_ADDR(int64_t, tid, &get_tid_cache());
GET_OTHER_TSI_ADDR(tid, &get_tid_cache());
{
char path[64];
IGNORE_RETURN snprintf(path, 64, "/proc/self/task/%ld", tid);
@ -66,15 +66,15 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
continue;
}
}
GET_OTHER_TSI_ADDR(uint64_t, tenant_id, &ob_get_tenant_id());
GET_OTHER_TSI_ADDR(tenant_id, &ob_get_tenant_id());
if (!is_sys_tenant(effective_tenant_id_)
&& tenant_id != effective_tenant_id_) {
continue;
}
GET_OTHER_TSI_ADDR(uint32_t*, wait_addr, &ObLatch::current_wait);
GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_);
GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_);
GET_OTHER_TSI_ADDR(uint8_t, is_blocking, &Thread::is_blocking_);
GET_OTHER_TSI_ADDR(wait_addr, &ObLatch::current_wait);
GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_);
GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_);
GET_OTHER_TSI_ADDR(is_blocking, &Thread::is_blocking_);
for (int64_t i = 0; i < col_count && OB_SUCC(ret); ++i) {
const uint64_t col_id = output_column_ids_.at(i);
ObObj *cells = cur_row_.cells_;
@ -98,7 +98,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
break;
}
case TNAME: {
GET_OTHER_TSI_ADDR(char*, tname, ob_get_tname());
GET_OTHER_TSI_ADDR(tname, &(ob_get_tname()[0]));
// PAY ATTENTION HERE
MEMCPY(tname_, thread_base + tname_offset, sizeof(tname_));
cells[i].set_varchar(tname_);
@ -123,12 +123,10 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
break;
}
case WAIT_EVENT: {
GET_OTHER_TSI_ADDR(char*, rpc_dest_addr, &Thread::rpc_dest_addr_);
GET_OTHER_TSI_ADDR(rpc_dest_addr, &Thread::rpc_dest_addr_);
wait_event_[0] = '\0';
if (0 != join_addr) {
IGNORE_RETURN snprintf(wait_event_, 64, "thread %u", *(uint32_t*)(thread_base + tid_offset));
} else if (0 != sleep_us) {
IGNORE_RETURN snprintf(wait_event_, 64, "%ld us", sleep_us);
} else if (OB_NOT_NULL(wait_addr)) {
bool has_segv = false;
uint32_t val = 0;
@ -141,15 +139,22 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
} else {
IGNORE_RETURN snprintf(wait_event_, 64, "%u rdlocks", val & 0x3fffffff);
}
} else if (OB_NOT_NULL(rpc_dest_addr)) {
} else if (rpc_dest_addr.is_valid()) {
bool has_segv = false;
do_with_crash_restore([&] {
IGNORE_RETURN snprintf(wait_event_, 64, "rpc to %s", rpc_dest_addr);
int ret = snprintf(wait_event_, 64, "rpc to ");
if (ret > 0) {
IGNORE_RETURN rpc_dest_addr.to_string(wait_event_ + ret, 64 - ret);
}
}, has_segv);
} else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) {
IGNORE_RETURN snprintf(wait_event_, 64, "tenant worker requests");
} else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) {
IGNORE_RETURN snprintf(wait_event_, 64, "IO events");
} else if (0 != (is_blocking & Thread::WAIT_FOR_TRANS_RETRY)) {
IGNORE_RETURN snprintf(wait_event_, 64, "trans retry");
} else if (0 != sleep_us) {
IGNORE_RETURN snprintf(wait_event_, 64, "%ld us", sleep_us);
}
cells[i].set_varchar(wait_event_);
cells[i].set_collation_type(
@ -168,21 +173,21 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
break;
}
case LATCH_HOLD: {
GET_OTHER_TSI_ADDR(uint32_t**, locks_addr, &ObLatch::current_locks);
GET_OTHER_TSI_ADDR(int8_t, slot_cnt, &ObLatch::max_lock_slot_idx)
GET_OTHER_TSI_ADDR(locks_addr, &(ObLatch::current_locks[0]));
GET_OTHER_TSI_ADDR(slot_cnt, &ObLatch::max_lock_slot_idx)
const int64_t cnt = std::min(ARRAYSIZEOF(ObLatch::current_locks), (int64_t)slot_cnt);
locks_addr = (uint32_t**)(thread_base + locks_addr_offset);
decltype(&locks_addr) locks = (decltype(&locks_addr))(thread_base + locks_addr_offset);
locks_addr_[0] = 0;
for (int64_t i = 0, j = 0; i < cnt; ++i) {
int64_t idx = (slot_cnt + i) % ARRAYSIZEOF(ObLatch::current_locks);
if (OB_NOT_NULL(locks_addr[idx]) && j < 256) {
if (OB_NOT_NULL(locks[idx]) && j < 256) {
bool has_segv = false;
uint32_t val = 0;
do_with_crash_restore([&] {
val = *locks_addr[idx];
val = *locks[idx];
}, has_segv);
if (!has_segv && 0 != val) {
j += snprintf(locks_addr_ + j, 256 - j, "%p ", locks_addr[idx]);
j += snprintf(locks_addr_ + j, 256 - j, "%p ", locks[idx]);
}
}
}
@ -192,7 +197,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
break;
}
case TRACE_ID: {
GET_OTHER_TSI_ADDR(ObCurTraceId::TraceId, trace_id, ObCurTraceId::get_trace_id());
GET_OTHER_TSI_ADDR(trace_id, ObCurTraceId::get_trace_id());
IGNORE_RETURN trace_id.to_string(trace_id_buf_, sizeof(trace_id_buf_));
cells[i].set_varchar(trace_id_buf_);
cells[i].set_collation_type(
@ -200,7 +205,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
break;
}
case LOOP_TS: {
GET_OTHER_TSI_ADDR(int64_t, loop_ts, &oceanbase::lib::Thread::loop_ts_);
GET_OTHER_TSI_ADDR(loop_ts, &oceanbase::lib::Thread::loop_ts_);
cells[i].set_timestamp(loop_ts);
break;
}