replace ObBackIOAdapter with ObLogExternalStorageHandler in CDC and restore

This commit is contained in:
HaHaJeff 2023-07-18 14:47:53 +00:00 committed by ob-robot
parent a4ba36d613
commit 9a1e87378b
30 changed files with 388 additions and 169 deletions

View File

@ -31,7 +31,8 @@ ObCdcFetcher::ObCdcFetcher()
: is_inited_(false),
tenant_id_(OB_INVALID_TENANT_ID),
ls_service_(NULL),
large_buffer_pool_(NULL)
large_buffer_pool_(NULL),
log_ext_handler_(NULL)
{
}
@ -42,7 +43,8 @@ ObCdcFetcher::~ObCdcFetcher()
int ObCdcFetcher::init(const uint64_t tenant_id,
ObLSService *ls_service,
archive::LargeBufferPool *buffer_pool)
archive::LargeBufferPool *buffer_pool,
logservice::ObLogExternalStorageHandler *log_ext_handler)
{
int ret = OB_SUCCESS;
@ -50,7 +52,8 @@ int ObCdcFetcher::init(const uint64_t tenant_id,
ret = OB_INIT_TWICE;
LOG_WARN("inited twice", KR(ret));
} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)
|| OB_ISNULL(ls_service) || OB_ISNULL(buffer_pool)) {
|| OB_ISNULL(ls_service) || OB_ISNULL(buffer_pool)
|| OB_ISNULL(log_ext_handler)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_service), K(buffer_pool));
} else {
@ -58,6 +61,7 @@ int ObCdcFetcher::init(const uint64_t tenant_id,
tenant_id_ = tenant_id;
ls_service_ = ls_service;
large_buffer_pool_ = buffer_pool;
log_ext_handler_ = log_ext_handler;
}
return ret;
@ -70,6 +74,7 @@ void ObCdcFetcher::destroy()
tenant_id_ = OB_INVALID_TENANT_ID;
ls_service_ = NULL;
large_buffer_pool_ = NULL;
log_ext_handler_ = NULL;
}
}
@ -352,7 +357,8 @@ int ObCdcFetcher::fetch_log_in_archive_(
if (OB_FAIL(pre_scn.convert_from_ts(ctx.get_progress()/1000L))) {
LOG_WARN("convert progress to scn failed", KR(ret), K(ctx));
} else if (need_init_iter && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn,
start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_))) {
start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_,
log_ext_handler_))) {
LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id));
} else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) {
// expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned.

View File

@ -27,6 +27,10 @@
namespace oceanbase
{
namespace logservice
{
class ObLogExternalStorageHandler;
}
namespace cdc
{
using oceanbase::storage::ObLSService;
@ -48,7 +52,8 @@ public:
~ObCdcFetcher();
int init(const uint64_t tenant_id,
ObLSService *ls_service,
archive::LargeBufferPool *buffer_pool);
archive::LargeBufferPool *buffer_pool,
logservice::ObLogExternalStorageHandler *log_ext_handler);
void destroy();
public:
@ -201,6 +206,7 @@ private:
uint64_t tenant_id_;
ObLSService *ls_service_;
archive::LargeBufferPool *large_buffer_pool_;
logservice::ObLogExternalStorageHandler *log_ext_handler_;
};
// Some parameters and status during Fetch execution

View File

@ -50,7 +50,8 @@ ObCdcService::ObCdcService()
dest_info_(),
dest_info_lock_(),
ls_ctx_map_(),
large_buffer_pool_()
large_buffer_pool_(),
log_ext_handler_()
{
}
@ -71,9 +72,11 @@ int ObCdcService::init(const uint64_t tenant_id,
EXTLOG_LOG(WARN, "ls ctx map init failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(large_buffer_pool_.init("CDCService", 1L * 1024 * 1024 * 1024))) {
EXTLOG_LOG(WARN, "large buffer pool init failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(locator_.init(tenant_id, &large_buffer_pool_))) {
} else if (OB_FAIL(log_ext_handler_.init())) {
EXTLOG_LOG(WARN, "log ext handler init failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(locator_.init(tenant_id, &large_buffer_pool_, &log_ext_handler_))) {
EXTLOG_LOG(WARN, "ObCdcStartLsnLocator init failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(fetcher_.init(tenant_id, ls_service, &large_buffer_pool_))) {
} else if (OB_FAIL(fetcher_.init(tenant_id, ls_service, &large_buffer_pool_, &log_ext_handler_))) {
EXTLOG_LOG(WARN, "ObCdcFetcher init failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(create_tenant_tg_(tenant_id))) {
EXTLOG_LOG(WARN, "cdc thread group create failed", KR(ret), K(tenant_id));
@ -131,7 +134,6 @@ void ObCdcService::run1()
large_buffer_pool_.weed_out();
last_purge_ts = current_ts;
}
ob_usleep(static_cast<uint32_t>(BASE_INTERVAL));
}
}
@ -144,6 +146,9 @@ int ObCdcService::start()
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
EXTLOG_LOG(WARN, "ObCdcService not init", K(ret));
// TODO by wenyue.zxl: change the concurrency of 'log_ext_handler_'(see resize interface)
} else if (OB_FAIL(log_ext_handler_.start(0))) {
EXTLOG_LOG(WARN, "log ext handler start failed", K(ret));
} else if (OB_FAIL(start_tenant_tg_(MTL_ID()))) {
EXTLOG_LOG(ERROR, "start CDCService failed", KR(ret));
} else {
@ -157,11 +162,13 @@ void ObCdcService::stop()
{
ATOMIC_STORE(&stop_flag_, true);
stop_tenant_tg_(MTL_ID());
log_ext_handler_.stop();
}
void ObCdcService::wait()
{
wait_tenant_tg_(MTL_ID());
log_ext_handler_.wait();
// do nothing
}
@ -175,6 +182,7 @@ void ObCdcService::destroy()
dest_info_.reset();
large_buffer_pool_.destroy();
ls_ctx_map_.destroy();
log_ext_handler_.destroy();
}
int ObCdcService::req_start_lsn_by_ts_ns(const obrpc::ObCdcReqStartLSNByTsReq &req,

View File

@ -18,6 +18,7 @@
#include "ob_cdc_start_lsn_locator.h"
#include "ob_cdc_struct.h" // ClientLSKey, ClientLSCtx, ClientLSCtxMap
#include "logservice/restoreservice/ob_remote_log_iterator.h"
#include "logservice/ob_log_external_storage_handler.h" // ObLogExternalStorageHandler
namespace oceanbase
{
@ -119,6 +120,7 @@ private:
common::ObSpinLock dest_info_lock_;
ClientLSCtxMap ls_ctx_map_;
archive::LargeBufferPool large_buffer_pool_;
logservice::ObLogExternalStorageHandler log_ext_handler_;
};
} // namespace cdc

View File

@ -24,7 +24,8 @@ namespace cdc
ObCdcStartLsnLocator::ObCdcStartLsnLocator()
: is_inited_(false),
tenant_id_(OB_INVALID_TENANT_ID),
large_buffer_pool_(NULL)
large_buffer_pool_(NULL),
log_ext_handler_(NULL)
{
}
@ -34,7 +35,8 @@ ObCdcStartLsnLocator::~ObCdcStartLsnLocator()
}
int ObCdcStartLsnLocator::init(const uint64_t tenant_id,
archive::LargeBufferPool *buffer_pool)
archive::LargeBufferPool *buffer_pool,
logservice::ObLogExternalStorageHandler *log_ext_handler)
{
int ret = OB_SUCCESS;
@ -48,6 +50,7 @@ int ObCdcStartLsnLocator::init(const uint64_t tenant_id,
is_inited_ = true;
tenant_id_ = tenant_id;
large_buffer_pool_ = buffer_pool;
log_ext_handler_ = log_ext_handler;
}
return ret;
@ -59,6 +62,7 @@ void ObCdcStartLsnLocator::destroy()
is_inited_ = false;
tenant_id_ = OB_INVALID_TENANT_ID;
large_buffer_pool_ = NULL;
log_ext_handler_ = NULL;
}
}
@ -252,7 +256,7 @@ int ObCdcStartLsnLocator::do_locate_ls_(const bool fetch_archive_only,
LSN lsn;
if (OB_FAIL(remote_group_iter.init(tenant_id_, ls_id, start_scn,
result_lsn, LSN(palf::LOG_MAX_LSN_VAL), large_buffer_pool_))) {
result_lsn, LSN(palf::LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_))) {
LOG_WARN("init remote group iter failed when retriving log group entry in start lsn locator", KR(ret), K(ls_id), K(tenant_id_));
} else if (OB_FAIL(remote_group_iter.next(log_group_entry, lsn, next_buf, next_buf_size))) {
LOG_WARN("iterate through archive log failed", KR(ret), K(ls_id), K(tenant_id_));

View File

@ -21,6 +21,10 @@
#include "logservice/archiveservice/large_buffer_pool.h" // LargeBufferPool
namespace oceanbase
{
namespace logservice
{
class ObLogExternalStorageHandler;
}
namespace cdc
{
using oceanbase::share::ObLSID;
@ -35,7 +39,8 @@ public:
ObCdcStartLsnLocator();
~ObCdcStartLsnLocator();
int init(const uint64_t tenant_id,
archive::LargeBufferPool *large_buffer_pool);
archive::LargeBufferPool *large_buffer_pool,
logservice::ObLogExternalStorageHandler *log_ext_handler);
void destroy();
int req_start_lsn_by_ts_ns(const ObLocateLSNByTsReq &req_msg,
ObLocateLSNByTsResp &result,
@ -61,6 +66,7 @@ private:
bool is_inited_;
uint64_t tenant_id_;
archive::LargeBufferPool *large_buffer_pool_;
logservice::ObLogExternalStorageHandler *log_ext_handler_;
};
} // namespace cdc

View File

@ -45,6 +45,7 @@ ObLogFetcher::ObLogFetcher() :
fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN),
archive_dest_(),
large_buffer_pool_(),
log_ext_handler_(),
task_pool_(NULL),
sys_ls_handler_(NULL),
err_handler_(NULL),
@ -139,6 +140,8 @@ int ObLogFetcher::init(
LOG_ERROR("init part trans resolver factory fail", KR(ret));
} else if (large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024)) {
LOG_ERROR("init large buffer pool failed", KR(ret));
} else if (log_ext_handler_.init()) {
LOG_ERROR("init log ext handler failed", KR(ret));
} else if (OB_FAIL(ls_fetch_mgr_.init(max_cached_ls_fetch_ctx_count,
progress_controller_,
part_trans_resolver_factory_,
@ -243,6 +246,8 @@ void ObLogFetcher::destroy()
}
// Finally reset fetching_mode_ because of some processing dependencies, such as ObLogRouteService
fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN;
log_ext_handler_.wait();
log_ext_handler_.destroy();
LOG_INFO("destroy fetcher succ");
}
@ -263,7 +268,10 @@ int ObLogFetcher::start()
} else {
stop_flag_ = false;
if (is_integrated_fetching_mode(fetching_mode_) && OB_FAIL(log_route_service_.start())) {
// TODO by wenyue.zxl: change the concurrency of 'log_ext_handler_'(see resize interface)
if (OB_FAIL(log_ext_handler_.start(0))) {
LOG_ERROR("start ObLogExternalStorageHandler fail", KR(ret));
} else if (is_integrated_fetching_mode(fetching_mode_) && OB_FAIL(log_route_service_.start())) {
LOG_ERROR("start LogRouterService fail", KR(ret));
} else if (OB_FAIL(start_lsn_locator_.start())) {
LOG_ERROR("start 'start_lsn_locator' fail", KR(ret));
@ -318,6 +326,7 @@ void ObLogFetcher::stop()
if (is_integrated_fetching_mode(fetching_mode_)) {
log_route_service_.stop();
}
log_ext_handler_.stop();
LOG_INFO("stop fetcher succ");
}
@ -604,6 +613,19 @@ int ObLogFetcher::get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_
return ret;
}
int ObLogFetcher::get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler)
{
int ret = OB_SUCCESS;
if(IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("fetcher not inited, could not get log ext handler", KR(ret), K_(is_inited));
} else {
log_ext_handler = &log_ext_handler_;
}
return ret;
}
int ObLogFetcher::check_progress(
const uint64_t tenant_id,
const int64_t timestamp,

View File

@ -76,6 +76,8 @@ public:
virtual int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool) = 0;
virtual int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler) = 0;
// Checks if the sys progress of specified tenant exceeds the timestamp
// For LogMetaDataService:
// 1. At startup time, it need to build the baseline data for the startup timestamp,
@ -168,6 +170,8 @@ public:
virtual int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool);
virtual int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler);
virtual int check_progress(
const uint64_t tenant_id,
const int64_t timestamp,
@ -242,6 +246,7 @@ private:
ClientFetchingMode fetching_mode_;
ObBackupPathString archive_dest_;
archive::LargeBufferPool large_buffer_pool_;
logservice::ObLogExternalStorageHandler log_ext_handler_;
TaskPool *task_pool_;
IObLogSysLsTaskHandler *sys_ls_handler_;
IObLogErrHandler *err_handler_;

View File

@ -248,6 +248,7 @@ int LSFetchCtx::init_remote_iter()
const LSN &start_lsn = progress_.get_next_lsn();
const int64_t cur_log_progress = progress_.get_progress();
archive::LargeBufferPool *large_buffer_pool = NULL;
logservice::ObLogExternalStorageHandler *log_ext_handler = NULL;
SCN start_scn;
if (remote_iter_.is_init()) {
@ -257,8 +258,10 @@ int LSFetchCtx::init_remote_iter()
LOG_ERROR("convert log progress to start scn failed", KR(ret), K(cur_log_progress));
} else if (OB_FAIL(get_large_buffer_pool(large_buffer_pool))) {
LOG_ERROR("get large buffer pool failed", KR(ret));
} else if (OB_FAIL(get_log_ext_handler(log_ext_handler))) {
LOG_ERROR("get log ext handler failed", KR(ret));
} else if (OB_FAIL(remote_iter_.init(tenant_id, ls_id, start_scn, start_lsn,
LSN(LOG_MAX_LSN_VAL), large_buffer_pool))) {
LSN(LOG_MAX_LSN_VAL), large_buffer_pool, log_ext_handler))) {
LOG_ERROR("remote iter init failed", KR(ret), K(tenant_id), K(ls_id), K(start_scn), K(start_lsn));
}
return ret;
@ -681,6 +684,22 @@ int LSFetchCtx::get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_po
return ret;
}
int LSFetchCtx::get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler)
{
int ret = OB_SUCCESS;
IObLogFetcher *fetcher = static_cast<IObLogFetcher *>(ls_fetch_mgr_->get_fetcher_host());
if (OB_ISNULL(fetcher)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("fetcher is nullptr", KR(ret), K(fetcher));
} else if (OB_FAIL(fetcher->get_log_ext_handler(log_ext_handler))) {
LOG_ERROR("Fetcher get_log_ext_handler fail", KR(ret));
} else if (OB_ISNULL(log_ext_handler)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("log_ext_handler is nullptr", KR(ret), K(log_ext_handler));
}
return ret;
}
bool LSFetchCtx::need_update_svr_list()
{
int ret = OB_SUCCESS;

View File

@ -181,6 +181,7 @@ public:
int locate_end_lsn(IObLogStartLSNLocator &start_lsn_locator);
int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool);
int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler);
ObRemoteLogParent *get_archive_source() { return source_; }
int init_remote_iter();

