[pipelineX](refactor) rename functions (#28846)
This commit is contained in:
@ -366,12 +366,12 @@ public:
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block,
|
||||
SourceState source_state) override;
|
||||
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
if (_probe_expr_ctxs.empty()) {
|
||||
return _needs_finalize || DataSinkOperatorX<LocalStateType>::_child_x
|
||||
->ignore_data_distribution()
|
||||
? DataDistribution(ExchangeType::PASSTHROUGH)
|
||||
: DataSinkOperatorX<LocalStateType>::get_local_exchange_type();
|
||||
: DataSinkOperatorX<LocalStateType>::required_data_distribution();
|
||||
}
|
||||
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
|
||||
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
|
||||
|
||||
@ -107,7 +107,7 @@ public:
|
||||
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block,
|
||||
SourceState source_state) override;
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
if (_partition_by_eq_expr_ctxs.empty()) {
|
||||
return {ExchangeType::PASSTHROUGH};
|
||||
} else if (_order_by_eq_expr_ctxs.empty()) {
|
||||
@ -115,7 +115,7 @@ public:
|
||||
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
|
||||
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
|
||||
}
|
||||
return DataSinkOperatorX<AnalyticSinkLocalState>::get_local_exchange_type();
|
||||
return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
@ -57,7 +57,7 @@ public:
|
||||
|
||||
[[nodiscard]] bool is_source() const override { return false; }
|
||||
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
return {ExchangeType::PASSTHROUGH};
|
||||
}
|
||||
|
||||
|
||||
@ -117,7 +117,7 @@ public:
|
||||
return _sub_plan_query_statistics_recvr;
|
||||
}
|
||||
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
|
||||
return {ExchangeType::NOOP};
|
||||
}
|
||||
|
||||
@ -156,7 +156,7 @@ public:
|
||||
._should_build_hash_table;
|
||||
}
|
||||
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
return {ExchangeType::NOOP};
|
||||
} else if (_is_broadcast_join) {
|
||||
|
||||
@ -163,7 +163,7 @@ public:
|
||||
SourceState& source_state) const override;
|
||||
|
||||
bool need_more_input_data(RuntimeState* state) const override;
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
return {ExchangeType::NOOP};
|
||||
}
|
||||
|
||||
@ -102,7 +102,7 @@ public:
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block,
|
||||
SourceState source_state) override;
|
||||
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
return {ExchangeType::NOOP};
|
||||
}
|
||||
|
||||
@ -227,7 +227,7 @@ public:
|
||||
return _old_version_flag ? _row_descriptor : *_intermediate_row_desc;
|
||||
}
|
||||
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
|
||||
return {ExchangeType::NOOP};
|
||||
}
|
||||
|
||||
@ -105,9 +105,9 @@ public:
|
||||
Status open(RuntimeState* state) override;
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block,
|
||||
SourceState source_state) override;
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
|
||||
return DataSinkOperatorX<PartitionSortSinkLocalState>::get_local_exchange_type();
|
||||
return DataSinkOperatorX<PartitionSortSinkLocalState>::required_data_distribution();
|
||||
}
|
||||
return {ExchangeType::PASSTHROUGH};
|
||||
}
|
||||
|
||||
@ -434,7 +434,7 @@ public:
|
||||
|
||||
TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
|
||||
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
if (_col_distribute_ids.empty() || OperatorX<LocalStateType>::ignore_data_distribution()) {
|
||||
// 1. `_col_distribute_ids` is empty means storage distribution is not effective, so we prefer to do local shuffle.
|
||||
// 2. `ignore_data_distribution()` returns true means we ignore the distribution.
|
||||
|
||||
@ -144,7 +144,7 @@ public:
|
||||
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block,
|
||||
SourceState source_state) override;
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
|
||||
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
|
||||
}
|
||||
|
||||
@ -129,7 +129,7 @@ public:
|
||||
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block,
|
||||
SourceState source_state) override;
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
|
||||
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
|
||||
}
|
||||
|
||||
@ -93,12 +93,12 @@ public:
|
||||
Status open(RuntimeState* state) override;
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block,
|
||||
SourceState source_state) override;
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
if (_merge_by_exchange) {
|
||||
// The current sort node is used for the ORDER BY
|
||||
return {ExchangeType::PASSTHROUGH};
|
||||
}
|
||||
return DataSinkOperatorX<SortSinkLocalState>::get_local_exchange_type();
|
||||
return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
@ -120,7 +120,7 @@ public:
|
||||
Status init(const TPlanNode& tnode, RuntimeState* state) override;
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block,
|
||||
SourceState source_state) override;
|
||||
DataDistribution get_local_exchange_type() const override {
|
||||
DataDistribution required_data_distribution() const override {
|
||||
return {ExchangeType::PASSTHROUGH};
|
||||
}
|
||||
};
|
||||
|
||||
@ -148,7 +148,7 @@ public:
|
||||
}
|
||||
}
|
||||
void init_data_distribution() {
|
||||
set_data_distribution(operatorXs.front()->get_local_exchange_type());
|
||||
set_data_distribution(operatorXs.front()->required_data_distribution());
|
||||
}
|
||||
void set_data_distribution(const DataDistribution& data_distribution) {
|
||||
_data_distribution = data_distribution;
|
||||
|
||||
@ -181,7 +181,7 @@ public:
|
||||
}
|
||||
[[nodiscard]] std::string get_name() const override { return _op_name; }
|
||||
[[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
|
||||
[[nodiscard]] virtual DataDistribution get_local_exchange_type() const {
|
||||
[[nodiscard]] virtual DataDistribution required_data_distribution() const {
|
||||
return _child_x && _child_x->ignore_data_distribution() && !is_source()
|
||||
? DataDistribution(ExchangeType::PASSTHROUGH)
|
||||
: DataDistribution(ExchangeType::NOOP);
|
||||
@ -481,7 +481,7 @@ public:
|
||||
}
|
||||
|
||||
virtual void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext* ctx) = 0;
|
||||
virtual DataDistribution get_local_exchange_type() const {
|
||||
virtual DataDistribution required_data_distribution() const {
|
||||
return _child_x && _child_x->ignore_data_distribution()
|
||||
? DataDistribution(ExchangeType::PASSTHROUGH)
|
||||
: DataDistribution(ExchangeType::NOOP);
|
||||
|
||||
@ -288,10 +288,10 @@ Status PipelineXFragmentContext::_plan_local_exchange(
|
||||
do_local_exchange = false;
|
||||
// Plan local exchange for each operator.
|
||||
for (; idx < ops.size();) {
|
||||
if (ops[idx]->get_local_exchange_type().need_local_exchange()) {
|
||||
if (ops[idx]->required_data_distribution().need_local_exchange()) {
|
||||
RETURN_IF_ERROR(_add_local_exchange(
|
||||
pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
|
||||
ops[idx]->get_local_exchange_type(), &do_local_exchange, num_buckets,
|
||||
ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets,
|
||||
bucket_seq_to_instance_idx, ignore_data_hash_distribution));
|
||||
}
|
||||
if (do_local_exchange) {
|
||||
@ -305,10 +305,10 @@ Status PipelineXFragmentContext::_plan_local_exchange(
|
||||
idx++;
|
||||
}
|
||||
} while (do_local_exchange);
|
||||
if (pip->sink_x()->get_local_exchange_type().need_local_exchange()) {
|
||||
if (pip->sink_x()->required_data_distribution().need_local_exchange()) {
|
||||
RETURN_IF_ERROR(_add_local_exchange(
|
||||
pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip,
|
||||
pip->sink_x()->get_local_exchange_type(), &do_local_exchange, num_buckets,
|
||||
pip->sink_x()->required_data_distribution(), &do_local_exchange, num_buckets,
|
||||
bucket_seq_to_instance_idx, ignore_data_hash_distribution));
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
Reference in New Issue
Block a user