fix dynamic worker working in performance testing

This commit is contained in:
nroskill 2023-06-15 10:12:32 +00:00 committed by ob-robot
parent 20dd11d7a7
commit cf36ee8bbf
12 changed files with 146 additions and 84 deletions

View File

@ -10,18 +10,17 @@
#include <poll.h>
#include <sys/epoll.h>
#define SYS_HOOK(func_name, ...) \
({ \
int ret = 0; \
if (!in_sys_hook++) { \
oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT; \
ret = real_##func_name(__VA_ARGS__); \
oceanbase::lib::Thread::is_blocking_ = 0; \
} else { \
ret = real_##func_name(__VA_ARGS__); \
} \
in_sys_hook--; \
ret; \
// SYS_HOOK(func_name, args...) calls real_<func_name>(args...) and evaluates to its int result.
// in_sys_hook is post-incremented on entry and decremented before the result is yielded.
// Only when it was zero on entry (the outermost hook on this call path) is a
// Thread::WaitGuard for Thread::WAIT constructed around the real call; the guard goes out of
// scope at the end of that branch. Nested invocations call the real function with no guard.
#define SYS_HOOK(func_name, ...) \
({ \
int ret = 0; \
if (!in_sys_hook++) { \
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT); \
ret = real_##func_name(__VA_ARGS__); \
} else { \
ret = real_##func_name(__VA_ARGS__); \
} \
in_sys_hook--; \
ret; \
})
namespace oceanbase {
@ -98,9 +97,8 @@ int pthread_join(pthread_t _thread, void **__retval)
static int (*real_pthread_join)(pthread_t _thread, void **__retval) =
(typeof(real_pthread_join))dlsym(RTLD_NEXT, "pthread_join");
int ret = 0;
::oceanbase::lib::Thread::thread_joined_ = _thread;
::oceanbase::lib::Thread::JoinGuard guard(_thread);
ret = SYS_HOOK(pthread_join, _thread, __retval);
::oceanbase::lib::Thread::thread_joined_ = 0;
return ret;
}
@ -126,7 +124,7 @@ int ob_epoll_wait(int __epfd, struct epoll_event *__events,
int __epfd, struct epoll_event *__events,
int __maxevents, int __timeout) = epoll_wait;
int ret = 0;
oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT;
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_IO_EVENT);
ret = SYS_HOOK(epoll_wait, __epfd, __events, __maxevents, __timeout);
return ret;
}
@ -136,7 +134,7 @@ int ob_poll(struct pollfd *__fds, nfds_t __nfds, int __timeout)
static int (*real_poll)(
struct pollfd *__fds, nfds_t __nfds, int __timeout) = poll;
int ret = 0;
oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT;
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_IO_EVENT);
ret = SYS_HOOK(poll, __fds, __nfds, __timeout);
return ret;
}

View File

@ -33,8 +33,9 @@ using namespace oceanbase::lib;
thread_local int64_t Thread::loop_ts_ = 0;
thread_local pthread_t Thread::thread_joined_ = 0;
thread_local int64_t Thread::sleep_us_ = 0;
thread_local uint8_t Thread::is_blocking_ = 0;
thread_local int64_t Thread::blocking_ts_ = 0;
thread_local ObAddr Thread::rpc_dest_addr_;
thread_local uint8_t Thread::wait_event_ = 0;
thread_local Thread* Thread::current_thread_ = nullptr;
int64_t Thread::total_thread_count_ = 0;

View File

