diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index aca3ea597c..7689562254 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -59,6 +59,9 @@ void WorkloadGroupListener::handle_topic_info(const std::vector& topi // 4 create and update task scheduler wg->upsert_task_scheduler(&workload_group_info, _exec_env); + // 5 upsert io throttle + wg->upsert_scan_io_throttle(&workload_group_info); + LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info=" << wg->debug_string() << ", enable_cpu_hard_limit=" << (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false") diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index 491e301e8a..4dd04d1558 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -49,6 +49,7 @@ #include "exec/schema_scanner/schema_variables_scanner.h" #include "exec/schema_scanner/schema_views_scanner.h" #include "exec/schema_scanner/schema_workload_group_privileges.h" +#include "exec/schema_scanner/schema_workload_group_resource_usage_scanner.h" #include "exec/schema_scanner/schema_workload_groups_scanner.h" #include "exec/schema_scanner/schema_workload_sched_policy_scanner.h" #include "olap/hll.h" @@ -230,6 +231,8 @@ std::unique_ptr SchemaScanner::create(TSchemaTableType::type type return SchemaFileCacheStatisticsScanner::create_unique(); case TSchemaTableType::SCH_WORKLOAD_GROUP_PRIVILEGES: return SchemaWorkloadGroupPrivilegesScanner::create_unique(); + case TSchemaTableType::SCH_WORKLOAD_GROUP_RESOURCE_USAGE: + return SchemaBackendWorkloadGroupResourceUsage::create_unique(); default: return SchemaDummyScanner::create_unique(); break; diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp index b35e84a9f9..74e95f4203 100644 --- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp 
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp @@ -27,7 +27,7 @@ namespace doris { std::vector SchemaBackendActiveTasksScanner::_s_tbls_columns = { // name, type, size - {"BE_ID", TYPE_BIGINT, sizeof(StringRef), false}, + {"BE_ID", TYPE_BIGINT, sizeof(int64_t), false}, {"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false}, {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false}, {"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false}, diff --git a/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp new file mode 100644 index 0000000000..ca339044e9 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.cpp @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exec/schema_scanner/schema_workload_group_resource_usage_scanner.h" + +#include +#include + +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "runtime/workload_group/workload_group_manager.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { +std::vector SchemaBackendWorkloadGroupResourceUsage::_s_tbls_columns = { + // name, type, size + {"BE_ID", TYPE_BIGINT, sizeof(int64_t), false}, + {"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), false}, + {"MEMORY_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false}, + {"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false}, + {"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false}, + {"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false}, +}; + +SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_WORKLOAD_GROUP_RESOURCE_USAGE) {} + +SchemaBackendWorkloadGroupResourceUsage::~SchemaBackendWorkloadGroupResourceUsage() {} + +Status SchemaBackendWorkloadGroupResourceUsage::start(RuntimeState* state) { + _block_rows_limit = state->batch_size(); + return Status::OK(); +} + +Status SchemaBackendWorkloadGroupResourceUsage::get_next_block_internal(vectorized::Block* block, + bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + + if (nullptr == block || nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + + if (_block == nullptr) { + _block = vectorized::Block::create_unique(); + + for (int i = 0; i < _s_tbls_columns.size(); ++i) { + TypeDescriptor descriptor(_s_tbls_columns[i].type); + auto data_type = + vectorized::DataTypeFactory::instance().create_data_type(descriptor, true); + _block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type, + _s_tbls_columns[i].name)); + } + + 
ExecEnv::GetInstance()->workload_group_mgr()->get_wg_resource_usage(_block.get()); + _total_rows = _block->rows(); + } + + if (_row_idx == _total_rows) { + *eos = true; + return Status::OK(); + } + + int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); + vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); + RETURN_IF_ERROR(mblock.add_rows(_block.get(), _row_idx, current_batch_rows)); + _row_idx += current_batch_rows; + + *eos = _row_idx == _total_rows; + return Status::OK(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.h b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.h new file mode 100644 index 0000000000..236dd69999 --- /dev/null +++ b/be/src/exec/schema_scanner/schema_workload_group_resource_usage_scanner.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include + +#include "common/status.h" +#include "exec/schema_scanner.h" + +namespace doris { +class RuntimeState; +namespace vectorized { +class Block; +} // namespace vectorized + +class SchemaBackendWorkloadGroupResourceUsage : public SchemaScanner { + ENABLE_FACTORY_CREATOR(SchemaBackendWorkloadGroupResourceUsage); + +public: + SchemaBackendWorkloadGroupResourceUsage(); + ~SchemaBackendWorkloadGroupResourceUsage() override; + + Status start(RuntimeState* state) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; + + static std::vector _s_tbls_columns; + +private: + int _block_rows_limit = 4096; + int _row_idx = 0; + int _total_rows = 0; + std::unique_ptr _block = nullptr; +}; +}; // namespace doris \ No newline at end of file diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp index 8b0c6be536..dd81a3ecb2 100644 --- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp @@ -42,6 +42,8 @@ std::vector SchemaWorkloadGroupsScanner::_s_tbls_colu {"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true}, {"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true}, {"TAG", TYPE_VARCHAR, sizeof(StringRef), true}, + {"READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true}, + {"REMOTE_READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true}, }; SchemaWorkloadGroupsScanner::SchemaWorkloadGroupsScanner() diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index ca549e51c5..2d9b9e962e 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -28,6 +28,8 @@ #include "common/config.h" #include "common/status.h" #include "runtime/exec_env.h" +#include "runtime/thread_context.h" +#include "runtime/workload_management/io_throttle.h" #include "util/runtime_profile.h" 
#include "util/threadpool.h" @@ -585,15 +587,19 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len, if (UNLIKELY(0 == _len || _offset + _len < off)) { return Status::OK(); } - // [0]: maximum len trying to read, [1] maximum length buffer can provide, [2] actual len buffer has - size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off}); + { - SCOPED_RAW_TIMER(&_statis.copy_time); - memcpy((void*)out, _buf.get() + (off - _offset), read_len); + LIMIT_REMOTE_SCAN_IO(bytes_read); + // [0]: maximum len trying to read, [1] maximum length buffer can provide, [2] actual len buffer has + size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off}); + { + SCOPED_RAW_TIMER(&_statis.copy_time); + memcpy((void*)out, _buf.get() + (off - _offset), read_len); + } + *bytes_read = read_len; + _statis.request_io += 1; + _statis.request_bytes += read_len; } - *bytes_read = read_len; - _statis.request_io += 1; - _statis.request_bytes += read_len; if (off + *bytes_read == _offset + _len) { reset_offset(_offset + _whole_buffer_size); } diff --git a/be/src/io/fs/file_reader.cpp b/be/src/io/fs/file_reader.cpp index 23ab3c44b5..e4ffda2a9c 100644 --- a/be/src/io/fs/file_reader.cpp +++ b/be/src/io/fs/file_reader.cpp @@ -26,6 +26,8 @@ namespace doris { namespace io { +const std::string FileReader::VIRTUAL_REMOTE_DATA_DIR = "virtual_remote_data_dir"; + Status FileReader::read_at(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) { DCHECK(bthread_self() == 0); diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h index 70e49151f5..03828ef28d 100644 --- a/be/src/io/fs/file_reader.h +++ b/be/src/io/fs/file_reader.h @@ -68,6 +68,8 @@ public: FileReader() = default; virtual ~FileReader() = default; + static const std::string VIRTUAL_REMOTE_DATA_DIR; + DISALLOW_COPY_AND_ASSIGN(FileReader); /// If io_ctx is not null, @@ -85,6 +87,8 @@ public: virtual std::shared_ptr fs() const = 0; + 
virtual const std::string& get_data_dir_path() { return VIRTUAL_REMOTE_DATA_DIR; } + protected: virtual Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) = 0; diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 23d56d8f2a..263276768b 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -28,6 +28,8 @@ #include "common/logging.h" #include "io/fs/err_utils.h" // #include "io/fs/hdfs_file_system.h" +#include "runtime/thread_context.h" +#include "runtime/workload_management/io_throttle.h" #include "service/backend_options.h" #include "util/doris_metrics.h" #include "util/hdfs_util.h" @@ -97,6 +99,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r return Status::OK(); } + LIMIT_REMOTE_SCAN_IO(bytes_read); size_t has_read = 0; while (has_read < bytes_req) { tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + has_read, @@ -152,6 +155,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r return Status::OK(); } + LIMIT_REMOTE_SCAN_IO(bytes_read); size_t has_read = 0; while (has_read < bytes_req) { int64_t loop_read = diff --git a/be/src/io/fs/local_file_reader.cpp b/be/src/io/fs/local_file_reader.cpp index 0760dbe9fb..93953eeddd 100644 --- a/be/src/io/fs/local_file_reader.cpp +++ b/be/src/io/fs/local_file_reader.cpp @@ -33,6 +33,10 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/sync_point.h" #include "io/fs/err_utils.h" +#include "olap/olap_common.h" +#include "olap/options.h" +#include "runtime/thread_context.h" +#include "runtime/workload_management/io_throttle.h" #include "util/async_io.h" #include "util/doris_metrics.h" @@ -40,9 +44,57 @@ namespace doris { namespace io { struct IOContext; +std::vector BeConfDataDirReader::be_config_data_dir_list; + +void BeConfDataDirReader::get_data_dir_by_file_path(io::Path* file_path, + std::string* 
data_dir_arg) { + for (const auto& data_dir_info : be_config_data_dir_list) { + if (data_dir_info.path.size() >= file_path->string().size()) { + continue; + } + if (file_path->string().compare(0, data_dir_info.path.size(), data_dir_info.path) == 0) { + *data_dir_arg = data_dir_info.path; + } + } +} + +void BeConfDataDirReader::init_be_conf_data_dir( + const std::vector& store_paths, + const std::vector& spill_store_paths, + const std::vector& cache_paths) { + for (int i = 0; i < store_paths.size(); i++) { + DataDirInfo data_dir_info; + data_dir_info.path = store_paths[i].path; + data_dir_info.storage_medium = store_paths[i].storage_medium; + data_dir_info.data_dir_type = DataDirType::OLAP_DATA_DIR; + data_dir_info.bvar_name = "local_data_dir_" + std::to_string(i); + be_config_data_dir_list.push_back(data_dir_info); + } + + for (int i = 0; i < spill_store_paths.size(); i++) { + doris::DataDirInfo data_dir_info; + data_dir_info.path = spill_store_paths[i].path; + data_dir_info.storage_medium = spill_store_paths[i].storage_medium; + data_dir_info.data_dir_type = doris::DataDirType::SPILL_DISK_DIR; + data_dir_info.bvar_name = "spill_data_dir_" + std::to_string(i); + be_config_data_dir_list.push_back(data_dir_info); + } + + for (int i = 0; i < cache_paths.size(); i++) { + doris::DataDirInfo data_dir_info; + data_dir_info.path = cache_paths[i].path; + data_dir_info.storage_medium = TStorageMedium::REMOTE_CACHE; + data_dir_info.data_dir_type = doris::DataDirType::DATA_CACHE_DIR; + data_dir_info.bvar_name = "local_cache_dir_" + std::to_string(i); + be_config_data_dir_list.push_back(data_dir_info); + } +} + LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd, std::shared_ptr fs) : _fd(fd), _path(std::move(path)), _file_size(file_size), _fs(std::move(fs)) { + _data_dir_path = ""; + BeConfDataDirReader::get_data_dir_by_file_path(&_path, &_data_dir_path); DorisMetrics::instance()->local_file_open_reading->increment(1); 
DorisMetrics::instance()->local_file_reader_total->increment(1); } @@ -78,6 +130,8 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_ bytes_req = std::min(bytes_req, _file_size - offset); *bytes_read = 0; + LIMIT_LOCAL_SCAN_IO(get_data_dir_path(), bytes_read); + while (bytes_req != 0) { auto res = SYNC_POINT_HOOK_RETURN_VALUE(::pread(_fd, to, bytes_req, offset), "LocalFileReader::pread", _fd, to); diff --git a/be/src/io/fs/local_file_reader.h b/be/src/io/fs/local_file_reader.h index 0ada5f230e..67596c276a 100644 --- a/be/src/io/fs/local_file_reader.h +++ b/be/src/io/fs/local_file_reader.h @@ -30,7 +30,23 @@ #include "util/slice.h" namespace doris { + +struct StorePath; +struct DataDirInfo; +struct CachePath; + namespace io { + +struct BeConfDataDirReader { + static std::vector be_config_data_dir_list; + + static void get_data_dir_by_file_path(Path* file_path, std::string* data_dir_arg); + + static void init_be_conf_data_dir(const std::vector& store_paths, + const std::vector& spill_store_paths, + const std::vector& cache_paths); +}; + struct IOContext; class LocalFileReader final : public FileReader { @@ -49,6 +65,8 @@ public: FileSystemSPtr fs() const override { return _fs; } + const std::string& get_data_dir_path() override { return _data_dir_path; } + private: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; @@ -59,6 +77,7 @@ private: size_t _file_size; std::atomic _closed = false; std::shared_ptr _fs; + std::string _data_dir_path; // be conf's data dir path }; } // namespace io diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp index 48955f2eb6..005257c131 100644 --- a/be/src/io/fs/s3_file_reader.cpp +++ b/be/src/io/fs/s3_file_reader.cpp @@ -33,6 +33,8 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "io/fs/err_utils.h" #include "io/fs/s3_common.h" +#include "runtime/thread_context.h" +#include 
"runtime/workload_management/io_throttle.h" #include "util/bvar_helper.h" #include "util/doris_metrics.h" #include "util/runtime_profile.h" @@ -115,6 +117,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds const int max_retries = config::max_s3_client_retry; // wait 1s, 2s, 4s, 8s for each backoff + LIMIT_REMOTE_SCAN_IO(bytes_read); + int total_sleep_time = 0; while (retry_count <= max_retries) { s3_file_reader_read_counter << 1; diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 7e0cf6645d..b6e336722f 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -54,6 +54,12 @@ using TabletUid = UniqueId; enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2, FULL_COMPACTION = 3 }; +enum DataDirType { + SPILL_DISK_DIR, + OLAP_DATA_DIR, + DATA_CACHE_DIR, +}; + struct DataDirInfo { std::string path; size_t path_hash = 0; @@ -64,6 +70,8 @@ struct DataDirInfo { int64_t trash_used_capacity = 0; bool is_used = false; // whether available mark TStorageMedium::type storage_medium = TStorageMedium::HDD; // Storage medium type: SSD|HDD + DataDirType data_dir_type = DataDirType::OLAP_DATA_DIR; + std::string bvar_name; }; struct PredicateFilterInfo { int type = 0; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 96199f434d..ce1ac137e1 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -226,6 +226,7 @@ Status PipelineTask::execute(bool* eos) { if (cpu_qs) { cpu_qs->add_cpu_nanos(delta_cpu_time); } + query_context()->update_wg_cpu_adder(delta_cpu_time); }}; // The status must be runnable *eos = false; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 6ddb87eaf6..c404c73a07 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -203,7 +203,7 @@ public: ThreadPool* 
lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); } Status init_pipeline_task_scheduler(); - void init_file_cache_factory(); + void init_file_cache_factory(std::vector& cache_paths); io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; } UserFunctionCache* user_function_cache() { return _user_function_cache; } FragmentMgr* fragment_mgr() { return _fragment_mgr; } diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index fa2cef6e31..30098c9b61 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -36,6 +36,7 @@ #include "common/status.h" #include "io/cache/block/block_file_cache_factory.h" #include "io/fs/file_meta_cache.h" +#include "io/fs/local_file_reader.h" #include "olap/memtable_memory_limiter.h" #include "olap/olap_define.h" #include "olap/options.h" @@ -212,7 +213,10 @@ Status ExecEnv::_init(const std::vector& store_paths, // so it should be created before all query begin and deleted after all query and daemon thread stoppped _runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr(); - init_file_cache_factory(); + std::vector cache_paths; + init_file_cache_factory(cache_paths); + doris::io::BeConfDataDirReader::init_be_conf_data_dir(store_paths, spill_store_paths, + cache_paths); _pipeline_tracer_ctx = std::make_unique(); // before query RETURN_IF_ERROR(init_pipeline_task_scheduler()); _workload_group_manager = new WorkloadGroupMgr(); @@ -321,13 +325,12 @@ Status ExecEnv::init_pipeline_task_scheduler() { return Status::OK(); } -void ExecEnv::init_file_cache_factory() { +void ExecEnv::init_file_cache_factory(std::vector& cache_paths) { // Load file cache before starting up daemon threads to make sure StorageEngine is read. 
if (doris::config::enable_file_cache) { _file_cache_factory = new io::FileCacheFactory(); io::IFileCache::init(); std::unordered_set cache_path_set; - std::vector cache_paths; Status olap_res = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths); if (!olap_res) { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 3d0b2289ba..8c892bf9f7 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -308,6 +308,12 @@ public: // only for file scan node std::map file_scan_range_params_map; + void update_wg_cpu_adder(int64_t delta_cpu_time) { + if (_workload_group != nullptr) { + _workload_group->update_cpu_adder(delta_cpu_time); + } + } + private: TUniqueId _query_id; ExecEnv* _exec_env = nullptr; diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 40b3985dec..84d0ccfe24 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -111,6 +111,40 @@ __VA_ARGS__; \ } while (0) +#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \ + std::shared_ptr iot = nullptr; \ + auto* t_ctx = doris::thread_context(true); \ + if (t_ctx) { \ + iot = t_ctx->get_local_scan_io_throttle(data_dir); \ + } \ + if (iot) { \ + iot->acquire(-1); \ + } \ + Defer defer { \ + [&]() { \ + if (iot) { \ + iot->update_next_io_time(*bytes_read); \ + t_ctx->update_total_local_scan_io_adder(*bytes_read); \ + } \ + } \ + } + +#define LIMIT_REMOTE_SCAN_IO(bytes_read) \ + std::shared_ptr iot = nullptr; \ + if (auto* t_ctx = doris::thread_context(true)) { \ + iot = t_ctx->get_remote_scan_io_throttle(); \ + } \ + if (iot) { \ + iot->acquire(-1); \ + } \ + Defer defer { \ + [&]() { \ + if (iot) { \ + iot->update_next_io_time(*bytes_read); \ + } \ + } \ + } + namespace doris { class ThreadContext; @@ -227,6 +261,26 @@ public: std::weak_ptr workload_group() { return _wg_wptr; } + std::shared_ptr get_local_scan_io_throttle(const std::string& data_dir) { + if (std::shared_ptr wg_ptr = 
_wg_wptr.lock()) { + return wg_ptr->get_local_scan_io_throttle(data_dir); + } + return nullptr; + } + + std::shared_ptr get_remote_scan_io_throttle() { + if (std::shared_ptr wg_ptr = _wg_wptr.lock()) { + return wg_ptr->get_remote_scan_io_throttle(); + } + return nullptr; + } + + void update_total_local_scan_io_adder(size_t bytes_read) { + if (std::shared_ptr wg_ptr = _wg_wptr.lock()) { + wg_ptr->update_total_local_scan_io_adder(bytes_read); + } + } + int thread_local_handle_count = 0; int skip_memory_check = 0; int skip_large_memory_check = 0; @@ -292,7 +346,7 @@ public: }; // must call create_thread_local_if_not_exits() before use thread_context(). -static ThreadContext* thread_context() { +static ThreadContext* thread_context(bool allow_return_null = false) { if (pthread_context_ptr_init) { // in pthread DCHECK(bthread_self() == 0); @@ -306,6 +360,9 @@ static ThreadContext* thread_context() { DCHECK(bthread_context != nullptr); return bthread_context; } + if (allow_return_null) { + return nullptr; + } // It means that use thread_context() but this thread not attached a query/load using SCOPED_ATTACH_TASK macro. 
LOG(FATAL) << "__builtin_unreachable, " << doris::memory_orphan_check_msg; __builtin_unreachable(); diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 0a34ada5c7..57a855c9bf 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -27,12 +27,14 @@ #include #include "common/logging.h" +#include "io/fs/local_file_reader.h" #include "olap/storage_engine.h" #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/exec_env.h" #include "runtime/memory/global_memory_arbitrator.h" #include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/workload_management/io_throttle.h" #include "util/mem_info.h" #include "util/parse_util.h" #include "util/runtime_profile.h" @@ -62,7 +64,24 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info) _max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num), _min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num), _spill_low_watermark(tg_info.spill_low_watermark), - _spill_high_watermark(tg_info.spill_high_watermark) {} + _spill_high_watermark(tg_info.spill_high_watermark), + _scan_bytes_per_second(tg_info.read_bytes_per_second), + _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) { + std::vector& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list; + for (const auto& data_dir : data_dir_list) { + _scan_io_throttle_map[data_dir.path] = + std::make_shared(_name, data_dir.bvar_name + "_read_bytes"); + } + _remote_scan_io_throttle = std::make_shared(_name, "remote_read_bytes"); + _mem_used_status = std::make_unique>(_name, "memory_used", 0); + _cpu_usage_adder = std::make_unique>(_name, "cpu_usage_adder"); + _cpu_usage_per_second = std::make_unique>>( + _name, "cpu_usage", _cpu_usage_adder.get(), 10); + _total_local_scan_io_adder = + std::make_unique>(_name, "total_local_read_bytes"); + _total_local_scan_io_per_second = 
std::make_unique>>( + _name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1); +} std::string WorkloadGroup::debug_string() const { std::shared_lock rl {_mutex}; @@ -70,11 +89,13 @@ std::string WorkloadGroup::debug_string() const { "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = " "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = " "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, " - "spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, query_num={}]", + "spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, query_num={}, " + "read_bytes_per_second={}, remote_read_bytes_per_second={}]", _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES), _enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(), _scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num, - _spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size()); + _spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size(), + _scan_bytes_per_second, _remote_scan_bytes_per_second); } void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { @@ -101,6 +122,8 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { _min_remote_scan_thread_num = tg_info.min_remote_scan_thread_num; _spill_low_watermark = tg_info.spill_low_watermark; _spill_high_watermark = tg_info.spill_high_watermark; + _scan_bytes_per_second = tg_info.read_bytes_per_second; + _remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second; } else { return; } @@ -122,6 +145,7 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots( } } refresh_memory(used_memory); + _mem_used_status->set_value(used_memory); return used_memory; } @@ -368,6 +392,20 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g } workload_group_info->spill_high_watermark = spill_high_watermark; + // 14 
scan io + int read_bytes_per_second = -1; + if (tworkload_group_info.__isset.read_bytes_per_second) { + read_bytes_per_second = tworkload_group_info.read_bytes_per_second; + } + workload_group_info->read_bytes_per_second = read_bytes_per_second; + + // 15 remote scan io + int remote_read_bytes_per_second = -1; + if (tworkload_group_info.__isset.remote_read_bytes_per_second) { + remote_read_bytes_per_second = tworkload_group_info.remote_read_bytes_per_second; + } + workload_group_info->remote_read_bytes_per_second = remote_read_bytes_per_second; + return Status::OK(); } @@ -537,6 +575,38 @@ std::string WorkloadGroup::thread_debug_info() { return str; } +void WorkloadGroup::upsert_scan_io_throttle(WorkloadGroupInfo* tg_info) { + for (const auto& [key, io_throttle] : _scan_io_throttle_map) { + io_throttle->set_io_bytes_per_second(tg_info->read_bytes_per_second); + } + + _remote_scan_io_throttle->set_io_bytes_per_second(tg_info->remote_read_bytes_per_second); +} + +std::shared_ptr WorkloadGroup::get_local_scan_io_throttle(const std::string& disk_dir) { + auto find_ret = _scan_io_throttle_map.find(disk_dir); + if (find_ret != _scan_io_throttle_map.end()) { + return find_ret->second; + } + return nullptr; +} + +std::shared_ptr WorkloadGroup::get_remote_scan_io_throttle() { + return _remote_scan_io_throttle; +} + +void WorkloadGroup::update_cpu_adder(int64_t delta_cpu_time) { + (*_cpu_usage_adder) << (uint64_t)delta_cpu_time; +} + +void WorkloadGroup::update_total_local_scan_io_adder(size_t scan_bytes) { + (*_total_local_scan_io_adder) << scan_bytes; +} + +int64_t WorkloadGroup::get_remote_scan_bytes_per_second() { + return _remote_scan_io_throttle->get_bvar_io_per_second(); +} + void WorkloadGroup::try_stop_schedulers() { std::shared_lock rlock(_task_sched_lock); if (_task_sched) { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index a53b7ac657..9ad3d6e62a 100644 --- 
a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -40,6 +41,7 @@ class ThreadPool; class ExecEnv; class CgroupCpuCtl; class QueryContext; +class IOThrottle; namespace vectorized { class SimplifiedScanScheduler; @@ -190,6 +192,23 @@ public: std::string thread_debug_info(); + std::shared_ptr get_local_scan_io_throttle(const std::string& disk_dir); + + std::shared_ptr get_remote_scan_io_throttle(); + + void upsert_scan_io_throttle(WorkloadGroupInfo* tg_info); + + void update_cpu_adder(int64_t delta_cpu_time); + + void update_total_local_scan_io_adder(size_t scan_bytes); + + int64_t get_mem_used() { return _mem_used_status->get_value(); } + uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); } + int64_t get_local_scan_bytes_per_second() { + return _total_local_scan_io_per_second->get_value(); + } + int64_t get_remote_scan_bytes_per_second(); + private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; @@ -210,6 +229,8 @@ private: std::atomic _min_remote_scan_thread_num; std::atomic _spill_low_watermark; std::atomic _spill_high_watermark; + std::atomic _scan_bytes_per_second {-1}; + std::atomic _remote_scan_bytes_per_second {-1}; // means workload group is mark dropped // new query can not submit @@ -223,6 +244,16 @@ private: std::unique_ptr _scan_task_sched {nullptr}; std::unique_ptr _remote_scan_task_sched {nullptr}; std::unique_ptr _memtable_flush_pool {nullptr}; + + std::map> _scan_io_throttle_map; + std::shared_ptr _remote_scan_io_throttle {nullptr}; + + // bvar metric + std::unique_ptr> _mem_used_status; + std::unique_ptr> _cpu_usage_adder; + std::unique_ptr>> _cpu_usage_per_second; + std::unique_ptr> _total_local_scan_io_adder; + std::unique_ptr>> _total_local_scan_io_per_second; }; using WorkloadGroupPtr = std::shared_ptr; @@ -241,6 +272,8 @@ struct WorkloadGroupInfo { 
int min_remote_scan_thread_num; int spill_low_watermark; int spill_high_watermark; + int64_t read_bytes_per_second; // bytes/s; 64-bit because the FE parses and sends this limit as a long + int64_t remote_read_bytes_per_second; // bytes/s; 64-bit for the same reason // log cgroup cpu info uint64_t cgroup_cpu_shares = 0; int cgroup_cpu_hard_limit = 0; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index e9221f67db..dc121895ae 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -27,6 +27,7 @@ #include "util/mem_info.h" #include "util/threadpool.h" #include "util/time.h" +#include "vec/core/block.h" #include "vec/exec/scan/scanner_scheduler.h" namespace doris { @@ -241,6 +242,52 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_ratio() { } } +void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { + auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = + reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast*>(col_ptr)->insert_value( + int_val); + nullable_column->get_null_map_data().emplace_back(0); + }; + + auto insert_double_value = [&](int col_index, double double_val, vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = + reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast*>(col_ptr)->insert_value( + double_val); + nullable_column->get_null_map_data().emplace_back(0); + }; + + int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; + int cpu_num = CpuInfo::num_cores(); + cpu_num = cpu_num <= 0 ?
1 : cpu_num; + uint64_t total_cpu_time_ns_per_second = cpu_num * 1000000000ll; + + std::shared_lock r_lock(_group_mutex); + block->reserve(_workload_groups.size()); + for (const auto& [id, wg] : _workload_groups) { + insert_int_value(0, be_id, block); + insert_int_value(1, wg->id(), block); + insert_int_value(2, wg->get_mem_used(), block); + + double cpu_usage_p = + (double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100; + cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0; + + insert_double_value(3, cpu_usage_p, block); + + insert_int_value(4, wg->get_local_scan_bytes_per_second(), block); + insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block); + } +} + void WorkloadGroupMgr::stop() { for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { iter->second->try_stop_schedulers(); diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 37539ada8d..4c4e82409d 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -27,6 +27,10 @@ namespace doris { class CgroupCpuCtl; +namespace vectorized { +class Block; +} // namespace vectorized + namespace pipeline { class TaskScheduler; class MultiCoreTaskQueue; @@ -56,6 +60,8 @@ public: void refresh_wg_weighted_memory_ratio(); + void get_wg_resource_usage(vectorized::Block* block); + private: std::shared_mutex _group_mutex; std::unordered_map _workload_groups; diff --git a/be/src/runtime/workload_management/io_throttle.cpp b/be/src/runtime/workload_management/io_throttle.cpp new file mode 100644 index 0000000000..dacfa29012 --- /dev/null +++ b/be/src/runtime/workload_management/io_throttle.cpp @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/workload_management/io_throttle.h" + +#include "util/defer_op.h" +#include "util/time.h" + +namespace doris { + +IOThrottle::IOThrottle(std::string prefix, std::string name) { + _io_adder = std::make_unique>(prefix, name); + _io_adder_per_second = std::make_unique>>( + prefix, name + "_per_second", _io_adder.get(), 1); +} + +bool IOThrottle::acquire(int64_t block_timeout_ms) { + if (_io_bytes_per_second_limit < 0) { + return true; + } + + std::unique_lock w_lock(_mutex); + int64_t current_time = GetCurrentTimeMicros(); + int64_t block_finish_time = block_timeout_ms <= 0 ? 
0 : current_time + block_timeout_ms * 1000; + + while (current_time <= _next_io_time_micros) { + if (block_finish_time > 0 && current_time >= block_finish_time) { + return false; // deadline reached while the next IO slot is still in the future + } + int64_t wake_time = (block_finish_time > 0 && block_finish_time < _next_io_time_micros) ? block_finish_time : _next_io_time_micros; // never sleep past the caller's deadline (0 means no deadline) + wait_condition.wait_for(w_lock, std::chrono::microseconds(wake_time - current_time)); + current_time = GetCurrentTimeMicros(); + } + return true; +} + +bool IOThrottle::try_acquire() { + if (_io_bytes_per_second_limit < 0) { + return true; + } + std::unique_lock w_lock(_mutex); + return GetCurrentTimeMicros() > _next_io_time_micros; +} + +void IOThrottle::update_next_io_time(int64_t io_bytes) { + Defer defer {[&]() { + if (io_bytes > 0) { + (*_io_adder) << io_bytes; + } + }}; + if (_io_bytes_per_second_limit <= 0 || io_bytes <= 0) { + return; + } + int64_t read_bytes_per_second = _io_bytes_per_second_limit; + { + std::unique_lock w_lock(_mutex); + double io_bytes_float = static_cast(io_bytes); + double ret = (io_bytes_float / static_cast(read_bytes_per_second)) * + static_cast(MICROS_PER_SEC); + int64_t current_time = GetCurrentTimeMicros(); + + if (current_time > _next_io_time_micros) { + _next_io_time_micros = current_time; + } + _next_io_time_micros += ret < 1 ? static_cast(1) : static_cast(ret); + } +} + +void IOThrottle::set_io_bytes_per_second(int64_t io_bytes_per_second) { + _io_bytes_per_second_limit = io_bytes_per_second; +} + +}; // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/io_throttle.h b/be/src/runtime/workload_management/io_throttle.h new file mode 100644 index 0000000000..ce62c65d7a --- /dev/null +++ b/be/src/runtime/workload_management/io_throttle.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include +#include +#include + +namespace doris { + +class IOThrottle { +public: + IOThrottle(std::string prefix, std::string name); + + ~IOThrottle() = default; + + bool acquire(int64_t block_timeout_ms); // waits for the next IO slot; block_timeout_ms <= 0 means no deadline + + // non-block acquire + bool try_acquire(); + + void update_next_io_time(int64_t bytes); // records bytes and delays the next IO slot by bytes / limit seconds + + void set_io_bytes_per_second(int64_t read_bytes_per_second); // a negative limit disables throttling + + size_t get_bvar_io_per_second() { return _io_adder_per_second->get_value(); } + +private: + std::mutex _mutex; + std::condition_variable wait_condition; + int64_t _next_io_time_micros {0}; + std::atomic _io_bytes_per_second_limit {-1}; // unlimited until set_io_bytes_per_second() is called + + // bvar monitor + std::unique_ptr> _io_adder; + std::unique_ptr>> _io_adder_per_second; +}; +}; // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index d8ae052e18..6291505a0d 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -290,4 +290,13 @@ void VScanner::_collect_profile_before_close() { _state->update_num_rows_load_unselected(_counter.num_rows_unselected); } +void VScanner::update_scan_cpu_timer() { + int64_t cpu_time = _cpu_watch.elapsed_time(); + _scan_cpu_timer += cpu_time; + _query_statistics->add_cpu_nanos(cpu_time); + if (_state && _state->get_query_ctx()) { + _state->get_query_ctx()->update_wg_cpu_adder(cpu_time); +
} +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 6e83c05970..b42ec8d99d 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -132,11 +132,7 @@ public: int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; } - void update_scan_cpu_timer() { - int64_t cpu_time = _cpu_watch.elapsed_time(); - _scan_cpu_timer += cpu_time; - _query_statistics->add_cpu_nanos(cpu_time); - } + void update_scan_cpu_timer(); RuntimeState* runtime_state() { return _state; } diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 2982bf8174..c83c66f241 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -115,6 +115,13 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi if (_writer_status.ok()) { while (true) { + ThreadCpuStopWatch cpu_time_stop_watch; + cpu_time_stop_watch.start(); + Defer defer {[&]() { + if (state && state->get_query_ctx()) { + state->get_query_ctx()->update_wg_cpu_adder(cpu_time_stop_watch.elapsed_time()); + } + }}; if (!_eos && _data_queue.empty() && _writer_status.ok()) { std::unique_lock l(_m); while (!_eos && _data_queue.empty() && _writer_status.ok()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java index ee45a9fb7c..e2f618c817 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java @@ -81,7 +81,10 @@ public enum SchemaTableType { SCH_FILE_CACHE_STATISTICS("FILE_CACHE_STATISTICS", "FILE_CACHE_STATISTICS", TSchemaTableType.SCH_FILE_CACHE_STATISTICS), SCH_WORKLOAD_GROUP_PRIVILEGES("WORKLOAD_GROUP_PRIVILEGES", - "WORKLOAD_GROUP_PRIVILEGES", 
TSchemaTableType.SCH_WORKLOAD_GROUP_PRIVILEGES); + "WORKLOAD_GROUP_PRIVILEGES", TSchemaTableType.SCH_WORKLOAD_GROUP_PRIVILEGES), + + SCH_WORKLOAD_GROUP_RESOURCE_USAGE("WORKLOAD_GROUP_RESOURCE_USAGE", + "WORKLOAD_GROUP_RESOURCE_USAGE", TSchemaTableType.SCH_WORKLOAD_GROUP_RESOURCE_USAGE); private static final String dbName = "INFORMATION_SCHEMA"; private static SelectList fullSelectLists; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index d0c9c58727..6162304a5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -488,6 +488,8 @@ public class SchemaTable extends Table { .column("SPILL_THRESHOLD_LOW_WATERMARK", ScalarType.createVarchar(256)) .column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256)) .column("TAG", ScalarType.createVarchar(256)) + .column("READ_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT)) + .column("REMOTE_READ_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT)) .build())) .put("processlist", new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA, @@ -532,6 +534,16 @@ public class SchemaTable extends Table { .column("IS_GRANTABLE", ScalarType.createVarchar(IS_GRANTABLE_LEN)) .build()) ) + .put("workload_group_resource_usage", + new SchemaTable(SystemIdGenerator.getNextId(), "workload_group_resource_usage", TableType.SCHEMA, + builder().column("BE_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("WORKLOAD_GROUP_ID", ScalarType.createType(PrimitiveType.BIGINT)) + .column("MEMORY_USAGE_BYTES", ScalarType.createType(PrimitiveType.BIGINT)) + .column("CPU_USAGE_PERCENT", ScalarType.createType(PrimitiveType.DOUBLE)) + .column("LOCAL_SCAN_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT)) + .column("REMOTE_SCAN_BYTES_PER_SECOND", 
ScalarType.createType(PrimitiveType.BIGINT)) + .build()) + ) .build(); private boolean fetchAllFe = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java index e05fadd81d..cbbb2f6756 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java @@ -65,6 +65,7 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode { BEACKEND_ID_COLUMN_SET.add("backend_id"); BACKEND_TABLE.add("backend_active_tasks"); + BACKEND_TABLE.add("workload_group_resource_usage"); BEACKEND_ID_COLUMN_SET.add("be_id"); BACKEND_TABLE.add("file_cache_statistics"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 2c200e425b..cab8045e98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -75,6 +75,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { public static final String TAG = "tag"; + public static final String READ_BYTES_PER_SECOND = "read_bytes_per_second"; + + public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second"; + // NOTE(wb): all property is not required, some properties default value is set in be // default value is as followed // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true @@ -83,7 +87,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM) .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM) 
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK) - .add(TAG).build(); + .add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build(); public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50; public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80; @@ -394,6 +398,35 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } } + if (properties.containsKey(READ_BYTES_PER_SECOND)) { + String readBytesVal = properties.get(READ_BYTES_PER_SECOND); + try { + long longVal = Long.parseLong(readBytesVal); + boolean isValidValue = longVal == -1 || longVal > 0; + if (!isValidValue) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new DdlException( + READ_BYTES_PER_SECOND + " should be -1 or an integer value bigger than 0, but input value is " + + readBytesVal); + } + } + + if (properties.containsKey(REMOTE_READ_BYTES_PER_SECOND)) { + String readBytesVal = properties.get(REMOTE_READ_BYTES_PER_SECOND); + try { + long longVal = Long.parseLong(readBytesVal); + boolean isValidValue = longVal == -1 || longVal > 0; + if (!isValidValue) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new DdlException(REMOTE_READ_BYTES_PER_SECOND + + " should be -1 or an integer value bigger than 0, but input value is " + readBytesVal); + } + } + } public long getId() { @@ -484,6 +517,13 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } else { row.add(val); } + } else if (READ_BYTES_PER_SECOND.equals(key) || REMOTE_READ_BYTES_PER_SECOND.equals(key)) { + String val = properties.get(key); + if (StringUtils.isEmpty(val)) { + row.add("-1"); + } else { + row.add(val); + } } else { row.add(properties.get(key)); } @@ -571,6 +611,16 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { tWorkloadGroupInfo.setTag(tagStr); } + String readBytesPerSecStr = properties.get(READ_BYTES_PER_SECOND); + if (readBytesPerSecStr != 
null) { + tWorkloadGroupInfo.setReadBytesPerSecond(Long.valueOf(readBytesPerSecStr)); + } + + String remoteReadBytesPerSecStr = properties.get(REMOTE_READ_BYTES_PER_SECOND); + if (remoteReadBytesPerSecStr != null) { + tWorkloadGroupInfo.setRemoteReadBytesPerSecond(Long.valueOf(remoteReadBytesPerSecStr)); + } + TopicInfo topicInfo = new TopicInfo(); topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo); return topicInfo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index b54114c6bf..272f045f41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -80,6 +80,7 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost .add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM) .add(WorkloadGroup.SPILL_THRESHOLD_LOW_WATERMARK).add(WorkloadGroup.SPILL_THRESHOLD_HIGH_WATERMARK) .add(WorkloadGroup.TAG) + .add(WorkloadGroup.READ_BYTES_PER_SECOND).add(WorkloadGroup.REMOTE_READ_BYTES_PER_SECOND) .add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM) .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 9fb37242d8..db50b15536 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -491,8 +491,11 @@ public class MetadataGenerator { trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(12))); // spill low watermark trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(13))); // spill high watermark trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(14))); // tag - trow.addToColumnValue(new 
TCell().setStringVal(rGroupsInfo.get(15))); // running query num - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(16))); // waiting query num + trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(15)))); // read bytes per second + trow.addToColumnValue( + new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(16)))); // remote read bytes per second + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(17))); // running query num + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(18))); // waiting query num dataBatch.add(trow); } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index b6a6e20450..4d2435029e 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -133,7 +133,8 @@ enum TSchemaTableType { SCH_PROCS_PRIV, SCH_WORKLOAD_POLICY, SCH_FILE_CACHE_STATISTICS, - SCH_WORKLOAD_GROUP_PRIVILEGES; + SCH_WORKLOAD_GROUP_PRIVILEGES, + SCH_WORKLOAD_GROUP_RESOURCE_USAGE; } enum THdfsCompression { diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out b/regression-test/data/workload_manager_p0/test_curd_wlg.out index e921746dd9..5d9629aa44 100644 --- a/regression-test/data/workload_manager_p0/test_curd_wlg.out +++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out @@ -6,8 +6,8 @@ 2 -- !show_1 -- -normal 20 50% true 2147483647 0 0 1% 16 -test_group 10 10% true 2147483647 0 0 -1 -1 +normal 20 50% true 2147483647 0 0 1% 16 -1 -1 +test_group 10 10% true 2147483647 0 0 -1 -1 -1 -1 -- !show_del_wg_1 -- normal 20 50% true 2147483647 0 0 1% 16 @@ -46,8 +46,8 @@ test_group 10 11% false 2147483647 0 0 20% -1 2 -- !show_queue -- -normal 20 50% true 2147483647 0 0 1% 16 -test_group 10 11% false 100 0 0 20% -1 +normal 20 50% true 2147483647 0 0 1% 16 -1 -1 +test_group 10 11% false 100 0 0 20% -1 104857600 104857600 -- !select_tvf_1 -- normal 20 50% true 2147483647 0 0 1% 16 diff --git 
a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index ec34b489e0..b272c67c85 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -85,6 +85,8 @@ suite("test_crud_wlg") { sql "alter workload group normal properties ( 'queue_timeout'='0' );" sql "alter workload group normal properties ( 'cpu_hard_limit'='1%' );" sql "alter workload group normal properties ( 'scan_thread_num'='-1' );" + sql "alter workload group normal properties ( 'read_bytes_per_second'='-1' );" // -1 is the accepted "no limit" value for local scan IO + sql "alter workload group normal properties ( 'remote_read_bytes_per_second'='-1' );" sql "set workload_group=normal;" @@ -119,7 +121,7 @@ suite("test_crud_wlg") { ");" sql "set workload_group=test_group;" - qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag from information_schema.workload_groups where name in ('normal','test_group') order by name;" + qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag,read_bytes_per_second,remote_read_bytes_per_second from information_schema.workload_groups where name in ('normal','test_group') order by name;" // test drop workload group sql "create workload group if not exists test_drop_wg properties ('cpu_share'='10')" @@ -189,9 +191,31 @@ suite("test_crud_wlg") { exception "requires a positive integer" } + test { + sql "alter workload group test_group properties('read_bytes_per_second'='0')" + exception "an integer value bigger than" + } + + test { + sql "alter workload group test_group properties('read_bytes_per_second'='-2')" + exception "an integer value bigger than" + } + + test { + sql "alter workload group test_group properties('remote_read_bytes_per_second'='0')" + exception "an integer value
bigger than" + } + + test { + sql "alter workload group test_group properties('remote_read_bytes_per_second'='-2')" + exception "an integer value bigger than" + } + sql "alter workload group test_group properties ( 'max_concurrency'='100' );" + sql "alter workload group test_group properties('remote_read_bytes_per_second'='104857600')" + sql "alter workload group test_group properties('read_bytes_per_second'='104857600')" qt_queue_1 """ select count(1) from ${table_name} """ - qt_show_queue "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;" + qt_show_queue "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,read_bytes_per_second,remote_read_bytes_per_second from information_schema.workload_groups where name in ('normal','test_group') order by name;" // test create group failed // failed for cpu_share