pick #46200
This commit is contained in:
@ -54,6 +54,7 @@ void BeConfDataDirReader::get_data_dir_by_file_path(io::Path* file_path,
|
||||
}
|
||||
if (file_path->string().compare(0, data_dir_info.path.size(), data_dir_info.path) == 0) {
|
||||
*data_dir_arg = data_dir_info.path;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -67,7 +68,7 @@ void BeConfDataDirReader::init_be_conf_data_dir(
|
||||
data_dir_info.path = store_paths[i].path;
|
||||
data_dir_info.storage_medium = store_paths[i].storage_medium;
|
||||
data_dir_info.data_dir_type = DataDirType::OLAP_DATA_DIR;
|
||||
data_dir_info.bvar_name = "local_data_dir_" + std::to_string(i);
|
||||
data_dir_info.metric_name = "local_data_dir_" + std::to_string(i);
|
||||
be_config_data_dir_list.push_back(data_dir_info);
|
||||
}
|
||||
|
||||
@ -76,7 +77,7 @@ void BeConfDataDirReader::init_be_conf_data_dir(
|
||||
data_dir_info.path = spill_store_paths[i].path;
|
||||
data_dir_info.storage_medium = spill_store_paths[i].storage_medium;
|
||||
data_dir_info.data_dir_type = doris::DataDirType::SPILL_DISK_DIR;
|
||||
data_dir_info.bvar_name = "spill_data_dir_" + std::to_string(i);
|
||||
data_dir_info.metric_name = "spill_data_dir_" + std::to_string(i);
|
||||
be_config_data_dir_list.push_back(data_dir_info);
|
||||
}
|
||||
|
||||
@ -85,9 +86,14 @@ void BeConfDataDirReader::init_be_conf_data_dir(
|
||||
data_dir_info.path = cache_paths[i].path;
|
||||
data_dir_info.storage_medium = TStorageMedium::REMOTE_CACHE;
|
||||
data_dir_info.data_dir_type = doris::DataDirType::DATA_CACHE_DIR;
|
||||
data_dir_info.bvar_name = "local_cache_dir_" + std::to_string(i);
|
||||
data_dir_info.metric_name = "local_cache_dir_" + std::to_string(i);
|
||||
be_config_data_dir_list.push_back(data_dir_info);
|
||||
}
|
||||
|
||||
std::sort(be_config_data_dir_list.begin(), be_config_data_dir_list.end(),
|
||||
[](const DataDirInfo& a, const DataDirInfo& b) {
|
||||
return a.path.length() > b.path.length();
|
||||
});
|
||||
}
|
||||
|
||||
LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd,
|
||||
|
||||
@ -72,7 +72,7 @@ struct DataDirInfo {
|
||||
bool is_used = false; // whether available mark
|
||||
TStorageMedium::type storage_medium = TStorageMedium::HDD; // Storage medium type: SSD|HDD
|
||||
DataDirType data_dir_type = DataDirType::OLAP_DATA_DIR;
|
||||
std::string bvar_name;
|
||||
std::string metric_name;
|
||||
};
|
||||
struct PredicateFilterInfo {
|
||||
int type = 0;
|
||||
|
||||
@ -72,7 +72,7 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_
|
||||
_need_create_query_thread_pool(need_create_query_thread_pool) {
|
||||
std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
|
||||
for (const auto& data_dir : data_dir_list) {
|
||||
_scan_io_throttle_map[data_dir.path] = std::make_shared<IOThrottle>(data_dir.bvar_name);
|
||||
_scan_io_throttle_map[data_dir.path] = std::make_shared<IOThrottle>(data_dir.metric_name);
|
||||
}
|
||||
_remote_scan_io_throttle = std::make_shared<IOThrottle>();
|
||||
|
||||
|
||||
@ -18,6 +18,8 @@
|
||||
#include "runtime/workload_group/workload_group_metrics.h"
|
||||
|
||||
#include "common/config.h"
|
||||
#include "io/fs/local_file_reader.h"
|
||||
#include "olap/olap_common.h"
|
||||
#include "runtime/workload_group/workload_group.h"
|
||||
#include "runtime/workload_management/io_throttle.h"
|
||||
#include "util/doris_metrics.h"
|
||||
@ -31,7 +33,7 @@ WorkloadGroupMetrics::~WorkloadGroupMetrics() {
|
||||
|
||||
WorkloadGroupMetrics::WorkloadGroupMetrics(WorkloadGroup* wg) {
|
||||
_entity = DorisMetrics::instance()->metric_registry()->register_entity(
|
||||
"workload_group." + wg->name(), {{"name", wg->name()}});
|
||||
"workload_group." + std::to_string(wg->id()), {{"workload_group", wg->name()}});
|
||||
|
||||
_cpu_time_metric = std::make_unique<doris::MetricPrototype>(
|
||||
doris::MetricType::COUNTER, doris::MetricUnit::SECONDS, "workload_group_cpu_time_sec");
|
||||
@ -55,13 +57,15 @@ WorkloadGroupMetrics::WorkloadGroupMetrics(WorkloadGroup* wg) {
|
||||
_remote_scan_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
|
||||
_remote_scan_bytes_metric.get()));
|
||||
|
||||
for (const auto& [key, io_throttle] : wg->_scan_io_throttle_map) {
|
||||
std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
|
||||
for (const auto& data_dir : data_dir_list) {
|
||||
std::unique_ptr<doris::MetricPrototype> metric = std::make_unique<doris::MetricPrototype>(
|
||||
doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
|
||||
"workload_group_local_scan_bytes_" + io_throttle->metric_name());
|
||||
_local_scan_bytes_counter_map[key] =
|
||||
(IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(metric.get()));
|
||||
_local_scan_bytes_metric_map[key] = std::move(metric);
|
||||
"workload_group_local_scan_bytes_" + data_dir.metric_name);
|
||||
_local_scan_bytes_counter_map.insert(
|
||||
{data_dir.path,
|
||||
(IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(metric.get()))});
|
||||
_local_scan_bytes_metric_map.insert({data_dir.path, std::move(metric)});
|
||||
}
|
||||
}
|
||||
|
||||
@ -75,7 +79,10 @@ void WorkloadGroupMetrics::update_memory_used_bytes(int64_t memory_used) {
|
||||
|
||||
void WorkloadGroupMetrics::update_local_scan_io_bytes(std::string path, uint64_t delta_io_bytes) {
|
||||
_local_scan_bytes_counter->increment(delta_io_bytes);
|
||||
_local_scan_bytes_counter_map[path]->increment((int64_t)delta_io_bytes);
|
||||
auto range = _local_scan_bytes_counter_map.equal_range(path);
|
||||
for (auto it = range.first; it != range.second; ++it) {
|
||||
it->second->increment((int64_t)delta_io_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
void WorkloadGroupMetrics::update_remote_scan_io_bytes(uint64_t delta_io_bytes) {
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -62,14 +63,15 @@ private:
|
||||
std::unique_ptr<doris::MetricPrototype> _local_scan_bytes_metric {nullptr};
|
||||
std::unique_ptr<doris::MetricPrototype> _remote_scan_bytes_metric {nullptr};
|
||||
// NOTE: _local_scan_bytes_metric is sum of all disk's IO
|
||||
// _local_disk_io_metric is every disk's IO
|
||||
std::map<std::string, std::unique_ptr<doris::MetricPrototype>> _local_scan_bytes_metric_map;
|
||||
std::unordered_multimap<std::string, std::unique_ptr<doris::MetricPrototype>>
|
||||
_local_scan_bytes_metric_map;
|
||||
|
||||
IntAtomicCounter* _cpu_time_counter {nullptr}; // used for metric
|
||||
IntAtomicCounter* _mem_used_bytes_counter {nullptr}; // used for metric
|
||||
IntAtomicCounter* _local_scan_bytes_counter {nullptr}; // used for metric
|
||||
IntAtomicCounter* _remote_scan_bytes_counter {nullptr}; // used for metric
|
||||
std::map<std::string, IntAtomicCounter*> _local_scan_bytes_counter_map; // used for metric
|
||||
IntAtomicCounter* _cpu_time_counter {nullptr}; // used for metric
|
||||
IntAtomicCounter* _mem_used_bytes_counter {nullptr}; // used for metric
|
||||
IntAtomicCounter* _local_scan_bytes_counter {nullptr}; // used for metric
|
||||
IntAtomicCounter* _remote_scan_bytes_counter {nullptr}; // used for metric
|
||||
std::unordered_multimap<std::string, IntAtomicCounter*>
|
||||
_local_scan_bytes_counter_map; // used for metric
|
||||
|
||||
std::atomic<uint64_t> _cpu_time_nanos {0};
|
||||
std::atomic<uint64_t> _last_cpu_time_nanos {0};
|
||||
|
||||
Reference in New Issue
Block a user