@ -63,16 +63,73 @@ public:
return update_loop_ts(common::ObTimeUtility::fast_current_time());
}
public:
// Scoped marker for "this thread started blocking now".
// The constructor saves the current value of blocking_ts_ and overwrites it with
// common::ObTimeUtility::fast_current_time(); the destructor writes the saved value back,
// so guards that are nested on one thread unwind to the enclosing guard's timestamp.
class BaseWaitGuard
{
public:
  OB_INLINE explicit BaseWaitGuard() : last_ts_(blocking_ts_)
  {
    blocking_ts_ = common::ObTimeUtility::fast_current_time();
  }
  ~BaseWaitGuard()
  {
    blocking_ts_ = last_ts_;
  }
  // Non-copyable: a copy would carry the same last_ts_ and its destructor would write that
  // stale value into blocking_ts_ a second time, out of order with the original guard.
  BaseWaitGuard(const BaseWaitGuard &) = delete;
  BaseWaitGuard &operator=(const BaseWaitGuard &) = delete;
private:
  // Value of blocking_ts_ observed when this guard was constructed.
  int64_t last_ts_;
};
// Scoped marker that sets the given wait-event bit(s) in wait_event_ while it is alive,
// in addition to the blocking timestamp handled by BaseWaitGuard.
//
// type: one or more of the WAIT* bit constants.
//
// Only the bits that were NOT already set when the guard was constructed are cleared on
// destruction. Clearing `type` unconditionally would wipe a bit owned by an enclosing guard
// of the same type while that outer guard is still waiting.
class WaitGuard : public BaseWaitGuard
{
public:
  OB_INLINE explicit WaitGuard(uint8_t type)
      : type_(static_cast<uint8_t>(type & ~wait_event_))  // bits this guard is about to add
  {
    wait_event_ |= type;
  }
  ~WaitGuard()
  {
    wait_event_ = static_cast<uint8_t>(wait_event_ & ~type_);
  }
private:
  // Subset of the requested bits that this guard itself turned on.
  uint8_t type_;
};
// Scoped marker: stores `thread` in thread_joined_ for the lifetime of the guard and
// resets thread_joined_ to 0 on destruction. The BaseWaitGuard base additionally
// stamps and restores blocking_ts_.
class JoinGuard : public BaseWaitGuard
{
public:
  OB_INLINE explicit JoinGuard(pthread_t thread)
      : BaseWaitGuard()
  {
    thread_joined_ = thread;
  }

  ~JoinGuard() { thread_joined_ = 0; }
};
// Scoped marker: re-constructs rpc_dest_addr_ in place from `addr` for the lifetime of the
// guard and calls rpc_dest_addr_.reset() on destruction. The BaseWaitGuard base additionally
// stamps and restores blocking_ts_.
class RpcGuard : public BaseWaitGuard
{
public:
  // Builds the destination from a libeasy address.
  OB_INLINE explicit RpcGuard(const easy_addr_t& addr)
      : BaseWaitGuard()
  {
    IGNORE_RETURN new (&rpc_dest_addr_) ObAddr(addr);
  }

  // Builds the destination from an existing ObAddr.
  OB_INLINE explicit RpcGuard(const ObAddr& addr)
      : BaseWaitGuard()
  {
    IGNORE_RETURN new (&rpc_dest_addr_) ObAddr(addr);
  }

  ~RpcGuard() { rpc_dest_addr_.reset(); }
};
static constexpr uint8_t WAIT = (1 << 0);
static constexpr uint8_t WAIT_IN_TENANT_QUEUE = (1 << 1);
static constexpr uint8_t WAIT_FOR_IO_EVENT = (1 << 2);
static constexpr uint8_t WAIT_FOR_TRANS_RETRY = (1 << 3);
// for thread diagnose, maybe replace it with union later.
static thread_local int64_t loop_ts_;
static thread_local pthread_t thread_joined_;
static thread_local int64_t sleep_us_;
static thread_local uint8_t is_blocking_;
static thread_local int64_t blocking_ts_;
static thread_local ObAddr rpc_dest_addr_;
static thread_local uint8_t wait_event_;
private:
static void* __th_start(void *th);
void destroy_stack();

View File