View File

@ -1447,6 +1447,7 @@ int FetchStream::fetch_miss_log_direct_(
const int64_t tenant_id = ls_fetch_ctx.get_tls_id().get_tenant_id();
const ObLSID &ls_id = ls_fetch_ctx.get_tls_id().get_ls_id();
archive::LargeBufferPool *buffer_pool = NULL;
logservice::ObLogExternalStorageHandler *log_ext_handler = NULL;
ObRpcResultCode rcode;
SCN cur_scn;
const int64_t start_fetch_ts = get_timestamp();
@ -1460,6 +1461,8 @@ int FetchStream::fetch_miss_log_direct_(
LOG_ERROR("convert log progress to scn failed", KR(ret), K(current_progress));
} else if (OB_FAIL(ls_fetch_ctx.get_large_buffer_pool(buffer_pool))) {
LOG_ERROR("get large buffer pool when fetching missing log failed", KR(ret), K(ls_fetch_ctx));
} else if (OB_FAIL(ls_fetch_ctx.get_log_ext_handler(log_ext_handler))) {
LOG_ERROR("get log ext handler when fetching missing log failed", KR(ret), K(ls_fetch_ctx));
} else {
int64_t fetched_cnt = 0;
const int64_t arr_cnt = miss_log_array.count();
@ -1480,7 +1483,7 @@ int FetchStream::fetch_miss_log_direct_(
if (get_timestamp() > time_upper_limit) {
is_timeout = true;
} else if (OB_FAIL(entry_iter.init(tenant_id, ls_id, cur_scn, missing_lsn,
LSN(palf::LOG_MAX_LSN_VAL), buffer_pool))) {
LSN(palf::LOG_MAX_LSN_VAL), buffer_pool, log_ext_handler))) {
LOG_WARN("remote entry iter init failed", KR(ret));
} else if (OB_FAIL(entry_iter.next(log_entry, lsn, buf, buf_size))) {
retry_on_err =true;

View File

@ -46,6 +46,7 @@ ObLogFetcher::ObLogFetcher() :
fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN),
archive_dest_(),
large_buffer_pool_(),
log_ext_handler_(),
ls_ctx_add_info_factory_(NULL),
err_handler_(NULL),
ls_fetch_mgr_(),
@ -128,6 +129,8 @@ int ObLogFetcher::init(
LOG_ERROR("init progress controller fail", KR(ret));
} else if (OB_FAIL(large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024))) {
LOG_ERROR("init large buffer pool failed", KR(ret));
} else if (OB_FAIL(log_ext_handler_.init())) {
LOG_ERROR("init failed", KR(ret));
} else if (OB_FAIL(ls_fetch_mgr_.init(
progress_controller_,
ls_ctx_factory,
@ -232,6 +235,8 @@ void ObLogFetcher::destroy()
log_route_service_.wait();
log_route_service_.destroy();
}
log_ext_handler_.wait();
log_ext_handler_.destroy();
// Finally reset fetching_mode_ because of some processing dependencies, such as ObLogRouteService
fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN;
log_fetcher_user_ = LogFetcherUser::UNKNOWN;
@ -264,6 +269,9 @@ int ObLogFetcher::start()
LOG_ERROR("start dead pool fail", KR(ret));
} else if (OB_FAIL(stream_worker_.start())) {
LOG_ERROR("start stream worker fail", KR(ret));
// TODO by wenyue.zxl: change the concurrency of 'log_ext_handler_'(see resize interface)
} else if (OB_FAIL(log_ext_handler_.start(0))) {
LOG_ERROR("start log external handler failed", KR(ret));
} else {
LOG_INFO("LogFetcher start success");
}
@ -288,6 +296,7 @@ void ObLogFetcher::stop()
if (is_integrated_fetching_mode(fetching_mode_)) {
log_route_service_.stop();
}
log_ext_handler_.stop();
LOG_INFO("LogFetcher stop success");
}
@ -711,6 +720,20 @@ int ObLogFetcher::get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_
return ret;
}
int ObLogFetcher::get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_hander)
{
int ret = OB_SUCCESS;
if(IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("LogFetcher is not inited, could not get log ext handler", KR(ret), K_(is_inited));
} else {
log_ext_hander = &log_ext_handler_;
}
return ret;
}
int ObLogFetcher::get_fetcher_config(const ObLogFetcherConfig *&cfg)
{
int ret = OB_SUCCESS;

View File

@ -35,6 +35,7 @@
#include "ob_log_progress_info.h" // ProgressInfo
#include "ob_log_fetcher_start_parameters.h" // ObLogFetcherStartParameters
#include "ob_log_fetcher_err_handler.h" // IObLogErrHandler
#include "logservice/ob_log_external_storage_handler.h" // ObLogExternalStorageHandler
namespace oceanbase
{
@ -132,6 +133,8 @@ public:
virtual int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool) = 0;
virtual int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler) = 0;
virtual int get_fetcher_config(const ObLogFetcherConfig *&cfg) = 0;
// Checks if the sys progress of specified tenant exceeds the timestamp
@ -245,6 +248,8 @@ public:
virtual int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool);
virtual int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler);
virtual int get_fetcher_config(const ObLogFetcherConfig *&cfg);
virtual int check_progress(
@ -275,39 +280,40 @@ private:
};
private:
bool is_inited_;
LogFetcherUser log_fetcher_user_;
int64_t cluster_id_;
uint64_t source_tenant_id_;
uint64_t self_tenant_id_;
const ObLogFetcherConfig *cfg_;
bool is_inited_;
LogFetcherUser log_fetcher_user_;
int64_t cluster_id_;
uint64_t source_tenant_id_;
uint64_t self_tenant_id_;
const ObLogFetcherConfig *cfg_;
bool is_loading_data_dict_baseline_data_;
ClientFetchingMode fetching_mode_;
ObBackupPathString archive_dest_;
archive::LargeBufferPool large_buffer_pool_;
ObILogFetcherLSCtxAddInfoFactory *ls_ctx_add_info_factory_;
IObLogErrHandler *err_handler_;
bool is_loading_data_dict_baseline_data_;
ClientFetchingMode fetching_mode_;
ObBackupPathString archive_dest_;
archive::LargeBufferPool large_buffer_pool_;
logservice::ObLogExternalStorageHandler log_ext_handler_;
ObILogFetcherLSCtxAddInfoFactory *ls_ctx_add_info_factory_;
IObLogErrHandler *err_handler_;
ObLogLSFetchMgr ls_fetch_mgr_; // Fetch Log Task Manager
PartProgressController progress_controller_; // Process Controller
ObLogLSFetchMgr ls_fetch_mgr_; // Fetch Log Task Manager
PartProgressController progress_controller_; // Process Controller
// Function Modules
ObLogRpc rpc_;
logservice::ObLogRouteService log_route_service_;
ObLogStartLSNLocator start_lsn_locator_;
ObLogFetcherIdlePool idle_pool_;
ObLogFetcherDeadPool dead_pool_;
ObLSWorker stream_worker_;
ObLogRpc rpc_;
logservice::ObLogRouteService log_route_service_;
ObLogStartLSNLocator start_lsn_locator_;
ObLogFetcherIdlePool idle_pool_;
ObLogFetcherDeadPool dead_pool_;
ObLSWorker stream_worker_;
// TODO
ObFsContainerMgr fs_container_mgr_;
ObFsContainerMgr fs_container_mgr_;
volatile bool stop_flag_ CACHE_ALIGNED;
volatile bool stop_flag_ CACHE_ALIGNED;
// stop flag
bool paused_ CACHE_ALIGNED;
int64_t pause_time_ CACHE_ALIGNED;
int64_t resume_time_ CACHE_ALIGNED;
bool paused_ CACHE_ALIGNED;
int64_t pause_time_ CACHE_ALIGNED;
int64_t resume_time_ CACHE_ALIGNED;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogFetcher);

