From 6b2eed779c9bc3df551e61f5efeb6f663ad38d36 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Wed, 25 Oct 2023 10:00:35 +0800 Subject: [PATCH] [feature](AuditLog) add scanRows scanBytes in auditlog (#25435) --- be/src/exec/exec_node.cpp | 8 +++++ be/src/exec/exec_node.h | 2 ++ be/src/pipeline/exec/exchange_sink_buffer.cpp | 8 +++++ be/src/pipeline/exec/exchange_sink_buffer.h | 3 ++ .../pipeline/exec/exchange_sink_operator.cpp | 1 + be/src/pipeline/exec/operator.h | 21 ++++++++++++ be/src/pipeline/pipeline.h | 10 ++++++ be/src/pipeline/pipeline_fragment_context.cpp | 2 ++ be/src/pipeline/pipeline_task.cpp | 25 +++++++++++++++ be/src/pipeline/pipeline_task.h | 3 ++ be/src/runtime/query_statistics.cpp | 32 ++++++++++--------- be/src/runtime/query_statistics.h | 20 +++++++++--- be/src/vec/exec/scan/new_olap_scan_node.cpp | 9 ++++-- be/src/vec/exec/scan/new_olap_scan_node.h | 1 + be/src/vec/exec/scan/new_olap_scanner.cpp | 3 +- be/src/vec/exec/scan/vscan_node.cpp | 3 +- be/src/vec/exec/scan/vscan_node.h | 1 + be/src/vec/exec/scan/vscanner.cpp | 2 ++ be/src/vec/exec/scan/vscanner.h | 2 ++ be/src/vec/exec/vexchange_node.cpp | 6 +++- be/src/vec/exec/vexchange_node.h | 1 + be/src/vec/runtime/vdata_stream_recvr.cpp | 11 +++++++ be/src/vec/runtime/vdata_stream_recvr.h | 3 ++ be/src/vec/sink/vdata_stream_sender.cpp | 15 +++++++-- be/src/vec/sink/vdata_stream_sender.h | 1 + 25 files changed, 167 insertions(+), 26 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 4c6a734347..ed8b11457a 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -187,6 +187,14 @@ Status ExecNode::collect_query_statistics(QueryStatistics* statistics) { return Status::OK(); } +Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) { + DCHECK(statistics != nullptr); + for (auto child_node : _children) { + RETURN_IF_ERROR(child_node->collect_query_statistics(statistics, sender_id)); + } + return Status::OK(); +} + void ExecNode::release_resource(doris::RuntimeState* state) { if (!_is_resource_released) { if (_rows_returned_counter != nullptr) { diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 8aeecc2b8c..d4d90546ff 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -162,6 +162,8 @@ public: // error. [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics); + [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics, + int sender_id); // close() will get called for every exec node, regardless of what else is called and // the status of these calls (i.e. prepare() may never have been called, or // prepare()/open()/get_next() returned with an error). diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index b3d2222e50..577eb4a462 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -237,6 +237,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { auto& brpc_request = _instance_to_request[id]; brpc_request->set_eos(request.eos); brpc_request->set_packet_seq(_instance_to_seq[id]++); + if (_statistics && _statistics->collected()) { + auto statistic = brpc_request->mutable_query_statistics(); + _statistics->to_pb(statistic); + } if (request.block) { brpc_request->set_allocated_block(request.block.get()); } @@ -297,6 +301,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { if (request.block_holder->get_block()) { brpc_request->set_allocated_block(request.block_holder->get_block()); } + if (_statistics && _statistics->collected()) { + auto statistic = brpc_request->mutable_query_statistics(); + _statistics->to_pb(statistic); + } auto* closure = request.channel->get_closure(id, request.eos, request.block_holder); ExchangeRpcContext rpc_ctx; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index c47d6c6a14..d63dd2a55f 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -33,6 +33,7 @@ #include "common/global_types.h" #include "common/status.h" +#include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include "service/backend_options.h" @@ -188,6 +189,7 @@ public: _queue_dependency = queue_dependency; _finish_dependency = finish_dependency; } + void set_query_statistics(QueryStatistics* statistics) { _statistics = statistics; } private: phmap::flat_hash_map> @@ -237,6 +239,7 @@ private: int _queue_capacity = 0; std::shared_ptr _queue_dependency = nullptr; std::shared_ptr _finish_dependency = nullptr; + QueryStatistics* _statistics = nullptr; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 89679ceff8..047e336a1e 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -66,6 +66,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) { _sink_buffer = std::make_unique>( id, _dest_node_id, _sink->_sender_id, _state->be_number(), state->get_query_ctx()); + _sink_buffer->set_query_statistics(_sink->query_statistics()); RETURN_IF_ERROR(DataSinkOperator::prepare(state)); _sink->registe_channels(_sink_buffer.get()); return Status::OK(); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 125f8fd89e..5362be88e8 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -174,6 +174,14 @@ public: virtual bool is_source() const; + virtual Status collect_query_statistics(QueryStatistics* statistics) { return Status::OK(); }; + + virtual Status collect_query_statistics(QueryStatistics* statistics, int sender_id) { + return Status::OK(); + }; + + virtual void set_query_statistics(std::shared_ptr) {}; + virtual Status init(const TDataSink& tsink) { return Status::OK(); } // Prepare for running. (e.g. resource allocation, etc.) @@ -318,6 +326,9 @@ public: Status finalize(RuntimeState* state) override { return Status::OK(); } [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { return _sink->profile(); } + void set_query_statistics(std::shared_ptr statistics) override { + _sink->set_query_statistics(statistics); + } protected: NodeType* _sink; @@ -388,6 +399,16 @@ public: return _node->runtime_profile(); } + Status collect_query_statistics(QueryStatistics* statistics) override { + RETURN_IF_ERROR(_node->collect_query_statistics(statistics)); + return Status::OK(); + } + + Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override { + RETURN_IF_ERROR(_node->collect_query_statistics(statistics, sender_id)); + return Status::OK(); + } + protected: NodeType* _node; bool _use_projection; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index c67ef6d29e..7dcffb410a 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -117,6 +117,14 @@ public: } [[nodiscard]] PipelineId id() const { return _pipeline_id; } + void set_is_root_pipeline() { _is_root_pipeline = true; } + bool is_root_pipeline() const { return _is_root_pipeline; } + void set_collect_query_statistics_with_every_batch() { + _collect_query_statistics_with_every_batch = true; + } + [[nodiscard]] bool collect_query_statistics_with_every_batch() const { + return _collect_query_statistics_with_every_batch; + } private: void _init_profile(); @@ -168,6 +176,8 @@ private: */ bool _always_can_read = false; bool _always_can_write = false; + bool _is_root_pipeline = false; + bool _collect_query_statistics_with_every_batch = false; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index d7c6aebdc1..226b1f0695 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -320,6 +320,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re } _root_pipeline = fragment_context->add_pipeline(); + _root_pipeline->set_is_root_pipeline(); + _root_pipeline->set_collect_query_statistics_with_every_batch(); RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline)); if (_sink) { RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id, diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index a46914fb60..7e907b318d 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -59,6 +59,10 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* _root(_operators.back()), _sink(sink) { _pipeline_task_watcher.start(); + _query_statistics.reset(new QueryStatistics()); + _sink->set_query_statistics(_query_statistics); + _collect_query_statistics_with_every_batch = + _pipeline->collect_query_statistics_with_every_batch(); } PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, @@ -295,6 +299,10 @@ Status PipelineTask::execute(bool* eos) { if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); + if (_data_state == SourceState::FINISHED || + _collect_query_statistics_with_every_batch) { + RETURN_IF_ERROR(_collect_query_statistics()); + } auto status = _sink->sink(_state, block, _data_state); if (UNLIKELY(!status.ok() || block->rows() == 0)) { if (_fragment_context->is_group_commit()) { @@ -331,6 +339,23 @@ Status PipelineTask::finalize() { return _sink->finalize(_state); } +Status PipelineTask::_collect_query_statistics() { + // The execnode tree of a fragment will be split into multiple pipelines, we only need to collect the root pipeline. + if (_pipeline->is_root_pipeline()) { + // If the current fragment has only one instance, we can collect all of them; + // otherwise, we need to collect them based on the sender_id. + if (_state->num_per_fragment_instances() == 1) { + _query_statistics->clear(); + RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get())); + } else { + _query_statistics->clear(); + RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get(), + _state->per_fragment_instance_idx())); + } + } + return Status::OK(); +} + Status PipelineTask::try_close(Status exec_status) { if (_try_close_flag) { return Status::OK(); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 41f60c2765..99e41421bb 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -368,6 +368,9 @@ protected: int64_t _close_pipeline_time = 0; RuntimeProfile::Counter* _pip_task_total_timer; + std::shared_ptr _query_statistics; + Status _collect_query_statistics(); + bool _collect_query_statistics_with_every_batch = false; private: Operators _operators; // left is _source, right is _root diff --git a/be/src/runtime/query_statistics.cpp b/be/src/runtime/query_statistics.cpp index a6754215ac..22c18faa1e 100644 --- a/be/src/runtime/query_statistics.cpp +++ b/be/src/runtime/query_statistics.cpp @@ -20,6 +20,8 @@ #include #include +#include + namespace doris { void NodeStatistics::merge(const NodeStatistics& other) { @@ -85,6 +87,13 @@ void QueryStatistics::merge(QueryStatisticsRecvr* recvr) { recvr->merge(this); } +void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) { + auto it = recvr->_query_statistics.find(sender_id); + if (it != recvr->_query_statistics.end()) { + merge(*it->second); + } +} + void QueryStatistics::clearNodeStatistics() { for (auto& pair : _nodes_statistics_map) { delete pair.second; @@ -98,24 +107,17 @@ QueryStatistics::~QueryStatistics() { void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) { std::lock_guard l(_lock); - QueryStatistics* query_statistics = nullptr; - auto iter = _query_statistics.find(sender_id); - if (iter == _query_statistics.end()) { - query_statistics = new QueryStatistics; - _query_statistics[sender_id] = query_statistics; - } else { - query_statistics = iter->second; + if (!_query_statistics.contains(sender_id)) { + _query_statistics[sender_id] = std::make_shared(); } - query_statistics->from_pb(statistics); + _query_statistics[sender_id]->from_pb(statistics); } -QueryStatisticsRecvr::~QueryStatisticsRecvr() { - // It is unnecessary to lock here, because the destructor will be - // called alter DataStreamRecvr's close in ExchangeNode. - for (auto& pair : _query_statistics) { - delete pair.second; - } - _query_statistics.clear(); +void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int sender_id) { + if (!statistics->collected()) return; + if (_query_statistics.contains(sender_id)) return; + std::lock_guard l(_lock); + _query_statistics[sender_id] = statistics; } } // namespace doris diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h index c4ace8f23e..42c1457472 100644 --- a/be/src/runtime/query_statistics.h +++ b/be/src/runtime/query_statistics.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -88,6 +89,7 @@ public: void merge(QueryStatisticsRecvr* recvr); + void merge(QueryStatisticsRecvr* recvr, int sender_id); // Get the maximum value from the peak memory collected by all node statistics int64_t calculate_max_peak_memory_bytes(); @@ -100,13 +102,18 @@ public: returned_rows = 0; max_peak_memory_bytes = 0; clearNodeStatistics(); + //clear() is used before collection, so calling "clear" is equivalent to being collected. + set_collected(); } void to_pb(PQueryStatistics* statistics); void from_pb(const PQueryStatistics& statistics); + bool collected() const { return _collected; } + void set_collected() { _collected = true; } private: + friend class QueryStatisticsRecvr; int64_t scan_rows; int64_t scan_bytes; int64_t cpu_ms; @@ -117,17 +124,22 @@ private: // only set once by result sink when closing. int64_t max_peak_memory_bytes; // The statistics of the query on each backend. - typedef std::unordered_map NodeStatisticsMap; + using NodeStatisticsMap = std::unordered_map; NodeStatisticsMap _nodes_statistics_map; + bool _collected = false; }; - +using QueryStatisticsPtr = std::shared_ptr; // It is used for collecting sub plan query statistics in DataStreamRecvr. class QueryStatisticsRecvr { public: - ~QueryStatisticsRecvr(); + ~QueryStatisticsRecvr() = default; + // Transmitted via RPC, incurring serialization overhead. void insert(const PQueryStatistics& statistics, int sender_id); + // using local_exchange for transmission, only need to hold a shared pointer. + void insert(QueryStatisticsPtr statistics, int sender_id); + private: friend class QueryStatistics; @@ -138,7 +150,7 @@ private: } } - std::map _query_statistics; + std::map _query_statistics; SpinLock _lock; }; diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 128fb31039..3bbff6c20a 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -82,13 +82,18 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) { RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); if (!_is_pipeline_scan || _should_create_scanner) { - statistics->add_scan_bytes(_read_compressed_counter->value()); - statistics->add_scan_rows(_raw_rows_counter->value()); + statistics->add_scan_bytes(_byte_read_counter->value()); + statistics->add_scan_rows(_rows_read_counter->value()); statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS); } return Status::OK(); } +Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics, int) { + RETURN_IF_ERROR(collect_query_statistics(statistics)); + return Status::OK(); +} + Status NewOlapScanNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VScanNode::prepare(state)); // if you want to add some profile in scan node, even it have not new VScanner object diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index 0725c37cf5..cbbf307287 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -64,6 +64,7 @@ public: Status prepare(RuntimeState* state) override; Status collect_query_statistics(QueryStatistics* statistics) override; + Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override; void set_scan_ranges(const std::vector& scan_ranges) override; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 26b5ead815..eef7cf8271 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -551,7 +551,8 @@ void NewOlapScanner::_update_realtime_counters() { } void NewOlapScanner::_update_counters_before_close() { - if (!_state->enable_profile() || _has_updated_counter) { + // Please don't directly enable the profile here, we need to set QueryStatistics using the counter inside. + if (_has_updated_counter) { return; } _has_updated_counter = true; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 57dc6b66fd..16ef362b96 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -276,7 +276,8 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* Status VScanNode::_init_profile() { // 1. counters for scan node - _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT); + _rows_read_counter = ADD_COUNTER(_runtime_profile, "ScanRowsRead", TUnit::UNIT); + _byte_read_counter = ADD_COUNTER(_runtime_profile, "ScanByteRead", TUnit::BYTES); _total_throughput_counter = runtime_profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter); _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT); diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 2ecd6ac320..458cb318b7 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -319,6 +319,7 @@ protected: // rows read from the scanner (including those discarded by (pre)filters) RuntimeProfile::Counter* _rows_read_counter; + RuntimeProfile::Counter* _byte_read_counter; // Wall based aggregate read throughput [rows/sec] RuntimeProfile::Counter* _total_throughput_counter; RuntimeProfile::Counter* _num_scanners; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 0a43b3dd3d..3f900d1ca3 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -95,6 +95,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { break; } _num_rows_read += block->rows(); + _num_byte_read += block->allocated_bytes(); } // 2. Filter the output block finally. @@ -191,6 +192,7 @@ void VScanner::_update_counters_before_close() { if (_parent) { COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer); COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read); + COUNTER_UPDATE(_parent->_byte_read_counter, _num_byte_read); } else { COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer); COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read); diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 5359204758..1abd20ba27 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -196,6 +196,8 @@ protected: // num of rows read from scanner int64_t _num_rows_read = 0; + int64_t _num_byte_read = 0; + // num of rows return from scanner, after filter block int64_t _num_rows_return = 0; diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 7eb9858889..4b2e4f76d0 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -150,7 +150,11 @@ Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics) { statistics->merge(_sub_plan_query_statistics_recvr.get()); return Status::OK(); } - +Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) { + RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics)); + statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id); + return Status::OK(); +} Status VExchangeNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h index c4f083dda4..94302e84d9 100644 --- a/be/src/vec/exec/vexchange_node.h +++ b/be/src/vec/exec/vexchange_node.h @@ -56,6 +56,7 @@ public: Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override; void release_resource(RuntimeState* state) override; Status collect_query_statistics(QueryStatistics* statistics) override; + Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override; Status close(RuntimeState* state) override; void set_num_senders(int num_senders) { _num_senders = num_senders; } diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index a6af6c634d..17f3975645 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -458,6 +458,17 @@ void VDataStreamRecvr::remove_sender(int sender_id, int be_number, Status exec_s _sender_queues[use_sender_id]->decrement_senders(be_number); } +void VDataStreamRecvr::remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics, + Status exec_status) { + if (!exec_status.ok()) { + cancel_stream(exec_status); + return; + } + int use_sender_id = _is_merging ? sender_id : 0; + _sender_queues[use_sender_id]->decrement_senders(be_number); + _sub_plan_query_statistics_recvr->insert(statistics, sender_id); +} + void VDataStreamRecvr::cancel_stream(Status exec_status) { VLOG_QUERY << "cancel_stream: fragment_instance_id=" << print_id(_fragment_instance_id) << exec_status; diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index d8ab873f87..b2fd6afdc7 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -108,6 +108,9 @@ public: // sender queue. Called from DataStreamMgr. void remove_sender(int sender_id, int be_number, Status exec_status); + void remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics, + Status exec_status); + void cancel_stream(Status exec_status); void close(); diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 4f9816db0b..bff65bd897 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -140,7 +140,13 @@ Status Channel::send_local_block(Status exec_status, bool eos) { _local_recvr->add_block(&block, _parent->sender_id(), true); if (eos) { - _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); + /// TODO: Supported on pipelineX, we can hold QueryStatistics on the fragment instead of on instances. + if constexpr (std::is_same_v) { + _local_recvr->remove_sender(_parent->sender_id(), _be_number, + _parent->query_statisticsPtr(), exec_status); + } else { + _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); + } } return Status::OK(); } else { @@ -273,7 +279,12 @@ Status Channel::close_internal(Status exec_status) { SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); if (is_local()) { if (_recvr_is_valid()) { - _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); + if constexpr (std::is_same_v) { + _local_recvr->remove_sender(_parent->sender_id(), _be_number, + _parent->query_statisticsPtr(), exec_status); + } else { + _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status); + } } } else { status = send_remote_block((PBlock*)nullptr, true, exec_status); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 203d59dc66..35f20d1b82 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -139,6 +139,7 @@ public: } MemTracker* mem_tracker() { return _mem_tracker.get(); } QueryStatistics* query_statistics() { return _query_statistics.get(); } + QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; } bool transfer_large_data_by_brpc() { return _transfer_large_data_by_brpc; } RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; } segment_v2::CompressionTypePB& compression_type() { return _compression_type; }