@ -440,9 +440,10 @@ ObPacket *ObReqTransport::send_session(easy_session_t *s) const
s->addr.cidx = balance_assign(s);
}
IGNORE_RETURN new (&lib::Thread::rpc_dest_addr_) ObAddr(s->addr);
pkt = reinterpret_cast<ObPacket*>(easy_client_send(eio_, s->addr, s));
lib::Thread::rpc_dest_addr_.reset();
{
lib::Thread::RpcGuard guard(s->addr);
pkt = reinterpret_cast<ObPacket*>(easy_client_send(eio_, s->addr, s));
}
if (NULL == pkt) {
char buff[OB_SERVER_ADDR_STR_LEN] = {'\0'};
easy_inet_addr_to_str(&s->addr, buff, OB_SERVER_ADDR_STR_LEN);

View File

@ -131,31 +131,32 @@ public:
if (OB_LS_FETCH_LOG2 == pcode) {
pnio_group_id = ObPocRpcServer::RATELIMIT_PNIO_GROUP;
}
IGNORE_RETURN new (&lib::Thread::rpc_dest_addr_) ObAddr(addr);
if (OB_FAIL(rpc_encode_req(proxy, pool, pcode, args, opts, req, req_sz, false))) {
RPC_LOG(WARN, "rpc encode req fail", K(ret));
} else if(OB_FAIL(check_blacklist(addr))) {
RPC_LOG(WARN, "check_blacklist failed", K(ret));
} else if (0 != (sys_err = pn_send(
(pnio_group_id<<32) + thread_id,
obaddr2sockaddr(&sock_addr, addr),
req,
req_sz,
static_cast<int16_t>(set.idx_of_pcode(pcode)),
start_ts + get_proxy_timeout(proxy),
ObSyncRespCallback::client_cb,
&cb))) {
ret = translate_io_error(sys_err);
RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode));
} else if (OB_FAIL(cb.wait())) {
RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode));
} else if (NULL == (resp = cb.get_resp(resp_sz))) {
ret = common::OB_ERR_UNEXPECTED;
RPC_LOG(WARN, "sync rpc execute success but resp is null", K(ret), K(addr), K(pcode));
} else if (OB_FAIL(rpc_decode_resp(resp, resp_sz, out, resp_pkt, rcode))) {
RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret));
{
lib::Thread::RpcGuard guard(addr);
if (OB_FAIL(rpc_encode_req(proxy, pool, pcode, args, opts, req, req_sz, false))) {
RPC_LOG(WARN, "rpc encode req fail", K(ret));
} else if(OB_FAIL(check_blacklist(addr))) {
RPC_LOG(WARN, "check_blacklist failed", K(ret));
} else if (0 != (sys_err = pn_send(
(pnio_group_id<<32) + thread_id,
obaddr2sockaddr(&sock_addr, addr),
req,
req_sz,
static_cast<int16_t>(set.idx_of_pcode(pcode)),
start_ts + get_proxy_timeout(proxy),
ObSyncRespCallback::client_cb,
&cb))) {
ret = translate_io_error(sys_err);
RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode));
} else if (OB_FAIL(cb.wait())) {
RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode));
} else if (NULL == (resp = cb.get_resp(resp_sz))) {
ret = common::OB_ERR_UNEXPECTED;
RPC_LOG(WARN, "sync rpc execute success but resp is null", K(ret), K(addr), K(pcode));
} else if (OB_FAIL(rpc_decode_resp(resp, resp_sz, out, resp_pkt, rcode))) {
RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret));
}
}
lib::Thread::rpc_dest_addr_.reset();
if (rcode.rcode_ != OB_DESERIALIZE_ERROR) {
int wb_ret = OB_SUCCESS;
if (common::OB_SUCCESS != (wb_ret = log_user_error_and_warn(rcode))) {

View File

@ -1937,7 +1937,7 @@ int dump_thread_info(lua_State *L)
gen.next_column(trace_id_buf);
}
// status
GET_OTHER_TSI_ADDR(is_blocking, &Thread::is_blocking_);
GET_OTHER_TSI_ADDR(blocking_ts, &Thread::blocking_ts_);
{
GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_);
GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_);
@ -1946,7 +1946,7 @@ int dump_thread_info(lua_State *L)
status_str = "Join";
} else if (0 != sleep_us) {
status_str = "Sleep";
} else if (0 != is_blocking) {
} else if (0 != blocking_ts) {
status_str = "Wait";
} else {
status_str = "Run";
@ -1959,6 +1959,7 @@ int dump_thread_info(lua_State *L)
GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_);
GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_);
GET_OTHER_TSI_ADDR(rpc_dest_addr, &Thread::rpc_dest_addr_);
GET_OTHER_TSI_ADDR(event, &Thread::wait_event_);
constexpr int64_t BUF_LEN = 64;
char wait_event[BUF_LEN];
ObAddr addr;
@ -1984,14 +1985,14 @@ int dump_thread_info(lua_State *L)
if ((ret = snprintf(wait_event, BUF_LEN, "rpc to ")) > 0) {
IGNORE_RETURN addr.to_string(wait_event + ret, BUF_LEN - ret);
}
} else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) {
} else if (0 != blocking_ts && (0 != (Thread::WAIT_IN_TENANT_QUEUE & event))) {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "tenant worker request");
} else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) {
} else if (0 != blocking_ts && (0 != (Thread::WAIT_FOR_IO_EVENT & event))) {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "IO events");
} else if (0 != (is_blocking & Thread::WAIT_FOR_TRANS_RETRY)) {
IGNORE_RETURN snprintf(wait_event, 64, "trans retry");
} else if (0 != sleep_us) {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%ld us", sleep_us);
} else if (0 != blocking_ts) {
IGNORE_RETURN snprintf(wait_event, BUF_LEN, "%ld us", common::ObTimeUtility::fast_current_time() - blocking_ts);
}
gen.next_column(wait_event);
}