View File

@ -244,6 +244,7 @@ int LSFetchCtx::init_remote_iter()
const LSN &start_lsn = progress_.get_next_lsn();
const int64_t cur_log_progress = progress_.get_progress();
archive::LargeBufferPool *large_buffer_pool = NULL;
logservice::ObLogExternalStorageHandler *log_ext_handler = NULL;
SCN start_scn;
if (remote_iter_.is_init()) {
@ -253,8 +254,10 @@ int LSFetchCtx::init_remote_iter()
LOG_ERROR("convert log progress to start scn failed", KR(ret), K(cur_log_progress));
} else if (OB_FAIL(get_large_buffer_pool(large_buffer_pool))) {
LOG_ERROR("get large buffer pool failed", KR(ret));
} else if (OB_FAIL(get_log_ext_handler(log_ext_handler))) {
LOG_ERROR("get log external handler failed", KR(ret));
} else if (OB_FAIL(remote_iter_.init(tenant_id, ls_id, start_scn, start_lsn,
LSN(LOG_MAX_LSN_VAL), large_buffer_pool))) {
LSN(LOG_MAX_LSN_VAL), large_buffer_pool, log_ext_handler))) {
LOG_ERROR("remote iter init failed", KR(ret), K(tenant_id), K(ls_id), K(start_scn), K(start_lsn));
}
return ret;
@ -430,6 +433,24 @@ int LSFetchCtx::get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_po
return ret;
}
int LSFetchCtx::get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler)
{
int ret = OB_SUCCESS;
IObLogFetcher *fetcher = static_cast<IObLogFetcher *>(ls_fetch_mgr_->get_fetcher_host());
if (OB_ISNULL(fetcher)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("fetcher is nullptr", KR(ret), K(fetcher));
} else if (OB_FAIL(fetcher->get_log_ext_handler(log_ext_handler))) {
LOG_ERROR("Fetcher get_log_ext_handler failed", KR(ret));
} else if (OB_ISNULL(log_ext_handler)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("get_log_ext_handler is nullptr", KR(ret), K(log_ext_handler));
}
return ret;
}
int LSFetchCtx::get_fetcher_config(const ObLogFetcherConfig *&cfg)
{
int ret = OB_SUCCESS;

View File

@ -39,6 +39,10 @@
namespace oceanbase
{
namespace logservice
{
class ObLogExternalStorageHandler;
}
namespace logfetcher
{
@ -158,6 +162,8 @@ public:
int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool);
int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler);
int get_fetcher_config(const ObLogFetcherConfig *&cfg);
ObRemoteLogParent *get_archive_source() { return source_; }
@ -500,7 +506,6 @@ private:
IObLogStartLSNLocator &start_lsn_locator);
int set_end_lsn_and_init_dict_iter_(const palf::LSN &start_lsn);
int get_log_route_service_(logservice::ObLogRouteService *&log_route_service);
int get_large_buffer_pool_(archive::LargeBufferPool *&large_buffer_pool);
protected:
FetchStreamType stype_;

