diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 1b8b5a80ea..ce79fb56fc 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -124,6 +124,7 @@ void OlapScanNode::_init_counter(RuntimeState* state) { _index_load_timer = ADD_TIMER(_segment_profile, "IndexLoadTime_V1"); _scan_timer = ADD_TIMER(_scanner_profile, "ScanTime"); + _scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScanCpuTime"); _total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT); _cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT); @@ -290,6 +291,7 @@ Status OlapScanNode::collect_query_statistics(QueryStatistics* statistics) { RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); statistics->add_scan_bytes(_read_compressed_counter->value()); statistics->add_scan_rows(_raw_rows_counter->value()); + statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS); return Status::OK(); } @@ -1308,6 +1310,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } void OlapScanNode::scanner_thread(OlapScanner* scanner) { + SCOPED_CPU_TIMER(_scan_cpu_timer); Status status = Status::OK(); bool eos = false; RuntimeState* state = scanner->runtime_state(); diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index d0fa54ee49..0f5cbb6fe4 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -244,6 +244,7 @@ private: Status _status; RuntimeState* _runtime_state; RuntimeProfile::Counter* _scan_timer; + RuntimeProfile::Counter* _scan_cpu_timer = nullptr; RuntimeProfile::Counter* _tablet_counter; RuntimeProfile::Counter* _rows_pushed_cond_filtered_counter = nullptr; RuntimeProfile::Counter* _reader_init_timer = nullptr; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 20a4dd7d42..e29ee96d59 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -218,6 +218,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, // set up profile counters profile()->add_child(_plan->runtime_profile(), true, NULL); _rows_produced_counter = ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT); + _fragment_cpu_timer = ADD_TIMER(profile(), "FragmentCpuTime"); _row_batch.reset(new RowBatch(_plan->row_desc(), _runtime_state->batch_size(), _runtime_state->instance_mem_tracker().get())); @@ -264,6 +265,7 @@ Status PlanFragmentExecutor::open() { Status PlanFragmentExecutor::open_internal() { { + SCOPED_CPU_TIMER(_fragment_cpu_timer); SCOPED_TIMER(profile()->total_time_counter()); RETURN_IF_ERROR(_plan->open(_runtime_state.get())); } @@ -271,14 +273,19 @@ Status PlanFragmentExecutor::open_internal() { if (_sink.get() == NULL) { return Status::OK(); } - RETURN_IF_ERROR(_sink->open(runtime_state())); + { + SCOPED_CPU_TIMER(_fragment_cpu_timer); + RETURN_IF_ERROR(_sink->open(runtime_state())); + } // If there is a sink, do all the work of driving it here, so that // when this returns the query has actually finished RowBatch* batch = NULL; - while (true) { - RETURN_IF_ERROR(get_next_internal(&batch)); + { + SCOPED_CPU_TIMER(_fragment_cpu_timer); + RETURN_IF_ERROR(get_next_internal(&batch)); + } if (batch == NULL) { break; @@ -295,9 +302,10 @@ Status PlanFragmentExecutor::open_internal() { } SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_CPU_TIMER(_fragment_cpu_timer); // Collect this plan and sub plan statistics, and send to parent plan. if (_collect_query_statistics_with_every_batch) { - collect_query_statistics(); + _collect_query_statistics(); } RETURN_IF_ERROR(_sink->send(runtime_state(), batch)); } @@ -315,7 +323,7 @@ Status PlanFragmentExecutor::open_internal() { // audit the sinks to check that this is ok, or change that behaviour. { SCOPED_TIMER(profile()->total_time_counter()); - collect_query_statistics(); + _collect_query_statistics(); Status status; { boost::lock_guard l(_status_lock); @@ -337,9 +345,10 @@ Status PlanFragmentExecutor::open_internal() { return Status::OK(); } -void PlanFragmentExecutor::collect_query_statistics() { +void PlanFragmentExecutor::_collect_query_statistics() { _query_statistics->clear(); _plan->collect_query_statistics(_query_statistics.get()); + _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS); } void PlanFragmentExecutor::report_profile() { diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index e2406fef63..37f544a703 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -28,6 +28,7 @@ #include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include "util/hash_util.hpp" +#include "util/time.h" namespace doris { @@ -205,6 +206,8 @@ private: // Number of rows returned by this fragment RuntimeProfile::Counter* _rows_produced_counter; + RuntimeProfile::Counter* _fragment_cpu_timer; + // Average number of thread tokens for the duration of the plan fragment execution. // Fragments that do a lot of cpu work (non-coordinator fragment) will have at // least 1 token. Fragments that contain a hdfs scan node will have 1+ tokens @@ -257,7 +260,7 @@ private: const DescriptorTbl& desc_tbl() { return _runtime_state->desc_tbl(); } - void collect_query_statistics(); + void _collect_query_statistics(); }; // Save the common components of fragments in a query. diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index 09ef8282f1..92234e8872 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -32,17 +32,20 @@ class QueryStatisticsRecvr; // or plan's statistics and QueryStatisticsRecvr is responsible for collecting it. class QueryStatistics { public: - QueryStatistics() : scan_rows(0), scan_bytes(0), returned_rows(0) {} + QueryStatistics() : scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0) {} void merge(const QueryStatistics& other) { scan_rows += other.scan_rows; scan_bytes += other.scan_bytes; + cpu_ms += other.cpu_ms; } void add_scan_rows(int64_t scan_rows) { this->scan_rows += scan_rows; } void add_scan_bytes(int64_t scan_bytes) { this->scan_bytes += scan_bytes; } + void add_cpu_ms(int64_t cpu_ms) { this->cpu_ms += cpu_ms; } + void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; } void merge(QueryStatisticsRecvr* recvr); @@ -50,6 +53,7 @@ public: void clear() { scan_rows = 0; scan_bytes = 0; + cpu_ms = 0; returned_rows = 0; } @@ -57,17 +61,20 @@ public: DCHECK(statistics != nullptr); statistics->set_scan_rows(scan_rows); statistics->set_scan_bytes(scan_bytes); + statistics->set_cpu_ms(cpu_ms); statistics->set_returned_rows(returned_rows); } void merge_pb(const PQueryStatistics& statistics) { scan_rows += statistics.scan_rows(); scan_bytes += statistics.scan_bytes(); + cpu_ms += statistics.cpu_ms(); } private: int64_t scan_rows; int64_t scan_bytes; + int64_t cpu_ms; // number rows returned by query. // only set once by result sink when closing. int64_t returned_rows; diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index 46659c4647..2bd2da759b 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -48,6 +48,7 @@ namespace doris { #define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS) #define ADD_CHILD_TIMER(profile, name, parent) (profile)->add_counter(name, TUnit::TIME_NS, parent) #define SCOPED_TIMER(c) ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) +#define SCOPED_CPU_TIMER(c) ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) #define CANCEL_SAFE_SCOPED_TIMER(c, is_cancelled) \ ScopedTimer MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c, is_cancelled) #define SCOPED_RAW_TIMER(c) \ diff --git a/be/src/util/stopwatch.hpp b/be/src/util/stopwatch.hpp index 1d52d86f5b..53905a54ee 100644 --- a/be/src/util/stopwatch.hpp +++ b/be/src/util/stopwatch.hpp @@ -29,16 +29,17 @@ namespace doris { // it is not affected by user setting the system clock. // CLOCK_MONOTONIC represents monotonic time since some unspecified starting point. // It is good for computing elapsed time. -class MonotonicStopWatch { +template +class CustomStopWatch { public: - MonotonicStopWatch() { + CustomStopWatch() { _total_time = 0; _running = false; } void start() { if (!_running) { - clock_gettime(CLOCK_MONOTONIC, &_start); + clock_gettime(Clock, &_start); _running = true; } } @@ -55,7 +56,7 @@ public: uint64_t ret = elapsed_time(); if (_running) { - clock_gettime(CLOCK_MONOTONIC, &_start); + clock_gettime(Clock, &_start); } return ret; @@ -68,7 +69,7 @@ public: } timespec end; - clock_gettime(CLOCK_MONOTONIC, &end); + clock_gettime(Clock, &end); return (end.tv_sec - _start.tv_sec) * 1000L * 1000L * 1000L + (end.tv_nsec - _start.tv_nsec); } @@ -79,6 +80,17 @@ private: bool _running; }; +// Stop watch for reporting elapsed time in nanosec based on CLOCK_MONOTONIC. +// It is as fast as Rdtsc. +// It is also accurate because it not affected by cpu frequency changes and +// it is not affected by user setting the system clock. +// CLOCK_MONOTONIC represents monotonic time since some unspecified starting point. +// It is good for computing elapsed time. +using MonotonicStopWatch = CustomStopWatch; + +// Stop watch for reporting elapsed nanosec based on CLOCK_THREAD_CPUTIME_ID. +using ThreadCpuStopWatch = CustomStopWatch; + } #endif diff --git a/be/src/util/time.h b/be/src/util/time.h index 26e54ea26e..f54dc82f8d 100644 --- a/be/src/util/time.h +++ b/be/src/util/time.h @@ -24,6 +24,7 @@ #include #define NANOS_PER_SEC 1000000000ll +#define NANOS_PER_MILLIS 1000000ll #define NANOS_PER_MICRO 1000ll #define MICROS_PER_SEC 1000000ll #define MICROS_PER_MILLI 1000ll diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index ebaf3dab44..1e9f9a7d43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -75,6 +75,8 @@ public class AuditEvent { public String feIp = ""; @AuditField(value = "Stmt") public String stmt = ""; + @AuditField(value = "CpuTimeMS") + public long cpuTimeMs = -1; public static class AuditEventBuilder { @@ -127,6 +129,11 @@ public class AuditEvent { return this; } + public AuditEventBuilder setCpuTimeMs(long cpuTimeMs) { + auditEvent.cpuTimeMs = cpuTimeMs; + return this; + } + public AuditEventBuilder setScanRows(long scanRows) { auditEvent.scanRows = scanRows; return this; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 993de8eb47..e32cbee1e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -115,6 +115,7 @@ public class ConnectProcessor { .setState(ctx.getState().toString()).setQueryTime(elapseMs) .setScanBytes(statistics == null ? 0 : statistics.scan_bytes) .setScanRows(statistics == null ? 0 : statistics.scan_rows) + .setCpuTimeMs(statistics == null ? 0 : statistics.cpu_ms) .setReturnRows(ctx.getReturnRows()) .setStmtId(ctx.getStmtId()) .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c894048a6d..b176dc0731 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1125,6 +1125,9 @@ public class StmtExecutor { if (statisticsForAuditLog.scan_rows == null) { statisticsForAuditLog.scan_rows = 0L; } + if (statisticsForAuditLog.cpu_ms == null) { + statisticsForAuditLog.cpu_ms = 0L; + } return statisticsForAuditLog; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java index 11bf6f202e..74216081aa 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java @@ -100,6 +100,7 @@ public class ConnectProcessorTest { statistics.scan_bytes = 0L; statistics.scan_rows = 0L; + statistics.cpu_ms = 0L; MetricRepo.init(); } diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index 09af10c3c1..d380031dfb 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -24,6 +24,7 @@ message PQueryStatistics { optional int64 scan_rows = 1; optional int64 scan_bytes = 2; optional int64 returned_rows = 3; + optional int64 cpu_ms = 4; } message PRowBatch {