diff --git a/be/src/http/action/pipeline_task_action.cpp b/be/src/http/action/pipeline_task_action.cpp
new file mode 100644
index 0000000000..c3b49b5713
--- /dev/null
+++ b/be/src/http/action/pipeline_task_action.cpp
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/pipeline_task_action.h"
+
+#include <sstream>
+#include <string>
+
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "pipeline/pipeline_fragment_context.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+
+namespace doris {
+
+const static std::string HEADER_JSON = "application/json";
+
+void PipelineTaskAction::handle(HttpRequest* req) {
+    req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
+    HttpChannel::send_reply(req, HttpStatus::OK,
+                            ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks());
+}
+
+} // end namespace doris
diff --git a/be/src/http/action/pipeline_task_action.h b/be/src/http/action/pipeline_task_action.h
new file mode 100644
index 0000000000..488a1148a5
--- /dev/null
+++ b/be/src/http/action/pipeline_task_action.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "http/http_handler.h"
+
+namespace doris {
+
+class HttpRequest;
+
+// Get pipeline task state from http API.
+class PipelineTaskAction : public HttpHandler {
+public:
+    PipelineTaskAction() = default;
+
+    ~PipelineTaskAction() override = default;
+
+    void handle(HttpRequest* req) override;
+};
+
+} // end namespace doris
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 4636452845..68f34ffc82 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -90,6 +90,13 @@ bool ExchangeSinkBuffer<Parent>::can_write() const {
     return total_package_size <= max_package_size;
 }
 
+template <typename Parent>
+void ExchangeSinkBuffer<Parent>::_set_ready_to_finish(bool all_done) {
+    if (_finish_dependency && _should_stop && all_done) {
+        _finish_dependency->set_ready_to_finish();
+    }
+}
+
 template <typename Parent>
 bool ExchangeSinkBuffer<Parent>::is_pending_finish() {
     //note(wb) angly implementation here, because operator couples the scheduling logic
@@ -160,9 +167,6 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
             send_now = true;
             _rpc_channel_is_idle[ins_id.lo] = false;
             _busy_channels++;
-            if (_finish_dependency) {
-                _finish_dependency->block_finishing();
-            }
         }
         _instance_to_package_queue[ins_id.lo].emplace(std::move(request));
         _total_queue_size++;
@@ -196,9 +200,6 @@ Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
             send_now = true;
             _rpc_channel_is_idle[ins_id.lo] = false;
             _busy_channels++;
-            if (_finish_dependency) {
-                _finish_dependency->block_finishing();
-            }
         }
         _instance_to_broadcast_package_queue[ins_id.lo].emplace(request);
     }
@@ -222,10 +223,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
 
     if (_is_finishing) {
         _rpc_channel_is_idle[id] = true;
-        _busy_channels--;
-        if (_finish_dependency && _busy_channels == 0) {
-            _finish_dependency->set_ready_to_finish();
-        }
+        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
         return Status::OK();
     }
@@ -361,10 +359,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
             broadcast_q.pop();
         } else {
             _rpc_channel_is_idle[id] = true;
-            _busy_channels--;
-            if (_finish_dependency && _busy_channels == 0) {
-                _finish_dependency->set_ready_to_finish();
-            }
+            _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
         }
 
         return Status::OK();
@@ -396,11 +391,8 @@ void ExchangeSinkBuffer<Parent>::_ended(InstanceLoId id) {
     } else {
         std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
         if (!_rpc_channel_is_idle[id]) {
-            _busy_channels--;
             _rpc_channel_is_idle[id] = true;
-            if (_finish_dependency && _busy_channels == 0) {
-                _finish_dependency->set_ready_to_finish();
-            }
+            _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
         }
     }
 }
@@ -417,11 +409,8 @@ void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
     if (!_rpc_channel_is_idle[id]) {
-        _busy_channels--;
         _rpc_channel_is_idle[id] = true;
-        if (_finish_dependency && _busy_channels == 0) {
-            _finish_dependency->set_ready_to_finish();
-        }
+        _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
     }
 }
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 9111f553b2..edcfa80bc2 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -195,7 +195,14 @@ public:
     }
 
     void set_query_statistics(QueryStatistics* statistics) { _statistics = statistics; }
 
+    void set_should_stop() {
+        _should_stop = true;
+        _set_ready_to_finish(_busy_channels == 0);
+    }
+
 private:
+    void _set_ready_to_finish(bool all_done);
+
     phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
             _instance_to_package_queue_mutex;
     // store data in non-broadcast shuffle
@@ -244,6 +251,7 @@ private:
     std::shared_ptr<Dependency> _queue_dependency = nullptr;
     std::shared_ptr<FinishDependency> _finish_dependency = nullptr;
     QueryStatistics* _statistics = nullptr;
+    std::atomic<bool> _should_stop {false};
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 2541849295..f5518a1553 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -173,13 +173,16 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     register_channels(_sink_buffer.get());
 
-    _exchange_sink_dependency = AndDependency::create_shared(_parent->operator_id());
-    _queue_dependency = ExchangeSinkQueueDependency::create_shared(_parent->operator_id());
+    _exchange_sink_dependency =
+            AndDependency::create_shared(_parent->operator_id(), _parent->node_id());
+    _queue_dependency =
+            ExchangeSinkQueueDependency::create_shared(_parent->operator_id(), _parent->node_id());
     _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
     _exchange_sink_dependency->add_child(_queue_dependency);
     if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
         !only_local_exchange) {
-        _broadcast_dependency = BroadcastDependency::create_shared(_parent->operator_id());
+        _broadcast_dependency =
+                BroadcastDependency::create_shared(_parent->operator_id(), _parent->node_id());
         _broadcast_dependency->set_available_block(config::num_broadcast_buffer);
         _broadcast_pb_blocks.reserve(config::num_broadcast_buffer);
         for (size_t i = 0; i < config::num_broadcast_buffer; i++) {
@@ -194,7 +197,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
         size_t dep_id = 0;
         _local_channels_dependency.resize(local_size);
         _wait_channel_timer.resize(local_size);
-        auto deps_for_channels = AndDependency::create_shared(_parent->operator_id());
+        auto deps_for_channels =
+                AndDependency::create_shared(_parent->operator_id(), _parent->node_id());
         for (auto channel : channels) {
             if (channel->is_local()) {
                 _local_channels_dependency[dep_id] = channel->get_local_channel_dependency();
@@ -225,6 +229,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
                 fmt::format("Crc32HashPartitioner({})", _partition_count));
     }
 
+    _finish_dependency->should_finish_after_check();
+
     return Status::OK();
 }
@@ -506,6 +512,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status)
             final_st = st;
         }
     }
+    local_state._sink_buffer->set_should_stop();
     return final_st;
 }
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 6b9d3b5e4b..9dc670cd66 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -67,7 +67,8 @@ private:
 class ExchangeSinkQueueDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency);
-    ExchangeSinkQueueDependency(int id) : WriteDependency(id, "ResultQueueDependency") {}
+    ExchangeSinkQueueDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "ResultQueueDependency") {}
     ~ExchangeSinkQueueDependency() override = default;
 
     void* shared_state() override { return nullptr; }
@@ -76,23 +77,23 @@ public:
 class BroadcastDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastDependency);
-    BroadcastDependency(int id) : WriteDependency(id, "BroadcastDependency"), _available_block(0) {}
+    BroadcastDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "BroadcastDependency"), _available_block(0) {}
     ~BroadcastDependency() override = default;
 
-    [[nodiscard]] WriteDependency* write_blocked_by() override {
-        if (config::enable_fuzzy_mode && _available_block == 0 &&
-            _should_log(_write_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
-                         << id();
-        }
-        return _available_block > 0 ? nullptr : this;
-    }
-
     void set_available_block(int available_block) { _available_block = available_block; }
 
-    void return_available_block() { _available_block++; }
+    void return_available_block() {
+        _available_block++;
+        WriteDependency::set_ready_for_write();
+    }
 
-    void take_available_block() { _available_block--; }
+    void take_available_block() {
+        auto old_value = _available_block.fetch_sub(1);
+        if (old_value == 1) {
+            WriteDependency::block_writing();
+        }
+    }
 
     void* shared_state() override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
     }
@@ -134,25 +135,11 @@ private:
 class LocalExchangeChannelDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
-    LocalExchangeChannelDependency(int id, std::shared_ptr<bool> mem_available)
-            : WriteDependency(id, "LocalExchangeChannelDependency"),
-              _mem_available(mem_available) {}
+    LocalExchangeChannelDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "LocalExchangeChannelDependency") {}
     ~LocalExchangeChannelDependency() override = default;
-
-    WriteDependency* write_blocked_by() override {
-        if (config::enable_fuzzy_mode && !_is_runnable() &&
-            _should_log(_write_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
-                         << id();
-        }
-        return _is_runnable() ? nullptr : this;
-    }
-
     void* shared_state() override { return nullptr; }
-
-private:
-    bool _is_runnable() const { return _ready_for_write || *_mem_available; }
-    std::shared_ptr<bool> _mem_available;
+    // TODO(gabriel): blocked by memory
 };
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 3213ed5577..4a29694a37 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -51,12 +51,13 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
             state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(),
             p.num_senders(), profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
-    source_dependency = AndDependency::create_shared(_parent->operator_id());
+    source_dependency = AndDependency::create_shared(_parent->operator_id(), _parent->node_id());
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
     metrics.resize(queues.size());
     for (size_t i = 0; i < queues.size(); i++) {
-        deps[i] = ExchangeDataDependency::create_shared(_parent->operator_id(), queues[i]);
+        deps[i] = ExchangeDataDependency::create_shared(_parent->operator_id(), _parent->node_id(),
+                                                        queues[i]);
         queues[i]->set_dependency(deps[i]);
         source_dependency->add_child(deps[i]);
     }
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index c41268f8ea..5d754747be 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -53,8 +53,9 @@ public:
 struct ExchangeDataDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ExchangeDataDependency);
-    ExchangeDataDependency(int id, vectorized::VDataStreamRecvr::SenderQueue* sender_queue)
-            : Dependency(id, "DataDependency"), _always_done(false) {}
+    ExchangeDataDependency(int id, int node_id,
+                           vectorized::VDataStreamRecvr::SenderQueue* sender_queue)
+            : Dependency(id, node_id, "DataDependency"), _always_done(false) {}
     void* shared_state() override { return nullptr; }
 
     void set_always_done() {
@@ -70,7 +71,7 @@ public:
         if (_always_done) {
             return;
         }
-        _ready_for_read = false;
+        Dependency::block_reading();
     }
 
 private:
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index cb63f64ab4..3d9827a27b 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -49,7 +49,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     _shared_hash_table_dependency =
-            SharedHashTableDependency::create_shared(_parent->operator_id());
+            SharedHashTableDependency::create_shared(_parent->operator_id(), _parent->node_id());
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
     if (p._is_broadcast_join && state->enable_share_hash_table_for_broadcast_join()) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 10056a30e7..a1815ca511 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -49,7 +49,8 @@ class HashJoinBuildSinkOperatorX;
 class SharedHashTableDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(SharedHashTableDependency);
-    SharedHashTableDependency(int id) : WriteDependency(id, "SharedHashTableDependency") {}
+    SharedHashTableDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "SharedHashTableDependency") {}
     ~SharedHashTableDependency() override = default;
 
     void* shared_state() override { return nullptr; }
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 4c6e21c5c6..26eee2161c 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -138,7 +138,8 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<MultiCastDataStreamerSourceOperatorX>();
-    static_cast<MultiCastDependency*>(_dependency)->set_consumer_id(p._consumer_id);
+    _shared_state->multi_cast_data_streamer.set_dep_by_sender_idx(
+            p._consumer_id, static_cast<MultiCastDependency*>(_dependency));
     _output_expr_contexts.resize(p._output_expr_contexts.size());
     for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
         RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 3929c6ced0..8d7a745a04 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -17,6 +17,7 @@
 
 #include "multi_cast_data_streamer.h"
 
+#include "pipeline/pipeline_x/dependency.h"
 #include "runtime/runtime_state.h"
 
 namespace doris::pipeline {
@@ -46,6 +47,9 @@ void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block
         }
     }
     *eos = _eos and pos_to_pull == _multi_cast_blocks.end();
+    if (pos_to_pull == _multi_cast_blocks.end()) {
+        _block_reading(sender_idx);
+    }
 }
 
 void MultiCastDataStreamer::close_sender(int sender_idx) {
@@ -63,6 +67,7 @@
         }
     }
     _closed_sender_count++;
+    _block_reading(sender_idx);
 }
 
 Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
@@ -87,10 +92,36 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block
     for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
         if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
             _sender_pos_to_read[i] = end;
+            _set_ready_for_read(i);
         }
     }
     _eos = eos;
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
+    if (_dependencies.empty()) {
+        return;
+    }
+    auto* dep = _dependencies[sender_idx];
+    DCHECK(dep);
+    dep->set_ready_for_read();
+}
+
+void MultiCastDataStreamer::_set_ready_for_read() {
+    for (auto* dep : _dependencies) {
+        DCHECK(dep);
+        dep->set_ready_for_read();
+    }
+}
+
+void MultiCastDataStreamer::_block_reading(int sender_idx) {
+    if (_dependencies.empty()) {
+        return;
+    }
+    auto* dep = _dependencies[sender_idx];
+    DCHECK(dep);
+    dep->block_reading();
+}
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 92c0e24079..4d03fe53b8 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -21,6 +21,7 @@
 
 namespace doris::pipeline {
 
+class MultiCastDependency;
 struct MultiCastBlock {
     MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size);
@@ -33,11 +34,16 @@ struct MultiCastBlock {
 // Shares the blocks pushed by one sink among multiple cast senders.
 class MultiCastDataStreamer {
 public:
-    MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count)
+    MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count,
+                          bool with_dependencies = false)
             : _row_desc(row_desc),
               _profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))),
              _cast_sender_count(cast_sender_count) {
         _sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end());
+        if (with_dependencies) {
+            _dependencies.resize(cast_sender_count, nullptr);
+        }
+
         _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
         _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT);
     };
@@ -65,9 +71,19 @@ public:
     void set_eos() {
         std::lock_guard<std::mutex> l(_mutex);
         _eos = true;
+        _set_ready_for_read();
+    }
+
+    void set_dep_by_sender_idx(int sender_idx, MultiCastDependency* dep) {
+        _dependencies[sender_idx] = dep;
+        _block_reading(sender_idx);
     }
 
 private:
+    void _set_ready_for_read(int sender_idx);
+    void _set_ready_for_read();
+    void _block_reading(int sender_idx);
+
     const RowDescriptor& _row_desc;
     RuntimeProfile* _profile;
     std::list<MultiCastBlock> _multi_cast_blocks;
@@ -80,5 +96,7 @@ private:
 
     RuntimeProfile::Counter* _process_rows;
     RuntimeProfile::Counter* _peak_mem_usage;
+
+    std::vector<MultiCastDependency*> _dependencies;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h
index 40b802c35e..9720e55efa 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -66,7 +66,7 @@ private:
     friend class PartitionSortSourceOperatorX;
     RuntimeProfile::Counter* _get_sorted_timer;
     RuntimeProfile::Counter* _get_next_timer;
-    int _sort_idx = 0;
+    std::atomic<int> _sort_idx = 0;
 };
 
 class PartitionSortSourceOperatorX final : public OperatorX<PartitionSortSourceLocalState> {
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index b65a5ba1b8..e27b0da1d3 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -65,12 +65,16 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
     RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
             state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true,
             state->execution_timeout()));
-    _result_sink_dependency = OrDependency::create_shared(_parent->operator_id());
-    _buffer_dependency = ResultBufferDependency::create_shared(_parent->operator_id());
-    _cancel_dependency = CancelDependency::create_shared(_parent->operator_id());
+    _result_sink_dependency =
+            OrDependency::create_shared(_parent->operator_id(), _parent->node_id());
+    _buffer_dependency =
+            ResultBufferDependency::create_shared(_parent->operator_id(), _parent->node_id());
+    _cancel_dependency =
+            CancelDependency::create_shared(_parent->operator_id(), _parent->node_id());
     _result_sink_dependency->add_child(_cancel_dependency);
     _result_sink_dependency->add_child(_buffer_dependency);
-    _queue_dependency = ResultQueueDependency::create_shared(_parent->operator_id());
+    _queue_dependency =
+            ResultQueueDependency::create_shared(_parent->operator_id(), _parent->node_id());
     _result_sink_dependency->add_child(_queue_dependency);
 
     ((PipBufferControlBlock*)_sender.get())
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index 311d2c7067..e117f55d25 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -46,7 +46,8 @@ public:
 class ResultBufferDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(ResultBufferDependency);
-    ResultBufferDependency(int id) : WriteDependency(id, "ResultBufferDependency") {}
+    ResultBufferDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "ResultBufferDependency") {}
     ~ResultBufferDependency() override = default;
 
     void* shared_state() override { return nullptr; }
@@ -55,7 +56,8 @@ public:
 class ResultQueueDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(ResultQueueDependency);
-    ResultQueueDependency(int id) : WriteDependency(id, "ResultQueueDependency") {}
+    ResultQueueDependency(int id, int node_id)
+            : WriteDependency(id, node_id, "ResultQueueDependency") {}
     ~ResultQueueDependency() override = default;
 
     void* shared_state() override { return nullptr; }
@@ -64,7 +66,9 @@ public:
 class CancelDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(CancelDependency);
-    CancelDependency(int id) : WriteDependency(id, "CancelDependency") { _ready_for_write = false; }
+    CancelDependency(int id, int node_id) : WriteDependency(id, node_id, "CancelDependency") {
+        _ready_for_write = false;
+    }
     ~CancelDependency() override = default;
 
     void* shared_state() override { return nullptr; }
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 601bf5b8b9..d2953d3593 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -122,11 +122,11 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info)
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
 
-    _source_dependency = OrDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
+    _source_dependency = OrDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
+                                                     PipelineXLocalState<>::_parent->node_id());
 
-    _open_dependency = OpenDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
-    _source_dependency->add_child(_open_dependency);
-    _eos_dependency = EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id());
+    _eos_dependency = EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(),
+                                                   PipelineXLocalState<>::_parent->node_id());
     _source_dependency->add_child(_eos_dependency);
     auto& p = _parent->cast<typename Derived::Parent>();
     set_scan_ranges(state, info.scan_ranges);
@@ -168,7 +168,7 @@ template <typename Derived>
 Status ScanLocalState<Derived>::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    if (_open_dependency == nullptr) {
+    if (_opened) {
         return Status::OK();
     }
     RETURN_IF_ERROR(_acquire_runtime_filter());
@@ -177,12 +177,12 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     auto status =
             _eos_dependency->read_blocked_by() == nullptr ? Status::OK() : _prepare_scanners();
     if (_scanner_ctx) {
+        _finish_dependency->should_finish_after_check();
         DCHECK(_eos_dependency->read_blocked_by() != nullptr && _num_scanners->value() > 0);
         RETURN_IF_ERROR(_scanner_ctx->init());
         RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
     }
-    _source_dependency->remove_first_child();
-    _open_dependency = nullptr;
+    _opened = true;
     return status;
 }
@@ -1181,11 +1181,10 @@ Status ScanLocalState<Derived>::_start_scanners(
     _scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners,
                                                     p.limit(), state()->scan_queue_mem_limit(),
                                                     p._col_distribute_ids, 1);
-    _scanner_done_dependency =
-            ScannerDoneDependency::create_shared(p.operator_id(), _scanner_ctx.get());
+    _scanner_done_dependency = ScannerDoneDependency::create_shared(p.operator_id(), p.node_id());
     _source_dependency->add_child(_scanner_done_dependency);
     _data_ready_dependency =
-            DataReadyDependency::create_shared(p.operator_id(), _scanner_ctx.get());
+            DataReadyDependency::create_shared(p.operator_id(), p.node_id(), _scanner_ctx.get());
     _source_dependency->add_child(_data_ready_dependency);
 
     _scanner_ctx->set_dependency(_data_ready_dependency, _scanner_done_dependency,
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 68d006006f..66543dc7ff 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -56,58 +56,34 @@ public:
     Status try_close(RuntimeState* state) override;
 };
 
-struct OpenDependency final : public Dependency {
-public:
-    ENABLE_FACTORY_CREATOR(OpenDependency);
-    OpenDependency(int id) : Dependency(id, "OpenDependency") {}
-    void* shared_state() override { return nullptr; }
-    [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; }
-    [[nodiscard]] int64_t read_watcher_elapse_time() override { return 0; }
-};
-
 class EosDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(EosDependency);
-    EosDependency(int id) : Dependency(id, "EosDependency") {}
+    EosDependency(int id, int node_id) : Dependency(id, node_id, "EosDependency") {}
     void* shared_state() override { return nullptr; }
 };
 
 class ScannerDoneDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(ScannerDoneDependency);
-    ScannerDoneDependency(int id, vectorized::ScannerContext* scanner_ctx)
-            : Dependency(id, "ScannerDoneDependency"), _scanner_ctx(scanner_ctx) {}
+    ScannerDoneDependency(int id, int node_id) : Dependency(id, node_id, "ScannerDoneDependency") {}
     void* shared_state() override { return nullptr; }
-    [[nodiscard]] Dependency* read_blocked_by() override {
-        return _scanner_ctx->done() ? nullptr : this;
-    }
-    void set_ready_for_read() override {
-        // ScannerContext is set done outside this function now and only stop watcher here.
-        _read_dependency_watcher.stop();
-    }
-
-private:
-    vectorized::ScannerContext* _scanner_ctx;
 };
 
 class DataReadyDependency final : public Dependency {
 public:
     ENABLE_FACTORY_CREATOR(DataReadyDependency);
-    DataReadyDependency(int id, vectorized::ScannerContext* scanner_ctx)
-            : Dependency(id, "DataReadyDependency"), _scanner_ctx(scanner_ctx) {}
+    DataReadyDependency(int id, int node_id, vectorized::ScannerContext* scanner_ctx)
+            : Dependency(id, node_id, "DataReadyDependency"), _scanner_ctx(scanner_ctx) {}
     void* shared_state() override { return nullptr; }
 
-    [[nodiscard]] Dependency* read_blocked_by() override {
+    // TODO(gabriel):
+    [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override {
         if (_scanner_ctx->get_num_running_scanners() == 0 && _scanner_ctx->should_be_scheduled()) {
             _scanner_ctx->reschedule_scanner_ctx();
         }
-        if (config::enable_fuzzy_mode && !_ready_for_read &&
-            _should_log(_read_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
-                         << id();
-        }
-        return _ready_for_read ? nullptr : this;
+        return Dependency::read_blocked_by(task);
     }
 
 private:
@@ -151,7 +127,7 @@ protected:
 
     virtual Status _init_profile() = 0;
 
-    std::shared_ptr<OpenDependency> _open_dependency;
+    std::atomic<bool> _opened {false};
     std::shared_ptr<EosDependency> _eos_dependency;
     std::shared_ptr<OrDependency> _source_dependency;
     std::shared_ptr<ScannerDoneDependency> _scanner_done_dependency;
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 36503df2db..de9b48362c 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -72,6 +72,7 @@ template class SetProbeSinkOperator<false>;
 
 template <bool is_intersect>
 Status SetProbeSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState* state) {
+    DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>::_name = "SET_PROBE_SINK_OPERATOR";
     const std::vector<std::vector<TExpr>>* result_texpr_lists;
 
     // Create result_expr_ctx_lists_ from thrift exprs.
@@ -109,11 +110,6 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
 
-    if (_cur_child_id > 1) {
-        CHECK(local_state._shared_state->probe_finished_children_index[_cur_child_id - 1])
-                << fmt::format("child with id: {} should be probed first", _cur_child_id);
-    }
-
     auto probe_rows = in_block->rows();
     if (probe_rows > 0) {
         RETURN_IF_ERROR(_extract_probe_column(local_state, *in_block, local_state._probe_columns,
@@ -202,7 +198,6 @@ void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
         SetProbeSinkLocalState<is_intersect>& local_state) {
     auto& valid_element_in_hash_tbl = local_state._shared_state->valid_element_in_hash_tbl;
     auto& hash_table_variants = local_state._shared_state->hash_table_variants;
-    auto& probe_finished_children_index = local_state._shared_state->probe_finished_children_index;
 
     if (_cur_child_id != (local_state._shared_state->child_quantity - 1)) {
         _refresh_hash_table(local_state);
@@ -223,7 +218,7 @@
     } else {
         local_state._dependency->set_ready_for_read();
     }
-    probe_finished_children_index[_cur_child_id] = true;
+    local_state._shared_state->set_probe_finished_children(_cur_child_id);
 }
 
 template <bool is_intersect>
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
index 90cc792d47..52bd8aa3cf 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -90,7 +90,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo
                 },
                 *local_state._shared_state->hash_table_variants);
     }
-    local_state._shared_state->probe_finished_children_index[_cur_child_id] = true;
+    local_state._shared_state->set_probe_finished_children(_cur_child_id);
     if (_child_quantity == 1) {
         local_state._dependency->set_ready_for_read();
     }
@@ -171,7 +171,6 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState
     }
     _shared_state->child_quantity = parent._child_quantity;
-    _shared_state->probe_finished_children_index.assign(parent._child_quantity, false);
 
     auto& child_exprs_lists = _shared_state->child_exprs_lists;
     DCHECK(child_exprs_lists.size() == 0 || child_exprs_lists.size() == parent._child_quantity);
@@ -192,6 +191,7 @@
 
 template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState* state) {
+    Base::_name = "SET_SINK_OPERATOR";
     const std::vector<std::vector<TExpr>>* result_texpr_lists;
 
     // Create result_expr_ctx_lists_ from thrift exprs.
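The set-operator hunks above replace the shared `probe_finished_children_index` flag vector (and its `CHECK` on probe order) with a chain of dependencies: when child N finishes its probe, `set_probe_finished_children(N)` marks child N+1's write dependency ready, so the next probe sink is scheduled by the dependency machinery instead of being polled. A minimal sketch of that chaining follows; `ChildDep` and `SetSharedStateSketch` are simplified stand-ins for Doris's `SetDependency`/`WriteDependency` and `SetSharedState`, not the actual classes:

```cpp
#include <atomic>
#include <vector>

// Stand-in for a WriteDependency: only the readiness flag matters here.
struct ChildDep {
    std::atomic<bool> ready_for_write {false};
    void set_ready_for_write() { ready_for_write = true; } // real code also wakes blocked tasks
};

struct SetSharedStateSketch {
    explicit SetSharedStateSketch(int num_children)
            : probe_finished_children_dependency(num_children, nullptr) {}

    // Child `child_id` finished probing: unblock the sink of child `child_id + 1`,
    // so the probe order (child 1, then 2, ...) is enforced by scheduling rather
    // than by asserting on a shared flag vector.
    void set_probe_finished_children(int child_id) {
        if (child_id + 1 < (int)probe_finished_children_dependency.size()) {
            probe_finished_children_dependency[child_id + 1]->set_ready_for_write();
        }
    }

    std::vector<ChildDep*> probe_finished_children_dependency;
};
```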
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
index d3840285c3..6e5e014036 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -52,8 +52,8 @@ template class SetSourceOperator<false>;
 template <bool is_intersect>
 Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, LocalStateInfo& info) {
     std::shared_ptr<typename SetDependency::SharedState> ss = nullptr;
-    ss.reset(new typename SetDependency::SharedState());
     auto& deps = info.dependencys;
+    ss.reset(new typename SetDependency::SharedState(deps.size()));
     for (auto& dep : deps) {
         ((SetDependency*)dep.get())->set_shared_state(ss);
     }
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 8e5179734c..d1515faa1b 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -114,7 +114,8 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
         DCHECK(deps.size() == 1);
         DCHECK(deps.front() == nullptr);
         //child_count == 0 , we need to create a UnionDependency
-        deps.front() = std::make_shared<UnionDependency>(_parent->operator_id());
+        deps.front() =
+                std::make_shared<UnionDependency>(_parent->operator_id(), _parent->node_id());
         ((UnionDependency*)deps.front().get())->set_shared_state(ss);
     }
     RETURN_IF_ERROR(Base::init(state, info));
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 9610122bc0..be4fd6c5ae 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -126,7 +126,8 @@ PipelineFragmentContext::PipelineFragmentContext(
           _call_back(call_back),
           _is_report_on_cancel(true),
           _report_status_cb(report_status_cb),
-          _group_commit(group_commit) {
+          _group_commit(group_commit),
+          _create_time(MonotonicNanos()) {
     if (_query_ctx->get_task_group()) {
         _task_group_entity = _query_ctx->get_task_group()->task_entity();
     }
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 85834e513f..aa2f139c50 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -109,8 +109,6 @@ public:
 
     void close_a_pipeline();
 
-    std::string to_http_path(const std::string& file_name);
-
     void set_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
         _merge_controller_handler = handler;
@@ -145,6 +143,10 @@ public:
     }
     void refresh_next_report_time();
 
+    virtual std::string debug_string() { return ""; }
+
+    uint64_t create_time() const { return _create_time; }
+
 protected:
     Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state);
     Status _build_pipelines(ExecNode*, PipelinePtr);
@@ -222,6 +224,8 @@ private:
     static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
     std::vector<std::unique_ptr<PipelineTask>> _tasks;
     bool _group_commit;
+
+    uint64_t _create_time;
 };
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 690c4e3419..947f341880 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -251,6 +251,11 @@ public:
 
     void set_parent_profile(RuntimeProfile* profile) { _parent_profile = profile; }
 
+    virtual bool is_pipelineX() const { return false; }
+
+    bool is_running() { return _running.load(); }
+    void set_running(bool running) { _running = running; }
+
 protected:
     void _finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
@@ -359,5 +364,7 @@ private:
     OperatorPtr _source;
     OperatorPtr _root;
     OperatorPtr _sink;
+
+    std::atomic<bool> _running {false};
 };
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index 32bd06f598..a06fadef03 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -21,10 +21,177 @@
 #include <memory>
 
 #include "common/logging.h"
+#include "pipeline/pipeline_fragment_context.h"
+#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
 #include "runtime/memory/mem_tracker.h"
 
 namespace doris::pipeline {
 
+void Dependency::add_block_task(PipelineXTask* task) {
+    // TODO(gabriel): support read dependency
+    if (!_blocked_task.empty() && _blocked_task[_blocked_task.size() - 1] == task) {
+        return;
+    }
+    _blocked_task.push_back(task);
+}
+
+void WriteDependency::add_write_block_task(PipelineXTask* task) {
+    DCHECK(_write_blocked_task.empty() ||
+           _write_blocked_task[_write_blocked_task.size() - 1] != task)
+            << "Duplicate task: " << task->debug_string();
+    _write_blocked_task.push_back(task);
+}
+
+void FinishDependency::add_block_task(PipelineXTask* task) {
+    DCHECK(_finish_blocked_task.empty() ||
+           _finish_blocked_task[_finish_blocked_task.size() - 1] != task)
+            << "Duplicate task: " << task->debug_string();
+    _finish_blocked_task.push_back(task);
+}
+
+void RuntimeFilterDependency::add_block_task(PipelineXTask* task) {
+    DCHECK(_filter_blocked_task.empty() ||
+           _filter_blocked_task[_filter_blocked_task.size() - 1] != task)
+            << "Duplicate task: " << task->debug_string();
+    DCHECK(_blocked_by_rf) << "It is not allowed: task: " << task->debug_string()
+                           << " \n dependency: " << debug_string()
+                           << " \n state: " << get_state_name(task->get_state());
+    _filter_blocked_task.push_back(task);
+}
+
+void Dependency::set_ready_for_read() {
+    if (_ready_for_read) {
+        return;
+    }
+    _read_dependency_watcher.stop();
+    std::vector<PipelineXTask*> local_block_task {};
+    {
+        std::unique_lock<std::mutex> lc(_task_lock);
+        if (_ready_for_read) {
+            return;
+        }
+        _ready_for_read = true;
+        local_block_task.swap(_blocked_task);
+    }
+    for (auto* task : local_block_task) {
+        task->try_wake_up(this);
+    }
+}
+
+void WriteDependency::set_ready_for_write() {
+    if (_ready_for_write) {
+        return;
+    }
+    _write_dependency_watcher.stop();
+
+    std::vector<PipelineXTask*> local_block_task {};
+    {
+        std::unique_lock<std::mutex> lc(_task_lock);
+        if (_ready_for_write) {
+            return;
+        }
+        _ready_for_write = true;
+        local_block_task.swap(_write_blocked_task);
+    }
+    for (auto* task : local_block_task) {
+        task->try_wake_up(this);
+    }
+}
+
+void FinishDependency::set_ready_to_finish() {
+    if (_ready_to_finish) {
+        return;
+    }
+    _finish_dependency_watcher.stop();
+
+    std::vector<PipelineXTask*> local_block_task {};
+    {
+        std::unique_lock<std::mutex> lc(_task_lock);
+        if (_ready_to_finish) {
+            return;
+        }
+        _ready_to_finish = true;
+        local_block_task.swap(_finish_blocked_task);
+    }
+    for (auto* task : local_block_task) {
+        task->try_wake_up(this);
+    }
+}
+
+Dependency* Dependency::read_blocked_by(PipelineXTask* task) {
+    if (config::enable_fuzzy_mode && !_ready_for_read &&
+        _should_log(_read_dependency_watcher.elapsed_time())) {
+        LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
+                     << _node_id << " block tasks: " << _blocked_task.size()
+                     << " write block tasks: "
+                     << (is_write_dependency()
+                                 ? ((WriteDependency*)this)->_write_blocked_task.size()
+                                 : 0)
+                     << " write done: "
+                     << (is_write_dependency() ? ((WriteDependency*)this)->_ready_for_write.load()
+                                               : true)
+                     << " task: " << (task ? task->fragment_context()->debug_string() : "");
+    }
+
+    std::unique_lock<std::mutex> lc(_task_lock);
+    auto ready_for_read = _ready_for_read.load();
+    if (!ready_for_read && task) {
+        add_block_task(task);
+    }
+    return ready_for_read ? nullptr : this;
+}
+
+RuntimeFilterDependency* RuntimeFilterDependency::filter_blocked_by(PipelineXTask* task) {
+    if (!_blocked_by_rf) {
+        return nullptr;
+    }
+    std::unique_lock<std::mutex> lc(_task_lock);
+    if (*_blocked_by_rf) {
+        if (LIKELY(task)) {
+            add_block_task(task);
+        }
+        return this;
+    }
+    return nullptr;
+}
+
+FinishDependency* FinishDependency::finish_blocked_by(PipelineXTask* task) {
+    std::unique_lock<std::mutex> lc(_task_lock);
+    if (!_ready_to_finish && task) {
+        add_block_task(task);
+    }
+    return _ready_to_finish ? nullptr : this;
+}
+
+WriteDependency* WriteDependency::write_blocked_by(PipelineXTask* task) {
+    std::unique_lock<std::mutex> lc(_task_lock);
+    const auto ready_for_write = _ready_for_write.load();
+    if (!ready_for_write && task) {
+        add_write_block_task(task);
+    }
+    return ready_for_write ? nullptr : this;
+}
+
+Dependency* OrDependency::read_blocked_by(PipelineXTask* task) {
+    // TODO(gabriel):
+    for (auto& child : _children) {
+        auto* cur_res = child->read_blocked_by(nullptr);
+        if (cur_res == nullptr) {
+            return nullptr;
+        }
+    }
+    return this;
+}
+
+WriteDependency* OrDependency::write_blocked_by(PipelineXTask* task) {
+    for (auto& child : _children) {
+        CHECK(child->is_write_dependency());
+        auto* cur_res = ((WriteDependency*)child.get())->write_blocked_by(nullptr);
+        if (cur_res == nullptr) {
+            return nullptr;
+        }
+    }
+    return this;
+}
+
 template Status HashJoinDependency::extract_join_column<true>(
         vectorized::Block&, COW<vectorized::IColumn>::mutable_ptr<vectorized::ColumnVector<vectorized::UInt8>>&,
@@ -39,17 +206,43 @@ template Status HashJoinDependency::extract_join_column<false>(
 std::string Dependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, done={}",
-                   std::string(indentation_level * 2, ' '), _name, _id,
-                   read_blocked_by() == nullptr);
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, _ready_for_read={}",
+                   std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
+                   _ready_for_read);
     return fmt::to_string(debug_string_buffer);
 }
 
+std::string WriteDependency::debug_string(int indentation_level) {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer,
+                   "{}{}: id={}, read block task = {},write block "
+                   "task = {}, _ready_for_write = {}, _ready_for_read = {}",
+                   std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
+                   _write_blocked_task.size(), _ready_for_write, _ready_for_read);
+    return fmt::to_string(debug_string_buffer);
+}
+
+std::string FinishDependency::debug_string(int indentation_level) {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {}, _ready_to_finish = {}",
+                   std::string(indentation_level * 2, ' '), _name, _node_id,
+                   _finish_blocked_task.size(), _ready_to_finish);
+    return fmt::to_string(debug_string_buffer);
+}
+
+std::string RuntimeFilterDependency::debug_string(int indentation_level) {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(
+            debug_string_buffer, "{}{}: id={}, block task = {}, _blocked_by_rf = {}, _filters = {}",
+            std::string(indentation_level * 2, ' '), _name, _node_id, _filter_blocked_task.size(),
+            _blocked_by_rf ? _blocked_by_rf->load() : false, _filters);
+    return fmt::to_string(debug_string_buffer);
+}
+
 std::string AndDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, done={}, children=[",
-                   std::string(indentation_level * 2, ' '), _name, _id,
-                   read_blocked_by() == nullptr);
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
+                   std::string(indentation_level * 2, ' '), _name, _node_id);
     for (auto& child : _children) {
         fmt::format_to(debug_string_buffer, "{}, \n", child->debug_string(indentation_level = 1));
     }
@@ -59,9 +252,8 @@ std::string AndDependency::debug_string(int indentation_level) {
 
 std::string OrDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, done={}, children=[",
-                   std::string(indentation_level * 2, ' '), _name, _id,
-                   read_blocked_by() == nullptr);
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
+                   std::string(indentation_level * 2, ' '), _name, _node_id);
     for (auto& child : _children) {
         fmt::format_to(debug_string_buffer, "{}, \n", child->debug_string(indentation_level = 1));
     }
@@ -294,6 +486,12 @@ std::vector<uint16_t> HashJoinDependency::convert_block_to_null(vectorized::Bloc
     return results;
 }
 
+void SetSharedState::set_probe_finished_children(int child_id) {
+    if (child_id + 1 < probe_finished_children_dependency.size()) {
+        probe_finished_children_dependency[child_id + 1]->set_ready_for_write();
+    }
+}
+
 template <bool BuildSide>
 Status HashJoinDependency::extract_join_column(vectorized::Block& block,
                                                vectorized::ColumnUInt8::MutablePtr& null_map,
@@ -433,9 +631,17 @@ void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
 }
 
 void RuntimeFilterDependency::sub_filters() {
-    _filters--;
-    if (_filters == 0) {
-        *_blocked_by_rf = false;
+    auto value = _filters.fetch_sub(1);
+    if (value == 1) {
+        std::vector<PipelineXTask*> local_block_task {};
+        {
+            std::unique_lock<std::mutex> lc(_task_lock);
+            *_blocked_by_rf = false;
+            local_block_task.swap(_filter_blocked_task);
+        }
+        for (auto* task : local_block_task) {
+            task->try_wake_up(this);
+        }
     }
 }
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index e9635253b2..11a8975b30 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -40,19 +40,22 @@
 #include "vec/exec/vanalytic_eval_node.h"
 #include "vec/exec/vpartition_sort_node.h"
 
-namespace doris {
-namespace pipeline {
+namespace doris::pipeline {
 
 class Dependency;
+class PipelineXTask;
 using DependencySPtr = std::shared_ptr<Dependency>;
 
-static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 10 * 1000L * 1000L * 1000L;
-static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 5 * 1000L * 1000L * 1000L;
+static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 60 * 1000L * 1000L * 1000L;
+static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L;
 static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
 
+
 class Dependency : public std::enable_shared_from_this<Dependency> {
 public:
-    Dependency(int id, std::string name) : _id(id), _name(name), _ready_for_read(false) {}
+    Dependency(int id, int node_id, std::string name)
+            : _id(id), _node_id(node_id), _name(std::move(name)), _ready_for_read(false) {}
     virtual ~Dependency() = default;
 
+    virtual bool is_or_dep() { return false; }
     [[nodiscard]] int id() const { return _id; }
     [[nodiscard]] virtual std::string name() const { return _name; }
     virtual void* shared_state() = 0;
@@ -72,35 +75,19 @@ public:
     }
 
     // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
-    [[nodiscard]] virtual Dependency* read_blocked_by() {
-        if (config::enable_fuzzy_mode && !_ready_for_read &&
-            _should_log(_read_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
-                         << id();
-        }
-        return _ready_for_read ? nullptr : this;
-    }
+    [[nodiscard]] virtual Dependency* read_blocked_by(PipelineXTask* task = nullptr);
 
     // Notify downstream pipeline tasks this dependency is ready.
-    virtual void set_ready_for_read() {
-        if (_ready_for_read) {
-            return;
-        }
-        _read_dependency_watcher.stop();
-        _ready_for_read = true;
-    }
+    virtual void set_ready_for_read();
 
     // Notify downstream pipeline tasks this dependency is blocked.
     virtual void block_reading() { _ready_for_read = false; }
 
     void set_parent(std::weak_ptr<Dependency> parent) { _parent = parent; }
 
-    void add_child(std::shared_ptr<Dependency> child) {
-        _children.push_back(child);
-        child->set_parent(weak_from_this());
-    }
+    virtual void add_child(std::shared_ptr<Dependency> child) { _children.push_back(child); }
 
-    void remove_first_child() { _children.erase(_children.begin()); }
+    virtual void add_block_task(PipelineXTask* task);
 
 protected:
     bool _should_log(uint64_t cur_time) {
@@ -115,6 +102,7 @@ protected:
     }
 
     int _id;
+    const int _node_id;
     std::string _name;
     std::atomic<bool> _ready_for_read;
     MonotonicStopWatch _read_dependency_watcher;
@@ -124,15 +112,16 @@ protected:
     std::list<std::shared_ptr<Dependency>> _children;
 
     uint64_t _last_log_time = 0;
+    std::mutex _task_lock;
+    std::vector<PipelineXTask*> _blocked_task;
 };
 
 class WriteDependency : public Dependency {
 public:
-    WriteDependency(int id, std::string name) : Dependency(id, name), _ready_for_write(true) {}
+    WriteDependency(int id, int node_id, std::string name) : Dependency(id, node_id, name) {}
     ~WriteDependency() override = default;
 
     bool is_write_dependency() override { return true; }
-
     void start_write_watcher() {
         for (auto& child : _children) {
             CHECK(child->is_write_dependency());
@@ -145,36 +134,30 @@ public:
         return _write_dependency_watcher.elapsed_time();
     }
 
-    [[nodiscard]] virtual WriteDependency* write_blocked_by() {
-        if (config::enable_fuzzy_mode && !_ready_for_write &&
-            _should_log(_write_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
-                         << id();
-        }
-        return _ready_for_write ? nullptr : this;
-    }
+    [[nodiscard]] virtual WriteDependency* write_blocked_by(PipelineXTask* task);
 
-    virtual void set_ready_for_write() {
-        if (_ready_for_write) {
-            return;
-        }
-        _write_dependency_watcher.stop();
-        _ready_for_write = true;
-    }
+    virtual void set_ready_for_write();
 
     virtual void block_writing() { _ready_for_write = false; }
 
+    std::string debug_string(int indentation_level = 0) override;
+    void add_write_block_task(PipelineXTask* task);
+
 protected:
-    std::atomic<bool> _ready_for_write;
+    friend class Dependency;
+    std::atomic<bool> _ready_for_write {true};
     MonotonicStopWatch _write_dependency_watcher;
+
+private:
+    std::vector<PipelineXTask*> _write_blocked_task;
 };
 
 class FinishDependency final : public Dependency {
 public:
-    FinishDependency(int id, int node_id, std::string name)
-            : Dependency(id, name), _ready_to_finish(true), _node_id(node_id) {}
+    FinishDependency(int id, int node_id, std::string name) : Dependency(id, node_id, name) {}
     ~FinishDependency() override = default;
 
+    void should_finish_after_check() { _ready_to_finish = false; }
     void start_finish_watcher() {
         for (auto& child : _children) {
             ((FinishDependency*)child.get())->start_finish_watcher();
@@ -186,31 +169,19 @@ public:
         return _finish_dependency_watcher.elapsed_time();
     }
 
-    [[nodiscard]] FinishDependency* finish_blocked_by() {
-        if (config::enable_fuzzy_mode && !_ready_to_finish &&
-            _should_log(_finish_dependency_watcher.elapsed_time())) {
-            LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
-                         << _node_id;
-        }
-        return _ready_to_finish ? nullptr : this;
-    }
+    [[nodiscard]] FinishDependency* finish_blocked_by(PipelineXTask* task);
 
-    void set_ready_to_finish() {
-        if (_ready_to_finish) {
-            return;
-        }
-        _finish_dependency_watcher.stop();
-        _ready_to_finish = true;
-    }
-
-    void block_finishing() { _ready_to_finish = false; }
+    void set_ready_to_finish();
 
     void* shared_state() override { return nullptr; }
+    std::string debug_string(int indentation_level = 0) override;
 
-protected:
-    std::atomic<bool> _ready_to_finish;
+    void add_block_task(PipelineXTask* task) override;
+
+private:
+    std::atomic<bool> _ready_to_finish {true};
     MonotonicStopWatch _finish_dependency_watcher;
-    const int _node_id;
+    std::vector<PipelineXTask*> _finish_blocked_task;
 };
 
 class RuntimeFilterDependency;
@@ -246,37 +217,34 @@ private:
     const int32_t _wait_time_ms;
     IRuntimeFilter* _runtime_filter;
 };
+
 class RuntimeFilterDependency final : public Dependency {
 public:
     RuntimeFilterDependency(int id, int node_id, std::string name)
-            : Dependency(id, name), _node_id(node_id) {}
-
-    RuntimeFilterDependency* filter_blocked_by() {
-        if (!_blocked_by_rf) {
-            return nullptr;
-        }
-        if (*_blocked_by_rf) {
-            return this;
-        }
-        return nullptr;
-    }
+            : Dependency(id, node_id, name) {}
+    RuntimeFilterDependency* filter_blocked_by(PipelineXTask* task);
     void* shared_state() override { return nullptr; }
     void add_filters(IRuntimeFilter* runtime_filter);
     void sub_filters();
     void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
         _blocked_by_rf = blocked_by_rf;
     }
+    std::string debug_string(int indentation_level = 0) override;
+
+    void add_block_task(PipelineXTask* task) override;
 
 protected:
-    const int _node_id;
     std::atomic_int _filters;
     std::shared_ptr<std::atomic_bool> _blocked_by_rf;
+
+private:
+    std::vector<PipelineXTask*> _filter_blocked_task;
 };
 
 class AndDependency final : public WriteDependency {
 public:
     ENABLE_FACTORY_CREATOR(AndDependency);
-    AndDependency(int id) : WriteDependency(id, "AndDependency") {}
+    AndDependency(int id, int node_id) : WriteDependency(id, node_id, "AndDependency") {}
[[nodiscard]] std::string name() const override { fmt::memory_buffer debug_string_buffer; @@ -292,19 +260,19 @@ public: std::string debug_string(int indentation_level = 0) override; - [[nodiscard]] Dependency* read_blocked_by() override { + [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { for (auto& child : _children) { - if (auto* dep = child->read_blocked_by()) { + if (auto* dep = child->read_blocked_by(task)) { return dep; } } return nullptr; } - [[nodiscard]] WriteDependency* write_blocked_by() override { + [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override { for (auto& child : _children) { CHECK(child->is_write_dependency()); - if (auto* dep = ((WriteDependency*)child.get())->write_blocked_by()) { + if (auto* dep = ((WriteDependency*)child.get())->write_blocked_by(task)) { return dep; } } @@ -315,7 +283,7 @@ public: class OrDependency final : public WriteDependency { public: ENABLE_FACTORY_CREATOR(OrDependency); - OrDependency(int id) : WriteDependency(id, "OrDependency") {} + OrDependency(int id, int node_id) : WriteDependency(id, node_id, "OrDependency") {} [[nodiscard]] std::string name() const override { fmt::memory_buffer debug_string_buffer; @@ -331,59 +299,32 @@ public: std::string debug_string(int indentation_level = 0) override; - [[nodiscard]] Dependency* read_blocked_by() override { - Dependency* res = nullptr; - for (auto& child : _children) { - auto* cur_res = child->read_blocked_by(); - if (cur_res == nullptr) { - return nullptr; - } else { - res = cur_res; - } - } - return res; - } + bool is_or_dep() override { return true; } - [[nodiscard]] WriteDependency* write_blocked_by() override { - WriteDependency* res = nullptr; - for (auto& child : _children) { - CHECK(child->is_write_dependency()); - auto* cur_res = ((WriteDependency*)child.get())->write_blocked_by(); - if (cur_res == nullptr) { - return nullptr; - } else { - res = cur_res; - } - } - return res; + [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override; + + [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override; + + void add_child(std::shared_ptr child) override { + WriteDependency::add_child(child); + child->set_parent(weak_from_this()); } }; struct FakeSharedState {}; struct FakeDependency final : public WriteDependency { public: - FakeDependency(int id) : WriteDependency(id, "FakeDependency") {} + FakeDependency(int id, int node_id) : WriteDependency(id, node_id, "FakeDependency") {} using SharedState = FakeSharedState; void* shared_state() override { return nullptr; } - [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; } - [[nodiscard]] WriteDependency* write_blocked_by() override { return nullptr; } + [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { return nullptr; } + [[nodiscard]] WriteDependency* write_blocked_by(PipelineXTask* task) override { + return nullptr; + } [[nodiscard]] int64_t read_watcher_elapse_time() override { return 0; } [[nodiscard]] int64_t write_watcher_elapse_time() override { return 0; } }; -class AsyncWriterSinkDependency : public WriteDependency { -public: - AsyncWriterSinkDependency(int id) : WriteDependency(id, "AsyncWriterSinkDependency") {} - using SharedState = FakeSharedState; - void* shared_state() override { return nullptr; } - [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; } - [[nodiscard]] WriteDependency* write_blocked_by() override { return _call_func(); } - void set_write_blocked_by(std::function call_func) 
{ - _call_func = call_func; - } - std::function _call_func; -}; - struct AggSharedState { public: AggSharedState() { @@ -412,7 +353,7 @@ public: class AggDependency final : public WriteDependency { public: using SharedState = AggSharedState; - AggDependency(int id) : WriteDependency(id, "AggDependency") { + AggDependency(int id, int node_id) : WriteDependency(id, node_id, "AggDependency") { _mem_tracker = std::make_unique("AggregateOperator:"); } ~AggDependency() override = default; @@ -421,20 +362,20 @@ public: if (_is_streaming_agg_state()) { if (_agg_state.data_queue->_cur_blocks_nums_in_queue[0] == 0 && !_agg_state.data_queue->_is_finished[0]) { - _ready_for_read = false; + Dependency::block_reading(); } } else { - _ready_for_read = false; + Dependency::block_reading(); } } void block_writing() override { if (_is_streaming_agg_state()) { if (!_agg_state.data_queue->has_enough_space_to_push()) { - _ready_for_write = false; + WriteDependency::block_writing(); } } else { - _ready_for_write = false; + WriteDependency::block_writing(); } } @@ -518,7 +459,7 @@ public: class SortDependency final : public WriteDependency { public: using SharedState = SortSharedState; - SortDependency(int id) : WriteDependency(id, "SortDependency") {} + SortDependency(int id, int node_id) : WriteDependency(id, node_id, "SortDependency") {} ~SortDependency() override = default; void* shared_state() override { return (void*)&_sort_state; }; @@ -538,15 +479,15 @@ public: class UnionDependency final : public WriteDependency { public: using SharedState = UnionSharedState; - UnionDependency(int id) : WriteDependency(id, "UnionDependency") {} + UnionDependency(int id, int node_id) : WriteDependency(id, node_id, "UnionDependency") {} ~UnionDependency() override = default; + void* shared_state() override { return (void*)_union_state.get(); } void set_shared_state(std::shared_ptr union_state) { _union_state = union_state; } - void set_ready_for_write() override {} - void set_ready_for_read() override {} - [[nodiscard]] Dependency* read_blocked_by() override { + + [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { if (_union_state->child_count() == 0) { return nullptr; } @@ -566,27 +507,20 @@ private: struct MultiCastSharedState { public: MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count) - : multi_cast_data_streamer(row_desc, pool, cast_sender_count) {} + : multi_cast_data_streamer(row_desc, pool, cast_sender_count, true) {} pipeline::MultiCastDataStreamer multi_cast_data_streamer; }; class MultiCastDependency final : public WriteDependency { public: using SharedState = MultiCastSharedState; - MultiCastDependency(int id) : WriteDependency(id, "MultiCastDependency") {} + MultiCastDependency(int id, int node_id) + : WriteDependency(id, node_id, "MultiCastDependency") {} ~MultiCastDependency() override = default; void* shared_state() override { return (void*)_multi_cast_state.get(); }; void set_shared_state(std::shared_ptr multi_cast_state) { _multi_cast_state = multi_cast_state; } - WriteDependency* read_blocked_by() override { - if (_multi_cast_state->multi_cast_data_streamer.can_read(_consumer_id)) { - return nullptr; - } - return this; - } - int _consumer_id {}; - void set_consumer_id(int consumer_id) { _consumer_id = consumer_id; } private: std::shared_ptr _multi_cast_state; @@ -617,7 +551,7 @@ public: class AnalyticDependency final : public WriteDependency { public: using SharedState = AnalyticSharedState; - AnalyticDependency(int id) : 
WriteDependency(id, "AnalyticDependency") {} + AnalyticDependency(int id, int node_id) : WriteDependency(id, node_id, "AnalyticDependency") {} ~AnalyticDependency() override = default; void* shared_state() override { return (void*)&_analytic_state; }; @@ -675,7 +609,7 @@ struct HashJoinSharedState : public JoinSharedState { class HashJoinDependency final : public WriteDependency { public: using SharedState = HashJoinSharedState; - HashJoinDependency(int id) : WriteDependency(id, "HashJoinDependency") {} + HashJoinDependency(int id, int node_id) : WriteDependency(id, node_id, "HashJoinDependency") {} ~HashJoinDependency() override = default; void* shared_state() override { return (void*)&_join_state; } @@ -707,7 +641,8 @@ struct NestedLoopJoinSharedState : public JoinSharedState { class NestedLoopJoinDependency final : public WriteDependency { public: using SharedState = NestedLoopJoinSharedState; - NestedLoopJoinDependency(int id) : WriteDependency(id, "NestedLoopJoinDependency") {} + NestedLoopJoinDependency(int id, int node_id) + : WriteDependency(id, node_id, "NestedLoopJoinDependency") {} ~NestedLoopJoinDependency() override = default; void* shared_state() override { return (void*)&_join_state; } @@ -727,22 +662,28 @@ public: class PartitionSortDependency final : public WriteDependency { public: using SharedState = PartitionSortNodeSharedState; - PartitionSortDependency(int id) : WriteDependency(id, "PartitionSortDependency"), _eos(false) {} + PartitionSortDependency(int id, int node_id) + : WriteDependency(id, node_id, "PartitionSortDependency"), _eos(false) {} ~PartitionSortDependency() override = default; void* shared_state() override { return (void*)&_partition_sort_state; }; - void set_ready_for_write() override {} - void block_writing() override {} - - [[nodiscard]] Dependency* read_blocked_by() override { - if (config::enable_fuzzy_mode && !(_ready_for_read || _eos) && - _should_log(_read_dependency_watcher.elapsed_time())) { - LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " " - << id(); - } - return _ready_for_read || _eos ? 
nullptr : this; + void set_ready_for_write() override { + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!"); + } + void block_writing() override { + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!"); } - void set_eos() { _eos = true; } + void block_reading() override { + if (_eos) { + return; + } + Dependency::block_reading(); + } + + void set_eos() { + _eos = true; + WriteDependency::set_ready_for_read(); + } private: PartitionSortNodeSharedState _partition_sort_state; @@ -752,12 +693,17 @@ private: class AsyncWriterDependency final : public WriteDependency { public: ENABLE_FACTORY_CREATOR(AsyncWriterDependency); - AsyncWriterDependency(int id) : WriteDependency(id, "AsyncWriterDependency") {} + AsyncWriterDependency(int id, int node_id) + : WriteDependency(id, node_id, "AsyncWriterDependency") {} ~AsyncWriterDependency() override = default; void* shared_state() override { return nullptr; } }; +class SetDependency; + struct SetSharedState { +public: + SetSharedState(int num_deps) { probe_finished_children_dependency.resize(num_deps, nullptr); } /// default init //record memory during running int64_t mem_used = 0; @@ -782,14 +728,15 @@ struct SetSharedState { /// init in build side int child_quantity; vectorized::VExprContextSPtrs build_child_exprs; - std::vector probe_finished_children_index; // use in probe side + std::vector<SetDependency*> probe_finished_children_dependency; /// init in probe side std::vector probe_child_exprs_lists; std::atomic<bool> ready_for_read = false; -public: + void set_probe_finished_children(int child_id); + /// called in setup_local_state void hash_table_init() { if (child_exprs_lists[0].size() == 1 && (!build_not_ignore_null[0])) { @@ -845,30 +792,26 @@ public: class SetDependency final : public WriteDependency { public: using SharedState = SetSharedState; - SetDependency(int id) : WriteDependency(id, "SetDependency") {} + SetDependency(int id, int node_id) : WriteDependency(id, node_id, "SetDependency") {} ~SetDependency() override = default; void* shared_state() override { return (void*)_set_state.get(); } void set_shared_state(std::shared_ptr<SetSharedState> set_state) { _set_state = set_state; } // Which dependency the current pipeline task is blocked by; `nullptr` if this dependency is ready. - [[nodiscard]] Dependency* read_blocked_by() override { + [[nodiscard]] Dependency* read_blocked_by(PipelineXTask* task) override { if (config::enable_fuzzy_mode && !_set_state->ready_for_read && _should_log(_read_dependency_watcher.elapsed_time())) { LOG(WARNING) << "========Dependency may be blocked for some reason: " << name() << " " - << id(); + << id() << " " << _node_id << " blocked tasks: " << _blocked_task.size(); + } + std::unique_lock<std::mutex> lc(_task_lock); + if (!_set_state->ready_for_read && task) { + add_block_task(task); + } return _set_state->ready_for_read ? nullptr : this; } - [[nodiscard]] WriteDependency* write_blocked_by() override { - if (is_set_probe) { - DCHECK((_cur_child_id - 1) < _set_state->probe_finished_children_index.size()); - return _set_state->probe_finished_children_index[_cur_child_id - 1] ? nullptr : this; - } - return nullptr; - } - // Notify downstream pipeline tasks this dependency is ready.
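// Readiness is published exactly once: if `ready_for_read` is already set, set_ready_for_read() returns early without stopping the watcher again.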
void set_ready_for_read() override { if (_set_state->ready_for_read) { @@ -877,15 +820,16 @@ public: _read_dependency_watcher.stop(); _set_state->ready_for_read = true; } + void set_cur_child_id(int id) { - _cur_child_id = id; - is_set_probe = true; + _set_state->probe_finished_children_dependency[id] = this; + if (id != 0) { + block_writing(); + } } private: std::shared_ptr _set_state; - int _cur_child_id; - bool is_set_probe {false}; }; using PartitionedBlock = std::pair, @@ -894,14 +838,37 @@ struct LocalExchangeSharedState { public: ENABLE_FACTORY_CREATOR(LocalExchangeSharedState); std::vector> data_queue; + std::vector source_dependencies; std::atomic running_sink_operators = 0; + void add_running_sink_operators() { running_sink_operators++; } + void sub_running_sink_operators() { + auto val = running_sink_operators.fetch_sub(1); + if (val == 1) { + _set_ready_for_read(); + } + } + void _set_ready_for_read() { + for (auto* dep : source_dependencies) { + DCHECK(dep); + dep->set_ready_for_read(); + } + } + void set_dep_by_channel_id(Dependency* dep, int channel_id) { + source_dependencies[channel_id] = dep; + dep->block_reading(); + } + void set_ready_for_read(int channel_id) { + auto* dep = source_dependencies[channel_id]; + DCHECK(dep); + dep->set_ready_for_read(); + } }; struct LocalExchangeDependency final : public WriteDependency { public: using SharedState = LocalExchangeSharedState; - LocalExchangeDependency(int id) - : WriteDependency(id, "LocalExchangeDependency"), + LocalExchangeDependency(int id, int node_id) + : WriteDependency(id, node_id, "LocalExchangeDependency"), _local_exchange_shared_state(nullptr) {} ~LocalExchangeDependency() override = default; void* shared_state() override { return _local_exchange_shared_state.get(); } @@ -910,27 +877,8 @@ public: _local_exchange_shared_state = state; } - void set_channel_id(int channel_id) { _channel_id = channel_id; } - - Dependency* read_blocked_by() override { - if (config::enable_fuzzy_mode && !_should_run() && - _should_log(_read_dependency_watcher.elapsed_time())) { - LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " " - << id(); - } - return _should_run() ? 
nullptr : this; - } - private: - bool _should_run() const { - DCHECK(_local_exchange_shared_state != nullptr); - return _local_exchange_shared_state->data_queue[_channel_id].size_approx() > 0 || - _local_exchange_shared_state->running_sink_operators == 0; - } - std::shared_ptr _local_exchange_shared_state; - int _channel_id; }; -} // namespace pipeline -} // namespace doris +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index eba75c0fc1..a793a22761 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -27,7 +27,7 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime"); auto& p = _parent->cast(); RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner)); - _shared_state->running_sink_operators++; + _shared_state->add_running_sink_operators(); return Status::OK(); } @@ -60,6 +60,7 @@ Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state, if (size > 0) { data_queue[i].enqueue({new_block, {row_idx, start, size}}); } + _shared_state->set_ready_for_read(i); } return Status::OK(); @@ -83,7 +84,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } if (source_state == SourceState::FINISHED) { - local_state._shared_state->running_sink_operators--; + local_state._shared_state->sub_running_sink_operators(); } return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 127e14dba6..a1bff19cb2 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -27,7 +27,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& _shared_state = (LocalExchangeSharedState*)_dependency->shared_state(); DCHECK(_shared_state != nullptr); _channel_id = info.task_idx; - _dependency->set_channel_id(_channel_id); + _shared_state->set_dep_by_channel_id(_dependency, _channel_id); _get_block_failed_counter = ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1); _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime"); diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 9e6df06da0..cc21a38965 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -288,7 +288,7 @@ void DataSinkOperatorX::get_dependency(vector& d if constexpr (!std::is_same_v) { auto& dests = dests_id(); for (auto& dest_id : dests) { - dependency.push_back(std::make_shared(dest_id)); + dependency.push_back(std::make_shared(dest_id, _node_id)); } } else { dependency.push_back(nullptr); @@ -341,7 +341,7 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState } } else { auto& deps = info.dependencys; - deps.front() = std::make_shared(0); + deps.front() = std::make_shared(0, 0); _dependency = (DependencyType*)deps.front().get(); } @@ -403,7 +403,7 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, } } else { auto& deps = info.dependencys; - deps.front() = std::make_shared(0); + deps.front() = std::make_shared(0, 0); _dependency = 
(DependencyType*)deps.front().get(); } _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1); @@ -481,15 +481,14 @@ Status AsyncWriterSink::init(RuntimeState* state, LocalSinkState RETURN_IF_ERROR( _parent->cast()._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); } - static_cast(_dependency)->set_write_blocked_by([this]() { - return this->write_blocked_by(); - }); _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs)); - _async_writer_dependency = AsyncWriterDependency::create_shared(_parent->operator_id()); + _async_writer_dependency = + AsyncWriterDependency::create_shared(_parent->operator_id(), _parent->node_id()); _writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get()); _wait_for_dependency_timer = ADD_TIMER(_profile, "WaitForDependency[" + _async_writer_dependency->name() + "]Time"); + _finish_dependency->should_finish_after_check(); return Status::OK(); } @@ -507,8 +506,8 @@ Status AsyncWriterSink::sink(RuntimeState* state, vectorized::Bl } template -WriteDependency* AsyncWriterSink::write_blocked_by() { - return _writer->write_blocked_by(); +WriteDependency* AsyncWriterSink::write_blocked_by(PipelineXTask* task) { + return _writer->write_blocked_by(task); } template diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 265579bfaa..4e8f030d50 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -625,9 +625,9 @@ public: }; template -class AsyncWriterSink : public PipelineXSinkLocalState { +class AsyncWriterSink : public PipelineXSinkLocalState { public: - using Base = PipelineXSinkLocalState; + using Base = PipelineXSinkLocalState; AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state), _async_writer_dependency(nullptr) {} @@ -637,8 +637,8 @@ public: Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state); - WriteDependency* write_blocked_by(); - + WriteDependency* write_blocked_by(PipelineXTask* task); + WriteDependency* dependency() override { return _async_writer_dependency.get(); } Status close(RuntimeState* state, Status exec_status) override; Status try_close(RuntimeState* state, Status exec_status) override; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 7113989ee1..28e1be496f 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -422,8 +422,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( auto task = std::make_unique( _pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this, _runtime_states[i]->runtime_profile(), - _op_id_to_le_state.count( - _pipelines[pip_idx]->operator_xs().front()->operator_id()) > 0 + _op_id_to_le_state.contains( + _pipelines[pip_idx]->operator_xs().front()->operator_id()) ? 
_op_id_to_le_state [_pipelines[pip_idx]->operator_xs().front()->operator_id()] : nullptr, @@ -592,13 +592,14 @@ Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new LocalExchangeSinkOperatorX( - local_exchange_id, _runtime_state->query_parallel_instance_num(), texprs)); + auto num_instances = _runtime_state->query_parallel_instance_num(); + sink.reset(new LocalExchangeSinkOperatorX(local_exchange_id, num_instances, texprs)); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init()); auto shared_state = LocalExchangeSharedState::create_shared(); - shared_state->data_queue.resize(_runtime_state->query_parallel_instance_num()); + shared_state->data_queue.resize(num_instances); + shared_state->source_dependencies.resize(num_instances, nullptr); _op_id_to_le_state.insert({local_exchange_id, shared_state}); return Status::OK(); } @@ -1029,4 +1030,18 @@ bool PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableS return false; } +std::string PipelineXFragmentContext::debug_string() { + fmt::memory_buffer debug_string_buffer; + for (size_t j = 0; j < _tasks.size(); j++) { + fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j); + for (size_t i = 0; i < _tasks[j].size(); i++) { + if (_tasks[j][i]->get_state() == PipelineTaskState::FINISHED) { + continue; + } + fmt::format_to(debug_string_buffer, "Task {}: {}\n", i, _tasks[j][i]->debug_string()); + } + } + + return fmt::to_string(debug_string_buffer); +} } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 6fa91aedf1..9e7ff42219 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -115,6 +115,8 @@ public: [[nodiscard]] int max_operator_id() const { return _operator_id; } + std::string debug_string() override; + private: void _close_action() override; Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 140fecbb1d..7295f38a12 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -91,14 +91,14 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams } _block = doris::vectorized::Block::create_unique(); - RETURN_IF_ERROR(extract_dependencies()); + RETURN_IF_ERROR(_extract_dependencies()); // We should make sure the initial state of the task is runnable so that we can do some preparation jobs (e.g. initialize runtime filters).
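// Starting in RUNNABLE guarantees the scheduler polls this task at least once before any dependency check can park it.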
set_state(PipelineTaskState::RUNNABLE); _prepared = true; return Status::OK(); } -Status PipelineXTask::extract_dependencies() { +Status PipelineXTask::_extract_dependencies() { for (auto op : _operators) { auto result = _state->get_local_state_result(op->operator_id()); if (!result) { @@ -179,7 +179,22 @@ Status PipelineXTask::_open() { SCOPED_TIMER(_open_timer); _dry_run = _sink->should_dry_run(_state); for (auto& o : _operators) { - RETURN_IF_ERROR(_state->get_local_state(o->operator_id())->open(_state)); + auto* local_state = _state->get_local_state(o->operator_id()); + for (size_t i = 0; i < 2; i++) { + auto st = local_state->open(_state); + if (st.is()) { + _blocked_dep = _filter_dependency->filter_blocked_by(this); + if (_blocked_dep) { + set_state(PipelineTaskState::BLOCKED_FOR_RF); + set_use_blocking_queue(false); + RETURN_IF_ERROR(st); + } else if (i == 1) { + CHECK(false) << debug_string(); + } + } else { + break; + } + } } RETURN_IF_ERROR(_state->get_sink_local_state(_sink->operator_id())->open(_state)); _opened = true; @@ -204,10 +219,6 @@ Status PipelineXTask::execute(bool* eos) { SCOPED_RAW_TIMER(&time_spent); auto st = _open(); if (st.is()) { - set_state(PipelineTaskState::BLOCKED_FOR_RF); - return Status::OK(); - } else if (st.is()) { - set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); return Status::OK(); } RETURN_IF_ERROR(st); @@ -326,17 +337,10 @@ std::string PipelineXTask::debug_string() { fmt::format_to(debug_string_buffer, "InstanceId: {}\n", print_id(_state->fragment_instance_id())); - fmt::format_to(debug_string_buffer, "RuntimeUsage: {}\n", - PrettyPrinter::print(get_runtime_ns(), TUnit::TIME_NS)); - { - std::stringstream profile_ss; - _fresh_profile_counter(); - _task_profile->pretty_print(&profile_ss, ""); - fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str()); - } - fmt::format_to(debug_string_buffer, - "PipelineTask[this = {}, state = {}]\noperators: ", (void*)this, - get_state_name(_cur_state)); + fmt::format_to( + debug_string_buffer, + "PipelineTask[this = {}, state = {}, data state = {}, dry run = {}]\noperators: ", + (void*)this, get_state_name(_cur_state), (int)_data_state, _dry_run); for (size_t i = 0; i < _operators.size(); i++) { fmt::format_to( debug_string_buffer, "\n{}", @@ -345,7 +349,42 @@ std::string PipelineXTask::debug_string() { fmt::format_to(debug_string_buffer, "\n{}", _opened ? 
_sink->debug_string(_state, _operators.size()) : _sink->debug_string(_operators.size())); + fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n"); + for (size_t i = 0; i < _read_dependencies.size(); i++) { + fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '), + _read_dependencies[i]->debug_string()); + } + + fmt::format_to(debug_string_buffer, "Write Dependency Information: \n"); + fmt::format_to(debug_string_buffer, "{}\n", _write_dependencies->debug_string()); + + fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); + fmt::format_to(debug_string_buffer, "{}\n", _filter_dependency->debug_string()); + + fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n"); + for (size_t i = 0; i < _finish_dependencies.size(); i++) { + fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '), + _finish_dependencies[i]->debug_string()); + } return fmt::to_string(debug_string_buffer); } +void PipelineXTask::try_wake_up(Dependency* wake_up_dep) { + // Called by a dependency once it becomes ready. + VecDateTimeValue now = VecDateTimeValue::local_time(); + // TODO(gabriel): the task will never be woken up if it is canceled / timed out + if (query_context()->is_cancelled()) { + _make_run(); + return; + } + if (query_context()->is_timeout(now)) { + query_context()->cancel(true, "", Status::Cancelled("")); + } + _make_run(); +} + +void PipelineXTask::_make_run() { + static_cast<void>(get_task_queue()->push_back(this)); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 90fdda921f..bc50f1e89d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -74,51 +74,21 @@ public: if (_dry_run) { return true; } - for (auto* op_dep : _read_dependencies) { - auto* dep = op_dep->read_blocked_by(); - if (dep != nullptr) { - dep->start_read_watcher(); - push_blocked_task_to_dependency(dep); - return false; - } - } - return true; + return _read_blocked_dependency() == nullptr; } bool runtime_filters_are_ready_or_timeout() override { - auto* dep = _filter_dependency->filter_blocked_by(); - if (dep != nullptr) { - push_blocked_task_to_dependency(dep); - return false; - } - return true; + throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!"); + return false; } - bool sink_can_write() override { - auto* dep = _write_dependencies->write_blocked_by(); - if (dep != nullptr) { - dep->start_write_watcher(); - push_blocked_task_to_dependency(dep); - return false; - } - return true; - } + bool sink_can_write() override { return _write_blocked_dependency() == nullptr; } Status finalize() override; std::string debug_string() override; - bool is_pending_finish() override { - for (auto* fin_dep : _finish_dependencies) { - auto* dep = fin_dep->finish_blocked_by(); - if (dep != nullptr) { - dep->start_finish_watcher(); - push_blocked_task_to_dependency(dep); - return true; - } - } - return false; - } + bool is_pending_finish() override { return _finish_blocked_dependency() != nullptr; } std::vector& get_downstream_dependency() { return _downstream_dependency; } @@ -147,9 +117,9 @@ public: return _upstream_dependency[id]; } - Status extract_dependencies(); + bool is_pipelineX() const override { return true; } - void push_blocked_task_to_dependency(Dependency* dep) {} + void try_wake_up(Dependency* wake_up_dep); DataSinkOperatorXPtr sink() const { return _sink; } @@ -157,7 +127,60 @@ public: OperatorXs operatorXs() {
return _operators; } + bool push_blocked_task_to_queue() { + /** + * Push task into blocking queue if: + * 1. `_use_blocking_queue` is true. + * 2. Or this task is blocked by FE two-phase execution (BLOCKED_FOR_DEPENDENCY). + */ + return _use_blocking_queue || get_state() == PipelineTaskState::BLOCKED_FOR_DEPENDENCY; + } + void set_use_blocking_queue(bool use_blocking_queue) { + if (_blocked_dep->is_or_dep()) { + _use_blocking_queue = true; + return; + } + _use_blocking_queue = use_blocking_queue; + } + private: + Dependency* _write_blocked_dependency() { + _blocked_dep = _write_dependencies->write_blocked_by(this); + if (_blocked_dep != nullptr) { + set_use_blocking_queue(false); + static_cast<WriteDependency*>(_blocked_dep)->start_write_watcher(); + return _blocked_dep; + } + return nullptr; + } + + Dependency* _finish_blocked_dependency() { + for (auto* fin_dep : _finish_dependencies) { + _blocked_dep = fin_dep->finish_blocked_by(this); + if (_blocked_dep != nullptr) { + set_use_blocking_queue(false); + static_cast<FinishDependency*>(_blocked_dep)->start_finish_watcher(); + return _blocked_dep; + } + } + return nullptr; + } + + Dependency* _read_blocked_dependency() { + for (auto* op_dep : _read_dependencies) { + _blocked_dep = op_dep->read_blocked_by(this); + if (_blocked_dep != nullptr) { + // TODO(gabriel): + set_use_blocking_queue(true); + _blocked_dep->start_read_watcher(); + return _blocked_dep; + } + } + return nullptr; + } + + Status _extract_dependencies(); + void _make_run(); void set_close_pipeline_time() override {} void _init_profile() override; void _fresh_profile_counter() override; @@ -180,6 +203,10 @@ private: std::shared_ptr<LocalExchangeSharedState> _local_exchange_state; int _task_idx; bool _dry_run = false; + + Dependency* _blocked_dep {nullptr}; + + std::atomic<bool> _use_blocking_queue {true}; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index aa3891a5a2..9ce2711c27 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -34,6 +34,8 @@ #include "common/logging.h" #include "common/signal_handler.h" #include "pipeline/pipeline_task.h" +#include "pipeline/pipeline_x/dependency.h" +#include "pipeline/pipeline_x/pipeline_x_task.h" #include "pipeline/task_queue.h" #include "pipeline_fragment_context.h" #include "runtime/query_context.h" @@ -75,6 +77,11 @@ Status BlockedTaskScheduler::add_blocked_task(PipelineTask* task) { return Status::InternalError("BlockedTaskScheduler shutdown"); } std::unique_lock<std::mutex> lock(_task_mutex); + if (task->is_pipelineX() && !static_cast<PipelineXTask*>(task)->push_blocked_task_to_queue()) { + // Put this task into the current dependency's blocking queue and wait for an event notification // instead of using a separate BlockedTaskScheduler.
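+ // The blocking dependency has already recorded this task via add_block_task(), and it calls PipelineXTask::try_wake_up() to re-queue the task once it becomes ready.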
+ return Status::OK(); + } _blocked_tasks.push_back(task); _task_cond.notify_one(); return Status::OK(); } @@ -222,24 +229,30 @@ void TaskScheduler::_do_work(size_t index) { if (!task) { continue; } + if (task->is_pipelineX() && task->is_running()) { + static_cast<void>(_task_queue->push_back(task, index)); + continue; + } + task->set_running(true); task->set_task_queue(_task_queue.get()); auto* fragment_ctx = task->fragment_context(); signal::query_id_hi = fragment_ctx->get_query_id().hi; signal::query_id_lo = fragment_ctx->get_query_id().lo; bool canceled = fragment_ctx->is_canceled(); - auto check_state = task->get_state(); - if (check_state == PipelineTaskState::PENDING_FINISH) { - DCHECK(!task->is_pending_finish()) << "must not pending close " << task->debug_string(); + auto state = task->get_state(); + if (state == PipelineTaskState::PENDING_FINISH) { + DCHECK(task->is_pipelineX() || !task->is_pending_finish()) + << "must not pending close " << task->debug_string(); Status exec_status = fragment_ctx->get_query_context()->exec_status(); _try_close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED, exec_status); continue; } - DCHECK(check_state != PipelineTaskState::FINISHED && - check_state != PipelineTaskState::CANCELED) - << "task already finish"; + + DCHECK(state != PipelineTaskState::FINISHED && state != PipelineTaskState::CANCELED) + << "task already finished: " << task->debug_string(); if (canceled) { // may have changed from PENDING_FINISH, so cancel should be called @@ -253,7 +266,13 @@ void TaskScheduler::_do_work(size_t index) { continue; } - DCHECK(check_state == PipelineTaskState::RUNNABLE); + if (task->is_pipelineX()) { + task->set_state(PipelineTaskState::RUNNABLE); + } + + DCHECK(task->is_pipelineX() || task->get_state() == PipelineTaskState::RUNNABLE) + << "state:" << get_state_name(task->get_state()) + << " task: " << task->debug_string(); // task exec bool eos = false; auto status = Status::OK(); @@ -313,6 +332,7 @@ void TaskScheduler::_do_work(size_t index) { } auto pipeline_state = task->get_state(); + task->set_running(false); switch (pipeline_state) { case PipelineTaskState::BLOCKED_FOR_SOURCE: case PipelineTaskState::BLOCKED_FOR_SINK: @@ -324,7 +344,8 @@ void TaskScheduler::_do_work(size_t index) { static_cast<void>(_task_queue->push_back(task, index)); break; default: - DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state); + DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state) + << " task: " << task->debug_string(); break; } } @@ -344,21 +365,24 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, cancel(); // Call `close` if `try_close` failed to make sure allocated resources are released static_cast<void>(task->close(exec_status)); - } else if (!task->is_pending_finish()) { - status = task->close(exec_status); - if (!status.ok() && state != PipelineTaskState::CANCELED) { - cancel(); - } - } - - if (task->is_pending_finish()) { + } else if (!task->is_pipelineX() && task->is_pending_finish()) { task->set_state(PipelineTaskState::PENDING_FINISH); static_cast<void>(_blocked_task_scheduler->add_blocked_task(task)); task->set_running(false); return; + } else if (task->is_pending_finish()) { + task->set_state(PipelineTaskState::PENDING_FINISH); + task->set_running(false); + return; + } + status = task->close(exec_status); + if (!status.ok() && state != PipelineTaskState::CANCELED) { cancel(); } task->set_state(state); task->set_close_pipeline_time(); task->release_dependency(); +
task->set_running(false); task->fragment_context()->close_a_pipeline(); } diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 9b85ec420e..070f2b6a5a 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -65,7 +65,6 @@ private: static constexpr auto EMPTY_TIMES_TO_YIELD = 64; -private: void _schedule(); void _make_task_run(std::list<PipelineTask*>& local_tasks, std::list<PipelineTask*>::iterator& task_itr, diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index e4b703e792..a7a89bfd42 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -797,6 +797,24 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, return Status::OK(); } +std::string FragmentMgr::dump_pipeline_tasks() { + fmt::memory_buffer debug_string_buffer; + auto t = MonotonicNanos(); + size_t i = 0; + { + std::lock_guard<std::mutex> lock(_lock); + fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts are still running!\n", + _pipeline_map.size()); + for (auto& it : _pipeline_map) { + fmt::format_to(debug_string_buffer, "No.{} (elapsed time = {}, InstanceId = {}) : {}\n", + i, t - it.second->create_time(), print_id(it.first), + it.second->debug_string()); + i++; + } + } + return fmt::to_string(debug_string_buffer); +} + Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, const FinishCallback& cb) { VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is " diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 93f010a5ec..08f95bd335 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -143,6 +143,8 @@ public: return _query_ctx_map.size(); } + std::string dump_pipeline_tasks(); + private: void cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason, const std::unique_lock<std::mutex>& state_lock, bool is_pipeline, diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index c4f24cfdf8..cb8f275574 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -41,6 +41,7 @@ #include "http/action/meta_action.h" #include "http/action/metrics_action.h" #include "http/action/pad_rowset_action.h" +#include "http/action/pipeline_task_action.h" #include "http/action/pprof_actions.h" #include "http/action/reload_tablet_action.h" #include "http/action/reset_rpc_channel_action.h" @@ -165,6 +166,11 @@ Status HttpService::start() { HealthAction* health_action = _pool.add(new HealthAction()); _ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action); + // Register the running pipeline tasks dump action + PipelineTaskAction* pipeline_task_action = _pool.add(new PipelineTaskAction()); + _ev_http_server->register_handler(HttpMethod::GET, "/api/running_pipeline_tasks", + pipeline_task_action); + // Register Tablets Info action TabletsInfoAction* tablets_info_action = _pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index d3688507fc..ef6a9d415d 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -65,6 +65,7 @@ ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::V ctx_id = UniqueId::gen_uid().to_string(); if (_scanners.empty()) { _is_finished = true; + _set_scanner_done(); } if (limit < 0) { limit = -1; } @@ -135,10 +136,6 @@ Status
ScannerContext::init() { // 4. This ctx will be submitted to the scanner scheduler right after init. // So set _num_scheduling_ctx to 1 here. _num_scheduling_ctx = 1; - if (_finish_dependency) { - std::lock_guard<std::mutex> l(_transfer_lock); - _finish_dependency->block_finishing(); - } _num_unfinished_scanners = _scanners.size(); @@ -230,9 +227,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo auto state = _scanner_scheduler->submit(this); if (state.ok()) { _num_scheduling_ctx++; - if (_finish_dependency) { - _finish_dependency->block_finishing(); - } } else { set_status_on_error(state, false); } @@ -304,12 +298,11 @@ Status ScannerContext::validate_block_schema(Block* block) { } void ScannerContext::set_should_stop() { - if (_scanner_done_dependency) { - _scanner_done_dependency->set_ready_for_read(); - } std::lock_guard<std::mutex> l(_transfer_lock); _should_stop = true; + _set_scanner_done(); _blocks_queue_added_cv.notify_one(); + set_ready_to_finish(); } void ScannerContext::inc_num_running_scanners(int32_t inc) { @@ -320,19 +313,20 @@ void ScannerContext::dec_num_scheduling_ctx() { std::lock_guard<std::mutex> l(_transfer_lock); _num_scheduling_ctx--; - if (_finish_dependency) { - if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) { - _finish_dependency->set_ready_to_finish(); - } else { - _finish_dependency->block_finishing(); - } - } - + set_ready_to_finish(); if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) { _ctx_finish_cv.notify_one(); } } +void ScannerContext::set_ready_to_finish() { + // `_should_stop == true` means this task has already ended and is waiting for pending finish now. + if (_finish_dependency && _should_stop && _num_running_scanners == 0 && + _num_scheduling_ctx == 0) { + _finish_dependency->set_ready_to_finish(); + } +} + bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) { std::unique_lock<std::mutex> l(_transfer_lock, std::defer_lock); if (need_lock) { @@ -342,10 +336,8 @@ bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) { _process_status = status; _status_error = true; _blocks_queue_added_cv.notify_one(); - if (_scanner_done_dependency) { - _scanner_done_dependency->set_ready_for_read(); - } _should_stop = true; + _set_scanner_done(); return true; } return false; @@ -437,6 +429,12 @@ bool ScannerContext::no_schedule() { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; } +void ScannerContext::_set_scanner_done() { + if (_scanner_done_dependency) { + _scanner_done_dependency->set_ready_for_read(); + } +} + std::string ScannerContext::debug_string() { return fmt::format( "id: {}, scanners: {}, blocks in queue: {}," @@ -455,9 +453,6 @@ void ScannerContext::reschedule_scanner_ctx() { //todo(wb) rethink whether it is better to mark the current scan_context as failed when submit fails many times if (state.ok()) { _num_scheduling_ctx++; - if (_finish_dependency) { - _finish_dependency->block_finishing(); - } } else { set_status_on_error(state, false); } @@ -474,17 +469,12 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { // We have to decrease _num_running_scanners before scheduling, otherwise // scheduling does not work due to _num_running_scanners.
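// Decreasing the counter before set_ready_to_finish() also lets the finish dependency observe a fully idle context if this was the last running scanner.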
_num_running_scanners--; - if (_finish_dependency && _num_running_scanners == 0 && _num_scheduling_ctx == 0) { - _finish_dependency->set_ready_to_finish(); - } + set_ready_to_finish(); if (should_be_scheduled()) { auto state = _scanner_scheduler->submit(this); if (state.ok()) { _num_scheduling_ctx++; - if (_finish_dependency) { - _finish_dependency->block_finishing(); - } } else { set_status_on_error(state, false); } @@ -499,9 +489,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { (--_num_unfinished_scanners) == 0) { _dispose_coloate_blocks_not_in_queue(); _is_finished = true; - if (_scanner_done_dependency) { - _scanner_done_dependency->set_ready_for_read(); - } + _set_scanner_done(); _blocks_queue_added_cv.notify_one(); } _ctx_finish_cv.notify_one(); diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 244aedf87a..10b4775cef 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -120,6 +120,8 @@ public: void inc_num_running_scanners(int32_t scanner_inc); + void set_ready_to_finish(); + int get_num_running_scanners() const { return _num_running_scanners; } void dec_num_scheduling_ctx(); @@ -185,6 +187,8 @@ private: protected: virtual void _dispose_coloate_blocks_not_in_queue() {} + void _set_scanner_done(); + RuntimeState* _state; VScanNode* _parent; pipeline::ScanLocalStateBase* _local_state; diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index be291828f0..891161f071 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -363,7 +363,7 @@ VDataStreamRecvr::VDataStreamRecvr( for (size_t i = 0; i < num_queues; i++) { _sender_to_local_channel_dependency[i] = pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id, - _mem_available); + _dest_node_id); } } _sender_queues.reserve(num_queues); diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 9f2f597494..4ae7c3d364 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -41,7 +41,6 @@ void AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep, pipeline::FinishDependency* finish_dep) { _dependency = dep; _finish_dependency = finish_dep; - _finish_dependency->block_finishing(); } Status AsyncResultWriter::sink(Block* block, bool eos) { @@ -181,10 +180,10 @@ std::unique_ptr AsyncResultWriter::_get_free_block(doris::vectorized::Blo return b; } -pipeline::WriteDependency* AsyncResultWriter::write_blocked_by() { +pipeline::WriteDependency* AsyncResultWriter::write_blocked_by(pipeline::PipelineXTask* task) { std::lock_guard l(_m); DCHECK(_dependency != nullptr); - return _dependency->write_blocked_by(); + return _dependency->write_blocked_by(task); } } // namespace vectorized diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index 780f8b506e..75cc6529ba 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -36,6 +36,7 @@ namespace pipeline { class AsyncWriterDependency; class WriteDependency; class FinishDependency; +class PipelineXTask; } // namespace pipeline @@ -79,7 +80,7 @@ public: return _data_queue_is_available() || _is_finished(); } - pipeline::WriteDependency* write_blocked_by(); + pipeline::WriteDependency* write_blocked_by(pipeline::PipelineXTask* task); 
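// The task stays in pending-finish until the async writer thread has closed (i.e. `_writer_thread_closed` becomes true).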
[[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }