From 498b06fbe28e09de11db68b450f8a775a9160671 Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Wed, 2 Sep 2020 10:39:41 +0800 Subject: [PATCH] [Metrics] Support tablet level metrics (#4428) Sometimes we want to detect the hotspots of a cluster, for example, hot-scanned or hot-written tablets, but we have no insight into the tablets in the cluster. This patch introduces tablet level metrics to help achieve this goal. It currently supports 4 metrics on tablets: `query_scan_bytes`, `query_scan_rows`, `flush_bytes`, `flush_count`. However, one BE may hold hundreds of thousands of tablets, so I add a parameter to the metrics HTTP request, and tablet level metrics are not returned by default. --- be/src/agent/task_worker_pool.cpp | 26 +-- be/src/common/daemon.cpp | 18 +- be/src/exec/olap_scanner.cpp | 7 +- be/src/exec/tablet_sink.cpp | 4 +- be/src/http/action/compaction_action.cpp | 4 +- be/src/http/action/metrics_action.cpp | 5 +- be/src/http/action/stream_load.cpp | 24 +- be/src/http/action/stream_load.h | 10 +- be/src/olap/base_compaction.cpp | 4 +- be/src/olap/base_tablet.cpp | 16 +- be/src/olap/base_tablet.h | 7 + be/src/olap/cumulative_compaction.cpp | 4 +- be/src/olap/data_dir.cpp | 18 +- be/src/olap/data_dir.h | 10 +- be/src/olap/delta_writer.cpp | 6 +- be/src/olap/fs/block_manager_metrics.cpp | 18 +- be/src/olap/memtable.cpp | 4 +- be/src/olap/memtable_flush_executor.h | 2 +- be/src/olap/olap_meta.cpp | 12 +- .../rowset/segment_v2/segment_iterator.cpp | 8 +- be/src/olap/storage_engine.cpp | 8 +- be/src/olap/tablet.cpp | 6 + be/src/olap/tablet.h | 4 + be/src/olap/tablet_manager.cpp | 18 +- be/src/olap/task/engine_alter_tablet_task.cpp | 4 +- be/src/olap/task/engine_batch_load_task.cpp | 18 +- .../task/engine_storage_migration_task.cpp | 2 +- be/src/runtime/client_cache.cpp | 14 +- be/src/runtime/client_cache.h | 6 +- be/src/runtime/fragment_mgr.cpp | 4 +- be/src/runtime/mem_pool.cpp | 6 +- be/src/runtime/memory/chunk_allocator.cpp | 38 ++-- 
be/src/runtime/memory/chunk_allocator.h | 2 +- .../stream_load/stream_load_executor.cpp | 12 +- be/src/runtime/tmp_file_mgr.cc | 10 +- be/src/runtime/tmp_file_mgr.h | 2 +- be/src/util/doris_metrics.cpp | 178 +++++++-------- be/src/util/doris_metrics.h | 214 +++++++++--------- be/src/util/metrics.cpp | 82 ++++--- be/src/util/metrics.h | 87 +++++-- be/src/util/system_metrics.cpp | 212 +++++++++-------- be/src/util/system_metrics.h | 1 + be/src/util/thrift_server.cpp | 10 +- be/src/util/thrift_server.h | 6 +- be/test/agent/cgroups_mgr_test.cpp | 10 + be/test/exprs/hybird_set_test.cpp | 2 + be/test/http/metrics_action_test.cpp | 22 +- be/test/olap/push_handler_test.cpp | 2 +- be/test/util/doris_metrics_test.cpp | 51 ++--- be/test/util/new_metrics_test.cpp | 98 ++++---- be/test/util/system_metrics_test.cpp | 8 +- 51 files changed, 729 insertions(+), 615 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index b557f3eeaf..1b437a63ae 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -263,14 +263,14 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) uint32_t try_time = 0; while (try_time < TASK_FINISH_MAX_RETRY) { - DorisMetrics::instance()->finish_task_requests_total.increment(1); + DorisMetrics::instance()->finish_task_requests_total->increment(1); AgentStatus client_status = _master_client->finish_task(finish_task_request, &result); if (client_status == DORIS_SUCCESS) { LOG(INFO) << "finish task success."; break; } else { - DorisMetrics::instance()->finish_task_requests_failed.increment(1); + DorisMetrics::instance()->finish_task_requests_failed->increment(1); LOG(WARNING) << "finish task failed. 
status_code=" << result.status.status_code; try_time += 1; } @@ -694,7 +694,7 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) { worker_pool_this->_tasks.pop_front(); } - DorisMetrics::instance()->publish_task_request_total.increment(1); + DorisMetrics::instance()->publish_task_request_total->increment(1); LOG(INFO) << "get publish version task, signature:" << agent_task_req.signature; Status st; @@ -718,7 +718,7 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) { TFinishTaskRequest finish_task_request; if (res != OLAP_SUCCESS) { - DorisMetrics::instance()->publish_task_failed_total.increment(1); + DorisMetrics::instance()->publish_task_failed_total->increment(1); // if publish failed, return failed, FE will ignore this error and // check error tablet ids and FE will also republish this task LOG(WARNING) << "publish version failed. signature:" << agent_task_req.signature @@ -890,7 +890,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { worker_pool_this->_tasks.pop_front(); } - DorisMetrics::instance()->clone_requests_total.increment(1); + DorisMetrics::instance()->clone_requests_total->increment(1); LOG(INFO) << "get clone task. signature:" << agent_task_req.signature; vector error_msgs; @@ -907,7 +907,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { TStatusCode::type status_code = TStatusCode::OK; if (status != DORIS_SUCCESS && status != DORIS_CREATE_TABLE_EXIST) { - DorisMetrics::instance()->clone_requests_failed.increment(1); + DorisMetrics::instance()->clone_requests_failed->increment(1); status_code = TStatusCode::RUNTIME_ERROR; LOG(WARNING) << "clone failed. 
signature: " << agent_task_req.signature; error_msgs.push_back("clone failed."); @@ -1050,12 +1050,12 @@ void* TaskWorkerPool::_report_task_worker_thread_callback(void* arg_this) { request.__set_tasks(_s_task_signatures); } - DorisMetrics::instance()->report_task_requests_total.increment(1); + DorisMetrics::instance()->report_task_requests_total->increment(1); TMasterResult result; AgentStatus status = worker_pool_this->_master_client->report(request, &result); if (status != DORIS_SUCCESS) { - DorisMetrics::instance()->report_task_requests_failed.increment(1); + DorisMetrics::instance()->report_task_requests_failed->increment(1); LOG(WARNING) << "finish report task failed. status:" << status << ", master host:" << worker_pool_this->_master_info.network_address.hostname << "port:" << worker_pool_this->_master_info.network_address.port; @@ -1103,12 +1103,12 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) } request.__set_disks(disks); - DorisMetrics::instance()->report_disk_requests_total.increment(1); + DorisMetrics::instance()->report_disk_requests_total->increment(1); TMasterResult result; AgentStatus status = worker_pool_this->_master_client->report(request, &result); if (status != DORIS_SUCCESS) { - DorisMetrics::instance()->report_disk_requests_failed.increment(1); + DorisMetrics::instance()->report_disk_requests_failed->increment(1); LOG(WARNING) << "finish report disk state failed. 
status:" << status << ", master host:" << worker_pool_this->_master_info.network_address.hostname << ", port:" << worker_pool_this->_master_info.network_address.port; @@ -1161,15 +1161,15 @@ void* TaskWorkerPool::_report_tablet_worker_thread_callback(void* arg_this) { #endif } int64_t max_compaction_score = - std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score.value(), - DorisMetrics::instance()->tablet_base_max_compaction_score.value()); + std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(), + DorisMetrics::instance()->tablet_base_max_compaction_score->value()); request.__set_tablet_max_compaction_score(max_compaction_score); TMasterResult result; status = worker_pool_this->_master_client->report(request, &result); if (status != DORIS_SUCCESS) { - DorisMetrics::instance()->report_all_tablets_requests_failed.increment(1); + DorisMetrics::instance()->report_all_tablets_requests_failed->increment(1); LOG(WARNING) << "finish report olap table state failed. status:" << status << ", master host:" << worker_pool_this->_master_info.network_address.hostname diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index f3516f308c..be99b69bc7 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -137,8 +137,8 @@ void* calculate_metrics(void* dummy) { if (last_ts == -1L) { last_ts = GetCurrentTimeMicros() / 1000; - lst_push_bytes = DorisMetrics::instance()->push_request_write_bytes.value(); - lst_query_bytes = DorisMetrics::instance()->query_scan_bytes.value(); + lst_push_bytes = DorisMetrics::instance()->push_request_write_bytes->value(); + lst_query_bytes = DorisMetrics::instance()->query_scan_bytes->value(); DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time); DorisMetrics::instance()->system_metrics()->get_network_traffic(&lst_net_send_bytes, &lst_net_receive_bytes); } else { @@ -147,21 +147,21 @@ void* calculate_metrics(void* dummy) { last_ts = current_ts; // 1. 
push bytes per second - int64_t current_push_bytes = DorisMetrics::instance()->push_request_write_bytes.value(); + int64_t current_push_bytes = DorisMetrics::instance()->push_request_write_bytes->value(); int64_t pps = (current_push_bytes - lst_push_bytes) / (interval + 1); - DorisMetrics::instance()->push_request_write_bytes_per_second.set_value( + DorisMetrics::instance()->push_request_write_bytes_per_second->set_value( pps < 0 ? 0 : pps); lst_push_bytes = current_push_bytes; // 2. query bytes per second - int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes.value(); + int64_t current_query_bytes = DorisMetrics::instance()->query_scan_bytes->value(); int64_t qps = (current_query_bytes - lst_query_bytes) / (interval + 1); - DorisMetrics::instance()->query_scan_bytes_per_second.set_value( + DorisMetrics::instance()->query_scan_bytes_per_second->set_value( qps < 0 ? 0 : qps); lst_query_bytes = current_query_bytes; // 3. max disk io util - DorisMetrics::instance()->max_disk_io_util_percent.set_value( + DorisMetrics::instance()->max_disk_io_util_percent->set_value( DorisMetrics::instance()->system_metrics()->get_max_io_util(lst_disks_io_time, 15)); // update lst map DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time); @@ -171,8 +171,8 @@ void* calculate_metrics(void* dummy) { int64_t max_receive = 0; DorisMetrics::instance()->system_metrics()->get_max_net_traffic( lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive); - DorisMetrics::instance()->max_network_send_bytes_rate.set_value(max_send); - DorisMetrics::instance()->max_network_receive_bytes_rate.set_value(max_receive); + DorisMetrics::instance()->max_network_send_bytes_rate->set_value(max_send); + DorisMetrics::instance()->max_network_receive_bytes_rate->set_value(max_receive); // update lst map DorisMetrics::instance()->system_metrics()->get_network_traffic(&lst_net_send_bytes, &lst_net_receive_bytes); } diff --git 
a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 38bbf7c6b5..aa3c3278e8 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -479,8 +479,11 @@ void OlapScanner::update_counter() { COUNTER_UPDATE(_parent->_filtered_segment_counter, _reader->stats().filtered_segment_number); COUNTER_UPDATE(_parent->_total_segment_counter, _reader->stats().total_segment_number); - DorisMetrics::instance()->query_scan_bytes.increment(_compressed_bytes_read); - DorisMetrics::instance()->query_scan_rows.increment(_raw_rows_read); + DorisMetrics::instance()->query_scan_bytes->increment(_compressed_bytes_read); + DorisMetrics::instance()->query_scan_rows->increment(_raw_rows_read); + + _tablet->query_scan_bytes->increment(_compressed_bytes_read); + _tablet->query_scan_rows->increment(_raw_rows_read); _has_update_counter = true; } diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 2845127525..47a078257b 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -604,8 +604,8 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { // the real 'num_rows_load_total' will be set when sink being closed. 
state->update_num_rows_load_total(input_batch->num_rows()); state->update_num_bytes_load_total(input_batch->total_byte_size()); - DorisMetrics::instance()->load_rows.increment(input_batch->num_rows()); - DorisMetrics::instance()->load_bytes.increment(input_batch->total_byte_size()); + DorisMetrics::instance()->load_rows->increment(input_batch->num_rows()); + DorisMetrics::instance()->load_bytes->increment(input_batch->total_byte_size()); RowBatch* batch = input_batch; if (!_output_expr_ctxs.empty()) { SCOPED_RAW_TIMER(&_convert_batch_ns); diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp index 505db91f9b..6b8e2ffedc 100644 --- a/be/src/http/action/compaction_action.cpp +++ b/be/src/http/action/compaction_action.cpp @@ -219,7 +219,7 @@ OLAPStatus CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet OLAPStatus res = base_compaction.compact(); if (res != OLAP_SUCCESS) { if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) { - DorisMetrics::instance()->base_compaction_request_failed.increment(1); + DorisMetrics::instance()->base_compaction_request_failed->increment(1); LOG(WARNING) << "failed to init base compaction. res=" << res << ", table=" << tablet->full_name(); } @@ -232,7 +232,7 @@ OLAPStatus CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet OLAPStatus res = cumulative_compaction.compact(); if (res != OLAP_SUCCESS) { if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) { - DorisMetrics::instance()->cumulative_compaction_request_failed.increment(1); + DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); LOG(WARNING) << "failed to do cumulative compaction. 
res=" << res << ", table=" << tablet->full_name(); } diff --git a/be/src/http/action/metrics_action.cpp b/be/src/http/action/metrics_action.cpp index 2ffca2f786..809bf14360 100644 --- a/be/src/http/action/metrics_action.cpp +++ b/be/src/http/action/metrics_action.cpp @@ -34,13 +34,14 @@ namespace doris { void MetricsAction::handle(HttpRequest* req) { const std::string& type = req->param("type"); + const std::string& with_tablet = req->param("with_tablet"); std::string str; if (type == "core") { str = _metric_registry->to_core_string(); } else if (type == "json") { - str = _metric_registry->to_json(); + str = _metric_registry->to_json(with_tablet == "true"); } else { - str = _metric_registry->to_prometheus(); + str = _metric_registry->to_prometheus(with_tablet == "true"); } req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4"); diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 4dc6253d54..ed65920388 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -89,15 +89,15 @@ static bool is_format_support_streaming(TFileFormatType::type format) { } StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) { - _stream_load_entity = DorisMetrics::instance()->metric_registry()->register_entity("stream_load", {}); - METRIC_REGISTER(_stream_load_entity, streaming_load_requests_total); - METRIC_REGISTER(_stream_load_entity, streaming_load_bytes); - METRIC_REGISTER(_stream_load_entity, streaming_load_duration_ms); - METRIC_REGISTER(_stream_load_entity, streaming_load_current_processing); + _stream_load_entity = DorisMetrics::instance()->metric_registry()->register_entity("stream_load"); + INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_requests_total); + INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_bytes); + INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_duration_ms); + 
INT_GAUGE_METRIC_REGISTER(_stream_load_entity, streaming_load_current_processing); } StreamLoadAction::~StreamLoadAction() { - DorisMetrics::instance()->metric_registry()->deregister_entity("stream_load"); + DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_entity); } void StreamLoadAction::handle(HttpRequest* req) { @@ -130,10 +130,10 @@ void StreamLoadAction::handle(HttpRequest* req) { HttpChannel::send_reply(req, str); // update statstics - streaming_load_requests_total.increment(1); - streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000); - streaming_load_bytes.increment(ctx->receive_bytes); - streaming_load_current_processing.increment(-1); + streaming_load_requests_total->increment(1); + streaming_load_duration_ms->increment(ctx->load_cost_nanos / 1000000); + streaming_load_bytes->increment(ctx->receive_bytes); + streaming_load_current_processing->increment(-1); } Status StreamLoadAction::_handle(StreamLoadContext* ctx) { @@ -165,7 +165,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) { } int StreamLoadAction::on_header(HttpRequest* req) { - streaming_load_current_processing.increment(1); + streaming_load_current_processing->increment(1); StreamLoadContext* ctx = new StreamLoadContext(_exec_env); ctx->ref(); @@ -196,7 +196,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { } auto str = ctx->to_json(); HttpChannel::send_reply(req, str); - streaming_load_current_processing.increment(-1); + streaming_load_current_processing->increment(-1); return -1; } return 0; diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index 1555981986..87ce0bf444 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -54,11 +54,11 @@ private: private: ExecEnv* _exec_env; - MetricEntity* _stream_load_entity; - IntCounter streaming_load_requests_total; - IntCounter streaming_load_bytes; - IntCounter streaming_load_duration_ms; - IntGauge 
streaming_load_current_processing; + std::shared_ptr _stream_load_entity; + IntCounter* streaming_load_requests_total; + IntCounter* streaming_load_bytes; + IntCounter* streaming_load_duration_ms; + IntGauge* streaming_load_current_processing; }; } diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 2b6343fbf6..bb2404ca3b 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -52,8 +52,8 @@ OLAPStatus BaseCompaction::compact() { _state = CompactionState::SUCCESS; // 4. add metric to base compaction - DorisMetrics::instance()->base_compaction_deltas_total.increment(_input_rowsets.size()); - DorisMetrics::instance()->base_compaction_bytes_total.increment(_input_rowsets_size); + DorisMetrics::instance()->base_compaction_deltas_total->increment(_input_rowsets.size()); + DorisMetrics::instance()->base_compaction_bytes_total->increment(_input_rowsets_size); TRACE("save base compaction metrics"); return OLAP_SUCCESS; diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index d544f3653e..507bbc6637 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -17,20 +17,34 @@ #include "olap/base_tablet.h" +#include "gutil/strings/substitute.h" #include "olap/data_dir.h" +#include "util/doris_metrics.h" #include "util/path_util.h" namespace doris { +extern MetricPrototype METRIC_query_scan_bytes; +extern MetricPrototype METRIC_query_scan_rows; + BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) : _state(tablet_meta->tablet_state()), _tablet_meta(tablet_meta), _schema(tablet_meta->tablet_schema()), _data_dir(data_dir) { _gen_tablet_path(); + + _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( + strings::Substitute("Tablet.$0", tablet_id()), + {{"tablet_id", std::to_string(tablet_id())}}, + MetricEntityType::kTablet); + INT_COUNTER_METRIC_REGISTER(_metric_entity, query_scan_bytes); + INT_COUNTER_METRIC_REGISTER(_metric_entity, 
query_scan_rows); } -BaseTablet::~BaseTablet() {} +BaseTablet::~BaseTablet() { + DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity); +} OLAPStatus BaseTablet::set_tablet_state(TabletState state) { if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) { diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index f3b0c2dfd6..6097d23957 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -23,6 +23,7 @@ #include "olap/olap_define.h" #include "olap/tablet_meta.h" #include "olap/utils.h" +#include "util/metrics.h" namespace doris { @@ -74,6 +75,12 @@ protected: DataDir* _data_dir; std::string _tablet_path; + // metrics of this tablet + std::shared_ptr _metric_entity = nullptr; +public: + IntCounter* query_scan_bytes; + IntCounter* query_scan_rows; + private: DISALLOW_COPY_AND_ASSIGN(BaseTablet); }; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index dcbe3d2be5..6611e5102b 100755 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -66,8 +66,8 @@ OLAPStatus CumulativeCompaction::compact() { << _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name(); // 6. 
add metric to cumulative compaction - DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(_input_rowsets.size()); - DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(_input_rowsets_size); + DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size()); + DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size); TRACE("save cumulative compaction metrics"); return OLAP_SUCCESS; diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp index 863f0654ce..82613eb646 100644 --- a/be/src/olap/data_dir.cpp +++ b/be/src/olap/data_dir.cpp @@ -79,14 +79,14 @@ DataDir::DataDir(const std::string& path, int64_t capacity_bytes, _current_shard(0), _meta(nullptr) { _data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(std::string("data_dir.") + path, {{"path", path}}); - METRIC_REGISTER(_data_dir_metric_entity, disks_total_capacity); - METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity); - METRIC_REGISTER(_data_dir_metric_entity, disks_data_used_capacity); - METRIC_REGISTER(_data_dir_metric_entity, disks_state); + INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_total_capacity); + INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_avail_capacity); + INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_data_used_capacity); + INT_GAUGE_METRIC_REGISTER(_data_dir_metric_entity, disks_state); } DataDir::~DataDir() { - DorisMetrics::instance()->metric_registry()->deregister_entity(std::string("data_dir.") + _path); + DorisMetrics::instance()->metric_registry()->deregister_entity(_data_dir_metric_entity); delete _id_generator; delete _meta; } @@ -314,7 +314,7 @@ void DataDir::health_check() { } } } - disks_state.set_value(_is_used ? 1 : 0); + disks_state->set_value(_is_used ? 
1 : 0); } OLAPStatus DataDir::_read_and_write_test_file() { @@ -937,8 +937,8 @@ Status DataDir::update_capacity() { "boost::filesystem::space failed"); } - disks_total_capacity.set_value(_disk_capacity_bytes); - disks_avail_capacity.set_value(_available_bytes); + disks_total_capacity->set_value(_disk_capacity_bytes); + disks_avail_capacity->set_value(_available_bytes); LOG(INFO) << "path: " << _path << " total capacity: " << _disk_capacity_bytes << ", available capacity: " << _available_bytes; @@ -946,7 +946,7 @@ Status DataDir::update_capacity() { } void DataDir::update_user_data_size(int64_t size) { - disks_data_used_capacity.set_value(size); + disks_data_used_capacity->set_value(size); } bool DataDir::reach_capacity_limit(int64_t incoming_data_size) { diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h index 18cc6124bd..be434de3df 100644 --- a/be/src/olap/data_dir.h +++ b/be/src/olap/data_dir.h @@ -192,11 +192,11 @@ private: // used in convert process bool _convert_old_data_success; - MetricEntity* _data_dir_metric_entity; - IntGauge disks_total_capacity; - IntGauge disks_avail_capacity; - IntGauge disks_data_used_capacity; - IntGauge disks_state; + std::shared_ptr _data_dir_metric_entity; + IntGauge* disks_total_capacity; + IntGauge* disks_avail_capacity; + IntGauge* disks_data_used_capacity; + IntGauge* disks_state; }; } // namespace doris diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 1960a16b41..b24a3410a5 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -60,6 +60,10 @@ DeltaWriter::~DeltaWriter() { if (_flush_token != nullptr) { // cancel and wait all memtables in flush queue to be finished _flush_token->cancel(); + + const FlushStatistic& stat = _flush_token->get_stats(); + _tablet->flush_bytes->increment(stat.flush_size_bytes); + _tablet->flush_count->increment(stat.flush_count); } if (_tablet != nullptr) { @@ -92,7 +96,7 @@ OLAPStatus DeltaWriter::init() { TabletManager* tablet_mgr = 
_storage_engine->tablet_manager(); _tablet = tablet_mgr->get_tablet(_req.tablet_id, _req.schema_hash); if (_tablet == nullptr) { - LOG(WARNING) << "fail to find tablet . tablet_id=" << _req.tablet_id + LOG(WARNING) << "fail to find tablet. tablet_id=" << _req.tablet_id << ", schema_hash=" << _req.schema_hash; return OLAP_ERR_TABLE_NOT_FOUND; } diff --git a/be/src/olap/fs/block_manager_metrics.cpp b/be/src/olap/fs/block_manager_metrics.cpp index dabc93d50a..0eba2510d9 100644 --- a/be/src/olap/fs/block_manager_metrics.cpp +++ b/be/src/olap/fs/block_manager_metrics.cpp @@ -24,16 +24,16 @@ namespace fs { namespace internal { BlockManagerMetrics::BlockManagerMetrics() { - blocks_open_reading = &DorisMetrics::instance()->blocks_open_reading; - blocks_open_writing = &DorisMetrics::instance()->blocks_open_writing; + blocks_open_reading = DorisMetrics::instance()->blocks_open_reading; + blocks_open_writing = DorisMetrics::instance()->blocks_open_writing; - total_readable_blocks = &DorisMetrics::instance()->readable_blocks_total; - total_writable_blocks = &DorisMetrics::instance()->writable_blocks_total; - total_blocks_created = &DorisMetrics::instance()->blocks_created_total; - total_blocks_deleted = &DorisMetrics::instance()->blocks_deleted_total; - total_bytes_read = &DorisMetrics::instance()->bytes_read_total; - total_bytes_written = &DorisMetrics::instance()->bytes_written_total; - total_disk_sync = &DorisMetrics::instance()->disk_sync_total; + total_readable_blocks = DorisMetrics::instance()->readable_blocks_total; + total_writable_blocks = DorisMetrics::instance()->writable_blocks_total; + total_blocks_created = DorisMetrics::instance()->blocks_created_total; + total_blocks_deleted = DorisMetrics::instance()->blocks_deleted_total; + total_bytes_read = DorisMetrics::instance()->bytes_read_total; + total_bytes_written = DorisMetrics::instance()->bytes_written_total; + total_disk_sync = DorisMetrics::instance()->disk_sync_total; } } // namespace internal diff --git 
a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 25a6a81912..135a70af9a 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -124,8 +124,8 @@ OLAPStatus MemTable::flush() { } RETURN_NOT_OK(_rowset_writer->flush()); } - DorisMetrics::instance()->memtable_flush_total.increment(1); - DorisMetrics::instance()->memtable_flush_duration_us.increment(duration_ns / 1000); + DorisMetrics::instance()->memtable_flush_total->increment(1); + DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); return OLAP_SUCCESS; } diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index b3d0cbeb47..cb789c46ce 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -35,7 +35,7 @@ class MemTable; // use atomic because it may be updated by multi threads struct FlushStatistic { int64_t flush_time_ns = 0; - int64_t flush_count= 0; + int64_t flush_count = 0; int64_t flush_size_bytes = 0; }; diff --git a/be/src/olap/olap_meta.cpp b/be/src/olap/olap_meta.cpp index a912f4fc07..13ab08686c 100755 --- a/be/src/olap/olap_meta.cpp +++ b/be/src/olap/olap_meta.cpp @@ -86,7 +86,7 @@ OLAPStatus OlapMeta::init() { } OLAPStatus OlapMeta::get(const int column_family_index, const std::string& key, std::string* value) { - DorisMetrics::instance()->meta_read_request_total.increment(1); + DorisMetrics::instance()->meta_read_request_total->increment(1); rocksdb::ColumnFamilyHandle* handle = _handles[column_family_index]; int64_t duration_ns = 0; rocksdb::Status s; @@ -94,7 +94,7 @@ OLAPStatus OlapMeta::get(const int column_family_index, const std::string& key, SCOPED_RAW_TIMER(&duration_ns); s = _db->Get(ReadOptions(), handle, Slice(key), value); } - DorisMetrics::instance()->meta_read_request_duration_us.increment(duration_ns / 1000); + DorisMetrics::instance()->meta_read_request_duration_us->increment(duration_ns / 1000); if (s.IsNotFound()) { return 
OLAP_ERR_META_KEY_NOT_FOUND; } else if (!s.ok()) { @@ -105,7 +105,7 @@ OLAPStatus OlapMeta::get(const int column_family_index, const std::string& key, } OLAPStatus OlapMeta::put(const int column_family_index, const std::string& key, const std::string& value) { - DorisMetrics::instance()->meta_write_request_total.increment(1); + DorisMetrics::instance()->meta_write_request_total->increment(1); rocksdb::ColumnFamilyHandle* handle = _handles[column_family_index]; int64_t duration_ns = 0; rocksdb::Status s; @@ -115,7 +115,7 @@ OLAPStatus OlapMeta::put(const int column_family_index, const std::string& key, write_options.sync = config::sync_tablet_meta; s = _db->Put(write_options, handle, Slice(key), Slice(value)); } - DorisMetrics::instance()->meta_write_request_duration_us.increment(duration_ns / 1000); + DorisMetrics::instance()->meta_write_request_duration_us->increment(duration_ns / 1000); if (!s.ok()) { LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString(); return OLAP_ERR_META_PUT; @@ -124,7 +124,7 @@ OLAPStatus OlapMeta::put(const int column_family_index, const std::string& key, } OLAPStatus OlapMeta::remove(const int column_family_index, const std::string& key) { - DorisMetrics::instance()->meta_write_request_total.increment(1); + DorisMetrics::instance()->meta_write_request_total->increment(1); rocksdb::ColumnFamilyHandle* handle = _handles[column_family_index]; rocksdb::Status s; int64_t duration_ns = 0; @@ -134,7 +134,7 @@ OLAPStatus OlapMeta::remove(const int column_family_index, const std::string& ke write_options.sync = config::sync_tablet_meta; s = _db->Delete(write_options, handle, Slice(key)); } - DorisMetrics::instance()->meta_write_request_duration_us.increment(duration_ns / 1000); + DorisMetrics::instance()->meta_write_request_duration_us->increment(duration_ns / 1000); if (!s.ok()) { LOG(WARNING) << "rocks db delete key:" << key << " failed, reason:" << s.ToString(); return OLAP_ERR_META_DELETE; diff --git 
a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index ec8bb47415..90d8e8e51c 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -124,7 +124,7 @@ Status SegmentIterator::init(const StorageReadOptions& opts) { } Status SegmentIterator::_init() { - DorisMetrics::instance()->segment_read_total.increment(1); + DorisMetrics::instance()->segment_read_total->increment(1); // get file handle from file descriptor of segment fs::BlockManager* block_mgr = fs::fs_util::block_manager(); RETURN_IF_ERROR(block_mgr->open_block(_segment->_fname, &_rblock)); @@ -139,7 +139,7 @@ Status SegmentIterator::_init() { } Status SegmentIterator::_get_row_ranges_by_keys() { - DorisMetrics::instance()->segment_row_total.increment(num_rows()); + DorisMetrics::instance()->segment_row_total->increment(num_rows()); // fast path for empty segment or empty key ranges if (_row_bitmap.isEmpty() || _opts.key_ranges.empty()) { @@ -169,7 +169,7 @@ Status SegmentIterator::_get_row_ranges_by_keys() { size_t pre_size = _row_bitmap.cardinality(); _row_bitmap = RowRanges::ranges_to_roaring(result_ranges); _opts.stats->rows_key_range_filtered += (pre_size - _row_bitmap.cardinality()); - DorisMetrics::instance()->segment_rows_by_short_key.increment(_row_bitmap.cardinality()); + DorisMetrics::instance()->segment_rows_by_short_key->increment(_row_bitmap.cardinality()); return Status::OK(); } @@ -290,7 +290,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row RowRanges::ranges_intersection(zone_map_row_ranges, delete_condition_row_ranges, &zone_map_row_ranges); } - DorisMetrics::instance()->segment_rows_read_by_zone_map.increment(zone_map_row_ranges.count()); + DorisMetrics::instance()->segment_rows_read_by_zone_map->increment(zone_map_row_ranges.count()); pre_size = condition_row_ranges->count(); RowRanges::ranges_intersection(*condition_row_ranges, 
zone_map_row_ranges, condition_row_ranges); _opts.stats->rows_stats_filtered += (pre_size - condition_row_ranges->count()); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 1a22e2cadf..b95e9d8598 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -561,7 +561,7 @@ void StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) { } TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id); - DorisMetrics::instance()->cumulative_compaction_request_total.increment(1); + DorisMetrics::instance()->cumulative_compaction_request_total->increment(1); std::string tracker_label = "cumulative compaction " + std::to_string(syscall(__NR_gettid)); CumulativeCompaction cumulative_compaction(best_tablet, tracker_label, _compaction_mem_tracker); @@ -570,7 +570,7 @@ void StorageEngine::_perform_cumulative_compaction(DataDir* data_dir) { if (res != OLAP_SUCCESS) { best_tablet->set_last_cumu_compaction_failure_time(UnixMillis()); if (res != OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS) { - DorisMetrics::instance()->cumulative_compaction_request_failed.increment(1); + DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); LOG(WARNING) << "failed to do cumulative compaction. 
res=" << res << ", table=" << best_tablet->full_name(); } @@ -597,7 +597,7 @@ void StorageEngine::_perform_base_compaction(DataDir* data_dir) { } TRACE("found best tablet $0", best_tablet->get_tablet_info().tablet_id); - DorisMetrics::instance()->base_compaction_request_total.increment(1); + DorisMetrics::instance()->base_compaction_request_total->increment(1); std::string tracker_label = "base compaction " + std::to_string(syscall(__NR_gettid)); BaseCompaction base_compaction(best_tablet, tracker_label, _compaction_mem_tracker); @@ -605,7 +605,7 @@ void StorageEngine::_perform_base_compaction(DataDir* data_dir) { if (res != OLAP_SUCCESS) { best_tablet->set_last_base_compaction_failure_time(UnixMillis()); if (res != OLAP_ERR_BE_NO_SUITABLE_VERSION) { - DorisMetrics::instance()->base_compaction_request_failed.increment(1); + DorisMetrics::instance()->base_compaction_request_failed->increment(1); LOG(WARNING) << "failed to init base compaction. res=" << res << ", table=" << best_tablet->full_name(); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 8f3ca8c125..58768942ea 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -49,6 +49,9 @@ using std::sort; using std::string; using std::vector; +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_count, MetricUnit::OPERATIONS); + TabletSharedPtr Tablet::create_tablet_from_meta(TabletMetaSharedPtr tablet_meta, DataDir* data_dir) { return std::make_shared(tablet_meta, data_dir); @@ -67,6 +70,9 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir, // construct _timestamped_versioned_tracker from rs and stale rs meta _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas()); + + INT_COUNTER_METRIC_REGISTER(_metric_entity, flush_bytes); + INT_COUNTER_METRIC_REGISTER(_metric_entity, flush_count); } OLAPStatus Tablet::_init_once_action() { diff --git 
a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 4f1ba22b92..c32daad3b5 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -301,6 +301,10 @@ private: std::unique_ptr _cumulative_compaction_policy; std::string _cumulative_compaction_type; DISALLOW_COPY_AND_ASSIGN(Tablet); + +public: + IntCounter* flush_bytes; + IntCounter* flush_count; }; inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() { diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 2290cf2058..6260f21631 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -207,7 +207,7 @@ bool TabletManager::_check_tablet_id_exist_unlocked(TTabletId tablet_id) { OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, std::vector stores) { - DorisMetrics::instance()->create_tablet_requests_total.increment(1); + DorisMetrics::instance()->create_tablet_requests_total->increment(1); int64_t tablet_id = request.tablet_id; int32_t schema_hash = request.tablet_schema.schema_hash; @@ -231,7 +231,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, } else { LOG(WARNING) << "fail to create tablet. tablet exist but with different schema_hash. 
" << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; - DorisMetrics::instance()->create_tablet_requests_failed.increment(1); + DorisMetrics::instance()->create_tablet_requests_failed->increment(1); return OLAP_ERR_CE_TABLET_ID_EXIST; } } @@ -247,7 +247,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, << "new_tablet_id=" << tablet_id << ", new_schema_hash=" << schema_hash << ", base_tablet_id=" << request.base_tablet_id << ", base_schema_hash=" << request.base_schema_hash; - DorisMetrics::instance()->create_tablet_requests_failed.increment(1); + DorisMetrics::instance()->create_tablet_requests_failed->increment(1); return OLAP_ERR_TABLE_CREATE_META_ERROR; } // If we are doing schema-change, we should use the same data dir @@ -262,7 +262,7 @@ OLAPStatus TabletManager::create_tablet(const TCreateTabletReq& request, AlterTabletType::SCHEMA_CHANGE, request, is_schema_change, base_tablet.get(), stores); if (tablet == nullptr) { LOG(WARNING) << "fail to create tablet. tablet_id=" << request.tablet_id; - DorisMetrics::instance()->create_tablet_requests_failed.increment(1); + DorisMetrics::instance()->create_tablet_requests_failed->increment(1); return OLAP_ERR_CE_CMD_PARAMS_ERROR; } @@ -459,7 +459,7 @@ OLAPStatus TabletManager::drop_tablet( OLAPStatus TabletManager::_drop_tablet_unlocked( TTabletId tablet_id, SchemaHash schema_hash, bool keep_files) { LOG(INFO) << "begin drop tablet. 
tablet_id=" << tablet_id << ", schema_hash=" << schema_hash; - DorisMetrics::instance()->drop_tablet_requests_total.increment(1); + DorisMetrics::instance()->drop_tablet_requests_total->increment(1); // Fetch tablet which need to be droped TabletSharedPtr to_drop_tablet = _get_tablet_unlocked(tablet_id, schema_hash); @@ -764,9 +764,9 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(CompactionType com // TODO(lingbin): Remove 'max' from metric name, it would be misunderstood as the // biggest in history(like peak), but it is really just the value at current moment. if (compaction_type == CompactionType::BASE_COMPACTION) { - DorisMetrics::instance()->tablet_base_max_compaction_score.set_value(highest_score); + DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(highest_score); } else { - DorisMetrics::instance()->tablet_cumulative_max_compaction_score.set_value(highest_score); + DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(highest_score); } } return best_tablet; @@ -906,7 +906,7 @@ void TabletManager::release_schema_change_lock(TTabletId tablet_id) { } OLAPStatus TabletManager::report_tablet_info(TTabletInfo* tablet_info) { - DorisMetrics::instance()->report_tablet_requests_total.increment(1); + DorisMetrics::instance()->report_tablet_requests_total->increment(1); LOG(INFO) << "begin to process report tablet info." 
<< "tablet_id=" << tablet_info->tablet_id << ", schema_hash=" << tablet_info->schema_hash; @@ -934,7 +934,7 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map* StorageEngine::instance()->txn_manager()->build_expire_txn_map(&expire_txn_map); LOG(INFO) << "find expired transactions for " << expire_txn_map.size() << " tablets"; - DorisMetrics::instance()->report_all_tablets_requests_total.increment(1); + DorisMetrics::instance()->report_all_tablets_requests_total->increment(1); for (int32 i = 0; i < _tablet_map_lock_shard_size; i++) { ReadLock rlock(&_tablet_map_lock_array[i]); diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index 2d768819a1..0e7e1172e4 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -33,7 +33,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request, _process_name(process_name) { } OLAPStatus EngineAlterTabletTask::execute() { - DorisMetrics::instance()->create_rollup_requests_total.increment(1); + DorisMetrics::instance()->create_rollup_requests_total->increment(1); SchemaChangeHandler handler; OLAPStatus res = handler.process_alter_tablet_v2(_alter_tablet_req); @@ -44,7 +44,7 @@ OLAPStatus EngineAlterTabletTask::execute() { << ", base_schema_hash=" << _alter_tablet_req.base_schema_hash << ", new_tablet_id=" << _alter_tablet_req.new_tablet_id << ", new_schema_hash=" << _alter_tablet_req.new_schema_hash; - DorisMetrics::instance()->create_rollup_requests_failed.increment(1); + DorisMetrics::instance()->create_rollup_requests_failed->increment(1); return res; } diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 7346f598d5..cf9c5a83cc 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -295,7 +295,7 @@ OLAPStatus EngineBatchLoadTask::_push(const TPushReq& request, if 
(tablet_info_vec == nullptr) { LOG(WARNING) << "invalid output parameter which is nullptr pointer."; - DorisMetrics::instance()->push_requests_fail_total.increment(1); + DorisMetrics::instance()->push_requests_fail_total->increment(1); return OLAP_ERR_CE_CMD_PARAMS_ERROR; } @@ -304,7 +304,7 @@ OLAPStatus EngineBatchLoadTask::_push(const TPushReq& request, if (tablet == nullptr) { LOG(WARNING) << "false to find tablet. tablet=" << request.tablet_id << ", schema_hash=" << request.schema_hash; - DorisMetrics::instance()->push_requests_fail_total.increment(1); + DorisMetrics::instance()->push_requests_fail_total->increment(1); return OLAP_ERR_TABLE_NOT_FOUND; } @@ -334,16 +334,16 @@ OLAPStatus EngineBatchLoadTask::_push(const TPushReq& request, << "transaction_id=" << request.transaction_id << " tablet=" << tablet->full_name() << ", cost=" << PrettyPrinter::print(duration_ns, TUnit::TIME_NS); - DorisMetrics::instance()->push_requests_fail_total.increment(1); + DorisMetrics::instance()->push_requests_fail_total->increment(1); } else { LOG(INFO) << "success to push delta, " << "transaction_id=" << request.transaction_id << " tablet=" << tablet->full_name() << ", cost=" << PrettyPrinter::print(duration_ns, TUnit::TIME_NS); - DorisMetrics::instance()->push_requests_success_total.increment(1); - DorisMetrics::instance()->push_request_duration_us.increment(duration_ns / 1000); - DorisMetrics::instance()->push_request_write_bytes.increment(push_handler.write_bytes()); - DorisMetrics::instance()->push_request_write_rows.increment(push_handler.write_rows()); + DorisMetrics::instance()->push_requests_success_total->increment(1); + DorisMetrics::instance()->push_request_duration_us->increment(duration_ns / 1000); + DorisMetrics::instance()->push_request_write_bytes->increment(push_handler.write_bytes()); + DorisMetrics::instance()->push_request_write_rows->increment(push_handler.write_rows()); } return res; } @@ -352,7 +352,7 @@ OLAPStatus EngineBatchLoadTask::_delete_data( const 
TPushReq& request, vector* tablet_info_vec) { LOG(INFO) << "begin to process delete data. request=" << ThriftDebugString(request); - DorisMetrics::instance()->delete_requests_total.increment(1); + DorisMetrics::instance()->delete_requests_total->increment(1); OLAPStatus res = OLAP_SUCCESS; @@ -381,7 +381,7 @@ OLAPStatus EngineBatchLoadTask::_delete_data( OLAP_LOG_WARNING("fail to push empty version for delete data. " "[res=%d tablet='%s']", res, tablet->full_name().c_str()); - DorisMetrics::instance()->delete_requests_failed.increment(1); + DorisMetrics::instance()->delete_requests_failed->increment(1); return res; } diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index dc19f348f3..c7ffa37452 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -42,7 +42,7 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate( LOG(INFO) << "begin to process storage media migrate. 
" << "tablet_id=" << tablet_id << ", schema_hash=" << schema_hash << ", dest_storage_medium=" << storage_medium; - DorisMetrics::instance()->storage_migrate_requests_total.increment(1); + DorisMetrics::instance()->storage_migrate_requests_total->increment(1); OLAPStatus res = OLAP_SUCCESS; TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash); diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp index cc5bc75ece..d93c22ed79 100644 --- a/be/src/runtime/client_cache.cpp +++ b/be/src/runtime/client_cache.cpp @@ -71,7 +71,7 @@ Status ClientCacheHelper::get_client( _client_map[*client_key]->set_recv_timeout(timeout_ms); if (_metrics_enabled) { - thrift_used_clients.increment(1); + thrift_used_clients->increment(1); } return Status::OK(); @@ -96,7 +96,7 @@ Status ClientCacheHelper::reopen_client(client_factory factory_method, void** cl *client_key = NULL; if (_metrics_enabled) { - thrift_opened_clients.increment(-1); + thrift_opened_clients->increment(-1); } RETURN_IF_ERROR(create_client(make_network_address( @@ -127,7 +127,7 @@ Status ClientCacheHelper::create_client( _client_map[*client_key] = client_impl.release(); if (_metrics_enabled) { - thrift_opened_clients.increment(1); + thrift_opened_clients->increment(1); } return Status::OK(); @@ -149,14 +149,14 @@ void ClientCacheHelper::release_client(void** client_key) { delete info; if (_metrics_enabled) { - thrift_opened_clients.increment(-1); + thrift_opened_clients->increment(-1); } } else { j->second.push_back(*client_key); } if (_metrics_enabled) { - thrift_used_clients.increment(-1); + thrift_used_clients->increment(-1); } *client_key = NULL; @@ -222,8 +222,8 @@ void ClientCacheHelper::init_metrics(const std::string& name) { _thrift_client_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( std::string("thrift_client.") + name, {{"name", name}}); - METRIC_REGISTER(_thrift_client_metric_entity, 
thrift_used_clients); - METRIC_REGISTER(_thrift_client_metric_entity, thrift_opened_clients); + INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_used_clients); + INT_GAUGE_METRIC_REGISTER(_thrift_client_metric_entity, thrift_opened_clients); _metrics_enabled = true; } diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h index 0af2ae5eef..fbcaede544 100644 --- a/be/src/runtime/client_cache.h +++ b/be/src/runtime/client_cache.h @@ -114,13 +114,13 @@ private: // max connections per host in this cache, -1 means unlimited int _max_cache_size_per_host; - MetricEntity* _thrift_client_metric_entity; + std::shared_ptr _thrift_client_metric_entity; // Number of clients 'checked-out' from the cache - IntGauge thrift_used_clients; + IntGauge* thrift_used_clients; // Total clients in the cache, including those in use - IntGauge thrift_opened_clients; + IntGauge* thrift_opened_clients; // Create a new client for specific host/port in 'client' and put it in _client_map Status create_client(const TNetworkAddress& hostport, client_factory factory_method, diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index c3bf83bb86..2b12ad4bb4 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -220,8 +220,8 @@ Status FragmentExecState::execute() { print_id(_fragment_instance_id))); _executor.close(); } - DorisMetrics::instance()->fragment_requests_total.increment(1); - DorisMetrics::instance()->fragment_request_duration_us.increment(duration_ns / 1000); + DorisMetrics::instance()->fragment_requests_total->increment(1); + DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000); return Status::OK(); } diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 4232cde200..799ec1a7b6 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -40,7 +40,7 @@ uint32_t MemPool::k_zero_length_region_ alignas(std::max_align_t) = 
MEM_POOL_POI MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_bytes(0) { - DorisMetrics::instance()->memory_pool_bytes_total.increment(chunk.size); + DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size); } MemPool::~MemPool() { @@ -50,7 +50,7 @@ MemPool::~MemPool() { ChunkAllocator::instance()->free(chunk.chunk); } mem_tracker_->Release(total_bytes_released); - DorisMetrics::instance()->memory_pool_bytes_total.increment(-total_bytes_released); + DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } void MemPool::clear() { @@ -76,7 +76,7 @@ void MemPool::free_all() { total_reserved_bytes_ = 0; mem_tracker_->Release(total_bytes_released); - DorisMetrics::instance()->memory_pool_bytes_total.increment(-total_bytes_released); + DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } bool MemPool::find_chunk(size_t min_size, bool check_limits) { diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index 5fbc431dc6..22654b13f9 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -41,12 +41,12 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_free_count, MetricUnit::N DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_alloc_cost_ns, MetricUnit::NANOSECONDS); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_system_free_cost_ns, MetricUnit::NANOSECONDS); -static IntCounter chunk_pool_local_core_alloc_count; -static IntCounter chunk_pool_other_core_alloc_count; -static IntCounter chunk_pool_system_alloc_count; -static IntCounter chunk_pool_system_free_count; -static IntCounter chunk_pool_system_alloc_cost_ns; -static IntCounter chunk_pool_system_free_cost_ns; +static IntCounter* chunk_pool_local_core_alloc_count; +static IntCounter* chunk_pool_other_core_alloc_count; +static IntCounter* chunk_pool_system_alloc_count; +static IntCounter* 
chunk_pool_system_free_count; +static IntCounter* chunk_pool_system_alloc_cost_ns; +static IntCounter* chunk_pool_system_free_cost_ns; #ifdef BE_TEST static std::mutex s_mutex; @@ -118,13 +118,13 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit) _arenas[i].reset(new ChunkArena()); } - _chunk_allocator_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity("chunk_allocator", {}); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_local_core_alloc_count); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_other_core_alloc_count); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_count); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_count); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_cost_ns); - METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_cost_ns); + _chunk_allocator_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity("chunk_allocator"); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_local_core_alloc_count); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_other_core_alloc_count); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_count); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_count); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_alloc_cost_ns); + INT_COUNTER_METRIC_REGISTER(_chunk_allocator_metric_entity, chunk_pool_system_free_cost_ns); } bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { @@ -135,7 +135,7 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { if (_arenas[core_id]->pop_free_chunk(size, &chunk->data)) { _reserved_bytes.fetch_sub(size); - chunk_pool_local_core_alloc_count.increment(1); + chunk_pool_local_core_alloc_count->increment(1); return true; } if (_reserved_bytes > size) { @@ -144,7 +144,7 
@@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { for (int i = 1; i < _arenas.size(); ++i, ++core_id) { if (_arenas[core_id % _arenas.size()]->pop_free_chunk(size, &chunk->data)) { _reserved_bytes.fetch_sub(size); - chunk_pool_other_core_alloc_count.increment(1); + chunk_pool_other_core_alloc_count->increment(1); // reset chunk's core_id to other chunk->core_id = core_id % _arenas.size(); return true; @@ -158,8 +158,8 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) { // allocate from system allocator chunk->data = SystemAllocator::allocate(size); } - chunk_pool_system_alloc_count.increment(1); - chunk_pool_system_alloc_cost_ns.increment(cost_ns); + chunk_pool_system_alloc_count->increment(1); + chunk_pool_system_alloc_cost_ns->increment(cost_ns); if (chunk->data == nullptr) { return false; } @@ -177,8 +177,8 @@ void ChunkAllocator::free(const Chunk& chunk) { SCOPED_RAW_TIMER(&cost_ns); SystemAllocator::free(chunk.data, chunk.size); } - chunk_pool_system_free_count.increment(1); - chunk_pool_system_free_cost_ns.increment(cost_ns); + chunk_pool_system_free_count->increment(1); + chunk_pool_system_free_cost_ns->increment(cost_ns); return; } diff --git a/be/src/runtime/memory/chunk_allocator.h b/be/src/runtime/memory/chunk_allocator.h index b4bb7fa90a..38f84bbf1f 100644 --- a/be/src/runtime/memory/chunk_allocator.h +++ b/be/src/runtime/memory/chunk_allocator.h @@ -77,7 +77,7 @@ private: // each core has a ChunkArena std::vector> _arenas; - MetricEntity* _chunk_allocator_metric_entity; + std::shared_ptr _chunk_allocator_metric_entity; }; } diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 1e5d4a196d..13dad53cfd 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -43,7 +43,7 @@ Status k_stream_load_plan_status; #endif Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { - 
DorisMetrics::instance()->txn_exec_plan_total.increment(1); + DorisMetrics::instance()->txn_exec_plan_total->increment(1); // submit this params #ifndef BE_TEST ctx->ref(); @@ -79,8 +79,8 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { } if (status.ok()) { - DorisMetrics::instance()->stream_receive_bytes_total.increment(ctx->receive_bytes); - DorisMetrics::instance()->stream_load_rows_total.increment(ctx->number_loaded_rows); + DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes); + DorisMetrics::instance()->stream_load_rows_total->increment(ctx->number_loaded_rows); } } else { LOG(WARNING) << "fragment execute failed" @@ -121,7 +121,7 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { } Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { - DorisMetrics::instance()->txn_begin_request_total.increment(1); + DorisMetrics::instance()->txn_begin_request_total->increment(1); TLoadTxnBeginRequest request; set_request_auth(&request, ctx->auth); @@ -162,7 +162,7 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { } Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { - DorisMetrics::instance()->txn_commit_request_total.increment(1); + DorisMetrics::instance()->txn_commit_request_total->increment(1); TLoadTxnCommitRequest request; set_request_auth(&request, ctx->auth); @@ -211,7 +211,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { } void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { - DorisMetrics::instance()->txn_rollback_request_total.increment(1); + DorisMetrics::instance()->txn_rollback_request_total->increment(1); TNetworkAddress master_addr = _exec_env->master_info()->network_address; TLoadTxnRollbackRequest request; diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc index a2c71bcc48..4e592a8a31 100644 --- a/be/src/runtime/tmp_file_mgr.cc +++ b/be/src/runtime/tmp_file_mgr.cc @@ -54,15 
+54,15 @@ const uint64_t _s_available_space_threshold_mb = 1024; TmpFileMgr::TmpFileMgr(ExecEnv* exec_env) : _exec_env(exec_env), _initialized(false), _dir_status_lock(), _tmp_dirs() { - METRIC_REGISTER(DorisMetrics::instance()->metric_registry()->get_entity("server"), active_scratch_dirs); + INT_GAUGE_METRIC_REGISTER(DorisMetrics::instance()->server_entity(), active_scratch_dirs); } TmpFileMgr::TmpFileMgr() { - METRIC_REGISTER(DorisMetrics::instance()->metric_registry()->get_entity("server"), active_scratch_dirs); + INT_GAUGE_METRIC_REGISTER(DorisMetrics::instance()->server_entity(), active_scratch_dirs); } TmpFileMgr::~TmpFileMgr() { - METRIC_DEREGISTER(DorisMetrics::instance()->metric_registry()->get_entity("server"), active_scratch_dirs); + METRIC_DEREGISTER(DorisMetrics::instance()->server_entity(), active_scratch_dirs); } Status TmpFileMgr::init() { @@ -124,7 +124,7 @@ Status TmpFileMgr::init_custom(const vector& tmp_dirs, bool one_dir_per_ } } - active_scratch_dirs.set_value(_tmp_dirs.size()); + active_scratch_dirs->set_value(_tmp_dirs.size()); _initialized = true; @@ -176,7 +176,7 @@ void TmpFileMgr::blacklist_device(DeviceId device_id) { added = _tmp_dirs[device_id].blacklist(); } if (added) { - active_scratch_dirs.increment(-1); + active_scratch_dirs->increment(-1); } } diff --git a/be/src/runtime/tmp_file_mgr.h b/be/src/runtime/tmp_file_mgr.h index d7ad149b45..b19250a81f 100644 --- a/be/src/runtime/tmp_file_mgr.h +++ b/be/src/runtime/tmp_file_mgr.h @@ -192,7 +192,7 @@ private: std::vector _tmp_dirs; // Metric to track active scratch directories. 
- IntGauge active_scratch_dirs; + IntGauge* active_scratch_dirs; }; } // end namespace doris diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index f9169c73e4..023074a66e 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -131,103 +131,103 @@ const std::string DorisMetrics::_s_registry_name = "doris_be"; const std::string DorisMetrics::_s_hook_name = "doris_metrics"; DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { - _server_metric_entity = _metric_registry.register_entity("server", {}); + _server_metric_entity = _metric_registry.register_entity("server"); - METRIC_REGISTER(_server_metric_entity, fragment_requests_total); - METRIC_REGISTER(_server_metric_entity, fragment_request_duration_us); - METRIC_REGISTER(_server_metric_entity, http_requests_total); - METRIC_REGISTER(_server_metric_entity, http_request_send_bytes); - METRIC_REGISTER(_server_metric_entity, query_scan_bytes); - METRIC_REGISTER(_server_metric_entity, query_scan_rows); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, fragment_request_duration_us); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, http_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, http_request_send_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, query_scan_rows); - METRIC_REGISTER(_server_metric_entity, push_requests_success_total); - METRIC_REGISTER(_server_metric_entity, push_requests_fail_total); - METRIC_REGISTER(_server_metric_entity, push_request_duration_us); - METRIC_REGISTER(_server_metric_entity, push_request_write_bytes); - METRIC_REGISTER(_server_metric_entity, push_request_write_rows); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_requests_success_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_requests_fail_total); + 
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_request_duration_us); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_request_write_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, push_request_write_rows); // engine_requests_total - METRIC_REGISTER(_server_metric_entity, create_tablet_requests_total); - METRIC_REGISTER(_server_metric_entity, create_tablet_requests_failed); - METRIC_REGISTER(_server_metric_entity, drop_tablet_requests_total); - METRIC_REGISTER(_server_metric_entity, report_all_tablets_requests_total); - METRIC_REGISTER(_server_metric_entity, report_all_tablets_requests_failed); - METRIC_REGISTER(_server_metric_entity, report_tablet_requests_total); - METRIC_REGISTER(_server_metric_entity, report_tablet_requests_failed); - METRIC_REGISTER(_server_metric_entity, report_disk_requests_total); - METRIC_REGISTER(_server_metric_entity, report_disk_requests_failed); - METRIC_REGISTER(_server_metric_entity, report_task_requests_total); - METRIC_REGISTER(_server_metric_entity, report_task_requests_failed); - METRIC_REGISTER(_server_metric_entity, schema_change_requests_total); - METRIC_REGISTER(_server_metric_entity, schema_change_requests_failed); - METRIC_REGISTER(_server_metric_entity, create_rollup_requests_total); - METRIC_REGISTER(_server_metric_entity, create_rollup_requests_failed); - METRIC_REGISTER(_server_metric_entity, storage_migrate_requests_total); - METRIC_REGISTER(_server_metric_entity, delete_requests_total); - METRIC_REGISTER(_server_metric_entity, delete_requests_failed); - METRIC_REGISTER(_server_metric_entity, clone_requests_total); - METRIC_REGISTER(_server_metric_entity, clone_requests_failed); - METRIC_REGISTER(_server_metric_entity, finish_task_requests_total); - METRIC_REGISTER(_server_metric_entity, finish_task_requests_failed); - METRIC_REGISTER(_server_metric_entity, base_compaction_request_total); - METRIC_REGISTER(_server_metric_entity, base_compaction_request_failed); - 
METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_total); - METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_failed); - METRIC_REGISTER(_server_metric_entity, publish_task_request_total); - METRIC_REGISTER(_server_metric_entity, publish_task_failed_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_tablet_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_tablet_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, drop_tablet_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_all_tablets_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_all_tablets_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_tablet_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_tablet_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_disk_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_disk_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_task_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, report_task_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, schema_change_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, schema_change_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_rollup_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_rollup_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, storage_migrate_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, delete_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, delete_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, clone_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, clone_requests_failed); + 
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, finish_task_requests_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, finish_task_requests_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_request_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_request_failed); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, publish_task_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, publish_task_failed_total); - METRIC_REGISTER(_server_metric_entity, base_compaction_deltas_total); - METRIC_REGISTER(_server_metric_entity, base_compaction_bytes_total); - METRIC_REGISTER(_server_metric_entity, cumulative_compaction_deltas_total); - METRIC_REGISTER(_server_metric_entity, cumulative_compaction_bytes_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_deltas_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, base_compaction_bytes_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_deltas_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, cumulative_compaction_bytes_total); - METRIC_REGISTER(_server_metric_entity, meta_write_request_total); - METRIC_REGISTER(_server_metric_entity, meta_write_request_duration_us); - METRIC_REGISTER(_server_metric_entity, meta_read_request_total); - METRIC_REGISTER(_server_metric_entity, meta_read_request_duration_us); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, meta_write_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, meta_write_request_duration_us); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, meta_read_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, meta_read_request_duration_us); - METRIC_REGISTER(_server_metric_entity, segment_read_total); - 
METRIC_REGISTER(_server_metric_entity, segment_row_total); - METRIC_REGISTER(_server_metric_entity, segment_rows_by_short_key); - METRIC_REGISTER(_server_metric_entity, segment_rows_read_by_zone_map); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_read_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_row_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_rows_by_short_key); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_rows_read_by_zone_map); - METRIC_REGISTER(_server_metric_entity, txn_begin_request_total); - METRIC_REGISTER(_server_metric_entity, txn_commit_request_total); - METRIC_REGISTER(_server_metric_entity, txn_rollback_request_total); - METRIC_REGISTER(_server_metric_entity, txn_exec_plan_total); - METRIC_REGISTER(_server_metric_entity, stream_receive_bytes_total); - METRIC_REGISTER(_server_metric_entity, stream_load_rows_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, txn_begin_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, txn_commit_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, txn_rollback_request_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, txn_exec_plan_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, stream_receive_bytes_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, stream_load_rows_total); - METRIC_REGISTER(_server_metric_entity, memtable_flush_total); - METRIC_REGISTER(_server_metric_entity, memtable_flush_duration_us); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, memtable_flush_duration_us); - METRIC_REGISTER(_server_metric_entity, memory_pool_bytes_total); - METRIC_REGISTER(_server_metric_entity, process_thread_num); - METRIC_REGISTER(_server_metric_entity, process_fd_num_used); - METRIC_REGISTER(_server_metric_entity, process_fd_num_limit_soft); - METRIC_REGISTER(_server_metric_entity, 
process_fd_num_limit_hard); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, memory_pool_bytes_total); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, process_thread_num); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, process_fd_num_used); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, process_fd_num_limit_soft); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, process_fd_num_limit_hard); - METRIC_REGISTER(_server_metric_entity, tablet_cumulative_max_compaction_score); - METRIC_REGISTER(_server_metric_entity, tablet_base_max_compaction_score); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_cumulative_max_compaction_score); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_base_max_compaction_score); - METRIC_REGISTER(_server_metric_entity, push_request_write_bytes_per_second); - METRIC_REGISTER(_server_metric_entity, query_scan_bytes_per_second); - METRIC_REGISTER(_server_metric_entity, max_disk_io_util_percent); - METRIC_REGISTER(_server_metric_entity, max_network_send_bytes_rate); - METRIC_REGISTER(_server_metric_entity, max_network_receive_bytes_rate); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, push_request_write_bytes_per_second); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, query_scan_bytes_per_second); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_disk_io_util_percent); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_network_send_bytes_rate); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_network_receive_bytes_rate); - METRIC_REGISTER(_server_metric_entity, readable_blocks_total); - METRIC_REGISTER(_server_metric_entity, writable_blocks_total); - METRIC_REGISTER(_server_metric_entity, blocks_created_total); - METRIC_REGISTER(_server_metric_entity, blocks_deleted_total); - METRIC_REGISTER(_server_metric_entity, bytes_read_total); - METRIC_REGISTER(_server_metric_entity, bytes_written_total); - METRIC_REGISTER(_server_metric_entity, disk_sync_total); - 
METRIC_REGISTER(_server_metric_entity, blocks_open_reading); - METRIC_REGISTER(_server_metric_entity, blocks_open_writing); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, readable_blocks_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, writable_blocks_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, blocks_created_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, blocks_deleted_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, bytes_read_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, bytes_written_total); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, disk_sync_total); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, blocks_open_reading); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, blocks_open_writing); - METRIC_REGISTER(_server_metric_entity, load_rows); - METRIC_REGISTER(_server_metric_entity, load_bytes); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_rows); + INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes); _server_metric_entity->register_hook(_s_hook_name, std::bind(&DorisMetrics::_update, this)); } @@ -257,11 +257,11 @@ void DorisMetrics::_update_process_thread_num() { Status st = FileUtils::get_children_count(Env::Default(), ss.str(), &count); if (!st.ok()) { LOG(WARNING) << "failed to count thread num from: " << ss.str(); - process_thread_num.set_value(0); + process_thread_num->set_value(0); return; } - process_thread_num.set_value(count); + process_thread_num->set_value(count); } // get num of file descriptor of doris_be process @@ -275,10 +275,10 @@ void DorisMetrics::_update_process_fd_num() { Status st = FileUtils::get_children_count(Env::Default(), ss.str(), &count); if (!st.ok()) { LOG(WARNING) << "failed to count fd from: " << ss.str(); - process_fd_num_used.set_value(0); + process_fd_num_used->set_value(0); return; } - process_fd_num_used.set_value(count); + process_fd_num_used->set_value(count); // fd limits std::stringstream ss2; @@ -301,8 
+301,8 @@ void DorisMetrics::_update_process_fd_num() { int num = sscanf(line_ptr, "Max open files %" PRId64 " %" PRId64, &values[0], &values[1]); if (num == 2) { - process_fd_num_limit_soft.set_value(values[0]); - process_fd_num_limit_hard.set_value(values[1]); + process_fd_num_limit_soft->set_value(values[0]); + process_fd_num_limit_hard->set_value(values[1]); break; } } diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 0b5a8bd78f..3ffbe746d8 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -28,146 +28,148 @@ namespace doris { -#define REGISTER_HOOK_METRIC(name, func) \ - DorisMetrics::instance()->server_entity()->register_metric(&METRIC_##name, &DorisMetrics::instance()->name); \ - DorisMetrics::instance()->server_entity()->register_hook(#name, [&]() { \ - DorisMetrics::instance()->name.set_value(func()); \ +#define REGISTER_HOOK_METRIC(metric, func) \ + DorisMetrics::instance()->metric = \ + (UIntGauge*)(DorisMetrics::instance()->server_entity()-> \ + register_metric(&METRIC_##metric)); \ + DorisMetrics::instance()->server_entity()->register_hook(#metric, [&]() { \ + DorisMetrics::instance()->metric->set_value(func()); \ }); -#define DEREGISTER_HOOK_METRIC(name) \ - DorisMetrics::instance()->server_entity()->deregister_metric(&METRIC_##name); \ +#define DEREGISTER_HOOK_METRIC(name) \ + DorisMetrics::instance()->server_entity()->deregister_metric(&METRIC_##name); \ DorisMetrics::instance()->server_entity()->deregister_hook(#name); class DorisMetrics { public: - IntCounter fragment_requests_total; - IntCounter fragment_request_duration_us; - IntCounter http_requests_total; - IntCounter http_request_send_bytes; - IntCounter query_scan_bytes; - IntCounter query_scan_rows; + IntCounter* fragment_requests_total; + IntCounter* fragment_request_duration_us; + IntCounter* http_requests_total; + IntCounter* http_request_send_bytes; + IntCounter* query_scan_bytes; + IntCounter* query_scan_rows; - IntCounter 
push_requests_success_total; - IntCounter push_requests_fail_total; - IntCounter push_request_duration_us; - IntCounter push_request_write_bytes; - IntCounter push_request_write_rows; - IntCounter create_tablet_requests_total; - IntCounter create_tablet_requests_failed; - IntCounter drop_tablet_requests_total; + IntCounter* push_requests_success_total; + IntCounter* push_requests_fail_total; + IntCounter* push_request_duration_us; + IntCounter* push_request_write_bytes; + IntCounter* push_request_write_rows; + IntCounter* create_tablet_requests_total; + IntCounter* create_tablet_requests_failed; + IntCounter* drop_tablet_requests_total; - IntCounter report_all_tablets_requests_total; - IntCounter report_all_tablets_requests_failed; - IntCounter report_tablet_requests_total; - IntCounter report_tablet_requests_failed; - IntCounter report_disk_requests_total; - IntCounter report_disk_requests_failed; - IntCounter report_task_requests_total; - IntCounter report_task_requests_failed; + IntCounter* report_all_tablets_requests_total; + IntCounter* report_all_tablets_requests_failed; + IntCounter* report_tablet_requests_total; + IntCounter* report_tablet_requests_failed; + IntCounter* report_disk_requests_total; + IntCounter* report_disk_requests_failed; + IntCounter* report_task_requests_total; + IntCounter* report_task_requests_failed; - IntCounter schema_change_requests_total; - IntCounter schema_change_requests_failed; - IntCounter create_rollup_requests_total; - IntCounter create_rollup_requests_failed; - IntCounter storage_migrate_requests_total; - IntCounter delete_requests_total; - IntCounter delete_requests_failed; - IntCounter clone_requests_total; - IntCounter clone_requests_failed; + IntCounter* schema_change_requests_total; + IntCounter* schema_change_requests_failed; + IntCounter* create_rollup_requests_total; + IntCounter* create_rollup_requests_failed; + IntCounter* storage_migrate_requests_total; + IntCounter* delete_requests_total; + IntCounter* 
delete_requests_failed; + IntCounter* clone_requests_total; + IntCounter* clone_requests_failed; - IntCounter finish_task_requests_total; - IntCounter finish_task_requests_failed; + IntCounter* finish_task_requests_total; + IntCounter* finish_task_requests_failed; - IntCounter base_compaction_request_total; - IntCounter base_compaction_request_failed; - IntCounter cumulative_compaction_request_total; - IntCounter cumulative_compaction_request_failed; + IntCounter* base_compaction_request_total; + IntCounter* base_compaction_request_failed; + IntCounter* cumulative_compaction_request_total; + IntCounter* cumulative_compaction_request_failed; - IntCounter base_compaction_deltas_total; - IntCounter base_compaction_bytes_total; - IntCounter cumulative_compaction_deltas_total; - IntCounter cumulative_compaction_bytes_total; + IntCounter* base_compaction_deltas_total; + IntCounter* base_compaction_bytes_total; + IntCounter* cumulative_compaction_deltas_total; + IntCounter* cumulative_compaction_bytes_total; - IntCounter publish_task_request_total; - IntCounter publish_task_failed_total; + IntCounter* publish_task_request_total; + IntCounter* publish_task_failed_total; - IntCounter meta_write_request_total; - IntCounter meta_write_request_duration_us; - IntCounter meta_read_request_total; - IntCounter meta_read_request_duration_us; + IntCounter* meta_write_request_total; + IntCounter* meta_write_request_duration_us; + IntCounter* meta_read_request_total; + IntCounter* meta_read_request_duration_us; // Counters for segment_v2 // ----------------------- // total number of segments read - IntCounter segment_read_total; + IntCounter* segment_read_total; // total number of rows in queried segments (before index pruning) - IntCounter segment_row_total; + IntCounter* segment_row_total; // total number of rows selected by short key index - IntCounter segment_rows_by_short_key; + IntCounter* segment_rows_by_short_key; // total number of rows selected by zone map index - IntCounter 
segment_rows_read_by_zone_map; + IntCounter* segment_rows_read_by_zone_map; - IntCounter txn_begin_request_total; - IntCounter txn_commit_request_total; - IntCounter txn_rollback_request_total; - IntCounter txn_exec_plan_total; - IntCounter stream_receive_bytes_total; - IntCounter stream_load_rows_total; - IntCounter load_rows; - IntCounter load_bytes; + IntCounter* txn_begin_request_total; + IntCounter* txn_commit_request_total; + IntCounter* txn_rollback_request_total; + IntCounter* txn_exec_plan_total; + IntCounter* stream_receive_bytes_total; + IntCounter* stream_load_rows_total; + IntCounter* load_rows; + IntCounter* load_bytes; - IntCounter memtable_flush_total; - IntCounter memtable_flush_duration_us; + IntCounter* memtable_flush_total; + IntCounter* memtable_flush_duration_us; - IntGauge memory_pool_bytes_total; - IntGauge process_thread_num; - IntGauge process_fd_num_used; - IntGauge process_fd_num_limit_soft; - IntGauge process_fd_num_limit_hard; + IntGauge* memory_pool_bytes_total; + IntGauge* process_thread_num; + IntGauge* process_fd_num_used; + IntGauge* process_fd_num_limit_soft; + IntGauge* process_fd_num_limit_hard; // the max compaction score of all tablets. // Record base and cumulative scores separately, because // we need to get the larger of the two. 
- IntGauge tablet_cumulative_max_compaction_score; - IntGauge tablet_base_max_compaction_score; + IntGauge* tablet_cumulative_max_compaction_score; + IntGauge* tablet_base_max_compaction_score; // The following metrics will be calculated // by metric calculator - IntGauge push_request_write_bytes_per_second; - IntGauge query_scan_bytes_per_second; - IntGauge max_disk_io_util_percent; - IntGauge max_network_send_bytes_rate; - IntGauge max_network_receive_bytes_rate; + IntGauge* push_request_write_bytes_per_second; + IntGauge* query_scan_bytes_per_second; + IntGauge* max_disk_io_util_percent; + IntGauge* max_network_send_bytes_rate; + IntGauge* max_network_receive_bytes_rate; // Metrics related with BlockManager - IntCounter readable_blocks_total; - IntCounter writable_blocks_total; - IntCounter blocks_created_total; - IntCounter blocks_deleted_total; - IntCounter bytes_read_total; - IntCounter bytes_written_total; - IntCounter disk_sync_total; - IntGauge blocks_open_reading; - IntGauge blocks_open_writing; + IntCounter* readable_blocks_total; + IntCounter* writable_blocks_total; + IntCounter* blocks_created_total; + IntCounter* blocks_deleted_total; + IntCounter* bytes_read_total; + IntCounter* bytes_written_total; + IntCounter* disk_sync_total; + IntGauge* blocks_open_reading; + IntGauge* blocks_open_writing; // Size of some global containers - UIntGauge rowset_count_generated_and_in_use; - UIntGauge unused_rowsets_count; - UIntGauge broker_count; - UIntGauge data_stream_receiver_count; - UIntGauge fragment_endpoint_count; - UIntGauge active_scan_context_count; - UIntGauge plan_fragment_count; - UIntGauge load_channel_count; - UIntGauge result_buffer_block_count; - UIntGauge result_block_queue_count; - UIntGauge routine_load_task_count; - UIntGauge small_file_cache_count; - UIntGauge stream_load_pipe_count; - UIntGauge brpc_endpoint_stub_count; - UIntGauge tablet_writer_count; + UIntGauge* rowset_count_generated_and_in_use; + UIntGauge* unused_rowsets_count; + 
UIntGauge* broker_count; + UIntGauge* data_stream_receiver_count; + UIntGauge* fragment_endpoint_count; + UIntGauge* active_scan_context_count; + UIntGauge* plan_fragment_count; + UIntGauge* load_channel_count; + UIntGauge* result_buffer_block_count; + UIntGauge* result_block_queue_count; + UIntGauge* routine_load_task_count; + UIntGauge* small_file_cache_count; + UIntGauge* stream_load_pipe_count; + UIntGauge* brpc_endpoint_stub_count; + UIntGauge* tablet_writer_count; - UIntGauge compaction_mem_current_consumption; + UIntGauge* compaction_mem_current_consumption; static DorisMetrics* instance() { static DorisMetrics instance; @@ -182,7 +184,7 @@ public: MetricRegistry* metric_registry() { return &_metric_registry; } SystemMetrics* system_metrics() { return _system_metrics.get(); } - MetricEntity* server_entity() { return _server_metric_entity; } + MetricEntity* server_entity() { return _server_metric_entity.get(); } private: // Don't allow constrctor @@ -200,7 +202,7 @@ private: std::unique_ptr _system_metrics; - MetricEntity* _server_metric_entity; + std::shared_ptr _server_metric_entity; }; }; // namespace doris diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp index 1a1ff935ab..8a53d916e3 100644 --- a/be/src/util/metrics.cpp +++ b/be/src/util/metrics.cpp @@ -110,15 +110,13 @@ std::string MetricPrototype::combine_name(const std::string& registry_name) cons return (registry_name.empty() ? std::string() : registry_name + "_") + simple_name(); } -void MetricEntity::register_metric(const MetricPrototype* metric_type, Metric* metric) { - std::lock_guard l(_lock); - DCHECK(_metrics.find(metric_type) == _metrics.end()) << "metric is already exist! 
" << _name << ":" << metric_type->name; - _metrics.emplace(metric_type, metric); -} - void MetricEntity::deregister_metric(const MetricPrototype* metric_type) { std::lock_guard l(_lock); - _metrics.erase(metric_type); + auto metric = _metrics.find(metric_type); + if (metric != _metrics.end()) { + delete metric->second; + _metrics.erase(metric); + } } Metric* MetricEntity::get_metric(const std::string& name, const std::string& group_name) const { @@ -156,47 +154,62 @@ void MetricEntity::trigger_hook_unlocked(bool force) const { MetricRegistry::~MetricRegistry() { } -MetricEntity* MetricRegistry::register_entity(const std::string& name, const Labels& labels) { - std::shared_ptr entity = std::make_shared(name, labels); - +std::shared_ptr MetricRegistry::register_entity(const std::string& name, const Labels& labels, MetricEntityType type) { + std::shared_ptr entity = std::make_shared(type, name, labels); std::lock_guard l(_lock); - DCHECK(_entities.find(name) == _entities.end()) << name; - _entities.insert(std::make_pair(name, entity)); - return entity.get(); + auto inserted_entity = _entities.insert(std::make_pair(entity, 1)); + if (!inserted_entity.second) { + // If exist, increase the registered count + inserted_entity.first->second++; + } + return inserted_entity.first->first; } -void MetricRegistry::deregister_entity(const std::string& name) { +void MetricRegistry::deregister_entity(const std::shared_ptr& entity) { std::lock_guard l(_lock); - _entities.erase(name); + auto found_entity = _entities.find(entity); + if (found_entity != _entities.end()) { + // Decrease the registered count + --found_entity->second; + if (found_entity->second == 0) { + // Only erase it when registered count is zero + _entities.erase(found_entity); + } + } } -std::shared_ptr MetricRegistry::get_entity(const std::string& name) { +std::shared_ptr MetricRegistry::get_entity(const std::string& name, const Labels& labels, MetricEntityType type) { + std::shared_ptr dummy = 
std::make_shared(type, name, labels); + std::lock_guard l(_lock); - auto entity = _entities.find(name); + auto entity = _entities.find(dummy); if (entity == _entities.end()) { return std::shared_ptr(); } - return entity->second; + return entity->first; } void MetricRegistry::trigger_all_hooks(bool force) const { std::lock_guard l(_lock); for (const auto& entity : _entities) { - std::lock_guard l(entity.second->_lock); - entity.second->trigger_hook_unlocked(force); + std::lock_guard l(entity.first->_lock); + entity.first->trigger_hook_unlocked(force); } } -std::string MetricRegistry::to_prometheus() const { +std::string MetricRegistry::to_prometheus(bool with_tablet_metrics) const { std::stringstream ss; // Reorder by MetricPrototype EntityMetricsByType entity_metrics_by_types; std::lock_guard l(_lock); for (const auto& entity : _entities) { - std::lock_guard l(entity.second->_lock); - entity.second->trigger_hook_unlocked(false); - for (const auto& metric : entity.second->_metrics) { - std::pair new_elem = std::make_pair(entity.second.get(), metric.second); + if (entity.first->_type == MetricEntityType::kTablet && !with_tablet_metrics) { + continue; + } + std::lock_guard l(entity.first->_lock); + entity.first->trigger_hook_unlocked(false); + for (const auto& metric : entity.first->_metrics) { + std::pair new_elem = std::make_pair(entity.first.get(), metric.second); auto found = entity_metrics_by_types.find(metric.first); if (found == entity_metrics_by_types.end()) { entity_metrics_by_types.emplace(metric.first, std::vector>({new_elem})); @@ -224,14 +237,17 @@ std::string MetricRegistry::to_prometheus() const { return ss.str(); } -std::string MetricRegistry::to_json() const { +std::string MetricRegistry::to_json(bool with_tablet_metrics) const { rj::Document doc{rj::kArrayType}; rj::Document::AllocatorType& allocator = doc.GetAllocator(); std::lock_guard l(_lock); for (const auto& entity : _entities) { - std::lock_guard l(entity.second->_lock); - 
entity.second->trigger_hook_unlocked(false); - for (const auto& metric : entity.second->_metrics) { + if (entity.first->_type == MetricEntityType::kTablet && !with_tablet_metrics) { + continue; + } + std::lock_guard l(entity.first->_lock); + entity.first->trigger_hook_unlocked(false); + for (const auto& metric : entity.first->_metrics) { rj::Value metric_obj(rj::kObjectType); // tags rj::Value tag_obj(rj::kObjectType); @@ -244,7 +260,7 @@ std::string MetricRegistry::to_json() const { allocator); } // MetricEntity's labels - for (auto& label : entity.second->_labels) { + for (auto& label : entity.first->_labels) { tag_obj.AddMember( rj::Value(label.first.c_str(), allocator), rj::Value(label.second.c_str(), allocator), @@ -270,9 +286,9 @@ std::string MetricRegistry::to_core_string() const { std::stringstream ss; std::lock_guard l(_lock); for (const auto& entity : _entities) { - std::lock_guard l(entity.second->_lock); - entity.second->trigger_hook_unlocked(false); - for (const auto &metric : entity.second->_metrics) { + std::lock_guard l(entity.first->_lock); + entity.first->trigger_hook_unlocked(false); + for (const auto &metric : entity.first->_metrics) { if (metric.first->is_core_metric) { ss << metric.first->combine_name(_name) << " LONG " << metric.second->to_string() << "\n"; } diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h index 50093fe1da..61fccce7b9 100644 --- a/be/src/util/metrics.h +++ b/be/src/util/metrics.h @@ -212,6 +212,14 @@ public: virtual ~LockGauge() {} }; +using IntCounter = CoreLocalCounter; +using IntAtomicCounter = AtomicCounter; +using UIntCounter = CoreLocalCounter; +using DoubleCounter = LockCounter; +using IntGauge = AtomicGauge; +using UIntGauge = AtomicGauge; +using DoubleGauge = LockGauge; + using Labels = std::unordered_map; struct MetricPrototype { public: @@ -263,8 +271,17 @@ public: #define DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(name, unit, desc) \ DEFINE_METRIC_PROTOTYPE(name, MetricType::GAUGE, unit, desc, "", Labels(), 
false) -#define METRIC_REGISTER(entity, metric) \ - entity->register_metric(&METRIC_##metric, &metric) +#define INT_COUNTER_METRIC_REGISTER(entity, metric) \ + metric = (IntCounter*)(entity->register_metric(&METRIC_##metric)) + +#define INT_GAUGE_METRIC_REGISTER(entity, metric) \ + metric = (IntGauge*)(entity->register_metric(&METRIC_##metric)) + +#define INT_UGAUGE_METRIC_REGISTER(entity, metric) \ + metric = (UIntGauge*)(entity->register_metric(&METRIC_##metric)) + +#define INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, metric) \ + metric = (IntAtomicCounter*)(entity->register_metric(&METRIC_##metric)) #define METRIC_DEREGISTER(entity, metric) \ entity->deregister_metric(&METRIC_##metric) @@ -284,12 +301,34 @@ struct MetricPrototypeEqualTo { using MetricMap = std::unordered_map; +enum class MetricEntityType { + kServer, + kTablet +}; + class MetricEntity { public: - MetricEntity(const std::string& name, const Labels& labels) - : _name(name), _labels(labels) {} + MetricEntity(MetricEntityType type, const std::string& name, const Labels& labels) + : _type(type), _name(name), _labels(labels) {} + ~MetricEntity() { + for (auto& metric : _metrics) { + delete metric.second; + } + } + + const std::string& name() const { return _name; } + + template + Metric* register_metric(const MetricPrototype* metric_type) { + std::lock_guard l(_lock); + auto inserted_metric = _metrics.insert(std::make_pair(metric_type, nullptr)); + if (inserted_metric.second) { + // If not exist, make a new metric pointer + inserted_metric.first->second = new T(); + } + return inserted_metric.first->second; + } - void register_metric(const MetricPrototype* metric_type, Metric* metric); void deregister_metric(const MetricPrototype* metric_type); Metric* get_metric(const std::string& name, const std::string& group_name = "") const; @@ -300,7 +339,10 @@ public: private: friend class MetricRegistry; + friend class MetricEntityHash; + friend class MetricEntityEqualTo; + MetricEntityType _type; std::string 
_name; Labels _labels; @@ -309,6 +351,20 @@ private: std::map> _hooks; }; +struct MetricEntityHash { + size_t operator()(const std::shared_ptr metric_entity) const { + return std::hash()(metric_entity->name()); + } +}; + +struct MetricEntityEqualTo { + bool operator()(const std::shared_ptr first, const std::shared_ptr second) const { + return first->_type == second->_type + && first->_name == second->_name + && first->_labels == second->_labels; + } +}; + using EntityMetricsByType = std::unordered_map>, MetricPrototypeHash, MetricPrototypeEqualTo>; class MetricRegistry { @@ -316,29 +372,22 @@ public: MetricRegistry(const std::string& name) : _name(name) {} ~MetricRegistry(); - MetricEntity* register_entity(const std::string& name, const Labels& labels); - void deregister_entity(const std::string& name); - std::shared_ptr get_entity(const std::string& name); + std::shared_ptr register_entity(const std::string& name, const Labels& labels = {}, MetricEntityType type = MetricEntityType::kServer); + void deregister_entity(const std::shared_ptr& entity); + std::shared_ptr get_entity(const std::string& name, const Labels& labels = {}, MetricEntityType type = MetricEntityType::kServer); void trigger_all_hooks(bool force) const; - std::string to_prometheus() const; - std::string to_json() const; + std::string to_prometheus(bool with_tablet_metrics = false) const; + std::string to_json(bool with_tablet_metrics = false) const; std::string to_core_string() const; private: const std::string _name; mutable SpinLock _lock; - std::unordered_map> _entities; + // MetricEntity -> register count + std::unordered_map, int32_t, MetricEntityHash, MetricEntityEqualTo> _entities; }; -using IntCounter = CoreLocalCounter; -using IntAtomicCounter = AtomicCounter; -using UIntCounter = CoreLocalCounter; -using DoubleCounter = LockCounter; -using IntGauge = AtomicGauge; -using UIntGauge = AtomicGauge; -using DoubleGauge = LockGauge; - } // namespace doris diff --git 
a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp index c1aae9e761..a51a624639 100644 --- a/be/src/util/system_metrics.cpp +++ b/be/src/util/system_metrics.cpp @@ -43,42 +43,42 @@ DEFINE_CPU_COUNTER_METRIC(guest_nice); // /proc/stat: http://www.linuxhowtos.org/System/procstat.htm struct CpuMetrics { CpuMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, cpu_user); - METRIC_REGISTER(entity, cpu_nice); - METRIC_REGISTER(entity, cpu_system); - METRIC_REGISTER(entity, cpu_idle); - METRIC_REGISTER(entity, cpu_iowait); - METRIC_REGISTER(entity, cpu_irq); - METRIC_REGISTER(entity, cpu_soft_irq); - METRIC_REGISTER(entity, cpu_steal); - METRIC_REGISTER(entity, cpu_guest); - METRIC_REGISTER(entity, cpu_guest_nice); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_user); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_nice); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_system); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_idle); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_iowait); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_irq); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_soft_irq); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_steal); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_guest); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, cpu_guest_nice); - metrics[0] = &cpu_user; - metrics[1] = &cpu_nice; - metrics[2] = &cpu_system; - metrics[3] = &cpu_idle; - metrics[4] = &cpu_iowait; - metrics[5] = &cpu_irq; - metrics[6] = &cpu_soft_irq; - metrics[7] = &cpu_steal; - metrics[8] = &cpu_guest; - metrics[9] = &cpu_guest_nice; + metrics[0] = cpu_user; + metrics[1] = cpu_nice; + metrics[2] = cpu_system; + metrics[3] = cpu_idle; + metrics[4] = cpu_iowait; + metrics[5] = cpu_irq; + metrics[6] = cpu_soft_irq; + metrics[7] = cpu_steal; + metrics[8] = cpu_guest; + metrics[9] = cpu_guest_nice; } static constexpr int cpu_num_metrics = 10; MetricEntity* entity = nullptr; - IntAtomicCounter cpu_user; - IntAtomicCounter cpu_nice; - 
IntAtomicCounter cpu_system; - IntAtomicCounter cpu_idle; - IntAtomicCounter cpu_iowait; - IntAtomicCounter cpu_irq; - IntAtomicCounter cpu_soft_irq; - IntAtomicCounter cpu_steal; - IntAtomicCounter cpu_guest; - IntAtomicCounter cpu_guest_nice; + IntAtomicCounter* cpu_user; + IntAtomicCounter* cpu_nice; + IntAtomicCounter* cpu_system; + IntAtomicCounter* cpu_idle; + IntAtomicCounter* cpu_iowait; + IntAtomicCounter* cpu_irq; + IntAtomicCounter* cpu_soft_irq; + IntAtomicCounter* cpu_steal; + IntAtomicCounter* cpu_guest; + IntAtomicCounter* cpu_guest_nice; IntAtomicCounter* metrics[cpu_num_metrics]; }; @@ -89,11 +89,11 @@ DEFINE_MEMORY_GAUGE_METRIC(allocated_bytes, MetricUnit::BYTES); struct MemoryMetrics { MemoryMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, memory_allocated_bytes); + INT_GAUGE_METRIC_REGISTER(entity, memory_allocated_bytes); } MetricEntity* entity = nullptr; - IntGauge memory_allocated_bytes; + IntGauge* memory_allocated_bytes; }; #define DEFINE_DISK_COUNTER_METRIC(metric, unit) \ @@ -109,25 +109,25 @@ DEFINE_DISK_COUNTER_METRIC(io_time_weigthed, MetricUnit::MILLISECONDS); struct DiskMetrics { DiskMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, disk_reads_completed); - METRIC_REGISTER(entity, disk_bytes_read); - METRIC_REGISTER(entity, disk_read_time_ms); - METRIC_REGISTER(entity, disk_writes_completed); - METRIC_REGISTER(entity, disk_bytes_written); - METRIC_REGISTER(entity, disk_write_time_ms); - METRIC_REGISTER(entity, disk_io_time_ms); - METRIC_REGISTER(entity, disk_io_time_weigthed); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_reads_completed); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_bytes_read); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_read_time_ms); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_writes_completed); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_bytes_written); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_write_time_ms); + 
INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_io_time_ms); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, disk_io_time_weigthed); } MetricEntity* entity = nullptr; - IntAtomicCounter disk_reads_completed; - IntAtomicCounter disk_bytes_read; - IntAtomicCounter disk_read_time_ms; - IntAtomicCounter disk_writes_completed; - IntAtomicCounter disk_bytes_written; - IntAtomicCounter disk_write_time_ms; - IntAtomicCounter disk_io_time_ms; - IntAtomicCounter disk_io_time_weigthed; + IntAtomicCounter* disk_reads_completed; + IntAtomicCounter* disk_bytes_read; + IntAtomicCounter* disk_read_time_ms; + IntAtomicCounter* disk_writes_completed; + IntAtomicCounter* disk_bytes_written; + IntAtomicCounter* disk_write_time_ms; + IntAtomicCounter* disk_io_time_ms; + IntAtomicCounter* disk_io_time_weigthed; }; #define DEFINE_NETWORK_COUNTER_METRIC(metric, unit) \ @@ -139,17 +139,17 @@ DEFINE_NETWORK_COUNTER_METRIC(send_packets, MetricUnit::PACKETS); struct NetworkMetrics { NetworkMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, network_receive_bytes); - METRIC_REGISTER(entity, network_receive_packets); - METRIC_REGISTER(entity, network_send_bytes); - METRIC_REGISTER(entity, network_send_packets); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_receive_bytes); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_receive_packets); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_send_bytes); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, network_send_packets); } MetricEntity* entity = nullptr; - IntAtomicCounter network_receive_bytes; - IntAtomicCounter network_receive_packets; - IntAtomicCounter network_send_bytes; - IntAtomicCounter network_send_packets; + IntAtomicCounter* network_receive_bytes; + IntAtomicCounter* network_receive_packets; + IntAtomicCounter* network_send_bytes; + IntAtomicCounter* network_send_packets; }; #define DEFINE_SNMP_COUNTER_METRIC(metric, unit, desc) \ @@ -162,17 +162,17 @@ DEFINE_SNMP_COUNTER_METRIC(tcp_out_segs, 
MetricUnit::NOUNIT, "All send TCP packe // metrics read from /proc/net/snmp struct SnmpMetrics { SnmpMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, snmp_tcp_in_errs); - METRIC_REGISTER(entity, snmp_tcp_retrans_segs); - METRIC_REGISTER(entity, snmp_tcp_in_segs); - METRIC_REGISTER(entity, snmp_tcp_out_segs); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_in_errs); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_retrans_segs); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_in_segs); + INT_ATOMIC_COUNTER_METRIC_REGISTER(entity, snmp_tcp_out_segs); } MetricEntity* entity = nullptr; - IntAtomicCounter snmp_tcp_in_errs; - IntAtomicCounter snmp_tcp_retrans_segs; - IntAtomicCounter snmp_tcp_in_segs; - IntAtomicCounter snmp_tcp_out_segs; + IntAtomicCounter* snmp_tcp_in_errs; + IntAtomicCounter* snmp_tcp_retrans_segs; + IntAtomicCounter* snmp_tcp_in_segs; + IntAtomicCounter* snmp_tcp_out_segs; }; #define DEFINE_FD_COUNTER_METRIC(metric, unit) \ @@ -182,13 +182,13 @@ DEFINE_FD_COUNTER_METRIC(num_used, MetricUnit::NOUNIT); struct FileDescriptorMetrics { FileDescriptorMetrics(MetricEntity* ent) : entity(ent) { - METRIC_REGISTER(entity, fd_num_limit); - METRIC_REGISTER(entity, fd_num_used); + INT_GAUGE_METRIC_REGISTER(entity, fd_num_limit); + INT_GAUGE_METRIC_REGISTER(entity, fd_num_used); } MetricEntity* entity = nullptr; - IntGauge fd_num_limit; - IntGauge fd_num_used; + IntGauge* fd_num_limit; + IntGauge* fd_num_used; }; const char* SystemMetrics::_s_hook_name = "system_metrics"; @@ -198,24 +198,20 @@ SystemMetrics::SystemMetrics(MetricRegistry* registry, const std::vector& network_interfaces) { DCHECK(registry != nullptr); _registry = registry; -#ifndef BE_TEST - auto entity = DorisMetrics::instance()->server_entity(); -#else - auto entity = _registry->register_entity("server", {}); -#endif - DCHECK(entity != nullptr); - entity->register_hook(_s_hook_name, std::bind(&SystemMetrics::update, this)); - _install_cpu_metrics(entity); - 
_install_memory_metrics(entity); + _server_entity = _registry->register_entity("server"); + DCHECK(_server_entity != nullptr); + _server_entity->register_hook(_s_hook_name, std::bind(&SystemMetrics::update, this)); + _install_cpu_metrics(_server_entity.get()); + _install_memory_metrics(_server_entity.get()); _install_disk_metrics(disk_devices); _install_net_metrics(network_interfaces); - _install_fd_metrics(entity); - _install_snmp_metrics(entity); + _install_fd_metrics(_server_entity.get()); + _install_snmp_metrics(_server_entity.get()); } SystemMetrics::~SystemMetrics() { - DCHECK(_registry != nullptr); - _registry->get_entity("server")->deregister_hook(_s_hook_name); + DCHECK(_server_entity != nullptr); + _server_entity->deregister_hook(_s_hook_name); for (auto& it : _disk_metrics) { delete it.second; @@ -302,14 +298,14 @@ void SystemMetrics::_update_memory_metrics() { size_t allocated_bytes = 0; MallocExtension::instance()->GetNumericProperty( "generic.current_allocated_bytes", &allocated_bytes); - _memory_metrics->memory_allocated_bytes.set_value(allocated_bytes); + _memory_metrics->memory_allocated_bytes->set_value(allocated_bytes); #endif } void SystemMetrics::_install_disk_metrics(const std::set& disk_devices) { for (auto& disk_device : disk_devices) { auto disk_entity = _registry->register_entity(std::string("disk_metrics.") + disk_device, {{"device", disk_device}}); - DiskMetrics* metrics = new DiskMetrics(disk_entity); + DiskMetrics* metrics = new DiskMetrics(disk_entity.get()); _disk_metrics.emplace(disk_device, metrics); } } @@ -368,21 +364,21 @@ void SystemMetrics::_update_disk_metrics() { } // update disk metrics // reads_completed: 4 reads completed successfully - it->second->disk_reads_completed.set_value(values[0]); + it->second->disk_reads_completed->set_value(values[0]); // bytes_read: 6 sectors read * 512; 5 reads merged is ignored - it->second->disk_bytes_read.set_value(values[2] * 512); + it->second->disk_bytes_read->set_value(values[2] * 
512); // read_time_ms: 7 time spent reading (ms) - it->second->disk_read_time_ms.set_value(values[3]); + it->second->disk_read_time_ms->set_value(values[3]); // writes_completed: 8 writes completed - it->second->disk_writes_completed.set_value(values[4]); + it->second->disk_writes_completed->set_value(values[4]); // bytes_written: 10 sectors write * 512; 9 writes merged is ignored - it->second->disk_bytes_written.set_value(values[6] * 512); + it->second->disk_bytes_written->set_value(values[6] * 512); // write_time_ms: 11 time spent writing (ms) - it->second->disk_write_time_ms.set_value(values[7]); + it->second->disk_write_time_ms->set_value(values[7]); // io_time_ms: 13 time spent doing I/Os (ms) - it->second->disk_io_time_ms.set_value(values[9]); + it->second->disk_io_time_ms->set_value(values[9]); // io_time_weigthed: 14 - weighted time spent doing I/Os (ms) - it->second->disk_io_time_weigthed.set_value(values[10]); + it->second->disk_io_time_weigthed->set_value(values[10]); } if (ferror(fp) != 0) { char buf[64]; @@ -395,7 +391,7 @@ void SystemMetrics::_update_disk_metrics() { void SystemMetrics::_install_net_metrics(const std::vector& interfaces) { for (auto& interface : interfaces) { auto interface_entity = _registry->register_entity(std::string("network_metrics.") + interface, {{"device", interface}}); - NetworkMetrics* metrics = new NetworkMetrics(interface_entity); + NetworkMetrics* metrics = new NetworkMetrics(interface_entity.get()); _network_metrics.emplace(interface, metrics); } } @@ -486,10 +482,10 @@ void SystemMetrics::_update_net_metrics() { default: break; } - it->second->network_receive_bytes.set_value(receive_bytes); - it->second->network_receive_packets.set_value(receive_packets); - it->second->network_send_bytes.set_value(send_bytes); - it->second->network_send_packets.set_value(send_packets); + it->second->network_receive_bytes->set_value(receive_bytes); + it->second->network_receive_packets->set_value(receive_packets); + 
it->second->network_send_bytes->set_value(send_bytes); + it->second->network_send_packets->set_value(send_packets); } if (ferror(fp) != 0) { char buf[64]; @@ -558,10 +554,10 @@ void SystemMetrics::_update_snmp_metrics() { int64_t in_errs = atoi64(metrics[header_map["InErrs"]]); int64_t in_segs = atoi64(metrics[header_map["InSegs"]]); int64_t out_segs = atoi64(metrics[header_map["OutSegs"]]); - _snmp_metrics->snmp_tcp_retrans_segs.set_value(retrans_segs); - _snmp_metrics->snmp_tcp_in_errs.set_value(in_errs); - _snmp_metrics->snmp_tcp_in_segs.set_value(in_segs); - _snmp_metrics->snmp_tcp_out_segs.set_value(out_segs); + _snmp_metrics->snmp_tcp_retrans_segs->set_value(retrans_segs); + _snmp_metrics->snmp_tcp_in_errs->set_value(in_errs); + _snmp_metrics->snmp_tcp_in_segs->set_value(in_segs); + _snmp_metrics->snmp_tcp_out_segs->set_value(out_segs); if (ferror(fp) != 0) { char buf[64]; @@ -599,8 +595,8 @@ void SystemMetrics::_update_fd_metrics() { int num = sscanf(_line_ptr, "%" PRId64 " %" PRId64 " %" PRId64, &values[0], &values[1], &values[2]); if (num == 3) { - _fd_metrics->fd_num_limit.set_value(values[2]); - _fd_metrics->fd_num_used.set_value(values[0] - values[1]); + _fd_metrics->fd_num_limit->set_value(values[2]); + _fd_metrics->fd_num_used->set_value(values[0] - values[1]); } } @@ -616,7 +612,7 @@ int64_t SystemMetrics::get_max_io_util( const std::map& lst_value, int64_t interval_sec) { int64_t max = 0; for (auto& it : _disk_metrics) { - int64_t cur = it.second->disk_io_time_ms.value(); + int64_t cur = it.second->disk_io_time_ms->value(); const auto find = lst_value.find(it.first); if (find == lst_value.end()) { continue; @@ -630,7 +626,7 @@ int64_t SystemMetrics::get_max_io_util( void SystemMetrics::get_disks_io_time(std::map* map) { map->clear(); for (auto& it : _disk_metrics) { - map->emplace(it.first, it.second->disk_io_time_ms.value()); + map->emplace(it.first, it.second->disk_io_time_ms->value()); } } @@ -641,8 +637,8 @@ void 
SystemMetrics::get_network_traffic( rcv_map->clear(); for (auto& it : _network_metrics) { if (it.first == "lo") { continue; } - send_map->emplace(it.first, it.second->network_send_bytes.value()); - rcv_map->emplace(it.first, it.second->network_receive_bytes.value()); + send_map->emplace(it.first, it.second->network_send_bytes->value()); + rcv_map->emplace(it.first, it.second->network_receive_bytes->value()); } } @@ -654,8 +650,8 @@ void SystemMetrics::get_max_net_traffic( int64_t max_send = 0; int64_t max_rcv = 0; for (auto& it : _network_metrics) { - int64_t cur_send = it.second->network_send_bytes.value(); - int64_t cur_rcv = it.second->network_receive_bytes.value(); + int64_t cur_send = it.second->network_send_bytes->value(); + int64_t cur_rcv = it.second->network_receive_bytes->value(); const auto find_send = lst_send_map.find(it.first); if (find_send != lst_send_map.end()) { diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h index 80ed60446f..9f422a5240 100644 --- a/be/src/util/system_metrics.h +++ b/be/src/util/system_metrics.h @@ -89,6 +89,7 @@ private: char* _line_ptr = nullptr; size_t _line_buf_size = 0; MetricRegistry* _registry = nullptr; + std::shared_ptr _server_entity = nullptr; }; } diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp index 442c54d1fa..8af993a461 100644 --- a/be/src/util/thrift_server.cpp +++ b/be/src/util/thrift_server.cpp @@ -226,8 +226,8 @@ void* ThriftServer::ThriftServerEventProcessor::createContext( _thrift_server->_session_handler->session_start(*_session_key); } - _thrift_server->thrift_connections_total.increment(1L); - _thrift_server->thrift_current_connections.increment(1L); + _thrift_server->thrift_connections_total->increment(1L); + _thrift_server->thrift_current_connections->increment(1L); // Store the _session_key in the per-client context to avoid recomputing // it. 
If only this were accessible from RPC method calls, we wouldn't have to @@ -256,7 +256,7 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext( _thrift_server->_session_keys.erase(_session_key); } - _thrift_server->thrift_current_connections.increment(-1L); + _thrift_server->thrift_current_connections->increment(-1L); } ThriftServer::ThriftServer( @@ -275,8 +275,8 @@ ThriftServer::ThriftServer( _processor(processor), _session_handler(NULL) { _thrift_server_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(std::string("thrift_server.") + name, {{"name", name}}); - METRIC_REGISTER(_thrift_server_metric_entity, thrift_current_connections); - METRIC_REGISTER(_thrift_server_metric_entity, thrift_connections_total); + INT_GAUGE_METRIC_REGISTER(_thrift_server_metric_entity, thrift_current_connections); + INT_COUNTER_METRIC_REGISTER(_thrift_server_metric_entity, thrift_connections_total); } Status ThriftServer::start() { diff --git a/be/src/util/thrift_server.h b/be/src/util/thrift_server.h index 779857bcc7..66efbb658b 100644 --- a/be/src/util/thrift_server.h +++ b/be/src/util/thrift_server.h @@ -145,11 +145,11 @@ private: class ThriftServerEventProcessor; friend class ThriftServerEventProcessor; - MetricEntity* _thrift_server_metric_entity; + std::shared_ptr _thrift_server_metric_entity; // Number of currently active connections - IntGauge thrift_current_connections; + IntGauge* thrift_current_connections; // Total connections made over the lifetime of this server - IntCounter thrift_connections_total; + IntCounter* thrift_connections_total; }; } diff --git a/be/test/agent/cgroups_mgr_test.cpp b/be/test/agent/cgroups_mgr_test.cpp index f46d1554b5..4ffcb4e3c9 100644 --- a/be/test/agent/cgroups_mgr_test.cpp +++ b/be/test/agent/cgroups_mgr_test.cpp @@ -34,6 +34,8 @@ using std::string; namespace doris { +StorageEngine* k_engine = nullptr; + class CgroupsMgrTest : public testing::Test { public: // create a mock cgroup folder @@ -41,6 +43,14 
@@ public: ASSERT_FALSE(boost::filesystem::exists(_s_cgroup_path)); // create a mock cgroup path ASSERT_TRUE(boost::filesystem::create_directory(_s_cgroup_path)); + + std::vector paths; + paths.emplace_back(config::storage_root_path, -1); + + doris::EngineOptions options; + options.store_paths = paths; + Status s = doris::StorageEngine::open(options, &k_engine); + ASSERT_TRUE(s.ok()) << s.to_string(); } // delete the mock cgroup folder diff --git a/be/test/exprs/hybird_set_test.cpp b/be/test/exprs/hybird_set_test.cpp index f8a102ccb0..fc884c3f02 100644 --- a/be/test/exprs/hybird_set_test.cpp +++ b/be/test/exprs/hybird_set_test.cpp @@ -326,6 +326,8 @@ TEST_F(HybirdSetTest, string) { ASSERT_FALSE(set->find(&b)); } TEST_F(HybirdSetTest, timestamp) { + CpuInfo::init(); + HybirdSetBase* set = HybirdSetBase::create_set(TYPE_DATETIME); char s1[] = "2012-01-20 01:10:01"; char s2[] = "1990-10-20 10:10:10.123456 "; diff --git a/be/test/http/metrics_action_test.cpp b/be/test/http/metrics_action_test.cpp index db98454ef8..1dc6c72df1 100644 --- a/be/test/http/metrics_action_test.cpp +++ b/be/test/http/metrics_action_test.cpp @@ -53,18 +53,18 @@ private: TEST_F(MetricsActionTest, prometheus_output) { MetricRegistry metric_registry("test"); - MetricEntity* entity = metric_registry.register_entity("metrics_action_test.prometheus_output", {}); + std::shared_ptr entity = metric_registry.register_entity("metrics_action_test.prometheus_output"); - IntGauge cpu_idle; + IntGauge* cpu_idle = nullptr; DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(cpu_idle, MetricUnit::PERCENT); - METRIC_REGISTER(entity, cpu_idle); + INT_GAUGE_METRIC_REGISTER(entity, cpu_idle); - IntCounter put_requests_total; + IntCounter* put_requests_total = nullptr; DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(put_requests_total, MetricUnit::NOUNIT, "", requests_total, Labels({{"type", "put"}, {"path", "/sports"}})); - METRIC_REGISTER(entity, put_requests_total); + INT_COUNTER_METRIC_REGISTER(entity, put_requests_total); - 
cpu_idle.set_value(50); - put_requests_total.increment(2345); + cpu_idle->set_value(50); + put_requests_total->increment(2345); s_expect_response = "# TYPE test_cpu_idle gauge\n" @@ -78,13 +78,13 @@ TEST_F(MetricsActionTest, prometheus_output) { TEST_F(MetricsActionTest, prometheus_no_prefix) { MetricRegistry metric_registry(""); - MetricEntity* entity = metric_registry.register_entity("metrics_action_test.prometheus_no_prefix", {}); + std::shared_ptr entity = metric_registry.register_entity("metrics_action_test.prometheus_no_prefix"); - IntGauge cpu_idle; + IntGauge* cpu_idle = nullptr; DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(cpu_idle, MetricUnit::PERCENT); - METRIC_REGISTER(entity, cpu_idle); + INT_GAUGE_METRIC_REGISTER(entity, cpu_idle); - cpu_idle.set_value(50); + cpu_idle->set_value(50); s_expect_response = "# TYPE cpu_idle gauge\n" diff --git a/be/test/olap/push_handler_test.cpp b/be/test/olap/push_handler_test.cpp index 2f1465b6fd..39e0108fc7 100644 --- a/be/test/olap/push_handler_test.cpp +++ b/be/test/olap/push_handler_test.cpp @@ -34,7 +34,6 @@ public: init(); } static void SetUpTestCase() { - CpuInfo::init(); UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal"); CastFunctions::init(); } @@ -458,5 +457,6 @@ TEST_F(PushHandlerTest, PushBrokerReaderNormal) { int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); + CpuInfo::init(); return RUN_ALL_TESTS(); } diff --git a/be/test/util/doris_metrics_test.cpp b/be/test/util/doris_metrics_test.cpp index 711eceb6be..1ccd6544ae 100644 --- a/be/test/util/doris_metrics_test.cpp +++ b/be/test/util/doris_metrics_test.cpp @@ -31,152 +31,151 @@ public: }; TEST_F(DorisMetricsTest, Normal) { - auto metric_registry = DorisMetrics::instance()->metric_registry(); - auto server_entity = metric_registry->get_entity("server"); + auto server_entity = DorisMetrics::instance()->server_entity(); // check metric { - DorisMetrics::instance()->fragment_requests_total.increment(12); 
+ DorisMetrics::instance()->fragment_requests_total->increment(12); auto metric = server_entity->get_metric("fragment_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("12", metric->to_string().c_str()); } { - DorisMetrics::instance()->fragment_request_duration_us.increment(101); + DorisMetrics::instance()->fragment_request_duration_us->increment(101); auto metric = server_entity->get_metric("fragment_request_duration_us"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("101", metric->to_string().c_str()); } { - DorisMetrics::instance()->http_requests_total.increment(102); + DorisMetrics::instance()->http_requests_total->increment(102); auto metric = server_entity->get_metric("http_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("102", metric->to_string().c_str()); } { - DorisMetrics::instance()->http_request_send_bytes.increment(104); + DorisMetrics::instance()->http_request_send_bytes->increment(104); auto metric = server_entity->get_metric("http_request_send_bytes"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("104", metric->to_string().c_str()); } { - DorisMetrics::instance()->query_scan_bytes.increment(104); + DorisMetrics::instance()->query_scan_bytes->increment(104); auto metric = server_entity->get_metric("query_scan_bytes"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("104", metric->to_string().c_str()); } { - DorisMetrics::instance()->query_scan_rows.increment(105); + DorisMetrics::instance()->query_scan_rows->increment(105); auto metric = server_entity->get_metric("query_scan_rows"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("105", metric->to_string().c_str()); } { - DorisMetrics::instance()->push_requests_success_total.increment(106); + DorisMetrics::instance()->push_requests_success_total->increment(106); auto metric = server_entity->get_metric("push_requests_success_total", "push_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("106", metric->to_string().c_str()); } { - 
DorisMetrics::instance()->push_requests_fail_total.increment(107); + DorisMetrics::instance()->push_requests_fail_total->increment(107); auto metric = server_entity->get_metric("push_requests_fail_total", "push_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("107", metric->to_string().c_str()); } { - DorisMetrics::instance()->push_request_duration_us.increment(108); + DorisMetrics::instance()->push_request_duration_us->increment(108); auto metric = server_entity->get_metric("push_request_duration_us"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("108", metric->to_string().c_str()); } { - DorisMetrics::instance()->push_request_write_bytes.increment(109); + DorisMetrics::instance()->push_request_write_bytes->increment(109); auto metric = server_entity->get_metric("push_request_write_bytes"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("109", metric->to_string().c_str()); } { - DorisMetrics::instance()->push_request_write_rows.increment(110); + DorisMetrics::instance()->push_request_write_rows->increment(110); auto metric = server_entity->get_metric("push_request_write_rows"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("110", metric->to_string().c_str()); } // engine request { - DorisMetrics::instance()->create_tablet_requests_total.increment(15); + DorisMetrics::instance()->create_tablet_requests_total->increment(15); auto metric = server_entity->get_metric("create_tablet_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("15", metric->to_string().c_str()); } { - DorisMetrics::instance()->drop_tablet_requests_total.increment(16); + DorisMetrics::instance()->drop_tablet_requests_total->increment(16); auto metric = server_entity->get_metric("drop_tablet_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("16", metric->to_string().c_str()); } { - DorisMetrics::instance()->report_all_tablets_requests_total.increment(17); + 
DorisMetrics::instance()->report_all_tablets_requests_total->increment(17); auto metric = server_entity->get_metric("report_all_tablets_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("17", metric->to_string().c_str()); } { - DorisMetrics::instance()->report_tablet_requests_total.increment(18); + DorisMetrics::instance()->report_tablet_requests_total->increment(18); auto metric = server_entity->get_metric("report_tablet_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("18", metric->to_string().c_str()); } { - DorisMetrics::instance()->schema_change_requests_total.increment(19); + DorisMetrics::instance()->schema_change_requests_total->increment(19); auto metric = server_entity->get_metric("schema_change_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("19", metric->to_string().c_str()); } { - DorisMetrics::instance()->create_rollup_requests_total.increment(20); + DorisMetrics::instance()->create_rollup_requests_total->increment(20); auto metric = server_entity->get_metric("create_rollup_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("20", metric->to_string().c_str()); } { - DorisMetrics::instance()->storage_migrate_requests_total.increment(21); + DorisMetrics::instance()->storage_migrate_requests_total->increment(21); auto metric = server_entity->get_metric("storage_migrate_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("21", metric->to_string().c_str()); } { - DorisMetrics::instance()->delete_requests_total.increment(22); + DorisMetrics::instance()->delete_requests_total->increment(22); auto metric = server_entity->get_metric("delete_requests_total", "engine_requests_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("22", metric->to_string().c_str()); } // comapction { - DorisMetrics::instance()->base_compaction_deltas_total.increment(30); + 
DorisMetrics::instance()->base_compaction_deltas_total->increment(30); auto metric = server_entity->get_metric("base_compaction_deltas_total", "compaction_deltas_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("30", metric->to_string().c_str()); } { - DorisMetrics::instance()->cumulative_compaction_deltas_total.increment(31); + DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(31); auto metric = server_entity->get_metric("cumulative_compaction_deltas_total", "compaction_deltas_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("31", metric->to_string().c_str()); } { - DorisMetrics::instance()->base_compaction_bytes_total.increment(32); + DorisMetrics::instance()->base_compaction_bytes_total->increment(32); auto metric = server_entity->get_metric("base_compaction_bytes_total", "compaction_bytes_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("32", metric->to_string().c_str()); } { - DorisMetrics::instance()->cumulative_compaction_bytes_total.increment(33); + DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(33); auto metric = server_entity->get_metric("cumulative_compaction_bytes_total", "compaction_bytes_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("33", metric->to_string().c_str()); } // Gauge { - DorisMetrics::instance()->memory_pool_bytes_total.increment(40); + DorisMetrics::instance()->memory_pool_bytes_total->increment(40); auto metric = server_entity->get_metric("memory_pool_bytes_total"); ASSERT_TRUE(metric != nullptr); ASSERT_STREQ("40", metric->to_string().c_str()); diff --git a/be/test/util/new_metrics_test.cpp b/be/test/util/new_metrics_test.cpp index 67b7aea77a..c1b4af8eea 100644 --- a/be/test/util/new_metrics_test.cpp +++ b/be/test/util/new_metrics_test.cpp @@ -205,9 +205,8 @@ TEST_F(MetricsTest, MetricPrototype) { } TEST_F(MetricsTest, MetricEntityWithMetric) { - MetricEntity entity("test_entity", {}); + MetricEntity entity(MetricEntityType::kServer, "test_entity", {}); - IntCounter 
cpu_idle; MetricPrototype cpu_idle_type(MetricType::COUNTER, MetricUnit::PERCENT, "cpu_idle"); // Before register @@ -215,14 +214,14 @@ TEST_F(MetricsTest, MetricEntityWithMetric) { ASSERT_EQ(nullptr, metric); // Register - entity.register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(12); + IntCounter* cpu_idle = (IntCounter*)entity.register_metric(&cpu_idle_type); + cpu_idle->increment(12); metric = entity.get_metric("cpu_idle"); ASSERT_NE(nullptr, metric); ASSERT_EQ("12", metric->to_string()); - cpu_idle.increment(8); + cpu_idle->increment(8); ASSERT_EQ("20", metric->to_string()); // Deregister @@ -234,15 +233,14 @@ TEST_F(MetricsTest, MetricEntityWithMetric) { } TEST_F(MetricsTest, MetricEntityWithHook) { - MetricEntity entity("test_entity", {}); + MetricEntity entity(MetricEntityType::kServer, "test_entity", {}); - IntCounter cpu_idle; MetricPrototype cpu_idle_type(MetricType::COUNTER, MetricUnit::PERCENT, "cpu_idle"); // Register - entity.register_metric(&cpu_idle_type, &cpu_idle); - entity.register_hook("test_hook", [&cpu_idle]() { - cpu_idle.increment(6); + IntCounter* cpu_idle = (IntCounter*)entity.register_metric(&cpu_idle_type); + entity.register_hook("test_hook", [cpu_idle]() { + cpu_idle->increment(6); }); // Before hook @@ -272,22 +270,30 @@ TEST_F(MetricsTest, MetricRegistryRegister) { ASSERT_EQ("[]", registry.to_json()); ASSERT_EQ("", registry.to_core_string()); - // Before register - auto entity = registry.get_entity("test_entity").get(); - ASSERT_EQ(nullptr, entity); - // Register - entity = registry.register_entity("test_entity", {}); - ASSERT_NE(nullptr, entity); - - // After register - auto entity1 = registry.get_entity("test_entity").get(); + auto entity1 = registry.register_entity("test_entity"); ASSERT_NE(nullptr, entity1); - ASSERT_EQ(entity, entity1); - registry.deregister_entity("test_entity"); - entity = registry.get_entity("test_entity").get(); - ASSERT_EQ(nullptr, entity); + // Register again + auto entity2 = 
registry.register_entity("test_entity"); + ASSERT_NE(nullptr, entity2); + ASSERT_EQ(entity1.get(), entity2.get()); + + // Deregister entity once + registry.deregister_entity(entity1); + + // Still exist and equal to entity1 + entity2 = registry.get_entity("test_entity"); + ASSERT_NE(nullptr, entity2); + ASSERT_EQ(entity1.get(), entity2.get()); + + // Deregister entity twice + registry.deregister_entity(entity2); + + // Not exist and registry is empty + entity2 = registry.get_entity("test_entity"); + ASSERT_EQ(nullptr, entity2); + ASSERT_EQ("", registry.to_prometheus()); } TEST_F(MetricsTest, MetricRegistryOutput) { @@ -302,85 +308,79 @@ TEST_F(MetricsTest, MetricRegistryOutput) { { // Register one common metric to the entity - auto entity = registry.register_entity("test_entity", {}); + auto entity = registry.register_entity("test_entity"); - IntGauge cpu_idle; MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "", {}, true); - entity->register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(8); + IntCounter* cpu_idle = (IntCounter*)entity->register_metric(&cpu_idle_type); + cpu_idle->increment(8); ASSERT_EQ(R"(# TYPE test_registry_cpu_idle gauge test_registry_cpu_idle 8 )", registry.to_prometheus()); ASSERT_EQ(R"([{"tags":{"metric":"cpu_idle"},"unit":"percent","value":8}])", registry.to_json()); ASSERT_EQ("test_registry_cpu_idle LONG 8\n", registry.to_core_string()); - registry.deregister_entity("test_entity"); + registry.deregister_entity(entity); } { // Register one metric with group name to the entity - auto entity = registry.register_entity("test_entity", {}); + auto entity = registry.register_entity("test_entity"); - IntGauge cpu_idle; MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "cpu", {{"mode", "idle"}}, false); - entity->register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(18); + IntCounter* cpu_idle = (IntCounter*)entity->register_metric(&cpu_idle_type); + 
cpu_idle->increment(18); ASSERT_EQ(R"(# TYPE test_registry_cpu gauge test_registry_cpu{mode="idle"} 18 )", registry.to_prometheus()); ASSERT_EQ(R"([{"tags":{"metric":"cpu","mode":"idle"},"unit":"percent","value":18}])", registry.to_json()); ASSERT_EQ("", registry.to_core_string()); - registry.deregister_entity("test_entity"); + registry.deregister_entity(entity); } { // Register one common metric to an entity with label auto entity = registry.register_entity("test_entity", {{"name", "lable_test"}}); - IntGauge cpu_idle; MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle"); - entity->register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(28); + IntCounter* cpu_idle = (IntCounter*)entity->register_metric(&cpu_idle_type); + cpu_idle->increment(28); ASSERT_EQ(R"(# TYPE test_registry_cpu_idle gauge test_registry_cpu_idle{name="lable_test"} 28 )", registry.to_prometheus()); ASSERT_EQ(R"([{"tags":{"metric":"cpu_idle","name":"lable_test"},"unit":"percent","value":28}])", registry.to_json()); ASSERT_EQ("", registry.to_core_string()); - registry.deregister_entity("test_entity"); + registry.deregister_entity(entity); } { // Register one common metric with group name to an entity with label auto entity = registry.register_entity("test_entity", {{"name", "lable_test"}}); - IntGauge cpu_idle; MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "cpu", {{"mode", "idle"}}); - entity->register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(38); + IntCounter* cpu_idle = (IntCounter*)entity->register_metric(&cpu_idle_type); + cpu_idle->increment(38); ASSERT_EQ(R"(# TYPE test_registry_cpu gauge test_registry_cpu{name="lable_test",mode="idle"} 38 )", registry.to_prometheus()); ASSERT_EQ(R"([{"tags":{"metric":"cpu","mode":"idle","name":"lable_test"},"unit":"percent","value":38}])", registry.to_json()); ASSERT_EQ("", registry.to_core_string()); - registry.deregister_entity("test_entity"); + 
registry.deregister_entity(entity); } { // Register two common metrics to one entity - auto entity = registry.register_entity("test_entity", {}); + auto entity = registry.register_entity("test_entity"); - IntGauge cpu_idle; MetricPrototype cpu_idle_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_idle", "", "cpu", {{"mode", "idle"}}); - entity->register_metric(&cpu_idle_type, &cpu_idle); - cpu_idle.increment(48); + IntCounter* cpu_idle = (IntCounter*)entity->register_metric(&cpu_idle_type); + cpu_idle->increment(48); - IntGauge cpu_guest; MetricPrototype cpu_guest_type(MetricType::GAUGE, MetricUnit::PERCENT, "cpu_guest", "", "cpu", {{"mode", "guest"}}); - entity->register_metric(&cpu_guest_type, &cpu_guest); - cpu_guest.increment(58); + IntGauge* cpu_guest = (IntGauge*)entity->register_metric(&cpu_guest_type); + cpu_guest->increment(58); ASSERT_EQ(R"(# TYPE test_registry_cpu gauge test_registry_cpu{mode="idle"} 48 @@ -388,7 +388,7 @@ test_registry_cpu{mode="guest"} 58 )", registry.to_prometheus()); ASSERT_EQ(R"([{"tags":{"metric":"cpu","mode":"guest"},"unit":"percent","value":58},{"tags":{"metric":"cpu","mode":"idle"},"unit":"percent","value":48}])", registry.to_json()); ASSERT_EQ("", registry.to_core_string()); - registry.deregister_entity("test_entity"); + registry.deregister_entity(entity); } } } diff --git a/be/test/util/system_metrics_test.cpp b/be/test/util/system_metrics_test.cpp index a3e8cbb6ea..c18215e092 100644 --- a/be/test/util/system_metrics_test.cpp +++ b/be/test/util/system_metrics_test.cpp @@ -108,7 +108,7 @@ TEST_F(SystemMetricsTest, normal) { ASSERT_TRUE(memory_allocated_bytes != nullptr); // network - auto net_entity = registry.get_entity("network_metrics.xgbe0"); + auto net_entity = registry.get_entity("network_metrics.xgbe0", {{"device", "xgbe0"}}); ASSERT_TRUE(net_entity != nullptr); Metric* receive_bytes = net_entity->get_metric("network_receive_bytes"); @@ -125,7 +125,7 @@ TEST_F(SystemMetricsTest, normal) { ASSERT_STREQ("88277614", 
send_packets->to_string().c_str()); // disk - auto disk_entity = registry.get_entity("disk_metrics.sda"); + auto disk_entity = registry.get_entity("disk_metrics.sda", {{"device", "sda"}}); ASSERT_TRUE(disk_entity != nullptr); Metric* bytes_read = disk_entity->get_metric("disk_bytes_read"); ASSERT_TRUE(bytes_read != nullptr); @@ -207,13 +207,13 @@ TEST_F(SystemMetricsTest, no_proc_file) { Metric* memory_allocated_bytes = entity->get_metric("memory_allocated_bytes"); ASSERT_TRUE(memory_allocated_bytes != nullptr); // network - auto net_entity = registry.get_entity("network_metrics.xgbe0"); + auto net_entity = registry.get_entity("network_metrics.xgbe0", {{"device", "xgbe0"}}); ASSERT_TRUE(net_entity != nullptr); Metric* receive_bytes = net_entity->get_metric("network_receive_bytes"); ASSERT_TRUE(receive_bytes != nullptr); ASSERT_STREQ("0", receive_bytes->to_string().c_str()); // disk - auto disk_entity = registry.get_entity("disk_metrics.sda"); + auto disk_entity = registry.get_entity("disk_metrics.sda", {{"device", "sda"}}); ASSERT_TRUE(disk_entity != nullptr); Metric* bytes_read = disk_entity->get_metric("disk_bytes_read"); ASSERT_TRUE(bytes_read != nullptr);