remove log restore controller

This commit is contained in:
taoshuning
2023-08-02 07:42:25 +00:00
committed by ob-robot
parent 9aa4d8a313
commit f6c1852266
11 changed files with 8 additions and 276 deletions

View File

@ -179,7 +179,6 @@ ob_set_subtarget(ob_logservice restoreservice
restoreservice/ob_log_restore_allocator.cpp restoreservice/ob_log_restore_allocator.cpp
restoreservice/ob_remote_fetch_context.cpp restoreservice/ob_remote_fetch_context.cpp
restoreservice/ob_log_restore_scheduler.cpp restoreservice/ob_log_restore_scheduler.cpp
restoreservice/ob_log_restore_controller.cpp
restoreservice/ob_log_restore_driver_base.cpp restoreservice/ob_log_restore_driver_base.cpp
restoreservice/ob_log_restore_net_driver.cpp restoreservice/ob_log_restore_net_driver.cpp
restoreservice/ob_log_restore_archive_driver.cpp restoreservice/ob_log_restore_archive_driver.cpp

View File

@ -1,133 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX CLOG
#include <algorithm>
#include "lib/oblog/ob_log_module.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include "lib/atomic/ob_atomic.h"
#include "lib/time/ob_time_utility.h"
#include "share/ob_debug_sync.h" // DEBUG
#include "logservice/ob_log_service.h"
#include "ob_log_restore_controller.h"
namespace oceanbase
{
namespace logservice
{
ObLogRestoreController::ObLogRestoreController() :
inited_(false),
tenant_id_(OB_INVALID_TENANT_ID),
log_service_(NULL),
available_capacity_(0),
last_refresh_ts_(common::OB_INVALID_TIMESTAMP)
{}
ObLogRestoreController::~ObLogRestoreController()
{
destroy();
}
int ObLogRestoreController::init(const uint64_t tenant_id, ObLogService *log_service)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObLogRestoreController init twice", K(ret));
} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)
|| OB_ISNULL(log_service)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(tenant_id), K(log_service));
} else {
tenant_id_ = tenant_id;
log_service_ = log_service;
inited_ = true;
}
return ret;
}
void ObLogRestoreController::destroy()
{
inited_ = false;
tenant_id_ = OB_INVALID_TENANT_ID;
log_service_ = NULL;
available_capacity_ = 0;
last_refresh_ts_ = OB_INVALID_TIMESTAMP;
}
int ObLogRestoreController::get_quota(const int64_t size, bool &succ)
{
int ret = OB_SUCCESS;
succ = false;
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
} else if (size > ATOMIC_LOAD(&available_capacity_)) {
succ = false;
} else {
ATOMIC_SAF(&available_capacity_, size);
succ = true;
}
return ret;
}
int ObLogRestoreController::update_quota()
{
int ret = OB_SUCCESS;
int64_t used_size = 0;
int64_t total_size = 0;
palf::PalfOptions palf_opt;
auto get_palf_used_size_func = [&](const palf::PalfHandle &palf_handle) -> int {
int ret = OB_SUCCESS;
int64_t palf_id = -1;
palf::LSN base_lsn;
palf::LSN end_lsn;
palf_handle.get_palf_id(palf_id);
if (OB_FAIL(palf_handle.get_base_lsn(base_lsn))) {
CLOG_LOG(WARN, "get palf base_lsn failed", K(palf_id));
} else if (OB_FAIL(palf_handle.get_end_lsn(end_lsn))) {
CLOG_LOG(WARN, "get palf end_lsn failed", K(palf_id));
} else if (OB_UNLIKELY(base_lsn > end_lsn)) {
CLOG_LOG(TRACE, "base_lsn smaller than end_lsn, maybe rebuild happened", K(palf_id), K(base_lsn), K(end_lsn));
} else {
used_size += static_cast<int64_t>(end_lsn - base_lsn);
}
return ret;
};
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObLogRestoreController not init", K(tenant_id_));
} else if (! need_update_()) {
} else if (OB_FAIL(log_service_->get_palf_options(palf_opt))) {
LOG_WARN("get palf option failed", K(tenant_id_));
} else if (OB_FAIL(log_service_->iterate_palf(get_palf_used_size_func))) {
LOG_WARN("fail to get palf_options", K(tenant_id_));
} else {
total_size = palf_opt.disk_options_.log_disk_usage_limit_size_;
const int64_t capacity = std::max(total_size / 100 * palf_opt.disk_options_.log_disk_utilization_threshold_ - used_size, 0L);
ATOMIC_SET(&available_capacity_, capacity);
last_refresh_ts_ = common::ObTimeUtility::fast_current_time();
LOG_TRACE("update log restore quota succ", K(tenant_id_), K(used_size), K(total_size), K(capacity), K(last_refresh_ts_));
}
return ret;
}
bool ObLogRestoreController::need_update_() const
{
return common::ObTimeUtility::fast_current_time() - last_refresh_ts_ >= LOG_RESTORE_CONTROL_REFRESH_INTERVAL;
}
} // namespace logservice
} // namespace oceanbase

