diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index b79ed6552e..49c42615b1 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -834,8 +834,42 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block, } for (auto it : swap_idx_map) { - new_block->get_by_position(it.second).column.swap( - ref_block->get_by_position(it.first).column); + auto& ref_col = ref_block->get_by_position(it.first); + auto& new_col = new_block->get_by_position(it.second); + + bool ref_col_nullable = ref_col.column->is_nullable(); + bool new_col_nullable = new_col.column->is_nullable(); + + if (ref_col_nullable != new_col_nullable) { + // not nullable to nullable + if (new_col_nullable) { + auto* new_nullable_col = assert_cast( + std::move(*new_col.column).mutate().get()); + + new_nullable_col->swap_nested_column(ref_col.column); + new_nullable_col->get_null_map_data().resize_fill(new_nullable_col->size()); + } else { + // nullable to not nullable: + // suppose column `c_phone` is originally varchar(16) NOT NULL, + // then do schema change `alter table test modify column c_phone int not null`, + // the cast expr of schema change is `CastExpr(CAST String to Nullable(Int32))`, + // so need to handle nullable to not nullable here + auto* ref_nullable_col = assert_cast( + std::move(*ref_col.column).mutate().get()); + + const auto* null_map = ref_nullable_col->get_null_map_column().get_data().data(); + + for (size_t i = 0; i < row_size; i++) { + if (null_map[i]) { + return Status::DataQualityError("is_null of data is changed!"); + } + } + ref_nullable_col->swap_nested_column(new_col.column); + } + } else { + new_block->get_by_position(it.second).column.swap( + ref_block->get_by_position(it.first).column); + } } return Status::OK(); @@ -1286,21 +1320,19 @@ Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader RowsetWriter* rowset_writer, TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) { - auto new_block = - std::make_unique(new_tablet->tablet_schema()->create_block()); - auto ref_block = std::make_unique(base_tablet_schema->create_block()); + do { + auto new_block = + std::make_unique(new_tablet->tablet_schema()->create_block()); + auto ref_block = std::make_unique(base_tablet_schema->create_block()); - int origin_columns_size = ref_block->columns(); + rowset_reader->next_block(ref_block.get()); + if (ref_block->rows() < 1) { + break; + } - rowset_reader->next_block(ref_block.get()); - while (ref_block->rows()) { RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get())); RETURN_IF_ERROR(rowset_writer->add_block(new_block.get())); - - new_block->clear_column_data(); - ref_block->clear_column_data(origin_columns_size); - rowset_reader->next_block(ref_block.get()); - } + } while (true); if (!rowset_writer->flush()) { return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR); @@ -1522,12 +1554,6 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea int64_t newest_write_timestamp = rowset->newest_write_timestamp(); _temp_delta_versions.first = _temp_delta_versions.second; - auto new_block = - std::make_unique(new_tablet->tablet_schema()->create_block()); - auto ref_block = std::make_unique(base_tablet_schema->create_block()); - - int origin_columns_size = ref_block->columns(); - auto create_rowset = [&]() -> Status { if (blocks.empty()) { return Status::OK(); @@ -1550,8 +1576,16 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea return Status::OK(); }; - rowset_reader->next_block(ref_block.get()); - while (ref_block->rows()) { + auto new_block = + std::make_unique(new_tablet->tablet_schema()->create_block()); + + do { + auto ref_block = std::make_unique(base_tablet_schema->create_block()); + rowset_reader->next_block(ref_block.get()); + if (ref_block->rows() < 1) { + break; + } + RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get())); if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) { RETURN_IF_ERROR(create_rowset()); @@ -1570,10 +1604,7 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea blocks.push_back( std::make_unique(new_tablet->tablet_schema()->create_block())); swap(blocks.back(), new_block); - - ref_block->clear_column_data(origin_columns_size); - rowset_reader->next_block(ref_block.get()); - } + } while (true); RETURN_IF_ERROR(create_rowset()); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index fdfa880e1d..906686676d 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -214,6 +214,9 @@ public: } bool only_null() const override { return nested_column->is_dummy(); } + // used in schema change + void swap_nested_column(ColumnPtr& other) { ((ColumnPtr&)nested_column).swap(other); } + /// Return the column that represents values. IColumn& get_nested_column() { return *nested_column; } const IColumn& get_nested_column() const { return *nested_column; } diff --git a/regression-test/data/schema_change_p0/test_alter_table_column_nullable.out b/regression-test/data/schema_change_p0/test_alter_table_column_nullable.out new file mode 100644 index 0000000000..aafbf59f58 --- /dev/null +++ b/regression-test/data/schema_change_p0/test_alter_table_column_nullable.out @@ -0,0 +1,13 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_char -- +1 11 + +-- !select_int -- +1 11 + +-- !select_char_to_int_fail -- +1 zz + +-- !select_char_to_int -- +1 11 + diff --git a/regression-test/suites/schema_change_p0/test_alter_table_column_nullable.groovy b/regression-test/suites/schema_change_p0/test_alter_table_column_nullable.groovy new file mode 100644 index 0000000000..6e6903bf3c --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_alter_table_column_nullable.groovy @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_alter_table_column_nullable") { + def tbName = "alter_table_column_nullable" + + def getJobState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ + return jobStateResult[0][9] + } + + sql "DROP TABLE IF EXISTS ${tbName}" + // char not null to null + sql """ + CREATE TABLE ${tbName} ( + k1 INT NOT NULL, + value1 varchar(16) NOT NULL + ) + DUPLICATE KEY (k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" = "1"); + """ + StringBuilder insertCommand = new StringBuilder(); + insertCommand.append("INSERT INTO ${tbName} VALUES "); + // insert row count: 4096 * 2 + 1 = 8193, 3 blocks + int insert_row_count = 8193; + while (insert_row_count-- > 1) { + insertCommand.append("(1, '11'),"); + } + insertCommand.append("(1, '11')"); + sql insertCommand.toString() + + // not nullable to nullable + sql """alter table ${tbName} modify column value1 varchar(128) NULL"""; + + int max_try_secs = 60 + while (max_try_secs--) { + String res = getJobState(tbName) + if (res == "FINISHED") { + break + } else { + Thread.sleep(2000) + if (max_try_secs < 1) { + println "test timeout," + "state:" + res + assertEquals("FINISHED",res) + } + } + } + + qt_select_char """select * from ${tbName} limit 1""" + + // int not null to null, linked schema change + sql "DROP TABLE ${tbName}" + sql """ + CREATE TABLE ${tbName} ( + k1 INT NOT NULL, + value1 INT NOT NULL + ) + DUPLICATE KEY (k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" = "1"); + """ + insertCommand = new StringBuilder(); + insertCommand.append("INSERT INTO ${tbName} VALUES "); + while (insert_row_count-- > 1) { + insertCommand.append("(1, 11),"); + } + insertCommand.append("(1, 11)"); + sql insertCommand.toString() + + sql """alter table ${tbName} modify column value1 INT NULL"""; + + max_try_secs = 60 + while (max_try_secs--) { + String res = getJobState(tbName) + if (res == "FINISHED") { + break + } else { + Thread.sleep(2000) + if (max_try_secs < 1) { + println "test timeout," + "state:" + res + assertEquals("FINISHED",res) + } + } + } + + qt_select_int """select * from ${tbName} limit 1""" + + // char not null to int not null, data loss + sql "DROP TABLE ${tbName}" + sql """ + CREATE TABLE ${tbName} ( + k1 INT NOT NULL, + value1 varchar(16) NOT NULL + ) + DUPLICATE KEY (k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" = "1"); + """ + sql "INSERT INTO ${tbName} VALUES (1, 'zz')"; + + // the cast expr of schema change is `CastExpr(CAST String to Nullable(Int32))`, + sql """alter table ${tbName} modify column value1 INT NOT NULL""" + + max_try_secs = 60 + while (max_try_secs--) { + String res = getJobState(tbName) + if (res == "CANCELLED") { + break + } else { + Thread.sleep(2000) + if (max_try_secs < 1) { + println "test timeout," + "state:" + res + assertEquals("CANCELLED",res) + } + } + } + qt_select_char_to_int_fail """select * from ${tbName}""" + + // char not null to int not null OK + sql "DROP TABLE ${tbName}" + sql """ + CREATE TABLE ${tbName} ( + k1 INT NOT NULL, + value1 varchar(16) NOT NULL + ) + DUPLICATE KEY (k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 properties("replication_num" = "1"); + """ + insertCommand = new StringBuilder(); + insertCommand.append("INSERT INTO ${tbName} VALUES "); + while (insert_row_count-- > 1) { + insertCommand.append("(1, '11'),"); + } + insertCommand.append("(1, '11')"); + sql insertCommand.toString() + + // the cast expr of schema change is `CastExpr(CAST String to Nullable(Int32))`, + sql """alter table ${tbName} modify column value1 INT NOT NULL"""; + + max_try_secs = 60 + while (max_try_secs--) { + String res = getJobState(tbName) + if (res == "FINISHED") { + break + } else { + Thread.sleep(2000) + if (max_try_secs < 1) { + println "test timeout," + "state:" + res + assertEquals("FINISHED",res) + } + } + } + + qt_select_char_to_int """select * from ${tbName} limit 1""" +}