BUGFIX: move safe destroy worker into tenant
This commit is contained in:
@ -1011,7 +1011,9 @@ ObGarbageCollector::ObGarbageCollector() : is_inited_(false),
|
||||
rpc_proxy_(NULL),
|
||||
sql_proxy_(NULL),
|
||||
self_addr_(),
|
||||
seq_(1)
|
||||
seq_(1),
|
||||
safe_destroy_handler_(),
|
||||
stop_create_new_gc_task_(true)
|
||||
{
|
||||
}
|
||||
|
||||
@ -1042,6 +1044,8 @@ int ObGarbageCollector::init(ObLSService *ls_service,
|
||||
|| OB_ISNULL(sql_proxy) || !self_addr.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
CLOG_LOG(WARN, "invalid arguments", K(ret), KP(ls_service), KP(rpc_proxy), KP(sql_proxy), K(self_addr));
|
||||
} else if (OB_FAIL(safe_destroy_handler_.init())) {
|
||||
CLOG_LOG(WARN, "safe destroy handler init failed", K(ret));
|
||||
} else {
|
||||
ls_service_ = ls_service;
|
||||
rpc_proxy_ = rpc_proxy;
|
||||
@ -1063,8 +1067,11 @@ int ObGarbageCollector::start()
|
||||
CLOG_LOG(WARN, "ObGarbageCollector is not inited", K(ret));
|
||||
} else if (OB_FAIL(ObThreadPool::start())) {
|
||||
CLOG_LOG(ERROR, "ObGarbageCollector thread failed to start", K(ret));
|
||||
} else if (OB_FAIL(safe_destroy_handler_.start())) {
|
||||
CLOG_LOG(ERROR, "safe destroy handler failed to start", K(ret));
|
||||
} else {
|
||||
// do nothing
|
||||
stop_create_new_gc_task_ = false;
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -1072,12 +1079,19 @@ int ObGarbageCollector::start()
|
||||
|
||||
void ObGarbageCollector::stop()
|
||||
{
|
||||
ObThreadPool::stop();
|
||||
CLOG_LOG(INFO, "ObGarbageCollector stop");
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(safe_destroy_handler_.stop())) {
|
||||
CLOG_LOG(WARN, "safe destroy handler stop failed", K(ret));
|
||||
} else {
|
||||
stop_create_new_gc_task_ = true;
|
||||
CLOG_LOG(INFO, "ObGarbageCollector stop");
|
||||
}
|
||||
}
|
||||
|
||||
void ObGarbageCollector::wait()
|
||||
{
|
||||
safe_destroy_handler_.wait();
|
||||
ObThreadPool::stop();
|
||||
ObThreadPool::wait();
|
||||
CLOG_LOG(INFO, "ObGarbageCollector wait");
|
||||
}
|
||||
@ -1091,6 +1105,7 @@ void ObGarbageCollector::destroy()
|
||||
rpc_proxy_ = NULL;
|
||||
sql_proxy_ = NULL;
|
||||
self_addr_.reset();
|
||||
safe_destroy_handler_.destroy();
|
||||
}
|
||||
|
||||
void ObGarbageCollector::run1()
|
||||
@ -1099,17 +1114,21 @@ void ObGarbageCollector::run1()
|
||||
lib::set_thread_name("GCCollector");
|
||||
|
||||
while (!has_set_stop()) {
|
||||
ObGCCandidateArray gc_candidates;
|
||||
int64_t gc_interval = GC_INTERVAL;
|
||||
CLOG_LOG(INFO, "Garbage Collector is running", K(seq_), K(gc_interval));
|
||||
gc_candidates.reset();
|
||||
(void)gc_check_member_list_(gc_candidates);
|
||||
(void)execute_gc_(gc_candidates);
|
||||
gc_candidates.reset();
|
||||
(void)gc_check_ls_status_(gc_candidates);
|
||||
(void)execute_gc_(gc_candidates);
|
||||
ob_usleep(gc_interval);
|
||||
seq_++;
|
||||
if (!stop_create_new_gc_task_) {
|
||||
ObGCCandidateArray gc_candidates;
|
||||
int64_t gc_interval = GC_INTERVAL;
|
||||
CLOG_LOG(INFO, "Garbage Collector is running", K(seq_), K(gc_interval));
|
||||
gc_candidates.reset();
|
||||
(void)gc_check_member_list_(gc_candidates);
|
||||
(void)execute_gc_(gc_candidates);
|
||||
gc_candidates.reset();
|
||||
(void)gc_check_ls_status_(gc_candidates);
|
||||
(void)execute_gc_(gc_candidates);
|
||||
ob_usleep(gc_interval);
|
||||
seq_++;
|
||||
}
|
||||
// safe destroy task
|
||||
(void) safe_destroy_handler_.handle();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1131,6 +1150,15 @@ int ObGarbageCollector::get_ls_status_from_table(const ObLSID &ls_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObGarbageCollector::add_safe_destroy_task(ObSafeDestroyTask &task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(safe_destroy_handler_.push(task))) {
|
||||
CLOG_LOG(WARN, "failed to add safe destroy task", K(ret), K(task));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObGarbageCollector::is_ls_dropping_ls_status(const LSStatus &status)
|
||||
{
|
||||
return LSStatus::LS_DROPPING == status;
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
#include "share/ob_thread_pool.h"
|
||||
#include "share/ob_ls_id.h"
|
||||
#include "share/ls/ob_ls_status_operator.h"
|
||||
#include "storage/tx_storage/ob_safe_destroy_handler.h"
|
||||
#include "logservice/ob_log_base_header.h"
|
||||
#include "logservice/ob_append_callback.h"
|
||||
|
||||
@ -185,6 +186,12 @@ public:
|
||||
static bool is_tenant_dropping_ls_status(const LSStatus &status);
|
||||
int get_ls_status_from_table(const share::ObLSID &ls_id,
|
||||
share::ObLSStatus &ls_status);
|
||||
int add_safe_destroy_task(storage::ObSafeDestroyTask &task);
|
||||
template <typename Function>
|
||||
int safe_destroy_task_for_each(Function &fn)
|
||||
{
|
||||
return safe_destroy_handler_.for_each(fn);
|
||||
}
|
||||
private:
|
||||
bool is_valid_ls_status_(const LSStatus &status);
|
||||
bool is_need_gc_ls_status_(const LSStatus &status);
|
||||
@ -216,6 +223,9 @@ private:
|
||||
common::ObMySQLProxy *sql_proxy_;
|
||||
common::ObAddr self_addr_;
|
||||
int64_t seq_;
|
||||
storage::ObSafeDestroyHandler safe_destroy_handler_;
|
||||
// stop push task, but will process the left task.
|
||||
bool stop_create_new_gc_task_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObGarbageCollector);
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user