diff --git a/deps/oblib/src/lib/utility/utility.cpp b/deps/oblib/src/lib/utility/utility.cpp
index 4af34aa4d..0f16e7bab 100644
--- a/deps/oblib/src/lib/utility/utility.cpp
+++ b/deps/oblib/src/lib/utility/utility.cpp
@@ -576,13 +576,18 @@ int get_ethernet_speed(const ObString& devname, int64_t& speed)
   int rc = OB_SUCCESS;
   bool exist = false;
   char path[OB_MAX_FILE_NAME_LENGTH];
+  // Log the "path not exist" warning only once; this function is polled periodically.
+  static int dev_file_exist = 1;
   if (0 == devname.length()) {
     _OB_LOG(WARN, "empty devname");
     rc = OB_INVALID_ARGUMENT;
   } else {
     IGNORE_RETURN snprintf(path, sizeof(path), "/sys/class/net/%.*s", devname.length(), devname.ptr());
     if (OB_SUCCESS != (rc = FileDirectoryUtils::is_exists(path, exist)) || !exist) {
-      _OB_LOG(WARN, "path %s not exist", path);
+      if (dev_file_exist) {
+        _OB_LOG(WARN, "path %s not exist", path);
+        dev_file_exist = 0;
+      }
       rc = OB_FILE_NOT_EXIST;
     }
   }
diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp
index ee813c336..970ecd4d0 100644
--- a/src/observer/ob_server.cpp
+++ b/src/observer/ob_server.cpp
@@ -142,6 +142,7 @@ ObServer::ObServer()
       long_ops_task_(),
       ctas_clean_up_task_(),
       refresh_active_time_task_(),
+      refresh_network_speed_task_(),
       schema_status_proxy_(sql_proxy_),
       is_log_dir_empty_(false)
 {
@@ -300,6 +301,8 @@ int ObServer::init(const ObServerOptions& opts, const ObPLogWriterCfg& log_cfg)
     LOG_ERROR("init ctas clean up task fail", K(ret));
   } else if (OB_FAIL(init_refresh_active_time_task())) {
     LOG_ERROR("init refresh active time task fail", K(ret));
+  } else if (OB_FAIL(init_refresh_network_speed_task())) {
+    LOG_ERROR("init refresh network speed task fail", K(ret));
   } else if (OB_FAIL(user_col_stat_service_.init(&sql_proxy_, &config_))) {
     LOG_WARN("init user table column stat service failed");
   } else if (OB_FAIL(user_table_stat_service_.init(&sql_proxy_, &ObPartitionService::get_instance(), &config_))) {
@@ -1665,32 +1668,6 @@ int ObServer::init_storage()
   return ret;
 }
 
-int ObServer::init_bandwidth_throttle()
-{
-  int ret = OB_SUCCESS;
-  const int64_t sys_bkgd_net_percentage = config_.sys_bkgd_net_percentage;
-
-  int tmp_ret = OB_SUCCESS;
-  if (OB_SUCCESS != (tmp_ret = get_ethernet_speed(config_.devname.str(), ethernet_speed_))) {
-    ethernet_speed_ = DEFAULT_ETHERNET_SPEED;
-    LOG_WARN("cannot get Ethernet speed, use default", K(tmp_ret), "devname", config_.devname.str());
-  } else if (ethernet_speed_ < 0) {
-    ethernet_speed_ = DEFAULT_ETHERNET_SPEED;
-    LOG_WARN("get invalid Ethernet speed, use default", "devname", config_.devname.str());
-  }
-
-  if (OB_SUCC(ret)) {
-    int64_t rate = ethernet_speed_ * sys_bkgd_net_percentage / 100;
-
-    if (OB_FAIL(bandwidth_throttle_.init(rate))) {
-      LOG_WARN("failed to init bandwidth throttle", K(ret), K(rate), K(ethernet_speed_));
-    } else {
-      LOG_INFO("succeed to init_bandwidth_throttle", K(sys_bkgd_net_percentage_), K(ethernet_speed_), K(rate));
-    }
-  }
-  return ret;
-}
-
 int ObServer::init_gc_partition_adapter()
 {
   int ret = OB_SUCCESS;
@@ -1702,36 +1679,253 @@ int ObServer::init_gc_partition_adapter()
   return ret;
 }
 
