[cp]fix: group request may deadlock because of nested rpc

This commit is contained in:
obdev 2024-02-02 14:47:33 +00:00 committed by ob-robot
parent 22c45afcc4
commit 827aed2777
4 changed files with 136 additions and 43 deletions
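
Before this change each resource group owned a single request queue served by a bounded set of group workers. A request that issues a nested RPC blocks its worker until the reply has been executed, and the reply is itself enqueued as a group request; once every worker of the group is blocked this way, no worker is left to run the replies and the group deadlocks. The commit gives each group its own multi-level queue plus dedicated per-level nesting workers and routes incoming requests by RPC nesting level, so a request at a deeper level always finds a free worker. The following is a minimal, self-contained sketch of the deadlock pattern on a single-level queue; GroupQueue and kWorkers are illustrative names only, not OceanBase code:

// deadlock_sketch.cpp -- build with: g++ -std=c++14 -pthread deadlock_sketch.cpp
// WARNING: this program intentionally hangs; it reproduces the bug pattern.
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// A single-level request queue, standing in for the group's req_queue_.
class GroupQueue {
public:
  void push(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lk(mu_);
      q_.push(std::move(task));
    }
    cv_.notify_one();
  }
  std::function<void()> pop() {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [&] { return !q_.empty(); });
    std::function<void()> t = std::move(q_.front());
    q_.pop();
    return t;
  }
private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> q_;
};

int main() {
  GroupQueue queue;            // one queue, one level: the pre-fix model
  constexpr int kWorkers = 2;  // the group's whole worker budget

  // Every top-level request issues a "nested RPC" whose reply must be
  // executed by a worker of the same group, then blocks on the result.
  for (int i = 0; i < kWorkers; ++i) {
    queue.push([&queue] {
      std::promise<void> reply;
      std::future<void> done = reply.get_future();
      queue.push([&reply] { reply.set_value(); });  // nested request
      done.wait();  // with all workers parked here, nobody runs the reply
    });
  }

  std::vector<std::thread> workers;
  for (int i = 0; i < kWorkers; ++i) {
    workers.emplace_back([&queue] {
      for (;;) queue.pop()();  // serve requests forever
    });
  }
  for (std::thread &w : workers) {
    w.join();  // never returns: the group is deadlocked
  }
  return 0;
}

With the per-level queues and nesting workers introduced in this commit, the reply would be served by a worker that never blocks on nested RPCs, so the wait in the sketch could always complete.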

View File

@@ -17,6 +17,7 @@
#include "rpc/ob_request.h"
#define MULTI_LEVEL_QUEUE_SIZE (10)
#define MULTI_LEVEL_THRESHOLD (2)
#define GROUP_MULTI_LEVEL_THRESHOLD (1)
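// requests from user groups at RPC level >= GROUP_MULTI_LEVEL_THRESHOLD are
// routed to the group's multi-level queue (see ObTenant::recv_group_request)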
namespace oceanbase
{

View File

@@ -365,6 +365,7 @@ ObResourceGroup::ObResourceGroup(int32_t group_id, ObTenant* tenant, share::ObCg
recv_req_cnt_(0),
shrink_(false),
token_change_ts_(0),
nesting_worker_cnt_(0),
tenant_(tenant),
cgroup_ctrl_(cgroup_ctrl)
{
@@ -376,6 +377,8 @@ int ObResourceGroup::init()
if (nullptr == tenant_) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("group init failed");
} else if (FALSE_IT(multi_level_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size))) {
LOG_WARN("multi level queue set limit failed", K(ret), K(tenant_->id()), K(group_id_), K(*this));
} else {
req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
inited_ = true;
@@ -388,6 +391,28 @@ void ObResourceGroup::update_queue_size()
req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
}
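// Create one worker dedicated to the given nesting level for this group.
// The worker is created with the max-worker limit ignored, so every RPC
// level keeps at least one worker that can serve it even when the group
// is saturated.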
int ObResourceGroup::acquire_level_worker(int32_t level)
{
int ret = OB_SUCCESS;
ObTenantSwitchGuard guard(tenant_);
if (level <= 0 || level > MAX_REQUEST_LEVEL) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected level", K(level), K(tenant_->id()));
} else {
ObThWorker *w = nullptr;
if (OB_FAIL(create_worker(w, tenant_, group_id_, level, true /*ignore max worker limit*/, this))) {
LOG_WARN("create worker failed", K(ret));
} else if (!nesting_workers_.add_last(&w->worker_node_)) {
OB_ASSERT(false);
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("add worker to list fail", K(ret));
}
}
return ret;
}
int ObResourceGroup::acquire_more_worker(int64_t num, int64_t &succ_num, bool force)
{
int ret = OB_SUCCESS;
@@ -424,6 +449,14 @@ void ObResourceGroup::check_worker_count()
{
int ret = OB_SUCCESS;
if (OB_SUCC(workers_lock_.trylock())) {
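// For user groups, lazily spawn one nesting worker per RPC level
// (GROUP_MULTI_LEVEL_THRESHOLD .. MAX_REQUEST_LEVEL - 1) so nested
// requests can always make progress even when all normal group workers
// are blocked waiting on nested RPC replies.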
if (is_user_group(group_id_)
&& nesting_worker_cnt_ < (MAX_REQUEST_LEVEL - GROUP_MULTI_LEVEL_THRESHOLD)) {
for (int level = GROUP_MULTI_LEVEL_THRESHOLD + nesting_worker_cnt_; OB_SUCC(ret) && level < MAX_REQUEST_LEVEL; level++) {
if (OB_SUCC(acquire_level_worker(level))) {
nesting_worker_cnt_ = nesting_worker_cnt_ + 1;
}
}
}
int64_t now = ObTimeUtility::current_time();
bool enable_dynamic_worker = true;
int64_t threshold = 3 * 1000;
@@ -508,9 +541,25 @@ int ObResourceGroup::clear_worker()
{
int ret = OB_SUCCESS;
ObMutexGuard guard(workers_lock_);
while (req_queue_.size() > 0) {
while (req_queue_.size() > 0
|| (multi_level_queue_.get_total_size() > 0)) {
ob_usleep(10L * 1000L);
}
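// Once both queues have drained, stop and reclaim the dedicated nesting
// workers the same way the normal group workers are reclaimed below.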
while (nesting_workers_.get_size() > 0) {
int ret = OB_SUCCESS;
DLIST_FOREACH_REMOVESAFE(wnode, nesting_workers_) {
ObThWorker *w = static_cast<ObThWorker*>(wnode->get_data());
nesting_workers_.remove(wnode);
destroy_worker(w);
}
if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
LOG_INFO(
"Tenant has some group nesting workers need stop",
K(tenant_->id()),
"group nesting workers", nesting_workers_.get_size(),
"group id", get_group_id());
}
}
while (workers_.get_size() > 0) {
int ret = OB_SUCCESS;
DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
@@ -523,7 +572,7 @@ int ObResourceGroup::clear_worker()
"Tenant has some group workers need stop",
K(tenant_->id()),
"group workers", workers_.get_size(),
"group type", get_group_id());
"group id", get_group_id());
}
ob_usleep(10L * 1000L);
}
@@ -753,11 +802,8 @@ int ObTenant::init(const ObTenantMeta &meta)
// there must be 2 workers.
static_cast<ObThWorker*>(workers_.get_first()->get_data())->set_priority_limit(QQ_HIGH);
static_cast<ObThWorker*>(workers_.get_last()->get_data())->set_priority_limit(QQ_NORMAL);
if (!is_virtual_tenant_id(id_) && !is_meta_tenant(id_)) {
for (int level = MULTI_LEVEL_THRESHOLD; level < MAX_REQUEST_LEVEL; level++) {
if (OB_FAIL(acquire_level_worker(1, succ_cnt, level))) {
break;
}
for (int level = MULTI_LEVEL_THRESHOLD; OB_SUCC(ret) && level < MAX_REQUEST_LEVEL; level++) {
if (OB_SUCC(acquire_level_worker(1, succ_cnt, level))) {
succ_cnt = 0L;
}
}
@@ -1165,21 +1211,41 @@ int ObTenant::get_new_request(
ObLink* task = nullptr;
req = nullptr;
int wk_level = 0;
Thread::WaitGuard guard(Thread::WAIT_IN_TENANT_QUEUE);
if (w.is_group_worker()) {
w.set_large_query(false);
w.set_curr_request_level(0);
if (OB_SUCC(w.get_group()->req_queue_.pop(task, timeout))) {
EVENT_INC(REQUEST_DEQUEUE_COUNT);
if (nullptr == req && nullptr != task) {
req = static_cast<rpc::ObRequest*>(task);
if (req->large_retry_flag()) {
w.set_large_query();
wk_level = w.get_worker_level();
if (wk_level < 0 || wk_level >= MAX_REQUEST_LEVEL) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected level", K(wk_level), K(id_));
} else if (wk_level >= MAX_REQUEST_LEVEL - 1) {
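// Deepest level: pop_timeup only yields requests whose deadline has
// already passed; they are logged and discarded below.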
ret = w.get_group()->multi_level_queue_.pop_timeup(task, wk_level, timeout);
if ((ret == OB_SUCCESS && nullptr == task) || ret == OB_ENTRY_NOT_EXIST) {
ret = OB_ENTRY_NOT_EXIST;
usleep(10 * 1000L);
} else if (ret == OB_SUCCESS){
rpc::ObRequest *tmp_req = static_cast<rpc::ObRequest*>(task);
LOG_WARN("req is timeout and discard", "tenant_id", id_, K(tmp_req));
} else {
LOG_ERROR("pop queue err", "tenant_id", id_, K(ret));
}
} else if (w.is_level_worker()) {
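// Dedicated nesting workers serve only the sub-queue of their own level.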
ret = w.get_group()->multi_level_queue_.pop(task, wk_level, timeout);
} else {
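// Ordinary group workers drain nested requests first, scanning the
// multi-level queue from the deepest level downwards, and fall back to
// the group's normal request queue only when no nested request is pending.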
for (int32_t level = MAX_REQUEST_LEVEL - 1; level >= GROUP_MULTI_LEVEL_THRESHOLD; level--) {
IGNORE_RETURN w.get_group()->multi_level_queue_.try_pop(task, level);
if (nullptr != task) {
ret = OB_SUCCESS;
break;
}
}
if (nullptr == task) {
ret = w.get_group()->req_queue_.pop(task, timeout);
}
}
} else {
int wk_level = 0;
w.set_large_query(false);
w.set_curr_request_level(0);
wk_level = w.get_worker_level();
@@ -1228,21 +1294,25 @@ int ObTenant::get_new_request(
}
}
}
}
if (OB_SUCC(ret)) {
EVENT_INC(REQUEST_DEQUEUE_COUNT);
if (nullptr == req && nullptr != task) {
req = static_cast<rpc::ObRequest*>(task);
if (OB_SUCC(ret)) {
EVENT_INC(REQUEST_DEQUEUE_COUNT);
if (nullptr == req && nullptr != task) {
req = static_cast<rpc::ObRequest*>(task);
}
if (nullptr != req) {
if (w.is_group_worker() && req->large_retry_flag()) {
w.set_large_query();
}
if (nullptr != req && req->get_type() == ObRequest::OB_RPC) {
using obrpc::ObRpcPacket;
const ObRpcPacket &pkt
= static_cast<const ObRpcPacket&>(req->get_packet());
w.set_curr_request_level(pkt.get_request_level());
if (req->get_type() == ObRequest::OB_RPC) {
using obrpc::ObRpcPacket;
const ObRpcPacket &pkt
= static_cast<const ObRpcPacket&>(req->get_packet());
w.set_curr_request_level(pkt.get_request_level());
}
}
}
return ret;
}
@@ -1280,6 +1350,7 @@ int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
ObResourceGroup* group = nullptr;
ObResourceGroupNode* node = nullptr;
ObResourceGroupNode key(group_id);
int req_level = 0;
if (OB_SUCC(GroupMap::err_code_map(group_map_.get(&key, node)))) {
group = static_cast<ObResourceGroup*>(node);
} else if (OB_FAIL(group_map_.create_and_insert_group(group_id, this, &cgroup_ctrl_, group))) {
@@ -1292,9 +1363,25 @@ int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
LOG_INFO("create group successfully", K_(id), K(group_id), K(group));
}
if (OB_SUCC(ret)) {
group->atomic_inc_recv_cnt();
if (OB_FAIL(group->req_queue_.push(&req, 0))) {
LOG_ERROR("push request to queue fail", K(ret), K(this));
if (req.get_type() == ObRequest::OB_RPC) {
using obrpc::ObRpcPacket;
const ObRpcPacket &pkt
= static_cast<const ObRpcPacket&>(req.get_packet());
req_level = min(pkt.get_request_level(), MAX_REQUEST_LEVEL - 1);
}
if (req_level < 0) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected level", K(req_level), K(id_), K(group_id));
} else if (is_user_group(group_id) && req_level >= GROUP_MULTI_LEVEL_THRESHOLD) {
group->recv_level_rpc_cnt_.atomic_inc(req_level);
if (OB_FAIL(group->multi_level_queue_.push(req, req_level, 0))) {
LOG_WARN("push request to queue fail", K(req_level), K(id_), K(group_id));
}
} else {
group->atomic_inc_recv_cnt();
if (OB_FAIL(group->req_queue_.push(&req, 0))) {
LOG_ERROR("push request to queue fail", K(id_), K(group_id));
}
}
int tmp_ret = OB_SUCCESS;
if (!share::ObCgSet::instance().is_group_critical(group_id) && 0 == group->workers_.get_size()) {

View File

@@ -287,18 +287,33 @@ public:
int init();
void update_queue_size();
int acquire_more_worker(int64_t num, int64_t &succ_num, bool force = false);
int acquire_level_worker(int32_t level);
void check_worker_count();
void check_worker_count(ObThWorker &w);
int clear_worker();
TO_STRING_KV("group_id", group_id_,
"queue_size", req_queue_.size(),
"recv_req_cnt", recv_req_cnt_,
"min_worker_cnt", min_worker_cnt(),
"max_worker_cnt", max_worker_cnt(),
K(multi_level_queue_),
"recv_level_rpc_cnt", recv_level_rpc_cnt_,
"worker_cnt", workers_.get_size(),
"nesting_worker_cnt", nesting_workers_.get_size(),
"token_change", token_change_ts_);
private:
lib::ObMutex& workers_lock_;
WList workers_;
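// dedicated per-level workers serving nested RPC requests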
WList nesting_workers_;
common::ObPriorityQueue2<0, 1> req_queue_;
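// per-group queue holding one sub-queue per RPC nesting level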
ObMultiLevelQueue multi_level_queue_;
bool inited_; // Mark whether the container has threads and queues allocated
volatile uint64_t recv_req_cnt_ CACHE_ALIGNED; // Statistics requested to enqueue
volatile bool shrink_ CACHE_ALIGNED;
int64_t token_change_ts_;
MultiLevelReqCnt recv_level_rpc_cnt_;
int nesting_worker_cnt_;
ObTenant *tenant_;
share::ObCgroupCtrl *cgroup_ctrl_;
};
@@ -323,20 +338,8 @@ public:
while (NULL != (iter = const_cast<GroupMap*>(this)->GroupHash::quick_next(iter))) {
group = static_cast<ObResourceGroup*>(iter);
common::databuff_printf(buf, buf_len, pos,
"group_id = %d,"
"queue_size = %ld,"
"recv_req_cnt = %lu,"
"min_worker_cnt = %ld,"
"max_worker_cnt = %ld,"
"worker_cnt = %d,"
"token_change = %ld ",
group->group_id_,
group->req_queue_.size(),
group->recv_req_cnt_,
group->min_worker_cnt(),
group->max_worker_cnt(),
group->workers_.get_size(),
group->token_change_ts_);
"%s",
to_cstring(group));
}
return pos;
}

View File

@@ -392,10 +392,12 @@ void ObThWorker::worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t
ret = OB_SUCCESS;
}
IGNORE_RETURN ATOMIC_FAA(&idle_us_, (wait_end_time - wait_start_time));
if (this->get_worker_level() == 0 && !is_group_worker()) {
if (this->get_worker_level() != 0) {
// nesting workers are not allowed to call check_worker_count
} else if (this->get_group() == nullptr) {
tenant_->check_worker_count(*this);
tenant_->lq_end(*this);
} else if (this->is_group_worker()) {
} else {
group_->check_worker_count(*this);
}
}