pick from #39497
This commit is contained in:
@ -34,6 +34,7 @@ namespace doris {
|
||||
* but pass (set/return true) for NULL value rows.
|
||||
*
|
||||
* At parent, it's used for topn runtime predicate.
|
||||
* E.g. the original input indexes are '1,2,3,7,8,9' and the value at index 9 is null: the nested predicate outputs indexes '1,2,3', but the final output is '1,2,3,9'
|
||||
*/
|
||||
class AcceptNullPredicate : public ColumnPredicate {
|
||||
ENABLE_FACTORY_CREATOR(AcceptNullPredicate);
|
||||
@ -44,8 +45,6 @@ public:
|
||||
|
||||
PredicateType type() const override { return _nested->type(); }
|
||||
|
||||
void set_nested(ColumnPredicate* nested) { _nested.reset(nested); }
|
||||
|
||||
Status evaluate(BitmapIndexIterator* iterator, uint32_t num_rows,
|
||||
roaring::Roaring* roaring) const override {
|
||||
return _nested->evaluate(iterator, num_rows, roaring);
|
||||
@ -64,18 +63,14 @@ public:
|
||||
void evaluate_and(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
|
||||
bool* flags) const override {
|
||||
if (column.has_null()) {
|
||||
// copy original flags
|
||||
bool original_flags[size];
|
||||
memcpy(original_flags, flags, size * sizeof(bool));
|
||||
std::vector<uint8_t> original_flags(size);
|
||||
memcpy(original_flags.data(), flags, size);
|
||||
|
||||
const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
|
||||
// call evaluate_and and restore true for NULL rows
|
||||
_nested->evaluate_and(nullable_col.get_nested_column(), sel, size, flags);
|
||||
const auto& nullmap = nullable_col.get_null_map_data();
|
||||
for (uint16_t i = 0; i < size; ++i) {
|
||||
uint16_t idx = sel[i];
|
||||
if (original_flags[i] && !flags[i] && nullable_col.is_null_at(idx)) {
|
||||
flags[i] = true;
|
||||
}
|
||||
flags[i] |= (original_flags[i] && nullmap[sel[i]]);
|
||||
}
|
||||
} else {
|
||||
_nested->evaluate_and(column, sel, size, flags);
|
||||
@ -84,20 +79,7 @@ public:
|
||||
|
||||
void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
|
||||
bool* flags) const override {
|
||||
if (column.has_null()) {
|
||||
const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
|
||||
_nested->evaluate_or(nullable_col.get_nested_column(), sel, size, flags);
|
||||
|
||||
// call evaluate_or and set true for NULL rows
|
||||
for (uint16_t i = 0; i < size; ++i) {
|
||||
uint16_t idx = sel[i];
|
||||
if (!flags[i] && nullable_col.is_null_at(idx)) {
|
||||
flags[i] = true;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
_nested->evaluate_or(column, sel, size, flags);
|
||||
}
|
||||
DCHECK(false) << "should not reach here";
|
||||
}
|
||||
|
||||
bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const override {
|
||||
@ -138,8 +120,8 @@ public:
|
||||
bool* flags) const override {
|
||||
if (column.has_null()) {
|
||||
// copy original flags
|
||||
bool original_flags[size];
|
||||
memcpy(original_flags, flags, size * sizeof(bool));
|
||||
std::vector<uint8_t> original_flags(size);
|
||||
memcpy(original_flags.data(), flags, size);
|
||||
|
||||
const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
|
||||
// call evaluate_and_vec and restore true for NULL rows
|
||||
@ -160,23 +142,32 @@ private:
|
||||
uint16_t _evaluate_inner(const vectorized::IColumn& column, uint16_t* sel,
|
||||
uint16_t size) const override {
|
||||
if (column.has_null()) {
|
||||
if (size == 0) return 0;
|
||||
if (size == 0) {
|
||||
return 0;
|
||||
}
|
||||
// create selected_flags
|
||||
uint16_t max_idx = sel[size - 1];
|
||||
bool selected[max_idx + 1];
|
||||
std::vector<uint16_t> old_sel(size);
|
||||
memcpy(old_sel.data(), sel, sizeof(uint16_t) * size);
|
||||
|
||||
const auto& nullable_col = assert_cast<const vectorized::ColumnNullable&>(column);
|
||||
memcpy(selected, nullable_col.get_null_map_data().data(), (max_idx + 1) * sizeof(bool));
|
||||
// call nested predicate evaluate
|
||||
uint16_t new_size = _nested->evaluate(nullable_col.get_nested_column(), sel, size);
|
||||
|
||||
// process NULL values
|
||||
if (new_size < size) {
|
||||
std::vector<uint8_t> selected(max_idx + 1, 0);
|
||||
const auto* nullmap = nullable_col.get_null_map_data().data();
|
||||
// add rows selected by _nested->evaluate
|
||||
for (uint16_t i = 0; i < new_size; ++i) {
|
||||
uint16_t row_idx = sel[i];
|
||||
selected[row_idx] = true;
|
||||
}
|
||||
// also mark rows of the original selection whose value is NULL
|
||||
for (uint16_t i = 0; i < size; ++i) {
|
||||
uint16_t row_idx = old_sel[i];
|
||||
selected[row_idx] |= nullmap[row_idx];
|
||||
}
|
||||
|
||||
// recalculate new_size and sel array
|
||||
new_size = 0;
|
||||
@ -198,4 +189,4 @@ private:
|
||||
std::unique_ptr<ColumnPredicate> _nested;
|
||||
};
|
||||
|
||||
} //namespace doris
|
||||
} //namespace doris
|
||||
@ -167,9 +167,9 @@ private:
|
||||
std::string _debug_string() const override {
|
||||
std::shared_lock<std::shared_mutex> lock(_mtx);
|
||||
if (!_nested) {
|
||||
return "shared_predicate<unknow>";
|
||||
return "shared_predicate(unknow)";
|
||||
}
|
||||
return "shared_predicate<" + _nested->debug_string() + ">";
|
||||
return "shared_predicate(" + _nested->debug_string() + ")";
|
||||
}
|
||||
|
||||
mutable std::shared_mutex _mtx;
|
||||
|
||||
13
regression-test/data/nereids_arith_p0/topn/accept_null.out
Normal file
13
regression-test/data/nereids_arith_p0/topn/accept_null.out
Normal file
@ -0,0 +1,13 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !test --
|
||||
100 dd 100 0
|
||||
1000 dd 1000 0
|
||||
10000 dd 10000 0
|
||||
10001 dd 10001 0
|
||||
10002 dd 10002 0
|
||||
10003 dd 10003 0
|
||||
10004 dd 10004 0
|
||||
10005 dd 10005 0
|
||||
10006 dd 10006 0
|
||||
10007 dd 10007 0
|
||||
|
||||
110
regression-test/suites/nereids_arith_p0/topn/accept_null.groovy
Normal file
110
regression-test/suites/nereids_arith_p0/topn/accept_null.groovy
Normal file
@ -0,0 +1,110 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import org.codehaus.groovy.runtime.IOGroovyMethods
|
||||
|
||||
suite ("accept_null") {
|
||||
sql """ drop table IF EXISTS detail_tmp;"""
|
||||
|
||||
sql """
|
||||
CREATE TABLE `detail_tmp` (
|
||||
`id` VARCHAR(512) NOT NULL,
|
||||
`accident_no` VARCHAR(512) NULL,
|
||||
`accident_type_name` VARCHAR(512) NULL
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
|
||||
PROPERTIES (
|
||||
"replication_allocation" = "tag.location.default: 1",
|
||||
"min_load_replica_num" = "-1",
|
||||
"is_being_synced" = "false",
|
||||
"storage_medium" = "hdd",
|
||||
"storage_format" = "V2",
|
||||
"inverted_index_storage_format" = "V1",
|
||||
"enable_unique_key_merge_on_write" = "true",
|
||||
"light_schema_change" = "true",
|
||||
"disable_auto_compaction" = "false",
|
||||
"enable_single_replica_compaction" = "false",
|
||||
"group_commit_interval_ms" = "10000",
|
||||
"group_commit_data_bytes" = "134217728",
|
||||
"enable_mow_light_delete" = "false"
|
||||
);
|
||||
"""
|
||||
|
||||
sql "insert into detail_tmp(id,accident_type_name,accident_no) select e1,'dd',e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1;"
|
||||
sql "delete from detail_tmp where accident_no <100;"
|
||||
|
||||
def tablets = sql_return_maparray """ show tablets from detail_tmp; """
|
||||
|
||||
// before full compaction, there are 7 rowsets in all tablets.
|
||||
for (def tablet : tablets) {
|
||||
int rowsetCount = 0
|
||||
def (code, out, err) = curl("GET", tablet.CompactionStatus)
|
||||
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
|
||||
assertEquals(code, 0)
|
||||
def tabletJson = parseJson(out.trim())
|
||||
assert tabletJson.rowsets instanceof List
|
||||
}
|
||||
|
||||
// trigger full compactions for all tablets by table id in detail_tmp
|
||||
def backendId_to_backendIP = [:]
|
||||
def backendId_to_backendHttpPort = [:]
|
||||
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
|
||||
boolean disableAutoCompaction = true
|
||||
for(int i=0;i<backendId_to_backendIP.keySet().size();i++){
|
||||
backend_id = backendId_to_backendIP.keySet()[i]
|
||||
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
|
||||
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
|
||||
assertEquals(code, 0)
|
||||
def configList = parseJson(out.trim())
|
||||
assert configList instanceof List
|
||||
|
||||
for (Object ele in (List) configList) {
|
||||
assert ele instanceof List<String>
|
||||
if (((List<String>) ele)[0] == "disable_auto_compaction") {
|
||||
disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (def tablet : tablets) {
|
||||
String tablet_id = tablet.TabletId
|
||||
def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """
|
||||
logger.info("tablet"+tablet_info)
|
||||
def table_id = tablet_info[0].TableId
|
||||
backend_id = tablet.BackendId
|
||||
def times = 1
|
||||
def code, out, err
|
||||
do{
|
||||
(code, out, err) = be_run_full_compaction_by_table_id(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), table_id)
|
||||
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
|
||||
++times
|
||||
sleep(2000)
|
||||
} while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10)
|
||||
|
||||
def compactJson = parseJson(out.trim())
|
||||
if (compactJson.status.toLowerCase() == "fail") {
|
||||
assertEquals(disableAutoCompaction, false)
|
||||
logger.info("Compaction was done automatically!")
|
||||
}
|
||||
if (disableAutoCompaction) {
|
||||
assertEquals("success", compactJson.status.toLowerCase())
|
||||
}
|
||||
}
|
||||
|
||||
qt_test "select id,accident_type_name,accident_no,__DORIS_DELETE_SIGN__ From detail_tmp where accident_type_name = 'dd' order by accident_no,id limit 10;"
|
||||
}
|
||||
Reference in New Issue
Block a user