[pipelineX](local shuffle) Use local shuffle to optimize BHJ (#27823)

Gabriel
2023-11-30 21:08:45 +08:00
committed by GitHub
parent 16fb7a507c
commit c0aac043b6
13 changed files with 412 additions and 131 deletions

be/src/pipeline/exec/exchange_source_operator.h

@@ -104,6 +104,11 @@ public:
return _sub_plan_query_statistics_recvr;
}
bool need_to_local_shuffle() const override {
// TODO(gabriel):
return false;
}
private:
friend class ExchangeLocalState;
const int _num_senders;

be/src/pipeline/pipeline.h

@@ -126,6 +126,17 @@ public:
return _collect_query_statistics_with_every_batch;
}
bool need_to_local_shuffle() const { return _need_to_local_shuffle; }
void set_need_to_local_shuffle(bool need_to_local_shuffle) {
_need_to_local_shuffle = need_to_local_shuffle;
}
void init_need_to_local_shuffle_by_source() {
set_need_to_local_shuffle(operatorXs.front()->need_to_local_shuffle());
}
std::vector<std::shared_ptr<Pipeline>>& children() { return _children; }
void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
private:
void _init_profile();
@@ -136,6 +147,8 @@ private:
std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
std::vector<std::pair<int, std::shared_ptr<Pipeline>>> _dependencies;
std::vector<std::shared_ptr<Pipeline>> _children;
PipelineId _pipeline_id;
std::weak_ptr<PipelineFragmentContext> _context;
int _previous_schedule_id = -1;
@@ -178,6 +191,13 @@ private:
bool _always_can_write = false;
bool _is_root_pipeline = false;
bool _collect_query_statistics_with_every_batch = false;
// If the source operator meets one of the conditions below:
// 1. it is a scan operator reading hash-bucketed tablets
// 2. it is an exchange operator with a Hash/BucketHash partition type
// then `_need_to_local_shuffle` is set to false, which means this fragment should
// not do a local shuffle because the data is already partitioned by storage/shuffling.
bool _need_to_local_shuffle = true;
};
} // namespace doris::pipeline
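To make the rule above concrete: the exchange source shown earlier hard-codes `need_to_local_shuffle()` to return false for now (see the TODO), but the intended per-source checks would look roughly like the sketch below. The members `_is_hash_bucketed` and `_part_type` are illustrative assumptions, not code from this commit.

// Sketch only: how source operators could report that their data is already
// partitioned, so the fragment can skip a local shuffle.
bool ScanOperatorX::need_to_local_shuffle() const {
    // Tablets bucketed by hash are already partitioned by storage.
    return !_is_hash_bucketed;
}
bool ExchangeSourceOperatorX::need_to_local_shuffle() const {
    // Rows delivered by a hash-partitioned exchange are already partitioned
    // by shuffling (TPartitionType values from Doris' Partitions.thrift).
    return _part_type != TPartitionType::HASH_PARTITIONED &&
           _part_type != TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED;
}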

be/src/pipeline/pipeline_x/dependency.cpp

@@ -22,7 +22,7 @@
#include "common/logging.h"
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_task.h"
#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
@@ -180,4 +180,11 @@ void RuntimeFilterDependency::sub_filters() {
}
}
void LocalExchangeSharedState::sub_running_sink_operators() {
std::unique_lock<std::mutex> lc(le_lock);
if (exchanger->running_sink_operators.fetch_sub(1) == 1) {
_set_ready_for_read();
}
}
} // namespace doris::pipeline

be/src/pipeline/pipeline_x/dependency.h

@@ -31,6 +31,7 @@
#include "gutil/integral_types.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/multi_cast_data_streamer.h"
#include "pipeline/exec/operator.h"
#include "vec/common/hash_table/hash_map_context_creator.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/sorter.h"
@@ -579,21 +580,15 @@ public:
}
};
using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>;
class Exchanger;
struct LocalExchangeSharedState : public BasicSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
std::unique_ptr<Exchanger> exchanger {};
std::vector<Dependency*> source_dependencies;
std::atomic<int> running_sink_operators = 0;
std::mutex le_lock;
void sub_running_sink_operators() {
std::unique_lock<std::mutex> lc(le_lock);
if (running_sink_operators.fetch_sub(1) == 1) {
_set_ready_for_read();
}
}
void sub_running_sink_operators();
void _set_ready_for_read() {
for (auto* dep : source_dependencies) {
DCHECK(dep);

be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp

@@ -17,6 +17,8 @@
#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
namespace doris::pipeline {
Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
@@ -26,47 +28,12 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
_num_rows_in_queue.resize(p._num_partitions);
for (size_t i = 0; i < p._num_partitions; i++) {
_num_rows_in_queue[i] = ADD_COUNTER_WITH_LEVEL(
profile(), "NumRowsInQueue" + std::to_string(i), TUnit::UNIT, 1);
}
RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
return Status::OK();
}
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);
Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state,
const uint32_t* __restrict channel_ids,
vectorized::Block* block, SourceState source_state) {
auto& data_queue = _shared_state->data_queue;
const auto num_partitions = data_queue.size();
const auto rows = block->rows();
auto row_idx = std::make_shared<std::vector<int>>(rows);
{
_partition_rows_histogram.assign(num_partitions + 1, 0);
for (size_t i = 0; i < rows; ++i) {
_partition_rows_histogram[channel_ids[i]]++;
}
for (int32_t i = 1; i <= num_partitions; ++i) {
_partition_rows_histogram[i] += _partition_rows_histogram[i - 1];
}
for (int32_t i = rows - 1; i >= 0; --i) {
(*row_idx)[_partition_rows_histogram[channel_ids[i]] - 1] = i;
_partition_rows_histogram[channel_ids[i]]--;
}
}
auto new_block = vectorized::Block::create_shared(block->clone_empty());
new_block->swap(*block);
for (size_t i = 0; i < num_partitions; i++) {
size_t start = _partition_rows_histogram[i];
size_t size = _partition_rows_histogram[i + 1] - start;
if (size > 0) {
data_queue[i].enqueue({new_block, {row_idx, start, size}});
_shared_state->set_ready_for_read(i);
COUNTER_UPDATE(_num_rows_in_queue[i], size);
}
if (_exchanger->get_type() == ExchangeType::SHUFFLE) {
auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
}
return Status::OK();
@@ -77,17 +44,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
{
SCOPED_TIMER(local_state._compute_hash_value_timer);
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block,
local_state.mem_tracker()));
}
{
SCOPED_TIMER(local_state._distribute_timer);
RETURN_IF_ERROR(local_state.split_rows(
state, (const uint32_t*)local_state._partitioner->get_channel_ids(), in_block,
source_state));
}
RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, source_state, local_state));
if (source_state == SourceState::FINISHED) {
local_state._shared_state->sub_running_sink_operators();

be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h

@@ -30,6 +30,9 @@ public:
~LocalExchangeSinkDependency() override = default;
};
class Exchanger;
class ShuffleExchanger;
class PassthroughExchanger;
class LocalExchangeSinkOperatorX;
class LocalExchangeSinkLocalState final
: public PipelineXSinkLocalState<LocalExchangeSinkDependency> {
@@ -43,17 +46,21 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
vectorized::Block* block, SourceState source_state);
private:
friend class LocalExchangeSinkOperatorX;
friend class ShuffleExchanger;
friend class PassthroughExchanger;
Exchanger* _exchanger = nullptr;
// Used by shuffle exchanger
RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
RuntimeProfile::Counter* _distribute_timer = nullptr;
std::vector<RuntimeProfile::Counter*> _num_rows_in_queue {};
std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr;
std::vector<size_t> _partition_rows_histogram {};
std::vector<size_t> _partition_rows_histogram;
// Used by random passthrough exchanger
int _channel_id = 0;
};
// A single 32-bit division on a recent x64 processor has a throughput of one instruction every six cycles with a latency of 26 cycles.
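That cost is why local exchange maps hash values to channels with a multiply-shift instead of `hash % num_channels`. A minimal sketch of the technique (the body of `LocalExchangeChannelIds` is not shown in this diff, so treat the implementation below as an assumption):

#include <cstdint>

// Multiply-shift reduction: for a 32-bit hash h and channel count n,
// (uint64_t)h * n >> 32 maps h uniformly onto [0, n) with one multiply
// and one shift, avoiding the ~26-cycle 32-bit division.
struct LocalExchangeChannelIds {
    uint32_t operator()(uint32_t hash_value, uint32_t num_channels) const {
        return static_cast<uint32_t>(
                (static_cast<uint64_t>(hash_value) * num_channels) >> 32);
    }
};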
@@ -82,21 +89,31 @@ public:
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
}
Status init() override {
Status init(bool need_partitioner) override {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR";
_partitioner.reset(
new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
RETURN_IF_ERROR(_partitioner->init(_texprs));
_need_partitioner = need_partitioner;
if (_need_partitioner) {
_partitioner.reset(
new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
RETURN_IF_ERROR(_partitioner->init(_texprs));
}
return Status::OK();
}
Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc()));
if (_need_partitioner) {
RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc()));
}
return Status::OK();
}
Status open(RuntimeState* state) override {
RETURN_IF_ERROR(_partitioner->open(state));
if (_need_partitioner) {
RETURN_IF_ERROR(_partitioner->open(state));
}
return Status::OK();
}
@@ -105,6 +122,7 @@ public:
private:
friend class LocalExchangeSinkLocalState;
bool _need_partitioner;
const int _num_partitions;
const std::vector<TExpr>& _texprs;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;

be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp

@@ -17,8 +17,21 @@
#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
namespace doris::pipeline {
void LocalExchangeSourceDependency::block() {
if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->running_sink_operators == 0) {
return;
}
std::unique_lock<std::mutex> lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->running_sink_operators == 0) {
return;
}
Dependency::block();
}
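`block()` re-checks `exchanger->running_sink_operators` under the same `le_lock` that `sub_running_sink_operators()` (dependency.cpp above) holds while waking sources. Without the re-check, a source could pass the first check, the last sink could then decrement the counter and wake everyone, and the source would block afterwards with nothing left to unblock it. A condensed, self-contained sketch of the handshake (names illustrative):

#include <atomic>
#include <mutex>

// Illustrative check/lock/re-check handshake between sinks and sources.
struct LocalExchangeHandshake {
    std::atomic<int> running_sinks;
    std::mutex le_lock;

    explicit LocalExchangeHandshake(int num_sinks) : running_sinks(num_sinks) {}

    void on_sink_finished() {
        std::unique_lock<std::mutex> lc(le_lock);
        if (running_sinks.fetch_sub(1) == 1) {
            wake_all_sources(); // still holding le_lock
        }
    }

    void try_block_source() {
        if (running_sinks == 0) return;     // cheap unlocked fast path
        std::unique_lock<std::mutex> lc(le_lock);
        if (running_sinks == 0) return;     // re-check: the last sink cannot
        mark_source_blocked();              // finish between this check and
    }                                       // the mark, because it needs le_lock

    // In the real code these correspond to _set_ready_for_read() and
    // Dependency::block(), which only flip ready flags, so holding the
    // lock around them is cheap.
    void wake_all_sources() {}
    void mark_source_blocked() {}
};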
Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
@@ -28,9 +41,14 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
DCHECK(_shared_state != nullptr);
_channel_id = info.task_idx;
_shared_state->set_dep_by_channel_id(_dependency, _channel_id);
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);
_get_block_failed_counter =
ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1);
_copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
if (_exchanger->get_type() == ExchangeType::SHUFFLE) {
_copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
}
return Status::OK();
}
@@ -38,42 +56,7 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::
SourceState& source_state) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
PartitionedBlock partitioned_block;
std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
auto get_data = [&](vectorized::Block* result_block) {
do {
const auto* offset_start = &((
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
mutable_block->add_rows(partitioned_block.first.get(), offset_start,
offset_start + std::get<2>(partitioned_block.second));
} while (mutable_block->rows() < state->batch_size() &&
local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
partitioned_block));
*result_block = mutable_block->to_block();
};
if (local_state._shared_state->running_sink_operators == 0) {
if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
get_data(block);
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
source_state = SourceState::FINISHED;
}
} else if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
get_data(block);
} else {
local_state._dependency->block();
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
}
RETURN_IF_ERROR(local_state._exchanger->get_block(state, block, source_state, local_state));
local_state.reached_limit(block, source_state);
return Status::OK();
}

be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h

@@ -29,18 +29,12 @@ public:
: Dependency(id, node_id, "LocalExchangeSourceDependency", query_ctx) {}
~LocalExchangeSourceDependency() override = default;
void block() override {
if (((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) {
return;
}
std::unique_lock<std::mutex> lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
if (((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) {
return;
}
Dependency::block();
}
void block() override;
};
class Exchanger;
class ShuffleExchanger;
class PassthroughExchanger;
class LocalExchangeSourceOperatorX;
class LocalExchangeSourceLocalState final
: public PipelineXLocalState<LocalExchangeSourceDependency> {
@@ -54,7 +48,10 @@ public:
private:
friend class LocalExchangeSourceOperatorX;
friend class ShuffleExchanger;
friend class PassthroughExchanger;
Exchanger* _exchanger = nullptr;
int _channel_id;
RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
RuntimeProfile::Counter* _copy_data_timer = nullptr;

be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp

@@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
namespace doris::pipeline {
Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state, LocalExchangeSinkLocalState& local_state) {
{
SCOPED_TIMER(local_state._compute_hash_value_timer);
RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block,
local_state.mem_tracker()));
}
{
SCOPED_TIMER(local_state._distribute_timer);
RETURN_IF_ERROR(_split_rows(state,
(const uint32_t*)local_state._partitioner->get_channel_ids(),
in_block, source_state, local_state));
}
return Status::OK();
}
Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state,
LocalExchangeSourceLocalState& local_state) {
PartitionedBlock partitioned_block;
std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
auto get_data = [&](vectorized::Block* result_block) {
do {
const auto* offset_start = &((
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
mutable_block->add_rows(partitioned_block.first.get(), offset_start,
offset_start + std::get<2>(partitioned_block.second));
} while (mutable_block->rows() < state->batch_size() &&
_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
*result_block = mutable_block->to_block();
};
if (running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
get_data(block);
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
source_state = SourceState::FINISHED;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
get_data(block);
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
vectorized::Block* block, SourceState source_state,
LocalExchangeSinkLocalState& local_state) {
auto& data_queue = _data_queue;
const auto rows = block->rows();
auto row_idx = std::make_shared<std::vector<int>>(rows);
{
local_state._partition_rows_histogram.assign(_num_instances + 1, 0);
for (size_t i = 0; i < rows; ++i) {
local_state._partition_rows_histogram[channel_ids[i]]++;
}
for (int32_t i = 1; i <= _num_instances; ++i) {
local_state._partition_rows_histogram[i] +=
local_state._partition_rows_histogram[i - 1];
}
for (int32_t i = rows - 1; i >= 0; --i) {
(*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i;
local_state._partition_rows_histogram[channel_ids[i]]--;
}
}
auto new_block = vectorized::Block::create_shared(block->clone_empty());
new_block->swap(*block);
for (size_t i = 0; i < _num_instances; i++) {
size_t start = local_state._partition_rows_histogram[i];
size_t size = local_state._partition_rows_histogram[i + 1] - start;
if (size > 0) {
data_queue[i].enqueue({new_block, {row_idx, start, size}});
local_state._shared_state->set_ready_for_read(i);
}
}
return Status::OK();
}
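`_split_rows` above is a counting sort over the channel ids: one pass builds a per-channel histogram, a prefix sum turns it into end offsets, and a reverse pass scatters every row index so each channel's rows sit contiguously in `row_idx`; each destination queue then gets a zero-copy (block, row_idx, start, size) view. A standalone sketch of the grouping step:

#include <cstdint>
#include <vector>

// Groups row indices by destination channel. After the call, channel c owns
// row_idx[hist[c] .. hist[c + 1]) -- the same layout _split_rows produces.
std::vector<int> group_rows_by_channel(const std::vector<uint32_t>& channel_ids,
                                       size_t num_channels,
                                       std::vector<size_t>& hist) {
    const int64_t rows = static_cast<int64_t>(channel_ids.size());
    std::vector<int> row_idx(rows);
    hist.assign(num_channels + 1, 0);
    for (int64_t i = 0; i < rows; ++i) {         // 1. count rows per channel
        hist[channel_ids[i]]++;
    }
    for (size_t c = 1; c <= num_channels; ++c) { // 2. prefix sum -> end offsets
        hist[c] += hist[c - 1];
    }
    for (int64_t i = rows - 1; i >= 0; --i) {    // 3. reverse scatter; iterating
        row_idx[--hist[channel_ids[i]]] = static_cast<int>(i); // backwards keeps
    }                                            // each channel's rows in order
    return row_idx;
}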
Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state,
LocalExchangeSinkLocalState& local_state) {
auto new_block = vectorized::Block::create_unique(in_block->clone_empty());
new_block->swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_instances;
_data_queue[channel_id].enqueue(std::move(new_block));
local_state._shared_state->set_ready_for_read(channel_id);
return Status::OK();
}
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state,
LocalExchangeSourceLocalState& local_state) {
std::unique_ptr<vectorized::Block> next_block;
if (running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
*block = *next_block.release();
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
source_state = SourceState::FINISHED;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
*block = *next_block.release();
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
} // namespace doris::pipeline

be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h

@@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/pipeline_x/operator.h"
namespace doris::pipeline {
enum class ExchangeType : uint8_t {
SHUFFLE = 0,
PASSTHROUGH = 1,
};
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
class Exchanger {
public:
Exchanger(int num_instances)
: running_sink_operators(num_instances), _num_instances(num_instances) {}
virtual ~Exchanger() = default;
virtual Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state,
LocalExchangeSourceLocalState& local_state) = 0;
virtual Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
LocalExchangeSinkLocalState& local_state) = 0;
virtual ExchangeType get_type() const = 0;
std::atomic<int> running_sink_operators = 0;
protected:
const int _num_instances;
};
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
class ShuffleExchanger final : public Exchanger {
using PartitionedBlock =
std::pair<std::shared_ptr<vectorized::Block>,
std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>;
public:
ENABLE_FACTORY_CREATOR(ShuffleExchanger);
ShuffleExchanger(int num_instances) : Exchanger(num_instances) {
_data_queue.resize(num_instances);
}
~ShuffleExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
LocalExchangeSinkLocalState& local_state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state,
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::SHUFFLE; }
private:
Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
vectorized::Block* block, SourceState source_state,
LocalExchangeSinkLocalState& local_state);
std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> _data_queue;
};
class PassthroughExchanger final : public Exchanger {
public:
ENABLE_FACTORY_CREATOR(PassthroughExchanger);
PassthroughExchanger(int num_instances) : Exchanger(num_instances) {
_data_queue.resize(num_instances);
}
~PassthroughExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
LocalExchangeSinkLocalState& local_state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state,
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; }
private:
std::vector<moodycamel::ConcurrentQueue<std::unique_ptr<vectorized::Block>>> _data_queue;
};
} // namespace doris::pipeline

be/src/pipeline/pipeline_x/operator.h

@@ -179,6 +179,8 @@ public:
[[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; }
[[nodiscard]] virtual bool need_to_local_shuffle() const { return true; }
bool can_read() override {
LOG(FATAL) << "should not reach here!";
return false;
@@ -423,7 +425,7 @@ public:
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
Status init(const TDataSink& tsink) override;
virtual Status init() {
virtual Status init(bool need_partitioner) {
return Status::InternalError("init() is only implemented in local exchange!");
}

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp

@@ -231,6 +231,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
static_cast<void>(root_pipeline->set_sink(_sink));
// RETURN_IF_ERROR(_plan_local_shuffle());
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size();
@@ -247,6 +249,17 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
return Status::OK();
}
Status PipelineXFragmentContext::_plan_local_shuffle() {
for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) {
auto& children = _pipelines[pip_idx]->children();
if (children.empty()) {
_pipelines[pip_idx]->init_need_to_local_shuffle_by_source();
} else {
}
}
return Status::OK();
}
Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs,
const TPipelineFragmentParams& params,
@@ -595,7 +608,8 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorXPtr& op,
PipelinePtr& cur_pipe, const TPlanNode& tnode,
const std::vector<TExpr>& texprs) {
const std::vector<TExpr>& texprs,
ExchangeType exchange_type) {
if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) {
return Status::OK();
}
@@ -617,12 +631,23 @@ Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX
sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), local_exchange_id,
_num_instances, texprs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init());
bool need_partitioner = false;
auto shared_state = LocalExchangeSharedState::create_shared();
shared_state->data_queue.resize(_num_instances);
shared_state->source_dependencies.resize(_num_instances, nullptr);
shared_state->running_sink_operators = _num_instances;
switch (exchange_type) {
case ExchangeType::SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(_num_instances);
need_partitioner = true;
break;
case ExchangeType::PASSTHROUGH:
shared_state->exchanger = PassthroughExchanger::create_unique(_num_instances);
break;
default:
return Status::InternalError("Unsupported local exchange type : " +
std::to_string((int)exchange_type));
}
RETURN_IF_ERROR(cur_pipe->sink_x()->init(need_partitioner));
_op_id_to_le_state.insert({local_exchange_id, shared_state});
return Status::OK();
}
@@ -687,6 +712,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
// RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
// tnode.agg_node.grouping_exprs,
// ExchangeType::PASSTHROUGH));
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation) {
op.reset(new StreamingAggSourceOperatorX(pool, tnode, next_operator_id(), descs));
@@ -703,6 +732,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
// RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
// tnode.agg_node.grouping_exprs,
// ExchangeType::PASSTHROUGH));
} else {
op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -720,10 +753,19 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
if (!tnode.agg_node.need_finalize) {
RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
tnode.agg_node.grouping_exprs));
}
// if (tnode.agg_node.grouping_exprs.empty()) {
// if (tnode.agg_node.need_finalize) {
// RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
// tnode.agg_node.grouping_exprs,
// ExchangeType::PASSTHROUGH));
// } else {
// // TODO(gabriel): maybe use local shuffle
// }
// } else if (cur_pipe->need_to_local_shuffle()) {
// RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
// tnode.agg_node.grouping_exprs,
// ExchangeType::SHUFFLE));
// }
}
break;
}
@@ -750,7 +792,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
probe_exprs.push_back(eq_join_conjunct.left);
}
RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs));
if (tnode.hash_join_node.__isset.is_broadcast_join &&
tnode.hash_join_node.is_broadcast_join) {
RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs,
ExchangeType::PASSTHROUGH));
} else if (cur_pipe->need_to_local_shuffle()) {
RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs,
ExchangeType::SHUFFLE));
}
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
break;

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h

@@ -35,6 +35,7 @@
#include "pipeline/pipeline.h"
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_task.h"
#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
@@ -125,7 +126,8 @@ private:
void _close_fragment_instance() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe,
const TPlanNode& tnode, const std::vector<TExpr>& texprs);
const TPlanNode& tnode, const std::vector<TExpr>& texprs,
ExchangeType exchange_type);
[[nodiscard]] Status _build_pipelines(ObjectPool* pool,
const doris::TPipelineFragmentParams& request,
@@ -151,6 +153,7 @@ private:
const TPipelineFragmentParams& params, const RowDescriptor& row_desc,
RuntimeState* state, DescriptorTbl& desc_tbl,
PipelineId cur_pipeline_id);
Status _plan_local_shuffle();
bool _has_inverted_index_or_partial_update(TOlapTableSink sink);