diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index aa22f4cbd6..851c947796 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -210,12 +210,14 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf } if (p._part_type == TPartitionType::HASH_PARTITIONED) { _partition_count = channels.size(); - _partitioner.reset(new vectorized::HashPartitioner(channels.size())); + _partitioner.reset( + new vectorized::XXHashPartitioner(channels.size())); RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); } else if (p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { _partition_count = channel_shared_ptrs.size(); - _partitioner.reset(new vectorized::BucketHashPartitioner(channel_shared_ptrs.size())); + _partitioner.reset(new vectorized::Crc32HashPartitioner( + channel_shared_ptrs.size())); RETURN_IF_ERROR(_partitioner->init(p._texprs)); RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); } @@ -388,12 +390,12 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block if (_part_type == TPartitionType::HASH_PARTITIONED) { RETURN_IF_ERROR(channel_add_rows(state, local_state.channels, local_state._partition_count, - (uint64_t*)local_state._partitioner->get_hash_values(), + (uint64_t*)local_state._partitioner->get_channel_ids(), rows, block, source_state == SourceState::FINISHED)); } else { RETURN_IF_ERROR(channel_add_rows(state, local_state.channel_shared_ptrs, local_state._partition_count, - (uint32_t*)local_state._partitioner->get_hash_values(), + (uint32_t*)local_state._partitioner->get_channel_ids(), rows, block, source_state == SourceState::FINISHED)); } } else { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 545b495906..488c33c5a4 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -23,6 +23,7 @@ #include #include +#include "concurrentqueue.h" #include "pipeline/exec/data_queue.h" #include "pipeline/exec/multi_cast_data_streamer.h" #include "vec/common/hash_table/hash_map_context_creator.h" @@ -811,5 +812,48 @@ private: bool is_set_probe {false}; }; +struct LocalExchangeSharedState { +public: + ENABLE_FACTORY_CREATOR(LocalExchangeSharedState); + std::vector> data_queue; + int num_partitions = 0; + std::atomic running_sink_operators = 0; +}; + +struct LocalExchangeDependency final : public WriteDependency { +public: + using SharedState = LocalExchangeSharedState; + LocalExchangeDependency(int id) + : WriteDependency(id, "LocalExchangeDependency"), + _local_exchange_shared_state(nullptr) {} + ~LocalExchangeDependency() override = default; + void* shared_state() override { return _local_exchange_shared_state.get(); } + void set_shared_state(std::shared_ptr state) { + DCHECK(_local_exchange_shared_state == nullptr); + _local_exchange_shared_state = state; + } + + void set_channel_id(int channel_id) { _channel_id = channel_id; } + + Dependency* read_blocked_by() override { + if (config::enable_fuzzy_mode && !_should_run() && + _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) { + LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " " + << id(); + } + return _should_run() ? nullptr : this; + } + +private: + bool _should_run() const { + DCHECK(_local_exchange_shared_state != nullptr); + return _local_exchange_shared_state->data_queue[_channel_id].size_approx() > 0 || + _local_exchange_shared_state->running_sink_operators == 0; + } + + std::shared_ptr _local_exchange_shared_state; + int _channel_id; +}; + } // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp new file mode 100644 index 0000000000..ec959b20ce --- /dev/null +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" + +namespace doris::pipeline { + +Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(_open_timer); + _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime"); + _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime"); + auto& p = _parent->cast(); + RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner)); + _mutable_block.resize(p._num_partitions); + _shared_state->running_sink_operators++; + return Status::OK(); +} + +Status LocalExchangeSinkLocalState::channel_add_rows(RuntimeState* state, + const uint32_t* __restrict channel_ids, + vectorized::Block* block, + SourceState source_state) { + auto& data_queue = _shared_state->data_queue; + std::vector channel2rows[data_queue.size()]; + + auto rows = block->rows(); + for (int i = 0; i < rows; i++) { + channel2rows[channel_ids[i]].emplace_back(i); + } + for (size_t i = 0; i < data_queue.size(); i++) { + if (_mutable_block[i] == nullptr) { + _mutable_block[i] = vectorized::MutableBlock::create_unique(block->clone_empty()); + } + + const int* begin = channel2rows[i].data(); + _mutable_block[i]->add_rows(block, begin, begin + channel2rows[i].size()); + if (_mutable_block[i]->rows() > state->batch_size() || + source_state == SourceState::FINISHED) { + data_queue[i].enqueue(_mutable_block[i]->to_block()); + _mutable_block[i].reset(nullptr); + } + } + + return Status::OK(); +} + +Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + { + SCOPED_TIMER(local_state._compute_hash_value_timer); + RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block, + local_state.mem_tracker())); + } + { + SCOPED_TIMER(local_state._distribute_timer); + RETURN_IF_ERROR(local_state.channel_add_rows( + state, (const uint32_t*)local_state._partitioner->get_channel_ids(), in_block, + source_state)); + } + + if (source_state == SourceState::FINISHED) { + local_state._shared_state->running_sink_operators--; + } + + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h new file mode 100644 index 0000000000..4025def28e --- /dev/null +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "pipeline/pipeline_x/dependency.h" +#include "pipeline/pipeline_x/operator.h" + +namespace doris::pipeline { + +class LocalExchangeSinkOperatorX; +class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState { +public: + using Base = PipelineXSinkLocalState; + ENABLE_FACTORY_CREATOR(LocalExchangeSinkLocalState); + + LocalExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + ~LocalExchangeSinkLocalState() override = default; + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + + Status channel_add_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, + vectorized::Block* block, SourceState source_state); + +private: + friend class LocalExchangeSinkOperatorX; + + RuntimeProfile::Counter* _compute_hash_value_timer = nullptr; + RuntimeProfile::Counter* _distribute_timer = nullptr; + std::unique_ptr _partitioner; + std::vector> _mutable_block; +}; + +// A single 32-bit division on a recent x64 processor has a throughput of one instruction every six cycles with a latency of 26 cycles. +// In contrast, a multiplication has a throughput of one instruction every cycle and a latency of 3 cycles. +// So we prefer to this algorithm instead of modulo. +// Reference: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ +struct LocalExchangeChannelIds { + static constexpr auto SHIFT_BITS = 32; + uint32_t operator()(uint32_t l, uint32_t r) { + return ((uint64_t)l * (uint64_t)r) >> SHIFT_BITS; + } +}; + +class LocalExchangeSinkOperatorX final : public DataSinkOperatorX { +public: + using Base = DataSinkOperatorX; + LocalExchangeSinkOperatorX(int sink_id, int num_partitions, const std::vector& texprs) + : Base(sink_id, -1), _num_partitions(num_partitions), _texprs(texprs) {} + + Status init(const TPlanNode& tnode, RuntimeState* state) override { + return Status::InternalError("{} should not init with TPlanNode", Base::_name); + } + + Status init(const TDataSink& tsink) override { + return Status::InternalError("{} should not init with TPlanNode", Base::_name); + } + + Status init() override { + _name = "LOCAL_EXCHANGE_SINK_OPERATOR"; + _partitioner.reset( + new vectorized::Crc32HashPartitioner(_num_partitions)); + RETURN_IF_ERROR(_partitioner->init(_texprs)); + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); + return Status::OK(); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(_partitioner->open(state)); + return Status::OK(); + } + + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override; + +private: + friend class LocalExchangeSinkLocalState; + const int _num_partitions; + const std::vector& _texprs; + std::unique_ptr _partitioner; +}; + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp new file mode 100644 index 0000000000..83c712a802 --- /dev/null +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h" + +namespace doris::pipeline { + +Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(_open_timer); + _dependency->set_shared_state(info.local_exchange_state); + _shared_state = (LocalExchangeSharedState*)_dependency->shared_state(); + DCHECK(_shared_state != nullptr); + _channel_id = info.task_idx; + _dependency->set_channel_id(_channel_id); + _get_block_failed_counter = + ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1); + return Status::OK(); +} + +Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + auto& local_state = get_local_state(state); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + if (!local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(*block)) { + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + if (local_state._shared_state->running_sink_operators == 0) { + source_state = SourceState::FINISHED; + } + } + + local_state.reached_limit(block, source_state); + + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h new file mode 100644 index 0000000000..ddd47ec18c --- /dev/null +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "pipeline/pipeline_x/dependency.h" +#include "pipeline/pipeline_x/operator.h" + +namespace doris::pipeline { + +class LocalExchangeSourceOperatorX; +class LocalExchangeSourceLocalState final : public PipelineXLocalState { +public: + using Base = PipelineXLocalState; + ENABLE_FACTORY_CREATOR(LocalExchangeSourceLocalState); + LocalExchangeSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {} + + Status init(RuntimeState* state, LocalStateInfo& info) override; + +private: + friend class LocalExchangeSourceOperatorX; + + int _channel_id; + RuntimeProfile::Counter* _get_block_failed_counter = nullptr; +}; + +class LocalExchangeSourceOperatorX final : public OperatorX { +public: + using Base = OperatorX; + LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, -1, id) {} + Status init(const TPlanNode& tnode, RuntimeState* state) override { + _op_name = "LOCAL_EXCHANGE_OPERATOR"; + return Status::OK(); + } + Status prepare(RuntimeState* state) override { return Status::OK(); } + Status open(RuntimeState* state) override { return Status::OK(); } + const RowDescriptor& intermediate_row_desc() const override { + return _child_x->intermediate_row_desc(); + } + RowDescriptor& row_descriptor() override { return _child_x->row_descriptor(); } + const RowDescriptor& row_desc() override { return _child_x->row_desc(); } + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + bool is_source() const override { return true; } + +private: + friend class LocalExchangeSourceLocalState; +}; + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 2dc42a3b0a..7fa8a14993 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -64,6 +64,8 @@ #include "pipeline/exec/union_sink_operator.h" #include "pipeline/exec/union_source_operator.h" #include "pipeline/pipeline_x/dependency.h" +#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" +#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h" #include "util/debug_util.h" #include "util/runtime_profile.h" @@ -533,6 +535,7 @@ DECLARE_OPERATOR_X(ResultFileSinkLocalState) DECLARE_OPERATOR_X(OlapTableSinkLocalState) DECLARE_OPERATOR_X(AnalyticSinkLocalState) DECLARE_OPERATOR_X(SortSinkLocalState) +DECLARE_OPERATOR_X(LocalExchangeSinkLocalState) DECLARE_OPERATOR_X(BlockingAggSinkLocalState) DECLARE_OPERATOR_X(StreamingAggSinkLocalState) DECLARE_OPERATOR_X(DistinctStreamingAggSinkLocalState) @@ -571,6 +574,7 @@ DECLARE_OPERATOR_X(SetSourceLocalState) DECLARE_OPERATOR_X(DataGenLocalState) DECLARE_OPERATOR_X(SchemaScanLocalState) DECLARE_OPERATOR_X(MetaScanLocalState) +DECLARE_OPERATOR_X(LocalExchangeSourceLocalState) #undef DECLARE_OPERATOR_X @@ -592,6 +596,7 @@ template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; +template class PipelineXSinkLocalState; template class PipelineXLocalState; template class PipelineXLocalState; @@ -603,6 +608,7 @@ template class PipelineXLocalState; template class PipelineXLocalState; template class PipelineXLocalState; template class PipelineXLocalState; +template class PipelineXLocalState; template class AsyncWriterSink; template class AsyncWriterSink; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index b4f5adf573..2e51600d6f 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -36,6 +36,8 @@ struct LocalStateInfo { RuntimeProfile* parent_profile; const std::vector scan_ranges; std::vector& dependencys; + std::shared_ptr local_exchange_state; + int task_idx; }; // This struct is used only for initializing local sink state. @@ -236,7 +238,7 @@ public: [[nodiscard]] OperatorXPtr get_child() { return _child_x; } [[nodiscard]] vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; } - [[nodiscard]] RowDescriptor& row_descriptor() { return _row_descriptor; } + [[nodiscard]] virtual RowDescriptor& row_descriptor() { return _row_descriptor; } [[nodiscard]] int id() const override { return node_id(); } [[nodiscard]] int operator_id() const { return _operator_id; } @@ -244,7 +246,7 @@ public: [[nodiscard]] int64_t limit() const { return _limit; } - [[nodiscard]] const RowDescriptor& row_desc() override { + [[nodiscard]] virtual const RowDescriptor& row_desc() override { return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor; } @@ -418,6 +420,9 @@ public: virtual Status init(const TPlanNode& tnode, RuntimeState* state); Status init(const TDataSink& tsink) override; + virtual Status init() { + return Status::InternalError("init() is only implemented in local exchange!"); + } Status prepare(RuntimeState* state) override { return Status::OK(); } Status open(RuntimeState* state) override { return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 77f1672f74..333eb9f816 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -84,6 +84,8 @@ #include "pipeline/exec/table_function_operator.h" #include "pipeline/exec/union_sink_operator.h" #include "pipeline/exec/union_source_operator.h" +#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" +#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h" #include "pipeline/task_scheduler.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -411,9 +413,15 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _runtime_states[i]->resize_op_id_to_local_state(max_operator_id()); std::map pipeline_id_to_task; for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { - auto task = std::make_unique(_pipelines[pip_idx], _total_tasks++, - _runtime_states[i].get(), this, - _runtime_states[i]->runtime_profile()); + auto task = std::make_unique( + _pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this, + _runtime_states[i]->runtime_profile(), + _op_id_to_le_state.count( + _pipelines[pip_idx]->operator_xs().front()->operator_id()) > 0 + ? _op_id_to_le_state + [_pipelines[pip_idx]->operator_xs().front()->operator_id()] + : nullptr, + i); pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()}); _tasks[i].emplace_back(std::move(task)); } @@ -463,6 +471,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _union_child_pipelines.clear(); _set_child_pipelines.clear(); _dag.clear(); + _op_id_to_le_state.clear(); return Status::OK(); } @@ -538,6 +547,37 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool, return Status::OK(); } +Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorXPtr& op, + PipelinePtr& cur_pipe, + const std::vector& texprs) { + if (!_runtime_state->enable_local_shuffle() || + _runtime_state->query_parallel_instance_num() == 1) { + return Status::OK(); + } + auto local_exchange_id = next_operator_id(); + op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + + const auto downstream_pipeline_id = cur_pipe->id(); + if (_dag.find(downstream_pipeline_id) == _dag.end()) { + _dag.insert({downstream_pipeline_id, {}}); + } + cur_pipe = add_pipeline(); + _dag[downstream_pipeline_id].push_back(cur_pipe->id()); + + DataSinkOperatorXPtr sink; + sink.reset(new LocalExchangeSinkOperatorX( + local_exchange_id, _runtime_state->query_parallel_instance_num(), texprs)); + RETURN_IF_ERROR(cur_pipe->set_sink(sink)); + RETURN_IF_ERROR(cur_pipe->sink_x()->init()); + + auto shared_state = LocalExchangeSharedState::create_shared(); + shared_state->data_queue.resize(_runtime_state->query_parallel_instance_num()); + shared_state->num_partitions = _runtime_state->query_parallel_instance_num(); + _op_id_to_le_state.insert({local_exchange_id, shared_state}); + return Status::OK(); +} + Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr& op, @@ -633,6 +673,12 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + + if (!tnode.agg_node.need_finalize) { + RETURN_IF_ERROR(op->init(tnode, _runtime_state.get())); + RETURN_IF_ERROR( + _add_local_exchange(pool, op, cur_pipe, tnode.agg_node.grouping_exprs)); + } } break; } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 2f3db31687..8c7777f95b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -118,6 +118,8 @@ public: private: void _close_action() override; Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override; + Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe, + const std::vector& texprs); [[nodiscard]] Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, @@ -178,6 +180,8 @@ private: // but some operators do not have a corresponding plan node ID. // We set these IDs as negative numbers, which are not visible to the user. int _operator_id = 0; + + std::map> _op_id_to_le_state; }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 9f5d08c2b5..30be4f6240 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -44,14 +44,18 @@ class RuntimeState; namespace doris::pipeline { -PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, +PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state, PipelineFragmentContext* fragment_context, - RuntimeProfile* parent_profile) - : PipelineTask(pipeline, index, state, fragment_context, parent_profile), + RuntimeProfile* parent_profile, + std::shared_ptr local_exchange_state, + int task_idx) + : PipelineTask(pipeline, task_id, state, fragment_context, parent_profile), _operators(pipeline->operator_xs()), _source(_operators.front()), _root(_operators.back()), - _sink(pipeline->sink_shared_pointer()) { + _sink(pipeline->sink_shared_pointer()), + _local_exchange_state(local_exchange_state), + _task_idx(task_idx) { _pipeline_task_watcher.start(); _sink->get_dependency(_downstream_dependency); } @@ -82,7 +86,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams op_idx == _operators.size() - 1 ? _parent_profile : state->get_local_state(_operators[op_idx + 1]->operator_id())->profile(), - scan_ranges, deps}; + scan_ranges, deps, _local_exchange_state, _task_idx}; RETURN_IF_ERROR(_operators[op_idx]->setup_local_state(state, info)); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 864d2e93b0..0120cfa2e5 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -50,8 +50,9 @@ class PriorityTaskQueue; // The class do the pipeline task. Minest schdule union by task scheduler class PipelineXTask : public PipelineTask { public: - PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, - PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile); + PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state, + PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile, + std::shared_ptr local_exchange_state, int task_idx); Status prepare(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); @@ -132,6 +133,8 @@ public: void release_dependency() override { std::vector {}.swap(_downstream_dependency); DependencyMap {}.swap(_upstream_dependency); + + _local_exchange_state = nullptr; } std::vector& get_upstream_dependency(int id) { @@ -161,7 +164,8 @@ private: DependencyMap _upstream_dependency; std::vector _downstream_dependency; - + std::shared_ptr _local_exchange_state; + int _task_idx; bool _dry_run = false; }; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index d630c778a5..65256b98dd 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -329,6 +329,9 @@ public: return _query_options.__isset.enable_pipeline_engine && _query_options.enable_pipeline_engine; } + bool enable_local_shuffle() const { + return _query_options.__isset.enable_local_shuffle && _query_options.enable_local_shuffle; + } bool trim_tailing_spaces_for_external_table_query() const { return _query_options.trim_tailing_spaces_for_external_table_query; diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp index bb95dcbb6b..db40610723 100644 --- a/be/src/vec/runtime/partitioner.cpp +++ b/be/src/vec/runtime/partitioner.cpp @@ -17,14 +17,16 @@ #include "partitioner.h" +#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" #include "runtime/thread_context.h" #include "vec/columns/column_const.h" +#include "vec/sink/vdata_stream_sender.h" namespace doris::vectorized { -template -Status Partitioner::do_partitioning(RuntimeState* state, Block* block, - MemTracker* mem_tracker) const { +template +Status Partitioner::do_partitioning(RuntimeState* state, Block* block, + MemTracker* mem_tracker) const { int rows = block->rows(); if (rows > 0) { @@ -45,7 +47,7 @@ Status Partitioner::do_partitioning(RuntimeState* state, Block* b } for (int i = 0; i < rows; i++) { - hashes[i] = hashes[i] % _partition_count; + hashes[i] = ChannelIds()(hashes[i], _partition_count); } { @@ -56,15 +58,51 @@ Status Partitioner::do_partitioning(RuntimeState* state, Block* b return Status::OK(); } -void BucketHashPartitioner::_do_hash(const ColumnPtr& column, uint32_t* __restrict result, - int idx) const { - column->update_crcs_with_value(result, _partition_expr_ctxs[idx]->root()->type().type, +template +void Crc32HashPartitioner::_do_hash(const ColumnPtr& column, + uint32_t* __restrict result, int idx) const { + column->update_crcs_with_value(result, Base::_partition_expr_ctxs[idx]->root()->type().type, column->size()); } -void HashPartitioner::_do_hash(const ColumnPtr& column, uint64_t* __restrict result, - int /*idx*/) const { +template +void XXHashPartitioner::_do_hash(const ColumnPtr& column, uint64_t* __restrict result, + int /*idx*/) const { column->update_hashes_with_value(result); } +template +Status XXHashPartitioner::clone(RuntimeState* state, + std::unique_ptr& partitioner) { + auto* new_partitioner = new XXHashPartitioner(Base::_partition_count); + partitioner.reset(new_partitioner); + new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size()); + for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone( + state, new_partitioner->_partition_expr_ctxs[i])); + } + return Status::OK(); +} + +template +Status Crc32HashPartitioner::clone(RuntimeState* state, + std::unique_ptr& partitioner) { + auto* new_partitioner = new Crc32HashPartitioner(Base::_partition_count); + partitioner.reset(new_partitioner); + new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size()); + for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone( + state, new_partitioner->_partition_expr_ctxs[i])); + } + return Status::OK(); +} + +template class Partitioner; +template class XXHashPartitioner; +template class Partitioner; +template class XXHashPartitioner; +template class Partitioner; +template class Crc32HashPartitioner; +template class Crc32HashPartitioner; + } // namespace doris::vectorized diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index c0ee400012..66ed8809d7 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -40,13 +40,15 @@ public: virtual Status do_partitioning(RuntimeState* state, Block* block, MemTracker* mem_tracker) const = 0; - virtual void* get_hash_values() const = 0; + virtual void* get_channel_ids() const = 0; + + virtual Status clone(RuntimeState* state, std::unique_ptr& partitioner) = 0; protected: const size_t _partition_count; }; -template +template class Partitioner : public PartitionerBase { public: Partitioner(int partition_count) : PartitionerBase(partition_count) {} @@ -65,7 +67,7 @@ public: Status do_partitioning(RuntimeState* state, Block* block, MemTracker* mem_tracker) const override; - void* get_hash_values() const override { return _hash_vals.data(); } + void* get_channel_ids() const override { return _hash_vals.data(); } protected: Status _get_partition_column_result(Block* block, std::vector& result) const { @@ -83,19 +85,28 @@ protected: mutable std::vector _hash_vals; }; -class HashPartitioner final : public Partitioner { +template +class XXHashPartitioner final : public Partitioner { public: - HashPartitioner(int partition_count) : Partitioner(partition_count) {} - ~HashPartitioner() override = default; + using Base = Partitioner; + XXHashPartitioner(int partition_count) : Partitioner(partition_count) {} + ~XXHashPartitioner() override = default; + + Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; private: void _do_hash(const ColumnPtr& column, uint64_t* __restrict result, int idx) const override; }; -class BucketHashPartitioner final : public Partitioner { +template +class Crc32HashPartitioner final : public Partitioner { public: - BucketHashPartitioner(int partition_count) : Partitioner(partition_count) {} - ~BucketHashPartitioner() override = default; + using Base = Partitioner; + Crc32HashPartitioner(int partition_count) + : Partitioner(partition_count) {} + ~Crc32HashPartitioner() override = default; + + Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; private: void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index bff65bd897..ca565f82d4 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -429,11 +429,12 @@ Status VDataStreamSender::init(const TDataSink& tsink) { const TDataStreamSink& t_stream_sink = tsink.stream_sink; if (_part_type == TPartitionType::HASH_PARTITIONED) { _partition_count = _channels.size(); - _partitioner.reset(new HashPartitioner(_channels.size())); + _partitioner.reset(new XXHashPartitioner(_channels.size())); RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { _partition_count = _channel_shared_ptrs.size(); - _partitioner.reset(new BucketHashPartitioner(_channel_shared_ptrs.size())); + _partitioner.reset( + new Crc32HashPartitioner(_channel_shared_ptrs.size())); RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::RANGE_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); @@ -634,11 +635,11 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { RETURN_IF_ERROR(_partitioner->do_partitioning(state, block, _mem_tracker.get())); if (_part_type == TPartitionType::HASH_PARTITIONED) { RETURN_IF_ERROR(channel_add_rows(state, _channels, _partition_count, - (uint64_t*)_partitioner->get_hash_values(), rows, + (uint64_t*)_partitioner->get_channel_ids(), rows, block, _enable_pipeline_exec ? eos : false)); } else { RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs, _partition_count, - (uint32_t*)_partitioner->get_hash_values(), rows, + (uint32_t*)_partitioner->get_channel_ids(), rows, block, _enable_pipeline_exec ? eos : false)); } } else { diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 35f20d1b82..3a58514c8d 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -96,6 +96,13 @@ private: const int _batch_size; }; +struct ShuffleChannelIds { + template + HashValueType operator()(HashValueType l, size_t r) { + return l % r; + } +}; + class VDataStreamSender : public DataSink { public: friend class pipeline::ExchangeSinkOperator; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index d467b3131d..e0872b2046 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -213,6 +213,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine"; + public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle"; + public static final String ENABLE_AGG_STATE = "enable_agg_state"; public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline"; @@ -719,6 +721,8 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PIPELINE_X_ENGINE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL) private boolean enablePipelineXEngine = false; + @VariableMgr.VarAttr(name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL) + private boolean enableLocalShuffle = false; @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL) public boolean enableAggState = false; @@ -1922,6 +1926,10 @@ public class SessionVariable implements Serializable, Writable { this.enablePipelineXEngine = enablePipelineXEngine; } + public void setEnableLocalShuffle(boolean enableLocalShuffle) { + this.enableLocalShuffle = enableLocalShuffle; + } + public boolean enablePushDownNoGroupAgg() { return enablePushDownNoGroupAgg; } @@ -2327,6 +2335,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setBeExecVersion(Config.be_exec_version); tResult.setEnablePipelineEngine(enablePipelineEngine); tResult.setEnablePipelineXEngine(enablePipelineXEngine); + tResult.setEnableLocalShuffle(enableLocalShuffle); tResult.setParallelInstance(getParallelExecInstanceNum()); tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary); tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 03d026daba..40815983bb 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -250,7 +250,9 @@ struct TQueryOptions { 87: optional bool faster_float_convert = false; - 88: optional bool enable_decimal256 = false + 88: optional bool enable_decimal256 = false; + + 89: optional bool enable_local_shuffle = false; }