From 552091f21ffb8d97fe390e97c7875048ccae446d Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Wed, 25 Oct 2023 10:13:17 +0800 Subject: [PATCH] [performance](pipelineX) optimize pipelineX (#25713) --- .../exec/aggregation_sink_operator.cpp | 5 +- .../pipeline/exec/aggregation_sink_operator.h | 4 +- .../exec/aggregation_source_operator.cpp | 9 +- .../exec/aggregation_source_operator.h | 4 +- .../pipeline/exec/analytic_sink_operator.cpp | 11 +- be/src/pipeline/exec/analytic_sink_operator.h | 5 +- .../exec/analytic_source_operator.cpp | 9 +- .../pipeline/exec/analytic_source_operator.h | 4 +- .../exec/assert_num_rows_operator.cpp | 4 +- .../pipeline/exec/assert_num_rows_operator.h | 3 +- be/src/pipeline/exec/datagen_operator.cpp | 12 +- be/src/pipeline/exec/datagen_operator.h | 3 +- ...ct_streaming_aggregation_sink_operator.cpp | 3 +- ...inct_streaming_aggregation_sink_operator.h | 7 +- ..._streaming_aggregation_source_operator.cpp | 3 +- ...ct_streaming_aggregation_source_operator.h | 2 +- be/src/pipeline/exec/empty_set_operator.h | 5 +- be/src/pipeline/exec/es_scan_operator.cpp | 4 +- be/src/pipeline/exec/es_scan_operator.h | 3 +- .../pipeline/exec/exchange_sink_operator.cpp | 24 ++-- be/src/pipeline/exec/exchange_sink_operator.h | 5 +- .../exec/exchange_source_operator.cpp | 16 +-- .../pipeline/exec/exchange_source_operator.h | 7 +- be/src/pipeline/exec/file_scan_operator.cpp | 5 +- be/src/pipeline/exec/file_scan_operator.h | 7 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 22 ++-- be/src/pipeline/exec/hashjoin_build_sink.h | 12 +- .../pipeline/exec/hashjoin_probe_operator.cpp | 11 +- .../pipeline/exec/hashjoin_probe_operator.h | 4 +- be/src/pipeline/exec/jdbc_scan_operator.cpp | 4 +- be/src/pipeline/exec/jdbc_scan_operator.h | 3 +- .../exec/jdbc_table_sink_operator.cpp | 11 +- .../pipeline/exec/jdbc_table_sink_operator.h | 4 +- .../exec/join_build_sink_operator.cpp | 4 +- .../pipeline/exec/join_build_sink_operator.h | 3 +- be/src/pipeline/exec/join_probe_operator.cpp | 4 +- be/src/pipeline/exec/join_probe_operator.h | 3 +- be/src/pipeline/exec/meta_scan_operator.cpp | 4 +- be/src/pipeline/exec/meta_scan_operator.h | 3 +- .../exec/multi_cast_data_stream_source.cpp | 1 + .../exec/multi_cast_data_stream_source.h | 10 +- .../exec/nested_loop_join_build_operator.cpp | 4 +- .../exec/nested_loop_join_build_operator.h | 2 +- .../exec/nested_loop_join_probe_operator.cpp | 11 +- .../exec/nested_loop_join_probe_operator.h | 3 +- be/src/pipeline/exec/olap_scan_operator.cpp | 4 +- be/src/pipeline/exec/olap_scan_operator.h | 3 +- .../pipeline/exec/olap_table_sink_operator.h | 12 +- .../exec/partition_sort_sink_operator.cpp | 5 +- .../exec/partition_sort_sink_operator.h | 2 +- .../exec/partition_sort_source_operator.cpp | 5 - .../exec/partition_sort_source_operator.h | 6 +- be/src/pipeline/exec/repeat_operator.cpp | 6 +- be/src/pipeline/exec/repeat_operator.h | 3 +- .../exec/result_file_sink_operator.cpp | 16 +-- .../pipeline/exec/result_file_sink_operator.h | 8 +- be/src/pipeline/exec/result_sink_operator.cpp | 17 +-- be/src/pipeline/exec/result_sink_operator.h | 7 +- be/src/pipeline/exec/scan_operator.cpp | 26 ++-- be/src/pipeline/exec/scan_operator.h | 17 +-- be/src/pipeline/exec/schema_scan_operator.cpp | 4 +- be/src/pipeline/exec/schema_scan_operator.h | 3 +- be/src/pipeline/exec/select_operator.h | 5 +- .../pipeline/exec/set_probe_sink_operator.cpp | 11 +- .../pipeline/exec/set_probe_sink_operator.h | 4 +- be/src/pipeline/exec/set_sink_operator.h | 2 +- be/src/pipeline/exec/set_source_operator.h | 12 +- be/src/pipeline/exec/sort_sink_operator.cpp | 4 +- be/src/pipeline/exec/sort_sink_operator.h | 3 +- be/src/pipeline/exec/sort_source_operator.cpp | 9 +- be/src/pipeline/exec/sort_source_operator.h | 5 +- .../streaming_aggregation_sink_operator.cpp | 5 +- .../streaming_aggregation_sink_operator.h | 8 +- .../streaming_aggregation_source_operator.cpp | 3 +- .../streaming_aggregation_source_operator.h | 2 +- .../pipeline/exec/table_function_operator.cpp | 4 +- .../pipeline/exec/table_function_operator.h | 5 +- .../pipeline/exec/union_source_operator.cpp | 10 +- be/src/pipeline/exec/union_source_operator.h | 17 +-- be/src/pipeline/pipeline_x/dependency.h | 74 ++++++++---- be/src/pipeline/pipeline_x/operator.cpp | 70 ++++++----- be/src/pipeline/pipeline_x/operator.h | 111 ++++++++++-------- .../pipeline_x_fragment_context.cpp | 100 +++++++++------- .../pipeline_x/pipeline_x_fragment_context.h | 6 +- .../pipeline/pipeline_x/pipeline_x_task.cpp | 29 ++++- be/src/pipeline/pipeline_x/pipeline_x_task.h | 11 +- be/src/runtime/runtime_state.cpp | 34 +++--- be/src/runtime/runtime_state.h | 17 ++- 88 files changed, 507 insertions(+), 485 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 013c7854f8..403343b76e 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -657,9 +657,10 @@ Status AggSinkLocalState::try_spill_disk(bool eos) { } template -AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, +AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, + const TPlanNode& tnode, const DescriptorTbl& descs) - : DataSinkOperatorX(tnode.node_id), + : DataSinkOperatorX(operator_id, tnode.node_id), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(nullptr), _output_tuple_id(tnode.agg_node.output_tuple_id), diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index e1223f5c7e..bee007b3b4 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -312,7 +312,8 @@ public: template class AggSinkOperatorX : public DataSinkOperatorX { public: - AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, + const DescriptorTbl& descs); ~AggSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", @@ -328,6 +329,7 @@ public: SourceState source_state) override; using DataSinkOperatorX::id; + using DataSinkOperatorX::operator_id; protected: using LocalState = LocalStateType; diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 8d959b525e..69900fc8f2 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -500,9 +500,9 @@ Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::B return Status::OK(); } -AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, +AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : Base(pool, tnode, descs), + : Base(pool, tnode, operator_id, descs), _needs_finalize(tnode.agg_node.need_finalize), _without_key(tnode.agg_node.grouping_exprs.empty()) {} @@ -552,10 +552,5 @@ Status AggLocalState::close(RuntimeState* state) { return Base::close(state); } -Dependency* AggSourceOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->read_blocked_by(); -} - } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index 2d5ca45dec..b7bbabd9a3 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -116,9 +116,9 @@ protected: class AggSourceOperatorX : public OperatorX { public: using Base = OperatorX; - AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); ~AggSourceOperatorX() = default; - Dependency* wait_for_dependency(RuntimeState* state) override; Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index d839be0dc1..7260ae9d77 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -66,9 +66,9 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf return Status::OK(); } -AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : DataSinkOperatorX(tnode.node_id), +AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, + const TPlanNode& tnode, const DescriptorTbl& descs) + : DataSinkOperatorX(operator_id, tnode.node_id), _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? tnode.analytic_node.buffered_tuple_id : 0) {} @@ -122,11 +122,6 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) { return Status::OK(); } -WriteDependency* AnalyticSinkOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->write_blocked_by(); -} - Status AnalyticSinkOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state)); RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state)); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index c858392586..a6c0fa745a 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -68,7 +68,8 @@ private: class AnalyticSinkOperatorX final : public DataSinkOperatorX { public: - AnalyticSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, + const DescriptorTbl& descs); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX::_name); @@ -82,8 +83,6 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - WriteDependency* wait_for_dependency(RuntimeState* state) override; - private: Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr, vectorized::IColumn* dst_column, size_t length); diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 5397a0a851..a139d76fdd 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -331,8 +331,8 @@ Status AnalyticLocalState::output_current_block(vectorized::Block* block) { } AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : OperatorX(pool, tnode, descs), + int operator_id, const DescriptorTbl& descs) + : OperatorX(pool, tnode, operator_id, descs), _window(tnode.analytic_node.window), _intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id), _output_tuple_id(tnode.analytic_node.output_tuple_id), @@ -414,11 +414,6 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block return Status::OK(); } -Dependency* AnalyticSourceOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->read_blocked_by(); -} - Status AnalyticLocalState::close(RuntimeState* state) { SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_close_timer); diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index 873ed570ba..b75840cc18 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -114,8 +114,8 @@ private: class AnalyticSourceOperatorX final : public OperatorX { public: - AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - Dependency* wait_for_dependency(RuntimeState* state) override; + AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index 375d7c0943..b2b97fee42 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -24,8 +24,8 @@ OperatorPtr AssertNumRowsOperatorBuilder::build_operator() { } AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : StreamingOperatorX(pool, tnode, descs), + int operator_id, const DescriptorTbl& descs) + : StreamingOperatorX(pool, tnode, operator_id, descs), _desired_num_rows(tnode.assert_num_rows_node.desired_num_rows), _subquery_string(tnode.assert_num_rows_node.subquery_string) { if (tnode.assert_num_rows_node.__isset.assertion) { diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h index 2c271be3d6..025f6f8c67 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -50,7 +50,8 @@ public: class AssertNumRowsOperatorX final : public StreamingOperatorX { public: - AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); Status pull(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index fd5b9a716b..240e2c44e2 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -45,8 +45,8 @@ Status DataGenOperator::close(RuntimeState* state) { } DataGenSourceOperatorX::DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : OperatorX(pool, tnode, descs), + int operator_id, const DescriptorTbl& descs) + : OperatorX(pool, tnode, operator_id, descs), _tuple_id(tnode.data_gen_scan_node.tuple_id), _tuple_desc(nullptr), _runtime_filter_descs(tnode.runtime_filters) {} @@ -102,14 +102,14 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { IRuntimeFilter* runtime_filter = nullptr; if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, state->query_options(), p.id(), false)); + filter_desc, state->query_options(), p.node_id(), false)); RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, p.id(), &runtime_filter)); + filter_desc.filter_id, p.node_id(), &runtime_filter)); } else { RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter( - filter_desc, state->query_options(), p.id(), false)); + filter_desc, state->query_options(), p.node_id(), false)); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, p.id(), &runtime_filter)); + filter_desc.filter_id, p.node_id(), &runtime_filter)); } runtime_filter->init_profile(_runtime_profile.get()); } diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h index f2d97536a0..613955ae81 100644 --- a/be/src/pipeline/exec/datagen_operator.h +++ b/be/src/pipeline/exec/datagen_operator.h @@ -68,7 +68,8 @@ private: class DataGenSourceOperatorX final : public OperatorX { public: - DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + DataGenSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp index f0a86fcbd6..fec9f7e3c6 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp @@ -180,9 +180,10 @@ void DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct( } DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool* pool, + int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs) - : AggSinkOperatorX(pool, tnode, descs) {} + : AggSinkOperatorX(pool, operator_id, tnode, descs) {} Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(AggSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h index f0e938a745..60af15179d 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h @@ -110,16 +110,11 @@ private: class DistinctStreamingAggSinkOperatorX final : public AggSinkOperatorX { public: - DistinctStreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, + DistinctStreamingAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - - WriteDependency* wait_for_dependency(RuntimeState* state) override { - CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->write_blocked_by(); - } }; } // namespace pipeline diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp index 89cc5f5d37..1abfb6510d 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp @@ -90,8 +90,9 @@ OperatorPtr DistinctStreamingAggSourceOperatorBuilder::build_operator() { DistinctStreamingAggSourceOperatorX::DistinctStreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, + int operator_id, const DescriptorTbl& descs) - : Base(pool, tnode, descs) { + : Base(pool, tnode, operator_id, descs) { if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h index 78edf4815f..0b4851ad97 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.h @@ -67,7 +67,7 @@ private: class DistinctStreamingAggSourceOperatorX final : public AggSourceOperatorX { public: using Base = AggSourceOperatorX; - DistinctStreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, + DistinctStreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); ~DistinctStreamingAggSourceOperatorX() = default; diff --git a/be/src/pipeline/exec/empty_set_operator.h b/be/src/pipeline/exec/empty_set_operator.h index 538f3b0a16..126f132dd9 100644 --- a/be/src/pipeline/exec/empty_set_operator.h +++ b/be/src/pipeline/exec/empty_set_operator.h @@ -54,8 +54,9 @@ public: class EmptySetSourceOperatorX final : public OperatorX { public: - EmptySetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : OperatorX(pool, tnode, descs) {} + EmptySetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : OperatorX(pool, tnode, operator_id, descs) {} Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index e4b1c4956b..901daa9a43 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -113,9 +113,9 @@ void EsScanLocalState::set_scan_ranges(const std::vector& scan } } -EsScanOperatorX::EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, +EsScanOperatorX::EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : ScanOperatorX(pool, tnode, descs), + : ScanOperatorX(pool, tnode, operator_id, descs), _tuple_id(tnode.es_scan_node.tuple_id), _tuple_desc(nullptr) { ScanOperatorX::_output_tuple_id = tnode.es_scan_node.tuple_id; diff --git a/be/src/pipeline/exec/es_scan_operator.h b/be/src/pipeline/exec/es_scan_operator.h index 9802357a5a..60a5f04510 100644 --- a/be/src/pipeline/exec/es_scan_operator.h +++ b/be/src/pipeline/exec/es_scan_operator.h @@ -62,7 +62,8 @@ private: class EsScanOperatorX final : public ScanOperatorX { public: - EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 047e336a1e..4528d3411d 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -173,13 +173,13 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf register_channels(_sink_buffer.get()); - _exchange_sink_dependency = AndDependency::create_shared(_parent->id()); - _queue_dependency = ExchangeSinkQueueDependency::create_shared(_parent->id()); + _exchange_sink_dependency = AndDependency::create_shared(_parent->operator_id()); + _queue_dependency = ExchangeSinkQueueDependency::create_shared(_parent->operator_id()); _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); _exchange_sink_dependency->add_child(_queue_dependency); if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && !only_local_exchange) { - _broadcast_dependency = BroadcastDependency::create_shared(_parent->id()); + _broadcast_dependency = BroadcastDependency::create_shared(_parent->operator_id()); _broadcast_dependency->set_available_block(config::num_broadcast_buffer); _broadcast_pb_blocks.reserve(config::num_broadcast_buffer); for (size_t i = 0; i < config::num_broadcast_buffer; i++) { @@ -194,10 +194,11 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf size_t dep_id = 0; _channels_dependency.resize(local_size); _wait_channel_timer.resize(local_size); - auto deps_for_channels = AndDependency::create_shared(_parent->id()); + auto deps_for_channels = AndDependency::create_shared(_parent->operator_id()); for (auto channel : channels) { if (channel->is_local()) { - _channels_dependency[dep_id] = ChannelDependency::create_shared(_parent->id()); + _channels_dependency[dep_id] = + ChannelDependency::create_shared(_parent->operator_id()); channel->set_dependency(_channels_dependency[dep_id]); deps_for_channels->add_child(_channels_dependency[dep_id]); _wait_channel_timer[dep_id] = @@ -237,10 +238,10 @@ segment_v2::CompressionTypePB& ExchangeSinkLocalState::compression_type() { } ExchangeSinkOperatorX::ExchangeSinkOperatorX( - RuntimeState* state, const RowDescriptor& row_desc, const TDataStreamSink& sink, - const std::vector& destinations, + RuntimeState* state, const RowDescriptor& row_desc, int operator_id, + const TDataStreamSink& sink, const std::vector& destinations, bool send_query_statistics_with_every_batch) - : DataSinkOperatorX(sink.dest_node_id), + : DataSinkOperatorX(operator_id, sink.dest_node_id), _texprs(sink.output_partition.partition_exprs), _row_desc(row_desc), _part_type(sink.output_partition.type), @@ -507,13 +508,8 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { return PipelineXSinkLocalState<>::close(state, exec_status); } -WriteDependency* ExchangeSinkOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._exchange_sink_dependency->write_blocked_by(); -} - FinishDependency* ExchangeSinkOperatorX::finish_blocked_by(RuntimeState* state) const { - auto& local_state = state->get_sink_local_state(id())->cast(); + auto& local_state = state->get_sink_local_state(operator_id())->cast(); return local_state._finish_dependency->finish_blocked_by(); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index ed8df4dbe0..5ef3d9ee71 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -133,7 +133,7 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; - + WriteDependency* dependency() override { return _exchange_sink_dependency.get(); } Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1); void register_channels(pipeline::ExchangeSinkBuffer* buffer); Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** holder); @@ -214,7 +214,7 @@ private: class ExchangeSinkOperatorX final : public DataSinkOperatorX { public: - ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, + ExchangeSinkOperatorX(RuntimeState* state, const RowDescriptor& row_desc, int operator_id, const TDataStreamSink& sink, const std::vector& destinations, bool send_query_statistics_with_every_batch); @@ -232,7 +232,6 @@ public: int num_receivers = 1); Status try_close(RuntimeState* state, Status exec_status) override; - WriteDependency* wait_for_dependency(RuntimeState* state) override; FinishDependency* finish_blocked_by(RuntimeState* state) const override; private: diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 66a6ecc536..4a3260b51e 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -49,14 +49,14 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( - state, p.input_row_desc(), state->fragment_instance_id(), p.id(), p.num_senders(), + state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(), profile(), p.is_merging(), p.sub_plan_query_statistics_recvr()); - source_dependency = AndDependency::create_shared(_parent->id()); + source_dependency = AndDependency::create_shared(_parent->operator_id()); const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); metrics.resize(queues.size()); for (size_t i = 0; i < queues.size(); i++) { - deps[i] = ExchangeDataDependency::create_shared(_parent->id(), queues[i]); + deps[i] = ExchangeDataDependency::create_shared(_parent->operator_id(), queues[i]); queues[i]->set_dependency(deps[i]); source_dependency->add_child(deps[i]); } @@ -79,8 +79,9 @@ Status ExchangeLocalState::open(RuntimeState* state) { } ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs, int num_senders) - : OperatorX(pool, tnode, descs), + int operator_id, const DescriptorTbl& descs, + int num_senders) + : OperatorX(pool, tnode, operator_id, descs), _num_senders(num_senders), _is_merging(tnode.exchange_node.__isset.sort_info), _input_row_desc(descs, tnode.exchange_node.input_row_tuples, @@ -166,11 +167,6 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block return status; } -Dependency* ExchangeSourceOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state.source_dependency->read_blocked_by(); -} - Status ExchangeLocalState::close(RuntimeState* state) { SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_close_timer); diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index b0f455cefc..12c6c38e4b 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -96,7 +96,7 @@ class ExchangeLocalState final : public PipelineXLocalState<> { Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - + Dependency* dependency() override { return source_dependency.get(); } std::shared_ptr stream_recvr; doris::vectorized::VSortExecExprs vsort_exec_exprs; int64_t num_rows_skipped; @@ -110,9 +110,8 @@ class ExchangeLocalState final : public PipelineXLocalState<> { class ExchangeSourceOperatorX final : public OperatorX { public: - ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, - int num_senders); - Dependency* wait_for_dependency(RuntimeState* state) override; + ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs, int num_senders); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 28fdb89198..9fdafc734c 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -102,8 +102,9 @@ Status FileScanLocalState::_process_conjuncts() { Status FileScanOperatorX::prepare(RuntimeState* state) { RETURN_IF_ERROR(ScanOperatorX::prepare(state)); if (state->get_query_ctx() != nullptr && - state->get_query_ctx()->file_scan_range_params_map.count(_id) > 0) { - TFileScanRangeParams& params = state->get_query_ctx()->file_scan_range_params_map[_id]; + state->get_query_ctx()->file_scan_range_params_map.contains(node_id())) { + TFileScanRangeParams& params = + state->get_query_ctx()->file_scan_range_params_map[node_id()]; _output_tuple_id = params.dest_tuple_id; } return Status::OK(); diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index 63638872bf..df54a22e61 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -52,7 +52,7 @@ public: Status _process_conjuncts() override; Status _init_scanners(std::list* scanners) override; void set_scan_ranges(const std::vector& scan_ranges) override; - int parent_id() { return _parent->id(); } + int parent_id() { return _parent->node_id(); } private: std::vector _scan_ranges; @@ -67,8 +67,9 @@ private: class FileScanOperatorX final : public ScanOperatorX { public: - FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ScanOperatorX(pool, tnode, descs) { + FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : ScanOperatorX(pool, tnode, operator_id, descs) { _output_tuple_id = tnode.file_scan_node.tuple_id; } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 153882075b..e04229f7a2 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -48,7 +48,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info)); SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); - _shared_hash_table_dependency = SharedHashTableDependency::create_shared(_parent->id()); + _shared_hash_table_dependency = + SharedHashTableDependency::create_shared(_parent->operator_id()); auto& p = _parent->cast(); _shared_state->join_op_variants = p._join_op_variants; if (p._is_broadcast_join && state->enable_share_hash_table_for_broadcast_join()) { @@ -74,14 +75,15 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo if (state->enable_share_hash_table_for_broadcast_join()) { profile()->add_info_string("ShareHashTableEnabled", "true"); _should_build_hash_table = p._shared_hashtable_controller->should_build_hash_table( - state->fragment_instance_id(), p.id()); + state->fragment_instance_id(), p.node_id()); } else { profile()->add_info_string("ShareHashTableEnabled", "false"); } } if (!_should_build_hash_table) { _shared_hash_table_dependency->block_writing(); - p._shared_hashtable_controller->append_dependency(p.id(), _shared_hash_table_dependency); + p._shared_hashtable_controller->append_dependency(p.node_id(), + _shared_hash_table_dependency); } else if (p._is_broadcast_join) { // avoid vector expand change block address. // one block can store 4g data, _build_blocks can store 128*4g data. @@ -343,9 +345,10 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { *_shared_state->hash_table_variants); } -HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, +HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, + const TPlanNode& tnode, const DescriptorTbl& descs) - : JoinBuildSinkOperatorX(pool, tnode, descs), + : JoinBuildSinkOperatorX(pool, operator_id, tnode, descs), _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join && tnode.hash_join_node.is_broadcast_join) { _runtime_filter_descs = tnode.runtime_filters; @@ -356,7 +359,7 @@ Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) { if (state->enable_share_hash_table_for_broadcast_join()) { _shared_hashtable_controller = state->get_query_ctx()->get_shared_hash_table_controller(); - _shared_hash_table_context = _shared_hashtable_controller->get_context(id()); + _shared_hash_table_context = _shared_hashtable_controller->get_context(node_id()); } } RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child_x->row_desc())); @@ -477,7 +480,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* if (!ret.ok()) { if (_shared_hashtable_controller) { _shared_hash_table_context->status = ret; - _shared_hashtable_controller->signal(id()); + _shared_hashtable_controller->signal(node_id()); } return ret; } @@ -493,7 +496,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._runtime_filter_slots->copy_to_shared_context( _shared_hash_table_context); } - _shared_hashtable_controller->signal(id()); + _shared_hashtable_controller->signal(node_id()); } } else if (!local_state._should_build_hash_table) { DCHECK(_shared_hashtable_controller != nullptr); @@ -505,7 +508,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state.profile()->add_info_string( "SharedHashTableFrom", - print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id()))); + print_id( + _shared_hashtable_controller->get_builder_fragment_instance_id(node_id()))); local_state._shared_state->_has_null_in_build_side = _shared_hash_table_context->short_circuit_for_null_in_probe_side; std::visit( diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 9b43f95cd3..8ba6e2fba3 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -46,7 +46,7 @@ public: class HashJoinBuildSinkOperatorX; -class SharedHashTableDependency : public WriteDependency { +class SharedHashTableDependency final : public WriteDependency { public: ENABLE_FACTORY_CREATOR(SharedHashTableDependency); SharedHashTableDependency(int id) : WriteDependency(id, "SharedHashTableDependency") {} @@ -80,6 +80,7 @@ public: void add_hash_buckets_filled_info(const std::string& info) const { _profile->add_info_string("HashTableFilledBuckets", info); } + WriteDependency* dependency() override { return _shared_hash_table_dependency.get(); } protected: void _hash_table_init(RuntimeState* state); @@ -130,7 +131,7 @@ protected: class HashJoinBuildSinkOperatorX final : public JoinBuildSinkOperatorX { public: - HashJoinBuildSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, + HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TDataSink", @@ -145,13 +146,8 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - WriteDependency* wait_for_dependency(RuntimeState* state) override { - CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._shared_hash_table_dependency->write_blocked_by(); - } - bool should_dry_run(RuntimeState* state) override { - return _is_broadcast_join && !state->get_sink_local_state(id()) + return _is_broadcast_join && !state->get_sink_local_state(operator_id()) ->cast() ._should_build_hash_table; } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 3d0ea8a4ff..5fe5dcc743 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -218,8 +218,8 @@ void HashJoinProbeLocalState::_prepare_probe_block() { } HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : JoinProbeOperatorX(pool, tnode, descs), + int operator_id, const DescriptorTbl& descs) + : JoinProbeOperatorX(pool, tnode, operator_id, descs), _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids ? tnode.hash_join_node.hash_output_slot_ids : std::vector {}) {} @@ -428,7 +428,7 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state } bool HashJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const { - auto& local_state = state->get_local_state(id())->cast(); + auto& local_state = state->get_local_state(operator_id())->cast(); return (local_state._probe_block.rows() == 0 || local_state._probe_index == local_state._probe_block.rows()) && !local_state._probe_eos && !local_state._shared_state->short_circuit_for_probe; @@ -588,10 +588,5 @@ Status HashJoinProbeOperatorX::open(RuntimeState* state) { return Status::OK(); } -Dependency* HashJoinProbeOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->read_blocked_by(); -} - } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 082f45199c..923f7dd7b9 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -138,11 +138,11 @@ private: class HashJoinProbeOperatorX final : public JoinProbeOperatorX { public: - HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Dependency* wait_for_dependency(RuntimeState* state) override; Status push(RuntimeState* state, vectorized::Block* input_block, SourceState source_state) const override; diff --git a/be/src/pipeline/exec/jdbc_scan_operator.cpp b/be/src/pipeline/exec/jdbc_scan_operator.cpp index 8611134c8e..1f03939df1 100644 --- a/be/src/pipeline/exec/jdbc_scan_operator.cpp +++ b/be/src/pipeline/exec/jdbc_scan_operator.cpp @@ -32,9 +32,9 @@ Status JDBCScanLocalState::_init_scanners(std::list* s return Status::OK(); } -JDBCScanOperatorX::JDBCScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, +JDBCScanOperatorX::JDBCScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : ScanOperatorX(pool, tnode, descs), + : ScanOperatorX(pool, tnode, operator_id, descs), _table_name(tnode.jdbc_scan_node.table_name), _tuple_id(tnode.jdbc_scan_node.tuple_id), _query_string(tnode.jdbc_scan_node.query_string), diff --git a/be/src/pipeline/exec/jdbc_scan_operator.h b/be/src/pipeline/exec/jdbc_scan_operator.h index 8fbbea2cfc..bf954e25df 100644 --- a/be/src/pipeline/exec/jdbc_scan_operator.h +++ b/be/src/pipeline/exec/jdbc_scan_operator.h @@ -51,7 +51,8 @@ private: class JDBCScanOperatorX final : public ScanOperatorX { public: - JDBCScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + JDBCScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); private: friend class JDBCScanLocalState; diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp index 32c5958b95..e68ce9500a 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp @@ -30,9 +30,9 @@ class DataSink; namespace doris::pipeline { -JdbcTableSinkOperatorX::JdbcTableSinkOperatorX(const RowDescriptor& row_desc, +JdbcTableSinkOperatorX::JdbcTableSinkOperatorX(const RowDescriptor& row_desc, int operator_id, const std::vector& t_output_expr) - : DataSinkOperatorX(0), _row_desc(row_desc), _t_output_expr(t_output_expr) {} + : DataSinkOperatorX(operator_id, 0), _row_desc(row_desc), _t_output_expr(t_output_expr) {} Status JdbcTableSinkOperatorX::init(const TDataSink& thrift_sink) { RETURN_IF_ERROR(DataSinkOperatorX::init(thrift_sink)); @@ -63,13 +63,8 @@ Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* bloc return Status::OK(); } -WriteDependency* JdbcTableSinkOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state.write_blocked_by(); -} - FinishDependency* JdbcTableSinkOperatorX::finish_blocked_by(RuntimeState* state) const { - auto& local_state = state->get_sink_local_state(id())->cast(); + auto& local_state = state->get_sink_local_state(operator_id())->cast(); return local_state._finish_dependency->finish_blocked_by(); } diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h b/be/src/pipeline/exec/jdbc_table_sink_operator.h index a37e1c4098..02ccd00fa3 100644 --- a/be/src/pipeline/exec/jdbc_table_sink_operator.h +++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h @@ -46,7 +46,8 @@ private: class JdbcTableSinkOperatorX final : public DataSinkOperatorX { public: - JdbcTableSinkOperatorX(const RowDescriptor& row_desc, const std::vector& select_exprs); + JdbcTableSinkOperatorX(const RowDescriptor& row_desc, int operator_id, + const std::vector& select_exprs); Status init(const TDataSink& thrift_sink) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; @@ -54,7 +55,6 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - WriteDependency* wait_for_dependency(RuntimeState* state) override; FinishDependency* finish_blocked_by(RuntimeState* state) const override; private: diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp b/be/src/pipeline/exec/join_build_sink_operator.cpp index fe5faf43a8..2ed4ebfb0d 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.cpp +++ b/be/src/pipeline/exec/join_build_sink_operator.cpp @@ -44,10 +44,10 @@ Status JoinBuildSinkLocalState::init(RuntimeState* stat } template -JoinBuildSinkOperatorX::JoinBuildSinkOperatorX(ObjectPool* pool, +JoinBuildSinkOperatorX::JoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs) - : DataSinkOperatorX(tnode.node_id), + : DataSinkOperatorX(operator_id, tnode.node_id), _join_op(tnode.__isset.hash_join_node ? tnode.hash_join_node.join_op : (tnode.__isset.nested_loop_join_node ? tnode.nested_loop_join_node.join_op diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h index 5f44083687..9034057658 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.h +++ b/be/src/pipeline/exec/join_build_sink_operator.h @@ -48,7 +48,8 @@ protected: template class JoinBuildSinkOperatorX : public DataSinkOperatorX { public: - JoinBuildSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + JoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, + const DescriptorTbl& descs); ~JoinBuildSinkOperatorX() override = default; protected: diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index 8ae888f7f0..5dd2d5c0cc 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -153,8 +153,8 @@ void JoinProbeLocalState::_reset_tuple_is_null_column() template JoinProbeOperatorX::JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : Base(pool, tnode, descs), + int operator_id, const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs), _join_op(tnode.__isset.hash_join_node ? tnode.hash_join_node.join_op : (tnode.__isset.nested_loop_join_node ? tnode.nested_loop_join_node.join_op diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 5727318a4f..9c874b0b67 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -66,7 +66,8 @@ template class JoinProbeOperatorX : public StatefulOperatorX { public: using Base = StatefulOperatorX; - JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status open(doris::RuntimeState* state) override; diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp index 87f4e2187a..326b9f9e25 100644 --- a/be/src/pipeline/exec/meta_scan_operator.cpp +++ b/be/src/pipeline/exec/meta_scan_operator.cpp @@ -47,9 +47,9 @@ Status MetaScanLocalState::_process_conjuncts() { return Status::OK(); } -MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, +MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : ScanOperatorX(pool, tnode, descs), + : ScanOperatorX(pool, tnode, operator_id, descs), _tuple_id(tnode.meta_scan_node.tuple_id) { _output_tuple_id = _tuple_id; if (tnode.meta_scan_node.__isset.current_user_ident) { diff --git a/be/src/pipeline/exec/meta_scan_operator.h b/be/src/pipeline/exec/meta_scan_operator.h index bbe67ba974..ed371847a1 100644 --- a/be/src/pipeline/exec/meta_scan_operator.h +++ b/be/src/pipeline/exec/meta_scan_operator.h @@ -57,7 +57,8 @@ private: class MetaScanOperatorX final : public ScanOperatorX { public: - MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); private: friend class MetaScanLocalState; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 5629da5e35..bbd66973f8 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -138,6 +138,7 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); + static_cast(_dependency)->set_consumer_id(p._consumer_id); _output_expr_contexts.resize(p._output_expr_contexts.size()); for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 943c62d077..22a50fa1fc 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -121,18 +121,14 @@ public: using Base = OperatorX; MultiCastDataStreamerSourceOperatorX(const int consumer_id, ObjectPool* pool, const TDataStreamSink& sink, - const RowDescriptor& row_descriptor, int id) - : Base(pool, -1, id), + const RowDescriptor& row_descriptor, int operator_id) + : Base(pool, -1, operator_id), _consumer_id(consumer_id), _t_data_stream_sink(sink), _row_descriptor(row_descriptor) { _op_name = "MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR"; }; ~MultiCastDataStreamerSourceOperatorX() override = default; - Dependency* wait_for_dependency(RuntimeState* state) override { - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->can_read(_consumer_id); - } Status prepare(RuntimeState* state) override { RETURN_IF_ERROR(Base::prepare(state)); @@ -176,7 +172,7 @@ public: int dest_id_from_sink() const { return _t_data_stream_sink.dest_node_id; } bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const override { - return state->get_local_state(id()) + return state->get_local_state(operator_id()) ->template cast() .runtime_filters_are_ready_or_timeout(); } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index f3b1c1ee32..e09d1041c0 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -52,9 +52,11 @@ const std::vector& NestedLoopJoinBuildSinkLocalState::runtim } NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, + int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs) - : JoinBuildSinkOperatorX(pool, tnode, descs), + : JoinBuildSinkOperatorX(pool, operator_id, tnode, + descs), _runtime_filter_descs(tnode.runtime_filters), _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 568e02be33..bd94c9e0e9 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -74,7 +74,7 @@ private: class NestedLoopJoinBuildSinkOperatorX final : public JoinBuildSinkOperatorX { public: - NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, + NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs); Status init(const TDataSink& tsink) override { return Status::InternalError( diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index e1454c8c3c..718d66decf 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -438,8 +438,9 @@ void NestedLoopJoinProbeLocalState::_process_left_child_block( } NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, + int operator_id, const DescriptorTbl& descs) - : JoinProbeOperatorX(pool, tnode, descs), + : JoinProbeOperatorX(pool, tnode, operator_id, descs), _is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only && tnode.nested_loop_join_node.is_output_left_side_only), _old_version_flag(!tnode.__isset.nested_loop_join_node) {} @@ -478,7 +479,8 @@ Status NestedLoopJoinProbeOperatorX::open(RuntimeState* state) { } bool NestedLoopJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const { - auto& local_state = state->get_local_state(id())->cast(); + auto& local_state = + state->get_local_state(operator_id())->cast(); return local_state._need_more_input_data and !local_state._shared_state->left_side_eos and local_state._join_block.rows() == 0; } @@ -563,9 +565,4 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block return Status::OK(); } -Dependency* NestedLoopJoinProbeOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->read_blocked_by(); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 8ad39451b0..1a18d419b8 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -205,12 +205,11 @@ private: class NestedLoopJoinProbeOperatorX final : public JoinProbeOperatorX { public: - NestedLoopJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, + NestedLoopJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - Dependency* wait_for_dependency(RuntimeState* state) override; Status push(RuntimeState* state, vectorized::Block* input_block, SourceState source_state) const override; diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 4c09b3017e..b4e61af205 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -470,9 +470,9 @@ void OlapScanLocalState::add_filter_info(int id, const PredicateFilterInfo& upda _segment_profile->add_info_string(filter_name, info_str); } -OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, +OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : ScanOperatorX(pool, tnode, descs), + : ScanOperatorX(pool, tnode, operator_id, descs), _olap_scan_node(tnode.olap_scan_node) { _output_tuple_id = tnode.olap_scan_node.tuple_id; _col_distribute_ids = tnode.olap_scan_node.distribute_column_ids; diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index dce1f680a9..f15d085b51 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -182,7 +182,8 @@ private: class OlapScanOperatorX final : public ScanOperatorX { public: - OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); private: friend class OlapScanLocalState; diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h index 79633e7dc4..ed7ac10cfb 100644 --- a/be/src/pipeline/exec/olap_table_sink_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_operator.h @@ -68,9 +68,9 @@ private: class OlapTableSinkOperatorX final : public DataSinkOperatorX { public: using Base = DataSinkOperatorX; - OlapTableSinkOperatorX(ObjectPool* pool, const RowDescriptor& row_desc, + OlapTableSinkOperatorX(ObjectPool* pool, int operator_id, const RowDescriptor& row_desc, const std::vector& t_output_expr, bool group_commit) - : Base(0), + : Base(operator_id, 0), _row_desc(row_desc), _t_output_expr(t_output_expr), _group_commit(group_commit), @@ -101,15 +101,11 @@ public: } FinishDependency* finish_blocked_by(RuntimeState* state) const override { - auto& local_state = state->get_sink_local_state(id())->cast(); + auto& local_state = + state->get_sink_local_state(operator_id())->cast(); return local_state._finish_dependency->finish_blocked_by(); }; - WriteDependency* wait_for_dependency(RuntimeState* state) override { - CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state.write_blocked_by(); - } - private: friend class OlapTableSinkLocalState; template diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 515beba944..4379a088a3 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -51,9 +51,10 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo return Status::OK(); } -PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, +PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int operator_id, + const TPlanNode& tnode, const DescriptorTbl& descs) - : DataSinkOperatorX(tnode.node_id), + : DataSinkOperatorX(operator_id, tnode.node_id), _pool(pool), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _limit(tnode.limit), diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 1b76623c4f..93771ac8e2 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -86,7 +86,7 @@ private: class PartitionSortSinkOperatorX final : public DataSinkOperatorX { public: - PartitionSortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, + PartitionSortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index f9003c6526..9800efd8ea 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -79,11 +79,6 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: return Status::OK(); } -Dependency* PartitionSortSourceOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->read_blocked_by(); -} - Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, vectorized::Block* output_block, PartitionSortSourceLocalState& local_state) { diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index 23393a870a..0bb1149603 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -74,15 +74,13 @@ private: class PartitionSortSourceOperatorX final : public OperatorX { public: using Base = OperatorX; - PartitionSortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, + PartitionSortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : OperatorX(pool, tnode, descs) {} + : OperatorX(pool, tnode, operator_id, descs) {} Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; - Dependency* wait_for_dependency(RuntimeState* state) override; - bool is_source() const override { return true; } private: diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index f3628d735b..e804685bc4 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -90,9 +90,9 @@ Status RepeatOperatorX::open(RuntimeState* state) { return Status::OK(); } -RepeatOperatorX::RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, +RepeatOperatorX::RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : Base(pool, tnode, descs), + : Base(pool, tnode, operator_id, descs), _slot_id_set_list(tnode.repeat_node.slot_id_set_list), _all_slot_ids(tnode.repeat_node.all_slot_ids), _repeat_id_list(tnode.repeat_node.repeat_id_list), @@ -100,7 +100,7 @@ RepeatOperatorX::RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, _output_tuple_id(tnode.repeat_node.output_tuple_id) {}; bool RepeatOperatorX::need_more_input_data(RuntimeState* state) const { - auto& local_state = state->get_local_state(id())->cast(); + auto& local_state = state->get_local_state(operator_id())->cast(); return !local_state._child_block->rows() && !local_state._child_eos; } diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h index 08c6503e87..f6c52a0be8 100644 --- a/be/src/pipeline/exec/repeat_operator.h +++ b/be/src/pipeline/exec/repeat_operator.h @@ -73,7 +73,8 @@ private: class RepeatOperatorX final : public StatefulOperatorX { public: using Base = StatefulOperatorX; - RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 41845811b3..eeb0bab68a 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -49,19 +49,19 @@ ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent : AsyncWriterSink(parent, state), _serializer(this) {} -ResultFileSinkOperatorX::ResultFileSinkOperatorX(const RowDescriptor& row_desc, +ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const RowDescriptor& row_desc, const std::vector& t_output_expr) - : DataSinkOperatorX(0), + : DataSinkOperatorX(operator_id, 0), _row_desc(row_desc), _t_output_expr(t_output_expr), _is_top_sink(true) {} ResultFileSinkOperatorX::ResultFileSinkOperatorX( - const RowDescriptor& row_desc, const TResultFileSink& sink, + int operator_id, const RowDescriptor& row_desc, const TResultFileSink& sink, const std::vector& destinations, bool send_query_statistics_with_every_batch, const std::vector& t_output_expr, DescriptorTbl& descs) - : DataSinkOperatorX(0), + : DataSinkOperatorX(operator_id, 0), _row_desc(row_desc), _t_output_expr(t_output_expr), _dests(destinations), @@ -270,13 +270,9 @@ Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_ } FinishDependency* ResultFileSinkOperatorX::finish_blocked_by(RuntimeState* state) const { - auto& local_state = state->get_sink_local_state(id())->cast(); + auto& local_state = + state->get_sink_local_state(operator_id())->cast(); return local_state._finish_dependency->finish_blocked_by(); } -WriteDependency* ResultFileSinkOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state.write_blocked_by(); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h index 5ce028e63c..f80093428c 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.h +++ b/be/src/pipeline/exec/result_file_sink_operator.h @@ -79,8 +79,10 @@ private: class ResultFileSinkOperatorX final : public DataSinkOperatorX { public: - ResultFileSinkOperatorX(const RowDescriptor& row_desc, const std::vector& t_output_expr); - ResultFileSinkOperatorX(const RowDescriptor& row_desc, const TResultFileSink& sink, + ResultFileSinkOperatorX(int operator_id, const RowDescriptor& row_desc, + const std::vector& t_output_expr); + ResultFileSinkOperatorX(int operator_id, const RowDescriptor& row_desc, + const TResultFileSink& sink, const std::vector& destinations, bool send_query_statistics_with_every_batch, const std::vector& t_output_expr, DescriptorTbl& descs); @@ -92,8 +94,6 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - WriteDependency* wait_for_dependency(RuntimeState* state) override; - FinishDependency* finish_blocked_by(RuntimeState* state) const override; private: diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index b9c83a99bc..1ef338b509 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -65,12 +65,12 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( state->fragment_instance_id(), vectorized::RESULT_SINK_BUFFER_SIZE, &_sender, true, state->execution_timeout())); - _result_sink_dependency = OrDependency::create_shared(_parent->id()); - _buffer_dependency = ResultBufferDependency::create_shared(_parent->id()); - _cancel_dependency = CancelDependency::create_shared(_parent->id()); + _result_sink_dependency = OrDependency::create_shared(_parent->operator_id()); + _buffer_dependency = ResultBufferDependency::create_shared(_parent->operator_id()); + _cancel_dependency = CancelDependency::create_shared(_parent->operator_id()); _result_sink_dependency->add_child(_cancel_dependency); _result_sink_dependency->add_child(_buffer_dependency); - _queue_dependency = ResultQueueDependency::create_shared(_parent->id()); + _queue_dependency = ResultQueueDependency::create_shared(_parent->operator_id()); _result_sink_dependency->add_child(_queue_dependency); ((PipBufferControlBlock*)_sender.get()) @@ -101,10 +101,10 @@ Status ResultSinkLocalState::open(RuntimeState* state) { return Status::OK(); } -ResultSinkOperatorX::ResultSinkOperatorX(const RowDescriptor& row_desc, +ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc, const std::vector& t_output_expr, const TResultSink& sink) - : DataSinkOperatorX(0), _row_desc(row_desc), _t_output_expr(t_output_expr) { + : DataSinkOperatorX(operator_id, 0), _row_desc(row_desc), _t_output_expr(t_output_expr) { if (!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL) { _sink_type = TResultSinkType::MYSQL_PROTOCAL; } else { @@ -206,9 +206,4 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) { return final_status; } -WriteDependency* ResultSinkOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._result_sink_dependency->write_blocked_by(); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h index c6c3a1c69d..311d2c7067 100644 --- a/be/src/pipeline/exec/result_sink_operator.h +++ b/be/src/pipeline/exec/result_sink_operator.h @@ -80,6 +80,7 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; + WriteDependency* dependency() override { return _result_sink_dependency.get(); } private: friend class ResultSinkOperatorX; @@ -101,16 +102,14 @@ private: class ResultSinkOperatorX final : public DataSinkOperatorX { public: - ResultSinkOperatorX(const RowDescriptor& row_desc, const std::vector& select_exprs, - const TResultSink& sink); + ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc, + const std::vector& select_exprs, const TResultSink& sink); Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - WriteDependency* wait_for_dependency(RuntimeState* state) override; - private: friend class ResultSinkLocalState; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index d637dd5ee5..10289f63af 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -122,11 +122,11 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); - _source_dependency = OrDependency::create_shared(PipelineXLocalState<>::_parent->id()); + _source_dependency = OrDependency::create_shared(PipelineXLocalState<>::_parent->operator_id()); - _open_dependency = OpenDependency::create_shared(PipelineXLocalState<>::_parent->id()); + _open_dependency = OpenDependency::create_shared(PipelineXLocalState<>::_parent->operator_id()); _source_dependency->add_child(_open_dependency); - _eos_dependency = EosDependency::create_shared(PipelineXLocalState<>::_parent->id()); + _eos_dependency = EosDependency::create_shared(PipelineXLocalState<>::_parent->operator_id()); _source_dependency->add_child(_eos_dependency); auto& p = _parent->cast(); @@ -1178,9 +1178,11 @@ Status ScanLocalState::_start_scanners( _scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners, p.limit(), state()->scan_queue_mem_limit(), p._col_distribute_ids, 1); - _scanner_done_dependency = ScannerDoneDependency::create_shared(p.id(), _scanner_ctx.get()); + _scanner_done_dependency = + ScannerDoneDependency::create_shared(p.operator_id(), _scanner_ctx.get()); _source_dependency->add_child(_scanner_done_dependency); - _data_ready_dependency = DataReadyDependency::create_shared(p.id(), _scanner_ctx.get()); + _data_ready_dependency = + DataReadyDependency::create_shared(p.operator_id(), _scanner_ctx.get()); _source_dependency->add_child(_data_ready_dependency); _scanner_ctx->set_dependency(_data_ready_dependency, _scanner_done_dependency, @@ -1265,8 +1267,8 @@ Status ScanLocalState::_init_profile() { template ScanOperatorX::ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : OperatorX(pool, tnode, descs), + int operator_id, const DescriptorTbl& descs) + : OperatorX(pool, tnode, operator_id, descs), _runtime_filter_descs(tnode.runtime_filters) { if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) { // Which means the request could be fullfilled in a single segment iterator request. @@ -1279,15 +1281,9 @@ ScanOperatorX::ScanOperatorX(ObjectPool* pool, const TPlanNode& } } -template -Dependency* ScanOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._source_dependency->read_blocked_by(); -} - template FinishDependency* ScanOperatorX::finish_blocked_by(RuntimeState* state) const { - auto& local_state = state->get_local_state(id())->template cast(); + auto& local_state = state->get_local_state(operator_id())->template cast(); return local_state._finish_dependency->finish_blocked_by(); } @@ -1375,7 +1371,7 @@ Status ScanLocalState::close(RuntimeState* state) { template bool ScanOperatorX::runtime_filters_are_ready_or_timeout( RuntimeState* state) const { - return state->get_local_state(id()) + return state->get_local_state(operator_id()) ->template cast() .runtime_filters_are_ready_or_timeout(); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 5cdbac8957..83724e3df5 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -56,7 +56,7 @@ public: Status try_close(RuntimeState* state) override; }; -struct OpenDependency : public Dependency { +struct OpenDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(OpenDependency); OpenDependency(int id) : Dependency(id, "OpenDependency") {} @@ -65,14 +65,14 @@ public: [[nodiscard]] int64_t read_watcher_elapse_time() override { return 0; } }; -class EosDependency : public Dependency { +class EosDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(EosDependency); EosDependency(int id) : Dependency(id, "EosDependency") {} void* shared_state() override { return nullptr; } }; -class ScannerDoneDependency : public Dependency { +class ScannerDoneDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(ScannerDoneDependency); ScannerDoneDependency(int id, vectorized::ScannerContext* scanner_ctx) @@ -90,7 +90,7 @@ private: vectorized::ScannerContext* _scanner_ctx; }; -class DataReadyDependency : public Dependency { +class DataReadyDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(DataReadyDependency); DataReadyDependency(int id, vectorized::ScannerContext* scanner_ctx) @@ -118,7 +118,7 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::Runt public: ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState<>(state, parent), - vectorized::RuntimeFilterConsumer(parent->id(), parent->runtime_filter_descs(), + vectorized::RuntimeFilterConsumer(parent->node_id(), parent->runtime_filter_descs(), parent->row_descriptor(), _conjuncts) {} virtual ~ScanLocalStateBase() = default; @@ -225,6 +225,8 @@ class ScanLocalState : public ScanLocalStateBase { int64_t get_push_down_count() override; + Dependency* dependency() override { return _source_dependency.get(); } + protected: template friend class ScanOperatorX; @@ -416,7 +418,6 @@ public: Status try_close(RuntimeState* state) override; - Dependency* wait_for_dependency(RuntimeState* state) override; FinishDependency* finish_blocked_by(RuntimeState* state) const override; Status init(const TPlanNode& tnode, RuntimeState* state) override; @@ -434,10 +435,12 @@ public: int64_t get_push_down_count() const { return _push_down_count; } using OperatorX::id; + using OperatorX::operator_id; protected: using LocalState = LocalStateType; - ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); virtual ~ScanOperatorX() = default; template friend class ScanLocalState; diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index a406f8ae2c..165885e63a 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -72,9 +72,9 @@ Status SchemaScanLocalState::open(RuntimeState* state) { return _schema_scanner->start(state); } -SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, +SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : Base(pool, tnode, descs), + : Base(pool, tnode, operator_id, descs), _table_name(tnode.schema_scan_node.table_name), _common_scanner_param(new SchemaScannerCommonParam()), _tuple_id(tnode.schema_scan_node.tuple_id), diff --git a/be/src/pipeline/exec/schema_scan_operator.h b/be/src/pipeline/exec/schema_scan_operator.h index 03238947e8..e7479a75e9 100644 --- a/be/src/pipeline/exec/schema_scan_operator.h +++ b/be/src/pipeline/exec/schema_scan_operator.h @@ -72,7 +72,8 @@ private: class SchemaScanOperatorX final : public OperatorX { public: using Base = OperatorX; - SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); ~SchemaScanOperatorX() override = default; Status init(const TPlanNode& tnode, RuntimeState* state) override; diff --git a/be/src/pipeline/exec/select_operator.h b/be/src/pipeline/exec/select_operator.h index 8c2b494719..93e2ee184c 100644 --- a/be/src/pipeline/exec/select_operator.h +++ b/be/src/pipeline/exec/select_operator.h @@ -55,8 +55,9 @@ private: class SelectOperatorX final : public StreamingOperatorX { public: - SelectOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : StreamingOperatorX(pool, tnode, descs) {} + SelectOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : StreamingOperatorX(pool, tnode, operator_id, descs) {} Status pull(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override { CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state); diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index aea9aff0e7..31e99d6e5e 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -101,15 +101,6 @@ Status SetProbeSinkOperatorX::open(RuntimeState* state) { return vectorized::VExpr::open(_child_exprs, state); } -template -WriteDependency* SetProbeSinkOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return ((SetSharedState*)local_state._dependency->shared_state()) - ->probe_finished_children_index[_cur_child_id - 1] - ? nullptr - : local_state._dependency; -} - template Status SetProbeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) { @@ -153,7 +144,7 @@ Status SetProbeSinkLocalState::init(RuntimeState* state, LocalSink SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); Parent& parent = _parent->cast(); - + static_cast(_dependency)->set_cur_child_id(parent._cur_child_id); _child_exprs.resize(parent._child_exprs.size()); for (size_t i = 0; i < _child_exprs.size(); i++) { RETURN_IF_ERROR(parent._child_exprs[i]->clone(state, _child_exprs[i])); diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 45176fd009..d31c822d62 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -99,7 +99,7 @@ template class SetProbeSinkOperatorX final : public DataSinkOperatorX> { public: using Base = DataSinkOperatorX>; - using DataSinkOperatorXBase::id; + using DataSinkOperatorXBase::operator_id; using typename Base::LocalState; friend class SetProbeSinkLocalState; @@ -122,8 +122,6 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - WriteDependency* wait_for_dependency(RuntimeState* state) override; - private: void _finalize_probe(SetProbeSinkLocalState& local_state); Status _extract_probe_column(SetProbeSinkLocalState& local_state, diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 5383b1b3a5..a475807d19 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -92,7 +92,7 @@ template class SetSinkOperatorX final : public DataSinkOperatorX> { public: using Base = DataSinkOperatorX>; - using DataSinkOperatorXBase::id; + using DataSinkOperatorXBase::operator_id; using typename Base::LocalState; friend class SetSinkLocalState; diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h index dd1132be01..e8cbce8a7d 100644 --- a/be/src/pipeline/exec/set_source_operator.h +++ b/be/src/pipeline/exec/set_source_operator.h @@ -79,18 +79,14 @@ class SetSourceOperatorX final : public OperatorX>; // for non-delay tempalte instantiation - using OperatorXBase::id; + using OperatorXBase::operator_id; using typename Base::LocalState; - SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : Base(pool, tnode, descs) {}; + SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs) {}; ~SetSourceOperatorX() override = default; - Dependency* wait_for_dependency(RuntimeState* state) override { - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->read_blocked_by(); - } - [[nodiscard]] bool is_source() const override { return true; } Status get_block(RuntimeState* state, vectorized::Block* block, diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 9da23d33cd..0184ceab66 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -68,9 +68,9 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { return Status::OK(); } -SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, +SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs) - : DataSinkOperatorX(tnode.node_id), + : DataSinkOperatorX(operator_id, tnode.node_id), _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), _pool(pool), _reuse_mem(true), diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 69662f25b4..ccb19c5456 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -72,7 +72,8 @@ private: class SortSinkOperatorX final : public DataSinkOperatorX { public: - SortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, + const DescriptorTbl& descs); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX::_name); diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index de5cd937be..8b189b87a6 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -28,9 +28,9 @@ OPERATOR_CODE_GENERATOR(SortSourceOperator, SourceOperator) SortLocalState::SortLocalState(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState(state, parent) {} -SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, +SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : OperatorX(pool, tnode, descs) {} + : OperatorX(pool, tnode, operator_id, descs) {} Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { @@ -46,9 +46,4 @@ Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* bl return Status::OK(); } -Dependency* SortSourceOperatorX::wait_for_dependency(RuntimeState* state) { - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->read_blocked_by(); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h index 8521715154..5d2614fce1 100644 --- a/be/src/pipeline/exec/sort_source_operator.h +++ b/be/src/pipeline/exec/sort_source_operator.h @@ -58,9 +58,8 @@ private: class SortSourceOperatorX final : public OperatorX { public: - SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - Dependency* wait_for_dependency(RuntimeState* state) override; - + SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp index 9b11ef361e..3022d542e8 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp @@ -340,9 +340,10 @@ Status StreamingAggSinkLocalState::_pre_agg_with_serialized_key( return Status::OK(); } -StreamingAggSinkOperatorX::StreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, +StreamingAggSinkOperatorX::StreamingAggSinkOperatorX(ObjectPool* pool, int operator_id, + const TPlanNode& tnode, const DescriptorTbl& descs) - : AggSinkOperatorX(pool, tnode, descs) {} + : AggSinkOperatorX(pool, operator_id, tnode, descs) {} Status StreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(AggSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index e92c5ff4e6..6f49ac86e2 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -108,16 +108,12 @@ private: class StreamingAggSinkOperatorX final : public AggSinkOperatorX { public: - StreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + StreamingAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, + const DescriptorTbl& descs); ~StreamingAggSinkOperatorX() override = default; Status init(const TPlanNode& tnode, RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - - WriteDependency* wait_for_dependency(RuntimeState* state) override { - CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->write_blocked_by(); - } }; } // namespace pipeline diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp index 1deb653fc3..0db8ef6f6b 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp @@ -74,8 +74,9 @@ OperatorPtr StreamingAggSourceOperatorBuilder::build_operator() { } StreamingAggSourceOperatorX::StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, + int operator_id, const DescriptorTbl& descs) - : Base(pool, tnode, descs) {} + : Base(pool, tnode, operator_id, descs) {} Status StreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.h b/be/src/pipeline/exec/streaming_aggregation_source_operator.h index 10dc6cd026..3e05955470 100644 --- a/be/src/pipeline/exec/streaming_aggregation_source_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.h @@ -63,7 +63,7 @@ private: class StreamingAggSourceOperatorX final : public AggSourceOperatorX { public: using Base = AggSourceOperatorX; - StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, + StreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); ~StreamingAggSourceOperatorX() = default; diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index 911fdf65ee..5e1d5c281e 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -243,8 +243,8 @@ Status TableFunctionLocalState::process_next_child_row() { } TableFunctionOperatorX::TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : Base(pool, tnode, descs) {} + int operator_id, const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs) {} Status TableFunctionOperatorX::_prepare_output_slot_ids(const TPlanNode& tnode) { // Prepare output slot ids diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index 541938ec72..b71cc52ef2 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -84,13 +84,14 @@ private: class TableFunctionOperatorX final : public StatefulOperatorX { public: using Base = StatefulOperatorX; - TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(doris::RuntimeState* state) override; Status open(doris::RuntimeState* state) override; bool need_more_input_data(RuntimeState* state) const override { - auto& local_state = state->get_local_state(id())->cast(); + auto& local_state = state->get_local_state(operator_id())->cast(); return !local_state._child_block->rows() && local_state._child_source_state != SourceState::FINISHED; } diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index aa85b6fbaf..d486c3d775 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -108,6 +108,14 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { for (auto& dep : deps) { ((UnionDependency*)dep.get())->set_shared_state(ss); } + } else { + auto& deps = info.dependencys; + DCHECK(child_count == 0); + DCHECK(deps.size() == 1); + DCHECK(deps.front() == nullptr); + //child_count == 0 , we need to creat a UnionDependency + deps.front() = std::make_shared(_parent->operator_id()); + ((UnionDependency*)deps.front().get())->set_shared_state(ss); } RETURN_IF_ERROR(Base::init(state, info)); ss->data_queue.set_dependency(_dependency); @@ -178,7 +186,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Block* block) { DCHECK_EQ(state->per_fragment_instance_idx(), 0); - auto& local_state = state->get_local_state(id())->cast(); + auto& local_state = state->get_local_state(operator_id())->cast(); DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size()); auto& _const_expr_list_idx = local_state._const_expr_list_idx; vectorized::MutableBlock mblock = diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 209d618699..0cf2d3d403 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -91,17 +91,10 @@ private: class UnionSourceOperatorX final : public OperatorX { public: using Base = OperatorX; - UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : Base(pool, tnode, descs), _child_size(tnode.num_children) {}; + UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) {}; ~UnionSourceOperatorX() override = default; - Dependency* wait_for_dependency(RuntimeState* state) override { - if (_child_size == 0) { - return nullptr; - } - CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); - return local_state._dependency->read_blocked_by(); - } - Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; @@ -140,14 +133,14 @@ public: private: bool _has_data(RuntimeState* state) { - auto& local_state = state->get_local_state(id())->cast(); + auto& local_state = state->get_local_state(operator_id())->cast(); if (_child_size == 0) { return local_state._need_read_for_const_expr; } return local_state._shared_state->data_queue.remaining_has_data(); } bool has_more_const(RuntimeState* state) const { - auto& local_state = state->get_local_state(id())->cast(); + auto& local_state = state->get_local_state(operator_id())->cast(); return state->per_fragment_instance_idx() == 0 && local_state._const_expr_list_idx < local_state._const_expr_lists.size(); } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 79089c48ed..545b495906 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -19,6 +19,7 @@ #include +#include #include #include @@ -88,15 +89,11 @@ public: void set_parent(std::weak_ptr parent) { _parent = parent; } void add_child(std::shared_ptr child) { - std::unique_lock l(_lock); _children.push_back(child); child->set_parent(weak_from_this()); } - void remove_first_child() { - std::unique_lock l(_lock); - _children.erase(_children.begin()); - } + void remove_first_child() { _children.erase(_children.begin()); } protected: int _id; @@ -107,7 +104,6 @@ protected: std::weak_ptr _parent; std::list> _children; - std::mutex _lock; }; class WriteDependency : public Dependency { @@ -153,7 +149,7 @@ protected: MonotonicStopWatch _write_dependency_watcher; }; -class FinishDependency : public Dependency { +class FinishDependency final : public Dependency { public: FinishDependency(int id, std::string name) : Dependency(id, name), _ready_to_finish(true) {} ~FinishDependency() override = default; @@ -195,7 +191,7 @@ protected: MonotonicStopWatch _finish_dependency_watcher; }; -class AndDependency : public WriteDependency { +class AndDependency final : public WriteDependency { public: ENABLE_FACTORY_CREATOR(AndDependency); AndDependency(int id) : WriteDependency(id, "AndDependency") {} @@ -215,7 +211,6 @@ public: std::string debug_string(int indentation_level = 0) override; [[nodiscard]] Dependency* read_blocked_by() override { - std::unique_lock l(_lock); for (auto& child : _children) { if (auto* dep = child->read_blocked_by()) { return dep; @@ -225,7 +220,6 @@ public: } [[nodiscard]] WriteDependency* write_blocked_by() override { - std::unique_lock l(_lock); for (auto& child : _children) { CHECK(child->is_write_dependency()); if (auto* dep = ((WriteDependency*)child.get())->write_blocked_by()) { @@ -236,7 +230,7 @@ public: } }; -class OrDependency : public WriteDependency { +class OrDependency final : public WriteDependency { public: ENABLE_FACTORY_CREATOR(OrDependency); OrDependency(int id) : WriteDependency(id, "OrDependency") {} @@ -257,7 +251,6 @@ public: [[nodiscard]] Dependency* read_blocked_by() override { Dependency* res = nullptr; - std::unique_lock l(_lock); for (auto& child : _children) { auto* cur_res = child->read_blocked_by(); if (cur_res == nullptr) { @@ -271,7 +264,6 @@ public: [[nodiscard]] WriteDependency* write_blocked_by() override { WriteDependency* res = nullptr; - std::unique_lock l(_lock); for (auto& child : _children) { CHECK(child->is_write_dependency()); auto* cur_res = ((WriteDependency*)child.get())->write_blocked_by(); @@ -286,16 +278,30 @@ public: }; struct FakeSharedState {}; -struct FakeDependency : public WriteDependency { +struct FakeDependency final : public WriteDependency { public: - FakeDependency(int id) : WriteDependency(0, "FakeDependency") {} + FakeDependency(int id) : WriteDependency(id, "FakeDependency") {} using SharedState = FakeSharedState; void* shared_state() override { return nullptr; } - + [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; } + [[nodiscard]] WriteDependency* write_blocked_by() override { return nullptr; } [[nodiscard]] int64_t read_watcher_elapse_time() override { return 0; } [[nodiscard]] int64_t write_watcher_elapse_time() override { return 0; } }; +class AsyncWriterSinkDependency : public WriteDependency { +public: + AsyncWriterSinkDependency(int id) : WriteDependency(id, "AsyncWriterSinkDependency") {} + using SharedState = FakeSharedState; + void* shared_state() override { return nullptr; } + [[nodiscard]] Dependency* read_blocked_by() override { return nullptr; } + [[nodiscard]] WriteDependency* write_blocked_by() override { return _call_func(); } + void set_write_blocked_by(std::function call_func) { + _call_func = call_func; + } + std::function _call_func; +}; + struct AggSharedState { public: AggSharedState() { @@ -321,7 +327,7 @@ public: std::unique_ptr data_queue = nullptr; }; -class AggDependency : public WriteDependency { +class AggDependency final : public WriteDependency { public: using SharedState = AggSharedState; AggDependency(int id) : WriteDependency(id, "AggDependency") { @@ -441,8 +447,10 @@ private: struct UnionSharedState { public: UnionSharedState(int child_count = 1, WriteDependency* dependency = nullptr) - : data_queue(child_count, dependency) {}; + : data_queue(child_count, dependency), _child_count(child_count) {}; + int child_count() const { return _child_count; } DataQueue data_queue; + const int _child_count; }; class UnionDependency final : public WriteDependency { @@ -465,7 +473,12 @@ public: _read_dependency_watcher.stop(); _ready_for_read = true; } - + [[nodiscard]] Dependency* read_blocked_by() override { + if (_union_state->child_count() == 0) { + return nullptr; + } + return WriteDependency::read_blocked_by(); + } void block_reading() override {} void block_writing() override {} @@ -489,13 +502,14 @@ public: void set_shared_state(std::shared_ptr multi_cast_state) { _multi_cast_state = multi_cast_state; } - MultiCastDependency* can_read(const int consumer_id) { - if (_multi_cast_state->multi_cast_data_streamer.can_read(consumer_id)) { + WriteDependency* read_blocked_by() override { + if (_multi_cast_state->multi_cast_data_streamer.can_read(_consumer_id)) { return nullptr; - } else { - return this; } + return this; } + int _consumer_id {}; + void set_consumer_id(int consumer_id) { _consumer_id = consumer_id; } private: std::shared_ptr _multi_cast_state; @@ -770,6 +784,14 @@ public: return _set_state->ready_for_read ? nullptr : this; } + [[nodiscard]] WriteDependency* write_blocked_by() override { + if (is_set_probe) { + DCHECK((_cur_child_id - 1) < _set_state->probe_finished_children_index.size()); + return _set_state->probe_finished_children_index[_cur_child_id - 1] ? nullptr : this; + } + return nullptr; + } + // Notify downstream pipeline tasks this dependency is ready. void set_ready_for_read() override { if (_set_state->ready_for_read) { @@ -778,9 +800,15 @@ public: _read_dependency_watcher.stop(); _set_state->ready_for_read = true; } + void set_cur_child_id(int id) { + _cur_child_id = id; + is_set_probe = true; + } private: std::shared_ptr _set_state; + int _cur_child_id; + bool is_set_probe {false}; }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index dc7dfa5b29..2dc42a3b0a 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -17,6 +17,9 @@ #include "operator.h" +#include + +#include #include #include "common/logging.h" @@ -75,7 +78,7 @@ std::string PipelineXLocalState::debug_string(int indentation_le fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", PipelineXLocalStateBase::debug_string(indentation_level)); - if (_dependency) { + if constexpr (!std::is_same_v) { fmt::format_to(debug_string_buffer, "\nDependency: \n {}", _dependency->debug_string(indentation_level + 1)); } @@ -87,7 +90,7 @@ std::string PipelineXSinkLocalState::debug_string(int indentatio fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", PipelineXSinkLocalStateBase::debug_string(indentation_level)); - if (_dependency) { + if constexpr (!std::is_same_v) { fmt::format_to(debug_string_buffer, "\n{}Dependency: \n {}", std::string(indentation_level * 2, ' '), _dependency->debug_string(indentation_level + 1)); @@ -98,12 +101,12 @@ std::string PipelineXSinkLocalState::debug_string(int indentatio std::string OperatorXBase::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}{}: id={}", std::string(indentation_level * 2, ' '), - _op_name, _id); + _op_name, node_id()); return fmt::to_string(debug_string_buffer); } std::string OperatorXBase::debug_string(RuntimeState* state, int indentation_level) const { - return state->get_local_state(id())->debug_string(indentation_level); + return state->get_local_state(operator_id())->debug_string(indentation_level); } Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) { @@ -160,7 +163,7 @@ Status OperatorXBase::close(RuntimeState* state) { if (_child_x && !is_source()) { RETURN_IF_ERROR(_child_x->close(state)); } - return state->get_local_state(id())->close(state); + return state->get_local_state(operator_id())->close(state); } void PipelineXLocalStateBase::clear_origin_block() { @@ -169,7 +172,7 @@ void PipelineXLocalStateBase::clear_origin_block() { Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block, vectorized::Block* output_block) const { - auto local_state = state->get_local_state(id()); + auto local_state = state->get_local_state(operator_id()); SCOPED_TIMER(local_state->_projection_timer); using namespace vectorized; vectorized::MutableBlock mutable_block = @@ -202,7 +205,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori Status OperatorXBase::get_next_after_projects(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { - auto local_state = state->get_local_state(id()); + auto local_state = state->get_local_state(operator_id()); if (_output_row_descriptor) { local_state->clear_origin_block(); auto status = get_block(state, &local_state->_origin_block, source_state); @@ -232,7 +235,7 @@ std::string DataSinkOperatorXBase::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}{}: id={}", std::string(indentation_level * 2, ' '), - _name, _id); + _name, node_id()); return fmt::to_string(debug_string_buffer); } @@ -241,7 +244,7 @@ std::string PipelineXSinkLocalStateBase::debug_string(int indentation_level) con } std::string DataSinkOperatorXBase::debug_string(RuntimeState* state, int indentation_level) const { - return state->get_sink_local_state(id())->debug_string(indentation_level); + return state->get_sink_local_state(operator_id())->debug_string(indentation_level); } Status DataSinkOperatorXBase::init(const TDataSink& tsink) { @@ -267,9 +270,9 @@ Status DataSinkOperatorXBase::init(const TPlanNode& tnode, RuntimeState* state) template Status DataSinkOperatorX::setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) { - auto local_state = LocalStateType::create_shared(this, state); - state->emplace_sink_local_state(id(), local_state); + auto local_state = LocalStateType::create_unique(this, state); RETURN_IF_ERROR(local_state->init(state, info)); + state->emplace_sink_local_state(operator_id(), std::move(local_state)); return Status::OK(); } @@ -288,17 +291,18 @@ void DataSinkOperatorX::get_dependency(vector& d template Status OperatorX::setup_local_state(RuntimeState* state, LocalStateInfo& info) { - auto local_state = LocalStateType::create_shared(state, this); - state->emplace_local_state(id(), local_state); - return local_state->init(state, info); + auto local_state = LocalStateType::create_unique(state, this); + RETURN_IF_ERROR(local_state->init(state, info)); + state->emplace_local_state(operator_id(), std::move(local_state)); + return Status::OK(); } PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent, RuntimeState* state) : _parent(parent), _state(state), - _finish_dependency( - new FinishDependency(parent->id(), parent->get_name() + "_FINISH_DEPENDENCY")) {} + _finish_dependency(new FinishDependency(parent->operator_id(), + parent->get_name() + "_FINISH_DEPENDENCY")) {} PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent) : _num_rows_returned(0), @@ -306,8 +310,8 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB _peak_memory_usage_counter(nullptr), _parent(parent), _state(state), - _finish_dependency( - new FinishDependency(parent->id(), parent->get_name() + "_FINISH_DEPENDENCY")) {} + _finish_dependency(new FinishDependency(parent->operator_id(), + parent->get_name() + "_FINISH_DEPENDENCY")) {} template Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& info) { @@ -318,7 +322,7 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState info.parent_profile->add_child(_runtime_profile.get(), true, nullptr); _wait_for_finish_dependency_timer = ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency"); - if constexpr (!std::is_same_v) { + if constexpr (!std::is_same_v) { auto& deps = info.dependencys; _dependency = (DependencyType*)deps.front().get(); if (_dependency) { @@ -326,6 +330,10 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState _wait_for_dependency_timer = ADD_TIMER( _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time"); } + } else { + auto& deps = info.dependencys; + deps.front() = std::make_shared(0); + _dependency = (DependencyType*)deps.front().get(); } _conjuncts.resize(_parent->_conjuncts.size()); @@ -355,7 +363,7 @@ Status PipelineXLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); } - if (_dependency) { + if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->read_watcher_elapse_time()); } COUNTER_SET(_wait_for_finish_dependency_timer, @@ -376,7 +384,7 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, _profile->set_metadata(_parent->node_id()); _profile->set_is_sink(true); _wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency"); - if constexpr (!std::is_same_v) { + if constexpr (!std::is_same_v) { auto& deps = info.dependencys; _dependency = (DependencyType*)deps.front().get(); if (_dependency) { @@ -384,6 +392,10 @@ Status PipelineXSinkLocalState::init(RuntimeState* state, _wait_for_dependency_timer = ADD_TIMER(_profile, "WaitForDependency[" + _dependency->name() + "]Time"); } + } else { + auto& deps = info.dependencys; + deps.front() = std::make_shared(0); + _dependency = (DependencyType*)deps.front().get(); } _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1); _open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1); @@ -398,7 +410,7 @@ Status PipelineXSinkLocalState::close(RuntimeState* state, Statu if (_closed) { return Status::OK(); } - if (_dependency) { + if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->write_watcher_elapse_time()); } COUNTER_SET(_wait_for_finish_dependency_timer, @@ -418,7 +430,7 @@ Status StreamingOperatorX::get_block(RuntimeState* state, vector template Status StatefulOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { - auto& local_state = state->get_local_state(OperatorX::id()) + auto& local_state = state->get_local_state(OperatorX::operator_id()) ->template cast(); if (need_more_input_data(state)) { local_state._child_block->clear_column_data(); @@ -449,15 +461,17 @@ Status StatefulOperatorX::get_block(RuntimeState* state, vectori template Status AsyncWriterSink::init(RuntimeState* state, LocalSinkStateInfo& info) { - RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info)); + RETURN_IF_ERROR(Base::init(state, info)); _output_vexpr_ctxs.resize(_parent->cast()._output_vexpr_ctxs.size()); for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { RETURN_IF_ERROR( _parent->cast()._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); } - + static_cast(_dependency)->set_write_blocked_by([this]() { + return this->write_blocked_by(); + }); _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs)); - _async_writer_dependency = AsyncWriterDependency::create_shared(_parent->id()); + _async_writer_dependency = AsyncWriterDependency::create_shared(_parent->operator_id()); _writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get()); _wait_for_dependency_timer = @@ -467,7 +481,7 @@ Status AsyncWriterSink::init(RuntimeState* state, LocalSinkState template Status AsyncWriterSink::open(RuntimeState* state) { - RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state)); + RETURN_IF_ERROR(Base::open(state)); _writer->start_writer(state, _profile); return Status::OK(); } @@ -500,7 +514,7 @@ Status AsyncWriterSink::close(RuntimeState* state, Status exec_s RETURN_IF_ERROR(_writer->get_writer_status()); } } - return PipelineXSinkLocalState<>::close(state, exec_status); + return Base::close(state, exec_status); } template diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 0d3e2a13be..b7a437cf6a 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -17,28 +17,18 @@ #pragma once +#include "common/logging.h" #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_x/dependency.h" namespace doris::pipeline { -#define CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state) \ - auto _sptr = state->get_local_state(id()); \ - if (!_sptr) return Status::InternalError("could not find local state id {}", id()); \ +#define CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state) \ + auto _sptr = state->get_local_state(operator_id()); \ auto& local_state = _sptr->template cast(); -#define CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state) \ - auto _sptr = state->get_sink_local_state(id()); \ - if (!_sptr) return Status::InternalError("could not find local state id {}", id()); \ - auto& local_state = _sptr->template cast(); - -#define CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state) \ - auto _sptr = state->get_local_state(id()); \ - if (!_sptr) return nullptr; \ - auto& local_state = _sptr->template cast(); - -#define CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state) \ - auto _sptr = state->get_sink_local_state(id()); \ - if (!_sptr) return nullptr; \ +#define CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state) \ + auto _sptr = state->get_sink_local_state(operator_id()); \ auto& local_state = _sptr->template cast(); // This struct is used only for initializing local state. @@ -105,6 +95,8 @@ public: [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const; + virtual Dependency* dependency() { return nullptr; } + protected: friend class OperatorXBase; @@ -140,9 +132,10 @@ protected: class OperatorXBase : public OperatorBase { public: - OperatorXBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + OperatorXBase(ObjectPool* pool, const TPlanNode& tnode, const int operator_id, + const DescriptorTbl& descs) : OperatorBase(nullptr), - _id(tnode.node_id), + _operator_id(operator_id), _node_id(tnode.node_id), _type(tnode.node_type), _pool(pool), @@ -155,8 +148,8 @@ public: } } - OperatorXBase(ObjectPool* pool, int node_id, int id) - : OperatorBase(nullptr), _id(id), _node_id(node_id), _pool(pool) {}; + OperatorXBase(ObjectPool* pool, int node_id, int operator_id) + : OperatorBase(nullptr), _operator_id(operator_id), _node_id(node_id), _pool(pool) {}; virtual Status init(const TPlanNode& tnode, RuntimeState* state); Status init(const TDataSink& tsink) override { LOG(FATAL) << "should not reach here!"; @@ -211,8 +204,6 @@ public: Status close(RuntimeState* state) override; - virtual Dependency* wait_for_dependency(RuntimeState* state) { return nullptr; } - virtual FinishDependency* finish_blocked_by(RuntimeState* state) const { return nullptr; } [[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const { @@ -247,7 +238,8 @@ public: [[nodiscard]] vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; } [[nodiscard]] RowDescriptor& row_descriptor() { return _row_descriptor; } - [[nodiscard]] int id() const override { return _id; } + [[nodiscard]] int id() const override { return node_id(); } + [[nodiscard]] int operator_id() const { return _operator_id; } [[nodiscard]] int node_id() const { return _node_id; } [[nodiscard]] int64_t limit() const { return _limit; } @@ -269,7 +261,7 @@ protected: template friend class PipelineXLocalState; friend class PipelineXLocalStateBase; - int _id; + const int _operator_id; const int _node_id; // unique w/in single plan tree TPlanNodeType::type _type; ObjectPool* _pool; @@ -293,9 +285,11 @@ protected: template class OperatorX : public OperatorXBase { public: - OperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : OperatorXBase(pool, tnode, descs) {} - OperatorX(ObjectPool* pool, int node_id, int id) : OperatorXBase(pool, node_id, id) {}; + OperatorX(ObjectPool* pool, const TPlanNode& tnode, const int operator_id, + const DescriptorTbl& descs) + : OperatorXBase(pool, tnode, operator_id, descs) {} + OperatorX(ObjectPool* pool, int node_id, int operator_id) + : OperatorXBase(pool, node_id, operator_id) {}; ~OperatorX() override = default; Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override; @@ -315,6 +309,8 @@ public: [[nodiscard]] std::string debug_string(int indentation_level = 0) const override; + Dependency* dependency() override { return _dependency; } + protected: DependencyType* _dependency; typename DependencyType::SharedState* _shared_state; @@ -365,6 +361,8 @@ public: RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; } + virtual WriteDependency* dependency() { return nullptr; } + protected: DataSinkOperatorXBase* _parent; RuntimeState* _state; @@ -393,14 +391,23 @@ protected: class DataSinkOperatorXBase : public OperatorBase { public: - DataSinkOperatorXBase(const int id) - : OperatorBase(nullptr), _id(id), _node_id(id), _dests_id({id}) {} + DataSinkOperatorXBase(const int operator_id, const int node_id) + : OperatorBase(nullptr), + _operator_id(operator_id), + _node_id(node_id), + _dests_id({operator_id}) {} - DataSinkOperatorXBase(const int id, const int node_id, const int dest_id) - : OperatorBase(nullptr), _id(id), _node_id(node_id), _dests_id({dest_id}) {} + DataSinkOperatorXBase(const int operator_id, const int node_id, const int dest_id) + : OperatorBase(nullptr), + _operator_id(operator_id), + _node_id(node_id), + _dests_id({dest_id}) {} - DataSinkOperatorXBase(const int id, const int node_id, std::vector& sources) - : OperatorBase(nullptr), _id(id), _node_id(node_id), _dests_id(sources) {} + DataSinkOperatorXBase(const int operator_id, const int node_id, std::vector& sources) + : OperatorBase(nullptr), + _operator_id(operator_id), + _node_id(node_id), + _dests_id(sources) {} ~DataSinkOperatorXBase() override = default; @@ -454,8 +461,6 @@ public: return false; } - virtual WriteDependency* wait_for_dependency(RuntimeState* state) { return nullptr; } - virtual FinishDependency* finish_blocked_by(RuntimeState* state) const { return nullptr; } [[nodiscard]] std::string debug_string() const override { return ""; } @@ -470,11 +475,11 @@ public: [[nodiscard]] bool is_source() const override { return false; } virtual Status close(RuntimeState* state, Status exec_status) { - return state->get_sink_local_state(id())->close(state, exec_status); + return state->get_sink_local_state(operator_id())->close(state, exec_status); } [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) { - return state->get_sink_local_state(id())->try_close(state, exec_status); + return state->get_sink_local_state(operator_id())->try_close(state, exec_status); } [[nodiscard]] RuntimeProfile* get_runtime_profile() const override { @@ -483,10 +488,14 @@ public: return nullptr; } - [[nodiscard]] int id() const override { return _id; } + [[nodiscard]] int id() const override { return node_id(); } + + [[nodiscard]] int operator_id() const { return _operator_id; } [[nodiscard]] const std::vector& dests_id() const { return _dests_id; } + void set_dests_id(const std::vector& dest_id) { _dests_id = dest_id; } + [[nodiscard]] int node_id() const { return _node_id; } [[nodiscard]] std::string get_name() const override { return _name; } @@ -498,10 +507,10 @@ public: protected: template friend class AsyncWriterSink; - // _id : the current Operator's ID, which is not visible to the user. + // _operator_id : the current Operator's ID, which is not visible to the user. // _node_id : the plan node ID corresponding to the Operator, which is visible on the profile. - // _dests_id : the target ID of the sink, for example, in the case of a multi-sink, there are multiple targets. - const int _id; + // _dests_id : the target _operator_id of the sink, for example, in the case of a multi-sink, there are multiple targets. + const int _operator_id; const int _node_id; std::vector _dests_id; std::string _name; @@ -513,7 +522,8 @@ protected: template class DataSinkOperatorX : public DataSinkOperatorXBase { public: - DataSinkOperatorX(const int id) : DataSinkOperatorXBase(id) {} + DataSinkOperatorX(int operator_id, const int node_id) + : DataSinkOperatorXBase(operator_id, node_id) {} DataSinkOperatorX(const int id, const int node_id, const int source_id) : DataSinkOperatorXBase(id, node_id, source_id) {} @@ -548,6 +558,8 @@ public: virtual std::string id_name() { return " (id=" + std::to_string(_parent->node_id()) + ")"; } + WriteDependency* dependency() override { return _dependency; } + protected: DependencyType* _dependency = nullptr; typename DependencyType::SharedState* _shared_state; @@ -559,8 +571,9 @@ protected: template class StreamingOperatorX : public OperatorX { public: - StreamingOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : OperatorX(pool, tnode, descs) {} + StreamingOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : OperatorX(pool, tnode, operator_id, descs) {} virtual ~StreamingOperatorX() = default; Status get_block(RuntimeState* state, vectorized::Block* block, @@ -581,8 +594,9 @@ public: template class StatefulOperatorX : public OperatorX { public: - StatefulOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : OperatorX(pool, tnode, descs) {} + StatefulOperatorX(ObjectPool* pool, const TPlanNode& tnode, const int operator_id, + const DescriptorTbl& descs) + : OperatorX(pool, tnode, operator_id, descs) {} virtual ~StatefulOperatorX() = default; [[nodiscard]] Status get_block(RuntimeState* state, vectorized::Block* block, @@ -596,10 +610,11 @@ public: }; template -class AsyncWriterSink : public PipelineXSinkLocalState<> { +class AsyncWriterSink : public PipelineXSinkLocalState { public: + using Base = PipelineXSinkLocalState; AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state) - : PipelineXSinkLocalState<>(parent, state), _async_writer_dependency(nullptr) {} + : Base(parent, state), _async_writer_dependency(nullptr) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 2c852eecac..77f1672f74 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -251,8 +251,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData params.__isset.send_query_statistics_with_every_batch ? params.send_query_statistics_with_every_batch : false; - _sink.reset(new ExchangeSinkOperatorX(state, row_desc, thrift_sink.stream_sink, - params.destinations, + _sink.reset(new ExchangeSinkOperatorX(state, row_desc, next_operator_id(), + thrift_sink.stream_sink, params.destinations, send_query_statistics_with_every_batch)); break; } @@ -262,7 +262,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData } // TODO: figure out good buffer size based on size of output row - _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink)); + _sink.reset(new ResultSinkOperatorX(next_operator_id(), row_desc, output_exprs, + thrift_sink.result_sink)); break; } case TDataSinkType::OLAP_TABLE_SINK: { @@ -270,7 +271,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData return Status::InternalError( "Unsuported OLAP_TABLE_SINK with enable_memtable_on_sink_node "); } else { - _sink.reset(new OlapTableSinkOperatorX(pool, row_desc, output_exprs, false)); + _sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(), row_desc, output_exprs, + false)); } break; } @@ -279,7 +281,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData return Status::InternalError("Missing data jdbc sink."); } if (config::enable_java_support) { - _sink.reset(new JdbcTableSinkOperatorX(row_desc, output_exprs)); + _sink.reset(new JdbcTableSinkOperatorX(row_desc, next_operator_id(), output_exprs)); } else { return Status::InternalError( "Jdbc table sink is not enabled, you can change be config " @@ -300,10 +302,10 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData // Result file sink is not the top sink if (params.__isset.destinations && params.destinations.size() > 0) { _sink.reset(new ResultFileSinkOperatorX( - row_desc, thrift_sink.result_file_sink, params.destinations, + next_operator_id(), row_desc, thrift_sink.result_file_sink, params.destinations, send_query_statistics_with_every_batch, output_exprs, desc_tbl)); } else { - _sink.reset(new ResultFileSinkOperatorX(row_desc, output_exprs)); + _sink.reset(new ResultFileSinkOperatorX(next_operator_id(), row_desc, output_exprs)); } break; } @@ -346,7 +348,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData DataSinkOperatorXPtr sink_op; sink_op.reset(new ExchangeSinkOperatorX( - state, *_row_desc, thrift_sink.multi_cast_stream_sink.sinks[i], + state, *_row_desc, next_operator_id(), + thrift_sink.multi_cast_stream_sink.sinks[i], thrift_sink.multi_cast_stream_sink.destinations[i], false)); static_cast(new_pipeline->set_sink(sink_op)); @@ -405,6 +408,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _runtime_states[i]->set_desc_tbl(_query_ctx->desc_tbl); _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id); _runtime_states[i]->set_num_per_fragment_instances(request.num_senders); + _runtime_states[i]->resize_op_id_to_local_state(max_operator_id()); std::map pipeline_id_to_task; for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { auto task = std::make_unique(_pipelines[pip_idx], _total_tasks++, @@ -503,7 +507,7 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool, int num_children = tnodes[*node_idx].num_children; OperatorXPtr op = nullptr; RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe, - parent == nullptr ? -1 : parent->id(), child_idx)); + parent == nullptr ? -1 : parent->node_id(), child_idx)); // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); if (parent != nullptr) { @@ -552,36 +556,37 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN std::stringstream error_msg; switch (tnode.node_type) { case TPlanNodeType::OLAP_SCAN_NODE: { - op.reset(new OlapScanOperatorX(pool, tnode, descs)); + op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case doris::TPlanNodeType::JDBC_SCAN_NODE: { - op.reset(new JDBCScanOperatorX(pool, tnode, descs)); + op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case doris::TPlanNodeType::FILE_SCAN_NODE: { - op.reset(new FileScanOperatorX(pool, tnode, descs)); + op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case TPlanNodeType::ES_SCAN_NODE: case TPlanNodeType::ES_HTTP_SCAN_NODE: { - op.reset(new EsScanOperatorX(pool, tnode, descs)); + op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case TPlanNodeType::EXCHANGE_NODE: { int num_senders = find_with_default(request.per_exch_num_senders, tnode.node_id, 0); DCHECK_GT(num_senders, 0); - op.reset(new ExchangeSourceOperatorX(pool, tnode, descs, num_senders)); + op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case TPlanNodeType::AGGREGATION_NODE: { if (tnode.agg_node.aggregate_functions.empty()) { - op.reset(new DistinctStreamingAggSourceOperatorX(pool, tnode, descs)); + op.reset(new DistinctStreamingAggSourceOperatorX(pool, tnode, next_operator_id(), + descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -591,12 +596,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN cur_pipe = add_pipeline(); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new DistinctStreamingAggSinkOperatorX(pool, tnode, descs)); + sink.reset( + new DistinctStreamingAggSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation) { - op.reset(new StreamingAggSourceOperatorX(pool, tnode, descs)); + op.reset(new StreamingAggSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -606,11 +613,12 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN cur_pipe = add_pipeline(); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new StreamingAggSinkOperatorX(pool, tnode, descs)); + sink.reset(new StreamingAggSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); } else { - op.reset(new AggSourceOperatorX(pool, tnode, descs)); + op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -621,14 +629,15 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new AggSinkOperatorX<>(pool, tnode, descs)); + sink.reset(new AggSinkOperatorX<>(pool, next_operator_id(), tnode, descs)); + sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); } break; } case TPlanNodeType::HASH_JOIN_NODE: { - op.reset(new HashJoinProbeOperatorX(pool, tnode, descs)); + op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -639,14 +648,15 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new HashJoinBuildSinkOperatorX(pool, tnode, descs)); + sink.reset(new HashJoinBuildSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); - _build_side_pipelines.insert({sink->id(), build_side_pipe}); + _build_side_pipelines.insert({sink->node_id(), build_side_pipe}); break; } case TPlanNodeType::CROSS_JOIN_NODE: { - op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode, descs)); + op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -657,15 +667,16 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, tnode, descs)); + sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); - _build_side_pipelines.insert({sink->id(), build_side_pipe}); + _build_side_pipelines.insert({sink->node_id(), build_side_pipe}); break; } case TPlanNodeType::UNION_NODE: { int child_count = tnode.num_children; - op.reset(new UnionSourceOperatorX(pool, tnode, descs)); + op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -678,6 +689,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; sink.reset(new UnionSinkOperatorX(i, next_operator_id(), pool, tnode, descs)); + sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build. @@ -691,7 +703,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN break; } case TPlanNodeType::SORT_NODE: { - op.reset(new SortSourceOperatorX(pool, tnode, descs)); + op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -702,13 +714,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new SortSinkOperatorX(pool, tnode, descs)); + sink.reset(new SortSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); break; } case doris::TPlanNodeType::PARTITION_SORT_NODE: { - op.reset(new PartitionSortSourceOperatorX(pool, tnode, descs)); + op.reset(new PartitionSortSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -719,13 +732,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new PartitionSortSinkOperatorX(pool, tnode, descs)); + sink.reset(new PartitionSortSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); break; } case TPlanNodeType::ANALYTIC_EVAL_NODE: { - op.reset(new AnalyticSourceOperatorX(pool, tnode, descs)); + op.reset(new AnalyticSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -736,7 +750,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new AnalyticSinkOperatorX(pool, tnode, descs)); + sink.reset(new AnalyticSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); break; @@ -752,42 +767,42 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN break; } case TPlanNodeType::REPEAT_NODE: { - op.reset(new RepeatOperatorX(pool, tnode, descs)); + op.reset(new RepeatOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case TPlanNodeType::TABLE_FUNCTION_NODE: { - op.reset(new TableFunctionOperatorX(pool, tnode, descs)); + op.reset(new TableFunctionOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case TPlanNodeType::ASSERT_NUM_ROWS_NODE: { - op.reset(new AssertNumRowsOperatorX(pool, tnode, descs)); + op.reset(new AssertNumRowsOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case TPlanNodeType::EMPTY_SET_NODE: { - op.reset(new EmptySetSourceOperatorX(pool, tnode, descs)); + op.reset(new EmptySetSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case TPlanNodeType::DATA_GEN_SCAN_NODE: { - op.reset(new DataGenSourceOperatorX(pool, tnode, descs)); + op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case TPlanNodeType::SCHEMA_SCAN_NODE: { - op.reset(new SchemaScanOperatorX(pool, tnode, descs)); + op.reset(new SchemaScanOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case TPlanNodeType::META_SCAN_NODE: { - op.reset(new MetaScanOperatorX(pool, tnode, descs)); + op.reset(new MetaScanOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } case TPlanNodeType::SELECT_NODE: { - op.reset(new SelectOperatorX(pool, tnode, descs)); + op.reset(new SelectOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } @@ -802,7 +817,7 @@ template Status PipelineXFragmentContext::_build_operators_for_set_operation_node( ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx) { - op.reset(new SetSourceOperatorX(pool, tnode, descs)); + op.reset(new SetSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -824,6 +839,7 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node( sink.reset(new SetProbeSinkOperatorX(child_id, next_operator_id(), pool, tnode, descs)); } + sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(probe_side_pipe->set_sink(sink)); RETURN_IF_ERROR(probe_side_pipe->sink_x()->init(tnode, _runtime_state.get())); // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build. diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 3219b3d107..2f3db31687 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -111,7 +111,9 @@ public: } } - int next_operator_id() { return --_operator_id; } + [[nodiscard]] int next_operator_id() { return _operator_id++; } + + [[nodiscard]] int max_operator_id() const { return _operator_id; } private: void _close_action() override; @@ -175,7 +177,7 @@ private: // We can guarantee that a plan node ID can correspond to an operator ID, // but some operators do not have a corresponding plan node ID. // We set these IDs as negative numbers, which are not visible to the user. - int _operator_id = -1; + int _operator_id = 0; }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index f7377a11da..7a2d837d69 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -25,6 +25,7 @@ #include #include +#include "common/status.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/scan_operator.h" #include "pipeline/pipeline.h" @@ -73,26 +74,42 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams std::vector no_scan_ranges; auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, - _operators.front()->id(), no_scan_ranges); + _operators.front()->node_id(), no_scan_ranges); for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { - auto& deps = get_upstream_dependency(_operators[op_idx]->id()); + auto& deps = get_upstream_dependency(_operators[op_idx]->operator_id()); LocalStateInfo info { op_idx == _operators.size() - 1 ? _parent_profile - : state->get_local_state(_operators[op_idx + 1]->id())->profile(), + : state->get_local_state(_operators[op_idx + 1]->operator_id())->profile(), scan_ranges, deps}; RETURN_IF_ERROR(_operators[op_idx]->setup_local_state(state, info)); } _block = doris::vectorized::Block::create_unique(); - + RETURN_IF_ERROR(extract_dependencies()); // We should make sure initial state for task are runnable so that we can do some preparation jobs (e.g. initialize runtime filters). set_state(PipelineTaskState::RUNNABLE); _prepared = true; return Status::OK(); } +Status PipelineXTask::extract_dependencies() { + for (auto op : _operators) { + auto* local_state = _state->get_local_state(op->operator_id()); + auto* dep = local_state->dependency(); + DCHECK(dep != nullptr); + _read_dependencies.push_back(dep); + } + { + auto* local_state = _state->get_sink_local_state(_sink->operator_id()); + auto* dep = local_state->dependency(); + DCHECK(dep != nullptr); + _write_dependencies = dep; + } + return Status::OK(); +} + void PipelineXTask::_init_profile() { std::stringstream ss; ss << "PipelineTask" @@ -139,9 +156,9 @@ Status PipelineXTask::_open() { SCOPED_TIMER(_open_timer); _dry_run = _sink->should_dry_run(_state); for (auto& o : _operators) { - RETURN_IF_ERROR(_state->get_local_state(o->id())->open(_state)); + RETURN_IF_ERROR(_state->get_local_state(o->operator_id())->open(_state)); } - RETURN_IF_ERROR(_state->get_sink_local_state(_sink->id())->open(_state)); + RETURN_IF_ERROR(_state->get_sink_local_state(_sink->operator_id())->open(_state)); _opened = true; return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index fa977c68f6..864d2e93b0 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -73,8 +73,8 @@ public: if (_dry_run) { return true; } - for (auto& op : _operators) { - auto dep = op->wait_for_dependency(_state); + for (auto* op_dep : _read_dependencies) { + auto* dep = op_dep->read_blocked_by(); if (dep != nullptr) { dep->start_read_watcher(); return false; @@ -88,7 +88,7 @@ public: } bool sink_can_write() override { - auto dep = _sink->wait_for_dependency(_state); + auto* dep = _write_dependencies->write_blocked_by(); if (dep != nullptr) { dep->start_write_watcher(); return false; @@ -141,6 +141,8 @@ public: return _upstream_dependency[id]; } + Status extract_dependencies(); + private: void set_close_pipeline_time() override {} void _init_profile() override; @@ -153,6 +155,9 @@ private: OperatorXPtr _root; DataSinkOperatorXPtr _sink; + std::vector _read_dependencies; + WriteDependency* _write_dependencies; + DependencyMap _upstream_dependency; std::vector _downstream_dependency; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index bad1487f32..8535e2dcc1 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -424,33 +424,27 @@ int64_t RuntimeState::get_load_mem_limit() { } } -void RuntimeState::emplace_local_state( - int id, std::shared_ptr state) { - std::unique_lock l(_local_state_lock); - _op_id_to_local_state.emplace(id, state); +void RuntimeState::resize_op_id_to_local_state(int size) { + _op_id_to_local_state.resize(size); + _op_id_to_sink_local_state.resize(size); } -std::shared_ptr RuntimeState::get_local_state(int id) { - std::unique_lock l(_local_state_lock); - if (_op_id_to_local_state.find(id) == _op_id_to_local_state.end()) { - return nullptr; - } - return _op_id_to_local_state[id]; +void RuntimeState::emplace_local_state( + int id, std::unique_ptr state) { + _op_id_to_local_state[id] = std::move(state); +} + +doris::pipeline::PipelineXLocalStateBase* RuntimeState::get_local_state(int id) { + return _op_id_to_local_state[id].get(); } void RuntimeState::emplace_sink_local_state( - int id, std::shared_ptr state) { - std::unique_lock l(_local_sink_state_lock); - _op_id_to_sink_local_state.emplace(id, state); + int id, std::unique_ptr state) { + _op_id_to_sink_local_state[id] = std::move(state); } -std::shared_ptr RuntimeState::get_sink_local_state( - int id) { - std::unique_lock l(_local_sink_state_lock); - if (_op_id_to_sink_local_state.find(id) == _op_id_to_sink_local_state.end()) { - return nullptr; - } - return _op_id_to_sink_local_state[id]; +doris::pipeline::PipelineXSinkLocalStateBase* RuntimeState::get_sink_local_state(int id) { + return _op_id_to_sink_local_state[id].get(); } bool RuntimeState::enable_page_cache() const { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 06ceb699e7..f555b54ba6 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -455,14 +455,16 @@ public: } void emplace_local_state(int id, - std::shared_ptr state); + std::unique_ptr state); - std::shared_ptr get_local_state(int id); + doris::pipeline::PipelineXLocalStateBase* get_local_state(int id); void emplace_sink_local_state( - int id, std::shared_ptr state); + int id, std::unique_ptr state); - std::shared_ptr get_sink_local_state(int id); + doris::pipeline::PipelineXSinkLocalStateBase* get_sink_local_state(int id); + + void resize_op_id_to_local_state(int size); private: Status create_error_log_file(); @@ -561,13 +563,10 @@ private: std::vector _tablet_commit_infos; std::vector _error_tablet_infos; - std::map> _op_id_to_local_state; - std::map> + std::vector> _op_id_to_local_state; + std::vector> _op_id_to_sink_local_state; - std::mutex _local_state_lock; - std::mutex _local_sink_state_lock; - QueryContext* _query_ctx = nullptr; // true if max_filter_ratio is 0