From 34f9072ab600ac3bb4940150cd1225644bcdedf1 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 3 Jan 2025 09:14:22 +0800 Subject: [PATCH] =?UTF-8?q?[Improvement](local=20shuffle)=20Reduce=20locki?= =?UTF-8?q?ng=20scope=20in=20local=20exchanger=20=E2=80=A6=20(#46294)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …(#46251) Reduce lock scope from global level to data queue level. --- .../local_exchange/local_exchanger.cpp | 4 +- .../local_exchange/local_exchanger.h | 41 +++++++++---------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 72e475f546..016b35b0f2 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -28,7 +28,7 @@ template bool Exchanger::_enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState& local_state, BlockType&& block) { - std::unique_lock l(_m); + std::unique_lock l(*_m[channel_id]); if (_data_queue[channel_id].enqueue(std::move(block))) { local_state._shared_state->set_ready_to_read(channel_id); return true; @@ -45,7 +45,7 @@ bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState& local_st } else if (all_finished) { *eos = true; } else { - std::unique_lock l(_m); + std::unique_lock l(*_m[local_state._channel_id]); if (_data_queue[local_state._channel_id].try_dequeue(block)) { return true; } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h index 6a6680c0bb..a9dcc8d866 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -119,9 +119,20 @@ template class Exchanger : public ExchangerBase { public: Exchanger(int running_sink_operators, int num_partitions, int free_block_limit) - : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {} + : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) { + _data_queue.resize(num_partitions); + _m.resize(num_partitions); + for (size_t i = 0; i < num_partitions; i++) { + _m[i] = std::make_unique(); + } + } Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) { + _data_queue.resize(num_partitions); + _m.resize(num_partitions); + for (size_t i = 0; i < num_partitions; i++) { + _m[i] = std::make_unique(); + } } ~Exchanger() override = default; std::string data_queue_debug_string(int i) override { @@ -134,9 +145,7 @@ protected: BlockType&& block); bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos); std::vector> _data_queue; - -private: - std::mutex _m; + std::vector> _m; }; class LocalExchangeSourceLocalState; @@ -169,9 +178,7 @@ public: ENABLE_FACTORY_CREATOR(ShuffleExchanger); ShuffleExchanger(int running_sink_operators, int num_partitions, int free_block_limit) : Exchanger(running_sink_operators, num_partitions, - free_block_limit) { - _data_queue.resize(num_partitions); - } + free_block_limit) {} ~ShuffleExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; @@ -186,9 +193,7 @@ protected: bool ignore_source_data_distribution, int free_block_limit) : Exchanger(running_sink_operators, num_sources, num_partitions, free_block_limit), - _ignore_source_data_distribution(ignore_source_data_distribution) { - _data_queue.resize(num_partitions); - } + _ignore_source_data_distribution(ignore_source_data_distribution) {} Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state); @@ -211,9 +216,7 @@ public: ENABLE_FACTORY_CREATOR(PassthroughExchanger); PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) : Exchanger(running_sink_operators, num_partitions, - free_block_limit) { - _data_queue.resize(num_partitions); - } + free_block_limit) {} ~PassthroughExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; @@ -229,9 +232,7 @@ public: ENABLE_FACTORY_CREATOR(PassToOneExchanger); PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit) : Exchanger(running_sink_operators, num_partitions, - free_block_limit) { - _data_queue.resize(num_partitions); - } + free_block_limit) {} ~PassToOneExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; @@ -247,9 +248,7 @@ public: ENABLE_FACTORY_CREATOR(BroadcastExchanger); BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit) : Exchanger(running_sink_operators, num_partitions, - free_block_limit) { - _data_queue.resize(num_partitions); - } + free_block_limit) {} ~BroadcastExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override; @@ -268,9 +267,7 @@ public: AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) : Exchanger(running_sink_operators, num_partitions, - free_block_limit) { - _data_queue.resize(num_partitions); - } + free_block_limit) {} Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) override;