diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h index ca3710002d..19548b59f3 100644 --- a/be/src/exec/olap_common.h +++ b/be/src/exec/olap_common.h @@ -199,6 +199,7 @@ public: if (TYPE_MIN != _low_value || FILTER_LARGER_OR_EQUAL != _low_op) { low.__set_column_name(_column_name); low.__set_condition_op((_low_op == FILTER_LARGER_OR_EQUAL ? ">=" : ">>")); + low.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate); low.condition_values.push_back( cast_to_string(_low_value, _scale)); } @@ -211,6 +212,7 @@ public: if (TYPE_MAX != _high_value || FILTER_LESS_OR_EQUAL != _high_op) { high.__set_column_name(_column_name); high.__set_condition_op((_high_op == FILTER_LESS_OR_EQUAL ? "<=" : "<<")); + high.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate); high.condition_values.push_back( cast_to_string(_high_value, _scale)); } @@ -237,6 +239,7 @@ public: TCondition condition; condition.__set_column_name(_column_name); condition.__set_condition_op(is_in ? "*=" : "!*="); + condition.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate); for (const auto& value : _fixed_values) { condition.condition_values.push_back( @@ -333,6 +336,12 @@ public: _contain_null = contain_null; } + void mark_runtime_filter_predicate(bool is_runtime_filter_predicate) { + _marked_runtime_filter_predicate = is_runtime_filter_predicate; + } + + bool get_marked_by_runtime_filter() const { return _marked_runtime_filter_predicate; } + int precision() const { return _precision; } int scale() const { return _scale; } @@ -413,6 +422,7 @@ private: // range value except leaf node of and node in compound expr tree std::set> _compound_values; + bool _marked_runtime_filter_predicate = false; }; class OlapScanKeys { diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h index 2af921ddf4..c87adaafb8 100644 --- a/be/src/olap/column_predicate.h +++ b/be/src/olap/column_predicate.h @@ -34,6 +34,7 @@ class Schema; struct PredicateParams { std::string value; + bool marked_by_runtime_filter = false; }; enum class PredicateType { diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 61ce931e52..3cc8da46ca 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -119,6 +119,7 @@ public: std::vector* read_orderby_key_columns = nullptr; IOContext io_ctx; vectorized::VExpr* remaining_vconjunct_root = nullptr; + const std::set* output_columns = nullptr; // runtime state RuntimeState* runtime_state = nullptr; RowsetId rowset_id; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 0d116dd0ae..8d59085908 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -227,6 +227,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params, _reader_context.record_rowids = read_params.record_rowids; _reader_context.is_key_column_group = read_params.is_key_column_group; _reader_context.remaining_vconjunct_root = read_params.remaining_vconjunct_root; + _reader_context.output_columns = &read_params.output_columns; *valid_rs_readers = *rs_readers; @@ -457,6 +458,7 @@ void TabletReader::_init_conditions_param(const ReaderParams& read_params) { // _gen_predicate_result_sign will build predicate result unique sign with condition value auto predicate_params = predicate->predicate_params(); predicate_params->value = condition.condition_values[0]; + predicate_params->marked_by_runtime_filter = condition.marked_by_runtime_filter; if (_tablet_schema->column_by_uid(condition_col_uid).aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) { _value_col_predicates.push_back(predicate); @@ -476,7 +478,13 @@ void TabletReader::_init_conditions_param(const ReaderParams& read_params) { } for (const auto& filter : read_params.in_filters) { - _col_predicates.emplace_back(_parse_to_predicate(filter)); + ColumnPredicate* predicate = _parse_to_predicate(filter); + if (predicate != nullptr) { + // in_filters from runtime filter predicates which pushed down to data source. + auto predicate_params = predicate->predicate_params(); + predicate_params->marked_by_runtime_filter = true; + } + _col_predicates.emplace_back(predicate); } // Function filter push down to storage engine @@ -523,6 +531,7 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode( parse_to_predicate(_tablet_schema, tmp_cond, _predicate_mem_pool.get()); if (predicate != nullptr) { auto predicate_params = predicate->predicate_params(); + predicate_params->marked_by_runtime_filter = condition.marked_by_runtime_filter; predicate_params->value = condition.condition_values[0]; _col_preds_except_leafnode_of_andnode.push_back(predicate); } diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 3abccc8044..4168341738 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -101,7 +101,10 @@ public: DeleteBitmap* delete_bitmap {nullptr}; std::vector rs_readers; + // return_columns is init from query schema std::vector return_columns; + // output_columns only contain columns in OrderByExprs and outputExprs + std::set output_columns; RuntimeProfile* profile = nullptr; RuntimeState* runtime_state = nullptr; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 0f978e3c3a..2ba9e2fe85 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -163,6 +163,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.read_orderby_key_columns = read_context->read_orderby_key_columns; _read_options.io_ctx.reader_type = read_context->reader_type; _read_options.runtime_state = read_context->runtime_state; + _read_options.output_columns = read_context->output_columns; // load segments // use cache is true when do vertica compaction diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index cd52884fc9..e342a5a824 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -77,6 +77,7 @@ struct RowsetReaderContext { bool record_rowids = false; bool is_vertical_compaction = false; bool is_key_column_group = false; + const std::set* output_columns = nullptr; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 0f29a4e01a..39ada1e7d5 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -176,7 +176,14 @@ Status SegmentIterator::init(const StorageReadOptions& opts) { if (!opts.column_predicates_except_leafnode_of_andnode.empty()) { _col_preds_except_leafnode_of_andnode = opts.column_predicates_except_leafnode_of_andnode; } + + if (opts.output_columns != nullptr) { + _output_columns = *(opts.output_columns); + } + _remaining_vconjunct_root = opts.remaining_vconjunct_root; + _column_predicate_info.reset(new ColumnPredicateInfo()); + _calculate_pred_in_remaining_vconjunct_root(_remaining_vconjunct_root); _column_predicate_info.reset(new ColumnPredicateInfo()); if (_schema.rowid_col_idx() > 0) { @@ -419,12 +426,19 @@ Status SegmentIterator::_apply_bitmap_index() { for (auto pred : _col_predicates) { int32_t unique_id = _schema.unique_id(pred->column_id()); if (_bitmap_index_iterators.count(unique_id) < 1 || - _bitmap_index_iterators[unique_id] == nullptr) { + _bitmap_index_iterators[unique_id] == nullptr || pred->type() == PredicateType::BF) { // no bitmap index for this column remaining_predicates.push_back(pred); } else { RETURN_IF_ERROR(pred->evaluate(_bitmap_index_iterators[unique_id], _segment->num_rows(), &_row_bitmap)); + + auto column_name = _schema.column(pred->column_id())->name(); + if (_check_column_pred_all_push_down(column_name) && + !pred->predicate_params()->marked_by_runtime_filter) { + _need_read_data_indices[unique_id] = false; + } + if (_row_bitmap.isEmpty()) { break; // all rows have been pruned, no need to process further predicates } @@ -518,7 +532,8 @@ Status SegmentIterator::_execute_compound_fn(const std::string& function_name) { bool SegmentIterator::_can_filter_by_preds_except_leafnode_of_andnode() { for (auto pred : _col_preds_except_leafnode_of_andnode) { - if (!_check_apply_by_bitmap_index(pred) && !_check_apply_by_inverted_index(pred)) { + if (_not_apply_index_pred.count(pred->column_id()) || + (!_check_apply_by_bitmap_index(pred) && !_check_apply_by_inverted_index(pred))) { return false; } } @@ -597,6 +612,7 @@ Status SegmentIterator::_apply_index_except_leafnode_of_andnode() { pred->type() != PredicateType::MATCH) || res.code() == ErrorCode::INVERTED_INDEX_FILE_HIT_LIMIT) { // downgrade without index query + _not_apply_index_pred.insert(pred->column_id()); continue; } LOG(WARNING) << "failed to evaluate index" @@ -610,6 +626,16 @@ Status SegmentIterator::_apply_index_except_leafnode_of_andnode() { std::make_pair(pred_result_sign, std::make_pair(true, bitmap))); } + for (auto pred : _col_preds_except_leafnode_of_andnode) { + auto column_name = _schema.column(pred->column_id())->name(); + if (_remaining_vconjunct_root != nullptr && + _check_column_pred_all_push_down(column_name, true) && + !pred->predicate_params()->marked_by_runtime_filter) { + int32_t unique_id = _schema.unique_id(pred->column_id()); + _need_read_data_indices[unique_id] = false; + } + } + return Status::OK(); } @@ -645,6 +671,11 @@ bool SegmentIterator::_is_handle_predicate_by_fulltext(int32_t unique_id) { std::all_of(predicate_set.begin(), predicate_set.end(), \ [](const ColumnPredicate* p) { return PredicateTypeTraits::is_range(p->type()); }) +#define all_predicates_are_marked_by_runtime_filter(predicate_set) \ + std::all_of(predicate_set.begin(), predicate_set.end(), [](const ColumnPredicate* p) { \ + return const_cast(p)->predicate_params()->marked_by_runtime_filter; \ + }) + Status SegmentIterator::_apply_inverted_index_on_column_predicate( ColumnPredicate* pred, std::vector& remaining_predicates, bool* continue_apply) { @@ -679,6 +710,12 @@ Status SegmentIterator::_apply_inverted_index_on_column_predicate( return res; } + auto column_name = _schema.column(pred->column_id())->name(); + if (_check_column_pred_all_push_down(column_name) && + !pred->predicate_params()->marked_by_runtime_filter) { + _need_read_data_indices[unique_id] = false; + } + auto pred_type = pred->type(); if (pred_type == PredicateType::MATCH) { std::string pred_result_sign = _gen_predicate_result_sign(pred); @@ -721,6 +758,10 @@ Status SegmentIterator::_apply_inverted_index_on_block_column_predicate( &output_result); if (res.ok()) { + if (_check_column_pred_all_push_down(column_name) && + !all_predicates_are_marked_by_runtime_filter(predicate_set)) { + _need_read_data_indices[unique_id] = false; + } no_need_to_pass_column_predicate_set.insert(predicate_set.begin(), predicate_set.end()); _row_bitmap &= output_result; if (_row_bitmap.isEmpty()) { @@ -742,6 +783,17 @@ Status SegmentIterator::_apply_inverted_index_on_block_column_predicate( return Status::OK(); } +bool SegmentIterator::_need_read_data(ColumnId cid) { + int32_t unique_id = _schema.unique_id(cid); + if (_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] && + _output_columns.count(unique_id) < 1) { + VLOG_DEBUG << "SegmentIterator no need read data for column: " + << _opts.tablet_schema->column_by_uid(unique_id).name(); + return false; + } + return true; +} + Status SegmentIterator::_apply_inverted_index() { SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer); size_t input_rows = _row_bitmap.cardinality(); @@ -993,6 +1045,9 @@ Status SegmentIterator::_seek_and_peek(rowid_t rowid) { Status SegmentIterator::_seek_columns(const std::vector& column_ids, rowid_t pos) { for (auto cid : column_ids) { + if (!_need_read_data(cid)) { + continue; + } RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->seek_to_ordinal(pos)); } return Status::OK(); @@ -1201,11 +1256,33 @@ void SegmentIterator::_vec_init_char_column_id() { } } +bool SegmentIterator::_prune_column(ColumnId cid, vectorized::MutableColumnPtr& column, + bool fill_defaults, size_t num_of_defaults) { + if (_need_read_data(cid)) { + return false; + } + if (!fill_defaults) { + return true; + } + if (column->is_nullable()) { + auto nullable_col_ptr = reinterpret_cast(column.get()); + nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults); + nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults); + } else { + // assert(column->is_const()); + column->insert_many_defaults(num_of_defaults); + } + return true; +} + Status SegmentIterator::_read_columns(const std::vector& column_ids, vectorized::MutableColumns& column_block, size_t nrows) { for (auto cid : column_ids) { auto& column = column_block[cid]; size_t rows_read = nrows; + if (_prune_column(cid, column, true, rows_read)) { + continue; + } RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->next_batch(&rows_read, column)); DCHECK_EQ(nrows, rows_read); } @@ -1370,7 +1447,8 @@ uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro Status SegmentIterator::_read_columns_by_rowids(std::vector& read_column_ids, std::vector& rowid_vector, - uint16_t* sel_rowid_idx, size_t select_size) { + uint16_t* sel_rowid_idx, size_t select_size, + vectorized::MutableColumns* mutable_columns) { SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns); std::vector rowids(select_size); for (size_t i = 0; i < select_size; ++i) { @@ -1378,6 +1456,9 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector& read_colu } for (auto cid : read_column_ids) { + if (_prune_column(cid, (*mutable_columns)[cid], true, select_size)) { + continue; + } RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->read_by_rowids( rowids.data(), select_size, _current_return_columns[cid])); } @@ -1493,7 +1574,8 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { // step3: read non_predicate column RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids, - sel_rowid_idx, selected_size)); + sel_rowid_idx, selected_size, + &_current_return_columns)); // step4: output columns // 4.1 output non-predicate column @@ -1642,5 +1724,77 @@ Status SegmentIterator::current_block_row_locations(std::vector* bl return Status::OK(); } +/** + * solution 1: where cluase included nodes are all `and` leaf nodes, + * predicate pushed down and remove from vconjunct. + * for example: where A = 1 and B = 'test' and B like '%he%'; + * column A : `A = 1` pushed down, this column's predicates all pushed down, + * call _check_column_pred_all_push_down will return true. + * column B : `B = 'test'` pushed down, but `B like '%he%'` remain in vconjunct, + * call _check_column_pred_all_push_down will return false. + * + * solution 2: where cluase included nodes are compound or other complex conditions, + * predicate pushed down but still remain in vconjunct. + * for exmple: where (A = 1 and B = 'test') or B = 'hi' or (C like '%ye%' and C > 'aa'); + * column A : `A = 1` pushed down, check it applyed by index, + * call _check_column_pred_all_push_down will return true. + * column B : `B = 'test'`, `B = 'hi'` all pushed down, check them all applyed by index, + * call _check_column_pred_all_push_down will return true. + * column C : `C like '%ye%'` not pushed down, `C > 'aa'` pushed down, only `C > 'aa'` applyed by index, + * call _check_column_pred_all_push_down will return false. +*/ +bool SegmentIterator::_check_column_pred_all_push_down(const std::string& column_name, + bool in_compound) { + if (_remaining_vconjunct_root == nullptr) { + return true; + } + + if (in_compound) { + auto preds_in_remaining_vconjuct = _column_pred_in_remaining_vconjunct[column_name]; + for (auto pred_info : preds_in_remaining_vconjuct) { + auto column_sign = _gen_predicate_result_sign(&pred_info); + if (_rowid_result_for_index.count(column_sign) < 1) { + return false; + } + } + } else { + if (_column_pred_in_remaining_vconjunct[column_name].size() != 0) { + return false; + } + } + return true; +} + +void SegmentIterator::_calculate_pred_in_remaining_vconjunct_root(const vectorized::VExpr* expr) { + if (expr == nullptr) { + return; + } + + auto children = expr->children(); + for (int i = 0; i < children.size(); ++i) { + _calculate_pred_in_remaining_vconjunct_root(children[i]); + } + + auto node_type = expr->node_type(); + if (node_type == TExprNodeType::SLOT_REF) { + _column_predicate_info->column_name = expr->expr_name(); + } else if (_is_literal_node(node_type)) { + auto v_literal_expr = static_cast(expr); + _column_predicate_info->query_value = v_literal_expr->value(); + } else { + if (node_type == TExprNodeType::MATCH_PRED) { + _column_predicate_info->query_op = "match"; + } else if (node_type != TExprNodeType::COMPOUND_PRED) { + _column_predicate_info->query_op = expr->fn().name.function_name; + } + + if (!_column_predicate_info->is_empty()) { + _column_pred_in_remaining_vconjunct[_column_predicate_info->column_name].push_back( + *_column_predicate_info); + _column_predicate_info.reset(new ColumnPredicateInfo()); + } + } +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index f8ec46333b..b976fc53c7 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -51,6 +51,32 @@ class ColumnIterator; struct ColumnPredicateInfo { ColumnPredicateInfo() = default; + + std::string debug_string() const { + std::stringstream ss; + ss << "column_name=" << column_name << ", query_op=" << query_op + << ", query_value=" << query_value; + return ss.str(); + } + + bool is_empty() const { return column_name.empty() && query_value.empty() && query_op.empty(); } + + bool is_equal(const ColumnPredicateInfo& column_pred_info) const { + if (column_pred_info.column_name != column_name) { + return false; + } + + if (column_pred_info.query_value != query_value) { + return false; + } + + if (column_pred_info.query_op != query_op) { + return false; + } + + return true; + } + std::string column_name; std::string query_value; std::string query_op; @@ -167,7 +193,7 @@ private: void _output_non_pred_columns(vectorized::Block* block); Status _read_columns_by_rowids(std::vector& read_column_ids, std::vector& rowid_vector, uint16_t* sel_rowid_idx, - size_t select_size); + size_t select_size, vectorized::MutableColumns* mutable_columns); template Status _output_column_by_sel_idx(vectorized::Block* block, const Container& column_ids, @@ -203,6 +229,14 @@ private: void _output_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size, vectorized::Block* block); + bool _need_read_data(ColumnId cid); + bool _prune_column(ColumnId cid, vectorized::MutableColumnPtr& column, bool fill_defaults, + size_t num_of_defaults); + + // return true means one column's predicates all pushed down + bool _check_column_pred_all_push_down(const std::string& column_name, bool in_compound = false); + void _calculate_pred_in_remaining_vconjunct_root(const vectorized::VExpr* expr); + private: // todo(wb) remove this method after RowCursor is removed void _convert_rowcursor_to_short_key(const RowCursor& key, size_t num_keys) { @@ -297,6 +331,7 @@ private: std::vector _short_cir_pred_column_ids; // keep columnId of columns for short circuit predicate evaluation std::vector _is_pred_column; // columns hold by segmentIter + std::map _need_read_data_indices; vectorized::MutableColumns _current_return_columns; std::vector _pre_eval_block_predicate; std::vector _short_cir_eval_predicate; @@ -320,8 +355,12 @@ private: doris::vectorized::VExpr* _remaining_vconjunct_root; std::vector _pred_except_leafnode_of_andnode_evaluate_result; std::unique_ptr _column_predicate_info; + std::unordered_map> + _column_pred_in_remaining_vconjunct; + std::set _not_apply_index_pred; std::shared_ptr _runtime_predicate {nullptr}; + std::set _output_columns; // row schema of the key to seek // only used in `_get_row_ranges_by_keys` diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 0285189ef1..2978d806db 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -372,6 +372,15 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { (*_vconjunct_ctx_ptr)->root()->debug_string()); } + if (!_olap_scan_node.output_column_unique_ids.empty()) { + for (auto uid : _olap_scan_node.output_column_unique_ids) { + if (uid < 0) { + continue; + } + _maybe_read_column_ids.emplace(uid); + } + } + // ranges constructed from scan keys std::vector> cond_ranges; RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges)); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index c7a346b6db..b179f31060 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -68,6 +68,8 @@ private: // _compound_filters store conditions in the one compound relationship in conjunct expr tree except leaf node of `and` node, // such as: "(a or b) and (c or d)", conditions for a,b,c,d will be stored std::vector _compound_filters; + // If column id in this set, indicate that we need to read data after index filtering + std::set _maybe_read_column_ids; private: std::unique_ptr _segment_profile; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 8dc2e80e2f..a1d8ed0cea 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -215,6 +215,7 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.version = Version(0, _version); _tablet_reader_params.remaining_vconjunct_root = (_vconjunct_ctx == nullptr) ? nullptr : _vconjunct_ctx->root(); + _tablet_reader_params.output_columns = ((NewOlapScanNode*)_parent)->_maybe_read_column_ids; // Condition for (auto& filter : filters) { diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 53cf4930f5..f151a22014 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -511,6 +511,8 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output auto impl = conjunct_expr_root->get_impl(); // If impl is not null, which means this a conjuncts from runtime filter. VExpr* cur_expr = impl ? const_cast(impl) : conjunct_expr_root; + bool is_runtimer_filter_predicate = + _rf_vexpr_set.find(conjunct_expr_root) != _rf_vexpr_set.end(); SlotDescriptor* slot = nullptr; ColumnValueRangeType* range = nullptr; PushDownType pdt = PushDownType::UNACCEPTABLE; @@ -523,6 +525,10 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output _is_predicate_acting_on_slot(cur_expr, eq_predicate_checker, &slot, &range)) { std::visit( [&](auto& value_range) { + Defer mark_runtime_filter_flag {[&]() { + value_range.mark_runtime_filter_predicate( + is_runtimer_filter_predicate); + }}; RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate( cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range, &pdt)); @@ -555,7 +561,8 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output if (pdt == PushDownType::UNACCEPTABLE && TExprNodeType::COMPOUND_PRED == cur_expr->node_type()) { _normalize_compound_predicate(cur_expr, *(_vconjunct_ctx_ptr.get()), &pdt, - in_predicate_checker, eq_predicate_checker); + is_runtimer_filter_predicate, in_predicate_checker, + eq_predicate_checker); *output_expr = conjunct_expr_root; // remaining in conjunct tree return Status::OK(); } @@ -977,6 +984,7 @@ Status VScanNode::_normalize_noneq_binary_predicate(VExpr* expr, VExprContext* e Status VScanNode::_normalize_compound_predicate( vectorized::VExpr* expr, VExprContext* expr_ctx, PushDownType* pdt, + bool is_runtimer_filter_predicate, const std::function&, const VSlotRef**, VExpr**)>& in_predicate_checker, const std::function&, const VSlotRef**, VExpr**)>& @@ -997,6 +1005,10 @@ Status VScanNode::_normalize_compound_predicate( *range_on_slot; // copy, in order not to affect the range in the _colname_to_value_range std::visit( [&](auto& value_range) { + Defer mark_runtime_filter_flag {[&]() { + value_range.mark_runtime_filter_predicate( + is_runtimer_filter_predicate); + }}; _normalize_binary_in_compound_predicate(child_expr, expr_ctx, slot, value_range, pdt); }, @@ -1015,6 +1027,10 @@ Status VScanNode::_normalize_compound_predicate( *range_on_slot; // copy, in order not to affect the range in the _colname_to_value_range std::visit( [&](auto& value_range) { + Defer mark_runtime_filter_flag {[&]() { + value_range.mark_runtime_filter_predicate( + is_runtimer_filter_predicate); + }}; _normalize_match_in_compound_predicate(child_expr, expr_ctx, slot, value_range, pdt); }, @@ -1023,7 +1039,8 @@ Status VScanNode::_normalize_compound_predicate( _compound_value_ranges.emplace_back(active_range); } } else if (TExprNodeType::COMPOUND_PRED == child_expr->node_type()) { - _normalize_compound_predicate(child_expr, expr_ctx, pdt, in_predicate_checker, + _normalize_compound_predicate(child_expr, expr_ctx, pdt, + is_runtimer_filter_predicate, in_predicate_checker, eq_predicate_checker); } } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index c8d7269087..476c7c37da 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -340,6 +340,7 @@ private: Status _normalize_compound_predicate( vectorized::VExpr* expr, VExprContext* expr_ctx, PushDownType* pdt, + bool is_runtimer_filter_predicate, const std::function&, const VSlotRef**, VExpr**)>& in_predicate_checker, const std::function&, const VSlotRef**, VExpr**)>& diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java index 2501b308f2..431d5c9cba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java @@ -154,6 +154,11 @@ public abstract class AggregateInfoBase { Expr expr = exprs.get(i); SlotDescriptor slotDesc = analyzer.addSlotDescriptor(result); slotDesc.initFromExpr(expr); + if (expr instanceof SlotRef) { + if (((SlotRef) expr).getColumn() != null) { + slotDesc.setColumn(((SlotRef) expr).getColumn()); + } + } // Not change the nullable of slot desc when is not grouping set id if (isGroupingSet && i < aggregateExprStartIndex - 1 && !(expr instanceof VirtualSlotRef)) { slotDesc.setIsNullable(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java index 00b871c3b4..0dc41c037d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SortInfo.java @@ -51,6 +51,7 @@ public class SortInfo { private static final float SORT_MATERIALIZATION_COST_THRESHOLD = Expr.FUNCTION_CALL_COST; private List orderingExprs; + private List origOrderingExprs; private final List isAscOrder; // True if "NULLS FIRST", false if "NULLS LAST", null if not specified. private final List nullsFirstParams; @@ -122,6 +123,10 @@ public class SortInfo { return orderingExprs; } + public List getOrigOrderingExprs() { + return origOrderingExprs; + } + public List getIsAscOrder() { return isAscOrder; } @@ -261,6 +266,9 @@ public class SortInfo { } } + // backup before substitute orderingExprs + origOrderingExprs = orderingExprs; + // The ordering exprs are evaluated against the sort tuple, so they must reflect the // materialization decision above. substituteOrderingExprs(substOrderBy, analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 0d1e3044d0..5c37623b2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -155,6 +155,7 @@ public class OlapScanNode extends ScanNode { private long totalBytes = 0; private SortInfo sortInfo = null; + private HashSet outputColumnUniqueIds = new HashSet<>(); // When scan match sort_info, we can push limit into OlapScanNode. // It's limit for scanner instead of scanNode so we add a new limit. @@ -813,6 +814,10 @@ public class OlapScanNode extends ScanNode { LOG.debug("distribution prune cost: {} ms", (System.currentTimeMillis() - start)); } + public void setOutputColumnUniqueIds(HashSet outputColumnUniqueIds) { + this.outputColumnUniqueIds = outputColumnUniqueIds; + } + /** * First, determine how many rows to sample from each partition according to the number of partitions. * Then determine the number of Tablets to be selected for each partition according to the average number @@ -1149,6 +1154,10 @@ public class OlapScanNode extends ScanNode { if (pushDownAggNoGroupingOp != null) { msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); } + + if (outputColumnUniqueIds != null) { + msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds); + } } // export some tablets diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 00328d1ef9..f03c9fe1fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -50,6 +50,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -253,6 +254,10 @@ public class OriginalPlanner extends Planner { pushDownResultFileSink(analyzer); + if (VectorizedUtil.isVectorized()) { + pushOutColumnUniqueIdsToOlapScan(rootFragment, analyzer); + } + if (queryStmt instanceof SelectStmt) { SelectStmt selectStmt = (SelectStmt) queryStmt; if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) { @@ -436,6 +441,72 @@ public class OriginalPlanner extends Planner { } } + /** + * outputColumnUniqueIds contain columns in OrderByExprs and outputExprs, + * push output column unique id set to olap scan. + * + * when query to storage layer, there are need read raw data + * for columns which in outputColumnUniqueIds + * + * for example: + * select A from tb where B = 1 and C > 'hello' order by B; + * + * column unique id for `A` and `B` will put into outputColumnUniqueIds. + * + */ + private void pushOutColumnUniqueIdsToOlapScan(PlanFragment rootFragment, Analyzer analyzer) { + HashSet outputColumnUniqueIds = new HashSet<>(); + ArrayList outputExprs = rootFragment.getOutputExprs(); + for (Expr expr : outputExprs) { + if (expr instanceof SlotRef) { + if (((SlotRef) expr).getColumn() != null) { + outputColumnUniqueIds.add(((SlotRef) expr).getColumn().getUniqueId()); + } + } + } + + for (PlanFragment fragment : fragments) { + PlanNode node = fragment.getPlanRoot(); + PlanNode parent = null; + while (node.getChildren().size() != 0) { + for (PlanNode childNode : node.getChildren()) { + List outputSlotIds = childNode.getOutputSlotIds(); + if (outputSlotIds != null) { + for (SlotId sid : outputSlotIds) { + SlotDescriptor slotDesc = analyzer.getSlotDesc(sid); + outputColumnUniqueIds.add(slotDesc.getUniqueId()); + } + } + } + // OlapScanNode is the last node. + // So, just get the two node and check if they are SortNode and OlapScan. + parent = node; + node = node.getChildren().get(0); + } + + if (parent instanceof SortNode) { + SortNode sortNode = (SortNode) parent; + List orderingExprs = sortNode.getSortInfo().getOrigOrderingExprs(); + if (orderingExprs != null) { + for (Expr expr : orderingExprs) { + if (expr instanceof SlotRef) { + if (((SlotRef) expr).getColumn() != null) { + outputColumnUniqueIds.add(((SlotRef) expr).getColumn().getUniqueId()); + } + } + } + } + } + + if (!(node instanceof OlapScanNode)) { + continue; + } + + OlapScanNode scanNode = (OlapScanNode) node; + scanNode.setOutputColumnUniqueIds(outputColumnUniqueIds); + } + } + /** * optimize for topn query like: SELECT * FROM t1 WHERE a>100 ORDER BY b,c LIMIT 100 * the pre-requirement is as follows: diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 4c6cf88fad..86c0520b27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -1130,4 +1130,8 @@ public abstract class PlanNode extends TreeNode implements PlanStats { public List getProjectList() { return projectList; } + + public List getOutputSlotIds() { + return outputSlotIds; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java index a621994b34..244b7c9288 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java @@ -409,7 +409,7 @@ public class TableFunctionPlanTest { "SlotDescriptor{id=1,col=k2,colUniqueId=1,type=VARCHAR(1)" )); Assert.assertTrue(formatString.contains( - "SlotDescriptor{id=2,col=null,colUniqueId=null,type=INT" + "SlotDescriptor{id=2,col=k1,colUniqueId=0,type=INT" )); Assert.assertTrue(formatString.contains( "SlotDescriptor{id=3,col=null,colUniqueId=null,type=VARCHAR(*)" diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 59945cb932..52404f1615 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -545,6 +545,7 @@ struct TCondition { // In delete condition, the different column may have same column name, need // using unique id to distinguish them 4: optional i32 column_unique_id + 5: optional bool marked_by_runtime_filter = false } struct TExportStatusResult { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index e7853a1bc7..1eb2372977 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -575,6 +575,7 @@ struct TOlapScanNode { 12: optional TPushAggOp push_down_agg_type_opt 13: optional bool use_topn_opt 14: optional list indexes_desc + 15: optional set output_column_unique_ids } struct TEqJoinCondition { diff --git a/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy b/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy index ebf0a7f4b9..365811f3a6 100644 --- a/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy +++ b/regression-test/suites/inverted_index_p0/test_add_drop_index_with_delete_data.groovy @@ -35,7 +35,7 @@ suite("test_add_drop_index_with_delete_data", "inverted_index"){ assertTrue(useTime <= OpTimeout) } - def indexTbName1 = "test_add_drop_inverted_index2" + def indexTbName1 = "test_add_drop_inverted_index3" sql "DROP TABLE IF EXISTS ${indexTbName1}" // create 1 replica table