View File

@ -80,6 +80,7 @@ int ObLogExternalStorageHandler::start(const int64_t concurrency)
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this));
} else if (0 != concurrency
&& !FALSE_IT(share::ObThreadPool::set_run_wrapper(MTL_CTX()))
&& OB_FAIL(ObSimpleThreadPool::init(
concurrency, CAPACITY_COEFFICIENT * concurrency, "ObLogEXTTP", MTL_ID()))) {
CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this));
@ -122,28 +123,32 @@ int ObLogExternalStorageHandler::resize(const int64_t new_concurrency,
{
int ret = OB_SUCCESS;
ObTimeGuard time_guard("resize thread pool", DEFAULT_RESIZE_TIME_GUARD_THRESHOLD);
WLockGuardTimeout guard(resize_rw_lock_, timeout_us, ret);
time_guard.click("after hold lock");
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "ObLogExternalStorageHandler not inited", KPC(this), K(new_concurrency), K(timeout_us));
} else if (!is_running_) {
ret = OB_NOT_RUNNING;
CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", KPC(this), K(new_concurrency), K(timeout_us));
} else if (!is_valid_concurrency_(new_concurrency) || 0 > timeout_us) {
} else if (!is_valid_concurrency_(new_concurrency) || 0 >= timeout_us) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid arguments", KPC(this), K(new_concurrency), K(timeout_us));
// hold lock failed
} else if (OB_FAIL(ret)) {
CLOG_LOG(WARN, "hold lock failed", KPC(this), K(new_concurrency), K(timeout_us));
} else if (new_concurrency == concurrency_) {
} else if (!check_need_resize_(new_concurrency)) {
CLOG_LOG(TRACE, "no need resize", KPC(this), K(new_concurrency));
} else {
destroy_and_init_new_thread_pool_(new_concurrency);
time_guard.click("after create new thread pool");
concurrency_ = new_concurrency;
capacity_ = CAPACITY_COEFFICIENT * new_concurrency;
CLOG_LOG(INFO, "ObLogExternalStorageHandler resize success", K(new_concurrency));
WLockGuardTimeout guard(resize_rw_lock_, timeout_us, ret);
// hold lock failed
if (OB_FAIL(ret)) {
CLOG_LOG(WARN, "hold lock failed", KPC(this), K(new_concurrency), K(timeout_us));
} else if (!is_running_) {
ret = OB_NOT_RUNNING;
CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", KPC(this), K(new_concurrency), K(timeout_us));
} else if (new_concurrency == concurrency_) {
CLOG_LOG(TRACE, "no need resize", KPC(this), K(new_concurrency));
} else {
destroy_and_init_new_thread_pool_(new_concurrency);
time_guard.click("after create new thread pool");
concurrency_ = new_concurrency;
capacity_ = CAPACITY_COEFFICIENT * new_concurrency;
CLOG_LOG(INFO, "ObLogExternalStorageHandler resize success", K(new_concurrency));
}
}
return ret;
}
@ -159,7 +164,7 @@ int ObLogExternalStorageHandler::pread(const common::ObString &uri,
int64_t async_task_count = 0;
ObTimeGuard time_guard("slow pread", DEFAULT_PREAD_TIME_GUARD_THRESHOLD);
ObLogExternalStorageIOTaskCtx *async_task_ctx = NULL;
int64_t file_size = palf::PALF_PHY_BLOCK_SIZE;
int64_t file_size = 0;
int64_t real_read_buf_size = 0;
RLockGuard guard(resize_rw_lock_);
time_guard.click("after hold by lock");
@ -169,9 +174,12 @@ int ObLogExternalStorageHandler::pread(const common::ObString &uri,
} else if (!is_running_) {
ret = OB_NOT_RUNNING;
CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size));
} else if (uri.empty() || storage_info.empty() || 0 > offset || NULL == buf || 0 >= read_buf_size) {
// when uri is NFS, storage_info is empty.
} else if (uri.empty() || 0 > offset || NULL == buf || 0 >= read_buf_size) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "ObLogExternalStorageHandler invalid argument", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size));
} else if (OB_FAIL(handle_adapter_->get_file_size(uri, storage_info, file_size))) {
CLOG_LOG(WARN, "get_file_size failed", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size));
} else if (offset >= file_size) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "read position lager than file size, invalid argument", K(file_size), K(offset), K(uri));
@ -223,6 +231,11 @@ void ObLogExternalStorageHandler::handle(void *task)
}
}
int64_t ObLogExternalStorageHandler::get_recommend_concurrency_in_single_file() const
{
return palf::PALF_PHY_BLOCK_SIZE / SINGLE_TASK_MINIMUM_SIZE;
}
bool ObLogExternalStorageHandler::is_valid_concurrency_(const int64_t concurrency) const
{
return 0 <= concurrency && CONCURRENCY_LIMIT >= concurrency;
@ -390,6 +403,7 @@ void ObLogExternalStorageHandler::destroy_and_init_new_thread_pool_(
do {
if (0 != new_concurrency
&& !FALSE_IT(share::ObThreadPool::set_run_wrapper(MTL_CTX()))
&& OB_FAIL(ObSimpleThreadPool::init(
new_concurrency, CAPACITY_COEFFICIENT * new_concurrency, "ObLogEXTTP", MTL_ID()))) {
CLOG_LOG(WARN, "init ObSimpleThreadPool failed", K(new_concurrency), KPC(this));
@ -405,5 +419,11 @@ void ObLogExternalStorageHandler::destroy_and_init_new_thread_pool_(
CLOG_LOG_RET(WARN, OB_SUCCESS, "destroy_and_init_new_thread_pool_ success", K(time_guard), KPC(this));
}
bool ObLogExternalStorageHandler::check_need_resize_(const int64_t new_concurrency) const
{
RLockGuard guard(resize_rw_lock_);
return new_concurrency != concurrency_;
}
} // end namespace logservice
} // end namespace oceanbase

View File

@ -114,6 +114,8 @@ public:
void handle(void *task) override final;
int64_t get_recommend_concurrency_in_single_file() const;
TO_STRING_KV(K_(concurrency), K_(capacity), K_(is_running), K_(is_inited), KP(handle_adapter_), KP(this));
private:
// CONCURRENCY LIMIT is 128.
@ -156,6 +158,8 @@ private:
void destroy_and_init_new_thread_pool_(const int64_t concurrency);
bool check_need_resize_(const int64_t concurrency) const;
private:
typedef common::RWLock RWLock;
typedef RWLock::RLockGuard RLockGuard;
@ -163,7 +167,7 @@ private:
typedef RWLock::WLockGuardWithTimeout WLockGuardTimeout;
int64_t concurrency_;
int64_t capacity_;
RWLock resize_rw_lock_;
mutable RWLock resize_rw_lock_;
ObSpinLock construct_async_task_lock_;
ObLogExternalStorageIOTaskHandleIAdapter *handle_adapter_;
bool is_running_;

View File

@ -129,7 +129,7 @@ int ObLogExternalStorageIOTaskCtx::get_ret_code() const
{
int ret = OB_SUCCESS;
for (int64_t i = 0; i < total_task_count_; i++) {
if (OB_SUCCESS != running_status_[i].ret_) {
if (0 == i && OB_SUCCESS != running_status_[i].ret_) {
ret = running_status_[i].ret_;
CLOG_LOG(WARN, "asyn task execute failed", KPC(this));
break;
@ -336,7 +336,7 @@ int ObLogExternalStorageIOTaskHandleAdapter::pread(const ObString &uri,
const int64_t read_buf_size,
int64_t &real_read_size)
{
ObTimeGuard time_guard("oss pread", 200 * 1000);
ObTimeGuard time_guard("oss pread", 100 * 1000);
int ret = OB_SUCCESS;
real_read_size = 0;
ObIODevice *io_device = NULL;
@ -348,7 +348,7 @@ int ObLogExternalStorageIOTaskHandleAdapter::pread(const ObString &uri,
} else if (FALSE_IT(time_guard.click("after open_io_fd"))) {
CLOG_LOG(WARN, "open_io_fd failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size));
} else if (OB_FAIL(io_device->pread(io_fd, offset, read_buf_size, buf, real_read_size))) {
CLOG_LOG(WARN, "pread failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size));
CLOG_LOG(WARN, "pread failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size), K(time_guard));
} else if (FALSE_IT(time_guard.click("after pread"))) {
} else {
CLOG_LOG(TRACE, "pread success", K(time_guard), K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size));

View File

@ -114,18 +114,12 @@ int ObLogRestoreArchiveDriver::check_need_schedule_(ObLS &ls,
int64_t &task_count)
{
int ret = OB_SUCCESS;
int64_t concurrency = 0;
ObRemoteFetchContext context;
ObLogRestoreHandler *restore_handler = NULL;
bool need_delay = false;
need_schedule = false;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_));
const int64_t log_restore_concurrency = tenant_config.is_valid() ? tenant_config->log_restore_concurrency : 1L;
if (0 == log_restore_concurrency) {
concurrency = std::min(get_restore_concurrency_by_max_cpu(), MAX_LS_FETCH_LOG_TASK_CONCURRENCY);
} else {
concurrency = std::min(log_restore_concurrency, MAX_LS_FETCH_LOG_TASK_CONCURRENCY);
}
int64_t concurrency = 0;
int64_t fetch_log_worker_count = 0;
if (OB_ISNULL(restore_handler = ls.get_log_restore_handler())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("get restore_handler failed", K(ret), "id", ls.get_ls_id());
@ -133,6 +127,9 @@ int ObLogRestoreArchiveDriver::check_need_schedule_(ObLS &ls,
LOG_WARN("get fetch log context failed", K(ret), K(ls));
} else if (! need_schedule) {
// do nothing
} else if (OB_FAIL(worker_->get_thread_count(fetch_log_worker_count))) {
LOG_WARN("get_thread_count from worker_ failed", K(ret), K(ls));
} else if (FALSE_IT(concurrency = std::min(fetch_log_worker_count, MAX_LS_FETCH_LOG_TASK_CONCURRENCY))) {
} else if (context.issue_task_num_ >= concurrency) {
need_schedule = false;
} else if (OB_FAIL(check_need_delay_(ls.get_ls_id(), need_delay))) {

View File

@ -80,10 +80,5 @@ bool ObLogRestoreSourceTenant::is_valid() const
&& ip_list_.count() > 0;
}
int64_t get_restore_concurrency_by_max_cpu()
{
return static_cast<int64_t>(MTL_CPU_COUNT() + 7) / 8;
}
} // namespace logservice
} // namespace oceanbase

View File

@ -26,8 +26,6 @@ namespace oceanbase
namespace logservice
{
const int64_t MAX_FETCH_LOG_BUF_LEN = 4 * 1024 * 1024L;
const int64_t MIN_FETCH_LOG_WORKER_THREAD_COUNT = 1;
const int64_t MAX_FETCH_LOG_WORKER_THREAD_COUNT = 10;
const int64_t MAX_LS_FETCH_LOG_TASK_CONCURRENCY = 4;
struct ObLogRestoreErrorContext
@ -81,8 +79,6 @@ struct ObLogRestoreSourceTenant final
K_(is_oracle),
K_(ip_list));
};
int64_t get_restore_concurrency_by_max_cpu();
} // namespace logservice
} // namespace oceanbase

View File

@ -78,21 +78,22 @@ int ObLogRestoreScheduler::modify_thread_count_(const share::ObLogRestoreSourceT
{
int ret = OB_SUCCESS;
int64_t restore_concurrency = 0;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_));
const int64_t log_restore_concurrency =
tenant_config.is_valid() ? tenant_config->log_restore_concurrency : 1L;
// primary tenant or log restore source not location type, restore_concurrency is 1
// parameter log_restore_concurrency is default zero, set restore_concurrency = max_cpu / 8, rounded up
// parameter log_restore_concurrency not zero, set restore_concurrency = log_restore_concurrency
// for primary tenant, set restore_concurrency to 1.
// otherwise, set restore_concurrency to tenant config.
if (MTL_GET_TENANT_ROLE() == share::ObTenantRole::PRIMARY_TENANT
|| !share::is_location_log_source_type(source_type)) {
restore_concurrency = 1;
} else if (0 == log_restore_concurrency) {
restore_concurrency = get_restore_concurrency_by_max_cpu();
} else {
restore_concurrency = log_restore_concurrency;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_));
if (!tenant_config.is_valid()) {
restore_concurrency = 1L;
} else if (0 == tenant_config->log_restore_concurrency) {
restore_concurrency = MTL_CPU_COUNT();
} else {
restore_concurrency = tenant_config->log_restore_concurrency;
}
}
if (OB_FAIL(worker_->modify_thread_count(std::max(1L, restore_concurrency)))) {
if (OB_FAIL(worker_->modify_thread_count(restore_concurrency))) {
CLOG_LOG(WARN, "modify worker thread failed", K(ret));
}
return ret;

View File

@ -14,18 +14,18 @@
#define OCEANBASE_LOGSERVICE_OB_LOG_RESTORE_SERVICE_H_
#include "lib/utility/ob_macro_utils.h"
#include "rpc/frame/ob_req_transport.h" // ObReqTransport
#include "share/ob_thread_pool.h" // ObThreadPool
#include "ob_remote_fetch_log.h" // ObRemoteFetchLogImpl
#include "ob_log_restore_rpc.h" // ObLogResSvrRpc
#include "ob_remote_fetch_log_worker.h" // ObRemoteFetchWorker
#include "ob_remote_location_adaptor.h" // ObRemoteLocationAdaptor
#include "ob_remote_error_reporter.h" // ObRemoteErrorReporter
#include "ob_log_restore_allocator.h" // ObLogRestoreAllocator
#include "ob_log_restore_scheduler.h" // ObLogRestoreScheduler
#include "ob_log_restore_controller.h" // ObLogRestoreController
#include "ob_log_restore_net_driver.h" // ObLogRestoreNetDriver
#include "ob_log_restore_archive_driver.h" // ObLogRestoreArchiveDriver
#include "rpc/frame/ob_req_transport.h" // ObReqTransport
#include "share/ob_thread_pool.h" // ObThreadPool
#include "ob_remote_fetch_log.h" // ObRemoteFetchLogImpl
#include "ob_log_restore_rpc.h" // ObLogResSvrRpc
#include "ob_remote_fetch_log_worker.h" // ObRemoteFetchWorker
#include "ob_remote_location_adaptor.h" // ObRemoteLocationAdaptor
#include "ob_remote_error_reporter.h" // ObRemoteErrorReporter
#include "ob_log_restore_allocator.h" // ObLogRestoreAllocator
#include "ob_log_restore_scheduler.h" // ObLogRestoreScheduler
#include "ob_log_restore_controller.h" // ObLogRestoreController
#include "ob_log_restore_net_driver.h" // ObLogRestoreNetDriver
#include "ob_log_restore_archive_driver.h" // ObLogRestoreArchiveDriver
namespace oceanbase
{

View File

@ -38,14 +38,16 @@ RemoteDataGenerator::RemoteDataGenerator(const uint64_t tenant_id,
const ObLSID &id,
const LSN &start_lsn,
const LSN &end_lsn,
const SCN &end_scn) :
const SCN &end_scn,
ObLogExternalStorageHandler *log_ext_handler) :
tenant_id_(tenant_id),
id_(id),
start_lsn_(start_lsn),
next_fetch_lsn_(start_lsn),
end_scn_(end_scn),
end_lsn_(end_lsn),
to_end_(false)
to_end_(false),
log_ext_handler_(log_ext_handler)
{}
RemoteDataGenerator::~RemoteDataGenerator()
@ -55,6 +57,7 @@ RemoteDataGenerator::~RemoteDataGenerator()
start_lsn_.reset();
next_fetch_lsn_.reset();
end_lsn_.reset();
log_ext_handler_ = NULL;
}
bool RemoteDataGenerator::is_valid() const
@ -64,7 +67,8 @@ bool RemoteDataGenerator::is_valid() const
&& start_lsn_.is_valid()
&& end_scn_.is_valid()
&& end_lsn_.is_valid()
&& end_lsn_ > start_lsn_;
&& end_lsn_ > start_lsn_
&& log_ext_handler_ != NULL;
}
bool RemoteDataGenerator::is_fetch_to_end() const
@ -108,8 +112,9 @@ ServiceDataGenerator::ServiceDataGenerator(const uint64_t tenant_id,
const LSN &start_lsn,
const LSN &end_lsn,
const SCN &end_scn,
const ObAddr &server) :
RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn),
const ObAddr &server,
ObLogExternalStorageHandler *log_ext_handler) :
RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn, log_ext_handler),
server_(server),
result_()
{}
@ -184,34 +189,6 @@ static int64_t cal_lsn_to_file_id_(const LSN &lsn)
return cal_archive_file_id(lsn, palf::PALF_BLOCK_SIZE);
}
static int read_file_(const ObString &base,
const share::ObBackupStorageInfo *storage_info,
const share::ObLSID &id,
const int64_t file_id,
const int64_t offset,
char *data,
const int64_t data_len,
int64_t &data_size)
{
int ret = OB_SUCCESS;
share::ObBackupPath path;
if (OB_FAIL(ObArchivePathUtil::build_restore_path(base.ptr(), id, file_id, path))) {
LOG_WARN("build restore path failed", K(ret));
} else {
ObString uri(path.get_obstr());
int64_t real_size = 0;
if (OB_FAIL(ObArchiveFileUtils::range_read(uri, storage_info, data, data_len, offset, real_size))) {
LOG_WARN("read file failed", K(ret), K(uri), K(storage_info));
} else if (0 == real_size) {
ret = OB_ITER_END;
LOG_INFO("read no data, need retry", K(ret), K(uri), K(storage_info), K(offset), K(real_size));
} else {
data_size = real_size;
}
}
return ret;
}
static int extract_archive_file_header_(char *buf,
const int64_t buf_size,
palf::LSN &lsn)
@ -263,8 +240,9 @@ LocationDataGenerator::LocationDataGenerator(const uint64_t tenant_id,
ObLogArchivePieceContext *piece_context,
char *buf,
const int64_t buf_size,
const int64_t single_read_size) :
RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn),
const int64_t single_read_size,
ObLogExternalStorageHandler *log_ext_handler) :
RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn, log_ext_handler),
pre_scn_(pre_scn),
base_lsn_(),
data_len_(0),
@ -477,6 +455,40 @@ void LocationDataGenerator::cal_read_size_(const int64_t dest_id,
}
}
int LocationDataGenerator::read_file_(const ObString &base,
const share::ObBackupStorageInfo *storage_info,
const share::ObLSID &id,
const int64_t file_id,
const int64_t offset,
char *data,
const int64_t data_len,
int64_t &data_size)
{
int ret = OB_SUCCESS;
share::ObBackupPath path;
if (OB_FAIL(ObArchivePathUtil::build_restore_path(base.ptr(), id, file_id, path))) {
LOG_WARN("build restore path failed", K(ret));
} else {
ObString uri(path.get_obstr());
char storage_info_cstr[OB_MAX_BACKUP_STORAGE_INFO_LENGTH] = {'\0'};
int64_t real_size = 0;
if (OB_FAIL(storage_info->get_storage_info_str(storage_info_cstr, OB_MAX_BACKUP_STORAGE_INFO_LENGTH, false))) {
LOG_WARN("get_storage_info_str failed", K(ret), K(uri), K(storage_info));
} else {
ObString storage_info_ob_str(storage_info_cstr);
if (OB_FAIL(log_ext_handler_->pread(uri, storage_info_ob_str, offset, data, data_len, real_size))) {
LOG_WARN("read file failed", K(ret), K(uri), K(storage_info));
} else if (0 == real_size) {
ret = OB_ITER_END;
LOG_INFO("read no data, need retry", K(ret), K(uri), K(storage_info), K(offset), K(real_size));
} else {
data_size = real_size;
}
}
}
return ret;
}
bool LocationDataGenerator::FileDesc::is_valid() const
{
return dest_id_ > 0
@ -586,8 +598,9 @@ RawPathDataGenerator::RawPathDataGenerator(const uint64_t tenant_id,
const SCN &end_scn,
const int64_t piece_index,
const int64_t min_file_id,
const int64_t max_file_id) :
RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn),
const int64_t max_file_id,
ObLogExternalStorageHandler *log_ext_handler) :
RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn, log_ext_handler),
array_(array),
data_len_(0),
file_id_(0),

