[FEAT MERGE] implement of log disk writing throttling

This commit is contained in:
obdev
2023-05-02 01:45:12 +00:00
committed by ob-robot
parent 3ae36f5774
commit 8094743d24
57 changed files with 3492 additions and 216 deletions

View File

@ -18,6 +18,7 @@
#include "lib/oblog/ob_log.h"
#include "lib/time/ob_time_utility.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/oblog/ob_log_module.h"
#include "share/allocator/ob_tenant_mutil_allocator.h"
#include "share/config/ob_server_config.h"
#include "share/ob_errno.h"
@ -76,6 +77,7 @@ int PalfDiskOptionsWrapper::init(const PalfDiskOptions &disk_opts)
} else {
disk_opts_for_recycling_blocks_ = disk_opts_for_stopping_writing_ = disk_opts;
status_ = Status::NORMAL_STATUS;
cur_unrecyclable_log_disk_size_ = 0;
}
return ret;
}
@ -86,6 +88,7 @@ void PalfDiskOptionsWrapper::reset()
disk_opts_for_recycling_blocks_.reset();
disk_opts_for_stopping_writing_.reset();
status_ = Status::INVALID_STATUS;
cur_unrecyclable_log_disk_size_ = -1;
}
int PalfDiskOptionsWrapper::update_disk_options(const PalfDiskOptions &disk_opts_for_recycling_blocks)
@ -95,6 +98,20 @@ int PalfDiskOptionsWrapper::update_disk_options(const PalfDiskOptions &disk_opts
return update_disk_options_not_guarded_by_lock_(disk_opts_for_recycling_blocks);
}
void PalfDiskOptionsWrapper::set_cur_unrecyclable_log_disk_size(const int64_t unrecyclable_log_disk_size)
{
ObSpinLockGuard guard(disk_opts_lock_);
cur_unrecyclable_log_disk_size_ = unrecyclable_log_disk_size;
}
bool PalfDiskOptionsWrapper::need_throttling() const
{
bool is_need = false;
ObSpinLockGuard guard(disk_opts_lock_);
const int64_t trigger_size = disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ * disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ / 100;
return disk_opts_for_stopping_writing_.is_valid() && cur_unrecyclable_log_disk_size_ > trigger_size;
}
void PalfDiskOptionsWrapper::change_to_normal()
{
ObSpinLockGuard guard(disk_opts_lock_);
@ -112,21 +129,29 @@ int PalfDiskOptionsWrapper::update_disk_options_not_guarded_by_lock_(const PalfD
disk_opts_for_recycling_blocks.log_disk_usage_limit_size_ * disk_opts_for_recycling_blocks.log_disk_utilization_limit_threshold_;
if (false == disk_opts_for_recycling_blocks.is_valid()) {
ret = OB_INVALID_ARGUMENT;
} else if (Status::SHRINKING_STATUS == status_) {
ret = OB_STATE_NOT_MATCH;
PALF_LOG(WARN, "don't support shrink log disk concurrently", K(ret), KPC(this));
} else if (disk_opts_for_recycling_blocks_ == disk_opts_for_recycling_blocks) {
PALF_LOG(INFO, "no need update disk options", K(ret), K(disk_opts_for_recycling_blocks_), K(disk_opts_for_recycling_blocks));
} else if (curr_stop_write_limit_size > next_stop_write_limit_size) {
status_ = Status::SHRINKING_STATUS;
// In process of shrinking, to avoid stopping writing, 'disk_opts_for_stopping_writing_' is still
// an original value, update it with 'disk_opts_for_recycling_blocks' until there is no possibility
// caused stopping writing.
disk_opts_for_recycling_blocks_ = disk_opts_for_recycling_blocks;
} else {
status_ = Status::NORMAL_STATUS;
disk_opts_for_recycling_blocks_ = disk_opts_for_stopping_writing_ = disk_opts_for_recycling_blocks;
PALF_LOG(INFO, "update_disk_options_not_guarded_by_lock_ success", K(curr_stop_write_limit_size), K(next_stop_write_limit_size));
if (Status::SHRINKING_STATUS == status_) {
ret = OB_STATE_NOT_MATCH;
PALF_LOG(WARN, "don't support shrink log disk concurrently", K(ret), KPC(this));
} else if (disk_opts_for_recycling_blocks_ == disk_opts_for_recycling_blocks) {
PALF_LOG(INFO, "no need update disk options", K(ret), K(disk_opts_for_recycling_blocks_), K(disk_opts_for_recycling_blocks));
} else if (curr_stop_write_limit_size > next_stop_write_limit_size) {
status_ = Status::SHRINKING_STATUS;
// In process of shrinking, to avoid stopping writing,
// 'disk_opts_for_stopping_writing_' is still an original value, update it
// with 'disk_opts_for_recycling_blocks' until there is no possibility
// caused stopping writing.
disk_opts_for_recycling_blocks_ = disk_opts_for_recycling_blocks;
} else {
status_ = Status::NORMAL_STATUS;
disk_opts_for_recycling_blocks_ = disk_opts_for_stopping_writing_ = disk_opts_for_recycling_blocks;
PALF_LOG(INFO, "update_disk_options_not_guarded_by_lock_ success", K(curr_stop_write_limit_size), K(next_stop_write_limit_size));
}
//always update writing_throttling_trigger_percentage_
const int64_t new_trigger_percentage = disk_opts_for_recycling_blocks.log_disk_throttling_percentage_;
disk_opts_for_recycling_blocks_.log_disk_throttling_percentage_ = new_trigger_percentage;
disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = new_trigger_percentage;
}
return ret;
}
@ -137,7 +162,7 @@ PalfEnvImpl::PalfEnvImpl() : palf_meta_lock_(common::ObLatchIds::PALF_ENV_LOCK),
fetch_log_engine_(),
log_rpc_(),
cb_thread_pool_(),
log_io_worker_(),
log_io_worker_wrapper_(),
block_gc_timer_task_(),
log_updater_(),
monitor_(NULL),
@ -192,10 +217,10 @@ int PalfEnvImpl::init(
PALF_LOG(ERROR, "LogRpc init failed", K(ret));
} else if (OB_FAIL(cb_thread_pool_.init(io_cb_num, this))) {
PALF_LOG(ERROR, "LogIOTaskThreadPool init failed", K(ret));
} else if (OB_FAIL(log_io_worker_.init(log_io_worker_config_,
tenant_id,
cb_thread_pool_.get_tg_id(),
log_alloc_mgr, this))) {
} else if (OB_FAIL(log_io_worker_wrapper_.init(log_io_worker_config_,
tenant_id,
cb_thread_pool_.get_tg_id(),
log_alloc_mgr, this))) {
PALF_LOG(ERROR, "LogIOWorker init failed", K(ret));
} else if (OB_FAIL(block_gc_timer_task_.init(this))) {
PALF_LOG(ERROR, "ObCheckLogBlockCollectTask init failed", K(ret));
@ -245,7 +270,7 @@ int PalfEnvImpl::start()
PALF_LOG(WARN, "scan_all_palf_handle_impl_director_ failed", K(ret));
} else if (OB_FAIL(cb_thread_pool_.start())) {
PALF_LOG(ERROR, "LogIOTaskThreadPool start failed", K(ret));
} else if (OB_FAIL(log_io_worker_.start())) {
} else if (OB_FAIL(log_io_worker_wrapper_.start())) {
PALF_LOG(ERROR, "LogIOWorker start failed", K(ret));
} else if (OB_FAIL(block_gc_timer_task_.start())) {
PALF_LOG(ERROR, "FileCollectTimerTask start failed", K(ret));
@ -267,7 +292,7 @@ void PalfEnvImpl::stop()
if (is_running_) {
PALF_LOG(INFO, "PalfEnvImpl begin stop", KPC(this));
is_running_ = false;
log_io_worker_.stop();
log_io_worker_wrapper_.stop();
cb_thread_pool_.stop();
block_gc_timer_task_.stop();
fetch_log_engine_.stop();
@ -279,8 +304,8 @@ void PalfEnvImpl::stop()
void PalfEnvImpl::wait()
{
PALF_LOG(INFO, "PalfEnvImpl begin stop", KPC(this));
log_io_worker_.wait();
PALF_LOG(INFO, "PalfEnvImpl begin wait", KPC(this));
log_io_worker_wrapper_.wait();
cb_thread_pool_.wait();
block_gc_timer_task_.wait();
fetch_log_engine_.wait();
@ -295,7 +320,7 @@ void PalfEnvImpl::destroy()
is_running_ = false;
is_inited_ = false;
palf_handle_impl_map_.destroy();
log_io_worker_.destroy();
log_io_worker_wrapper_.destroy();
cb_thread_pool_.destroy();
log_loop_thread_.destroy();
block_gc_timer_task_.destroy();
@ -367,8 +392,8 @@ int PalfEnvImpl::create_palf_handle_impl_(const int64_t palf_id,
ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(WARN, "alloc palf_handle_impl failed", K(ret));
} else if (OB_FAIL(palf_handle_impl->init(palf_id, access_mode, palf_base_info, replica_type,
&fetch_log_engine_, base_dir, log_alloc_mgr_, log_block_pool_, &log_rpc_, &log_io_worker_,
this, self_, &election_timer_, palf_epoch))) {
&fetch_log_engine_, base_dir, log_alloc_mgr_, log_block_pool_, &log_rpc_,
log_io_worker_wrapper_.get_log_io_worker(palf_id), this, self_, &election_timer_, palf_epoch))) {
PALF_LOG(ERROR, "IPalfHandleImpl init failed", K(ret), K(palf_id));
// NB: always insert value into hash map finally.
} else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, palf_handle_impl))) {
@ -631,14 +656,20 @@ int PalfEnvImpl::try_recycle_blocks()
disk_opts_for_recycling_blocks,
status);
int64_t total_used_size_byte = 0;
int64_t total_unrecyclable_size_byte = 0;
int64_t total_size_to_recycle_blocks = disk_opts_for_recycling_blocks.log_disk_usage_limit_size_;
int64_t total_size_to_stop_write = disk_opts_for_stopping_writing.log_disk_usage_limit_size_;
int64_t palf_id = 0;
int64_t maximum_used_size = 0;
int tmp_ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(get_disk_usage_(total_used_size_byte, palf_id, maximum_used_size))) {
} else if (OB_FAIL(get_disk_usage_(total_used_size_byte, total_unrecyclable_size_byte,
palf_id, maximum_used_size))) {
PALF_LOG(WARN, "get_disk_usage_ failed", K(ret), KPC(this));
} else if (FALSE_IT(disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(total_unrecyclable_size_byte))) {
} else if (OB_SUCCESS != (tmp_ret = log_io_worker_wrapper_.notify_need_writing_throttling(disk_options_wrapper_.need_throttling()))) {
PALF_LOG_RET(WARN, tmp_ret, "failed to update_disk_info", K(disk_options_wrapper_));
} else {
const int64_t usable_disk_size_to_recycle_blocks =
total_size_to_recycle_blocks
@ -696,6 +727,10 @@ int PalfEnvImpl::try_recycle_blocks()
"maximum_log_stream", palf_id,
"oldest_log_stream", oldest_palf_id,
"oldest_scn", oldest_scn);
} else {
if (REACH_TIME_INTERVAL(2 * 1000 * 1000L)) {
PALF_LOG(INFO, "LOG_DISK_OPTION", K(disk_options_wrapper_));
}
}
(void)remove_stale_incomplete_palf_();
@ -709,7 +744,7 @@ bool PalfEnvImpl::check_disk_space_enough()
}
PalfEnvImpl::GetTotalUsedDiskSpace::GetTotalUsedDiskSpace()
: total_used_disk_space_(0), maximum_used_size_(0), palf_id_(INVALID_PALF_ID) {}
: total_used_disk_space_(0), total_unrecyclable_disk_space_(0), maximum_used_size_(0), palf_id_(INVALID_PALF_ID) {}
PalfEnvImpl::GetTotalUsedDiskSpace::~GetTotalUsedDiskSpace() {}
bool PalfEnvImpl::GetTotalUsedDiskSpace::operator() (const LSKey &ls_key, IPalfHandleImpl *palf_handle_impl)
@ -726,7 +761,21 @@ bool PalfEnvImpl::GetTotalUsedDiskSpace::operator() (const LSKey &ls_key, IPalfH
palf_id_ = ls_key.id_;
}
total_used_disk_space_ += palf_handle_impl->get_total_used_disk_space();
PALF_LOG(TRACE, "get_total_used_disk_space success", K(ls_key), "total_used_disk_space(MB):", total_used_disk_space_/MB);
LSN base_lsn;
int ret = OB_SUCCESS;
if (OB_FAIL(palf_handle_impl->get_base_lsn(base_lsn))) {
PALF_LOG(WARN, "failed to get_base_lsn", K(ls_key));
ret_code_ = ret;
bool_ret = false;
} else {
const int64_t unrecyclable_meta_size = (PALF_META_BLOCK_SIZE + MAX_INFO_BLOCK_SIZE);
total_unrecyclable_disk_space_ += (palf_handle_impl->get_end_lsn() - base_lsn + unrecyclable_meta_size);
PALF_LOG(TRACE, "get_total_used_disk_space success", K(ls_key),
"total_used_disk_space(MB):", total_used_disk_space_/MB,
"total_unrecyclable_disk_space(MB):", total_unrecyclable_disk_space_/MB,
"end_lsn", palf_handle_impl->get_end_lsn(),
"base_lsn", base_lsn);
}
}
return bool_ret;
}
@ -933,7 +982,8 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id)
ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(WARN, "alloc ipalf_handle_impl failed", K(ret));
} else if (OB_FAIL(tmp_palf_handle_impl->load(palf_id, &fetch_log_engine_, base_dir, log_alloc_mgr_,
log_block_pool_, &log_rpc_, &log_io_worker_, this, self_, &election_timer_, palf_epoch, is_integrity))) {
log_block_pool_, &log_rpc_, log_io_worker_wrapper_.get_log_io_worker(palf_id), this, self_,
&election_timer_, palf_epoch, is_integrity))) {
PALF_LOG(ERROR, "PalfHandleImpl init failed", K(ret), K(palf_id));
} else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, tmp_palf_handle_impl))) {
PALF_LOG(WARN, "palf_handle_impl_map_ insert_and_get failed", K(ret), K(palf_id), K(tmp_palf_handle_impl));
@ -962,6 +1012,7 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id)
}
int PalfEnvImpl::get_total_used_disk_space_(int64_t &total_used_disk_space,
int64_t &total_unrecyclable_disk_space,
int64_t &palf_id,
int64_t &maximum_used_size)
{
@ -974,18 +1025,19 @@ int PalfEnvImpl::get_total_used_disk_space_(int64_t &total_used_disk_space,
palf_id = functor.palf_id_;
maximum_used_size = functor.maximum_used_size_;
total_used_disk_space = functor.total_used_disk_space_;
total_unrecyclable_disk_space = functor.total_unrecyclable_disk_space_;
}
return ret;
}
int PalfEnvImpl::get_disk_usage_(int64_t &used_size_byte,
int64_t &unrecyclable_disk_space,
int64_t &palf_id,
int64_t &maximum_used_size)
{
int ret = OB_SUCCESS;
if (OB_FAIL(get_total_used_disk_space_(used_size_byte, palf_id, maximum_used_size))) {
if (OB_FAIL(get_total_used_disk_space_(used_size_byte, unrecyclable_disk_space, palf_id, maximum_used_size))) {
PALF_LOG(WARN, "get_total_used_disk_space failed", K(ret), KPC(this));
} else {
}
return ret;
}
@ -993,12 +1045,11 @@ int PalfEnvImpl::get_disk_usage_(int64_t &used_size_byte,
int PalfEnvImpl::get_disk_usage_(int64_t &used_size_byte)
{
int ret = OB_SUCCESS;
int64_t unused_unrecyclable_size = 0;
int64_t unused_palf_id = 0;
int64_t unused_size = 0;
if (OB_FAIL(get_disk_usage_(used_size_byte,
unused_palf_id, unused_size))) {
if (OB_FAIL(get_disk_usage_(used_size_byte, unused_unrecyclable_size, unused_palf_id, unused_size))) {
PALF_LOG(WARN, "get_total_used_disk_space failed", K(ret), KPC(this));
} else {
}
return ret;
}
@ -1150,7 +1201,7 @@ int PalfEnvImpl::get_io_start_time(int64_t &last_working_time)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
last_working_time = log_io_worker_.get_last_working_time();
last_working_time = log_io_worker_wrapper_.get_last_working_time();
}
return ret;
}
@ -1170,5 +1221,16 @@ int PalfEnvImpl::update_replayable_point(const SCN &replayable_scn)
return ret;
}
int PalfEnvImpl::get_throttling_options(PalfThrottleOptions &options)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
(void)disk_options_wrapper_.get_throttling_options(options);
}
return ret;
}
} // end namespace palf
} // end namespace oceanbase