[pipelineX](bug) Add some logs (#27596)

This commit is contained in:
Gabriel
2023-11-28 10:02:13 +08:00
committed by GitHub
parent 5bdfaf6447
commit ea7eca9345
7 changed files with 46 additions and 4 deletions

View File

@ -201,6 +201,7 @@ public:
}
private:
friend class ExchangeSinkLocalState;
void _set_ready_to_finish(bool all_done);
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>

View File

@ -518,6 +518,15 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status)
return final_st;
}
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
PipelineXSinkLocalState<>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, _busy_channels = {})",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load());
return fmt::to_string(debug_string_buffer);
}
Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();

View File

@ -186,6 +186,7 @@ public:
std::string id_name() override;
segment_v2::CompressionTypePB& compression_type();
std::string debug_string(int indentation_level) const override;
std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
std::vector<std::shared_ptr<vectorized::PipChannel<ExchangeSinkLocalState>>>

View File

@ -43,6 +43,30 @@ bool ExchangeSourceOperator::is_pending_finish() const {
ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent), num_rows_skipped(0), is_ready(false) {}
std::string ExchangeLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
PipelineXLocalState<>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Queues: (");
const auto& queues = stream_recvr->sender_queues();
for (size_t i = 0; i < queues.size(); i++) {
fmt::format_to(debug_string_buffer,
"No. {} queue: (_num_remaining_senders = {}, block_queue size = {})", i,
queues[i]->_num_remaining_senders, queues[i]->_block_queue.size());
}
fmt::format_to(debug_string_buffer, ")");
return fmt::to_string(debug_string_buffer);
}
std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
OperatorX<ExchangeLocalState>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Info: (_num_senders = {}, _is_merging = {})",
_num_senders, _is_merging);
return fmt::to_string(debug_string_buffer);
}
Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
SCOPED_TIMER(exec_time_counter());

View File

@ -67,6 +67,7 @@ class ExchangeLocalState final : public PipelineXLocalState<> {
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
Dependency* dependency() override { return source_dependency.get(); }
std::string debug_string(int indentation_level) const override;
std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
doris::vectorized::VSortExecExprs vsort_exec_exprs;
int64_t num_rows_skipped;
@ -89,6 +90,8 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
std::string debug_string(int indentation_level = 0) const override;
Status close(RuntimeState* state) override;
[[nodiscard]] bool is_source() const override { return true; }

View File

@ -342,10 +342,12 @@ std::string PipelineXTask::debug_string() {
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
print_id(_state->fragment_instance_id()));
fmt::format_to(
debug_string_buffer,
"PipelineTask[this = {}, state = {}, data state = {}, dry run = {}]\noperators: ",
(void*)this, get_state_name(_cur_state), (int)_data_state, _dry_run);
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, state = {}, data state = {}, dry run = {}, elapse time "
"= {}ns], block dependency = {}, _use_blocking_queue = {}\noperators: ",
(void*)this, get_state_name(_cur_state), (int)_data_state, _dry_run,
MonotonicNanos() - _fragment_context->create_time(),
_blocked_dep ? _blocked_dep->debug_string() : "NULL", _use_blocking_queue);
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(
debug_string_buffer, "\n{}",

View File

@ -60,6 +60,7 @@ class RuntimeState;
namespace pipeline {
struct ExchangeDataDependency;
class LocalExchangeChannelDependency;
class ExchangeLocalState;
} // namespace pipeline
namespace vectorized {
@ -235,6 +236,7 @@ public:
}
protected:
friend class pipeline::ExchangeLocalState;
Status _inner_get_batch_without_lock(Block* block, bool* eos);
// Not managed by this class