[pipelineX](local exchange) Add local exchange operator (#25846)

This commit is contained in:
Gabriel
2023-10-25 18:45:02 +08:00
committed by GitHub
parent a919ef618d
commit e8f479882d
19 changed files with 534 additions and 40 deletions

View File

@ -210,12 +210,14 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
}
if (p._part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner.reset(new vectorized::HashPartitioner(channels.size()));
_partitioner.reset(
new vectorized::XXHashPartitioner<vectorized::ShuffleChannelIds>(channels.size()));
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
} else if (p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = channel_shared_ptrs.size();
_partitioner.reset(new vectorized::BucketHashPartitioner(channel_shared_ptrs.size()));
_partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
channel_shared_ptrs.size()));
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
}
@ -388,12 +390,12 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
if (_part_type == TPartitionType::HASH_PARTITIONED) {
RETURN_IF_ERROR(channel_add_rows(state, local_state.channels,
local_state._partition_count,
(uint64_t*)local_state._partitioner->get_hash_values(),
(uint64_t*)local_state._partitioner->get_channel_ids(),
rows, block, source_state == SourceState::FINISHED));
} else {
RETURN_IF_ERROR(channel_add_rows(state, local_state.channel_shared_ptrs,
local_state._partition_count,
(uint32_t*)local_state._partitioner->get_hash_values(),
(uint32_t*)local_state._partitioner->get_channel_ids(),
rows, block, source_state == SourceState::FINISHED));
}
} else {

View File

@ -23,6 +23,7 @@
#include <memory>
#include <mutex>
#include "concurrentqueue.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/multi_cast_data_streamer.h"
#include "vec/common/hash_table/hash_map_context_creator.h"
@ -811,5 +812,48 @@ private:
bool is_set_probe {false};
};
struct LocalExchangeSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> data_queue;
int num_partitions = 0;
std::atomic<int> running_sink_operators = 0;
};
struct LocalExchangeDependency final : public WriteDependency {
public:
using SharedState = LocalExchangeSharedState;
LocalExchangeDependency(int id)
: WriteDependency(id, "LocalExchangeDependency"),
_local_exchange_shared_state(nullptr) {}
~LocalExchangeDependency() override = default;
void* shared_state() override { return _local_exchange_shared_state.get(); }
void set_shared_state(std::shared_ptr<LocalExchangeSharedState> state) {
DCHECK(_local_exchange_shared_state == nullptr);
_local_exchange_shared_state = state;
}
void set_channel_id(int channel_id) { _channel_id = channel_id; }
Dependency* read_blocked_by() override {
if (config::enable_fuzzy_mode && !_should_run() &&
_read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< id();
}
return _should_run() ? nullptr : this;
}
private:
bool _should_run() const {
DCHECK(_local_exchange_shared_state != nullptr);
return _local_exchange_shared_state->data_queue[_channel_id].size_approx() > 0 ||
_local_exchange_shared_state->running_sink_operators == 0;
}
std::shared_ptr<LocalExchangeSharedState> _local_exchange_shared_state;
int _channel_id;
};
} // namespace pipeline
} // namespace doris

View File

@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
namespace doris::pipeline {
Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
_mutable_block.resize(p._num_partitions);
_shared_state->running_sink_operators++;
return Status::OK();
}
Status LocalExchangeSinkLocalState::channel_add_rows(RuntimeState* state,
const uint32_t* __restrict channel_ids,
vectorized::Block* block,
SourceState source_state) {
auto& data_queue = _shared_state->data_queue;
std::vector<int> channel2rows[data_queue.size()];
auto rows = block->rows();
for (int i = 0; i < rows; i++) {
channel2rows[channel_ids[i]].emplace_back(i);
}
for (size_t i = 0; i < data_queue.size(); i++) {
if (_mutable_block[i] == nullptr) {
_mutable_block[i] = vectorized::MutableBlock::create_unique(block->clone_empty());
}
const int* begin = channel2rows[i].data();
_mutable_block[i]->add_rows(block, begin, begin + channel2rows[i].size());
if (_mutable_block[i]->rows() > state->batch_size() ||
source_state == SourceState::FINISHED) {
data_queue[i].enqueue(_mutable_block[i]->to_block());
_mutable_block[i].reset(nullptr);
}
}
return Status::OK();
}
Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
{
SCOPED_TIMER(local_state._compute_hash_value_timer);
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block,
local_state.mem_tracker()));
}
{
SCOPED_TIMER(local_state._distribute_timer);
RETURN_IF_ERROR(local_state.channel_add_rows(
state, (const uint32_t*)local_state._partitioner->get_channel_ids(), in_block,
source_state));
}
if (source_state == SourceState::FINISHED) {
local_state._shared_state->running_sink_operators--;
}
return Status::OK();
}
} // namespace doris::pipeline

View File

@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/pipeline_x/operator.h"
namespace doris::pipeline {
class LocalExchangeSinkOperatorX;
class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState<LocalExchangeDependency> {
public:
using Base = PipelineXSinkLocalState<LocalExchangeDependency>;
ENABLE_FACTORY_CREATOR(LocalExchangeSinkLocalState);
LocalExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {}
~LocalExchangeSinkLocalState() override = default;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status channel_add_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
vectorized::Block* block, SourceState source_state);
private:
friend class LocalExchangeSinkOperatorX;
RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
RuntimeProfile::Counter* _distribute_timer = nullptr;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
std::vector<std::unique_ptr<vectorized::MutableBlock>> _mutable_block;
};
// A single 32-bit division on a recent x64 processor has a throughput of one instruction every six cycles with a latency of 26 cycles.
// In contrast, a multiplication has a throughput of one instruction every cycle and a latency of 3 cycles.
// So we prefer to this algorithm instead of modulo.
// Reference: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
struct LocalExchangeChannelIds {
static constexpr auto SHIFT_BITS = 32;
uint32_t operator()(uint32_t l, uint32_t r) {
return ((uint64_t)l * (uint64_t)r) >> SHIFT_BITS;
}
};
class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeSinkLocalState> {
public:
using Base = DataSinkOperatorX<LocalExchangeSinkLocalState>;
LocalExchangeSinkOperatorX(int sink_id, int num_partitions, const std::vector<TExpr>& texprs)
: Base(sink_id, -1), _num_partitions(num_partitions), _texprs(texprs) {}
Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
}
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
}
Status init() override {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR";
_partitioner.reset(
new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
RETURN_IF_ERROR(_partitioner->init(_texprs));
return Status::OK();
}
Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc()));
return Status::OK();
}
Status open(RuntimeState* state) override {
RETURN_IF_ERROR(_partitioner->open(state));
return Status::OK();
}
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
private:
friend class LocalExchangeSinkLocalState;
const int _num_partitions;
const std::vector<TExpr>& _texprs;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
};
} // namespace doris::pipeline

View File

@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
namespace doris::pipeline {
Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
_dependency->set_shared_state(info.local_exchange_state);
_shared_state = (LocalExchangeSharedState*)_dependency->shared_state();
DCHECK(_shared_state != nullptr);
_channel_id = info.task_idx;
_dependency->set_channel_id(_channel_id);
_get_block_failed_counter =
ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1);
return Status::OK();
}
Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
if (!local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(*block)) {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
if (local_state._shared_state->running_sink_operators == 0) {
source_state = SourceState::FINISHED;
}
}
local_state.reached_limit(block, source_state);
return Status::OK();
}
} // namespace doris::pipeline

View File

@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/pipeline_x/operator.h"
namespace doris::pipeline {
class LocalExchangeSourceOperatorX;
class LocalExchangeSourceLocalState final : public PipelineXLocalState<LocalExchangeDependency> {
public:
using Base = PipelineXLocalState<LocalExchangeDependency>;
ENABLE_FACTORY_CREATOR(LocalExchangeSourceLocalState);
LocalExchangeSourceLocalState(RuntimeState* state, OperatorXBase* parent)
: Base(state, parent) {}
Status init(RuntimeState* state, LocalStateInfo& info) override;
private:
friend class LocalExchangeSourceOperatorX;
int _channel_id;
RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
};
class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceLocalState> {
public:
using Base = OperatorX<LocalExchangeSourceLocalState>;
LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, -1, id) {}
Status init(const TPlanNode& tnode, RuntimeState* state) override {
_op_name = "LOCAL_EXCHANGE_OPERATOR";
return Status::OK();
}
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }
const RowDescriptor& intermediate_row_desc() const override {
return _child_x->intermediate_row_desc();
}
RowDescriptor& row_descriptor() override { return _child_x->row_descriptor(); }
const RowDescriptor& row_desc() override { return _child_x->row_desc(); }
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
bool is_source() const override { return true; }
private:
friend class LocalExchangeSourceLocalState;
};
} // namespace doris::pipeline

