diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 0a484803cb..dcb6b2e497 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -346,8 +346,6 @@ public: virtual Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - virtual bool can_write(RuntimeState* state) override { return true; } - using DataSinkOperatorX::id; protected: diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index a840a72f13..a5741e0a91 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -64,6 +64,7 @@ public: protected: friend class AggSourceOperatorX; friend class StreamingAggSourceOperatorX; + friend class StreamingAggSinkOperatorX; void _close_without_key(); void _close_with_serialized_key(); diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 6d0fc9ced7..9159f6b176 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -122,10 +122,10 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) { return Status::OK(); } -bool AnalyticSinkOperatorX::can_write(RuntimeState* state) { +WriteDependency* AnalyticSinkOperatorX::wait_for_dependency(RuntimeState* state) { return state->get_sink_local_state(id()) ->cast() - ._shared_state->need_more_input; + ._dependency->write_blocked_by(); } Status AnalyticSinkOperatorX::open(RuntimeState* state) { @@ -144,8 +144,8 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); local_state._shared_state->input_eos = source_state == SourceState::FINISHED; if (local_state._shared_state->input_eos && input_block->rows() == 0) { - local_state._shared_state->need_more_input = false; local_state._dependency->set_ready_for_read(); + local_state._dependency->block_writing(); return Status::OK(); } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 14abe5a6d8..41d276205b 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -82,7 +82,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - bool can_write(RuntimeState* state) override; + WriteDependency* wait_for_dependency(RuntimeState* state) override; private: Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr, diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index e070fe2c99..49016736f1 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -30,8 +30,7 @@ namespace doris { namespace pipeline { -DataQueue::DataQueue(int child_count, AggDependency* agg_dependency, - UnionDependency* union_dependency) +DataQueue::DataQueue(int child_count, WriteDependency* dependency) : _queue_blocks_lock(child_count), _queue_blocks(child_count), _free_blocks_lock(child_count), @@ -42,8 +41,7 @@ DataQueue::DataQueue(int child_count, AggDependency* agg_dependency, _cur_bytes_in_queue(child_count), _cur_blocks_nums_in_queue(child_count), _flag_queue_idx(0), - _agg_dependency(agg_dependency), - _union_dependency(union_dependency) { + _dependency(dependency) { for (int i = 0; i < child_count; ++i) { _queue_blocks_lock[i].reset(new std::mutex()); _free_blocks_lock[i].reset(new std::mutex()); @@ -119,9 +117,9 @@ Status DataQueue::get_block_from_queue(std::unique_ptr* outpu } _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes(); _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1; - if (_agg_dependency && _cur_blocks_nums_in_queue[_flag_queue_idx] == 0 && - !_is_finished[0]) { - _agg_dependency->block_reading(); + if (_dependency) { + _dependency->block_reading(); + _dependency->set_ready_for_write(); } } else { if (_is_finished[_flag_queue_idx]) { @@ -141,8 +139,9 @@ void DataQueue::push_block(std::unique_ptr block, int child_i _cur_bytes_in_queue[child_idx] += block->allocated_bytes(); _queue_blocks[child_idx].emplace_back(std::move(block)); _cur_blocks_nums_in_queue[child_idx] += 1; - if (_agg_dependency) { - _agg_dependency->set_ready_for_read(); + if (_dependency) { + _dependency->set_ready_for_read(); + _dependency->block_writing(); } //this only use to record the queue[0] for profile _max_bytes_in_queue = std::max(_max_bytes_in_queue, _cur_bytes_in_queue[0].load()); @@ -152,11 +151,8 @@ void DataQueue::push_block(std::unique_ptr block, int child_i void DataQueue::set_finish(int child_idx) { _is_finished[child_idx] = true; - if (_agg_dependency) { - _agg_dependency->set_ready_for_read(); - } - if (_union_dependency && is_all_finish()) { - _union_dependency->set_ready_for_read(); + if (_dependency) { + _dependency->set_ready_for_read(); } } @@ -164,11 +160,8 @@ void DataQueue::set_canceled(int child_idx) { DCHECK(!_is_finished[child_idx]); _is_canceled[child_idx] = true; _is_finished[child_idx] = true; - if (_agg_dependency) { - _agg_dependency->set_ready_for_read(); - } - if (_union_dependency && is_all_finish()) { - _union_dependency->set_ready_for_read(); + if (_dependency) { + _dependency->set_ready_for_read(); } } @@ -186,4 +179,4 @@ bool DataQueue::is_all_finish() { } } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index ab65bfeea6..cc716309a6 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -30,14 +30,12 @@ namespace doris { namespace pipeline { -class AggDependency; -class UnionDependency; +class WriteDependency; class DataQueue { public: //always one is enough, but in union node it's has more children - DataQueue(int child_count = 1, AggDependency* agg_dependency = nullptr, - UnionDependency* union_dependency = nullptr); + DataQueue(int child_count = 1, WriteDependency* dependency = nullptr); ~DataQueue() = default; Status get_block_from_queue(std::unique_ptr* block, @@ -64,6 +62,9 @@ public: bool data_exhausted() const { return _data_exhausted; } private: + friend class AggDependency; + friend class UnionDependency; + std::vector> _queue_blocks_lock; std::vector>> _queue_blocks; @@ -88,8 +89,8 @@ private: int64_t _max_size_of_queue = 0; static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10; - AggDependency* _agg_dependency = nullptr; - UnionDependency* _union_dependency = nullptr; + WriteDependency* _dependency = nullptr; }; + } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 0c7df2437d..8809dfdfe5 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -44,7 +44,19 @@ #include "util/time.h" #include "vec/sink/vdata_stream_sender.h" -namespace doris::pipeline { +namespace doris { +namespace vectorized { + +void BroadcastPBlockHolder::unref() noexcept { + DCHECK_GT(_ref_count._value, 0); + _ref_count._value.fetch_sub(1); + if (_dep && _ref_count._value == 0) { + _dep->return_available_block(); + } +} +} // namespace vectorized + +namespace pipeline { template ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, @@ -118,6 +130,7 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { std::queue, std::list>>(); _instance_to_broadcast_package_queue[low_id] = std::queue, std::list>>(); + _queue_capacity = QUEUE_CAPACITY_FACTOR * _instance_to_package_queue.size(); PUniqueId finst_id; finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); @@ -143,6 +156,10 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { _instance_to_sending_by_pipeline[ins_id.lo] = false; } _instance_to_package_queue[ins_id.lo].emplace(std::move(request)); + _total_queue_size++; + if (_queue_dependency && _total_queue_size > _queue_capacity) { + _queue_dependency->block_writing(); + } } if (send_now) { RETURN_IF_ERROR(_send_rpc(ins_id.lo)); @@ -244,6 +261,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { static_cast(brpc_request->release_block()); } q.pop(); + _total_queue_size--; + if (_queue_dependency && _total_queue_size <= _queue_capacity) { + _queue_dependency->set_ready_for_write(); + } } else if (!broadcast_q.empty()) { // If we have data to shuffle which is broadcasted auto& request = broadcast_q.front(); @@ -395,6 +416,6 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { template class ExchangeSinkBuffer; template class ExchangeSinkBuffer; -// -//template bool ExchangeSinkBuffer::can_write() const; -} // namespace doris::pipeline + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index f6c702b417..b5d6812b4f 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -42,6 +42,11 @@ class TUniqueId; using InstanceLoId = int64_t; +namespace pipeline { +class BroadcastDependency; +class ExchangeSinkQueueDependency; +} // namespace pipeline + namespace vectorized { class VDataStreamSender; template @@ -65,13 +70,11 @@ struct AtomicWrapper { // PBlock is available for next serialization. class BroadcastPBlockHolder { public: - BroadcastPBlockHolder() : _ref_count(0) {} + BroadcastPBlockHolder() : _ref_count(0), _dep(nullptr) {} + BroadcastPBlockHolder(pipeline::BroadcastDependency* dep) : _ref_count(0), _dep(dep) {} ~BroadcastPBlockHolder() noexcept = default; - void unref() noexcept { - DCHECK_GT(_ref_count._value, 0); - _ref_count._value.fetch_sub(1); - } + void unref() noexcept; void ref() noexcept { _ref_count._value.fetch_add(1); } bool available() { return _ref_count._value == 0; } @@ -81,6 +84,7 @@ public: private: AtomicWrapper _ref_count; PBlock pblock; + pipeline::BroadcastDependency* _dep; }; } // namespace vectorized @@ -177,6 +181,10 @@ public: void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); + void set_queue_dependency(std::shared_ptr queue_dependency) { + _queue_dependency = queue_dependency; + } + private: phmap::flat_hash_map> _instance_to_package_queue_mutex; @@ -216,7 +224,12 @@ private: inline bool _is_receiver_eof(InstanceLoId id); void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); int64_t get_sum_rpc_time(); + + std::atomic _total_queue_size = 0; + static constexpr int QUEUE_CAPACITY_FACTOR = 64; + int _queue_capacity = 0; + std::shared_ptr _queue_dependency = nullptr; }; } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 1ded3eaa25..d4e4e353b7 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -124,8 +124,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _peak_memory_usage_counter = _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); - _broadcast_pb_blocks.resize(config::num_broadcast_buffer); - _broadcast_pb_block_idx = 0; + static const std::string timer_name = "WaitForDependencyTime"; + _wait_for_dependency_timer = ADD_TIMER(_profile, timer_name); + _wait_queue_timer = ADD_CHILD_TIMER(_profile, "WaitForRpcBufferQueue", timer_name); + auto& p = _parent->cast(); std::map fragment_id_to_channel_index; @@ -174,6 +176,41 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf id, p._dest_node_id, _sender_id, _state->be_number(), state->get_query_ctx()); register_channels(_sink_buffer.get()); + + _exchange_sink_dependency = AndDependency::create_shared(_parent->id()); + _queue_dependency = ExchangeSinkQueueDependency::create_shared(_parent->id()); + _exchange_sink_dependency->add_child(_queue_dependency); + if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && + !only_local_exchange) { + _broadcast_dependency = BroadcastDependency::create_shared(_parent->id()); + _broadcast_dependency->set_available_block(config::num_broadcast_buffer); + _broadcast_pb_blocks.reserve(config::num_broadcast_buffer); + for (size_t i = 0; i < config::num_broadcast_buffer; i++) { + _broadcast_pb_blocks.emplace_back( + vectorized::BroadcastPBlockHolder(_broadcast_dependency.get())); + } + _exchange_sink_dependency->add_child(_broadcast_dependency); + + _wait_broadcast_buffer_timer = + ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name); + } else if (local_size > 0) { + size_t dep_id = 0; + _channels_dependency.resize(local_size); + _wait_channel_timer.resize(local_size); + auto deps_for_channels = AndDependency::create_shared(_parent->id()); + for (auto channel : channels) { + if (channel->is_local()) { + _channels_dependency[dep_id] = ChannelDependency::create_shared( + _parent->id(), _sender_id, channel->local_recvr()); + channel->set_dependency(_channels_dependency[dep_id]); + deps_for_channels->add_child(_channels_dependency[dep_id]); + _wait_channel_timer[dep_id] = + ADD_CHILD_TIMER(_profile, "WaitForLocalExchangeBuffer", timer_name); + dep_id++; + } + } + _exchange_sink_dependency->add_child(deps_for_channels); + } return Status::OK(); } @@ -303,6 +340,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block } else { vectorized::BroadcastPBlockHolder* block_holder = nullptr; RETURN_IF_ERROR(local_state.get_next_available_buffer(&block_holder)); + local_state._broadcast_dependency->take_available_block(); { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); bool serialized = false; @@ -447,32 +485,6 @@ Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec return Status::OK(); } -bool ExchangeSinkLocalState::channel_all_can_write() { - auto& p = _parent->cast(); - if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && - !only_local_exchange) { - // This condition means we need use broadcast buffer, so we should make sure - // there are available buffer before running pipeline - if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) { - _broadcast_pb_block_idx = 0; - } - - for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) { - if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) { - return true; - } - } - return false; - } else { - for (auto channel : channels) { - if (!channel->can_write()) { - return false; - } - } - return true; - } -} - void ExchangeSinkLocalState::register_channels( pipeline::ExchangeSinkBuffer* buffer) { for (auto channel : channels) { @@ -482,10 +494,16 @@ void ExchangeSinkLocalState::register_channels( Status ExchangeSinkLocalState::get_next_available_buffer( vectorized::BroadcastPBlockHolder** holder) { - DCHECK(_broadcast_pb_blocks[_broadcast_pb_block_idx].available()); - *holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx]; - _broadcast_pb_block_idx++; - return Status::OK(); + // This condition means we need use broadcast buffer, so we should make sure + // there are available buffer before running pipeline + for (size_t broadcast_pb_block_idx = 0; broadcast_pb_block_idx < _broadcast_pb_blocks.size(); + broadcast_pb_block_idx++) { + if (_broadcast_pb_blocks[broadcast_pb_block_idx].available()) { + *holder = &_broadcast_pb_blocks[broadcast_pb_block_idx]; + return Status::OK(); + } + } + return Status::InternalError("No broadcast buffer left!"); } template @@ -534,19 +552,28 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state) { } Status ExchangeSinkLocalState::close(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); - SCOPED_TIMER(_close_timer); if (_closed) { return Status::OK(); } + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(_close_timer); + COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->write_watcher_elapse_time()); + if (_broadcast_dependency) { + COUNTER_UPDATE(_wait_broadcast_buffer_timer, + _broadcast_dependency->write_watcher_elapse_time()); + } + for (size_t i = 0; i < _channels_dependency.size(); i++) { + COUNTER_UPDATE(_wait_channel_timer[i], + _channels_dependency[i]->write_watcher_elapse_time()); + } _sink_buffer->update_profile(profile()); _sink_buffer->close(); return PipelineXSinkLocalState<>::close(state); } -bool ExchangeSinkOperatorX::can_write(RuntimeState* state) { +WriteDependency* ExchangeSinkOperatorX::wait_for_dependency(RuntimeState* state) { auto& local_state = state->get_sink_local_state(id())->cast(); - return local_state._sink_buffer->can_write() && local_state.channel_all_can_write(); + return local_state._exchange_sink_dependency->write_blocked_by(); } bool ExchangeSinkOperatorX::is_pending_finish(RuntimeState* state) const { diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 134a9c9cc4..6ee64f326a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -66,6 +66,85 @@ private: int _mult_cast_id = -1; }; +class ExchangeSinkQueueDependency final : public WriteDependency { +public: + ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency); + ExchangeSinkQueueDependency(int id) : WriteDependency(id, "ResultQueueDependency") {} + ~ExchangeSinkQueueDependency() = default; + + void* shared_state() override { return nullptr; } +}; + +class BroadcastDependency final : public WriteDependency { +public: + ENABLE_FACTORY_CREATOR(BroadcastDependency); + BroadcastDependency(int id) : WriteDependency(id, "BroadcastDependency"), _available_block(0) {} + virtual ~BroadcastDependency() = default; + + [[nodiscard]] WriteDependency* write_blocked_by() override { + return _available_block > 0 ? nullptr : this; + } + + void set_available_block(int available_block) { _available_block = available_block; } + + void return_available_block() { _available_block++; } + + void take_available_block() { _available_block--; } + + void* shared_state() override { + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!"); + return nullptr; + } + + void set_ready_for_write() override { + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!"); + } + + void block_writing() override { + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!"); + } + +private: + std::atomic _available_block; +}; + +class ChannelDependency final : public WriteDependency { +public: + ENABLE_FACTORY_CREATOR(ChannelDependency); + ChannelDependency(int id, int sender_id, vectorized::VDataStreamRecvr* local_recvr) + : WriteDependency(id, "ChannelDependency"), + _sender_id(sender_id), + _local_recvr(local_recvr) {} + virtual ~ChannelDependency() = default; + + void* shared_state() override { return nullptr; } + + void try_set_ready_for_write() { + if (_ready_for_write) { + return; + } + if (_is_runnable()) { + _write_dependency_watcher.stop(); + _ready_for_write = true; + } + } + + void try_block_writing() { + if (!_is_runnable()) { + _ready_for_write = false; + } + } + +private: + bool _is_runnable() { + return _local_recvr->is_closed() || !_local_recvr->exceeds_limit(0) || + _local_recvr->sender_queue_empty(_sender_id); + } + + int _sender_id; + vectorized::VDataStreamRecvr* _local_recvr; +}; + class ExchangeSinkLocalState : public PipelineXSinkLocalState<> { ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState); @@ -141,12 +220,21 @@ private: RuntimeProfile::Counter* _memory_usage_counter; RuntimeProfile::Counter* _peak_memory_usage_counter; + RuntimeProfile::Counter* _wait_queue_timer; + RuntimeProfile::Counter* _wait_broadcast_buffer_timer; + std::vector _wait_channel_timer; + // Sender instance id, unique within a fragment. int _sender_id; std::vector _broadcast_pb_blocks; int _broadcast_pb_block_idx; vectorized::BlockSerializer _serializer; + + std::shared_ptr _queue_dependency = nullptr; + std::shared_ptr _exchange_sink_dependency = nullptr; + std::shared_ptr _broadcast_dependency = nullptr; + std::vector> _channels_dependency; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { @@ -174,7 +262,7 @@ public: int num_receivers = 1); Status try_close(RuntimeState* state) override; - bool can_write(RuntimeState* state) override; + WriteDependency* wait_for_dependency(RuntimeState* state) override; bool is_pending_finish(RuntimeState* state) const override; private: diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 35614f2c67..57891dc904 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -49,6 +49,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info)); SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); + _shared_hash_table_dependency = SharedHashTableDependency::create_shared(_parent->id()); auto& p = _parent->cast(); _shared_state->join_op_variants = p._join_op_variants; _shared_state->probe_key_sz = p._build_key_sz; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 04f80e7284..036f6a3d92 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -46,6 +46,15 @@ public: class HashJoinBuildSinkOperatorX; +class SharedHashTableDependency : public WriteDependency { +public: + ENABLE_FACTORY_CREATOR(SharedHashTableDependency); + SharedHashTableDependency(int id) : WriteDependency(id, "SharedHashTableDependency") {} + ~SharedHashTableDependency() = default; + + void* shared_state() override { return nullptr; } +}; + class HashJoinBuildSinkLocalState final : public JoinBuildSinkLocalState { public: @@ -82,6 +91,7 @@ protected: bool _build_side_ignore_null = false; size_t _build_rf_cardinality = 0; std::unordered_map> _inserted_rows; + std::shared_ptr _shared_hash_table_dependency; RuntimeProfile::Counter* _build_table_timer; RuntimeProfile::Counter* _build_expr_call_timer; @@ -123,13 +133,15 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - bool can_write(RuntimeState* state) override { + WriteDependency* wait_for_dependency(RuntimeState* state) override { if (state->get_sink_local_state(id()) ->cast() ._should_build_hash_table) { - return true; + return nullptr; } - return _shared_hash_table_context && _shared_hash_table_context->signaled; + return state->get_sink_local_state(id()) + ->cast() + ._shared_hash_table_dependency->write_blocked_by(); } bool should_dry_run(RuntimeState* state) override { diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 20c9634319..568e02be33 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -90,8 +90,6 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - bool can_write(RuntimeState* state) override { return true; } - private: friend class NestedLoopJoinBuildSinkLocalState; diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 02bf5bf99d..124e67a4ec 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -54,12 +54,27 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info)); SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); + static const std::string timer_name = "WaitForDependencyTime"; + _wait_for_dependency_timer = ADD_TIMER(_profile, timer_name); + _wait_for_queue_timer = ADD_CHILD_TIMER(_profile, "WaitForQueue", timer_name); + _wait_for_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBuffer", timer_name); + _wait_for_cancel_timer = ADD_CHILD_TIMER(_profile, "WaitForCancel", timer_name); auto fragment_instance_id = state->fragment_instance_id(); // create sender std::shared_ptr sender = nullptr; RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true, state->execution_timeout())); + _result_sink_dependency = OrDependency::create_shared(_parent->id()); + _buffer_dependency = ResultBufferDependency::create_shared(_parent->id()); + _cancel_dependency = CancelDependency::create_shared(_parent->id()); + _result_sink_dependency->add_child(_cancel_dependency); + _result_sink_dependency->add_child(_buffer_dependency); + _queue_dependency = ResultQueueDependency::create_shared(_parent->id()); + _result_sink_dependency->add_child(_queue_dependency); + + ((PipBufferControlBlock*)_sender.get()) + ->set_dependency(_buffer_dependency, _queue_dependency, _cancel_dependency); return Status::OK(); } @@ -153,11 +168,19 @@ Status ResultSinkOperatorX::_second_phase_fetch_data(RuntimeState* state, } Status ResultSinkLocalState::close(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); - SCOPED_TIMER(_close_timer); if (_closed) { return Status::OK(); } + SCOPED_TIMER(_close_timer); + COUNTER_UPDATE(_wait_for_queue_timer, _queue_dependency->write_watcher_elapse_time()); + COUNTER_UPDATE(profile()->total_time_counter(), _queue_dependency->write_watcher_elapse_time()); + COUNTER_SET(_wait_for_buffer_timer, _buffer_dependency->write_watcher_elapse_time()); + COUNTER_UPDATE(profile()->total_time_counter(), + _buffer_dependency->write_watcher_elapse_time()); + COUNTER_SET(_wait_for_cancel_timer, _cancel_dependency->write_watcher_elapse_time()); + COUNTER_UPDATE(profile()->total_time_counter(), + _cancel_dependency->write_watcher_elapse_time()); + SCOPED_TIMER(profile()->total_time_counter()); Status final_status = Status::OK(); if (_writer) { // close the writer @@ -183,7 +206,10 @@ Status ResultSinkLocalState::close(RuntimeState* state) { return final_status; } -bool ResultSinkOperatorX::can_write(RuntimeState* state) { - return state->get_sink_local_state(id())->cast()._sender->can_sink(); +WriteDependency* ResultSinkOperatorX::wait_for_dependency(RuntimeState* state) { + return state->get_sink_local_state(id()) + ->cast() + ._result_sink_dependency->write_blocked_by(); } + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index a83e3c28a9..27c229be1b 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -25,6 +25,7 @@ namespace doris { class DataSink; +class PipBufferControlBlock; namespace pipeline { @@ -42,6 +43,33 @@ public: bool can_write() override; }; +class ResultBufferDependency : public WriteDependency { +public: + ENABLE_FACTORY_CREATOR(ResultBufferDependency); + ResultBufferDependency(int id) : WriteDependency(id, "ResultBufferDependency") {} + ~ResultBufferDependency() = default; + + void* shared_state() override { return nullptr; } +}; + +class ResultQueueDependency : public WriteDependency { +public: + ENABLE_FACTORY_CREATOR(ResultQueueDependency); + ResultQueueDependency(int id) : WriteDependency(id, "ResultQueueDependency") {} + ~ResultQueueDependency() = default; + + void* shared_state() override { return nullptr; } +}; + +class CancelDependency : public WriteDependency { +public: + ENABLE_FACTORY_CREATOR(CancelDependency); + CancelDependency(int id) : WriteDependency(id, "CancelDependency") { _ready_for_write = false; } + ~CancelDependency() = default; + + void* shared_state() override { return nullptr; } +}; + class ResultSinkLocalState final : public PipelineXSinkLocalState<> { ENABLE_FACTORY_CREATOR(ResultSinkLocalState); @@ -60,6 +88,15 @@ private: std::shared_ptr _sender; std::shared_ptr _writer; + std::shared_ptr _result_sink_dependency; + std::shared_ptr _buffer_dependency; + std::shared_ptr _queue_dependency; + std::shared_ptr _cancel_dependency; + + RuntimeProfile::Counter* _wait_for_queue_timer = nullptr; + RuntimeProfile::Counter* _wait_for_buffer_timer = nullptr; + // time of prefilter input block from scanner + RuntimeProfile::Counter* _wait_for_cancel_timer = nullptr; }; class ResultSinkOperatorX final : public DataSinkOperatorX { @@ -72,7 +109,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - bool can_write(RuntimeState* state) override; + WriteDependency* wait_for_dependency(RuntimeState* state) override; private: friend class ResultSinkLocalState; @@ -94,4 +131,4 @@ private: }; } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 0b107d9a13..4508b0f4a2 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1337,6 +1337,9 @@ Status ScanOperatorX::try_close(RuntimeState* state) { template Status ScanLocalState::close(RuntimeState* state) { + if (_closed) { + return Status::OK(); + } SCOPED_TIMER(_close_timer); if (_data_ready_dependency) { COUNTER_UPDATE(_wait_for_data_timer, _data_ready_dependency->read_watcher_elapse_time()); @@ -1355,9 +1358,6 @@ Status ScanLocalState::close(RuntimeState* state) { _scanner_done_dependency->read_watcher_elapse_time()); } SCOPED_TIMER(profile()->total_time_counter()); - if (_closed) { - return Status::OK(); - } if (_scanner_ctx.get()) { _scanner_ctx->clear_and_join(reinterpret_cast(this), state); } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index b66977c115..84e4268996 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -88,8 +88,6 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - bool can_write(RuntimeState* state) override { return true; } - private: friend class SortSinkLocalState; diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp index b58cd4fdc5..ebc6368226 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp @@ -144,6 +144,7 @@ StreamingAggSinkLocalState::StreamingAggSinkLocalState(DataSinkOperatorXBase* pa Status StreamingAggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); + _shared_state->data_queue.reset(new DataQueue(1, _dependency)); _queue_byte_size_counter = ADD_COUNTER(profile(), "MaxSizeInBlockQueue", TUnit::BYTES); _queue_size_counter = ADD_COUNTER(profile(), "MaxSizeOfBlockQueue", TUnit::UNIT); _streaming_agg_timer = ADD_TIMER(profile(), "StreamingAggTime"); @@ -353,13 +354,6 @@ StreamingAggSinkOperatorX::StreamingAggSinkOperatorX(ObjectPool* pool, const TPl const DescriptorTbl& descs) : AggSinkOperatorX(pool, tnode, descs) {} -bool StreamingAggSinkOperatorX::can_write(RuntimeState* state) { - // sink and source in diff threads - return state->get_sink_local_state(id()) - ->cast() - ._shared_state->data_queue->has_enough_space_to_push(); -} - Status StreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(AggSinkOperatorX::init(tnode, state)); _name = "STREAMING_AGGREGATION_SINK_OPERATOR"; diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index 5b9c635580..a149a44c67 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -22,6 +22,7 @@ #include #include "aggregation_sink_operator.h" +#include "aggregation_source_operator.h" #include "common/status.h" #include "operator.h" #include "pipeline/pipeline_x/operator.h" @@ -112,7 +113,9 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - bool can_write(RuntimeState* state) override; + WriteDependency* wait_for_dependency(RuntimeState* state) override { + return state->get_local_state(id())->cast()._dependency->write_blocked_by(); + } }; } // namespace pipeline diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index 824decd790..5c2480538f 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -150,13 +150,13 @@ Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (local_state._output_block == nullptr) { local_state._output_block = - local_state._shared_state->_data_queue->get_free_block(_cur_child_id); + local_state._shared_state->data_queue->get_free_block(_cur_child_id); } if (_cur_child_id < _get_first_materialized_child_idx()) { //pass_through if (in_block->rows() > 0) { local_state._output_block->swap(*in_block); - local_state._shared_state->_data_queue->push_block(std::move(local_state._output_block), - _cur_child_id); + local_state._shared_state->data_queue->push_block(std::move(local_state._output_block), + _cur_child_id); } } else if (_get_first_materialized_child_idx() != children_count() && _cur_child_id < children_count()) { //need materialized @@ -173,17 +173,17 @@ Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block //because maybe sink is eos and queue have none data, if not push block //the source can't can_read again and can't set source finished if (local_state._output_block) { - local_state._shared_state->_data_queue->push_block(std::move(local_state._output_block), - _cur_child_id); + local_state._shared_state->data_queue->push_block(std::move(local_state._output_block), + _cur_child_id); } - local_state._shared_state->_data_queue->set_finish(_cur_child_id); + local_state._shared_state->data_queue->set_finish(_cur_child_id); return Status::OK(); } // not eos and block rows is enough to output,so push block if (local_state._output_block && (local_state._output_block->rows() >= state->batch_size())) { - local_state._shared_state->_data_queue->push_block(std::move(local_state._output_block), - _cur_child_id); + local_state._shared_state->data_queue->push_block(std::move(local_state._output_block), + _cur_child_id); } return Status::OK(); } diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 2bbfab5cfc..4a3bcbf774 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -111,8 +111,6 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - bool can_write(RuntimeState* state) override { return true; } - private: int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 041038bd16..3b14c61f2e 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -103,9 +103,8 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); - std::shared_ptr data_queue = - std::make_shared(p._child_size, nullptr, _dependency); - _shared_state->_data_queue.swap(data_queue); + std::shared_ptr data_queue = std::make_shared(p._child_size, _dependency); + _shared_state->data_queue.swap(data_queue); return Status::OK(); } @@ -115,17 +114,17 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b SCOPED_TIMER(local_state.profile()->total_time_counter()); std::unique_ptr output_block = vectorized::Block::create_unique(); int child_idx = 0; - local_state._shared_state->_data_queue->get_block_from_queue(&output_block, &child_idx); + local_state._shared_state->data_queue->get_block_from_queue(&output_block, &child_idx); if (!output_block) { return Status::OK(); } block->swap(*output_block); output_block->clear_column_data(row_desc().num_materialized_slots()); - local_state._shared_state->_data_queue->push_free_block(std::move(output_block), child_idx); + local_state._shared_state->data_queue->push_free_block(std::move(output_block), child_idx); local_state.reached_limit(block, source_state); //have exectue const expr, queue have no data any more, and child could be colsed - if ((!_has_data(state) && local_state._shared_state->_data_queue->is_all_finish())) { + if ((!_has_data(state) && local_state._shared_state->data_queue->is_all_finish())) { source_state = SourceState::FINISHED; } else if (_has_data(state)) { source_state = SourceState::MORE_DATA; diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 77e88571c5..c6a6d03316 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -101,7 +101,7 @@ public: private: bool _has_data(RuntimeState* state) { auto& local_state = state->get_local_state(id())->cast(); - return local_state._shared_state->_data_queue->remaining_has_data(); + return local_state._shared_state->data_queue->remaining_has_data(); } bool has_more_const(const RuntimeState* state) const { return state->per_fragment_instance_idx() == 0; diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index e4caf1ebdf..965a547c22 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -94,7 +94,7 @@ protected: class WriteDependency : public Dependency { public: - WriteDependency(int id, std::string name) : Dependency(id, name), _ready_for_write(false) {} + WriteDependency(int id, std::string name) : Dependency(id, name), _ready_for_write(true) {} virtual ~WriteDependency() = default; bool is_write_dependency() override { return true; } @@ -111,7 +111,7 @@ public: return _write_dependency_watcher.elapsed_time(); } - [[nodiscard]] virtual Dependency* write_blocked_by() { + [[nodiscard]] virtual WriteDependency* write_blocked_by() { return _ready_for_write ? nullptr : this; } @@ -159,7 +159,7 @@ public: return nullptr; } - [[nodiscard]] Dependency* write_blocked_by() override { + [[nodiscard]] WriteDependency* write_blocked_by() override { std::unique_lock l(_lock); for (auto& child : _children) { CHECK(child->is_write_dependency()); @@ -204,8 +204,8 @@ public: return res; } - [[nodiscard]] Dependency* write_blocked_by() override { - Dependency* res = nullptr; + [[nodiscard]] WriteDependency* write_blocked_by() override { + WriteDependency* res = nullptr; std::unique_lock l(_lock); for (auto& child : _children) { CHECK(child->is_write_dependency()); @@ -254,18 +254,48 @@ public: size_t input_num_rows = 0; std::vector values; std::unique_ptr agg_profile_arena; - std::unique_ptr data_queue; + std::unique_ptr data_queue = nullptr; }; -class AggDependency : public Dependency { +class AggDependency : public WriteDependency { public: using SharedState = AggSharedState; - AggDependency(int id) : Dependency(id, "AggDependency") { + AggDependency(int id) : WriteDependency(id, "AggDependency") { _mem_tracker = std::make_unique("AggregateOperator:"); - _agg_state.data_queue = std::make_unique(1, this); } ~AggDependency() override = default; + void block_reading() override { + if (_is_streaming_agg_state()) { + if (_agg_state.data_queue->_cur_blocks_nums_in_queue[0] == 0 && + !_agg_state.data_queue->_is_finished[0]) { + _ready_for_read = false; + } + } else { + _ready_for_read = false; + } + } + + void block_writing() override { + if (_is_streaming_agg_state()) { + if (!_agg_state.data_queue->has_enough_space_to_push()) { + _ready_for_write = false; + } + } else { + _ready_for_write = false; + } + } + + void set_ready_for_write() override { + if (_is_streaming_agg_state()) { + if (_agg_state.data_queue->has_enough_space_to_push()) { + WriteDependency::set_ready_for_write(); + } + } else { + WriteDependency::set_ready_for_write(); + } + } + void* shared_state() override { return (void*)&_agg_state; }; Status reset_hash_table(); @@ -316,6 +346,7 @@ protected: std::unique_ptr _mem_tracker; private: + bool _is_streaming_agg_state() { return _agg_state.data_queue != nullptr; } AggSharedState _agg_state; }; @@ -324,10 +355,10 @@ public: std::unique_ptr sorter; }; -class SortDependency final : public Dependency { +class SortDependency final : public WriteDependency { public: using SharedState = SortSharedState; - SortDependency(int id) : Dependency(id, "SortDependency") {} + SortDependency(int id) : WriteDependency(id, "SortDependency") {} ~SortDependency() override = default; void* shared_state() override { return (void*)&_sort_state; }; @@ -337,15 +368,30 @@ private: struct UnionSharedState { public: - std::shared_ptr _data_queue; + std::shared_ptr data_queue; }; -class UnionDependency final : public Dependency { +class UnionDependency final : public WriteDependency { public: using SharedState = UnionSharedState; - UnionDependency(int id) : Dependency(id, "UnionDependency") {} + UnionDependency(int id) : WriteDependency(id, "UnionDependency") {} ~UnionDependency() override = default; - void* shared_state() override { return (void*)&_union_state; }; + void* shared_state() override { return (void*)&_union_state; } + + void set_ready_for_write() override {} + void set_ready_for_read() override { + if (!_union_state.data_queue->is_all_finish()) { + return; + } + if (_ready_for_read) { + return; + } + _read_dependency_watcher.stop(); + _ready_for_read = true; + } + + void block_reading() override {} + void block_writing() override {} private: UnionSharedState _union_state; @@ -362,7 +408,6 @@ public: vectorized::BlockRowPos all_block_end; std::vector input_blocks; bool input_eos = false; - std::atomic_bool need_more_input = true; vectorized::BlockRowPos found_partition_end; std::vector origin_cols; vectorized::VExprContextSPtrs order_by_eq_expr_ctxs; @@ -374,10 +419,10 @@ public: std::vector ordey_by_column_idxs; }; -class AnalyticDependency final : public Dependency { +class AnalyticDependency final : public WriteDependency { public: using SharedState = AnalyticSharedState; - AnalyticDependency(int id) : Dependency(id, "AnalyticDependency") {} + AnalyticDependency(int id) : WriteDependency(id, "AnalyticDependency") {} ~AnalyticDependency() override = default; void* shared_state() override { return (void*)&_analytic_state; }; @@ -385,14 +430,13 @@ public: vectorized::BlockRowPos get_partition_by_end(); bool refresh_need_more_input() { - _analytic_state.need_more_input = - whether_need_next_partition(_analytic_state.found_partition_end); - if (_analytic_state.need_more_input) { + auto need_more_input = whether_need_next_partition(_analytic_state.found_partition_end); + if (need_more_input) { block_reading(); } else { set_ready_for_read(); } - return _analytic_state.need_more_input; + return need_more_input; } bool whether_need_next_partition(vectorized::BlockRowPos& found_partition_end); @@ -432,10 +476,10 @@ struct HashJoinSharedState : public JoinSharedState { std::make_shared>(); }; -class HashJoinDependency final : public Dependency { +class HashJoinDependency final : public WriteDependency { public: using SharedState = HashJoinSharedState; - HashJoinDependency(int id) : Dependency(id, "HashJoinDependency") {} + HashJoinDependency(int id) : WriteDependency(id, "HashJoinDependency") {} ~HashJoinDependency() override = default; void* shared_state() override { return (void*)&_join_state; } @@ -464,10 +508,10 @@ struct NestedLoopJoinSharedState : public JoinSharedState { vectorized::Blocks build_blocks; }; -class NestedLoopJoinDependency final : public Dependency { +class NestedLoopJoinDependency final : public WriteDependency { public: using SharedState = NestedLoopJoinSharedState; - NestedLoopJoinDependency(int id) : Dependency(id, "NestedLoopJoinDependency") {} + NestedLoopJoinDependency(int id) : WriteDependency(id, "NestedLoopJoinDependency") {} ~NestedLoopJoinDependency() override = default; void* shared_state() override { return (void*)&_join_state; } diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 60e034508a..70a3187c8f 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -417,6 +417,7 @@ protected: RuntimeProfile::Counter* _rows_input_counter; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; + RuntimeProfile::Counter* _wait_for_dependency_timer; }; class DataSinkOperatorXBase : public OperatorBase { @@ -471,7 +472,7 @@ public: return false; } - virtual bool can_write(RuntimeState* state) { return false; } + virtual WriteDependency* wait_for_dependency(RuntimeState* state) { return nullptr; } virtual bool is_pending_finish(RuntimeState* state) const { return false; } @@ -536,15 +537,17 @@ public: ~PipelineXSinkLocalState() override = default; virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) override { + // create profile + _profile = state->obj_pool()->add(new RuntimeProfile( + _parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")")); if constexpr (!std::is_same_v) { _dependency = (DependencyType*)info.dependency; if (_dependency) { _shared_state = (typename DependencyType::SharedState*)_dependency->shared_state(); + _wait_for_dependency_timer = + ADD_TIMER(_profile, "WaitForDependency[" + _dependency->name() + "]Time"); } } - // create profile - _profile = state->obj_pool()->add(new RuntimeProfile( - _parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")")); _rows_input_counter = ADD_COUNTER(_profile, "InputRows", TUnit::UNIT); _open_timer = ADD_TIMER(_profile, "OpenTime"); _close_timer = ADD_TIMER(_profile, "CloseTime"); @@ -559,6 +562,9 @@ public: if (_closed) { return Status::OK(); } + if (_dependency) { + COUNTER_SET(_wait_for_dependency_timer, _dependency->write_watcher_elapse_time()); + } _closed = true; return Status::OK(); } @@ -566,7 +572,7 @@ public: std::string debug_string(int indentation_level) const override; protected: - DependencyType* _dependency; + DependencyType* _dependency = nullptr; typename DependencyType::SharedState* _shared_state; }; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index ae37c233ad..41d5226e85 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -106,7 +106,6 @@ void PipelineXTask::_init_profile() { _close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time); _wait_bf_timer = ADD_TIMER(_task_profile, "WaitBfTime"); - _wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime"); _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime"); _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT); @@ -120,7 +119,6 @@ void PipelineXTask::_init_profile() { void PipelineXTask::_fresh_profile_counter() { COUNTER_SET(_wait_bf_timer, (int64_t)_wait_bf_watcher.elapsed_time()); COUNTER_SET(_schedule_counts, (int64_t)_schedule_time); - COUNTER_SET(_wait_sink_timer, (int64_t)_wait_sink_watcher.elapsed_time()); COUNTER_SET(_wait_worker_timer, (int64_t)_wait_worker_watcher.elapsed_time()); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 1fad0560b0..d54f4b44fd 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -85,7 +85,14 @@ public: return _source->runtime_filters_are_ready_or_timeout(_state); } - bool sink_can_write() override { return _sink->can_write(_state); } + bool sink_can_write() override { + auto dep = _sink->wait_for_dependency(_state); + if (dep != nullptr) { + dep->start_write_watcher(); + return false; + } + return true; + } Status finalize() override; diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 412a970eae..c51ced1aeb 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -31,6 +31,7 @@ #include "arrow/record_batch.h" #include "arrow/type_fwd.h" +#include "pipeline/exec/result_sink_operator.h" #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "util/thrift_util.h" @@ -274,4 +275,62 @@ Status BufferControlBlock::cancel() { return Status::OK(); } +Status PipBufferControlBlock::add_batch(std::unique_ptr& result) { + RETURN_IF_ERROR(BufferControlBlock::add_batch(result)); + if (_buffer_dependency && _buffer_rows >= _buffer_limit) { + _buffer_dependency->block_writing(); + } + return Status::OK(); +} + +Status PipBufferControlBlock::add_arrow_batch(std::shared_ptr& result) { + RETURN_IF_ERROR(BufferControlBlock::add_arrow_batch(result)); + if (_buffer_dependency && _buffer_rows >= _buffer_limit) { + _buffer_dependency->block_writing(); + } + return Status::OK(); +} + +void PipBufferControlBlock::get_batch(GetResultBatchCtx* ctx) { + BufferControlBlock::get_batch(ctx); + if (_buffer_dependency && _buffer_rows < _buffer_limit) { + _buffer_dependency->set_ready_for_write(); + } +} + +Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr* result) { + RETURN_IF_ERROR(BufferControlBlock::get_arrow_batch(result)); + if (_buffer_dependency && _buffer_rows < _buffer_limit) { + _buffer_dependency->set_ready_for_write(); + } + return Status::OK(); +} + +Status PipBufferControlBlock::cancel() { + RETURN_IF_ERROR(BufferControlBlock::cancel()); + if (_cancel_dependency) { + _cancel_dependency->set_ready_for_write(); + } + + return Status::OK(); +} + +void PipBufferControlBlock::set_dependency( + std::shared_ptr buffer_dependency, + std::shared_ptr queue_dependency, + std::shared_ptr cancel_dependency) { + _buffer_dependency = buffer_dependency; + _queue_dependency = queue_dependency; + _cancel_dependency = cancel_dependency; +} + +void PipBufferControlBlock::_update_batch_queue_empty() { + _batch_queue_empty = _fe_result_batch_queue.empty() && _arrow_flight_batch_queue.empty(); + if (_queue_dependency && _batch_queue_empty) { + _queue_dependency->set_ready_for_write(); + } else if (_queue_dependency) { + _queue_dependency->block_writing(); + } +} + } // namespace doris diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index 43c264a866..1b53193f91 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -47,6 +47,12 @@ class Controller; namespace doris { +namespace pipeline { +class ResultBufferDependency; +class CancelDependency; +class ResultQueueDependency; +} // namespace pipeline + class PFetchDataResult; struct GetResultBatchCtx { @@ -73,17 +79,17 @@ public: Status init(); // Only one fragment is written, so can_sink returns true, then the sink must be executed virtual bool can_sink(); - Status add_batch(std::unique_ptr& result); - Status add_arrow_batch(std::shared_ptr& result); + virtual Status add_batch(std::unique_ptr& result); + virtual Status add_arrow_batch(std::shared_ptr& result); - void get_batch(GetResultBatchCtx* ctx); - Status get_arrow_batch(std::shared_ptr* result); + virtual void get_batch(GetResultBatchCtx* ctx); + virtual Status get_arrow_batch(std::shared_ptr* result); // close buffer block, set _status to exec_status and set _is_close to true; // called because data has been read or error happened. Status close(Status exec_status); // this is called by RPC, called from coordinator - Status cancel(); + virtual Status cancel(); [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; } @@ -153,13 +159,28 @@ public: return _get_batch_queue_empty() || _buffer_rows < _buffer_limit || _is_cancelled; } + Status add_batch(std::unique_ptr& result) override; + + Status add_arrow_batch(std::shared_ptr& result) override; + + void get_batch(GetResultBatchCtx* ctx) override; + + Status get_arrow_batch(std::shared_ptr* result) override; + + Status cancel() override; + + void set_dependency(std::shared_ptr buffer_dependency, + std::shared_ptr queue_dependency, + std::shared_ptr cancel_dependency); + private: bool _get_batch_queue_empty() override { return _batch_queue_empty; } - void _update_batch_queue_empty() override { - _batch_queue_empty = _fe_result_batch_queue.empty() && _arrow_flight_batch_queue.empty(); - } + void _update_batch_queue_empty() override; std::atomic_bool _batch_queue_empty = false; + std::shared_ptr _buffer_dependency = nullptr; + std::shared_ptr _queue_dependency = nullptr; + std::shared_ptr _cancel_dependency = nullptr; }; } // namespace doris diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index b774876198..9d28983afb 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -23,6 +23,8 @@ #include // IWYU pragma: keep #include +#include "pipeline/exec/hashjoin_build_sink.h" + namespace doris { namespace vectorized { @@ -68,6 +70,9 @@ void SharedHashTableController::signal(int my_node_id, Status status) { it->second->status = status; _shared_contexts.erase(it); } + for (auto& dep : _dependencies) { + dep->set_ready_for_write(); + } _cv.notify_all(); } @@ -78,6 +83,9 @@ void SharedHashTableController::signal(int my_node_id) { it->second->signaled = true; _shared_contexts.erase(it); } + for (auto& dep : _dependencies) { + dep->set_ready_for_write(); + } _cv.notify_all(); } diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index f76121e7c8..e79ab97af5 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -35,6 +35,10 @@ class HybridSetBase; class BloomFilterFuncBase; class BitmapFilterFuncBase; +namespace pipeline { +class SharedHashTableDependency; +} + namespace vectorized { class Arena; @@ -75,6 +79,10 @@ public: Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context); bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id); void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled = enabled; } + void append_dependency(std::shared_ptr dep) { + std::lock_guard lock(_mutex); + _dependencies.push_back(dep); + } private: bool _pipeline_engine_enabled = false; @@ -82,6 +90,9 @@ private: std::condition_variable _cv; std::map _builder_fragment_ids; std::map _shared_contexts; + + // For pipelineX, we update all dependencies once hash table is built; + std::vector> _dependencies; }; } // namespace vectorized diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 27ef81be22..b65f8f6bfe 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -27,6 +27,7 @@ #include #include "common/logging.h" +#include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" #include "runtime/memory/mem_tracker.h" #include "runtime/runtime_state.h" @@ -95,6 +96,9 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block auto [next_block, block_byte_size] = std::move(_block_queue.front()); _recvr->update_blocks_memory_usage(-block_byte_size); _block_queue.pop_front(); + if (_channel_dependency) { + _channel_dependency->try_set_ready_for_write(); + } if (_block_queue.size() == 0 && _dependency) { _dependency->block_reading(); } @@ -168,6 +172,9 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num if (!empty) { _block_queue.emplace_back(std::move(block), block_byte_size); + if (_channel_dependency) { + _channel_dependency->try_block_writing(); + } if (_dependency) { _dependency->set_ready_for_read(); } @@ -224,6 +231,9 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { if (!empty) { _block_queue.emplace_back(std::move(nblock), block_mem_size); + if (_channel_dependency) { + _channel_dependency->try_block_writing(); + } if (_dependency) { _dependency->set_ready_for_read(); } @@ -417,6 +427,13 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) { return _sender_queues[use_sender_id]->queue_empty(); } +void VDataStreamRecvr::set_dependency(std::shared_ptr dependency) { + _dependency = dependency; + for (auto& queue : _sender_queues) { + queue->set_channel_dependency(dependency); + } +} + bool VDataStreamRecvr::ready_to_read() { for (const auto& queue : _sender_queues) { if (queue->should_wait()) { @@ -447,11 +464,24 @@ void VDataStreamRecvr::cancel_stream() { } } +void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) { + _blocks_memory_usage->add(size); + _blocks_memory_usage_current_value = _blocks_memory_usage->current_value(); + if (_dependency && _blocks_memory_usage_current_value > config::exchg_node_buffer_size_bytes) { + _dependency->try_block_writing(); + } else if (_dependency) { + _dependency->try_set_ready_for_write(); + } +} + void VDataStreamRecvr::close() { if (_is_closed) { return; } _is_closed = true; + if (_dependency) { + _dependency->try_set_ready_for_write(); + } for (int i = 0; i < _sender_queues.size(); ++i) { _sender_queues[i]->close(); } @@ -497,6 +527,9 @@ void VDataStreamRecvr::PipSenderQueue::add_block(Block* block, bool use_move) { return; } _block_queue.emplace_back(std::move(nblock), block_mem_size); + if (_channel_dependency) { + _channel_dependency->try_block_writing(); + } if (_dependency) { _dependency->set_ready_for_read(); } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 271e14b88f..d0b009b22f 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -59,7 +59,8 @@ class RuntimeState; namespace pipeline { struct ExchangeDataDependency; -} +class ChannelDependency; +} // namespace pipeline namespace vectorized { class VDataStreamMgr; @@ -121,11 +122,10 @@ public: bool is_closed() const { return _is_closed; } + void set_dependency(std::shared_ptr dependency); + private: - void update_blocks_memory_usage(int64_t size) { - _blocks_memory_usage->add(size); - _blocks_memory_usage_current_value = _blocks_memory_usage->current_value(); - } + void update_blocks_memory_usage(int64_t size); class PipSenderQueue; friend struct BlockSupplierSortCursorImpl; @@ -180,6 +180,7 @@ private: std::shared_ptr _sub_plan_query_statistics_recvr; bool _enable_pipeline; + std::shared_ptr _dependency; }; class ThreadClosure : public google::protobuf::Closure { @@ -221,6 +222,10 @@ public: _dependency = dependency; } + void set_channel_dependency(std::shared_ptr channel_dependency) { + _channel_dependency = channel_dependency; + } + protected: Status _inner_get_batch_without_lock(Block* block, bool* eos); @@ -242,6 +247,7 @@ protected: std::unordered_map> _local_closure; std::shared_ptr _dependency = nullptr; + std::shared_ptr _channel_dependency = nullptr; }; class VDataStreamRecvr::PipSenderQueue : public SenderQueue { diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index a3e4ccf2a6..2e317a2ce3 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -64,6 +64,7 @@ enum CompressionTypePB : int; namespace pipeline { class ExchangeSinkOperator; class ExchangeSinkOperatorX; +class ChannelDependency; } // namespace pipeline namespace vectorized { @@ -305,6 +306,11 @@ public: bool is_local() const { return _is_local; } + VDataStreamRecvr* local_recvr() { + DCHECK(_is_local && _local_recvr != nullptr); + return _local_recvr.get(); + } + virtual void ch_roll_pb_block(); bool can_write() { @@ -544,6 +550,10 @@ public: return _closure.get(); } + void set_dependency(std::shared_ptr dependency) { + Channel::_local_recvr->set_dependency(dependency); + } + private: friend class VDataStreamSender;