add refresh io calibration timetask at observer start
This commit is contained in:
@ -197,6 +197,7 @@ ObServer::ObServer()
|
||||
refresh_active_time_task_(),
|
||||
refresh_network_speed_task_(),
|
||||
refresh_cpu_frequency_task_(),
|
||||
refresh_io_calibration_task_(),
|
||||
schema_status_proxy_(sql_proxy_),
|
||||
is_log_dir_empty_(false),
|
||||
conn_res_mgr_(),
|
||||
@ -437,6 +438,8 @@ int ObServer::init(const ObServerOptions &opts, const ObPLogWriterCfg &log_cfg)
|
||||
LOG_ERROR("init refresh network speed task failed", KR(ret));
|
||||
} else if (OB_FAIL(init_refresh_cpu_frequency())) {
|
||||
LOG_ERROR("init refresh cpu frequency failed", KR(ret));
|
||||
} else if (OB_FAIL(init_refresh_io_calibration())) {
|
||||
LOG_ERROR("init refresh io calibration failed", KR(ret));
|
||||
} else if (OB_FAIL(ObOptStatManager::get_instance().init(
|
||||
&sql_proxy_, &config_))) {
|
||||
LOG_ERROR("init opt stat manager failed", KR(ret));
|
||||
@ -3334,6 +3337,62 @@ int ObServer::refresh_network_speed()
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObServer::ObRefreshIOCalibrationTimeTask::ObRefreshIOCalibrationTimeTask()
|
||||
: obs_(nullptr), tg_id_(-1), is_inited_(false)
|
||||
{}
|
||||
|
||||
int ObServer::ObRefreshIOCalibrationTimeTask::init(ObServer *obs, int tg_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_ERROR("ObRefreshIOCalibrationTimeTask has already been inited", KR(ret));
|
||||
} else if (OB_ISNULL(obs)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("ObRefreshIOCalibrationTimeTask init with null ptr", KR(ret), K(obs));
|
||||
} else {
|
||||
obs_ = obs;
|
||||
tg_id_ = tg_id;
|
||||
is_inited_ = true;
|
||||
if (OB_FAIL(TG_SCHEDULE(tg_id_, *this, REFRESH_INTERVAL, true /*schedule repeatly*/))) {
|
||||
LOG_ERROR("fail to schedule task ObRefreshIOCalibrationTimeTask", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObServer::ObRefreshIOCalibrationTimeTask::destroy()
|
||||
{
|
||||
is_inited_ = false;
|
||||
tg_id_ = -1;
|
||||
obs_ = nullptr;
|
||||
}
|
||||
|
||||
void ObServer::ObRefreshIOCalibrationTimeTask::runTimerTask()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_ERROR("ObRefreshIOCalibrationTimeTask has not been inited", KR(ret));
|
||||
} else if (OB_ISNULL(obs_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("ObRefreshIOCalibrationTimeTask task got null ptr", KR(ret));
|
||||
} else if (OB_FAIL(obs_->refresh_io_calibration())) {
|
||||
LOG_WARN("ObRefreshIOCalibrationTimeTask task failed", KR(ret));
|
||||
} else {
|
||||
TG_CANCEL(tg_id_, *this);
|
||||
}
|
||||
}
|
||||
|
||||
int ObServer::refresh_io_calibration()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(ObIOCalibration::get_instance().read_from_table())) {
|
||||
LOG_WARN("fail to refresh io calibration from table", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObServer::init_refresh_active_time_task()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -3388,6 +3447,15 @@ int ObServer::init_refresh_cpu_frequency()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObServer::init_refresh_io_calibration()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(refresh_io_calibration_task_.init(this, lib::TGDefIDs::ServerGTimer))) {
|
||||
LOG_ERROR("fail to init refresh io calibration task", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// @@Query cleanup rules for built tables and temporary tables:
|
||||
//1, Traverse all table_schema, if the session_id of table T <> 0 means that the table is being created or the previous creation failed or the temporary table is to be cleared, then enter 2#;
|
||||
//2, Create a table for the query: traverse the session, and determine whether T should be DROP according to the session_id and time of the session and table T;
|
||||
|
||||
@ -183,6 +183,21 @@ public:
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
class ObRefreshIOCalibrationTimeTask: public common::ObTimerTask
|
||||
{
|
||||
public:
|
||||
ObRefreshIOCalibrationTimeTask();
|
||||
virtual ~ObRefreshIOCalibrationTimeTask() {}
|
||||
int init(ObServer *observer, int tg_id);
|
||||
void destroy();
|
||||
virtual void runTimerTask() override;
|
||||
private:
|
||||
const static int64_t REFRESH_INTERVAL = 10 * 1000L * 1000L;//10s
|
||||
ObServer *obs_;
|
||||
int tg_id_;
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
class ObRefreshTime {
|
||||
public:
|
||||
explicit ObRefreshTime(ObServer *obs): obs_(obs){}
|
||||
@ -288,6 +303,7 @@ private:
|
||||
int get_network_speed_from_config_file(int64_t &network_speed);
|
||||
int refresh_network_speed();
|
||||
int refresh_cpu_frequency();
|
||||
int refresh_io_calibration();
|
||||
int clean_up_invalid_tables();
|
||||
int clean_up_invalid_tables_by_tenant(const uint64_t tenant_id);
|
||||
int init_ctas_clean_up_task(); //Regularly clean up the residuals related to querying and building tables and temporary tables
|
||||
@ -297,6 +313,7 @@ private:
|
||||
int init_refresh_active_time_task(); //Regularly update the sess_active_time of the temporary table created by the proxy connection sess
|
||||
int init_refresh_network_speed_task();
|
||||
int init_refresh_cpu_frequency();
|
||||
int init_refresh_io_calibration();
|
||||
int set_running_mode();
|
||||
void check_user_tenant_schema_refreshed(const common::ObIArray<uint64_t> &tenant_ids, const int64_t expire_time);
|
||||
void check_log_replay_over(const common::ObIArray<uint64_t> &tenant_ids, const int64_t expire_time);
|
||||
@ -440,6 +457,7 @@ private:
|
||||
ObRefreshTimeTask refresh_active_time_task_; // repeat & no retry
|
||||
ObRefreshNetworkSpeedTask refresh_network_speed_task_; // repeat & no retry
|
||||
ObRefreshCpuFreqTimeTask refresh_cpu_frequency_task_;
|
||||
ObRefreshIOCalibrationTimeTask refresh_io_calibration_task_; // retry to success & no repeat
|
||||
blocksstable::ObStorageEnv storage_env_;
|
||||
share::ObSchemaStatusProxy schema_status_proxy_;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user