diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 34ba9e2449..bae9e1a8ec 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -52,9 +52,9 @@ template class AggSinkLocalState : public PipelineXSinkLocalState { public: using Base = PipelineXSinkLocalState; - virtual ~AggSinkLocalState() = default; + ~AggSinkLocalState() override = default; - virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; @@ -332,7 +332,7 @@ template class AggSinkOperatorX : public DataSinkOperatorX { public: AggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual ~AggSinkOperatorX() = default; + ~AggSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX::_name); @@ -343,8 +343,8 @@ public: Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - virtual Status sink(RuntimeState* state, vectorized::Block* in_block, - SourceState source_state) override; + Status sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state) override; using DataSinkOperatorX::id; diff --git a/be/src/pipeline/exec/aggregation_source_operator.h b/be/src/pipeline/exec/aggregation_source_operator.h index 274424f1f9..2d5ca45dec 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.h +++ b/be/src/pipeline/exec/aggregation_source_operator.h @@ -50,11 +50,12 @@ public: class AggSourceOperatorX; -class AggLocalState : public PipelineXLocalState { +class AggLocalState final : public PipelineXLocalState { public: using Base = PipelineXLocalState; ENABLE_FACTORY_CREATOR(AggLocalState); AggLocalState(RuntimeState* state, OperatorXBase* parent); + ~AggLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override; Status close(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 0b05491901..2c899ec49a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -120,7 +120,7 @@ public: : WriteDependency(id, "ChannelDependency"), _sender_id(sender_id), _local_recvr(local_recvr) {} - virtual ~ChannelDependency() = default; + ~ChannelDependency() override = default; void* shared_state() override { return nullptr; } @@ -150,7 +150,7 @@ private: vectorized::VDataStreamRecvr* _local_recvr; }; -class ExchangeSinkLocalState : public PipelineXSinkLocalState<> { +class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState); public: @@ -312,4 +312,4 @@ private: }; } // namespace pipeline -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index df6d44494e..a7e146b54d 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -50,7 +50,7 @@ public: bool is_pending_finish() const override; }; -struct ExchangeDataDependency : public Dependency { +struct ExchangeDataDependency final : public Dependency { public: ENABLE_FACTORY_CREATOR(ExchangeDataDependency); ExchangeDataDependency(int id, vectorized::VDataStreamRecvr::SenderQueue* sender_queue) @@ -89,7 +89,7 @@ private: }; class ExchangeSourceOperatorX; -class ExchangeLocalState : public PipelineXLocalState<> { +class ExchangeLocalState final : public PipelineXLocalState<> { ENABLE_FACTORY_CREATOR(ExchangeLocalState); ExchangeLocalState(RuntimeState* state, OperatorXBase* parent); @@ -123,12 +123,12 @@ public: SourceState& source_state) override; Status close(RuntimeState* state) override; - bool is_source() const override { return true; } + [[nodiscard]] bool is_source() const override { return true; } - RowDescriptor input_row_desc() const { return _input_row_desc; } + [[nodiscard]] RowDescriptor input_row_desc() const { return _input_row_desc; } - int num_senders() const { return _num_senders; } - bool is_merging() const { return _is_merging; } + [[nodiscard]] int num_senders() const { return _num_senders; } + [[nodiscard]] bool is_merging() const { return _is_merging; } std::shared_ptr sub_plan_query_statistics_recvr() { return _sub_plan_query_statistics_recvr; diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h index 361982dd97..55553df462 100644 --- a/be/src/pipeline/exec/join_build_sink_operator.h +++ b/be/src/pipeline/exec/join_build_sink_operator.h @@ -36,7 +36,7 @@ public: protected: JoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : PipelineXSinkLocalState(parent, state) {} - virtual ~JoinBuildSinkLocalState() = default; + ~JoinBuildSinkLocalState() override = default; template friend class JoinBuildSinkOperatorX; @@ -53,7 +53,7 @@ template class JoinBuildSinkOperatorX : public DataSinkOperatorX { public: JoinBuildSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual ~JoinBuildSinkOperatorX() = default; + ~JoinBuildSinkOperatorX() override = default; protected: void _init_join_op(); diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index e3ed33e4d9..1a7090b232 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -31,8 +31,8 @@ template class JoinProbeLocalState : public PipelineXLocalState { public: using Base = PipelineXLocalState; - virtual Status init(RuntimeState* state, LocalStateInfo& info) override; - virtual Status close(RuntimeState* state) override; + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status close(RuntimeState* state) override; virtual void add_tuple_is_null_column(vectorized::Block* block) = 0; protected: @@ -42,7 +42,7 @@ protected: : Base(state, parent), _child_block(vectorized::Block::create_unique()), _child_source_state(SourceState::DEPEND_ON_SOURCE) {} - virtual ~JoinProbeLocalState() = default; + ~JoinProbeLocalState() override = default; void _construct_mutable_join_block(); Status _build_output_block(vectorized::Block* origin_block, vectorized::Block* output_block, bool keep_origin = true); @@ -67,7 +67,7 @@ class JoinProbeOperatorX : public StatefulOperatorX { public: using Base = StatefulOperatorX; JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status init(const TPlanNode& tnode, RuntimeState* state) override; Status open(doris::RuntimeState* state) override; [[nodiscard]] const RowDescriptor& row_desc() override { return *_output_row_desc; } diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h index 1796716804..e92c5ff4e6 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h @@ -81,6 +81,7 @@ public: using Base = AggSinkLocalState; ENABLE_FACTORY_CREATOR(StreamingAggSinkLocalState); StreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state); + ~StreamingAggSinkLocalState() override = default; Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status close(RuntimeState* state, Status exec_status) override; @@ -108,6 +109,7 @@ private: class StreamingAggSinkOperatorX final : public AggSinkOperatorX { public: StreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~StreamingAggSinkOperatorX() override = default; Status init(const TPlanNode& tnode, RuntimeState* state) override; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override; diff --git a/be/src/pipeline/exec/table_function_operator.cpp b/be/src/pipeline/exec/table_function_operator.cpp index 3f6133c3fc..7ce7ac3410 100644 --- a/be/src/pipeline/exec/table_function_operator.cpp +++ b/be/src/pipeline/exec/table_function_operator.cpp @@ -21,6 +21,7 @@ #include "pipeline/exec/operator.h" #include "vec/core/block.h" +#include "vec/exprs/table_function/table_function_factory.h" namespace doris { class RuntimeState; @@ -41,4 +42,287 @@ Status TableFunctionOperator::close(doris::RuntimeState* state) { return StatefulOperator::close(state); } +TableFunctionLocalState::TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent) + : PipelineXLocalState<>(state, parent), _child_block(vectorized::Block::create_unique()) {} + +Status TableFunctionLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); + auto& p = _parent->cast(); + _vfn_ctxs.resize(p._vfn_ctxs.size()); + for (size_t i = 0; i < _vfn_ctxs.size(); i++) { + RETURN_IF_ERROR(p._vfn_ctxs[i]->clone(state, _vfn_ctxs[i])); + + const std::string& tf_name = _vfn_ctxs[i]->root()->fn().name.function_name; + vectorized::TableFunction* fn = nullptr; + RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(tf_name, state->obj_pool(), &fn)); + fn->set_expr_context(_vfn_ctxs[i]); + _fns.push_back(fn); + } + + _cur_child_offset = -1; + return Status::OK(); +} + +void TableFunctionLocalState::_copy_output_slots( + std::vector& columns) { + if (!_current_row_insert_times) { + return; + } + auto& p = _parent->cast(); + for (auto index : p._output_slot_indexs) { + auto src_column = _child_block->get_by_position(index).column; + columns[index]->insert_many_from(*src_column, _cur_child_offset, _current_row_insert_times); + } + _current_row_insert_times = 0; +} + +// Returns the index of fn of the last eos counted from back to front +// eg: there are 3 functions in `_fns` +// eos: false, true, true +// return: 1 +// +// eos: false, false, true +// return: 2 +// +// eos: false, false, false +// return: -1 +// +// eos: true, true, true +// return: 0 +// +// return: +// 0: all fns are eos +// -1: all fns are not eos +// >0: some of fns are eos +int TableFunctionLocalState::_find_last_fn_eos_idx() const { + for (int i = _parent->cast()._fn_num - 1; i >= 0; --i) { + if (!_fns[i]->eos()) { + if (i == _parent->cast()._fn_num - 1) { + return -1; + } else { + return i + 1; + } + } + } + // all eos + return 0; +} + +// Roll to reset the table function. +// Eg: +// There are 3 functions f1, f2 and f3 in `_fns`. +// If `last_eos_idx` is 1, which means f2 and f3 are eos. +// So we need to forward f1, and reset f2 and f3. +bool TableFunctionLocalState::_roll_table_functions(int last_eos_idx) { + int i = last_eos_idx - 1; + for (; i >= 0; --i) { + _fns[i]->forward(); + if (!_fns[i]->eos()) { + break; + } + } + if (i == -1) { + // after forward, all functions are eos. + // we should process next child row to get more table function results. + return false; + } + + for (int j = i + 1; j < _parent->cast()._fn_num; ++j) { + _fns[j]->reset(); + } + + return true; +} + +bool TableFunctionLocalState::_is_inner_and_empty() { + for (int i = 0; i < _parent->cast()._fn_num; i++) { + // if any table function is not outer and has empty result, go to next child row + if (!_fns[i]->is_outer() && _fns[i]->current_empty()) { + return true; + } + } + return false; +} + +Status TableFunctionLocalState::get_expanded_block(RuntimeState* state, + vectorized::Block* output_block, + SourceState& source_state) { + auto& p = _parent->cast(); + vectorized::MutableBlock m_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( + output_block, p._output_slots); + vectorized::MutableColumns& columns = m_block.mutable_columns(); + + for (int i = 0; i < p._fn_num; i++) { + if (columns[i + p._child_slots.size()]->is_nullable()) { + _fns[i]->set_nullable(); + } + } + + while (columns[p._child_slots.size()]->size() < state->batch_size()) { + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(state->check_query_state("VTableFunctionNode, while getting next batch.")); + + if (_child_block->rows() == 0) { + break; + } + + bool skip_child_row = false; + while (columns[p._child_slots.size()]->size() < state->batch_size()) { + int idx = _find_last_fn_eos_idx(); + if (idx == 0 || skip_child_row) { + _copy_output_slots(columns); + // all table functions' results are exhausted, process next child row. + RETURN_IF_ERROR(process_next_child_row()); + if (_cur_child_offset == -1) { + break; + } + } else if (idx < p._fn_num && idx != -1) { + // some of table functions' results are exhausted. + if (!_roll_table_functions(idx)) { + // continue to process next child row. + continue; + } + } + + // if any table function is not outer and has empty result, go to next child row + if (skip_child_row = _is_inner_and_empty(); skip_child_row) { + continue; + } + if (p._fn_num == 1) { + _current_row_insert_times += _fns[0]->get_value( + columns[p._child_slots.size()], + state->batch_size() - columns[p._child_slots.size()]->size()); + } else { + for (int i = 0; i < p._fn_num; i++) { + _fns[i]->get_value(columns[i + p._child_slots.size()]); + } + _current_row_insert_times++; + _fns[p._fn_num - 1]->forward(); + } + } + } + + _copy_output_slots(columns); + + size_t row_size = columns[p._child_slots.size()]->size(); + for (auto index : p._useless_slot_indexs) { + columns[index]->insert_many_defaults(row_size - columns[index]->size()); + } + + // 3. eval conjuncts + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block, + output_block->columns())); + + if (_child_source_state == SourceState::FINISHED && _cur_child_offset == -1) { + source_state = SourceState::FINISHED; + } + return Status::OK(); +} + +Status TableFunctionLocalState::process_next_child_row() { + _cur_child_offset++; + + if (_cur_child_offset >= _child_block->rows()) { + // release block use count. + for (vectorized::TableFunction* fn : _fns) { + RETURN_IF_ERROR(fn->process_close()); + } + + _child_block->clear_column_data(_parent->cast() + ._child_x->row_desc() + .num_materialized_slots()); + _cur_child_offset = -1; + return Status::OK(); + } + + for (vectorized::TableFunction* fn : _fns) { + RETURN_IF_ERROR(fn->process_row(_cur_child_offset)); + } + + return Status::OK(); +} + +TableFunctionOperatorX::TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : Base(pool, tnode, descs) {} + +Status TableFunctionOperatorX::_prepare_output_slot_ids(const TPlanNode& tnode) { + // Prepare output slot ids + if (tnode.table_function_node.outputSlotIds.empty()) { + return Status::InternalError("Output slots of table function node is empty"); + } + SlotId max_id = -1; + for (auto slot_id : tnode.table_function_node.outputSlotIds) { + if (slot_id > max_id) { + max_id = slot_id; + } + } + _output_slot_ids = std::vector(max_id + 1, false); + for (auto slot_id : tnode.table_function_node.outputSlotIds) { + _output_slot_ids[slot_id] = true; + } + + return Status::OK(); +} + +Status TableFunctionOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(Base::init(tnode, state)); + + for (const TExpr& texpr : tnode.table_function_node.fnCallExprList) { + vectorized::VExprContextSPtr ctx; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(texpr, ctx)); + _vfn_ctxs.push_back(ctx); + + auto root = ctx->root(); + const std::string& tf_name = root->fn().name.function_name; + vectorized::TableFunction* fn = nullptr; + RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(tf_name, _pool, &fn)); + fn->set_expr_context(ctx); + _fns.push_back(fn); + } + _fn_num = _fns.size(); + + // Prepare output slot ids + RETURN_IF_ERROR(_prepare_output_slot_ids(tnode)); + return Status::OK(); +} + +Status TableFunctionOperatorX::prepare(RuntimeState* state) { + RETURN_IF_ERROR(Base::prepare(state)); + + for (auto fn : _fns) { + RETURN_IF_ERROR(fn->prepare()); + } + RETURN_IF_ERROR(vectorized::VExpr::prepare(_vfn_ctxs, state, _row_descriptor)); + + // get current all output slots + for (const auto& tuple_desc : _row_descriptor.tuple_descriptors()) { + for (const auto& slot_desc : tuple_desc->slots()) { + _output_slots.push_back(slot_desc); + } + } + + // get all input slots + for (const auto& child_tuple_desc : _child_x->row_desc().tuple_descriptors()) { + for (const auto& child_slot_desc : child_tuple_desc->slots()) { + _child_slots.push_back(child_slot_desc); + } + } + + for (size_t i = 0; i < _child_slots.size(); i++) { + if (_slot_need_copy(i)) { + _output_slot_indexs.push_back(i); + } else { + _useless_slot_indexs.push_back(i); + } + } + + return Status::OK(); +} + +Status TableFunctionOperatorX::open(doris::RuntimeState* state) { + RETURN_IF_ERROR(Base::open(state)); + return vectorized::VExpr::open(_vfn_ctxs, state); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/table_function_operator.h b/be/src/pipeline/exec/table_function_operator.h index a4b4e77141..541938ec72 100644 --- a/be/src/pipeline/exec/table_function_operator.h +++ b/be/src/pipeline/exec/table_function_operator.h @@ -21,6 +21,7 @@ #include "common/status.h" #include "operator.h" +#include "pipeline/pipeline_x/operator.h" #include "vec/exec/vtable_function_node.h" namespace doris { @@ -45,4 +46,115 @@ public: Status close(RuntimeState* state) override; }; + +class TableFunctionOperatorX; +class TableFunctionLocalState final : public PipelineXLocalState<> { +public: + using Parent = TableFunctionOperatorX; + ENABLE_FACTORY_CREATOR(TableFunctionLocalState); + TableFunctionLocalState(RuntimeState* state, OperatorXBase* parent); + ~TableFunctionLocalState() override = default; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + Status process_next_child_row(); + Status get_expanded_block(RuntimeState* state, vectorized::Block* output_block, + SourceState& source_state); + +private: + friend class TableFunctionOperatorX; + friend class StatefulOperatorX; + + void _copy_output_slots(std::vector& columns); + bool _roll_table_functions(int last_eos_idx); + // return: + // 0: all fns are eos + // -1: all fns are not eos + // >0: some of fns are eos + int _find_last_fn_eos_idx() const; + bool _is_inner_and_empty(); + + std::vector _fns; + vectorized::VExprContextSPtrs _vfn_ctxs; + int64_t _cur_child_offset = 0; + std::unique_ptr _child_block; + int _current_row_insert_times = 0; + SourceState _child_source_state; +}; + +class TableFunctionOperatorX final : public StatefulOperatorX { +public: + using Base = StatefulOperatorX; + TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + Status init(const TPlanNode& tnode, RuntimeState* state) override; + Status prepare(doris::RuntimeState* state) override; + Status open(doris::RuntimeState* state) override; + + bool need_more_input_data(RuntimeState* state) const override { + auto& local_state = state->get_local_state(id())->cast(); + return !local_state._child_block->rows() && + local_state._child_source_state != SourceState::FINISHED; + } + + Status push(RuntimeState* state, vectorized::Block* input_block, + SourceState source_state) const override { + CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state); + if (input_block->rows() == 0) { + return Status::OK(); + } + + for (auto* fn : local_state._fns) { + RETURN_IF_ERROR(fn->process_init(input_block, state)); + } + RETURN_IF_ERROR(local_state.process_next_child_row()); + return Status::OK(); + } + + Status pull(RuntimeState* state, vectorized::Block* output_block, + SourceState& source_state) const override { + CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state); + RETURN_IF_ERROR(local_state.get_expanded_block(state, output_block, source_state)); + local_state.reached_limit(output_block, source_state); + return Status::OK(); + } + +private: + friend class TableFunctionLocalState; + + Status _prepare_output_slot_ids(const TPlanNode& tnode); + + /* Now the output tuples for table function node is base_table_tuple + tf1 + tf2 + ... + But not all slots are used, the real used slots are inside table_function_node.outputSlotIds. + For case like explode_bitmap: + SELECT a2,count(*) as a3 FROM A WHERE a1 IN + (SELECT c1 FROM B LATERAL VIEW explode_bitmap(b1) C as c1) + GROUP BY a2 ORDER BY a3; + Actually we only need to output column c1, no need to output columns in bitmap table B. + Copy large bitmap columns are very expensive and slow. + + Here we check if the slot is really used, otherwise we avoid copy it and just insert a default value. + + A better solution is: + 1. FE: create a new output tuple based on the real output slots; + 2. BE: refractor (V)TableFunctionNode output rows based no the new tuple; + */ + [[nodiscard]] inline bool _slot_need_copy(SlotId slot_id) const { + auto id = _output_slots[slot_id]->id(); + return (id < _output_slot_ids.size()) && (_output_slot_ids[id]); + } + + std::vector _child_slots; + std::vector _output_slots; + + vectorized::VExprContextSPtrs _vfn_ctxs; + + std::vector _fns; + int _fn_num = 0; + + std::vector _output_slot_ids; + std::vector _output_slot_indexs; + std::vector _useless_slot_indexs; + + std::vector _child_slot_sizes; +}; + } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index b29aaeb00b..c67ef6d29e 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -112,11 +112,11 @@ public: RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); } - const RowDescriptor& output_row_desc() const { - return operatorXs[operatorXs.size() - 1]->row_desc(); + [[nodiscard]] const RowDescriptor& output_row_desc() const { + return operatorXs.back()->row_desc(); } - PipelineId id() const { return _pipeline_id; } + [[nodiscard]] PipelineId id() const { return _pipeline_id; } private: void _init_profile(); diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index a9490bd967..068551da90 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -45,6 +45,7 @@ #include "pipeline/exec/sort_source_operator.h" #include "pipeline/exec/streaming_aggregation_sink_operator.h" #include "pipeline/exec/streaming_aggregation_source_operator.h" +#include "pipeline/exec/table_function_operator.h" #include "pipeline/exec/union_sink_operator.h" #include "pipeline/exec/union_source_operator.h" #include "util/debug_util.h" @@ -394,6 +395,7 @@ DECLARE_OPERATOR_X(OlapScanLocalState) DECLARE_OPERATOR_X(AnalyticLocalState) DECLARE_OPERATOR_X(SortLocalState) DECLARE_OPERATOR_X(AggLocalState) +DECLARE_OPERATOR_X(TableFunctionLocalState) DECLARE_OPERATOR_X(ExchangeLocalState) DECLARE_OPERATOR_X(RepeatLocalState) DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState) @@ -411,6 +413,7 @@ template class StreamingOperatorX; template class StatefulOperatorX; template class StatefulOperatorX; template class StatefulOperatorX; +template class StatefulOperatorX; template class PipelineXSinkLocalState; template class PipelineXSinkLocalState; diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index f219b68f2e..92f2b0b9da 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -94,7 +94,7 @@ public: // If use projection, we should clear `_origin_block`. void clear_origin_block(); - bool reached_limit() const; + [[nodiscard]] bool reached_limit() const; void reached_limit(vectorized::Block* block, SourceState& source_state); RuntimeProfile* profile() { return _runtime_profile.get(); } @@ -114,7 +114,7 @@ public: void add_num_rows_returned(int64_t delta) { _num_rows_returned += delta; } void set_num_rows_returned(int64_t value) { _num_rows_returned = value; } - virtual std::string debug_string(int indentation_level = 0) const; + [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const; protected: friend class OperatorXBase; @@ -181,9 +181,9 @@ public: } [[nodiscard]] std::string get_name() const override { return _op_name; } - virtual Status prepare(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; - virtual Status open(RuntimeState* state) override; + Status open(RuntimeState* state) override; Status finalize(RuntimeState* state) override { return Status::OK(); } @@ -201,7 +201,7 @@ public: return false; } - bool is_pending_finish() const override { + [[nodiscard]] bool is_pending_finish() const override { LOG(FATAL) << "should not reach here!"; return false; } @@ -228,7 +228,7 @@ public: return _row_descriptor; } - std::string debug_string() const override { return ""; } + [[nodiscard]] std::string debug_string() const override { return ""; } virtual std::string debug_string(int indentation_level = 0) const; @@ -305,7 +305,7 @@ public: OperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : OperatorXBase(pool, tnode, descs) {} OperatorX(ObjectPool* pool, int id) : OperatorXBase(pool, id) {}; - virtual ~OperatorX() = default; + ~OperatorX() override = default; Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override; Status setup_local_states(RuntimeState* state, std::vector& info) override; @@ -317,7 +317,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase { public: PipelineXLocalState(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalStateBase(state, parent) {} - virtual ~PipelineXLocalState() {} + ~PipelineXLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override { _runtime_profile.reset(new RuntimeProfile(_parent->get_name() + @@ -399,7 +399,7 @@ public: virtual Status close(RuntimeState* state, Status exec_status) = 0; virtual Status try_close(RuntimeState* state, Status exec_status) = 0; - virtual std::string debug_string(int indentation_level) const; + [[nodiscard]] virtual std::string debug_string(int indentation_level) const; template TARGET& cast() { @@ -421,7 +421,9 @@ public: RuntimeProfile* profile() { return _profile; } MemTracker* mem_tracker() { return _mem_tracker.get(); } QueryStatistics* query_statistics() { return _query_statistics.get(); } - RuntimeProfile* faker_runtime_profile() const { return _faker_runtime_profile.get(); } + [[nodiscard]] RuntimeProfile* faker_runtime_profile() const { + return _faker_runtime_profile.get(); + } RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; } @@ -508,7 +510,7 @@ public: return false; } - bool is_pending_finish() const override { + [[nodiscard]] bool is_pending_finish() const override { LOG(FATAL) << "should not reach here!"; return false; } @@ -519,9 +521,10 @@ public: [[nodiscard]] std::string debug_string() const override { return ""; } - virtual std::string debug_string(int indentation_level) const; + [[nodiscard]] virtual std::string debug_string(int indentation_level) const; - virtual std::string debug_string(RuntimeState* state, int indentation_level) const; + [[nodiscard]] virtual std::string debug_string(RuntimeState* state, + int indentation_level) const; [[nodiscard]] bool is_sink() const override { return true; } @@ -531,7 +534,7 @@ public: return state->get_sink_local_state(id())->close(state, exec_status); } - virtual Status try_close(RuntimeState* state, Status exec_status) { + [[nodiscard]] virtual Status try_close(RuntimeState* state, Status exec_status) { return state->get_sink_local_state(id())->try_close(state, exec_status); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index f8ea574c7c..e8dc9f3f29 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -70,6 +70,7 @@ #include "pipeline/exec/sort_source_operator.h" #include "pipeline/exec/streaming_aggregation_sink_operator.h" #include "pipeline/exec/streaming_aggregation_source_operator.h" +#include "pipeline/exec/table_function_operator.h" #include "pipeline/exec/union_sink_operator.h" #include "pipeline/exec/union_source_operator.h" #include "pipeline/task_scheduler.h" @@ -740,6 +741,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN RETURN_IF_ERROR(cur_pipe->add_operator(op)); break; } + case TPlanNodeType::TABLE_FUNCTION_NODE: { + op.reset(new TableFunctionOperatorX(pool, tnode, descs)); + RETURN_IF_ERROR(cur_pipe->add_operator(op)); + break; + } case TPlanNodeType::ASSERT_NUM_ROWS_NODE: { op.reset(new AssertNumRowsOperatorX(pool, tnode, descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op));