View File

@ -40,6 +40,7 @@ class LogGroupEntry;
}
namespace logservice
{
class ObLogExternalStorageHandler;
using oceanbase::palf::LSN;
using oceanbase::palf::LogGroupEntry;
using oceanbase::share::ObLSID;
@ -142,7 +143,8 @@ public:
const ObLSID &id,
const LSN &start_lsn,
const LSN &end_lsn,
const share::SCN &end_scn);
const share::SCN &end_scn,
logservice::ObLogExternalStorageHandler *log_ext_handler);
virtual ~RemoteDataGenerator();
public:
@ -166,6 +168,7 @@ protected:
share::SCN end_scn_;
LSN end_lsn_;
bool to_end_;
logservice::ObLogExternalStorageHandler *log_ext_handler_;
private:
DISALLOW_COPY_AND_ASSIGN(RemoteDataGenerator);
@ -179,7 +182,8 @@ public:
const LSN &start_lsn,
const LSN &end_lsn,
const share::SCN &end_scn,
const ObAddr &server);
const ObAddr &server,
logservice::ObLogExternalStorageHandler *log_ext_handler);
virtual ~ServiceDataGenerator();
public:
@ -213,7 +217,8 @@ public:
ObLogArchivePieceContext *piece_context,
char *buf,
const int64_t buf_size,
const int64_t single_read_size);
const int64_t single_read_size,
logservice::ObLogExternalStorageHandler *log_ext_handler);
~LocationDataGenerator();
int next_buffer(palf::LSN &lsn, char *&buf, int64_t &buf_size);
int update_max_lsn(const palf::LSN &lsn);
@ -282,6 +287,14 @@ private:
const int64_t file_id,
const int64_t file_offset,
int64_t &size);
int read_file_(const ObString &base,
const share::ObBackupStorageInfo *storage_info,
const share::ObLSID &id,
const int64_t file_id,
const int64_t offset,
char *data,
const int64_t data_len,
int64_t &real_read_size);
private:
share::SCN pre_scn_;
// base_lsn_ is the start_lsn from the archive file, while the next_fetch_lsn_ is the start_lsn to fetch,
@ -313,7 +326,8 @@ public:
const share::SCN &end_scn,
const int64_t piece_index,
const int64_t min_file_id,
const int64_t max_file_id);
const int64_t max_file_id,
logservice::ObLogExternalStorageHandler *log_ext_handler);
virtual ~RawPathDataGenerator();
int next_buffer(palf::LSN &lsn, char *&buf, int64_t &buf_size);

