From 9e0a2e861cf05fcb7cb22fbcfe95955b16ab7f73 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 22 Dec 2023 17:24:39 +0800 Subject: [PATCH] [pipelineX](refactor) rename functions (#28846) --- be/src/pipeline/exec/aggregation_sink_operator.h | 4 ++-- be/src/pipeline/exec/analytic_sink_operator.h | 4 ++-- be/src/pipeline/exec/assert_num_rows_operator.h | 2 +- be/src/pipeline/exec/exchange_source_operator.h | 2 +- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- be/src/pipeline/exec/hashjoin_probe_operator.h | 2 +- be/src/pipeline/exec/nested_loop_join_build_operator.h | 2 +- be/src/pipeline/exec/nested_loop_join_probe_operator.h | 2 +- be/src/pipeline/exec/partition_sort_sink_operator.h | 4 ++-- be/src/pipeline/exec/scan_operator.h | 2 +- be/src/pipeline/exec/set_probe_sink_operator.h | 2 +- be/src/pipeline/exec/set_sink_operator.h | 2 +- be/src/pipeline/exec/sort_sink_operator.h | 4 ++-- .../pipeline/exec/streaming_aggregation_sink_operator.h | 2 +- be/src/pipeline/pipeline.h | 2 +- be/src/pipeline/pipeline_x/operator.h | 4 ++-- .../pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 8 ++++---- 17 files changed, 25 insertions(+), 25 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 2cd6ef5093..97be9dcd6a 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -366,12 +366,12 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_probe_expr_ctxs.empty()) { return _needs_finalize || DataSinkOperatorX::_child_x ->ignore_data_distribution() ? DataDistribution(ExchangeType::PASSTHROUGH) - : DataSinkOperatorX::get_local_exchange_type(); + : DataSinkOperatorX::required_data_distribution(); } return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 14ed8c815b..3e0eb85f76 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -107,7 +107,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_partition_by_eq_expr_ctxs.empty()) { return {ExchangeType::PASSTHROUGH}; } else if (_order_by_eq_expr_ctxs.empty()) { @@ -115,7 +115,7 @@ public: ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } - return DataSinkOperatorX::get_local_exchange_type(); + return DataSinkOperatorX::required_data_distribution(); } private: diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h b/be/src/pipeline/exec/assert_num_rows_operator.h index 1e796b622d..bb5e65168b 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.h +++ b/be/src/pipeline/exec/assert_num_rows_operator.h @@ -57,7 +57,7 @@ public: [[nodiscard]] bool is_source() const override { return false; } - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { return {ExchangeType::PASSTHROUGH}; } diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 221a43779a..b621da3807 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -117,7 +117,7 @@ public: return _sub_plan_query_statistics_recvr; } - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (OperatorX::ignore_data_distribution()) { return {ExchangeType::NOOP}; } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index ecf0a4a312..24faa4115d 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -156,7 +156,7 @@ public: ._should_build_hash_table; } - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } else if (_is_broadcast_join) { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 16b455e4f6..5dde597ec7 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -163,7 +163,7 @@ public: SourceState& source_state) const override; bool need_more_input_data(RuntimeState* state) const override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index daa976b4e7..ea0820253c 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -102,7 +102,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 5e57399eae..bc8913f5d0 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -227,7 +227,7 @@ public: return _old_version_flag ? _row_descriptor : *_intermediate_row_desc; } - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return {ExchangeType::NOOP}; } diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 1a47e0fa13..486e705621 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -105,9 +105,9 @@ public: Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) { - return DataSinkOperatorX::get_local_exchange_type(); + return DataSinkOperatorX::required_data_distribution(); } return {ExchangeType::PASSTHROUGH}; } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 9bc42453c7..3690e9eb39 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -434,7 +434,7 @@ public: TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_col_distribute_ids.empty() || OperatorX::ignore_data_distribution()) { // 1. `_col_distribute_ids` is empty means storage distribution is not effective, so we prefer to do local shuffle. // 2. `ignore_data_distribution()` returns true means we ignore the distribution. diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index a86bf49172..6f453ff31f 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -144,7 +144,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 375906b5aa..635d1ee867 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -129,7 +129,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 3146e915ee..2f5512e108 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -93,12 +93,12 @@ public: Status open(RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { if (_merge_by_exchange) { // The current sort node is used for the ORDER BY return {ExchangeType::PASSTHROUGH}; } - return DataSinkOperatorX::get_local_exchange_type(); + return DataSinkOperatorX::required_data_distribution(); } private: diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index ef7f71b7e2..a7fcdcf847 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -120,7 +120,7 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - DataDistribution get_local_exchange_type() const override { + DataDistribution required_data_distribution() const override { return {ExchangeType::PASSTHROUGH}; } }; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 305676856a..2775c45019 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -148,7 +148,7 @@ public: } } void init_data_distribution() { - set_data_distribution(operatorXs.front()->get_local_exchange_type()); + set_data_distribution(operatorXs.front()->required_data_distribution()); } void set_data_distribution(const DataDistribution& data_distribution) { _data_distribution = data_distribution; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index fc95785924..da52706b56 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -181,7 +181,7 @@ public: } [[nodiscard]] std::string get_name() const override { return _op_name; } [[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0; - [[nodiscard]] virtual DataDistribution get_local_exchange_type() const { + [[nodiscard]] virtual DataDistribution required_data_distribution() const { return _child_x && _child_x->ignore_data_distribution() && !is_source() ? DataDistribution(ExchangeType::PASSTHROUGH) : DataDistribution(ExchangeType::NOOP); @@ -481,7 +481,7 @@ public: } virtual void get_dependency(std::vector& dependency, QueryContext* ctx) = 0; - virtual DataDistribution get_local_exchange_type() const { + virtual DataDistribution required_data_distribution() const { return _child_x && _child_x->ignore_data_distribution() ? DataDistribution(ExchangeType::PASSTHROUGH) : DataDistribution(ExchangeType::NOOP); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index fe7388735e..7efe476c6d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -288,10 +288,10 @@ Status PipelineXFragmentContext::_plan_local_exchange( do_local_exchange = false; // Plan local exchange for each operator. for (; idx < ops.size();) { - if (ops[idx]->get_local_exchange_type().need_local_exchange()) { + if (ops[idx]->required_data_distribution().need_local_exchange()) { RETURN_IF_ERROR(_add_local_exchange( pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip, - ops[idx]->get_local_exchange_type(), &do_local_exchange, num_buckets, + ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets, bucket_seq_to_instance_idx, ignore_data_hash_distribution)); } if (do_local_exchange) { @@ -305,10 +305,10 @@ Status PipelineXFragmentContext::_plan_local_exchange( idx++; } } while (do_local_exchange); - if (pip->sink_x()->get_local_exchange_type().need_local_exchange()) { + if (pip->sink_x()->required_data_distribution().need_local_exchange()) { RETURN_IF_ERROR(_add_local_exchange( pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip, - pip->sink_x()->get_local_exchange_type(), &do_local_exchange, num_buckets, + pip->sink_x()->required_data_distribution(), &do_local_exchange, num_buckets, bucket_seq_to_instance_idx, ignore_data_hash_distribution)); } return Status::OK();