[Optimization](index) Skip reading raw data for index columns that appear only in the where clause (#16569)
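The diff below touches both the FE planner and the BE storage layer. The core idea: once every predicate on a column has been fully answered by a bitmap or inverted index, and none of those predicates came from a runtime filter, the segment iterator no longer needs to read that column's raw data, unless the column is also required by the output or ORDER BY expressions. The following is a minimal, self-contained sketch of that decision using simplified stand-in types (PredicateSketch, columns_that_need_raw_data are hypothetical names); it is not the Doris implementation.

// Sketch (hypothetical types, not Doris code) of the skip decision: a column's
// raw data is read only if some predicate on it was NOT fully answered by an
// index, or a predicate came from a runtime filter, or the column is required
// by the output / ORDER BY expressions.
#include <cstdint>
#include <map>
#include <set>
#include <vector>

struct PredicateSketch {
    int32_t column_unique_id;
    bool satisfied_by_index;        // a bitmap/inverted index evaluated it completely
    bool marked_by_runtime_filter;  // pushed down from a runtime filter, not the WHERE clause
};

std::set<int32_t> columns_that_need_raw_data(const std::vector<PredicateSketch>& preds,
                                             const std::set<int32_t>& output_columns) {
    // unique id -> true while every predicate seen so far is index-only and not runtime-filter based
    std::map<int32_t, bool> skippable;
    for (const auto& p : preds) {
        bool ok = p.satisfied_by_index && !p.marked_by_runtime_filter;
        auto it = skippable.find(p.column_unique_id);
        skippable[p.column_unique_id] = (it == skippable.end()) ? ok : (it->second && ok);
    }
    std::set<int32_t> need_read = output_columns;  // output/order-by columns are always read
    for (const auto& [uid, can_skip] : skippable) {
        if (!can_skip) {
            need_read.insert(uid);
        }
    }
    return need_read;
}

For the example used later in this diff, `select A from tb where B = 1 and C > 'hello' order by B`, only C is a candidate for skipping, since A and B are output/ordering columns.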
@@ -199,6 +199,7 @@ public:
if (TYPE_MIN != _low_value || FILTER_LARGER_OR_EQUAL != _low_op) {
low.__set_column_name(_column_name);
low.__set_condition_op((_low_op == FILTER_LARGER_OR_EQUAL ? ">=" : ">>"));
low.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate);
low.condition_values.push_back(
cast_to_string<primitive_type, CppType>(_low_value, _scale));
}
@@ -211,6 +212,7 @@ public:
if (TYPE_MAX != _high_value || FILTER_LESS_OR_EQUAL != _high_op) {
high.__set_column_name(_column_name);
high.__set_condition_op((_high_op == FILTER_LESS_OR_EQUAL ? "<=" : "<<"));
high.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate);
high.condition_values.push_back(
cast_to_string<primitive_type, CppType>(_high_value, _scale));
}
@@ -237,6 +239,7 @@ public:
TCondition condition;
condition.__set_column_name(_column_name);
condition.__set_condition_op(is_in ? "*=" : "!*=");
condition.__set_marked_by_runtime_filter(_marked_runtime_filter_predicate);

for (const auto& value : _fixed_values) {
condition.condition_values.push_back(
@@ -333,6 +336,12 @@ public:
_contain_null = contain_null;
}

void mark_runtime_filter_predicate(bool is_runtime_filter_predicate) {
_marked_runtime_filter_predicate = is_runtime_filter_predicate;
}

bool get_marked_by_runtime_filter() const { return _marked_runtime_filter_predicate; }

int precision() const { return _precision; }

int scale() const { return _scale; }
@@ -413,6 +422,7 @@ private:

// range value except leaf node of and node in compound expr tree
std::set<std::pair<SQLFilterOp, CppType>> _compound_values;
bool _marked_runtime_filter_predicate = false;
};

class OlapScanKeys {
@@ -34,6 +34,7 @@ class Schema;

struct PredicateParams {
std::string value;
bool marked_by_runtime_filter = false;
};

enum class PredicateType {
@@ -119,6 +119,7 @@ public:
std::vector<uint32_t>* read_orderby_key_columns = nullptr;
IOContext io_ctx;
vectorized::VExpr* remaining_vconjunct_root = nullptr;
const std::set<int32_t>* output_columns = nullptr;
// runtime state
RuntimeState* runtime_state = nullptr;
RowsetId rowset_id;
@@ -227,6 +227,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params,
_reader_context.record_rowids = read_params.record_rowids;
_reader_context.is_key_column_group = read_params.is_key_column_group;
_reader_context.remaining_vconjunct_root = read_params.remaining_vconjunct_root;
_reader_context.output_columns = &read_params.output_columns;

*valid_rs_readers = *rs_readers;

@@ -457,6 +458,7 @@ void TabletReader::_init_conditions_param(const ReaderParams& read_params) {
// _gen_predicate_result_sign will build predicate result unique sign with condition value
auto predicate_params = predicate->predicate_params();
predicate_params->value = condition.condition_values[0];
predicate_params->marked_by_runtime_filter = condition.marked_by_runtime_filter;
if (_tablet_schema->column_by_uid(condition_col_uid).aggregation() !=
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
_value_col_predicates.push_back(predicate);
@@ -476,7 +478,13 @@ void TabletReader::_init_conditions_param(const ReaderParams& read_params) {
}

for (const auto& filter : read_params.in_filters) {
_col_predicates.emplace_back(_parse_to_predicate(filter));
ColumnPredicate* predicate = _parse_to_predicate(filter);
if (predicate != nullptr) {
// in_filters come from runtime filter predicates that are pushed down to the data source.
auto predicate_params = predicate->predicate_params();
predicate_params->marked_by_runtime_filter = true;
}
_col_predicates.emplace_back(predicate);
}

// Function filter push down to storage engine
@@ -523,6 +531,7 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode(
parse_to_predicate(_tablet_schema, tmp_cond, _predicate_mem_pool.get());
if (predicate != nullptr) {
auto predicate_params = predicate->predicate_params();
predicate_params->marked_by_runtime_filter = condition.marked_by_runtime_filter;
predicate_params->value = condition.condition_values[0];
_col_preds_except_leafnode_of_andnode.push_back(predicate);
}
@@ -101,7 +101,10 @@ public:
DeleteBitmap* delete_bitmap {nullptr};

std::vector<RowsetReaderSharedPtr> rs_readers;
// return_columns is init from query schema
std::vector<uint32_t> return_columns;
// output_columns only contain columns in OrderByExprs and outputExprs
std::set<int32_t> output_columns;
RuntimeProfile* profile = nullptr;
RuntimeState* runtime_state = nullptr;
@@ -163,6 +163,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.read_orderby_key_columns = read_context->read_orderby_key_columns;
_read_options.io_ctx.reader_type = read_context->reader_type;
_read_options.runtime_state = read_context->runtime_state;
_read_options.output_columns = read_context->output_columns;

// load segments
// use cache is true when doing vertical compaction
@@ -77,6 +77,7 @@ struct RowsetReaderContext {
bool record_rowids = false;
bool is_vertical_compaction = false;
bool is_key_column_group = false;
const std::set<int32_t>* output_columns = nullptr;
};

} // namespace doris
@@ -176,7 +176,14 @@ Status SegmentIterator::init(const StorageReadOptions& opts) {
if (!opts.column_predicates_except_leafnode_of_andnode.empty()) {
_col_preds_except_leafnode_of_andnode = opts.column_predicates_except_leafnode_of_andnode;
}

if (opts.output_columns != nullptr) {
_output_columns = *(opts.output_columns);
}

_remaining_vconjunct_root = opts.remaining_vconjunct_root;
_column_predicate_info.reset(new ColumnPredicateInfo());
_calculate_pred_in_remaining_vconjunct_root(_remaining_vconjunct_root);

_column_predicate_info.reset(new ColumnPredicateInfo());
if (_schema.rowid_col_idx() > 0) {
@@ -419,12 +426,19 @@ Status SegmentIterator::_apply_bitmap_index() {
for (auto pred : _col_predicates) {
int32_t unique_id = _schema.unique_id(pred->column_id());
if (_bitmap_index_iterators.count(unique_id) < 1 ||
_bitmap_index_iterators[unique_id] == nullptr) {
_bitmap_index_iterators[unique_id] == nullptr || pred->type() == PredicateType::BF) {
// no bitmap index for this column
remaining_predicates.push_back(pred);
} else {
RETURN_IF_ERROR(pred->evaluate(_bitmap_index_iterators[unique_id], _segment->num_rows(),
&_row_bitmap));

auto column_name = _schema.column(pred->column_id())->name();
if (_check_column_pred_all_push_down(column_name) &&
!pred->predicate_params()->marked_by_runtime_filter) {
_need_read_data_indices[unique_id] = false;
}

if (_row_bitmap.isEmpty()) {
break; // all rows have been pruned, no need to process further predicates
}
@@ -518,7 +532,8 @@ Status SegmentIterator::_execute_compound_fn(const std::string& function_name) {

bool SegmentIterator::_can_filter_by_preds_except_leafnode_of_andnode() {
for (auto pred : _col_preds_except_leafnode_of_andnode) {
if (!_check_apply_by_bitmap_index(pred) && !_check_apply_by_inverted_index(pred)) {
if (_not_apply_index_pred.count(pred->column_id()) ||
(!_check_apply_by_bitmap_index(pred) && !_check_apply_by_inverted_index(pred))) {
return false;
}
}
@@ -597,6 +612,7 @@ Status SegmentIterator::_apply_index_except_leafnode_of_andnode() {
pred->type() != PredicateType::MATCH) ||
res.code() == ErrorCode::INVERTED_INDEX_FILE_HIT_LIMIT) {
// downgrade without index query
_not_apply_index_pred.insert(pred->column_id());
continue;
}
LOG(WARNING) << "failed to evaluate index"
@@ -610,6 +626,16 @@ Status SegmentIterator::_apply_index_except_leafnode_of_andnode() {
std::make_pair(pred_result_sign, std::make_pair(true, bitmap)));
}

for (auto pred : _col_preds_except_leafnode_of_andnode) {
auto column_name = _schema.column(pred->column_id())->name();
if (_remaining_vconjunct_root != nullptr &&
_check_column_pred_all_push_down(column_name, true) &&
!pred->predicate_params()->marked_by_runtime_filter) {
int32_t unique_id = _schema.unique_id(pred->column_id());
_need_read_data_indices[unique_id] = false;
}
}

return Status::OK();
}
@@ -645,6 +671,11 @@ bool SegmentIterator::_is_handle_predicate_by_fulltext(int32_t unique_id) {
std::all_of(predicate_set.begin(), predicate_set.end(), \
[](const ColumnPredicate* p) { return PredicateTypeTraits::is_range(p->type()); })

#define all_predicates_are_marked_by_runtime_filter(predicate_set) \
std::all_of(predicate_set.begin(), predicate_set.end(), [](const ColumnPredicate* p) { \
return const_cast<ColumnPredicate*>(p)->predicate_params()->marked_by_runtime_filter; \
})

Status SegmentIterator::_apply_inverted_index_on_column_predicate(
ColumnPredicate* pred, std::vector<ColumnPredicate*>& remaining_predicates,
bool* continue_apply) {
@@ -679,6 +710,12 @@ Status SegmentIterator::_apply_inverted_index_on_column_predicate(
return res;
}

auto column_name = _schema.column(pred->column_id())->name();
if (_check_column_pred_all_push_down(column_name) &&
!pred->predicate_params()->marked_by_runtime_filter) {
_need_read_data_indices[unique_id] = false;
}

auto pred_type = pred->type();
if (pred_type == PredicateType::MATCH) {
std::string pred_result_sign = _gen_predicate_result_sign(pred);
@@ -721,6 +758,10 @@ Status SegmentIterator::_apply_inverted_index_on_block_column_predicate(
&output_result);

if (res.ok()) {
if (_check_column_pred_all_push_down(column_name) &&
!all_predicates_are_marked_by_runtime_filter(predicate_set)) {
_need_read_data_indices[unique_id] = false;
}
no_need_to_pass_column_predicate_set.insert(predicate_set.begin(), predicate_set.end());
_row_bitmap &= output_result;
if (_row_bitmap.isEmpty()) {
@@ -742,6 +783,17 @@ Status SegmentIterator::_apply_inverted_index_on_block_column_predicate(
return Status::OK();
}

bool SegmentIterator::_need_read_data(ColumnId cid) {
int32_t unique_id = _schema.unique_id(cid);
if (_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] &&
_output_columns.count(unique_id) < 1) {
VLOG_DEBUG << "SegmentIterator no need read data for column: "
<< _opts.tablet_schema->column_by_uid(unique_id).name();
return false;
}
return true;
}

Status SegmentIterator::_apply_inverted_index() {
SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer);
size_t input_rows = _row_bitmap.cardinality();
@@ -993,6 +1045,9 @@ Status SegmentIterator::_seek_and_peek(rowid_t rowid) {

Status SegmentIterator::_seek_columns(const std::vector<ColumnId>& column_ids, rowid_t pos) {
for (auto cid : column_ids) {
if (!_need_read_data(cid)) {
continue;
}
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->seek_to_ordinal(pos));
}
return Status::OK();
@@ -1201,11 +1256,33 @@ void SegmentIterator::_vec_init_char_column_id() {
}
}

bool SegmentIterator::_prune_column(ColumnId cid, vectorized::MutableColumnPtr& column,
bool fill_defaults, size_t num_of_defaults) {
if (_need_read_data(cid)) {
return false;
}
if (!fill_defaults) {
return true;
}
if (column->is_nullable()) {
auto nullable_col_ptr = reinterpret_cast<vectorized::ColumnNullable*>(column.get());
nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults);
nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults);
} else {
// assert(column->is_const());
column->insert_many_defaults(num_of_defaults);
}
return true;
}

Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
vectorized::MutableColumns& column_block, size_t nrows) {
for (auto cid : column_ids) {
auto& column = column_block[cid];
size_t rows_read = nrows;
if (_prune_column(cid, column, true, rows_read)) {
continue;
}
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->next_batch(&rows_read, column));
DCHECK_EQ(nrows, rows_read);
}
@@ -1370,7 +1447,8 @@ uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro

Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
std::vector<rowid_t>& rowid_vector,
uint16_t* sel_rowid_idx, size_t select_size) {
uint16_t* sel_rowid_idx, size_t select_size,
vectorized::MutableColumns* mutable_columns) {
SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
std::vector<rowid_t> rowids(select_size);
for (size_t i = 0; i < select_size; ++i) {
@@ -1378,6 +1456,9 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
}

for (auto cid : read_column_ids) {
if (_prune_column(cid, (*mutable_columns)[cid], true, select_size)) {
continue;
}
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->read_by_rowids(
rowids.data(), select_size, _current_return_columns[cid]));
}
@@ -1493,7 +1574,8 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {

// step3: read non_predicate column
RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids,
sel_rowid_idx, selected_size));
sel_rowid_idx, selected_size,
&_current_return_columns));

// step4: output columns
// 4.1 output non-predicate column
@@ -1642,5 +1724,77 @@ Status SegmentIterator::current_block_row_locations(std::vector<RowLocation>* bl
return Status::OK();
}

/**
* solution 1: the where clause contains only `and` leaf nodes,
* the predicate is pushed down and removed from vconjunct.
* for example: where A = 1 and B = 'test' and B like '%he%';
* column A : `A = 1` pushed down, all of this column's predicates are pushed down,
* calling _check_column_pred_all_push_down will return true.
* column B : `B = 'test'` pushed down, but `B like '%he%'` remains in vconjunct,
* calling _check_column_pred_all_push_down will return false.
*
* solution 2: the where clause contains compound or other complex conditions,
* the predicate is pushed down but still remains in vconjunct.
* for example: where (A = 1 and B = 'test') or B = 'hi' or (C like '%ye%' and C > 'aa');
* column A : `A = 1` pushed down, check that it is applied by an index,
* calling _check_column_pred_all_push_down will return true.
* column B : `B = 'test'`, `B = 'hi'` all pushed down, check that they are all applied by an index,
* calling _check_column_pred_all_push_down will return true.
* column C : `C like '%ye%'` not pushed down, `C > 'aa'` pushed down, only `C > 'aa'` is applied by an index,
* calling _check_column_pred_all_push_down will return false.
*/
bool SegmentIterator::_check_column_pred_all_push_down(const std::string& column_name,
bool in_compound) {
if (_remaining_vconjunct_root == nullptr) {
return true;
}

if (in_compound) {
auto preds_in_remaining_vconjuct = _column_pred_in_remaining_vconjunct[column_name];
for (auto pred_info : preds_in_remaining_vconjuct) {
auto column_sign = _gen_predicate_result_sign(&pred_info);
if (_rowid_result_for_index.count(column_sign) < 1) {
return false;
}
}
} else {
if (_column_pred_in_remaining_vconjunct[column_name].size() != 0) {
return false;
}
}
return true;
}
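As a concrete reading of the comment above, the sketch below (hypothetical, simplified types; not Doris code) shows what the non-compound branch effectively checks for `where A = 1 and B = 'test' and B like '%he%'`: only predicates that stay in the remaining conjunct tree are recorded per column, so A passes and B does not.

// Hypothetical illustration of the non-compound check: a column may skip its raw
// data only when no predicate on it remains in the conjunct tree.
#include <string>
#include <unordered_map>
#include <vector>

bool all_preds_pushed_down(const std::string& column_name,
                           std::unordered_map<std::string, std::vector<std::string>>& remaining) {
    return remaining[column_name].empty();
}

void example() {
    // where A = 1 and B = 'test' and B like '%he%';
    // `A = 1` and `B = 'test'` were pushed down; `B like '%he%'` stays behind.
    std::unordered_map<std::string, std::vector<std::string>> remaining = {
        {"B", {"B like '%he%'"}},
    };
    bool a_ok = all_preds_pushed_down("A", remaining);  // true  -> A's data may be skipped
    bool b_ok = all_preds_pushed_down("B", remaining);  // false -> B's data must be read
    (void)a_ok;
    (void)b_ok;
}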
void SegmentIterator::_calculate_pred_in_remaining_vconjunct_root(const vectorized::VExpr* expr) {
if (expr == nullptr) {
return;
}

auto children = expr->children();
for (int i = 0; i < children.size(); ++i) {
_calculate_pred_in_remaining_vconjunct_root(children[i]);
}

auto node_type = expr->node_type();
if (node_type == TExprNodeType::SLOT_REF) {
_column_predicate_info->column_name = expr->expr_name();
} else if (_is_literal_node(node_type)) {
auto v_literal_expr = static_cast<const doris::vectorized::VLiteral*>(expr);
_column_predicate_info->query_value = v_literal_expr->value();
} else {
if (node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_op = "match";
} else if (node_type != TExprNodeType::COMPOUND_PRED) {
_column_predicate_info->query_op = expr->fn().name.function_name;
}

if (!_column_predicate_info->is_empty()) {
_column_pred_in_remaining_vconjunct[_column_predicate_info->column_name].push_back(
*_column_predicate_info);
_column_predicate_info.reset(new ColumnPredicateInfo());
}
}
}

} // namespace segment_v2
} // namespace doris
@@ -51,6 +51,32 @@ class ColumnIterator;

struct ColumnPredicateInfo {
ColumnPredicateInfo() = default;

std::string debug_string() const {
std::stringstream ss;
ss << "column_name=" << column_name << ", query_op=" << query_op
<< ", query_value=" << query_value;
return ss.str();
}

bool is_empty() const { return column_name.empty() && query_value.empty() && query_op.empty(); }

bool is_equal(const ColumnPredicateInfo& column_pred_info) const {
if (column_pred_info.column_name != column_name) {
return false;
}

if (column_pred_info.query_value != query_value) {
return false;
}

if (column_pred_info.query_op != query_op) {
return false;
}

return true;
}

std::string column_name;
std::string query_value;
std::string query_op;
@@ -167,7 +193,7 @@ private:
void _output_non_pred_columns(vectorized::Block* block);
Status _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
std::vector<rowid_t>& rowid_vector, uint16_t* sel_rowid_idx,
size_t select_size);
size_t select_size, vectorized::MutableColumns* mutable_columns);

template <class Container>
Status _output_column_by_sel_idx(vectorized::Block* block, const Container& column_ids,
@@ -203,6 +229,14 @@ private:
void _output_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size,
vectorized::Block* block);

bool _need_read_data(ColumnId cid);
bool _prune_column(ColumnId cid, vectorized::MutableColumnPtr& column, bool fill_defaults,
size_t num_of_defaults);

// return true if all of one column's predicates are pushed down
bool _check_column_pred_all_push_down(const std::string& column_name, bool in_compound = false);
void _calculate_pred_in_remaining_vconjunct_root(const vectorized::VExpr* expr);

private:
// todo(wb) remove this method after RowCursor is removed
void _convert_rowcursor_to_short_key(const RowCursor& key, size_t num_keys) {
@@ -297,6 +331,7 @@ private:
std::vector<ColumnId>
_short_cir_pred_column_ids; // keep columnId of columns for short circuit predicate evaluation
std::vector<bool> _is_pred_column; // columns hold by segmentIter
std::map<uint32_t, bool> _need_read_data_indices;
vectorized::MutableColumns _current_return_columns;
std::vector<ColumnPredicate*> _pre_eval_block_predicate;
std::vector<ColumnPredicate*> _short_cir_eval_predicate;
@@ -320,8 +355,12 @@ private:
doris::vectorized::VExpr* _remaining_vconjunct_root;
std::vector<roaring::Roaring> _pred_except_leafnode_of_andnode_evaluate_result;
std::unique_ptr<ColumnPredicateInfo> _column_predicate_info;
std::unordered_map<std::string, std::vector<ColumnPredicateInfo>>
_column_pred_in_remaining_vconjunct;
std::set<ColumnId> _not_apply_index_pred;

std::shared_ptr<ColumnPredicate> _runtime_predicate {nullptr};
std::set<int32_t> _output_columns;

// row schema of the key to seek
// only used in `_get_row_ranges_by_keys`
@@ -372,6 +372,15 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
(*_vconjunct_ctx_ptr)->root()->debug_string());
}

if (!_olap_scan_node.output_column_unique_ids.empty()) {
for (auto uid : _olap_scan_node.output_column_unique_ids) {
if (uid < 0) {
continue;
}
_maybe_read_column_ids.emplace(uid);
}
}

// ranges constructed from scan keys
std::vector<std::unique_ptr<doris::OlapScanRange>> cond_ranges;
RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
@@ -68,6 +68,8 @@ private:
// _compound_filters store conditions in the one compound relationship in conjunct expr tree except leaf node of `and` node,
// such as: "(a or b) and (c or d)", conditions for a,b,c,d will be stored
std::vector<TCondition> _compound_filters;
// If a column id is in this set, it indicates that we need to read data after index filtering
std::set<int32_t> _maybe_read_column_ids;

private:
std::unique_ptr<RuntimeProfile> _segment_profile;
@@ -215,6 +215,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.version = Version(0, _version);
_tablet_reader_params.remaining_vconjunct_root =
(_vconjunct_ctx == nullptr) ? nullptr : _vconjunct_ctx->root();
_tablet_reader_params.output_columns = ((NewOlapScanNode*)_parent)->_maybe_read_column_ids;

// Condition
for (auto& filter : filters) {
@@ -511,6 +511,8 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output
auto impl = conjunct_expr_root->get_impl();
// If impl is not null, it means this conjunct comes from a runtime filter.
VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) : conjunct_expr_root;
bool is_runtimer_filter_predicate =
_rf_vexpr_set.find(conjunct_expr_root) != _rf_vexpr_set.end();
SlotDescriptor* slot = nullptr;
ColumnValueRangeType* range = nullptr;
PushDownType pdt = PushDownType::UNACCEPTABLE;
@@ -523,6 +525,10 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output
_is_predicate_acting_on_slot(cur_expr, eq_predicate_checker, &slot, &range)) {
std::visit(
[&](auto& value_range) {
Defer mark_runtime_filter_flag {[&]() {
value_range.mark_runtime_filter_predicate(
is_runtimer_filter_predicate);
}};
RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate(
cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
&pdt));
@@ -555,7 +561,8 @@ Status VScanNode::_normalize_predicate(VExpr* conjunct_expr_root, VExpr** output
if (pdt == PushDownType::UNACCEPTABLE &&
TExprNodeType::COMPOUND_PRED == cur_expr->node_type()) {
_normalize_compound_predicate(cur_expr, *(_vconjunct_ctx_ptr.get()), &pdt,
in_predicate_checker, eq_predicate_checker);
is_runtimer_filter_predicate, in_predicate_checker,
eq_predicate_checker);
*output_expr = conjunct_expr_root; // remaining in conjunct tree
return Status::OK();
}
@@ -977,6 +984,7 @@ Status VScanNode::_normalize_noneq_binary_predicate(VExpr* expr, VExprContext* e

Status VScanNode::_normalize_compound_predicate(
vectorized::VExpr* expr, VExprContext* expr_ctx, PushDownType* pdt,
bool is_runtimer_filter_predicate,
const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**, VExpr**)>&
in_predicate_checker,
const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**, VExpr**)>&
@@ -997,6 +1005,10 @@ Status VScanNode::_normalize_compound_predicate(
*range_on_slot; // copy, in order not to affect the range in the _colname_to_value_range
std::visit(
[&](auto& value_range) {
Defer mark_runtime_filter_flag {[&]() {
value_range.mark_runtime_filter_predicate(
is_runtimer_filter_predicate);
}};
_normalize_binary_in_compound_predicate(child_expr, expr_ctx, slot,
value_range, pdt);
},
@@ -1015,6 +1027,10 @@ Status VScanNode::_normalize_compound_predicate(
*range_on_slot; // copy, in order not to affect the range in the _colname_to_value_range
std::visit(
[&](auto& value_range) {
Defer mark_runtime_filter_flag {[&]() {
value_range.mark_runtime_filter_predicate(
is_runtimer_filter_predicate);
}};
_normalize_match_in_compound_predicate(child_expr, expr_ctx, slot,
value_range, pdt);
},
@@ -1023,7 +1039,8 @@ Status VScanNode::_normalize_compound_predicate(
_compound_value_ranges.emplace_back(active_range);
}
} else if (TExprNodeType::COMPOUND_PRED == child_expr->node_type()) {
_normalize_compound_predicate(child_expr, expr_ctx, pdt, in_predicate_checker,
_normalize_compound_predicate(child_expr, expr_ctx, pdt,
is_runtimer_filter_predicate, in_predicate_checker,
eq_predicate_checker);
}
}
@@ -340,6 +340,7 @@ private:

Status _normalize_compound_predicate(
vectorized::VExpr* expr, VExprContext* expr_ctx, PushDownType* pdt,
bool is_runtimer_filter_predicate,
const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**, VExpr**)>&
in_predicate_checker,
const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**, VExpr**)>&
@@ -154,6 +154,11 @@ public abstract class AggregateInfoBase {
Expr expr = exprs.get(i);
SlotDescriptor slotDesc = analyzer.addSlotDescriptor(result);
slotDesc.initFromExpr(expr);
if (expr instanceof SlotRef) {
if (((SlotRef) expr).getColumn() != null) {
slotDesc.setColumn(((SlotRef) expr).getColumn());
}
}
// Not change the nullable of slot desc when is not grouping set id
if (isGroupingSet && i < aggregateExprStartIndex - 1 && !(expr instanceof VirtualSlotRef)) {
slotDesc.setIsNullable(true);
@@ -51,6 +51,7 @@ public class SortInfo {
private static final float SORT_MATERIALIZATION_COST_THRESHOLD = Expr.FUNCTION_CALL_COST;

private List<Expr> orderingExprs;
private List<Expr> origOrderingExprs;
private final List<Boolean> isAscOrder;
// True if "NULLS FIRST", false if "NULLS LAST", null if not specified.
private final List<Boolean> nullsFirstParams;
@@ -122,6 +123,10 @@ public class SortInfo {
return orderingExprs;
}

public List<Expr> getOrigOrderingExprs() {
return origOrderingExprs;
}

public List<Boolean> getIsAscOrder() {
return isAscOrder;
}
@@ -261,6 +266,9 @@ public class SortInfo {
}
}

// backup before substitute orderingExprs
origOrderingExprs = orderingExprs;

// The ordering exprs are evaluated against the sort tuple, so they must reflect the
// materialization decision above.
substituteOrderingExprs(substOrderBy, analyzer);
@@ -155,6 +155,7 @@ public class OlapScanNode extends ScanNode {
private long totalBytes = 0;

private SortInfo sortInfo = null;
private HashSet<Integer> outputColumnUniqueIds = new HashSet<>();

// When scan match sort_info, we can push limit into OlapScanNode.
// It's limit for scanner instead of scanNode so we add a new limit.
@@ -813,6 +814,10 @@ public class OlapScanNode extends ScanNode {
LOG.debug("distribution prune cost: {} ms", (System.currentTimeMillis() - start));
}

public void setOutputColumnUniqueIds(HashSet<Integer> outputColumnUniqueIds) {
this.outputColumnUniqueIds = outputColumnUniqueIds;
}

/**
* First, determine how many rows to sample from each partition according to the number of partitions.
* Then determine the number of Tablets to be selected for each partition according to the average number
@@ -1149,6 +1154,10 @@ public class OlapScanNode extends ScanNode {
if (pushDownAggNoGroupingOp != null) {
msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
}

if (outputColumnUniqueIds != null) {
msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds);
}
}

// export some tablets
@@ -50,6 +50,7 @@ import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -253,6 +254,10 @@ public class OriginalPlanner extends Planner {

pushDownResultFileSink(analyzer);

if (VectorizedUtil.isVectorized()) {
pushOutColumnUniqueIdsToOlapScan(rootFragment, analyzer);
}

if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) {
@@ -436,6 +441,72 @@ public class OriginalPlanner extends Planner {
}
}

/**
* outputColumnUniqueIds contains the columns in OrderByExprs and outputExprs;
* push this set of output column unique ids down to the olap scan.
*
* When the query reaches the storage layer, raw data only needs to be read
* for the columns whose unique ids are in outputColumnUniqueIds.
*
* for example:
* select A from tb where B = 1 and C > 'hello' order by B;
*
* the column unique ids for `A` and `B` will be put into outputColumnUniqueIds.
*
*/
private void pushOutColumnUniqueIdsToOlapScan(PlanFragment rootFragment, Analyzer analyzer) {
HashSet<Integer> outputColumnUniqueIds = new HashSet<>();
ArrayList<Expr> outputExprs = rootFragment.getOutputExprs();
for (Expr expr : outputExprs) {
if (expr instanceof SlotRef) {
if (((SlotRef) expr).getColumn() != null) {
outputColumnUniqueIds.add(((SlotRef) expr).getColumn().getUniqueId());
}
}
}

for (PlanFragment fragment : fragments) {
PlanNode node = fragment.getPlanRoot();
PlanNode parent = null;
while (node.getChildren().size() != 0) {
for (PlanNode childNode : node.getChildren()) {
List<SlotId> outputSlotIds = childNode.getOutputSlotIds();
if (outputSlotIds != null) {
for (SlotId sid : outputSlotIds) {
SlotDescriptor slotDesc = analyzer.getSlotDesc(sid);
outputColumnUniqueIds.add(slotDesc.getUniqueId());
}
}
}
// OlapScanNode is the last node.
// So, just get the two nodes and check if they are SortNode and OlapScan.
parent = node;
node = node.getChildren().get(0);
}

if (parent instanceof SortNode) {
SortNode sortNode = (SortNode) parent;
List<Expr> orderingExprs = sortNode.getSortInfo().getOrigOrderingExprs();
if (orderingExprs != null) {
for (Expr expr : orderingExprs) {
if (expr instanceof SlotRef) {
if (((SlotRef) expr).getColumn() != null) {
outputColumnUniqueIds.add(((SlotRef) expr).getColumn().getUniqueId());
}
}
}
}
}

if (!(node instanceof OlapScanNode)) {
continue;
}

OlapScanNode scanNode = (OlapScanNode) node;
scanNode.setOutputColumnUniqueIds(outputColumnUniqueIds);
}
}
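Tying the planner change above back to the storage layer: for the example query in its comment, a sketch of the unique-id set the BE ends up consulting might look like the following (hypothetical ids; not actual Doris code). C never enters the set, so once `C > 'hello'` has been answered by an index, the segment iterator can skip reading C's raw data.

// Hypothetical sketch of outputColumnUniqueIds for
//   select A from tb where B = 1 and C > 'hello' order by B;
// A comes from the output exprs, B from the ordering exprs; C appears only in
// the WHERE clause and is therefore absent, making it skippable on the BE side.
#include <cstdint>
#include <set>

std::set<int32_t> example_output_column_unique_ids() {
    const int32_t kColA = 0;  // hypothetical unique id of A
    const int32_t kColB = 1;  // hypothetical unique id of B
    return {kColA, kColB};    // C (e.g. unique id 2) is intentionally not included
}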
/**
* optimize for topn query like: SELECT * FROM t1 WHERE a>100 ORDER BY b,c LIMIT 100
* the pre-requirement is as follows:
@@ -1130,4 +1130,8 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
public List<Expr> getProjectList() {
return projectList;
}

public List<SlotId> getOutputSlotIds() {
return outputSlotIds;
}
}
@@ -409,7 +409,7 @@ public class TableFunctionPlanTest {
"SlotDescriptor{id=1,col=k2,colUniqueId=1,type=VARCHAR(1)"
));
Assert.assertTrue(formatString.contains(
"SlotDescriptor{id=2,col=null,colUniqueId=null,type=INT"
"SlotDescriptor{id=2,col=k1,colUniqueId=0,type=INT"
));
Assert.assertTrue(formatString.contains(
"SlotDescriptor{id=3,col=null,colUniqueId=null,type=VARCHAR(*)"
@@ -545,6 +545,7 @@ struct TCondition {
// In delete condition, the different column may have same column name, need
// using unique id to distinguish them
4: optional i32 column_unique_id
5: optional bool marked_by_runtime_filter = false
}

struct TExportStatusResult {
@@ -575,6 +575,7 @@ struct TOlapScanNode {
12: optional TPushAggOp push_down_agg_type_opt
13: optional bool use_topn_opt
14: optional list<Descriptors.TOlapTableIndex> indexes_desc
15: optional set<i32> output_column_unique_ids
}

struct TEqJoinCondition {
@@ -35,7 +35,7 @@ suite("test_add_drop_index_with_delete_data", "inverted_index"){
assertTrue(useTime <= OpTimeout)
}

def indexTbName1 = "test_add_drop_inverted_index2"
def indexTbName1 = "test_add_drop_inverted_index3"

sql "DROP TABLE IF EXISTS ${indexTbName1}"
// create 1 replica table