View File

@ -64,6 +64,8 @@
#include "pipeline/exec/union_sink_operator.h"
#include "pipeline/exec/union_source_operator.h"
#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
@ -533,6 +535,7 @@ DECLARE_OPERATOR_X(ResultFileSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkLocalState)
DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
DECLARE_OPERATOR_X(LocalExchangeSinkLocalState)
DECLARE_OPERATOR_X(BlockingAggSinkLocalState)
DECLARE_OPERATOR_X(StreamingAggSinkLocalState)
DECLARE_OPERATOR_X(DistinctStreamingAggSinkLocalState)
@ -571,6 +574,7 @@ DECLARE_OPERATOR_X(SetSourceLocalState<false>)
DECLARE_OPERATOR_X(DataGenLocalState)
DECLARE_OPERATOR_X(SchemaScanLocalState)
DECLARE_OPERATOR_X(MetaScanLocalState)
DECLARE_OPERATOR_X(LocalExchangeSourceLocalState)
#undef DECLARE_OPERATOR_X
@ -592,6 +596,7 @@ template class PipelineXSinkLocalState<UnionDependency>;
template class PipelineXSinkLocalState<PartitionSortDependency>;
template class PipelineXSinkLocalState<MultiCastDependency>;
template class PipelineXSinkLocalState<SetDependency>;
template class PipelineXSinkLocalState<LocalExchangeDependency>;
template class PipelineXLocalState<HashJoinDependency>;
template class PipelineXLocalState<SortDependency>;
@ -603,6 +608,7 @@ template class PipelineXLocalState<UnionDependency>;
template class PipelineXLocalState<MultiCastDependency>;
template class PipelineXLocalState<PartitionSortDependency>;
template class PipelineXLocalState<SetDependency>;
template class PipelineXLocalState<LocalExchangeDependency>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;

View File

@ -36,6 +36,8 @@ struct LocalStateInfo {
RuntimeProfile* parent_profile;
const std::vector<TScanRangeParams> scan_ranges;
std::vector<DependencySPtr>& dependencys;
std::shared_ptr<LocalExchangeSharedState> local_exchange_state;
int task_idx;
};
// This struct is used only for initializing local sink state.
@ -236,7 +238,7 @@ public:
[[nodiscard]] OperatorXPtr get_child() { return _child_x; }
[[nodiscard]] vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
[[nodiscard]] RowDescriptor& row_descriptor() { return _row_descriptor; }
[[nodiscard]] virtual RowDescriptor& row_descriptor() { return _row_descriptor; }
[[nodiscard]] int id() const override { return node_id(); }
[[nodiscard]] int operator_id() const { return _operator_id; }
@ -244,7 +246,7 @@ public:
[[nodiscard]] int64_t limit() const { return _limit; }
[[nodiscard]] const RowDescriptor& row_desc() override {
[[nodiscard]] virtual const RowDescriptor& row_desc() override {
return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor;
}
@ -418,6 +420,9 @@ public:
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
Status init(const TDataSink& tsink) override;
virtual Status init() {
return Status::InternalError("init() is only implemented in local exchange!");
}
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }

View File

@ -84,6 +84,8 @@
#include "pipeline/exec/table_function_operator.h"
#include "pipeline/exec/union_sink_operator.h"
#include "pipeline/exec/union_source_operator.h"
#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
#include "pipeline/task_scheduler.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@ -411,9 +413,15 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->resize_op_id_to_local_state(max_operator_id());
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto task = std::make_unique<PipelineXTask>(_pipelines[pip_idx], _total_tasks++,
_runtime_states[i].get(), this,
_runtime_states[i]->runtime_profile());
auto task = std::make_unique<PipelineXTask>(
_pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this,
_runtime_states[i]->runtime_profile(),
_op_id_to_le_state.count(
_pipelines[pip_idx]->operator_xs().front()->operator_id()) > 0
? _op_id_to_le_state
[_pipelines[pip_idx]->operator_xs().front()->operator_id()]
: nullptr,
i);
pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
}
@ -463,6 +471,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_union_child_pipelines.clear();
_set_child_pipelines.clear();
_dag.clear();
_op_id_to_le_state.clear();
return Status::OK();
}
@ -538,6 +547,37 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
return Status::OK();
}
Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorXPtr& op,
PipelinePtr& cur_pipe,
const std::vector<TExpr>& texprs) {
if (!_runtime_state->enable_local_shuffle() ||
_runtime_state->query_parallel_instance_num() == 1) {
return Status::OK();
}
auto local_exchange_id = next_operator_id();
op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
cur_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(new LocalExchangeSinkOperatorX(
local_exchange_id, _runtime_state->query_parallel_instance_num(), texprs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init());
auto shared_state = LocalExchangeSharedState::create_shared();
shared_state->data_queue.resize(_runtime_state->query_parallel_instance_num());
shared_state->num_partitions = _runtime_state->query_parallel_instance_num();
_op_id_to_le_state.insert({local_exchange_id, shared_state});
return Status::OK();
}
Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
const doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs, OperatorXPtr& op,
@ -633,6 +673,12 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
if (!tnode.agg_node.need_finalize) {
RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
RETURN_IF_ERROR(
_add_local_exchange(pool, op, cur_pipe, tnode.agg_node.grouping_exprs));
}
}
break;
}

View File

@ -118,6 +118,8 @@ public:
private:
void _close_action() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe,
const std::vector<TExpr>& texprs);
[[nodiscard]] Status _build_pipelines(ObjectPool* pool,
const doris::TPipelineFragmentParams& request,
@ -178,6 +180,8 @@ private:
// but some operators do not have a corresponding plan node ID.
// We set these IDs as negative numbers, which are not visible to the user.
int _operator_id = 0;
std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> _op_id_to_le_state;
};
} // namespace pipeline

View File

@ -44,14 +44,18 @@ class RuntimeState;
namespace doris::pipeline {
PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
PipelineFragmentContext* fragment_context,
RuntimeProfile* parent_profile)
: PipelineTask(pipeline, index, state, fragment_context, parent_profile),
RuntimeProfile* parent_profile,
std::shared_ptr<LocalExchangeSharedState> local_exchange_state,
int task_idx)
: PipelineTask(pipeline, task_id, state, fragment_context, parent_profile),
_operators(pipeline->operator_xs()),
_source(_operators.front()),
_root(_operators.back()),
_sink(pipeline->sink_shared_pointer()) {
_sink(pipeline->sink_shared_pointer()),
_local_exchange_state(local_exchange_state),
_task_idx(task_idx) {
_pipeline_task_watcher.start();
_sink->get_dependency(_downstream_dependency);
}
@ -82,7 +86,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
op_idx == _operators.size() - 1
? _parent_profile
: state->get_local_state(_operators[op_idx + 1]->operator_id())->profile(),
scan_ranges, deps};
scan_ranges, deps, _local_exchange_state, _task_idx};
RETURN_IF_ERROR(_operators[op_idx]->setup_local_state(state, info));
}

View File

