[Improvement](local shuffle) Reduce locking scope in local exchanger … (#46294)

…(#46251)

Reduce lock scope from global level to data queue level.
This commit is contained in:
Gabriel
2025-01-03 09:14:22 +08:00
committed by GitHub
parent 79955e3d94
commit 34f9072ab6
2 changed files with 21 additions and 24 deletions

View File

@ -28,7 +28,7 @@ template <typename BlockType>
bool Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
LocalExchangeSinkLocalState& local_state,
BlockType&& block) {
std::unique_lock l(_m);
std::unique_lock l(*_m[channel_id]);
if (_data_queue[channel_id].enqueue(std::move(block))) {
local_state._shared_state->set_ready_to_read(channel_id);
return true;
@ -45,7 +45,7 @@ bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_st
} else if (all_finished) {
*eos = true;
} else {
std::unique_lock l(_m);
std::unique_lock l(*_m[local_state._channel_id]);
if (_data_queue[local_state._channel_id].try_dequeue(block)) {
return true;
}

View File

@ -119,9 +119,20 @@ template <typename BlockType>
class Exchanger : public ExchangerBase {
public:
Exchanger(int running_sink_operators, int num_partitions, int free_block_limit)
: ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {}
: ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {
_data_queue.resize(num_partitions);
_m.resize(num_partitions);
for (size_t i = 0; i < num_partitions; i++) {
_m[i] = std::make_unique<std::mutex>();
}
}
Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit)
: ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) {
_data_queue.resize(num_partitions);
_m.resize(num_partitions);
for (size_t i = 0; i < num_partitions; i++) {
_m[i] = std::make_unique<std::mutex>();
}
}
~Exchanger() override = default;
std::string data_queue_debug_string(int i) override {
@ -134,9 +145,7 @@ protected:
BlockType&& block);
bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos);
std::vector<BlockQueue<BlockType>> _data_queue;
private:
std::mutex _m;
std::vector<std::unique_ptr<std::mutex>> _m;
};
class LocalExchangeSourceLocalState;
@ -169,9 +178,7 @@ public:
ENABLE_FACTORY_CREATOR(ShuffleExchanger);
ShuffleExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
: Exchanger<PartitionedBlock>(running_sink_operators, num_partitions,
free_block_limit) {
_data_queue.resize(num_partitions);
}
free_block_limit) {}
~ShuffleExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) override;
@ -186,9 +193,7 @@ protected:
bool ignore_source_data_distribution, int free_block_limit)
: Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
free_block_limit),
_ignore_source_data_distribution(ignore_source_data_distribution) {
_data_queue.resize(num_partitions);
}
_ignore_source_data_distribution(ignore_source_data_distribution) {}
Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
vectorized::Block* block, bool eos,
LocalExchangeSinkLocalState& local_state);
@ -211,9 +216,7 @@ public:
ENABLE_FACTORY_CREATOR(PassthroughExchanger);
PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
: Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
free_block_limit) {
_data_queue.resize(num_partitions);
}
free_block_limit) {}
~PassthroughExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) override;
@ -229,9 +232,7 @@ public:
ENABLE_FACTORY_CREATOR(PassToOneExchanger);
PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
: Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
free_block_limit) {
_data_queue.resize(num_partitions);
}
free_block_limit) {}
~PassToOneExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) override;
@ -247,9 +248,7 @@ public:
ENABLE_FACTORY_CREATOR(BroadcastExchanger);
BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
: Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
free_block_limit) {
_data_queue.resize(num_partitions);
}
free_block_limit) {}
~BroadcastExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) override;
@ -268,9 +267,7 @@ public:
AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions,
int free_block_limit)
: Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
free_block_limit) {
_data_queue.resize(num_partitions);
}
free_block_limit) {}
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) override;