diff --git a/src/logservice/cdcservice/ob_cdc_fetcher.cpp b/src/logservice/cdcservice/ob_cdc_fetcher.cpp index 60d092b8d..e3c290f95 100644 --- a/src/logservice/cdcservice/ob_cdc_fetcher.cpp +++ b/src/logservice/cdcservice/ob_cdc_fetcher.cpp @@ -31,7 +31,8 @@ ObCdcFetcher::ObCdcFetcher() : is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), ls_service_(NULL), - large_buffer_pool_(NULL) + large_buffer_pool_(NULL), + log_ext_handler_(NULL) { } @@ -42,7 +43,8 @@ ObCdcFetcher::~ObCdcFetcher() int ObCdcFetcher::init(const uint64_t tenant_id, ObLSService *ls_service, - archive::LargeBufferPool *buffer_pool) + archive::LargeBufferPool *buffer_pool, + logservice::ObLogExternalStorageHandler *log_ext_handler) { int ret = OB_SUCCESS; @@ -50,7 +52,8 @@ int ObCdcFetcher::init(const uint64_t tenant_id, ret = OB_INIT_TWICE; LOG_WARN("inited twice", KR(ret)); } else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id) - || OB_ISNULL(ls_service) || OB_ISNULL(buffer_pool)) { + || OB_ISNULL(ls_service) || OB_ISNULL(buffer_pool) + || OB_ISNULL(log_ext_handler)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_service), K(buffer_pool)); } else { @@ -58,6 +61,7 @@ int ObCdcFetcher::init(const uint64_t tenant_id, tenant_id_ = tenant_id; ls_service_ = ls_service; large_buffer_pool_ = buffer_pool; + log_ext_handler_ = log_ext_handler; } return ret; @@ -70,6 +74,7 @@ void ObCdcFetcher::destroy() tenant_id_ = OB_INVALID_TENANT_ID; ls_service_ = NULL; large_buffer_pool_ = NULL; + log_ext_handler_ = NULL; } } @@ -352,7 +357,8 @@ int ObCdcFetcher::fetch_log_in_archive_( if (OB_FAIL(pre_scn.convert_from_ts(ctx.get_progress()/1000L))) { LOG_WARN("convert progress to scn failed", KR(ret), K(ctx)); } else if (need_init_iter && OB_FAIL(remote_iter.init(tenant_id_, ls_id, pre_scn, - start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_))) { + start_lsn, LSN(LOG_MAX_LSN_VAL), large_buffer_pool_, + log_ext_handler_))) { LOG_WARN("init remote log iterator failed", KR(ret), K(tenant_id_), K(ls_id)); } else if (OB_FAIL(remote_iter.next(log_entry, lsn, buf, buf_size))) { // expected OB_ITER_END and OB_SUCCEES, error occurs when other code is returned. diff --git a/src/logservice/cdcservice/ob_cdc_fetcher.h b/src/logservice/cdcservice/ob_cdc_fetcher.h index e2d91e0ac..ce1e2ada5 100644 --- a/src/logservice/cdcservice/ob_cdc_fetcher.h +++ b/src/logservice/cdcservice/ob_cdc_fetcher.h @@ -27,6 +27,10 @@ namespace oceanbase { +namespace logservice +{ +class ObLogExternalStorageHandler; +} namespace cdc { using oceanbase::storage::ObLSService; @@ -48,7 +52,8 @@ public: ~ObCdcFetcher(); int init(const uint64_t tenant_id, ObLSService *ls_service, - archive::LargeBufferPool *buffer_pool); + archive::LargeBufferPool *buffer_pool, + logservice::ObLogExternalStorageHandler *log_ext_handler); void destroy(); public: @@ -201,6 +206,7 @@ private: uint64_t tenant_id_; ObLSService *ls_service_; archive::LargeBufferPool *large_buffer_pool_; + logservice::ObLogExternalStorageHandler *log_ext_handler_; }; // Some parameters and status during Fetch execution diff --git a/src/logservice/cdcservice/ob_cdc_service.cpp b/src/logservice/cdcservice/ob_cdc_service.cpp index f7214c211..820a9e5f5 100644 --- a/src/logservice/cdcservice/ob_cdc_service.cpp +++ b/src/logservice/cdcservice/ob_cdc_service.cpp @@ -50,7 +50,8 @@ ObCdcService::ObCdcService() dest_info_(), dest_info_lock_(), ls_ctx_map_(), - large_buffer_pool_() + large_buffer_pool_(), + log_ext_handler_() { } @@ -71,9 +72,11 @@ int ObCdcService::init(const uint64_t tenant_id, EXTLOG_LOG(WARN, "ls ctx map init failed", KR(ret), K(tenant_id)); } else if (OB_FAIL(large_buffer_pool_.init("CDCService", 1L * 1024 * 1024 * 1024))) { EXTLOG_LOG(WARN, "large buffer pool init failed", KR(ret), K(tenant_id)); - } else if (OB_FAIL(locator_.init(tenant_id, &large_buffer_pool_))) { + } else if (OB_FAIL(log_ext_handler_.init())) { + EXTLOG_LOG(WARN, "log ext handler init failed", KR(ret), K(tenant_id)); + } else if (OB_FAIL(locator_.init(tenant_id, &large_buffer_pool_, &log_ext_handler_))) { EXTLOG_LOG(WARN, "ObCdcStartLsnLocator init failed", KR(ret), K(tenant_id)); - } else if (OB_FAIL(fetcher_.init(tenant_id, ls_service, &large_buffer_pool_))) { + } else if (OB_FAIL(fetcher_.init(tenant_id, ls_service, &large_buffer_pool_, &log_ext_handler_))) { EXTLOG_LOG(WARN, "ObCdcFetcher init failed", KR(ret), K(tenant_id)); } else if (OB_FAIL(create_tenant_tg_(tenant_id))) { EXTLOG_LOG(WARN, "cdc thread group create failed", KR(ret), K(tenant_id)); @@ -131,7 +134,6 @@ void ObCdcService::run1() large_buffer_pool_.weed_out(); last_purge_ts = current_ts; } - ob_usleep(static_cast(BASE_INTERVAL)); } } @@ -144,6 +146,9 @@ int ObCdcService::start() if (IS_NOT_INIT) { ret = OB_NOT_INIT; EXTLOG_LOG(WARN, "ObCdcService not init", K(ret)); + // TODO by wenyue.zxl: change the concurrency of 'log_ext_handler_'(see resize interface) + } else if (OB_FAIL(log_ext_handler_.start(0))) { + EXTLOG_LOG(WARN, "log ext handler start failed", K(ret)); } else if (OB_FAIL(start_tenant_tg_(MTL_ID()))) { EXTLOG_LOG(ERROR, "start CDCService failed", KR(ret)); } else { @@ -157,11 +162,13 @@ void ObCdcService::stop() { ATOMIC_STORE(&stop_flag_, true); stop_tenant_tg_(MTL_ID()); + log_ext_handler_.stop(); } void ObCdcService::wait() { wait_tenant_tg_(MTL_ID()); + log_ext_handler_.wait(); // do nothing } @@ -175,6 +182,7 @@ void ObCdcService::destroy() dest_info_.reset(); large_buffer_pool_.destroy(); ls_ctx_map_.destroy(); + log_ext_handler_.destroy(); } int ObCdcService::req_start_lsn_by_ts_ns(const obrpc::ObCdcReqStartLSNByTsReq &req, diff --git a/src/logservice/cdcservice/ob_cdc_service.h b/src/logservice/cdcservice/ob_cdc_service.h index 5aff1e5a0..62024c2e6 100644 --- a/src/logservice/cdcservice/ob_cdc_service.h +++ b/src/logservice/cdcservice/ob_cdc_service.h @@ -18,6 +18,7 @@ #include "ob_cdc_start_lsn_locator.h" #include "ob_cdc_struct.h" // ClientLSKey, ClientLSCtx, ClientLSCtxMap #include "logservice/restoreservice/ob_remote_log_iterator.h" +#include "logservice/ob_log_external_storage_handler.h" // ObLogExternalStorageHandler namespace oceanbase { @@ -119,6 +120,7 @@ private: common::ObSpinLock dest_info_lock_; ClientLSCtxMap ls_ctx_map_; archive::LargeBufferPool large_buffer_pool_; + logservice::ObLogExternalStorageHandler log_ext_handler_; }; } // namespace cdc diff --git a/src/logservice/cdcservice/ob_cdc_start_lsn_locator.cpp b/src/logservice/cdcservice/ob_cdc_start_lsn_locator.cpp index 0e3e437c9..006b523eb 100644 --- a/src/logservice/cdcservice/ob_cdc_start_lsn_locator.cpp +++ b/src/logservice/cdcservice/ob_cdc_start_lsn_locator.cpp @@ -24,7 +24,8 @@ namespace cdc ObCdcStartLsnLocator::ObCdcStartLsnLocator() : is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), - large_buffer_pool_(NULL) + large_buffer_pool_(NULL), + log_ext_handler_(NULL) { } @@ -34,7 +35,8 @@ ObCdcStartLsnLocator::~ObCdcStartLsnLocator() } int ObCdcStartLsnLocator::init(const uint64_t tenant_id, - archive::LargeBufferPool *buffer_pool) + archive::LargeBufferPool *buffer_pool, + logservice::ObLogExternalStorageHandler *log_ext_handler) { int ret = OB_SUCCESS; @@ -48,6 +50,7 @@ int ObCdcStartLsnLocator::init(const uint64_t tenant_id, is_inited_ = true; tenant_id_ = tenant_id; large_buffer_pool_ = buffer_pool; + log_ext_handler_ = log_ext_handler; } return ret; @@ -59,6 +62,7 @@ void ObCdcStartLsnLocator::destroy() is_inited_ = false; tenant_id_ = OB_INVALID_TENANT_ID; large_buffer_pool_ = NULL; + log_ext_handler_ = NULL; } } @@ -252,7 +256,7 @@ int ObCdcStartLsnLocator::do_locate_ls_(const bool fetch_archive_only, LSN lsn; if (OB_FAIL(remote_group_iter.init(tenant_id_, ls_id, start_scn, - result_lsn, LSN(palf::LOG_MAX_LSN_VAL), large_buffer_pool_))) { + result_lsn, LSN(palf::LOG_MAX_LSN_VAL), large_buffer_pool_, log_ext_handler_))) { LOG_WARN("init remote group iter failed when retriving log group entry in start lsn locator", KR(ret), K(ls_id), K(tenant_id_)); } else if (OB_FAIL(remote_group_iter.next(log_group_entry, lsn, next_buf, next_buf_size))) { LOG_WARN("iterate through archive log failed", KR(ret), K(ls_id), K(tenant_id_)); diff --git a/src/logservice/cdcservice/ob_cdc_start_lsn_locator.h b/src/logservice/cdcservice/ob_cdc_start_lsn_locator.h index c207603a0..df7d12eaf 100644 --- a/src/logservice/cdcservice/ob_cdc_start_lsn_locator.h +++ b/src/logservice/cdcservice/ob_cdc_start_lsn_locator.h @@ -21,6 +21,10 @@ #include "logservice/archiveservice/large_buffer_pool.h" // LargeBufferPool namespace oceanbase { +namespace logservice +{ +class ObLogExternalStorageHandler; +} namespace cdc { using oceanbase::share::ObLSID; @@ -35,7 +39,8 @@ public: ObCdcStartLsnLocator(); ~ObCdcStartLsnLocator(); int init(const uint64_t tenant_id, - archive::LargeBufferPool *large_buffer_pool); + archive::LargeBufferPool *large_buffer_pool, + logservice::ObLogExternalStorageHandler *log_ext_handler); void destroy(); int req_start_lsn_by_ts_ns(const ObLocateLSNByTsReq &req_msg, ObLocateLSNByTsResp &result, @@ -61,6 +66,7 @@ private: bool is_inited_; uint64_t tenant_id_; archive::LargeBufferPool *large_buffer_pool_; + logservice::ObLogExternalStorageHandler *log_ext_handler_; }; } // namespace cdc diff --git a/src/logservice/libobcdc/src/ob_log_fetcher.cpp b/src/logservice/libobcdc/src/ob_log_fetcher.cpp index ee285da92..a1bf24e57 100644 --- a/src/logservice/libobcdc/src/ob_log_fetcher.cpp +++ b/src/logservice/libobcdc/src/ob_log_fetcher.cpp @@ -45,6 +45,7 @@ ObLogFetcher::ObLogFetcher() : fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN), archive_dest_(), large_buffer_pool_(), + log_ext_handler_(), task_pool_(NULL), sys_ls_handler_(NULL), err_handler_(NULL), @@ -139,6 +140,8 @@ int ObLogFetcher::init( LOG_ERROR("init part trans resolver factory fail", KR(ret)); } else if (large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024)) { LOG_ERROR("init large buffer pool failed", KR(ret)); + } else if (log_ext_handler_.init()) { + LOG_ERROR("init log ext handler failed", KR(ret)); } else if (OB_FAIL(ls_fetch_mgr_.init(max_cached_ls_fetch_ctx_count, progress_controller_, part_trans_resolver_factory_, @@ -243,6 +246,8 @@ void ObLogFetcher::destroy() } // Finally reset fetching_mode_ because of some processing dependencies, such as ObLogRouteService fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN; + log_ext_handler_.wait(); + log_ext_handler_.destroy(); LOG_INFO("destroy fetcher succ"); } @@ -263,7 +268,10 @@ int ObLogFetcher::start() } else { stop_flag_ = false; - if (is_integrated_fetching_mode(fetching_mode_) && OB_FAIL(log_route_service_.start())) { + // TODO by wenyue.zxl: change the concurrency of 'log_ext_handler_'(see resize interface) + if (OB_FAIL(log_ext_handler_.start(0))) { + LOG_ERROR("start ObLogExternalStorageHandler fail", KR(ret)); + } else if (is_integrated_fetching_mode(fetching_mode_) && OB_FAIL(log_route_service_.start())) { LOG_ERROR("start LogRouterService fail", KR(ret)); } else if (OB_FAIL(start_lsn_locator_.start())) { LOG_ERROR("start 'start_lsn_locator' fail", KR(ret)); @@ -318,6 +326,7 @@ void ObLogFetcher::stop() if (is_integrated_fetching_mode(fetching_mode_)) { log_route_service_.stop(); } + log_ext_handler_.stop(); LOG_INFO("stop fetcher succ"); } @@ -604,6 +613,19 @@ int ObLogFetcher::get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_ return ret; } +int ObLogFetcher::get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler) +{ + int ret = OB_SUCCESS; + + if(IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_ERROR("fetcher not inited, could not get log ext handler", KR(ret), K_(is_inited)); + } else { + log_ext_handler = &log_ext_handler_; + } + return ret; +} + int ObLogFetcher::check_progress( const uint64_t tenant_id, const int64_t timestamp, diff --git a/src/logservice/libobcdc/src/ob_log_fetcher.h b/src/logservice/libobcdc/src/ob_log_fetcher.h index 01af5713a..686821c88 100644 --- a/src/logservice/libobcdc/src/ob_log_fetcher.h +++ b/src/logservice/libobcdc/src/ob_log_fetcher.h @@ -76,6 +76,8 @@ public: virtual int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool) = 0; + virtual int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler) = 0; + // Checks if the sys progress of specified tenant exceeds the timestamp // For LogMetaDataService: // 1. At startup time, it need to build the baseline data for the startup timestamp, @@ -168,6 +170,8 @@ public: virtual int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool); + virtual int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler); + virtual int check_progress( const uint64_t tenant_id, const int64_t timestamp, @@ -242,6 +246,7 @@ private: ClientFetchingMode fetching_mode_; ObBackupPathString archive_dest_; archive::LargeBufferPool large_buffer_pool_; + logservice::ObLogExternalStorageHandler log_ext_handler_; TaskPool *task_pool_; IObLogSysLsTaskHandler *sys_ls_handler_; IObLogErrHandler *err_handler_; diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp b/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp index 750623b55..e2d95206b 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.cpp @@ -248,6 +248,7 @@ int LSFetchCtx::init_remote_iter() const LSN &start_lsn = progress_.get_next_lsn(); const int64_t cur_log_progress = progress_.get_progress(); archive::LargeBufferPool *large_buffer_pool = NULL; + logservice::ObLogExternalStorageHandler *log_ext_handler = NULL; SCN start_scn; if (remote_iter_.is_init()) { @@ -257,8 +258,10 @@ int LSFetchCtx::init_remote_iter() LOG_ERROR("convert log progress to start scn failed", KR(ret), K(cur_log_progress)); } else if (OB_FAIL(get_large_buffer_pool(large_buffer_pool))) { LOG_ERROR("get large buffer pool failed", KR(ret)); + } else if (OB_FAIL(get_log_ext_handler(log_ext_handler))) { + LOG_ERROR("get log ext handler failed", KR(ret)); } else if (OB_FAIL(remote_iter_.init(tenant_id, ls_id, start_scn, start_lsn, - LSN(LOG_MAX_LSN_VAL), large_buffer_pool))) { + LSN(LOG_MAX_LSN_VAL), large_buffer_pool, log_ext_handler))) { LOG_ERROR("remote iter init failed", KR(ret), K(tenant_id), K(ls_id), K(start_scn), K(start_lsn)); } return ret; @@ -681,6 +684,22 @@ int LSFetchCtx::get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_po return ret; } +int LSFetchCtx::get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler) +{ + int ret = OB_SUCCESS; + IObLogFetcher *fetcher = static_cast(ls_fetch_mgr_->get_fetcher_host()); + if (OB_ISNULL(fetcher)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("fetcher is nullptr", KR(ret), K(fetcher)); + } else if (OB_FAIL(fetcher->get_log_ext_handler(log_ext_handler))) { + LOG_ERROR("Fetcher get_log_ext_handler fail", KR(ret)); + } else if (OB_ISNULL(log_ext_handler)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("log_ext_handler is nullptr", KR(ret), K(log_ext_handler)); + } + return ret; +} + bool LSFetchCtx::need_update_svr_list() { int ret = OB_SUCCESS; diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.h b/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.h index 32557f5f1..a89dba01b 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.h +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_ctx.h @@ -181,6 +181,7 @@ public: int locate_end_lsn(IObLogStartLSNLocator &start_lsn_locator); int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool); + int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler); ObRemoteLogParent *get_archive_source() { return source_; } int init_remote_iter(); diff --git a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp index 02188817f..43cb804bd 100644 --- a/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/libobcdc/src/ob_log_ls_fetch_stream.cpp @@ -1447,6 +1447,7 @@ int FetchStream::fetch_miss_log_direct_( const int64_t tenant_id = ls_fetch_ctx.get_tls_id().get_tenant_id(); const ObLSID &ls_id = ls_fetch_ctx.get_tls_id().get_ls_id(); archive::LargeBufferPool *buffer_pool = NULL; + logservice::ObLogExternalStorageHandler *log_ext_handler = NULL; ObRpcResultCode rcode; SCN cur_scn; const int64_t start_fetch_ts = get_timestamp(); @@ -1460,6 +1461,8 @@ int FetchStream::fetch_miss_log_direct_( LOG_ERROR("convert log progress to scn failed", KR(ret), K(current_progress)); } else if (OB_FAIL(ls_fetch_ctx.get_large_buffer_pool(buffer_pool))) { LOG_ERROR("get large buffer pool when fetching missing log failed", KR(ret), K(ls_fetch_ctx)); + } else if (OB_FAIL(ls_fetch_ctx.get_log_ext_handler(log_ext_handler))) { + LOG_ERROR("get log ext handler when fetching missing log failed", KR(ret), K(ls_fetch_ctx)); } else { int64_t fetched_cnt = 0; const int64_t arr_cnt = miss_log_array.count(); @@ -1480,7 +1483,7 @@ int FetchStream::fetch_miss_log_direct_( if (get_timestamp() > time_upper_limit) { is_timeout = true; } else if (OB_FAIL(entry_iter.init(tenant_id, ls_id, cur_scn, missing_lsn, - LSN(palf::LOG_MAX_LSN_VAL), buffer_pool))) { + LSN(palf::LOG_MAX_LSN_VAL), buffer_pool, log_ext_handler))) { LOG_WARN("remote entry iter init failed", KR(ret)); } else if (OB_FAIL(entry_iter.next(log_entry, lsn, buf, buf_size))) { retry_on_err =true; diff --git a/src/logservice/logfetcher/ob_log_fetcher.cpp b/src/logservice/logfetcher/ob_log_fetcher.cpp index b0175e298..ef899e8b1 100755 --- a/src/logservice/logfetcher/ob_log_fetcher.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher.cpp @@ -46,6 +46,7 @@ ObLogFetcher::ObLogFetcher() : fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN), archive_dest_(), large_buffer_pool_(), + log_ext_handler_(), ls_ctx_add_info_factory_(NULL), err_handler_(NULL), ls_fetch_mgr_(), @@ -128,6 +129,8 @@ int ObLogFetcher::init( LOG_ERROR("init progress controller fail", KR(ret)); } else if (OB_FAIL(large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024))) { LOG_ERROR("init large buffer pool failed", KR(ret)); + } else if (OB_FAIL(log_ext_handler_.init())) { + LOG_ERROR("init failed", KR(ret)); } else if (OB_FAIL(ls_fetch_mgr_.init( progress_controller_, ls_ctx_factory, @@ -232,6 +235,8 @@ void ObLogFetcher::destroy() log_route_service_.wait(); log_route_service_.destroy(); } + log_ext_handler_.wait(); + log_ext_handler_.destroy(); // Finally reset fetching_mode_ because of some processing dependencies, such as ObLogRouteService fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN; log_fetcher_user_ = LogFetcherUser::UNKNOWN; @@ -264,6 +269,9 @@ int ObLogFetcher::start() LOG_ERROR("start dead pool fail", KR(ret)); } else if (OB_FAIL(stream_worker_.start())) { LOG_ERROR("start stream worker fail", KR(ret)); + // TODO by wenyue.zxl: change the concurrency of 'log_ext_handler_'(see resize interface) + } else if (OB_FAIL(log_ext_handler_.start(0))) { + LOG_ERROR("start log external handler failed", KR(ret)); } else { LOG_INFO("LogFetcher start success"); } @@ -288,6 +296,7 @@ void ObLogFetcher::stop() if (is_integrated_fetching_mode(fetching_mode_)) { log_route_service_.stop(); } + log_ext_handler_.stop(); LOG_INFO("LogFetcher stop success"); } @@ -711,6 +720,20 @@ int ObLogFetcher::get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_ return ret; } +int ObLogFetcher::get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_hander) +{ + int ret = OB_SUCCESS; + + if(IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_ERROR("LogFetcher is not inited, could not get log ext handler", KR(ret), K_(is_inited)); + } else { + log_ext_hander = &log_ext_handler_; + } + + return ret; +} + int ObLogFetcher::get_fetcher_config(const ObLogFetcherConfig *&cfg) { int ret = OB_SUCCESS; diff --git a/src/logservice/logfetcher/ob_log_fetcher.h b/src/logservice/logfetcher/ob_log_fetcher.h index 35db806b4..3b549ccb6 100644 --- a/src/logservice/logfetcher/ob_log_fetcher.h +++ b/src/logservice/logfetcher/ob_log_fetcher.h @@ -35,6 +35,7 @@ #include "ob_log_progress_info.h" // ProgressInfo #include "ob_log_fetcher_start_parameters.h" // ObLogFetcherStartParameters #include "ob_log_fetcher_err_handler.h" // IObLogErrHandler +#include "logservice/ob_log_external_storage_handler.h" // ObLogExternalStorageHandler namespace oceanbase { @@ -132,6 +133,8 @@ public: virtual int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool) = 0; + virtual int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler) = 0; + virtual int get_fetcher_config(const ObLogFetcherConfig *&cfg) = 0; // Checks if the sys progress of specified tenant exceeds the timestamp @@ -245,6 +248,8 @@ public: virtual int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool); + virtual int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler); + virtual int get_fetcher_config(const ObLogFetcherConfig *&cfg); virtual int check_progress( @@ -275,39 +280,40 @@ private: }; private: - bool is_inited_; - LogFetcherUser log_fetcher_user_; - int64_t cluster_id_; - uint64_t source_tenant_id_; - uint64_t self_tenant_id_; - const ObLogFetcherConfig *cfg_; + bool is_inited_; + LogFetcherUser log_fetcher_user_; + int64_t cluster_id_; + uint64_t source_tenant_id_; + uint64_t self_tenant_id_; + const ObLogFetcherConfig *cfg_; - bool is_loading_data_dict_baseline_data_; - ClientFetchingMode fetching_mode_; - ObBackupPathString archive_dest_; - archive::LargeBufferPool large_buffer_pool_; - ObILogFetcherLSCtxAddInfoFactory *ls_ctx_add_info_factory_; - IObLogErrHandler *err_handler_; + bool is_loading_data_dict_baseline_data_; + ClientFetchingMode fetching_mode_; + ObBackupPathString archive_dest_; + archive::LargeBufferPool large_buffer_pool_; + logservice::ObLogExternalStorageHandler log_ext_handler_; + ObILogFetcherLSCtxAddInfoFactory *ls_ctx_add_info_factory_; + IObLogErrHandler *err_handler_; - ObLogLSFetchMgr ls_fetch_mgr_; // Fetch Log Task Manager - PartProgressController progress_controller_; // Process Controller + ObLogLSFetchMgr ls_fetch_mgr_; // Fetch Log Task Manager + PartProgressController progress_controller_; // Process Controller // Function Modules - ObLogRpc rpc_; - logservice::ObLogRouteService log_route_service_; - ObLogStartLSNLocator start_lsn_locator_; - ObLogFetcherIdlePool idle_pool_; - ObLogFetcherDeadPool dead_pool_; - ObLSWorker stream_worker_; + ObLogRpc rpc_; + logservice::ObLogRouteService log_route_service_; + ObLogStartLSNLocator start_lsn_locator_; + ObLogFetcherIdlePool idle_pool_; + ObLogFetcherDeadPool dead_pool_; + ObLSWorker stream_worker_; // TODO - ObFsContainerMgr fs_container_mgr_; + ObFsContainerMgr fs_container_mgr_; - volatile bool stop_flag_ CACHE_ALIGNED; + volatile bool stop_flag_ CACHE_ALIGNED; // stop flag - bool paused_ CACHE_ALIGNED; - int64_t pause_time_ CACHE_ALIGNED; - int64_t resume_time_ CACHE_ALIGNED; + bool paused_ CACHE_ALIGNED; + int64_t pause_time_ CACHE_ALIGNED; + int64_t resume_time_ CACHE_ALIGNED; private: DISALLOW_COPY_AND_ASSIGN(ObLogFetcher); diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_ctx.cpp b/src/logservice/logfetcher/ob_log_ls_fetch_ctx.cpp index 9c370a7d0..12721ed42 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_ctx.cpp +++ b/src/logservice/logfetcher/ob_log_ls_fetch_ctx.cpp @@ -244,6 +244,7 @@ int LSFetchCtx::init_remote_iter() const LSN &start_lsn = progress_.get_next_lsn(); const int64_t cur_log_progress = progress_.get_progress(); archive::LargeBufferPool *large_buffer_pool = NULL; + logservice::ObLogExternalStorageHandler *log_ext_handler = NULL; SCN start_scn; if (remote_iter_.is_init()) { @@ -253,8 +254,10 @@ int LSFetchCtx::init_remote_iter() LOG_ERROR("convert log progress to start scn failed", KR(ret), K(cur_log_progress)); } else if (OB_FAIL(get_large_buffer_pool(large_buffer_pool))) { LOG_ERROR("get large buffer pool failed", KR(ret)); + } else if (OB_FAIL(get_log_ext_handler(log_ext_handler))) { + LOG_ERROR("get log external handler failed", KR(ret)); } else if (OB_FAIL(remote_iter_.init(tenant_id, ls_id, start_scn, start_lsn, - LSN(LOG_MAX_LSN_VAL), large_buffer_pool))) { + LSN(LOG_MAX_LSN_VAL), large_buffer_pool, log_ext_handler))) { LOG_ERROR("remote iter init failed", KR(ret), K(tenant_id), K(ls_id), K(start_scn), K(start_lsn)); } return ret; @@ -430,6 +433,24 @@ int LSFetchCtx::get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_po return ret; } +int LSFetchCtx::get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler) +{ + int ret = OB_SUCCESS; + IObLogFetcher *fetcher = static_cast(ls_fetch_mgr_->get_fetcher_host()); + + if (OB_ISNULL(fetcher)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("fetcher is nullptr", KR(ret), K(fetcher)); + } else if (OB_FAIL(fetcher->get_log_ext_handler(log_ext_handler))) { + LOG_ERROR("Fetcher get_log_ext_handler failed", KR(ret)); + } else if (OB_ISNULL(log_ext_handler)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("get_log_ext_handler is nullptr", KR(ret), K(log_ext_handler)); + } + + return ret; +} + int LSFetchCtx::get_fetcher_config(const ObLogFetcherConfig *&cfg) { int ret = OB_SUCCESS; diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_ctx.h b/src/logservice/logfetcher/ob_log_ls_fetch_ctx.h index fbbaa7d91..917a05e34 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_ctx.h +++ b/src/logservice/logfetcher/ob_log_ls_fetch_ctx.h @@ -39,6 +39,10 @@ namespace oceanbase { +namespace logservice +{ +class ObLogExternalStorageHandler; +} namespace logfetcher { @@ -158,6 +162,8 @@ public: int get_large_buffer_pool(archive::LargeBufferPool *&large_buffer_pool); + int get_log_ext_handler(logservice::ObLogExternalStorageHandler *&log_ext_handler); + int get_fetcher_config(const ObLogFetcherConfig *&cfg); ObRemoteLogParent *get_archive_source() { return source_; } @@ -500,7 +506,6 @@ private: IObLogStartLSNLocator &start_lsn_locator); int set_end_lsn_and_init_dict_iter_(const palf::LSN &start_lsn); int get_log_route_service_(logservice::ObLogRouteService *&log_route_service); - int get_large_buffer_pool_(archive::LargeBufferPool *&large_buffer_pool); protected: FetchStreamType stype_; diff --git a/src/logservice/ob_log_external_storage_handler.cpp b/src/logservice/ob_log_external_storage_handler.cpp index 45c98b7ba..c5424af66 100644 --- a/src/logservice/ob_log_external_storage_handler.cpp +++ b/src/logservice/ob_log_external_storage_handler.cpp @@ -80,6 +80,7 @@ int ObLogExternalStorageHandler::start(const int64_t concurrency) ret = OB_INVALID_ARGUMENT; CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this)); } else if (0 != concurrency + && !FALSE_IT(share::ObThreadPool::set_run_wrapper(MTL_CTX())) && OB_FAIL(ObSimpleThreadPool::init( concurrency, CAPACITY_COEFFICIENT * concurrency, "ObLogEXTTP", MTL_ID()))) { CLOG_LOG(WARN, "invalid argument", K(concurrency), KPC(this)); @@ -122,28 +123,32 @@ int ObLogExternalStorageHandler::resize(const int64_t new_concurrency, { int ret = OB_SUCCESS; ObTimeGuard time_guard("resize thread pool", DEFAULT_RESIZE_TIME_GUARD_THRESHOLD); - WLockGuardTimeout guard(resize_rw_lock_, timeout_us, ret); time_guard.click("after hold lock"); if (IS_NOT_INIT) { ret = OB_NOT_INIT; CLOG_LOG(WARN, "ObLogExternalStorageHandler not inited", KPC(this), K(new_concurrency), K(timeout_us)); - } else if (!is_running_) { - ret = OB_NOT_RUNNING; - CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", KPC(this), K(new_concurrency), K(timeout_us)); - } else if (!is_valid_concurrency_(new_concurrency) || 0 > timeout_us) { + } else if (!is_valid_concurrency_(new_concurrency) || 0 >= timeout_us) { ret = OB_INVALID_ARGUMENT; CLOG_LOG(WARN, "invalid arguments", KPC(this), K(new_concurrency), K(timeout_us)); - // hold lock failed - } else if (OB_FAIL(ret)) { - CLOG_LOG(WARN, "hold lock failed", KPC(this), K(new_concurrency), K(timeout_us)); - } else if (new_concurrency == concurrency_) { + } else if (!check_need_resize_(new_concurrency)) { CLOG_LOG(TRACE, "no need resize", KPC(this), K(new_concurrency)); } else { - destroy_and_init_new_thread_pool_(new_concurrency); - time_guard.click("after create new thread pool"); - concurrency_ = new_concurrency; - capacity_ = CAPACITY_COEFFICIENT * new_concurrency; - CLOG_LOG(INFO, "ObLogExternalStorageHandler resize success", K(new_concurrency)); + WLockGuardTimeout guard(resize_rw_lock_, timeout_us, ret); + // hold lock failed + if (OB_FAIL(ret)) { + CLOG_LOG(WARN, "hold lock failed", KPC(this), K(new_concurrency), K(timeout_us)); + } else if (!is_running_) { + ret = OB_NOT_RUNNING; + CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", KPC(this), K(new_concurrency), K(timeout_us)); + } else if (new_concurrency == concurrency_) { + CLOG_LOG(TRACE, "no need resize", KPC(this), K(new_concurrency)); + } else { + destroy_and_init_new_thread_pool_(new_concurrency); + time_guard.click("after create new thread pool"); + concurrency_ = new_concurrency; + capacity_ = CAPACITY_COEFFICIENT * new_concurrency; + CLOG_LOG(INFO, "ObLogExternalStorageHandler resize success", K(new_concurrency)); + } } return ret; } @@ -159,7 +164,7 @@ int ObLogExternalStorageHandler::pread(const common::ObString &uri, int64_t async_task_count = 0; ObTimeGuard time_guard("slow pread", DEFAULT_PREAD_TIME_GUARD_THRESHOLD); ObLogExternalStorageIOTaskCtx *async_task_ctx = NULL; - int64_t file_size = palf::PALF_PHY_BLOCK_SIZE; + int64_t file_size = 0; int64_t real_read_buf_size = 0; RLockGuard guard(resize_rw_lock_); time_guard.click("after hold by lock"); @@ -169,9 +174,12 @@ int ObLogExternalStorageHandler::pread(const common::ObString &uri, } else if (!is_running_) { ret = OB_NOT_RUNNING; CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size)); - } else if (uri.empty() || storage_info.empty() || 0 > offset || NULL == buf || 0 >= read_buf_size) { + // when uri is NFS, storage_info is empty. + } else if (uri.empty() || 0 > offset || NULL == buf || 0 >= read_buf_size) { ret = OB_INVALID_ARGUMENT; CLOG_LOG(WARN, "ObLogExternalStorageHandler invalid argument", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size)); + } else if (OB_FAIL(handle_adapter_->get_file_size(uri, storage_info, file_size))) { + CLOG_LOG(WARN, "get_file_size failed", K(uri), K(storage_info), K(offset), KP(buf), K(read_buf_size)); } else if (offset >= file_size) { ret = OB_INVALID_ARGUMENT; CLOG_LOG(WARN, "read position lager than file size, invalid argument", K(file_size), K(offset), K(uri)); @@ -223,6 +231,11 @@ void ObLogExternalStorageHandler::handle(void *task) } } +int64_t ObLogExternalStorageHandler::get_recommend_concurrency_in_single_file() const +{ + return palf::PALF_PHY_BLOCK_SIZE / SINGLE_TASK_MINIMUM_SIZE; +} + bool ObLogExternalStorageHandler::is_valid_concurrency_(const int64_t concurrency) const { return 0 <= concurrency && CONCURRENCY_LIMIT >= concurrency; @@ -390,6 +403,7 @@ void ObLogExternalStorageHandler::destroy_and_init_new_thread_pool_( do { if (0 != new_concurrency + && !FALSE_IT(share::ObThreadPool::set_run_wrapper(MTL_CTX())) && OB_FAIL(ObSimpleThreadPool::init( new_concurrency, CAPACITY_COEFFICIENT * new_concurrency, "ObLogEXTTP", MTL_ID()))) { CLOG_LOG(WARN, "init ObSimpleThreadPool failed", K(new_concurrency), KPC(this)); @@ -405,5 +419,11 @@ void ObLogExternalStorageHandler::destroy_and_init_new_thread_pool_( CLOG_LOG_RET(WARN, OB_SUCCESS, "destroy_and_init_new_thread_pool_ success", K(time_guard), KPC(this)); } +bool ObLogExternalStorageHandler::check_need_resize_(const int64_t new_concurrency) const +{ + RLockGuard guard(resize_rw_lock_); + return new_concurrency != concurrency_; +} + } // end namespace logservice } // end namespace oceanbase diff --git a/src/logservice/ob_log_external_storage_handler.h b/src/logservice/ob_log_external_storage_handler.h index a96002cf8..f7627d90a 100644 --- a/src/logservice/ob_log_external_storage_handler.h +++ b/src/logservice/ob_log_external_storage_handler.h @@ -114,6 +114,8 @@ public: void handle(void *task) override final; + int64_t get_recommend_concurrency_in_single_file() const; + TO_STRING_KV(K_(concurrency), K_(capacity), K_(is_running), K_(is_inited), KP(handle_adapter_), KP(this)); private: // CONCURRENCY LIMIT is 128. @@ -156,6 +158,8 @@ private: void destroy_and_init_new_thread_pool_(const int64_t concurrency); + bool check_need_resize_(const int64_t concurrency) const; + private: typedef common::RWLock RWLock; typedef RWLock::RLockGuard RLockGuard; @@ -163,7 +167,7 @@ private: typedef RWLock::WLockGuardWithTimeout WLockGuardTimeout; int64_t concurrency_; int64_t capacity_; - RWLock resize_rw_lock_; + mutable RWLock resize_rw_lock_; ObSpinLock construct_async_task_lock_; ObLogExternalStorageIOTaskHandleIAdapter *handle_adapter_; bool is_running_; diff --git a/src/logservice/ob_log_external_storage_io_task.cpp b/src/logservice/ob_log_external_storage_io_task.cpp index 08fb18783..ff6c8ca59 100644 --- a/src/logservice/ob_log_external_storage_io_task.cpp +++ b/src/logservice/ob_log_external_storage_io_task.cpp @@ -129,7 +129,7 @@ int ObLogExternalStorageIOTaskCtx::get_ret_code() const { int ret = OB_SUCCESS; for (int64_t i = 0; i < total_task_count_; i++) { - if (OB_SUCCESS != running_status_[i].ret_) { + if (0 == i && OB_SUCCESS != running_status_[i].ret_) { ret = running_status_[i].ret_; CLOG_LOG(WARN, "asyn task execute failed", KPC(this)); break; @@ -336,7 +336,7 @@ int ObLogExternalStorageIOTaskHandleAdapter::pread(const ObString &uri, const int64_t read_buf_size, int64_t &real_read_size) { - ObTimeGuard time_guard("oss pread", 200 * 1000); + ObTimeGuard time_guard("oss pread", 100 * 1000); int ret = OB_SUCCESS; real_read_size = 0; ObIODevice *io_device = NULL; @@ -348,7 +348,7 @@ int ObLogExternalStorageIOTaskHandleAdapter::pread(const ObString &uri, } else if (FALSE_IT(time_guard.click("after open_io_fd"))) { CLOG_LOG(WARN, "open_io_fd failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size)); } else if (OB_FAIL(io_device->pread(io_fd, offset, read_buf_size, buf, real_read_size))) { - CLOG_LOG(WARN, "pread failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size)); + CLOG_LOG(WARN, "pread failed", K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size), K(time_guard)); } else if (FALSE_IT(time_guard.click("after pread"))) { } else { CLOG_LOG(TRACE, "pread success", K(time_guard), K(io_fd), K(offset), K(read_buf_size), KP(buf), K(real_read_size)); diff --git a/src/logservice/restoreservice/ob_log_restore_archive_driver.cpp b/src/logservice/restoreservice/ob_log_restore_archive_driver.cpp index 1fc2f17a9..032867b47 100644 --- a/src/logservice/restoreservice/ob_log_restore_archive_driver.cpp +++ b/src/logservice/restoreservice/ob_log_restore_archive_driver.cpp @@ -114,18 +114,12 @@ int ObLogRestoreArchiveDriver::check_need_schedule_(ObLS &ls, int64_t &task_count) { int ret = OB_SUCCESS; - int64_t concurrency = 0; ObRemoteFetchContext context; ObLogRestoreHandler *restore_handler = NULL; bool need_delay = false; need_schedule = false; - omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_)); - const int64_t log_restore_concurrency = tenant_config.is_valid() ? tenant_config->log_restore_concurrency : 1L; - if (0 == log_restore_concurrency) { - concurrency = std::min(get_restore_concurrency_by_max_cpu(), MAX_LS_FETCH_LOG_TASK_CONCURRENCY); - } else { - concurrency = std::min(log_restore_concurrency, MAX_LS_FETCH_LOG_TASK_CONCURRENCY); - } + int64_t concurrency = 0; + int64_t fetch_log_worker_count = 0; if (OB_ISNULL(restore_handler = ls.get_log_restore_handler())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("get restore_handler failed", K(ret), "id", ls.get_ls_id()); @@ -133,6 +127,9 @@ int ObLogRestoreArchiveDriver::check_need_schedule_(ObLS &ls, LOG_WARN("get fetch log context failed", K(ret), K(ls)); } else if (! need_schedule) { // do nothing + } else if (OB_FAIL(worker_->get_thread_count(fetch_log_worker_count))) { + LOG_WARN("get_thread_count from worker_ failed", K(ret), K(ls)); + } else if (FALSE_IT(concurrency = std::min(fetch_log_worker_count, MAX_LS_FETCH_LOG_TASK_CONCURRENCY))) { } else if (context.issue_task_num_ >= concurrency) { need_schedule = false; } else if (OB_FAIL(check_need_delay_(ls.get_ls_id(), need_delay))) { diff --git a/src/logservice/restoreservice/ob_log_restore_define.cpp b/src/logservice/restoreservice/ob_log_restore_define.cpp index 6495aa27c..4acc83009 100644 --- a/src/logservice/restoreservice/ob_log_restore_define.cpp +++ b/src/logservice/restoreservice/ob_log_restore_define.cpp @@ -80,10 +80,5 @@ bool ObLogRestoreSourceTenant::is_valid() const && ip_list_.count() > 0; } -int64_t get_restore_concurrency_by_max_cpu() -{ - return static_cast(MTL_CPU_COUNT() + 7) / 8; -} - } // namespace logservice } // namespace oceanbase diff --git a/src/logservice/restoreservice/ob_log_restore_define.h b/src/logservice/restoreservice/ob_log_restore_define.h index ab43bba68..946d7b36d 100644 --- a/src/logservice/restoreservice/ob_log_restore_define.h +++ b/src/logservice/restoreservice/ob_log_restore_define.h @@ -26,8 +26,6 @@ namespace oceanbase namespace logservice { const int64_t MAX_FETCH_LOG_BUF_LEN = 4 * 1024 * 1024L; -const int64_t MIN_FETCH_LOG_WORKER_THREAD_COUNT = 1; -const int64_t MAX_FETCH_LOG_WORKER_THREAD_COUNT = 10; const int64_t MAX_LS_FETCH_LOG_TASK_CONCURRENCY = 4; struct ObLogRestoreErrorContext @@ -81,8 +79,6 @@ struct ObLogRestoreSourceTenant final K_(is_oracle), K_(ip_list)); }; - -int64_t get_restore_concurrency_by_max_cpu(); } // namespace logservice } // namespace oceanbase diff --git a/src/logservice/restoreservice/ob_log_restore_scheduler.cpp b/src/logservice/restoreservice/ob_log_restore_scheduler.cpp index 4dc2311b8..1a29c8ed2 100644 --- a/src/logservice/restoreservice/ob_log_restore_scheduler.cpp +++ b/src/logservice/restoreservice/ob_log_restore_scheduler.cpp @@ -78,21 +78,22 @@ int ObLogRestoreScheduler::modify_thread_count_(const share::ObLogRestoreSourceT { int ret = OB_SUCCESS; int64_t restore_concurrency = 0; - omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_)); - const int64_t log_restore_concurrency = - tenant_config.is_valid() ? tenant_config->log_restore_concurrency : 1L; - // primary tenant or log restore source not location type, restore_concurrency is 1 - // parameter log_restore_concurrency is default zero, set restore_concurrency = max_cpu / 8, rounded up - // parameter log_restore_concurrency not zero, set restore_concurrency = log_restore_concurrency + // for primary tenant, set restore_concurrency to 1. + // otherwise, set restore_concurrency to tenant config. if (MTL_GET_TENANT_ROLE() == share::ObTenantRole::PRIMARY_TENANT || !share::is_location_log_source_type(source_type)) { restore_concurrency = 1; - } else if (0 == log_restore_concurrency) { - restore_concurrency = get_restore_concurrency_by_max_cpu(); } else { - restore_concurrency = log_restore_concurrency; + omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_)); + if (!tenant_config.is_valid()) { + restore_concurrency = 1L; + } else if (0 == tenant_config->log_restore_concurrency) { + restore_concurrency = MTL_CPU_COUNT(); + } else { + restore_concurrency = tenant_config->log_restore_concurrency; + } } - if (OB_FAIL(worker_->modify_thread_count(std::max(1L, restore_concurrency)))) { + if (OB_FAIL(worker_->modify_thread_count(restore_concurrency))) { CLOG_LOG(WARN, "modify worker thread failed", K(ret)); } return ret; diff --git a/src/logservice/restoreservice/ob_log_restore_service.h b/src/logservice/restoreservice/ob_log_restore_service.h index fc8858543..360d6a2b4 100644 --- a/src/logservice/restoreservice/ob_log_restore_service.h +++ b/src/logservice/restoreservice/ob_log_restore_service.h @@ -14,18 +14,18 @@ #define OCEANBASE_LOGSERVICE_OB_LOG_RESTORE_SERVICE_H_ #include "lib/utility/ob_macro_utils.h" -#include "rpc/frame/ob_req_transport.h" // ObReqTransport -#include "share/ob_thread_pool.h" // ObThreadPool -#include "ob_remote_fetch_log.h" // ObRemoteFetchLogImpl -#include "ob_log_restore_rpc.h" // ObLogResSvrRpc -#include "ob_remote_fetch_log_worker.h" // ObRemoteFetchWorker -#include "ob_remote_location_adaptor.h" // ObRemoteLocationAdaptor -#include "ob_remote_error_reporter.h" // ObRemoteErrorReporter -#include "ob_log_restore_allocator.h" // ObLogRestoreAllocator -#include "ob_log_restore_scheduler.h" // ObLogRestoreScheduler -#include "ob_log_restore_controller.h" // ObLogRestoreController -#include "ob_log_restore_net_driver.h" // ObLogRestoreNetDriver -#include "ob_log_restore_archive_driver.h" // ObLogRestoreArchiveDriver +#include "rpc/frame/ob_req_transport.h" // ObReqTransport +#include "share/ob_thread_pool.h" // ObThreadPool +#include "ob_remote_fetch_log.h" // ObRemoteFetchLogImpl +#include "ob_log_restore_rpc.h" // ObLogResSvrRpc +#include "ob_remote_fetch_log_worker.h" // ObRemoteFetchWorker +#include "ob_remote_location_adaptor.h" // ObRemoteLocationAdaptor +#include "ob_remote_error_reporter.h" // ObRemoteErrorReporter +#include "ob_log_restore_allocator.h" // ObLogRestoreAllocator +#include "ob_log_restore_scheduler.h" // ObLogRestoreScheduler +#include "ob_log_restore_controller.h" // ObLogRestoreController +#include "ob_log_restore_net_driver.h" // ObLogRestoreNetDriver +#include "ob_log_restore_archive_driver.h" // ObLogRestoreArchiveDriver namespace oceanbase { diff --git a/src/logservice/restoreservice/ob_remote_data_generator.cpp b/src/logservice/restoreservice/ob_remote_data_generator.cpp index 95e5862e4..5e9634823 100644 --- a/src/logservice/restoreservice/ob_remote_data_generator.cpp +++ b/src/logservice/restoreservice/ob_remote_data_generator.cpp @@ -38,14 +38,16 @@ RemoteDataGenerator::RemoteDataGenerator(const uint64_t tenant_id, const ObLSID &id, const LSN &start_lsn, const LSN &end_lsn, - const SCN &end_scn) : + const SCN &end_scn, + ObLogExternalStorageHandler *log_ext_handler) : tenant_id_(tenant_id), id_(id), start_lsn_(start_lsn), next_fetch_lsn_(start_lsn), end_scn_(end_scn), end_lsn_(end_lsn), - to_end_(false) + to_end_(false), + log_ext_handler_(log_ext_handler) {} RemoteDataGenerator::~RemoteDataGenerator() @@ -55,6 +57,7 @@ RemoteDataGenerator::~RemoteDataGenerator() start_lsn_.reset(); next_fetch_lsn_.reset(); end_lsn_.reset(); + log_ext_handler_ = NULL; } bool RemoteDataGenerator::is_valid() const @@ -64,7 +67,8 @@ bool RemoteDataGenerator::is_valid() const && start_lsn_.is_valid() && end_scn_.is_valid() && end_lsn_.is_valid() - && end_lsn_ > start_lsn_; + && end_lsn_ > start_lsn_ + && log_ext_handler_ != NULL; } bool RemoteDataGenerator::is_fetch_to_end() const @@ -108,8 +112,9 @@ ServiceDataGenerator::ServiceDataGenerator(const uint64_t tenant_id, const LSN &start_lsn, const LSN &end_lsn, const SCN &end_scn, - const ObAddr &server) : - RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn), + const ObAddr &server, + ObLogExternalStorageHandler *log_ext_handler) : + RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn, log_ext_handler), server_(server), result_() {} @@ -184,34 +189,6 @@ static int64_t cal_lsn_to_file_id_(const LSN &lsn) return cal_archive_file_id(lsn, palf::PALF_BLOCK_SIZE); } -static int read_file_(const ObString &base, - const share::ObBackupStorageInfo *storage_info, - const share::ObLSID &id, - const int64_t file_id, - const int64_t offset, - char *data, - const int64_t data_len, - int64_t &data_size) -{ - int ret = OB_SUCCESS; - share::ObBackupPath path; - if (OB_FAIL(ObArchivePathUtil::build_restore_path(base.ptr(), id, file_id, path))) { - LOG_WARN("build restore path failed", K(ret)); - } else { - ObString uri(path.get_obstr()); - int64_t real_size = 0; - if (OB_FAIL(ObArchiveFileUtils::range_read(uri, storage_info, data, data_len, offset, real_size))) { - LOG_WARN("read file failed", K(ret), K(uri), K(storage_info)); - } else if (0 == real_size) { - ret = OB_ITER_END; - LOG_INFO("read no data, need retry", K(ret), K(uri), K(storage_info), K(offset), K(real_size)); - } else { - data_size = real_size; - } - } - return ret; -} - static int extract_archive_file_header_(char *buf, const int64_t buf_size, palf::LSN &lsn) @@ -263,8 +240,9 @@ LocationDataGenerator::LocationDataGenerator(const uint64_t tenant_id, ObLogArchivePieceContext *piece_context, char *buf, const int64_t buf_size, - const int64_t single_read_size) : - RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn), + const int64_t single_read_size, + ObLogExternalStorageHandler *log_ext_handler) : + RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn, log_ext_handler), pre_scn_(pre_scn), base_lsn_(), data_len_(0), @@ -477,6 +455,40 @@ void LocationDataGenerator::cal_read_size_(const int64_t dest_id, } } +int LocationDataGenerator::read_file_(const ObString &base, + const share::ObBackupStorageInfo *storage_info, + const share::ObLSID &id, + const int64_t file_id, + const int64_t offset, + char *data, + const int64_t data_len, + int64_t &data_size) +{ + int ret = OB_SUCCESS; + share::ObBackupPath path; + if (OB_FAIL(ObArchivePathUtil::build_restore_path(base.ptr(), id, file_id, path))) { + LOG_WARN("build restore path failed", K(ret)); + } else { + ObString uri(path.get_obstr()); + char storage_info_cstr[OB_MAX_BACKUP_STORAGE_INFO_LENGTH] = {'\0'}; + int64_t real_size = 0; + if (OB_FAIL(storage_info->get_storage_info_str(storage_info_cstr, OB_MAX_BACKUP_STORAGE_INFO_LENGTH, false))) { + LOG_WARN("get_storage_info_str failed", K(ret), K(uri), K(storage_info)); + } else { + ObString storage_info_ob_str(storage_info_cstr); + if (OB_FAIL(log_ext_handler_->pread(uri, storage_info_ob_str, offset, data, data_len, real_size))) { + LOG_WARN("read file failed", K(ret), K(uri), K(storage_info)); + } else if (0 == real_size) { + ret = OB_ITER_END; + LOG_INFO("read no data, need retry", K(ret), K(uri), K(storage_info), K(offset), K(real_size)); + } else { + data_size = real_size; + } + } + } + return ret; +} + bool LocationDataGenerator::FileDesc::is_valid() const { return dest_id_ > 0 @@ -586,8 +598,9 @@ RawPathDataGenerator::RawPathDataGenerator(const uint64_t tenant_id, const SCN &end_scn, const int64_t piece_index, const int64_t min_file_id, - const int64_t max_file_id) : - RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn), + const int64_t max_file_id, + ObLogExternalStorageHandler *log_ext_handler) : + RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn, log_ext_handler), array_(array), data_len_(0), file_id_(0), diff --git a/src/logservice/restoreservice/ob_remote_data_generator.h b/src/logservice/restoreservice/ob_remote_data_generator.h index af889ffce..587ba061b 100644 --- a/src/logservice/restoreservice/ob_remote_data_generator.h +++ b/src/logservice/restoreservice/ob_remote_data_generator.h @@ -40,6 +40,7 @@ class LogGroupEntry; } namespace logservice { +class ObLogExternalStorageHandler; using oceanbase::palf::LSN; using oceanbase::palf::LogGroupEntry; using oceanbase::share::ObLSID; @@ -142,7 +143,8 @@ public: const ObLSID &id, const LSN &start_lsn, const LSN &end_lsn, - const share::SCN &end_scn); + const share::SCN &end_scn, + logservice::ObLogExternalStorageHandler *log_ext_handler); virtual ~RemoteDataGenerator(); public: @@ -166,6 +168,7 @@ protected: share::SCN end_scn_; LSN end_lsn_; bool to_end_; + logservice::ObLogExternalStorageHandler *log_ext_handler_; private: DISALLOW_COPY_AND_ASSIGN(RemoteDataGenerator); @@ -179,7 +182,8 @@ public: const LSN &start_lsn, const LSN &end_lsn, const share::SCN &end_scn, - const ObAddr &server); + const ObAddr &server, + logservice::ObLogExternalStorageHandler *log_ext_handler); virtual ~ServiceDataGenerator(); public: @@ -213,7 +217,8 @@ public: ObLogArchivePieceContext *piece_context, char *buf, const int64_t buf_size, - const int64_t single_read_size); + const int64_t single_read_size, + logservice::ObLogExternalStorageHandler *log_ext_handler); ~LocationDataGenerator(); int next_buffer(palf::LSN &lsn, char *&buf, int64_t &buf_size); int update_max_lsn(const palf::LSN &lsn); @@ -282,6 +287,14 @@ private: const int64_t file_id, const int64_t file_offset, int64_t &size); + int read_file_(const ObString &base, + const share::ObBackupStorageInfo *storage_info, + const share::ObLSID &id, + const int64_t file_id, + const int64_t offset, + char *data, + const int64_t data_len, + int64_t &real_read_size); private: share::SCN pre_scn_; // base_lsn_ is the start_lsn from the archive file, while the next_fetch_lsn_ is the start_lsn to fetch, @@ -313,7 +326,8 @@ public: const share::SCN &end_scn, const int64_t piece_index, const int64_t min_file_id, - const int64_t max_file_id); + const int64_t max_file_id, + logservice::ObLogExternalStorageHandler *log_ext_handler); virtual ~RawPathDataGenerator(); int next_buffer(palf::LSN &lsn, char *&buf, int64_t &buf_size); diff --git a/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp b/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp index e1087cb26..166fe8cd5 100644 --- a/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp +++ b/src/logservice/restoreservice/ob_remote_fetch_log_worker.cpp @@ -65,6 +65,7 @@ ObRemoteFetchWorker::ObRemoteFetchWorker() : ls_svr_(NULL), task_queue_(), allocator_(NULL), + log_ext_handler_(), cond_() {} @@ -95,6 +96,8 @@ int ObRemoteFetchWorker::init(const uint64_t tenant_id, K(allocator), K(restore_service), K(ls_svr)); } else if (OB_FAIL(task_queue_.init(FETCH_LOG_TASK_LIMIT, "RFLTaskQueue", MTL_ID()))) { LOG_WARN("task_queue_ init failed", K(ret)); + } else if (OB_FAIL(log_ext_handler_.init())) { + LOG_WARN("log_ext_handler_ init failed", K(ret)); } else { tenant_id_ = tenant_id; allocator_ = allocator; @@ -129,6 +132,7 @@ void ObRemoteFetchWorker::destroy() ls_svr_ = NULL; allocator_ = NULL; restore_controller_ = NULL; + log_ext_handler_.destroy(); inited_ = false; } } @@ -140,6 +144,8 @@ int ObRemoteFetchWorker::start() if (OB_UNLIKELY(! inited_)) { ret = OB_NOT_INIT; LOG_ERROR("ObRemoteFetchWorker not init", K(ret)); + } else if (OB_FAIL(log_ext_handler_.start(0))) { + LOG_WARN("ObLogExtStorageHandler start failed"); } else if (OB_FAIL(ObThreadPool::start())) { LOG_WARN("ObRemoteFetchWorker start failed", K(ret)); } else { @@ -151,12 +157,14 @@ int ObRemoteFetchWorker::start() void ObRemoteFetchWorker::stop() { LOG_INFO("ObRemoteFetchWorker thread stop", K_(tenant_id)); + log_ext_handler_.stop(); ObThreadPool::stop(); } void ObRemoteFetchWorker::wait() { LOG_INFO("ObRemoteFetchWorker thread wait", K_(tenant_id)); + log_ext_handler_.wait(); ObThreadPool::wait(); } @@ -183,21 +191,31 @@ int ObRemoteFetchWorker::submit_fetch_log_task(ObFetchLogTask *task) return ret; } -int ObRemoteFetchWorker::modify_thread_count(const int64_t thread_count) +int ObRemoteFetchWorker::modify_thread_count(const int64_t log_restore_concurrency) { int ret = OB_SUCCESS; - int64_t count = thread_count; - if (thread_count < MIN_FETCH_LOG_WORKER_THREAD_COUNT) { - count = MIN_FETCH_LOG_WORKER_THREAD_COUNT; - } else if (thread_count > MAX_FETCH_LOG_WORKER_THREAD_COUNT) { - count = MAX_FETCH_LOG_WORKER_THREAD_COUNT; - } - if (count == get_thread_count()) { + int64_t thread_count = 0; + if (OB_FAIL(log_ext_handler_.resize(log_restore_concurrency))) { + LOG_WARN("log_ext_handler_ resize failed", K(log_restore_concurrency), K(thread_count)); + } else if (FALSE_IT(thread_count = calcuate_thread_count_(log_restore_concurrency))) { + LOG_WARN("calcuate_thread_count_ failed", K(log_restore_concurrency), K(thread_count)); + } else if (thread_count == lib::Threads::get_thread_count()) { // do nothing - } else if (OB_FAIL(set_thread_count(count))) { + } else if (OB_FAIL(set_thread_count(thread_count))) { LOG_WARN("set thread count failed", K(ret)); } else { - LOG_INFO("set thread count succ", K(count)); + LOG_INFO("set thread count succ", K(thread_count), K(log_restore_concurrency)); + } + return ret; +} + +int ObRemoteFetchWorker::get_thread_count(int64_t &thread_count) const +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(! inited_)) { + LOG_WARN("ObRemoteFetchWorker not init"); + } else { + thread_count = lib::Threads::get_thread_count(); } return ret; } @@ -228,7 +246,7 @@ void ObRemoteFetchWorker::do_thread_task_() { int ret = OB_SUCCESS; int64_t size = task_queue_.size(); - if (0 != get_thread_idx() || get_thread_count() <= 1) { + if (0 != get_thread_idx() || lib::Threads::get_thread_count() <= 1) { for (int64_t i = 0; i < size && OB_SUCC(ret) && !has_set_stop(); i++) { if (OB_FAIL(handle_single_task_())) { LOG_WARN("handle single task failed", K(ret)); @@ -292,7 +310,8 @@ int ObRemoteFetchWorker::handle_fetch_log_task_(ObFetchLogTask *task) ret = OB_INVALID_ARGUMENT; LOG_ERROR("invalid argument", K(ret), K(task)); } else if (OB_FAIL(task->iter_.init(tenant_id_, task->id_, task->pre_scn_, - task->cur_lsn_, task->end_lsn_, allocator_->get_buferr_pool(), DEFAULT_BUF_SIZE))) { + task->cur_lsn_, task->end_lsn_, allocator_->get_buferr_pool(), + &log_ext_handler_, DEFAULT_BUF_SIZE))) { LOG_WARN("ObRemoteLogIterator init failed", K(ret), K_(tenant_id), KPC(task)); } else if (!need_fetch_log_(task->id_)) { LOG_TRACE("no need fetch log", KPC(task)); @@ -599,5 +618,14 @@ void ObRemoteFetchWorker::report_error_(const ObLSID &id, } } +int64_t ObRemoteFetchWorker::calcuate_thread_count_(const int64_t log_restore_concurrency) +{ + int64_t thread_count = 0; + int64_t recommend_concurrency_in_single_file = log_ext_handler_.get_recommend_concurrency_in_single_file(); + thread_count = static_cast( + log_restore_concurrency + recommend_concurrency_in_single_file - 1) / recommend_concurrency_in_single_file; + return thread_count; +} + } // namespace logservice } // namespace oceanbase diff --git a/src/logservice/restoreservice/ob_remote_fetch_log_worker.h b/src/logservice/restoreservice/ob_remote_fetch_log_worker.h index 6f3879ae4..8de5da5f2 100644 --- a/src/logservice/restoreservice/ob_remote_fetch_log_worker.h +++ b/src/logservice/restoreservice/ob_remote_fetch_log_worker.h @@ -13,11 +13,12 @@ #ifndef OCEANBASE_LOGSERVICE_OB_REMOTE_FETCH_LOG_WORKER_H_ #define OCEANBASE_LOGSERVICE_OB_REMOTE_FETCH_LOG_WORKER_H_ -#include "lib/queue/ob_lighty_queue.h" // ObLightyQueue -#include "common/ob_queue_thread.h" // ObCond -#include "share/ob_thread_pool.h" // ObThreadPool -#include "share/ob_ls_id.h" // ObLSID -#include "ob_remote_log_iterator.h" // ObRemoteLogGroupEntryIterator +#include "lib/queue/ob_lighty_queue.h" // ObLightyQueue +#include "common/ob_queue_thread.h" // ObCond +#include "share/ob_thread_pool.h" // ObThreadPool +#include "share/ob_ls_id.h" // ObLSID +#include "ob_remote_log_iterator.h" // ObRemoteLogGroupEntryIterator +#include "logservice/ob_log_external_storage_handler.h" // ObLogExternalStorageHandler namespace oceanbase { @@ -71,7 +72,8 @@ public: // @retval other code unexpected error int submit_fetch_log_task(ObFetchLogTask *task); - int modify_thread_count(const int64_t count); + int modify_thread_count(const int64_t log_restore_concurrency); + int get_thread_count(int64_t &thread_count) const; private: void run1(); @@ -99,6 +101,7 @@ private: const int ret_code, const palf::LSN &lsn, const ObLogRestoreErrorContext::ErrorType &error_type); + int64_t calcuate_thread_count_(const int64_t log_restore_concurrency); private: bool inited_; uint64_t tenant_id_; @@ -107,6 +110,7 @@ private: storage::ObLSService *ls_svr_; common::ObLightyQueue task_queue_; ObLogRestoreAllocator *allocator_; + ObLogExternalStorageHandler log_ext_handler_; common::ObCond cond_; private: diff --git a/src/logservice/restoreservice/ob_remote_log_iterator.h b/src/logservice/restoreservice/ob_remote_log_iterator.h index 83fde4c24..09bf09800 100644 --- a/src/logservice/restoreservice/ob_remote_log_iterator.h +++ b/src/logservice/restoreservice/ob_remote_log_iterator.h @@ -33,6 +33,7 @@ #include "share/backup/ob_backup_struct.h" #include "share/rc/ob_tenant_base.h" #include "logservice/archiveservice/large_buffer_pool.h" +#include "logservice/ob_log_external_storage_handler.h" // ObLogExternalHandler namespace oceanbase { @@ -81,6 +82,7 @@ public: const LSN &start_lsn, const LSN &end_lsn, archive::LargeBufferPool *buffer_pool, + logservice::ObLogExternalStorageHandler *log_ext_handler, const int64_t single_read_size = DEFAULT_SINGLE_READ_SIZE); // @brief used as local iterator, get one entry if not to end // @param[out] entry LogGroupEntry or LogEntry @@ -138,6 +140,7 @@ private: char *buf_; int64_t buf_size_; archive::LargeBufferPool *buffer_pool_; + logservice::ObLogExternalStorageHandler *log_ext_handler_; GetSourceFunc get_source_func_; UpdateSourceFunc update_source_func_; RefreshStorageInfoFunc refresh_storage_info_func_; diff --git a/src/logservice/restoreservice/ob_remote_log_iterator.ipp b/src/logservice/restoreservice/ob_remote_log_iterator.ipp index bda296b39..dfb071f8b 100644 --- a/src/logservice/restoreservice/ob_remote_log_iterator.ipp +++ b/src/logservice/restoreservice/ob_remote_log_iterator.ipp @@ -28,6 +28,7 @@ ObRemoteLogIterator::ObRemoteLogIterator(GetSourceFunc &get_source buf_(NULL), buf_size_(0), buffer_pool_(NULL), + log_ext_handler_(NULL), get_source_func_(get_source_func), update_source_func_(update_source_func), refresh_storage_info_func_(refresh_storage_info_func) @@ -46,6 +47,7 @@ int ObRemoteLogIterator::init(const uint64_t tenant_id, const LSN &start_lsn, const LSN &end_lsn, archive::LargeBufferPool *buffer_pool, + logservice::ObLogExternalStorageHandler *log_ext_handler, const int64_t single_read_size) { int ret = OB_SUCCESS; @@ -59,10 +61,11 @@ int ObRemoteLogIterator::init(const uint64_t tenant_id, || ! id.is_valid() || ! start_lsn.is_valid() || (end_lsn.is_valid() && end_lsn <= start_lsn) + || NULL == log_ext_handler || single_read_size < 2 * 1024 * 1024)) { // TODO set size ret = OB_INVALID_ARGUMENT; - CLOG_LOG(WARN, "invalid argument", K(ret), K(buffer_pool), - K(tenant_id), K(id), K(start_lsn), K(end_lsn)); + CLOG_LOG(WARN, "invalid argument", K(ret), KP(buffer_pool), + K(tenant_id), K(id), K(start_lsn), K(end_lsn), KP(log_ext_handler)); } else if (OB_FAIL(get_source_func_(id, source_guard_))) { CLOG_LOG(WARN, "get source failed", K(ret), K(id)); } else if (OB_ISNULL(source = source_guard_.get_source())) { @@ -82,6 +85,7 @@ int ObRemoteLogIterator::init(const uint64_t tenant_id, start_lsn_ = start_lsn; end_lsn_ = end_lsn; single_read_size_ = single_read_size; + log_ext_handler_ = log_ext_handler; ret = build_data_generator_(pre_scn, source, refresh_storage_info_func_); CLOG_LOG(INFO, "ObRemoteLogIterator init", K(ret), K(tenant_id), K(id), K(pre_scn), K(start_lsn), K(end_lsn)); } @@ -126,6 +130,8 @@ void ObRemoteLogIterator::reset() buf_size_ = 0; } + log_ext_handler_ = NULL; + id_.reset(); start_lsn_.reset(); cur_lsn_.reset(); @@ -200,7 +206,7 @@ int ObRemoteLogIterator::build_dest_data_generator_(const share::S source->get(array, end_scn); source->get_locate_info(piece_index, min_file_id, max_file_id); gen_ = MTL_NEW(RawPathDataGenerator, "ResDataGen", tenant_id_, id_, start_lsn_, end_lsn_, - array, end_scn, piece_index, min_file_id, max_file_id); + array, end_scn, piece_index, min_file_id, max_file_id, log_ext_handler_); if (OB_ISNULL(gen_)) { ret = OB_ALLOCATE_MEMORY_FAILED; CLOG_LOG(WARN, "alloc dest data generator failed", K(ret), KPC(this)); @@ -221,7 +227,7 @@ int ObRemoteLogIterator::build_location_data_generator_(const shar source->get(dest, piece_context, end_scn); gen_ = MTL_NEW(LocationDataGenerator, "ResDataGen", tenant_id_, pre_scn, id_, start_lsn_, end_lsn_, end_scn, dest, piece_context, buf_, buf_size_, - single_read_size_); + single_read_size_, log_ext_handler_); if (OB_ISNULL(gen_)) { ret = OB_ALLOCATE_MEMORY_FAILED; CLOG_LOG(WARN, "alloc location data generator failed", K(ret), KPC(this)); diff --git a/unittest/logservice/test_log_external_storage_handler.cpp b/unittest/logservice/test_log_external_storage_handler.cpp index 79f754d05..4fd5fb3d1 100644 --- a/unittest/logservice/test_log_external_storage_handler.cpp +++ b/unittest/logservice/test_log_external_storage_handler.cpp @@ -140,7 +140,7 @@ TEST(TestLogExternalStorageHandler, test_log_external_storage_handler) handler.handle_adapter_ = &adapter; EXPECT_EQ(true, handler.is_inited_); EXPECT_EQ(OB_NOT_RUNNING, handler.pread(uri, storage_info, offset, read_buf, read_buf_size, real_read_size)); - EXPECT_EQ(OB_NOT_RUNNING, handler.resize(16, 0)); + EXPECT_EQ(OB_NOT_RUNNING, handler.resize(16, 100)); // 测试invalid argument @@ -159,8 +159,9 @@ TEST(TestLogExternalStorageHandler, test_log_external_storage_handler) { ObString empty_uri; EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(empty_uri, storage_info, offset, read_buf, read_buf_size, real_read_size)); + // NFS的storage info为empty ObString empty_storage_info; - EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(uri, empty_storage_info, offset, read_buf, read_buf_size, real_read_size)); + EXPECT_EQ(OB_SUCCESS, handler.pread(uri, empty_storage_info, offset, read_buf, read_buf_size, real_read_size)); int64_t invalid_offset = -1; EXPECT_EQ(OB_INVALID_ARGUMENT, handler.pread(uri, storage_info, invalid_offset, read_buf, read_buf_size, real_read_size)); invalid_offset = 100*1024*1024; @@ -175,7 +176,7 @@ TEST(TestLogExternalStorageHandler, test_log_external_storage_handler) { int64_t invalid_concurrency = -1; EXPECT_EQ(OB_INVALID_ARGUMENT, handler.resize(invalid_concurrency, 0)); - int64_t invalid_timeout_us = -1; + int64_t invalid_timeout_us = 0; EXPECT_EQ(OB_INVALID_ARGUMENT, handler.resize(concurrency, invalid_timeout_us)); } @@ -488,7 +489,7 @@ int main(int argc, char **argv) { system("rm -rf test_log_external_storage_handler.log*"); OB_LOGGER.set_file_name("test_log_external_storage_handler.log", true); - OB_LOGGER.set_log_level("TRACE"); + OB_LOGGER.set_log_level("WDIAG"); srandom(ObTimeUtility::current_time()); PALF_LOG(INFO, "begin unittest::test_log_external_storage_handler"); ::testing::InitGoogleTest(&argc, argv);