Avoid stopping writing after restart due to log disk is shrinking.
This commit is contained in:
@ -38,6 +38,6 @@ ob_unittest_clog(test_ob_simple_log_config_change_mock_ele test_ob_simple_log_co
|
||||
ob_unittest_clog(test_ob_simple_log_arb_mock_ele test_ob_simple_log_arb_mock_ele.cpp)
|
||||
ob_unittest_clog(test_ob_simple_log_flashback_arb test_ob_simple_log_flashback_arb.cpp)
|
||||
ob_unittest_clog(test_ob_simple_log_restart test_ob_simple_log_restart.cpp)
|
||||
|
||||
ob_unittest_clog(test_ob_simple_log_disk_mgr test_ob_simple_log_disk_mgr.cpp)
|
||||
|
||||
add_subdirectory(archiveservice)
|
||||
|
||||
@ -80,6 +80,8 @@ public:
|
||||
{return &allocator_;}
|
||||
virtual int update_disk_opts(const PalfDiskOptions &opts) override final
|
||||
{return OB_NOT_SUPPORTED;};
|
||||
virtual int get_disk_opts(PalfDiskOptions &opts) override final
|
||||
{return OB_NOT_SUPPORTED;};
|
||||
virtual int get_palf_env(PalfEnv *&palf_env)
|
||||
{return OB_NOT_SUPPORTED;};
|
||||
bool is_valid() const override final
|
||||
|
||||
@ -426,6 +426,37 @@ int ObSimpleLogClusterTestEnv::create_paxos_group_with_arb(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t server_id,
|
||||
const int64_t recycle_threshold,
|
||||
const int64_t write_stop_threshold)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t MB = 1024 * 1024;
|
||||
PalfOptions opts;
|
||||
auto cluster = get_cluster();
|
||||
if (server_id >= 0 && server_id < cluster.size()) {
|
||||
ObTenantEnv::set_tenant(cluster[server_id]->get_tenant_base());
|
||||
auto srv = cluster[server_id];
|
||||
if (true == srv->is_arb_server()) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
} else {
|
||||
auto palf_env_impl = dynamic_cast<PalfEnvImpl*>(srv->get_palf_env());
|
||||
if (OB_FAIL(palf_env_impl->get_options(opts))) {
|
||||
PALF_LOG(ERROR, "get_optiosn failed", K(ret), K(server_id));
|
||||
} else {
|
||||
// palf_env_impl->disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_threshold_
|
||||
// = recycle_threshold;
|
||||
// palf_env_impl->disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_limit_threshold_
|
||||
// = write_stop_threshold;
|
||||
opts.disk_options_.log_disk_utilization_threshold_ = recycle_threshold;
|
||||
opts.disk_options_.log_disk_utilization_limit_threshold_ = write_stop_threshold;
|
||||
ret = srv->update_disk_opts(opts.disk_options_);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t server_id, const int64_t file_block_num)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -443,13 +474,40 @@ int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t server_id, cons
|
||||
PALF_LOG(ERROR, "get_optiosn failed", K(ret), K(server_id));
|
||||
} else {
|
||||
opts.disk_options_.log_disk_usage_limit_size_ = file_block_num * PALF_PHY_BLOCK_SIZE;
|
||||
ret = palf_env_impl->update_options(opts);
|
||||
ret = srv->update_disk_opts(opts.disk_options_);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t log_block_num)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
auto cluster = get_cluster();
|
||||
for (int64_t server_id = 0; server_id < cluster.size(); server_id++) {
|
||||
ret = update_disk_options(server_id, log_block_num);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSimpleLogClusterTestEnv::get_disk_options(const int64_t server_id,
|
||||
PalfDiskOptions &opts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
auto cluster = get_cluster();
|
||||
if (server_id >= 0 && server_id < cluster.size()) {
|
||||
ObTenantEnv::set_tenant(cluster[server_id]->get_tenant_base());
|
||||
auto srv = cluster[server_id];
|
||||
if (true == srv->is_arb_server()) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
} else {
|
||||
ret = srv->get_disk_opts(opts);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSimpleLogClusterTestEnv::restart_paxos_groups()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1394,5 +1452,33 @@ bool ObSimpleLogClusterTestEnv::is_upgraded(PalfHandleImplGuard &leader, const i
|
||||
return has_upgraded;
|
||||
}
|
||||
|
||||
int ObSimpleLogClusterTestEnv::wait_until_disk_space_to(const int64_t server_id,
|
||||
const int64_t expect_log_disk_space)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
auto cluster = get_cluster();
|
||||
if (server_id >= 0 && server_id < cluster.size()) {
|
||||
ObTenantEnv::set_tenant(cluster[server_id]->get_tenant_base());
|
||||
auto srv = cluster[server_id];
|
||||
if (true == srv->is_arb_server()) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
} else {
|
||||
auto palf_env_impl = dynamic_cast<PalfEnvImpl*>(srv->get_palf_env());
|
||||
int64_t used_log_disk_space = INT64_MAX;
|
||||
int64_t total_log_disk_space = 0;
|
||||
while (used_log_disk_space >= expect_log_disk_space) {
|
||||
if (OB_FAIL(palf_env_impl->get_disk_usage(used_log_disk_space, total_log_disk_space))) {
|
||||
PALF_LOG(WARN, "get_disk_usage failed", K(used_log_disk_space), K(total_log_disk_space));
|
||||
} else {
|
||||
usleep(10*1000);
|
||||
PALF_LOG(INFO, "disk_space is not enough", K(used_log_disk_space), K(expect_log_disk_space));
|
||||
}
|
||||
}
|
||||
PALF_LOG(INFO, "wait_until_disk_space_to success", K(used_log_disk_space), K(expect_log_disk_space));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // end namespace unittest
|
||||
} // end namespace oceanbase
|
||||
|
||||
@ -175,6 +175,9 @@ public:
|
||||
PalfHandleImplGuard &leader);
|
||||
virtual int delete_paxos_group(const int64_t id);
|
||||
virtual int update_disk_options(const int64_t server_id, const int64_t log_block_number);
|
||||
virtual int update_disk_options(const int64_t server_id, const int64_t recycle_threshold, const int64_t write_stop_threshold);
|
||||
virtual int update_disk_options(const int64_t log_block_number);
|
||||
virtual int get_disk_options(const int64_t server_id, PalfDiskOptions &opts);
|
||||
virtual int restart_paxos_groups();
|
||||
virtual int restart_server(const int64_t server_id);
|
||||
virtual int remove_dir();
|
||||
@ -245,6 +248,7 @@ public:
|
||||
void set_disk_options_for_throttling(PalfEnvImpl &palf_env_impl);
|
||||
bool is_degraded(const PalfHandleImplGuard &leader, const int64_t degraded_server_idx);
|
||||
bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id);
|
||||
int wait_until_disk_space_to(const int64_t server_id, const int64_t expect_log_disk_space);
|
||||
public:
|
||||
static int64_t palf_id_;
|
||||
private:
|
||||
|
||||
209
mittest/logservice/env/ob_simple_log_server.cpp
vendored
209
mittest/logservice/env/ob_simple_log_server.cpp
vendored
@ -154,6 +154,8 @@ int ObSimpleLogServer::simple_init(
|
||||
SERVER_LOG(WARN, "init_io failed", K(ret), K(addr));
|
||||
} else if (FALSE_IT(guard.click("init_io_")) || OB_FAIL(init_log_service_())) {
|
||||
SERVER_LOG(WARN, "init_log_service failed", K(ret), K(addr));
|
||||
} else if (FALSE_IT(guard.click("init_log_service_")) || OB_FAIL(looper_.init(this))) {
|
||||
SERVER_LOG(WARN, "init ObLooper failed", K(ret), K(addr));
|
||||
} else {
|
||||
guard.click("init_log_service_");
|
||||
SERVER_LOG(INFO, "simple_log_server init success", KPC(this), K(guard));
|
||||
@ -161,9 +163,113 @@ int ObSimpleLogServer::simple_init(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSimpleLogServer::construct_allowed_new_log_disk_(const uint64_t tenant_id,
|
||||
const int64_t expected_log_disk_size,
|
||||
const int64_t old_log_disk_size,
|
||||
int64_t &allowed_new_log_disk_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool can_update_log_disk_size_with_expected_log_disk = true;
|
||||
int64_t palf_log_disk_size = 0;
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (OB_FAIL(guard.switch_to(tenant_id))) {
|
||||
LOG_WARN("failed to switch tenant", K(ret), K(tenant_id));
|
||||
} else {
|
||||
ObLogService *log_service = MTL(ObLogService*);
|
||||
int64_t used_log_disk_size = 0;
|
||||
if (OB_ISNULL(log_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("ObLogService is nullptr", K(ret), KP(log_service));
|
||||
} else if (OB_FAIL(log_service->get_palf_stable_disk_usage(used_log_disk_size, palf_log_disk_size))) {
|
||||
LOG_WARN("failed to update_log_disk_size", K(ret), K(palf_log_disk_size), K(allowed_new_log_disk_size), K(expected_log_disk_size));
|
||||
} else if (FALSE_IT(can_update_log_disk_size_with_expected_log_disk = (expected_log_disk_size >= palf_log_disk_size))) {
|
||||
// For shrinking log disk, we still update log disk size of 'new_unit' to 'old_log_disk_size'.
|
||||
} else if (!can_update_log_disk_size_with_expected_log_disk
|
||||
&& FALSE_IT(allowed_new_log_disk_size = old_log_disk_size)) {
|
||||
// For expanding log disk, we update log disk size of 'new_unit' to 'expected_log_disk_size'.
|
||||
} else if (can_update_log_disk_size_with_expected_log_disk
|
||||
&& FALSE_IT(allowed_new_log_disk_size = expected_log_disk_size)) {
|
||||
} else {
|
||||
LOG_INFO("construct_new_log_disk success", K(ret), K(tenant_id), K(can_update_log_disk_size_with_expected_log_disk),
|
||||
K(old_log_disk_size), K(allowed_new_log_disk_size), K(expected_log_disk_size));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSimpleLogServer::update_tenant_log_disk_size_(const uint64_t tenant_id,
|
||||
const int64_t expected_log_disk_size,
|
||||
const int64_t old_log_disk_size,
|
||||
const int64_t allowed_new_log_disk_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
if (OB_SUCC(guard.switch_to(tenant_id))) {
|
||||
ObLogService *log_service = MTL(ObLogService *);
|
||||
if (OB_ISNULL(log_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else if (OB_FAIL(log_service->update_log_disk_usage_limit_size(expected_log_disk_size))) {
|
||||
LOG_WARN("failed to update_log_disk_usage_limit_size", K(ret), K(tenant_id), K(expected_log_disk_size),
|
||||
K(old_log_disk_size), K(allowed_new_log_disk_size));
|
||||
} else if (OB_FAIL(log_block_pool_.update_tenant(old_log_disk_size, allowed_new_log_disk_size))) {
|
||||
LOG_WARN("failed to update teannt int ObServerLogBlockMGR", K(ret), K(tenant_id), K(expected_log_disk_size),
|
||||
K(old_log_disk_size), K(allowed_new_log_disk_size));
|
||||
} else {
|
||||
disk_opts_.log_disk_usage_limit_size_ = allowed_new_log_disk_size;
|
||||
LOG_INFO("update_log_disk_usage_limit_size success", K(ret), K(tenant_id), K(expected_log_disk_size),
|
||||
K(old_log_disk_size), K(allowed_new_log_disk_size), K(disk_opts_));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSimpleLogServer::update_disk_opts_no_lock_(const PalfDiskOptions &opts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t old_log_disk_size = disk_opts_.log_disk_usage_limit_size_;
|
||||
int64_t expected_log_disk_size = opts.log_disk_usage_limit_size_;
|
||||
int64_t allowed_new_log_disk_size = 0;
|
||||
// 内部表中的disk_opts立马生效
|
||||
inner_table_disk_opts_ = opts;
|
||||
// disk_opts_表示本地持久化最新的disk_opts,在construct_allowed_new_log_disk_中会修改disk_opts_的log_disk_usage_limit_size_
|
||||
disk_opts_ = opts;
|
||||
if (!opts.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
CLOG_LOG(WARN, "invalid argument", K(opts));
|
||||
} else if (OB_FAIL(construct_allowed_new_log_disk_(node_id_, expected_log_disk_size,
|
||||
old_log_disk_size, allowed_new_log_disk_size))) {
|
||||
CLOG_LOG(WARN, "construct_allowed_new_log_disk_ failed", K(expected_log_disk_size), K(old_log_disk_size),
|
||||
K(allowed_new_log_disk_size));
|
||||
} else if (OB_FAIL(update_tenant_log_disk_size_(node_id_, expected_log_disk_size,
|
||||
old_log_disk_size, allowed_new_log_disk_size))) {
|
||||
CLOG_LOG(WARN, "update_tenant_log_disk_size_ failed", K(expected_log_disk_size), K(old_log_disk_size),
|
||||
K(allowed_new_log_disk_size));
|
||||
} else {
|
||||
CLOG_LOG(INFO, "update_disk_opts success", K(opts), K(disk_opts_), K(expected_log_disk_size), K(old_log_disk_size),
|
||||
K(allowed_new_log_disk_size));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int ObSimpleLogServer::update_disk_opts(const PalfDiskOptions &opts)
|
||||
{
|
||||
return palf_env_->palf_env_impl_.disk_options_wrapper_.update_disk_options(opts);
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(log_disk_lock_);
|
||||
ret = update_disk_opts_no_lock_(opts);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSimpleLogServer::try_resize()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSpinLockGuard guard(log_disk_lock_);
|
||||
if (disk_opts_ != inner_table_disk_opts_) {
|
||||
if (OB_FAIL(update_disk_opts_no_lock_(inner_table_disk_opts_))) {
|
||||
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSimpleLogServer::init_memory_dump_timer_()
|
||||
@ -243,7 +349,7 @@ int ObSimpleLogServer::init_io_(const std::string &cluster_name)
|
||||
storage_env.default_block_size_ = OB_DEFAULT_MACRO_BLOCK_SIZE;
|
||||
storage_env.data_disk_size_ = 1024 * 1024 * 1024;
|
||||
storage_env.data_disk_percentage_ = 0;
|
||||
storage_env.log_disk_size_ = 2LL * 1024 * 1024 * 1024;
|
||||
storage_env.log_disk_size_ = 10LL * 1024 * 1024 * 1024;
|
||||
storage_env.log_disk_percentage_ = 0;
|
||||
|
||||
storage_env.log_spec_.log_dir_ = slog_dir.c_str();
|
||||
@ -256,7 +362,6 @@ int ObSimpleLogServer::init_io_(const std::string &cluster_name)
|
||||
iod_opt_array_[3].set("datafile_disk_percentage", storage_env.data_disk_percentage_);
|
||||
iod_opt_array_[4].set("datafile_size", storage_env.data_disk_size_);
|
||||
iod_opts_.opt_cnt_ = MAX_IOD_OPT_CNT;
|
||||
const int64_t DEFATULT_RESERVED_SIZE = 10 * 1024 * 1024 * 1024ul;
|
||||
if (OB_FAIL(io_device_->init(iod_opts_))) {
|
||||
SERVER_LOG(ERROR, "init io device fail", K(ret));
|
||||
} else if (OB_FAIL(log_block_pool_.init(storage_env.clog_dir_))) {
|
||||
@ -267,9 +372,10 @@ int ObSimpleLogServer::init_io_(const std::string &cluster_name)
|
||||
SERVER_LOG(INFO, "init_io_ successs", K(ret), K(guard));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
log_block_pool_.get_tenants_log_disk_size_func_ = [this, &storage_env](int64_t &log_disk_size) -> int
|
||||
log_block_pool_.get_tenants_log_disk_size_func_ = [](int64_t &log_disk_size) -> int
|
||||
{
|
||||
log_disk_size = log_block_pool_.lower_align_(storage_env.log_disk_size_);
|
||||
// ObServerLogBlockMGR 率先于 ObLogService加载,此时租户使用的log_disk_size为0.
|
||||
log_disk_size = 0;
|
||||
return OB_SUCCESS;
|
||||
};
|
||||
if (OB_FAIL(log_block_pool_.start(storage_env.log_disk_size_))) {
|
||||
@ -285,12 +391,18 @@ int ObSimpleLogServer::init_log_service_()
|
||||
int ret = OB_SUCCESS;
|
||||
// init deps of log_service
|
||||
palf::PalfOptions opts;
|
||||
opts.disk_options_.log_disk_usage_limit_size_ = 10 * 1024 * 1024 * 1024ul;
|
||||
opts.disk_options_.log_disk_utilization_threshold_ = 80;
|
||||
opts.disk_options_.log_disk_utilization_limit_threshold_ = 95;
|
||||
opts.disk_options_.log_disk_throttling_percentage_ = 100;
|
||||
opts.disk_options_.log_disk_throttling_maximum_duration_ = 2 * 3600 * 1000 * 1000L;
|
||||
opts.disk_options_.log_writer_parallelism_ = 2;
|
||||
if (disk_opts_.is_valid()) {
|
||||
opts.disk_options_ = disk_opts_;
|
||||
} else {
|
||||
opts.disk_options_.log_disk_usage_limit_size_ = 10 * 1024 * 1024 * 1024ul;
|
||||
opts.disk_options_.log_disk_utilization_threshold_ = 80;
|
||||
opts.disk_options_.log_disk_utilization_limit_threshold_ = 95;
|
||||
opts.disk_options_.log_disk_throttling_percentage_ = 100;
|
||||
opts.disk_options_.log_disk_throttling_maximum_duration_ = 2 * 3600 * 1000 * 1000L;
|
||||
opts.disk_options_.log_writer_parallelism_ = 2;
|
||||
disk_opts_ = opts.disk_options_;
|
||||
inner_table_disk_opts_ = disk_opts_;
|
||||
}
|
||||
std::string clog_dir = clog_dir_ + "/tenant_1";
|
||||
allocator_ = OB_NEW(ObTenantMutilAllocator, "TestBase", node_id_);
|
||||
ObMemAttr attr(1, "SimpleLog");
|
||||
@ -301,12 +413,14 @@ int ObSimpleLogServer::init_log_service_()
|
||||
} else if (OB_FAIL(log_service_.init(opts, clog_dir.c_str(), addr_, allocator_, transport_, &ls_service_,
|
||||
&location_service_, &reporter_, &log_block_pool_, &sql_proxy_, net_keepalive_))) {
|
||||
SERVER_LOG(ERROR, "init_log_service_ fail", K(ret));
|
||||
} else if (OB_FAIL(log_block_pool_.create_tenant(opts.disk_options_.log_disk_usage_limit_size_))) {
|
||||
SERVER_LOG(ERROR, "crete tenant failed", K(ret));
|
||||
} else if (OB_FAIL(mock_election_map_.init(ele_attr))) {
|
||||
SERVER_LOG(ERROR, "mock_election_map_ init fail", K(ret));
|
||||
} else {
|
||||
palf_env_ = log_service_.get_palf_env();
|
||||
palf_env_->palf_env_impl_.log_rpc_.tenant_id_ = OB_SERVER_TENANT_ID;
|
||||
SERVER_LOG(INFO, "init_log_service_ success", K(ret));
|
||||
SERVER_LOG(INFO, "init_log_service_ success", K(ret), K(opts), K(disk_opts_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -321,6 +435,8 @@ int ObSimpleLogServer::simple_start(const bool is_bootstrap = false)
|
||||
SERVER_LOG(ERROR, "deliver_ start failed", K(ret));
|
||||
} else if (OB_FAIL(log_service_.arb_service_.start())) {
|
||||
SERVER_LOG(ERROR, "arb_service start failed", K(ret));
|
||||
} else if (OB_FAIL(looper_.start())) {
|
||||
SERVER_LOG(ERROR, "ObLooper start failed", K(ret));
|
||||
}
|
||||
// do not start entire log_service_ for now, it will
|
||||
// slow down cases running
|
||||
@ -331,6 +447,7 @@ int ObSimpleLogServer::simple_close(const bool is_shutdown = false)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTenantEnv::set_tenant(tenant_base_);
|
||||
looper_.destroy();
|
||||
ObTimeGuard guard("simple_close", 0);
|
||||
deliver_.destroy(is_shutdown);
|
||||
guard.click("destroy");
|
||||
@ -742,5 +859,73 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req)
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObLooper::ObLooper() : log_server_(nullptr),
|
||||
run_interval_(0),
|
||||
is_inited_(false) { }
|
||||
|
||||
ObLooper::~ObLooper()
|
||||
{
|
||||
destroy();
|
||||
}
|
||||
|
||||
int ObLooper::init(ObSimpleLogServer *log_server)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
PALF_LOG(WARN, "ObLooper has been inited", K(ret));
|
||||
} else if (NULL == log_server) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
PALF_LOG(WARN, "invalid argument", K(ret), KP(log_server));
|
||||
} else {
|
||||
log_server_ = log_server;
|
||||
share::ObThreadPool::set_run_wrapper(MTL_CTX());
|
||||
run_interval_ = ObLooper::INTERVAL_US;
|
||||
is_inited_ = true;
|
||||
}
|
||||
|
||||
if ((OB_FAIL(ret)) && (OB_INIT_TWICE != ret)) {
|
||||
destroy();
|
||||
}
|
||||
PALF_LOG(INFO, "ObLooper init finished", K(ret));
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObLooper::destroy()
|
||||
{
|
||||
stop();
|
||||
wait();
|
||||
is_inited_ = false;
|
||||
log_server_ = NULL;
|
||||
}
|
||||
|
||||
void ObLooper::run1()
|
||||
{
|
||||
lib::set_thread_name("ObLooper");
|
||||
log_loop_();
|
||||
}
|
||||
|
||||
void ObLooper::log_loop_()
|
||||
{
|
||||
|
||||
while (!has_set_stop()) {
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t start_ts = ObTimeUtility::current_time();
|
||||
|
||||
if (OB_FAIL(log_server_->try_resize())) {
|
||||
PALF_LOG(WARN, "try_resize failed", K(ret));
|
||||
}
|
||||
const int64_t round_cost_time = ObTimeUtility::current_time() - start_ts;
|
||||
int32_t sleep_ts = run_interval_ - static_cast<const int32_t>(round_cost_time);
|
||||
if (sleep_ts < 0) {
|
||||
sleep_ts = 0;
|
||||
}
|
||||
ob_usleep(sleep_ts);
|
||||
|
||||
if (REACH_TENANT_TIME_INTERVAL(5 * 1000 * 1000)) {
|
||||
PALF_LOG(INFO, "ObLooper round_cost_time(us)", K(round_cost_time));
|
||||
}
|
||||
}
|
||||
}
|
||||
} // unittest
|
||||
} // oceanbase
|
||||
|
||||
41
mittest/logservice/env/ob_simple_log_server.h
vendored
41
mittest/logservice/env/ob_simple_log_server.h
vendored
@ -120,6 +120,26 @@ protected:
|
||||
common::ObAddr self_;
|
||||
};
|
||||
|
||||
class ObSimpleLogServer;
|
||||
class ObLooper : public share::ObThreadPool {
|
||||
public:
|
||||
static constexpr int64_t INTERVAL_US = 1000*1000;
|
||||
ObLooper();
|
||||
virtual ~ObLooper();
|
||||
public:
|
||||
int init(ObSimpleLogServer *log_server);
|
||||
void destroy();
|
||||
void run1();
|
||||
private:
|
||||
void log_loop_();
|
||||
private:
|
||||
ObSimpleLogServer *log_server_;
|
||||
int64_t run_interval_;
|
||||
bool is_inited_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObLooper);
|
||||
};
|
||||
|
||||
class ObLogDeliver : public rpc::frame::ObReqDeliver, public lib::TGTaskHandler, public ObMittestBlacklist
|
||||
{
|
||||
public:
|
||||
@ -224,6 +244,7 @@ public:
|
||||
virtual ILogBlockPool *get_block_pool() = 0;
|
||||
virtual ObILogAllocator *get_allocator() = 0;
|
||||
virtual int update_disk_opts(const PalfDiskOptions &opts) = 0;
|
||||
virtual int get_disk_opts(PalfDiskOptions &opts) = 0;
|
||||
virtual int get_palf_env(PalfEnv *&palf_env) = 0;
|
||||
virtual bool is_arb_server() const {return false;};
|
||||
virtual int64_t get_node_id() = 0;
|
||||
@ -264,6 +285,12 @@ public:
|
||||
ObILogAllocator *get_allocator() override final
|
||||
{ return allocator_; }
|
||||
virtual int update_disk_opts(const PalfDiskOptions &opts) override final;
|
||||
virtual int get_disk_opts(PalfDiskOptions &opts) override final
|
||||
{
|
||||
opts = disk_opts_;
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
virtual int try_resize();
|
||||
virtual int get_palf_env(PalfEnv *&palf_env)
|
||||
{ palf_env = palf_env_; return OB_SUCCESS;}
|
||||
virtual void revert_palf_env(IPalfEnvImpl *palf_env) { UNUSED(palf_env); }
|
||||
@ -348,6 +375,16 @@ protected:
|
||||
int init_network_(const common::ObAddr &addr, const bool is_bootstrap);
|
||||
int init_log_service_();
|
||||
int init_memory_dump_timer_();
|
||||
// 更新log_disk_size的逻辑保持和ObMultiTenant.cpp中维护同名函数一样
|
||||
int construct_allowed_new_log_disk_(const uint64_t tenant_id,
|
||||
const int64_t expected_log_disk_size,
|
||||
const int64_t old_log_disk_size,
|
||||
int64_t &allowed_new_log_disk_size);
|
||||
int update_tenant_log_disk_size_(const uint64_t tenant_id,
|
||||
const int64_t expected_log_disk_size,
|
||||
const int64_t old_log_disk_size,
|
||||
const int64_t allowed_log_disk_size);
|
||||
int update_disk_opts_no_lock_(const PalfDiskOptions &opts);
|
||||
|
||||
private:
|
||||
int64_t node_id_;
|
||||
@ -378,6 +415,10 @@ private:
|
||||
ObSrvRpcProxy srv_proxy_;
|
||||
logservice::coordinator::ObFailureDetector detector_;
|
||||
MockElectionMap mock_election_map_;
|
||||
ObSpinLock log_disk_lock_;
|
||||
palf::PalfDiskOptions disk_opts_;
|
||||
palf::PalfDiskOptions inner_table_disk_opts_;
|
||||
ObLooper looper_;
|
||||
};
|
||||
|
||||
} // end unittest
|
||||
|
||||
@ -381,35 +381,6 @@ TEST_F(TestObSimpleLogClusterBasicFunc, limit_palf_instances)
|
||||
EXPECT_EQ(OB_SUCCESS, delete_paxos_group(id1));
|
||||
}
|
||||
|
||||
TEST_F(TestObSimpleLogClusterBasicFunc, out_of_disk_space)
|
||||
{
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "out_of_disk_space");
|
||||
int64_t id = ATOMIC_AAF(&palf_id_, 1);
|
||||
int server_idx = 0;
|
||||
PalfEnv *palf_env = NULL;
|
||||
int64_t leader_idx = 0;
|
||||
PalfHandleImplGuard leader;
|
||||
share::SCN create_scn = share::SCN::base_scn();
|
||||
EXPECT_EQ(OB_SUCCESS, get_palf_env(server_idx, palf_env));
|
||||
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, create_scn, leader_idx, leader));
|
||||
update_disk_options(leader_idx, MIN_DISK_SIZE_PER_PALF_INSTANCE/PALF_PHY_BLOCK_SIZE);
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 6*31+1, id, MAX_LOG_BODY_SIZE));
|
||||
LogStorage *log_storage = &leader.palf_handle_impl_->log_engine_.log_storage_;
|
||||
while (LSN(6*PALF_BLOCK_SIZE) > log_storage->log_tail_) {
|
||||
usleep(500);
|
||||
}
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, id, MAX_LOG_BODY_SIZE));
|
||||
while (LSN(6*PALF_BLOCK_SIZE + 20 * MAX_LOG_BODY_SIZE) > log_storage->log_tail_) {
|
||||
usleep(500);
|
||||
}
|
||||
LSN max_lsn = leader.palf_handle_impl_->get_max_lsn();
|
||||
wait_lsn_until_flushed(max_lsn, leader);
|
||||
PALF_LOG(INFO, "out of disk max_lsn", K(max_lsn));
|
||||
usleep(palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS + 5*10000);
|
||||
EXPECT_EQ(OB_LOG_OUTOF_DISK_SPACE, submit_log(leader, 1, id, MAX_LOG_BODY_SIZE));
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_ = 5 * MIN_DISK_SIZE_PER_PALF_INSTANCE;
|
||||
}
|
||||
|
||||
TEST_F(TestObSimpleLogClusterBasicFunc, submit_group_log)
|
||||
{
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "submit_group_log");
|
||||
|
||||
365
mittest/logservice/test_ob_simple_log_disk_mgr.cpp
Normal file
365
mittest/logservice/test_ob_simple_log_disk_mgr.cpp
Normal file
@ -0,0 +1,365 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include "lib/ob_define.h"
|
||||
#include "lib/ob_errno.h"
|
||||
#include <cstdio>
|
||||
#include <gtest/gtest.h>
|
||||
#include <signal.h>
|
||||
#include <stdexcept>
|
||||
#define private public
|
||||
#define protected public
|
||||
#include "env/ob_simple_log_cluster_env.h"
|
||||
#undef private
|
||||
#undef protected
|
||||
#include "logservice/palf/log_reader_utils.h"
|
||||
#include "logservice/palf/log_define.h"
|
||||
#include "logservice/palf/log_group_entry_header.h"
|
||||
#include "logservice/palf/log_io_worker.h"
|
||||
#include "logservice/palf/lsn.h"
|
||||
|
||||
const std::string TEST_NAME = "log_disk_mgr";
|
||||
|
||||
using namespace oceanbase::common;
|
||||
using namespace oceanbase;
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace logservice;
|
||||
namespace unittest
|
||||
{
|
||||
class TestObSimpleLogDiskMgr : public ObSimpleLogClusterTestEnv
|
||||
{
|
||||
public:
|
||||
TestObSimpleLogDiskMgr() : ObSimpleLogClusterTestEnv()
|
||||
{
|
||||
int ret = init();
|
||||
if (OB_SUCCESS != ret) {
|
||||
throw std::runtime_error("TestObSimpleLogDiskMgr init failed");
|
||||
}
|
||||
}
|
||||
~TestObSimpleLogDiskMgr()
|
||||
{
|
||||
destroy();
|
||||
}
|
||||
int init()
|
||||
{
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
void destroy()
|
||||
{}
|
||||
int64_t id_;
|
||||
};
|
||||
|
||||
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 1;
|
||||
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1;
|
||||
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
|
||||
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;
|
||||
|
||||
TEST_F(TestObSimpleLogDiskMgr, out_of_disk_space)
|
||||
{
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "out_of_disk_space");
|
||||
int64_t id = ATOMIC_AAF(&palf_id_, 1);
|
||||
int server_idx = 0;
|
||||
PalfEnv *palf_env = NULL;
|
||||
int64_t leader_idx = 0;
|
||||
PalfHandleImplGuard leader;
|
||||
share::SCN create_scn = share::SCN::base_scn();
|
||||
EXPECT_EQ(OB_SUCCESS, get_palf_env(server_idx, palf_env));
|
||||
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, create_scn, leader_idx, leader));
|
||||
update_disk_options(leader_idx, MIN_DISK_SIZE_PER_PALF_INSTANCE/PALF_PHY_BLOCK_SIZE);
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 6*31+1, id, MAX_LOG_BODY_SIZE));
|
||||
LogStorage *log_storage = &leader.palf_handle_impl_->log_engine_.log_storage_;
|
||||
while (LSN(6*PALF_BLOCK_SIZE) > log_storage->log_tail_) {
|
||||
usleep(500);
|
||||
}
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, id, MAX_LOG_BODY_SIZE));
|
||||
while (LSN(6*PALF_BLOCK_SIZE + 20 * MAX_LOG_BODY_SIZE) > log_storage->log_tail_) {
|
||||
usleep(500);
|
||||
}
|
||||
LSN max_lsn = leader.palf_handle_impl_->get_max_lsn();
|
||||
wait_lsn_until_flushed(max_lsn, leader);
|
||||
PALF_LOG(INFO, "out of disk max_lsn", K(max_lsn));
|
||||
usleep(palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS + 5*10000);
|
||||
EXPECT_EQ(OB_LOG_OUTOF_DISK_SPACE, submit_log(leader, 1, id, MAX_LOG_BODY_SIZE));
|
||||
usleep(ObLooper::INTERVAL_US*2);
|
||||
}
|
||||
|
||||
TEST_F(TestObSimpleLogDiskMgr, update_disk_options_basic)
|
||||
{
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "update_disk_options_basic");
|
||||
OB_LOGGER.set_log_level("INFO");
|
||||
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
|
||||
// 将日志盘空间设置为10GB
|
||||
update_disk_options(10*1024*1024*1024ul/PALF_PHY_BLOCK_SIZE);
|
||||
PALF_LOG(INFO, "start update_disk_options_basic", K(id));
|
||||
int64_t leader_idx = 0;
|
||||
PalfHandleImplGuard leader;
|
||||
PalfEnv *palf_env = NULL;
|
||||
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
|
||||
EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
|
||||
|
||||
// 提交1G的日志
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 500, id, MAX_LOG_BODY_SIZE));
|
||||
EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
|
||||
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 20));
|
||||
usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_,
|
||||
20*PALF_PHY_BLOCK_SIZE);
|
||||
// 可以在上一次为缩容完成之前,可以继续缩容
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 10));
|
||||
usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_,
|
||||
10*PALF_PHY_BLOCK_SIZE);
|
||||
// 此时日志盘依旧未完成缩容,ObSimpleLogServer维护的disk_opts_依旧是10GB
|
||||
{
|
||||
PalfDiskOptions opts;
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul);
|
||||
}
|
||||
// 可以在上一次未缩容完成之前,可以继续扩容, 同时由于扩容后日志盘依旧小于第一次缩容
|
||||
// ,因此依旧处于缩容状态.
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 11));
|
||||
usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_,
|
||||
11*PALF_PHY_BLOCK_SIZE);
|
||||
// 此时日志盘依旧未完成缩容,ObSimpleLogServer维护的disk_opts_依旧是10GB
|
||||
{
|
||||
PalfDiskOptions opts;
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul);
|
||||
}
|
||||
usleep(1000*1000+100+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_,
|
||||
11*PALF_PHY_BLOCK_SIZE);
|
||||
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
|
||||
const LSN base_lsn(12*PALF_BLOCK_SIZE);
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->set_base_lsn(base_lsn));
|
||||
usleep(1000);
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1000));
|
||||
EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
|
||||
// wait until disk space enough
|
||||
EXPECT_EQ(OB_SUCCESS, wait_until_disk_space_to(leader_idx, (11*PALF_PHY_BLOCK_SIZE*80+100)/100));
|
||||
usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_,
|
||||
11*PALF_PHY_BLOCK_SIZE);
|
||||
// 等待后台线程再次执行update_disk_options操作,预期本地持久化的disk_opts会变为11*PALF_PHY_BLOCK_SIZE
|
||||
{
|
||||
usleep(2*ObLooper::INTERVAL_US);
|
||||
PalfDiskOptions opts;
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
EXPECT_EQ(opts.log_disk_usage_limit_size_, 11*PALF_PHY_BLOCK_SIZE);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestObSimpleLogDiskMgr, update_disk_options_restart)
|
||||
{
|
||||
disable_hot_cache_ = true;
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "update_disk_options_restart");
|
||||
OB_LOGGER.set_log_level("INFO");
|
||||
// 扩容操作
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(10*1024*1024*1024ul/PALF_PHY_BLOCK_SIZE));
|
||||
usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS);
|
||||
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
|
||||
int64_t leader_idx = 0;
|
||||
PalfEnv *palf_env = NULL;
|
||||
{
|
||||
PalfHandleImplGuard leader;
|
||||
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
|
||||
{
|
||||
PalfDiskOptions opts;
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul);
|
||||
}
|
||||
EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
|
||||
{
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(10*1024*1024*1024ul,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_);
|
||||
}
|
||||
// 产生10个文件的数据
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10*32, id, MAX_LOG_BODY_SIZE));
|
||||
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
|
||||
// 最小的log_disk_size要求是存在8个日志文件
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 8));
|
||||
usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS);
|
||||
// 宕机前,缩容不会正式生效,因此不会导致停写
|
||||
EXPECT_EQ(true, palf_env->palf_env_impl_.diskspace_enough_);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(10*1024*1024*1024ul, palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_);
|
||||
int64_t log_disk_usage, total_log_disk_size;
|
||||
EXPECT_EQ(OB_SUCCESS, palf_env->palf_env_impl_.get_disk_usage(log_disk_usage, total_log_disk_size));
|
||||
PALF_LOG(INFO, "log_disk_usage:", K(log_disk_usage), K(total_log_disk_size));
|
||||
// 缩容未成功前,log_disk_usage_limit_size_依旧保持10G.
|
||||
// 本地持久化的log_disk_size为10G
|
||||
// 内部表中持久化的log_disk_size为8*PALF_PHY_BLOCK_SIZE
|
||||
PalfDiskOptions opts;
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul);
|
||||
}
|
||||
// 物理缩容未成功前,宕机重启预期不会停写
|
||||
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
|
||||
{
|
||||
// 内部表中持久化的log_disk_size依旧是8*PALF_PHY_BLOCK_SIZE
|
||||
// 重启后继续缩容
|
||||
int64_t log_disk_usage, total_log_disk_size;
|
||||
EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
|
||||
usleep(2*ObLooper::INTERVAL_US);
|
||||
usleep(1000*1000 + BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS);
|
||||
EXPECT_EQ(true, palf_env->palf_env_impl_.diskspace_enough_);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
// 本地持久化的(slog)记录的依旧是10G,因此不会停写
|
||||
EXPECT_EQ(10*1024*1024*1024ul, palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_);
|
||||
EXPECT_EQ(OB_SUCCESS, palf_env->palf_env_impl_.get_disk_usage(log_disk_usage, total_log_disk_size));
|
||||
PALF_LOG(INFO, "log_disk_usage:", K(log_disk_usage), K(total_log_disk_size));
|
||||
PalfDiskOptions opts;
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul);
|
||||
}
|
||||
{
|
||||
PalfHandleImplGuard leader;
|
||||
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
|
||||
PalfDiskOptions opts;
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul);
|
||||
|
||||
// 物理上保证不缩容,内部表中持久化的变为16*PALF_PHY_BLOCK_SIZE, 由于palf的log_disk_size是10G,
|
||||
// 因此本次对palf是一次缩容操作. 但下一轮GC任务运行时,发现当前的使用的日志盘空间不会导致停写,
|
||||
// 于是日志盘变为正常状态
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 16));
|
||||
// 在下一轮GC任务运行后,本地持久化的log_disk_size也会变为16*PALF_PHY_BLOCK_SIZE
|
||||
usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS);
|
||||
// 经过一轮GC后,会变为NORMAL_STATUS
|
||||
EXPECT_EQ(true, palf_env->palf_env_impl_.diskspace_enough_);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(16*PALF_PHY_BLOCK_SIZE,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_);
|
||||
// 后台线程会完成缩容操作,最终本地持久化的变为16*PALF_PHY_BLOCK_SIZE
|
||||
usleep(ObLooper::INTERVAL_US*2);
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
EXPECT_EQ(opts.log_disk_usage_limit_size_, 16*PALF_PHY_BLOCK_SIZE);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestObSimpleLogDiskMgr, overshelling)
|
||||
{
|
||||
disable_hot_cache_ = true;
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "overshelling");
|
||||
OB_LOGGER.set_log_level("INFO");
|
||||
ObServerLogBlockMgr *log_pool = nullptr;
|
||||
EXPECT_EQ(OB_SUCCESS, get_log_pool(0, log_pool));
|
||||
ASSERT_NE(nullptr, log_pool);
|
||||
// 验证扩缩容场景下的LogPool字段的正确性
|
||||
EXPECT_EQ(16*PALF_PHY_BLOCK_SIZE, log_pool->min_log_disk_size_for_all_tenants_);
|
||||
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
|
||||
int64_t leader_idx = 0;
|
||||
PalfEnv *palf_env = NULL;
|
||||
{
|
||||
PalfHandleImplGuard leader;
|
||||
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
|
||||
}
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 15));
|
||||
int64_t log_disk_size_used_for_tenants = log_pool->min_log_disk_size_for_all_tenants_;
|
||||
// 缩容还未成功,预期log_disk_size_used_for_tenants一定是16*PALF_PHY_BLOCK_SIZE
|
||||
PalfDiskOptions opts;
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
if (opts.log_disk_usage_limit_size_ == 16*PALF_PHY_BLOCK_SIZE) {
|
||||
EXPECT_EQ(16*PALF_PHY_BLOCK_SIZE, log_disk_size_used_for_tenants);
|
||||
// 缩容不会立马生效
|
||||
usleep(ObLooper::INTERVAL_US*2);
|
||||
EXPECT_EQ(15*PALF_PHY_BLOCK_SIZE, log_pool->min_log_disk_size_for_all_tenants_);
|
||||
} else {
|
||||
PALF_LOG(INFO, "update_disk_options successfully", K(log_disk_size_used_for_tenants), K(opts));
|
||||
}
|
||||
|
||||
// 扩容预期立马成功
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 16));
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
EXPECT_EQ(16*PALF_PHY_BLOCK_SIZE, log_disk_size_used_for_tenants);
|
||||
EXPECT_EQ(16*PALF_PHY_BLOCK_SIZE, opts.log_disk_usage_limit_size_);
|
||||
|
||||
// 直接扩容为LogPool上限值
|
||||
const int64_t limit_log_disk_size = log_pool->log_pool_meta_.curr_total_size_;
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, limit_log_disk_size/PALF_PHY_BLOCK_SIZE));
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
log_disk_size_used_for_tenants = log_pool->min_log_disk_size_for_all_tenants_;
|
||||
EXPECT_EQ(limit_log_disk_size, log_disk_size_used_for_tenants);
|
||||
EXPECT_EQ(limit_log_disk_size, opts.log_disk_usage_limit_size_);
|
||||
|
||||
{
|
||||
PalfHandleImplGuard leader;
|
||||
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
|
||||
// 生成10个文件
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10*32, id, MAX_LOG_BODY_SIZE));
|
||||
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 10));
|
||||
// 缩容一定不会成功,租户日志盘规格依旧为上限值
|
||||
usleep(ObLooper::INTERVAL_US * 2);
|
||||
EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
log_disk_size_used_for_tenants = log_pool->min_log_disk_size_for_all_tenants_;
|
||||
EXPECT_EQ(limit_log_disk_size, log_disk_size_used_for_tenants);
|
||||
EXPECT_EQ(limit_log_disk_size, opts.log_disk_usage_limit_size_);
|
||||
EXPECT_EQ(OB_MACHINE_RESOURCE_NOT_ENOUGH, log_pool->create_tenant(MIN_DISK_SIZE_PER_PALF_INSTANCE));
|
||||
const LSN base_lsn(8*PALF_BLOCK_SIZE);
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->set_base_lsn(base_lsn));
|
||||
usleep(1000);
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1000));
|
||||
EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
|
||||
EXPECT_EQ(OB_SUCCESS, wait_until_disk_space_to(leader_idx, (10*PALF_PHY_BLOCK_SIZE*80+100)/100));
|
||||
usleep(ObLooper::INTERVAL_US * 2);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
log_disk_size_used_for_tenants = log_pool->min_log_disk_size_for_all_tenants_;
|
||||
EXPECT_EQ(10*PALF_PHY_BLOCK_SIZE, log_disk_size_used_for_tenants);
|
||||
EXPECT_EQ(10*PALF_PHY_BLOCK_SIZE, opts.log_disk_usage_limit_size_);
|
||||
EXPECT_EQ(OB_SUCCESS, log_pool->create_tenant(MIN_DISK_SIZE_PER_PALF_INSTANCE));
|
||||
log_pool->abort_create_tenant(MIN_DISK_SIZE_PER_PALF_INSTANCE);
|
||||
|
||||
// 扩容预计一定成功
|
||||
EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, limit_log_disk_size/PALF_PHY_BLOCK_SIZE));
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts));
|
||||
log_disk_size_used_for_tenants = log_pool->min_log_disk_size_for_all_tenants_;
|
||||
EXPECT_EQ(limit_log_disk_size, log_disk_size_used_for_tenants);
|
||||
EXPECT_EQ(limit_log_disk_size, opts.log_disk_usage_limit_size_);
|
||||
EXPECT_EQ(OB_MACHINE_RESOURCE_NOT_ENOUGH, log_pool->create_tenant(MIN_DISK_SIZE_PER_PALF_INSTANCE));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} // namespace unittest
|
||||
} // namespace oceanbase
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
|
||||
}
|
||||
@ -95,85 +95,6 @@ void read_padding_entry(PalfHandleImplGuard &leader, SCN padding_scn, LSN paddin
|
||||
|
||||
}
|
||||
|
||||
TEST_F(TestObSimpleLogClusterSingleReplica, update_disk_options)
|
||||
{
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "update_disk_options");
|
||||
OB_LOGGER.set_log_level("TRACE");
|
||||
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
|
||||
PALF_LOG(INFO, "start update_disk_options", K(id));
|
||||
int64_t leader_idx = 0;
|
||||
PalfHandleImplGuard leader;
|
||||
PalfEnv *palf_env = NULL;
|
||||
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
|
||||
EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env));
|
||||
PalfOptions opts;
|
||||
EXPECT_EQ(OB_SUCCESS, palf_env->get_options(opts));
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 4 * 32 + 10, id, MAX_LOG_BODY_SIZE));
|
||||
while (leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_
|
||||
< LSN(4 * 32 * MAX_LOG_BODY_SIZE)) {
|
||||
sleep(1);
|
||||
}
|
||||
block_id_t min_block_id, max_block_id;
|
||||
EXPECT_EQ(OB_SUCCESS,
|
||||
leader.palf_handle_impl_->log_engine_.get_block_id_range(
|
||||
min_block_id, max_block_id));
|
||||
EXPECT_EQ(4, max_block_id);
|
||||
opts.disk_options_.log_disk_usage_limit_size_ = 8 * PALF_PHY_BLOCK_SIZE;
|
||||
EXPECT_EQ(OB_SUCCESS, palf_env->update_options(opts));
|
||||
sleep(1);
|
||||
opts.disk_options_.log_disk_utilization_limit_threshold_ = 50;
|
||||
opts.disk_options_.log_disk_utilization_threshold_ = 40;
|
||||
EXPECT_EQ(OB_SUCCESS, palf_env->update_options(opts));
|
||||
|
||||
opts.disk_options_.log_disk_usage_limit_size_ = 4 * PALF_PHY_BLOCK_SIZE;
|
||||
EXPECT_EQ(OB_STATE_NOT_MATCH, palf_env->update_options(opts));
|
||||
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_;
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_ = PalfDiskOptionsWrapper::Status::NORMAL_STATUS;
|
||||
|
||||
opts.disk_options_.log_disk_utilization_limit_threshold_ = 95;
|
||||
opts.disk_options_.log_disk_utilization_threshold_ = 80;
|
||||
EXPECT_EQ(OB_SUCCESS, palf_env->update_options(opts));
|
||||
EXPECT_EQ(OB_STATE_NOT_MATCH, palf_env->update_options(opts));
|
||||
sleep(1);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_GT(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_);
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->set_base_lsn(LSN(4 * PALF_BLOCK_SIZE)));
|
||||
//EXPECT_EQ(LSN(4*PALF_BLOCK_SIZE), leader.palf_handle_.palf_handle_impl_->log_engine_.get_log_meta().get_log_snapshot_meta().base_lsn_);
|
||||
while (leader.palf_handle_impl_->log_engine_.base_lsn_for_block_gc_
|
||||
!= LSN(4 * PALF_BLOCK_SIZE)) {
|
||||
sleep(1);
|
||||
}
|
||||
// wait blocks to be recycled
|
||||
while (leader.palf_handle_impl_->log_engine_.get_base_lsn_used_for_block_gc() != (LSN(4*PALF_BLOCK_SIZE))) {
|
||||
sleep(1);
|
||||
}
|
||||
sleep(1);
|
||||
EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.status_);
|
||||
EXPECT_EQ(
|
||||
opts.disk_options_,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_);
|
||||
EXPECT_EQ(
|
||||
opts.disk_options_,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_);
|
||||
PALF_LOG(INFO, "runlin trace", K(opts),
|
||||
"holders:", palf_env->palf_env_impl_.disk_options_wrapper_);
|
||||
// test expand
|
||||
opts.disk_options_.log_disk_usage_limit_size_ = 5 * 1024 * 1024 * 1024ul;
|
||||
EXPECT_EQ(OB_SUCCESS, palf_env->update_options(opts));
|
||||
opts.disk_options_.log_disk_usage_limit_size_ = 6 * 1024 * 1024 * 1024ul;
|
||||
EXPECT_EQ(OB_SUCCESS, palf_env->update_options(opts));
|
||||
EXPECT_EQ(
|
||||
opts.disk_options_,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_);
|
||||
EXPECT_EQ(
|
||||
opts.disk_options_,
|
||||
palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_);
|
||||
}
|
||||
|
||||
TEST_F(TestObSimpleLogClusterSingleReplica, delete_paxos_group)
|
||||
{
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "delete_paxos_group");
|
||||
|
||||
@ -18,6 +18,11 @@
|
||||
#include "lib/time/ob_time_utility.h"
|
||||
#include "lib/utility/ob_defer.h"
|
||||
#include "logservice/palf/election/utils/election_common_define.h"
|
||||
#define private public
|
||||
#define protected public
|
||||
#include "share/config/ob_server_config.h"
|
||||
#undef private
|
||||
#undef protected
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -79,6 +84,8 @@ bool ObSimpleClusterTestBase::is_started_ = false;
|
||||
std::string ObSimpleClusterTestBase::env_prefix_;
|
||||
std::string ObSimpleClusterTestBase::curr_dir_;
|
||||
bool ObSimpleClusterTestBase::enable_env_warn_log_ = false;
|
||||
const char *ObSimpleClusterTestBase::UNIT_BASE ="box_ym_";
|
||||
const char *ObSimpleClusterTestBase::POOL_BASE ="pool_ym_";
|
||||
|
||||
ObSimpleClusterTestBase::ObSimpleClusterTestBase(const std::string &env_prefix,
|
||||
const char *log_disk_size,
|
||||
@ -156,7 +163,6 @@ int ObSimpleClusterTestBase::start()
|
||||
GCONF.enable_record_trace_log = false;
|
||||
GMEMCONF.set_server_memory_limit(10 * 1024 * 1024 * 1024ul);
|
||||
|
||||
|
||||
int32_t log_level;
|
||||
bool change_log_level = false;
|
||||
if (enable_env_warn_log_) {
|
||||
@ -223,8 +229,8 @@ int ObSimpleClusterTestBase::create_tenant(const char *tenant_name,
|
||||
{
|
||||
ObSqlString sql;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(sql.assign_fmt("create resource unit box_ym_%s max_cpu 8, memory_size '%s', log_disk_size='%s';",
|
||||
tenant_name, memory_size, log_disk_size))) {
|
||||
} else if (OB_FAIL(sql.assign_fmt("create resource unit %s%s max_cpu 2, memory_size '%s', log_disk_size='%s';",
|
||||
UNIT_BASE, tenant_name, memory_size, log_disk_size))) {
|
||||
SERVER_LOG(WARN, "create_tenant", K(ret));
|
||||
} else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) {
|
||||
SERVER_LOG(WARN, "create_tenant", K(ret));
|
||||
@ -233,7 +239,7 @@ int ObSimpleClusterTestBase::create_tenant(const char *tenant_name,
|
||||
{
|
||||
ObSqlString sql;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(sql.assign_fmt("create resource pool pool_ym_%s unit = 'box_ym_%s', unit_num = 1, zone_list = ('zone1');", tenant_name, tenant_name))) {
|
||||
} else if (OB_FAIL(sql.assign_fmt("create resource pool %s%s unit = '%s%s', unit_num = 1, zone_list = ('zone1');", POOL_BASE, tenant_name, UNIT_BASE, tenant_name))) {
|
||||
SERVER_LOG(WARN, "create_tenant", K(ret));
|
||||
} else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) {
|
||||
SERVER_LOG(WARN, "create_tenant", K(ret));
|
||||
@ -242,7 +248,7 @@ int ObSimpleClusterTestBase::create_tenant(const char *tenant_name,
|
||||
{
|
||||
ObSqlString sql;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(sql.assign_fmt("create tenant %s replica_num = 1, primary_zone='zone1', resource_pool_list=('pool_ym_%s') set ob_tcp_invited_nodes='%%'%s", tenant_name, tenant_name, oracle_mode ? ", ob_compatibility_mode='oracle'" : ";"))) {
|
||||
} else if (OB_FAIL(sql.assign_fmt("create tenant %s replica_num = 1, primary_zone='zone1', resource_pool_list=('%s%s') set ob_tcp_invited_nodes='%%'%s", tenant_name, POOL_BASE, tenant_name, oracle_mode ? ", ob_compatibility_mode='oracle'" : ";"))) {
|
||||
SERVER_LOG(WARN, "create_tenant", K(ret));
|
||||
} else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) {
|
||||
SERVER_LOG(WARN, "create_tenant", K(ret));
|
||||
@ -251,7 +257,7 @@ int ObSimpleClusterTestBase::create_tenant(const char *tenant_name,
|
||||
if (change_log_level) {
|
||||
OB_LOGGER.set_log_level(log_level);
|
||||
}
|
||||
SERVER_LOG(INFO, "create tenant finish", K(ret));
|
||||
SERVER_LOG(INFO, "create tenant finish", K(ret), K(tenant_name));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -32,7 +32,7 @@ public:
|
||||
// set_bootstrap_and_create_tenant_warn_log 默认bootstrap和创建租户使用WARN日志,加速启动
|
||||
ObSimpleClusterTestBase(const std::string &env_prefix = "run_",
|
||||
const char *log_disk_size = "10G",
|
||||
const char *memory_limit = "10G");
|
||||
const char *memory_limit = "16G");
|
||||
virtual ~ObSimpleClusterTestBase();
|
||||
|
||||
int start();
|
||||
@ -62,6 +62,8 @@ protected:
|
||||
static std::string env_prefix_;
|
||||
static std::string curr_dir_;
|
||||
static bool enable_env_warn_log_;
|
||||
static const char *UNIT_BASE;
|
||||
static const char *POOL_BASE;
|
||||
};
|
||||
|
||||
} // end unittest
|
||||
|
||||
@ -20,6 +20,10 @@
|
||||
#include "logservice/ob_log_service.h"
|
||||
#include "observer/ob_server_utils.h"
|
||||
#include "env/ob_simple_server_restart_helper.h"
|
||||
#include "observer/omt/ob_tenant.h"
|
||||
#include "share/unit/ob_unit_config.h"
|
||||
#undef protected
|
||||
#undef private
|
||||
|
||||
const char *TEST_FILE_NAME = "test_observer_expand_shrink";
|
||||
const char *BORN_CASE_NAME= "ObserverExpandShink";
|
||||
@ -56,7 +60,7 @@ TEST_F(ObserverExpandShink, basic_func)
|
||||
int64_t origin_server_in_use_size, origin_server_log_total_size;
|
||||
EXPECT_EQ(OB_SUCCESS, GCTX.log_block_mgr_->get_disk_usage(origin_server_in_use_size, origin_server_log_total_size));
|
||||
GCONF.log_disk_size = GCTX.log_block_mgr_->lower_align_(2 * origin_server_log_total_size);
|
||||
sleep(6);
|
||||
sleep(11);
|
||||
int64_t new_server_in_use_size, new_server_log_total_size;
|
||||
EXPECT_EQ(OB_SUCCESS, GCTX.log_block_mgr_->get_disk_usage(new_server_in_use_size, new_server_log_total_size));
|
||||
EXPECT_EQ(new_server_log_total_size, 2 * origin_server_log_total_size);
|
||||
@ -102,6 +106,96 @@ TEST_F(ObserverExpandShink, basic_func)
|
||||
log_disk_percentage));
|
||||
}
|
||||
|
||||
template<typename ... Args>
|
||||
std::string string_format( const std::string& format, Args ... args )
|
||||
{
|
||||
int size_s = std::snprintf( nullptr, 0, format.c_str(), args ... ) + 1; // Extra space for '\0'
|
||||
if( size_s <= 0 ){ throw std::runtime_error( "Error during formatting." ); }
|
||||
auto size = static_cast<size_t>( size_s );
|
||||
std::unique_ptr<char[]> buf( new char[ size ] );
|
||||
std::snprintf( buf.get(), size, format.c_str(), args ... );
|
||||
return std::string( buf.get(), buf.get() + size - 1 ); // We don't want the '\0' inside
|
||||
}
|
||||
|
||||
TEST_F(ObserverExpandShink, resize_tenant_log_disk)
|
||||
{
|
||||
omt::ObTenantNodeBalancer::get_instance().refresh_interval_ = 1 * 1000 * 1000;
|
||||
sleep(10);
|
||||
GCONF.log_disk_size = 20 * 1024 * 1024 * 1024ul;
|
||||
bool tenant_exist = true;
|
||||
int ret = OB_SUCCESS;
|
||||
delete_tenant("runlin");
|
||||
while (true == tenant_exist && OB_SUCC(ret)) {
|
||||
if (OB_FAIL(check_tenant_exist(tenant_exist, "runlin"))) {
|
||||
SERVER_LOG(WARN, "check_tenant_exist failed", K(ret));
|
||||
}
|
||||
}
|
||||
// 保证log_disk_size变为10G生效
|
||||
sleep(3);
|
||||
int64_t log_disk_origin_assigned = GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_;
|
||||
bool bool_ret = true;
|
||||
EXPECT_EQ(bool_ret, true);
|
||||
LOG_INFO("runlin trace, before create one default tenant", KPC(GCTX.log_block_mgr_), K(log_disk_origin_assigned));
|
||||
// 每个租户的日志盘大小为2G(默认值)
|
||||
EXPECT_EQ(OB_SUCCESS, create_tenant("runlin1"));
|
||||
LOG_INFO("runlin trace, after create one default tenant", KPC(GCTX.log_block_mgr_), K(log_disk_origin_assigned));
|
||||
EXPECT_EQ(OB_SUCCESS, create_tenant("runlin2"));
|
||||
EXPECT_EQ(GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_,
|
||||
log_disk_origin_assigned + 4*1024*1024*1024ul);
|
||||
LOG_INFO("runlin trace, after create two default tenant", KPC(GCTX.log_block_mgr_), K(log_disk_origin_assigned));
|
||||
// 修改租户规格
|
||||
int64_t affected_rows = 0;
|
||||
std::string sql_str = "ALTER RESOURCE UNIT %s%s LOG_DISK_SIZE='%s'";
|
||||
{
|
||||
std::string alter_resource_failed_sql = string_format(sql_str, UNIT_BASE, "runlin1", "100G");
|
||||
EXPECT_EQ(OB_MACHINE_RESOURCE_NOT_ENOUGH, exec_write_sql_sys(alter_resource_failed_sql.c_str(), affected_rows));
|
||||
LOG_INFO("runlin trace, alter resource failed", KPC(GCTX.log_block_mgr_));
|
||||
}
|
||||
{
|
||||
std::string alter_resource_failed_sql = string_format(sql_str, UNIT_BASE, "runlin1", "1G");
|
||||
EXPECT_EQ(OB_RESOURCE_UNIT_VALUE_BELOW_LIMIT, exec_write_sql_sys(alter_resource_failed_sql.c_str(), affected_rows));
|
||||
LOG_INFO("runlin trace, alter resource below limit", KPC(GCTX.log_block_mgr_));
|
||||
}
|
||||
{
|
||||
// 扩容验证
|
||||
std::string alter_resource_runlin1 = string_format(sql_str, UNIT_BASE, "runlin1", "6G");
|
||||
std::string alter_resource_runlin2 = string_format(sql_str, UNIT_BASE, "runlin2", "6G");
|
||||
EXPECT_EQ(OB_SUCCESS, exec_write_sql_sys(alter_resource_runlin1.c_str(), affected_rows));
|
||||
EXPECT_EQ(OB_SUCCESS, exec_write_sql_sys(alter_resource_runlin2.c_str(), affected_rows));
|
||||
sleep(3);
|
||||
// 扩容操作直接执行成功
|
||||
EXPECT_EQ(GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_,
|
||||
log_disk_origin_assigned + 12*1024*1024*1024ul);
|
||||
LOG_INFO("runlin trace, alter resource to 6G success", KPC(GCTX.log_block_mgr_));
|
||||
}
|
||||
{
|
||||
// 缩容验证
|
||||
// 暂停TenantNodeBalancer的运行
|
||||
omt::ObTenantNodeBalancer::get_instance().refresh_interval_ = 10 * 1000 * 1000;
|
||||
int64_t start_ts = ObTimeUtility::current_time();
|
||||
sleep(2);
|
||||
{
|
||||
std::string alter_resource_runlin1 = string_format(sql_str, UNIT_BASE, "runlin1", "2G");
|
||||
std::string alter_resource_runlin2 = string_format(sql_str, UNIT_BASE, "runlin2", "2G");
|
||||
EXPECT_EQ(OB_SUCCESS, exec_write_sql_sys(alter_resource_runlin1.c_str(), affected_rows));
|
||||
EXPECT_EQ(OB_SUCCESS, exec_write_sql_sys(alter_resource_runlin2.c_str(), affected_rows));
|
||||
}
|
||||
const int64_t cost_ts = ObTimeUtility::current_time() - start_ts;
|
||||
// 假设sleep+执行sql的时间小于4s
|
||||
EXPECT_LE(cost_ts, 4*1000*1000);
|
||||
// 此时TenantNodeBalancer线程还未开始运行,ObServerLogBlockMgr的min_log_disk_size_for_all_tenants_不会发生变化
|
||||
EXPECT_EQ(GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_,
|
||||
log_disk_origin_assigned + 12*1024*1024*1024ul);
|
||||
omt::ObTenantNodeBalancer::get_instance().refresh_interval_ = 1 * 1000 * 1000;
|
||||
sleep(11);
|
||||
LOG_INFO("runlin trace, alter resource to 2G success", KPC(GCTX.log_block_mgr_));
|
||||
EXPECT_EQ(GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_,
|
||||
log_disk_origin_assigned + 4*1024*1024*1024ul);
|
||||
}
|
||||
EXPECT_EQ(OB_SUCCESS, delete_tenant("runlin1"));
|
||||
EXPECT_EQ(OB_SUCCESS, delete_tenant("runlin2"));
|
||||
}
|
||||
|
||||
//class ObserverExpandShinkRestart: public ObSimpleClusterTestBase
|
||||
//{
|
||||
//public:
|
||||
@ -114,56 +208,15 @@ TEST_F(ObserverExpandShink, basic_func)
|
||||
// SERVER_LOG(INFO, "create_tenant_after_restart trace", KPC(GCTX.log_block_mgr_));
|
||||
//}
|
||||
|
||||
TEST_F(ObserverExpandShink, direct_set_observer)
|
||||
{
|
||||
GCONF.log_disk_size = 1024 * 1024 * 1024ul * 3;
|
||||
int64_t log_disk_size = 0;
|
||||
int64_t log_disk_percentage = 0;
|
||||
EXPECT_EQ(OB_SUCCESS, observer::ObServerUtils::get_log_disk_info_in_config(
|
||||
log_disk_size,
|
||||
log_disk_percentage));
|
||||
GCONF.log_disk_size = 10000000 * share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE;
|
||||
EXPECT_EQ(OB_SERVER_OUTOF_DISK_SPACE, observer::ObServerUtils::get_log_disk_info_in_config(
|
||||
log_disk_size,
|
||||
log_disk_percentage));
|
||||
GCONF.log_disk_size = 3 * share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE;
|
||||
EXPECT_EQ(false, GCTX.log_block_mgr_->check_space_is_enough_(share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE - 1));
|
||||
{
|
||||
GCONF.log_disk_size = 3 * share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE;
|
||||
sleep(6);
|
||||
}
|
||||
EXPECT_EQ(OB_SUCCESS, create_tenant("tt2"));
|
||||
EXPECT_EQ(false, GCTX.log_block_mgr_->check_space_is_enough_(2*share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE-1));
|
||||
EXPECT_EQ(false, GCTX.log_block_mgr_->check_space_is_enough_(2*share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE));
|
||||
int64_t affected_rows = 0;
|
||||
std::string succ_sql_str = "ALTER RESOURCE UNIT sys_unit_config LOG_DISK_SIZE='2G'";
|
||||
EXPECT_EQ(OB_SUCCESS, exec_write_sql_sys(succ_sql_str.c_str(), affected_rows));
|
||||
sleep(2);
|
||||
// tenant_node_balancer 1 s 运行一次
|
||||
EXPECT_EQ(true, GCTX.log_block_mgr_->check_space_is_enough_(2*share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE));
|
||||
bool tenant_exist = false;
|
||||
EXPECT_EQ(OB_SUCCESS, check_tenant_exist(tenant_exist, "tt2"));
|
||||
EXPECT_EQ(true, tenant_exist);
|
||||
EXPECT_EQ(OB_SUCCESS, delete_tenant("tt2"));
|
||||
int ret = OB_SUCCESS;
|
||||
while (true == tenant_exist && OB_SUCC(ret)) {
|
||||
if (OB_FAIL(check_tenant_exist(tenant_exist, "tt2"))) {
|
||||
SERVER_LOG(WARN, "check_tenant_exist failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ObserverExpandShink, test_hidden_sys_tenant)
|
||||
{
|
||||
omt::ObMultiTenant *omt = GCTX.omt_;
|
||||
bool remove_tenant_succ = false;
|
||||
int64_t log_disk_size_in_use = GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_;
|
||||
EXPECT_EQ(log_disk_size_in_use, GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_);
|
||||
|
||||
share::TenantUnits units;
|
||||
EXPECT_EQ(OB_SUCCESS, omt->get_tenant_units(units, false));
|
||||
EXPECT_EQ(false, units.empty());
|
||||
bool has_sys_tenant = false;
|
||||
int64_t origin_sys_log_disk_size = 0;
|
||||
int64_t hidden_sys_log_disk_size = 0;
|
||||
for (int i = 0; i < units.count(); i++) {
|
||||
@ -190,15 +243,20 @@ TEST_F(ObserverExpandShink, test_hidden_sys_tenant)
|
||||
GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_);
|
||||
log_disk_size_in_use = GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_;
|
||||
EXPECT_EQ(OB_SUCCESS, omt->convert_hidden_to_real_sys_tenant(sys_unit_config));
|
||||
has_sys_tenant = true;
|
||||
EXPECT_EQ(log_disk_size_in_use-hidden_sys_log_disk_size+sys_unit_config.config_.log_disk_size(),
|
||||
GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_);
|
||||
log_disk_size_in_use = GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_;
|
||||
CLOG_LOG(INFO, "runlin trace after convert_hidden_to_real_sys_tenant", K(log_disk_size_in_use), KPC(GCTX.log_block_mgr_),
|
||||
K(origin_sys_log_disk_size));
|
||||
int64_t new_sys_log_disk_size = 2 *1024*1024*1024l +512*1024*1024;
|
||||
EXPECT_EQ(OB_SUCCESS, omt->update_tenant_log_disk_size(OB_SYS_TENANT_ID, new_sys_log_disk_size));
|
||||
EXPECT_EQ(true, has_sys_tenant);
|
||||
int64_t new_sys_log_disk_size = sys_unit_config.config_.log_disk_size()+512*1024*1024;
|
||||
omt::ObTenant *tenant = nullptr;
|
||||
EXPECT_EQ(OB_SUCCESS, omt->get_tenant(OB_SYS_TENANT_ID, tenant));
|
||||
share::ObUnitInfoGetter::ObTenantConfig unit_config = tenant->get_unit();
|
||||
unit_config.config_.resource_.log_disk_size_ = new_sys_log_disk_size;
|
||||
// 扩容直接成功
|
||||
EXPECT_EQ(OB_SUCCESS, omt->update_tenant_unit(unit_config));
|
||||
unit_config = tenant->get_unit();
|
||||
EXPECT_EQ(unit_config.config_.log_disk_size(), new_sys_log_disk_size);
|
||||
EXPECT_EQ(log_disk_size_in_use-sys_unit_config.config_.log_disk_size()+new_sys_log_disk_size,
|
||||
GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_);
|
||||
CLOG_LOG(INFO, "runlin trace after convert_real_to_hidden_sys_tenant", K(log_disk_size_in_use), KPC(GCTX.log_block_mgr_),
|
||||
@ -206,6 +264,7 @@ TEST_F(ObserverExpandShink, test_hidden_sys_tenant)
|
||||
}
|
||||
|
||||
|
||||
// don't has any case after this.
|
||||
TEST_F(ObserverExpandShink, paralle_set)
|
||||
{
|
||||
omt::ObTenantNodeBalancer::get_instance().refresh_interval_ = 1000 * 1000 * 1000;
|
||||
|
||||
@ -495,6 +495,17 @@ int ObLogService::get_palf_disk_usage(int64_t &used_size_byte, int64_t &total_si
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogService::get_palf_stable_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else {
|
||||
ret = palf_env_->get_stable_disk_usage(used_size_byte, total_size_byte);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogService::update_palf_options_except_disk_usage_limit_size()
|
||||
{
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
|
||||
|
||||
@ -143,7 +143,17 @@ public:
|
||||
|
||||
int update_replayable_point(const share::SCN &replayable_point);
|
||||
int get_replayable_point(share::SCN &replayable_point);
|
||||
|
||||
// @brief get palf disk usage
|
||||
// @param [out] used_size_byte
|
||||
// @param [out] total_size_byte, if in shrinking status, total_size_byte is the value after shrinking.
|
||||
// NB: total_size_byte may be smaller than used_size_byte.
|
||||
int get_palf_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte);
|
||||
|
||||
// @brief get palf disk usage
|
||||
// @param [out] used_size_byte
|
||||
// @param [out] total_size_byte, if in shrinking status, total_size_byte is the value before shrinking.
|
||||
int get_palf_stable_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte);
|
||||
// why we need update 'log_disk_size_' and 'log_disk_util_threshold' separately.
|
||||
//
|
||||
// 'log_disk_size' is a member of unit config.
|
||||
|
||||
@ -156,6 +156,11 @@ int PalfEnv::get_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte)
|
||||
return palf_env_impl_.get_disk_usage(used_size_byte, total_size_byte);
|
||||
}
|
||||
|
||||
int PalfEnv::get_stable_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte)
|
||||
{
|
||||
return palf_env_impl_.get_stable_disk_usage(used_size_byte, total_size_byte);
|
||||
}
|
||||
|
||||
int PalfEnv::get_options(PalfOptions &options)
|
||||
{
|
||||
return palf_env_impl_.get_options(options);
|
||||
|
||||
@ -86,8 +86,15 @@ public:
|
||||
|
||||
// @brief get palf disk usage
|
||||
// @param [out] used_size_byte
|
||||
// @param [out] total_size_byte
|
||||
// @param [out] total_size_byte, if in shrinking status, total_size_byte is the value after shrinking.
|
||||
// NB: total_size_byte may be smaller than used_size_byte.
|
||||
int get_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte);
|
||||
|
||||
// @brief get stable disk usage
|
||||
// @param [out] used_size_byte
|
||||
// @param [out] total_size_byte, if in shrinking status, total_size_byte is the value before shrinking.
|
||||
int get_stable_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte);
|
||||
|
||||
// @brief update options
|
||||
// @param [in] options
|
||||
int update_options(const PalfOptions &options);
|
||||
|
||||
@ -78,6 +78,7 @@ int PalfDiskOptionsWrapper::init(const PalfDiskOptions &disk_opts)
|
||||
disk_opts_for_recycling_blocks_ = disk_opts_for_stopping_writing_ = disk_opts;
|
||||
status_ = Status::NORMAL_STATUS;
|
||||
cur_unrecyclable_log_disk_size_ = 0;
|
||||
sequence_ = 0;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -89,6 +90,7 @@ void PalfDiskOptionsWrapper::reset()
|
||||
disk_opts_for_stopping_writing_.reset();
|
||||
status_ = Status::INVALID_STATUS;
|
||||
cur_unrecyclable_log_disk_size_ = -1;
|
||||
sequence_ = -1;
|
||||
}
|
||||
|
||||
int PalfDiskOptionsWrapper::update_disk_options(const PalfDiskOptions &disk_opts_for_recycling_blocks)
|
||||
@ -112,40 +114,53 @@ bool PalfDiskOptionsWrapper::need_throttling() const
|
||||
return disk_opts_for_stopping_writing_.is_valid() && cur_unrecyclable_log_disk_size_ > trigger_size;
|
||||
}
|
||||
|
||||
void PalfDiskOptionsWrapper::change_to_normal()
|
||||
// Concurrent analysis
|
||||
// BlockGCThread ConfigChangeThread
|
||||
// T1 get_disk_options
|
||||
// T2 shrink log_disk when status is SHRINKING_STATUS,
|
||||
// make disk_opts_for_recycling_blocks to new PalfDiskOptions.
|
||||
// T3 change disk_opts_for_stopping_writing for disk_opts_for_recycling_blocks
|
||||
// and make status to NORMAL_STATUS
|
||||
// This will cause write-stop, therefore, we only change status to NORMAL when sequence is same.
|
||||
// And we only update sequence when PalfDiskOptions has change.
|
||||
void PalfDiskOptionsWrapper::change_to_normal(const int64_t sequence)
|
||||
{
|
||||
ObSpinLockGuard guard(disk_opts_lock_);
|
||||
status_ = Status::NORMAL_STATUS;
|
||||
disk_opts_for_stopping_writing_ = disk_opts_for_recycling_blocks_;
|
||||
PALF_LOG(INFO, "change_to_normal", KPC(this));
|
||||
if (sequence_ == sequence && Status::SHRINKING_STATUS == status_) {
|
||||
status_ = Status::NORMAL_STATUS;
|
||||
disk_opts_for_stopping_writing_ = disk_opts_for_recycling_blocks_;
|
||||
PALF_LOG(INFO, "change_to_normal", KPC(this));
|
||||
} else {
|
||||
PALF_LOG(INFO, "sequence has changed or status not match", KPC(this), K(sequence));
|
||||
}
|
||||
}
|
||||
|
||||
int PalfDiskOptionsWrapper::update_disk_options_not_guarded_by_lock_(const PalfDiskOptions &disk_opts_for_recycling_blocks)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t curr_stop_write_limit_size =
|
||||
disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ * disk_opts_for_stopping_writing_.log_disk_utilization_limit_threshold_;
|
||||
disk_opts_for_stopping_writing_.log_disk_usage_limit_size_;
|
||||
int64_t next_stop_write_limit_size =
|
||||
disk_opts_for_recycling_blocks.log_disk_usage_limit_size_ * disk_opts_for_recycling_blocks.log_disk_utilization_limit_threshold_;
|
||||
disk_opts_for_recycling_blocks.log_disk_usage_limit_size_;
|
||||
if (false == disk_opts_for_recycling_blocks.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} else if (disk_opts_for_recycling_blocks_ == disk_opts_for_recycling_blocks) {
|
||||
PALF_LOG(INFO, "no need update disk options", K(ret), K(disk_opts_for_recycling_blocks_), K(disk_opts_for_recycling_blocks));
|
||||
} else {
|
||||
if (Status::SHRINKING_STATUS == status_) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
PALF_LOG(WARN, "don't support shrink log disk concurrently", K(ret), KPC(this));
|
||||
} else if (disk_opts_for_recycling_blocks_ == disk_opts_for_recycling_blocks) {
|
||||
PALF_LOG(INFO, "no need update disk options", K(ret), K(disk_opts_for_recycling_blocks_), K(disk_opts_for_recycling_blocks));
|
||||
} else if (curr_stop_write_limit_size > next_stop_write_limit_size) {
|
||||
if (curr_stop_write_limit_size > next_stop_write_limit_size) {
|
||||
status_ = Status::SHRINKING_STATUS;
|
||||
// In process of shrinking, to avoid stopping writing,
|
||||
// 'disk_opts_for_stopping_writing_' is still an original value, update it
|
||||
// with 'disk_opts_for_recycling_blocks' until there is no possibility
|
||||
// caused stopping writing.
|
||||
disk_opts_for_recycling_blocks_ = disk_opts_for_recycling_blocks;
|
||||
PALF_LOG(INFO, "shrink log disk success", K(curr_stop_write_limit_size), K(next_stop_write_limit_size),
|
||||
KPC(this));
|
||||
} else {
|
||||
status_ = Status::NORMAL_STATUS;
|
||||
disk_opts_for_recycling_blocks_ = disk_opts_for_stopping_writing_ = disk_opts_for_recycling_blocks;
|
||||
PALF_LOG(INFO, "update_disk_options_not_guarded_by_lock_ success", K(curr_stop_write_limit_size), K(next_stop_write_limit_size));
|
||||
PALF_LOG(INFO, "expand log disk success", K(curr_stop_write_limit_size), K(next_stop_write_limit_size),
|
||||
KPC(this));
|
||||
}
|
||||
//always update writing_throttling_trigger_percentage_
|
||||
const int64_t new_trigger_percentage = disk_opts_for_recycling_blocks.log_disk_throttling_percentage_;
|
||||
@ -155,6 +170,7 @@ int PalfDiskOptionsWrapper::update_disk_options_not_guarded_by_lock_(const PalfD
|
||||
disk_opts_for_recycling_blocks_.log_disk_throttling_maximum_duration_ = new_maximum_duration;
|
||||
disk_opts_for_stopping_writing_.log_disk_throttling_maximum_duration_ = new_maximum_duration;
|
||||
|
||||
sequence_++;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -497,7 +513,6 @@ int PalfEnvImpl::scan_all_palf_handle_impl_director_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTimeGuard guard("PalfEnvImplStart", 0);
|
||||
// TODO by runlin: how to avoid modify 'log_disk_usage_limit_size_' after restart?
|
||||
ReloadPalfHandleImplFunctor functor(this);
|
||||
if (OB_FAIL(scan_dir(log_dir_, functor))) {
|
||||
PALF_LOG(WARN, "scan_dir failed", K(ret));
|
||||
@ -658,9 +673,11 @@ int PalfEnvImpl::try_recycle_blocks()
|
||||
PalfDiskOptions disk_opts_for_stopping_writing;
|
||||
PalfDiskOptions disk_opts_for_recycling_blocks;
|
||||
PalfDiskOptionsWrapper::Status status = PalfDiskOptionsWrapper::Status::INVALID_STATUS;
|
||||
int64_t sequence = -1;
|
||||
disk_options_wrapper_.get_disk_opts(disk_opts_for_stopping_writing,
|
||||
disk_opts_for_recycling_blocks,
|
||||
status);
|
||||
status,
|
||||
sequence);
|
||||
int64_t total_used_size_byte = 0;
|
||||
int64_t total_unrecyclable_size_byte = 0;
|
||||
int64_t total_size_to_recycle_blocks = disk_opts_for_recycling_blocks.log_disk_usage_limit_size_;
|
||||
@ -700,7 +717,7 @@ int PalfEnvImpl::try_recycle_blocks()
|
||||
int64_t oldest_palf_id = INVALID_PALF_ID;
|
||||
if (OB_SUCC(ret) && PalfDiskOptionsWrapper::Status::SHRINKING_STATUS == status) {
|
||||
if (total_used_size_byte < usable_disk_size_to_recycle_blocks) {
|
||||
disk_options_wrapper_.change_to_normal();
|
||||
disk_options_wrapper_.change_to_normal(sequence);
|
||||
PALF_LOG(INFO, "change_to_normal success", K(disk_options_wrapper_),
|
||||
K(total_used_size_byte), K(usable_disk_size_to_recycle_blocks));
|
||||
}
|
||||
@ -726,7 +743,7 @@ int PalfEnvImpl::try_recycle_blocks()
|
||||
LOG_DBA_ERROR(OB_LOG_OUTOF_DISK_SPACE, "msg", "log disk space is almost full", "ret", tmp_ret,
|
||||
"total_size(MB)", disk_opts_for_recycling_blocks.log_disk_usage_limit_size_/MB,
|
||||
"used_size(MB)", total_used_size_byte/MB,
|
||||
"used_percent(%)", (total_used_size_byte* 100) / (disk_opts_for_recycling_blocks.log_disk_usage_limit_size_ + 1),
|
||||
"used_percent(%)", (total_used_size_byte* 100) / (disk_opts_for_stopping_writing.log_disk_usage_limit_size_ + 1),
|
||||
"warn_size(MB)", (total_size_to_recycle_blocks*disk_opts_for_recycling_blocks.log_disk_utilization_threshold_)/100/MB,
|
||||
"warn_percent(%)", disk_opts_for_recycling_blocks.log_disk_utilization_threshold_,
|
||||
"limit_size(MB)", (total_size_to_recycle_blocks*disk_opts_for_recycling_blocks.log_disk_utilization_limit_threshold_)/100/MB,
|
||||
@ -846,6 +863,20 @@ int PalfEnvImpl::get_disk_usage(int64_t &used_size_byte, int64_t &total_usable_s
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PalfEnvImpl::get_stable_disk_usage(int64_t &used_size_byte, int64_t &total_usable_size_byte)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
constexpr int64_t MB = 1024 * 1024;
|
||||
PalfDiskOptions disk_options = disk_options_wrapper_.get_disk_opts_for_stopping_writing();
|
||||
if (OB_FAIL(get_disk_usage_(used_size_byte))) {
|
||||
PALF_LOG(WARN, "get_disk_usage_ failed", K(ret));
|
||||
} else {
|
||||
total_usable_size_byte = disk_options.log_disk_usage_limit_size_;
|
||||
PALF_LOG(INFO, "get_stable_disk_usage", K(ret), "capacity(MB):", total_usable_size_byte/MB, "used(MB):", used_size_byte/MB);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int PalfEnvImpl::update_options(const PalfOptions &options)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -83,10 +83,14 @@ public:
|
||||
MAX_STATUS = 3
|
||||
};
|
||||
|
||||
static bool is_shrinking(const Status &status)
|
||||
{ return Status::SHRINKING_STATUS == status; }
|
||||
|
||||
PalfDiskOptionsWrapper() : disk_opts_for_stopping_writing_(),
|
||||
disk_opts_for_recycling_blocks_(),
|
||||
status_(Status::INVALID_STATUS),
|
||||
cur_unrecyclable_log_disk_size_(0),
|
||||
sequence_(-1),
|
||||
disk_opts_lock_(common::ObLatchIds::PALF_ENV_LOCK) {}
|
||||
~PalfDiskOptionsWrapper() { reset(); }
|
||||
|
||||
@ -94,7 +98,7 @@ public:
|
||||
void reset();
|
||||
int update_disk_options(const PalfDiskOptions &disk_opts);
|
||||
|
||||
void change_to_normal();
|
||||
void change_to_normal(const int64_t sequence);
|
||||
const PalfDiskOptions& get_disk_opts_for_stopping_writing() const
|
||||
{
|
||||
ObSpinLockGuard guard(disk_opts_lock_);
|
||||
@ -110,12 +114,14 @@ public:
|
||||
|
||||
void get_disk_opts(PalfDiskOptions &disk_opts_for_stopping_writing,
|
||||
PalfDiskOptions &disk_opts_for_recycling_blocks,
|
||||
Status &status) const
|
||||
Status &status,
|
||||
int64_t &sequence) const
|
||||
{
|
||||
ObSpinLockGuard guard(disk_opts_lock_);
|
||||
disk_opts_for_stopping_writing = disk_opts_for_stopping_writing_;
|
||||
disk_opts_for_recycling_blocks = disk_opts_for_recycling_blocks_;
|
||||
status = status_;
|
||||
sequence = sequence_;
|
||||
}
|
||||
|
||||
void get_throttling_options(PalfThrottleOptions &options)
|
||||
@ -131,11 +137,20 @@ public:
|
||||
bool is_shrinking() const
|
||||
{
|
||||
ObSpinLockGuard guard(disk_opts_lock_);
|
||||
return Status::SHRINKING_STATUS == status_;
|
||||
return is_shrinking(status_);
|
||||
}
|
||||
|
||||
void set_stop_writing_disk_options_with_status(const PalfDiskOptions &new_opts,
|
||||
const Status &status)
|
||||
{
|
||||
ObSpinLockGuard guard(disk_opts_lock_);
|
||||
status_ = status;
|
||||
disk_opts_for_stopping_writing_ = new_opts;
|
||||
}
|
||||
static constexpr int64_t MB = 1024*1024ll;
|
||||
TO_STRING_KV(K_(disk_opts_for_stopping_writing), K_(disk_opts_for_recycling_blocks), K_(status),
|
||||
"cur_unrecyclable_log_disk_size(MB)", cur_unrecyclable_log_disk_size_/MB);
|
||||
"cur_unrecyclable_log_disk_size(MB)", cur_unrecyclable_log_disk_size_/MB,
|
||||
K_(sequence));
|
||||
|
||||
private:
|
||||
int update_disk_options_not_guarded_by_lock_(const PalfDiskOptions &new_opts);
|
||||
@ -148,6 +163,7 @@ private:
|
||||
PalfDiskOptions disk_opts_for_recycling_blocks_;
|
||||
Status status_;
|
||||
int64_t cur_unrecyclable_log_disk_size_;
|
||||
int64_t sequence_;
|
||||
mutable ObSpinLock disk_opts_lock_;
|
||||
};
|
||||
|
||||
@ -238,6 +254,7 @@ public:
|
||||
int try_recycle_blocks();
|
||||
bool check_disk_space_enough() override final;
|
||||
int get_disk_usage(int64_t &used_size_byte, int64_t &total_usable_size_byte);
|
||||
int get_stable_disk_usage(int64_t &used_size_byte, int64_t &total_usable_size_byte);
|
||||
int update_options(const PalfOptions &options);
|
||||
int get_options(PalfOptions &options);
|
||||
int64_t get_rebuild_replica_log_lag_threshold() const
|
||||
|
||||
@ -67,6 +67,11 @@ bool PalfDiskOptions::operator==(const PalfDiskOptions &palf_disk_options) const
|
||||
&& log_writer_parallelism_ == palf_disk_options.log_writer_parallelism_;
|
||||
}
|
||||
|
||||
bool PalfDiskOptions::operator!=(const PalfDiskOptions &palf_disk_options) const
|
||||
{
|
||||
return !this->operator==(palf_disk_options);
|
||||
}
|
||||
|
||||
PalfDiskOptions &PalfDiskOptions::operator=(const PalfDiskOptions &other)
|
||||
{
|
||||
log_disk_usage_limit_size_ = other.log_disk_usage_limit_size_;
|
||||
|
||||
@ -39,6 +39,7 @@ struct PalfDiskOptions
|
||||
void reset();
|
||||
bool is_valid() const;
|
||||
bool operator==(const PalfDiskOptions &rhs) const;
|
||||
bool operator!=(const PalfDiskOptions &rhs) const;
|
||||
PalfDiskOptions &operator=(const PalfDiskOptions &other);
|
||||
int64_t log_disk_usage_limit_size_;
|
||||
int log_disk_utilization_threshold_;
|
||||
|
||||
@ -1003,7 +1003,9 @@ int ObMultiTenant::update_tenant_unit_no_lock(const ObUnitInfoGetter::ObTenantCo
|
||||
const double max_cpu = static_cast<double>(unit.config_.max_cpu());
|
||||
const uint64_t tenant_id = unit.tenant_id_;
|
||||
int64_t allowed_mem_limit = 0;
|
||||
|
||||
ObUnitInfoGetter::ObTenantConfig allowed_new_unit;
|
||||
ObUnitInfoGetter::ObTenantConfig old_unit;
|
||||
int64_t allowed_new_log_disk_size = 0;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
@ -1012,13 +1014,27 @@ int ObMultiTenant::update_tenant_unit_no_lock(const ObUnitInfoGetter::ObTenantCo
|
||||
} else if (OB_ISNULL(tenant)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tenant is nullptr", K(tenant_id));
|
||||
} else if (OB_FAIL(write_update_tenant_unit_slog(unit))) {
|
||||
} else if (OB_FAIL(old_unit.assign(tenant->get_unit()))) {
|
||||
LOG_ERROR("fail to assign old unit failed", K(tenant_id), K(unit));
|
||||
} else if (OB_FAIL(update_tenant_memory(tenant_id, unit.config_.memory_size(), allowed_mem_limit))) {
|
||||
LOG_WARN("fail to update tenant memory", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(update_tenant_log_disk_size(tenant_id,
|
||||
unit.config_.log_disk_size(),
|
||||
old_unit.config_.log_disk_size(),
|
||||
allowed_new_log_disk_size))) {
|
||||
LOG_WARN("fail to update tenant log disk size", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(construct_allowed_unit_config(allowed_new_log_disk_size,
|
||||
unit,
|
||||
allowed_new_unit))) {
|
||||
LOG_WARN("fail to construct_allowed_unit_config", K(allowed_new_log_disk_size),
|
||||
K(allowed_new_unit));
|
||||
} else if (OB_FAIL(write_update_tenant_unit_slog(allowed_new_unit))) {
|
||||
LOG_WARN("fail to write tenant meta slog", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tenant->update_thread_cnt(max_cpu))) {
|
||||
} else if (OB_FAIL(update_tenant_freezer_mem_limit(tenant_id, unit.config_.memory_size(), allowed_mem_limit))) {
|
||||
LOG_WARN("fail to update_tenant_freezer_mem_limit", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(tenant->update_thread_cnt(max_cpu))) {
|
||||
LOG_WARN("fail to update mtl module thread_cnt", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(update_tenant_log_disk_size(tenant_id, unit.config_.log_disk_size()))) {
|
||||
LOG_WARN("fail to update tenant log disk size", K(ret), K(tenant_id));
|
||||
} else if (FALSE_IT(tenant->set_unit_memory_size(allowed_mem_limit))) {
|
||||
} else if (FALSE_IT(tenant->set_unit_memory_size(unit.config_.memory_size()))) {
|
||||
// unreachable
|
||||
} else {
|
||||
if (tenant->unit_min_cpu() != min_cpu) {
|
||||
@ -1027,7 +1043,7 @@ int ObMultiTenant::update_tenant_unit_no_lock(const ObUnitInfoGetter::ObTenantCo
|
||||
if (tenant->unit_max_cpu() != max_cpu) {
|
||||
tenant->set_unit_max_cpu(max_cpu);
|
||||
}
|
||||
tenant->set_tenant_unit(unit);
|
||||
tenant->set_tenant_unit(allowed_new_unit);
|
||||
LOG_INFO("succecc to set tenant unit config", K(unit), K(allowed_mem_limit));
|
||||
}
|
||||
|
||||
@ -1047,6 +1063,34 @@ int ObMultiTenant::update_tenant_memory(const ObUnitInfoGetter::ObTenantConfig &
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMultiTenant::construct_allowed_unit_config(const int64_t allowed_new_log_disk_size,
|
||||
const ObUnitInfoGetter::ObTenantConfig &expected_unit_config,
|
||||
ObUnitInfoGetter::ObTenantConfig &allowed_new_unit)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (0 >= allowed_new_log_disk_size
|
||||
|| !expected_unit_config.is_valid()) {
|
||||
ret= OB_INVALID_ARGUMENT;
|
||||
} else if (OB_FAIL(allowed_new_unit.assign(expected_unit_config))) {
|
||||
LOG_ERROR("fail to assign new unit", K(allowed_new_log_disk_size), K(expected_unit_config));
|
||||
} else {
|
||||
// construct allowed resource.
|
||||
ObUnitResource allowed_resource(
|
||||
expected_unit_config.config_.max_cpu(),
|
||||
expected_unit_config.config_.min_cpu(),
|
||||
expected_unit_config.config_.memory_size(),
|
||||
allowed_new_log_disk_size,
|
||||
expected_unit_config.config_.max_iops(),
|
||||
expected_unit_config.config_.min_iops(),
|
||||
expected_unit_config.config_.iops_weight());
|
||||
if (OB_FAIL(allowed_new_unit.config_.update_unit_resource(allowed_resource))) {
|
||||
LOG_WARN("update_unit_resource failed", K(allowed_new_log_disk_size), K(allowed_new_unit),
|
||||
K(allowed_resource));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMultiTenant::update_tenant_unit(const ObUnitInfoGetter::ObTenantConfig &unit)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1110,33 +1154,104 @@ int ObMultiTenant::update_tenant_memory(const uint64_t tenant_id, const int64_t
|
||||
}
|
||||
|
||||
int ObMultiTenant::update_tenant_log_disk_size(const uint64_t tenant_id,
|
||||
const int64_t expected_log_disk_size)
|
||||
const int64_t expected_log_disk_size,
|
||||
const int64_t old_log_disk_size,
|
||||
int64_t &allowed_new_log_disk_size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t used_log_disk_size = 0, palf_log_disk_size = 0;
|
||||
bool can_update_log_disk_size_with_expected_log_disk = false;
|
||||
// 'expected_log_disk_size' is the latest log disk size record in __all_unit_config.
|
||||
// 'old_log_disk_size' is current log disk size in ObTenant.
|
||||
// 'allowed_new_log_disk_size' is current allowed log disk size when update log disk.
|
||||
//
|
||||
// To avoid overselling, we can not use 'expected_log_disk_size' to update unit config which will save in slog.
|
||||
// therefore, we need constuct a virtual log disk size which named with 'allowed_new_log_disk_size'.
|
||||
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
|
||||
int64_t old_log_disk_size = 0;
|
||||
int64_t unused_log_disk_size = 0;
|
||||
if (OB_SUCC(guard.switch_to(tenant_id))) {
|
||||
ObLogService *log_service = MTL(ObLogService *);
|
||||
if (OB_ISNULL(log_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else if (OB_FAIL(log_service->get_palf_disk_usage(unused_log_disk_size, old_log_disk_size))) {
|
||||
LOG_WARN("failed to get_palf_disk_usage", K(ret), K(unused_log_disk_size), K(old_log_disk_size),
|
||||
K(expected_log_disk_size));
|
||||
} else if(OB_FAIL(GCTX.log_block_mgr_->update_tenant(old_log_disk_size, expected_log_disk_size))) {
|
||||
LOG_WARN("failed to update_tenant in ObServerLogBlockMgr", K(ret), K(old_log_disk_size),
|
||||
K(expected_log_disk_size));
|
||||
} else if (OB_FAIL(log_service->get_palf_stable_disk_usage(used_log_disk_size, palf_log_disk_size))) {
|
||||
LOG_WARN("fail to get_palf_stable_disk_usage", K(tenant_id), K(old_log_disk_size), K(expected_log_disk_size));
|
||||
// The standard for determing whether it's in shrinking or expanding status:
|
||||
// 1. If 'palf_log_disk_size' is smaller than or equal to 'expected_log_disk_size', it's in expanding status.
|
||||
// 2. If 'palf_log_disk_size' is greater than 'expected_log_disk_size', it's in shrinking status.
|
||||
//
|
||||
// For shrinking log disk, we don't update ObTenantConfig of ObTenant to new ObTenantConfig until shrinking successfully.
|
||||
//
|
||||
// NB: All fields of new ObTenantConfig except log_disk_size has been updated in case of shrinking log disk.
|
||||
//
|
||||
// For example:
|
||||
// 1. before shrinkg log disk successfully, and then expand log disk.
|
||||
// - At T1 timestamp, the original log disk is 100G, and update it to 50G, we will construct 'new_unit' with
|
||||
// 100G, but update palf with 50G because original log disk size is greater than new log disk size.
|
||||
// - At T2 timestamp, the log disk size in current ObTenantConfig is 100G, and we update it to 80G, there are
|
||||
// two scenarios:
|
||||
// 1. if 'palf_log_disk_size' which get from palf is 100G, we think palf is still in shrinking status. and we will
|
||||
// construct 'new_unit' with 100G because 'palf_log_disk_size' is greater than new log disk size(80G). but udpate
|
||||
// palf with 80G
|
||||
// 2. if 'palf_log_disk_size' which get from palf is 50G, we think palf has been in normal status. and we will
|
||||
// construct 'new_unit' with 80G because 'palf_log_disk_size' is smaller than new log disk size(80G), but udpate
|
||||
// palf with 80G.
|
||||
} else if (FALSE_IT(can_update_log_disk_size_with_expected_log_disk = (expected_log_disk_size >= palf_log_disk_size))) {
|
||||
// For expanding log disk, we can update 'allowed_new_log_disk_size' to 'expected_log_disk_size' directlly.
|
||||
} else if (can_update_log_disk_size_with_expected_log_disk && FALSE_IT(allowed_new_log_disk_size = expected_log_disk_size)) {
|
||||
// For shrinking log disk, we still update log disk size of 'new_unit' to 'old_log_disk_size'.
|
||||
} else if (!can_update_log_disk_size_with_expected_log_disk && FALSE_IT(allowed_new_log_disk_size = old_log_disk_size)) {
|
||||
// case 1: for shrinking log disk, shrinking log disk from 100G to 50G.
|
||||
// - At T1 timestamp, 'expected_log_disk_size' is 50G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 100G.
|
||||
// the log disk size record in slog is 100G, and we will update log disk size used for palf to 50G, but not update log
|
||||
// disk which has assigned in ObServerLogBlockMGR.
|
||||
// - At T2 timestamp, 'expected_log_disk_size' is still 50G, 'old_log_disk_size' is still 100G, however, 'allowed_new_log_disk_size'
|
||||
// is 50G because of shrinking log disk has been successfully, the log disk record in slog is 50G, and then we will update log disk
|
||||
// size used for palf to 50G again but has no effect, log disk assigned in ObServerLogBlockMGR update to 50G(assume there is only one tenant).
|
||||
//
|
||||
// case 2: for expanding log disk, expanding log disk from 100G to 150G.
|
||||
// - At T1 timestamp, 'expected_log_disk_size' is 150G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 150G.
|
||||
// the log disk size record in slog is 150G, and then, we will update log disk size used for palf to 150G, the log disk
|
||||
// which has assigned in ObServerLogBlockMGR updaet to 150G(assume there is only one tenant).
|
||||
//
|
||||
// case 3: for shrinking log disk, shrinking log disk from 100G to 50G, and then shrinking log disk from 50G to 25G.
|
||||
// - At T1 timestamp, 'expected_log_disk_size' is 50G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 100G.
|
||||
// the log disk size record in slog is 100G, and we will update log disk size used for palf to 50G, but not update log
|
||||
// disk which has assigned in ObServerLogBlockMGR.
|
||||
// - At T2 timestamp, 'expected_log_disk_size' is 25G, 'old_log_disk_size' is still 100G, however, there are two possibility value for
|
||||
// 'allowed_new_log_disk_size':
|
||||
// 1. the value is 100G because of last shrinking log disk has not been successfully, the log disk record in slog is still 100G
|
||||
// and then we will update log disk size used for palf to 25G, but not update log disk assigned in ObServerLogBlockMGR.
|
||||
// At T3 timestamp, 'expected_log_disk_size' is 25G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 25G,
|
||||
// the log disk record in slog is 25G, the log disk assigned in ObServerLogBlockMGR is 25G.
|
||||
// 2. the value is 50G because of last shrinking log disk has been successfully, the log disk record in slog is 50G
|
||||
// and then we will update log disk size used for palf to 25G, update log disk assigned in ObServerLogBlockMGR to 50G(assume there is only one tenant).
|
||||
// At T3 timestamp, 'expected_log_disk_size' is 25G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 25G,
|
||||
// the log disk record in slog is 25G, the log disk assigned in ObServerLogBlockMGR is 25G.
|
||||
//
|
||||
// case 4: for shrinking log disk, shrinking log disk from 100G to 50G, and then expanding log disk from 50G to 80G.
|
||||
// - At T1 timestamp, 'expected_log_disk_size' is 50G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 100G.
|
||||
// the log disk size record in slog is 100G, and we will update log disk size used for palf to 50G, but not update log
|
||||
// disk which has assigned in ObServerLogBlockMGR.
|
||||
// - At T2 timestamp, 'expected_log_disk_size' is 80G, 'old_log_disk_size' is still 100G, however, there are two possibility value for
|
||||
// 'allowed_new_log_disk_size':
|
||||
// 1. the value is 100G because of last shrinking log disk has not been successfully, the log disk record in slog is still 100G
|
||||
// and then we will update log disk size used for palf to 80G, but not update log disk assigned in ObServerLogBlockMGR.
|
||||
// At T3 timestamp, 'expected_log_disk_size' is 80G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 80G,
|
||||
// the log disk record in slog is 80G, the log disk assigned in ObServerLogBlockMGR is 80G.
|
||||
// 2. the value is 80G because of last shrinking log disk has been successfully, the log disk record in slog is 80G
|
||||
// and then we will update log disk size used for palf to 80G, update log disk assigned in ObServerLogBlockMGR to 80G(assume there is only one tenant).
|
||||
// At T3 timestamp, 'expected_log_disk_size' is 25G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 25G,
|
||||
// the log disk record in slog is 25G, the log disk assigned in ObServerLogBlockMGR is 25G.
|
||||
//
|
||||
} else if (OB_FAIL(GCTX.log_block_mgr_->update_tenant(old_log_disk_size, allowed_new_log_disk_size))) {
|
||||
LOG_WARN("failed to update teannt int ObServerLogBlockMGR", K(ret), K(tenant_id), K(expected_log_disk_size),
|
||||
K(old_log_disk_size), K(allowed_new_log_disk_size));
|
||||
} else if (OB_FAIL(log_service->update_log_disk_usage_limit_size(expected_log_disk_size))) {
|
||||
LOG_WARN("failed to update_log_disk_usage_limit_size", K(ret), K(tenant_id), K(expected_log_disk_size),
|
||||
K(old_log_disk_size), K(allowed_new_log_disk_size));
|
||||
GCTX.log_block_mgr_->abort_update_tenant(allowed_new_log_disk_size, old_log_disk_size);
|
||||
} else {
|
||||
if (OB_FAIL(log_service->update_log_disk_usage_limit_size(expected_log_disk_size))) {
|
||||
LOG_WARN("failed to update_log_disk_usage_limit_size", K(ret), K(tenant_id),
|
||||
K(expected_log_disk_size));
|
||||
} else {
|
||||
LOG_INFO("update_tenant_log_disk_size success", K(ret), K(tenant_id),
|
||||
K(expected_log_disk_size));
|
||||
}
|
||||
if (false == is_virtual_tenant_id(tenant_id) && OB_FAIL(ret)) {
|
||||
GCTX.log_block_mgr_->abort_update_tenant(expected_log_disk_size, old_log_disk_size);
|
||||
}
|
||||
LOG_INFO("update_log_disk_usage_limit_size success", K(ret), K(tenant_id), K(expected_log_disk_size),
|
||||
K(old_log_disk_size), K(allowed_new_log_disk_size));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -112,7 +112,9 @@ public:
|
||||
int update_tenant_memory(const uint64_t tenant_id, const int64_t mem_limit, int64_t &allowed_mem_limit);
|
||||
int update_tenant_memory(const share::ObUnitInfoGetter::ObTenantConfig &unit);
|
||||
int update_tenant_log_disk_size(const uint64_t tenant_id,
|
||||
const int64_t expected_log_disk_size);
|
||||
const int64_t expected_log_disk_size,
|
||||
const int64_t old_log_disk_size,
|
||||
int64_t &allowed_log_disk_size);
|
||||
int modify_tenant_io(const uint64_t tenant_id, const share::ObUnitConfig &unit_config);
|
||||
int update_tenant_config(uint64_t tenant_id);
|
||||
int update_palf_config();
|
||||
@ -180,6 +182,9 @@ protected:
|
||||
int remove_tenant(const uint64_t tenant_id, bool &remove_tenant_succ);
|
||||
uint32_t get_tenant_lock_bucket_idx(const uint64_t tenant_id);
|
||||
int update_tenant_unit_no_lock(const share::ObUnitInfoGetter::ObTenantConfig &unit);
|
||||
int construct_allowed_unit_config(const int64_t allowed_log_disk_size,
|
||||
const share::ObUnitInfoGetter::ObTenantConfig &expected_unit_config,
|
||||
share::ObUnitInfoGetter::ObTenantConfig &allowed_unit);
|
||||
|
||||
protected:
|
||||
static const int DEL_TRY_TIMES = 30;
|
||||
|
||||
@ -301,7 +301,7 @@ int ObDiskUsageReportTask::count_tenant_clog(const uint64_t tenant_id)
|
||||
if (OB_ISNULL(log_svr = MTL(ObLogService*))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "log service is null", K(ret), KP(log_svr));
|
||||
} else if (OB_FAIL(log_svr->get_palf_disk_usage(clog_space, size_limit))) {
|
||||
} else if (OB_FAIL(log_svr->get_palf_stable_disk_usage(clog_space, size_limit))) {
|
||||
STORAGE_LOG(WARN, "failed to get the disk space that clog used", K(ret));
|
||||
} else {
|
||||
report_key.file_type_ = ObDiskReportFileType::OB_DISK_REPORT_TENANT_CLOG_DATA;
|
||||
|
||||
@ -22,7 +22,6 @@
|
||||
#include "storage/tx_storage/ob_tenant_freezer.h"
|
||||
#include "logservice/ob_log_service.h"
|
||||
#include "logservice/palf/log_define.h"
|
||||
#include "logservice/palf/palf_env.h"
|
||||
#include "logservice/palf/lsn.h"
|
||||
#include "logservice/archiveservice/ob_archive_service.h"
|
||||
#include "storage/tx_storage/ob_ls_handle.h"
|
||||
@ -203,17 +202,13 @@ bool ObCheckPointService::get_disk_usage_threshold_(int64_t &threshold)
|
||||
bool get_disk_usage_threshold_success = false;
|
||||
// avod clog disk full
|
||||
logservice::ObLogService *log_service = nullptr;
|
||||
PalfEnv *palf_env = nullptr;
|
||||
if (OB_ISNULL(log_service = MTL(logservice::ObLogService *))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "get_log_service failed", K(ret));
|
||||
} else if (OB_ISNULL(palf_env = log_service->get_palf_env())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "get_palf_env failed", K(ret));
|
||||
} else {
|
||||
int64_t used_size = 0;
|
||||
int64_t total_size = 0;
|
||||
if (OB_FAIL(palf_env->get_disk_usage(used_size, total_size))) {
|
||||
if (OB_FAIL(log_service->get_palf_disk_usage(used_size, total_size))) {
|
||||
STORAGE_LOG(WARN, "get_disk_usage failed", K(ret), K(used_size), K(total_size));
|
||||
} else {
|
||||
threshold = total_size * NEED_FLUSH_CLOG_DISK_PERCENT / 100;
|
||||
|
||||
Reference in New Issue
Block a user