From a5189156266b8cb1dfccdec29000eb834276ed9a Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 7 Jun 2024 15:40:55 +0800 Subject: [PATCH] [fix](pipeline) Do not push data in local exchange if eos (#35972) (#36010) pick #35972 and #34536 --- .../pipeline/exec/hashjoin_probe_operator.cpp | 9 ++ .../pipeline/exec/hashjoin_probe_operator.h | 1 + .../partitioned_hash_join_probe_operator.cpp | 20 +++ .../partitioned_hash_join_probe_operator.h | 2 + be/src/pipeline/pipeline_x/dependency.h | 4 +- .../local_exchange_sink_operator.h | 4 +- .../local_exchange_source_operator.cpp | 6 +- .../local_exchange_source_operator.h | 5 +- .../local_exchange/local_exchanger.cpp | 115 ++++++++------ .../local_exchange/local_exchanger.h | 147 ++++++++++++------ .../pipeline_x_fragment_context.cpp | 38 +++-- .../org/apache/doris/qe/SessionVariable.java | 5 + gensrc/thrift/PaloInternalService.thrift | 1 + 13 files changed, 249 insertions(+), 108 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index a58ad62211..00cf6a65eb 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -373,6 +373,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc return Status::OK(); } +std::string HashJoinProbeLocalState::debug_string(int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}", + JoinProbeLocalState::debug_string( + indentation_level), + _shared_state ? std::to_string(_shared_state->short_circuit_for_probe) : "NULL"); + return fmt::to_string(debug_string_buffer); +} + Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block, vectorized::ColumnUInt8::MutablePtr& null_map, vectorized::ColumnRawPtrs& raw_ptrs, diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index b4930307bc..5cdfe9feeb 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -94,6 +94,7 @@ public: const std::shared_ptr& build_block() const { return _shared_state->build_block; } + std::string debug_string(int indentation_level) const override; private: void _prepare_probe_block(); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 10fa2effcc..0576ae91dd 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -437,6 +437,16 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti return st; } +std::string PartitionedHashJoinProbeOperatorX::debug_string(RuntimeState* state, + int indentation_level) const { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "{}, in mem join probe: {}", + JoinProbeOperatorX::debug_string( + state, indentation_level), + _inner_probe_operator ? _inner_probe_operator->debug_string(state, 0) : "NULL"); + return fmt::to_string(debug_string_buffer); +} + Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState* state, uint32_t partition_index, bool& has_data) { @@ -872,6 +882,16 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); const auto need_to_spill = local_state._shared_state->need_to_spill; +#ifndef NDEBUG + Defer eos_check_defer([&] { + if (*eos) { + LOG(INFO) << "query: " << print_id(state->query_id()) + << ", hash probe node: " << node_id() << ", task: " << state->task_id() + << ", eos with child eos: " << local_state._child_eos + << ", need spill: " << need_to_spill; + } + }); +#endif if (need_more_input_data(state)) { if (need_to_spill && _should_revoke_memory(state)) { bool wait_for_io = false; diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 3b942d1575..d56a57ae42 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -155,6 +155,8 @@ public: Status pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) const override; + std::string debug_string(RuntimeState* state, int indentation_level = 0) const override; + bool need_more_input_data(RuntimeState* state) const override; DataDistribution required_data_distribution() const override { if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 72763d03c8..b60b3e9ae3 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -766,13 +766,13 @@ struct DataDistribution { std::vector partition_exprs; }; -class Exchanger; +class ExchangerBase; struct LocalExchangeSharedState : public BasicSharedState { public: ENABLE_FACTORY_CREATOR(LocalExchangeSharedState); LocalExchangeSharedState(int num_instances); - std::unique_ptr exchanger {}; + std::unique_ptr exchanger {}; std::vector mem_trackers; std::atomic mem_usage = 0; std::mutex le_lock; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index db6662a221..99b88747a9 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -21,7 +21,7 @@ namespace doris::pipeline { -class Exchanger; +class ExchangerBase; class ShuffleExchanger; class PassthroughExchanger; class BroadcastExchanger; @@ -50,7 +50,7 @@ private: friend class PassToOneExchanger; friend class AdaptivePassthroughExchanger; - Exchanger* _exchanger = nullptr; + ExchangerBase* _exchanger = nullptr; // Used by shuffle exchanger RuntimeProfile::Counter* _compute_hash_value_timer = nullptr; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index dbc4e37bba..086a3b551f 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -66,14 +66,16 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " - "_running_sink_operators: {}, _running_source_operators: {}", + "_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}", Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, - _exchanger->_running_sink_operators, _exchanger->_running_source_operators); + _exchanger->_running_sink_operators, _exchanger->_running_source_operators, + _shared_state->mem_usage.load()); size_t i = 0; fmt::format_to(debug_string_buffer, ", MemTrackers: "); for (auto* mem_tracker : _shared_state->mem_trackers) { fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_tracker->consumption()); + i++; } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index c7583d1351..7cefc1ca90 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -21,7 +21,7 @@ namespace doris::pipeline { -class Exchanger; +class ExchangerBase; class ShuffleExchanger; class PassthroughExchanger; class BroadcastExchanger; @@ -41,13 +41,14 @@ public: private: friend class LocalExchangeSourceOperatorX; + friend class ExchangerBase; friend class ShuffleExchanger; friend class PassthroughExchanger; friend class BroadcastExchanger; friend class PassToOneExchanger; friend class AdaptivePassthroughExchanger; - Exchanger* _exchanger = nullptr; + ExchangerBase* _exchanger = nullptr; int _channel_id; RuntimeProfile::Counter* _get_block_failed_counter = nullptr; RuntimeProfile::Counter* _copy_data_timer = nullptr; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index abcc0161fd..f02fd0e5f0 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -17,8 +17,10 @@ #include "pipeline/pipeline_x/local_exchange/local_exchanger.h" +#include "common/status.h" #include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" #include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h" +#include "vec/runtime/partitioner.h" namespace doris::pipeline { @@ -41,6 +43,7 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) { PartitionedBlock partitioned_block; + _data_queue[local_state._channel_id].set_eos(); while (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { auto block_wrapper = partitioned_block.first; local_state._shared_state->sub_mem_usage( @@ -52,38 +55,36 @@ void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) { Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { PartitionedBlock partitioned_block; - std::unique_ptr mutable_block = nullptr; + vectorized::MutableBlock mutable_block; auto get_data = [&](vectorized::Block* result_block) -> Status { do { - const auto* offset_start = &(( - *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]); + const auto* offset_start = partitioned_block.second.row_idxs->data() + + partitioned_block.second.offset_start; auto block_wrapper = partitioned_block.first; local_state._shared_state->sub_mem_usage( local_state._channel_id, block_wrapper->data_block.allocated_bytes(), false); - RETURN_IF_ERROR( - mutable_block->add_rows(&block_wrapper->data_block, offset_start, - offset_start + std::get<2>(partitioned_block.second))); + RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, offset_start, + offset_start + partitioned_block.second.length)); block_wrapper->unref(local_state._shared_state); - } while (mutable_block->rows() < state->batch_size() && + } while (mutable_block.rows() < state->batch_size() && _data_queue[local_state._channel_id].try_dequeue(partitioned_block)); - *result_block = mutable_block->to_block(); return Status::OK(); }; + if (_running_sink_operators == 0) { if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { SCOPED_TIMER(local_state._copy_data_timer); - mutable_block = vectorized::MutableBlock::create_unique( - partitioned_block.first->data_block.clone_empty()); + mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( + block, partitioned_block.first->data_block); RETURN_IF_ERROR(get_data(block)); } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); *eos = true; } } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { SCOPED_TIMER(local_state._copy_data_timer); - mutable_block = vectorized::MutableBlock::create_unique( - partitioned_block.first->data_block.clone_empty()); + mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( + block, partitioned_block.first->data_block); RETURN_IF_ERROR(get_data(block)); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); @@ -97,7 +98,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest LocalExchangeSinkLocalState& local_state) { auto& data_queue = _data_queue; const auto rows = block->rows(); - auto row_idx = std::make_shared>(rows); + auto row_idx = std::make_shared>(rows); { local_state._partition_rows_histogram.assign(_num_partitions + 1, 0); for (size_t i = 0; i < rows; ++i) { @@ -107,7 +108,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest local_state._partition_rows_histogram[i] += local_state._partition_rows_histogram[i - 1]; } - for (int32_t i = rows - 1; i >= 0; --i) { (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i; local_state._partition_rows_histogram[channel_ids[i]]--; @@ -134,13 +134,16 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest for (const auto& it : map) { DCHECK(it.second >= 0 && it.second < _num_partitions) << it.first << " : " << it.second << " " << _num_partitions; - size_t start = local_state._partition_rows_histogram[it.first]; - size_t size = local_state._partition_rows_histogram[it.first + 1] - start; + uint32_t start = local_state._partition_rows_histogram[it.first]; + uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start; if (size > 0) { local_state._shared_state->add_mem_usage( it.second, new_block_wrapper->data_block.allocated_bytes(), false); - data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}}); - local_state._shared_state->set_ready_to_read(it.second); + if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) { + local_state._shared_state->set_ready_to_read(it.second); + } else { + new_block_wrapper->unref(local_state._shared_state); + } } else { new_block_wrapper->unref(local_state._shared_state); } @@ -148,13 +151,17 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest } else if (_num_senders != _num_sources || _ignore_source_data_distribution) { new_block_wrapper->ref(_num_partitions); for (size_t i = 0; i < _num_partitions; i++) { - size_t start = local_state._partition_rows_histogram[i]; - size_t size = local_state._partition_rows_histogram[i + 1] - start; + uint32_t start = local_state._partition_rows_histogram[i]; + uint32_t size = local_state._partition_rows_histogram[i + 1] - start; if (size > 0) { local_state._shared_state->add_mem_usage( i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false); - data_queue[i % _num_sources].enqueue({new_block_wrapper, {row_idx, start, size}}); - local_state._shared_state->set_ready_to_read(i % _num_sources); + if (data_queue[i % _num_sources].enqueue( + {new_block_wrapper, {row_idx, start, size}})) { + local_state._shared_state->set_ready_to_read(i % _num_sources); + } else { + new_block_wrapper->unref(local_state._shared_state); + } } else { new_block_wrapper->unref(local_state._shared_state); } @@ -164,13 +171,16 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest auto map = local_state._parent->cast()._bucket_seq_to_instance_idx; for (size_t i = 0; i < _num_partitions; i++) { - size_t start = local_state._partition_rows_histogram[i]; - size_t size = local_state._partition_rows_histogram[i + 1] - start; + uint32_t start = local_state._partition_rows_histogram[i]; + uint32_t size = local_state._partition_rows_histogram[i + 1] - start; if (size > 0) { local_state._shared_state->add_mem_usage( map[i], new_block_wrapper->data_block.allocated_bytes(), false); - data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}}); - local_state._shared_state->set_ready_to_read(map[i]); + if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) { + local_state._shared_state->set_ready_to_read(map[i]); + } else { + new_block_wrapper->unref(local_state._shared_state); + } } else { new_block_wrapper->unref(local_state._shared_state); } @@ -189,14 +199,16 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); - _data_queue[channel_id].enqueue(std::move(new_block)); - local_state._shared_state->set_ready_to_read(channel_id); + if (_data_queue[channel_id].enqueue(std::move(new_block))) { + local_state._shared_state->set_ready_to_read(channel_id); + } return Status::OK(); } void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; + _data_queue[local_state._channel_id].set_eos(); while (_data_queue[local_state._channel_id].try_dequeue(next_block)) { local_state._shared_state->sub_mem_usage(local_state._channel_id, next_block.allocated_bytes()); @@ -209,16 +221,21 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b if (_running_sink_operators == 0) { if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); - _free_blocks.enqueue(std::move(next_block)); local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); + if (_free_block_limit == 0 || + _free_blocks.size_approx() < _free_block_limit * _num_sources) { + _free_blocks.enqueue(std::move(next_block)); + } } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); *eos = true; } } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); - _free_blocks.enqueue(std::move(next_block)); + if (_free_block_limit == 0 || + _free_blocks.size_approx() < _free_block_limit * _num_sources) { + _free_blocks.enqueue(std::move(next_block)); + } local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); @@ -231,8 +248,9 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block LocalExchangeSinkLocalState& local_state) { vectorized::Block new_block(in_block->clone_empty()); new_block.swap(*in_block); - _data_queue[0].enqueue(std::move(new_block)); - local_state._shared_state->set_ready_to_read(0); + if (_data_queue[0].enqueue(std::move(new_block))) { + local_state._shared_state->set_ready_to_read(0); + } return Status::OK(); } @@ -248,7 +266,6 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo if (_data_queue[0].try_dequeue(next_block)) { *block = std::move(next_block); } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); *eos = true; } } else if (_data_queue[0].try_dequeue(next_block)) { @@ -265,8 +282,9 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block for (size_t i = 0; i < _num_partitions; i++) { auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty()); RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows())); - _data_queue[i].enqueue(mutable_block->to_block()); - local_state._shared_state->set_ready_to_read(i); + if (_data_queue[i].enqueue(mutable_block->to_block())) { + local_state._shared_state->set_ready_to_read(i); + } } return Status::OK(); @@ -274,6 +292,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; + _data_queue[local_state._channel_id].set_eos(); while (_data_queue[local_state._channel_id].try_dequeue(next_block)) { // do nothing } @@ -286,7 +305,6 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { *block = std::move(next_block); } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); *eos = true; } } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { @@ -308,8 +326,9 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); - _data_queue[channel_id].enqueue(std::move(new_block)); - local_state._shared_state->set_ready_to_read(channel_id); + if (_data_queue[channel_id].enqueue(std::move(new_block))) { + local_state._shared_state->set_ready_to_read(channel_id); + } return Status::OK(); } @@ -365,9 +384,10 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, RETURN_IF_ERROR(mutable_block->add_rows(block, start, size)); auto new_block = mutable_block->to_block(); local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes()); - data_queue[i].enqueue(std::move(new_block)); + if (data_queue[i].enqueue(std::move(new_block))) { + local_state._shared_state->set_ready_to_read(i); + } } - local_state._shared_state->set_ready_to_read(i); } return Status::OK(); } @@ -391,16 +411,21 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized:: if (_running_sink_operators == 0) { if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); - _free_blocks.enqueue(std::move(next_block)); + if (_free_block_limit == 0 || + _free_blocks.size_approx() < _free_block_limit * _num_sources) { + _free_blocks.enqueue(std::move(next_block)); + } local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); *eos = true; } } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { block->swap(next_block); - _free_blocks.enqueue(std::move(next_block)); + if (_free_block_limit == 0 || + _free_blocks.size_approx() < _free_block_limit * _num_sources) { + _free_blocks.enqueue(std::move(next_block)); + } local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h index f3b210b11f..ee0b5e286d 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -17,6 +17,7 @@ #pragma once +#include "pipeline/pipeline_x/dependency.h" #include "pipeline/pipeline_x/operator.h" namespace doris::pipeline { @@ -24,28 +25,34 @@ namespace doris::pipeline { class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; struct ShuffleBlockWrapper; +class SortSourceOperatorX; -class Exchanger { +class ExchangerBase { public: - Exchanger(int running_sink_operators, int num_partitions) + ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit) : _running_sink_operators(running_sink_operators), _running_source_operators(num_partitions), _num_partitions(num_partitions), _num_senders(running_sink_operators), - _num_sources(num_partitions) {} - Exchanger(int running_sink_operators, int num_sources, int num_partitions) + _num_sources(num_partitions), + _free_block_limit(free_block_limit) {} + ExchangerBase(int running_sink_operators, int num_sources, int num_partitions, + int free_block_limit) : _running_sink_operators(running_sink_operators), _running_source_operators(num_partitions), _num_partitions(num_partitions), _num_senders(running_sink_operators), - _num_sources(num_sources) {} - virtual ~Exchanger() = default; + _num_sources(num_sources), + _free_block_limit(free_block_limit) {} + virtual ~ExchangerBase() = default; virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) = 0; virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) = 0; virtual ExchangeType get_type() const = 0; - virtual void close(LocalExchangeSourceLocalState& local_state) {} + virtual void close(LocalExchangeSourceLocalState& local_state) = 0; + + virtual DependencySPtr get_local_state_dependency(int _channel_id) { return nullptr; } protected: friend struct LocalExchangeSharedState; @@ -58,9 +65,60 @@ protected: const int _num_partitions; const int _num_senders; const int _num_sources; + const int _free_block_limit = 0; moodycamel::ConcurrentQueue _free_blocks; }; +struct PartitionedRowIdxs { + std::shared_ptr> row_idxs; + uint32_t offset_start; + uint32_t length; +}; + +using PartitionedBlock = std::pair, PartitionedRowIdxs>; + +template +struct BlockQueue { + std::atomic eos = false; + moodycamel::ConcurrentQueue data_queue; + BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue()) {} + BlockQueue(BlockQueue&& other) + : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {} + inline bool enqueue(BlockType const& item) { + if (!eos) { + data_queue.enqueue(item); + return true; + } + return false; + } + + inline bool enqueue(BlockType&& item) { + if (!eos) { + data_queue.enqueue(std::move(item)); + return true; + } + return false; + } + + bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); } + + void set_eos() { eos = true; } +}; + +template +class Exchanger : public ExchangerBase { +public: + Exchanger(int running_sink_operators, int num_partitions, int free_block_limit) + : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {} + Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit) + : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) { + } + ~Exchanger() override = default; + +protected: + std::vector> _data_queue; +}; + class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; @@ -71,23 +129,25 @@ struct ShuffleBlockWrapper { void unref(LocalExchangeSharedState* shared_state) { if (ref_count.fetch_sub(1) == 1) { shared_state->sub_total_mem_usage(data_block.allocated_bytes()); - data_block.clear_column_data(); - shared_state->exchanger->_free_blocks.enqueue(std::move(data_block)); + if (shared_state->exchanger->_free_block_limit == 0 || + shared_state->exchanger->_free_blocks.size_approx() < + shared_state->exchanger->_free_block_limit * + shared_state->exchanger->_num_sources) { + data_block.clear_column_data(); + shared_state->exchanger->_free_blocks.enqueue(std::move(data_block)); + } } } std::atomic ref_count = 0; vectorized::Block data_block; }; -class ShuffleExchanger : public Exchanger { - using PartitionedBlock = - std::pair, - std::tuple>, size_t, size_t>>; - +class ShuffleExchanger : public Exchanger { public: ENABLE_FACTORY_CREATOR(ShuffleExchanger); - ShuffleExchanger(int running_sink_operators, int num_partitions) - : Exchanger(running_sink_operators, num_partitions) { + ShuffleExchanger(int running_sink_operators, int num_partitions, int free_block_limit) + : Exchanger(running_sink_operators, num_partitions, + free_block_limit) { _data_queue.resize(num_partitions); } ~ShuffleExchanger() override = default; @@ -101,8 +161,9 @@ public: protected: ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, - bool ignore_source_data_distribution) - : Exchanger(running_sink_operators, num_sources, num_partitions), + bool ignore_source_data_distribution, int free_block_limit) + : Exchanger(running_sink_operators, num_sources, num_partitions, + free_block_limit), _ignore_source_data_distribution(ignore_source_data_distribution) { _data_queue.resize(num_partitions); } @@ -110,26 +171,25 @@ protected: vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state); - std::vector> _data_queue; - const bool _ignore_source_data_distribution = false; }; -class BucketShuffleExchanger : public ShuffleExchanger { +class BucketShuffleExchanger final : public ShuffleExchanger { ENABLE_FACTORY_CREATOR(BucketShuffleExchanger); BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions, - bool ignore_source_data_distribution) + bool ignore_source_data_distribution, int free_block_limit) : ShuffleExchanger(running_sink_operators, num_sources, num_partitions, - ignore_source_data_distribution) {} + ignore_source_data_distribution, free_block_limit) {} ~BucketShuffleExchanger() override = default; ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } }; -class PassthroughExchanger final : public Exchanger { +class PassthroughExchanger final : public Exchanger { public: ENABLE_FACTORY_CREATOR(PassthroughExchanger); - PassthroughExchanger(int running_sink_operators, int num_partitions) - : Exchanger(running_sink_operators, num_partitions) { + PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit) + : Exchanger(running_sink_operators, num_partitions, + free_block_limit) { _data_queue.resize(num_partitions); } ~PassthroughExchanger() override = default; @@ -140,16 +200,14 @@ public: LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } void close(LocalExchangeSourceLocalState& local_state) override; - -private: - std::vector> _data_queue; }; -class PassToOneExchanger final : public Exchanger { +class PassToOneExchanger final : public Exchanger { public: ENABLE_FACTORY_CREATOR(PassToOneExchanger); - PassToOneExchanger(int running_sink_operators, int num_partitions) - : Exchanger(running_sink_operators, num_partitions) { + PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit) + : Exchanger(running_sink_operators, num_partitions, + free_block_limit) { _data_queue.resize(num_partitions); } ~PassToOneExchanger() override = default; @@ -159,16 +217,15 @@ public: Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; } - -private: - std::vector> _data_queue; + void close(LocalExchangeSourceLocalState& local_state) override {} }; -class BroadcastExchanger final : public Exchanger { +class BroadcastExchanger final : public Exchanger { public: ENABLE_FACTORY_CREATOR(BroadcastExchanger); - BroadcastExchanger(int running_sink_operators, int num_partitions) - : Exchanger(running_sink_operators, num_partitions) { + BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit) + : Exchanger(running_sink_operators, num_partitions, + free_block_limit) { _data_queue.resize(num_partitions); } ~BroadcastExchanger() override = default; @@ -179,18 +236,17 @@ public: LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::BROADCAST; } void close(LocalExchangeSourceLocalState& local_state) override; - -private: - std::vector> _data_queue; }; //The code in AdaptivePassthroughExchanger is essentially // a copy of ShuffleExchanger and PassthroughExchanger. -class AdaptivePassthroughExchanger : public Exchanger { +class AdaptivePassthroughExchanger : public Exchanger { public: ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger); - AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions) - : Exchanger(running_sink_operators, num_partitions) { + AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions, + int free_block_limit) + : Exchanger(running_sink_operators, num_partitions, + free_block_limit) { _data_queue.resize(num_partitions); } Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, @@ -200,6 +256,8 @@ public: LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; } + void close(LocalExchangeSourceLocalState& local_state) override {} + private: Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state); @@ -208,7 +266,6 @@ private: Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state); - std::vector> _data_queue; std::atomic_bool _is_pass_through = false; std::atomic_int32_t _total_block = 0; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index ccc51faa77..bcb3c53b53 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -801,28 +801,46 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), - is_shuffled_hash_join ? _total_instances : _num_instances); + is_shuffled_hash_join ? _total_instances : _num_instances, + _runtime_state->query_options().__isset.local_exchange_free_blocks_limit + ? _runtime_state->query_options().local_exchange_free_blocks_limit + : 0); break; case ExchangeType::BUCKET_HASH_SHUFFLE: shared_state->exchanger = BucketShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets, - ignore_data_hash_distribution); + ignore_data_hash_distribution, + _runtime_state->query_options().__isset.local_exchange_free_blocks_limit + ? _runtime_state->query_options().local_exchange_free_blocks_limit + : 0); break; case ExchangeType::PASSTHROUGH: - shared_state->exchanger = - PassthroughExchanger::create_unique(cur_pipe->num_tasks(), _num_instances); + shared_state->exchanger = PassthroughExchanger::create_unique( + cur_pipe->num_tasks(), _num_instances, + _runtime_state->query_options().__isset.local_exchange_free_blocks_limit + ? _runtime_state->query_options().local_exchange_free_blocks_limit + : 0); break; case ExchangeType::BROADCAST: - shared_state->exchanger = - BroadcastExchanger::create_unique(cur_pipe->num_tasks(), _num_instances); + shared_state->exchanger = BroadcastExchanger::create_unique( + cur_pipe->num_tasks(), _num_instances, + _runtime_state->query_options().__isset.local_exchange_free_blocks_limit + ? _runtime_state->query_options().local_exchange_free_blocks_limit + : 0); break; case ExchangeType::PASS_TO_ONE: - shared_state->exchanger = - BroadcastExchanger::create_unique(cur_pipe->num_tasks(), _num_instances); + shared_state->exchanger = BroadcastExchanger::create_unique( + cur_pipe->num_tasks(), _num_instances, + _runtime_state->query_options().__isset.local_exchange_free_blocks_limit + ? _runtime_state->query_options().local_exchange_free_blocks_limit + : 0); break; case ExchangeType::ADAPTIVE_PASSTHROUGH: - shared_state->exchanger = - AdaptivePassthroughExchanger::create_unique(cur_pipe->num_tasks(), _num_instances); + shared_state->exchanger = AdaptivePassthroughExchanger::create_unique( + cur_pipe->num_tasks(), _num_instances, + _runtime_state->query_options().__isset.local_exchange_free_blocks_limit + ? _runtime_state->query_options().local_exchange_free_blocks_limit + : 0); break; default: return Status::InternalError("Unsupported local exchange type : " + diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 0347964352..01cbfa82c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -76,6 +76,7 @@ public class SessionVariable implements Serializable, Writable { public static final Logger LOG = LogManager.getLogger(SessionVariable.class); public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; + public static final String LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT = "local_exchange_free_blocks_limit"; public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit"; public static final String NUM_SCANNER_THREADS = "num_scanner_threads"; public static final String SCANNER_SCALE_UP_RATIO = "scanner_scale_up_ratio"; @@ -615,6 +616,9 @@ public class SessionVariable implements Serializable, Writable { }) public int numScannerThreads = 0; + @VariableMgr.VarAttr(name = LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT) + public int localExchangeFreeBlocksLimit = 4; + @VariableMgr.VarAttr(name = SCANNER_SCALE_UP_RATIO, needForward = true, description = { "ScanNode自适应的增加扫描并发,最大允许增长的并发倍率,默认为0,关闭该功能", "The max multiple of increasing the concurrency of scanners adaptively, " @@ -3157,6 +3161,7 @@ public class SessionVariable implements Serializable, Writable { public TQueryOptions toThrift() { TQueryOptions tResult = new TQueryOptions(); tResult.setMemLimit(maxExecMemByte); + tResult.setLocalExchangeFreeBlocksLimit(localExchangeFreeBlocksLimit); tResult.setScanQueueMemLimit(maxScanQueueMemByte); tResult.setNumScannerThreads(numScannerThreads); tResult.setScannerScaleUpRatio(scannerScaleUpRatio); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 54c6c57bdd..ad8836fefb 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -288,6 +288,7 @@ struct TQueryOptions { // max rows of each sub-queue in DataQueue. 106: optional i64 data_queue_max_blocks = 0; + 108: optional i64 local_exchange_free_blocks_limit; 110: optional bool enable_parquet_filter_by_min_max = true 111: optional bool enable_orc_filter_by_min_max = true