[FEAT MERGE] server manager microservice refactoring
Co-authored-by: maosy <630014370@qq.com>
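
This merge removes the disaster-recovery task code's direct dependency on ObServerManager: server existence and liveness are now read through the all-server tracer (SVR_TRACER) and share::ObServerInfoInTable instead of server_mgr_ and share::ObServerStatus. Below is a minimal sketch of the lookup pattern the diff switches to; SVR_TRACER.get_server_info and the is_permanent_offline/is_alive accessors are taken from the diff, while the wrapper function itself is hypothetical:

// Sketch only: resolving a server's state without ObServerManager.
// check_dst_server_state is a hypothetical helper used for illustration.
int check_dst_server_state(const common::ObAddr &dst_server)
{
  int ret = OB_SUCCESS;
  share::ObServerInfoInTable server_info;
  if (OB_FAIL(SVR_TRACER.get_server_info(dst_server, server_info))) {
    // OB_ENTRY_NOT_EXIST now stands in for the old "server not exist" check
    LOG_WARN("fail to get server_info", KR(ret), K(dst_server));
  } else if (server_info.is_permanent_offline()) {
    // previously answered via server_mgr_->get_server_status() + ObServerStatus
  } else if (server_info.is_alive()) {
    // server is usable; proceed with the task
  }
  return ret;
}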
@@ -24,16 +24,15 @@
 #include "ob_disaster_recovery_task_executor.h"
 #include "rootserver/ob_root_balancer.h"
 #include "ob_rs_event_history_table_operator.h"
-#include "ob_server_manager.h"
 #include "share/ob_rpc_struct.h"
 #include "observer/ob_server_struct.h"
-#include "share/ob_server_status.h"
 #include "sql/executor/ob_executor_rpc_proxy.h"
 #include "rootserver/ob_disaster_recovery_task.h" // for ObDRTaskType
 #include "share/ob_share_util.h" // for ObShareUtil
 #include "lib/lock/ob_tc_rwlock.h" // for common::RWLock
 #include "rootserver/ob_disaster_recovery_task.h"
 #include "share/inner_table/ob_inner_table_schema_constants.h"
+#include "share/ob_all_server_tracer.h"
 
 namespace oceanbase
 {
@@ -49,7 +48,6 @@ ObDRTaskQueue::ObDRTaskQueue() : inited_(false),
                                  schedule_list_(),
                                  task_map_(),
                                  rpc_proxy_(nullptr),
-                                 server_mgr_(nullptr),
                                  priority_(ObDRTaskPriority::MAX_PRI)
 {
 }
@@ -101,7 +99,6 @@ int ObDRTaskQueue::init(
     common::ObServerConfig &config,
     const int64_t bucket_num,
     obrpc::ObSrvRpcProxy *rpc_proxy,
-    ObServerManager *server_mgr,
     ObDRTaskPriority priority)
 {
   int ret = OB_SUCCESS;
@@ -110,10 +107,9 @@ int ObDRTaskQueue::init(
     LOG_WARN("init twice", KR(ret));
   } else if (OB_UNLIKELY(bucket_num <= 0)
              || OB_ISNULL(rpc_proxy)
-             || OB_ISNULL(server_mgr)
              || (ObDRTaskPriority::LOW_PRI != priority && ObDRTaskPriority::HIGH_PRI != priority)) {
     ret = OB_INVALID_ARGUMENT;
-    LOG_WARN("invalid argument", KR(ret), K(bucket_num), KP(rpc_proxy), KP(server_mgr), K(priority));
+    LOG_WARN("invalid argument", KR(ret), K(bucket_num), KP(rpc_proxy), K(priority));
   } else if (OB_FAIL(task_map_.create(bucket_num, "DRTaskMap"))) {
     LOG_WARN("fail to create task map", KR(ret), K(bucket_num));
   } else if (OB_FAIL(task_alloc_.init(
@@ -123,7 +119,6 @@ int ObDRTaskQueue::init(
   } else {
     config_ = &config;
     rpc_proxy_ = rpc_proxy;
-    server_mgr_ = server_mgr;
     priority_ = priority;
     inited_ = true;
   }
@@ -414,32 +409,30 @@ int ObDRTaskQueue::check_task_need_cleaning_(
   // (3) rpc ls_check_dr_task_exist successfully told us task not exist
   // (4) task is timeout while any failure during whole procedure
   need_cleanning = false;
-  share::ObServerStatus server_status;
   Bool task_exist = false;
-  bool server_exist = true;
-
+  const ObAddr &dst_server = task.get_dst_server();
+  share::ObServerInfoInTable server_info;
   if (OB_UNLIKELY(!inited_)) {
     ret = OB_NOT_INIT;
     LOG_WARN("task queue not init", KR(ret));
-  } else if (OB_ISNULL(server_mgr_) || OB_ISNULL(rpc_proxy_)) {
+  } else if (OB_ISNULL(rpc_proxy_)) {
     ret = OB_ERR_UNEXPECTED;
-    LOG_WARN("some ptr is null", KR(ret), KP(server_mgr_), KP(rpc_proxy_));
-  } else if (OB_FAIL(server_mgr_->is_server_exist(task.get_dst_server(), server_exist))) {
-    LOG_WARN("fail to check is server exist", KR(ret), "server", task.get_dst_server());
-  } else if (!server_exist) {
-    // case 1. server not exist
-    FLOG_INFO("the reason to clean this task: server not exist", K(task));
-    need_cleanning = true;
-    ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_SERVER_NOT_EXIST;
-  } else if (OB_FAIL(server_mgr_->get_server_status(task.get_dst_server(), server_status))) {
-    // we only care about HeartBeatStatus in server_status
-    LOG_WARN("fail to get server status", KR(ret), "server", task.get_dst_server());
-  } else if (server_status.is_permanent_offline()) {
-    // case 2. server status is permanant offline
-    FLOG_INFO("the reason to clean this task: server permanent offline", K(task), K(server_status));
+    LOG_WARN("some ptr is null", KR(ret), KP(rpc_proxy_));
+  } else if (OB_FAIL(SVR_TRACER.get_server_info(dst_server, server_info))) {
+    LOG_WARN("fail to get server_info", KR(ret), "server", dst_server);
+    // case 1. server not exist
+    if (OB_ENTRY_NOT_EXIST == ret) {
+      ret = OB_SUCCESS;
+      FLOG_INFO("the reason to clean this task: server not exist", K(task));
+      need_cleanning = true;
+      ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_SERVER_NOT_EXIST;
+    }
+  } else if (server_info.is_permanent_offline()) {
+    // case 2. server is permanant offline
+    FLOG_INFO("the reason to clean this task: server permanent offline", K(task), K(server_info));
     need_cleanning = true;
     ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_SERVER_PERMANENT_OFFLINE;
-  } else if (server_status.is_alive()) {
+  } else if (server_info.is_alive()) {
     ObDRTaskExistArg arg;
     arg.task_id_ = task.get_task_id();
     arg.tenant_id_ = task.get_tenant_id();
@@ -454,12 +447,12 @@ int ObDRTaskQueue::check_task_need_cleaning_(
       need_cleanning = true;
       ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_TASK_NOT_RUNNING;
     }
-  } else if (server_status.is_temporary_offline()) {
+  } else if (server_info.is_temporary_offline()) {
     ret = OB_SERVER_NOT_ALIVE;
-    LOG_WARN("server status is not alive, task may be cleanned later", KR(ret), "server", task.get_dst_server(), K(server_status), K(task));
+    LOG_WARN("server status is not alive, task may be cleanned later", KR(ret), "server", task.get_dst_server(), K(server_info), K(task));
   } else {
     ret = OB_ERR_UNEXPECTED;
-    LOG_WARN("unexpected server status", KR(ret), "server", task.get_dst_server(), K(server_status), K(task));
+    LOG_WARN("unexpected server status", KR(ret), "server", task.get_dst_server(), K(server_info), K(task));
   }
 
   // case 4. task is timeout while any OB_FAIL occurs
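
Taken together, the two hunks above rewrite check_task_need_cleaning_ so that the four cleanup conditions listed in its leading comment are decided from SVR_TRACER data instead of ObServerManager. A condensed sketch of the new control flow, using only names that appear in the diff (not the verbatim function body):

// Condensed sketch of the new cleanup decision in check_task_need_cleaning_.
share::ObServerInfoInTable server_info;
if (OB_FAIL(SVR_TRACER.get_server_info(dst_server, server_info))) {
  if (OB_ENTRY_NOT_EXIST == ret) {                  // case 1: server no longer exists
    ret = OB_SUCCESS;
    need_cleanning = true;
    ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_SERVER_NOT_EXIST;
  }
} else if (server_info.is_permanent_offline()) {    // case 2: server permanently offline
  need_cleanning = true;
  ret_comment = ObDRTaskRetComment::CLEAN_TASK_DUE_TO_SERVER_PERMANENT_OFFLINE;
} else if (server_info.is_alive()) {                // case 3: ask the server whether the task still runs
  // rpc ls_check_dr_task_exist; clean up if it reports the task is gone
} else if (server_info.is_temporary_offline()) {
  ret = OB_SERVER_NOT_ALIVE;                        // not cleaned now, may be cleaned later
}
// case 4: any failure after the task has timed out also leads to cleanup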
@@ -608,7 +601,6 @@ int ObDRTaskMgr::init(
     const common::ObAddr &server,
     common::ObServerConfig &config,
     ObDRTaskExecutor &task_executor,
-    ObServerManager *server_mgr,
     obrpc::ObSrvRpcProxy *rpc_proxy,
     common::ObMySQLProxy *sql_proxy,
     share::schema::ObMultiVersionSchemaService *schema_service)
@@ -619,12 +611,11 @@ int ObDRTaskMgr::init(
     ret = OB_INIT_TWICE;
     LOG_WARN("init twice", KR(ret), K(inited_), K_(stopped));
   } else if (OB_UNLIKELY(!server.is_valid())
-             || OB_ISNULL(server_mgr)
              || OB_ISNULL(rpc_proxy)
              || OB_ISNULL(sql_proxy)
              || OB_ISNULL(schema_service)) {
     ret = OB_INVALID_ARGUMENT;
-    LOG_WARN("invalid argument", KR(ret), K(server), KP(server_mgr), KP(rpc_proxy),
+    LOG_WARN("invalid argument", KR(ret), K(server), KP(rpc_proxy),
              KP(sql_proxy), KP(schema_service));
   } else if (OB_FAIL(cond_.init(ObWaitEventIds::REBALANCE_TASK_MGR_COND_WAIT))) {
     LOG_WARN("fail to init cond", KR(ret));
@@ -634,15 +625,14 @@ int ObDRTaskMgr::init(
     config_ = &config;
     self_ = server;
     task_executor_ = &task_executor;
-    server_mgr_ = server_mgr;
     rpc_proxy_ = rpc_proxy;
     sql_proxy_ = sql_proxy;
     schema_service_ = schema_service;
     if (OB_FAIL(high_task_queue_.init(
-            config, TASK_QUEUE_LIMIT, rpc_proxy_, server_mgr_, ObDRTaskPriority::HIGH_PRI))) {
+            config, TASK_QUEUE_LIMIT, rpc_proxy_, ObDRTaskPriority::HIGH_PRI))) {
       LOG_WARN("fail to init high priority task queue", KR(ret));
     } else if (OB_FAIL(low_task_queue_.init(
-            config, TASK_QUEUE_LIMIT, rpc_proxy_, server_mgr_, ObDRTaskPriority::LOW_PRI))) {
+            config, TASK_QUEUE_LIMIT, rpc_proxy_, ObDRTaskPriority::LOW_PRI))) {
       LOG_WARN("fail to init low priority task queue", KR(ret));
     } else if (OB_FAIL(disaster_recovery_task_table_updater_.init(sql_proxy, this))) {
       LOG_WARN("fail to init a ObDRTaskTableUpdater", KR(ret));