View File

@ -65,6 +65,7 @@ ObRemoteFetchWorker::ObRemoteFetchWorker() :
ls_svr_(NULL),
task_queue_(),
allocator_(NULL),
log_ext_handler_(),
cond_()
{}
@ -95,6 +96,8 @@ int ObRemoteFetchWorker::init(const uint64_t tenant_id,
K(allocator), K(restore_service), K(ls_svr));
} else if (OB_FAIL(task_queue_.init(FETCH_LOG_TASK_LIMIT, "RFLTaskQueue", MTL_ID()))) {
LOG_WARN("task_queue_ init failed", K(ret));
} else if (OB_FAIL(log_ext_handler_.init())) {
LOG_WARN("log_ext_handler_ init failed", K(ret));
} else {
tenant_id_ = tenant_id;
allocator_ = allocator;
@ -129,6 +132,7 @@ void ObRemoteFetchWorker::destroy()
ls_svr_ = NULL;
allocator_ = NULL;
restore_controller_ = NULL;
log_ext_handler_.destroy();
inited_ = false;
}
}
@ -140,6 +144,8 @@ int ObRemoteFetchWorker::start()
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
LOG_ERROR("ObRemoteFetchWorker not init", K(ret));
} else if (OB_FAIL(log_ext_handler_.start(0))) {
LOG_WARN("ObLogExtStorageHandler start failed");
} else if (OB_FAIL(ObThreadPool::start())) {
LOG_WARN("ObRemoteFetchWorker start failed", K(ret));
} else {
@ -151,12 +157,14 @@ int ObRemoteFetchWorker::start()
void ObRemoteFetchWorker::stop()
{
LOG_INFO("ObRemoteFetchWorker thread stop", K_(tenant_id));
log_ext_handler_.stop();
ObThreadPool::stop();
}
void ObRemoteFetchWorker::wait()
{
LOG_INFO("ObRemoteFetchWorker thread wait", K_(tenant_id));
log_ext_handler_.wait();
ObThreadPool::wait();
}
@ -183,21 +191,31 @@ int ObRemoteFetchWorker::submit_fetch_log_task(ObFetchLogTask *task)
return ret;
}
int ObRemoteFetchWorker::modify_thread_count(const int64_t thread_count)
int ObRemoteFetchWorker::modify_thread_count(const int64_t log_restore_concurrency)
{
int ret = OB_SUCCESS;
int64_t count = thread_count;
if (thread_count < MIN_FETCH_LOG_WORKER_THREAD_COUNT) {
count = MIN_FETCH_LOG_WORKER_THREAD_COUNT;
} else if (thread_count > MAX_FETCH_LOG_WORKER_THREAD_COUNT) {
count = MAX_FETCH_LOG_WORKER_THREAD_COUNT;
}
if (count == get_thread_count()) {
int64_t thread_count = 0;
if (OB_FAIL(log_ext_handler_.resize(log_restore_concurrency))) {
LOG_WARN("log_ext_handler_ resize failed", K(log_restore_concurrency), K(thread_count));
} else if (FALSE_IT(thread_count = calcuate_thread_count_(log_restore_concurrency))) {
LOG_WARN("calcuate_thread_count_ failed", K(log_restore_concurrency), K(thread_count));
} else if (thread_count == lib::Threads::get_thread_count()) {
// do nothing
} else if (OB_FAIL(set_thread_count(count))) {
} else if (OB_FAIL(set_thread_count(thread_count))) {
LOG_WARN("set thread count failed", K(ret));
} else {
LOG_INFO("set thread count succ", K(count));
LOG_INFO("set thread count succ", K(thread_count), K(log_restore_concurrency));
}
return ret;
}
int ObRemoteFetchWorker::get_thread_count(int64_t &thread_count) const
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(! inited_)) {
LOG_WARN("ObRemoteFetchWorker not init");
} else {
thread_count = lib::Threads::get_thread_count();
}
return ret;
}
@ -228,7 +246,7 @@ void ObRemoteFetchWorker::do_thread_task_()
{
int ret = OB_SUCCESS;
int64_t size = task_queue_.size();
if (0 != get_thread_idx() || get_thread_count() <= 1) {
if (0 != get_thread_idx() || lib::Threads::get_thread_count() <= 1) {
for (int64_t i = 0; i < size && OB_SUCC(ret) && !has_set_stop(); i++) {
if (OB_FAIL(handle_single_task_())) {
LOG_WARN("handle single task failed", K(ret));
@ -292,7 +310,8 @@ int ObRemoteFetchWorker::handle_fetch_log_task_(ObFetchLogTask *task)
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument", K(ret), K(task));
} else if (OB_FAIL(task->iter_.init(tenant_id_, task->id_, task->pre_scn_,
task->cur_lsn_, task->end_lsn_, allocator_->get_buferr_pool(), DEFAULT_BUF_SIZE))) {
task->cur_lsn_, task->end_lsn_, allocator_->get_buferr_pool(),
&log_ext_handler_, DEFAULT_BUF_SIZE))) {
LOG_WARN("ObRemoteLogIterator init failed", K(ret), K_(tenant_id), KPC(task));
} else if (!need_fetch_log_(task->id_)) {
LOG_TRACE("no need fetch log", KPC(task));
@ -599,5 +618,14 @@ void ObRemoteFetchWorker::report_error_(const ObLSID &id,
}
}
int64_t ObRemoteFetchWorker::calcuate_thread_count_(const int64_t log_restore_concurrency)
{
int64_t thread_count = 0;
int64_t recommend_concurrency_in_single_file = log_ext_handler_.get_recommend_concurrency_in_single_file();
thread_count = static_cast<int64_t>(
log_restore_concurrency + recommend_concurrency_in_single_file - 1) / recommend_concurrency_in_single_file;
return thread_count;
}
} // namespace logservice
} // namespace oceanbase

