From 1be513b9273eee8a7be57311186f433cbc4961c0 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 6 Dec 2023 10:02:36 +0800 Subject: [PATCH] [pipelineX](local shuffle) Fix local shuffle for colocate/bucket join (#28032) --- .../pipeline/exec/aggregation_sink_operator.h | 2 +- .../exec/aggregation_source_operator.cpp | 1 - ...inct_streaming_aggregation_sink_operator.h | 1 - be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 + be/src/pipeline/exec/hashjoin_build_sink.h | 6 +- .../pipeline/exec/hashjoin_probe_operator.cpp | 2 + .../pipeline/exec/hashjoin_probe_operator.h | 9 +- .../pipeline/exec/join_build_sink_operator.h | 2 +- be/src/pipeline/exec/join_probe_operator.h | 2 +- be/src/pipeline/exec/scan_operator.h | 7 ++ be/src/pipeline/pipeline_fragment_context.cpp | 8 +- be/src/pipeline/pipeline_fragment_context.h | 2 +- be/src/pipeline/pipeline_x/dependency.cpp | 2 +- be/src/pipeline/pipeline_x/dependency.h | 11 +- .../local_exchange_sink_operator.cpp | 3 +- .../local_exchange_sink_operator.h | 25 ++++- .../local_exchange_source_operator.cpp | 7 +- .../local_exchange_source_operator.h | 2 +- .../local_exchange/local_exchanger.cpp | 35 ++++-- .../local_exchange/local_exchanger.h | 35 ++++-- be/src/pipeline/pipeline_x/operator.h | 3 +- .../pipeline_x_fragment_context.cpp | 103 +++++++++++------- .../pipeline_x/pipeline_x_fragment_context.h | 12 +- be/src/vec/exec/scan/pip_scanner_context.h | 5 +- .../apache/doris/planner/HashJoinNode.java | 18 +++ .../apache/doris/planner/OlapScanNode.java | 9 +- .../apache/doris/planner/PlanFragment.java | 10 ++ .../java/org/apache/doris/qe/Coordinator.java | 19 ++++ .../org/apache/doris/qe/SessionVariable.java | 10 +- gensrc/thrift/PaloInternalService.thrift | 2 + gensrc/thrift/PlanNodes.thrift | 9 ++ 31 files changed, 265 insertions(+), 99 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 232a61d50f..a6fa439f2f 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -371,7 +371,7 @@ public: if (_probe_expr_ctxs.empty()) { return _needs_finalize ? ExchangeType::PASSTHROUGH : ExchangeType::NOOP; } - return ExchangeType::SHUFFLE; + return ExchangeType::HASH_SHUFFLE; } using DataSinkOperatorX::id; diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index a417c1fa99..765fdec3da 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -50,7 +50,6 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize", TUnit::UNIT); auto& p = _parent->template cast(); if (p._is_streaming) { - _shared_state->data_queue.reset(new DataQueue(1)); _shared_state->data_queue->set_source_dependency(_dependency); } if (p._without_key) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h index c04e35448d..b30a829872 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h @@ -110,7 +110,6 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; - ExchangeType get_local_exchange_type() const override { return ExchangeType::PASSTHROUGH; } }; } // namespace pipeline diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 92c61882e2..efc7582848 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -385,6 +385,8 @@ HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int ope const TPlanNode& tnode, const DescriptorTbl& descs) : JoinBuildSinkOperatorX(pool, operator_id, tnode, descs), + _join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type + : TJoinDistributionType::NONE), _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join && tnode.hash_join_node.is_broadcast_join) { _runtime_filter_descs = tnode.runtime_filters; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index cc34f46d4d..c00d1b8e59 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -160,12 +160,16 @@ public: if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || _is_broadcast_join) { return ExchangeType::NOOP; } - return ExchangeType::SHUFFLE; + return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || + _join_distribution == TJoinDistributionType::COLOCATE + ? ExchangeType::BUCKET_HASH_SHUFFLE + : ExchangeType::HASH_SHUFFLE; } private: friend class HashJoinBuildSinkLocalState; + const TJoinDistributionType::type _join_distribution; // build expr vectorized::VExprContextSPtrs _build_expr_ctxs; // mark the build hash table whether it needs to store null value diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 74304bf535..2a788c6af4 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -222,6 +222,8 @@ void HashJoinProbeLocalState::_prepare_probe_block() { HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) : JoinProbeOperatorX(pool, tnode, operator_id, descs), + _join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type + : TJoinDistributionType::NONE), _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join && tnode.hash_join_node.is_broadcast_join), _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 263b4d4252..5ea66375ea 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -168,7 +168,12 @@ public: if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return ExchangeType::NOOP; } - return _is_broadcast_join ? ExchangeType::PASSTHROUGH : ExchangeType::SHUFFLE; + return _is_broadcast_join + ? ExchangeType::PASSTHROUGH + : (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || + _join_distribution == TJoinDistributionType::COLOCATE + ? ExchangeType::BUCKET_HASH_SHUFFLE + : ExchangeType::HASH_SHUFFLE); } private: @@ -177,6 +182,8 @@ private: std::vector& res_col_ids) const; friend class HashJoinProbeLocalState; + const TJoinDistributionType::type _join_distribution; + const bool _is_broadcast_join; // other expr vectorized::VExprContextSPtrs _other_join_conjuncts; diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h index 8eeb02e2af..f7c1415d37 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.h +++ b/be/src/pipeline/exec/join_build_sink_operator.h @@ -57,7 +57,7 @@ protected: template friend class JoinBuildSinkLocalState; - TJoinOp::type _join_op; + const TJoinOp::type _join_op; vectorized::JoinOpVariants _join_op_variants; const bool _have_other_join_conjunct; diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 6a947c5f6b..9c6280bb64 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -98,7 +98,7 @@ protected: template friend class JoinProbeLocalState; - TJoinOp::type _join_op; + const TJoinOp::type _join_op; const bool _have_other_join_conjunct; const bool _match_all_probe; // output all rows coming from the probe input. Full/Left Join const bool _match_all_build; // output all rows coming from the build input. Full/Right Join diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index ebc1317b93..d7dd83a2e5 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -427,6 +427,13 @@ public: TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } + bool need_to_local_shuffle() const override { + // If _col_distribute_ids is not empty, we prefer to not do local shuffle. + return _col_distribute_ids.empty(); + } + + bool is_bucket_shuffle_scan() const override { return !_col_distribute_ids.empty(); } + int64_t get_push_down_count() const { return _push_down_count; } using OperatorX::id; using OperatorX::operator_id; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 5812da5b7f..9f93808d37 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -194,11 +194,15 @@ PipelinePtr PipelineFragmentContext::add_pipeline() { return pipeline; } -PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent) { +PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { // _prepared、_submitted, _canceled should do not add pipeline PipelineId id = _next_pipeline_id++; auto pipeline = std::make_shared(id, weak_from_this()); - _pipelines.emplace_back(pipeline); + if (idx >= 0) { + _pipelines.insert(_pipelines.begin() + idx, pipeline); + } else { + _pipelines.emplace_back(pipeline); + } parent->set_children(pipeline); return pipeline; } diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 480e4332d4..c8883248d4 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -71,7 +71,7 @@ public: PipelinePtr add_pipeline(); - PipelinePtr add_pipeline(PipelinePtr parent); + PipelinePtr add_pipeline(PipelinePtr parent, int idx = -1); TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; } diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index dcb149d078..0be3d2d80b 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -182,7 +182,7 @@ void RuntimeFilterDependency::sub_filters() { void LocalExchangeSharedState::sub_running_sink_operators() { std::unique_lock lc(le_lock); - if (exchanger->running_sink_operators.fetch_sub(1) == 1) { + if (exchanger->_running_sink_operators.fetch_sub(1) == 1) { _set_ready_for_read(); } } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 23ce13a1db..576ec5aff1 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -339,7 +339,7 @@ public: size_t input_num_rows = 0; std::vector values; std::unique_ptr agg_profile_arena; - std::unique_ptr data_queue; + std::unique_ptr data_queue = std::make_unique(1); /// The total size of the row from the aggregate functions. size_t total_size_of_aggregate_states = 0; size_t align_aggregate_states = 1; @@ -576,18 +576,21 @@ public: enum class ExchangeType : uint8_t { NOOP = 0, - SHUFFLE = 1, + HASH_SHUFFLE = 1, PASSTHROUGH = 2, + BUCKET_HASH_SHUFFLE = 3, }; inline std::string get_exchange_type_name(ExchangeType idx) { switch (idx) { case ExchangeType::NOOP: return "NOOP"; - case ExchangeType::SHUFFLE: - return "SHUFFLE"; + case ExchangeType::HASH_SHUFFLE: + return "HASH_SHUFFLE"; case ExchangeType::PASSTHROUGH: return "PASSTHROUGH"; + case ExchangeType::BUCKET_HASH_SHUFFLE: + return "BUCKET_HASH_SHUFFLE"; } LOG(FATAL) << "__builtin_unreachable"; __builtin_unreachable(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index 3d1540cdc4..b950febfea 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -31,7 +31,8 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _exchanger = _shared_state->exchanger.get(); DCHECK(_exchanger != nullptr); - if (_exchanger->get_type() == ExchangeType::SHUFFLE) { + if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE || + _exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) { auto& p = _parent->cast(); RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner)); } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index f5f1d0f48a..2be71e5847 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -46,9 +46,12 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + int get_data_queue_idx() const; + private: friend class LocalExchangeSinkOperatorX; friend class ShuffleExchanger; + friend class BucketShuffleExchanger; friend class PassthroughExchanger; Exchanger* _exchanger = nullptr; @@ -78,8 +81,12 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX; LocalExchangeSinkOperatorX(int sink_id, int dest_id, int num_partitions, - const std::vector& texprs) - : Base(sink_id, -1, dest_id), _num_partitions(num_partitions), _texprs(texprs) {} + const std::vector& texprs, + const std::map& bucket_seq_to_instance_idx) + : Base(sink_id, dest_id, dest_id), + _num_partitions(num_partitions), + _texprs(texprs), + _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {} Status init(const TPlanNode& tnode, RuntimeState* state) override { return Status::InternalError("{} should not init with TPlanNode", Base::_name); @@ -89,20 +96,24 @@ public: return Status::InternalError("{} should not init with TPlanNode", Base::_name); } - Status init(ExchangeType type) override { + Status init(ExchangeType type, int num_buckets) override { _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")"; _type = type; - if (_type == ExchangeType::SHUFFLE) { + if (_type == ExchangeType::HASH_SHUFFLE) { _partitioner.reset( new vectorized::Crc32HashPartitioner(_num_partitions)); RETURN_IF_ERROR(_partitioner->init(_texprs)); + } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) { + _partitioner.reset(new vectorized::Crc32HashPartitioner( + num_buckets)); + RETURN_IF_ERROR(_partitioner->init(_texprs)); } return Status::OK(); } Status prepare(RuntimeState* state) override { - if (_type == ExchangeType::SHUFFLE) { + if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); } @@ -110,7 +121,7 @@ public: } Status open(RuntimeState* state) override { - if (_type == ExchangeType::SHUFFLE) { + if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { RETURN_IF_ERROR(_partitioner->open(state)); } @@ -122,10 +133,12 @@ public: private: friend class LocalExchangeSinkLocalState; + friend class ShuffleExchanger; ExchangeType _type; const int _num_partitions; const std::vector& _texprs; std::unique_ptr _partitioner; + const std::map _bucket_seq_to_instance_idx; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 4467b32db4..c086900f5a 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -22,11 +22,11 @@ namespace doris::pipeline { void LocalExchangeSourceDependency::block() { - if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->running_sink_operators == 0) { + if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->_running_sink_operators == 0) { return; } std::unique_lock lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock); - if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->running_sink_operators == 0) { + if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->_running_sink_operators == 0) { return; } Dependency::block(); @@ -42,7 +42,8 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& DCHECK(_exchanger != nullptr); _get_block_failed_counter = ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1); - if (_exchanger->get_type() == ExchangeType::SHUFFLE) { + if (_exchanger->get_type() == ExchangeType::HASH_SHUFFLE || + _exchanger->get_type() == ExchangeType::BUCKET_HASH_SHUFFLE) { _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime"); } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index 9a44ce9678..1f87b86ebe 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -60,7 +60,7 @@ private: class LocalExchangeSourceOperatorX final : public OperatorX { public: using Base = OperatorX; - LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, -1, id) {} + LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, id, id) {} Status init(ExchangeType type) override { _op_name = "LOCAL_EXCHANGE_OPERATOR (" + get_exchange_type_name(type) + ")"; return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 3487f3bcbf..a0d6b32bae 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -55,7 +55,7 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block _data_queue[local_state._channel_id].try_dequeue(partitioned_block)); *result_block = mutable_block->to_block(); }; - if (running_sink_operators == 0) { + if (_running_sink_operators == 0) { if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { SCOPED_TIMER(local_state._copy_data_timer); mutable_block = @@ -84,11 +84,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest const auto rows = block->rows(); auto row_idx = std::make_shared>(rows); { - local_state._partition_rows_histogram.assign(_num_instances + 1, 0); + local_state._partition_rows_histogram.assign(_num_partitions + 1, 0); for (size_t i = 0; i < rows; ++i) { local_state._partition_rows_histogram[channel_ids[i]]++; } - for (int32_t i = 1; i <= _num_instances; ++i) { + for (int32_t i = 1; i <= _num_partitions; ++i) { local_state._partition_rows_histogram[i] += local_state._partition_rows_histogram[i - 1]; } @@ -100,12 +100,25 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest } auto new_block = vectorized::Block::create_shared(block->clone_empty()); new_block->swap(*block); - for (size_t i = 0; i < _num_instances; i++) { - size_t start = local_state._partition_rows_histogram[i]; - size_t size = local_state._partition_rows_histogram[i + 1] - start; - if (size > 0) { - data_queue[i].enqueue({new_block, {row_idx, start, size}}); - local_state._shared_state->set_ready_for_read(i); + if (get_type() == ExchangeType::HASH_SHUFFLE) { + for (size_t i = 0; i < _num_partitions; i++) { + size_t start = local_state._partition_rows_histogram[i]; + size_t size = local_state._partition_rows_histogram[i + 1] - start; + if (size > 0) { + data_queue[i].enqueue({new_block, {row_idx, start, size}}); + local_state._shared_state->set_ready_for_read(i); + } + } + } else { + auto map = + local_state._parent->cast()._bucket_seq_to_instance_idx; + for (size_t i = 0; i < _num_partitions; i++) { + size_t start = local_state._partition_rows_histogram[i]; + size_t size = local_state._partition_rows_histogram[i + 1] - start; + if (size > 0) { + data_queue[map[i]].enqueue({new_block, {row_idx, start, size}}); + local_state._shared_state->set_ready_for_read(map[i]); + } } } @@ -117,7 +130,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo LocalExchangeSinkLocalState& local_state) { vectorized::Block new_block(in_block->clone_empty()); new_block.swap(*in_block); - auto channel_id = (local_state._channel_id++) % _num_instances; + auto channel_id = (local_state._channel_id++) % _num_partitions; _data_queue[channel_id].enqueue(std::move(new_block)); local_state._shared_state->set_ready_for_read(channel_id); @@ -128,7 +141,7 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b SourceState& source_state, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - if (running_sink_operators == 0) { + if (_running_sink_operators == 0) { if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { *block = std::move(next_block); } else { diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h index 6a9bebd7b4..3993289d5d 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -27,8 +27,10 @@ class LocalExchangeSinkLocalState; class Exchanger { public: - Exchanger(int num_instances) - : running_sink_operators(num_instances), _num_instances(num_instances) {} + Exchanger(int num_partitions) + : _running_sink_operators(num_partitions), _num_partitions(num_partitions) {} + Exchanger(int running_sink_operators, int num_partitions) + : _running_sink_operators(running_sink_operators), _num_partitions(num_partitions) {} virtual ~Exchanger() = default; virtual Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, @@ -37,24 +39,29 @@ public: LocalExchangeSinkLocalState& local_state) = 0; virtual ExchangeType get_type() const = 0; - std::atomic running_sink_operators = 0; - protected: - const int _num_instances; + friend struct LocalExchangeSourceDependency; + friend struct LocalExchangeSharedState; + std::atomic _running_sink_operators = 0; + const int _num_partitions; }; class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; -class ShuffleExchanger final : public Exchanger { +class ShuffleExchanger : public Exchanger { using PartitionedBlock = std::pair, std::tuple>, size_t, size_t>>; public: ENABLE_FACTORY_CREATOR(ShuffleExchanger); - ShuffleExchanger(int num_instances) : Exchanger(num_instances) { - _data_queue.resize(num_instances); + ShuffleExchanger(int num_partitions) : Exchanger(num_partitions) { + _data_queue.resize(num_partitions); + } + ShuffleExchanger(int running_sink_operators, int num_partitions) + : Exchanger(running_sink_operators, num_partitions) { + _data_queue.resize(num_partitions); } ~ShuffleExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, @@ -62,9 +69,9 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, LocalExchangeSourceLocalState& local_state) override; - ExchangeType get_type() const override { return ExchangeType::SHUFFLE; } + ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } -private: +protected: Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, SourceState source_state, LocalExchangeSinkLocalState& local_state); @@ -72,6 +79,14 @@ private: std::vector> _data_queue; }; +class BucketShuffleExchanger : public ShuffleExchanger { + ENABLE_FACTORY_CREATOR(BucketShuffleExchanger); + BucketShuffleExchanger(int running_sink_operators, int num_buckets) + : ShuffleExchanger(running_sink_operators, num_buckets) {} + ~BucketShuffleExchanger() override = default; + ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } +}; + class PassthroughExchanger final : public Exchanger { public: ENABLE_FACTORY_CREATOR(PassthroughExchanger); diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 69d6a6cbc3..aba51c1a34 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -187,6 +187,7 @@ public: [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; } [[nodiscard]] virtual bool need_to_local_shuffle() const { return true; } + [[nodiscard]] virtual bool is_bucket_shuffle_scan() const { return false; } bool can_read() override { LOG(FATAL) << "should not reach here!"; @@ -438,7 +439,7 @@ public: virtual Status init(const TPlanNode& tnode, RuntimeState* state); Status init(const TDataSink& tsink) override; - virtual Status init(ExchangeType type) { + virtual Status init(ExchangeType type, int num_buckets) { return Status::InternalError("init() is only implemented in local exchange!"); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 71b45ad79c..e6949e66c3 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -231,12 +231,13 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r RETURN_IF_ERROR(_sink->init(request.fragment.output_sink)); static_cast(root_pipeline->set_sink(_sink)); - RETURN_IF_ERROR(_plan_local_shuffle()); + RETURN_IF_ERROR(_plan_local_shuffle(request.num_buckets, request.bucket_seq_to_instance_idx)); // 4. Initialize global states in pipelines. for (PipelinePtr& pipeline : _pipelines) { DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size(); static_cast(pipeline->sink_x()->set_child(pipeline->operator_xs().back())); + pipeline->children().clear(); RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get())); } @@ -249,40 +250,55 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r return Status::OK(); } -Status PipelineXFragmentContext::_plan_local_shuffle() { +Status PipelineXFragmentContext::_plan_local_shuffle( + int num_buckets, const std::map& bucket_seq_to_instance_idx) { for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) { - auto& children = _pipelines[pip_idx]->children(); - if (children.empty()) { - _pipelines[pip_idx]->init_need_to_local_shuffle_by_source(); - } else if (children.size() == 1) { - _pipelines[pip_idx]->set_need_to_local_shuffle(children[0]->need_to_local_shuffle()); + _pipelines[pip_idx]->init_need_to_local_shuffle_by_source(); + // Set property if child pipeline is not join operator's child. + if (!_pipelines[pip_idx]->children().empty()) { + for (auto& child : _pipelines[pip_idx]->children()) { + if (child->sink_x()->node_id() == + _pipelines[pip_idx]->operator_xs().front()->node_id()) { + _pipelines[pip_idx]->set_need_to_local_shuffle( + _pipelines[pip_idx]->need_to_local_shuffle() && + child->need_to_local_shuffle()); + } + } } - int idx = 0; - bool do_local_exchange = false; - do { - auto& ops = _pipelines[pip_idx]->operator_xs(); - do_local_exchange = false; - for (; idx < ops.size();) { - if (ops[idx]->get_local_exchange_type() != ExchangeType::NOOP) { - RETURN_IF_ERROR(_add_local_exchange( - idx, ops[idx]->node_id(), _runtime_state->obj_pool(), - _pipelines[pip_idx], ops[idx]->get_local_shuffle_exprs(), - ops[idx]->get_local_exchange_type(), &do_local_exchange)); - } - if (do_local_exchange) { - idx = 2; - break; - } - idx++; + RETURN_IF_ERROR(_plan_local_shuffle(num_buckets, pip_idx, _pipelines[pip_idx], + bucket_seq_to_instance_idx)); + } + return Status::OK(); +} + +Status PipelineXFragmentContext::_plan_local_shuffle( + int num_buckets, int pip_idx, PipelinePtr pip, + const std::map& bucket_seq_to_instance_idx) { + int idx = 0; + bool do_local_exchange = false; + do { + auto& ops = pip->operator_xs(); + do_local_exchange = false; + for (; idx < ops.size();) { + if (ops[idx]->get_local_exchange_type() != ExchangeType::NOOP) { + RETURN_IF_ERROR(_add_local_exchange( + pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip, + ops[idx]->get_local_shuffle_exprs(), ops[idx]->get_local_exchange_type(), + &do_local_exchange, num_buckets, bucket_seq_to_instance_idx)); } - } while (do_local_exchange); - if (_pipelines[pip_idx]->sink_x()->get_local_exchange_type() != ExchangeType::NOOP) { - RETURN_IF_ERROR(_add_local_exchange( - idx, _pipelines[pip_idx]->sink_x()->node_id(), _runtime_state->obj_pool(), - _pipelines[pip_idx], _pipelines[pip_idx]->sink_x()->get_local_shuffle_exprs(), - _pipelines[pip_idx]->sink_x()->get_local_exchange_type(), &do_local_exchange)); + if (do_local_exchange) { + idx = 2; + break; + } + idx++; } + } while (do_local_exchange); + if (pip->sink_x()->get_local_exchange_type() != ExchangeType::NOOP) { + RETURN_IF_ERROR(_add_local_exchange( + pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip, + pip->sink_x()->get_local_shuffle_exprs(), pip->sink_x()->get_local_exchange_type(), + &do_local_exchange, num_buckets, bucket_seq_to_instance_idx)); } return Status::OK(); } @@ -634,16 +650,17 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool, return Status::OK(); } -Status PipelineXFragmentContext::_add_local_exchange(int idx, int node_id, ObjectPool* pool, - PipelinePtr cur_pipe, - const std::vector& texprs, - ExchangeType exchange_type, - bool* do_local_exchange) { +Status PipelineXFragmentContext::_add_local_exchange( + int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe, + const std::vector& texprs, ExchangeType exchange_type, bool* do_local_exchange, + int num_buckets, const std::map& bucket_seq_to_instance_idx) { if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) { return Status::OK(); } - if (!cur_pipe->need_to_local_shuffle() && exchange_type == ExchangeType::SHUFFLE) { + if (!cur_pipe->need_to_local_shuffle() && + (exchange_type == ExchangeType::HASH_SHUFFLE || + exchange_type == ExchangeType::BUCKET_HASH_SHUFFLE)) { return Status::OK(); } *do_local_exchange = true; @@ -653,21 +670,27 @@ Status PipelineXFragmentContext::_add_local_exchange(int idx, int node_id, Objec const auto downstream_pipeline_id = cur_pipe->id(); auto local_exchange_id = next_operator_id(); // 1. Create a new pipeline with local exchange sink. - auto new_pip = add_pipeline(); + auto new_pip = add_pipeline(cur_pipe, pip_idx + 1); DataSinkOperatorXPtr sink; sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), local_exchange_id, - _num_instances, texprs)); + _num_instances, texprs, bucket_seq_to_instance_idx)); RETURN_IF_ERROR(new_pip->set_sink(sink)); auto shared_state = LocalExchangeSharedState::create_shared(); shared_state->source_dependencies.resize(_num_instances, nullptr); switch (exchange_type) { - case ExchangeType::SHUFFLE: + case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique(_num_instances); new_pip->set_need_to_local_shuffle(false); cur_pipe->set_need_to_local_shuffle(false); break; + case ExchangeType::BUCKET_HASH_SHUFFLE: + shared_state->exchanger = + BucketShuffleExchanger::create_unique(_num_instances, num_buckets); + new_pip->set_need_to_local_shuffle(false); + cur_pipe->set_need_to_local_shuffle(false); + break; case ExchangeType::PASSTHROUGH: shared_state->exchanger = PassthroughExchanger::create_unique(_num_instances); new_pip->set_need_to_local_shuffle(cur_pipe->need_to_local_shuffle()); @@ -677,7 +700,7 @@ Status PipelineXFragmentContext::_add_local_exchange(int idx, int node_id, Objec return Status::InternalError("Unsupported local exchange type : " + std::to_string((int)exchange_type)); } - RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type)); + RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type, num_buckets)); _op_id_to_le_state.insert({local_exchange_id, shared_state}); // 2. Initialize operators list. diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index b2239c4991..f78cf2aa85 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -125,9 +125,10 @@ public: private: void _close_fragment_instance() override; Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override; - Status _add_local_exchange(int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe, - const std::vector& texprs, ExchangeType exchange_type, - bool* do_local_exchange); + Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* pool, + PipelinePtr cur_pipe, const std::vector& texprs, + ExchangeType exchange_type, bool* do_local_exchange, int num_buckets, + const std::map& bucket_seq_to_instance_idx); [[nodiscard]] Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, @@ -153,7 +154,10 @@ private: const TPipelineFragmentParams& params, const RowDescriptor& row_desc, RuntimeState* state, DescriptorTbl& desc_tbl, PipelineId cur_pipeline_id); - Status _plan_local_shuffle(); + Status _plan_local_shuffle(int num_buckets, + const std::map& bucket_seq_to_instance_idx); + Status _plan_local_shuffle(int num_buckets, int pip_idx, PipelinePtr pip, + const std::map& bucket_seq_to_instance_idx); bool _has_inverted_index_or_partial_update(TOlapTableSink sink); diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 57d632a03e..3c9009d54c 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -47,8 +47,7 @@ public: : vectorized::ScannerContext(state, nullptr, output_tuple_desc, scanners, limit_, max_bytes_in_blocks_queue, num_parallel_instances, local_state), - _col_distribute_ids(col_distribute_ids), - _need_colocate_distribute(!_col_distribute_ids.empty()) {} + _need_colocate_distribute(false) {} Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, int id, bool wait = false) override { @@ -249,7 +248,7 @@ private: std::vector> _blocks_queues; std::atomic_int64_t _current_used_bytes = 0; - const std::vector& _col_distribute_ids; + const std::vector _col_distribute_ids; const bool _need_colocate_distribute; std::vector _colocate_blocks; std::vector> _colocate_mutable_blocks; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 9a5e07b6f3..8a4a0057f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -42,6 +42,7 @@ import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TEqJoinCondition; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.THashJoinNode; +import org.apache.doris.thrift.TJoinDistributionType; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -730,6 +731,7 @@ public class HashJoinNode extends JoinNodeBase { msg.hash_join_node.addToVintermediateTupleIdList(tupleDescriptor.getId().asInt()); } } + msg.hash_join_node.setDistType(isColocate ? TJoinDistributionType.COLOCATE : distrMode.toThrift()); } @Override @@ -812,6 +814,22 @@ public class HashJoinNode extends JoinNodeBase { public String toString() { return description; } + + public TJoinDistributionType toThrift() { + switch (this) { + case NONE: + return TJoinDistributionType.NONE; + case BROADCAST: + return TJoinDistributionType.BROADCAST; + case PARTITIONED: + return TJoinDistributionType.PARTITIONED; + case BUCKET_SHUFFLE: + return TJoinDistributionType.BUCKET_SHUFFLE; + default: + Preconditions.checkArgument(false, "Unknown DistributionMode: " + toString()); + } + return TJoinDistributionType.NONE; + } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 84b54f7e4d..7ca0198ce6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -64,6 +64,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.resource.Tag; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsDeriveResult; @@ -1444,7 +1445,7 @@ public class OlapScanNode extends ScanNode { msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds); } - if (shouldColoScan) { + if (shouldColoScan || SessionVariable.enablePipelineEngineX()) { msg.olap_scan_node.setDistributeColumnIds(new ArrayList<>(distributionColumnIds)); } } @@ -1621,8 +1622,10 @@ public class OlapScanNode extends ScanNode { public void finalizeForNereids() { computeNumNodes(); computeStatsForNereids(); - // distributionColumnIds is used for one backend node agg optimization, nereids do not support it. - distributionColumnIds.clear(); + if (!SessionVariable.enablePipelineEngineX()) { + // distributionColumnIds is used for one backend node agg optimization, nereids do not support it. + distributionColumnIds.clear(); + } } private void computeStatsForNereids() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 2a4aa17553..df4aa499fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -144,6 +144,8 @@ public class PlanFragment extends TreeNode { // The runtime filter id that is expected to be used private Set targetRuntimeFilterIds; + private int bucketNum; + // has colocate plan node private boolean hasColocatePlanNode = false; @@ -460,4 +462,12 @@ public class PlanFragment extends TreeNode { public void setFragmentSequenceNum(int seq) { fragmentSequenceNum = seq; } + + public int getBucketNum() { + return bucketNum; + } + + public void setBucketNum(int bucketNum) { + this.bucketNum = bucketNum; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 489cafa680..30a746d36e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -45,6 +45,7 @@ import org.apache.doris.planner.IntersectNode; import org.apache.doris.planner.MultiCastDataSink; import org.apache.doris.planner.MultiCastPlanFragment; import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.OriginalPlanner; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNode; @@ -300,6 +301,9 @@ public class Coordinator implements CoordInterface { && (fragments.size() > 0); initQueryOptions(context); + if (planner instanceof OriginalPlanner) { + queryOptions.setEnableLocalShuffle(false); + } setFromUserProperty(context); @@ -2240,6 +2244,15 @@ public class Coordinator implements CoordInterface { if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>()); fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); + + // Same as bucket shuffle. + int bucketNum = 0; + if (scanNode.getOlapTable().isColocateTable()) { + bucketNum = scanNode.getOlapTable().getDefaultDistributionInfo().getBucketNum(); + } else { + bucketNum = (int) (scanNode.getTotalTabletsNum()); + } + scanNode.getFragment().setBucketNum(bucketNum); } Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId()); @@ -2767,6 +2780,7 @@ public class Coordinator implements CoordInterface { fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>()); fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>()); + scanNode.getFragment().setBucketNum(bucketNum); } Map bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); @@ -3554,7 +3568,12 @@ public class Coordinator implements CoordInterface { } params.setFileScanParams(fileScanRangeParamsMap); + params.setNumBuckets(fragment.getBucketNum()); res.put(instanceExecParam.host, params); + res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap()); + } + for (int bucket : instanceExecParam.bucketSeqSet) { + res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, i); } TPipelineFragmentParams params = res.get(instanceExecParam.host); TPipelineInstanceParams localParams = new TPipelineInstanceParams(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 2d0dac4886..bcd3a5980f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -2569,7 +2569,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setBeExecVersion(Config.be_exec_version); tResult.setEnablePipelineEngine(enablePipelineEngine); tResult.setEnablePipelineXEngine(enablePipelineXEngine); - tResult.setEnableLocalShuffle(enableLocalShuffle); + tResult.setEnableLocalShuffle(enableLocalShuffle && enableNereidsPlanner); tResult.setParallelInstance(getParallelExecInstanceNum()); tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary); tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery); @@ -2976,6 +2976,14 @@ public class SessionVariable implements Serializable, Writable { || connectContext.getSessionVariable().enablePipelineXEngine; } + public static boolean enablePipelineEngineX() { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return false; + } + return connectContext.getSessionVariable().enablePipelineXEngine; + } + public static boolean enableAggState() { ConnectContext connectContext = ConnectContext.get(); if (connectContext == null) { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 1c444cec1b..c13473e9db 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -687,6 +687,8 @@ struct TPipelineFragmentParams { 31: optional i32 load_stream_per_node // num load stream for each sink backend 32: optional i32 total_load_streams // total num of load streams the downstream backend will see 33: optional i32 num_local_sink + 34: optional i32 num_buckets + 35: optional map bucket_seq_to_instance_idx } struct TPipelineFragmentParamsList { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 2a59ddca9f..b32259d35c 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -728,6 +728,14 @@ enum TJoinOp { NULL_AWARE_LEFT_ANTI_JOIN } +enum TJoinDistributionType { + NONE, + BROADCAST, + PARTITIONED, + BUCKET_SHUFFLE, + COLOCATE, +} + struct THashJoinNode { 1: required TJoinOp join_op @@ -759,6 +767,7 @@ struct THashJoinNode { 10: optional bool is_broadcast_join 11: optional bool is_mark + 12: optional TJoinDistributionType dist_type } struct TNestedLoopJoinNode {