diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp index b44f15d13e..e02c8fed7f 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp @@ -23,12 +23,4 @@ OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() { return std::make_shared(this, _sink); } -Status MultiCastDataStreamSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { - RETURN_IF_ERROR(Base::init(state, info)); - auto& p = _parent->cast(); - _shared_state->multi_cast_data_streamer = std::make_shared( - p._row_desc, p._pool, p._cast_sender_count); - return Status::OK(); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index f949b624c7..ea5a155319 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -47,8 +47,6 @@ class MultiCastDataStreamSinkLocalState final ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState); MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {} - - Status init(RuntimeState* state, LocalSinkStateInfo& info) override; friend class MultiCastDataStreamSinkOperatorX; friend class DataSinkOperatorX; using Base = PipelineXSinkLocalState; @@ -72,11 +70,6 @@ public: _row_desc(row_desc), _cast_sender_count(cast_sender_count) {} ~MultiCastDataStreamSinkOperatorX() override = default; - Status init(const TDataSink& tsink) override { return Status::OK(); } - - Status open(doris::RuntimeState* state) override { return Status::OK(); }; - - Status prepare(RuntimeState* state) override { return Status::OK(); } Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 3d2b8157fa..98ab33ff1a 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -119,7 +119,9 @@ public: : Base(pool, id), _consumer_id(consumer_id), _t_data_stream_sink(sink), - _row_descriptor(row_descriptor) {}; + _row_descriptor(row_descriptor) { + _op_name = "MULTI_CAST_DATA_STREAM_SOURCE_OPERATOR"; + }; ~MultiCastDataStreamerSourceOperatorX() override = default; Dependency* wait_for_dependency(RuntimeState* state) override { CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 81bee3063d..3dd77167f3 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -275,13 +275,19 @@ Status DataSinkOperatorX::setup_local_states( RuntimeState* state, std::vector& infos) { auto multi_cast_data_streamer = static_cast(this)->create_multi_cast_data_streamer(); - for (auto& info : infos) { - auto local_state = MultiCastDataStreamSinkLocalState::create_shared(this, state); - state->emplace_sink_local_state(id(), local_state); - RETURN_IF_ERROR(local_state->init(state, info)); - local_state->_shared_state->multi_cast_data_streamer = multi_cast_data_streamer; + for (int i = 0; i < infos.size(); i++) { + auto& info = infos[i]; + if (i == 0) { + auto local_state = MultiCastDataStreamSinkLocalState::create_shared(this, state); + state->emplace_sink_local_state(id(), local_state); + RETURN_IF_ERROR(local_state->init(state, info)); + local_state->_shared_state->multi_cast_data_streamer = multi_cast_data_streamer; + } else { + auto* _shared_state = + (typename MultiCastDependency::SharedState*)info.dependency->shared_state(); + _shared_state->multi_cast_data_streamer = multi_cast_data_streamer; + } } - return Status::OK(); } @@ -320,15 +326,28 @@ Status OperatorX::setup_local_states(RuntimeState* state, std::vector& infos) { int child_count = static_cast(this)->get_child_count(); std::shared_ptr data_queue; - for (auto& info : infos) { - auto local_state = UnionSourceLocalState::create_shared(state, this); - state->emplace_local_state(id(), local_state); - RETURN_IF_ERROR(local_state->init(state, info)); - if (child_count != 0) { - if (!data_queue) { - data_queue = local_state->create_data_queue(); - } + + if (child_count == 0) { + // for union only have const expr + for (auto& info : infos) { + auto local_state = UnionSourceLocalState::create_shared(state, this); + state->emplace_local_state(id(), local_state); + RETURN_IF_ERROR(local_state->init(state, info)); + } + return Status::OK(); + } + for (int i = 0; i < infos.size(); i++) { + auto& info = infos[i]; + if (i == 0) { + auto local_state = UnionSourceLocalState::create_shared(state, this); + state->emplace_local_state(id(), local_state); + RETURN_IF_ERROR(local_state->init(state, info)); + data_queue = local_state->create_data_queue(); local_state->_shared_state->data_queue = data_queue; + } else { + auto* _shared_state = + (typename UnionDependency::SharedState*)info.dependency->shared_state(); + _shared_state->data_queue = data_queue; } } return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 85a9527eeb..64ece0baf2 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -313,27 +313,36 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData thrift_sink.multi_cast_stream_sink, row_desc)); for (int i = 0; i < sender_size; ++i) { auto new_pipeline = add_pipeline(); - auto _row_desc = - !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty() - ? RowDescriptor( - state->desc_tbl(), - {thrift_sink.multi_cast_stream_sink.sinks[i].output_tuple_id}, - {false}) - : _sink->row_desc(); + RowDescriptor* _row_desc = nullptr; + { + auto& tmp_row_desc = + !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty() + ? RowDescriptor(state->desc_tbl(), + {thrift_sink.multi_cast_stream_sink.sinks[i] + .output_tuple_id}, + {false}) + : _sink->row_desc(); + _row_desc = pool->add(new RowDescriptor(tmp_row_desc)); + } auto source_id = sources[i]; OperatorXPtr source_op; // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline source_op.reset(new MultiCastDataStreamerSourceOperatorX( i, pool, thrift_sink.multi_cast_stream_sink.sinks[i], row_desc, source_id)); static_cast(new_pipeline->add_operator(source_op)); - // 2. create and set sink operator of data stream sender for new pipeline DataSinkOperatorXPtr sink_op; sink_op.reset(new ExchangeSinkOperatorX( - state, row_desc, thrift_sink.multi_cast_stream_sink.sinks[i], + state, *_row_desc, thrift_sink.multi_cast_stream_sink.sinks[i], thrift_sink.multi_cast_stream_sink.destinations[i], false)); + static_cast(new_pipeline->set_sink(sink_op)); + { + TDataSink* t = pool->add(new TDataSink()); + t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i]; + RETURN_IF_ERROR(sink_op->init(*t)); + } // 3. set dependency dag _dag[new_pipeline->id()].push_back(cur_pipeline_id);