[pipelineX](debug) Add debug logs for long-running load task (#32534)
This commit is contained in:
@ -510,6 +510,15 @@ Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, vectorized::Bl
|
||||
return _writer->sink(block, eos);
|
||||
}
|
||||
|
||||
template <typename Writer, typename Parent>
|
||||
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
|
||||
std::string AsyncWriterSink<Writer, Parent>::debug_string(int indentation_level) const {
|
||||
fmt::memory_buffer debug_string_buffer;
|
||||
fmt::format_to(debug_string_buffer, "{}, writer : {}", Base::debug_string(indentation_level),
|
||||
_writer->debug_string());
|
||||
return fmt::to_string(debug_string_buffer);
|
||||
}
|
||||
|
||||
template <typename Writer, typename Parent>
|
||||
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
|
||||
Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_status) {
|
||||
|
||||
@ -770,6 +770,8 @@ public:
|
||||
|
||||
Status sink(RuntimeState* state, vectorized::Block* block, bool eos);
|
||||
|
||||
std::string debug_string(int indentation_level) const override;
|
||||
|
||||
std::vector<Dependency*> dependencies() const override {
|
||||
return {_async_writer_dependency.get()};
|
||||
}
|
||||
|
||||
@ -83,6 +83,14 @@ public:
|
||||
return _writer_status;
|
||||
}
|
||||
|
||||
std::string debug_string() const {
|
||||
fmt::memory_buffer debug_string_buffer;
|
||||
fmt::format_to(debug_string_buffer,
|
||||
"{}, _eos = {}, _writer_thread_closed = {}, _writer_status = {}", _eos,
|
||||
_writer_thread_closed, _writer_status.to_string());
|
||||
return fmt::to_string(debug_string_buffer);
|
||||
}
|
||||
|
||||
protected:
|
||||
Status _projection_block(Block& input_block, Block* output_block);
|
||||
const VExprContextSPtrs& _vec_output_expr_ctxs;
|
||||
|
||||
@ -274,4 +274,33 @@ suite("test_base_insert_job") {
|
||||
def jobCountRsp = sql """select count(1) from jobs("type"="insert") where name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')"""
|
||||
assert jobCountRsp.get(0).get(0) == 6
|
||||
|
||||
sql """
|
||||
DROP JOB IF EXISTS where jobname = '${jobName}'
|
||||
"""
|
||||
sql """
|
||||
DROP JOB IF EXISTS where jobname = 'JOB'
|
||||
"""
|
||||
sql """
|
||||
DROP JOB IF EXISTS where jobname = 'DO'
|
||||
"""
|
||||
sql """
|
||||
DROP JOB IF EXISTS where jobname = 'AT'
|
||||
"""
|
||||
sql """
|
||||
DROP JOB IF EXISTS where jobname = 'SCHEDULE'
|
||||
"""
|
||||
sql """
|
||||
DROP JOB IF EXISTS where jobname = 'STARTS'
|
||||
"""
|
||||
sql """
|
||||
DROP JOB IF EXISTS where jobname = 'ENDS'
|
||||
"""
|
||||
sql """
|
||||
DROP JOB IF EXISTS where jobname = '${jobMixedName}'
|
||||
"""
|
||||
|
||||
sql """
|
||||
DROP JOB IF EXISTS where jobname = '${jobName}'
|
||||
"""
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user