From d0fe7e4d944cecc888062b94e898ee0ca8152a77 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Thu, 30 Apr 2020 14:57:21 +0800 Subject: [PATCH] [Profile] Make running profile clearer and more intuitive to improve usability (#3405) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This CL mainly makes the following modifications: 1. Delete the invalid MemoryUsed counter and add PeakMemUsage in each exec node and DataStreamSender. 2. Add indentation to child exec node profiles, making it easy to see the relationship between exec nodes. 3. Delete _is_result_order, which is no longer supported, from olap_scan_node.h and olap_scan_node.cpp. 4. Add a scan_disk method to olap_scanner to fix the counter _num_disks_accessed_counter. 5. We no longer use the buffer pool to read and write disk, so remove the buffer pool read I/O and write I/O counters. 6. Delete the MemUsed counter in exec node. --- be/src/exec/aggregation_node.cpp | 6 +----- be/src/exec/csv_scan_node.cpp | 4 ---- be/src/exec/exec_node.cpp | 6 ++---- be/src/exec/hash_join_node.cpp | 4 ---- be/src/exec/mysql_scan_node.cpp | 3 --- be/src/exec/olap_scan_node.cpp | 14 ++++---------- be/src/exec/olap_scan_node.h | 3 --- be/src/exec/olap_scanner.h | 4 ++++ be/src/exec/scan_node.cpp | 3 ++- be/src/exec/scan_node.h | 5 ----- be/src/exec/schema_scan_node.cpp | 4 ---- be/src/exec/set_operation_node.cpp | 4 ---- be/src/exec/topn_node.cpp | 3 --- be/src/runtime/bufferpool/buffer_pool.cc | 6 ------ .../runtime/bufferpool/buffer_pool_counters.h | 18 ------------------ be/src/runtime/data_stream_sender.cpp | 2 +- 16 files changed, 14 insertions(+), 75 deletions(-) diff --git a/be/src/exec/aggregation_node.cpp b/be/src/exec/aggregation_node.cpp index 0d1806eaac..bea37b48e4 100644 --- a/be/src/exec/aggregation_node.cpp +++ b/be/src/exec/aggregation_node.cpp @@ -202,8 +202,6 @@ Status AggregationNode::open(RuntimeState* state) { RETURN_IF_ERROR(state->check_query_state("Aggregation, after hashing the child 0.")); 
COUNTER_SET(_hash_table_buckets_counter, _hash_tbl->num_buckets()); - COUNTER_SET(memory_used_counter(), - _tuple_pool->peak_allocated_bytes() + _hash_tbl->byte_size()); COUNTER_SET(_hash_table_load_factor_counter, _hash_tbl->load_factor()); num_agg_rows += (_hash_tbl->size() - agg_rows_before); num_input_rows += batch.num_rows(); @@ -276,10 +274,8 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* *eos = _output_iterator.at_end() || reached_limit(); if (*eos) { - if (memory_used_counter() != NULL && _hash_tbl.get() != NULL && + if (_hash_tbl.get() != NULL && _hash_table_buckets_counter != NULL) { - COUNTER_SET(memory_used_counter(), - _tuple_pool->peak_allocated_bytes() + _hash_tbl->byte_size()); COUNTER_SET(_hash_table_buckets_counter, _hash_tbl->num_buckets()); } } diff --git a/be/src/exec/csv_scan_node.cpp b/be/src/exec/csv_scan_node.cpp index 4eda57a15f..f8ed70bf87 100644 --- a/be/src/exec/csv_scan_node.cpp +++ b/be/src/exec/csv_scan_node.cpp @@ -340,10 +340,6 @@ Status CsvScanNode::close(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (memory_used_counter() != nullptr && _tuple_pool.get() != nullptr) { - COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes()); - } - RETURN_IF_ERROR(ExecNode::close(state)); if (state->num_rows_load_success() == 0) { diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 4bde878ae2..9a6f208c9b 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -172,15 +172,13 @@ Status ExecNode::prepare(RuntimeState* state) { DCHECK(_runtime_profile.get() != NULL); _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT); - _memory_used_counter = - ADD_COUNTER(_runtime_profile, "MemoryUsed", TUnit::BYTES); _rows_returned_rate = runtime_profile()->add_derived_counter( ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, boost::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, 
runtime_profile()->total_time_counter()), ""); - _mem_tracker.reset(new MemTracker(-1, _runtime_profile->name(), state->instance_mem_tracker())); + _mem_tracker.reset(new MemTracker(_runtime_profile.get(), -1, _runtime_profile->name(), state->instance_mem_tracker())); _expr_mem_tracker.reset(new MemTracker(-1, "Exprs", _mem_tracker.get())); _expr_mem_pool.reset(new MemPool(_expr_mem_tracker.get())); // TODO chenhao @@ -339,7 +337,7 @@ Status ExecNode::create_tree_helper( } if (!node->_children.empty()) { - node->runtime_profile()->add_child(node->_children[0]->runtime_profile(), false, NULL); + node->runtime_profile()->add_child(node->_children[0]->runtime_profile(), true, NULL); } return Status::OK(); diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index ba4393db90..3065cd8d6e 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -153,10 +153,6 @@ Status HashJoinNode::close(RuntimeState* state) { // Must reset _probe_batch in close() to release resources _probe_batch.reset(NULL); - if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) { - COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes()); - COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size()); - } if (_hash_tbl.get() != NULL) { _hash_tbl->close(); } diff --git a/be/src/exec/mysql_scan_node.cpp b/be/src/exec/mysql_scan_node.cpp index b7393e62f3..f211b07707 100644 --- a/be/src/exec/mysql_scan_node.cpp +++ b/be/src/exec/mysql_scan_node.cpp @@ -251,9 +251,6 @@ Status MysqlScanNode::close(RuntimeState* state) { RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (memory_used_counter() != NULL) { - COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes()); - } _tuple_pool.reset(); return ExecNode::close(state); diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 77b798be2d..8535bfec78 100644 --- 
a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -72,16 +72,6 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); _direct_conjunct_size = _conjunct_ctxs.size(); - if (tnode.olap_scan_node.__isset.sort_column) { - _is_result_order = true; - } else { - _is_result_order = false; - } - - // Before, we support scan data ordered, but is not used in production - // Now, we drop this functional - DCHECK(!_is_result_order) << "ordered result don't support any more"; - return Status::OK(); } @@ -668,6 +658,8 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { } int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); + + std::unordered_set disk_set; for (auto& scan_range : _scan_ranges) { std::vector>* ranges = &cond_ranges; std::vector> split_ranges; @@ -702,8 +694,10 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) { state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, scanner_ranges); _scanner_pool->add(scanner); _olap_scanners.push_back(scanner); + disk_set.insert(scanner->scan_disk()); } } + COUNTER_SET(_num_disks_accessed_counter, static_cast(disk_set.size())); // init progress std::stringstream ss; diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 884ee5d389..11333a3039 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -186,9 +186,6 @@ private: std::vector _olap_filter; - // Order Result Flag - bool _is_result_order; - // Pool for storing allocated scanner objects. We don't want to use the // runtime pool to ensure that the scanner objects are deleted before this // object is. 
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h index f3e9f39403..75ee5422d3 100644 --- a/be/src/exec/olap_scanner.h +++ b/be/src/exec/olap_scanner.h @@ -81,6 +81,10 @@ public: int64_t raw_rows_read() const { return _raw_rows_read; } void update_counter(); + + const std::string& scan_disk() const { + return _tablet->data_dir()->path(); + } private: Status _prepare( const TPaloScanRange& scan_range, diff --git a/be/src/exec/scan_node.cpp b/be/src/exec/scan_node.cpp index be49d5d17c..6a9ae67b1a 100644 --- a/be/src/exec/scan_node.cpp +++ b/be/src/exec/scan_node.cpp @@ -28,7 +28,7 @@ const string ScanNode::_s_total_throughput_counter = "TotalReadThroughput"; const string ScanNode::_s_materialize_tuple_timer = "MaterializeTupleTime(*)"; const string ScanNode::_s_per_read_thread_throughput_counter = "PerReadThreadRawHdfsThroughput"; -const string ScanNode::_s_num_disks_accessed_counter = "NumThread"; +const string ScanNode::_s_num_disks_accessed_counter = "NumDiskAccess"; const string ScanNode::_s_scan_ranges_complete_counter = "ScanRangesComplete"; const string ScanNode::_s_scanner_thread_counters_prefix = "ScannerThreads"; const string ScanNode::_s_scanner_thread_total_wallclock_time = @@ -43,6 +43,7 @@ Status ScanNode::prepare(RuntimeState* state) { ADD_THREAD_COUNTERS(runtime_profile(), _s_scanner_thread_counters_prefix); _bytes_read_counter = ADD_COUNTER(runtime_profile(), _s_bytes_read_counter, TUnit::BYTES); + //TODO: The _rows_read_counter == RowsReturned counter in exec node, there is no need to keep both of them _rows_read_counter = ADD_COUNTER(runtime_profile(), _s_rows_read_counter, TUnit::UNIT); _read_timer = ADD_TIMER(runtime_profile(), _s_total_read_timer); diff --git a/be/src/exec/scan_node.h b/be/src/exec/scan_node.h index 8fb016e699..0ca7bd1ecd 100644 --- a/be/src/exec/scan_node.h +++ b/be/src/exec/scan_node.h @@ -52,11 +52,6 @@ class TScanRange; // // AverageScannerThreadConcurrency - the average number of active scanner threads. 
A // scanner thread is considered active if it is not blocked by IO. This number would -// be low (less than 1) for IO-bound queries. For cpu-bound queries, this number -// would be close to the max scanner threads allowed. -// -// AverageScannerThreadConcurrency - the average number of active scanner threads. A -// scanner thread is considered active if it is not blocked by IO. This number would // be low (less than 1) for IO bounded queries. For cpu bounded queries, this number // would be close to the max scanner threads allowed. // diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp index 440838bf7e..a0521af94b 100644 --- a/be/src/exec/schema_scan_node.cpp +++ b/be/src/exec/schema_scan_node.cpp @@ -310,10 +310,6 @@ Status SchemaScanNode::close(RuntimeState* state) { RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE)); SCOPED_TIMER(_runtime_profile->total_time_counter()); - if (memory_used_counter() != NULL) { - COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes()); - } - _tuple_pool.reset(); return ExecNode::close(state); } diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp index 8321899212..627e139f38 100644 --- a/be/src/exec/set_operation_node.cpp +++ b/be/src/exec/set_operation_node.cpp @@ -75,10 +75,6 @@ Status SetOperationNode::close(RuntimeState* state) { // Must reset _probe_batch in close() to release resources _probe_batch.reset(NULL); - if (_memory_used_counter != NULL && _hash_tbl.get() != NULL) { - COUNTER_UPDATE(_memory_used_counter, _build_pool->peak_allocated_bytes()); - COUNTER_UPDATE(_memory_used_counter, _hash_tbl->byte_size()); - } if (_hash_tbl.get() != NULL) { _hash_tbl->close(); } diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index e83346904e..304820f969 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -163,9 +163,6 @@ Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { // 
block(s) in the pool are all full or when the pool has reached a certain size. if (*eos) { row_batch->tuple_data_pool()->acquire_data(_tuple_pool.get(), false); - if (memory_used_counter() != NULL) { - COUNTER_UPDATE(memory_used_counter(), _tuple_pool->peak_allocated_bytes()); - } } return Status::OK(); } diff --git a/be/src/runtime/bufferpool/buffer_pool.cc b/be/src/runtime/bufferpool/buffer_pool.cc index 04b2bba7d1..e66bcc232f 100644 --- a/be/src/runtime/bufferpool/buffer_pool.cc +++ b/be/src/runtime/bufferpool/buffer_pool.cc @@ -392,12 +392,6 @@ BufferPool::Client::Client(BufferPool* pool, //TmpFileMgr::FileGroup* file_group ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT); counters_.cumulative_bytes_alloced = ADD_COUNTER(child_profile, "CumulativeAllocationBytes", TUnit::BYTES); - counters_.read_wait_time = ADD_TIMER(child_profile, "ReadIoWaitTime"); - counters_.read_io_ops = ADD_COUNTER(child_profile, "ReadIoOps", TUnit::UNIT); - counters_.bytes_read = ADD_COUNTER(child_profile, "ReadIoBytes", TUnit::BYTES); - counters_.write_wait_time = ADD_TIMER(child_profile, "WriteIoWaitTime"); - counters_.write_io_ops = ADD_COUNTER(child_profile, "WriteIoOps", TUnit::UNIT); - counters_.bytes_written = ADD_COUNTER(child_profile, "WriteIoBytes", TUnit::BYTES); counters_.peak_unpinned_bytes = child_profile->AddHighWaterMarkCounter("PeakUnpinnedBytes", TUnit::BYTES); } diff --git a/be/src/runtime/bufferpool/buffer_pool_counters.h b/be/src/runtime/bufferpool/buffer_pool_counters.h index 85b29604a7..92a0691808 100644 --- a/be/src/runtime/bufferpool/buffer_pool_counters.h +++ b/be/src/runtime/bufferpool/buffer_pool_counters.h @@ -34,24 +34,6 @@ struct BufferPoolClientCounters { /// Bytes of buffers allocated via BufferAllocator::AllocateBuffer(). RuntimeProfile::Counter* cumulative_bytes_alloced; - /// Amount of time spent waiting for reads from disk to complete. - RuntimeProfile::Counter* read_wait_time; - - /// Total number of read I/O operations issued. 
- RuntimeProfile::Counter* read_io_ops; - - /// Total bytes read from disk. - RuntimeProfile::Counter* bytes_read; - - /// Amount of time spent waiting for writes to disk to complete. - RuntimeProfile::Counter* write_wait_time; - - /// Total number of write I/O operations issued. - RuntimeProfile::Counter* write_io_ops; - - /// Total bytes written to disk. - RuntimeProfile::Counter* bytes_written; - /// The peak total size of unpinned pages. RuntimeProfile::HighWaterMarkCounter* peak_unpinned_bytes; }; diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 82459a855e..3c98529b81 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -385,7 +385,7 @@ Status DataStreamSender::prepare(RuntimeState* state) { _profile = _pool->add(new RuntimeProfile(_pool, title.str())); SCOPED_TIMER(_profile->total_time_counter()); _mem_tracker.reset( - new MemTracker(-1, "DataStreamSender", state->instance_mem_tracker())); + new MemTracker(_profile, -1, "DataStreamSender", state->instance_mem_tracker())); if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {