fix wrong status type in __all_virtual_thread
This commit is contained in:
parent
2054cbdabf
commit
b1bb0aefec
3
deps/easy/src/io/easy_baseth_pool.c
vendored
3
deps/easy/src/io/easy_baseth_pool.c
vendored
@ -29,6 +29,7 @@ int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
|
||||
void *(*start_routine) (void *), void *arg);
|
||||
void ob_set_thread_name(const char* type);
|
||||
int64_t ob_update_loop_ts();
|
||||
void ob_usleep(const useconds_t v);
|
||||
/**
|
||||
* start
|
||||
*/
|
||||
@ -253,7 +254,7 @@ static void *easy_baseth_pool_monitor_func(void *args)
|
||||
|
||||
while(tp->stoped == 0) {
|
||||
ob_update_loop_ts();
|
||||
usleep(us);
|
||||
ob_usleep(us);
|
||||
ev_tstamp now = ev_time();
|
||||
easy_thread_pool_for_each(th, tp, 0) {
|
||||
ev_tstamp last = th->lastrun;
|
||||
|
4
deps/easy/src/io/easy_maccept.c
vendored
4
deps/easy/src/io/easy_maccept.c
vendored
@ -41,6 +41,8 @@ easy_ma_t g_ma;
|
||||
|
||||
int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
|
||||
void *(*start_routine) (void *), void *arg);
|
||||
int ob_epoll_wait(int __epfd, struct epoll_event *__events,
|
||||
int __maxevents, int __timeout);
|
||||
void easy_ma_init(int port)
|
||||
{
|
||||
int i;
|
||||
@ -170,7 +172,7 @@ void* easy_ma_thread_func(easy_ma_t* ma)
|
||||
while (!ma->stop) {
|
||||
const int maxevents = 64;
|
||||
struct epoll_event events[maxevents];
|
||||
int cnt = epoll_wait(efd, events, maxevents, 1000);
|
||||
int cnt = ob_epoll_wait(efd, events, maxevents, 1000);
|
||||
for (i = 0; i < cnt; i++) {
|
||||
int cfd = events[i].data.fd;
|
||||
int emask = events[i].events;
|
||||
|
5
deps/easy/src/io/ev_epoll.c
vendored
5
deps/easy/src/io/ev_epoll.c
vendored
@ -64,6 +64,9 @@
|
||||
|
||||
#include <sys/epoll.h>
|
||||
|
||||
int ob_epoll_wait(int __epfd, struct epoll_event *__events,
|
||||
int __maxevents, int __timeout);
|
||||
|
||||
static void
|
||||
epoll_modify (EV_P_ int fd, int oev, int nev)
|
||||
{
|
||||
@ -133,7 +136,7 @@ epoll_poll (EV_P_ ev_tstamp timeout)
|
||||
/* epoll wait times cannot be larger than (LONG_MAX - 999UL) / HZ msecs, which is below */
|
||||
/* the default libev max wait time, however. */
|
||||
EV_RELEASE_CB;
|
||||
eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, (int)ceil (timeout * 1000.));
|
||||
eventcnt = ob_epoll_wait (backend_fd, epoll_events, epoll_eventmax, (int)ceil (timeout * 1000.));
|
||||
EV_ACQUIRE_CB;
|
||||
|
||||
if (expect_false (eventcnt < 0)) {
|
||||
|
5
deps/oblib/src/lib/async/ev_epoll.c
vendored
5
deps/oblib/src/lib/async/ev_epoll.c
vendored
@ -67,6 +67,9 @@
|
||||
|
||||
#define EV_EMASK_EPERM 0x80
|
||||
|
||||
int ob_epoll_wait(int __epfd, struct epoll_event *__events,
|
||||
int __maxevents, int __timeout);
|
||||
|
||||
static void
|
||||
epoll_modify (EV_P_ int fd, int oev, int nev)
|
||||
{
|
||||
@ -150,7 +153,7 @@ epoll_poll (EV_P_ ev_tstamp timeout)
|
||||
/* epoll wait times cannot be larger than (LONG_MAX - 999UL) / HZ msecs, which is below */
|
||||
/* the default libev max wait time, however. */
|
||||
EV_RELEASE_CB;
|
||||
eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, timeout * 1e3);
|
||||
eventcnt = ob_epoll_wait (backend_fd, epoll_events, epoll_eventmax, timeout * 1e3);
|
||||
EV_ACQUIRE_CB;
|
||||
|
||||
if (expect_false (eventcnt < 0))
|
||||
|
@ -20,6 +20,10 @@
|
||||
#include "lib/charset/ob_mysql_global.h"
|
||||
#include "lib/signal/ob_libunwind.h"
|
||||
|
||||
extern "C" {
|
||||
extern int ob_poll(struct pollfd *__fds, nfds_t __nfds, int __timeout);
|
||||
};
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace common
|
||||
@ -170,7 +174,7 @@ int wait_readable(int fd, int64_t timeout)
|
||||
bzero(&pfd, sizeof(pfd));
|
||||
pfd.fd = fd;
|
||||
pfd.events = POLLIN;
|
||||
int n = poll(&pfd, 1, timeout);
|
||||
int n = ob_poll(&pfd, 1, timeout);
|
||||
if (-1 == n) {
|
||||
ret = OB_ERR_SYS;
|
||||
DLOG(WARN, "poll failed, errno=%d", errno);
|
||||
|
16
deps/oblib/src/lib/stat/ob_diagnose_info.h
vendored
16
deps/oblib/src/lib/stat/ob_diagnose_info.h
vendored
@ -188,13 +188,19 @@ class ObSleepEventGuard : public ObWaitEventGuard
|
||||
{
|
||||
public:
|
||||
explicit ObSleepEventGuard(
|
||||
const int64_t event_no,
|
||||
const uint64_t timeout_ms = 0,
|
||||
const int64_t sleep_us = 0
|
||||
const int64_t event_no,
|
||||
const uint64_t timeout_ms,
|
||||
const int64_t sleep_us
|
||||
) : ObWaitEventGuard(event_no, timeout_ms, sleep_us, 0, 0, false)
|
||||
{
|
||||
lib::Thread::sleep_us_ = sleep_us;
|
||||
}
|
||||
explicit ObSleepEventGuard(
|
||||
const int64_t sleep_us = 0
|
||||
) : ObWaitEventGuard(ObWaitEventIds::DEFAULT_SLEEP, 0, sleep_us, 0, 0, false)
|
||||
{
|
||||
lib::Thread::sleep_us_ = sleep_us;
|
||||
}
|
||||
~ObSleepEventGuard()
|
||||
{
|
||||
lib::Thread::sleep_us_ = 0;
|
||||
@ -322,13 +328,13 @@ private:
|
||||
|
||||
#define SLEEP(time) \
|
||||
do { \
|
||||
oceanbase::common::ObSleepEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, ((int64_t)time) * 1000 * 1000); \
|
||||
oceanbase::common::ObSleepEventGuard wait_guard(((int64_t)time) * 1000 * 1000); \
|
||||
::sleep(time); \
|
||||
} while (0)
|
||||
|
||||
#define USLEEP(time) \
|
||||
do { \
|
||||
oceanbase::common::ObSleepEventGuard wait_guard(oceanbase::common::ObWaitEventIds::DEFAULT_SLEEP, 0, (int64_t)time); \
|
||||
oceanbase::common::ObSleepEventGuard wait_guard((int64_t)time); \
|
||||
::usleep(time); \
|
||||
} while (0)
|
||||
|
||||
|
55
deps/oblib/src/lib/thread/ob_tenant_hook.cpp
vendored
55
deps/oblib/src/lib/thread/ob_tenant_hook.cpp
vendored
@ -5,20 +5,23 @@
|
||||
#include "lib/thread/thread.h"
|
||||
#include "lib/thread/ob_thread_name.h"
|
||||
#include "lib/thread/protected_stack_allocator.h"
|
||||
#include "lib/stat/ob_diagnose_info.h"
|
||||
#include <dlfcn.h>
|
||||
#include <poll.h>
|
||||
#include <sys/epoll.h>
|
||||
|
||||
#define SYS_HOOK(func_name, ...) \
|
||||
({ \
|
||||
int ret = 0; \
|
||||
if (!in_sys_hook++) { \
|
||||
oceanbase::lib::Thread::is_blocking_ = true; \
|
||||
ret = real_##func_name(__VA_ARGS__); \
|
||||
oceanbase::lib::Thread::is_blocking_ = false; \
|
||||
} else { \
|
||||
ret = real_##func_name(__VA_ARGS__); \
|
||||
} \
|
||||
in_sys_hook--; \
|
||||
ret; \
|
||||
#define SYS_HOOK(func_name, ...) \
|
||||
({ \
|
||||
int ret = 0; \
|
||||
if (!in_sys_hook++) { \
|
||||
oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT; \
|
||||
ret = real_##func_name(__VA_ARGS__); \
|
||||
oceanbase::lib::Thread::is_blocking_ = 0; \
|
||||
} else { \
|
||||
ret = real_##func_name(__VA_ARGS__); \
|
||||
} \
|
||||
in_sys_hook--; \
|
||||
ret; \
|
||||
})
|
||||
|
||||
namespace oceanbase {
|
||||
@ -116,6 +119,28 @@ int pthread_rwlock_timedwrlock(pthread_rwlock_t *__restrict __rwlock,
|
||||
}
|
||||
#endif
|
||||
|
||||
int ob_epoll_wait(int __epfd, struct epoll_event *__events,
|
||||
int __maxevents, int __timeout)
|
||||
{
|
||||
static int (*real_epoll_wait)(
|
||||
int __epfd, struct epoll_event *__events,
|
||||
int __maxevents, int __timeout) = epoll_wait;
|
||||
int ret = 0;
|
||||
oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT;
|
||||
ret = SYS_HOOK(epoll_wait, __epfd, __events, __maxevents, __timeout);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ob_poll(struct pollfd *__fds, nfds_t __nfds, int __timeout)
|
||||
{
|
||||
static int (*real_poll)(
|
||||
struct pollfd *__fds, nfds_t __nfds, int __timeout) = poll;
|
||||
int ret = 0;
|
||||
oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT;
|
||||
ret = SYS_HOOK(poll, __fds, __nfds, __timeout);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ob_pthread_cond_wait(pthread_cond_t *__restrict __cond,
|
||||
pthread_mutex_t *__restrict __mutex)
|
||||
{
|
||||
@ -138,6 +163,12 @@ int ob_pthread_cond_timedwait(pthread_cond_t *__restrict __cond,
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ob_usleep(const useconds_t v)
|
||||
{
|
||||
oceanbase::common::ObSleepEventGuard wait_guard((int64_t)v);
|
||||
::usleep(v);
|
||||
}
|
||||
|
||||
int futex_hook(uint32_t *uaddr, int futex_op, uint32_t val, const struct timespec* timeout)
|
||||
{
|
||||
static long int (*real_syscall)(long int __sysno, ...) = syscall;
|
||||
|
2
deps/oblib/src/lib/thread/thread.cpp
vendored
2
deps/oblib/src/lib/thread/thread.cpp
vendored
@ -33,7 +33,7 @@ using namespace oceanbase::lib;
|
||||
thread_local int64_t Thread::loop_ts_ = 0;
|
||||
thread_local pthread_t Thread::thread_joined_ = 0;
|
||||
thread_local int64_t Thread::sleep_us_ = 0;
|
||||
thread_local bool Thread::is_blocking_ = false;
|
||||
thread_local uint8_t Thread::is_blocking_ = 0;
|
||||
thread_local char* Thread::rpc_dest_addr_ = nullptr;
|
||||
thread_local Thread* Thread::current_thread_ = nullptr;
|
||||
int64_t Thread::total_thread_count_ = 0;
|
||||
|
5
deps/oblib/src/lib/thread/thread.h
vendored
5
deps/oblib/src/lib/thread/thread.h
vendored
@ -60,11 +60,14 @@ public:
|
||||
return update_loop_ts(common::ObTimeUtility::fast_current_time());
|
||||
}
|
||||
public:
|
||||
static constexpr uint8_t WAIT = (1 << 0);
|
||||
static constexpr uint8_t WAIT_IN_TENANT_QUEUE = (1 << 1);
|
||||
static constexpr uint8_t WAIT_FOR_IO_EVENT = (1 << 2);
|
||||
// for thread diagnose, maybe replace it with union later.
|
||||
static thread_local int64_t loop_ts_;
|
||||
static thread_local pthread_t thread_joined_;
|
||||
static thread_local int64_t sleep_us_;
|
||||
static thread_local bool is_blocking_;
|
||||
static thread_local uint8_t is_blocking_;
|
||||
static thread_local char* rpc_dest_addr_;
|
||||
private:
|
||||
static void* __th_start(void *th);
|
||||
|
@ -435,7 +435,7 @@ ObPacket *ObReqTransport::send_session(easy_session_t *s) const
|
||||
// Synchronous rpc always needs to return packets
|
||||
s->unneed_response = false;
|
||||
s->r.client_start_time = common::ObTimeUtility::current_time();
|
||||
lib::Thread::update_loop_ts(s->r.client_start_time);
|
||||
lib::Thread::loop_ts_ = s->r.client_start_time; // avoid clear_clock
|
||||
if (0 == s->addr.cidx) {
|
||||
s->addr.cidx = balance_assign(s);
|
||||
}
|
||||
|
7
deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
vendored
7
deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
vendored
@ -37,6 +37,11 @@ using namespace oceanbase::common;
|
||||
#define SO_REUSEPORT 15
|
||||
#endif
|
||||
|
||||
extern "C" {
|
||||
extern int ob_epoll_wait(int __epfd, struct epoll_event *__events,
|
||||
int __maxevents, int __timeout);
|
||||
};
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace obmysql
|
||||
@ -753,7 +758,7 @@ private:
|
||||
void handle_epoll_event() {
|
||||
const int maxevents = 512;
|
||||
struct epoll_event events[maxevents];
|
||||
int cnt = epoll_wait(epfd_, events, maxevents, 1000);
|
||||
int cnt = ob_epoll_wait(epfd_, events, maxevents, 1000);
|
||||
for(int i = 0; i < cnt; i++) {
|
||||
ObSqlSock* s = (ObSqlSock*)events[i].data.ptr;
|
||||
if (OB_UNLIKELY(NULL == s)) {
|
||||
|
2
deps/oblib/src/rpc/obrpc/ob_listener.cpp
vendored
2
deps/oblib/src/rpc/obrpc/ob_listener.cpp
vendored
@ -313,7 +313,7 @@ void ObListener::do_work()
|
||||
struct epoll_event conn_ev;
|
||||
|
||||
while(!has_set_stop()) {
|
||||
int cnt = epoll_wait(epoll_fd, events, MAXEPOLLSIZE, 1000);
|
||||
int cnt = ob_epoll_wait(epoll_fd, events, MAXEPOLLSIZE, 1000);
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
int accept_fd = events[i].data.fd;
|
||||
uint64_t client_magic = 0;
|
||||
|
11
deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp
vendored
11
deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp
vendored
@ -34,6 +34,11 @@
|
||||
#include "rpc/frame/ob_req_transport.h"
|
||||
#include "io/easy_negotiation.h"
|
||||
|
||||
extern "C" {
|
||||
extern int ob_epoll_wait(int __epfd, struct epoll_event *__events,
|
||||
int __maxevents, int __timeout);
|
||||
};
|
||||
|
||||
using namespace oceanbase::common;
|
||||
using namespace oceanbase::common::serialization;
|
||||
using namespace oceanbase::lib;
|
||||
@ -442,7 +447,7 @@ void ObNetKeepAlive::do_server_loop()
|
||||
ob_abort();
|
||||
}
|
||||
while (!has_set_stop()) {
|
||||
int cnt = epoll_wait(epfd, events, sizeof events/sizeof events[0], 1000);
|
||||
int cnt = ob_epoll_wait(epfd, events, sizeof events/sizeof events[0], 1000);
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
struct server *s = (struct server *)events[i].data.ptr;
|
||||
int ev_fd = NULL == s? pipefd_ : s->fd_;
|
||||
@ -539,7 +544,7 @@ void ObNetKeepAlive::do_client_loop()
|
||||
int64_t now = get_usec();
|
||||
int64_t past = now - last_check_ts;
|
||||
if (past < KEEPALIVE_INTERVAL) {
|
||||
usleep(KEEPALIVE_INTERVAL - past);
|
||||
ob_usleep(KEEPALIVE_INTERVAL - past);
|
||||
}
|
||||
for (int i = 0; i < MAX_RS_COUNT; i++) {
|
||||
struct rpc_server *rs = ATOMIC_LOAD(&rss_[i]);
|
||||
@ -586,7 +591,7 @@ void ObNetKeepAlive::do_client_loop()
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
int cnt = epoll_wait(epfd, events, sizeof events/sizeof events[0], KEEPALIVE_INTERVAL/2/1000);
|
||||
int cnt = ob_epoll_wait(epfd, events, sizeof events/sizeof events[0], KEEPALIVE_INTERVAL/2/1000);
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
client *c = (client *)events[i].data.ptr;
|
||||
int ev_fd = c->fd_;
|
||||
|
2
deps/oblib/src/rpc/pnio/io/eloop.c
vendored
2
deps/oblib/src/rpc/pnio/io/eloop.c
vendored
@ -48,7 +48,7 @@ void eloop_fire(eloop_t* ep, sock_t* s) {
|
||||
static void eloop_refire(eloop_t* ep, int64_t timeout) {
|
||||
const int maxevents = 512;
|
||||
struct epoll_event events[maxevents];
|
||||
int cnt = epoll_wait(ep->fd, events, maxevents, timeout);
|
||||
int cnt = ob_epoll_wait(ep->fd, events, maxevents, timeout);
|
||||
for(int i = 0; i < cnt; i++) {
|
||||
sock_t* s = (sock_t*)events[i].data.ptr;
|
||||
s->mask |= events[i].events;
|
||||
|
5
deps/ussl-hook/loop/ussl_eloop.c
vendored
5
deps/ussl-hook/loop/ussl_eloop.c
vendored
@ -1,3 +1,5 @@
|
||||
int64_t ob_update_loop_ts();
|
||||
|
||||
struct epoll_event *ussl_make_epoll_event(struct epoll_event *event, uint32_t event_flag, void *val)
|
||||
{
|
||||
event->events = event_flag;
|
||||
@ -41,7 +43,7 @@ static void ussl_eloop_refire(ussl_eloop_t *ep)
|
||||
{
|
||||
const int maxevents = 512;
|
||||
struct epoll_event events[maxevents];
|
||||
int cnt = epoll_wait(ep->fd, events, maxevents, 1000);
|
||||
int cnt = ob_epoll_wait(ep->fd, events, maxevents, 1000);
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
ussl_sock_t *s = (ussl_sock_t *)events[i].data.ptr;
|
||||
s->mask |= events[i].events;
|
||||
@ -79,6 +81,7 @@ static void ussl_eloop_handle_sock_event(ussl_sock_t *s)
|
||||
int ussl_eloop_run(ussl_eloop_t *ep)
|
||||
{
|
||||
while (1) {
|
||||
ob_update_loop_ts();
|
||||
ussl_eloop_refire(ep);
|
||||
ussl_dlink_for(&ep->ready_link, p) { ussl_eloop_handle_sock_event(ussl_structof(p, ussl_sock_t, ready_link)); }
|
||||
check_and_handle_timeout_event();
|
||||
|
3
deps/ussl-hook/ussl-hook.h
vendored
3
deps/ussl-hook/ussl-hook.h
vendored
@ -62,4 +62,7 @@ typedef struct ssl_config_item_t
|
||||
const char *ssl_invited_nodes; // the list of observers to enable SSL
|
||||
} ssl_config_item_t;
|
||||
|
||||
extern int ob_epoll_wait(int __epfd, struct epoll_event *__events,
|
||||
int __maxevents, int __timeout);
|
||||
|
||||
#endif // USSL_HOOK_USSL_HOOK_
|
||||
|
8
deps/ussl-hook/ussl-loop.c
vendored
8
deps/ussl-hook/ussl-loop.c
vendored
@ -20,6 +20,10 @@ static ussl_sf_t acceptfd_fty;
|
||||
static ussl_sf_t clientfd_fty;
|
||||
static pthread_t ussl_bg_thread_id;
|
||||
|
||||
int ob_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
|
||||
void *(*start_routine) (void *), void *arg);
|
||||
void ob_set_thread_name(const char* type);
|
||||
|
||||
static int uloop_init(uloop_t *l)
|
||||
{
|
||||
int err = 0;
|
||||
@ -63,7 +67,7 @@ static int uloop_add_listen(uloop_t *l, int listen_fd, int backlog)
|
||||
|
||||
static void *bg_thread_func(void *arg)
|
||||
{
|
||||
prctl(PR_SET_NAME, "ussl_loop");
|
||||
ob_set_thread_name("ussl_loop");
|
||||
uloop_run(&global_ussl_loop_struct);
|
||||
return NULL;
|
||||
}
|
||||
@ -86,7 +90,7 @@ int init_bg_thread()
|
||||
if (0 == ret) {
|
||||
if (0 != uloop_init(&global_ussl_loop_struct)) {
|
||||
ussl_log_error("initialize uloop failed.")
|
||||
} else if (0 != pthread_create(&ussl_bg_thread_id, NULL, bg_thread_func, NULL)) {
|
||||
} else if (0 != ob_pthread_create(&ussl_bg_thread_id, NULL, bg_thread_func, NULL)) {
|
||||
ret = EIO;
|
||||
ussl_log_error("create background thread failed, errno:%d", errno);
|
||||
} else {
|
||||
|
@ -1926,16 +1926,16 @@ int dump_thread_info(lua_State *L)
|
||||
gen.next_column(trace_id_buf);
|
||||
}
|
||||
// status
|
||||
GET_OTHER_TSI_ADDR(uint8_t, is_blocking, &Thread::is_blocking_);
|
||||
{
|
||||
GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_);
|
||||
GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_);
|
||||
GET_OTHER_TSI_ADDR(bool, is_blocking, &Thread::is_blocking_);
|
||||
const char* status_str = nullptr;
|
||||
if (0 != join_addr) {
|
||||
status_str = "Join";
|
||||
} else if (0 != sleep_us) {
|
||||
status_str = "Sleep";
|
||||
} else if (is_blocking) {
|
||||
} else if (0 != is_blocking) {
|
||||
status_str = "Wait";
|
||||
} else {
|
||||
status_str = "Run";
|
||||
@ -1972,6 +1972,10 @@ int dump_thread_info(lua_State *L)
|
||||
do_with_crash_restore([&] {
|
||||
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "rpc to %s", rpc_dest_addr);
|
||||
}, has_segv);
|
||||
} else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) {
|
||||
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "tenant worker request");
|
||||
} else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) {
|
||||
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "IO events");
|
||||
}
|
||||
gen.next_column(wait_event);
|
||||
}
|
||||
|
@ -34,6 +34,8 @@ extern "C" {
|
||||
#include <lua.h>
|
||||
#include <lauxlib.h>
|
||||
#include <lualib.h>
|
||||
extern int ob_epoll_wait(int __epfd, struct epoll_event *__events,
|
||||
int __maxevents, int __timeout);
|
||||
}
|
||||
|
||||
using namespace oceanbase;
|
||||
@ -178,7 +180,7 @@ int ObUnixDomainListener::run()
|
||||
int conn_fd = -1;
|
||||
int ret = OB_SUCCESS;
|
||||
lib::Thread::update_loop_ts();
|
||||
int64_t event_cnt = epoll_wait(epoll_fd, events, EPOLL_EVENT_BUFFER_SIZE, TIMEOUT);
|
||||
int64_t event_cnt = ob_epoll_wait(epoll_fd, events, EPOLL_EVENT_BUFFER_SIZE, TIMEOUT);
|
||||
if (event_cnt < 0) {
|
||||
if (EINTR == errno) {
|
||||
// timeout, ignore
|
||||
|
@ -112,7 +112,7 @@ void ObArchiveTimer::run1()
|
||||
int64_t end_tstamp = ObTimeUtility::current_time();
|
||||
int64_t wait_interval = THREAD_RUN_INTERVAL - (end_tstamp - begin_tstamp);
|
||||
if (wait_interval > 0) {
|
||||
usleep(wait_interval);
|
||||
ob_usleep(wait_interval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -51,7 +51,10 @@ void ObSignalHandle::run1()
|
||||
//to check _stop every second
|
||||
struct timespec timeout = {1, 0};
|
||||
while (!has_set_stop()) {//need not to check ret
|
||||
if ( -1 == (signum = sigtimedwait(&waitset, NULL, &timeout))) {
|
||||
oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT;
|
||||
signum = sigtimedwait(&waitset, NULL, &timeout);
|
||||
oceanbase::lib::Thread::is_blocking_ = 0;
|
||||
if (-1 == signum) {
|
||||
//do not log error, because timeout will also return -1.
|
||||
} else if (OB_FAIL(deal_signals(signum))) {
|
||||
LOG_WARN("Deal signal error", K(ret), K(signum));
|
||||
|
@ -375,7 +375,7 @@ void ObResourceGroup::check_worker_count()
|
||||
workers_.remove(wnode);
|
||||
destroy_worker(w);
|
||||
} else if (w->has_req_flag()
|
||||
&& Thread::is_blocking_
|
||||
&& 0 != Thread::is_blocking_
|
||||
&& w->is_default_worker()) {
|
||||
++token;
|
||||
}
|
||||
@ -1018,6 +1018,7 @@ int ObTenant::get_new_request(
|
||||
ObLink* task = nullptr;
|
||||
|
||||
req = nullptr;
|
||||
Thread::is_blocking_ |= Thread::WAIT_IN_TENANT_QUEUE;
|
||||
if (w.is_group_worker()) {
|
||||
w.set_large_query(false);
|
||||
w.set_curr_request_level(0);
|
||||
@ -1361,7 +1362,7 @@ void ObTenant::check_worker_count()
|
||||
workers_.remove(wnode);
|
||||
destroy_worker(w);
|
||||
} else if (w->has_req_flag()
|
||||
&& Thread::is_blocking_
|
||||
&& 0 != Thread::is_blocking_
|
||||
&& w->is_default_worker()) {
|
||||
++token;
|
||||
}
|
||||
|
@ -57,15 +57,24 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
|
||||
for (auto* header = *guard; OB_NOT_NULL(header); header = guard.next()) {
|
||||
auto* thread_base = (char*)(header->pth_);
|
||||
if (OB_NOT_NULL(thread_base)) {
|
||||
GET_OTHER_TSI_ADDR(int64_t, tid, &get_tid_cache());
|
||||
{
|
||||
char path[64];
|
||||
IGNORE_RETURN snprintf(path, 64, "/proc/self/task/%ld", tid);
|
||||
if (-1 == access(path, F_OK)) {
|
||||
// thread not exist, may have exited.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
GET_OTHER_TSI_ADDR(uint64_t, tenant_id, &ob_get_tenant_id());
|
||||
if (!is_sys_tenant(effective_tenant_id_)
|
||||
&& tenant_id != effective_tenant_id_) {
|
||||
continue;
|
||||
}
|
||||
GET_OTHER_TSI_ADDR(int64_t, tid, &get_tid_cache());
|
||||
GET_OTHER_TSI_ADDR(uint32_t*, wait_addr, &ObLatch::current_wait);
|
||||
GET_OTHER_TSI_ADDR(pthread_t, join_addr, &Thread::thread_joined_);
|
||||
GET_OTHER_TSI_ADDR(int64_t, sleep_us, &Thread::sleep_us_);
|
||||
GET_OTHER_TSI_ADDR(uint8_t, is_blocking, &Thread::is_blocking_);
|
||||
for (int64_t i = 0; i < col_count && OB_SUCC(ret); ++i) {
|
||||
const uint64_t col_id = output_column_ids_.at(i);
|
||||
ObObj *cells = cur_row_.cells_;
|
||||
@ -81,7 +90,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
|
||||
break;
|
||||
}
|
||||
case TENANT_ID: {
|
||||
cells[i].set_int(tenant_id);
|
||||
cells[i].set_int(0 == tenant_id ? OB_SERVER_TENANT_ID : tenant_id);
|
||||
break;
|
||||
}
|
||||
case TID: {
|
||||
@ -98,13 +107,12 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
|
||||
break;
|
||||
}
|
||||
case STATUS: {
|
||||
GET_OTHER_TSI_ADDR(bool, is_blocking, &Thread::is_blocking_);
|
||||
const char* status_str = nullptr;
|
||||
if (0 != join_addr) {
|
||||
status_str = "Join";
|
||||
} else if (0 != sleep_us) {
|
||||
status_str = "Sleep";
|
||||
} else if (is_blocking) {
|
||||
} else if (0 != is_blocking) {
|
||||
status_str = "Wait";
|
||||
} else {
|
||||
status_str = "Run";
|
||||
@ -138,6 +146,10 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
|
||||
do_with_crash_restore([&] {
|
||||
IGNORE_RETURN snprintf(wait_event_, 64, "rpc to %s", rpc_dest_addr);
|
||||
}, has_segv);
|
||||
} else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) {
|
||||
IGNORE_RETURN snprintf(wait_event_, 64, "tenant worker requests");
|
||||
} else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) {
|
||||
IGNORE_RETURN snprintf(wait_event_, 64, "IO events");
|
||||
}
|
||||
cells[i].set_varchar(wait_event_);
|
||||
cells[i].set_collation_type(
|
||||
|
@ -1213,12 +1213,14 @@ int ObLocalDevice::io_getevents(
|
||||
SHARE_LOG(WARN, "Invalid io context pointer, ", K(ret), KP(io_context));
|
||||
} else {
|
||||
int sys_ret = 0;
|
||||
oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT;
|
||||
while ((sys_ret = ::io_getevents(
|
||||
local_io_context->io_context_,
|
||||
min_nr,
|
||||
local_io_events->max_event_cnt_,
|
||||
local_io_events->io_events_,
|
||||
timeout)) < 0 && -EINTR == sys_ret); // ignore EINTR
|
||||
oceanbase::lib::Thread::is_blocking_ = 0;
|
||||
if (sys_ret < 0) {
|
||||
ret = OB_IO_ERROR;
|
||||
SHARE_LOG(WARN, "Fail to get io events, ", K(ret), K(sys_ret), KERRMSG);
|
||||
|
@ -86,7 +86,7 @@ private:
|
||||
{
|
||||
lib::set_thread_name("LogLimiterRefresh");
|
||||
while (!has_set_stop()) {
|
||||
usleep(100000);
|
||||
ob_usleep(100000);
|
||||
limiter_.refresh();
|
||||
}
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ int ObExprSleep::sleep(int64_t usec)
|
||||
useconds_t usec_req = static_cast<useconds_t>(MIN(CHECK_INTERVAL_IN_US, usec_rem));
|
||||
ObWaitEventGuard wait_guard(ObWaitEventIds::DEFAULT_SLEEP, 0, usec);
|
||||
while(usec_req > 0) {
|
||||
(void)::usleep(usec_req);
|
||||
ob_usleep(usec_req);
|
||||
if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
break;
|
||||
} else {
|
||||
|
@ -100,7 +100,7 @@ void ObXAInnerTableGCWorker::run1()
|
||||
}
|
||||
}
|
||||
//sleep 20 secnd whether gc succ or not
|
||||
sleep(20);
|
||||
SLEEP(20);
|
||||
// try refresh gc_interval,
|
||||
// if gc falied, and not update last_scan_ts, update gc_interval can be effective
|
||||
// gc_interval ~ [20s, 24h]
|
||||
|
Loading…
x
Reference in New Issue
Block a user