diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 03a9dd0bc2..1b8b5a80ea 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -148,6 +148,7 @@ Status OlapScanNode::prepare(RuntimeState* state) { ADD_COUNTER(_scanner_profile, "RowsPushedCondFiltered", TUnit::UNIT); _init_counter(state); _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == NULL) { // TODO: make sure we print all available diagnostic output to our error log return Status::InternalError("Failed to get tuple descriptor."); @@ -177,17 +178,6 @@ Status OlapScanNode::open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(ExecNode::open(state)); - for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { - // if conjunct is constant, compute direct and set eos = true - - if (_conjunct_ctxs[conj_idx]->root()->is_constant()) { - void* value = _conjunct_ctxs[conj_idx]->get_value(NULL); - if (value == NULL || *reinterpret_cast(value) == false) { - _eos = true; - } - } - } - _resource_info = ResourceTls::get_resource_tls(); return Status::OK(); @@ -208,11 +198,6 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo return _status; } - if (_eos) { - *eos = true; - return Status::OK(); - } - // check if started. if (!_start) { Status status = start_scan(state); @@ -226,6 +211,13 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo _start = true; } + // some conjuncts will be disposed in start_scan function, so + // we should check _eos after call start_scan + if (_eos) { + *eos = true; + return Status::OK(); + } + // wait for batch from queue RowBatch* materialized_batch = NULL; { @@ -370,25 +362,92 @@ Status OlapScanNode::set_scan_ranges(const std::vector& scan_r Status OlapScanNode::start_scan(RuntimeState* state) { RETURN_IF_CANCELLED(state); + VLOG(1) << "Eval Const Conjuncts"; + // 1. 
Eval const conjuncts to find whether eos = true + eval_const_conjuncts(); + VLOG(1) << "NormalizeConjuncts"; - // 1. Convert conjuncts to ColumnValueRange in each column + // 2. Convert conjuncts to ColumnValueRange in each column, some conjuncts will + // set eos = true RETURN_IF_ERROR(normalize_conjuncts()); + // if step 1 or step 2 found a conjunct that sets eos = true, return directly + if (_eos) { + return Status::OK(); + } + VLOG(1) << "BuildOlapFilters"; - // 2. Using ColumnValueRange to Build StorageEngine filters + // 3. Using ColumnValueRange to Build StorageEngine filters RETURN_IF_ERROR(build_olap_filters()); + VLOG(1) << "Filter idle conjuncts"; + // 4. Remove idle conjuncts which were already translated to olap filters + remove_pushed_conjuncts(state); + VLOG(1) << "BuildScanKey"; - // 3. Using `Key Column`'s ColumnValueRange to split ScanRange to several `Sub ScanRange` + // 5. Using `Key Column`'s ColumnValueRange to split ScanRange to several `Sub ScanRange` RETURN_IF_ERROR(build_scan_key()); VLOG(1) << "StartScanThread"; - // 4. Start multi thread to read several `Sub Sub ScanRange` + // 6. 
Start multi thread to read several `Sub Sub ScanRange` RETURN_IF_ERROR(start_scan_thread(state)); return Status::OK(); } +bool OlapScanNode::is_key_column(const std::string& key_name) { + // all columns of a dup_keys table are treated by the olap scan node + // as key columns + if (_olap_scan_node.keyType == TKeysType::DUP_KEYS) { + return true; + } + + auto res = std::find(_olap_scan_node.key_column_name.begin(), _olap_scan_node.key_column_name.end(), key_name); + return res != _olap_scan_node.key_column_name.end(); +} + +void OlapScanNode::remove_pushed_conjuncts(RuntimeState *state) { + if (_pushed_conjuncts_index.empty()) { + return; + } + + // handle the direct conjuncts first + std::vector new_conjunct_ctxs; + for (int i = 0; i < _direct_conjunct_size; ++i) { + if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) == _pushed_conjuncts_index.cend()){ + new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]); + } else { + _conjunct_ctxs[i]->close(state); + } + } + auto new_direct_conjunct_size = new_conjunct_ctxs.size(); + + // then handle the hash push-down conjuncts + for (int i = _direct_conjunct_size; i < _conjunct_ctxs.size(); ++i) { + if (std::find(_pushed_conjuncts_index.cbegin(), _pushed_conjuncts_index.cend(), i) == _pushed_conjuncts_index.cend()){ + new_conjunct_ctxs.emplace_back(_conjunct_ctxs[i]); + } else { + _conjunct_ctxs[i]->close(state); + } + } + + _conjunct_ctxs = std::move(new_conjunct_ctxs); + _direct_conjunct_size = new_direct_conjunct_size; +} + +void OlapScanNode::eval_const_conjuncts() { + for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { + // if the conjunct is constant, evaluate it directly and set eos = true when it is NULL or false + if (_conjunct_ctxs[conj_idx]->root()->is_constant()) { + void* value = _conjunct_ctxs[conj_idx]->get_value(NULL); + if (value == NULL || *reinterpret_cast(value) == false) { + _eos = true; + break; + } + } + } +} + Status OlapScanNode::normalize_conjuncts() { std::vector slots = _tuple_desc->slots(); @@ -721,6 
+780,7 @@ static bool ignore_cast(SlotDescriptor* slot, Expr* expr) { template Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, ColumnValueRange* range) { + std::vector filter_conjuncts_index; bool meet_eq_binary = false; for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' @@ -770,6 +830,8 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, // begin to push InPredicate value into ColumnValueRange HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin(); + auto skip_invalid_value_count = 0; + while (iter->has_next()) { // column in (NULL,...) couldn't push down to StorageEngine // so that discard whole ColumnValueRange @@ -786,9 +848,14 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, } case TYPE_DATE: { DateTimeValue date_value = - *reinterpret_cast(iter->get_value()); - date_value.cast_to_date(); - range->add_fixed_value(*reinterpret_cast(&date_value)); + *reinterpret_cast(iter->get_value()); + if (date_value.check_loss_accuracy_cast_to_date()) { + // The datetime value loses accuracy when cast to date, which may make + // olap_scan_node return empty data, so skip this value + skip_invalid_value_count++; + } else { + range->add_fixed_value(*reinterpret_cast(&date_value)); + } break; } case TYPE_DECIMAL: @@ -817,6 +884,15 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, iter->next(); } + // all values in the hybrid set were skipped, which means every value of the in condition + // is invalid, so set eos = true + if (skip_invalid_value_count == pred->hybrid_set()->size()) { + _eos = true; + } + + if (is_key_column(slot->col_name())) { + filter_conjuncts_index.emplace_back(conj_idx); + } } // end of handle in predicate // 2. 
Normalize eq conjuncts like 'where col = value' @@ -866,47 +942,55 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, // because for AND compound predicates, it can overwrite previous conditions range->clear(); switch (slot->type().type) { - case TYPE_TINYINT: { - int32_t v = *reinterpret_cast(value); - range->add_fixed_value(*reinterpret_cast(&v)); - break; - } - case TYPE_DATE: { - DateTimeValue date_value = *reinterpret_cast(value); - date_value.cast_to_date(); - range->add_fixed_value(*reinterpret_cast(&date_value)); - break; - } - case TYPE_DECIMAL: - case TYPE_DECIMALV2: - case TYPE_CHAR: - case TYPE_VARCHAR: - case TYPE_HLL: - case TYPE_DATETIME: - case TYPE_SMALLINT: - case TYPE_INT: - case TYPE_BIGINT: - case TYPE_LARGEINT: { - range->add_fixed_value(*reinterpret_cast(value)); - break; - } - case TYPE_BOOLEAN: { - bool v = *reinterpret_cast(value); - range->add_fixed_value(*reinterpret_cast(&v)); - break; - } - default: { - LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. 
[type=" - << expr->type() << "]"; - return Status::InternalError( - "Normalize filter fail, Unsupported Primitive type"); - } + case TYPE_TINYINT: { + int32_t v = *reinterpret_cast(value); + range->add_fixed_value(*reinterpret_cast(&v)); + break; + } + case TYPE_DATE: { + DateTimeValue date_value = + *reinterpret_cast(value); + if (date_value.check_loss_accuracy_cast_to_date()) { + // There is must return empty data in olap_scan_node, + // Because data value loss accuracy + _eos = true; + } + range->add_fixed_value(*reinterpret_cast(&date_value)); + break; + } + case TYPE_DECIMAL: + case TYPE_DECIMALV2: + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_HLL: + case TYPE_DATETIME: + case TYPE_SMALLINT: + case TYPE_INT: + case TYPE_BIGINT: + case TYPE_LARGEINT: { + range->add_fixed_value(*reinterpret_cast(value)); + break; + } + case TYPE_BOOLEAN: { + bool v = *reinterpret_cast(value); + range->add_fixed_value(*reinterpret_cast(&v)); + break; + } + default: { + LOG(WARNING) << "Normalize filter fail, Unsupported Primitive type. [type=" + << expr->type() << "]"; + return Status::InternalError("Normalize filter fail, Unsupported Primitive type"); + } } + if (is_key_column(slot->col_name())) { + filter_conjuncts_index.emplace_back(conj_idx); + } meet_eq_binary = true; } // end for each binary predicate child } // end of handling eq binary predicate + if (range->get_fixed_value_size() > 0) { // this columns already meet some eq predicates(IN or Binary), // There is no need to continue to iterate. @@ -932,6 +1016,9 @@ Status OlapScanNode::normalize_in_and_eq_predicate(SlotDescriptor* slot, if (range->get_fixed_value_size() > _max_pushdown_conditions_per_column) { // exceed limit, no conditions will be pushed down to storage engine. 
range->clear(); + } else { + std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), + std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); } return Status::OK(); } @@ -961,6 +1048,7 @@ void OlapScanNode::construct_is_null_pred_in_where_pred(Expr* expr, SlotDescript template Status OlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot, ColumnValueRange* range) { + std::vector filter_conjuncts_index; for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); ++conj_idx) { Expr* root_expr = _conjunct_ctxs[conj_idx]->root(); if (TExprNodeType::BINARY_PRED != root_expr->node_type() || @@ -1019,7 +1107,13 @@ Status OlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot, case TYPE_DATE: { DateTimeValue date_value = *reinterpret_cast(value); - date_value.cast_to_date(); + // NOTE: Datetime may be truncated to a date column, so we call ++operator for date_value + // for example: '2010-01-01 00:00:01' will be truncate to '2010-01-01' + if (date_value.check_loss_accuracy_cast_to_date()) { + if (pred->op() == TExprOpcode::LT || pred->op() == TExprOpcode::GE) { + ++date_value; + } + } range->add_range(to_olap_filter_type(pred->op(), child_idx), *reinterpret_cast(&date_value)); break; @@ -1053,13 +1147,22 @@ Status OlapScanNode::normalize_noneq_binary_predicate(SlotDescriptor* slot, } } - VLOG(1) << slot->col_name() - << " op: " << static_cast(to_olap_filter_type(pred->op(), child_idx)) + if (is_key_column(slot->col_name())) { + filter_conjuncts_index.emplace_back(conj_idx); + } + + VLOG(1) << slot->col_name() << " op: " + << static_cast(to_olap_filter_type(pred->op(), child_idx)) << " value: " << *reinterpret_cast(value); } } + + } + std::copy(filter_conjuncts_index.cbegin(), filter_conjuncts_index.cend(), + std::inserter(_pushed_conjuncts_index, _pushed_conjuncts_index.begin())); + return Status::OK(); } diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 811787cdae..d0fa54ee49 100644 
--- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -133,7 +133,14 @@ protected: VLOG(1) << s.str() << "\n]"; } + // In order to ensure the accuracy of the query result, + // only key column conjuncts will be removed as idle conjuncts + bool is_key_column(const std::string& key_name); + void remove_pushed_conjuncts(RuntimeState *state); + Status start_scan(RuntimeState* state); + + void eval_const_conjuncts(); Status normalize_conjuncts(); Status build_olap_filters(); Status build_scan_key(); @@ -178,6 +185,9 @@ private: int _tuple_idx; // string slots std::vector _string_slots; + // indexes of conjuncts which have already been pushed down to the storage engine; + // they should be removed in olap_scan_node, no need to check these conjuncts again + std::set _pushed_conjuncts_index; bool _eos; diff --git a/be/src/exec/olap_utils.h b/be/src/exec/olap_utils.h index f702df9de2..e08debf941 100644 --- a/be/src/exec/olap_utils.h +++ b/be/src/exec/olap_utils.h @@ -190,9 +190,9 @@ inline int get_olap_size(PrimitiveType type) { inline SQLFilterOp to_olap_filter_type(TExprOpcode::type type, bool opposite) { switch (type) { case TExprOpcode::LT: + return opposite ? FILTER_LARGER : FILTER_LESS; + case TExprOpcode::LE: - // NOTE: Datetime may be truncated to a date column, so we convert LT to LE - // for example: '2010-01-01 00:00:01' will be truncate to '2010-01-01' return opposite ? 
FILTER_LARGER_OR_EQUAL : FILTER_LESS_OR_EQUAL; case TExprOpcode::GT: diff --git a/be/src/runtime/datetime_value.h b/be/src/runtime/datetime_value.h index 39e22afea0..d092cc9817 100644 --- a/be/src/runtime/datetime_value.h +++ b/be/src/runtime/datetime_value.h @@ -273,6 +273,12 @@ public: int minute() const { return _minute; } int second() const { return _second; } + bool check_loss_accuracy_cast_to_date() { + auto loss_accuracy = _hour != 0 || _minute != 0 || _second != 0 || _microsecond != 0; + cast_to_date(); + return loss_accuracy; + } + void cast_to_date() { _hour = 0; _minute = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 9de8643c2a..58b100e701 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -630,6 +630,7 @@ public class OlapScanNode extends ScanNode { if (null != sortColumn) { msg.olap_scan_node.setSortColumn(sortColumn); } + msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift()); } // export some tablets diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 250ac95500..ffc921004d 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -316,7 +316,9 @@ struct TOlapScanNode { 3: required list key_column_type 4: required bool is_preaggregation 5: optional string sort_column + 6: optional Types.TKeysType keyType } + struct TEqJoinCondition { // left-hand side of " = " 1: required Exprs.TExpr left;