From 49a32c2ee0015b11eb8aa7bcf4fa73ab33035eba Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 25 Aug 2023 17:57:35 +0800 Subject: [PATCH] [pipelineX](fix) fix two phase execution and add test cases (#23353) --- .../exec/aggregation_source_operator.cpp | 3 +-- .../exec/exchange_source_operator.cpp | 6 +---- .../pipeline/exec/exchange_source_operator.h | 10 +++++++ be/src/pipeline/exec/operator.h | 7 +++++ be/src/pipeline/exec/result_sink_operator.cpp | 16 ++++-------- be/src/pipeline/exec/result_sink_operator.h | 5 ++-- be/src/pipeline/pipeline_task.h | 2 ++ .../pipeline_x_fragment_context.cpp | 26 ++++++++++++++++--- .../pipeline/pipeline_x/pipeline_x_task.cpp | 12 ++++++--- be/src/pipeline/pipeline_x/pipeline_x_task.h | 8 +++++- be/src/pipeline/task_scheduler.cpp | 3 ++- .../suites/ssb_sf0.1_p1/sql/flat_q1.1.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q1.2.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q1.3.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q2.1.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q2.2.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q2.3.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q3.1.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q3.2.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q3.3.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q3.4.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q4.1.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q4.2.sql | 2 +- .../suites/ssb_sf0.1_p1/sql/flat_q4.3.sql | 2 +- 24 files changed, 82 insertions(+), 42 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 2f91a633f2..cbee5a832f 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -557,8 +557,7 @@ Status AggSourceOperatorX::setup_local_state(RuntimeState* state, LocalStateInfo } bool AggSourceOperatorX::can_read(RuntimeState* state) { - auto& local_state = state->get_local_state(id())->cast(); - return local_state._dependency->done(); + return state->get_local_state(id())->cast()._dependency->done(); } } // namespace pipeline diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index c593e5ab96..dadf70f0de 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -48,11 +48,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { return Status::OK(); } RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); - auto& parent_ref = _parent->cast(); - stream_recvr = _state->exec_env()->vstream_mgr()->create_recvr( - _state, parent_ref._input_row_desc, _state->fragment_instance_id(), parent_ref._id, - parent_ref._num_senders, profile(), parent_ref._is_merging, - parent_ref._sub_plan_query_statistics_recvr); + stream_recvr = info.recvr; RETURN_IF_ERROR(_parent->cast()._vsort_exec_exprs.clone( state, vsort_exec_exprs)); _init = true; diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index d717970a97..d599d3d06d 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -80,6 +80,16 @@ public: Status close(RuntimeState* state) override; bool is_source() const override { return true; } + bool need_to_create_exch_recv() const override { return true; } + + RowDescriptor input_row_desc() const { return _input_row_desc; } + + int num_senders() const { return _num_senders; } + bool is_merging() const { return _is_merging; } + + std::shared_ptr sub_plan_query_statistics_recvr() { + return _sub_plan_query_statistics_recvr; + } private: friend class ExchangeLocalState; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index dbc323cc67..2fb484afc0 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -36,6 +36,8 @@ #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "vec/core/block.h" +#include "vec/runtime/vdata_stream_recvr.h" +#include "vec/sink/vresult_sink.h" namespace doris { class DataSink; @@ -484,12 +486,14 @@ protected: struct LocalStateInfo { const std::vector scan_ranges; Dependency* dependency; + std::shared_ptr recvr; }; // This struct is used only for initializing local sink state. struct LocalSinkStateInfo { const int sender_id; Dependency* dependency; + std::shared_ptr sender; }; class PipelineXLocalState { @@ -674,6 +678,7 @@ public: } virtual bool is_source() const override { return false; } + [[nodiscard]] virtual bool need_to_create_exch_recv() const { return false; } Status get_next_after_projects(RuntimeState* state, vectorized::Block* block, SourceState& source_state); @@ -768,6 +773,8 @@ public: virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0; + [[nodiscard]] virtual bool need_to_create_result_sender() const { return false; } + template TARGET& cast() { DCHECK(dynamic_cast(this)); diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index df917c26af..ce8a933d7e 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -58,9 +58,7 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) // create profile _profile = state->obj_pool()->add(new RuntimeProfile(title)); // create sender - RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->fragment_instance_id(), - p._buf_size, &_sender, true, - state->execution_timeout())); + _sender = info.sender; _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size()); for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); @@ -81,11 +79,8 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) ResultSinkOperatorX::ResultSinkOperatorX(const RowDescriptor& row_desc, const std::vector& t_output_expr, - const TResultSink& sink, int buffer_size) - : DataSinkOperatorX(0), - _row_desc(row_desc), - _t_output_expr(t_output_expr), - _buf_size(buffer_size) { + const TResultSink& sink) + : DataSinkOperatorX(0), _row_desc(row_desc), _t_output_expr(t_output_expr) { if (!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL) { _sink_type = TResultSinkType::MYSQL_PROTOCAL; } else { @@ -185,7 +180,6 @@ Status ResultSinkLocalState::close(RuntimeState* state) { } bool ResultSinkOperatorX::can_write(RuntimeState* state) { - auto& local_state = state->get_sink_local_state(id())->cast(); - return local_state._sender->can_sink(); + return state->get_sink_local_state(id())->cast()._sender->can_sink(); } -} // namespace doris::pipeline \ No newline at end of file +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index e2e2e517f8..e98bae86e7 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -65,7 +65,7 @@ private: class ResultSinkOperatorX final : public DataSinkOperatorX { public: ResultSinkOperatorX(const RowDescriptor& row_desc, const std::vector& select_exprs, - const TResultSink& sink, int buffer_size); + const TResultSink& sink); Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; @@ -75,6 +75,8 @@ public: bool can_write(RuntimeState* state) override; + [[nodiscard]] bool need_to_create_result_sender() const override { return true; } + private: friend class ResultSinkLocalState; @@ -89,7 +91,6 @@ private: // Owned by the RuntimeState. const std::vector& _t_output_expr; vectorized::VExprContextSPtrs _output_vexpr_ctxs; - int _buf_size; // Allocated from _pool // for fetch data by rowids TFetchOption _fetch_option; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 57d7659197..fc66ca54f1 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -246,6 +246,8 @@ public: } } + TUniqueId instance_id() const { return _state->fragment_instance_id(); } + protected: void _finish_p_dependency() { for (const auto& p : _pipeline->_parents) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 7e97696a62..1a569202fb 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include // IWYU pragma: no_include #include // IWYU pragma: keep @@ -252,8 +253,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData } // TODO: figure out good buffer size based on size of output row - _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink, - vectorized::RESULT_SINK_BUFFER_SIZE)); + _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink)); break; } default: @@ -302,10 +302,30 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, _pipelines[pip_idx]->operator_xs().front()->id(), no_scan_ranges); + std::shared_ptr sender = nullptr; + if (_pipelines[pip_idx]->sink_x()->need_to_create_result_sender()) { + // create sender + RETURN_IF_ERROR(_runtime_states[i]->exec_env()->result_mgr()->create_sender( + _runtime_states[i]->fragment_instance_id(), + vectorized::RESULT_SINK_BUFFER_SIZE, &sender, true, + _runtime_states[i]->execution_timeout())); + } + + std::shared_ptr recvr = nullptr; + if (_pipelines[pip_idx]->operator_xs().front()->need_to_create_exch_recv()) { + auto* src = + (ExchangeSourceOperatorX*)_pipelines[pip_idx]->operator_xs().front().get(); + recvr = _runtime_states[i]->exec_env()->vstream_mgr()->create_recvr( + _runtime_states[i].get(), src->input_row_desc(), + _runtime_states[i]->fragment_instance_id(), src->id(), src->num_senders(), + _runtime_profile.get(), src->is_merging(), + src->sub_plan_query_statistics_recvr()); + } auto task = std::make_unique( _pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this, - _pipelines[pip_idx]->pipeline_profile(), scan_ranges, local_params.sender_id); + _pipelines[pip_idx]->pipeline_profile(), scan_ranges, local_params.sender_id, + sender, recvr); pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()}); RETURN_IF_ERROR(task->prepare(_runtime_states[i].get())); _runtime_profile->add_child(_pipelines[pip_idx]->pipeline_profile(), true, nullptr); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 676414fce5..f55c11982e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -45,14 +45,18 @@ namespace doris::pipeline { PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile, - const std::vector& scan_ranges, const int sender_id) + const std::vector& scan_ranges, const int sender_id, + std::shared_ptr& sender, + std::shared_ptr& recvr) : PipelineTask(pipeline, index, state, fragment_context, parent_profile), _scan_ranges(scan_ranges), _operators(pipeline->operator_xs()), _source(_operators.front()), _root(_operators.back()), _sink(pipeline->sink_shared_pointer()), - _sender_id(sender_id) { + _sender_id(sender_id), + _sender(sender), + _recvr(recvr) { _pipeline_task_watcher.start(); _sink->get_dependency(_downstream_dependency); } @@ -99,13 +103,13 @@ Status PipelineXTask::_open() { Dependency* dep = _upstream_dependency.find(o->id()) == _upstream_dependency.end() ? (Dependency*)nullptr : _upstream_dependency.find(o->id())->second.get(); - LocalStateInfo info {_scan_ranges, dep}; + LocalStateInfo info {_scan_ranges, dep, _recvr}; Status cur_st = o->setup_local_state(_state, info); if (!cur_st.ok()) { st = cur_st; } } - LocalSinkStateInfo info {_sender_id, _downstream_dependency.get()}; + LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(), _sender}; RETURN_IF_ERROR(_sink->setup_local_state(_state, info)); RETURN_IF_ERROR(st); _opened = true; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 1453b10ba2..864709b4ed 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -31,6 +31,7 @@ #include "util/runtime_profile.h" #include "util/stopwatch.hpp" #include "vec/core/block.h" +#include "vec/sink/vresult_sink.h" namespace doris { class QueryContext; @@ -50,7 +51,9 @@ class PipelineXTask : public PipelineTask { public: PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile, - const std::vector& scan_ranges, const int sender_id); + const std::vector& scan_ranges, const int sender_id, + std::shared_ptr& sender, + std::shared_ptr& recvr); Status prepare(RuntimeState* state) override; @@ -127,5 +130,8 @@ private: DependencyMap _upstream_dependency; DependencySPtr _downstream_dependency; + + std::shared_ptr _sender; + std::shared_ptr _recvr; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index b792f0f4c6..0be333479b 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -123,7 +123,8 @@ void BlockedTaskScheduler::_schedule() { } else if (task->query_context()->is_timeout(now)) { LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id()) << ", instance_id=" - << print_id(task->fragment_context()->get_fragment_instance_id()); + << print_id(task->fragment_context()->get_fragment_instance_id()) + << ", task info: " << task->debug_string(); task->fragment_context()->cancel(PPlanFragmentCancelReason::TIMEOUT); _make_task_run(local_blocked_tasks, iter, ready_tasks); diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql index eae02823a0..c3d19b67a2 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. -SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorder_flat WHERE LO_ORDERDATE >= 19930101 diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql index 3a899c9344..6ab6ceea34 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q1.2 -SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorder_flat WHERE LO_ORDERDATE >= 19940101 diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql index 5aaeff83a7..70796c2a95 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q1.3 -SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue FROM lineorder_flat WHERE weekofyear(LO_ORDERDATE) = 6 diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql index 254ea6481a..57f2ada296 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q2.1 -SELECT +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR, P_BRAND FROM lineorder_flat diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql index 6a636f3a9e..9b7a5db502 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q2.2 -SELECT +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR, P_BRAND FROM lineorder_flat diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql index a2ef0c6df3..3a8a5e74d4 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q2.3 -SELECT +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR, P_BRAND FROM lineorder_flat diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql index 8df98222c4..6b3257f1f3 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q3.1 -SELECT +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ C_NATION, S_NATION, (LO_ORDERDATE DIV 10000) AS YEAR, SUM(LO_REVENUE) AS revenue diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql index c588b5bbce..fefe727da8 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q3.2 -SELECT +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ C_CITY, S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR, SUM(LO_REVENUE) AS revenue diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql index 9a099d1732..c4560b701e 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q3.3 -SELECT +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ C_CITY, S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR, SUM(LO_REVENUE) AS revenue diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql index 6bd71b5891..4ae5d956e4 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q3.4 -SELECT +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ C_CITY, S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR, SUM(LO_REVENUE) AS revenue diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql index aedd0e047e..87b29bf160 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q4.1 -SELECT (LO_ORDERDATE DIV 10000) AS YEAR, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE DIV 10000) AS YEAR, C_NATION, SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit FROM lineorder_flat diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql index b9891ee408..8ea28f3f12 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q4.2 -SELECT (LO_ORDERDATE DIV 10000) AS YEAR, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE DIV 10000) AS YEAR, S_NATION, P_CATEGORY, SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql index 6871023137..0f7c7401ab 100644 --- a/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql +++ b/regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql @@ -15,7 +15,7 @@ -- specific language governing permissions and limitations -- under the License. --Q4.3 -SELECT (LO_ORDERDATE DIV 10000) AS YEAR, +SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE DIV 10000) AS YEAR, S_CITY, P_BRAND, SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit