diff --git a/mittest/logservice/CMakeLists.txt b/mittest/logservice/CMakeLists.txt index 61e45dc5cf..c28b80c359 100644 --- a/mittest/logservice/CMakeLists.txt +++ b/mittest/logservice/CMakeLists.txt @@ -38,6 +38,6 @@ ob_unittest_clog(test_ob_simple_log_config_change_mock_ele test_ob_simple_log_co ob_unittest_clog(test_ob_simple_log_arb_mock_ele test_ob_simple_log_arb_mock_ele.cpp) ob_unittest_clog(test_ob_simple_log_flashback_arb test_ob_simple_log_flashback_arb.cpp) ob_unittest_clog(test_ob_simple_log_restart test_ob_simple_log_restart.cpp) - +ob_unittest_clog(test_ob_simple_log_disk_mgr test_ob_simple_log_disk_mgr.cpp) add_subdirectory(archiveservice) diff --git a/mittest/logservice/env/ob_simple_arb_server.h b/mittest/logservice/env/ob_simple_arb_server.h index 525b45b6f8..130edf25b9 100644 --- a/mittest/logservice/env/ob_simple_arb_server.h +++ b/mittest/logservice/env/ob_simple_arb_server.h @@ -80,6 +80,8 @@ public: {return &allocator_;} virtual int update_disk_opts(const PalfDiskOptions &opts) override final {return OB_NOT_SUPPORTED;}; + virtual int get_disk_opts(PalfDiskOptions &opts) override final + {return OB_NOT_SUPPORTED;}; virtual int get_palf_env(PalfEnv *&palf_env) {return OB_NOT_SUPPORTED;}; bool is_valid() const override final diff --git a/mittest/logservice/env/ob_simple_log_cluster_env.cpp b/mittest/logservice/env/ob_simple_log_cluster_env.cpp index bbcebcb6c0..9d516287ed 100755 --- a/mittest/logservice/env/ob_simple_log_cluster_env.cpp +++ b/mittest/logservice/env/ob_simple_log_cluster_env.cpp @@ -426,6 +426,37 @@ int ObSimpleLogClusterTestEnv::create_paxos_group_with_arb( return ret; } +int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t server_id, + const int64_t recycle_threshold, + const int64_t write_stop_threshold) +{ + int ret = OB_SUCCESS; + const int64_t MB = 1024 * 1024; + PalfOptions opts; + auto cluster = get_cluster(); + if (server_id >= 0 && server_id < cluster.size()) { + 
ObTenantEnv::set_tenant(cluster[server_id]->get_tenant_base()); + auto srv = cluster[server_id]; + if (true == srv->is_arb_server()) { + ret = OB_NOT_SUPPORTED; + } else { + auto palf_env_impl = dynamic_cast(srv->get_palf_env()); + if (OB_FAIL(palf_env_impl->get_options(opts))) { + PALF_LOG(ERROR, "get_optiosn failed", K(ret), K(server_id)); + } else { + // palf_env_impl->disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_threshold_ + // = recycle_threshold; + // palf_env_impl->disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_utilization_limit_threshold_ + // = write_stop_threshold; + opts.disk_options_.log_disk_utilization_threshold_ = recycle_threshold; + opts.disk_options_.log_disk_utilization_limit_threshold_ = write_stop_threshold; + ret = srv->update_disk_opts(opts.disk_options_); + } + } + } + return ret; +} + int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t server_id, const int64_t file_block_num) { int ret = OB_SUCCESS; @@ -443,13 +474,40 @@ int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t server_id, cons PALF_LOG(ERROR, "get_optiosn failed", K(ret), K(server_id)); } else { opts.disk_options_.log_disk_usage_limit_size_ = file_block_num * PALF_PHY_BLOCK_SIZE; - ret = palf_env_impl->update_options(opts); + ret = srv->update_disk_opts(opts.disk_options_); } } } return ret; } +int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t log_block_num) +{ + int ret = OB_SUCCESS; + auto cluster = get_cluster(); + for (int64_t server_id = 0; server_id < cluster.size(); server_id++) { + ret = update_disk_options(server_id, log_block_num); + } + return ret; +} + +int ObSimpleLogClusterTestEnv::get_disk_options(const int64_t server_id, + PalfDiskOptions &opts) +{ + int ret = OB_SUCCESS; + auto cluster = get_cluster(); + if (server_id >= 0 && server_id < cluster.size()) { + ObTenantEnv::set_tenant(cluster[server_id]->get_tenant_base()); + auto srv = cluster[server_id]; + if (true == 
srv->is_arb_server()) { + ret = OB_NOT_SUPPORTED; + } else { + ret = srv->get_disk_opts(opts); + } + } + return ret; +} + int ObSimpleLogClusterTestEnv::restart_paxos_groups() { int ret = OB_SUCCESS; @@ -1394,5 +1452,33 @@ bool ObSimpleLogClusterTestEnv::is_upgraded(PalfHandleImplGuard &leader, const i return has_upgraded; } +int ObSimpleLogClusterTestEnv::wait_until_disk_space_to(const int64_t server_id, + const int64_t expect_log_disk_space) +{ + int ret = OB_SUCCESS; + auto cluster = get_cluster(); + if (server_id >= 0 && server_id < cluster.size()) { + ObTenantEnv::set_tenant(cluster[server_id]->get_tenant_base()); + auto srv = cluster[server_id]; + if (true == srv->is_arb_server()) { + ret = OB_NOT_SUPPORTED; + } else { + auto palf_env_impl = dynamic_cast(srv->get_palf_env()); + int64_t used_log_disk_space = INT64_MAX; + int64_t total_log_disk_space = 0; + while (used_log_disk_space >= expect_log_disk_space) { + if (OB_FAIL(palf_env_impl->get_disk_usage(used_log_disk_space, total_log_disk_space))) { + PALF_LOG(WARN, "get_disk_usage failed", K(used_log_disk_space), K(total_log_disk_space)); + } else { + usleep(10*1000); + PALF_LOG(INFO, "disk_space is not enough", K(used_log_disk_space), K(expect_log_disk_space)); + } + } + PALF_LOG(INFO, "wait_until_disk_space_to success", K(used_log_disk_space), K(expect_log_disk_space)); + } + } + return ret; +} + } // end namespace unittest } // end namespace oceanbase diff --git a/mittest/logservice/env/ob_simple_log_cluster_env.h b/mittest/logservice/env/ob_simple_log_cluster_env.h index 5e6c311268..a5f31f5935 100644 --- a/mittest/logservice/env/ob_simple_log_cluster_env.h +++ b/mittest/logservice/env/ob_simple_log_cluster_env.h @@ -175,6 +175,9 @@ public: PalfHandleImplGuard &leader); virtual int delete_paxos_group(const int64_t id); virtual int update_disk_options(const int64_t server_id, const int64_t log_block_number); + virtual int update_disk_options(const int64_t server_id, const int64_t recycle_threshold, const 
int64_t write_stop_threshold); + virtual int update_disk_options(const int64_t log_block_number); + virtual int get_disk_options(const int64_t server_id, PalfDiskOptions &opts); virtual int restart_paxos_groups(); virtual int restart_server(const int64_t server_id); virtual int remove_dir(); @@ -245,6 +248,7 @@ public: void set_disk_options_for_throttling(PalfEnvImpl &palf_env_impl); bool is_degraded(const PalfHandleImplGuard &leader, const int64_t degraded_server_idx); bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id); + int wait_until_disk_space_to(const int64_t server_id, const int64_t expect_log_disk_space); public: static int64_t palf_id_; private: diff --git a/mittest/logservice/env/ob_simple_log_server.cpp b/mittest/logservice/env/ob_simple_log_server.cpp index aa85399f66..f4a0c7ea81 100644 --- a/mittest/logservice/env/ob_simple_log_server.cpp +++ b/mittest/logservice/env/ob_simple_log_server.cpp @@ -154,6 +154,8 @@ int ObSimpleLogServer::simple_init( SERVER_LOG(WARN, "init_io failed", K(ret), K(addr)); } else if (FALSE_IT(guard.click("init_io_")) || OB_FAIL(init_log_service_())) { SERVER_LOG(WARN, "init_log_service failed", K(ret), K(addr)); + } else if (FALSE_IT(guard.click("init_log_service_")) || OB_FAIL(looper_.init(this))) { + SERVER_LOG(WARN, "init ObLooper failed", K(ret), K(addr)); } else { guard.click("init_log_service_"); SERVER_LOG(INFO, "simple_log_server init success", KPC(this), K(guard)); @@ -161,9 +163,113 @@ int ObSimpleLogServer::simple_init( return ret; } +int ObSimpleLogServer::construct_allowed_new_log_disk_(const uint64_t tenant_id, + const int64_t expected_log_disk_size, + const int64_t old_log_disk_size, + int64_t &allowed_new_log_disk_size) +{ + int ret = OB_SUCCESS; + bool can_update_log_disk_size_with_expected_log_disk = true; + int64_t palf_log_disk_size = 0; + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("failed to switch tenant", K(ret), K(tenant_id)); + } 
else { + ObLogService *log_service = MTL(ObLogService*); + int64_t used_log_disk_size = 0; + if (OB_ISNULL(log_service)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("ObLogService is nullptr", K(ret), KP(log_service)); + } else if (OB_FAIL(log_service->get_palf_stable_disk_usage(used_log_disk_size, palf_log_disk_size))) { + LOG_WARN("failed to update_log_disk_size", K(ret), K(palf_log_disk_size), K(allowed_new_log_disk_size), K(expected_log_disk_size)); + } else if (FALSE_IT(can_update_log_disk_size_with_expected_log_disk = (expected_log_disk_size >= palf_log_disk_size))) { + // For shrinking log disk, we still update log disk size of 'new_unit' to 'old_log_disk_size'. + } else if (!can_update_log_disk_size_with_expected_log_disk + && FALSE_IT(allowed_new_log_disk_size = old_log_disk_size)) { + // For expanding log disk, we update log disk size of 'new_unit' to 'expected_log_disk_size'. + } else if (can_update_log_disk_size_with_expected_log_disk + && FALSE_IT(allowed_new_log_disk_size = expected_log_disk_size)) { + } else { + LOG_INFO("construct_new_log_disk success", K(ret), K(tenant_id), K(can_update_log_disk_size_with_expected_log_disk), + K(old_log_disk_size), K(allowed_new_log_disk_size), K(expected_log_disk_size)); + } + } + return ret; +} + +int ObSimpleLogServer::update_tenant_log_disk_size_(const uint64_t tenant_id, + const int64_t expected_log_disk_size, + const int64_t old_log_disk_size, + const int64_t allowed_new_log_disk_size) +{ + int ret = OB_SUCCESS; + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (OB_SUCC(guard.switch_to(tenant_id))) { + ObLogService *log_service = MTL(ObLogService *); + if (OB_ISNULL(log_service)) { + ret = OB_ERR_UNEXPECTED; + } else if (OB_FAIL(log_service->update_log_disk_usage_limit_size(expected_log_disk_size))) { + LOG_WARN("failed to update_log_disk_usage_limit_size", K(ret), K(tenant_id), K(expected_log_disk_size), + K(old_log_disk_size), K(allowed_new_log_disk_size)); + } else if 
(OB_FAIL(log_block_pool_.update_tenant(old_log_disk_size, allowed_new_log_disk_size))) { + LOG_WARN("failed to update teannt int ObServerLogBlockMGR", K(ret), K(tenant_id), K(expected_log_disk_size), + K(old_log_disk_size), K(allowed_new_log_disk_size)); + } else { + disk_opts_.log_disk_usage_limit_size_ = allowed_new_log_disk_size; + LOG_INFO("update_log_disk_usage_limit_size success", K(ret), K(tenant_id), K(expected_log_disk_size), + K(old_log_disk_size), K(allowed_new_log_disk_size), K(disk_opts_)); + } + } + return ret; +} + +int ObSimpleLogServer::update_disk_opts_no_lock_(const PalfDiskOptions &opts) +{ + int ret = OB_SUCCESS; + int64_t old_log_disk_size = disk_opts_.log_disk_usage_limit_size_; + int64_t expected_log_disk_size = opts.log_disk_usage_limit_size_; + int64_t allowed_new_log_disk_size = 0; + // 内部表中的disk_opts立马生效 + inner_table_disk_opts_ = opts; + // disk_opts_表示本地持久化最新的disk_opts,在construct_allowed_new_log_disk_中会修改disk_opts_的log_disk_usage_limit_size_ + disk_opts_ = opts; + if (!opts.is_valid()) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid argument", K(opts)); + } else if (OB_FAIL(construct_allowed_new_log_disk_(node_id_, expected_log_disk_size, + old_log_disk_size, allowed_new_log_disk_size))) { + CLOG_LOG(WARN, "construct_allowed_new_log_disk_ failed", K(expected_log_disk_size), K(old_log_disk_size), + K(allowed_new_log_disk_size)); + } else if (OB_FAIL(update_tenant_log_disk_size_(node_id_, expected_log_disk_size, + old_log_disk_size, allowed_new_log_disk_size))) { + CLOG_LOG(WARN, "update_tenant_log_disk_size_ failed", K(expected_log_disk_size), K(old_log_disk_size), + K(allowed_new_log_disk_size)); + } else { + CLOG_LOG(INFO, "update_disk_opts success", K(opts), K(disk_opts_), K(expected_log_disk_size), K(old_log_disk_size), + K(allowed_new_log_disk_size)); + } + return ret; +} + + int ObSimpleLogServer::update_disk_opts(const PalfDiskOptions &opts) { - return 
palf_env_->palf_env_impl_.disk_options_wrapper_.update_disk_options(opts); + int ret = OB_SUCCESS; + ObSpinLockGuard guard(log_disk_lock_); + ret = update_disk_opts_no_lock_(opts); + return ret; +} + +int ObSimpleLogServer::try_resize() +{ + int ret = OB_SUCCESS; + ObSpinLockGuard guard(log_disk_lock_); + if (disk_opts_ != inner_table_disk_opts_) { + if (OB_FAIL(update_disk_opts_no_lock_(inner_table_disk_opts_))) { + + } + } + return ret; } int ObSimpleLogServer::init_memory_dump_timer_() @@ -243,7 +349,7 @@ int ObSimpleLogServer::init_io_(const std::string &cluster_name) storage_env.default_block_size_ = OB_DEFAULT_MACRO_BLOCK_SIZE; storage_env.data_disk_size_ = 1024 * 1024 * 1024; storage_env.data_disk_percentage_ = 0; - storage_env.log_disk_size_ = 2LL * 1024 * 1024 * 1024; + storage_env.log_disk_size_ = 10LL * 1024 * 1024 * 1024; storage_env.log_disk_percentage_ = 0; storage_env.log_spec_.log_dir_ = slog_dir.c_str(); @@ -256,7 +362,6 @@ int ObSimpleLogServer::init_io_(const std::string &cluster_name) iod_opt_array_[3].set("datafile_disk_percentage", storage_env.data_disk_percentage_); iod_opt_array_[4].set("datafile_size", storage_env.data_disk_size_); iod_opts_.opt_cnt_ = MAX_IOD_OPT_CNT; - const int64_t DEFATULT_RESERVED_SIZE = 10 * 1024 * 1024 * 1024ul; if (OB_FAIL(io_device_->init(iod_opts_))) { SERVER_LOG(ERROR, "init io device fail", K(ret)); } else if (OB_FAIL(log_block_pool_.init(storage_env.clog_dir_))) { @@ -267,9 +372,10 @@ int ObSimpleLogServer::init_io_(const std::string &cluster_name) SERVER_LOG(INFO, "init_io_ successs", K(ret), K(guard)); } if (OB_SUCC(ret)) { - log_block_pool_.get_tenants_log_disk_size_func_ = [this, &storage_env](int64_t &log_disk_size) -> int + log_block_pool_.get_tenants_log_disk_size_func_ = [](int64_t &log_disk_size) -> int { - log_disk_size = log_block_pool_.lower_align_(storage_env.log_disk_size_); + // ObServerLogBlockMGR 率先于 ObLogService加载,此时租户使用的log_disk_size为0. 
+ log_disk_size = 0; return OB_SUCCESS; }; if (OB_FAIL(log_block_pool_.start(storage_env.log_disk_size_))) { @@ -285,12 +391,18 @@ int ObSimpleLogServer::init_log_service_() int ret = OB_SUCCESS; // init deps of log_service palf::PalfOptions opts; - opts.disk_options_.log_disk_usage_limit_size_ = 10 * 1024 * 1024 * 1024ul; - opts.disk_options_.log_disk_utilization_threshold_ = 80; - opts.disk_options_.log_disk_utilization_limit_threshold_ = 95; - opts.disk_options_.log_disk_throttling_percentage_ = 100; - opts.disk_options_.log_disk_throttling_maximum_duration_ = 2 * 3600 * 1000 * 1000L; - opts.disk_options_.log_writer_parallelism_ = 2; + if (disk_opts_.is_valid()) { + opts.disk_options_ = disk_opts_; + } else { + opts.disk_options_.log_disk_usage_limit_size_ = 10 * 1024 * 1024 * 1024ul; + opts.disk_options_.log_disk_utilization_threshold_ = 80; + opts.disk_options_.log_disk_utilization_limit_threshold_ = 95; + opts.disk_options_.log_disk_throttling_percentage_ = 100; + opts.disk_options_.log_disk_throttling_maximum_duration_ = 2 * 3600 * 1000 * 1000L; + opts.disk_options_.log_writer_parallelism_ = 2; + disk_opts_ = opts.disk_options_; + inner_table_disk_opts_ = disk_opts_; + } std::string clog_dir = clog_dir_ + "/tenant_1"; allocator_ = OB_NEW(ObTenantMutilAllocator, "TestBase", node_id_); ObMemAttr attr(1, "SimpleLog"); @@ -301,12 +413,14 @@ int ObSimpleLogServer::init_log_service_() } else if (OB_FAIL(log_service_.init(opts, clog_dir.c_str(), addr_, allocator_, transport_, &ls_service_, &location_service_, &reporter_, &log_block_pool_, &sql_proxy_, net_keepalive_))) { SERVER_LOG(ERROR, "init_log_service_ fail", K(ret)); + } else if (OB_FAIL(log_block_pool_.create_tenant(opts.disk_options_.log_disk_usage_limit_size_))) { + SERVER_LOG(ERROR, "crete tenant failed", K(ret)); } else if (OB_FAIL(mock_election_map_.init(ele_attr))) { SERVER_LOG(ERROR, "mock_election_map_ init fail", K(ret)); } else { palf_env_ = log_service_.get_palf_env(); 
palf_env_->palf_env_impl_.log_rpc_.tenant_id_ = OB_SERVER_TENANT_ID; - SERVER_LOG(INFO, "init_log_service_ success", K(ret)); + SERVER_LOG(INFO, "init_log_service_ success", K(ret), K(opts), K(disk_opts_)); } return ret; } @@ -321,6 +435,8 @@ int ObSimpleLogServer::simple_start(const bool is_bootstrap = false) SERVER_LOG(ERROR, "deliver_ start failed", K(ret)); } else if (OB_FAIL(log_service_.arb_service_.start())) { SERVER_LOG(ERROR, "arb_service start failed", K(ret)); + } else if (OB_FAIL(looper_.start())) { + SERVER_LOG(ERROR, "ObLooper start failed", K(ret)); } // do not start entire log_service_ for now, it will // slow down cases running @@ -331,6 +447,7 @@ int ObSimpleLogServer::simple_close(const bool is_shutdown = false) { int ret = OB_SUCCESS; ObTenantEnv::set_tenant(tenant_base_); + looper_.destroy(); ObTimeGuard guard("simple_close", 0); deliver_.destroy(is_shutdown); guard.click("destroy"); @@ -742,5 +859,73 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req) return ret; } +ObLooper::ObLooper() : log_server_(nullptr), + run_interval_(0), + is_inited_(false) { } + +ObLooper::~ObLooper() +{ + destroy(); +} + +int ObLooper::init(ObSimpleLogServer *log_server) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + PALF_LOG(WARN, "ObLooper has been inited", K(ret)); + } else if (NULL == log_server) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(WARN, "invalid argument", K(ret), KP(log_server)); + } else { + log_server_ = log_server; + share::ObThreadPool::set_run_wrapper(MTL_CTX()); + run_interval_ = ObLooper::INTERVAL_US; + is_inited_ = true; + } + + if ((OB_FAIL(ret)) && (OB_INIT_TWICE != ret)) { + destroy(); + } + PALF_LOG(INFO, "ObLooper init finished", K(ret)); + return ret; +} + +void ObLooper::destroy() +{ + stop(); + wait(); + is_inited_ = false; + log_server_ = NULL; +} + +void ObLooper::run1() +{ + lib::set_thread_name("ObLooper"); + log_loop_(); +} + +void ObLooper::log_loop_() +{ + + while (!has_set_stop()) { + int 
ret = OB_SUCCESS; + const int64_t start_ts = ObTimeUtility::current_time(); + + if (OB_FAIL(log_server_->try_resize())) { + PALF_LOG(WARN, "try_resize failed", K(ret)); + } + const int64_t round_cost_time = ObTimeUtility::current_time() - start_ts; + int32_t sleep_ts = run_interval_ - static_cast(round_cost_time); + if (sleep_ts < 0) { + sleep_ts = 0; + } + ob_usleep(sleep_ts); + + if (REACH_TENANT_TIME_INTERVAL(5 * 1000 * 1000)) { + PALF_LOG(INFO, "ObLooper round_cost_time(us)", K(round_cost_time)); + } + } +} } // unittest } // oceanbase diff --git a/mittest/logservice/env/ob_simple_log_server.h b/mittest/logservice/env/ob_simple_log_server.h index 873896fb72..49392d08dd 100644 --- a/mittest/logservice/env/ob_simple_log_server.h +++ b/mittest/logservice/env/ob_simple_log_server.h @@ -120,6 +120,26 @@ protected: common::ObAddr self_; }; +class ObSimpleLogServer; +class ObLooper : public share::ObThreadPool { +public: + static constexpr int64_t INTERVAL_US = 1000*1000; + ObLooper(); + virtual ~ObLooper(); +public: + int init(ObSimpleLogServer *log_server); + void destroy(); + void run1(); +private: + void log_loop_(); +private: + ObSimpleLogServer *log_server_; + int64_t run_interval_; + bool is_inited_; +private: + DISALLOW_COPY_AND_ASSIGN(ObLooper); +}; + class ObLogDeliver : public rpc::frame::ObReqDeliver, public lib::TGTaskHandler, public ObMittestBlacklist { public: @@ -224,6 +244,7 @@ public: virtual ILogBlockPool *get_block_pool() = 0; virtual ObILogAllocator *get_allocator() = 0; virtual int update_disk_opts(const PalfDiskOptions &opts) = 0; + virtual int get_disk_opts(PalfDiskOptions &opts) = 0; virtual int get_palf_env(PalfEnv *&palf_env) = 0; virtual bool is_arb_server() const {return false;}; virtual int64_t get_node_id() = 0; @@ -264,6 +285,12 @@ public: ObILogAllocator *get_allocator() override final { return allocator_; } virtual int update_disk_opts(const PalfDiskOptions &opts) override final; + virtual int get_disk_opts(PalfDiskOptions &opts) 
override final + { + opts = disk_opts_; + return OB_SUCCESS; + } + virtual int try_resize(); virtual int get_palf_env(PalfEnv *&palf_env) { palf_env = palf_env_; return OB_SUCCESS;} virtual void revert_palf_env(IPalfEnvImpl *palf_env) { UNUSED(palf_env); } @@ -348,6 +375,16 @@ protected: int init_network_(const common::ObAddr &addr, const bool is_bootstrap); int init_log_service_(); int init_memory_dump_timer_(); + // 更新log_disk_size的逻辑保持和ObMultiTenant.cpp中维护同名函数一样 + int construct_allowed_new_log_disk_(const uint64_t tenant_id, + const int64_t expected_log_disk_size, + const int64_t old_log_disk_size, + int64_t &allowed_new_log_disk_size); + int update_tenant_log_disk_size_(const uint64_t tenant_id, + const int64_t expected_log_disk_size, + const int64_t old_log_disk_size, + const int64_t allowed_log_disk_size); + int update_disk_opts_no_lock_(const PalfDiskOptions &opts); private: int64_t node_id_; @@ -378,6 +415,10 @@ private: ObSrvRpcProxy srv_proxy_; logservice::coordinator::ObFailureDetector detector_; MockElectionMap mock_election_map_; + ObSpinLock log_disk_lock_; + palf::PalfDiskOptions disk_opts_; + palf::PalfDiskOptions inner_table_disk_opts_; + ObLooper looper_; }; } // end unittest diff --git a/mittest/logservice/test_ob_simple_log_basic_func.cpp b/mittest/logservice/test_ob_simple_log_basic_func.cpp index e3d3485650..2a1eaa8f5a 100644 --- a/mittest/logservice/test_ob_simple_log_basic_func.cpp +++ b/mittest/logservice/test_ob_simple_log_basic_func.cpp @@ -381,35 +381,6 @@ TEST_F(TestObSimpleLogClusterBasicFunc, limit_palf_instances) EXPECT_EQ(OB_SUCCESS, delete_paxos_group(id1)); } -TEST_F(TestObSimpleLogClusterBasicFunc, out_of_disk_space) -{ - SET_CASE_LOG_FILE(TEST_NAME, "out_of_disk_space"); - int64_t id = ATOMIC_AAF(&palf_id_, 1); - int server_idx = 0; - PalfEnv *palf_env = NULL; - int64_t leader_idx = 0; - PalfHandleImplGuard leader; - share::SCN create_scn = share::SCN::base_scn(); - EXPECT_EQ(OB_SUCCESS, get_palf_env(server_idx, palf_env)); - 
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, create_scn, leader_idx, leader)); - update_disk_options(leader_idx, MIN_DISK_SIZE_PER_PALF_INSTANCE/PALF_PHY_BLOCK_SIZE); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 6*31+1, id, MAX_LOG_BODY_SIZE)); - LogStorage *log_storage = &leader.palf_handle_impl_->log_engine_.log_storage_; - while (LSN(6*PALF_BLOCK_SIZE) > log_storage->log_tail_) { - usleep(500); - } - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, id, MAX_LOG_BODY_SIZE)); - while (LSN(6*PALF_BLOCK_SIZE + 20 * MAX_LOG_BODY_SIZE) > log_storage->log_tail_) { - usleep(500); - } - LSN max_lsn = leader.palf_handle_impl_->get_max_lsn(); - wait_lsn_until_flushed(max_lsn, leader); - PALF_LOG(INFO, "out of disk max_lsn", K(max_lsn)); - usleep(palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS + 5*10000); - EXPECT_EQ(OB_LOG_OUTOF_DISK_SPACE, submit_log(leader, 1, id, MAX_LOG_BODY_SIZE)); - palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_ = 5 * MIN_DISK_SIZE_PER_PALF_INSTANCE; -} - TEST_F(TestObSimpleLogClusterBasicFunc, submit_group_log) { SET_CASE_LOG_FILE(TEST_NAME, "submit_group_log"); diff --git a/mittest/logservice/test_ob_simple_log_disk_mgr.cpp b/mittest/logservice/test_ob_simple_log_disk_mgr.cpp new file mode 100644 index 0000000000..b4cc3f8423 --- /dev/null +++ b/mittest/logservice/test_ob_simple_log_disk_mgr.cpp @@ -0,0 +1,365 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. 
+ */ + +#include "lib/ob_define.h" +#include "lib/ob_errno.h" +#include +#include +#include +#include +#define private public +#define protected public +#include "env/ob_simple_log_cluster_env.h" +#undef private +#undef protected +#include "logservice/palf/log_reader_utils.h" +#include "logservice/palf/log_define.h" +#include "logservice/palf/log_group_entry_header.h" +#include "logservice/palf/log_io_worker.h" +#include "logservice/palf/lsn.h" + +const std::string TEST_NAME = "log_disk_mgr"; + +using namespace oceanbase::common; +using namespace oceanbase; +namespace oceanbase +{ +using namespace logservice; +namespace unittest +{ +class TestObSimpleLogDiskMgr : public ObSimpleLogClusterTestEnv +{ +public: + TestObSimpleLogDiskMgr() : ObSimpleLogClusterTestEnv() + { + int ret = init(); + if (OB_SUCCESS != ret) { + throw std::runtime_error("TestObSimpleLogDiskMgr init failed"); + } + } + ~TestObSimpleLogDiskMgr() + { + destroy(); + } + int init() + { + return OB_SUCCESS; + } + void destroy() + {} + int64_t id_; +}; + +int64_t ObSimpleLogClusterTestBase::member_cnt_ = 1; +int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1; +std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME; +bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false; + +TEST_F(TestObSimpleLogDiskMgr, out_of_disk_space) +{ + SET_CASE_LOG_FILE(TEST_NAME, "out_of_disk_space"); + int64_t id = ATOMIC_AAF(&palf_id_, 1); + int server_idx = 0; + PalfEnv *palf_env = NULL; + int64_t leader_idx = 0; + PalfHandleImplGuard leader; + share::SCN create_scn = share::SCN::base_scn(); + EXPECT_EQ(OB_SUCCESS, get_palf_env(server_idx, palf_env)); + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, create_scn, leader_idx, leader)); + update_disk_options(leader_idx, MIN_DISK_SIZE_PER_PALF_INSTANCE/PALF_PHY_BLOCK_SIZE); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 6*31+1, id, MAX_LOG_BODY_SIZE)); + LogStorage *log_storage = &leader.palf_handle_impl_->log_engine_.log_storage_; + while (LSN(6*PALF_BLOCK_SIZE) > 
log_storage->log_tail_) { + usleep(500); + } + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 20, id, MAX_LOG_BODY_SIZE)); + while (LSN(6*PALF_BLOCK_SIZE + 20 * MAX_LOG_BODY_SIZE) > log_storage->log_tail_) { + usleep(500); + } + LSN max_lsn = leader.palf_handle_impl_->get_max_lsn(); + wait_lsn_until_flushed(max_lsn, leader); + PALF_LOG(INFO, "out of disk max_lsn", K(max_lsn)); + usleep(palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS + 5*10000); + EXPECT_EQ(OB_LOG_OUTOF_DISK_SPACE, submit_log(leader, 1, id, MAX_LOG_BODY_SIZE)); + usleep(ObLooper::INTERVAL_US*2); +} + +TEST_F(TestObSimpleLogDiskMgr, update_disk_options_basic) +{ + SET_CASE_LOG_FILE(TEST_NAME, "update_disk_options_basic"); + OB_LOGGER.set_log_level("INFO"); + const int64_t id = ATOMIC_AAF(&palf_id_, 1); + // 将日志盘空间设置为10GB + update_disk_options(10*1024*1024*1024ul/PALF_PHY_BLOCK_SIZE); + PALF_LOG(INFO, "start update_disk_options_basic", K(id)); + int64_t leader_idx = 0; + PalfHandleImplGuard leader; + PalfEnv *palf_env = NULL; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); + + // 提交1G的日志 + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 500, id, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader)); + + EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 20)); + usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_, + 20*PALF_PHY_BLOCK_SIZE); + // 可以在上一次为缩容完成之前,可以继续缩容 + EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 10)); + usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS, + 
palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_, + 10*PALF_PHY_BLOCK_SIZE); + // 此时日志盘依旧未完成缩容,ObSimpleLogServer维护的disk_opts_依旧是10GB + { + PalfDiskOptions opts; + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul); + } + // 可以在上一次未缩容完成之前,可以继续扩容, 同时由于扩容后日志盘依旧小于第一次缩容 + // ,因此依旧处于缩容状态. + EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 11)); + usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_, + 11*PALF_PHY_BLOCK_SIZE); + // 此时日志盘依旧未完成缩容,ObSimpleLogServer维护的disk_opts_依旧是10GB + { + PalfDiskOptions opts; + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul); + } + usleep(1000*1000+100+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_, + 11*PALF_PHY_BLOCK_SIZE); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); + const LSN base_lsn(12*PALF_BLOCK_SIZE); + EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->set_base_lsn(base_lsn)); + usleep(1000); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1000)); + EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader)); + // wait until disk space enough + EXPECT_EQ(OB_SUCCESS, wait_until_disk_space_to(leader_idx, (11*PALF_PHY_BLOCK_SIZE*80+100)/100)); + 
usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_, + 11*PALF_PHY_BLOCK_SIZE); + // 等待后台线程再次执行update_disk_options操作,预期本地持久化的disk_opts会变为11*PALF_PHY_BLOCK_SIZE + { + usleep(2*ObLooper::INTERVAL_US); + PalfDiskOptions opts; + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + EXPECT_EQ(opts.log_disk_usage_limit_size_, 11*PALF_PHY_BLOCK_SIZE); + } +} + +TEST_F(TestObSimpleLogDiskMgr, update_disk_options_restart) +{ + disable_hot_cache_ = true; + SET_CASE_LOG_FILE(TEST_NAME, "update_disk_options_restart"); + OB_LOGGER.set_log_level("INFO"); + // 扩容操作 + EXPECT_EQ(OB_SUCCESS, update_disk_options(10*1024*1024*1024ul/PALF_PHY_BLOCK_SIZE)); + usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS); + const int64_t id = ATOMIC_AAF(&palf_id_, 1); + int64_t leader_idx = 0; + PalfEnv *palf_env = NULL; + { + PalfHandleImplGuard leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + { + PalfDiskOptions opts; + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul); + } + EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); + { + EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(10*1024*1024*1024ul, + palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_); + } + // 产生10个文件的数据 + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10*32, id, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); + // 最小的log_disk_size要求是存在8个日志文件 + EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 8)); + 
usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS); + // 宕机前,缩容不会正式生效,因此不会导致停写 + EXPECT_EQ(true, palf_env->palf_env_impl_.diskspace_enough_); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(10*1024*1024*1024ul, palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_); + int64_t log_disk_usage, total_log_disk_size; + EXPECT_EQ(OB_SUCCESS, palf_env->palf_env_impl_.get_disk_usage(log_disk_usage, total_log_disk_size)); + PALF_LOG(INFO, "log_disk_usage:", K(log_disk_usage), K(total_log_disk_size)); + // 缩容未成功前,log_disk_usage_limit_size_依旧保持10G. + // 本地持久化的log_disk_size为10G + // 内部表中持久化的log_disk_size为8*PALF_PHY_BLOCK_SIZE + PalfDiskOptions opts; + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul); + } + // 物理缩容未成功前,宕机重启预期不会停写 + EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); + { + // 内部表中持久化的log_disk_size依旧是8*PALF_PHY_BLOCK_SIZE + // 重启后继续缩容 + int64_t log_disk_usage, total_log_disk_size; + EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); + usleep(2*ObLooper::INTERVAL_US); + usleep(1000*1000 + BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS); + EXPECT_EQ(true, palf_env->palf_env_impl_.diskspace_enough_); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + // 本地持久化的(slog)记录的依旧是10G,因此不会停写 + EXPECT_EQ(10*1024*1024*1024ul, palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_); + EXPECT_EQ(OB_SUCCESS, palf_env->palf_env_impl_.get_disk_usage(log_disk_usage, total_log_disk_size)); + PALF_LOG(INFO, "log_disk_usage:", K(log_disk_usage), K(total_log_disk_size)); + PalfDiskOptions opts; + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul); + } + { + 
PalfHandleImplGuard leader; + EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx)); + PalfDiskOptions opts; + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + EXPECT_EQ(opts.log_disk_usage_limit_size_, 10*1024*1024*1024ul); + + // 物理上保证不缩容,内部表中持久化的变为16*PALF_PHY_BLOCK_SIZE, 由于palf的log_disk_size是10G, + // 因此本次对palf是一次缩容操作. 但下一轮GC任务运行时,发现当前的使用的日志盘空间不会导致停写, + // 于是日志盘变为正常状态 + EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 16)); + // 在下一轮GC任务运行后,本地持久化的log_disk_size也会变为16*PALF_PHY_BLOCK_SIZE + usleep(1000*1000+palf::BlockGCTimerTask::BLOCK_GC_TIMER_INTERVAL_MS); + // 经过一轮GC后,会变为NORMAL_STATUS + EXPECT_EQ(true, palf_env->palf_env_impl_.diskspace_enough_); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(16*PALF_PHY_BLOCK_SIZE, + palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_); + // 后台线程会完成缩容操作,最终本地持久化的变为16*PALF_PHY_BLOCK_SIZE + usleep(ObLooper::INTERVAL_US*2); + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + EXPECT_EQ(opts.log_disk_usage_limit_size_, 16*PALF_PHY_BLOCK_SIZE); + } +} + +TEST_F(TestObSimpleLogDiskMgr, overshelling) +{ + disable_hot_cache_ = true; + SET_CASE_LOG_FILE(TEST_NAME, "overshelling"); + OB_LOGGER.set_log_level("INFO"); + ObServerLogBlockMgr *log_pool = nullptr; + EXPECT_EQ(OB_SUCCESS, get_log_pool(0, log_pool)); + ASSERT_NE(nullptr, log_pool); + // 验证扩缩容场景下的LogPool字段的正确性 + EXPECT_EQ(16*PALF_PHY_BLOCK_SIZE, log_pool->min_log_disk_size_for_all_tenants_); + const int64_t id = ATOMIC_AAF(&palf_id_, 1); + int64_t leader_idx = 0; + PalfEnv *palf_env = NULL; + { + PalfHandleImplGuard leader; + EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); + } + EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 15)); + int64_t log_disk_size_used_for_tenants = log_pool->min_log_disk_size_for_all_tenants_; + // 缩容还未成功,预期log_disk_size_used_for_tenants一定是16*PALF_PHY_BLOCK_SIZE + 
PalfDiskOptions opts; + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + if (opts.log_disk_usage_limit_size_ == 16*PALF_PHY_BLOCK_SIZE) { + EXPECT_EQ(16*PALF_PHY_BLOCK_SIZE, log_disk_size_used_for_tenants); + // 缩容不会立马生效 + usleep(ObLooper::INTERVAL_US*2); + EXPECT_EQ(15*PALF_PHY_BLOCK_SIZE, log_pool->min_log_disk_size_for_all_tenants_); + } else { + PALF_LOG(INFO, "update_disk_options successfully", K(log_disk_size_used_for_tenants), K(opts)); + } + + // 扩容预期立马成功 + EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 16)); + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + EXPECT_EQ(16*PALF_PHY_BLOCK_SIZE, log_disk_size_used_for_tenants); + EXPECT_EQ(16*PALF_PHY_BLOCK_SIZE, opts.log_disk_usage_limit_size_); + + // 直接扩容为LogPool上限值 + const int64_t limit_log_disk_size = log_pool->log_pool_meta_.curr_total_size_; + EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, limit_log_disk_size/PALF_PHY_BLOCK_SIZE)); + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + log_disk_size_used_for_tenants = log_pool->min_log_disk_size_for_all_tenants_; + EXPECT_EQ(limit_log_disk_size, log_disk_size_used_for_tenants); + EXPECT_EQ(limit_log_disk_size, opts.log_disk_usage_limit_size_); + + { + PalfHandleImplGuard leader; + EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx)); + // 生成10个文件 + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10*32, id, MAX_LOG_BODY_SIZE)); + EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); + EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, 10)); + // 缩容一定不会成功,租户日志盘规格依旧为上限值 + usleep(ObLooper::INTERVAL_US * 2); + EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + log_disk_size_used_for_tenants = log_pool->min_log_disk_size_for_all_tenants_; + EXPECT_EQ(limit_log_disk_size, 
log_disk_size_used_for_tenants); + EXPECT_EQ(limit_log_disk_size, opts.log_disk_usage_limit_size_); + EXPECT_EQ(OB_MACHINE_RESOURCE_NOT_ENOUGH, log_pool->create_tenant(MIN_DISK_SIZE_PER_PALF_INSTANCE)); + const LSN base_lsn(8*PALF_BLOCK_SIZE); + EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->set_base_lsn(base_lsn)); + usleep(1000); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1000)); + EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader)); + EXPECT_EQ(OB_SUCCESS, wait_until_disk_space_to(leader_idx, (10*PALF_PHY_BLOCK_SIZE*80+100)/100)); + usleep(ObLooper::INTERVAL_US * 2); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + log_disk_size_used_for_tenants = log_pool->min_log_disk_size_for_all_tenants_; + EXPECT_EQ(10*PALF_PHY_BLOCK_SIZE, log_disk_size_used_for_tenants); + EXPECT_EQ(10*PALF_PHY_BLOCK_SIZE, opts.log_disk_usage_limit_size_); + EXPECT_EQ(OB_SUCCESS, log_pool->create_tenant(MIN_DISK_SIZE_PER_PALF_INSTANCE)); + log_pool->abort_create_tenant(MIN_DISK_SIZE_PER_PALF_INSTANCE); + + // 扩容预计一定成功 + EXPECT_EQ(OB_SUCCESS, update_disk_options(leader_idx, limit_log_disk_size/PALF_PHY_BLOCK_SIZE)); + EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS, + palf_env->palf_env_impl_.disk_options_wrapper_.status_); + EXPECT_EQ(OB_SUCCESS, get_disk_options(leader_idx, opts)); + log_disk_size_used_for_tenants = log_pool->min_log_disk_size_for_all_tenants_; + EXPECT_EQ(limit_log_disk_size, log_disk_size_used_for_tenants); + EXPECT_EQ(limit_log_disk_size, opts.log_disk_usage_limit_size_); + EXPECT_EQ(OB_MACHINE_RESOURCE_NOT_ENOUGH, log_pool->create_tenant(MIN_DISK_SIZE_PER_PALF_INSTANCE)); + + } + +} + +} // namespace unittest +} // namespace oceanbase + +int main(int argc, char **argv) +{ + RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME); +} diff --git 
a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp index e4d793b27a..05b1ce9c94 100644 --- a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp +++ b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp @@ -95,85 +95,6 @@ void read_padding_entry(PalfHandleImplGuard &leader, SCN padding_scn, LSN paddin } -TEST_F(TestObSimpleLogClusterSingleReplica, update_disk_options) -{ - SET_CASE_LOG_FILE(TEST_NAME, "update_disk_options"); - OB_LOGGER.set_log_level("TRACE"); - const int64_t id = ATOMIC_AAF(&palf_id_, 1); - PALF_LOG(INFO, "start update_disk_options", K(id)); - int64_t leader_idx = 0; - PalfHandleImplGuard leader; - PalfEnv *palf_env = NULL; - EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); - EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx, palf_env)); - PalfOptions opts; - EXPECT_EQ(OB_SUCCESS, palf_env->get_options(opts)); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 4 * 32 + 10, id, MAX_LOG_BODY_SIZE)); - while (leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_ - < LSN(4 * 32 * MAX_LOG_BODY_SIZE)) { - sleep(1); - } - block_id_t min_block_id, max_block_id; - EXPECT_EQ(OB_SUCCESS, - leader.palf_handle_impl_->log_engine_.get_block_id_range( - min_block_id, max_block_id)); - EXPECT_EQ(4, max_block_id); - opts.disk_options_.log_disk_usage_limit_size_ = 8 * PALF_PHY_BLOCK_SIZE; - EXPECT_EQ(OB_SUCCESS, palf_env->update_options(opts)); - sleep(1); - opts.disk_options_.log_disk_utilization_limit_threshold_ = 50; - opts.disk_options_.log_disk_utilization_threshold_ = 40; - EXPECT_EQ(OB_SUCCESS, palf_env->update_options(opts)); - - opts.disk_options_.log_disk_usage_limit_size_ = 4 * PALF_PHY_BLOCK_SIZE; - EXPECT_EQ(OB_STATE_NOT_MATCH, palf_env->update_options(opts)); - - palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_; - 
palf_env->palf_env_impl_.disk_options_wrapper_.status_ = PalfDiskOptionsWrapper::Status::NORMAL_STATUS; - - opts.disk_options_.log_disk_utilization_limit_threshold_ = 95; - opts.disk_options_.log_disk_utilization_threshold_ = 80; - EXPECT_EQ(OB_SUCCESS, palf_env->update_options(opts)); - EXPECT_EQ(OB_STATE_NOT_MATCH, palf_env->update_options(opts)); - sleep(1); - EXPECT_EQ(PalfDiskOptionsWrapper::Status::SHRINKING_STATUS, - palf_env->palf_env_impl_.disk_options_wrapper_.status_); - EXPECT_GT(palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_, - palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_usage_limit_size_); - EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->set_base_lsn(LSN(4 * PALF_BLOCK_SIZE))); - //EXPECT_EQ(LSN(4*PALF_BLOCK_SIZE), leader.palf_handle_.palf_handle_impl_->log_engine_.get_log_meta().get_log_snapshot_meta().base_lsn_); - while (leader.palf_handle_impl_->log_engine_.base_lsn_for_block_gc_ - != LSN(4 * PALF_BLOCK_SIZE)) { - sleep(1); - } - // wait blocks to be recycled - while (leader.palf_handle_impl_->log_engine_.get_base_lsn_used_for_block_gc() != (LSN(4*PALF_BLOCK_SIZE))) { - sleep(1); - } - sleep(1); - EXPECT_EQ(PalfDiskOptionsWrapper::Status::NORMAL_STATUS, - palf_env->palf_env_impl_.disk_options_wrapper_.status_); - EXPECT_EQ( - opts.disk_options_, - palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_); - EXPECT_EQ( - opts.disk_options_, - palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_); - PALF_LOG(INFO, "runlin trace", K(opts), - "holders:", palf_env->palf_env_impl_.disk_options_wrapper_); - // test expand - opts.disk_options_.log_disk_usage_limit_size_ = 5 * 1024 * 1024 * 1024ul; - EXPECT_EQ(OB_SUCCESS, palf_env->update_options(opts)); - opts.disk_options_.log_disk_usage_limit_size_ = 6 * 1024 * 1024 * 1024ul; - EXPECT_EQ(OB_SUCCESS, palf_env->update_options(opts)); - EXPECT_EQ( - 
opts.disk_options_, - palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_); - EXPECT_EQ( - opts.disk_options_, - palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_stopping_writing_); -} - TEST_F(TestObSimpleLogClusterSingleReplica, delete_paxos_group) { SET_CASE_LOG_FILE(TEST_NAME, "delete_paxos_group"); diff --git a/mittest/simple_server/env/ob_simple_cluster_test_base.cpp b/mittest/simple_server/env/ob_simple_cluster_test_base.cpp index 90768fa2ec..5722b74c39 100644 --- a/mittest/simple_server/env/ob_simple_cluster_test_base.cpp +++ b/mittest/simple_server/env/ob_simple_cluster_test_base.cpp @@ -18,6 +18,11 @@ #include "lib/time/ob_time_utility.h" #include "lib/utility/ob_defer.h" #include "logservice/palf/election/utils/election_common_define.h" +#define private public +#define protected public +#include "share/config/ob_server_config.h" +#undef private +#undef protected namespace oceanbase { @@ -79,6 +84,8 @@ bool ObSimpleClusterTestBase::is_started_ = false; std::string ObSimpleClusterTestBase::env_prefix_; std::string ObSimpleClusterTestBase::curr_dir_; bool ObSimpleClusterTestBase::enable_env_warn_log_ = false; +const char *ObSimpleClusterTestBase::UNIT_BASE ="box_ym_"; +const char *ObSimpleClusterTestBase::POOL_BASE ="pool_ym_"; ObSimpleClusterTestBase::ObSimpleClusterTestBase(const std::string &env_prefix, const char *log_disk_size, @@ -156,7 +163,6 @@ int ObSimpleClusterTestBase::start() GCONF.enable_record_trace_log = false; GMEMCONF.set_server_memory_limit(10 * 1024 * 1024 * 1024ul); - int32_t log_level; bool change_log_level = false; if (enable_env_warn_log_) { @@ -223,8 +229,8 @@ int ObSimpleClusterTestBase::create_tenant(const char *tenant_name, { ObSqlString sql; if (OB_FAIL(ret)) { - } else if (OB_FAIL(sql.assign_fmt("create resource unit box_ym_%s max_cpu 8, memory_size '%s', log_disk_size='%s';", - tenant_name, memory_size, log_disk_size))) { + } else if (OB_FAIL(sql.assign_fmt("create resource unit %s%s 
max_cpu 2, memory_size '%s', log_disk_size='%s';", + UNIT_BASE, tenant_name, memory_size, log_disk_size))) { SERVER_LOG(WARN, "create_tenant", K(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { SERVER_LOG(WARN, "create_tenant", K(ret)); @@ -233,7 +239,7 @@ int ObSimpleClusterTestBase::create_tenant(const char *tenant_name, { ObSqlString sql; if (OB_FAIL(ret)) { - } else if (OB_FAIL(sql.assign_fmt("create resource pool pool_ym_%s unit = 'box_ym_%s', unit_num = 1, zone_list = ('zone1');", tenant_name, tenant_name))) { + } else if (OB_FAIL(sql.assign_fmt("create resource pool %s%s unit = '%s%s', unit_num = 1, zone_list = ('zone1');", POOL_BASE, tenant_name, UNIT_BASE, tenant_name))) { SERVER_LOG(WARN, "create_tenant", K(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { SERVER_LOG(WARN, "create_tenant", K(ret)); @@ -242,7 +248,7 @@ int ObSimpleClusterTestBase::create_tenant(const char *tenant_name, { ObSqlString sql; if (OB_FAIL(ret)) { - } else if (OB_FAIL(sql.assign_fmt("create tenant %s replica_num = 1, primary_zone='zone1', resource_pool_list=('pool_ym_%s') set ob_tcp_invited_nodes='%%'%s", tenant_name, tenant_name, oracle_mode ? ", ob_compatibility_mode='oracle'" : ";"))) { + } else if (OB_FAIL(sql.assign_fmt("create tenant %s replica_num = 1, primary_zone='zone1', resource_pool_list=('%s%s') set ob_tcp_invited_nodes='%%'%s", tenant_name, POOL_BASE, tenant_name, oracle_mode ? 
", ob_compatibility_mode='oracle'" : ";"))) { SERVER_LOG(WARN, "create_tenant", K(ret)); } else if (OB_FAIL(sql_proxy.write(sql.ptr(), affected_rows))) { SERVER_LOG(WARN, "create_tenant", K(ret)); @@ -251,7 +257,7 @@ int ObSimpleClusterTestBase::create_tenant(const char *tenant_name, if (change_log_level) { OB_LOGGER.set_log_level(log_level); } - SERVER_LOG(INFO, "create tenant finish", K(ret)); + SERVER_LOG(INFO, "create tenant finish", K(ret), K(tenant_name)); return ret; } diff --git a/mittest/simple_server/env/ob_simple_cluster_test_base.h b/mittest/simple_server/env/ob_simple_cluster_test_base.h index 64d3eaed59..c7edd374b7 100644 --- a/mittest/simple_server/env/ob_simple_cluster_test_base.h +++ b/mittest/simple_server/env/ob_simple_cluster_test_base.h @@ -32,7 +32,7 @@ public: // set_bootstrap_and_create_tenant_warn_log 默认bootstrap和创建租户使用WARN日志,加速启动 ObSimpleClusterTestBase(const std::string &env_prefix = "run_", const char *log_disk_size = "10G", - const char *memory_limit = "10G"); + const char *memory_limit = "16G"); virtual ~ObSimpleClusterTestBase(); int start(); @@ -62,6 +62,8 @@ protected: static std::string env_prefix_; static std::string curr_dir_; static bool enable_env_warn_log_; + static const char *UNIT_BASE; + static const char *POOL_BASE; }; } // end unittest diff --git a/mittest/simple_server/test_observer_expand_shrink.cpp b/mittest/simple_server/test_observer_expand_shrink.cpp index a0e6c9dd66..73054a49f3 100644 --- a/mittest/simple_server/test_observer_expand_shrink.cpp +++ b/mittest/simple_server/test_observer_expand_shrink.cpp @@ -20,6 +20,10 @@ #include "logservice/ob_log_service.h" #include "observer/ob_server_utils.h" #include "env/ob_simple_server_restart_helper.h" +#include "observer/omt/ob_tenant.h" +#include "share/unit/ob_unit_config.h" +#undef protected +#undef private const char *TEST_FILE_NAME = "test_observer_expand_shrink"; const char *BORN_CASE_NAME= "ObserverExpandShink"; @@ -56,7 +60,7 @@ TEST_F(ObserverExpandShink, 
basic_func) int64_t origin_server_in_use_size, origin_server_log_total_size; EXPECT_EQ(OB_SUCCESS, GCTX.log_block_mgr_->get_disk_usage(origin_server_in_use_size, origin_server_log_total_size)); GCONF.log_disk_size = GCTX.log_block_mgr_->lower_align_(2 * origin_server_log_total_size); - sleep(6); + sleep(11); int64_t new_server_in_use_size, new_server_log_total_size; EXPECT_EQ(OB_SUCCESS, GCTX.log_block_mgr_->get_disk_usage(new_server_in_use_size, new_server_log_total_size)); EXPECT_EQ(new_server_log_total_size, 2 * origin_server_log_total_size); @@ -102,6 +106,96 @@ TEST_F(ObserverExpandShink, basic_func) log_disk_percentage)); } +template +std::string string_format( const std::string& format, Args ... args ) +{ + int size_s = std::snprintf( nullptr, 0, format.c_str(), args ... ) + 1; // Extra space for '\0' + if( size_s <= 0 ){ throw std::runtime_error( "Error during formatting." ); } + auto size = static_cast( size_s ); + std::unique_ptr buf( new char[ size ] ); + std::snprintf( buf.get(), size, format.c_str(), args ... 
); + return std::string( buf.get(), buf.get() + size - 1 ); // We don't want the '\0' inside +} + +TEST_F(ObserverExpandShink, resize_tenant_log_disk) +{ + omt::ObTenantNodeBalancer::get_instance().refresh_interval_ = 1 * 1000 * 1000; + sleep(10); + GCONF.log_disk_size = 20 * 1024 * 1024 * 1024ul; + bool tenant_exist = true; + int ret = OB_SUCCESS; + delete_tenant("runlin"); + while (true == tenant_exist && OB_SUCC(ret)) { + if (OB_FAIL(check_tenant_exist(tenant_exist, "runlin"))) { + SERVER_LOG(WARN, "check_tenant_exist failed", K(ret)); + } + } + // 保证log_disk_size变为10G生效 + sleep(3); + int64_t log_disk_origin_assigned = GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_; + bool bool_ret = true; + EXPECT_EQ(bool_ret, true); + LOG_INFO("runlin trace, before create one default tenant", KPC(GCTX.log_block_mgr_), K(log_disk_origin_assigned)); + // 每个租户的日志盘大小为2G(默认值) + EXPECT_EQ(OB_SUCCESS, create_tenant("runlin1")); + LOG_INFO("runlin trace, after create one default tenant", KPC(GCTX.log_block_mgr_), K(log_disk_origin_assigned)); + EXPECT_EQ(OB_SUCCESS, create_tenant("runlin2")); + EXPECT_EQ(GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_, + log_disk_origin_assigned + 4*1024*1024*1024ul); + LOG_INFO("runlin trace, after create two default tenant", KPC(GCTX.log_block_mgr_), K(log_disk_origin_assigned)); + // 修改租户规格 + int64_t affected_rows = 0; + std::string sql_str = "ALTER RESOURCE UNIT %s%s LOG_DISK_SIZE='%s'"; + { + std::string alter_resource_failed_sql = string_format(sql_str, UNIT_BASE, "runlin1", "100G"); + EXPECT_EQ(OB_MACHINE_RESOURCE_NOT_ENOUGH, exec_write_sql_sys(alter_resource_failed_sql.c_str(), affected_rows)); + LOG_INFO("runlin trace, alter resource failed", KPC(GCTX.log_block_mgr_)); + } + { + std::string alter_resource_failed_sql = string_format(sql_str, UNIT_BASE, "runlin1", "1G"); + EXPECT_EQ(OB_RESOURCE_UNIT_VALUE_BELOW_LIMIT, exec_write_sql_sys(alter_resource_failed_sql.c_str(), affected_rows)); + LOG_INFO("runlin trace, alter resource 
below limit", KPC(GCTX.log_block_mgr_)); + } + { + // 扩容验证 + std::string alter_resource_runlin1 = string_format(sql_str, UNIT_BASE, "runlin1", "6G"); + std::string alter_resource_runlin2 = string_format(sql_str, UNIT_BASE, "runlin2", "6G"); + EXPECT_EQ(OB_SUCCESS, exec_write_sql_sys(alter_resource_runlin1.c_str(), affected_rows)); + EXPECT_EQ(OB_SUCCESS, exec_write_sql_sys(alter_resource_runlin2.c_str(), affected_rows)); + sleep(3); + // 扩容操作直接执行成功 + EXPECT_EQ(GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_, + log_disk_origin_assigned + 12*1024*1024*1024ul); + LOG_INFO("runlin trace, alter resource to 6G success", KPC(GCTX.log_block_mgr_)); + } + { + // 缩容验证 + // 暂停TenantNodeBalancer的运行 + omt::ObTenantNodeBalancer::get_instance().refresh_interval_ = 10 * 1000 * 1000; + int64_t start_ts = ObTimeUtility::current_time(); + sleep(2); + { + std::string alter_resource_runlin1 = string_format(sql_str, UNIT_BASE, "runlin1", "2G"); + std::string alter_resource_runlin2 = string_format(sql_str, UNIT_BASE, "runlin2", "2G"); + EXPECT_EQ(OB_SUCCESS, exec_write_sql_sys(alter_resource_runlin1.c_str(), affected_rows)); + EXPECT_EQ(OB_SUCCESS, exec_write_sql_sys(alter_resource_runlin2.c_str(), affected_rows)); + } + const int64_t cost_ts = ObTimeUtility::current_time() - start_ts; + // 假设sleep+执行sql的时间小于4s + EXPECT_LE(cost_ts, 4*1000*1000); + // 此时TenantNodeBalancer线程还未开始运行,ObServerLogBlockMgr的min_log_disk_size_for_all_tenants_不会发生变化 + EXPECT_EQ(GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_, + log_disk_origin_assigned + 12*1024*1024*1024ul); + omt::ObTenantNodeBalancer::get_instance().refresh_interval_ = 1 * 1000 * 1000; + sleep(11); + LOG_INFO("runlin trace, alter resource to 2G success", KPC(GCTX.log_block_mgr_)); + EXPECT_EQ(GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_, + log_disk_origin_assigned + 4*1024*1024*1024ul); + } + EXPECT_EQ(OB_SUCCESS, delete_tenant("runlin1")); + EXPECT_EQ(OB_SUCCESS, delete_tenant("runlin2")); +} + //class 
ObserverExpandShinkRestart: public ObSimpleClusterTestBase //{ //public: @@ -114,56 +208,15 @@ TEST_F(ObserverExpandShink, basic_func) // SERVER_LOG(INFO, "create_tenant_after_restart trace", KPC(GCTX.log_block_mgr_)); //} -TEST_F(ObserverExpandShink, direct_set_observer) -{ - GCONF.log_disk_size = 1024 * 1024 * 1024ul * 3; - int64_t log_disk_size = 0; - int64_t log_disk_percentage = 0; - EXPECT_EQ(OB_SUCCESS, observer::ObServerUtils::get_log_disk_info_in_config( - log_disk_size, - log_disk_percentage)); - GCONF.log_disk_size = 10000000 * share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE; - EXPECT_EQ(OB_SERVER_OUTOF_DISK_SPACE, observer::ObServerUtils::get_log_disk_info_in_config( - log_disk_size, - log_disk_percentage)); - GCONF.log_disk_size = 3 * share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE; - EXPECT_EQ(false, GCTX.log_block_mgr_->check_space_is_enough_(share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE - 1)); - { - GCONF.log_disk_size = 3 * share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE; - sleep(6); - } - EXPECT_EQ(OB_SUCCESS, create_tenant("tt2")); - EXPECT_EQ(false, GCTX.log_block_mgr_->check_space_is_enough_(2*share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE-1)); - EXPECT_EQ(false, GCTX.log_block_mgr_->check_space_is_enough_(2*share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE)); - int64_t affected_rows = 0; - std::string succ_sql_str = "ALTER RESOURCE UNIT sys_unit_config LOG_DISK_SIZE='2G'"; - EXPECT_EQ(OB_SUCCESS, exec_write_sql_sys(succ_sql_str.c_str(), affected_rows)); - sleep(2); - // tenant_node_balancer 1 s 运行一次 - EXPECT_EQ(true, GCTX.log_block_mgr_->check_space_is_enough_(2*share::ObUnitResource::UNIT_MIN_LOG_DISK_SIZE)); - bool tenant_exist = false; - EXPECT_EQ(OB_SUCCESS, check_tenant_exist(tenant_exist, "tt2")); - EXPECT_EQ(true, tenant_exist); - EXPECT_EQ(OB_SUCCESS, delete_tenant("tt2")); - int ret = OB_SUCCESS; - while (true == tenant_exist && OB_SUCC(ret)) { - if (OB_FAIL(check_tenant_exist(tenant_exist, "tt2"))) { - SERVER_LOG(WARN, "check_tenant_exist 
failed", K(ret)); - } - } -} - TEST_F(ObserverExpandShink, test_hidden_sys_tenant) { omt::ObMultiTenant *omt = GCTX.omt_; bool remove_tenant_succ = false; int64_t log_disk_size_in_use = GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_; - EXPECT_EQ(log_disk_size_in_use, GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_); share::TenantUnits units; EXPECT_EQ(OB_SUCCESS, omt->get_tenant_units(units, false)); EXPECT_EQ(false, units.empty()); - bool has_sys_tenant = false; int64_t origin_sys_log_disk_size = 0; int64_t hidden_sys_log_disk_size = 0; for (int i = 0; i < units.count(); i++) { @@ -190,15 +243,20 @@ TEST_F(ObserverExpandShink, test_hidden_sys_tenant) GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_); log_disk_size_in_use = GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_; EXPECT_EQ(OB_SUCCESS, omt->convert_hidden_to_real_sys_tenant(sys_unit_config)); - has_sys_tenant = true; EXPECT_EQ(log_disk_size_in_use-hidden_sys_log_disk_size+sys_unit_config.config_.log_disk_size(), GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_); log_disk_size_in_use = GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_; CLOG_LOG(INFO, "runlin trace after convert_hidden_to_real_sys_tenant", K(log_disk_size_in_use), KPC(GCTX.log_block_mgr_), K(origin_sys_log_disk_size)); - int64_t new_sys_log_disk_size = 2 *1024*1024*1024l +512*1024*1024; - EXPECT_EQ(OB_SUCCESS, omt->update_tenant_log_disk_size(OB_SYS_TENANT_ID, new_sys_log_disk_size)); - EXPECT_EQ(true, has_sys_tenant); + int64_t new_sys_log_disk_size = sys_unit_config.config_.log_disk_size()+512*1024*1024; + omt::ObTenant *tenant = nullptr; + EXPECT_EQ(OB_SUCCESS, omt->get_tenant(OB_SYS_TENANT_ID, tenant)); + share::ObUnitInfoGetter::ObTenantConfig unit_config = tenant->get_unit(); + unit_config.config_.resource_.log_disk_size_ = new_sys_log_disk_size; + // 扩容直接成功 + EXPECT_EQ(OB_SUCCESS, omt->update_tenant_unit(unit_config)); + unit_config = tenant->get_unit(); + 
EXPECT_EQ(unit_config.config_.log_disk_size(), new_sys_log_disk_size); EXPECT_EQ(log_disk_size_in_use-sys_unit_config.config_.log_disk_size()+new_sys_log_disk_size, GCTX.log_block_mgr_->min_log_disk_size_for_all_tenants_); CLOG_LOG(INFO, "runlin trace after convert_real_to_hidden_sys_tenant", K(log_disk_size_in_use), KPC(GCTX.log_block_mgr_), @@ -206,6 +264,7 @@ TEST_F(ObserverExpandShink, test_hidden_sys_tenant) } +// don't has any case after this. TEST_F(ObserverExpandShink, paralle_set) { omt::ObTenantNodeBalancer::get_instance().refresh_interval_ = 1000 * 1000 * 1000; diff --git a/src/logservice/ob_log_service.cpp b/src/logservice/ob_log_service.cpp index cb8d7cced9..0163c7a5b4 100644 --- a/src/logservice/ob_log_service.cpp +++ b/src/logservice/ob_log_service.cpp @@ -495,6 +495,17 @@ int ObLogService::get_palf_disk_usage(int64_t &used_size_byte, int64_t &total_si return ret; } +int ObLogService::get_palf_stable_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else { + ret = palf_env_->get_stable_disk_usage(used_size_byte, total_size_byte); + } + return ret; +} + int ObLogService::update_palf_options_except_disk_usage_limit_size() { omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID())); diff --git a/src/logservice/ob_log_service.h b/src/logservice/ob_log_service.h index 9f171ca108..c81e0800f7 100644 --- a/src/logservice/ob_log_service.h +++ b/src/logservice/ob_log_service.h @@ -143,7 +143,17 @@ public: int update_replayable_point(const share::SCN &replayable_point); int get_replayable_point(share::SCN &replayable_point); + + // @brief get palf disk usage + // @param [out] used_size_byte + // @param [out] total_size_byte, if in shrinking status, total_size_byte is the value after shrinking. + // NB: total_size_byte may be smaller than used_size_byte. 
int get_palf_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte); + + // @brief get palf disk usage + // @param [out] used_size_byte + // @param [out] total_size_byte, if in shrinking status, total_size_byte is the value before shrinking. + int get_palf_stable_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte); // why we need update 'log_disk_size_' and 'log_disk_util_threshold' separately. // // 'log_disk_size' is a member of unit config. diff --git a/src/logservice/palf/palf_env.cpp b/src/logservice/palf/palf_env.cpp index 92e8453b91..888f9a856e 100644 --- a/src/logservice/palf/palf_env.cpp +++ b/src/logservice/palf/palf_env.cpp @@ -156,6 +156,11 @@ int PalfEnv::get_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte) return palf_env_impl_.get_disk_usage(used_size_byte, total_size_byte); } +int PalfEnv::get_stable_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte) +{ + return palf_env_impl_.get_stable_disk_usage(used_size_byte, total_size_byte); +} + int PalfEnv::get_options(PalfOptions &options) { return palf_env_impl_.get_options(options); diff --git a/src/logservice/palf/palf_env.h b/src/logservice/palf/palf_env.h index 493fa977ee..fb433c9494 100644 --- a/src/logservice/palf/palf_env.h +++ b/src/logservice/palf/palf_env.h @@ -86,8 +86,15 @@ public: // @brief get palf disk usage // @param [out] used_size_byte - // @param [out] total_size_byte + // @param [out] total_size_byte, if in shrinking status, total_size_byte is the value after shrinking. + // NB: total_size_byte may be smaller than used_size_byte. int get_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte); + + // @brief get stable disk usage + // @param [out] used_size_byte + // @param [out] total_size_byte, if in shrinking status, total_size_byte is the value before shrinking. 
+ int get_stable_disk_usage(int64_t &used_size_byte, int64_t &total_size_byte); + // @brief update options // @param [in] options int update_options(const PalfOptions &options); diff --git a/src/logservice/palf/palf_env_impl.cpp b/src/logservice/palf/palf_env_impl.cpp index 29c9cce47e..4240b08460 100644 --- a/src/logservice/palf/palf_env_impl.cpp +++ b/src/logservice/palf/palf_env_impl.cpp @@ -78,6 +78,7 @@ int PalfDiskOptionsWrapper::init(const PalfDiskOptions &disk_opts) disk_opts_for_recycling_blocks_ = disk_opts_for_stopping_writing_ = disk_opts; status_ = Status::NORMAL_STATUS; cur_unrecyclable_log_disk_size_ = 0; + sequence_ = 0; } return ret; } @@ -89,6 +90,7 @@ void PalfDiskOptionsWrapper::reset() disk_opts_for_stopping_writing_.reset(); status_ = Status::INVALID_STATUS; cur_unrecyclable_log_disk_size_ = -1; + sequence_ = -1; } int PalfDiskOptionsWrapper::update_disk_options(const PalfDiskOptions &disk_opts_for_recycling_blocks) @@ -112,40 +114,53 @@ bool PalfDiskOptionsWrapper::need_throttling() const return disk_opts_for_stopping_writing_.is_valid() && cur_unrecyclable_log_disk_size_ > trigger_size; } -void PalfDiskOptionsWrapper::change_to_normal() +// Concurrent analysis +// BlockGCThread ConfigChangeThread +// T1 get_disk_options +// T2 shrink log_disk when status is SHRINKING_STATUS, +// make disk_opts_for_recycling_blocks to new PalfDiskOptions. +// T3 change disk_opts_for_stopping_writing for disk_opts_for_recycling_blocks +// and make status to NORMAL_STATUS +// This will cause write-stop, therefore, we only change status to NORMAL when sequence is same. +// And we only update sequence when PalfDiskOptions has change. 
+void PalfDiskOptionsWrapper::change_to_normal(const int64_t sequence) { ObSpinLockGuard guard(disk_opts_lock_); - status_ = Status::NORMAL_STATUS; - disk_opts_for_stopping_writing_ = disk_opts_for_recycling_blocks_; - PALF_LOG(INFO, "change_to_normal", KPC(this)); + if (sequence_ == sequence && Status::SHRINKING_STATUS == status_) { + status_ = Status::NORMAL_STATUS; + disk_opts_for_stopping_writing_ = disk_opts_for_recycling_blocks_; + PALF_LOG(INFO, "change_to_normal", KPC(this)); + } else { + PALF_LOG(INFO, "sequence has changed or status not match", KPC(this), K(sequence)); + } } int PalfDiskOptionsWrapper::update_disk_options_not_guarded_by_lock_(const PalfDiskOptions &disk_opts_for_recycling_blocks) { int ret = OB_SUCCESS; int64_t curr_stop_write_limit_size = - disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ * disk_opts_for_stopping_writing_.log_disk_utilization_limit_threshold_; + disk_opts_for_stopping_writing_.log_disk_usage_limit_size_; int64_t next_stop_write_limit_size = - disk_opts_for_recycling_blocks.log_disk_usage_limit_size_ * disk_opts_for_recycling_blocks.log_disk_utilization_limit_threshold_; + disk_opts_for_recycling_blocks.log_disk_usage_limit_size_; if (false == disk_opts_for_recycling_blocks.is_valid()) { ret = OB_INVALID_ARGUMENT; + } else if (disk_opts_for_recycling_blocks_ == disk_opts_for_recycling_blocks) { + PALF_LOG(INFO, "no need update disk options", K(ret), K(disk_opts_for_recycling_blocks_), K(disk_opts_for_recycling_blocks)); } else { - if (Status::SHRINKING_STATUS == status_) { - ret = OB_STATE_NOT_MATCH; - PALF_LOG(WARN, "don't support shrink log disk concurrently", K(ret), KPC(this)); - } else if (disk_opts_for_recycling_blocks_ == disk_opts_for_recycling_blocks) { - PALF_LOG(INFO, "no need update disk options", K(ret), K(disk_opts_for_recycling_blocks_), K(disk_opts_for_recycling_blocks)); - } else if (curr_stop_write_limit_size > next_stop_write_limit_size) { + if (curr_stop_write_limit_size > 
next_stop_write_limit_size) { status_ = Status::SHRINKING_STATUS; // In process of shrinking, to avoid stopping writing, // 'disk_opts_for_stopping_writing_' is still an original value, update it // with 'disk_opts_for_recycling_blocks' until there is no possibility // caused stopping writing. disk_opts_for_recycling_blocks_ = disk_opts_for_recycling_blocks; + PALF_LOG(INFO, "shrink log disk success", K(curr_stop_write_limit_size), K(next_stop_write_limit_size), + KPC(this)); } else { status_ = Status::NORMAL_STATUS; disk_opts_for_recycling_blocks_ = disk_opts_for_stopping_writing_ = disk_opts_for_recycling_blocks; - PALF_LOG(INFO, "update_disk_options_not_guarded_by_lock_ success", K(curr_stop_write_limit_size), K(next_stop_write_limit_size)); + PALF_LOG(INFO, "expand log disk success", K(curr_stop_write_limit_size), K(next_stop_write_limit_size), + KPC(this)); } //always update writing_throttling_trigger_percentage_ const int64_t new_trigger_percentage = disk_opts_for_recycling_blocks.log_disk_throttling_percentage_; @@ -155,6 +170,7 @@ int PalfDiskOptionsWrapper::update_disk_options_not_guarded_by_lock_(const PalfD disk_opts_for_recycling_blocks_.log_disk_throttling_maximum_duration_ = new_maximum_duration; disk_opts_for_stopping_writing_.log_disk_throttling_maximum_duration_ = new_maximum_duration; + sequence_++; } return ret; } @@ -497,7 +513,6 @@ int PalfEnvImpl::scan_all_palf_handle_impl_director_() { int ret = OB_SUCCESS; ObTimeGuard guard("PalfEnvImplStart", 0); - // TODO by runlin: how to avoid modify 'log_disk_usage_limit_size_' after restart? 
ReloadPalfHandleImplFunctor functor(this); if (OB_FAIL(scan_dir(log_dir_, functor))) { PALF_LOG(WARN, "scan_dir failed", K(ret)); @@ -658,9 +673,11 @@ int PalfEnvImpl::try_recycle_blocks() PalfDiskOptions disk_opts_for_stopping_writing; PalfDiskOptions disk_opts_for_recycling_blocks; PalfDiskOptionsWrapper::Status status = PalfDiskOptionsWrapper::Status::INVALID_STATUS; + int64_t sequence = -1; disk_options_wrapper_.get_disk_opts(disk_opts_for_stopping_writing, disk_opts_for_recycling_blocks, - status); + status, + sequence); int64_t total_used_size_byte = 0; int64_t total_unrecyclable_size_byte = 0; int64_t total_size_to_recycle_blocks = disk_opts_for_recycling_blocks.log_disk_usage_limit_size_; @@ -700,7 +717,7 @@ int PalfEnvImpl::try_recycle_blocks() int64_t oldest_palf_id = INVALID_PALF_ID; if (OB_SUCC(ret) && PalfDiskOptionsWrapper::Status::SHRINKING_STATUS == status) { if (total_used_size_byte < usable_disk_size_to_recycle_blocks) { - disk_options_wrapper_.change_to_normal(); + disk_options_wrapper_.change_to_normal(sequence); PALF_LOG(INFO, "change_to_normal success", K(disk_options_wrapper_), K(total_used_size_byte), K(usable_disk_size_to_recycle_blocks)); } @@ -726,7 +743,7 @@ int PalfEnvImpl::try_recycle_blocks() LOG_DBA_ERROR(OB_LOG_OUTOF_DISK_SPACE, "msg", "log disk space is almost full", "ret", tmp_ret, "total_size(MB)", disk_opts_for_recycling_blocks.log_disk_usage_limit_size_/MB, "used_size(MB)", total_used_size_byte/MB, - "used_percent(%)", (total_used_size_byte* 100) / (disk_opts_for_recycling_blocks.log_disk_usage_limit_size_ + 1), + "used_percent(%)", (total_used_size_byte* 100) / (disk_opts_for_stopping_writing.log_disk_usage_limit_size_ + 1), "warn_size(MB)", (total_size_to_recycle_blocks*disk_opts_for_recycling_blocks.log_disk_utilization_threshold_)/100/MB, "warn_percent(%)", disk_opts_for_recycling_blocks.log_disk_utilization_threshold_, "limit_size(MB)", 
(total_size_to_recycle_blocks*disk_opts_for_recycling_blocks.log_disk_utilization_limit_threshold_)/100/MB, @@ -846,6 +863,20 @@ int PalfEnvImpl::get_disk_usage(int64_t &used_size_byte, int64_t &total_usable_s return ret; } +int PalfEnvImpl::get_stable_disk_usage(int64_t &used_size_byte, int64_t &total_usable_size_byte) +{ + int ret = OB_SUCCESS; + constexpr int64_t MB = 1024 * 1024; + PalfDiskOptions disk_options = disk_options_wrapper_.get_disk_opts_for_stopping_writing(); + if (OB_FAIL(get_disk_usage_(used_size_byte))) { + PALF_LOG(WARN, "get_disk_usage_ failed", K(ret)); + } else { + total_usable_size_byte = disk_options.log_disk_usage_limit_size_; + PALF_LOG(INFO, "get_stable_disk_usage", K(ret), "capacity(MB):", total_usable_size_byte/MB, "used(MB):", used_size_byte/MB); + } + return ret; +} + int PalfEnvImpl::update_options(const PalfOptions &options) { int ret = OB_SUCCESS; diff --git a/src/logservice/palf/palf_env_impl.h b/src/logservice/palf/palf_env_impl.h index 312209225b..90f5922285 100644 --- a/src/logservice/palf/palf_env_impl.h +++ b/src/logservice/palf/palf_env_impl.h @@ -83,10 +83,14 @@ public: MAX_STATUS = 3 }; + static bool is_shrinking(const Status &status) + { return Status::SHRINKING_STATUS == status; } + PalfDiskOptionsWrapper() : disk_opts_for_stopping_writing_(), disk_opts_for_recycling_blocks_(), status_(Status::INVALID_STATUS), cur_unrecyclable_log_disk_size_(0), + sequence_(-1), disk_opts_lock_(common::ObLatchIds::PALF_ENV_LOCK) {} ~PalfDiskOptionsWrapper() { reset(); } @@ -94,7 +98,7 @@ public: void reset(); int update_disk_options(const PalfDiskOptions &disk_opts); - void change_to_normal(); + void change_to_normal(const int64_t sequence); const PalfDiskOptions& get_disk_opts_for_stopping_writing() const { ObSpinLockGuard guard(disk_opts_lock_); @@ -110,12 +114,14 @@ public: void get_disk_opts(PalfDiskOptions &disk_opts_for_stopping_writing, PalfDiskOptions &disk_opts_for_recycling_blocks, - Status &status) const + Status &status, + 
int64_t &sequence) const { ObSpinLockGuard guard(disk_opts_lock_); disk_opts_for_stopping_writing = disk_opts_for_stopping_writing_; disk_opts_for_recycling_blocks = disk_opts_for_recycling_blocks_; status = status_; + sequence = sequence_; } void get_throttling_options(PalfThrottleOptions &options) @@ -131,11 +137,20 @@ public: bool is_shrinking() const { ObSpinLockGuard guard(disk_opts_lock_); - return Status::SHRINKING_STATUS == status_; + return is_shrinking(status_); + } + + void set_stop_writing_disk_options_with_status(const PalfDiskOptions &new_opts, + const Status &status) + { + ObSpinLockGuard guard(disk_opts_lock_); + status_ = status; + disk_opts_for_stopping_writing_ = new_opts; } static constexpr int64_t MB = 1024*1024ll; TO_STRING_KV(K_(disk_opts_for_stopping_writing), K_(disk_opts_for_recycling_blocks), K_(status), - "cur_unrecyclable_log_disk_size(MB)", cur_unrecyclable_log_disk_size_/MB); + "cur_unrecyclable_log_disk_size(MB)", cur_unrecyclable_log_disk_size_/MB, + K_(sequence)); private: int update_disk_options_not_guarded_by_lock_(const PalfDiskOptions &new_opts); @@ -148,6 +163,7 @@ private: PalfDiskOptions disk_opts_for_recycling_blocks_; Status status_; int64_t cur_unrecyclable_log_disk_size_; + int64_t sequence_; mutable ObSpinLock disk_opts_lock_; }; @@ -238,6 +254,7 @@ public: int try_recycle_blocks(); bool check_disk_space_enough() override final; int get_disk_usage(int64_t &used_size_byte, int64_t &total_usable_size_byte); + int get_stable_disk_usage(int64_t &used_size_byte, int64_t &total_usable_size_byte); int update_options(const PalfOptions &options); int get_options(PalfOptions &options); int64_t get_rebuild_replica_log_lag_threshold() const diff --git a/src/logservice/palf/palf_options.cpp b/src/logservice/palf/palf_options.cpp index d2b00535b0..564f8dd615 100644 --- a/src/logservice/palf/palf_options.cpp +++ b/src/logservice/palf/palf_options.cpp @@ -67,6 +67,11 @@ bool PalfDiskOptions::operator==(const PalfDiskOptions 
&palf_disk_options) const && log_writer_parallelism_ == palf_disk_options.log_writer_parallelism_; } +bool PalfDiskOptions::operator!=(const PalfDiskOptions &palf_disk_options) const +{ + return !this->operator==(palf_disk_options); +} + PalfDiskOptions &PalfDiskOptions::operator=(const PalfDiskOptions &other) { log_disk_usage_limit_size_ = other.log_disk_usage_limit_size_; diff --git a/src/logservice/palf/palf_options.h b/src/logservice/palf/palf_options.h index 758114bad6..bd2d972aac 100644 --- a/src/logservice/palf/palf_options.h +++ b/src/logservice/palf/palf_options.h @@ -39,6 +39,7 @@ struct PalfDiskOptions void reset(); bool is_valid() const; bool operator==(const PalfDiskOptions &rhs) const; + bool operator!=(const PalfDiskOptions &rhs) const; PalfDiskOptions &operator=(const PalfDiskOptions &other); int64_t log_disk_usage_limit_size_; int log_disk_utilization_threshold_; diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index 5a71928136..c9515e0e62 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -1003,7 +1003,9 @@ int ObMultiTenant::update_tenant_unit_no_lock(const ObUnitInfoGetter::ObTenantCo const double max_cpu = static_cast(unit.config_.max_cpu()); const uint64_t tenant_id = unit.tenant_id_; int64_t allowed_mem_limit = 0; - + ObUnitInfoGetter::ObTenantConfig allowed_new_unit; + ObUnitInfoGetter::ObTenantConfig old_unit; + int64_t allowed_new_log_disk_size = 0; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); @@ -1012,13 +1014,27 @@ int ObMultiTenant::update_tenant_unit_no_lock(const ObUnitInfoGetter::ObTenantCo } else if (OB_ISNULL(tenant)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("tenant is nullptr", K(tenant_id)); - } else if (OB_FAIL(write_update_tenant_unit_slog(unit))) { + } else if (OB_FAIL(old_unit.assign(tenant->get_unit()))) { + LOG_ERROR("fail to assign old unit failed", K(tenant_id), K(unit)); + } else if 
(OB_FAIL(update_tenant_memory(tenant_id, unit.config_.memory_size(), allowed_mem_limit))) { + LOG_WARN("fail to update tenant memory", K(ret), K(tenant_id)); + } else if (OB_FAIL(update_tenant_log_disk_size(tenant_id, + unit.config_.log_disk_size(), + old_unit.config_.log_disk_size(), + allowed_new_log_disk_size))) { + LOG_WARN("fail to update tenant log disk size", K(ret), K(tenant_id)); + } else if (OB_FAIL(construct_allowed_unit_config(allowed_new_log_disk_size, + unit, + allowed_new_unit))) { + LOG_WARN("fail to construct_allowed_unit_config", K(allowed_new_log_disk_size), + K(allowed_new_unit)); + } else if (OB_FAIL(write_update_tenant_unit_slog(allowed_new_unit))) { LOG_WARN("fail to write tenant meta slog", K(ret), K(tenant_id)); - } else if (OB_FAIL(tenant->update_thread_cnt(max_cpu))) { + } else if (OB_FAIL(update_tenant_freezer_mem_limit(tenant_id, unit.config_.memory_size(), allowed_mem_limit))) { + LOG_WARN("fail to update_tenant_freezer_mem_limit", K(ret), K(tenant_id)); + } else if (OB_FAIL(tenant->update_thread_cnt(max_cpu))) { LOG_WARN("fail to update mtl module thread_cnt", K(ret), K(tenant_id)); - } else if (OB_FAIL(update_tenant_log_disk_size(tenant_id, unit.config_.log_disk_size()))) { - LOG_WARN("fail to update tenant log disk size", K(ret), K(tenant_id)); - } else if (FALSE_IT(tenant->set_unit_memory_size(allowed_mem_limit))) { + } else if (FALSE_IT(tenant->set_unit_memory_size(unit.config_.memory_size()))) { // unreachable } else { if (tenant->unit_min_cpu() != min_cpu) { @@ -1027,7 +1043,7 @@ int ObMultiTenant::update_tenant_unit_no_lock(const ObUnitInfoGetter::ObTenantCo if (tenant->unit_max_cpu() != max_cpu) { tenant->set_unit_max_cpu(max_cpu); } - tenant->set_tenant_unit(unit); + tenant->set_tenant_unit(allowed_new_unit); LOG_INFO("succecc to set tenant unit config", K(unit), K(allowed_mem_limit)); } @@ -1047,6 +1063,34 @@ int ObMultiTenant::update_tenant_memory(const ObUnitInfoGetter::ObTenantConfig & return ret; } +int 
ObMultiTenant::construct_allowed_unit_config(const int64_t allowed_new_log_disk_size, + const ObUnitInfoGetter::ObTenantConfig &expected_unit_config, + ObUnitInfoGetter::ObTenantConfig &allowed_new_unit) +{ + int ret = OB_SUCCESS; + if (0 >= allowed_new_log_disk_size + || !expected_unit_config.is_valid()) { + ret= OB_INVALID_ARGUMENT; + } else if (OB_FAIL(allowed_new_unit.assign(expected_unit_config))) { + LOG_ERROR("fail to assign new unit", K(allowed_new_log_disk_size), K(expected_unit_config)); + } else { + // construct allowed resource. + ObUnitResource allowed_resource( + expected_unit_config.config_.max_cpu(), + expected_unit_config.config_.min_cpu(), + expected_unit_config.config_.memory_size(), + allowed_new_log_disk_size, + expected_unit_config.config_.max_iops(), + expected_unit_config.config_.min_iops(), + expected_unit_config.config_.iops_weight()); + if (OB_FAIL(allowed_new_unit.config_.update_unit_resource(allowed_resource))) { + LOG_WARN("update_unit_resource failed", K(allowed_new_log_disk_size), K(allowed_new_unit), + K(allowed_resource)); + } + } + return ret; +} + int ObMultiTenant::update_tenant_unit(const ObUnitInfoGetter::ObTenantConfig &unit) { int ret = OB_SUCCESS; @@ -1110,33 +1154,104 @@ int ObMultiTenant::update_tenant_memory(const uint64_t tenant_id, const int64_t } int ObMultiTenant::update_tenant_log_disk_size(const uint64_t tenant_id, - const int64_t expected_log_disk_size) + const int64_t expected_log_disk_size, + const int64_t old_log_disk_size, + int64_t &allowed_new_log_disk_size) { int ret = OB_SUCCESS; + int64_t used_log_disk_size = 0, palf_log_disk_size = 0; + bool can_update_log_disk_size_with_expected_log_disk = false; + // 'expected_log_disk_size' is the latest log disk size record in __all_unit_config. + // 'old_log_disk_size' is current log disk size in ObTenant. + // 'allowed_new_log_disk_size' is current allowed log disk size when update log disk. 
+ // + // To avoid overselling, we can not use 'expected_log_disk_size' to update unit config which will save in slog. + // therefore, we need constuct a virtual log disk size which named with 'allowed_new_log_disk_size'. MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); - int64_t old_log_disk_size = 0; - int64_t unused_log_disk_size = 0; if (OB_SUCC(guard.switch_to(tenant_id))) { ObLogService *log_service = MTL(ObLogService *); if (OB_ISNULL(log_service)) { ret = OB_ERR_UNEXPECTED; - } else if (OB_FAIL(log_service->get_palf_disk_usage(unused_log_disk_size, old_log_disk_size))) { - LOG_WARN("failed to get_palf_disk_usage", K(ret), K(unused_log_disk_size), K(old_log_disk_size), - K(expected_log_disk_size)); - } else if(OB_FAIL(GCTX.log_block_mgr_->update_tenant(old_log_disk_size, expected_log_disk_size))) { - LOG_WARN("failed to update_tenant in ObServerLogBlockMgr", K(ret), K(old_log_disk_size), - K(expected_log_disk_size)); + } else if (OB_FAIL(log_service->get_palf_stable_disk_usage(used_log_disk_size, palf_log_disk_size))) { + LOG_WARN("fail to get_palf_stable_disk_usage", K(tenant_id), K(old_log_disk_size), K(expected_log_disk_size)); + // The standard for determing whether it's in shrinking or expanding status: + // 1. If 'palf_log_disk_size' is smaller than or equal to 'expected_log_disk_size', it's in expanding status. + // 2. If 'palf_log_disk_size' is greater than 'expected_log_disk_size', it's in shrinking status. + // + // For shrinking log disk, we don't update ObTenantConfig of ObTenant to new ObTenantConfig until shrinking successfully. + // + // NB: All fields of new ObTenantConfig except log_disk_size has been updated in case of shrinking log disk. + // + // For example: + // 1. before shrinkg log disk successfully, and then expand log disk. + // - At T1 timestamp, the original log disk is 100G, and update it to 50G, we will construct 'new_unit' with + // 100G, but update palf with 50G because original log disk size is greater than new log disk size. 
+ // - At T2 timestamp, the log disk size in current ObTenantConfig is 100G, and we update it to 80G, there are + // two scenarios: + // 1. if 'palf_log_disk_size' which is got from palf is 100G, we think palf is still in shrinking status, and we will + // construct 'new_unit' with 100G because 'palf_log_disk_size' is greater than new log disk size(80G), but update + // palf with 80G. + // 2. if 'palf_log_disk_size' which is got from palf is 50G, we think palf has been in normal status, and we will + // construct 'new_unit' with 80G because 'palf_log_disk_size' is smaller than new log disk size(80G), and update + // palf with 80G. + } else if (FALSE_IT(can_update_log_disk_size_with_expected_log_disk = (expected_log_disk_size >= palf_log_disk_size))) { + // For expanding log disk, we can update 'allowed_new_log_disk_size' to 'expected_log_disk_size' directly. + } else if (can_update_log_disk_size_with_expected_log_disk && FALSE_IT(allowed_new_log_disk_size = expected_log_disk_size)) { + // For shrinking log disk, we still update log disk size of 'new_unit' to 'old_log_disk_size'. + } else if (!can_update_log_disk_size_with_expected_log_disk && FALSE_IT(allowed_new_log_disk_size = old_log_disk_size)) { + // case 1: for shrinking log disk, shrinking log disk from 100G to 50G. + // - At T1 timestamp, 'expected_log_disk_size' is 50G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 100G. + // the log disk size recorded in slog is 100G, and we will update log disk size used for palf to 50G, but not update log + // disk which has been assigned in ObServerLogBlockMGR. + // - At T2 timestamp, 'expected_log_disk_size' is still 50G, 'old_log_disk_size' is still 100G, however, 'allowed_new_log_disk_size' + // is 50G because shrinking log disk has succeeded, the log disk recorded in slog is 50G, and then we will update log disk + // size used for palf to 50G again but it has no effect, log disk assigned in ObServerLogBlockMGR is updated to 50G(assume there is only one tenant).
+ // + // case 2: for expanding log disk, expanding log disk from 100G to 150G. + // - At T1 timestamp, 'expected_log_disk_size' is 150G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 150G. + // the log disk size recorded in slog is 150G, and then, we will update log disk size used for palf to 150G, the log disk + // which has been assigned in ObServerLogBlockMGR is updated to 150G(assume there is only one tenant). + // + // case 3: for shrinking log disk, shrinking log disk from 100G to 50G, and then shrinking log disk from 50G to 25G. + // - At T1 timestamp, 'expected_log_disk_size' is 50G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 100G. + // the log disk size recorded in slog is 100G, and we will update log disk size used for palf to 50G, but not update log + // disk which has been assigned in ObServerLogBlockMGR. + // - At T2 timestamp, 'expected_log_disk_size' is 25G, 'old_log_disk_size' is still 100G, however, there are two possible values for + // 'allowed_new_log_disk_size': + // 1. the value is 100G because the last shrinking of log disk has not succeeded yet, the log disk recorded in slog is still 100G + // and then we will update log disk size used for palf to 25G, but not update log disk assigned in ObServerLogBlockMGR. + // At T3 timestamp, 'expected_log_disk_size' is 25G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 25G, + // the log disk recorded in slog is 25G, the log disk assigned in ObServerLogBlockMGR is 25G. + // 2. the value is 50G because the last shrinking of log disk has succeeded, the log disk recorded in slog is 50G + // and then we will update log disk size used for palf to 25G, update log disk assigned in ObServerLogBlockMGR to 50G(assume there is only one tenant). + // At T3 timestamp, 'expected_log_disk_size' is 25G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 25G, + // the log disk recorded in slog is 25G, the log disk assigned in ObServerLogBlockMGR is 25G.
+ // + // case 4: for shrinking log disk, shrinking log disk from 100G to 50G, and then expanding log disk from 50G to 80G. + // - At T1 timestamp, 'expected_log_disk_size' is 50G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 100G. + // the log disk size recorded in slog is 100G, and we will update log disk size used for palf to 50G, but not update log + // disk which has been assigned in ObServerLogBlockMGR. + // - At T2 timestamp, 'expected_log_disk_size' is 80G, 'old_log_disk_size' is still 100G, however, there are two possible values for + // 'allowed_new_log_disk_size': + // 1. the value is 100G because the last shrinking of log disk has not succeeded yet, the log disk recorded in slog is still 100G + // and then we will update log disk size used for palf to 80G, but not update log disk assigned in ObServerLogBlockMGR. + // At T3 timestamp, 'expected_log_disk_size' is 80G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 80G, + // the log disk recorded in slog is 80G, the log disk assigned in ObServerLogBlockMGR is 80G. + // 2. the value is 80G because the last shrinking of log disk has succeeded, the log disk recorded in slog is 80G + // and then we will update log disk size used for palf to 80G, update log disk assigned in ObServerLogBlockMGR to 80G(assume there is only one tenant). + // At T3 timestamp, 'expected_log_disk_size' is 80G, 'old_log_disk_size' is 100G, 'allowed_new_log_disk_size' is 80G, + // the log disk recorded in slog is 80G, the log disk assigned in ObServerLogBlockMGR is 80G.
+ // + } else if (OB_FAIL(GCTX.log_block_mgr_->update_tenant(old_log_disk_size, allowed_new_log_disk_size))) { + LOG_WARN("failed to update teannt int ObServerLogBlockMGR", K(ret), K(tenant_id), K(expected_log_disk_size), + K(old_log_disk_size), K(allowed_new_log_disk_size)); + } else if (OB_FAIL(log_service->update_log_disk_usage_limit_size(expected_log_disk_size))) { + LOG_WARN("failed to update_log_disk_usage_limit_size", K(ret), K(tenant_id), K(expected_log_disk_size), + K(old_log_disk_size), K(allowed_new_log_disk_size)); + GCTX.log_block_mgr_->abort_update_tenant(allowed_new_log_disk_size, old_log_disk_size); } else { - if (OB_FAIL(log_service->update_log_disk_usage_limit_size(expected_log_disk_size))) { - LOG_WARN("failed to update_log_disk_usage_limit_size", K(ret), K(tenant_id), - K(expected_log_disk_size)); - } else { - LOG_INFO("update_tenant_log_disk_size success", K(ret), K(tenant_id), - K(expected_log_disk_size)); - } - if (false == is_virtual_tenant_id(tenant_id) && OB_FAIL(ret)) { - GCTX.log_block_mgr_->abort_update_tenant(expected_log_disk_size, old_log_disk_size); - } + LOG_INFO("update_log_disk_usage_limit_size success", K(ret), K(tenant_id), K(expected_log_disk_size), + K(old_log_disk_size), K(allowed_new_log_disk_size)); } } return ret; diff --git a/src/observer/omt/ob_multi_tenant.h b/src/observer/omt/ob_multi_tenant.h index 834698cc4d..dfe0ca8b1f 100644 --- a/src/observer/omt/ob_multi_tenant.h +++ b/src/observer/omt/ob_multi_tenant.h @@ -112,7 +112,9 @@ public: int update_tenant_memory(const uint64_t tenant_id, const int64_t mem_limit, int64_t &allowed_mem_limit); int update_tenant_memory(const share::ObUnitInfoGetter::ObTenantConfig &unit); int update_tenant_log_disk_size(const uint64_t tenant_id, - const int64_t expected_log_disk_size); + const int64_t expected_log_disk_size, + const int64_t old_log_disk_size, + int64_t &allowed_log_disk_size); int modify_tenant_io(const uint64_t tenant_id, const share::ObUnitConfig &unit_config); int 
update_tenant_config(uint64_t tenant_id); int update_palf_config(); @@ -180,6 +182,9 @@ protected: int remove_tenant(const uint64_t tenant_id, bool &remove_tenant_succ); uint32_t get_tenant_lock_bucket_idx(const uint64_t tenant_id); int update_tenant_unit_no_lock(const share::ObUnitInfoGetter::ObTenantConfig &unit); + int construct_allowed_unit_config(const int64_t allowed_log_disk_size, + const share::ObUnitInfoGetter::ObTenantConfig &expected_unit_config, + share::ObUnitInfoGetter::ObTenantConfig &allowed_unit); protected: static const int DEL_TRY_TIMES = 30; diff --git a/src/storage/ob_disk_usage_reporter.cpp b/src/storage/ob_disk_usage_reporter.cpp index 3e7e9cf555..418801dbb9 100644 --- a/src/storage/ob_disk_usage_reporter.cpp +++ b/src/storage/ob_disk_usage_reporter.cpp @@ -301,7 +301,7 @@ int ObDiskUsageReportTask::count_tenant_clog(const uint64_t tenant_id) if (OB_ISNULL(log_svr = MTL(ObLogService*))) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "log service is null", K(ret), KP(log_svr)); - } else if (OB_FAIL(log_svr->get_palf_disk_usage(clog_space, size_limit))) { + } else if (OB_FAIL(log_svr->get_palf_stable_disk_usage(clog_space, size_limit))) { STORAGE_LOG(WARN, "failed to get the disk space that clog used", K(ret)); } else { report_key.file_type_ = ObDiskReportFileType::OB_DISK_REPORT_TENANT_CLOG_DATA; diff --git a/src/storage/tx_storage/ob_checkpoint_service.cpp b/src/storage/tx_storage/ob_checkpoint_service.cpp index b0407620f7..184fb1221f 100644 --- a/src/storage/tx_storage/ob_checkpoint_service.cpp +++ b/src/storage/tx_storage/ob_checkpoint_service.cpp @@ -22,7 +22,6 @@ #include "storage/tx_storage/ob_tenant_freezer.h" #include "logservice/ob_log_service.h" #include "logservice/palf/log_define.h" -#include "logservice/palf/palf_env.h" #include "logservice/palf/lsn.h" #include "logservice/archiveservice/ob_archive_service.h" #include "storage/tx_storage/ob_ls_handle.h" @@ -203,17 +202,13 @@ bool 
ObCheckPointService::get_disk_usage_threshold_(int64_t &threshold) bool get_disk_usage_threshold_success = false; // avod clog disk full logservice::ObLogService *log_service = nullptr; - PalfEnv *palf_env = nullptr; if (OB_ISNULL(log_service = MTL(logservice::ObLogService *))) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "get_log_service failed", K(ret)); - } else if (OB_ISNULL(palf_env = log_service->get_palf_env())) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "get_palf_env failed", K(ret)); } else { int64_t used_size = 0; int64_t total_size = 0; - if (OB_FAIL(palf_env->get_disk_usage(used_size, total_size))) { + if (OB_FAIL(log_service->get_palf_disk_usage(used_size, total_size))) { STORAGE_LOG(WARN, "get_disk_usage failed", K(ret), K(used_size), K(total_size)); } else { threshold = total_size * NEED_FLUSH_CLOG_DISK_PERCENT / 100;