fix: wait retry queue empty when remove tenant
This commit is contained in:
@ -28,16 +28,23 @@ int ObRetryQueue::push(ObRequest &req, const uint64_t timestamp)
|
|||||||
return queue_[queue_idx].push(&req);
|
return queue_[queue_idx].push(&req);
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObRetryQueue::pop(ObLink *&task)
|
int ObRetryQueue::pop(ObLink *&task, bool need_clear)
|
||||||
{
|
{
|
||||||
int ret = OB_ENTRY_NOT_EXIST;
|
int ret = OB_ENTRY_NOT_EXIST;
|
||||||
uint64_t curr_timestamp = ObTimeUtility::current_time();
|
uint64_t curr_timestamp = ObTimeUtility::current_time();
|
||||||
uint64_t idx = last_timestamp_ / RETRY_QUEUE_TIMESTEP;
|
uint64_t idx = last_timestamp_ / RETRY_QUEUE_TIMESTEP;
|
||||||
|
if (!need_clear) {
|
||||||
int queue_idx = idx & (RETRY_QUEUE_SIZE - 1);
|
int queue_idx = idx & (RETRY_QUEUE_SIZE - 1);
|
||||||
while (last_timestamp_ <= curr_timestamp && OB_FAIL(queue_[queue_idx].pop(task))) {
|
while (last_timestamp_ <= curr_timestamp && OB_FAIL(queue_[queue_idx].pop(task))) {
|
||||||
ATOMIC_FAA(&last_timestamp_, RETRY_QUEUE_TIMESTEP);
|
ATOMIC_FAA(&last_timestamp_, RETRY_QUEUE_TIMESTEP);
|
||||||
queue_idx = (++idx) & (RETRY_QUEUE_SIZE - 1);
|
queue_idx = (++idx) & (RETRY_QUEUE_SIZE - 1);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
int queue_idx = 0;
|
||||||
|
while (queue_idx < RETRY_QUEUE_SIZE && OB_FAIL(queue_[queue_idx].pop(task))) {
|
||||||
|
queue_idx++;
|
||||||
|
}
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -30,7 +30,7 @@ public:
|
|||||||
}
|
}
|
||||||
enum { RETRY_QUEUE_SIZE = 256 };
|
enum { RETRY_QUEUE_SIZE = 256 };
|
||||||
int push(rpc::ObRequest &req, const uint64_t timestamp);
|
int push(rpc::ObRequest &req, const uint64_t timestamp);
|
||||||
int pop(common::ObLink *&task);
|
int pop(common::ObLink *&task, bool need_clear = false);
|
||||||
uint64_t get_last_timestamp() const;
|
uint64_t get_last_timestamp() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|||||||
@ -421,6 +421,9 @@ int ObResourceGroup::clear_worker()
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObMutexGuard guard(workers_lock_);
|
ObMutexGuard guard(workers_lock_);
|
||||||
|
while (req_queue_.size() > 0) {
|
||||||
|
ob_usleep(10L * 1000L);
|
||||||
|
}
|
||||||
while (workers_.get_size() > 0) {
|
while (workers_.get_size() > 0) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
|
DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
|
||||||
@ -829,6 +832,10 @@ int ObTenant::create_tenant_module()
|
|||||||
void ObTenant::wait()
|
void ObTenant::wait()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
handle_retry_req(true);
|
||||||
|
while (req_queue_.size() > 0) {
|
||||||
|
ob_usleep(10L * 1000L);
|
||||||
|
}
|
||||||
while (workers_.get_size() > 0) {
|
while (workers_.get_size() > 0) {
|
||||||
if (OB_SUCC(workers_lock_.trylock())) {
|
if (OB_SUCC(workers_lock_.trylock())) {
|
||||||
DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
|
DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
|
||||||
@ -1224,11 +1231,8 @@ int ObTenant::recv_large_request(rpc::ObRequest &req)
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
req.set_enqueue_timestamp(ObTimeUtility::current_time());
|
req.set_enqueue_timestamp(ObTimeUtility::current_time());
|
||||||
req.set_large_retry_flag(true);
|
req.set_large_retry_flag(true);
|
||||||
if (ATOMIC_LOAD(&stopped_)) {
|
if (0 != req.get_group_id()) {
|
||||||
ret = OB_IN_STOP_STATE;
|
if (OB_FAIL(recv_group_request(req, req.get_group_id()))) {
|
||||||
LOG_WARN("receive large request but tenant has already stopped", K(ret), K(id_));
|
|
||||||
} else if (0 != req.get_group_id()) {
|
|
||||||
if (OB_FAIL(recv_request(req))) {
|
|
||||||
LOG_WARN("tenant receive large retry request fail", K(ret));
|
LOG_WARN("tenant receive large retry request fail", K(ret));
|
||||||
}
|
}
|
||||||
} else if (OB_FAIL(recv_group_request(req, OBCG_LQ))){
|
} else if (OB_FAIL(recv_group_request(req, OBCG_LQ))){
|
||||||
@ -1242,7 +1246,14 @@ int ObTenant::recv_large_request(rpc::ObRequest &req)
|
|||||||
|
|
||||||
int ObTenant::push_retry_queue(rpc::ObRequest &req, const uint64_t timestamp)
|
int ObTenant::push_retry_queue(rpc::ObRequest &req, const uint64_t timestamp)
|
||||||
{
|
{
|
||||||
return retry_queue_.push(req, timestamp);
|
int ret = OB_SUCCESS;
|
||||||
|
if (ATOMIC_LOAD(&stopped_)) {
|
||||||
|
ret = OB_IN_STOP_STATE;
|
||||||
|
LOG_WARN("receive retry request but tenant has already stopped", K(ret), K(id_));
|
||||||
|
} else if (OB_FAIL(retry_queue_.push(req, timestamp))) {
|
||||||
|
LOG_ERROR("push retry queue failed", K(ret), K(id_));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTenant::timeup()
|
int ObTenant::timeup()
|
||||||
@ -1263,12 +1274,12 @@ int ObTenant::timeup()
|
|||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObTenant::handle_retry_req()
|
void ObTenant::handle_retry_req(bool need_clear)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObLink* task = nullptr;
|
ObLink* task = nullptr;
|
||||||
ObRequest *req = NULL;
|
ObRequest *req = NULL;
|
||||||
while (OB_SUCC(retry_queue_.pop(task))) {
|
while (OB_SUCC(retry_queue_.pop(task, need_clear))) {
|
||||||
req = static_cast<rpc::ObRequest*>(task);
|
req = static_cast<rpc::ObRequest*>(task);
|
||||||
if (OB_FAIL(recv_large_request(*req))) {
|
if (OB_FAIL(recv_large_request(*req))) {
|
||||||
LOG_ERROR("tenant patrol push req fail", "tenant", id_);
|
LOG_ERROR("tenant patrol push req fail", "tenant", id_);
|
||||||
|
|||||||
@ -446,7 +446,7 @@ public:
|
|||||||
int recv_request(rpc::ObRequest &req);
|
int recv_request(rpc::ObRequest &req);
|
||||||
int recv_large_request(rpc::ObRequest &req);
|
int recv_large_request(rpc::ObRequest &req);
|
||||||
int push_retry_queue(rpc::ObRequest &req, const uint64_t idx);
|
int push_retry_queue(rpc::ObRequest &req, const uint64_t idx);
|
||||||
void handle_retry_req();
|
void handle_retry_req(bool need_clear = false);
|
||||||
void check_worker_count(ObThWorker &w);
|
void check_worker_count(ObThWorker &w);
|
||||||
void update_queue_size();
|
void update_queue_size();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user