From c5d973effaa5c78e1883fef62831277ac3a2f565 Mon Sep 17 00:00:00 2001 From: zhjc1124 Date: Fri, 9 Feb 2024 18:47:14 +0000 Subject: [PATCH] add refresh io calibration timetask at observer start --- src/observer/ob_server.cpp | 68 ++++++++++++++++++++++++++++++++++++++ src/observer/ob_server.h | 18 ++++++++++ 2 files changed, 86 insertions(+) diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 9d87ba280b..6ba3474f52 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -197,6 +197,7 @@ ObServer::ObServer() refresh_active_time_task_(), refresh_network_speed_task_(), refresh_cpu_frequency_task_(), + refresh_io_calibration_task_(), schema_status_proxy_(sql_proxy_), is_log_dir_empty_(false), conn_res_mgr_(), @@ -437,6 +438,8 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg) LOG_ERROR("init refresh network speed task failed", KR(ret)); } else if (OB_FAIL(init_refresh_cpu_frequency())) { LOG_ERROR("init refresh cpu frequency failed", KR(ret)); + } else if (OB_FAIL(init_refresh_io_calibration())) { + LOG_ERROR("init refresh io calibration failed", KR(ret)); } else if (OB_FAIL(ObOptStatManager::get_instance().init( &sql_proxy_, &config_))) { LOG_ERROR("init opt stat manager failed", KR(ret)); @@ -3334,6 +3337,62 @@ int ObServer::refresh_network_speed() return ret; } +ObServer::ObRefreshIOCalibrationTimeTask::ObRefreshIOCalibrationTimeTask() +: obs_(nullptr), tg_id_(-1), is_inited_(false) +{} + +int ObServer::ObRefreshIOCalibrationTimeTask::init(ObServer *obs, int tg_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + LOG_ERROR("ObRefreshIOCalibrationTimeTask has already been inited", KR(ret)); + } else if (OB_ISNULL(obs)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("ObRefreshIOCalibrationTimeTask init with null ptr", KR(ret), K(obs)); + } else { + obs_ = obs; + tg_id_ = tg_id; + is_inited_ = true; + if (OB_FAIL(TG_SCHEDULE(tg_id_, *this, REFRESH_INTERVAL, true /*schedule repeatly*/))) { + LOG_ERROR("fail to schedule task ObRefreshIOCalibrationTimeTask", KR(ret)); + } + } + return ret; +} + +void ObServer::ObRefreshIOCalibrationTimeTask::destroy() +{ + is_inited_ = false; + tg_id_ = -1; + obs_ = nullptr; +} + +void ObServer::ObRefreshIOCalibrationTimeTask::runTimerTask() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_ERROR("ObRefreshIOCalibrationTimeTask has not been inited", KR(ret)); + } else if (OB_ISNULL(obs_)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("ObRefreshIOCalibrationTimeTask task got null ptr", KR(ret)); + } else if (OB_FAIL(obs_->refresh_io_calibration())) { + LOG_WARN("ObRefreshIOCalibrationTimeTask task failed", KR(ret)); + } else { + TG_CANCEL(tg_id_, *this); + } +} + +int ObServer::refresh_io_calibration() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ObIOCalibration::get_instance().read_from_table())) { + LOG_WARN("fail to refresh io calibration from table", KR(ret)); + } + return ret; +} + int ObServer::init_refresh_active_time_task() { int ret = OB_SUCCESS; @@ -3388,6 +3447,15 @@ int ObServer::init_refresh_cpu_frequency() return ret; } +int ObServer::init_refresh_io_calibration() +{ + int ret = OB_SUCCESS; + if (OB_FAIL(refresh_io_calibration_task_.init(this, lib::TGDefIDs::ServerGTimer))) { + LOG_ERROR("fail to init refresh io calibration task", KR(ret)); + } + return ret; +} + // @@Query cleanup rules for built tables and temporary tables: //1, Traverse all table_schema, if the session_id of table T <> 0 means that the table is being created or the previous creation failed or the temporary table is to be cleared, then enter 2#; //2, Create a table for the query: traverse the session, and determine whether T should be DROP according to the session_id and time of the session and table T; diff --git a/src/observer/ob_server.h b/src/observer/ob_server.h index 67b20258cb..81988b8dc9 100644 --- a/src/observer/ob_server.h +++ b/src/observer/ob_server.h @@ -183,6 +183,21 @@ public: bool is_inited_; }; + class ObRefreshIOCalibrationTimeTask: public common::ObTimerTask + { + public: + ObRefreshIOCalibrationTimeTask(); + virtual ~ObRefreshIOCalibrationTimeTask() {} + int init(ObServer *observer, int tg_id); + void destroy(); + virtual void runTimerTask() override; + private: + const static int64_t REFRESH_INTERVAL = 10 * 1000L * 1000L;//10s + ObServer *obs_; + int tg_id_; + bool is_inited_; + }; + class ObRefreshTime { public: explicit ObRefreshTime(ObServer *obs): obs_(obs){} @@ -288,6 +303,7 @@ private: int get_network_speed_from_config_file(int64_t &network_speed); int refresh_network_speed(); int refresh_cpu_frequency(); + int refresh_io_calibration(); int clean_up_invalid_tables(); int clean_up_invalid_tables_by_tenant(const uint64_t tenant_id); int init_ctas_clean_up_task(); //Regularly clean up the residuals related to querying and building tables and temporary tables @@ -297,6 +313,7 @@ private: int init_refresh_active_time_task(); //Regularly update the sess_active_time of the temporary table created by the proxy connection sess int init_refresh_network_speed_task(); int init_refresh_cpu_frequency(); + int init_refresh_io_calibration(); int set_running_mode(); void check_user_tenant_schema_refreshed(const common::ObIArray &tenant_ids, const int64_t expire_time); void check_log_replay_over(const common::ObIArray &tenant_ids, const int64_t expire_time); @@ -440,6 +457,7 @@ private: ObRefreshTimeTask refresh_active_time_task_; // repeat & no retry ObRefreshNetworkSpeedTask refresh_network_speed_task_; // repeat & no retry ObRefreshCpuFreqTimeTask refresh_cpu_frequency_task_; + ObRefreshIOCalibrationTimeTask refresh_io_calibration_task_; // retry to success & no repeat blocksstable::ObStorageEnv storage_env_; share::ObSchemaStatusProxy schema_status_proxy_;