diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 32f57c2998..8c7796f326 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -210,15 +210,22 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) { Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); - SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); int64_t time_spent = 0; + ThreadCpuStopWatch cpu_time_stop_watch; + cpu_time_stop_watch.start(); Defer defer {[&]() { if (_task_queue) { _task_queue->update_statistics(this, time_spent); } + int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time(); + _task_cpu_timer->update(delta_cpu_time); + auto cpu_qs = query_context()->get_cpu_statistics(); + if (cpu_qs) { + cpu_qs->add_cpu_nanos(delta_cpu_time); + } }}; // The status must be runnable *eos = false; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 29bc70fce6..574f4e7e5c 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -218,14 +218,22 @@ Status PipelineXTask::_open() { Status PipelineXTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); - SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); int64_t time_spent = 0; + + ThreadCpuStopWatch cpu_time_stop_watch; + cpu_time_stop_watch.start(); Defer defer {[&]() { if (_task_queue) { _task_queue->update_statistics(this, time_spent); } + int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time(); + _task_cpu_timer->update(delta_cpu_time); + auto cpu_qs = query_context()->get_cpu_statistics(); + if (cpu_qs) { + cpu_qs->add_cpu_nanos(delta_cpu_time); + } }}; // The status must be runnable *eos = false; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index d18eb2ef00..f0282e9d7c 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -678,6 +678,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo query_ctx->query_mem_tracker->enable_print_log_usage(); } + query_ctx->register_memory_statistics(); + query_ctx->register_cpu_statistics(); + if constexpr (std::is_same_v) { if (params.__isset.workload_groups && !params.workload_groups.empty()) { uint64_t tg_id = params.workload_groups[0].id; diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 2e6e959541..9ce12f1532 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -32,6 +32,7 @@ #include #include "common/compiler_util.h" // IWYU pragma: keep +#include "runtime/query_statistics.h" #include "util/pretty_printer.h" namespace doris { @@ -138,6 +139,9 @@ public: return; } _consumption->add(bytes); + if (_query_statistics) { + _query_statistics->set_max_peak_memory_bytes(_consumption->peak_value()); + } } void consume_no_update_peak(int64_t bytes) { // need extreme fast @@ -148,6 +152,8 @@ public: void set_consumption(int64_t bytes) { _consumption->set(bytes); } + std::shared_ptr get_query_statistics() { return _query_statistics; } + public: virtual Snapshot make_snapshot() const; // Specify group_num from mem_tracker_pool to generate snapshot. @@ -180,6 +186,8 @@ protected: // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove. std::list::iterator _tracker_group_it; + + std::shared_ptr _query_statistics = nullptr; }; } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index a4a8d61928..f9980806f2 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -78,6 +78,12 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, int64_ } else { _group_num = random() % 999 + 1; } + + // currently only select/load need runtime query statistics + if (_type == Type::LOAD || _type == Type::QUERY) { + _query_statistics = std::make_shared(); + } + { std::lock_guard l(mem_tracker_limiter_pool[_group_num].group_lock); _tracker_limiter_group_it = mem_tracker_limiter_pool[_group_num].trackers.insert( diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 9c40e2e9f3..2e8735c013 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -309,7 +309,13 @@ Status PlanFragmentExecutor::open() { Status PlanFragmentExecutor::open_vectorized_internal() { SCOPED_TIMER(profile()->total_time_counter()); { - SCOPED_CPU_TIMER(_fragment_cpu_timer); + ThreadCpuStopWatch cpu_time_stop_watch; + cpu_time_stop_watch.start(); + Defer defer {[&]() { + int64_t cpu_time = cpu_time_stop_watch.elapsed_time(); + _fragment_cpu_timer->update(cpu_time); + }}; + RETURN_IF_ERROR(_plan->open(_runtime_state.get())); RETURN_IF_CANCELLED(_runtime_state); if (_sink == nullptr) { @@ -322,13 +328,18 @@ Status PlanFragmentExecutor::open_vectorized_internal() { auto st = Status::OK(); + int64_t old_cpu_time = cpu_time_stop_watch.elapsed_time(); while (!eos) { + Defer defer {[&]() { + int64_t current_cpu_time = cpu_time_stop_watch.elapsed_time(); + int64_t delta_time = current_cpu_time - old_cpu_time; + _query_statistics->add_cpu_nanos(delta_time); + old_cpu_time = current_cpu_time; + }}; RETURN_IF_CANCELLED(_runtime_state); st = get_vectorized_internal(block.get(), &eos); RETURN_IF_ERROR(st); - _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS); - if (!eos || block->rows() > 0) { st = _sink->send(runtime_state(), block.get()); if (st.is()) { diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index bd7dee33b5..fffb5ad57a 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -131,4 +131,25 @@ std::shared_ptr QueryContext::get_query_statistics() { print_id(_query_id)); } +void QueryContext::register_memory_statistics() { + if (query_mem_tracker) { + std::shared_ptr qs = query_mem_tracker->get_query_statistics(); + std::string query_id = print_id(_query_id); + if (qs) { + _exec_env->runtime_query_statistics_mgr()->register_query_statistics(query_id, qs, + coord_addr); + } else { + LOG(INFO) << " query " << query_id << " get memory query statistics failed "; + } + } +} + +void QueryContext::register_cpu_statistics() { + if (!_cpu_statistics) { + _cpu_statistics = std::make_shared(); + _exec_env->runtime_query_statistics_mgr()->register_query_statistics( + print_id(_query_id), _cpu_statistics, coord_addr); + } +} + } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 9e906cbbe1..dd24206c41 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -212,6 +212,12 @@ public: std::shared_ptr get_query_statistics(); + void register_memory_statistics(); + + void register_cpu_statistics(); + + std::shared_ptr get_cpu_statistics() { return _cpu_statistics; } + public: DescriptorTbl* desc_tbl = nullptr; bool set_rsc_info = false; @@ -278,6 +284,8 @@ private: pipeline::TaskScheduler* _task_scheduler = nullptr; vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr; std::unique_ptr _execution_dependency; + + std::shared_ptr _cpu_statistics = nullptr; }; } // namespace doris diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index 7171803ce0..f71bad24e8 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -22,6 +22,8 @@ #include +#include "util/time.h" + namespace doris { void NodeStatistics::merge(const NodeStatistics& other) { @@ -40,7 +42,11 @@ void NodeStatistics::from_pb(const PNodeStatistics& node_statistics) { void QueryStatistics::merge(const QueryStatistics& other) { scan_rows += other.scan_rows; scan_bytes += other.scan_bytes; - cpu_ms += other.cpu_ms; + int64_t other_cpu_time = other.cpu_nanos.load(std::memory_order_relaxed); + cpu_nanos += other_cpu_time; + if (other.max_peak_memory_bytes > this->max_peak_memory_bytes) { + this->max_peak_memory_bytes = other.max_peak_memory_bytes.load(std::memory_order_relaxed); + } for (auto& other_node_statistics : other._nodes_statistics_map) { int64_t node_id = other_node_statistics.first; auto node_statistics = add_nodes_statistics(node_id); @@ -52,7 +58,7 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) { DCHECK(statistics != nullptr); statistics->set_scan_rows(scan_rows); statistics->set_scan_bytes(scan_bytes); - statistics->set_cpu_ms(cpu_ms); + statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS); statistics->set_returned_rows(returned_rows); statistics->set_max_peak_memory_bytes(max_peak_memory_bytes); for (auto iter = _nodes_statistics_map.begin(); iter != _nodes_statistics_map.end(); ++iter) { @@ -66,15 +72,15 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { DCHECK(statistics != nullptr); statistics->__set_scan_bytes(scan_bytes); statistics->__set_scan_rows(scan_rows); - statistics->__set_cpu_ms(cpu_ms); + statistics->__set_cpu_ms(cpu_nanos.load(std::memory_order_relaxed) / NANOS_PER_MILLIS); statistics->__set_returned_rows(returned_rows); - statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes); + statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(std::memory_order_relaxed)); } void QueryStatistics::from_pb(const PQueryStatistics& statistics) { scan_rows = statistics.scan_rows(); scan_bytes = statistics.scan_bytes(); - cpu_ms = statistics.cpu_ms(); + cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS; for (auto& p_node_statistics : statistics.nodes_statistics()) { int64_t node_id = p_node_statistics.node_id(); auto node_statistics = add_nodes_statistics(node_id); diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index 8c4662ba59..7f11daf6ec 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -60,7 +60,11 @@ private: class QueryStatistics { public: QueryStatistics() - : scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0), max_peak_memory_bytes(0) {} + : scan_rows(0), + scan_bytes(0), + cpu_nanos(0), + returned_rows(0), + max_peak_memory_bytes(0) {} virtual ~QueryStatistics(); void merge(const QueryStatistics& other); @@ -69,7 +73,9 @@ public: void add_scan_bytes(int64_t scan_bytes) { this->scan_bytes += scan_bytes; } - void add_cpu_ms(int64_t cpu_ms) { this->cpu_ms += cpu_ms; } + void add_cpu_nanos(int64_t delta_cpu_time) { + this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed); + } NodeStatistics* add_nodes_statistics(int64_t node_id) { NodeStatistics* nodeStatistics = nullptr; @@ -86,7 +92,7 @@ public: void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; } void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) { - this->max_peak_memory_bytes = max_peak_memory_bytes; + this->max_peak_memory_bytes.store(max_peak_memory_bytes, std::memory_order_relaxed); } void merge(QueryStatisticsRecvr* recvr); @@ -98,12 +104,12 @@ public: void clearNodeStatistics(); void clear() { - scan_rows.store(0); - scan_bytes.store(0); + scan_rows.store(0, std::memory_order_relaxed); + scan_bytes.store(0, std::memory_order_relaxed); - cpu_ms = 0; + cpu_nanos.store(0, std::memory_order_relaxed); returned_rows = 0; - max_peak_memory_bytes = 0; + max_peak_memory_bytes.store(0, std::memory_order_relaxed); clearNodeStatistics(); //clear() is used before collection, so calling "clear" is equivalent to being collected. set_collected(); @@ -122,13 +128,13 @@ private: friend class QueryStatisticsRecvr; std::atomic scan_rows; std::atomic scan_bytes; - int64_t cpu_ms; + std::atomic cpu_nanos; // number rows returned by query. // only set once by result sink when closing. int64_t returned_rows; // Maximum memory peak for all backends. // only set once by result sink when closing. - int64_t max_peak_memory_bytes; + std::atomic max_peak_memory_bytes; // The statistics of the query on each backend. using NodeStatisticsMap = std::unordered_map; NodeStatisticsMap _nodes_statistics_map; diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index a9d1226b0d..6df9c0b858 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -24,6 +24,14 @@ namespace doris { +void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) { + QueryStatistics tmp_qs; + for (auto& qs_ptr : _qs_list) { + tmp_qs.merge(*qs_ptr); + } + tmp_qs.to_thrift(tq_s); +} + void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id, std::shared_ptr qs_ptr, TNetworkAddress fe_addr) { @@ -31,7 +39,7 @@ void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id, if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) { _query_statistics_ctx_map[query_id] = std::make_unique(fe_addr); } - _query_statistics_ctx_map.at(query_id)->qs_list.push_back(qs_ptr); + _query_statistics_ctx_map.at(query_id)->_qs_list.push_back(qs_ptr); } void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { @@ -44,24 +52,20 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { int64_t current_time = MonotonicMillis(); int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms; for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { - if (fe_qs_map.find(qs_ctx_ptr->fe_addr) == fe_qs_map.end()) { + if (fe_qs_map.find(qs_ctx_ptr->_fe_addr) == fe_qs_map.end()) { std::map tmp_map; - fe_qs_map[qs_ctx_ptr->fe_addr] = std::move(tmp_map); + fe_qs_map[qs_ctx_ptr->_fe_addr] = std::move(tmp_map); } - QueryStatistics tmp_qs; - for (auto& qs_ptr : qs_ctx_ptr->qs_list) { - tmp_qs.merge(*qs_ptr); - } TQueryStatistics ret_t_qs; - tmp_qs.to_thrift(&ret_t_qs); - fe_qs_map.at(qs_ctx_ptr->fe_addr)[query_id] = ret_t_qs; + qs_ctx_ptr->collect_query_statistics(&ret_t_qs); + fe_qs_map.at(qs_ctx_ptr->_fe_addr)[query_id] = ret_t_qs; - bool is_query_finished = qs_ctx_ptr->is_query_finished; + bool is_query_finished = qs_ctx_ptr->_is_query_finished; bool is_timeout_after_finish = false; if (is_query_finished) { is_timeout_after_finish = - (current_time - qs_ctx_ptr->query_finish_time) > conf_qs_timeout; + (current_time - qs_ctx_ptr->_query_finish_time) > conf_qs_timeout; } qs_status[query_id] = std::make_pair(is_query_finished, is_timeout_after_finish); } @@ -149,8 +153,8 @@ void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) { // it may not register query statistics, so it can not be mark finish if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) { auto* qs_ptr = _query_statistics_ctx_map.at(query_id).get(); - qs_ptr->is_query_finished = true; - qs_ptr->query_finish_time = MonotonicMillis(); + qs_ptr->_is_query_finished = true; + qs_ptr->_query_finish_time = MonotonicMillis(); } } @@ -161,7 +165,7 @@ std::shared_ptr RuntimeQueryStatiticsMgr::get_runtime_query_sta return nullptr; } std::shared_ptr qs_ptr = std::make_shared(); - for (auto const& qs : _query_statistics_ctx_map[query_id]->qs_list) { + for (auto const& qs : _query_statistics_ctx_map[query_id]->_qs_list) { qs_ptr->merge(*qs); } return qs_ptr; diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h index c4e997d9ff..6f1ea11a61 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.h +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -26,15 +26,18 @@ namespace doris { class QueryStatisticsCtx { public: - QueryStatisticsCtx(TNetworkAddress fe_addr) : fe_addr(fe_addr) { - this->is_query_finished = false; + QueryStatisticsCtx(TNetworkAddress fe_addr) : _fe_addr(fe_addr) { + this->_is_query_finished = false; } ~QueryStatisticsCtx() = default; - std::vector> qs_list; - bool is_query_finished; - TNetworkAddress fe_addr; - int64_t query_finish_time; + void collect_query_statistics(TQueryStatistics* tq_s); + +public: + std::vector> _qs_list; + bool _is_query_finished; + TNetworkAddress _fe_addr; + int64_t _query_finish_time; }; class RuntimeQueryStatiticsMgr { diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 23ddb65629..49c4c8f31e 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -123,7 +123,11 @@ public: int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; } - void update_scan_cpu_timer() { _scan_cpu_timer += _cpu_watch.elapsed_time(); } + void update_scan_cpu_timer() { + int64_t cpu_time = _cpu_watch.elapsed_time(); + _scan_cpu_timer += cpu_time; + _query_statistics->add_cpu_nanos(cpu_time); + } RuntimeState* runtime_state() { return _state; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index 1f8cef9271..7fac98ca73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -69,6 +69,7 @@ public class WorkloadRuntimeStatusMgr { auditEvent.scanRows = queryStats.scan_rows; auditEvent.scanBytes = queryStats.scan_bytes; auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes; + auditEvent.cpuTimeMs = queryStats.cpu_ms; } Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent); }