@ -50,8 +50,9 @@ class PriorityTaskQueue;
// The class do the pipeline task. Minest schdule union by task scheduler
class PipelineXTask : public PipelineTask {
public:
PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile);
PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
std::shared_ptr<LocalExchangeSharedState> local_exchange_state, int task_idx);
Status prepare(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
@ -132,6 +133,8 @@ public:
void release_dependency() override {
std::vector<DependencySPtr> {}.swap(_downstream_dependency);
DependencyMap {}.swap(_upstream_dependency);
_local_exchange_state = nullptr;
}
std::vector<DependencySPtr>& get_upstream_dependency(int id) {
@ -161,7 +164,8 @@ private:
DependencyMap _upstream_dependency;
std::vector<DependencySPtr> _downstream_dependency;
std::shared_ptr<LocalExchangeSharedState> _local_exchange_state;
int _task_idx;
bool _dry_run = false;
};

View File

@ -329,6 +329,9 @@ public:
return _query_options.__isset.enable_pipeline_engine &&
_query_options.enable_pipeline_engine;
}
bool enable_local_shuffle() const {
return _query_options.__isset.enable_local_shuffle && _query_options.enable_local_shuffle;
}
bool trim_tailing_spaces_for_external_table_query() const {
return _query_options.trim_tailing_spaces_for_external_table_query;

View File

@ -17,14 +17,16 @@
#include "partitioner.h"
#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
#include "runtime/thread_context.h"
#include "vec/columns/column_const.h"
#include "vec/sink/vdata_stream_sender.h"
namespace doris::vectorized {
template <typename HashValueType>
Status Partitioner<HashValueType>::do_partitioning(RuntimeState* state, Block* block,
MemTracker* mem_tracker) const {
template <typename HashValueType, typename ChannelIds>
Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* state, Block* block,
MemTracker* mem_tracker) const {
int rows = block->rows();
if (rows > 0) {
@ -45,7 +47,7 @@ Status Partitioner<HashValueType>::do_partitioning(RuntimeState* state, Block* b
}
for (int i = 0; i < rows; i++) {
hashes[i] = hashes[i] % _partition_count;
hashes[i] = ChannelIds()(hashes[i], _partition_count);
}
{
@ -56,15 +58,51 @@ Status Partitioner<HashValueType>::do_partitioning(RuntimeState* state, Block* b
return Status::OK();
}
void BucketHashPartitioner::_do_hash(const ColumnPtr& column, uint32_t* __restrict result,
int idx) const {
column->update_crcs_with_value(result, _partition_expr_ctxs[idx]->root()->type().type,
template <typename ChannelIds>
void Crc32HashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column,
uint32_t* __restrict result, int idx) const {
column->update_crcs_with_value(result, Base::_partition_expr_ctxs[idx]->root()->type().type,
column->size());
}
void HashPartitioner::_do_hash(const ColumnPtr& column, uint64_t* __restrict result,
int /*idx*/) const {
template <typename ChannelIds>
void XXHashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column, uint64_t* __restrict result,
int /*idx*/) const {
column->update_hashes_with_value(result);
}
template <typename ChannelIds>
Status XXHashPartitioner<ChannelIds>::clone(RuntimeState* state,
std::unique_ptr<PartitionerBase>& partitioner) {
auto* new_partitioner = new XXHashPartitioner(Base::_partition_count);
partitioner.reset(new_partitioner);
new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone(
state, new_partitioner->_partition_expr_ctxs[i]));
}
return Status::OK();
}
template <typename ChannelIds>
Status Crc32HashPartitioner<ChannelIds>::clone(RuntimeState* state,
std::unique_ptr<PartitionerBase>& partitioner) {
auto* new_partitioner = new Crc32HashPartitioner(Base::_partition_count);
partitioner.reset(new_partitioner);
new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone(
state, new_partitioner->_partition_expr_ctxs[i]));
}
return Status::OK();
}
template class Partitioner<size_t, pipeline::LocalExchangeChannelIds>;
template class XXHashPartitioner<pipeline::LocalExchangeChannelIds>;
template class Partitioner<size_t, ShuffleChannelIds>;
template class XXHashPartitioner<ShuffleChannelIds>;
template class Partitioner<uint32_t, ShuffleChannelIds>;
template class Crc32HashPartitioner<ShuffleChannelIds>;
template class Crc32HashPartitioner<pipeline::LocalExchangeChannelIds>;
} // namespace doris::vectorized

View File

@ -40,13 +40,15 @@ public:
virtual Status do_partitioning(RuntimeState* state, Block* block,
MemTracker* mem_tracker) const = 0;
virtual void* get_hash_values() const = 0;
virtual void* get_channel_ids() const = 0;
virtual Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) = 0;
protected:
const size_t _partition_count;
};
template <typename HashValueType>
template <typename HashValueType, typename ChannelIds>
class Partitioner : public PartitionerBase {
public:
Partitioner(int partition_count) : PartitionerBase(partition_count) {}
@ -65,7 +67,7 @@ public:
Status do_partitioning(RuntimeState* state, Block* block,
MemTracker* mem_tracker) const override;
void* get_hash_values() const override { return _hash_vals.data(); }
void* get_channel_ids() const override { return _hash_vals.data(); }
protected:
Status _get_partition_column_result(Block* block, std::vector<int>& result) const {
@ -83,19 +85,28 @@ protected:
mutable std::vector<HashValueType> _hash_vals;
};
class HashPartitioner final : public Partitioner<uint64_t> {
template <typename ChannelIds>
class XXHashPartitioner final : public Partitioner<uint64_t, ChannelIds> {
public:
HashPartitioner(int partition_count) : Partitioner<uint64_t>(partition_count) {}
~HashPartitioner() override = default;
using Base = Partitioner<uint64_t, ChannelIds>;
XXHashPartitioner(int partition_count) : Partitioner<uint64_t, ChannelIds>(partition_count) {}
~XXHashPartitioner() override = default;
Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;
private:
void _do_hash(const ColumnPtr& column, uint64_t* __restrict result, int idx) const override;
};
class BucketHashPartitioner final : public Partitioner<uint32_t> {
template <typename ChannelIds>
class Crc32HashPartitioner final : public Partitioner<uint32_t, ChannelIds> {
public:
BucketHashPartitioner(int partition_count) : Partitioner<uint32_t>(partition_count) {}
~BucketHashPartitioner() override = default;
using Base = Partitioner<uint32_t, ChannelIds>;
Crc32HashPartitioner(int partition_count)
: Partitioner<uint32_t, ChannelIds>(partition_count) {}
~Crc32HashPartitioner() override = default;
Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;
private:
void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override;

View File

@ -429,11 +429,12 @@ Status VDataStreamSender::init(const TDataSink& tsink) {
const TDataStreamSink& t_stream_sink = tsink.stream_sink;
if (_part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = _channels.size();
_partitioner.reset(new HashPartitioner(_channels.size()));
_partitioner.reset(new XXHashPartitioner<ShuffleChannelIds>(_channels.size()));
RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs));
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = _channel_shared_ptrs.size();
_partitioner.reset(new BucketHashPartitioner(_channel_shared_ptrs.size()));
_partitioner.reset(
new Crc32HashPartitioner<ShuffleChannelIds>(_channel_shared_ptrs.size()));
RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs));
} else if (_part_type == TPartitionType::RANGE_PARTITIONED) {
return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used");
@ -634,11 +635,11 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
RETURN_IF_ERROR(_partitioner->do_partitioning(state, block, _mem_tracker.get()));
if (_part_type == TPartitionType::HASH_PARTITIONED) {
RETURN_IF_ERROR(channel_add_rows(state, _channels, _partition_count,
(uint64_t*)_partitioner->get_hash_values(), rows,
(uint64_t*)_partitioner->get_channel_ids(), rows,
block, _enable_pipeline_exec ? eos : false));
} else {
RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs, _partition_count,
(uint32_t*)_partitioner->get_hash_values(), rows,
(uint32_t*)_partitioner->get_channel_ids(), rows,
block, _enable_pipeline_exec ? eos : false));
}
} else {

View File

@ -96,6 +96,13 @@ private:
const int _batch_size;
};
struct ShuffleChannelIds {
template <typename HashValueType>
HashValueType operator()(HashValueType l, size_t r) {
return l % r;
}
};
class VDataStreamSender : public DataSink {
public:
friend class pipeline::ExchangeSinkOperator;

View File

@ -213,6 +213,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine";
public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";
public static final String ENABLE_AGG_STATE = "enable_agg_state";
public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
@ -719,6 +721,8 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_PIPELINE_X_ENGINE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
private boolean enablePipelineXEngine = false;
@VariableMgr.VarAttr(name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
private boolean enableLocalShuffle = false;
@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
public boolean enableAggState = false;
@ -1922,6 +1926,10 @@ public class SessionVariable implements Serializable, Writable {
this.enablePipelineXEngine = enablePipelineXEngine;
}
public void setEnableLocalShuffle(boolean enableLocalShuffle) {
this.enableLocalShuffle = enableLocalShuffle;
}
public boolean enablePushDownNoGroupAgg() {
return enablePushDownNoGroupAgg;
}
@ -2327,6 +2335,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setBeExecVersion(Config.be_exec_version);
tResult.setEnablePipelineEngine(enablePipelineEngine);
tResult.setEnablePipelineXEngine(enablePipelineXEngine);
tResult.setEnableLocalShuffle(enableLocalShuffle);
tResult.setParallelInstance(getParallelExecInstanceNum());
tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);

View File

@ -250,7 +250,9 @@ struct TQueryOptions {
87: optional bool faster_float_convert = false;
88: optional bool enable_decimal256 = false
88: optional bool enable_decimal256 = false;
89: optional bool enable_local_shuffle = false;
}