fix cpu time incorrect without cgroup
This commit is contained in:
1
deps/oblib/src/lib/list/ob_dlink_node.h
vendored
1
deps/oblib/src/lib/list/ob_dlink_node.h
vendored
@ -139,6 +139,7 @@ template<typename T>
|
||||
struct ObDLinkNode: public ObDLinkBase<ObDLinkNode<T> >
|
||||
{
|
||||
ObDLinkNode():data_() {};
|
||||
ObDLinkNode(T data):data_(data) {};
|
||||
~ObDLinkNode() {};
|
||||
|
||||
T &get_data() {return data_;};
|
||||
|
191
deps/oblib/src/lib/thread/thread.cpp
vendored
191
deps/oblib/src/lib/thread/thread.cpp
vendored
@ -13,6 +13,7 @@
|
||||
#define USING_LOG_PREFIX LIB
|
||||
|
||||
#include "thread.h"
|
||||
#include "threads.h"
|
||||
#include <pthread.h>
|
||||
#include <sys/syscall.h>
|
||||
#include "lib/ob_errno.h"
|
||||
@ -45,18 +46,10 @@ Thread &Thread::current()
|
||||
return *current_thread_;
|
||||
}
|
||||
|
||||
Thread::Thread()
|
||||
: Thread(nullptr)
|
||||
{}
|
||||
|
||||
Thread::Thread(int64_t stack_size)
|
||||
: Thread(nullptr, stack_size)
|
||||
{}
|
||||
|
||||
Thread::Thread(Runnable runnable, int64_t stack_size)
|
||||
Thread::Thread(Threads *threads, int64_t idx, int64_t stack_size)
|
||||
: pth_(0),
|
||||
runnable_(runnable),
|
||||
tenant_id_(OB_SERVER_TENANT_ID),
|
||||
threads_(threads),
|
||||
idx_(idx),
|
||||
#ifndef OB_USE_ASAN
|
||||
stack_addr_(nullptr),
|
||||
#endif
|
||||
@ -64,7 +57,10 @@ Thread::Thread(Runnable runnable, int64_t stack_size)
|
||||
stop_(true),
|
||||
join_concurrency_(0),
|
||||
pid_before_stop_(0),
|
||||
tid_before_stop_(0)
|
||||
tid_before_stop_(0),
|
||||
tid_(0),
|
||||
thread_list_node_(this),
|
||||
cpu_time_(0)
|
||||
{}
|
||||
|
||||
Thread::~Thread()
|
||||
@ -75,48 +71,44 @@ Thread::~Thread()
|
||||
int Thread::start()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(runnable_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
const int64_t count = ATOMIC_FAA(&total_thread_count_, 1);
|
||||
if (count >= get_max_thread_num() - OB_RESERVED_THREAD_NUM) {
|
||||
ATOMIC_FAA(&total_thread_count_, -1);
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
LOG_ERROR("thread count reach limit", K(ret), "current count", count);
|
||||
} else if (stack_size_ <= 0) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("invalid stack_size", K(ret), K(stack_size_));
|
||||
#ifndef OB_USE_ASAN
|
||||
} else if (OB_ISNULL(stack_addr_ = g_stack_allocer.alloc(0 == GET_TENANT_ID() ? OB_SERVER_TENANT_ID : GET_TENANT_ID(), stack_size_ + SIG_STACK_SIZE))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc stack memory failed", K(stack_size_));
|
||||
#endif
|
||||
} else {
|
||||
const int64_t count = ATOMIC_FAA(&total_thread_count_, 1);
|
||||
if (count >= get_max_thread_num() - OB_RESERVED_THREAD_NUM) {
|
||||
pthread_attr_t attr;
|
||||
bool need_destroy = false;
|
||||
int pret = pthread_attr_init(&attr);
|
||||
if (pret == 0) {
|
||||
need_destroy = true;
|
||||
#ifndef OB_USE_ASAN
|
||||
pret = pthread_attr_setstack(&attr, stack_addr_, stack_size_);
|
||||
#endif
|
||||
}
|
||||
if (pret == 0) {
|
||||
stop_ = false;
|
||||
pret = pthread_create(&pth_, &attr, __th_start, this);
|
||||
if (pret != 0) {
|
||||
LOG_ERROR("pthread create failed", K(pret), K(errno));
|
||||
pth_ = 0;
|
||||
}
|
||||
}
|
||||
if (0 != pret) {
|
||||
ATOMIC_FAA(&total_thread_count_, -1);
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
LOG_ERROR("thread count reach limit", K(ret), "current count", count);
|
||||
} else if (stack_size_ <= 0) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("invalid stack_size", K(ret), K(stack_size_));
|
||||
#ifndef OB_USE_ASAN
|
||||
} else if (OB_ISNULL(stack_addr_ = g_stack_allocer.alloc(0 == GET_TENANT_ID() ? OB_SERVER_TENANT_ID : GET_TENANT_ID(), stack_size_ + SIG_STACK_SIZE))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("alloc stack memory failed", K(stack_size_));
|
||||
#endif
|
||||
} else {
|
||||
pthread_attr_t attr;
|
||||
bool need_destroy = false;
|
||||
int pret = pthread_attr_init(&attr);
|
||||
if (pret == 0) {
|
||||
need_destroy = true;
|
||||
#ifndef OB_USE_ASAN
|
||||
pret = pthread_attr_setstack(&attr, stack_addr_, stack_size_);
|
||||
#endif
|
||||
}
|
||||
if (pret == 0) {
|
||||
stop_ = false;
|
||||
pret = pthread_create(&pth_, &attr, __th_start, this);
|
||||
if (pret != 0) {
|
||||
LOG_ERROR("pthread create failed", K(pret), K(errno));
|
||||
pth_ = 0;
|
||||
}
|
||||
}
|
||||
if (0 != pret) {
|
||||
ATOMIC_FAA(&total_thread_count_, -1);
|
||||
ret = OB_ERR_SYS;
|
||||
stop_ = true;
|
||||
}
|
||||
if (need_destroy) {
|
||||
pthread_attr_destroy(&attr);
|
||||
}
|
||||
ret = OB_ERR_SYS;
|
||||
stop_ = true;
|
||||
}
|
||||
if (need_destroy) {
|
||||
pthread_attr_destroy(&attr);
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -125,12 +117,6 @@ int Thread::start()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int Thread::start(Runnable runnable)
|
||||
{
|
||||
runnable_ = runnable;
|
||||
return start();
|
||||
}
|
||||
|
||||
void Thread::stop()
|
||||
{
|
||||
#ifdef ERRSIM
|
||||
@ -157,6 +143,28 @@ void Thread::stop()
|
||||
stop_ = true;
|
||||
}
|
||||
|
||||
uint64_t Thread::get_tenant_id() const
|
||||
{
|
||||
uint64_t tenant_id = OB_SERVER_TENANT_ID;
|
||||
IRunWrapper *run_wrapper_ = threads_->get_run_wrapper();
|
||||
if (OB_NOT_NULL(run_wrapper_)) {
|
||||
tenant_id = run_wrapper_->id();
|
||||
}
|
||||
return tenant_id;
|
||||
}
|
||||
|
||||
void Thread::run()
|
||||
{
|
||||
IRunWrapper *run_wrapper_ = threads_->get_run_wrapper();
|
||||
if (OB_NOT_NULL(run_wrapper_)) {
|
||||
run_wrapper_->pre_run(this);
|
||||
threads_->run(idx_);
|
||||
run_wrapper_->end_run(this);
|
||||
} else {
|
||||
threads_->run(idx_);
|
||||
}
|
||||
}
|
||||
|
||||
void Thread::dump_pth() // for debug pthread join faileds
|
||||
{
|
||||
#ifndef OB_USE_ASAN
|
||||
@ -210,7 +218,6 @@ void Thread::wait()
|
||||
#endif
|
||||
}
|
||||
destroy_stack();
|
||||
runnable_ = nullptr;
|
||||
if (1 <= ATOMIC_AAF(&join_concurrency_, -1)) {
|
||||
ob_abort();
|
||||
}
|
||||
@ -246,6 +253,7 @@ void* Thread::__th_start(void *arg)
|
||||
Thread * const th = reinterpret_cast<Thread*>(arg);
|
||||
ob_set_thread_tenant_id(th->get_tenant_id());
|
||||
current_thread_ = th;
|
||||
th->tid_ = gettid();
|
||||
#ifndef OB_USE_ASAN
|
||||
ObStackHeader *stack_header = ProtectedStackAllocator::stack_header(th->stack_addr_);
|
||||
abort_unless(stack_header->check_magic());
|
||||
@ -301,7 +309,7 @@ void* Thread::__th_start(void *arg)
|
||||
WITH_CONTEXT(*mem_context) {
|
||||
try {
|
||||
in_try_stmt = true;
|
||||
th->runnable_();
|
||||
th->run();
|
||||
in_try_stmt = false;
|
||||
} catch (OB_BASE_EXCEPTION &except) {
|
||||
// we don't catch other exception because we don't know how to handle it
|
||||
@ -321,6 +329,69 @@ void* Thread::__th_start(void *arg)
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
int Thread::get_cpu_time_inc(int64_t &cpu_time_inc)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const pid_t pid = getpid();
|
||||
const int64_t tid = tid_;
|
||||
int64_t cpu_time = 0;
|
||||
cpu_time_inc = 0;
|
||||
|
||||
int fd = -1;
|
||||
int64_t read_size = -1;
|
||||
int32_t PATH_BUFSIZE = 512;
|
||||
int32_t MAX_LINE_LENGTH = 1024;
|
||||
int32_t VALUE_BUFSIZE = 32;
|
||||
char stat_path[PATH_BUFSIZE];
|
||||
char stat_content[MAX_LINE_LENGTH];
|
||||
|
||||
if (tid == 0) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else {
|
||||
snprintf(stat_path, PATH_BUFSIZE, "/proc/%d/task/%ld/stat", pid, tid);
|
||||
if ((fd = ::open(stat_path, O_RDONLY)) < 0) {
|
||||
ret = OB_IO_ERROR;
|
||||
LOG_WARN("open file error", K((const char *)stat_path), K(errno), KERRMSG, K(ret));
|
||||
} else if ((read_size = read(fd, stat_content, MAX_LINE_LENGTH)) < 0) {
|
||||
ret = OB_IO_ERROR;
|
||||
LOG_WARN("read file error",
|
||||
K((const char *)stat_path),
|
||||
K((const char *)stat_content),
|
||||
K(ret),
|
||||
K(errno),
|
||||
KERRMSG,
|
||||
K(ret));
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
if (fd >= 0) {
|
||||
close(fd);
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
const int USER_TIME_FIELD_INDEX = 13;
|
||||
const int SYSTEM_TIME_FIELD_INDEX = 14;
|
||||
int field_index = 0;
|
||||
char *save_ptr = nullptr;
|
||||
char *field_ptr = strtok_r(stat_content, " ", &save_ptr);
|
||||
while (field_ptr != NULL) {
|
||||
if (field_index == USER_TIME_FIELD_INDEX) {
|
||||
cpu_time += std::stoul(field_ptr) * 1000000 / sysconf(_SC_CLK_TCK);
|
||||
}
|
||||
if (field_index == SYSTEM_TIME_FIELD_INDEX) {
|
||||
cpu_time += std::stoul(field_ptr) * 1000000 / sysconf(_SC_CLK_TCK);
|
||||
break;
|
||||
}
|
||||
field_ptr = strtok_r(NULL, " ", &save_ptr);
|
||||
field_index++;
|
||||
}
|
||||
cpu_time_inc = cpu_time - cpu_time_;
|
||||
cpu_time_ = cpu_time;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace lib
|
||||
|
40
deps/oblib/src/lib/thread/thread.h
vendored
40
deps/oblib/src/lib/thread/thread.h
vendored
@ -22,20 +22,36 @@
|
||||
namespace oceanbase {
|
||||
namespace lib {
|
||||
|
||||
class Thread;
|
||||
class Threads;
|
||||
class IRunWrapper
|
||||
{
|
||||
public:
|
||||
virtual ~IRunWrapper() {}
|
||||
virtual int pre_run(Thread*)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
return ret;
|
||||
}
|
||||
virtual int end_run(Thread*)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
return ret;
|
||||
}
|
||||
virtual uint64_t id() const = 0;
|
||||
};
|
||||
|
||||
/// \class
|
||||
/// A wrapper of Linux thread that supports normal thread operations.
|
||||
class Thread {
|
||||
public:
|
||||
using Runnable = std::function<void()>;
|
||||
static constexpr int PATH_SIZE = 128;
|
||||
Thread();
|
||||
Thread(int64_t stack_size);
|
||||
Thread(Runnable runnable, int64_t stack_size=0);
|
||||
Thread(Threads *threads, int64_t idx, int64_t stack_size);
|
||||
~Thread();
|
||||
|
||||
int start();
|
||||
int start(Runnable runnable);
|
||||
void stop();
|
||||
void run();
|
||||
void wait();
|
||||
void destroy();
|
||||
void dump_pth();
|
||||
@ -47,8 +63,11 @@ public:
|
||||
static Thread ¤t();
|
||||
|
||||
bool has_set_stop() const;
|
||||
uint64_t get_tenant_id() const { return tenant_id_; }
|
||||
void set_tenant_id(uint64_t tenant_id) { tenant_id_ = tenant_id; }
|
||||
uint64_t get_tenant_id() const;
|
||||
using ThreadListNode = common::ObDLinkNode<lib::Thread *>;
|
||||
ThreadListNode *get_thread_list_node() { return &thread_list_node_; }
|
||||
int get_cpu_time_inc(int64_t &cpu_time_inc);
|
||||
int64_t get_tid() { return tid_; }
|
||||
|
||||
OB_INLINE static int64_t update_loop_ts(int64_t t)
|
||||
{
|
||||
@ -139,8 +158,8 @@ private:
|
||||
static int64_t total_thread_count_;
|
||||
private:
|
||||
pthread_t pth_;
|
||||
Runnable runnable_;
|
||||
uint64_t tenant_id_;
|
||||
Threads *threads_;
|
||||
int64_t idx_;
|
||||
#ifndef OB_USE_ASAN
|
||||
void *stack_addr_;
|
||||
#endif
|
||||
@ -149,6 +168,9 @@ private:
|
||||
int64_t join_concurrency_;
|
||||
pid_t pid_before_stop_;
|
||||
pid_t tid_before_stop_;
|
||||
int64_t tid_;
|
||||
ThreadListNode thread_list_node_;
|
||||
int64_t cpu_time_;
|
||||
};
|
||||
|
||||
OB_INLINE bool Thread::has_set_stop() const
|
||||
|
19
deps/oblib/src/lib/thread/threads.cpp
vendored
19
deps/oblib/src/lib/thread/threads.cpp
vendored
@ -67,11 +67,7 @@ int Threads::do_set_thread_count(int64_t n_threads)
|
||||
MEMCPY(new_threads, threads_, sizeof (Thread*) * n_threads_);
|
||||
for (auto i = n_threads_; i < n_threads; i++) {
|
||||
Thread *thread = nullptr;
|
||||
if (this->run_wrapper_ == nullptr) {
|
||||
ret = create_thread(thread, [this, i]() { this->run(i); });
|
||||
} else {
|
||||
ret = create_thread(thread, [this, i]() { this->run_wrapper_->pre_run(this); this->run(i); this->run_wrapper_->end_run(this); });
|
||||
}
|
||||
ret = create_thread(thread, i);
|
||||
if (OB_FAIL(ret)) {
|
||||
n_threads = i;
|
||||
break;
|
||||
@ -182,11 +178,7 @@ int Threads::start()
|
||||
MEMSET(threads_, 0, sizeof (Thread*) * n_threads_);
|
||||
for (int i = 0; i < n_threads_; i++) {
|
||||
Thread *thread = nullptr;
|
||||
if (this->run_wrapper_ == nullptr) {
|
||||
ret = create_thread(thread, [this, i]() { this->run(i); });
|
||||
} else {
|
||||
ret = create_thread(thread, [this, i]() { this->run_wrapper_->pre_run(this); this->run(i); this->run_wrapper_->end_run(this); });
|
||||
}
|
||||
ret = create_thread(thread, i);
|
||||
if (OB_FAIL(ret)) {
|
||||
break;
|
||||
} else {
|
||||
@ -213,7 +205,7 @@ void Threads::run(int64_t idx)
|
||||
run1();
|
||||
}
|
||||
|
||||
int Threads::create_thread(Thread *&thread, std::function<void()> entry)
|
||||
int Threads::create_thread(Thread *&thread, int64_t idx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
thread = nullptr;
|
||||
@ -221,10 +213,7 @@ int Threads::create_thread(Thread *&thread, std::function<void()> entry)
|
||||
if (buf == nullptr) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
} else {
|
||||
thread = new (buf) Thread(entry, stack_size_);
|
||||
if (nullptr != run_wrapper_) {
|
||||
thread->set_tenant_id(run_wrapper_->id());
|
||||
}
|
||||
thread = new (buf) Thread(this, idx, stack_size_);
|
||||
if (OB_FAIL(thread->start())) {
|
||||
thread->~Thread();
|
||||
ob_free(thread);
|
||||
|
30
deps/oblib/src/lib/thread/threads.h
vendored
30
deps/oblib/src/lib/thread/threads.h
vendored
@ -21,28 +21,10 @@
|
||||
#include "lib/lock/ob_spin_rwlock.h"
|
||||
|
||||
extern int64_t global_thread_stack_size;
|
||||
|
||||
namespace oceanbase {
|
||||
namespace lib {
|
||||
|
||||
class Threads;
|
||||
class IRunWrapper
|
||||
{
|
||||
public:
|
||||
virtual ~IRunWrapper() {}
|
||||
virtual int pre_run(Threads*)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
return ret;
|
||||
}
|
||||
virtual int end_run(Threads*)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
return ret;
|
||||
}
|
||||
virtual uint64_t id() const = 0;
|
||||
};
|
||||
|
||||
class IRunWrapper;
|
||||
class Threads
|
||||
{
|
||||
public:
|
||||
@ -83,10 +65,15 @@ public:
|
||||
{
|
||||
run_wrapper_ = run_wrapper;
|
||||
}
|
||||
IRunWrapper * get_run_wrapper()
|
||||
{
|
||||
return run_wrapper_;
|
||||
}
|
||||
virtual int start();
|
||||
virtual void stop();
|
||||
virtual void wait();
|
||||
void destroy();
|
||||
virtual void run(int64_t idx);
|
||||
|
||||
public:
|
||||
template <class Functor>
|
||||
@ -112,12 +99,11 @@ protected:
|
||||
void set_thread_idx(int64_t idx) { thread_idx_ = idx; }
|
||||
|
||||
private:
|
||||
virtual void run(int64_t idx);
|
||||
virtual void run1() {}
|
||||
|
||||
int do_thread_recycle();
|
||||
/// \brief Create thread with start entry \c entry.
|
||||
int create_thread(Thread *&thread, std::function<void()> entry);
|
||||
/// \brief Create thread
|
||||
int create_thread(Thread *&thread, int64_t idx);
|
||||
|
||||
/// \brief Destroy thread.
|
||||
void destroy_thread(Thread *thread);
|
||||
|
@ -2115,7 +2115,7 @@ int ObMultiTenant::get_tenant_cpu_time(const uint64_t tenant_id, int64_t &cpu_ti
|
||||
int ret = OB_SUCCESS;
|
||||
ObTenant *tenant = nullptr;
|
||||
cpu_time = 0;
|
||||
if (GCONF.enable_cgroup) {
|
||||
if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid()) {
|
||||
ret = GCTX.cgroup_ctrl_->get_cpu_time(tenant_id, cpu_time);
|
||||
} else {
|
||||
if (!lock_.try_rdlock()) {
|
||||
@ -2123,7 +2123,7 @@ int ObMultiTenant::get_tenant_cpu_time(const uint64_t tenant_id, int64_t &cpu_ti
|
||||
} else {
|
||||
if (OB_FAIL(get_tenant_unsafe(tenant_id, tenant))) {
|
||||
} else {
|
||||
cpu_time = tenant->get_ru_cputime();
|
||||
cpu_time = tenant->get_cpu_time();
|
||||
}
|
||||
lock_.unlock();
|
||||
}
|
||||
|
@ -611,7 +611,8 @@ ObTenant::ObTenant(const int64_t id,
|
||||
ctx_(nullptr),
|
||||
st_metrics_(),
|
||||
sql_limiter_(),
|
||||
worker_us_(0)
|
||||
worker_us_(0),
|
||||
cpu_time_us_(0)
|
||||
{
|
||||
token_usage_check_ts_ = ObTimeUtility::current_time();
|
||||
lock_.set_diagnose(true);
|
||||
@ -1554,7 +1555,7 @@ void ObTenant::update_token_usage()
|
||||
int ret = OB_SUCCESS;
|
||||
const auto now = ObTimeUtility::current_time();
|
||||
const auto duration = static_cast<double>(now - token_usage_check_ts_);
|
||||
if (duration > 1000 * 1000 && OB_SUCC(workers_lock_.trylock())) { // every second
|
||||
if (duration >= 1000 * 1000 && OB_SUCC(workers_lock_.trylock())) { // every second
|
||||
ObResourceGroupNode* iter = NULL;
|
||||
ObResourceGroup* group = nullptr;
|
||||
int64_t idle_us = 0;
|
||||
@ -1579,6 +1580,22 @@ void ObTenant::update_token_usage()
|
||||
token_usage_ = std::max(.0, 1.0 * (total_us - idle_us) / total_us);
|
||||
IGNORE_RETURN ATOMIC_FAA(&worker_us_, total_us - idle_us);
|
||||
}
|
||||
|
||||
if (OB_NOT_NULL(GCTX.cgroup_ctrl_) && GCTX.cgroup_ctrl_->is_valid()) {
|
||||
//do nothing
|
||||
} else if (duration >= 1000 * 1000 && OB_SUCC(thread_list_lock_.trylock())) { // every second
|
||||
int64_t cpu_time_inc = 0;
|
||||
DLIST_FOREACH_REMOVESAFE(thread_list_node_, thread_list_)
|
||||
{
|
||||
Thread *thread = thread_list_node_->get_data();
|
||||
int64_t inc = 0;
|
||||
if (OB_SUCC(thread->get_cpu_time_inc(inc))) {
|
||||
cpu_time_inc += inc;
|
||||
}
|
||||
}
|
||||
thread_list_lock_.unlock();
|
||||
IGNORE_RETURN ATOMIC_FAA(&cpu_time_us_, cpu_time_inc);
|
||||
}
|
||||
}
|
||||
|
||||
void ObTenant::periodically_check()
|
||||
|
@ -412,8 +412,6 @@ public:
|
||||
int64_t max_worker_cnt() const;
|
||||
lib::Worker::CompatMode get_compat_mode() const;
|
||||
OB_INLINE share::ObTenantSpace &ctx() { return *ctx_; }
|
||||
|
||||
OB_INLINE void add_ru_cputime(int64_t ru_utime) { IGNORE_RETURN ATOMIC_FAA(reinterpret_cast<uint64_t *>(&ru_cputime_us_), ru_utime); }
|
||||
int rdlock(common::ObLDHandle &handle);
|
||||
int wrlock(common::ObLDHandle &handle);
|
||||
int try_rdlock(common::ObLDHandle &handle);
|
||||
@ -471,7 +469,7 @@ public:
|
||||
OB_INLINE bool user_sched_enabled() const { return !disable_user_sched_; }
|
||||
OB_INLINE double get_token_usage() const { return token_usage_; }
|
||||
OB_INLINE int64_t get_worker_time() const { return ATOMIC_LOAD(&worker_us_); }
|
||||
OB_INLINE int64_t get_ru_cputime() const { return ATOMIC_LOAD(&ru_cputime_us_); }
|
||||
OB_INLINE int64_t get_cpu_time() const { return ATOMIC_LOAD(&cpu_time_us_); }
|
||||
int64_t get_rusage_time();
|
||||
// sql throttle
|
||||
void update_sql_throttle_metrics(const ObSqlThrottleMetrics &metrics)
|
||||
@ -600,7 +598,7 @@ public:
|
||||
lib::ObQueryRateLimiter sql_limiter_;
|
||||
// idle time between two checkpoints
|
||||
int64_t worker_us_;
|
||||
int64_t ru_cputime_us_ CACHE_ALIGNED;
|
||||
int64_t cpu_time_us_ CACHE_ALIGNED;
|
||||
}; // end of class ObTenant
|
||||
|
||||
OB_INLINE int64_t ObResourceGroup::min_worker_cnt() const
|
||||
|
@ -107,7 +107,7 @@ ObThWorker::ObThWorker()
|
||||
priority_limit_(RQ_LOW), is_lq_yield_(false),
|
||||
query_start_time_(0), last_check_time_(0),
|
||||
can_retry_(true), need_retry_(false),
|
||||
has_add_to_cgroup_(false), last_wakeup_ts_(0), blocking_ts_(nullptr), ru_cputime_(0),
|
||||
has_add_to_cgroup_(false), last_wakeup_ts_(0), blocking_ts_(nullptr),
|
||||
idle_us_(0)
|
||||
{
|
||||
}
|
||||
@ -300,17 +300,6 @@ void ObThWorker::set_th_worker_thread_name()
|
||||
}
|
||||
}
|
||||
|
||||
OB_INLINE void ObThWorker::update_ru_cputime()
|
||||
{
|
||||
struct rusage ru;
|
||||
getrusage(RUSAGE_THREAD, &ru);
|
||||
int64_t ru_utime =
|
||||
ru.ru_utime.tv_sec * 1000000 + ru.ru_utime.tv_usec
|
||||
+ ru.ru_stime.tv_sec * 1000000 + ru.ru_stime.tv_usec;
|
||||
tenant_->add_ru_cputime(ru_utime - ru_cputime_);
|
||||
ru_cputime_ = ru_utime;
|
||||
}
|
||||
|
||||
void ObThWorker::worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t &worker_level)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -396,9 +385,6 @@ void ObThWorker::worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t
|
||||
set_last_wakeup_ts(query_start_time_);
|
||||
set_rpc_stat_srv(&(tenant_->rpc_stat_info_->rpc_stat_srv_));
|
||||
process_request(*req);
|
||||
if (!GCONF.enable_cgroup) {
|
||||
update_ru_cputime();
|
||||
}
|
||||
query_enqueue_time_ = INT64_MAX;
|
||||
query_start_time_ = INT64_MAX;
|
||||
} else {
|
||||
|
@ -139,7 +139,6 @@ private:
|
||||
|
||||
int64_t last_wakeup_ts_;
|
||||
int64_t* blocking_ts_;
|
||||
int64_t ru_cputime_;
|
||||
int64_t idle_us_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObThWorker);
|
||||
|
@ -290,27 +290,43 @@ ObCgroupCtrl *ObTenantBase::get_cgroup()
|
||||
return cgroup_ctrl;
|
||||
}
|
||||
|
||||
int ObTenantBase::pre_run(lib::Threads *th)
|
||||
int ObTenantBase::pre_run(lib::Thread* thread)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTenantEnv::set_tenant(this);
|
||||
ObCgroupCtrl *cgroup_ctrl = get_cgroup();
|
||||
if (cgroup_ctrl != nullptr) {
|
||||
if (cgroup_ctrl != nullptr && cgroup_ctrl->is_valid()) {
|
||||
ret = cgroup_ctrl->add_self_to_cgroup(id_);
|
||||
}
|
||||
{
|
||||
ThreadListNode *node = thread->get_thread_list_node();
|
||||
lib::ObMutexGuard guard(thread_list_lock_);
|
||||
if (OB_ISNULL(node)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_ERROR("Fail to allocate memory", K(ret));
|
||||
} else if (!thread_list_.add_last(node)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("add to thread list fail", K(ret));
|
||||
}
|
||||
}
|
||||
ATOMIC_INC(&thread_count_);
|
||||
LOG_INFO("tenant thread pre_run", K(MTL_ID()), K(ret), K(thread_count_), KP(th));
|
||||
LOG_INFO("tenant thread pre_run", K(MTL_ID()), K(ret), K(thread_count_), KP(thread));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTenantBase::end_run(lib::Threads *th)
|
||||
int ObTenantBase::end_run(lib::Thread* thread)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTenantEnv::set_tenant(nullptr);
|
||||
ObCgroupCtrl *cgroup_ctrl = get_cgroup();
|
||||
if (cgroup_ctrl != nullptr) {
|
||||
if (cgroup_ctrl != nullptr && cgroup_ctrl->is_valid()) {
|
||||
ret = cgroup_ctrl->remove_self_from_cgroup(id_);
|
||||
}
|
||||
{
|
||||
ThreadListNode *node = (ThreadListNode *) thread->get_thread_list_node();
|
||||
lib::ObMutexGuard guard(thread_list_lock_);
|
||||
thread_list_.remove(node);
|
||||
}
|
||||
ATOMIC_DEC(&thread_count_);
|
||||
LOG_INFO("tenant thread end_run", K(id_), K(ret), K(thread_count_));
|
||||
return ret;
|
||||
|
@ -403,8 +403,8 @@ template<class T> struct Identity {};
|
||||
|
||||
public:
|
||||
// TGHelper need
|
||||
virtual int pre_run(lib::Threads*) override;
|
||||
virtual int end_run(lib::Threads*) override;
|
||||
virtual int pre_run(lib::Thread*) override;
|
||||
virtual int end_run(lib::Thread*) override;
|
||||
virtual void tg_create_cb(int tg_id) override;
|
||||
virtual void tg_destroy_cb(int tg_id) override;
|
||||
|
||||
@ -556,6 +556,11 @@ private:
|
||||
bool enable_tenant_ctx_check_;
|
||||
int64_t thread_count_;
|
||||
bool mini_mode_;
|
||||
|
||||
using ThreadListNode = common::ObDLinkNode<lib::Thread *>;
|
||||
using ThreadList = common::ObDList<ThreadListNode>;
|
||||
ThreadList thread_list_;
|
||||
lib::ObMutex thread_list_lock_;
|
||||
};
|
||||
|
||||
using ReleaseCbFunc = std::function<int (common::ObLDHandle&)>;
|
||||
|
@ -671,7 +671,7 @@ int ObCgroupCtrl::get_cpu_time(const uint64_t tenant_id, int64_t &cpu_time)
|
||||
|
||||
char usage_path[PATH_BUFSIZE];
|
||||
char usage_value[VALUE_BUFSIZE + 1];
|
||||
snprintf(usage_path, PATH_BUFSIZE, "%s/tenant_%lu/cpuacct.usage", root_cgroup_, tenant_id);
|
||||
snprintf(usage_path, PATH_BUFSIZE, "%s/tenant_%04lu/cpuacct.usage", root_cgroup_, tenant_id);
|
||||
MEMSET(usage_value, 0, VALUE_BUFSIZE);
|
||||
if(OB_FAIL(get_string_from_file_(usage_path, usage_value))) {
|
||||
LOG_WARN("get cpu usage failed",
|
||||
|
Reference in New Issue
Block a user