fix __all_virtual_thread result incorrect

This commit is contained in:
nroskill 2023-05-11 10:13:16 +00:00 committed by ob-robot
parent aaab6560a1
commit caaed5132f
14 changed files with 48 additions and 11 deletions

View File

@ -28,6 +28,7 @@ static const int64_t easy_monitor_signal = 34;
int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
void ob_set_thread_name(const char* type);
int64_t ob_update_loop_ts();
/**
* start
*/
@ -251,6 +252,7 @@ static void *easy_baseth_pool_monitor_func(void *args)
easy_info_log("monitor us :%ld sec :%f", us, sec);
while(tp->stoped == 0) {
ob_update_loop_ts();
usleep(us);
ev_tstamp now = ev_time();
easy_thread_pool_for_each(th, tp, 0) {

View File

@ -611,6 +611,7 @@ static inline int64_t current_time();
#endif
static void ( *syserr_cb)(const char *msg);
int64_t ob_update_loop_ts();
void
ev_set_syserr_cb (void (*cb)(const char *msg))
@ -2374,6 +2375,7 @@ static inline int64_t current_time();
EV_INVOKE_PENDING; /* in case we recurse, ensure ordering stays nice and clean */
do {
ob_update_loop_ts();
#if EV_VERIFY >= 2
ev_verify (EV_A);
#endif

View File

@ -17,6 +17,7 @@
#include "lib/lock/mutex.h"
#include "lib/time/ob_time_utility.h"
#include "lib/thread/ob_thread_name.h"
#include "lib/utility/utility.h"
using namespace oceanbase::lib;
@ -73,12 +74,13 @@ void ObClockGenerator::run1()
lib::set_thread_name("ClockGenerator");
while (!ready_) {
::usleep(SLEEP_US);
ob_usleep(SLEEP_US);
}
while (inited_) {
int64_t retry = 0;
int64_t cur_ts = 0;
int64_t delta = 0;
IGNORE_RETURN lib::Thread::update_loop_ts();
while (retry++ < MAX_RETRY) {
cur_ts = get_us();
delta = cur_ts - ATOMIC_LOAD(&cur_ts_);
@ -88,7 +90,7 @@ void ObClockGenerator::run1()
if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL_US)) {
TRANS_LOG_RET(WARN, OB_ERR_SYS, "clock out of order", K(cur_ts), K(cur_ts_), K(delta));
}
::usleep(SLEEP_US);
ob_usleep(SLEEP_US);
}
}
if (delta < 0) {
@ -97,7 +99,7 @@ void ObClockGenerator::run1()
} else {
ATOMIC_STORE(&cur_ts_, cur_ts);
}
::usleep(SLEEP_US);
ob_usleep(SLEEP_US);
}
}

View File

@ -322,13 +322,13 @@ private:
#define SLEEP(time) \
do { \
oceanbase::common::ObWaitEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, 0, 0); \
oceanbase::common::ObSleepEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, ((int64_t)time) * 1000 * 1000); \
::sleep(time); \
} while (0)
#define USLEEP(time) \
do { \
oceanbase::common::ObWaitEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, 0, 0); \
oceanbase::common::ObSleepEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, (int64_t)time); \
::usleep(time); \
} while (0)

View File

@ -70,7 +70,7 @@ public:
ObSimpleThreadPool();
virtual ~ObSimpleThreadPool();
int init(const int64_t thread_num, const int64_t task_num_limit, const char *name = "unknonw", const uint64_t tenant_id = OB_SERVER_TENANT_ID);
int init(const int64_t thread_num, const int64_t task_num_limit, const char *name = "unknown", const uint64_t tenant_id = OB_SERVER_TENANT_ID);
void destroy();
int push(void *task);
int64_t get_queue_num() const { return queue_.size(); }

View File

@ -189,6 +189,11 @@ void ob_set_thread_name(const char* type)
::oceanbase::lib::set_thread_name(type);
}
int64_t ob_update_loop_ts()
{
return ::oceanbase::lib::Thread::update_loop_ts();
}
} /* extern "C" */
#endif /* _OCEABASE_TENANT_PRELOAD_H_ */

View File

@ -34,6 +34,7 @@ thread_local int64_t Thread::loop_ts_ = 0;
thread_local pthread_t Thread::thread_joined_ = 0;
thread_local int64_t Thread::sleep_us_ = 0;
thread_local bool Thread::is_blocking_ = false;
thread_local char* Thread::rpc_dest_addr_ = nullptr;
thread_local Thread* Thread::current_thread_ = nullptr;
int64_t Thread::total_thread_count_ = 0;

View File

@ -60,10 +60,12 @@ public:
return update_loop_ts(common::ObTimeUtility::fast_current_time());
}
public:
// for thread diagnose, maybe replace it with union later.
static thread_local int64_t loop_ts_;
static thread_local pthread_t thread_joined_;
static thread_local int64_t sleep_us_;
static thread_local bool is_blocking_;
static thread_local char* rpc_dest_addr_;
private:
static void* __th_start(void *th);
void destroy_stack();

View File

