diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 2b35e1b6a2..d839be0dc1 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -215,4 +215,6 @@ Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block, return Status::OK(); } +template class DataSinkOperatorX; + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 41d276205b..c858392586 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -102,4 +102,4 @@ private: }; } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp new file mode 100644 index 0000000000..b44f15d13e --- /dev/null +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "multi_cast_data_stream_sink.h" + +namespace doris::pipeline { + +OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() { + return std::make_shared(this, _sink); +} + +Status MultiCastDataStreamSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { + RETURN_IF_ERROR(Base::init(state, info)); + auto& p = _parent->cast(); + _shared_state->multi_cast_data_streamer = std::make_shared( + p._row_desc, p._pool, p._cast_sender_count); + return Status::OK(); +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h index e137a7e655..f949b624c7 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h @@ -18,6 +18,7 @@ #pragma once #include "operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/sink/multi_cast_data_stream_sink.h" namespace doris::pipeline { @@ -40,8 +41,75 @@ public: bool can_write() override { return true; } }; -OperatorPtr MultiCastDataStreamSinkOperatorBuilder::build_operator() { - return std::make_shared(this, _sink); -} +class MultiCastDataStreamSinkOperatorX; +class MultiCastDataStreamSinkLocalState final + : public PipelineXSinkLocalState { + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState); + MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : Base(parent, state) {} + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + friend class MultiCastDataStreamSinkOperatorX; + friend class DataSinkOperatorX; + using Base = PipelineXSinkLocalState; + using Parent = MultiCastDataStreamSinkOperatorX; + +private: + std::shared_ptr _multi_cast_data_streamer; +}; + +class MultiCastDataStreamSinkOperatorX final + : public DataSinkOperatorX { + using Base = DataSinkOperatorX; + +public: + MultiCastDataStreamSinkOperatorX(int sink_id, std::vector& sources, + const int cast_sender_count, ObjectPool* pool, + const TMultiCastDataStreamSink& sink, + const RowDescriptor& row_desc) + : Base(sink_id, sources), + _pool(pool), + _row_desc(row_desc), + _cast_sender_count(cast_sender_count) {} + ~MultiCastDataStreamSinkOperatorX() override = default; + Status init(const TDataSink& tsink) override { return Status::OK(); } + + Status open(doris::RuntimeState* state) override { return Status::OK(); }; + + Status prepare(RuntimeState* state) override { return Status::OK(); } + + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override { + CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state); + SCOPED_TIMER(local_state.profile()->total_time_counter()); + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { + COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + auto st = local_state._shared_state->multi_cast_data_streamer->push( + state, in_block, source_state == SourceState::FINISHED); + // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished + if (st.template is()) { + return Status::OK(); + } + return st; + } + return Status::OK(); + } + + RowDescriptor& row_desc() override { return _row_desc; } + + std::shared_ptr create_multi_cast_data_streamer() { + auto multi_cast_data_streamer = std::make_shared( + _row_desc, _pool, _cast_sender_count); + return multi_cast_data_streamer; + } + +private: + friend class MultiCastDataStreamSinkLocalState; + ObjectPool* _pool; + RowDescriptor _row_desc; + int _cast_sender_count; + friend class MultiCastDataStreamSinkLocalState; +}; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index c0e7b14659..c70d87f59e 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -130,11 +130,9 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); - if (p._t_data_stream_sink.__isset.output_exprs) { - _output_expr_contexts.resize(p._output_expr_contexts.size()); - for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { - RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); - } + _output_expr_contexts.resize(p._output_expr_contexts.size()); + for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { + RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); } return Status::OK(); } @@ -150,7 +148,7 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, if (!local_state._output_expr_contexts.empty()) { output_block = &tmp_block; } - local_state._shared_state->_multi_cast_data_streamer->pull(_consumer_id, output_block, &eos); + local_state._shared_state->multi_cast_data_streamer->pull(_consumer_id, output_block, &eos); if (!local_state._conjuncts.empty()) { RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block, @@ -162,9 +160,11 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, local_state._output_expr_contexts, *output_block, block)); materialize_block_inplace(*block); } + COUNTER_UPDATE(local_state._rows_returned_counter, block->rows()); if (eos) { source_state = SourceState::FINISHED; } return Status::OK(); } + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index aa20272d07..3d2b8157fa 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -108,6 +108,7 @@ public: private: vectorized::VExprContextSPtrs _output_expr_contexts; }; + class MultiCastDataStreamerSourceOperatorX final : public OperatorX { public: @@ -169,73 +170,5 @@ private: const RowDescriptor& _row_desc() { return _row_descriptor; } }; -// sink operator - -class MultiCastDataStreamSinkOperatorX; -class MultiCastDataStreamSinkLocalState final - : public PipelineXSinkLocalState { - ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState); - MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : Base(parent, state) {} - friend class MultiCastDataStreamSinkOperatorX; - friend class DataSinkOperatorX; - using Base = PipelineXSinkLocalState; - using Parent = MultiCastDataStreamSinkOperatorX; -}; - -class MultiCastDataStreamSinkOperatorX final - : public DataSinkOperatorX { - using Base = DataSinkOperatorX; - -public: - friend class UnionSinkLocalState; - MultiCastDataStreamSinkOperatorX(int sink_id, std::vector& sources, - const int cast_sender_count, ObjectPool* pool, - const TMultiCastDataStreamSink& sink, - const RowDescriptor& row_desc) - : Base(sink_id, sources), - _pool(pool), - _row_desc(row_desc), - _cast_sender_count(cast_sender_count) {} - ~MultiCastDataStreamSinkOperatorX() override = default; - Status init(const TDataSink& tsink) override { return Status::OK(); } - - Status open(doris::RuntimeState* state) override { return Status::OK(); }; - - Status prepare(RuntimeState* state) override { return Status::OK(); } - - Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override { - CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state); - SCOPED_TIMER(local_state.profile()->total_time_counter()); - COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { - COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - auto st = local_state._shared_state->_multi_cast_data_streamer->push( - state, in_block, source_state == SourceState::FINISHED); - // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished - if (st.template is()) { - return Status::OK(); - } - return st; - } - return Status::OK(); - } - - std::shared_ptr multi_cast_data_streamer() { - auto multi_cast_data_streamer = std::make_shared( - _row_desc, _pool, _cast_sender_count); - return multi_cast_data_streamer; - } - - RowDescriptor& row_desc() override { return _row_desc; } - -private: - ObjectPool* _pool; - RowDescriptor _row_desc; - int _cast_sender_count; - friend class MultiCastDataStreamSinkLocalState; -}; - } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 67a8ac6d4a..54a9603e7f 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -124,7 +124,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { return Status::OK(); } -std::shared_ptr UnionSourceLocalState::data_queue() { +std::shared_ptr UnionSourceLocalState::create_data_queue() { auto& p = _parent->cast(); std::shared_ptr data_queue = std::make_shared(p._child_size, _dependency); return data_queue; diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index a22a0e8c0a..d02176fc8d 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -78,7 +78,7 @@ public: UnionSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}; Status init(RuntimeState* state, LocalStateInfo& info) override; - std::shared_ptr data_queue(); + std::shared_ptr create_data_queue(); private: friend class UnionSourceOperatorX; diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 2ea0992beb..c69b49870d 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -111,7 +111,7 @@ protected: class WriteDependency : public Dependency { public: WriteDependency(int id, std::string name) : Dependency(id, name), _ready_for_write(true) {} - virtual ~WriteDependency() = default; + ~WriteDependency() override = default; bool is_write_dependency() override { return true; } @@ -428,7 +428,7 @@ private: struct MultiCastSharedState { public: - std::shared_ptr _multi_cast_data_streamer; + std::shared_ptr multi_cast_data_streamer; }; class MultiCastDependency final : public WriteDependency { @@ -438,7 +438,7 @@ public: ~MultiCastDependency() override = default; void* shared_state() override { return (void*)&_multi_cast_state; }; MultiCastDependency* can_read(const int consumer_id) { - if (_multi_cast_state._multi_cast_data_streamer->can_read(consumer_id)) { + if (_multi_cast_state.multi_cast_data_streamer->can_read(consumer_id)) { return nullptr; } else { return this; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index c6331b04fb..81bee3063d 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -37,6 +37,7 @@ #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/jdbc_table_sink_operator.h" #include "pipeline/exec/meta_scan_operator.h" +#include "pipeline/exec/multi_cast_data_stream_sink.h" #include "pipeline/exec/multi_cast_data_stream_source.h" #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h" @@ -257,20 +258,14 @@ Status DataSinkOperatorXBase::init(const TPlanNode& tnode, RuntimeState* state) return Status::OK(); } -template -Status DataSinkOperatorX::setup_local_state(RuntimeState* state, - LocalSinkStateInfo& info) { - auto local_state = LocalStateType::create_shared(this, state); - state->emplace_sink_local_state(id(), local_state); - return local_state->init(state, info); -} - template Status DataSinkOperatorX::setup_local_states( RuntimeState* state, std::vector& infos) { DCHECK(infos.size() == 1); for (auto& info : infos) { - RETURN_IF_ERROR(setup_local_state(state, info)); + auto local_state = LocalStateType::create_shared(this, state); + state->emplace_sink_local_state(id(), local_state); + RETURN_IF_ERROR(local_state->init(state, info)); } return Status::OK(); } @@ -279,12 +274,12 @@ template <> Status DataSinkOperatorX::setup_local_states( RuntimeState* state, std::vector& infos) { auto multi_cast_data_streamer = - static_cast(this)->multi_cast_data_streamer(); + static_cast(this)->create_multi_cast_data_streamer(); for (auto& info : infos) { auto local_state = MultiCastDataStreamSinkLocalState::create_shared(this, state); state->emplace_sink_local_state(id(), local_state); RETURN_IF_ERROR(local_state->init(state, info)); - local_state->_shared_state->_multi_cast_data_streamer = multi_cast_data_streamer; + local_state->_shared_state->multi_cast_data_streamer = multi_cast_data_streamer; } return Status::OK(); @@ -331,7 +326,7 @@ Status OperatorX::setup_local_states(RuntimeState* state, RETURN_IF_ERROR(local_state->init(state, info)); if (child_count != 0) { if (!data_queue) { - data_queue = local_state->data_queue(); + data_queue = local_state->create_data_queue(); } local_state->_shared_state->data_queue = data_queue; } diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 53a294412a..149d28265e 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -419,8 +419,6 @@ public: Status prepare(RuntimeState* state) override { return Status::OK(); } Status open(RuntimeState* state) override { return Status::OK(); } - virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0; - virtual Status setup_local_states(RuntimeState* state, std::vector& infos) = 0; @@ -529,8 +527,6 @@ public: : DataSinkOperatorXBase(id, sources) {} ~DataSinkOperatorX() override = default; - Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; - Status setup_local_states(RuntimeState* state, std::vector& infos) override; void get_dependency(std::vector& dependency) override; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 4dedbce44f..85a9527eeb 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -60,6 +60,7 @@ #include "pipeline/exec/jdbc_scan_operator.h" #include "pipeline/exec/jdbc_table_sink_operator.h" #include "pipeline/exec/meta_scan_operator.h" +#include "pipeline/exec/multi_cast_data_stream_sink.h" #include "pipeline/exec/multi_cast_data_stream_source.h" #include "pipeline/exec/nested_loop_join_build_operator.h" #include "pipeline/exec/nested_loop_join_probe_operator.h"