From fe42e528517b210b4dd182bd5e4114f4348c5d72 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Thu, 18 May 2023 10:34:37 +0800 Subject: [PATCH] [pipeline](CTE) Support multi stream data sink in pipeline (#19519) --- be/src/exec/data_sink.cpp | 12 +++ be/src/pipeline/CMakeLists.txt | 4 +- .../pipeline/exec/exchange_sink_operator.cpp | 27 ++++-- be/src/pipeline/exec/exchange_sink_operator.h | 7 +- .../exec/multi_cast_data_stream_sink.h | 47 +++++++++++ .../exec/multi_cast_data_stream_source.cpp | 64 +++++++++++++++ .../exec/multi_cast_data_stream_source.h | 78 ++++++++++++++++++ .../exec/multi_cast_data_streamer.cpp | 75 +++++++++++++++++ .../pipeline/exec/multi_cast_data_streamer.h | 82 +++++++++++++++++++ be/src/pipeline/pipeline_fragment_context.cpp | 47 ++++++++++- be/src/pipeline/pipeline_fragment_context.h | 5 +- be/src/vec/sink/multi_cast_data_stream_sink.h | 58 +++++++++++++ be/src/vec/sink/vdata_stream_sender.cpp | 2 - 13 files changed, 493 insertions(+), 15 deletions(-) create mode 100644 be/src/pipeline/exec/multi_cast_data_stream_sink.h create mode 100644 be/src/pipeline/exec/multi_cast_data_stream_source.cpp create mode 100644 be/src/pipeline/exec/multi_cast_data_stream_source.h create mode 100644 be/src/pipeline/exec/multi_cast_data_streamer.cpp create mode 100644 be/src/pipeline/exec/multi_cast_data_streamer.h create mode 100644 be/src/vec/sink/multi_cast_data_stream_sink.h diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index f93aa7bb6f..89cc207497 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -31,6 +31,7 @@ #include #include "common/config.h" +#include "vec/sink/multi_cast_data_stream_sink.h" #include "vec/sink/vdata_stream_sender.h" #include "vec/sink/vjdbc_table_sink.h" #include "vec/sink/vmemory_scratch_sink.h" @@ -161,6 +162,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink RETURN_IF_ERROR(status); break; } + case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { + return Status::NotSupported("MULTI_CAST_DATA_STREAM_SINK only support in pipeline engine"); + } default: { std::stringstream error_msg; @@ -302,6 +306,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink RETURN_IF_ERROR(status); break; } + case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { + DCHECK(thrift_sink.__isset.multi_cast_stream_sink); + DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0); + auto multi_cast_data_streamer = std::make_shared( + row_desc, pool, thrift_sink.multi_cast_stream_sink.sinks.size()); + sink->reset(new vectorized::MultiCastDataStreamSink(multi_cast_data_streamer)); + break; + } default: { std::stringstream error_msg; diff --git a/be/src/pipeline/CMakeLists.txt b/be/src/pipeline/CMakeLists.txt index 03d3410e7b..1b3c2784be 100644 --- a/be/src/pipeline/CMakeLists.txt +++ b/be/src/pipeline/CMakeLists.txt @@ -58,7 +58,9 @@ set(PIPELINE_FILES exec/union_source_operator.cpp exec/data_queue.cpp exec/select_operator.cpp - exec/empty_source_operator.cpp) + exec/empty_source_operator.cpp + exec/multi_cast_data_streamer.cpp + exec/multi_cast_data_stream_source.cpp) if (WITH_MYSQL) set(PIPELINE_FILES diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index ab7a1ac07c..1405e8f17a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -34,20 +34,33 @@ class DataSink; namespace doris::pipeline { ExchangeSinkOperatorBuilder::ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, - PipelineFragmentContext* context) - : DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink), _context(context) {} + PipelineFragmentContext* context, + int mult_cast_id) + : DataSinkOperatorBuilder(id, "ExchangeSinkOperator", sink), + _context(context), + _mult_cast_id(mult_cast_id) {} OperatorPtr ExchangeSinkOperatorBuilder::build_operator() { - return std::make_shared(this, _sink, _context); + return std::make_shared(this, _sink, _context, _mult_cast_id); } ExchangeSinkOperator::ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink, - PipelineFragmentContext* context) - : DataSinkOperator(operator_builder, sink), _context(context) {} + PipelineFragmentContext* context, int mult_cast_id) + : DataSinkOperator(operator_builder, sink), + _context(context), + _mult_cast_id(mult_cast_id) {} Status ExchangeSinkOperator::init(const TDataSink& tsink) { - RETURN_IF_ERROR(_sink->init(tsink)); - _dest_node_id = tsink.stream_sink.dest_node_id; + // -1 means not the mult cast stream sender + if (_mult_cast_id == -1) { + RETURN_IF_ERROR(_sink->init(tsink)); + _dest_node_id = tsink.stream_sink.dest_node_id; + } else { + TDataSink new_t_sink; + new_t_sink.stream_sink = tsink.multi_cast_stream_sink.sinks[_mult_cast_id]; + RETURN_IF_ERROR(_sink->init(new_t_sink)); + _dest_node_id = tsink.multi_cast_stream_sink.sinks[_mult_cast_id].dest_node_id; + } return Status::OK(); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 034ceeffa2..0ebb8b3e9e 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -37,19 +37,21 @@ class PipelineFragmentContext; class ExchangeSinkOperatorBuilder final : public DataSinkOperatorBuilder { public: - ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, PipelineFragmentContext* context); + ExchangeSinkOperatorBuilder(int32_t id, DataSink* sink, PipelineFragmentContext* context, + int mult_cast_id = -1); OperatorPtr build_operator() override; private: PipelineFragmentContext* _context; + int _mult_cast_id = -1; }; // Now local exchange is not supported since VDataStreamRecvr is considered as a pipeline broker. class ExchangeSinkOperator final : public DataSinkOperator { public: ExchangeSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink, - PipelineFragmentContext* context); + PipelineFragmentContext* context, int mult_cast_id); Status init(const TDataSink& tsink) override; Status prepare(RuntimeState* state) override; @@ -65,6 +67,7 @@ private: int _dest_node_id = -1; RuntimeState* _state = nullptr; PipelineFragmentContext* _context; + int _mult_cast_id = -1; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h new file mode 100644 index 0000000000..e137a7e655 --- /dev/null +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "operator.h" +#include "vec/sink/multi_cast_data_stream_sink.h" + +namespace doris::pipeline { + +class MultiCastDataStreamSinkOperatorBuilder final + : public DataSinkOperatorBuilder { +public: + MultiCastDataStreamSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "MultiCastDataStreamSinkOperator", sink) {} + + OperatorPtr build_operator() override; +}; + +class MultiCastDataStreamSinkOperator final + : public DataSinkOperator { +public: + MultiCastDataStreamSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) + : DataSinkOperator(operator_builder, sink) {} + + bool can_write() override { return true; } +}; + +OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() { + return std::make_shared(this, _sink); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp new file mode 100644 index 0000000000..5334d2a669 --- /dev/null +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "multi_cast_data_stream_source.h" + +#include + +#include "common/status.h" +#include "pipeline/exec/multi_cast_data_streamer.h" +#include "pipeline/exec/operator.h" +#include "vec/core/block.h" + +namespace doris::pipeline { + +MultiCastDataStreamerSourceOperatorBuilder::MultiCastDataStreamerSourceOperatorBuilder( + int32_t id, const int consumer_id, std::shared_ptr& data_streamer) + : OperatorBuilderBase(id, "MultiCastDataStreamerSourceOperator"), + _consumer_id(consumer_id), + _multi_cast_data_streamer(data_streamer) {}; + +OperatorPtr MultiCastDataStreamerSourceOperatorBuilder::build_operator() { + return std::make_shared(this, _consumer_id, + _multi_cast_data_streamer); +} + +const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() { + return _multi_cast_data_streamer->row_desc(); +} + +MultiCastDataStreamerSourceOperator::MultiCastDataStreamerSourceOperator( + OperatorBuilderBase* operator_builder, const int consumer_id, + std::shared_ptr& data_streamer) + : OperatorBase(operator_builder), + _consumer_id(consumer_id), + _multi_cast_data_streamer(data_streamer) {}; + +bool MultiCastDataStreamerSourceOperator::can_read() { + return _multi_cast_data_streamer->can_read(_consumer_id); +} + +Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { + bool eos = false; + _multi_cast_data_streamer->pull(_consumer_id, block, &eos); + if (eos) { + source_state = SourceState::FINISHED; + } + return Status::OK(); +} +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h new file mode 100644 index 0000000000..c44b37ee2e --- /dev/null +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include + +#include + +#include "common/status.h" +#include "operator.h" + +namespace doris { +class ExecNode; +class RuntimeState; + +namespace vectorized { +class Block; +} // namespace vectorized + +namespace pipeline { +class MultiCastDataStreamer; + +class MultiCastDataStreamerSourceOperatorBuilder final : public OperatorBuilderBase { +public: + MultiCastDataStreamerSourceOperatorBuilder(int32_t id, const int consumer_id, + std::shared_ptr&); + + bool is_source() const override { return true; } + + OperatorPtr build_operator() override; + + const RowDescriptor& row_desc() override; + +private: + const int _consumer_id; + std::shared_ptr _multi_cast_data_streamer; +}; + +class MultiCastDataStreamerSourceOperator final : public OperatorBase { +public: + MultiCastDataStreamerSourceOperator(OperatorBuilderBase* operator_builder, + const int consumer_id, + std::shared_ptr& data_streamer); + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + Status prepare(RuntimeState* state) override { return Status::OK(); }; + + Status open(RuntimeState* state) override { return Status::OK(); }; + + Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override { + return Status::OK(); + } + + bool can_read() override; + +private: + const int _consumer_id; + std::shared_ptr _multi_cast_data_streamer; +}; + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp new file mode 100644 index 0000000000..2262fb3930 --- /dev/null +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "multi_cast_data_streamer.h" + +#include "runtime/runtime_state.h" + +namespace doris::pipeline { + +MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size) + : _used_count(used_count), _mem_size(mem_size) { + _block = vectorized::Block::create_unique(block->get_columns_with_type_and_name()); + block->clear(); +} + +void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) { + std::lock_guard l(_mutex); + auto& pos_to_pull = _sender_pos_to_read[sender_idx]; + if (pos_to_pull != _multi_cast_blocks.end()) { + if (pos_to_pull->_used_count == 1) { + DCHECK(pos_to_pull == _multi_cast_blocks.begin()); + pos_to_pull->_block->swap(*block); + + _cumulative_mem_size -= pos_to_pull->_mem_size; + pos_to_pull++; + _multi_cast_blocks.pop_front(); + } else { + pos_to_pull->_used_count--; + pos_to_pull->_block->create_same_struct_block(0)->swap(*block); + (void)vectorized::MutableBlock(block).merge(*pos_to_pull->_block); + pos_to_pull++; + } + } + *eos = _eos and pos_to_pull == _multi_cast_blocks.end(); +} + +void MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) { + auto rows = block->rows(); + COUNTER_UPDATE(_process_rows, rows); + + auto block_mem_size = block->allocated_bytes(); + std::lock_guard l(_mutex); + int need_process_count = _cast_sender_count - _opened_sender_count; + // TODO: if the [queue back block rows + block->rows()] < batch_size, better + // do merge block. but need check the need_process_count and used_count whether + // equal + _multi_cast_blocks.emplace_back(block, need_process_count, block_mem_size); + _cumulative_mem_size += block_mem_size; + COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value())); + + auto end = _multi_cast_blocks.end(); + end--; + for (int i = 0; i < _sender_pos_to_read.size(); ++i) { + if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) { + _sender_pos_to_read[i] = end; + } + } + _eos = eos; +} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h new file mode 100644 index 0000000000..022761bf3d --- /dev/null +++ b/be/src/pipeline/exec/multi_cast_data_streamer.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/sink/vdata_stream_sender.h" + +namespace doris::pipeline { + +struct MultiCastBlock { + MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size); + + std::unique_ptr _block; + int _used_count; + size_t _mem_size; +}; + +// TDOD: MultiCastDataStreamer same as the data queue, maybe rethink union and refactor the +// code +class MultiCastDataStreamer { +public: + MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count) + : _row_desc(row_desc), + _profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))), + _cast_sender_count(cast_sender_count) { + _sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end()); + _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES); + _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT); + }; + + ~MultiCastDataStreamer() = default; + + void pull(int sender_idx, vectorized::Block* block, bool* eos); + + void push(RuntimeState* state, vectorized::Block* block, bool eos); + + // use sink to check can_write, now always true after we support spill to disk + bool can_write() { return true; } + + bool can_read(int sender_idx) { + std::lock_guard l(_mutex); + return _sender_pos_to_read[sender_idx] != _multi_cast_blocks.end() || _eos; + } + + const RowDescriptor& row_desc() { return _row_desc; } + + RuntimeProfile* profile() { return _profile; } + + void set_eos() { + std::lock_guard l(_mutex); + _eos = true; + } + +private: + const RowDescriptor& _row_desc; + RuntimeProfile* _profile; + std::list _multi_cast_blocks; + std::vector::iterator> _sender_pos_to_read; + std::mutex _mutex; + bool _eos = false; + int _cast_sender_count = 0; + int _opened_sender_count = 0; + int64_t _cumulative_mem_size = 0; + + RuntimeProfile::Counter* _process_rows; + RuntimeProfile::Counter* _peak_mem_usage; +}; +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 362a697fcb..408a41da6e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -54,6 +54,8 @@ #include "pipeline/exec/exchange_source_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" +#include "pipeline/exec/multi_cast_data_stream_sink.h" +#include "pipeline/exec/multi_cast_data_stream_source.h" #include "pipeline/exec/mysql_scan_operator.h" // IWYU pragma: keep #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" @@ -307,7 +309,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _root_pipeline = fragment_context->add_pipeline(); RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline)); if (_sink) { - RETURN_IF_ERROR(_create_sink(request.fragment.output_sink)); + RETURN_IF_ERROR( + _create_sink(request.local_params[idx].sender_id, request.fragment.output_sink)); } RETURN_IF_ERROR(_build_pipeline_tasks(request)); if (_sink) { @@ -341,6 +344,12 @@ Status PipelineFragmentContext::_build_pipeline_tasks( RETURN_IF_ERROR(task->prepare(_runtime_state.get())); } _total_tasks = _tasks.size(); + + // register the profile of child data stream sender + for (auto& sender : _multi_cast_stream_sink_senders) { + _sink->profile()->add_child(sender->profile(), true, nullptr); + } + return Status::OK(); } @@ -686,7 +695,7 @@ void PipelineFragmentContext::close_if_prepare_failed() { } // construct sink operator -Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) { +Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thrift_sink) { OperatorBuilderPtr sink_; switch (thrift_sink.type) { case TDataSinkType::DATA_STREAM_SINK: { @@ -715,6 +724,40 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) { _sink.get()); break; } + case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { + sink_ = std::make_shared(next_operator_builder_id(), + _sink.get()); + RETURN_IF_ERROR(_root_pipeline->set_sink(sink_)); + + auto& multi_cast_data_streamer = + assert_cast(_sink.get()) + ->get_multi_cast_data_streamer(); + DCHECK_EQ(thrift_sink.multi_cast_stream_sink.sinks.size(), + thrift_sink.multi_cast_stream_sink.destinations.size()); + auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size(); + _multi_cast_stream_sink_senders.resize(sender_size); + for (int i = 0; i < sender_size; ++i) { + auto new_pipeline = add_pipeline(); + // 1. create the data stream sender sink + _multi_cast_stream_sink_senders[i].reset(new vectorized::VDataStreamSender( + _runtime_state.get(), _runtime_state->obj_pool(), sender_id, sink_->row_desc(), + thrift_sink.multi_cast_stream_sink.sinks[i], + thrift_sink.multi_cast_stream_sink.destinations[i], 16 * 1024, false)); + + // 2. create and set the source operator of multi_cast_data_stream_source for new pipeline + OperatorBuilderPtr source_op = + std::make_shared( + next_operator_builder_id(), i, multi_cast_data_streamer); + new_pipeline->add_operator(source_op); + + // 3. create and set sink operator of data stream sender for new pipeline + OperatorBuilderPtr sink_op_builder = std::make_shared( + next_operator_builder_id(), _multi_cast_stream_sink_senders[i].get(), this, i); + new_pipeline->set_sink(sink_op_builder); + } + + return Status::OK(); + } default: return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type); } diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index db421262c2..7b429fb5d0 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -123,7 +123,7 @@ public: taskgroup::TaskGroup* get_task_group() const { return _query_ctx->get_task_group(); } private: - Status _create_sink(const TDataSink& t_data_sink); + Status _create_sink(int sender_id, const TDataSink& t_data_sink); Status _build_pipelines(ExecNode*, PipelinePtr); Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request); template @@ -168,7 +168,10 @@ private: std::unique_ptr _runtime_state; ExecNode* _root_plan = nullptr; // lives in _runtime_state->obj_pool() + // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both + // of it in pipeline task not the fragment_context std::unique_ptr _sink; + std::vector> _multi_cast_stream_sink_senders; std::shared_ptr _query_ctx; diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h b/be/src/vec/sink/multi_cast_data_stream_sink.h new file mode 100644 index 0000000000..9c1e63b442 --- /dev/null +++ b/be/src/vec/sink/multi_cast_data_stream_sink.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "pipeline/exec/multi_cast_data_streamer.h" +#include "vec/sink/vdata_stream_sender.h" + +namespace doris::vectorized { + +class MultiCastDataStreamSink : public DataSink { +public: + MultiCastDataStreamSink(std::shared_ptr& streamer) + : _multi_cast_data_streamer(streamer) {}; + + ~MultiCastDataStreamSink() override = default; + + Status send(RuntimeState* state, Block* block, bool eos = false) override { + _multi_cast_data_streamer->push(state, block, eos); + return Status::OK(); + }; + + Status open(doris::RuntimeState* state) override { return Status::OK(); }; + + // use sink to check can_write, now always true after we support spill to disk + bool can_write() { return _multi_cast_data_streamer->can_write(); } + + RuntimeProfile* profile() override { return _multi_cast_data_streamer->profile(); } + + const RowDescriptor& row_desc() { return _multi_cast_data_streamer->row_desc(); } + + std::shared_ptr& get_multi_cast_data_streamer() { + return _multi_cast_data_streamer; + } + + Status close(RuntimeState* state, Status exec_status) override { + _multi_cast_data_streamer->set_eos(); + return DataSink::close(state, exec_status); + } + +private: + std::shared_ptr _multi_cast_data_streamer; +}; +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 48edab1911..446716970e 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -45,9 +45,7 @@ #include "util/brpc_client_cache.h" #include "util/proto_util.h" #include "util/telemetry/telemetry.h" -#include "vec/columns/column.h" #include "vec/common/sip_hash.h" -#include "vec/core/column_with_type_and_name.h" #include "vec/exprs/vexpr.h" #include "vec/runtime/vdata_stream_mgr.h" #include "vec/runtime/vdata_stream_recvr.h"