[FEAT MERGE] Optimization for CdcService when Reading Archivelog
This commit is contained in:
committed by
ob-robot
parent
f436b6e223
commit
41b1ac65f9
@ -500,6 +500,11 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
|
|||||||
LogGroupEntry log_group_entry;
|
LogGroupEntry log_group_entry;
|
||||||
LSN lsn;
|
LSN lsn;
|
||||||
FetchMode fetch_mode = get_fetch_mode_when_fetching_log_(ctx, fetch_archive_only);
|
FetchMode fetch_mode = get_fetch_mode_when_fetching_log_(ctx, fetch_archive_only);
|
||||||
|
if (fetch_mode != ctx.get_fetch_mode()) {
|
||||||
|
// when in force_fetch_archive mode, if we don't set fetch mode here,
|
||||||
|
// the ability of reading archive log concurrently can't be utilized
|
||||||
|
ctx.set_fetch_mode(fetch_mode, "ModeConsistence");
|
||||||
|
}
|
||||||
int64_t finish_fetch_ts = OB_INVALID_TIMESTAMP;
|
int64_t finish_fetch_ts = OB_INVALID_TIMESTAMP;
|
||||||
// update fetching rounds
|
// update fetching rounds
|
||||||
scan_round_count++;
|
scan_round_count++;
|
||||||
@ -603,9 +608,9 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
|
|||||||
if (OB_SUCC(ret) && fetch_log_succ) {
|
if (OB_SUCC(ret) && fetch_log_succ) {
|
||||||
check_next_group_entry_(lsn, log_group_entry, fetched_log_count, resp, frt, reach_upper_limit, ctx);
|
check_next_group_entry_(lsn, log_group_entry, fetched_log_count, resp, frt, reach_upper_limit, ctx);
|
||||||
resp.set_progress(ctx.get_progress());
|
resp.set_progress(ctx.get_progress());
|
||||||
if (frt.is_stopped()) {
|
// There is reserved space for the last log group entry, so we assume that the buffer is always enough here,
|
||||||
// Stop fetching log
|
// So we could fill response buffer without checking buffer full
|
||||||
} else if (OB_FAIL(prefill_resp_with_group_entry_(ls_id, lsn, log_group_entry, resp, fetch_time_stat))) {
|
if (OB_FAIL(prefill_resp_with_group_entry_(ls_id, lsn, log_group_entry, resp, fetch_time_stat))) {
|
||||||
if (OB_BUF_NOT_ENOUGH == ret) {
|
if (OB_BUF_NOT_ENOUGH == ret) {
|
||||||
handle_when_buffer_full_(frt); // stop
|
handle_when_buffer_full_(frt); // stop
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
|
|||||||
@ -20,6 +20,39 @@ namespace oceanbase
|
|||||||
{
|
{
|
||||||
namespace cdc
|
namespace cdc
|
||||||
{
|
{
|
||||||
|
/////////////////////////////// ExpiredLSArchiveEntryFunctor /////////////////////////////////
|
||||||
|
|
||||||
|
ExpiredArchiveClientLSFunctor::ExpiredArchiveClientLSFunctor(const int64_t current_time):
|
||||||
|
current_time_us_(current_time),
|
||||||
|
valid_client_ls_cnt_(0),
|
||||||
|
other_client_ls_cnt_(0)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
ExpiredArchiveClientLSFunctor::~ExpiredArchiveClientLSFunctor()
|
||||||
|
{
|
||||||
|
valid_client_ls_cnt_ = 0;
|
||||||
|
other_client_ls_cnt_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ExpiredArchiveClientLSFunctor::operator()(const ClientLSKey &key, ClientLSCtx *value)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
bool bret = true;
|
||||||
|
if (OB_ISNULL(value)) {
|
||||||
|
EXTLOG_LOG(WARN, "get null clientls ctx", K(key));
|
||||||
|
} else {
|
||||||
|
const FetchMode fetch_mode = value->get_fetch_mode();
|
||||||
|
if (FetchMode::FETCHMODE_ARCHIVE == fetch_mode) {
|
||||||
|
valid_client_ls_cnt_++;
|
||||||
|
} else {
|
||||||
|
other_client_ls_cnt_++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return bret;
|
||||||
|
}
|
||||||
|
|
||||||
///////////////////////////////////////////ObCdcService///////////////////////////////////////////
|
///////////////////////////////////////////ObCdcService///////////////////////////////////////////
|
||||||
|
|
||||||
// suppose archive log only has one destination.
|
// suppose archive log only has one destination.
|
||||||
@ -78,7 +111,7 @@ int ObCdcService::init(const uint64_t tenant_id,
|
|||||||
EXTLOG_LOG(WARN, "ObCdcStartLsnLocator init failed", KR(ret), K(tenant_id));
|
EXTLOG_LOG(WARN, "ObCdcStartLsnLocator init failed", KR(ret), K(tenant_id));
|
||||||
} else if (OB_FAIL(fetcher_.init(tenant_id, ls_service, &large_buffer_pool_, &log_ext_handler_))) {
|
} else if (OB_FAIL(fetcher_.init(tenant_id, ls_service, &large_buffer_pool_, &log_ext_handler_))) {
|
||||||
EXTLOG_LOG(WARN, "ObCdcFetcher init failed", KR(ret), K(tenant_id));
|
EXTLOG_LOG(WARN, "ObCdcFetcher init failed", KR(ret), K(tenant_id));
|
||||||
} else if (OB_FAIL(create_tenant_tg_(tenant_id))) {
|
} else if (OB_FAIL(create_tenant_tg_(tenant_id))) {
|
||||||
EXTLOG_LOG(WARN, "cdc thread group create failed", KR(ret), K(tenant_id));
|
EXTLOG_LOG(WARN, "cdc thread group create failed", KR(ret), K(tenant_id));
|
||||||
} else {
|
} else {
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
@ -100,9 +133,11 @@ void ObCdcService::run1()
|
|||||||
static const int64_t QUERY_INTERVAL = 10L * BASE_INTERVAL;
|
static const int64_t QUERY_INTERVAL = 10L * BASE_INTERVAL;
|
||||||
static const int64_t RECYCLE_INTERVAL = 10L * 60 * BASE_INTERVAL;
|
static const int64_t RECYCLE_INTERVAL = 10L * 60 * BASE_INTERVAL;
|
||||||
static const int64_t BUFFER_POOL_PURGE_INTERVAL = 10L * 60 * BASE_INTERVAL;
|
static const int64_t BUFFER_POOL_PURGE_INTERVAL = 10L * 60 * BASE_INTERVAL;
|
||||||
|
static const int64_t CHECK_CDC_READ_ARCHIVE_INTERVAL = 10L * BASE_INTERVAL;
|
||||||
int64_t last_query_ts = 0;
|
int64_t last_query_ts = 0;
|
||||||
int64_t last_recycle_ts = 0;
|
int64_t last_recycle_ts = 0;
|
||||||
int64_t last_purge_ts = 0;
|
int64_t last_purge_ts = 0;
|
||||||
|
int64_t last_check_cdc_read_archive_ts = 0;
|
||||||
while(! is_stoped()) {
|
while(! is_stoped()) {
|
||||||
// archive is always off for sys tenant, no need to query archive dest
|
// archive is always off for sys tenant, no need to query archive dest
|
||||||
int64_t current_ts = ObTimeUtility::current_time();
|
int64_t current_ts = ObTimeUtility::current_time();
|
||||||
@ -134,6 +169,13 @@ void ObCdcService::run1()
|
|||||||
large_buffer_pool_.weed_out();
|
large_buffer_pool_.weed_out();
|
||||||
last_purge_ts = current_ts;
|
last_purge_ts = current_ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (current_ts - last_check_cdc_read_archive_ts >= CHECK_CDC_READ_ARCHIVE_INTERVAL) {
|
||||||
|
if (OB_FAIL(resize_log_ext_handler_())) {
|
||||||
|
EXTLOG_LOG(WARN, "failed to resize log ext handler");
|
||||||
|
}
|
||||||
|
last_check_cdc_read_archive_ts = current_ts;
|
||||||
|
}
|
||||||
ob_usleep(static_cast<uint32_t>(BASE_INTERVAL));
|
ob_usleep(static_cast<uint32_t>(BASE_INTERVAL));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -146,7 +188,6 @@ int ObCdcService::start()
|
|||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
EXTLOG_LOG(WARN, "ObCdcService not init", K(ret));
|
EXTLOG_LOG(WARN, "ObCdcService not init", K(ret));
|
||||||
// TODO by wenyue.zxl: change the concurrency of 'log_ext_handler_'(see resize interface)
|
|
||||||
} else if (OB_FAIL(log_ext_handler_.start(0))) {
|
} else if (OB_FAIL(log_ext_handler_.start(0))) {
|
||||||
EXTLOG_LOG(WARN, "log ext handler start failed", K(ret));
|
EXTLOG_LOG(WARN, "log ext handler start failed", K(ret));
|
||||||
} else if (OB_FAIL(start_tenant_tg_(MTL_ID()))) {
|
} else if (OB_FAIL(start_tenant_tg_(MTL_ID()))) {
|
||||||
@ -304,6 +345,37 @@ int ObCdcService::recycle_expired_ctx_(const int64_t cur_ts)
|
|||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObCdcService::resize_log_ext_handler_()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
const int64_t current_ts = ObTimeUtility::current_time();
|
||||||
|
const int64_t tenant_max_cpu = MTL_CPU_COUNT();
|
||||||
|
ExpiredArchiveClientLSFunctor functor(current_ts);
|
||||||
|
ObStorageType type = common::OB_STORAGE_MAX_TYPE;
|
||||||
|
ObArchiveDestInfo dest_info = get_archive_dest_info();
|
||||||
|
|
||||||
|
if (OB_FAIL(ls_ctx_map_.for_each(functor))) {
|
||||||
|
EXTLOG_LOG(ERROR, "failed to get expired archive client ls key in ls_ctx_map");
|
||||||
|
} else {
|
||||||
|
const int64_t other_ls_count = functor.get_other_client_ls_cnt();
|
||||||
|
const int64_t valid_ls_count = functor.get_valid_client_ls_cnt();
|
||||||
|
const int64_t single_read_concurrency = 8; // default 8
|
||||||
|
const int64_t new_concurrency = min(tenant_max_cpu, (single_read_concurrency - 1) * valid_ls_count);
|
||||||
|
|
||||||
|
if (OB_FAIL(log_ext_handler_.resize(new_concurrency))) {
|
||||||
|
EXTLOG_LOG(WARN, "log_ext_handler failed to resize", K(new_concurrency));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
EXTLOG_LOG(INFO, "finish to resize log external storage handler", K(current_ts),
|
||||||
|
K(tenant_max_cpu), K(valid_ls_count), K(other_ls_count), K(new_concurrency));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
void ObCdcService::do_monitor_stat_(const int64_t start_ts,
|
void ObCdcService::do_monitor_stat_(const int64_t start_ts,
|
||||||
const int64_t end_ts,
|
const int64_t end_ts,
|
||||||
const int64_t send_ts,
|
const int64_t send_ts,
|
||||||
|
|||||||
@ -45,6 +45,29 @@ private:
|
|||||||
int64_t cur_ts_;
|
int64_t cur_ts_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class ExpiredArchiveClientLSFunctor
|
||||||
|
{
|
||||||
|
static constexpr int64_t LS_ARCHIVE_ENTRY_EXPIRED_TIME = 10L * 60 * 1000 * 1000; // 10 min;
|
||||||
|
public:
|
||||||
|
explicit ExpiredArchiveClientLSFunctor(const int64_t current_time);
|
||||||
|
~ExpiredArchiveClientLSFunctor();
|
||||||
|
|
||||||
|
bool operator()(const ClientLSKey &key, ClientLSCtx *value);
|
||||||
|
|
||||||
|
int64_t get_other_client_ls_cnt() const {
|
||||||
|
return other_client_ls_cnt_;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t get_valid_client_ls_cnt() const {
|
||||||
|
return valid_client_ls_cnt_;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
int64_t current_time_us_;
|
||||||
|
int64_t valid_client_ls_cnt_;
|
||||||
|
int64_t other_client_ls_cnt_;
|
||||||
|
};
|
||||||
|
|
||||||
class ObCdcService: public lib::TGRunnable
|
class ObCdcService: public lib::TGRunnable
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -98,6 +121,9 @@ public:
|
|||||||
private:
|
private:
|
||||||
int query_tenant_archive_info_();
|
int query_tenant_archive_info_();
|
||||||
int recycle_expired_ctx_(const int64_t cur_ts);
|
int recycle_expired_ctx_(const int64_t cur_ts);
|
||||||
|
|
||||||
|
int resize_log_ext_handler_();
|
||||||
|
|
||||||
void do_monitor_stat_(const int64_t start_ts,
|
void do_monitor_stat_(const int64_t start_ts,
|
||||||
const int64_t end_ts,
|
const int64_t end_ts,
|
||||||
const int64_t send_ts,
|
const int64_t send_ts,
|
||||||
|
|||||||
@ -255,6 +255,7 @@ public:
|
|||||||
T_DEF_INT_INFT(io_thread_num, OB_CLUSTER_PARAMETER, 4, 1, "io thread number");
|
T_DEF_INT_INFT(io_thread_num, OB_CLUSTER_PARAMETER, 4, 1, "io thread number");
|
||||||
T_DEF_INT(idle_pool_thread_num, OB_CLUSTER_PARAMETER, 4, 1, 32, "idle pool thread num");
|
T_DEF_INT(idle_pool_thread_num, OB_CLUSTER_PARAMETER, 4, 1, 32, "idle pool thread num");
|
||||||
T_DEF_INT(dead_pool_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "dead pool thread num");
|
T_DEF_INT(dead_pool_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "dead pool thread num");
|
||||||
|
T_DEF_INT(cdc_read_archive_log_concurrency, OB_CLUSTER_PARAMETER, 8, 1, 64, "log external storage handler thread num");
|
||||||
T_DEF_INT(stream_worker_thread_num, OB_CLUSTER_PARAMETER, 8, 1, 64, "stream worker thread num");
|
T_DEF_INT(stream_worker_thread_num, OB_CLUSTER_PARAMETER, 8, 1, 64, "stream worker thread num");
|
||||||
T_DEF_INT(start_lsn_locator_thread_num, OB_CLUSTER_PARAMETER, 4, 1, 32, "start lsn locator thread num");
|
T_DEF_INT(start_lsn_locator_thread_num, OB_CLUSTER_PARAMETER, 4, 1, 32, "start lsn locator thread num");
|
||||||
T_DEF_INT_INFT(start_lsn_locator_locate_count, OB_CLUSTER_PARAMETER, 1, 1, "start lsn locator locate count");
|
T_DEF_INT_INFT(start_lsn_locator_locate_count, OB_CLUSTER_PARAMETER, 1, 1, "start lsn locator locate count");
|
||||||
|
|||||||
@ -45,6 +45,7 @@ ObLogFetcher::ObLogFetcher() :
|
|||||||
fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN),
|
fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN),
|
||||||
archive_dest_(),
|
archive_dest_(),
|
||||||
large_buffer_pool_(),
|
large_buffer_pool_(),
|
||||||
|
log_ext_handler_concurrency_(0),
|
||||||
log_ext_handler_(),
|
log_ext_handler_(),
|
||||||
task_pool_(NULL),
|
task_pool_(NULL),
|
||||||
sys_ls_handler_(NULL),
|
sys_ls_handler_(NULL),
|
||||||
@ -144,7 +145,7 @@ int ObLogFetcher::init(
|
|||||||
LOG_ERROR("init part trans resolver factory fail", KR(ret));
|
LOG_ERROR("init part trans resolver factory fail", KR(ret));
|
||||||
} else if (OB_FAIL(large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024))) {
|
} else if (OB_FAIL(large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024))) {
|
||||||
LOG_ERROR("init large buffer pool failed", KR(ret));
|
LOG_ERROR("init large buffer pool failed", KR(ret));
|
||||||
} else if (OB_FAIL(log_ext_handler_.init())) {
|
} else if (is_direct_fetching_mode(fetching_mode) && OB_FAIL(log_ext_handler_.init())) {
|
||||||
LOG_ERROR("init log ext handler failed", KR(ret));
|
LOG_ERROR("init log ext handler failed", KR(ret));
|
||||||
} else if (OB_FAIL(ls_fetch_mgr_.init(
|
} else if (OB_FAIL(ls_fetch_mgr_.init(
|
||||||
max_cached_ls_fetch_ctx_count,
|
max_cached_ls_fetch_ctx_count,
|
||||||
@ -199,6 +200,7 @@ int ObLogFetcher::init(
|
|||||||
is_loading_data_dict_baseline_data_ = is_loading_data_dict_baseline_data;
|
is_loading_data_dict_baseline_data_ = is_loading_data_dict_baseline_data;
|
||||||
fetching_mode_ = fetching_mode;
|
fetching_mode_ = fetching_mode;
|
||||||
archive_dest_ = archive_dest;
|
archive_dest_ = archive_dest;
|
||||||
|
log_ext_handler_concurrency_ = cfg.cdc_read_archive_log_concurrency;
|
||||||
stop_flag_ = true;
|
stop_flag_ = true;
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
|
|
||||||
@ -255,9 +257,13 @@ void ObLogFetcher::destroy()
|
|||||||
log_route_service_.destroy();
|
log_route_service_.destroy();
|
||||||
}
|
}
|
||||||
// Finally reset fetching_mode_ because of some processing dependencies, such as ObLogRouteService
|
// Finally reset fetching_mode_ because of some processing dependencies, such as ObLogRouteService
|
||||||
|
if (is_direct_fetching_mode(fetching_mode_)) {
|
||||||
|
log_ext_handler_.wait();
|
||||||
|
log_ext_handler_.destroy();
|
||||||
|
}
|
||||||
fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN;
|
fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN;
|
||||||
log_ext_handler_.wait();
|
log_ext_handler_concurrency_ = 0;
|
||||||
log_ext_handler_.destroy();
|
|
||||||
|
|
||||||
LOG_INFO("destroy fetcher succ");
|
LOG_INFO("destroy fetcher succ");
|
||||||
}
|
}
|
||||||
@ -278,8 +284,8 @@ int ObLogFetcher::start()
|
|||||||
} else {
|
} else {
|
||||||
stop_flag_ = false;
|
stop_flag_ = false;
|
||||||
|
|
||||||
// TODO by wenyue.zxl: change the concurrency of 'log_ext_handler_'(see resize interface)
|
if (is_direct_fetching_mode(fetching_mode_) &&
|
||||||
if (OB_FAIL(log_ext_handler_.start(0))) {
|
OB_FAIL(log_ext_handler_.start(log_ext_handler_concurrency_))) {
|
||||||
LOG_ERROR("start ObLogExternalStorageHandler fail", KR(ret));
|
LOG_ERROR("start ObLogExternalStorageHandler fail", KR(ret));
|
||||||
} else if (is_integrated_fetching_mode(fetching_mode_) && OB_FAIL(log_route_service_.start())) {
|
} else if (is_integrated_fetching_mode(fetching_mode_) && OB_FAIL(log_route_service_.start())) {
|
||||||
LOG_ERROR("start LogRouterService fail", KR(ret));
|
LOG_ERROR("start LogRouterService fail", KR(ret));
|
||||||
@ -336,7 +342,9 @@ void ObLogFetcher::stop()
|
|||||||
if (is_integrated_fetching_mode(fetching_mode_)) {
|
if (is_integrated_fetching_mode(fetching_mode_)) {
|
||||||
log_route_service_.stop();
|
log_route_service_.stop();
|
||||||
}
|
}
|
||||||
log_ext_handler_.stop();
|
if (is_direct_fetching_mode(fetching_mode_)) {
|
||||||
|
log_ext_handler_.stop();
|
||||||
|
}
|
||||||
|
|
||||||
LOG_INFO("stop fetcher succ");
|
LOG_INFO("stop fetcher succ");
|
||||||
}
|
}
|
||||||
@ -386,6 +394,9 @@ void ObLogFetcher::mark_stop_flag()
|
|||||||
dead_pool_.mark_stop_flag();
|
dead_pool_.mark_stop_flag();
|
||||||
idle_pool_.mark_stop_flag();
|
idle_pool_.mark_stop_flag();
|
||||||
start_lsn_locator_.mark_stop_flag();
|
start_lsn_locator_.mark_stop_flag();
|
||||||
|
if (is_direct_fetching_mode(fetching_mode_)) {
|
||||||
|
log_ext_handler_.stop();
|
||||||
|
}
|
||||||
LOG_INFO("mark fetcher stop succ",K_(is_loading_data_dict_baseline_data));
|
LOG_INFO("mark fetcher stop succ",K_(is_loading_data_dict_baseline_data));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -541,6 +552,7 @@ void ObLogFetcher::configure(const ObLogConfig &cfg)
|
|||||||
const int64_t blacklist_survival_time_penalty_period_min = cfg.blacklist_survival_time_penalty_period_min;
|
const int64_t blacklist_survival_time_penalty_period_min = cfg.blacklist_survival_time_penalty_period_min;
|
||||||
const int64_t blacklist_history_overdue_time_min = cfg.blacklist_history_overdue_time_min;
|
const int64_t blacklist_history_overdue_time_min = cfg.blacklist_history_overdue_time_min;
|
||||||
const int64_t blacklist_history_clear_interval_min = cfg.blacklist_history_clear_interval_min;
|
const int64_t blacklist_history_clear_interval_min = cfg.blacklist_history_clear_interval_min;
|
||||||
|
const int64_t log_ext_handler_concurrency = cfg.cdc_read_archive_log_concurrency;
|
||||||
|
|
||||||
ATOMIC_STORE(&g_print_ls_heartbeat_info, print_ls_heartbeat_info);
|
ATOMIC_STORE(&g_print_ls_heartbeat_info, print_ls_heartbeat_info);
|
||||||
ATOMIC_STORE(&g_inner_heartbeat_interval, inner_heartbeat_interval);
|
ATOMIC_STORE(&g_inner_heartbeat_interval, inner_heartbeat_interval);
|
||||||
@ -579,6 +591,12 @@ void ObLogFetcher::configure(const ObLogConfig &cfg)
|
|||||||
LOG_ERROR("update_background_refresh_time failed", KR(ret),
|
LOG_ERROR("update_background_refresh_time failed", KR(ret),
|
||||||
"log_router_background_refresh_interval_sec", log_router_background_refresh_interval_sec);
|
"log_router_background_refresh_interval_sec", log_router_background_refresh_interval_sec);
|
||||||
}
|
}
|
||||||
|
} else if (IS_INIT && is_direct_fetching_mode(fetching_mode_)) {
|
||||||
|
if (OB_FAIL(log_ext_handler_.resize(log_ext_handler_concurrency))) {
|
||||||
|
LOG_ERROR("log_ext_handler failed to resize when reloading configure", K(log_ext_handler_concurrency));
|
||||||
|
} else {
|
||||||
|
log_ext_handler_concurrency_ = log_ext_handler_concurrency;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -246,6 +246,7 @@ private:
|
|||||||
ClientFetchingMode fetching_mode_;
|
ClientFetchingMode fetching_mode_;
|
||||||
ObBackupPathString archive_dest_;
|
ObBackupPathString archive_dest_;
|
||||||
archive::LargeBufferPool large_buffer_pool_;
|
archive::LargeBufferPool large_buffer_pool_;
|
||||||
|
int64_t log_ext_handler_concurrency_;
|
||||||
logservice::ObLogExternalStorageHandler log_ext_handler_;
|
logservice::ObLogExternalStorageHandler log_ext_handler_;
|
||||||
TaskPool *task_pool_;
|
TaskPool *task_pool_;
|
||||||
IObLogSysLsTaskHandler *sys_ls_handler_;
|
IObLogSysLsTaskHandler *sys_ls_handler_;
|
||||||
|
|||||||
@ -98,6 +98,7 @@ public:
|
|||||||
T_DEF_INT_INFT(io_thread_num, OB_CLUSTER_PARAMETER, 4, 1, "io thread number");
|
T_DEF_INT_INFT(io_thread_num, OB_CLUSTER_PARAMETER, 4, 1, "io thread number");
|
||||||
T_DEF_INT(idle_pool_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "idle pool thread num");
|
T_DEF_INT(idle_pool_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "idle pool thread num");
|
||||||
T_DEF_INT(dead_pool_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "dead pool thread num");
|
T_DEF_INT(dead_pool_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "dead pool thread num");
|
||||||
|
T_DEF_INT(cdc_read_archive_log_concurrency, OB_CLUSTER_PARAMETER, 4, 1, 64, "log external storage handler thread num");
|
||||||
T_DEF_INT(stream_worker_thread_num, OB_CLUSTER_PARAMETER, 4, 1, 64, "stream worker thread num");
|
T_DEF_INT(stream_worker_thread_num, OB_CLUSTER_PARAMETER, 4, 1, 64, "stream worker thread num");
|
||||||
T_DEF_INT(start_lsn_locator_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "start lsn locator thread num");
|
T_DEF_INT(start_lsn_locator_thread_num, OB_CLUSTER_PARAMETER, 1, 1, 32, "start lsn locator thread num");
|
||||||
T_DEF_INT_INFT(start_lsn_locator_locate_count, OB_CLUSTER_PARAMETER, 1, 1, "start lsn locator locate count");
|
T_DEF_INT_INFT(start_lsn_locator_locate_count, OB_CLUSTER_PARAMETER, 1, 1, "start lsn locator locate count");
|
||||||
|
|||||||
@ -46,6 +46,7 @@ ObLogFetcher::ObLogFetcher() :
|
|||||||
fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN),
|
fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN),
|
||||||
archive_dest_(),
|
archive_dest_(),
|
||||||
large_buffer_pool_(),
|
large_buffer_pool_(),
|
||||||
|
log_ext_handler_concurrency_(0),
|
||||||
log_ext_handler_(),
|
log_ext_handler_(),
|
||||||
ls_ctx_add_info_factory_(NULL),
|
ls_ctx_add_info_factory_(NULL),
|
||||||
err_handler_(NULL),
|
err_handler_(NULL),
|
||||||
@ -131,7 +132,7 @@ int ObLogFetcher::init(
|
|||||||
LOG_ERROR("init progress controller fail", KR(ret));
|
LOG_ERROR("init progress controller fail", KR(ret));
|
||||||
} else if (OB_FAIL(large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024))) {
|
} else if (OB_FAIL(large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024))) {
|
||||||
LOG_ERROR("init large buffer pool failed", KR(ret));
|
LOG_ERROR("init large buffer pool failed", KR(ret));
|
||||||
} else if (OB_FAIL(log_ext_handler_.init())) {
|
} else if (is_direct_fetching_mode(fetching_mode) && OB_FAIL(log_ext_handler_.init())) {
|
||||||
LOG_ERROR("init failed", KR(ret));
|
LOG_ERROR("init failed", KR(ret));
|
||||||
} else if (OB_FAIL(ls_fetch_mgr_.init(
|
} else if (OB_FAIL(ls_fetch_mgr_.init(
|
||||||
progress_controller_,
|
progress_controller_,
|
||||||
@ -193,6 +194,7 @@ int ObLogFetcher::init(
|
|||||||
paused_ = false;
|
paused_ = false;
|
||||||
pause_time_ = OB_INVALID_TIMESTAMP;
|
pause_time_ = OB_INVALID_TIMESTAMP;
|
||||||
resume_time_ = OB_INVALID_TIMESTAMP;
|
resume_time_ = OB_INVALID_TIMESTAMP;
|
||||||
|
log_ext_handler_concurrency_ = cfg.cdc_read_archive_log_concurrency;
|
||||||
stop_flag_ = true;
|
stop_flag_ = true;
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
|
|
||||||
@ -236,8 +238,11 @@ void ObLogFetcher::destroy()
|
|||||||
log_route_service_.wait();
|
log_route_service_.wait();
|
||||||
log_route_service_.destroy();
|
log_route_service_.destroy();
|
||||||
}
|
}
|
||||||
log_ext_handler_.wait();
|
log_ext_handler_concurrency_ = 0;
|
||||||
log_ext_handler_.destroy();
|
if (is_direct_fetching_mode(fetching_mode_)) {
|
||||||
|
log_ext_handler_.wait();
|
||||||
|
log_ext_handler_.destroy();
|
||||||
|
}
|
||||||
// Finally reset fetching_mode_ because of some processing dependencies, such as ObLogRouteService
|
// Finally reset fetching_mode_ because of some processing dependencies, such as ObLogRouteService
|
||||||
fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN;
|
fetching_mode_ = ClientFetchingMode::FETCHING_MODE_UNKNOWN;
|
||||||
log_fetcher_user_ = LogFetcherUser::UNKNOWN;
|
log_fetcher_user_ = LogFetcherUser::UNKNOWN;
|
||||||
@ -271,7 +276,8 @@ int ObLogFetcher::start()
|
|||||||
} else if (OB_FAIL(stream_worker_.start())) {
|
} else if (OB_FAIL(stream_worker_.start())) {
|
||||||
LOG_ERROR("start stream worker fail", KR(ret));
|
LOG_ERROR("start stream worker fail", KR(ret));
|
||||||
// TODO by wenyue.zxl: change the concurrency of 'log_ext_handler_'(see resize interface)
|
// TODO by wenyue.zxl: change the concurrency of 'log_ext_handler_'(see resize interface)
|
||||||
} else if (OB_FAIL(log_ext_handler_.start(0))) {
|
} else if (is_direct_fetching_mode(fetching_mode_) &&
|
||||||
|
OB_FAIL(log_ext_handler_.start(log_ext_handler_concurrency_))) {
|
||||||
LOG_ERROR("start log external handler failed", KR(ret));
|
LOG_ERROR("start log external handler failed", KR(ret));
|
||||||
} else {
|
} else {
|
||||||
LOG_INFO("LogFetcher start success");
|
LOG_INFO("LogFetcher start success");
|
||||||
@ -297,7 +303,9 @@ void ObLogFetcher::stop()
|
|||||||
if (is_integrated_fetching_mode(fetching_mode_)) {
|
if (is_integrated_fetching_mode(fetching_mode_)) {
|
||||||
log_route_service_.stop();
|
log_route_service_.stop();
|
||||||
}
|
}
|
||||||
log_ext_handler_.stop();
|
if (is_direct_fetching_mode(fetching_mode_)) {
|
||||||
|
log_ext_handler_.stop();
|
||||||
|
}
|
||||||
|
|
||||||
LOG_INFO("LogFetcher stop success");
|
LOG_INFO("LogFetcher stop success");
|
||||||
}
|
}
|
||||||
@ -349,6 +357,9 @@ void ObLogFetcher::mark_stop_flag()
|
|||||||
if (is_cdc(log_fetcher_user_)) {
|
if (is_cdc(log_fetcher_user_)) {
|
||||||
start_lsn_locator_.mark_stop_flag();
|
start_lsn_locator_.mark_stop_flag();
|
||||||
}
|
}
|
||||||
|
if (is_direct_fetching_mode(fetching_mode_)) {
|
||||||
|
log_ext_handler_.stop();
|
||||||
|
}
|
||||||
LOG_INFO("LogFetcher mark stop succ");
|
LOG_INFO("LogFetcher mark stop succ");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -641,6 +652,7 @@ void ObLogFetcher::configure(const ObLogFetcherConfig &cfg)
|
|||||||
const int64_t blacklist_survival_time_penalty_period_min = cfg.blacklist_survival_time_penalty_period_min;
|
const int64_t blacklist_survival_time_penalty_period_min = cfg.blacklist_survival_time_penalty_period_min;
|
||||||
const int64_t blacklist_history_overdue_time_min = cfg.blacklist_history_overdue_time_min;
|
const int64_t blacklist_history_overdue_time_min = cfg.blacklist_history_overdue_time_min;
|
||||||
const int64_t blacklist_history_clear_interval_min = cfg.blacklist_history_clear_interval_min;
|
const int64_t blacklist_history_clear_interval_min = cfg.blacklist_history_clear_interval_min;
|
||||||
|
const int64_t log_ext_handler_concurrency = cfg.cdc_read_archive_log_concurrency;
|
||||||
|
|
||||||
ATOMIC_STORE(&g_print_ls_heartbeat_info, print_ls_heartbeat_info);
|
ATOMIC_STORE(&g_print_ls_heartbeat_info, print_ls_heartbeat_info);
|
||||||
|
|
||||||
@ -676,6 +688,12 @@ void ObLogFetcher::configure(const ObLogFetcherConfig &cfg)
|
|||||||
LOG_ERROR("update_background_refresh_time failed", KR(ret),
|
LOG_ERROR("update_background_refresh_time failed", KR(ret),
|
||||||
"log_router_background_refresh_interval_sec", log_router_background_refresh_interval_sec);
|
"log_router_background_refresh_interval_sec", log_router_background_refresh_interval_sec);
|
||||||
}
|
}
|
||||||
|
} else if (IS_INIT && is_direct_fetching_mode(fetching_mode_)) {
|
||||||
|
if (OB_FAIL(log_ext_handler_.resize(log_ext_handler_concurrency))) {
|
||||||
|
LOG_ERROR("log_ext_handler failed to resize when reloading configure", K(log_ext_handler_concurrency));
|
||||||
|
} else {
|
||||||
|
log_ext_handler_concurrency_ = log_ext_handler_concurrency;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -291,6 +291,7 @@ private:
|
|||||||
ClientFetchingMode fetching_mode_;
|
ClientFetchingMode fetching_mode_;
|
||||||
ObBackupPathString archive_dest_;
|
ObBackupPathString archive_dest_;
|
||||||
archive::LargeBufferPool large_buffer_pool_;
|
archive::LargeBufferPool large_buffer_pool_;
|
||||||
|
int64_t log_ext_handler_concurrency_;
|
||||||
logservice::ObLogExternalStorageHandler log_ext_handler_;
|
logservice::ObLogExternalStorageHandler log_ext_handler_;
|
||||||
ObILogFetcherLSCtxAddInfoFactory *ls_ctx_add_info_factory_;
|
ObILogFetcherLSCtxAddInfoFactory *ls_ctx_add_info_factory_;
|
||||||
IObLogErrHandler *err_handler_;
|
IObLogErrHandler *err_handler_;
|
||||||
|
|||||||
@ -118,7 +118,7 @@ public:
|
|||||||
// The interface to submit log for physical restore and physical standby
|
// The interface to submit log for physical restore and physical standby
|
||||||
class ObLogRestoreHandler : public ObLogHandlerBase
|
class ObLogRestoreHandler : public ObLogHandlerBase
|
||||||
{
|
{
|
||||||
static const int64_t MAX_RAW_WRITE_RETRY_TIMES = 1000;
|
static const int64_t MAX_RAW_WRITE_RETRY_TIMES = 10000;
|
||||||
static const int64_t MAX_RETRY_SLEEP_US = 100;
|
static const int64_t MAX_RETRY_SLEEP_US = 100;
|
||||||
public:
|
public:
|
||||||
ObLogRestoreHandler();
|
ObLogRestoreHandler();
|
||||||
|
|||||||
Reference in New Issue
Block a user