From c0aac043b66e6b3f14468a7c77e40f7d7c5c8f64 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 30 Nov 2023 21:08:45 +0800 Subject: [PATCH] [pipelineX](local shuffle) Use local shuffle to optimize BHJ (#27823) --- .../pipeline/exec/exchange_source_operator.h | 5 + be/src/pipeline/pipeline.h | 20 +++ be/src/pipeline/pipeline_x/dependency.cpp | 9 +- be/src/pipeline/pipeline_x/dependency.h | 15 +- .../local_exchange_sink_operator.cpp | 59 +------ .../local_exchange_sink_operator.h | 40 +++-- .../local_exchange_source_operator.cpp | 57 +++---- .../local_exchange_source_operator.h | 17 +- .../local_exchange/local_exchanger.cpp | 147 ++++++++++++++++++ .../local_exchange/local_exchanger.h | 98 ++++++++++++ be/src/pipeline/pipeline_x/operator.h | 4 +- .../pipeline_x_fragment_context.cpp | 67 ++++++-- .../pipeline_x/pipeline_x_fragment_context.h | 5 +- 13 files changed, 412 insertions(+), 131 deletions(-) create mode 100644 be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp create mode 100644 be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index abac33001b..479a879905 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -104,6 +104,11 @@ public: return _sub_plan_query_statistics_recvr; } + bool need_to_local_shuffle() const override { + // TODO(gabriel): + return false; + } + private: friend class ExchangeLocalState; const int _num_senders; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index f4b7928887..a0b6de5c62 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -126,6 +126,17 @@ public: return _collect_query_statistics_with_every_batch; } + bool need_to_local_shuffle() const { return _need_to_local_shuffle; } + void set_need_to_local_shuffle(bool need_to_local_shuffle) { + _need_to_local_shuffle = need_to_local_shuffle; + } + void init_need_to_local_shuffle_by_source() { + set_need_to_local_shuffle(operatorXs.front()->need_to_local_shuffle()); + } + + std::vector>& children() { return _children; } + void set_children(std::shared_ptr child) { _children.push_back(child); } + private: void _init_profile(); @@ -136,6 +147,8 @@ private: std::vector>> _parents; std::vector>> _dependencies; + std::vector> _children; + PipelineId _pipeline_id; std::weak_ptr _context; int _previous_schedule_id = -1; @@ -178,6 +191,13 @@ private: bool _always_can_write = false; bool _is_root_pipeline = false; bool _collect_query_statistics_with_every_batch = false; + + // If source operator meets one of all conditions below: + // 1. is scan operator with Hash Bucket + // 2. is exchange operator with Hash/BucketHash partition + // then set `_need_to_local_shuffle` to false which means we should use local shuffle in this fragment + // because data already be partitioned by storage/shuffling. + bool _need_to_local_shuffle = true; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 2fcf0906c1..dcb149d078 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -22,7 +22,7 @@ #include "common/logging.h" #include "pipeline/pipeline_fragment_context.h" -#include "pipeline/pipeline_task.h" +#include "pipeline/pipeline_x/local_exchange/local_exchanger.h" #include "pipeline/pipeline_x/pipeline_x_task.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" @@ -180,4 +180,11 @@ void RuntimeFilterDependency::sub_filters() { } } +void LocalExchangeSharedState::sub_running_sink_operators() { + std::unique_lock lc(le_lock); + if (exchanger->running_sink_operators.fetch_sub(1) == 1) { + _set_ready_for_read(); + } +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 16e98f11b2..9fbb25aaa2 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -31,6 +31,7 @@ #include "gutil/integral_types.h" #include "pipeline/exec/data_queue.h" #include "pipeline/exec/multi_cast_data_streamer.h" +#include "pipeline/exec/operator.h" #include "vec/common/hash_table/hash_map_context_creator.h" #include "vec/common/sort/partition_sorter.h" #include "vec/common/sort/sorter.h" @@ -579,21 +580,15 @@ public: } }; -using PartitionedBlock = std::pair, - std::tuple>, size_t, size_t>>; +class Exchanger; + struct LocalExchangeSharedState : public BasicSharedState { public: ENABLE_FACTORY_CREATOR(LocalExchangeSharedState); - std::vector> data_queue; + std::unique_ptr exchanger {}; std::vector source_dependencies; - std::atomic running_sink_operators = 0; std::mutex le_lock; - void sub_running_sink_operators() { - std::unique_lock lc(le_lock); - if (running_sink_operators.fetch_sub(1) == 1) { - _set_ready_for_read(); - } - } + void sub_running_sink_operators(); void _set_ready_for_read() { for (auto* dep : source_dependencies) { DCHECK(dep); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index 12cc5e042e..3d1540cdc4 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -17,6 +17,8 @@ #include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" +#include "pipeline/pipeline_x/local_exchange/local_exchanger.h" + namespace doris::pipeline { Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { @@ -26,47 +28,12 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime"); _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime"); - auto& p = _parent->cast(); - _num_rows_in_queue.resize(p._num_partitions); - for (size_t i = 0; i < p._num_partitions; i++) { - _num_rows_in_queue[i] = ADD_COUNTER_WITH_LEVEL( - profile(), "NumRowsInQueue" + std::to_string(i), TUnit::UNIT, 1); - } - RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner)); - return Status::OK(); -} + _exchanger = _shared_state->exchanger.get(); + DCHECK(_exchanger != nullptr); -Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state, - const uint32_t* __restrict channel_ids, - vectorized::Block* block, SourceState source_state) { - auto& data_queue = _shared_state->data_queue; - const auto num_partitions = data_queue.size(); - const auto rows = block->rows(); - auto row_idx = std::make_shared>(rows); - { - _partition_rows_histogram.assign(num_partitions + 1, 0); - for (size_t i = 0; i < rows; ++i) { - _partition_rows_histogram[channel_ids[i]]++; - } - for (int32_t i = 1; i <= num_partitions; ++i) { - _partition_rows_histogram[i] += _partition_rows_histogram[i - 1]; - } - - for (int32_t i = rows - 1; i >= 0; --i) { - (*row_idx)[_partition_rows_histogram[channel_ids[i]] - 1] = i; - _partition_rows_histogram[channel_ids[i]]--; - } - } - auto new_block = vectorized::Block::create_shared(block->clone_empty()); - new_block->swap(*block); - for (size_t i = 0; i < num_partitions; i++) { - size_t start = _partition_rows_histogram[i]; - size_t size = _partition_rows_histogram[i + 1] - start; - if (size > 0) { - data_queue[i].enqueue({new_block, {row_idx, start, size}}); - _shared_state->set_ready_for_read(i); - COUNTER_UPDATE(_num_rows_in_queue[i], size); - } + if (_exchanger->get_type() == ExchangeType::SHUFFLE) { + auto& p = _parent->cast(); + RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner)); } return Status::OK(); @@ -77,17 +44,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - { - SCOPED_TIMER(local_state._compute_hash_value_timer); - RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block, - local_state.mem_tracker())); - } - { - SCOPED_TIMER(local_state._distribute_timer); - RETURN_IF_ERROR(local_state.split_rows( - state, (const uint32_t*)local_state._partitioner->get_channel_ids(), in_block, - source_state)); - } + RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, source_state, local_state)); if (source_state == SourceState::FINISHED) { local_state._shared_state->sub_running_sink_operators(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index 45d61d4ff6..c5fe9dc7dc 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -30,6 +30,9 @@ public: ~LocalExchangeSinkDependency() override = default; }; +class Exchanger; +class ShuffleExchanger; +class PassthroughExchanger; class LocalExchangeSinkOperatorX; class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState { @@ -43,17 +46,21 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - Status split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, - vectorized::Block* block, SourceState source_state); - private: friend class LocalExchangeSinkOperatorX; + friend class ShuffleExchanger; + friend class PassthroughExchanger; + Exchanger* _exchanger = nullptr; + + // Used by shuffle exchanger RuntimeProfile::Counter* _compute_hash_value_timer = nullptr; RuntimeProfile::Counter* _distribute_timer = nullptr; - std::vector _num_rows_in_queue {}; std::unique_ptr _partitioner = nullptr; - std::vector _partition_rows_histogram {}; + std::vector _partition_rows_histogram; + + // Used by random passthrough exchanger + int _channel_id = 0; }; // A single 32-bit division on a recent x64 processor has a throughput of one instruction every six cycles with a latency of 26 cycles. @@ -82,21 +89,31 @@ public: return Status::InternalError("{} should not init with TPlanNode", Base::_name); } - Status init() override { + Status init(bool need_partitioner) override { _name = "LOCAL_EXCHANGE_SINK_OPERATOR"; - _partitioner.reset( - new vectorized::Crc32HashPartitioner(_num_partitions)); - RETURN_IF_ERROR(_partitioner->init(_texprs)); + _need_partitioner = need_partitioner; + if (_need_partitioner) { + _partitioner.reset( + new vectorized::Crc32HashPartitioner(_num_partitions)); + RETURN_IF_ERROR(_partitioner->init(_texprs)); + } + return Status::OK(); } Status prepare(RuntimeState* state) override { - RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); + if (_need_partitioner) { + RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); + } + return Status::OK(); } Status open(RuntimeState* state) override { - RETURN_IF_ERROR(_partitioner->open(state)); + if (_need_partitioner) { + RETURN_IF_ERROR(_partitioner->open(state)); + } + return Status::OK(); } @@ -105,6 +122,7 @@ public: private: friend class LocalExchangeSinkLocalState; + bool _need_partitioner; const int _num_partitions; const std::vector& _texprs; std::unique_ptr _partitioner; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 83dac5eb8f..dd64852891 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -17,8 +17,21 @@ #include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h" +#include "pipeline/pipeline_x/local_exchange/local_exchanger.h" + namespace doris::pipeline { +void LocalExchangeSourceDependency::block() { + if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->running_sink_operators == 0) { + return; + } + std::unique_lock lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock); + if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->running_sink_operators == 0) { + return; + } + Dependency::block(); +} + Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); @@ -28,9 +41,14 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& DCHECK(_shared_state != nullptr); _channel_id = info.task_idx; _shared_state->set_dep_by_channel_id(_dependency, _channel_id); + _exchanger = _shared_state->exchanger.get(); + DCHECK(_exchanger != nullptr); _get_block_failed_counter = ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1); - _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime"); + if (_exchanger->get_type() == ExchangeType::SHUFFLE) { + _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime"); + } + return Status::OK(); } @@ -38,42 +56,7 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized:: SourceState& source_state) { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - PartitionedBlock partitioned_block; - std::unique_ptr mutable_block = nullptr; - - auto get_data = [&](vectorized::Block* result_block) { - do { - const auto* offset_start = &(( - *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]); - mutable_block->add_rows(partitioned_block.first.get(), offset_start, - offset_start + std::get<2>(partitioned_block.second)); - } while (mutable_block->rows() < state->batch_size() && - local_state._shared_state->data_queue[local_state._channel_id].try_dequeue( - partitioned_block)); - *result_block = mutable_block->to_block(); - }; - if (local_state._shared_state->running_sink_operators == 0) { - if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue( - partitioned_block)) { - SCOPED_TIMER(local_state._copy_data_timer); - mutable_block = - vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty()); - get_data(block); - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - source_state = SourceState::FINISHED; - } - } else if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue( - partitioned_block)) { - SCOPED_TIMER(local_state._copy_data_timer); - mutable_block = - vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty()); - get_data(block); - } else { - local_state._dependency->block(); - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - } - + RETURN_IF_ERROR(local_state._exchanger->get_block(state, block, source_state, local_state)); local_state.reached_limit(block, source_state); return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index 3ccc38854f..d94b9041fc 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -29,18 +29,12 @@ public: : Dependency(id, node_id, "LocalExchangeSourceDependency", query_ctx) {} ~LocalExchangeSourceDependency() override = default; - void block() override { - if (((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) { - return; - } - std::unique_lock lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock); - if (((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) { - return; - } - Dependency::block(); - } + void block() override; }; +class Exchanger; +class ShuffleExchanger; +class PassthroughExchanger; class LocalExchangeSourceOperatorX; class LocalExchangeSourceLocalState final : public PipelineXLocalState { @@ -54,7 +48,10 @@ public: private: friend class LocalExchangeSourceOperatorX; + friend class ShuffleExchanger; + friend class PassthroughExchanger; + Exchanger* _exchanger = nullptr; int _channel_id; RuntimeProfile::Counter* _get_block_failed_counter = nullptr; RuntimeProfile::Counter* _copy_data_timer = nullptr; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp new file mode 100644 index 0000000000..616b469a99 --- /dev/null +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/pipeline_x/local_exchange/local_exchanger.h" + +#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" +#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h" + +namespace doris::pipeline { + +Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state, LocalExchangeSinkLocalState& local_state) { + { + SCOPED_TIMER(local_state._compute_hash_value_timer); + RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block, + local_state.mem_tracker())); + } + { + SCOPED_TIMER(local_state._distribute_timer); + RETURN_IF_ERROR(_split_rows(state, + (const uint32_t*)local_state._partitioner->get_channel_ids(), + in_block, source_state, local_state)); + } + + return Status::OK(); +} + +Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state, + LocalExchangeSourceLocalState& local_state) { + PartitionedBlock partitioned_block; + std::unique_ptr mutable_block = nullptr; + + auto get_data = [&](vectorized::Block* result_block) { + do { + const auto* offset_start = &(( + *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]); + mutable_block->add_rows(partitioned_block.first.get(), offset_start, + offset_start + std::get<2>(partitioned_block.second)); + } while (mutable_block->rows() < state->batch_size() && + _data_queue[local_state._channel_id].try_dequeue(partitioned_block)); + *result_block = mutable_block->to_block(); + }; + if (running_sink_operators == 0) { + if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { + SCOPED_TIMER(local_state._copy_data_timer); + mutable_block = + vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty()); + get_data(block); + } else { + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + source_state = SourceState::FINISHED; + } + } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { + SCOPED_TIMER(local_state._copy_data_timer); + mutable_block = + vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty()); + get_data(block); + } else { + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + local_state._dependency->block(); + } + return Status::OK(); +} + +Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, + vectorized::Block* block, SourceState source_state, + LocalExchangeSinkLocalState& local_state) { + auto& data_queue = _data_queue; + const auto rows = block->rows(); + auto row_idx = std::make_shared>(rows); + { + local_state._partition_rows_histogram.assign(_num_instances + 1, 0); + for (size_t i = 0; i < rows; ++i) { + local_state._partition_rows_histogram[channel_ids[i]]++; + } + for (int32_t i = 1; i <= _num_instances; ++i) { + local_state._partition_rows_histogram[i] += + local_state._partition_rows_histogram[i - 1]; + } + + for (int32_t i = rows - 1; i >= 0; --i) { + (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i; + local_state._partition_rows_histogram[channel_ids[i]]--; + } + } + auto new_block = vectorized::Block::create_shared(block->clone_empty()); + new_block->swap(*block); + for (size_t i = 0; i < _num_instances; i++) { + size_t start = local_state._partition_rows_histogram[i]; + size_t size = local_state._partition_rows_histogram[i + 1] - start; + if (size > 0) { + data_queue[i].enqueue({new_block, {row_idx, start, size}}); + local_state._shared_state->set_ready_for_read(i); + } + } + + return Status::OK(); +} + +Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state, + LocalExchangeSinkLocalState& local_state) { + auto new_block = vectorized::Block::create_unique(in_block->clone_empty()); + new_block->swap(*in_block); + auto channel_id = (local_state._channel_id++) % _num_instances; + _data_queue[channel_id].enqueue(std::move(new_block)); + local_state._shared_state->set_ready_for_read(channel_id); + + return Status::OK(); +} + +Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state, + LocalExchangeSourceLocalState& local_state) { + std::unique_ptr next_block; + if (running_sink_operators == 0) { + if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + *block = *next_block.release(); + } else { + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + source_state = SourceState::FINISHED; + } + } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + *block = *next_block.release(); + } else { + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + local_state._dependency->block(); + } + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h new file mode 100644 index 0000000000..13e3fe931e --- /dev/null +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "pipeline/pipeline_x/dependency.h" +#include "pipeline/pipeline_x/operator.h" + +namespace doris::pipeline { + +enum class ExchangeType : uint8_t { + SHUFFLE = 0, + PASSTHROUGH = 1, +}; + +class LocalExchangeSourceLocalState; +class LocalExchangeSinkLocalState; + +class Exchanger { +public: + Exchanger(int num_instances) + : running_sink_operators(num_instances), _num_instances(num_instances) {} + virtual ~Exchanger() = default; + virtual Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state, + LocalExchangeSourceLocalState& local_state) = 0; + virtual Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + LocalExchangeSinkLocalState& local_state) = 0; + virtual ExchangeType get_type() const = 0; + + std::atomic running_sink_operators = 0; + +protected: + const int _num_instances; +}; + +class LocalExchangeSourceLocalState; +class LocalExchangeSinkLocalState; + +class ShuffleExchanger final : public Exchanger { + using PartitionedBlock = + std::pair, + std::tuple>, size_t, size_t>>; + +public: + ENABLE_FACTORY_CREATOR(ShuffleExchanger); + ShuffleExchanger(int num_instances) : Exchanger(num_instances) { + _data_queue.resize(num_instances); + } + ~ShuffleExchanger() override = default; + Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + LocalExchangeSinkLocalState& local_state) override; + + Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, + LocalExchangeSourceLocalState& local_state) override; + ExchangeType get_type() const override { return ExchangeType::SHUFFLE; } + +private: + Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, + vectorized::Block* block, SourceState source_state, + LocalExchangeSinkLocalState& local_state); + + std::vector> _data_queue; +}; + +class PassthroughExchanger final : public Exchanger { +public: + ENABLE_FACTORY_CREATOR(PassthroughExchanger); + PassthroughExchanger(int num_instances) : Exchanger(num_instances) { + _data_queue.resize(num_instances); + } + ~PassthroughExchanger() override = default; + Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + LocalExchangeSinkLocalState& local_state) override; + + Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, + LocalExchangeSourceLocalState& local_state) override; + ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } + +private: + std::vector>> _data_queue; +}; + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 5fa6785435..294eb962ee 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -179,6 +179,8 @@ public: [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; } + [[nodiscard]] virtual bool need_to_local_shuffle() const { return true; } + bool can_read() override { LOG(FATAL) << "should not reach here!"; return false; @@ -423,7 +425,7 @@ public: virtual Status init(const TPlanNode& tnode, RuntimeState* state); Status init(const TDataSink& tsink) override; - virtual Status init() { + virtual Status init(bool need_partitioner) { return Status::InternalError("init() is only implemented in local exchange!"); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index d49e290c04..636d4e235d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -231,6 +231,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r RETURN_IF_ERROR(_sink->init(request.fragment.output_sink)); static_cast(root_pipeline->set_sink(_sink)); + // RETURN_IF_ERROR(_plan_local_shuffle()); + // 4. Initialize global states in pipelines. for (PipelinePtr& pipeline : _pipelines) { DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size(); @@ -247,6 +249,17 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r return Status::OK(); } +Status PipelineXFragmentContext::_plan_local_shuffle() { + for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) { + auto& children = _pipelines[pip_idx]->children(); + if (children.empty()) { + _pipelines[pip_idx]->init_need_to_local_shuffle_by_source(); + } else { + } + } + return Status::OK(); +} + Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink, const std::vector& output_exprs, const TPipelineFragmentParams& params, @@ -595,7 +608,8 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool, Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe, const TPlanNode& tnode, - const std::vector& texprs) { + const std::vector& texprs, + ExchangeType exchange_type) { if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) { return Status::OK(); } @@ -617,12 +631,23 @@ Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), local_exchange_id, _num_instances, texprs)); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); - RETURN_IF_ERROR(cur_pipe->sink_x()->init()); + bool need_partitioner = false; auto shared_state = LocalExchangeSharedState::create_shared(); - shared_state->data_queue.resize(_num_instances); shared_state->source_dependencies.resize(_num_instances, nullptr); - shared_state->running_sink_operators = _num_instances; + switch (exchange_type) { + case ExchangeType::SHUFFLE: + shared_state->exchanger = ShuffleExchanger::create_unique(_num_instances); + need_partitioner = true; + break; + case ExchangeType::PASSTHROUGH: + shared_state->exchanger = PassthroughExchanger::create_unique(_num_instances); + break; + default: + return Status::InternalError("Unsupported local exchange type : " + + std::to_string((int)exchange_type)); + } + RETURN_IF_ERROR(cur_pipe->sink_x()->init(need_partitioner)); _op_id_to_le_state.insert({local_exchange_id, shared_state}); return Status::OK(); } @@ -687,6 +712,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + + // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, + // tnode.agg_node.grouping_exprs, + // ExchangeType::PASSTHROUGH)); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation) { op.reset(new StreamingAggSourceOperatorX(pool, tnode, next_operator_id(), descs)); @@ -703,6 +732,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + + // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, + // tnode.agg_node.grouping_exprs, + // ExchangeType::PASSTHROUGH)); } else { op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -720,10 +753,19 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); - if (!tnode.agg_node.need_finalize) { - RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, - tnode.agg_node.grouping_exprs)); - } + // if (tnode.agg_node.grouping_exprs.empty()) { + // if (tnode.agg_node.need_finalize) { + // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, + // tnode.agg_node.grouping_exprs, + // ExchangeType::PASSTHROUGH)); + // } else { + // // TODO(gabriel): maybe use local shuffle + // } + // } else if (cur_pipe->need_to_local_shuffle()) { + // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, + // tnode.agg_node.grouping_exprs, + // ExchangeType::SHUFFLE)); + // } } break; } @@ -750,7 +792,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN for (const auto& eq_join_conjunct : eq_join_conjuncts) { probe_exprs.push_back(eq_join_conjunct.left); } - RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs)); + if (tnode.hash_join_node.__isset.is_broadcast_join && + tnode.hash_join_node.is_broadcast_join) { + RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs, + ExchangeType::PASSTHROUGH)); + } else if (cur_pipe->need_to_local_shuffle()) { + RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs, + ExchangeType::SHUFFLE)); + } _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); break; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 7f47052296..1390f2a954 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -35,6 +35,7 @@ #include "pipeline/pipeline.h" #include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_task.h" +#include "pipeline/pipeline_x/local_exchange/local_exchanger.h" #include "pipeline/pipeline_x/pipeline_x_task.h" #include "runtime/query_context.h" #include "runtime/runtime_state.h" @@ -125,7 +126,8 @@ private: void _close_fragment_instance() override; Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override; Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe, - const TPlanNode& tnode, const std::vector& texprs); + const TPlanNode& tnode, const std::vector& texprs, + ExchangeType exchange_type); [[nodiscard]] Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, @@ -151,6 +153,7 @@ private: const TPipelineFragmentParams& params, const RowDescriptor& row_desc, RuntimeState* state, DescriptorTbl& desc_tbl, PipelineId cur_pipeline_id); + Status _plan_local_shuffle(); bool _has_inverted_index_or_partial_update(TOlapTableSink sink);