[feature-wip](duplicate_no_keys) skip sort function if the table is duplicate without keys (#19483)
This commit is contained in:
@ -269,30 +269,16 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_
|
||||
}
|
||||
template <bool is_final>
|
||||
void MemTable::_collect_vskiplist_results() {
|
||||
VecTable::Iterator it(_vec_skip_list.get());
|
||||
vectorized::Block in_block = _input_mutable_block.to_block();
|
||||
if (_keys_type == KeysType::DUP_KEYS) {
|
||||
vectorized::MutableBlock mutable_block =
|
||||
vectorized::MutableBlock::build_mutable_block(&in_block);
|
||||
_vec_row_comparator->set_block(&mutable_block);
|
||||
std::sort(_row_in_blocks.begin(), _row_in_blocks.end(),
|
||||
[this](const RowInBlock* l, const RowInBlock* r) -> bool {
|
||||
auto value = (*(this->_vec_row_comparator))(l, r);
|
||||
if (value == 0) {
|
||||
return l->_row_pos > r->_row_pos;
|
||||
} else {
|
||||
return value < 0;
|
||||
}
|
||||
});
|
||||
std::vector<int> row_pos_vec;
|
||||
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
|
||||
row_pos_vec.reserve(in_block.rows());
|
||||
for (int i = 0; i < _row_in_blocks.size(); i++) {
|
||||
row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos);
|
||||
if (_schema->num_key_columns() > 0) {
|
||||
_collect_dup_table_with_keys();
|
||||
} else {
|
||||
// skip sort if the table is dup table without keys
|
||||
_collect_dup_table_without_keys();
|
||||
}
|
||||
_output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
|
||||
row_pos_vec.data() + in_block.rows());
|
||||
} else {
|
||||
VecTable::Iterator it(_vec_skip_list.get());
|
||||
vectorized::Block in_block = _input_mutable_block.to_block();
|
||||
size_t idx = 0;
|
||||
for (it.SeekToFirst(); it.Valid(); it.Next()) {
|
||||
auto& block_data = in_block.get_columns_with_type_and_name();
|
||||
@ -343,6 +329,34 @@ void MemTable::_collect_vskiplist_results() {
|
||||
}
|
||||
}
|
||||
|
||||
void MemTable::_collect_dup_table_with_keys() {
|
||||
vectorized::Block in_block = _input_mutable_block.to_block();
|
||||
vectorized::MutableBlock mutable_block =
|
||||
vectorized::MutableBlock::build_mutable_block(&in_block);
|
||||
_vec_row_comparator->set_block(&mutable_block);
|
||||
std::sort(_row_in_blocks.begin(), _row_in_blocks.end(),
|
||||
[this](const RowInBlock* l, const RowInBlock* r) -> bool {
|
||||
auto value = (*(this->_vec_row_comparator))(l, r);
|
||||
if (value == 0) {
|
||||
return l->_row_pos > r->_row_pos;
|
||||
} else {
|
||||
return value < 0;
|
||||
}
|
||||
});
|
||||
std::vector<int> row_pos_vec;
|
||||
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
|
||||
row_pos_vec.reserve(in_block.rows());
|
||||
for (int i = 0; i < _row_in_blocks.size(); i++) {
|
||||
row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos);
|
||||
}
|
||||
_output_mutable_block.add_rows(&in_block, row_pos_vec.data(),
|
||||
row_pos_vec.data() + in_block.rows());
|
||||
}
|
||||
|
||||
// Collects the buffered rows of a duplicate table that has no key columns.
//
// No sorting is performed: the contents of the input block and the output
// block are exchanged via swap().
void MemTable::_collect_dup_table_without_keys() {
    _output_mutable_block.swap(_input_mutable_block);
}
|
||||
|
||||
void MemTable::shrink_memtable_by_agg() {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
|
||||
if (_keys_type == KeysType::DUP_KEYS) {
|
||||
|
||||
@ -181,6 +181,8 @@ private:
|
||||
|
||||
template <bool is_final>
|
||||
void _collect_vskiplist_results();
|
||||
void _collect_dup_table_with_keys();
|
||||
void _collect_dup_table_without_keys();
|
||||
bool _is_first_insertion;
|
||||
|
||||
void _init_agg_functions(const vectorized::Block* block);
|
||||
|
||||
@ -0,0 +1,20 @@
|
||||
1,1,1
|
||||
2,0,2
|
||||
3,1,3
|
||||
4,0,4
|
||||
5,1,5
|
||||
6,1,1
|
||||
7,0,2
|
||||
8,1,3
|
||||
9,0,4
|
||||
10,1,5
|
||||
11,1,1
|
||||
12,0,2
|
||||
13,1,3
|
||||
14,0,4
|
||||
15,1,5
|
||||
16,1,1
|
||||
17,0,2
|
||||
18,1,3
|
||||
19,0,4
|
||||
20,1,5
|
||||
|
@ -0,0 +1,111 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_dup_table_without_keys_load") {

    // Drops and re-creates the table so that each load path below starts
    // from an empty table with the same definition.
    def recreateTable = {
        sql """ DROP TABLE IF EXISTS test_dup_table_without_keys_load """
        sql """
            CREATE TABLE IF NOT EXISTS test_dup_table_without_keys_load (
                `user_id` bigint(20) NULL,
                `is_delete` tinyint(4) NULL,
                `client_version_int` int(11) NULL
            ) ENGINE=OLAP
            COMMENT 'duplicate_no_keys'
            DISTRIBUTED BY HASH(`user_id`) BUCKETS 4
            PROPERTIES (
                "replication_allocation" = "tag.location.default: 1",
                "disable_auto_compaction" = "true"
            );
        """
    }

    // Case 1: load 20 rows through two INSERT statements of 10 rows each.
    recreateTable()

    sql """ INSERT INTO test_dup_table_without_keys_load VALUES
        (1,1,1),
        (2,0,2),
        (3,1,3),
        (4,0,4),
        (5,1,5),
        (6,1,1),
        (7,0,2),
        (8,1,3),
        (9,0,4),
        (10,1,5);
    """

    sql """ INSERT INTO test_dup_table_without_keys_load VALUES
        (11,1,1),
        (12,0,2),
        (13,1,3),
        (14,0,4),
        (15,1,5),
        (16,1,1),
        (17,0,2),
        (18,1,3),
        (19,0,4),
        (20,1,5);
    """

    // Every inserted row must be readable back.
    test {
        sql """
            SELECT * FROM test_dup_table_without_keys_load;
        """
        rowNum 20
    }

    // Case 2: load the same table definition through stream load.
    recreateTable()

    streamLoad {
        // Target database and table for the load.
        db 'regression_test_correctness'
        table 'test_dup_table_without_keys_load'

        // No label is set explicitly, so the framework default is used.

        // The input file is comma separated.
        set 'column_separator', ','

        // Input file for this suite; presumably resolved relative to the
        // suite's data directory -- verify against the framework.
        file 'test_dup_table_without_keys_load.csv'

        time 10000 // limit inflight 10s

        // No custom check is supplied: the stream load action checks the
        // result itself (Success status and NumberTotalRows == NumberLoadedRows).
    }

    // All rows from the CSV file must be readable back.
    test {
        sql """
            SELECT * FROM test_dup_table_without_keys_load;
        """
        rowNum 20
    }
}
|
||||
Reference in New Issue
Block a user