diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index ae4495142e..e32dee42a4 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1165,6 +1165,8 @@ DEFINE_mInt64(enable_debug_log_timeout_secs, "0"); // Tolerance for the number of partition id 0 in rowset, default 0 DEFINE_Int32(ignore_invalid_partition_id_rowset_num, "0"); +DEFINE_mInt32(report_query_statistics_interval_ms, "3000"); + // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 85b2631f36..0f67c64bb1 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1241,6 +1241,8 @@ DECLARE_mBool(enable_column_type_check); // Tolerance for the number of partition id 0 in rowset, default 0 DECLARE_Int32(ignore_invalid_partition_id_rowset_num); +DECLARE_mInt32(report_query_statistics_interval_ms); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index e3bf1a738b..5274808c9b 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -45,9 +45,12 @@ #include "olap/storage_engine.h" #include "olap/tablet_manager.h" #include "runtime/block_spill_manager.h" +#include "runtime/client_cache.h" #include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/runtime_query_statistics_mgr.h" #include "runtime/task_group/task_group_manager.h" #include "util/cpu_info.h" #include "util/debug_util.h" @@ -352,6 +355,13 @@ void Daemon::block_spill_gc_thread() { } } +void Daemon::report_runtime_query_statistics_thread() { + while (!_stop_background_threads_latch.wait_for( + std::chrono::milliseconds(config::report_query_statistics_interval_ms))) { + ExecEnv::GetInstance()->runtime_query_statistics_mgr()->report_runtime_query_statistics(); + } +} + void Daemon::je_purge_dirty_pages_thread() const { do { std::unique_lock 
l(doris::MemInfo::je_purge_dirty_pages_lock); @@ -399,6 +409,11 @@ void Daemon::start() { st = Thread::create( "Daemon", "je_purge_dirty_pages_thread", [this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back()); + // Check the je_purge_dirty_pages_thread status here: `st` is overwritten by the next Thread::create. + CHECK(st.ok()) << st; + st = Thread::create( + "Daemon", "query_runtime_statistics_thread", + [this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back()); CHECK(st.ok()) << st; } diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 18f78cbe58..e88dd73764 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -44,6 +44,7 @@ private: void calculate_metrics_thread(); void block_spill_gc_thread(); void je_purge_dirty_pages_thread() const; + void report_runtime_query_statistics_thread(); CountDownLatch _stop_background_threads_latch; std::vector> _threads; diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 2742ccd163..c58bbdb252 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -54,14 +54,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink if (!thrift_sink.__isset.stream_sink) { return Status::InternalError("Missing data stream sink."); } - bool send_query_statistics_with_every_batch = - params.__isset.send_query_statistics_with_every_batch - ? 
params.send_query_statistics_with_every_batch - : false; // TODO: figure out good buffer size based on size of output row sink->reset(new vectorized::VDataStreamSender(state, pool, params.sender_id, row_desc, - thrift_sink.stream_sink, params.destinations, - send_query_statistics_with_every_batch)); + thrift_sink.stream_sink, + params.destinations)); // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink)); break; } @@ -82,16 +78,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink } // TODO: figure out good buffer size based on size of output row - bool send_query_statistics_with_every_batch = - params.__isset.send_query_statistics_with_every_batch - ? params.send_query_statistics_with_every_batch - : false; // Result file sink is not the top sink if (params.__isset.destinations && params.destinations.size() > 0) { sink->reset(new doris::vectorized::VResultFileSink( state, pool, params.sender_id, row_desc, thrift_sink.result_file_sink, - params.destinations, send_query_statistics_with_every_batch, output_exprs, - desc_tbl)); + params.destinations, output_exprs, desc_tbl)); } else { sink->reset(new doris::vectorized::VResultFileSink(row_desc, output_exprs)); } @@ -201,14 +192,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink if (!thrift_sink.__isset.stream_sink) { return Status::InternalError("Missing data stream sink."); } - bool send_query_statistics_with_every_batch = - params.__isset.send_query_statistics_with_every_batch - ? 
params.send_query_statistics_with_every_batch - : false; // TODO: figure out good buffer size based on size of output row - *sink = std::make_unique( - state, pool, local_params.sender_id, row_desc, thrift_sink.stream_sink, - params.destinations, send_query_statistics_with_every_batch); + *sink = std::make_unique(state, pool, local_params.sender_id, + row_desc, thrift_sink.stream_sink, + params.destinations); // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink)); break; } @@ -229,16 +216,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink } // TODO: figure out good buffer size based on size of output row - bool send_query_statistics_with_every_batch = - params.__isset.send_query_statistics_with_every_batch - ? params.send_query_statistics_with_every_batch - : false; // Result file sink is not the top sink if (params.__isset.destinations && params.destinations.size() > 0) { sink->reset(new doris::vectorized::VResultFileSink( state, pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink, - params.destinations, send_query_statistics_with_every_batch, output_exprs, - desc_tbl)); + params.destinations, output_exprs, desc_tbl)); } else { sink->reset(new doris::vectorized::VResultFileSink(row_desc, output_exprs)); } diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index 6faef8f8a7..3bf72ae545 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -36,7 +36,6 @@ class ObjectPool; class RuntimeState; class TPlanFragmentExecParams; class DescriptorTbl; -class QueryStatistics; class TDataSink; class TExpr; class TPipelineFragmentParams; @@ -104,10 +103,6 @@ public: // Returns the runtime profile for the sink. 
RuntimeProfile* profile() { return _profile; } - virtual void set_query_statistics(std::shared_ptr statistics) { - _query_statistics = statistics; - } - const RowDescriptor& row_desc() { return _row_desc; } virtual bool can_write() { return true; } @@ -124,9 +119,6 @@ protected: RuntimeProfile* _profile = nullptr; // Allocated from _pool - // Maybe this will be transferred to BufferControlBlock. - std::shared_ptr _query_statistics; - RuntimeProfile::Counter* _exec_timer = nullptr; RuntimeProfile::Counter* _blocks_sent_counter = nullptr; RuntimeProfile::Counter* _output_rows_counter = nullptr; diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 7d38ee5e65..6bc6e07c56 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -74,7 +74,6 @@ #include "vec/utils/util.hpp" namespace doris { -class QueryStatistics; const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsProducedRate"; @@ -96,6 +95,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl if (tnode.__isset.output_tuple_id) { _output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true})); } + _query_statistics = std::make_shared(); } ExecNode::~ExecNode() = default; @@ -176,22 +176,6 @@ Status ExecNode::reset(RuntimeState* state) { return Status::OK(); } -Status ExecNode::collect_query_statistics(QueryStatistics* statistics) { - DCHECK(statistics != nullptr); - for (auto child_node : _children) { - RETURN_IF_ERROR(child_node->collect_query_statistics(statistics)); - } - return Status::OK(); -} - -Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) { - DCHECK(statistics != nullptr); - for (auto child_node : _children) { - RETURN_IF_ERROR(child_node->collect_query_statistics(statistics, sender_id)); - } - return Status::OK(); -} - void ExecNode::release_resource(doris::RuntimeState* state) { if (!_is_resource_released) { if (_rows_returned_counter != nullptr) { @@ -276,6 +260,9 @@ 
Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool, // Step 1 Create current ExecNode according to current thrift plan node. ExecNode* cur_exec_node = nullptr; RETURN_IF_ERROR(create_node(state, pool, cur_plan_node, descs, &cur_exec_node)); + if (cur_exec_node != nullptr && state->get_query_ctx() != nullptr) { + state->get_query_ctx()->register_query_statistics(cur_exec_node->get_query_statistics()); + } // Step 1.1 // Record current node if we have parent or record myself as root node. diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index aaa2f6ee07..f4b49cba6f 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -156,13 +156,6 @@ public: // so should be fast. [[nodiscard]] virtual Status reset(RuntimeState* state); - // This should be called before close() and after get_next(), it is responsible for - // collecting statistics sent with row batch, it can't be called when prepare() returns - // error. - [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics); - - [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics, - int sender_id); // close() will get called for every exec node, regardless of what else is called and // the status of these calls (i.e. prepare() may never have been called, or // prepare()/open()/get_next() returned with an error). @@ -243,6 +236,8 @@ public: // such as send the last buffer to remote. 
virtual Status try_close(RuntimeState* state) { return Status::OK(); } + std::shared_ptr get_query_statistics() { return _query_statistics; } + protected: friend class DataSink; @@ -330,6 +325,8 @@ protected: std::atomic _can_read = false; + std::shared_ptr _query_statistics = nullptr; + private: static Status create_tree_helper(RuntimeState* state, ObjectPool* pool, const std::vector& tnodes, diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 1f579b6a97..be5d3e6448 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -255,10 +255,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); brpc_request->set_packet_seq(_instance_to_seq[id]++); - if (_statistics && _statistics->collected()) { - auto statistic = brpc_request->mutable_query_statistics(); - _statistics->to_pb(statistic); - } if (request.block) { brpc_request->set_allocated_block(request.block.get()); } @@ -325,10 +321,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { if (request.block_holder->get_block()) { brpc_request->set_allocated_block(request.block_holder->get_block()); } - if (_statistics && _statistics->collected()) { - auto statistic = brpc_request->mutable_query_statistics(); - _statistics->to_pb(statistic); - } auto send_callback = request.channel->get_send_callback(id, request.eos); ExchangeRpcContext rpc_ctx; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 83b20f9c8a..a11b637f4d 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -215,7 +215,6 @@ public: _queue_dependency = queue_dependency; _finish_dependency = finish_dependency; } - void set_query_statistics(QueryStatistics* statistics) { _statistics = statistics; } void set_should_stop() { _should_stop = 
true; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 43bec0bd92..aa29bb4cf0 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -67,7 +67,6 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) { _sink_buffer = std::make_unique>( id, _dest_node_id, _sink->_sender_id, _state->be_number(), state->get_query_ctx()); - _sink_buffer->set_query_statistics(_sink->query_statistics()); RETURN_IF_ERROR(DataSinkOperator::prepare(state)); _sink->register_pipeline_channels(_sink_buffer.get()); return Status::OK(); @@ -135,14 +134,12 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf std::map fragment_id_to_channel_index; for (int i = 0; i < p._dests.size(); ++i) { - // Select first dest as transfer chain. - bool is_transfer_chain = (i == 0); const auto& fragment_instance_id = p._dests[i].fragment_instance_id; if (fragment_id_to_channel_index.find(fragment_instance_id.lo) == fragment_id_to_channel_index.end()) { channel_shared_ptrs.emplace_back(new vectorized::PipChannel( this, p._row_desc, p._dests[i].brpc_server, fragment_instance_id, - p._dest_node_id, is_transfer_chain, p._send_query_statistics_with_every_batch)); + p._dest_node_id)); fragment_id_to_channel_index.emplace(fragment_instance_id.lo, channel_shared_ptrs.size() - 1); channels.push_back(channel_shared_ptrs.back().get()); @@ -258,14 +255,12 @@ segment_v2::CompressionTypePB ExchangeSinkLocalState::compression_type() const { ExchangeSinkOperatorX::ExchangeSinkOperatorX( RuntimeState* state, const RowDescriptor& row_desc, int operator_id, - const TDataStreamSink& sink, const std::vector& destinations, - bool send_query_statistics_with_every_batch) + const TDataStreamSink& sink, const std::vector& destinations) : DataSinkOperatorX(operator_id, sink.dest_node_id), _texprs(sink.output_partition.partition_exprs), _row_desc(row_desc), 
_part_type(sink.output_partition.type), _dests(destinations), - _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch), _dest_node_id(sink.dest_node_id), _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) { DCHECK_GT(destinations.size(), 0); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 24fe9e1d84..6d1d1b6a4f 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -203,8 +203,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX& destinations, - bool send_query_statistics_with_every_batch); + const std::vector& destinations); Status init(const TDataSink& tsink) override; RuntimeState* state() { return _state; } @@ -244,7 +243,6 @@ private: PBlock _pb_block2; const std::vector _dests; - const bool _send_query_statistics_with_every_batch; std::unique_ptr _mem_tracker; // Identifier of the destination plan node. diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 255cb15141..73aa7a22e8 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -73,7 +73,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { auto& p = _parent->cast(); stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(), - profile(), p.is_merging(), p.sub_plan_query_statistics_recvr()); + profile(), p.is_merging()); auto* source_dependency = _dependency; const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); @@ -133,7 +133,6 @@ Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state Status ExchangeSourceOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(OperatorX::prepare(state)); DCHECK_GT(_num_senders, 0); - 
_sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr()); if (_is_merging) { RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, _row_descriptor)); diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index bbccfe5298..3340691f7e 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -113,10 +113,6 @@ public: [[nodiscard]] int num_senders() const { return _num_senders; } [[nodiscard]] bool is_merging() const { return _is_merging; } - std::shared_ptr sub_plan_query_statistics_recvr() { - return _sub_plan_query_statistics_recvr; - } - DataDistribution required_data_distribution() const override { if (OperatorX::ignore_data_distribution()) { return {ExchangeType::NOOP}; @@ -134,7 +130,6 @@ private: const bool _is_merging; const TPartitionType::type _partition_type; RowDescriptor _input_row_desc; - std::shared_ptr _sub_plan_query_statistics_recvr; // use in merge sort size_t _offset; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index bb0b8f3091..80184374b7 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -174,14 +174,6 @@ public: virtual bool is_source() const; - virtual Status collect_query_statistics(QueryStatistics* statistics) { return Status::OK(); }; - - virtual Status collect_query_statistics(QueryStatistics* statistics, int sender_id) { - return Status::OK(); - }; - - virtual void set_query_statistics(std::shared_ptr) {}; - virtual Status init(const TDataSink& tsink) { return Status::OK(); } // Prepare for running. (e.g. resource allocation, etc.) 
@@ -317,9 +309,6 @@ public: } [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { return _sink->profile(); } - void set_query_statistics(std::shared_ptr statistics) override { - _sink->set_query_statistics(statistics); - } protected: DataSinkType* _sink = nullptr; @@ -385,16 +374,6 @@ public: return _node->runtime_profile(); } - Status collect_query_statistics(QueryStatistics* statistics) override { - RETURN_IF_ERROR(_node->collect_query_statistics(statistics)); - return Status::OK(); - } - - Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override { - RETURN_IF_ERROR(_node->collect_query_statistics(statistics, sender_id)); - return Status::OK(); - } - protected: StreamingNodeType* _node = nullptr; bool _use_projection; diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 11045c5f06..2b095748b1 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -59,13 +59,11 @@ ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const RowDescr ResultFileSinkOperatorX::ResultFileSinkOperatorX( int operator_id, const RowDescriptor& row_desc, const TResultFileSink& sink, const std::vector& destinations, - bool send_query_statistics_with_every_batch, const std::vector& t_output_expr, - DescriptorTbl& descs) + const std::vector& t_output_expr, DescriptorTbl& descs) : DataSinkOperatorX(operator_id, 0), _row_desc(row_desc), _t_output_expr(t_output_expr), _dests(destinations), - _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch), _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false), _is_top_sink(false) { CHECK_EQ(destinations.size(), 1); @@ -134,10 +132,9 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i std::map fragment_id_to_channel_index; for (int i = 0; i < p._dests.size(); ++i) { - 
_channels.push_back(new vectorized::Channel( - this, p._row_desc, p._dests[i].brpc_server, state->fragment_instance_id(), - info.tsink.result_file_sink.dest_node_id, false, - p._send_query_statistics_with_every_batch)); + _channels.push_back(new vectorized::Channel(this, p._row_desc, p._dests[i].brpc_server, + state->fragment_instance_id(), + info.tsink.result_file_sink.dest_node_id)); } std::random_device rd; std::mt19937 g(rd()); @@ -187,7 +184,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status) if (p._is_top_sink) { // close sender, this is normal path end if (_sender) { - _sender->update_num_written_rows(_writer == nullptr ? 0 : _writer->get_written_rows()); + _sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows()); static_cast(_sender->close(final_status)); } static_cast(state->exec_env()->result_mgr()->cancel_at_time( diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h index f064b8f2b7..57e1e8c914 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.h +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -94,7 +94,6 @@ public: ResultFileSinkOperatorX(int operator_id, const RowDescriptor& row_desc, const TResultFileSink& sink, const std::vector& destinations, - bool send_query_statistics_with_every_batch, const std::vector& t_output_expr, DescriptorTbl& descs); Status init(const TDataSink& thrift_sink) override; @@ -113,7 +112,6 @@ private: const std::vector& _t_output_expr; const std::vector _dests; - bool _send_query_statistics_with_every_batch; // set file options when sink type is FILE std::unique_ptr _file_opts; diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 8dc6eed299..8b3afb1908 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -180,9 +180,8 @@ Status ResultSinkLocalState::close(RuntimeState* 
state, Status exec_status) { // close sender, this is normal path end if (_sender) { if (_writer) { - _sender->update_num_written_rows(_writer->get_written_rows()); + _sender->update_return_rows(_writer->get_written_rows()); } - _sender->update_max_peak_memory_bytes(); static_cast(_sender->close(final_status)); } static_cast(state->exec_env()->result_mgr()->cancel_at_time( diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 851e0ad325..7001e2b828 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1213,6 +1213,9 @@ Status ScanLocalState::_prepare_scanners() { _eos = true; _scan_dependency->set_ready(); } else { + for (auto& scanner : scanners) { + scanner->set_query_statistics(_query_statistics.get()); + } COUNTER_SET(_num_scanners, static_cast(scanners.size())); RETURN_IF_ERROR(_start_scanners(_scanners)); } diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index ab6850b704..148191f4e2 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -115,12 +115,6 @@ public: [[nodiscard]] PipelineId id() const { return _pipeline_id; } void set_is_root_pipeline() { _is_root_pipeline = true; } bool is_root_pipeline() const { return _is_root_pipeline; } - void set_collect_query_statistics_with_every_batch() { - _collect_query_statistics_with_every_batch = true; - } - [[nodiscard]] bool collect_query_statistics_with_every_batch() const { - return _collect_query_statistics_with_every_batch; - } static bool is_hash_exchange(ExchangeType idx) { return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE; @@ -235,7 +229,6 @@ private: bool _always_can_read = false; bool _always_can_write = false; bool _is_root_pipeline = false; - bool _collect_query_statistics_with_every_batch = false; // Input data distribution of this pipeline. 
We do local exchange when input data distribution // does not match the target data distribution. diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 695fd6f4d3..b9c2382ce8 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -342,7 +342,6 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _root_pipeline = fragment_context->add_pipeline(); _root_pipeline->set_is_root_pipeline(); - _root_pipeline->set_collect_query_statistics_with_every_batch(); RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline)); if (_sink) { // DataSinkOperator is builded here @@ -854,7 +853,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr _multi_cast_stream_sink_senders[i].reset(new vectorized::VDataStreamSender( _runtime_state.get(), _runtime_state->obj_pool(), sender_id, row_desc, thrift_sink.multi_cast_stream_sink.sinks[i], - thrift_sink.multi_cast_stream_sink.destinations[i], false)); + thrift_sink.multi_cast_stream_sink.destinations[i])); // 2. 
create and set the source operator of multi_cast_data_stream_source for new pipeline OperatorBuilderPtr source_op = @@ -945,7 +944,7 @@ Status PipelineFragmentContext::send_report(bool done) { std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, std::placeholders::_2), - _dml_query_statistics()}, + _query_ctx->get_query_statistics()}, std::dynamic_pointer_cast(shared_from_this())); } diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 2a3a11d59c..353e7a0658 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -150,10 +150,6 @@ public: uint64_t create_time() const { return _create_time; } - void set_query_statistics(std::shared_ptr query_statistics) { - _query_statistics = query_statistics; - } - protected: Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state); Status _build_pipelines(ExecNode*, PipelinePtr); @@ -230,17 +226,9 @@ protected: private: static bool _has_inverted_index_or_partial_update(TOlapTableSink sink); - std::shared_ptr _dml_query_statistics() { - if (_query_statistics && _query_statistics->collect_dml_statistics()) { - return _query_statistics; - } - return nullptr; - } std::vector> _tasks; uint64_t _create_time; - - std::shared_ptr _query_statistics = nullptr; }; } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 5f5ff56aa4..32f57c2998 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -58,11 +58,6 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* _root(_operators.back()), _sink(sink) { _pipeline_task_watcher.start(); - _query_statistics.reset(new QueryStatistics(state->query_options().query_type)); - 
_sink->set_query_statistics(_query_statistics); - _collect_query_statistics_with_every_batch = - _pipeline->collect_query_statistics_with_every_batch(); - fragment_context->set_query_statistics(_query_statistics); } PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, @@ -285,10 +280,6 @@ Status PipelineTask::execute(bool* eos) { if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); - if (_data_state == SourceState::FINISHED || - _collect_query_statistics_with_every_batch) { - RETURN_IF_ERROR(_collect_query_statistics()); - } status = _sink->sink(_state, block, _data_state); if (!status.is()) { RETURN_IF_ERROR(status); @@ -303,23 +294,6 @@ Status PipelineTask::execute(bool* eos) { return Status::OK(); } -Status PipelineTask::_collect_query_statistics() { - // The execnode tree of a fragment will be split into multiple pipelines, we only need to collect the root pipeline. - if (_pipeline->is_root_pipeline()) { - // If the current fragment has only one instance, we can collect all of them; - // otherwise, we need to collect them based on the sender_id. 
- if (_state->num_per_fragment_instances() == 1) { - _query_statistics->clear(); - RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get())); - } else { - _query_statistics->clear(); - RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get(), - _state->per_fragment_instance_idx())); - } - } - return Status::OK(); -} - Status PipelineTask::try_close(Status exec_status) { if (_try_close_flag) { return Status::OK(); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 85951bdb06..56e42370ff 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -389,9 +389,6 @@ protected: int64_t _close_pipeline_time = 0; RuntimeProfile::Counter* _pip_task_total_timer = nullptr; - std::shared_ptr _query_statistics; - Status _collect_query_statistics(); - bool _collect_query_statistics_with_every_batch = false; private: Operators _operators; // left is _source, right is _root diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index bc16d70b86..e38e7b39d9 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -316,7 +316,9 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB _rows_returned_counter(nullptr), _peak_memory_usage_counter(nullptr), _parent(parent), - _state(state) {} + _state(state) { + _query_statistics = std::make_shared(); +} template Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& info) { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index ec2500acf3..5304d0074f 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -107,6 +107,8 @@ public: // override in Scan MultiCastSink virtual RuntimeFilterDependency* filterdependency() { return nullptr; } + std::shared_ptr query_statistics_ptr() { return _query_statistics; } + protected: friend class 
OperatorXBase; @@ -119,6 +121,8 @@ protected: // which will providea reference for operator memory. std::unique_ptr _mem_tracker; + std::shared_ptr _query_statistics = nullptr; + RuntimeProfile::Counter* _rows_returned_counter = nullptr; RuntimeProfile::Counter* _blocks_returned_counter = nullptr; RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr; @@ -401,7 +405,6 @@ public: RuntimeState* state() { return _state; } RuntimeProfile* profile() { return _profile; } MemTracker* mem_tracker() { return _mem_tracker.get(); } - QueryStatistics* query_statistics() { return _query_statistics.get(); } [[nodiscard]] RuntimeProfile* faker_runtime_profile() const { return _faker_runtime_profile.get(); } @@ -418,8 +421,6 @@ protected: RuntimeState* _state = nullptr; RuntimeProfile* _profile = nullptr; std::unique_ptr _mem_tracker; - // Maybe this will be transferred to BufferControlBlock. - std::shared_ptr _query_statistics; // Set to true after close() has been called. subclasses should check and set this in // close(). bool _closed = false; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 6b1d9af512..4cc0031269 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -327,13 +327,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData if (!thrift_sink.__isset.stream_sink) { return Status::InternalError("Missing data stream sink."); } - bool send_query_statistics_with_every_batch = - params.__isset.send_query_statistics_with_every_batch - ? 
params.send_query_statistics_with_every_batch - : false; _sink.reset(new ExchangeSinkOperatorX(state, row_desc, next_sink_operator_id(), - thrift_sink.stream_sink, params.destinations, - send_query_statistics_with_every_batch)); + thrift_sink.stream_sink, params.destinations)); break; } case TDataSinkType::RESULT_SINK: { @@ -377,16 +372,11 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData } // TODO: figure out good buffer size based on size of output row - bool send_query_statistics_with_every_batch = - params.__isset.send_query_statistics_with_every_batch - ? params.send_query_statistics_with_every_batch - : false; // Result file sink is not the top sink if (params.__isset.destinations && params.destinations.size() > 0) { - _sink.reset(new ResultFileSinkOperatorX( - next_sink_operator_id(), row_desc, thrift_sink.result_file_sink, - params.destinations, send_query_statistics_with_every_batch, output_exprs, - desc_tbl)); + _sink.reset(new ResultFileSinkOperatorX(next_sink_operator_id(), row_desc, + thrift_sink.result_file_sink, + params.destinations, output_exprs, desc_tbl)); } else { _sink.reset( new ResultFileSinkOperatorX(next_sink_operator_id(), row_desc, output_exprs)); @@ -431,10 +421,10 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData // 2. 
create and set sink operator of data stream sender for new pipeline DataSinkOperatorXPtr sink_op; - sink_op.reset(new ExchangeSinkOperatorX( - state, *_row_desc, next_sink_operator_id(), - thrift_sink.multi_cast_stream_sink.sinks[i], - thrift_sink.multi_cast_stream_sink.destinations[i], false)); + sink_op.reset( + new ExchangeSinkOperatorX(state, *_row_desc, next_sink_operator_id(), + thrift_sink.multi_cast_stream_sink.sinks[i], + thrift_sink.multi_cast_stream_sink.destinations[i])); static_cast(new_pipeline->set_sink(sink_op)); { @@ -605,7 +595,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t pip_idx) { DCHECK(pipeline_id_to_profile[pip_idx]); - RETURN_IF_ERROR(task->prepare(local_params, request.fragment.output_sink)); + RETURN_IF_ERROR( + task->prepare(local_params, request.fragment.output_sink, _query_ctx.get())); return Status::OK(); }; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 4c0c0af1cd..29bc70fce6 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -67,7 +67,8 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta pipeline->incr_created_tasks(); } -Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink) { +Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink, + QueryContext* query_ctx) { DCHECK(_sink); DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state); _init_profile(); @@ -97,6 +98,8 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const _le_state_map, _task_idx, _source_dependency[op->operator_id()]}; RETURN_IF_ERROR(op->setup_local_state(_state, info)); parent_profile = _state->get_local_state(op->operator_id())->profile(); + query_ctx->register_query_statistics( + 
_state->get_local_state(op->operator_id())->query_statistics_ptr()); } _block = doris::vectorized::Block::create_unique(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index f7b996f40a..c9e17727c7 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -62,7 +62,8 @@ public: return Status::InternalError("Should not reach here!"); } - Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink); + Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink, + QueryContext* query_ctx); Status execute(bool* eos) override; diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 91cb032c76..d746a6aca1 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -91,7 +91,9 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size) _is_cancelled(false), _buffer_rows(0), _buffer_limit(buffer_size), - _packet_num(0) {} + _packet_num(0) { + _query_statistics = std::make_unique(); +} BufferControlBlock::~BufferControlBlock() { static_cast(cancel()); diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index f75008f101..d84bcba681 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -91,11 +91,7 @@ public: [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; } - void set_query_statistics(std::shared_ptr statistics) { - _query_statistics = statistics; - } - - void update_num_written_rows(int64_t num_rows) { + void update_return_rows(int64_t num_rows) { // _query_statistics may be null when the result sink init failed // or some other failure. // and the number of written rows is only needed when all things go well. 
@@ -104,13 +100,6 @@ public: } } - void update_max_peak_memory_bytes() { - if (_query_statistics != nullptr) { - int64_t max_peak_memory_bytes = _query_statistics->calculate_max_peak_memory_bytes(); - _query_statistics->set_max_peak_memory_bytes(max_peak_memory_bytes); - } - } - protected: virtual bool _get_batch_queue_empty() { return _fe_result_batch_queue.empty() && _arrow_flight_batch_queue.empty(); @@ -142,10 +131,8 @@ protected: std::deque _waiting_rpc; - // It is shared with PlanFragmentExecutor and will be called in two different - // threads. But their calls are all at different time, there is no problem of - // multithreading access. - std::shared_ptr _query_statistics; + // only used for FE using return rows to check limit + std::unique_ptr _query_statistics; }; class PipBufferControlBlock : public BufferControlBlock { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index ddbb90b45a..4d0e548ea2 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -75,6 +75,7 @@ class MemTracker; class StorageEngine; class ResultBufferMgr; class ResultQueueMgr; +class RuntimeQueryStatiticsMgr; class TMasterInfo; class LoadChannelMgr; class LoadStreamMgr; @@ -154,6 +155,9 @@ public: pipeline::TaskScheduler* pipeline_task_group_scheduler() { return _with_group_task_scheduler; } taskgroup::TaskGroupManager* task_group_manager() { return _task_group_manager; } WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return _workload_sched_mgr; } + RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() { + return _runtime_query_statistics_mgr; + } // using template to simplify client cache management template @@ -384,6 +388,8 @@ private: doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr; WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr; + + RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp 
index 4ab954a550..109103c86f 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -70,6 +70,7 @@ #include "runtime/result_buffer_mgr.h" #include "runtime/result_queue_mgr.h" #include "runtime/routine_load/routine_load_task_executor.h" +#include "runtime/runtime_query_statistics_mgr.h" #include "runtime/small_file_mgr.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" @@ -196,6 +197,9 @@ Status ExecEnv::_init(const std::vector& store_paths, .set_max_queue_size(1000000) .build(&_lazy_release_obj_pool)); + // NOTE: runtime query statistics mgr could be visited by query and daemon thread + // so it should be created before all query begin and deleted after all query and daemon thread stoppped + _runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr(); init_file_cache_factory(); RETURN_IF_ERROR(init_pipeline_task_scheduler()); _task_group_manager = new taskgroup::TaskGroupManager(); @@ -639,6 +643,10 @@ void ExecEnv::destroy() { // info is deconstructed then BE process will core at coordinator back method in fragment mgr. 
SAFE_DELETE(_master_info); + // NOTE: runtime query statistics mgr could be visited by query and daemon thread + // so it should be created before all query begin and deleted after all query and daemon thread stoppped + SAFE_DELETE(_runtime_query_statistics_mgr); + LOG(INFO) << "Doris exec envorinment is destoried."; } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 8cf11047b6..d18eb2ef00 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -223,7 +223,6 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { if (req.query_statistics) { // use to report 'insert into select' TQueryStatistics queryStatistics; - DCHECK(req.query_statistics->collect_dml_statistics()); req.query_statistics->to_thrift(&queryStatistics); params.__set_query_statistics(queryStatistics); } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 896178946e..9c40e2e9f3 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -92,10 +92,11 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env, _closed(false), _is_report_success(false), _is_report_on_cancel(true), - _collect_query_statistics_with_every_batch(false), _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) { _report_thread_future = _report_thread_promise.get_future(); _start_time = VecDateTimeValue::local_time(); + _query_statistics = std::make_shared(); + _query_ctx->register_query_statistics(_query_statistics); } PlanFragmentExecutor::~PlanFragmentExecutor() { @@ -231,11 +232,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { if (sink_profile != nullptr) { profile()->add_child(sink_profile, true, nullptr); } - - _collect_query_statistics_with_every_batch = - params.__isset.send_query_statistics_with_every_batch - ? 
params.send_query_statistics_with_every_batch - : false; } else { // _sink is set to nullptr _sink.reset(nullptr); @@ -254,11 +250,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { VLOG_NOTICE << "plan_root=\n" << _plan->debug_string(); _prepared = true; - - _query_statistics.reset(new QueryStatistics(request.query_options.query_type)); - if (_sink != nullptr) { - _sink->set_query_statistics(_query_statistics); - } return Status::OK(); } @@ -336,10 +327,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() { st = get_vectorized_internal(block.get(), &eos); RETURN_IF_ERROR(st); - // Collect this plan and sub plan statistics, and send to parent plan. - if (_collect_query_statistics_with_every_batch) { - _collect_query_statistics(); - } + _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS); if (!eos || block->rows() > 0) { st = _sink->send(runtime_state(), block.get()); @@ -351,7 +339,6 @@ Status PlanFragmentExecutor::open_vectorized_internal() { } } { - _collect_query_statistics(); Status status; { std::lock_guard l(_status_lock); @@ -430,44 +417,6 @@ bool PlanFragmentExecutor::is_timeout(const VecDateTimeValue& now) const { return false; } -void PlanFragmentExecutor::_collect_query_statistics() { - _query_statistics->clear(); - Status status; - /// TODO(yxc): - // The judgment of enable_local_exchange here is a bug, it should not need to be checked. I will fix this later. 
- bool _is_local = false; - if (_runtime_state->query_options().__isset.enable_local_exchange) { - _is_local = _runtime_state->query_options().enable_local_exchange; - } - - if (_is_local) { - if (_runtime_state->num_per_fragment_instances() == 1) { - status = _plan->collect_query_statistics(_query_statistics.get()); - } else { - status = _plan->collect_query_statistics(_query_statistics.get(), - _runtime_state->per_fragment_instance_idx()); - } - } else { - status = _plan->collect_query_statistics(_query_statistics.get()); - } - - if (!status.ok()) { - LOG(INFO) << "collect query statistics failed, st=" << status; - return; - } - _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS); - if (_runtime_state->backend_id() != -1) { - _collect_node_statistics(); - } -} - -void PlanFragmentExecutor::_collect_node_statistics() { - DCHECK(_runtime_state->backend_id() != -1); - NodeStatistics* node_statistics = - _query_statistics->add_nodes_statistics(_runtime_state->backend_id()); - node_statistics->set_peak_memory(_runtime_state->query_mem_tracker()->peak_consumption()); -} - void PlanFragmentExecutor::report_profile() { SCOPED_ATTACH_TASK(_runtime_state.get()); VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id(); @@ -559,7 +508,7 @@ void PlanFragmentExecutor::send_report(bool done) { std::bind(&PlanFragmentExecutor::update_status, this, std::placeholders::_1), std::bind(&PlanFragmentExecutor::cancel, this, std::placeholders::_1, std::placeholders::_2), - _dml_query_statistics()}; + _query_ctx->get_query_statistics()}; // This will send a report even if we are cancelled. If the query completed correctly // but fragments still need to be cancelled (e.g. limit reached), the coordinator will // be waiting for a final report and profile. 
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 41817e5308..051448f13f 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -47,7 +47,6 @@ class DataSink; class DescriptorTbl; class ExecEnv; class ObjectPool; -class QueryStatistics; struct ReportStatusRequest; namespace vectorized { @@ -231,12 +230,6 @@ private: VecDateTimeValue _start_time; - // It is shared with BufferControlBlock and will be called in two different - // threads. But their calls are all at different time, there is no problem of - // multithreaded access. - std::shared_ptr _query_statistics; - bool _collect_query_statistics_with_every_batch; - // Record the cancel information when calling the cancel() method, return it to FE PPlanFragmentCancelReason _cancel_reason; std::string _cancel_msg; @@ -275,16 +268,9 @@ private: const DescriptorTbl& desc_tbl() const { return _runtime_state->desc_tbl(); } - void _collect_query_statistics(); - - std::shared_ptr _dml_query_statistics() { - if (_query_statistics && _query_statistics->collect_dml_statistics()) { - return _query_statistics; - } - return nullptr; - } - void _collect_node_statistics(); + + std::shared_ptr _query_statistics = nullptr; }; } // namespace doris diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index e60f29b291..bd7dee33b5 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -19,6 +19,7 @@ #include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_x/dependency.h" +#include "runtime/runtime_query_statistics_mgr.h" namespace doris { @@ -73,6 +74,7 @@ QueryContext::~QueryContext() { static_cast(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit( std::make_shared(std::move(_thread_token)))); } + _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id)); } void QueryContext::set_ready_to_execute(bool is_cancelled) { @@ -118,4 
+120,15 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme } return true; } + +void QueryContext::register_query_statistics(std::shared_ptr qs) { + _exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(_query_id), qs, + coord_addr); +} + +std::shared_ptr QueryContext::get_query_statistics() { + return _exec_env->runtime_query_statistics_mgr()->get_runtime_query_statistics( + print_id(_query_id)); +} + } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 203c5b6e3f..9e906cbbe1 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -208,6 +208,10 @@ public: pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); } + void register_query_statistics(std::shared_ptr qs); + + std::shared_ptr get_query_statistics(); + public: DescriptorTbl* desc_tbl = nullptr; bool set_rsc_info = false; diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index 02789e2dab..7171803ce0 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -64,11 +64,11 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) { void QueryStatistics::to_thrift(TQueryStatistics* statistics) const { DCHECK(statistics != nullptr); - statistics->scan_bytes = scan_bytes; - statistics->scan_rows = scan_rows; - statistics->cpu_ms = cpu_ms; - statistics->returned_rows = returned_rows; - statistics->max_peak_memory_bytes = max_peak_memory_bytes; + statistics->__set_scan_bytes(scan_bytes); + statistics->__set_scan_rows(scan_rows); + statistics->__set_cpu_ms(cpu_ms); + statistics->__set_returned_rows(returned_rows); + statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes); } void QueryStatistics::from_pb(const PQueryStatistics& statistics) { diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index fa39c9ea18..8c4662ba59 100644 --- 
a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -59,13 +59,8 @@ private: // or plan's statistics and QueryStatisticsRecvr is responsible for collecting it. class QueryStatistics { public: - QueryStatistics(TQueryType::type query_type = TQueryType::type::SELECT) - : scan_rows(0), - scan_bytes(0), - cpu_ms(0), - returned_rows(0), - max_peak_memory_bytes(0), - _query_type(query_type) {} + QueryStatistics() + : scan_rows(0), scan_bytes(0), cpu_ms(0), returned_rows(0), max_peak_memory_bytes(0) {} virtual ~QueryStatistics(); void merge(const QueryStatistics& other); @@ -103,8 +98,9 @@ public: void clearNodeStatistics(); void clear() { - scan_rows = 0; - scan_bytes = 0; + scan_rows.store(0); + scan_bytes.store(0); + cpu_ms = 0; returned_rows = 0; max_peak_memory_bytes = 0; @@ -119,13 +115,13 @@ public: bool collected() const { return _collected; } void set_collected() { _collected = true; } - // LOAD does not need to collect information on the exchange node. - bool collect_dml_statistics() { return _query_type == TQueryType::LOAD; } + int64_t get_scan_rows() { return scan_rows.load(); } + int64_t get_scan_bytes() { return scan_bytes.load(); } private: friend class QueryStatisticsRecvr; - int64_t scan_rows; - int64_t scan_bytes; + std::atomic scan_rows; + std::atomic scan_bytes; int64_t cpu_ms; // number rows returned by query. // only set once by result sink when closing. @@ -137,7 +133,6 @@ private: using NodeStatisticsMap = std::unordered_map; NodeStatisticsMap _nodes_statistics_map; bool _collected = false; - const TQueryType::type _query_type; }; using QueryStatisticsPtr = std::shared_ptr; // It is used for collecting sub plan query statistics in DataStreamRecvr. 
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp new file mode 100644 index 0000000000..6a8aa3f509 --- /dev/null +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/runtime_query_statistics_mgr.h" + +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "util/debug_util.h" + +namespace doris { + +void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id, + std::shared_ptr qs_ptr, + TNetworkAddress fe_addr) { + std::lock_guard write_lock(_qs_ctx_map_lock); + if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) { + _query_statistics_ctx_map[query_id] = std::make_unique(fe_addr); + } + _query_statistics_ctx_map.at(query_id)->qs_list.push_back(qs_ptr); +} + +void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() { + int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id; + // 1 get query statistics map + std::map> fe_qs_map; + std::map query_finished; + { + std::lock_guard write_lock(_qs_ctx_map_lock); + for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) { + if (fe_qs_map.find(qs_ctx_ptr->fe_addr) == fe_qs_map.end()) { + std::map tmp_map; + fe_qs_map[qs_ctx_ptr->fe_addr] = std::move(tmp_map); + } + + QueryStatistics tmp_qs; + for (auto& qs_ptr : qs_ctx_ptr->qs_list) { + tmp_qs.merge(*qs_ptr); + } + TQueryStatistics ret_t_qs; + tmp_qs.to_thrift(&ret_t_qs); + fe_qs_map.at(qs_ctx_ptr->fe_addr)[query_id] = ret_t_qs; + query_finished[query_id] = qs_ctx_ptr->is_query_finished; + } + } + + // 2 report query statistics to fe + std::map rpc_result; + for (auto& [addr, qs_map] : fe_qs_map) { + rpc_result[addr] = false; + // 2.1 get client + Status coord_status; + FrontendServiceConnection coord(ExecEnv::GetInstance()->frontend_client_cache(), addr, + &coord_status); + std::string add_str = PrintThriftNetworkAddress(addr); + if (!coord_status.ok()) { + std::stringstream ss; + LOG(WARNING) << "could not get client " << add_str + << " when report workload runtime stats, reason is " + << coord_status.to_string(); + continue; + } + + // 2.2 send report + TReportWorkloadRuntimeStatusParams report_runtime_params; + 
report_runtime_params.__set_backend_id(be_id); + report_runtime_params.__set_query_statistics_map(qs_map); + + TReportExecStatusParams params; + params.report_workload_runtime_status = report_runtime_params; + + TReportExecStatusResult res; + Status rpc_status; + try { + coord->reportExecStatus(res, params); + rpc_result[addr] = true; + } catch (apache::thrift::transport::TTransportException& e) { + LOG(WARNING) << "report workload runtime stats to " << add_str + << " failed, err: " << e.what(); + rpc_status = coord.reopen(); + if (!rpc_status.ok()) { + LOG(WARNING) + << "reopen thrift client failed when report workload runtime statistics to" + << add_str; + } else { + try { + coord->reportExecStatus(res, params); + rpc_result[addr] = true; + } catch (apache::thrift::transport::TTransportException& e2) { + LOG(WARNING) << "retry report workload runtime stats to " << add_str + << " failed, err: " << e2.what(); + } + } + } + } + + // 3 when query is finished and (last rpc is send success), remove finished query statistics + { + std::lock_guard write_lock(_qs_ctx_map_lock); + for (auto& [addr, qs_map] : fe_qs_map) { + if (rpc_result[addr]) { + for (auto& [query_id, qs] : qs_map) { + if (query_finished[query_id]) { + _query_statistics_ctx_map.erase(query_id); + } + } + } + } + } +} + +void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) { + // NOTE: here must be a write lock + std::lock_guard write_lock(_qs_ctx_map_lock); + // when a query get query_ctx succ, but failed before create node/operator, + // it may not register query statistics, so it can not be mark finish + if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) { + _query_statistics_ctx_map.at(query_id)->is_query_finished = true; + } +} + +std::shared_ptr RuntimeQueryStatiticsMgr::get_runtime_query_statistics( + std::string query_id) { + std::shared_lock read_lock(_qs_ctx_map_lock); + if (_query_statistics_ctx_map.find(query_id) == 
_query_statistics_ctx_map.end()) { + return nullptr; + } + std::shared_ptr qs_ptr = std::make_shared(); + for (auto const& qs : _query_statistics_ctx_map[query_id]->qs_list) { + qs_ptr->merge(*qs); + } + return qs_ptr; +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/runtime_query_statistics_mgr.h b/be/src/runtime/runtime_query_statistics_mgr.h new file mode 100644 index 0000000000..b3fa4bbc40 --- /dev/null +++ b/be/src/runtime/runtime_query_statistics_mgr.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include +#include + +#include "runtime/query_statistics.h" + +namespace doris { + +class QueryStatisticsCtx { +public: + QueryStatisticsCtx(TNetworkAddress fe_addr) : fe_addr(fe_addr) { + this->is_query_finished = false; + } + ~QueryStatisticsCtx() = default; + + std::vector> qs_list; + std::atomic is_query_finished; + TNetworkAddress fe_addr; +}; + +class RuntimeQueryStatiticsMgr { +public: + RuntimeQueryStatiticsMgr() = default; + ~RuntimeQueryStatiticsMgr() = default; + + void register_query_statistics(std::string query_id, std::shared_ptr qs_ptr, + TNetworkAddress fe_addr); + + void report_runtime_query_statistics(); + + void set_query_finished(std::string query_id); + + std::shared_ptr get_runtime_query_statistics(std::string query_id); + +private: + std::shared_mutex _qs_ctx_map_lock; + std::map> _query_statistics_ctx_map; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 8ac1af004d..e1f39f2948 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -80,21 +80,6 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, } } -Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) { - RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); - if (!_is_pipeline_scan || _should_create_scanner) { - statistics->add_scan_bytes(_byte_read_counter->value()); - statistics->add_scan_rows(_rows_read_counter->value()); - statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS); - } - return Status::OK(); -} - -Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics, int) { - RETURN_IF_ERROR(collect_query_statistics(statistics)); - return Status::OK(); -} - Status NewOlapScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VScanNode::prepare(state)); // if you want to add some profile in scan node, even 
it have not new VScanner object diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index 309bac5699..ca357b7eb7 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -63,8 +63,6 @@ public: friend class doris::pipeline::OlapScanOperator; Status prepare(RuntimeState* state) override; - Status collect_query_statistics(QueryStatistics* statistics) override; - Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override; void set_scan_ranges(RuntimeState* state, const std::vector& scan_ranges) override; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 2a56a733b4..cf52341b34 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -1332,6 +1332,9 @@ Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) { if (scanners.empty()) { _eos = true; } else { + for (auto& scanner : scanners) { + scanner->set_query_statistics(_query_statistics.get()); + } COUNTER_SET(_num_scanners, static_cast(scanners.size())); _start_scanners(_scanners, query_parallel_instance_num); } diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 0a7a8c9c01..2cdd1d503b 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -102,6 +102,8 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { } } + int64_t old_scan_rows = _num_rows_read; + int64_t old_scan_bytes = _num_byte_read; { do { // if step 2 filter all rows of block, and block will be reused to get next rows, @@ -133,6 +135,11 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { _num_rows_read < rows_read_threshold); } + if (_query_statistics) { + _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows); + _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes); + } + if 
(state->is_cancelled()) { return Status::Cancelled("cancelled"); } diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 9fdaddcea2..23ddb65629 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -32,6 +32,7 @@ namespace doris { class RuntimeProfile; class TupleDescriptor; +class QueryStatistics; namespace vectorized { class VExprContext; @@ -148,6 +149,10 @@ public: void set_status_on_failure(const Status& st) { _status = st; } + void set_query_statistics(QueryStatistics* query_statistics) { + _query_statistics = query_statistics; + } + protected: void _discard_conjuncts() { for (auto& conjunct : _conjuncts) { @@ -159,6 +164,8 @@ protected: RuntimeState* _state = nullptr; VScanNode* _parent = nullptr; pipeline::ScanLocalStateBase* _local_state = nullptr; + QueryStatistics* _query_statistics = nullptr; + // Set if scan node has sort limit info int64_t _limit = -1; diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 6baf6749ed..5b3e38af56 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -63,12 +63,11 @@ Status VExchangeNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_TIMER(_exec_timer); DCHECK_GT(_num_senders, 0); - _sub_plan_query_statistics_recvr.reset(new QueryStatisticsRecvr()); CHECK(state->exec_env() != nullptr); CHECK(state->exec_env()->vstream_mgr() != nullptr); _stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( state, _input_row_desc, state->fragment_instance_id(), _id, _num_senders, - _runtime_profile.get(), _is_merging, _sub_plan_query_statistics_recvr); + _runtime_profile.get(), _is_merging); if (_is_merging) { RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, _row_descriptor)); @@ -145,20 +144,6 @@ void VExchangeNode::release_resource(RuntimeState* state) { ExecNode::release_resource(state); } -Status 
VExchangeNode::collect_query_statistics(QueryStatistics* statistics) { - RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); - if (!statistics->collect_dml_statistics()) { - statistics->merge(_sub_plan_query_statistics_recvr.get()); - } - return Status::OK(); -} -Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) { - RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); - if (!statistics->collect_dml_statistics()) { - statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id); - } - return Status::OK(); -} Status VExchangeNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h index 94302e84d9..e49eb86a92 100644 --- a/be/src/vec/exec/vexchange_node.h +++ b/be/src/vec/exec/vexchange_node.h @@ -32,7 +32,6 @@ namespace doris { class DorisNodesInfo; class ObjectPool; class QueryStatistics; -class QueryStatisticsRecvr; class RuntimeState; class TPlanNode; @@ -55,8 +54,6 @@ public: Status open(RuntimeState* state) override; Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override; void release_resource(RuntimeState* state) override; - Status collect_query_statistics(QueryStatistics* statistics) override; - Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override; Status close(RuntimeState* state) override; void set_num_senders(int num_senders) { _num_senders = num_senders; } @@ -67,7 +64,6 @@ private: bool _is_ready; std::shared_ptr _stream_recvr; RowDescriptor _input_row_desc; - std::shared_ptr _sub_plan_query_statistics_recvr; // use in merge sort size_t _offset; diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 425e92e92e..1210cd811d 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -57,14 +57,13 @@ inline uint32_t VDataStreamMgr::get_hash_value(const 
TUniqueId& fragment_instanc std::shared_ptr VDataStreamMgr::create_recvr( RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging, - std::shared_ptr sub_plan_query_statistics_recvr) { + PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile, bool is_merging) { DCHECK(profile != nullptr); VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id) << ", node=" << dest_node_id; - std::shared_ptr recvr(new VDataStreamRecvr( - this, state, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging, - profile, sub_plan_query_statistics_recvr)); + std::shared_ptr recvr(new VDataStreamRecvr(this, state, row_desc, + fragment_instance_id, dest_node_id, + num_senders, is_merging, profile)); uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); std::lock_guard l(_lock); _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id)); @@ -127,12 +126,6 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, // then the upstream node may report error status to FE, the query is failed. return Status::EndOfFile("data stream receiver is deconstructed"); } - // request can only be used before calling recvr's add_batch or when request - // is the last for the sender, because request maybe released after it's batch - // is consumed by ExchangeNode. 
- if (request->has_query_statistics()) { - recvr->add_sub_plan_statistics(request->query_statistics(), request->sender_id()); - } bool eos = request->eos(); if (request->has_block()) { diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h index d809ff96fb..853d984621 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.h +++ b/be/src/vec/runtime/vdata_stream_mgr.h @@ -39,7 +39,6 @@ namespace doris { class RuntimeState; class RowDescriptor; class RuntimeProfile; -class QueryStatisticsRecvr; class PTransmitDataParams; namespace vectorized { @@ -50,11 +49,11 @@ public: VDataStreamMgr(); ~VDataStreamMgr(); - std::shared_ptr create_recvr( - RuntimeState* state, const RowDescriptor& row_desc, - const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, - RuntimeProfile* profile, bool is_merging, - std::shared_ptr sub_plan_query_statistics_recvr); + std::shared_ptr create_recvr(RuntimeState* state, + const RowDescriptor& row_desc, + const TUniqueId& fragment_instance_id, + PlanNodeId dest_node_id, int num_senders, + RuntimeProfile* profile, bool is_merging); std::shared_ptr find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id, bool acquire_lock = true); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index b75bd4c21b..dfc574591b 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -336,11 +336,10 @@ void VDataStreamRecvr::SenderQueue::close() { _block_queue.clear(); } -VDataStreamRecvr::VDataStreamRecvr( - VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& row_desc, - const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, - bool is_merging, RuntimeProfile* profile, - std::shared_ptr sub_plan_query_statistics_recvr) +VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, + const RowDescriptor& row_desc, + const TUniqueId& 
fragment_instance_id, PlanNodeId dest_node_id, + int num_senders, bool is_merging, RuntimeProfile* profile) : HasTaskExecutionCtx(state), _mgr(stream_mgr), #ifdef USE_MEM_TRACKER @@ -354,7 +353,6 @@ VDataStreamRecvr::VDataStreamRecvr( _is_closed(false), _profile(profile), _peak_memory_usage_counter(nullptr), - _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr), _enable_pipeline(state->enable_pipeline_exec()), _mem_available(std::make_shared(true)) { // DataStreamRecvr may be destructed after the instance execution thread ends. @@ -483,17 +481,6 @@ void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_s _sender_queues[use_sender_id]->decrement_senders(be_number); } -void VDataStreamRecvr::remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics, - Status exec_status) { - if (!exec_status.ok()) { - cancel_stream(exec_status); - return; - } - int use_sender_id = _is_merging ? sender_id : 0; - _sender_queues[use_sender_id]->decrement_senders(be_number); - _sub_plan_query_statistics_recvr->insert(statistics, sender_id); -} - void VDataStreamRecvr::cancel_stream(Status exec_status) { VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id) << exec_status; diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 7f9436cba5..5e64268276 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -56,7 +56,6 @@ namespace doris { class MemTracker; class PBlock; class MemTrackerLimiter; -class PQueryStatistics; class RuntimeState; namespace pipeline { @@ -76,8 +75,7 @@ public: class SenderQueue; VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* state, const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, - int num_senders, bool is_merging, RuntimeProfile* profile, - std::shared_ptr sub_plan_query_statistics_recvr); + int num_senders, bool is_merging, 
RuntimeProfile* profile); virtual ~VDataStreamRecvr(); @@ -103,17 +101,10 @@ public: PlanNodeId dest_node_id() const { return _dest_node_id; } const RowDescriptor& row_desc() const { return _row_desc; } - void add_sub_plan_statistics(const PQueryStatistics& statistics, int sender_id) { - _sub_plan_query_statistics_recvr->insert(statistics, sender_id); - } - // Indicate that a particular sender is done. Delegated to the appropriate // sender queue. Called from DataStreamMgr. void remove_sender(int sender_id, int be_number, Status exec_status); - void remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics, - Status exec_status); - void cancel_stream(Status exec_status); void close(); @@ -184,8 +175,6 @@ private: // Number of blocks received RuntimeProfile::Counter* _blocks_produced_counter = nullptr; - std::shared_ptr _sub_plan_query_statistics_recvr; - bool _enable_pipeline; std::vector> _sender_to_local_channel_dependency; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 53c140ef5a..6c4d10839e 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -152,13 +152,7 @@ Status Channel::send_local_block(Status exec_status, bool eos) { _local_recvr->add_block(&block, _parent->sender_id(), true); if (eos) { - /// TODO: Supported on pipelineX, we can hold QueryStatistics on the fragment instead of on instances. 
- if constexpr (std::is_same_v) { - _local_recvr->remove_sender(_parent->sender_id(), _be_number, - _parent->query_statisticsPtr(), exec_status); - } else { - _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); - } + _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); } return Status::OK(); } else { @@ -199,10 +193,6 @@ Status Channel::send_remote_block(PBlock* block, bool eos, Status exec_s VLOG_ROW << "Channel::send_batch() instance_id=" << print_id(_fragment_instance_id) << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string(); - if (_is_transfer_chain && (_send_query_statistics_with_every_batch || eos)) { - auto statistic = _brpc_request->mutable_query_statistics(); - _parent->query_statistics()->to_pb(statistic); - } _brpc_request->set_eos(eos); if (!exec_status.ok()) { @@ -289,12 +279,7 @@ Status Channel::close_internal(Status exec_status) { SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); if (is_local()) { if (_recvr_is_valid()) { - if constexpr (std::is_same_v) { - _local_recvr->remove_sender(_parent->sender_id(), _be_number, - _parent->query_statisticsPtr(), exec_status); - } else { - _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); - } + _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); } } else { status = send_remote_block((PBlock*)nullptr, true, exec_status); @@ -329,8 +314,7 @@ void Channel::ch_roll_pb_block() { VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TDataStreamSink& sink, - const std::vector& destinations, - bool send_query_statistics_with_every_batch) + const std::vector& destinations) : DataSink(row_desc), _sender_id(sender_id), _state(state), @@ -351,21 +335,17 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int 
_enable_pipeline_exec = state->enable_pipeline_exec(); for (int i = 0; i < destinations.size(); ++i) { - // Select first dest as transfer chain. - bool is_transfer_chain = (i == 0); const auto& fragment_instance_id = destinations[i].fragment_instance_id; if (fragment_id_to_channel_index.find(fragment_instance_id.lo) == fragment_id_to_channel_index.end()) { if (_enable_pipeline_exec) { _channel_shared_ptrs.emplace_back(new PipChannel( this, row_desc, destinations[i].brpc_server, fragment_instance_id, - sink.dest_node_id, is_transfer_chain, - send_query_statistics_with_every_batch)); + sink.dest_node_id)); } else { _channel_shared_ptrs.emplace_back( new Channel(this, row_desc, destinations[i].brpc_server, - fragment_instance_id, sink.dest_node_id, is_transfer_chain, - send_query_statistics_with_every_batch)); + fragment_instance_id, sink.dest_node_id)); } fragment_id_to_channel_index.emplace(fragment_instance_id.lo, _channel_shared_ptrs.size() - 1); @@ -388,8 +368,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, PlanNodeId dest_node_id, - const std::vector& destinations, - bool send_query_statistics_with_every_batch) + const std::vector& destinations) : DataSink(row_desc), _sender_id(sender_id), _state(state), @@ -405,9 +384,9 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int const auto& fragment_instance_id = destinations[i].fragment_instance_id; if (fragment_id_to_channel_index.find(fragment_instance_id.lo) == fragment_id_to_channel_index.end()) { - _channel_shared_ptrs.emplace_back( - new Channel(this, row_desc, destinations[i].brpc_server, fragment_instance_id, - _dest_node_id, false, send_query_statistics_with_every_batch)); + _channel_shared_ptrs.emplace_back(new Channel(this, row_desc, + destinations[i].brpc_server, + fragment_instance_id, _dest_node_id)); } 
fragment_id_to_channel_index.emplace(fragment_instance_id.lo, _channel_shared_ptrs.size() - 1); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index f59dad266f..ca020d9bab 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -108,13 +108,11 @@ public: friend class pipeline::ExchangeSinkOperator; VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TDataStreamSink& sink, - const std::vector& destinations, - bool send_query_statistics_with_every_batch); + const std::vector& destinations); VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, PlanNodeId dest_node_id, - const std::vector& destinations, - bool send_query_statistics_with_every_batch); + const std::vector& destinations); ~VDataStreamSender() override; @@ -145,8 +143,6 @@ public: return _split_block_distribute_by_channel_timer; } MemTracker* mem_tracker() { return _mem_tracker.get(); } - QueryStatistics* query_statistics() { return _query_statistics.get(); } - QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; } bool transfer_large_data_by_brpc() { return _transfer_large_data_by_brpc; } RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; } segment_v2::CompressionTypePB compression_type() const { return _compression_type; } @@ -237,8 +233,7 @@ public: // how much tuple data is getting accumulated before being sent; it only applies // when data is added via add_row() and not sent directly via send_batch(). 
Channel(Parent* parent, const RowDescriptor& row_desc, const TNetworkAddress& brpc_dest, - const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, bool is_transfer_chain, - bool send_query_statistics_with_every_batch) + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id) : _parent(parent), _row_desc(row_desc), _fragment_instance_id(fragment_instance_id), @@ -248,8 +243,6 @@ public: _need_close(false), _closed(false), _brpc_dest_addr(brpc_dest), - _is_transfer_chain(is_transfer_chain), - _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch), _is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) && (_brpc_dest_addr.port == config::brpc_port)), _serializer(_parent, _is_local) { @@ -380,9 +373,6 @@ protected: std::shared_ptr> _send_remote_block_callback; Status _receiver_status; int32_t _brpc_timeout_ms = 500; - // whether the dest can be treated as query statistics transfer chain. - bool _is_transfer_chain; - bool _send_query_statistics_with_every_batch; RuntimeState* _state = nullptr; bool _is_local; @@ -442,10 +432,8 @@ template class PipChannel final : public Channel { public: PipChannel(Parent* parent, const RowDescriptor& row_desc, const TNetworkAddress& brpc_dest, - const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, - bool is_transfer_chain, bool send_query_statistics_with_every_batch) - : Channel(parent, row_desc, brpc_dest, fragment_instance_id, dest_node_id, - is_transfer_chain, send_query_statistics_with_every_batch) { + const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id) + : Channel(parent, row_desc, brpc_dest, fragment_instance_id, dest_node_id) { ch_roll_pb_block(); } diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index 5bb0de4b50..02d77fa6d4 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -32,7 +32,6 @@ #include "vec/exprs/vexpr.h" namespace doris { -class QueryStatistics; 
class TExpr; } // namespace doris @@ -45,15 +44,13 @@ VResultFileSink::VResultFileSink(const RowDescriptor& row_desc, VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TResultFileSink& sink, const std::vector& destinations, - bool send_query_statistics_with_every_batch, const std::vector& t_output_expr, DescriptorTbl& descs) : AsyncWriterSink(row_desc, t_output_expr), _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) { _is_top_sink = false; CHECK_EQ(destinations.size(), 1); _stream_sender.reset(new VDataStreamSender(state, pool, sender_id, row_desc, sink.dest_node_id, - destinations, - send_query_statistics_with_every_batch)); + destinations)); } Status VResultFileSink::init(const TDataSink& tsink) { @@ -127,7 +124,7 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) { if (_is_top_sink) { // close sender, this is normal path end if (_sender) { - _sender->update_num_written_rows(_writer == nullptr ? 0 : _writer->get_written_rows()); + _sender->update_return_rows(_writer == nullptr ? 
0 : _writer->get_written_rows()); static_cast(_sender->close(final_status)); } static_cast(state->exec_env()->result_mgr()->cancel_at_time( @@ -148,12 +145,4 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) { return Status::OK(); } -void VResultFileSink::set_query_statistics(std::shared_ptr statistics) { - if (_is_top_sink) { - _sender->set_query_statistics(statistics); - } else { - _stream_sender->set_query_statistics(statistics); - } -} - } // namespace doris::vectorized diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h index b0d05823a5..65bc0492d8 100644 --- a/be/src/vec/sink/vresult_file_sink.h +++ b/be/src/vec/sink/vresult_file_sink.h @@ -34,7 +34,6 @@ namespace doris { class BufferControlBlock; class ObjectPool; -class QueryStatistics; class RuntimeProfile; class RuntimeState; class TDataSink; @@ -54,7 +53,6 @@ public: VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TResultFileSink& sink, const std::vector& destinations, - bool send_query_statistics_with_every_batch, const std::vector& t_output_expr, DescriptorTbl& descs); Status init(const TDataSink& thrift_sink) override; @@ -65,8 +63,6 @@ public: // hosts. Further send() calls are illegal after calling close(). 
Status close(RuntimeState* state, Status exec_status) override; - void set_query_statistics(std::shared_ptr statistics) override; - private: // set file options when sink type is FILE std::unique_ptr _file_opts; diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index 3fa2e03597..59bf82483c 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -41,7 +41,6 @@ #include "vec/sink/writer/vfile_result_writer.h" namespace doris { -class QueryStatistics; class RowDescriptor; class TExpr; @@ -169,9 +168,8 @@ Status VResultSink::close(RuntimeState* state, Status exec_status) { // close sender, this is normal path end if (_sender) { if (_writer) { - _sender->update_num_written_rows(_writer->get_written_rows()); + _sender->update_return_rows(_writer->get_written_rows()); } - _sender->update_max_peak_memory_bytes(); static_cast(_sender->close(final_status)); } static_cast(state->exec_env()->result_mgr()->cancel_at_time( @@ -180,9 +178,5 @@ Status VResultSink::close(RuntimeState* state, Status exec_status) { return DataSink::close(state, exec_status); } -void VResultSink::set_query_statistics(std::shared_ptr statistics) { - _sender->set_query_statistics(statistics); -} - } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h index 0cde7399c4..0dd69ee84c 100644 --- a/be/src/vec/sink/vresult_sink.h +++ b/be/src/vec/sink/vresult_sink.h @@ -34,7 +34,6 @@ namespace doris { class RuntimeState; class RuntimeProfile; class BufferControlBlock; -class QueryStatistics; class ResultWriter; class RowDescriptor; class TExpr; @@ -138,8 +137,6 @@ public: // hosts. Further send() calls are illegal after calling close(). 
Status close(RuntimeState* state, Status exec_status) override; - void set_query_statistics(std::shared_ptr statistics) override; - private: Status prepare_exprs(RuntimeState* state); Status second_phase_fetch_data(RuntimeState* state, Block* final_block); diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp index 2ac8f8a648..86e6803fd6 100644 --- a/be/test/vec/runtime/vdata_stream_test.cpp +++ b/be/test/vec/runtime/vdata_stream_test.cpp @@ -171,9 +171,8 @@ TEST_F(VDataStreamTest, BasicTest) { int num_senders = 1; RuntimeProfile profile("profile"); bool is_merge = false; - std::shared_ptr statistics = std::make_shared(); auto recv = _instance.create_recvr(&runtime_stat, row_desc, uid, nid, num_senders, &profile, - is_merge, statistics); + is_merge); // Test Sender int sender_id = 1; @@ -194,10 +193,8 @@ TEST_F(VDataStreamTest, BasicTest) { dest.__set_server(addr); dests.push_back(dest); } - bool send_query_statistics_with_every_batch = false; VDataStreamSender sender(&runtime_stat, &_object_pool, sender_id, row_desc, tsink.stream_sink, - dests, send_query_statistics_with_every_batch); - sender.set_query_statistics(std::make_shared()); + dests); static_cast(sender.init(tsink)); static_cast(sender.prepare(&runtime_stat)); static_cast(sender.open(&runtime_stat)); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 248b0f0dbe..af9d9bee4f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2337,6 +2337,16 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int workload_max_action_num_in_policy = 5; // mainly used to limit set session var action + @ConfField(mutable = true) + public static int workload_runtime_status_thread_interval_ms = 2000; + + // NOTE: it should bigger than be 
config report_query_statistics_interval_ms + @ConfField(mutable = true) + public static int query_audit_log_timeout_ms = 5000; + + @ConfField(mutable = true) + public static int be_report_query_statistics_timeout_ms = 60000; + @ConfField(mutable = true, masterOnly = true) public static int workload_group_max_num = 15; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 3b3f5939fd..2f33e12da4 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -230,6 +230,7 @@ import org.apache.doris.qe.QueryCancelWorker; import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.Tag; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; +import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher; import org.apache.doris.scheduler.manager.TransientTaskManager; @@ -491,6 +492,9 @@ public class Env { private WorkloadGroupMgr workloadGroupMgr; private WorkloadSchedPolicyMgr workloadSchedPolicyMgr; + + private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr; + private QueryStats queryStats; private StatisticsCleaner statisticsCleaner; @@ -739,6 +743,7 @@ public class Env { this.globalFunctionMgr = new GlobalFunctionMgr(); this.workloadGroupMgr = new WorkloadGroupMgr(); this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr(); + this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr(); this.queryStats = new QueryStats(); this.loadManagerAdapter = new LoadManagerAdapter(); this.hiveTransactionMgr = new HiveTransactionMgr(); @@ -835,6 +840,10 @@ public class Env { return workloadSchedPolicyMgr; } + public WorkloadRuntimeStatusMgr getWorkloadRuntimeStatusMgr() { + return workloadRuntimeStatusMgr; + } + // use 
this to get correct ClusterInfoService instance public static SystemInfoService getCurrentSystemInfo() { return getCurrentEnv().getClusterInfo(); @@ -1014,6 +1023,7 @@ public class Env { workloadGroupMgr.startUpdateThread(); workloadSchedPolicyMgr.start(); workloadActionPublisherThread.start(); + workloadRuntimeStatusMgr.start(); } // wait until FE is ready. diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index 01a03e8c26..732d33c5e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -100,6 +100,8 @@ public class AuditEvent { @AuditField(value = "FuzzyVariables") public String fuzzyVariables = ""; + public long pushToAuditLogQueueTime; + public static class AuditEventBuilder { private AuditEvent auditEvent = new AuditEvent(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index 4181f66786..83cd1d401f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -119,6 +119,6 @@ public class AuditLogHelper { } } } - Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build()); + Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 8daf40054a..e4d8f8273f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -271,7 +271,8 @@ public abstract class ConnectProcessor { break; } } - auditAfterExec(auditStmt, executor.getParsedStmt(), 
executor.getQueryStatisticsForAuditLog(), true); + auditAfterExec(auditStmt, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), + true); // execute failed, skip remaining stmts if (ctx.getState().getStateType() == MysqlStateType.ERR) { break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index a3ee3c09e4..b6d902b76c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -201,6 +201,15 @@ public final class QeProcessorImpl implements QeProcessor { LOG.debug("params: {}", params); } final TReportExecStatusResult result = new TReportExecStatusResult(); + + if (params.isSetReportWorkloadRuntimeStatus()) { + Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().updateBeQueryStats(params.report_workload_runtime_status); + if (!params.isSetQueryId()) { + result.setStatus(new TStatus(TStatusCode.OK)); + return result; + } + } + final QueryInfo info = coordinatorMap.get(params.query_id); if (info == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java new file mode 100644 index 0000000000..085d844e61 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -0,0 +1,223 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadschedpolicy; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.Daemon; +import org.apache.doris.plugin.AuditEvent; +import org.apache.doris.thrift.TQueryStatistics; +import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class WorkloadRuntimeStatusMgr { + + private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class); + private Map> beToQueryStatsMap = Maps.newConcurrentMap(); + private Map beLastReportTime = Maps.newConcurrentMap(); + private Map queryLastReportTime = Maps.newConcurrentMap(); + private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock(); + private List queryAuditEventList = Lists.newLinkedList(); + + class WorkloadRuntimeStatsThread extends Daemon { + + WorkloadRuntimeStatusMgr workloadStatsMgr; + + public WorkloadRuntimeStatsThread(WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr, String threadName, + int interval) { + super(threadName, interval); + this.workloadStatsMgr = workloadRuntimeStatusMgr; + } + + @Override + protected void runOneCycle() { + // 1 merge be 
query statistics + Map queryStatisticsMap = workloadStatsMgr.getQueryStatisticsMap(); + + // 2 log query audit + List auditEventList = workloadStatsMgr.getQueryNeedAudit(); + for (AuditEvent auditEvent : auditEventList) { + TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId); + if (queryStats != null) { + auditEvent.scanRows = queryStats.scan_rows; + auditEvent.scanBytes = queryStats.scan_bytes; + auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes; + } + Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent); + } + + // 3 clear beToQueryStatsMap when be report timeout + workloadStatsMgr.clearReportTimeoutBeStatistics(); + } + + } + + private Daemon thread = null; + + public void submitFinishQueryToAudit(AuditEvent event) { + queryAuditEventLogWriteLock(); + try { + event.pushToAuditLogQueueTime = System.currentTimeMillis(); + queryAuditEventList.add(event); + } finally { + queryAuditEventLogWriteUnlock(); + } + } + + public List getQueryNeedAudit() { + List ret = new ArrayList<>(); + long currentTime = System.currentTimeMillis(); + queryAuditEventLogWriteLock(); + try { + int queryAuditLogTimeout = Config.query_audit_log_timeout_ms; + Iterator iter = queryAuditEventList.iterator(); + while (iter.hasNext()) { + AuditEvent ae = iter.next(); + if (currentTime - ae.pushToAuditLogQueueTime > queryAuditLogTimeout) { + ret.add(ae); + iter.remove(); + } else { + break; + } + } + } finally { + queryAuditEventLogWriteUnlock(); + } + return ret; + } + + public void start() { + thread = new WorkloadRuntimeStatsThread(this, "workload-runtime-stats-thread", + Config.workload_runtime_status_thread_interval_ms); + thread.start(); + } + + public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) { + if (!params.isSetBackendId()) { + LOG.warn("be report workload runtime status but without beid"); + return; + } + if (!params.isSetQueryStatisticsMap()) { + LOG.warn("be report workload runtime status but without query stats 
map"); + return; + } + long beId = params.backend_id; + Map queryIdMap = beToQueryStatsMap.get(beId); + beLastReportTime.put(beId, System.currentTimeMillis()); + if (queryIdMap == null) { + queryIdMap = Maps.newConcurrentMap(); + queryIdMap.putAll(params.query_statistics_map); + beToQueryStatsMap.put(beId, queryIdMap); + } else { + long currentTime = System.currentTimeMillis(); + for (Map.Entry entry : params.query_statistics_map.entrySet()) { + queryIdMap.put(entry.getKey(), entry.getValue()); + queryLastReportTime.put(entry.getKey(), currentTime); + } + } + } + + public Map getQueryStatisticsMap() { + // 1 merge query stats in all be + Set beIdSet = beToQueryStatsMap.keySet(); + Map retQueryMap = Maps.newHashMap(); + for (Long beId : beIdSet) { + Map currentQueryMap = beToQueryStatsMap.get(beId); + Set queryIdSet = currentQueryMap.keySet(); + for (String queryId : queryIdSet) { + TQueryStatistics retQuery = retQueryMap.get(queryId); + if (retQuery == null) { + retQuery = new TQueryStatistics(); + retQueryMap.put(queryId, retQuery); + } + + TQueryStatistics curQueryStats = currentQueryMap.get(queryId); + mergeQueryStatistics(retQuery, curQueryStats); + } + } + + return retQueryMap; + } + + private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) { + dst.scan_rows += src.scan_rows; + dst.scan_bytes += src.scan_bytes; + dst.cpu_ms += src.cpu_ms; + if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) { + dst.max_peak_memory_bytes = src.max_peak_memory_bytes; + } + } + + void clearReportTimeoutBeStatistics() { + // 1 clear report timeout be + Set beNeedToRemove = new HashSet<>(); + Set currentBeIdSet = beToQueryStatsMap.keySet(); + Long currentTime = System.currentTimeMillis(); + for (Long beId : currentBeIdSet) { + Long lastReportTime = beLastReportTime.get(beId); + if (lastReportTime != null + && currentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) { + beNeedToRemove.add(beId); + } + } + for (Long beId : 
beNeedToRemove) { + beToQueryStatsMap.remove(beId); + beLastReportTime.remove(beId); + } + + // 2 clear report timeout query + Set queryNeedToClear = new HashSet<>(); + Long newCurrentTime = System.currentTimeMillis(); + Set queryLastReportTimeKeySet = queryLastReportTime.keySet(); + for (String queryId : queryLastReportTimeKeySet) { + Long lastReportTime = queryLastReportTime.get(queryId); + if (lastReportTime != null + && newCurrentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) { + queryNeedToClear.add(queryId); + } + } + + Set beIdSet = beToQueryStatsMap.keySet(); + for (String queryId : queryNeedToClear) { + for (Long beId : beIdSet) { + beToQueryStatsMap.get(beId).remove(queryId); + } + queryLastReportTime.remove(queryId); + } + } + + private void queryAuditEventLogWriteLock() { + queryAuditEventLock.writeLock().lock(); + } + + private void queryAuditEventLogWriteUnlock() { + queryAuditEventLock.writeLock().unlock(); + } +} diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 75e6772211..615f86ca9a 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -406,6 +406,11 @@ struct TQueryStatistics { 5: optional i64 max_peak_memory_bytes } +struct TReportWorkloadRuntimeStatusParams { + 1: optional i64 backend_id + 2: map query_statistics_map +} + // The results of an INSERT query, sent to the coordinator as part of // TReportExecStatusParams struct TReportExecStatusParams { @@ -470,6 +475,8 @@ struct TReportExecStatusParams { 23: optional list detailed_report 24: optional TQueryStatistics query_statistics + + 25: TReportWorkloadRuntimeStatusParams report_workload_runtime_status } struct TFeResult {