View File

@ -13,11 +13,12 @@
#ifndef OCEANBASE_LOGSERVICE_OB_REMOTE_FETCH_LOG_WORKER_H_
#define OCEANBASE_LOGSERVICE_OB_REMOTE_FETCH_LOG_WORKER_H_
#include "lib/queue/ob_lighty_queue.h" // ObLightyQueue
#include "common/ob_queue_thread.h" // ObCond
#include "share/ob_thread_pool.h" // ObThreadPool
#include "share/ob_ls_id.h" // ObLSID
#include "ob_remote_log_iterator.h" // ObRemoteLogGroupEntryIterator
#include "lib/queue/ob_lighty_queue.h" // ObLightyQueue
#include "common/ob_queue_thread.h" // ObCond
#include "share/ob_thread_pool.h" // ObThreadPool
#include "share/ob_ls_id.h" // ObLSID
#include "ob_remote_log_iterator.h" // ObRemoteLogGroupEntryIterator
#include "logservice/ob_log_external_storage_handler.h" // ObLogExternalStorageHandler
namespace oceanbase
{
@ -71,7 +72,8 @@ public:
// @retval other code unexpected error
int submit_fetch_log_task(ObFetchLogTask *task);
int modify_thread_count(const int64_t count);
int modify_thread_count(const int64_t log_restore_concurrency);
int get_thread_count(int64_t &thread_count) const;
private:
void run1();
@ -99,6 +101,7 @@ private:
const int ret_code,
const palf::LSN &lsn,
const ObLogRestoreErrorContext::ErrorType &error_type);
int64_t calcuate_thread_count_(const int64_t log_restore_concurrency);
private:
bool inited_;
uint64_t tenant_id_;
@ -107,6 +110,7 @@ private:
storage::ObLSService *ls_svr_;
common::ObLightyQueue task_queue_;
ObLogRestoreAllocator *allocator_;
ObLogExternalStorageHandler log_ext_handler_;
common::ObCond cond_;
private:

