[fix](partial-update) fix update core for merge-on-write table (#22090)

This commit is contained in:
Xin Liao
2023-07-23 13:35:08 +08:00
committed by GitHub
parent 2c16fe0da9
commit 4f0158c458
5 changed files with 76 additions and 8 deletions

View File

@ -517,8 +517,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
continue;
}
for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids,
old_value_block.get_names()[cid],
TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]);
auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column,
mutable_old_columns[cid]);
// set read value to output block
if (!st.ok()) {

View File

@ -2654,13 +2654,12 @@ Status Tablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint
Status Tablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid,
const std::vector<uint32_t>& rowids,
const std::string& column_name,
const TabletColumn& tablet_column,
vectorized::MutableColumnPtr& dst) {
// read row data
BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset);
CHECK(rowset);
const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
SegmentCacheHandle segment_cache;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, &segment_cache, true));
// find segment
@ -2681,8 +2680,7 @@ Status Tablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segi
});
// create _source column
std::unique_ptr<segment_v2::ColumnIterator> column_iterator = nullptr;
RETURN_IF_ERROR(
segment->new_column_iterator(tablet_schema->column(column_name), &column_iterator));
RETURN_IF_ERROR(segment->new_column_iterator(tablet_column, &column_iterator));
segment_v2::ColumnIteratorOptions opt;
OlapReaderStatistics stats;
opt.file_reader = segment->file_reader().get();
@ -3130,8 +3128,9 @@ Status Tablet::read_columns_by_plan(TabletSchemaSPtr tablet_schema,
continue;
}
for (size_t cid = 0; cid < mutable_columns.size(); ++cid) {
TabletColumn tablet_column = tablet_schema->column(cids_to_read[cid]);
auto st = fetch_value_by_rowids(rowset_iter->second, seg_it.first, rids,
block.get_names()[cid], mutable_columns[cid]);
tablet_column, mutable_columns[cid]);
// set read value to output block
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value";

View File

@ -428,7 +428,8 @@ public:
Status fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid,
const std::vector<uint32_t>& rowids,
const std::string& column_name, vectorized::MutableColumnPtr& dst);
const TabletColumn& tablet_column,
vectorized::MutableColumnPtr& dst);
Status fetch_value_through_row_column(RowsetSharedPtr input_rowset, uint32_t segid,
const std::vector<uint32_t>& rowids,

View File

@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1
2 2 2
-- !sql --
1 3 1 0
2 2 2 0
-- !sql --
1 4 1
2 2 2

View File

@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_partial_update_schema_change", "p0") {
def tableName = "test_partial_update_schema_change"
// create table
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
`c0` int NOT NULL,
`c1` int NOT NULL,
`c2` int NOT NULL)
UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"light_schema_change" = "true",
"enable_unique_key_merge_on_write" = "true")
"""
sql " insert into ${tableName} values(1,1,1) "
sql " insert into ${tableName} values(2,2,2) "
qt_sql " select * from ${tableName} order by c0 "
sql " ALTER table ${tableName} add column c3 INT DEFAULT '0' "
sql " update ${tableName} set c1 = 3 where c0 = 1 "
qt_sql " select * from ${tableName} order by c0 "
sql " ALTER table ${tableName} drop column c3 "
sql " update ${tableName} set c1 = 4 where c0 = 1 "
qt_sql " select * from ${tableName} order by c0 "
// drop table
sql """ DROP TABLE IF EXISTS ${tableName} """
}