[branch-2.1] Pick IO limit/workload group usage table (#39839)

wangbo
2024-08-23 18:51:47 +08:00
committed by GitHub
parent e716658fba
commit 6ceb574aa0
37 changed files with 753 additions and 32 deletions

View File

@ -59,6 +59,9 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
// 4 create and update task scheduler
wg->upsert_task_scheduler(&workload_group_info, _exec_env);
// 5 upsert io throttle
wg->upsert_scan_io_throttle(&workload_group_info);
LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info="
<< wg->debug_string() << ", enable_cpu_hard_limit="
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")

View File

@ -49,6 +49,7 @@
#include "exec/schema_scanner/schema_variables_scanner.h"
#include "exec/schema_scanner/schema_views_scanner.h"
#include "exec/schema_scanner/schema_workload_group_privileges.h"
#include "exec/schema_scanner/schema_workload_group_resource_usage_scanner.h"
#include "exec/schema_scanner/schema_workload_groups_scanner.h"
#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
#include "olap/hll.h"
@ -230,6 +231,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaFileCacheStatisticsScanner::create_unique();
case TSchemaTableType::SCH_WORKLOAD_GROUP_PRIVILEGES:
return SchemaWorkloadGroupPrivilegesScanner::create_unique();
case TSchemaTableType::SCH_WORKLOAD_GROUP_RESOURCE_USAGE:
return SchemaBackendWorkloadGroupResourceUsage::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;

View File

@ -27,7 +27,7 @@
namespace doris {
std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_columns = {
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(StringRef), false},
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
{"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},

View File

@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/schema_scanner/schema_workload_group_resource_usage_scanner.h"
#include <iomanip>
#include <iostream>
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_factory.hpp"
namespace doris {
std::vector<SchemaScanner::ColumnDesc> SchemaBackendWorkloadGroupResourceUsage::_s_tbls_columns = {
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"MEMORY_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false},
{"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
};
SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage()
: SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_WORKLOAD_GROUP_RESOURCE_USAGE) {}
SchemaBackendWorkloadGroupResourceUsage::~SchemaBackendWorkloadGroupResourceUsage() {}
Status SchemaBackendWorkloadGroupResourceUsage::start(RuntimeState* state) {
_block_rows_limit = state->batch_size();
return Status::OK();
}
Status SchemaBackendWorkloadGroupResourceUsage::get_next_block_internal(vectorized::Block* block,
bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
if (nullptr == block || nullptr == eos) {
return Status::InternalError("input pointer is nullptr.");
}
if (_block == nullptr) {
_block = vectorized::Block::create_unique();
for (int i = 0; i < _s_tbls_columns.size(); ++i) {
TypeDescriptor descriptor(_s_tbls_columns[i].type);
auto data_type =
vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
_block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type,
_s_tbls_columns[i].name));
}
ExecEnv::GetInstance()->workload_group_mgr()->get_wg_resource_usage(_block.get());
_total_rows = _block->rows();
}
if (_row_idx == _total_rows) {
*eos = true;
return Status::OK();
}
int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
RETURN_IF_ERROR(mblock.add_rows(_block.get(), _row_idx, current_batch_rows));
_row_idx += current_batch_rows;
*eos = _row_idx == _total_rows;
return Status::OK();
}
} // namespace doris
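
The scanner above follows a materialize-once, emit-in-chunks pattern: on the first call it asks the WorkloadGroupMgr to fill a cached block with one row per workload group, then hands that block back to the caller in batch_size-sized slices until _row_idx reaches _total_rows. A minimal standalone sketch of that chunking loop (Row, Block, and the sample values are placeholders, not the Doris vectorized types):

// Minimal sketch of the "materialize once, emit in chunks" pattern used by
// get_next_block_internal() above. Row/Block stand in for Doris' vectorized
// types; batch_size plays the role of state->batch_size().
#include <algorithm>
#include <cstdio>
#include <vector>

struct Row { long be_id; long wg_id; };
using Block = std::vector<Row>;

struct ChunkedScanner {
    Block cached;          // filled once from the workload group manager
    int row_idx = 0;
    int batch_size = 4096; // _block_rows_limit in the real scanner

    // Returns true when all cached rows have been emitted (the *eos flag).
    bool next(Block* out) {
        int total = static_cast<int>(cached.size());
        int n = std::min(batch_size, total - row_idx);
        out->insert(out->end(), cached.begin() + row_idx, cached.begin() + row_idx + n);
        row_idx += n;
        return row_idx == total;
    }
};

int main() {
    ChunkedScanner s;
    s.cached = {{10001, 1}, {10001, 2}, {10001, 3}};
    s.batch_size = 2;
    Block out;
    bool eos = false;
    while (!eos) {
        out.clear();
        eos = s.next(&out);
        std::printf("emitted %zu rows, eos=%d\n", out.size(), (int)eos);
    }
}
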

View File

@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <vector>
#include "common/status.h"
#include "exec/schema_scanner.h"
namespace doris {
class RuntimeState;
namespace vectorized {
class Block;
} // namespace vectorized
class SchemaBackendWorkloadGroupResourceUsage : public SchemaScanner {
ENABLE_FACTORY_CREATOR(SchemaBackendWorkloadGroupResourceUsage);
public:
SchemaBackendWorkloadGroupResourceUsage();
~SchemaBackendWorkloadGroupResourceUsage() override;
Status start(RuntimeState* state) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
private:
int _block_rows_limit = 4096;
int _row_idx = 0;
int _total_rows = 0;
std::unique_ptr<vectorized::Block> _block = nullptr;
};
}; // namespace doris

View File

@ -42,6 +42,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaWorkloadGroupsScanner::_s_tbls_colu
{"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"TAG", TYPE_VARCHAR, sizeof(StringRef), true},
{"READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
{"REMOTE_READ_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), true},
};
SchemaWorkloadGroupsScanner::SchemaWorkloadGroupsScanner()

View File

@ -28,6 +28,8 @@
#include "common/config.h"
#include "common/status.h"
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/runtime_profile.h"
#include "util/threadpool.h"
@ -585,15 +587,19 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
if (UNLIKELY(0 == _len || _offset + _len < off)) {
return Status::OK();
}
// [0]: maximum len trying to read, [1] maximum length buffer can provide, [2] actual len buffer has
size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off});
{
SCOPED_RAW_TIMER(&_statis.copy_time);
memcpy((void*)out, _buf.get() + (off - _offset), read_len);
LIMIT_REMOTE_SCAN_IO(bytes_read);
// [0]: maximum len trying to read, [1] maximum length buffer can provide, [2] actual len buffer has
size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off});
{
SCOPED_RAW_TIMER(&_statis.copy_time);
memcpy((void*)out, _buf.get() + (off - _offset), read_len);
}
*bytes_read = read_len;
_statis.request_io += 1;
_statis.request_bytes += read_len;
}
*bytes_read = read_len;
_statis.request_io += 1;
_statis.request_bytes += read_len;
if (off + *bytes_read == _offset + _len) {
reset_offset(_offset + _whole_buffer_size);
}

View File

@ -26,6 +26,8 @@
namespace doris {
namespace io {
const std::string FileReader::VIRTUAL_REMOTE_DATA_DIR = "virtual_remote_data_dir";
Status FileReader::read_at(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) {
DCHECK(bthread_self() == 0);

View File

@ -68,6 +68,8 @@ public:
FileReader() = default;
virtual ~FileReader() = default;
static const std::string VIRTUAL_REMOTE_DATA_DIR;
DISALLOW_COPY_AND_ASSIGN(FileReader);
/// If io_ctx is not null,
@ -85,6 +87,8 @@ public:
virtual std::shared_ptr<FileSystem> fs() const = 0;
virtual const std::string& get_data_dir_path() { return VIRTUAL_REMOTE_DATA_DIR; }
protected:
virtual Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) = 0;

View File

@ -28,6 +28,8 @@
#include "common/logging.h"
#include "io/fs/err_utils.h"
// #include "io/fs/hdfs_file_system.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "service/backend_options.h"
#include "util/doris_metrics.h"
#include "util/hdfs_util.h"
@ -97,6 +99,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
return Status::OK();
}
LIMIT_REMOTE_SCAN_IO(bytes_read);
size_t has_read = 0;
while (has_read < bytes_req) {
tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + has_read,
@ -152,6 +155,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
return Status::OK();
}
LIMIT_REMOTE_SCAN_IO(bytes_read);
size_t has_read = 0;
while (has_read < bytes_req) {
int64_t loop_read =

View File

@ -33,6 +33,10 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/sync_point.h"
#include "io/fs/err_utils.h"
#include "olap/olap_common.h"
#include "olap/options.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/async_io.h"
#include "util/doris_metrics.h"
@ -40,9 +44,57 @@ namespace doris {
namespace io {
struct IOContext;
std::vector<doris::DataDirInfo> BeConfDataDirReader::be_config_data_dir_list;
void BeConfDataDirReader::get_data_dir_by_file_path(io::Path* file_path,
std::string* data_dir_arg) {
for (const auto& data_dir_info : be_config_data_dir_list) {
if (data_dir_info.path.size() >= file_path->string().size()) {
continue;
}
if (file_path->string().compare(0, data_dir_info.path.size(), data_dir_info.path) == 0) {
*data_dir_arg = data_dir_info.path;
}
}
}
void BeConfDataDirReader::init_be_conf_data_dir(
const std::vector<doris::StorePath>& store_paths,
const std::vector<doris::StorePath>& spill_store_paths,
const std::vector<doris::CachePath>& cache_paths) {
for (int i = 0; i < store_paths.size(); i++) {
DataDirInfo data_dir_info;
data_dir_info.path = store_paths[i].path;
data_dir_info.storage_medium = store_paths[i].storage_medium;
data_dir_info.data_dir_type = DataDirType::OLAP_DATA_DIR;
data_dir_info.bvar_name = "local_data_dir_" + std::to_string(i);
be_config_data_dir_list.push_back(data_dir_info);
}
for (int i = 0; i < spill_store_paths.size(); i++) {
doris::DataDirInfo data_dir_info;
data_dir_info.path = spill_store_paths[i].path;
data_dir_info.storage_medium = spill_store_paths[i].storage_medium;
data_dir_info.data_dir_type = doris::DataDirType::SPILL_DISK_DIR;
data_dir_info.bvar_name = "spill_data_dir_" + std::to_string(i);
be_config_data_dir_list.push_back(data_dir_info);
}
for (int i = 0; i < cache_paths.size(); i++) {
doris::DataDirInfo data_dir_info;
data_dir_info.path = cache_paths[i].path;
data_dir_info.storage_medium = TStorageMedium::REMOTE_CACHE;
data_dir_info.data_dir_type = doris::DataDirType::DATA_CACHE_DIR;
data_dir_info.bvar_name = "local_cache_dir_" + std::to_string(i);
be_config_data_dir_list.push_back(data_dir_info);
}
}
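
get_data_dir_by_file_path() above maps a file path back to a configured BE data directory by prefix matching: a configured root qualifies only if it is strictly shorter than the file path and is a prefix of it, and when several roots match, the last matching entry in the list wins. A small standalone illustration of that lookup (the directory list and paths are made up):

// Standalone illustration of the prefix-match lookup performed by
// BeConfDataDirReader::get_data_dir_by_file_path(); the directories and
// file paths below are hypothetical.
#include <cstdio>
#include <string>
#include <vector>

static std::string match_data_dir(const std::vector<std::string>& data_dirs,
                                  const std::string& file_path) {
    std::string result; // empty means "no local data dir found"
    for (const auto& dir : data_dirs) {
        if (dir.size() >= file_path.size()) {
            continue; // the dir path must be a strict prefix of the file path
        }
        if (file_path.compare(0, dir.size(), dir) == 0) {
            result = dir; // as in the BE code, a later match overwrites an earlier one
        }
    }
    return result;
}

int main() {
    std::vector<std::string> dirs = {"/mnt/disk1/doris", "/mnt/disk2/doris"};
    std::printf("%s\n", match_data_dir(dirs, "/mnt/disk2/doris/data/10001/tablet").c_str());
    std::printf("[%s]\n", match_data_dir(dirs, "/opt/elsewhere/file").c_str());
}
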
LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd,
std::shared_ptr<LocalFileSystem> fs)
: _fd(fd), _path(std::move(path)), _file_size(file_size), _fs(std::move(fs)) {
_data_dir_path = "";
BeConfDataDirReader::get_data_dir_by_file_path(&_path, &_data_dir_path);
DorisMetrics::instance()->local_file_open_reading->increment(1);
DorisMetrics::instance()->local_file_reader_total->increment(1);
}
@ -78,6 +130,8 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_
bytes_req = std::min(bytes_req, _file_size - offset);
*bytes_read = 0;
LIMIT_LOCAL_SCAN_IO(get_data_dir_path(), bytes_read);
while (bytes_req != 0) {
auto res = SYNC_POINT_HOOK_RETURN_VALUE(::pread(_fd, to, bytes_req, offset),
"LocalFileReader::pread", _fd, to);

View File

@ -30,7 +30,23 @@
#include "util/slice.h"
namespace doris {
struct StorePath;
struct DataDirInfo;
struct CachePath;
namespace io {
struct BeConfDataDirReader {
static std::vector<doris::DataDirInfo> be_config_data_dir_list;
static void get_data_dir_by_file_path(Path* file_path, std::string* data_dir_arg);
static void init_be_conf_data_dir(const std::vector<doris::StorePath>& store_paths,
const std::vector<doris::StorePath>& spill_store_paths,
const std::vector<doris::CachePath>& cache_paths);
};
struct IOContext;
class LocalFileReader final : public FileReader {
@ -49,6 +65,8 @@ public:
FileSystemSPtr fs() const override { return _fs; }
const std::string& get_data_dir_path() override { return _data_dir_path; }
private:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
@ -59,6 +77,7 @@ private:
size_t _file_size;
std::atomic<bool> _closed = false;
std::shared_ptr<LocalFileSystem> _fs;
std::string _data_dir_path; // be conf's data dir path
};
} // namespace io

View File

@ -33,6 +33,8 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "io/fs/err_utils.h"
#include "io/fs/s3_common.h"
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/bvar_helper.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
@ -115,6 +117,8 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
const int max_retries = config::max_s3_client_retry; // wait 1s, 2s, 4s, 8s for each backoff
LIMIT_REMOTE_SCAN_IO(bytes_read);
int total_sleep_time = 0;
while (retry_count <= max_retries) {
s3_file_reader_read_counter << 1;

View File

@ -54,6 +54,12 @@ using TabletUid = UniqueId;
enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2, FULL_COMPACTION = 3 };
enum DataDirType {
SPILL_DISK_DIR,
OLAP_DATA_DIR,
DATA_CACHE_DIR,
};
struct DataDirInfo {
std::string path;
size_t path_hash = 0;
@ -64,6 +70,8 @@ struct DataDirInfo {
int64_t trash_used_capacity = 0;
bool is_used = false; // whether available mark
TStorageMedium::type storage_medium = TStorageMedium::HDD; // Storage medium type: SSD|HDD
DataDirType data_dir_type = DataDirType::OLAP_DATA_DIR;
std::string bvar_name;
};
struct PredicateFilterInfo {
int type = 0;

View File

@ -226,6 +226,7 @@ Status PipelineTask::execute(bool* eos) {
if (cpu_qs) {
cpu_qs->add_cpu_nanos(delta_cpu_time);
}
query_context()->update_wg_cpu_adder(delta_cpu_time);
}};
// The status must be runnable
*eos = false;

View File

@ -203,7 +203,7 @@ public:
ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); }
Status init_pipeline_task_scheduler();
void init_file_cache_factory();
void init_file_cache_factory(std::vector<doris::CachePath>& cache_paths);
io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; }
UserFunctionCache* user_function_cache() { return _user_function_cache; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }

View File

@ -36,6 +36,7 @@
#include "common/status.h"
#include "io/cache/block/block_file_cache_factory.h"
#include "io/fs/file_meta_cache.h"
#include "io/fs/local_file_reader.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/olap_define.h"
#include "olap/options.h"
@ -212,7 +213,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
// so it should be created before all query begin and deleted after all query and daemon thread stoppped
_runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr();
init_file_cache_factory();
std::vector<doris::CachePath> cache_paths;
init_file_cache_factory(cache_paths);
doris::io::BeConfDataDirReader::init_be_conf_data_dir(store_paths, spill_store_paths,
cache_paths);
_pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_workload_group_manager = new WorkloadGroupMgr();
@ -321,13 +325,12 @@ Status ExecEnv::init_pipeline_task_scheduler() {
return Status::OK();
}
void ExecEnv::init_file_cache_factory() {
void ExecEnv::init_file_cache_factory(std::vector<doris::CachePath>& cache_paths) {
// Load file cache before starting up daemon threads to make sure StorageEngine is read.
if (doris::config::enable_file_cache) {
_file_cache_factory = new io::FileCacheFactory();
io::IFileCache::init();
std::unordered_set<std::string> cache_path_set;
std::vector<doris::CachePath> cache_paths;
Status olap_res =
doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
if (!olap_res) {

View File

@ -308,6 +308,12 @@ public:
// only for file scan node
std::map<int, TFileScanRangeParams> file_scan_range_params_map;
void update_wg_cpu_adder(int64_t delta_cpu_time) {
if (_workload_group != nullptr) {
_workload_group->update_cpu_adder(delta_cpu_time);
}
}
private:
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;

View File

@ -111,6 +111,40 @@
__VA_ARGS__; \
} while (0)
#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
auto* t_ctx = doris::thread_context(true); \
if (t_ctx) { \
iot = t_ctx->get_local_scan_io_throttle(data_dir); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
t_ctx->update_total_local_scan_io_adder(*bytes_read); \
} \
} \
}
#define LIMIT_REMOTE_SCAN_IO(bytes_read) \
std::shared_ptr<IOThrottle> iot = nullptr; \
if (auto* t_ctx = doris::thread_context(true)) { \
iot = t_ctx->get_remote_scan_io_throttle(); \
} \
if (iot) { \
iot->acquire(-1); \
} \
Defer defer { \
[&]() { \
if (iot) { \
iot->update_next_io_time(*bytes_read); \
} \
} \
}
namespace doris {
class ThreadContext;
@ -227,6 +261,26 @@ public:
std::weak_ptr<WorkloadGroup> workload_group() { return _wg_wptr; }
std::shared_ptr<IOThrottle> get_local_scan_io_throttle(const std::string& data_dir) {
if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
return wg_ptr->get_local_scan_io_throttle(data_dir);
}
return nullptr;
}
std::shared_ptr<IOThrottle> get_remote_scan_io_throttle() {
if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
return wg_ptr->get_remote_scan_io_throttle();
}
return nullptr;
}
void update_total_local_scan_io_adder(size_t bytes_read) {
if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
wg_ptr->update_total_local_scan_io_adder(bytes_read);
}
}
int thread_local_handle_count = 0;
int skip_memory_check = 0;
int skip_large_memory_check = 0;
@ -292,7 +346,7 @@ public:
};
// must call create_thread_local_if_not_exits() before use thread_context().
static ThreadContext* thread_context() {
static ThreadContext* thread_context(bool allow_return_null = false) {
if (pthread_context_ptr_init) {
// in pthread
DCHECK(bthread_self() == 0);
@ -306,6 +360,9 @@ static ThreadContext* thread_context() {
DCHECK(bthread_context != nullptr);
return bthread_context;
}
if (allow_return_null) {
return nullptr;
}
// It means that use thread_context() but this thread not attached a query/load using SCOPED_ATTACH_TASK macro.
LOG(FATAL) << "__builtin_unreachable, " << doris::memory_orphan_check_msg;
__builtin_unreachable();
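
The two macros above expand to the same shape at the top of every throttled read path: look up the current workload group's IOThrottle through the thread context (which may legitimately be null now that thread_context(true) can return nullptr), block in acquire() until the throttle permits the IO, and register a Defer so the bytes actually read are reported back when the enclosing scope exits. A minimal standalone sketch of that acquire-then-account-on-exit pattern, with a toy throttle and RAII helper standing in for Doris' IOThrottle and Defer:

// Toy version of the pattern produced by LIMIT_LOCAL_SCAN_IO /
// LIMIT_REMOTE_SCAN_IO; ToyThrottle and ScopeExit are stand-ins only.
#include <cstdio>
#include <functional>
#include <memory>

struct ToyThrottle {
    void acquire() { /* the real throttle blocks here until the next allowed IO time */ }
    void update_next_io_time(size_t bytes) { std::printf("accounted %zu bytes\n", bytes); }
};

struct ScopeExit {
    std::function<void()> fn;
    ~ScopeExit() { fn(); }
};

size_t read_with_throttle(std::shared_ptr<ToyThrottle> iot, size_t bytes_req) {
    size_t bytes_read = 0;
    if (iot) {
        iot->acquire(); // wait for our turn before issuing the IO
    }
    ScopeExit defer{[&]() {
        if (iot) {
            iot->update_next_io_time(bytes_read); // pace future IOs by what was really read
        }
    }};
    bytes_read = bytes_req; // the real code performs the pread/S3/HDFS read here
    return bytes_read;
}

int main() {
    read_with_throttle(std::make_shared<ToyThrottle>(), 4096);
    read_with_throttle(nullptr, 4096); // no workload group attached: no throttling
}
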

View File

@ -27,12 +27,14 @@
#include <utility>
#include "common/logging.h"
#include "io/fs/local_file_reader.h"
#include "olap/storage_engine.h"
#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
#include "runtime/exec_env.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
#include "util/runtime_profile.h"
@ -62,7 +64,24 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
_max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num),
_min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num),
_spill_low_watermark(tg_info.spill_low_watermark),
_spill_high_watermark(tg_info.spill_high_watermark) {}
_spill_high_watermark(tg_info.spill_high_watermark),
_scan_bytes_per_second(tg_info.read_bytes_per_second),
_remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
std::vector<DataDirInfo>& data_dir_list = io::BeConfDataDirReader::be_config_data_dir_list;
for (const auto& data_dir : data_dir_list) {
_scan_io_throttle_map[data_dir.path] =
std::make_shared<IOThrottle>(_name, data_dir.bvar_name + "_read_bytes");
}
_remote_scan_io_throttle = std::make_shared<IOThrottle>(_name, "remote_read_bytes");
_mem_used_status = std::make_unique<bvar::Status<int64_t>>(_name, "memory_used", 0);
_cpu_usage_adder = std::make_unique<bvar::Adder<uint64_t>>(_name, "cpu_usage_adder");
_cpu_usage_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<uint64_t>>>(
_name, "cpu_usage", _cpu_usage_adder.get(), 10);
_total_local_scan_io_adder =
std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes");
_total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
_name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1);
}
std::string WorkloadGroup::debug_string() const {
std::shared_lock<std::shared_mutex> rl {_mutex};
@ -70,11 +89,13 @@ std::string WorkloadGroup::debug_string() const {
"TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
"{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
"{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}, "
"spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, query_num={}]",
"spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, query_num={}, "
"read_bytes_per_second={}, remote_read_bytes_per_second={}]",
_id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(),
_scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num,
_spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size());
_spill_low_watermark, _spill_high_watermark, _is_shutdown, _query_ctxs.size(),
_scan_bytes_per_second, _remote_scan_bytes_per_second);
}
void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
@ -101,6 +122,8 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
_min_remote_scan_thread_num = tg_info.min_remote_scan_thread_num;
_spill_low_watermark = tg_info.spill_low_watermark;
_spill_high_watermark = tg_info.spill_high_watermark;
_scan_bytes_per_second = tg_info.read_bytes_per_second;
_remote_scan_bytes_per_second = tg_info.remote_read_bytes_per_second;
} else {
return;
}
@ -122,6 +145,7 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
}
}
refresh_memory(used_memory);
_mem_used_status->set_value(used_memory);
return used_memory;
}
@ -368,6 +392,20 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g
}
workload_group_info->spill_high_watermark = spill_high_watermark;
// 14 scan io
int read_bytes_per_second = -1;
if (tworkload_group_info.__isset.read_bytes_per_second) {
read_bytes_per_second = tworkload_group_info.read_bytes_per_second;
}
workload_group_info->read_bytes_per_second = read_bytes_per_second;
// 15 remote scan io
int remote_read_bytes_per_second = -1;
if (tworkload_group_info.__isset.remote_read_bytes_per_second) {
remote_read_bytes_per_second = tworkload_group_info.remote_read_bytes_per_second;
}
workload_group_info->remote_read_bytes_per_second = remote_read_bytes_per_second;
return Status::OK();
}
@ -537,6 +575,38 @@ std::string WorkloadGroup::thread_debug_info() {
return str;
}
void WorkloadGroup::upsert_scan_io_throttle(WorkloadGroupInfo* tg_info) {
for (const auto& [key, io_throttle] : _scan_io_throttle_map) {
io_throttle->set_io_bytes_per_second(tg_info->read_bytes_per_second);
}
_remote_scan_io_throttle->set_io_bytes_per_second(tg_info->remote_read_bytes_per_second);
}
std::shared_ptr<IOThrottle> WorkloadGroup::get_local_scan_io_throttle(const std::string& disk_dir) {
auto find_ret = _scan_io_throttle_map.find(disk_dir);
if (find_ret != _scan_io_throttle_map.end()) {
return find_ret->second;
}
return nullptr;
}
std::shared_ptr<IOThrottle> WorkloadGroup::get_remote_scan_io_throttle() {
return _remote_scan_io_throttle;
}
void WorkloadGroup::update_cpu_adder(int64_t delta_cpu_time) {
(*_cpu_usage_adder) << (uint64_t)delta_cpu_time;
}
void WorkloadGroup::update_total_local_scan_io_adder(size_t scan_bytes) {
(*_total_local_scan_io_adder) << scan_bytes;
}
int64_t WorkloadGroup::get_remote_scan_bytes_per_second() {
return _remote_scan_io_throttle->get_bvar_io_per_second();
}
void WorkloadGroup::try_stop_schedulers() {
std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
if (_task_sched) {

View File

@ -17,6 +17,7 @@
#pragma once
#include <bvar/bvar.h>
#include <gen_cpp/BackendService_types.h>
#include <stddef.h>
#include <stdint.h>
@ -40,6 +41,7 @@ class ThreadPool;
class ExecEnv;
class CgroupCpuCtl;
class QueryContext;
class IOThrottle;
namespace vectorized {
class SimplifiedScanScheduler;
@ -190,6 +192,23 @@ public:
std::string thread_debug_info();
std::shared_ptr<IOThrottle> get_local_scan_io_throttle(const std::string& disk_dir);
std::shared_ptr<IOThrottle> get_remote_scan_io_throttle();
void upsert_scan_io_throttle(WorkloadGroupInfo* tg_info);
void update_cpu_adder(int64_t delta_cpu_time);
void update_total_local_scan_io_adder(size_t scan_bytes);
int64_t get_mem_used() { return _mem_used_status->get_value(); }
uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); }
int64_t get_local_scan_bytes_per_second() {
return _total_local_scan_io_per_second->get_value();
}
int64_t get_remote_scan_bytes_per_second();
private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
const uint64_t _id;
@ -210,6 +229,8 @@ private:
std::atomic<int> _min_remote_scan_thread_num;
std::atomic<int> _spill_low_watermark;
std::atomic<int> _spill_high_watermark;
std::atomic<int64_t> _scan_bytes_per_second {-1};
std::atomic<int64_t> _remote_scan_bytes_per_second {-1};
// means workload group is mark dropped
// new query can not submit
@ -223,6 +244,16 @@ private:
std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
std::unique_ptr<ThreadPool> _memtable_flush_pool {nullptr};
std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
std::shared_ptr<IOThrottle> _remote_scan_io_throttle {nullptr};
// bvar metric
std::unique_ptr<bvar::Status<int64_t>> _mem_used_status;
std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<uint64_t>>> _cpu_usage_per_second;
std::unique_ptr<bvar::Adder<size_t>> _total_local_scan_io_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _total_local_scan_io_per_second;
};
using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
@ -241,6 +272,8 @@ struct WorkloadGroupInfo {
int min_remote_scan_thread_num;
int spill_low_watermark;
int spill_high_watermark;
int read_bytes_per_second;
int remote_read_bytes_per_second;
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;

View File

@ -27,6 +27,7 @@
#include "util/mem_info.h"
#include "util/threadpool.h"
#include "util/time.h"
#include "vec/core/block.h"
#include "vec/exec/scan/scanner_scheduler.h"
namespace doris {
@ -241,6 +242,52 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_ratio() {
}
}
void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
auto insert_int_value = [&](int col_index, int64_t int_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
int_val);
nullable_column->get_null_map_data().emplace_back(0);
};
auto insert_double_value = [&](int col_index, double double_val, vectorized::Block* block) {
vectorized::MutableColumnPtr mutable_col_ptr;
mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable();
auto* nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
double_val);
nullable_column->get_null_map_data().emplace_back(0);
};
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
int cpu_num = CpuInfo::num_cores();
cpu_num = cpu_num <= 0 ? 1 : cpu_num;
uint64_t total_cpu_time_ns_per_second = cpu_num * 1000000000ll;
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
block->reserve(_workload_groups.size());
for (const auto& [id, wg] : _workload_groups) {
insert_int_value(0, be_id, block);
insert_int_value(1, wg->id(), block);
insert_int_value(2, wg->get_mem_used(), block);
double cpu_usage_p =
(double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100;
cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0;
insert_double_value(3, cpu_usage_p, block);
insert_int_value(4, wg->get_local_scan_bytes_per_second(), block);
insert_int_value(5, wg->get_remote_scan_bytes_per_second(), block);
}
}
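
CPU_USAGE_PERCENT in get_wg_resource_usage() above comes straight from the bvar window: wg->get_cpu_usage() is roughly the CPU nanoseconds the group consumed per wall-clock second (averaged over the 10-second PerSecond window), normalized by the CPU time available per second (cores * 10^9 ns) and rounded to two decimals. A small standalone check of that arithmetic with made-up numbers (8 cores, 1.234 s of CPU per wall second):

// Worked example of the CPU_USAGE_PERCENT computation above; the sample
// numbers are invented for illustration.
#include <cmath>
#include <cstdint>
#include <cstdio>

int main() {
    int cpu_num = 8;                                          // CpuInfo::num_cores()
    uint64_t total_cpu_time_ns_per_second = cpu_num * 1000000000ull;
    uint64_t wg_cpu_ns_per_second = 1234000000ull;            // wg->get_cpu_usage()

    double cpu_usage_p = (double)wg_cpu_ns_per_second /
                         (double)total_cpu_time_ns_per_second * 100;
    cpu_usage_p = std::round(cpu_usage_p * 100.0) / 100.0;    // keep two decimals
    std::printf("CPU_USAGE_PERCENT = %.2f\n", cpu_usage_p);   // prints 15.43
}
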
void WorkloadGroupMgr::stop() {
for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
iter->second->try_stop_schedulers();

View File

@ -27,6 +27,10 @@ namespace doris {
class CgroupCpuCtl;
namespace vectorized {
class Block;
} // namespace vectorized
namespace pipeline {
class TaskScheduler;
class MultiCoreTaskQueue;
@ -56,6 +60,8 @@ public:
void refresh_wg_weighted_memory_ratio();
void get_wg_resource_usage(vectorized::Block* block);
private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;

View File

@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/workload_management/io_throttle.h"
#include "util/defer_op.h"
#include "util/time.h"
namespace doris {
IOThrottle::IOThrottle(std::string prefix, std::string name) {
_io_adder = std::make_unique<bvar::Adder<size_t>>(prefix, name);
_io_adder_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
prefix, name + "_per_second", _io_adder.get(), 1);
}
bool IOThrottle::acquire(int64_t block_timeout_ms) {
if (_io_bytes_per_second_limit < 0) {
return true;
}
std::unique_lock<std::mutex> w_lock(_mutex);
int64_t current_time = GetCurrentTimeMicros();
int64_t block_finish_time = block_timeout_ms <= 0 ? 0 : current_time + block_timeout_ms * 1000;
while (current_time <= _next_io_time_micros) {
if (block_finish_time > 0 && current_time >= block_finish_time) {
return false;
}
wait_condition.wait_for(w_lock,
std::chrono::microseconds(_next_io_time_micros - current_time));
current_time = GetCurrentTimeMicros();
}
return true;
}
bool IOThrottle::try_acquire() {
if (_io_bytes_per_second_limit < 0) {
return true;
}
std::unique_lock<std::mutex> w_lock(_mutex);
return GetCurrentTimeMicros() > _next_io_time_micros;
}
void IOThrottle::update_next_io_time(int64_t io_bytes) {
Defer defer {[&]() {
if (io_bytes > 0) {
(*_io_adder) << io_bytes;
}
}};
if (_io_bytes_per_second_limit <= 0 || io_bytes <= 0) {
return;
}
int64_t read_bytes_per_second = _io_bytes_per_second_limit;
{
std::unique_lock<std::mutex> w_lock(_mutex);
double io_bytes_float = static_cast<double>(io_bytes);
double ret = (io_bytes_float / static_cast<double>(read_bytes_per_second)) *
static_cast<double>(MICROS_PER_SEC);
int64_t current_time = GetCurrentTimeMicros();
if (current_time > _next_io_time_micros) {
_next_io_time_micros = current_time;
}
_next_io_time_micros += ret < 1 ? static_cast<int64_t>(1) : static_cast<int64_t>(ret);
}
}
void IOThrottle::set_io_bytes_per_second(int64_t io_bytes_per_second) {
_io_bytes_per_second_limit = io_bytes_per_second;
}
}; // namespace doris
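
The throttle paces IO by time rather than by tokens: every completed IO of io_bytes pushes _next_io_time_micros forward by io_bytes / limit seconds (expressed in microseconds, with a 1 us floor), and acquire() waits until that timestamp, so sustained throughput converges to the configured bytes-per-second limit. A standalone reproduction of the delay computation with example numbers (100 MB/s limit, 4 MB read):

// Standalone reproduction of the pacing arithmetic in update_next_io_time();
// the 100 MB/s limit and 4 MB read below are example values.
#include <cstdint>
#include <cstdio>

static const int64_t MICROS_PER_SEC = 1000000;

int64_t pacing_delay_micros(int64_t io_bytes, int64_t limit_bytes_per_second) {
    double ret = (double)io_bytes / (double)limit_bytes_per_second * (double)MICROS_PER_SEC;
    // at least 1 microsecond, mirroring "ret < 1 ? 1 : ret" in the BE code
    return ret < 1 ? 1 : (int64_t)ret;
}

int main() {
    int64_t limit = 100 * 1024 * 1024; // 100 MB/s
    int64_t io = 4 * 1024 * 1024;      // a 4 MB read
    // 4 MB at 100 MB/s => the next IO is allowed 40000 us (40 ms) later
    std::printf("delay = %lld us\n", (long long)pacing_delay_micros(io, limit));
}
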

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <bvar/bvar.h>
#include <stdint.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
namespace doris {
class IOThrottle {
public:
IOThrottle(std::string prefix, std::string name);
~IOThrottle() = default;
bool acquire(int64_t block_timeout_ms);
// non-block acquire
bool try_acquire();
void update_next_io_time(int64_t bytes);
void set_io_bytes_per_second(int64_t read_bytes_per_second);
size_t get_bvar_io_per_second() { return _io_adder_per_second->get_value(); }
private:
std::mutex _mutex;
std::condition_variable wait_condition;
int64_t _next_io_time_micros {0};
std::atomic<int64_t> _io_bytes_per_second_limit {10485760};
// bvar monitor
std::unique_ptr<bvar::Adder<size_t>> _io_adder;
std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _io_adder_per_second;
};
}; // namespace doris

View File

@ -290,4 +290,13 @@ void VScanner::_collect_profile_before_close() {
_state->update_num_rows_load_unselected(_counter.num_rows_unselected);
}
void VScanner::update_scan_cpu_timer() {
int64_t cpu_time = _cpu_watch.elapsed_time();
_scan_cpu_timer += cpu_time;
_query_statistics->add_cpu_nanos(cpu_time);
if (_state && _state->get_query_ctx()) {
_state->get_query_ctx()->update_wg_cpu_adder(cpu_time);
}
}
} // namespace doris::vectorized

View File

@ -132,11 +132,7 @@ public:
int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }
void update_scan_cpu_timer() {
int64_t cpu_time = _cpu_watch.elapsed_time();
_scan_cpu_timer += cpu_time;
_query_statistics->add_cpu_nanos(cpu_time);
}
void update_scan_cpu_timer();
RuntimeState* runtime_state() { return _state; }

View File

@ -115,6 +115,13 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
if (_writer_status.ok()) {
while (true) {
ThreadCpuStopWatch cpu_time_stop_watch;
cpu_time_stop_watch.start();
Defer defer {[&]() {
if (state && state->get_query_ctx()) {
state->get_query_ctx()->update_wg_cpu_adder(cpu_time_stop_watch.elapsed_time());
}
}};
if (!_eos && _data_queue.empty() && _writer_status.ok()) {
std::unique_lock l(_m);
while (!_eos && _data_queue.empty() && _writer_status.ok()) {

View File

@ -81,7 +81,10 @@ public enum SchemaTableType {
SCH_FILE_CACHE_STATISTICS("FILE_CACHE_STATISTICS", "FILE_CACHE_STATISTICS",
TSchemaTableType.SCH_FILE_CACHE_STATISTICS),
SCH_WORKLOAD_GROUP_PRIVILEGES("WORKLOAD_GROUP_PRIVILEGES",
"WORKLOAD_GROUP_PRIVILEGES", TSchemaTableType.SCH_WORKLOAD_GROUP_PRIVILEGES);
"WORKLOAD_GROUP_PRIVILEGES", TSchemaTableType.SCH_WORKLOAD_GROUP_PRIVILEGES),
SCH_WORKLOAD_GROUP_RESOURCE_USAGE("WORKLOAD_GROUP_RESOURCE_USAGE",
"WORKLOAD_GROUP_RESOURCE_USAGE", TSchemaTableType.SCH_WORKLOAD_GROUP_RESOURCE_USAGE);
private static final String dbName = "INFORMATION_SCHEMA";
private static SelectList fullSelectLists;

View File

@ -488,6 +488,8 @@ public class SchemaTable extends Table {
.column("SPILL_THRESHOLD_LOW_WATERMARK", ScalarType.createVarchar(256))
.column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256))
.column("TAG", ScalarType.createVarchar(256))
.column("READ_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
.column("REMOTE_READ_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
.build()))
.put("processlist",
new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA,
@ -532,6 +534,16 @@ public class SchemaTable extends Table {
.column("IS_GRANTABLE", ScalarType.createVarchar(IS_GRANTABLE_LEN))
.build())
)
.put("workload_group_resource_usage",
new SchemaTable(SystemIdGenerator.getNextId(), "workload_group_resource_usage", TableType.SCHEMA,
builder().column("BE_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("WORKLOAD_GROUP_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("MEMORY_USAGE_BYTES", ScalarType.createType(PrimitiveType.BIGINT))
.column("CPU_USAGE_PERCENT", ScalarType.createType(PrimitiveType.DOUBLE))
.column("LOCAL_SCAN_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
.column("REMOTE_SCAN_BYTES_PER_SECOND", ScalarType.createType(PrimitiveType.BIGINT))
.build())
)
.build();
private boolean fetchAllFe = false;

View File

@ -65,6 +65,7 @@ public class BackendPartitionedSchemaScanNode extends SchemaScanNode {
BEACKEND_ID_COLUMN_SET.add("backend_id");
BACKEND_TABLE.add("backend_active_tasks");
BACKEND_TABLE.add("workload_group_resource_usage");
BEACKEND_ID_COLUMN_SET.add("be_id");
BACKEND_TABLE.add("file_cache_statistics");

View File

@ -75,6 +75,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
public static final String TAG = "tag";
public static final String READ_BYTES_PER_SECOND = "read_bytes_per_second";
public static final String REMOTE_READ_BYTES_PER_SECOND = "remote_read_bytes_per_second";
// NOTE(wb): all property is not required, some properties default value is set in be
// default value is as followed
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
@ -83,7 +87,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
.add(TAG).build();
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
@ -394,6 +398,35 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
}
}
if (properties.containsKey(READ_BYTES_PER_SECOND)) {
String readBytesVal = properties.get(READ_BYTES_PER_SECOND);
try {
long longVal = Long.parseLong(readBytesVal);
boolean isValidValue = longVal == -1 || longVal > 0;
if (!isValidValue) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new DdlException(
READ_BYTES_PER_SECOND + " should be -1 or an integer value bigger than 0, but input value is "
+ readBytesVal);
}
}
if (properties.containsKey(REMOTE_READ_BYTES_PER_SECOND)) {
String readBytesVal = properties.get(REMOTE_READ_BYTES_PER_SECOND);
try {
long longVal = Long.parseLong(readBytesVal);
boolean isValidValue = longVal == -1 || longVal > 0;
if (!isValidValue) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new DdlException(REMOTE_READ_BYTES_PER_SECOND
+ " should be -1 or an integer value bigger than 0, but input value is " + readBytesVal);
}
}
}
public long getId() {
@ -484,6 +517,13 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
} else {
row.add(val);
}
} else if (READ_BYTES_PER_SECOND.equals(key) || REMOTE_READ_BYTES_PER_SECOND.equals(key)) {
String val = properties.get(key);
if (StringUtils.isEmpty(val)) {
row.add("-1");
} else {
row.add(val);
}
} else {
row.add(properties.get(key));
}
@ -571,6 +611,16 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
tWorkloadGroupInfo.setTag(tagStr);
}
String readBytesPerSecStr = properties.get(READ_BYTES_PER_SECOND);
if (readBytesPerSecStr != null) {
tWorkloadGroupInfo.setReadBytesPerSecond(Long.valueOf(readBytesPerSecStr));
}
String remoteReadBytesPerSecStr = properties.get(REMOTE_READ_BYTES_PER_SECOND);
if (remoteReadBytesPerSecStr != null) {
tWorkloadGroupInfo.setRemoteReadBytesPerSecond(Long.valueOf(remoteReadBytesPerSecStr));
}
TopicInfo topicInfo = new TopicInfo();
topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
return topicInfo;

View File

@ -80,6 +80,7 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
.add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
.add(WorkloadGroup.SPILL_THRESHOLD_LOW_WATERMARK).add(WorkloadGroup.SPILL_THRESHOLD_HIGH_WATERMARK)
.add(WorkloadGroup.TAG)
.add(WorkloadGroup.READ_BYTES_PER_SECOND).add(WorkloadGroup.REMOTE_READ_BYTES_PER_SECOND)
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
.build();

View File

@ -491,8 +491,11 @@ public class MetadataGenerator {
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(12))); // spill low watermark
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(13))); // spill high watermark
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(14))); // tag
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(15))); // running query num
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(16))); // waiting query num
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(15)))); // read bytes per second
trow.addToColumnValue(
new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(16)))); // remote read bytes per second
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(17))); // running query num
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(18))); // waiting query num
dataBatch.add(trow);
}

View File

@ -133,7 +133,8 @@ enum TSchemaTableType {
SCH_PROCS_PRIV,
SCH_WORKLOAD_POLICY,
SCH_FILE_CACHE_STATISTICS,
SCH_WORKLOAD_GROUP_PRIVILEGES;
SCH_WORKLOAD_GROUP_PRIVILEGES,
SCH_WORKLOAD_GROUP_RESOURCE_USAGE;
}
enum THdfsCompression {

View File

@ -6,8 +6,8 @@
2
-- !show_1 --
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 10% true 2147483647 0 0 -1 -1
normal 20 50% true 2147483647 0 0 1% 16 -1 -1
test_group 10 10% true 2147483647 0 0 -1 -1 -1 -1
-- !show_del_wg_1 --
normal 20 50% true 2147483647 0 0 1% 16
@ -46,8 +46,8 @@ test_group 10 11% false 2147483647 0 0 20% -1
2
-- !show_queue --
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 100 0 0 20% -1
normal 20 50% true 2147483647 0 0 1% 16 -1 -1
test_group 10 11% false 100 0 0 20% -1 104857600 104857600
-- !select_tvf_1 --
normal 20 50% true 2147483647 0 0 1% 16

View File

@ -85,6 +85,8 @@ suite("test_crud_wlg") {
sql "alter workload group normal properties ( 'queue_timeout'='0' );"
sql "alter workload group normal properties ( 'cpu_hard_limit'='1%' );"
sql "alter workload group normal properties ( 'scan_thread_num'='-1' );"
sql "alter workload group normal properties ( 'scan_thread_num'='-1' );"
sql "alter workload group normal properties ( 'remote_read_bytes_per_second'='-1' );"
sql "set workload_group=normal;"
@ -119,7 +121,7 @@ suite("test_crud_wlg") {
");"
sql "set workload_group=test_group;"
qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag from information_schema.workload_groups where name in ('normal','test_group') order by name;"
qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag,read_bytes_per_second,remote_read_bytes_per_second from information_schema.workload_groups where name in ('normal','test_group') order by name;"
// test drop workload group
sql "create workload group if not exists test_drop_wg properties ('cpu_share'='10')"
@ -189,9 +191,31 @@ suite("test_crud_wlg") {
exception "requires a positive integer"
}
test {
sql "alter workload group test_group properties('read_bytes_per_second'='0')"
exception "an integer value bigger than"
}
test {
sql "alter workload group test_group properties('read_bytes_per_second'='-2')"
exception "an integer value bigger than"
}
test {
sql "alter workload group test_group properties('remote_read_bytes_per_second'='0')"
exception "an integer value bigger than"
}
test {
sql "alter workload group test_group properties('remote_read_bytes_per_second'='-2')"
exception "an integer value bigger than"
}
sql "alter workload group test_group properties ( 'max_concurrency'='100' );"
sql "alter workload group test_group properties('remote_read_bytes_per_second'='104857600')"
sql "alter workload group test_group properties('read_bytes_per_second'='104857600')"
qt_queue_1 """ select count(1) from ${table_name} """
qt_show_queue "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;"
qt_show_queue "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,read_bytes_per_second,remote_read_bytes_per_second from information_schema.workload_groups where name in ('normal','test_group') order by name;"
// test create group failed
// failed for cpu_share