From 1b3512d9423168f80b4d0f9b13fbe8fc8da87fde Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 22 Nov 2023 22:31:34 +0800 Subject: [PATCH] [pipelineX](bug) Fix cancel timeout (#27396) --- .../pipeline/exec/aggregation_sink_operator.h | 3 +- .../exec/aggregation_source_operator.h | 3 +- be/src/pipeline/exec/analytic_sink_operator.h | 4 +- .../pipeline/exec/analytic_source_operator.h | 4 +- be/src/pipeline/exec/es_scan_operator.cpp | 5 +- .../pipeline/exec/exchange_sink_operator.cpp | 16 ++--- be/src/pipeline/exec/exchange_sink_operator.h | 22 ++++-- .../exec/exchange_source_operator.cpp | 5 +- .../pipeline/exec/exchange_source_operator.h | 4 +- be/src/pipeline/exec/file_scan_operator.cpp | 5 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +- be/src/pipeline/exec/hashjoin_build_sink.h | 8 +-- .../pipeline/exec/hashjoin_probe_operator.h | 4 +- be/src/pipeline/exec/meta_scan_operator.cpp | 2 +- .../exec/multi_cast_data_stream_sink.h | 4 +- .../exec/multi_cast_data_stream_source.h | 4 +- .../exec/nested_loop_join_build_operator.h | 4 +- .../exec/nested_loop_join_probe_operator.h | 4 +- be/src/pipeline/exec/olap_scan_operator.cpp | 8 ++- .../exec/partition_sort_sink_operator.cpp | 10 ++- .../exec/partition_sort_sink_operator.h | 4 +- .../exec/partition_sort_source_operator.h | 22 +++++- be/src/pipeline/exec/result_sink_operator.cpp | 4 +- be/src/pipeline/exec/result_sink_operator.h | 4 +- be/src/pipeline/exec/scan_operator.cpp | 28 ++++---- be/src/pipeline/exec/scan_operator.h | 10 ++- .../pipeline/exec/set_probe_sink_operator.h | 4 +- be/src/pipeline/exec/set_sink_operator.h | 3 +- be/src/pipeline/exec/set_source_operator.h | 3 +- be/src/pipeline/exec/sort_sink_operator.h | 3 +- be/src/pipeline/exec/sort_source_operator.h | 3 +- be/src/pipeline/exec/union_sink_operator.h | 4 +- .../pipeline/exec/union_source_operator.cpp | 4 +- be/src/pipeline/exec/union_source_operator.h | 5 +- be/src/pipeline/pipeline_task.h | 10 +++ be/src/pipeline/pipeline_x/dependency.cpp | 28 
+++++--- be/src/pipeline/pipeline_x/dependency.h | 67 ++++++++++--------- .../local_exchange_sink_operator.h | 4 +- .../local_exchange_source_operator.h | 4 +- be/src/pipeline/pipeline_x/operator.cpp | 51 ++++++-------- be/src/pipeline/pipeline_x/operator.h | 12 ++-- .../pipeline_x_fragment_context.cpp | 7 +- .../pipeline/pipeline_x/pipeline_x_task.cpp | 23 ++++--- be/src/pipeline/pipeline_x/pipeline_x_task.h | 7 ++ be/src/runtime/fragment_mgr.cpp | 10 +++ be/src/runtime/query_context.cpp | 43 ++++++++++++ be/src/runtime/query_context.h | 18 +++-- be/src/vec/runtime/vdata_stream_recvr.cpp | 10 +-- 48 files changed, 318 insertions(+), 200 deletions(-) create mode 100644 be/src/runtime/query_context.cpp diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 3f1ce26036..2d16df2be2 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -48,7 +48,8 @@ public: class AggSinkDependency final : public Dependency { public: using SharedState = AggSharedState; - AggSinkDependency(int id, int node_id) : Dependency(id, node_id, "AggSinkDependency", true) {} + AggSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "AggSinkDependency", true, query_ctx) {} ~AggSinkDependency() override = default; void set_ready() override { diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index 79671fb9c7..9c6d3e0fd0 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -51,7 +51,8 @@ public: class AggSourceDependency final : public Dependency { public: using SharedState = AggSharedState; - AggSourceDependency(int id, int node_id) : Dependency(id, node_id, "AggSourceDependency") {} + AggSourceDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "AggSourceDependency", 
query_ctx) {} ~AggSourceDependency() override = default; void block() override { diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index a90321f5e3..1e8152a28f 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -48,8 +48,8 @@ public: class AnalyticSinkDependency final : public Dependency { public: using SharedState = AnalyticSharedState; - AnalyticSinkDependency(int id, int node_id) - : Dependency(id, node_id, "AnalyticSinkDependency", true) {} + AnalyticSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "AnalyticSinkDependency", true, query_ctx) {} ~AnalyticSinkDependency() override = default; }; diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index 0a741181f8..8b76fbfe26 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -49,8 +49,8 @@ public: class AnalyticSourceDependency final : public Dependency { public: using SharedState = AnalyticSharedState; - AnalyticSourceDependency(int id, int node_id) - : Dependency(id, node_id, "AnalyticSourceDependency") {} + AnalyticSourceDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "AnalyticSourceDependency", query_ctx) {} ~AnalyticSourceDependency() override = default; }; diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index b911291795..8567db9094 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -55,7 +55,7 @@ Status EsScanLocalState::_init_profile() { Status EsScanLocalState::_process_conjuncts() { RETURN_IF_ERROR(Base::_process_conjuncts()); - if (Base::_scan_dependency->eos()) { + if (Base::_eos) { return Status::OK(); } @@ -66,7 +66,8 @@ Status EsScanLocalState::_process_conjuncts() { Status 
EsScanLocalState::_init_scanners(std::list* scanners) { if (_scan_ranges.empty()) { - Base::_scan_dependency->set_eos(); + _eos = true; + _scan_dependency->set_ready(); return Status::OK(); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 616a1bd76f..f0e03596cc 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -174,16 +174,16 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf register_channels(_sink_buffer.get()); - _exchange_sink_dependency = - AndDependency::create_shared(_parent->operator_id(), _parent->node_id()); - _queue_dependency = - ExchangeSinkQueueDependency::create_shared(_parent->operator_id(), _parent->node_id()); + _exchange_sink_dependency = AndDependency::create_shared( + _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); + _queue_dependency = ExchangeSinkQueueDependency::create_shared( + _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); _exchange_sink_dependency->add_child(_queue_dependency); if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && !only_local_exchange) { - _broadcast_dependency = - BroadcastDependency::create_shared(_parent->operator_id(), _parent->node_id()); + _broadcast_dependency = BroadcastDependency::create_shared( + _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); _broadcast_dependency->set_available_block(config::num_broadcast_buffer); _broadcast_pb_blocks.reserve(config::num_broadcast_buffer); for (size_t i = 0; i < config::num_broadcast_buffer; i++) { @@ -198,8 +198,8 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf size_t dep_id = 0; _local_channels_dependency.resize(local_size); _wait_channel_timer.resize(local_size); - auto deps_for_channels = - 
AndDependency::create_shared(_parent->operator_id(), _parent->node_id()); + auto deps_for_channels = AndDependency::create_shared( + _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); for (auto channel : channels) { if (channel->is_local()) { _local_channels_dependency[dep_id] = channel->get_local_channel_dependency(); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 89b024a6c6..19a326d5c6 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -67,18 +67,28 @@ private: class ExchangeSinkQueueDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency); - ExchangeSinkQueueDependency(int id, int node_id) - : Dependency(id, node_id, "ResultQueueDependency", true) {} + ExchangeSinkQueueDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "ResultQueueDependency", true, query_ctx) {} ~ExchangeSinkQueueDependency() override = default; }; class BroadcastDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(BroadcastDependency); - BroadcastDependency(int id, int node_id) - : Dependency(id, node_id, "BroadcastDependency", true), _available_block(0) {} + BroadcastDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "BroadcastDependency", true, query_ctx), + _available_block(0) {} ~BroadcastDependency() override = default; + std::string debug_string(int indentation_level = 0) override { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, + "{}{}: id={}, block task = {}, ready={}, _available_block = {}", + std::string(indentation_level * 2, ' '), _name, _node_id, + _blocked_task.size(), _ready, _available_block.load()); + return fmt::to_string(debug_string_buffer); + } + void set_available_block(int available_block) { _available_block = available_block; } void return_available_block() { @@ -128,8 
+138,8 @@ private: class LocalExchangeChannelDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency); - LocalExchangeChannelDependency(int id, int node_id) - : Dependency(id, node_id, "LocalExchangeChannelDependency", true) {} + LocalExchangeChannelDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "LocalExchangeChannelDependency", true, query_ctx) {} ~LocalExchangeChannelDependency() override = default; // TODO(gabriel): blocked by memory }; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 00cead5b2c..3b630cfcfc 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -51,13 +51,14 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(), profile(), p.is_merging(), p.sub_plan_query_statistics_recvr()); - source_dependency = AndDependency::create_shared(_parent->operator_id(), _parent->node_id()); + source_dependency = AndDependency::create_shared(_parent->operator_id(), _parent->node_id(), + state->get_query_ctx()); const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); metrics.resize(queues.size()); for (size_t i = 0; i < queues.size(); i++) { deps[i] = ExchangeDataDependency::create_shared(_parent->operator_id(), _parent->node_id(), - queues[i]); + state->get_query_ctx(), queues[i]); queues[i]->set_dependency(deps[i]); source_dependency->add_child(deps[i]); } diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 4a2bae8298..8e745def1b 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -53,9 +53,9 @@ public: struct 
ExchangeDataDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(ExchangeDataDependency); - ExchangeDataDependency(int id, int node_id, + ExchangeDataDependency(int id, int node_id, QueryContext* query_ctx, vectorized::VDataStreamRecvr::SenderQueue* sender_queue) - : Dependency(id, node_id, "DataDependency") {} + : Dependency(id, node_id, "DataDependency", query_ctx) {} }; class ExchangeSourceOperatorX; diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 369ad607c6..819575211c 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -32,7 +32,8 @@ namespace doris::pipeline { Status FileScanLocalState::_init_scanners(std::list* scanners) { if (_scan_ranges.empty()) { - Base::_scan_dependency->set_eos(); + _eos = true; + _scan_dependency->set_ready(); return Status::OK(); } @@ -95,7 +96,7 @@ Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { Status FileScanLocalState::_process_conjuncts() { RETURN_IF_ERROR(ScanLocalState::_process_conjuncts()); - if (Base::_scan_dependency->eos()) { + if (Base::_eos) { return Status::OK(); } // TODO: Push conjuncts down to reader. 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 53b20f53cd..41b030b4e1 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -48,8 +48,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); - _shared_hash_table_dependency = - SharedHashTableDependency::create_shared(_parent->operator_id(), _parent->node_id()); + _shared_hash_table_dependency = SharedHashTableDependency::create_shared( + _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); auto& p = _parent->cast(); _shared_state->join_op_variants = p._join_op_variants; if (p._is_broadcast_join && state->enable_share_hash_table_for_broadcast_join()) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index e3f10a1feb..b0618d5992 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -49,16 +49,16 @@ class HashJoinBuildSinkOperatorX; class SharedHashTableDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(SharedHashTableDependency); - SharedHashTableDependency(int id, int node_id) - : Dependency(id, node_id, "SharedHashTableDependency", true) {} + SharedHashTableDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "SharedHashTableDependency", true, query_ctx) {} ~SharedHashTableDependency() override = default; }; class HashJoinBuildSinkDependency final : public Dependency { public: using SharedState = HashJoinSharedState; - HashJoinBuildSinkDependency(int id, int node_id) - : Dependency(id, node_id, "HashJoinBuildSinkDependency", true) {} + HashJoinBuildSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "HashJoinBuildSinkDependency", true, 
query_ctx) {} ~HashJoinBuildSinkDependency() override = default; }; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 64f3cce2d7..583bba1b00 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -64,8 +64,8 @@ using HashTableCtxVariants = std::variant< class HashJoinProbeDependency final : public Dependency { public: using SharedState = HashJoinSharedState; - HashJoinProbeDependency(int id, int node_id) - : Dependency(id, node_id, "HashJoinProbeDependency") {} + HashJoinProbeDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "HashJoinProbeDependency", query_ctx) {} ~HashJoinProbeDependency() override = default; }; diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp index 2de19bb2ce..749fbcf333 100644 --- a/be/src/pipeline/exec/meta_scan_operator.cpp +++ b/be/src/pipeline/exec/meta_scan_operator.cpp @@ -22,7 +22,7 @@ namespace doris::pipeline { Status MetaScanLocalState::_init_scanners(std::list* scanners) { - if (Base::_scan_dependency->eos()) { + if (Base::_eos) { return Status::OK(); } diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index c9b3dcb479..a2ad07e529 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -44,8 +44,8 @@ public: class MultiCastSinkDependency final : public Dependency { public: using SharedState = MultiCastSharedState; - MultiCastSinkDependency(int id, int node_id) - : Dependency(id, node_id, "MultiCastSinkDependency", true) {} + MultiCastSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "MultiCastSinkDependency", true, query_ctx) {} ~MultiCastSinkDependency() override = default; }; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 6c3a4cfbbc..86034a76ce 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -95,8 +95,8 @@ private: class MultiCastSourceDependency final : public Dependency { public: using SharedState = MultiCastSharedState; - MultiCastSourceDependency(int id, int node_id) - : Dependency(id, node_id, "MultiCastSourceDependency") {} + MultiCastSourceDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "MultiCastSourceDependency", query_ctx) {} ~MultiCastSourceDependency() override = default; }; diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index a02fb8ec1a..0097b75c0a 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -47,8 +47,8 @@ public: class NestedLoopJoinBuildSinkDependency final : public Dependency { public: using SharedState = NestedLoopJoinSharedState; - NestedLoopJoinBuildSinkDependency(int id, int node_id) - : Dependency(id, node_id, "NestedLoopJoinBuildSinkDependency", true) {} + NestedLoopJoinBuildSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "NestedLoopJoinBuildSinkDependency", true, query_ctx) {} ~NestedLoopJoinBuildSinkDependency() override = default; }; diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 34f123ba32..d7a9b54e89 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -54,8 +54,8 @@ public: class NestedLoopJoinProbeDependency final : public Dependency { public: using SharedState = NestedLoopJoinSharedState; - NestedLoopJoinProbeDependency(int id, int node_id) - : Dependency(id, node_id, "NestedLoopJoinProbeDependency") {} + 
NestedLoopJoinProbeDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "NestedLoopJoinProbeDependency", query_ctx) {} ~NestedLoopJoinProbeDependency() override = default; }; diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index acda79bdfb..059f961501 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -133,7 +133,7 @@ Status OlapScanLocalState::_init_profile() { Status OlapScanLocalState::_process_conjuncts() { SCOPED_TIMER(_process_conjunct_timer); RETURN_IF_ERROR(ScanLocalState::_process_conjuncts()); - if (ScanLocalState::_scan_dependency->eos()) { + if (ScanLocalState::_eos) { return Status::OK(); } RETURN_IF_ERROR(_build_key_ranges_and_filters()); @@ -213,7 +213,8 @@ bool OlapScanLocalState::_storage_no_merge() { Status OlapScanLocalState::_init_scanners(std::list* scanners) { if (_scan_ranges.empty()) { - ScanLocalState::_scan_dependency->set_eos(); + _eos = true; + _scan_dependency->set_ready(); return Status::OK(); } SCOPED_TIMER(_scanner_init_timer); @@ -408,7 +409,8 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() { iter->second)); } if (eos) { - ScanLocalState::_scan_dependency->set_eos(); + _eos = true; + _scan_dependency->set_ready(); } for (auto& iter : _colname_to_value_range) { diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 168d499e66..d7a2bc4c92 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -18,11 +18,10 @@ #include "partition_sort_sink_operator.h" #include "common/status.h" +#include "partition_sort_source_operator.h" #include "vec/common/hash_table/hash.h" -namespace doris { - -namespace pipeline { +namespace doris::pipeline { OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() { return std::make_shared(this, _node); 
@@ -154,7 +153,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition)); //so all data from child have sink completed - local_state._dependency->set_eos(); + ((PartitionSortSourceDependency*)local_state._shared_state->source_dep)->set_always_ready(); } return Status::OK(); @@ -291,5 +290,4 @@ void PartitionSortSinkLocalState::_init_hash_method() { } } -} // namespace pipeline -} // namespace doris +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index a77df27cac..dbfb7ad5d0 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -53,8 +53,8 @@ public: class PartitionSortSinkDependency final : public Dependency { public: using SharedState = PartitionSortNodeSharedState; - PartitionSortSinkDependency(int id, int node_id) - : Dependency(id, node_id, "PartitionSortSinkDependency", true) {} + PartitionSortSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "PartitionSortSinkDependency", true, query_ctx) {} ~PartitionSortSinkDependency() override = default; }; diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index 0ef89c5068..df22bde636 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -52,9 +52,27 @@ public: class PartitionSortSourceDependency final : public Dependency { public: using SharedState = PartitionSortNodeSharedState; - PartitionSortSourceDependency(int id, int node_id) - : Dependency(id, node_id, "PartitionSortSourceDependency") {} + PartitionSortSourceDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "PartitionSortSourceDependency", query_ctx) {} 
~PartitionSortSourceDependency() override = default; + + void block() override { + if (_always_ready) { + return; + } + Dependency::block(); + } + + void set_always_ready() { + if (_always_ready) { + return; + } + _always_ready = true; + set_ready(); + } + +private: + std::atomic _always_ready {false}; }; class PartitionSortSourceOperatorX; diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index 7f6d1673e9..8c314b995b 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -62,8 +62,8 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true, state->execution_timeout())); - _result_sink_dependency = - ResultSinkDependency::create_shared(_parent->operator_id(), _parent->node_id()); + _result_sink_dependency = ResultSinkDependency::create_shared( + _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1); _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1); ((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency); diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index e7af819e62..eedd2d4c05 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -46,8 +46,8 @@ public: class ResultSinkDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(ResultSinkDependency); - ResultSinkDependency(int id, int node_id) - : Dependency(id, node_id, "ResultSinkDependency", true) {} + ResultSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "ResultSinkDependency", true, query_ctx) {} 
~ResultSinkDependency() override = default; }; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 15b77af6ff..93c1eb9f98 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -123,7 +123,8 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); _scan_dependency = ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(), - PipelineXLocalState<>::_parent->node_id()); + PipelineXLocalState<>::_parent->node_id(), + state->get_query_ctx()); auto& p = _parent->cast(); set_scan_ranges(state, info.scan_ranges); @@ -147,10 +148,7 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) RETURN_IF_ERROR(_init_profile()); // if you want to add some profile in scan node, even it have not new VScanner object // could add here, not in the _init_profile() function - _get_next_timer = ADD_TIMER(_runtime_profile, "GetNextTime"); - _prepare_rf_timer(_runtime_profile.get()); - _alloc_resource_timer = ADD_TIMER(_runtime_profile, "AllocateResourceTime"); static const std::string timer_name = "WaitForDependencyTime"; _wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name); @@ -171,10 +169,10 @@ Status ScanLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(_acquire_runtime_filter()); RETURN_IF_ERROR(_process_conjuncts()); - auto status = _scan_dependency->eos() ? Status::OK() : _prepare_scanners(); + auto status = _eos ? 
Status::OK() : _prepare_scanners(); if (_scanner_ctx) { _finish_dependency->block(); - DCHECK(!_scan_dependency->eos() && _num_scanners->value() > 0); + DCHECK(!_eos && _num_scanners->value() > 0); RETURN_IF_ERROR(_scanner_ctx->init()); RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); } @@ -262,7 +260,8 @@ Status ScanLocalState::_normalize_conjuncts() { std::visit( [&](auto&& range) { if (range.is_empty_value_range()) { - _scan_dependency->set_eos(); + _eos = true; + _scan_dependency->set_ready(); } }, it.second.second); @@ -561,7 +560,8 @@ Status ScanLocalState::_eval_const_conjuncts(vectorized::VExpr* vexpr, constant_val = const_cast(const_column->get_data_at(0).data); if (constant_val == nullptr || !*reinterpret_cast(constant_val)) { *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE; - _scan_dependency->set_eos(); + _eos = true; + _scan_dependency->set_ready(); } } else if (const vectorized::ColumnVector* bool_column = check_and_get_column>( @@ -578,7 +578,8 @@ Status ScanLocalState::_eval_const_conjuncts(vectorized::VExpr* vexpr, constant_val = const_cast(bool_column->get_data_at(0).data); if (constant_val == nullptr || !*reinterpret_cast(constant_val)) { *pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE; - _scan_dependency->set_eos(); + _eos = true; + _scan_dependency->set_ready(); } } else { LOG(WARNING) << "Constant predicate in scan node should return a bool column with " @@ -775,7 +776,8 @@ Status ScanLocalState::_normalize_not_in_and_not_eq_predicate( HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); auto fn_name = std::string(""); if (!is_fixed_range && state->null_in_set) { - _scan_dependency->set_eos(); + _eos = true; + _scan_dependency->set_ready(); } while (iter->has_next()) { // column not in (nullptr) is always true @@ -1168,7 +1170,8 @@ Status ScanLocalState::_prepare_scanners() { std::list scanners; RETURN_IF_ERROR(_init_scanners(&scanners)); if (scanners.empty()) { - 
_scan_dependency->set_eos(); + _eos = true; + _scan_dependency->set_ready(); } else { COUNTER_SET(_num_scanners, static_cast(scanners.size())); RETURN_IF_ERROR(_start_scanners(scanners)); @@ -1350,7 +1353,6 @@ template Status ScanOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { auto& local_state = get_local_state(state); - SCOPED_TIMER(local_state._get_next_timer); SCOPED_TIMER(local_state.exec_time_counter()); // in inverted index apply logic, in order to optimize query performance, // we built some temporary columns into block, these columns only used in scan node level, @@ -1376,7 +1378,7 @@ Status ScanOperatorX::get_block(RuntimeState* state, vectorized: } } - if (local_state._scan_dependency->eos()) { + if (local_state._eos) { source_state = SourceState::FINISHED; return Status::OK(); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 06a6c61129..23adfffc3a 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -59,8 +59,8 @@ public: class ScanDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(ScanDependency); - ScanDependency(int id, int node_id) - : Dependency(id, node_id, "ScanDependency"), _scanner_ctx(nullptr) {} + ScanDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "ScanDependency", query_ctx), _scanner_ctx(nullptr) {} // TODO(gabriel): [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override { @@ -71,7 +71,7 @@ public: return Dependency::is_blocked_by(task); } - bool push_to_blocking_queue() override { return true; } + bool push_to_blocking_queue() const override { return true; } void block() override { if (_scanner_done) { @@ -384,9 +384,7 @@ protected: // "_colname_to_value_range" and in "_not_in_value_ranges" std::vector _not_in_value_ranges; - RuntimeProfile::Counter* _get_next_timer = nullptr; - RuntimeProfile::Counter* _alloc_resource_timer = 
nullptr; - RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr; + bool _eos = false; doris::Mutex _block_lock; }; diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index cd1dbd6267..89a2ab3bb3 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -70,8 +70,8 @@ private: class SetProbeSinkDependency final : public Dependency { public: using SharedState = SetSharedState; - SetProbeSinkDependency(int id, int node_id) - : Dependency(id, node_id, "SetProbeSinkDependency", true) {} + SetProbeSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "SetProbeSinkDependency", true, query_ctx) {} ~SetProbeSinkDependency() override = default; void set_cur_child_id(int id) { diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index e318c354c8..b8ca789b78 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -63,7 +63,8 @@ private: class SetSinkDependency final : public Dependency { public: using SharedState = SetSharedState; - SetSinkDependency(int id, int node_id) : Dependency(id, node_id, "SetSinkDependency", true) {} + SetSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "SetSinkDependency", true, query_ctx) {} ~SetSinkDependency() override = default; void set_cur_child_id(int id) { diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h index cc7275444c..44800f23f4 100644 --- a/be/src/pipeline/exec/set_source_operator.h +++ b/be/src/pipeline/exec/set_source_operator.h @@ -56,7 +56,8 @@ public: class SetSourceDependency final : public Dependency { public: using SharedState = SetSharedState; - SetSourceDependency(int id, int node_id) : Dependency(id, node_id, "SetSourceDependency") {} + SetSourceDependency(int id, int node_id, 
QueryContext* query_ctx) + : Dependency(id, node_id, "SetSourceDependency", query_ctx) {} ~SetSourceDependency() override = default; }; diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 67305aad69..8730780b54 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -48,7 +48,8 @@ public: class SortSinkDependency final : public Dependency { public: using SharedState = SortSharedState; - SortSinkDependency(int id, int node_id) : Dependency(id, node_id, "SortSinkDependency", true) {} + SortSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "SortSinkDependency", true, query_ctx) {} ~SortSinkDependency() override = default; }; diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h index 3b615a58be..efbddfd105 100644 --- a/be/src/pipeline/exec/sort_source_operator.h +++ b/be/src/pipeline/exec/sort_source_operator.h @@ -48,7 +48,8 @@ public: class SortSourceDependency final : public Dependency { public: using SharedState = SortSharedState; - SortSourceDependency(int id, int node_id) : Dependency(id, node_id, "SortSourceDependency") {} + SortSourceDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "SortSourceDependency", query_ctx) {} ~SortSourceDependency() override = default; }; diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index e135566417..f3031dd101 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -69,8 +69,8 @@ private: class UnionSinkDependency final : public Dependency { public: using SharedState = UnionSharedState; - UnionSinkDependency(int id, int node_id) - : Dependency(id, node_id, "UnionSinkDependency", true) {} + UnionSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "UnionSinkDependency", 
true, query_ctx) {} ~UnionSinkDependency() override = default; void block() override {} }; diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index fc1dd124a0..d824f9db7a 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -115,8 +115,8 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { DCHECK(deps.size() == 1); DCHECK(deps.front() == nullptr); //child_count == 0 , we need to creat a UnionDependency - deps.front() = - std::make_shared(_parent->operator_id(), _parent->node_id()); + deps.front() = std::make_shared( + _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); ((UnionSourceDependency*)deps.front().get())->set_shared_state(ss); } RETURN_IF_ERROR(Base::init(state, info)); diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 0c846b828f..8b7060884e 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -72,7 +72,8 @@ private: class UnionSourceDependency final : public Dependency { public: using SharedState = UnionSharedState; - UnionSourceDependency(int id, int node_id) : Dependency(id, node_id, "UnionSourceDependency") {} + UnionSourceDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "UnionSourceDependency", query_ctx) {} ~UnionSourceDependency() override = default; [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override { @@ -85,7 +86,7 @@ public: } return this; } - bool push_to_blocking_queue() override { return true; } + bool push_to_blocking_queue() const override { return true; } void block() override {} }; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index a302dc7c34..0b26c9d215 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -104,6 +104,16 @@ inline const 
char* get_state_name(PipelineTaskState idx) { __builtin_unreachable(); } +inline bool is_final_state(PipelineTaskState idx) { + switch (idx) { + case PipelineTaskState::FINISHED: + case PipelineTaskState::CANCELED: + return true; + default: + return false; + } +} + class TaskQueue; class PriorityTaskQueue; diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index c149da54fd..00a7d012c9 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -28,7 +28,7 @@ namespace doris::pipeline { -void Dependency::add_block_task(PipelineXTask* task) { +void Dependency::_add_block_task(PipelineXTask* task) { DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task) << "Duplicate task: " << task->debug_string(); _blocked_task.push_back(task); @@ -54,16 +54,19 @@ void Dependency::set_ready() { } Dependency* Dependency::is_blocked_by(PipelineXTask* task) { - if (config::enable_fuzzy_mode && !_ready && _should_log(_watcher.elapsed_time())) { - LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " " - << _node_id << " block tasks: " << _blocked_task.size() - << "task: " << (task ? task->fragment_context()->debug_string() : ""); + std::unique_lock lc(_task_lock); + auto ready = _ready.load() || _is_cancelled(); + if (!ready && !push_to_blocking_queue() && task) { + _add_block_task(task); } + return ready ? nullptr : this; +} +Dependency* FinishDependency::is_blocked_by(PipelineXTask* task) { std::unique_lock lc(_task_lock); auto ready = _ready.load(); if (!ready && !push_to_blocking_queue() && task) { - add_block_task(task); + _add_block_task(task); } return ready ? 
nullptr : this; } @@ -73,9 +76,9 @@ Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) { return nullptr; } std::unique_lock lc(_task_lock); - if (*_blocked_by_rf) { + if (*_blocked_by_rf && !_is_cancelled()) { if (LIKELY(task)) { - add_block_task(task); + _add_block_task(task); } return this; } @@ -90,6 +93,15 @@ std::string Dependency::debug_string(int indentation_level) { return fmt::to_string(debug_string_buffer); } +std::string RuntimeFilterDependency::debug_string(int indentation_level) { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, + "{}{}: id={}, block task = {}, ready={}, _filters = {}, _blocked_by_rf = {}", + std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), + _ready, _filters.load(), _blocked_by_rf ? _blocked_by_rf->load() : false); + return fmt::to_string(debug_string_buffer); +} + std::string AndDependency::debug_string(int indentation_level) { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[", diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 662e789d65..13ae1cd9d7 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -61,30 +61,31 @@ struct BasicSharedState { class Dependency : public std::enable_shared_from_this { public: - Dependency(int id, int node_id, std::string name) + Dependency(int id, int node_id, std::string name, QueryContext* query_ctx) : _id(id), _node_id(node_id), _name(std::move(name)), _is_write_dependency(false), - _ready(false) {} - Dependency(int id, int node_id, std::string name, bool ready) + _ready(false), + _query_ctx(query_ctx) {} + Dependency(int id, int node_id, std::string name, bool ready, QueryContext* query_ctx) : _id(id), _node_id(node_id), _name(std::move(name)), _is_write_dependency(true), - _ready(ready) {} + _ready(ready), + _query_ctx(query_ctx) {} virtual ~Dependency() = default; 
[[nodiscard]] int id() const { return _id; } [[nodiscard]] virtual std::string name() const { return _name; } - void set_parent(std::weak_ptr parent) { _parent = parent; } void add_child(std::shared_ptr child) { _children.push_back(child); } std::shared_ptr shared_state() { return _shared_state; } void set_shared_state(std::shared_ptr shared_state) { _shared_state = shared_state; } virtual std::string debug_string(int indentation_level = 0); - virtual bool push_to_blocking_queue() { return false; } + virtual bool push_to_blocking_queue() const { return false; } // Start the watcher. We use it to count how long this dependency block the current pipeline task. void start_watcher() { @@ -104,26 +105,9 @@ public: DCHECK(_shared_state->source_dep != nullptr) << debug_string(); _shared_state->source_dep->set_ready(); } - void set_eos() { - if (_eos) { - return; - } - _eos = true; - set_ready(); - if (_is_write_dependency && _shared_state->source_dep != nullptr) { - _shared_state->source_dep->set_eos(); - } - } - bool eos() const { return _eos.load(); } // Notify downstream pipeline tasks this dependency is blocked. - virtual void block() { - if (_eos) { - return; - } - _ready = false; - } - void add_block_task(PipelineXTask* task); + virtual void block() { _ready = false; } protected: bool _should_log(uint64_t cur_time) { @@ -136,14 +120,19 @@ protected: _last_log_time = cur_time; return true; } + void _add_block_task(PipelineXTask* task); + bool _is_cancelled() const { + return push_to_blocking_queue() ? 
false : _query_ctx->is_cancelled(); + } const int _id; const int _node_id; const std::string _name; const bool _is_write_dependency; + std::atomic _ready; + const QueryContext* _query_ctx; std::shared_ptr _shared_state {nullptr}; - std::atomic _ready; MonotonicStopWatch _watcher; std::weak_ptr _parent; std::list> _children; @@ -151,7 +140,6 @@ protected: uint64_t _last_log_time = 0; std::mutex _task_lock; std::vector _blocked_task; - std::atomic _eos {false}; }; struct FakeSharedState : public BasicSharedState {}; @@ -159,11 +147,21 @@ struct FakeSharedState : public BasicSharedState {}; struct FakeDependency final : public Dependency { public: using SharedState = FakeSharedState; - FakeDependency(int id, int node_id) : Dependency(id, node_id, "FakeDependency") {} + FakeDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "FakeDependency", query_ctx) {} [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override { return nullptr; } }; +struct FinishDependency final : public Dependency { +public: + using SharedState = FakeSharedState; + FinishDependency(int id, int node_id, std::string name, QueryContext* query_ctx) + : Dependency(id, node_id, name, true, query_ctx) {} + + [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override; +}; + class RuntimeFilterDependency; class RuntimeFilterTimer { public: @@ -200,15 +198,17 @@ private: class RuntimeFilterDependency final : public Dependency { public: - RuntimeFilterDependency(int id, int node_id, std::string name) - : Dependency(id, node_id, name) {} - Dependency* is_blocked_by(PipelineXTask* task); + RuntimeFilterDependency(int id, int node_id, std::string name, QueryContext* query_ctx) + : Dependency(id, node_id, name, query_ctx) {} + Dependency* is_blocked_by(PipelineXTask* task) override; void add_filters(IRuntimeFilter* runtime_filter); void sub_filters(); void set_blocked_by_rf(std::shared_ptr blocked_by_rf) { _blocked_by_rf = blocked_by_rf; } + std::string 
debug_string(int indentation_level = 0) override; + protected: std::atomic_int _filters; std::shared_ptr _blocked_by_rf; @@ -218,7 +218,8 @@ class AndDependency final : public Dependency { public: using SharedState = FakeSharedState; ENABLE_FACTORY_CREATOR(AndDependency); - AndDependency(int id, int node_id) : Dependency(id, node_id, "AndDependency") {} + AndDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "AndDependency", query_ctx) {} [[nodiscard]] std::string name() const override { fmt::memory_buffer debug_string_buffer; @@ -371,8 +372,8 @@ class AsyncWriterDependency final : public Dependency { public: using SharedState = FakeSharedState; ENABLE_FACTORY_CREATOR(AsyncWriterDependency); - AsyncWriterDependency(int id, int node_id) - : Dependency(id, node_id, "AsyncWriterDependency", true) {} + AsyncWriterDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "AsyncWriterDependency", true, query_ctx) {} ~AsyncWriterDependency() override = default; }; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index c6c28fbe8d..b6ce3fbeb9 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -25,8 +25,8 @@ namespace doris::pipeline { struct LocalExchangeSinkDependency final : public Dependency { public: using SharedState = LocalExchangeSharedState; - LocalExchangeSinkDependency(int id, int node_id) - : Dependency(id, node_id, "LocalExchangeSinkDependency", true) {} + LocalExchangeSinkDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "LocalExchangeSinkDependency", true, query_ctx) {} ~LocalExchangeSinkDependency() override = default; }; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index dfc89a86c9..ebf18d9a24 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -25,8 +25,8 @@ namespace doris::pipeline { struct LocalExchangeSourceDependency final : public Dependency { public: using SharedState = LocalExchangeSharedState; - LocalExchangeSourceDependency(int id, int node_id) - : Dependency(id, node_id, "LocalExchangeSourceDependency") {} + LocalExchangeSourceDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "LocalExchangeSourceDependency", query_ctx) {} ~LocalExchangeSourceDependency() override = default; }; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 4e3fec7abb..050b198a22 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -72,18 +72,12 @@ namespace doris::pipeline { -std::string PipelineXLocalStateBase::debug_string(int indentation_level) const { - return _parent->debug_string(indentation_level); -} - template std::string PipelineXLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, "{}", - PipelineXLocalStateBase::debug_string(indentation_level)); + fmt::format_to(debug_string_buffer, "{}", _parent->debug_string(indentation_level)); if constexpr (!std::is_same_v) { - fmt::format_to(debug_string_buffer, "\nDependency: \n {}", - _dependency->debug_string(indentation_level + 1)); + fmt::format_to(debug_string_buffer, " Dependency: {}", _dependency->debug_string()); } return fmt::to_string(debug_string_buffer); } @@ -91,12 +85,9 @@ std::string PipelineXLocalState::debug_string(int indentation_le template std::string PipelineXSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; - 
fmt::format_to(debug_string_buffer, "{}", - PipelineXSinkLocalStateBase::debug_string(indentation_level)); + fmt::format_to(debug_string_buffer, "{}", _parent->debug_string(indentation_level)); if constexpr (!std::is_same_v) { - fmt::format_to(debug_string_buffer, "\n{}Dependency: \n {}", - std::string(indentation_level * 2, ' '), - _dependency->debug_string(indentation_level + 1)); + fmt::format_to(debug_string_buffer, ", Dependency: {}", _dependency->debug_string()); } return fmt::to_string(debug_string_buffer); } @@ -245,10 +236,6 @@ std::string DataSinkOperatorXBase::debug_string(int indentation_level) const { return fmt::to_string(debug_string_buffer); } -std::string PipelineXSinkLocalStateBase::debug_string(int indentation_level) const { - return _parent->debug_string(indentation_level); -} - std::string DataSinkOperatorXBase::debug_string(RuntimeState* state, int indentation_level) const { return state->get_sink_local_state(operator_id())->debug_string(indentation_level); } @@ -294,7 +281,8 @@ template <> inline constexpr bool NeedToCreate = false; template -void DataSinkOperatorX::get_dependency(vector& dependency) { +void DataSinkOperatorX::get_dependency(vector& dependency, + QueryContext* ctx) { std::shared_ptr ss = nullptr; if constexpr (NeedToCreate) { ss.reset(new typename LocalStateType::DependencyType::SharedState()); @@ -302,8 +290,8 @@ void DataSinkOperatorX::get_dependency(vector& d if constexpr (!std::is_same_v) { auto& dests = dests_id(); for (auto& dest_id : dests) { - dependency.push_back( - std::make_shared(dest_id, _node_id)); + dependency.push_back(std::make_shared( + dest_id, _node_id, ctx)); dependency.back()->set_shared_state(ss); } } else { @@ -312,8 +300,8 @@ void DataSinkOperatorX::get_dependency(vector& d } template -DependencySPtr OperatorX::get_dependency() { - return std::make_shared(_operator_id, _node_id); +DependencySPtr OperatorX::get_dependency(QueryContext* ctx) { + return std::make_shared(_operator_id, _node_id, ctx); } 
template @@ -328,8 +316,9 @@ PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* RuntimeState* state) : _parent(parent), _state(state), - _finish_dependency(new Dependency(parent->operator_id(), parent->node_id(), - parent->get_name() + "_FINISH_DEPENDENCY", true)) {} + _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(), + parent->get_name() + "_FINISH_DEPENDENCY", + state->get_query_ctx())) {} PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent) : _num_rows_returned(0), @@ -337,10 +326,12 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB _peak_memory_usage_counter(nullptr), _parent(parent), _state(state), - _finish_dependency(new Dependency(parent->operator_id(), parent->node_id(), - parent->get_name() + "_FINISH_DEPENDENCY", true)) { + _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(), + parent->get_name() + "_FINISH_DEPENDENCY", + state->get_query_ctx())) { _filter_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY"); + parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", + state->get_query_ctx()); } template @@ -421,7 +412,7 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, } } else { auto& deps = info.dependencys; - deps.front() = std::make_shared(0, 0); + deps.front() = std::make_shared(0, 0, state->get_query_ctx()); _dependency = (DependencyType*)deps.front().get(); } _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1); @@ -499,8 +490,8 @@ Status AsyncWriterSink::init(RuntimeState* state, LocalSinkState _parent->cast()._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); } _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs)); - _async_writer_dependency = - AsyncWriterDependency::create_shared(_parent->operator_id(), _parent->node_id()); + 
_async_writer_dependency = AsyncWriterDependency::create_shared( + _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); _writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get()); _wait_for_dependency_timer = diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 5afa080e6d..943d8c4670 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -97,7 +97,7 @@ public: void add_num_rows_returned(int64_t delta) { _num_rows_returned += delta; } void set_num_rows_returned(int64_t value) { _num_rows_returned = value; } - [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const; + [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const = 0; virtual Dependency* dependency() { return nullptr; } @@ -176,7 +176,7 @@ public: throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name); } [[nodiscard]] std::string get_name() const override { return _op_name; } - virtual DependencySPtr get_dependency() = 0; + virtual DependencySPtr get_dependency(QueryContext* ctx) = 0; Status prepare(RuntimeState* state) override; @@ -307,7 +307,7 @@ public: return state->get_local_state(operator_id())->template cast(); } - DependencySPtr get_dependency() override; + DependencySPtr get_dependency(QueryContext* ctx) override; }; template @@ -348,7 +348,7 @@ public: virtual Status close(RuntimeState* state, Status exec_status) = 0; virtual Status try_close(RuntimeState* state, Status exec_status) = 0; - [[nodiscard]] virtual std::string debug_string(int indentation_level) const; + [[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0; template TARGET& cast() { @@ -456,7 +456,7 @@ public: return reinterpret_cast(*this); } - virtual void get_dependency(std::vector& dependency) = 0; + virtual void get_dependency(std::vector& dependency, QueryContext* ctx) = 0; Status close(RuntimeState* state) override { 
return Status::InternalError("Should not reach here!"); @@ -551,7 +551,7 @@ public: ~DataSinkOperatorX() override = default; Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; - void get_dependency(std::vector& dependency) override; + void get_dependency(std::vector& dependency, QueryContext* ctx) override; using LocalState = LocalStateType; [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index b087c3bcab..e47b55e491 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -131,7 +131,7 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, .tag("fragment_id", _fragment_id) .tag("reason", reason) .tag("error message", msg); - if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) { + if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) { if (reason != PPlanFragmentCancelReason::LIMIT_REACH) { FOR_EACH_RUNTIME_STATE(LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(runtime_state->fragment_instance_id());) @@ -149,6 +149,11 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, // TODO pipeline incomp // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); } + for (auto& tasks : _tasks) { + for (auto& task : tasks) { + task->clear_blocking_state(); + } + } } Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 5b3524c69a..703e4862f7 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -57,9 +57,9 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, 
RuntimeSta _local_exchange_state(local_exchange_state), _task_idx(task_idx) { _pipeline_task_watcher.start(); - _sink->get_dependency(_downstream_dependency); + _sink->get_dependency(_downstream_dependency, state->get_query_ctx()); for (auto& op : _operators) { - _source_dependency.insert({op->operator_id(), op->get_dependency()}); + _source_dependency.insert({op->operator_id(), op->get_dependency(state->get_query_ctx())}); } } @@ -357,21 +357,24 @@ std::string PipelineXTask::debug_string() { _opened ? _sink->debug_string(_state, _operators.size()) : _sink->debug_string(_operators.size())); fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n"); - for (size_t i = 0; i < _read_dependencies.size(); i++) { - fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '), - _read_dependencies[i]->debug_string()); + size_t i = 0; + for (; i < _read_dependencies.size(); i++) { + fmt::format_to(debug_string_buffer, "{}. {}\n", i, + _read_dependencies[i]->debug_string(i + 1)); } fmt::format_to(debug_string_buffer, "Write Dependency Information: \n"); - fmt::format_to(debug_string_buffer, "{}\n", _write_dependencies->debug_string()); + fmt::format_to(debug_string_buffer, "{}. {}\n", i, _write_dependencies->debug_string(1)); + i++; fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); - fmt::format_to(debug_string_buffer, "{}\n", _filter_dependency->debug_string()); + fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1)); + i++; fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n"); - for (size_t i = 0; i < _finish_dependencies.size(); i++) { - fmt::format_to(debug_string_buffer, "{}{}\n", std::string(i * 2, ' '), - _finish_dependencies[i]->debug_string()); + for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) { + fmt::format_to(debug_string_buffer, "{}. 
{}\n", i, + _finish_dependencies[j]->debug_string(j + 1)); } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index e0d0e58e65..46d006d0da 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -142,6 +142,13 @@ public: } _use_blocking_queue = false; } + void clear_blocking_state() { + if (!is_final_state(get_state()) && get_state() != PipelineTaskState::PENDING_FINISH && + _blocked_dep) { + _blocked_dep->set_ready(); + _blocked_dep = nullptr; + } + } private: Dependency* _write_blocked_dependency() { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 210de93a7e..c0153652d2 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -560,6 +560,12 @@ void FragmentMgr::remove_pipeline_context( f_context->instance_ids(ins_ids); bool all_done = q_context->countdown(ins_ids.size()); for (const auto& ins_id : ins_ids) { + { + std::lock_guard plock(q_context->pipeline_lock); + if (q_context->fragment_id_to_pipeline_ctx.contains(f_context->get_fragment_id())) { + q_context->fragment_id_to_pipeline_ctx.erase(f_context->get_fragment_id()); + } + } LOG_INFO("Removing query {} instance {}, all done? 
{}", print_id(query_id), print_id(ins_id), all_done); _pipeline_map.erase(ins_id); @@ -866,6 +872,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, _cv.notify_all(); } + { + std::lock_guard lock(query_ctx->pipeline_lock); + query_ctx->fragment_id_to_pipeline_ctx.insert({params.fragment_id, context}); + } RETURN_IF_ERROR(context->submit()); return Status::OK(); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp new file mode 100644 index 0000000000..823893dbe7 --- /dev/null +++ b/be/src/runtime/query_context.cpp @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/query_context.h" + +#include "pipeline/pipeline_fragment_context.h" + +namespace doris { + +bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragment_id) { + if (_is_cancelled) { + return false; + } + set_exec_status(new_status); + _is_cancelled.store(v); + + set_ready_to_execute(true); + { + std::lock_guard plock(pipeline_lock); + for (auto& ctx : fragment_id_to_pipeline_ctx) { + if (fragment_id == ctx.first) { + continue; + } + ctx.second->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, msg); + } + } + return true; +} +} // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index bb7ad20b90..49c2deb0d8 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -40,6 +40,11 @@ #include "vec/runtime/shared_scanner_controller.h" namespace doris { + +namespace pipeline { +class PipelineFragmentContext; +} // namespace pipeline + struct ReportStatusRequest { bool is_pipeline_x; const Status status; @@ -138,16 +143,7 @@ public: } [[nodiscard]] bool is_cancelled() const { return _is_cancelled.load(); } - bool cancel(bool v, std::string msg, Status new_status) { - if (_is_cancelled) { - return false; - } - set_exec_status(new_status); - _is_cancelled.store(v); - - set_ready_to_execute(true); - return true; - } + bool cancel(bool v, std::string msg, Status new_status, int fragment_id = -1); void set_exec_status(Status new_status) { if (new_status.ok()) { @@ -267,6 +263,8 @@ public: std::shared_ptr query_mem_tracker; std::vector fragment_instance_ids; + std::map> fragment_id_to_pipeline_ctx; + std::mutex pipeline_lock; // plan node id -> TFileScanRangeParams // only for file scan node diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 94bc7a7bde..d5d460e80b 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -273,7 +273,7 @@ void 
VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) { << " #senders=" << _num_remaining_senders; if (_num_remaining_senders == 0) { if (_dependency) { - _dependency->set_eos(); + _dependency->set_ready(); } _data_arrival_cv.notify_one(); } @@ -288,7 +288,7 @@ void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) { _is_cancelled = true; _cancel_status = cancel_status; if (_dependency) { - _dependency->set_eos(); + _dependency->set_ready(); } VLOG_QUERY << "cancelled stream: _fragment_instance_id=" << print_id(_recvr->fragment_instance_id()) @@ -318,7 +318,7 @@ void VDataStreamRecvr::SenderQueue::close() { std::lock_guard l(_lock); _is_cancelled = true; if (_dependency) { - _dependency->set_eos(); + _dependency->set_ready(); } for (auto closure_pair : _pending_closures) { @@ -362,8 +362,8 @@ VDataStreamRecvr::VDataStreamRecvr( _sender_to_local_channel_dependency.resize(num_queues); for (size_t i = 0; i < num_queues; i++) { _sender_to_local_channel_dependency[i] = - pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id, - _dest_node_id); + pipeline::LocalExchangeChannelDependency::create_shared( + _dest_node_id, _dest_node_id, state->get_query_ctx()); } } _sender_queues.reserve(num_queues);