View File

@ -51,9 +51,10 @@ void ObSignalHandle::run1()
//to check _stop every second
struct timespec timeout = {1, 0};
while (!has_set_stop()) {//need not to check ret
oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT;
signum = sigtimedwait(&waitset, NULL, &timeout);
oceanbase::lib::Thread::is_blocking_ = 0;
{
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT);
signum = sigtimedwait(&waitset, NULL, &timeout);
}
if (-1 == signum) {
//do not log error, because timeout will also return -1.
} else if (OB_FAIL(deal_signals(signum))) {

View File

@ -374,6 +374,7 @@ void ObResourceGroup::check_worker_count()
int ret = OB_SUCCESS;
if (OB_SUCC(workers_lock_.trylock())) {
int64_t token = 1;
int64_t now = ObTimeUtility::current_time();
bool enable_dynamic_worker = true;
{
ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_->id()));
@ -385,31 +386,30 @@ void ObResourceGroup::check_worker_count()
workers_.remove(wnode);
destroy_worker(w);
} else if (w->has_req_flag()
&& w->is_blocking()
&& w->blocking_ts() - now >= EXPAND_INTERVAL
&& enable_dynamic_worker) {
++token;
}
}
int64_t succ_num = 0L;
int64_t now = 0;
token = std::max(token, min_worker_cnt());
token = std::min(token, max_worker_cnt());
if (OB_UNLIKELY(workers_.get_size() < min_worker_cnt())) {
const auto diff = min_worker_cnt() - workers_.get_size();
token_change_ts_ = ObTimeUtility::current_time();
token_change_ts_ = now;
ATOMIC_STORE(&shrink_, false);
acquire_more_worker(diff, succ_num);
LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token));
} else if (OB_UNLIKELY(token > workers_.get_size())
&& OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) > ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05)) {
ATOMIC_STORE(&shrink_, false);
if (OB_LIKELY((now = ObTimeUtility::current_time()) - token_change_ts_ >= EXPAND_INTERVAL)) {
if (OB_LIKELY(now - token_change_ts_ >= EXPAND_INTERVAL)) {
token_change_ts_ = now;
acquire_more_worker(1, succ_num);
LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token));
}
} else if (OB_UNLIKELY(token < workers_.get_size())
&& OB_LIKELY((now = ObTimeUtility::current_time()) - token_change_ts_ >= SHRINK_INTERVAL)) {
&& OB_LIKELY(now - token_change_ts_ >= SHRINK_INTERVAL)) {
token_change_ts_ = now;
ATOMIC_STORE(&shrink_, true);
LOG_INFO("worker thread began to shrink", K(tenant_->id()), K(group_id_), K(token));
@ -1034,7 +1034,7 @@ int ObTenant::get_new_request(
ObLink* task = nullptr;
req = nullptr;
Thread::is_blocking_ |= Thread::WAIT_IN_TENANT_QUEUE;
Thread::WaitGuard guard(Thread::WAIT_IN_TENANT_QUEUE);
if (w.is_group_worker()) {
w.set_large_query(false);
w.set_curr_request_level(0);
@ -1367,6 +1367,7 @@ void ObTenant::check_worker_count()
int ret = OB_SUCCESS;
if (OB_SUCC(workers_lock_.trylock())) {
int64_t token = 3;
int64_t now = ObTimeUtility::current_time();
bool enable_dynamic_worker = true;
{
ObTenantConfigGuard tenant_config(TENANT_CONF(id_));
@ -1379,32 +1380,31 @@ void ObTenant::check_worker_count()
workers_.remove(wnode);
destroy_worker(w);
} else if (w->has_req_flag()
&& w->is_blocking()
&& w->blocking_ts() - now >= EXPAND_INTERVAL
&& w->is_default_worker()
&& enable_dynamic_worker) {
++token;
}
}
int64_t succ_num = 0L;
int64_t now = 0;
token = std::max(token, min_worker_cnt());
token = std::min(token, max_worker_cnt());
if (OB_UNLIKELY(workers_.get_size() < min_worker_cnt())) {
const auto diff = min_worker_cnt() - workers_.get_size();
token_change_ts_ = ObTimeUtility::current_time();
token_change_ts_ = now;
ATOMIC_STORE(&shrink_, false);
acquire_more_worker(diff, succ_num);
LOG_INFO("worker thread created", K(id_), K(token));
} else if (OB_UNLIKELY(token > workers_.get_size())
&& OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(id_) > ObMallocAllocator::get_instance()->get_tenant_limit(id_) * 0.05)) {
ATOMIC_STORE(&shrink_, false);
if (OB_LIKELY((now = ObTimeUtility::current_time()) - token_change_ts_ >= EXPAND_INTERVAL)) {
if (OB_LIKELY(now - token_change_ts_ >= EXPAND_INTERVAL)) {
token_change_ts_ = now;
acquire_more_worker(1, succ_num);
LOG_INFO("worker thread created", K(id_), K(token));
}
} else if (OB_UNLIKELY(token < workers_.get_size())
&& OB_LIKELY((now = ObTimeUtility::current_time()) - token_change_ts_ >= SHRINK_INTERVAL)) {
&& OB_LIKELY(now - token_change_ts_ >= SHRINK_INTERVAL)) {
token_change_ts_ = now;
ATOMIC_STORE(&shrink_, true);
LOG_INFO("worker thread began to shrink", K(id_), K(token));

View File

@ -107,7 +107,7 @@ ObThWorker::ObThWorker()
priority_limit_(RQ_LOW), is_lq_yield_(false),
query_start_time_(0), last_check_time_(0),
can_retry_(true), need_retry_(false),
has_add_to_cgroup_(false), last_wakeup_ts_(0), is_blocking_(nullptr), ru_cputime_(0),
has_add_to_cgroup_(false), last_wakeup_ts_(0), blocking_ts_(nullptr), ru_cputime_(0),
idle_us_(0)
{
}
@ -318,7 +318,7 @@ void ObThWorker::worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t
int64_t wait_start_time = 0;
int64_t wait_end_time = 0;
procor_.th_created();
is_blocking_ = &Thread::is_blocking_;
blocking_ts_ = &Thread::blocking_ts_;
ObTLTaGuard ta_guard(tenant_->id());
// Avoid adding and deleting entities from the root node for every request, the parameters are meaningless

View File

@ -101,7 +101,7 @@ public:
// Stores v in is_lq_yield_; v is true when the caller passes no argument.
OB_INLINE void set_lq_yield(bool v=true)
{
  is_lq_yield_ = v;
}
// Returns the current value of last_wakeup_ts_.
OB_INLINE int64_t get_last_wakeup_ts()
{
  return last_wakeup_ts_;
}
// Overwrites last_wakeup_ts_ with the supplied value.
OB_INLINE void set_last_wakeup_ts(int64_t last_wakeup_ts)
{
  last_wakeup_ts_ = last_wakeup_ts;
}
OB_INLINE bool is_blocking() const { return OB_NOT_NULL(is_blocking_) && (0 != *is_blocking_); }
// Returns the value blocking_ts_ points at, or 0 when blocking_ts_ is null.
OB_INLINE int64_t blocking_ts() const
{
  int64_t ts = 0;
  if (OB_NOT_NULL(blocking_ts_)) {
    ts = (*blocking_ts_);
  }
  return ts;
}
private:
void set_th_worker_thread_name();
@ -134,7 +134,7 @@ private:
bool has_add_to_cgroup_;
int64_t last_wakeup_ts_;
uint8_t* is_blocking_;
int64_t* blocking_ts_;
int64_t ru_cputime_;
int64_t idle_us_;
private:

View File

@ -74,7 +74,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
GET_OTHER_TSI_ADDR(wait_addr, &ObLatch::current_wait);
GET_OTHER_TSI_ADDR(join_addr, &Thread::thread_joined_);
GET_OTHER_TSI_ADDR(sleep_us, &Thread::sleep_us_);
GET_OTHER_TSI_ADDR(is_blocking, &Thread::is_blocking_);
GET_OTHER_TSI_ADDR(blocking_ts, &Thread::blocking_ts_);
for (int64_t i = 0; i < col_count && OB_SUCC(ret); ++i) {
const uint64_t col_id = output_column_ids_.at(i);
ObObj *cells = cur_row_.cells_;
@ -112,7 +112,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
status_str = "Join";
} else if (0 != sleep_us) {
status_str = "Sleep";
} else if (0 != is_blocking) {
} else if (0 != blocking_ts) {
status_str = "Wait";
} else {
status_str = "Run";
@ -124,6 +124,7 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
}
case WAIT_EVENT: {
GET_OTHER_TSI_ADDR(rpc_dest_addr, &Thread::rpc_dest_addr_);
GET_OTHER_TSI_ADDR(event, &Thread::wait_event_);
ObAddr addr;
struct iovec local_iov = {&addr, sizeof(ObAddr)};
struct iovec remote_iov = {thread_base + rpc_dest_addr_offset, sizeof(ObAddr)};
@ -147,14 +148,14 @@ int ObAllVirtualThread::inner_get_next_row(common::ObNewRow *&row)
if ((ret = snprintf(wait_event_, 64, "rpc to ")) > 0) {
IGNORE_RETURN addr.to_string(wait_event_ + ret, 64 - ret);
}
} else if (0 != (is_blocking & Thread::WAIT_IN_TENANT_QUEUE)) {
} else if (0 != blocking_ts && (0 != (Thread::WAIT_IN_TENANT_QUEUE & event))) {
IGNORE_RETURN snprintf(wait_event_, 64, "tenant worker requests");
} else if (0 != (is_blocking & Thread::WAIT_FOR_IO_EVENT)) {
} else if (0 != blocking_ts && (0 != (Thread::WAIT_FOR_IO_EVENT & event))) {
IGNORE_RETURN snprintf(wait_event_, 64, "IO events");
} else if (0 != (is_blocking & Thread::WAIT_FOR_TRANS_RETRY)) {
IGNORE_RETURN snprintf(wait_event_, 64, "trans retry");
} else if (0 != sleep_us) {
IGNORE_RETURN snprintf(wait_event_, 64, "%ld us", sleep_us);
} else if (0 != blocking_ts) {
IGNORE_RETURN snprintf(wait_event_, 64, "%ld us", common::ObTimeUtility::fast_current_time() - blocking_ts);
}
cells[i].set_varchar(wait_event_);
cells[i].set_collation_type(

View File

@ -1213,14 +1213,15 @@ int ObLocalDevice::io_getevents(
SHARE_LOG(WARN, "Invalid io context pointer, ", K(ret), KP(io_context));
} else {
int sys_ret = 0;
oceanbase::lib::Thread::is_blocking_ |= oceanbase::lib::Thread::WAIT_FOR_IO_EVENT;
while ((sys_ret = ::io_getevents(
local_io_context->io_context_,
min_nr,
local_io_events->max_event_cnt_,
local_io_events->io_events_,
timeout)) < 0 && -EINTR == sys_ret); // ignore EINTR
oceanbase::lib::Thread::is_blocking_ = 0;
{
oceanbase::lib::Thread::WaitGuard guard(oceanbase::lib::Thread::WAIT_FOR_IO_EVENT);
while ((sys_ret = ::io_getevents(
local_io_context->io_context_,
min_nr,
local_io_events->max_event_cnt_,
local_io_events->io_events_,
timeout)) < 0 && -EINTR == sys_ret); // ignore EINTR
}
if (sys_ret < 0) {
ret = OB_IO_ERROR;
SHARE_LOG(WARN, "Fail to get io events, ", K(ret), K(sys_ret), KERRMSG);