From 06f71f2bca7f8f38e0eb58a43e661c8374e1de9f Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 23 Dec 2022 22:17:50 +0800 Subject: [PATCH] [pipeline](fix) Fix bugs to pass all regression cases (#15306) * [pipeline](fix) Fix bugs to pass all regression cases * update * update --- be/src/pipeline/CMakeLists.txt | 1 + be/src/pipeline/exec/operator.h | 2 + .../exec/result_file_sink_operator.cpp | 34 ++++++++++ .../pipeline/exec/result_file_sink_operator.h | 45 +++++++++++++ be/src/pipeline/pipeline_fragment_context.cpp | 67 ++++++++++++------- be/src/pipeline/pipeline_fragment_context.h | 4 +- be/src/pipeline/pipeline_task.cpp | 2 + be/src/pipeline/task_scheduler.cpp | 1 + be/src/runtime/fragment_mgr.cpp | 6 +- be/src/vec/exec/join/vhash_join_node.cpp | 19 +++++- be/src/vec/exec/join/vhash_join_node.h | 4 +- .../vec/exec/join/vnested_loop_join_node.cpp | 3 +- be/src/vec/exec/vrepeat_node.cpp | 2 +- be/src/vec/exec/vrepeat_node.h | 5 +- be/src/vec/exec/vtable_function_node.h | 3 +- .../string_functions/test_split_part.groovy | 2 +- 16 files changed, 162 insertions(+), 38 deletions(-) create mode 100644 be/src/pipeline/exec/result_file_sink_operator.cpp create mode 100644 be/src/pipeline/exec/result_file_sink_operator.h diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt index ee49218de3..c7c5bf87af 100644 --- a/be/src/pipeline/CMakeLists.txt +++ b/be/src/pipeline/CMakeLists.txt @@ -37,6 +37,7 @@ set(PIPELINE_FILES exec/exchange_sink_operator.cpp exec/exchange_sink_buffer.cpp exec/result_sink_operator.cpp + exec/result_file_sink_operator.cpp exec/aggregation_sink_operator.cpp exec/aggregation_source_operator.cpp exec/hashjoin_build_sink.cpp diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 58c36b3a51..a5df8edd79 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -449,6 +449,8 @@ public: source_state = SourceState::FINISHED; } else if (!node->need_more_input_data()) { source_state = SourceState::MORE_DATA; + } else if (source_state == SourceState::MORE_DATA) { + source_state = _child_source_state; } } return Status::OK(); diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp new file mode 100644 index 0000000000..72f621ec8b --- /dev/null +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "result_file_sink_operator.h" + +#include "vec/sink/vresult_file_sink.h" + +namespace doris::pipeline { + +ResultFileSinkOperatorBuilder::ResultFileSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "ResultSinkOperator", sink) {}; + +OperatorPtr ResultFileSinkOperatorBuilder::build_operator() { + return std::make_shared(this, _sink); +} + +ResultFileSinkOperator::ResultFileSinkOperator(OperatorBuilderBase* operator_builder, + DataSink* sink) + : DataSinkOperator(operator_builder, sink) {}; +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h new file mode 100644 index 0000000000..5f21b108a7 --- /dev/null +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "operator.h" + +namespace doris { +namespace vectorized { +class VResultFileSink; +} + +namespace pipeline { + +class ResultFileSinkOperatorBuilder final + : public DataSinkOperatorBuilder { +public: + ResultFileSinkOperatorBuilder(int32_t id, DataSink* sink); + + OperatorPtr build_operator() override; +}; + +class ResultFileSinkOperator final : public DataSinkOperator { +public: + ResultFileSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink); + + bool can_write() override { return true; } +}; + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 5b9918885a..4cfeb036ae 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -20,43 +20,44 @@ #include #include -#include "exec/aggregation_sink_operator.h" -#include "exec/aggregation_source_operator.h" -#include "exec/analytic_sink_operator.h" -#include "exec/analytic_source_operator.h" #include "exec/data_sink.h" -#include "exec/datagen_operator.h" -#include "exec/empty_set_operator.h" -#include "exec/exchange_sink_operator.h" -#include "exec/exchange_source_operator.h" -#include "exec/hashjoin_build_sink.h" -#include "exec/hashjoin_probe_operator.h" -#include "exec/mysql_scan_operator.h" -#include "exec/repeat_operator.h" -#include "exec/result_sink_operator.h" #include "exec/scan_node.h" -#include "exec/scan_operator.h" -#include "exec/schema_scan_operator.h" -#include "exec/select_operator.h" -#include "exec/set_probe_sink_operator.h" -#include "exec/set_sink_operator.h" -#include "exec/set_source_operator.h" -#include "exec/sort_sink_operator.h" -#include "exec/sort_source_operator.h" -#include "exec/streaming_aggregation_sink_operator.h" -#include "exec/streaming_aggregation_source_operator.h" -#include "exec/table_sink_operator.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/HeartbeatService_types.h" +#include "pipeline/exec/aggregation_sink_operator.h" +#include "pipeline/exec/aggregation_source_operator.h" +#include "pipeline/exec/analytic_sink_operator.h" +#include "pipeline/exec/analytic_source_operator.h" #include "pipeline/exec/assert_num_rows_operator.h" #include "pipeline/exec/broker_scan_operator.h" #include "pipeline/exec/const_value_operator.h" #include "pipeline/exec/data_queue.h" +#include "pipeline/exec/datagen_operator.h" +#include "pipeline/exec/empty_set_operator.h" +#include "pipeline/exec/exchange_sink_operator.h" +#include "pipeline/exec/exchange_source_operator.h" +#include "pipeline/exec/hashjoin_build_sink.h" +#include "pipeline/exec/hashjoin_probe_operator.h" +#include "pipeline/exec/mysql_scan_operator.h" #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_table_sink_operator.h" #include "pipeline/exec/operator.h" +#include "pipeline/exec/repeat_operator.h" +#include "pipeline/exec/result_file_sink_operator.h" +#include "pipeline/exec/result_sink_operator.h" +#include "pipeline/exec/scan_operator.h" +#include "pipeline/exec/schema_scan_operator.h" +#include "pipeline/exec/select_operator.h" +#include "pipeline/exec/set_probe_sink_operator.h" +#include "pipeline/exec/set_sink_operator.h" +#include "pipeline/exec/set_source_operator.h" +#include "pipeline/exec/sort_sink_operator.h" +#include "pipeline/exec/sort_source_operator.h" +#include "pipeline/exec/streaming_aggregation_sink_operator.h" +#include "pipeline/exec/streaming_aggregation_source_operator.h" #include "pipeline/exec/table_function_operator.h" +#include "pipeline/exec/table_sink_operator.h" #include "pipeline/exec/union_sink_operator.h" #include "pipeline/exec/union_source_operator.h" #include "pipeline_task.h" @@ -78,6 +79,7 @@ #include "vec/exec/vsort_node.h" #include "vec/exec/vunion_node.h" #include "vec/runtime/vdata_stream_mgr.h" +#include "vec/sink/vresult_file_sink.h" #include "vec/sink/vresult_sink.h" using apache::thrift::transport::TTransportException; @@ -99,6 +101,10 @@ PipelineFragmentContext::PipelineFragmentContext( _fragment_watcher.start(); } +PipelineFragmentContext::~PipelineFragmentContext() { + _call_back(_runtime_state.get(), &_exec_status); +} + void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { if (!_runtime_state->is_cancelled()) { @@ -547,6 +553,14 @@ Status PipelineFragmentContext::submit() { } } +void PipelineFragmentContext::close_if_prepare_failed() { + for (auto& task : _tasks) { + DCHECK(!task->is_pending_finish()); + WARN_IF_ERROR(task->close(), "close_if_prepare_failed failed: "); + close_a_pipeline(); + } +} + // construct sink operator Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) { OperatorBuilderPtr sink_; @@ -572,6 +586,11 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) { sink_ = std::make_shared(next_operator_builder_id(), _sink.get()); break; } + case TDataSinkType::RESULT_FILE_SINK: { + sink_ = std::make_shared(next_operator_builder_id(), + _sink.get()); + break; + } default: return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type); } diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 6f4f30ac91..0b36a3a421 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -41,7 +41,7 @@ public: ExecEnv* exec_env, std::function call_back); - ~PipelineFragmentContext() { _call_back(_runtime_state.get(), &_exec_status); } + ~PipelineFragmentContext(); PipelinePtr add_pipeline(); @@ -58,6 +58,8 @@ public: Status submit(); + void close_if_prepare_failed(); + void set_is_report_success(bool is_report_success) { _is_report_success = is_report_success; } ExecNode*& plan() { return _root_plan; } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index c55ef9e305..f6c2fc4f93 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -226,6 +226,8 @@ std::string PipelineTask::debug_string() const { fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '), _operators[i]->debug_string()); } + fmt::format_to(debug_string_buffer, "\n{}{}", std::string(_operators.size() * 2, ' '), + _sink->debug_string()); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index c5e5ee57c1..3ee4cb82d1 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -249,6 +249,7 @@ void TaskScheduler::_do_work(size_t index) { status.to_string()); // exec failed,cancel all fragment instance fragment_ctx->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, status.to_string()); + fragment_ctx->send_report(true); _try_close_task(task, CANCELED); continue; } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 8801e733ee..0beccbdf9c 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -747,8 +747,12 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi fragments_ctx, _exec_env, cb); { SCOPED_RAW_TIMER(&duration_ns); - RETURN_IF_ERROR(context->prepare(params)); + auto prepare_st = context->prepare(params); g_fragmentmgr_prepare_latency << (duration_ns / 1000); + if (!prepare_st.ok()) { + context->close_if_prepare_failed(); + return prepare_st; + } } std::shared_ptr handler; diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 497f60b828..70bf40cce0 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -448,7 +448,7 @@ Status HashJoinNode::close(RuntimeState* state) { return VJoinNodeBase::close(state); } -bool HashJoinNode::need_more_input_data() { +bool HashJoinNode::need_more_input_data() const { return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows()) && !_probe_eos && !_short_circuit_for_null_in_probe_side; } @@ -812,7 +812,9 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc } _shared_hashtable_controller->signal(id()); } - } else if (!_should_build_hash_table) { + } else if (!_should_build_hash_table && + ((state->enable_pipeline_exec() && eos) || !state->enable_pipeline_exec())) { + // TODO: For pipeline engine, we should finish this pipeline task if _should_build_hash_table is false if (!state->enable_pipeline_exec()) { child(1)->close(state); } @@ -858,12 +860,23 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc } } - if (eos || !_should_build_hash_table) { + if (eos || (!_should_build_hash_table && !state->enable_pipeline_exec())) { _process_hashtable_ctx_variants_init(state); } return Status::OK(); } +void HashJoinNode::debug_string(int indentation_level, std::stringstream* out) const { + *out << string(indentation_level * 2, ' '); + *out << "HashJoin(need_more_input_data=" << (need_more_input_data() ? "true" : "false") + << " _probe_block.rows()=" << _probe_block.rows() << " _probe_index=" << _probe_index + << " _probe_eos=" << _probe_eos + << " _short_circuit_for_null_in_probe_side=" << _short_circuit_for_null_in_probe_side; + *out << ")\n children=("; + ExecNode::debug_string(indentation_level, out); + *out << ")"; +} + template Status HashJoinNode::_extract_join_column(Block& block, ColumnUInt8::MutablePtr& null_map, ColumnRawPtrs& raw_ptrs, diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 76fa064903..9e1f61050a 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -207,11 +207,13 @@ public: Status alloc_resource(RuntimeState* state) override; void release_resource(RuntimeState* state) override; Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; - bool need_more_input_data(); + bool need_more_input_data() const; Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; void prepare_for_next() override; + void debug_string(int indentation_level, std::stringstream* out) const override; + private: using VExprContexts = std::vector; // probe expr diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 7fc43fcf19..5bebcd0d10 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -611,8 +611,9 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) { void VNestedLoopJoinNode::debug_string(int indentation_level, std::stringstream* out) const { *out << std::string(indentation_level * 2, ' '); *out << "VNestedLoopJoinNode"; - *out << "(eos=" << (_matched_rows_done ? "true" : "false") + *out << "(need_more_input_data=" << (need_more_input_data() ? "true" : "false") << " left_block_pos=" << _left_block_pos; + *out << ")\n children="; VJoinNodeBase::debug_string(indentation_level, out); *out << ")"; } diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index 01db7d29ef..882619930e 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -223,7 +223,7 @@ Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, bo return Status::OK(); } -bool VRepeatNode::need_more_input_data() { +bool VRepeatNode::need_more_input_data() const { return !_child_block.rows() && !_child_eos; } diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h index 394690d729..5f038c606e 100644 --- a/be/src/vec/exec/vrepeat_node.h +++ b/be/src/vec/exec/vrepeat_node.h @@ -45,11 +45,10 @@ public: Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; - bool need_more_input_data(); + bool need_more_input_data() const; Block* get_child_block() { return &_child_block; } -protected: - virtual void debug_string(int indentation_level, std::stringstream* out) const override; + void debug_string(int indentation_level, std::stringstream* out) const override; private: Status get_repeated_block(Block* child_block, int repeat_id_idx, Block* output_block); diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index 85c0fca5be..85eccc047a 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -37,8 +37,7 @@ public: return _children[0]->open(state); } Status get_next(RuntimeState* state, Block* block, bool* eos) override; - - bool need_more_input_data() { return !_child_block.rows() && !_child_eos; } + bool need_more_input_data() const { return !_child_block.rows() && !_child_eos; } void release_resource(doris::RuntimeState* state) override { Expr::close(_fn_ctxs, state); diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy b/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy index 4369a1da1d..9b15ffb686 100644 --- a/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy +++ b/regression-test/suites/query_p0/sql_functions/string_functions/test_split_part.groovy @@ -25,6 +25,6 @@ suite("test_split_part") { where split_part("bCKHDX07at", "5.7.37", cast(name as int)) is not null; """ - exception "errCode = 2, detailMessage = [RUNTIME_ERROR]Argument at index 3 for function split_part must be constant" + exception "[RUNTIME_ERROR]Argument at index 3 for function split_part must be constant" } } \ No newline at end of file