diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index d65e8f96bd..ac6e7eae9a 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -25,8 +25,8 @@ #include #include +#include #include -#include #include #include "common/compiler_util.h" // IWYU pragma: keep @@ -84,16 +84,10 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl _tuple_ids(tnode.row_tuples), _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _resource_profile(tnode.resource_profile), - _limit(tnode.limit), - _num_rows_returned(0), - _rows_returned_counter(nullptr), - _rows_returned_rate(nullptr), - _memory_used_counter(nullptr), - _peak_memory_usage_counter(nullptr), - _is_closed(false), - _ref(0) { + _limit(tnode.limit) { if (tnode.__isset.output_tuple_id) { - _output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true})); + _output_row_descriptor = std::make_unique( + descs, std::vector {tnode.output_tuple_id}, std::vector {true}); } _query_statistics = std::make_shared(); } @@ -108,7 +102,7 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct, context)); _conjuncts.emplace_back(context); } else if (tnode.__isset.conjuncts) { - for (auto& conjunct : tnode.conjuncts) { + for (const auto& conjunct : tnode.conjuncts) { vectorized::VExprContextSPtr context; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(conjunct, context)); _conjuncts.emplace_back(context); @@ -136,8 +130,9 @@ Status ExecNode::prepare(RuntimeState* state) { _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime"); _rows_returned_rate = runtime_profile()->add_derived_counter( ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, - std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, - runtime_profile()->total_time_counter()), + [this, capture0 = runtime_profile()->total_time_counter()] { + return RuntimeProfile::units_per_second(_rows_returned_counter, capture0); + }, ""); _memory_used_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage"); _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( @@ -150,13 +145,13 @@ Status ExecNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc())); - for (int i = 0; i < _children.size(); ++i) { - RETURN_IF_ERROR(_children[i]->prepare(state)); + for (auto& i : _children) { + RETURN_IF_ERROR(i->prepare(state)); } return Status::OK(); } -Status ExecNode::alloc_resource(doris::RuntimeState* state) { +Status ExecNode::alloc_resource(RuntimeState* state) { for (auto& conjunct : _conjuncts) { RETURN_IF_ERROR(conjunct->open(state)); } @@ -170,8 +165,8 @@ Status ExecNode::open(RuntimeState* state) { Status ExecNode::reset(RuntimeState* state) { _num_rows_returned = 0; - for (int i = 0; i < _children.size(); ++i) { - RETURN_IF_ERROR(_children[i]->reset(state)); + for (auto& i : _children) { + RETURN_IF_ERROR(i->reset(state)); } return Status::OK(); } @@ -199,8 +194,8 @@ Status ExecNode::close(RuntimeState* state) { _is_closed = true; Status result; - for (int i = 0; i < _children.size(); ++i) { - auto st = _children[i]->close(state); + for (auto& i : _children) { + auto st = i->close(state); if (result.ok() && !st.ok()) { result = st; } @@ -227,7 +222,7 @@ void ExecNode::add_runtime_exec_option(const std::string& str) { Status ExecNode::create_tree(RuntimeState* state, ObjectPool* pool, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) { - if (plan.nodes.size() == 0) { + if (plan.nodes.empty()) { *root = nullptr; return Status::OK(); } @@ -305,6 +300,7 @@ Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool, return Status::OK(); } +// NOLINTBEGIN(readability-function-size) Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, ExecNode** node) { VLOG_CRITICAL << "tnode:\n" << apache::thrift::ThriftDebugString(tnode); @@ -428,8 +424,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); default: - std::map::const_iterator i = - _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); + auto i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); const char* str = "unknown node type"; if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) { @@ -443,6 +438,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); } +// NOLINTEND(readability-function-size) std::string ExecNode::debug_string() const { std::stringstream out; @@ -459,9 +455,9 @@ void ExecNode::debug_string(int indentation_level, std::stringstream* out) const } *out << "]"; - for (int i = 0; i < _children.size(); ++i) { + for (auto* i : _children) { *out << "\n"; - _children[i]->debug_string(indentation_level + 1, out); + i->debug_string(indentation_level + 1, out); } } @@ -470,8 +466,8 @@ void ExecNode::collect_nodes(TPlanNodeType::type node_type, std::vectorpush_back(this); } - for (int i = 0; i < _children.size(); ++i) { - _children[i]->collect_nodes(node_type, nodes); + for (auto& i : _children) { + i->collect_nodes(node_type, nodes); } } @@ -488,7 +484,7 @@ void ExecNode::collect_scan_nodes(vector* nodes) { void ExecNode::init_runtime_profile(const std::string& name) { std::stringstream ss; ss << name << " (id=" << _id << ")"; - _runtime_profile.reset(new RuntimeProfile(ss.str())); + _runtime_profile = std::make_unique(ss.str()); _runtime_profile->set_metadata(_id); } diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index f4b49cba6f..903122ecde 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -21,10 +21,10 @@ #pragma once #include -#include -#include #include +#include +#include #include #include #include @@ -267,7 +267,7 @@ protected: const TBackendResourceProfile _resource_profile; int64_t _limit; // -1: no limit - int64_t _num_rows_returned; + int64_t _num_rows_returned = 0; std::unique_ptr _runtime_profile; @@ -303,15 +303,6 @@ protected: bool is_closed() const { return _is_closed; } - // TODO(zc) - /// Pointer to the containing SubplanNode or nullptr if not inside a subplan. - /// Set by SubplanNode::Init(). Not owned. - // SubplanNode* containing_subplan_; - - /// Returns true if this node is inside the right-hand side plan tree of a SubplanNode. - /// Valid to call in or after Prepare(). - bool is_in_subplan() const { return false; } - // Create a single exec node derived from thrift node; place exec node in 'pool'. static Status create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, ExecNode** node); @@ -334,9 +325,9 @@ private: ExecNode** root); friend class pipeline::OperatorBase; - bool _is_closed; + bool _is_closed = false; bool _is_resource_released = false; - std::atomic_int _ref; // used by pipeline operator to release resource. + std::atomic_int _ref = 0; // used by pipeline operator to release resource. }; } // namespace doris diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index a4f3ff55a5..6ac06ee5f1 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -17,13 +17,11 @@ #include "multi_cast_data_stream_source.h" -#include - #include "common/status.h" #include "pipeline/exec/multi_cast_data_streamer.h" #include "pipeline/exec/operator.h" -#include "runtime/query_statistics.h" #include "vec/core/block.h" +#include "vec/core/materialize_block.h" namespace doris::pipeline { @@ -108,7 +106,7 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto if (!_output_expr_contexts.empty() && output_block->rows() > 0) { RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( _output_expr_contexts, *output_block, block, true)); - materialize_block_inplace(*block); + vectorized::materialize_block_inplace(*block); } if (eos) { source_state = SourceState::FINISHED; @@ -176,7 +174,7 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state, if (!local_state._output_expr_contexts.empty() && output_block->rows() > 0) { RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( local_state._output_expr_contexts, *output_block, block, true)); - materialize_block_inplace(*block); + vectorized::materialize_block_inplace(*block); } COUNTER_UPDATE(local_state._rows_returned_counter, block->rows()); if (eos) { diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index cd5fba5fee..bf41c670e0 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -19,20 +19,16 @@ #include #include -#include +#include #include #include -#include #include -#include #include #include #include "common/status.h" #include "exec/exec_node.h" -#include "pipeline/pipeline_x/dependency.h" -#include "runtime/memory/mem_tracker.h" #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "vec/core/block.h" @@ -105,7 +101,7 @@ using OperatorBuilders = std::vector; class OperatorBuilderBase { public: - OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), _name(name) {} + OperatorBuilderBase(int32_t id, std::string name) : _id(id), _name(std::move(name)) {} virtual ~OperatorBuilderBase() = default; @@ -333,10 +329,7 @@ public: return Status::OK(); } - Status open(RuntimeState* state) override { - RETURN_IF_ERROR(_node->alloc_resource(state)); - return Status::OK(); - } + Status open(RuntimeState* state) override { return _node->alloc_resource(state); } Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { @@ -413,8 +406,7 @@ class StatefulOperator : public StreamingOperator { public: StatefulOperator(OperatorBuilderBase* builder, ExecNode* node) : StreamingOperator(builder, node), - _child_block(vectorized::Block::create_shared()), - _child_source_state(SourceState::DEPEND_ON_SOURCE) {} + _child_block(vectorized::Block::create_shared()) {} virtual ~StatefulOperator() = default; @@ -454,7 +446,7 @@ public: protected: std::shared_ptr _child_block; - SourceState _child_source_state; + SourceState _child_source_state {SourceState::DEPEND_ON_SOURCE}; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 6c18cab03f..cb106d76ed 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -21,6 +21,7 @@ #include "pipeline/exec/operator.h" #include "vec/common/hash_table/hash_table_set_build.h" +#include "vec/core/materialize_block.h" #include "vec/exec/vset_operation_node.h" namespace doris { @@ -139,10 +140,10 @@ Status SetSinkOperatorX::_extract_build_column( block.get_by_position(result_col_id).column = block.get_by_position(result_col_id).column->convert_to_full_column_if_const(); - auto column = block.get_by_position(result_col_id).column.get(); + const auto* column = block.get_by_position(result_col_id).column.get(); - if (auto* nullable = check_and_get_column(*column)) { - auto& col_nested = nullable->get_nested_column(); + if (const auto* nullable = check_and_get_column(*column)) { + const auto& col_nested = nullable->get_nested_column(); if (local_state._shared_state->build_not_ignore_null[i]) { raw_ptrs[i] = nullable; } else { @@ -165,7 +166,7 @@ Status SetSinkLocalState::init(RuntimeState* state, LocalSinkState SCOPED_TIMER(_open_timer); _build_timer = ADD_TIMER(_profile, "BuildTime"); - Parent& parent = _parent->cast(); + auto& parent = _parent->cast(); _dependency->set_cur_child_id(parent._cur_child_id); _child_exprs.resize(parent._child_exprs.size()); for (size_t i = 0; i < _child_exprs.size(); i++) { @@ -175,16 +176,15 @@ Status SetSinkLocalState::init(RuntimeState* state, LocalSinkState _shared_state->child_quantity = parent._child_quantity; auto& child_exprs_lists = _shared_state->child_exprs_lists; - DCHECK(child_exprs_lists.size() == 0 || child_exprs_lists.size() == parent._child_quantity); - if (child_exprs_lists.size() == 0) { + DCHECK(child_exprs_lists.empty() || child_exprs_lists.size() == parent._child_quantity); + if (child_exprs_lists.empty()) { child_exprs_lists.resize(parent._child_quantity); } child_exprs_lists[parent._cur_child_id] = _child_exprs; _shared_state->hash_table_variants = std::make_unique(); - for (int i = 0; i < child_exprs_lists[0].size(); ++i) { - const auto& ctx = child_exprs_lists[0][i]; + for (const auto& ctx : child_exprs_lists[0]) { _shared_state->build_not_ignore_null.push_back(ctx->root()->is_nullable()); } _shared_state->hash_table_init(); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index b9c2382ce8..538a2ce1bd 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -22,13 +22,15 @@ #include #include #include -#include + +#include // IWYU pragma: no_include #include #include #include // IWYU pragma: keep #include +#include #include #include #include @@ -212,8 +214,7 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { return pipeline; } -Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request, - const size_t idx) { +Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request, size_t idx) { if (_prepared) { return Status::InternalError("Already prepared"); } @@ -299,16 +300,16 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re << local_params.per_node_scan_ranges.size(); // set scan range in ScanNode - for (int i = 0; i < scan_nodes.size(); ++i) { + for (auto& i : scan_nodes) { // TODO(cmy): this "if...else" should be removed once all ScanNode are derived from VScanNode. - ExecNode* node = scan_nodes[i]; + ExecNode* node = i; if (typeid(*node) == typeid(vectorized::NewOlapScanNode) || typeid(*node) == typeid(vectorized::NewFileScanNode) || typeid(*node) == typeid(vectorized::NewOdbcScanNode) || typeid(*node) == typeid(vectorized::NewEsScanNode) || typeid(*node) == typeid(vectorized::VMetaScanNode) || typeid(*node) == typeid(vectorized::NewJdbcScanNode)) { - auto* scan_node = static_cast(scan_nodes[i]); + auto* scan_node = static_cast(i); auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); const bool shared_scan = @@ -316,7 +317,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges); scan_node->set_shared_scan(_runtime_state.get(), shared_scan); } else { - ScanNode* scan_node = static_cast(node); + auto* scan_node = static_cast(node); auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); static_cast(scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges)); @@ -850,10 +851,10 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr {false}) : sink_->row_desc(); // 1. create the data stream sender sink - _multi_cast_stream_sink_senders[i].reset(new vectorized::VDataStreamSender( + _multi_cast_stream_sink_senders[i] = std::make_unique( _runtime_state.get(), _runtime_state->obj_pool(), sender_id, row_desc, thrift_sink.multi_cast_stream_sink.sinks[i], - thrift_sink.multi_cast_stream_sink.destinations[i])); + thrift_sink.multi_cast_stream_sink.destinations[i]); // 2. create and set the source operator of multi_cast_data_stream_source for new pipeline OperatorBuilderPtr source_op = @@ -941,9 +942,10 @@ Status PipelineFragmentContext::send_report(bool done) { _fragment_instance_id, _backend_num, _runtime_state.get(), - std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), - std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, - std::placeholders::_2), + [this](auto&& PH1) { return update_status(std::forward(PH1)); }, + [this](auto&& PH1, auto&& PH2) { + cancel(std::forward(PH1), std::forward(PH2)); + }, _query_ctx->get_query_statistics()}, std::dynamic_pointer_cast(shared_from_this())); } diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 353e7a0658..a7a45d8f07 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -67,7 +67,7 @@ public: const std::function& call_back, const report_status_callback& report_status_cb); - virtual ~PipelineFragmentContext(); + ~PipelineFragmentContext() override; PipelinePtr add_pipeline(); @@ -89,7 +89,7 @@ public: int32_t next_operator_builder_id() { return _next_operator_builder_id++; } - Status prepare(const doris::TPipelineFragmentParams& request, const size_t idx); + Status prepare(const doris::TPipelineFragmentParams& request, size_t idx); virtual Status prepare(const doris::TPipelineFragmentParams& request) { return Status::InternalError("Pipeline fragment context do not implement prepare"); diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index a86d5ed90b..83d059ba20 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -26,6 +26,7 @@ #include #include +#include "common/status.h" #include "gutil/integral_types.h" #include "olap/lru_cache.h" #include "olap/olap_tuple.h" @@ -56,21 +57,22 @@ Status Reusable::init(const TDescriptorTable& t_desc_tbl, const std::vectorobj_pool(), t_desc_tbl, &_desc_tbl)); _runtime_state->set_desc_tbl(_desc_tbl); _block_pool.resize(block_size); - for (int i = 0; i < _block_pool.size(); ++i) { - _block_pool[i] = vectorized::Block::create_unique(tuple_desc()->slots(), 2); + for (auto& i : _block_pool) { + i = vectorized::Block::create_unique(tuple_desc()->slots(), 2); // Name is useless but cost space - _block_pool[i]->clear_names(); + i->clear_names(); } RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(output_exprs, _output_exprs_ctxs)); RowDescriptor row_desc(tuple_desc(), false); // Prepare the exprs to run. RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs, _runtime_state.get(), row_desc)); + RETURN_IF_ERROR(vectorized::VExpr::open(_output_exprs_ctxs, _runtime_state.get())); _create_timestamp = butil::gettimeofday_ms(); _data_type_serdes = vectorized::create_data_type_serdes(tuple_desc()->slots()); _col_default_values.resize(tuple_desc()->slots().size()); for (int i = 0; i < tuple_desc()->slots().size(); ++i) { - auto slot = tuple_desc()->slots()[i]; + auto* slot = tuple_desc()->slots()[i]; _col_uid_to_idx[slot->col_unique_id()] = i; _col_default_values[i] = slot->col_default_value(); } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 426de2d4f7..8b0008d6e2 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -23,9 +23,7 @@ #include "vec/columns/column_const.h" #include "vec/common/arena.h" #include "vec/common/assert_cast.h" -#include "vec/common/nan_utils.h" #include "vec/common/sip_hash.h" -#include "vec/common/typeid_cast.h" #include "vec/core/sort_block.h" #include "vec/data_types/data_type.h" #include "vec/utils/util.hpp" @@ -571,7 +569,9 @@ bool ColumnNullable::has_null(size_t size) const { } ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable) { - if (is_column_nullable(*column)) return column; + if (is_column_nullable(*column)) { + return column; + } if (is_column_const(*column)) { return ColumnConst::create( diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 83cbe82e32..91128fb69a 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -32,7 +32,6 @@ #include "olap/olap_common.h" #include "runtime/define_primitive_type.h" #include "vec/columns/column.h" -#include "vec/columns/column_impl.h" #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" #include "vec/common/assert_cast.h" @@ -77,8 +76,7 @@ public: null_map_->assume_mutable()); } - template ::value>::type> + template ::value>> static MutablePtr create(Args&&... args) { return Base::create(std::forward(args)...); } diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index a44bc5513a..c918e26e6f 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -23,7 +23,6 @@ #include #include #include -#include #include "common/status.h" #include "exec/exec_node.h" @@ -57,22 +56,20 @@ class VJoinNodeBase : public ExecNode { public: VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - virtual Status prepare(RuntimeState* state) override; + Status prepare(RuntimeState* state) override; - virtual Status close(RuntimeState* state) override; + Status close(RuntimeState* state) override; - virtual Status open(RuntimeState* state) override; + Status open(RuntimeState* state) override; - virtual const RowDescriptor& row_desc() const override { return *_output_row_desc; } + const RowDescriptor& row_desc() const override { return *_output_row_desc; } - virtual const RowDescriptor& intermediate_row_desc() const override { - return *_intermediate_row_desc; - } + const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; } - virtual Status alloc_resource(RuntimeState* state) override; - virtual void release_resource(RuntimeState* state) override; + Status alloc_resource(RuntimeState* state) override; + void release_resource(RuntimeState* state) override; - virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; [[nodiscard]] bool can_terminate_early() override { return _short_circuit_for_probe; } diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 4f4634fbd6..3c47638ef4 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -22,8 +22,6 @@ #include #include -#include -#include #include #include #include @@ -31,12 +29,9 @@ #include "runtime/define_primitive_type.h" #include "runtime/runtime_state.h" -#include "util/defer_op.h" #include "vec/columns/column_nullable.h" -#include "vec/common/columns_hashing.h" #include "vec/common/hash_table/hash_table_set_build.h" #include "vec/common/hash_table/hash_table_set_probe.h" -#include "vec/common/uint128.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/materialize_block.h" #include "vec/core/types.h" @@ -100,6 +95,8 @@ Status VSetOperationNode::init(const TPlanNode& tnode, RuntimeStat template Status VSetOperationNode::alloc_resource(RuntimeState* state) { SCOPED_TIMER(_exec_timer); + // will open projections + RETURN_IF_ERROR(ExecNode::alloc_resource(state)); // open result expr lists. for (const VExprContextSPtrs& exprs : _child_expr_lists) { RETURN_IF_ERROR(VExpr::open(exprs, state)); diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index ae600c6490..b1ab9c4765 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -17,10 +17,7 @@ #pragma once -#include -#include - -#include +#include #include #include #include @@ -33,8 +30,6 @@ #include "vec/aggregate_functions/aggregate_function.h" #include "vec/columns/column.h" #include "vec/common/arena.h" -#include "vec/common/hash_table/hash_map.h" -#include "vec/common/string_ref.h" #include "vec/core/block.h" #include "vec/exec/join/process_hash_table_probe.h" #include "vec/exec/join/vhash_join_node.h" diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index e77fd9fee9..eac8aa1b16 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -20,7 +20,6 @@ #include #include -#include #include #include #include @@ -66,8 +65,8 @@ Status VUnionNode::init(const TPlanNode& tnode, RuntimeState* state) { _const_expr_lists.push_back(ctxs); } // Create result_expr_ctx_lists_ from thrift exprs. - auto& result_texpr_lists = tnode.union_node.result_expr_lists; - for (auto& texprs : result_texpr_lists) { + const auto& result_texpr_lists = tnode.union_node.result_expr_lists; + for (const auto& texprs : result_texpr_lists) { VExprContextSPtrs ctxs; RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, ctxs)); _child_expr_lists.push_back(ctxs); @@ -127,7 +126,6 @@ Status VUnionNode::alloc_resource(RuntimeState* state) { Status VUnionNode::get_next_pass_through(RuntimeState* state, Block* block) { DCHECK(!reached_limit()); - DCHECK(!is_in_subplan()); DCHECK_LT(_child_idx, _children.size()); DCHECK(is_child_passthrough(_child_idx)); if (_child_eos) { @@ -196,12 +194,6 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { // incremented '_num_rows_returned' yet. DCHECK(!reached_limit()); if (_child_eos) { - // Unless we are inside a subplan expecting to call open()/get_next() on the child - // again, the child can be closed at this point. - // TODO: Recheck whether is_in_subplan() is right - // if (!is_in_subplan()) { - // child(_child_idx)->close(state); - // } ++_child_idx; } } @@ -275,7 +267,6 @@ Status VUnionNode::get_next(RuntimeState* state, Block* block, bool* eos) { // The previous child needs to be closed if passthrough was enabled for it. In the non // passthrough case, the child was already closed in the previous call to get_next(). DCHECK(is_child_passthrough(_to_close_child_idx)); - DCHECK(!is_in_subplan()); static_cast(child(_to_close_child_idx)->close(state)); _to_close_child_idx = -1; } @@ -317,8 +308,8 @@ void VUnionNode::debug_string(int indentation_level, std::stringstream* out) con *out << string(indentation_level * 2, ' '); *out << "_union(_first_materialized_child_idx=" << _first_materialized_child_idx << " _child_expr_lists=["; - for (int i = 0; i < _child_expr_lists.size(); ++i) { - *out << VExpr::debug_string(_child_expr_lists[i]) << ", "; + for (const auto& _child_expr_list : _child_expr_lists) { + *out << VExpr::debug_string(_child_expr_list) << ", "; } *out << "] \n"; ExecNode::debug_string(indentation_level, out); @@ -329,9 +320,9 @@ Status VUnionNode::materialize_block(Block* src_block, int child_idx, Block* res SCOPED_TIMER(_exec_timer); const auto& child_exprs = _child_expr_lists[child_idx]; ColumnsWithTypeAndName colunms; - for (size_t i = 0; i < child_exprs.size(); ++i) { + for (const auto& child_expr : child_exprs) { int result_column_id = -1; - RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id)); + RETURN_IF_ERROR(child_expr->execute(src_block, &result_column_id)); colunms.emplace_back(src_block->get_by_position(result_column_id)); } _child_row_idx += src_block->rows(); diff --git a/be/src/vec/exprs/vbitmap_predicate.cpp b/be/src/vec/exprs/vbitmap_predicate.cpp index 0e158298d8..8116311247 100644 --- a/be/src/vec/exprs/vbitmap_predicate.cpp +++ b/be/src/vec/exprs/vbitmap_predicate.cpp @@ -66,19 +66,23 @@ doris::Status vectorized::VBitmapPredicate::prepare(doris::RuntimeState* state, auto column = child->data_type()->create_column(); argument_template.emplace_back(std::move(column), child->data_type(), child->expr_name()); } + _prepare_finished = true; return Status::OK(); } doris::Status vectorized::VBitmapPredicate::open(doris::RuntimeState* state, vectorized::VExprContext* context, FunctionContext::FunctionStateScope scope) { + DCHECK(_prepare_finished); RETURN_IF_ERROR(VExpr::open(state, context, scope)); + _open_finished = true; return Status::OK(); } doris::Status vectorized::VBitmapPredicate::execute(vectorized::VExprContext* context, doris::vectorized::Block* block, int* result_column_id) { + DCHECK(_open_finished || _getting_const_col); doris::vectorized::ColumnNumbers arguments(_children.size()); for (int i = 0; i < _children.size(); ++i) { int column_id = -1; diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp index f72657c528..08f891b0e5 100644 --- a/be/src/vec/exprs/vbloom_predicate.cpp +++ b/be/src/vec/exprs/vbloom_predicate.cpp @@ -60,12 +60,15 @@ Status VBloomPredicate::prepare(RuntimeState* state, const RowDescriptor& desc, } _be_exec_version = state->be_exec_version(); + _prepare_finished = true; return Status::OK(); } Status VBloomPredicate::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { + DCHECK(_prepare_finished); RETURN_IF_ERROR(VExpr::open(state, context, scope)); + _open_finished = true; return Status::OK(); } @@ -74,6 +77,7 @@ void VBloomPredicate::close(VExprContext* context, FunctionContext::FunctionStat } Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result_column_id) { + DCHECK(_open_finished || _getting_const_col); doris::vectorized::ColumnNumbers arguments(_children.size()); for (int i = 0; i < _children.size(); ++i) { int column_id = -1; diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp index e09d62bfb2..dee60b5a6f 100644 --- a/be/src/vec/exprs/vcase_expr.cpp +++ b/be/src/vec/exprs/vcase_expr.cpp @@ -74,22 +74,26 @@ Status VCaseExpr::prepare(RuntimeState* state, const RowDescriptor& desc, VExprC } VExpr::register_function_context(state, context); + _prepare_finished = true; return Status::OK(); } Status VCaseExpr::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { - for (int i = 0; i < _children.size(); ++i) { - RETURN_IF_ERROR(_children[i]->open(state, context, scope)); + DCHECK(_prepare_finished); + for (auto& i : _children) { + RETURN_IF_ERROR(i->open(state, context, scope)); } RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); if (scope == FunctionContext::FRAGMENT_LOCAL) { RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); } + _open_finished = true; return Status::OK(); } void VCaseExpr::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { + DCHECK(_prepare_finished); VExpr::close_function_context(context, scope, _function); VExpr::close(context, scope); } @@ -98,6 +102,7 @@ Status VCaseExpr::execute(VExprContext* context, Block* block, int* result_colum if (is_const_and_have_executed()) { // const have execute in open function return get_result_from_const(block, _expr_name, result_column_id); } + DCHECK(_open_finished || _getting_const_col); ColumnNumbers arguments(_children.size()); for (int i = 0; i < _children.size(); i++) { int column_id = -1; diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp index 3207ba5b54..f322c1d2fa 100644 --- a/be/src/vec/exprs/vcast_expr.cpp +++ b/be/src/vec/exprs/vcast_expr.cpp @@ -76,6 +76,7 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state, const doris::RowDes VExpr::register_function_context(state, context); _expr_name = fmt::format("(CAST {}({}) TO {})", child_name, child->data_type()->get_name(), _target_data_type_name); + _prepare_finished = true; return Status::OK(); } @@ -85,6 +86,7 @@ const DataTypePtr& VCastExpr::get_target_type() const { doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { + DCHECK(_prepare_finished); for (int i = 0; i < _children.size(); ++i) { RETURN_IF_ERROR(_children[i]->open(state, context, scope)); } @@ -92,6 +94,7 @@ doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* context, if (scope == FunctionContext::FRAGMENT_LOCAL) { RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); } + _open_finished = true; return Status::OK(); } @@ -102,6 +105,7 @@ void VCastExpr::close(VExprContext* context, FunctionContext::FunctionStateScope doris::Status VCastExpr::execute(VExprContext* context, doris::vectorized::Block* block, int* result_column_id) { + DCHECK(_open_finished || _getting_const_col); // for each child call execute int column_id = 0; RETURN_IF_ERROR(_children[0]->execute(context, block, &column_id)); diff --git a/be/src/vec/exprs/vcolumn_ref.h b/be/src/vec/exprs/vcolumn_ref.h index 25e35dec4a..a763797880 100644 --- a/be/src/vec/exprs/vcolumn_ref.h +++ b/be/src/vec/exprs/vcolumn_ref.h @@ -43,10 +43,20 @@ public: "VColumnRef have invalid slot id: {}, _column_name: {}, desc: {}", _column_id, _column_name, desc.debug_string()); } + _prepare_finished = true; + return Status::OK(); + } + + Status open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) override { + DCHECK(_prepare_finished); + RETURN_IF_ERROR(VExpr::open(state, context, scope)); + _open_finished = true; return Status::OK(); } Status execute(VExprContext* context, Block* block, int* result_column_id) override { + DCHECK(_open_finished || _getting_const_col); *result_column_id = _column_id; return Status::OK(); } diff --git a/be/src/vec/exprs/vdirect_in_predicate.h b/be/src/vec/exprs/vdirect_in_predicate.h index 5211e01346..a68a6c3121 100644 --- a/be/src/vec/exprs/vdirect_in_predicate.h +++ b/be/src/vec/exprs/vdirect_in_predicate.h @@ -18,6 +18,7 @@ #pragma once #include "common/status.h" +#include "exprs/hybrid_set.h" #include "vec/exprs/vexpr.h" namespace doris::vectorized { @@ -29,7 +30,23 @@ public: : VExpr(node), _filter(nullptr), _expr_name("direct_in_predicate") {} ~VDirectInPredicate() override = default; + Status prepare(RuntimeState* state, const RowDescriptor& row_desc, + VExprContext* context) override { + RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, row_desc, context)); + _prepare_finished = true; + return Status::OK(); + } + + Status open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) override { + DCHECK(_prepare_finished); + RETURN_IF_ERROR(VExpr::open(state, context, scope)); + _open_finished = true; + return Status::OK(); + } + Status execute(VExprContext* context, Block* block, int* result_column_id) override { + DCHECK(_open_finished || _getting_const_col); ColumnNumbers arguments(_children.size()); for (int i = 0; i < _children.size(); ++i) { int column_id = -1; @@ -47,7 +64,7 @@ public: if (argument_column->is_nullable()) { auto column_nested = static_cast(argument_column.get()) ->get_nested_column_ptr(); - auto& null_map = + const auto& null_map = static_cast(argument_column.get())->get_null_map_data(); _filter->find_batch_nullable(*column_nested, sz, null_map, res_data_column->get_data()); } else { diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp index 48522b3550..bf38185f7d 100644 --- a/be/src/vec/exprs/vectorized_fn_call.cpp +++ b/be/src/vec/exprs/vectorized_fn_call.cpp @@ -21,8 +21,6 @@ #include // IWYU pragma: keep #include -#include -#include #include #include #include @@ -115,19 +113,21 @@ Status VectorizedFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, VExpr::register_function_context(state, context); _function_name = _fn.name.function_name; _can_fast_execute = _function->can_fast_execute(); - + _prepare_finished = true; return Status::OK(); } Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { - for (int i = 0; i < _children.size(); ++i) { - RETURN_IF_ERROR(_children[i]->open(state, context, scope)); + DCHECK(_prepare_finished); + for (auto& i : _children) { + RETURN_IF_ERROR(i->open(state, context, scope)); } RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function)); if (scope == FunctionContext::FRAGMENT_LOCAL) { RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); } + _open_finished = true; return Status::OK(); } @@ -142,6 +142,7 @@ Status VectorizedFnCall::execute(VExprContext* context, vectorized::Block* block return get_result_from_const(block, _expr_name, result_column_id); } + DCHECK(_open_finished || _getting_const_col) << debug_string(); // TODO: not execute const expr again, but use the const column in function context vectorized::ColumnNumbers arguments(_children.size()); for (int i = 0; i < _children.size(); ++i) { @@ -188,9 +189,9 @@ bool VectorizedFnCall::fast_execute(FunctionContext* context, Block& block, block.get_by_name(result_column_name).column->convert_to_full_column_if_const(); auto& result_info = block.get_by_position(result); if (result_info.type->is_nullable()) { - block.replace_by_position(result, - ColumnNullable::create(std::move(result_column), - ColumnUInt8::create(input_rows_count, 0))); + block.replace_by_position( + result, + ColumnNullable::create(result_column, ColumnUInt8::create(input_rows_count, 0))); } else { block.replace_by_position(result, std::move(result_column)); } @@ -208,7 +209,7 @@ std::string VectorizedFnCall::debug_string() const { out << _expr_name; out << "]{"; bool first = true; - for (auto& input_expr : children()) { + for (const auto& input_expr : children()) { if (first) { first = false; } else { diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 7270126e56..4f6b984e8f 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -17,6 +17,7 @@ #include "vec/exprs/vexpr.h" +#include #include #include @@ -27,7 +28,6 @@ #include "common/config.h" #include "common/exception.h" -#include "common/object_pool.h" #include "common/status.h" #include "vec/columns/column_vector.h" #include "vec/columns/columns_number.h" @@ -40,6 +40,7 @@ #include "vec/exprs/vcompound_pred.h" #include "vec/exprs/vectorized_fn_call.h" #include "vec/exprs/vexpr_context.h" +#include "vec/exprs/vexpr_fwd.h" #include "vec/exprs/vin_predicate.h" #include "vec/exprs/vinfo_func.h" #include "vec/exprs/vlambda_function_call_expr.h" @@ -55,6 +56,9 @@ namespace doris { class RowDescriptor; class RuntimeState; + +// NOLINTBEGIN(readability-function-cognitive-complexity) +// NOLINTBEGIN(readability-function-size) TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type, int precision, int scale) { TExprNode node; @@ -146,6 +150,8 @@ TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type, in } return node; } +// NOLINTEND(readability-function-size) +// NOLINTEND(readability-function-cognitive-complexity) } // namespace doris namespace doris::vectorized { @@ -162,9 +168,7 @@ bool VExpr::is_acting_on_a_slot(const VExpr& expr) { VExpr::VExpr(const TExprNode& node) : _node_type(node.node_type), _opcode(node.__isset.opcode ? node.opcode : TExprOpcode::INVALID_OPCODE), - _type(TypeDescriptor::from_thrift(node.type)), - _fn_context_index(-1), - _prepared(false) { + _type(TypeDescriptor::from_thrift(node.type)) { if (node.__isset.fn) { _fn = node.fn; } @@ -183,10 +187,7 @@ VExpr::VExpr(const TExprNode& node) VExpr::VExpr(const VExpr& vexpr) = default; VExpr::VExpr(TypeDescriptor type, bool is_slotref, bool is_nullable) - : _opcode(TExprOpcode::INVALID_OPCODE), - _type(std::move(type)), - _fn_context_index(-1), - _prepared(false) { + : _opcode(TExprOpcode::INVALID_OPCODE), _type(std::move(type)) { if (is_slotref) { _node_type = TExprNodeType::SLOT_REF; } @@ -221,11 +222,12 @@ Status VExpr::open(RuntimeState* state, VExprContext* context, } void VExpr::close(VExprContext* context, FunctionContext::FunctionStateScope scope) { - for (int i = 0; i < _children.size(); ++i) { - _children[i]->close(context, scope); + for (auto& i : _children) { + i->close(context, scope); } } +// NOLINTBEGIN(readability-function-size) Status VExpr::create_expr(const TExprNode& expr_node, VExprSPtr& expr) { try { switch (expr_node.node_type) { @@ -326,6 +328,7 @@ Status VExpr::create_expr(const TExprNode& expr_node, VExprSPtr& expr) { } return Status::OK(); } +// NOLINTEND(readability-function-size) Status VExpr::create_tree_from_thrift(const std::vector& nodes, int* node_idx, VExprSPtr& root_expr, VExprContextSPtr& ctx) { @@ -348,7 +351,7 @@ Status VExpr::create_tree_from_thrift(const std::vector& nodes, int* // non-recursive traversal std::stack> s; - s.push({root, root_children}); + s.emplace(root, root_children); while (!s.empty()) { auto& parent = s.top(); if (parent.second > 1) { @@ -366,14 +369,14 @@ Status VExpr::create_tree_from_thrift(const std::vector& nodes, int* parent.first->add_child(expr); int num_children = nodes[*node_idx].num_children; if (num_children > 0) { - s.push({expr, num_children}); + s.emplace(expr, num_children); } } return Status::OK(); } Status VExpr::create_expr_tree(const TExpr& texpr, VExprContextSPtr& ctx) { - if (texpr.nodes.size() == 0) { + if (texpr.nodes.empty()) { ctx = nullptr; return Status::OK(); } @@ -395,9 +398,9 @@ Status VExpr::create_expr_tree(const TExpr& texpr, VExprContextSPtr& ctx) { Status VExpr::create_expr_trees(const std::vector& texprs, VExprContextSPtrs& ctxs) { ctxs.clear(); - for (int i = 0; i < texprs.size(); ++i) { + for (const auto& texpr : texprs) { VExprContextSPtr ctx; - RETURN_IF_ERROR(create_expr_tree(texprs[i], ctx)); + RETURN_IF_ERROR(create_expr_tree(texpr, ctx)); ctxs.push_back(ctx); } return Status::OK(); @@ -412,8 +415,8 @@ Status VExpr::prepare(const VExprContextSPtrs& ctxs, RuntimeState* state, } Status VExpr::open(const VExprContextSPtrs& ctxs, RuntimeState* state) { - for (int i = 0; i < ctxs.size(); ++i) { - RETURN_IF_ERROR(ctxs[i]->open(state)); + for (const auto& ctx : ctxs) { + RETURN_IF_ERROR(ctx->open(state)); } return Status::OK(); } @@ -423,8 +426,8 @@ Status VExpr::clone_if_not_exists(const VExprContextSPtrs& ctxs, RuntimeState* s if (!new_ctxs.empty()) { // 'ctxs' was already cloned into '*new_ctxs', nothing to do. DCHECK_EQ(new_ctxs.size(), ctxs.size()); - for (int i = 0; i < new_ctxs.size(); ++i) { - DCHECK(new_ctxs[i]->_is_clone); + for (auto& new_ctx : new_ctxs) { + DCHECK(new_ctx->_is_clone); } return Status::OK(); } @@ -461,20 +464,15 @@ std::string VExpr::debug_string(const VExprSPtrs& exprs) { std::string VExpr::debug_string(const VExprContextSPtrs& ctxs) { VExprSPtrs exprs; - for (int i = 0; i < ctxs.size(); ++i) { - exprs.push_back(ctxs[i]->root()); + for (const auto& ctx : ctxs) { + exprs.push_back(ctx->root()); } return debug_string(exprs); } bool VExpr::is_constant() const { - for (int i = 0; i < _children.size(); ++i) { - if (!_children[i]->is_constant()) { - return false; - } - } - - return true; + return std::all_of(_children.begin(), _children.end(), + [](const VExprSPtr& expr) { return expr->is_constant(); }); } Status VExpr::get_const_col(VExprContext* context, @@ -494,7 +492,11 @@ Status VExpr::get_const_col(VExprContext* context, // If block is empty, some functions will produce no result. So we insert a column with // single value here. block.insert({ColumnUInt8::create(1), std::make_shared(), ""}); + + _getting_const_col = true; RETURN_IF_ERROR(execute(context, &block, &result)); + _getting_const_col = false; + DCHECK(result != -1); const auto& column = block.get_by_position(result).column; _constant_col = std::make_shared(column); @@ -507,8 +509,8 @@ Status VExpr::get_const_col(VExprContext* context, void VExpr::register_function_context(RuntimeState* state, VExprContext* context) { std::vector arg_types; - for (int i = 0; i < _children.size(); ++i) { - arg_types.push_back(_children[i]->type()); + for (auto& i : _children) { + arg_types.push_back(i->type()); } _fn_context_index = context->register_function_context(state, _type, arg_types); diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index b6a2b4ac6b..a852afeb2d 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -21,15 +21,14 @@ #include #include #include -#include +#include #include #include #include #include #include -#include "common/factory_creator.h" #include "common/status.h" #include "runtime/define_primitive_type.h" #include "runtime/large_int_value.h" @@ -57,10 +56,9 @@ namespace vectorized { #define RETURN_IF_ERROR_OR_PREPARED(stmt) \ if (_prepared) { \ return Status::OK(); \ - } else { \ - _prepared = true; \ - RETURN_IF_ERROR(stmt); \ - } + } \ + _prepared = true; \ + RETURN_IF_ERROR(stmt); // VExpr should be used as shared pointer because it will be passed between classes // like runtime filter to scan node, or from scannode to scanner. We could not make sure @@ -106,6 +104,14 @@ public: virtual Status open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope); + // before execute, check if expr has been parepared+opened. + [[maybe_unused]] Status ready_status() const { + if (_prepare_finished && _open_finished) { + return Status::OK(); + } + return Status::InternalError(expr_name() + " is not ready when execute"); + } + virtual Status execute(VExprContext* context, Block* block, int* result_column_id) = 0; /// Subclasses overriding this function should call VExpr::Close(). @@ -156,6 +162,8 @@ public: static std::string debug_string(const VExprSPtrs& exprs); static std::string debug_string(const VExprContextSPtrs& ctxs); + void set_getting_const_col(bool val = true) { _getting_const_col = val; } + bool is_and_expr() const { return _fn.name.function_name == "and"; } virtual bool is_compound_predicate() const { return false; } @@ -254,63 +262,69 @@ protected: /// Index to pass to ExprContext::fn_context() to retrieve this expr's FunctionContext. /// Set in RegisterFunctionContext(). -1 if this expr does not need a FunctionContext and /// doesn't call RegisterFunctionContext(). - int _fn_context_index; + int _fn_context_index = -1; // If this expr is constant, this will store and cache the value generated by // get_const_col() std::shared_ptr _constant_col; - bool _prepared; + bool _prepared = false; // for base class VExpr + bool _getting_const_col = + false; // if true, current execute() is in prepare() (that is, can't check _prepared) + // for concrete classes + bool _prepare_finished = false; + bool _open_finished = false; }; } // namespace vectorized +// NOLINTBEGIN(readability-function-size) template Status create_texpr_literal_node(const void* data, TExprNode* node, int precision = 0, int scale = 0) { if constexpr (T == TYPE_BOOLEAN) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); TBoolLiteral boolLiteral; (*node).__set_node_type(TExprNodeType::BOOL_LITERAL); boolLiteral.__set_value(*origin_value); (*node).__set_bool_literal(boolLiteral); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); } else if constexpr (T == TYPE_TINYINT) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); (*node).__set_node_type(TExprNodeType::INT_LITERAL); TIntLiteral intLiteral; intLiteral.__set_value(*origin_value); (*node).__set_int_literal(intLiteral); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_TINYINT)); } else if constexpr (T == TYPE_SMALLINT) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); (*node).__set_node_type(TExprNodeType::INT_LITERAL); TIntLiteral intLiteral; intLiteral.__set_value(*origin_value); (*node).__set_int_literal(intLiteral); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_SMALLINT)); } else if constexpr (T == TYPE_INT) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); (*node).__set_node_type(TExprNodeType::INT_LITERAL); TIntLiteral intLiteral; intLiteral.__set_value(*origin_value); (*node).__set_int_literal(intLiteral); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_INT)); } else if constexpr (T == TYPE_BIGINT) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); (*node).__set_node_type(TExprNodeType::INT_LITERAL); TIntLiteral intLiteral; intLiteral.__set_value(*origin_value); (*node).__set_int_literal(intLiteral); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_BIGINT)); } else if constexpr (T == TYPE_LARGEINT) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); (*node).__set_node_type(TExprNodeType::LARGE_INT_LITERAL); TLargeIntLiteral large_int_literal; large_int_literal.__set_value(LargeIntValue::to_string(*origin_value)); (*node).__set_large_int_literal(large_int_literal); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_LARGEINT)); } else if constexpr ((T == TYPE_DATE) || (T == TYPE_DATETIME) || (T == TYPE_TIME)) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); TDateLiteral date_literal; char convert_buffer[30]; origin_value->to_string(convert_buffer); @@ -325,7 +339,7 @@ Status create_texpr_literal_node(const void* data, TExprNode* node, int precisio (*node).__set_type(create_type_desc(PrimitiveType::TYPE_TIME)); } } else if constexpr (T == TYPE_DATEV2) { - auto origin_value = reinterpret_cast*>(data); + const auto* origin_value = reinterpret_cast*>(data); TDateLiteral date_literal; char convert_buffer[30]; origin_value->to_string(convert_buffer); @@ -334,7 +348,7 @@ Status create_texpr_literal_node(const void* data, TExprNode* node, int precisio (*node).__set_node_type(TExprNodeType::DATE_LITERAL); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATEV2)); } else if constexpr (T == TYPE_DATETIMEV2) { - auto origin_value = reinterpret_cast*>(data); + const auto* origin_value = reinterpret_cast*>(data); TDateLiteral date_literal; char convert_buffer[30]; origin_value->to_string(convert_buffer); @@ -343,28 +357,28 @@ Status create_texpr_literal_node(const void* data, TExprNode* node, int precisio (*node).__set_node_type(TExprNodeType::DATE_LITERAL); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATETIMEV2)); } else if constexpr (T == TYPE_DECIMALV2) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); TDecimalLiteral decimal_literal; decimal_literal.__set_value(origin_value->to_string()); (*node).__set_decimal_literal(decimal_literal); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMALV2, precision, scale)); } else if constexpr (T == TYPE_DECIMAL32) { - auto origin_value = reinterpret_cast*>(data); + const auto* origin_value = reinterpret_cast*>(data); (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); TDecimalLiteral decimal_literal; decimal_literal.__set_value(origin_value->to_string(scale)); (*node).__set_decimal_literal(decimal_literal); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL32, precision, scale)); } else if constexpr (T == TYPE_DECIMAL64) { - auto origin_value = reinterpret_cast*>(data); + const auto* origin_value = reinterpret_cast*>(data); (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); TDecimalLiteral decimal_literal; decimal_literal.__set_value(origin_value->to_string(scale)); (*node).__set_decimal_literal(decimal_literal); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL64, precision, scale)); } else if constexpr (T == TYPE_DECIMAL128I) { - auto origin_value = reinterpret_cast*>(data); + const auto* origin_value = reinterpret_cast*>(data); (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL); TDecimalLiteral decimal_literal; decimal_literal.__set_value(origin_value->to_string(scale)); @@ -378,21 +392,21 @@ Status create_texpr_literal_node(const void* data, TExprNode* node, int precisio (*node).__set_decimal_literal(decimal_literal); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL256, precision, scale)); } else if constexpr (T == TYPE_FLOAT) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); (*node).__set_node_type(TExprNodeType::FLOAT_LITERAL); TFloatLiteral float_literal; float_literal.__set_value(*origin_value); (*node).__set_float_literal(float_literal); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_FLOAT)); } else if constexpr (T == TYPE_DOUBLE) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); (*node).__set_node_type(TExprNodeType::FLOAT_LITERAL); TFloatLiteral float_literal; float_literal.__set_value(*origin_value); (*node).__set_float_literal(float_literal); (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DOUBLE)); } else if constexpr ((T == TYPE_STRING) || (T == TYPE_CHAR) || (T == TYPE_VARCHAR)) { - auto origin_value = reinterpret_cast(data); + const auto* origin_value = reinterpret_cast(data); (*node).__set_node_type(TExprNodeType::STRING_LITERAL); TStringLiteral string_literal; string_literal.__set_value(origin_value->to_string()); @@ -403,6 +417,7 @@ Status create_texpr_literal_node(const void* data, TExprNode* node, int precisio } return Status::OK(); } +// NOLINTEND(readability-function-size) TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type, int precision = 0, int scale = 0); diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index 35eaae5c60..cebb7dd2e5 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -17,17 +17,14 @@ #include "vec/exprs/vexpr_context.h" -#include #include #include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/exception.h" -#include "common/object_pool.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "udf/udf.h" -#include "util/stack_util.h" #include "vec/columns/column_const.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/columns_with_type_and_name.h" @@ -38,13 +35,6 @@ class RowDescriptor; } // namespace doris namespace doris::vectorized { -VExprContext::VExprContext(const VExprSPtr& expr) - : _root(expr), - _is_clone(false), - _prepared(false), - _opened(false), - _last_result_column_id(-1) {} - VExprContext::~VExprContext() { // In runtime filter, only create expr context to get expr root, will not call // prepare or open, so that it is not need to call close. And call close may core @@ -154,19 +144,19 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, return execute_conjuncts(ctxs, filters, false, block, result_filter, can_filter_all); } -// TODO Performance Optimization +// TODO: Performance Optimization Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, const std::vector* filters, - const bool accept_null, Block* block, + bool accept_null, Block* block, IColumn::Filter* result_filter, bool* can_filter_all) { DCHECK(result_filter->size() == block->rows()); *can_filter_all = false; auto* __restrict result_filter_data = result_filter->data(); - for (auto& ctx : ctxs) { + for (const auto& ctx : ctxs) { int result_column_id = -1; RETURN_IF_ERROR(ctx->execute(block, &result_column_id)); ColumnPtr& filter_column = block->get_by_position(result_column_id).column; - if (auto* nullable_column = check_and_get_column(*filter_column)) { + if (const auto* nullable_column = check_and_get_column(*filter_column)) { size_t column_size = nullable_column->size(); if (column_size == 0) { *can_filter_all = true; @@ -175,9 +165,9 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); const IColumn::Filter& filter = assert_cast(*nested_column).get_data(); - auto* __restrict filter_data = filter.data(); + const auto* __restrict filter_data = filter.data(); const size_t size = filter.size(); - auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); + const auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); if (accept_null) { for (size_t i = 0; i < size; ++i) { @@ -194,7 +184,7 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, return Status::OK(); } } - } else if (auto* const_column = check_and_get_column(*filter_column)) { + } else if (const auto* const_column = check_and_get_column(*filter_column)) { // filter all if (!const_column->get_bool(0)) { *can_filter_all = true; @@ -204,7 +194,7 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, } else { const IColumn::Filter& filter = assert_cast(*filter_column).get_data(); - auto* __restrict filter_data = filter.data(); + const auto* __restrict filter_data = filter.data(); const size_t size = filter.size(); for (size_t i = 0; i < size; ++i) { @@ -297,7 +287,7 @@ Status VExprContext::get_output_block_after_execute_exprs( auto rows = input_block.rows(); vectorized::Block tmp_block(input_block.get_columns_with_type_and_name()); vectorized::ColumnsWithTypeAndName result_columns; - for (auto& vexpr_ctx : output_vexpr_ctxs) { + for (const auto& vexpr_ctx : output_vexpr_ctxs) { int result_column_id = -1; RETURN_IF_ERROR(vexpr_ctx->execute(&tmp_block, &result_column_id)); DCHECK(result_column_id != -1); diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index db5c4c87d8..70bd37b187 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -20,6 +20,7 @@ #include #include +#include #include #include "common/factory_creator.h" @@ -40,7 +41,7 @@ class VExprContext { ENABLE_FACTORY_CREATOR(VExprContext); public: - VExprContext(const VExprSPtr& expr); + VExprContext(VExprSPtr expr) : _root(std::move(expr)) {} ~VExprContext(); [[nodiscard]] Status prepare(RuntimeState* state, const RowDescriptor& row_desc); [[nodiscard]] Status open(RuntimeState* state); @@ -76,7 +77,7 @@ public: [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& ctxs, const std::vector* filters, - const bool accept_null, Block* block, + bool accept_null, Block* block, IColumn::Filter* result_filter, bool* can_filter_all); @@ -121,7 +122,7 @@ public: _prepared = other._prepared; _opened = other._opened; - for (auto& fn : other._fn_contexts) { + for (const auto& fn : other._fn_contexts) { _fn_contexts.emplace_back(fn->clone()); } @@ -152,17 +153,17 @@ private: VExprSPtr _root; /// True if this context came from a Clone() call. Used to manage FunctionStateScope. - bool _is_clone; + bool _is_clone = false; /// Variables keeping track of current state. - bool _prepared; - bool _opened; + bool _prepared = false; + bool _opened = false; /// FunctionContexts for each registered expression. The FunctionContexts are created /// and owned by this VExprContext. std::vector> _fn_contexts; - int _last_result_column_id; + int _last_result_column_id = -1; /// The depth of expression-tree. int _depth_num = 0; diff --git a/be/src/vec/exprs/vin_predicate.cpp b/be/src/vec/exprs/vin_predicate.cpp index 9a25d3a223..896b2a903d 100644 --- a/be/src/vec/exprs/vin_predicate.cpp +++ b/be/src/vec/exprs/vin_predicate.cpp @@ -73,11 +73,13 @@ Status VInPredicate::prepare(RuntimeState* state, const RowDescriptor& desc, } VExpr::register_function_context(state, context); + _prepare_finished = true; return Status::OK(); } Status VInPredicate::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { + DCHECK(_prepare_finished); for (int i = 0; i < _children.size(); ++i) { RETURN_IF_ERROR(_children[i]->open(state, context, scope)); } @@ -85,6 +87,7 @@ Status VInPredicate::open(RuntimeState* state, VExprContext* context, if (scope == FunctionContext::FRAGMENT_LOCAL) { RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); } + _open_finished = true; return Status::OK(); } @@ -97,6 +100,7 @@ Status VInPredicate::execute(VExprContext* context, Block* block, int* result_co if (is_const_and_have_executed()) { // const have execute in open function return get_result_from_const(block, _expr_name, result_column_id); } + DCHECK(_open_finished || _getting_const_col); // TODO: not execute const expr again, but use the const column in function context doris::vectorized::ColumnNumbers arguments(_children.size()); for (int i = 0; i < _children.size(); ++i) { diff --git a/be/src/vec/exprs/vlambda_function_call_expr.h b/be/src/vec/exprs/vlambda_function_call_expr.h index 4467849802..44d22b1f9e 100644 --- a/be/src/vec/exprs/vlambda_function_call_expr.h +++ b/be/src/vec/exprs/vlambda_function_call_expr.h @@ -34,6 +34,8 @@ public: VLambdaFunctionCallExpr(const TExprNode& node) : VExpr(node) {} ~VLambdaFunctionCallExpr() override = default; + const std::string& expr_name() const override { return _expr_name; } + Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override { RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context)); @@ -48,12 +50,20 @@ public: return Status::InternalError("Lambda Function {} is not implemented.", _fn.name.function_name); } + _prepare_finished = true; return Status::OK(); } - const std::string& expr_name() const override { return _expr_name; } + Status open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) override { + DCHECK(_prepare_finished); + RETURN_IF_ERROR(VExpr::open(state, context, scope)); + _open_finished = true; + return Status::OK(); + } Status execute(VExprContext* context, Block* block, int* result_column_id) override { + DCHECK(_open_finished || _getting_const_col); return _lambda_function->execute(context, block, result_column_id, _data_type, _children); } diff --git a/be/src/vec/exprs/vlambda_function_expr.h b/be/src/vec/exprs/vlambda_function_expr.h index 6d84abb937..94571712e4 100644 --- a/be/src/vec/exprs/vlambda_function_expr.h +++ b/be/src/vec/exprs/vlambda_function_expr.h @@ -28,7 +28,22 @@ public: VLambdaFunctionExpr(const TExprNode& node) : VExpr(node) {} ~VLambdaFunctionExpr() override = default; + Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override { + RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context)); + _prepare_finished = true; + return Status::OK(); + } + + Status open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) override { + DCHECK(_prepare_finished); + RETURN_IF_ERROR(VExpr::open(state, context, scope)); + _open_finished = true; + return Status::OK(); + } + Status execute(VExprContext* context, Block* block, int* result_column_id) override { + DCHECK(_open_finished || _getting_const_col); return get_child(0)->execute(context, block, result_column_id); } diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp index 03d1659eee..c7fbb08167 100644 --- a/be/src/vec/exprs/vliteral.cpp +++ b/be/src/vec/exprs/vliteral.cpp @@ -47,9 +47,7 @@ #include "vec/data_types/data_type_decimal.h" #include "vec/runtime/vdatetime_value.h" -namespace doris { - -namespace vectorized { +namespace doris::vectorized { class VExprContext; void VLiteral::init(const TExprNode& node) { @@ -58,8 +56,20 @@ void VLiteral::init(const TExprNode& node) { _column_ptr = _data_type->create_column_const(1, field); } +Status VLiteral::prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) { + RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context)); + return Status::OK(); +} + +Status VLiteral::open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) { + RETURN_IF_ERROR(VExpr::open(state, context, scope)); + return Status::OK(); +} + Status VLiteral::execute(VExprContext* context, vectorized::Block* block, int* result_column_id) { // Literal expr should return least one row. + // sometimes we just use a VLiteral without open or prepare. so can't check it at this moment size_t row_size = std::max(block->rows(), _column_ptr->size()); *result_column_id = VExpr::insert_param(block, {_column_ptr, _data_type, _expr_name}, row_size); return Status::OK(); @@ -86,5 +96,4 @@ std::string VLiteral::debug_string() const { return out.str(); } -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h index 78879c00d0..d443478ada 100644 --- a/be/src/vec/exprs/vliteral.h +++ b/be/src/vec/exprs/vliteral.h @@ -42,7 +42,12 @@ public: init(node); } } + + Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override; + Status open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) override; Status execute(VExprContext* context, Block* block, int* result_column_id) override; + const std::string& expr_name() const override { return _expr_name; } std::string debug_string() const override; diff --git a/be/src/vec/exprs/vmatch_predicate.cpp b/be/src/vec/exprs/vmatch_predicate.cpp index 23a34aae5a..17326b5b23 100644 --- a/be/src/vec/exprs/vmatch_predicate.cpp +++ b/be/src/vec/exprs/vmatch_predicate.cpp @@ -91,12 +91,13 @@ Status VMatchPredicate::prepare(RuntimeState* state, const RowDescriptor& desc, VExpr::register_function_context(state, context); _expr_name = fmt::format("{}({})", _fn.name.function_name, child_expr_name); _function_name = _fn.name.function_name; - + _prepare_finished = true; return Status::OK(); } Status VMatchPredicate::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { + DCHECK(_prepare_finished); for (int i = 0; i < _children.size(); ++i) { RETURN_IF_ERROR(_children[i]->open(state, context, scope)); } @@ -107,6 +108,7 @@ Status VMatchPredicate::open(RuntimeState* state, VExprContext* context, if (scope == FunctionContext::FRAGMENT_LOCAL) { RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr)); } + _open_finished = true; return Status::OK(); } @@ -116,6 +118,7 @@ void VMatchPredicate::close(VExprContext* context, FunctionContext::FunctionStat } Status VMatchPredicate::execute(VExprContext* context, Block* block, int* result_column_id) { + DCHECK(_open_finished || _getting_const_col); // TODO: not execute const expr again, but use the const column in function context doris::vectorized::ColumnNumbers arguments(_children.size()); for (int i = 0; i < _children.size(); ++i) { diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp index 62ef2bbdb6..c623355d67 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp +++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp @@ -52,12 +52,16 @@ Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& VExprContext* context) { RETURN_IF_ERROR_OR_PREPARED(_impl->prepare(state, desc, context)); _expr_name = fmt::format("VRuntimeFilterWrapper({})", _impl->expr_name()); + _prepare_finished = true; return Status::OK(); } Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context, FunctionContext::FunctionStateScope scope) { - return _impl->open(state, context, scope); + DCHECK(_prepare_finished); + RETURN_IF_ERROR(_impl->open(state, context, scope)); + _open_finished = true; + return Status::OK(); } void VRuntimeFilterWrapper::close(VExprContext* context, @@ -66,6 +70,7 @@ void VRuntimeFilterWrapper::close(VExprContext* context, } Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) { + DCHECK(_open_finished || _getting_const_col); if (_always_true) { auto res_data_column = ColumnVector::create(block->rows(), 1); size_t num_columns_without_result = block->columns(); @@ -80,7 +85,15 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* return Status::OK(); } else { _scan_rows += block->rows(); + + if (_getting_const_col) { + _impl->set_getting_const_col(true); + } RETURN_IF_ERROR(_impl->execute(context, block, result_column_id)); + if (_getting_const_col) { + _impl->set_getting_const_col(false); + } + uint8_t* data = nullptr; const ColumnWithTypeAndName& result_column = block->get_by_position(*result_column_id); if (is_column_const(*result_column.column)) { diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp index 5a34999acc..b683a1fb15 100644 --- a/be/src/vec/exprs/vslot_ref.cpp +++ b/be/src/vec/exprs/vslot_ref.cpp @@ -51,6 +51,7 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const doris::RowDescriptor& RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context)); DCHECK_EQ(_children.size(), 0); if (_slot_id == -1) { + _prepare_finished = true; return Status::OK(); } const SlotDescriptor* slot_desc = state->desc_tbl().get_slot_descriptor(_slot_id); @@ -63,6 +64,7 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const doris::RowDescriptor& if (!context->force_materialize_slot() && !slot_desc->need_materialize()) { // slot should be ignored manually _column_id = -1; + _prepare_finished = true; return Status::OK(); } _column_id = desc.get_column_id(_slot_id, context->force_materialize_slot()); @@ -72,6 +74,15 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const doris::RowDescriptor& *_column_name, _slot_id, desc.debug_string(), slot_desc->debug_string(), state->desc_tbl().debug_string()); } + _prepare_finished = true; + return Status::OK(); +} + +Status VSlotRef::open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) { + DCHECK(_prepare_finished); + RETURN_IF_ERROR(VExpr::open(state, context, scope)); + _open_finished = true; return Status::OK(); } diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h index 2084ae1871..c30ac64041 100644 --- a/be/src/vec/exprs/vslot_ref.h +++ b/be/src/vec/exprs/vslot_ref.h @@ -38,8 +38,11 @@ class VSlotRef final : public VExpr { public: VSlotRef(const TExprNode& node); VSlotRef(const SlotDescriptor* desc); - Status execute(VExprContext* context, Block* block, int* result_column_id) override; Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override; + Status open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) override; + Status execute(VExprContext* context, Block* block, int* result_column_id) override; + const std::string& expr_name() const override; std::string debug_string() const override; bool is_constant() const override { return false; } diff --git a/be/src/vec/exprs/vtuple_is_null_predicate.cpp b/be/src/vec/exprs/vtuple_is_null_predicate.cpp index b17428bfc0..641e34590a 100644 --- a/be/src/vec/exprs/vtuple_is_null_predicate.cpp +++ b/be/src/vec/exprs/vtuple_is_null_predicate.cpp @@ -48,11 +48,20 @@ Status VTupleIsNullPredicate::prepare(RuntimeState* state, const RowDescriptor& DCHECK_EQ(0, _children.size()); _column_to_check = _is_left_null_side ? desc.num_materialized_slots() : desc.num_materialized_slots() + 1; + _prepare_finished = true; + return Status::OK(); +} +Status VTupleIsNullPredicate::open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) { + DCHECK(_prepare_finished); + RETURN_IF_ERROR(VExpr::open(state, context, scope)); + _open_finished = true; return Status::OK(); } Status VTupleIsNullPredicate::execute(VExprContext* context, Block* block, int* result_column_id) { + DCHECK(_open_finished || _getting_const_col); *result_column_id = _column_to_check; return Status::OK(); } diff --git a/be/src/vec/exprs/vtuple_is_null_predicate.h b/be/src/vec/exprs/vtuple_is_null_predicate.h index 9d3b794fb8..c42e7300d1 100644 --- a/be/src/vec/exprs/vtuple_is_null_predicate.h +++ b/be/src/vec/exprs/vtuple_is_null_predicate.h @@ -42,8 +42,10 @@ class VTupleIsNullPredicate final : public VExpr { public: explicit VTupleIsNullPredicate(const TExprNode& node); ~VTupleIsNullPredicate() override = default; - Status execute(VExprContext* context, Block* block, int* result_column_id) override; Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override; + Status open(RuntimeState* state, VExprContext* context, + FunctionContext::FunctionStateScope scope) override; + Status execute(VExprContext* context, Block* block, int* result_column_id) override; [[nodiscard]] bool is_constant() const override { return false; } diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 141a5c54b6..2da6f9f920 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -20,11 +20,11 @@ #include #include #include -#include -#include #include #include +#include +#include #include #include #include @@ -42,14 +42,10 @@ #include "common/object_pool.h" #include "common/status.h" #include "runtime/descriptors.h" -#include "runtime/query_statistics.h" #include "runtime/task_execution_context.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" -#include "vec/columns/column.h" #include "vec/core/block.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/core/materialize_block.h" #include "vec/exprs/vexpr_fwd.h" namespace doris {