+// Reads the speed of the NIC named by config_.devname through get_ethernet_speed().
+// A negative reading is rejected with OB_INVALID_ARGUMENT; on any failure the caller is
+// expected to fall back to another source.
+int ObServer::get_network_speed_from_sysfs(int64_t &network_speed)
+{
+  int ret = OB_SUCCESS;
+
+  if (OB_FAIL(get_ethernet_speed(config_.devname.str(), network_speed))) {
+    LOG_WARN("cannot get Ethernet speed, use default", K(ret), "devname", config_.devname.str());
+  } else if (network_speed < 0) {
+    ret = OB_INVALID_ARGUMENT;
+    LOG_WARN("get invalid Ethernet speed, use default", "devname", config_.devname.str());
+  }
+
+  return ret;
+}
+
+// Trims whitespace from both ends of str in place: trailing whitespace is overwritten with
+// '\0' and a pointer to the first non-space character is returned. NULL yields NULL.
+static char *strtrim(char *str)
+{
+  if (nullptr != str) {
+    // isspace() is only defined for unsigned char values (or EOF), hence the casts.
+    while (isspace(static_cast<unsigned char>(*str))) {
+      str++;
+    }
+    // Locate the end only after skipping the prefix so the pointer never moves before the
+    // start of the buffer, not even for an empty or all-whitespace string.
+    char *end = str + strlen(str);
+    while ((end > str) && isspace(static_cast<unsigned char>(end[-1]))) {
+      *--end = '\0';
+    }
+  }
+  return str;
+}
+
+// Parses a NIC rate string such as "1000", "10Gbit" or "100 mb" into bits per second.
+// The number is read by strtol() with auto-detected base; the optional unit suffix is
+// case-insensitive (bit/b, kbit/kb/k, mbit/mb/m, gbit/gb/g; K/M/G scale by 2^10/2^20/2^30)
+// and a missing unit means megabits. Trailing whitespace after the unit is cut off in the
+// caller's buffer. valid is set to false, and the return value must be ignored, for
+// NULL/empty input, a non-positive number, an unknown unit or a rate that overflows int64_t.
+static int64_t nic_rate_parse(const char *str, bool &valid)
+{
+  char *p_unit = nullptr;
+  int64_t value = 0;
+  int64_t shift = 0;
+
+  if (OB_ISNULL(str) || '\0' == str[0]) {
+    valid = false;
+  } else {
+    valid = true;
+    value = strtol(str, &p_unit, 0);
+    p_unit = strtrim(p_unit);
+
+    if (OB_ISNULL(p_unit)) {
+      valid = false;
+    } else if (value <= 0) {
+      valid = false;
+    } else if (0 == STRCASECMP("bit", p_unit)
+               || 0 == STRCASECMP("b", p_unit)) {
+      shift = 0;
+    } else if (0 == STRCASECMP("kbit", p_unit)
+               || 0 == STRCASECMP("kb", p_unit)
+               || 0 == STRCASECMP("k", p_unit)) {
+      shift = 10;
+    } else if ('\0' == *p_unit
+               || 0 == STRCASECMP("mbit", p_unit)
+               || 0 == STRCASECMP("mb", p_unit)
+               || 0 == STRCASECMP("m", p_unit)) {
+      // default is mega bit
+      shift = 20;
+    } else if (0 == STRCASECMP("gbit", p_unit)
+               || 0 == STRCASECMP("gb", p_unit)
+               || 0 == STRCASECMP("g", p_unit)) {
+      shift = 30;
+    } else {
+      valid = false;
+      LOG_ERROR("parse nic rate error", K(str), K(p_unit));
+    }
+
+    if (valid) {
+      if (value > (INT64_MAX >> shift)) {
+        // Scaling would overflow int64_t (undefined for a signed shift), e.g. after
+        // strtol() saturated on an absurdly long number.
+        valid = false;
+        LOG_ERROR("nic rate is too large", K(str), K(shift));
+      } else {
+        value <<= shift;
+      }
+    }
+  }
+  return value;
+}
+
+// Reads the NIC rate from "etc/nic.rate.config" (a path relative to the working directory).
+// The text after the first '=' is parsed by nic_rate_parse() as bits per second and stored
+// in network_speed divided by 8. Returns OB_FILE_NOT_EXIST when the file is absent (logged
+// once until the file shows up again), OB_BUF_NOT_ENOUGH when the file is 1KB or larger,
+// OB_INVALID_ARGUMENT when it cannot be parsed and OB_IO_ERROR on other I/O failures;
+// network_speed is only written on success.
+int ObServer::get_network_speed_from_config_file(int64_t &network_speed)
+{
+  int ret = OB_SUCCESS;
+  const char *nic_rate_path = "etc/nic.rate.config";
+  const int64_t MAX_NIC_CONFIG_FILE_SIZE = 1 << 10; // 1KB
+  FILE *fp = nullptr;
+  char *buf = nullptr;
+  static int nic_rate_file_exist = 1;
+
+  if (OB_ISNULL(buf = static_cast<char *>(ob_malloc(MAX_NIC_CONFIG_FILE_SIZE + 1,
+                                                    ObModIds::OB_BUFFER)))) {
+    ret = OB_ALLOCATE_MEMORY_FAILED;
+    LOG_ERROR("alloc buffer failed", LITERAL_K(MAX_NIC_CONFIG_FILE_SIZE), K(ret));
+  } else if (OB_ISNULL(fp = fopen(nic_rate_path, "r"))) {
+    if (ENOENT == errno) {
+      ret = OB_FILE_NOT_EXIST;
+      if (nic_rate_file_exist) {
+        LOG_WARN("NIC Config file doesn't exist, auto detecting", K(nic_rate_path), K(ret));
+        nic_rate_file_exist = 0;
+      }
+    } else {
+      ret = OB_IO_ERROR;
+      if (EAGAIN == errno) {
+        LOG_WARN("Can't open NIC Config file", K(nic_rate_path), K(errno), K(ret));
+      } else {
+        LOG_ERROR("Can't open NIC Config file", K(nic_rate_path), K(errno), K(ret));
+      }
+    }
+  } else {
+    if (!nic_rate_file_exist) {
+      LOG_INFO("Reading NIC Config file", K(nic_rate_path));
+      nic_rate_file_exist = 1;
+    }
+    memset(buf, 0, MAX_NIC_CONFIG_FILE_SIZE + 1);
+    fread(buf, 1, MAX_NIC_CONFIG_FILE_SIZE, fp);
+    char *prate = nullptr;
+
+    if (OB_UNLIKELY(0 != ferror(fp))) {
+      ret = OB_IO_ERROR;
+      LOG_ERROR("Read NIC Config file error", K(nic_rate_path), K(ret));
+    } else if (OB_UNLIKELY(0 == feof(fp))) {
+      ret = OB_BUF_NOT_ENOUGH;
+      LOG_ERROR("NIC Config file is too long", K(nic_rate_path), K(ret));
+    } else {
+      prate = strchr(buf, '=');
+      if (nullptr != prate) {
+        prate++;
+        bool valid = false;
+        int64_t nic_rate = nic_rate_parse(prate, valid);
+        if (valid) {
+          network_speed = nic_rate / 8;
+        } else {
+          ret = OB_INVALID_ARGUMENT;
+          LOG_ERROR("invalid NIC Rate Config", K(ret));
+        }
+      } else {
+        ret = OB_INVALID_ARGUMENT;
+        LOG_ERROR("invalid NIC Config file", K(ret));
+      }
+    } // else
+
+    if (OB_UNLIKELY(0 != fclose(fp))) {
+      ret = OB_IO_ERROR;
+      LOG_ERROR("Close NIC Config file failed", K(ret));
+    }
+  } // else
+  if (OB_LIKELY(nullptr != buf)) {
+    ob_free(buf);
+    buf = nullptr;
+  }
+  return ret;
+}
+
+// Picks the NIC speed (config file first, then sysfs, then DEFAULT_ETHERNET_SPEED) and
+// initializes bandwidth_throttle_ with sys_bkgd_net_percentage percent of it.
+int ObServer::init_bandwidth_throttle()
+{
+  int ret = OB_SUCCESS;
+  int64_t network_speed = 0;
+
+  if (OB_SUCC(get_network_speed_from_config_file(network_speed))) {
+    LOG_DEBUG("got network speed from config file", K(network_speed));
+  } else if (OB_SUCC(get_network_speed_from_sysfs(network_speed))) {
+    LOG_DEBUG("got network speed from sysfs", K(network_speed));
+  } else {
+    // Falling back to the default is not an error; drop the probe failure left in ret.
+    ret = OB_SUCCESS;
+    network_speed = DEFAULT_ETHERNET_SPEED;
+    LOG_DEBUG("using default network speed", K(network_speed));
+  }
+
+  if (network_speed <= 0) {
+    // A probe may succeed with 0 (e.g. a configured rate below 8 bit/s rounds down to
+    // 0 bytes/s); the throttle must still be initialized, so use the default instead.
+    LOG_WARN("wrong network speed, use default", K(network_speed));
+    network_speed = DEFAULT_ETHERNET_SPEED;
+  }
+
+  sys_bkgd_net_percentage_ = config_.sys_bkgd_net_percentage;
+  const int64_t rate = network_speed * sys_bkgd_net_percentage_ / 100;
+  if (OB_FAIL(bandwidth_throttle_.init(rate))) {
+    LOG_WARN("failed to init bandwidth throttle", K(ret), K(rate), K(network_speed));
+  } else {
+    LOG_INFO("succeed to init_bandwidth_throttle",
+             K(sys_bkgd_net_percentage_),
+             K(network_speed),
+             K(rate));
+    ethernet_speed_ = network_speed;
+  }
+  return ret;
+}
+
 int ObServer::reload_config()
 {
   int ret = OB_SUCCESS;
 
-  if (OB_FAIL(reload_bandwidth_throttle_limit())) {
+  if (OB_FAIL(reload_bandwidth_throttle_limit(ethernet_speed_))) {
     LOG_WARN("failed to reload_bandwidth_throttle_limit", K(ret));
   }
 
   return ret;
 }
 
-int ObServer::reload_bandwidth_throttle_limit()
+// Re-applies the throttle rate when either sys_bkgd_net_percentage or the NIC speed differs
+// from the cached value; a non-positive network_speed is replaced by DEFAULT_ETHERNET_SPEED.
+int ObServer::reload_bandwidth_throttle_limit(int64_t network_speed)
 {
   int ret = OB_SUCCESS;
   const int64_t sys_bkgd_net_percentage = config_.sys_bkgd_net_percentage;
 
-  if (OB_SUCC(ret) && sys_bkgd_net_percentage != sys_bkgd_net_percentage_) {
-    int64_t rate = ethernet_speed_ * sys_bkgd_net_percentage / 100;
+  if ((sys_bkgd_net_percentage_ != sys_bkgd_net_percentage) || (ethernet_speed_ != network_speed)) {
+    if (network_speed <= 0) {
+      LOG_WARN("wrong network speed.", K(network_speed));
+      network_speed = DEFAULT_ETHERNET_SPEED;
+    }
+    int64_t rate = network_speed * sys_bkgd_net_percentage / 100;
 
     if (OB_FAIL(bandwidth_throttle_.set_rate(rate))) {
       LOG_WARN("failed to reset bandwidth throttle", K(ret), K(rate), K(ethernet_speed_));
     } else {
-      sys_bkgd_net_percentage_ = sys_bkgd_net_percentage;
       LOG_INFO("succeed to reload_bandwidth_throttle_limit",
-          "old_percentage",
-          sys_bkgd_net_percentage_,
-          "new_percentage",
-          sys_bkgd_net_percentage,
-          K(ethernet_speed_),
+               "old_percentage", sys_bkgd_net_percentage_,
+               "new_percentage", sys_bkgd_net_percentage,
+               K(network_speed),
           K(rate));
+      sys_bkgd_net_percentage_ = sys_bkgd_net_percentage;
+      ethernet_speed_ = network_speed;
     }
   }
   return ret;
@@ -1972,6 +2166,80 @@ int ObServer::refresh_temp_table_sess_active_time()
   return ret;
 }
 
+ObServer::ObRefreshNetworkSpeedTask::ObRefreshNetworkSpeedTask()
+: obs_(nullptr), is_inited_(false)
+{}
+
+// Binds the task to obs and schedules it on timer group tg_id every REFRESH_INTERVAL.
+int ObServer::ObRefreshNetworkSpeedTask::init(ObServer *obs, int tg_id)
+{
+  int ret = OB_SUCCESS;
+  if (OB_UNLIKELY(is_inited_)) {
+    ret = OB_INIT_TWICE;
+    LOG_WARN("ObRefreshNetworkSpeedTask has already been inited", K(ret));
+  } else if (OB_ISNULL(obs)) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("ObRefreshNetworkSpeedTask init with null ptr", K(ret), K(obs));
+  } else {
+    obs_ = obs;
+    is_inited_ = true;
+    if (OB_FAIL(TG_SCHEDULE(tg_id, *this, REFRESH_INTERVAL, true /*schedule repeatly*/))) {
+      LOG_WARN("fail to schedule task ObRefreshNetworkSpeedTask", K(ret));
+    }
+  }
+  return ret;
+}
+
+void ObServer::ObRefreshNetworkSpeedTask::destroy()
+{
+  is_inited_ = false;
+  obs_ = nullptr;
+}
+
+// Timer callback: re-detects the NIC speed and lets ObServer adjust the bandwidth throttle.
+void ObServer::ObRefreshNetworkSpeedTask::runTimerTask()
+{
+  int ret = OB_SUCCESS;
+  if (OB_UNLIKELY(!is_inited_)) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("ObRefreshNetworkSpeedTask has not been inited", K(ret));
+  } else if (OB_ISNULL(obs_)) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("ObRefreshNetworkSpeedTask got null ptr", K(ret));
+  } else if (OB_FAIL(obs_->refresh_network_speed())) {
+    LOG_WARN("ObRefreshNetworkSpeedTask reload bandwidth throttle limit failed", K(ret));
+  }
+}
+
+// Re-detects the NIC speed (config file, then sysfs, then DEFAULT_ETHERNET_SPEED) and
+// reloads the bandwidth throttle when the result differs from the cached ethernet_speed_.
+int ObServer::refresh_network_speed()
+{
+  int ret = OB_SUCCESS;
+  int64_t network_speed = 0;
+
+  if (OB_SUCC(get_network_speed_from_config_file(network_speed))) {
+    LOG_DEBUG("got network speed from config file", K(network_speed));
+  } else if (OB_SUCC(get_network_speed_from_sysfs(network_speed))) {
+    LOG_DEBUG("got network speed from sysfs", K(network_speed));
+  } else {
+    // Using the default is a valid outcome; clear the probe failure left in ret so it is
+    // not returned (and warned about by the timer task) on every tick.
+    ret = OB_SUCCESS;
+    network_speed = DEFAULT_ETHERNET_SPEED;
+    LOG_DEBUG("using default network speed", K(network_speed));
+  }
+
+  if ((network_speed > 0) && (network_speed != ethernet_speed_)) {
+    LOG_INFO("network speed changed", "from", ethernet_speed_, "to", network_speed);
+    if (OB_FAIL(reload_bandwidth_throttle_limit(network_speed))) {
+      LOG_WARN("ObRefreshNetworkSpeedTask reload bandwidth throttle limit failed", K(ret));
+    }
+  }
+
+  return ret;
+}
+
 int ObServer::init_refresh_active_time_task()
 {
   int ret = OB_SUCCESS;
@@ -1990,6 +2258,15 @@ int ObServer::init_ctas_clean_up_task()
   return ret;
 }
 