View File

@ -1,48 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LOGSERVICE_OB_LOG_RESTORE_CONTROLLER_H_
#define OCEANBASE_LOGSERVICE_OB_LOG_RESTORE_CONTROLLER_H_
#include "lib/utility/ob_macro_utils.h"
#include <cstdint>
namespace oceanbase
{
namespace logservice
{
class ObLogService;
// Only support single thread get and update log restore quota
class ObLogRestoreController
{
const int64_t LOG_RESTORE_CONTROL_REFRESH_INTERVAL = 1000 * 1000L; // 1s
public:
ObLogRestoreController();
~ObLogRestoreController();
public:
int init(const uint64_t tenant_id, ObLogService *log_service);
void destroy();
int update_quota();
int get_quota(const int64_t size, bool &succ);
private:
bool need_update_() const;
private:
bool inited_;
uint64_t tenant_id_;
ObLogService *log_service_;
int64_t available_capacity_;
int64_t last_refresh_ts_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogRestoreController);
};
} // namespace logservice
} // namespace oceanbase
#endif

View File

@ -22,7 +22,6 @@
#include "share/ob_ls_id.h" #include "share/ob_ls_id.h"
#include "share/rc/ob_tenant_base.h" #include "share/rc/ob_tenant_base.h"
#include "storage/tx_storage/ob_ls_service.h" // ObLSService #include "storage/tx_storage/ob_ls_service.h" // ObLSService
#include "ob_log_restore_controller.h" // ObLogRestoreController
#include "logservice/ob_log_service.h" // ObLogService #include "logservice/ob_log_service.h" // ObLogService
#include "share/restore/ob_log_restore_source.h" // ObLogRestoreSourceType #include "share/restore/ob_log_restore_source.h" // ObLogRestoreSourceType
#include "logservice/logfetcher/ob_log_fetcher.h" // ObLogFetcher #include "logservice/logfetcher/ob_log_fetcher.h" // ObLogFetcher
@ -71,18 +70,16 @@ ObLogRestoreNetDriver::~ObLogRestoreNetDriver()
} }
int ObLogRestoreNetDriver::init(const uint64_t tenant_id, int ObLogRestoreNetDriver::init(const uint64_t tenant_id,
ObLogRestoreController *controller,
ObLSService *ls_svr, ObLSService *ls_svr,
ObLogService *log_service) ObLogService *log_service)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_UNLIKELY(tenant_id == OB_INVALID_TENANT_ID if (OB_UNLIKELY(tenant_id == OB_INVALID_TENANT_ID
|| NULL == controller
|| NULL == ls_svr)) { || NULL == ls_svr)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(tenant_id), K(controller), K(ls_svr)); LOG_WARN("invalid argument", K(tenant_id), K(ls_svr));
} else if (OB_FAIL(restore_function_.init(tenant_id, controller, ls_svr))) { } else if (OB_FAIL(restore_function_.init(tenant_id, ls_svr))) {
LOG_WARN("restore_function_ init failed", K(tenant_id), K(controller), K(ls_svr)); LOG_WARN("restore_function_ init failed", K(tenant_id), K(ls_svr));
} else if (OB_FAIL(error_handler_.init(ls_svr))) { } else if (OB_FAIL(error_handler_.init(ls_svr))) {
LOG_WARN("error_handler_ init failed"); LOG_WARN("error_handler_ init failed");
} else if (OB_FAIL(cfg_.init())) { } else if (OB_FAIL(cfg_.init())) {

View File

@ -46,7 +46,6 @@ class ObLogFetcher;
namespace logservice namespace logservice
{ {
class ObLogRestoreController;
class ObLogService; class ObLogService;
// The driver for standby based on net service, and its functions includes: // The driver for standby based on net service, and its functions includes:
// 1. fetcher and proxy management; // 1. fetcher and proxy management;
@ -60,7 +59,6 @@ public:
~ObLogRestoreNetDriver(); ~ObLogRestoreNetDriver();
public: public:
int init(const uint64_t tenant_id, int init(const uint64_t tenant_id,
ObLogRestoreController *controller,
storage::ObLSService *ls_svr, storage::ObLSService *ls_svr,
ObLogService *log_service); ObLogService *log_service);
void destroy(); void destroy();

View File

@ -67,17 +67,15 @@ int ObLogRestoreService::init(rpc::frame::ObReqTransport *transport,
LOG_WARN("invalid argument", K(ret), K(transport), K(ls_svr), K(log_service)); LOG_WARN("invalid argument", K(ret), K(transport), K(ls_svr), K(log_service));
} else if (OB_FAIL(proxy_.init(transport))) { } else if (OB_FAIL(proxy_.init(transport))) {
LOG_WARN("proxy_ init failed", K(ret)); LOG_WARN("proxy_ init failed", K(ret));
} else if (OB_FAIL(restore_controller_.init(tenant_id, log_service))) {
LOG_WARN("restore_controller_ init failed");
} else if (OB_FAIL(location_adaptor_.init(tenant_id, ls_svr))) { } else if (OB_FAIL(location_adaptor_.init(tenant_id, ls_svr))) {
LOG_WARN("location_adaptor_ init failed", K(ret)); LOG_WARN("location_adaptor_ init failed", K(ret));
} else if (OB_FAIL(archive_driver_.init(tenant_id, ls_svr, log_service, &fetch_log_worker_))) { } else if (OB_FAIL(archive_driver_.init(tenant_id, ls_svr, log_service, &fetch_log_worker_))) {
LOG_WARN("archive_driver_ init failed"); LOG_WARN("archive_driver_ init failed");
} else if (OB_FAIL(net_driver_.init(tenant_id, &restore_controller_, ls_svr, log_service))) { } else if (OB_FAIL(net_driver_.init(tenant_id, ls_svr, log_service))) {
LOG_WARN("net_driver_ init failed"); LOG_WARN("net_driver_ init failed");
} else if (OB_FAIL(fetch_log_impl_.init(tenant_id, &archive_driver_, &net_driver_))) { } else if (OB_FAIL(fetch_log_impl_.init(tenant_id, &archive_driver_, &net_driver_))) {
LOG_WARN("fetch_log_impl_ init failed", K(ret)); LOG_WARN("fetch_log_impl_ init failed", K(ret));
} else if (OB_FAIL(fetch_log_worker_.init(tenant_id, &allocator_, &restore_controller_, this, ls_svr))) { } else if (OB_FAIL(fetch_log_worker_.init(tenant_id, &allocator_, this, ls_svr))) {
LOG_WARN("fetch_log_worker_ init failed", K(ret)); LOG_WARN("fetch_log_worker_ init failed", K(ret));
} else if (OB_FAIL(error_reporter_.init(tenant_id, ls_svr))) { } else if (OB_FAIL(error_reporter_.init(tenant_id, ls_svr))) {
LOG_WARN("error_reporter_ init failed", K(ret)); LOG_WARN("error_reporter_ init failed", K(ret));
@ -99,7 +97,6 @@ void ObLogRestoreService::destroy()
fetch_log_worker_.destroy(); fetch_log_worker_.destroy();
stop(); stop();
wait(); wait();
restore_controller_.destroy();
location_adaptor_.destroy(); location_adaptor_.destroy();
archive_driver_.destroy(); archive_driver_.destroy();
net_driver_.destroy(); net_driver_.destroy();
@ -183,8 +180,6 @@ void ObLogRestoreService::do_thread_task_()
share::ObLogRestoreSourceItem source; share::ObLogRestoreSourceItem source;
bool source_exist = false; bool source_exist = false;
update_restore_quota_();
if (OB_FAIL(update_upstream_(source, source_exist))) { if (OB_FAIL(update_upstream_(source, source_exist))) {
LOG_WARN("update_upstream_ failed"); LOG_WARN("update_upstream_ failed");
} else if (source_exist) { } else if (source_exist) {
@ -206,11 +201,6 @@ void ObLogRestoreService::do_thread_task_()
} }
} }
void ObLogRestoreService::update_restore_quota_()
{
(void)restore_controller_.update_quota();
}
int ObLogRestoreService::update_upstream_(share::ObLogRestoreSourceItem &source, bool &source_exist) int ObLogRestoreService::update_upstream_(share::ObLogRestoreSourceItem &source, bool &source_exist)
{ {
return location_adaptor_.update_upstream(source, source_exist); return location_adaptor_.update_upstream(source, source_exist);

View File

@ -23,7 +23,6 @@
#include "ob_remote_error_reporter.h" // ObRemoteErrorReporter #include "ob_remote_error_reporter.h" // ObRemoteErrorReporter
#include "ob_log_restore_allocator.h" // ObLogRestoreAllocator #include "ob_log_restore_allocator.h" // ObLogRestoreAllocator
#include "ob_log_restore_scheduler.h" // ObLogRestoreScheduler #include "ob_log_restore_scheduler.h" // ObLogRestoreScheduler
#include "ob_log_restore_controller.h" // ObLogRestoreController
#include "ob_log_restore_net_driver.h" // ObLogRestoreNetDriver #include "ob_log_restore_net_driver.h" // ObLogRestoreNetDriver
#include "ob_log_restore_archive_driver.h" // ObLogRestoreArchiveDriver #include "ob_log_restore_archive_driver.h" // ObLogRestoreArchiveDriver
@ -73,7 +72,6 @@ public:
private: private:
void run1(); void run1();
void do_thread_task_(); void do_thread_task_();
void update_restore_quota_();
int update_upstream_(share::ObLogRestoreSourceItem &source, bool &source_exist); int update_upstream_(share::ObLogRestoreSourceItem &source, bool &source_exist);
void schedule_fetch_log_(share::ObLogRestoreSourceItem &source); void schedule_fetch_log_(share::ObLogRestoreSourceItem &source);
void schedule_resource_(const share::ObLogRestoreSourceType &source_type); void schedule_resource_(const share::ObLogRestoreSourceType &source_type);
@ -90,7 +88,6 @@ private:
int64_t last_update_restore_upper_limit_ts_; int64_t last_update_restore_upper_limit_ts_;
ObLSService *ls_svr_; ObLSService *ls_svr_;
ObLogResSvrRpc proxy_; ObLogResSvrRpc proxy_;
ObLogRestoreController restore_controller_;
ObRemoteLocationAdaptor location_adaptor_; ObRemoteLocationAdaptor location_adaptor_;
ObLogRestoreArchiveDriver archive_driver_; ObLogRestoreArchiveDriver archive_driver_;
ObLogRestoreNetDriver net_driver_; ObLogRestoreNetDriver net_driver_;

View File

@ -28,7 +28,6 @@
#include "ob_fetch_log_task.h" // ObFetchLogTask #include "ob_fetch_log_task.h" // ObFetchLogTask
#include "ob_log_restore_handler.h" // ObLogRestoreHandler #include "ob_log_restore_handler.h" // ObLogRestoreHandler
#include "ob_log_restore_allocator.h" // ObLogRestoreAllocator #include "ob_log_restore_allocator.h" // ObLogRestoreAllocator
#include "ob_log_restore_controller.h"
#include "storage/tx_storage/ob_ls_handle.h" // ObLSHandle #include "storage/tx_storage/ob_ls_handle.h" // ObLSHandle
#include "logservice/archiveservice/ob_archive_define.h" // archive #include "logservice/archiveservice/ob_archive_define.h" // archive
#include "storage/tx_storage/ob_ls_map.h" // ObLSIterator #include "storage/tx_storage/ob_ls_map.h" // ObLSIterator
@ -60,7 +59,6 @@ using namespace share;
ObRemoteFetchWorker::ObRemoteFetchWorker() : ObRemoteFetchWorker::ObRemoteFetchWorker() :
inited_(false), inited_(false),
tenant_id_(OB_INVALID_TENANT_ID), tenant_id_(OB_INVALID_TENANT_ID),
restore_controller_(NULL),
restore_service_(NULL), restore_service_(NULL),
ls_svr_(NULL), ls_svr_(NULL),
task_queue_(), task_queue_(),
@ -76,7 +74,6 @@ ObRemoteFetchWorker::~ObRemoteFetchWorker()
int ObRemoteFetchWorker::init(const uint64_t tenant_id, int ObRemoteFetchWorker::init(const uint64_t tenant_id,
ObLogRestoreAllocator *allocator, ObLogRestoreAllocator *allocator,
ObLogRestoreController *restore_controller,
ObLogRestoreService *restore_service, ObLogRestoreService *restore_service,
ObLSService *ls_svr) ObLSService *ls_svr)
{ {
@ -88,12 +85,10 @@ int ObRemoteFetchWorker::init(const uint64_t tenant_id,
LOG_ERROR("ObRemoteFetchWorker has been initialized", K(ret)); LOG_ERROR("ObRemoteFetchWorker has been initialized", K(ret));
} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id) } else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)
|| OB_ISNULL(allocator) || OB_ISNULL(allocator)
|| OB_ISNULL(restore_controller)
|| OB_ISNULL(restore_service) || OB_ISNULL(restore_service)
|| OB_ISNULL(ls_svr)) { || OB_ISNULL(ls_svr)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(tenant_id), K(restore_controller), LOG_WARN("invalid argument", K(tenant_id), K(allocator), K(restore_service), K(ls_svr));
K(allocator), K(restore_service), K(ls_svr));
} else if (OB_FAIL(task_queue_.init(FETCH_LOG_TASK_LIMIT, "RFLTaskQueue", MTL_ID()))) { } else if (OB_FAIL(task_queue_.init(FETCH_LOG_TASK_LIMIT, "RFLTaskQueue", MTL_ID()))) {
LOG_WARN("task_queue_ init failed", K(ret)); LOG_WARN("task_queue_ init failed", K(ret));
} else if (OB_FAIL(log_ext_handler_.init())) { } else if (OB_FAIL(log_ext_handler_.init())) {
@ -101,7 +96,6 @@ int ObRemoteFetchWorker::init(const uint64_t tenant_id,
} else { } else {
tenant_id_ = tenant_id; tenant_id_ = tenant_id;
allocator_ = allocator; allocator_ = allocator;
restore_controller_ = restore_controller;
restore_service_ = restore_service; restore_service_ = restore_service;
ls_svr_ = ls_svr; ls_svr_ = ls_svr;
inited_ = true; inited_ = true;
@ -131,7 +125,6 @@ void ObRemoteFetchWorker::destroy()
restore_service_ = NULL; restore_service_ = NULL;
ls_svr_ = NULL; ls_svr_ = NULL;
allocator_ = NULL; allocator_ = NULL;
restore_controller_ = NULL;
log_ext_handler_.destroy(); log_ext_handler_.destroy();
inited_ = false; inited_ = false;
} }
@ -373,7 +366,6 @@ int ObRemoteFetchWorker::submit_entries_(ObFetchLogTask &task)
LSN lsn; LSN lsn;
const ObLSID id = task.id_; const ObLSID id = task.id_;
while (OB_SUCC(ret) && ! has_set_stop()) { while (OB_SUCC(ret) && ! has_set_stop()) {
bool quota_done = false;
if (OB_FAIL(task.iter_.next(entry, lsn, buf, size))) { if (OB_FAIL(task.iter_.next(entry, lsn, buf, size))) {
if (OB_ITER_END != ret) { if (OB_ITER_END != ret) {
LOG_WARN("ObRemoteLogIterator next failed", K(task)); LOG_WARN("ObRemoteLogIterator next failed", K(task));
@ -385,10 +377,6 @@ int ObRemoteFetchWorker::submit_entries_(ObFetchLogTask &task)
LOG_WARN("entry is invalid", K(entry), K(lsn), K(task)); LOG_WARN("entry is invalid", K(entry), K(lsn), K(task));
} else if (task.cur_lsn_ > lsn) { } else if (task.cur_lsn_ > lsn) {
LOG_INFO("repeated log, just skip", K(lsn), K(entry), K(task)); LOG_INFO("repeated log, just skip", K(lsn), K(entry), K(task));
} else if (OB_FAIL(wait_restore_quota_(entry.get_serialize_size(), quota_done))) {
LOG_WARN("wait restore quota failed", K(entry), K(task));
} else if (! quota_done) {
break;
} else if (OB_FAIL(submit_log_(id, task.proposal_id_, lsn, } else if (OB_FAIL(submit_log_(id, task.proposal_id_, lsn,
entry.get_scn(), buf, entry.get_serialize_size()))) { entry.get_scn(), buf, entry.get_serialize_size()))) {
LOG_WARN("submit log failed", K(buf), K(entry), K(lsn), K(task)); LOG_WARN("submit log failed", K(buf), K(entry), K(lsn), K(task));
@ -405,25 +393,6 @@ int ObRemoteFetchWorker::submit_entries_(ObFetchLogTask &task)
return ret; return ret;
} }
int ObRemoteFetchWorker::wait_restore_quota_(const int64_t size, bool &done)
{
int ret = OB_SUCCESS;
done = false;
while (OB_SUCC(ret) && ! done && ! has_set_stop()) {
if (OB_FAIL(restore_controller_->get_quota(size, done))) {
LOG_WARN("get quota failed");
} else if (! done) {
if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) {
LOG_INFO("clog disk is not enough, just wait", K(size));
} else {
LOG_TRACE("get quota succ", K(size));
}
usleep(100 * 1000L); // if get quota not done, sleep 100ms
}
}
return ret;
}
int ObRemoteFetchWorker::submit_log_(const ObLSID &id, int ObRemoteFetchWorker::submit_log_(const ObLSID &id,
const int64_t proposal_id, const int64_t proposal_id,
const LSN &lsn, const LSN &lsn,

View File

@ -45,7 +45,6 @@ class ObRemoteSourceGuard;
class ObRemoteLogParent; class ObRemoteLogParent;
class ObLogRestoreService; class ObLogRestoreService;
class ObLogRestoreAllocator; class ObLogRestoreAllocator;
class ObLogRestoreController;
using oceanbase::share::ObLSID; using oceanbase::share::ObLSID;
using oceanbase::palf::LSN; using oceanbase::palf::LSN;
// Remote fetch log worker // Remote fetch log worker
@ -57,7 +56,6 @@ public:
int init(const uint64_t tenant_id, int init(const uint64_t tenant_id,
ObLogRestoreAllocator *allocator, ObLogRestoreAllocator *allocator,
ObLogRestoreController *restore_controller,
ObLogRestoreService *restore_service, ObLogRestoreService *restore_service,
storage::ObLSService *ls_svr); storage::ObLSService *ls_svr);
void destroy(); void destroy();
@ -84,7 +82,6 @@ private:
int submit_entries_(ObFetchLogTask &task); int submit_entries_(ObFetchLogTask &task);
int submit_log_(const ObLSID &id, const int64_t proposal_id, const LSN &lsn, int submit_log_(const ObLSID &id, const int64_t proposal_id, const LSN &lsn,
const share::SCN &scn, const char *buf, const int64_t buf_size); const share::SCN &scn, const char *buf, const int64_t buf_size);
int wait_restore_quota_(const int64_t size, bool &done);
void mark_if_to_end_(ObFetchLogTask &task, const share::SCN &upper_limit_scn, const share::SCN &scn); void mark_if_to_end_(ObFetchLogTask &task, const share::SCN &upper_limit_scn, const share::SCN &scn);
int try_retire_(ObFetchLogTask *&task); int try_retire_(ObFetchLogTask *&task);
void try_update_location_info_(const ObFetchLogTask &task, ObRemoteLogGroupEntryIterator &iter); void try_update_location_info_(const ObFetchLogTask &task, ObRemoteLogGroupEntryIterator &iter);
@ -105,7 +102,6 @@ private:
private: private:
bool inited_; bool inited_;
uint64_t tenant_id_; uint64_t tenant_id_;
ObLogRestoreController *restore_controller_;
ObLogRestoreService *restore_service_; ObLogRestoreService *restore_service_;
storage::ObLSService *ls_svr_; storage::ObLSService *ls_svr_;
common::ObLightyQueue task_queue_; common::ObLightyQueue task_queue_;

View File

@ -23,7 +23,6 @@
#include "ob_log_restore_handler.h" // ObLogRestoreHandler #include "ob_log_restore_handler.h" // ObLogRestoreHandler
#include "storage/tx_storage/ob_ls_handle.h" // ObLSHandle #include "storage/tx_storage/ob_ls_handle.h" // ObLSHandle
#include "storage/tx_storage/ob_ls_service.h" // ObLSService #include "storage/tx_storage/ob_ls_service.h" // ObLSService
#include "ob_log_restore_controller.h" // ObLogRestoreController
namespace oceanbase namespace oceanbase
{ {
@ -60,7 +59,6 @@ void ObRestoreLogFunction::destroy()
} }
int ObRestoreLogFunction::init(const uint64_t tenant_id, int ObRestoreLogFunction::init(const uint64_t tenant_id,
ObLogRestoreController *controller,
storage::ObLSService *ls_svr) storage::ObLSService *ls_svr)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -68,13 +66,11 @@ int ObRestoreLogFunction::init(const uint64_t tenant_id,
ret = OB_INIT_TWICE; ret = OB_INIT_TWICE;
CLOG_LOG(WARN, "ObRestoreLogFunction init twice", K(inited_)); CLOG_LOG(WARN, "ObRestoreLogFunction init twice", K(inited_));
} else if (OB_UNLIKELY(tenant_id == OB_INVALID_TENANT_ID } else if (OB_UNLIKELY(tenant_id == OB_INVALID_TENANT_ID
|| NULL == controller
|| NULL == ls_svr)) { || NULL == ls_svr)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", K(tenant_id), K(controller), K(ls_svr)); CLOG_LOG(WARN, "invalid argument", K(tenant_id), K(ls_svr));
} else { } else {
tenant_id_ = tenant_id; tenant_id_ = tenant_id;
controller_ = controller;
ls_svr_ = ls_svr; ls_svr_ = ls_svr;
inited_ = true; inited_ = true;
} }
@ -85,7 +81,6 @@ void ObRestoreLogFunction::reset()
{ {
inited_ = false; inited_ = false;
tenant_id_ = OB_INVALID_TENANT_ID; tenant_id_ = OB_INVALID_TENANT_ID;
controller_ = NULL;
ls_svr_ = NULL; ls_svr_ = NULL;
} }
@ -103,7 +98,6 @@ int ObRestoreLogFunction::handle_group_entry(
{ {
UNUSED(tenant_id); UNUSED(tenant_id);
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool quota_done = false;
const int64_t size = group_entry.get_serialize_size(); const int64_t size = group_entry.get_serialize_size();
if (OB_UNLIKELY(!inited_)) { if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
@ -115,11 +109,6 @@ int ObRestoreLogFunction::handle_group_entry(
|| NULL == buffer)) { || NULL == buffer)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", K(id), K(proposal_id), K(group_start_lsn), K(group_entry), K(buffer)); CLOG_LOG(WARN, "invalid argument", K(id), K(proposal_id), K(group_start_lsn), K(group_entry), K(buffer));
} else if (OB_FAIL(wait_restore_quota_(size, quota_done, stop_flag))) {
LOG_WARN("wait restore quota failed", K(group_entry), K(group_start_lsn));
} else if (! quota_done) {
ret = OB_IN_STOP_STATE;
LOG_WARN("get quota failed", K(quota_done), K(group_entry), K(group_start_lsn));
} else if (OB_FAIL(process_(id, proposal_id, group_start_lsn, group_entry.get_scn(), } else if (OB_FAIL(process_(id, proposal_id, group_start_lsn, group_entry.get_scn(),
buffer, group_entry.get_serialize_size(), stop_flag))) { buffer, group_entry.get_serialize_size(), stop_flag))) {
CLOG_LOG(WARN, "process failed", K(id), K(group_start_lsn), K(group_entry), K(buffer)); CLOG_LOG(WARN, "process failed", K(id), K(group_start_lsn), K(group_entry), K(buffer));
@ -127,25 +116,6 @@ int ObRestoreLogFunction::handle_group_entry(
return ret; return ret;
} }
int ObRestoreLogFunction::wait_restore_quota_(const int64_t size, bool &done, volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
done = false;
while (OB_SUCC(ret) && !done && !stop_flag) {
if (OB_FAIL(controller_->get_quota(size, done))) {
LOG_WARN("get quota failed");
} else if (! done) {
if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) {
LOG_INFO("clog disk is not enough, just wait", K(size));
} else {
LOG_TRACE("get quota succ", K(size));
}
usleep(100 * 1000L); // if get quota not done, sleep 100ms
}
}
return ret;
}
int ObRestoreLogFunction::process_(const share::ObLSID &id, int ObRestoreLogFunction::process_(const share::ObLSID &id,
const int64_t proposal_id, const int64_t proposal_id,
const palf::LSN &lsn, const palf::LSN &lsn,

View File

@ -30,14 +30,13 @@ class ObLSService;
} }
namespace logservice namespace logservice
{ {
class ObLogRestoreController;
class ObRestoreLogFunction : public logfetcher::ILogFetcherHandler class ObRestoreLogFunction : public logfetcher::ILogFetcherHandler
{ {
public: public:
ObRestoreLogFunction(); ObRestoreLogFunction();
virtual ~ObRestoreLogFunction(); virtual ~ObRestoreLogFunction();
public: public:
int init(const uint64_t tenant_id, ObLogRestoreController *controller, storage::ObLSService *ls_svr); int init(const uint64_t tenant_id, storage::ObLSService *ls_svr);
void destroy(); void destroy();
void reset(); void reset();
@ -62,11 +61,9 @@ private:
const int64_t buf_size, const int64_t buf_size,
volatile bool &stop_flag); volatile bool &stop_flag);
int wait_restore_quota_(const int64_t size, bool &done, volatile bool &stop_flag);
private: private:
bool inited_; bool inited_;
uint64_t tenant_id_; uint64_t tenant_id_;
ObLogRestoreController *controller_;
storage::ObLSService *ls_svr_; storage::ObLSService *ls_svr_;
}; };
} // namespace logservice } // namespace logservice