From ea7eca9345c4af3f486d0cefac21ebc485beebdb Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 28 Nov 2023 10:02:13 +0800 Subject: [PATCH] [pipelineX](bug) Add some logs (#27596) --- be/src/pipeline/exec/exchange_sink_buffer.h | 1 + .../pipeline/exec/exchange_sink_operator.cpp | 9 +++++++ be/src/pipeline/exec/exchange_sink_operator.h | 1 + .../exec/exchange_source_operator.cpp | 24 +++++++++++++++++++ .../pipeline/exec/exchange_source_operator.h | 3 +++ .../pipeline/pipeline_x/pipeline_x_task.cpp | 10 ++++---- be/src/vec/runtime/vdata_stream_recvr.h | 2 ++ 7 files changed, 46 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index a04b3b29b3..d59872e2a1 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -201,6 +201,7 @@ public: } private: + friend class ExchangeSinkLocalState; void _set_ready_to_finish(bool all_done); phmap::flat_hash_map> diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 71517f377f..1c66ec0220 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -518,6 +518,15 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status) return final_st; } +std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}", + PipelineXSinkLocalState<>::debug_string(indentation_level)); + fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, _busy_channels = {})", + _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load()); + return fmt::to_string(debug_string_buffer); +} + Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 19a326d5c6..751c2768e8 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -186,6 +186,7 @@ public: std::string id_name() override; segment_v2::CompressionTypePB& compression_type(); + std::string debug_string(int indentation_level) const override; std::vector*> channels; std::vector>> diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 3b630cfcfc..1c766f06a8 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -43,6 +43,30 @@ bool ExchangeSourceOperator::is_pending_finish() const { ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState<>(state, parent), num_rows_skipped(0), is_ready(false) {} +std::string ExchangeLocalState::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}", + PipelineXLocalState<>::debug_string(indentation_level)); + fmt::format_to(debug_string_buffer, ", Queues: ("); + const auto& queues = stream_recvr->sender_queues(); + for (size_t i = 0; i < queues.size(); i++) { + fmt::format_to(debug_string_buffer, + "No. {} queue: (_num_remaining_senders = {}, block_queue size = {})", i, + queues[i]->_num_remaining_senders, queues[i]->_block_queue.size()); + } + fmt::format_to(debug_string_buffer, ")"); + return fmt::to_string(debug_string_buffer); +} + +std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}", + OperatorX::debug_string(indentation_level)); + fmt::format_to(debug_string_buffer, ", Info: (_num_senders = {}, _is_merging = {})", + _num_senders, _is_merging); + return fmt::to_string(debug_string_buffer); +} + Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); SCOPED_TIMER(exec_time_counter()); diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 8e745def1b..abac33001b 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -67,6 +67,7 @@ class ExchangeLocalState final : public PipelineXLocalState<> { Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; Dependency* dependency() override { return source_dependency.get(); } + std::string debug_string(int indentation_level) const override; std::shared_ptr stream_recvr; doris::vectorized::VSortExecExprs vsort_exec_exprs; int64_t num_rows_skipped; @@ -89,6 +90,8 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; + std::string debug_string(int indentation_level = 0) const override; + Status close(RuntimeState* state) override; [[nodiscard]] bool is_source() const override { return true; } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index c96f34b213..ed2af3f785 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -342,10 +342,12 @@ std::string PipelineXTask::debug_string() { fmt::format_to(debug_string_buffer, "InstanceId: {}\n", print_id(_state->fragment_instance_id())); - fmt::format_to( - debug_string_buffer, - "PipelineTask[this = {}, state = {}, data state = {}, dry run = {}]\noperators: ", - (void*)this, get_state_name(_cur_state), (int)_data_state, _dry_run); + fmt::format_to(debug_string_buffer, + "PipelineTask[this = {}, state = {}, data state = {}, dry run = {}, elapse time " + "= {}ns], block dependency = {}, _use_blocking_queue = {}\noperators: ", + (void*)this, get_state_name(_cur_state), (int)_data_state, _dry_run, + MonotonicNanos() - _fragment_context->create_time(), + _blocked_dep ? _blocked_dep->debug_string() : "NULL", _use_blocking_queue); for (size_t i = 0; i < _operators.size(); i++) { fmt::format_to( debug_string_buffer, "\n{}", diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index e31b433491..bfdd1dd351 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -60,6 +60,7 @@ class RuntimeState; namespace pipeline { struct ExchangeDataDependency; class LocalExchangeChannelDependency; +class ExchangeLocalState; } // namespace pipeline namespace vectorized { @@ -235,6 +236,7 @@ public: } protected: + friend class pipeline::ExchangeLocalState; Status _inner_get_batch_without_lock(Block* block, bool* eos); // Not managed by this class