# [fix] (topn) fix uncleared block in topn_next() (#39119) (#39224)

## Proposed changes

Cherry-pick of #39119 from master.

`VCollectIterator::_topn_next()` could return a block that still carried temporary columns (names prefixed with `BeConsts::BLOCK_TEMP_COLUMN_PREFIX`), which later causes the "column align problem" the code comments describe. The fix factors the cleanup into a `clear_temp_columns` lambda and adds the missing call on the end-of-file path.
Author: Sun Chenyang
Authored: 2024-08-13 10:34:17 +08:00
Committed by: GitHub
Parent: beab6a81c1
Commit: a6155a517d

3 changed files with 105 additions and 16 deletions

### `VCollectIterator::_topn_next`

The TEMP-column cleanup is hoisted to the top of the function as a reusable `clear_temp_columns` lambda, replacing the open-coded loop that previously ran only after the `MutableBlock` was built:

```diff
@@ -256,18 +256,21 @@ Status VCollectIterator::_topn_next(Block* block) {
         return Status::Error<END_OF_FILE>("");
     }
+    // clear TEMP columns to avoid column align problem
+    auto clear_temp_columns = [](Block* block) {
+        auto all_column_names = block->get_names();
+        for (auto& name : all_column_names) {
+            if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
+                // clear TEMP columns from block to prevent from storage engine merge with this
+                // fake column
+                block->erase(name);
+            }
+        }
+    };
+    clear_temp_columns(block);
     auto clone_block = block->clone_empty();
     MutableBlock mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block);
-    // clear TEMP columns to avoid column align problem in mutable_block.add_rows bellow
-    auto all_column_names = mutable_block.get_names();
-    for (auto& name : all_column_names) {
-        if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
-            mutable_block.erase(name);
-            // clear TEMP columns from block to prevent from storage engine merge with this
-            // fake column
-            block->erase(name);
-        }
-    }
     if (!_reader->_reader_context.read_orderby_key_columns) {
         return Status::Error<ErrorCode::INTERNAL_ERROR>(
```
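One idiom worth noting: the patch tests "name starts with the TEMP prefix" as `name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0`. An `rfind` that starts searching at position 0 can only match at index 0, so the expression is a pre-C++20 `starts_with`. A minimal standalone sketch (the prefix literal here is a hypothetical stand-in, not the real constant's value):

```cpp
#include <cassert>
#include <string>

int main() {
    // Hypothetical stand-in for BeConsts::BLOCK_TEMP_COLUMN_PREFIX.
    const std::string prefix = "__TEMP__";

    // rfind(prefix, 0) searches backwards from index 0, so the only possible
    // hit is at the very start: 0 means "starts with", npos means it does not.
    assert(std::string("__TEMP__match_pred").rfind(prefix, 0) == 0);
    assert(std::string("clientip").rfind(prefix, 0) == std::string::npos);
    return 0;
}
```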
On the end-of-file path, a zero-row block could previously be handed back with TEMP columns still attached; this is the uncleared block from the title, and the helper now runs before `break`:

```diff
@@ -301,6 +304,8 @@ Status VCollectIterator::_topn_next(Block* block) {
         if (status.is<END_OF_FILE>()) {
             eof = true;
             if (block->rows() == 0) {
+                // clear TEMP columns to avoid column align problem in segment iterator
+                clear_temp_columns(block);
                 break;
             }
         } else {
```
Further down, the duplicated erase loop after conjunct filtering becomes a call to the same helper:

```diff
@@ -312,12 +317,7 @@ Status VCollectIterator::_topn_next(Block* block) {
             RETURN_IF_ERROR(VExprContext::filter_block(
                     _reader->_reader_context.filter_block_conjuncts, block, block->columns()));
             // clear TMPE columns to avoid column align problem in mutable_block.add_rows bellow
-            auto all_column_names = block->get_names();
-            for (auto& name : all_column_names) {
-                if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
-                    block->erase(name);
-                }
-            }
+            clear_temp_columns(block);
             // update read rows
             read_rows += block->rows();
```
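To see why a leftover TEMP column matters, here is a self-contained toy model of the "column align problem" the comments refer to. This is plain STL with hypothetical names, not the real Doris `Block`/`MutableBlock` API: a row-merge step requires both sides to have exactly the same column list, which a stray TEMP column breaks unless it is erased first.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Toy stand-in for Doris's Block: just a list of named columns.
struct Column {
    std::string name;
    std::vector<int> data;
};
using Block = std::vector<Column>;

// Hypothetical stand-in for BeConsts::BLOCK_TEMP_COLUMN_PREFIX.
const std::string kTempPrefix = "__TEMP__";

// Same shape as the lambda added in this patch: erase every column whose
// name starts with the TEMP prefix.
void clear_temp_columns(Block* block) {
    block->erase(std::remove_if(block->begin(), block->end(),
                                [](const Column& c) {
                                    return c.name.rfind(kTempPrefix, 0) == 0;
                                }),
                 block->end());
}

// Toy model of MutableBlock::add_rows: both blocks must have identical
// column lists, otherwise rows cannot be appended column-by-column.
void add_rows(Block* dst, const Block& src) {
    assert(dst->size() == src.size() && "column align problem");
    for (size_t i = 0; i < dst->size(); ++i) {
        assert((*dst)[i].name == src[i].name && "column align problem");
        (*dst)[i].data.insert((*dst)[i].data.end(), src[i].data.begin(),
                              src[i].data.end());
    }
}

int main() {
    Block accumulated = {{"id", {}}, {"status", {}}};
    // A block coming back from the reader with a leftover TEMP column,
    // e.g. one added while evaluating an inverted-index predicate.
    Block from_reader = {{"id", {1, 2}}, {"status", {200, 304}},
                         {kTempPrefix + "pred", {1, 0}}};

    clear_temp_columns(&from_reader); // without this call, add_rows asserts
    add_rows(&accumulated, from_reader);
    assert(accumulated[0].data.size() == 2);
    return 0;
}
```

That is the design point of the patch: since any of several exits can hand the block onward, the cleanup lives in one lambda and is invoked at each of them.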

### New expected-output file

Auto-generated expected result for the new regression case, two matching rows:

```diff
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+17.0.0.0
+17.0.0.0
```

### New regression test `test_clear_block`

The suite stream-loads `documents-1000.json` five times into a duplicate-key table with inverted indexes on every filtered column, deletes a few slices of the data, and finishes with a top-n query (`ORDER BY id LIMIT 2`) whose result is checked against the `.out` file above:

```groovy
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_clear_block") {
    // load data
    def load_data = { loadTableName, fileName ->
        streamLoad {
            table loadTableName
            set 'read_json_by_line', 'true'
            set 'format', 'json'
            file fileName
            time 10000

            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                log.info("Stream load result: ${result}".toString())
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
            }
        }
    }

    sql """ set enable_match_without_inverted_index = false; """

    def dupTableName = "dup_httplogs"
    sql """ drop table if exists ${dupTableName} """
    // create a duplicate-key table with inverted indexes
    sql """
        CREATE TABLE IF NOT EXISTS dup_httplogs
        (
            `id`         bigint NOT NULL AUTO_INCREMENT(100),
            `@timestamp` int(11) NULL,
            `clientip`   varchar(20) NULL,
            `request`    text NULL,
            `status`     int(11) NULL,
            `size`       int(11) NULL,
            INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
            INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true") COMMENT '',
            INDEX status_idx (`status`) USING INVERTED COMMENT '',
            INDEX size_idx (`size`) USING INVERTED COMMENT ''
        ) DUPLICATE KEY(`id`)
        DISTRIBUTED BY HASH (`id`) BUCKETS 32
        PROPERTIES (
            "replication_allocation" = "tag.location.default: 1",
            "compaction_policy" = "time_series",
            "inverted_index_storage_format" = "v2",
            "compression" = "ZSTD",
            "disable_auto_compaction" = "true"
        );
    """

    load_data.call(dupTableName, 'documents-1000.json')
    load_data.call(dupTableName, 'documents-1000.json')
    load_data.call(dupTableName, 'documents-1000.json')
    load_data.call(dupTableName, 'documents-1000.json')
    load_data.call(dupTableName, 'documents-1000.json')

    sql """ delete from dup_httplogs where clientip = '40.135.0.0'; """
    sql """ delete from dup_httplogs where status = 304; """
    sql """ delete from dup_httplogs where size = 24736; """
    sql """ delete from dup_httplogs where request = 'GET /images/hm_bg.jpg HTTP/1.0'; """
    sql """ sync """

    // The ORDER BY ... LIMIT query takes the top-n read path touched by the
    // fix; its result is compared against the -- !sql -- block in the .out
    // file above.
    qt_sql """ SELECT clientip from ${dupTableName} WHERE clientip NOT IN (NULL, '') or clientip IN ('17.0.0.0') ORDER BY id LIMIT 2 """
}
```