From 1609b6cbf2cecbabdcafd98fb11a1ed6f7c60fac Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 22 Aug 2023 19:36:50 +0800 Subject: [PATCH] [pipelineX](sort) Support sort operator (#23322) --- .../exec/aggregation_sink_operator.cpp | 17 +- .../pipeline/exec/aggregation_sink_operator.h | 9 +- .../exec/aggregation_source_operator.cpp | 2 +- .../pipeline/exec/exchange_sink_operator.cpp | 35 ++-- be/src/pipeline/exec/exchange_sink_operator.h | 13 +- .../exec/exchange_source_operator.cpp | 7 +- .../pipeline/exec/exchange_source_operator.h | 2 +- be/src/pipeline/exec/operator.cpp | 10 ++ be/src/pipeline/exec/operator.h | 11 +- be/src/pipeline/exec/result_sink_operator.cpp | 10 +- be/src/pipeline/exec/result_sink_operator.h | 9 +- be/src/pipeline/exec/sort_sink_operator.cpp | 154 ++++++++++++++++++ be/src/pipeline/exec/sort_sink_operator.h | 74 +++++++++ be/src/pipeline/exec/sort_source_operator.cpp | 48 ++++++ be/src/pipeline/exec/sort_source_operator.h | 38 ++++- be/src/pipeline/pipeline_x/dependency.h | 29 +++- .../pipeline_x_fragment_context.cpp | 24 ++- .../pipeline_x/pipeline_x_fragment_context.h | 2 - .../pipeline/pipeline_x/pipeline_x_task.cpp | 15 +- be/src/pipeline/pipeline_x/pipeline_x_task.h | 9 +- be/src/runtime/runtime_state.cpp | 4 + be/src/runtime/runtime_state.h | 3 + 22 files changed, 452 insertions(+), 73 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index a856247315..6db0162325 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -59,8 +59,8 @@ AggSinkLocalState::AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* st _hash_table_size_counter(nullptr), _max_row_size_counter(nullptr) {} -Status AggSinkLocalState::init(RuntimeState* state, Dependency* dependency) { - _dependency = (AggDependency*)dependency; +Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + _dependency = (AggDependency*)info.dependency; _shared_state = (AggSharedState*)_dependency->shared_state(); _agg_data = _shared_state->agg_data.get(); _agg_arena_pool = _shared_state->agg_arena_pool.get(); @@ -79,8 +79,7 @@ Status AggSinkLocalState::init(RuntimeState* state, Dependency* dependency) { for (size_t i = 0; i < _shared_state->probe_expr_ctxs.size(); i++) { RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _shared_state->probe_expr_ctxs[i])); } - std::string title = fmt::format("AggSinkLocalState"); - _profile = p._pool->add(new RuntimeProfile(title)); + _profile = p._pool->add(new RuntimeProfile("AggSinkLocalState")); _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage"); _hash_table_memory_usage = ADD_CHILD_COUNTER(profile(), "HashTable", TUnit::BYTES, "MemoryUsage"); @@ -712,9 +711,9 @@ Status AggSinkLocalState::try_spill_disk(bool eos) { _agg_data->method_variant); } -AggSinkOperatorX::AggSinkOperatorX(const int id, ObjectPool* pool, const TPlanNode& tnode, +AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : DataSinkOperatorX(id), + : DataSinkOperatorX(tnode.node_id), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(nullptr), _output_tuple_id(tnode.agg_node.output_tuple_id), @@ -825,7 +824,7 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) { return Status::OK(); } -Status AggSinkOperatorX::open(doris::RuntimeState* state) { +Status AggSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state)); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { @@ -855,10 +854,10 @@ Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_ return Status::OK(); } -Status AggSinkOperatorX::setup_local_state(RuntimeState* state, Dependency* dependency) { +Status AggSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) { auto local_state = AggSinkLocalState::create_shared(this, state); state->emplace_sink_local_state(id(), local_state); - return local_state->init(state, dependency); + return local_state->init(state, info); } Status AggSinkOperatorX::close(RuntimeState* state) { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 7e70004094..7ca62b3cab 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -52,7 +52,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState { public: AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state); - Status init(RuntimeState* state, Dependency* dependency) override; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status try_spill_disk(bool eos = false); @@ -312,8 +312,7 @@ private: class AggSinkOperatorX final : public DataSinkOperatorX { public: - AggSinkOperatorX(const int id, ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs); + AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", _name); } @@ -322,7 +321,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status setup_local_state(RuntimeState* state, Dependency* dependency) override; + Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; @@ -331,7 +330,7 @@ public: bool can_write(RuntimeState* state) override { return true; } void get_dependency(DependencySPtr& dependency) override { - dependency.reset(new AggDependency()); + dependency.reset(new AggDependency(id())); } private: diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index b8436e8735..5dee84b216 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -38,7 +38,7 @@ AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent) Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); _dependency = (AggDependency*)info.dependency; - _shared_state = ((AggSharedState*)_dependency->shared_state()); + _shared_state = (AggSharedState*)_dependency->shared_state(); _agg_data = _shared_state->agg_data.get(); _get_results_timer = ADD_TIMER(profile(), "GetResultsTime"); _serialize_result_timer = ADD_TIMER(profile(), "SerializeResultTime"); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index ec52ed5422..f58b9406bf 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -101,7 +101,8 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const { return _parent->cast()._transfer_large_data_by_brpc; } -Status ExchangeSinkLocalState::init(RuntimeState* state, Dependency* dependency) { +Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + _sender_id = info.sender_id; _broadcast_pb_blocks.resize(config::num_broadcast_buffer); _broadcast_pb_block_idx = 0; auto& p = _parent->cast(); @@ -187,10 +188,10 @@ segment_v2::CompressionTypePB& ExchangeSinkLocalState::compression_type() { } ExchangeSinkOperatorX::ExchangeSinkOperatorX( - const int id, RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc, + RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc, const TDataStreamSink& sink, const std::vector& destinations, bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context) - : DataSinkOperatorX(id), + : DataSinkOperatorX(sink.dest_node_id), _context(context), _pool(pool), _row_desc(row_desc), @@ -209,10 +210,10 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( } ExchangeSinkOperatorX::ExchangeSinkOperatorX( - const int id, ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId dest_node_id, + ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId dest_node_id, const std::vector& destinations, bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context) - : DataSinkOperatorX(id), + : DataSinkOperatorX(dest_node_id), _context(context), _pool(pool), _row_desc(row_desc), @@ -224,11 +225,10 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( _name = "ExchangeSinkOperatorX"; } -ExchangeSinkOperatorX::ExchangeSinkOperatorX(const int id, ObjectPool* pool, - const RowDescriptor& row_desc, +ExchangeSinkOperatorX::ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc, bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context) - : DataSinkOperatorX(id), + : DataSinkOperatorX(0), _context(context), _pool(pool), _row_desc(row_desc), @@ -253,10 +253,10 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { return Status::OK(); } -Status ExchangeSinkOperatorX::setup_local_state(RuntimeState* state, Dependency* dependency) { +Status ExchangeSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) { auto local_state = ExchangeSinkLocalState::create_shared(this, state); state->emplace_sink_local_state(id(), local_state); - return local_state->init(state, dependency); + return local_state->init(state, info); } Status ExchangeSinkOperatorX::prepare(RuntimeState* state) { @@ -523,8 +523,18 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch Status status; for (int i = 0; i < num_channels; ++i) { if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) { - status = channels[i]->add_rows(block, channel2rows[i], eos); + status = channels[i]->add_rows(block, channel2rows[i], false); HANDLE_CHANNEL_STATUS(state, channels[i], status); + channel2rows[i].clear(); + } + } + + if (eos) { + for (int i = 0; i < num_channels; ++i) { + if (!channels[i]->is_receiver_eof()) { + status = channels[i]->add_rows(block, channel2rows[i], true); + HANDLE_CHANNEL_STATUS(state, channels[i], status); + } } } @@ -533,8 +543,7 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch Status ExchangeSinkOperatorX::try_close(RuntimeState* state) { auto& local_state = state->get_sink_local_state(id())->cast(); - DCHECK(local_state._serializer.get_block() == nullptr || - local_state._serializer.get_block()->rows() == 0); + local_state._serializer.reset_block(); Status final_st = Status::OK(); for (int i = 0; i < local_state.channels.size(); ++i) { Status st = local_state.channels[i]->close(state); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index b6d92d5829..6e97b4a2fe 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -81,7 +81,7 @@ public: only_local_exchange(false), _serializer(this) {} - Status init(RuntimeState* state, Dependency* dependency) override; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1); void register_channels(pipeline::ExchangeSinkBuffer* buffer); @@ -155,17 +155,16 @@ private: class ExchangeSinkOperatorX final : public DataSinkOperatorX { public: - ExchangeSinkOperatorX(const int id, RuntimeState* state, ObjectPool* pool, - const RowDescriptor& row_desc, const TDataStreamSink& sink, + ExchangeSinkOperatorX(RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc, + const TDataStreamSink& sink, const std::vector& destinations, bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context); - ExchangeSinkOperatorX(const int id, ObjectPool* pool, const RowDescriptor& row_desc, - PlanNodeId dest_node_id, + ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc, PlanNodeId dest_node_id, const std::vector& destinations, bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context); - ExchangeSinkOperatorX(const int id, ObjectPool* pool, const RowDescriptor& row_desc, + ExchangeSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc, bool send_query_statistics_with_every_batch, PipelineXFragmentContext* context); Status init(const TDataSink& tsink) override; @@ -174,7 +173,7 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status setup_local_state(RuntimeState* state, Dependency* dependency) override; + Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 12e8b0c79a..78e8a70bdd 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -41,7 +41,7 @@ bool ExchangeSourceOperator::is_pending_finish() const { } ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* parent) - : PipelineXLocalState(state, parent), num_rows_skipped(0) {} + : PipelineXLocalState(state, parent), num_rows_skipped(0), is_ready(false) {} Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { if (_init) { @@ -65,7 +65,6 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo : OperatorXBase(pool, tnode, descs, op_name), _num_senders(num_senders), _is_merging(tnode.exchange_node.__isset.sort_info), - _is_ready(false), _input_row_desc(descs, tnode.exchange_node.input_row_tuples, std::vector(tnode.nullable_tuples.begin(), tnode.nullable_tuples.begin() + @@ -108,11 +107,11 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block SourceState& source_state) { auto& local_state = state->get_local_state(id())->cast(); SCOPED_TIMER(local_state.profile()->total_time_counter()); - if (_is_merging && !_is_ready) { + if (_is_merging && !local_state.is_ready) { RETURN_IF_ERROR(local_state.stream_recvr->create_merger( local_state.vsort_exec_exprs.lhs_ordering_expr_ctxs(), _is_asc_order, _nulls_first, state->batch_size(), _limit, _offset)); - _is_ready = true; + local_state.is_ready = true; return Status::OK(); } bool eos = false; diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 7df08639fd..75821c0671 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -59,6 +59,7 @@ class ExchangeLocalState : public PipelineXLocalState { std::shared_ptr stream_recvr; doris::vectorized::VSortExecExprs vsort_exec_exprs; int64_t num_rows_skipped; + bool is_ready; }; class ExchangeSourceOperatorX final : public OperatorXBase { @@ -83,7 +84,6 @@ private: friend class ExchangeLocalState; const int _num_senders; const bool _is_merging; - bool _is_ready; RowDescriptor _input_row_desc; std::shared_ptr _sub_plan_query_statistics_recvr; diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 0ea73dea54..b240521913 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -224,6 +224,16 @@ void PipelineXLocalState::reached_limit(vectorized::Block* block, bool* eos) { COUNTER_SET(_rows_returned_counter, _num_rows_returned); } +void PipelineXLocalState::reached_limit(vectorized::Block* block, SourceState& source_state) { + if (_parent->_limit != -1 and _num_rows_returned + block->rows() >= _parent->_limit) { + block->set_num_rows(_parent->_limit - _num_rows_returned); + source_state = SourceState::FINISHED; + } + + _num_rows_returned += block->rows(); + COUNTER_SET(_rows_returned_counter, _num_rows_returned); +} + std::string DataSinkOperatorX::debug_string() const { std::stringstream ss; ss << _name << ", is_closed: " << _is_closed; diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 844fd02ad4..1d59025bf2 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -485,6 +485,12 @@ struct LocalStateInfo { Dependency* dependency; }; +// This struct is used only for initializing local sink state. +struct LocalSinkStateInfo { + const int sender_id; + Dependency* dependency; +}; + class PipelineXLocalState { public: PipelineXLocalState(RuntimeState* state, OperatorXBase* parent) @@ -512,6 +518,7 @@ public: bool reached_limit() const; void reached_limit(vectorized::Block* block, bool* eos); + void reached_limit(vectorized::Block* block, SourceState& source_state); RuntimeProfile* profile() { return _runtime_profile.get(); } MemTracker* mem_tracker() { return _mem_tracker.get(); } @@ -694,7 +701,7 @@ public: : _parent(parent_), _state(state_) {} virtual ~PipelineXSinkLocalState() {} - virtual Status init(RuntimeState* state, Dependency* dependency) { return Status::OK(); } + virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) { return Status::OK(); } template TARGET& cast() { DCHECK(dynamic_cast(this)); @@ -734,7 +741,7 @@ public: virtual Status init(const TDataSink& tsink) override { return Status::OK(); } - virtual Status setup_local_state(RuntimeState* state, Dependency* dependency) = 0; + virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0; template TARGET& cast() { diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 797a510b3d..57a7e674ee 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -50,7 +50,7 @@ bool ResultSinkOperator::can_write() { return _sink->_sender->can_sink(); } -Status ResultSinkLocalState::init(RuntimeState* state, Dependency* dependency) { +Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { auto& p = _parent->cast(); auto fragment_instance_id = state->fragment_instance_id(); auto title = fmt::format("VDataBufferSender (dst_fragment_instance_id={:x}-{:x})", @@ -79,10 +79,10 @@ Status ResultSinkLocalState::init(RuntimeState* state, Dependency* dependency) { return Status::OK(); } -ResultSinkOperatorX::ResultSinkOperatorX(const int id, const RowDescriptor& row_desc, +ResultSinkOperatorX::ResultSinkOperatorX(const RowDescriptor& row_desc, const std::vector& t_output_expr, const TResultSink& sink, int buffer_size) - : DataSinkOperatorX(id), + : DataSinkOperatorX(0), _row_desc(row_desc), _t_output_expr(t_output_expr), _buf_size(buffer_size) { @@ -119,10 +119,10 @@ Status ResultSinkOperatorX::open(RuntimeState* state) { return vectorized::VExpr::open(_output_vexpr_ctxs, state); } -Status ResultSinkOperatorX::setup_local_state(RuntimeState* state, Dependency* dependency) { +Status ResultSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) { auto local_state = ResultSinkLocalState::create_shared(this, state); state->emplace_sink_local_state(id(), local_state); - return local_state->init(state, dependency); + return local_state->init(state, info); } Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index c4b399bb94..3526ce4e3e 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -48,7 +48,7 @@ public: ResultSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state) : PipelineXSinkLocalState(parent, state) {} - Status init(RuntimeState* state, Dependency* dependency) override; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; private: friend class ResultSinkOperatorX; @@ -62,12 +62,11 @@ private: class ResultSinkOperatorX final : public DataSinkOperatorX { public: - ResultSinkOperatorX(const int id, const RowDescriptor& row_desc, - const std::vector& select_exprs, const TResultSink& sink, - int buffer_size); + ResultSinkOperatorX(const RowDescriptor& row_desc, const std::vector& select_exprs, + const TResultSink& sink, int buffer_size); Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Status setup_local_state(RuntimeState* state, Dependency* dependency) override; + Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 5326b5d757..12f0d8a753 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -20,9 +20,163 @@ #include #include "pipeline/exec/operator.h" +#include "runtime/query_context.h" +#include "vec/common/sort/heap_sorter.h" +#include "vec/common/sort/topn_sorter.h" namespace doris::pipeline { OPERATOR_CODE_GENERATOR(SortSinkOperator, StreamingOperator) +Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + auto& p = _parent->cast(); + _dependency = (SortDependency*)info.dependency; + _shared_state = (SortSharedState*)_dependency->shared_state(); + + _profile = p._pool->add(new RuntimeProfile("SortSinkLocalState")); + switch (p._algorithm) { + case SortAlgorithm::HEAP_SORT: { + _shared_state->sorter = vectorized::HeapSorter::create_unique( + _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, + p._child_x->row_desc()); + break; + } + case SortAlgorithm::TOPN_SORT: { + _shared_state->sorter = vectorized::TopNSorter::create_unique( + _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, + p._child_x->row_desc(), state, _profile); + break; + } + case SortAlgorithm::FULL_SORT: { + _shared_state->sorter = vectorized::FullSorter::create_unique( + _vsort_exec_exprs, p._limit, p._offset, p._pool, p._is_asc_order, p._nulls_first, + p._child_x->row_desc(), state, _profile); + break; + } + default: { + return Status::InvalidArgument("Invalid sort algorithm!"); + } + } + + _shared_state->sorter->init_profile(_profile); + + SCOPED_TIMER(_profile->total_time_counter()); + _profile->add_info_string("TOP-N", p._limit == -1 ? "false" : "true"); + + _memory_usage_counter = ADD_LABEL_COUNTER(_profile, "MemoryUsage"); + _sort_blocks_memory_usage = + ADD_CHILD_COUNTER(_profile, "SortBlocks", TUnit::BYTES, "MemoryUsage"); + + _child_get_next_timer = ADD_TIMER(_profile, "ChildGetResultTime"); + _sink_timer = ADD_TIMER(_profile, "PartialSortTotalTime"); + + return p._vsort_exec_exprs.clone(state, _vsort_exec_exprs); +} + +SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : DataSinkOperatorX(tnode.node_id), + _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), + _pool(pool), + _reuse_mem(true), + _limit(tnode.limit), + _use_topn_opt(tnode.sort_node.use_topn_opt), + _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), + _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read) {} + +Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool)); + _is_asc_order = tnode.sort_node.sort_info.is_asc_order; + _nulls_first = tnode.sort_node.sort_info.nulls_first; + + // init runtime predicate + if (_use_topn_opt) { + auto query_ctx = state->get_query_ctx(); + auto first_sort_expr_node = tnode.sort_node.sort_info.ordering_exprs[0].nodes[0]; + if (first_sort_expr_node.node_type == TExprNodeType::SLOT_REF) { + auto first_sort_slot = first_sort_expr_node.slot_ref; + for (auto tuple_desc : _row_descriptor.tuple_descriptors()) { + if (tuple_desc->id() != first_sort_slot.tuple_id) { + continue; + } + for (auto slot : tuple_desc->slots()) { + if (slot->id() == first_sort_slot.slot_id) { + RETURN_IF_ERROR(query_ctx->get_runtime_predicate().init(slot->type().type, + _nulls_first[0])); + break; + } + } + } + } + if (!query_ctx->get_runtime_predicate().inited()) { + return Status::InternalError("runtime predicate is not properly initialized"); + } + } + return Status::OK(); +} + +Status SortSinkOperatorX::prepare(RuntimeState* state) { + const auto& row_desc = _child_x->row_desc(); + + // If `limit` is smaller than HEAP_SORT_THRESHOLD, we consider using heap sort in priority. + // To do heap sorting, each income block will be filtered by heap-top row. There will be some + // `memcpy` operations. To ensure heap sort will not incur performance fallback, we should + // exclude cases which incoming blocks has string column which is sensitive to operations like + // `filter` and `memcpy` + if (_limit > 0 && _limit + _offset < vectorized::HeapSorter::HEAP_SORT_THRESHOLD && + (_use_two_phase_read || _use_topn_opt || !row_desc.has_varlen_slots())) { + _algorithm = SortAlgorithm::HEAP_SORT; + _reuse_mem = false; + } else if (_limit > 0 && row_desc.has_varlen_slots() && + _limit + _offset < vectorized::TopNSorter::TOPN_SORT_THRESHOLD) { + _algorithm = SortAlgorithm::TOPN_SORT; + } else { + _algorithm = SortAlgorithm::FULL_SORT; + } + return _vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor); +} + +Status SortSinkOperatorX::open(RuntimeState* state) { + return _vsort_exec_exprs.open(state); +} + +Status SortSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) { + auto local_state = SortSinkLocalState::create_shared(this, state); + state->emplace_sink_local_state(id(), local_state); + return local_state->init(state, info); +} + +Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) { + auto& local_state = state->get_sink_local_state(id())->cast(); + if (in_block->rows() > 0) { + RETURN_IF_ERROR(local_state._shared_state->sorter->append_block(in_block)); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input.")); + + // update runtime predicate + if (_use_topn_opt) { + vectorized::Field new_top = local_state._shared_state->sorter->get_top_value(); + if (!new_top.is_null() && new_top != local_state.old_top) { + auto& sort_description = local_state._shared_state->sorter->get_sort_description(); + auto col = in_block->get_by_position(sort_description[0].column_number); + bool is_reverse = sort_description[0].direction < 0; + auto query_ctx = state->get_query_ctx(); + RETURN_IF_ERROR( + query_ctx->get_runtime_predicate().update(new_top, col.name, is_reverse)); + local_state.old_top = std::move(new_top); + } + } + if (!_reuse_mem) { + in_block->clear(); + } + } + + if (source_state == SourceState::FINISHED) { + RETURN_IF_ERROR(local_state._shared_state->sorter->prepare_for_read()); + local_state._dependency->set_done(); + } + return Status::OK(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 6ee8f4e170..c3150a87c4 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -20,6 +20,7 @@ #include #include "operator.h" +#include "vec/core/field.h" #include "vec/exec/vsort_node.h" namespace doris { @@ -43,5 +44,78 @@ public: bool can_write() override { return true; } }; +enum class SortAlgorithm { HEAP_SORT, TOPN_SORT, FULL_SORT }; + +class SortSinkOperatorX; + +class SortSinkLocalState : public PipelineXSinkLocalState { + ENABLE_FACTORY_CREATOR(SortSinkLocalState); + +public: + SortSinkLocalState(DataSinkOperatorX* parent, RuntimeState* state) + : PipelineXSinkLocalState(parent, state) {} + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + +private: + friend class SortSinkOperatorX; + SortDependency* _dependency; + SortSharedState* _shared_state; + + // Expressions and parameters used for build _sort_description + vectorized::VSortExecExprs _vsort_exec_exprs; + + RuntimeProfile::Counter* _memory_usage_counter; + RuntimeProfile::Counter* _sort_blocks_memory_usage; + RuntimeProfile::Counter* _child_get_next_timer = nullptr; + RuntimeProfile::Counter* _sink_timer = nullptr; + + // topn top value + vectorized::Field old_top {vectorized::Field::Types::Null}; +}; + +class SortSinkOperatorX final : public DataSinkOperatorX { +public: + SortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + Status init(const TDataSink& tsink) override { + return Status::InternalError("{} should not init with TPlanNode", _name); + } + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; + + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override; + + bool can_write(RuntimeState* state) override { return true; } + + void get_dependency(DependencySPtr& dependency) override { + dependency.reset(new SortDependency(id())); + } + +private: + friend class SortSinkLocalState; + + // Number of rows to skip. + const int64_t _offset; + ObjectPool* _pool; + + // Expressions and parameters used for build _sort_description + vectorized::VSortExecExprs _vsort_exec_exprs; + std::vector _is_asc_order; + std::vector _nulls_first; + + bool _reuse_mem; + const int64_t _limit; + const bool _use_topn_opt; + SortAlgorithm _algorithm; + + const RowDescriptor _row_descriptor; + const bool _use_two_phase_read; +}; + } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 0c2b5f4b6e..52aa51ad09 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -25,4 +25,52 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(SortSourceOperator, SourceOperator) +SortLocalState::SortLocalState(RuntimeState* state, OperatorXBase* parent) + : PipelineXLocalState(state, parent), _get_next_timer(nullptr) {} + +Status SortLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); + _dependency = (SortDependency*)info.dependency; + _shared_state = (SortSharedState*)_dependency->shared_state(); + _get_next_timer = ADD_TIMER(profile(), "GetResultTime"); + return Status::OK(); +} + +SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs, std::string op_name) + : OperatorXBase(pool, tnode, descs, op_name) {} + +Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + auto& local_state = state->get_local_state(id())->cast(); + SCOPED_TIMER(local_state._get_next_timer); + bool eos; + RETURN_IF_ERROR_OR_CATCH_EXCEPTION( + local_state._shared_state->sorter->get_next(state, block, &eos)); + local_state.reached_limit(block, &eos); + if (eos) { + _runtime_profile->add_info_string( + "Spilled", local_state._shared_state->sorter->is_spilled() ? "true" : "false"); + source_state = SourceState::FINISHED; + } + return Status::OK(); +} + +bool SortSourceOperatorX::can_read(RuntimeState* state) { + auto& local_state = state->get_local_state(id())->cast(); + return local_state._dependency->done(); +} + +Status SortSourceOperatorX::setup_local_state(RuntimeState* state, LocalStateInfo& info) { + auto local_state = SortLocalState::create_shared(state, this); + state->emplace_local_state(id(), local_state); + return local_state->init(state, info); +} + +Status SortSourceOperatorX::close(doris::RuntimeState* state) { + auto& local_state = state->get_local_state(id())->cast(); + local_state._shared_state->sorter = nullptr; + return Status::OK(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h index 389b86236f..8a11ce6245 100644 --- a/be/src/pipeline/exec/sort_source_operator.h +++ b/be/src/pipeline/exec/sort_source_operator.h @@ -44,5 +44,41 @@ public: Status open(RuntimeState*) override { return Status::OK(); } }; +class SortSourceOperatorX; +class SortLocalState : public PipelineXLocalState { + ENABLE_FACTORY_CREATOR(SortLocalState); + +public: + SortLocalState(RuntimeState* state, OperatorXBase* parent); + + Status init(RuntimeState* state, LocalStateInfo& info) override; + +private: + friend class SortSourceOperatorX; + + SortDependency* _dependency; + SortSharedState* _shared_state; + + RuntimeProfile::Counter* _get_next_timer = nullptr; +}; + +class SortSourceOperatorX final : public OperatorXBase { +public: + SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, + std::string op_name); + bool can_read(RuntimeState* state) override; + + Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override; + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + Status close(RuntimeState* state) override; + bool is_source() const override { return true; } + +private: + friend class SortLocalState; +}; + } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 9e5876d426..21678a5194 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -17,23 +17,27 @@ #pragma once +#include "vec/common/sort/sorter.h" #include "vec/exec/vaggregation_node.h" -namespace doris::pipeline { +namespace doris { +namespace pipeline { class Dependency; using DependencySPtr = std::shared_ptr; class Dependency { public: - Dependency() : _done(false) {} + Dependency(int id) : _id(id), _done(false) {} virtual ~Dependency() = default; [[nodiscard]] bool done() const { return _done; } void set_done() { _done = true; } virtual void* shared_state() = 0; + [[nodiscard]] int id() const { return _id; } private: + int _id; std::atomic _done; }; @@ -63,7 +67,7 @@ public: class AggDependency final : public Dependency { public: - AggDependency() : Dependency() { + AggDependency(int id) : Dependency(id) { _mem_tracker = std::make_unique("AggregateOperator:"); } ~AggDependency() override = default; @@ -118,4 +122,21 @@ private: MemoryRecord _mem_usage_record; std::unique_ptr _mem_tracker; }; -} // namespace doris::pipeline + +struct SortSharedState { +public: + std::unique_ptr sorter; +}; + +class SortDependency final : public Dependency { +public: + SortDependency(int id) : Dependency(id) {} + ~SortDependency() override = default; + void* shared_state() override { return (void*)&_sort_state; }; + +private: + SortSharedState _sort_state; +}; + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index a182934bd3..7f351f3146 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -49,6 +49,8 @@ #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/result_sink_operator.h" #include "pipeline/exec/scan_operator.h" +#include "pipeline/exec/sort_sink_operator.h" +#include "pipeline/exec/sort_source_operator.h" #include "pipeline/task_scheduler.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -235,8 +237,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData params.__isset.send_query_statistics_with_every_batch ? params.send_query_statistics_with_every_batch : false; - _sink.reset(new ExchangeSinkOperatorX(_sink_idx++, state, pool, row_desc, - thrift_sink.stream_sink, params.destinations, + _sink.reset(new ExchangeSinkOperatorX(state, pool, row_desc, thrift_sink.stream_sink, + params.destinations, send_query_statistics_with_every_batch, this)); break; } @@ -246,8 +248,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData } // TODO: figure out good buffer size based on size of output row - _sink.reset(new ResultSinkOperatorX(_sink_idx++, row_desc, output_exprs, - thrift_sink.result_sink, + _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink, vectorized::RESULT_SINK_BUFFER_SIZE)); break; } @@ -299,7 +300,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( auto task = std::make_unique( _pipelines[pip_id], _total_tasks++, _runtime_states[i].get(), this, - _pipelines[pip_id]->pipeline_profile(), scan_ranges); + _pipelines[pip_id]->pipeline_profile(), scan_ranges, local_params.sender_id); RETURN_IF_ERROR(task->prepare(_runtime_states[i].get())); _runtime_profile->add_child(_pipelines[pip_id]->pipeline_profile(), true, nullptr); if (pip_id < _pipelines.size() - 1) { @@ -471,7 +472,18 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN cur_pipe = add_pipeline(); DataSinkOperatorXPtr sink; - sink.reset(new AggSinkOperatorX(_sink_idx++, pool, tnode, descs)); + sink.reset(new AggSinkOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->set_sink(sink)); + RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + break; + } + case TPlanNodeType::SORT_NODE: { + op.reset(new SortSourceOperatorX(pool, tnode, descs, "SortSourceXOperator")); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + + cur_pipe = add_pipeline(); + DataSinkOperatorXPtr sink; + sink.reset(new SortSinkOperatorX(pool, tnode, descs)); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); break; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 9f179718f1..fd27464491 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -124,8 +124,6 @@ private: // of it in pipeline task not the fragment_context DataSinkOperatorXPtr _sink; - size_t _sink_idx = 0; - std::atomic_bool _canceled = false; }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 497f100b02..d6d24d4e09 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -32,6 +32,7 @@ #include "runtime/descriptors.h" #include "runtime/query_context.h" #include "runtime/thread_context.h" +#include "util/container_util.hpp" #include "util/defer_op.h" #include "util/runtime_profile.h" @@ -44,16 +45,16 @@ namespace doris::pipeline { PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile, - const std::vector& scan_ranges) + const std::vector& scan_ranges, const int sender_id) : PipelineTask(pipeline, index, state, fragment_context, parent_profile), _scan_ranges(scan_ranges), _operators(pipeline->operator_xs()), _source(_operators.front()), _root(_operators.back()), - _sink(pipeline->sink_shared_pointer()) { + _sink(pipeline->sink_shared_pointer()), + _sender_id(sender_id) { _pipeline_task_watcher.start(); _sink->get_dependency(_downstream_dependency); - _upstream_dependency.reset((Dependency*)nullptr); } Status PipelineXTask::prepare(RuntimeState* state) { @@ -93,15 +94,19 @@ Status PipelineXTask::_open() { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_open_timer); - LocalStateInfo info {_scan_ranges, _upstream_dependency.get()}; Status st = Status::OK(); for (auto& o : _operators) { + Dependency* dep = _upstream_dependency.find(o->id()) == _upstream_dependency.end() + ? (Dependency*)nullptr + : _upstream_dependency.find(o->id())->second.get(); + LocalStateInfo info {_scan_ranges, dep}; Status cur_st = o->setup_local_state(_state, info); if (!cur_st.ok()) { st = cur_st; } } - RETURN_IF_ERROR(_sink->setup_local_state(_state, _downstream_dependency.get())); + LocalSinkStateInfo info {_sender_id, _downstream_dependency.get()}; + RETURN_IF_ERROR(_sink->setup_local_state(_state, info)); RETURN_IF_ERROR(st); _opened = true; return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 8bbc4a66ff..74688fdc90 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -50,7 +50,7 @@ class PipelineXTask : public PipelineTask { public: PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile, - const std::vector& scan_ranges); + const std::vector& scan_ranges, const int sender_id); Status prepare(RuntimeState* state) override; @@ -96,10 +96,11 @@ public: DependencySPtr& get_downstream_dependency() { return _downstream_dependency; } void set_upstream_dependency(DependencySPtr& upstream_dependency) { - _upstream_dependency = upstream_dependency; + _upstream_dependency.insert({upstream_dependency->id(), upstream_dependency}); } private: + using DependencyMap = std::map; Status _open() override; const std::vector _scan_ranges; @@ -109,7 +110,9 @@ private: OperatorXPtr _root; DataSinkOperatorXPtr _sink; - DependencySPtr _upstream_dependency; + const int _sender_id; + + DependencyMap _upstream_dependency; DependencySPtr _downstream_dependency; }; } // namespace doris::pipeline diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 1589e6de44..3efce72ea1 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -417,21 +417,25 @@ int64_t RuntimeState::get_load_mem_limit() { void RuntimeState::emplace_local_state( int id, std::shared_ptr state) { + std::unique_lock l(_local_state_lock); _op_id_to_local_state.emplace(id, state); } std::shared_ptr RuntimeState::get_local_state(int id) { + std::unique_lock l(_local_state_lock); DCHECK(_op_id_to_local_state.find(id) != _op_id_to_local_state.end()); return _op_id_to_local_state[id]; } void RuntimeState::emplace_sink_local_state( int id, std::shared_ptr state) { + std::unique_lock l(_local_sink_state_lock); _op_id_to_sink_local_state.emplace(id, state); } std::shared_ptr RuntimeState::get_sink_local_state( int id) { + std::unique_lock l(_local_sink_state_lock); DCHECK(_op_id_to_sink_local_state.find(id) != _op_id_to_sink_local_state.end()); return _op_id_to_sink_local_state[id]; } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index d9e9bae0db..3f1126c4f3 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -547,6 +547,9 @@ private: std::map> _op_id_to_sink_local_state; + std::mutex _local_state_lock; + std::mutex _local_sink_state_lock; + QueryContext* _query_ctx = nullptr; // true if max_filter_ratio is 0