From 5192e2f010308eefffa5271b0bdc947dfd9168ae Mon Sep 17 00:00:00 2001 From: chenhao <510341142@qq.com> Date: Wed, 9 Jan 2019 22:28:20 +0800 Subject: [PATCH] Add cpu and io indicates to audit log (#513) Record query consumption into the FE audit log. It works as follows: one instance of each parent plan is responsible for accumulating its sub plans' consumption and sending it to its own parent; the BE coordinator gets the total consumption because it is a single instance. --- be/src/exec/data_sink.h | 7 + be/src/exec/exchange_node.cpp | 4 + be/src/exec/exchange_node.h | 9 +- be/src/runtime/buffer_control_block.cpp | 10 +- be/src/runtime/buffer_control_block.h | 10 +- be/src/runtime/data_stream_mgr.cpp | 4 +- be/src/runtime/data_stream_mgr.h | 3 +- be/src/runtime/data_stream_recvr.cc | 6 + be/src/runtime/data_stream_recvr.h | 7 + be/src/runtime/data_stream_sender.cpp | 18 +- .../runtime/exec_node_consumption_provider.h | 167 ++++++++++++++++++ be/src/runtime/plan_fragment_executor.cpp | 2 + be/src/runtime/result_sink.cpp | 2 + be/src/runtime/runtime_state.h | 16 ++ be/src/service/backend_service.cpp | 16 +- be/src/service/internal_service.cpp | 2 +- .../proc/CurrentQueryFragmentProcNode.java | 4 +- .../common/proc/CurrentQueryInfoProvider.java | 19 +- .../proc/CurrentQueryStatisticsProcDir.java | 4 +- .../org/apache/doris/qe/ConnectProcessor.java | 10 +- .../java/org/apache/doris/qe/Coordinator.java | 21 ++- .../org/apache/doris/qe/ResultReceiver.java | 14 +- .../java/org/apache/doris/qe/RowBatch.java | 55 ++++++ .../org/apache/doris/qe/StmtExecutor.java | 59 ++++++- .../apache/doris/rpc/PFetchDataResult.java | 2 + .../apache/doris/rpc/PQueryConsumption.java | 29 +++ .../apache/doris/qe/ConnectProcessorTest.java | 4 +- gensrc/proto/data.proto | 5 + gensrc/proto/internal_service.proto | 3 + gensrc/thrift/PaloInternalService.thrift | 1 + 30 files changed, 467 insertions(+), 46 deletions(-) create mode 100644 be/src/runtime/exec_node_consumption_provider.h create mode 100644 
fe/src/main/java/org/apache/doris/qe/RowBatch.java create mode 100644 fe/src/main/java/org/apache/doris/rpc/PQueryConsumption.java diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index 2064bf62e3..3048b95fe6 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "gen_cpp/DataSinks_types.h" #include "gen_cpp/Exprs_types.h" +#include "runtime/exec_node_consumption_provider.h" #include "runtime/mem_tracker.h" namespace doris { @@ -78,11 +79,17 @@ public: // Returns the runtime profile for the sink. virtual RuntimeProfile* profile() = 0; + void set_query_consumption(const ExecNodeConsumptionProvider::Consumption& consumption) { + _query_consumption = consumption; + } + protected: // Set to true after close() has been called. subclasses should check and set this in // close(). bool _closed; std::unique_ptr _expr_mem_tracker; + + ExecNodeConsumptionProvider::Consumption _query_consumption; }; } // namespace doris diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index 4c7828c309..52fbe98e76 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -131,6 +131,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* if (reached_limit()) { _stream_recvr->transfer_all_resources(output_batch); + set_runtime_consumption(state); *eos = true; return Status::OK; } else { @@ -179,6 +180,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* if (reached_limit()) { _stream_recvr->transfer_all_resources(output_batch); + set_runtime_consumption(state); *eos = true; return Status::OK; } @@ -197,6 +199,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* RETURN_IF_ERROR(fill_input_row_batch(state)); *eos = (_input_batch == NULL); if (*eos) { + set_runtime_consumption(state); return Status::OK; } @@ -243,6 +246,7 @@ Status ExchangeNode::get_next_merging(RuntimeState* 
state, RowBatch* output_batc // by the merger to the output batch. if (*eos) { _stream_recvr->transfer_all_resources(output_batch); + set_runtime_consumption(state); } COUNTER_SET(_rows_returned_counter, _num_rows_returned); diff --git a/be/src/exec/exchange_node.h b/be/src/exec/exchange_node.h index 6450d39f91..b800e71f6d 100644 --- a/be/src/exec/exchange_node.h +++ b/be/src/exec/exchange_node.h @@ -21,11 +21,12 @@ #include #include "exec/exec_node.h" #include "exec/sort_exec_exprs.h" +#include "runtime/data_stream_recvr.h" +#include "runtime/exec_node_consumption_provider.h" namespace doris { class RowBatch; -class DataStreamRecvr; class RuntimeProfile; // Receiver node for data streams. The data stream receiver is created in Prepare() @@ -61,6 +62,12 @@ protected: virtual void debug_string(int indentation_level, std::stringstream* out) const; private: + + void set_runtime_consumption(RuntimeState* state) { + ExecNodeConsumptionProvider::Consumption consumption = _stream_recvr->get_sub_plan_consumption(); + state->add_sub_plan_consumption(consumption); + } + // Implements GetNext() for the case where _is_merging is true. Delegates the GetNext() // call to the underlying DataStreamRecvr. 
Status get_next_merging(RuntimeState* state, RowBatch* output_batch, bool* eos); diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index f4486dea08..d79b1794a6 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -31,9 +31,13 @@ void GetResultBatchCtx::on_failure(const Status& status) { delete this; } -void GetResultBatchCtx::on_close(int64_t packet_seq) { +void GetResultBatchCtx::on_close(int64_t packet_seq, + ExecNodeConsumptionProvider::Consumption* consumption) { Status status; status.to_protobuf(result->mutable_status()); + if (consumption != nullptr) { + consumption->serialize(result->mutable_query_consumption()); + } result->set_packet_seq(packet_seq); result->set_eos(true); done->Run(); @@ -183,7 +187,7 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { return; } if (_is_close) { - ctx->on_close(_packet_num); + ctx->on_close(_packet_num, &_consumption); return; } // no ready data, push ctx to waiting list @@ -200,7 +204,7 @@ Status BufferControlBlock::close(Status exec_status) { if (!_waiting_rpc.empty()) { if (_status.ok()) { for (auto& ctx : _waiting_rpc) { - ctx->on_close(_packet_num); + ctx->on_close(_packet_num, &_consumption); } } else { for (auto& ctx : _waiting_rpc) { diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 701ef5f70a..c02fa882b5 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -24,6 +24,7 @@ #include #include "common/status.h" #include "gen_cpp/Types_types.h" +#include "runtime/exec_node_consumption_provider.h" namespace google { namespace protobuf { @@ -52,7 +53,7 @@ struct GetResultBatchCtx { } void on_failure(const Status& status); - void on_close(int64_t packet_seq); + void on_close(int64_t packet_seq, ExecNodeConsumptionProvider::Consumption* consumption = nullptr); void on_data(TFetchDataResult* t_result, int64_t packet_seq, bool eos 
= false); }; @@ -80,6 +81,9 @@ public: return _fragment_id; } + void set_query_consumption(const ExecNodeConsumptionProvider::Consumption& consumption) { + _consumption = consumption; + } private: typedef std::list ResultQueue; @@ -100,8 +104,10 @@ private: boost::condition_variable _data_arriaval; // signal removal of data by stream consumer boost::condition_variable _data_removal; - + std::deque _waiting_rpc; + + ExecNodeConsumptionProvider::Consumption _consumption; }; } diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp index 0c75fbb04a..dee0c938b4 100644 --- a/be/src/runtime/data_stream_mgr.cpp +++ b/be/src/runtime/data_stream_mgr.cpp @@ -121,7 +121,8 @@ Status DataStreamMgr::add_data( Status DataStreamMgr::close_sender(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int sender_id, - int be_number) { + int be_number, + const PQueryConsumption& consumption) { VLOG_FILE << "close_sender(): fragment_instance_id=" << fragment_instance_id << ", node=" << dest_node_id; shared_ptr recvr = find_recvr(fragment_instance_id, dest_node_id); @@ -135,6 +136,7 @@ Status DataStreamMgr::close_sender(const TUniqueId& fragment_instance_id, // errors from receiver-initiated teardowns. return Status::OK; } + recvr->add_sub_plan_consumption(consumption); recvr->remove_sender(sender_id, be_number); return Status::OK; } diff --git a/be/src/runtime/data_stream_mgr.h b/be/src/runtime/data_stream_mgr.h index a4880b65e1..ca94dc6ccd 100644 --- a/be/src/runtime/data_stream_mgr.h +++ b/be/src/runtime/data_stream_mgr.h @@ -46,6 +46,7 @@ class DataStreamRecvr; class RowBatch; class RuntimeState; class PRowBatch; +class PQueryConsumption; class PUniqueId; // Singleton class which manages all incoming data streams at a backend node. It @@ -96,7 +97,7 @@ public: // sender has closed. // Returns OK if successful, error status otherwise. 
Status close_sender(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, - int sender_id, int be_number); + int sender_id, int be_number, const PQueryConsumption& consumption); // Closes all receivers registered for fragment_instance_id immediately. void cancel(const TUniqueId& fragment_instance_id); diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index f007e6f67b..84f083dee1 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -242,6 +242,7 @@ void DataStreamRecvr::SenderQueue::add_batch( // it in this thread. batch = new RowBatch(_recvr->row_desc(), pb_batch, _recvr->mem_tracker()); } + VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size << "\n"; _batch_queue.emplace_back(batch_size, batch); @@ -433,4 +434,9 @@ Status DataStreamRecvr::get_batch(RowBatch** next_batch) { return _sender_queues[0]->get_batch(next_batch); } +void DataStreamRecvr::add_sub_plan_consumption(const PQueryConsumption& p_consumption) { + ExecNodeConsumptionProvider::Consumption consumption; + consumption.deserialize(p_consumption); + _sub_plan_consumption.add(consumption); +} } diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h index 22bca1cecd..7b897f6e7f 100644 --- a/be/src/runtime/data_stream_recvr.h +++ b/be/src/runtime/data_stream_recvr.h @@ -24,6 +24,7 @@ #include "common/object_pool.h" #include "common/status.h" #include "gen_cpp/Types_types.h" // for TUniqueId +#include "runtime/exec_node_consumption_provider.h" #include "runtime/descriptors.h" #include "util/tuple_row_compare.h" @@ -99,6 +100,11 @@ public: const RowDescriptor& row_desc() const { return _row_desc; } MemTracker* mem_tracker() const { return _mem_tracker.get(); } + void add_sub_plan_consumption(const PQueryConsumption& p_consumption); + + ExecNodeConsumptionProvider::Consumption get_sub_plan_consumption() { + return _sub_plan_consumption; + } private: friend class 
DataStreamMgr; class SenderQueue; @@ -194,6 +200,7 @@ private: // Wall time senders spend waiting for the recv buffer to have capacity. RuntimeProfile::Counter* _buffer_full_wall_timer; + ExecNodeConsumptionProvider::Consumption _sub_plan_consumption; // Total time spent waiting for data to arrive in the recv buffer // RuntimeProfile::Counter* _data_arrival_timer; }; diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index d0c60960fc..c319692f30 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -71,7 +71,7 @@ public: Channel(DataStreamSender* parent, const RowDescriptor& row_desc, const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int buffer_size) : + PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain) : _parent(parent), _buffer_size(buffer_size), _row_desc(row_desc), @@ -80,7 +80,8 @@ public: _num_data_bytes_sent(0), _packet_seq(0), _need_close(false), - _brpc_dest_addr(brpc_dest) { + _brpc_dest_addr(brpc_dest), + _is_transfer_chain(is_transfer_chain) { } virtual ~Channel() { @@ -163,6 +164,8 @@ private: palo::PInternalService_Stub* _brpc_stub = nullptr; RefCountClosure* _closure = nullptr; int32_t _brpc_timeout_ms = 500; + // whether the dest can be treated as consumption transfer chain. 
+ bool _is_transfer_chain; }; Status DataStreamSender::Channel::init(RuntimeState* state) { @@ -203,6 +206,10 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { } VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id << " dest_node=" << _dest_node_id; + if (eos && _is_transfer_chain) { + auto consumption = _brpc_request.mutable_query_consumption(); + _parent->_query_consumption.serialize(consumption); + } _brpc_request.set_eos(eos); if (batch != nullptr) { @@ -305,11 +312,16 @@ DataStreamSender::DataStreamSender( || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED); // TODO: use something like google3's linked_ptr here (scoped_ptr isn't copyable) for (int i = 0; i < destinations.size(); ++i) { + bool is_transfer_chain = false; + if (destinations[i].__isset.is_transfer_chain) { + is_transfer_chain = destinations[i].is_transfer_chain; + } _channel_shared_ptrs.emplace_back( new Channel(this, row_desc, destinations[i].brpc_server, destinations[i].fragment_instance_id, - sink.dest_node_id, per_channel_buffer_size)); + sink.dest_node_id, per_channel_buffer_size, + is_transfer_chain)); _channels.push_back(_channel_shared_ptrs[i].get()); } } diff --git a/be/src/runtime/exec_node_consumption_provider.h b/be/src/runtime/exec_node_consumption_provider.h new file mode 100644 index 0000000000..a8aae48381 --- /dev/null +++ b/be/src/runtime/exec_node_consumption_provider.h @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_RUNTIME_EXEC_NODE_CONSUMPTION_PROVIDER_H +#define DORIS_BE_RUNTIME_EXEC_NODE_CONSUMPTION_PROVIDER_H + +#include "util/runtime_profile.h" +#include "util/string_util.h" +#include "gen_cpp/data.pb.h" +#include "gen_cpp/PlanNodes_types.h" + +namespace doris { + +// Generate ExecNode resource Consumption with RuntimeProfile. CPU +// consumption is measured by the number of rows processed and IO +// consumption is measured by the size of the scans. +class ExecNodeConsumptionProvider { +public: + + ExecNodeConsumptionProvider() { + init(); + } + + class Consumption { + public: + Consumption() : cpu(0), io(0) { + } + + void add(const Consumption& other) { + cpu.add(other.cpu); + io.add(other.io); + } + + void serialize(PQueryConsumption* consumption) { + DCHECK(consumption != nullptr); + consumption->set_cpu(cpu.load()); + consumption->set_io(io.load()); + } + + void deserialize(const PQueryConsumption& consumption) { + cpu.store(consumption.cpu()); + io.store(consumption.io()); + } + + int64_t get_cpu() { + return cpu.load(); + } + + int64_t get_io() { + return io.load(); + } + + void set(int64_t cpu, int64_t io) { + this->cpu.store(cpu); + this->io.store(io); + } + + Consumption& operator=(const Consumption& other) { + if (this != &other) { + set(other.cpu, other.io); + } + return *this; + } + private: + AtomicInt64 cpu; + AtomicInt64 io; + }; + + Consumption get_consumption(RuntimeProfile* profile) { + Consumption total_consumption; + std::vector all_profiles; + profile->get_all_children(&all_profiles); + for (auto profile : 
all_profiles) { + // ExecNode's RuntimeProfile name is "$ExecNode_type_name (id=?)" + std::vector elements; + boost::split(elements, profile->name(), boost::is_any_of(" "), boost::token_compress_off); + Consumption consumption; + bool has = get_consumption(profile, &consumption, elements[0]); + if (elements.size() == 2 && has) { + total_consumption.add(consumption); + } + } + return total_consumption; + } + +private: + + void init() { + functions["OLAP_SCAN_NODE"] = get_olap_scan_consumption; + functions["HASH_JOIN_NODE"] = get_hash_join_consumption; + functions["AGGREGATION_NODE"] = get_hash_agg_consumption; + functions["SORT_NODE"] = get_sort_consumption; + functions["ANALYTIC_EVAL_NODE"] = get_windows_consumption; + functions["UNION_NODE"] = get_union_consumption; + functions["EXCHANGE_NODE"] = get_exchange_consumption; + } + + bool get_consumption(RuntimeProfile* profile, Consumption* consumption, const std::string& name) { + ConsumptionFunc get_consumption_func = functions[name]; + if (get_consumption_func != nullptr) { + get_consumption_func(profile, consumption); + return true; + } + return false; + } + + static void get_olap_scan_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* read_compressed_counter = profile->get_counter("CompressedBytesRead"); + consumption->set(0, read_compressed_counter->value()); + } + + static void get_hash_join_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* probe_counter = profile->get_counter("ProbeRows"); + RuntimeProfile::Counter* build_counter = profile->get_counter("BuildRows"); + consumption->set(probe_counter->value() + build_counter->value(), 0); + } + + static void get_hash_agg_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* build_counter = profile->get_counter("BuildRows"); + consumption->set(build_counter->value(), 0); + } + + static void get_sort_consumption(RuntimeProfile* profile, 
Consumption* consumption) { + RuntimeProfile::Counter* sort_counter = profile->get_counter("SortRows"); + consumption->set(sort_counter->value(), 0); + } + + static void get_windows_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* process_counter = profile->get_counter("ProcessRows"); + consumption->set(process_counter->value(), 0); + } + + static void get_union_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* materialize_counter = profile->get_counter("MaterializeRows"); + consumption->set(materialize_counter->value(), 0); + } + + static void get_exchange_consumption(RuntimeProfile* profile, Consumption* consumption) { + RuntimeProfile::Counter* merge_counter = profile->get_counter("MergeRows"); + // exchange merge sort + if (merge_counter != nullptr) { + consumption->set(merge_counter->value(), 0); + } + } + + typedef std::function ConsumptionFunc; + // ExecNode type name to function + std::map functions; +}; + +} + +#endif diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 1e769ee6df..db62400111 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -333,6 +333,8 @@ Status PlanFragmentExecutor::open_internal() { // audit the sinks to check that this is ok, or change that behaviour. 
{ SCOPED_TIMER(profile()->total_time_counter()); + ExecNodeConsumptionProvider::Consumption consumption = runtime_state()->get_consumption(); + _sink->set_query_consumption(consumption); Status status = _sink->close(runtime_state(), _status); RETURN_IF_ERROR(status); } diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp index b5eb2c978b..1b08ec32be 100644 --- a/be/src/runtime/result_sink.cpp +++ b/be/src/runtime/result_sink.cpp @@ -82,6 +82,8 @@ Status ResultSink::close(RuntimeState* state, Status exec_status) { } // close sender, this is normal path end if (_sender) { + // In the last, send consumption of execnode. + _sender->set_query_consumption(_query_consumption); _sender->close(exec_status); } state->exec_env()->result_mgr()->cancel_at_time(time(NULL) + config::result_buffer_cancelled_interval_time, diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index b58ef95493..a6cb40be9f 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -33,6 +33,7 @@ #include "common/global_types.h" #include "util/logging.h" +#include "runtime/exec_node_consumption_provider.h" #include "runtime/mem_pool.h" #include "runtime/thread_resource_mgr.h" #include "gen_cpp/Types_types.h" // for TUniqueId @@ -492,6 +493,18 @@ public: return _is_running; } + void add_sub_plan_consumption(const ExecNodeConsumptionProvider::Consumption& consumption) { + _sub_plan_consumption.add(consumption); + } + + ExecNodeConsumptionProvider::Consumption get_consumption() { + ExecNodeConsumptionProvider provider; + ExecNodeConsumptionProvider::Consumption total_consumption; + total_consumption = provider.get_consumption(&_profile); + total_consumption.add(_sub_plan_consumption); + return total_consumption; + } + private: // Allow TestEnv to set block_mgr manually for testing. friend class TestEnv; @@ -638,6 +651,9 @@ private: /// TODO: not needed if we call ReleaseResources() in a timely manner (IMPALA-1575). 
AtomicInt32 _initial_reservation_refcnt; + // Consumption from sub plan, it should only be updated by ExchangeNode. + ExecNodeConsumptionProvider::Consumption _sub_plan_consumption; + // prohibit copies RuntimeState(const RuntimeState&); }; diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 056495a281..496fa89131 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -130,14 +130,14 @@ void BackendService::transmit_data(TTransmitDataResult& return_val, } if (params.eos) { - Status status = _exec_env->stream_mgr()->close_sender( - params.dest_fragment_instance_id, - params.dest_node_id, - params.sender_id, - params.be_number); - VLOG_ROW << "params.eos: " << (params.eos ? "true" : "false") - << " close_sender status: " << status.get_error_msg(); - status.set_t_status(&return_val); + // Status status = _exec_env->stream_mgr()->close_sender( + // params.dest_fragment_instance_id, + // params.dest_node_id, + // params.sender_id, + // params.be_number); + //VLOG_ROW << "params.eos: " << (params.eos ? 
"true" : "false") + // << " close_sender status: " << status.get_error_msg(); + //status.set_t_status(&return_val); } } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2f4b3f347a..e5375a29d9 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -60,7 +60,7 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cnt finst_id.__set_lo(request->finst_id().lo()); _exec_env->stream_mgr()->close_sender( finst_id, request->node_id(), - request->sender_id(), request->be_number()); + request->sender_id(), request->be_number(), request->query_consumption()); } if (done != nullptr) { done->Run(); diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java index 21ffef73fa..88cdbed3f0 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryFragmentProcNode.java @@ -79,8 +79,8 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface { rowData.add(instanceConsumption.getFragmentId()); rowData.add(instanceConsumption.getInstanceId().toString()); rowData.add(instanceConsumption.getAddress().toString()); - rowData.add(String.valueOf(instanceConsumption.getTotalIoConsumption())); - rowData.add(String.valueOf(instanceConsumption.getTotalCpuConsumption())); + rowData.add(instanceConsumption.getFormattingIoConsumption()); + rowData.add(instanceConsumption.getFormattingCpuConsumption()); sortedRowDatas.add(rowData); } diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java index f18d9d1399..647efca885 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java +++ 
b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collection; +import java.util.Formatter; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -334,7 +335,7 @@ public class CurrentQueryInfoProvider { } } - public long getTotalCpuConsumption() { + private long getTotalCpuConsumption() { long cpu = 0; for (ConsumptionCalculator consumption : calculators) { cpu += consumption.getCpu(); @@ -342,13 +343,27 @@ public class CurrentQueryInfoProvider { return cpu; } - public long getTotalIoConsumption() { + private long getTotalIoConsumption() { long io = 0; for (ConsumptionCalculator consumption : calculators) { io += consumption.getIo(); } return io; } + + public String getFormattingCpuConsumption() { + final StringBuilder builder = new StringBuilder(); + builder.append(getTotalCpuConsumption()).append(" Rows"); + return builder.toString(); + } + + public String getFormattingIoConsumption() { + final Pair pair = DebugUtil.getByteUint(getTotalIoConsumption()); + final Formatter fmt = new Formatter(); + final StringBuilder builder = new StringBuilder(); + builder.append(fmt.format("%.2f", pair.first)).append(" ").append(pair.second); + return builder.toString(); + } } public static class InstanceConsumption extends Consumption { diff --git a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java index a59c3dbe9e..9f16be1b4b 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/CurrentQueryStatisticsProcDir.java @@ -76,8 +76,8 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface { values.add(item.getDb()); values.add(item.getUser()); final 
CurrentQueryInfoProvider.Consumption consumption = consumptions.get(item.getQueryId()); - values.add(String.valueOf(consumption.getTotalIoConsumption())); - values.add(String.valueOf(consumption.getTotalCpuConsumption())); + values.add(consumption.getFormattingIoConsumption()); + values.add(consumption.getFormattingCpuConsumption()); values.add(item.getQueryExecTime()); sortedRowData.add(values); } diff --git a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 560d538bb2..7fada7e5f7 100644 --- a/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -37,6 +37,7 @@ import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; @@ -92,12 +93,16 @@ public class ConnectProcessor { ctx.getState().setOk(); } - private void auditAfterExec(String origStmt, StatementBase parsedStmt) { + private void auditAfterExec(String origStmt, StatementBase parsedStmt, + StmtExecutor.QueryConsumption queryConsumption) { // slow query long elapseMs = System.currentTimeMillis() - ctx.getStartTime(); // query state log ctx.getAuditBuilder().put("state", ctx.getState()); ctx.getAuditBuilder().put("time", elapseMs); + Preconditions.checkNotNull(queryConsumption); + ctx.getAuditBuilder().put("cpu", queryConsumption.getFormattingCpuConsumption()); + ctx.getAuditBuilder().put("io", queryConsumption.getFormattingIoConsumption()); ctx.getAuditBuilder().put("returnRows", ctx.getReturnRows()); ctx.getAuditBuilder().put("stmt_id", ctx.getStmtId()); @@ -177,7 +182,8 @@ public class ConnectProcessor { // audit after exec // replace '\n' to '\\n' to make string in one line - auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt()); + 
auditAfterExec(stmt.replace("\n", " \\n"), executor.getParsedStmt(), + executor.getQueryConsumptionForAuditLog()); } // Get the column definitions of a table diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 252a9984e6..8947718b31 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -574,12 +574,12 @@ public class Coordinator { } } - TResultBatch getNext() throws Exception { + public RowBatch getNext() throws Exception { if (receiver == null) { throw new UserException("There is no receiver."); } - TResultBatch resultBatch; + RowBatch resultBatch; Status status = new Status(); resultBatch = receiver.getNext(status); @@ -611,7 +611,7 @@ public class Coordinator { } } - if (resultBatch == null) { + if (resultBatch.isEos()) { this.returnedAllResults = true; // if this query is a block query do not cancel. @@ -622,7 +622,7 @@ public class Coordinator { cancelInternal(); } } else { - numReceivedRows += resultBatch.getRowsSize(); + numReceivedRows += resultBatch.getBatch().getRowsSize(); } return resultBatch; @@ -746,6 +746,12 @@ public class Coordinator { dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); dest.setBrpc_server(toBrpcHost(destParams.instanceExecParams.get(j).host)); + // select first dest as consumption transfer chain. 
+ if (j == 0) { + dest.setIs_transfer_chain(true); + } else { + dest.setIs_transfer_chain(false); + } params.destinations.add(dest); } } @@ -1038,7 +1044,7 @@ public class Coordinator { return result; } - public void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams) + private void createScanInstance(PlanNodeId leftMostScanId, FragmentExecParams fragmentExecParams) throws UserException { int maxNumInstance = queryOptions.mt_dop; if (maxNumInstance == 0) { @@ -1150,7 +1156,7 @@ public class Coordinator { } // create collocated instance according to inputFragments - public void createCollocatedInstance(FragmentExecParams fragmentExecParams) { + private void createCollocatedInstance(FragmentExecParams fragmentExecParams) { Preconditions.checkState(fragmentExecParams.inputFragments.size() >= 1); final FragmentExecParams inputFragmentParams = fragmentExecParamsMap.get(fragmentExecParams. inputFragments.get(0)); @@ -1169,7 +1175,7 @@ public class Coordinator { } - public void createUnionInstance(FragmentExecParams fragmentExecParams) { + private void createUnionInstance(FragmentExecParams fragmentExecParams) { final PlanFragment fragment = fragmentExecParams.fragment; // Add hosts of scan nodes List scanNodeIds = findScanNodes(fragment.getPlanRoot()); @@ -1563,7 +1569,6 @@ public class Coordinator { params.setResource_info(tResourceInfo); params.params.setQuery_id(queryId); params.params.setFragment_instance_id(instanceExecParam.instanceId); - Map> scanRanges = instanceExecParam.perNodeScanRanges; if (scanRanges == null) { scanRanges = Maps.newHashMap(); diff --git a/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java index acb87f135d..101fa70d37 100644 --- a/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -58,11 +58,11 @@ public class ResultReceiver { this.timeoutTs = System.currentTimeMillis() + 
timeoutMs; } - public TResultBatch getNext(Status status) throws TException { + public RowBatch getNext(Status status) throws TException { if (isDone) { return null; } - + final RowBatch rowBatch = new RowBatch(); try { while (!isDone && !isCancel) { PFetchDataRequest request = new PFetchDataRequest(finstId); @@ -90,6 +90,10 @@ public class ResultReceiver { if (code != TStatusCode.OK) { status.setPstatus(pResult.status); return null; + } + + if (pResult.queryConsumption != null) { + rowBatch.setQueryConsumption(pResult.queryConsumption); } if (packetIdx != pResult.packetSeq) { @@ -106,7 +110,9 @@ public class ResultReceiver { TResultBatch resultBatch = new TResultBatch(); TDeserializer deserializer = new TDeserializer(); deserializer.deserialize(resultBatch, serialResult); - return resultBatch; + rowBatch.setBatch(resultBatch); + rowBatch.setEos(pResult.eos); + return rowBatch; } } } catch (RpcException e) { @@ -134,7 +140,7 @@ public class ResultReceiver { if (isCancel) { status.setStatus(Status.CANCELLED); } - return null; + return rowBatch; } public void cancel() { diff --git a/fe/src/main/java/org/apache/doris/qe/RowBatch.java b/fe/src/main/java/org/apache/doris/qe/RowBatch.java new file mode 100644 index 0000000000..c86d93311d --- /dev/null +++ b/fe/src/main/java/org/apache/doris/qe/RowBatch.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.rpc.PQueryConsumption; +import org.apache.doris.thrift.TResultBatch; + +public final class RowBatch { + private TResultBatch batch; + private PQueryConsumption queryConsumption; + private boolean eos; + + public RowBatch() { + eos = true; + } + + public TResultBatch getBatch() { + return batch; + } + + public void setBatch(TResultBatch batch) { + this.batch = batch; + } + + public PQueryConsumption getQueryConsumption() { + return queryConsumption; + } + + public void setQueryConsumption(PQueryConsumption queryConsumption) { + this.queryConsumption = queryConsumption; + } + + public boolean isEos() { + return eos; + } + + public void setEos(boolean eos) { + this.eos = eos; + } +} diff --git a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java index 53b6c02309..32528dc5b7 100644 --- a/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -52,6 +53,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.NotImplementedException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import 
org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ProfileManager; @@ -63,6 +65,7 @@ import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.Planner; import org.apache.doris.rewrite.ExprRewriter; +import org.apache.doris.rpc.PQueryConsumption; import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TQueryOptions; @@ -75,6 +78,7 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; import java.io.StringReader; import java.nio.ByteBuffer; +import java.util.Formatter; import java.util.List; import java.util.Map; import java.util.UUID; @@ -101,6 +105,7 @@ public class StmtExecutor { private Planner planner; private boolean isProxy; private ShowResultSet proxyResultSet = null; + private QueryConsumption consumptionForAuditLog; public StmtExecutor(ConnectContext context, String stmt, boolean isProxy) { this.context = context; @@ -537,26 +542,34 @@ public class StmtExecutor { // so We need to send fields after first batch arrived // send result - TResultBatch batch; + RowBatch batch; MysqlChannel channel = context.getMysqlChannel(); boolean isSendFields = false; - while ((batch = coord.getNext()) != null) { + + while ((batch = coord.getNext()) != null && !batch.isEos()) { if (!isSendFields) { sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); } isSendFields = true; - - for (ByteBuffer row : batch.getRows()) { + for (ByteBuffer row : batch.getBatch().getRows()) { channel.sendOnePacket(row); } - context.updateReturnRows(batch.getRows().size()); + context.updateReturnRows(batch.getBatch().getRows().size()); } + setConsumptionForAuditLog(batch); if (!isSendFields) { sendFields(queryStmt.getColLabels(), queryStmt.getResultExprs()); } context.getState().setEof(); } + private void setConsumptionForAuditLog(RowBatch batch) { + if (batch != null) { + final PQueryConsumption 
queryConsumption = batch.getQueryConsumption(); + consumptionForAuditLog = new QueryConsumption(queryConsumption.cpu, queryConsumption.io); + } + } + // Process a select statement. private void handleInsertStmt() throws Exception { // Every time set no send flag and clean all data in buffer @@ -774,4 +787,40 @@ public class StmtExecutor { ExportStmt exportStmt = (ExportStmt) parsedStmt; context.getCatalog().getExportMgr().addExportJob(exportStmt); } + + public QueryConsumption getQueryConsumptionForAuditLog() { + if (consumptionForAuditLog == null) { + consumptionForAuditLog = new QueryConsumption(); + } + return consumptionForAuditLog; + } + + public static class QueryConsumption { + private final long cpu; + private final long io; + + public QueryConsumption() { + this.cpu = 0; + this.io = 0; + } + + public QueryConsumption(long cpu, long io) { + this.cpu = cpu; + this.io = io; + } + + public String getFormattingCpuConsumption() { + final StringBuilder builder = new StringBuilder(); + builder.append(cpu).append(" Rows"); + return builder.toString(); + } + + public String getFormattingIoConsumption() { + final Pair pair = DebugUtil.getByteUint(io); + final Formatter fmt = new Formatter(); + final StringBuilder builder = new StringBuilder(); + builder.append(fmt.format("%.2f", pair.first)).append(" ").append(pair.second); + return builder.toString(); + } + } } diff --git a/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java b/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java index cd4af477e4..2b1406a80a 100644 --- a/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java +++ b/fe/src/main/java/org/apache/doris/rpc/PFetchDataResult.java @@ -28,4 +28,6 @@ public class PFetchDataResult { public long packetSeq; @Protobuf(order = 3, required = false) public boolean eos; + @Protobuf(order = 4, required = false) + public PQueryConsumption queryConsumption; } diff --git a/fe/src/main/java/org/apache/doris/rpc/PQueryConsumption.java 
b/fe/src/main/java/org/apache/doris/rpc/PQueryConsumption.java new file mode 100644 index 0000000000..3b23391414 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/rpc/PQueryConsumption.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.rpc; + +import com.baidu.bjf.remoting.protobuf.annotation.Protobuf; +import com.baidu.bjf.remoting.protobuf.annotation.ProtobufClass; + +@ProtobufClass +public class PQueryConsumption { + @Protobuf(order = 1, required = false) + public long cpu; + @Protobuf(order = 2, required = false) + public long io; +} diff --git a/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java index e3e4a573c6..d580159dc1 100644 --- a/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java +++ b/fe/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java @@ -231,6 +231,7 @@ public class ConnectProcessorTest { // Mock statement executor StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class); qe.execute(); + EasyMock.expect(qe.getQueryConsumptionForAuditLog()).andReturn(new StmtExecutor.QueryConsumption()); EasyMock.expectLastCall().anyTimes(); EasyMock.replay(qe); PowerMock.expectNew( @@ -254,11 +255,11 @@ public class ConnectProcessorTest { StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class); qe.execute(); EasyMock.expectLastCall().andThrow(new IOException("Fail")).anyTimes(); + EasyMock.expect(qe.getQueryConsumptionForAuditLog()).andReturn(new StmtExecutor.QueryConsumption()); EasyMock.replay(qe); PowerMock.expectNew(StmtExecutor.class, EasyMock.isA(ConnectContext.class), EasyMock.isA(String.class)) .andReturn(qe).anyTimes(); PowerMock.replay(StmtExecutor.class); - processor.processOnce(); Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); } @@ -272,6 +273,7 @@ public class ConnectProcessorTest { // Mock statement executor StmtExecutor qe = EasyMock.createNiceMock(StmtExecutor.class); qe.execute(); + EasyMock.expect(qe.getQueryConsumptionForAuditLog()).andReturn(new StmtExecutor.QueryConsumption()); EasyMock.expectLastCall().andThrow(new NullPointerException("Fail")).anyTimes(); EasyMock.replay(qe); PowerMock.expectNew(StmtExecutor.class, 
EasyMock.isA(ConnectContext.class), EasyMock.isA(String.class)) diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index bec67edbc3..8df748012d 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -19,6 +19,11 @@ syntax="proto2"; package doris; +message PQueryConsumption { + optional int64 cpu = 1; + optional int64 io = 2; +} + message PRowBatch { required int32 num_rows = 1; repeated int32 row_tuples = 2; diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 71782934b2..daad8a62a7 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -41,6 +41,8 @@ message PTransmitDataParams { optional PRowBatch row_batch = 6; // different per packet required int64 packet_seq = 7; + // query consumption + optional PQueryConsumption query_consumption = 8; }; message PTransmitDataResult { @@ -129,6 +131,7 @@ message PFetchDataResult { // valid when status is ok optional int64 packet_seq = 2; optional bool eos = 3; + optional PQueryConsumption query_consumption = 4; }; message PTriggerProfileReportRequest { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 4bc4d5ee13..77db297fdf 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -142,6 +142,7 @@ struct TPlanFragmentDestination { // ... which is being executed on this server 2: required Types.TNetworkAddress server 3: optional Types.TNetworkAddress brpc_server + 4: optional bool is_transfer_chain } // Parameters for a single execution instance of a particular TPlanFragment