[Bug](materialized-view) do not check key/value column when index is dup or mow (#32695)
Do not check key/value columns when the index is duplicate-key (dup) or unique-key with merge-on-write (mow).
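On a duplicate-key table, and on a unique-key table with merge-on-write enabled, rows read from storage are already fully merged, so nothing remains to be pre-aggregated at read time. The Nereids rule therefore no longer needs to verify that aggregate arguments, grouping expressions, and predicates only touch key (or matching value) columns of the chosen index before keeping pre-aggregation on. A minimal Java sketch of that decision, using illustrative names rather than the real Doris classes:

    // Sketch only: the keys-type test introduced by this patch, reduced to a
    // standalone method (KeysType values mirror org.apache.doris.catalog.KeysType).
    public class PreAggDecisionSketch {
        enum KeysType { DUP_KEYS, UNIQUE_KEYS, AGG_KEYS }

        // Pre-aggregation can stay ON without per-column checks when the index is
        // duplicate-key, or unique-key with merge-on-write enabled.
        static boolean skipKeyValueColumnChecks(KeysType keysType, boolean uniqueKeyMergeOnWrite) {
            return keysType == KeysType.DUP_KEYS
                    || (uniqueKeyMergeOnWrite && keysType == KeysType.UNIQUE_KEYS);
        }

        public static void main(String[] args) {
            System.out.println(skipKeyValueColumnChecks(KeysType.DUP_KEYS, false));   // true
            System.out.println(skipKeyValueColumnChecks(KeysType.UNIQUE_KEYS, true)); // true
            System.out.println(skipKeyValueColumnChecks(KeysType.AGG_KEYS, false));   // false
        }
    }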
@@ -1776,13 +1776,13 @@ Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
    return Status::OK();
}

-void SegmentIterator::_init_current_block(
+Status SegmentIterator::_init_current_block(
        vectorized::Block* block, std::vector<vectorized::MutableColumnPtr>& current_columns) {
    block->clear_column_data(_schema->num_column_ids());

    for (size_t i = 0; i < _schema->num_column_ids(); i++) {
        auto cid = _schema->column_id(i);
-        auto column_desc = _schema->column(cid);
+        const auto* column_desc = _schema->column(cid);
        if (!_is_pred_column[cid] &&
            !_segment->same_with_storage_type(
                    cid, *_schema, _opts.io_ctx.reader_type != ReaderType::READER_QUERY)) {
@@ -1802,6 +1802,11 @@ void SegmentIterator::_init_current_block(
            // the column in block must clear() here to insert new data
            if (_is_pred_column[cid] ||
                i >= block->columns()) { //todo(wb) maybe we can release it after output block
+                if (current_columns[cid].get() == nullptr) {
+                    return Status::InternalError(
+                            "SegmentIterator meet invalid column, id={}, name={}", cid,
+                            _schema->column(cid)->name());
+                }
                current_columns[cid]->clear();
            } else { // non-predicate column
                current_columns[cid] = std::move(*block->get_by_position(i).column).mutate();
@@ -1815,6 +1820,7 @@ void SegmentIterator::_init_current_block(
            }
        }
    }
+    return Status::OK();
}

void SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
@@ -2206,7 +2212,7 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
                }
            }
        }
-        _init_current_block(block, _current_return_columns);
+        RETURN_IF_ERROR(_init_current_block(block, _current_return_columns));
        _converted_column_ids.assign(_schema->columns().size(), 0);

        _current_batch_rows_read = 0;
@@ -217,8 +217,8 @@ private:
    [[nodiscard]] Status _read_columns_by_index(uint32_t nrows_read_limit, uint32_t& nrows_read,
                                                bool set_block_rowid);
    void _replace_version_col(size_t num_rows);
-    void _init_current_block(vectorized::Block* block,
-                             std::vector<vectorized::MutableColumnPtr>& non_pred_vector);
+    Status _init_current_block(vectorized::Block* block,
+                               std::vector<vectorized::MutableColumnPtr>& non_pred_vector);
    uint16_t _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
    uint16_t _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
    void _output_non_pred_columns(vectorized::Block* block);
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.rules.rewrite.mv;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
@@ -631,7 +632,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
                    long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan, predicates);
                    // Pre-aggregation is set to `on` by default for duplicate-keys table.
                    // In other cases where mv is not hit, preagg may turn off from on.
-                    if (!table.isDupKeysOrMergeOnWrite() && (new CheckContext(scan, selectIndexId)).isBaseIndex()) {
+                    if ((new CheckContext(scan, selectIndexId)).isBaseIndex()) {
                        PreAggStatus preagg = scan.getPreAggStatus();
                        if (preagg.isOn()) {
                            preagg = checkPreAggStatus(scan, scan.getTable().getBaseIndexId(), predicates, aggregateFunctions,
@@ -706,13 +707,12 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
    ///////////////////////////////////////////////////////////////////////////
    // Set pre-aggregation status.
    ///////////////////////////////////////////////////////////////////////////
-    private PreAggStatus checkPreAggStatus(
-            LogicalOlapScan olapScan,
-            long indexId,
-            Set<Expression> predicates,
-            List<AggregateFunction> aggregateFuncs,
-            List<Expression> groupingExprs) {
+    private PreAggStatus checkPreAggStatus(LogicalOlapScan olapScan, long indexId, Set<Expression> predicates,
+            List<AggregateFunction> aggregateFuncs, List<Expression> groupingExprs) {
        CheckContext checkContext = new CheckContext(olapScan, indexId);
+        if (checkContext.isDupKeysOrMergeOnWrite) {
+            return PreAggStatus.on();
+        }
        return checkAggregateFunctions(aggregateFuncs, checkContext)
                .offOrElse(() -> checkGroupingExprs(groupingExprs, checkContext))
                .offOrElse(() -> checkPredicates(ImmutableList.copyOf(predicates), checkContext));
@@ -747,29 +747,30 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial

        @Override
        public PreAggStatus visitAggregateFunction(AggregateFunction aggregateFunction, CheckContext context) {
-            return checkAggFunc(aggregateFunction, AggregateType.NONE, context, false);
+            return checkAggFunc(aggregateFunction, AggregateType.NONE, context);
        }

        @Override
        public PreAggStatus visitMax(Max max, CheckContext context) {
-            return checkAggFunc(max, AggregateType.MAX, context, true);
+            return checkAggFunc(max, AggregateType.MAX, context);
        }

        @Override
        public PreAggStatus visitMin(Min min, CheckContext context) {
-            return checkAggFunc(min, AggregateType.MIN, context, true);
+            return checkAggFunc(min, AggregateType.MIN, context);
        }

        @Override
        public PreAggStatus visitSum(Sum sum, CheckContext context) {
-            return checkAggFunc(sum, AggregateType.SUM, context, false);
+            return checkAggFunc(sum, AggregateType.SUM, context);
        }

        @Override
        public PreAggStatus visitCount(Count count, CheckContext context) {
            if (count.isDistinct() && count.arity() == 1) {
                Optional<Slot> slotOpt = ExpressionUtils.extractSlotOrCastOnSlot(count.child(0));
-                if (slotOpt.isPresent() && context.keyNameToColumn.containsKey(normalizeName(slotOpt.get().toSql()))) {
+                if (slotOpt.isPresent() && (context.isDupKeysOrMergeOnWrite
+                        || context.keyNameToColumn.containsKey(normalizeName(slotOpt.get().toSql())))) {
                    return PreAggStatus.on();
                }
                if (count.child(0).arity() != 0) {
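Before this change, count(distinct c) kept pre-aggregation on only when c resolved to a key column of the selected index; with the new context.isDupKeysOrMergeOnWrite test it stays on for any column of a duplicate-key or merge-on-write index. A small runnable sketch of that condition (simplified; the real code resolves the slot through the index metadata and normalizeName):

    import java.util.Set;

    public class CountDistinctPreAggSketch {
        // Sketch of the decision for count(distinct col); names are illustrative.
        static boolean keepPreAggOn(String column, Set<String> keyColumns,
                boolean isDupKeysOrMergeOnWrite) {
            return isDupKeysOrMergeOnWrite || keyColumns.contains(column);
        }

        public static void main(String[] args) {
            Set<String> keys = Set.of("k1", "k2");
            System.out.println(keepPreAggOn("v1", keys, true));  // true: dup-key / MoW index
            System.out.println(keepPreAggOn("v1", keys, false)); // false: v1 is not a key column
        }
    }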
@@ -830,8 +831,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
        private PreAggStatus checkAggFunc(
                AggregateFunction aggFunc,
                AggregateType matchingAggType,
-                CheckContext ctx,
-                boolean canUseKeyColumn) {
+                CheckContext ctx) {
            String childNameWithFuncName = ctx.isBaseIndex()
                    ? normalizeName(aggFunc.child(0).toSql())
                    : normalizeName(CreateMaterializedViewStmt.mvColumnBuilder(
@@ -839,7 +839,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial

            boolean contains = containsAllColumn(aggFunc.child(0), ctx.keyNameToColumn.keySet());
            if (contains || ctx.keyNameToColumn.containsKey(childNameWithFuncName)) {
-                if (canUseKeyColumn || (!ctx.isBaseIndex() && contains)) {
+                if (ctx.isDupKeysOrMergeOnWrite || (!ctx.isBaseIndex() && contains)) {
                    return PreAggStatus.on();
                } else {
                    Column column = ctx.keyNameToColumn.get(childNameWithFuncName);
@@ -966,6 +966,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
            public final long index;
            public final Map<String, Column> keyNameToColumn;
            public final Map<String, Column> valueNameToColumn;
+            public final boolean isDupKeysOrMergeOnWrite;

            public CheckContext(LogicalOlapScan scan, long indexId) {
                this.scan = scan;
@@ -1005,6 +1006,9 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
                    this.valueNameToColumn.putIfAbsent(key, baseNameToColumnGroupingByIsKey.get(false).get(key));
                }
                this.index = indexId;
+                this.isDupKeysOrMergeOnWrite = getMeta().getKeysType() == KeysType.DUP_KEYS
+                        || scan.getTable().getEnableUniqueKeyMergeOnWrite()
+                                && getMeta().getKeysType() == KeysType.UNIQUE_KEYS;
            }

            public boolean isBaseIndex() {
@@ -375,7 +375,11 @@ public abstract class Literal extends Expression implements LeafExpression, Comp
        if (isNullLiteral()) {
            return false;
        }
-        if (dataType.isSmallIntType() || dataType.isTinyIntType() || dataType.isIntegerType()) {
+        if (dataType.isTinyIntType()) {
+            return getValue().equals((byte) 0);
+        } else if (dataType.isSmallIntType()) {
+            return getValue().equals((short) 0);
+        } else if (dataType.isIntegerType()) {
            return getValue().equals(0);
        } else if (dataType.isBigIntType()) {
            return getValue().equals(0L);
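The split into per-type branches matters because equals() on boxed Java numerics is type-sensitive, which is presumably why each integer width now compares against its own zero constant. A quick standalone demonstration of the boxing behaviour (illustrative class name, not part of the patch):

    public class BoxedZeroEquals {
        public static void main(String[] args) {
            Object tinyIntZero = (byte) 0;   // how a tinyint literal's value is boxed
            Object smallIntZero = (short) 0; // smallint
            Object intZero = 0;              // int
            System.out.println(intZero.equals(tinyIntZero));    // false
            System.out.println(intZero.equals(smallIntZero));   // false
            System.out.println(tinyIntZero.equals((byte) 0));   // true
            System.out.println(smallIntZero.equals((short) 0)); // true
        }
    }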
@@ -37,9 +37,9 @@

-- !select_group_mv_add --
-4
3
1
2
-3

-- !select_group_mv_not --
-3
@@ -18,9 +18,6 @@
import org.codehaus.groovy.runtime.IOGroovyMethods

suite ("mv_ssb_q_1_1") {

-    sql """set enable_nereids_planner=false"""

    sql """ DROP TABLE IF EXISTS lineorder_flat; """

    sql """
@@ -18,9 +18,6 @@
import org.codehaus.groovy.runtime.IOGroovyMethods

suite ("mv_ssb_q_4_1") {

-    sql """set enable_nereids_planner=false"""

    sql """ DROP TABLE IF EXISTS lineorder_flat; """

    sql """
@@ -18,10 +18,6 @@
import org.codehaus.groovy.runtime.IOGroovyMethods

suite ("test_dup_mv_plus") {

-    // because nereids cannot support rollup correctly forbid it temporary
-    sql """set enable_nereids_planner=false"""

    sql """ DROP TABLE IF EXISTS d_table; """

    sql """
@@ -59,10 +55,10 @@ suite ("test_dup_mv_plus") {
    qt_select_mv_sub "select k2+1 from d_table order by k1;"

    explain {
-        sql("select k2+1-1 from d_table order by k1;")
+        sql("select k2+1 from d_table order by k1+1-1;")
        contains "(k12p)"
    }
-    qt_select_mv_sub_add "select k2+1-1 from d_table order by k1;"
+    qt_select_mv_sub_add "select k2+1-1 from d_table order by k1+1-1;"

    explain {
        sql("select sum(k2+1) from d_table group by k1 order by k1;")
@@ -77,10 +73,10 @@ suite ("test_dup_mv_plus") {
    qt_select_group_mv "select sum(k1) from d_table group by k2+1 order by k2+1;"

    explain {
-        sql("select sum(k2+1-1) from d_table group by k1 order by k1;")
+        sql("select sum(k1+1-1) from d_table group by k2+1 order by k2+1;")
        contains "(k12p)"
    }
-    qt_select_group_mv_add "select sum(k2+1-1) from d_table group by k1 order by k1;"
+    qt_select_group_mv_add "select sum(k1+1-1) from d_table group by k2+1 order by k2+1;"

    explain {
        sql("select sum(k2) from d_table group by k3;")
@@ -56,7 +56,6 @@ suite ("testProjectionMV1") {
    }
    qt_select_mv "select empid, deptno from emps order by empid;"

    sql """set enable_nereids_planner=false""" // need fix it on nereids
    explain {
        sql("select empid, sum(deptno) from emps group by empid order by empid;")
        contains "(emps_mv)"