diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 58fd8f3ef7..62422f1410 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -119,6 +119,7 @@ public: std::vector* read_orderby_key_columns = nullptr; IOContext io_ctx; vectorized::VExpr* remaining_vconjunct_root = nullptr; + vectorized::VExprContext* common_vexpr_ctxs_pushdown = nullptr; const std::set* output_columns = nullptr; // runtime state RuntimeState* runtime_state = nullptr; diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 2ae791969b..5dfbb96a8b 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -306,6 +306,7 @@ struct OlapReaderStatistics { int64_t block_init_seek_num = 0; int64_t block_init_seek_ns = 0; int64_t first_read_ns = 0; + int64_t second_read_ns = 0; int64_t block_first_read_seek_num = 0; int64_t block_first_read_seek_ns = 0; int64_t lazy_read_ns = 0; @@ -320,6 +321,7 @@ struct OlapReaderStatistics { int64_t rows_vec_del_cond_filtered = 0; int64_t vec_cond_ns = 0; int64_t short_cond_ns = 0; + int64_t expr_filter_ns = 0; int64_t output_col_ns = 0; int64_t rows_key_range_filtered = 0; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 7d03696bc7..dba7f222ac 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -225,6 +225,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { _reader_context.record_rowids = read_params.record_rowids; _reader_context.is_key_column_group = read_params.is_key_column_group; _reader_context.remaining_vconjunct_root = read_params.remaining_vconjunct_root; + _reader_context.common_vexpr_ctxs_pushdown = read_params.common_vexpr_ctxs_pushdown; _reader_context.output_columns = &read_params.output_columns; return Status::OK(); diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 41e095ade0..17f7e79286 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -113,6 +113,7 @@ public: std::unordered_set* tablet_columns_convert_to_null_set = nullptr; TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE; vectorized::VExpr* remaining_vconjunct_root = nullptr; + vectorized::VExprContext* common_vexpr_ctxs_pushdown = nullptr; // used for compaction to record row ids bool record_rowids = false; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 2ed1891892..0f4410a45f 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -66,6 +66,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.stats = _stats; _read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt; _read_options.remaining_vconjunct_root = _context->remaining_vconjunct_root; + _read_options.common_vexpr_ctxs_pushdown = _context->common_vexpr_ctxs_pushdown; _read_options.rowset_id = _rowset->rowset_id(); _read_options.version = _rowset->version(); _read_options.tablet_id = _rowset->rowset_meta()->tablet_id(); diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 0fce2cde12..7671a75d43 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -64,6 +64,7 @@ struct RowsetReaderContext { OlapReaderStatistics* stats = nullptr; RuntimeState* runtime_state = nullptr; vectorized::VExpr* remaining_vconjunct_root = nullptr; + vectorized::VExprContext* common_vexpr_ctxs_pushdown = nullptr; bool use_page_cache = false; int sequence_id_idx = -1; int batch_size = 1024; diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index dfdb340027..7d73492be7 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -32,9 +32,12 @@ #include "util/doris_metrics.h" #include "util/key_util.h" #include "util/simd/bits.h" +#include "vec/columns/column.h" +#include "vec/columns/column_const.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_number.h" #include "vec/exprs/vliteral.h" +#include "vec/exprs/vslot_ref.h" namespace doris { using namespace ErrorCode; @@ -153,7 +156,8 @@ SegmentIterator::SegmentIterator(std::shared_ptr segment, const Schema& _cur_rowid(0), _lazy_materialization_read(false), _inited(false), - _estimate_row_size(true) {} + _estimate_row_size(true), + _wait_times_estimate_row_size(10) {} SegmentIterator::~SegmentIterator() { for (auto iter : _column_iterators) { @@ -183,6 +187,8 @@ Status SegmentIterator::init(const StorageReadOptions& opts) { } _remaining_vconjunct_root = opts.remaining_vconjunct_root; + _common_vexpr_ctxs_pushdown = opts.common_vexpr_ctxs_pushdown; + _enable_common_expr_pushdown = _common_vexpr_ctxs_pushdown ? true : false; _column_predicate_info.reset(new ColumnPredicateInfo()); _calculate_pred_in_remaining_vconjunct_root(_remaining_vconjunct_root); @@ -465,6 +471,22 @@ bool SegmentIterator::_is_literal_node(const TExprNodeType::type& node_type) { } } +Status SegmentIterator::_extract_common_expr_columns(vectorized::VExpr* expr) { + auto children = expr->children(); + for (int i = 0; i < children.size(); ++i) { + RETURN_IF_ERROR(_extract_common_expr_columns(children[i])); + } + + auto node_type = expr->node_type(); + if (node_type == TExprNodeType::SLOT_REF) { + auto slot_expr = dynamic_cast(expr); + _is_common_expr_column[_schema.column_id(slot_expr->column_id())] = true; + _common_expr_columns.insert(_schema.column_id(slot_expr->column_id())); + } + + return Status::OK(); +} + Status SegmentIterator::_execute_predicates_except_leafnode_of_andnode(vectorized::VExpr* expr) { if (expr == nullptr) { return Status::OK(); @@ -1154,6 +1176,7 @@ void SegmentIterator::_vec_init_lazy_materialization() { } } + // Step1: extract columns that can be lazy materialization if (!_col_predicates.empty() || !del_cond_id_set.empty()) { std::set short_cir_pred_col_id_set; // using set for distinct cid std::set vec_pred_col_id_set; @@ -1163,7 +1186,7 @@ void SegmentIterator::_vec_init_lazy_materialization() { _is_pred_column[cid] = true; pred_column_ids.insert(cid); - // Step1: check pred using short eval or vec eval + // check pred using short eval or vec eval if (_can_evaluated_by_vectorized(predicate)) { vec_pred_col_id_set.insert(predicate->column_id()); _pre_eval_block_predicate.push_back(predicate); @@ -1195,58 +1218,92 @@ void SegmentIterator::_vec_init_lazy_materialization() { _is_need_short_eval = true; } - // Step 2: check non-predicate read costs to determine whether need lazy materialization - // fill _non_predicate_columns. - // After some optimization, we suppose lazy materialization is better performance. - if (_schema.column_ids().size() > pred_column_ids.size()) { - for (auto cid : _schema.column_ids()) { - if (!_is_pred_column[cid]) { - _non_predicate_columns.push_back(cid); - if (_is_need_vec_eval || _is_need_short_eval) { - _lazy_materialization_read = true; - } - } - } - } - - // Step 3: fill column ids for read and output - if (_lazy_materialization_read) { - // insert pred cid to first_read_columns - for (auto cid : pred_column_ids) { - _first_read_column_ids.push_back(cid); - } - } else if (!_is_need_vec_eval && - !_is_need_short_eval) { // no pred exists, just read and output column - for (int i = 0; i < _schema.num_column_ids(); i++) { - auto cid = _schema.column_id(i); - _first_read_column_ids.push_back(cid); - } - } else { // pred exits, but we can eliminate lazy materialization - // insert pred/non-pred cid to first read columns - std::set pred_id_set; - pred_id_set.insert(_short_cir_pred_column_ids.begin(), _short_cir_pred_column_ids.end()); - pred_id_set.insert(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end()); - std::set non_pred_set(_non_predicate_columns.begin(), - _non_predicate_columns.end()); - - for (int i = 0; i < _schema.num_column_ids(); i++) { - auto cid = _schema.column_id(i); - if (pred_id_set.find(cid) != pred_id_set.end()) { - _first_read_column_ids.push_back(cid); - } else if (non_pred_set.find(cid) != non_pred_set.end()) { - _first_read_column_ids.push_back(cid); - // when _lazy_materialization_read = false, non-predicate column should also be filtered by sel idx, so we regard it as pred columns - _is_pred_column[cid] = true; - } - } - } - // make _schema_block_id_map _schema_block_id_map.resize(_schema.columns().size()); for (int i = 0; i < _schema.num_column_ids(); i++) { auto cid = _schema.column_id(i); _schema_block_id_map[cid] = i; } + + // Step2: extract columns that can execute expr context + _is_common_expr_column.resize(_schema.columns().size(), false); + if (_enable_common_expr_pushdown && _remaining_vconjunct_root != nullptr) { + _extract_common_expr_columns(_remaining_vconjunct_root); + if (!_common_expr_columns.empty()) { + _is_need_expr_eval = true; + for (auto cid : _schema.column_ids()) { + // pred column also needs to be filtered by expr + if (_is_common_expr_column[cid] || _is_pred_column[cid]) { + auto loc = _schema_block_id_map[cid]; + _columns_to_filter.push_back(loc); + } + } + } + } + + // Step 3: fill non predicate columns and second read column + // if _schema columns size equal to pred_column_ids size, lazy_materialization_read is false, + // all columns are lazy materialization columns without non predicte column. + // If common expr pushdown exists, and expr column is not contained in lazy materialization columns, + // add to second read column, which will be read after lazy materialization + if (_schema.column_ids().size() > pred_column_ids.size()) { + for (auto cid : _schema.column_ids()) { + if (!_is_pred_column[cid]) { + if (_is_need_vec_eval || _is_need_short_eval) { + _lazy_materialization_read = true; + } + if (!_is_common_expr_column[cid]) { + _non_predicate_columns.push_back(cid); + } else { + _second_read_column_ids.push_back(cid); + } + } + } + } + + // Step 4: fill first read columns + if (_lazy_materialization_read) { + // insert pred cid to first_read_columns + for (auto cid : pred_column_ids) { + _first_read_column_ids.push_back(cid); + } + } else if (!_is_need_vec_eval && !_is_need_short_eval && + !_is_need_expr_eval) { // no pred exists, just read and output column + for (int i = 0; i < _schema.num_column_ids(); i++) { + auto cid = _schema.column_id(i); + _first_read_column_ids.push_back(cid); + } + } else { + if (_is_need_vec_eval || _is_need_short_eval) { + // TODO To refactor, because we suppose lazy materialization is better performance. + // pred exits, but we can eliminate lazy materialization + // insert pred/non-pred cid to first read columns + std::set pred_id_set; + pred_id_set.insert(_short_cir_pred_column_ids.begin(), + _short_cir_pred_column_ids.end()); + pred_id_set.insert(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end()); + std::set non_pred_set(_non_predicate_columns.begin(), + _non_predicate_columns.end()); + + DCHECK(_second_read_column_ids.empty()); + // _second_read_column_ids must be empty. Otherwise _lazy_materialization_read must not false. + for (int i = 0; i < _schema.num_column_ids(); i++) { + auto cid = _schema.column_id(i); + if (pred_id_set.find(cid) != pred_id_set.end()) { + _first_read_column_ids.push_back(cid); + } else if (non_pred_set.find(cid) != non_pred_set.end()) { + _first_read_column_ids.push_back(cid); + // when _lazy_materialization_read = false, non-predicate column should also be filtered by sel idx, so we regard it as pred columns + _is_pred_column[cid] = true; + } + } + } else if (_is_need_expr_eval) { + DCHECK(!_is_need_vec_eval && !_is_need_short_eval); + for (auto cid : _common_expr_columns) { + _first_read_column_ids.push_back(cid); + } + } + } } bool SegmentIterator::_can_evaluated_by_vectorized(ColumnPredicate* predicate) { @@ -1281,6 +1338,9 @@ void SegmentIterator::_vec_init_char_column_id() { do { if (column_desc->type() == OLAP_FIELD_TYPE_CHAR) { _char_type_idx.emplace_back(i); + if (i != 0) { + _char_type_idx_no_0.emplace_back(i); + } break; } else if (column_desc->type() != OLAP_FIELD_TYPE_ARRAY) { break; @@ -1355,7 +1415,7 @@ void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) { SCOPED_RAW_TIMER(&_opts.stats->output_col_ns); for (auto cid : _non_predicate_columns) { auto loc = _schema_block_id_map[cid]; - // if loc < block->block->columns() means the column is delete column and should + // if loc > block->columns() means the column is delete column and should // not output by block, so just skip the column. if (loc < block->columns()) { block->replace_by_position(loc, std::move(_current_return_columns[cid])); @@ -1536,7 +1596,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { if (UNLIKELY(!_inited)) { RETURN_IF_ERROR(_init()); _inited = true; - if (_lazy_materialization_read || _opts.record_rowids) { + if (_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval) { _block_rowids.resize(_opts.block_row_max); } _current_return_columns.resize(_schema.columns().size()); @@ -1567,14 +1627,21 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { _current_batch_rows_read = 0; uint32_t nrows_read_limit = _opts.block_row_max; - if (UNLIKELY(_estimate_row_size)) { - // read 100 rows to estimate average row size + if (_wait_times_estimate_row_size > 0) { + // first time, read 100 rows to estimate average row size, to avoid oom caused by a single batch being too large. + // If no valid data is read for the first time, block_row_max is read each time thereafter. + // Avoid low performance when valid data cannot be read all the time nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100); + _wait_times_estimate_row_size--; } _split_row_ranges.clear(); _split_row_ranges.reserve(nrows_read_limit / 2); _read_columns_by_index(nrows_read_limit, _current_batch_rows_read, - _lazy_materialization_read || _opts.record_rowids); + _lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval); + if (std::find(_first_read_column_ids.begin(), _first_read_column_ids.end(), + _schema.version_col_idx()) != _first_read_column_ids.end()) { + _replace_version_col(_current_batch_rows_read); + } _opts.stats->blocks_load += 1; _opts.stats->raw_rows_read += _current_batch_rows_read; @@ -1583,7 +1650,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { for (int i = 0; i < block->columns(); i++) { auto cid = _schema.column_id(i); // todo(wb) abstract make column where - if (!_is_pred_column[cid]) { // non-predicate + if (!_is_pred_column[cid]) { block->replace_by_position(i, std::move(_current_return_columns[cid])); } } @@ -1591,23 +1658,109 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { return Status::EndOfFile("no more data in segment"); } - if (!_is_need_vec_eval && !_is_need_short_eval) { - _replace_version_col(_current_batch_rows_read); + if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) { _output_non_pred_columns(block); _output_index_result_column(nullptr, 0, block); } else { - _convert_dict_code_for_predicate_if_necessary(); uint16_t selected_size = _current_batch_rows_read; uint16_t sel_rowid_idx[selected_size]; - // step 1: evaluate vectorization predicate - selected_size = _evaluate_vectorization_predicate(sel_rowid_idx, selected_size); + if (_is_need_vec_eval || _is_need_short_eval) { + _convert_dict_code_for_predicate_if_necessary(); - // step 2: evaluate short circuit predicate - // todo(wb) research whether need to read short predicate after vectorization evaluation - // to reduce cost of read short circuit columns. - // In SSB test, it make no difference; So need more scenarios to test - selected_size = _evaluate_short_circuit_predicate(sel_rowid_idx, selected_size); + // step 1: evaluate vectorization predicate + selected_size = _evaluate_vectorization_predicate(sel_rowid_idx, selected_size); + + // step 2: evaluate short circuit predicate + // todo(wb) research whether need to read short predicate after vectorization evaluation + // to reduce cost of read short circuit columns. + // In SSB test, it make no difference; So need more scenarios to test + selected_size = _evaluate_short_circuit_predicate(sel_rowid_idx, selected_size); + + if (selected_size > 0) { + // step 3.1: output short circuit and predicate column + // when lazy materialization enables, _first_read_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids) + // see _vec_init_lazy_materialization + // todo(wb) need to tell input columnids from output columnids + RETURN_IF_ERROR(_output_column_by_sel_idx(block, _first_read_column_ids, + sel_rowid_idx, selected_size)); + + // step 3.2: read remaining expr column and evaluate it. + if (_is_need_expr_eval) { + // The predicate column contains the remaining expr column, no need second read. + if (!_second_read_column_ids.empty()) { + SCOPED_RAW_TIMER(&_opts.stats->second_read_ns); + RETURN_IF_ERROR(_read_columns_by_rowids( + _second_read_column_ids, _block_rowids, sel_rowid_idx, + selected_size, &_current_return_columns)); + if (std::find(_second_read_column_ids.begin(), + _second_read_column_ids.end(), + _schema.version_col_idx()) != _second_read_column_ids.end()) { + _replace_version_col(selected_size); + } + for (auto cid : _second_read_column_ids) { + auto loc = _schema_block_id_map[cid]; + block->replace_by_position(loc, + std::move(_current_return_columns[cid])); + } + } + + DCHECK(block->columns() > _schema_block_id_map[*_common_expr_columns.begin()]); + // block->rows() takes the size of the first column by default. If the first column is no predicate column, + // it has not been read yet. add a const column that has been read to calculate rows(). + if (block->rows() == 0) { + vectorized::MutableColumnPtr col0 = + std::move(*block->get_by_position(0).column).mutate(); + auto res_column = vectorized::ColumnString::create(); + res_column->insert_data("", 0); + auto col_const = vectorized::ColumnConst::create(std::move(res_column), + selected_size); + block->replace_by_position(0, std::move(col_const)); + _output_index_result_column(sel_rowid_idx, selected_size, block); + block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0); + RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block)); + block->replace_by_position(0, std::move(col0)); + } else { + _output_index_result_column(sel_rowid_idx, selected_size, block); + block->shrink_char_type_column_suffix_zero(_char_type_idx); + RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block)); + } + } + } else if (_is_need_expr_eval) { + for (auto cid : _second_read_column_ids) { + auto loc = _schema_block_id_map[cid]; + block->replace_by_position(loc, std::move(_current_return_columns[cid])); + } + } + } else if (_is_need_expr_eval) { + DCHECK(!_first_read_column_ids.empty()); + // first read all rows are insert block, initialize sel_rowid_idx to all rows. + for (auto cid : _first_read_column_ids) { + auto loc = _schema_block_id_map[cid]; + block->replace_by_position(loc, std::move(_current_return_columns[cid])); + } + for (uint32_t i = 0; i < selected_size; ++i) { + sel_rowid_idx[i] = i; + } + + if (block->rows() == 0) { + vectorized::MutableColumnPtr col0 = + std::move(*block->get_by_position(0).column).mutate(); + auto res_column = vectorized::ColumnString::create(); + res_column->insert_data("", 0); + auto col_const = + vectorized::ColumnConst::create(std::move(res_column), selected_size); + block->replace_by_position(0, std::move(col_const)); + _output_index_result_column(nullptr, 0, block); + block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0); + RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block)); + block->replace_by_position(0, std::move(col0)); + } else { + _output_index_result_column(nullptr, 0, block); + block->shrink_char_type_column_suffix_zero(_char_type_idx); + RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block)); + } + } if (UNLIKELY(_opts.record_rowids)) { _sel_rowid_idx.resize(selected_size); @@ -1617,45 +1770,33 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { } } - if (!_lazy_materialization_read) { - Status ret = Status::OK(); - if (selected_size > 0) { - _replace_version_col(selected_size); - ret = _output_column_by_sel_idx(block, _first_read_column_ids, sel_rowid_idx, - selected_size); - } - if (!ret.ok()) { - return ret; - } + if (_non_predicate_columns.empty()) { // shrink char_type suffix zero data block->shrink_char_type_column_suffix_zero(_char_type_idx); if (UNLIKELY(_estimate_row_size) && block->rows() > 0) { _update_max_row(block); } - return ret; + return Status::OK(); } - // step3: read non_predicate column - RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids, - sel_rowid_idx, selected_size, - &_current_return_columns)); + // step4: read non_predicate column + if (selected_size > 0) { + RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids, + sel_rowid_idx, selected_size, + &_current_return_columns)); + if (std::find(_non_predicate_columns.begin(), _non_predicate_columns.end(), + _schema.version_col_idx()) != _non_predicate_columns.end()) { + _replace_version_col(selected_size); + } + } - _replace_version_col(selected_size); - - // step4: output columns - // 4.1 output non-predicate column + // step5: output columns _output_non_pred_columns(block); - // 4.3 output short circuit and predicate column - // when lazy materialization enables, _first_read_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids) - // see _vec_init_lazy_materialization - // todo(wb) need to tell input columnids from output columnids - if (selected_size > 0) { - RETURN_IF_ERROR(_output_column_by_sel_idx(block, _first_read_column_ids, sel_rowid_idx, - selected_size)); + if (!_is_need_expr_eval) { + _output_index_result_column(sel_rowid_idx, selected_size, block); } - _output_index_result_column(sel_rowid_idx, selected_size, block); } // shrink char_type suffix zero data @@ -1680,6 +1821,106 @@ Status SegmentIterator::next_batch(vectorized::Block* block) { return Status::OK(); } +Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size, + vectorized::Block* block) { + SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns); + DCHECK(_remaining_vconjunct_root != nullptr); + DCHECK(block->rows() != 0); + size_t prev_columns = block->columns(); + Defer defer {[&]() { vectorized::Block::erase_useless_column(block, prev_columns); }}; + + int result_column_id = -1; + RETURN_IF_ERROR(_common_vexpr_ctxs_pushdown->execute(block, &result_column_id)); + vectorized::ColumnPtr filter_column = block->get_by_position(result_column_id).column; + if (auto* nullable_column = + vectorized::check_and_get_column(*filter_column)) { + vectorized::ColumnPtr nested_column = nullable_column->get_nested_column_ptr(); + + vectorized::MutableColumnPtr mutable_holder = + nested_column->use_count() == 1 + ? nested_column->assume_mutable() + : nested_column->clone_resized(nested_column->size()); + + vectorized::ColumnUInt8* concrete_column = + typeid_cast(mutable_holder.get()); + if (!concrete_column) { + return Status::InvalidArgument( + "Illegal type {} of column for filter. Must be UInt8 or Nullable(UInt8).", + filter_column->get_name()); + } + auto* __restrict null_map = nullable_column->get_null_map_data().data(); + vectorized::IColumn::Filter& filter = concrete_column->get_data(); + auto* __restrict filter_data = filter.data(); + + const size_t size = filter.size(); + for (size_t i = 0; i < size; ++i) { + filter_data[i] &= !null_map[i]; + } + + selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter); + vectorized::Block::filter_block_internal(block, _columns_to_filter, filter); + } else if (auto* const_column = + vectorized::check_and_get_column(*filter_column)) { + bool ret = const_column->get_bool(0); + if (!ret) { + for (auto& col : _columns_to_filter) { + std::move(*block->get_by_position(col).column).assume_mutable()->clear(); + } + selected_size = 0; + } + } else { + const vectorized::IColumn::Filter& filter = + assert_cast&>( + *filter_column) + .get_data(); + selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter); + vectorized::Block::filter_block_internal(block, _columns_to_filter, filter); + } + return Status::OK(); +} + +uint16_t SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx, + uint16_t selected_size, + const vectorized::IColumn::Filter& filter) { + size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size()); + if (count == 0) { + return 0; + } else { + const vectorized::UInt8* filt_pos = filter.data(); + + uint16_t new_size = 0; + uint32_t sel_pos = 0; + const uint32_t sel_end = selected_size; + static constexpr size_t SIMD_BYTES = 32; + const uint32_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES; + + while (sel_pos < sel_end_simd) { + auto mask = simd::bytes32_mask_to_bits32_mask(filt_pos + sel_pos); + if (0 == mask) { + //pass + } else if (0xffffffff == mask) { + for (uint32_t i = 0; i < SIMD_BYTES; i++) { + sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + i]; + } + } else { + while (mask) { + const size_t bit_pos = __builtin_ctzll(mask); + sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + bit_pos]; + mask = mask & (mask - 1); + } + } + sel_pos += SIMD_BYTES; + } + + for (; sel_pos < sel_end; sel_pos++) { + if (filt_pos[sel_pos]) { + sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos]; + } + } + return new_size; + } +} + void SegmentIterator::_output_index_result_column(uint16_t* sel_rowid_idx, uint16_t select_size, vectorized::Block* block) { SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer); @@ -1776,7 +2017,7 @@ Status SegmentIterator::current_block_row_locations(std::vector* bl DCHECK(_opts.record_rowids); DCHECK_GE(_block_rowids.size(), _current_batch_rows_read); uint32_t sid = segment_id(); - if (!_is_need_vec_eval && !_is_need_short_eval) { + if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) { block_row_locations->resize(_current_batch_rows_read); for (auto i = 0; i < _current_batch_rows_read; i++) { (*block_row_locations)[i] = RowLocation(sid, _block_rowids[i]); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 9eba9637c8..17af761071 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -170,7 +170,6 @@ private: Status _execute_compound_fn(const std::string& function_name); bool _is_literal_node(const TExprNodeType::type& node_type); - void _init_lazy_materialization(); void _vec_init_lazy_materialization(); // TODO: Fix Me // CHAR type in storage layer padding the 0 in length. But query engine need ignore the padding 0. @@ -212,6 +211,12 @@ private: bool _can_evaluated_by_vectorized(ColumnPredicate* predicate); + Status _extract_common_expr_columns(vectorized::VExpr* expr); + Status _execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size, + vectorized::Block* block); + uint16_t _evaluate_common_expr_filter(uint16_t* sel_rowid_idx, uint16_t selected_size, + const vectorized::IColumn::Filter& filter); + // Dictionary column should do something to initial. void _convert_dict_code_for_predicate_if_necessary(); @@ -317,15 +322,15 @@ private: // -------------------------------------------- // whether lazy materialization read should be used. bool _lazy_materialization_read; - // columns to read before predicate evaluation - std::vector _predicate_columns; - // columns to read after predicate evaluation + // columns to read after predicate evaluation and remaining expr execute std::vector _non_predicate_columns; + std::set _common_expr_columns; // remember the rowids we've read for the current row block. // could be a local variable of next_batch(), kept here to reuse vector memory std::vector _block_rowids; bool _is_need_vec_eval = false; bool _is_need_short_eval = false; + bool _is_need_expr_eval = false; // fields for vectorization execution std::vector @@ -334,6 +339,7 @@ private: _short_cir_pred_column_ids; // keep columnId of columns for short circuit predicate evaluation std::vector _is_pred_column; // columns hold by segmentIter std::map _need_read_data_indices; + std::vector _is_common_expr_column; vectorized::MutableColumns _current_return_columns; std::vector _pre_eval_block_predicate; std::vector _short_cir_eval_predicate; @@ -344,16 +350,22 @@ private: // second, read non-predicate columns // so we need a field to stand for columns first time to read std::vector _first_read_column_ids; + std::vector _second_read_column_ids; + std::vector _columns_to_filter; std::vector _schema_block_id_map; // map from schema column id to column idx in Block // the actual init process is delayed to the first call to next_batch() bool _inited; bool _estimate_row_size; + // Read up to 100 rows at a time while waiting for the estimated row size. + int _wait_times_estimate_row_size; StorageReadOptions _opts; // make a copy of `_opts.column_predicates` in order to make local changes std::vector _col_predicates; std::vector _col_preds_except_leafnode_of_andnode; + doris::vectorized::VExprContext* _common_vexpr_ctxs_pushdown; + bool _enable_common_expr_pushdown = false; doris::vectorized::VExpr* _remaining_vconjunct_root; std::vector _pred_except_leafnode_of_andnode_evaluate_result; std::unique_ptr _column_predicate_info; @@ -378,6 +390,7 @@ private: // char_type or array type columns cid std::vector _char_type_idx; + std::vector _char_type_idx_no_0; // number of rows read in the current batch uint32_t _current_batch_rows_read = 0; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 029c74d27d..d309390d6a 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -118,6 +118,11 @@ public: _query_options.check_overflow_for_decimal; } + bool enable_common_expr_pushdown() const { + return _query_options.__isset.enable_common_expr_pushdown && + _query_options.enable_common_expr_pushdown; + } + Status query_status() { std::lock_guard l(_process_status_lock); return _process_status; diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index cc09d53cd2..804ef8c9db 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -321,8 +321,8 @@ void Block::check_number_of_rows(bool allow_null_columns) const { if (rows == -1) { rows = size; } else if (rows != size) { - LOG(FATAL) << fmt::format("Sizes of columns doesn't match: {}:{},{}:{}", - data.front().name, rows, elem.name, size); + LOG(FATAL) << fmt::format("Sizes of columns doesn't match: {}:{},{}:{}, col size: {}", + data.front().name, rows, elem.name, size, each_col_size()); } } } @@ -337,7 +337,7 @@ size_t Block::rows() const { return 0; } -std::string Block::each_col_size() { +std::string Block::each_col_size() const { std::stringstream ss; for (const auto& elem : data) { if (elem.column) { @@ -438,6 +438,11 @@ std::string Block::dump_data(size_t begin, size_t row_limit) const { // content for (size_t row_num = begin; row_num < rows() && row_num < row_limit + begin; ++row_num) { for (size_t i = 0; i < columns(); ++i) { + if (data[i].column->empty()) { + out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) + << std::right; + continue; + } std::string s = ""; if (data[i].column) { s = data[i].to_string(row_num); @@ -959,6 +964,11 @@ std::string MutableBlock::dump_data(size_t row_limit) const { // content for (size_t row_num = 0; row_num < rows() && row_num < row_limit; ++row_num) { for (size_t i = 0; i < columns(); ++i) { + if (_columns[i].get()->empty()) { + out << std::setfill(' ') << std::setw(1) << "|" << std::setw(headers_size[i]) + << std::right; + continue; + } std::string s = _data_types[i]->to_string(*_columns[i].get(), row_num); if (s.length() > headers_size[i]) { s = s.substr(0, headers_size[i] - 3) + "..."; diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index ba1809f19a..bcaf6fa7d2 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -200,7 +200,7 @@ public: /// Returns number of rows from first column in block, not equal to nullptr. If no columns, returns 0. size_t rows() const; - std::string each_col_size(); + std::string each_col_size() const; // Cut the rows in block, use in LIMIT operation void set_num_rows(size_t length); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 0ea05409cb..8761ca380c 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -80,7 +80,9 @@ Status NewOlapScanNode::_init_profile() { _rows_vec_cond_counter = ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT); _vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime"); _short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime"); + _expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime"); _first_read_timer = ADD_TIMER(_segment_profile, "FirstReadTime"); + _second_read_timer = ADD_TIMER(_segment_profile, "SecondReadTime"); _first_read_seek_timer = ADD_TIMER(_segment_profile, "FirstReadSeekTime"); _first_read_seek_counter = ADD_COUNTER(_segment_profile, "FirstReadSeekCount", TUnit::UNIT); @@ -337,6 +339,14 @@ Status NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_c return Status::OK(); } +bool NewOlapScanNode::_should_push_down_common_expr() { + return _state->enable_common_expr_pushdown() && + (_olap_scan_node.keyType == TKeysType::DUP_KEYS || + (_olap_scan_node.keyType == TKeysType::UNIQUE_KEYS && + _olap_scan_node.__isset.enable_unique_key_merge_on_write && + _olap_scan_node.enable_unique_key_merge_on_write)); +} + // PlanFragmentExecutor will call this method to set scan range // Doris scan range is defined in thrift file like this // struct TPaloScanRange { @@ -437,9 +447,9 @@ Status NewOlapScanNode::_init_scanners(std::list* scanners) { // add scanner to pool before doing prepare. // so that scanner can be automatically deconstructed if prepare failed. _scanner_pool.add(scanner); - RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(), - _olap_filters, _filter_predicates, - _push_down_functions)); + RETURN_IF_ERROR(scanner->prepare( + *scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(), _olap_filters, + _filter_predicates, _push_down_functions, _common_vexpr_ctxs_pushdown.get())); scanners->push_back((VScanner*)scanner); disk_set.insert(scanner->scan_disk()); } diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index 3043fe60f7..1363b88947 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -55,6 +55,8 @@ protected: PushDownType _should_push_down_is_null_predicate() override { return PushDownType::ACCEPTABLE; } + bool _should_push_down_common_expr() override; + Status _init_scanners(std::list* scanners) override; private: @@ -91,6 +93,7 @@ private: RuntimeProfile::Counter* _rows_vec_cond_counter = nullptr; RuntimeProfile::Counter* _vec_cond_timer = nullptr; RuntimeProfile::Counter* _short_cond_timer = nullptr; + RuntimeProfile::Counter* _expr_filter_timer = nullptr; RuntimeProfile::Counter* _output_col_timer = nullptr; RuntimeProfile::Counter* _stats_filtered_counter = nullptr; @@ -109,6 +112,7 @@ private: RuntimeProfile::Counter* _block_init_seek_counter = nullptr; RuntimeProfile::Counter* _block_conditions_filtered_timer = nullptr; RuntimeProfile::Counter* _first_read_timer = nullptr; + RuntimeProfile::Counter* _second_read_timer = nullptr; RuntimeProfile::Counter* _first_read_seek_timer = nullptr; RuntimeProfile::Counter* _first_read_seek_counter = nullptr; RuntimeProfile::Counter* _lazy_read_timer = nullptr; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 1eea0afaa8..9a503bd16d 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -51,8 +51,13 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range, VExprContext** vconjunct_ctx_ptr, const std::vector& filters, const FilterPredicates& filter_predicates, - const std::vector& function_filters) { + const std::vector& function_filters, + VExprContext** common_vexpr_ctxs_pushdown) { RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr)); + if (common_vexpr_ctxs_pushdown != nullptr) { + // Copy common_vexpr_ctxs_pushdown from scan node to this scanner's _common_vexpr_ctxs_pushdown, just necessary. + RETURN_IF_ERROR((*common_vexpr_ctxs_pushdown)->clone(_state, &_common_vexpr_ctxs_pushdown)); + } // set limit to reduce end of rowset and segment mem use _tablet_reader = std::make_unique(); @@ -209,8 +214,14 @@ Status NewOlapScanner::_init_tablet_reader_params( real_parent->_olap_scan_node.push_down_agg_type_opt; } _tablet_reader_params.version = Version(0, _version); + // TODO: If a new runtime filter arrives after `_vconjunct_ctx` move to `_common_vexpr_ctxs_pushdown`, + // `_vconjunct_ctx` and `_common_vexpr_ctxs_pushdown` will have values at the same time, + // and the root() of `_vconjunct_ctx` and `_common_vexpr_ctxs_pushdown` should be merged as `remaining_vconjunct_root` _tablet_reader_params.remaining_vconjunct_root = - (_vconjunct_ctx == nullptr) ? nullptr : _vconjunct_ctx->root(); + (_common_vexpr_ctxs_pushdown == nullptr) + ? (_vconjunct_ctx == nullptr ? nullptr : _vconjunct_ctx->root()) + : _common_vexpr_ctxs_pushdown->root(); + _tablet_reader_params.common_vexpr_ctxs_pushdown = _common_vexpr_ctxs_pushdown; _tablet_reader_params.output_columns = ((NewOlapScanNode*)_parent)->_maybe_read_column_ids; // Condition @@ -471,12 +482,14 @@ void NewOlapScanner::_update_counters_before_close() { _raw_rows_read += _tablet_reader->mutable_stats()->raw_rows_read; COUNTER_UPDATE(olap_parent->_vec_cond_timer, stats.vec_cond_ns); COUNTER_UPDATE(olap_parent->_short_cond_timer, stats.short_cond_ns); + COUNTER_UPDATE(olap_parent->_expr_filter_timer, stats.expr_filter_ns); COUNTER_UPDATE(olap_parent->_block_init_timer, stats.block_init_ns); COUNTER_UPDATE(olap_parent->_block_init_seek_timer, stats.block_init_seek_ns); COUNTER_UPDATE(olap_parent->_block_init_seek_counter, stats.block_init_seek_num); COUNTER_UPDATE(olap_parent->_block_conditions_filtered_timer, stats.block_conditions_filtered_ns); COUNTER_UPDATE(olap_parent->_first_read_timer, stats.first_read_ns); + COUNTER_UPDATE(olap_parent->_second_read_timer, stats.second_read_ns); COUNTER_UPDATE(olap_parent->_first_read_seek_timer, stats.block_first_read_seek_ns); COUNTER_UPDATE(olap_parent->_first_read_seek_counter, stats.block_first_read_seek_num); COUNTER_UPDATE(olap_parent->_lazy_read_timer, stats.lazy_read_ns); diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h index 1b3bfcb364..83968e2535 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.h +++ b/be/src/vec/exec/scan/new_olap_scanner.h @@ -44,7 +44,8 @@ public: Status prepare(const TPaloScanRange& scan_range, const std::vector& key_ranges, VExprContext** vconjunct_ctx_ptr, const std::vector& filters, const FilterPredicates& filter_predicates, - const std::vector& function_filters); + const std::vector& function_filters, + VExprContext** common_vexpr_ctxs_pushdown); const std::string& scan_disk() const { return _tablet->data_dir()->path(); } diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 6093c6314d..e7b46ca659 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -388,6 +388,9 @@ void VScanNode::release_resource(RuntimeState* state) { for (auto& ctx : _stale_vexpr_ctxs) { (*ctx)->close(state); } + if (_common_vexpr_ctxs_pushdown) { + (*_common_vexpr_ctxs_pushdown)->close(state); + } _scanner_pool.clear(); ExecNode::release_resource(state); @@ -457,7 +460,11 @@ Status VScanNode::_normalize_conjuncts() { RETURN_IF_ERROR(_normalize_predicate((*_vconjunct_ctx_ptr)->root(), &new_root)); if (new_root) { (*_vconjunct_ctx_ptr)->set_root(new_root); - } else { + if (_should_push_down_common_expr()) { + _common_vexpr_ctxs_pushdown = std::move(_vconjunct_ctx_ptr); + _vconjunct_ctx_ptr.reset(nullptr); + } + } else { // All conjucts are pushed down as predicate column _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); _vconjunct_ctx_ptr.reset(nullptr); } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 2d6e287af0..23b1e59b51 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -164,6 +164,8 @@ protected: return Status::OK(); } + virtual bool _should_push_down_common_expr() { return false; } + virtual PushDownType _should_push_down_bloom_filter() { return PushDownType::UNACCEPTABLE; } virtual PushDownType _should_push_down_bitmap_filter() { return PushDownType::UNACCEPTABLE; } @@ -259,6 +261,7 @@ protected: // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector // so that it will be destroyed uniformly at the end of the query. std::vector> _stale_vexpr_ctxs; + std::unique_ptr _common_vexpr_ctxs_pushdown = nullptr; // If sort info is set, push limit to each scanner; int64_t _limit_per_scanner = -1; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 68dbbbfe9e..180b573614 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -78,9 +78,9 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { { SCOPED_TIMER(_parent->_filter_timer); RETURN_IF_ERROR(_filter_output_block(block)); - // record rows return (after filter) for _limit check - _num_rows_return += block->rows(); } + // record rows return (after filter) for _limit check + _num_rows_return += block->rows(); } while (!state->is_cancelled() && block->rows() == 0 && !(*eof) && _num_rows_read < rows_read_threshold); } @@ -141,6 +141,9 @@ Status VScanner::close(RuntimeState* state) { if (_vconjunct_ctx) { _vconjunct_ctx->close(state); } + if (_common_vexpr_ctxs_pushdown) { + _common_vexpr_ctxs_pushdown->close(state); + } COUNTER_UPDATE(_parent->_scanner_wait_worker_timer, _scanner_wait_worker_timer); _is_closed = true; diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index e36968e6f9..88cac3db42 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -153,6 +153,7 @@ protected: // Cloned from _vconjunct_ctx of scan node. // It includes predicate in SQL and runtime filters. VExprContext* _vconjunct_ctx = nullptr; + VExprContext* _common_vexpr_ctxs_pushdown = nullptr; // Late arriving runtime filters will update _vconjunct_ctx. // The old _vconjunct_ctx will be temporarily placed in _stale_vexpr_ctxs // and will be destroyed at the end. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 4d7cb98994..1133cf6807 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -229,6 +229,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_FUNCTION_PUSHDOWN = "enable_function_pushdown"; + public static final String ENABLE_COMMON_EXPR_PUSHDOWN = "enable_common_expr_pushdown"; + public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC = "fragment_transmission_compression_codec"; public static final String ENABLE_LOCAL_EXCHANGE = "enable_local_exchange"; @@ -643,6 +645,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_FUNCTION_PUSHDOWN) public boolean enableFunctionPushdown = true; + @VariableMgr.VarAttr(name = ENABLE_COMMON_EXPR_PUSHDOWN, fuzzy = true) + public boolean enableCommonExprPushdown = true; + @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true) public boolean enableLocalExchange = true; @@ -765,6 +770,7 @@ public class SessionVariable implements Serializable, Writable { public void initFuzzyModeVariables() { Random random = new Random(System.currentTimeMillis()); this.parallelExecInstanceNum = random.nextInt(8) + 1; + this.enableCommonExprPushdown = random.nextBoolean(); this.enableLocalExchange = random.nextBoolean(); // This will cause be dead loop, disable it first // this.disableJoinReorder = random.nextBoolean(); @@ -1609,6 +1615,7 @@ public class SessionVariable implements Serializable, Writable { } tResult.setEnableFunctionPushdown(enableFunctionPushdown); + tResult.setEnableCommonExprPushdown(enableCommonExprPushdown); tResult.setCheckOverflowForDecimal(checkOverflowForDecimal); tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec); tResult.setEnableLocalExchange(enableLocalExchange); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 578701004e..cafcf1044d 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -202,6 +202,8 @@ struct TQueryOptions { // For debug purpose, skip delete bitmap when reading data 63: optional bool skip_delete_bitmap = false 64: optional bool dry_run_query = false + + 65: optional bool enable_common_expr_pushdown = false; } diff --git a/regression-test/data/correctness/test_pushdown_common_expr.out b/regression-test/data/correctness/test_pushdown_common_expr.out new file mode 100644 index 0000000000..e50a453c5c --- /dev/null +++ b/regression-test/data/correctness/test_pushdown_common_expr.out @@ -0,0 +1,49 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +1 a aa +2 b bb +4 c cc +8 d dd + +-- !2 -- +64 g gg + +-- !3 -- +256 i ii + +-- !4 -- +1 a aa +128 h hh +16 e ee +2 b bb +256 i ii +32 f ff +4 c cc +64 g gg +8 d dd + +-- !1 -- +1 a aa +128 h hh +2 b bb +4 c cc +8 d dd + +-- !2 -- +64 g gg + +-- !3 -- +256 i ii + +-- !4 -- +1 a aa +1024 k kk +128 h hh +16 e ee +2 b bb +256 i ii +32 f ff +4 c cc +64 g gg +8 d dd + diff --git a/regression-test/suites/correctness/test_pushdown_common_expr.groovy b/regression-test/suites/correctness/test_pushdown_common_expr.groovy new file mode 100644 index 0000000000..8819288e10 --- /dev/null +++ b/regression-test/suites/correctness/test_pushdown_common_expr.groovy @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_pushdown_common_expr") { + sql """ DROP TABLE IF EXISTS t_pushdown_common_expr """ + sql """ + CREATE TABLE `t_pushdown_common_expr` ( + `c1` int(11) NULL, + `c2` varchar(100) NULL COMMENT "", + `c3` varchar(100) NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`c1`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + INSERT INTO t_pushdown_common_expr VALUES + (1,'a','aa'), + (2,'b','bb'), + (4,'c','cc'), + (8,'d','dd'), + (16,'e','ee'), + (32,'f','ff'), + (64,'g','gg'), + (128,'h','hh'), + (256,'i','ii'), + (512,'j','jj'), + (1024,'k','kk'); + """ + + sql """set batch_size=8""" + sql """set enable_common_expr_pushdown=true""" + + order_qt_1 """ + SELECT * FROM t_pushdown_common_expr WHERE c3 LIKE "%c%" OR c1 < 10; + """ + + order_qt_2 """ + SELECT * FROM t_pushdown_common_expr WHERE UPPER(c2)="G" OR UPPER(c2)="P"; + """ + + order_qt_3 """ + SELECT * FROM t_pushdown_common_expr WHERE c1 = 256 OR c1 = 100; + """ + + order_qt_4 """ + SELECT * FROM t_pushdown_common_expr WHERE c1 < 300 OR UPPER(c2)="F" OR c3 LIKE "%f%"; + """ + + sql """set enable_common_expr_pushdown=false""" + + order_qt_1 """ + SELECT * FROM t_pushdown_common_expr WHERE c3 LIKE "%h%" OR c1 < 10; + """ + + order_qt_2 """ + SELECT * FROM t_pushdown_common_expr WHERE UPPER(c2)="G" OR UPPER(c2)="P"; + """ + + order_qt_3 """ + SELECT * FROM t_pushdown_common_expr WHERE c1 = 256 OR c1 = 100; + """ + + order_qt_4 """ + SELECT * FROM t_pushdown_common_expr WHERE c1 < 300 OR UPPER(c2)="K" OR c3 LIKE "%k%"; + """ + +}