+int ObServer::init_refresh_network_speed_task()
+{
+  int ret = OB_SUCCESS;
+  if (OB_FAIL(refresh_network_speed_task_.init(this, lib::TGDefIDs::ServerGTimer))) {
+    LOG_WARN("fail to init refresh network speed task", K(ret));
+  }
+  return ret;
+}
+
 // @@Query cleanup rules for built tables and temporary tables:
 // 1, Traverse all table_schema, if the session_id of table T <> 0 means that the table is being created or the previous
 // creation failed or the temporary table is to be cleared, then enter 2#; 2, Create a table for the query: traverse the
diff --git a/src/observer/ob_server.h b/src/observer/ob_server.h
index dd250f297..af61e545a 100644
--- a/src/observer/ob_server.h
+++ b/src/observer/ob_server.h
@@ -108,7 +108,23 @@ public:
     bool is_inited_;
   };
 
+  // Timer task that periodically re-detects the NIC speed and refreshes the bandwidth throttle.
+  class ObRefreshNetworkSpeedTask: public common::ObTimerTask
+  {
+  public:
+    ObRefreshNetworkSpeedTask();
+    virtual ~ObRefreshNetworkSpeedTask() {}
+    int init(ObServer *observer, int tg_id);
+    void destroy();
+    virtual void runTimerTask() override;
+  private:
+    // 10^6: one second if the timer delay is in microseconds, not one hour. TODO confirm unit.
+    const static int64_t REFRESH_INTERVAL = 1L * 1000L * 1000L;
+    ObServer *obs_;
+    bool is_inited_;
+  };
+
   class ObRefreshTime {
   public:
     explicit ObRefreshTime(ObServer* obs) : obs_(obs)
     {}
@@ -257,9 +273,12 @@ private:
   int wait_gts();
   int init_gts_cache_mgr();
   int init_storage();
-  int init_bandwidth_throttle();
   int init_gc_partition_adapter();
-  int reload_bandwidth_throttle_limit();
   int init_loaddata_global_stat();
+  int init_bandwidth_throttle();
+  int reload_bandwidth_throttle_limit(int64_t network_speed);
+  int get_network_speed_from_sysfs(int64_t &network_speed);
+  int get_network_speed_from_config_file(int64_t &network_speed);
+  int refresh_network_speed();
   int clean_up_invalid_tables();
   int init_ctas_clean_up_task();  // Regularly clean up the residuals related to querying and building tables and
@@ -268,5 +287,6 @@ private:
   int refresh_temp_table_sess_active_time();
   int init_refresh_active_time_task();  // Regularly update the sess_active_time of the temporary table created by the
                                         // proxy connection sess
+  int init_refresh_network_speed_task();
   int set_running_mode();
   int check_server_can_start_service();
@@ -394,6 +414,7 @@ private:
   storage::ObPurgeCompletedMonitorInfoTask long_ops_task_;
   ObCTASCleanUpTask ctas_clean_up_task_;  // repeat & no retry
   ObRefreshTimeTask refresh_active_time_task_;  // repeat & no retry
+  ObRefreshNetworkSpeedTask refresh_network_speed_task_; // repeat & no retry
   blocksstable::ObStorageEnv storage_env_;
   share::ObSchemaStatusProxy schema_status_proxy_;
   ObSignalWorker sig_worker_;