diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index 2064bf62e3..3048b95fe6 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "gen_cpp/DataSinks_types.h" #include "gen_cpp/Exprs_types.h" +#include "runtime/exec_node_consumption_provider.h" #include "runtime/mem_tracker.h" namespace doris { @@ -78,11 +79,17 @@ public: // Returns the runtime profile for the sink. virtual RuntimeProfile* profile() = 0; + void set_query_consumption(const ExecNodeConsumptionProvider::Consumption& consumption) { + _query_consumption = consumption; + } + protected: // Set to true after close() has been called. subclasses should check and set this in // close(). bool _closed; std::unique_ptr _expr_mem_tracker; + + ExecNodeConsumptionProvider::Consumption _query_consumption; }; } // namespace doris diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index 4c7828c309..52fbe98e76 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -131,6 +131,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* if (reached_limit()) { _stream_recvr->transfer_all_resources(output_batch); + set_runtime_consumption(state); *eos = true; return Status::OK; } else { @@ -179,6 +180,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* if (reached_limit()) { _stream_recvr->transfer_all_resources(output_batch); + set_runtime_consumption(state); *eos = true; return Status::OK; } @@ -197,6 +199,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* RETURN_IF_ERROR(fill_input_row_batch(state)); *eos = (_input_batch == NULL); if (*eos) { + set_runtime_consumption(state); return Status::OK; } @@ -243,6 +246,7 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc // by the merger to the output batch. if (*eos) { _stream_recvr->transfer_all_resources(output_batch); + set_runtime_consumption(state); } COUNTER_SET(_rows_returned_counter, _num_rows_returned); diff --git a/be/src/exec/exchange_node.h b/be/src/exec/exchange_node.h index 6450d39f91..b800e71f6d 100644 --- a/be/src/exec/exchange_node.h +++ b/be/src/exec/exchange_node.h @@ -21,11 +21,12 @@ #include #include "exec/exec_node.h" #include "exec/sort_exec_exprs.h" +#include "runtime/data_stream_recvr.h" +#include "runtime/exec_node_consumption_provider.h" namespace doris { class RowBatch; -class DataStreamRecvr; class RuntimeProfile; // Receiver node for data streams. The data stream receiver is created in Prepare() @@ -61,6 +62,12 @@ protected: virtual void debug_string(int indentation_level, std::stringstream* out) const; private: + + void set_runtime_consumption(RuntimeState* state) { + ExecNodeConsumptionProvider::Consumption consumption = _stream_recvr->get_sub_plan_consumption(); + state->add_sub_plan_consumption(consumption); + } + // Implements GetNext() for the case where _is_merging is true. Delegates the GetNext() // call to the underlying DataStreamRecvr. Status get_next_merging(RuntimeState* state, RowBatch* output_batch, bool* eos); diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index f4486dea08..d79b1794a6 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -31,9 +31,13 @@ void GetResultBatchCtx::on_failure(const Status& status) { delete this; } -void GetResultBatchCtx::on_close(int64_t packet_seq) { +void GetResultBatchCtx::on_close(int64_t packet_seq, + ExecNodeConsumptionProvider::Consumption* consumption) { Status status; status.to_protobuf(result->mutable_status()); + if (consumption != nullptr) { + consumption->serialize(result->mutable_query_consumption()); + } result->set_packet_seq(packet_seq); result->set_eos(true); done->Run(); @@ -183,7 +187,7 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { return; } if (_is_close) { - ctx->on_close(_packet_num); + ctx->on_close(_packet_num, &_consumption); return; } // no ready data, push ctx to waiting list @@ -200,7 +204,7 @@ Status BufferControlBlock::close(Status exec_status) { if (!_waiting_rpc.empty()) { if (_status.ok()) { for (auto& ctx : _waiting_rpc) { - ctx->on_close(_packet_num); + ctx->on_close(_packet_num, &_consumption); } } else { for (auto& ctx : _waiting_rpc) { diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 701ef5f70a..c02fa882b5 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -24,6 +24,7 @@ #include #include "common/status.h" #include "gen_cpp/Types_types.h" +#include "runtime/exec_node_consumption_provider.h" namespace google { namespace protobuf { @@ -52,7 +53,7 @@ struct GetResultBatchCtx { } void on_failure(const Status& status); - void on_close(int64_t packet_seq); + void on_close(int64_t packet_seq, ExecNodeConsumptionProvider::Consumption* consumption = nullptr); void on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos = false); }; @@ -80,6 +81,9 @@ public: return _fragment_id; } + void set_query_consumption(const ExecNodeConsumptionProvider::Consumption& consumption) { + _consumption = consumption; + } private: typedef std::list ResultQueue; @@ -100,8 +104,10 @@ private: boost::condition_variable _data_arriaval; // signal removal of data by stream consumer boost::condition_variable _data_removal; - + std::deque _waiting_rpc; + + ExecNodeConsumptionProvider::Consumption _consumption; }; } diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp index 0c75fbb04a..dee0c938b4 100644 --- a/be/src/runtime/data_stream_mgr.cpp +++ b/be/src/runtime/data_stream_mgr.cpp @@ -121,7 +121,8 @@ Status DataStreamMgr::add_data( Status DataStreamMgr::close_sender(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int sender_id, - int be_number) { + int be_number, + const PQueryConsumption& consumption) { VLOG_FILE << "close_sender(): fragment_instance_id=" << fragment_instance_id << ", node=" << dest_node_id; shared_ptr recvr = find_recvr(fragment_instance_id, dest_node_id); @@ -135,6 +136,7 @@ Status DataStreamMgr::close_sender(const TUniqueId& fragment_instance_id, // errors from receiver-initiated teardowns. return Status::OK; } + recvr->add_sub_plan_consumption(consumption); recvr->remove_sender(sender_id, be_number); return Status::OK; } diff --git a/be/src/runtime/data_stream_mgr.h b/be/src/runtime/data_stream_mgr.h index a4880b65e1..ca94dc6ccd 100644 --- a/be/src/runtime/data_stream_mgr.h +++ b/be/src/runtime/data_stream_mgr.h @@ -46,6 +46,7 @@ class DataStreamRecvr; class RowBatch; class RuntimeState; class PRowBatch; +class PQueryConsumption; class PUniqueId; // Singleton class which manages all incoming data streams at a backend node. It @@ -96,7 +97,7 @@ public: // sender has closed. // Returns OK if successful, error status otherwise. Status close_sender(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, - int sender_id, int be_number); + int sender_id, int be_number, const PQueryConsumption& consumption); // Closes all receivers registered for fragment_instance_id immediately. void cancel(const TUniqueId& fragment_instance_id); diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index f007e6f67b..84f083dee1 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -242,6 +242,7 @@ void DataStreamRecvr::SenderQueue::add_batch( // it in this thread. batch = new RowBatch(_recvr->row_desc(), pb_batch, _recvr->mem_tracker()); } + VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size << "\n"; _batch_queue.emplace_back(batch_size, batch); @@ -433,4 +434,9 @@ Status DataStreamRecvr::get_batch(RowBatch** next_batch) { return _sender_queues[0]->get_batch(next_batch); } +void DataStreamRecvr::add_sub_plan_consumption(const PQueryConsumption& p_consumption) { + ExecNodeConsumptionProvider::Consumption consumption; + consumption.deserialize(p_consumption); + _sub_plan_consumption.add(consumption); +} } diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h index 22bca1cecd..7b897f6e7f 100644 --- a/be/src/runtime/data_stream_recvr.h +++ b/be/src/runtime/data_stream_recvr.h @@ -24,6 +24,7 @@ #include "common/object_pool.h" #include "common/status.h" #include "gen_cpp/Types_types.h" // for TUniqueId +#include "runtime/exec_node_consumption_provider.h" #include "runtime/descriptors.h" #include "util/tuple_row_compare.h" @@ -99,6 +100,11 @@ public: const RowDescriptor& row_desc() const { return _row_desc; } MemTracker* mem_tracker() const { return _mem_tracker.get(); } + void add_sub_plan_consumption(const PQueryConsumption& p_consumption); + + ExecNodeConsumptionProvider::Consumption get_sub_plan_consumption() { + return _sub_plan_consumption; + } private: friend class DataStreamMgr; class SenderQueue; @@ -194,6 +200,7 @@ private: // Wall time senders spend waiting for the recv buffer to have capacity. RuntimeProfile::Counter* _buffer_full_wall_timer; + ExecNodeConsumptionProvider::Consumption _sub_plan_consumption; // Total time spent waiting for data to arrive in the recv buffer // RuntimeProfile::Counter* _data_arrival_timer; }; diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index d0c60960fc..c319692f30 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -71,7 +71,7 @@ public: Channel(DataStreamSender* parent, const RowDescriptor& row_desc, const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int buffer_size) : + PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain) : _parent(parent), _buffer_size(buffer_size), _row_desc(row_desc), @@ -80,7 +80,8 @@ public: _num_data_bytes_sent(0), _packet_seq(0), _need_close(false), - _brpc_dest_addr(brpc_dest) { + _brpc_dest_addr(brpc_dest), + _is_transfer_chain(is_transfer_chain) { } virtual ~Channel() { @@ -163,6 +164,8 @@ private: palo::PInternalService_Stub* _brpc_stub = nullptr; RefCountClosure* _closure = nullptr; int32_t _brpc_timeout_ms = 500; + // whether the dest can be treated as consumption transfer chain. + bool _is_transfer_chain; }; Status DataStreamSender::Channel::init(RuntimeState* state) { @@ -203,6 +206,10 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { } VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id << " dest_node=" << _dest_node_id; + if (eos && _is_transfer_chain) { + auto consumption = _brpc_request.mutable_query_consumption(); + _parent->_query_consumption.serialize(consumption); + } _brpc_request.set_eos(eos); if (batch != nullptr) { @@ -305,11 +312,16 @@ DataStreamSender::DataStreamSender( || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED); // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable) for (int i = 0; i < destinations.size(); ++i) { + bool is_transfer_chain = false; + if (destinations[i].__isset.is_transfer_chain) { + is_transfer_chain = destinations[i].is_transfer_chain; + } _channel_shared_ptrs.emplace_back( new Channel(this, row_desc, destinations[i].brpc_server, destinations[i].fragment_instance_id, - sink.dest_node_id, per_channel_buffer_size)); + sink.dest_node_id, per_channel_buffer_size, + is_transfer_chain)); _channels.push_back(_channel_shared_ptrs[i].get()); } } diff --git a/be/src/runtime/exec_node_consumption_provider.h b/be/src/runtime/exec_node_consumption_provider.h new file mode 100644 index 0000000000..a8aae48381 --- /dev/null +++ b/be/src/runtime/exec_node_consumption_provider.h @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_RUNTIME_EXEC_NODE_CONSUMPTION_PROVIDER_H +#define DORIS_BE_RUNTIME_EXEC_NODE_CONSUMPTION_PROVIDER_H + +#include "util/runtime_profile.h" +#include "util/string_util.h" +#include "gen_cpp/data.pb.h" +#include "gen_cpp/PlanNodes_types.h" + +namespace doris { + +// Generate ExecNode resource Consumption with RuntimeProfile. CPU +// consumption is measured by the number of rows processed and IO +// consumption is measured by the size of the scans. +class ExecNodeConsumptionProvider { +public: + + ExecNodeConsumptionProvider() { + init(); + } + + class Consumption { + public: + Consumption() : cpu(0), io(0) { + } + + void add(const Consumption& other) { + cpu.add(other.cpu); + io.add(other.io); + } + + void serialize(PQueryConsumption* consumption) { + DCHECK(consumption != nullptr); + consumption->set_cpu(cpu.load()); + consumption->set_io(io.load()); + } + + void deserialize(const PQueryConsumption& consumption) { + cpu.store(consumption.cpu()); + io.store(consumption.io()); + } + + int64_t get_cpu() { + return cpu.load(); + } + + int64_t get_io() { + return io.load(); + } + + void set(int64_t cpu, int64_t io) { + this->cpu.store(cpu); + this->io.store(io); + } + + Consumption& operator=(const Consumption& other) { + if (this != &other) { + set(other.cpu, other.io); + } + return *this; + } + private: + AtomicInt64 cpu; + AtomicInt64 io; + }; + + Consumption get_consumption(RuntimeProfile* profile) { + Consumption total_consumption; + std::vector all_profiles; + profile->get_all_children(&all_profiles); + for (auto profile : all_profiles) { + // ExecNode's RuntimeProfile name is "$ExecNode_type_name (id=?)" + std::vector elements; + boost::split(elements, profile->name(), boost::is_any_of(" "), boost::token_compress_off); + Consumption consumption; + bool has = get_consumption(profile, &consumption, elements[0]); + if (elements.size() == 2 && has) { + total_consumption.add(consumption); + } + } + return total_consumption; + } + +private: + + void init() { + functions["OLAP_SCAN_NODE"] = get_olap_scan_consumption; + functions["HASH_JOIN_NODE"] = get_hash_join_consumption; + functions["AGGREGATION_NODE"] = get_hash_agg_consumption; + functions["SORT_NODE"] = get_sort_consumption; + functions["ANALYTIC_EVAL_NODE"] = get_windows_consumption; + functions["UNION_NODE"] = get_union_consumption; + functions["EXCHANGE_NODE"] = get_exchange_consumption; + } + + bool get_consumption(RuntimeProfile* profile, Consumption* consumption, const std::string& name) { + ConsumptionFunc get_consumption_func = functions[name]; + if (get_consumption_func != nullptr) { + get_consumption_func(profile, consumption); + return true; + } + return false; + } + + static void get_olap_scan_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* read_compressed_counter = profile->get_counter("CompressedBytesRead"); + consumption->set(0, read_compressed_counter->value()); + } + + static void get_hash_join_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* probe_counter = profile->get_counter("ProbeRows"); + RuntimeProfile::Counter* build_counter = profile->get_counter("BuildRows"); + consumption->set(probe_counter->value() + build_counter->value(), 0); + } + + static void get_hash_agg_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* build_counter = profile->get_counter("BuildRows"); + consumption->set(build_counter->value(), 0); + } + + static void get_sort_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* sort_counter = profile->get_counter("SortRows"); + consumption->set(sort_counter->value(), 0); + } + + static void get_windows_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* process_counter = profile->get_counter("ProcessRows"); + consumption->set(process_counter->value(), 0); + } + + static void get_union_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* materialize_counter = profile->get_counter("MaterializeRows"); + consumption->set(materialize_counter->value(), 0); + } + + static void get_exchange_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* merge_counter = profile->get_counter("MergeRows"); + // exchange merge sort + if (merge_counter != nullptr) { + consumption->set(merge_counter->value(), 0); + } + } + + typedef std::function ConsumptionFunc; + // ExecNode type name to function + std::map functions; +}; + +} + +#endif diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 1e769ee6df..db62400111 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -333,6 +333,8 @@ Status PlanFragmentExecutor::open_internal() { // audit the sinks to check that this is ok, or change that behaviour. { SCOPED_TIMER(profile()->total_time_counter()); + ExecNodeConsumptionProvider::Consumption consumption = runtime_state()->get_consumption(); + _sink->set_query_consumption(consumption); Status status = _sink->close(runtime_state(), _status); RETURN_IF_ERROR(status); } diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp index b5eb2c978b..1b08ec32be 100644 --- a/be/src/runtime/result_sink.cpp +++ b/be/src/runtime/result_sink.cpp @@ -82,6 +82,8 @@ Status ResultSink::close(RuntimeState* state, Status exec_status) { } // close sender, this is normal path end if (_sender) { + // In the last, send consumption of execnode. + _sender->set_query_consumption(_query_consumption); _sender->close(exec_status); } state->exec_env()->result_mgr()->cancel_at_time(time(NULL) + config::result_buffer_cancelled_interval_time, diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index b58ef95493..a6cb40be9f 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -33,6 +33,7 @@ #include "common/global_types.h" #include "util/logging.h" +#include "runtime/exec_node_consumption_provider.h" #include "runtime/mem_pool.h" #include "runtime/thread_resource_mgr.h" #include "gen_cpp/Types_types.h" // for TUniqueId @@ -492,6 +493,18 @@ public: return _is_running; } + void add_sub_plan_consumption(const ExecNodeConsumptionProvider::Consumption& consumption) { + _sub_plan_consumption.add(consumption); + } + + ExecNodeConsumptionProvider::Consumption get_consumption() { + ExecNodeConsumptionProvider provider; + ExecNodeConsumptionProvider::Consumption total_consumption; + total_consumption = provider.get_consumption(&_profile); + total_consumption.add(_sub_plan_consumption); + return total_consumption; + } + private: // Allow TestEnv to set block_mgr manually for testing. friend class TestEnv; @@ -638,6 +651,9 @@ private: /// TODO: not needed if we call ReleaseResources() in a timely manner (IMPALA-1575). AtomicInt32 _initial_reservation_refcnt; + // Consumption from sub plan, it should only be updated by ExchangeNode. + ExecNodeConsumptionProvider::Consumption _sub_plan_consumption; + // prohibit copies RuntimeState(const RuntimeState&); }; diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 056495a281..496fa89131 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -130,14 +130,14 @@ void BackendService::transmit_data(TTransmitDataResult& return_val, } if (params.eos) { - Status status = _exec_env->stream_mgr()->close_sender( - params.dest_fragment_instance_id, - params.dest_node_id, - params.sender_id, - params.be_number); - VLOG_ROW << "params.eos: " << (params.eos ? "true" : "false") - << " close_sender status: " << status.get_error_msg(); - status.set_t_status(&return_val); + // Status status = _exec_env->stream_mgr()->close_sender( + // params.dest_fragment_instance_id, + // params.dest_node_id, + // params.sender_id, + // params.be_number); + //VLOG_ROW << "params.eos: " << (params.eos ? "true" : "false") + // << " close_sender status: " << status.get_error_msg(); + //status.set_t_status(&return_val); } } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2f4b3f347a..e5375a29d9 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -60,7 +60,7 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cnt finst_id.__set_lo(request->finst_id().lo()); _exec_env->stream_mgr()->close_sender( finst_id, request->node_id(), - request->sender_id(), request->be_number()); + request->sender_id(), request->be_number(), request->query_consumption()); } if (done != nullptr) { done->Run(); diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java index 21ffef73fa..88cdbed3f0 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java @@ -79,8 +79,8 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface { rowData.add(instanceConsumption.getFragmentId()); rowData.add(instanceConsumption.getInstanceId().toString()); rowData.add(instanceConsumption.getAddress().toString()); - rowData.add(String.valueOf(instanceConsumption.getTotalIoConsumption())); - rowData.add(String.valueOf(instanceConsumption.getTotalCpuConsumption())); + rowData.add(instanceConsumption.getFormattingIoConsumption()); + rowData.add(instanceConsumption.getFormattingCpuConsumption()); sortedRowDatas.add(rowData); } diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java index f18d9d1399..647efca885 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collection; +import java.util.Formatter; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -334,7 +335,7 @@ public class CurrentQueryInfoProvider { } } - public long getTotalCpuConsumption() { + private long getTotalCpuConsumption() { long cpu = 0; for (ConsumptionCalculator consumption : calculators) { cpu += consumption.getCpu(); @@ -342,13 +343,27 @@ public class CurrentQueryInfoProvider { return cpu; } - public long getTotalIoConsumption() { + private long getTotalIoConsumption() { long io = 0; for (ConsumptionCalculator consumption : calculators) { io += consumption.getIo(); } return io; } + + public String getFormattingCpuConsumption() { + final StringBuilder builder = new StringBuilder(); + builder.append(getTotalCpuConsumption()).append(" Rows"); + return builder.toString(); + } + + public String getFormattingIoConsumption() { + final Pair pair = DebugUtil.getByteUint(getTotalIoConsumption()); + final Formatter fmt = new Formatter(); + final StringBuilder builder = new StringBuilder(); + builder.append(fmt.format("%.2f", pair.first)).append(" ").append(pair.second); + return builder.toString(); + } } public static class InstanceConsumption extends Consumption { diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java index a59c3dbe9e..9f16be1b4b 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java @@ -76,8 +76,8 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface { values.add(item.getDb()); values.add(item.getUser()); final CurrentQueryInfoProvider.Consumption consumption = consumptions.get(item.getQueryId()); - values.add(String.valueOf(consumption.getTotalIoConsumption())); - values.add(String.valueOf(consumption.getTotalCpuConsumption())); + values.add(consumption.getFormattingIoConsumption()); + values.add(consumption.getFormattingCpuConsumption()); values.add(item.getQueryExecTime()); sortedRowData.add(values); } diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 560d538bb2..7fada7e5f7 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -37,6 +37,7 @@ import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; @@ -92,12 +93,16 @@ public class ConnectProcessor { ctx.getState().setOk(); } - private void auditAfterExec(String origStmt, StatementBase parsedStmt) { + private void auditAfterExec(String origStmt, StatementBase parsedStmt, + StmtExecutor.QueryConsumption queryConsumption) { // slow query long elapseMs = System.currentTimeMillis() - ctx.getStartTime(); // query state log ctx.getAuditBuilder().put("state", ctx.getState()); ctx.getAuditBuilder().put("time", elapseMs); + Preconditions.checkNotNull(queryConsumption); + ctx.getAuditBuilder().put("cpu", queryConsumption.getFormattingCpuConsumption()); + ctx.getAuditBuilder().put("io", queryConsumption.getFormattingIoConsumption()); ctx.getAuditBuilder().put("returnRows", ctx.getReturnRows()); ctx.getAuditBuilder().put("stmt_id", ctx.getStmtId()); @@ -177,7 +182,8 @@ public class ConnectProcessor { // audit after exec // replace '\n' to '\\n' to make string in one line - auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt()); + auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt(), + executor.getQueryConsumptionForAuditLog()); } // Get the column definitions of a table diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 252a9984e6..8947718b31 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -574,12 +574,12 @@ public class Coordinator { } } - TResultBatch getNext() throws Exception { + public RowBatch getNext() throws Exception { if (receiver == null) { throw new UserException("There is no receiver."); } - TResultBatch resultBatch; + RowBatch resultBatch; Status status = new Status(); resultBatch = receiver.getNext(status); @@ -611,7 +611,7 @@ public class Coordinator { } } - if (resultBatch == null) { + if (resultBatch.isEos()) { this.returnedAllResults = true; // if this query is a block query do not cancel. @@ -622,7 +622,7 @@ public class Coordinator { cancelInternal(); } } else { - numReceivedRows += resultBatch.getRowsSize(); + numReceivedRows += resultBatch.getBatch().getRowsSize(); } return resultBatch; @@ -746,6 +746,12 @@ public class Coordinator { dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); dest.setBrpc_server(toBrpcHost(destParams.instanceExecParams.get(j).host)); + // select first dest as consumption transfer chain. + if (j == 0) { + dest.setIs_transfer_chain(true); + } else { + dest.setIs_transfer_chain(false); + } params.destinations.add(dest); } } @@ -1038,7 +1044,7 @@ public class Coordinator { return result; } - public void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams) + private void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams) throws UserException { int maxNumInstance = queryOptions.mt_dop; if (maxNumInstance == 0) { @@ -1150,7 +1156,7 @@ public class Coordinator { } // create collocated instance according to inputFragments - public void createCollocatedInstance(FragmentExecParams fragmentExecParams) { + private void createCollocatedInstance(FragmentExecParams fragmentExecParams) { Preconditions.checkState(fragmentExecParams.inputFragments.size() >= 1); final FragmentExecParams inputFragmentParams = fragmentExecParamsMap.get(fragmentExecParams. inputFragments.get(0)); @@ -1169,7 +1175,7 @@ public class Coordinator { } - public void createUnionInstance(FragmentExecParams fragmentExecParams) { + private void createUnionInstance(FragmentExecParams fragmentExecParams) { final PlanFragment fragment = fragmentExecParams.fragment; // Add hosts of scan nodes List scanNodeIds = findScanNodes(fragment.getPlanRoot()); @@ -1563,7 +1569,6 @@ public class Coordinator { params.setResource_info(tResourceInfo); params.params.setQuery_id(queryId); params.params.setFragment_instance_id(instanceExecParam.instanceId); - Map> scanRanges = instanceExecParam.perNodeScanRanges; if (scanRanges == null) { scanRanges = Maps.newHashMap(); diff --git a/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java index acb87f135d..101fa70d37 100644 --- a/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -58,11 +58,11 @@ public class ResultReceiver { this.timeoutTs = System.currentTimeMillis() + timeoutMs; } - public TResultBatch getNext(Status status) throws TException { + public RowBatch getNext(Status status) throws TException { if (isDone) { return null; } - + final RowBatch rowBatch = new RowBatch(); try { while (!isDone && !isCancel) { PFetchDataRequest request = new PFetchDataRequest(finstId); @@ -90,6 +90,10 @@ public class ResultReceiver { if (code != TStatusCode.OK) { status.setPstatus(pResult.status); return null; + } + + if (pResult.queryConsumption != null) { + rowBatch.setQueryConsumption(pResult.queryConsumption); } if (packetIdx != pResult.packetSeq) { @@ -106,7 +110,9 @@ public class ResultReceiver { TResultBatch resultBatch = new TResultBatch(); TDeserializer deserializer = new TDeserializer(); deserializer.deserialize(resultBatch, serialResult); - return resultBatch; + rowBatch.setBatch(resultBatch); + rowBatch.setEos(pResult.eos); + return rowBatch; } } } catch (RpcException e) { @@ -134,7 +140,7 @@ public class ResultReceiver { if (isCancel) { status.setStatus(Status.CANCELLED); } - return null; + return rowBatch; } public void cancel() { diff --git a/fe/src/main/java/org/apache/doris/qe/RowBatch.java b/fe/src/main/java/org/apache/doris/qe/RowBatch.java new file mode 100644 index 0000000000..c86d93311d --- /dev/null +++ b/fe/src/main/java/org/apache/doris/qe/RowBatch.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.rpc.PQueryConsumption; +import org.apache.doris.thrift.TResultBatch; + +public final class RowBatch { + private TResultBatch batch; + private PQueryConsumption queryConsumption; + private boolean eos; + + public RowBatch() { + eos = true; + } + + public TResultBatch getBatch() { + return batch; + } + + public void setBatch(TResultBatch batch) { + this.batch = batch; + } + + public PQueryConsumption getQueryConsumption() { + return queryConsumption; + } + + public void setQueryConsumption(PQueryConsumption queryConsumption) { + this.queryConsumption = queryConsumption; + } + + public boolean isEos() { + return eos; + } + + public void setEos(boolean eos) { + this.eos = eos; + } +} diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index 53b6c02309..32528dc5b7 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -52,6 +53,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.NotImplementedException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ProfileManager; @@ -63,6 +65,7 @@ import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.Planner; import org.apache.doris.rewrite.ExprRewriter; +import org.apache.doris.rpc.PQueryConsumption; import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TQueryOptions; @@ -75,6 +78,7 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; import java.io.StringReader; import java.nio.ByteBuffer; +import java.util.Formatter; import java.util.List; import java.util.Map; import java.util.UUID; @@ -101,6 +105,7 @@ public class StmtExecutor { private Planner planner; private boolean isProxy; private ShowResultSet proxyResultSet = null; + private QueryConsumption consumptionForAuditLog; public StmtExecutor(ConnectContext context, String stmt, boolean isProxy) { this.context = context; @@ -537,26 +542,34 @@ public class StmtExecutor { // so We need to send fields after first batch arrived // send result - TResultBatch batch; + RowBatch batch; MysqlChannel channel = context.getMysqlChannel(); boolean isSendFields = false; - while ((batch = coord.getNext()) != null) { + + while ((batch = coord.getNext()) != null && !batch.isEos()) { if (!isSendFields) { sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); } isSendFields = true; - - for (ByteBuffer row : batch.getRows()) { + for (ByteBuffer row : batch.getBatch().getRows()) { channel.sendOnePacket(row); } - context.updateReturnRows(batch.getRows().size()); + context.updateReturnRows(batch.getBatch().getRows().size()); } + setConsumptionForAuditLog(batch); if (!isSendFields) { sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); } context.getState().setEof(); } + private void setConsumptionForAuditLog(RowBatch batch) { + if (batch != null) { + final PQueryConsumption queryConsumption = batch.getQueryConsumption(); + consumptionForAuditLog = new QueryConsumption(queryConsumption.cpu, queryConsumption.io); + } + } + // Process a select statement. private void handleInsertStmt() throws Exception { // Every time set no send flag and clean all data in buffer @@ -774,4 +787,40 @@ public class StmtExecutor { ExportStmt exportStmt = (ExportStmt) parsedStmt; context.getCatalog().getExportMgr().addExportJob(exportStmt); } + + public QueryConsumption getQueryConsumptionForAuditLog() { + if (consumptionForAuditLog == null) { + consumptionForAuditLog = new QueryConsumption(); + } + return consumptionForAuditLog; + } + + public static class QueryConsumption { + private final long cpu; + private final long io; + + public QueryConsumption() { + this.cpu = 0; + this.io = 0; + } + + public QueryConsumption(long cpu, long io) { + this.cpu = cpu; + this.io = io; + } + + public String getFormattingCpuConsumption() { + final StringBuilder builder = new StringBuilder(); + builder.append(cpu).append(" Rows"); + return builder.toString(); + } + + public String getFormattingIoConsumption() { + final Pair pair = DebugUtil.getByteUint(io); + final Formatter fmt = new Formatter(); + final StringBuilder builder = new StringBuilder(); + builder.append(fmt.format("%.2f", pair.first)).append(" ").append(pair.second); + return builder.toString(); + } + } } diff --git a/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java b/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java index cd4af477e4..2b1406a80a 100644 --- a/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java +++ b/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java @@ -28,4 +28,6 @@ public class PFetchDataResult { public long packetSeq; @Protobuf(order = 3, required = false) public boolean eos; + @Protobuf(order = 4, required = false) + public PQueryConsumption queryConsumption; } diff --git a/fe/src/main/java/org/apache/doris/rpc/PQueryConsumption.java b/fe/src/main/java/org/apache/doris/rpc/PQueryConsumption.java new file mode 100644 index 0000000000..3b23391414 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/rpc/PQueryConsumption.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.rpc; + +import com.baidu.bjf.remoting.protobuf.annotation.Protobuf; +import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass; + +@ProtobufClass +public class PQueryConsumption { + @Protobuf(order = 1, required = false) + public long cpu; + @Protobuf(order = 2, required = false) + public long io; +} diff --git a/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java index e3e4a573c6..d580159dc1 100644 --- a/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java +++ b/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java @@ -231,6 +231,7 @@ public class ConnectProcessorTest { // Mock statement executor StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class); qe.execute(); + EasyMock.expect(qe.getQueryConsumptionForAuditLog()).andReturn(new StmtExecutor.QueryConsumption()); EasyMock.expectLastCall().anyTimes(); EasyMock.replay(qe); PowerMock.expectNew( @@ -254,11 +255,11 @@ public class ConnectProcessorTest { StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class); qe.execute(); EasyMock.expectLastCall().andThrow(new IOException("Fail")).anyTimes(); + EasyMock.expect(qe.getQueryConsumptionForAuditLog()).andReturn(new StmtExecutor.QueryConsumption()); EasyMock.replay(qe); PowerMock.expectNew(StmtExecutor.class, EasyMock.isA(ConnectContext.class), EasyMock.isA(String.class)) .andReturn(qe).anyTimes(); PowerMock.replay(StmtExecutor.class); - processor.processOnce(); Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); } @@ -272,6 +273,7 @@ public class ConnectProcessorTest { // Mock statement executor StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class); qe.execute(); + EasyMock.expect(qe.getQueryConsumptionForAuditLog()).andReturn(new StmtExecutor.QueryConsumption()); EasyMock.expectLastCall().andThrow(new NullPointerException("Fail")).anyTimes(); EasyMock.replay(qe); PowerMock.expectNew(StmtExecutor.class, EasyMock.isA(ConnectContext.class), EasyMock.isA(String.class)) diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index bec67edbc3..8df748012d 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -19,6 +19,11 @@ syntax="proto2"; package doris; +message PQueryConsumption { + optional int64 cpu = 1; + optional int64 io = 2; +} + message PRowBatch { required int32 num_rows = 1; repeated int32 row_tuples = 2; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 71782934b2..daad8a62a7 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -41,6 +41,8 @@ message PTransmitDataParams { optional PRowBatch row_batch = 6; // different per packet required int64 packet_seq = 7; + // query consumption + optional PQueryConsumption query_consumption = 8; }; message PTransmitDataResult { @@ -129,6 +131,7 @@ message PFetchDataResult { // valid when status is ok optional int64 packet_seq = 2; optional bool eos = 3; + optional PQueryConsumption query_consumption = 4; }; message PTriggerProfileReportRequest { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 4bc4d5ee13..77db297fdf 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -142,6 +142,7 @@ struct TPlanFragmentDestination { // ... which is being executed on this server 2: required Types.TNetworkAddress server 3: optional Types.TNetworkAddress brpc_server + 4: optional bool is_transfer_chain } // Parameters for a single execution instance of a particular TPlanFragment