fix OmtNodeBalancer hang

This commit is contained in:
  parent c8173884d0
  commit 669e679f03

deps/oblib/src/lib/thread/thread.cpp (vendored, 39 lines changed)
@@ -24,6 +24,7 @@
 #include "lib/thread/protected_stack_allocator.h"
 #include "lib/utility/ob_defer.h"
 #include "lib/utility/ob_hang_fatal_error.h"
+#include "lib/utility/ob_tracepoint.h"
 #include "lib/signal/ob_signal_struct.h"
 
 using namespace oceanbase;
@@ -127,6 +128,16 @@ int Thread::start(Runnable runnable)
 
 void Thread::stop()
 {
+#ifdef ERRSIM
+  if (!stop_
+      && stack_addr_ != NULL
+      && 0 != (OB_E(EventTable::EN_THREAD_HANG) 0)) {
+    int tid_offset = 720;
+    int tid = *(pid_t*)((char*)pth_ + tid_offset);
+    LOG_WARN_RET(OB_SUCCESS, "stop was ignored", K(tid));
+    return;
+  }
+#endif
 #ifndef OB_USE_ASAN
   if (!stop_ && stack_addr_ != NULL) {
     int tid_offset = 720;
@@ -179,14 +190,35 @@ void Thread::dump_pth() // for debug pthread join faileds
 #endif
 }
 
-void Thread::wait()
+int Thread::try_wait()
 {
-  int ret = 0;
+  int ret = OB_SUCCESS;
   if (pth_ != 0) {
+    int tmp = pthread_tryjoin_np(pth_, nullptr);
+    if (EBUSY == tmp) {
+      ret = OB_EAGAIN;
+      LOG_WARN("pthread_tryjoin_np failed", K(tmp), K(errno), K(tid_before_stop_));
+    } else if (0 == tmp) {
+      destroy_stack();
+      runnable_ = nullptr;
+    } else {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_ERROR("pthread_tryjoin_np failed", K(tmp), K(errno), K(tid_before_stop_));
+      dump_pth();
+      abort();
+    }
+  }
+  return ret;
+}
+
+void Thread::wait()
+{
+  int ret = 0;
+  if (pth_ != 0) {
     if (2 <= ATOMIC_AAF(&join_concurrency_, 1)) {
       abort();
     }
-    if (OB_FAIL(pthread_join(pth_, nullptr))) {
+    if (0 != (ret = pthread_join(pth_, nullptr))) {
       LOG_ERROR("pthread_join failed", K(ret), K(errno));
 #ifndef OB_USE_ASAN
       dump_pth();
@@ -194,7 +226,6 @@ void Thread::wait()
 #endif
     }
     destroy_stack();
-    pth_ = 0;
    runnable_ = nullptr;
    if (1 <= ATOMIC_AAF(&join_concurrency_, -1)) {
      abort();
@@ -215,11 +246,11 @@ void Thread::destroy()
 
 void Thread::destroy_stack()
 {
-
 #ifndef OB_USE_ASAN
   if (stack_addr_ != nullptr) {
     g_stack_allocer.dealloc(stack_addr_);
     stack_addr_ = nullptr;
+    pth_ = 0;
   }
 #endif
 }
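The heart of the fix is the swap above from a blocking pthread_join to pthread_tryjoin_np, which returns EBUSY while the thread is still running; Thread::try_wait maps that to OB_EAGAIN and leaves the retry policy to the caller. A minimal standalone sketch of the pattern, assuming Linux/glibc (worker_main and the polling loop are illustrative, not OceanBase code):

#ifndef _GNU_SOURCE
#define _GNU_SOURCE  // pthread_tryjoin_np is a GNU extension
#endif
#include <pthread.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>

static void *worker_main(void *) {
  usleep(50 * 1000);  // stand-in for real work
  return nullptr;
}

int main() {
  pthread_t pth;
  if (pthread_create(&pth, nullptr, worker_main, nullptr) != 0) {
    return 1;
  }
  // Non-blocking join: EBUSY means "not exited yet" (mapped to OB_EAGAIN in
  // Thread::try_wait); 0 means joined; anything else is an unexpected error.
  int rc;
  while ((rc = pthread_tryjoin_np(pth, nullptr)) == EBUSY) {
    usleep(10 * 1000);  // the caller, not the joinee, owns the retry policy
  }
  printf("joined, rc=%d\n", rc);
  return rc;
}

Moving pth_ = 0 into destroy_stack() (last hunk above) keeps a successfully joined handle from being joined again when try_wait is re-driven.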
deps/oblib/src/lib/thread/thread.h (vendored, 1 line changed)
@@ -34,6 +34,7 @@ public:
   int start();
   int start(Runnable runnable);
   void stop();
+  int try_wait();
   void wait();
   void destroy();
   void dump_pth();
deps/oblib/src/lib/thread/threads.cpp (vendored, 13 lines changed)
@@ -240,6 +240,19 @@ void Threads::destroy_thread(Thread *thread)
   ob_free(thread);
 }
 
+int Threads::try_wait()
+{
+  int ret = OB_SUCCESS;
+  if (threads_ != nullptr) {
+    for (int i = 0; i < n_threads_ && OB_SUCC(ret); i++) {
+      if (threads_[i] != nullptr) {
+        ret = threads_[i]->try_wait();
+      }
+    }
+  }
+  return ret;
+}
+
 void Threads::wait()
 {
   if (threads_ != nullptr) {
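Note that Threads::try_wait() aggregates per-thread results: the OB_SUCC(ret) loop condition stops polling at the first thread that is still running, so a single busy thread makes the whole pool report OB_EAGAIN and the caller simply polls again later.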
deps/oblib/src/lib/thread/threads.h (vendored, 1 line changed)
@@ -93,6 +93,7 @@ public:
   }
   virtual int start();
   virtual void stop();
+  virtual int try_wait();
   virtual void wait();
   void destroy();
 
deps/oblib/src/lib/utility/ob_tracepoint.h (vendored, 1 line changed)
@@ -679,6 +679,7 @@ class EventTable
     EN_TX_FREE_ROUTE_STATE_SIZE = 2003,
     // Transaction common
     EN_TX_RESULT_INCOMPLETE = 2011,
+    EN_THREAD_HANG = 2022,
 
     EVENT_TABLE_MAX = SIZE_OF_EVENT_TABLE
   };
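EN_THREAD_HANG (2022) is the tracepoint consulted by the new ERRSIM block in Thread::stop() above: while the event is armed, stop() logs "stop was ignored" and returns early, so a test can simulate a thread that never stops and drive the new try_wait()/OB_EAGAIN paths.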
@@ -145,7 +145,7 @@ void TestMultiTenant::TearDown()
   GCTX.omt_->get_tenant_ids(ids);
   bool lock_succ = false;
   for (int64_t index = 0; index < ids.size(); index++) {
-    GCTX.omt_->remove_tenant(ids[index], lock_succ);
+    while (OB_EAGAIN == GCTX.omt_->remove_tenant(ids[index], lock_succ));
   }
 }
 
@@ -187,7 +187,7 @@ TEST_F(TestMultiTenant, create_and_remove_tenant)
   ret = add_tenant(tenants[0]);
   ASSERT_EQ(OB_TENANT_EXIST, ret);
   bool lock_succ = false;
-  GCTX.omt_->remove_tenant(tenants[0], lock_succ);
+  while (OB_EAGAIN == GCTX.omt_->remove_tenant(tenants[0], lock_succ));
   //ASSERT_EQ(OB_SUCCESS, ret); // partition_service should init
   ret = add_tenant(tenants[0]);
   ASSERT_EQ(OB_SUCCESS, ret);
@@ -334,9 +334,9 @@ TEST_F(TestMultiTenant, tenant_local)
   }
   guard.release();
   bool lock_succ = false;
-  GCTX.omt_->remove_tenant(tenant_id[0], lock_succ);
+  while (OB_EAGAIN == GCTX.omt_->remove_tenant(tenant_id[0], lock_succ));
   ASSERT_EQ(seq, 1);
-  GCTX.omt_->remove_tenant(tenant_id[1], lock_succ);
+  while (OB_EAGAIN == GCTX.omt_->remove_tenant(tenant_id[1], lock_succ));
   ASSERT_EQ(seq, 0);
 }
 
@@ -911,7 +911,9 @@ int ObMultiTenant::create_tenant(const ObTenantMeta &meta, bool write_slog, cons
     if (create_step >= ObTenantCreateStep::STEP_TENANT_NEWED) {
       if (OB_NOT_NULL(tenant)) {
         tenant->stop();
-        tenant->wait();
+        while (OB_SUCCESS != tenant->try_wait()) {
+          ob_usleep(100 * 1000);
+        }
         tenant->destroy();
         ob_delete(tenant);
         tenant = nullptr;
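Note the asymmetry with the balancer path: in create_tenant's cleanup path the caller still waits in place, but as a 100 ms poll around try_wait() rather than an unconditional blocking wait(), so the retry policy now lives with the caller.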
@@ -1442,12 +1444,12 @@ int ObMultiTenant::mark_del_tenant(const uint64_t tenant_id)
 
 // Ensure remove_tenant can be called repeatedly, since tenant deletion keeps retrying on failure;
 // this only removes the in-memory structures, the persisted data remains.
-int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &try_clock_succ)
+int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &remove_tenant_succ)
 {
   int ret = OB_SUCCESS;
   int tmp_ret = OB_SUCCESS;
   ObTenant *removed_tenant = nullptr;
-  try_clock_succ = false;
+  remove_tenant_succ = false;
 
   if (IS_NOT_INIT) {
     ret = OB_NOT_INIT;
@@ -1479,19 +1481,10 @@ int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &try_clock_succ)
     }
   }
   if (OB_SUCC(ret) && OB_NOT_NULL(removed_tenant)) {
-    LOG_INFO("removed_tenant begin to wait", K(tenant_id));
-    removed_tenant->wait();
-    LOG_INFO("removed_tenant begin to try wlock", K(tenant_id));
     ObLDHandle handle;
-    for (int i = 0; i < DEL_TRY_TIMES && !try_clock_succ; ++i) {
-      if (OB_SUCC(removed_tenant->try_wrlock(handle))) {
-        try_clock_succ = true;
-      } else {
-        ob_usleep(TIME_SLICE_PERIOD);
-      }
-    }
-
-    if (OB_FAIL(ret)) {
+    if (OB_FAIL(removed_tenant->try_wait())) {
+      LOG_WARN("remove tenant try_wait failed", K(ret));
+    } else if (OB_FAIL(removed_tenant->try_wrlock(handle))) {
       LOG_WARN("can't get tenant wlock to remove tenant", K(ret), K(tenant_id),
                KP(removed_tenant), K(removed_tenant->lock_));
       removed_tenant->lock_.ld_.print();
@@ -1512,6 +1505,7 @@ int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &try_clock_succ)
       ret = OB_ERR_UNEXPECTED;
       LOG_WARN("must be same tenant", K(tenant_id), K(ret));
     } else {
+      remove_tenant_succ = true;
     }
   }
 
@@ -1660,14 +1654,14 @@ int ObMultiTenant::del_tenant(const uint64_t tenant_id)
     // Guarantee that remove_tenant and clear_persistent_data can be retried idempotently: any
     // failure other than a lock failure is retried indefinitely, so a written prepare log is always
    // followed by a commit log; even across a crash, replay continues the deletion and appends a delete commit log.
-    bool lock_tenant_succ = false;
-    if (OB_FAIL(remove_tenant(tenant_id, lock_tenant_succ))) {
+    bool remove_tenant_succ = false;
+    if (OB_FAIL(remove_tenant(tenant_id, remove_tenant_succ))) {
       LOG_WARN("fail to remove tenant", K(ret), K(tenant_id));
       // If lock failed, the tenant is not removed from tenants_list,
       // Here can break and leave ObTenantNodeBalancer::check_del_tenant to retry again,
       // in this case, the deletion of other tenants does not get stuck.
       // Otherwise it will have to retry indefinitely here, because the tenant cannot be obtained
-      if (false == lock_tenant_succ) {
+      if (false == remove_tenant_succ) {
         break;
       } else {
         SLEEP(1);
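The caller-side contract that replaces blocking waits is the same everywhere in this commit: a try_* routine reports OB_EAGAIN while threads are still draining, and the caller re-drives it, as the test loops above show. A minimal sketch of that contract (FakeTenant and the error-code values are stand-ins; the real codes live in OceanBase's ob_errno.h):

#include <cstdio>

// Illustrative stand-ins, not OceanBase definitions.
static const int OB_SUCCESS = 0;
static const int OB_EAGAIN = -4023;

struct FakeTenant {
  int still_running = 3;  // pretend three workers have not exited yet
  int try_wait() { return still_running-- > 0 ? OB_EAGAIN : OB_SUCCESS; }
};

int main() {
  FakeTenant tenant;
  // The caller loops instead of the callee blocking internally, so one stuck
  // worker can no longer wedge the whole ObTenantNodeBalancer task.
  while (OB_EAGAIN == tenant.try_wait()) {
    // between retries the balancer is free to service other tenants
  }
  printf("tenant drained\n");
  return 0;
}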
@@ -181,7 +181,7 @@ protected:
       const int64_t mem_limit,
       ObTenantMeta &meta);
   int create_virtual_tenants();
-  int remove_tenant(const uint64_t tenant_id, bool &lock_succ);
+  int remove_tenant(const uint64_t tenant_id, bool &remove_tenant_succ);
   uint32_t get_tenant_lock_bucket_idx(const uint64_t tenant_id);
   int update_tenant_unit_no_lock(const share::ObUnitInfoGetter::ObTenantConfig &unit,
       const UpdateTenantConfigOpt &opt);
@@ -417,28 +417,26 @@ void ObResourceGroup::check_worker_count(ObThWorker &w)
   }
 }
 
-int ObResourceGroup::clear_worker()
+int ObResourceGroup::try_clear_worker()
 {
   int ret = OB_SUCCESS;
   ObMutexGuard guard(workers_lock_);
-  while (req_queue_.size() > 0) {
-    ob_usleep(10L * 1000L);
+  if (req_queue_.size() > 0) {
+    ret = OB_EAGAIN;
   }
-  while (workers_.get_size() > 0) {
-    int ret = OB_SUCCESS;
+  if (OB_FAIL(ret)) {
+    // try next time
+  } else if (workers_.get_size() > 0) {
     DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
       const auto w = static_cast<ObThWorker*>(wnode->get_data());
-      workers_.remove(wnode);
-      destroy_worker(w);
+      w->stop();
+      if (OB_FAIL(ret)) {
+        // try next time
+      } else if (OB_SUCC(w->try_wait())) {
+        workers_.remove(wnode);
+        destroy_worker(w);
+      }
     }
-    if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
-      LOG_INFO(
-        "Tenant has some group workers need stop",
-        K(tenant_->id()),
-        "group workers", workers_.get_size(),
-        "group type", get_group_id());
-    }
-    ob_usleep(10L * 1000L);
   }
   return ret;
 }
@@ -472,16 +470,17 @@ int GroupMap::create_and_insert_group(int32_t group_id, ObTenant *tenant, ObCgro
   return ret;
 }
 
-void GroupMap::wait_group()
+int GroupMap::try_wait_group()
 {
   int ret = OB_SUCCESS;
   ObResourceGroupNode* iter = NULL;
-  while (nullptr != (iter = quick_next(iter))) {
+  while (OB_NOT_NULL(iter = quick_next(iter)) && OB_SUCC(ret)) {
     ObResourceGroup *group = static_cast<ObResourceGroup*>(iter);
-    if (OB_FAIL(group->clear_worker())) {
-      LOG_ERROR("group clear worker failed", K(ret));
+    if (OB_FAIL(group->try_clear_worker())) {
+      // try next time
     }
   }
+  return ret;
 }
 
 void GroupMap::destroy_group()
@@ -829,58 +828,55 @@ int ObTenant::create_tenant_module()
   return ret;
 }
 
-void ObTenant::wait()
+int ObTenant::try_wait()
 {
+  int ret = OB_SUCCESS;
   handle_retry_req(true);
-  while (req_queue_.size() > 0) {
-    ob_usleep(10L * 1000L);
+  if (req_queue_.size() > 0) {
+    ret = OB_EAGAIN;
   }
-  while (workers_.get_size() > 0) {
-    if (OB_SUCC(workers_lock_.trylock())) {
-      DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
-        const auto w = static_cast<ObThWorker*>(wnode->get_data());
-
+  if (OB_FAIL(ret)) {
+    // try next time
+  } else if (workers_.get_size() > 0 && OB_SUCC(workers_lock_.trylock())) {
+    DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
+      const auto w = static_cast<ObThWorker*>(wnode->get_data());
+      w->stop();
+      if (OB_FAIL(ret)) {
+        // try next time
+      } else if (OB_SUCC(w->try_wait())) {
        workers_.remove(wnode);
        destroy_worker(w);
      }
-      IGNORE_RETURN workers_lock_.unlock();
-      if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
-        LOG_INFO(
-          "Tenant has some workers need stop",
-          K_(id),
-          "workers", workers_.get_size(),
-          K_(req_queue));
-      }
     }
-    ob_usleep(10L * 1000L);
+    IGNORE_RETURN workers_lock_.unlock();
   }
   LOG_WARN_RET(OB_SUCCESS,"start remove nesting", K(nesting_workers_.get_size()), K_(id));
-  while (nesting_workers_.get_size() > 0) {
-    int ret = OB_SUCCESS;
-    if (OB_SUCC(workers_lock_.trylock())) {
-      DLIST_FOREACH_REMOVESAFE(wnode, nesting_workers_) {
-        auto w = static_cast<ObThWorker*>(wnode->get_data());
-
+  if (OB_FAIL(ret)) {
+    // try next time
+  } else if (nesting_workers_.get_size() > 0 && OB_SUCC(workers_lock_.trylock())) {
+    DLIST_FOREACH_REMOVESAFE(wnode, nesting_workers_) {
+      auto w = static_cast<ObThWorker*>(wnode->get_data());
+      w->stop();
+      if (OB_FAIL(ret)) {
+        // try next time
+      } else if (OB_SUCC(w->try_wait())) {
        nesting_workers_.remove(wnode);
        destroy_worker(w);
      }
-      IGNORE_RETURN workers_lock_.unlock();
-      if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
-        LOG_INFO(
-          "Tenant has some nesting workers need stop",
-          K_(id),
-          "nesting workers", nesting_workers_.get_size(),
-          K_(req_queue));
-      }
     }
-    ob_usleep(10L * 1000L);
+    IGNORE_RETURN workers_lock_.unlock();
   }
   LOG_WARN_RET(OB_SUCCESS, "finish remove nesting", K(nesting_workers_.get_size()), K_(id));
 
   LOG_WARN_RET(OB_SUCCESS, "start remove group_map", K_(id));
-  group_map_.wait_group();
+  if (OB_FAIL(ret)) {
+    // try next time
+  } else {
+    ret = group_map_.try_wait_group();
+  }
   LOG_WARN_RET(OB_SUCCESS, "finish remove group_map", K_(id));
 
-  if (!is_virtual_tenant_id(id_) && !wait_mtl_finished_) {
+  if (OB_FAIL(ret)) {
+    // try next time
+  } else if (!is_virtual_tenant_id(id_) && !wait_mtl_finished_) {
     ObTenantSwitchGuard guard(this);
     ObTenantBase::stop_mtl_module();
     OB_PX_TARGET_MGR.delete_tenant(id_);
@@ -888,6 +884,7 @@ void ObTenant::wait()
     ObTenantBase::wait_mtl_module();
     wait_mtl_finished_ = true;
   }
+  return ret;
 }
 
 void ObTenant::destroy()
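ObTenant::try_wait() and ObResourceGroup::try_clear_worker() above share one shape: stop every worker, reap only those that have already exited, and report OB_EAGAIN for the rest. A simplified sketch of that remove-safe reap loop, with std::list standing in for ObDList and all names illustrative:

#include <list>
#include <cstdio>

struct Worker {
  bool exited = false;
  void stop() { /* ask the worker to finish */ }
  bool try_wait() { return exited; }  // true once the thread has exited
};

// One pass: reap what is reapable, never block. Returns true when drained,
// mirroring OB_SUCCESS vs OB_EAGAIN in the diff above.
bool try_clear(std::list<Worker *> &workers) {
  for (auto it = workers.begin(); it != workers.end();) {
    Worker *w = *it;
    w->stop();
    if (w->try_wait()) {
      it = workers.erase(it);  // remove-safe: erase returns the next iterator
      delete w;
    } else {
      ++it;  // still running; leave it for the next call
    }
  }
  return workers.empty();
}

int main() {
  std::list<Worker *> workers{new Worker, new Worker};
  workers.back()->exited = true;  // one worker already finished
  printf("drained=%d remaining=%zu\n", try_clear(workers), workers.size());
  while (!workers.empty()) { delete workers.front(); workers.pop_front(); }
  return 0;
}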
@@ -295,7 +295,7 @@ public:
   int acquire_more_worker(int64_t num, int64_t &succ_num);
   void check_worker_count();
   void check_worker_count(ObThWorker &w);
-  int clear_worker();
+  int try_clear_worker();
 
   lib::ObMutex workers_lock_;
 
||||
@ -323,7 +323,7 @@ public:
|
||||
}
|
||||
~GroupMap() {}
|
||||
int create_and_insert_group(int32_t group_id, ObTenant *tenant, share::ObCgroupCtrl *cgroup_ctrl, ObResourceGroup *&group);
|
||||
void wait_group();
|
||||
int try_wait_group();
|
||||
void destroy_group();
|
||||
int64_t to_string(char *buf, const int64_t buf_len) const
|
||||
{
|
||||
@@ -401,7 +401,7 @@ public:
   int init(const ObTenantMeta &meta);
   void stop() { ATOMIC_STORE(&stopped_, true); }
   void start() { ATOMIC_STORE(&stopped_, false); }
-  void wait();
+  int try_wait();
   void destroy();
   bool has_stopped() const { return ATOMIC_LOAD(&stopped_); }
 