[fix](sequence) fix that update table core dump with sequence column (#13847)

* [fix](sequence) fix that update table core dump with sequence column

* update
This commit is contained in:
Xin Liao
2022-11-03 09:02:21 +08:00
committed by GitHub
parent 1ee6518e00
commit 37e4a1769d
9 changed files with 178 additions and 48 deletions

View File

@ -156,7 +156,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
real_parent->_olap_scan_node.__isset.push_down_agg_type_opt;
}
RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode));
RETURN_IF_ERROR(_init_return_columns());
_tablet_reader_params.tablet = _tablet;
_tablet_reader_params.tablet_schema = _tablet_schema;
@ -224,6 +224,22 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.return_columns.push_back(index);
}
}
// expand the sequence column
if (_tablet_schema->has_sequence_col()) {
bool has_replace_col = false;
for (auto col : _return_columns) {
if (_tablet_schema->column(col).aggregation() ==
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
has_replace_col = true;
break;
}
}
if (auto sequence_col_idx = _tablet_schema->sequence_col_idx();
has_replace_col && std::find(_return_columns.begin(), _return_columns.end(),
sequence_col_idx) == _return_columns.end()) {
_tablet_reader_params.return_columns.push_back(sequence_col_idx);
}
}
}
// If a agg node is this scan node direct parent
@ -255,7 +271,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
return Status::OK();
}
Status NewOlapScanner::_init_return_columns(bool need_seq_col) {
Status NewOlapScanner::_init_return_columns() {
for (auto slot : _output_tuple_desc->slots()) {
if (!slot->is_materialized()) {
continue;
@ -277,23 +293,6 @@ Status NewOlapScanner::_init_return_columns(bool need_seq_col) {
}
}
// expand the sequence column
if (_tablet_schema->has_sequence_col() && need_seq_col) {
bool has_replace_col = false;
for (auto col : _return_columns) {
if (_tablet_schema->column(col).aggregation() ==
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
has_replace_col = true;
break;
}
}
if (auto sequence_col_idx = _tablet_schema->sequence_col_idx();
has_replace_col && std::find(_return_columns.begin(), _return_columns.end(),
sequence_col_idx) == _return_columns.end()) {
_return_columns.push_back(sequence_col_idx);
}
}
if (_return_columns.empty()) {
return Status::InternalError("failed to build storage scanner, no materialized slot!");
}

View File

@ -64,7 +64,7 @@ private:
bloom_filters,
const std::vector<FunctionFilter>& function_filters);
Status _init_return_columns(bool need_seq_col);
Status _init_return_columns();
private:
bool _aggregation;

View File

@ -175,7 +175,7 @@ Status VOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.direct_mode = _aggregation || single_version ||
_parent->_olap_scan_node.__isset.push_down_agg_type_opt;
}
RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode));
RETURN_IF_ERROR(_init_return_columns());
_tablet_reader_params.tablet = _tablet;
_tablet_reader_params.tablet_schema = _tablet_schema;
@ -243,6 +243,22 @@ Status VOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.return_columns.push_back(index);
}
}
// expand the sequence column
if (_tablet_schema->has_sequence_col()) {
bool has_replace_col = false;
for (auto col : _return_columns) {
if (_tablet_schema->column(col).aggregation() ==
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
has_replace_col = true;
break;
}
}
if (auto sequence_col_idx = _tablet_schema->sequence_col_idx();
has_replace_col && std::find(_return_columns.begin(), _return_columns.end(),
sequence_col_idx) == _return_columns.end()) {
_tablet_reader_params.return_columns.push_back(sequence_col_idx);
}
}
}
// If a agg node is this scan node direct parent
@ -274,7 +290,7 @@ Status VOlapScanner::_init_tablet_reader_params(
return Status::OK();
}
Status VOlapScanner::_init_return_columns(bool need_seq_col) {
Status VOlapScanner::_init_return_columns() {
for (auto slot : _tuple_desc->slots()) {
if (!slot->is_materialized()) {
continue;
@ -295,23 +311,6 @@ Status VOlapScanner::_init_return_columns(bool need_seq_col) {
}
}
// expand the sequence column
if (_tablet_schema->has_sequence_col() && need_seq_col) {
bool has_replace_col = false;
for (auto col : _return_columns) {
if (_tablet_schema->column(col).aggregation() ==
FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) {
has_replace_col = true;
break;
}
}
if (auto sequence_col_idx = _tablet_schema->sequence_col_idx();
has_replace_col && std::find(_return_columns.begin(), _return_columns.end(),
sequence_col_idx) == _return_columns.end()) {
_return_columns.push_back(sequence_col_idx);
}
}
if (_return_columns.empty()) {
return Status::InternalError("failed to build storage scanner, no materialized slot!");
}

View File

@ -97,7 +97,7 @@ private:
const std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>>&
bloom_filters,
const std::vector<FunctionFilter>& function_filters);
Status _init_return_columns(bool need_seq_col);
Status _init_return_columns();
// Update profile that need to be reported in realtime.
void _update_realtime_counter();

View File

@ -108,14 +108,7 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) {
Status BlockReader::init(const ReaderParams& read_params) {
RETURN_NOT_OK(TabletReader::init(read_params));
int32_t return_column_size = 0;
// read sequence column if not reader_query
if (read_params.reader_type != ReaderType::READER_QUERY) {
return_column_size = read_params.origin_return_columns->size();
} else {
return_column_size =
read_params.origin_return_columns->size() - (_sequence_col_idx != -1 ? 1 : 0);
}
int32_t return_column_size = read_params.origin_return_columns->size();
_return_columns_loc.resize(read_params.return_columns.size());
for (int i = 0; i < return_column_size; ++i) {
auto cid = read_params.origin_return_columns->at(i);

View File

@ -0,0 +1,22 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !all --
1 4 11
2 5 12
3 6 13
-- !all --
1 2 14
2 5 12
3 6 13
-- !all --
1 10 14
2 5 14
3 6 11
-- !all --
1 10 14 0 14
15 9 18 0 \N
2 5 14 0 12
3 6 11 0 13

View File

@ -0,0 +1,3 @@
1,4,11
2,5,12
3,6,13
1 1 4 11
2 2 5 12
3 3 6 13

View File

@ -0,0 +1,3 @@
1,2,14
2,3,2
3,4,3
1 1 2 14
2 2 3 2
3 3 4 3

View File

@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_unique_table_sequence") {
def tableName = "test_uniq_sequence"
sql "DROP TABLE IF EXISTS ${tableName}"
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`k1` int NULL,
`v1` tinyint NULL,
`v2` int
) ENGINE=OLAP
UNIQUE KEY(k1)
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
PROPERTIES (
"function_column.sequence_type" = "int",
"replication_allocation" = "tag.location.default: 1"
);
"""
// load unique key
streamLoad {
table "${tableName}"
set 'column_separator', ','
set 'columns', 'k1,v1,v2'
set 'function_column.sequence_col', 'v2'
file 'unique_key_data1.csv'
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(3, json.NumberTotalRows)
assertEquals(3, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
}
}
sql "sync"
order_qt_all "SELECT * from ${tableName}"
// load unique key
streamLoad {
table "${tableName}"
set 'column_separator', ','
set 'columns', 'k1,v1,v2'
set 'function_column.sequence_col', 'v2'
file 'unique_key_data2.csv'
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(3, json.NumberTotalRows)
assertEquals(3, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
}
}
sql "sync"
order_qt_all "SELECT * from ${tableName}"
sql "UPDATE ${tableName} SET v1 = 10 WHERE k1 = 1"
sql "UPDATE ${tableName} SET v2 = 14 WHERE k1 = 2"
sql "UPDATE ${tableName} SET v2 = 11 WHERE k1 = 3"
sql "sync"
order_qt_all "SELECT * from ${tableName}"
sql "INSERT INTO ${tableName} values(15, 8, 19)"
sql "INSERT INTO ${tableName} values(15, 9, 18)"
sql "SET show_hidden_columns=true"
sql "sync"
order_qt_all "SELECT * from ${tableName}"
sql "DROP TABLE ${tableName}"
}