diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 093bf17f46..ff73bb9226 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -510,6 +510,15 @@ Status AsyncWriterSink::sink(RuntimeState* state, vectorized::Bl return _writer->sink(block, eos); } +template + requires(std::is_base_of_v) +std::string AsyncWriterSink::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}, writer : {}", Base::debug_string(indentation_level), + _writer->debug_string()); + return fmt::to_string(debug_string_buffer); +} + template requires(std::is_base_of_v) Status AsyncWriterSink::close(RuntimeState* state, Status exec_status) { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 06ba93a36f..ba3e74cd97 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -770,6 +770,8 @@ public: Status sink(RuntimeState* state, vectorized::Block* block, bool eos); + std::string debug_string(int indentation_level) const override; + std::vector dependencies() const override { return {_async_writer_dependency.get()}; } diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index 7f9700486d..974aa5ee61 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -83,6 +83,14 @@ public: return _writer_status; } + std::string debug_string() const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, + "{}, _eos = {}, _writer_thread_closed = {}, _writer_status = {}", _eos, + _writer_thread_closed, _writer_status.to_string()); + return fmt::to_string(debug_string_buffer); + } + protected: Status _projection_block(Block& input_block, Block* output_block); const VExprContextSPtrs& _vec_output_expr_ctxs; diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index b4400e89f2..02e4ecc9c7 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -274,4 +274,33 @@ suite("test_base_insert_job") { def jobCountRsp = sql """select count(1) from jobs("type"="insert") where name in ('JOB','DO','SCHEDULE','AT','STARTS','ENDS')""" assert jobCountRsp.get(0).get(0) == 6 + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ + sql """ + DROP JOB IF EXISTS where jobname = 'JOB' + """ + sql """ + DROP JOB IF EXISTS where jobname = 'DO' + """ + sql """ + DROP JOB IF EXISTS where jobname = 'AT' + """ + sql """ + DROP JOB IF EXISTS where jobname = 'SCHEDULE' + """ + sql """ + DROP JOB IF EXISTS where jobname = 'STARTS' + """ + sql """ + DROP JOB IF EXISTS where jobname = 'ENDS' + """ + sql """ + DROP JOB IF EXISTS where jobname = '${jobMixedName}' + """ + + sql """ + DROP JOB IF EXISTS where jobname = '${jobName}' + """ + }