From 717285db1ee488e8731a168e58ca4c3f93bc5c00 Mon Sep 17 00:00:00 2001 From: chenhao <510341142@qq.com> Date: Fri, 18 Jan 2019 09:53:40 +0800 Subject: [PATCH] Remove unused code about showing current queries (#552) --- be/src/exec/analytic_eval_node.cpp | 3 - be/src/exec/analytic_eval_node.h | 2 - be/src/exec/exchange_node.cpp | 11 +- be/src/exec/exchange_node.h | 2 - be/src/exec/olap_scan_node.cpp | 2 - be/src/exec/sort_node.cpp | 2 - be/src/exec/sort_node.h | 2 - be/src/exec/union_node.cpp | 3 - be/src/exec/union_node.h | 2 - be/src/runtime/runtime_state.cpp | 1 - be/src/runtime/runtime_state.h | 15 - .../proc/CurrentQueryFragmentProcNode.java | 21 +- .../common/proc/CurrentQueryInfoProvider.java | 268 +++--------------- .../proc/CurrentQueryStatisticsProcDir.java | 17 +- .../common/util/QueryStatisticsFormatter.java | 39 +++ .../doris/common/util/RuntimeProfile.java | 4 +- .../org/apache/doris/qe/ConnectProcessor.java | 21 +- 17 files changed, 115 insertions(+), 300 deletions(-) create mode 100644 fe/src/main/java/org/apache/doris/common/util/QueryStatisticsFormatter.java diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp index 99aa5e8b51..c0b3e4d2e4 100644 --- a/be/src/exec/analytic_eval_node.cpp +++ b/be/src/exec/analytic_eval_node.cpp @@ -153,7 +153,6 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) { _mem_pool.reset(new MemPool(mem_tracker())); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); - _process_rows_counter = ADD_COUNTER(runtime_profile(), "ProcessRows", TUnit::UNIT); DCHECK_EQ(_result_tuple_desc->slots().size(), _evaluators.size()); for (int i = 0; i < _evaluators.size(); ++i) { @@ -236,7 +235,6 @@ Status AnalyticEvalNode::open(RuntimeState* state) { while (!_input_eos && _prev_input_row == NULL) { RETURN_IF_ERROR(child(0)->get_next(state, _curr_child_batch.get(), &_input_eos)); - COUNTER_UPDATE(_process_rows_counter, _curr_child_batch->num_rows()); if (_curr_child_batch->num_rows() > 0) { _prev_input_row = _curr_child_batch->get_row(0); process_child_batches(state); @@ -612,7 +610,6 @@ Status AnalyticEvalNode::process_child_batches(RuntimeState* state) { _prev_child_batch->reset(); _prev_child_batch.swap(_curr_child_batch); RETURN_IF_ERROR(child(0)->get_next(state, _curr_child_batch.get(), &_input_eos)); - COUNTER_UPDATE(_process_rows_counter, _curr_child_batch->num_rows()); } return Status::OK; diff --git a/be/src/exec/analytic_eval_node.h b/be/src/exec/analytic_eval_node.h index 6b92c37ea5..882b3d391c 100644 --- a/be/src/exec/analytic_eval_node.h +++ b/be/src/exec/analytic_eval_node.h @@ -327,8 +327,6 @@ private: // Time spent processing the child rows. RuntimeProfile::Counter* _evaluation_timer; - - RuntimeProfile::Counter* _process_rows_counter; }; } diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index ea9ef64d3d..f2c4624116 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -43,8 +43,7 @@ ExchangeNode::ExchangeNode( _next_row_idx(0), _is_merging(tnode.exchange_node.__isset.sort_info), _offset(tnode.exchange_node.__isset.offset ? 
tnode.exchange_node.offset : 0), - _num_rows_skipped(0), - _merge_rows_counter(nullptr) { + _num_rows_skipped(0) { DCHECK_GE(_offset, 0); DCHECK(_is_merging || (_offset == 0)); } @@ -64,7 +63,6 @@ Status ExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) { Status ExchangeNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); _convert_row_batch_timer = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime"); - _merge_rows_counter = ADD_COUNTER(runtime_profile(), "MergeRows", TUnit::UNIT); // TODO: figure out appropriate buffer size DCHECK_GT(_num_senders, 0); _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr()); @@ -120,10 +118,8 @@ Status ExchangeNode::fill_input_row_batch(RuntimeState* state) { DCHECK(!_is_merging); Status ret_status; { - state->set_query_state_for_wait(); // SCOPED_TIMER(state->total_network_receive_timer()); ret_status = _stream_recvr->get_batch(&_input_batch); - state->set_query_state_for_running(); } VLOG_FILE << "exch: has batch=" << (_input_batch == NULL ? "false" : "true") << " #rows=" << (_input_batch != NULL ? _input_batch->num_rows() : 0) @@ -218,11 +214,7 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc // RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(state->check_query_state()); - state->set_query_state_for_wait(); RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos)); - state->set_query_state_for_running(); - //TODO chenhao, count only one instance lost others. - COUNTER_UPDATE(_merge_rows_counter, output_batch->num_rows()); while ((_num_rows_skipped < _offset)) { _num_rows_skipped += output_batch->num_rows(); // Throw away rows in the output batch until the offset is skipped. @@ -237,7 +229,6 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc break; } RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos)); - COUNTER_UPDATE(_merge_rows_counter, output_batch->num_rows()); } _num_rows_returned += output_batch->num_rows(); diff --git a/be/src/exec/exchange_node.h b/be/src/exec/exchange_node.h index e53742b30d..0911c0ea92 100644 --- a/be/src/exec/exchange_node.h +++ b/be/src/exec/exchange_node.h @@ -110,8 +110,6 @@ private: // Number of rows skipped so far. int64_t _num_rows_skipped; - RuntimeProfile::Counter* _merge_rows_counter; - // Sub plan query statistics receiver. It is shared with DataStreamRecvr and will be // called in two different threads. But their calls are all at different time, there is // no problem of multithreaded access. 
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index cc9cd28698..d851ef1e87 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -243,7 +243,6 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo // wait for batch from queue RowBatch* materialized_batch = NULL; { - state->set_query_state_for_wait(); boost::unique_lock l(_row_batches_lock); while (_materialized_row_batches.empty() && !_transfer_done) { if (state->is_cancelled()) { @@ -258,7 +257,6 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo DCHECK(materialized_batch != NULL); _materialized_row_batches.pop_front(); } - state->set_query_state_for_running(); } // return batch diff --git a/be/src/exec/sort_node.cpp b/be/src/exec/sort_node.cpp index 3b816e9edc..fa84a47927 100644 --- a/be/src/exec/sort_node.cpp +++ b/be/src/exec/sort_node.cpp @@ -49,7 +49,6 @@ Status SortNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); RETURN_IF_ERROR(_sort_exec_exprs.prepare( state, child(0)->row_desc(), _row_descriptor, expr_mem_tracker())); - _sort_rows_counter = ADD_COUNTER(runtime_profile(), "SortRows", TUnit::UNIT); return Status::OK; } @@ -145,7 +144,6 @@ Status SortNode::sort_input(RuntimeState* state) { do { batch.reset(); RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos)); - COUNTER_UPDATE(_sort_rows_counter, batch.num_rows()); RETURN_IF_ERROR(_sorter->add_batch(&batch)); RETURN_IF_CANCELLED(state); RETURN_IF_LIMIT_EXCEEDED(state); diff --git a/be/src/exec/sort_node.h b/be/src/exec/sort_node.h index 68dcfd8edb..6ccff0978d 100644 --- a/be/src/exec/sort_node.h +++ b/be/src/exec/sort_node.h @@ -69,8 +69,6 @@ private: std::vector _is_asc_order; std::vector _nulls_first; boost::scoped_ptr _tuple_pool; - - RuntimeProfile::Counter* _sort_rows_counter; }; } diff --git a/be/src/exec/union_node.cpp b/be/src/exec/union_node.cpp index a1494cc1ef..b8714bec81 100644 --- a/be/src/exec/union_node.cpp +++ b/be/src/exec/union_node.cpp @@ -77,7 +77,6 @@ Status UnionNode::prepare(RuntimeState* state) { _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); DCHECK(_tuple_desc != nullptr); _codegend_union_materialize_batch_fns.resize(_child_expr_lists.size()); - _materialize_rows_counter = ADD_COUNTER(runtime_profile(), "MaterializeRows", TUnit::UNIT); // Prepare const expr lists. for (const vector& exprs : _const_expr_lists) { RETURN_IF_ERROR(Expr::prepare(exprs, state, row_desc(), expr_mem_tracker())); @@ -210,7 +209,6 @@ Status UnionNode::get_next_materialized(RuntimeState* state, RowBatch* row_batch // The first batch from each child is always fetched here. RETURN_IF_ERROR(child(_child_idx)->get_next( state, _child_batch.get(), &_child_eos)); - COUNTER_UPDATE(_materialize_rows_counter, _child_batch->num_rows()); } while (!row_batch->at_capacity()) { @@ -225,7 +223,6 @@ Status UnionNode::get_next_materialized(RuntimeState* state, RowBatch* row_batch // All batches except the first batch from each child are fetched here. RETURN_IF_ERROR(child(_child_idx)->get_next( state, _child_batch.get(), &_child_eos)); - COUNTER_UPDATE(_materialize_rows_counter, _child_batch->num_rows()); // If we fetched an empty batch, go back to the beginning of this while loop, and // try again. 
if (_child_batch->num_rows() == 0) continue; diff --git a/be/src/exec/union_node.h b/be/src/exec/union_node.h index 7daf2a623a..b12fb597a1 100644 --- a/be/src/exec/union_node.h +++ b/be/src/exec/union_node.h @@ -100,8 +100,6 @@ private: /// to -1 if no child needs to be closed. int _to_close_child_idx; - RuntimeProfile::Counter* _materialize_rows_counter; - /// END: Members that must be Reset() ///////////////////////////////////////// diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 1f6fde88eb..5accbebb72 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -65,7 +65,6 @@ RuntimeState::RuntimeState( _normal_row_number(0), _error_row_number(0), _error_log_file(nullptr), - _is_running(true), _instance_buffer_reservation(new ReservationTracker) { Status status = init(fragment_instance_id, query_options, now, exec_env); DCHECK(status.ok()); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index b58ef95493..e89cbaaa78 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -480,18 +480,6 @@ public: /// Helper to call QueryState::StartSpilling(). Status StartSpilling(MemTracker* mem_tracker); - void set_query_state_for_wait() { - _is_running = false; - } - - void set_query_state_for_running() { - _is_running = true; - } - - bool is_running() { - return _is_running; - } - private: // Allow TestEnv to set block_mgr manually for testing. friend class TestEnv; @@ -615,9 +603,6 @@ private: std::unique_ptr _error_hub; std::vector _tablet_commit_infos; - // state of execution - volatile bool _is_running; - //TODO chenhao , remove this to QueryState /// Pool of buffer reservations used to distribute initial reservations to operators /// in the query. 
Contains a ReservationTracker that is a child of diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java index 4136d756fc..2de259e2db 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java @@ -25,6 +25,7 @@ import org.apache.doris.qe.QueryStatisticsItem; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.common.util.QueryStatisticsFormatter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -70,17 +71,19 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface { private ProcResult requestFragmentExecInfos() throws AnalysisException { final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider(); - final Collection instanceConsumptions - = provider.getQueryInstanceConsumption(item); + final Collection instanceStatisticsCollection + = provider.getInstanceStatistics(item); final List> sortedRowDatas = Lists.newArrayList(); - for (CurrentQueryInfoProvider.InstanceConsumption instanceConsumption : - instanceConsumptions) { + for (CurrentQueryInfoProvider.InstanceStatistics instanceStatistics : + instanceStatisticsCollection) { final List rowData = Lists.newArrayList(); - rowData.add(instanceConsumption.getFragmentId()); - rowData.add(instanceConsumption.getInstanceId().toString()); - rowData.add(instanceConsumption.getAddress().toString()); - rowData.add(instanceConsumption.getFormattingScanBytes()); - rowData.add(instanceConsumption.getFormattingProcessRows()); + rowData.add(instanceStatistics.getFragmentId()); + rowData.add(instanceStatistics.getInstanceId().toString()); + rowData.add(instanceStatistics.getAddress().toString()); + rowData.add(QueryStatisticsFormatter.getScanBytes( + instanceStatistics.getScanBytes())); + rowData.add(QueryStatisticsFormatter.getRowsReturned( + instanceStatistics.getRowsReturned())); sortedRowDatas.add(rowData); } diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java index 3cddd59a4e..eeb8811cc1 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java @@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** - * Provide running query's PlanNode informations, IO consumption and CPU consumption. + * Provide running query's statistics. */ public class CurrentQueryInfoProvider { private static final Logger LOG = LogManager.getLogger(CurrentQueryInfoProvider.class); @@ -55,57 +55,57 @@ public class CurrentQueryInfoProvider { /** * Firstly send request to trigger profile to report for specified query and wait a while, - * Secondly get Counters from Coordinator's RuntimeProfile and return query's consumption. + * Secondly get Counters from Coordinator's RuntimeProfile and return query's statistics. 
* * @param item * @return * @throws AnalysisException */ - public Consumption getQueryConsumption(QueryStatisticsItem item) throws AnalysisException { + public QueryStatistics getQueryStatistics(QueryStatisticsItem item) throws AnalysisException { triggerReportAndWait(item, getWaitingTimeForSingleQuery(), false); - return new Consumption(item.getQueryProfile()); + return new QueryStatistics(item.getQueryProfile()); } /** - * Same as getQueryConsumption, but this will cause BE to report all queries profile. + * Same as above, but this will cause BE to report all queries profile. * * @param items * @return * @throws AnalysisException */ - public Map getQueryConsumption(Collection items) + public Map getQueryStatistics(Collection items) throws AnalysisException { triggerReportAndWait(items, getWaitingTime(items.size()), true); - final Map queryConsumptions = Maps.newHashMap(); + final Map queryStatisticsMap = Maps.newHashMap(); for (QueryStatisticsItem item : items) { - queryConsumptions.put(item.getQueryId(), new Consumption(item.getQueryProfile())); + queryStatisticsMap.put(item.getQueryId(), new QueryStatistics(item.getQueryProfile())); } - return queryConsumptions; + return queryStatisticsMap; } /** - * Return query's instances consumption. + * Return query's instances statistics. * * @param item * @return * @throws AnalysisException */ - public Collection getQueryInstanceConsumption(QueryStatisticsItem item) throws AnalysisException { + public Collection getInstanceStatistics(QueryStatisticsItem item) throws AnalysisException { triggerReportAndWait(item, getWaitingTimeForSingleQuery(), false); final Map instanceProfiles = collectInstanceProfile(item.getQueryProfile()); - final List instanceConsumptions = Lists.newArrayList(); + final List instanceStatisticsList = Lists.newArrayList(); for (QueryStatisticsItem.FragmentInstanceInfo instanceInfo : item.getFragmentInstanceInfos()) { final RuntimeProfile instanceProfile = instanceProfiles.get(DebugUtil.printId(instanceInfo.getInstanceId())); Preconditions.checkNotNull(instanceProfile); - final InstanceConsumption consumption = - new InstanceConsumption( + final InstanceStatistics Statistics = + new InstanceStatistics( instanceInfo.getFragmentId(), instanceInfo.getInstanceId(), instanceInfo.getAddress(), instanceProfile); - instanceConsumptions.add(consumption); + instanceStatisticsList.add(Statistics); } - return instanceConsumptions; + return instanceStatisticsList; } /** @@ -237,7 +237,7 @@ public class CurrentQueryInfoProvider { errMsg = result.status.msgs.get(0); } throw new AnalysisException(reasonPrefix + " backend:" + pair.first.getAddress() - + " reason:" + errMsg); + + " reason:" + errMsg); } } catch (InterruptedException | ExecutionException | TimeoutException e) { LOG.warn(reasonPrefix + " reason:" + e.getCause()); @@ -265,122 +265,56 @@ public class CurrentQueryInfoProvider { } - public static class Consumption { - private final static String OLAP_SCAN_NODE = "OLAP_SCAN_NODE"; - private final static String HASH_JOIN_NODE = "HASH_JOIN_NODE"; - private final static String HASH_AGGREGATION_NODE = "AGGREGATION_NODE"; - private final static String SORT_NODE = "SORT_NODE"; - private final static String ANALYTIC_EVAL_NODE = "ANALYTIC_EVAL_NODE"; - private final static String UNION_NODE = "UNION_NODE"; - private final static String EXCHANGE_NODE = "EXCHANGE_NODE"; + public static class QueryStatistics { + final List> counterMaps; - protected final List calculators; - - public Consumption(RuntimeProfile profile) { - this.calculators = 
Lists.newArrayList(); - init(profile); + public QueryStatistics(RuntimeProfile profile) { + counterMaps = Lists.newArrayList(); + collectCounters(profile, counterMaps); } - private void init(RuntimeProfile profile) { - final List> olapScanCounters = Lists.newArrayList(); - collectNodeProfileCounters(profile, olapScanCounters, OLAP_SCAN_NODE); - calculators.add(new OlapScanNodeConsumptionCalculator(olapScanCounters)); - - final List> hashJoinCounters = Lists.newArrayList(); - collectNodeProfileCounters(profile, hashJoinCounters, HASH_JOIN_NODE); - calculators.add(new HashJoinConsumptionCalculator(hashJoinCounters)); - - final List> hashAggCounters = Lists.newArrayList(); - collectNodeProfileCounters(profile, hashAggCounters, HASH_AGGREGATION_NODE); - calculators.add(new HashAggConsumptionCalculator(hashAggCounters)); - - final List> sortCounters = Lists.newArrayList(); - collectNodeProfileCounters(profile, sortCounters, SORT_NODE); - calculators.add(new SortConsumptionCalculator(sortCounters)); - - final List> windowsCounters = Lists.newArrayList(); - collectNodeProfileCounters(profile, windowsCounters, ANALYTIC_EVAL_NODE); - calculators.add(new WindowsConsumptionCalculator(windowsCounters)); - - final List> unionCounters = Lists.newArrayList(); - collectNodeProfileCounters(profile, unionCounters, UNION_NODE); - calculators.add(new UnionConsumptionCalculator(unionCounters)); - - final List> exchangeCounters = Lists.newArrayList(); - collectNodeProfileCounters(profile, exchangeCounters, EXCHANGE_NODE); - calculators.add(new ExchangeConsumptionCalculator(exchangeCounters)); - } - - private void collectNodeProfileCounters(RuntimeProfile profile, - List> counterMaps, String name) { + private void collectCounters(RuntimeProfile profile, + List> counterMaps) { for (Map.Entry entry : profile.getChildMap().entrySet()) { - if (name.equals(parsePossibleExecNodeName(entry.getKey()))) { - counterMaps.add(entry.getValue().getCounterMap()); - } - collectNodeProfileCounters(entry.getValue(), counterMaps, name); + counterMaps.add(entry.getValue().getCounterMap()); + collectCounters(entry.getValue(), counterMaps); } } - /** - * ExecNode's RuntimeProfile name is "$node_type_name (id=?)" - * @param str - * @return - */ - private String parsePossibleExecNodeName(String str) { - final String[] elements = str.split(" "); - if (elements.length == 2) { - return elements[0]; - } else { - return ""; + public long getScanBytes() { + long scanBytes = 0; + for (Map counters : counterMaps) { + final Counter counter = counters.get("CompressedBytesRead"); + scanBytes += counter == null ? 0 : counter.getValue(); } + return scanBytes; } - private long getTotalCpuConsumption() { - long cpu = 0; - for (ConsumptionCalculator consumption : calculators) { - cpu += consumption.getProcessRows(); + public long getRowsReturned() { + long rowsReturned = 0; + for (Map counters : counterMaps) { + final Counter counter = counters.get("RowsReturned"); + rowsReturned += counter == null ? 
0 : counter.getValue(); } - return cpu; - } - - private long getTotalIoConsumption() { - long io = 0; - for (ConsumptionCalculator consumption : calculators) { - io += consumption.getScanBytes(); - } - return io; - } - - public String getFormattingProcessRows() { - final StringBuilder builder = new StringBuilder(); - builder.append(getTotalCpuConsumption()).append(" Rows"); - return builder.toString(); - } - - public String getFormattingScanBytes() { - final Pair pair = DebugUtil.getByteUint(getTotalIoConsumption()); - final Formatter fmt = new Formatter(); - final StringBuilder builder = new StringBuilder(); - builder.append(fmt.format("%.2f", pair.first)).append(" ").append(pair.second); - return builder.toString(); + return rowsReturned; } } - public static class InstanceConsumption extends Consumption { + public static class InstanceStatistics { private final String fragmentId; private final TUniqueId instanceId; private final TNetworkAddress address; + private final QueryStatistics statistics; - public InstanceConsumption( + public InstanceStatistics( String fragmentId, TUniqueId instanceId, TNetworkAddress address, RuntimeProfile profile) { - super(profile); this.fragmentId = fragmentId; this.instanceId = instanceId; this.address = address; - + this.statistics = new QueryStatistics(profile); } public String getFragmentId() { @@ -394,125 +328,13 @@ public class CurrentQueryInfoProvider { public TNetworkAddress getAddress() { return address; } - } - private static abstract class ConsumptionCalculator { - protected final List> counterMaps; - - public ConsumptionCalculator(List> counterMaps) { - this.counterMaps = counterMaps; - } - - public long getProcessRows() { - long cpu = 0; - for (Map counters : counterMaps) { - cpu += getProcessRows(counters); - } - return cpu; + public long getRowsReturned() { + return statistics.getRowsReturned(); } public long getScanBytes() { - long io = 0; - for (Map counters : counterMaps) { - io += getScanBytes(counters); - } - return io; - } - - protected long getProcessRows(Map counters) { - return 0; - } - - protected long getScanBytes(Map counters) { - return 0; - } - } - - private static class OlapScanNodeConsumptionCalculator extends ConsumptionCalculator { - public OlapScanNodeConsumptionCalculator(List> counterMaps) { - super(counterMaps); - } - - @Override - protected long getScanBytes(Map counters) { - final Counter counter = counters.get("CompressedBytesRead"); - return counter == null ? 0 : counter.getValue(); - } - } - - private static class HashJoinConsumptionCalculator extends ConsumptionCalculator { - public HashJoinConsumptionCalculator(List> counterMaps) { - super(counterMaps); - } - - @Override - protected long getProcessRows(Map counters) { - final Counter probeCounter = counters.get("ProbeRows"); - final Counter buildCounter = counters.get("BuildRows"); - return probeCounter == null || buildCounter == null ? - 0 : probeCounter.getValue() + buildCounter.getValue(); - } - } - - private static class HashAggConsumptionCalculator extends ConsumptionCalculator { - public HashAggConsumptionCalculator(List> counterMaps) { - super(counterMaps); - } - - @Override - protected long getProcessRows(Map counters) { - final Counter buildCounter = counters.get("BuildRows"); - return buildCounter == null ? 
0 : buildCounter.getValue(); - } - } - - private static class SortConsumptionCalculator extends ConsumptionCalculator { - public SortConsumptionCalculator(List> counterMaps) { - super(counterMaps); - } - - @Override - protected long getProcessRows(Map counters) { - final Counter sortRowsCounter = counters.get("SortRows"); - return sortRowsCounter == null ? 0 : sortRowsCounter.getValue(); - } - } - - private static class WindowsConsumptionCalculator extends ConsumptionCalculator { - public WindowsConsumptionCalculator(List> counterMaps) { - super(counterMaps); - } - - @Override - protected long getProcessRows(Map counters) { - final Counter processRowsCounter = counters.get("ProcessRows"); - return processRowsCounter == null ? 0 : processRowsCounter.getValue(); - - } - } - - private static class UnionConsumptionCalculator extends ConsumptionCalculator { - public UnionConsumptionCalculator(List> counterMaps) { - super(counterMaps); - } - - @Override - protected long getProcessRows(Map counters) { - final Counter materializeRowsCounter = counters.get("MaterializeRows"); - return materializeRowsCounter == null ? 0 : materializeRowsCounter.getValue(); - } - } - - private static class ExchangeConsumptionCalculator extends ConsumptionCalculator { - - public ExchangeConsumptionCalculator(List> counterMaps) { - super(counterMaps); - } - - @Override - protected long getProcessRows(Map counters) { - final Counter mergeRowsCounter = counters.get("MergeRows"); - return mergeRowsCounter == null ? 0 : mergeRowsCounter.getValue(); + return statistics.getScanBytes(); } } diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java index 02257be567..61295a404d 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java @@ -21,6 +21,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.QueryStatisticsFormatter; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QueryStatisticsItem; @@ -62,22 +63,26 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface { @Override public ProcResult fetchResult() throws AnalysisException { final BaseProcResult result = new BaseProcResult(); - final Map statistic = QeProcessorImpl.INSTANCE.getQueryStatistics(); + final Map statistic = + QeProcessorImpl.INSTANCE.getQueryStatistics(); result.setNames(TITLE_NAMES.asList()); final List> sortedRowData = Lists.newArrayList(); final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider(); - final Map consumptions - = provider.getQueryConsumption(statistic.values()); + final Map statisticsMap + = provider.getQueryStatistics(statistic.values()); for (QueryStatisticsItem item : statistic.values()) { final List values = Lists.newArrayList(); values.add(item.getConnId()); values.add(item.getQueryId()); values.add(item.getDb()); values.add(item.getUser()); - final CurrentQueryInfoProvider.Consumption consumption = consumptions.get(item.getQueryId()); - values.add(consumption.getFormattingScanBytes()); - values.add(consumption.getFormattingProcessRows()); + final CurrentQueryInfoProvider.QueryStatistics statistics + = statisticsMap.get(item.getQueryId()); + 
values.add(QueryStatisticsFormatter.getScanBytes( + statistics.getScanBytes())); + values.add(QueryStatisticsFormatter.getRowsReturned( + statistics.getRowsReturned())); values.add(item.getQueryExecTime()); sortedRowData.add(values); } diff --git a/fe/src/main/java/org/apache/doris/common/util/QueryStatisticsFormatter.java b/fe/src/main/java/org/apache/doris/common/util/QueryStatisticsFormatter.java new file mode 100644 index 0000000000..3914b7e09e --- /dev/null +++ b/fe/src/main/java/org/apache/doris/common/util/QueryStatisticsFormatter.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.common.Pair; + +import java.util.Formatter; + +public class QueryStatisticsFormatter { + + public static String getScanBytes(long scanBytes) { + final Pair pair = DebugUtil.getByteUint(scanBytes); + final Formatter fmt = new Formatter(); + final StringBuilder builder = new StringBuilder(); + builder.append(fmt.format("%.2f", pair.first)).append(" ").append(pair.second); + return builder.toString(); + } + + public static String getRowsReturned(long rowsReturned) { + final StringBuilder builder = new StringBuilder(); + builder.append(rowsReturned).append(" Rows"); + return builder.toString(); + } +} diff --git a/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index f5f359f917..b237767bfa 100644 --- a/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -53,11 +53,11 @@ public class RuntimeProfile { private Map infoStrings = Maps.newHashMap(); private List infoStringsDisplayOrder = Lists.newArrayList(); - // It will be hold by other thread. + // These will be held by other threads.
private Map counterMap = Maps.newConcurrentMap(); + private Map childMap = Maps.newConcurrentMap(); private Map > childCounterMap = Maps.newHashMap(); - private Map childMap = Maps.newHashMap(); private List> childList = Lists.newArrayList(); private String name; diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java index ffa52f7abd..1c302dc207 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -29,6 +29,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.QueryStatisticsFormatter; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; @@ -97,20 +98,6 @@ public class ConnectProcessor { ctx.getState().setOk(); } - private String getFormattingScanRows(PQueryStatistics statistics) { - final StringBuilder builder = new StringBuilder(); - builder.append(statistics.scanRows).append(" Rows"); - return builder.toString(); - } - - private String getFormattingScanBytes(PQueryStatistics statistics) { - final Pair pair = DebugUtil.getByteUint(statistics.scanBytes); - final Formatter fmt = new Formatter(); - final StringBuilder builder = new StringBuilder(); - builder.append(fmt.format("%.2f", pair.first)).append(" ").append(pair.second); - return builder.toString(); - } - private void auditAfterExec(String origStmt, StatementBase parsedStmt, PQueryStatistics statistics) { // slow query @@ -119,8 +106,10 @@ public class ConnectProcessor { ctx.getAuditBuilder().put("state", ctx.getState()); ctx.getAuditBuilder().put("time", elapseMs); Preconditions.checkNotNull(statistics); - ctx.getAuditBuilder().put("ScanRows", getFormattingScanRows(statistics)); - ctx.getAuditBuilder().put("ScanRawData", getFormattingScanBytes(statistics)); + ctx.getAuditBuilder().put("ScanRows", + QueryStatisticsFormatter.getRowsReturned(statistics.scanRows)); + ctx.getAuditBuilder().put("ScanRawData", + QueryStatisticsFormatter.getScanBytes(statistics.scanBytes)); ctx.getAuditBuilder().put("returnRows", ctx.getReturnRows()); ctx.getAuditBuilder().put("stmt_id", ctx.getStmtId());
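For context, the formatting that the new QueryStatisticsFormatter centralizes is simple: scale a raw byte count to a human-readable unit with two decimals, and suffix row counts with "Rows". The standalone sketch below approximates that behavior so it can be compiled and run without the Doris classes; the class name QueryStatisticsFormatterSketch and the unit-scaling loop are assumptions for illustration only, since the real class delegates byte scaling to DebugUtil.getByteUint.

import java.util.Locale;

// Hypothetical standalone approximation of the formatter added in this patch.
// The real class lives in org.apache.doris.common.util and uses DebugUtil.getByteUint.
public final class QueryStatisticsFormatterSketch {

    private static final String[] UNITS = {"B", "KB", "MB", "GB", "TB"};

    // Scale a raw byte count to two decimals plus a unit, e.g. 1536 -> "1.50 KB".
    public static String getScanBytes(long scanBytes) {
        double value = scanBytes;
        int unit = 0;
        while (value >= 1024 && unit < UNITS.length - 1) {
            value /= 1024;
            unit++;
        }
        return String.format(Locale.ROOT, "%.2f %s", value, UNITS[unit]);
    }

    // Row counts are rendered with a plain "Rows" suffix, matching the patch.
    public static String getRowsReturned(long rowsReturned) {
        return rowsReturned + " Rows";
    }

    public static void main(String[] args) {
        System.out.println(getScanBytes(3L * 1024 * 1024)); // prints "3.00 MB"
        System.out.println(getRowsReturned(42));            // prints "42 Rows"
    }
}

With these inputs the sketch prints "3.00 MB" and "42 Rows", which is the shape of the strings the current-query proc nodes and the audit log are expected to display after this patch.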