fix ret -4016 when restore meta with undefine tablet
This commit is contained in:
@ -36,13 +36,16 @@ ObArchiveSchedulerService::ObArchiveSchedulerService()
|
||||
{
|
||||
}
|
||||
|
||||
int ObArchiveSchedulerService::mtl_init(ObArchiveSchedulerService *&archive_service)
|
||||
int ObArchiveSchedulerService::init()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObMySQLProxy *sql_proxy = nullptr;
|
||||
obrpc::ObSrvRpcProxy *rpc_proxy = nullptr;
|
||||
share::schema::ObMultiVersionSchemaService *schema_service = nullptr;
|
||||
if (OB_ISNULL(sql_proxy = GCTX.sql_proxy_)) {
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("archive scheduler init twice", K(ret));
|
||||
} else if (OB_ISNULL(sql_proxy = GCTX.sql_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sql_proxy should not be NULL", K(ret), KP(sql_proxy));
|
||||
} else if (OB_ISNULL(rpc_proxy = GCTX.srv_rpc_proxy_)) {
|
||||
@ -51,27 +54,12 @@ int ObArchiveSchedulerService::mtl_init(ObArchiveSchedulerService *&archive_serv
|
||||
} else if (OB_ISNULL(schema_service = GCTX.schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema_service should not be NULL", K(ret), KP(schema_service));
|
||||
} else if (OB_FAIL(archive_service->init(*schema_service, *rpc_proxy, *sql_proxy))) {
|
||||
LOG_WARN("fail to init tenant archive service", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObArchiveSchedulerService::init(
|
||||
share::schema::ObMultiVersionSchemaService &schema_service,
|
||||
ObSrvRpcProxy &rpc_proxy,
|
||||
common::ObMySQLProxy &sql_proxy)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("archive scheduler init twice", K(ret));
|
||||
} else if (OB_FAIL(create("ArchiveSvr", *this, ObWaitEventIds::BACKUP_ARCHIVE_SERVICE_COND_WAIT))) {
|
||||
LOG_WARN("failed to create log archive thread", K(ret));
|
||||
} else {
|
||||
schema_service_ = &schema_service;
|
||||
rpc_proxy_ = &rpc_proxy;
|
||||
sql_proxy_ = &sql_proxy;
|
||||
schema_service_ = schema_service;
|
||||
rpc_proxy_ = rpc_proxy;
|
||||
sql_proxy_ = sql_proxy;
|
||||
tenant_id_ = gen_user_tenant_id(MTL_ID());
|
||||
is_inited_ = true;
|
||||
}
|
||||
|
||||
@ -43,13 +43,8 @@ public:
|
||||
const int64_t FAST_IDLE_INTERVAL_US = 10 * 1000 * 1000; // 10s, used during BEGINNING or STOPPING
|
||||
//const int64_t MAX_IDLE_INTERVAL_US = 60 * 1000 * 1000; // 60s
|
||||
const int64_t MAX_IDLE_INTERVAL_US = 10 * 1000 * 1000; // 60s
|
||||
static int mtl_init(ObArchiveSchedulerService *&archive_service);
|
||||
|
||||
int init(
|
||||
share::schema::ObMultiVersionSchemaService &schema_service,
|
||||
obrpc::ObSrvRpcProxy &rpc_proxy,
|
||||
common::ObMySQLProxy &sql_proxy);
|
||||
|
||||
DEFINE_MTL_FUNC(ObArchiveSchedulerService);
|
||||
int init();
|
||||
void run2() override;
|
||||
// force cancel archive
|
||||
int force_cancel(const uint64_t tenant_id);
|
||||
|
||||
@ -34,12 +34,12 @@ ObBackupBaseService::~ObBackupBaseService()
|
||||
void ObBackupBaseService::run1()
|
||||
{
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
lib::set_thread_name(thread_name_);
|
||||
LOG_INFO("ObBackupBaseService thread run", K(thread_name_));
|
||||
if (OB_UNLIKELY(!is_created_)) {
|
||||
tmp_ret = OB_NOT_INIT;
|
||||
LOG_WARN_RET(OB_NOT_INIT, "not init", K(tmp_ret));
|
||||
} else {
|
||||
lib::set_thread_name(thread_name_);
|
||||
ObRSThreadFlag rs_work;
|
||||
run2();
|
||||
}
|
||||
@ -222,3 +222,25 @@ int ObBackupBaseService::check_leader()
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObBackupBaseService::mtl_thread_stop()
|
||||
{
|
||||
LOG_INFO("[BACKUP_SERVICE] thread stop start", K(tg_id_), K(thread_name_));
|
||||
if (-1 != tg_id_) {
|
||||
TG_STOP(tg_id_);
|
||||
}
|
||||
LOG_INFO("[BACKUP_SERVICE] thread stop finish", K(tg_id_), K(thread_name_));
|
||||
}
|
||||
|
||||
void ObBackupBaseService::mtl_thread_wait()
|
||||
{
|
||||
LOG_INFO("[BACKUP_SERVICE] thread wait start", K(tg_id_), K(thread_name_));
|
||||
if (-1 != tg_id_) {
|
||||
{
|
||||
ObThreadCondGuard guard(thread_cond_);
|
||||
thread_cond_.broadcast();
|
||||
}
|
||||
TG_WAIT(tg_id_);
|
||||
}
|
||||
LOG_INFO("[BACKUP_SERVICE] thread wait finish", K(tg_id_), K(thread_name_));
|
||||
}
|
||||
@ -12,6 +12,7 @@
|
||||
#define OCEANBASE_ROOTSERVER_OB_BACKUP_BASE_SERVICE_H_
|
||||
|
||||
#include "lib/thread/thread_mgr_interface.h"
|
||||
#include "rootserver/ob_tenant_thread_helper.h"
|
||||
#include "lib/lock/ob_thread_cond.h"
|
||||
#include "logservice/ob_log_base_type.h"
|
||||
#include "storage/ls/ob_ls.h"
|
||||
@ -44,6 +45,8 @@ public:
|
||||
void wait();
|
||||
void idle();
|
||||
void wakeup();
|
||||
void mtl_thread_stop();
|
||||
void mtl_thread_wait();
|
||||
void set_idle_time(const int64_t interval_time_us) { interval_idle_time_us_ = interval_time_us; }
|
||||
int64_t get_idle_time() const { return interval_idle_time_us_; }
|
||||
// role change
|
||||
|
||||
@ -423,7 +423,7 @@ int ObBackupDataBaseTask::set_optional_servers_(const ObIArray<common::ObAddr> &
|
||||
for (int i = 0; OB_SUCC(ret) && i < replica_array.count(); ++i) {
|
||||
const ObLSReplica &replica = replica_array.at(i);
|
||||
if (replica.is_in_service() && !replica.is_strong_leader() && replica.is_valid() && !replica.is_in_restore()
|
||||
&& ObReplicaTypeCheck::is_full_replica(replica.get_replica_type()) // TODO(chongrong.th) 5.0 allow R replica backup later
|
||||
&& ObReplicaTypeCheck::is_full_replica(replica.get_replica_type()) // TODO(chongrong.th) 4.3 allow R replica backup later
|
||||
&& !check_replica_in_black_server_(replica, black_servers)) {
|
||||
ObBackupServer server;
|
||||
server.set(replica.get_server(), 0/*high priority*/);
|
||||
|
||||
@ -30,26 +30,41 @@ namespace rootserver
|
||||
*----------------------------- ObBackupService -----------------------------
|
||||
*/
|
||||
|
||||
int ObBackupService::init(
|
||||
common::ObMySQLProxy &sql_proxy,
|
||||
obrpc::ObSrvRpcProxy &rpc_proxy,
|
||||
schema::ObMultiVersionSchemaService &schema_service,
|
||||
share::ObLocationService &loacation_service,
|
||||
ObBackupTaskScheduler &task_scheduler)
|
||||
int ObBackupService::init()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t tenant_id = MTL_ID();
|
||||
common::ObMySQLProxy *sql_proxy = nullptr;
|
||||
obrpc::ObSrvRpcProxy *rpc_proxy = nullptr;
|
||||
share::schema::ObMultiVersionSchemaService *schema_service = nullptr;
|
||||
ObBackupTaskScheduler *backup_task_scheduler = nullptr;
|
||||
share::ObLocationService *location_service = nullptr;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("backup mgr already inited", K(ret));
|
||||
} else if (OB_FAIL(task_scheduler.register_backup_srv(*this))) {
|
||||
} else if (OB_ISNULL(sql_proxy = GCTX.sql_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sql_proxy should not be NULL", K(ret), KP(sql_proxy));
|
||||
} else if (OB_ISNULL(rpc_proxy = GCTX.srv_rpc_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("rpc_proxy should not be NULL", K(ret), KP(rpc_proxy));
|
||||
} else if (OB_ISNULL(schema_service = GCTX.schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema_service should not be NULL", K(ret), KP(schema_service));
|
||||
} else if (OB_ISNULL(backup_task_scheduler = MTL(ObBackupTaskScheduler *))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("backup_task_scheduler should not be NULL", K(ret), KP(backup_task_scheduler));
|
||||
} else if (OB_ISNULL(location_service = GCTX.location_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("location_service should not be NULL", K(ret), KP(backup_task_scheduler));
|
||||
} else if (OB_FAIL(backup_task_scheduler->register_backup_srv(*this))) {
|
||||
LOG_WARN("failed to register backup srv", K(ret));
|
||||
} else if (OB_FAIL(sub_init(sql_proxy, rpc_proxy, schema_service, loacation_service, task_scheduler))) {
|
||||
} else if (OB_FAIL(sub_init(*sql_proxy, *rpc_proxy, *schema_service, *location_service, *backup_task_scheduler))) {
|
||||
LOG_WARN("failed to do sub init", K(ret));
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
task_scheduler_ = &task_scheduler;
|
||||
schema_service_ = &schema_service;
|
||||
task_scheduler_ = backup_task_scheduler;
|
||||
schema_service_ = schema_service;
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
@ -69,7 +84,7 @@ void ObBackupService::run2()
|
||||
LOG_WARN("fail to get schema guard", KR(ret));
|
||||
} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id_, tenant_schema))) {
|
||||
LOG_WARN("failed to get schema ", KR(ret), K(tenant_id_));
|
||||
} else if (tenant_schema->is_normal()) {
|
||||
} else if (OB_NOT_NULL(tenant_schema) && tenant_schema->is_normal()) {
|
||||
if (can_schedule()) {
|
||||
process(last_trigger_ts);
|
||||
} else {
|
||||
@ -115,35 +130,6 @@ void ObBackupService::destroy()
|
||||
*----------------------------- ObBackupDataService -----------------------------
|
||||
*/
|
||||
|
||||
int ObBackupDataService::mtl_init(ObBackupDataService *&srv)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObMySQLProxy *sql_proxy = nullptr;
|
||||
obrpc::ObSrvRpcProxy *rpc_proxy = nullptr;
|
||||
share::schema::ObMultiVersionSchemaService *schema_service = nullptr;
|
||||
ObBackupTaskScheduler *backup_task_scheduler = nullptr;
|
||||
share::ObLocationService *location_service = nullptr;
|
||||
if (OB_ISNULL(sql_proxy = GCTX.sql_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sql_proxy should not be NULL", K(ret), KP(sql_proxy));
|
||||
} else if (OB_ISNULL(rpc_proxy = GCTX.srv_rpc_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("rpc_proxy should not be NULL", K(ret), KP(rpc_proxy));
|
||||
} else if (OB_ISNULL(schema_service = GCTX.schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema_service should not be NULL", K(ret), KP(schema_service));
|
||||
} else if (OB_ISNULL(backup_task_scheduler = MTL(ObBackupTaskScheduler *))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("backup_task_scheduler should not be NULL", K(ret), KP(backup_task_scheduler));
|
||||
} else if (OB_ISNULL(location_service = GCTX.location_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("location_service should not be NULL", K(ret), KP(backup_task_scheduler));
|
||||
} else if (OB_FAIL(srv->init(*sql_proxy, *rpc_proxy, *schema_service, *location_service, *backup_task_scheduler))) {
|
||||
LOG_WARN("fail to ini backup service");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBackupDataService::sub_init(
|
||||
common::ObMySQLProxy &sql_proxy,
|
||||
obrpc::ObSrvRpcProxy &rpc_proxy,
|
||||
@ -238,36 +224,6 @@ int ObBackupDataService::handle_backup_database_cancel(const uint64_t tenant_id,
|
||||
*----------------------------- ObBackupCleanService -----------------------------
|
||||
*/
|
||||
|
||||
|
||||
int ObBackupCleanService::mtl_init(ObBackupCleanService *&srv)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObMySQLProxy *sql_proxy = nullptr;
|
||||
obrpc::ObSrvRpcProxy *rpc_proxy = nullptr;
|
||||
share::schema::ObMultiVersionSchemaService *schema_service = nullptr;
|
||||
ObBackupTaskScheduler *backup_task_scheduler = nullptr;
|
||||
share::ObLocationService *location_service = nullptr;
|
||||
if (OB_ISNULL(sql_proxy = GCTX.sql_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sql_proxy should not be NULL", K(ret), KP(sql_proxy));
|
||||
} else if (OB_ISNULL(rpc_proxy = GCTX.srv_rpc_proxy_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("rpc_proxy should not be NULL", K(ret), KP(rpc_proxy));
|
||||
} else if (OB_ISNULL(schema_service = GCTX.schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema_service should not be NULL", K(ret), KP(schema_service));
|
||||
} else if (OB_ISNULL(backup_task_scheduler = MTL(ObBackupTaskScheduler *))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("backup_task_scheduler should not be NULL", K(ret), KP(backup_task_scheduler));
|
||||
} else if (OB_ISNULL(location_service = GCTX.location_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("location_service should not be NULL", K(ret), KP(backup_task_scheduler));
|
||||
} else if (OB_FAIL(srv->init(*sql_proxy, *rpc_proxy, *schema_service, *location_service, *backup_task_scheduler))) {
|
||||
LOG_WARN("fail to ini backup service");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBackupCleanService::sub_init(
|
||||
common::ObMySQLProxy &sql_proxy,
|
||||
obrpc::ObSrvRpcProxy &rpc_proxy,
|
||||
|
||||
@ -28,10 +28,7 @@ public:
|
||||
ObBackupService(): is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), can_schedule_(false), task_scheduler_(nullptr),
|
||||
schema_service_(nullptr) {}
|
||||
virtual ~ObBackupService() {};
|
||||
int init(common::ObMySQLProxy &sql_proxy, obrpc::ObSrvRpcProxy &rpc_proxy,
|
||||
share::schema::ObMultiVersionSchemaService &schema_service,
|
||||
share::ObLocationService &loacation_service,
|
||||
ObBackupTaskScheduler &task_scheduler);
|
||||
int init();
|
||||
void run2() override final;
|
||||
virtual int process(int64_t &last_schedule_ts) = 0;
|
||||
void destroy() override final;
|
||||
@ -71,7 +68,7 @@ class ObBackupDataService final : public ObBackupService
|
||||
public:
|
||||
ObBackupDataService(): ObBackupService(), backup_data_scheduler_() {}
|
||||
virtual ~ObBackupDataService() {}
|
||||
static int mtl_init(ObBackupDataService *&srv);
|
||||
DEFINE_MTL_FUNC(ObBackupDataService);
|
||||
int process(int64_t &last_schedule_ts) override;
|
||||
|
||||
ObIBackupJobScheduler *get_scheduler(const BackupJobType &type);
|
||||
@ -95,7 +92,7 @@ public:
|
||||
ObBackupCleanService() : ObBackupService(), backup_clean_scheduler_(), backup_auto_obsolete_delete_trigger_(),
|
||||
jobs_(), triggers_() {}
|
||||
virtual ~ObBackupCleanService() {}
|
||||
static int mtl_init(ObBackupCleanService *&srv);
|
||||
DEFINE_MTL_FUNC(ObBackupCleanService);
|
||||
int process(int64_t &last_schedule_ts) override;
|
||||
|
||||
ObIBackupJobScheduler *get_scheduler(const BackupJobType &type);
|
||||
|
||||
@ -1124,13 +1124,16 @@ ObBackupTaskScheduler::ObBackupTaskScheduler()
|
||||
{
|
||||
}
|
||||
|
||||
int ObBackupTaskScheduler::mtl_init(ObBackupTaskScheduler *&backup_task_scheduler)
|
||||
int ObBackupTaskScheduler::init()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObMySQLProxy *sql_proxy = GCTX.sql_proxy_;
|
||||
obrpc::ObSrvRpcProxy *rpc_proxy = GCTX.srv_rpc_proxy_;
|
||||
share::schema::ObMultiVersionSchemaService *schema_service = GCTX.schema_service_;
|
||||
if (OB_ISNULL(sql_proxy)) {
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("init twice", K(ret));
|
||||
} else if (OB_ISNULL(sql_proxy)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sql_proxy should not be NULL", K(ret), KP(sql_proxy));
|
||||
} else if (OB_ISNULL(rpc_proxy)) {
|
||||
@ -1139,21 +1142,6 @@ int ObBackupTaskScheduler::mtl_init(ObBackupTaskScheduler *&backup_task_schedule
|
||||
} else if (OB_ISNULL(schema_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("scheam service can not be NULL", K(ret), KP(schema_service));
|
||||
} else if (OB_FAIL(backup_task_scheduler->init(rpc_proxy, *sql_proxy, *schema_service))) {
|
||||
LOG_WARN("fail to init backup_task_scheduler", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBackupTaskScheduler::init(
|
||||
obrpc::ObSrvRpcProxy *rpc_proxy,
|
||||
common::ObMySQLProxy &sql_proxy,
|
||||
share::schema::ObMultiVersionSchemaService &schema_service)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("init twice", K(ret));
|
||||
} else if (OB_UNLIKELY(nullptr == rpc_proxy)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(rpc_proxy));
|
||||
@ -1162,11 +1150,11 @@ int ObBackupTaskScheduler::init(
|
||||
} else {
|
||||
tenant_id_ = MTL_ID();
|
||||
rpc_proxy_ = rpc_proxy;
|
||||
schema_service_ = &schema_service;
|
||||
if (OB_FAIL(queue_.init(MAX_BACKUP_TASK_QUEUE_LIMIT, rpc_proxy_, this, MAX_BACKUP_TASK_QUEUE_LIMIT, sql_proxy))) {
|
||||
schema_service_ = schema_service;
|
||||
if (OB_FAIL(queue_.init(MAX_BACKUP_TASK_QUEUE_LIMIT, rpc_proxy_, this, MAX_BACKUP_TASK_QUEUE_LIMIT, *sql_proxy))) {
|
||||
LOG_WARN("init rebalance task queue failed", K(ret), LITERAL_K(MAX_BACKUP_TASK_QUEUE_LIMIT));
|
||||
} else {
|
||||
sql_proxy_ = &sql_proxy;
|
||||
sql_proxy_ = sql_proxy;
|
||||
is_inited_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -142,10 +142,8 @@ public:
|
||||
public:
|
||||
ObBackupTaskScheduler();
|
||||
virtual ~ObBackupTaskScheduler() {}
|
||||
static int mtl_init(ObBackupTaskScheduler *&backup_task_scheduler);
|
||||
int init(obrpc::ObSrvRpcProxy *rpc_proxy,
|
||||
common::ObMySQLProxy &sql_proxy,
|
||||
share::schema::ObMultiVersionSchemaService &schema_service);
|
||||
DEFINE_MTL_FUNC(ObBackupTaskScheduler);
|
||||
int init();
|
||||
|
||||
virtual void run2() override final;
|
||||
virtual void destroy() override final;
|
||||
|
||||
Reference in New Issue
Block a user