diff --git a/be/src/pipeline/exec/olap_table_sink_operator.cpp b/be/src/pipeline/exec/olap_table_sink_operator.cpp new file mode 100644 index 0000000000..999bf921d5 --- /dev/null +++ b/be/src/pipeline/exec/olap_table_sink_operator.cpp @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap_table_sink_operator.h" + +#include "common/status.h" + +namespace doris { +class DataSink; +} // namespace doris + +namespace doris::pipeline { + +OperatorPtr OlapTableSinkOperatorBuilder::build_operator() { + return std::make_shared(this, _sink); +} + +Status OlapTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(_open_timer); + auto& p = _parent->cast(); + RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit)); + return Status::OK(); +} + +Status OlapTableSinkLocalState::close(RuntimeState* state, Status exec_status) { + if (Base::_closed) { + return Status::OK(); + } + SCOPED_TIMER(_close_timer); + SCOPED_TIMER(profile()->total_time_counter()); + if (_closed) { + return _close_status; + } + _close_status = Base::close(state, exec_status); + return _close_status; +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h index e3bab01faf..244480273e 100644 --- a/be/src/pipeline/exec/olap_table_sink_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_operator.h @@ -18,6 +18,7 @@ #pragma once #include "operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/sink/vtablet_sink.h" namespace doris { @@ -41,9 +42,82 @@ public: bool can_write() override { return true; } // TODO: need use mem_limit }; -OperatorPtr OlapTableSinkOperatorBuilder::build_operator() { - return std::make_shared(this, _sink); -} +class OlapTableSinkOperatorX; + +class OlapTableSinkLocalState final + : public AsyncWriterSink { +public: + using Base = AsyncWriterSink; + using Parent = OlapTableSinkOperatorX; + ENABLE_FACTORY_CREATOR(OlapTableSinkLocalState); + OlapTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {}; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override { + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(_open_timer); + return Base::open(state); + } + + Status close(RuntimeState* state, Status exec_status) override; + friend class OlapTableSinkOperatorX; + +private: + Status _close_status = Status::OK(); +}; +class OlapTableSinkOperatorX final : public DataSinkOperatorX { +public: + using Base = DataSinkOperatorX; + OlapTableSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& t_output_expr, bool group_commit) + : Base(0), + _row_desc(row_desc), + _t_output_expr(t_output_expr), + _group_commit(group_commit), + _pool(pool) {}; + + Status init(const TDataSink& thrift_sink) override { + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); + return Status::OK(); + } + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + return vectorized::VExpr::open(_output_vexpr_ctxs, state); + } + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + return local_state.sink(state, in_block, source_state); + } + + FinishDependency* finish_blocked_by(RuntimeState* state) const override { + auto& local_state = state->get_sink_local_state(id())->cast(); + return local_state._finish_dependency->finish_blocked_by(); + }; + + WriteDependency* wait_for_dependency(RuntimeState* state) override { + CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); + return local_state.write_blocked_by(); + } + +private: + friend class OlapTableSinkLocalState; + template + friend class AsyncWriterSink; + const RowDescriptor& _row_desc; + vectorized::VExprContextSPtrs _output_vexpr_ctxs; + const std::vector& _t_output_expr; + const bool _group_commit; + ObjectPool* _pool; +}; } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 2bc19bf391..fcdbb68535 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -55,7 +55,8 @@ PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, const T : DataSinkOperatorX(tnode.node_id), _pool(pool), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), - _limit(tnode.limit) {} + _limit(tnode.limit), + _topn_phase(tnode.partition_sort_node.ptopn_phase) {} Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); @@ -106,7 +107,9 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc()); } else { //just simply use partition num to check - if (local_state._num_partition > config::partition_topn_partition_threshold && + //if is TWO_PHASE_GLOBAL, must be sort all data thought partition num threshold have been exceeded. + if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL && + local_state._num_partition > config::partition_topn_partition_threshold && local_state.child_input_rows < 10000 * local_state._num_partition) { { std::lock_guard lock(local_state._shared_state->buffer_mutex); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 7dbe616fd6..6c124bf3b1 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -110,6 +110,8 @@ private: int _partition_exprs_num = 0; vectorized::VExprContextSPtrs _partition_expr_ctxs; + TPartTopNPhase::type _topn_phase; + // Expressions and parameters used for build _sort_description vectorized::VSortExecExprs _vsort_exec_exprs; std::vector _is_asc_order; diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index 1382bf716b..a67728de4f 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -53,6 +53,8 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: local_state._shared_state->blocks_buffer.front().swap(*output_block); local_state._shared_state->blocks_buffer.pop(); //if buffer have no data, block reading and wait for signal again + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, + output_block->columns())); if (local_state._shared_state->blocks_buffer.empty()) { local_state._dependency->block_reading(); } @@ -61,7 +63,13 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: } // is_ready_for_read: this is set by sink node using: local_state._dependency->set_ready_for_read() + // notice: must output block from _blocks_buffer firstly, and then get_sorted_block. + // as when the child is eos, then set _can_read = true, and _partition_sorts have push_back sorter. + // if we move the _blocks_buffer output at last(behind 286 line), + // it's maybe eos but not output all data: when _blocks_buffer.empty() and _can_read = false (this: _sort_idx && _partition_sorts.size() are 0) RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state)); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, + output_block->columns())); { std::lock_guard lock(local_state._shared_state->buffer_mutex); if (local_state._shared_state->blocks_buffer.empty() && diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 191dabd66e..85598e6874 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -42,6 +42,7 @@ #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_scan_operator.h" +#include "pipeline/exec/olap_table_sink_operator.h" #include "pipeline/exec/partition_sort_sink_operator.h" #include "pipeline/exec/partition_sort_source_operator.h" #include "pipeline/exec/repeat_operator.h" @@ -588,11 +589,15 @@ Status AsyncWriterSink::close(RuntimeState* state, Status exec_s } COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->write_watcher_elapse_time()); // if the init failed, the _writer may be nullptr. so here need check - if (_writer && _writer->need_normal_close()) { - if (exec_status.ok() && !state->is_cancelled()) { - RETURN_IF_ERROR(_writer->commit_trans()); + if (_writer) { + if (_writer->need_normal_close()) { + if (exec_status.ok() && !state->is_cancelled()) { + RETURN_IF_ERROR(_writer->commit_trans()); + } + RETURN_IF_ERROR(_writer->close(exec_status)); + } else { + RETURN_IF_ERROR(_writer->get_writer_status()); } - RETURN_IF_ERROR(_writer->close(exec_status)); } return PipelineXSinkLocalState<>::close(state, exec_status); } @@ -610,6 +615,7 @@ DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState) DECLARE_OPERATOR_X(ResultSinkLocalState) DECLARE_OPERATOR_X(JdbcTableSinkLocalState) DECLARE_OPERATOR_X(ResultFileSinkLocalState) +DECLARE_OPERATOR_X(OlapTableSinkLocalState) DECLARE_OPERATOR_X(AnalyticSinkLocalState) DECLARE_OPERATOR_X(SortSinkLocalState) DECLARE_OPERATOR_X(BlockingAggSinkLocalState) @@ -685,5 +691,6 @@ template class PipelineXLocalState; template class AsyncWriterSink; template class AsyncWriterSink; +template class AsyncWriterSink; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 1191653637..2c852eecac 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -65,6 +65,7 @@ #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_scan_operator.h" +#include "pipeline/exec/olap_table_sink_operator.h" #include "pipeline/exec/partition_sort_sink_operator.h" #include "pipeline/exec/partition_sort_source_operator.h" #include "pipeline/exec/repeat_operator.h" @@ -264,6 +265,15 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink)); break; } + case TDataSinkType::OLAP_TABLE_SINK: { + if (state->query_options().enable_memtable_on_sink_node) { + return Status::InternalError( + "Unsuported OLAP_TABLE_SINK with enable_memtable_on_sink_node "); + } else { + _sink.reset(new OlapTableSinkOperatorX(pool, row_desc, output_exprs, false)); + } + break; + } case TDataSinkType::JDBC_TABLE_SINK: { if (!thrift_sink.__isset.jdbc_table_sink) { return Status::InternalError("Missing data jdbc sink."); @@ -394,7 +404,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl); _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id); - + _runtime_states[i]->set_num_per_fragment_instances(request.num_senders); std::map pipeline_id_to_task; for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { auto task = std::make_unique(_pipelines[pip_idx], _total_tasks++, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index c6ea704aa7..5247973f57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -323,7 +323,7 @@ public class Coordinator implements CoordInterface { this.enablePipelineEngine = context.getSessionVariable().getEnablePipelineEngine() && (fragments.size() > 0); this.enablePipelineXEngine = context.getSessionVariable().getEnablePipelineXEngine() - && (fragments.size() > 0 && fragments.get(0).getSink() instanceof ResultSink); + && (fragments.size() > 0); initQueryOptions(context); diff --git a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out index f1fb0d3351..9a2ff768a1 100644 --- a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out +++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_mysql.out @@ -1152,14 +1152,14 @@ abc \N -- !sql14 -- \N 342 0 136 -16 1 -17 1 -28 1 +1 1 +11 1 +13 1 +14 1 +2 1 4 1 -52 1 -58 1 -61 1 -89 1 +7 1 +8 1 -- !sql15 -- 1025 1 diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy index b30c5bc103..32c5051252 100644 --- a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy +++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_mysql.groovy @@ -733,7 +733,7 @@ suite("test_jdbc_query_mysql", "p0,external,mysql,external_docker,external_docke order_qt_sql13 """ SELECT k2, sum(CAST(NULL AS BIGINT)) FROM $jdbcMysql57Table1 GROUP BY k2 """ order_qt_sql14 """ SELECT `key`, COUNT(*) as c FROM ( SELECT CASE WHEN k8 % 3 = 0 THEN NULL WHEN k8 % 5 = 0 THEN 0 ELSE k8 END AS `key` - FROM $jdbcMysql57Table1) as a GROUP BY `key` order by c desc limit 10""" + FROM $jdbcMysql57Table1) as a GROUP BY `key` order by c desc , `key` asc limit 10""" order_qt_sql15 """ SELECT lines, COUNT(*) as c FROM (SELECT k7, COUNT(*) lines FROM $jdbcMysql57Table1 GROUP BY k7) U GROUP BY lines order by c""" order_qt_sql16 """ SELECT COUNT(DISTINCT k8 + 1) FROM $jdbcMysql57Table1 """ order_qt_sql17 """ SELECT COUNT(*) FROM (SELECT DISTINCT k8 + 1 FROM $jdbcMysql57Table1) t """ diff --git a/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy b/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy index bc9db5add0..189db974fb 100644 --- a/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy +++ b/regression-test/suites/load_p0/insert/test_insert_move_memtable.groovy @@ -17,6 +17,7 @@ suite("test_insert_move_memtable") { sql """ set enable_memtable_on_sink_node=true """ + sql """ set experimental_enable_pipeline_x_engine=false """ // todo: test insert, such as insert values, insert select, insert txn sql "show load" def test_baseall = "test_query_db.baseall";