@ -435,12 +435,15 @@ ObPacket *ObReqTransport::send_session(easy_session_t *s) const
// Synchronous rpc always needs to return packets
s->unneed_response = false;
s->r.client_start_time = common::ObTimeUtility::current_time();
lib::Thread::update_loop_ts(s->r.client_start_time);
if (0 == s->addr.cidx) {
s->addr.cidx = balance_assign(s);
}
easy_inet_addr_to_str(&s->addr, buff, OB_SERVER_ADDR_STR_LEN);
lib::Thread::rpc_dest_addr_ = buff;
pkt = reinterpret_cast<ObPacket*>(easy_client_send(eio_, s->addr, s));
lib::Thread::rpc_dest_addr_ = nullptr;
if (NULL == pkt) {
SERVER_LOG(WARN, "send packet fail", "dst", buff, KP(s));
} else {

View File

@ -104,6 +104,7 @@ int eloop_thread_run(eloop_t** udata) {
int eloop_run(eloop_t* ep) {
while(true) {
int64_t epoll_timeout = 1000;
ob_update_loop_ts();
if (ep->ready_link.next != &ep->ready_link) {
epoll_timeout = 0; // make sure all events handled when progarm is blocked in epoll_ctl
}

View File

@ -9,6 +9,7 @@ extern void do_log(int level, const char* file, int line, const char* func, cons
extern void ob_set_thread_name(const char* type);
extern int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
extern int64_t ob_update_loop_ts();
extern log_func_t g_log_func;
extern int g_log_level;
enum { LOG_LEVEL_ERROR = 0, LOG_LEVEL_USER_LEVEL = 1, LOG_LEVEL_WARN = 2, LOG_LEVEL_INFO = 3, LOG_LEVEL_TRACE = 4, LOG_LEVEL_DEBUG = 5 };
@ -26,6 +27,10 @@ int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
{
return pthread_create(thread, attr, start_routine, arg);
}
int64_t ob_update_loop_ts()
{
return 0;
}
#endif
#define do_rk_log_macro(...) { format_reset(&g_log_fbuf); rk_log_macro(__VA_ARGS__); }
#define rk_error(...) do_rk_log_macro(ERROR, oceanbase::common::OB_ERR_SYS, ##__VA_ARGS__)

View File

@ -1947,12 +1947,14 @@ int dump_thread_info(lua_State *L)
GET_OTHER_TSI_ADDR(uint32_t*, wait_addr, &ObLatch::current_wait);
GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_);
GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_);
char wait_event[32];
GET_OTHER_TSI_ADDR(char*, rpc_dest_addr, &Thread::rpc_dest_addr_);
constexpr int64_t BUF_LEN = 64;
char wait_event[BUF_LEN];
wait_event[0] = '\0';
if (0 != join_addr) {
IGNORE_RETURN snprintf(wait_event, 32, "thread %u", *(uint32_t*)(thread_base + tid_offset));
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "thread %u %ld", *(uint32_t*)(thread_base + tid_offset), tid_offset);
} else if (0 != sleep_us) {
IGNORE_RETURN snprintf(wait_event, 32, "%ld us", sleep_us);
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%ld us", sleep_us);
} else if (OB_NOT_NULL(wait_addr)) {
bool has_segv = false;
uint32_t val = 0;
@ -1961,10 +1963,15 @@ int dump_thread_info(lua_State *L)
}, has_segv);
if (has_segv) {
} else if (0 != (val & (1<<30))) {
IGNORE_RETURN snprintf(wait_event, 32, "wrlock on %u", val & 0x3fffffff);
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "wrlock on %u", val & 0x3fffffff);
} else {
IGNORE_RETURN snprintf(wait_event, 32, "%u rdlocks", val & 0x3fffffff);
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%u rdlocks", val & 0x3fffffff);
}
} else if (OB_NOT_NULL(rpc_dest_addr)) {
bool has_segv = false;
do_with_crash_restore([&] {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "rpc to %s", rpc_dest_addr);
}, has_segv);
}
gen.next_column(wait_event);
}

View File

@ -115,6 +115,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
break;
}
case WAIT_EVENT: {
GET_OTHER_TSI_ADDR(char*, rpc_dest_addr, &Thread::rpc_dest_addr_);
wait_event_[0] = '\0';
if (0 != join_addr) {
IGNORE_RETURN snprintf(wait_event_, 64, "thread %u", *(uint32_t*)(thread_base + tid_offset));
@ -132,6 +133,11 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
} else {
IGNORE_RETURN snprintf(wait_event_, 64, "%u rdlocks", val & 0x3fffffff);
}
} else if (OB_NOT_NULL(rpc_dest_addr)) {
bool has_segv = false;
do_with_crash_restore([&] {
IGNORE_RETURN snprintf(wait_event_, 64, "rpc to %s", rpc_dest_addr);
}, has_segv);
}
cells[i].set_varchar(wait_event_);
cells[i].set_collation_type(

View File

@ -81,6 +81,7 @@ private:
}
if (OB_FAIL(back_thread_->init_and_start([this]() {
while(true) {
IGNORE_RETURN lib::Thread::update_loop_ts();
if (!back_thread_->is_stopped()) {
for (int64_t idx = 0; idx < MAX_THREAD_NUM; ++idx) {
ClickPoint point = points_[idx].atomic_copy();