View File

@ -33,6 +33,7 @@
#include "share/backup/ob_backup_struct.h"
#include "share/rc/ob_tenant_base.h"
#include "logservice/archiveservice/large_buffer_pool.h"
#include "logservice/ob_log_external_storage_handler.h" // ObLogExternalHandler
namespace oceanbase
{
@ -81,6 +82,7 @@ public:
const LSN &start_lsn,
const LSN &end_lsn,
archive::LargeBufferPool *buffer_pool,
logservice::ObLogExternalStorageHandler *log_ext_handler,
const int64_t single_read_size = DEFAULT_SINGLE_READ_SIZE);
// @brief used as local iterator, get one entry if not to end
// @param[out] entry LogGroupEntry or LogEntry
@ -138,6 +140,7 @@ private:
char *buf_;
int64_t buf_size_;
archive::LargeBufferPool *buffer_pool_;
logservice::ObLogExternalStorageHandler *log_ext_handler_;
GetSourceFunc get_source_func_;
UpdateSourceFunc update_source_func_;
RefreshStorageInfoFunc refresh_storage_info_func_;

View File

@ -28,6 +28,7 @@ ObRemoteLogIterator<LogEntryType>::ObRemoteLogIterator(GetSourceFunc &get_source
buf_(NULL),
buf_size_(0),
buffer_pool_(NULL),
log_ext_handler_(NULL),
get_source_func_(get_source_func),
update_source_func_(update_source_func),
refresh_storage_info_func_(refresh_storage_info_func)
@ -46,6 +47,7 @@ int ObRemoteLogIterator<LogEntryType>::init(const uint64_t tenant_id,
const LSN &start_lsn,
const LSN &end_lsn,
archive::LargeBufferPool *buffer_pool,
logservice::ObLogExternalStorageHandler *log_ext_handler,
const int64_t single_read_size)
{
int ret = OB_SUCCESS;
@ -59,10 +61,11 @@ int ObRemoteLogIterator<LogEntryType>::init(const uint64_t tenant_id,
|| ! id.is_valid()
|| ! start_lsn.is_valid()
|| (end_lsn.is_valid() && end_lsn <= start_lsn)
|| NULL == log_ext_handler
|| single_read_size < 2 * 1024 * 1024)) { // TODO set size
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", K(ret), K(buffer_pool),
K(tenant_id), K(id), K(start_lsn), K(end_lsn));
CLOG_LOG(WARN, "invalid argument", K(ret), KP(buffer_pool),
K(tenant_id), K(id), K(start_lsn), K(end_lsn), KP(log_ext_handler));
} else if (OB_FAIL(get_source_func_(id, source_guard_))) {
CLOG_LOG(WARN, "get source failed", K(ret), K(id));
} else if (OB_ISNULL(source = source_guard_.get_source())) {
@ -82,6 +85,7 @@ int ObRemoteLogIterator<LogEntryType>::init(const uint64_t tenant_id,
start_lsn_ = start_lsn;
end_lsn_ = end_lsn;
single_read_size_ = single_read_size;
log_ext_handler_ = log_ext_handler;
ret = build_data_generator_(pre_scn, source, refresh_storage_info_func_);
CLOG_LOG(INFO, "ObRemoteLogIterator init", K(ret), K(tenant_id), K(id), K(pre_scn), K(start_lsn), K(end_lsn));
}
@ -126,6 +130,8 @@ void ObRemoteLogIterator<LogEntryType>::reset()
buf_size_ = 0;
}
log_ext_handler_ = NULL;
id_.reset();
start_lsn_.reset();
cur_lsn_.reset();
@ -200,7 +206,7 @@ int ObRemoteLogIterator<LogEntryType>::build_dest_data_generator_(const share::S
source->get(array, end_scn);
source->get_locate_info(piece_index, min_file_id, max_file_id);
gen_ = MTL_NEW(RawPathDataGenerator, "ResDataGen", tenant_id_, id_, start_lsn_, end_lsn_,
array, end_scn, piece_index, min_file_id, max_file_id);
array, end_scn, piece_index, min_file_id, max_file_id, log_ext_handler_);
if (OB_ISNULL(gen_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
CLOG_LOG(WARN, "alloc dest data generator failed", K(ret), KPC(this));
@ -221,7 +227,7 @@ int ObRemoteLogIterator<LogEntryType>::build_location_data_generator_(const shar
source->get(dest, piece_context, end_scn);
gen_ = MTL_NEW(LocationDataGenerator, "ResDataGen", tenant_id_, pre_scn,
id_, start_lsn_, end_lsn_, end_scn, dest, piece_context, buf_, buf_size_,
single_read_size_);
single_read_size_, log_ext_handler_);
if (OB_ISNULL(gen_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
CLOG_LOG(WARN, "alloc location data generator failed", K(ret), KPC(this));

View File

@ -140,7 +140,7 @@ TEST(TestLogExternalStorageHandler, test_log_external_storage_handler)
handler.handle_adapter_ = &adapter;
EXPECT_EQ(true, handler.is_inited_);
EXPECT_EQ(OB_NOT_RUNNING, handler.pread(uri, storage_info, offset, read_buf, read_buf_size, real_read_size));
EXPECT_EQ(OB_NOT_RUNNING, handler.resize(16, 0));
EXPECT_EQ(OB_NOT_RUNNING, handler.resize(16, 100));
// 测试invalid argument
@ -159,8 +159,9 @@ TEST(TestLogExternalStorageHandler, test_log_external_storage_handler)
{
ObString empty_uri;
EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(empty_uri, storage_info, offset, read_buf, read_buf_size, real_read_size));
// NFS的storage info为empty
ObString empty_storage_info;
EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(uri, empty_storage_info, offset, read_buf, read_buf_size, real_read_size));
EXPECT_EQ(OB_SUCCESS, handler.pread(uri, empty_storage_info, offset, read_buf, read_buf_size, real_read_size));
int64_t invalid_offset = -1;
EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(uri, storage_info, invalid_offset, read_buf, read_buf_size, real_read_size));
invalid_offset = 100*1024*1024;
@ -175,7 +176,7 @@ TEST(TestLogExternalStorageHandler, test_log_external_storage_handler)
{
int64_t invalid_concurrency = -1;
EXPECT_EQ(OB_INVALID_ARGUMENT, handler.resize(invalid_concurrency, 0));
int64_t invalid_timeout_us = -1;
int64_t invalid_timeout_us = 0;
EXPECT_EQ(OB_INVALID_ARGUMENT, handler.resize(concurrency, invalid_timeout_us));
}
@ -488,7 +489,7 @@ int main(int argc, char **argv)
{
system("rm -rf test_log_external_storage_handler.log*");
OB_LOGGER.set_file_name("test_log_external_storage_handler.log", true);
OB_LOGGER.set_log_level("TRACE");
OB_LOGGER.set_log_level("WDIAG");
srandom(ObTimeUtility::current_time());
PALF_LOG(INFO, "begin unittest::test_log_external_storage_handler");
::testing::InitGoogleTest(&argc, argv);