diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 363f847a06..14f81d9083 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -98,6 +98,11 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const { Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info)); _sender_id = info.sender_id; + return Status::OK(); +} + +Status ExchangeSinkLocalState::open(RuntimeState* state) { + RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state)); _broadcast_pb_blocks.resize(config::num_broadcast_buffer); _broadcast_pb_block_idx = 0; auto& p = _parent->cast(); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index ef2353dd1c..caeb844175 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -77,6 +77,7 @@ public: _serializer(this) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1); diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 587b109d6b..76cd2ece9d 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -45,7 +45,10 @@ ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* paren Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); - stream_recvr = info.recvr; + auto& p = _parent->cast(); + stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( + state, p.input_row_desc(), state->fragment_instance_id(), p.id(), p.num_senders(), + profile(), p.is_merging(), p.sub_plan_query_statistics_recvr()); RETURN_IF_ERROR(_parent->cast()._vsort_exec_exprs.clone( state, vsort_exec_exprs)); return Status::OK(); diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 9e52fcd0fc..a95cef8140 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -80,7 +80,6 @@ public: Status close(RuntimeState* state) override; bool is_source() const override { return true; } - bool need_to_create_exch_recv() const override { return true; } RowDescriptor input_row_desc() const { return _input_row_desc; } diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index bde0b1574f..c8c02a77fd 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -52,10 +52,18 @@ bool ResultSinkOperator::can_write() { Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info)); - auto& p = _parent->cast(); auto fragment_instance_id = state->fragment_instance_id(); // create sender - _sender = info.sender; + std::shared_ptr sender = nullptr; + RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( + state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true, + state->execution_timeout())); + return Status::OK(); +} + +Status ResultSinkLocalState::open(RuntimeState* state) { + RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state)); + auto& p = _parent->cast(); _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index 19c3b13c0c..a83e3c28a9 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -50,7 +50,7 @@ public: : PipelineXSinkLocalState<>(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; private: @@ -74,8 +74,6 @@ public: bool can_write(RuntimeState* state) override; - [[nodiscard]] bool need_to_create_result_sender() const override { return true; } - private: friend class ResultSinkLocalState; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 0d25fe803c..3b214bce52 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -25,14 +25,12 @@ namespace doris::pipeline { struct LocalStateInfo { const std::vector scan_ranges; Dependency* dependency; - std::shared_ptr recvr; }; // This struct is used only for initializing local sink state. struct LocalSinkStateInfo { const int sender_id; Dependency* dependency; - std::shared_ptr sender; }; class PipelineXLocalStateBase { @@ -232,7 +230,6 @@ public: } [[nodiscard]] virtual bool is_source() const override { return false; } - [[nodiscard]] virtual bool need_to_create_exch_recv() const { return false; } Status get_next_after_projects(RuntimeState* state, vectorized::Block* block, SourceState& source_state); @@ -345,6 +342,8 @@ public: virtual ~PipelineXSinkLocalStateBase() {} virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) = 0; + + virtual Status open(RuntimeState* state) { return Status::OK(); } virtual Status close(RuntimeState* state) = 0; virtual std::string debug_string(int indentation_level) const; @@ -402,8 +401,6 @@ public: virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0; - [[nodiscard]] virtual bool need_to_create_result_sender() const { return false; } - template TARGET& cast() { DCHECK(dynamic_cast(this)) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 62853c0515..8faa2a76b8 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -299,33 +299,9 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( std::map pipeline_id_to_task; for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { - auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, - _pipelines[pip_idx]->operator_xs().front()->id(), - no_scan_ranges); - std::shared_ptr sender = nullptr; - if (_pipelines[pip_idx]->sink_x()->need_to_create_result_sender()) { - // create sender - RETURN_IF_ERROR(_runtime_states[i]->exec_env()->result_mgr()->create_sender( - _runtime_states[i]->fragment_instance_id(), - vectorized::RESULT_SINK_BUFFER_SIZE, &sender, true, - _runtime_states[i]->execution_timeout())); - } - - std::shared_ptr recvr = nullptr; - if (_pipelines[pip_idx]->operator_xs().front()->need_to_create_exch_recv()) { - auto* src = - (ExchangeSourceOperatorX*)_pipelines[pip_idx]->operator_xs().front().get(); - recvr = _runtime_states[i]->exec_env()->vstream_mgr()->create_recvr( - _runtime_states[i].get(), src->input_row_desc(), - _runtime_states[i]->fragment_instance_id(), src->id(), src->num_senders(), - _runtime_profile.get(), src->is_merging(), - src->sub_plan_query_statistics_recvr()); - } - - auto task = std::make_unique( - _pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this, - _pipelines[pip_idx]->pipeline_profile(), scan_ranges, local_params.sender_id, - sender, recvr); + auto task = std::make_unique(_pipelines[pip_idx], _total_tasks++, + _runtime_states[i].get(), this, + _pipelines[pip_idx]->pipeline_profile()); pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()}); RETURN_IF_ERROR(task->prepare(_runtime_states[i].get())); _runtime_profile->add_child(_pipelines[pip_idx]->pipeline_profile(), true, nullptr); @@ -361,6 +337,19 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( pipeline_id_to_task[dep]->get_downstream_dependency()); } } + + auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, + _pipelines[pip_idx]->operator_xs().front()->id(), + no_scan_ranges); + for (auto& op : _pipelines[pip_idx]->operator_xs()) { + LocalStateInfo info {scan_ranges, task->get_upstream_dependency(op->id())}; + RETURN_IF_ERROR(op->setup_local_state(_runtime_states[i].get(), info)); + } + + LocalSinkStateInfo info {local_params.sender_id, + task->get_downstream_dependency().get()}; + RETURN_IF_ERROR(_pipelines[pip_idx]->sink_x()->setup_local_state( + _runtime_states[i].get(), info)); } { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 370235580f..ae22b95acb 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -44,19 +44,12 @@ namespace doris::pipeline { PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, PipelineFragmentContext* fragment_context, - RuntimeProfile* parent_profile, - const std::vector& scan_ranges, const int sender_id, - std::shared_ptr& sender, - std::shared_ptr& recvr) + RuntimeProfile* parent_profile) : PipelineTask(pipeline, index, state, fragment_context, parent_profile), - _scan_ranges(scan_ranges), _operators(pipeline->operator_xs()), _source(_operators.front()), _root(_operators.back()), - _sink(pipeline->sink_shared_pointer()), - _sender_id(sender_id), - _sender(sender), - _recvr(recvr) { + _sink(pipeline->sink_shared_pointer()) { _pipeline_task_watcher.start(); _sink->get_dependency(_downstream_dependency); } @@ -98,27 +91,11 @@ Status PipelineXTask::_open() { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_open_timer); - if (!_init_local_state) { - Status st = Status::OK(); - for (auto& o : _operators) { - Dependency* dep = _upstream_dependency.find(o->id()) == _upstream_dependency.end() - ? (Dependency*)nullptr - : _upstream_dependency.find(o->id())->second.get(); - LocalStateInfo info {_scan_ranges, dep, _recvr}; - Status cur_st = o->setup_local_state(_state, info); - if (!cur_st.ok()) { - st = cur_st; - } - } - LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(), _sender}; - RETURN_IF_ERROR(_sink->setup_local_state(_state, info)); - _dry_run = _sink->should_dry_run(_state); - RETURN_IF_ERROR(st); - _init_local_state = true; - } + _dry_run = _sink->should_dry_run(_state); for (auto& o : _operators) { RETURN_IF_ERROR(_state->get_local_state(o->id())->open(_state)); } + RETURN_IF_ERROR(_state->get_sink_local_state(_sink->id())->open(_state)); _opened = true; return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index bc90a959b6..c9e1103a58 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -50,10 +50,7 @@ class PriorityTaskQueue; class PipelineXTask : public PipelineTask { public: PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, - PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile, - const std::vector& scan_ranges, const int sender_id, - std::shared_ptr& sender, - std::shared_ptr& recvr); + PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile); Status prepare(RuntimeState* state) override; @@ -110,26 +107,25 @@ public: _upstream_dependency.insert({upstream_dependency->id(), upstream_dependency}); } + Dependency* get_upstream_dependency(int id) { + return _upstream_dependency.find(id) == _upstream_dependency.end() + ? (Dependency*)nullptr + : _upstream_dependency.find(id)->second.get(); + } + private: using DependencyMap = std::map; Status _open() override; - const std::vector _scan_ranges; - OperatorXs _operators; // left is _source, right is _root OperatorXPtr _source; OperatorXPtr _root; DataSinkOperatorXPtr _sink; - const int _sender_id; - DependencyMap _upstream_dependency; DependencySPtr _downstream_dependency; - std::shared_ptr _sender; - std::shared_ptr _recvr; bool _dry_run = false; - bool _init_local_state = false; }; } // namespace doris::pipeline diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql index ec27845894..50b50bc368 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_extendedprice*lo_discount) AS REVENUE FROM lineorder, date WHERE lo_orderdate = d_datekey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql index d4ed16a4a3..77c0262016 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_extendedprice*lo_discount) AS REVENUE FROM lineorder, date WHERE lo_orderdate = d_datekey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql index df41035f99..0052db0aac 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_extendedprice*lo_discount) AS REVENUE FROM lineorder, date WHERE lo_orderdate = d_datekey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql index e7a8529ea7..a47ec82b51 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_revenue), d_year, p_brand FROM lineorder, date, part, supplier WHERE lo_orderdate = d_datekey AND lo_partkey = p_partkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql index 221d0db794..9ab1a95d4d 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_revenue), d_year, p_brand FROM lineorder, date, part, supplier WHERE lo_orderdate = d_datekey AND lo_partkey = p_partkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql index 3fee8a0ade..b7e6bd7840 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(lo_revenue), d_year, p_brand FROM lineorder, date, part, supplier WHERE lo_orderdate = d_datekey AND lo_partkey = p_partkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql index 56b2cd68a4..85c470b708 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ c_nation, s_nation, d_year, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_nation, s_nation, d_year, SUM(lo_revenue) AS REVENUE FROM customer, lineorder, supplier, date WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql index 0d24c46376..cd0b320f87 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ c_city, s_city, d_year, sum(lo_revenue) +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, s_city, d_year, sum(lo_revenue) AS REVENUE FROM customer, lineorder, supplier, date WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql index eb41f98453..89765c02d9 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ c_city, s_city, d_year, SUM(lo_revenue) +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, s_city, d_year, SUM(lo_revenue) AS REVENUE FROM customer, lineorder, supplier, date WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql index 43758bee15..5cef87a3fe 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ c_city, s_city, d_year, SUM(lo_revenue) +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, s_city, d_year, SUM(lo_revenue) AS REVENUE FROM customer, lineorder, supplier, date WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql index 66d98abfa0..3e0227c2ea 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ d_year, c_nation, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, c_nation, SUM(lo_revenue - lo_supplycost) AS PROFIT FROM date, customer, supplier, part, lineorder WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql index 6b9e21d1f2..1338e780ae 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ d_year, s_nation, p_category, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, s_nation, p_category, SUM(lo_revenue - lo_supplycost) AS PROFIT FROM date, customer, supplier, part, lineorder WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql index b70db54312..d8e6f7c42d 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ d_year, s_city, p_brand, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, s_city, p_brand, SUM(lo_revenue - lo_supplycost) AS PROFIT FROM date, customer, supplier, part, lineorder WHERE lo_custkey = c_custkey diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql index fbb302e8c0..ded6754a97 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql @@ -1,5 +1,5 @@ -- tables: lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ l_returnflag, l_linestatus, sum(l_quantity) AS sum_qty, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql index 46cf6d7e13..f102f7504d 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql @@ -1,5 +1,5 @@ -- tables: part,supplier,partsupp,nation,region -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ s_acctbal, s_name, n_name, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql index 770d0cf07e..8bd60f0e07 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql @@ -1,5 +1,5 @@ -- tables: customer,orders,lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ l_orderkey, sum(l_extendedprice * (1 - l_discount)) AS revenue, o_orderdate, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql index f0ef6bd650..3f44094729 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql @@ -1,5 +1,5 @@ -- tables: orders,lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ o_orderpriority, count(*) AS order_count FROM orders diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql index 4e1c7f66c7..ed179f8b86 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql @@ -1,5 +1,5 @@ -- tables: customer,orders,lineitem,supplier,nation,region -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ n_name, sum(l_extendedprice * (1 - l_discount)) AS revenue FROM diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql index 30d6b66b1c..2dd86f8c2c 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql @@ -1,6 +1,6 @@ -- tables: lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ sum(l_extendedprice * l_discount) AS revenue +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ sum(l_extendedprice * l_discount) AS revenue FROM lineitem WHERE diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql index 729c03716b..6453c1094a 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql @@ -1,5 +1,5 @@ -- tables: supplier,lineitem,orders,customer,nation -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ supp_nation, cust_nation, l_year, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql index 044d938555..e4c46fb084 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql @@ -1,5 +1,5 @@ -- tables: part,supplier,lineitem,orders,customer,nation,region -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ o_year, sum(CASE WHEN nation = 'BRAZIL' diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql index ed99a0375b..cee9925fb5 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql @@ -1,5 +1,5 @@ -- tables: part,supplier,lineitem,partsupp,orders,nation -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ nation, o_year, sum(amount) AS sum_profit diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql index 2164023726..c95a80fcee 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql @@ -1,5 +1,5 @@ -- tables: customer,orders,lineitem,nation -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_custkey, c_name, sum(l_extendedprice * (1 - l_discount)) AS revenue, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql index b33bd481cb..b23701e940 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql @@ -1,5 +1,5 @@ -- tables: partsupp,supplier,nation -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ ps_partkey, sum(ps_supplycost * ps_availqty) AS value FROM diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql index 909fecda95..e8893e71e4 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql @@ -1,5 +1,5 @@ -- tables: orders,lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ l_shipmode, sum(CASE WHEN o_orderpriority = '1-URGENT' diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql index b43ecb6418..9db2da60ee 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql @@ -1,5 +1,5 @@ -- tables: customer -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_count, count(*) AS custdist FROM ( diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql index 56be5be01b..70d7a57d07 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql @@ -1,5 +1,5 @@ -- tables: lineitem,part -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ 100.00 * sum(CASE +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 100.00 * sum(CASE WHEN p_type LIKE 'PROMO%' THEN l_extendedprice * (1 - l_discount) ELSE 0 diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql index 6cd05dfab7..45f75ff985 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql @@ -1,4 +1,4 @@ -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ s_suppkey, s_name, s_address, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql index 55da2c2056..37a438c796 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql @@ -1,5 +1,5 @@ -- tables: partsupp,part,supplier -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ p_brand, p_type, p_size, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql index dd52bef885..62f39a750c 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql @@ -1,5 +1,5 @@ -- tables: lineitem,part -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ sum(l_extendedprice) / 7.0 AS avg_yearly +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ sum(l_extendedprice) / 7.0 AS avg_yearly FROM lineitem, part diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql index 4aa46be458..2eb2505c01 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql @@ -1,5 +1,5 @@ -- tables: customer,orders,lineitem -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_name, c_custkey, o_orderkey, diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql index fe049badfe..16e543f87c 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql @@ -1,5 +1,5 @@ -- tables: lineitem,part -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ sum(l_extendedprice * (1 - l_discount)) AS revenue +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ sum(l_extendedprice * (1 - l_discount)) AS revenue FROM lineitem, part diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql index 9dc41c145b..a2aca56790 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql @@ -1,5 +1,5 @@ -- tables: supplier,nation,partsupp,lineitem,part -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ s_name, s_address FROM diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql index 5ac09109d2..7b4874f96c 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql @@ -1,5 +1,5 @@ -- tables: supplier,lineitem,orders,nation -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ s_name, count(*) AS numwait FROM diff --git a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql index c8de2089a7..bf784175e0 100644 --- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql +++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql @@ -1,5 +1,5 @@ -- tables: orders,customer -SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, runtime_filter_mode=OFF) */ +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ cntrycode, count(*) AS numcust, sum(c_acctbal) AS totacctbal